6 Kasım 2022 Pazar

Hazelcast SQL - Kafka İçin CREATE MAPPING Örnekleri

Giriş
Cluster'ın hazelcast-jet-kafka jar'ını dahil ediyor olması gerekir.

Sütun tipleri
Sütun tipleri şunlar olabilir
VARCHAR
DOUBLE
BIGINT
DECIMAL

keyFormat ve valueFormat
keyFormat ve valueFormat şunlar olabilir
varchar, 
bigint,
json-flat : Bu durumda Jackson kütüphanesi kullanılır


Örnek
Şöyle yaparız
CREATE OR REPLACE MAPPING "my-topic" (__key VARCHAR, this VARCHAR)
TYPE Kafka 
OPTIONS (
  'keyFormat'='varchar',
  'valueFormat' = 'varchar',
  'bootstrap.servers' = '127.0.0.1:9092'
);

INSERT INTO "my-topic" (__key, this) VALUES ('key2','value2');

SELECT * FROM "my-topic";
Örnek
Şöyle yaparız
CREATE MAPPING my_topic(
  __key BIGINT,
  ticker VARCHAR,
  amount INT)
TYPE Kafka
OPTIONS (
  'keyFormat'='bigint',
  'valueFormat'='json-flat',
  'bootstrap.servers' = '127.0.0.1:9092'
)
Örnek
Şöyle yaparız
CREATE MAPPING trades (
  id BIGINT,
  ticker VARCHAR,
  price DECIMAL,
  amount BIGINT)
TYPE Kafka
OPTIONS (
  'valueFormat' = 'json-flat',
  'bootstrap.servers' = '127.0.0.1:9092'
);
Örnek
Kafka topic şöyle olsun
{
  "measurement_id":"f2c36920-c09e-4014-9117-14945a0ea0c2"
  "ts":1666623490338
  "station_id":"KMCB"
  "lat":31.17833
  "long":31.17833
  "temperature":11
}
Mapping yaratırız
CREATE MAPPING weather (
    measurement_id VARCHAR,
    ts TIMESTAMP,
    station_id VARCHAR,
    state VARCHAR,
    lat DOUBLE,
    long DOUBLE,
    temperature DOUBLE)
TYPE Kafka
OPTIONS (
    'keyFormat' = 'varchar',
    'valueFormat' = 'json-flat',
    'bootstrap.servers' = '127.0.0.1:9092'
);
Erişmek için şöyle yaparız
CREATE VIEW weather_ordered AS
SELECT *
  FROM TABLE(IMPOSE_ORDER(
  TABLE weather,
  DESCRIPTOR(ts),
  INTERVAL '0.5' SECOND)
);
Deserializer
Örnek
Şöyle yaparız. Burada 3 tane broker adresi veriliyor. Ayrıca key ve value için de deserializer tanımlanıyor
CREATE OR REPLACE EXTERNAL MAPPING "foo" TYPE Kafka 
OPTIONS (  
  'keyFormat' = 'varchar' ,
  'valueFormat' = 'varchar' ,
  'auto.offset.reset' = 'earliest' ,
  'bootstrap.servers' = 'kafka-broker-0:9092,kafka-broker-1:9093,kafka-broker-2:9094' ,
  'group.id' = '779e9af8-b43d-41c5-86de-14199bd62995' ,
  'key.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer' ,
  'value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer' 
) 
Group.id
Açıklaması şöyle
If you don't have a processing guarantee and group.id it will always (re)start from the offset defined in auto.offset.reset

Acks
Örnek
Şöyle yaparız
CREATE OR REPLACE MAPPING testTopic (
  id INT,
  ticker VARCHAR,
  price DECIMAL,
  amount BIGINT)
TYPE Kafka
OPTIONS (
  'keyFormat'='int',
  'valueFormat' = 'json-flat',
  'bootstrap.servers' = 'PLAINTEXT://localhost:65243',
  'session.timeout.ms' = '45000',
  'acks' = 'all'
);
SSL
Örnek
Şöyle yaparız
CREATE OR REPLACE MAPPING my_topic
TYPE Kafka
OPTIONS (
  'keyFormat' = 'varchar',
  'valueFormat' = 'json',
  'bootstrap.servers' = 'pkc-41wq6.eu-west-2.aws.confluent.cloud:9092',
  'sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=''user'' password=''password'';',
  'security.protocol' =  'SASL_SSL',
  'sasl.mechanism' = 'PLAIN'
);
IMPOSE_ORDER()
Açıklaması şöyle
... Hazelcast can’t emit the results of windowed aggregations or stream-to-stream joins until it has received all the events belonging to the defined timeframe. However, due to differences in latency, events that fall within the timeframe may not arrive for processing until after it ends. To place a limit on late events, Hazelcast uses the IMPOSE_ORDER() function. This function allows you to specify a maximum event lag. Any event that arrives later than the maximum event lag is dropped.
Örnek
Şöyle yaparız
CREATE or REPLACE MAPPING trades (
  id BIGINT,
  ticker VARCHAR,
  tradetime TIMESTAMP WITH TIME ZONE,
  price DECIMAL,
  amount BIGINT)
TYPE Kafka
OPTIONS (
  'valueFormat' = 'json-flat',
  'bootstrap.servers' = 'broker:9092'
);

CREATE VIEW trades_ordered AS
SELECT *
  FROM TABLE(IMPOSE_ORDER(TABLE trades,DESCRIPTOR(tradetime), INTERVAL '0.5' SECONDS));

Hiç yorum yok:

Yorum Gönder

THIRD-PARTY.txt Dosyası

Kullanılan harici kütüphanelerin sürümleri bu dosyada Dosyanın yolu şöyle hazelcast/licenses/THIRD-PARTY.txt