Actor 接口#

本节介绍 RLinf 框架中 Actor 类的关键 API。 其实现包括基于 MegatronFSDP 两种后端。

此外,还提供了关于 ModelManager 的信息。 ModelManager 作为 Actor 类的父类,负责管理底层模型,并提供参数加载 / 卸载等关键 API。

MegatronActor#

class rlinf.workers.actor.megatron_actor_worker.MegatronActor#

基类:MegatronWorker

__init__(cfg, placement, role='actor')#

Initialize the MegatronWorker.

参数:
get_forward_step_func()#

Acquire the forward step function for the model.

sync_model_to_rollout()#

Sync the model's full state dict to the rollout worker.

MegatronModelManager#

class rlinf.hybrid_engines.megatron.megatron_model_manager.MegatronModelManager#

Megatron Model Manager for RL training

__init__(cfg)#
参数:

cfg (DictConfig)

setup_model_and_optimizer(model_type=megatron.core.enums.ModelType.encoder_or_decoder)#

Setup model and optimizer.

model_provider_func(pre_process, post_process)#

Model depends on pipeline paralellism.

make_data_iterator_list(data_iterator, padding=False, vpp_size=1)#

Convert the data iterator into the format expected by Megatron. With interleaved pipeline parallelism, Megatron expects a list of one data iterator per model chunk.

参数:
  • data_iterator (Iterator)

  • padding (bool)

  • vpp_size (int)

返回类型:

list[Iterator]

static custom_forward(model, input_ids, attention_mask, position_ids, sequence_parallel, value_model=False, pack_seqs=True, logits_processor=None, logits_processor_args=None, temperature=1.0, max_batch_seqlen=4096, padding_seqlen=None, keep_left_padding=False, **model_forward_kwargs)#

Default forward pass for GPT models with optional sequence packing.

参数:
  • logits_processor_args (dict | None)

  • temperature (float)

  • max_batch_seqlen (int)

  • padding_seqlen (int | None)

  • keep_left_padding (bool)

offload_megatron_copy_params(optimizers)#

Offload optimizer parameters to CPU. Supports both Megatron optimizers and ChainedOptimizer, which wraps a list of underlying optimizers.

参数:

optimizers -- The optimizer or ChainedOptimizer instance.

load_megatron_copy_params(optimizers)#

Load optimizer parameters back to GPU. Handles ChainedOptimizer.

参数:

optimizers -- Optimizer or ChainedOptimizer instance.

FSDPActor#

class rlinf.workers.actor.fsdp_actor_worker.EmbodiedFSDPActor#

基类:FSDPModelManager, Worker

__init__(cfg)#

Initialize FSDP Model Manager.

参数:
  • cfg (DictConfig) -- actor config in yaml file.

  • world_size -- total number of FSDP actor processes.

init_worker()#

Initialize the actor worker. build the model and use corresponding training backend, if needed, offload model parameters and optimizer states to CPU.

返回类型:

None

model_provider_func()#

Initialize model used by FSDP actor

返回:

the initialized model.

返回类型:

model

async recv_rollout_trajectories(input_channel)#

Receive rollout trajectories from rollout workers.

参数:

input_channel (Channel) -- The input channel to read from.

返回类型:

None

compute_advantages_and_returns()#

Compute the advantages and returns.

返回类型:

dict[str, Tensor]

run_training()#

Run the training process using the received rollout batch.

返回类型:

None

set_global_step(global_step)#

Set the global step for the model, if needed.

参数:

global_step (int)

返回类型:

None

FSDPModelManager#

class rlinf.hybrid_engines.fsdp.fsdp_model_manager.FSDPModelManager#

FSDP Model Manager for RL training

__init__(cfg, world_size, rank)#

Initialize FSDP Model Manager.

参数:
  • cfg (DictConfig) -- actor config in yaml file.

  • world_size (int) -- total number of FSDP actor processes.

  • rank (int)

返回类型:

None

model_provider_func()#

Initialize model used by FSDP actor

返回:

the initialized model.

返回类型:

model

setup_model_and_optimizer()#

Setup model, lr_scheduler, optimizer and grad_scaler.

返回类型:

None

get_model_state_dict(cpu_offload, full_state_dict)#

Get the model state dict according to the specified options.

参数:
  • cpu_offload (-) -- Whether returned state_dict's value will be offloaded to CPU If true, will be copied to CPU memory, or just keep a reference to the original GPU tensor.

  • full_state_dict (-) -- Whether to get the full state dict.

返回:

The state dict of the FSDP wrapped model according to the specified options

返回类型:

  • dict

load_checkpoint(load_path)#

Load checkpoint from local path.

参数:

load_path (str) -- the directory to load checkpoint.

返回类型:

None

save_checkpoint(save_path, step=0)#

Save checkpoint to local path. Every rank will save its own model and optim shard.

参数:
  • save_path (str) -- the directory to save checkpoint.

  • step (int)

返回类型:

None

offload_param_and_grad(offload_grad=False)#

Offload FSDP parameters and gradients(options) to CPU.

参数:

offload_grad (bool) -- whether to offload gradients.

返回类型:

None

load_param_and_grad(device_id, load_grad=False)#

Load FSDP parameters and gradients(options) to the specified device.

参数:
  • device_id (int) -- the target device id to load parameters and gradients.

  • load_grad (bool) -- whether to load gradients.

返回类型:

None

offload_optimizer()#

Offload optimizer states to CPU.

返回类型:

None

load_optimizer(device_id)#

Load optimizer states to the specified device.

参数:

device_id (int) -- the target device id to load optimizer states.

返回类型:

None

optimizer_step()#

Perform optimizer step using its optimizer, lr_scheduler and grad_scaler.

返回:

A tuple of (grad_norm, lr_list), lr_list contains learning rates for all param groups.

返回类型:

tuple[float, list[float]]

build_lr_scheduler(optimizer, optim_config)#

Build the learning rate scheduler based on the configuration. Currently only support LambdaLR scheduler with various warmup styles.

参数:
  • optimizer (Optimizer) -- The optimizer for which to schedule the learning rate.

  • optim_config (DictConfig) -- The optimizer config.

返回:

The learning rate scheduler.

返回类型:

LRScheduler

build_optimizer(model, enable_critic_warmup=False)#

Build the optimizer based on the configuration, currently only support Adam optimizer.

参数:
  • model (Module | FSDPModule | FullyShardedDataParallel) -- The model to optimize, can be nn.Module, FSDPModule (used in FSDP2) or FSDP.

  • enable_critic_warmup (bool) -- Whether to enable critic warmup used for value network.

返回:

The constructed optimizer.

返回类型:

Optimizer

build_grad_scaler(enabled, **kwargs)#

Build the gradient scaler based on the configuration.

参数:
  • enabled (bool) -- Whether to enable gradient scaling.

  • kwargs -- Optional parameters for ShardedGradScaler.

返回:

The gradient scaler.

返回类型:

ShardedGradScaler

before_micro_batch(model, is_last_micro_batch)#

Setup context manager before processing a micro-batch. This is used to control gradient synchronization behavior. Depending on the specific FSDP strategy being used, if using FSDP, it will return model.no_sync() for non-last micro-batches to avoid gradient synchronization, and nullcontext() for the last micro-batch to ensure gradients are synchronized and updated. If using FSDP2, it will set requires_gradient_sync flag on the model accordingly.

参数:
  • model (FullyShardedDataParallel | FSDPModule) -- The FSDP or FSDPModule model.

  • is_last_micro_batch (bool) -- A boolean indicating if this is the last micro-batch.

返回:

A context manager for the micro-batch processing.

返回类型:

ContextManager