Giriş
batchFromProcessor metodu
Şu satırı dahil ederiz
import com.hazelcast.jet.pipeline.Sources;
Bazı metodları StreamSource bazıları da BatchSource döner.
1. Bazı metodlar altta bir builder kullanır. Yani bir anlamda builder için façade gibidir.
2. Bazı metodlar ise işi batchFromProcessor() veya streamFromProcessor() metodlarına yönlendirirler ve bu metodlar bir ProcessorMetaSupplier geçerler
Sources.batchFromProcessor yazısına taşıdım
files
Altta bir FileSourcesBuilder kullanır
Örnek
Şöyle yaparız
//Convert object to string for KNS keyFunctionEx<Log, String> keyFn = l -> l.service(); //Convert object to byte[] for KNS value FunctionEx<Log, byte[]> valueFn = l -> l.message().getBytes(); Sink<Log> sink = KinesisSinks.kinesis("stream", keyFn, valueFn).build(); p.readFrom(Sources.files("home/logs")) //read lines of text from log files .map(line -> LogParser.parse(line)) //parse lines into Log data objects .writeTo(sink);
fileWatcher metodu
Örnek ver
jdbc metodu
Sources.jdbc metodu yazısına taşıdım
json metodu
İmzası şöyle
public static <T> BatchSource<T> json(@Nonnull String directory, @Nonnull Class<T> type)
jmsQueue metodu
Sources.jmsQueue metodu yazısına taşıdım
list metodu
Hazelcast listesinden okur
Örnek
Şöyle yaparız
import static com.hazelcast.function.Functions.wholeItem; import static com.hazelcast.jet.Traversers.traverseArray; import static com.hazelcast.jet.aggregate.AggregateOperations.counting; Pipeline p = Pipeline.create(); p.readFrom(Sources.<String>list(LIST_NAME)) .flatMap(word -> traverseArray(word.toLowerCase().split("\\W+"))) .filter(word -> !word.isEmpty()) .groupingKey(wholeItem()) .aggregate(counting()) .writeTo(Sinks.map(MAP_NAME));
map metodu
Hazelcast map'inden okur
Örnek
Şöyle yaparız
Pipeline p = Pipeline.create(); p.readFrom(Sources.<String, User>map("userCache")) .map(user -> entry(user.country(), user)) .writeTo(Sinks.mapWithMerging("usersByCountry", e -> e.getKey(), e -> e.getValue().name(), (oldValue, newValue) -> oldValue + ", " + newValue) );
mapJournal metodu
Sources.mapJournal metodu yazısına taşıdım
remoteMapJournal metodu
Sources.remoteMapJournal metodu yazısına taşıdım
socket metodu
Sources.socket metodu yazısına taşıdım
streamFromProcessor metodu - sourceName + ProcessorMetaSupplier
Örnek
Şöyle yaparız
StreamSource<Integer> source = Sources.streamFromProcessor("src", ProcessorMetaSupplier.of(() -> new GeneratorP(rate))); StreamStage<WindowResult<Long>> stage = p.drawFrom(source) .withIngestionTimestamps() .map(...) .window(tumbling(1_000)) .aggregate(counting());
streamFromProcessorWithWatermarks metodu - sourceName + supportsNativeTimestamps + ProcessorMetaSupplier
Eğer Hazelcast nesneleri haricinde bir Stream varsa ve bunu okuyacak bir Processor varsa, bu ikisini StreamSource olarak kullanabilmeyi sağlar.
Üçüncü parametre bir function. EventTimePolicy nesnesi ile çağrılır ve ProcessorMetaSupplier nesnesi döner. Burada ProcessorMetaSupplier bir şekilde EventTimePolicy ile çalışmayı biliyor.
Açıklaması şöyle
Jet will call the function you supply with an EventTimePolicy and it must return a meta-supplier of processors that will act according to the parameters in the policy and must emit the watermark items as the policy specifies.
Örnek
Şöyle yaparız
public StreamSource<ChangeRecord> build() { return Sources.streamFromProcessorWithWatermarks( "name", true, eventTimePolicy -> ProcessorMetaSupplier.forceTotalParallelismOne( ProcessorSupplier.of(() -> new MyP(...)))); }
Hiç yorum yok:
Yorum Gönder