We include the following line
import com.hazelcast.jet.kafka.connect.impl.KafkaConnectSource;
Example
We do it like this
Properties properties = new Properties();
properties.setProperty("name", "confluentinc-kafka-connect-jdbc");
properties.setProperty("connector.class", "io.confluent.connect.jdbc.JdbcSourceConnector");
properties.setProperty("mode", "incrementing");
properties.setProperty("tasks.max", "1");

String connectionUrl = ...
properties.setProperty("connection.url", connectionUrl);
properties.setProperty("connection.user", USERNAME);
properties.setProperty("connection.password", PASSWORD);
properties.setProperty("incrementing.column.name", "id");
properties.setProperty("table.whitelist", "dynamic_test_items1,dynamic_test_items2");
properties.setProperty("table.poll.interval.ms", "5000");

Pipeline pipeline = Pipeline.create();
StreamStage<String> streamStage = pipeline.readFrom(KafkaConnectSources.connect(properties))
  .withoutTimestamps()
  ...
TaskRunner Class
This class holds an org.apache.kafka.connect.source.SourceTask object and calls its stop(), poll(), commit() and commitRecord() methods. In other words, it is essentially a facade over the SourceTask object.
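The facade idea can be sketched roughly as follows. This is not Hazelcast's actual TaskRunner code: SimpleSourceTask is a hypothetical stand-in for SourceTask, reduced to the four calls named above, and RecordingTask is an in-memory test double invented for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class TaskRunnerSketch {

    // Hypothetical stand-in for org.apache.kafka.connect.source.SourceTask,
    // reduced to the four calls mentioned in the text.
    interface SimpleSourceTask {
        List<String> poll();
        void commitRecord(String record);
        void commit();
        void stop();
    }

    // Facade: callers talk to the runner and never touch the task directly.
    static final class TaskRunner {
        private final SimpleSourceTask task;
        private boolean stopped;

        TaskRunner(SimpleSourceTask task) {
            this.task = task;
        }

        // Returns an empty list instead of null, and never polls a stopped task.
        List<String> poll() {
            if (stopped) {
                return List.of();
            }
            List<String> records = task.poll();
            return records == null ? List.of() : records;
        }

        // Acknowledges each record, then commits once for the whole batch.
        void commit(List<String> records) {
            for (String record : records) {
                task.commitRecord(record);
            }
            task.commit();
        }

        // Stops the underlying task at most once.
        void stop() {
            if (!stopped) {
                stopped = true;
                task.stop();
            }
        }
    }

    // Test double (assumption): records every call and serves one batch of two rows.
    static final class RecordingTask implements SimpleSourceTask {
        final List<String> calls = new ArrayList<>();
        private boolean drained;

        @Override
        public List<String> poll() {
            calls.add("poll");
            if (drained) {
                return null;
            }
            drained = true;
            return List.of("row-1", "row-2");
        }

        @Override
        public void commitRecord(String record) {
            calls.add("commitRecord:" + record);
        }

        @Override
        public void commit() {
            calls.add("commit");
        }

        @Override
        public void stop() {
            calls.add("stop");
        }
    }

    public static void main(String[] args) {
        RecordingTask task = new RecordingTask();
        TaskRunner runner = new TaskRunner(task);
        List<String> records = runner.poll();
        runner.commit(records);
        runner.stop();
        System.out.println(task.calls);
    }
}
```

The point of the facade here is that null handling and the stop-once guard live in one place, so the reader code does not need to know the task's lifecycle rules.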
ReadKafkaConnectP Class
It reads records by using the TaskRunner object.
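A minimal sketch of such a reader is given below. It does not reproduce ReadKafkaConnectP itself: the Reader class, its readInto method and the capacity parameter are all hypothetical, and a plain Supplier stands in for TaskRunner.poll(). The assumption is that the reader polls only when it has nothing left over from the previous batch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

public class ReaderSketch {

    // Hypothetical reader: 'poller' stands in for TaskRunner.poll().
    static final class Reader {
        private final Supplier<List<String>> poller;
        // Records that were polled but not yet handed downstream.
        private final Queue<String> pending = new ArrayDeque<>();

        Reader(Supplier<List<String>> poller) {
            this.poller = poller;
        }

        // Moves up to 'capacity' records into 'out' and returns how many were moved.
        // A new batch is polled only when no records are pending.
        int readInto(List<String> out, int capacity) {
            if (pending.isEmpty()) {
                List<String> batch = poller.get();
                if (batch != null) {
                    pending.addAll(batch);
                }
            }
            int emitted = 0;
            while (emitted < capacity && !pending.isEmpty()) {
                out.add(pending.remove());
                emitted++;
            }
            return emitted;
        }
    }

    public static void main(String[] args) {
        Reader reader = new Reader(() -> List.of("a", "b", "c"));
        List<String> out = new ArrayList<>();
        reader.readInto(out, 2); // polls once, emits two records
        reader.readInto(out, 2); // emits the leftover record without polling
        System.out.println(out);
    }
}
```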
ConnectorWrapper
It stores all TaskRunner instances in a list. It also holds an org.apache.kafka.connect.source.SourceConnector object.
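The relationship between the wrapper, the connector and the runners might look like the sketch below. Everything here is an assumption made for illustration rather than the real Hazelcast class: SimpleConnector is a hypothetical stand-in for SourceConnector, TwoTableConnector is an invented connector that splits the two tables from the example across tasks, and creating all runners eagerly in the constructor is a simplification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ConnectorWrapperSketch {

    // Hypothetical stand-in for org.apache.kafka.connect.source.SourceConnector.
    interface SimpleConnector {
        List<Map<String, String>> taskConfigs(int maxTasks);
        void stop();
    }

    // Simplified runner: only remembers its task configuration and stop state.
    static final class TaskRunner {
        final Map<String, String> config;
        boolean stopped;

        TaskRunner(Map<String, String> config) {
            this.config = config;
        }

        void stop() {
            stopped = true;
        }
    }

    // Holds the connector plus one runner per task configuration.
    static final class ConnectorWrapper {
        private final SimpleConnector connector;
        private final List<TaskRunner> taskRunners = new ArrayList<>();

        ConnectorWrapper(SimpleConnector connector, int tasksMax) {
            this.connector = connector;
            for (Map<String, String> config : connector.taskConfigs(tasksMax)) {
                taskRunners.add(new TaskRunner(config));
            }
        }

        // Read-only snapshot of the runner list.
        List<TaskRunner> taskRunners() {
            return List.copyOf(taskRunners);
        }

        // Stops every runner first, then the connector itself.
        void stop() {
            taskRunners.forEach(TaskRunner::stop);
            connector.stop();
        }
    }

    // Invented connector: distributes two tables round-robin over at most maxTasks tasks.
    static final class TwoTableConnector implements SimpleConnector {
        boolean stopped;

        @Override
        public List<Map<String, String>> taskConfigs(int maxTasks) {
            List<String> tables = List.of("dynamic_test_items1", "dynamic_test_items2");
            int groups = Math.max(1, Math.min(maxTasks, tables.size()));
            List<List<String>> buckets = new ArrayList<>();
            for (int i = 0; i < groups; i++) {
                buckets.add(new ArrayList<>());
            }
            for (int i = 0; i < tables.size(); i++) {
                buckets.get(i % groups).add(tables.get(i));
            }
            List<Map<String, String>> configs = new ArrayList<>();
            for (List<String> bucket : buckets) {
                configs.add(Map.of("tables", String.join(",", bucket)));
            }
            return configs;
        }

        @Override
        public void stop() {
            stopped = true;
        }
    }

    public static void main(String[] args) {
        ConnectorWrapper wrapper = new ConnectorWrapper(new TwoTableConnector(), 1);
        for (TaskRunner runner : wrapper.taskRunners()) {
            System.out.println(runner.config);
        }
        wrapper.stop();
    }
}
```

With a task limit of 1, as in the example properties, this sketch produces a single runner that covers both tables; raising the limit to 2 gives one runner per table.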