Monday, December 5, 2022

Hazelcast Jet JobExecutionService Class - Runs the Job Coordinator

Introduction
We include the following line
import com.hazelcast.jet.impl.JobExecutionService;
The code is as follows. The class stores ExecutionContext objects, keyed by executionId.
/**
 * Service to handle ExecutionContexts on all cluster members. Job-control
 * operations from coordinator are handled here.
 */
public class JobExecutionService implements DynamicMetricsProvider {

  /**
    * A timeout after which we cancel a light job that doesn't receive InitOp
    * from the coordinator. {@link ExecutionContext} can be created in
    * response to data packet received for that execution, but it doesn't know
    * the coordinator. Therefore, the checker cannot confirm with the
    * coordinator if it still exists. We terminate these jobs after a timeout.
    * However, the timeout has to be long enough because if the job happens to
    * be initialized later, we'll lose data, and we won't even detect it. It can
    * also happen that we lose a DONE_ITEM and the job will get stuck, though
    * that's better than incorrect results.
  */
  private static final long UNINITIALIZED_CONTEXT_MAX_AGE_NS = MINUTES.toNanos(5);

  private static final long FAILED_EXECUTION_EXPIRY_NS = SECONDS.toNanos(5);
  private static final CompletableFuture<?>[] EMPTY_COMPLETABLE_FUTURE_ARRAY 
    = new CompletableFuture[0];

  private final Object mutex = new Object();

  private final NodeEngineImpl nodeEngine;
  private final ILogger logger;
  private final TaskletExecutionService taskletExecutionService;
  private final JobClassLoaderService jobClassloaderService;

  private final Set<Long> executionContextJobIds = newSetFromMap(
    new ConcurrentHashMap<>());

  // key: executionId
  private final ConcurrentMap<Long, ExecutionContext> executionContexts 
    = new ConcurrentHashMap<>();

  /**
    * Key: executionId
    * Value: expiry time (as per System.nanoTime())
    * <p>
    * This map contains executions, that failed or were cancelled.
    * These executions are very likely to receive further data packets
    * from other members whose executions are concurrently cancelled
    * too. If we keep no track of these exceptions, in failure-heavy or
    * cancellation-heavy scenarios a significant amount of memory could
    * be held for time defined in {@link
    * #UNINITIALIZED_CONTEXT_MAX_AGE_NS}, see
    * <a href="https://github.com/hazelcast/hazelcast/issues/19897">issue #19897</a>.
  */
  private final ConcurrentMap<Long, Long> failedJobs = new ConcurrentHashMap<>();

  @Probe(name = MetricNames.JOB_EXECUTIONS_STARTED)
  private final Counter executionStarted = MwCounter.newMwCounter();
  @Probe(name = MetricNames.JOB_EXECUTIONS_COMPLETED)
  private final Counter executionCompleted = MwCounter.newMwCounter();

  private final Function<? super Long, ? extends ExecutionContext> 
    newLightJobExecutionContextFunction;

  private final ScheduledFuture<?> lightExecutionsCheckerFuture;
  ...
}
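The Javadoc on failedJobs describes a map from executionId to an expiry time taken from System.nanoTime(). Below is a minimal sketch of that idea, not the Hazelcast implementation: the class FailedExecutionTracker and its methods markFailed, isRecentlyFailed and removeExpired are hypothetical names chosen for illustration. The current time is passed in as a parameter so that the example is deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only; it is not part of Hazelcast.
final class FailedExecutionTracker {

    private final long expiryNs;

    // key: executionId, value: expiry time (as per System.nanoTime())
    private final ConcurrentMap<Long, Long> failedExecutions = new ConcurrentHashMap<>();

    FailedExecutionTracker(long expiry, TimeUnit unit) {
        this.expiryNs = unit.toNanos(expiry);
    }

    // Remember that the execution failed or was cancelled at time nowNs.
    void markFailed(long executionId, long nowNs) {
        failedExecutions.put(executionId, nowNs + expiryNs);
    }

    // Late data packets for such an execution can be dropped
    // instead of creating a new context for them.
    boolean isRecentlyFailed(long executionId) {
        return failedExecutions.containsKey(executionId);
    }

    // Remove entries whose expiry time has passed and return how many were removed.
    // nanoTime values are compared by subtraction to stay correct on overflow.
    int removeExpired(long nowNs) {
        int removed = 0;
        for (Map.Entry<Long, Long> entry : failedExecutions.entrySet()) {
            if (nowNs - entry.getValue() >= 0
                    && failedExecutions.remove(entry.getKey(), entry.getValue())) {
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        FailedExecutionTracker tracker = new FailedExecutionTracker(5, TimeUnit.SECONDS);
        tracker.markFailed(42L, 0L);
        System.out.println(tracker.isRecentlyFailed(42L));                      // true
        System.out.println(tracker.removeExpired(TimeUnit.SECONDS.toNanos(1))); // 0
        System.out.println(tracker.removeExpired(TimeUnit.SECONDS.toNanos(5))); // 1
        System.out.println(tracker.isRecentlyFailed(42L));                      // false
    }
}
```

In real use the caller would pass System.nanoTime() as nowNs and call removeExpired periodically, for example from a scheduled task.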
initExecution method
Its signature is as follows
public CompletableFuture<Void> initExecution(
  long jobId, long executionId, Address coordinator, int coordinatorMemberListVersion,
  Set<MemberInfo> participants, ExecutionPlan plan
)
Runs the given ExecutionPlan. The call sequence is as follows, with the innermost frame first
assertIsMaster:410, JobExecutionService (com.hazelcast.jet.impl)
addExecutionContext:387, JobExecutionService (com.hazelcast.jet.impl)
initExecution:338, JobExecutionService (com.hazelcast.jet.impl)
doRun:98, InitExecutionOperation (com.hazelcast.jet.impl.operation)
run:55, AsyncOperation (com.hazelcast.jet.impl.operation)
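A Java stack trace lists the most recently called method first, so the listing above is read from bottom to top: run calls doRun, which calls initExecution, and so on. The small sketch below shows the same ordering with plain Java. The class CallOrderDemo is hypothetical; its method names only mirror the frames above and it does not call any Hazelcast code.

```java
// Hypothetical demo class; the method names only mirror the frames in the listing.
final class CallOrderDemo {

    static StackTraceElement[] run() {
        return doRun();
    }

    static StackTraceElement[] doRun() {
        return initExecution();
    }

    static StackTraceElement[] initExecution() {
        // Element 0 of the trace is the method in which the Throwable was created.
        return new Throwable().getStackTrace();
    }

    public static void main(String[] args) {
        // Prints initExecution first, then doRun, then run, then main.
        for (StackTraceElement frame : run()) {
            System.out.println(frame.getMethodName() + ":" + frame.getLineNumber());
        }
    }
}
```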





THIRD-PARTY.txt File

The versions of the external libraries used are listed in this file. The file's path is hazelcast/licenses/THIRD-PARTY.txt