5 Ekim 2022 Çarşamba

Hazelcast Jet Sinks Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.pipeline.Sinks;
Stream olarak akan veriyi bir yere yazan Sink nesnesi yaratmak için kullanılır

jdbc metodu - SQL + Supplier + BiConsumerEx
Verilen SQL'i çalıştırır. Supplier ile DataSource nesnesi verilir. BiConsumerEx ile Prepared Statement parametreleri atanır
Örnek
Şöyle yaparız. IntStream'i, SimpleImmutableEntry nesnesine çevirir ve bunun taşıdığı değerleri veri tabanına yazar
//CREATE TABLE foo (id INT, name VARCHAR(255))

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(IntStream.range(0, 10).boxed().toArray(Integer[]::new)))
  .map(item -> new java.util.SimpleImmutableEntry<>(item, item.toString()))
  .writeTo(Sinks.jdbc("INSERT INTO foo VALUES(?, ?)",
                      () -> createDataSource(),
                      (stmt, item) -> {
	                 stmt.setInt(1, item.getKey());
                         stmt.setString(1,  item.getValue());
                      }
));

HazelcastInstance instance = ...
instance.getJet().newJob(p).join();
jdbc metodu - SQL + DataLinkRef + BiConsumerEx 
Verilen SQL'i çalıştırır. DataLinkRef  ile konfigürasyonda tanımlanan DataSource nesnesi verilir. BiConsumerEx ile Prepared Statement parametreleri atanır
Örnek
Şöyle yaparız. IntStream'i, SimpleImmutableEntry nesnesine çevirir ve bunun taşıdığı değerleri veri tabanına yazar
//CREATE TABLE foo (id INT, name VARCHAR(255))

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items(IntStream.range(0, 10).boxed().toArray(Integer[]::new)))
  .map(item -> new java.util.SimpleImmutableEntry<>(item, item.toString()))
  .writeTo(Sinks.jdbc("INSERT INTO foo VALUES(?, ?)",
                      DataLinkRef::dataLinkRef("my-jdbc-data-store"),
                      (stmt, item) -> {
	                 stmt.setInt(1, item.getKey());
                         stmt.setString(1,  item.getValue());
                      }
));

HazelcastInstance instance = ...
instance.getJet().newJob(p).join();
list metodu
Örnek
Şöyle yaparız
Pipeline p = Pipeline.create();
p.readFrom(KafkaSources.kafka(properties, "mytopic"))
  .withoutTimestamps()
  .writeTo(Sinks.list("mylist"));

mapWithMerging metodu - mapName + toKeyFn + toValueFn + mergeFn
Hazelcast map'ine yazar. Map.merge() metodu gibidir, aynı key varsa value'ları birleştirmeye imkan tanır
Örnek
Şöyle yaparız
Pipeline p = Pipeline.create();
p.readFrom(Sources.<String, User>map("userCache"))
 .map(user -> entry(user.country(), user))
 .writeTo(Sinks.mapWithMerging("usersByCountry",
    e -> e.getKey(),
    e -> e.getValue().name(),
    (oldValue, newValue) -> oldValue + ", " + newValue)
  );
mapWithUpdating metodu - mapName + toKeyFn + updateFn
Hazelcast map'ine yazar. Belirtilen key ve value metodlarını çağırır

Örnek
Şöyle yaparız
.writeTo(Sinks.mapWithUpdating("mymap",
  // Key
  (Tuple2<String, Integer> tuple2) -> tuple2.f0(),
  // Update value
  (Integer oldValue, Tuple2<String, Integer> tuple2) -> 
    oldValue == null ? tuple2.f1() : oldValue + tuple2.f1()));


Hiç yorum yok:

Yorum Gönder

THIRD-PARTY.txt Dosyası

Kullanılan harici kütüphanelerin sürümleri bu dosyada Dosyanın yolu şöyle hazelcast/licenses/THIRD-PARTY.txt