.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements.  See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership.  The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License.  You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied.  See the License for the
.. specific language governing permissions and limitations
.. under the License.
.. _dissociated-ipc:

Dissociated IPC Protocol
========================

.. warning::

    Experimental: The Dissociated IPC Protocol is experimental in its current
    form. Based on feedback and usage the protocol definition may change until
    it is fully standardized.
The :ref:`Arrow IPC format <format-ipc>` describes a protocol for transferring
Arrow data as a stream of record batches. This protocol expects a continuous
stream of bytes divided into discrete messages (using a length prefix and
continuation indicator). Each discrete message consists of two portions:

* A Flatbuffers_ header message
* A body of raw bytes containing the data buffers

For most cases, the existing IPC format as it currently exists is sufficiently
efficient; however, there are use cases that it does not handle:
* Constructing the IPC record batch message requires allocating a contiguous
  chunk of bytes and copying all of the data buffers into it, packed together
  back-to-back. This pessimizes the common case of wrapping existing, directly
  consumable data into an IPC message.
* Even if Arrow data is located in memory accessible across process boundaries
  or transports (such as UCX), there is no standard way to specify that shared
  location to consumers which could take advantage of it.
* Arrow data located on a non-CPU device (such as a GPU) cannot be sent using
  Arrow IPC without either copying the data back to the host device or copying
  the Flatbuffers metadata bytes into device memory.
This protocol attempts to solve these use cases in an efficient manner.
Goals
-----

* Define a generic protocol for passing Arrow IPC data, not tied to any
  particular transport, that also allows for utilizing non-CPU device memory,
  shared memory, and newer "high performance" transports such as UCX_ or
  libfabric_.
* Allow for using :ref:`Flight RPC <flight-rpc>` purely for control flow by
  separating the stream of IPC metadata from IPC body bytes.
Terminology
-----------

.. glossary::

    IPC Metadata
        The Flatbuffers message bytes that encompass the header of an Arrow
        IPC message.

    Tag
        A little-endian uint64 value used for flow control and for determining
        how to interpret the body of a message. Specific bits can be masked to
        allow identifying messages by only a portion of the tag, leaving the
        rest of the bits to be used for control flow or other message metadata.
        Some transports, such as UCX, have built-in support for such tag values
        and will provide them in CPU memory regardless of whether or not the
        body of the message may reside on a non-CPU device.

    Sequence Number
        A little-endian, 4-byte unsigned integer starting at 0 for a stream,
        indicating the sequence order of messages. It is also used to identify
        specific messages, tying an IPC metadata header to its corresponding
        body, since the metadata and body can be sent across separate
        pipes/streams/transports.

        If a sequence number reaches ``UINT32_MAX``, it should be allowed to
        roll over, as it is unlikely there would be enough unprocessed messages
        waiting to be processed that would cause an overlap of sequence
        numbers.

        The sequence number serves two purposes: to identify corresponding
        metadata and tagged body data messages, and to ensure we do not rely on
        messages arriving in order. A client should use the sequence number to
        correctly order messages as they arrive for processing.
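The reordering behavior described above can be sketched as follows. This is an
illustrative client-side helper, not part of the protocol specification; the
``Reorderer`` name and its methods are assumptions for the example:

```python
# Hypothetical sketch of client-side reordering by sequence number.
# Messages may arrive out of order; they are released in sequence order.

UINT32_MAX = 0xFFFFFFFF

def next_seq(seq: int) -> int:
    """Increment a sequence number, rolling over past UINT32_MAX."""
    return (seq + 1) & UINT32_MAX

class Reorderer:
    """Buffers out-of-order messages and releases them in sequence order."""

    def __init__(self):
        self.expected = 0   # streams always start at sequence number 0
        self.pending = {}   # sequence number -> message

    def push(self, seq: int, msg):
        """Accept a message; return any messages that are now in order."""
        self.pending[seq] = msg
        ready = []
        while self.expected in self.pending:
            ready.append(self.pending.pop(self.expected))
            self.expected = next_seq(self.expected)
        return ready

r = Reorderer()
assert r.push(1, "batch-1") == []                    # held until 0 arrives
assert r.push(0, "schema") == ["schema", "batch-1"]  # released in order
```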
A reference example implementation utilizing libcudf_ and UCX_ can be found in
the `arrow-experiments repo
<https://github.com/apache/arrow-experiments/tree/main/dissociated-ipc>`_.
A transport implementing this protocol MUST provide two pieces of functionality:

Message sending
    The ability to send discrete, delimited messages (like gRPC) as opposed to
    a non-delimited stream of bytes (like plain TCP without further framing).
    Alternatively, a framing mechanism like the :ref:`encapsulated message
    format <ipc-message-format>` for the IPC protocol can be used while leaving
    out the body bytes.

Tagged message sending
    The ability to send a message with an attached :term:`tag <Tag>` value, so
    that a receiver can identify and retrieve the message by that tag.
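The two required capabilities can be sketched as an interface. The class and
method names below are illustrative assumptions, not defined by this
specification:

```python
# Minimal sketch of the transport interface this protocol requires.
from abc import ABC, abstractmethod

class DissociatedIpcTransport(ABC):
    """The two capabilities a conforming transport MUST provide."""

    @abstractmethod
    def send_message(self, body: bytes) -> None:
        """Send a discrete, delimited message (used for the metadata stream)."""

    @abstractmethod
    def send_tagged(self, tag: int, body: bytes) -> None:
        """Send a message with an attached little-endian uint64 tag
        (used for body data and for want_data / free_data requests)."""
```

A concrete implementation would map these onto the underlying transport, for
example UCX tagged sends for ``send_tagged``.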
When providing a URI to a consumer to contact for use with this protocol (such
as via the :ref:`Location URI for Flight <flight-location-uris>`), the URI
should specify an easily identifiable scheme, like ``ucx:`` or ``fabric:``. In
addition, the URI should encode the following URI query parameters:
.. note:: As this protocol matures, this document will get updated with commonly recognized transport schemes that get used with it.
``want_data`` - **REQUIRED** - uint64 integer value
    The tag value a client must use for the *want_data* message that initiates
    a data transfer. The body of that message is an opaque identifier for the
    requested data stream (such as a ``Ticket`` in the Flight RPC protocol).

``free_data`` - **OPTIONAL** - uint64 integer value
    The tag value a client should use for *free_data* messages, which tell the
    server that shared or remote memory addresses / offsets can be freed.

``remote_handle`` - **OPTIONAL** - base64-encoded string
    When working with shared memory or remote memory, this value indicates any
    required handle or identifier that is necessary for accessing the memory.

    * Using UCX, this would be an ``rkey`` value.
    * With CUDA IPC, this would be the value of the base GPU pointer or memory
      handle, and subsequent addresses would be offsets from this base pointer.
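Parsing these query parameters can be sketched with the standard library. The
host, port, and parameter values below are made-up examples; only the parameter
names come from the specification above:

```python
# Sketch: extracting the protocol's query parameters from a transport URI.
from urllib.parse import urlparse, parse_qs
import base64

# Hypothetical example URI; "aGFuZGxl" is base64 for b"handle".
uri = "ucx://myhost:9090?want_data=11&free_data=12&remote_handle=aGFuZGxl"
parsed = urlparse(uri)
params = parse_qs(parsed.query)

want_data = int(params["want_data"][0])    # REQUIRED tag value
free_data = int(params["free_data"][0])    # OPTIONAL tag value
# OPTIONAL base64-encoded handle for shared / remote memory access
remote_handle = base64.b64decode(params["remote_handle"][0])

assert (want_data, free_data, remote_handle) == (11, 12, b"handle")
```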
Currently this proposal does not specify any way to manage the backpressure of
messages to throttle for memory and bandwidth reasons. For now, this is left
transport-defined rather than locking the protocol into something sub-optimal.
As usage among different transports and libraries grows, common patterns will
emerge that allow for a generic, but efficient, way to handle backpressure
across different use cases.

.. note:: While the protocol itself is transport agnostic, the current usage
    and examples have so far only been tested using the UCX and libfabric
    transports.
There are two possibilities that can occur: there can be either a single server
handling both the IPC Metadata stream and the body data stream, or separate
servers for handling the IPC Metadata and the body data. This allows for
streaming data across either a single transport pipe, or two pipes if desired.

.. mermaid:: ./DissociatedIPC/SequenceDiagramSeparate.mmd

.. mermaid:: ./DissociatedIPC/SequenceDiagramSame.mmd
Metadata Stream Sequence
''''''''''''''''''''''''
The standing state of the server is waiting for a tagged message with a specific
<want_data> tag value to initiate a transfer. This <want_data> value is defined
by the server and propagated to any clients via the URI they are provided. This protocol
does not prescribe any particular value so that it will not interfere with any other
existing protocols that rely on tag values. The body of that message will contain an
opaque, binary identifier to indicate a particular dataset / data stream to send.
.. note::

    For instance, the ticket that was passed with a ``FlightInfo`` message
    would be the body of this message. Because it is opaque, it can be anything
    the server wants to use. The URI and identifier do not need to be given to
    the client via Flight RPC, but could come across any transport or protocol
    desired.
Upon receiving a <want_data> request, the server should respond by sending a stream
of messages consisting of the following:
.. mermaid::

    block-beta
        columns 8
        block:P["\n\n\n\nPrefix"]:5
            T["Message type\nByte 0"]
            S["Sequence number\nBytes 1-4"]
        end
        H["Flatbuffer bytes\nRest of the message"]:3
* A 5-byte prefix:

  * The first byte of the message indicates the type of the message. Currently
    there are only two allowed message types (more types may get added in the
    future):

    * ``0`` - an *End of Stream* message
    * ``1`` - a metadata message

  * The next 4 bytes are a little-endian, unsigned 32-bit integer indicating
    the sequence number of the message. The first message in the stream (which
    MUST always be a schema message) MUST have a sequence number of 0. Each
    subsequent message MUST increment the number by 1.

* The full Flatbuffers bytes of an Arrow IPC header

  * As defined in the Arrow IPC format, each metadata message can represent a
    chunk of data or dictionaries for use by the stream of data.
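The metadata message layout can be sketched with ``struct``. The function names
are illustrative assumptions; the byte layout follows the 5-byte prefix
described above:

```python
# Sketch: encoding / decoding a metadata stream message.
# Prefix: one type byte + little-endian unsigned 32-bit sequence number,
# followed by the Flatbuffers header bytes.
import struct

END_OF_STREAM = 0
METADATA_MESSAGE = 1

def encode_metadata_msg(msg_type: int, seq: int, flatbuffer: bytes = b"") -> bytes:
    return struct.pack("<BI", msg_type, seq) + flatbuffer

def decode_metadata_msg(msg: bytes):
    msg_type, seq = struct.unpack_from("<BI", msg)
    return msg_type, seq, msg[5:]

# The schema message is always sequence number 0.
wire = encode_metadata_msg(METADATA_MESSAGE, 0, b"\x10\x00\x00\x00")
assert decode_metadata_msg(wire) == (METADATA_MESSAGE, 0, b"\x10\x00\x00\x00")

# End of Stream: exactly the 5-byte prefix, no Flatbuffers payload.
assert len(encode_metadata_msg(END_OF_STREAM, 3)) == 5
```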
After sending the last metadata message, the server MUST indicate the end of
the stream by sending a message consisting of exactly 5 bytes:

* A first byte of ``0``, indicating an *End of Stream* message
* The next, final sequence number of the stream, as a little-endian, unsigned
  32-bit integer

Data Stream Sequence
''''''''''''''''''''
If a single server is handling both the data and metadata streams, then the data messages
should begin being sent to the client in parallel with the metadata messages. Otherwise,
as with the metadata sequence, the standing state of the server is to wait for a tagged
message with the <want_data> tag value, whose body indicates the dataset / data stream
to send to the client.
For each IPC message in the stream of data, a tagged message MUST be sent on the data
stream if that message has a body (i.e. a Record Batch or Dictionary message). The
:term:`tag <Tag>` for each message should be structured as follows:
.. mermaid::

    block-beta
        columns 8
        S["Sequence number\nBytes 0-3"]:4
        U["Unused (Reserved)\nBytes 4-6"]:3
        T["Message type\nByte 7"]:1
* The least significant 4 bytes (bits 0 - 31) of the tag should be the
  unsigned 32-bit, little-endian sequence number of the message.
* The most significant byte (bits 56 - 63) of the tag indicates the message
  body type as an 8-bit unsigned integer. Currently only two message types are
  specified, but more can be added as needed to expand the protocol:

  * ``0`` - The body contains the raw body buffer bytes as a packed buffer
    (i.e. the standard IPC format body bytes).
  * ``1`` - The body contains a series of unsigned, little-endian 64-bit
    integer pairs representing either shared or remote memory, structured as
    follows:

    * The first two integers (i.e. the first 16 bytes) represent the total
      size (in bytes) of all buffers and the number of buffers in this message
      (and thus the number of following pairs of uint64).
    * Each subsequent pair of uint64 values is an address / offset followed by
      the length of that particular buffer.

* All unspecified bits (bits 32 - 55) of the tag are reserved for future use
  by potential updates to this protocol. For now they MUST be 0.
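The tag layout and the type-``1`` body encoding can be sketched as follows.
The helper names are illustrative assumptions; the bit positions and the
(address / offset, length) pair encoding come from the description above:

```python
# Sketch: building a 64-bit tag (sequence number in the low 32 bits,
# message type in the high byte, reserved bits zero) and encoding the
# body for message type 1 (shared / remote memory descriptors).
import struct

BODY_RAW = 0      # raw packed IPC body bytes
BODY_REMOTE = 1   # pairs of (address/offset, length) uint64 values
SEQ_MASK = 0x00000000FFFFFFFF

def make_tag(msg_type: int, seq: int) -> int:
    return (msg_type << 56) | (seq & SEQ_MASK)

def encode_remote_body(buffers) -> bytes:
    """buffers: list of (address_or_offset, length) pairs."""
    total = sum(length for _, length in buffers)
    out = struct.pack("<QQ", total, len(buffers))
    for addr, length in buffers:
        out += struct.pack("<QQ", addr, length)
    return out

tag = make_tag(BODY_REMOTE, 7)
assert tag & SEQ_MASK == 7        # clients match on the low 4 bytes only
assert tag >> 56 == BODY_REMOTE

body = encode_remote_body([(0x1000, 64), (0x2000, 128)])
assert struct.unpack_from("<QQ", body) == (192, 2)  # total size, buffer count
```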
.. note::

    Any shared/remote memory addresses that are sent across MUST be kept alive
    by the server until a corresponding tagged <free_data> message is received.
    If the client disconnects before sending any <free_data> messages, the
    server may assume it is safe to clean up the memory if desired.
After sending the last tagged IPC body message, the server should maintain the connection and wait
for tagged <free_data> messages. The structure of these <free_data> messages is simple:
one or more unsigned, little-endian 64-bit integers which indicate the addresses/offsets that can
be freed.
Once there are no more outstanding addresses to be freed, the work for this stream is complete.
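The <free_data> message body is simply a packed run of uint64 values, which can
be sketched as follows (the helper names are illustrative assumptions):

```python
# Sketch: a free_data message body is an arbitrary number of
# little-endian uint64 addresses / offsets that the server may free.
import struct

def encode_free_data(addresses) -> bytes:
    return struct.pack("<%dQ" % len(addresses), *addresses)

def decode_free_data(body: bytes):
    return list(struct.unpack("<%dQ" % (len(body) // 8), body))

# A client may coalesce several addresses into one message to reduce chatter.
msg = encode_free_data([0x1000, 0x2000])
assert decode_free_data(msg) == [0x1000, 0x2000]
```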
A client for this protocol needs to concurrently handle both the data and
metadata streams of messages, which may come from the same server or from
different servers. Below is a flowchart showing how a client might handle the
metadata and data streams:
.. mermaid:: ./DissociatedIPC/ClientFlowchart.mmd
#. First the client sends a tagged message using the <want_data> value it was
   provided in the URI as the tag, and the opaque ID as the body.

   * If the metadata and body data streams are handled by separate servers, a
     <want_data> message needs to be sent separately to each.

#. For each untagged message the client receives in the metadata stream:
   * The first byte of the message indicates whether it is an End of Stream
     message (value 0) or a metadata message (value 1).
   * The next 4 bytes are the sequence number of the message, an unsigned
     32-bit integer in little-endian byte order.
   * If it is not an End of Stream message, the remaining bytes are the IPC
     Flatbuffer bytes, which can be interpreted as normal.
   * If it is an End of Stream message, then it is safe to close the metadata
     connection as long as there are no gaps in the sequence numbers received.
#. When a metadata message that requires a body is received, the tag mask
   ``0x00000000FFFFFFFF`` should be used alongside the sequence number to match
   the message regardless of the higher bytes (i.e. we only care about matching
   the lower 4 bytes against the sequence number).

   Once received, the most significant byte's value determines how the client
   processes the body data:
   * If the most significant byte is 0: the body of the message is the raw IPC
     packed body buffers, allowing it to easily be processed with the
     corresponding metadata header bytes.
   * If the most significant byte is 1: the body of the message will consist of
     a series of pairs of unsigned, 64-bit integers in little-endian byte
     order.

     * The first two integers represent 1) the total size of all the body
       buffers together, to allow for easy allocation if an intermediate buffer
       is needed, and 2) the number of buffers being sent (*nbuf*).
     * The rest of the message will be *nbuf* pairs of integers, one for each
       buffer. Each pair is 1) the address / offset of the buffer and 2) the
       length of that buffer. Memory can then be retrieved via shared or remote
       memory routines based on the underlying transport. These addresses /
       offsets MUST be retained so they can be sent back in <free_data>
       messages later, indicating to the server that the client no longer needs
       the shared memory.
#. Once an End of Stream message is received, the client should process any
   remaining unprocessed IPC metadata messages.
#. After individual memory addresses / offsets are able to be freed by the
   remote server (in the case where it has sent these rather than the full body
   bytes), the client should send corresponding <free_data> messages to the
   server.

   A <free_data> message consists of an arbitrary number of unsigned 64-bit
   integer values, representing the addresses / offsets which can be freed. The
   number is arbitrary so that a client can choose whether to send multiple
   messages to free multiple addresses, or to coalesce multiple addresses into
   fewer messages (thus making the protocol less "chatty" if desired).

If you decide to try this protocol in your own environments and systems, we'd
love feedback and to learn about your use case. As this is currently an
experimental protocol, we need real-world usage in order to facilitate
improving it and finding the right generalizations to standardize on across
transports.
Please chime in using the Arrow Developers Mailing list: https://arrow.apache.org/community/#mailing-lists
.. _Flatbuffers: http://github.com/google/flatbuffers
.. _UCX: https://openucx.org/
.. _libfabric: https://ofiwg.github.io/libfabric/
.. _libcudf: https://docs.rapids.ai/api