Worker Interface#

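This section describes the unified interface design of Worker and WorkerGroup in RLinf. Worker is the most basic execution unit in RLinf. The components for the different stages of RL training all inherit from Worker, which gives them unified communication and scheduling. A WorkerGroup is a collection of Workers that spares users from dealing directly with the complexity of distributed training. Through WorkerGroup, users can manage and schedule multiple Workers more conveniently and so train more efficiently in a distributed setting.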

Worker#

class rlinf.scheduler.Worker#

Class representing a remote process or worker.

Inheriting Worker will grant your worker or processor class the ability to run remotely and communicate with other workers in the cluster. Also, essential environment variables like MASTER_ADDR, MASTER_PORT, RANK, LOCAL_RANK, WORLD_SIZE will be set automatically. This allows easy creation of torch process groups and distributed training.
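As a rough illustration of what these variables make possible, the sketch below parses them into a small record. The `DistEnv` dataclass and the `read_dist_env` helper are hypothetical names that are not part of RLinf, and the fallback defaults are assumptions chosen so the snippet also runs outside a cluster.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class DistEnv:
    """Hypothetical container for the variables a Worker sets automatically."""

    master_addr: str
    master_port: int
    rank: int
    local_rank: int
    world_size: int


def read_dist_env(env: Optional[Mapping[str, str]] = None) -> DistEnv:
    """Parse the distributed-training variables from `env` (default: os.environ)."""
    env = os.environ if env is None else env
    return DistEnv(
        master_addr=env.get("MASTER_ADDR", "127.0.0.1"),  # assumed fallback
        master_port=int(env.get("MASTER_PORT", "29500")),  # assumed fallback
        rank=int(env.get("RANK", "0")),
        local_rank=int(env.get("LOCAL_RANK", "0")),
        world_size=int(env.get("WORLD_SIZE", "1")),
    )


# Example with an explicit mapping, as it might look on the second GPU of a node.
example = read_dist_env(
    {
        "MASTER_ADDR": "10.0.0.1",
        "MASTER_PORT": "29500",
        "RANK": "1",
        "LOCAL_RANK": "1",
        "WORLD_SIZE": "4",
    }
)
print(example.rank, example.world_size)  # prints "1 4"
```

Inside a Worker method, calling `read_dist_env()` with no argument would read the values set for that process. `torch.distributed.init_process_group` reads the same variables through its default `env://` initialization.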

The following example shows how to use the Worker class to create a simple distributed worker that can run on multiple GPUs and nodes.

Example:

>>> import os
>>> import torch
>>> from rlinf.scheduler import Cluster, Worker
>>>
>>> class MyWorker(Worker):
...     def __init__(self):
...         super().__init__()
...
...     def initialize(self):
...         torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
...         if not torch.distributed.is_initialized():
...             torch.distributed.init_process_group(backend="nccl")
...
...         test_tensor = torch.ones(
...             size=(1, 1), dtype=torch.float32, device=torch.cuda.current_device()
...         )
...         torch.distributed.all_reduce(test_tensor)
...         return test_tensor
...
...     def hello(self):
...         return self._rank
>>>
>>> cluster = Cluster(num_nodes=1)
>>> my_worker_group = MyWorker.create_group().launch(cluster=cluster, name="my_worker_group")
>>> my_worker_group.initialize().wait()[0]
tensor([[4.]], device='cuda:0')
>>> # This will execute the hello method only on ranks 0 and 3.
>>> my_worker_group.execute_on(0, 3).hello().wait()
[0, 3]

The following example shows the communication capabilities of the Worker class.

Example:

>>> import asyncio
>>> import os
>>> import torch
>>> from rlinf.scheduler import Cluster, Worker
>>> SEND_GROUP_NAME = "send_worker_group"
>>> RECV_GROUP_NAME = "recv_worker_group"
>>>
>>> class SendWorker(Worker):
...     def __init__(self):
...         super().__init__()
...
...     def hello_recv(self):
...         # 1. Send a message (string or any serializable object) to the RecvWorker group with the same rank as this SendWorker worker.
...         msg = f"Hello from SendWorker Rank {self._rank}!"
...         self.send(msg, dst_group_name=RECV_GROUP_NAME, dst_rank=self._rank)
...
...         # 2. Receive a reply from the RecvWorker group with the same rank.
...         reply = self.recv(
...             src_group_name=RECV_GROUP_NAME, src_rank=self._rank
...         )
...
...         # 3. The send/recv APIs can also handle tensor, list of tensors and dict of tensors.
...         torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
...         dst_rank = (
...             self._rank + 1
...         ) % self._world_size  # Send to the next rank in the group
...         tensor = torch.ones(
...             size=(1, 1),
...             dtype=torch.float32,
...             device=torch.cuda.current_device(),
...         )
...         self.send(tensor, dst_group_name=RECV_GROUP_NAME, dst_rank=dst_rank)
...
...         tensor_list = [
...             torch.tensor(
...                 1.0, dtype=torch.float32, device=torch.cuda.current_device()
...             )
...             for _ in range(4)
...         ]
...         self.send(
...             tensor_list, dst_group_name=RECV_GROUP_NAME, dst_rank=dst_rank
...         )
...
...         tensor_dict = {
...             "tensor1": torch.tensor(
...                 2.0, dtype=torch.float32, device=torch.cuda.current_device()
...             ),
...             "tensor2": torch.tensor(
...                 3.0, dtype=torch.float32, device=torch.cuda.current_device()
...             ),
...         }
...         self.send(
...             tensor_dict, dst_group_name=RECV_GROUP_NAME, dst_rank=dst_rank
...         )
...
...         # 4. Send tensor directly without metadata overhead if you already know the tensor shape and dtype at the recv side
...         tensor = torch.ones(
...             size=(2, 1),
...             dtype=torch.float32,
...             device=torch.cuda.current_device(),
...         )
...         self.send_tensor(
...             tensor, dst_group_name=RECV_GROUP_NAME, dst_rank=dst_rank
...         )
...
...     def hello_recv_async(self):
...         # 1. Send a tensor asynchronously to the RecvWorker group with the next rank.
...         dst_rank = (self._rank + 1) % self._world_size
...         tensor = torch.ones(
...             size=(3, 1),
...             dtype=torch.float32,
...             device=torch.cuda.current_device(),
...         )
...         async_send_work = self.send(
...             tensor,
...             dst_group_name=RECV_GROUP_NAME,
...             dst_rank=dst_rank,
...             async_op=True,
...         )
...         async_send_work.wait()  # Wait for the async send to complete
...
...         # 2. Send a tensor asynchronously and use asyncio to wait for the operation to complete.
...         async def send_tensor_async():
...             dst_rank = (self._rank + 1) % self._world_size
...             tensor = torch.ones(
...                 size=(4, 1),
...                 dtype=torch.float32,
...                 device=torch.cuda.current_device(),
...             )
...             async_send_work = self.send(
...                 tensor,
...                 dst_group_name=RECV_GROUP_NAME,
...                 dst_rank=dst_rank,
...                 async_op=True,
...             )
...             await async_send_work.async_wait()
...
...         asyncio.run(send_tensor_async())
>>>
>>> class RecvWorker(Worker):
...     def __init__(self):
...         super().__init__()
...
...     def hello_recv(self):
...         # 1. Receive a message from the SendWorker worker group with the same rank.
...         msg = self.recv(src_group_name=SEND_GROUP_NAME, src_rank=self._rank)
...
...         # 2. Send a reply back to the SendWorker worker group with the same rank.
...         reply = f"Hello from RecvWorker Rank {self._rank}!"
...         self.send(
...             reply, dst_group_name=SEND_GROUP_NAME, dst_rank=self._rank
...         )
...
...         # 3. Receive a tensor, tensor list and tensor dict from the SendWorker worker group with the previous rank.
...         torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
...         src_rank = (
...             self._rank - 1
...         ) % self._world_size  # Receive from the previous rank in the group
...         tensor = self.recv(
...             src_group_name=SEND_GROUP_NAME, src_rank=src_rank
...         )
...         tensor_list = self.recv(
...             src_group_name=SEND_GROUP_NAME, src_rank=src_rank
...         )
...         tensor_dict = self.recv(
...             src_group_name=SEND_GROUP_NAME, src_rank=src_rank
...         )
...
...         # 4. In-place receive tensor directly without metadata overhead
...         tensor = torch.empty(
...             size=(2, 1),
...             dtype=torch.float32,
...             device=torch.cuda.current_device(),
...         )
...         self.recv_tensor(
...             tensor, src_group_name=SEND_GROUP_NAME, src_rank=src_rank
...         )
...
...     def hello_recv_async(self):
...         # 1. Receive a tensor asynchronously from the SendWorker group with the previous rank.
...         src_rank = (self._rank - 1) % self._world_size
...         async_recv_work = self.recv(
...             src_group_name=SEND_GROUP_NAME, src_rank=src_rank, async_op=True
...         )
...         tensor = async_recv_work.wait()
...
...         # 2. Receive a tensor asynchronously and use asyncio to wait for the operation to complete.
...         async def recv_tensor_async():
...             src_rank = (self._rank - 1) % self._world_size
...             async_recv_work = self.recv(
...                 src_group_name=SEND_GROUP_NAME,
...                 src_rank=src_rank,
...                 async_op=True,
...             )
...             tensor = await async_recv_work.async_wait()
...
...         asyncio.run(recv_tensor_async())
>>>
>>> cluster = Cluster(num_nodes=1)
>>> send_group = SendWorker.create_group().launch(cluster=cluster, name=SEND_GROUP_NAME)
>>> recv_group = RecvWorker.create_group().launch(cluster=cluster, name=RECV_GROUP_NAME)
>>> res = send_group.hello_recv()
>>> res = recv_group.hello_recv().wait()
>>> res = send_group.hello_recv_async()
>>> res = recv_group.hello_recv_async().wait()

__init__(parent_address=None, world_size=None, rank=None)#

Initialize the Worker with the given parent address, world size and rank.

Only non-Ray workers should provide parent_address, world_size and rank. For example, when a Worker is created via multiprocessing by another Worker, the parent address, world size and rank should be provided.

Parameters:
  • parent_address (Optional[WorkerAddress]) -- The address of the parent worker. This is used to set up the WorkerAddress for this worker.

  • world_size (Optional[int]) -- The total number of workers in the group. If not provided, it will be set to the environment variable WORLD_SIZE.

  • rank (Optional[int]) -- The rank of this worker in the group. If not provided, it will be set to the environment variable RANK.

property worker_address: WorkerAddress#

Get the WorkerAddress of the worker.

This is used to identify the worker in the WorkerGroup.

classmethod create_group(*args, **kwargs)#

Create a worker group with the class arguments.

Parameters:
  • args -- The positional arguments of the class.

  • kwargs -- The keyword arguments of the class.

Return type:

WorkerGroup[WorkerClsType] | WorkerClsType

send(object, dst_group_name, dst_rank, async_op=False, options=None, piggyback_payload=None)#

Send an object to a specific worker address in the collective group.

The function is specially optimized for torch.Tensor, List of torch.Tensor, Dict of torch.Tensor, and dataclass containing torch.Tensor, which go through NCCL when the contained tensors are on GPU. Otherwise, all communications go through GLOO.

Note

Do not mix send with recv_tensor.

Note

NCCL primitives are used only when the list or dict values consist solely of GPU tensors. Dicts with a deeper hierarchy are treated as ordinary Python objects, which are serialized into a CPU tensor and sent through GLOO.

Note

When transferring GPU objects, the first send needs to be paired with a recv at the other end. Calling async send or recv first at both ends will cause a communication hang, because NCCL communicators are established lazily when the first send/recv pair is called.

Note

Do not mix CPU and GPU tensors in a list or dict.

Note

This method is not thread safe.

Parameters:
  • object (torch.Tensor | List[torch.Tensor] | Dict[str, torch.Tensor] | Any) -- The object to send.

  • dst_group_name (str) -- The name of the destination worker group.

  • dst_rank (int | List[int]) -- The rank or list of ranks in the destination worker group to send the object to. For SPMD-like workers, this should be a single rank. For SPSD-like workers forked by parent workers, this can be a list of ranks that forms a path from the root worker to the target worker.

  • async_op (bool) -- Whether to perform the operation asynchronously.

  • options (Optional[CollectiveGroupOptions]) -- The options for the collective group. The options will only take effect when two workers first communicate with each other, and will be ignored for subsequent communications. This option must match the options of the recv side.

  • piggyback_payload (Optional[Any]) -- The payload to piggyback on the send operation. This payload will be sent to the recv side and can be used to pass additional information to the recv side without disrupting the object's data structure, e.g., list/dict of tensors that are optimized for sending.

Returns:

An AsyncWork object if async_op is True, otherwise None.

Return type:

Optional[AsyncWork]
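The examples above consume the returned handle in two ways: a blocking `wait()` and an awaitable `async_wait()`. The toy class below mimics that dual interface on top of a standard-library future. `ToyAsyncWork` and `toy_send` are hypothetical stand-ins for illustration only and do not reflect how RLinf implements `AsyncWork`.

```python
import asyncio
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any


class ToyAsyncWork:
    """Hypothetical stand-in exposing the wait()/async_wait() pair."""

    def __init__(self, future: Future):
        self._future = future

    def wait(self) -> Any:
        # Block the calling thread until the operation finishes.
        return self._future.result()

    async def async_wait(self) -> Any:
        # Yield to the event loop until the operation finishes.
        return await asyncio.wrap_future(self._future)


_executor = ThreadPoolExecutor(max_workers=1)


def toy_send(payload: Any) -> ToyAsyncWork:
    """Pretend to send `payload` in the background and return a handle."""

    def _work() -> str:
        time.sleep(0.01)  # simulate transfer latency
        return f"sent {payload!r}"

    return ToyAsyncWork(_executor.submit(_work))


# Style 1: block until the operation completes.
blocking_result = toy_send("a").wait()


# Style 2: await the operation from a coroutine.
async def main() -> str:
    return await toy_send("b").async_wait()


async_result = asyncio.run(main())
_executor.shutdown()
```

The blocking form suits straight-line worker code, while the awaitable form lets other coroutines make progress during the transfer.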

recv(src_group_name, src_rank, async_op=False, options=None)#

Out-of-place receive of an object from a specific worker address in the collective group.

Note

Do not mix recv with send_tensor.

Note

When transferring GPU objects, the first send needs to be paired with a recv at the other end. Calling async send or recv first at both ends will cause a communication hang, because NCCL communicators are established lazily when the first send/recv pair is called.

Note

This method is not thread safe.

Parameters:
  • src_group_name (str) -- The name of the source worker group.

  • src_rank (int | List[int]) -- The rank or list of ranks in the source worker group to receive the object from. For SPMD-like workers, this should be a single rank. For SPSD-like workers forked by parent workers, this can be a list of ranks that forms a path from the root worker to the target worker.

  • async_op (bool) -- Whether to perform the operation asynchronously.

  • options (Optional[CollectiveGroupOptions]) -- The options for the collective group. The options will only take effect when two workers first communicate with each other, and will be ignored for subsequent communications. This option must match the options of the send side.

Returns:

An AsyncWork object if async_op is True, otherwise the received object. If the send side sends a piggyback payload, the received object will be a tuple of the received object and the piggyback payload.

Return type:

AsyncWork | torch.Tensor | List[torch.Tensor] | Dict[str, torch.Tensor] | Any

send_tensor(tensor, dst_group_name, dst_rank, async_op=False, options=None)#

Send a tensor to a specific worker address in the collective group. This function is optimized for sending a single tensor and, unlike send, does not introduce metadata communication overhead. However, it must be paired with the in-place recv_tensor function, which requires a priori knowledge of the tensor shape and dtype.

Note

Do not mix send_tensor with recv.

Note

When transferring GPU objects, the first send_tensor needs to be paired with a recv_tensor at the other end. Calling async send_tensor or recv_tensor first at both ends will cause a communication hang, because NCCL communicators are established lazily when the first send/recv pair is called.

Note

This method is not thread safe.

Parameters:
  • tensor (torch.Tensor) -- The tensor to send.

  • dst_group_name (str) -- The name of the destination worker group.

  • dst_rank (int | List[int]) -- The rank or list of ranks in the destination worker group to send the tensor to. For SPMD-like workers, this should be a single rank. For SPSD-like workers forked by parent workers, this can be a list of ranks that forms a path from the root worker to the target worker.

  • async_op (bool) -- Whether to perform the operation asynchronously.

  • options (Optional[CollectiveGroupOptions]) -- The options for the collective group. The options will only take effect when two workers first communicate with each other, and will be ignored for subsequent communications. This option must match the options of the recv side.

Returns:

An AsyncWork object if async_op is True, otherwise None.

Return type:

Optional[AsyncWork]

recv_tensor(tensor, src_group_name, src_rank, async_op=False, options=None)#

In-place receive of a tensor from a specific worker address in the collective group. This function is optimized for receiving a single tensor and does not introduce metadata communication overhead like recv. But it requires preallocation of the tensor with the correct shape and dtype.

Note

Do not mix recv_tensor with send.

Note

When transferring GPU objects, the first send_tensor needs to be paired with a recv_tensor at the other end. Calling async send_tensor or recv_tensor first at both ends will cause a communication hang, because NCCL communicators are established lazily when the first send/recv pair is called.

Note

This method is not thread safe.

Parameters:
  • tensor (torch.Tensor) -- The tensor to receive. It must be preallocated with the correct shape and dtype.

  • src_group_name (str) -- The name of the source worker group.

  • src_rank (int | List[int]) -- The rank or list of ranks in the source worker group to receive the tensor from. For SPMD-like workers, this should be a single rank. For SPSD-like workers forked by parent workers, this can be a list of ranks that forms a path from the root worker to the target worker.

  • async_op (bool) -- Whether to perform the operation asynchronously.

  • options (Optional[CollectiveGroupOptions]) -- The options for the collective group. The options will only take effect when two workers first communicate with each other, and will be ignored for subsequent communications. This option must match the options of the send side.

Returns:

An AsyncWork object if async_op is True, otherwise None.

Return type:

Optional[AsyncWork]
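To see why send must be paired with recv, and send_tensor with recv_tensor, consider a toy model in which the general-purpose send transmits a metadata message before the payload, while the tensor-only variant transmits the payload alone. This is a simplified illustration built on a plain queue and byte buffers. All function names and the metadata layout are assumptions, not RLinf's wire format.

```python
from collections import deque
from typing import Deque, Tuple, Union

Message = Union[Tuple[str, int], bytes]
wire: Deque[Message] = deque()  # stands in for the link between two workers


def toy_send(data: bytes) -> None:
    """Metadata first, then payload: the receiver needs no prior knowledge."""
    wire.append(("meta", len(data)))
    wire.append(data)


def toy_recv() -> bytes:
    """Out-of-place receive: allocate from the metadata, then read the payload."""
    kind, size = wire.popleft()
    assert kind == "meta"
    buffer = bytearray(size)
    buffer[:] = wire.popleft()
    return bytes(buffer)


def toy_send_tensor(data: bytes) -> None:
    """Payload only: cheaper, but the receiver must already know the size."""
    wire.append(data)


def toy_recv_tensor(buffer: bytearray) -> None:
    """In-place receive into a preallocated buffer of the right size."""
    payload = wire.popleft()
    if not isinstance(payload, bytes) or len(payload) != len(buffer):
        raise ValueError("unexpected message: sender and receiver are mismatched")
    buffer[:] = payload


# Matching pairs work as expected.
toy_send(b"abcd")
received = toy_recv()

preallocated = bytearray(2)
toy_send_tensor(b"xy")
toy_recv_tensor(preallocated)

# Mixing the two styles desynchronizes the stream: the in-place receiver
# pops the metadata message where it expected the payload.
toy_send(b"zz")
try:
    toy_recv_tensor(bytearray(2))
    mixed_ok = True
except ValueError:
    mixed_ok = False
```

In this model the mismatch surfaces as an exception. With real collective communication the two sides may instead wait on each other indefinitely.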

broadcast(object=None, groups=None, src=None, async_op=False, options=None)#

Broadcast an object across workers in one or more groups.

Unless src is given, the source is the first worker address in the expanded group list. The index in the expanded list is the rank in the communication group. All participating workers must call this method with identical arguments.

Parameters:
  • object (Any) -- The object to broadcast on the source worker. For non-src ranks, this is typically None.

  • groups (list[tuple[str, list[int] | list[tuple[int]] | tuple[int] | int]] | None) -- The participating groups with ranks. Each element must be a (group_name, ranks) tuple where ranks is either a single int (one worker of the rank), a list of ints (multiple workers of the same group), a tuple of ints (one worker of the rank path), or a list of tuples of ints (multiple workers of the rank paths of the same group).

  • src (tuple[str, tuple[int] | int] | None) -- The source group and rank. If not provided, the source will be the first worker address in the expanded group list.

  • async_op (bool) -- Whether to perform the operation asynchronously.

  • options (Optional[CollectiveGroupOptions]) -- The options for the collective group.

Returns:

An AsyncWork object if async_op is True, otherwise the broadcast object.

Return type:

AsyncWork | Any
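The accepted shapes of `groups` can be easier to follow as code. The sketch below expands a `groups` value into a flat list of `(group_name, rank_path)` pairs, following the four cases described above. `expand_groups` is a hypothetical helper written for this page, and representing each worker as a `(name, tuple)` pair is an assumption. RLinf's internal expansion and its `WorkerAddress` type may differ.

```python
from typing import List, Sequence, Tuple, Union

Ranks = Union[int, Tuple[int, ...], List[int], List[Tuple[int, ...]]]
Address = Tuple[str, Tuple[int, ...]]


def expand_groups(groups: Sequence[Tuple[str, Ranks]]) -> List[Address]:
    """Flatten (group_name, ranks) entries into one address per worker."""
    expanded: List[Address] = []
    for group_name, ranks in groups:
        if isinstance(ranks, int):
            # A single int: one worker with that rank.
            expanded.append((group_name, (ranks,)))
        elif isinstance(ranks, tuple):
            # A tuple of ints: one worker identified by a rank path.
            expanded.append((group_name, tuple(ranks)))
        elif isinstance(ranks, list):
            for item in ranks:
                if isinstance(item, int):
                    # A list of ints: several workers of the same group.
                    expanded.append((group_name, (item,)))
                else:
                    # A list of tuples: several rank paths of the same group.
                    expanded.append((group_name, tuple(item)))
        else:
            raise TypeError(f"unsupported ranks value: {ranks!r}")
    return expanded


addresses = expand_groups([("actor", [0, 1]), ("rollout", (0, 2)), ("env", 3)])
default_src = addresses[0]  # the first expanded address is the default source
print(addresses)
# prints "[('actor', (0,)), ('actor', (1,)), ('rollout', (0, 2)), ('env', (3,))]"
```

In this example the worker at `("rollout", (0, 2))` would have rank 2 in the communication group, since the index in the expanded list is the rank.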

create_channel(channel_name, maxsize=0, distributed=False, node_rank=0, local=False)#

Create a new channel with the specified placement rank and maximum size.

Parameters:
  • channel_name (str) -- The name of the channel.

  • maxsize (int) -- The maximum size of the channel queue. Defaults to 0 (unbounded).

  • distributed (bool) -- Whether the channel should be distributed. A distributed channel creates a distributed worker on each node, and routes communications to the channel worker on the same node as the current worker, benefitting from the locality of the data. The routing is based on the key of the put/get APIs. So if you expect the key to be randomly distributed, you should set this to False to avoid unnecessary routing overhead.

  • node_rank (int) -- The node rank of the current worker. Only valid when distributed is False.

  • local (bool) -- Create the channel for intra-process communication. A local channel cannot be connected by other workers, and its data cannot be shared among different processes.

Returns:

A new instance of the Channel class.

Return type:

Channel
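For `maxsize`, 0 means unbounded and a positive value caps the number of queued items. Python's standard `queue.Queue` uses the same convention, so it serves here as a stand-in to show the difference. This only illustrates the parameter. It does not use or model the RLinf `Channel` API, whose behavior when the queue is full is not described on this page.

```python
import queue

unbounded = queue.Queue(maxsize=0)  # 0 means no limit on queued items
bounded = queue.Queue(maxsize=2)    # at most two items may wait in the queue

for item in range(100):
    unbounded.put_nowait(item)      # never raises queue.Full

bounded.put_nowait("a")
bounded.put_nowait("b")
try:
    bounded.put_nowait("c")         # the queue is full, so this fails fast
    overflowed = False
except queue.Full:
    overflowed = True

print(unbounded.qsize(), bounded.qsize(), overflowed)  # prints "100 2 True"
```

A bounded queue gives producers backpressure when consumers fall behind, at the cost of producers having to handle a full queue.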

connect_channel(channel_name)#

Connect to an existing channel.

Parameters:

channel_name (str) -- The name of the channel to connect to.

Returns:

An instance of the Channel class connected to the specified channel.

Return type:

Channel

WorkerGroup#

class rlinf.scheduler.worker.WorkerGroup#

The class that enables a worker to become a group of workers that can be executed collectively.

class WorkerRank#

A class that represents the Ray actor and its rank in the worker group.

__init__(worker, rank)#

Parameters:
  • worker (ObjectRef)

  • rank (int)

Return type:

None

__init__(worker_cls, args, kwargs)#

Initialize the WorkerGroup with a worker class. Used as a decorator to create a worker group.

Parameters:
  • worker_cls (Type[Worker]) -- The worker class to be used in the group.

  • args -- The positional arguments of the class.

  • kwargs -- The keyword arguments of the class.

property worker_cls_name: str#

Get the name of the worker class.

property worker_group_name: str#

Get the name of the worker group.

property worker_info_list#

Get the list of workers in the group.

classmethod from_group_name(worker_cls, group_name)#

Retrieve an existing worker group based on its worker class and group name.

Parameters:
  • worker_cls (type[ClsType])

  • group_name (str)

Return type:

WorkerGroup[ClsType] | ClsType

launch(cluster, placement_strategy=None, name=None, max_concurrency=None, isolate_gpu=True, catch_system_failure=None, disable_distributed_log=False)#

Create a worker group with the specified cluster and options.

Parameters:
  • cluster (ClusterResource) -- The cluster resource to use for worker placement.

  • placement_strategy (Optional[PlacementStrategy]) -- The strategy to use for placing workers on nodes.

  • name (str, optional) -- The name of the worker group.

  • max_concurrency (Optional[int]) -- The maximum concurrency for the worker's underlying ray actor. See https://docs.ray.io/en/latest/ray-core/actors/async_api.html#setting-concurrency-in-async-actors for detailed explanation.

  • isolate_gpu (bool) -- Whether a worker should only see the GPUs that it's assigned via controlling CUDA_VISIBLE_DEVICES. Defaults to True.

  • catch_system_failure (Optional[bool]) -- Whether to catch system exit and signals in the worker process. If None, the environment variable RLINF_CATCH_FAILURE will take effect, whose default value is True. If set, then it will override the environment variable.

  • disable_distributed_log (bool) -- Whether to disable distributed log for the worker group.

  • self (WorkerGroup[WorkerClsType])

Returns:

An instance of WorkerGroup with the specified configuration.

Return type:

WorkerGroup
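The precedence between the `catch_system_failure` argument and the `RLINF_CATCH_FAILURE` environment variable can be summarized as a small resolution function. `resolve_catch_failure` is a hypothetical helper, and the set of strings treated as false is an assumption, since this page does not specify how RLinf parses the variable.

```python
import os
from typing import Mapping, Optional

_FALSE_STRINGS = {"0", "false", "no", "off"}  # assumed spellings, not confirmed


def resolve_catch_failure(
    catch_system_failure: Optional[bool],
    env: Optional[Mapping[str, str]] = None,
) -> bool:
    """Return the effective setting: explicit argument, else env var, else True."""
    if catch_system_failure is not None:
        # An explicit argument overrides the environment variable.
        return catch_system_failure
    env = os.environ if env is None else env
    raw = env.get("RLINF_CATCH_FAILURE")
    if raw is None:
        return True  # documented default when nothing is set
    return raw.strip().lower() not in _FALSE_STRINGS


print(resolve_catch_failure(None, {}))                            # prints "True"
print(resolve_catch_failure(None, {"RLINF_CATCH_FAILURE": "0"}))  # prints "False"
print(resolve_catch_failure(True, {"RLINF_CATCH_FAILURE": "0"}))  # prints "True"
```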

execute_on(*ranks)#

Set the ranks to execute functions on in the worker group. This function only affects the immediately subsequent call of any remote function of the WorkerGroup. After one call, the execute_on state is reset to execute on all ranks.

Parameters:
  • ranks (int) -- The ranks to execute functions on. If no ranks are given, the function is executed on all workers.

  • self (WorkerGroup[WorkerClsType])

Return type:

WorkerGroup[WorkerClsType] | WorkerClsType
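The one-shot behavior of `execute_on` can be pictured with a toy group that holds plain Python objects instead of remote workers. `ToyGroup` and `ToyWorker` are hypothetical. The real WorkerGroup dispatches calls to remote workers and returns a handle whose `wait()` yields the results, as in the first example on this page.

```python
from typing import Any, Callable, List, Optional


class ToyWorker:
    def __init__(self, rank: int):
        self.rank = rank

    def hello(self) -> int:
        return self.rank


class ToyGroup:
    """Hypothetical local model of the execute_on selection semantics."""

    def __init__(self, world_size: int):
        self._workers = [ToyWorker(rank) for rank in range(world_size)]
        self._selected: Optional[List[int]] = None  # None means all ranks

    def execute_on(self, *ranks: int) -> "ToyGroup":
        # Remember the selection for the next call only.
        self._selected = list(ranks) if ranks else None
        return self

    def __getattr__(self, name: str) -> Callable[..., List[Any]]:
        # Only reached for names that are not real attributes, i.e. worker methods.
        def call(*args: Any, **kwargs: Any) -> List[Any]:
            ranks = self._selected
            self._selected = None  # reset so the following call targets all ranks
            if ranks is None:
                targets = self._workers
            else:
                targets = [self._workers[r] for r in ranks]
            return [getattr(w, name)(*args, **kwargs) for w in targets]

        return call


group = ToyGroup(world_size=4)
first = group.execute_on(0, 3).hello()  # runs on ranks 0 and 3 only
second = group.hello()                  # selection was reset: runs on all ranks
print(first, second)  # prints "[0, 3] [0, 1, 2, 3]"
```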