Embodied Data Interface

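This section describes the core data structures used during rollout and training in embodied scenarios: EnvOutput, ChunkStepResult, EmbodiedRolloutResult, and Trajectory. Together they form a closed loop from environment output, through chunk-step-level result accumulation, to trajectory construction and batched training input.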

Overall Relationship

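  • EnvOutput: the raw environment output for each chunk step (obs, reward, done, etc.).

  • ChunkStepResult: wraps model inference results and reward signals at chunk-step granularity.

  • EmbodiedRolloutResult: incrementally accumulates the results and transitions of multiple chunk steps.

  • Trajectory: organizes the accumulated results into trajectory tensors (typically of shape [T, B, ...]).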

EmbodiedRolloutResult.to_splited_trajectories() splits trajectories along the batch dimension so that they can be distributed to multiple Actors/Trainers through a Channel.
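To make the batch-dimension split concrete, here is a minimal sketch that uses nested Python lists in place of tensors. The helper name split_batch and the requirement that B be divisible by the number of shards are assumptions made for illustration; they are not taken from the RLinf implementation.

```python
def split_batch(traj, num_shards):
    """Split a time-major [T][B] nested list into num_shards pieces along B."""
    batch_size = len(traj[0])
    if batch_size % num_shards != 0:
        # Assumption for this sketch: shards must have equal size.
        raise ValueError("batch size must be divisible by num_shards")
    shard_size = batch_size // num_shards
    return [
        [row[i * shard_size:(i + 1) * shard_size] for row in traj]
        for i in range(num_shards)
    ]


# Rewards for T=3 chunk steps and B=4 parallel environments.
rewards = [
    [0.0, 0.1, 0.2, 0.3],
    [1.0, 1.1, 1.2, 1.3],
    [2.0, 2.1, 2.2, 2.3],
]

# Each shard keeps all T steps but only a slice of the environments.
shards = split_batch(rewards, num_shards=2)
```

Each shard still covers every chunk step, so a receiver gets complete per-environment sequences rather than a time slice.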

EnvOutput

EnvOutput describes the environment-side output, including observations and episode-end signals. On initialization, tensors are moved to CPU and made contiguous in memory.

class rlinf.data.embodied_io_struct.EnvOutput

Environment output for a single chunk step.

static merge_env_outputs(env_outputs)

Merge multiple env output dicts into one batch-aligned env output.

Merge strategy:

  • Tensor fields: concatenate on batch dimension.

  • List fields: flatten in source order.

  • None fields: keep None.

  • final_obs supports partial None across shards. For shards without final_obs, use the corresponding obs as fallback to keep batch alignment.

Parameters:

env_outputs (list[dict]) -- Per-source env output dicts that share the same schema.

Returns:

A merged env output dict produced via EnvOutput(...).to_dict().

Return type:

dict[str, Any]
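The merge strategy listed for merge_env_outputs can be sketched roughly as follows. This is an illustration, not the RLinf code: plain lists stand in for tensors (so concatenation on the batch dimension becomes list concatenation), the function names are hypothetical, and the handling of a final_obs that is None in every shard (kept as None here) is an assumption.

```python
from typing import Any, Dict, List, Optional


def merge_field(values: List[Optional[list]]) -> Optional[list]:
    """Concatenate per-shard batches; keep None when every shard has None."""
    if all(v is None for v in values):
        return None
    if any(v is None for v in values):
        raise ValueError("field is None in some shards but not in others")
    merged: list = []
    for v in values:
        merged.extend(v)  # source order is preserved
    return merged


def merge_obs(obs_list: List[Dict[str, list]]) -> Dict[str, Any]:
    """Merge observation dicts key by key along the batch axis."""
    return {key: merge_field([obs[key] for obs in obs_list]) for key in obs_list[0]}


def merge_env_outputs_sketch(env_outputs: List[Dict[str, Any]]) -> Dict[str, Any]:
    merged: Dict[str, Any] = {"obs": merge_obs([e["obs"] for e in env_outputs])}

    finals = [e.get("final_obs") for e in env_outputs]
    if all(f is None for f in finals):
        merged["final_obs"] = None  # assumption: nothing to align
    else:
        # Shards without final_obs fall back to their obs so that
        # final_obs has the same batch size as obs after merging.
        filled = [f if f is not None else e["obs"] for f, e in zip(finals, env_outputs)]
        merged["final_obs"] = merge_obs(filled)

    for name in ("rewards", "dones"):
        merged[name] = merge_field([e.get(name) for e in env_outputs])
    return merged


shard_a = {"obs": {"state": [[1], [2]]}, "final_obs": None,
           "rewards": [0.5, 0.6], "dones": None}
shard_b = {"obs": {"state": [[3]]}, "final_obs": {"state": [[9]]},
           "rewards": [0.7], "dones": None}
merged = merge_env_outputs_sketch([shard_a, shard_b])
```

In this example only shard_b carries a final_obs, so the two entries contributed by shard_a are copies of its obs, which keeps final_obs and obs at the same batch size of 3.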

__init__(*, obs, final_obs=None, dones=None, terminations=None, truncations=None, rewards=None, env_infos=None, intervene_actions=None, intervene_flags=None)
Parameters:
  • obs (dict[str, Any])

  • final_obs (dict[str, Any] | None)

  • dones (Tensor | None)

  • terminations (Tensor | None)

  • truncations (Tensor | None)

  • rewards (Tensor | None)

  • env_infos (dict[str, Any] | None)

  • intervene_actions (Tensor | None)

  • intervene_flags (Tensor | None)

Return type:

None

ChunkStepResult

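ChunkStepResult describes the result of a single inference step (one chunk step) together with the extra information needed for training, including actions, log-probabilities, value estimates, and additional forward inputs. On initialization, all tensors are moved to CPU.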

class rlinf.data.embodied_io_struct.ChunkStepResult

Model outputs, env outputs (without observations), and training forward inputs for a chunk step.

__init__(*, actions=None, prev_logprobs=None, prev_values=None, dones=None, truncations=None, terminations=None, rewards=None, forward_inputs=<factory>, versions=None)
Parameters:
  • actions (Tensor)

  • prev_logprobs (Tensor)

  • prev_values (Tensor)

  • dones (Tensor)

  • truncations (Tensor)

  • terminations (Tensor)

  • rewards (Tensor)

  • forward_inputs (dict[str, Tensor])

  • versions (Tensor)

Return type:

None
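A default shown as `<factory>` in a signature like the one above typically indicates a dataclass field declared with default_factory, so every instance gets its own fresh container. The sketch below illustrates that behavior with a hypothetical StepResultSketch class. That the RLinf classes are dataclasses is inferred from the rendered signature, not confirmed by this page.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class StepResultSketch:
    """Hypothetical stand-in; lists replace tensors in this sketch."""

    actions: Optional[list] = None
    rewards: Optional[list] = None
    # default_factory builds a new dict per instance instead of sharing one.
    forward_inputs: Dict[str, Any] = field(default_factory=dict)


first = StepResultSketch(actions=[0, 1])
second = StepResultSketch()

# Mutating one instance's dict leaves the other untouched.
first.forward_inputs["input_ids"] = [101, 102]
```

Omitted optional fields stay None, while forward_inputs always starts as an empty dict that is safe to mutate.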

EmbodiedRolloutResult

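EmbodiedRolloutResult incrementally accumulates chunk-step-level results and transitions during rollout, and provides methods for converting them into a Trajectory:

  • append_step_result(): appends a chunk-step-level result

  • append_transitions(): appends the curr/next transition observations

  • to_trajectory(): concatenates the accumulated results into trajectory tensors

  • to_splited_trajectories(): splits the trajectory along the batch dimension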
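The accumulate-then-convert pattern behind these methods can be sketched as below. RolloutBufferSketch is a hypothetical stand-in that tracks only rewards and dones and uses nested lists instead of tensors. The method names mirror the ones listed above, but their arguments and return values here are assumptions.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class RolloutBufferSketch:
    """Hypothetical stand-in for EmbodiedRolloutResult (not the RLinf class)."""

    rewards: List[List[float]] = field(default_factory=list)
    dones: List[List[bool]] = field(default_factory=list)

    def append_step_result(self, rewards: List[float], dones: List[bool]) -> None:
        # One call per chunk step; each argument holds one value per environment.
        if len(rewards) != len(dones):
            raise ValueError("rewards and dones must share the batch size")
        self.rewards.append(list(rewards))
        self.dones.append(list(dones))

    def to_trajectory(self) -> Dict[str, list]:
        # The T accumulated rows become the leading (time) axis: [T][B].
        batch_sizes = {len(row) for row in self.rewards}
        if len(batch_sizes) > 1:
            raise ValueError("all chunk steps must share the same batch size")
        return {
            "rewards": [row[:] for row in self.rewards],
            "dones": [row[:] for row in self.dones],
        }


buffer = RolloutBufferSketch()
buffer.append_step_result(rewards=[0.0, 0.5], dones=[False, False])
buffer.append_step_result(rewards=[1.0, 0.0], dones=[True, False])
buffer.append_step_result(rewards=[0.0, 2.0], dones=[False, True])

trajectory = buffer.to_trajectory()
num_steps = len(trajectory["rewards"])      # T
batch_size = len(trajectory["rewards"][0])  # B
```

Keeping per-step lists during rollout and converting once at the end avoids rebuilding a growing structure on every chunk step.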

class rlinf.data.embodied_io_struct.EmbodiedRolloutResult

Collect chunk-step results and transitions during rollout, and convert them into trajectory tensors.

__init__(*, max_episode_length=0, actions=<factory>, intervene_flags=<factory>, rewards=<factory>, terminations=<factory>, truncations=<factory>, dones=<factory>, prev_logprobs=<factory>, prev_values=<factory>, versions=<factory>, forward_inputs=<factory>, curr_obs=<factory>, next_obs=<factory>)
Parameters:
  • max_episode_length (int)

  • actions (list[Tensor])

  • intervene_flags (list[Tensor])

  • rewards (list[Tensor])

  • terminations (list[Tensor])

  • truncations (list[Tensor])

  • dones (list[Tensor])

  • prev_logprobs (list[Tensor])

  • prev_values (list[Tensor])

  • versions (list[Tensor])

  • forward_inputs (list[dict[str, Any]])

  • curr_obs (list[dict[str, Any]])

  • next_obs (list[dict[str, Any]])

Return type:

None

Trajectory

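Trajectory is the final trajectory representation that enters the training pipeline. It contains fields such as actions, rewards, termination flags, observations, and model forward inputs. Its tensors generally have shape [T, B, ...], where T is the number of chunk steps and B is the number of parallel environments (the batch dimension).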

class rlinf.data.embodied_io_struct.Trajectory

A trajectory that contains multiple episodes.

__init__(max_episode_length=0, model_weights_id='', actions=None, intervene_flags=None, rewards=None, terminations=None, truncations=None, dones=None, prev_logprobs=None, prev_values=None, versions=None, forward_inputs=<factory>, curr_obs=<factory>, next_obs=<factory>)
Parameters:
  • max_episode_length (int)

  • model_weights_id (str)

  • actions (Tensor)

  • intervene_flags (Tensor)

  • rewards (Tensor)

  • terminations (Tensor)

  • truncations (Tensor)

  • dones (Tensor)

  • prev_logprobs (Tensor)

  • prev_values (Tensor)

  • versions (Tensor)

  • forward_inputs (dict[str, Any])

  • curr_obs (dict[str, Any])

  • next_obs (dict[str, Any])

Return type:

None