31 Mayıs 2023 Çarşamba

HazelcastAPI QueueStore Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.collection.QueueStore;
Bu sınıfı QueueStoreWrapper çağırır

Örnek - SpringBoot Kullanan
Şöyle yaparız
@Component
public class PostgresQueueStore implements QueueStore<Event> {

  EventRepository eventRepository;

  public PostgresQueueStore(@Lazy EventRepository eventRepository) {
    this.eventRepository = eventRepository;
  }

  @Override
  public void store(Long key, Event value) {
    EventEntity entity = ...;
    eventRepository.save(entity);
  }

  @Override
  public void storeAll(Map<Long, Event> map) {
    Collection<EventEntity> eventEntities = map.entrySet().stream()
      .map(entry -> ...)
      .toList();
      eventRepository.saveAll(eventEntities);
  }

  @Override
  public void delete(Long key) {
    eventRepository.deleteById(key);
  }

  @Override
  public void deleteAll(Collection<Long> keys) {
    eventRepository.deleteAllById(keys);
  }

  @Override
  public Event load(Long key) {
    return eventRepository.findById(key)
      .map(e -> ...)
      .orElse(null);
  }

  @Override
  public Map<Long, Event> loadAll(Collection<Long> keys) {
    List<EventEntity> entities = eventRepository.findAllById(keys);
    return entities.stream().collect(Collectors.toMap(EventEntity::getId,
                e -> ...);
  }

  @Override
  public Set<Long> loadAllKeys() {
    List<EventEntity> events = eventRepository.findAll();
    return events.stream().map(EventEntity::getId).collect(Collectors.toSet());
  }
}


HazelcastAPI QueueStoreConfig Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.config.QueueStoreConfig;
setStoreImplementation metodu
QueueStore nesnesi atar

Örnek
Şöyle yaparız
QueueStore<Foo> queueStore = ...

QueueConfig queueConfig =...
queueConfig.setQueueStoreConfig(new QueueStoreConfig()
  .setEnabled(true)
  .setStoreImplementation(queueStore)
  .setProperty("binary", "false")
);

25 Mayıs 2023 Perşembe

HazelcastAPI MigrationListener Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.partition.MigrationListener;
Kod şöyle. Yani java.util.EventListener arayüzünden kalıtır
public interface MigrationListener extends EventListener {...}
Açıklaması şöyle
it only triggers when a node joins or leaves "gracefully", it doesn't trigger when a node leaves "ungracefully".
migrationStarted metodu
Açıklama yaz

migrationFinished metodu
Açıklama yaz

replicaMigrationCompleted metodu
Açıklama yaz

replicaMigrationFailed metodu
Açıklama yaz

Hazelcast Projesi .github Dizini

CODEOWNERS Dosyası
Dosya şöyle

dependabot.yml
dependabot ayalarını içerir

.github/scripts/publish-rhel.sh Dosyası
Redhat deposuna Docker image yayınlar


HazelcastAPI Data Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.internal.serialization.Data;
Kalıtım şöyle
Data
  HeapData
    Packet

SerializationService tarafından nesne Data arayüzüne çevrilir

getType() metodu
SerializationConstants sınıfındaki sabitlerden birisini döner


24 Mayıs 2023 Çarşamba

HazelcastAPI MapProxySupport Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.map.impl.proxy.MapProxySupport;
constructor
protected MapProxySupport(String name, MapService service, NodeEngine nodeEngine, 
  MapConfig mapConfig) {
  super(nodeEngine, service);
  this.name = name;

  HazelcastProperties properties = nodeEngine.getProperties();

  this.mapServiceContext = service.getMapServiceContext();
  this.mapConfig = mapConfig;
  this.partitionStrategy = mapServiceContext.getPartitioningStrategy(mapConfig.getName(),
    mapConfig.getPartitioningStrategyConfig(), mapConfig.getPartitioningAttributeConfigs());
  this.localMapStats = mapServiceContext.getLocalMapStatsProvider()
    .getLocalMapStatsImpl(name);
  this.partitionService = getNodeEngine().getPartitionService();
  this.lockSupport = new LockProxySupport(MapService.getObjectNamespace(name),
    LockSupportServiceImpl.getMaxLeaseTimeInMillis(properties));
  this.operationProvider = mapServiceContext.getMapOperationProvider(name);
  this.operationService = nodeEngine.getOperationService();
  this.serializationService = nodeEngine.getSerializationService();
  this.thisAddress = nodeEngine.getClusterService().getThisAddress();
  this.statisticsEnabled = mapConfig.isStatisticsEnabled();

  this.putAllBatchSize = properties.getInteger(MAP_PUT_ALL_BATCH_SIZE);
  this.putAllInitialSizeFactor = properties.getFloat(MAP_PUT_ALL_INITIAL_SIZE_FACTOR);
  default value the same as in OperationService
  this.failOnIndeterminateOperationState = properties
    .getBoolean(FAIL_ON_INDETERMINATE_OPERATION_STATE);
}
İçinde bir çeşit operationProvider isminde MapOperationProvider nesnesi barındırır. Bu MapOperationProvider  şeklen şöyle kullanılır. Yani MapProxyImpl ne iş yapacaksa MapOperationProvider sınıfından bir operation nesnesi alır ve onu çalıştırır





hazelcast.xml - IExecutorService Ayarları

Örnek
Şöyle yaparız
hazelcast:
network: join: multicast: enabled: true executor-service: exec: pool-size: 32 queue-capacity: 200 statistics-enabled: true

23 Mayıs 2023 Salı

HazelcastAPI WanReplicationService Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.wan.impl.WanReplicationService;
Kalıtım şöyle
WanReplicationService
  WanReplicationServiceImpl
  EnterpriseWanReplicationService

getSupportedWanProtocolVersions() metodu
İmzası şöyle
List<Version> getSupportedWanProtocolVersions();

18 Mayıs 2023 Perşembe

Hazelcast Jet FileSourceFactory Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.pipeline.file.impl.FileSourceFactory;
Kalıtım şöyle
FileSourceFactory 
  HadoopFileSourceFactory
  LocalFileSourceFactory

Repartitioning Ne Demek

Giriş
Açıklaması şöyle
Repartitioning is the process of redistribution of partition ownerships. Hazelcast performs the repartitioning when a member joins or leaves the cluster.

In these cases, the partition table in the master member is updated with the new partition ownerships. If a lite member joins or leaves a cluster, repartitioning is not triggered since lite members do not own any partitions.

16 Mayıs 2023 Salı

Hazelcast SQL - TUMBLE

Örnek
Şöyle yaparız
SELECT
     window_start,
     window_end,
     ticker,
     ROUND(MAX(price),2) AS high,
     ROUND(MIN(price),2) AS low
FROM TABLE(TUMBLE(
     TABLE trades_ordered,
     DESCRIPTOR(trade_ts),
     INTERVAL '5' SECONDS
))
GROUP BY 1,2,3
;

Hazelcast Jet SqlNode Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.shaded.org.apache.calcite.sql.SqlNode;
SQL cümlelerini parse eden sınıflar. Kalıtım şöyle
SqlNode
  SqlCall
    SqlDdl
      SqlCreate
        SqlCreateMapping
        SqlCreateMaterializedView : CREATE MATERIALIZED VIEW
        com.hazelcast.shaded.org.apache.calcite.sql.ddl.SqlCreateView: CREATE  VIEW
        SqlCreateIndex: CREATE INDEX
        SqlCreateFunction: CREATE FUNCTION
        SqlCreateDataConnection: CREATE DATA CONNECTION
        SqlCreateSchema: CREATE SCHEMA
        com.hazelcast.jet.sql.impl.parse.SqlCreateType: CREATE TYPE
        SqlCreateForeignSchema: CREATE FOREIGN SCHEMA
        SqlCreateJob: CREATE JOB
        com.hazelcast.jet.sql.impl.parse.SqlCreateView
        SqlCreateTable: CREATE TABLE
        SqlCreateSnapshot: CREATE SNAPSHOT
        com.hazelcast.shaded.org.apache.calcite.sql.ddl.SqlCreateType : CREATE TYPE

14 Mayıs 2023 Pazar

Hazelcast Jet SelectProcessorSupplier Sınıfı - ReadJdbcP Processor Yaratır

Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.sql.impl.connector.jdbc.SelectProcessorSupplier;
constructor
Kod şöyle. Processor'a verilecek SQL cümlesi ve parametreler belirtilir.
public SelectProcessorSupplier(String dataConnectionName,
                               String query,
                               int[] parameterPositions) {
  ...
}
get metodu
Kod şöyle. count parametresi dikkate alınmaz ve tek bir ReadJdbcP yaratılır
@Override
public Collection<? extends Processor> get(int count) {
  Processor processor = new ReadJdbcP<>(
    ...
  );
  return singleton(processor);
}



12 Mayıs 2023 Cuma

HazelcastAPI DataConnectionRef Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.pipeline.DataConnectionRef;
Bu sınıf Serializable ve Sources.XXX gibi metodlara parametre olarak geçiliyor.

dataConnectionRef metodu
Örnek
Şöyle yaparız
String mapName = ...;
StreamSource<Entry<String, Integer>> source = Sources.remoteMapJournal(
  mapName, 
  DataConnectionRef.dataConnectionRef(...), 
  JournalInitialPosition.START_FROM_OLDEST
);



Hazelcast Jet WriteMapP Sınıfı

Giriş
Şu satırı dahil ederiz 
import com.hazelcast.jet.impl.connector.WriteMapP;
Bir IMap nesnesine yazmak işlemini gerçekleştiren Processor.

processInternal metodu
Kod şöyle. ArrayMap aslında bir Map gerçekleştirimi. Inbox'tan nesneleri okuyor ve buffer nesnesine ekliyor.
public final class WriteMapP<T, K, V> extends AsyncHazelcastWriterP {
    private static final int BUFFER_LIMIT = 1024;
    private ArrayMap<Object, Object> buffer;
    ...
}
 
Kod şöyle.
@Override
protected void processInternal(Inbox inbox) {
  if (buffer.size() < BUFFER_LIMIT) {
    inbox.drain(addToBuffer);
  }
  submitPending();
}

private boolean submitPending() {
  if (buffer.isEmpty()) {
    return true;
  }
  if (!tryAcquirePermit()) {
    return false;
  }
  setCallback(map.putAllAsync(buffer));
  resetBuffer();
  return true;
}


9 Mayıs 2023 Salı

HazelcastAPI UserCodeDeploymentConfig Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.config.UserCodeDeploymentConfig;
addClass metodu
Hem istemci hem de sunucu tarafında kullanım için bir örnek burada

setBlacklistedPrefixes metodu
Örnek
Şöyle yaparız. Burada sunucu tarafında kullanılıyor
Config config = new Config();
UserCodeDeploymentConfig distCLConfig = config.getUserCodeDeploymentConfig();
distCLConfig.setEnabled(true)
  .setClassCacheMode(UserCodeDeploymentConfig.ClassCacheMode.ETERNAL)
  .setProviderMode(UserCodeDeploymentConfig.ProviderMode.LOCAL_AND_CACHED_CLASSES)
  .setBlacklistedPrefixes("com.foo,com.bar")
  .setWhitelistedPrefixes("com.bar.MyClass")
  .setProviderFilter("HAS_ATTRIBUTE:lite");

Hazelcast SQL - SHOW JOBS

GiriŞ
Çalışmakta olan işleri gösterir

Örnek
Şöyle yaparız
SHOW JOBS
Örnek
Şöyle yaparız
sql> SHOW JOBS;
+--------------------+ |name | +--------------------+ |myingest | +--------------------+ 1 row(s) selected

Hazelcast SQL - CREATE JOB

Giriş
Söz dizimi şöyle
CREATE JOB ... AS SINK INTO ...  SELECT ...

Örnek - 3 Tablo Join
Şöyle yaparız. Burada orders, customers, inventory tabloları birleştiriliyor
CREATE JOB amount_due AS
SINK INTO amount_due
     SELECT
          ord.id AS __key,
          ord.cust_id AS cust_id,
          cust.last_name AS last_name,
          cust.first_name AS first_name,
          cust.address1 AS address1,
          cust.address2 AS address2,
          cust.phone AS phone,
          (ord.quantity*inv.unit_price) AS total
     FROM orders AS ord
     JOIN customers AS cust ON ord.cust_id = cust.cust_id
     JOIN inventory AS inv ON ord.item_num = inv.item_num;
Örnek - 2 Tablo Join
Şöyle yaparız. Burada trades ve companies tabloları ticker sütünü ile birleştiriliyor ve sonuç trade_map tablosuna yazılıyor
CREATE JOB ingest_trades AS
  SINK INTO trade_map
  SELECT trades.id, trades.ticker, companies.company, trades.amount FROM trades
  JOIN companies ON companies.ticker = trades.ticker;
Options Örnekleri

processingGuarantee
Sadece Stream işler için kullanılabilir

Örnek
Şöyle yaparız. Burada IMap'ten SELECT yapıp, Kafka Topic'e yazan bir iş var
CREATE JOB testJob
OPTIONS (
  'processingGuarantee' = 'exactlyOnce'
) AS
SINK INTO testTopic
SELECT __key, ticker, price, amount FROM testMap


Hazelcast DataConnection Arayüzü

Giriş
Şu satırı dahil ederiz 
import com.hazelcast.dataconnection.DataConnection;
Kalıtım şöyle
DataConnection
  DataConnectionBase
    HazelcastDataConnection
    KafkaDataConnection
    MongoDataConnection

HazelcastAPI SilentException Arayüzü

Giriş
Şu satırı dahil ederiz. 
import com.hazelcast.spi.exception.SilentException;
Eğer exception sınıfımız bu arayüzden kalıtırsa, member exception fırlatsa bile, Hazelcast bunu sadece finest seviyesi etkin ise loglar. Operation sınıfındaki kod şöyle
public void logError(Throwable e) {
  final ILogger logger = getLogger();
  if (e instanceof SilentException) {
    logger.finest(e.getMessage(), e);
  }
  ...
}

8 Mayıs 2023 Pazartesi

Hazelcast SQL TableResolverImpl Sınıfı

Giriş
Şu satırı dahil ederiz. TableResolver arayüzünü gerçekleştirir.
import com.hazelcast.jet.sql.impl.schema.TableResolverImpl;
TablesTable  Yaratma
TablesTable nesnesini yaratır. Kod şöyle
private static final List<QuadFunction<List<Mapping>, List<View>, List<Type>, 
  NodeEngine, Table>> ADDITIONAL_TABLE_PRODUCERS
  = asList(
  (m, v, t, hz) -> new TablesTable(CATALOG, SCHEMA_NAME_INFORMATION_SCHEMA, 
    SCHEMA_NAME_PUBLIC, m, v),
  .
 );
CATALOG : "hazelcast"
SCHEMA_NAME_INFORMATION_SCHEMA : "information_schema"
SCHEMA_NAME_PUBLIC : "public"
TablesTable nesnesindeki tablo ismi zaten "tables"





Hazelcast SQL TablesTable Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.sql.impl.connector.infoschema.TablesTable;
Şu sorgu için sonuç döner
SELECT * FROM information_schema.tables;
Bu sınıfı yaratan kod TableResolverImpl
rows metodu
Kod şöyle
@Override
protected List<Object[]> rows() {
  List<Object[]> rows = new ArrayList<>(views.size());
  for (Mapping m : mappings) {
    rows.add(new Object[]{
      ...
    });
  }
  for (View v : views) {
    rows.add(new Object[]{
      ...
    });
  }
  return rows;
}

Hazelcast SQL InfoSchemaTable Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.sql.impl.connector.infoschema.InfoSchemaTable;
Kalıtım şöyle
InfoSchemaTable
  DataConnectionsTable
  MappingColumnsTable
  MappingsTable
  UDTAttributesTable
  UserDefinedTypesTable
  ViewsTable




Hazelcast Jet SlidingWindowP Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.impl.processor.SlidingWindowP;
WindowDefinition ile belirtilen sliding, tumbling window işlerini gerçekleştirir.

7 Mayıs 2023 Pazar

hazelcast.xml - IScheduledExecutorService Ayarları

Örnek
Şöyle yaparız
hazelcast:
  ...
  executor-service:
    default:
      statistics-enabled: true
      pool-size: 16
      queue-capacity: 0
  durable-executor-service:
    default:
      pool-size: 16
      durability: 1
      capacity: 100
  scheduled-executor-service:
    default:
      pool-size: 16
      durability: 1
      capacity: 100
      capacity-policy: PER_NODE
      merge-policy:
        batch-size: 100
        class-name: PutIfAbsentMergePolicy
  ...

4 Mayıs 2023 Perşembe

Partition Thread

Giriş
Açıklaması şöyle
These threads handle partition migration and partition replica synchronization. The number of partition threads can be configured using the hazelcast.partition.thread.count property.

Hazelcast SQL SqlRow Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.sql.SqlRow;
Kalıtım şöyle
SqlRow
  SqlRowImpl

SqlRowImpl Sınıfı
Kendi içinde bir JetSqlRow nesnesi taşır. Kod şöyle
public class SqlRowImpl implements SqlRow {

