20 Ekim 2022 Perşembe

Hazelcast Jet Pipeline Execution Model

Giriş
Jet'i anlatan bir makale burada
Biz Pipeline kodunu yazıyoruz. Elimizde şöyle bir kod olsun
Pipeline p = Pipeline.create();
p.readFrom(textSource())
 .flatMap(line -> traverseArray(line.toLowerCase().split("\\W+")))
 .filter(word -> !word.isEmpty())
 .groupingKey(wholeItem())
 .aggregate(AggregateOperations.counting())
 .writeTo(someSink());
Sonra bu Pipeline DAG'a çevriliyor. Pipeline DAG ise bazı optimizasyonlara uğradıktan sonra Core DAG'a çevriliyor.  DAG Sınıfı yazısına bakabilirsiniz

Şeklen şöyle
Optimizasyon için açıklama şöyle
We can see that Jet applied some simple graph transformations:
- joined consecutive stateless transforms into a single vertex
- implemented the group-and-aggregate transform as two vertices
Sonra Core DAG stage'leri arasında bağlantı hesaplanıyor. Şeklen şöyle
Yukarıdaki şekilde "Combine" safhasının "distributed partitioned" olduğu görülebilir. Yani verilen key'e bakılarak hangi üyede çalışan hangi stage'e gönderileceği hesaplanır. Buna execution plan olarak bakarsak  şeklen şöyle. Burda Jet Node 1 açısından gösteriliyor. "From Accumulate on Jet Node 2" ile veri giriyor. "To Combine on Jet Node 2" ile veri gönderiliyor.


Stage Çeşitleri
İki çeşit Stage var.
 1. BatchStage
2. StreamStage

Kodda BatchStage ve StreamStage arayüzleri görülebilir.
Örnek
Şöyle yaparız
// Listing 2: Hybrid Batch & Streaming Program
Pipeline p = Pipeline.create();
BatchStage<Person> persons = p.readFrom("persons");
BatchStage<Tuple2<Integer, Long>> countByAge = persons
  .groupingKey(Person::age)
  .aggregate(counting());

StreamStage<Order> orders = p.readFrom(kafka(...));
StreamStage<Entry<Order, Long>> ordersWithAgeCounts = orders
  .hashJoin(countByAge, joiningMapEntries(Order::ageOfBuyer))
  .writeTo(someSink());
Cooperative Execution Engine
Core DAG oluşturulduktan sonra bunun çalıştırılması var. Çalıştırmak için cooperative model kullanılıyor. Bu modelde her işlemci için bir thread açılır. Thread per core yani.

Cooperative olması için pipeline içindeki thread'ler hiç bir zaman bloke olmaması gerekir. Execution Plan'deki her bir Stage bir thread'e atanıyor. Şeklen şöyle. Yukarıdaki şekilde 10 tane Stage vardı. Aşağıdaki şekilde de 10 tane stage var. Her birisi bir thread'e atanmış vaziyette.
Thread için döngü şöyle.
while (true) {
  boolean madeProgress = false;
  for (Iterator<Tasklet> it = tasklets.iterator(); it.hasNext();) {
    ProgressState ps = it.next().call();
    if (ps.isDone) {
      it.remove();
    }
    madeProgress |= ps.madeProgress;
  }
  if (!madeProgress) {
    backOff();
  }
}
Tasklet Nedir?
Tasklet yazısına taşıdım

Processor Outbox'ı Çağırır
Outbox şöyle
interface Outbox {
  boolean offer(int ordinal, @Nonnull Object item);
  ...
}
Açıklaması şöyle
offer() is non-blocking, but will fail when the outbox is full, returning false. The processor will react to this by returning from its process() method, and then the tasklet returns from call(). The processor must preserve its state of computation so that it can resume where it left off the next time it's called.
Processor Input'u Lazy Okur
Traverser kullanır. Traverser şöyle
interface Traverser<T> {
  T next();
  ...
}
Açıklaması şöyle
In many cases the processor satisfies the non-blocking contract by creating a lazy sequence from the input and attaching transformation steps to it (akin to Kotlin sequences). Jet defines the Traverser<T> type for this purpose, an iterator-like object with just a single abstract method:
Copy

This lightweight contract allows us to implement Traverser with just a lambda expression. If you look at the source code of Jet processor, you may encounter quite complex code inside Traverser transforms. A good example is the SlidingWindowP processor.
BackPressure
Açıklaması şöyle
Local communication between tasklets inside the same Jet node is easy: we just use bounded queues and force the tasklets to back off as soon as all their output queues are full.

Backpressure is trickier over a network link: instead of a shared memory location you can use for reliable instant signaling, all we have are messages sent over unreliable links that have significant latency. Hazelcast Jet uses a design very similar to the TCP/IP adaptive receive window: the sender must wait for an acknowledgment from the receiver telling it how many more data items it can send. After processing item N, the receiver sends a message that the sender can send up to item N + RWIN.

The receiver sends the acknowledgment message ten times per second, so as long as the receive window is large enough to hold the amount of data processed within 100 milliseconds plus network link latency, the receiver will always have data ready to be processed:



Hiç yorum yok:

Yorum Gönder

THIRD-PARTY.txt Dosyası

Kullanılan harici kütüphanelerin sürümleri bu dosyada Dosyanın yolu şöyle hazelcast/licenses/THIRD-PARTY.txt