Actor Interface#

This section describes the key APIs of the Actor classes in the RLinf framework. Implementations are provided for two backends, Megatron and FSDP.

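It also covers the ModelManager classes. As the parent class of each Actor class, a ModelManager manages the underlying model and provides key APIs such as parameter loading and offloading.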

MegatronActor#

class rlinf.workers.actor.megatron_actor_worker.MegatronActor#

Bases: MegatronModelManager, Worker

The class for running the actor training using Megatron.

__init__(cfg, placement, role='actor')#

Initialize the MegatronActor.

Parameters:
  • cfg

  • placement

  • role (str) -- Defaults to 'actor'.

get_forward_step_func()#

Acquire the forward step function for the model.

run_forward_backward(batch, forward_only=True)#

Run the forward and backward pass on the model.

Parameters:
  • batch (Dict[str, torch.Tensor]) -- The input batch for the forward pass.

  • forward_only (bool) -- If True, only run the forward pass without backpropagation.

run_forward_backward_iterator(batch_iterator, forward_only=False)#

Run the forward and backward pass on the model using a batch resizing iterator.

This function is solely intended for the training step of pipeline mode, which mixes RL pipeline with training pipeline parallelism. So this function enforces forward_only to be false.

Parameters:
  • batch_iterator (Iterator) -- The input batch iterator for the forward pass.

  • forward_only (bool) -- Must be False, since this function is intended only for the training step.

training_step(batch)#

Run a single training step on the model.

Parameters:

batch (Dict[str, torch.Tensor] | BatchResizingIterator) -- The input batch containing the data for the forward pass.

run_training(input_channel)#

Run the training loop for the actor.

Parameters:

input_channel (Channel)

run_training_pipeline(input_channel)#

Run the training loop for the actor in pipeline mode.

Parameters:

input_channel (Channel)

scheduler_pre_process()#

Wait for the scheduler to send the pre-process response.

scheduler_scale_sync()#

Get a resharding response from the scheduler and apply this resharding response if it's not None.

scheduler_offload_sync()#

Send offloaded signal to the scheduler.

init_trainer_resharding(first_world_size=-1)#

Initialize the resharding function.

Parameters:

first_world_size (int)

apply_parallel_strategy(parallel_strategy)#

Apply the specified training parallel strategy.

get_inference_weight_dst_ranks(inference_tp, inference_pp)#

Calculate the list of ranks corresponding to the first complete inference model parallel group after resharding.

Returns:

List of ranks for the first complete inference model parallel group after resharding
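
As a rough illustration of what "the first complete inference model parallel group" could look like, the sketch below assumes that ranks are numbered contiguously and that one model replica spans inference_tp × inference_pp consecutive ranks starting at 0. The helper name first_model_parallel_group and the contiguous layout are assumptions made for this example; they are not taken from RLinf's implementation.

```python
from typing import List


def first_model_parallel_group(inference_tp: int, inference_pp: int) -> List[int]:
    """Hypothetical helper: ranks of the first full TP x PP group.

    Assumes a contiguous rank layout, so one complete model replica
    occupies inference_tp * inference_pp consecutive ranks starting at 0.
    """
    if inference_tp < 1 or inference_pp < 1:
        raise ValueError("parallel sizes must be positive")
    return list(range(inference_tp * inference_pp))


print(first_model_parallel_group(2, 2))  # [0, 1, 2, 3]
```

Under these assumptions, only the ranks in this list would need to receive a full copy of the weights for one inference replica.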

run_inference(input_channel, output_channel, compute_ref_logprobs)#

Compute prev/ref logprobs using the actor model's forward pass.

Parameters:
  • input_channel (Channel) -- The input channel to read from.

  • output_channel (Channel) -- The output channel to send results to.

  • compute_ref_logprobs (bool) -- Whether to compute reference logprobs.

compute_advantages_and_returns(batch)#

Compute the advantages and returns.

Parameters:

batch (Dict[str, torch.Tensor]) -- The rollout batch.

sync_model_to_rollout()#

Send the model weights to the destination ranks in the rollout task.

get_model_state_and_offload()#

Get the rollout model state dict and offload the actor model.

When in COLLOCATED mode or when use_pre_process_policy is True, first offload the optimizer and gradients. Then call _get_rollout_model_state_dict(), and finally offload the model weights.

MegatronModelManager#

class rlinf.hybrid_engines.megatron.megatron_model_manager.MegatronModelManager#

Megatron Model Manager for RL training

__init__(cfg)#
Parameters:

cfg (DictConfig)

setup_model_and_optimizer(model_type=megatron.core.enums.ModelType.encoder_or_decoder)#

Setup model and optimizer.

model_provider_func(pre_process, post_process)#

Provide the model. The model that is built depends on pipeline parallelism.

make_data_iterator_list(data_iterator, padding=False, vpp_size=1)#

Convert the data iterator into the format expected by Megatron. With interleaved pipeline parallelism, Megatron expects a list of one data iterator per model chunk.

Parameters:
  • data_iterator (Iterator)

  • padding (bool)

  • vpp_size (int)

Return type:

list[Iterator]
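
The "one data iterator per model chunk" requirement can be sketched with the standard library. The example below assumes every chunk should see the same stream of micro-batches and uses itertools.tee to replay it; the function name make_iterator_list is hypothetical, the padding argument is left out because its semantics are not described here, and the real method may be implemented differently.

```python
import itertools
from typing import Iterator, List


def make_iterator_list(data_iterator: Iterator, vpp_size: int = 1) -> List[Iterator]:
    """Hypothetical stand-in: return one iterator per model chunk.

    itertools.tee buffers items, so every returned iterator replays the
    same stream independently of the others.
    """
    if vpp_size < 1:
        raise ValueError("vpp_size must be at least 1")
    return list(itertools.tee(data_iterator, vpp_size))


chunks = make_iterator_list(iter(["mb0", "mb1"]), vpp_size=2)
print([list(it) for it in chunks])  # [['mb0', 'mb1'], ['mb0', 'mb1']]
```

After the call, the original data_iterator should no longer be consumed directly, because tee would then miss the items taken from it.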

static custom_forward(model, input_ids, attention_mask, position_ids, sequence_parallel, value_model=False, pack_seqs=True, logits_processor=None, logits_processor_args=None, temperature=1.0, max_batch_seqlen=4096)#

Default forward pass for GPT models with optional sequence packing.

Parameters:
  • logits_processor_args (dict | None)

  • temperature (float)

  • max_batch_seqlen (int)

offload_megatron_copy_params(optimizers)#

Offload optimizer parameters to CPU. Supports both Megatron optimizers and ChainedOptimizer, which wraps a list of underlying optimizers.

Parameters:

optimizers -- The optimizer or ChainedOptimizer instance.

load_megatron_copy_params(optimizers)#

Load optimizer parameters back to GPU. Handles ChainedOptimizer.

Parameters:

optimizers -- Optimizer or ChainedOptimizer instance.
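
Handling both a single optimizer and a ChainedOptimizer usually comes down to flattening the input into a list first. The sketch below uses toy stand-in classes and assumes the wrapper exposes its wrapped optimizers through a chained_optimizers attribute; the helper names and the device field are hypothetical and exist only for this illustration.

```python
from typing import Any, List


def unwrap_optimizers(optimizers: Any) -> List[Any]:
    """Return the underlying optimizers as a flat list.

    Assumption: a chained optimizer exposes its wrapped optimizers through
    a `chained_optimizers` attribute; anything else is a single optimizer.
    """
    return list(getattr(optimizers, "chained_optimizers", [optimizers]))


class ToyOptimizer:
    """Stand-in for a single optimizer; records where its params live."""

    def __init__(self) -> None:
        self.device = "cuda"


class ToyChainedOptimizer:
    """Stand-in for a wrapper around several optimizers."""

    def __init__(self, chained_optimizers: List[ToyOptimizer]) -> None:
        self.chained_optimizers = chained_optimizers


def offload_all(optimizers: Any) -> None:
    # Visit every underlying optimizer, whether or not it was chained.
    for opt in unwrap_optimizers(optimizers):
        opt.device = "cpu"


chained = ToyChainedOptimizer([ToyOptimizer(), ToyOptimizer()])
offload_all(chained)
print([opt.device for opt in chained.chained_optimizers])  # ['cpu', 'cpu']
```

The same flattening step would serve the load direction as well, with the loop moving state back to the GPU instead.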

FSDPActor#

class rlinf.workers.actor.fsdp_actor_worker.EmbodiedFSDPActor#

Bases: FSDPModelManager, Worker

__init__(cfg)#

Initialize FSDP Model Manager.

Assumes:
  • torch.distributed has been initialized outside before calling this constructor.

  • all cfg parameters are validated in valid_fsdp_config.

Parameters:

cfg (DictConfig) -- Actor config from the YAML file.

model_provider_func()#

Initialize the model used by the FSDP actor.

Returns:

The initialized model.

Return type:

model

async recv_rollout_batch()#

Receive rollout batch from rollout workers.

Return type:

None

FSDPModelManager#

class rlinf.hybrid_engines.fsdp.fsdp_model_manager.FSDPModelManager#

FSDP Model Manager for RL training

__init__(cfg, world_size, rank)#

Initialize FSDP Model Manager.

Assumes:
  • torch.distributed has been initialized outside before calling this constructor.

  • all cfg parameters are validated in valid_fsdp_config.

Parameters:
  • cfg (DictConfig) -- Actor config from the YAML file.

  • world_size (int) -- Total number of FSDP actor processes.

  • rank (int)

Return type:

None

model_provider_func()#

Initialize the model used by the FSDP actor.

Returns:

The initialized model.

Return type:

model

setup_model_and_optimizer()#

Setup model, lr_scheduler, optimizer and grad_scaler.

Return type:

None

get_model_state_dict()#

Get full model state dict.

Return type:

dict

load_checkpoint(load_path)#

Load a checkpoint from a local path.

Parameters:

load_path (str) -- The directory to load the checkpoint from.

Return type:

None

save_checkpoint(save_path)#

Save a checkpoint to a local path. Every rank saves its own model and optimizer shard.

Parameters:

save_path (str) -- The directory to save the checkpoint to.

Return type:

None

offload_param_and_grad(offload_grad=False)#

Offload FSDP parameters and, optionally, gradients to CPU.

Parameters:

offload_grad (bool) -- Whether to offload gradients.

Return type:

None

load_param_and_grad(device_id, load_grad=False)#

Load FSDP parameters and, optionally, gradients to the specified device.

Parameters:
  • device_id (int) -- The target device id to load parameters and gradients to.

  • load_grad (bool) -- Whether to load gradients.

Return type:

None

offload_optimizer()#

Offload optimizer states to CPU.

Return type:

None

load_optimizer(device_id)#

Load optimizer states to the specified device.

Parameters:

device_id (int) -- The target device id to load optimizer states to.

Return type:

None

optimizer_step()#

Perform optimizer step using its optimizer, lr_scheduler and grad_scaler.

Returns:

A tuple (grad_norm, lr_list), where lr_list contains the learning rates of all param groups.

Return type:

tuple[float, list[float]]

build_lr_scheduler(optimizer)#

Build the learning rate scheduler based on the configuration. Currently, only the LambdaLR scheduler is supported, with various warmup styles.

Parameters:

optimizer (Optimizer) -- The optimizer for which to schedule the learning rate.

Returns:

The learning rate scheduler.

Return type:

LRScheduler
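
A LambdaLR scheduler multiplies the base learning rate by the value of a user-supplied function of the step index, so a warmup style reduces to choosing that function. The sketch below shows one possible style, linear warmup followed by a constant rate. The function name make_linear_warmup_lambda and the exact warmup shape are assumptions for illustration, not the styles RLinf actually ships.

```python
from typing import Callable


def make_linear_warmup_lambda(warmup_steps: int) -> Callable[[int], float]:
    """Return a multiplier function suitable for a LambdaLR-style scheduler.

    The multiplier rises linearly from 1/warmup_steps to 1.0 over
    warmup_steps steps and then stays at 1.0.
    """

    def lr_lambda(step: int) -> float:
        if warmup_steps <= 0:
            # No warmup requested: always use the full base learning rate.
            return 1.0
        return min(1.0, (step + 1) / warmup_steps)

    return lr_lambda


lr_lambda = make_linear_warmup_lambda(warmup_steps=4)
print([lr_lambda(s) for s in range(6)])  # [0.25, 0.5, 0.75, 1.0, 1.0, 1.0]
```

With PyTorch installed, such a function can be passed as torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=lr_lambda).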

build_optimizer(model, enable_critic_warmup=False)#

Build the optimizer based on the configuration. Currently, only the Adam optimizer is supported.

Parameters:
  • model (Module | FSDPModule | FullyShardedDataParallel) -- The model to optimize, can be nn.Module, FSDPModule (used in FSDP2) or FSDP.

  • enable_critic_warmup (bool) -- Whether to enable critic warmup used for value network.

Returns:

The constructed optimizer.

Return type:

Optimizer

build_grad_scaler(enabled)#

Build the gradient scaler based on the configuration.

Parameters:

enabled (bool) -- Whether to enable gradient scaling.

Returns:

The gradient scaler.

Return type:

GradScaler

before_micro_batch(model, is_last_micro_batch)#

Set up the context manager used before processing a micro-batch. It controls gradient synchronization, and its behavior depends on the FSDP strategy in use. With FSDP, it returns model.no_sync() for every micro-batch except the last, which avoids gradient synchronization, and nullcontext() for the last micro-batch, which ensures gradients are synchronized and updated. With FSDP2, it sets the requires_gradient_sync flag on the model accordingly.

Parameters:
  • model (FullyShardedDataParallel | FSDPModule) -- The FSDP or FSDPModule model.

  • is_last_micro_batch (bool) -- A boolean indicating if this is the last micro-batch.

Returns:

A context manager for the micro-batch processing.

Return type:

ContextManager
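
The FSDP branch of this behavior can be sketched without any distributed setup. The example below uses a toy stand-in model whose no_sync() merely toggles a flag, and a hypothetical helper micro_batch_context that mirrors the described choice between no_sync() and nullcontext(). It illustrates the gradient accumulation pattern only and is not RLinf's implementation.

```python
from contextlib import contextmanager, nullcontext


class ToyShardedModel:
    """Stand-in for an FSDP-wrapped model; counts gradient syncs."""

    def __init__(self) -> None:
        self.sync_enabled = True
        self.sync_count = 0

    @contextmanager
    def no_sync(self):
        # Temporarily disable gradient synchronization inside the block.
        previous = self.sync_enabled
        self.sync_enabled = False
        try:
            yield
        finally:
            self.sync_enabled = previous

    def backward(self) -> None:
        # A real backward pass would reduce gradients across ranks here.
        if self.sync_enabled:
            self.sync_count += 1


def micro_batch_context(model: ToyShardedModel, is_last_micro_batch: bool):
    """Skip synchronization for every micro-batch except the last."""
    return nullcontext() if is_last_micro_batch else model.no_sync()


model = ToyShardedModel()
num_micro_batches = 4
for i in range(num_micro_batches):
    is_last = i == num_micro_batches - 1
    with micro_batch_context(model, is_last_micro_batch=is_last):
        model.backward()

print(model.sync_count)  # 1
```

Four micro-batches are accumulated, but synchronization happens only once, on the last one, which is the communication saving this hook is meant to provide.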