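Data Interface
This section describes the key data structures that RLinf uses to transfer data between Workers when running with the Megatron + SGLang backend combination.
There are two basic structures: RolloutRequest and RolloutResult.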
RolloutRequest
- class rlinf.data.io_struct.RolloutRequest
Attributes:
input_ids -- List of input token IDs for rollout.
n -- Number of completions to generate for each input.
image_data -- List of image data (bytes or URLs) for multimodal inputs.
answers -- List of answers for the requests, if available. Each answer is either a list of strings (for typical tasks) or a dict (for VQA tasks).
multi_modal_inputs -- List of multi-modal inputs for the requests.
- to_seq_group_infos()
Convert the RolloutRequest into a list of SeqGroupInfo objects.
- Returns:
A list of SeqGroupInfo objects.
- Return type:
list[SeqGroupInfo]
- __init__(n, input_ids, image_data, answers, multi_modal_inputs)
- Parameters:
n (int)
input_ids (list[list[int]])
image_data (list[list[bytes]] | list[list[str]])
answers (list[list[str] | dict])
multi_modal_inputs (list[dict | None])
- Return type:
None
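To make the field layout above concrete, here is a minimal sketch of a request holding two prompts. `RolloutRequestSketch` is a hypothetical stand-in that only mirrors the documented constructor parameters; it is not the RLinf class, and the check that every per-request list has one entry per prompt is an assumption rather than documented behavior.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass
class RolloutRequestSketch:
    """Hypothetical stand-in mirroring the documented RolloutRequest fields."""

    n: int
    input_ids: list[list[int]]
    image_data: Union[list[list[bytes]], list[list[str]]]
    answers: list[Union[list[str], dict]]
    multi_modal_inputs: list[Optional[dict]]

    def __post_init__(self) -> None:
        # Assumption: each per-request list is aligned with input_ids.
        num_prompts = len(self.input_ids)
        for name in ("image_data", "answers", "multi_modal_inputs"):
            if len(getattr(self, name)) != num_prompts:
                raise ValueError(f"{name} must have {num_prompts} entries")


request = RolloutRequestSketch(
    n=4,                                   # completions per prompt
    input_ids=[[101, 2054, 2003], [101, 2129]],
    image_data=[[], []],                   # text-only prompts carry no images
    answers=[["42"], {"answer": "cat"}],   # list of strings, or a dict for VQA
    multi_modal_inputs=[None, None],
)

# With n completions per prompt, the rollout produces n * len(input_ids) sequences.
total_completions = request.n * len(request.input_ids)
```

In this sketch `total_completions` is 8, which is the number of sequences a downstream result structure would have to hold for these two prompts.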
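RolloutResult
- class rlinf.data.io_struct.RolloutResult
Rollout result: the prompts, the generated responses, and the associated per-sequence data (such as rewards, advantages, and log probabilities) produced by a rollout.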
- static from_vllm_results(group_size, results, answers=None, multi_modal_inputs=None, return_logprobs=False)
Create a RolloutResult from the given vLLM results. Every result is generated with n=1, so each result contains exactly one output.
- Parameters:
group_size (int) -- The group size used during rollout.
results (list[VllmRequestOutput]) -- The rollout results from vLLM.
answers (Optional[Union[list[str], dict]]) -- The answers corresponding to the inputs. Notably, if the task type is VQA, answers is a dict.
multi_modal_inputs (Optional[list[dict]]) -- The multi-modal inputs corresponding to the inputs.
return_logprobs (bool) -- Whether to return log probabilities.
- Returns:
The constructed RolloutResult object.
- Return type:
RolloutResult
- static from_sglang_results(results, group_size, input_ids, answers=None, image_data=None, multi_modal_inputs=None, return_logprobs=False)
Create a RolloutResult from the given SGLang results and input IDs.
- Parameters:
results (list[dict]) -- The rollout results from the model.
input_ids (list[list[int]]) -- The input IDs for the prompts.
return_logprobs (bool) -- Whether to return log probabilities.
group_size (int)
answers (list[list[int]] | None)
image_data (list[list[bytes]] | list[list[str]] | None)
multi_modal_inputs (list[dict] | None)
- Return type:
RolloutResult
- static split_result_list_by_group(rollout_results)
Split RolloutResult objects by group_size.
If the input contains a single RolloutResult, it is split into multiple RolloutResult objects by group_size. If the input contains multiple RolloutResult objects, each one is split and the results are merged into one list.
- Parameters:
rollout_results (list[RolloutResult]) -- List of input RolloutResult objects.
- Returns:
List of RolloutResult objects grouped by group_size.
- Return type:
list[RolloutResult]
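The split-then-merge behavior described above can be sketched with plain lists. The helper names `split_by_group` and `split_many` are hypothetical, and the sketch assumes that the group_size responses belonging to one prompt are stored contiguously; the RLinf implementation operates on RolloutResult objects and may differ in detail.

```python
def split_by_group(response_ids: list[list[int]], group_size: int) -> list[list[list[int]]]:
    """Chunk a flat list of responses into consecutive groups of group_size."""
    if group_size <= 0 or len(response_ids) % group_size != 0:
        raise ValueError("number of responses must be a positive multiple of group_size")
    return [
        response_ids[start:start + group_size]
        for start in range(0, len(response_ids), group_size)
    ]


def split_many(results: list[list[list[int]]], group_size: int) -> list[list[list[int]]]:
    """Split each input independently, then merge the groups in input order."""
    groups: list[list[list[int]]] = []
    for response_ids in results:
        groups.extend(split_by_group(response_ids, group_size))
    return groups


# Two inputs: one with four responses (two groups), one with two (one group).
first = [[1], [2], [3], [4]]
second = [[5], [6]]
groups = split_many([first, second], group_size=2)
# groups == [[[1], [2]], [[3], [4]], [[5], [6]]]
```

Rejecting inputs whose length is not a multiple of group_size keeps every group complete, which matters when a later step compares responses within the same group.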
- to_actor_batch(data_seq_length, training_seq_length, pad_token)
Transform the rollout result into a format suitable for the actor.
- Parameters:
data_seq_length (int) -- Maximum prompt length, e.g., 1024.
training_seq_length (int) -- Total sequence length for training, e.g., 8192. The maximum response length is calculated as training_seq_length - data_seq_length.
pad_token (int) -- Token used for padding, e.g., tokenizer.pad_token_id.
- Returns:
A dictionary with keys:
- input_ids (torch.Tensor): Concatenated prompt and response token IDs, shape [batch_size, training_seq_length].
- attention_mask (torch.Tensor): Attention mask for the whole input sequence. True for prompt_ids and response_ids, False elsewhere. Shape [batch_size, training_seq_length].
- response_mask (torch.Tensor): Mask over the whole input sequence selecting the positions that participate in advantage and loss calculation, which filters out IDs not generated by the LLM. True for response_ids generated by the LLM, False elsewhere. Shape [batch_size, training_seq_length].
- is_end (torch.Tensor): Boolean tensor indicating whether each sequence has ended, shape [batch_size].
- position_ids (torch.Tensor): Position IDs for the input sequence, shape [batch_size, training_seq_length].
- prompt_lengths (torch.Tensor): Lengths of the prompt sequences, shape [batch_size].
- response_lengths (torch.Tensor): Lengths of the response sequences, shape [batch_size].
- advantages (torch.Tensor), optional: Advantage values for the responses, shape [batch_size, training_seq_length - data_seq_length].
- Return type:
dict[str, torch.Tensor]
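The relationship between the three length parameters and the two masks can be illustrated for a single sequence with plain Python lists standing in for tensors. The helper `build_actor_row` is hypothetical, and the layout it uses (prompt left-padded to data_seq_length, response right-padded to the remaining length) is an assumption for illustration; the documentation above specifies only the shapes and the meaning of each mask.

```python
def build_actor_row(prompt_ids, response_ids, data_seq_length, training_seq_length, pad_token):
    """Lay out one sequence as [pad.. prompt | response ..pad] (assumed layout)."""
    # The response budget is whatever remains after reserving the prompt slot.
    max_response_length = training_seq_length - data_seq_length
    if len(prompt_ids) > data_seq_length:
        raise ValueError("prompt longer than data_seq_length")
    if len(response_ids) > max_response_length:
        raise ValueError("response longer than training_seq_length - data_seq_length")

    prompt_pad = data_seq_length - len(prompt_ids)
    response_pad = max_response_length - len(response_ids)
    real_tokens = len(prompt_ids) + len(response_ids)

    return {
        "input_ids": [pad_token] * prompt_pad + prompt_ids + response_ids + [pad_token] * response_pad,
        # True on every real prompt or response token, False on padding.
        "attention_mask": [False] * prompt_pad + [True] * real_tokens + [False] * response_pad,
        # True only on generated response tokens, so prompt tokens never enter the loss.
        "response_mask": [False] * data_seq_length + [True] * len(response_ids) + [False] * response_pad,
        "prompt_lengths": len(prompt_ids),
        "response_lengths": len(response_ids),
    }


row = build_actor_row([11, 12], [21, 22, 23], data_seq_length=4, training_seq_length=10, pad_token=0)
# row["input_ids"] == [0, 0, 11, 12, 21, 22, 23, 0, 0, 0]
```

With the example values from the parameter list (data_seq_length of 1024 and training_seq_length of 8192), the same arithmetic gives a maximum response length of 7168 tokens.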
- static merge_batches(batches)
Merge multiple batches into one.
- Parameters:
batches (list[dict[str, Tensor]])
- Return type:
dict[str, Tensor]
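One plausible reading of merging is key-wise concatenation along the batch dimension. The sketch below shows that idea with lists of rows standing in for tensors; `merge_batches_sketch` is a hypothetical helper, and the requirement that all batches share the same keys is an assumption, since the documentation does not say how missing or optional keys are handled. With real tensors the per-key step would typically be `torch.cat(..., dim=0)`.

```python
def merge_batches_sketch(batches: list[dict[str, list]]) -> dict[str, list]:
    """Concatenate batches key by key along the batch (first) dimension."""
    if not batches:
        return {}
    keys = batches[0].keys()
    merged: dict[str, list] = {key: [] for key in keys}
    for batch in batches:
        # Assumption: every batch carries exactly the same set of keys.
        if batch.keys() != keys:
            raise KeyError("all batches must share the same keys")
        for key, rows in batch.items():
            merged[key].extend(rows)
    return merged


batch_a = {"input_ids": [[1, 2], [3, 4]], "is_end": [True, False]}
batch_b = {"input_ids": [[5, 6]], "is_end": [True]}
merged = merge_batches_sketch([batch_a, batch_b])
# merged["input_ids"] == [[1, 2], [3, 4], [5, 6]]
```

Because rows are appended in input order, the merged batch keeps the sequences of each source batch contiguous.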
- static split_results(rollout_result, split_num)
Split a single RolloutResult into multiple RolloutResult objects by group_size.
- Parameters:
rollout_result (RolloutResult) -- The RolloutResult to be split.
split_num (int)
- Returns:
List of split RolloutResult objects.
- Return type:
list[RolloutResult]
- __init__(*, num_sequence, group_size, prompt_lengths, prompt_ids, response_lengths, response_ids, is_end, rewards=None, advantages=None, prompt_texts=None, response_texts=None, answers=None, image_data=None, multi_modal_inputs=None, response_mask=None, rollout_logprobs=None, recompute_prev_logprobs=None, prev_logprobs=None, ref_logprobs=None, values=None, returns=None)
- Parameters:
num_sequence (int)
group_size (int)
prompt_lengths (list[int])
prompt_ids (list[list[int]])
response_lengths (list[int])
response_ids (list[list[int]])
is_end (list[bool])
rewards (list[float] | Tensor | None)
advantages (list[float] | Tensor | None)
prompt_texts (list[str] | None)
response_texts (list[str] | None)
answers (list[str | dict] | None)
image_data (list[list[bytes]] | list[list[str]] | None)
multi_modal_inputs (list[dict] | None)
response_mask (list[list[int]] | None)
rollout_logprobs (list[list[float]] | None)
recompute_prev_logprobs (Tensor | None)
prev_logprobs (Tensor | None)
ref_logprobs (Tensor | None)
values (Tensor | None)
returns (Tensor | None)
- Return type:
None