Şu satırı dahil ederiz
import com.hazelcast.jet.pipeline.WindowDefinition;
Not : Gerçekleştirim için SlidingWindowP sınıfına bakınız
İki tane temel window var. Şeklen şöyle
Bu ikisiyle beraber "early results" kullanılabilir. Açıklaması şöyle
Compute sum of trades for 3-second intervals• Tumbling window• Output each 3 seconds, should be roughly constantCompute sum of trades for 3-second intervals, result so far each second• Tumbling windows with early results• Output each second, grows for 3 seconds and then dropsCompute sum of trades in last 3 seconds, update each second• Sliding windows• Output each second, should be roughly constant
Birincisi sadece tumbling window
İkincisi tumbling window + early results
Üçüncüsü sliding window + early results
setEarlyResultsPeriod metodu
Pencere büyüklüğü olan 3 saniyenin dolmasını beklemeden her 1 saniyede erken sonuçları verir. Açıklaması şöyle
Frequency - When is the result provided?When window closes• All the data in the window were processed• Non-practical for long time ranges (hour and more)In specified time intervals - early resultsWith each item• Use stateful mapping
Örnek
Soru şöyle olsun
// Part 2 - Compute sum of trades for 3-second intervals with speculative results every second// - Use early results when defining the window// - Watch the early result flag in the console output
Şöyle yaparız
import com.hazelcast.jet.aggregate.AggregateOperations; import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.SinkStage; import com.hazelcast.jet.pipeline.Sinks; import com.hazelcast.jet.pipeline.WindowDefinition; Pipeline p = Pipeline.create(); SinkStage sinkStage = p.readFrom(TradeSource.tradeSource(1000)) .withNativeTimestamps(0) .window(WindowDefinition.tumbling(3000).setEarlyResultsPeriod(1000)) .aggregate(AggregateOperations.summingLong(Trade::getPrice)) .writeTo(Sinks.logger());
sliding metodu
- Birinci parametre milisaniye cinsinden pencerenin büyüklüğünü belirtir
- İkinci parametre milisaniye cinsinden pencerenin ne kadar kaydırılacağını belirtir.
Örnek
Şöyle yaparız
Pipeline p = Pipeline.create(); p.readFrom(source) .withNativeTimestamps(0) .groupingKey(Event::getUser) .window(WindowDefinition.sliding(SECONDS.toMillis(60), SECONDS.toMillis(30))) .aggregate(counting()) .writeTo( Sinks.logger( wr -> String.format("got %,d messages in the previous minute from %s.", wr.result(), wr.key()) ) );
tumbling metodu
Pencere büyüklüğü olan 3 saniyede bir hesaplama yapar
Örnek
Soru şöyle olsun
// Part 1 - Compute sum of trades for 3-second intervalsŞöyle yaparız
// - Use 3 sec tumbling windows (defined in WindowDef.tumbling with size 3000
// - Sum trade prices
import com.hazelcast.jet.aggregate.AggregateOperations; import com.hazelcast.jet.pipeline.Pipeline; import com.hazelcast.jet.pipeline.SinkStage; import com.hazelcast.jet.pipeline.Sinks; import com.hazelcast.jet.pipeline.WindowDefinition; Pipeline p = Pipeline.create(); SinkStage sinkStage = p.readFrom(TradeSource.tradeSource(1000)) .withNativeTimestamps(0) .window(WindowDefinition.tumbling(3000)) .aggregate(AggregateOperations.summingLong(Trade::getPrice)) .writeTo(Sinks.logger());
Hiç yorum yok:
Yorum Gönder