9 Haziran 2023 Cuma

Hazelcast Jet Processor.process metodu

Giriş
Write Processor veya ara adımları geçekleştiren Processor  tarafından override edilir. İmzası şöyle
void process(int ordinal, Inbox inbox)
Örnek
Kod şöyle. Burada en son işlenen mesajlar bittiyse buffer nesnesi tekrar dolduruluyor ve gönderme işlemi tekrar başlıyor
private final Buffer<T> buffer;

@Override
public void process(int ordinal, @Nonnull Inbox inbox) {
  ...
  if (sendResult != null) {
    checkIfSendingFinished();
  }
  if (sendResult == null) {
    initSending(inbox);
  }
}
private void initSending(@Nullable Inbox inbox) {
  if (inbox != null) {
    bufferFromInbox(inbox);
  }
  attemptToDispatchBufferContent();
}
private void bufferFromInbox(@Nonnull Inbox inbox) {
  for (T t; (t = (T) inbox.peek()) != null && buffer.add(t); ) {
    inbox.remove();
  }
}
private void attemptToDispatchBufferContent() {
  if (buffer.isEmpty()) {
    return;
  }
  long currentTime = nanoTime();
  if (currentTime < nextSendTime) {
    return;
  }
  List<PutRecordsRequestEntry> entries = buffer.content();
  sendResult = putRecordsAsync(entries);
  nextSendTime = currentTime;
}

Hiç yorum yok:

Yorum Gönder

THIRD-PARTY.txt Dosyası

Kullanılan harici kütüphanelerin sürümleri bu dosyada Dosyanın yolu şöyle hazelcast/licenses/THIRD-PARTY.txt