Placement Interface#

This section introduces the GPU and node placement strategies in RLinf. Whether in collocated mode, disaggregated mode, or hybrid mode, ComponentPlacement is the user-facing interface for generating the placements of different component workers (e.g., actor, env, rollout, inference), while placement strategies are the underlying mechanisms for obtaining precise allocation of each node and each GPU resource. The generated placement metadata is later used for remote launching with Ray.

Component Placement#

The ComponentPlacement interface is responsible for parsing the cluster.component_placement field in the configuration file and generating precise placements for different component workers.

Notably, ComponentPlacement also supports configuration of heterogeneous clusters through the node_group field in cluster.node_groups.

The detailed explanation of the syntax can be found in the docs below.

class rlinf.utils.placement.ComponentPlacement#

Bases: object

Base component placement for parsing cluster.component_placement config.

The component placement config is defined as either:

group_name1,group_name2,...: resource_ranks1:process_ranks1, resource_ranks2:process_ranks2,...

or:

group_name1,group_name2,...:
    node_group: <node_group_label>
    placement: "resource_ranks1":"process_ranks1", "resource_ranks2":"process_ranks2",...

A simple example is:

cluster:
    num_nodes: 1
    actor,inference: 0-7

which means both the actor and inference groups’ process 0-7 evenly occupy accelerator 0 to 7.

A more complex example is:

cluster:
num_nodes: 2
component_placement:
    actor:
        node_group: a800
        placement: 0-8
    rollout:
        node_group: 4090
        placement: 0-8
    env:
        node_group: robot # Assuming robot hardware type is defined in the node group config
        placement: 0-3:0-7
    agent:
        node_group: node
        placement: 0-1:0-200,2-3:201-511

which means:

  • The actor group occupies accelerators 0-8 on node group β€˜a800’.

  • The rollout group occupies accelerators 0-8 on node group β€˜4090’.

  • The env group occupies robot hardware 0-3 on node group β€˜robot’, with each robot hardware shared by 2 processes.

  • The agent group occupies nodes 0-1 for process 0-200, and nodes 2-3 for process 201-511.

The concrete specifications of the config format are as follows:

  • resource_ranks is the ranks of the resources (e.g., GPUs, robots, or nodes) to use for the component(s). resource ranks are by default the accelerator ranks (within the node group if node_group is given, counted from 0) if no hardware is specified in the config. If the nodes do not have accelerators, resource ranks are the node ranks. If a hardware is specified in the node group config, the resource ranks are the hardware ranks within the label node group, e.g., for nodes with robotic systems.

    The format of resource_ranks is an integer range a-b, which means all ranks from a to b including a and b. For example, 0-3 means rank 0, 1, 2, 3. Alternatively, β€œall” can be used to specify all resources.

  • process_ranks is the ranks of the processes of the component(s), following the same format of resource_ranks. The processes will be evenly assigned to the specified resource ranks. For example, 0-3:0-7 means process 0-7 will be evenly assigned to resource ranks 0-3, with 2 processes sharing 1 resource. If the number of processes is smaller than the number of resources, it means one process occupy multiple resources. If process_ranks is not specified, each process will be assigned to one resource rank in order. For example, 0-4 means process 0-4 will be assigned to resource ranks 0-4 respectively.

    Fancier syntax mixing the two formats is also supported, e.g., 0-1:0-3,3-5,7-10:7-14, which means process 0-3 will be evenly assigned to resource ranks 0-1, process 4-6 will be assigned to resource ranks 3-5 (implicitly inferred by the scheduler) respectively, and process 7-14 will be evenly assigned to resource ranks 7-10. Note that even if the process ranks are not specified, they are assumed to be continuous from 0 to N-1, where N is the total number of processes. Failure to follow this rule will raise an assertion error.

  • For the second format, the node_group label is the label defined in cluster.node_groups.label, which is optional. It can be either a single string (single node group) or a comma-separated string/list (multiple node groups, e.g., node_group: "a800,4090"). If not specified, all nodes in the cluster are used. A node label is reserved by the scheduler for allocating on node ranks only (no accelerators or other hardware).

    When multiple node groups are specified, hardware ranks span across all groups in order, starting from 0. For example, if group β€œa800” has 8 GPUs (ranks 0-7) and group β€œ4090” has 8 GPUs (ranks 0-7 within that group), then in the composite placement, hardware ranks 0-7 belong to β€œa800” and ranks 8-15 belong to β€œ4090”. Note that hardware ranks within a single process must all be of the same hardware type and on the same node.

