Şu satırı dahil ederiz
bidirectionalStreamingService metoduimport com.hazelcast.jet.grpc.GrpcServices;
Örnek
Şöyle yaparız. Birinci parametre gRPC projesine ait bir ManagedChannelBuilder kullanır. İkinci parametre yine gRPC projesine ait bir asenkron stub yaratır. Stub şu anda brokerInfo() metodunu çağırıyor.
ServiceFactory<?, ? extends GrpcService<BrokerInfoRequest, BrokerInfoReply>> brokerService = GrpcServices.bidirectionalStreamingService( () -> ManagedChannelBuilder.forAddress("localhost", PORT).usePlaintext(), channel -> BrokerServiceGrpc.newStub(channel)::brokerInfo );
ServiceFactory ile bir Service yaratılır. İçi şöyle. CompletableFuture döner
public final class BidirectionalStreamingService<T, R> implements GrpcService<T, R> { @Nonnull @Override public CompletableFuture<R> call(@Nonnull T input) { checkForServerError(); CompletableFuture<R> future = new CompletableFuture<>(); futureQueue.add(future); sink.onNext(input); return future; } ... }
Bu yüzde servisi şöyle kullanabiliriz. call() metodu CompletableFuture döndüğü için thenApply() ile başka bir şey takılabilir.
StreamStage<Tuple2<Trade, String>> tradeAndProducts = ... tradeAndProducts.mapUsingServiceAsync(brokerService, (service, t) -> { BrokerInfoRequest request = BrokerInfoRequest .newBuilder().setId(t.f0().brokerId()).build(); return service .call(request) .thenApply(brokerReply -> tuple3(t.f0(), t.f1(), brokerReply.getBrokerName())); })
Hiç yorum yok:
Yorum Gönder