Introduction
We include the following line:
import com.hazelcast.jet.pipeline.GeneralStage;
1. Stream-to-Stream Joins
The explanation is as follows:
With the added stream-to-stream join functionality, enterprises can merge multiple data streams and handle late-arriving records. For example, an online business application may monitor both streams of orders and shipments to confirm accurate fulfillment, ....
These are operations that can be applied to both BatchStage and StreamStage objects. Grouping the methods, they are:
addTimestamps, customTransform, filter, filterStateful, filterUsingService, flatMap, flatMapStateful, flatMapUsingService, map, mapStateful, mapUsingIMap, mapUsingService, mapUsingServiceAsync, mapUsingServiceAsyncBatched, mapUsingReplicatedMap, hashJoin, hashJoin2, hashJoinBuilder, innerHashJoin, innerHashJoin2, peek, rebalance, rollingAggregate, setLocalParallelism, setName, writeTo
2. Stateless Transforms
Some of these methods are referred to as Stateless Transforms. The explanation is as follows:
Stateless transforms are the bread and butter of a data pipeline: they transform the input into the correct shape that is required by further, more complex transforms. The key feature of these transforms is that they do not have side-effects and they treat each item in isolation.
They are the following:
map
filter
flatMap
merge
mapUsingIMap
mapUsingReplicatedMap
mapUsingService
mapUsingServiceAsync
mapUsingServiceAsyncBatched
mapUsingPython
hashJoin
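A minimal sketch chaining a couple of these stateless transforms; the IList source name "tickers" is an assumption made only for illustration:
Pipeline p = Pipeline.create();
p.readFrom(Sources.<String>list("tickers"))   // assumed IList source named "tickers"
 .map(String::toUpperCase)                    // map: one input item -> one output item
 .filter(t -> !t.isEmpty())                   // filter: drop items failing the predicate
 .writeTo(Sinks.logger());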
3. Method Usage Examples
flatMap
The explanation is as follows:
transforms each item into 0 or more output items. Example: separate a line of text into individual words
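A minimal sketch of that example, assuming a BatchStage<String> named lines:
// flatMap: each input line becomes 0..N word items
BatchStage<String> words = lines.flatMap(
        line -> Traversers.traverseArray(line.split("\\W+"))); // Traversers is com.hazelcast.jet.Traversers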
groupingKey method
The explanation is as follows:
• GROUP BY from SQL
• Splits the stream into sub-streams using the key extracted from each record
• Sub-streams are processed in parallel
For example, if we grouped stock trades by their ticker symbols, each symbol would end up in its own sub-stream.
A groupingKey call is usually followed by an aggregation such as
- aggregate(counting())
- aggregate2
Example
We do it like this:
BatchStage<Map<String, Object>> batch1 = pipeline.readFrom(companyListBatchSource);
BatchStage<Map<String, Object>> batch2 = pipeline.readFrom(employeeListBatchSource);

// Key each stage by its join column
BatchStageWithKey<Map<String, Object>, Object> jdbcGroupByKey =
    batch1.groupingKey(a -> a.get(col1));
BatchStageWithKey<Map<String, Object>, Object> fileGroupByKey =
    batch2.groupingKey(b -> b.get(col2));

// Co-aggregate the two keyed stages: for each key, collect the matching items
// from both sides into a Tuple2 of lists (toList() comes from AggregateOperations)
BatchStage<Entry<Object, Tuple2<List<Map<String, Object>>, List<Map<String, Object>>>>> joined =
    jdbcGroupByKey.aggregate2(toList(), fileGroupByKey, toList());
Example
We do it like this:
Pipeline p = Pipeline.create();
BatchStage<Person> persons = p.readFrom(Sources.<Person>list("persons")); // source assumed for the example
BatchStage<Entry<Integer, Long>> countByAge = persons
    .groupingKey(Person::age)
    .aggregate(counting()); // counting() comes from AggregateOperations
hashJoin method - Like a SQL Left Outer Join
It joins two stages, enriching one with the other. The explanation is as follows. The left-hand side is called the primary stage and the right-hand side the enriching stage.
1. The primary stage can be a StreamStage or a BatchStage, so the possible combinations are:
StreamStage + BatchStage
BatchStage + BatchStage
2. The right-hand side must always be a BatchStage.
The hash-join is a joining transform designed for the use case of data enrichment with static data. It is an asymmetric join that joins the enriching stage(s) to the primary stage. The enriching stages must be batch stages — they must represent finite datasets. The primary stage may be either a batch or a stream stage.
The explanation is as follows. The enriching stage is read into memory, so it should not be too large.
The hash-join transform is optimized for throughput — each cluster member materializes a local copy of all the enriching data, stored in hashtables (hence the name). It consumes the enriching streams in full before ingesting any data from the primary stream.
The output behaves like a SQL left outer join. The explanation is as follows:
The output of hashJoin is just like an SQL left outer join: for each primary item there are N output items, one for each matching item in the enriching set. If an enriching set doesn't have a matching item, the output will have a null instead of the enriching item.
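A minimal hashJoin sketch, assuming a Pipeline p, a StreamStage<Order> named orders, an IMap named "products", and hypothetical Order, Product and OrderDetails classes:
// Enriching (right) side: a batch stage read from an IMap
BatchStage<Entry<Integer, Product>> products =
    p.readFrom(Sources.<Integer, Product>map("products"));

StreamStage<OrderDetails> enriched = orders.hashJoin(
    products,
    JoinClause.joinMapEntries(Order::getProductId),         // match order.productId to the map key
    (order, product) -> new OrderDetails(order, product));  // product is null when there is no match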
innerHashJoin method - Like a SQL Inner Join
The explanation is as follows:
If you need SQL inner join, then you can use the specialised innerHashJoin function, in which for each primary item with at least one match, there are N output items, one for each matching item in the enriching set. If an enriching set doesn't have a matching item, there will be no records with the given primary item. In this case the output function's arguments are always non-null.
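With the same assumed types as above, the inner-join variant only emits orders that have a matching product, so the output function's arguments are never null:
StreamStage<OrderDetails> matchedOnly = orders.innerHashJoin(
    products,
    JoinClause.joinMapEntries(Order::getProductId),
    (order, product) -> new OrderDetails(order, product));  // both arguments are always non-null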
mapStateful method
Example
We do it like this:
items.mapStateful(
    // Object that holds the state. Must be mutable!
    LongAccumulator::new,
    // Mapping function that updates the state
    (sum, currentEvent) -> {
        sum.add(currentEvent);            // Update the state object
        return (sum.get() <= THRESHOLD)
            ? null                        // Nothing is emitted
            : sum.get();
    }
);
mapUsingService method
Example
We do it like this. Here com.hazelcast.jet.pipeline.ServiceFactories is used:
interface ProductService {
    ProductDetails getDetails(int productId);
}

// Shared, non-cooperative service because getDetails blocks
ServiceFactory<?, ProductService> productService = ServiceFactories
    .sharedService(ctx -> new ProductService(url)) // stands in for a concrete ProductService implementation
    .toNonCooperative();

StreamStage<OrderDetails> details = orders.mapUsingService(productService,
    (service, order) -> {
        ProductDetails productDetails = service.getDetails(order.getProductId());
        return new OrderDetails(order, productDetails);
    });
mapUsingServiceAsync method
It is like the mapUsingService() method, except that instead of a synchronously produced result it expects a CompletableFuture<T> to be returned.
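A minimal sketch reusing the assumed ProductService example above; the lookup is wrapped in a CompletableFuture so the stage itself does not block:
StreamStage<OrderDetails> details = orders.mapUsingServiceAsync(productService,
    (service, order) -> CompletableFuture.supplyAsync(
            () -> service.getDetails(order.getProductId()))  // run the blocking lookup off the cooperative thread
        .thenApply(productDetails -> new OrderDetails(order, productDetails)));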