Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.impl.JobExecutionService;
Kod şöyle. ExecutionContext nesnelerini saklar.
/**
* Service to handle ExecutionContexts on all cluster members. Job-control
* operations from coordinator are handled here.
*/
public class JobExecutionService implements DynamicMetricsProvider {
/**
* A timeout after which we cancel a light job that doesn't receive InitOp
* from the coordinator. {@link ExecutionContext} can be created in
* response to data packet received for that execution, but it doesn't know
* the coordinator. Therefore, the checker cannot confirm with the
* coordinator if it still exists. We terminate these jobs after a timeout.
* However, the timeout has to be long enough because if the job happens to
* be initialized later, we'll lose data, and we won't even detect it. It can
* also happen that we lose a DONE_ITEM and the job will get stuck, though
* that's better than incorrect results.
*/
private static final long UNINITIALIZED_CONTEXT_MAX_AGE_NS = MINUTES.toNanos(5);
private static final long FAILED_EXECUTION_EXPIRY_NS = SECONDS.toNanos(5);
private static final CompletableFuture<?>[] EMPTY_COMPLETABLE_FUTURE_ARRAY
= new CompletableFuture[0];
private final Object mutex = new Object();
private final NodeEngineImpl nodeEngine;
private final ILogger logger;
private final TaskletExecutionService taskletExecutionService;
private final JobClassLoaderService jobClassloaderService;
private final Set<Long> executionContextJobIds = newSetFromMap(
new ConcurrentHashMap<>());
// key: executionId
private final ConcurrentMap<Long, ExecutionContext> executionContexts
= new ConcurrentHashMap<>();
/**
* Key: executionId
* Value: expiry time (as per System.nanoTime())
* <p>
* This map contains executions, that failed or were cancelled.
* These executions are very likely to receive further data packets
* from other members whose executions are concurrently cancelled
* too. If we keep no track of these exceptions, in failure-heavy or
* cancellation-heavy scenarios a significant amount of memory could
* be held for time defined in {@link
* #UNINITIALIZED_CONTEXT_MAX_AGE_NS}, see
* <a href="https://github.com/hazelcast/hazelcast/issues/19897">issue #19897</a>.
*/
private final ConcurrentMap<Long, Long> failedJobs = new ConcurrentHashMap<>();
@Probe(name = MetricNames.JOB_EXECUTIONS_STARTED)
private final Counter executionStarted = MwCounter.newMwCounter();
@Probe(name = MetricNames.JOB_EXECUTIONS_COMPLETED)
private final Counter executionCompleted = MwCounter.newMwCounter();
private final Function<? super Long, ? extends ExecutionContext>
newLightJobExecutionContextFunction;
private final ScheduledFuture<?> lightExecutionsCheckerFuture;
...
}initExecution metodu
İmzası şöyle
public CompletableFuture<Void> initExecution( long jobId, long executionId, Address coordinator, int coordinatorMemberListVersion, Set<MemberInfo> participants, ExecutionPlan plan )
Belirtilen ExecutionPlan'ı çalıştırır. Çağrı sırası şöyle
assertIsMaster:410, JobExecutionService (com.hazelcast.jet.impl) addExecutionContext:387, JobExecutionService (com.hazelcast.jet.impl) initExecution:338, JobExecutionService (com.hazelcast.jet.impl) doRun:98, InitExecutionOperation (com.hazelcast.jet.impl.operation) run:55, AsyncOperation (com.hazelcast.jet.impl.operation)
Hiç yorum yok:
Yorum Gönder