31 Ekim 2022 Pazartesi

HazelcastAPI FlakeIdGenerator Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.flakeidgen.FlakeIdGenerator;
newId metodu
Örnek
Şöyle yaparız
FlakeIdGenerator idGenerator = hazelcastInstance.getFlakeIdGenerator("newid");
for (int i = 0; i < 10; i++) {
  map.put(idGenerator.newId(), "message" + i);
}


HazelcastAPI Aggregators Sınıfı - Kullanmayın

Giriş
IMap 3 tane metod sunuyor. Bunlar şöyle
accumulate()
combine()
aggregate()
Sanırım Predicate API artık idame (maintenance) durumda, ve bunun yerine SQL API'nin kullanılması daha iyi.

aggregate metodu
İmzası şöyle
<R> R aggregate(Aggregator<Map.Entry<K, V>, R> aggregator, Predicate<K, V> predicate);
Aggregator Sınıfı
Şu satırı dahil ederiz
import com.hazelcast.aggregation.Aggregators;
Aggregators sınıf hazır bazı aggregator metodları sunuyor. Bunlar şöyle
count
distinct
bigDecimal sum/avg/min/max
bigInteger sum/avg/min/max
double sum/avg/min/max
integer sum/avg/min/max
long sum/avg/min/max
number avg
comparable min/max
fixedPointSum, floatingPointSum
count metodu
Örnek
Şöyle yaparız
HazelcastInstance hazelCast = Hazelcast.newHazelcastInstance();
IMap<String, String> map1 = hazelCast.getMap("map1");

map1.put("1", "john");
map1.put("2", "charlie");   
map1.put("3", "john");
map1.put("4", "john");
    
Long count = map1.aggregate(Aggregators.count(), e -> "john".equals(e.getValue()));

System.out.println(count);



30 Ekim 2022 Pazar

HazelcastAPI CPSubsystemConfig Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.config.cp.CPSubsystemConfig;
setGroupSize metodu
Açıklaması şöyle
When the CP subsystem starts, it internally creates 2 consensus groups. We call these groups CP groups. Each CP group runs the Raft consensus algorithm to elect a leader node and commit operations. The first CP group is the Metadata CP group that manages CP members and CP groups created for data structures. The second CP group is the Default CP group. When you do not specify a CP group name while fetching a CP data structure proxy, the proxy internally talks to the Default CP group.
Şöyle yaparız
Config config = ...
config.getCPSubsystemConfig().setCPMemberCount(5);
config.getCPSubsystemConfig().setGroupSize(3);
Açıklaması şöyle
Here, we configure the CP group size and CP member count parameters separately. What we do is, we form the CP subsystem with 5 CP members. However, CP groups will be created with 3 CP members. It means that when a CP group is created, its members will be randomly selected among all CP members available in the CP subsystem. With this method, you can improve throughput by running multiple instances of the Raft consensus algorithm via CP groups and having multiple Raft leaders. This approach is basically partitioning (sharding) and you decide how partitions will be created.
setCPMemberCount
Açıklaması şöyle
The current set of CP data structures (IAtomicLong, IAtomicReference, ICountDownLatch, ISemaphore, FencedLock) have relatively small memory footprints. Therefore, all members of your Hazelcast IMDG cluster do not have to be CP members. It is perfectly fine to configure only 3 or 5 CP members in a 100-member Hazelcast cluster. All members of a cluster have access to the CP Subsystem APIs.
Örnek
Şöyle yaparız
Config config = ...
config.getCPSubsystemConfig().setCPMemberCount(3);
setPersistenceEnabled metodu
Örnek
Şöyle yaparız
config.getCPSubsystemConfig().setPersistenceEnabled(true);

28 Ekim 2022 Cuma

HazelcastAPI GenericMapStore Sınıfı - Hazelcast ile Hazır Geliyor

Giriş
Şu satırı dahil ederiz
import com.hazelcast.mapStore.GenericMapStore;
Maven
Şu satırı dahil ederiz
<dependency>
  <groupId>com.hazelcast</groupId>
  <artifactId>hazelcast-mapstore</artifactId>
  <version>${hazelcast.version}</version>
</dependency>

<dependency>
  <groupId>com.hazelcast</groupId>
  <artifactId>hazelcast-sql</artifactId>
  <version>${hazelcast.version}</version>
