20 Haziran 2023 Salı

HazelcastAPI Ringbuffer Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.ringbuffer.Ringbuffer;
Açıklaması şöyle. Yani tüm veri tek bir node ve replica node üzerindedir.
Hazelcast Ringbuffer is a replicated but not partitioned data structure that stores its data in a ring-like structure. You can think of it as a circular array with a given capacity. 
readManyAsync metodu
İmzası şöyle
CompletionStage<ReadResultSet<E>> readManyAsync(
  long startSequence,
  int minCount,
  int maxCount,
  IFunction<E, Boolean> filter);
Açıklaması şöyle
startSequence: Sequence of the first item to read.

minCount: Minimum number of items to read. If you do not want to block, set it to 0. If you want to block for at least one item, set it to 1.

maxCount: Maximum number of the items to retrieve. Its value cannot exceed 1000.

filter: A function that accepts an item and checks if it should be returned. If no filtering should be applied, set it to null.
Örnek
Şöyle yaparız. Burada sequence farklı artırılıyor çünkü, filtre ile bazı şeyler süzülmüş olabilir
long sequence = rb.headSequence();
for(;;) {
  CompletionStage<ReadResultSet<String>> f = rb.readManyAsync(sequence, 1, 10, null);
  CompletionStage<Integer> readCountStage = f.thenApplyAsync(rs -> {
    for (String s : rs) {
      System.out.println(s);
    }
    return rs.readCount();
  });
  sequence += readCountStage.toCompletableFuture().join();
}
readOne metodu
Açıklaması şöyle
Use the method readOne to return the item at the given sequence; readOne blocks if no item is available. To read the next item, increment the sequence by one.
By exposing the sequence, you can now move the item from the Ringbuffer as long as the item is still available. If the item is not available any longer, StaleSequenceException is thrown.
Örnek
Şöyle yaparız
HazelcastInstance hz = Hazelcast.newHazelcastInstance();
Ringbuffer<String> ringbuffer = hz.getRingbuffer("rb");
long sequence = ringbuffer.headSequence();
while(true){
  String item = ringbuffer.readOne(sequence);
  sequence++;
  // process item
}

16 Haziran 2023 Cuma

hazelcast-cloud Maven Plugin

Giriş
apiKey şöyle aFQOq7bmm3yfveUmZHRbtyRiggggg
apiSecret daha uzun ve şöyle m1bxcktn59JeXl1aTo3EbSZnGhuRbiz8eo7OXfYO0izBOh98pAFfEU61fDAAA5555

Örnektekiler rastgele sayılar

Örnek
Şöyle yaparız
<plugin>
  <groupId>com.hazelcast.cloud</groupId>
  <artifactId>hazelcast-cloud-maven-plugin</artifactId>
  <version>0.0.7</version>
  <configuration>
    <apiBaseUrl>https://api.viridian.hazelcast.com</apiBaseUrl>
    <clusterName>${clusterName}</clusterName>
    <apiKey>${apiKey}</apiKey>
    <apiSecret>${apiSecret}</apiSecret>
  </configuration>
</plugin>


Management Center

Metrics
Management Center cluster içindeki tüm üyelerden Metrics verilerini toplar ve bunları Prometheus ile okunabilir hale getirir

hazelcast.xml - Management Center Ayarları

enabled Alanı
Örnek
Şöyle yaparız
hazelcast:
  cluster-name: springhow
  management-center:
    enabled: true
    url: 'http://localhost:8080/'
  network:
    join:
      multicast:
        enabled: true
scripting-enabled Alanı
Örnek 
Şöyle yaparız
<hazelcast xmlns="http://www.hazelcast.com/schema/config">
  <cluster-name>hazelcast-test</cluster-name>
  <network>
    <port auto-increment="true" port-count="20">5701</port>
    <join>
      <multicast enabled="false"/>
      <tcp-ip enabled="true">
        <member>localhost:5701</member> 
        <member>localhost:5702</member>
        <member>localhost:5703</member>
      </tcp-ip>
    </join>
  </network>

  <management-center scripting-enabled="true" />
