10 Ekim 2022 Pazartesi

Hazelcast Jet gRPC GrpcServices Sınıfı

Giriş
Şu satırı dahil ederiz
import com.hazelcast.jet.grpc.GrpcServices;
bidirectionalStreamingService metodu
Örnek
Şöyle yaparız. Birinci parametre gRPC  projesine ait bir ManagedChannelBuilder kullanır. İkinci parametre yine gRPC projesine ait bir asenkron stub yaratır. Stub şu anda brokerInfo() metodunu çağırıyor.
ServiceFactory<?, ? extends GrpcService<BrokerInfoRequest, BrokerInfoReply>>
brokerService = GrpcServices.bidirectionalStreamingService(
    () -> ManagedChannelBuilder.forAddress("localhost", PORT).usePlaintext(),
    channel -> BrokerServiceGrpc.newStub(channel)::brokerInfo
);
ServiceFactory ile bir Service yaratılır. İçi şöyle. CompletableFuture döner
public final class BidirectionalStreamingService<T, R> implements GrpcService<T, R> {

  @Nonnull @Override
  public CompletableFuture<R> call(@Nonnull T input) {
    checkForServerError();
    CompletableFuture<R> future = new CompletableFuture<>();
    futureQueue.add(future);
    sink.onNext(input);
    return future;
  }
  ...
}
Bu yüzde servisi şöyle kullanabiliriz. call() metodu CompletableFuture  döndüğü için thenApply() ile başka bir şey takılabilir.
StreamStage<Tuple2<Trade, String>> tradeAndProducts = ...
tradeAndProducts.mapUsingServiceAsync(brokerService,
  (service, t) -> {
    BrokerInfoRequest request = BrokerInfoRequest
      .newBuilder().setId(t.f0().brokerId()).build();
    return service
      .call(request)
      .thenApply(brokerReply ->
        tuple3(t.f0(), t.f1(), brokerReply.getBrokerName()));
})





Hiç yorum yok:

Yorum Gönder

THIRD-PARTY.txt Dosyası

Kullanılan harici kütüphanelerin sürümleri bu dosyada Dosyanın yolu şöyle hazelcast/licenses/THIRD-PARTY.txt