Monday, 17 October 2022

Hazelcast Jet GeneralStage Interface

Introduction
We import the following
import com.hazelcast.jet.pipeline.GeneralStage;
1. Stream-to-Stream Joins
The explanation is as follows
With the added stream-to-stream join functionality, enterprises can merge multiple data streams and handle late-arriving records. For example, an online
business application may monitor both streams of orders and shipments to confirm accurate fulfillment, .... 
GeneralStage defines the operations that can be applied to both BatchStage and StreamStage objects. Grouped, its methods are as follows
addTimestamps

customTransform

filter 
filterStateful
filterUsingService

flatMap
flatMapStateful
flatMapUsingService

map
mapStateful
mapUsingIMap
mapUsingService
mapUsingServiceAsync
mapUsingServiceAsyncBatched
mapUsingReplicatedMap

hashJoin
hashJoin2
hashJoinBuilder

innerHashJoin
innerHashJoin2

peek

rebalance

rollingAggregate

setLocalParallelism
setName

writeTo
2. Stateless Transforms
Some of these methods are classified as Stateless Transforms. The explanation is as follows
Stateless transforms are the bread and butter of a data pipeline: they transform the input into the correct shape that is required by further, more complex transforms. The key feature of these transforms is that they do not have side-effects and they treat each item in isolation.
They are as follows
map
filter
flatMap
merge
mapUsingIMap
mapUsingReplicatedMap
mapUsingService
mapUsingServiceAsync
mapUsingServiceAsyncBatched
mapUsingPython
hashJoin

3. Method Usage Examples

flatMap
The explanation is as follows
transforms each item into 0 or more output items
Example: separate a line of text into individual words
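
To make the "0 or more output items" idea concrete, here is a minimal sketch that uses plain java.util.stream instead of a Jet pipeline, so it runs without a cluster. The class and method names (FlatMapSketch, toWords) are made up for this illustration. In Jet itself the mapping function passed to flatMap returns a Traverser (for example one created with Traversers.traverseArray) rather than a Stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlatMapSketch {
    // Splits each line into words. A blank line contributes zero output items,
    // a line with three words contributes three.
    static List<String> toWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.trim().split("\\s+")))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [hello, jet, world, stream, stage]
        System.out.println(toWords(List.of("hello jet world", "", "stream stage")));
    }
}
```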

groupingKey method
The explanation is as follows
• GROUP BY from the SQL
• Splits the stream to sub-streams using the key extracted from each record
• Sub-streams are processed in parallel
For example, stocks could be grouped by name, so that each stock name gets its own sub-stream

A groupingKey call is usually followed by an operation such as
- aggregate(counting())
- aggregate2
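
As a rough analogy for grouping by key followed by counting, the sketch below does the same thing with plain java.util.stream collectors rather than the Jet API. The Trade type and the ticker names are hypothetical and exist only for this illustration. Unlike Jet, this version runs on a single thread and does not process the groups in parallel.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class GroupingKeySketch {
    // Hypothetical item type used only for this illustration.
    static final class Trade {
        final String ticker;
        final long quantity;

        Trade(String ticker, long quantity) {
            this.ticker = ticker;
            this.quantity = quantity;
        }

        String ticker() {
            return ticker;
        }
    }

    // Extract the key from each item, then count the items that share it.
    static Map<String, Long> countByTicker(List<Trade> trades) {
        return trades.stream()
                .collect(Collectors.groupingBy(Trade::ticker, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Trade> trades = List.of(
                new Trade("AAPL", 10), new Trade("MSFT", 5), new Trade("AAPL", 7));
        // prints {AAPL=2, MSFT=1}
        System.out.println(countByTicker(trades));
    }
}
```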

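Example
We do the following
import static com.hazelcast.jet.aggregate.AggregateOperations.toList;
import com.hazelcast.jet.datamodel.Tuple2;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.BatchStageWithKey;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

BatchStage<Map<String,Object>> batch1 = pipeline.readFrom(companyListBatchSource);
BatchStage<Map<String,Object>> batch2 = pipeline.readFrom(employeeListBatchSource);

//Extract the grouping key; col1 and col2 are assumed to hold the names of the key columns
BatchStageWithKey<Map<String,Object>, Object> jdbcGroupByKey = 
    batch1.groupingKey(a -> a.get(col1));

BatchStageWithKey<Map<String,Object>, Object> fileGroupByKey = 
    batch2.groupingKey(b -> b.get(col2));

//Co-group the two stages: for each key, collect the matching items of each side into a list
BatchStage<Entry<Object, Tuple2<List<Map<String,Object>>, List<Map<String,Object>>>>> d = 
    jdbcGroupByKey.aggregate2(toList(), fileGroupByKey, toList());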
Example
We do the following
Pipeline p = Pipeline.create();
//Sources.list reads from the IList named "persons"
BatchStage<Person> persons = p.readFrom(Sources.<Person>list("persons"));
BatchStage<Entry<Integer, Long>> countByAge = persons
  .groupingKey(Person::age)
  .aggregate(counting());
hashJoin method - Like an SQL Left Outer Join
It enriches one stage by joining it with another stage.

The explanation is as follows. The left side is called the primary stage and the right side the enriching stage.
1. The primary stage can be a StreamStage or a BatchStage. That is, these combinations are possible
StreamStage + BatchStage
BatchStage + BatchStage
2. The right side must always be a BatchStage.
The hash-join is a joining transform designed for the use case of data enrichment with static data. It is an asymmetric join that joins the enriching stage(s) to the primary stage. The enriching stages must be batch stages — they must represent finite datasets. The primary stage may be either a batch or a stream stage.
The explanation is as follows. The enriching stage is read into memory, so it should not be too large.
The hash-join transform is optimized for throughput — each cluster member materializes a local copy of all the enriching data, stored in hashtables (hence the name). It consumes the enriching streams in full before ingesting any data from the primary stream.
The explanation is as follows. For a primary item without a match, the output contains null in place of the enriching item.
The output of hashJoin is just like an SQL left outer join: for each primary item there are N output items, one for each matching item in the enriching set. If an enriching set doesn't have a matching item, the output will have a null instead of the enriching item.
innerHashJoin method - Like an SQL Inner Join
The explanation is as follows
If you need SQL inner join, then you can use the specialised innerHashJoin function, in which for each primary item with at least one match, there are N output items, one for each matching item in the enriching set. If an enriching set doesn't have a matching item, there will be no records with the given primary item. In this case the output function's arguments are always non-null.
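
To contrast the output shapes described for hashJoin and innerHashJoin, the following sketch implements a tiny hash join in plain Java. It is not the Jet API: HashJoinSketch, join and the keepUnmatched flag are names invented for this illustration, and the sample products are made up. The build phase loads the whole enriching side into a hashtable before any primary item is looked up, which mirrors the behaviour the quotes describe.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class HashJoinSketch {
    // Pairs each primary item with every enriching item that has the same key.
    // keepUnmatched = true  -> left outer join (unmatched primary item paired with null)
    // keepUnmatched = false -> inner join (unmatched primary item dropped)
    static <P, E, K> List<Map.Entry<P, E>> join(
            List<P> primary, Function<P, K> primaryKey,
            List<E> enriching, Function<E, K> enrichingKey,
            boolean keepUnmatched) {
        // Build phase: consume the enriching side in full and index it by key.
        Map<K, List<E>> table = new HashMap<>();
        for (E e : enriching) {
            table.computeIfAbsent(enrichingKey.apply(e), k -> new ArrayList<>()).add(e);
        }
        // Probe phase: look up each primary item in the hashtable.
        List<Map.Entry<P, E>> out = new ArrayList<>();
        for (P p : primary) {
            List<E> matches = table.get(primaryKey.apply(p));
            if (matches != null) {
                for (E e : matches) {
                    out.add(new SimpleEntry<>(p, e));
                }
            } else if (keepUnmatched) {
                out.add(new SimpleEntry<>(p, null));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> products = List.of(
                Map.entry(1, "laptop"), Map.entry(1, "notebook"), Map.entry(2, "phone"));
        List<Integer> orderedProductIds = List.of(1, 3);
        // Left outer join, prints [1=1=laptop, 1=1=notebook, 3=null]
        System.out.println(join(orderedProductIds, id -> id, products, Map.Entry::getKey, true));
        // Inner join, prints [1=1=laptop, 1=1=notebook]
        System.out.println(join(orderedProductIds, id -> id, products, Map.Entry::getKey, false));
    }
}
```

Product id 1 has two matches and therefore yields two output items in both variants, while product id 3 has no match and appears only in the left outer result.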
mapStateful method
Example
We do the following
items.mapStateful(
  //Object that holds the state. Must be mutable!
  LongAccumulator::new,
  //Mapping function that updates the state
  (sum, currentEvent) -> {
    sum.add(currentEvent); //Update the state object
    return (sum.get() <= THRESHOLD)
     ? null //Nothing is emitted
     : sum.get(); //Emit the running sum once it exceeds THRESHOLD
  }
);
mapUsingService method
Example
We do the following. Here com.hazelcast.jet.pipeline.ServiceFactories is used
interface ProductService {
  ProductDetails getDetails(int productId);
}

//HttpProductService is a hypothetical ProductService implementation; an interface cannot be instantiated directly
ServiceFactory<?, ProductService> productService = ServiceFactories
  .<ProductService>sharedService(ctx -> new HttpProductService(url))
  .toNonCooperative();

StreamStage<OrderDetails> details = orders.mapUsingService(productService,
  (service, order) -> {
      //Blocking lookup, which is why the factory is marked non-cooperative
      ProductDetails productDetails = service.getDetails(order.getProductId());
      return new OrderDetails(order, productDetails);
  }
);
mapUsingServiceAsync method
It is like the mapUsingService() method. The difference is that the mapping function is expected to return a CompletableFuture<T> instead of a result object produced synchronously.
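
The shape of such a mapping function can be sketched in plain Java without a Jet cluster. Everything in the block below (AsyncMappingSketch, the ProductService class, getDetailsAsync, the string results) is hypothetical and only illustrates a function that returns a CompletableFuture instead of a finished value. In a real pipeline Jet would invoke the function and handle the pending futures itself.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AsyncMappingSketch {
    // Hypothetical service whose lookup completes asynchronously.
    static final class ProductService {
        CompletableFuture<String> getDetailsAsync(int productId) {
            return CompletableFuture.supplyAsync(() -> "product-" + productId);
        }
    }

    // The mapping function: it returns a future, not the enriched value itself.
    static CompletableFuture<String> enrich(ProductService service, int productId) {
        return service.getDetailsAsync(productId)
                .thenApply(details -> "order for " + details);
    }

    // Stand-in for the pipeline: start all lookups first, then wait for the
    // results in input order.
    static List<String> enrichAll(ProductService service, List<Integer> productIds) {
        List<CompletableFuture<String>> pending = productIds.stream()
                .map(id -> enrich(service, id))
                .collect(Collectors.toList());
        return pending.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [order for product-1, order for product-2]
        System.out.println(enrichAll(new ProductService(), List.of(1, 2)));
    }
}
```

Because all lookups are started before any result is awaited, slow calls overlap instead of running one after another, which is the main reason to prefer the asynchronous variant for remote services.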


THIRD-PARTY.txt File

The versions of the external libraries used are listed in this file. Its path is hazelcast/licenses/THIRD-PARTY.txt