Cluster Interface

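This section describes the Cluster class in RLinf, which is responsible for launching remote nodes and GPUs. Using metadata obtained from the Placement strategy, the class relies on Ray to schedule all training resources precisely in support of distributed training.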

Cluster

class rlinf.scheduler.cluster.Cluster

A singleton class that manages the cluster resources for Ray workers.

exception NamespaceConflictError

Raised when there is a namespace conflict in Ray initialization.

classmethod find_free_port()

Find a free port on the node.
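A common way to find a free port is to bind a socket to port 0 and let the operating system choose. The sketch below uses only the Python standard library and is an assumption about the general technique, not RLinf's actual implementation; `find_free_port_sketch` is a hypothetical name.

```python
import socket


def find_free_port_sketch(host: str = "") -> int:
    """Ask the OS for a currently unused TCP port and return its number."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Binding to port 0 lets the kernel pick any free port.
        sock.bind((host, 0))
        # getsockname() returns (address, port) for AF_INET sockets.
        return sock.getsockname()[1]


port = find_free_port_sketch()
print(port)
```

The port is released when the socket closes, so another process could claim it before the caller binds to it again; callers that need a guarantee should handle a bind failure and retry.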

classmethod has_initialized()

Check if the cluster has been initialized.

__init__(num_nodes=None, cluster_cfg=None, distributed_log_dir=None)

Initialize the cluster.

Parameters:
  • num_nodes (int) -- The number of nodes in the cluster. To acquire the cluster instance in a process other than the main driver process, do not pass this argument; call the Cluster() constructor without arguments instead. If num_nodes is 0, the cluster is initialized with all Ray-connected nodes.

  • cluster_cfg (Optional[DictConfig]) -- The cluster's configuration dictionary. If set, num_nodes will be ignored and inferred from the config.

  • distributed_log_dir (Optional[str]) -- Output directory for split logs. This must be provided when distributed_logging is True.
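The "configure once, then acquire without arguments" calling convention can be illustrated with a toy singleton. `ToyCluster` is hypothetical and works within a single process only; how the real Cluster shares its state with other processes is not shown here.

```python
class ToyCluster:
    """Toy singleton: the first configured call wins, later calls reuse it."""

    _instance = None

    def __new__(cls, num_nodes=None):
        # Create the single instance lazily on first construction.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._num_nodes = None
        return cls._instance

    def __init__(self, num_nodes=None):
        # Only a call that passes num_nodes configures the instance,
        # and only if it has not been configured already.
        if num_nodes is not None and self._num_nodes is None:
            self._num_nodes = num_nodes

    @classmethod
    def has_initialized(cls) -> bool:
        return cls._instance is not None and cls._instance._num_nodes is not None

    @property
    def num_nodes(self):
        return self._num_nodes


driver_view = ToyCluster(num_nodes=2)  # configuring call
other_view = ToyCluster()              # argument-free acquisition
print(driver_view is other_view, other_view.num_nodes)  # True 2
```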

static get_full_env_var_name(var)

Get the full environment variable name with system prefix.

Parameters:

var (ClusterEnvVar)

Return type:

str

static get_sys_env_var(env_var, default=None)

Get the system environment variable for the cluster.

Parameters:
  • env_var (ClusterEnvVar)

  • default (str | None)

Return type:

str | None
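The two helpers above can be pictured as a prefix join followed by an environment lookup. In this sketch the prefix `DEMO_` and the members of `DemoEnvVar` are hypothetical stand-ins; the real system prefix and the members of ClusterEnvVar are not given in this section.

```python
import os
from enum import Enum
from typing import Optional

SYSTEM_PREFIX = "DEMO_"  # hypothetical prefix, not RLinf's real one


class DemoEnvVar(Enum):
    """Hypothetical stand-in for ClusterEnvVar."""

    LOG_LEVEL = "LOG_LEVEL"
    TIMEOUT = "TIMEOUT"


def full_env_var_name(var: DemoEnvVar) -> str:
    # Prepend the system prefix to the short variable name.
    return f"{SYSTEM_PREFIX}{var.value}"


def sys_env_var(var: DemoEnvVar, default: Optional[str] = None) -> Optional[str]:
    # Look the prefixed name up in the process environment.
    return os.environ.get(full_env_var_name(var), default)


os.environ["DEMO_LOG_LEVEL"] = "DEBUG"
os.environ.pop("DEMO_TIMEOUT", None)  # make sure the second lookup misses
print(full_env_var_name(DemoEnvVar.LOG_LEVEL))          # DEMO_LOG_LEVEL
print(sys_env_var(DemoEnvVar.LOG_LEVEL))                # DEBUG
print(sys_env_var(DemoEnvVar.TIMEOUT, default="180"))   # 180
```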

property num_nodes

Get the number of nodes in the cluster.

property num_accelerators

Get the number of accelerators in the cluster.

property accelerator_ranks: list[list[int]]

Get the global accelerator ranks for each node in the cluster.
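To make the `list[list[int]]` shape concrete, the sketch below builds one inner list per node. It assumes global ranks are assigned contiguously in node-rank order, which this section does not state; `global_accelerator_ranks` is a hypothetical helper.

```python
def global_accelerator_ranks(accelerators_per_node: list[int]) -> list[list[int]]:
    """Return one list of global ranks per node, numbered contiguously."""
    ranks: list[list[int]] = []
    next_rank = 0
    for count in accelerators_per_node:
        # Each node takes the next `count` global ranks.
        ranks.append(list(range(next_rank, next_rank + count)))
        next_rank += count
    return ranks


ranks = global_accelerator_ranks([4, 4, 2])
print(ranks)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

Under this assumption, the total number of accelerators equals the sum of the inner list lengths.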

static get_alive_nodes()

Get the list of alive nodes in the Ray cluster.

get_node_group(label='cluster')

Get the node group information by label.

Parameters:

label (Optional[str]) -- The label of the node group.

Returns:

The node group information.

Return type:

Optional[NodeGroupInfo]

get_node_info(node_rank)

Get the NodeInfo of a specific node rank.

Parameters:

node_rank (int)

get_node_ip(node_rank)

Get the IP address of a specific node by its rank.

Parameters:

node_rank (int)

Return type:

str

classmethod modify_profile_context(python_interpreter_path, worker_name, profiling_cfg)

Wrap py_executable with a profiler command if profiling is configured.

Dispatches to the accelerator manager that owns the profiling config type, so all profiler-specific CLI construction stays in the hardware layer.

Parameters:
  • python_interpreter_path (str)

  • worker_name (str)

  • profiling_cfg (ProfileConfig | None)

Return type:

str
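The idea of wrapping an interpreter path with a profiler command can be sketched as string construction. Everything here is illustrative: `demo-profiler` and its `--output` flag are a made-up CLI, and the argument list stands in for whatever a ProfileConfig actually carries.

```python
import shlex
from typing import Optional, Sequence


def wrap_with_profiler(
    python_interpreter_path: str,
    worker_name: str,
    profiler_argv: Optional[Sequence[str]],
) -> str:
    """Prefix the interpreter with a profiler command, if one is configured."""
    if not profiler_argv:
        # Profiling disabled: the interpreter path is returned unchanged.
        return python_interpreter_path
    # Substitute the worker name into any "{worker}" placeholder.
    prefix = [arg.format(worker=worker_name) for arg in profiler_argv]
    # shlex.join quotes each argument so the result is shell-safe.
    return shlex.join(prefix + [python_interpreter_path])


cmd = wrap_with_profiler(
    "/usr/bin/python3",
    "actor_0",
    ["demo-profiler", "--output", "/tmp/{worker}.prof"],  # hypothetical CLI
)
print(cmd)  # demo-profiler --output /tmp/actor_0.prof /usr/bin/python3
```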

classmethod get_profiling_env_vars_for_worker(worker_name, profiling_cfg)

Return backend-specific env vars to inject when profiling is active.

Called alongside modify_profile_context(); the returned dict is merged into the worker's environment so that backends relying on env-var configuration (e.g. ROCPROFSYS_OUTPUT_PATH) work without requiring manual node_groups.env_configs entries.

Returns an empty dict when profiling is disabled or no env vars are needed.

Parameters:
  • worker_name (str)

  • profiling_cfg (ProfileConfig | None)

Return type:

dict[str, str]

classmethod get_path_env_merge_mode(env_vars)

Resolve the path-like env merge mode from environment variables.

Parameters:

env_vars (dict[str, str])

Return type:

PathEnvMergeMode

classmethod merge_worker_env_vars(base_env_vars, incoming_env_vars, mode)

Merge worker env vars with special handling for path-like variables.

Parameters:
  • base_env_vars (dict[str, str])

  • incoming_env_vars (dict[str, str])

  • mode (PathEnvMergeMode)

Return type:

dict[str, str]
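One plausible reading of "special handling for path-like variables" is that such variables are joined rather than overwritten. The sketch below assumes three merge modes and a fixed set of path-like names; `DemoPathMergeMode`, its members, and `PATH_LIKE_VARS` are hypothetical and may not match PathEnvMergeMode.

```python
import os
from enum import Enum


class DemoPathMergeMode(Enum):
    """Hypothetical stand-in for PathEnvMergeMode."""

    PREPEND = "prepend"
    APPEND = "append"
    REPLACE = "replace"


# Assumed set of variables treated as path lists.
PATH_LIKE_VARS = {"PATH", "LD_LIBRARY_PATH", "PYTHONPATH"}


def merge_env_vars(
    base: dict[str, str], incoming: dict[str, str], mode: DemoPathMergeMode
) -> dict[str, str]:
    merged = dict(base)  # never mutate the caller's dict
    for name, value in incoming.items():
        old = merged.get(name)
        if name in PATH_LIKE_VARS and old and mode is not DemoPathMergeMode.REPLACE:
            # Join old and new entries in the order the mode asks for.
            parts = [value, old] if mode is DemoPathMergeMode.PREPEND else [old, value]
            merged[name] = os.pathsep.join(parts)
        else:
            # Ordinary variables (or REPLACE mode): incoming value wins.
            merged[name] = value
    return merged


base = {"PATH": "/usr/bin", "SEED": "1"}
incoming = {"PATH": "/opt/tool/bin", "SEED": "2"}
merged = merge_env_vars(base, incoming, DemoPathMergeMode.PREPEND)
print(merged)
```

In this sketch, PREPEND places the incoming entry first so it takes lookup precedence, while non-path variables such as `SEED` are simply overwritten.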