Worker-Based Programming Interface#

In this section, we introduce the most fundamental components of the RLinf framework — Worker and WorkerGroup — the building blocks upon which the entire framework is constructed.

Worker#

A Worker represents a single remote process or computational unit. By inheriting from Worker, a worker or processor class gains the ability to:

  • Run remotely across nodes in a distributed environment.

  • Communicate with other workers in the cluster.

  • Automatically receive essential environment variables such as MASTER_ADDR, MASTER_PORT, RANK, LOCAL_RANK, and WORLD_SIZE.

These features enable the seamless creation of process groups and simplify distributed training setup. A Worker encapsulates the logic for an individual execution unit, making it easy to scale tasks across multiple GPUs and nodes.

WorkerTimer#

RLinf provides a lightweight timer utility to profile worker functions.

Context manager usage:

class MyWorker(Worker):
    def run_training(self, batch):
        with self.worker_timer("run_training"):
            return self.train_step(batch)

    def train_step(self, batch): ...

Decorator usage:

class MyWorker(Worker):
    @Worker.timer("run_training")
    def run_training(self, batch):
        return self.train_step(batch)

    @Worker.timer("train_step")
    def train_step(self, batch): ...

Retrieving timing metrics:

handle = worker_group.run_training(...)
result = handle.wait()
timing = handle.consume_durations()  # dict of tag -> duration

The timer tags are aggregated across ranks in the worker group. By default, consume_durations() reduces with max.

The returned timing map contains both run_training and train_step so you can profile nested/child functions explicitly.

WorkerInfo#

The WorkerInfo dataclass captures key properties of a worker at runtime.

Attribute

Description

address

WorkerAddress of the worker

rank

Rank of the worker within its group

node_id

Identifier of the node hosting the worker

gpu_id

Identifier of the GPU assigned to the worker

node_ip

IP address of the node hosting the worker

available_gpus

List of CUDA device IDs available to worker

WorkerAddress#

The WorkerAddress class provides a hierarchical naming scheme for Workers. It combines a root group name with an ordered path of ranks to uniquely identify a worker in a worker group structure.

For instance, a root worker group might be named "Worker_group_MyWorker", and Workers within it have addresses like "Worker_group_MyWorker:0", "Worker_group_MyWorker:1", etc. If those Workers spawn their own sub-Workers, additional ranks are appended (e.g. "Worker_group_MyWorker:0:0" for a child of rank 0). The WorkerAddress supports operations to navigate this hierarchy: one can get a string name via get_name(), retrieve the parent’s rank or address (get_parent_rank(), get_parent_address()), or derive a child’s address (get_child_address(rank)).

This address system is crucial for identifying Workers across the cluster in a nested scenario—any worker can refer to another by its address, even across different groups, enabling flexible communication patterns.

Communication Methods#

Once initialized, a Worker exposes high-level methods to communicate with other Workers:

  • send(object, dst_group_name, dst_rank, async_op=False) and the counterpart recv(src_group_name, src_rank, async_op=False) allow transferring arbitrary Python objects or tensors between Workers. Under the hood, these calls construct a WorkerAddress for the peer and use an appropriate collective group to perform point-to-point communication.

  • Optimized tensor operations: send_tensor(tensor, dst_group_name, dst_rank, async_op=False) and recv_tensor(tensor, src_group_name, src_rank, async_op=False) are specialized for sending a single tensor efficiently. They avoid sending extra metadata about tensor shapes and types by assuming the receiver is already prepared with a correctly sized tensor buffer.

The Worker does not handle communication directly; instead, it delegates the actual communication to a CollectiveGroup. See P2P Communication for more details.

In addition to pairwise communications, the Worker also provides an interface for Channels, which are FIFO queues for exchanging data between Workers:

  • create_channel(name, node_id=0, maxsize=0) sets up a new channel. connect_channel(name) allows other Workers to connect to an existing channel by name.

  • Once a channel is connected, data can be stored and retrieved through it using methods such as put(), get(), and get_batch().

These channel methods show how Workers coordinate higher-level workflows: the actual data transfer in channels still relies on the Worker’s send and recv methods, while the channel abstraction takes care of queuing data and controlling the flow (see Channel Queuing for Pipelining for details).

WorkerGroup#

