Adaptive P2P Communication#
This component provides point-to-point (P2P) data transfer between workers with strict ordering and async handles, on top of PyTorch torch.distributed.
It consists of two public-facing classes:
Collective: per-worker singleton that creates/caches communication groups.
CollectiveGroup: a two-rank communication group that implements P2P send/recv for tensors, lists/dicts of tensors, and picklable Python objects.
Group Creation & Caching#
The Collective class is instantiated on each worker (as a singleton per worker) and is responsible for creating and caching CollectiveGroup instances.
When two workers or a set of workers need to communicate, a collective group must be established that includes all participants.
The typical usage in this framework is to form groups for point-to-point communication by
Collective.create_collective_group(worker_addresses, group_name=None),
which either retrieves an existing CollectiveGroup for the given set of worker addresses or creates a new one.
P2P Communication#
A CollectiveGroup is the core abstraction in RLinf for managing point-to-point communication between two workers.
It determines the local rank (0 or 1) from group_info and lazily initializes communication process groups on first use.
Internally, separate send and receive process groups are created for both GPU (NCCL) and CPU (Gloo), forming dedicated one-way channels; in a two-worker setup, a carefully configured broadcast is equivalent to a send/receive.
Initialization uses a TCP rendezvous to coordinate port allocation and synchronization, ensuring both sides are ready.
Each direction maintains a work queue backed by a dedicated CUDA stream, strictly preserving the order of send/recv operations and preventing message interleaving.
With the process groups in place, CollectiveGroup can perform communications. The main APIs are:
Send:
send(obj, async_op=False)sends an object (tensor, list of tensors, dict of tensors, or arbitrary picklable object) to the single other peer in the group. This method first sends a small header indicating the object type so that the receiver can interpret the payload.Recv:
recv(async_op=False)receives an object from the peer. It first receives the type code (CPU/Gloo), then dispatches to the appropriate receiver to reconstruct the object.Direct Tensor Send/Recv:
send_tensor(tensor, async_op=False)andrecv_tensor(tensor, async_op=False)are optimized for the case where only one tensor is being transferred and the receiver already has an allocated tensor buffer. These avoid the extra round-trip of sending metadata.
Note
All CUDA tensors must be contiguous; non-contiguity raises a helpful error. Mixing CPU and CUDA tensors in a single list/dict is disallowed.
Warning
send_tensor must be paired with recv_tensor (and vice versa). Do not mix them with the generic send/recv for the same message.
Asynchronous API#
All P2P APIs support asynchronous operation and return awaitable work handles when async_op=True. Internally, we expose a small hierarchy:
AsyncWork: abstract base withwait(),async_wait(),then(func, *args, **kwargs),done(), and chaining helpers (get_next_work(),get_last_work()).AsyncFuncWork: executes a Python callback when its predecessor completes, records a CUDA event, and can be chained viathen. If the callback returns anotherAsyncWork, completion is deferred until the last work in that chain finishes.AsyncCollWork: wraps atorch.distributedwork (e.g., broadcast) into our awaitable interface. It also supportsthen(single underlying work).AsyncChannelWork: wraps aray.ObjectRefas an awaitable (for channel RPCs).
Key properties:
Waiting:
wait()is blocking ;async_wait()isasyncio-friendly. Both ensure the recorded CUDA event has completed before returning.Chaining:
thenschedules a follow-up callback.Completion:
done()is a non-blocking query to check whether the underlying work finished.
Minimal examples:
# Async object send/recv with await
send_work = group.send(obj, async_op=True) # AsyncWork
await send_work.async_wait() # non-blocking await
recv_work = group.recv(async_op=True) # AsyncWork
obj = recv_work.wait() # blocking wait; returns received object
# Chaining a post-processing step
def postprocess(buf):
# e.g., move to CPU, cast, or notify another subsystem
return None
w = group.recv_tensor(tensor, async_op=True) # receiver-side preallocated tensor
w2 = w.then(postprocess) # AsyncFuncWork
w2.wait() # ensure postprocess finished
Summary#
In summary, the collective component provides the engine for P2P data transfer between workers. It abstracts away the details of using PyTorch’s distributed backends, managing multiple process groups to simulate send/receive, and optimizing for GPU transfers.
Users of the framework typically invoke these via the Worker.send/recv or channel operations, rather than calling CollectiveGroup directly.