Advanced Topics#
Deep dives into advanced GenesisLab features and techniques.
Topics#
Manager Internals#
Understanding the manager system architecture.
Topics:
Manager lifecycle
Term resolution and execution
Custom manager implementation
Advanced term patterns
Scene Patterns#
Scene building, controlling, and querying patterns.
Topics:
SceneBuilder pattern
SceneController pattern
SceneQuerier pattern
Genesis scene API
Performance Optimization#
Techniques for maximizing simulation and training speed.
Topics:
Profiling simulation
GPU utilization
Vectorization best practices
Memory optimization
Checkpointing#
Saving and loading simulation states.
Topics:
Scene checkpointing
Policy checkpointing
Resume training
Checkpoint versioning
Logging and Visualization#
Monitoring and analyzing training.
Topics:
TensorBoard integration
WandB integration
Custom metrics
Visualization tools
Multi-GPU Training#
Scaling across multiple GPUs.
Topics:
Data parallelism
Environment distribution
Gradient synchronization
Performance considerations
Debugging#
Techniques for debugging robot learning systems.
Topics:
Debugging observations
Debugging rewards
Debugging physics
Common issues and solutions
Advanced Examples#
Custom Manager Implementation#
from genesislab.managers import ManagerBase


class MyCustomManager(ManagerBase):
    """Example custom manager that discovers its terms from the config.

    Every attribute of ``cfg`` whose value is a ``MyTermCfg`` becomes one
    managed ``MyTerm`` instance, keyed by the attribute name.
    """

    def __init__(self, cfg: MyCustomManagerCfg, scene: LabScene):
        super().__init__(cfg, scene)
        self._init_terms()

    def _init_terms(self):
        """Register a MyTerm for each MyTermCfg attribute found on the config."""
        for name, candidate in vars(self.cfg).items():
            if isinstance(candidate, MyTermCfg):
                self.terms[name] = MyTerm(candidate, self.scene)

    def reset(self, env_ids: torch.Tensor | None = None):
        """Reset every term for the given environment ids (all envs when None)."""
        for term in self.terms.values():
            term.reset(env_ids)

    def compute(self) -> torch.Tensor:
        """Evaluate all terms and stack their outputs along the last dimension."""
        outputs = [term.compute() for term in self.terms.values()]
        return torch.stack(outputs, dim=-1)
Advanced Observation Processing#
class HistoryObservationTerm:
    """Observation term that returns a rolling history of a base observation.

    Keeps the last ``cfg.history_length`` observations per environment and
    returns them flattened per environment, newest entry first.

    Fixes over the original example: the buffer is allocated lazily on the
    first ``compute()`` call, sized from the actual observation — the original
    read ``self.obs_dim``, which was never assigned (AttributeError) — and
    ``reset(env_ids=...)`` no longer crashes when called before any
    allocation has happened.
    """

    def __init__(self, cfg: "ObservationTermCfg", scene: "LabScene"):
        self.cfg = cfg
        self.scene = scene
        self.history_length = cfg.history_length
        # Allocated lazily in compute() so the observation width can be read
        # from the observation tensor itself instead of a config field.
        self.history_buffer = None

    def reset(self, env_ids: "torch.Tensor | None" = None):
        """Clear the stored history for ``env_ids`` (all envs when None)."""
        if self.history_buffer is None:
            # Nothing allocated yet; the first compute() starts from zeros.
            return
        if env_ids is None:
            self.history_buffer.zero_()
        else:
            self.history_buffer[env_ids] = 0

    def compute(self) -> "torch.Tensor":
        """Return the flattened (num_envs, history_length * obs_dim) history."""
        current_obs = self.cfg.func(self.scene)
        if self.history_buffer is None:
            # Lazy allocation: the last dimension of the observation defines
            # the per-step width, so no separate obs_dim config is required.
            self.history_buffer = torch.zeros(
                self.scene.num_envs,
                self.history_length,
                current_obs.shape[-1],
                device=self.scene.device,
            )
        # Shift history by one slot (oldest falls off) and write the newest
        # observation into slot 0.
        self.history_buffer = torch.roll(self.history_buffer, 1, dims=1)
        self.history_buffer[:, 0] = current_obs
        return self.history_buffer.flatten(start_dim=1)