WorkerGroup is a utility for creating and managing a collection of Workers of the same type. It simplifies the process of launching multiple Workers across the cluster and executing methods on them in parallel. Key aspects of WorkerGroup include:

  • Group Creation: Calling MyWorker.create_group().launch(cluster, placement) creates a group of MyWorker instances on the cluster’s resources. The placement strategy defines how many workers are launched and the specific node/GPU each will occupy (see Worker Placement Strategy for details). During this process, the environment variables required for distributed execution are set automatically, and Cluster.allocate(...) is invoked to start each Ray actor on the designated node and GPU with those variables.

  • Collective Execution of Methods: One powerful feature of WorkerGroup is the ability to call a method on all Workers as if it were a single call. After creating the group, the WorkerGroup instance dynamically attaches all the methods of the underlying Worker class onto itself. When you call one of these methods on the WorkerGroup, it will internally invoke that method on each worker in parallel (via Ray remote calls).

Example#

class rlinf.scheduler.worker.worker.Worker

Class representing a remote process or worker.

Inheriting Worker will grant your worker or processor class the ability to run remotely and communicate with other workers in the cluster. Also, essential environment variables like MASTER_ADDR, MASTER_PORT, RANK, LOCAL_RANK, WORLD_SIZE will be set automatically. This allows easy creation of torch process groups and distributed training.

The following example shows how to use the Worker class to create a simple distributed worker that can run on multiple GPUs and nodes.

Example:

>>> import torch
>>> from rlinf.scheduler import Cluster, Worker
>>>
>>> class MyWorker(Worker):
...     def __init__(self):
...         super().__init__()
...
...     def initialize(self):
...         torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
...         if not torch.distributed.is_initialized():
...             torch.distributed.init_process_group(backend="nccl")
...
...         test_tensor = torch.ones(
...             size=(1, 1), dtype=torch.float32, device=torch.cuda.current_device()
...         )
...         torch.distributed.all_reduce(test_tensor)
...         return test_tensor
...
...     def hello(self):
...         return self._rank
>>>
>>> cluster = Cluster(num_nodes=1)
>>> my_worker_group = MyWorker.create_group().launch(cluster=cluster, name="my_worker_group")
>>> my_worker_group.initialize().wait()[0]
tensor([[4.]], device='cuda:0')
>>> # This will execute the hello method only on ranks 0 and 1.
>>> my_worker_group.execute_on(0, 3).hello().wait()
[0, 3]

The following example shows the communication capabilities of the Worker class.

Example:

