Giriş
Açıklaması şöyle
Factory of ProcessorSupplier instances. The starting point of the chain leading to the eventual creation of Processor instances on each cluster member:1. client creates ProcessorMetaSupplier as a part of the DAG;2. serializes it and sends to a cluster member;3. the member deserializes and uses it to create one ProcessorSupplier for each cluster member;4. serializes each ProcessorSupplier and sends it to its target member;5. the target member deserializes and uses it to instantiate as many instances of Processor as requested by the parallelism property on the corresponding Vertex.Before being asked to create ProcessorSuppliers this meta-supplier will be given access to the Hazelcast instance and, in particular, its cluster topology and partitioning services. It can use the information from these services to precisely parameterize each Processor instance that will be created on each member.
- Kısaca Job başlatmak isteyen client veya member, JobDefinition nesnesini Job Coordinator'a gönderir.
- JobDefinition içeriğinde ProcessorMetaSupplier'da bulunur.
- ProcessorMetaSupplier Job Coordinator tarafından çalıştırılır ve her bir member için ProcessorSupplier döndüren bir supplier üretir.
- ProcessorSupplier gerekli member'a gönderilir ve bu member ProcessorSupplier nesnesini kullanarak Processor Yaratır
Örnek bir kod şöyle. Yani ProcessorMetaSupplier Vertex'e ekleniyor
Planner p = ... ProcessorMetaSupplier metaSupplier = ... ... p.addVertex(this, name(), determinedLocalParallelism(), metaSupplier);
Metodlar
get metodu - List<Address>
İmzası şöyle. ProcessorSupplier döndüren bir Function döner. Burada sanırım addresses listesi get() metodunun üzerinde çalıştığı tüm member listesini belirtmek için kullanılıyor. Bu parametre çoğunlukla önemli değil
Function<? super Address, ? extends ProcessorSupplier> get(List<Address> addresses);
Örnek
Şöyle yaparız
public class MyProcessorMetaSupplier implements ProcessorMetaSupplier { @Override public Function<? super Address, ? extends ProcessorSupplier> get( List<Address> addresses) { return address -> { // Get the index of the member corresponding to this address int memberId = getMemberId(addresses, address); // Create a custom processor with behavior specific to this member Processor processor = new MyProcessor(memberId); // Return a processor supplier that provides instances of the custom processor return ProcessorSupplier.of(processor); }; } // Helper method to get the index of a member based on its address private int getMemberId(List<Address> addresses, Address address) { int index = addresses.indexOf(address); return index < 0 ? -1 : index; } }
of metodu
Sanırım tüm member'lar üzerinde Vertex.LOCAL_PARALLELISM_USE_DEFAULT local parallelism ile çalışır
Sanırım tüm member'lar üzerine 1 local parallelism ile çalışır
preferLocalParallelismOnSingleMember metodu
Rastgele bir member üzerinde belirtilen localParallelism kadar Processor yaratır
randomMember metodu
Rastgelen bir member'lar üzerine 1 local parallelism ile çalışır
Hiç yorum yok:
Yorum Gönder