Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.pipeline.Sinks;
Stream olarak akan veriyi bir yere yazan Sink nesnesi yaratmak için kullanılır
jdbc metodu - SQL + Supplier + BiConsumerEx
Verilen SQL'i çalıştırır. Supplier ile DataSource nesnesi verilir. BiConsumerEx ile Prepared Statement parametreleri atanır
Örnek
Şöyle yaparız. IntStream'i, SimpleImmutableEntry nesnesine çevirir ve bunun taşıdığı değerleri veri tabanına yazar
//CREATE TABLE foo (id INT, name VARCHAR(255)) Pipeline p = Pipeline.create(); p.readFrom(TestSources.items(IntStream.range(0, 10).boxed().toArray(Integer[]::new))) .map(item -> new java.util.SimpleImmutableEntry<>(item, item.toString())) .writeTo(Sinks.jdbc("INSERT INTO foo VALUES(?, ?)", () -> createDataSource(), (stmt, item) -> { stmt.setInt(1, item.getKey()); stmt.setString(1, item.getValue()); } )); HazelcastInstance instance = ... instance.getJet().newJob(p).join();
jdbc metodu - SQL + DataLinkRef + BiConsumerEx
Verilen SQL'i çalıştırır. DataLinkRef ile konfigürasyonda tanımlanan DataSource nesnesi verilir. BiConsumerEx ile Prepared Statement parametreleri atanır
Örnek
Şöyle yaparız. IntStream'i, SimpleImmutableEntry nesnesine çevirir ve bunun taşıdığı değerleri veri tabanına yazar
//CREATE TABLE foo (id INT, name VARCHAR(255)) Pipeline p = Pipeline.create(); p.readFrom(TestSources.items(IntStream.range(0, 10).boxed().toArray(Integer[]::new))) .map(item -> new java.util.SimpleImmutableEntry<>(item, item.toString())) .writeTo(Sinks.jdbc("INSERT INTO foo VALUES(?, ?)", DataLinkRef::dataLinkRef("my-jdbc-data-store"), (stmt, item) -> { stmt.setInt(1, item.getKey()); stmt.setString(1, item.getValue()); } )); HazelcastInstance instance = ... instance.getJet().newJob(p).join();
list metodu
Örnek
Şöyle yaparız
Pipeline p = Pipeline.create(); p.readFrom(KafkaSources.kafka(properties, "mytopic")) .withoutTimestamps() .writeTo(Sinks.list("mylist"));
mapWithMerging metodu - mapName + toKeyFn + toValueFn + mergeFn
Hazelcast map'ine yazar. Map.merge() metodu gibidir, aynı key varsa value'ları birleştirmeye imkan tanır
Örnek
Şöyle yaparız
Pipeline p = Pipeline.create(); p.readFrom(Sources.<String, User>map("userCache")) .map(user -> entry(user.country(), user)) .writeTo(Sinks.mapWithMerging("usersByCountry", e -> e.getKey(), e -> e.getValue().name(), (oldValue, newValue) -> oldValue + ", " + newValue) );
Hazelcast map'ine yazar. Belirtilen key ve value metodlarını çağırır
Örnek
Şöyle yaparız
.writeTo(Sinks.mapWithUpdating("mymap", // Key (Tuple2<String, Integer> tuple2) -> tuple2.f0(), // Update value (Integer oldValue, Tuple2<String, Integer> tuple2) -> oldValue == null ? tuple2.f1() : oldValue + tuple2.f1()));
Hiç yorum yok:
Yorum Gönder