9 Haziran 2023 Cuma

Hazelcast Jet Processor.complete metodu - Source Processor İçindir

Giriş
İmzası şöyle
boolean complete()
Açıklaması şöyle
For example, a streaming source processor will return false forever. 
Açıklaması şöyle. Yani false dönerse, Jet Engine Processor'ı tekrar çağırır
If the complete() method returns false, the Hazelcast Jet engine will attempt to reschedule the processor to run again.
Örnek - Stream Source
Bu kodu ReadKafkaConnectP.java dosyasından aldım. İskeleti şöyle. complete() hep false döner. Eğer okunan kaynak boş bir şey dönerse eventTimeMapper.flatMapIdle(); çağrılır. Bu metod içinde saat bilgisi bulunduran bir Traverser döner. Yan aslında watermark döndürüyor
public class ReadKafkaConnectP extends AbstractProcessor {
  ...
  private final EventTimeMapper<SourceRecord> eventTimeMapper;
  private Traverser<?> traverser;
  
  @Override
  public boolean complete() {
    ...
    if (traverser == null) {
      List<SourceRecord> sourceRecords = ...
      this.traverser = traverser(sourceRecords)
        .map(rec -> {
          taskRunner.commitRecord((SourceRecord) rec);
          return rec;
        })
        .onFirstNull(() -> traverser = null);
    }
    emitFromTraverser(traverser);
    return false;
  }
  public Traverser<?> traverser(List<SourceRecord> sourceRecords) {
    if (sourceRecords.isEmpty()) {
      return eventTimeMapper.flatMapIdle();
    }
    return traverseIterable(sourceRecords)
      .flatMap(rec -> {
        long eventTime = rec.timestamp() == null ? 0 : rec.timestamp();
        return eventTimeMapper.flatMapEvent(rec, 0, eventTime);
    });
  }
}
Örnek - Batch veya Stream Source
Bu kodu ReadMongoP.java dosyasından aldım. complete() metodunda 
1. batch iş için reader.everCompletes() içinde hep true döner, 
2. stream iş için reader.everCompletes() içinde hep false döner
3. emitFromTraverser() eğer traverse'da next() yoksa false döner.
public class ReadMongoP<I> extends AbstractProcessor {

  private final MongoChunkedReader reader;
 
  private Traverser<?> traverser;
   
  @Override
  public boolean complete() {
    ...
    if (traverser == null) {
      this.traverser = reader.nextChunkTraverser()
                             .onFirstNull(() -> traverser = null);
    }
    if (!emitFromTraverser(traverser)) {
      return false;
    }
    ...
    return reader.everCompletes();
  }
  ...
}

Hiç yorum yok:

Yorum Gönder

THIRD-PARTY.txt Dosyası

Kullanılan harici kütüphanelerin sürümleri bu dosyada Dosyanın yolu şöyle hazelcast/licenses/THIRD-PARTY.txt