Actor Interface
This section describes the key APIs of the Actor classes in the RLinf framework.
It covers implementations for both the Megatron and FSDP backends.
It also documents the ModelManager. As the parent class of the Actor classes, the ModelManager manages the underlying model and provides the critical APIs for loading (onloading) and offloading parameters.
MegatronActor
- class rlinf.workers.actor.megatron_actor_worker.MegatronActor
Bases: MegatronWorker
- __init__(cfg, placement, role='actor')
Initialize the MegatronWorker.
- Parameters:
cfg (DictConfig) – The configuration for the actor.
placement (ModelParallelComponentPlacement)
- get_forward_step_func()
Acquire the forward step function for the model.
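Megatron's pipeline schedules call the forward step function as forward_step_func(data_iterator, model) and expect a pair (output_tensor, loss_func), where loss_func is later applied to output_tensor. The sketch below illustrates that contract with plain-Python stand-ins; make_forward_step_func, the toy loss, and the batch keys are hypothetical and do not reflect RLinf's actual forward step.

```python
from functools import partial


def make_forward_step_func():
    """Hypothetical factory illustrating the Megatron forward-step contract."""

    def loss_func(labels, output_tensor):
        # Toy loss: mean squared error between outputs and labels.
        loss = sum((o - y) ** 2 for o, y in zip(output_tensor, labels)) / len(labels)
        # Megatron-style loss functions return the loss plus a dict for logging.
        return loss, {"loss": loss}

    def forward_step(data_iterator, model):
        # The pipeline schedule calls this once per micro-batch.
        batch = next(data_iterator)
        output_tensor = model(batch["inputs"])
        # Bind this micro-batch's labels so the schedule only passes output_tensor.
        return output_tensor, partial(loss_func, batch["labels"])

    return forward_step


if __name__ == "__main__":
    forward_step = make_forward_step_func()
    model = lambda xs: [2.0 * x for x in xs]  # stand-in "model"
    data = iter([{"inputs": [1.0, 2.0], "labels": [2.0, 5.0]}])
    output, loss_fn = forward_step(data, model)
    print(loss_fn(output))
```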
- sync_model_to_rollout()
Send the model weights to the destination ranks in the rollout task.
MegatronModelManager
- class rlinf.hybrid_engines.megatron.megatron_model_manager.MegatronModelManager
Megatron Model Manager for RL training.
- __init__(cfg)
- Parameters:
cfg (DictConfig)
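- setup_model_and_optimizer(model_type=megatron.core.enums.ModelType.encoder_or_decoder)
Set up the model and optimizer.
- model_provider_func(pre_process, post_process)
Build the model for the current pipeline stage. The resulting model depends on pipeline parallelism: pre_process and post_process indicate whether this stage holds the input embedding (first stage) and the output layer (last stage), respectively.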
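To make the role of pre_process and post_process concrete, the hypothetical helper below sketches how layers and flags might be assigned across pipeline stages, assuming the layers split evenly. It is illustrative only and is not RLinf's model_provider_func.

```python
def stage_layout(num_layers, pp_size, pp_rank):
    """Hypothetical helper: layers built by one pipeline stage, plus the
    pre_process / post_process flags a Megatron-style model provider receives."""
    if num_layers % pp_size != 0:
        raise ValueError("this sketch assumes num_layers is divisible by pp_size")
    per_stage = num_layers // pp_size
    start = pp_rank * per_stage
    return {
        "layers": list(range(start, start + per_stage)),
        "pre_process": pp_rank == 0,             # first stage builds the input embedding
        "post_process": pp_rank == pp_size - 1,  # last stage builds the output layer
    }


if __name__ == "__main__":
    for rank in range(4):
        print(rank, stage_layout(num_layers=8, pp_size=4, pp_rank=rank))
```

With a single stage (pp_size=1), both flags are true and the stage builds the whole model.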
- make_data_iterator_list(data_iterator, padding=False, vpp_size=1)
Convert the data iterator into the format expected by Megatron. With interleaved pipeline parallelism, Megatron expects a list of one data iterator per model chunk.
- Parameters:
data_iterator (Iterator)
padding (bool)
vpp_size (int)
- Return type:
list[Iterator]
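A minimal sketch of this conversion, assuming every model chunk consumes the same sequence of micro-batches. The function name make_chunk_iterators is hypothetical, and the padding argument is not modeled.

```python
from typing import Iterator, List


def make_chunk_iterators(data_iterator: Iterator, vpp_size: int = 1) -> List[Iterator]:
    """Return one data iterator per virtual-pipeline model chunk.

    Assumption: all chunks consume the same micro-batches in the same order.
    """
    if vpp_size == 1:
        # No interleaving: a single iterator is enough.
        return [data_iterator]
    # Materialize the micro-batches once, then give each chunk its own replay.
    batches = list(data_iterator)
    return [iter(batches) for _ in range(vpp_size)]


if __name__ == "__main__":
    chunks = make_chunk_iterators(iter(["mb0", "mb1"]), vpp_size=2)
    print([list(it) for it in chunks])
```

Materializing the batches once lets each chunk advance independently, at the cost of holding the whole global batch in memory.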
- static custom_forward(model, input_ids, attention_mask, position_ids, sequence_parallel, value_model=False, pack_seqs=True, logits_processor=None, logits_processor_args=None, temperature=1.0, max_batch_seqlen=4096, padding_seqlen=None)
Default forward pass for GPT models with optional sequence packing.
- Parameters:
logits_processor_args (dict | None)
temperature (float)
max_batch_seqlen (int)
padding_seqlen (int | None)
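The sketch below illustrates the general idea behind sequence packing (pack_seqs=True) and temperature scaling using plain lists: padding tokens are dropped, sequences are concatenated, and cumulative lengths (cu_seqlens) mark the boundaries, which is the layout variable-length attention kernels expect. The helper names are hypothetical, and dividing logits by the temperature is the usual convention rather than a confirmed detail of custom_forward.

```python
from typing import List, Tuple


def pack_sequences(input_ids: List[List[int]],
                   attention_mask: List[List[int]]) -> Tuple[List[int], List[int]]:
    """Drop padding and concatenate sequences into one flat token list.

    Returns (packed_tokens, cu_seqlens), where sequence i occupies
    packed_tokens[cu_seqlens[i]:cu_seqlens[i + 1]].
    """
    packed, cu_seqlens = [], [0]
    for ids, mask in zip(input_ids, attention_mask):
        tokens = [tok for tok, keep in zip(ids, mask) if keep]
        packed.extend(tokens)
        cu_seqlens.append(cu_seqlens[-1] + len(tokens))
    return packed, cu_seqlens


def apply_temperature(logits: List[float], temperature: float = 1.0) -> List[float]:
    """Scale logits by 1 / temperature before computing log-probabilities."""
    return [x / temperature for x in logits]


if __name__ == "__main__":
    ids = [[5, 6, 0], [7, 0, 0]]
    mask = [[1, 1, 0], [1, 0, 0]]
    print(pack_sequences(ids, mask))
```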
- offload_megatron_copy_params(optimizers)
Offload optimizer parameters to CPU. Supports both Megatron optimizers and ChainedOptimizer, which wraps a list of underlying optimizers.
- Parameters:
optimizers – The optimizer or ChainedOptimizer instance.
- load_megatron_copy_params(optimizers)
Load optimizer parameters back to GPU. Handles ChainedOptimizer.
- Parameters:
optimizers – The optimizer or ChainedOptimizer instance.
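Megatron Core's ChainedOptimizer keeps its wrapped optimizers in a chained_optimizers attribute, so a helper that accepts either kind of optimizer can unwrap it first. The toy classes below stand in for real optimizers and only track a device label; they are assumptions for illustration, not the RLinf implementation.

```python
class ToyOptimizer:
    """Stand-in for a Megatron optimizer; only records where its state lives."""

    def __init__(self):
        self.device = "cuda"


class ToyChainedOptimizer:
    """Stand-in for ChainedOptimizer, which wraps a list of optimizers."""

    def __init__(self, optimizers):
        self.chained_optimizers = list(optimizers)


def _unwrap(optimizers):
    # A chained optimizer exposes its members; a plain optimizer becomes a one-item list.
    return list(getattr(optimizers, "chained_optimizers", [optimizers]))


def offload_copy_params(optimizers):
    for opt in _unwrap(optimizers):
        opt.device = "cpu"  # a real implementation would move tensors here


def load_copy_params(optimizers):
    for opt in _unwrap(optimizers):
        opt.device = "cuda"
```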
FSDPActor
- class rlinf.workers.actor.fsdp_actor_worker.EmbodiedFSDPActor
Bases: FSDPModelManager, Worker
- __init__(cfg)
Initialize FSDP Model Manager.
- Parameters:
cfg (DictConfig) – The actor config from the YAML file.
world_size – The total number of FSDP actor processes.
- init_worker()
Initialize the actor worker: build the model with the corresponding training backend and, if needed, offload model parameters and optimizer states to CPU.
- Return type:
None
- model_provider_func()
Initialize the model used by the FSDP actor.
- Returns:
the initialized model.
- Return type:
model
- sync_model_to_rollout()
Sync the model's full state dict to the rollout worker.
- Return type:
None
- async recv_rollout_trajectories(input_channel)
Receive rollout trajectories from rollout workers.
- Parameters:
input_channel (Channel) – The input channel to read from.
- Return type:
None
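Because recv_rollout_trajectories is a coroutine, the actor can await incoming data without blocking. The sketch below uses asyncio.Queue as a stand-in for RLinf's Channel and returns the trajectories instead of storing them; num_expected is a hypothetical parameter that is not part of the documented signature.

```python
import asyncio


async def recv_rollout_trajectories(input_channel: asyncio.Queue, num_expected: int) -> list:
    """Await trajectories until num_expected have arrived (sketch)."""
    trajectories = []
    while len(trajectories) < num_expected:
        # Suspends here until a rollout worker puts the next trajectory.
        trajectories.append(await input_channel.get())
    return trajectories


async def demo() -> list:
    channel: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        await channel.put({"traj_id": i})
    return await recv_rollout_trajectories(channel, num_expected=3)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```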
- compute_advantages_and_returns()
Compute the advantages and returns.
- Return type:
dict[str, Tensor]
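Generalized Advantage Estimation (GAE) is one common way to compute advantages and returns for PPO-style training. The pure-Python sketch below computes it for a single trajectory; whether EmbodiedFSDPActor uses GAE or another estimator is an assumption here, and the dictionary keys are hypothetical.

```python
from typing import Dict, List


def compute_gae(rewards: List[float], values: List[float], dones: List[int],
                gamma: float = 0.99, lam: float = 0.95) -> Dict[str, List[float]]:
    """GAE over one trajectory.

    values has len(rewards) + 1 entries; the last one is the bootstrap value.
    dones[t] == 1 means the episode ended after step t.
    """
    advantages = [0.0] * len(rewards)
    gae = 0.0
    for t in reversed(range(len(rewards))):
        not_done = 1.0 - dones[t]
        # One-step TD error; no bootstrapping across an episode boundary.
        delta = rewards[t] + gamma * values[t + 1] * not_done - values[t]
        gae = delta + gamma * lam * not_done * gae
        advantages[t] = gae
    # Returns are advantages plus the value baseline.
    returns = [a + v for a, v in zip(advantages, values[:-1])]
    return {"advantages": advantages, "returns": returns}


if __name__ == "__main__":
    print(compute_gae([1.0, 1.0, 1.0], [0.0] * 4, [0, 0, 0], gamma=1.0, lam=1.0))
```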
- run_training()
Run the training process using the received rollout batch.
- Return type:
None
- set_global_step(global_step)
Set the global step for the model, if needed.
- Parameters:
global_step (int)
- Return type:
None
FSDPModelManager
- class rlinf.hybrid_engines.fsdp.fsdp_model_manager.FSDPModelManager
FSDP Model Manager for RL training.
- __init__(cfg, world_size, rank)
Initialize FSDP Model Manager.
- Parameters:
cfg (DictConfig) – The actor config from the YAML file.
world_size (int) – The total number of FSDP actor processes.
rank (int)
- Return type:
None
- model_provider_func()
Initialize the model used by the FSDP actor.
- Returns:
the initialized model.
- Return type:
model
- setup_model_and_optimizer()
Set up the model, lr_scheduler, optimizer, and grad_scaler.
- Return type:
None
- get_model_state_dict(cpu_offload, full_state_dict)
Get the model state dict according to the specified options.
- Parameters:
cpu_offload – Whether the values of the returned state_dict are offloaded to CPU. If true, tensors are copied to CPU memory; otherwise, the state dict keeps references to the original GPU tensors.
full_state_dict – Whether to get the full (unsharded) state dict.
- Returns:
The state dict of the FSDP-wrapped model according to the specified options.
- Return type:
dict
- load_checkpoint(load_path)
Load a checkpoint from a local path.
- Parameters:
load_path (str) – The directory to load the checkpoint from.
- Return type:
None
- save_checkpoint(save_path, step=0)
Save a checkpoint to a local path. Every rank saves its own model and optimizer shard.
- Parameters:
save_path (str) – The directory to save the checkpoint to.
step (int)
- Return type:
None
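Since every rank saves its own shard, the checkpoint directory holds one set of files per rank. The sketch below illustrates that per-rank pattern, in which each rank writes and later reads back only its own files. It uses pickle instead of torch serialization, and the file names (model_rank_{rank}.pkl, optim_rank_{rank}.pkl) and helper names are hypothetical.

```python
import os
import pickle
import tempfile


def save_shard(save_path, rank, model_shard, optim_shard):
    """Each rank writes only its own model and optimizer shard (sketch)."""
    os.makedirs(save_path, exist_ok=True)
    for prefix, obj in (("model", model_shard), ("optim", optim_shard)):
        with open(os.path.join(save_path, f"{prefix}_rank_{rank}.pkl"), "wb") as f:
            pickle.dump(obj, f)


def load_shard(load_path, rank):
    """Each rank reads back the shard files it wrote (sketch)."""
    shards = []
    for prefix in ("model", "optim"):
        with open(os.path.join(load_path, f"{prefix}_rank_{rank}.pkl"), "rb") as f:
            shards.append(pickle.load(f))
    return tuple(shards)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        for rank in range(2):  # simulate two ranks
            save_shard(tmp, rank, {"w": [float(rank)]}, {"step": 10})
        print(load_shard(tmp, 1))
```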
- offload_param_and_grad(offload_grad=False)
Offload FSDP parameters and, optionally, gradients to CPU.
- Parameters:
offload_grad (bool) – Whether to offload gradients.
- Return type:
None
- load_param_and_grad(device_id, load_grad=False)
Load FSDP parameters and, optionally, gradients to the specified device.
- Parameters:
device_id (int) – The target device ID to load parameters and gradients to.
load_grad (bool) – Whether to load gradients.
- Return type:
None
- offload_optimizer()
Offload optimizer states to CPU.
- Return type:
None
- load_optimizer(device_id)
Load optimizer states to the specified device.
- Parameters:
device_id (int) – The target device ID to load optimizer states to.
- Return type:
None
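The offload and load methods are typically used in pairs: when the actor shares GPUs with other workers, it can move parameters, gradients, and optimizer states to CPU while idle and bring them back before training. The sketch below shows one plausible call order using a hypothetical RecordingManager that only logs calls; the order is an assumption, not taken from RLinf.

```python
class RecordingManager:
    """Hypothetical stand-in that records FSDPModelManager-style calls."""

    def __init__(self):
        self.calls = []

    def offload_param_and_grad(self, offload_grad=False):
        self.calls.append(("offload_param_and_grad", offload_grad))

    def offload_optimizer(self):
        self.calls.append(("offload_optimizer",))

    def load_param_and_grad(self, device_id, load_grad=False):
        self.calls.append(("load_param_and_grad", device_id, load_grad))

    def load_optimizer(self, device_id):
        self.calls.append(("load_optimizer", device_id))


def free_gpu_for_rollout(manager):
    # Move parameters, gradients, and optimizer states off the GPU.
    manager.offload_param_and_grad(offload_grad=True)
    manager.offload_optimizer()


def restore_for_training(manager, device_id):
    # Bring everything back to the GPU before the next training step.
    manager.load_param_and_grad(device_id, load_grad=True)
    manager.load_optimizer(device_id)
```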
- optimizer_step()
Perform an optimizer step using the manager's optimizer, lr_scheduler, and grad_scaler.
- Returns:
A tuple (grad_norm, lr_list), where lr_list contains the learning rates of all param groups.
- Return type:
tuple[float, list[float]]
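The grad_norm in such a return value is conventionally the global L2 norm over all gradients, measured before clipping. The sketch below shows that computation and norm-based clipping on plain lists (the same idea as torch.nn.utils.clip_grad_norm_). The helper names and max_norm value are assumptions, and a real step would also unscale gradients through the grad_scaler first.

```python
import math
from typing import Dict, List, Tuple


def clip_grad_norm(grads: List[List[float]], max_norm: float) -> float:
    """Return the global L2 norm of all gradients, then scale them in place
    so that their norm does not exceed max_norm."""
    total_norm = math.sqrt(sum(g * g for group in grads for g in group))
    clip_coef = max_norm / (total_norm + 1e-6)
    if clip_coef < 1.0:
        for group in grads:
            for i in range(len(group)):
                group[i] *= clip_coef
    return total_norm


def optimizer_step_sketch(grads: List[List[float]], param_groups: List[Dict[str, float]],
                          max_norm: float = 1.0) -> Tuple[float, List[float]]:
    """Hypothetical: mimic the (grad_norm, lr_list) return shape of optimizer_step."""
    grad_norm = clip_grad_norm(grads, max_norm)
    # A real step would now call optimizer.step() and lr_scheduler.step().
    lr_list = [group["lr"] for group in param_groups]
    return grad_norm, lr_list
```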
- build_lr_scheduler(optimizer, optim_config)
Build the learning rate scheduler based on the configuration. Currently, only the LambdaLR scheduler (with various warmup styles) is supported.
- Parameters:
optimizer (Optimizer) – The optimizer for which to schedule the learning rate.
optim_config (DictConfig) – The optimizer config.
- Returns:
The learning rate scheduler.
- Return type:
LRScheduler
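LambdaLR sets each param group's learning rate to its base learning rate multiplied by lr_lambda(step). The sketch below builds such a multiplier with linear warmup followed by either a constant or a cosine schedule; the style names and parameters are illustrative and may not match RLinf's configuration keys.

```python
import math


def make_lr_lambda(warmup_steps, total_steps, style="constant"):
    """Return a multiplier f(step) for a LambdaLR-style scheduler (sketch)."""

    def lr_lambda(step):
        if step < warmup_steps:
            return (step + 1) / warmup_steps  # linear warmup
        if style == "constant":
            return 1.0
        if style == "cosine":
            progress = (step - warmup_steps) / max(1, total_steps - warmup_steps)
            return 0.5 * (1.0 + math.cos(math.pi * min(1.0, progress)))
        raise ValueError(f"unknown style: {style}")

    return lr_lambda


if __name__ == "__main__":
    f = make_lr_lambda(warmup_steps=2, total_steps=10, style="cosine")
    print([round(f(s), 3) for s in range(11)])
```

With PyTorch, the function would be passed as torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=make_lr_lambda(...)).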
- build_optimizer(model, enable_critic_warmup=False)
Build the optimizer based on the configuration. Currently, only the Adam optimizer is supported.
- Parameters:
model (Module | FSDPModule | FullyShardedDataParallel) – The model to optimize: an nn.Module, an FSDPModule (used in FSDP2), or an FSDP-wrapped model.
enable_critic_warmup (bool) – Whether to enable critic warmup for the value network.
- Returns:
The constructed optimizer.
- Return type:
Optimizer
- build_grad_scaler(enabled, **kwargs)
Build the gradient scaler based on the configuration.
- Parameters:
enabled (bool) – Whether to enable gradient scaling.
kwargs – Optional parameters for ShardedGradScaler.
- Returns:
The gradient scaler.
- Return type:
ShardedGradScaler
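ShardedGradScaler is PyTorch's FSDP-aware variant of GradScaler, and both use dynamic loss scaling: the loss is multiplied by a scale factor, the scale is reduced when inf/NaN gradients appear (and that step is skipped), and it grows again after a run of clean steps. The hypothetical class below sketches only that update rule; its defaults mirror GradScaler's documented defaults.

```python
class ToyDynamicLossScaler:
    """Sketch of the dynamic loss-scaling update rule (not a real GradScaler)."""

    def __init__(self, init_scale=2.0 ** 16, growth_factor=2.0,
                 backoff_factor=0.5, growth_interval=2000):
        self.scale = init_scale
        self.growth_factor = growth_factor
        self.backoff_factor = backoff_factor
        self.growth_interval = growth_interval
        self._clean_steps = 0

    def update(self, found_inf: bool) -> bool:
        """Adjust the scale; return True if the optimizer step should be applied."""
        if found_inf:
            # Overflow: shrink the scale and skip this step.
            self.scale *= self.backoff_factor
            self._clean_steps = 0
            return False
        self._clean_steps += 1
        if self._clean_steps == self.growth_interval:
            # A long enough run of clean steps: try a larger scale.
            self.scale *= self.growth_factor
            self._clean_steps = 0
        return True
```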
- before_micro_batch(model, is_last_micro_batch)
Set up a context manager before processing a micro-batch to control gradient synchronization. The behavior depends on the FSDP strategy. With FSDP, it returns model.no_sync() for every micro-batch except the last, which avoids gradient synchronization, and nullcontext() for the last micro-batch, so that gradients are synchronized and updated. With FSDP2, it sets the requires_gradient_sync flag on the model accordingly.
- Parameters:
model (FullyShardedDataParallel | FSDPModule) – The FSDP or FSDPModule model.
is_last_micro_batch (bool) – Whether this is the last micro-batch.
- Returns:
A context manager for the micro-batch processing.
- Return type:
ContextManager
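The FSDP branch of this behavior is the standard gradient-accumulation pattern. The sketch below uses a hypothetical ToyFSDPModel whose no_sync() only flips a flag, to show that synchronization is skipped for every micro-batch except the last.

```python
from contextlib import contextmanager, nullcontext


class ToyFSDPModel:
    """Stand-in for an FSDP (v1) wrapped model exposing no_sync()."""

    def __init__(self):
        self.sync_enabled = True

    @contextmanager
    def no_sync(self):
        # Real FSDP skips gradient reduction inside this context.
        self.sync_enabled = False
        try:
            yield
        finally:
            self.sync_enabled = True


def before_micro_batch(model, is_last_micro_batch):
    """Sketch of the FSDP (v1) branch: skip sync except on the last micro-batch."""
    return nullcontext() if is_last_micro_batch else model.no_sync()


def accumulate(model, num_micro_batches):
    synced = []
    for i in range(num_micro_batches):
        with before_micro_batch(model, i == num_micro_batches - 1):
            # Forward and backward for micro-batch i would run here.
            synced.append(model.sync_enabled)
    return synced


if __name__ == "__main__":
    print(accumulate(ToyFSDPModel(), 4))
```

For FSDP2, PyTorch's equivalent is calling FSDPModule.set_requires_gradient_sync(is_last_micro_batch) before the backward pass; how RLinf wires that call in is not shown here.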