Embodied Data Interfaces
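This section introduces the core data structures used during rollout and training in embodied scenarios:
EnvOutput, ChunkStepResult, EmbodiedRolloutResult, and Trajectory.
Together they close the loop from environment outputs, through the accumulation of chunk-step-level results, to trajectories and batched training inputs.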
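Overall Relationship
- EnvOutput: the raw environment output for each chunk step (obs, reward, done, etc.).
- ChunkStepResult: the model's inference results and reward signals, wrapped at the chunk-step level.
- EmbodiedRolloutResult: accumulates the results and transitions of multiple chunk steps, one step at a time.
- Trajectory: organizes the accumulated results into trajectory tensors (typically of shape [T, B, ...]).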
EmbodiedRolloutResult.to_splited_trajectories() splits a trajectory along the batch dimension
so that the pieces can be distributed to multiple Actors/Trainers through a Channel.
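Splitting along the batch dimension can be illustrated with a minimal sketch. It assumes nested Python lists in place of [T, B, ...] tensors and assumes B is divisible by the number of splits; the helper split_along_batch is hypothetical and is not RLinf's to_splited_trajectories() implementation.

```python
from typing import Any


def split_along_batch(traj: dict[str, Any], num_splits: int) -> list[dict[str, Any]]:
    """Hypothetical helper: split a [T][B] nested-list trajectory into shards along B.

    Nested lists stand in for [T, B, ...] tensors; None fields are copied to every shard.
    """
    # Infer B from the first field that is not None (row 0 has B entries).
    batch = next(len(v[0]) for v in traj.values() if v is not None)
    if batch % num_splits != 0:
        raise ValueError(f"batch size {batch} is not divisible by {num_splits}")
    step = batch // num_splits
    shards = []
    for i in range(num_splits):
        lo, hi = i * step, (i + 1) * step
        # Keep every time step t, but only the environments lo..hi-1.
        shards.append({
            key: None if value is None else [row[lo:hi] for row in value]
            for key, value in traj.items()
        })
    return shards


# T = 2 chunk steps, B = 4 parallel environments.
traj = {
    "rewards": [[1, 2, 3, 4], [5, 6, 7, 8]],
    "dones": [[0, 0, 0, 1], [1, 0, 0, 0]],
    "prev_values": None,
}
shards = split_along_batch(traj, num_splits=2)
# shards[0]["rewards"] == [[1, 2], [5, 6]]
```

Each shard keeps the full time axis T and a contiguous slice of environments, which is what lets every receiver train on complete per-environment sequences.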
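EnvOutput
EnvOutput describes the environment-side output, including observations and episode-termination signals.
On initialization, its tensors are moved to the CPU and made contiguous in memory.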
- class rlinf.data.embodied_io_struct.EnvOutput
Environment output for a single chunk step.
- static merge_env_outputs(env_outputs)
Merge multiple env output dicts into one batch-aligned env output.
Merge strategy:
Tensor fields: concatenate on batch dimension.
List fields: flatten in source order.
None fields: keep None. final_obs supports partial None across shards; for shards without
final_obs, the corresponding obs is used as a fallback to keep batch alignment.
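The merge rules above can be sketched in plain Python. This is a hypothetical re-implementation (merge_env_output_dicts and _merge_values are illustrative names, not RLinf APIs): Python lists stand in for batch-major tensors, so concatenating a tensor field on the batch dimension and flattening a list field become the same operation, and the sketch assumes that only final_obs may be None on a subset of shards.

```python
from typing import Any


def _merge_values(values: list[Any]) -> Any:
    """Merge one field across shards; lists stand in for batch-major tensors."""
    if all(v is None for v in values):
        return None  # None fields stay None
    if any(v is None for v in values):
        raise ValueError("only final_obs may be None on a subset of shards")
    if isinstance(values[0], dict):
        # Recurse into nested dicts such as obs / final_obs.
        return {key: _merge_values([v[key] for v in values]) for key in values[0]}
    merged: list[Any] = []
    for v in values:
        merged.extend(v)  # concat on the batch dim == flatten in source order
    return merged


def merge_env_output_dicts(env_outputs: list[dict[str, Any]]) -> dict[str, Any]:
    """Hypothetical sketch of the merge rules; not RLinf's implementation."""
    final_obs = [o.get("final_obs") for o in env_outputs]
    if any(f is not None for f in final_obs):
        # Partial None: reuse that shard's obs so its batch rows stay aligned.
        final_obs = [f if f is not None else o["obs"] for f, o in zip(final_obs, env_outputs)]
    merged: dict[str, Any] = {}
    for key in env_outputs[0]:  # all shards share the same schema
        if key == "final_obs":
            merged[key] = _merge_values(final_obs)
        else:
            merged[key] = _merge_values([o[key] for o in env_outputs])
    return merged


shard_a = {"obs": {"state": [[0.1], [0.2]]}, "final_obs": None, "dones": [False, False], "rewards": None}
shard_b = {"obs": {"state": [[0.3]]}, "final_obs": {"state": [[0.9]]}, "dones": [True], "rewards": None}
merged = merge_env_output_dicts([shard_a, shard_b])
# merged["final_obs"]["state"] == [[0.1], [0.2], [0.9]]
```

Shard A has no final_obs, so its two obs rows fill in for it; without that fallback the merged final_obs would have one row while obs has three.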
- Parameters:
env_outputs (list[dict]) -- Per-source env output dicts that share the same schema.
- Returns:
A merged env output dict produced via EnvOutput(...).to_dict().
- Return type:
dict[str, Any]
- __init__(*, obs, final_obs=None, dones=None, terminations=None, truncations=None, rewards=None, env_infos=None, intervene_actions=None, intervene_flags=None)
- Parameters:
obs (dict[str, Any])
final_obs (dict[str, Any] | None)
dones (Tensor | None)
terminations (Tensor | None)
truncations (Tensor | None)
rewards (Tensor | None)
env_infos (dict[str, Any] | None)
intervene_actions (Tensor | None)
intervene_flags (Tensor | None)
- Return type:
None
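ChunkStepResult
ChunkStepResult describes the inference result of a single chunk step together with the extra information needed for training,
including actions, log-probabilities, value estimates, and additional forward inputs.
On initialization, all tensors are moved to the CPU.
- class rlinf.data.embodied_io_struct.ChunkStepResult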
Model outputs, env outputs (without observations), and training forward inputs for a chunk step.
- __init__(*, actions=None, prev_logprobs=None, prev_values=None, dones=None, truncations=None, terminations=None, rewards=None, forward_inputs=<factory>, versions=None)
- Parameters:
actions (Tensor)
prev_logprobs (Tensor)
prev_values (Tensor)
dones (Tensor)
truncations (Tensor)
terminations (Tensor)
rewards (Tensor)
forward_inputs (dict[str, Tensor])
versions (Tensor)
- Return type:
None
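EmbodiedRolloutResult
EmbodiedRolloutResult accumulates chunk-step-level results and transitions step by step during rollout,
and provides methods for converting them into a Trajectory:
- append_step_result(): append a chunk-step-level result
- append_transitions(): append curr/next transition observations
- to_trajectory(): concatenate the accumulated results into trajectory tensors
- to_splited_trajectories(): split the trajectory along the batch dimension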
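The accumulate-then-stack pattern can be sketched with a small hypothetical class. RolloutBuffer and its simplified method signatures are assumptions for illustration and do not match EmbodiedRolloutResult's real signatures; each appended entry holds one value per parallel environment, so stacking T entries yields a [T][B] layout.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class RolloutBuffer:
    """Hypothetical stand-in for EmbodiedRolloutResult (simplified fields and signatures)."""

    rewards: list[list[float]] = field(default_factory=list)  # T entries, each of length B
    dones: list[list[bool]] = field(default_factory=list)
    curr_obs: list[dict[str, Any]] = field(default_factory=list)
    next_obs: list[dict[str, Any]] = field(default_factory=list)

    def append_step_result(self, rewards: list[float], dones: list[bool]) -> None:
        # One chunk step: one reward and one done flag per parallel environment.
        self.rewards.append(list(rewards))
        self.dones.append(list(dones))

    def append_transitions(self, curr_obs: dict[str, Any], next_obs: dict[str, Any]) -> None:
        # Observations before and after the chunk step.
        self.curr_obs.append(curr_obs)
        self.next_obs.append(next_obs)

    def to_trajectory(self) -> dict[str, Any]:
        """Stack the per-step entries into [T][B] layouts, T = number of chunk steps."""

        def stack_obs(steps: list[dict[str, Any]]) -> dict[str, Any]:
            # list of per-step dicts -> dict of per-key [T][B] lists
            if not steps:
                return {}
            return {key: [step[key] for step in steps] for key in steps[0]}

        return {
            "rewards": [list(row) for row in self.rewards],
            "dones": [list(row) for row in self.dones],
            "curr_obs": stack_obs(self.curr_obs),
            "next_obs": stack_obs(self.next_obs),
        }


buf = RolloutBuffer()
for t in range(3):  # T = 3 chunk steps, B = 2 environments
    buf.append_transitions({"state": [t, t]}, {"state": [t + 1, t + 1]})
    buf.append_step_result(rewards=[float(t), 0.0], dones=[t == 2, False])
traj = buf.to_trajectory()
```

Keeping per-step lists during rollout and stacking only once at the end avoids repeatedly reallocating a growing [T, B, ...] tensor.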
- class rlinf.data.embodied_io_struct.EmbodiedRolloutResult
Collect chunk-step results and transitions during rollout, and convert them into trajectory tensors.
- __init__(*, max_episode_length=0, actions=<factory>, intervene_flags=<factory>, rewards=<factory>, terminations=<factory>, truncations=<factory>, dones=<factory>, prev_logprobs=<factory>, prev_values=<factory>, versions=<factory>, forward_inputs=<factory>, curr_obs=<factory>, next_obs=<factory>)
- Parameters:
max_episode_length (int)
actions (list[Tensor])
intervene_flags (list[Tensor])
rewards (list[Tensor])
terminations (list[Tensor])
truncations (list[Tensor])
dones (list[Tensor])
prev_logprobs (list[Tensor])
prev_values (list[Tensor])
versions (list[Tensor])
forward_inputs (list[dict[str, Any]])
curr_obs (list[dict[str, Any]])
next_obs (list[dict[str, Any]])
- Return type:
None
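Trajectory
Trajectory is the trajectory representation that ultimately enters the training pipeline. It contains fields such as actions, rewards, termination flags,
observations, and model forward inputs. Its tensors generally have shape [T, B, ...],
where T is the number of chunk steps and B is the number of parallel environments (the batch dimension).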
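To make the [T, B, ...] layout concrete, the sketch below walks a trajectory column by column (one column per parallel environment) and sums rewards per episode, since a single trajectory may span several episodes. The helper episode_returns is hypothetical; it assumes that the reward recorded at a step whose done flag is set belongs to the episode ending there, and it reports a trailing unfinished episode as a partial return.

```python
def episode_returns(rewards: list[list[float]], dones: list[list[bool]]) -> list[list[float]]:
    """Per-environment episode returns from a [T][B] trajectory (hypothetical helper)."""
    num_steps = len(rewards)                     # T: number of chunk steps
    batch = len(rewards[0]) if num_steps else 0  # B: number of parallel environments
    returns: list[list[float]] = []
    for b in range(batch):                       # walk one environment's column
        per_env: list[float] = []
        running = 0.0
        for t in range(num_steps):
            running += rewards[t][b]
            if dones[t][b]:                      # episode ends at this chunk step
                per_env.append(running)
                running = 0.0
        if not dones[-1][b]:                     # trailing, unfinished episode
            per_env.append(running)
        returns.append(per_env)
    return returns


rewards = [[1.0, 0.0], [1.0, 1.0], [1.0, 1.0]]
dones = [[False, False], [True, False], [False, True]]
print(episode_returns(rewards, dones))  # [[2.0, 1.0], [2.0]]
```

Environment 0 finishes one episode at t = 1 and starts another that is still running at t = 2, so one batch column can hold more than one episode.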
- class rlinf.data.embodied_io_struct.Trajectory
A trajectory that contains multiple episodes.
- __init__(max_episode_length=0, model_weights_id='', actions=None, intervene_flags=None, rewards=None, terminations=None, truncations=None, dones=None, prev_logprobs=None, prev_values=None, versions=None, forward_inputs=<factory>, curr_obs=<factory>, next_obs=<factory>)
- Parameters:
max_episode_length (int)
model_weights_id (str)
actions (Tensor)
intervene_flags (Tensor)
rewards (Tensor)
terminations (Tensor)
truncations (Tensor)
dones (Tensor)
prev_logprobs (Tensor)
prev_values (Tensor)
versions (Tensor)
forward_inputs (dict[str, Any])
curr_obs (dict[str, Any])
next_obs (dict[str, Any])
- Return type:
None