</hazelcast>

15 Haziran 2023 Perşembe

Hazelcast Jet ReadMapOrCacheP Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.impl.connector.ReadMapOrCacheP;
Sanırım her partition için 
- CacheFetchEntriesOperation
- MapFetchEntriesOperation
nesnelerini kullanarak veri okur

9 Haziran 2023 Cuma

Hazelcast Jet Processor.process metodu

Giriş
Write Processor veya ara adımları geçekleştiren Processor  tarafından override edilir. İmzası şöyle
void process(int ordinal, Inbox inbox)
Örnek
Kod şöyle. Burada en son işlenen mesajlar bittiyse buffer nesnesi tekrar dolduruluyor ve gönderme işlemi tekrar başlıyor
private final Buffer<T> buffer;

@Override
public void process(int ordinal, @Nonnull Inbox inbox) {
  ...
  if (sendResult != null) {
    checkIfSendingFinished();
  }
  if (sendResult == null) {
    initSending(inbox);
  }
}
private void initSending(@Nullable Inbox inbox) {
  if (inbox != null) {
    bufferFromInbox(inbox);
  }
  attemptToDispatchBufferContent();
}
private void bufferFromInbox(@Nonnull Inbox inbox) {
  for (T t; (t = (T) inbox.peek()) != null && buffer.add(t); ) {
    inbox.remove();
  }
}
private void attemptToDispatchBufferContent() {
  if (buffer.isEmpty()) {
    return;
  }
  long currentTime = nanoTime();
  if (currentTime < nextSendTime) {
    return;
  }
  List<PutRecordsRequestEntry> entries = buffer.content();
  sendResult = putRecordsAsync(entries);
  nextSendTime = currentTime;
}

Hazelcast Jet Processor.complete metodu - Source Processor İçindir

Giriş
İmzası şöyle
boolean complete()
Açıklaması şöyle
For example, a streaming source processor will return false forever. 
Açıklaması şöyle. Yani false dönerse, Jet Engine Processor'ı tekrar çağırır
If the complete() method returns false, the Hazelcast Jet engine will attempt to reschedule the processor to run again.
Örnek - Stream Source
Bu kodu ReadKafkaConnectP.java dosyasından aldım. İskeleti şöyle. complete() hep false döner. Eğer okunan kaynak boş bir şey dönerse eventTimeMapper.flatMapIdle(); çağrılır. Bu metod içinde saat bilgisi bulunduran bir Traverser döner. Yan aslında watermark döndürüyor
public class ReadKafkaConnectP extends AbstractProcessor {
  ...
  private final EventTimeMapper<SourceRecord> eventTimeMapper;
  private Traverser<?> traverser;
  
  @Override
  public boolean complete() {
    ...
    if (traverser == null) {
      List<SourceRecord> sourceRecords = ...
      this.traverser = traverser(sourceRecords)
        .map(rec -> {
          taskRunner.commitRecord((SourceRecord) rec);
          return rec;
        })
        .onFirstNull(() -> traverser = null);
    }
    emitFromTraverser(traverser);
    return false;
  }
  public Traverser<?> traverser(List<SourceRecord> sourceRecords) {
    if (sourceRecords.isEmpty()) {
      return eventTimeMapper.flatMapIdle();
    }
    return traverseIterable(sourceRecords)
      .flatMap(rec -> {
        long eventTime = rec.timestamp() == null ? 0 : rec.timestamp();
        return eventTimeMapper.flatMapEvent(rec, 0, eventTime);
    });
  }
}
Örnek - Batch veya Stream Source
Bu kodu ReadMongoP.java dosyasından aldım. complete() metodunda 
1. batch iş için reader.everCompletes() içinde hep true döner, 
2. stream iş için reader.everCompletes() içinde hep false döner
3. emitFromTraverser() eğer traverse'da next() yoksa false döner.
public class ReadMongoP<I> extends AbstractProcessor {

