Skip to content
Open
44 changes: 17 additions & 27 deletions source/isaaclab/isaaclab/envs/direct_marl_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from .direct_marl_env_cfg import DirectMARLEnvCfg
from .ui import ViewportCameraController
from .utils.spaces import sample_space, spec_to_gym_space
from .utils.video_recorder import VideoRecorder

# import logger
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -168,6 +169,19 @@ def _init_sim(self, render_mode: str | None = None, **kwargs):
if "prestartup" in self.event_manager.available_modes:
self.event_manager.apply(mode="prestartup")

# Instantiate the video recorder before sim.reset() so that any fallback TiledCamera
# (used for state-based envs without an observation camera) is spawned into the USD
# stage and registered for the PHYSICS_READY callback before physics initialises.
# Forward render_mode so VideoRecorder only spawns fallback cameras when --video is active.
if self.cfg.video_recorder is not None:
self.cfg.video_recorder.env_render_mode = render_mode
vr = self.cfg.video_recorder
vr.camera_position = tuple(float(x) for x in self.cfg.viewer.eye)
vr.camera_target = tuple(float(x) for x in self.cfg.viewer.lookat)
self.video_recorder: VideoRecorder = self.cfg.video_recorder.class_type(self.cfg.video_recorder, self.scene)
else:
self.video_recorder = None

# play the simulator to activate physics handles
# note: this activates the physics simulation view that exposes TensorAPIs
# note: when started in extension mode, first call sim.reset_async() and then initialize the managers
Expand Down Expand Up @@ -521,33 +535,9 @@ def render(self, recompute: bool = False) -> np.ndarray | None:
if self.render_mode == "human" or self.render_mode is None:
return None
elif self.render_mode == "rgb_array":
# check that if any render could have happened
if not self.sim.has_gui and not self.sim.has_offscreen_render:
raise RuntimeError(
f"Cannot render '{self.render_mode}' - no GUI and offscreen rendering not enabled."
" If running headless, make sure --enable_cameras is set."
)
# create the annotator if it does not exist
if not hasattr(self, "_rgb_annotator"):
import omni.replicator.core as rep

