Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.config.JobConfig;
JobConfig nesnesi JetService nesnesine parametre olarak geçilir.
addClass metodu
Neden Lazım?
Çünkü lambda'lar otomatik gönderilmiyor. Şu kod çalışmaz. Lambda nesnesi HazelcastSerializationException'a sebep olur.
Pipeline pipeline = Pipeline.create() .readFrom(Sources.jdbc( url, "SELECT * FROM person", r-> new Person(rs.getInt(1),rs.getString(2),rs.getString(3)) ) ) .writeTo(Sinks.logger()) .getPipeline(); JobConfig jobConfig = new JobConfig(); .addClass(Person.class); setName("example") hazelcastInstance.getJet().newJob(pipeline,jobConfig);
Örnek
Şu kod çalışır.
BatchSource<String> items = TestSources.items( "United States", "Turkey", "United Kingdom", "Poland", "Ukraine" ); Pipeline pipeline = Pipeline.create() .readFrom(items) .map(new UpperCaseFunction()) .writeTo(Sinks.logger()) .getPipeline(); JobConfig jobConfig = new JobConfig() .addClass(UpperCaseFunction.class); client.getJet().newJob(pipeline, jobConfig);
Örnek
Şöyle yaparız
jobConfig .setName("Myjob") .addPackage("com.foo") .addClass(com.hazelcast.jet.cdc.DebeziumCdcSources.class) .addClass(io.debezium.connector.mongodb.MongoDbConnector.class); ... hz.getJet().newJobIfAbsent(pipeline, jobConfig);
addCustomClasspath metodu
Member başlarken ClusterProperty.PROCESSOR_CUSTOM_LIB_DIR ile belirtilen dizindeki kendi oluşturduğum bir jar dosyasını Job'ın kullanabilmesini sağlar.
addJar metodu
Açıklaması şöyle
Adds a JAR whose contents will be accessible to all the code attached to theunderlying pipeline or DAG, but not to any other code. An important exampleis the IMap data source, which can instantiate only the classes fromthe Jet instance's classpath.)This variant identifies the JAR with a path string. The filename part will beused as the resource ID, so two JARs with the same filename will be inconflict.Cannot be used for JetService#newLightJob(Pipeline) light jobs@implNote Backing storage for this method is an {@link IMap} with a defaultbackup count of 1. When adding big files as a resource, size thecluster accordingly in terms of memory, since each file will have 2copies inside the cluster(primary + backup replica).
Örnek
Şöyle yaparız
String jarPath = "/mydir/foo.jar"; jobConfig.addJar(jarPath);
addInZip metodu
Örnek
Şöyle yaparız
URL url = new URL("https://.../foo.zip"); jobConfig.addJarsInZip(url);
registerSerializer metodu
İmzası şöyle. Yani ikinci parametre bir StreamSerializer
public <T, S extends StreamSerializer<T>> JobConfig registerSerializer( Class<T> clazz, Class<S> serializerClass )
Örnek
Şöyle yaparız
new JobConfig() .registerSerializer(Person.class, PersonSerializer.class)
setArgument metodu
İş çalıştırılırken kullanılacak parametreyi belirtir
setName metodu
Şöyle yaparız
jobConfig
.setName("Myjob")
...
setProcessingGuarantee metodu
Şöyle yaparız
JobConfig jobConfig = new JobConfig() .setName(...) .setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE) .setSnapshotIntervalMillis(250) .setSuspendOnFailure(true);
setTimeoutMillis metodu
İş belirtilen sürede bitmezse exception fırlatır. Şöyle yaparız
JobConfig jobConfig = new JobConfig() .setTimeoutMillis(10L);
Hiç yorum yok:
Yorum Gönder