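Actor Interface#
This section describes the key APIs of the Actor class in the RLinf framework. The class has two implementations, one for the Megatron backend and one for the FSDP backend.
The section also covers ModelManager.
ModelManager is the parent class of the Actor class. It manages the underlying model and provides key APIs such as parameter loading and offloading.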
MegatronActor#
- class rlinf.workers.actor.megatron_actor_worker.MegatronActor#
Bases: MegatronWorker
- __init__(cfg, placement, role='actor')#
Initialize the MegatronWorker.
- Parameters:
cfg (DictConfig) -- The configuration for the actor.
placement (ModelParallelComponentPlacement)
- get_forward_step_func()#
Acquire the forward step function for the model.
- sync_model_to_rollout()#
Sync the model's full state dict to the rollout worker.
MegatronModelManager#
- class rlinf.hybrid_engines.megatron.megatron_model_manager.MegatronModelManager#
Megatron Model Manager for RL training
- __init__(cfg)#
- Parameters:
cfg (DictConfig)
- setup_model_and_optimizer(model_type=megatron.core.enums.ModelType.encoder_or_decoder)#
Setup model and optimizer.
- model_provider_func(pre_process, post_process)#
The model built depends on pipeline parallelism.
- make_data_iterator_list(data_iterator, padding=False, vpp_size=1)#
Convert the data iterator into the format expected by Megatron. With interleaved pipeline parallelism, Megatron expects a list of one data iterator per model chunk.
- Parameters:
data_iterator (Iterator)
padding (bool)
vpp_size (int)
- Return type:
list[Iterator]
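As a rough illustration of the "one data iterator per model chunk" shape described above, the sketch below duplicates a single iterator with `itertools.tee` so that each of `vpp_size` chunks sees the same micro-batches. This is only one plausible approach and not necessarily how RLinf does it. The helper name `make_iterator_list_sketch` is hypothetical, the single-element list returned for `vpp_size=1` is an assumption, and the `padding` option is not modeled.

```python
import itertools
from typing import Any, Iterator, List


def make_iterator_list_sketch(
    data_iterator: Iterator[Any], vpp_size: int = 1
) -> List[Iterator[Any]]:
    """Hypothetical helper: return one iterator per model chunk."""
    if vpp_size <= 1:
        # Assumption: without interleaving, a single-element list is enough.
        return [data_iterator]
    # tee() yields independent iterators that replay the same items.
    return list(itertools.tee(data_iterator, vpp_size))


micro_batches = iter([{"step": 0}, {"step": 1}])
chunks = make_iterator_list_sketch(micro_batches, vpp_size=2)
first = list(chunks[0])
second = list(chunks[1])
```

Both `first` and `second` contain the same two micro-batches, which is the property interleaved pipeline schedules rely on.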
- static custom_forward(model, input_ids, attention_mask, position_ids, sequence_parallel, value_model=False, pack_seqs=True, logits_processor=None, logits_processor_args=None, temperature=1.0, max_batch_seqlen=4096, padding_seqlen=None, keep_left_padding=False, **model_forward_kwargs)#
Default forward pass for GPT models with optional sequence packing.
- Parameters:
logits_processor_args (dict | None)
temperature (float)
max_batch_seqlen (int)
padding_seqlen (int | None)
keep_left_padding (bool)
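To make "sequence packing" concrete, the sketch below shows the general idea in plain Python: padding tokens are dropped, the remaining tokens are concatenated into one sequence, and cumulative sequence lengths mark the boundaries. This illustrates the technique in general and is not taken from `custom_forward`. The function name `pack_sequences_sketch` and the list-based inputs are assumptions made for readability.

```python
from itertools import accumulate
from typing import List, Tuple


def pack_sequences_sketch(
    input_ids: List[List[int]], attention_mask: List[List[int]]
) -> Tuple[List[int], List[int]]:
    """Hypothetical helper: drop padded positions and concatenate sequences."""
    packed: List[int] = []
    lengths: List[int] = []
    for ids, mask in zip(input_ids, attention_mask):
        kept = [tok for tok, keep in zip(ids, mask) if keep]
        packed.extend(kept)
        lengths.append(len(kept))
    # cu_seqlens[i] is the offset where sequence i starts in `packed`.
    cu_seqlens = [0] + list(accumulate(lengths))
    return packed, cu_seqlens


# One left-padded and one right-padded sequence (0 is the pad token here).
packed, cu_seqlens = pack_sequences_sketch(
    input_ids=[[0, 0, 5, 6], [7, 8, 9, 0]],
    attention_mask=[[0, 0, 1, 1], [1, 1, 1, 0]],
)
```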
- offload_megatron_copy_params(optimizers)#
Offload optimizer parameters to CPU. Supports both Megatron optimizers and ChainedOptimizer, which wraps a list of underlying optimizers.
- Parameters:
optimizers -- The optimizer or ChainedOptimizer instance.
- load_megatron_copy_params(optimizers)#
Load optimizer parameters back to GPU. Handles ChainedOptimizer.
- Parameters:
optimizers -- Optimizer or ChainedOptimizer instance.
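Since both methods accept either a single optimizer or a ChainedOptimizer that wraps a list, a caller-side helper that normalizes the argument into a flat list can be sketched as follows. The classes are stand-ins, not Megatron types. The attribute name `chained_optimizers` is an assumption modeled on Megatron-Core, and `unwrap_optimizers` is a hypothetical name.

```python
from typing import Any, List


class FakeOptimizer:
    """Stand-in for a single Megatron optimizer."""

    def __init__(self, name: str):
        self.name = name


class FakeChainedOptimizer:
    """Stand-in for a wrapper exposing its children as `chained_optimizers`."""

    def __init__(self, chained_optimizers: List[Any]):
        self.chained_optimizers = list(chained_optimizers)


def unwrap_optimizers(optimizers: Any) -> List[Any]:
    """Hypothetical helper: return the underlying optimizers as a flat list."""
    children = getattr(optimizers, "chained_optimizers", None)
    if children is None:
        return [optimizers]
    flat: List[Any] = []
    for child in children:
        # Recurse in case a wrapper contains another wrapper.
        flat.extend(unwrap_optimizers(child))
    return flat


single = FakeOptimizer("dense")
chained = FakeChainedOptimizer([FakeOptimizer("dense"), FakeOptimizer("moe")])
names_single = [opt.name for opt in unwrap_optimizers(single)]
names_chained = [opt.name for opt in unwrap_optimizers(chained)]
```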
FSDPActor#
- class rlinf.workers.actor.fsdp_actor_worker.EmbodiedFSDPActor#
- __init__(cfg)#
Initialize FSDP Model Manager.
- Parameters:
cfg (DictConfig) -- actor config in yaml file.
world_size -- total number of FSDP actor processes.
- init_worker()#
Initialize the actor worker. Build the model with the corresponding training backend and, if needed, offload model parameters and optimizer states to CPU.
- Return type:
None
- model_provider_func()#
Initialize model used by FSDP actor
- Returns:
the initialized model.
- Return type:
model
- async recv_rollout_trajectories(input_channel)#
Receive rollout trajectories from rollout workers.
- Parameters:
input_channel (Channel) -- The input channel to read from.
- Return type:
None
- compute_advantages_and_returns()#
Compute the advantages and returns.
- Return type:
dict[str, Tensor]
- run_training()#
Run the training process using the received rollout batch.
- Return type:
None
- set_global_step(global_step)#
Set the global step for the model, if needed.
- Parameters:
global_step (int)
- Return type:
None
FSDPModelManager#
- class rlinf.hybrid_engines.fsdp.fsdp_model_manager.FSDPModelManager#
FSDP Model Manager for RL training
- __init__(cfg, world_size, rank)#
Initialize FSDP Model Manager.
- Parameters:
cfg (DictConfig) -- actor config in yaml file.
world_size (int) -- total number of FSDP actor processes.
rank (int)
- Return type:
None
- model_provider_func()#
Initialize model used by FSDP actor
- Returns:
the initialized model.
- Return type:
model
- setup_model_and_optimizer()#
Setup model, lr_scheduler, optimizer and grad_scaler.
- Return type:
None
- get_model_state_dict(cpu_offload, full_state_dict)#
Get the model state dict according to the specified options.
- Parameters:
cpu_offload -- Whether the values of the returned state dict are offloaded to CPU. If true, they are copied to CPU memory; otherwise the state dict keeps references to the original GPU tensors.
full_state_dict -- Whether to get the full state dict.
- Returns:
The state dict of the FSDP-wrapped model according to the specified options.
- Return type:
dict
- load_checkpoint(load_path)#
Load checkpoint from local path.
- Parameters:
load_path (str) -- the directory to load the checkpoint from.
- Return type:
None
- save_checkpoint(save_path, step=0)#
Save checkpoint to local path. Every rank saves its own model and optimizer shard.
- Parameters:
save_path (str) -- the directory to save the checkpoint to.
step (int)
- Return type:
None
- offload_param_and_grad(offload_grad=False)#
Offload FSDP parameters and, optionally, gradients to CPU.
- Parameters:
offload_grad (bool) -- whether to offload gradients.
- Return type:
None
- load_param_and_grad(device_id, load_grad=False)#
Load FSDP parameters and, optionally, gradients to the specified device.
- Parameters:
device_id (int) -- the target device id to load parameters and gradients.
load_grad (bool) -- whether to load gradients.
- Return type:
None
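The two methods above are natural counterparts, so one way to pair them is a small context manager that loads on entry and offloads on exit. The sketch below shows only that calling pattern. `RecordingManager` is a stub that records calls instead of moving tensors, and `params_on_device` is a hypothetical helper that is not part of RLinf.

```python
from contextlib import contextmanager


class RecordingManager:
    """Stub with the two documented method names; it only records calls."""

    def __init__(self):
        self.calls = []

    def offload_param_and_grad(self, offload_grad=False):
        self.calls.append(("offload", offload_grad))

    def load_param_and_grad(self, device_id, load_grad=False):
        self.calls.append(("load", device_id, load_grad))


@contextmanager
def params_on_device(manager, device_id, with_grad=False):
    """Hypothetical helper: keep parameters on the device only inside the block."""
    manager.load_param_and_grad(device_id, load_grad=with_grad)
    try:
        yield manager
    finally:
        # Offload even if the body raises, so device memory is released.
        manager.offload_param_and_grad(offload_grad=with_grad)


manager = RecordingManager()
with params_on_device(manager, device_id=0, with_grad=True):
    manager.calls.append(("train",))
```

The `try`/`finally` is the main design point: an exception during training still triggers the offload.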
- offload_optimizer()#
Offload optimizer states to CPU.
- Return type:
None
- load_optimizer(device_id)#
Load optimizer states to the specified device.
- Parameters:
device_id (int) -- the target device id to load optimizer states.
- Return type:
None
- optimizer_step()#
Perform an optimizer step using the manager's optimizer, lr_scheduler and grad_scaler.
- Returns:
A tuple of (grad_norm, lr_list), where lr_list contains the learning rates of all param groups.
- Return type:
tuple[float, list[float]]
- build_lr_scheduler(optimizer, optim_config)#
Build the learning rate scheduler based on the configuration. Currently only the LambdaLR scheduler is supported, with various warmup styles.
- Parameters:
optimizer (Optimizer) -- The optimizer for which to schedule the learning rate.
optim_config (DictConfig) -- The optimizer config.
- Returns:
The learning rate scheduler.
- Return type:
LRScheduler
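A LambdaLR scheduler multiplies the base learning rate by the value of a user-supplied function of the step count, so a "warmup style" comes down to the shape of that function. The sketch below builds such a function in plain Python. The helper name, the parameter names and the two styles shown ("constant" and "cosine") are assumptions for illustration and are not read from RLinf's configuration schema.

```python
import math
from typing import Callable


def make_warmup_lambda(
    num_warmup_steps: int, num_training_steps: int, style: str = "constant"
) -> Callable[[int], float]:
    """Hypothetical helper: return a step -> LR multiplier function."""

    def lr_lambda(step: int) -> float:
        # Linear warmup from 0 to 1 over the first num_warmup_steps steps.
        if num_warmup_steps > 0 and step < num_warmup_steps:
            return step / num_warmup_steps
        if style == "constant":
            return 1.0
        if style == "cosine":
            remaining = max(1, num_training_steps - num_warmup_steps)
            progress = min(1.0, (step - num_warmup_steps) / remaining)
            return 0.5 * (1.0 + math.cos(math.pi * progress))
        raise ValueError(f"unknown warmup style: {style}")

    return lr_lambda


constant = make_warmup_lambda(num_warmup_steps=10, num_training_steps=110)
cosine = make_warmup_lambda(10, 110, style="cosine")
```

With PyTorch installed, the returned function can be passed as the `lr_lambda` argument of `torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)`.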
- build_optimizer(model, enable_critic_warmup=False)#
Build the optimizer based on the configuration. Currently only the Adam optimizer is supported.
- Parameters:
model (Module | FSDPModule | FullyShardedDataParallel) -- The model to optimize, can be nn.Module, FSDPModule (used in FSDP2) or FSDP.
enable_critic_warmup (bool) -- Whether to enable critic warmup used for value network.
- Returns:
The constructed optimizer.
- Return type:
Optimizer
- build_grad_scaler(enabled, **kwargs)#
Build the gradient scaler based on the configuration.
- Parameters:
enabled (bool) -- Whether to enable gradient scaling.
kwargs -- Optional parameters for ShardedGradScaler.
- Returns:
The gradient scaler.
- Return type:
ShardedGradScaler
- before_micro_batch(model, is_last_micro_batch)#
Set up the context manager used before processing a micro-batch, which controls gradient synchronization. The behavior depends on the FSDP strategy in use. With FSDP, it returns model.no_sync() for every micro-batch except the last, to skip gradient synchronization, and nullcontext() for the last micro-batch, so that gradients are synchronized and updated. With FSDP2, it sets the requires_gradient_sync flag on the model accordingly.
- Parameters:
model (FullyShardedDataParallel | FSDPModule) -- The FSDP or FSDPModule model.
is_last_micro_batch (bool) -- A boolean indicating if this is the last micro-batch.
- Returns:
A context manager for the micro-batch processing.
- Return type:
ContextManager
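The branching described above can be sketched with stub models, so the control flow is visible without a distributed setup. The stubs only mimic the method names `no_sync()` and `set_requires_gradient_sync()`. Detecting FSDP2 with `hasattr` and returning `nullcontext()` in the FSDP2 branch are assumptions of this sketch, not confirmed details of RLinf.

```python
from contextlib import contextmanager, nullcontext


class FakeFSDP1Model:
    """Stub exposing no_sync(), in the style of FullyShardedDataParallel."""

    def __init__(self):
        self.sync_skipped = 0

    @contextmanager
    def no_sync(self):
        self.sync_skipped += 1
        yield


class FakeFSDP2Model:
    """Stub exposing set_requires_gradient_sync(), in the style of FSDPModule."""

    def __init__(self):
        self.requires_gradient_sync = True

    def set_requires_gradient_sync(self, flag):
        self.requires_gradient_sync = flag


def before_micro_batch_sketch(model, is_last_micro_batch):
    """Hypothetical re-creation of the documented behavior."""
    if hasattr(model, "set_requires_gradient_sync"):
        # FSDP2 style: toggle a flag, nothing to enter or exit.
        model.set_requires_gradient_sync(is_last_micro_batch)
        return nullcontext()
    # FSDP style: skip synchronization on all but the last micro-batch.
    return nullcontext() if is_last_micro_batch else model.no_sync()


fsdp1, fsdp2 = FakeFSDP1Model(), FakeFSDP2Model()
sync_flags = []
num_micro_batches = 3
for i in range(num_micro_batches):
    is_last = i == num_micro_batches - 1
    with before_micro_batch_sketch(fsdp1, is_last):
        pass  # forward and backward would run here
    with before_micro_batch_sketch(fsdp2, is_last):
        sync_flags.append(fsdp2.requires_gradient_sync)
```

In both branches, gradients are synchronized only on the final micro-batch of an accumulation window.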