# create render product
self._render_product = rep.create.render_product(
self.cfg.viewer.cam_prim_path, self.cfg.viewer.resolution
)
# create rgb annotator -- used to read data from the render product
self._rgb_annotator = rep.AnnotatorRegistry.get_annotator("rgb", device="cpu")
self._rgb_annotator.attach([self._render_product])
# obtain the rgb data
rgb_data = self._rgb_annotator.get_data()
# convert to numpy array
rgb_data = np.frombuffer(rgb_data, dtype=np.uint8).reshape(*rgb_data.shape)
# return the rgb data
# note: initially the renderer is warming up and returns empty data
if rgb_data.size == 0:
return np.zeros((self.cfg.viewer.resolution[1], self.cfg.viewer.resolution[0], 3), dtype=np.uint8)
else:
return rgb_data[:, :, :3]
if self.video_recorder is None:
return None
return self.video_recorder.render_rgb_array()
else:
raise NotImplementedError(
f"Render mode '{self.render_mode}' is not supported. Please use: {self.metadata['render_modes']}."
Expand Down
4 changes: 4 additions & 0 deletions source/isaaclab/isaaclab/envs/direct_marl_env_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from isaaclab.utils.noise import NoiseModelCfg

from .common import AgentID, SpaceType, ViewerCfg
from .utils.video_recorder_cfg import VideoRecorderCfg


@configclass
Expand Down Expand Up @@ -234,3 +235,6 @@ class DirectMARLEnvCfg:

log_dir: str | None = None
"""Directory for logging experiment artifacts. Defaults to None, in which case no specific log directory is set."""

video_recorder: VideoRecorderCfg = VideoRecorderCfg()
"""Configuration for video recording when ``render_mode="rgb_array"`` (i.e. ``--video``)."""
45 changes: 18 additions & 27 deletions source/isaaclab/isaaclab/envs/direct_rl_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from .direct_rl_env_cfg import DirectRLEnvCfg
from .ui import ViewportCameraController
from .utils.spaces import sample_space, spec_to_gym_space
from .utils.video_recorder import VideoRecorder

if has_kit():
import omni.kit.app
Expand Down Expand Up @@ -173,6 +174,20 @@ def _init_sim(self, render_mode: str | None = None, **kwargs):
if "prestartup" in self.event_manager.available_modes:
self.event_manager.apply(mode="prestartup")

# Instantiate the video recorder before sim.reset() so that any fallback TiledCamera
# (used for state-based envs without an observation camera) is spawned into the USD
# stage and registered for the PHYSICS_READY callback before physics initialises.
# Forward render_mode so VideoRecorder only spawns fallback cameras when --video is active.
if self.cfg.video_recorder is not None:
self.cfg.video_recorder.env_render_mode = render_mode
# Perspective --video uses same eye/lookat as task viewer (Kit persp + Newton GL).
vr = self.cfg.video_recorder
vr.camera_position = tuple(float(x) for x in self.cfg.viewer.eye)
vr.camera_target = tuple(float(x) for x in self.cfg.viewer.lookat)
self.video_recorder: VideoRecorder = self.cfg.video_recorder.class_type(self.cfg.video_recorder, self.scene)
else:
self.video_recorder = None

# play the simulator to activate physics handles
# note: this activates the physics simulation view that exposes TensorAPIs
# note: when started in extension mode, first call sim.reset_async() and then initialize the managers
Expand Down Expand Up @@ -489,33 +504,9 @@ def render(self, recompute: bool = False) -> np.ndarray | None:
if self.render_mode == "human" or self.render_mode is None:
return None
elif self.render_mode == "rgb_array":
# check that if any render could have happened
if not self.sim.has_gui and not self.sim.has_offscreen_render:
raise RuntimeError(
f"Cannot render '{self.render_mode}' - no GUI and offscreen rendering not enabled."
" If running headless, make sure --enable_cameras is set."
)
# create the annotator if it does not exist
if not hasattr(self, "_rgb_annotator"):
import omni.replicator.core as rep

# create render product
self._render_product = rep.create.render_product(
self.cfg.viewer.cam_prim_path, self.cfg.viewer.resolution
)
# create rgb annotator -- used to read data from the render product
self._rgb_annotator = rep.AnnotatorRegistry.get_annotator("rgb", device="cpu")
self._rgb_annotator.attach([self._render_product])
# obtain the rgb data
rgb_data = self._rgb_annotator.get_data()
# convert to numpy array
rgb_data = np.frombuffer(rgb_data, dtype=np.uint8).reshape(*rgb_data.shape)
# return the rgb data
# note: initially the renerer is warming up and returns empty data
if rgb_data.size == 0:
return np.zeros((self.cfg.viewer.resolution[1], self.cfg.viewer.resolution[0], 3), dtype=np.uint8)
else:
return rgb_data[:, :, :3]
if self.video_recorder is None:
return None
return self.video_recorder.render_rgb_array()
else:
raise NotImplementedError(
f"Render mode '{self.render_mode}' is not supported. Please use: {self.metadata['render_modes']}."
Expand Down
4 changes: 4 additions & 0 deletions source/isaaclab/isaaclab/envs/direct_rl_env_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from isaaclab.utils.noise import NoiseModelCfg

from .common import SpaceType, ViewerCfg
from .utils.video_recorder_cfg import VideoRecorderCfg


@configclass
Expand Down Expand Up @@ -254,3 +255,6 @@ class DirectRLEnvCfg:

log_dir: str | None = None
"""Directory for logging experiment artifacts. Defaults to None, in which case no specific log directory is set."""

video_recorder: VideoRecorderCfg = VideoRecorderCfg()
"""Configuration for video recording when ``render_mode="rgb_array"`` (i.e. ``--video``)."""
11 changes: 11 additions & 0 deletions source/isaaclab/isaaclab/envs/manager_based_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from .manager_based_env_cfg import ManagerBasedEnvCfg
from .ui import ViewportCameraController
from .utils.io_descriptors import export_articulations_data, export_scene_data
from .utils.video_recorder import VideoRecorder

# import logger
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -182,6 +183,16 @@ def _init_sim(self):
if "prestartup" in self.event_manager.available_modes:
self.event_manager.apply(mode="prestartup")

# Instantiate the video recorder before sim.reset() so that any fallback TiledCamera
# (used for state-based envs without an observation camera) is spawned into the USD
# stage and registered for the PHYSICS_READY callback before physics initialises.
# env_render_mode and camera_position/camera_target are forwarded by subclasses (e.g. ManagerBasedRLEnv)
# into cfg.video_recorder before calling super().__init__().
if self.cfg.video_recorder is not None:
self.video_recorder: VideoRecorder = self.cfg.video_recorder.class_type(self.cfg.video_recorder, self.scene)
else:
self.video_recorder = None

# play the simulator to activate physics handles
# note: this activates the physics simulation view that exposes TensorAPIs
# note: when started in extension mode, first call sim.reset_async() and then initialize the managers
Expand Down
4 changes: 4 additions & 0 deletions source/isaaclab/isaaclab/envs/manager_based_env_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from isaaclab.utils import configclass

from .common import ViewerCfg
from .utils.video_recorder_cfg import VideoRecorderCfg


@configclass
Expand Down Expand Up @@ -163,3 +164,6 @@ class ManagerBasedEnvCfg:

log_dir: str | None = None
"""Directory for logging experiment artifacts. Defaults to None, in which case no specific log directory is set."""

video_recorder: VideoRecorderCfg = VideoRecorderCfg()
"""Configuration for video recording when ``render_mode="rgb_array"`` (i.e. ``--video``)."""
40 changes: 11 additions & 29 deletions source/isaaclab/isaaclab/envs/manager_based_rl_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ def __init__(self, cfg: ManagerBasedRLEnvCfg, render_mode: str | None = None, **
# initialize the episode length buffer BEFORE loading the managers to use it in mdp functions.
self.episode_length_buf = torch.zeros(cfg.scene.num_envs, device=cfg.sim.device, dtype=torch.long)

# Forward render_mode and viewer camera to VideoRecorderCfg before super().__init__()
# creates the VideoRecorder, so fallback cameras are only spawned when --video is active
# (env_render_mode="rgb_array") and the perspective view matches the task viewport.
if cfg.video_recorder is not None:
cfg.video_recorder.env_render_mode = render_mode
cfg.video_recorder.camera_position = tuple(float(x) for x in cfg.viewer.eye)
cfg.video_recorder.camera_target = tuple(float(x) for x in cfg.viewer.lookat)

# initialize the base class to setup the scene.
super().__init__(cfg=cfg)
# store the render mode
Expand Down Expand Up @@ -270,35 +278,9 @@ def render(self, recompute: bool = False) -> np.ndarray | None:
if self.render_mode == "human" or self.render_mode is None:
return None
elif self.render_mode == "rgb_array":
# check that if any render could have happened
# Check for GUI, offscreen rendering, or visualizers
has_visualizers = bool(self.sim.get_setting("/isaaclab/visualizer"))
if not (self.sim.has_gui or self.sim.has_offscreen_render or has_visualizers):
raise RuntimeError(
f"Cannot render '{self.render_mode}' - no GUI and offscreen rendering not enabled."
" If running headless, make sure --enable_cameras is set."
)
# create the annotator if it does not exist
if not hasattr(self, "_rgb_annotator"):
import omni.replicator.core as rep

# create render product
self._render_product = rep.create.render_product(
self.cfg.viewer.cam_prim_path, self.cfg.viewer.resolution
)
# create rgb annotator -- used to read data from the render product
self._rgb_annotator = rep.AnnotatorRegistry.get_annotator("rgb", device="cpu")
self._rgb_annotator.attach([self._render_product])
# obtain the rgb data
rgb_data = self._rgb_annotator.get_data()
# convert to numpy array
rgb_data = np.frombuffer(rgb_data, dtype=np.uint8).reshape(*rgb_data.shape)
# return the rgb data
# note: initially the renerer is warming up and returns empty data
if rgb_data.size == 0:
return np.zeros((self.cfg.viewer.resolution[1], self.cfg.viewer.resolution[0], 3), dtype=np.uint8)
else:
return rgb_data[:, :, :3]
if self.video_recorder is None:
return None
return self.video_recorder.render_rgb_array()
else:
raise NotImplementedError(
f"Render mode '{self.render_mode}' is not supported. Please use: {self.metadata['render_modes']}."
Expand Down
120 changes: 120 additions & 0 deletions source/isaaclab/isaaclab/envs/utils/video_recorder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Copyright (c) 2022-2026, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: BSD-3-Clause

"""Video recorder implementation.

Captures a single wide-angle perspective view of the scene:

* **Kit backends** (PhysX physics or Isaac RTX renderer) — uses
:mod:`isaaclab_physx.video_recording.isaacsim_kit_perspective_video`.
* **Newton backends** (Newton physics or Newton Warp renderer only) — uses
:mod:`isaaclab_newton.video_recording.newton_gl_perspective_video`.

If neither a Kit nor a Newton backend is detected, construction raises so users do not
use ``--video`` on unsupported setups.

See :mod:`video_recorder_cfg` for configuration.
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Literal

import numpy as np

if TYPE_CHECKING:
from isaaclab.scene import InteractiveScene

from .video_recorder_cfg import VideoRecorderCfg

logger = logging.getLogger(__name__)

_VideoBackend = Literal["kit", "newton_gl"]


def _resolve_video_backend(scene: InteractiveScene) -> _VideoBackend:
"""Resolve which video backend to use from physics and renderer configs.

Priority: PhysX or Isaac RTX -> Kit camera; else Newton or Newton Warp -> GL viewer.
When both are present (e.g. PhysX + Newton Warp), Kit wins.
"""
sim = scene.sim
physics_name = sim.physics_manager.__name__.lower()
renderer_types: list[str] = scene._sensor_renderer_types()

use_kit = "physx" in physics_name or "isaac_rtx" in renderer_types
use_newton_gl = "newton" in physics_name or "newton_warp" in renderer_types

if use_kit:
return "kit"
if use_newton_gl:
return "newton_gl"
raise RuntimeError(
"Video recording (--video) requires a supported backend: "
"PhysX or Isaac RTX renderer (Kit camera), or Newton physics / Newton Warp renderer (GL viewer). "
"No supported backend detected; do not use --video for this setup."
)


class VideoRecorder:
"""Records perspective video frames from the scene's active renderer.

Args:
cfg: Recorder configuration.
scene: The interactive scene that owns the sensors.
"""

def __init__(self, cfg: VideoRecorderCfg, scene: InteractiveScene):
self.cfg = cfg
self._scene = scene
self._backend: _VideoBackend | None = None
self._capture = None

if cfg.env_render_mode == "rgb_array":
self._backend = _resolve_video_backend(scene)
if self._backend == "newton_gl":
try:
import pyglet

if not pyglet.options.get("headless", False):
pyglet.options["headless"] = True
except ImportError as e:
raise ImportError(
"The Newton GL video backend requires 'pyglet'. Install IsaacLab with './isaaclab.sh -i'."
) from e
from isaaclab_newton.video_recording.newton_gl_perspective_video import (
create_newton_gl_perspective_video,
)
from isaaclab_newton.video_recording.newton_gl_perspective_video_cfg import NewtonGlPerspectiveVideoCfg

ncfg = NewtonGlPerspectiveVideoCfg(
window_width=cfg.window_width,
window_height=cfg.window_height,
camera_position=cfg.camera_position,
camera_target=cfg.camera_target,
)
self._capture = create_newton_gl_perspective_video(ncfg)
else:
from isaaclab_physx.video_recording.isaacsim_kit_perspective_video import (
create_isaacsim_kit_perspective_video,
)
from isaaclab_physx.video_recording.isaacsim_kit_perspective_video_cfg import (
IsaacsimKitPerspectiveVideoCfg,
)

kcfg = IsaacsimKitPerspectiveVideoCfg(
camera_position=cfg.camera_position,
camera_target=cfg.camera_target,
window_width=cfg.window_width,
window_height=cfg.window_height,
)
self._capture = create_isaacsim_kit_perspective_video(kcfg)

def render_rgb_array(self) -> np.ndarray | None:
"""Return an RGB frame for the resolved backend. Fails if backend is unavailable."""
if self._backend is None or self._capture is None:
return None
return self._capture.render_rgb_array()
Loading