31 Ocak 2023 Salı

Hazelcast Jet JdbcSqlConnector Arayüzü - Stateless Çalışır

Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.sql.impl.connector.jdbc.JdbcSqlConnector;
Not : Aslında bu sınıf biraz kötü bir tasarım çünkü Single Responsibility kuralını ihlal ediyor. Çünkü bu sınıf
1. Hem mapping yaratılırken veri tabanındaki tabloyu keşfediyor
2. Hem Hazelcast SQL cümlelerini çalıştırıyor

SqlConnector arayüzünden kalıtır. Mapping type olarak JDBC kullanan mapping'lerle ilgilenir Kodu şöyle
public class JdbcSqlConnector implements SqlConnector {
  public static final String TYPE_NAME = "JDBC";
  ...
}
- Hazelcast SQL cümlelerini gerçek veri tabanına gönderilecek SQL cümlesi haline getirir. 
- Gerçek SQL cümlesi haline getirme işini builder sınıflar yapar. Bu sınıflar şöyle
  • DeleteQueryBuilder
  • InsertQueryBuilder 
  • SelectQueryBuilder, 
  • UpdateQueryBuilder

Gerçek SQL cümlesini şu metodlar yerine getirir. Bu sınıflar yeni bir processor nesnesi yaratıp çalışması için Jet Engine'e verirler.
  • deleteProcessor()
  • insertProcessor()
  • fullScanReade()
  • updateProcessor()
  • sinkProcessor()

createTable metodu
İmzası şöyle
public Table createTable(
            NodeEngine nodeEngine,
            String schemaName,
            String mappingName,
            String[] externalName,
            String dataConnectionName,
            Map<String, String> options,
            List<MappingField> resolvedFields)
TableResolverImpl tarafından çağrılır. Tanımlı olan her mapping için bir tane JdbcTable yaratır.

deleteProcessor metodu
İmzası şöyle. DeleteProcessorSupplier aracılığıyla WriteJdbcP yaratır
public Vertex deleteProcessor(DagBuildContext context)
fullScanReader metodu
İmzası şöyle. SelectProcessorSupplier aracılığıyla  ReadJdbcP yaratır
public Vertex fullScanReader(
  DagBuildContext context,
  HazelcastRexNode predicate,
  List<HazelcastRexNode> projection,
  FunctionEx<ExpressionEvalContext, EventTimePolicy<JetSqlRow>> eventTimePolicyProvider
)
insertProcessor metodu
İmzası şöyle. InsertProcessorSupplier aracılığıyla WriteJdbcP yaratır
public VertexWithInputConfig insertProcessor(DagBuildContext context)
InsertQueryBuilder nesnesini kullanarak JdbcTable nesnesinde belirtilen tüm sütunları kullanan bir INSERT cümlesi üretir. Bu cümle bir InsertProcessorSupplier nesnesine geçilir ve InsertProcessorSupplier da bir Vertex nesnesine eklenir.

updateProcessor metodu
İmzası şöyle. UpdateProcessorSupplier aracılığıyla  WriteJdbcP yaratır
public Vertex updateProcessor(
            DagBuildContext context,
            List<String> fieldNames,
            List<HazelcastRexNode> expressions)
resolveAndValidateFields metodu
İmzası şöyle
public List<MappingField> resolveAndValidateFields(
  NodeEngine nodeEngine,
  Map<String, String> options,
  List<MappingField> userFields,
  String[] externalName,
  String dataConnectionName)
TableResolverImpl tarafından "CREATE MAPPING ..." SQL cümlesi için çağrılır. Belirtilen mapping için mapping yaratılırken external name ile belirtilen tabloya ait sütunları keşfeder. Çağrı sırası kabaca şöyle
SqlService.execute(String sql, Object... arguments)
SqlServiceImpl.execute(SqlStatement statement)
SqlServiceImpl.execute(SqlStatement statement, SqlSecurityContext securityContext)
...
SqlServiceImpl.execute(SqlStatement statement, SqlSecurityContext securityContext,
            QueryId queryId,
            boolean skipStats
)
SqlPlanImpl.execute(QueryId queryId, List<Object> arguments, long timeout)
PlanExecutor.execute(CreateMappingPlan plan)
TableResolverImpl.createMapping(Mapping mapping, boolean replace, boolean ifNotExists)
TableResolverImpl.resolveMapping(Mapping mapping)
Eğer userFields belirtilmişse sadece bu sütunla keşfedilir. Belirtilmemişse tüm sütunlar keşfedilir. 
Keşif sırasında tabloya ait bilgiler DbField denilen geçici bir nesne içinde saklanır. Daha sonra DbField nesnesi, MappingField nesnesine çevrilir. Metod bir List<MappingField> döndürür.

resolveDialect metodu
Bir tane JDBC DataSource nesnesi yaratır. Bu DataSource nesnesinden JDBC Connection nesnesi alır. Connection nesnesinden de DatabaseMetaData nesnesi alır. Sonra Apache Calcite kütüphanesine bu DatabaseMetaData nesnesi için bir SqlDialect nesnesi üretmesini söyler.









Hazelcast Jet ClusterProcessorSupplier Sınıfı - Processor Factory