  private final MongoChunkedReader reader;
 
  private Traverser<?> traverser;
   
  @Override
  public boolean complete() {
    ...
    if (traverser == null) {
      this.traverser = reader.nextChunkTraverser()
                             .onFirstNull(() -> traverser = null);
    }
    if (!emitFromTraverser(traverser)) {
      return false;
    }
    ...
    return reader.everCompletes();
  }
  ...
}

HazelcastAPI QueueStoreWrapper Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.collection.impl.queue.QueueStoreWrapper;
Kod şöyle. İçinde bir QueueStore var
public final class QueueStoreWrapper implements QueueStore<Data> {
  private final String name;
  private int memoryLimit = DEFAULT_MEMORY_LIMIT;
  private int bulkLoad = DEFAULT_BULK_LOAD;
  private boolean enabled;
  private boolean binary;
  private QueueStore store;
  private SerializationService serializationService;
  ...
}
store metodu
Kod şöyle
@Override
public void store(Long key, Data value) {
  if (!enabled) {
    return;
  }
  Object actualValue;
  if (binary) {
    // WARNING: we can't pass original Data to the user
    actualValue = Arrays.copyOf(value.toByteArray(), value.totalSize());
  } else {
    actualValue = serializationService.toObject(value);
  }
  store.store(key, actualValue);
}
binary için açıklama şöyle. Yani Queue içindeki veri byte[] şeklinde, eğer store içinde de byte[] olsun istiyorsak binary = true yapılır, eğer Java nesnesi şeklinde istiyorsak binary = false yapılır
Binary: By default, Hazelcast stores the queue items in serialized form, and before it inserts the queue items into the queue store, it deserializes them. If you are not reaching the queue store from an external application, you might prefer that the items be inserted in binary form. Do this by setting the binary property to true: then you can get rid of the deserialization step, which is a performance optimization. The binary property is false by default.

8 Haziran 2023 Perşembe

HazelcastAPI ExecutorConfig Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.config.ExecutorConfig;
setPoolSize metodu
Örnek
Şöyle yaparız
Config config = ...;
config.addExecutorConfig(new ExecutorConfig("distributed-scheduler")
    .setPoolSize(10)
    .setQueueCapacity(1_000));
setQueueCapacity metodu
Örnek yukarıda


HazelcastAPI Member Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.cluster.Member;
Cluster nesnesi ile elde edilir.

getAddressMap metodu
Örnek
Şöyle yaparız
HazelcastInstance hazelcastInstance = ...
Set<Member> members = hazelcastInstance.getCluster().getMembers(); for (Member member : members) { System.out.println(member.getAddressMap()); }
getUuid metodu
Örnek
Şöyle yaparız. Burada java.util.UUID karşılaştırması yapılarak en eski Member bulunuyor
HazelcastInstance instance = ..;

@Scheduled(fixedDelay = 10_000, initialDelay = 60_000)
void startJob() {
   String leaderAddress = getOldestMember().getSocketAddress().toString();
   String currentAddress = instance.getCluster().getLocalMember().getSocketAddress()
     .toString();

   // Run the task only on leader node

   if (currentAddress.equals(leaderAddress)) {
      System.out.println("I am leader, use me to poll database, distribute task etc");
      ...
  }
}

private Member getOldestMember() {
  Cluster cluster = instance.getCluster();
  Member oldestMember = null;
  for (Member member : cluster.getMembers()) {
    if (oldestMember == null || member.getUuid().compareTo(oldestMember.getUuid()) < 0) {
      oldestMember = member;
    }
  }
  return oldestMember;
}

HazelcastSpringBoot Cache Kullanımı

Maven
Şu satırı dahil ederiz
<dependency>
  <groupId>com.hazelcast</groupId>
  <artifactId>hazelcast-spring</artifactId>
  <version>5.0.1</version>
