Managers#
Manager modules for handling different aspects of robot learning.
Overview#
Managers are specialized components that handle specific aspects of the learning problem:
ObservationManager: Computes observations from the scene
ActionManager: Processes and applies actions
RewardManager: Calculates reward signals
CommandManager: Generates command targets
TerminationManager: Checks termination conditions
CurriculumManager: Implements training curricula
EventManager: Handles domain randomization
ObjectManager: Manages dynamic objects
Manager Base Class#
All managers inherit from ManagerBase:
class ManagerBase:
def __init__(self, cfg: ManagerBaseCfg, scene: LabScene):
"""
Initialize manager.
Args:
cfg: Manager configuration
scene: The simulation scene
"""
self.cfg = cfg
self.scene = scene
self.terms = {}
def reset(self, env_ids: torch.Tensor | None = None):
"""Reset manager state."""
pass
def compute(self) -> Any:
"""Compute manager output."""
pass
ObservationManager#
Computes observation tensors from individual observation terms.
Configuration#
@configclass
class ObservationManagerCfg:
# Policy observations (actor)
base_lin_vel = ObservationTermCfg(
func=observations.base_lin_vel,
noise=UniformNoiseCfg(min=-0.1, max=0.1),
scale=1.0
)
joint_pos = ObservationTermCfg(
func=observations.joint_pos,
scale=1.0
)
# Privileged observations (critic)
class critic:
terrain_height = ObservationTermCfg(
func=observations.terrain_height_scan
)
API#
class ObservationManager(ManagerBase):
def compute(self) -> dict[str, torch.Tensor]:
"""
Compute observations.
Returns:
Dictionary with keys:
- "obs": Policy observations [num_envs, obs_dim]
- "obs_critic": Privileged observations [num_envs, critic_obs_dim]
"""
pass
def compute_group(self, group: str = "obs") -> torch.Tensor:
"""Compute specific observation group."""
pass
Common Observation Terms#
Robot State:
base_lin_vel: Base linear velocity (3)
base_ang_vel: Base angular velocity (3)
projected_gravity: Gravity direction in base frame (3)
base_pos: Base position (3)
base_quat: Base orientation quaternion (4)
Joint State:
joint_pos: Joint positions (num_dof)
joint_vel: Joint velocities (num_dof)
joint_pos_rel: Joint positions relative to default (num_dof)
Commands:
commands: Current commands (cmd_dim)
History:
last_actions: Previous actions (action_dim)
Sensors:
imu_data: IMU measurements
camera_rgb: RGB images
lidar_scan: LiDAR point cloud
RewardManager#
Computes total reward from weighted reward terms.
Configuration#
@configclass
class RewardManagerCfg:
# Main objectives
forward_vel = RewardTermCfg(
func=rewards.forward_velocity,
weight=1.0,
params={"target_vel": 1.0}
)
# Regularization
energy_penalty = RewardTermCfg(
func=rewards.energy_consumption,
weight=-0.001
)
action_smoothness = RewardTermCfg(
func=rewards.action_rate,
weight=-0.01
)
API#
class RewardManager(ManagerBase):
def compute(self) -> torch.Tensor:
"""
Compute total reward.
Returns:
Total reward tensor [num_envs]
"""
pass
@property
def term_rewards(self) -> dict[str, torch.Tensor]:
"""Get individual reward terms for logging."""
pass
Common Reward Terms#
Tracking Rewards:
forward_velocity: Reward for forward motion
velocity_tracking: Track command velocities
orientation_tracking: Maintain upright orientation
Regularization:
energy_consumption: Penalize high motor torques
action_rate: Penalize action changes
joint_acceleration: Penalize joint accelerations
joint_limits: Penalize proximity to joint limits
Constraints:
termination_penalty: Large penalty on termination
contact_penalty: Penalize undesired contacts
ActionManager#
Processes actions from the policy and applies them to the robot.
Configuration#
@configclass
class ActionManagerCfg:
joint_positions = ActionTermCfg(
func=actions.joint_position_action,
scale=0.25, # Action scaling
clip=(-1.0, 1.0), # Clip range
params={"offset": 0.0}
)
API#
class ActionManager(ManagerBase):
def process(self, actions: torch.Tensor) -> None:
"""
Process and apply actions.
Args:
actions: Raw policy actions [num_envs, action_dim]
"""
pass
@property
def last_actions(self) -> torch.Tensor:
"""Get previous actions for observation."""
pass
Common Action Terms#
joint_position_action: PD position control
joint_velocity_action: PD velocity control
joint_torque_action: Direct torque control
CommandManager#
Generates and manages command targets for the robot.
Configuration#
@configclass
class CommandManagerCfg:
base_velocity = CommandTermCfg(
func=commands.uniform_velocity_command,
resampling_time=10.0, # Resample every 10s
ranges=VelocityCommandRanges(
lin_vel_x=(0.0, 1.0),
lin_vel_y=(-0.5, 0.5),
ang_vel_z=(-1.0, 1.0)
)
)
API#
class CommandManager(ManagerBase):
def reset(self, env_ids: torch.Tensor | None = None):
"""Resample commands for specified environments."""
pass
def compute(self) -> torch.Tensor:
"""
Get current commands.
Returns:
Command tensor [num_envs, cmd_dim]
"""
pass
Common Command Terms#
uniform_velocity_command: Random velocity commands
pose_command: Target pose
direction_command: Direction to follow
TerminationManager#
Checks conditions for episode termination.
Configuration#
@configclass
class TerminationManagerCfg:
timeout = TimeoutTermCfg(max_time=20.0)
base_height = TerminationTermCfg(
func=terminations.base_height_below,
params={"threshold": 0.3}
)
base_orientation = TerminationTermCfg(
func=terminations.base_orientation,
params={"threshold": 0.7}
)
API#
class TerminationManager(ManagerBase):
def compute(self) -> tuple[torch.Tensor, torch.Tensor]:
"""
Check termination conditions.
Returns:
Tuple of:
- terminated: True if episode failed [num_envs]
- truncated: True if episode timed out [num_envs]
"""
pass
Common Termination Terms#
timeout: Episode time limit
base_height_below: Robot fell
base_orientation: Robot tipped over
out_of_bounds: Robot left arena
CurriculumManager#
Implements progressive training curricula.
Configuration#
@configclass
class CurriculumManagerCfg:
terrain_levels = CurriculumTermCfg(
func=curriculum.terrain_difficulty,
start_level=0,
end_level=10,
num_steps=1_000_000
)
API#
class CurriculumManager(ManagerBase):
def update(self, metrics: dict[str, float]):
"""Update curriculum based on performance."""
pass
def get_level(self) -> int:
"""Get current difficulty level."""
pass
EventManager#
Handles domain randomization and episodic events.
Configuration#
@configclass
class EventManagerCfg:
# On reset events
reset_base = EventTermCfg(
func=events.reset_root_state_uniform,
mode="reset"
)
# Interval events
push_robot = EventTermCfg(
func=events.apply_external_force,
mode="interval",
interval_steps=250,
params={"force_range": (0, 100)}
)
# Startup events
randomize_mass = EventTermCfg(
func=events.randomize_rigid_body_mass,
mode="startup",
params={"mass_range": (0.8, 1.2)}
)
API#
class EventManager(ManagerBase):
def reset(self, env_ids: torch.Tensor | None = None):
"""Apply reset events."""
pass
def step(self):
"""Apply interval events."""
pass
Next Steps#
See observation terms reference
See reward terms reference
See action terms reference
Check examples