Skip to content

Commit fa3ac31

Browse files
authored
curobo collision spheres visualization. (#16)
1 parent b512d23 commit fa3ac31

1 file changed

Lines changed: 270 additions & 0 deletions

File tree

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,270 @@
1+
"""Visualize cuRobo robot self-collision spheres during pipeline execution.
2+
3+
This script runs an AutoSim pipeline and updates collision sphere + EE frame
4+
visualization after every simulation step.
5+
6+
Usage
7+
-----
8+
Run with Isaac Sim UI enabled (do NOT use ``--headless``):
9+
10+
python examples/visualization/curobo_collision_spheres.py --pipeline_id <PIPELINE_ID>
11+
12+
Defaults
13+
--------
14+
* Environment ID: 0
15+
* Sphere color: green (0.2, 0.9, 0.2)
16+
* Sphere opacity: 0.4
17+
* EE frame scale: 0.1
18+
19+
Notes
20+
-----
21+
* Pipeline execution logic is inlined so visualization can hook into every step.
22+
* Spheres with radius <= 0 are disabled placeholders and are skipped.
23+
* VisualizationMarkers groups spheres by radius (one USD prototype per unique radius).
24+
"""
25+
26+
from __future__ import annotations
27+
28+
import argparse
29+
30+
import numpy as np
31+
import torch
32+
from isaaclab.app import AppLauncher
33+
34+
parser = argparse.ArgumentParser(description="Visualize cuRobo collision spheres during pipeline execution.")
35+
parser.add_argument("--pipeline_id", type=str, required=True, help="Name of the autosim pipeline.")
36+
37+
AppLauncher.add_app_launcher_args(parser)
38+
args_cli = parser.parse_args()
39+
40+
app_launcher = AppLauncher(vars(args_cli))
41+
simulation_app = app_launcher.app
42+
43+
import isaaclab.sim as sim_utils
44+
from isaaclab.markers import VisualizationMarkers, VisualizationMarkersCfg
45+
from isaaclab.markers.config import FRAME_MARKER_CFG
46+
47+
import autosim_examples # noqa: F401
48+
from autosim import make_pipeline
49+
from autosim.core.registration import SkillRegistry
50+
51+
52+
def _build_curobo_q(pipeline, env_id: int) -> torch.Tensor:
53+
"""Build a joint position tensor in cuRobo's joint order from Isaac Lab state.
54+
55+
Isaac Lab and cuRobo use different joint orderings. We look up each cuRobo
56+
joint by name in Isaac Lab's joint_names list and reorder accordingly.
57+
Joints not present in Isaac Lab (e.g. virtual base joints) are set to 0.
58+
"""
59+
planner = pipeline._motion_planner
60+
robot = pipeline._robot
61+
62+
isaaclab_names = list(robot.data.joint_names)
63+
isaaclab_q = robot.data.joint_pos[env_id]
64+
65+
q = torch.zeros(len(planner.target_joint_names), dtype=isaaclab_q.dtype, device=isaaclab_q.device)
66+
for i, name in enumerate(planner.target_joint_names):
67+
if name in isaaclab_names:
68+
q[i] = isaaclab_q[isaaclab_names.index(name)]
69+
# joints missing from Isaac Lab (virtual base joints) stay at 0
70+
71+
return planner._to_curobo_device(q)
72+
73+
74+
def _get_spheres_world(pipeline, env_id: int) -> tuple[np.ndarray, np.ndarray]:
75+
"""Return (positions, radii) for all active collision spheres in world frame."""
76+
import isaaclab.utils.math as PoseUtils
77+
from curobo.types.state import JointState
78+
79+
planner = pipeline._motion_planner
80+
robot = pipeline._robot
81+
82+
q_curobo = _build_curobo_q(pipeline, env_id)
83+
js = JointState(position=q_curobo, joint_names=planner.target_joint_names)
84+
kin_state = planner.motion_gen.compute_kinematics(js)
85+
86+
spheres_root = kin_state.robot_spheres[0].detach() # [N, 4]
87+
88+
root_pose = robot.data.root_pose_w[env_id].detach()
89+
robot_root_pos = root_pose[:3]
90+
robot_root_quat = root_pose[3:] # wxyz
91+
92+
device, dtype = root_pose.device, root_pose.dtype
93+
xyz = spheres_root[:, :3].to(device=device, dtype=dtype)
94+
radii_t = spheres_root[:, 3].to(device=device, dtype=dtype)
95+
96+
n = xyz.shape[0]
97+
robot_root_pos_b = robot_root_pos.unsqueeze(0).expand(n, -1)
98+
robot_root_quat_b = robot_root_quat.unsqueeze(0).expand(n, -1)
99+
identity = torch.tensor([1.0, 0.0, 0.0, 0.0], device=device, dtype=dtype).unsqueeze(0).expand(n, -1)
100+
101+
centers_w, _ = PoseUtils.combine_frame_transforms(robot_root_pos_b, robot_root_quat_b, xyz, identity)
102+
103+
mask = radii_t > 0.0
104+
positions = centers_w[mask].cpu().numpy()
105+
radii = radii_t[mask].cpu().numpy()
106+
return positions, radii
107+
108+
109+
def _get_ee_pose_world(pipeline, env_id: int) -> torch.Tensor:
110+
"""Return EE pose in world frame as [x, y, z, qw, qx, qy, qz] via cuRobo FK."""
111+
import isaaclab.utils.math as PoseUtils
112+
113+
planner = pipeline._motion_planner
114+
robot = pipeline._robot
115+
116+
q_curobo = _build_curobo_q(pipeline, env_id)
117+
ee_pose_root = planner.get_ee_pose(q_curobo)
118+
119+
root_pose = robot.data.root_pose_w[env_id].detach()
120+
rr_pos = root_pose[:3].unsqueeze(0)
121+
rr_quat = root_pose[3:].unsqueeze(0) # wxyz
122+
123+
device, dtype = root_pose.device, root_pose.dtype
124+
ee_pos_root = ee_pose_root.position.view(1, 3).to(device=device, dtype=dtype)
125+
ee_quat_root = ee_pose_root.quaternion.view(1, 4).to(device=device, dtype=dtype) # wxyz
126+
127+
ee_pos_w, ee_quat_w = PoseUtils.combine_frame_transforms(rr_pos, rr_quat, ee_pos_root, ee_quat_root)
128+
return torch.cat([ee_pos_w, ee_quat_w], dim=-1).squeeze(0) # [7]
129+
130+
131+
def _create_ee_marker(scale: float) -> VisualizationMarkers:
132+
"""Create a frame-axis marker for the EE pose."""
133+
cfg = FRAME_MARKER_CFG.copy()
134+
cfg.markers["frame"].scale = (scale, scale, scale)
135+
cfg = cfg.replace(prim_path="/World/debug/ee_frame")
136+
return VisualizationMarkers(cfg)
137+
138+
139+
def _update_ee_marker(vm: VisualizationMarkers, pose_w: torch.Tensor) -> None:
140+
pos = pose_w[:3].unsqueeze(0) # [1, 3]
141+
quat = pose_w[3:].unsqueeze(0) # [1, 4] wxyz
142+
vm.visualize(translations=pos, orientations=quat, marker_indices=[0])
143+
144+
145+
def _create_markers(unique_radii: np.ndarray, color: list[float], alpha: float) -> VisualizationMarkers:
146+
"""Build a VisualizationMarkers with one sphere prototype per unique radius."""
147+
markers_cfg: dict[str, sim_utils.SphereCfg] = {}
148+
for i, r in enumerate(unique_radii):
149+
markers_cfg[f"sphere_{i}"] = sim_utils.SphereCfg(
150+
radius=float(r),
151+
visual_material=sim_utils.PreviewSurfaceCfg(
152+
diffuse_color=tuple(color),
153+
opacity=alpha,
154+
),
155+
)
156+
cfg = VisualizationMarkersCfg(prim_path="/World/debug/collision_spheres", markers=markers_cfg)
157+
return VisualizationMarkers(cfg)
158+
159+
160+
def _update_markers(
161+
vm: VisualizationMarkers,
162+
positions: np.ndarray,
163+
radii: np.ndarray,
164+
unique_radii: np.ndarray,
165+
) -> None:
166+
radius_to_idx = {float(r): i for i, r in enumerate(unique_radii)}
167+
marker_indices = np.array([radius_to_idx[float(r)] for r in radii], dtype=np.int32)
168+
translations = torch.from_numpy(positions).float()
169+
vm.visualize(translations=translations, marker_indices=marker_indices.tolist())
170+
171+
172+
def _update_visualization(pipeline, env_id, vm_spheres, vm_ee, unique_radii):
173+
positions, radii = _get_spheres_world(pipeline, env_id)
174+
_update_markers(vm_spheres, positions, radii, unique_radii)
175+
_update_ee_marker(vm_ee, _get_ee_pose_world(pipeline, env_id))
176+
177+
178+
def _execute_single_skill_with_viz(pipeline, skill, goal, vm_spheres, vm_ee, unique_radii, env_id):
179+
"""Inlined from AutoSimPipeline._execute_single_skill with per-step visualization."""
180+
world_state = pipeline._build_world_state()
181+
plan_success = skill.plan(world_state, goal)
182+
183+
steps = 0
184+
while plan_success and steps < pipeline.cfg.max_steps:
185+
world_state = pipeline._build_world_state()
186+
output = skill.step(world_state)
187+
188+
adapter_result = pipeline._action_adapter.apply(skill, output, pipeline._env)
189+
action = pipeline._last_action.clone()
190+
action[pipeline._env_id, : adapter_result.shape[0]] = adapter_result
191+
192+
pipeline._env.step(action)
193+
pipeline._last_action = action
194+
pipeline._generated_actions.append(action)
195+
196+
pipeline._env.sim.render()
197+
_update_visualization(pipeline, env_id, vm_spheres, vm_ee, unique_radii)
198+
199+
steps += 1
200+
if output.done:
201+
return True, steps
202+
203+
if steps >= pipeline.cfg.max_steps:
204+
world_state = pipeline._build_world_state()
205+
current_pos = world_state.robot_base_pose[:2]
206+
if goal.target_pose is not None:
207+
target_pos = goal.target_pose[:2]
208+
dist = float(torch.linalg.norm(current_pos - target_pos))
209+
pipeline._logger.warning(
210+
f"Max steps reached. Current pos: ({current_pos[0]:.3f}, {current_pos[1]:.3f}), "
211+
f"Target pos: ({target_pos[0]:.3f}, {target_pos[1]:.3f}), Distance: {dist:.3f}m"
212+
)
213+
214+
return False, steps
215+
216+
217+
def main():
218+
env_id = 0
219+
color = [0.2, 0.9, 0.2]
220+
alpha = 0.4
221+
ee_scale = 0.1
222+
223+
pipeline = make_pipeline(args_cli.pipeline_id)
224+
pipeline.initialize()
225+
226+
# Build markers using the initial robot pose (before reset)
227+
positions, radii = _get_spheres_world(pipeline, env_id)
228+
unique_radii = np.unique(radii)
229+
vm_spheres = _create_markers(unique_radii, color, alpha)
230+
vm_ee = _create_ee_marker(scale=ee_scale)
231+
232+
# Decompose task
233+
decompose_result = pipeline.decompose()
234+
235+
# Execute skill sequence with per-step visualization
236+
pipeline._check_skill_extra_cfg()
237+
pipeline.reset_env()
238+
_update_visualization(pipeline, env_id, vm_spheres, vm_ee, unique_radii)
239+
240+
for subtask in decompose_result.subtasks:
241+
for skill_info in subtask.skills:
242+
skill = SkillRegistry.create(
243+
skill_info.skill_type, pipeline.cfg.skills.get(skill_info.skill_type).extra_cfg
244+
)
245+
246+
if pipeline._action_adapter.should_skip_apply(skill):
247+
pipeline._logger.info(f"Skill {skill_info.skill_type} skipped.")
248+
continue
249+
250+
goal = skill.extract_goal_from_info(skill_info, pipeline._env, pipeline._env_extra_info)
251+
success, steps = _execute_single_skill_with_viz(
252+
pipeline, skill, goal, vm_spheres, vm_ee, unique_radii, env_id
253+
)
254+
255+
if not success:
256+
pipeline._logger.error(f"Skill {skill_info.skill_type} failed after {steps} steps.")
257+
raise ValueError(f"Skill {skill_info.skill_type} failed after {steps} steps.")
258+
pipeline._logger.info(f"Skill {skill_info.skill_type} done ({steps} steps).")
259+
260+
pipeline._logger.info(f"Subtask {subtask.subtask_name} completed.")
261+
262+
pipeline._logger.info("Pipeline execution completed.")
263+
264+
while simulation_app.is_running():
265+
pipeline._env.sim.render()
266+
267+
268+
if __name__ == "__main__":
269+
main()
270+
simulation_app.close()

0 commit comments

Comments
 (0)