Managers#

Manager modules for handling different aspects of robot learning.

Overview#

Managers are specialized components that handle specific aspects of the learning problem:

  • ObservationManager: Computes observations from the scene

  • ActionManager: Processes and applies actions

  • RewardManager: Calculates reward signals

  • CommandManager: Generates command targets

  • TerminationManager: Checks termination conditions

  • CurriculumManager: Implements training curricula

  • EventManager: Handles domain randomization

  • ObjectManager: Manages dynamic objects
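A typical environment owns one instance of each manager and calls them in a fixed order every step. The sketch below is a minimal single-environment stand-in (plain Python scalars instead of batched tensors, and hard-coded toy logic instead of the real manager classes) that shows one plausible ordering:

```python
# Toy environment illustrating the per-step manager sequence.
# All numbers and the ordering are illustrative, not from the real API.
class ToyEnv:
    def __init__(self):
        self.t = 0.0
        self.dt = 0.02

    def step(self, action):
        # 1. ActionManager: clip and scale the raw policy action
        applied = max(-1.0, min(1.0, action)) * 0.25
        self.t += self.dt
        # 2. RewardManager: sum weighted reward terms
        reward = 1.0 * applied - 0.001 * applied ** 2
        # 3. TerminationManager: failure check and time limit
        terminated = False
        truncated = self.t >= 20.0
        # 4. ObservationManager: build the next observation
        obs = {"obs": [self.t, applied]}
        return obs, reward, terminated, truncated
```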

Manager Base Class#

All managers inherit from ManagerBase:

import torch
from typing import Any

class ManagerBase:
    def __init__(self, cfg: ManagerBaseCfg, scene: LabScene):
        """
        Initialize manager.
        
        Args:
            cfg: Manager configuration
            scene: The simulation scene
        """
        self.cfg = cfg
        self.scene = scene
        self.terms = {}
        
    def reset(self, env_ids: torch.Tensor | None = None):
        """Reset manager state."""
        pass
    
    def compute(self) -> Any:
        """Compute manager output."""
        pass
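A custom manager follows the same pattern: store the config, register terms, and override `reset()` and `compute()`. A self-contained sketch, using a simplified stand-in base class (so it runs without `ManagerBaseCfg` or `LabScene`) and a hypothetical step-counting manager:

```python
# Simplified stand-in mirroring the ManagerBase pattern above.
class ManagerBase:
    def __init__(self, cfg, scene):
        self.cfg = cfg
        self.scene = scene
        self.terms = {}

    def reset(self, env_ids=None):
        pass

    def compute(self):
        pass

class CounterManager(ManagerBase):
    """Hypothetical manager that counts elapsed steps per environment."""

    def __init__(self, cfg, scene, num_envs=4):
        super().__init__(cfg, scene)
        self.counts = [0] * num_envs

    def reset(self, env_ids=None):
        # Reset only the requested environments (all if env_ids is None)
        ids = range(len(self.counts)) if env_ids is None else env_ids
        for i in ids:
            self.counts[i] = 0

    def compute(self):
        self.counts = [c + 1 for c in self.counts]
        return self.counts
```

The per-environment `reset(env_ids)` signature matters in vectorized training, where only a subset of environments resets on any given step.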

ObservationManager#

Computes observation tensors from individual observation terms.

Configuration#

@configclass
class ObservationManagerCfg:
    # Policy observations (actor)
    base_lin_vel = ObservationTermCfg(
        func=observations.base_lin_vel,
        noise=UniformNoiseCfg(min=-0.1, max=0.1),
        scale=1.0
    )
    
    joint_pos = ObservationTermCfg(
        func=observations.joint_pos,
        scale=1.0
    )
    
    # Privileged observations (critic)
    class critic:
        terrain_height = ObservationTermCfg(
            func=observations.terrain_height_scan
        )

API#

class ObservationManager(ManagerBase):
    def compute(self) -> dict[str, torch.Tensor]:
        """
        Compute observations.
        
        Returns:
            Dictionary with keys:
            - "obs": Policy observations [num_envs, obs_dim]
            - "obs_critic": Privileged observations [num_envs, critic_obs_dim]
        """
        pass
    
    def compute_group(self, group: str = "obs") -> torch.Tensor:
        """Compute specific observation group."""
        pass
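Conceptually, `compute()` evaluates each term, applies its noise and scale, and concatenates the results for the group. A noise-seeded, pure-Python sketch of that reduction (the term functions and the `(func, scale, noise)` tuple layout are illustrative, not the real term API):

```python
import random

def base_lin_vel(state):
    return state["lin_vel"]

def joint_pos(state):
    return state["joint_pos"]

# One (term function, scale, uniform-noise half-width) entry per term,
# mirroring the ObservationManagerCfg above.
POLICY_TERMS = [(base_lin_vel, 1.0, 0.1), (joint_pos, 1.0, 0.0)]

def compute_group(state, terms, rng=None):
    """Evaluate each term, add noise, scale, and flatten into one vector."""
    rng = rng or random.Random(0)
    out = []
    for func, scale, noise in terms:
        values = func(state)
        out.extend(scale * (v + rng.uniform(-noise, noise)) for v in values)
    return out
```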

Common Observation Terms#

Robot State:

  • base_lin_vel: Base linear velocity (3)

  • base_ang_vel: Base angular velocity (3)

  • projected_gravity: Gravity direction in base frame (3)

  • base_pos: Base position (3)

  • base_quat: Base orientation quaternion (4)

Joint State:

  • joint_pos: Joint positions (num_dof)

  • joint_vel: Joint velocities (num_dof)

  • joint_pos_rel: Joint positions relative to default (num_dof)

Commands:

  • commands: Current commands (cmd_dim)

History:

  • last_actions: Previous actions (action_dim)

Sensors:

  • imu_data: IMU measurements

  • camera_rgb: RGB images

  • lidar_scan: LiDAR point cloud
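Each term is ultimately just a function of robot state. For example, `projected_gravity` rotates the world gravity direction into the base frame. A self-contained sketch using a unit-quaternion rotation (pure Python, assuming the `[w, x, y, z]` component order; the real implementation operates on batched tensors):

```python
def cross(a, b):
    return [a[1]*b[2] - a[2]*b[1],
            a[2]*b[0] - a[0]*b[2],
            a[0]*b[1] - a[1]*b[0]]

def quat_rotate_inverse(q, v):
    """Rotate vector v by the inverse of unit quaternion q = [w, x, y, z]."""
    w = q[0]
    u = [-q[1], -q[2], -q[3]]  # conjugate = inverse for a unit quaternion
    uv = cross(u, v)
    uuv = cross(u, uv)
    # v' = v + 2*(w*(u x v) + u x (u x v))
    return [v[i] + 2.0 * (w * uv[i] + uuv[i]) for i in range(3)]

def projected_gravity(base_quat):
    """Unit gravity direction expressed in the base frame."""
    return quat_rotate_inverse(base_quat, [0.0, 0.0, -1.0])
```

When the robot is upright the result is `[0, 0, -1]`; as it tilts, the vector rotates away from the z-axis, which is why this term is a common substitute for feeding in the raw orientation quaternion.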

RewardManager#

Computes total reward from weighted reward terms.

Configuration#

@configclass
class RewardManagerCfg:
    # Main objectives
    forward_vel = RewardTermCfg(
        func=rewards.forward_velocity,
        weight=1.0,
        params={"target_vel": 1.0}
    )
    
    # Regularization
    energy_penalty = RewardTermCfg(
        func=rewards.energy_consumption,
        weight=-0.001
    )
    
    action_smoothness = RewardTermCfg(
        func=rewards.action_rate,
        weight=-0.01
    )

API#

class RewardManager(ManagerBase):
    def compute(self) -> torch.Tensor:
        """
        Compute total reward.
        
        Returns:
            Total reward tensor [num_envs]
        """
        pass
    
    @property
    def term_rewards(self) -> dict[str, torch.Tensor]:
        """Get individual reward terms for logging."""
        pass
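At its core, `compute()` is a weighted sum over the configured terms, with the unweighted-per-term breakdown kept for logging. A scalar single-environment sketch mirroring the config above (the term implementations are hypothetical):

```python
def forward_velocity(state, target_vel=1.0):
    # Hypothetical tracking term: 1 at the target speed, decaying away from it
    return max(0.0, 1.0 - abs(state["vel_x"] - target_vel))

def energy_consumption(state):
    return sum(t * t for t in state["torques"])

def action_rate(state):
    return sum((a - b) ** 2 for a, b in zip(state["action"], state["last_action"]))

# (func, weight, params) entries, as in RewardManagerCfg above
REWARD_TERMS = [
    (forward_velocity, 1.0, {"target_vel": 1.0}),
    (energy_consumption, -0.001, {}),
    (action_rate, -0.01, {}),
]

def compute_reward(state):
    """Return (total reward, per-term weighted rewards for logging)."""
    term_rewards = {f.__name__: w * f(state, **p) for f, w, p in REWARD_TERMS}
    return sum(term_rewards.values()), term_rewards
```

Keeping the per-term dictionary around (as `term_rewards` does here) makes it easy to spot a regularizer that dominates the signal when a policy fails to learn.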

Common Reward Terms#

Tracking Rewards:

  • forward_velocity: Reward for forward motion

  • velocity_tracking: Track command velocities

  • orientation_tracking: Maintain upright orientation

Regularization:

  • energy_consumption: Penalize high motor torques

  • action_rate: Penalize action changes

  • joint_acceleration: Penalize joint accelerations

  • joint_limits: Penalize joint positions approaching their limits

Constraints:

  • termination_penalty: Large penalty on termination

  • contact_penalty: Penalize undesired contacts
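Tracking terms like `velocity_tracking` are commonly shaped with an exponential kernel so the reward stays bounded in (0, 1] regardless of the error magnitude. A sketch of that shaping (the kernel width `sigma` is an illustrative value, not from this page):

```python
import math

def velocity_tracking(vel, cmd, sigma=0.25):
    """Reward in (0, 1]: exactly 1 when velocity matches the command."""
    error = sum((v - c) ** 2 for v, c in zip(vel, cmd))
    return math.exp(-error / sigma)
```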

ActionManager#

Processes actions from the policy and applies them to the robot.

Configuration#

@configclass
class ActionManagerCfg:
    joint_positions = ActionTermCfg(
        func=actions.joint_position_action,
        scale=0.25,  # Action scaling
        clip=(-1.0, 1.0),  # Clip range
        params={"offset": 0.0}
    )

API#

class ActionManager(ManagerBase):
    def process(self, actions: torch.Tensor) -> None:
        """
        Process and apply actions.
        
        Args:
            actions: Raw policy actions [num_envs, action_dim]
        """
        pass
    
    @property
    def last_actions(self) -> torch.Tensor:
        """Get previous actions for observation."""
        pass

Common Action Terms#

  • joint_position_action: PD position control

  • joint_velocity_action: PD velocity control

  • joint_torque_action: Direct torque control
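For `joint_position_action`, processing typically clips the raw action, scales it, and offsets it around a default joint pose; the resulting target then drives a PD controller. A per-joint sketch (the default pose and PD gains are illustrative values):

```python
def process_joint_position_action(raw, default_pos, scale=0.25,
                                  clip=(-1.0, 1.0), offset=0.0):
    """Map raw policy actions to joint position targets."""
    lo, hi = clip
    clipped = [min(hi, max(lo, a)) for a in raw]
    return [offset + d + scale * a for a, d in zip(clipped, default_pos)]

def pd_torque(target, pos, vel, kp=20.0, kd=0.5):
    """PD position control: torque driving each joint toward its target."""
    return [kp * (t - q) - kd * dq for t, q, dq in zip(target, pos, vel)]
```

The small `scale` keeps targets close to the default pose, which is why position-based action terms are often easier to train than direct torque control.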

CommandManager#

Generates and manages command targets for the robot.

Configuration#

@configclass
class CommandManagerCfg:
    base_velocity = CommandTermCfg(
        func=commands.uniform_velocity_command,
        resampling_time=10.0,  # Resample every 10s
        ranges=VelocityCommandRanges(
            lin_vel_x=(0.0, 1.0),
            lin_vel_y=(-0.5, 0.5),
            ang_vel_z=(-1.0, 1.0)
        )
    )

API#

class CommandManager(ManagerBase):
    def reset(self, env_ids: torch.Tensor | None = None):
        """Resample commands for specified environments."""
        pass
    
    def compute(self) -> torch.Tensor:
        """
        Get current commands.
        
        Returns:
            Command tensor [num_envs, cmd_dim]
        """
        pass

Common Command Terms#

  • uniform_velocity_command: Random velocity commands

  • pose_command: Target pose

  • direction_command: Direction to follow
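`uniform_velocity_command` draws each component from its configured range and holds the command until the resampling time elapses. A single-environment sketch (ranges copied from the config above; the class layout is a simplification, not the real term API):

```python
import random

RANGES = {"lin_vel_x": (0.0, 1.0), "lin_vel_y": (-0.5, 0.5), "ang_vel_z": (-1.0, 1.0)}

class VelocityCommand:
    def __init__(self, resampling_time=10.0, rng=None):
        self.resampling_time = resampling_time
        self.rng = rng or random.Random(0)
        self.time_left = 0.0  # forces a resample on first update
        self.command = [0.0, 0.0, 0.0]

    def update(self, dt):
        self.time_left -= dt
        if self.time_left <= 0.0:
            # Draw a fresh command uniformly from each configured range
            self.command = [self.rng.uniform(lo, hi) for lo, hi in RANGES.values()]
            self.time_left = self.resampling_time
        return self.command
```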

TerminationManager#

Checks conditions for episode termination.

Configuration#

@configclass
class TerminationManagerCfg:
    timeout = TimeoutTermCfg(max_time=20.0)
    
    base_height = TerminationTermCfg(
        func=terminations.base_height_below,
        params={"threshold": 0.3}
    )
    
    base_orientation = TerminationTermCfg(
        func=terminations.base_orientation,
        params={"threshold": 0.7}
    )

API#

class TerminationManager(ManagerBase):
    def compute(self) -> tuple[torch.Tensor, torch.Tensor]:
        """
        Check termination conditions.
        
        Returns:
            Tuple of:
            - terminated: True if episode failed [num_envs]
            - truncated: True if episode timed out [num_envs]
        """
        pass

Common Termination Terms#

  • timeout: Episode time limit

  • base_height_below: Robot fell

  • base_orientation: Robot tipped over

  • out_of_bounds: Robot left arena
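The terminated/truncated split matters for value bootstrapping: timeouts are bootstrapped, failures are not. A single-environment sketch of the check, using the thresholds from the config above (the interpretation of the orientation threshold as a bound on the projected-gravity z component is an assumption):

```python
def check_termination(state, max_time=20.0, height_threshold=0.3,
                      tilt_threshold=0.7):
    """Return (terminated, truncated) for one environment."""
    # Failure conditions -> terminated
    fell = state["base_height"] < height_threshold
    # projected gravity z is near -1 when upright, near 0 when tipped over
    tipped = abs(state["projected_gravity_z"]) < tilt_threshold
    terminated = fell or tipped
    # Time limit -> truncated (only if the episode did not already fail)
    truncated = (not terminated) and state["time"] >= max_time
    return terminated, truncated
```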

CurriculumManager#

Implements progressive training curricula.

Configuration#

@configclass
class CurriculumManagerCfg:
    terrain_levels = CurriculumTermCfg(
        func=curriculum.terrain_difficulty,
        start_level=0,
        end_level=10,
        num_steps=1_000_000
    )

API#

class CurriculumManager(ManagerBase):
    def update(self, metrics: dict[str, float]):
        """Update curriculum based on performance."""
        pass
    
    def get_level(self) -> int:
        """Get current difficulty level."""
        pass
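One plausible reading of the `terrain_levels` config is a linear schedule from `start_level` to `end_level` over `num_steps` training steps; frameworks may instead promote or demote levels based on per-robot performance, so treat this as a sketch of the simplest variant:

```python
def terrain_level(step, start_level=0, end_level=10, num_steps=1_000_000):
    """Linearly interpolate the difficulty level over training steps."""
    frac = min(1.0, max(0.0, step / num_steps))
    return int(round(start_level + frac * (end_level - start_level)))
```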

EventManager#

Handles domain randomization and episodic events.

Configuration#

@configclass
class EventManagerCfg:
    # On reset events
    reset_base = EventTermCfg(
        func=events.reset_root_state_uniform,
        mode="reset"
    )
    
    # Interval events
    push_robot = EventTermCfg(
        func=events.apply_external_force,
        mode="interval",
        interval_steps=250,
        params={"force_range": (0, 100)}
    )
    
    # Startup events
    randomize_mass = EventTermCfg(
        func=events.randomize_rigid_body_mass,
        mode="startup",
        params={"mass_range": (0.8, 1.2)}
    )

API#

class EventManager(ManagerBase):
    def reset(self, env_ids: torch.Tensor | None = None):
        """Apply reset events."""
        pass
    
    def step(self):
        """Apply interval events."""
        pass
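The three modes map to three call sites: `startup` events run once at construction, `reset` events run from `reset()`, and `interval` events run from `step()` every `interval_steps`. A minimal dispatch sketch (the `(func, mode, interval_steps)` tuple layout and the event callbacks are illustrative):

```python
class EventManager:
    def __init__(self, terms):
        # terms: list of (func, mode, interval_steps) tuples
        self.terms = terms
        self.step_count = 0
        # "startup" events (e.g. mass randomization) fire exactly once, here
        for func, mode, _ in terms:
            if mode == "startup":
                func()

    def reset(self, env_ids=None):
        # "reset" events (e.g. root state randomization) fire on every reset
        for func, mode, _ in self.terms:
            if mode == "reset":
                func()

    def step(self):
        # "interval" events (e.g. pushes) fire every interval_steps steps
        self.step_count += 1
        for func, mode, interval in self.terms:
            if mode == "interval" and self.step_count % interval == 0:
                func()
```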

Next Steps#