Giriş
Şu satırı dahil ederiz 
import com.hazelcast.jet.impl.connector.StreamEventJournalP;
Cluster Member Sayısı kadar ClusterProcessorSupplier nesnesi yaratılır.  Bu sınıf ta kendine verilen  partition listesi * localParallelism kadar Processor yaratılır. Kod şöyle.
private static class ClusterProcessorSupplier<E, T> implements ProcessorSupplier {

  static final long serialVersionUID = 1L;

  @Nonnull
  private final List<Integer> ownedPartitions;
  @Nullable
  private final String clientXml;
  @Nonnull
  private final FunctionEx<? super HazelcastInstance, ? extends EventJournalReader<E>>
                eventJournalReaderSupplier;
  @Nonnull
  private final PredicateEx<? super E> predicate;
  @Nonnull
  private final FunctionEx<? super E, ? extends T> projection;
  @Nonnull
  private final JournalInitialPosition initialPos;
  @Nonnull
  private final EventTimePolicy<? super T> eventTimePolicy;

  private transient HazelcastInstance client;
  private transient EventJournalReader<E> eventJournalReader;
  ...
}
constructor
İmzası şöyle. Bir member adresteki belirtilen partition listesi için çalışır.
ClusterProcessorSupplier(
  @Nonnull List<Integer> ownedPartitions,
  @Nonnull ClusterMetaSupplierParams clusterMetaSupplierParams
) {
  ...
}
ExecutionPlanBuilder tarafından çağrılır. Şöyle
<init>:465, StreamEventJournalP$ClusterProcessorSupplier (com.hazelcast.jet.impl.connector)
lambda$get$2:418, StreamEventJournalP$ClusterMetaSupplier (com.hazelcast.jet.impl.connector)
apply:-1, StreamEventJournalP$ClusterMetaSupplier$$Lambda$1974/0x00000008011c3a98 (com.hazelcast.jet.impl.connector)
lambda$null$4:128, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
call:-1, ExecutionPlanBuilder$$Lambda$1773/0x000000080114a000 (com.hazelcast.jet.impl.execution.init)
doWithClassLoader:547, Util (com.hazelcast.jet.impl.util)
lambda$createExecutionPlans$b9e545c5$1:128, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
runEx:-1, ExecutionPlanBuilder$$Lambda$1753/0x0000000801141d98 (com.hazelcast.jet.impl.execution.init)
run:31, RunnableEx (com.hazelcast.jet.function)
run$$$capture:1804, CompletableFuture$AsyncRun (java.util.concurrent)
run:-1, CompletableFuture$AsyncRun (java.util.concurrent)
 - Async stack trace
