基于 Worker 的编程接口#
本节介绍 RLinf 框架中最基本的组件 —— Worker 和 WorkerGroup,它们是构建整个框架的基石。
Worker#
一个 Worker 表示一个远程进程或计算单元。
通过继承 Worker,一个 Worker 或处理器类将具备以下能力:
在分布式环境中的多个节点上远程运行。
与集群中的其他 Workers 通信。
自动接收如
MASTER_ADDR、MASTER_PORT、RANK、LOCAL_RANK和WORLD_SIZE等必要的环境变量。
这些功能使得进程组的创建变得简单,并简化了分布式训练的设置流程。 一个 Worker 封装了单个执行单元的逻辑,使得在多个 GPU 和节点间扩展任务变得容易。
WorkerTimer#
RLinf 提供轻量级计时工具,用于统计 Worker 函数的耗时。
上下文管理器用法:
class MyWorker(Worker):
def run_training(self, batch):
with self.worker_timer("run_training"):
return self.train_step(batch)
def train_step(self, batch): ...
装饰器用法:
class MyWorker(Worker):
@Worker.timer("run_training")
def run_training(self, batch):
return self.train_step(batch)
@Worker.timer("train_step")
def train_step(self, batch): ...
获取耗时指标:
handle = worker_group.run_training(...)
result = handle.wait()
timing = handle.consume_durations() # dict: tag -> duration
计时 tag 会在 worker group 内进行聚合,默认使用 max 进行归约。
返回的 timing 会同时包含 run_training 和 train_step,
便于统计子函数耗时。
WorkerInfo#
WorkerInfo 数据类 在运行时捕获 Worker 的关键属性。
属性 |
描述 |
|---|---|
|
Worker 的 WorkerAddress |
|
Worker 在其所属组内的编号(rank) |
|
承载该 Worker 的节点标识符 |
|
分配给该 Worker 的 GPU 编号 |
|
承载该 Worker 的节点的 IP 地址 |
|
该 Worker 可用的 CUDA 设备编号列表 |
WorkerAddress#
WorkerAddress 类 为 Workers 提供了层级化的命名机制。
它通过将根组名与一系列 rank 路径组合在一起,为 Worker 群体中的每一个 Worker 提供唯一标识。
例如,根 WorkerGroup 可能命名为 "Worker_group_MyWorker",其中的 Worker 地址如 "Worker_group_MyWorker:0"、"Worker_group_MyWorker:1" 等。
若这些 Worker 创建了子 worker,则地址会继续追加 rank(例如 "Worker_group_MyWorker:0:0" 表示 rank 0 的子 worker)。
WorkerAddress 提供了一些函数用于在层级结构中导航:可通过 get_name() 获取字符串形式的地址,通过 get_parent_rank() 或 get_parent_address() 获取上级信息,或通过 get_child_address(rank) 获取某个 rank 的子地址。
这种地址系统在嵌套结构的集群中尤为关键 —— 任意 Worker 可通过地址引用其他 worker,即使它们不在同一个组内,从而实现灵活的通信模式。
通信方法#
一旦初始化完成,Worker 提供了多个高级方法用于与其他 Worker 通信:
send(object, dst_group_name, dst_rank, async_op=False)和recv(src_group_name, src_rank, async_op=False)可实现任意 Python 对象或张量的传输。 底层通过构造WorkerAddress并使用合适的 collective group 执行点对点通信。针对张量传输进行了优化:
send_tensor(tensor, dst_group_name, dst_rank, async_op=False)和recv_tensor(tensor, src_group_name, src_rank, async_op=False)提供了高效传输单个张量的能力。 由于假设接收端已准备好合适尺寸的缓冲区,因此可避免发送额外的张量形状和类型信息。
Worker 本身并不直接处理通信,而是将通信委托给 CollectiveGroup。
详细内容请见 点对点通信。
除了点对点通信外,Worker 还支持用于 Worker 间数据交换的 通道(Channel) 接口,这是一种先进先出(FIFO)的队列机制:
使用
create_channel(name, node_id=0, maxsize=0)创建新通道, 使用connect_channel(name)允许其他 Worker 按名称连接到已存在的通道。一旦连接成功,可通过
put()、get()和get_batch()等方法存入或提取数据。
这些通道方法展示了 Worker 如何协调更高级别的工作流程:
通道的数据传输仍然基于 Worker 的 send 和 recv 方法完成,
而通道的抽象则管理队列行为和流量控制(详细内容见 使用 Channel 进行通信)。
WorkerGroup#
WorkerGroup 是一个用于创建和管理一组同类 Worker 的工具类。
它简化了在集群中启动多个 Worker 并在其上并行执行方法的流程。其核心特性包括:
组创建:通过
MyWorker.create_group().launch(cluster, placement)可在集群资源上创建一组MyWorker实例。 placement 策略定义了启动的 Worker 数量以及它们分配到的具体节点/GPU(详见 Worker 放置策略)。 在这一过程中,所需的环境变量将自动设置, 并调用Cluster.allocate(...)以使用这些变量在指定节点和 GPU 上启动每个 Ray actor。方法的并行执行:
WorkerGroup的强大之处在于可以像调用一个函数那样一次性在所有 workers 上调用同一个方法。 创建 group 后,WorkerGroup会自动绑定底层Worker类中的所有方法。 当你调用其中一个方法时,它将在所有 Worker 上并行执行该方法(通过 Ray 的远程调用实现)。
示例#
- class rlinf.scheduler.worker.worker.Worker
Class representing a remote process or worker.
Inheriting
Workerwill grant your worker or processor class the ability to run remotely and communicate with other workers in the cluster. Also, essential environment variables like MASTER_ADDR, MASTER_PORT, RANK, LOCAL_RANK, WORLD_SIZE will be set automatically. This allows easy creation of torch process groups and distributed training.The following example shows how to use the Worker class to create a simple distributed worker that can run on multiple GPUs and nodes.
Example:
>>> import torch >>> from rlinf.scheduler import Cluster, Worker >>> >>> class MyWorker(Worker): ... def __init__(self): ... super().__init__() ... ... def initialize(self): ... torch.cuda.set_device(int(os.environ["LOCAL_RANK"])) ... if not torch.distributed.is_initialized(): ... torch.distributed.init_process_group(backend="nccl") ... ... test_tensor = torch.ones( ... size=(1, 1), dtype=torch.float32, device=torch.cuda.current_device() ... ) ... torch.distributed.all_reduce(test_tensor) ... return test_tensor ... ... def hello(self): ... return self._rank >>> >>> cluster = Cluster(num_nodes=1) >>> my_worker_group = MyWorker.create_group().launch(cluster=cluster, name="my_worker_group") >>> my_worker_group.initialize().wait()[0] tensor([[4.]], device='cuda:0') >>> # This will execute the hello method only on ranks 0 and 1. >>> my_worker_group.execute_on(0, 3).hello().wait() [0, 3]
The following example shows the communication capabilities of the Worker class.
Example:
>>> import asyncio >>> import torch >>> from rlinf.scheduler import Cluster, Worker >>> SEND_GROUP_NAME = "send_worker_group" >>> RECV_GROUP_NAME = "recv_worker_group" >>> >>> class SendWorker(Worker): ... def __init__(self): ... super().__init__() ... ... def hello_recv(self): ... # 1. Send a message (string or any serializable object) to the RecvWorker group with the same rank as this SendWorker worker. ... msg = f"Hello from SendWorker Rank {self._rank}!" ... self.send(msg, dst_group_name=RECV_GROUP_NAME, dst_rank=self._rank) ... ... # 2. Receive a reply from the RecvWorker group with the same rank. ... reply = self.recv( ... src_group_name=RECV_GROUP_NAME, src_rank=self._rank ... ) ... ... # 3. The send/recv APIs can also handle tensor, list of tensors and dict of tensors. ... torch.cuda.set_device(int(os.environ["LOCAL_RANK"])) ... dst_rank = ( ... self._rank + 1 ... ) % self._world_size # Send to the next rank in the group ... tensor = torch.ones( ... size=(1, 1), ... dtype=torch.float32, ... device=torch.cuda.current_device(), ... ) ... self.send(tensor, dst_group_name=RECV_GROUP_NAME, dst_rank=dst_rank) ... ... tensor_list = [ ... torch.tensor( ... 1.0, dtype=torch.float32, device=torch.cuda.current_device() ... ) ... for _ in range(4) ... ] ... self.send( ... tensor_list, dst_group_name=RECV_GROUP_NAME, dst_rank=dst_rank ... ) ... ... tensor_dict = { ... "tensor1": torch.tensor( ... 2.0, dtype=torch.float32, device=torch.cuda.current_device() ... ), ... "tensor2": torch.tensor( ... 3.0, dtype=torch.float32, device=torch.cuda.current_device() ... ), ... } ... self.send( ... tensor_dict, dst_group_name=RECV_GROUP_NAME, dst_rank=dst_rank ... ) ... ... # 4. Send tensor directly without metadata overhead if you already know the tensor shape and dtype at the recv side ... tensor = torch.ones( ... size=(2, 1), ... dtype=torch.float32, ... device=torch.cuda.current_device(), ... ) ... self.send_tensor( ... tensor, dst_group_name=RECV_GROUP_NAME, dst_rank=dst_rank ... ) ... ... def hello_recv_async(self): ... # 1. Send a tensor asynchronously to the RecvWorker group with the next rank. ... dst_rank = (self._rank + 1) % self._world_size ... tensor = torch.ones( ... size=(3, 1), ... dtype=torch.float32, ... device=torch.cuda.current_device(), ... ) ... async_send_work = self.send( ... tensor, ... dst_group_name=RECV_GROUP_NAME, ... dst_rank=dst_rank, ... async_op=True, ... ) ... async_send_work.wait() # Wait for the async send to complete ... ... # 2. Send a tensor asynchronously and use asyncio to wait for the operation to complete. ... async def send_tensor_async(): ... dst_rank = (self._rank + 1) % self._world_size ... tensor = torch.ones( ... size=(4, 1), ... dtype=torch.float32, ... device=torch.cuda.current_device(), ... ) ... async_send_work = self.send( ... tensor, ... dst_group_name=RECV_GROUP_NAME, ... dst_rank=dst_rank, ... async_op=True, ... ) ... await async_send_work.async_wait() ... ... asyncio.run(send_tensor_async()) >>> >>> class RecvWorker(Worker): ... def __init__(self): ... super().__init__() ... ... def hello_recv(self): ... # 1. Receive a message from the SendWorker worker group with the same rank. ... msg = self.recv(src_group_name=SEND_GROUP_NAME, src_rank=self._rank) ... ... # 2. Send a reply back to the SendWorker worker group with the same rank. ... reply = f"Hello from RecvWorker Rank {self._rank}!" ... self.send( ... reply, dst_group_name=SEND_GROUP_NAME, dst_rank=self._rank ... ) ... ... # 3. Receive a tensor, tensor list and tensor dict from the SendWorker worker group with the same rank. ... torch.cuda.set_device(int(os.environ["LOCAL_RANK"])) ... src_rank = ( ... self._rank - 1 ... ) % self._world_size # Receive from the previous rank in the group ... tensor = self.recv( ... src_group_name=SEND_GROUP_NAME, src_rank=src_rank ... ) ... tensor_list = self.recv( ... src_group_name=SEND_GROUP_NAME, src_rank=src_rank ... ) ... tensor_dict = self.recv( ... src_group_name=SEND_GROUP_NAME, src_rank=src_rank ... ) ... ... # 4. In-place receive tensor directly without metadata overhead ... tensor = torch.empty( ... size=(2, 1), ... dtype=torch.float32, ... device=torch.cuda.current_device(), ... ) ... self.recv_tensor( ... tensor, src_group_name=SEND_GROUP_NAME, src_rank=src_rank ... ) ... ... def hello_recv_async(self): ... # 1. Receive a tensor asynchronously from the SendWorker group with the next rank. ... src_rank = (self._rank - 1) % self._world_size ... async_recv_work = self.recv( ... src_group_name=SEND_GROUP_NAME, src_rank=src_rank, async_op=True ... ) ... tensor = async_recv_work.wait() ... ... # 2. Receive a tensor asynchronously and use asyncio to wait for the operation to complete. ... async def recv_tensor_async(): ... src_rank = (self._rank - 1) % self._world_size ... async_recv_work = self.recv( ... src_group_name=SEND_GROUP_NAME, ... src_rank=src_rank, ... async_op=True, ... ) ... tensor = await async_recv_work.async_wait() ... ... asyncio.run(recv_tensor_async()) >>> >>> cluster = Cluster(num_nodes=1) >>> send_group = SendWorker.create_group().launch(cluster=cluster, name=SEND_GROUP_NAME) >>> recv_group = RecvWorker.create_group().launch(cluster=cluster, name=RECV_GROUP_NAME) >>> res = send_group.hello_recv() >>> res = recv_group.hello_recv().wait() >>> res = send_group.hello_recv_async() >>> res = recv_group.hello_recv_async().wait()
总结#
总而言之,Worker 模块是分布式执行的基础。
WorkerAddress 为每个 Worker 提供唯一标识,支持嵌套结构;
WorkerInfo 保存运行时的元信息;
Worker 类管理每个分布式 Worker 的生命周期。
在此之上,WorkerGroup 可创建并管理多个 worker,负责其 placement 及方法的并行执行。
这些抽象隐藏了大量与 Ray 和底层环境设置相关的细节,使用户可以更专注于构建高层的分布式强化学习算法逻辑。