Şu satırı dahil ederiz
import com.hazelcast.jet.impl.connector.WriteJdbcP;
Processor açısından bakarsak en önemli metodlar şöyle
1. init()
Yeni bir JDBC Connection nesnesi alır
2. process()
Inbox'tan yeni bir nesne alır ve onu veri tabanına yazar
3. close()
JDBC Connection nesnesini kapatır
Nesneyi Yaratmak
Bu nesneyi yaratmanın 2 yolu var.
1. SqlService.execute ile
2. Sinks vs gibi bir builder kod ile. Bu durumda metaSupplier metodu çağrılır
1. SqlService.execute
Eğer belirtilen SQL cümlesi "DELETE ..." ise DeleteProcessorSupplier tarafından yaratılır
metaSupplier metodu
Kodla yaratacaksak çağrı sırası şöyle
Sinks.jdbc() JdbcSinkBuilder.build() SinkProcessors.writeJdbcP() WriteJdbcP.metaSupplier()
Kod şöyle. ProcessorMetaSupplier döndürür.
1- ProcessorMetaSupplier sınıfı ProcessorSupplier döndürür
2. ProcessorSupplier İse Processor Yaratır
3. Yani isimlendirme saçma sapan. Kısaca ProcessorMetaSupplier aslında bir Processor factory.
Koddaki ProcessorMetaSupplier.preferLocalParallelismOne() ile bir MetaSupplierFromProcessorSupplier nesnesi döndürür. Bu nesne de hep aynı ProcessorSupplier'ı döndürür. Yani DataSource nesnesini ProcessorSupplier yönetiyor
public static <T> ProcessorMetaSupplier metaSupplier( String jdbcUrl, String updateQuery, FunctionEx<ProcessorMetaSupplier.Context, ? extends CommonDataSource> dataSourceSupplier, BiConsumerEx<? super PreparedStatement, ? super T> bindFn, boolean exactlyOnce, int batchLimit ) { ... return ProcessorMetaSupplier.preferLocalParallelismOne( ConnectorPermission.jdbc(jdbcUrl, ACTION_WRITE), new ProcessorSupplier() { private transient CommonDataSource dataSource; @Override public void init(@Nonnull Context context) { dataSource = dataSourceSupplier.apply(context); } @Override public void close(Throwable error) throws Exception { if (dataSource instanceof CloseableDataSource) { ((CloseableDataSource) dataSource).close(); } } @Override public Collection<? extends Processor> get(int localParallelism) { return IntStream.range(0, count) .mapToObj(i -> new WriteJdbcP<>(updateQuery, dataSource, bindFn, exactlyOnce, batchLimit)) .collect(Collectors.toList()); } ... }); }
Birinci parametre jdbc url
İkinci parametre SQL
Üçüncü parametre DataSource döndüren metod
Hiç yorum yok:
Yorum Gönder