</dependency>
Özet
Eğer bir IMap varsa bunu harici veri tabanına kaydetmek için Hazelcast ile hazır gelen GenericMapStore kullanılabilir. 

Bu sınıf, Hazelcast içinde otomatik olarak bir mapping yaratır. Hazelcast içindeki mapping ise harici bir veri tabanına bağlıdır. Hazelcast mapping'ine yapılan sorgu aslında harici veri tabanına yönlendirilir.  Örneğin harici veri tabanı kapatılır ve Hazelcast mapping'ine sorgu yapılırsa, hata alındığı görülebilir. 
Açıklaması şöyle
The name of the mapping is the same name as your map prefixed with __map-store

Peki amaç nedir ? Bence amaç Jet Job'ları içinde de SQL ile sorgu yapabilmek.

- SQL binding için gereken cümleleri com.hazelcast.sql.impl.SqlServiceImpl çalıştırıyor. Bu sınıf  com.hazelcast.jet.sql.impl.SqlPlanImpl içindeki plan nesnelerini çalıştırıyor
- Map'a yazılan verinin DB'ye yazma işlemini com.hazelcast.jet.impl.connector.WriteJdbcP sınıfı gerçekleştirir.

Kullanım
Bu sınıfı kullanabilmek için  şöyle yaparız
IMap<Integer, GenericRecord> map = hazelcastInstance.getMap("...");
veya şöyle yaparız. Bu durumda Person nesnesi Compact veya Portable serialization desteklemelidir.
IMap<Integer, Person> map = hazelcastInstance.getMap("...");
Jet Ayarları
Bu sınıfı kullanabilmek için ayrıca Jet ayarlarının yapılmış olması gerekir. Yoksa ilk put() çağrısında şöyle bir hata alırız
Caused by: com.hazelcast.sql.HazelcastSqlException: The Jet engine is disabled.
Jet ayarları için hazelcast.xml dosyasında şöyle yaparız
<jet enabled="true" resource-upload-enabled="true">
  <instance>
    <cooperative-thread-count>4</cooperative-thread-count>
    <flow-control-period>100</flow-control-period>
    <backup-count>1</backup-count>
    <scale-up-delay-millis>10000</scale-up-delay-millis>
    <lossless-restart-enabled>false</lossless-restart-enabled>
    <max-processor-accumulated-records>1000000000</max-processor-accumulated-records>
  </instance>
  <edge-defaults>
    <queue-size>1024</queue-size>
    <packet-size-limit>16384</packet-size-limit>
    <receive-window-multiplier>3</receive-window-multiplier>
    </edge-defaults>
</jet>
Açıklaması şöyle
The generic MapStore is a pre-built implementation that connects to an external data store, using the external-data-store configuration.

The generic MapStore is called low-code because it requires you to write little to no Java code:

- If Hazelcast provides a built-in data store factory for your data store, you can configure your cluster to use it, without writing any Java code.

- If Hazelcast doesn’t provide a built-in data store factory for your data store, you can write your own in Java to allow the pre-built MapStore to connect to your data store.

Kod Konfigürasyonu
data-connection-ref tanımlı olmalı

Örnek - MapStoreConfig 
Şöyle yaparız. MapStoreConfig sınıfına verilen string sabitler aynı zamanda GenericMapStore içinde de tanımlı
Config config = ...

MapStoreConfig mapStoreConfig = new MapStoreConfig()
  .setClassName(GenericMapStore.class.getName()) // Use GenericMapStore as MapStore
  .setProperty("data-connection-ref", "datastore") // Datalink to use
  .setProperty("table-name", myTableName); // Table name to read from
  .setProperty("type-name", "org.example.Person");

MapConfig mapConfig = new MapConfig(myTableName)
.setMapStoreConfig(mapStoreConfig); config.addMapConfig(mapConfig);
XML Konfigürasyonu
XML Konfigürasyonu yazısına taşıdım




27 Ekim 2022 Perşembe

HazelcastAPI ReplicatedMapConfig Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.config.ReplicatedMapConfig;
Örnek
Şöyle yaparız
ReplicatedMapConfig replicatedMapConfig = config.getReplicatedMapConfig("rogueUsers");