</dependency>
hazelcast-spring Projesine Dair Notlar
XML dosyasında geçen tag'leri hangi sınıfın bileşenlerine ayıraca HazelcastNamespaceHandler dosyasında tanımlı.

Konfigürasyon
1. hazelcast.xml veya hazelcast.yaml Dosyası kullanılır
spring-boot-autoconfigure projesinde iki tane auto configuration için bean var. Bunlar

- Server sınıfı hazelcast.xml veya hazelcast.yaml dosyası varsa etkindir.  ConfigAvailableCondition sınıfında görülebilir

Client sınıfı hazelcast-client.xml, hazelcast-client.yml dosyası varsa etkindir. HazelcastClientConfigAvailableCondition sınıfında görülebilir

1.1 ClassPath'e Eklenir
Açıklaması şöyle
Add a hazelcast.xml file to the src/main/resources folder and spring boot will auto configure hazelcast for you. 
1.2 Dosya Yolu Belirtilir
Açıklaması şöyle
You can optionally configure the location of the hazelcast.xml file in your properties or YAML file using spring.hazelcast.config
Örnek
Şöyle yaparız
# application.yml
spring:
  hazelcast:
    config: classpath:config/hazelcast.xml

# application.properties
spring.hazelcast.config=classpath:config/hazelcast.xml

2. Kodla Konfigürasyon

1. Config Nesnesi
Bir tane Config nesnesi yaratılır. Bu nesne isteğe bağlı olarak bean yapılır

Örnek - Bir Cluster Oluşturmak
Şöyle yaparız
@Bean
public Config hazelcastConfig() {
  Config config = new Config();
  NetworkConfig networkConfig = config.getNetworkConfig();
  JoinConfig joinConfig = networkConfig.getJoin();

  joinConfig.getMulticastConfig().setEnabled(false);
  joinConfig.getTcpIpConfig().setEnabled(true)
    .setMembers(Arrays.asList("127.0.0.1"));

  config.addExecutorConfig(new ExecutorConfig("distributed-scheduler")
    .setPoolSize(10)
    .setQueueCapacity(1_000));
  return config;
}
Örnek
Şöyle yaparız
@Configuration
public class HazelcastConfiguration {

  @Bean
  public Config hazelCastConfig(){
    return new Config()
      .setInstanceName("hazelcast-instance")
      .addMapConfig(
        new MapConfig()
          .setName("instruments")
          .setMaxSizeConfig(
            new MaxSizeConfig(200,MaxSizeConfig.MaxSizePolicy.FREE_HEAP_SIZE))
          .setEvictionPolicy(EvictionPolicy.LRU)
          .setTimeToLiveSeconds(20));
  }
}

2. HazelcastInstance Nesnesi
Bu nesne bean yapılır. Bundan sonra HazelcastInstance nesnesi @Autowire ile kullanılabilir
Örnek
Şöyle yaparız
@Bean
public HazelcastInstance hazelcastInstance() {
  ClientConfig config = new ClientConfig();
  config.setClusterName("dev");
  return HazelcastClient.newHazelcastClient(config);
}
Örnek
Şöyle yaparız
@Service
public class MyService {
  @Autowired
  private HazelcastInstance instance;
  ...
}
3. HazelcastCacheManager 
Şu satırı dahil ederiz
import com.hazelcast.spring.cache.HazelcastCacheManager;
Bu nesneyi yarattıktan sonra SpringBoot ile gelen @Cacheable, @CachePut, @CacheEvict anotasyonları kullanılabilir.

Kalıtım şöyle
org.springframework.cache.CacheManager
  HazelcastCacheManager

Örnek
Şöyle yaparız. Burada org.springframework.cache.annotation.CachingConfigurerSupport kullanılıyor. Bir tane org.springframework.cache.CacheManager yaratılıyor
@Configuration
@EnableCaching
public class CacheConfig extends CachingConfigurerSupport {