  private final SqlRowMetadata rowMetadata;
  private final JetSqlRow row;
  ...
}



  

Hazelcast SQL - Kafka İçin CREATE DATA CONNECTION Örnekleri

Örnek
Şöyle yaparız. 
CREATE DATA CONNECTION mykafka 
TYPE KAFKA SHARED 
OPTIONS(
  'bootstrap.servers' = '127.0.0.1:9092', 
  'key.serializer' = 'org.apache.kafka.common.serialization.StringSerializer', 
  'value.serializer' = 'org.apache.kafka.common.serialization.StringSerializer');

Hazelcast SQL SqlResult Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.sql.SqlResult;
Arayüzün tanımı şöyle
public interface SqlResult extends Iterable<SqlRow>, AutoCloseable {}
Kalıtım şöyle
SqlResult
  AbstractSqlResult
    SqlResultImpl
    UpdateSqlResultImpl
SqlClientResult 
  
iterator metodu
Sonuç üzerinde yürüyebilmeyi sağlar. SqlRow nesnesi döner.
Örnek
Şöyle yaparız
HazelcastInstance hzInstance = ...;
public List<String> getCountries(String continent) {
  List<String> result = new ArrayList<>();
  try (SqlResult queryResult = hzInstance.getSql()
    .execute("SELECT name FROM countries WHERE continentName = ?", continent)
  ) {
    for (SqlRow row : queryResult) {
      String name = row.getObject(0);
      result.add(name);
    }
  }
}
spliterator metodu
Aslında Iterable arayüzünden geliyor. SqlResult arayüzünü Stream'e çevirebilmeyi sağlar

Hazelcast SQL SqlService Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.sql.SqlService;
Kalıtım şöyle
SqlService 
  InternalSqlService
    MissingSqlService
    SqlServiceImpl
  SqlClientService : HZ Client kullanır
  
executeUpdate metodu
ROW döndürmeyen SQL cümlelerini çalıştırır
Örnek
Şöyle yaparız
SqlService.executeUpdate("CREATE MAPPING my_table DATA CONNECTION FOO");
iterator metodu
Örnek
Eğer SELECT COUNT(*) gibi tek bir sütün dönen SQL kullandıysak şöyle yaparız
HazelcastInstance client = ...;
Long count = client.getSql().execute("...").iterator().next().getObject(0);


Hazelcast SQL - SHOW MAPPINGS

Örnek
Kodla şöyle yaparız. Her row bir string
HazelcastInstance instance = 
SqlService sqlService = instance.getSql();

String sql = "SHOW MAPPINGS";
SqlStatement statement = new SqlStatement(sql);

try (SqlResult result = sqlService.execute(statement)) {
  result.iterator().forEachRemaining(row -> ...));
}

