Actor Interface#

This section documents the key APIs of the Actor classes in the RLinf framework, covering the implementations for both the Megatron and FSDP backends. It also documents the ModelManager classes. As the parent class of the Actor classes, a ModelManager manages the underlying model and provides the critical APIs for parameter onload/offload.

MegatronActor#

class rlinf.workers.actor.megatron_actor_worker.MegatronActor#

Bases: MegatronWorker

__init__(cfg, placement, role='actor')#

Initialize the MegatronWorker.

Parameters:
  • cfg

  • placement

  • role

get_forward_step_func()#

Acquire the forward step function for the model.

sync_model_to_rollout()#

Send the model weights to the destination ranks in the rollout task.

MegatronModelManager#

class rlinf.hybrid_engines.megatron.megatron_model_manager.MegatronModelManager#

Megatron Model Manager for RL training

__init__(cfg)#

Parameters:

cfg (DictConfig)

setup_model_and_optimizer(model_type=megatron.core.enums.ModelType.encoder_or_decoder)#

Set up the model and optimizer.

model_provider_func(pre_process, post_process)#

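Build the model for the current pipeline stage. The model depends on pipeline parallelism: in Megatron, pre_process and post_process indicate whether the stage includes the input embedding and the output layer, respectively.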

make_data_iterator_list(data_iterator, padding=False, vpp_size=1)#

Convert the data iterator into the format expected by Megatron. With interleaved pipeline parallelism, Megatron expects a list of one data iterator per model chunk.

Parameters:
  • data_iterator (Iterator)

  • padding (bool)

  • vpp_size (int)

Return type:

list[Iterator]
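The conversion described above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not RLinf's implementation: the helper name `make_iterator_list_sketch` is hypothetical, it assumes every model chunk should see the same sequence of micro-batches, and it does not model the `padding` option.

```python
from typing import Any, Iterator


def make_iterator_list_sketch(
    data_iterator: Iterator[Any], vpp_size: int = 1
) -> list[Iterator[Any]]:
    """Return one iterator per model chunk (hypothetical helper, not RLinf's code)."""
    if vpp_size <= 1:
        # Without interleaving there is a single model chunk, so one iterator suffices.
        return [data_iterator]
    # Assumption: each chunk replays the same micro-batches, so cache them once
    # and hand every chunk an independent iterator over the cached list.
    cached_batches = list(data_iterator)
    return [iter(cached_batches) for _ in range(vpp_size)]


iterators = make_iterator_list_sketch(iter(["mb0", "mb1", "mb2"]), vpp_size=2)
first_chunk_batches = list(iterators[0])
second_chunk_batches = list(iterators[1])
```

Caching the batches trades memory for simplicity: a single-pass iterator cannot be consumed independently by several chunks, so some form of buffering is needed in this sketch.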

static custom_forward(model, input_ids, attention_mask, position_ids, sequence_parallel, value_model=False, pack_seqs=True, logits_processor=None, logits_processor_args=None, temperature=1.0, max_batch_seqlen=4096, padding_seqlen=None)#

Default forward pass for GPT models with optional sequence packing.

Parameters:
  • logits_processor_args (dict | None)

  • temperature (float)

  • max_batch_seqlen (int)

  • padding_seqlen (int | None)

offload_megatron_copy_params(optimizers)#

Offload optimizer parameters to CPU. Supports both Megatron optimizers and ChainedOptimizer, which wraps a list of underlying optimizers.

Parameters:

optimizers – The optimizer or ChainedOptimizer instance.

load_megatron_copy_params(optimizers)#

Load optimizer parameters back to GPU. Handles ChainedOptimizer.

Parameters:

optimizers – Optimizer or ChainedOptimizer instance.
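Supporting both a single optimizer and a wrapper around several optimizers usually comes down to normalizing the argument into a list first. The sketch below illustrates that pattern with stand-in classes. It is not RLinf's code: `PlainOptimizer`, `ChainedOptimizerStub`, `unwrap_optimizers`, and `move_copy_params` are hypothetical names, the `chained_optimizers` attribute is an assumption about the wrapper, and a string stands in for the actual tensor placement.

```python
class PlainOptimizer:
    """Stand-in for a single optimizer; only tracks where its parameters live."""

    def __init__(self, name):
        self.name = name
        self.device = "gpu"


class ChainedOptimizerStub:
    """Stand-in for a wrapper that holds a list of underlying optimizers."""

    def __init__(self, optimizers):
        # The attribute name is an assumption made for this sketch.
        self.chained_optimizers = list(optimizers)


def unwrap_optimizers(optimizers):
    # A wrapper exposes its children; a plain optimizer becomes a one-element list.
    return list(getattr(optimizers, "chained_optimizers", [optimizers]))


def move_copy_params(optimizers, device):
    """Apply the same offload or load step to every underlying optimizer."""
    for opt in unwrap_optimizers(optimizers):
        opt.device = device  # a real implementation would move tensors here


chained = ChainedOptimizerStub([PlainOptimizer("a"), PlainOptimizer("b")])
move_copy_params(chained, "cpu")
devices_after_offload = [opt.device for opt in chained.chained_optimizers]
move_copy_params(chained, "gpu")
devices_after_load = [opt.device for opt in chained.chained_optimizers]
```

With this normalization, the offload and load paths share one loop and need no special case for the wrapper.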

FSDPActor#

class rlinf.workers.actor.fsdp_actor_worker.EmbodiedFSDPActor#

Bases: FSDPModelManager, Worker

__init__(cfg)#

Initialize FSDP Model Manager.

Parameters:
  • cfg (DictConfig) – actor config in yaml file.

  • world_size – total number of FSDP actor processes.

init_worker()#

Initialize the actor worker: build the model with the corresponding training backend and, if needed, offload model parameters and optimizer states to CPU.

Return type:

None

model_provider_func()#

Initialize the model used by the FSDP actor.

Returns:

the initialized model.

Return type:

model

sync_model_to_rollout()#

Sync the model’s full state dict to the rollout worker.

Return type:

None

async recv_rollout_trajectories(input_channel)#

Receive rollout trajectories from rollout workers.

Parameters:

input_channel (Channel) – The input channel to read from.

Return type:

None

compute_advantages_and_returns()#

Compute the advantages and returns.

Return type:

dict[str, Tensor]

run_training()#

Run the training process using the received rollout batch.

Return type:

None

set_global_step(global_step)#

Set the global step for the model, if needed.

Parameters:

global_step (int)

Return type:

None

FSDPModelManager#

class rlinf.hybrid_engines.fsdp.fsdp_model_manager.FSDPModelManager#

FSDP Model Manager for RL training

__init__(cfg, world_size, rank)#

Initialize FSDP Model Manager.

Parameters:
  • cfg (DictConfig) – actor config in yaml file.

  • world_size (int) – total number of FSDP actor processes.

  • rank (int)

Return type:

None

model_provider_func()#

Initialize the model used by the FSDP actor.

Returns:

the initialized model.

Return type:

model

setup_model_and_optimizer()#

Set up the model, lr_scheduler, optimizer, and grad_scaler.

Return type:

None

get_model_state_dict(cpu_offload, full_state_dict)#

Get the model state dict according to the specified options.

Parameters:
  • cpu_offload – Whether to offload the values of the returned state_dict to CPU. If true, the values are copied to CPU memory; otherwise the state_dict keeps references to the original GPU tensors.

  • full_state_dict – Whether to get the full state dict.

Returns:

The state dict of the FSDP wrapped model according to the specified options

Return type:

dict

load_checkpoint(load_path)#

Load checkpoint from local path.

Parameters:

load_path (str) – the directory to load checkpoint.

Return type:

None

save_checkpoint(save_path, step=0)#

Save checkpoint to local path. Every rank saves its own model and optimizer shard.

Parameters:
  • save_path (str) – the directory to save checkpoint.

  • step (int)

Return type:

None

offload_param_and_grad(offload_grad=False)#

Offload FSDP parameters and, optionally, gradients to CPU.

Parameters:

offload_grad (bool) – whether to offload gradients.

Return type:

None

load_param_and_grad(device_id, load_grad=False)#

Load FSDP parameters and, optionally, gradients to the specified device.

Parameters:
  • device_id (int) – the target device id to load parameters and gradients.

  • load_grad (bool) – whether to load gradients.

Return type:

None

offload_optimizer()#

Offload optimizer states to CPU.

Return type:

None

load_optimizer(device_id)#

Load optimizer states to the specified device.

Parameters:

device_id (int) – the target device id to load optimizer states.

Return type:

None
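The four offload and load methods above pair up naturally: state is loaded onto the device for a training phase and released afterwards. One way to keep the pairs balanced is a small context manager, sketched below. `ManagerStub` and `onloaded` are hypothetical names introduced for illustration. The stub only records calls using the method signatures documented above, and the ordering of calls is an assumption, not something RLinf prescribes.

```python
from contextlib import contextmanager


class ManagerStub:
    """Records calls; stands in for an object exposing the four methods above."""

    def __init__(self):
        self.calls = []

    def load_param_and_grad(self, device_id, load_grad=False):
        self.calls.append(("load_param_and_grad", device_id, load_grad))

    def load_optimizer(self, device_id):
        self.calls.append(("load_optimizer", device_id))

    def offload_param_and_grad(self, offload_grad=False):
        self.calls.append(("offload_param_and_grad", offload_grad))

    def offload_optimizer(self):
        self.calls.append(("offload_optimizer",))


@contextmanager
def onloaded(manager, device_id, with_grad=False):
    """Hypothetical helper: keep state on the device only inside the block."""
    manager.load_param_and_grad(device_id, load_grad=with_grad)
    manager.load_optimizer(device_id)
    try:
        yield manager
    finally:
        # Always release device memory, even if the training step raises.
        manager.offload_param_and_grad(offload_grad=with_grad)
        manager.offload_optimizer()


manager = ManagerStub()
with onloaded(manager, device_id=0):
    manager.calls.append(("train_step",))
call_names = [call[0] for call in manager.calls]
```

The `try`/`finally` ensures the offload half runs on every exit path, which matters when device memory is shared with other workers.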

optimizer_step()#

Perform optimizer step using its optimizer, lr_scheduler and grad_scaler.

Returns:

A tuple of (grad_norm, lr_list), where lr_list contains the learning rates for all param groups.

Return type:

tuple[float, list[float]]

build_lr_scheduler(optimizer, optim_config)#

Build the learning rate scheduler based on the configuration. Currently, only the LambdaLR scheduler is supported, with various warmup styles.

Parameters:
  • optimizer (Optimizer) – The optimizer for which to schedule the learning rate.

  • optim_config (DictConfig) – The optimizer config.

Returns:

The learning rate scheduler.

Return type:

LRScheduler
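A LambdaLR-style scheduler is driven by a function that maps the step index to a multiplier on the base learning rate. The sketch below shows one possible warmup style, a linear ramp. The name `linear_warmup_lambda` and the exact ramp formula are assumptions made for illustration. The warmup styles that RLinf actually supports are defined by its optimizer config and are not reproduced here.

```python
def linear_warmup_lambda(warmup_steps: int):
    """Return a step -> multiplier function (hypothetical warmup style)."""

    def lr_lambda(step: int) -> float:
        if warmup_steps <= 0:
            # No warmup requested: use the base learning rate from the start.
            return 1.0
        # Ramp linearly up to 1.0, then hold the base learning rate.
        return min(1.0, (step + 1) / warmup_steps)

    return lr_lambda


lr_lambda = linear_warmup_lambda(warmup_steps=4)
multipliers = [lr_lambda(step) for step in range(6)]
```

In PyTorch, a function of this shape is what `torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)` accepts as `lr_lambda`; the scheduler multiplies each param group's initial learning rate by the returned factor.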

build_optimizer(model, enable_critic_warmup=False)#

Build the optimizer based on the configuration. Currently, only the Adam optimizer is supported.

Parameters:
  • model (Module | FSDPModule | FullyShardedDataParallel) – The model to optimize, can be nn.Module, FSDPModule (used in FSDP2) or FSDP.

  • enable_critic_warmup (bool) – Whether to enable critic warmup used for value network.

Returns:

The constructed optimizer.

Return type:

Optimizer

build_grad_scaler(enabled, **kwargs)#

Build the gradient scaler based on the configuration.

Parameters:
  • enabled (bool) – Whether to enable gradient scaling.

  • kwargs – Optional parameters for ShardedGradScaler.

Returns:

The gradient scaler.

Return type:

ShardedGradScaler

before_micro_batch(model, is_last_micro_batch)#

Set up the context manager used before processing a micro-batch, which controls gradient synchronization. The behavior depends on the FSDP strategy in use. With FSDP, it returns model.no_sync() for every micro-batch except the last to skip gradient synchronization, and nullcontext() for the last micro-batch so that gradients are synchronized and updated. With FSDP2, it sets the requires_gradient_sync flag on the model accordingly.

Parameters:
  • model (FullyShardedDataParallel | FSDPModule) – The FSDP or FSDPModule model.

  • is_last_micro_batch (bool) – A boolean indicating if this is the last micro-batch.

Returns:

A context manager for the micro-batch processing.

Return type:

ContextManager
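The FSDP branch described above follows the usual gradient-accumulation pattern: suppress synchronization on every micro-batch except the last. The sketch below reproduces that control flow with a stand-in model so it runs without a distributed setup. `FSDP1Stub` and `micro_batch_context` are hypothetical names, the stub's `backward` method exists only to record the sync state, and the FSDP2 branch is not modeled.

```python
from contextlib import contextmanager, nullcontext


class FSDP1Stub:
    """Stand-in for an FSDP-wrapped model; records when sync is suppressed."""

    def __init__(self):
        self.sync_log = []
        self._sync = True

    @contextmanager
    def no_sync(self):
        self._sync = False
        try:
            yield
        finally:
            self._sync = True

    def backward(self):
        # Record whether this backward pass would synchronize gradients.
        self.sync_log.append(self._sync)


def micro_batch_context(model, is_last_micro_batch):
    """Hypothetical helper mirroring the FSDP branch described above."""
    return nullcontext() if is_last_micro_batch else model.no_sync()


model = FSDP1Stub()
num_micro_batches = 3
for index in range(num_micro_batches):
    is_last = index == num_micro_batches - 1
    with micro_batch_context(model, is_last):
        model.backward()
sync_log = model.sync_log
```

Only the final backward pass runs with synchronization enabled, so the accumulated gradients are reduced once per optimizer step rather than once per micro-batch.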