  @Autowired
  private HazelcastInstance hazelcastInstance;

  @Override
  public CacheManager cacheManager() {
    return new HazelcastCacheManager(hazelcastInstance);
  }
}





HazelcastAPI TpcServerBootstrap Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.internal.tpc.TpcServerBootstrap;
bind metodu
Kod şöyle. Verilen port aralığındaki bir port numarasına bind etmeye çalışır
private int bind(AsyncServerSocket serverSocket, int port, int limit) {
  while (port < limit) {
    try {
      serverSocket.bind(new InetSocketAddress(thisAddress.getInetAddress(), port));
      return port + 1;
    } catch (UncheckedIOException e) {
      if (e.getCause() instanceof BindException) {
        // this port is occupied probably by another hz member, try another one
        port += tpcEngine.reactorCount();
      } else {
        throw e;
      }
    } catch (UnknownHostException e) {
      throw new UncheckedIOException(e);
    }
  }
  throw new HazelcastException("Could not find a free port in the TPC socket port range.");
}

7 Haziran 2023 Çarşamba

HazelcastAPI IMap.addIndex metodu

Giriş
İmzası şöyle
void addIndex(IndexConfig indexConfig);

default void addIndex(IndexType type, String... attributes) {
  IndexConfig config = IndexUtils.createIndexConfig(type, attributes);

  addIndex(config);
}
Örnek - Tek Index
Şöyle yaparız
IMap<String, Person> map = hzClient.getMap("map");
mapWithIndex.addIndex(new IndexConfig(IndexType.HASH, "name"));
Örnek - Key İçin Index
Şöyle yaparız
val userOrderConfig = MapConfig(Caches.USER_ORDERS)
userOrderConfig.apply {
  this.evictionConfig = EvictionConfig().setEvictionPolicy(EvictionPolicy.LRU).apply {
    this.maxSizePolicy =MaxSizePolicy.PER_NODE
    this.size = 10000
  }
  this.timeToLiveSeconds = (30*60)
}
    
userOrderConfig.addIndexConfig(IndexConfig(IndexType.SORTED, "__key"))
hzInstance.config.addMapConfig(userOrderConfig)
Örnek - İki Index
Şöyle yaparız
import com.hazelcast.query.Predicates;

IMap<String, Employee> map = instance.getMap("employees");
map.addIndex(IndexType.SORTED, "age");
map.addIndex(IndexType.HASH, "active");

Set<Map.Entry<String, Employee>> entries = map
  .entrySet(Predicates.sql("active=true and age>44"));

HazelcastAPI AbstractPartitionMessageTask Sınfı - Member Üzerindeki Client Thread Çalıştırır

Giriş
Şu satırı dahil ederiz
import com.hazelcast.client.impl.protocol.task.AbstractPartitionMessageTask;
Kalıtım şöyle
MessageTask
  AbstractMessageTask
    AbstractAsyncMessageTask
      AbstractPartitionMessageTask

Veri yapılarının belirtilen bir partition üzerinde işlem yapan tüm Task sınıfları bu sınıftan kalıtır
Yani List, Map, Queue vs için bir sürü Task sınıfı var.

Stack çıktısı şöyle
com.hazelcast.spi.impl.operationservice.impl.OperationServiceImpl.createInvocationBuilder
com.hazelcast.client.impl.protocol.task.AbstractPartitionMessageTask.processInternal
com.hazelcast.client.impl.protocol.task.AbstractAsyncMessageTask.processMessage
com.hazelcast.client.impl.protocol.task.AbstractMessageTask.initializeAndProcessMessage
com.hazelcast.client.impl.protocol.task.AbstractMessageTask.run
com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.loop
com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.executeRun
com.hazelcast.internal.util.executor.HazelcastManagedThread.run

6 Haziran 2023 Salı