__init__(config, cluster)#

Parsing component placement configuration.

Parameters:
  • config (DictConfig) – The configuration dictionary for the component placement.

  • cluster (Cluster) – The cluster to use for placement.

property placement_mode#

Get the placement mode for the component.

Returns:

The placement mode for the component.

Return type:

PlacementMode

property components: list[str]#

Get the list of components defined in the placement.

Returns:

The list of component names.

Return type:

list[str]

get_hardware_ranks(component_name)#

Get the hardware count for a specific component.

Parameters:

component_name (str) – The name of the component.

Returns:

The hardware ranks for the specified component.

Return type:

list[int]

get_world_size(component_name)#

Get the world size for a specific component.

Parameters:

component_name (str) – The name of the component.

Returns:

The world size for the specified component.

Return type:

int

get_strategy(component_name)#

Get the placement strategy for a component based on the configuration.

Parameters:

component_name (str) – The name of the component to retrieve the placement strategy for.

Returns:

The placement strategy for the specified component.

Return type:

PackedPlacementStrategy

In the embodied intelligence and MATH reasoning settings, HybridComponentPlacement and ModelParallelComponentPlacement are used to generate the worker placements, respectively. HybridComponentPlacement is a direct inheritance of ComponentPlacement, while ModelParallelComponentPlacement extends the placement logic to support model parallelism of inference engines across multiple GPUs.

HybridComponentPlacement#

class rlinf.utils.placement.HybridComponentPlacement#

Bases: ComponentPlacement

Hybrid component placement that allows components to run on any sets of GPUs.

__init__(config, cluster)#

Initialize HybridComponentPlacement

Parameters:
  • config (DictConfig) – The configuration dictionary.

  • cluster (Cluster)

ModelParallelComponentPlacement#

class rlinf.utils.placement.ModelParallelComponentPlacement#

Bases: ComponentPlacement

Component placement for model-parallel components.

The components must be actor, rollout, and optionally inference, whose GPUs must be continuous.

This placement supports both collocated and disaggregated modes.

In the collocated mode, all components share the same set of GPUs. In particular, the rollout group is specially placed in a strided manner to enable fast cudaIPC-based weight sync. In the disaggregated mode, each component has its own dedicated set of GPUs.

In the collocated mode, only actor and rollout exist. While in the disaggregated mode, actor, rollout, and inference should all exist.

__init__(config, cluster)#

Initialize ModelParallelComponentPlacement

Parameters:
  • config (DictConfig) – The configuration dictionary for the component placement.

  • cluster (Cluster)

Placement Strategies#

Placement strategies are the underlying mechanisms for obtaining precise allocation of each node and each GPU resource used by component placement. If you wish to customize placements, you can refer to the following built-in strategies, namely FlexiblePlacementStrategy, PackedPlacementStrategy and NodePlacementStrategy. Specifically, FlexiblePlacementStrategy and PackedPlacementStrategy are used for placing worker processes on top of accelerators/GPUs, while NodePlacementStrategy is used for placing worker processes on specific nodes without considering the underlying accelerator resources and thus useful for CPU-only workers.

FlexiblePlacementStrategy#

class rlinf.scheduler.placement.flexible.FlexiblePlacementStrategy#

Bases: PlacementStrategy

This placement strategy allows processes to be placed on any hardware (accelerators, robots, etc.) by specifying a list of global hardware ranks for each process.

Note

The global hardware rank means the hardware rank across the entire cluster or a node group if node_group_label is given. For example, if a cluster has 2 nodes, each with 8 GPUs, then the global GPU ranks are 0~7 for node 0 and 8~15 for node 1.

The following example shows how to use the placement strategy.

Example:

>>> from rlinf.scheduler import (
...     Cluster,
...     Worker,
...     FlexiblePlacementStrategy,
... )
>>>
>>> class MyWorker(Worker):
...     def __init__(self, msg: str = "Hello, World!"):
...         super().__init__()
...         self._msg = msg
...
...     def hello(self):
...         return self._rank
...
...     def available_gpus(self):
...         import torch
...         available_gpus = torch.cuda.device_count()
...         gpu_ids = [
...             torch.cuda.get_device_properties(i) for i in range(available_gpus)
...         ]
...         return available_gpus
>>>
>>> cluster = Cluster(num_nodes=1)
>>>
>>> # `FlexiblePlacementStrategy` allows you to specify the *global* accelerator/GPU ranks for each process.
>>> placement = FlexiblePlacementStrategy([[0, 1], [2], [3]])
>>> my_worker = MyWorker.create_group().launch(
...     cluster=cluster, name="flexible_placement", placement_strategy=placement
... )
>>> # This will run 3 processes on the first node's GPU 0, 1, 2, 3, where the first process uses GPUs 0 and 1, the second process uses GPU 2, and the third process uses GPU 3.
>>> my_worker.available_gpus().wait()
[2, 1, 1]
__init__(hardware_ranks_list, node_group_label=None)#