replicatedMapConfig.setInMemoryFormat(InMemoryFormat.BINARY);
replicatedMapConfig.setAsyncFillup(true);
replicatedMapConfig.setStatisticsEnabled(true);
replicatedMapConfig.setSplitBrainProtectionName("splitbrainprotection-name");
Daha sonra şöyle yaparız
HazelcastInstance hz = ...;
ReplicatedMap<String, String> map = hz.getReplicatedMap("rogueUsers");

hazelcast.xml - Network Join Ayarları

Giriş
Not : Bu yazı hazelcast.xml - Cluster'ı Başlatmak İçin Ayarlar yazısının bir devamı

Network join  için varsayılan yöntem multicast yöntemidir. Yani Hazelcast kümesindeki bilgisayarlar birbirlerini bulmak için multicast kullanırlar. Başka bir yöntem kullanmak istiyorsak multicast yöntemini kapatmak gerekir. Şöyle yaparız
<network>
  <join>
    <multicast enabled="false"/>
    Another join method is defined here
  </join>
</network>
Üyeler birbirlerini bulunca şuna benzer bir çıktı verir. Çıktıda üye kendisini ilk satıra yazar ve "this" olarak belirtir. Ayrıca "ver:" alanı ile kaç defa güncellendiği yani version belirtilir.
Members {size:2, ver:2} [
    Member [10.5.167.10]:5701 - 36651420-f623-4ac0-adc7-73326fba9fa8 this
    Member [10.5.167.11]:5701 - f41c88f0-4e7a-466d-897a-b222effe19d2
]
aws
Şöyle yaparız
<aws enabled="false">
  <access-key>my-access-key</access-key>
  <secret-key>my-secret-key</secret-key>
  <region>us-west-1</region>
  <host-header>ec2.amazonaws.com</host-header>
  <security-group-name>hazelcast-sg</security-group-name>
  <tag-key>type</tag-key>
  <tag-value>hz-nodes</tag-value>
</aws>
discovery-strategy
Sanırım custom discovery-strategy için kullanılır.
Örnek
Şöyle yaparız
<discovery-strategies>
  <discovery-strategy enabled="true" class="xx.xx.DiscoveryStrategy">
    <properties>
      <property name="ws_port">5701</property>
    </properties>
  </discovery-strategy>
</discovery-strategies>
kubernetes

multicast
Multicast'i denemek için Linux'ta şöyle yaparız
on the server: sockperf server -i 224.2.2.3 -p 54327
on the client: sockperf ping-pong -i 224.2.2.3 -p 54327
Açıklaması şöyle
multicast-group: The multicast group IP address. Specify it when you want to create clusters within the same network. Values can be between 224.0.0.0 and 239.255.255.255. Its default value is 224.2.2.3.

multicast-port: The multicast socket port that the Hazelcast member listens to and sends discovery messages through. Its default value is 54327.

Örnek
Şöyle yaparız
hazelcast:
cluster-name: dev-ptt group: network: join: multicast: enabled: true
Örnek - IPv4
Şöyle yaparız
<multicast enabled="false" loopbackModeEnabled="false">
  <multicast-group>224.2.2.3</multicast-group>
  <multicast-port>54327</multicast-port>
  <multicast-timeout-seconds>2</multicast-timeout-seconds>
  <multicast-time-to-live>32</multicast-time-to-live>
</multicast>
Örnek - IPv6
Şöyle yaparız
<multicast enabled="false">
  <multicast-group>FF02:0:0:0:0:0:0:1</multicast-group>
  <multicast-port>54327</multicast-port>
</multicast>
tcp
Açıklaması şöyle. Yani cluster member olarak iki bilgisayar varsa ve üçüncüsünü eklersek sadece üçüncüsünün XML'inde değişiklik yapmak yeterli.
The members section in TCP is for finding the cluster.

You list some places where cluster members may be. The process starting tries those locations, and if it gets a response the response includes the locations of all cluster members.

When scaling up you frequently won't know the location in advance. The TCP list is one solution, but there's other ways if running on the cloud, etc.
Bir başka açıklama şöyle
Usually, you don't need to change anything, and just starting a new member (IP3) with listed IP1 and IP2 will work. The third member will join the cluster.

How does it look like under the hood (simplified):

