Şu satırı dahil ederiz
<dependency> <groupId>com.hazelcast</groupId> <artifactId>hazelcast</artifactId> <version>5.1.4</version> </dependency> <dependency> <groupId>com.hazelcast.jet.contrib</groupId> <artifactId>pulsar</artifactId> <version>0.1</version> </dependency> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>2.10.1</version> </dependency>
Şöyle yaparız
StreamSource<Event>source = PulsarSources.pulsarReaderBuilder(
topicName,
() -> PulsarClient.builder().serviceUrl(“pulsar://localhost:6650”).build(),
() -> Schema.JSON(Event.class),
Message::getValue)
.build();
Pipeline p = Pipeline.create();
p.readFrom(source)
.withNativeTimestamps(0)
.groupingKey(Event::getUser)
.window(sliding(SECONDS.toMillis(60), SECONDS.toMillis(30)))
.aggregate(counting())
.writeTo(Sinks.logger(wr -> String.format(
“At %s Pulsar got %,d messages in the previous minute from %s.”,
TIME_FORMATTER.format(LocalDateTime.ofInstant(
Instant.ofEpochMilli(wr.end()), ZoneId.systemDefault())),
wr.result(), wr.key())));
JobConfig cfg = new JobConfig()
.setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE)
.setSnapshotIntervalMillis(SECONDS.toMillis(1))
.setName(“pulsar-airquality-counter”);
HazelcastInstance hz = Hazelcast.bootstrappedInstance();
hz.getJet().newJob(p, cfg);
Hiç yorum yok:
Yorum Gönder