Thursday, 16 March 2023

Hazelcast Jet PulsarSources Class

Maven
We add the following dependencies
<dependency>
  <groupId>com.hazelcast</groupId>
  <artifactId>hazelcast</artifactId>
  <version>5.1.4</version>
</dependency>

<dependency>
  <groupId>com.hazelcast.jet.contrib</groupId>
  <artifactId>pulsar</artifactId>
  <version>0.1</version>
</dependency>

<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client</artifactId>
  <version>2.10.1</version>
</dependency>
Example
We do it like this
// Build a streaming source that reads from a Pulsar topic through the Reader API
StreamSource<Event> source = PulsarSources.pulsarReaderBuilder(
  topicName,
  () -> PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build(),
  () -> Schema.JSON(Event.class),
  Message::getValue)
.build();

// Count messages per user over a 60-second window that slides every 30 seconds
Pipeline p = Pipeline.create();
p.readFrom(source)
 .withNativeTimestamps(0)
 .groupingKey(Event::getUser)
 .window(sliding(SECONDS.toMillis(60), SECONDS.toMillis(30)))
 .aggregate(counting())
 .writeTo(Sinks.logger(wr -> String.format(
    "At %s Pulsar got %,d messages in the previous minute from %s.",
      TIME_FORMATTER.format(LocalDateTime.ofInstant(
        Instant.ofEpochMilli(wr.end()), ZoneId.systemDefault())),
      wr.result(), wr.key())));

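// Take a state snapshot every second so the job can recover with exactly-once semantics
JobConfig cfg = new JobConfig()
  .setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE)
  .setSnapshotIntervalMillis(SECONDS.toMillis(1))
  .setName("pulsar-airquality-counter");

// Submit the pipeline as a job to the Hazelcast instance
HazelcastInstance hz = Hazelcast.bootstrappedInstance();
hz.getJet().newJob(p, cfg);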
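
The example above uses TIME_FORMATTER without showing its definition, and it may not be obvious which windows a message is counted in. Below is a minimal plain-Java sketch, with no Hazelcast dependency, that illustrates both. The WindowSketch class, the windowEnds and formatEnd helpers, and the "HH:mm:ss" pattern are assumptions made for illustration and do not come from the original code. The sketch also assumes that window ends are aligned to multiples of the slide step, so a 60-second window sliding by 30 seconds places every message in two overlapping windows.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class WindowSketch {
    // Hypothetical formatter: the original example does not show how TIME_FORMATTER is defined.
    public static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss");

    // Returns the end timestamps of all windows [end - size, end) that contain ts,
    // assuming window ends are aligned to multiples of the slide step.
    public static List<Long> windowEnds(long ts, long size, long slide) {
        List<Long> ends = new ArrayList<>();
        // The first window end strictly greater than ts
        long firstEnd = Math.floorDiv(ts, slide) * slide + slide;
        for (long end = firstEnd; end - size <= ts; end += slide) {
            ends.add(end);
        }
        return ends;
    }

    // Formats a window end the same way the logger sink in the example does,
    // but with an explicit time zone instead of the system default.
    public static String formatEnd(long endMillis, ZoneId zone) {
        return TIME_FORMATTER.format(
            LocalDateTime.ofInstant(Instant.ofEpochMilli(endMillis), zone));
    }

    public static void main(String[] args) {
        // A message with timestamp 45 s, window size 60 s, slide step 30 s
        for (long end : windowEnds(45_000, 60_000, 30_000)) {
            System.out.println("window ending at " + formatEnd(end, ZoneId.of("UTC")));
        }
    }
}
```

With these assumptions, a message stamped at 45 seconds is counted in the windows ending at 60 and 90 seconds, which is why the same message contributes to two consecutive log lines.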



THIRD-PARTY.txt File

The versions of the external libraries used are listed in this file. The file path is hazelcast/licenses/THIRD-PARTY.txt