3 Mayıs 2023 Çarşamba

Hazelcast Jet PlanExecutor Sınıfı - Execution Engine

Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.sql.impl.PlanExecutor;
catalog Alanı
Kod şöyleJDBC İçin CREATE MAPPING SQL cümlesi ile yaratılan nesneleri saklar
private final TableResolverImpl catalog;
Bu alanı değiştiren kod şöyle
SqlResult execute(CreateMappingPlan plan)
SqlResult execute(DropMappingPlan plan)
SqlResult execute(DropViewPlan plan)
SqlResult execute(DropTypePlan plan)
SqlResult execute(ShowStatementPlan plan)
dataConnectionCatalog Alanı
Kod şöyle. CREATE DATA CONNECTION SQL cümlesi ile yaratılan nesneleri saklar
private final DataConnectionResolver dataConnectionCatalog;
Bu alana ekleyen kod şöyle
SqlResult execute(CreateDataConnectionPlan plan)
SqlResult execute(DropDataConnectionPlan plan)
Catalog nesnelerine şöyle de erişebiliriz
HazelcastInstance hazelcastInstance = ...;
IMap<Object, Object> sqlCatalog = hazelcastInstance
  .getMap(JetServiceBackend.SQL_CATALOG_MAP_NAME);

for (Object catalogItem : sqlCatalog.values()) {
  if (catalogItem instanceof DataConnectionCatalogEntry) {
    ...
  }
}
execute metodları
Aslında bu sınıf bir Execution Engine. Ortalama bir veri tabanı tasarımı şöyle. Yani belirtilen Query Plan'ı çalıştırıyor.
Query Engine internally calls Parser to parse the SQL statements and convert it to Domain objects. 
The Domain Objects are then used by the Query Planner to create a query execution plan based on some rules or heuristics. 
The Query Plan is executed by the Execution Engine. 
The Execution engine talks to Storge Engine by scanning disk files and reading the table records. 
These records are then send back to Query Engine via Iterator Pattern. 
The Query Engine applies filters based on Selection and Projection Operators defined in the Query Plan and finally sends it back to the CLI for printing the table output.
Bazı imzalar şöyle
SqlResult execute(CreateMappingPlan plan)
SqlResult execute(DropMappingPlan plan)
SqlResult execute(CreateDataConnectionPlan plan)
SqlResult execute(DropDataConnectionPlan plan)
SqlResult execute(CreateIndexPlan plan)
SqlResult execute(CreateJobPlan plan, List<Object> arguments)
SqlResult execute(AlterJobPlan plan)
SqlResult execute(DropJobPlan plan)
SqlResult execute(CreateSnapshotPlan plan)
SqlResult execute(DropSnapshotPlan plan)
SqlResult execute(CreateViewPlan plan)
SqlResult execute(DropViewPlan plan)
SqlResult execute(DropTypePlan plan)
SqlResult execute(ShowStatementPlan plan)
SqlResult execute(ExplainStatementPlan plan)
execute metodu - ShowStatementPlan
İmzası şöyle. SHOW X şeklindeki SQL cümlesini çalıştırır. Örneğin SHOW RESOURCES gibi.
SqlResult execute(ShowStatementPlan plan)
Stack çıkıtısı şöyle. JdbcDataConnection#listResources metodunu çağrılıyor
listResources:124, JdbcDataConnection (com.hazelcast.dataconnection.impl)
listResources:58, JdbcDataConnection (com.hazelcast.dataconnection.impl)
executeShowResources:466, PlanExecutor (com.hazelcast.jet.sql.impl)
execute:425, PlanExecutor (com.hazelcast.jet.sql.impl)
execute:1011, SqlPlanImpl$ShowStatementPlan (com.hazelcast.jet.sql.impl) -> Visitor pattern
query0:266, SqlServiceImpl (com.hazelcast.sql.impl) -> Burada ShowStatementPlan yaratılır
execute:204, SqlServiceImpl (com.hazelcast.sql.impl)
execute:172, SqlServiceImpl (com.hazelcast.sql.impl)
execute:168, SqlServiceImpl (com.hazelcast.sql.impl)
execute:164, SqlServiceImpl (com.hazelcast.sql.impl)
execute:89, SqlService (com.hazelcast.sql)
execute metodu - CreateDataConnectionPlan
İmzası şöyle. SHOW X şeklindeki SQL cümlesini çalıştırır. Örneğin SHOW RESOURCES gibi.
SqlResult execute(CreateDataConnectionPlan plan)
DataConnectionResolver tipinden olan dataConnectionCatalog alanına bir data connection ekler. DataConnectionResolver  kendi içindeki DataConnectionStorage nesnesine yeni DataConnectionCatalogEntry nesnesini eklemeye çalışır. DataConnectionCatalogEntry nesnesi saklama alanı olarak aslında bir IMap kullanır. Kod şöyle. Yani __sql.catalog isimlie IMap kullanılıyor
IMap<String, Object> storage() {
  return nodeEngine.getHazelcastInstance().getMap(SQL_CATALOG_MAP_NAME);
}
Eğer ekleme işlemi başarılıysa bu cluster'daki tüm üyelere bildirilir. Kod şöyle
SqlResult execute(CreateDataConnectionPlan plan) {
  ...
  boolean added = dataConnectionCatalog.createDataConnection(...);
  if (added) {
    broadcastUpdateDataConnectionOperations(plan.name());
    // TODO invoke the listeners so plans can be invalidated after the
    //  change was propagated to InternalDataConnectionService
    dataConnectionCatalog.invokeChangeListeners();
  }
  return UpdateSqlResultImpl.createUpdateCountResult(0);
}
broadcastUpdateDataConnectionOperations() metodu kendi içinde UpdateDataConnectionOperation nesnesini tüm member'lara gönderir




