31 Ocak 2023 Salı

Hazelcast Jet ClusterProcessorSupplier Sınıfı - Processor Factory

Giriş
Şu satırı dahil ederiz 
import com.hazelcast.jet.impl.connector.StreamEventJournalP;
Cluster Member Sayısı kadar ClusterProcessorSupplier nesnesi yaratılır.  Bu sınıf ta kendine verilen  partition listesi * localParallelism kadar Processor yaratılır. Kod şöyle.
private static class ClusterProcessorSupplier<E, T> implements ProcessorSupplier {

  static final long serialVersionUID = 1L;

  @Nonnull
  private final List<Integer> ownedPartitions;
  @Nullable
  private final String clientXml;
  @Nonnull
  private final FunctionEx<? super HazelcastInstance, ? extends EventJournalReader<E>>
                eventJournalReaderSupplier;
  @Nonnull
  private final PredicateEx<? super E> predicate;
  @Nonnull
  private final FunctionEx<? super E, ? extends T> projection;
  @Nonnull
  private final JournalInitialPosition initialPos;
  @Nonnull
  private final EventTimePolicy<? super T> eventTimePolicy;

  private transient HazelcastInstance client;
  private transient EventJournalReader<E> eventJournalReader;
  ...
}
constructor
İmzası şöyle. Bir member adresteki belirtilen partition listesi için çalışır.
ClusterProcessorSupplier(
  @Nonnull List<Integer> ownedPartitions,
  @Nonnull ClusterMetaSupplierParams clusterMetaSupplierParams
) {
  ...
}
ExecutionPlanBuilder tarafından çağrılır. Şöyle
<init>:465, StreamEventJournalP$ClusterProcessorSupplier (com.hazelcast.jet.impl.connector)
lambda$get$2:418, StreamEventJournalP$ClusterMetaSupplier (com.hazelcast.jet.impl.connector)
apply:-1, StreamEventJournalP$ClusterMetaSupplier$$Lambda$1974/0x00000008011c3a98 (com.hazelcast.jet.impl.connector)
lambda$null$4:128, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
call:-1, ExecutionPlanBuilder$$Lambda$1773/0x000000080114a000 (com.hazelcast.jet.impl.execution.init)
doWithClassLoader:547, Util (com.hazelcast.jet.impl.util)
lambda$createExecutionPlans$b9e545c5$1:128, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
runEx:-1, ExecutionPlanBuilder$$Lambda$1753/0x0000000801141d98 (com.hazelcast.jet.impl.execution.init)
run:31, RunnableEx (com.hazelcast.jet.function)
run$$$capture:1804, CompletableFuture$AsyncRun (java.util.concurrent)
run:-1, CompletableFuture$AsyncRun (java.util.concurrent)
 - Async stack trace
