Friday, November 4, 2022

Hazelcast Jet JobCoordinationService Class

Introduction
We include the following line:
import com.hazelcast.jet.impl.JobCoordinationService;
The Jet coordinator object has its own thread pool.

We do not use this class directly. The jobs running on the Jet node are kept here. The structure that stores the jobs is as follows:
public class JobCoordinationService {

  ConcurrentMap<Long, MasterContext> masterContexts = new ConcurrentHashMap<>();
  ConcurrentMap<Long, Object> lightMasterContexts = new ConcurrentHashMap<>();
  ...
}
The role of the coordinator is explained as follows:
When you submit a job to the cluster, one of the members takes on the role of a coordinator to carry out the following tasks:

1. Expand the core DAG into the tasklet execution plan.

2. Distribute the execution plan to all the other members.

3. Move the pipeline execution job through its lifecycle (initialize, run, clean up) while the other members follow its commands and report state changes.

If a cluster member fails (leaves the cluster), the coordinator suspends all the jobs, rescales them to the new cluster topology, and resumes them.

If the coordinator fails, the other members enter a consensus protocol to elect a new one, which then restores all the running jobs.

A related question and answer about which members connect to sources and sinks:
Q : When you connect to Kafka or JDBC, is it only the coordinator member that actually connects to the source/sink?

A : Kafka
With local parallelism >1, each member connects multiple times...

JDBC :
 I think it is the same.
Auto Cluster State
If -Dhazelcast.persistence.auto.cluster.state=true is set, we may sometimes see errors like the following:
com.hazelcast.spi.exception.RetryableHazelcastException: Cannot submit job with name '..' before the master node initializes job coordination service's state
The reason is explained as follows. In other words, the cluster has not fully settled yet.
When automatic cluster state management is enabled, Jet jobs do not start until the cluster reaches desired topology. Also Jet is not initialized until that happens
We may see lines like the following in the JobCoordinationService logs:
Not starting jobs because cluster is running in managed context and is not yet stable.
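Because the exception is a retryable one, a client can wait briefly and try the submission again. The following is a minimal sketch in plain Java, not Hazelcast code: RetryableException is a hypothetical stand-in for RetryableHazelcastException, RetryingSubmitter is a hypothetical helper, and the attempt count and delay are arbitrary values chosen for illustration.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for com.hazelcast.spi.exception.RetryableHazelcastException.
class RetryableException extends RuntimeException {
    RetryableException(String message) {
        super(message);
    }
}

// Hypothetical helper; not part of Hazelcast.
final class RetryingSubmitter {
    // Calls the action until it succeeds or maxAttempts is used up, sleeping
    // delayMillis between attempts. Any other exception propagates immediately.
    static <T> T submitWithRetry(Supplier<T> action, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RetryableException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RetryableException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(delayMillis);
                    } catch (InterruptedException ie) {
                        // Restore the interrupt flag and stop retrying.
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("interrupted while waiting to retry", ie);
                    }
                }
            }
        }
        throw last;
    }
}
```

In a real client the Supplier would wrap the actual job submission call, and the delay would be chosen according to how long the cluster usually needs to reach its expected topology.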

1. Light Job
submitLightJob method
The code is as follows. A LightMasterContext is created here.
public CompletableFuture<Void> submitLightJob(
            long jobId,
            Object deserializedJobDefinition,
            Data serializedJobDefinition,
            JobConfig jobConfig,
            Subject subject
    ) {
  ...
  // First insert just a marker into the map. This is to prevent initializing the light
  // job if the jobId was submitted twice. This can happen e.g. if the client retries.
  Object oldContext = lightMasterContexts.putIfAbsent(jobId, 
                                                      UNINITIALIZED_LIGHT_JOB_MARKER);
  if (oldContext != null) {
    throw new JetException("duplicate jobId " + idToString(jobId));
  }
  ...
  // Initialize and start the job. We do this before adding the actual
  // LightMasterContext to the map to avoid possible races of the job initialization and
  // cancellation.
  return LightMasterContext.createContext(nodeEngine, this, dag, jobId, jobConfig, subject)
    .thenComposeAsync(mc -> {
      Object oldCtx = lightMasterContexts.put(jobId, mc);
      assert oldCtx == UNINITIALIZED_LIGHT_JOB_MARKER;
      scheduleJobTimeout(jobId, jobConfig.getTimeoutMillis());

      return mc.getCompletionFuture()
               .whenComplete((r, t) -> {
                 Object removed = lightMasterContexts.remove(jobId);
                 unscheduleJobTimeout(jobId);
               });
    },
    // The executor for thenComposeAsync
    coordinationExecutor());
}
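The marker-then-replace idea in submitLightJob can be shown without any Hazelcast types. The following is a minimal sketch under stated assumptions: LightJobRegistry is a hypothetical class, a String stands in for the LightMasterContext, and the two futures are passed in by the caller so the ordering is easy to observe. It is not Hazelcast's implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the light job map; not part of Hazelcast.
final class LightJobRegistry {
    // Sentinel stored while the real context is still being created.
    static final Object UNINITIALIZED_MARKER = new Object();

    final ConcurrentMap<Long, Object> contexts = new ConcurrentHashMap<>();

    // Reserves the jobId with the marker, swaps in the real context once it is
    // ready, and removes the entry when the completion future finishes.
    CompletableFuture<Void> submit(long jobId,
                                   CompletableFuture<String> contextFuture,
                                   CompletableFuture<Void> completion) {
        if (contexts.putIfAbsent(jobId, UNINITIALIZED_MARKER) != null) {
            // A second submission of the same jobId is rejected right away.
            throw new IllegalStateException("duplicate jobId " + jobId);
        }
        return contextFuture.thenCompose(ctx -> {
            contexts.put(jobId, ctx);
            return completion.whenComplete((r, t) -> contexts.remove(jobId));
        });
    }
}
```

The point of the marker is that the duplicate check happens atomically with the reservation, so a retried submission fails even though the real context does not exist yet.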
terminateLightJob method
When a terminate request arrives through job.cancel(), the code below runs. The request is passed on to the LightMasterContext, which iterates over executionPlanMap and sends a terminate request to every member on which the job is running.
public void terminateLightJob(long jobId) {
  Object mc = lightMasterContexts.get(jobId);
  if (mc == null || mc == UNINITIALIZED_LIGHT_JOB_MARKER) {
    throw new JobNotFoundException(jobId);
  }
  ((LightMasterContext) mc).requestTermination();
}
2. Normal Job
Normal jobs are stored in the JobRepository. The code is as follows:
public class JobCoordinationService {
  ...     
  private final JobRepository jobRepository;
  ...
}
submitJob method
It is called by SubmitJobOperation. The code is as follows.
Here:
- The assertIsMaster() call checks that the member is the master, that is, the job coordinator member.
- A MasterContext is created.
public CompletableFuture<Void> submitJob(
            long jobId,
            Data serializedJobDefinition,
            JobConfig jobConfig,
            Subject subject
) {
  CompletableFuture<Void> res = new CompletableFuture<>();
  submitToCoordinatorThread(() -> {
    MasterContext masterContext;

    assertIsMaster("Cannot submit job " + idToString(jobId) + " to non-master node");
    ...
    // the order of operations is important.

    // first, check if the job is already completed
    JobResult jobResult = jobRepository.getJobResult(jobId);
    if (jobResult != null) {
      ...
      return;
    }
    ...
    int quorumSize = ...;
    JobExecutionRecord jobExecutionRecord = new JobExecutionRecord(jobId, quorumSize);
    masterContext = createMasterContext(jobRecord, jobExecutionRecord);
    ...
    MasterContext prev = masterContexts.putIfAbsent(jobId, masterContext);
    ...
    tryStartJob(masterContext);
    ...
  });
  return res;
}
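The shape of submitJob, where the work is handed to a single coordinator thread and the caller gets a future back, can be sketched in plain Java. This is only an illustration under assumptions: CoordinatorSketch and its fields are hypothetical names, a String stands in for the MasterContext, and the master check is reduced to a boolean flag.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; names are illustrative and not part of Hazelcast.
final class CoordinatorSketch {
    private final ExecutorService coordinatorThread = Executors.newSingleThreadExecutor();
    final ConcurrentMap<Long, String> masterContexts = new ConcurrentHashMap<>();
    private final boolean master;

    CoordinatorSketch(boolean master) {
        this.master = master;
    }

    // Runs the submission on the coordinator thread and reports the outcome
    // through the returned future instead of blocking the caller.
    CompletableFuture<Void> submitJob(long jobId) {
        CompletableFuture<Void> res = new CompletableFuture<>();
        coordinatorThread.execute(() -> {
            try {
                if (!master) {
                    throw new IllegalStateException(
                            "Cannot submit job " + jobId + " to non-master node");
                }
                // putIfAbsent turns a repeated submission of the same jobId into a no-op.
                masterContexts.putIfAbsent(jobId, "context-" + jobId);
                res.complete(null);
            } catch (Throwable t) {
                res.completeExceptionally(t);
            }
        });
        return res;
    }

    void shutdown() {
        coordinatorThread.shutdown();
    }
}
```

Running every submission on one thread keeps the ordering of the checks simple, which matches the "order of operations is important" comment in the original method.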
assertIsMaster method
The code is as follows:
void assertIsMaster(String error) {
  if (!isMaster()) {
    throw new JetException(error + ". Master address: " +
      nodeEngine.getClusterService().getMasterAddress());
  }
}

private boolean isMaster() {
  return nodeEngine.getClusterService().isMaster();
}




THIRD-PARTY.txt File

The versions of the external libraries in use are listed in this file. The file path is hazelcast/licenses/THIRD-PARTY.txt