Hazelcast SQL DataConnectionConsistencyChecker Sınıfı

Giriş
Şu satırı dahil ederiz 
import  com.hazelcast.sql.impl.DataConnectionConsistencyChecker;
check metodu
Çıktısı şöyle
com.hazelcast.dataconnection.impl.JdbcDataConnection.validate
com.hazelcast.dataconnection.impl.JdbcDataConnection.<init>
sun.reflect.GeneratedConstructorAccessor14.newInstance
sun.reflect.DelegatingConstructorAccessorImpl.newInstance
java.lang.reflect.Constructor.newInstance
com.hazelcast.dataconnection.impl.DataConnectionServiceImpl.createDataConnectionInstance
com.hazelcast.dataconnection.impl.DataConnectionServiceImpl.lambda$put$2
java.util.concurrent.ConcurrentHashMap.compute
com.hazelcast.dataconnection.impl.DataConnectionServiceImpl.put
com.hazelcast.dataconnection.impl.DataConnectionServiceImpl.createOrReplaceSqlDataConnection
com.hazelcast.sql.impl.DataConnectionConsistencyChecker.check
com.hazelcast.sql.impl.state.QueryStateRegistryUpdater$Worker.checkDataConnectionsConsistency
com.hazelcast.sql.impl.state.QueryStateRegistryUpdater$Worker.executeInterruptibly
com.hazelcast.sql.impl.state.QueryStateRegistryUpdater$Worker.run
java.lang.Thread.run

