Introduction
We include the following line:
import com.hazelcast.jet.impl.connector.StreamEventJournalP;
The ClusterMetaSupplier class is nested inside StreamEventJournalP. The inheritance hierarchy is as follows:
ProcessorMetaSupplier
  ClusterMetaSupplier
  ElasticSourcePMetaSupplier
  FileProcessorMetaSupplier
  KinesisSourcePMetaSupplier
  ...
One ClusterMetaSupplier object is created for each submitted job. The code is as follows:
private static class ClusterMetaSupplier<E, T> implements ProcessorMetaSupplier {
    static final long serialVersionUID = 1L;

    private final String clientXml;
    private final FunctionEx<? super HazelcastInstance, ? extends EventJournalReader<E>>
            eventJournalReaderSupplier;
    private final PredicateEx<? super E> predicate;
    private final FunctionEx<? super E, ? extends T> projection;
    private final JournalInitialPosition initialPos;
    private final EventTimePolicy<? super T> eventTimePolicy;
    private final SupplierEx<Permission> permissionFn;

    private transient int remotePartitionCount;
    private transient Map<Address, List<Integer>> addrToPartitions;
    ...
}
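The class declares a serialVersionUID and marks two of its fields as transient. Java serialization skips transient fields, so after a round trip they come back with default values (0 or null) and have to be rebuilt. The sketch below shows this with plain JDK serialization. TransientFieldDemo and roundTrip are hypothetical names used only for illustration, this is not Hazelcast code, and the idea that Hazelcast rebuilds these fields in init is an assumption based on the field list above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a serializable meta supplier; not Hazelcast code.
class TransientFieldDemo implements Serializable {
    private static final long serialVersionUID = 1L;

    final String clientXml;              // configuration: written to the stream
    transient int remotePartitionCount;  // derived state: skipped by serialization

    TransientFieldDemo(String clientXml) {
        this.clientXml = clientXml;
    }

    // Serializes the object into a byte array and reads it back.
    static TransientFieldDemo roundTrip(TransientFieldDemo original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (TransientFieldDemo) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        TransientFieldDemo original = new TransientFieldDemo("<hazelcast-client/>");
        original.remotePartitionCount = 271;

        TransientFieldDemo copy = roundTrip(original);
        System.out.println(copy.clientXml);            // the non-transient field survives
        System.out.println(copy.remotePartitionCount); // the transient field is reset to 0
    }
}
```

This is why only configuration-like values are kept as regular fields, while state that depends on the running cluster is left transient and computed later.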
constructor
It is called while the DAG is being built. The stack trace is as follows:
<init>:368, StreamEventJournalP$ClusterMetaSupplier (com.hazelcast.jet.impl.connector)
streamRemoteMapSupplier:537, StreamEventJournalP (com.hazelcast.jet.impl.connector)
lambda$remoteMapJournal$98b231e7$1:658, Sources (com.hazelcast.jet.pipeline)
applyEx:-1, Sources$$Lambda$1647/0x00000008010ef238 (com.hazelcast.jet.pipeline)
apply:49, FunctionEx (com.hazelcast.function)
addToDag:81, StreamSourceTransform (com.hazelcast.jet.impl.pipeline.transform)
createDag:138, Planner (com.hazelcast.jet.impl.pipeline)
toDag:122, PipelineImpl (com.hazelcast.jet.impl.pipeline)
lambda$submitJob$2:254, JobCoordinationService (com.hazelcast.jet.impl)
run:-1, JobCoordinationService$$Lambda$1628/0x00000008010cc580 (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
call:-1, JobCoordinationService$$Lambda$1629/0x00000008010cc7a8 (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$55:1327, JobCoordinationService (com.hazelcast.jet.impl)
call:-1, JobCoordinationService$$Lambda$1630/0x00000008010cc9c8 (com.hazelcast.jet.impl)
init method
It is called by ExecutionPlanBuilder. The stack trace is as follows:
init:384, StreamEventJournalP$ClusterMetaSupplier (com.hazelcast.jet.impl.connector)
lambda$null$ba46c9b5$1:117, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
runEx:-1, ExecutionPlanBuilder$$Lambda$1757/0x0000000801143ab8 (com.hazelcast.jet.impl.execution.init)
run:31, RunnableEx (com.hazelcast.jet.function)
doWithClassLoader:532, Util (com.hazelcast.jet.impl.util)
lambda$createExecutionPlans$b9e545c5$1:116, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
runEx:-1, ExecutionPlanBuilder$$Lambda$1753/0x0000000801141d98 (com.hazelcast.jet.impl.execution.init)
run:31, RunnableEx (com.hazelcast.jet.function)
run$$$capture:1804, CompletableFuture$AsyncRun (java.util.concurrent)
run:-1, CompletableFuture$AsyncRun (java.util.concurrent)
 - Async stack trace
<init>:1790, CompletableFuture$AsyncRun (java.util.concurrent)
asyncRunStage:1818, CompletableFuture (java.util.concurrent)
runAsync:2033, CompletableFuture (java.util.concurrent)
createExecutionPlans:141, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
createExecutionPlans:255, MasterJobContext (com.hazelcast.jet.impl)
lambda$tryStartJob$4:237, MasterJobContext (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
submitToCoordinatorThread:1315, JobCoordinationService (com.hazelcast.jet.impl)
submitToCoordinatorThread:1305, JobCoordinationService (com.hazelcast.jet.impl)
tryStartJob:210, MasterJobContext (com.hazelcast.jet.impl)
tryStartJob:1198, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitJob$2:308, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$55:1327, JobCoordinationService (com.hazelcast.jet.impl)
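The class keeps a transient Map<Address, List<Integer>> named addrToPartitions, and a plausible job for init is to fill it by grouping partition ids under the member that owns them. The sketch below shows that grouping step in plain Java. It is an assumption about what init does rather than the actual Hazelcast implementation. PartitionGroupingSketch and groupByOwner are hypothetical names, and member addresses are modeled as plain strings instead of Address objects.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of an init-style grouping step; not Hazelcast code.
class PartitionGroupingSketch {

    // owners[i] is the (hypothetical) address of the member that owns partition i.
    static Map<String, List<Integer>> groupByOwner(String[] owners) {
        Map<String, List<Integer>> addrToPartitions = new LinkedHashMap<>();
        for (int partitionId = 0; partitionId < owners.length; partitionId++) {
            addrToPartitions
                    .computeIfAbsent(owners[partitionId], address -> new ArrayList<>())
                    .add(partitionId);
        }
        return addrToPartitions;
    }

    public static void main(String[] args) {
        String[] owners = {"10.0.0.1:5701", "10.0.0.2:5701", "10.0.0.1:5701", "10.0.0.2:5701"};
        // prints {10.0.0.1:5701=[0, 2], 10.0.0.2:5701=[1, 3]}
        System.out.println(groupByOwner(owners));
    }
}
```

Because the result depends on the current cluster topology, it makes sense to compute it on the coordinator at job start rather than to carry it inside the serialized object.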
get method - List<Address>
Its signature is as follows:
public Function<Address, ProcessorSupplier> get(@Nonnull List<Address> addresses);
The addresses of all members in the cluster are passed in as a list. For a given address, the returned function produces a ClusterProcessorSupplier, that is, a processor factory object, which handles the partition ids belonging to that address. It is called by ExecutionPlanBuilder. The stack trace is as follows:
get:411, StreamEventJournalP$ClusterMetaSupplier (com.hazelcast.jet.impl.connector)
lambda$null$3:125, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
call:-1, ExecutionPlanBuilder$$Lambda$1765/0x000000080114e9d8 (com.hazelcast.jet.impl.execution.init)
doWithClassLoader:547, Util (com.hazelcast.jet.impl.util)
lambda$createExecutionPlans$b9e545c5$1:125, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
runEx:-1, ExecutionPlanBuilder$$Lambda$1753/0x0000000801141d98 (com.hazelcast.jet.impl.execution.init)
run:31, RunnableEx (com.hazelcast.jet.function)
run$$$capture:1804, CompletableFuture$AsyncRun (java.util.concurrent)
run:-1, CompletableFuture$AsyncRun (java.util.concurrent)
 - Async stack trace
<init>:1790, CompletableFuture$AsyncRun (java.util.concurrent)
asyncRunStage:1818, CompletableFuture (java.util.concurrent)
runAsync:2033, CompletableFuture (java.util.concurrent)
createExecutionPlans:141, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
createExecutionPlans:255, MasterJobContext (com.hazelcast.jet.impl)
lambda$tryStartJob$4:237, MasterJobContext (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
submitToCoordinatorThread:1315, JobCoordinationService (com.hazelcast.jet.impl)
submitToCoordinatorThread:1305, JobCoordinationService (com.hazelcast.jet.impl)
tryStartJob:210, MasterJobContext (com.hazelcast.jet.impl)
tryStartJob:1198, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitJob$2:308, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$55:1327, JobCoordinationService (com.hazelcast.jet.impl)
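The shape of get can be illustrated with a small stand-alone model: the meta supplier returns a function, and the caller applies it once per member address to obtain that member's processor factory. This is only a sketch under stated assumptions. MetaSupplierGetSketch and MemberSupplier are hypothetical names, addresses are plain strings, and the real ClusterProcessorSupplier takes more arguments than the partition list shown here.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of a meta supplier's get method; not Hazelcast code.
class MetaSupplierGetSketch {

    // Hypothetical stand-in for the per-member processor factory.
    static final class MemberSupplier {
        final List<Integer> ownedPartitions;

        MemberSupplier(List<Integer> ownedPartitions) {
            this.ownedPartitions = ownedPartitions;
        }
    }

    private final Map<String, List<Integer>> addrToPartitions;

    MetaSupplierGetSketch(Map<String, List<Integer>> addrToPartitions) {
        this.addrToPartitions = addrToPartitions;
    }

    // The address list is accepted to mirror the signature; this sketch only
    // needs the precomputed map. Members that own no partitions get an empty list.
    Function<String, MemberSupplier> get(List<String> addresses) {
        return address -> new MemberSupplier(
                addrToPartitions.getOrDefault(address, Collections.emptyList()));
    }

    public static void main(String[] args) {
        MetaSupplierGetSketch metaSupplier = new MetaSupplierGetSketch(
                Map.of("10.0.0.1:5701", List.of(0, 2), "10.0.0.2:5701", List.of(1, 3)));

        List<String> members = List.of("10.0.0.1:5701", "10.0.0.2:5701");
        Function<String, MemberSupplier> supplierFn = metaSupplier.get(members);
        for (String member : members) {
            System.out.println(member + " -> " + supplierFn.apply(member).ownedPartitions);
        }
    }
}
```

Returning a function instead of a ready-made map lets the caller decide when to create each member's factory.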