- the newly started member tries to contact addresses in its member list; after a successful connection, it sends a "Who Is Master" request and reads the master node address from the response;
- the new node makes a connection to the master address (if not established already) and asks it to join the cluster;
- if the join is successful, the master replies with an updated member list (cluster view) and it also sends the updated list to all other cluster members;

There were significant improvements in handling corner cases for these "incomplete address configuration" issues in Hazelcast 5.2. So if you are on an older version I strongly recommend switching to an up-to-date one.

If for any reason the default behavior is not sufficient in your case, you can also use the /hazelcast/rest/config/tcp-ip/member-list REST endpoint to make member-list changes. The endpoint was also introduced in 5.2. Find details in the documentation:

Örnek 
Şöyle yaparız
<hazelcast xmlns="http://www.hazelcast.com/schema/config">
  <cluster-name>hazelcast-test</cluster-name>
  <network>
    <port auto-increment="true" port-count="20">5701</port>
    <join>
      <multicast enabled="false"/>
      <tcp-ip enabled="true">
        <member>localhost:5701</member> 
        <member>localhost:5702</member>
        <member>localhost:5703</member>
      </tcp-ip>
    </join>
  </network>

  <management-center scripting-enabled="true" />
</hazelcast>
Örnek - tcp + timeout
Şöyle yaparız
<tcp-ip enabled="true" connection-timeout-seconds="5">
  <member-list>
    <member>10.244.0.35</member>
    <member>10.244.0.36</member>
    <member>10.244.0.37</member>
  </member-list>
</tcp-ip>


HazelcastAPI IQueue Arayüzü - Veriyi Bölümlendirerek Dağıtmaz

Giriş
Şu satırı dahil ederiz
import com.hazelcast.collection.IQueue;
java.util.concurrent.BlockingQueue arayüzünden kalıtır. Açıklaması şöyle
The IQueue is not a partitioned data-structure. All the content of an IQueue is stored in a single machine (and in the backup).
Örnek
Şöyle yaparız
HazelcastInstance instance = ...;
IQueue<Float> queue = instance.getQueue("dist_q");
addItemListener metodu
Örnek
Şöyle yaparız
import com.hazelcast.collection.ItemEvent;
import com.hazelcast.collection.ItemListener;

public class MyItemListener implements ItemListener<Foo> {
  @Override
  public void itemAdded(ItemEvent<Foo> item) {
    System.out.println("Item added: " + item.getItem());   
  }

  @Override
  public void itemRemoved(ItemEvent<Foo> item) {
    System.out.println("Item removed: " + item.getItem());      
  }
}

boolean includeValue = true;
queue.addItemListener(listener, includeValue);
drainTo metodu
Örnek
Şöyle yaparız
BlockingQueue<Message> queue = hazelcastInstance.getQueue("send-test");
Collection<Message> list = new HashSet<>();
int d = queue.drainTo(list, 100);
offer metodu
Bir örnek burada

poll metodu
Bir örnek burada

take metodu
Örnek ver











25 Ekim 2022 Salı

Hazelcast JDBC Driver - Java Uygulaması Hazelcast'e Bağlanır

Giriş
Açıklaması şöyle
Hazelcast JDBC Driver allows Java applications to connect to Hazelcast using the standard JDBC API.
Açıklaması şöyle
Hazelcast JDBC driver leverages the Client API of the Hazelcast JAR, which makes it a Type 2 Driver.
Yani aslında yapılan şey SqlService ile Hazelcast SQL Dialect cümlelerini çalıştırmak
Maven
Şu satırı dahil ederiz
<dependency>
  <groupId>com.hazelcast</groupId>
  <artifactId>hazelcast-jdbc</artifactId>
  <version>5.1</version>
</dependency>
Spring ile kullanmak için şöyle yaparız
spring.datasource.url=jdbc:hazelcast://localhost:5701
spring.datasource.driver-class-name=com.hazelcast.jdbc.Driver
Örnek
Şöyle yaparız
jdbc:hazelcast://localhost/
?discoveryToken=...
&sslEnabled=true
&javax.net.ssl.keyStore=...
&keyStorePassword=..." +
&trustStore=...
&trustStorePassword=...
&cloudUrl=https://api.viridian.hazelcast.com
Örnek
Elimizde şöyle bir IMap olsun
hazelcast:
  map:
    analytics:
      indexes:
        - type: HASH
          attributes:
            - "component"
        - type: SORTED
          attributes:
            - "instant"
