# Managers

Manager modules for handling different aspects of robot learning.

## Overview

Managers are specialized components that handle specific aspects of the learning problem:

- **ObservationManager**: Computes observations from the scene
- **ActionManager**: Processes and applies actions
- **RewardManager**: Calculates reward signals
- **CommandManager**: Generates command targets
- **TerminationManager**: Checks termination conditions
- **CurriculumManager**: Implements training curricula
- **EventManager**: Handles domain randomization
- **ObjectManager**: Manages dynamic objects

## Manager Base Class

All managers inherit from `ManagerBase`:

```python
class ManagerBase:
    def __init__(self, cfg: ManagerBaseCfg, scene: LabScene):
        """Initialize the manager.

        Args:
            cfg: Manager configuration.
            scene: The simulation scene.
        """
        self.cfg = cfg
        self.scene = scene
        self.terms = {}

    def reset(self, env_ids: torch.Tensor | None = None):
        """Reset manager state."""
        pass

    def compute(self) -> Any:
        """Compute manager output."""
        pass
```

## ObservationManager

Computes observation tensors from individual observation terms.

### Configuration

```python
@configclass
class ObservationManagerCfg:
    # Policy observations (actor)
    base_lin_vel = ObservationTermCfg(
        func=observations.base_lin_vel,
        noise=UniformNoiseCfg(min=-0.1, max=0.1),
        scale=1.0,
    )
    joint_pos = ObservationTermCfg(
        func=observations.joint_pos,
        scale=1.0,
    )

    # Privileged observations (critic)
    class critic:
        terrain_height = ObservationTermCfg(
            func=observations.terrain_height_scan,
        )
```

### API

```python
class ObservationManager(ManagerBase):
    def compute(self) -> dict[str, torch.Tensor]:
        """Compute observations.
        Returns:
            Dictionary with keys:
            - "obs": Policy observations [num_envs, obs_dim]
            - "obs_critic": Privileged observations [num_envs, critic_obs_dim]
        """
        pass

    def compute_group(self, group: str = "obs") -> torch.Tensor:
        """Compute a specific observation group."""
        pass
```

### Common Observation Terms

**Robot State**:

- `base_lin_vel`: Base linear velocity (3)
- `base_ang_vel`: Base angular velocity (3)
- `projected_gravity`: Gravity direction in base frame (3)
- `base_pos`: Base position (3)
- `base_quat`: Base orientation quaternion (4)

**Joint State**:

- `joint_pos`: Joint positions (num_dof)
- `joint_vel`: Joint velocities (num_dof)
- `joint_pos_rel`: Joint positions relative to default (num_dof)

**Commands**:

- `commands`: Current commands (cmd_dim)

**History**:

- `last_actions`: Previous actions (action_dim)

**Sensors**:

- `imu_data`: IMU measurements
- `camera_rgb`: RGB images
- `lidar_scan`: LiDAR point cloud

## RewardManager

Computes the total reward from weighted reward terms.

### Configuration

```python
@configclass
class RewardManagerCfg:
    # Main objectives
    forward_vel = RewardTermCfg(
        func=rewards.forward_velocity,
        weight=1.0,
        params={"target_vel": 1.0},
    )

    # Regularization
    energy_penalty = RewardTermCfg(
        func=rewards.energy_consumption,
        weight=-0.001,
    )
    action_smoothness = RewardTermCfg(
        func=rewards.action_rate,
        weight=-0.01,
    )
```

### API

```python
class RewardManager(ManagerBase):
    def compute(self) -> torch.Tensor:
        """Compute the total reward.
        Returns:
            Total reward tensor [num_envs]
        """
        pass

    @property
    def term_rewards(self) -> dict[str, torch.Tensor]:
        """Get individual reward terms for logging."""
        pass
```

### Common Reward Terms

**Tracking Rewards**:

- `forward_velocity`: Reward for forward motion
- `velocity_tracking`: Track command velocities
- `orientation_tracking`: Maintain upright orientation

**Regularization**:

- `energy_consumption`: Penalize high motor torques
- `action_rate`: Penalize action changes
- `joint_acceleration`: Penalize joint accelerations
- `joint_limits`: Penalize operating near joint limits

**Constraints**:

- `termination_penalty`: Large penalty on termination
- `contact_penalty`: Penalize undesired contacts

## ActionManager

Processes actions from the policy and applies them to the robot.

### Configuration

```python
@configclass
class ActionManagerCfg:
    joint_positions = ActionTermCfg(
        func=actions.joint_position_action,
        scale=0.25,        # Action scaling
        clip=(-1.0, 1.0),  # Clip range
        params={"offset": 0.0},
    )
```

### API

```python
class ActionManager(ManagerBase):
    def process(self, actions: torch.Tensor) -> None:
        """Process and apply actions.

        Args:
            actions: Raw policy actions [num_envs, action_dim]
        """
        pass

    @property
    def last_actions(self) -> torch.Tensor:
        """Get the previous actions for use in observations."""
        pass
```

### Common Action Terms

- `joint_position_action`: PD position control
- `joint_velocity_action`: PD velocity control
- `joint_torque_action`: Direct torque control

## CommandManager

Generates and manages command targets for the robot.
### Configuration

```python
@configclass
class CommandManagerCfg:
    base_velocity = CommandTermCfg(
        func=commands.uniform_velocity_command,
        resampling_time=10.0,  # Resample every 10 s
        ranges=VelocityCommandRanges(
            lin_vel_x=(0.0, 1.0),
            lin_vel_y=(-0.5, 0.5),
            ang_vel_z=(-1.0, 1.0),
        ),
    )
```

### API

```python
class CommandManager(ManagerBase):
    def reset(self, env_ids: torch.Tensor | None = None):
        """Resample commands for the specified environments."""
        pass

    def compute(self) -> torch.Tensor:
        """Get the current commands.

        Returns:
            Command tensor [num_envs, cmd_dim]
        """
        pass
```

### Common Command Terms

- `uniform_velocity_command`: Random velocity commands
- `pose_command`: Target pose
- `direction_command`: Direction to follow

## TerminationManager

Checks conditions for episode termination.

### Configuration

```python
@configclass
class TerminationManagerCfg:
    timeout = TimeoutTermCfg(max_time=20.0)
    base_height = TerminationTermCfg(
        func=terminations.base_height_below,
        params={"threshold": 0.3},
    )
    base_orientation = TerminationTermCfg(
        func=terminations.base_orientation,
        params={"threshold": 0.7},
    )
```

### API

```python
class TerminationManager(ManagerBase):
    def compute(self) -> tuple[torch.Tensor, torch.Tensor]:
        """Check termination conditions.

        Returns:
            Tuple of:
            - terminated: True if the episode failed [num_envs]
            - truncated: True if the episode timed out [num_envs]
        """
        pass
```

### Common Termination Terms

- `timeout`: Episode time limit
- `base_height_below`: Robot fell
- `base_orientation`: Robot tipped over
- `out_of_bounds`: Robot left the arena

## CurriculumManager

Implements progressive training curricula.
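A curriculum term typically maps training progress to a difficulty level. For instance, a linear terrain-difficulty schedule could be sketched as follows (the function name and the clamping behavior are assumptions, not the library's implementation):

```python
def terrain_level(step: int, start_level: int = 0, end_level: int = 10,
                  num_steps: int = 1_000_000) -> int:
    """Linearly ramp the difficulty level from start_level to end_level
    over num_steps training steps, clamping outside that range."""
    frac = min(max(step / num_steps, 0.0), 1.0)
    return start_level + int(frac * (end_level - start_level))


terrain_level(0)          # -> 0
terrain_level(500_000)    # -> 5
terrain_level(2_000_000)  # -> 10 (clamped past num_steps)
```

A performance-based curriculum would instead raise or lower the level inside `update()` depending on the metrics passed in, rather than on the step count alone.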
### Configuration

```python
@configclass
class CurriculumManagerCfg:
    terrain_levels = CurriculumTermCfg(
        func=curriculum.terrain_difficulty,
        start_level=0,
        end_level=10,
        num_steps=1_000_000,
    )
```

### API

```python
class CurriculumManager(ManagerBase):
    def update(self, metrics: dict[str, float]):
        """Update the curriculum based on performance metrics."""
        pass

    def get_level(self) -> int:
        """Get the current difficulty level."""
        pass
```

## EventManager

Handles domain randomization and episodic events.

### Configuration

```python
@configclass
class EventManagerCfg:
    # On-reset events
    reset_base = EventTermCfg(
        func=events.reset_root_state_uniform,
        mode="reset",
    )

    # Interval events
    push_robot = EventTermCfg(
        func=events.apply_external_force,
        mode="interval",
        interval_steps=250,
        params={"force_range": (0, 100)},
    )

    # Startup events
    randomize_mass = EventTermCfg(
        func=events.randomize_rigid_body_mass,
        mode="startup",
        params={"mass_range": (0.8, 1.2)},
    )
```

### API

```python
class EventManager(ManagerBase):
    def reset(self, env_ids: torch.Tensor | None = None):
        """Apply reset events."""
        pass

    def step(self):
        """Apply interval events."""
        pass
```

## Next Steps

- See the [observation terms](observations.md) reference
- See the [reward terms](rewards.md) reference
- See the [action terms](actions.md) reference
- Check the [examples](../../user_guide/tutorials/index.md)
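As a recap, an environment composes these managers in its step function: actions in, then observations, reward, and termination out. The following is a minimal runnable sketch with trivial stand-in managers; the `Env` wiring and all stub classes here are illustrative, not the actual environment implementation:

```python
# Stand-in managers following the compute()/process() pattern from ManagerBase.
class ObsManager:
    def compute(self):
        return {"obs": [0.0, 0.0, 0.0]}  # placeholder observation vector


class RewManager:
    def compute(self):
        return 1.0  # placeholder scalar reward


class ActManager:
    def __init__(self):
        self.last_actions = None

    def process(self, actions):
        self.last_actions = actions  # would scale, clip, and apply to the robot


class TermManager:
    def __init__(self, max_steps=3):
        self.steps, self.max_steps = 0, max_steps

    def compute(self):
        self.steps += 1
        return False, self.steps >= self.max_steps  # (terminated, truncated)


class Env:
    """Sketch of how an environment composes the managers in one step."""

    def __init__(self):
        self.action_mgr = ActManager()
        self.obs_mgr = ObsManager()
        self.rew_mgr = RewManager()
        self.term_mgr = TermManager()

    def step(self, actions):
        self.action_mgr.process(actions)                  # 1. apply actions
        # ... physics simulation would advance here ...
        obs = self.obs_mgr.compute()                      # 2. observations
        reward = self.rew_mgr.compute()                   # 3. reward
        terminated, truncated = self.term_mgr.compute()   # 4. termination
        return obs, reward, terminated, truncated


env = Env()
obs, reward, terminated, truncated = env.step([0.1, -0.2, 0.0])
```

The order matters: actions must be applied before the simulation advances, and observations must be computed from the post-step state that the reward and termination checks also see.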