基于 Ray 的集群启动#
Cluster 类负责连接 Ray 集群,并启动管理 actor 与 worker actor。 它作为整个集群资源的单例入口,提供了在特定节点和 GPU 上分配 worker 的方法。 通过封装 Ray 的初始化与节点信息,它简化了整个框架的分布式部署流程。
备注
Ray 版本要求:RLinf 需要 ray>=2.47.0 (在导入时强制检查)。
请 不要 在创建 Cluster 之前调用 ray.init ,
否则可能会破坏命名空间和日志配置。
初始化与 Ray 设置#
当创建一个 Cluster 实例时,其初始化过程包括以下步骤:
Ray 初始化:如果 Ray 尚未启动,将使用命名空间
Cluster.NAMESPACE调用ray.init()。等待节点就绪:Ray 初始化后,
Cluster会等待num_nodes个节点在 Ray 中注册完成。收集节点信息:节点就绪后,
Cluster会构造一个NodeInfo列表(包含 Ray ID、IP、CPU 和 GPU 数量)。 “主节点”排在列表首位,其余节点按 IP 排序。主地址与端口:记录主节点的 IP,并选择一个空闲的 TCP 端口用于集体通信。
全局管理器 actor:初始化阶段还会启动三个全局管理器 actor:
WorkerManager:追踪每个 worker 的元信息CollectiveManager:存储集体组信息,包括通信端口等NodeManager:为 workers 提供节点布局(IP、GPU 数、主端口)
使用 Cluster 分配 Worker#
Cluster.allocate() 会在 指定节点 上以受控环境启动一个 Ray actor:
handle = Cluster.allocate(
cls, # 要启动的 actor 类
worker_name, # 该 actor 的唯一可读名称
node_id, # 节点在 Cluster 列表中的索引(0 表示主节点)
gpu_id, # 节点内的本地 GPU 编号(用于隔离)
env_vars, # 该 actor 的环境变量(如 CUDA_VISIBLE_DEVICES)
cls_args=[], # 传给构造函数的位置参数
cls_kwargs={}, # 传给构造函数的关键字参数
)
该方法执行以下操作:
验证
node_id与gpu_id是否存在于已发现的集群拓扑中。使用
ray.remote(cls)包装传入的类。应用如下运行配置:
runtime_env={"env_vars": env_vars}(传递变量如CUDA_VISIBLE_DEVICES、rank等)name=worker_name(使 actor 可通过名称发现)scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=<Ray NodeID>, soft=False)(将 actor 固定调度到指定的 物理节点)
最后,调用 .remote(*cls_args, **cls_kwargs) 异步启动 actor,并返回该 actor 的句柄。
连接已有集群#
当不带参数创建 Cluster 时,它会自动附着到当前运行的集群:
确保 Ray 以
address="auto"和指定命名空间初始化;从
NodeManager获取现有管理器与共享状态:self._node_manager = NodeManager.get_proxy() self._nodes = self._node_manager.get_nodes() self._num_nodes = len(self._nodes) self._master_ip = self._node_manager.get_master_ip() self._master_port = self._node_manager.get_master_port()
这可以确保所有使用相同命名空间的进程观察到相同的集群视图。
总结#
Cluster 单例统一管理 Ray 初始化、节点发现、以及管理器 actor 生命周期,并维持在一个稳定命名空间下。
主驱动程序只需初始化一次(启动管理器、选定主节点),
后续所有进程只需附着连接,并共享该集群视图。
通过 allocate() 方法,用户可以可靠地将 actor 安置在指定节点,并设置其环境变量,
从而在整个框架中保持分布式调度的稳定性与一致性。