Şöyle yaparız
try (var connection = DriverManager.getConnection("jdbc:hazelcast://localhost:5701/");
  var statement = connection
    .prepareStatement("SELECT * FROM analytics where component = ? and instant < ?")) {
  statement.setString(1, name);
  statement.setLong(2, instant);
  var resultSet = statement.executeQuery();
  var modelBuilder = new TableModelBuilder<>();
  while (resultSet.next()) {
    var component = resultSet.getString("component");
    System.out.println(component);
  }
}
DBeaver Kullanmak
Hazelcast JDBC sürücüsü DBeaver ile gelmiyor. 
2. Database > Driver Manager > New ile yeni sürücü yaratma penceresi açılır
3. "Settings" sekmesi şöyledir. "Driver Name" olarak istediğim ismi veririm. "Class Name" olarak
"com.hazelcast.jdbc.Driver" yazılır

4. "Libraries" sekmesi şöyledir. "Add File" ile ilk adımda indirdiği jar dosyasını eklerim. "Find Class" düğmesi çalışmıyor.

5. Daha sonra yeni bir "Connection" yaratırım. JDBC adresi şöyledir. UR olarak
"jdbc:hazelcast://localhost:5701" yazdım. Username ve Password alanlarına gerek yok, çünkü Hazelcast'i çalıştırırken şifre koymadım. 5701 cluster'daki ilk member'ın kullandığı port. İkinci member ayarlara bağlı olarak 5702 vs gibi başka bir portu kullanacaktır

Bağlandıktan sonra tablolar şöyle. GenericMapStore  kullandığım için "Tables" altındaki "__map-store.dbmap" benim IMap için yarattığım tablo. Index koymadım.








20 Ekim 2022 Perşembe

Hazelcast Jet Pipeline Execution Model

Giriş
Jet'i anlatan bir makale burada
Biz Pipeline kodunu yazıyoruz. Elimizde şöyle bir kod olsun
Pipeline p = Pipeline.create();
p.readFrom(textSource())
 .flatMap(line -> traverseArray(line.toLowerCase().split("\\W+")))
 .filter(word -> !word.isEmpty())
 .groupingKey(wholeItem())
 .aggregate(AggregateOperations.counting())
 .writeTo(someSink());
Sonra bu Pipeline DAG'a çevriliyor. Pipeline DAG ise bazı optimizasyonlara uğradıktan sonra Core DAG'a çevriliyor.  DAG Sınıfı yazısına bakabilirsiniz

Şeklen şöyle
Optimizasyon için açıklama şöyle
We can see that Jet applied some simple graph transformations:
- joined consecutive stateless transforms into a single vertex
- implemented the group-and-aggregate transform as two vertices
Sonra Core DAG stage'leri arasında bağlantı hesaplanıyor. Şeklen şöyle
Yukarıdaki şekilde "Combine" safhasının "distributed partitioned" olduğu görülebilir. Yani verilen key'e bakılarak hangi üyede çalışan hangi stage'e gönderileceği hesaplanır. Buna execution plan olarak bakarsak  şeklen şöyle. Burda Jet Node 1 açısından gösteriliyor. "From Accumulate on Jet Node 2" ile veri giriyor. "To Combine on Jet Node 2" ile veri gönderiliyor.


Stage Çeşitleri
İki çeşit Stage var.
 1. BatchStage
2. StreamStage

Kodda BatchStage ve StreamStage arayüzleri görülebilir.
Örnek
Şöyle yaparız
// Listing 2: Hybrid Batch & Streaming Program
Pipeline p = Pipeline.create();
BatchStage<Person> persons = p.readFrom("persons");
BatchStage<Tuple2<Integer, Long>> countByAge = persons
  .groupingKey(Person::age)
  .aggregate(counting());

StreamStage<Order> orders = p.readFrom(kafka(...));
StreamStage<Entry<Order, Long>> ordersWithAgeCounts = orders
  .hashJoin(countByAge, joiningMapEntries(Order::ageOfBuyer))
  .writeTo(someSink());
