Monday, January 23, 2023

Hazelcast Jet ClusterMetaSupplier Class - Computes the Partitions Assigned to Each Member

Introduction
We include the following line
import com.hazelcast.jet.impl.connector.StreamEventJournalP;
This class is nested inside StreamEventJournalP as a private static class. The inheritance hierarchy is as follows
ProcessorMetaSupplier
  ClusterMetaSupplier
  ElasticSourcePMetaSupplier
  FileProcessorMetaSupplier
  KinesisSourcePMetaSupplier
  ...
One ClusterMetaSupplier object is created for each submitted job. The code is as follows.
private static class ClusterMetaSupplier<E, T> implements ProcessorMetaSupplier {

  static final long serialVersionUID = 1L;

  private final String clientXml;
  private final FunctionEx<? super HazelcastInstance, ? extends EventJournalReader<E>>
                eventJournalReaderSupplier;
  private final PredicateEx<? super E> predicate;
  private final FunctionEx<? super E, ? extends T> projection;
  private final JournalInitialPosition initialPos;
  private final EventTimePolicy<? super T> eventTimePolicy;
  private final SupplierEx<Permission> permissionFn;

  private transient int remotePartitionCount;
  private transient Map<Address, List<Integer>> addrToPartitions;
  ...
}
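The two transient fields above are worth a note: Java serialization skips transient fields, so after the object is deserialized they hold default values and have to be computed again. The following is a minimal sketch of that behavior in plain Java. DemoMetaSupplier, its init(int) method, and the value 271 are hypothetical stand-ins chosen for illustration; this is not the Hazelcast class and it does not claim to show how Hazelcast fills these fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldSketch {

    // Hypothetical stand-in for a meta supplier; NOT the Hazelcast class.
    static class DemoMetaSupplier implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String clientXml;             // written during serialization
        private transient int remotePartitionCount; // skipped during serialization

        DemoMetaSupplier(String clientXml) {
            this.clientXml = clientXml;
        }

        // Hypothetical init step that (re)computes the derived state.
        void init(int partitionCount) {
            this.remotePartitionCount = partitionCount;
        }

        String clientXml() {
            return clientXml;
        }

        int remotePartitionCount() {
            return remotePartitionCount;
        }
    }

    // Serializes the object to a byte array and reads it back.
    static DemoMetaSupplier roundTrip(DemoMetaSupplier original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DemoMetaSupplier) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DemoMetaSupplier original = new DemoMetaSupplier("<hazelcast-client/>");
        original.init(271); // example value only

        DemoMetaSupplier copy = roundTrip(original);
        System.out.println(copy.clientXml());            // the non-transient field survives
        System.out.println(copy.remotePartitionCount()); // 0, the transient field was reset
    }
}
```

In this sketch the copy keeps clientXml but loses remotePartitionCount, which suggests why a separate init step is needed on the side that receives the object.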
constructor
It is called while the pipeline is being converted to a DAG. The stack trace is as follows
<init>:368, StreamEventJournalP$ClusterMetaSupplier (com.hazelcast.jet.impl.connector)
streamRemoteMapSupplier:537, StreamEventJournalP (com.hazelcast.jet.impl.connector)
lambda$remoteMapJournal$98b231e7$1:658, Sources (com.hazelcast.jet.pipeline)
applyEx:-1, Sources$$Lambda$1647/0x00000008010ef238 (com.hazelcast.jet.pipeline)
apply:49, FunctionEx (com.hazelcast.function)
addToDag:81, StreamSourceTransform (com.hazelcast.jet.impl.pipeline.transform)
createDag:138, Planner (com.hazelcast.jet.impl.pipeline)
toDag:122, PipelineImpl (com.hazelcast.jet.impl.pipeline)
lambda$submitJob$2:254, JobCoordinationService (com.hazelcast.jet.impl)
run:-1, JobCoordinationService$$Lambda$1628/0x00000008010cc580 (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
call:-1, JobCoordinationService$$Lambda$1629/0x00000008010cc7a8 (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$55:1327, JobCoordinationService (com.hazelcast.jet.impl)
call:-1, JobCoordinationService$$Lambda$1630/0x00000008010cc9c8 (com.hazelcast.jet.impl)
init method
It is called by ExecutionPlanBuilder. The stack trace is as follows
init:384, StreamEventJournalP$ClusterMetaSupplier (com.hazelcast.jet.impl.connector)
lambda$null$ba46c9b5$1:117, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
runEx:-1, ExecutionPlanBuilder$$Lambda$1757/0x0000000801143ab8 (com.hazelcast.jet.impl.execution.init)
run:31, RunnableEx (com.hazelcast.jet.function)
doWithClassLoader:532, Util (com.hazelcast.jet.impl.util)
lambda$createExecutionPlans$b9e545c5$1:116, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
runEx:-1, ExecutionPlanBuilder$$Lambda$1753/0x0000000801141d98 (com.hazelcast.jet.impl.execution.init)
run:31, RunnableEx (com.hazelcast.jet.function)
run$$$capture:1804, CompletableFuture$AsyncRun (java.util.concurrent)
run:-1, CompletableFuture$AsyncRun (java.util.concurrent)
 - Async stack trace
<init>:1790, CompletableFuture$AsyncRun (java.util.concurrent)
asyncRunStage:1818, CompletableFuture (java.util.concurrent)
runAsync:2033, CompletableFuture (java.util.concurrent)
createExecutionPlans:141, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
createExecutionPlans:255, MasterJobContext (com.hazelcast.jet.impl)
lambda$tryStartJob$4:237, MasterJobContext (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
submitToCoordinatorThread:1315, JobCoordinationService (com.hazelcast.jet.impl)
submitToCoordinatorThread:1305, JobCoordinationService (com.hazelcast.jet.impl)
tryStartJob:210, MasterJobContext (com.hazelcast.jet.impl)
tryStartJob:1198, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitJob$2:308, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$55:1327, JobCoordinationService (com.hazelcast.jet.impl)
get method - List<Address>
Its signature is as follows
public Function<Address, ProcessorSupplier> get(@Nonnull List<Address> addresses);
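The addresses of all members in the cluster are passed in as a list. The method returns a function that, for a given address, yields a 
ClusterProcessorSupplier, that is, a processor factory object that handles the partition IDs belonging to that address. It is called by ExecutionPlanBuilder. The stack trace is as follows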
get:411, StreamEventJournalP$ClusterMetaSupplier (com.hazelcast.jet.impl.connector)
lambda$null$3:125, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
call:-1, ExecutionPlanBuilder$$Lambda$1765/0x000000080114e9d8 (com.hazelcast.jet.impl.execution.init)
doWithClassLoader:547, Util (com.hazelcast.jet.impl.util)
lambda$createExecutionPlans$b9e545c5$1:125, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
runEx:-1, ExecutionPlanBuilder$$Lambda$1753/0x0000000801141d98 (com.hazelcast.jet.impl.execution.init)
run:31, RunnableEx (com.hazelcast.jet.function)
run$$$capture:1804, CompletableFuture$AsyncRun (java.util.concurrent)
run:-1, CompletableFuture$AsyncRun (java.util.concurrent)
 - Async stack trace
<init>:1790, CompletableFuture$AsyncRun (java.util.concurrent)
asyncRunStage:1818, CompletableFuture (java.util.concurrent)
runAsync:2033, CompletableFuture (java.util.concurrent)
createExecutionPlans:141, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
createExecutionPlans:255, MasterJobContext (com.hazelcast.jet.impl)
lambda$tryStartJob$4:237, MasterJobContext (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
submitToCoordinatorThread:1315, JobCoordinationService (com.hazelcast.jet.impl)
submitToCoordinatorThread:1305, JobCoordinationService (com.hazelcast.jet.impl)
tryStartJob:210, MasterJobContext (com.hazelcast.jet.impl)
tryStartJob:1198, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitJob$2:308, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$55:1327, JobCoordinationService (com.hazelcast.jet.impl)
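To make the idea of an address-to-partitions map more concrete, here is a minimal sketch in plain Java that spreads partition IDs over a list of member addresses in round-robin order. It is only an illustration under stated assumptions: addresses are plain strings instead of Address objects, the round-robin rule is an assumed strategy, and the class and method names (PartitionAssignmentSketch, assignRoundRobin) are hypothetical. It is not the Hazelcast implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionAssignmentSketch {

    // Assigns partition ids 0..partitionCount-1 to the given members in round-robin order.
    // Hypothetical helper; addresses are plain strings for simplicity.
    static Map<String, List<Integer>> assignRoundRobin(List<String> addresses, int partitionCount) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("at least one member address is required");
        }
        // LinkedHashMap keeps the members in the order they were passed in.
        Map<String, List<Integer>> addrToPartitions = new LinkedHashMap<>();
        for (String address : addresses) {
            addrToPartitions.put(address, new ArrayList<>());
        }
        for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
            String owner = addresses.get(partitionId % addresses.size());
            addrToPartitions.get(owner).add(partitionId);
        }
        return addrToPartitions;
    }

    public static void main(String[] args) {
        List<String> members = List.of("10.0.0.1:5701", "10.0.0.2:5701", "10.0.0.3:5701");
        Map<String, List<Integer>> addrToPartitions = assignRoundRobin(members, 7);

        // Prints:
        // 10.0.0.1:5701 -> [0, 3, 6]
        // 10.0.0.2:5701 -> [1, 4]
        // 10.0.0.3:5701 -> [2, 5]
        addrToPartitions.forEach((address, partitions) ->
                System.out.println(address + " -> " + partitions));
    }
}
```

With a map like this, a lookup by member address gives the partition IDs that the processors created for that member would be responsible for.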