<init>:1790, CompletableFuture$AsyncRun (java.util.concurrent)
asyncRunStage:1818, CompletableFuture (java.util.concurrent)
runAsync:2033, CompletableFuture (java.util.concurrent)
createExecutionPlans:141, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
createExecutionPlans:255, MasterJobContext (com.hazelcast.jet.impl)
lambda$tryStartJob$4:237, MasterJobContext (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
submitToCoordinatorThread:1315, JobCoordinationService (com.hazelcast.jet.impl)
submitToCoordinatorThread:1305, JobCoordinationService (com.hazelcast.jet.impl)
tryStartJob:210, MasterJobContext (com.hazelcast.jet.impl)
tryStartJob:1198, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitJob$2:308, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$55:1327, JobCoordinationService (com.hazelcast.jet.impl)
get metodu - int count
İmzası şöyle. localParallelism kadar processor yaratır
public List<Processor> get(int count)
ExecutionPlan tarafından çağrılır. Şöyle
get:494, StreamEventJournalP$ClusterProcessorSupplier (com.hazelcast.jet.impl.connector)
get:432, StreamEventJournalP$ClusterProcessorSupplier (com.hazelcast.jet.impl.connector)
createProcessors:503, ExecutionPlan (com.hazelcast.jet.impl.execution.init)
lambda$null$1:215, ExecutionPlan (com.hazelcast.jet.impl.execution.init)
call:-1, ExecutionPlan$$Lambda$2012/0x00000008011edb90 (com.hazelcast.jet.impl.execution.init)
doWithClassLoader:547, Util (com.hazelcast.jet.impl.util)
lambda$initialize$3:213, ExecutionPlan (com.hazelcast.jet.impl.execution.init)
accept:-1, ExecutionPlan$$Lambda$1989/0x00000008011dc000 (com.hazelcast.jet.impl.execution.init)
tryFire$$$capture:718, CompletableFuture$UniAccept (java.util.concurrent)
tryFire:-1, CompletableFuture$UniAccept (java.util.concurrent)
 - Async stack trace
<init>:697, CompletableFuture$UniAccept (java.util.concurrent)
uniAcceptStage:737, CompletableFuture (java.util.concurrent)
thenAccept:2182, CompletableFuture (java.util.concurrent)
initialize:191, ExecutionPlan (com.hazelcast.jet.impl.execution.init)
initialize:170, ExecutionContext (com.hazelcast.jet.impl.execution)
lambda$initExecution$9:344, JobExecutionService (com.hazelcast.jet.impl)
doWithClassLoader:547, Util (com.hazelcast.jet.impl.util)
initExecution:343, JobExecutionService (com.hazelcast.jet.impl)
doRun:98, InitExecutionOperation (com.hazelcast.jet.impl.operation)
run:55, AsyncOperation (com.hazelcast.jet.impl.operation)
call:190, Operation (com.hazelcast.spi.impl.operationservice)
call:283, OperationRunnerImpl (com.hazelcast.spi.impl.operationservice.impl)
run:258, OperationRunnerImpl (com.hazelcast.spi.impl.operationservice.impl)
run:219, OperationRunnerImpl (com.hazelcast.spi.impl.operationservice.impl)
run:411, OperationExecutorImpl (com.hazelcast.spi.impl.operationexecutor.impl)
runOrExecute:438, OperationExecutorImpl (com.hazelcast.spi.impl.operationexecutor.impl)
doInvokeLocal:601, Invocation (com.hazelcast.spi.impl.operationservice.impl)
doInvoke:580, Invocation (com.hazelcast.spi.impl.operationservice.impl)
invoke0:541, Invocation (com.hazelcast.spi.impl.operationservice.impl)
invoke:241, Invocation (com.hazelcast.spi.impl.operationservice.impl)
invoke:61, InvocationBuilderImpl (com.hazelcast.spi.impl.operationservice.impl)
invokeOnParticipant:294, MasterContext (com.hazelcast.jet.impl)
invokeOnParticipants:277, MasterContext (com.hazelcast.jet.impl)
initExecution:268, MasterJobContext (com.hazelcast.jet.impl)
lambda$null$1:239, MasterJobContext (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$55:1327, JobCoordinationService (com.hazelcast.jet.impl)
init metodu
Kod şöyle. Yeni bir HazelcastInstance yaratır ve ondan EventJournalReader nesnesi elde eder
@Override
public void init(@Nonnull Context context) {
  HazelcastInstance instance = context.hazelcastInstance();
  if (clientXml != null) {
    client = newHazelcastClient(asClientConfig(clientXml));
    instance = client;
  }
  eventJournalReader = eventJournalReaderSupplier.apply(instance);
}
Buradaki Context parametresi şöyle
ProcessorMetaSupplier.Context -> MetaSupplierCtx
  Processor.Context -> ProcSupplierCtx hem de MetaSupplierCtx
    		       ProcCtx
ExecutionPlan koşmaya başlayınca çağrılır. Şöyle
init:476, StreamEventJournalP$ClusterProcessorSupplier (com.hazelcast.jet.impl.connector)
lambda$null$9c5eb802$1:456, ExecutionPlan (com.hazelcast.jet.impl.execution.init)
runEx:-1, ExecutionPlan$$Lambda$1987/0x00000008011d61f8 (com.hazelcast.jet.impl.execution.init)
run:31, RunnableEx (com.hazelcast.jet.function)
doWithClassLoader:532, Util (com.hazelcast.jet.impl.util)
lambda$initProcSuppliers$10beb6ff$1:455, ExecutionPlan (com.hazelcast.jet.impl.execution.init)
runEx:-1, ExecutionPlan$$Lambda$1986/0x00000008011d36c8 (com.hazelcast.jet.impl.execution.init)
run:31, RunnableEx (com.hazelcast.jet.function)
run$$$capture:1804, CompletableFuture$AsyncRun (java.util.concurrent)
run:-1, CompletableFuture$AsyncRun (java.util.concurrent)
 - Async stack trace
<init>:1790, CompletableFuture$AsyncRun (java.util.concurrent)
asyncRunStage:1818, CompletableFuture (java.util.concurrent)
runAsync:2033, CompletableFuture (java.util.concurrent)
initProcSuppliers:476, ExecutionPlan (com.hazelcast.jet.impl.execution.init)
initialize:189, ExecutionPlan (com.hazelcast.jet.impl.execution.init)
initialize:170, ExecutionContext (com.hazelcast.jet.impl.execution)
lambda$initExecution$9:344, JobExecutionService (com.hazelcast.jet.impl)
doWithClassLoader:547, Util (com.hazelcast.jet.impl.util)
initExecution:343, JobExecutionService (com.hazelcast.jet.impl)
doRun:98, InitExecutionOperation (com.hazelcast.jet.impl.operation)
run:55, AsyncOperation (com.hazelcast.jet.impl.operation)
call:190, Operation (com.hazelcast.spi.impl.operationservice)
call:283, OperationRunnerImpl (com.hazelcast.spi.impl.operationservice.impl)
run:258, OperationRunnerImpl (com.hazelcast.spi.impl.operationservice.impl)
run:219, OperationRunnerImpl (com.hazelcast.spi.impl.operationservice.impl)
run:411, OperationExecutorImpl (com.hazelcast.spi.impl.operationexecutor.impl)
runOrExecute:438, OperationExecutorImpl (com.hazelcast.spi.impl.operationexecutor.impl)
doInvokeLocal:601, Invocation (com.hazelcast.spi.impl.operationservice.impl)
doInvoke:580, Invocation (com.hazelcast.spi.impl.operationservice.impl)
invoke0:541, Invocation (com.hazelcast.spi.impl.operationservice.impl)
invoke:241, Invocation (com.hazelcast.spi.impl.operationservice.impl)
invoke:61, InvocationBuilderImpl (com.hazelcast.spi.impl.operationservice.impl)
invokeOnParticipant:294, MasterContext (com.hazelcast.jet.impl)
invokeOnParticipants:277, MasterContext (com.hazelcast.jet.impl)
initExecution:268, MasterJobContext (com.hazelcast.jet.impl)
lambda$null$1:239, MasterJobContext (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$55:1327, JobCoordinationService (com.hazelcast.jet.impl)

30 Ocak 2023 Pazartesi

Hazelcast Jet SubmitJobOperation Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.impl.operation.SubmitJobOperation;
Kalıtım şöyle
Operation
  AsyncOperation
    AsyncJobOperation
      SubmitJobOperation

doRun metodu
Kod şöyle. İşi JobCoordinationService nesnesine gönderiyor.
@Override
public CompletableFuture<Void> doRun() {
  JobConfig jobConfig = deserializedJobConfig != null ? deserializedJobConfig :
    getNodeEngine().getSerializationService().toObject(serializedJobConfig);
  if (isLightJob) {
    if (deserializedJobDefinition != null) {
      return getJobCoordinationService().submitLightJob(jobId(), 
        deserializedJobDefinition, null, jobConfig, subject);
    }
    return getJobCoordinationService().submitLightJob(jobId(), null, 
      serializedJobDefinition, jobConfig, subject);
  }
  // the jobDefinition for non-light job is always serialized
  assert deserializedJobDefinition == null; 
  return getJobCoordinationService().submitJob(jobId(), 
    serializedJobDefinition, jobConfig, subject);
}

protected JobCoordinationService getJobCoordinationService() {
  return getJetServiceBackend().getJobCoordinationService();
}

Hazelcast Jet AsyncOperation Sınıfı

Giriş
Şu satırı dahil ederiz 
import com.hazelcast.jet.impl.operation.AsyncOperation;
Kalıtım şöyle
DataSerializable
Tenantable
  Operation
    AsyncOperation

doSendResponse metodu
İmzası şöyle
private void doSendResponse(Object value)
Çağrı şöyle. 
- OutboundResponseHandler sınıfına verilen value nesnesini gönderir. 
- OutboundResponseHandler sınıfı da AbstractSerializationService nesnesini kullanarak value nesnesini byte[] haline çevirir. 
SerializationService yazısına bakabilirsiniz
com.hazelcast.internal.serialization.impl.AbstractSerializationService.toBytes
com.hazelcast.spi.impl.operationservice.impl.OutboundResponseHandler.send
com.hazelcast.spi.impl.operationservice.impl.OutboundResponseHandler.sendResponse
com.hazelcast.spi.impl.operationservice.Operation.sendResponse
com.hazelcast.jet.impl.operation.AsyncOperation.doSendResponse


27 Ocak 2023 Cuma

Hazelcast Jet Sources.batchFromProcessor metodu

Giriş
İmzası şöyle
@Nonnull
 public static <T> BatchSource<T> batchFromProcessor(
  @Nonnull String sourceName,
  @Nonnull ProcessorMetaSupplier metaSupplier
 );
Belirtilen ProcessorMetaSupplier factory nesnesini kullanarak batch işlemi yapan bir BatchSource döndürür

26 Ocak 2023 Perşembe

HazelcastAPI WriteThroughStore Sınıfı - Bir MapDatastore Türü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.map.impl.mapstore.writethrough.WriteThroughStore;
MapDatastore şunlar olabilir
MapDatastore
  EmptyMapDataStore
  AbstractMapDataStore
    WriteBehindStore
    WriteThroughStore
Çağrı sırası şöyle
com.hazelcast.map.impl.MapStoreWrapper.load
com.hazelcast.map.impl.mapstore.writethrough.WriteThroughStore.load
com.hazelcast.map.impl.mapstore.writethrough.WriteThroughStore.load
com.hazelcast.map.impl.recordstore.DefaultRecordStore.loadRecordOrNull
com.hazelcast.map.impl.recordstore.DefaultRecordStore.get
com.hazelcast.map.impl.operation.GetOperation.run
com.hazelcast.spi.Operation.call
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.call
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.run
AbstractMapDataStore kalıtımı sebebiyle içinde bir MapStoreWrapper vardır

24 Ocak 2023 Salı

HazelcastAPI SerializerHook Arayüzü

META-INF/services/com.hazelcast.SerializerHook dosyasına SerializerHook arayüzünü gerçekleştiren sınıfın tam ismi yazılır

Örnek
Kod şöyle olsun
package com.hazelcast.samples.jet;
class PersonSerializerHook implements SerializerHook<Person> { @Override public Class<Person> getSerializationType() { return Person.class; } @Override public Serializer createSerializer() { return new PersonSerializer(); } @Override public boolean isOverwritable() { return true; } }
Dosyaya  şöyle yazarı
com.hazelcast.samples.jet.PersonSerializerHook

Hazelcast Jet Sources.remoteMapJournal metodu

Giriş
Bir başka cluster'daki belirtilen IMap'e ait EventJournalMapEvent olaylarını okur. Bu yüzden ClientConfig nesnesini geçmek gerekir. StreamSource nesnesi döner. 

Uzaktaki IMap nesnesinin event journal ayarlarının yapılmış olması gerekir


remoteMapJournal metodu - mapName + ClientConfig + JournalInitialPosition + projectionFn + predicateFn

Örnek
Şöyle yaparız. Örnekteki initialPosition parametresi yanlıştı. Bu parametre bu enumeration
JournalInitialPosition.START_FROM_OLDEST veya JournalInitialPosition.START_FROM_CURRENT
olmalı
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Integer, Integer, Integer>remoteMapJournal(MAP_NAME, 
    clientConfig,
    JournalInitialPosition.START_FROM_OLDEST, // initial position
    EventJournalMapEvent::getNewValue,        // projection
    true))                                    // predicate
  .peek()
  .drainTo(Sinks.list(SINK_NAME));


23 Ocak 2023 Pazartesi

Hazelcast Jet ClusterMetaSupplier Sınıfı - Member Başına Düşen Partition'ı Hesaplar

Giriş
Şu satırı dahil ederiz 
import com.hazelcast.jet.impl.connector.StreamEventJournalP;
Bu sınıf StreamEventJournalP içinde saklı. Kalıtım şöyle
ProcessorMetaSupplier
  ClusterMetaSupplier
  ElasticSourcePMetaSupplier
  FileProcessorMetaSupplier
  KinesisSourcePMetaSupplier
  ...
Kaç tane job submit edildiyse o kadar ClusterMetaSupplier nesnesi yaratılır. Kod şöyle.
private static class ClusterMetaSupplier<E, T> implements ProcessorMetaSupplier {

  static final long serialVersionUID = 1L;

  private final String clientXml;
  private final FunctionEx<? super HazelcastInstance, ? extends EventJournalReader<E>>
                eventJournalReaderSupplier;
  private final PredicateEx<? super E> predicate;
  private final FunctionEx<? super E, ? extends T> projection;
  private final JournalInitialPosition initialPos;
  private final EventTimePolicy<? super T> eventTimePolicy;
  private final SupplierEx<Permission> permissionFn;

  private transient int remotePartitionCount;
  private transient Map<Address, List<Integer>> addrToPartitions;
  ...
}
constructor
DAG aşamasında çağrılır. Şöyle
<init>:368, StreamEventJournalP$ClusterMetaSupplier (com.hazelcast.jet.impl.connector)
streamRemoteMapSupplier:537, StreamEventJournalP (com.hazelcast.jet.impl.connector)
lambda$remoteMapJournal$98b231e7$1:658, Sources (com.hazelcast.jet.pipeline)
applyEx:-1, Sources$$Lambda$1647/0x00000008010ef238 (com.hazelcast.jet.pipeline)
apply:49, FunctionEx (com.hazelcast.function)
addToDag:81, StreamSourceTransform (com.hazelcast.jet.impl.pipeline.transform)
createDag:138, Planner (com.hazelcast.jet.impl.pipeline)
toDag:122, PipelineImpl (com.hazelcast.jet.impl.pipeline)
lambda$submitJob$2:254, JobCoordinationService (com.hazelcast.jet.impl)
run:-1, JobCoordinationService$$Lambda$1628/0x00000008010cc580 (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
call:-1, JobCoordinationService$$Lambda$1629/0x00000008010cc7a8 (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$55:1327, JobCoordinationService (com.hazelcast.jet.impl)
call:-1, JobCoordinationService$$Lambda$1630/0x00000008010cc9c8 (com.hazelcast.jet.impl)
init metodu
ExecutionPlanBuilder tarafından çağrılır. Şöyle
init:384, StreamEventJournalP$ClusterMetaSupplier (com.hazelcast.jet.impl.connector)
lambda$null$ba46c9b5$1:117, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
runEx:-1, ExecutionPlanBuilder$$Lambda$1757/0x0000000801143ab8 (com.hazelcast.jet.impl.execution.init)
run:31, RunnableEx (com.hazelcast.jet.function)
doWithClassLoader:532, Util (com.hazelcast.jet.impl.util)
lambda$createExecutionPlans$b9e545c5$1:116, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
runEx:-1, ExecutionPlanBuilder$$Lambda$1753/0x0000000801141d98 (com.hazelcast.jet.impl.execution.init)
run:31, RunnableEx (com.hazelcast.jet.function)
run$$$capture:1804, CompletableFuture$AsyncRun (java.util.concurrent)
run:-1, CompletableFuture$AsyncRun (java.util.concurrent)
 - Async stack trace
<init>:1790, CompletableFuture$AsyncRun (java.util.concurrent)
asyncRunStage:1818, CompletableFuture (java.util.concurrent)
runAsync:2033, CompletableFuture (java.util.concurrent)
createExecutionPlans:141, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
createExecutionPlans:255, MasterJobContext (com.hazelcast.jet.impl)
lambda$tryStartJob$4:237, MasterJobContext (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
submitToCoordinatorThread:1315, JobCoordinationService (com.hazelcast.jet.impl)
submitToCoordinatorThread:1305, JobCoordinationService (com.hazelcast.jet.impl)
tryStartJob:210, MasterJobContext (com.hazelcast.jet.impl)
tryStartJob:1198, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitJob$2:308, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$55:1327, JobCoordinationService (com.hazelcast.jet.impl)
get metodu - List<Address>
İmzası şöyle
public Function<Address, ProcessorSupplier> get(@Nonnull List<Address> addresses);
Cluster içindeki her member'ın adresi liste olarak geçilir. Bu adrese ait partition id'ler işleyen bir 
ClusterProcessorSupplier yani processor factory nesnesi döner. ExecutionPlanBuilder tarafından çağrılır. Şöyle
get:411, StreamEventJournalP$ClusterMetaSupplier (com.hazelcast.jet.impl.connector)
lambda$null$3:125, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
call:-1, ExecutionPlanBuilder$$Lambda$1765/0x000000080114e9d8 (com.hazelcast.jet.impl.execution.init)
doWithClassLoader:547, Util (com.hazelcast.jet.impl.util)
lambda$createExecutionPlans$b9e545c5$1:125, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
runEx:-1, ExecutionPlanBuilder$$Lambda$1753/0x0000000801141d98 (com.hazelcast.jet.impl.execution.init)
run:31, RunnableEx (com.hazelcast.jet.function)
run$$$capture:1804, CompletableFuture$AsyncRun (java.util.concurrent)
run:-1, CompletableFuture$AsyncRun (java.util.concurrent)
 - Async stack trace
<init>:1790, CompletableFuture$AsyncRun (java.util.concurrent)
asyncRunStage:1818, CompletableFuture (java.util.concurrent)
runAsync:2033, CompletableFuture (java.util.concurrent)
createExecutionPlans:141, ExecutionPlanBuilder (com.hazelcast.jet.impl.execution.init)
createExecutionPlans:255, MasterJobContext (com.hazelcast.jet.impl)
lambda$tryStartJob$4:237, MasterJobContext (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
submitToCoordinatorThread:1315, JobCoordinationService (com.hazelcast.jet.impl)
submitToCoordinatorThread:1305, JobCoordinationService (com.hazelcast.jet.impl)
tryStartJob:210, MasterJobContext (com.hazelcast.jet.impl)
tryStartJob:1198, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitJob$2:308, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$54:1306, JobCoordinationService (com.hazelcast.jet.impl)
lambda$submitToCoordinatorThread$55:1327, JobCoordinationService (com.hazelcast.jet.impl)

HazelcastAPI DataLinkService Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.datalink.DataLinkService;
DataLink Sınıfı
Kalıtım şöyle
DataLink
DataLinkBase
  JdbcDataLink
  HazelcastDataLink
  MongoDataLink

DataLinkService  Arayüzü
Member tarafından kullanılan servis bu. 
Kalıtım şöyle
DataLinkService
  InternalDataLinkService
    DataLinkServiceImpl

getAndRetainDataLink metodu
Havuzdan bir tane nesne döndürür. Şöyle yaparız
DataLinkService dataLinkService = ...;
HazelcastDataLink hazelcastDataLink = dataLinkService
  .getAndRetainDataLink(dataLinkName, HazelcastDataLink.class);



22 Ocak 2023 Pazar

HazelcastAPI Invocation Sınıfı - Member Tarafındaki Invocation

Giriş
Şu satırı dahil ederiz
import com.hazelcast.spi.impl.AbstractInvocationFuture;
Member tarafındaki Invocation'ı temsil eder.
Kalıtım şöyle
BaseInvocation
  Invocation
    MasterInvocation
    PartitionInvocation
    RaftInvocation
    TargetInvocation

WrongTargetException Sınıfı
Açıklaması şöyle
Thrown when an operation is executed on the wrong machine, usually because the partition that operation belongs to has been moved to some other member.
sendResponse metodu
Kod şöyle. Operation sınıfının getResponse() metodu ne döndürürse, bu boolean, void vs. olabilir. complete() metoduna geçilir.
@Override
public void sendResponse(Operation op, Object response) {
  if (!RESPONSE_RECEIVED.compareAndSet(this, FALSE, TRUE)) {
    throw new ResponseAlreadySentException(...);
   }

   if (response instanceof CallTimeoutResponse) {
     notifyCallTimeout();
   } else if (response instanceof ErrorResponse || response instanceof Throwable) {
     notifyError(response);
   } else if (response instanceof NormalResponse) {
     NormalResponse normalResponse = (NormalResponse) response;
     notifyNormalResponse(normalResponse.getValue(), normalResponse.getBackupAcks());
   } else {
     // there are no backups or the number of expected backups has returned; 
     // so signal the future that the result is ready
     complete(response);
   }
 }


19 Ocak 2023 Perşembe

HazelcastAPI GenericRecord Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.nio.serialization.genericrecord.GenericRecord;
Kalıtım şöyle
GenericRecord 
  InternalGenericRecord
    AbstractGenericRecord

      //Compact olanlar
      CompactGenericRecord
        CompactInternalGenericRecord
          DefaultCompactReader
        DeserializedGenericRecord

      //Portable olanlar
      PortableGenericRecord
        DeserializedPortableGenericRecord
        PortableInternalGenericRecord
DeserializedGenericRecord Sınıfı
CompactSerializer ile yazılmış edilmiş nesneyi okurken eğer member tarafında CompactSerializer bulunamıyorsa, DeserializedGenericRecord döndürülür.



18 Ocak 2023 Çarşamba

Hazelcast Jet JobRepository Sınıfı - Light Olmayan Job'ları Saklar

Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.impl.JobRepository;
constructor
Kodu şöyle. Yani JobRecord nesneleri de bir IMap üzerinde saklanır
public class JobRepository {

  private final HazelcastInstance instance;
  private final ILogger logger;

  private final ConcurrentMemoizingSupplier<IMap<Long, JobRecord>> jobRecords;
  private final ConcurrentMemoizingSupplier<IMap<Long, JobResult>> jobResults;
  private final Supplier<IMap<Long, JobExecutionRecord>> jobExecutionRecords;
  private final Supplier<IMap<Long, List<RawJobMetrics>>> jobMetrics;
  private final Supplier<IMap<String, SnapshotValidationRecord>> 
    exportedSnapshotDetailsCache;
  private final Supplier<FlakeIdGenerator> idGenerator;
  ...
}
getJobRecord metodu - long jobId
Belirtilen ID'ye sahip JobRecord  nesnesini döner.

13 Ocak 2023 Cuma

HazelcastAPI MessageTask Arayüzü - Member Üzerindeki Client Thread Çalıştırır

Giriş
Şu satırı dahil ederiz
import com.hazelcast.client.impl.protocol.task.MessageTask;
Member üzerindeki client.thread-N üzerinde çalışır. Client thread'i yöneten şey ClientEngineclient.thread-N bloke edilemez. Yoksa şurada exception fırlatır
registerWaiter:1156, AbstractInvocationFuture (com.hazelcast.spi.impl)
get:607, AbstractInvocationFuture (com.hazelcast.spi.impl)
getStatus0:79, JobProxy (com.hazelcast.jet.impl)
getStatus:198, AbstractJobProxy (com.hazelcast.jet.impl)
Exception şöyle. Yani current thread UnblockableThread ise assertion başarısız olur.
private Object registerWaiter(Object waiter, Executor executor) {
  assert !(waiter instanceof UnblockableThread) :
     "Waiting for response on this thread is illegal"; 
     ... 
}

1. AbstractMessageTask Sınıfı
checPermission metodu
Metod şöyle. Sanırım bu Task nesnesinin çalıp çalışmayacağına karar verir.
private void checkPermissions(ClientEndpoint endpoint) {
  SecurityContext securityContext = clientEngine.getSecurityContext();
  if (securityContext != null) {
    Permission permission = getRequiredPermission();
    if (permission != null) {
      securityContext.checkPermission(endpoint.getSubject(), permission);
    }
  }
}
getRequiredPermission() metodu
İmzası şöyle
Permission getRequiredPermission()
2. Jet için kalıtım
Jet için MessageTask nesnelerini yaratan sınıf JetMessageTaskFactoryProvider. Kalıtım şöyle
MessageTask 
  AbstractMessageTask
    AbstractAsyncMessageTask
      AbstractInvocationMessageTask
          Buradadan sonra Jet için MessageTask  nesneleri var

3. Callable için kalıtım
MessageTask 
    AbstractCallableMessageTask
          Burada bir sürü MessageTask  nesnesi var

12 Ocak 2023 Perşembe

Hazelcast Jet DeleteProcessorSupplier Sınıfı

Giriş
Şu satırı dahil ederiz 
import com.hazelcast.jet.sql.impl.connector.jdbc.DeleteProcessorSupplier;
Kalıtım şöyle
ProcessorSupplier
  AbstractJdbcSqlConnectorProcessorSupplier
    DeleteProcessorSupplier
get metodu
Kodu şöyle. Belirtilen sayıda WriteJdbcP processor yaratır
@Nonnull
@Override
public Collection<? extends Processor> get(int localParallelism) {
  List<Processor> processors = new ArrayList<>(localParallelism);
for (int i = 0; i < count; i++) { Processor processor = new WriteJdbcP<>( query, dataSource, (PreparedStatement ps, JetSqlRow row) -> { for (int j = 0; j < row.getFieldCount(); j++) { ps.setObject(j + 1, row.get(j)); } }, false, batchLimit ); processors.add(processor); } return processors; }


Hazelcast Jet Inbox Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.core.Inbox;
Kalıtım şöyle
Inbox
  AdaptingInbox
  ArrayDequeInbox : Esas kullanılan sınıf bu
  LoggingInbox
  TestInbox

ArrayDequeInbox  Sınıfı
Şu satırı dahil ederiz
import com.hazelcast.jet.impl.util.ArrayDequeInbox;
Kodu şöyle. İçinde bir ArrayDeque var. Jet sınıflarında durmak için ArrayDeque sınıfını add metodun breakpoint koyarız ve breakpoint condition olarak e instanceof JetSqlRow yaparız
/**
 * An {@link Inbox} implementation backed by an {@link ArrayDeque}.
 */
public final class ArrayDequeInbox implements Inbox {

  private final ProgressTracker progTracker;
  private final ArrayDeque<Object> queue = new ArrayDeque<>();
  ...
}
Bu sınıf thread safe değil gibi görünüyor ama aslında ProcessorTasklet#processInbox metodu tarafından doldurulup boşaltıldığı için yani tek thread ile çalıştığı için sorun yok. 

Processor'lar arasında iki tane sınıf var
OneToOneConcurrentArrayQueue
ManyToOneConcurrentArrayQueue

processInbox metodu bu kuyruklardan okuyor ve Inbox nesnesini dolduruyor






Hazelcast SQL - CREATE TYPE Örnekleri

Örnek
Kodla şöyle yaparız
HazelcastInstance hz = ...;
SqlService sql = hz.getSql();

sql.execute("CREATE TYPE UserType (id BIGINT, name VARCHAR, organization OrganizationType)
  OPTIONS ('format'='java', 'javaClass'='com.foo.User')");

sql.execute("CREATE TYPE OrganizationType (id BIGINT, name VARCHAR, office OfficeType) "
OPTIONS ('format'='java', 'javaClass'='com.foo.Organization')");
sql.execute("CREATE TYPE OfficeType (id BIGINT, name VARCHAR) "
OPTIONS ('format'='java', 'javaClass'='com.foo.Office')");

IMap<Long, User> testMap = testInstance().getMap("test");
sql.execute("CREATE MAPPING test (__key BIGINT, this UserType) "
TYPE IMap OPTIONS (
'keyFormat'='bigint',
'valueFormat'='java',
  'valueJavaClass'='com.foo.User'");

String sql = "SELECT
test.this.name AS user_name, "
  test.this.organization.name AS org_name,
  test.this.organization.office.name AS office_name,
  test.this.organization.office.id AS office_id
FROM test";

sql.execute(sql);

HazelcastAPI EntryListenerConfig Sınıfı - Map ve Listener Birlikte Yaratılır Böylece Mesaj Kaçmaz

Giriş
Şu satırı dahil ederiz
import com.hazelcast.config.EntryListenerConfig;
Açıklaması şöyle
Registering Map Listeners
After you create your listener class, you can configure your cluster to include map listeners using the method addEntryListener 
Yani şöyle yaparız
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
IMap<String, String> map = hz.getMap( "somemap" );
map.addEntryListener( new MyEntryListener(), true );
Ancak bu yöntemde bazı listener mesajları kaçabilir
With the above approach, there is the possibility of missing events between the creation of the instance and registering the listener. To overcome this race condition, Hazelcast allows you to register listeners in configuration. 
Bu yüzden şöyle yaparız
mapConfig.addEntryListenerConfig(
new EntryListenerConfig( "com.yourpackage.MyEntryListener",
                                 false, false ) );
constructor - String className, boolean local, boolean includeValue
local  değeri true ise sanırım IMap.addLocalEntryListener gibidir. Yani bu member'da sadece kendi sahibi olduğu key değerler için tetiklenir

Örnek
Şöyle yaparız
public class ExpiredListener implements EntryEvictedListener<String, Application>, 
  EntryExpiredListener<String, Application> {

  private final ApplicationEventPublisher publisher;

  @Override
  public void entryEvicted(EntryEvent<String, Application> event) {
    publisher.publishEvent(new ApplicationCacheExpireEvent(this, event.getOldValue()));
  }

  @Override
  public void entryExpired(EntryEvent<String, Application> event) {
    publisher.publishEvent(new ApplicationCacheExpireEvent(this, event.getOldValue()));
  }
}

MapConfig applicationCache = ...
  .setMaxSizeConfig(new MaxSizeConfig()
                     .setMaxSizePolicy(MaxSizeConfig.MaxSizePolicy.USED_HEAP_SIZE)
                     .setSize(properties.getMaxMapHeapSize()))
                     .setTimeToLiveSeconds(properties.getTtlBeforeSave())
  .addEntryListenerConfig(new EntryListenerConfig(expiredListener, false, true));

11 Ocak 2023 Çarşamba

HazelcastAPI MapLoaderLifecycleSupport Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.map.MapLoaderLifecycleSupport;
destroy metodu
Kodu şöyle
/**
 * Hazelcast will call this method before shutting down.
 * This method can be overridden to clean up the resources
 * held by this map loader implementation, such as closing the
 * database connections, etc.
*/
void destroy();

HazelcastAPI ClusterHeartbeatManager Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.internal.cluster.impl.ClusterHeartbeatManager;
Kodu şöyle. Hata mesajı şöyle
Resetting heartbeat timestamps because of huge system clock jump! Clock-Jump: 174692 ms, Heartbeat-Timeout: 60000 ms]] 
Bir açıklama şöyle
Hazelcast maintains an internal clock to sync the nodes (a clock vector the size of the cluster nodes). When a JVM on one of the nodes will pause longer than 10 seconds, the internal clock for that node will register the time drift and will log the following message:System clock apparently jumped ...

The following is a full example of the message:
"INFO 2019-09-09 05:45:47,408 [cached1] com.hazelcast.cluster.impl.ClusterHeartbeatManager: [10.153.201.11]:5701 [1253850] [3.6.2] System clock apparently jumped from 2019-09-09T05:45:32.370 to 2019-09-09T05:45:47.408 since last heartbeat (+10038 ms)"

These drifts would cause the nodes to have a time difference between them. The time difference accumulates each time a long pause in JVM (> 10 seconds) is registered. When the time difference starts growing more than 20 seconds you might start observing first OperationTimeoutException messages in the logs. 

Hazelcast Jet JetClientInstanceImpl Sınıfı

Giriş 
Şu satırı dahil ederiz
import com.hazelcast.jet.impl.JetClientInstanceImpl;
getJobSummaryList metodu
JobSummary listesi döner. JobSummary  nesnesi içinde JobStatus diye bir alan var.

getJobAndSqlSummaryList metodu
JobAndSqlSummary listesi döner.

10 Ocak 2023 Salı

HazelcastAPI CP Subsystem ICountDownLatch Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.cp.ICountDownLatch ;
Örnek
Şöyle yaparız
HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();
CPSubsystem cpSubsystem = hazelcastInstance.getCPSubsystem();


ICountDownLatch latch = cpSubsystem.getCountDownLatch("latch");
latch.countDown();

THIRD-PARTY.txt Dosyası

Kullanılan harici kütüphanelerin sürümleri bu dosyada Dosyanın yolu şöyle hazelcast/licenses/THIRD-PARTY.txt