doc/source/ray-core/scheduling/placement-group.rst
.. _ray-placement-group-doc-ref:

Placement Groups
================
Placement groups allow users to atomically reserve groups of resources across multiple nodes, a concept commonly known as gang scheduling. After atomically reserving resources, you can use placement groups to schedule Ray tasks and actors packed together for locality (PACK), or spread apart (SPREAD). Placement groups are generally used for gang-scheduling actors, but also support tasks.
Here are some real-world use cases:
- :ref:`Ray Train <train-docs>` and :ref:`Ray Tune <tune-main>` use the placement group APIs to enable gang scheduling. In these settings, all resources for a trial must be available at the same time. Gang scheduling is a critical technique to enable all-or-nothing scheduling for deep learning training.

Key Concepts
------------

Bundles
~~~~~~~
A **bundle** is a collection of "resources." It could be a single resource, ``{"CPU": 1}``, or a group of resources, ``{"CPU": 1, "GPU": 4}``.
A bundle is a unit of reservation for placement groups. "Scheduling a bundle" means we find a node that fits the bundle and reserve the resources specified by the bundle.
A bundle must be able to fit on a single node in the Ray cluster. For example, if you have an 8 CPU node and a 1 CPU node and want to schedule a bundle that requires ``{"CPU": 9}``,
Ray can't schedule the bundle because no single node has 9 CPUs.
Placement Group
~~~~~~~~~~~~~~~
A placement group reserves the resources from the cluster. Tasks or actors must use the :ref:`PlacementGroupSchedulingStrategy <ray-placement-group-schedule-tasks-actors-ref>` to use the reserved resources.
{"CPU": 1} * 4 means you'd like to reserve 4 bundles, each with 1 CPU.placement strategies <pgroup-strategy> across nodes on the cluster.You can create a placement group using :func:ray.util.placement_group.
Placement groups take in a list of bundles and a :ref:`placement strategy <pgroup-strategy>`.
Bundles are specified by a list of dictionaries, e.g., ``[{"CPU": 1}, {"CPU": 1, "GPU": 1}]``.

- ``CPU`` corresponds to ``num_cpus`` as used in :func:`ray.remote <ray.remote>`.
- ``GPU`` corresponds to ``num_gpus`` as used in :func:`ray.remote <ray.remote>`.
- ``memory`` corresponds to ``memory`` as used in :func:`ray.remote <ray.remote>`.
- Other resources correspond to ``resources`` as used in :func:`ray.remote <ray.remote>` (e.g., ``ray.init(resources={"disk": 1})`` can have a bundle of ``{"disk": 1}``).

Placement group scheduling is asynchronous. The ``ray.util.placement_group`` call returns immediately.
.. tab-set::
.. tab-item:: Python
.. literalinclude:: ../doc_code/placement_group_example.py
:language: python
:start-after: __create_pg_start__
:end-before: __create_pg_end__
.. tab-item:: Java
.. code-block:: java
// Initialize Ray.
Ray.init();
// Construct a list of bundles.
Map<String, Double> bundle = ImmutableMap.of("CPU", 1.0);
List<Map<String, Double>> bundles = ImmutableList.of(bundle);
// Make a creation option with bundles and strategy.
PlacementGroupCreationOptions options =
new PlacementGroupCreationOptions.Builder()
.setBundles(bundles)
.setStrategy(PlacementStrategy.STRICT_SPREAD)
.build();
PlacementGroup pg = PlacementGroups.createPlacementGroup(options);
.. tab-item:: C++
.. code-block:: c++
// Initialize Ray.
ray::Init();
// Construct a list of bundles.
std::vector<std::unordered_map<std::string, double>> bundles{{{"CPU", 1.0}}};
// Make a creation option with bundles and strategy.
ray::internal::PlacementGroupCreationOptions options{
false, "my_pg", bundles, ray::internal::PlacementStrategy::PACK};
ray::PlacementGroup pg = ray::CreatePlacementGroup(options);
You can block your program until the placement group is ready using one of two APIs:
- :meth:`ready <ray.util.placement_group.PlacementGroup.ready>`, which is compatible with ``ray.get``
- :meth:`wait <ray.util.placement_group.PlacementGroup.wait>`, which blocks the program until the placement group is ready

.. tab-set::
.. tab-item:: Python
.. literalinclude:: ../doc_code/placement_group_example.py
:language: python
:start-after: __ready_pg_start__
:end-before: __ready_pg_end__
.. tab-item:: Java
.. code-block:: java
// Wait for the placement group to be ready within the specified time (unit: seconds).
boolean ready = pg.wait(60);
Assert.assertTrue(ready);
// You can look at placement group states using this API.
List<PlacementGroup> allPlacementGroup = PlacementGroups.getAllPlacementGroups();
for (PlacementGroup group: allPlacementGroup) {
System.out.println(group);
}
.. tab-item:: C++
.. code-block:: c++
// Wait for the placement group to be ready within the specified time (unit: seconds).
bool ready = pg.Wait(60);
assert(ready);
// You can look at placement group states using this API.
std::vector<ray::PlacementGroup> all_placement_group = ray::GetAllPlacementGroups();
for (const ray::PlacementGroup &group : all_placement_group) {
std::cout << group.GetName() << std::endl;
}
Let's verify the placement group is successfully created.
.. code-block:: bash
pip install "ray[default]"ray list placement-groups
.. code-block:: bash
Total: 1
PLACEMENT_GROUP_ID NAME CREATOR_JOB_ID STATE
0 3cd6174711f47c14132155039c0501000000 01000000 CREATED
The placement group is successfully created. Out of the ``{"CPU": 2, "GPU": 2}`` cluster resources, the placement group reserves ``{"CPU": 1, "GPU": 1}``.
The reserved resources can only be used when you schedule tasks or actors with a placement group.
The diagram below demonstrates the "1 CPU and 1 GPU" bundle that the placement group reserved.
.. image:: ../images/pg_image_1.png
   :align: center
Ray creates placement groups atomically. If a bundle can't fit in any of the current nodes, Ray reserves no resources for the placement group.
To illustrate this, you can create another placement group with these two bundles {"CPU":1}, {"GPU": 2}.
.. tab-set::
.. tab-item:: Python
.. literalinclude:: ../doc_code/placement_group_example.py
:language: python
:start-after: __create_pg_failed_start__
:end-before: __create_pg_failed_end__
You can verify the new placement group is pending creation.
.. code-block:: bash
pip install "ray[default]"ray list placement-groups
.. code-block:: bash
Total: 2
PLACEMENT_GROUP_ID NAME CREATOR_JOB_ID STATE
0 3cd6174711f47c14132155039c0501000000 01000000 CREATED
1 e1b043bebc751c3081bddc24834d01000000 01000000 PENDING <---- the new placement group.
You can also verify that the ``{"CPU": 1}`` and ``{"GPU": 2}`` bundles can't be allocated, using the ``ray status`` CLI command.
.. code-block:: bash
ray status
.. code-block:: bash
Usage:
 0.0/2.0 CPU (0.0 used of 1.0 reserved in placement groups)
 0.0/2.0 GPU (0.0 used of 1.0 reserved in placement groups)
 0B/3.46GiB memory
 0B/1.73GiB object_store_memory

Demands:
 {'CPU': 1.0} * 1, {'GPU': 2.0} * 1 (PACK): 1+ pending placement groups <--- 1 placement group is pending creation.
The current cluster has {"CPU": 2, "GPU": 2}. We already created a {"CPU": 1, "GPU": 1} bundle, so the cluster only has 1 CPU and 1 GPU left.
If you try to schedule a placement group with these 2 bundles {"CPU": 1}, {"GPU": 2}, Ray doesn't create the placement group and doesn't reserve any resources,
including the {"CPU": 1} bundle.
.. image:: ../images/pg_image_2.png
   :align: center
When the placement group cannot be scheduled in any way, it is called "infeasible".
Imagine you schedule a ``{"CPU": 4}`` bundle, but you only have a single node with 2 CPUs. There's no way to create this bundle in your cluster.
The Ray Autoscaler is aware of placement groups, and auto-scales the cluster to ensure pending groups can be placed as needed.
If the Autoscaler can't provide resources to schedule a placement group, Ray doesn't print a warning about infeasible groups or the tasks and actors that use them.
You can observe the scheduling state of the placement group from the :ref:`dashboard or state APIs <ray-placement-group-observability-ref>`.
.. note:: When a placement group with GPUs is reserved successfully, the bundles are not necessarily ordered by GPU physical rank. That is, adjacent bundles don't necessarily map to adjacent physical GPUs.
.. _ray-placement-group-schedule-tasks-actors-ref:

Schedule Tasks and Actors to Placement Groups (Use Reserved Resources)
-----------------------------------------------------------------------
In the previous section, we created a placement group that reserved ``{"CPU": 1, "GPU": 1}`` from a 2 CPU and 2 GPU node.
Now let's schedule an actor to the placement group.
You can schedule actors or tasks to a placement group using
:class:`options(scheduling_strategy=PlacementGroupSchedulingStrategy(...)) <ray.util.scheduling_strategies.PlacementGroupSchedulingStrategy>`.
.. tab-set::
.. tab-item:: Python
.. literalinclude:: ../doc_code/placement_group_example.py
:language: python
:start-after: __schedule_pg_start__
:end-before: __schedule_pg_end__
.. tab-item:: Java
.. code-block:: java
public static class Counter {
private int value;
public Counter(int initValue) {
this.value = initValue;
}
public int getValue() {
return value;
}
public static String ping() {
return "pong";
}
}
// Create GPU actors on a gpu bundle.
for (int index = 0; index < 1; index++) {
Ray.actor(Counter::new, 1)
.setPlacementGroup(pg, 0)
.remote();
}
.. tab-item:: C++
.. code-block:: c++
class Counter {
public:
Counter(int init_value) : value(init_value){}
int GetValue() {return value;}
std::string Ping() {
return "pong";
}
private:
int value;
};
// Factory function of Counter class. It must accept the same argument that
// Remote() passes to it (the initial value).
static Counter *CreateCounter(int init_value) {
  return new Counter(init_value);
};
RAY_REMOTE(&Counter::Ping, &Counter::GetValue, CreateCounter);
// Create GPU actors on a gpu bundle.
for (int index = 0; index < 1; index++) {
ray::Actor(CreateCounter)
.SetPlacementGroup(pg, 0)
.Remote(1);
}
.. note::
By default, Ray actors require 1 logical CPU at schedule time, but after being scheduled, they do not acquire any CPU resources. In other words, by default, actors cannot get scheduled on a zero-cpu node, but an infinite number of them can run on any non-zero cpu node. Thus, when scheduling an actor with the default resource requirements and a placement group, the placement group has to be created with a bundle containing at least 1 CPU (since the actor requires 1 CPU for scheduling). However, after the actor is created, it doesn't consume any placement group resources.
To avoid any surprises, always specify resource requirements explicitly for actors. If resources are specified explicitly, they are required both at schedule time and at execution time (see the sketch below).
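To make this concrete, here is a minimal, self-contained sketch of the pattern described above. It is not the bundled ``placement_group_example.py``; the ``Counter`` actor name and the 1 CPU / 1 GPU bundle are illustrative and mirror the running example.

.. code-block:: python

    import ray
    from ray.util.placement_group import placement_group
    from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

    ray.init(num_cpus=2, num_gpus=2)

    # Reserve a single bundle with 1 CPU and 1 GPU.
    pg = placement_group([{"CPU": 1, "GPU": 1}])
    ray.get(pg.ready())

    # Request 1 CPU explicitly so the requirement applies both at schedule
    # time and at execution time, instead of relying on the implicit default.
    @ray.remote(num_cpus=1)
    class Counter:
        def __init__(self):
            self.value = 0

        def increment(self):
            self.value += 1
            return self.value

    # Schedule the actor onto the resources reserved by the placement group.
    counter = Counter.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
    ).remote()
    print(ray.get(counter.increment.remote()))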
The actor is scheduled now! One bundle can be used by multiple tasks and actors (i.e., the bundle to task (or actor) is a one-to-many relationship).
In this case, since the actor uses 1 CPU, 1 GPU remains from the bundle.
You can verify this from the CLI command ray status. You can see the 1 CPU is reserved by the placement group, and 1.0 is used (by the actor we created).
.. code-block:: bash
ray status
.. code-block:: bash
Usage:
 1.0/2.0 CPU (1.0 used of 1.0 reserved in placement groups) <---
 0.0/2.0 GPU (0.0 used of 1.0 reserved in placement groups)
 0B/4.29GiB memory
 0B/2.00GiB object_store_memory

Demands:
 (no resource demands)
You can also verify the actor is created using ray list actors.
.. code-block:: bash
pip install "ray[default]"ray list actors --detail
Since 1 GPU remains, let's create a new actor that requires 1 GPU.
This time, we also specify the placement_group_bundle_index. Each bundle is given an "index" within the placement group.
For example, a placement group of 2 bundles [{"CPU": 1}, {"GPU": 1}] has index 0 bundle {"CPU": 1}
and index 1 bundle {"GPU": 1}. Since we only have 1 bundle, we only have index 0. If you don't specify a bundle, the actor (or task)
is scheduled on a random bundle that has unallocated reserved resources.
.. tab-set::
.. tab-item:: Python
.. literalinclude:: ../doc_code/placement_group_example.py
:language: python
:start-after: __schedule_pg_3_start__
:end-before: __schedule_pg_3_end__
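For readers of the source file, the same step written out as a brief sketch, continuing from the placement group ``pg`` and imports in the sketch above; the ``GPUActor`` name is illustrative:

.. code-block:: python

    # Illustrative GPU actor that consumes the remaining 1 GPU of the bundle.
    @ray.remote(num_gpus=1)
    class GPUActor:
        def ping(self):
            return "pong"

    # Pin the actor to bundle index 0, the only bundle in this placement group.
    gpu_actor = GPUActor.options(
        scheduling_strategy=PlacementGroupSchedulingStrategy(
            placement_group=pg,
            placement_group_bundle_index=0,
        )
    ).remote()
    print(ray.get(gpu_actor.ping.remote()))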
We succeeded in scheduling the GPU actor! The image below shows the 2 actors scheduled into the placement group.
.. image:: ../images/pg_image_3.png
   :align: center
You can also verify that the reserved resources are all used, with the ray status command.
.. code-block:: bash
ray status
.. code-block:: bash
Usage:
 1.0/2.0 CPU (1.0 used of 1.0 reserved in placement groups)
 1.0/2.0 GPU (1.0 used of 1.0 reserved in placement groups) <----
 0B/4.29GiB memory
 0B/2.00GiB object_store_memory
.. _pgroup-strategy:

Placement Strategy
------------------
One of the features the placement group provides is placement constraints among bundles.
For example, you might want to pack your bundles onto the same
node or spread them out across multiple nodes as much as possible. You can specify the strategy via the ``strategy`` argument.
This way, you can make sure your actors and tasks are scheduled with certain placement constraints.
The example below creates a placement group with 2 bundles and a ``PACK`` strategy;
both bundles have to be placed on the same node. Note that it is a soft policy: if the bundles cannot be packed
into a single node, they are spread to other nodes. If you'd like to avoid that, you can instead use the ``STRICT_PACK``
strategy, which fails to create the placement group if the placement requirement cannot be satisfied.
.. literalinclude:: ../doc_code/placement_group_example.py
   :language: python
   :start-after: __strategy_pg_start__
   :end-before: __strategy_pg_end__
The image below demonstrates the PACK policy. Three of the {"CPU": 2} bundles are located in the same node.
.. image:: ../images/pg_image_4.png
   :align: center
The image below demonstrates the SPREAD policy. Each of the three ``{"CPU": 2}`` bundles is located on a different node.
.. image:: ../images/pg_image_5.png
   :align: center
Ray supports four placement group strategies (see the usage sketch after the list below). The default scheduling strategy is ``PACK``.
STRICT_PACK
All bundles must be placed into a single node on the cluster. Use this strategy when you want to maximize the locality.
PACK
All provided bundles are packed onto a single node on a best-effort basis. If strict packing is not feasible (i.e., some bundles do not fit on the node), bundles can be placed onto other nodes.
STRICT_SPREAD
Each bundle must be scheduled in a separate node.
SPREAD
Each bundle is spread onto separate nodes on a best-effort basis. If strict spreading is not feasible, bundles can be placed on overlapping nodes.
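As an illustrative sketch (not part of the original example code), this shows how the ``strategy`` argument might be passed; the bundle sizes and CPU counts are arbitrary:

.. code-block:: python

    import ray
    from ray.util.placement_group import placement_group

    ray.init(num_cpus=8)

    # Best-effort spread: Ray tries to place each bundle on a different node,
    # but falls back to overlapping nodes when that's not possible.
    spread_pg = placement_group([{"CPU": 2}] * 3, strategy="SPREAD")
    ray.get(spread_pg.ready())

    # Strict packing: the placement group stays pending unless all of its
    # bundles fit on a single node.
    strict_pack_pg = placement_group([{"CPU": 1}, {"CPU": 1}], strategy="STRICT_PACK")
    ray.get(strict_pack_pg.ready())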
Remove Placement Groups (Free Reserved Resources)
-------------------------------------------------

By default, a placement group's lifetime is scoped to the driver that creates placement groups
(unless you make it a :ref:`detached placement group <placement-group-detached>`). When the placement group is created from
a :ref:`detached actor <actor-lifetimes>`, the lifetime is scoped to the detached actor.
In Ray, the driver is the Python script that calls ray.init.
Reserved resources (bundles) from the placement group are automatically freed when the driver or detached actor
that created the placement group exits. To free the reserved resources manually, remove the placement
group using the :func:`remove_placement_group <ray.util.remove_placement_group>` API (which is also an asynchronous API).
.. note::
When you remove the placement group, actors or tasks that still use the reserved resources are forcefully killed.
.. tab-set::
.. tab-item:: Python
.. literalinclude:: ../doc_code/placement_group_example.py
:language: python
:start-after: __remove_pg_start__
:end-before: __remove_pg_end__
.. tab-item:: Java
.. code-block:: java
PlacementGroups.removePlacementGroup(placementGroup.getId());
PlacementGroup removedPlacementGroup = PlacementGroups.getPlacementGroup(placementGroup.getId());
Assert.assertEquals(removedPlacementGroup.getState(), PlacementGroupState.REMOVED);
.. tab-item:: C++
.. code-block:: c++
ray::RemovePlacementGroup(placement_group.GetID());
ray::PlacementGroup removed_placement_group = ray::GetPlacementGroup(placement_group.GetID());
assert(removed_placement_group.GetState() == ray::PlacementGroupState::REMOVED);
.. _ray-placement-group-observability-ref:

Observe and Debug Placement Groups
----------------------------------
Ray provides several useful tools to inspect the placement group states and resource usage.
.. tab-set::
.. tab-item:: ray status (CLI)
The CLI command ``ray status`` provides the autoscaling status of the cluster.
It provides the "resource demands" from unscheduled placement groups as well as the resource reservation status.
.. code-block:: bash
Resources
---------------------------------------------------------------
Usage:
1.0/2.0 CPU (1.0 used of 1.0 reserved in placement groups)
0.0/2.0 GPU (0.0 used of 1.0 reserved in placement groups)
0B/4.29GiB memory
0B/2.00GiB object_store_memory
.. tab-item:: Dashboard
The :ref:`dashboard job view <dash-jobs-view>` provides the placement group table that displays the scheduling state and metadata of the placement group.
.. note::
The Ray dashboard is only available when you install Ray with ``pip install "ray[default]"``.
.. tab-item:: Ray State API
:ref:`Ray state API <state-api-overview-ref>` is a CLI tool for inspecting the state of Ray resources (tasks, actors, placement groups, etc.).
``ray list placement-groups`` provides the metadata and the scheduling state of the placement group.
``ray list placement-groups --detail`` provides statistics and scheduling state in greater detail.
.. note::
The State API is only available when you install Ray with ``pip install "ray[default]"``.
Inspect Placement Group Scheduling State
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

With the above tools, you can see the state of the placement group. The definitions of the states are specified in the following files:
- `High level state <https://github.com/ray-project/ray/blob/03a9d2166988b16b7cbf51dac0e6e586455b28d8/src/ray/protobuf/gcs.proto#L579>`_
- `Details <https://github.com/ray-project/ray/blob/03a9d2166988b16b7cbf51dac0e6e586455b28d8/src/ray/protobuf/gcs.proto#L524>`_
.. image:: ../images/pg_image_6.png
:align: center
[Advanced] Child Tasks and Actors
---------------------------------
By default, child actors and tasks don't share the same placement group that the parent uses.
To automatically schedule child actors or tasks to the same placement group,
set ``placement_group_capture_child_tasks`` to True.
.. tab-set::
.. tab-item:: Python
.. literalinclude:: ../doc_code/placement_group_capture_child_tasks_example.py
:language: python
:start-after: __child_capture_pg_start__
:end-before: __child_capture_pg_end__
.. tab-item:: Java
It's not implemented for Java APIs yet.
When ``placement_group_capture_child_tasks`` is True, but you don't want to schedule
child tasks and actors to the same placement group, specify ``PlacementGroupSchedulingStrategy(placement_group=None)``.
.. literalinclude:: ../doc_code/placement_group_capture_child_tasks_example.py
:language: python
:start-after: __child_capture_disable_pg_start__
:end-before: __child_capture_disable_pg_end__
[Advanced] Named Placement Group
--------------------------------
Within a :ref:`namespace <namespaces-guide>`, you can *name* a placement group.
You can use the name of a placement group to retrieve the placement group from any job
in the Ray cluster, as long as the job is within the same namespace.
This is useful if you can't directly pass the placement group handle to
the actor or task that needs it, or if you are trying to
access a placement group launched by another driver.
The placement group is destroyed when the original creation job completes if its
lifetime isn't ``detached``. You can avoid this by using a :ref:`detached placement group <placement-group-detached>`.
Note that this feature requires that you specify a
:ref:`namespace <namespaces-guide>` associated with it, or else you can't retrieve the
placement group across jobs.
.. tab-set::
.. tab-item:: Python
.. literalinclude:: ../doc_code/placement_group_example.py
:language: python
:start-after: __get_pg_start__
:end-before: __get_pg_end__
.. tab-item:: Java
.. code-block:: java
// Create a placement group with a unique name.
Map<String, Double> bundle = ImmutableMap.of("CPU", 1.0);
List<Map<String, Double>> bundles = ImmutableList.of(bundle);
PlacementGroupCreationOptions options =
new PlacementGroupCreationOptions.Builder()
.setBundles(bundles)
.setStrategy(PlacementStrategy.STRICT_SPREAD)
.setName("global_name")
.build();
PlacementGroup pg = PlacementGroups.createPlacementGroup(options);
pg.wait(60);
...
// Retrieve the placement group later somewhere.
PlacementGroup group = PlacementGroups.getPlacementGroup("global_name");
Assert.assertNotNull(group);
.. tab-item:: C++
.. code-block:: c++
// Create a placement group with a globally unique name.
std::vector<std::unordered_map<std::string, double>> bundles{{{"CPU", 1.0}}};
ray::PlacementGroupCreationOptions options{
true/*global*/, "global_name", bundles, ray::PlacementStrategy::STRICT_SPREAD};
ray::PlacementGroup pg = ray::CreatePlacementGroup(options);
pg.Wait(60);
...
// Retrieve the placement group later somewhere.
ray::PlacementGroup group = ray::GetGlobalPlacementGroup("global_name");
assert(!group.Empty());
C++ also supports non-global named placement groups, which means that the placement group name is only valid within the job and cannot be accessed from another job.
.. code-block:: c++
// Create a placement group with a job-scope-unique name.
std::vector<std::unordered_map<std::string, double>> bundles{{{"CPU", 1.0}}};
ray::PlacementGroupCreationOptions options{
false/*non-global*/, "non_global_name", bundles, ray::PlacementStrategy::STRICT_SPREAD};
ray::PlacementGroup pg = ray::CreatePlacementGroup(options);
pg.Wait(60);
...
// Retrieve the placement group later somewhere in the same job.
ray::PlacementGroup group = ray::GetPlacementGroup("non_global_name");
assert(!group.Empty());
.. _placement-group-detached:
[Advanced] Detached Placement Group
-----------------------------------
By default, the lifetimes of placement groups belong to the driver and actor.
- If the placement group is created from a driver, it is destroyed when the driver is terminated.
- If it is created from a detached actor, it is killed when the detached actor is killed.
To keep the placement group alive regardless of its job or detached actor, specify
``lifetime="detached"``. For example:
.. tab-set::
.. tab-item:: Python
.. literalinclude:: ../doc_code/placement_group_example.py
:language: python
:start-after: __detached_pg_start__
:end-before: __detached_pg_end__
.. tab-item:: Java
The lifetime argument isn't implemented for Java APIs yet.
Let's terminate the current script and start a new Python script. Call ``ray list placement-groups``, and you can see the placement group is not removed.
Note that Ray decouples the lifetime option and the name option. If you only specify
the name without specifying ``lifetime="detached"``, then you can only retrieve the placement group
while the driver where you created the placement group is still running.
It's recommended to always specify the name when creating the detached placement group. If you don't,
there is no way to retrieve the placement group from another process, and there is no way
to kill it once you exit the driver script that created the placement group.
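For example, here is a minimal sketch of retrieving and cleaning up a detached, named placement group from a different driver. The name ``"pg_name"`` and namespace ``"pg_namespace"`` are illustrative and must match whatever the creating driver used:

.. code-block:: python

    import ray
    from ray.util.placement_group import get_placement_group, remove_placement_group

    # Connect to the same cluster and namespace that the creating driver used.
    ray.init(address="auto", namespace="pg_namespace")

    # Look up the detached placement group by its name ...
    pg = get_placement_group("pg_name")

    # ... and free its reserved resources once it's no longer needed.
    remove_placement_group(pg)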
.. _ray-placement-group-ft-ref:
[Advanced] Fault Tolerance
--------------------------
Rescheduling Bundles on a Dead Node
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If nodes that contain some bundles of a placement group die, Ray tries to reschedule the lost bundles on different nodes.
This means that the initial creation of a placement group is "atomic," but after the initial creation,
there could be partial placement groups. Actors or tasks running on bundles on the remaining live nodes continue to run.
Note that rescheduling lost bundles has higher scheduling priority than other placement group scheduling.
Provide Resources for Partially Lost Bundles
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If there aren't enough resources to schedule the partially lost bundles, the placement group waits, assuming the Ray Autoscaler starts a new node to satisfy the resource requirements. If the autoscaler can't provide additional resources or if you're not using the autoscaler, the placement group remains in the partially created state indefinitely.
Fault Tolerance of Actors and Tasks that Use the Bundle
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Ray reschedules Actors and tasks that use the bundle (reserved resources) based on their :ref:`fault tolerant policy <fault-tolerance>` once Ray recovers the bundle.
API Reference
-------------
:ref:`Placement Group API reference <ray-placement-group-ref>`