27 Aralık 2022 Salı

HazelcastAPI EntryProcessor Arayüzü - IMap İle Kullanılır

Giriş
Şu satırı dahil ederiz
import com.hazelcast.map.EntryProcessor;
Neden Lazım?
Açıklaması şöyle. Yani Map üzerinde bir işlem yapmak istersek nesneyi işlemi başlatan yere kadar taşıyıp, nesneyi güncelleyip tekrar geri göndermek gerekir. Bu da maliyetli. EntryProcessor ile işlem sunucu üzerinde yani yerinde yapılabiliyor. Ayrıca sonuçlar toplanıp tekrar istemciye gönderiliyor.
An EntryProcessor passes you a Map.Entry. At the time you receive it the entry is locked and not released until the EntryProcessor completes. This obviates the need to explicitly lock as would be required with a ExecutorService.

Performance can be very high as the data is not moved off the Member partition. This avoids network cost and, if the storage format is InMemoryFormat.OBJECT, then there is no de-serialization or serialization cost.
Açıklaması şöyle
The ability to collocate computation with data is probably the biggest strength of Hazelcast. Load your data set in a distributed IMap and process it with EntryProcessors. Or run a stream processing pipeline on top of a stream of CDC events.
Kim Çalıştırır
Açıklaması şöyle. Partition Thread tarafından çalıştırılır
EntryProcessors execute on the partition thread in a member.
Açıklaması şöyle
Internally, EPs are executed by partition threads, where one partition thread takes care of multiple partitions. When an EP comes to Hazelcast, it is picked by the owner thread of the partition where the key belongs. Once the processing is completed, the partition thread is ready to accept and execute other tasks (which may well be same EP for same key, submitted by another thread). 
Kod şöyle
@BinaryInterface
@FunctionalInterface
public interface EntryProcessor<K, V, R> extends Serializable {
    
  R process(Entry<K, V> entry);
    
  default @Nullable EntryProcessor<K, V, R> getBackupProcessor() {
    if (this instanceof ReadOnly) {
      return null;
    }
    return this;
  }
}
EntryProcessor ve Exception
EntryProcessor  kendisini çağıran partition thread'e exception fırlatmasa daha iyi

getBackupProcessor metodu
- Eğer null dönerse EntryProcessor sadece key'in sahibi olan Partition üzerinde çalışır
- Eğer null dönmezse backup amaçlı özel bir EntryProcessor dönmek gerekir

Diğer
EntryProcessor IMap arayüzünün şu metodları ile kullanılabilir
<R> Map<K, R> executeOnEntries(@Nonnull EntryProcessor<K, V, R> entryProcessor);
<R> Map<K, R> executeOnEntries(@Nonnull EntryProcessor<K, V, R> entryProcessor,
                               @Nonnull Predicate<K, V> predicate);

<R> R executeOnKey(@Nonnull K key,@Nonnull EntryProcessor<K, V, R> entryProcessor);
<R> Map<K, R> executeOnKeys(@Nonnull Set<K> keys,
                            @Nonnull EntryProcessor<K, V, R> entryProcessor);

<R> CompletionStage<R> submitToKey(@Nonnull K key,
                                   @Nonnull EntryProcessor<K, V, R> entryProcessor);
<R> CompletionStage<Map<K, R>> submitToKeys(
                               @Nonnull Set<K> keys,
                               @Nonnull EntryProcessor<K, V, R> entryProcessor);
Örnek
Şöyle yaparız
public class TestProcessor implements EntryProcessor<String, TestPojo, TestPojo> {
  private static final long serialVersionUID = -7228575928294057876L;
  private final TestPojo newValue;

  TestProcessor(TestPojo newValue) {
    this.newValue = newValue;
  }

  @Override
  public TestPojo process(Map.Entry<String, TestPojo> entry) {
    System.out.println("update value: " + entry.getValue() +
    " with new value " + this.newValue);
    entry.setValue(newValue);
    return this.newValue;
  }
}

IMap<String, TestPojo> map = ...;
map.executeOnKey("key", new TestProcessor(new TestPojo("Version2")));

ComputeEntryProcessor Sınıfı
Örnek
Şöyle yaparız
IMap<Long, Integer> cart = hazelcast.getMap("default");

Integer productId = ...;
Integer newQuantity = cart.executeOnKey(productId,
  new ComputeEntryProcessor<Long, Integer, Integer>((key,value) -> {
    Integer newQuantity = value + 1;
    return newQuantity;
});

22 Aralık 2022 Perşembe

HazelcastAPI MetricsService Sınıfı

Giriş
Şu satırı dahil ederiz 
import com.hazelcast.internal.metrics.impl.MetricsService;
Açıklaması şöyle
Service collecting the Metrics periodically and publishes them via MetricsPublishers
Kodu şöyle
public class MetricsService implements ManagedService, LiveOperationsTracker {
  public static final String SERVICE_NAME = "hz:impl:metricsService";

  ...
  private final CopyOnWriteArrayList<MetricsPublisher> publishers = 
    new CopyOnWriteArrayList<>();
  ...
  private volatile ScheduledFuture<?> scheduledFuture;
  ...
}
scheduleMetricsCollectorIfNeeded metodu
scheduledFuture ile bir timer başlatır ve collectMetrics() metodunu tetikler. Timer'ın sıklığı MetricsConfig.getCollectionFrequencySeconds() ile belirlenir.

collectMetrics metodu
Kod şöyle
void collectMetrics() {
  MetricsPublisher[] publishersArr = publishers.toArray(new MetricsPublisher[0]);
  PublisherMetricsCollector publisherCollector = 
    new PublisherMetricsCollector(publishersArr);
  collectMetrics(publisherCollector);
  publisherCollector.publishCollectedMetrics();
}  
PublisherMetricsCollector hem collector hem de publisher. Kalıtım şöyle
 MetricsCollector
  PublisherMetricsCollector

MetricsCollector sınıfının collect metodları var. Bunlar şöyle
void collectLong(MetricDescriptor descriptor, long value);

void collectDouble(MetricDescriptor descriptor, double value);

void collectException(MetricDescriptor descriptor, Exception e);

void collectNoValue(MetricDescriptor descriptor);
collectMetrics metodu - MetricsCollector metricsCollector
Kod şöyle. Yani PublisherMetricsCollector nesnesi MetricsCollector olarak metricsRegistrySupplier nesnesine geçiliyor. Her collectX() metodu çağrılınca PublisherMetricsCollector nesnesi de içindeki publisher'ları tetikliyor.
void collectMetrics(MetricsCollector metricsCollector) {
  metricsRegistrySupplier.get().collect(metricsCollector);
}





19 Aralık 2022 Pazartesi

Hazelcast Jet MasterJobContext Sınıfı

Giriş
Şu satırı dahil ederiz 
import com.hazelcast.jet.impl.MasterJobContext;
Bu sınıfın açıklaması şöyle
Part of MasterContext that deals with execution starting and termination
İlişkisi şöyle
DefaultNodeExtension
  JetServiceBackend
      MasterContext 
        MasterJobContext

finalizeJob metodu
İmzası şöyle. Bir exception fırlatılınca Job nesnesini sonlandırır
void finalizeJob(@Nullable Throwable failure)

16 Aralık 2022 Cuma

15 Aralık 2022 Perşembe

HazelcastClientAPI ClientInvocation Sınıfı - Client Tarafındaki Invocation

Giriş
Şu satırı dahil ederiz
import com.hazelcast.client.impl.spi.impl.ClientInvocation;
Kalıtım şöyle
Kalıtım şöyle
BaseInvocation
    MasterInvocation
    PartitionInvocation
    RaftInvocation
    TargetInvocation

allowRetryOnRandom Alanı
Varsayılan değeri true. Dolayısıyla client bir member'a çağrıda bulunur ancak başarısız olursa, tekrar dener ama bu sefer hedef olarak rastgele bir başka member'ı kullanır

disallowRetryOnRandom metodu
allowRetryOnRandom alanına false değerini atar.

invoke metodu
Metod şöyle. Burada bir Future döndürülüyor.
public ClientInvocationFuture invoke() {
  clientMessage.setCorrelationId(callIdSequence.next());
  invokeOnSelection();
  return clientInvocationFuture;
}
invokeOnSelection metodu
Metod şöyle. Detaylar önemli değil. Sonuçta çağrı invocationService nesnesine geçiliyor ve callback için invocationService nesnesine "this" ile kendisini veriyor.
private void invokeOnSelection() {
  ...
  if (isSmartRoutingEnabled) {
    if (partitionId != -1) {
      invoked = invocationService.invokeOnPartitionOwner(this, partitionId);
    } else if (uuid != null) {
      invoked = invocationService.invokeOnTarget(this, uuid);
    } else {
      invoked = invocationService.invoke(this);
    }
     ...
  } else {
    invoked = invocationService.invoke(this);
  }
  ...
}
shouldRetry metodu
Metod şöyle. Eğer TargetDisconnectedException gelirse client procotol'de bu çağrı retryable olarak işaretli ise tekrar dener.
private boolean shouldRetry(Throwable t) {
  if (t instanceof InvocationMightContainCompactDataException) {
    return true;
  }

  if (isBindToSingleConnection() && (t instanceof IOException ||
    t instanceof TargetDisconnectedException)) {
    return false;
  }

  if (uuid != null && t instanceof TargetNotMemberException) {
    //when invocation send to a specific member
    //if target is no longer a member, we should not retry
    //note that this exception could come from the server
    return false;
 }

 if (t instanceof IOException || t instanceof HazelcastInstanceNotActiveException || 
   t instanceof RetryableException) {
  return true;
 }
 if (t instanceof TargetDisconnectedException) {
   return clientMessage.isRetryable() || invocationService.isRedoOperation();
  }
  return false;
}


14 Aralık 2022 Çarşamba

HazelcastAPI MapInterceptor Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.map.MapInterceptor;
interceptPut metodu
map.put() çağrılınca interceptPut() tetiklenir

Örnek
Şöyle yaparız
public class TestInterceptor implements MapInterceptor {
  private static final long serialVersionUID = 4971835785800511499L;
  @Override
  public Object interceptGet(Object value) {
    return value;
  }
  @Override
  public void afterGet(Object value) {
  }
  @Override
  public Object interceptPut(Object oldValue, Object newValue) {
    System.out.println("old: " + oldValue + " new: " + newValue);
    if (newValue.equals(new TestPojo("Version2"))) {
      return new TestPojo("Version3");
    }
    return newValue != null ? newValue : oldValue;
  }
  @Override
  public void afterPut(Object value) {
  }
  @Override
  public Object interceptRemove(Object removedValue) {
    return null;
  }
  @Override
  public void afterRemove(Object oldValue) {
  }
}

IMap<String, TestPojo> map = ...;
map.addInterceptor(new TestInterceptor());


12 Aralık 2022 Pazartesi

HazelcastAPI TransactionalMap Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.transaction.TransactionalMap;
TransactionContext tarafından yaratılır

Örnek
Şöyle yaparız
TransactionalMap<String,Serializable> transactionalMap = 
  transactionalContext.getMap("...");
Örnek
Şöyle yaparız
HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();

TransactionOptions options = new TransactionOptions()
  .setTransactionType(TransactionOptions.TransactionType.ONE_PHASE );

TransactionContext context = hazelcastInstance.newTransactionContext(options);
context.beginTransaction();

TransactionalQueue queue = context.getQueue("myqueue");
TransactionalMap map = context.getMap("mymap");
TransactionalSet set = context.getSet("myset");

try {
  Object obj = queue.poll();
  //process obj
  map.put("1", "value1");
  set.add("value");
  //do other things
  context.commitTransaction();
} catch (Throwable t) {
  context.rollbackTransaction();
}


hazelcast.xml - Kubernetes İçin Network Join Ayarları

Giriş
Açıklaması şöyle
For production environments, the usage of UDP is not the best choice. So, among others, Hazelcast supports clusters which are deployed in Kubernetes environment without multicasting. For the detailed information, look into the documentation on Configuring Kubernetes. To activate the discovery on Kubernetes, set the properties as follows:
1. Bir kubernetes servisi yaratılır
2. Bu servise kubernetes tag içinde atıfta bulunulur

Örnek
Şöyle yaparız
<kubernetes enabled="true">
  <namespace>bjy</namespace>
  <service-name>hazelcast-service-lb</service-name>
</kubernetes>
Örnek
Şöyle yaparız. Burada ismi hazelcast olan bir servise atıfta bulunuluyor
hazelcast:
  instance-name: users-app
  cluster-name: users-app
  network:
    join:
      multicast:
        enabled: false
      kubernetes:
        enabled: true
        service-name: hazelcast
Açıklaması şöyle
This disables the multicast, enables Kubernetes discovery, and defines the name of the service hazelcast, which will be used to search for cluster members via Kubernetes API. Then, we need to create this service, which can be declared as the next
Şöyle yaparız
apiVersion: v1
kind: Service
metadata:
  name: hazelcast
  labels:
    app: hazelcast
spec:
  ports:
    - port: 5701
      protocol: TCP
  selector:
    app: app-users
  type: ClusterIP
Açıklaması şöyle
To allow Hazelcast to use the service inside Kubernetes for the discovery, we also need to grant certain permissions. An example of RBAC configuration for default namespace you can find in Hazelcast documentation.
Şöyle yaparız. Burada 'default' namespace içindeki 'default' service account'a ClusterRole bağlanıyor. ClusterRole ile get ve list verb'lerini çalıştırabiliyor.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: hazelcast-cluster-role
rules:
  - apiGroups:
      - ""
    resources:
      - endpoints
      - pods
      - nodes
      - services
    verbs:
      - get
      - list
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: hazelcast-cluster-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: hazelcast-cluster-role
subjects:
  - kind: ServiceAccount
    name: default
    namespace: default
hazelcast.stale.join.prevention.duration.seconds Alanı
Örnek
5 yapılması öneriliyor. Şöyle yaparız
hazelcast.stale.join.prevention.duration.seconds=5'
Açıklaması şöyle
A member that’s starting up looks for candidate members according to its network join config. Once it has established an address for the cluster’s master, it issues join requests. It can be the case that the currently starting member sends out more than 1 request until master processes its join request and it becomes a member of the cluster.

A “stale join request” is one that is processed by master member after the sender is already in master’s cluster members list, so it is ignored. Master member maintains a list of “recently joined members” for this purpose.

How does this relate to kubernetes? With k8s managing pods and persistence enabled (so member has a persistent identity), it can be the case that this happens:
  • a pod is started, hazelcast member joins but fails startup shortly after joining.
  • k8s reschedules the failed pod quickly (shorter than “stale join prevention duration”)
  • new pod starts but master member ignores the join requests -> probably the member decides to start as standalone or it fails after cluster join timeout. In both cases, bad things happen.
To avoid such a situation, it is recommended to lower the stale join prevention duration.




11 Aralık 2022 Pazar

Client Protocol

Giriş
Client procotol ayrı bir proje. Her servis için bir yaml dosyası var. Bu dosyada metodlar tanımlı. Java kodu üretmek için önce bazı paketlerin kurulması gerekiyor. Şöyle yaparız
pip3 install -r requirements.txt
Daha sonra sadece java sınıfı üretmek için şöyle yaparız
 ./generator.py --no-binary -r ~/IdeaProjects/hazelcast/
Üretilen kodlar şu pakette
com.hazelcast.client.impl.protocol.codec
Eğer yeni bir alan falan eklediysek testleri de yeniden üretmek gerekiyor. Şöyle yaparız
 ./generator.py -r ~/IdeaProjects/hazelcast/
Yaml dosyasında her parametre için since diye bir alan var. Bu alan Hazelcast sürümü değil, client protocol sürümü. Yani since: 2.5 şeklinde yazmak gerekiyor.

retryable alanı - Mesaj İçindir
ClientInvocation Sınıfı TargetDisconnectedException exception gelirse çağrıyı tekrar dener

Örnek
Şöyle yaparız
- id: 17
    name: uploadJobMetaData
    since: 2.6
    doc: ''
    request:
      retryable: true
      partitionIdentifier: -1
      params:
        ...
    response: {}
response Alanı
Örnek
Şöyle yaparız
- id: 15
    name: getJobAndSqlSummaryList
    since: 2.5
    doc: ''
    request:
      retryable: true
      partitionIdentifier: -1
      params: []
    response:
      params:
        - name: response
          type: List_JobAndSqlSummary
          nullable: false
          since: 2.5
          doc: ''
Response Yani Cevap Mesajına Yeni Alan Ekleme
Örnek
Şöyle yaparız. Eğer üye alanın since değeri başlangıçtan farklı ise codec içinde abcExists şeklinde bir boolean daha üretiliyor.
- name: SqlError
  since: 2.1
  params:
    - name: code
      type: int
      nullable: false
      since: 2.1
    - name: message
      type: String
      nullable: true
      since: 2.1
    - name: originatingMemberId
      type: UUID
      nullable: false
      since: 2.1
    - name: suggestion
      type: String
      nullable: true
      since: 2.3
SqlErrorCodec için üretilen kod şöyle. Burada sonradan eklenen suggestion alanı için isSuggestionExists boolean görülebilir
public static com.hazelcast.sql.impl.client.SqlError decode(
  ClientMessage.ForwardFrameIterator iterator) {
  // begin frame
  iterator.next();

  ClientMessage.Frame initialFrame = iterator.next();
  int code = decodeInt(...;
  java.util.UUID originatingMemberId = decodeUUID(...);

  java.lang.String message = CodecUtil.decodeNullable(...);
  boolean isSuggestionExists = false;
  java.lang.String suggestion = null;
  if (!iterator.peekNext().isEndFrame()) {
    suggestion = CodecUtil.decodeNullable(iterator, StringCodec::decode);
    isSuggestionExists = true;
  }
  ...
  return new com.hazelcast.sql.impl.client.SqlError(
    code, message, originatingMemberId, isSuggestionExists, suggestion, ...);
}
Codec değiştiği için mesajın constructor metodu da değişiyor. Constructor değişirse testlerde kullanılan ReferenceObjects kodu da değişiyor. 

Eğer mesaj nesnesi custom ise yani otomatik üretilmiyorsa sınıfın equals() ve hashCode () metodlarını değiştirmek gerekiyor. Yoksa ClientCompatibilityTest_A_B veya MemberCompatibilityTest_A_B  
gibi testler başarısız olmaya başlıyor. Response mesajın elle kodlandığı için şöyle yapmak gerekir
@Override
public boolean equals(Object o) {
  ...
  if (suggestionExists && sqlError.suggestionExists) {
    if (!Objects.equals(suggestion, sqlError.suggestion)) {
      return false;
    }
  }
  ...
  return true;
}
Burada hem bende hem de karşıda suggestionExists bayrakları TRUE ise suggestion alanları karşılaştırılıyor. Böylece 
- yeni client eski member'dan mesaj alsa bile mesajı işleyebilir, çünkü karşı mesajdaki suggestionExists değeri FALSE
- eski client yeni member'dan mesaj alsa bile mesajı işleyebilir çünkü client açısından böyle bir alan yok




9 Aralık 2022 Cuma

hazelcast.xml - Wan Replication Ayarları

Wan Replication Nedir
Açıklaması şöyle
Wan sürekli devam eder. Kaynak değiştikçe bu değişiklikler hedefe de uygulanır
Disaster Recovery için kullanılabilir. 
Active-Active
Active-Passive olabilir

Hedef Nasıl Tanımlanır
batch-publisher altında IP adresi ve port tanımlanır.  Açıklaması şöyle
The receiving member on the target cluster will distribute the received entries to the partition owners. 
Hangi Veri Yapılarını Destekler
WAN replication tüm veri yapıları için çalışmıyor! Sadece  IMap ve ICache için çalışıyor. Bu veri yapıları üzerinde değişiklik yapan işlemler (set, remove gibi) karşıya kopyalanıyor.

- WanEvents nesneleri publisher kuyruklara ekleniyor. (ADD_OR_UPDATE, REMOVE)
- WanBatchPublisher bu kuyrukları okuyor
- Publisher karşıya gönderince ACK bekliyor. ACK_ON_RECEIPT, ACK_ON_OPERATION_TO_COMPLETE
- WanConnectionManager hedef adresleri saklar
- SerialBatchReplicationStrategy sanırım tüm event'leri sıra ile işliyor

Endpoint Discovery
Static veya Discovery Service kullanılabilir

Batch Paketleri
Key based coalescing kullanılabilir
Batch sending frequency (max size & delay)

Consistency Checking
REST API veya MC ile synchronization tetiklenebilir. Full Sync veya Delta Synch kullanılabilir.
Full Sync tehlikeli çünkü çok fazla veriyi taşımak gerekebilir
Delta Synch altta Merkle Tree kullanır ve bu özelliğin açık olması gerekir.

Conflict
Merge Policy gerekir. Data structure ayarlarında yapılır.
Value-based policy : PutIfAbsent, Passthrough
Metadata-based policy : HigherHits, ExpirationTime, LatestAccess, LatestUpdate
Custom merge policy 


Örnek
Şöyle yaparız. Burada IMap veya ICache wan-replication-ref ile wan-replication ayarlarına isim ile atıfta bulunuyor
<! -- Tokyo -->
<hazelcast>
 <cluster-name>tokyo</cluster-name>
 <wan-replication name="london-wan-rep">
   <batch-publisher>
     <publisher-id>londonPublisherId</publisher-id>
     <cluster-name>london</cluster-name> <! -- Target cluster name -->
     <target-endpoints>10.3.5.1:5701</target-endpoints>
   </batch-publisher>
  </wan-replication>

  <map name="replicatedMap">
    <wan-replication-ref name="london-wan-rep"/>
  </map>

</hazelcast>

<! -- London -->
<hazelcast>
  <cluster-name>london</cluster-name>
  <wan-replication name="tokyo-wan-rep">
    <batch-publisher>
      <publisher-id>tokyoPublisherId</publisher-id>
       <cluster-name>tokyo</cluster-name> <! -- Target cluster name -->
       <target-endpoints>32.1.1.1:5701</target-endpoints>
     </batch-publisher>
  </wan-replication>

  <map name="replicatedMap">
    <wan-replication-ref name="tokyo-wan-rep"/>
  </map>

</hazelcast>


Docker ve Hazelcast

Giriş
Image isimleri şöyle
1.hazelcast
2.hazelcast-enterprise
3.management-center


Image ismine eklenti olarak
latest : en son çıkan resmi sürüm
latest-snapshot : en son çıkan ve resmi olmayan sürüm
görülebilir.


1. hazelcast Image
Örnek
Tek  container için şöyle yaparız
$ docker run hazelcast/hazelcast:$HAZELCAST_VERSION
Örnek
İki container için şöyle yaparız. Burada HZ_NETWORK_PUBLICADDRESS değişkeni önemli
$ docker run -e HZ_NETWORK_PUBLICADDRESS=<host_ip>:5701 -p 5701:5701 hazelcast/hazelcast:$HAZELCAST_VERSION
$ docker run -e HZ_NETWORK_PUBLICADDRESS=<host_ip>:5702 -p 5702:5701 hazelcast/hazelcast:$HAZELCAST_VERSION
Örnek
Bazen aynı dependency farklı jar'ladan gelebiliyor. Hangi jardan geldiğini görmek için şöyle yaparız
docker run --user=root  
  --env=PATH=/opt/hazelcast/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin 
  --env=HZ_HOME=/opt/hazelcast 
  --env=CLASSPATH_DEFAULT=/opt/hazelcast/* 
  --env=JAVA_OPTS_DEFAULT=-Djava.net.preferIPv4Stack=true 
  --env=PROMETHEUS_PORT= 
  --env=PROMETHEUS_CONFIG=/opt/hazelcast/config/jmx_agent_config.yaml 
  --env=CLASSPATH= 
  --env=JAVA_OPTS=-verbose:class 
  --env=HAZELCAST_CONFIG=config/hazelcast-docker.xml 
  --env=LANG=C.UTF-8 
  --workdir=/opt/hazelcast  
  --name orcun 
  -p 5701:5701 
  -d 
  hazelcast/hazelcast:5.4.0-SNAPSHOT
Burada JAVA_OPTS=-verbose:class ile container başlayınca yüklenen sınıfların nereden geldiği gösteriliyor. Ayrıca user=root olarak başlatılyır. Böylece bazı dosyalar silinebilir ve container tekrar başlatılabilir
> cd lib
> rm -rf hazelcast-jet-hadoop-all-5.4.0-SNAPSHOT.jar
> rm -rf hazelcast-jet-files-gcs-5.4.0-SNAPSHOT.jar

2. hazelcast-enterprise Image
Slim image var. Farkı ne bilmiyorum
Örnek
Şöyle
hazelcast/hazelcast-enterprise:5.3.2-slim 
3. management-center
Örnek
Şöyle yaparız. 
docker run -p 8080:8080 hazelcast/management-center:latest-snapsho
Örnek - Base Image
Şöyle yaparız
FROM hazelcast/management-center:5.3.3

ENV JAVA_OPTS="
-Dhazelcast.mc.license=... 
-Dhazelcast.mc.session.timeout.seconds=120 
-Dhazelcast.mc.healthCheck.enable=true 
-Xms64m -Xmn1024m -Xmx2G"

# Start Management Center
CMD ["bash", "-c", "set -euo pipefail \
&& ./bin/mc-conf.sh cluster add --cluster-name="clstr1" --member-addresses="...:5701" \
&& ./bin/mc-conf.sh cluster add --cluster-name="clstr2" --member-addresses="...:5701" \
&& ./bin/mc-start.sh \
"]


4. hazelcast_cloud Image'ları
Bir de hazelcast_cloud için image'lar var. Link şöyle https://quay.io/organization/hazelcast_cloud
Bunlar ne işe yarıyor bilmiyorum



8 Aralık 2022 Perşembe

Hazelcast SPI OperationExecutorImpl Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.spi.impl.operationexecutor.impl.OperationExecutorImpl;
Bu aslında bir ExecutorService. İçinde GenericOperationThread dizisi barındırır. Bu thread'lerden bazıları priority thread olarak düşünülmüş. Heartbeat timeout gibi şeyler priority thread tarafından çalıştırılır

Kalıtım şöyle
Thread
  HazelcastManagedThread
    OperationThread
      GenericOperationThread
      PartitionOperationThread

public final class OperationExecutorImpl implements OperationExecutor, 
  StaticMetricsProvider {

  // all operations for specific partitions will be executed on these threads, 
  // e.g. map.put(key, value)
  private final PartitionOperationThread[] partitionThreads;
  private final OperationRunner[] partitionOperationRunners;

  private final OperationQueue genericQueue
    = new OperationQueueImpl(
      new LinkedBlockingQueue<>(), new LinkedBlockingQueue<>());

  // all operations that are not specific for a partition will be executed here, 
  // e.g. heartbeat or map.size()
  private final GenericOperationThread[] genericThreads;
  private final OperationRunner[] genericOperationRunners;
   ...
}
genericThreads MasterResponseOp vs gibi cluster işlerini çalıştırır

7 Aralık 2022 Çarşamba

HazelcastAPI ClusterJoinManager Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.internal.cluster.impl.ClusterJoinManager;
startJoinRequest metodu
Akış şöyle
JoinRequestOp.run()
  ClusterJoinManager.handleJoinRequest
    ClusterJoinManager.executeJoinRequest
      ClusterJoinManager.startJoinRequest
private void startJoinRequest(MemberInfo memberInfo) {
  long now = Clock.currentTimeMillis();
  ...
  final MemberInfo existing = joiningMembers.put(memberInfo.getAddress(), memberInfo);
  if (existing == null) {
    sendMasterAnswer(memberInfo.getAddress());
    ...
  } else if (!existing.getUuid().equals(memberInfo.getUuid())) {
    logger.warning("Received a new join request from " + memberInfo.getAddress()
      + " with a new UUID " + memberInfo.getUuid()
        + ". Previous UUID was " + existing.getUuid());
  }
  if (now >= timeToStartJoin) {
    startJoin();
  }
}
Master join isteği alınca sendMasterAnswer() içinde cevabı gönderiyor.

startJoin metodu
FinalizeJoinOp ile member listesi gönderilir.

HazelcastAPI HazelcastInstanceNotActiveException Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.core.HazelcastInstanceNotActiveException;
Açıklaması şöyle
What it means
The Hazelcast instance has been shut down, most likely ungracefully, and Hazelcast operations on this instance can no longer be processed.

What to do
 Assuming the node was ungracefully terminated but was done purposely, this message can be ignored. (This should be the root cause in most cases.) If not, Hazelcast crashed for an unknown reason. Examine the logs to find the root cause of the failure.
Callstack şöyle. shutdown() çağrılmış bir HazelcastIntance nesnesi üzerinde işlem yapılınca çağrılır
com.hazelcast.core.HazelcastInstanceNotActiveException: Hazelcast instance is not active!
at com.hazelcast.spi.impl.AbstractDistributedObject.throwNotActiveException(AbstractDistributedObject.java:115)
at com.hazelcast.spi.impl.AbstractDistributedObject.lifecycleCheck(AbstractDistributedObject.java:110)
at com.hazelcast.spi.impl.AbstractDistributedObject.getNodeEngine(AbstractDistributedObject.java:104)
at com.hazelcast.internal.crdt.pncounter.PNCounterProxy.getReplicaAddresses(PNCounterProxy.java:289)
Burada bir soru var

HazelcastAPI PNCounter Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.crdt.pncounter.PNCounter;
Açıklaması şöyle
In summary, IAtomicLong sacrifices Availability for Consistency. The result will always be correct, but it might not always be available.

PNCounter makes the opposite trade-off. It's always available (depending on the number of nodes of the cluster, of course) but it's eventually consistent as it's asynchronously replicated.
Command Pattern
com.hazelcast.internal.crdt.pncounter.operations.AddOperation

Hazelcast SQL JetSqlRow Sınıfı - SqlRow Tarafından Sarmalanır

Giriş
Şu satırı dahil ederiz
import com.hazelcast.sql.impl.row.JetSqlRow;
Açıklaması şöyle. Yani Processor nesneleri arasında JetSqlRow gönderilir
A row object that's sent between processors in the Jet SQL engine. It contains a fixed number of values.
Örneğin SelectProcessorSupplier, kendi yarattığı processor nesnesine mapOutputFn geçerken şöyle bir şey geçer. Burada ResultSet nesnesinden bir JetSqlRow yaratıldığı görülebilir
Processor processor = new ReadJdbcP<>(
  () -> dataConnection.getConnection(),
  (connection, parallelism, index) -> {
    ... // Function that executes PreparedStatement
  },
  rs -> { // Function that consumes the ResultSet
int columnCount = rs.getMetaData().getColumnCount(); Object[] row = new Object[columnCount]; for (int j = 0; j < columnCount; j++) { Object value = rs.getObject(j + 1); row[j] = convertValue(value); } return new JetSqlRow(evalContext.getSerializationService(), row); } );
Çağrı şöyle
<init>:54, JetSqlRow (com.hazelcast.sql.impl.row)
project:119, KvRowProjector (com.hazelcast.jet.sql.impl.connector.keyvalue)
project:104, KvRowProjector (com.hazelcast.jet.sql.impl.connector.keyvalue)
lambda$get$a3871156$1:72, RowProjectorProcessorSupplier (com.hazelcast.jet.sql.impl.connector.map)
applyEx:-1, 628161370 (com.hazelcast.jet.sql.impl.connector.map.RowProjectorProcessorSupplier$$Lambda$2238)
apply:49, FunctionEx (com.hazelcast.function)
tryProcess:566, AbstractProcessor$FlatMapper (com.hazelcast.jet.core)
tryProcess:45, TransformP (com.hazelcast.jet.impl.processor)
tryProcess0:187, AbstractProcessor (com.hazelcast.jet.core)
process0:602, AbstractProcessor (com.hazelcast.jet.core)
process:108, AbstractProcessor (com.hazelcast.jet.core)
lambda$processInbox$2f647568$2:490, ProcessorTasklet (com.hazelcast.jet.impl.execution)
runEx:-1, 722072740 (com.hazelcast.jet.impl.execution.ProcessorTasklet$$Lambda$2293)
run:31, RunnableEx (com.hazelcast.jet.function)
doWithClassLoader:532, Util (com.hazelcast.jet.impl.util)
processInbox:490, ProcessorTasklet (com.hazelcast.jet.impl.execution)
stateMachineStep:341, ProcessorTasklet (com.hazelcast.jet.impl.execution)
stateMachineStep:336, ProcessorTasklet (com.hazelcast.jet.impl.execution)
stateMachineStep:328, ProcessorTasklet (com.hazelcast.jet.impl.execution)
call:291, ProcessorTasklet (com.hazelcast.jet.impl.execution)
runTasklet:404, TaskletExecutionService$CooperativeWorker (com.hazelcast.jet.impl.execution)
accept:-1, 73422020 (com.hazelcast.jet.impl.execution.TaskletExecutionService$CooperativeWorker$$Lambda$1314)
forEach:895, CopyOnWriteArrayList (java.util.concurrent)
run:369, TaskletExecutionService$CooperativeWorker (com.hazelcast.jet.impl.execution)
run:748, Thread (java.lang)

HazelcastAPI MemberSocketInterceptor Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.nio.MemberSocketInterceptor;
Açıklaması şöyle.
Member Socket Interceptor can be registered via com.hazelcast.config.SocketInterceptorConfig
Warning: a MemberSocketInterceptor provides access to the socket and will bypass
any TLS encryption. So be warned that any data send using the SocketInterceptor
could be visible as plain text and could therefor be a security risk.
Kalıtım şöyle
SocketInterceptor
  MemberSocketInterceptor

Metodlar şöyle
public void onAccept(Socket acceptedSocket) throws IOException

public void onConnect(Socket connectedSocket) throws IOException

THIRD-PARTY.txt Dosyası

Kullanılan harici kütüphanelerin sürümleri bu dosyada Dosyanın yolu şöyle hazelcast/licenses/THIRD-PARTY.txt