Back to Arrow

Arrow Flight RPC

docs/source/cpp/flight.rst

latest14.5 KB
Original Source

.. Licensed to the Apache Software Foundation (ASF) under one .. or more contributor license agreements. See the NOTICE file .. distributed with this work for additional information .. regarding copyright ownership. The ASF licenses this file .. to you under the Apache License, Version 2.0 (the .. "License"); you may not use this file except in compliance .. with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing, .. software distributed under the License is distributed on an .. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY .. KIND, either express or implied. See the License for the .. specific language governing permissions and limitations .. under the License.

.. default-domain:: cpp .. highlight:: cpp

================ Arrow Flight RPC

Arrow Flight is an RPC framework for efficient transfer of Flight data over the network.

.. seealso::

:doc:Flight protocol documentation <../format/Flight> Documentation of the Flight protocol, including how to use Flight conceptually.

:doc:Flight API documentation <./api/flight> C++ API documentation listing all of the various client and server types.

C++ Cookbook <https://arrow.apache.org/cookbook/cpp/flight.html>_ Recipes for using Arrow Flight in C++.

Writing a Flight Service

Servers are subclasses of :class:arrow::flight::FlightServerBase. To implement individual RPCs, override the RPC methods on this class.

.. code-block:: cpp

class MyFlightServer : public FlightServerBase { Status ListFlights(const ServerCallContext& context, const Criteria* criteria, std::unique_ptr<FlightListing>* listings) override { std::vector<FlightInfo> flights = ...; *listings = std::unique_ptr<FlightListing>(new SimpleFlightListing(flights)); return Status::OK(); } };

Each RPC method always takes a :class:arrow::flight::ServerCallContext for common parameters and returns a :class:arrow::Status to indicate success or failure. Flight-specific error codes can be returned via :func:arrow::flight::MakeFlightError.

RPC methods that return a value in addition to a status will use an out parameter, as shown above. Often, there are helper classes providing basic implementations of these out parameters. For instance, above, :class:arrow::flight::SimpleFlightListing uses a vector of :class:arrow::flight::FlightInfo objects as the result of a ListFlights RPC.

To start a server, create a :class:arrow::flight::Location to specify where to listen, and call :func:arrow::flight::FlightServerBase::Init. This will start the server, but won't block the rest of the program. Use :func:arrow::flight::FlightServerBase::SetShutdownOnSignals to enable stopping the server if an interrupt signal is received, then call :func:arrow::flight::FlightServerBase::Serve to block until the server stops.

.. code-block:: cpp

std::unique_ptrarrow::flight::FlightServerBase server; // Initialize server arrow::flight::Location location; // Listen to all interfaces on a free port ARROW_CHECK_OK(arrow::flight::Location::ForGrpcTcp("0.0.0.0", 0, &location)); arrow::flight::FlightServerOptions options(location);

// Start the server ARROW_CHECK_OK(server->Init(options)); // Exit with a clean error code (0) on SIGTERM ARROW_CHECK_OK(server->SetShutdownOnSignals({SIGTERM}));

std::cout << "Server listening on localhost:" << server->port() << std::endl; ARROW_CHECK_OK(server->Serve());

Using the Flight Client

To connect to a Flight service, create an instance of :class:arrow::flight::FlightClient by calling :func:Connect <arrow::flight::FlightClient::Connect>.

Each RPC method returns :class:arrow::Result to indicate the success/failure of the request, and the result object if the request succeeded. Some calls are streaming calls, so they will return a reader and/or a writer object; the final call status isn't known until the stream is completed.

Cancellation and Timeouts

When making a call, clients can optionally provide :class:FlightCallOptions <arrow::flight::FlightCallOptions>. This allows clients to set a timeout on calls or provide custom HTTP headers, among other features. Also, some objects returned by client RPC calls expose a Cancel method which allows terminating a call early.

On the server side, no additional code is needed to implement timeouts. For cancellation, the server needs to manually poll :func:ServerCallContext::is_cancelled <arrow::flight::ServerCallContext::is_cancelled> to check if the client has cancelled the call, and if so, break out of any processing the server is currently doing.

Enabling TLS

