diff --git a/examples/visualization/reach_target_pose.py b/examples/visualization/reach_target_pose.py index 220e239..6ee9997 100644 --- a/examples/visualization/reach_target_pose.py +++ b/examples/visualization/reach_target_pose.py @@ -1,11 +1,68 @@ -"""Visualize all reach target poses for an autosim pipeline.""" +"""Visualize all reach target poses for an autosim pipeline. + +Usage +----- +1) Start the visualization (it will export a debug JSON once after `pipeline.reset_env()`): + python examples/visualization/reach_target_pose.py --pipeline_id \ + --debug_poses_path /abs/path/reach_target_poses_debug.json + +2) Edit and save that JSON file. The script polls its mtime and reloads markers automatically. + +`--debug_poses_path` JSON format +--------------------------------- +The script expects this payload: +{ + "object_reach_target_poses": { + "": [ + [x, y, z, qw, qx, qy, qz], + ... + ], + ... + }, + "object_extra_reach_target_poses": { + "": { + "": [ + [x, y, z, qw, qx, qy, qz], + ... + ], + ... + }, + ... + } +} + +Notes +----- +* Poses are in the object frame: [x, y, z, qw, qx, qy, qz]. +* `--live_poll_interval_s` controls how often the file is checked (default: 0.2s). +""" import argparse +import json +import os +import time +import torch from isaaclab.app import AppLauncher parser = argparse.ArgumentParser(description="Visualize reach target poses for an autosim pipeline.") parser.add_argument("--pipeline_id", type=str, default=None, help="Name of the autosim pipeline.") +parser.add_argument( + "--debug_poses_path", + type=str, + default="reach_target_poses_debug.json", + help=( + "If provided, the script will export the current `object_reach_target_poses` " + "(and `object_extra_reach_target_poses`) to this JSON file after `reset_env()`, " + "and reload it on every file change." + ), +) +parser.add_argument( + "--live_poll_interval_s", + type=float, + default=0.2, + help="Polling interval (seconds) for checking `--debug_poses_path` updates.", +) AppLauncher.add_app_launcher_args(parser) args_cli = parser.parse_args() @@ -19,15 +76,175 @@ from autosim.utils.debug_util import visualize_reach_target_poses +def _load_env_extra_poses_json( + path: str, +) -> tuple[dict[str, list[list[float]]], dict[str, dict[str, list[list[float]]]]]: + """Load env extra poses from the exported debug JSON. + + We expect the payload format produced by this script: + { + "object_reach_target_poses": { "": [[7], ...] }, + "object_extra_reach_target_poses": { "": { "": [[7], ...] } } + } + """ + with open(path, encoding="utf-8") as f: + data = json.load(f) + + object_reach_target_poses: dict[str, list[list[float]]] = {} + object_extra_reach_target_poses: dict[str, dict[str, list[list[float]]]] = {} + + if not isinstance(data, dict): + raise ValueError("Debug JSON root must be an object.") + + reach = data.get("object_reach_target_poses", {}) + if not isinstance(reach, dict): + raise ValueError("`object_reach_target_poses` must be an object mapping.") + for obj_name, pose_list in reach.items(): + if not isinstance(obj_name, str): + raise ValueError("Reach: object names must be strings.") + if not isinstance(pose_list, list): + raise ValueError(f"Reach: `{obj_name}` must map to a list of poses.") + normalized: list[list[float]] = [] + for pose in pose_list: + if not (isinstance(pose, list) and len(pose) == 7): + raise ValueError(f"Reach: each pose for `{obj_name}` must be list length 7.") + normalized.append([float(v) for v in pose]) + object_reach_target_poses[obj_name] = normalized + + extra = data.get("object_extra_reach_target_poses", {}) + if extra is None: + extra = {} + if not isinstance(extra, dict): + raise ValueError("`object_extra_reach_target_poses` must be an object mapping.") + for obj_name, ee_dict in extra.items(): + if not isinstance(obj_name, str): + raise ValueError("Extra: object names must be strings.") + if not isinstance(ee_dict, dict): + raise ValueError(f"Extra: `{obj_name}` must map ee_name -> pose_list.") + object_extra_reach_target_poses[obj_name] = {} + for ee_name, pose_list in ee_dict.items(): + if not isinstance(ee_name, str): + raise ValueError("Extra: ee_name must be a string.") + if not isinstance(pose_list, list): + raise ValueError(f"Extra: `{obj_name}.{ee_name}` must be a list.") + normalized: list[list[float]] = [] + for pose in pose_list: + if not (isinstance(pose, list) and len(pose) == 7): + raise ValueError(f"Extra: each pose for `{obj_name}.{ee_name}` must be list length 7.") + normalized.append([float(v) for v in pose]) + object_extra_reach_target_poses[obj_name][ee_name] = normalized + + return object_reach_target_poses, object_extra_reach_target_poses + + +def _apply_live_poses(*, poses_path: str, pipeline) -> None: + """Update `pipeline._env_extra_info.*` from JSON.""" + env = pipeline._env + env_extra_info = pipeline._env_extra_info + object_reach_target_poses, object_extra_reach_target_poses = _load_env_extra_poses_json(poses_path) + + # Replace dicts entirely so removed keys in the JSON disappear from visualization. + env_extra_info.object_reach_target_poses = {} + env_extra_info.object_extra_reach_target_poses = {} + + # Update primary reach targets + for obj_name, pose_list in object_reach_target_poses.items(): + if obj_name not in env.scene.keys(): + continue + obj_pose_w = env.scene[obj_name].data.root_pose_w[0] # [7] + device = obj_pose_w.device + dtype = obj_pose_w.dtype + env_extra_info.object_reach_target_poses[obj_name] = [ + torch.tensor(pose, device=device, dtype=dtype) for pose in pose_list + ] + + # Update extra EE reach targets (if present) + for obj_name, ee_dict in object_extra_reach_target_poses.items(): + if obj_name not in env.scene.keys(): + continue + obj_pose_w = env.scene[obj_name].data.root_pose_w[0] # [7] + device = obj_pose_w.device + dtype = obj_pose_w.dtype + if obj_name not in env_extra_info.object_extra_reach_target_poses: + env_extra_info.object_extra_reach_target_poses[obj_name] = {} + for ee_name, pose_list in ee_dict.items(): + env_extra_info.object_extra_reach_target_poses[obj_name][ee_name] = [ + torch.tensor(pose, device=device, dtype=dtype) for pose in pose_list + ] + + +def _export_env_extra_poses_to_json(*, out_path: str, pipeline) -> None: + """Export current env_extra_info reach targets to JSON.""" + env_extra_info = pipeline._env_extra_info + + def _tensor_pose_to_list(p: list) -> list[float]: + # p is torch.Tensor(7,) but we want plain python floats. + return [float(x) for x in p] + + object_reach_target_poses: dict[str, list[list[float]]] = {} + for obj_name, pose_list in env_extra_info.object_reach_target_poses.items(): + object_reach_target_poses[obj_name] = [_tensor_pose_to_list(pose.tolist()) for pose in pose_list] + + object_extra_reach_target_poses: dict[str, dict[str, list[list[float]]]] = {} + if hasattr(env_extra_info, "object_extra_reach_target_poses"): + for obj_name, ee_dict in env_extra_info.object_extra_reach_target_poses.items(): + object_extra_reach_target_poses[obj_name] = {} + for ee_name, pose_list in ee_dict.items(): + object_extra_reach_target_poses[obj_name][ee_name] = [ + _tensor_pose_to_list(pose.tolist()) for pose in pose_list + ] + + payload = { + "object_reach_target_poses": object_reach_target_poses, + "object_extra_reach_target_poses": object_extra_reach_target_poses, + } + with open(out_path, "w", encoding="utf-8") as f: + json.dump(payload, f, indent=2) + + def main(): pipeline = make_pipeline(args_cli.pipeline_id) pipeline.initialize() pipeline.reset_env() + + debug_path = os.path.abspath(args_cli.debug_poses_path) + + # Always export first so user can edit immediately after startup. + _export_env_extra_poses_to_json(out_path=debug_path, pipeline=pipeline) + print(f"[reach_target_pose] Exported debug poses to: {debug_path}") + + # Apply once (in case user edited the file between process start and export completion). + try: + _apply_live_poses(poses_path=debug_path, pipeline=pipeline) + except Exception as e: + print(f"[reach_target_pose] Failed to apply exported debug poses: {e}") visualize_reach_target_poses(pipeline._env_extra_info, pipeline._env) + last_mtime = os.path.getmtime(debug_path) + last_poll_t = 0.0 + while simulation_app.is_running(): pipeline._env.sim.render() + now = time.time() + if now - last_poll_t < args_cli.live_poll_interval_s: + continue + last_poll_t = now + + try: + mtime = os.path.getmtime(debug_path) + except OSError: + continue + + if mtime > last_mtime: + last_mtime = mtime + try: + _apply_live_poses(poses_path=debug_path, pipeline=pipeline) + visualize_reach_target_poses(pipeline._env_extra_info, pipeline._env) + print(f"[reach_target_pose] Reloaded markers from: {debug_path}") + except Exception as e: + print(f"[reach_target_pose] Failed to reload poses: {e}") + if __name__ == "__main__": main() diff --git a/source/autosim/autosim/calibration/plan_sweep.py b/source/autosim/autosim/calibration/plan_sweep.py index 3a84252..8086be3 100644 --- a/source/autosim/autosim/calibration/plan_sweep.py +++ b/source/autosim/autosim/calibration/plan_sweep.py @@ -64,6 +64,8 @@ def reach_plan_sweep(pipeline: AutoSimPipeline, cfg: ReachPlanSweepCfg) -> list[ """ # modified from pipeline.run() to only execute the reach skill at the specified index + pipeline.initialize() + decompose_result = pipeline.decompose() pipeline._check_skill_extra_cfg() pipeline.reset_env() diff --git a/source/autosim/autosim/capabilities/navigation/occupancy_map.py b/source/autosim/autosim/capabilities/navigation/occupancy_map.py index 6ae56c6..3d24ad3 100644 --- a/source/autosim/autosim/capabilities/navigation/occupancy_map.py +++ b/source/autosim/autosim/capabilities/navigation/occupancy_map.py @@ -14,6 +14,11 @@ _logger = AutoSimLogger("OccupancyMap") +# ----------------------------------------------------------------------------- +# Public config / API +# ----------------------------------------------------------------------------- + + @configclass class OccupancyMapCfg: """Configuration for the occupancy map.""" @@ -32,6 +37,13 @@ class OccupancyMapCfg: """The height to sample the occupancy map at, in meters.""" height_tolerance: float = 0.2 """The tolerance for the height sampling.""" + mesh_max_points: int = 5000 + """Max number of mesh vertices used for footprint estimation (downsample if larger).""" + + +# ----------------------------------------------------------------------------- +# USD helpers (geometry discovery) +# ----------------------------------------------------------------------------- def _get_prim_bounds(stage, prim_path: str, verbose: bool = True) -> tuple[np.ndarray, np.ndarray]: @@ -58,64 +70,349 @@ def _get_prim_bounds(stage, prim_path: str, verbose: bool = True) -> tuple[np.nd return np.array([min_point[0], min_point[1], min_point[2]]), np.array([max_point[0], max_point[1], max_point[2]]) -def _collect_collision_prims( - stage, floor_prim_path: str, sample_height_min: float, sample_height_max: float, min_xy_extent: float = 0.01 -) -> list: - """Collect collision primitives from the scene""" +def _is_geometry_prim(prim: Usd.Prim) -> bool: + """Check if a prim is a geometry prim.""" + return ( + prim.IsA(UsdGeom.Mesh) + or prim.IsA(UsdGeom.Cube) + or prim.IsA(UsdGeom.Cylinder) + or prim.IsA(UsdGeom.Sphere) + or prim.IsA(UsdGeom.Capsule) + ) + - collision_prims = [] +# ----------------------------------------------------------------------------- +# 2D geometry / rasterization utilities +# ----------------------------------------------------------------------------- + + +def _convex_hull_2d(points: np.ndarray) -> np.ndarray: + """Compute 2D convex hull using monotonic chain. Returns CCW hull vertices.""" + + if points.shape[0] == 0: + return points + pts = np.unique(points.astype(np.float64), axis=0) + if pts.shape[0] <= 2: + return pts + pts = pts[np.lexsort((pts[:, 1], pts[:, 0]))] + + def cross(o: np.ndarray, a: np.ndarray, b: np.ndarray) -> float: + return float((a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0])) + + lower: list[np.ndarray] = [] + for p in pts: + while len(lower) >= 2 and cross(lower[-2], lower[-1], p) <= 0.0: + lower.pop() + lower.append(p) + + upper: list[np.ndarray] = [] + for p in pts[::-1]: + while len(upper) >= 2 and cross(upper[-2], upper[-1], p) <= 0.0: + upper.pop() + upper.append(p) + + return np.array(lower[:-1] + upper[:-1], dtype=np.float64) + + +def _points_in_convex_poly(points: np.ndarray, poly: np.ndarray) -> np.ndarray: + """Test if 2D points are inside a convex polygon (CCW).""" + + if poly.shape[0] < 3: + return np.zeros((points.shape[0],), dtype=bool) + x = points[:, 0] + y = points[:, 1] + inside = np.ones((points.shape[0],), dtype=bool) + for i in range(poly.shape[0]): + x1, y1 = poly[i] + x2, y2 = poly[(i + 1) % poly.shape[0]] + inside &= ((x2 - x1) * (y - y1) - (y2 - y1) * (x - x1)) >= 0.0 + if not inside.any(): + break + return inside + + +def _rasterize_convex_poly( + occupancy_map: np.ndarray, + poly_xy: np.ndarray, + map_min_x: float, + map_min_y: float, + cell_size: float, + map_height: int, + map_width: int, +) -> None: + """Rasterize a convex polygon into an occupancy map.""" + + poly_min_x = float(poly_xy[:, 0].min()) + poly_max_x = float(poly_xy[:, 0].max()) + poly_min_y = float(poly_xy[:, 1].min()) + poly_max_y = float(poly_xy[:, 1].max()) + + min_j = max(0, int((poly_min_x - map_min_x) / cell_size) - 1) + max_j = min(map_width - 1, int((poly_max_x - map_min_x) / cell_size) + 1) + min_i = max(0, int((poly_min_y - map_min_y) / cell_size) - 1) + max_i = min(map_height - 1, int((poly_max_y - map_min_y) / cell_size) + 1) + if min_j > max_j or min_i > max_i: + return + + cols = np.arange(min_j, max_j + 1, dtype=np.int64) + rows = np.arange(min_i, max_i + 1, dtype=np.int64) + cc, rr = np.meshgrid(cols, rows) + + xs = map_min_x + (cc.astype(np.float64) + 0.5) * cell_size + ys = map_min_y + (rr.astype(np.float64) + 0.5) * cell_size + pts = np.stack([xs.reshape(-1), ys.reshape(-1)], axis=1) + + inside = _points_in_convex_poly(pts, poly_xy).reshape(rr.shape) + occupancy_map[min_i : max_i + 1, min_j : max_j + 1][inside] = 1 + + +# ----------------------------------------------------------------------------- +# Candidate discovery & expansion +# ----------------------------------------------------------------------------- + + +def _collect_candidate_prim_paths( + stage, + floor_prim_path: str, + sample_height_min: float, + sample_height_max: float, + min_xy_extent: float = 0.01, +) -> list[str]: + """Collect candidate obstacle prim paths from the scene (coarse filtering only).""" + + def xform_has_geometry_child(xform_prim: Usd.Prim) -> bool: + """Match previous behavior: only consider direct children. + + This avoids pulling in very top-level container Xforms (e.g., env roots) whose + geometry only exists deeper in the hierarchy. + """ + return any(_is_geometry_prim(child) for child in xform_prim.GetChildren()) + + candidate_paths: list[str] = [] bbox_cache = UsdGeom.BBoxCache(Usd.TimeCode.Default(), includedPurposes=[UsdGeom.Tokens.default_]) for prim in stage.Traverse(): path_str = str(prim.GetPath()) - # Skip the floor itself and robot if floor_prim_path in path_str or "Robot" in path_str or "robot" in path_str.lower(): continue - - # Skip lights, cameras, and other non-geometry prims if any(skip in path_str.lower() for skip in ["light", "camera", "looks", "material"]): continue - # Check if prim has geometry - has_geometry = ( - prim.IsA(UsdGeom.Mesh) - or prim.IsA(UsdGeom.Cube) - or prim.IsA(UsdGeom.Cylinder) - or prim.IsA(UsdGeom.Sphere) - or prim.IsA(UsdGeom.Capsule) - ) - if not has_geometry and prim.IsA(UsdGeom.Xform): - # Check if it's a group that might contain geometry - for child in prim.GetChildren(): - if child.IsA(UsdGeom.Mesh) or child.IsA(UsdGeom.Cube): - has_geometry = True - break - - if has_geometry: - try: - # Get bounding box - bbox = bbox_cache.ComputeWorldBound(prim) - aligned_box = bbox.ComputeAlignedBox() - prim_min = aligned_box.GetMin() - prim_max = aligned_box.GetMax() - - # Check if this prim intersects our sampling height range - if prim_min[2] <= sample_height_max and prim_max[2] >= sample_height_min: - # Only include if it has significant XY extent - xy_extent_x = prim_max[0] - prim_min[0] - xy_extent_y = prim_max[1] - prim_min[1] - if xy_extent_x > min_xy_extent and xy_extent_y > min_xy_extent: - collision_prims.append({ - "path": path_str, - "min": np.array([prim_min[0], prim_min[1], prim_min[2]]), - "max": np.array([prim_max[0], prim_max[1], prim_max[2]]), - }) - except Exception: - # Skip prims that can't be processed + if _is_geometry_prim(prim): + pass + elif prim.IsA(UsdGeom.Xform): + if not xform_has_geometry_child(prim): continue + else: + continue + + bbox = bbox_cache.ComputeWorldBound(prim) + aligned = bbox.ComputeAlignedBox() + prim_min = aligned.GetMin() + prim_max = aligned.GetMax() - return collision_prims + if prim_min[2] > sample_height_max or prim_max[2] < sample_height_min: + continue + + xy_extent_x = prim_max[0] - prim_min[0] + xy_extent_y = prim_max[1] - prim_min[1] + if xy_extent_x <= min_xy_extent or xy_extent_y <= min_xy_extent: + continue + + candidate_paths.append(path_str) + + return candidate_paths + + +def _expand_to_geometry_prims(prim: Usd.Prim) -> list[Usd.Prim]: + """Expand a prim to leaf geometry prims.""" + + if _is_geometry_prim(prim): + return [prim] + if prim.IsA(UsdGeom.Xform): + out: list[Usd.Prim] = [] + stack = list(prim.GetChildren()) + while stack: + p = stack.pop() + if _is_geometry_prim(p): + out.append(p) + elif p.IsA(UsdGeom.Xform): + stack.extend(p.GetChildren()) + return out + return [] + + +# ----------------------------------------------------------------------------- +# Footprint generation (world XY convex polygons) +# ----------------------------------------------------------------------------- + + +def _transform_points_local_to_world(points_local: np.ndarray, mat) -> np.ndarray: + """Transform points from local to world coordinates.""" + + out = np.empty_like(points_local, dtype=np.float64) + for i in range(points_local.shape[0]): + p = points_local[i] + pw = mat.Transform((float(p[0]), float(p[1]), float(p[2]))) + out[i, 0] = pw[0] + out[i, 1] = pw[1] + out[i, 2] = pw[2] + return out + + +def _downsample_points(points: np.ndarray, max_points: int) -> np.ndarray: + """Downsample points.""" + + if points.shape[0] <= max_points: + return points + idx = np.linspace(0, points.shape[0] - 1, max_points, dtype=np.int64) + return points[idx] + + +def _mesh_footprint_poly_xy( + mesh_prim: Usd.Prim, + xform_cache: UsdGeom.XformCache, + sample_height_min: float, + sample_height_max: float, + mesh_max_points: int, +) -> np.ndarray | None: + """Generate a footprint polygon for a mesh.""" + + mesh = UsdGeom.Mesh(mesh_prim) + pts = mesh.GetPointsAttr().Get(Usd.TimeCode.Default()) + if pts is None or len(pts) == 0: + return None + points_local = np.asarray(pts, dtype=np.float64) + points_local = _downsample_points(points_local, mesh_max_points) + + mat = xform_cache.GetLocalToWorldTransform(mesh_prim) + points_w = _transform_points_local_to_world(points_local, mat) + + z = points_w[:, 2] + mask = (z >= sample_height_min) & (z <= sample_height_max) + if not np.any(mask): + return None + xy = points_w[mask][:, :2] + poly = _convex_hull_2d(xy) + if poly.shape[0] < 3: + return None + return poly + + +def _sample_circle_points(radius: float, num: int) -> np.ndarray: + """Sample points on a circle.""" + + angles = np.linspace(0.0, 2.0 * np.pi, num, endpoint=False) + x = radius * np.cos(angles) + y = radius * np.sin(angles) + z = np.zeros_like(x) + return np.stack([x, y, z], axis=1) + + +def _cube_footprint_poly_xy(cube_prim: Usd.Prim, xform_cache: UsdGeom.XformCache) -> np.ndarray | None: + """Generate a footprint polygon for a cube.""" + + cube = UsdGeom.Cube(cube_prim) + size = float(cube.GetSizeAttr().Get(Usd.TimeCode.Default()) or 0.0) + if size <= 0.0: + return None + s = 0.5 * size + corners_local = np.array([[-s, -s, 0.0], [-s, s, 0.0], [s, s, 0.0], [s, -s, 0.0]], dtype=np.float64) + mat = xform_cache.GetLocalToWorldTransform(cube_prim) + corners_w = _transform_points_local_to_world(corners_local, mat) + poly = _convex_hull_2d(corners_w[:, :2]) + if poly.shape[0] < 3: + return None + return poly + + +def _cylinder_like_footprint_poly_xy( + prim: Usd.Prim, radius: float, xform_cache: UsdGeom.XformCache, num_circle_points: int = 32 +) -> np.ndarray | None: + """Generate a footprint polygon for a cylinder-like prim.""" + + if radius <= 0.0: + return None + points_local = _sample_circle_points(radius, num_circle_points) + mat = xform_cache.GetLocalToWorldTransform(prim) + points_w = _transform_points_local_to_world(points_local, mat) + poly = _convex_hull_2d(points_w[:, :2]) + if poly.shape[0] < 3: + return None + return poly + + +def _capsule_footprint_poly_xy( + capsule_prim: Usd.Prim, xform_cache: UsdGeom.XformCache, num_circle_points: int = 32 +) -> np.ndarray | None: + """Generate a footprint polygon for a capsule.""" + + cap = UsdGeom.Capsule(capsule_prim) + radius = float(cap.GetRadiusAttr().Get(Usd.TimeCode.Default()) or 0.0) + height = float(cap.GetHeightAttr().Get(Usd.TimeCode.Default()) or 0.0) + if radius <= 0.0: + return None + + axis = str(cap.GetAxisAttr().Get(Usd.TimeCode.Default()) or "Z").upper() + if axis == "Z": + return _cylinder_like_footprint_poly_xy(capsule_prim, radius, xform_cache, num_circle_points=num_circle_points) + + half_len = 0.5 * max(0.0, height) + angles = np.linspace(0.0, 2.0 * np.pi, num_circle_points, endpoint=False) + circle = np.stack([radius * np.cos(angles), radius * np.sin(angles)], axis=1) + + if axis == "X": + c1 = np.array([-half_len, 0.0], dtype=np.float64) + c2 = np.array([half_len, 0.0], dtype=np.float64) + else: # "Y" + c1 = np.array([0.0, -half_len], dtype=np.float64) + c2 = np.array([0.0, half_len], dtype=np.float64) + + pts2 = np.concatenate([circle + c1, circle + c2], axis=0) + points_local = np.concatenate([pts2, np.zeros((pts2.shape[0], 1), dtype=np.float64)], axis=1) + + mat = xform_cache.GetLocalToWorldTransform(capsule_prim) + points_w = _transform_points_local_to_world(points_local, mat) + poly = _convex_hull_2d(points_w[:, :2]) + if poly.shape[0] < 3: + return None + return poly + + +def _fallback_bbox_footprint_poly_xy( + prim: Usd.Prim, bbox_cache: UsdGeom.BBoxCache, xform_cache: UsdGeom.XformCache +) -> np.ndarray | None: + """Fallback footprint: projected local bbox corners convex hull in world XY.""" + + local_bbox = bbox_cache.ComputeLocalBound(prim) + box = local_bbox.GetRange() + mat = xform_cache.GetLocalToWorldTransform(prim) + bmin = box.GetMin() + bmax = box.GetMax() + corners_local = np.array( + [ + [bmin[0], bmin[1], bmin[2]], + [bmin[0], bmin[1], bmax[2]], + [bmin[0], bmax[1], bmin[2]], + [bmin[0], bmax[1], bmax[2]], + [bmax[0], bmin[1], bmin[2]], + [bmax[0], bmin[1], bmax[2]], + [bmax[0], bmax[1], bmin[2]], + [bmax[0], bmax[1], bmax[2]], + ], + dtype=np.float64, + ) + corners_w = _transform_points_local_to_world(corners_local, mat) + poly = _convex_hull_2d(corners_w[:, :2]) + if poly.shape[0] < 3: + return None + return poly + + +# ----------------------------------------------------------------------------- +# Public API +# ----------------------------------------------------------------------------- def get_occupancy_map(env: ManagerBasedEnv, cfg: OccupancyMapCfg) -> OccupancyMap: @@ -179,25 +476,63 @@ def get_occupancy_map(env: ManagerBasedEnv, cfg: OccupancyMapCfg) -> OccupancyMa sample_height_max = min_bound[2] + cfg.sample_height + cfg.height_tolerance _logger.info(f"Sampling height range: [{sample_height_min:.2f}, {sample_height_max:.2f}]") - # Collect collision primitives - collision_prims = _collect_collision_prims( + candidate_paths = _collect_candidate_prim_paths( stage, floor_prim_path, sample_height_min, sample_height_max, cfg.min_xy_extent ) - _logger.info(f"Found {len(collision_prims)} collision primitives") + _logger.info(f"Found {len(candidate_paths)} candidate prims") + + time_code = Usd.TimeCode.Default() + bbox_cache = UsdGeom.BBoxCache(time_code, includedPurposes=[UsdGeom.Tokens.default_]) + xform_cache = UsdGeom.XformCache(time_code) - # Mark occupied cells - for prim_info in collision_prims: - prim_min = prim_info["min"] - prim_max = prim_info["max"] + geom_prims: list[Usd.Prim] = [] + for path_str in candidate_paths: + prim = stage.GetPrimAtPath(path_str) + if not prim.IsValid(): + continue + geom_prims.extend(_expand_to_geometry_prims(prim)) + _logger.info(f"Expanded to {len(geom_prims)} geometry prims") - # Calculate grid indices for this prim's bounding box - min_i = max(0, int((prim_min[1] - map_min_y) / cfg.cell_size)) - max_i = min(map_height - 1, int((prim_max[1] - map_min_y) / cfg.cell_size) + 1) - min_j = max(0, int((prim_min[0] - map_min_x) / cfg.cell_size)) - max_j = min(map_width - 1, int((prim_max[0] - map_min_x) / cfg.cell_size) + 1) + for prim in geom_prims: + if not prim.IsValid(): + continue + + # Coarse height filter + bbox = bbox_cache.ComputeWorldBound(prim) + aligned = bbox.ComputeAlignedBox() + pmin = aligned.GetMin() + pmax = aligned.GetMax() + if pmin[2] > sample_height_max or pmax[2] < sample_height_min: + continue + + poly: np.ndarray | None = None + if prim.IsA(UsdGeom.Mesh): + poly = _mesh_footprint_poly_xy( + prim, + xform_cache, + sample_height_min, + sample_height_max, + mesh_max_points=cfg.mesh_max_points, + ) + elif prim.IsA(UsdGeom.Cube): + poly = _cube_footprint_poly_xy(prim, xform_cache) + elif prim.IsA(UsdGeom.Cylinder): + cyl = UsdGeom.Cylinder(prim) + radius = float(cyl.GetRadiusAttr().Get(time_code) or 0.0) + poly = _cylinder_like_footprint_poly_xy(prim, radius, xform_cache) + elif prim.IsA(UsdGeom.Sphere): + sph = UsdGeom.Sphere(prim) + radius = float(sph.GetRadiusAttr().Get(time_code) or 0.0) + poly = _cylinder_like_footprint_poly_xy(prim, radius, xform_cache) + elif prim.IsA(UsdGeom.Capsule): + poly = _capsule_footprint_poly_xy(prim, xform_cache) + else: + poly = _fallback_bbox_footprint_poly_xy(prim, bbox_cache, xform_cache) + + if poly is None or poly.shape[0] < 3: + continue - # Mark all cells in this bounding box as occupied - occupancy_map[min_i : max_i + 1, min_j : max_j + 1] = 1 + _rasterize_convex_poly(occupancy_map, poly, map_min_x, map_min_y, cfg.cell_size, map_height, map_width) return OccupancyMap( occupancy_map=torch.from_numpy(occupancy_map).to(env.device), diff --git a/source/autosim/autosim/core/logger.py b/source/autosim/autosim/core/logger.py index 7576105..044e194 100644 --- a/source/autosim/autosim/core/logger.py +++ b/source/autosim/autosim/core/logger.py @@ -22,6 +22,10 @@ def logger(self): self._logger.propagate = False return self._logger + @property + def is_debug_enabled(self): + return self._level == logging.DEBUG + def debug(self, msg, *args, **kwargs): self.logger.debug(msg, *args, **kwargs) diff --git a/source/autosim/autosim/core/types.py b/source/autosim/autosim/core/types.py index 019a0f0..9395ab3 100644 --- a/source/autosim/autosim/core/types.py +++ b/source/autosim/autosim/core/types.py @@ -3,6 +3,7 @@ from enum import Enum from typing import Any +import numpy as np import torch """PIPELINE RELATED TYPES""" @@ -88,6 +89,9 @@ class EnvExtraInfo: object_extra_reach_target_poses: dict[str, dict[str, list[torch.Tensor]]] = field(default_factory=dict) """The extra reach target poses in the objects frame. each object can have a list of extra reach target poses [x, y, z, qw, qx, qy, qz] with ee_name as the key in the order of execution.""" + object_navigate_sample_range: dict[str, tuple[float, float]] = field(default_factory=dict) + """The sample range for the navigate skill. each object can have a tuple of (min_angle, max_angle) in radians.""" + def __post_init__(self): self.reset() @@ -118,6 +122,9 @@ def get_next_reach_target_pose(self, object_name: str) -> torch.Tensor: def get_next_extra_reach_target_pose(self, object_name: str, ee_name: str) -> torch.Tensor: return next(self._object_extra_reach_target_poses_iterator_dict[object_name][ee_name]) + def get_navigate_sample_range(self, object_name: str) -> tuple[float, float]: + return self.object_navigate_sample_range.get(object_name, (0.0, 2 * np.pi)) + @dataclass class WorldState: diff --git a/source/autosim/autosim/skills/navigate.py b/source/autosim/autosim/skills/navigate.py index 5aa2989..6f357af 100644 --- a/source/autosim/autosim/skills/navigate.py +++ b/source/autosim/autosim/skills/navigate.py @@ -16,6 +16,7 @@ SkillOutput, WorldState, ) +from autosim.utils.debug_util import debug_visualize_goal_sampling @configclass @@ -73,6 +74,9 @@ def __init__(self, extra_cfg: NavigateSkillExtraCfg) -> None: self._global_path = None self._current_waypoint_idx = 0 + # variables for debug + self._sample_range = None + def extract_goal_from_info( self, skill_info: SkillInfo, env: ManagerBasedEnv, env_extra_info: EnvExtraInfo ) -> SkillGoal: @@ -84,6 +88,7 @@ def extract_goal_from_info( target_object = env.scene[target_object_name] obj_pos_w = target_object.data.root_pos_w[0].cpu().numpy() + self._logger.info(f"Object pose in world frame: {target_object.data.root_pose_w[0]}") is_free = (self._occupancy_map.occupancy_map == 0).cpu().numpy() if np.any(is_free): @@ -93,8 +98,11 @@ def extract_goal_from_info( best_score = -1.0 - angles = np.linspace(0, 2 * np.pi, self.cfg.extra_cfg.num_samples, endpoint=False) + sample_range = env_extra_info.get_navigate_sample_range(target_object_name) + self._sample_range = sample_range + angles = np.linspace(sample_range[0], sample_range[1], self.cfg.extra_cfg.num_samples, endpoint=False) + target_pos_candidate = None for angle in angles: # calculate the sample point coordinates in the world frame cx = obj_pos_w[0] + self.cfg.extra_cfg.sampling_radius * np.cos(angle) @@ -158,6 +166,22 @@ def execute_plan(self, state: WorldState, goal: SkillGoal) -> bool: f"Planning from ({start_pos[0]:.2f}, {start_pos[1]:.2f}) to ({goal_pos[0]:.2f}, {goal_pos[1]:.2f})," f" target_yaw={target_yaw:.2f}." ) + if self._logger.is_debug_enabled: + # debug visualization of map / object / robot / sampling around the object + obj_tensor = state.objects[self._target_object_name] # [x, y, z, qw, qx, qy, qz] + obj_pos_w = obj_tensor[:3].detach().cpu().numpy() + robot_pos_w = state.robot_base_pose.detach().cpu().numpy() + target_pos_candidate = goal_pos.detach().cpu().numpy() + + debug_visualize_goal_sampling( + occupancy_map=self._occupancy_map, + obj_pos_w=obj_pos_w, + robot_pos_w=robot_pos_w, + sample_range=self._sample_range, + sampling_radius=self.cfg.extra_cfg.sampling_radius, + num_samples=self.cfg.extra_cfg.num_samples, + target_pos_candidate=target_pos_candidate, + ) self._global_path = self._global_planner.plan(start_pos, goal_pos) diff --git a/source/autosim/autosim/utils/debug_util.py b/source/autosim/autosim/utils/debug_util.py index bc89ec2..b1293b5 100644 --- a/source/autosim/autosim/utils/debug_util.py +++ b/source/autosim/autosim/utils/debug_util.py @@ -1,9 +1,14 @@ +import importlib.util + +import numpy as np import torch from isaaclab.envs import ManagerBasedEnv from isaaclab.markers import VisualizationMarkers from isaaclab.markers.config import FRAME_MARKER_CFG from isaaclab.utils.math import combine_frame_transforms +from autosim.core.types import OccupancyMap + markers: dict[str, VisualizationMarkers] = {} @@ -79,3 +84,118 @@ def visualize_reach_target_poses(env_extra_info, env: ManagerBasedEnv) -> None: marker_name = f"reach_target_poses_{ee_name}" create_marker(marker_name) visualize_marker(marker_name, extra_poses_w) + + +def debug_visualize_goal_sampling( + occupancy_map: OccupancyMap, + obj_pos_w: np.ndarray, + robot_pos_w: np.ndarray | None, + sample_range: tuple[float, float], + sampling_radius: float, + num_samples: int, + target_pos_candidate: np.ndarray | None, +) -> None: + """ + Visualize the occupancy map, object position, robot position, and sampling around the object. + Args: + occupancy_map: The occupancy map of the environment. + obj_pos_w: The position of the object in the world frame. + robot_pos_w: The position of the robot in the world frame. + sample_range: The range of the sampling angles. + sampling_radius: The radius of the sampling. + num_samples: The number of samples. + target_pos_candidate: The candidate position of the target. + + Returns: + None + """ + # skip if matplotlib is not installed + if importlib.util.find_spec("matplotlib") is None: + return + + import matplotlib.pyplot as plt # type: ignore[import] + + occ = occupancy_map.occupancy_map.cpu().numpy() + origin_x, origin_y = occupancy_map.origin + resolution = float(occupancy_map.resolution) + + # object position in world frame -> grid coordinates + ox = float(obj_pos_w[0]) + oy = float(obj_pos_w[1]) + ogx = int((ox - origin_x) / resolution) + ogy = int((oy - origin_y) / resolution) + + # robot position in world frame -> grid coordinates (if provided) + rgx, rgy = None, None + if robot_pos_w is not None: + rx = float(robot_pos_w[0]) + ry = float(robot_pos_w[1]) + rgx = int((rx - origin_x) / resolution) + rgy = int((ry - origin_y) / resolution) + + # sampling angles + angles = np.linspace(sample_range[0], sample_range[1], num_samples, endpoint=False) + + free_x, free_y = [], [] + occ_x, occ_y = [], [] + + for angle in angles: + cx = ox + sampling_radius * np.cos(angle) + cy = oy + sampling_radius * np.sin(angle) + + gx = int((cx - origin_x) / resolution) + gy = int((cy - origin_y) / resolution) + + # also visualize out of bounds, marked as "occupied" + if 0 <= gy < occ.shape[0] and 0 <= gx < occ.shape[1]: + if occ[gy, gx] == 0: + free_x.append(gx) + free_y.append(gy) + else: + occ_x.append(gx) + occ_y.append(gy) + else: + occ_x.append(gx) + occ_y.append(gy) + + # Reuse the same window across calls and clear it to avoid drawing overlaps. + fig = plt.figure("NavigateSkill Goal Sampling", figsize=(6, 6)) + fig.clear() + ax = fig.add_subplot(1, 1, 1) + ax.imshow(occ, origin="lower", cmap="gray_r", interpolation="nearest") + + # object + if 0 <= ogy < occ.shape[0] and 0 <= ogx < occ.shape[1]: + ax.scatter(ogx, ogy, c="red", marker="*", s=80, label="object") + + # robot position + if rgx is not None and rgy is not None: + if 0 <= rgy < occ.shape[0] and 0 <= rgx < occ.shape[1]: + ax.scatter(rgx, rgy, c="blue", marker="o", s=60, label="robot") + + # sampling points (free = green points, occupied/out = red crosses) + if free_x: + ax.scatter(free_x, free_y, c="green", s=30, label="free samples") + if occ_x: + ax.scatter(occ_x, occ_y, c="red", marker="x", s=30, label="occupied / oob samples") + + # final chosen candidate (if any) + if target_pos_candidate is not None: + cx = float(target_pos_candidate[0]) + cy = float(target_pos_candidate[1]) + cgx = int((cx - origin_x) / resolution) + cgy = int((cy - origin_y) / resolution) + if 0 <= cgy < occ.shape[0] and 0 <= cgx < occ.shape[1]: + ax.scatter(cgx, cgy, c="yellow", edgecolors="black", s=80, label="chosen candidate") + + ax.set_title("Goal Sampling around Object") + ax.set_xlabel("x (grid index)") + ax.set_ylabel("y (grid index)") + ax.legend(loc="upper right") + fig.tight_layout() + fig.show() + # Let the GUI event loop process draw/update events even if the main thread + # quickly goes back to heavy simulation work. + fig.canvas.draw_idle() + fig.canvas.flush_events() + plt.pause(0.02)