>>> import asyncio
>>> import torch
>>> from rlinf.scheduler import Cluster, Worker
>>> SEND_GROUP_NAME = "send_worker_group"
>>> RECV_GROUP_NAME = "recv_worker_group"
>>>
>>> class SendWorker(Worker):
...     def __init__(self):
...         super().__init__()
...
...     def hello_recv(self):
...         # 1. Send a message (string or any serializable object) to the RecvWorker group with the same rank as this SendWorker worker.
...         msg = f"Hello from SendWorker Rank {self._rank}!"
...         self.send(msg, dst_group_name=RECV_GROUP_NAME, dst_rank=self._rank)
...
...         # 2. Receive a reply from the RecvWorker group with the same rank.
...         reply = self.recv(
...             src_group_name=RECV_GROUP_NAME, src_rank=self._rank
...         )
...
...         # 3. The send/recv APIs can also handle tensor, list of tensors and dict of tensors.
...         torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
...         dst_rank = (
...             self._rank + 1
...         ) % self._world_size  # Send to the next rank in the group
...         tensor = torch.ones(
...             size=(1, 1),
...             dtype=torch.float32,
...             device=torch.cuda.current_device(),
...         )
...         self.send(tensor, dst_group_name=RECV_GROUP_NAME, dst_rank=dst_rank)
...
...         tensor_list = [
...             torch.tensor(
...                 1.0, dtype=torch.float32, device=torch.cuda.current_device()
...             )
...             for _ in range(4)
...         ]
...         self.send(
...             tensor_list, dst_group_name=RECV_GROUP_NAME, dst_rank=dst_rank
...         )
...
...         tensor_dict = {
...             "tensor1": torch.tensor(
...                 2.0, dtype=torch.float32, device=torch.cuda.current_device()
...             ),
...             "tensor2": torch.tensor(
...                 3.0, dtype=torch.float32, device=torch.cuda.current_device()
...             ),
...         }
...         self.send(
...             tensor_dict, dst_group_name=RECV_GROUP_NAME, dst_rank=dst_rank
...         )
...
...         # 4. Send tensor directly without metadata overhead if you already know the tensor shape and dtype at the recv side
...         tensor = torch.ones(
...             size=(2, 1),
...             dtype=torch.float32,
...             device=torch.cuda.current_device(),
...         )
...         self.send_tensor(
...             tensor, dst_group_name=RECV_GROUP_NAME, dst_rank=dst_rank
...         )
...
...     def hello_recv_async(self):
...         # 1. Send a tensor asynchronously to the RecvWorker group with the next rank.
...         dst_rank = (self._rank + 1) % self._world_size
...         tensor = torch.ones(
...             size=(3, 1),
...             dtype=torch.float32,
...             device=torch.cuda.current_device(),
...         )
...         async_send_work = self.send(
...             tensor,
...             dst_group_name=RECV_GROUP_NAME,
...             dst_rank=dst_rank,
...             async_op=True,
...         )
...         async_send_work.wait()  # Wait for the async send to complete
...
...         # 2. Send a tensor asynchronously and use asyncio to wait for the operation to complete.
...         async def send_tensor_async():
...             dst_rank = (self._rank + 1) % self._world_size
...             tensor = torch.ones(
...                 size=(4, 1),
...                 dtype=torch.float32,
...                 device=torch.cuda.current_device(),
...             )
...             async_send_work = self.send(
...                 tensor,
...                 dst_group_name=RECV_GROUP_NAME,
...                 dst_rank=dst_rank,
...                 async_op=True,
...             )
...             await async_send_work.async_wait()
...
...         asyncio.run(send_tensor_async())
>>>
>>> class RecvWorker(Worker):
...     def __init__(self):
...         super().__init__()
...
...     def hello_recv(self):
...         # 1. Receive a message from the SendWorker worker group with the same rank.
...         msg = self.recv(src_group_name=SEND_GROUP_NAME, src_rank=self._rank)
...
...         # 2. Send a reply back to the SendWorker worker group with the same rank.
...         reply = f"Hello from RecvWorker Rank {self._rank}!"
...         self.send(
...             reply, dst_group_name=SEND_GROUP_NAME, dst_rank=self._rank
...         )
...
...         # 3. Receive a tensor, tensor list and tensor dict from the SendWorker worker group with the same rank.
...         torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
...         src_rank = (
...             self._rank - 1
...         ) % self._world_size  # Receive from the previous rank in the group
...         tensor = self.recv(
...             src_group_name=SEND_GROUP_NAME, src_rank=src_rank
...         )
...         tensor_list = self.recv(
...             src_group_name=SEND_GROUP_NAME, src_rank=src_rank
...         )
...         tensor_dict = self.recv(
...             src_group_name=SEND_GROUP_NAME, src_rank=src_rank
...         )
...
...         # 4. In-place receive tensor directly without metadata overhead
...         tensor = torch.empty(
...             size=(2, 1),
...             dtype=torch.float32,
...             device=torch.cuda.current_device(),
...         )
...         self.recv_tensor(
...             tensor, src_group_name=SEND_GROUP_NAME, src_rank=src_rank
...         )
...
...     def hello_recv_async(self):
...         # 1. Receive a tensor asynchronously from the SendWorker group with the next rank.
...         src_rank = (self._rank - 1) % self._world_size
...         async_recv_work = self.recv(
...             src_group_name=SEND_GROUP_NAME, src_rank=src_rank, async_op=True
...         )
...         tensor = async_recv_work.wait()
...
...         # 2. Receive a tensor asynchronously and use asyncio to wait for the operation to complete.
...         async def recv_tensor_async():
...             src_rank = (self._rank - 1) % self._world_size
...             async_recv_work = self.recv(
...                 src_group_name=SEND_GROUP_NAME,
...                 src_rank=src_rank,
...                 async_op=True,
...             )
...             tensor = await async_recv_work.async_wait()
...
...         asyncio.run(recv_tensor_async())
>>>
>>> cluster = Cluster(num_nodes=1)
>>> send_group = SendWorker.create_group().launch(cluster=cluster, name=SEND_GROUP_NAME)
>>> recv_group = RecvWorker.create_group().launch(cluster=cluster, name=RECV_GROUP_NAME)
>>> res = send_group.hello_recv()
>>> res = recv_group.hello_recv().wait()
>>> res = send_group.hello_recv_async()
>>> res = recv_group.hello_recv_async().wait()

Summary#

In summary, the Worker module provides the foundation for distributed execution. WorkerAddress gives each worker a unique identity in a potentially nested group structure, WorkerInfo holds runtime metadata, and the Worker class manages the lifecycle of each distributed worker. On top of this, WorkerGroup groups multiple Workers, handling their placement and collective method execution. These abstractions hide much of the Ray-specific details and low-level environment setup, allowing users to focus on the higher-level logic of their distributed reinforcement learning algorithm.