27 Kasım 2022 Pazar

Hazelcast Jet ProcessorMetaSupplier Arayüzü - Job Coordinator'a Gönderilir

Giriş
Açıklaması şöyle
Factory of ProcessorSupplier instances. The starting point of the chain leading to the eventual creation of Processor instances on each cluster member:
1. client creates ProcessorMetaSupplier as a part of the DAG;
2. serializes it and sends to a cluster member;
3. the member deserializes and uses it to create one ProcessorSupplier for each cluster member;
4. serializes each ProcessorSupplier and sends it to its target member;
5. the target member deserializes and uses it to instantiate as many instances of Processor as requested by the parallelism property on the corresponding Vertex.

Before being asked to create ProcessorSuppliers this meta-supplier will be given access to the Hazelcast instance and, in particular, its cluster topology and partitioning services. It can use the information from these services to precisely parameterize each Processor instance that will be created on each member.
  1. Kısaca Job başlatmak isteyen client veya member, JobDefinition nesnesini Job Coordinator'a gönderir.
  2. JobDefinition içeriğinde ProcessorMetaSupplier'da bulunur. 
  3. ProcessorMetaSupplier  Job Coordinator tarafından çalıştırılır ve her bir member için ProcessorSupplier döndüren bir supplier üretir.
  4. ProcessorSupplier gerekli member'a gönderilir ve bu member ProcessorSupplier nesnesini kullanarak Processor Yaratır

1. client creates ProcessorMetaSupplier as a part of the DAG;
Örnek bir kod şöyle. Yani ProcessorMetaSupplier  Vertex'e ekleniyor
Planner p = ...
ProcessorMetaSupplier metaSupplier = ...
...
p.addVertex(this, name(), determinedLocalParallelism(), metaSupplier);
Metodlar
get metodu - List<Address>
İmzası şöyle. ProcessorSupplier döndüren bir Function döner. Burada sanırım addresses listesi get() metodunun üzerinde çalıştığı tüm member listesini belirtmek için kullanılıyor. Bu parametre çoğunlukla önemli değil
Function<? super Address, ? extends ProcessorSupplier> get(List<Address> addresses);
Örnek
Şöyle yaparız
public class MyProcessorMetaSupplier implements ProcessorMetaSupplier {

  @Override
  public Function<? super Address, ? extends ProcessorSupplier> get(
    List<Address> addresses) {
    return address -> {
      // Get the index of the member corresponding to this address
      int memberId = getMemberId(addresses, address);

      // Create a custom processor with behavior specific to this member
      Processor processor = new MyProcessor(memberId);

      // Return a processor supplier that provides instances of the custom processor
      return ProcessorSupplier.of(processor);
    };
  }

  // Helper method to get the index of a member based on its address
  private int getMemberId(List<Address> addresses, Address address) {
    int index = addresses.indexOf(address);
    return index < 0 ? -1 : index;
  }
}
of metodu
Sanırım tüm member'lar üzerinde Vertex.LOCAL_PARALLELISM_USE_DEFAULT local parallelism ile çalışır

preferLocalParallelismOne metodu
Sanırım tüm member'lar üzerine 1 local parallelism ile çalışır

preferLocalParallelismOnSingleMember metodu
Rastgele bir member üzerinde belirtilen localParallelism kadar Processor yaratır

randomMember metodu
Rastgelen  bir member'lar üzerine 1 local parallelism ile çalışır




Hiç yorum yok:

Yorum Gönder

THIRD-PARTY.txt Dosyası

Kullanılan harici kütüphanelerin sürümleri bu dosyada Dosyanın yolu şöyle hazelcast/licenses/THIRD-PARTY.txt