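Cluster Interface#
This section describes the Cluster class in RLinf, which is responsible for launching remote nodes and GPUs. Based on metadata obtained from the Placement strategy, the class uses Ray to schedule all training resources precisely in support of distributed training.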
Cluster#
- class rlinf.scheduler.cluster.Cluster#
A singleton class that manages the cluster resources for Ray workers.
- exception NamespaceConflictError#
Raised when there is a namespace conflict in Ray initialization.
- classmethod find_free_port()#
Find a free port on the node.
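The docstring does not say how the port is found. A common technique, shown here as a minimal sketch and not necessarily what RLinf does, is to bind a socket to port 0 and let the operating system choose. The function name `find_free_port_sketch` is hypothetical.

```python
import socket


def find_free_port_sketch() -> int:
    """Ask the OS for a currently unused TCP port (illustrative only)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Binding to port 0 lets the kernel pick any free ephemeral port.
        sock.bind(("", 0))
        # getsockname() returns (host, port) for an AF_INET socket.
        return sock.getsockname()[1]


port = find_free_port_sketch()
print(port)
```

The port is only guaranteed free at the moment of the check. Another process may claim it before the caller binds to it, so callers typically retry on failure.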
- classmethod has_initialized()#
Check if the cluster has been initialized.
- __init__(num_nodes=None, cluster_cfg=None, distributed_log_dir=None)#
Initialize the cluster.
- Parameters:
num_nodes (int) -- The number of nodes in the cluster. To acquire the cluster instance in a process other than the main driver process, do not pass this argument; use the Cluster() constructor without arguments instead. If num_nodes is 0, the cluster is initialized with all Ray-connected nodes.
cluster_cfg (Optional[DictConfig]) -- The cluster's configuration dictionary. If set, num_nodes is ignored and inferred from the config.
distributed_log_dir (Optional[str]) -- Output directory for split logs. This must be provided when distributed_logging is True.
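The singleton behaviour described above (initialize once with arguments, then obtain the same instance with a bare constructor call) can be illustrated in a single process. This is a sketch under stated assumptions: `ClusterSketch` is a hypothetical stand-in, it does not use Ray, and it does not reproduce how RLinf shares cluster state across processes. Raising an error when the first call omits `num_nodes` is also an assumption of this sketch.

```python
from typing import Optional


class ClusterSketch:
    """Hypothetical in-process stand-in for a singleton cluster manager."""

    _instance: Optional["ClusterSketch"] = None

    def __new__(cls, num_nodes: Optional[int] = None):
        # Always hand back the one shared instance.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._has_initialized = False
        return cls._instance

    def __init__(self, num_nodes: Optional[int] = None):
        if self._has_initialized:
            return  # Later calls reuse the existing state unchanged.
        if num_nodes is None:
            # Assumption of this sketch: the first call must configure the cluster.
            raise RuntimeError("cluster has not been initialized yet")
        self.num_nodes = num_nodes
        self._has_initialized = True

    @classmethod
    def has_initialized(cls) -> bool:
        return cls._instance is not None and cls._instance._has_initialized


driver_view = ClusterSketch(num_nodes=2)  # first call configures the instance
worker_view = ClusterSketch()             # bare call returns the same object
print(worker_view is driver_view, worker_view.num_nodes)
```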
- static get_full_env_var_name(var)#
Get the full environment variable name with system prefix.
- Parameters:
var (ClusterEnvVar)
- Return type:
str
- static get_sys_env_var(env_var, default=None)#
Get the system environment variable for the cluster.
- Parameters:
env_var (ClusterEnvVar)
default (str | None)
- Return type:
str | None
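The two helpers above can be pictured as a prefix lookup on top of `os.environ`. The following is a minimal sketch, assuming a hypothetical prefix `MYSYS_` and a hypothetical enum `EnvVarSketch`; the actual prefix and the members of `ClusterEnvVar` are not given in this section.

```python
import os
from enum import Enum
from typing import Optional

SYS_PREFIX = "MYSYS_"  # hypothetical prefix, not RLinf's real one


class EnvVarSketch(Enum):
    """Hypothetical stand-in for ClusterEnvVar."""

    LOG_LEVEL = "LOG_LEVEL"
    TIMEOUT = "TIMEOUT"


def get_full_env_var_name(var: EnvVarSketch) -> str:
    # Prepend the system prefix to the short variable name.
    return f"{SYS_PREFIX}{var.value}"


def get_sys_env_var(env_var: EnvVarSketch, default: Optional[str] = None) -> Optional[str]:
    # Look up the prefixed name, falling back to the default when unset.
    return os.environ.get(get_full_env_var_name(env_var), default)


os.environ["MYSYS_LOG_LEVEL"] = "DEBUG"
os.environ.pop("MYSYS_TIMEOUT", None)  # make sure this one is unset

level = get_sys_env_var(EnvVarSketch.LOG_LEVEL)
fallback = get_sys_env_var(EnvVarSketch.TIMEOUT, default="30")
print(level, fallback)
```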
- property num_nodes#
Get the number of nodes in the cluster.
- property num_accelerators#
Get the number of accelerators in the cluster.
- property accelerator_ranks: list[list[int]]#
Get the global accelerator ranks for each node in the cluster.
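To make the `list[list[int]]` shape concrete, the sketch below builds per-node rank lists from per-node accelerator counts. It assumes that global ranks are assigned contiguously in node order, which this section does not confirm; `accelerator_ranks_sketch` is a hypothetical helper.

```python
def accelerator_ranks_sketch(accelerators_per_node: list[int]) -> list[list[int]]:
    """Assign contiguous global ranks to each node's accelerators."""
    ranks: list[list[int]] = []
    next_rank = 0
    for count in accelerators_per_node:
        # Node i owns the next `count` global ranks.
        ranks.append(list(range(next_rank, next_rank + count)))
        next_rank += count
    return ranks


# Two nodes: the first with 4 accelerators, the second with 2.
ranks = accelerator_ranks_sketch([4, 2])
total = sum(len(node_ranks) for node_ranks in ranks)
print(ranks, total)
```

Under this assumption, the outer list has one entry per node and the total length of the inner lists equals the cluster-wide accelerator count.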
- static get_alive_nodes()#
Get the list of alive nodes in the Ray cluster.
- get_node_group(label='cluster')#
Get the node group information by label.
- Parameters:
label (Optional[str]) -- The label of the node group.
- Returns:
The node group information.
- Return type:
Optional[NodeGroupInfo]
- get_node_info(node_rank)#
Get the NodeInfo of a specific node rank.
- Parameters:
node_rank (int)
- get_node_ip(node_rank)#
Get the IP address of a specific node by its rank.
- Parameters:
node_rank (int)
- Return type:
str
- classmethod modify_profile_context(python_interpreter_path, worker_name, profiling_cfg)#
Wrap py_executable with a profiler command if profiling is configured.
Dispatches to the accelerator manager that owns the profiling config type, so all profiler-specific CLI construction stays in the hardware layer.
- Parameters:
python_interpreter_path (str)
worker_name (str)
profiling_cfg (ProfileConfig | None)
- Return type:
str
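The idea of wrapping the interpreter with a profiler command can be sketched as simple string construction. Everything here is hypothetical: `ProfileConfigSketch`, its fields, and the `example-profiler --output` command line are invented for illustration and do not correspond to RLinf's ProfileConfig or to any real profiler CLI.

```python
import shlex
from dataclasses import dataclass
from typing import Optional


@dataclass
class ProfileConfigSketch:
    """Hypothetical profiling config; not RLinf's ProfileConfig."""

    profiler_cmd: list[str]
    output_dir: str


def wrap_interpreter_sketch(
    python_interpreter_path: str,
    worker_name: str,
    cfg: Optional[ProfileConfigSketch],
) -> str:
    # With profiling disabled, the interpreter path is returned unchanged.
    if cfg is None:
        return python_interpreter_path
    # Otherwise prefix the interpreter with the profiler invocation,
    # writing results to a per-worker location.
    prefix = cfg.profiler_cmd + ["--output", f"{cfg.output_dir}/{worker_name}"]
    return f"{shlex.join(prefix)} {shlex.quote(python_interpreter_path)}"


cfg = ProfileConfigSketch(profiler_cmd=["example-profiler"], output_dir="/tmp/prof")
plain = wrap_interpreter_sketch("/usr/bin/python3", "actor_0", None)
wrapped = wrap_interpreter_sketch("/usr/bin/python3", "actor_0", cfg)
print(wrapped)
```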
- classmethod get_profiling_env_vars_for_worker(worker_name, profiling_cfg)#
Return backend-specific env vars to inject when profiling is active.
Called alongside modify_profile_context(); the returned dict is merged into the worker's environment so that backends relying on env-var configuration (e.g. ROCPROFSYS_OUTPUT_PATH) work without requiring manual node_groups.env_configs entries.
Returns an empty dict when profiling is disabled or no env vars are needed.
- Parameters:
worker_name (str)
profiling_cfg (ProfileConfig | None)
- Return type:
dict[str, str]
- classmethod get_path_env_merge_mode(env_vars)#
Resolve the path-like env merge mode from environment variables.
- Parameters:
env_vars (dict[str, str])
- Return type:
PathEnvMergeMode
- classmethod merge_worker_env_vars(base_env_vars, incoming_env_vars, mode)#
Merge worker env vars with special handling for path-like variables.
- Parameters:
base_env_vars (dict[str, str])
incoming_env_vars (dict[str, str])
mode (PathEnvMergeMode)
- Return type:
dict[str, str]
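"Special handling for path-like variables" usually means joining values instead of overwriting them. The sketch below shows one way this could work. The mode names (`OVERRIDE`, `PREPEND`, `APPEND`), the set of path-like variable names, and the function itself are all assumptions; the actual members of PathEnvMergeMode are not listed in this section.

```python
import os
from enum import Enum


class MergeModeSketch(Enum):
    """Hypothetical merge modes; not RLinf's PathEnvMergeMode."""

    OVERRIDE = "override"
    PREPEND = "prepend"
    APPEND = "append"


# Hypothetical set of variables treated as path-like.
PATH_LIKE_VARS = {"PATH", "LD_LIBRARY_PATH", "PYTHONPATH"}


def merge_env_vars_sketch(
    base: dict[str, str], incoming: dict[str, str], mode: MergeModeSketch
) -> dict[str, str]:
    merged = dict(base)  # never mutate the caller's dict
    for key, value in incoming.items():
        old = merged.get(key)
        if key in PATH_LIKE_VARS and old and mode is not MergeModeSketch.OVERRIDE:
            # Path-like variables are joined rather than replaced.
            parts = [value, old] if mode is MergeModeSketch.PREPEND else [old, value]
            merged[key] = os.pathsep.join(parts)
        else:
            # Ordinary variables: the incoming value wins.
            merged[key] = value
    return merged


merged = merge_env_vars_sketch(
    {"PATH": "/usr/bin", "FOO": "1"},
    {"PATH": "/opt/bin", "FOO": "2"},
    MergeModeSketch.PREPEND,
)
print(merged)
```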