Custom Scene Builder#
class CustomSceneBuilder:
    """Example scene builder that assembles terrain, robot, sensors, objects.

    Fix over the original example: ``build()`` called ``build_sensors`` and
    ``build_objects``, which were never defined on the class and would raise
    AttributeError — placeholder implementations are now provided, matching
    the existing ``build_terrain``/``build_robot`` stubs.
    """

    def __init__(self, cfg: "SceneBuilderCfg"):
        self.cfg = cfg

    def build(self, scene: "gs.Scene") -> dict:
        """Build the scene and return a mapping of name -> entity reference."""
        entities = {}
        entities['terrain'] = self.build_terrain(scene)
        entities['robot'] = self.build_robot(scene)
        entities['sensors'] = self.build_sensors(scene)
        entities['objects'] = self.build_objects(scene)
        return entities

    def build_terrain(self, scene: "gs.Scene"):
        """Custom terrain generation (placeholder)."""
        pass

    def build_robot(self, scene: "gs.Scene"):
        """Robot loading and configuration (placeholder)."""
        pass

    def build_sensors(self, scene: "gs.Scene"):
        """Sensor creation and attachment (placeholder)."""
        pass

    def build_objects(self, scene: "gs.Scene"):
        """Additional scene-object creation (placeholder)."""
        pass
Best Practices#
1. Vectorization#
Always operate on all environments simultaneously:
# ✅ Good: Vectorized
rewards = (velocities[:, 0] - target_vel) ** 2
# ❌ Bad: Loop over environments
rewards = torch.zeros(num_envs)
for i in range(num_envs):
rewards[i] = (velocities[i, 0] - target_vel) ** 2
2. Avoid CPU-GPU Transfers#
Minimize data transfers between CPU and GPU:
# ✅ Good: Keep data on GPU
data = scene.robot.get_joint_positions() # Already on GPU
result = process_on_gpu(data)
# ❌ Bad: Unnecessary transfers
data = scene.robot.get_joint_positions().cpu().numpy()
result = process(data)
result = torch.from_numpy(result).cuda()
3. Pre-allocate Tensors#
Reuse buffers instead of creating new tensors:
class MyTerm:
    """Example term demonstrating buffer pre-allocation and reuse.

    Fix over the original example: the constructor read ``self.dim`` before
    it was ever assigned (AttributeError). The width is now a backward-
    compatible keyword parameter.
    """

    def __init__(self, scene, dim: int = 1):
        """Pre-allocate the (num_envs, dim) output buffer once.

        Args:
            scene: Scene-like object providing ``num_envs`` and ``device``.
            dim: Width of the term output per environment.
        """
        self.dim = dim
        # Allocated once and reused by every compute() call to avoid
        # per-step tensor allocations.
        self.buffer = torch.zeros(
            scene.num_envs,
            self.dim,
            device=scene.device,
        )

    def compute(self):
        """Overwrite the shared buffer in place and return it.

        NOTE(review): ``compute_values()`` is a placeholder for the real
        computation — it is not defined in this example.
        """
        self.buffer[:] = compute_values()
        return self.buffer
4. Use In-Place Operations#
When possible, use in-place operations:
# ✅ Good: In-place
tensor += delta
tensor.clamp_(min_val, max_val)
# ❌ Creates new tensor
tensor = tensor + delta
tensor = tensor.clamp(min_val, max_val)
Debugging Techniques#
Observation Debugging#
# Print observation statistics
obs = env.reset()[0]
print(f"Obs shape: {obs.shape}")
print(f"Obs range: [{obs.min():.2f}, {obs.max():.2f}]")
print(f"Obs mean: {obs.mean():.2f}, std: {obs.std():.2f}")
# Check for NaN/Inf
assert not torch.isnan(obs).any(), "NaN in observations!"
assert not torch.isinf(obs).any(), "Inf in observations!"
Reward Debugging#
# Log individual reward terms
reward_manager = env.unwrapped.scene.reward_manager
for name, values in reward_manager.term_rewards.items():
print(f"{name}:")
print(f" mean: {values.mean():.4f}")
print(f" std: {values.std():.4f}")
print(f" min: {values.min():.4f}")
print(f" max: {values.max():.4f}")
Physics Debugging#
# Visualize and slow down
env = gym.make("GenesisLab-Go2-Flat-v0", num_envs=1, headless=False)
for _ in range(1000):
action = policy(obs)
obs, rew, term, trunc, info = env.step(action)
time.sleep(0.1) # Slow down
Performance Profiling#
import cProfile
import pstats
# Profile training loop
def train_step():
action = policy(obs)
obs, rew, term, trunc, info = env.step(action)
policy.update(obs, rew, term)
profiler = cProfile.Profile()
profiler.enable()
for _ in range(100):
train_step()
profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(20)
Next Steps#
Explore specific advanced topics
Check performance guide
See debugging guide
Review API reference