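Placement Interface

This section describes the GPU and node placement strategies in RLinf. Whether in collocated mode, disaggregated mode, or hybrid mode, ComponentPlacement is the user-facing interface that generates placement information for the workers of each component (e.g., actor, env, rollout, inference), while the placement strategies are the underlying mechanism that assigns each worker to specific nodes and GPUs. The resulting placement metadata is then used with Ray to launch remote tasks.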

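Component Placement

The ComponentPlacement interface parses the cluster.component_placement field of the configuration file and generates precise placement information for the workers of each component.

Note that ComponentPlacement also supports describing placements on heterogeneous clusters through the node_group field, which refers to the node groups configured in cluster.node_groups.

For details on the syntax, see the heterogeneous cluster tutorial and the auto-generated documentation below.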

class rlinf.utils.placement.ComponentPlacement

Bases: object

Base component placement for parsing cluster.component_placement config.

The component placement config is defined as either:

group_name1,group_name2,...: resource_ranks1:process_ranks1, resource_ranks2:process_ranks2,...

or:

group_name1,group_name2,...:
    node_group: <node_group_label>
    placement: "resource_ranks1":"process_ranks1", "resource_ranks2":"process_ranks2",...

A simple example is:

cluster:
    num_nodes: 1
    component_placement:
        actor,inference: 0-7

which means processes 0-7 of both the actor and inference groups each occupy one of accelerators 0 to 7.

A more complex example is:

cluster:
    num_nodes: 2
    component_placement:
        actor:
            node_group: a800
            placement: 0-8
        rollout:
            node_group: 4090
            placement: 0-8
        env:
            node_group: robot # Assuming robot hardware type is defined in the node group config
            placement: 0-3:0-7
        agent:
            node_group: node
            placement: 0-1:0-200,2-3:201-511

which means:

  • The actor group occupies accelerators 0-8 on node group 'a800'.

  • The rollout group occupies accelerators 0-8 on node group '4090'.

  • The env group occupies robot hardware 0-3 on node group 'robot', with each robot hardware shared by 2 processes.

  • The agent group occupies nodes 0-1 for processes 0-200, and nodes 2-3 for processes 201-511.

The concrete specifications of the config format are as follows:

  • resource_ranks are the ranks of the resources (e.g., GPUs, robots, or nodes) to use for the component(s). By default, if no hardware is specified in the config, resource ranks are accelerator ranks (counted from 0, within the node group if node_group is given). If the nodes do not have accelerators, resource ranks are the node ranks. If a hardware type is specified in the node group config, the resource ranks are the hardware ranks within the labeled node group, e.g., for nodes with robotic systems.

    The format of resource_ranks is an integer range a-b, which means all ranks from a to b including a and b. For example, 0-3 means rank 0, 1, 2, 3. Alternatively, "all" can be used to specify all resources.

  • process_ranks are the ranks of the processes of the component(s), in the same format as resource_ranks. The processes are evenly assigned to the specified resource ranks. For example, 0-3:0-7 means processes 0-7 are evenly assigned to resource ranks 0-3, with 2 processes sharing each resource. If there are fewer processes than resources, each process occupies multiple resources. If process_ranks is not specified, each process is assigned to one resource rank in order. For example, 0-4 means processes 0-4 are assigned to resource ranks 0-4, respectively.

    Fancier syntax mixing the two formats is also supported, e.g., 0-1:0-3,3-5,7-10:7-14, which means processes 0-3 are evenly assigned to resource ranks 0-1, processes 4-6 are assigned to resource ranks 3-5 respectively (their process ranks are inferred by the scheduler), and processes 7-14 are evenly assigned to resource ranks 7-10. Note that process ranks, whether specified or inferred, must be continuous from 0 to N-1, where N is the total number of processes. Violating this rule raises an assertion error.

  • For the second format, node_group is optional and refers to a label defined in cluster.node_groups.label. It can be either a single string (single node group) or a comma-separated string/list (multiple node groups, e.g., node_group: "a800,4090"). If not specified, all nodes in the cluster are used. The label "node" is reserved by the scheduler for allocating by node rank only (no accelerators or other hardware).

    When multiple node groups are specified, hardware ranks span across all groups in order, starting from 0. For example, if group "a800" has 8 GPUs (ranks 0-7) and group "4090" has 8 GPUs (ranks 0-7 within that group), then in the composite placement, hardware ranks 0-7 belong to "a800" and ranks 8-15 belong to "4090". Note that hardware ranks within a single process must all be of the same hardware type and on the same node.
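The placement syntax described above can be sketched in plain Python. This is only an illustration of the documented rules, not RLinf's parser: `parse_range` and `parse_placement` are hypothetical helpers, accepting a single integer as a range is an assumption, "all" is not handled, and the choice to assign processes to resources in contiguous blocks (splitting as evenly as possible when the counts do not divide) is also an assumption, since the text only says "evenly".

```python
def parse_range(text: str) -> list[int]:
    """Expand 'a-b' (inclusive) or a single integer 'a' into a list of ranks."""
    start, _, end = text.strip().partition("-")
    first = int(start)
    last = int(end) if end else first
    assert first <= last, f"invalid range: {text!r}"
    return list(range(first, last + 1))


def parse_placement(spec: str) -> dict[int, list[int]]:
    """Map each process rank to the resource ranks it occupies (hypothetical helper)."""
    mapping: dict[int, list[int]] = {}
    next_process = 0  # process ranks must be continuous, starting from 0
    for entry in spec.split(","):
        resource_part, sep, process_part = entry.partition(":")
        resources = parse_range(resource_part)
        if sep:
            processes = parse_range(process_part)
        else:
            # No process ranks given: one process per resource, in order.
            processes = list(range(next_process, next_process + len(resources)))
        assert processes[0] == next_process, "process ranks must be continuous from 0"
        n_proc, n_res = len(processes), len(resources)
        for i, proc in enumerate(processes):
            if n_proc >= n_res:
                # Several processes share one resource (assumed contiguous blocks).
                mapping[proc] = [resources[i * n_res // n_proc]]
            else:
                # One process occupies several resources.
                mapping[proc] = resources[i * n_res // n_proc:(i + 1) * n_res // n_proc]
        next_process = processes[-1] + 1
    return mapping


print(parse_placement("0-3:0-7"))
print(parse_placement("0-1:0-3,3-5,7-10:7-14"))
```

Under these assumptions, "0-3:0-7" places two processes on each of resources 0-3, and in the mixed example processes 4-6 land on resources 3-5 one-to-one.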

__init__(config, cluster)

Parse the component placement configuration.

Parameters:
  • config (DictConfig) -- The configuration dictionary for the component placement.

  • cluster (Cluster) -- The cluster to use for placement.

property placement_mode

Get the placement mode for the component.

Returns:

The placement mode for the component.

Return type:

PlacementMode

property components: list[str]

Get the list of components defined in the placement.

Returns:

The list of component names.

Return type:

list[str]

get_hardware_ranks(component_name)

Get the hardware ranks for a specific component.

Parameters:

component_name (str) -- The name of the component.

Returns:

The hardware ranks for the specified component.

Return type:

list[int]

get_world_size(component_name)

Get the world size for a specific component.

Parameters:

component_name (str) -- The name of the component.

Returns:

The world size for the specified component.

Return type:

int

get_strategy(component_name)

Get the placement strategy for a component based on the configuration.

Parameters:

component_name (str) -- The name of the component to retrieve the placement strategy for.

Returns:

The placement strategy for the specified component.

Return type:

PackedPlacementStrategy

In embodied intelligence and MATH reasoning scenarios, HybridComponentPlacement and ModelParallelComponentPlacement are used, respectively, to generate worker placements. HybridComponentPlacement inherits directly from ComponentPlacement, while ModelParallelComponentPlacement extends the placement logic to support model parallelism of the inference engine across multiple GPUs.

HybridComponentPlacement

class rlinf.utils.placement.HybridComponentPlacement

Bases: ComponentPlacement

Hybrid component placement that allows components to run on any sets of GPUs.

__init__(config, cluster)

Initialize HybridComponentPlacement.

Parameters:
  • config (DictConfig) -- The configuration dictionary.

  • cluster (Cluster) -- The cluster to use for placement.

ModelParallelComponentPlacement

class rlinf.utils.placement.ModelParallelComponentPlacement

Bases: ComponentPlacement

Component placement for model-parallel components.

The components must be actor, rollout, and optionally inference, whose GPUs must be continuous.

This placement supports both collocated and disaggregated modes.

In the collocated mode, all components share the same set of GPUs. In particular, the rollout group is specially placed in a strided manner to enable fast cudaIPC-based weight sync. In the disaggregated mode, each component has its own dedicated set of GPUs.

In the collocated mode, only actor and rollout exist, whereas in the disaggregated mode, actor, rollout, and inference should all exist.

__init__(config, cluster)

Initialize ModelParallelComponentPlacement.

Parameters:
  • config (DictConfig) -- The configuration dictionary for the component placement.

  • cluster (Cluster) -- The cluster to use for placement.

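Placement Strategies

Placement strategies are the underlying mechanism that ComponentPlacement uses to obtain the exact assignment of workers to nodes and GPUs. If you want finer-grained custom placement, refer to the built-in strategies: FlexiblePlacementStrategy, PackedPlacementStrategy, and NodePlacementStrategy. FlexiblePlacementStrategy and PackedPlacementStrategy place worker processes on accelerators/GPUs, while NodePlacementStrategy is used when only the node location matters and the underlying accelerator resources do not, which makes it well suited to CPU-only workers.

FlexiblePlacementStrategy

class rlinf.scheduler.placement.flexible.FlexiblePlacementStrategy

Bases: PlacementStrategy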

This placement strategy allows processes to be placed on any hardware (accelerators, robots, etc.) by specifying a list of global hardware ranks for each process.

Note

The global hardware rank means the hardware rank across the entire cluster or a node group if node_group_label is given. For example, if a cluster has 2 nodes, each with 8 GPUs, then the global GPU ranks are 0~7 for node 0 and 8~15 for node 1.
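The global-rank convention in this note can be illustrated with a small, self-contained sketch. `locate_global_rank` is a hypothetical helper written for this page, not part of RLinf; it assumes nodes are numbered in order and that each node's accelerator count is known.

```python
def locate_global_rank(global_rank: int, gpus_per_node: list[int]) -> tuple[int, int]:
    """Return (node_rank, local_rank) for a global hardware rank.

    gpus_per_node[i] is the number of accelerators on node i (assumed known).
    """
    if global_rank < 0:
        raise ValueError("global rank must be non-negative")
    offset = 0  # global rank of the first accelerator on the current node
    for node_rank, count in enumerate(gpus_per_node):
        if global_rank < offset + count:
            return node_rank, global_rank - offset
        offset += count
    raise ValueError(f"global rank {global_rank} exceeds the {offset} available ranks")


# Two nodes with 8 GPUs each: ranks 0-7 are on node 0, ranks 8-15 on node 1.
print(locate_global_rank(7, [8, 8]))
print(locate_global_rank(8, [8, 8]))
```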

The following example shows how to use the placement strategy.

Example:

>>> from rlinf.scheduler import (
...     Cluster,
...     Worker,
...     FlexiblePlacementStrategy,
... )
>>>
>>> class MyWorker(Worker):
...     def __init__(self, msg: str = "Hello, World!"):
...         super().__init__()
...         self._msg = msg
...
...     def hello(self):
...         return self._rank
...
...     def available_gpus(self):
...         import torch
...         available_gpus = torch.cuda.device_count()
...         gpu_ids = [
...             torch.cuda.get_device_properties(i) for i in range(available_gpus)
...         ]
...         return available_gpus
>>>
>>> cluster = Cluster(num_nodes=1)
>>>
>>> # `FlexiblePlacementStrategy` allows you to specify the *global* accelerator/GPU ranks for each process.
>>> placement = FlexiblePlacementStrategy([[0, 1], [2], [3]])
>>> my_worker = MyWorker.create_group().launch(
...     cluster=cluster, name="flexible_placement", placement_strategy=placement
... )
>>> # This will run 3 processes on the first node's GPU 0, 1, 2, 3, where the first process uses GPUs 0 and 1, the second process uses GPU 2, and the third process uses GPU 3.
>>> my_worker.available_gpus().wait()
[2, 1, 1]
__init__(hardware_ranks_list, node_group_label=None)

Initialize the FlexiblePlacementStrategy.

Note

The hardware ranks in each inner list must be on the same node and must be unique.

Note

The hardware ranks will be sorted in ascending order both within each process and across processes (based on the first rank).

Parameters:
  • hardware_ranks_list (List[List[int]]) -- A list of lists, where each inner list contains the hardware (e.g., GPU) ranks to allocate for a specific process.

  • node_group_label (Optional[str | Sequence[str]]) -- The label or list of labels of the node groups to which the accelerator ranks belong. If specified, the accelerator ranks mean local ranks within the selected node groups. Otherwise, accelerator ranks are global ranks.

get_placement(cluster, isolate_accelerator=True)

Generate a list of placements based on the flexible strategy.

Parameters:
  • cluster (Cluster) -- The cluster object containing information about the nodes and accelerators.

  • isolate_accelerator (bool) -- Whether to hide accelerators that are not allocated to a worker from that worker (by setting environment variables such as CUDA_VISIBLE_DEVICES). Defaults to True.

Returns:

A list of Placement objects representing the placements of processes on accelerators.

Return type:

List[Placement]
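As a rough illustration of what isolate_accelerator means for a worker, the sketch below computes a CUDA_VISIBLE_DEVICES-style string. `visible_devices` is a hypothetical helper, and the behavior when isolation is disabled (exposing every accelerator on the node) is an assumption drawn from the parameter description, not from RLinf's code.

```python
def visible_devices(
    assigned_local_ranks: list[int],
    accelerators_on_node: int,
    isolate_accelerator: bool = True,
) -> str:
    """Build the device list a worker would see (hypothetical helper)."""
    if isolate_accelerator:
        # Only the accelerators allocated to this worker are exposed.
        ranks = sorted(assigned_local_ranks)
    else:
        # Assumption: without isolation, the worker sees every accelerator on its node.
        ranks = list(range(accelerators_on_node))
    return ",".join(str(rank) for rank in ranks)


# A worker holding GPUs 0 and 1 on an 8-GPU node.
print(visible_devices([0, 1], accelerators_on_node=8))
print(visible_devices([0, 1], accelerators_on_node=8, isolate_accelerator=False))
```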

PackedPlacementStrategy

class rlinf.scheduler.placement.packed.PackedPlacementStrategy

Bases: PlacementStrategy

Placement strategy that allows processes to be placed on hardware (e.g., GPUs) in a close-packed manner. A process can hold one or more hardware resources.

The following example shows how to use the placement strategy.

Example:

>>> from rlinf.scheduler import (
...     Cluster,
...     Worker,
...     PackedPlacementStrategy,
... )
>>>
>>> class MyWorker(Worker):
...     def __init__(self, msg: str = "Hello, World!"):
...         super().__init__()
...         self._msg = msg
...
...     def hello(self):
...         return self._rank
...
...     def available_gpus(self):
...         import torch
...         available_gpus = torch.cuda.device_count()
...         gpu_ids = [
...             torch.cuda.get_device_properties(i) for i in range(available_gpus)
...         ]
...         return available_gpus
>>>
>>> cluster = Cluster(num_nodes=1)
>>>
>>> # `PackedPlacementStrategy` will fill up nodes with workers before moving to the next node.
>>> placement = PackedPlacementStrategy(start_hardware_rank=0, end_hardware_rank=3)
>>> my_worker = MyWorker.create_group().launch(
...     cluster=cluster, name="packed_placement", placement_strategy=placement
... )
>>> my_worker.available_gpus().wait() # This will run 4 processes on the first node's GPU 0, 1, 2, 3, each using 1 GPU.
[1, 1, 1, 1]
>>>
>>>
>>> # `num_hardware_per_process` allows one process to hold multiple accelerators/GPUs.
>>> # For example, to have each process hold 2 GPUs, set `num_hardware_per_process` to 2.
>>> placement_chunked = PackedPlacementStrategy(
...     start_hardware_rank=0, end_hardware_rank=3, num_hardware_per_process=2
... )
>>> my_worker_chunked = MyWorker.create_group().launch(
...     cluster=cluster,
...     name="chunked_placement",
...     placement_strategy=placement_chunked,
... )
>>> my_worker_chunked.available_gpus().wait()  # This will run 2 processes, each using 2 GPUs (0-1 and 2-3) of the first node.
[2, 2]
>>>
>>>
>>> # `stride` allows for strided placement of workers across GPUs.
>>> # For example, if you want to place workers on every second GPU, you can set the stride to 2.
>>> placement_strided = PackedPlacementStrategy(
...     start_hardware_rank=0, end_hardware_rank=3, stride=2, num_hardware_per_process=2
... )
>>> my_worker_strided = MyWorker.create_group().launch(
...     cluster=cluster,
...     name="strided_placement",
...     placement_strategy=placement_strided,
... )
>>> # This will run 2 processes, each using 2 GPUs (0,2 and 1,3) of the first node.
>>> my_worker_strided.available_gpus().wait()
[2, 2]
__init__(start_hardware_rank, end_hardware_rank, num_hardware_per_process=1, stride=1, node_group=None)

Initialize the PackedPlacementStrategy.

Parameters:
  • start_hardware_rank (int) -- The global rank of the starting hardware in the cluster or node group for the placement.

  • end_hardware_rank (int) -- The global rank of the end hardware in the cluster or node group for the placement.

  • num_hardware_per_process (int) -- The number of hardware resources to allocate for each process.

  • stride (int) -- The stride to use when allocating hardware. This allows one process to have multiple hardware in a strided manner, e.g., GPU 0, 2, 4 (stride 2) or GPU 0, 3, 6 (stride 3).

  • node_group (Optional[str | Sequence[str]]) -- The label(s) of the node group(s) to use for placement. Provide a list to span multiple node groups. If None, the entire cluster is considered.
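How these parameters interact can be sketched without RLinf. `packed_ranks` below is a hypothetical helper that reproduces the three layouts shown in the examples above (one GPU per process, contiguous chunks, and strided chunks). The interleaving rule for larger ranges and the requirement that the range split into whole blocks are assumptions consistent with those examples, not a description of the library's implementation.

```python
def packed_ranks(
    start_hardware_rank: int,
    end_hardware_rank: int,
    num_hardware_per_process: int = 1,
    stride: int = 1,
) -> list[list[int]]:
    """Return the hardware ranks held by each process (hypothetical helper)."""
    total = end_hardware_rank - start_hardware_rank + 1
    # One block holds `stride` interleaved processes, each with
    # `num_hardware_per_process` resources spaced `stride` apart.
    block = num_hardware_per_process * stride
    assert total > 0 and total % block == 0, "range must split into whole blocks"
    groups: list[list[int]] = []
    for block_start in range(start_hardware_rank, end_hardware_rank + 1, block):
        for offset in range(stride):
            first = block_start + offset
            groups.append([first + k * stride for k in range(num_hardware_per_process)])
    return groups


print(packed_ranks(0, 3))
print(packed_ranks(0, 3, num_hardware_per_process=2))
print(packed_ranks(0, 3, num_hardware_per_process=2, stride=2))
```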

get_placement(cluster, isolate_accelerator=True)

Generate a list of placements based on the packed strategy.

Parameters:
  • cluster (Cluster) -- The cluster object containing information about the nodes and accelerators.

  • isolate_accelerator (bool) -- Whether to hide accelerators that are not allocated to a worker from that worker (by setting environment variables such as CUDA_VISIBLE_DEVICES). Defaults to True.

Returns:

A list of Placement objects representing the placements of processes on accelerators.

Return type:

list[Placement]

NodePlacementStrategy

class rlinf.scheduler.placement.node.NodePlacementStrategy

Bases: PlacementStrategy

This placement strategy places processes on specific nodes (using global node rank) without limiting accelerators. This is useful for CPU-only workers that do not rely on accelerators.

Note

The global node rank means the node rank across the entire cluster. For example, if a cluster has 16 nodes, the node ranks are 0~15.

Example:

>>> from rlinf.scheduler import (
...     Cluster,
...     Worker,
...     NodePlacementStrategy,
... )
>>>
>>> class MyWorker(Worker):
...     def __init__(self, msg: str = "Hello, World!"):
...         super().__init__()
...         self._msg = msg
...
...     def hello(self):
...         return self._rank
...
>>>
>>> cluster = Cluster(num_nodes=1)
>>>
>>> # `NodePlacementStrategy` allows you to specify the *global* node ranks for each process.
>>> placement = NodePlacementStrategy([0] * 4)
>>> my_worker = MyWorker.create_group().launch(
...     cluster=cluster, name="node_placement", placement_strategy=placement
... )
>>> my_worker.hello().wait() # This will run 4 processes on the first node
[0, 1, 2, 3]
__init__(node_ranks, node_group_label=None)

Initialize the NodePlacementStrategy.

Note

The node ranks will be sorted.

Parameters:
  • node_ranks (List[int]) -- A list of node ranks to allocate for the processes.

  • node_group_label (Optional[str | Sequence[str]]) -- The label or list of labels of the node groups to which the node ranks belong. If specified, the node_ranks are local ranks within the selected node groups. Otherwise, node_ranks are global ranks.
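The relationship between node_ranks and the per-worker rank fields can be sketched as follows. `describe_node_placement` is a hypothetical helper; the field names are borrowed from the Placement metadata documented on this page, and the way local ranks are numbered within a node is an assumption.

```python
from collections import Counter


def describe_node_placement(node_ranks: list[int]) -> list[dict[str, int]]:
    """Derive per-worker rank information from a list of node ranks (hypothetical helper)."""
    ordered = sorted(node_ranks)  # the node ranks are sorted, as noted above
    workers_per_node = Counter(ordered)
    seen: Counter = Counter()  # workers already placed on each node
    placements = []
    for rank, node in enumerate(ordered):
        placements.append(
            {
                "rank": rank,
                "cluster_node_rank": node,
                "local_rank": seen[node],
                "local_world_size": workers_per_node[node],
            }
        )
        seen[node] += 1
    return placements


# Four processes on node 0, matching NodePlacementStrategy([0] * 4) above.
for placement in describe_node_placement([0] * 4):
    print(placement)
```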

get_placement(cluster, isolate_accelerator=True)

Generate a list of placements based on the node placement strategy.

Parameters:
  • cluster (Cluster) -- The cluster object containing information about the nodes and hardware.

  • isolate_accelerator (bool) -- Whether to hide accelerators that are not allocated to a worker from that worker (by setting environment variables such as CUDA_VISIBLE_DEVICES). Defaults to True.

Returns:

A list of Placement objects representing the placements of processes.

Return type:

List[Placement]

Placement Metadata

class rlinf.scheduler.placement.placement.Placement

Class representing the placement of a worker on a specific GPU.

rank: int

Global rank of the worker in the cluster.

cluster_node_rank: int

Global node rank in the cluster where the worker is placed.

placement_node_rank: int

Local rank of the node in the placement.

local_accelerator_rank: int

Local GPU ID on the node.

accelerator_type: AcceleratorType

Type of accelerators on the node.

local_rank: int

Local rank of the worker on the node.

local_world_size: int

Local world size (number of workers) on the node.

visible_accelerators: list[str]

List of CUDA visible devices for the worker.

isolate_accelerator: bool

Flag to indicate if the local rank should be set to zero. This is useful for workers that require multiple GPUs.

local_hardware_ranks: list[int]

The assigned local hardware ranks of the worker.

node_group_label: str

The label of the node group where the worker is placed.