doc/source/ray-more-libs/ray-collective.rst
.. This part of the docs is generated from the ray.util.collective readme using m2r.
   To update: m2r RAY_ROOT/python/ray/util/collective/README.md

.. _ray-collective:

Ray collective communication library
=====================================

The Ray collective communication library (\ ``ray.util.collective``\ ) offers a set of native collective primitives for
communication between distributed CPUs or GPUs.

Collective Primitives Support Matrix
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

See below the current support matrix for all collective calls with different backends.

.. list-table::
   :header-rows: 1

   * - Backend
     - `torch.distributed.gloo <https://pytorch.org/docs/stable/distributed.html#gloo>`_
     - `nccl <https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/index.html>`_
   * - Supported tensor types
     - ``torch.Tensor``, ``numpy.ndarray``
     - ``cupy.ndarray``

Installation and Importing
^^^^^^^^^^^^^^^^^^^^^^^^^^

The Ray collective library is bundled with the released Ray wheel. Besides Ray, users need to install either `torch <https://pytorch.org/get-started/locally/>`_
or `cupy <https://docs.cupy.dev/en/stable/install.html>`_ in order to use collective communication with the GLOO (``torch.distributed.gloo``) and NCCL backends, respectively.

.. code-block:: bash

    pip install torch
    pip install cupy-cudaxxx  # replace xxx with the right cuda version in your environment

To use these APIs, import the collective package in your actor/task or driver code via:

.. code-block:: python

    import ray.util.collective as col

Initialization
^^^^^^^^^^^^^^

Collective functions operate on collective groups. A collective group contains a number of processes (in Ray, they are usually Ray-managed actors or tasks) that will together enter the collective function calls. Before making collective calls, users need to declare a set of actors/tasks, statically, as a collective group.

Below is an example code snippet that uses the two APIs ``init_collective_group()`` and ``create_collective_group()`` to initialize collective groups among a few
remote actors. Refer to `APIs <#api-reference>`_ for detailed descriptions of the two APIs.

.. code-block:: python

    import ray
    import ray.util.collective as collective

    import cupy as cp


    @ray.remote(num_gpus=1)
    class Worker:
        def __init__(self):
            self.send = cp.ones((4, ), dtype=cp.float32)
            self.recv = cp.zeros((4, ), dtype=cp.float32)

        def setup(self, world_size, rank):
            collective.init_collective_group(world_size, rank, "nccl", "default")
            return True

        def compute(self):
            collective.allreduce(self.send, "default")
            return self.send

        def destroy(self):
            collective.destroy_group()


    # Imperative: each actor declares its own rank via init_collective_group().
    num_workers = 2
    workers = []
    init_rets = []
    for i in range(num_workers):
        w = Worker.remote()
        workers.append(w)
        init_rets.append(w.setup.remote(num_workers, i))
    _ = ray.get(init_rets)
    results = ray.get([w.compute.remote() for w in workers])

    # Declarative: the driver declares the group via create_collective_group().
    workers = []
    for i in range(num_workers):
        w = Worker.remote()
        workers.append(w)
    _options = {
        # The group name must match the name used by the collective calls
        # inside the actors ("default" in compute()).
        "group_name": "default",
        "world_size": 2,
        "ranks": [0, 1],
        "backend": "nccl"
    }
    collective.create_collective_group(workers, **_options)
    results = ray.get([w.compute.remote() for w in workers])

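
The snippets above use the NCCL backend with CuPy arrays on GPUs. A CPU-only group using the GLOO backend looks very similar. The following is a minimal sketch rather than a verbatim reference: it assumes the GLOO backend is selected with the backend string ``"gloo"``, and the actor class ``CPUWorker`` and group name ``"cpu_group"`` are illustrative.

.. code-block:: python

    import ray
    import torch
    import ray.util.collective as collective


    @ray.remote
    class CPUWorker:
        def __init__(self):
            self.buffer = torch.ones(4, dtype=torch.float32)

        def setup(self, world_size, rank):
            # "gloo" is assumed to be the accepted backend string here;
            # check the API reference for the exact backend names.
            collective.init_collective_group(world_size, rank, "gloo", "cpu_group")
            return True

        def compute(self):
            # Allreduce over CPU tensors within the "cpu_group" group.
            collective.allreduce(self.buffer, "cpu_group")
            return self.buffer


    workers = [CPUWorker.remote() for _ in range(2)]
    ray.get([w.setup.remote(2, i) for i, w in enumerate(workers)])
    print(ray.get([w.compute.remote() for w in workers]))
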

Note that for the same set of actors/task processes, multiple collective groups can be constructed, with ``group_name`` as their unique identifier.
This enables specifying complex communication patterns between different (sub)sets of processes.
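
For instance, the sketch below declares two overlapping groups over three ``Worker`` actors as defined in the example above (the group names ``"group_ab"`` and ``"group_bc"`` are illustrative, and the cluster is assumed to have enough GPUs for three workers):

.. code-block:: python

    import ray.util.collective as collective

    # Three workers: A and B form one group, B and C form another.
    A, B, C = Worker.remote(), Worker.remote(), Worker.remote()

    collective.create_collective_group(
        [A, B], world_size=2, ranks=[0, 1], backend="nccl", group_name="group_ab")
    collective.create_collective_group(
        [B, C], world_size=2, ranks=[0, 1], backend="nccl", group_name="group_bc")

    # Inside the actors, a collective call selects the group by name, e.g.
    # collective.allreduce(self.send, "group_ab").
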

Collective Communication
^^^^^^^^^^^^^^^^^^^^^^^^

Check the `support matrix <#collective-primitives-support-matrix>`_ for the current status of supported collective calls and backends.

Note that the current set of collective communication APIs is imperative and synchronous: each call blocks, and it must be made by every participating process of the (pre-declared) collective group before the communication can proceed.

An example of using ``ray.util.collective.allreduce`` is below:

.. code-block:: python

    import ray
    import cupy
    import ray.util.collective as col


    @ray.remote(num_gpus=1)
    class Worker:
        def __init__(self):
            self.buffer = cupy.ones((10,), dtype=cupy.float32)

        def compute(self):
            col.allreduce(self.buffer, "default")
            return self.buffer


    # Create two actors A and B and put them in a collective group named
    # "default", following the initialization examples above.
    A = Worker.remote()
    B = Worker.remote()
    # Invoke allreduce remotely on both members of the group.
    ray.get([A.compute.remote(), B.compute.remote()])

Point-to-point Communication
^^^^^^^^^^^^^^^^^^^^^^^^^^^^

``ray.util.collective`` also supports P2P send/recv communication between processes.

The send/recv calls exhibit the same behavior as the collective functions: they are synchronous blocking calls -- a pair of send and recv must be called together on paired processes in order to specify the entire communication, and the two calls must successfully rendezvous with each other to proceed. See the code example below:

.. code-block:: python

    import ray
    import cupy
    import ray.util.collective as col


    @ray.remote(num_gpus=1)
    class Worker:
        def __init__(self):
            self.buffer = cupy.ones((10,), dtype=cupy.float32)

        def get_buffer(self):
            return self.buffer

        def do_send(self, target_rank=0):
            # this call is blocking
            col.send(self.buffer, target_rank)

        def do_recv(self, src_rank=0):
            # this call is blocking
            col.recv(self.buffer, src_rank)

        def do_allreduce(self):
            # this call is blocking as well
            col.allreduce(self.buffer)
            return self.buffer


    # Create two actors and declare them as a collective group.
    A = Worker.remote()
    B = Worker.remote()
    col.create_collective_group([A, B], world_size=2, ranks=[0, 1], backend="nccl")

    # Let A send a message to B; the send and its matching recv are
    # specified once on each of the paired processes.
    ray.get([A.do_send.remote(target_rank=1), B.do_recv.remote(src_rank=0)])

    # Anti-pattern: the send below has no matching recv on rank 1, so this call hangs.
    ray.get([A.do_send.remote(target_rank=1)])

Single-GPU and Multi-GPU Collective Primitives
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

In many cluster setups, a machine usually has more than 1 GPU;
effectively leveraging the GPU-GPU interconnect bandwidth, such as `NVLINK <https://www.nvidia.com/en-us/data-center/nvlink/>`_\ ,
can significantly improve communication performance.

``ray.util.collective`` supports multi-GPU collective calls, in which a process (actor/task) manages more than 1 GPU (e.g., via ``ray.remote(num_gpus=4)``\ ).
Using these multi-GPU collective functions is normally more performant than using the single-GPU collective APIs
and spawning one process per GPU.
See the API references for the signatures of the multi-GPU collective APIs.
Note that the multi-GPU APIs currently work only with the NCCL backend, and they take a list of tensors as input -- one tensor per GPU device owned by the calling process.

An example utilizing the multi-GPU collective APIs is provided below:

.. code-block:: python

    import ray
    import ray.util.collective as collective

    import cupy as cp
    from cupy.cuda import Device


    @ray.remote(num_gpus=2)
    class Worker:
        def __init__(self):
            with Device(0):
                self.send1 = cp.ones((4, ), dtype=cp.float32)
            with Device(1):
                self.send2 = cp.ones((4, ), dtype=cp.float32) * 2
            with Device(0):
                self.recv1 = cp.ones((4, ), dtype=cp.float32)
            with Device(1):
                self.recv2 = cp.ones((4, ), dtype=cp.float32) * 2

        def setup(self, world_size, rank):
            self.rank = rank
            collective.init_collective_group(world_size, rank, "nccl", "177")
            return True

        def allreduce_call(self):
            collective.allreduce_multigpu([self.send1, self.send2], "177")
            return [self.send1, self.send2]

        def p2p_call(self):
            if self.rank == 0:
                collective.send_multigpu(self.send1 * 2, 1, 1, "177")
            else:
                collective.recv_multigpu(self.recv2, 0, 0, "177")
            return self.recv2


    # Note that the world size is 2 (two processes), while each process manages 2 GPUs.
    num_workers = 2
    workers = []
    init_rets = []
    for i in range(num_workers):
        w = Worker.remote()
        workers.append(w)
        init_rets.append(w.setup.remote(num_workers, i))
    a = ray.get(init_rets)
    results = ray.get([w.allreduce_call.remote() for w in workers])
    results = ray.get([w.p2p_call.remote() for w in workers])


More Resources
^^^^^^^^^^^^^^

The following links provide helpful resources on how to efficiently leverage the ``ray.util.collective`` library.

* More running examples under `ray.util.collective.examples <https://github.com/ray-project/ray/tree/master/python/ray/util/collective/examples>`_.
* `Scaling up the spaCy Named Entity Recognition (NER) pipeline <https://github.com/explosion/spacy-ray>`_ using the Ray collective library.
* `Implementing the AllReduce strategy <https://github.com/ray-project/distml/blob/master/distml/strategy/allreduce_strategy.py>`_ for data-parallel distributed ML training.

API Reference
^^^^^^^^^^^^^

.. automodule:: ray.util.collective.collective
   :members: