Şu satırı dahil ederiz
bidirectionalStreamingService metoduimport com.hazelcast.jet.grpc.GrpcServices;
Örnek
Şöyle yaparız. Birinci parametre gRPC projesine ait bir ManagedChannelBuilder kullanır. İkinci parametre yine gRPC projesine ait bir asenkron stub yaratır. Stub şu anda brokerInfo() metodunu çağırıyor.
ServiceFactory<?, ? extends GrpcService<BrokerInfoRequest, BrokerInfoReply>>
brokerService = GrpcServices.bidirectionalStreamingService(
() -> ManagedChannelBuilder.forAddress("localhost", PORT).usePlaintext(),
channel -> BrokerServiceGrpc.newStub(channel)::brokerInfo
);ServiceFactory ile bir Service yaratılır. İçi şöyle. CompletableFuture döner
public final class BidirectionalStreamingService<T, R> implements GrpcService<T, R> {
@Nonnull @Override
public CompletableFuture<R> call(@Nonnull T input) {
checkForServerError();
CompletableFuture<R> future = new CompletableFuture<>();
futureQueue.add(future);
sink.onNext(input);
return future;
}
...
}Bu yüzde servisi şöyle kullanabiliriz. call() metodu CompletableFuture döndüğü için thenApply() ile başka bir şey takılabilir.
StreamStage<Tuple2<Trade, String>> tradeAndProducts = ...
tradeAndProducts.mapUsingServiceAsync(brokerService,
(service, t) -> {
BrokerInfoRequest request = BrokerInfoRequest
.newBuilder().setId(t.f0().brokerId()).build();
return service
.call(request)
.thenApply(brokerReply ->
tuple3(t.f0(), t.f1(), brokerReply.getBrokerName()));
})
Hiç yorum yok:
Yorum Gönder