# 007 - Extended gRPC Support

Since: 4.1

## Goal

Provide a convenient way to call gRPC services with good performance as part of a pipeline.

## Background

A Jet pipeline can contact outside services to process the data. The general facility is called mapUsingService, with variants that do filtering and flat-mapping. With this transform, each processor creates a proxy to a service and then invokes it for each input item. Jet also supports services with async APIs through mapUsingServiceAsync and in that case it can make several concurrent requests to the same service without any blocking.

gRPC is an RPC framework for building network services. It handles concerns such as transport, authentication and stub generation, and supports many programming languages.

If you have a pipeline where you want to call a gRPC service for each input item, you can use the mapUsingService transform directly, but this will not give you the best performance: it doesn't make use of asynchronous calls, so each processor has only a single request in flight at a time. It's also possible to use mapUsingServiceAsync together with the non-blocking stub, but this requires a lot of boilerplate.

Furthermore, our internal tests showed that the most efficient gRPC communication pattern is bidirectional streaming. Using it is even less straightforward, requiring some fairly advanced code in the ServiceFactory.

As a part of the Jet-Python integration, we discovered some best practices for integrating a Jet pipeline with gRPC, and the goal is to apply these in a more generic way.

## Design

The implementation adds a new module, hazelcast-jet-grpc, to the extensions folder. Its main entry point is GrpcServices, which provides convenience methods for creating a ServiceFactory for a given gRPC service.

GrpcServices offers two factory methods: unaryService for unary services and bidirectionalStreamingService for bidirectional streaming services.

### Unary Service

A unary service is the most basic gRPC service type: each request has exactly one matching response, and the service is called in a request-response fashion.

Given the protobuf definition below for a unary service:

```proto
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}
```

We can create the following service factory:

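```java
import com.hazelcast.jet.grpc.GrpcService;
import com.hazelcast.jet.pipeline.ServiceFactory;
import io.grpc.ManagedChannelBuilder;
import static com.hazelcast.jet.grpc.GrpcServices.unaryService;

ServiceFactory<?, ? extends GrpcService<HelloRequest, HelloReply>> greeterService = unaryService(
  () -> ManagedChannelBuilder.forAddress("localhost", 5000).usePlaintext(), // channel to the server
  channel -> GreeterGrpc.newStub(channel)::sayHello                         // non-blocking stub call
);
```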

where GreeterGrpc is the class auto-generated by the protobuf compiler.

GrpcService is a simple functional interface with a single method:

```java
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;

@FunctionalInterface
public interface GrpcService<T, R> {

    /**
     * Calls the requested service and returns a future which will be
     * completed with the result once a response is received.
     */
    @Nonnull
    CompletableFuture<R> call(@Nonnull T input);
}
```

For every input item, call() returns a future that is completed with the corresponding output item.
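With the non-blocking stub, a unary call does not return its reply: the stub method takes the request plus a StreamObserver and reports the outcome through onNext, onError and onCompleted. The sketch below shows one way such a callback can be bridged to the CompletableFuture that call() returns, which is the kind of boilerplate unaryService is meant to remove. It is an illustration under stated assumptions, not the module's source: to stay runnable without gRPC on the classpath, ReplyObserver is a hypothetical stand-in for io.grpc.stub.StreamObserver, and UnaryAdapter and fakeSayHello are illustrative names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Hypothetical stand-in for io.grpc.stub.StreamObserver, so the sketch needs no gRPC dependency.
interface ReplyObserver<R> {
    void onNext(R value);
    void onError(Throwable t);
    void onCompleted();
}

// Same shape as the GrpcService interface above (annotations omitted).
@FunctionalInterface
interface GrpcService<T, R> {
    CompletableFuture<R> call(T input);
}

final class UnaryAdapter {
    private UnaryAdapter() {
    }

    // Turns a callback-style stub method (request, reply observer) into a GrpcService
    // whose call() returns a future completed by the reply callbacks.
    static <T, R> GrpcService<T, R> adapt(BiConsumer<T, ReplyObserver<R>> stubCall) {
        return input -> {
            CompletableFuture<R> future = new CompletableFuture<>();
            stubCall.accept(input, new ReplyObserver<R>() {
                @Override
                public void onNext(R value) {
                    future.complete(value);
                }

                @Override
                public void onError(Throwable t) {
                    future.completeExceptionally(t);
                }

                @Override
                public void onCompleted() {
                    // No-op if onNext already delivered the single reply of a unary call.
                    future.completeExceptionally(new IllegalStateException("call completed without a reply"));
                }
            });
            return future;
        };
    }

    // Illustrative fake stub that answers synchronously; a real stub answers on a gRPC thread.
    static void fakeSayHello(String name, ReplyObserver<String> replies) {
        if (name.isEmpty()) {
            replies.onError(new IllegalArgumentException("empty name"));
        } else {
            replies.onNext("Hello " + name);
            replies.onCompleted();
        }
    }
}
```

For example, UnaryAdapter.adapt(UnaryAdapter::fakeSayHello).call("Jet") yields a future that completes with "Hello Jet", while an empty name yields an exceptionally completed future. Written against the real StreamObserver type, the same adapter would accept a method reference like GreeterGrpc.newStub(channel)::sayHello.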

The resulting ServiceFactory can then be used with the mapUsingServiceAsync transform as follows:

```java
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;

Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("one", "two", "three", "four"))
  .mapUsingServiceAsync(greeterService, (service, input) -> {
    HelloRequest request = HelloRequest.newBuilder().setName(input).build();
    return service.call(request);
  })
  .writeTo(Sinks.logger());
```

With this setup, Jet sends an asynchronous request for each item and emits the matching response downstream once it arrives.

#### Ordering

When a unary service is used, the order of items and their matching timestamps in a stream are always preserved. The mapUsingServiceAsync transform has several mechanisms that make sure input and output items are matched and emitted in input order.
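The ordering guarantee can be pictured as a queue of in-flight futures that is drained only from its head. The class below is a simplified model of that idea, not Jet's actual implementation inside mapUsingServiceAsync: OrderedEmitter, submit, drain and inFlightCount are hypothetical names, and the model ignores timestamps and watermarks.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Illustrative only: a simplified model of ordered emission, not Jet's actual implementation.
final class OrderedEmitter<R> {
    private final ArrayDeque<CompletableFuture<R>> inFlight = new ArrayDeque<>();
    private final Consumer<R> downstream;

    OrderedEmitter(Consumer<R> downstream) {
        this.downstream = downstream;
    }

    // Records the future of the next input item; futures are queued in input order.
    void submit(CompletableFuture<R> future) {
        inFlight.addLast(future);
    }

    // Emits completed results from the head of the queue and stops at the first one still
    // in flight, so a slow early item holds back later items that finished sooner.
    // join() rethrows if a call failed, which models the job failing.
    void drain() {
        while (!inFlight.isEmpty() && inFlight.peekFirst().isDone()) {
            downstream.accept(inFlight.pollFirst().join());
        }
    }

    int inFlightCount() {
        return inFlight.size();
    }
}
```

In this model, holding back finished results is the price of ordering: one slow call delays everything submitted after it, even though the requests themselves are still sent concurrently.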

### Bidirectional Streaming

In bidirectional streaming mode, requests are sent as a stream and replies also come back as a stream. This can give better throughput because there's less overhead per item, and it also creates an opportunity to send requests in batches.

Given the proto definition below:

```proto
service Greeter {
  // Sends a greeting
  rpc SayHello (stream HelloRequest) returns (stream HelloReply) {}
}
```

We can create the following service factory:

```java
import static com.hazelcast.jet.grpc.GrpcServices.bidirectionalStreamingService;

ServiceFactory<?, ? extends GrpcService<HelloRequest, HelloReply>> greeterService = bidirectionalStreamingService(
  () -> ManagedChannelBuilder.forAddress("localhost", 5000).usePlaintext(),
  channel -> GreeterGrpc.newStub(channel)::sayHello
);
```

The service is invoked from mapUsingServiceAsync in exactly the same way:

```java
Pipeline p = Pipeline.create();
p.readFrom(TestSources.items("one", "two", "three", "four"))
  .mapUsingServiceAsync(greeterService, (service, input) -> {
    HelloRequest request = HelloRequest.newBuilder().setName(input).build();
    return service.call(request);
  })
  .writeTo(Sinks.logger());
```
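Since call() still returns one future per item, the bidirectional factory has to turn the reply stream back into individual futures. A minimal sketch of that adaptation, assuming replies arrive in request order (the requirement described in the Ordering section): each call() queues a future and sends the request on the stream, and each reply completes the oldest pending future. Observer, BidiStreamAdapter and FakeGreeter are hypothetical names, Observer stands in for io.grpc.stub.StreamObserver so the sketch runs without gRPC, and Jet's real implementation may differ.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in for io.grpc.stub.StreamObserver.
interface Observer<V> {
    void onNext(V value);
    void onError(Throwable t);
    void onCompleted();
}

// Illustrative adapter: one bidirectional stream, one future per request, replies matched by order.
final class BidiStreamAdapter<T, R> {
    private final Queue<CompletableFuture<R>> pending = new ArrayDeque<>();
    private final Observer<T> requests;

    // openStream has the shape of a bidirectional stub method such as stub::sayHello:
    // it receives the observer for replies and returns the observer for sending requests.
    BidiStreamAdapter(Function<Observer<R>, Observer<T>> openStream) {
        this.requests = openStream.apply(new Observer<R>() {
            @Override public void onNext(R reply) {
                CompletableFuture<R> oldest;
                synchronized (pending) {
                    oldest = pending.poll();
                }
                if (oldest != null) {
                    oldest.complete(reply); // the n-th reply belongs to the n-th request
                }
            }
            @Override public void onError(Throwable t) {
                failAll(t);
            }
            @Override public void onCompleted() {
                failAll(new IllegalStateException("stream closed with requests pending"));
            }
        });
    }

    CompletableFuture<R> call(T request) {
        CompletableFuture<R> future = new CompletableFuture<>();
        // Enqueue before sending so a fast reply always finds its future; the lock also
        // serializes onNext calls, because a StreamObserver is not thread-safe.
        synchronized (pending) {
            pending.add(future);
            requests.onNext(request);
        }
        return future;
    }

    private void failAll(Throwable t) {
        while (true) {
            CompletableFuture<R> next;
            synchronized (pending) {
                next = pending.poll();
            }
            if (next == null) {
                return;
            }
            next.completeExceptionally(t);
        }
    }
}

// Illustrative fake server: buffers requests and answers them in order when flush() is called.
final class FakeGreeter {
    private final List<String> buffered = new ArrayList<>();
    private Observer<String> replies;

    Observer<String> open(Observer<String> replies) {
        this.replies = replies;
        return new Observer<String>() {
            @Override public void onNext(String name) { buffered.add(name); }
            @Override public void onError(Throwable t) { }
            @Override public void onCompleted() { }
        };
    }

    void flush() {
        for (String name : buffered) {
            replies.onNext("Hello " + name);
        }
        buffered.clear();
    }
}
```

With the fake server, two calls stay pending until flush() and then complete with "Hello one" and "Hello two", matched purely by position in the stream.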

#### Ordering

In the bidirectional streaming style, gRPC doesn't enforce any correlation between the input and output streams. However, from a higher-level perspective, the service is just mapping the input to the output, one item at a time, and Jet must match them up based on nothing more than their order of appearance. Therefore, for each input item your gRPC service receives, it must emit exactly one output item in exactly the same order. If you want to filter out some items, you must do that by using special "null sentinel" items that you can then filter out in post-processing.
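A reply message itself cannot be null, so a service that wants to drop an item replies with a sentinel value instead. The sketch below converts such a sentinel into a null result. It assumes that mapUsingServiceAsync drops an item whose future completes with null; if in doubt, a separate filter() stage after the mapping achieves the same. Sentinels and dropSentinel are illustrative names, and which reply counts as the sentinel is up to the service.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

final class Sentinels {
    private Sentinels() {
    }

    // Replaces a sentinel reply with null; every other reply passes through unchanged.
    // Assumption: a future completed with null makes mapUsingServiceAsync drop the item.
    static <R> CompletableFuture<R> dropSentinel(CompletableFuture<R> reply, Predicate<? super R> isSentinel) {
        return reply.thenApply(r -> isSentinel.test(r) ? null : r);
    }
}
```

Inside the mapUsingServiceAsync lambda, the call would then become return Sentinels.dropSentinel(service.call(request), isSentinel);, where isSentinel is a user-chosen predicate on the reply type.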

### Error Handling

If the gRPC service returns an error response, Jet completes the corresponding future exceptionally. For non-fatal errors, use CompletableFuture.handle() to map the exception to an appropriate item; otherwise, the job fails.
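A minimal sketch of the handle() pattern, written against plain CompletableFuture so it runs without Jet or gRPC. Which failures count as non-fatal is an assumption left to the caller's predicate; with gRPC, a failed call typically surfaces as an io.grpc.StatusRuntimeException whose status code could drive that decision. GrpcErrors and recover are hypothetical names. Fatal errors are rethrown, so the returned future still fails and the job fails as described above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Predicate;

final class GrpcErrors {
    private GrpcErrors() {
    }

    // Maps non-fatal failures to a fallback item; fatal failures are rethrown so the
    // returned future still completes exceptionally.
    static <R> CompletableFuture<R> recover(CompletableFuture<R> call,
                                            Predicate<Throwable> isNonFatal,
                                            R fallback) {
        return call.handle((reply, error) -> {
            if (error == null) {
                return reply;
            }
            Throwable cause = unwrap(error);
            if (isNonFatal.test(cause)) {
                return fallback;
            }
            throw new CompletionException(cause);
        });
    }

    // Dependent stages report failures wrapped in CompletionException; inspect the cause.
    private static Throwable unwrap(Throwable t) {
        return t instanceof CompletionException && t.getCause() != null ? t.getCause() : t;
    }
}
```

In the pipeline, the lambda would return GrpcErrors.recover(service.call(request), isNonFatal, fallbackReply) instead of the bare future.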

## Future Work

It may be interesting to use a builder pattern for building the gRPC factory to make the syntax a little simpler. This was investigated but dropped for now due to increased complexity.

It may be possible to support unordered bidirectional streaming by requiring a correlation id field in the requests and then matching them to the responses, but this was also considered out of scope for now.
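For illustration only, the correlation-id idea could look like the sketch below: each request gets an id, pending futures are kept in a map keyed by that id, and the service is assumed to echo the id in its reply, so replies may arrive in any order. This is not part of the current design; CorrelatingMatcher, call, onReply and failAll are hypothetical names, and the id field would have to be added to the user's protobuf messages.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Illustrative only: matches replies to requests by a correlation id instead of by position.
final class CorrelatingMatcher<R> {
    private final Map<Long, CompletableFuture<R>> pending = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    // Allocates an id, registers the future, then hands the id to sendWithId, which is
    // expected to copy it into the (hypothetical) correlation id field and send the request.
    CompletableFuture<R> call(LongConsumer sendWithId) {
        long id = nextId.getAndIncrement();
        CompletableFuture<R> future = new CompletableFuture<>();
        pending.put(id, future);
        sendWithId.accept(id);
        return future;
    }

    // Invoked for each reply, in any order; the service is assumed to echo the request's id.
    void onReply(long correlationId, R reply) {
        CompletableFuture<R> future = pending.remove(correlationId);
        if (future != null) {
            future.complete(reply);
        }
    }

    // Fails every outstanding request, e.g. when the stream reports an error.
    void failAll(Throwable t) {
        for (Long id : pending.keySet()) {
            CompletableFuture<R> future = pending.remove(id);
            if (future != null) {
                future.completeExceptionally(t);
            }
        }
    }
}
```

Compared with order-based matching, this removes the one-reply-per-request-in-order constraint at the cost of an extra field in every message and a map lookup per reply.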

Currently, deploying and scaling the gRPC service is left to the user. In the future, it may be an option to have a "Jet-managed" gRPC bundle that contains the service and is deployed alongside the Jet nodes.