Initialize the FlexiblePlacementStrategy.

Note

The hardware ranks in each inner list must be on the same node and must be unique.

Note

The hardware ranks will be sorted in ascending order both within each process and across processes (based on the first rank).

Parameters:
  • hardware_ranks_list (List[List[int]]) – A list of lists, where each inner list contains the hardware (e.g., GPU) ranks to allocate for a specific process.

  • node_group_label (Optional[str | Sequence[str]]) – The label or list of labels of the node groups to which the accelerator ranks belong. If specified, the accelerator ranks mean local ranks within the selected node groups. Otherwise, accelerator ranks are global ranks.

get_placement(cluster, isolate_accelerator=True)#

Generate a list of placements based on the flexible strategy.

Parameters:
  • cluster (Cluster) – The cluster object containing information about the nodes and accelerators.

  • isolate_accelerator (bool) – Whether accelerators not allocated to a worker will not be visible to the worker (by settings envs like CUDA_VISIBLE_DEVICES). Defaults to True.

Returns:

A list of Placement objects representing the placements of processes on accelerators.

Return type:

List[Placement]

PackedPlacementStrategy#

class rlinf.scheduler.placement.packed.PackedPlacementStrategy#

Bases: PlacementStrategy

Placement strategy that allows processes to be placed on hardware (e.g., GPUs) in a close-packed manner. One process can have one or multiple hardware.

The following example shows how to use the placement strategy.

Example:

>>> from rlinf.scheduler import (
...     Cluster,
...     Worker,
...     PackedPlacementStrategy,
... )
>>>
>>> class MyWorker(Worker):
...     def __init__(self, msg: str = "Hello, World!"):
...         super().__init__()
...         self._msg = msg
...
...     def hello(self):
...         return self._rank
...
...     def available_gpus(self):
...         import torch
...         available_gpus = torch.cuda.device_count()
...         gpu_ids = [
...             torch.cuda.get_device_properties(i) for i in range(available_gpus)
...         ]
...         return available_gpus
>>>
>>> cluster = Cluster(num_nodes=1)
>>>
>>> # `PackedPlacementStrategy` will fill up nodes with workers before moving to the next node.
>>> placement = PackedPlacementStrategy(start_hardware_rank=0, end_hardware_rank=3)
>>> my_worker = MyWorker.create_group().launch(
...     cluster=cluster, name="packed_placement", placement_strategy=placement
... )
>>> my_worker.available_gpus().wait() # This will run 4 processes on the first node's GPU 0, 1, 2, 3, each using 1 GPU.
[1, 1, 1, 1]
>>>
>>>
>>> # `num_hardware_per_process` allows for one process to hold multiple accelerators/GPUs.
>>> # For example, if you want a process to hold 4 GPUs, you can set the `num_hardware_per_process` to 4.
>>> placement_chunked = PackedPlacementStrategy(
...     start_hardware_rank=0, end_hardware_rank=3, num_hardware_per_process=2
... )
>>> my_worker_chunked = MyWorker.create_group().launch(
...     cluster=cluster,
...     name="chunked_placement",
...     placement_strategy=placement_chunked,
... )
>>> my_worker_chunked.available_gpus().wait()  # This will run 2 processes, each using 2 GPUs (0-1 and 2-3) of the first node.
[2, 2]
>>>
>>>
>>> # `stride` allows for strided placement of workers across GPUs.
>>> # For example, if you want to place workers on every second GPU, you can set the stride to 2.
>>> placement_strided = PackedPlacementStrategy(
...     start_hardware_rank=0, end_hardware_rank=3, stride=2, num_hardware_per_process=2
... )
>>> my_worker_strided = MyWorker.create_group().launch(
...     cluster=cluster,
...     name="strided_placement",
...     placement_strategy=placement_strided,
... )
>>> # This will run 2 processes, each using 2 GPUs (0,2 1,3) of the first node.
>>> my_worker_strided.available_gpus().wait()
[2, 2]
__init__(start_hardware_rank, end_hardware_rank, num_hardware_per_process=1, stride=1, node_group=None)#

