5 Ekim 2022 Çarşamba

Hazelcast Jet Sources Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.pipeline.Sources;
Bazı metodları StreamSource bazıları da BatchSource döner. 
1. Bazı metodlar altta bir builder kullanır. Yani bir anlamda builder için façade gibidir.
2. Bazı metodlar ise işi batchFromProcessor() veya streamFromProcessor() metodlarına yönlendirirler ve bu metodlar bir ProcessorMetaSupplier geçerler

batchFromProcessor metodu
Sources.batchFromProcessor yazısına taşıdım

files
Altta bir FileSourcesBuilder kullanır
Örnek
Şöyle yaparız
//Convert object to string for KNS key
FunctionEx<Log, String> keyFn = l -> l.service(); //Convert object to byte[] for KNS value FunctionEx<Log, byte[]> valueFn = l -> l.message().getBytes(); Sink<Log> sink = KinesisSinks.kinesis("stream", keyFn, valueFn).build(); p.readFrom(Sources.files("home/logs")) //read lines of text from log files .map(line -> LogParser.parse(line)) //parse lines into Log data objects .writeTo(sink);
fileWatcher metodu
Örnek ver

jdbc metodu
Sources.jdbc metodu yazısına taşıdım

json metodu
İmzası şöyle
public static <T> BatchSource<T> json(@Nonnull String directory, @Nonnull Class<T> type)
jmsQueue metodu
Sources.jmsQueue metodu yazısına taşıdım

list metodu
Hazelcast listesinden okur

Örnek
Şöyle yaparız
import static com.hazelcast.function.Functions.wholeItem;
import static com.hazelcast.jet.Traversers.traverseArray;
import static com.hazelcast.jet.aggregate.AggregateOperations.counting;

Pipeline p = Pipeline.create();
p.readFrom(Sources.<String>list(LIST_NAME))
  .flatMap(word -> traverseArray(word.toLowerCase().split("\\W+")))
  .filter(word -> !word.isEmpty())
  .groupingKey(wholeItem())
  .aggregate(counting())
  .writeTo(Sinks.map(MAP_NAME));
map metodu
Hazelcast map'inden okur
Örnek
Şöyle yaparız
Pipeline p = Pipeline.create();
p.readFrom(Sources.<String, User>map("userCache"))
 .map(user -> entry(user.country(), user))
 .writeTo(Sinks.mapWithMerging("usersByCountry",
    e -> e.getKey(),
    e -> e.getValue().name(),
    (oldValue, newValue) -> oldValue + ", " + newValue)
  );
mapJournal metodu 
Sources.mapJournal metodu yazısına taşıdım

remoteMapJournal metodu 
Sources.remoteMapJournal metodu yazısına taşıdım

socket metodu
Sources.socket metodu yazısına taşıdım

streamFromProcessor metodu - sourceName + ProcessorMetaSupplier
Örnek
Şöyle yaparız
StreamSource<Integer> source = Sources.streamFromProcessor("src",
  ProcessorMetaSupplier.of(() -> new GeneratorP(rate)));
StreamStage<WindowResult<Long>> stage = p.drawFrom(source)
  .withIngestionTimestamps()
  .map(...)
  .window(tumbling(1_000))
  .aggregate(counting());
streamFromProcessorWithWatermarks metodu - sourceName  + supportsNativeTimestamps + ProcessorMetaSupplier
Eğer Hazelcast nesneleri haricinde bir Stream varsa ve bunu okuyacak bir Processor varsa, bu ikisini StreamSource olarak kullanabilmeyi sağlar.

Üçüncü parametre bir function. EventTimePolicy nesnesi ile çağrılır ve ProcessorMetaSupplier nesnesi döner. Burada ProcessorMetaSupplier bir şekilde EventTimePolicy ile çalışmayı biliyor.
Açıklaması şöyle
Jet will call the function you supply with an EventTimePolicy and it must return a meta-supplier of processors that will act according to the parameters in the policy and must emit the watermark items as the policy specifies.
Örnek
Şöyle yaparız
public StreamSource<ChangeRecord> build() {
 return Sources.streamFromProcessorWithWatermarks(
  "name",
  true,
  eventTimePolicy -> ProcessorMetaSupplier.forceTotalParallelismOne(
    ProcessorSupplier.of(() -> new MyP(...))));
}






Hiç yorum yok:

Yorum Gönder

THIRD-PARTY.txt Dosyası

Kullanılan harici kütüphanelerin sürümleri bu dosyada Dosyanın yolu şöyle hazelcast/licenses/THIRD-PARTY.txt