Şu satırı dahil ederiz
import com.hazelcast.jet.kafka.impl.StreamKafkaP;
Processor arayüzünden kalıtır. Üye alanlar şöyle
public final class StreamKafkaP<K, V, T> extends AbstractProcessor { private KafkaConsumer<K, V> consumer; ... }
Bu nesneyi yaratan çağrı zinciri şöyle
KafkaSources -> KafkaProcessors -> StreamKafkaP
complete metodu
Bu processor hiç bir zaman kapanmayacağı için complete metodu false döner. Kod şöyle
@Override public boolean complete() { if (!emitFromTraverser(traverser)) { return false; } ConsumerRecords<K, V> records = null; assignPartitions(); if (!currentAssignment.isEmpty()) { records = consumer.poll(Duration.ZERO); } ... return false; }
init metodu
Kod şöyle. Burada kafkaConsumerFn.apply(context); çağrısı KafkaConsumer nesnesini yaratılıyor.
@Override protected void init(@Nonnull Context context) { topics = new ArrayList<>(topicsConfig.getTopicNames()); for (String topic : topics) { offsets.put(topic, new long[0]); } processorIndex = context.globalProcessorIndex(); totalParallelism = context.totalParallelism(); processingGuarantee = context.processingGuarantee(); consumer = kafkaConsumerFn.apply(context); if (processingGuarantee == NONE) { warnWhenInitialOffsetsProvided(); } }
warnWhenInitialOffsetsProvided metodu
Eğer processingGuarantee NONE ise topic offset earliest varsayılır. TopicsConfig nesnesindeki her TopicConfig nesnesi için getPartitionsInitialOffsets() değerinin boş olup olmadığını kontrol eder.
Hiç yorum yok:
Yorum Gönder