doc/source/ray-core/direct-transport.rst
.. _direct-transport:
Ray Direct Transport (RDT)
==========================
Ray objects are normally stored in Ray's CPU-based object store and copied and deserialized when accessed by a Ray task or actor.
For GPU data specifically, this can lead to unnecessary and expensive data transfers.
For example, passing a CUDA torch.Tensor from one Ray task to another would require a copy from GPU to CPU memory, then back again to GPU memory.
Ray Direct Transport (RDT) is a new feature that allows Ray to store and pass objects directly between Ray actors.
This feature augments the familiar Ray :class:`ObjectRef <ray.ObjectRef>` API by:

1. Letting objects stay in place on the actor that created them, instead of being copied into Ray's CPU-based object store.
2. Using collective communication libraries (such as `Gloo <https://github.com/pytorch/gloo>`__ or `NCCL <https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/index.html>`__) or point-to-point RDMA (via NVIDIA's `NIXL <https://github.com/ai-dynamo/nixl>`__) to transfer data directly between devices, including both CPUs and GPUs.

.. note::

    RDT is currently in alpha and doesn't support all Ray Core APIs yet. Future releases may introduce breaking API changes. See the :ref:`limitations <limitations>` section for more details.
.. tip::

    RDT currently supports ``torch.Tensor`` objects created by Ray actor tasks. Other datatypes and Ray non-actor tasks may be supported in future releases.
This walkthrough will show how to create and use RDT with different tensor transports, i.e. the mechanism used to transfer the tensor between actors. Currently, RDT supports the following tensor transports:

* `Gloo <https://github.com/pytorch/gloo>`__: A collective communication library for PyTorch and CPUs.
* `NVIDIA NCCL <https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/index.html>`__: A collective communication library for NVIDIA GPUs.
* `NVIDIA NIXL <https://github.com/ai-dynamo/nixl>`__ (backed by `UCX <https://github.com/openucx/ucx>`__): A library for accelerating point-to-point transfers via RDMA, especially between various types of memory and NVIDIA GPUs.

For ease of following along, we'll start with the `Gloo <https://github.com/pytorch/gloo>`__ transport, which can be used without any physical GPUs.
.. _direct-transport-gloo:
Installation
^^^^^^^^^^^^
.. note:: Under construction.
Walkthrough
^^^^^^^^^^^
To get started, define an actor class and a task that returns a torch.Tensor:
.. literalinclude:: doc_code/direct_transport_gloo.py
    :language: python
    :start-after: normal_example_start
    :end-before: normal_example_end
As written, when the torch.Tensor is returned, it will be copied into Ray's CPU-based object store.
For CPU-based tensors, this can require an expensive step to copy and serialize the object, while GPU-based tensors additionally require a copy to and from CPU memory.
To enable RDT, use the ``tensor_transport`` option in the :func:`@ray.method <ray.method>` decorator.
.. literalinclude:: doc_code/direct_transport_gloo.py
    :language: python
    :start-after: gloo_example_start
    :end-before: gloo_example_end
This decorator can be added to any actor tasks that return a torch.Tensor, or that return torch.Tensors nested inside other Python objects.
Adding this decorator will change Ray's behavior in the following ways:
1. Instead of copying the tensor into Ray's CPU-based object store, Ray will keep a reference to the tensor on the actor that created it.
2. When the :class:`ray.ObjectRef` is passed to another task, Ray will use Gloo to transfer the tensor to the destination task.

Note that for (2) to work, the :func:`@ray.method(tensor_transport) <ray.method>` decorator only needs to be added to the actor task that returns the tensor. It should not be added to actor tasks that consume the tensor (unless those tasks also return tensors).
Also, for (2) to work, we must first create a collective group of actors.
Creating a collective group
^^^^^^^^^^^^^^^^^^^^^^^^^^^
To create a collective group for use with RDT:
1. Create the actors that will pass tensors between each other.
2. Pass the actor handles to the :func:`ray.experimental.collective.create_collective_group <ray.experimental.collective.create_collective_group>` function. The ``backend`` specified must match the ``tensor_transport`` used in the :func:`@ray.method <ray.method>` decorator.

Here is an example:
.. literalinclude:: doc_code/direct_transport_gloo.py :language: python :start-after: gloo_group_start :end-before: gloo_group_end
The actors can now communicate directly via Gloo.
The group can also be destroyed using the :func:`ray.experimental.collective.destroy_collective_group <ray.experimental.collective.destroy_collective_group>` function.
After calling this function, a new collective group can be created on the same actors.
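For example, here is a minimal sketch of destroying and re-creating a group, reusing the ``sender`` and ``receiver`` actors from the example above. It assumes ``destroy_collective_group`` accepts the same list of actor handles that was passed to ``create_collective_group``, and that the ``backend`` string follows the matching rule described above; check the API reference for the exact signature.

.. code-block:: python

    from ray.experimental import collective

    # Tear down the existing group on the same actors.
    collective.destroy_collective_group([sender, receiver])

    # The same actors can now join a fresh collective group.
    collective.create_collective_group([sender, receiver], backend="gloo")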
Passing objects to other actors
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Now that we have a collective group, we can create and pass RDT objects between the actors. Here is a full example:
.. literalinclude:: doc_code/direct_transport_gloo.py
    :language: python
    :start-after: gloo_full_example_start
    :end-before: gloo_full_example_end
When the :class:`ray.ObjectRef` is passed to another task, Ray will use Gloo to transfer the tensor directly from the source actor to the destination actor instead of going through the default object store.
Note that the :func:`@ray.method(tensor_transport) <ray.method>` decorator is only added to the actor task that returns the tensor; once this hint has been added, the receiving actor task ``receiver.sum`` will automatically use Gloo to receive the tensor.
In this example, because ``MyActor.sum`` does not have the :func:`@ray.method(tensor_transport) <ray.method>` decorator, it will use the default Ray object store transport to return ``torch.sum(tensor)``.
RDT also supports passing tensors nested inside Python data structures, as well as actor tasks that return multiple tensors, like in this example:
.. literalinclude:: doc_code/direct_transport_gloo.py
    :language: python
    :start-after: gloo_multiple_tensors_example_start
    :end-before: gloo_multiple_tensors_example_end
Passing RDT objects to the actor that produced them
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
RDT :class:`ray.ObjectRefs <ray.ObjectRef>` can also be passed to the actor that produced them.
This avoids any copies and just provides a reference to the same torch.Tensor that was previously created.
For example:
.. literalinclude:: doc_code/direct_transport_gloo.py
    :language: python
    :start-after: gloo_intra_actor_start
    :end-before: gloo_intra_actor_end
.. note::

    Ray only keeps a reference to the tensor created by the user, so the tensor objects are mutable.
    If ``sender.sum`` were to modify the tensor in the above example, the changes would also be seen by ``receiver.sum``.
    This differs from the normal Ray Core API, which always makes an immutable copy of data returned by actors.
``ray.get``
^^^^^^^^^^^
The :func:`ray.get <ray.get>` function can also be used as usual to retrieve the result of an RDT object. However, :func:`ray.get <ray.get>` will by default use the same tensor transport as the one specified in the :func:`@ray.method <ray.method>` decorator. For collective-based transports, this will not work if the caller is not part of the collective group.
Therefore, users need to specify the Ray object store as the tensor transport explicitly by setting ``_use_object_store`` in :func:`ray.get <ray.get>`.
.. literalinclude:: doc_code/direct_transport_gloo.py
    :language: python
    :start-after: gloo_get_start
    :end-before: gloo_get_end
Object mutability
^^^^^^^^^^^^^^^^^
Unlike objects in the Ray object store, RDT objects are mutable, meaning that Ray only holds a reference to the tensor and will not copy it until a transfer is requested. This means that if the actor that returns a tensor also keeps a reference to the tensor, and the actor later modifies it in place while Ray is still storing the tensor reference, it's possible that some or all of the changes may be seen by receiving actors.
Here is an example of what can go wrong:
.. literalinclude:: doc_code/direct_transport_gloo.py
    :language: python
    :start-after: gloo_wait_tensor_freed_bad_start
    :end-before: gloo_wait_tensor_freed_bad_end
In this example, the sender actor returns a tensor to Ray, but it also keeps a reference to the tensor in its local state.
Then, in ``sender.increment_and_sum_stored_tensor``, the sender actor modifies the tensor in place while Ray is still holding the tensor reference.
As a result, the ``receiver.increment_and_sum`` task receives the modified tensor instead of the original, so the assertion fails.
To fix this kind of error, use the :func:`ray.experimental.wait_tensor_freed <ray.experimental.wait_tensor_freed>` function to wait for Ray to release all references to the tensor, so that the actor can safely write to the tensor again.
:func:`wait_tensor_freed <ray.experimental.wait_tensor_freed>` will unblock once all tasks that depend on the tensor have finished executing and all corresponding ObjectRefs have gone out of scope.
Ray tracks tasks that depend on the tensor by keeping track of which tasks take the ObjectRef corresponding to the tensor as an argument.
Here's a fixed version of the earlier example.
.. literalinclude:: doc_code/direct_transport_gloo.py
    :language: python
    :start-after: gloo_wait_tensor_freed_start
    :end-before: gloo_wait_tensor_freed_end
The main changes are:
1. ``sender`` calls :func:`wait_tensor_freed <ray.experimental.wait_tensor_freed>` before modifying the tensor in place.
2. The driver avoids calling :func:`ray.get <ray.get>` on the tensor, because :func:`wait_tensor_freed <ray.experimental.wait_tensor_freed>` blocks until all ObjectRefs pointing to the tensor are freed, so calling :func:`ray.get <ray.get>` here would cause a deadlock.
3. The driver calls ``del tensor`` to release its reference to the tensor. Again, this is necessary because :func:`wait_tensor_freed <ray.experimental.wait_tensor_freed>` blocks until all ObjectRefs pointing to the tensor are freed.

When an RDT ObjectRef is passed back to the same actor that produced it, Ray passes back a reference to the tensor instead of a copy. Therefore, the same kind of bug can occur.
To help catch such cases, Ray will print a warning if an RDT object is passed to the actor that produced it and a different actor, like so:
.. literalinclude:: doc_code/direct_transport_gloo.py
    :language: python
    :start-after: gloo_object_mutability_warning_start
    :end-before: gloo_object_mutability_warning_end
RDT requires just a few lines of code change to switch tensor transports. Here is the :ref:`Gloo example <direct-transport-gloo>`, modified to use NVIDIA GPUs and the `NCCL <https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/index.html>`__ library for collective GPU communication.
.. literalinclude:: doc_code/direct_transport_nccl.py
    :language: python
    :start-after: nccl_full_example_start
    :end-before: nccl_full_example_end
The main code differences are:
1. :func:`@ray.method <ray.method>` uses ``tensor_transport="nccl"`` instead of ``tensor_transport="gloo"``.
2. The ``"nccl"`` backend is specified when the :func:`ray.experimental.collective.create_collective_group <ray.experimental.collective.create_collective_group>` function is used to create a collective group.
3. The tensors are placed in GPU memory with the ``.cuda()`` method.

Installation
^^^^^^^^^^^^
For maximum performance, run the `install_gdrcopy.sh <https://github.com/ray-project/ray/blob/master/doc/tools/install_gdrcopy.sh>`__ script (e.g., ``install_gdrcopy.sh "${GDRCOPY_OS_VERSION}" "12.8" "x64"``). You can find available OS versions `here <https://developer.download.nvidia.com/compute/redist/gdrcopy/CUDA%2012.8/>`__. If ``gdrcopy`` is not installed, things will still work with a plain ``pip install nixl``, just with lower performance. ``nixl`` and ``ucx`` are installed as dependencies via pip.
Walkthrough
^^^^^^^^^^^
NIXL can transfer data between different devices, including CPUs and NVIDIA GPUs, but doesn't require a collective group to be created ahead of time. This means that any actor that has NIXL installed in its environment can be used to create and pass an RDT object.
Otherwise, the usage is the same as in the :ref:`Gloo example <direct-transport-gloo>`.
Here is an example showing how to use NIXL to transfer an RDT object between two actors:
.. literalinclude:: doc_code/direct_transport_nixl.py
    :language: python
    :start-after: nixl_full_example_start
    :end-before: nixl_full_example_end
Compared to the :ref:`Gloo example <direct-transport-gloo>`, the main code differences are:

1. :func:`@ray.method <ray.method>` uses ``tensor_transport="nixl"`` instead of ``tensor_transport="gloo"``.
2. No collective group is created ahead of time, so there is no call to :func:`ray.experimental.collective.create_collective_group <ray.experimental.collective.create_collective_group>`.

ray.put and ray.get with NIXL
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Unlike the collective-based tensor transports (Gloo and NCCL), the :func:`ray.get <ray.get>` function can use NIXL to retrieve a copy of the result.
By default, the tensor transport for :func:`ray.get <ray.get>` will be the one specified in the :func:`@ray.method <ray.method>` decorator.
.. literalinclude:: doc_code/direct_transport_nixl.py
    :language: python
    :start-after: nixl_get_start
    :end-before: nixl_get_end
You can also use NIXL to retrieve the result from references created by :func:`ray.put <ray.put>`.
.. literalinclude:: doc_code/direct_transport_nixl.py
    :language: python
    :start-after: nixl_put__and_get_start
    :end-before: nixl_put__and_get_end
RDT allows Ray to store and pass objects directly between Ray actors, using accelerated transports like Gloo, NCCL, and NIXL.
For a full list of limitations, see the :ref:`limitations <limitations>` section.
.. note:: Under construction.
.. _limitations:
RDT is currently in alpha and has the following limitations, which may be addressed in future releases:
* ``torch.Tensor`` objects only.
* Not yet compatible with `asyncio <https://docs.python.org/3/library/asyncio.html>`__. Follow the `tracking issue <https://github.com/ray-project/ray/issues/56398>`__ for updates.
* For collective-based tensor transports (Gloo and NCCL):

  * RDT :class:`ray.ObjectRefs <ray.ObjectRef>` cannot be passed arbitrarily to other Ray tasks or actors. Instead, the :class:`ray.ObjectRef`\s can only be passed as direct arguments to other actor tasks, and those actors must be in the same collective group.
  * Objects cannot be created with :func:`ray.put <ray.put>`.

* Due to a known issue, for NIXL, we currently do not support storing different GPU objects at the same actor, where the objects contain an overlapping but not equal set of tensors. To support this pattern, ensure that the first ObjectRef has gone out of scope before storing the same tensor(s) again in a second object.
.. literalinclude:: doc_code/direct_transport_nixl.py
    :language: python
    :start-after: nixl_limitations_start
    :end-before: nixl_limitations_end
Application-level errors, i.e. exceptions raised by user code, will not destroy the collective group and will instead be propagated to any dependent task(s), as for non-RDT Ray objects.
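As a quick illustration of this behavior (reusing the ``sender`` and ``receiver`` actors from the walkthrough; the ``produce_tensor`` method name here is illustrative, not part of the RDT API), an application-level exception surfaces when retrieving the dependent task's result, just as it would for a regular Ray object:

.. code-block:: python

    # Suppose user code inside the RDT-decorated task raises an exception.
    ref = sender.produce_tensor.remote()

    try:
        # The error is propagated to the dependent task and raised on ray.get;
        # the collective group is not destroyed.
        ray.get(receiver.sum.remote(ref))
    except ray.exceptions.RayTaskError as err:
        print("Application error propagated:", err)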
If a system-level error occurs during a Gloo or NCCL collective operation, the collective group will be destroyed and the actors will be killed to prevent any hanging.
If a system-level error occurs during a NIXL transfer, Ray or NIXL will abort the transfer with an exception, and Ray will raise that exception in the dependent task or in the ``ray.get`` call on the NIXL ref.
System-level errors include:
* A timeout while fetching the object (configurable via the ``RAY_rdt_fetch_fail_timeout_milliseconds`` environment variable)
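As an illustration, one way to adjust the fetch timeout is to set the variable before Ray starts. This sketch assumes the variable is read from each Ray process's environment at startup; on a multi-node cluster, set it in the environment used to start each node instead.

.. code-block:: python

    import os

    # Raise the RDT fetch-failure timeout to 30 seconds for this local Ray instance.
    os.environ["RAY_rdt_fetch_fail_timeout_milliseconds"] = "30000"

    import ray
    ray.init()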
Ray allows users to register new tensor transports at runtime for use with RDT.
To implement a new tensor transport, implement the abstract interface :class:`ray.experimental.TensorTransportManager <ray.experimental.TensorTransportManager>`
defined in `tensor_transport_manager.py <https://github.com/ray-project/ray/blob/master/python/ray/experimental/rdt/tensor_transport_manager.py>`__.
Then call :func:`register_tensor_transport <ray.experimental.register_tensor_transport>` with the transport name, the supported devices for the transport,
the class that implements ``TensorTransportManager``, and the data type for the transport. Note that you have to register from the same process in which you create the actor you want
to use the transport with, and actors only have access to transports registered before their creation.
.. code-block:: python

    import sys
    from dataclasses import dataclass

    import torch

    import ray
    from ray.experimental import (
        register_tensor_transport,
        TensorTransportManager,
        TensorTransportMetadata,
        CommunicatorMetadata,
    )


    @dataclass
    class CustomTransportMetadata(TensorTransportMetadata):
        pass


    @dataclass
    class CustomCommunicatorMetadata(CommunicatorMetadata):
        pass


    class CustomTransport(TensorTransportManager):
        # Implement the abstract methods of TensorTransportManager here.
        ...


    register_tensor_transport("CUSTOM", ["cuda", "cpu"], CustomTransport, torch.Tensor)
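For example, here is a sketch of how the registered transport might then be used, assuming the registered name is what gets passed to the ``tensor_transport`` option of :func:`@ray.method <ray.method>` (the actor and method names are illustrative):

.. code-block:: python

    @ray.remote
    class CustomActor:
        @ray.method(tensor_transport="CUSTOM")
        def produce(self):
            # Returned tensors are handled by the custom transport.
            return torch.randn(128)

    # The actor must be created in the same process, and after,
    # register_tensor_transport was called.
    actor = CustomActor.remote()
    ref = actor.produce.remote()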
Note that there are currently some limitations with custom transports:
.. note:: Under construction.