Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.impl.execution.TaskletExecutionService;
Tasklet'leri çalıştıran şey TaskletExecutionService. Job'ı temsil eden şey ise LightMasterContext
Kod şöyle
public class TaskletExecutionService { public static final String TASKLET_INIT_CLOSE_EXECUTOR_NAME = "jet:tasklet_initClose"; private final ExecutorService blockingTaskletExecutor = newCachedThreadPool(new BlockingTaskThreadFactory()); private final ExecutionService hzExecutionService; private final CooperativeWorker[] cooperativeWorkers; private final Thread[] cooperativeThreadPool; private final String hzInstanceName; private final ILogger logger; private int cooperativeThreadIndex; Probe(name = "blockingWorkerCount") private final Counter blockingWorkerCount = MwCounter.newMwCounter(); private volatile boolean isShutdown; private final Object lock = new Object(); private final IdleStrategy idlerCooperative; private final IdleStrategy idlerNonCooperative; ... }
Tasklet Nedir?
Stage'i kodda temsil eden şey. Tasklet'in bir girdi Inbox'ı bir de çıktı Outbox'ı vardır. Açıklaması şöyle
Hazelcast creates multiple parallel tasklets for each stage. It transfers the data between the tasklets of consecutive stages using two main routing strategies:round-robin: a load-balancing edge that sends items to tasklets in a round-robin fashion. If a given queue is full, it tries the next one.isolated: isolates the parallel code paths from each other, thereby preserving the order of events in each path. When the two connected vertices have the same parallelism, it establishes one-to-one connections between tasklets.partitioned: computes the partition key of every item, which uniquely determines the destination tasklet. Necessary for stateful keyed transformations like group-and-aggregate.Round-robin is the default strategy. This means that an event emitted by a tasklet can be routed to any tasklet of the following stage. This strategy results in good balancing of the load of every CPU core, but it introduces event reordering.
İskeleti şöyle
interface Tasklet { ProgressState call(); ... }
Açıklaması şöyle
The execution engine repeatedly invokes call(), which is supposed to return in no more than 1 millisecond. The ProgressState result is a pair of booleans: (madeProgress, isDone). The former is used for CPU load control (to prevent a hot idle loop) and the latter signals the completion of the task. As long as the task keeps reporting "not done", Jet will call it again.
Tasklet Processor'ı Çağırır
Processor şöyle
interface Processor { void process(int ordinal, Inbox inbox); ... }
Açıklaması şöyle
ordinal identifies the input edge and inbox contains a batch of input data. The tasklet keeps calling this method until it has consumed all the items from the inbox and then refills the inbox with more data (possibly from a different input edge).
CooperativeWorker Sınıfı
TaskletExecutionService içindeki bir sınıf
run metodu
Metod kabaca şöyle
private final class CooperativeWorker implements Runnable { ... @Override public void run() { while (true) { boolean madeProgress = false; long idleCount = 0; for (Tasklet t : tasklets) { // let one tasklet do a piece of work madeProgress |= t.call(); } if (madeProgress) { idleCount = 0; } else { idle(idleCount++); } } } }
Açıklaması şöyle
The worker repeatedly calls the tasklets’ call() method. The method returns whether the tasklet made any progress. In case of processors, it returns true when the processor took anything from the inbox or put anything to the outbox (and few more special cases).The idle() call is a brief sleep in case none of the workers made progress. In Jet 3.0 it’s between 25 and 1000µs, depending on the idleCount (*2). As you probably can see now, if just one of the tasklets made progress, there won’t be any sleeping at all and we’ll proceed to the next iteration immediately. What this can cause is that even with pretty low traffic, the CPU usage will be quite high, especially if there are many tasklets per worker.
Hiç yorum yok:
Yorum Gönder