Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.impl.connector.StreamEventJournalP;
Cluster member sayısı kadar ClusterProcessorSupplier nesnesi yaratılır. Bu sınıf da kendisine verilen partition listesi için localParallelism kadar Processor yaratır. Kod şöyle.
private static class ClusterProcessorSupplier<E, T> implements ProcessorSupplier { static final long serialVersionUID = 1L; @Nonnull private final List<Integer> ownedPartitions; @Nullable private final String clientXml; @Nonnull private final FunctionEx<? super HazelcastInstance, ? extends EventJournalReader<E>> eventJournalReaderSupplier; @Nonnull private final PredicateEx<? super E> predicate; @Nonnull private final FunctionEx<? super E, ? extends T> projection; @Nonnull private final JournalInitialPosition initialPos; @Nonnull private final EventTimePolicy<? super T> eventTimePolicy; private transient HazelcastInstance client; private transient EventJournalReader<E> eventJournalReader; ... }
constructor
İmzası şöyle. Bir member adresindeki, belirtilen partition listesi için çalışır.
ClusterProcessorSupplier( @Nonnull List<Integer> ownedPartitions, @Nonnull ClusterMetaSupplierParams clusterMetaSupplierParams ) { ... }
ExecutionPlanBuilder tarafından çağrılır. Şöyle
<init>:465, StreamEventJournalP$ClusterProcessorSupplier (com.hazelcast.jet.impl.connector) lambda$get$2:418, StreamEventJournalP$ClusterMetaSupplier (com.hazelcast.jet.impl.connector) apply:-1, StreamEventJournalP$ClusterMetaSupplier$$Lambda$1974/0x00000008011c3a98 (com.hazelcast.jet.impl.connector) lambda$null$4:128, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init) call:-1, ExecutionPlanBuilder$$Lambda$1773/0x000000080114a000 (com.hazelcast.jet.impl.execution.init) doWithClassLoader:547, Util (com.hazelcast.jet.impl.util) lambda$createExecutionPlans$b9e545c5$1:128, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init) runEx:-1, ExecutionPlanBuilder$$Lambda$1753/0x0000000801141d98 (com.hazelcast.jet.impl.execution.init) run:31, RunnableEx (com.hazelcast.jet.function) run$$$capture:1804, CompletableFuture$AsyncRun (java.util.concurrent) run:-1, CompletableFuture$AsyncRun (java.util.concurrent) - Async stack trace <init>:1790, CompletableFuture$AsyncRun (java.util.concurrent) asyncRunStage:1818, CompletableFuture (java.util.concurrent) runAsync:2033, CompletableFuture (java.util.concurrent) createExecutionPlans:141, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init) createExecutionPlans:255, MasterJobContext (com.hazelcast.jet.impl) lambda$tryStartJob$4:237, MasterJobContext (com.hazelcast.jet.impl) lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl) submitToCoordinatorThread:1315, JobCoordinationService (com.hazelcast.jet.impl) submitToCoordinatorThread:1305, JobCoordinationService (com.hazelcast.jet.impl) tryStartJob:210, MasterJobContext (com.hazelcast.jet.impl) tryStartJob:1198, JobCoordinationService (com.hazelcast.jet.impl) lambda$submitJob$2:308, JobCoordinationService (com.hazelcast.jet.impl) lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl) lambda$submitToCoordinatorThread$55:1327, JobCoordinationService (com.hazelcast.jet.impl)
get metodu - int count
İmzası şöyle. count parametresi localParallelism değeridir; bu sayı kadar processor yaratır.
public List<Processor> get(int count)
get:494, StreamEventJournalP$ClusterProcessorSupplier (com.hazelcast.jet.impl.connector) get:432, StreamEventJournalP$ClusterProcessorSupplier (com.hazelcast.jet.impl.connector) createProcessors:503, ExecutionPlan (com.hazelcast.jet.impl.execution.init) lambda$null$1:215, ExecutionPlan (com.hazelcast.jet.impl.execution.init) call:-1, ExecutionPlan$$Lambda$2012/0x00000008011edb90 (com.hazelcast.jet.impl.execution.init) doWithClassLoader:547, Util (com.hazelcast.jet.impl.util) lambda$initialize$3:213, ExecutionPlan (com.hazelcast.jet.impl.execution.init) accept:-1, ExecutionPlan$$Lambda$1989/0x00000008011dc000 (com.hazelcast.jet.impl.execution.init) tryFire$$$capture:718, CompletableFuture$UniAccept (java.util.concurrent) tryFire:-1, CompletableFuture$UniAccept (java.util.concurrent) - Async stack trace <init>:697, CompletableFuture$UniAccept (java.util.concurrent) uniAcceptStage:737, CompletableFuture (java.util.concurrent) thenAccept:2182, CompletableFuture (java.util.concurrent) initialize:191, ExecutionPlan (com.hazelcast.jet.impl.execution.init) initialize:170, ExecutionContext (com.hazelcast.jet.impl.execution) lambda$initExecution$9:344, JobExecutionService (com.hazelcast.jet.impl) doWithClassLoader:547, Util (com.hazelcast.jet.impl.util) initExecution:343, JobExecutionService (com.hazelcast.jet.impl) doRun:98, InitExecutionOperation (com.hazelcast.jet.impl.operation) run:55, AsyncOperation (com.hazelcast.jet.impl.operation) call:190, Operation (com.hazelcast.spi.impl.operationservice) call:283, OperationRunnerImpl (com.hazelcast.spi.impl.operationservice.impl) run:258, OperationRunnerImpl (com.hazelcast.spi.impl.operationservice.impl) run:219, OperationRunnerImpl (com.hazelcast.spi.impl.operationservice.impl) run:411, OperationExecutorImpl (com.hazelcast.spi.impl.operationexecutor.impl) runOrExecute:438, OperationExecutorImpl (com.hazelcast.spi.impl.operationexecutor.impl) doInvokeLocal:601, Invocation (com.hazelcast.spi.impl.operationservice.impl) 
doInvoke:580, Invocation (com.hazelcast.spi.impl.operationservice.impl) invoke0:541, Invocation (com.hazelcast.spi.impl.operationservice.impl) invoke:241, Invocation (com.hazelcast.spi.impl.operationservice.impl) invoke:61, InvocationBuilderImpl (com.hazelcast.spi.impl.operationservice.impl) invokeOnParticipant:294, MasterContext (com.hazelcast.jet.impl) invokeOnParticipants:277, MasterContext (com.hazelcast.jet.impl) initExecution:268, MasterJobContext (com.hazelcast.jet.impl) lambda$null$1:239, MasterJobContext (com.hazelcast.jet.impl) lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl) lambda$submitToCoordinatorThread$55:1327, JobCoordinationService (com.hazelcast.jet.impl)
init metodu
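Kod şöyle. clientXml verilmişse yeni bir HazelcastInstance (client) yaratır, verilmemişse Context içindeki HazelcastInstance nesnesini kullanır; sonra bu nesneden EventJournalReader elde eder.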
@Override public void init(@Nonnull Context context) { HazelcastInstance instance = context.hazelcastInstance(); if (clientXml != null) { client = newHazelcastClient(asClientConfig(clientXml)); instance = client; } eventJournalReader = eventJournalReaderSupplier.apply(instance); }
Buradaki Context parametresi şöyle
ProcessorMetaSupplier.Context -> MetaSupplierCtx Processor.Context -> ProcSupplierCtx hem de MetaSupplierCtx ProcCtx
ExecutionPlan ilklendirilirken (initialize → initProcSuppliers) çağrılır. Şöyle
init:476, StreamEventJournalP$ClusterProcessorSupplier (com.hazelcast.jet.impl.connector) lambda$null$9c5eb802$1:456, ExecutionPlan (com.hazelcast.jet.impl.execution.init) runEx:-1, ExecutionPlan$$Lambda$1987/0x00000008011d61f8 (com.hazelcast.jet.impl.execution.init) run:31, RunnableEx (com.hazelcast.jet.function) doWithClassLoader:532, Util (com.hazelcast.jet.impl.util) lambda$initProcSuppliers$10beb6ff$1:455, ExecutionPlan (com.hazelcast.jet.impl.execution.init) runEx:-1, ExecutionPlan$$Lambda$1986/0x00000008011d36c8 (com.hazelcast.jet.impl.execution.init) run:31, RunnableEx (com.hazelcast.jet.function) run$$$capture:1804, CompletableFuture$AsyncRun (java.util.concurrent) run:-1, CompletableFuture$AsyncRun (java.util.concurrent) - Async stack trace <init>:1790, CompletableFuture$AsyncRun (java.util.concurrent) asyncRunStage:1818, CompletableFuture (java.util.concurrent) runAsync:2033, CompletableFuture (java.util.concurrent) initProcSuppliers:476, ExecutionPlan (com.hazelcast.jet.impl.execution.init) initialize:189, ExecutionPlan (com.hazelcast.jet.impl.execution.init) initialize:170, ExecutionContext (com.hazelcast.jet.impl.execution) lambda$initExecution$9:344, JobExecutionService (com.hazelcast.jet.impl) doWithClassLoader:547, Util (com.hazelcast.jet.impl.util) initExecution:343, JobExecutionService (com.hazelcast.jet.impl) doRun:98, InitExecutionOperation (com.hazelcast.jet.impl.operation) run:55, AsyncOperation (com.hazelcast.jet.impl.operation) call:190, Operation (com.hazelcast.spi.impl.operationservice) call:283, OperationRunnerImpl (com.hazelcast.spi.impl.operationservice.impl) run:258, OperationRunnerImpl (com.hazelcast.spi.impl.operationservice.impl) run:219, OperationRunnerImpl (com.hazelcast.spi.impl.operationservice.impl) run:411, OperationExecutorImpl (com.hazelcast.spi.impl.operationexecutor.impl) runOrExecute:438, OperationExecutorImpl (com.hazelcast.spi.impl.operationexecutor.impl) doInvokeLocal:601, Invocation 
(com.hazelcast.spi.impl.operationservice.impl) doInvoke:580, Invocation (com.hazelcast.spi.impl.operationservice.impl) invoke0:541, Invocation (com.hazelcast.spi.impl.operationservice.impl) invoke:241, Invocation (com.hazelcast.spi.impl.operationservice.impl) invoke:61, InvocationBuilderImpl (com.hazelcast.spi.impl.operationservice.impl) invokeOnParticipant:294, MasterContext (com.hazelcast.jet.impl) invokeOnParticipants:277, MasterContext (com.hazelcast.jet.impl) initExecution:268, MasterJobContext (com.hazelcast.jet.impl) lambda$null$1:239, MasterJobContext (com.hazelcast.jet.impl) lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl) lambda$submitToCoordinatorThread$55:1327, JobCoordinationService (com.hazelcast.jet.impl)
Hiç yorum yok:
Yorum Gönder