<init>:1790, CompletableFuture$AsyncRun (java.util.concurrent)
asyncRunStage:1818, CompletableFuture (java.util.concurrent)
runAsync:2033, CompletableFuture (java.util.concurrent)
createExecutionPlans:141, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
createExecutionPlans:255, MasterJobContext (com.hazelcast.jet.impl)
lambda$tryStartJob$4:237, MasterJobContext (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
submitToCoordinatorThread:1315, JobCoordinationService (com.hazelcast.jet.impl)
submitToCoordinatorThread:1305, JobCoordinationService (com.hazelcast.jet.impl)
tryStartJob:210, MasterJobContext (com.hazelcast.jet.impl)
tryStartJob:1198, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitJob$2:308, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$55:1327, JobCoordinationService (com.hazelcast.jet.impl)
get metodu - int count
İmzası şöyle. localParallelism kadar processor yaratır
public List<Processor> get(int count)
ExecutionPlan tarafından çağrılır. Şöyle
get:494, StreamEventJournalP$ClusterProcessorSupplier (com.hazelcast.jet.impl.connector)
get:432, StreamEventJournalP$ClusterProcessorSupplier (com.hazelcast.jet.impl.connector)
createProcessors:503, ExecutionPlan (com.hazelcast.jet.impl.execution.init)
lambda$null$1:215, ExecutionPlan (com.hazelcast.jet.impl.execution.init)
call:-1, ExecutionPlan$$Lambda$2012/0x00000008011edb90 (com.hazelcast.jet.impl.execution.init)
doWithClassLoader:547, Util (com.hazelcast.jet.impl.util)
lambda$initialize$3:213, ExecutionPlan (com.hazelcast.jet.impl.execution.init)
accept:-1, ExecutionPlan$$Lambda$1989/0x00000008011dc000 (com.hazelcast.jet.impl.execution.init)
tryFire$$$capture:718, CompletableFuture$UniAccept (java.util.concurrent)
tryFire:-1, CompletableFuture$UniAccept (java.util.concurrent)
 - Async stack trace
<init>:697, CompletableFuture$UniAccept (java.util.concurrent)
uniAcceptStage:737, CompletableFuture (java.util.concurrent)
thenAccept:2182, CompletableFuture (java.util.concurrent)
initialize:191, ExecutionPlan (com.hazelcast.jet.impl.execution.init)
initialize:170, ExecutionContext (com.hazelcast.jet.impl.execution)
lambda$initExecution$9:344, JobExecutionService (com.hazelcast.jet.impl)
doWithClassLoader:547, Util (com.hazelcast.jet.impl.util)
initExecution:343, JobExecutionService (com.hazelcast.jet.impl)
doRun:98, InitExecutionOperation (com.hazelcast.jet.impl.operation)
run:55, AsyncOperation (com.hazelcast.jet.impl.operation)
call:190, Operation (com.hazelcast.spi.impl.operationservice)
call:283, OperationRunnerImpl (com.hazelcast.spi.impl.operationservice.impl)
run:258, OperationRunnerImpl (com.hazelcast.spi.impl.operationservice.impl)
run:219, OperationRunnerImpl (com.hazelcast.spi.impl.operationservice.impl)
run:411, OperationExecutorImpl (com.hazelcast.spi.impl.operationexecutor.impl)
runOrExecute:438, OperationExecutorImpl (com.hazelcast.spi.impl.operationexecutor.impl)
doInvokeLocal:601, Invocation (com.hazelcast.spi.impl.operationservice.impl)
doInvoke:580, Invocation (com.hazelcast.spi.impl.operationservice.impl)
invoke0:541, Invocation (com.hazelcast.spi.impl.operationservice.impl)
invoke:241, Invocation (com.hazelcast.spi.impl.operationservice.impl)
invoke:61, InvocationBuilderImpl (com.hazelcast.spi.impl.operationservice.impl)
invokeOnParticipant:294, MasterContext (com.hazelcast.jet.impl)
invokeOnParticipants:277, MasterContext (com.hazelcast.jet.impl)
initExecution:268, MasterJobContext (com.hazelcast.jet.impl)
lambda$null$1:239, MasterJobContext (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$55:1327, JobCoordinationService (com.hazelcast.jet.impl)
init metodu
Kod şöyle. Yeni bir HazelcastInstance yaratır ve ondan EventJournalReader nesnesi elde eder
@Override
public void init(@Nonnull Context context) {
  HazelcastInstance instance = context.hazelcastInstance();
  if (clientXml != null) {
    client = newHazelcastClient(asClientConfig(clientXml));
    instance = client;
  }
  eventJournalReader = eventJournalReaderSupplier.apply(instance);
}
Buradaki Context parametresi şöyle
ProcessorMetaSupplier.Context -> MetaSupplierCtx
  Processor.Context -> ProcSupplierCtx hem de MetaSupplierCtx
    		       ProcCtx
ExecutionPlan koşmaya başlayınca çağrılır. Şöyle
init:476, StreamEventJournalP$ClusterProcessorSupplier (com.hazelcast.jet.impl.connector)
lambda$null$9c5eb802$1:456, ExecutionPlan (com.hazelcast.jet.impl.execution.init)
runEx:-1, ExecutionPlan$$Lambda$1987/0x00000008011d61f8 (com.hazelcast.jet.impl.execution.init)
run:31, RunnableEx (com.hazelcast.jet.function)
doWithClassLoader:532, Util (com.hazelcast.jet.impl.util)
lambda$initProcSuppliers$10beb6ff$1:455, ExecutionPlan (com.hazelcast.jet.impl.execution.init)
runEx:-1, ExecutionPlan$$Lambda$1986/0x00000008011d36c8 (com.hazelcast.jet.impl.execution.init)
run:31, RunnableEx (com.hazelcast.jet.function)
run$$$capture:1804, CompletableFuture$AsyncRun (java.util.concurrent)
run:-1, CompletableFuture$AsyncRun (java.util.concurrent)
 - Async stack trace
<init>:1790, CompletableFuture$AsyncRun (java.util.concurrent)
asyncRunStage:1818, CompletableFuture (java.util.concurrent)
runAsync:2033, CompletableFuture (java.util.concurrent)
initProcSuppliers:476, ExecutionPlan (com.hazelcast.jet.impl.execution.init)
initialize:189, ExecutionPlan (com.hazelcast.jet.impl.execution.init)
initialize:170, ExecutionContext (com.hazelcast.jet.impl.execution)
lambda$initExecution$9:344, JobExecutionService (com.hazelcast.jet.impl)
doWithClassLoader:547, Util (com.hazelcast.jet.impl.util)
initExecution:343, JobExecutionService (com.hazelcast.jet.impl)
doRun:98, InitExecutionOperation (com.hazelcast.jet.impl.operation)
run:55, AsyncOperation (com.hazelcast.jet.impl.operation)
call:190, Operation (com.hazelcast.spi.impl.operationservice)
call:283, OperationRunnerImpl (com.hazelcast.spi.impl.operationservice.impl)
run:258, OperationRunnerImpl (com.hazelcast.spi.impl.operationservice.impl)
run:219, OperationRunnerImpl (com.hazelcast.spi.impl.operationservice.impl)
run:411, OperationExecutorImpl (com.hazelcast.spi.impl.operationexecutor.impl)
runOrExecute:438, OperationExecutorImpl (com.hazelcast.spi.impl.operationexecutor.impl)
doInvokeLocal:601, Invocation (com.hazelcast.spi.impl.operationservice.impl)
doInvoke:580, Invocation (com.hazelcast.spi.impl.operationservice.impl)
invoke0:541, Invocation (com.hazelcast.spi.impl.operationservice.impl)
invoke:241, Invocation (com.hazelcast.spi.impl.operationservice.impl)
invoke:61, InvocationBuilderImpl (com.hazelcast.spi.impl.operationservice.impl)
invokeOnParticipant:294, MasterContext (com.hazelcast.jet.impl)
invokeOnParticipants:277, MasterContext (com.hazelcast.jet.impl)
initExecution:268, MasterJobContext (com.hazelcast.jet.impl)
lambda$null$1:239, MasterJobContext (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$55:1327, JobCoordinationService (com.hazelcast.jet.impl)

Hiç yorum yok:

Yorum Gönder

THIRD-PARTY.txt Dosyası

Kullanılan harici kütüphanelerin sürümleri bu dosyada Dosyanın yolu şöyle hazelcast/licenses/THIRD-PARTY.txt