Introduction
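We add the following imports
import com.hazelcast.jet.Jet;
import com.hazelcast.jet.cdc.CdcSinks;
import com.hazelcast.jet.cdc.ChangeRecord;
import com.hazelcast.jet.cdc.mysql.MySqlCdcSources;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.StreamSource;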
We do it like this
// CDC source that streams change events from the MySQL "inventory" database
StreamSource<ChangeRecord> source = MySqlCdcSources.mysql("source")
  .setDatabaseAddress("127.0.0.1")
  .setDatabasePort(3306)
  .setDatabaseUser("dbUser")
  .setDatabasePassword("pw")
  .setClusterName("dbServer1")
  .setDatabaseWhitelist("inventory")
  .setTableWhitelist("inventory.customers")
  .build();

// Read the change stream, log each record, and mirror it into the "customers" map
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(source)
  .withoutTimestamps()
  .peek()
  .writeTo(CdcSinks.map("customers",
    r -> r.key().toMap().get("id"),                        // map key: the row's primary key
    r -> r.value().toObject(Customer.class).toString()));  // map value: the row as text

// Submit the job to the Jet cluster
JobConfig cfg = new JobConfig();
Jet.bootstrappedInstance().newJob(pipeline, cfg);
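The pipeline above refers to a Customer class that the post does not show. A minimal sketch of what it might look like follows. The field names are an assumption, based on the customers table of the Debezium sample inventory database (id, first_name, last_name, email). Depending on the Jet version, binding column names such as first_name to Java fields may also need mapping annotations (for example Jackson's @JsonProperty), which are left out here so the sketch stays dependency-free.

```java
// Hypothetical value class for rows of inventory.customers.
// Field names are assumed, not taken from the original post.
class Customer {
    public int id;
    public String firstName;
    public String lastName;
    public String email;

    // The pipeline stores the result of toString() as the map value.
    @Override
    public String toString() {
        return "Customer{id=" + id
            + ", firstName='" + firstName + '\''
            + ", lastName='" + lastName + '\''
            + ", email='" + email + '\''
            + '}';
    }
}
```

With a class like this, every entry in the "customers" map would hold the text form of the latest state of the corresponding row.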