TLS can be enabled when setting up a server by providing a certificate and key pair to :func:FlightServerBase::Init <arrow::flight::FlightServerBase::Init>.

On the client side, use :func:Location::ForGrpcTls <arrow::flight::Location::ForGrpcTls> to construct the :class:arrow::flight::Location to listen on.

Enabling Authentication

.. warning:: Authentication is insecure without enabling TLS.

Handshake-based authentication can be enabled by implementing :class:ServerAuthHandler <arrow::flight::ServerAuthHandler> and providing this to the server during construction.

Authentication consists of two parts: on initial client connection, the server and client authentication implementations can perform any negotiation needed. The client authentication handler then provides a token that will be attached to future calls. This is done by calling :func:Authenticate <arrow::flight::FlightClient::Authenticate> with the desired client authentication implementation.

On each RPC thereafter, the client handler's token is automatically added to the call in the request headers. The server authentication handler validates the token and provides the identity of the client. On the server, this identity can be obtained from the :class:arrow::flight::ServerCallContext.

Custom Middleware

Servers and clients support custom middleware (or interceptors) that are called on every request and can modify the request in a limited fashion. These can be implemented by subclassing :class:ServerMiddleware <arrow::flight::ServerMiddleware> and :class:ClientMiddleware <arrow::flight::ClientMiddleware>, then providing them when creating the client or server.

Middleware are fairly limited, but they can add headers to a request/response. On the server, they can inspect incoming headers and fail the request; hence, they can be used to implement custom authentication methods.

.. _flight-best-practices:

Best practices

gRPC

When using the default gRPC transport, options can be passed to it via :member:arrow::flight::FlightClientOptions::generic_options. For example:

.. tab-set::

.. tab-item:: C++

  .. code-block:: cpp

     auto options = FlightClientOptions::Defaults();
     // Set the period after which a keepalive ping is sent on transport.
     options.generic_options.emplace_back(GRPC_ARG_KEEPALIVE_TIME_MS, 60000);

.. tab-item:: Python

  .. code-block:: python

     # Set the period after which a keepalive ping is sent on transport.
     generic_options = [("GRPC_ARG_KEEPALIVE_TIME_MS", 60000)]
     client = pyarrow.flight.FlightClient(server_uri, generic_options=generic_options)

Also see best gRPC practices_ and available gRPC keys_.

Re-use clients whenever possible

Creating and closing clients requires setup and teardown on the client and server side which can take away from actually handling RPCs. Reuse clients whenever possible to avoid this. Note that clients are thread-safe, so a single client can be shared across multiple threads.

Don’t round-robin load balance

Round robin load balancing_ means every client can have an open connection to every server, causing an unexpected number of open connections and depleting server resources.

Debugging connection issues

When facing unexpected disconnects on long running connections use netstat to monitor the number of open connections. If number of connections is much greater than the number of clients it might cause issues.

For debugging, certain environment variables enable logging in gRPC. For example, env GRPC_VERBOSITY=info GRPC_TRACE=http will print the initial headers (on both sides) so you can see if gRPC established the connection or not. It will also print when a message is sent, so you can tell if the connection is open or not.

gRPC may not report connection errors until a call is actually made. Hence, to detect connection errors when creating a client, some sort of dummy RPC should be made.

Memory management

Flight tries to reuse allocations made by gRPC to avoid redundant data copies. However, experience shows that such data is frequently misaligned. Some use cases might require data to have data type-specific alignment (for example, for the data buffer of an Int32 array to be aligned on a 4-byte boundary), which can be enforced by setting :member:arrow::ipc::IpcReadOptions::ensure_alignment to :member:arrow::ipc::Alignment::kDataTypeSpecificAlignment. This uses the :member:arrow::ipc::IpcReadOptions::memory_pool to allocate memory with aligned addresses, but only for mis-aligned data. However, this creates data copies of your data received via Flight.

Unless gRPC data are copied as described above, allocations made by gRPC may not be tracked by the Arrow memory pool, and that memory usage behavior, such as whether free memory is returned to the system, is dependent on the allocator that gRPC uses (usually the system allocator).