Cooperative Execution Engine
Core DAG oluşturulduktan sonra bunun çalıştırılması var. Çalıştırmak için cooperative model kullanılıyor. Bu modelde her işlemci için bir thread açılır. Thread per core yani.

Cooperative olması için pipeline içindeki thread'ler hiç bir zaman bloke olmaması gerekir. Execution Plan'deki her bir Stage bir thread'e atanıyor. Şeklen şöyle. Yukarıdaki şekilde 10 tane Stage vardı. Aşağıdaki şekilde de 10 tane stage var. Her birisi bir thread'e atanmış vaziyette.
Thread için döngü şöyle.
while (true) {
  boolean madeProgress = false;
  for (Iterator<Tasklet> it = tasklets.iterator(); it.hasNext();) {
    ProgressState ps = it.next().call();
    if (ps.isDone) {
      it.remove();
    }
    madeProgress |= ps.madeProgress;
  }
  if (!madeProgress) {
    backOff();
  }
}
Tasklet Nedir?
Tasklet yazısına taşıdım

Processor Outbox'ı Çağırır
Outbox şöyle
interface Outbox {
  boolean offer(int ordinal, @Nonnull Object item);
  ...
}
Açıklaması şöyle
offer() is non-blocking, but will fail when the outbox is full, returning false. The processor will react to this by returning from its process() method, and then the tasklet returns from call(). The processor must preserve its state of computation so that it can resume where it left off the next time it's called.
Processor Input'u Lazy Okur
Traverser kullanır. Traverser şöyle
interface Traverser<T> {
  T next();
  ...
}
Açıklaması şöyle
In many cases the processor satisfies the non-blocking contract by creating a lazy sequence from the input and attaching transformation steps to it (akin to Kotlin sequences). Jet defines the Traverser<T> type for this purpose, an iterator-like object with just a single abstract method:
Copy

This lightweight contract allows us to implement Traverser with just a lambda expression. If you look at the source code of Jet processor, you may encounter quite complex code inside Traverser transforms. A good example is the SlidingWindowP processor.
BackPressure
Açıklaması şöyle
Local communication between tasklets inside the same Jet node is easy: we just use bounded queues and force the tasklets to back off as soon as all their output queues are full.

Backpressure is trickier over a network link: instead of a shared memory location you can use for reliable instant signaling, all we have are messages sent over unreliable links that have significant latency. Hazelcast Jet uses a design very similar to the TCP/IP adaptive receive window: the sender must wait for an acknowledgment from the receiver telling it how many more data items it can send. After processing item N, the receiver sends a message that the sender can send up to item N + RWIN.

The receiver sends the acknowledgment message ten times per second, so as long as the receive window is large enough to hold the amount of data processed within 100 milliseconds plus network link latency, the receiver will always have data ready to be processed:



19 Ekim 2022 Çarşamba

HazelcastAPI MapLoader Arayüzü - Harici Bir Kaynaktan Veri Yükler

Giriş
Şu satırı dahil ederiz
import com.hazelcast.map.MapLoader;
MapLoader gerekirse MapLoaderLifecycleSupport arayüzünden kalıtabilir. 
Açıklaması şöyle
MapLoader is never executed from client thread, always from a separate thread pool
Eski Yöntem
Açıklaması şöyle
Here is the MapLoader initialization flow:

1. When getMap() is first called from any member, initialization starts depending on the value of the initial-mode property. If it is set to EAGER, initialization starts on all partitions as soon as the map is created, i.e., all partitions are loaded when getMap is called. A map is loaded when the IMap#getMap method is called in the EAGER mode; otherwise, it is loaded after the first operation, e.g., IMap#size, IMap#get, etc. of the map in the LAZY mode.

2. Hazelcast calls the MapLoader.loadAllKeys() to get all your keys on one of the members.

3. That member distributes keys to all other members in batches.

4. Each member loads values of all its owned keys by calling MapLoader.loadAll(keys).

5. Each member puts its owned entries into the map by calling IMap.putTransient(key,value).
Açıklaması şöyle
Probably in Hazelcast 3.4 we are going to enrich the loading model. I believe there will be more control on loading all keys per partition, because currently 1 machine is going to be 'victim' to load all keys, but in some cases it is better to spread the load. 
Yeni Yöntem
Not : Varsayılan MapStoreConfig.InitialLoadMode değeri LAZY
Açıklaması şöyle. Sanırım her partition kendisine ait değerleri yüklüyor
Each partition is loaded when it is first touched.

