Giriş
Şu satırı dahil ederiz
import com.hazelcast.map.EntryProcessor;
Neden Lazım?
Açıklaması şöyle. Yani Map üzerinde bir işlem yapmak istersek nesneyi işlemi başlatan yere kadar taşıyıp, nesneyi güncelleyip tekrar geri göndermek gerekir. Bu da maliyetli. EntryProcessor ile işlem sunucu üzerinde yani yerinde yapılabiliyor. Ayrıca sonuçlar toplanıp tekrar istemciye gönderiliyor.
An EntryProcessor passes you a Map.Entry. At the time you receive it the entry is locked and not released until the EntryProcessor completes. This obviates the need to explicitly lock as would be required with a ExecutorService.Performance can be very high as the data is not moved off the Member partition. This avoids network cost and, if the storage format is InMemoryFormat.OBJECT, then there is no de-serialization or serialization cost.
Açıklaması şöyle
The ability to collocate computation with data is probably the biggest strength of Hazelcast. Load your data set in a distributed IMap and process it with EntryProcessors. Or run a stream processing pipeline on top of a stream of CDC events.
Kim Çalıştırır
Açıklaması şöyle. Partition Thread tarafından çalıştırılır
EntryProcessors execute on the partition thread in a member.
Açıklaması şöyle
Internally, EPs are executed by partition threads, where one partition thread takes care of multiple partitions. When an EP comes to Hazelcast, it is picked by the owner thread of the partition where the key belongs. Once the processing is completed, the partition thread is ready to accept and execute other tasks (which may well be same EP for same key, submitted by another thread).
Kod şöyle.
@BinaryInterface @FunctionalInterface public interface EntryProcessor<K, V, R> extends Serializable { R process(Entry<K, V> entry); default @Nullable EntryProcessor<K, V, R> getBackupProcessor() { if (this instanceof ReadOnly) { return null; } return this; } }
EntryProcessor ve Exception
EntryProcessor kendisini çağıran partition thread'e exception fırlatmasa daha iyi
- Eğer null dönerse EntryProcessor sadece key'in sahibi olan Partition üzerinde çalışır
- Eğer null dönmezse backup amaçlı özel bir EntryProcessor dönmek gerekir
Diğer
EntryProcessor IMap arayüzünün şu metodları ile kullanılabilir
<R> Map<K, R> executeOnEntries(@Nonnull EntryProcessor<K, V, R> entryProcessor); <R> Map<K, R> executeOnEntries(@Nonnull EntryProcessor<K, V, R> entryProcessor, @Nonnull Predicate<K, V> predicate); <R> R executeOnKey(@Nonnull K key,@Nonnull EntryProcessor<K, V, R> entryProcessor); <R> Map<K, R> executeOnKeys(@Nonnull Set<K> keys, @Nonnull EntryProcessor<K, V, R> entryProcessor); <R> CompletionStage<R> submitToKey(@Nonnull K key, @Nonnull EntryProcessor<K, V, R> entryProcessor); <R> CompletionStage<Map<K, R>> submitToKeys( @Nonnull Set<K> keys, @Nonnull EntryProcessor<K, V, R> entryProcessor);
Örnek
Şöyle yaparız
public class TestProcessor implements EntryProcessor<String, TestPojo, TestPojo> { private static final long serialVersionUID = -7228575928294057876L; private final TestPojo newValue; TestProcessor(TestPojo newValue) { this.newValue = newValue; } @Override public TestPojo process(Map.Entry<String, TestPojo> entry) { System.out.println("update value: " + entry.getValue() + " with new value " + this.newValue); entry.setValue(newValue); return this.newValue; } } IMap<String, TestPojo> map = ...; map.executeOnKey("key", new TestProcessor(new TestPojo("Version2")));
ComputeEntryProcessor Sınıfı
Örnek
Şöyle yaparız
IMap<Long, Integer> cart = hazelcast.getMap("default"); Integer productId = ...; Integer newQuantity = cart.executeOnKey(productId, new ComputeEntryProcessor<Long, Integer, Integer>((key,value) -> { Integer newQuantity = value + 1; return newQuantity; });