Write Processor veya ara adımları geçekleştiren Processor tarafından override edilir. İmzası şöyle
void process(int ordinal, Inbox inbox)
Örnek
Kod şöyle. Burada en son işlenen mesajlar bittiyse buffer nesnesi tekrar dolduruluyor ve gönderme işlemi tekrar başlıyor
private final Buffer<T> buffer; @Override public void process(int ordinal, @Nonnull Inbox inbox) { ... if (sendResult != null) { checkIfSendingFinished(); } if (sendResult == null) { initSending(inbox); } } private void initSending(@Nullable Inbox inbox) { if (inbox != null) { bufferFromInbox(inbox); } attemptToDispatchBufferContent(); } private void bufferFromInbox(@Nonnull Inbox inbox) { for (T t; (t = (T) inbox.peek()) != null && buffer.add(t); ) { inbox.remove(); } } private void attemptToDispatchBufferContent() { if (buffer.isEmpty()) { return; } long currentTime = nanoTime(); if (currentTime < nextSendTime) { return; } List<PutRecordsRequestEntry> entries = buffer.content(); sendResult = putRecordsAsync(entries); nextSendTime = currentTime; }
Hiç yorum yok:
Yorum Gönder