loadAll metodu
İmzası şöyle. Eğer key için value yoksa null döndürülemez!
Map<K, V> loadAll(Collection<K> keys);

loadAllKeys metodu
Map ilk yaratıldığında üyelerden sadece bir tanesi tarafından çalıştırılır. Key nesnelerini yükler. Açıklaması şöyle
This method is called when the Hazelcast IMap is first created, and it is used to preload all the keys from the data store into the map. By preloading the keys, you can avoid having to load the keys lazily when they are accessed for the first time, which can improve performance.
Açıklaması şöyle. Yani null, kısmi bir key Iterable veya her şeyi içeren bir Iterable döndürülebilir
The MapLoader.loadAllKeys() method can return all, some, or none of the keys. For example, you can specify a range of keys to be loaded, then rely on read-through to load the remaining keys on demand. Or, you can return a null value so that no data is loaded from the data store.

If the number of keys to load is large, it is more efficient to load them incrementally rather than loading them all at once. To support incremental loading, the MapLoader.loadAllKeys() method returns an Iterable which can be lazily populated with the results of a database query. Hazelcast iterates over the returned data and, while doing so, sends the keys to their respective owner members. The iterator that was returned from the MapLoader.loadAllKeys() method may also implement the Closeable interface, in which case the iterator is closed when the iteration is over. This is intended for releasing resources such as closing a JDBC result set.
Call stack şöyle
- MyMapLoader.loadAllKeys
MapStoreWrapper.loadAllKeys
BasicMapStoreContext.loadAllKeys : BasicMapStoreContext Iterable<K> null ise boş collection döner. Bu yüzden metod null döndürebilir
MapKeyLoader.sendKeysInBatches
MapKeyLoader.sendKeys

MapKeyLoader kodu şöyle. sendKeysInBatches ve dolayısıyla geri kalan her şey bir başka thread içinde çalıştırılıyor
private Future<?> sendKeys(final MapStoreContext mapStoreContext, 
                           final boolean replaceExistingValues) {
  if (keyLoadFinished.isDone()) {
    keyLoadFinished = new LoadFinishedFuture();

    Future<Boolean> sent = execService.submit(MAP_LOAD_ALL_KEYS_EXECUTOR, () -> {
      sendKeysInBatches(mapStoreContext, replaceExistingValues);
      return false;
    });

    execService.asCompletableFuture(sent).whenCompleteAsync(keyLoadFinished,
                    ConcurrencyUtil.getDefaultAsyncExecutor());
    }

    return keyLoadFinished;
}
Örnek
Şöyle yaparız. Bu örnekte veri tabanındaki her şey bir anda yükleniyor.
public class MyMapLoader implements MapLoader<Integer, String> {
    
  // other methods
    
  @Override
  public Iterable<Integer> loadAllKeys() {
    // load all the keys from the data store and return them as a set
    Set<Integer> keys = new HashSet<>();
    try (Connection conn = dataSource.getConnection();
      Statement stmt = conn.createStatement();
      ResultSet rs = stmt.executeQuery("SELECT id FROM my_table")) {
      while (rs.next()) {
        int id = rs.getInt("id");
        keys.add(id);
      }
    } catch (SQLException e) {
      throw new RuntimeException("Error loading keys", e);
    }
    return keys;
  }
}



Hazelcast SQL - IMap İçin CREATE MAPPING Örnekleri

Giriş
  • Type= IMap olarak belirtilir
  • keyFormat olarak int, bigintjava, compact vs kullanılabilir
  • keyFormat=java ise  keyJavaClass=String, Long vs olabilir
  • valueFormat=json-flat ise value olarak basit json string verilir.
  • valueFormat=java ise valueJavaClass=String, com.foo.Bar olarak kullanırız. Böylece value nesnesine ait tüm property'ler için sütun yaratır
  • valueFormat=json-flat ise value olarak basit json string verilir.