A quick way of testing: attach to the process with a debugger and call malloc_trim, or call :func:ReleaseUnused <arrow::MemoryPool::ReleaseUnused> on the system pool. If memory usage drops, then likely, there is memory allocated by gRPC or by the application that the system allocator was holding on to. This can be adjusted in platform-specific ways; see an investigation in ARROW-16697_ for an example of how this works on Linux/glibc. glibc malloc can be explicitly told to dump caches.

Excessive traffic

gRPC will spawn up to max threads quota of threads for concurrent clients. Those threads are not necessarily cleaned up (a "cached thread pool" in Java parlance). glibc malloc clears some per thread state and the default tuning never clears caches in some workloads.

gRPC's default behaviour allows one server to accept many connections from many different clients, but if requests do a lot of work (as they may under Flight), the server may not be able to keep up. Configuring clients to retry with backoff (and potentially connect to a different node), would give more consistent quality of service.

.. tab-set::

.. tab-item:: C++

  .. code-block:: cpp

     auto options = FlightClientOptions::Defaults();
     // Set the minimum time between subsequent connection attempts.
     options.generic_options.emplace_back(GRPC_ARG_MIN_RECONNECT_BACKOFF_MS, 2000);

.. tab-item:: Python

  .. code-block:: python

     # Set the minimum time between subsequent connection attempts.
     generic_options = [("GRPC_ARG_MIN_RECONNECT_BACKOFF_MS", 2000)]
     client = pyarrow.flight.FlightClient(server_uri, generic_options=generic_options)

Limiting DoPut Batch Size

You may wish to limit the maximum batch size a client can submit to a server through DoPut, to prevent a request from taking up too much memory on the server. On the client-side, set :member:arrow::flight::FlightClientOptions::write_size_limit_bytes. On the server-side, set the gRPC option GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH. The client-side option will return an error that can be retried with smaller batches, while the server-side limit will close out the connection. Setting both can be wise, since the former provides a better user experience but the latter may be necessary to defend against impolite clients.

Closing unresponsive connections

  1. A stale call can be closed using :member:arrow::flight::FlightCallOptions::stop_token. This requires recording the stop token at call establishment time.

    .. tab-set::

    .. tab-item:: C++

      .. code-block:: cpp
    
           StopSource stop_source;
           FlightCallOptions options;
           options.stop_token = stop_source.token();
           stop_source.RequestStop(Status::Cancelled("StopSource"));
           flight_client->DoAction(options, {});
    
  2. Use call timeouts. (This is a general gRPC best practice.)

    .. tab-set::

    .. tab-item:: C++

      .. code-block:: cpp
    
         FlightCallOptions options;
         options.timeout = TimeoutDuration{0.2};
         Status status = client->GetFlightInfo(options, FlightDescriptor{}).status();
    

    .. tab-item:: Java

      .. code-block:: java
    
         Iterator<Result> results = client.doAction(new Action("hang"), CallOptions.timeout(0.2, TimeUnit.SECONDS));
    

    .. tab-item:: Python

      .. code-block:: python
    
         options = pyarrow.flight.FlightCallOptions(timeout=0.2)
         result = client.do_action(action, options=options)
    
  3. Client timeouts are not great for long-running streaming calls, where it may be hard to choose a timeout for the entire operation. Instead, what is often desired is a per-read or per-write timeout so that the operation fails if it isn't making progress. This can be implemented with a background thread that calls Cancel() on a timer, with the main thread resetting the timer every time an operation completes successfully. For a fully-worked out example, see the Cookbook.

    .. note:: There is a long standing ticket for a per-write/per-read timeout instead of a per call timeout (ARROW-6062_), but this is not (easily) possible to implement with the blocking gRPC API.

.. _best gRPC practices: https://grpc.io/docs/guides/performance/#general .. _gRPC keys: https://grpc.github.io/grpc/cpp/group__grpc__arg__keys.html .. _Round robin load balancing: https://github.com/grpc/grpc/blob/master/doc/load-balancing.md#round_robin .. _ARROW-15764: https://issues.apache.org/jira/browse/ARROW-15764 .. _ARROW-16697: https://issues.apache.org/jira/browse/ARROW-16697 .. _ARROW-6062: https://issues.apache.org/jira/browse/ARROW-6062 .. _gRPC: https://grpc.io/