HazelcastAPI WanProtocolNegotiationResponse Arayüzü

Giriş
Şu satırı dahil ederiz
import com.hazelcast.enterprise.wan.impl.operation.WanProtocolNegotiationResponse;
WanProtocolNegotiationOperation tarafından döndürülür. WanReplicationService nesnesinden 
getSupportedWanProtocolVersions() çağrısı ile desteklenen protokol sürümü alınır ve gelen istek ile karşılaştırılır.

İlave olarak cevap veren taraf şu bilgileri de gönderir
MemberVersion memberVersion = getNodeEngine().getLocalMember().getVersion();
Version clusterVersion = getNodeEngine().getClusterService().getClusterVersion();


4 Haziran 2023 Pazar

Tomcat

Giriş
GitHub kodu burada.  Bu projeyi ilk defa burada gördüm

HazelcastSessionManager Sınıfı
Bu sınıftan her tomcat sürümü için bir sürü var. Çünkü proje altında tomcat7, tomcat8, tomcat9 gibi modüller var. Ben son sürüm olan tomcat9'a baktım. Kod şöyle
public class HazelcastSessionManager extends ManagerBase implements Lifecycle, PropertyChangeListener, SessionManager {

  private static final String NAME = "HazelcastSessionManager";

  private static final int DEFAULT_SESSION_TIMEOUT = 60;

  private static final int SECONDS_IN_MINUTE = 60;

  private final Log log = LogFactory.getLog(HazelcastSessionManager.class);

  private IMap<String, HazelcastSession> sessionMap;

  private boolean clientOnly;

  private boolean sticky = true;

  private String mapName;

  private boolean deferredWrite = true;

  private String hazelcastInstanceName;

  private HazelcastInstance instance;
  ...
}
context.xml dosyasında Session Manager olarak Tomcat'e tanıtılıyor. Bundan sonra her yeni session için HazelcastSession nesnesi yaratıyor
<Context>
  ...
  <Manager className="com.hazelcast.session.HazelcastSessionManager"/>
  ...
</Context>
veya şöyle yaparız
<Context>
  <Manager className="com.hazelcast.session.HazelcastSessionManager"
    clientOnly="true"/>
</Context>
HazelcastInstanceFactory Sınıfı
Bu sınıftan Hazelcast projelerinde sanırım bir kaç tane var. Bir tanesi hazelcast-tomcat-sessionmanager projesinde.

getHazelcastInstance metodu
İmzası şöyle
public static HazelcastInstance getHazelcastInstance(ClassLoader classLoader, 
    boolean clientOnly, String instanceName) throws LifecycleException
tomcat7, tomcat8, tomcat9 gibi modüllerdeki HazelcastSessionManager tarafından çağrılır. Yeni bir HazelcastInstance yaratır. clientOnly değişkeni true ise, mevcut cluster'a bağlanır, false ise yeni bir member yaratır veya mevcut member'a bağlanır 






2 Haziran 2023 Cuma

HazelcastAPI ObjectDataOutputStream Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.internal.serialization.impl.ObjectDataOutputStream;
Kalıtım şöyle
ObjectDataOutput
  VersionedObjectDataOutput
    ObjectDataOutputStream

Aslında bu sınıf kendi içinde java.io.DataOutputStream sınıfını kullanıyor. Kod şöyle. Dolayısıyla StreamSerializer kullanmak ne avantaj getiriyor anlamadım.
public class ObjectDataOutputStream extends VersionedObjectDataOutput
  implements ObjectDataOutput, Closeable, SerializationServiceSupport, DataWriter {

  private final InternalSerializationService serializationService;
  private final DataOutputStream dataOut;
  private final ByteOrder byteOrder;
  ...
}

THIRD-PARTY.txt Dosyası

Kullanılan harici kütüphanelerin sürümleri bu dosyada Dosyanın yolu şöyle hazelcast/licenses/THIRD-PARTY.txt