Basit Tipler
Örnek 
Şöyle yaparız. Burada key ve value alanları simple bir type olduğu için __key ve this kullanılıyor. IMap<Integer,String> içindir
CREATE OR REPLACE MAPPING  foo (
  __key INT,
  this VARCHAR
) TYPE IMap
OPTIONS (
  'keyFormat' = 'int',
  'valueFormat' = 'varchar'
)
Bu durumda şöyle yaparız
SELECT __key, this from mymap;
INSERT INTO mymap (__key, this) VALUES (24, 'worker24';
Örnek 
Şöyle yaparız.  Burada sadece keyFormat ve valueFormat belirtiliyor. __key ve this kullanmaya gerek yok. IMap<Integer,Integer> içindir
sql> CREATE MAPPING test_map TYPE IMap OPTIONS ('keyFormat'='int', 'valueFormat'='int');
OK
sql> insert into test_map select v, 0 from table(generate_series(1, 100000));
OK
sql> select count(*) from test_map;
+--------------------+
|              EXPR$0|
+--------------------+
|              100000|
+--------------------+
1 row(s) selected

COMPACT KEY ve COMPACT VALUE
Örnek 
Şöyle yaparız. Burada key ve value alanları simple bir type olmadığı için compact seçiliyor  IMap<Integer, Person> içindir
CREATE MAPPING my_map (
    id INT EXTERNAL NAME "__key.id",
    name VARCHAR,
    surname VARCHAR,
    age INT)
TYPE IMap
OPTIONS (
    'keyFormat' = 'compact',
    'keyCompactTypeName' = 'personId',
    'valueFormat' = 'compact',
    'valueCompactTypeName' = 'person'
)
JSON VALUE
Burada dikkat edilmesi gereken bir şey var. Şöyle bir Job çalıştıralım
CREATE JOB jdbc_to_imap AS SINK INTO foo SELECT id,id,name,ssn FROM bar
IMap için mapping yarattıktan sonra value nesnesi HazelcastJsonValue tipindendir.

Örnek
Şöyle yaparız. Burada sütunlar belirtiliyor IMap<Long, String> içindir
CREATE MAPPING trade_map (
  __key BIGINT,
  ticker VARCHAR,
  company VARCHAR,
  amount BIGINT)

TYPE IMap
OPTIONS (
  'keyFormat'='bigint',
  'valueFormat'='json-flat');
Örnek
Şöyle yaparız. Burada sütunlar belirtiliyor. key int, value is json-flat.  IMap<Long, String> içindir
CREATE MAPPING cities (
__key INT,
countries VARCHAR,
cities VARCHAR)
TYPE IMap
OPTIONS('keyFormat'='int', 'valueFormat'='json-flat');
Daha sonra şöyle yaparız
INSERT INTO cities VALUES
(1, 'United Kingdom','London'),
(2, 'United Kingdom','Manchester'),
(3, 'United States', 'New York'),
(4, 'United States', 'Los Angeles'),
(5, 'Turkey', 'Ankara'),
(6, 'Turkey', 'Istanbul'),
(7, 'Brazil', 'Sao Paulo'),
(8, 'Brazil', 'Rio de Janeiro');
JAVA VALUE
Örnek
Şöyle yaparız. Burada sütunlar belirtilmiyor. IMap<Long, String> içindir
CREATE MAPPING "data" EXTERNAL NAME "Patient"
  TYPE IMap
  OPTIONS (
    'keyFormat' = 'java',
    'keyJavaClass' = 'java.lang.Long'
    'valueFormat' = 'java'
    'valueJavaClass' = 'java.lang.String'
)
Örnek
Şöyle yaparız. Burada sütunlar belirtilmiyor. IMap<String, Patient> için
CREATE MAPPING "Patient" EXTERNAL NAME "Patient"
  TYPE IMap
  OPTIONS (
    'keyFormat' = 'java',
    'keyJavaClass' = 'java.lang.String'
    'valueFormat' = 'java'
    'valueJavaClass' = 'com.example.model.Patient'
)
Örnek
Şöyle yaparız. IMap<String,HashMap> içindir
CREATE MAPPING "m" 
TYPE IMap
OPTIONS (
  'keyFormat'='varchar',
  'valueFormat'='java',
  'valueJavaClass'='java.util.HashMap'
)

THIRD-PARTY.txt Dosyası

Kullanılan harici kütüphanelerin sürümleri bu dosyada Dosyanın yolu şöyle hazelcast/licenses/THIRD-PARTY.txt