We include the following import
import com.hazelcast.jet.kafka.KafkaSources;
Maven
We add the following dependency
<dependency>
  <groupId>com.hazelcast.jet</groupId>
  <artifactId>hazelcast-jet-kafka</artifactId>
  <version>5.0</version>
</dependency>
The signatures are as follows. To see how topic partition information is passed, see the TopicsConfig post
public static <K, V> StreamSource<Entry<K, V>> kafka(
  Properties properties,
  String ... topics
)

public static <K, V> StreamSource<Entry<K, V>> kafka(
  DataConnectionRef dataConnectionRef,
  String ... topics
)

public static <K, V, T> StreamSource<T> kafka(
  Properties properties,
  FunctionEx<ConsumerRecord<K, V>, T> projectionFn,
  TopicsConfig topicsConfig
)

public static <K, V, T> StreamSource<T> kafka(
  Properties properties,
  FunctionEx<ConsumerRecord<K, V>, T> projectionFn,
  String ... topics
)

public static <K, V, T> StreamSource<T> kafka(
  DataConnectionRef dataConnectionRef,
  FunctionEx<ConsumerRecord<K, V>, T> projectionFn,
  String ... topics
)
kafka method - Properties + topic names
Example
We do it like this
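import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.kafka.KafkaSources;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

import static com.hazelcast.jet.aggregate.AggregateOperations.counting;
import static com.hazelcast.jet.pipeline.WindowDefinition.sliding;

public class JetJob {
  static final DateTimeFormatter TIME_FORMATTER =
    DateTimeFormatter.ofPattern("HH:mm:ss:SSS");

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.readFrom(KafkaSources.kafka(kafkaProps(), "tweets"))
      // Use the timestamps carried by the Kafka records, allowing no lag
      .withNativeTimestamps(0)
      // 1-second window that slides forward every 500 ms
      .window(sliding(1_000, 500))
      .aggregate(counting())
      .writeTo(Sinks.logger(wr -> String.format(
        "At %s Kafka got %,d tweets per second",
        TIME_FORMATTER.format(LocalDateTime.ofInstant(
          Instant.ofEpochMilli(wr.end()), ZoneId.systemDefault())),
        wr.result())));

    JobConfig cfg = new JobConfig().setName("kafka-traffic-monitor");
    HazelcastInstance hz = Hazelcast.bootstrappedInstance();
    hz.getJet().newJob(p, cfg);
  }

  private static Properties kafkaProps() {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("key.deserializer",
      LongDeserializer.class.getCanonicalName());
    props.setProperty("value.deserializer",
      StringDeserializer.class.getCanonicalName());
    // Start from the oldest available offset when no committed offset exists
    props.setProperty("auto.offset.reset", "earliest");
    return props;
  }
}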
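To see what window(sliding(1_000, 500)) combined with counting() roughly computes, here is a plain-Java sketch that needs neither Hazelcast nor Kafka. It is only an illustration, not how Jet implements windowing. It assumes that each window is the half-open interval [end - size, end) and that window ends fall on multiples of the slide step. SlidingWindowSketch and countPerWindow are hypothetical names and are not part of the Jet API.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Hazelcast Jet: counts events per sliding window.
class SlidingWindowSketch {

  // Returns a map of window end timestamp -> number of events in [end - size, end).
  // Assumes size and slide are positive and window ends are multiples of slide.
  static Map<Long, Long> countPerWindow(List<Long> timestamps, long size, long slide) {
    Map<Long, Long> counts = new TreeMap<>();
    for (long ts : timestamps) {
      // Smallest multiple of slide that is strictly greater than ts
      long firstEnd = Math.floorDiv(ts, slide) * slide + slide;
      // ts belongs to every window whose end satisfies ts < end <= ts + size
      for (long end = firstEnd; end <= ts + size; end += slide) {
        counts.merge(end, 1L, Long::sum);
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    // Event timestamps in milliseconds (made-up sample data)
    List<Long> timestamps = List.of(100L, 200L, 700L, 1200L);
    // Same parameters as sliding(1_000, 500) in the example above
    System.out.println(countPerWindow(timestamps, 1_000, 500));
    // prints {500=2, 1000=3, 1500=2, 2000=1}
  }
}
```

With a size of 1000 and a slide of 500, every event is counted in two overlapping windows, which is why the job above logs a new per-second count every half second.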