Channel Queuing for Pipelining#

The channel module provides a high-level distributed producer–consumer queue abstraction for workers to exchange data asynchronously. A Channel allows one or more producer workers to put items into a named queue and one or more consumer workers to get them, optionally accumulating batches based on per-item weights.

Channel Creation and Connection#

A new channel can be created using:

Worker.create_channel(
    channel_name,
    node_id=0,
    maxsize=0
)

This method:

  • Determines placement — If group_affinity or group_rank_affinity are not specified, the channel is hosted in the current worker’s group and rank (same node and GPU).

  • Launches a dedicated channel actor — Uses PackedPlacementStrategy to start a ChannelWorker (that actually holds the queue) with num_processes=1 on the selected node/GPU.

  • Returns a Channel object that wraps the actor. The channel actor’s address is channel_name:0.

To connect to an existing channel from another worker, use:

Worker.connect_channel(channel_name)

This looks up the channel actor in the Ray namespace and returns a Channel object bound to both the actor and the current worker.

Putting Items into the Channel#

Use channel.put(item, weight=0, key="default", async_op=False) to send data.

  • The sending worker first transmits the item to the ChannelWorker that actually owns the target queue.

  • The ChannelWorker receives the data, wraps it as a WeightedItem (with the given weight), and enqueues it into the specified queue. If the queue has a size limit (maxsize > 0) and is full, the enqueue will block until space becomes available.

Getting Items from the Channel#

Use channel.get(key="default", async_op=False) to retrieve data which is essentially the reverse of put.

  • The ChannelWorker first dequeues an item from the specified queue.

  • It then sends this item to the worker that requested it, where it is returned to the caller.

Batch Retrieval#

Use channel.get_batch(batch_weight, key="default", async_op=False) to retrieve multiple items at once.

  • The ChannelWorker repeatedly dequeues items from the queue, summing their weight values.

  • Once the accumulated weight reaches or exceeds batch_weight, it stops.

  • All dequeued items are combined into a list and sent to the requesting worker in one message.

This feature is useful for dynamically forming batches of experiences or workers to process, where each item has a cost or size (the weight) and you want to process roughly uniform batch sizes.

Load Balancing#

During the Rollout stage, trajectories often vary significantly in length. If these are distributed to each data parallel (DP) training group without any design, it can result in severe load imbalance.

To address this issue, we implement a channel-based load balancing mechanism. Specifically, all generators in the generation stage sequentially put complete rollout trajectories into a shared rollout_output_queue. Since the trajectories are inserted in temporal order, the sequence lengths in the rollout_output_queue tend to grow over time.

Using a round-robin strategy, we continuously get trajectories from the rollout_output_queue and assign them to each DP training group in turn. This method helps approximate balanced workload distribution across all training DP groups, ensuring better utilization and efficiency during training.

Example#

class rlinf.scheduler.Channel

A FIFO queue-like channel for inter-worker communication.

Creation: Channel can be created both inside and outside of worker contexts. The recommended practice is to create channels outside of worker contexts using Channel.create(), and then pass them into workers as needed. You can also create channels inside worker contexts or connect to existing channels, using self.create_channel() or self.connect_channel().

Interface: Similar as the asyncio.Queue, the Channel provides interfaces like put, get, put_no_wait, and get_no_wait, as well as query interfaces like qsize, empty, and full. The semantics of these interfaces are identical to those of asyncio.Queue.

Features:

  1. Async operation: Channel supports both synchronous and asynchronous put and get operations, similar to Worker’s send and recv APIs. Both operations accept arbitrary data item as long as it’s serializable. The default behavior is synchronous, and async operations can be enabled by setting the async_op flag. This async can be used not only in asyncio context with await channel.get(async_op=True).async_wait(), but also in non-asyncio contexts by generating a communication handle that can be waited later, like async torch.distributed.send().

  2. Key-based routing: Channel allows specifying a key for each data item, which can be used to identify and route messages. For example, if you wish a specific data to be get and processed by a specific worker, you can assign a unique key to that data item when putting it into the channel. The target worker can then use this key to retrieve the specific data item.This is useful in multi-turn scenarios in agent and embodied RL, where a data is processed by a fixed set of workers.

  3. Weight and batch processing: Channel also supports assigning weights to individual data items, allowing for more fine-grained control over how messages are processed. A get_batch method can be used to retrieve a batch of messages which respects the assigned weights.

  4. Debugging: Channel allows you to print a Channel’s internal data by directly print the Channel object.

Example:

>>> import sys
>>> import os
>>> import asyncio
>>> import torch
>>> from rlinf.scheduler import (
...     Worker,
...     Cluster,
...     PackedPlacementStrategy,
... )
>>>
>>> class Producer(Worker):
...     def __init__(self):
...         super().__init__()
...
...     def produce(self, channel: Channel):
...         # Synchronous put of common object
...         channel.put("Hello from Producer")
...
...         # Synchronous put of tensor
...         tensor = torch.ones(1, device=torch.cuda.current_device())
...         channel.put(tensor)
...
...         # Asynchronous put of common object
...         async_work = channel.put(
...             "Hello from Producer asynchronously", async_op=True
...         )
...         async_work.wait()
...
...         # Asynchronous put using asyncio
...         async_work = channel.put(tensor, async_op=True)
...
...         async def wait_async():
...             await async_work.async_wait()
...
...         asyncio.run(wait_async())
...
...         # Put object with weight
...         channel.put("Hello with weight", weight=1)
...         channel.put(tensor, weight=2)
>>>
>>> class Consumer(Worker):
...     def __init__(self):
...         super().__init__()
...
...     def consume(self, channel: Channel):
...         tensor = channel.get()
...
...         async_work = channel.get(async_op=True)
...         async_result = async_work.wait()
...
...         async_work = channel.get(async_op=True)
...
...         async def wait_async():
...             result = await async_work.async_wait()
...
...         asyncio.run(wait_async())
...
...         # Get batch of objects based on weight
...         batch = channel.get_batch(target_weight=3)
>>>
>>> cluster = Cluster(num_nodes=1)
>>> channel = Channel.create(name="channel")
>>> placement = PackedPlacementStrategy(
...     start_hardware_rank=0, end_hardware_rank=0
... )
>>> producer = Producer.create_group().launch(
...     cluster, name="test", placement_strategy=placement
... )
>>> consumer = Consumer.create_group().launch(
...     cluster, name="test2", placement_strategy=placement
... )
>>> r1 = producer.produce(channel)
>>> r2 = consumer.consume(channel)
>>> res = r1.wait()
>>> res = r2.wait()

Summary#

The Channel component offers a distributed producer-consumer queue for worker communication. It wraps the collective send/recv mechanism with an intuitive interface supporting priority and batching, enabling decoupled, asynchronous data flow—ideal for reinforcement learning scenarios with parallel data collection and batched consumption.