Tuesday, 22 November 2022

Hazelcast Jet KafkaSources Class - Kafka Topic Consumer

Introduction
We import the class with the following line
import com.hazelcast.jet.kafka.KafkaSources;
Maven
We add the following dependency
<dependency>
  <groupId>com.hazelcast.jet</groupId>
  <artifactId>hazelcast-jet-kafka</artifactId>
  <version>5.0</version>
</dependency>
The signatures are as follows. To see how topic partition information is passed, see the TopicsConfig post.
public static <K, V> StreamSource<Entry<K, V>> kafka(
  Properties properties,
  String ... topics
)

public static <K, V> StreamSource<Entry<K, V>> kafka(
  DataConnectionRef dataConnectionRef,
  String ... topics
)

public static <K, V, T> StreamSource<T> kafka(
  Properties properties,
  FunctionEx<ConsumerRecord<K, V>, T> projectionFn,
  TopicsConfig topicsConfig
)

public static <K, V, T> StreamSource<T> kafka(
  Properties properties,
  FunctionEx<ConsumerRecord<K, V>, T> projectionFn,
  String ... topics
)

public static <K, V, T> StreamSource<T> kafka(
  DataConnectionRef dataConnectionRef,
  FunctionEx<ConsumerRecord<K, V>, T> projectionFn,
  String ... topics
)
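The Properties argument taken by several of these overloads is a plain java.util.Properties object holding standard Kafka consumer settings. As a minimal sketch, assuming you would rather keep those settings as text than hard-code them, the object can be loaded with the JDK alone. The class name KafkaPropsSketch, the CONFIG constant and the load helper are hypothetical names used only for illustration; the keys and values are the same ones used in the example further down.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class KafkaPropsSketch {
    // Hypothetical configuration text; in practice it could be read from a file.
    static final String CONFIG = String.join("\n",
        "bootstrap.servers=localhost:9092",
        "key.deserializer=org.apache.kafka.common.serialization.LongDeserializer",
        "value.deserializer=org.apache.kafka.common.serialization.StringDeserializer",
        "auto.offset.reset=earliest");

    // Parses key=value lines into a Properties object.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // Reading from an in-memory string should not fail; rethrow unchecked.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(CONFIG);
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

The resulting object can then be passed as the first argument of KafkaSources.kafka(...), in place of a hand-built Properties instance.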
kafka method - Properties + topic names
Example
We do it like this
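import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
import static com.hazelcast.jet.pipeline.WindowDefinition.sliding;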

public class JetJob {
  static final DateTimeFormatter TIME_FORMATTER =
            DateTimeFormatter.ofPattern("HH:mm:ss:SSS");

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.readFrom(KafkaSources.kafka(kafkaProps(), "tweets"))
      .withNativeTimestamps(0)
      .window(sliding(1_000, 500))
      .aggregate(counting())
      .writeTo(Sinks.logger(wr -> String.format(
                 "At %s Kafka got %,d tweets per second",
                 TIME_FORMATTER.format(LocalDateTime.ofInstant(
                         Instant.ofEpochMilli(wr.end()), ZoneId.systemDefault())),
                 wr.result())));

    JobConfig cfg = new JobConfig().setName("kafka-traffic-monitor");
    HazelcastInstance hz = Hazelcast.bootstrappedInstance();
    hz.getJet().newJob(p, cfg);
  }

  private static Properties kafkaProps() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("key.deserializer", LongDeserializer.class.getCanonicalName());
    props.setProperty("value.deserializer", StringDeserializer.class.getCanonicalName());
    props.setProperty("auto.offset.reset", "earliest");
    return props;
  }
}
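The job above uses sliding(1_000, 500), that is, a 1000 ms window that advances every 500 ms, so each record is counted in two overlapping windows. The plain-Java sketch below only illustrates that arithmetic and is not Hazelcast code. It assumes that a window with end E covers the half-open interval [E - size, E) and that window ends fall on multiples of the slide step; SlidingWindowSketch and windowEnds are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {
    // Returns the end timestamps of every window [end - size, end) that
    // contains ts, assuming window ends are aligned to multiples of slide.
    static List<Long> windowEnds(long ts, long size, long slide) {
        List<Long> ends = new ArrayList<>();
        // First aligned window end strictly greater than ts.
        long firstEnd = Math.floorDiv(ts, slide) * slide + slide;
        // Keep adding windows while their start (end - size) is not after ts.
        for (long end = firstEnd; end - size <= ts; end += slide) {
            ends.add(end);
        }
        return ends;
    }

    public static void main(String[] args) {
        // A record stamped 1250 ms falls into [500, 1500) and [1000, 2000).
        System.out.println(windowEnds(1_250, 1_000, 500)); // prints [1500, 2000]
    }
}
```

Under these assumptions, the value that wr.end() returns in the logger lambda corresponds to one of these window end timestamps, which is why the log line reports a count roughly every 500 ms.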


THIRD-PARTY.txt File

The versions of the external libraries used are listed in this file. Its path is hazelcast/licenses/THIRD-PARTY.txt