Şu satırı dahil ederiz. Ya da operator gibi de düşünülebilir
import com.hazelcast.jet.pipeline.StreamStage;
GeneralStage arayüzünden kalıtır
İmzası şöyle. Birinci parametre lookup Map, ikinci parametre lookup Map'te arama yapmak için kullanılacak key, üçüncü parametre eski nesne ve lookup Map'ten gelen parametreyi alır ve yeni bir nesne dönmelidir.
<K, V, R> StreamStage<R> mapUsingIMap( IMap<K, V> iMap, FunctionEx<? super T, ? extends K> lookupKeyFn, BiFunctionEx<? super T, ? super V, ? extends R> mapFn);Belirtilen task'ı başka bir lookup Map kullanarak zenginleştirir
Örnek
Şöyle yaparız
IMap<String, String> lookupTable = ...; Pipeline p = ...; p.readFrom(...) .withNativeTimestamps(0) .mapUsingIMap(lookupTable, Trade::getSymbol, (trade, companyName) -> new EnrichedTrade(trade, companyName) ) .writeTo(Sinks.logger());
mapUsingService metodu
IMap yerine bir başka servisi kullanarak task'ı zenginleştirir
window metodu
WindowDefinition nesnesi alır
Hiç yorum yok:
Yorum Gönder