Introduction
We import the following class:
import com.hazelcast.jet.impl.JobExecutionService;
The code is shown below. The class stores ExecutionContext objects, keyed by executionId.
/**
 * Service to handle ExecutionContexts on all cluster members. Job-control
 * operations from coordinator are handled here.
 */
public class JobExecutionService implements DynamicMetricsProvider {

    /**
     * A timeout after which we cancel a light job that doesn't receive InitOp
     * from the coordinator. {@link ExecutionContext} can be created in
     * response to data packet received for that execution, but it doesn't know
     * the coordinator. Therefore, the checker cannot confirm with the
     * coordinator if it still exists. We terminate these jobs after a timeout.
     * However, the timeout has to be long enough because if the job happens to
     * be initialized later, we'll lose data, and we won't even detect it. It can
     * also happen that we lose a DONE_ITEM and the job will get stuck, though
     * that's better than incorrect results.
     */
    private static final long UNINITIALIZED_CONTEXT_MAX_AGE_NS = MINUTES.toNanos(5);

    private static final long FAILED_EXECUTION_EXPIRY_NS = SECONDS.toNanos(5);

    private static final CompletableFuture<?>[] EMPTY_COMPLETABLE_FUTURE_ARRAY = new CompletableFuture[0];

    private final Object mutex = new Object();

    private final NodeEngineImpl nodeEngine;
    private final ILogger logger;
    private final TaskletExecutionService taskletExecutionService;
    private final JobClassLoaderService jobClassloaderService;

    private final Set<Long> executionContextJobIds = newSetFromMap(new ConcurrentHashMap<>());

    // key: executionId
    private final ConcurrentMap<Long, ExecutionContext> executionContexts = new ConcurrentHashMap<>();

    /**
     * Key: executionId
     * Value: expiry time (as per System.nanoTime())
     * <p>
     * This map contains executions, that failed or were cancelled.
     * These executions are very likely to receive further data packets
     * from other members whose executions are concurrently cancelled
     * too. If we keep no track of these exceptions, in failure-heavy or
     * cancellation-heavy scenarios a significant amount of memory could
     * be held for time defined in {@link
     * #UNINITIALIZED_CONTEXT_MAX_AGE_NS}, see
     * <a href="https://github.com/hazelcast/hazelcast/issues/19897">issue #19897</a>.
     */
    private final ConcurrentMap<Long, Long> failedJobs = new ConcurrentHashMap<>();

    @Probe(name = MetricNames.JOB_EXECUTIONS_STARTED)
    private final Counter executionStarted = MwCounter.newMwCounter();

    @Probe(name = MetricNames.JOB_EXECUTIONS_COMPLETED)
    private final Counter executionCompleted = MwCounter.newMwCounter();

    private final Function<? super Long, ? extends ExecutionContext> newLightJobExecutionContextFunction;

    private final ScheduledFuture<?> lightExecutionsCheckerFuture;
    ...
}
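The failedJobs field above maps an executionId to an expiry time on the System.nanoTime() scale. The sketch below illustrates that general pattern in isolation. It is not Hazelcast code: the class FailedExecutionTracker and its methods markFailed, isFailed and evictExpired are hypothetical names chosen for this example, and the current time is passed in as a parameter so that the behavior is easy to follow.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not part of Hazelcast): remembers failed executions for a limited time. */
public class FailedExecutionTracker {

    // key: executionId, value: expiry time on the System.nanoTime() scale
    private final ConcurrentMap<Long, Long> failed = new ConcurrentHashMap<>();
    private final long expiryNs;

    public FailedExecutionTracker(long expiryNs) {
        this.expiryNs = expiryNs;
    }

    /** Records that the execution failed or was cancelled at time nowNs. */
    public void markFailed(long executionId, long nowNs) {
        failed.put(executionId, nowNs + expiryNs);
    }

    /** Returns true while an entry for the execution is still kept. */
    public boolean isFailed(long executionId) {
        return failed.containsKey(executionId);
    }

    /** Removes every entry whose expiry time has been reached at time nowNs. */
    public void evictExpired(long nowNs) {
        // Comparing the difference with zero stays correct even if
        // nanoTime values overflow, unlike a direct "nowNs >= expiry".
        failed.values().removeIf(expiry -> nowNs - expiry >= 0);
    }

    public static void main(String[] args) {
        FailedExecutionTracker tracker = new FailedExecutionTracker(TimeUnit.SECONDS.toNanos(5));
        long now = System.nanoTime();
        tracker.markFailed(42L, now);
        System.out.println(tracker.isFailed(42L)); // entry was just added

        // Simulate a cleanup pass six seconds later.
        tracker.evictExpired(now + TimeUnit.SECONDS.toNanos(6));
        System.out.println(tracker.isFailed(42L)); // entry has expired and was removed
    }
}
```

In this sketch a data packet that arrives for a recently failed execution could be dropped after a cheap isFailed lookup, and a periodic cleanup keeps the map from growing without bound. How JobExecutionService actually consults and cleans failedJobs is not shown in the excerpt above.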
The initExecution method
Its signature is as follows:
public CompletableFuture<Void> initExecution(
        long jobId,
        long executionId,
        Address coordinator,
        int coordinatorMemberListVersion,
        Set<MemberInfo> participants,
        ExecutionPlan plan
)
It initializes the execution of the specified ExecutionPlan. The call stack, innermost frame first, is as follows:
assertIsMaster:410, JobExecutionService (com.hazelcast.jet.impl)
addExecutionContext:387, JobExecutionService (com.hazelcast.jet.impl)
initExecution:338, JobExecutionService (com.hazelcast.jet.impl)
doRun:98, InitExecutionOperation (com.hazelcast.jet.impl.operation)
run:55, AsyncOperation (com.hazelcast.jet.impl.operation)