Advanced Topics#

Deep dives into advanced GenesisLab features and techniques.

Topics#

Manager Internals#

Understanding the manager system architecture.

Topics:

  • Manager lifecycle

  • Term resolution and execution

  • Custom manager implementation

  • Advanced term patterns


Scene Patterns#

Scene building, controlling, and querying patterns.

Topics:

  • SceneBuilder pattern

  • SceneController pattern

  • SceneQuerier pattern

  • Genesis scene API


Performance Optimization#

Techniques for maximizing simulation and training speed.

Topics:

  • Profiling simulation

  • GPU utilization

  • Vectorization best practices

  • Memory optimization


Checkpointing#

Saving and loading simulation states.

Topics:

  • Scene checkpointing

  • Policy checkpointing

  • Resume training

  • Checkpoint versioning
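
The save/resume flow can be sketched with plain `torch.save`; note that the `Policy`, optimizer, and checkpoint layout here are placeholder assumptions for illustration, not the GenesisLab checkpoint format:

```python
import torch

def save_checkpoint(path, policy, optimizer, iteration, version=1):
    # Bundle everything needed to resume, plus a version tag
    torch.save({
        "version": version,
        "iteration": iteration,
        "policy_state": policy.state_dict(),
        "optimizer_state": optimizer.state_dict(),
    }, path)

def load_checkpoint(path, policy, optimizer):
    ckpt = torch.load(path)
    assert ckpt["version"] == 1, f"unsupported checkpoint version {ckpt['version']}"
    policy.load_state_dict(ckpt["policy_state"])
    optimizer.load_state_dict(ckpt["optimizer_state"])
    return ckpt["iteration"]  # resume training from this iteration
```

Resuming then amounts to `start_it = load_checkpoint(path, policy, optimizer)` and continuing the training loop from `start_it`.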


Logging and Visualization#

Monitoring and analyzing training.

Topics:

  • TensorBoard integration

  • WandB integration

  • Custom metrics

  • Visualization tools
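
Custom metrics usually reduce to aggregating per-step values and flushing means at log time. A minimal backend-independent sketch (the class name and methods are illustrative, not GenesisLab API); the returned dict can be fed to TensorBoard's `SummaryWriter.add_scalar` or `wandb.log`:

```python
from collections import defaultdict

class MetricsAggregator:
    """Accumulates per-step scalar values and emits their means on flush."""

    def __init__(self):
        self._sums = defaultdict(float)
        self._counts = defaultdict(int)

    def add(self, name: str, value: float):
        self._sums[name] += value
        self._counts[name] += 1

    def flush(self) -> dict:
        # Mean of each metric since the last flush, then reset
        means = {k: self._sums[k] / self._counts[k] for k in self._sums}
        self._sums.clear()
        self._counts.clear()
        return means
```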


Multi-GPU Training#

Scaling across multiple GPUs.

Topics:

  • Data parallelism

  • Environment distribution

  • Gradient synchronization

  • Performance considerations
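
Environment distribution largely reduces to splitting `num_envs` across ranks. A hypothetical helper (not a GenesisLab function) might look like:

```python
def envs_for_rank(num_envs: int, world_size: int, rank: int) -> int:
    """Split num_envs as evenly as possible; earlier ranks absorb the remainder."""
    base, rem = divmod(num_envs, world_size)
    return base + (1 if rank < rem else 0)
```

Each rank then builds its own scene with `envs_for_rank(total, world_size, rank)` environments, and gradient synchronization is left to the data-parallel wrapper.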


Debugging#

Techniques for debugging robot learning systems.

Topics:

  • Debugging observations

  • Debugging rewards

  • Debugging physics

  • Common issues and solutions

Advanced Examples#

Custom Manager Implementation#

import torch

from genesislab.managers import ManagerBase

class MyCustomManager(ManagerBase):
    def __init__(self, cfg: MyCustomManagerCfg, scene: LabScene):
        super().__init__(cfg, scene)
        self._init_terms()
    
    def _init_terms(self):
        """Initialize manager terms."""
        for term_name, term_cfg in self.cfg.__dict__.items():
            if isinstance(term_cfg, MyTermCfg):
                self.terms[term_name] = MyTerm(term_cfg, self.scene)
    
    def reset(self, env_ids: torch.Tensor | None = None):
        """Reset manager state."""
        for term in self.terms.values():
            term.reset(env_ids)
    
    def compute(self) -> torch.Tensor:
        """Compute manager output."""
        results = []
        for term in self.terms.values():
            results.append(term.compute())
        return torch.stack(results, dim=-1)

Advanced Observation Processing#

import torch

class HistoryObservationTerm:
    def __init__(self, cfg: ObservationTermCfg, scene: LabScene):
        self.cfg = cfg
        self.scene = scene
        self.history_length = cfg.history_length
        self.obs_dim = cfg.obs_dim  # per-step observation dimension
        self.history_buffer = None
    
    def reset(self, env_ids: torch.Tensor | None = None):
        if env_ids is None:
            self.history_buffer = torch.zeros(
                self.scene.num_envs,
                self.history_length,
                self.obs_dim,
                device=self.scene.device
            )
        else:
            self.history_buffer[env_ids] = 0
    
    def compute(self) -> torch.Tensor:
        # Get current observation
        current_obs = self.cfg.func(self.scene)
        
        # Update history
        self.history_buffer = torch.roll(self.history_buffer, 1, dims=1)
        self.history_buffer[:, 0] = current_obs
        
        # Return flattened history
        return self.history_buffer.flatten(start_dim=1)
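
The rolling-buffer update can be checked in isolation; this standalone snippet mirrors the `compute` logic above with dummy observations:

```python
import torch

# A history buffer for 2 envs, history length 3, obs dim 1
buf = torch.zeros(2, 3, 1)
for step in [1.0, 2.0, 3.0]:
    obs = torch.full((2, 1), step)
    buf = torch.roll(buf, 1, dims=1)  # shift existing history back one slot
    buf[:, 0] = obs                   # newest observation goes in slot 0
# Slot 0 now holds step 3, slot 1 step 2, slot 2 step 1
```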

Custom Scene Builder#

import genesis as gs

class CustomSceneBuilder:
    def __init__(self, cfg: SceneBuilderCfg):
        self.cfg = cfg
    
    def build(self, scene: gs.Scene) -> dict:
        """Build scene and return entity references."""
        entities = {}
        
        # Build terrain
        entities['terrain'] = self.build_terrain(scene)
        
        # Build robot
        entities['robot'] = self.build_robot(scene)
        
        # Build sensors
        entities['sensors'] = self.build_sensors(scene)
        
        # Build objects
        entities['objects'] = self.build_objects(scene)
        
        return entities
    
    def build_terrain(self, scene: gs.Scene):
        # Custom terrain generation
        pass

    def build_robot(self, scene: gs.Scene):
        # Robot loading and configuration
        pass

    def build_sensors(self, scene: gs.Scene):
        # Sensor creation and attachment
        pass

    def build_objects(self, scene: gs.Scene):
        # Additional scene objects (obstacles, props, etc.)
        pass

Best Practices#

1. Vectorization#

Always operate on all environments simultaneously:

# ✅ Good: Vectorized
rewards = (velocities[:, 0] - target_vel) ** 2

# ❌ Bad: Loop over environments
rewards = torch.zeros(num_envs)
for i in range(num_envs):
    rewards[i] = (velocities[i, 0] - target_vel) ** 2

2. Avoid CPU-GPU Transfers#

Minimize data transfers between CPU and GPU:

# ✅ Good: Keep data on GPU
data = scene.robot.get_joint_positions()  # Already on GPU
result = process_on_gpu(data)

# ❌ Bad: Unnecessary transfers
data = scene.robot.get_joint_positions().cpu().numpy()
result = process(data)
result = torch.from_numpy(result).cuda()

3. Pre-allocate Tensors#

Reuse buffers instead of creating new tensors:

import torch

class MyTerm:
    def __init__(self, scene, dim: int):
        # Pre-allocate buffer once at construction time
        self.dim = dim
        self.buffer = torch.zeros(
            scene.num_envs,
            self.dim,
            device=scene.device
        )
    
    def compute(self):
        # Reuse buffer
        self.buffer[:] = compute_values()
        return self.buffer

4. Use In-Place Operations#

When possible, use in-place operations:

# ✅ Good: In-place
tensor += delta
tensor.clamp_(min_val, max_val)

# ❌ Creates new tensor
tensor = tensor + delta
tensor = tensor.clamp(min_val, max_val)
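
One caveat: in-place operations on tensors that autograd still needs can fail or corrupt gradients. PyTorch rejects them outright on leaf tensors that require grad, so prefer out-of-place ops on anything in the computation graph:

```python
import torch

t = torch.ones(3, requires_grad=True)
try:
    t.clamp_(0.0, 0.5)  # in-place on a leaf that requires grad
except RuntimeError as e:
    print(f"in-place rejected: {e}")

# Out-of-place is safe here, at the cost of allocating a new tensor
safe = t.clamp(0.0, 0.5)
```

In practice this means in-place ops are best reserved for buffers outside the graph (observations, reward accumulators), not for policy activations.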

Debugging Techniques#

Observation Debugging#

import torch

# Print observation statistics
obs = env.reset()[0]
print(f"Obs shape: {obs.shape}")
print(f"Obs range: [{obs.min():.2f}, {obs.max():.2f}]")
print(f"Obs mean: {obs.mean():.2f}, std: {obs.std():.2f}")

# Check for NaN/Inf
assert not torch.isnan(obs).any(), "NaN in observations!"
assert not torch.isinf(obs).any(), "Inf in observations!"

Reward Debugging#

# Log individual reward terms
reward_manager = env.unwrapped.scene.reward_manager
for name, values in reward_manager.term_rewards.items():
    print(f"{name}:")
    print(f"  mean: {values.mean():.4f}")
    print(f"  std: {values.std():.4f}")
    print(f"  min: {values.min():.4f}")
    print(f"  max: {values.max():.4f}")

Physics Debugging#

import time
import gymnasium as gym

# Visualize and slow down
env = gym.make("GenesisLab-Go2-Flat-v0", num_envs=1, headless=False)
obs, info = env.reset()
for _ in range(1000):
    action = policy(obs)
    obs, rew, term, trunc, info = env.step(action)
    time.sleep(0.1)  # Slow down so each step is visible

Performance Profiling#

import cProfile
import pstats

# Profile the training loop
obs, info = env.reset()

def train_step(obs):
    action = policy(obs)
    obs, rew, term, trunc, info = env.step(action)
    policy.update(obs, rew, term)
    return obs

profiler = cProfile.Profile()
profiler.enable()

for _ in range(100):
    obs = train_step(obs)

profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(20)
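
Note that cProfile measures Python-side time only: GPU kernels launch asynchronously, so wall-clock timing of GPU work should synchronize first. A small helper, assuming nothing beyond stock PyTorch:

```python
import time
import torch

def timed(fn, *args, iters: int = 10):
    """Wall-clock a callable, synchronizing the GPU so async kernels are counted."""
    if torch.cuda.is_available():
        torch.cuda.synchronize()
    start = time.perf_counter()
    for _ in range(iters):
        out = fn(*args)
    if torch.cuda.is_available():
        torch.cuda.synchronize()
    return out, (time.perf_counter() - start) / iters
```

Without the `synchronize()` calls, the timer can stop before the queued kernels have actually finished, making GPU code look faster than it is.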

Next Steps#