Giriş
Cluster'ın hazelcast-jet-kafka jar'ını dahil ediyor olması gerekir.
Sütun tipleri
Sütun tipleri şunlar olabilir
VARCHAR
DOUBLE
BIGINT
DECIMAL
keyFormat ve valueFormat
keyFormat ve valueFormat şunlar olabilir
varchar,
bigint,
json-flat : Bu durumda Jackson kütüphanesi kullanılır
Örnek
Şöyle yaparız
CREATE OR REPLACE MAPPING "my-topic" (__key VARCHAR, this VARCHAR) TYPE Kafka OPTIONS ( 'keyFormat'='varchar', 'valueFormat' = 'varchar', 'bootstrap.servers' = '127.0.0.1:9092' ); INSERT INTO "my-topic" (__key, this) VALUES ('key2','value2'); SELECT * FROM "my-topic";
Örnek
Şöyle yaparız
CREATE MAPPING my_topic( __key BIGINT, ticker VARCHAR, amount INT) TYPE Kafka OPTIONS ( 'keyFormat'='bigint', 'valueFormat'='json-flat', 'bootstrap.servers' = '127.0.0.1:9092' )
Örnek
Şöyle yaparız
CREATE MAPPING trades ( id BIGINT, ticker VARCHAR, price DECIMAL, amount BIGINT) TYPE Kafka OPTIONS ( 'valueFormat' = 'json-flat', 'bootstrap.servers' = '127.0.0.1:9092' );
Örnek
Kafka topic şöyle olsun
{"measurement_id":"f2c36920-c09e-4014-9117-14945a0ea0c2""ts":1666623490338"station_id":"KMCB""lat":31.17833"long":31.17833"temperature":11}
Mapping yaratırız
CREATE MAPPING weather ( measurement_id VARCHAR, ts TIMESTAMP, station_id VARCHAR, state VARCHAR, lat DOUBLE, long DOUBLE, temperature DOUBLE) TYPE Kafka OPTIONS ( 'keyFormat' = 'varchar', 'valueFormat' = 'json-flat', 'bootstrap.servers' = '127.0.0.1:9092' );
Erişmek için şöyle yaparız
CREATE VIEW weather_ordered AS SELECT * FROM TABLE(IMPOSE_ORDER( TABLE weather, DESCRIPTOR(ts), INTERVAL '0.5' SECOND) );
Deserializer
Örnek
Şöyle yaparız. Burada 3 tane broker adresi veriliyor. Ayrıca key ve value için de deserializer tanımlanıyor
CREATE OR REPLACE EXTERNAL MAPPING "foo" TYPE Kafka OPTIONS ( 'keyFormat' = 'varchar' , 'valueFormat' = 'varchar' , 'auto.offset.reset' = 'earliest' , 'bootstrap.servers' = 'kafka-broker-0:9092,kafka-broker-1:9093,kafka-broker-2:9094' , 'group.id' = '779e9af8-b43d-41c5-86de-14199bd62995' , 'key.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer' , 'value.deserializer' = 'org.apache.kafka.common.serialization.StringDeserializer' )
Group.id
Açıklaması şöyle
If you don't have a processing guarantee and group.id it will always (re)start from the offset defined in auto.offset.reset
Acks
Örnek
Şöyle yaparız
CREATE OR REPLACE MAPPING testTopic ( id INT, ticker VARCHAR, price DECIMAL, amount BIGINT) TYPE Kafka OPTIONS ( 'keyFormat'='int', 'valueFormat' = 'json-flat', 'bootstrap.servers' = 'PLAINTEXT://localhost:65243', 'session.timeout.ms' = '45000', 'acks' = 'all' );
SSL
Örnek
Şöyle yaparız
CREATE OR REPLACE MAPPING my_topic
TYPE Kafka
OPTIONS (
'keyFormat' = 'varchar',
'valueFormat' = 'json',
'bootstrap.servers' = 'pkc-41wq6.eu-west-2.aws.confluent.cloud:9092',
'sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=''user'' password=''password'';',
'security.protocol' = 'SASL_SSL',
'sasl.mechanism' = 'PLAIN'
);
IMPOSE_ORDER()
Açıklaması şöyle
... Hazelcast can’t emit the results of windowed aggregations or stream-to-stream joins until it has received all the events belonging to the defined timeframe. However, due to differences in latency, events that fall within the timeframe may not arrive for processing until after it ends. To place a limit on late events, Hazelcast uses the IMPOSE_ORDER() function. This function allows you to specify a maximum event lag. Any event that arrives later than the maximum event lag is dropped.
Örnek
Şöyle yaparız
CREATE or REPLACE MAPPING trades ( id BIGINT, ticker VARCHAR, tradetime TIMESTAMP WITH TIME ZONE, price DECIMAL, amount BIGINT) TYPE Kafka OPTIONS ( 'valueFormat' = 'json-flat', 'bootstrap.servers' = 'broker:9092' ); CREATE VIEW trades_ordered AS SELECT * FROM TABLE(IMPOSE_ORDER(TABLE trades,DESCRIPTOR(tradetime), INTERVAL '0.5' SECONDS));
Hiç yorum yok:
Yorum Gönder