Tuesday, 4 October 2022

Hazelcast Jet WindowDefinition Interface

Introduction
We include the following line:
import com.hazelcast.jet.pipeline.WindowDefinition;
Note: For the implementation, see the SlidingWindowP class.

There are two basic window types: tumbling and sliding.


Both can be combined with "early results". The explanation is as follows:
Compute sum of trades for 3-second intervals
• Tumbling window
• Output each 3 seconds, should be roughly constant

Compute sum of trades for 3-second intervals, result so far each second
• Tumbling windows with early results
• Output each second, grows for 3 seconds and then drops

Compute sum of trades in last 3 seconds, update each second
• Sliding windows
• Output each second, should be roughly constant
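The first is a plain tumbling window.
The second is a tumbling window with early results.
The third is a sliding window: a 3-second window that slides forward by 1 second, so it needs no early results to produce output every second.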
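The three output patterns described above can be reproduced with a small plain-Java simulation. This is only a sketch and does not use Hazelcast: it assumes exactly one trade per second, treats the final result of a window as if it were the last early result, and skips the warm-up period of the sliding window. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java simulation (no Hazelcast dependency) of the three output patterns. */
final class WindowOutputPatterns {

    /** Sum of prices in seconds [from, to), clamped to the array bounds. */
    static long sum(long[] prices, int from, int to) {
        long total = 0;
        for (int i = Math.max(from, 0); i < Math.min(to, prices.length); i++) {
            total += prices[i];
        }
        return total;
    }

    /** Tumbling: one result each time a window of `size` seconds closes. */
    static List<Long> tumbling(long[] prices, int size) {
        List<Long> out = new ArrayList<>();
        for (int end = size; end <= prices.length; end += size) {
            out.add(sum(prices, end - size, end));
        }
        return out;
    }

    /** Tumbling with early results: partial sum of the current window, every second. */
    static List<Long> tumblingWithEarlyResults(long[] prices, int size) {
        List<Long> out = new ArrayList<>();
        for (int now = 1; now <= prices.length; now++) {
            int windowStart = ((now - 1) / size) * size;
            out.add(sum(prices, windowStart, now));
        }
        return out;
    }

    /** Sliding by one second: sum of the last `size` seconds, every second. */
    static List<Long> sliding(long[] prices, int size) {
        List<Long> out = new ArrayList<>();
        for (int now = size; now <= prices.length; now++) {
            out.add(sum(prices, now - size, now));
        }
        return out;
    }

    public static void main(String[] args) {
        long[] prices = {10, 10, 10, 10, 10, 10}; // one trade of price 10 per second
        System.out.println(tumbling(prices, 3));                 // [30, 30]
        System.out.println(tumblingWithEarlyResults(prices, 3)); // [10, 20, 30, 10, 20, 30]
        System.out.println(sliding(prices, 3));                  // [30, 30, 30, 30]
    }
}
```

The second line of output shows the "grows for 3 seconds and then drops" shape, while the first and third stay constant.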

setEarlyResultsPeriod method
Emits early (partial) results at the given period without waiting for the window to close. In the example below, a 3-second window produces an early result every second. The explanation is as follows:
Frequency - When is the result provided?
When window closes
• All the data in the window were processed
• Non-practical for long time ranges (hour and more)

In specified time intervals - early results

With each item
• Use stateful mapping
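The "with each item" option above means keeping state across items and emitting an updated result for every one of them. In Jet the corresponding stage method is mapStateful; the sketch below is only a plain-Java stand-in for that idea, not Jet code. The helper named mapStateful here, the SumState class and the sample prices are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** Plain-Java stand-in for stateful mapping: one output per input item. */
final class RunningSumSketch {

    /** Mutable state carried from one item to the next. */
    static final class SumState {
        long total;
    }

    /** Creates one state object, then applies mapFn to (state, item) for every item. */
    static <S, T, R> List<R> mapStateful(Supplier<S> createFn,
                                         BiFunction<S, T, R> mapFn,
                                         List<T> items) {
        S state = createFn.get();
        List<R> out = new ArrayList<>();
        for (T item : items) {
            out.add(mapFn.apply(state, item));
        }
        return out;
    }

    /** Emits the sum of all prices seen so far after each price. */
    static List<Long> runningSums(List<Long> prices) {
        return RunningSumSketch.<SumState, Long, Long>mapStateful(
                SumState::new,
                (state, price) -> {
                    state.total += price;
                    return state.total;
                },
                prices);
    }

    public static void main(String[] args) {
        System.out.println(runningSums(List.of(5L, 7L, 3L))); // [5, 12, 15]
    }
}
```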
Example
Suppose the task is as follows:
// Part 2 - Compute sum of trades for 3-second intervals with speculative results every second
// - Use early results when defining the window
// - Watch the early result flag in the console output
We do it like this:
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.SinkStage;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.WindowDefinition;

Pipeline p = Pipeline.create();
SinkStage sinkStage = p.readFrom(TradeSource.tradeSource(1000))
  .withNativeTimestamps(0)
  .window(WindowDefinition.tumbling(3000).setEarlyResultsPeriod(1000))
  .aggregate(AggregateOperations.summingLong(Trade::getPrice))
  .writeTo(Sinks.logger());
sliding method
- The first parameter is the size of the window in milliseconds.
- The second parameter is how far the window slides each time, in milliseconds.
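To see how the two parameters interact, the following plain-Java sketch lists the windows that contain a given event timestamp. It rests on assumptions rather than on Jet code: windows are half-open intervals [start, end), window ends are aligned to multiples of the slide step counted from timestamp 0, and the window size is a whole multiple of the slide step. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch: which sliding windows contain a given timestamp? */
final class SlidingWindowMembership {

    /**
     * Returns the {start, end} bounds of every window [start, end) that contains ts.
     * Assumes window ends fall on multiples of slideBy, counted from timestamp 0.
     */
    static List<long[]> windowsContaining(long ts, long windowSize, long slideBy) {
        if (windowSize <= 0 || slideBy <= 0 || windowSize % slideBy != 0) {
            throw new IllegalArgumentException(
                    "windowSize must be a positive multiple of slideBy");
        }
        List<long[]> windows = new ArrayList<>();
        // Smallest multiple of slideBy that is strictly greater than ts.
        long firstEnd = Math.floorDiv(ts, slideBy) * slideBy + slideBy;
        for (long end = firstEnd; end - windowSize <= ts; end += slideBy) {
            windows.add(new long[] {end - windowSize, end});
        }
        return windows;
    }

    public static void main(String[] args) {
        // 60-second window sliding by 30 seconds, event at t = 45 s.
        for (long[] w : windowsContaining(45_000, 60_000, 30_000)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // prints [0, 60000) and [30000, 90000)
    }
}
```

Under these assumptions every event falls into windowSize / slideBy windows, which is why a 60-second window sliding by 30 seconds counts each event twice.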
Example
We do it like this:
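import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.WindowDefinition;

Pipeline p = Pipeline.create();

// source and Event are not defined in this snippet
p.readFrom(source)
 .withNativeTimestamps(0)
 .groupingKey(Event::getUser)
 // 60-second window that slides forward every 30 seconds
 .window(WindowDefinition.sliding(SECONDS.toMillis(60), SECONDS.toMillis(30)))
 .aggregate(counting())
 .writeTo(Sinks.logger(
   wr -> String.format("got %,d messages in the previous minute from %s.",
     wr.result(), wr.key())));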
tumbling method
Produces one result each time a window of the given size closes; with a 3-second window that means one computation every 3 seconds.
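Because tumbling windows do not overlap, each timestamp belongs to exactly one window. A minimal plain-Java sketch of that assignment follows; it assumes half-open windows [start, end) aligned to multiples of the window size counted from timestamp 0. The class and method names are hypothetical.

```java
/** Plain-Java sketch: the single tumbling window that contains a timestamp. */
final class TumblingWindowBounds {

    /** Start of the window containing ts (inclusive). */
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /** End of the window containing ts (exclusive). */
    static long windowEnd(long ts, long size) {
        return windowStart(ts, size) + size;
    }

    public static void main(String[] args) {
        long size = 3000; // 3-second tumbling window
        for (long ts : new long[] {0, 2999, 3000, 7500}) {
            System.out.println(ts + " -> [" + windowStart(ts, size)
                    + ", " + windowEnd(ts, size) + ")");
        }
        // 0 -> [0, 3000), 2999 -> [0, 3000), 3000 -> [3000, 6000), 7500 -> [6000, 9000)
    }
}
```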
Example
Suppose the task is as follows:
// Part 1 - Compute sum of trades for 3-second intervals
// - Use 3 sec tumbling windows (defined in WindowDef.tumbling with size 3000)
// - Sum trade prices
We do it like this:
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.SinkStage;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.WindowDefinition;

Pipeline p = Pipeline.create();
SinkStage sinkStage = p.readFrom(TradeSource.tradeSource(1000))
  .withNativeTimestamps(0)
  .window(WindowDefinition.tumbling(3000))
  .aggregate(AggregateOperations.summingLong(Trade::getPrice))
  .writeTo(Sinks.logger());

