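Data Interface

This section describes the key data structures that RLinf uses to transfer data between Workers when the Megatron + SGLang backend combination is used. There are two basic structures: RolloutRequest and RolloutResult.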

RolloutRequest

class rlinf.data.io_struct.RolloutRequest

Attributes:
  • input_ids -- List of input token IDs for rollout.

  • n -- Number of completions to generate for each input.

  • image_data -- List of image data (bytes or URLs) for multimodal inputs.

  • answers -- List of answers for the requests, if available. Each answer is either a list of strings (for typical tasks) or a dict (for VQA tasks).

  • multi_modal_inputs -- List of multi-modal inputs for the requests.

to_seq_group_infos()

Convert the RolloutRequest into a list of SeqGroupInfo objects.

Returns:

A list of SeqGroupInfo objects.

Return type:

list[SeqGroupInfo]

__init__(n, input_ids, image_data, answers, multi_modal_inputs)
Parameters:
  • n (int)

  • input_ids (list[list[int]])

  • image_data (list[list[bytes]] | list[list[str]])

  • answers (list[list[str] | dict])

  • multi_modal_inputs (list[dict | None])

Return type:

None
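
As a rough illustration of how the constructor arguments line up, the sketch below uses a hypothetical stand-in dataclass, `RolloutRequestSketch`, that mirrors the documented signature. It is not the RLinf class. It assumes that every per-request list is indexed by prompt and that each prompt is sampled `n` times, so a request is expected to yield `n * len(input_ids)` sequences.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class RolloutRequestSketch:
    """Hypothetical stand-in mirroring the documented RolloutRequest fields."""

    n: int
    input_ids: list[list[int]]
    image_data: Union[list[list[bytes]], list[list[str]]]
    answers: list[Union[list[str], dict]]
    multi_modal_inputs: list[Optional[dict]]

    def expected_num_sequences(self) -> int:
        # Assumption: every prompt is sampled n times.
        return self.n * len(self.input_ids)


# Two text-only prompts, four completions each.
request = RolloutRequestSketch(
    n=4,
    input_ids=[[101, 2054, 2003], [101, 2129]],
    image_data=[[], []],
    answers=[["42"], ["7"]],
    multi_modal_inputs=[None, None],
)
total = request.expected_num_sequences()
```

The token IDs and answers above are made-up placeholder values.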

RolloutResult

class rlinf.data.io_struct.RolloutResult

Rollout Result

static from_vllm_results(group_size, results, answers=None, multi_modal_inputs=None, return_logprobs=False)

Create a RolloutResult from the given vLLM results. Every result is generated with n=1, so each result's outputs list has length 1.

Parameters:
  • group_size (int) -- The group size used during rollout.

  • results (list[VllmRequestOutput]) -- The rollout results from vLLM.

  • answers (Optional[Union[list[str], dict]]) -- The answers corresponding to the inputs. Notably, if the task type is VQA, answers is a dict.

  • multi_modal_inputs (Optional[list[dict]]) -- The multi-modal inputs corresponding to the inputs.

  • return_logprobs (bool) -- Whether to return log probabilities.

Returns:

The constructed RolloutResult object.

Return type:

RolloutResult

static from_sglang_results(results, group_size, input_ids, answers=None, image_data=None, multi_modal_inputs=None, return_logprobs=False)

Create a RolloutResult from the given SGLang results and input IDs.

Parameters:
  • results (list[dict]) -- The rollout results from the model.

  • input_ids (list[list[int]]) -- The input IDs for the prompts.

  • return_logprobs (bool) -- Whether to return log probabilities.

  • group_size (int)

  • answers (list[list[int]] | None)

  • image_data (list[list[bytes]] | list[list[str]] | None)

  • multi_modal_inputs (list[dict] | None)

Return type:

RolloutResult

static split_result_list_by_group(rollout_results)

Split RolloutResult objects by group_size.

If input has only one RolloutResult, split it into multiple RolloutResult objects by group_size. If input has multiple RolloutResult objects, split each one and merge the results.

Parameters:

rollout_results (list[RolloutResult]) -- List of input RolloutResult objects.

Returns:

List of RolloutResult objects grouped by group_size.

Return type:

list[RolloutResult]
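
The grouping idea can be sketched with plain lists. The helpers below are hypothetical and not part of RLinf; they assume that the sequences belonging to one prompt are stored contiguously and that the number of sequences in each result is a multiple of `group_size`.

```python
def split_by_group(sequences: list, group_size: int) -> list[list]:
    """Chunk a flat list of sequences into consecutive groups of group_size."""
    if group_size <= 0:
        raise ValueError("group_size must be positive")
    if len(sequences) % group_size != 0:
        raise ValueError("number of sequences must be a multiple of group_size")
    return [
        sequences[start:start + group_size]
        for start in range(0, len(sequences), group_size)
    ]


def split_list_by_group(results: list[list], group_size: int) -> list[list]:
    """Split every result into groups and merge the groups into one list."""
    merged = []
    for sequences in results:
        merged.extend(split_by_group(sequences, group_size))
    return merged


# One result holding two groups, another holding a single group.
groups = split_list_by_group([[[1], [2], [3], [4]], [[5], [6]]], group_size=2)
```

Here each inner list such as `[1]` stands in for one response's token IDs.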

to_actor_batch(data_seq_length, training_seq_length, pad_token)

Transform the rollout result into a format suitable for the actor.

Parameters:
  • data_seq_length (int) -- Maximum prompt length, e.g., 1024.

  • training_seq_length (int) -- Total sequence length for training, e.g., 8192. The maximum response length is calculated as training_seq_length - data_seq_length.

  • pad_token (int) -- Token used for padding, e.g., tokenizer.pad_token_id.

Returns:

A dictionary with the following keys:

input_ids (torch.Tensor):

Concatenated prompt and response token IDs, shape [batch_size, training_seq_length].

attention_mask (torch.Tensor):

Attention mask for the whole input sequence: True for prompt_ids and response_ids, False elsewhere. Shape [batch_size, training_seq_length].

response_mask (torch.Tensor):

Mask over the whole input sequence that selects the tokens participating in advantage and loss calculation, used to filter out IDs not generated by the LLM: True for response_ids generated by the LLM, False elsewhere. Shape [batch_size, training_seq_length].

is_end (torch.Tensor):

Boolean tensor indicating whether the sequence ends, shape [batch_size].

position_ids (torch.Tensor):

Position IDs for the input sequence, shape [batch_size, training_seq_length].

prompt_lengths (torch.Tensor):

Lengths of the prompt sequences, shape [batch_size].

response_lengths (torch.Tensor):

Lengths of the response sequences, shape [batch_size].

advantages (torch.Tensor), optional:

Advantage values for the responses, shape [batch_size, training_seq_length - data_seq_length].

Return type:

dict[str, torch.Tensor]
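
To make the shapes concrete: with data_seq_length = 1024 and training_seq_length = 8192, the maximum response length is 8192 - 1024 = 7168. The sketch below builds one row of such a batch with plain Python lists instead of tensors. The padding layout (prompt left-padded to `data_seq_length`, response right-padded) is an assumption made for illustration and is not confirmed by this page; the helper name `build_actor_row` is hypothetical, and `position_ids` is left out because its exact convention is not described here.

```python
def build_actor_row(prompt_ids, response_ids, data_seq_length,
                    training_seq_length, pad_token):
    """Build input_ids and masks for a single sequence (illustrative only)."""
    max_response_length = training_seq_length - data_seq_length
    if len(prompt_ids) > data_seq_length:
        raise ValueError("prompt is longer than data_seq_length")
    if len(response_ids) > max_response_length:
        raise ValueError("response is longer than the maximum response length")

    prompt_pad = data_seq_length - len(prompt_ids)
    response_pad = max_response_length - len(response_ids)
    real_tokens = len(prompt_ids) + len(response_ids)

    # Assumed layout: [prompt padding, prompt, response, response padding].
    input_ids = ([pad_token] * prompt_pad + list(prompt_ids)
                 + list(response_ids) + [pad_token] * response_pad)
    attention_mask = ([False] * prompt_pad + [True] * real_tokens
                      + [False] * response_pad)
    # Assumption: every response token was generated by the LLM.
    response_mask = ([False] * data_seq_length + [True] * len(response_ids)
                     + [False] * response_pad)
    return {
        "input_ids": input_ids,
        "attention_mask": attention_mask,
        "response_mask": response_mask,
        "prompt_lengths": len(prompt_ids),
        "response_lengths": len(response_ids),
    }


row = build_actor_row([5, 6], [7, 8, 9], data_seq_length=4,
                      training_seq_length=10, pad_token=0)
```

Every list-valued entry has length `training_seq_length`, matching the per-row shapes listed above.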

static merge_batches(batches)

Merge a list of batches into one.

Parameters:

batches (list[dict[str, Tensor]])

Return type:

dict[str, Tensor]
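
A minimal sketch of such a merge, assuming that all batches share the same keys and that merging means concatenating each key along the batch dimension. It uses plain lists so that it runs without dependencies; with tensors the equivalent step would be `torch.cat(..., dim=0)`. The function name `merge_batches_sketch` is hypothetical, and the actual RLinf behavior may differ.

```python
def merge_batches_sketch(batches: list[dict[str, list]]) -> dict[str, list]:
    """Concatenate per-key rows from several batches (illustrative only)."""
    if not batches:
        return {}
    keys = set(batches[0])
    for batch in batches:
        if set(batch) != keys:
            raise ValueError("all batches must have the same keys")

    merged = {key: [] for key in batches[0]}
    for batch in batches:
        for key, rows in batch.items():
            # Rows are appended in batch order, like concatenation on dim 0.
            merged[key].extend(rows)
    return merged


batch_a = {"input_ids": [[1, 2], [3, 4]], "is_end": [True, False]}
batch_b = {"input_ids": [[5, 6]], "is_end": [True]}
merged = merge_batches_sketch([batch_a, batch_b])
```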

static split_results(rollout_result, split_num)

Split a single RolloutResult into multiple RolloutResult objects by group_size.

Parameters:
  • rollout_result (RolloutResult) -- The RolloutResult to be split.

  • split_num (int)

Returns:

List of split RolloutResult objects.

Return type:

list[RolloutResult]

__init__(*, num_sequence, group_size, prompt_lengths, prompt_ids, response_lengths, response_ids, is_end, rewards=None, advantages=None, prompt_texts=None, response_texts=None, answers=None, image_data=None, multi_modal_inputs=None, response_mask=None, rollout_logprobs=None, recompute_prev_logprobs=None, prev_logprobs=None, ref_logprobs=None, values=None, returns=None)
Parameters:
  • num_sequence (int)

  • group_size (int)

  • prompt_lengths (list[int])

  • prompt_ids (list[list[int]])

  • response_lengths (list[int])

  • response_ids (list[list[int]])

  • is_end (list[bool])

  • rewards (list[float] | Tensor | None)

  • advantages (list[float] | Tensor | None)

  • prompt_texts (list[str] | None)

  • response_texts (list[str] | None)

  • answers (list[str | dict] | None)

  • image_data (list[list[bytes]] | list[list[str]] | None)

  • multi_modal_inputs (list[dict] | None)

  • response_mask (list[list[int]] | None)

  • rollout_logprobs (list[list[float]] | None)

  • recompute_prev_logprobs (Tensor | None)

  • prev_logprobs (Tensor | None)

  • ref_logprobs (Tensor | None)

  • values (Tensor | None)

  • returns (Tensor | None)

Return type:

None
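
The required fields are related to one another. The hypothetical helper below sketches one way to derive the count and length fields from the ID lists, assuming that `num_sequence` counts the sequences and that `prompt_lengths` and `response_lengths` mirror `prompt_ids` and `response_ids` element by element. Neither assumption is stated on this page.

```python
def derive_length_fields(prompt_ids: list[list[int]],
                         response_ids: list[list[int]]) -> dict:
    """Compute count and length fields from the ID lists (illustrative only)."""
    if len(prompt_ids) != len(response_ids):
        raise ValueError("each response needs a matching prompt")
    return {
        "num_sequence": len(response_ids),
        "prompt_lengths": [len(ids) for ids in prompt_ids],
        "response_lengths": [len(ids) for ids in response_ids],
    }


# One prompt repeated for a group of two responses.
fields = derive_length_fields(
    prompt_ids=[[11, 12, 13], [11, 12, 13]],
    response_ids=[[21, 22], [31, 32, 33, 34]],
)
```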