28 Kasım 2022 Pazartesi

Hazelcast Jet WriteJdbcP Sınıfı - JDBC Sink

Giriş
Şu satırı dahil ederiz 
import com.hazelcast.jet.impl.connector.WriteJdbcP;
Processor açısından bakarsak en önemli metodlar şöyle
1. init()
Yeni bir JDBC Connection nesnesi alır

2. process()
Inbox'tan yeni bir nesne alır ve onu veri tabanına yazar

3. close()
JDBC Connection nesnesini kapatır

Nesneyi Yaratmak
Bu nesneyi yaratmanın 2 yolu var.
1. SqlService.execute ile
2. Sinks vs gibi bir builder kod ile. Bu durumda metaSupplier metodu çağrılır

1. SqlService.execute
Eğer belirtilen SQL cümlesi "DELETE ..." ise DeleteProcessorSupplier tarafından yaratılır

metaSupplier metodu
Kodla yaratacaksak çağrı sırası şöyle
Sinks.jdbc()
  JdbcSinkBuilder.build()
    SinkProcessors.writeJdbcP()
      WriteJdbcP.metaSupplier()
Kod şöyleProcessorMetaSupplier döndürür. 
1- ProcessorMetaSupplier sınıfı ProcessorSupplier  döndürür
2. ProcessorSupplier İse Processor Yaratır
3. Yani isimlendirme saçma sapan. Kısaca ProcessorMetaSupplier aslında bir Processor factory. 

Koddaki ProcessorMetaSupplier.preferLocalParallelismOne() ile bir MetaSupplierFromProcessorSupplier nesnesi döndürür. Bu nesne de hep aynı ProcessorSupplier'ı döndürür. Yani DataSource nesnesini ProcessorSupplier yönetiyor
public static <T> ProcessorMetaSupplier metaSupplier(
  String jdbcUrl,
  String updateQuery,
  FunctionEx<ProcessorMetaSupplier.Context, ? extends CommonDataSource> 
    dataSourceSupplier,
  BiConsumerEx<? super PreparedStatement, ? super T> bindFn,
  boolean exactlyOnce,
  int batchLimit
) {
  ...
  return ProcessorMetaSupplier.preferLocalParallelismOne(
    ConnectorPermission.jdbc(jdbcUrl, ACTION_WRITE),
    new ProcessorSupplier() {
      private transient CommonDataSource dataSource;
      @Override
      public void init(@Nonnull Context context) {
        dataSource = dataSourceSupplier.apply(context);
      }
      @Override
      public void close(Throwable error) throws Exception {
        if (dataSource instanceof CloseableDataSource) {
          ((CloseableDataSource) dataSource).close();
        }
      }
      @Override
      public Collection<? extends Processor> get(int localParallelism) {
        return IntStream.range(0, count)
          .mapToObj(i -> new WriteJdbcP<>(updateQuery, dataSource, bindFn,
                                        exactlyOnce, batchLimit))
          .collect(Collectors.toList());
      }
     ...
    });
}
Birinci parametre jdbc url
İkinci parametre SQL
Üçüncü parametre DataSource döndüren metod

Hiç yorum yok:

Yorum Gönder

THIRD-PARTY.txt Dosyası

Kullanılan harici kütüphanelerin sürümleri bu dosyada Dosyanın yolu şöyle hazelcast/licenses/THIRD-PARTY.txt