Şu satırı dahil ederiz
import com.hazelcast.jet.kafka.impl.StreamKafkaP;
Processor arayüzünden kalıtır. Üye alanlar şöyle
public final class StreamKafkaP<K, V, T> extends AbstractProcessor {
private KafkaConsumer<K, V> consumer;
...
}Bu nesneyi yaratan çağrı zinciri şöyle
KafkaSources -> KafkaProcessors -> StreamKafkaP
complete metodu
Bu processor hiç bir zaman kapanmayacağı için complete metodu false döner. Kod şöyle
@Override
public boolean complete() {
if (!emitFromTraverser(traverser)) {
return false;
}
ConsumerRecords<K, V> records = null;
assignPartitions();
if (!currentAssignment.isEmpty()) {
records = consumer.poll(Duration.ZERO);
}
...
return false;
}init metodu
Kod şöyle. Burada kafkaConsumerFn.apply(context); çağrısı KafkaConsumer nesnesini yaratılıyor.
@Override
protected void init(@Nonnull Context context) {
topics = new ArrayList<>(topicsConfig.getTopicNames());
for (String topic : topics) {
offsets.put(topic, new long[0]);
}
processorIndex = context.globalProcessorIndex();
totalParallelism = context.totalParallelism();
processingGuarantee = context.processingGuarantee();
consumer = kafkaConsumerFn.apply(context);
if (processingGuarantee == NONE) {
warnWhenInitialOffsetsProvided();
}
}warnWhenInitialOffsetsProvided metodu
Eğer processingGuarantee NONE ise topic offset earliest varsayılır. TopicsConfig nesnesindeki her TopicConfig nesnesi için getPartitionsInitialOffsets() değerinin boş olup olmadığını kontrol eder.
Hiç yorum yok:
Yorum Gönder