21 Kasım 2022 Pazartesi

Hazelcast Jet StreamKafkaP Sınıfı - Kafka Consumer

Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.kafka.impl.StreamKafkaP;
Processor arayüzünden kalıtır. Üye alanlar şöyle
public final class StreamKafkaP<K, V, T> extends AbstractProcessor {

  private KafkaConsumer<K, V> consumer;
  ...
}
Bu nesneyi yaratan çağrı zinciri şöyle
KafkaSources -> KafkaProcessors -> StreamKafkaP
complete metodu
Bu processor hiç bir zaman kapanmayacağı için complete metodu false döner. Kod şöyle
@Override
public boolean complete() {
  if (!emitFromTraverser(traverser)) {
    return false;
  }

  ConsumerRecords<K, V> records = null;
  assignPartitions();
  if (!currentAssignment.isEmpty()) {
    records = consumer.poll(Duration.ZERO);
  }

  ...
  return false;
}
init metodu
Kod şöyle. Burada kafkaConsumerFn.apply(context); çağrısı KafkaConsumer nesnesini yaratılıyor. 
@Override
protected void init(@Nonnull Context context) {
  topics = new ArrayList<>(topicsConfig.getTopicNames());
  for (String topic : topics) {
    offsets.put(topic, new long[0]);
  }
  processorIndex = context.globalProcessorIndex();
  totalParallelism = context.totalParallelism();
  processingGuarantee = context.processingGuarantee();
  consumer = kafkaConsumerFn.apply(context);
  if (processingGuarantee == NONE) {
    warnWhenInitialOffsetsProvided();
  }
}
warnWhenInitialOffsetsProvided metodu
Eğer processingGuarantee NONE ise topic offset earliest varsayılır. TopicsConfig nesnesindeki her TopicConfig nesnesi için getPartitionsInitialOffsets() değerinin boş olup olmadığını kontrol eder.


Hiç yorum yok:

Yorum Gönder

THIRD-PARTY.txt Dosyası

Kullanılan harici kütüphanelerin sürümleri bu dosyada Dosyanın yolu şöyle hazelcast/licenses/THIRD-PARTY.txt