9 Haziran 2023 Cuma

Hazelcast Jet Processor.process metodu

Giriş
Write Processor veya ara adımları geçekleştiren Processor  tarafından override edilir. İmzası şöyle
void process(int ordinal, Inbox inbox)
Örnek
Kod şöyle. Burada en son işlenen mesajlar bittiyse buffer nesnesi tekrar dolduruluyor ve gönderme işlemi tekrar başlıyor
private final Buffer<T> buffer;

@Override
public void process(int ordinal, @Nonnull Inbox inbox) {
  ...
  if (sendResult != null) {
    checkIfSendingFinished();
  }
  if (sendResult == null) {
    initSending(inbox);
  }
}
private void initSending(@Nullable Inbox inbox) {
  if (inbox != null) {
    bufferFromInbox(inbox);
  }
  attemptToDispatchBufferContent();
}
private void bufferFromInbox(@Nonnull Inbox inbox) {
  for (T t; (t = (T) inbox.peek()) != null && buffer.add(t); ) {
    inbox.remove();
  }
}
private void attemptToDispatchBufferContent() {
  if (buffer.isEmpty()) {
    return;
  }
  long currentTime = nanoTime();
  if (currentTime < nextSendTime) {
    return;
  }
  List<PutRecordsRequestEntry> entries = buffer.content();
  sendResult = putRecordsAsync(entries);
  nextSendTime = currentTime;
}

Hiç yorum yok:

Yorum Gönder

HazelcastAPI CP SubSystem vs Transaciton Yapılar

Giriş Bu yazının yazılma sebebi bu soru 1. Transactional Yapılar Hazelcast TransactionalMap, birden fazla veri değişikliğinin tek bir işlem ...