Initialize the PackedPlacementStrategy.

Parameters:
  • start_hardware_rank (int) – The global rank of the starting hardware in the cluster or node group for the placement.

  • end_hardware_rank (int) – The global rank of the end hardware in the cluster or node group for the placement.

  • num_hardware_per_process (int) – The number of hardware resources to allocate for each process.

  • stride (int) – The stride to use when allocating hardware. This allows one process to have multiple hardware in a strided manner, e.g., GPU 0, 2, 4 (stride 2) or GPU 0, 3, 6 (stride 3).

  • node_group (Optional[str | Sequence[str]]) – The label(s) of the node group(s) to use for placement. Provide a list to span multiple node groups. If None, the entire cluster is considered.

get_placement(cluster, isolate_accelerator=True)#

Generate a list of placements based on the packed strategy.

Parameters:
  • cluster (Cluster) – The cluster object containing information about the nodes and accelerators.

  • isolate_accelerator (bool) – Whether accelerators not allocated to a worker will not be visible to the worker (by settings envs like CUDA_VISIBLE_DEVICES). Defaults to True.

Returns:

A list of Placement objects representing the placements of processes on accelerators.

Return type:

list[Placement]

NodePlacementStrategy#

class rlinf.scheduler.placement.node.NodePlacementStrategy#

Bases: PlacementStrategy

This placement strategy places processes on specific nodes (using global node rank) without limiting accelerators. This is useful for CPU-only workers who do not rely on accelerators.

Note

The global node rank means the node rank across the entire cluster. For example, if a cluster has 16 nodes, the node ranks are 0~15.

Example:

>>> from rlinf.scheduler import (
...     Cluster,
...     Worker,
...     NodePlacementStrategy,
... )
>>>
>>> class MyWorker(Worker):
...     def __init__(self, msg: str = "Hello, World!"):
...         super().__init__()
...         self._msg = msg
...
...     def hello(self):
...         return self._rank
...
>>>
>>> cluster = Cluster(num_nodes=1)
>>>
>>> # `NodePlacementStrategy` allows you to specify the *global* node ranks for each process.
>>> placement = NodePlacementStrategy([0] * 4)
>>> my_worker = MyWorker.create_group().launch(
...     cluster=cluster, name="node_placement", placement_strategy=placement
... )
>>> my_worker.hello().wait() # This will run 4 processes on the first node
[0, 1, 2, 3]
__init__(node_ranks, node_group_label=None)#

Initialize the NodePlacementStrategy.

Note

The node ranks will be sorted.

Parameters:
  • node_ranks (List[int]) – A list of node ranks to allocate for the processes.

  • node_group_label (Optional[str | Sequence[str]]) – The label or list of labels of the node groups to which the node ranks belong. If specified, the node_ranks are local ranks within the selected node groups. Otherwise, node_ranks are global ranks.

get_placement(cluster, isolate_accelerator=True)#

Generate a list of placements based on the node placement strategy.

Parameters:
  • cluster (Cluster) – The cluster object containing information about the nodes and hardware.

  • isolate_accelerator (bool) – Whether accelerators not allocated to a worker will not be visible to the worker (by settings envs like CUDA_VISIBLE_DEVICES). Defaults to True.

Returns:

A list of Placement objects representing the placements of processes.

Return type:

List[Placement]

Placement Metadata#

class rlinf.scheduler.placement.placement.Placement#

Class representing the placement of a worker on a specific GPU.

rank: int#

Global rank of the worker in the cluster.

cluster_node_rank: int#

Global node rank in the cluster where the worker is placed.

placement_node_rank: int#

Local rank of the node in the placement.

local_accelerator_rank: int#

Local GPU ID on the node.

accelerator_type: AcceleratorType#

Type of accelerators on the node.

local_rank: int#

Local rank of the worker on the node.

local_world_size: int#

Local world size (number of workers) on the node.

visible_accelerators: list[str]#

List of CUDA visible devices for the worker.

isolate_accelerator: bool#

Flag to indicate if the local rank should be set to zero. This is useful for workers that require multiple GPUs.

local_hardware_ranks: list[int]#

The assigned local hardware ranks of the worker

node_group_label: str#

The label of the node group where the worker is placed.