Şu satırı dahil ederiz
<dependency> <groupId>com.hazelcast</groupId> <artifactId>hazelcast</artifactId> <version>5.1.4</version> </dependency> <dependency> <groupId>com.hazelcast.jet.contrib</groupId> <artifactId>pulsar</artifactId> <version>0.1</version> </dependency> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>2.10.1</version> </dependency>
Şöyle yaparız
StreamSource<Event>source = PulsarSources.pulsarReaderBuilder( topicName, () -> PulsarClient.builder().serviceUrl(“pulsar://localhost:6650”).build(), () -> Schema.JSON(Event.class), Message::getValue) .build(); Pipeline p = Pipeline.create(); p.readFrom(source) .withNativeTimestamps(0) .groupingKey(Event::getUser) .window(sliding(SECONDS.toMillis(60), SECONDS.toMillis(30))) .aggregate(counting()) .writeTo(Sinks.logger(wr -> String.format( “At %s Pulsar got %,d messages in the previous minute from %s.”, TIME_FORMATTER.format(LocalDateTime.ofInstant( Instant.ofEpochMilli(wr.end()), ZoneId.systemDefault())), wr.result(), wr.key()))); JobConfig cfg = new JobConfig() .setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE) .setSnapshotIntervalMillis(SECONDS.toMillis(1)) .setName(“pulsar-airquality-counter”); HazelcastInstance hz = Hazelcast.bootstrappedInstance(); hz.getJet().newJob(p, cfg);
Hiç yorum yok:
Yorum Gönder