Ray-Based Cluster Launching
The Cluster class manages the connection to the Ray cluster and the launching of management actors and worker actors. It serves as a singleton representing the entire cluster’s resources and provides methods to allocate workers on specific nodes and GPUs. By encapsulating Ray initialization and node information, it simplifies distributed setup for the rest of the framework.
Note
Ray version requirement: RLinf requires ray>=2.47.0 (enforced at import time).
Do not call ray.init before creating a Cluster; premature initialization can interfere with the namespace and logging configuration.
Initialization and Ray Setup
When a Cluster object is created, it performs the following steps in its initialization:
1. Ray Initialization: If Ray is not already started, it calls ray.init() with the namespace Cluster.NAMESPACE.
2. Waiting for Nodes: After Ray initialization, Cluster waits until the expected number of nodes (num_nodes) have registered with Ray.
3. Gathering Node Information: Once the nodes are ready, Cluster constructs a list of NodeInfo objects (Ray ID, IP, CPU and GPU counts). The ‘master’ node is placed first; remaining nodes are sorted by IP.
4. Master Address and Port: The master node’s IP is stored and a free TCP port is chosen for collective communications.
5. Global Manager Actors: A key part of initialization is launching three singleton manager actors:
   - WorkerManager: tracks every worker’s metadata.
   - CollectiveManager: stores collective-group information, including rendezvous ports.
   - NodeManager: provides node layout (IP, GPU count, master port) to workers.
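The node-waiting, node-ordering, and port-selection steps above can be illustrated with a minimal, Ray-free sketch. It is not RLinf's implementation: the NodeInfo field names and the helpers wait_for_nodes, order_nodes, and find_free_port are hypothetical, the master is assumed to be identified by its IP, and "sorted by IP" is assumed to mean numeric ordering of IPv4 addresses.

```python
import ipaddress
import socket
import time
from dataclasses import dataclass
from typing import Callable, List


@dataclass(frozen=True)
class NodeInfo:
    """Hypothetical stand-in holding the fields the text lists."""
    ray_id: str
    ip: str
    num_cpus: int
    num_gpus: int


def wait_for_nodes(count_alive: Callable[[], int], num_nodes: int,
                   timeout_s: float = 60.0, poll_s: float = 0.5) -> None:
    """Poll count_alive() until it reports at least num_nodes, or time out."""
    deadline = time.monotonic() + timeout_s
    while count_alive() < num_nodes:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"expected {num_nodes} nodes before timeout")
        time.sleep(poll_s)


def order_nodes(nodes: List[NodeInfo], master_ip: str) -> List[NodeInfo]:
    """Place the master node first and sort the remaining nodes by IP."""
    master = [n for n in nodes if n.ip == master_ip]
    others = [n for n in nodes if n.ip != master_ip]
    # Assumption: numeric comparison, so 10.0.0.3 sorts before 10.0.0.12.
    others.sort(key=lambda n: ipaddress.ip_address(n.ip))
    return master + others


def find_free_port() -> int:
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("", 0))
        return sock.getsockname()[1]
```

With Ray running, count_alive could be something like lambda: sum(1 for n in ray.nodes() if n["Alive"]). Note that a port found this way is only free at the moment of the check; another process may claim it before it is used.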
Using Cluster to Allocate Workers
Cluster.allocate() starts a Ray actor of class cls on a specific node with a controlled runtime environment:
handle = Cluster.allocate(
    cls,            # The actor class to launch
    worker_name,    # A unique, human-readable name for the actor
    node_id,        # Index into Cluster's node list (0 is master)
    gpu_id,         # Local GPU index on that node (used for env isolation)
    env_vars,       # Dict of environment variables for this actor (e.g., CUDA_VISIBLE_DEVICES)
    cls_args=[],    # Positional args to the actor's constructor
    cls_kwargs={},  # Keyword args to the actor's constructor
)
What it does:
1. Validates node_id and gpu_id against discovered topology.
2. Wraps cls via ray.remote(cls).
3. Applies options:
   - runtime_env={"env_vars": env_vars} (propagates variables like CUDA_VISIBLE_DEVICES, rank … )
   - name=worker_name (makes the actor discoverable by name)
   - scheduling_strategy=NodeAffinitySchedulingStrategy(node_id=<Ray NodeID>, soft=False) (pins the actor to the requested physical node)
Finally, it invokes .remote(*cls_args, **cls_kwargs) to launch the actor asynchronously and returns the actor handle.
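The sequence above maps onto public Ray calls roughly as follows. This is a sketch under stated assumptions, not RLinf's code: validate_placement and launch_actor are hypothetical helper names, the topology is reduced to a list of per-node GPU counts, and the exact validation rules that Cluster.allocate applies are not given in this section.

```python
from typing import Any, Dict, List, Optional, Sequence


def validate_placement(gpus_per_node: Sequence[int], node_id: int, gpu_id: int) -> None:
    """Reject node or GPU indices outside the discovered topology (assumed rule)."""
    if not 0 <= node_id < len(gpus_per_node):
        raise ValueError(f"node_id {node_id} is outside 0..{len(gpus_per_node) - 1}")
    if not 0 <= gpu_id < gpus_per_node[node_id]:
        raise ValueError(f"gpu_id {gpu_id} is not a valid GPU on node {node_id}")


def launch_actor(cls: type, worker_name: str, ray_node_id: str,
                 env_vars: Dict[str, str],
                 cls_args: Optional[List[Any]] = None,
                 cls_kwargs: Optional[Dict[str, Any]] = None):
    """Start cls as a named actor pinned to one node; needs a running Ray cluster."""
    # Imported lazily so the validation helper stays usable without Ray.
    import ray
    from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

    remote_cls = ray.remote(cls).options(
        runtime_env={"env_vars": env_vars},  # per-actor environment variables
        name=worker_name,                    # lets other processes find the actor by name
        scheduling_strategy=NodeAffinitySchedulingStrategy(
            node_id=ray_node_id, soft=False  # hard pin: do not fall back to another node
        ),
    )
    # .remote() returns immediately with a handle; construction runs asynchronously.
    return remote_cls.remote(*(cls_args or []), **(cls_kwargs or {}))
```

Because soft=False makes the placement a hard constraint, Ray will not quietly move the actor to a different node if the requested one cannot host it.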
Attaching to an Existing Cluster
When a Cluster is created without arguments, it attaches to the running cluster:
1. Ensures Ray is initialized with address="auto" and the known namespace.
2. Retrieves the existing managers and shared state from NodeManager:

    self._node_manager = NodeManager.get_proxy()
    self._nodes = self._node_manager.get_nodes()
    self._num_nodes = len(self._nodes)
    self._master_ip = self._node_manager.get_master_ip()
    self._master_port = self._node_manager.get_master_port()
This guarantees that every process using the same namespace observes the same cluster view.
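How NodeManager.get_proxy() locates the running manager is not shown in this section. One plausible mechanism, offered purely as an assumption, is a lookup of a named actor in the shared namespace with ray.get_actor. The helper names, the namespace string, and the actor name below are hypothetical.

```python
from typing import Any, Dict


def attach_init_kwargs(namespace: str) -> Dict[str, Any]:
    """Arguments for ray.init() when joining an already-running cluster."""
    return {"address": "auto", "namespace": namespace}


def attach_and_get_actor(actor_name: str, namespace: str):
    """Connect to the running cluster if needed, then look up a named actor."""
    # Imported lazily; this function needs Ray and a live cluster to run.
    import ray

    if not ray.is_initialized():
        ray.init(**attach_init_kwargs(namespace))
    # Raises ValueError if no actor with this name exists in the namespace.
    return ray.get_actor(actor_name, namespace=namespace)


# Hypothetical usage from a non-driver process:
# node_manager = attach_and_get_actor("NodeManager", "example_namespace")
```

Named-actor lookup is scoped by namespace, so a process that attaches under a different namespace would not find the managers.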
Summary
The Cluster singleton centralizes Ray initialization, node discovery, and manager-actor lifecycle under a stable namespace.
Driver code initializes once (launching managers and selecting a master), while subsequent processes simply attach and retrieve the shared view.
With allocate(), users can reliably place actors on specific nodes and set the environment variables accordingly,
keeping distributed orchestration predictable and consistent across the framework.