2 Mayıs 2023 Salı

HazelcastAPI Map Sınıfları İlişikisi

NodeEngineImpl has MapService
  MapService has MapServiceContext
    MapServiceContext has ConcurrentMap<String, MapContainer> mapContainers
      MapContainer has MapStoreContext. aka BasicMapStoreContext
        BasicMapStoreContext has MapStoreWrapper
        BasicMapStoreContext has MapStoreManager. aka WriteBehindManager, WriteThroughManager
          WriteBehindManager has StoreWorker
          WriteBehindManager has WriteBehindProcessor

  

hazelcast.xml - MapStore Ayarları

Giriş
Not : Kod için ayarlar MapStoreConfig ile yapılıyor
Tüm ayarlar <map-store tag></map-store tag> içinde. XML ayarlarının açıklaması şöyle
class-name: Name of the class implementing MapLoader and/or MapStore.

write-delay-seconds: Number of seconds to delay to call the MapStore.store(key, value)`. If the value is zero then it is write-through, so the MapStore.store(key,value) method is called as soon as the entry is updated. Otherwise, it is write-behind; so the updates will be stored after the write-delay-seconds value by calling the Hazelcast.storeAll(map) method. Its default value is 0 (write-through).

write-batch-size: Used to create batch chunks when writing to the external data store. In default mode, all map entries are tried to be written in one go. To create batch chunks, the minimum meaningful value for write-batch-size is 2. For values smaller than 2, it works as in default mode.

write-coalescing: In write-behind mode, Hazelcast coalesces updates on a specific key by default; it applies only the last update on it. You can set this element to false to store all updates performed on a key to the data store.

enabled: True to enable this map-store, false to disable. Its default value is true.

initial-mode: Sets the initial load mode. LAZY is the default load mode, where load is asynchronous. EAGER means map operations are blocked until all partitions are loaded.
Örnek
Şöyle yaparız
<map name="...">
  <map-store enabled="true">
    <write-delay-seconds>1</write-delay-seconds>
    <write-batch-size>5</write-batch-size>
    <write-coalescing>true</write-coalescing>
</map-store>
  <in-memory-format>NATIVE</in-memory-format>
  <backup-count>0</backup-count>
  <async-backup-count>1</async-backup-count>
  <time-to-live-seconds>0</time-to-live-seconds>
  <max-idle-seconds>1800</max-idle-seconds>
  <eviction eviction-policy="NONE" max-size-policy="PER_NODE" size="0"/>
  <merge-policy>com.hazelcast.spi.merge.PutIfAbsentMergePolicy</merge-policy>
</map>


Hazelcast SQL - SHOW RESOURCES FOR Örnekleri

Giriş
name ve type isimli iki sütun döner. 

Örnek
PostgresSQL çıktısı şöyle. Çünkü iki tane tablo vardı
"public"."myworker"         Table
"public"."myworker_backup"  Table
Örnek
MySQL çıktısı şöyle
"db"."myworker"            Table
"db"."myworker_backup"     Table

Hazelcast Jet AbstractProcessor Sınıfı

Giriş
Şu satırı dahil ederiz 
import com.hazelcast.jet.core.AbstractProcessor;
Hemen hemen tüm kalıtımlar AbstractProcessor sınıfından oluyor

emitFromTraverser
Belirtilen Traverser nesnesini dolaşır ve tryEmit() metoduna geçer. Yani aslında Outbox nesnesine yazar. Eğer her şeyi yazabildiyse true döner, Yazamadıysa false döner. Kod şöyle
protected final <E> boolean emitFromTraverser(int ordinal, Traverser<E> traverser) {
  E item;
  if (pendingItem != null) {
    item = (E) pendingItem;
    pendingItem = null;
  } else {
    item = traverser.next();
  }
    for (; item != null; item = traverser.next()) {
      if (!tryEmit(ordinal, item)) {
        pendingItem = item;
        return false;
      }
    }
    return true;
}
Örnek
Stack trace şöyle. complete metodu bir Traverser nesnesi yaratır ve bunu emitFromTraverser() çağrısına geçer. emitFromTraverser() ,  Traverser nesnesi null dönünceye kadar tryEmit() çağrısı yapar. tryEmit() traverser'ın döndürdüğü şeyi OutBox'a yazar.
tryEmit:309, AbstractProcessor (com.hazelcast.jet.core)
emitFromTraverser:408, AbstractProcessor (com.hazelcast.jet.core)
emitFromTraverser:421, AbstractProcessor (com.hazelcast.jet.core)
complete:96, ReadKafkaConnectP (com.hazelcast.jet.kafka.connect.impl)
complete:541, ProcessorTasklet (com.hazelcast.jet.impl.execution)
stateMachineStep:421, ProcessorTasklet (com.hazelcast.jet.impl.execution)
call:291, ProcessorTasklet (com.hazelcast.jet.impl.execution)
tryEmit metodu
İçi şöyle
protected final boolean tryEmit(int ordinal, @Nonnull Object item) {
  return outbox.offer(ordinal, item);
}

Hazelcast Jet Traverser Arayüzü - Iterator + Strream Gibi

Giriş
Şu satırı dahil ederiz 
import com.hazelcast.jet.Traverser;
- Belirtilen kaynaktan okur veya üzerinde yürüyerek bir veya daha fazla nesne döner. 
Traverser arayüzü bir @FunctionalInterface O yüzden çoğu gerçekleştirim sadece next() metodunu override eden lambda ile yapılabilir.

Neden Iterator + Stream Gibi
Çünkü hem next() metodu sağlıyor hem de Stream gibi flatMap(), map(), filter() gibi default metodlar sağlıyor

Traverser ve Inbox Ilişkisi
Traverser nesnesi Inbox'tan gelen bir nesneye işlem uygular ve bu çıktıyı Outbox'a yazılır. Yazma için 
AbstractProcessor#emitFromTraverser() metodu kullanılır

next metodu
Açıklaması şöyle
The next() method is called to obtain the next item in the sequence, and it returns null when there are no more items. A Traverser instance can be thought of as a non-blocking iterator that can be used to read items from an input source in a distributed and scalable manner.
Default Metodlar
dropWhile
filter
flatMap
map
onFirstNull
peek
prepend
takeWhile

flatMap metodu
Örnek
Şöyle yaparız. Iterator'deki her nesne için bir başka iterator dönülüyor
Iterator<JetSqlRow> joinRow(JetSqlRow leftRow) {
  ...       
}

Iterable<JetSqlRow> leftRows = ...;
Traversers.traverseIterable(leftRows)
  .flatMap(jetSqlRow -> Traversers.traverseIterator(joinRow(jetSqlRow)



THIRD-PARTY.txt Dosyası

Kullanılan harici kütüphanelerin sürümleri bu dosyada Dosyanın yolu şöyle hazelcast/licenses/THIRD-PARTY.txt