diff --git a/examples/visualization/reach_target_pose.py b/examples/visualization/reach_target_pose.py index 6ee9997..ade3ac6 100644 --- a/examples/visualization/reach_target_pose.py +++ b/examples/visualization/reach_target_pose.py @@ -18,16 +18,6 @@ ... ], ... - }, - "object_extra_reach_target_poses": { - "": { - "": [ - [x, y, z, qw, qx, qy, qz], - ... - ], - ... - }, - ... } } @@ -53,8 +43,7 @@ default="reach_target_poses_debug.json", help=( "If provided, the script will export the current `object_reach_target_poses` " - "(and `object_extra_reach_target_poses`) to this JSON file after `reset_env()`, " - "and reload it on every file change." + "to this JSON file after `reset_env()`, and reload it on every file change." ), ) parser.add_argument( @@ -76,22 +65,12 @@ from autosim.utils.debug_util import visualize_reach_target_poses -def _load_env_extra_poses_json( - path: str, -) -> tuple[dict[str, list[list[float]]], dict[str, dict[str, list[list[float]]]]]: - """Load env extra poses from the exported debug JSON. - - We expect the payload format produced by this script: - { - "object_reach_target_poses": { "": [[7], ...] }, - "object_extra_reach_target_poses": { "": { "": [[7], ...] } } - } - """ +def _load_env_extra_poses_json(path: str) -> dict[str, list[list[float]]]: + """Load reach target poses from the exported debug JSON.""" with open(path, encoding="utf-8") as f: data = json.load(f) object_reach_target_poses: dict[str, list[list[float]]] = {} - object_extra_reach_target_poses: dict[str, dict[str, list[list[float]]]] = {} if not isinstance(data, dict): raise ValueError("Debug JSON root must be an object.") @@ -111,43 +90,17 @@ def _load_env_extra_poses_json( normalized.append([float(v) for v in pose]) object_reach_target_poses[obj_name] = normalized - extra = data.get("object_extra_reach_target_poses", {}) - if extra is None: - extra = {} - if not isinstance(extra, dict): - raise ValueError("`object_extra_reach_target_poses` must be an object mapping.") - for obj_name, ee_dict in extra.items(): - if not isinstance(obj_name, str): - raise ValueError("Extra: object names must be strings.") - if not isinstance(ee_dict, dict): - raise ValueError(f"Extra: `{obj_name}` must map ee_name -> pose_list.") - object_extra_reach_target_poses[obj_name] = {} - for ee_name, pose_list in ee_dict.items(): - if not isinstance(ee_name, str): - raise ValueError("Extra: ee_name must be a string.") - if not isinstance(pose_list, list): - raise ValueError(f"Extra: `{obj_name}.{ee_name}` must be a list.") - normalized: list[list[float]] = [] - for pose in pose_list: - if not (isinstance(pose, list) and len(pose) == 7): - raise ValueError(f"Extra: each pose for `{obj_name}.{ee_name}` must be list length 7.") - normalized.append([float(v) for v in pose]) - object_extra_reach_target_poses[obj_name][ee_name] = normalized - - return object_reach_target_poses, object_extra_reach_target_poses + return object_reach_target_poses def _apply_live_poses(*, poses_path: str, pipeline) -> None: - """Update `pipeline._env_extra_info.*` from JSON.""" + """Update `pipeline._env_extra_info.object_reach_target_poses` from JSON.""" env = pipeline._env env_extra_info = pipeline._env_extra_info - object_reach_target_poses, object_extra_reach_target_poses = _load_env_extra_poses_json(poses_path) + object_reach_target_poses = _load_env_extra_poses_json(poses_path) - # Replace dicts entirely so removed keys in the JSON disappear from visualization. env_extra_info.object_reach_target_poses = {} - env_extra_info.object_extra_reach_target_poses = {} - # Update primary reach targets for obj_name, pose_list in object_reach_target_poses.items(): if obj_name not in env.scene.keys(): continue @@ -158,45 +111,20 @@ def _apply_live_poses(*, poses_path: str, pipeline) -> None: torch.tensor(pose, device=device, dtype=dtype) for pose in pose_list ] - # Update extra EE reach targets (if present) - for obj_name, ee_dict in object_extra_reach_target_poses.items(): - if obj_name not in env.scene.keys(): - continue - obj_pose_w = env.scene[obj_name].data.root_pose_w[0] # [7] - device = obj_pose_w.device - dtype = obj_pose_w.dtype - if obj_name not in env_extra_info.object_extra_reach_target_poses: - env_extra_info.object_extra_reach_target_poses[obj_name] = {} - for ee_name, pose_list in ee_dict.items(): - env_extra_info.object_extra_reach_target_poses[obj_name][ee_name] = [ - torch.tensor(pose, device=device, dtype=dtype) for pose in pose_list - ] - def _export_env_extra_poses_to_json(*, out_path: str, pipeline) -> None: """Export current env_extra_info reach targets to JSON.""" env_extra_info = pipeline._env_extra_info def _tensor_pose_to_list(p: list) -> list[float]: - # p is torch.Tensor(7,) but we want plain python floats. return [float(x) for x in p] object_reach_target_poses: dict[str, list[list[float]]] = {} for obj_name, pose_list in env_extra_info.object_reach_target_poses.items(): object_reach_target_poses[obj_name] = [_tensor_pose_to_list(pose.tolist()) for pose in pose_list] - object_extra_reach_target_poses: dict[str, dict[str, list[list[float]]]] = {} - if hasattr(env_extra_info, "object_extra_reach_target_poses"): - for obj_name, ee_dict in env_extra_info.object_extra_reach_target_poses.items(): - object_extra_reach_target_poses[obj_name] = {} - for ee_name, pose_list in ee_dict.items(): - object_extra_reach_target_poses[obj_name][ee_name] = [ - _tensor_pose_to_list(pose.tolist()) for pose in pose_list - ] - payload = { "object_reach_target_poses": object_reach_target_poses, - "object_extra_reach_target_poses": object_extra_reach_target_poses, } with open(out_path, "w", encoding="utf-8") as f: json.dump(payload, f, indent=2) @@ -209,11 +137,9 @@ def main(): debug_path = os.path.abspath(args_cli.debug_poses_path) - # Always export first so user can edit immediately after startup. _export_env_extra_poses_to_json(out_path=debug_path, pipeline=pipeline) print(f"[reach_target_pose] Exported debug poses to: {debug_path}") - # Apply once (in case user edited the file between process start and export completion). try: _apply_live_poses(poses_path=debug_path, pipeline=pipeline) except Exception as e: diff --git a/source/autosim/autosim/calibration/plan_sweep.py b/source/autosim/autosim/calibration/plan_sweep.py index 8086be3..4b81716 100644 --- a/source/autosim/autosim/calibration/plan_sweep.py +++ b/source/autosim/autosim/calibration/plan_sweep.py @@ -7,6 +7,7 @@ from autosim.core.pipeline import AutoSimPipeline from autosim.core.registration import SkillRegistry +from autosim.skills.reach import ReachSkill from .pose_sampler import OffsetSampler, PoseSampler @@ -32,6 +33,27 @@ def _fmt_pose(vals: list[float]) -> str: return "[" + ", ".join(f"{v:.4f}" for v in vals) + "]" +def _build_extra_target_link_goals( + reach_skill: ReachSkill, + activate_q: torch.Tensor, + target_poses_r: torch.Tensor, + env_extra_info, +) -> dict[str, torch.Tensor] | None: + """Build batched extra link goals by reusing ReachSkill logic.""" + if not reach_skill.cfg.extra_cfg.extra_target_link_names: + return None + + goal_list = [] + for target_pose in target_poses_r: + goal_list.append(reach_skill._build_extra_target_poses(activate_q, target_pose, env_extra_info)) # noqa: SLF001 + + first_goal = goal_list[0] + if first_goal is None: + return None + + return {link_name: torch.stack([goal[link_name] for goal in goal_list], dim=0) for link_name in first_goal.keys()} + + def reach_plan_sweep(pipeline: AutoSimPipeline, cfg: ReachPlanSweepCfg) -> list[dict[str, Any]]: """ Execute the pipeline step by step. When the reach_skill_index-th reach skill @@ -41,14 +63,9 @@ def reach_plan_sweep(pipeline: AutoSimPipeline, cfg: ReachPlanSweepCfg) -> list[ All skills before the target reach skill are executed normally, so the environment reflects the actual state at the point of interest. - For multi-arm tasks (i.e. when `EnvExtraInfo.object_extra_reach_target_poses` is set), - extra EE goals are swept in lock-step with the main EE: both use the same sampled - offsets, so the relative configuration between arms is preserved across all candidates. - Returns: Top-k result rows sorted by plan quality. Each row contains: "pose_oe": list[float] — main EE pose in object frame [x,y,z,qw,qx,qy,qz] - "extra_pose_oe/": list[float] — extra EE poses (multi-arm only) "plan_success": bool "traj_len": int | None — trajectory length (full planning only) "position_error": float | None — IK position error (IK-only mode) @@ -63,7 +80,6 @@ def reach_plan_sweep(pipeline: AutoSimPipeline, cfg: ReachPlanSweepCfg) -> list[ python reach_plan_sweep.py --reach_skill_index 1 ... """ - # modified from pipeline.run() to only execute the reach skill at the specified index pipeline.initialize() decompose_result = pipeline.decompose() @@ -88,7 +104,7 @@ def reach_plan_sweep(pipeline: AutoSimPipeline, cfg: ReachPlanSweepCfg) -> list[ if is_reach and reach_skill_counter == cfg.reach_skill_index: obj_name = skill_info.target_object obj_pose_idx = reach_count_per_object.get(obj_name, 0) - return _sweep(pipeline, cfg, obj_name, obj_pose_idx) + return _sweep(pipeline, cfg, obj_name, obj_pose_idx, skill) goal = skill.extract_goal_from_info(skill_info, pipeline._env, pipeline._env_extra_info) if is_reach: @@ -110,24 +126,20 @@ def reach_plan_sweep(pipeline: AutoSimPipeline, cfg: ReachPlanSweepCfg) -> list[ ) -def _sweep(pipeline: AutoSimPipeline, cfg: ReachPlanSweepCfg, obj_name: str, obj_pose_idx: int) -> list[dict[str, Any]]: +def _sweep( + pipeline: AutoSimPipeline, + cfg: ReachPlanSweepCfg, + obj_name: str, + obj_pose_idx: int, + reach_skill: ReachSkill, +) -> list[dict[str, Any]]: """ Core sweep logic. Called once the environment is in the correct pre-reach state. Samples K candidate poses around the base reach target (object frame), transforms them to - robot root frame, then batch-plans with cuRobo. If `object_extra_reach_target_poses` is - defined for this object, extra EE goals are sampled with the same offsets (OffsetSampler - resets its RNG from `self.seed` on every `sample()` call, so identical offsets are applied - to every EE) and passed to the planner as `link_goals`. - - Args: - pipeline: The pipeline providing env, planner, and extra info. - cfg: Sweep configuration (sampler, top_k, ik_only). - obj_name: Name of the target object in the scene. - obj_pose_idx: Index into `object_reach_target_poses[obj_name]` — which reach call this is. - - Returns: - Top-k result rows (see `reach_plan_sweep` for row schema), sorted by plan quality. + robot root frame, then batch-plans with cuRobo. When configured, extra link goals are + generated from the live joint state using the same extra-target strategy as + `ReachSkill.extract_goal_from_info()`. """ env = pipeline._env @@ -141,17 +153,13 @@ def _sweep(pipeline: AutoSimPipeline, cfg: ReachPlanSweepCfg, obj_name: str, obj raise ValueError(f"pose index {obj_pose_idx} out of range for object '{obj_name}' ({len(pose_list)} poses).") base_pose_oe = torch.as_tensor(pose_list[obj_pose_idx], device=env.device, dtype=torch.float32).view(7) - - # Sample object-frame candidate poses around base poses_oe = cfg.sampling.sample(base_pose_oe) k = int(poses_oe.shape[0]) - # Read live object pose in world frame obj_pose_w = env.scene[obj_name].data.root_pose_w[env_id] obj_pos_w = obj_pose_w[:3].view(1, 3).repeat(k, 1) obj_quat_w = obj_pose_w[3:].view(1, 4).repeat(k, 1) - # object frame -> world frame -> robot root frame target_pos_w, target_quat_w = PoseUtils.combine_frame_transforms( obj_pos_w, obj_quat_w, poses_oe[:, :3], poses_oe[:, 3:] ) @@ -160,52 +168,16 @@ def _sweep(pipeline: AutoSimPipeline, cfg: ReachPlanSweepCfg, obj_name: str, obj rr_quat_w = robot_root_pose_w[3:].view(1, 4).repeat(k, 1) target_pos_r, target_quat_r = PoseUtils.subtract_frame_transforms(rr_pos_w, rr_quat_w, target_pos_w, target_quat_w) - # Build current joint state in planner joint order (name-based mapping) state = pipeline._build_world_state() - full_sim_joint_names = state.sim_joint_names - full_q = state.robot_joint_pos - full_qd = state.robot_joint_vel - activate_q, activate_qd = [], [] - for joint_name in planner.target_joint_names: - if joint_name not in full_sim_joint_names: - raise ValueError(f"Planner joint '{joint_name}' not found in simulation joint names.") - idx = full_sim_joint_names.index(joint_name) - activate_q.append(full_q[idx]) - activate_qd.append(full_qd[idx]) - activate_q = torch.stack(activate_q, dim=0) - activate_qd = torch.stack(activate_qd, dim=0) - - # Build extra EE goals from object_extra_reach_target_poses (multi-arm only). - # OffsetSampler.sample() resets its RNG from self.seed on every call, so sample i of any - # extra EE receives the same dx/dy/dz/dyaw offset as sample i of the main EE — keeping all - # arms coherently displaced across the entire candidate batch. - extra_poses_oe_dict: dict[str, torch.Tensor] = {} - link_goals: dict[str, torch.Tensor] | None = None - - extra_pose_map = env_extra_info.object_extra_reach_target_poses.get(obj_name, {}) - if extra_pose_map: - extra_link_pos_r: dict[str, torch.Tensor] = {} - extra_link_quat_r: dict[str, torch.Tensor] = {} - for ee_name, ee_pose_list in extra_pose_map.items(): - if not (0 <= obj_pose_idx < len(ee_pose_list)): - raise ValueError( - f"extra pose index {obj_pose_idx} out of range for '{obj_name}'/'{ee_name}'" - f" ({len(ee_pose_list)} poses)." - ) - ee_base = torch.as_tensor(ee_pose_list[obj_pose_idx], device=env.device, dtype=torch.float32).view(7) - ee_poses_oe = cfg.sampling.sample(ee_base) - extra_poses_oe_dict[ee_name] = ee_poses_oe - - ee_pos_w, ee_quat_w = PoseUtils.combine_frame_transforms( - obj_pos_w, obj_quat_w, ee_poses_oe[:, :3], ee_poses_oe[:, 3:] - ) - ee_pos_r, ee_quat_r = PoseUtils.subtract_frame_transforms(rr_pos_w, rr_quat_w, ee_pos_w, ee_quat_w) - extra_link_pos_r[ee_name] = ee_pos_r - extra_link_quat_r[ee_name] = ee_quat_r + activate_q, activate_qd = reach_skill._build_activate_joint_state( # noqa: SLF001 + state.sim_joint_names, state.robot_joint_pos, state.robot_joint_vel + ) + if activate_qd is None: + raise ValueError("activate_qd should not be None when sweep planning reach trajectories.") - link_goals = {ee: torch.cat([extra_link_pos_r[ee], extra_link_quat_r[ee]], dim=-1) for ee in extra_link_pos_r} + target_poses_r = torch.cat((target_pos_r, target_quat_r), dim=-1) + link_goals = _build_extra_target_link_goals(reach_skill, activate_q, target_poses_r, env_extra_info) - # Batch plan t0 = time.time() if cfg.ik_only: result = planner.solve_ik_batch(target_pos_r, target_quat_r, link_goals=link_goals) @@ -225,7 +197,6 @@ def _sweep(pipeline: AutoSimPipeline, cfg: ReachPlanSweepCfg, obj_name: str, obj for i in range(k): rows.append({ "pose_oe": _tensor_to_list(poses_oe[i]), - **{f"extra_pose_oe/{ee}": _tensor_to_list(extra_poses_oe_dict[ee][i]) for ee in extra_poses_oe_dict}, "plan_success": bool(success[i].item()), "traj_len": int(traj_last[i]) if traj_last is not None else None, "position_error": float(pos_err[i].item()) if pos_err is not None else None, @@ -250,10 +221,6 @@ def _sort_key(r): mark = "✓" if r["plan_success"] else "✗" metric = f"traj_len={r['traj_len']}" if r["traj_len"] is not None else f"pos_err={r['position_error']:.4f}" print(f" [{rank}] {mark} {_fmt_pose(r['pose_oe'])} # {metric}") - for ee in extra_poses_oe_dict: - print(f" {ee}: {_fmt_pose(r[f'extra_pose_oe/{ee}'])}") - if extra_poses_oe_dict: - print() print(_SEP) return top_k diff --git a/source/autosim/autosim/capabilities/motion_planning/curobo/curobo_planner.py b/source/autosim/autosim/capabilities/motion_planning/curobo/curobo_planner.py index 52c164f..c184dd9 100644 --- a/source/autosim/autosim/capabilities/motion_planning/curobo/curobo_planner.py +++ b/source/autosim/autosim/capabilities/motion_planning/curobo/curobo_planner.py @@ -202,6 +202,11 @@ def plan_motion( current_q = current_q[:dof_needed] current_qd = current_qd[:dof_needed] + joint_limits = self.motion_gen.kinematics.get_joint_limits() + current_q = torch.clamp( + self._to_curobo_device(current_q), joint_limits.position[0], joint_limits.position[1] + ).to(current_q.device) + # build the target pose goal = Pose( position=self._to_curobo_device(target_pos), @@ -385,8 +390,26 @@ def reset(self): def get_ee_pose(self, current_q: torch.Tensor) -> Pose: """Get the end-effector pose of the robot.""" + return self.get_link_pose(current_q, self.motion_gen.kinematics.ee_link) + + def get_link_pose(self, current_q: torch.Tensor, link_name: str) -> Pose: + """Get the pose of a specific link in the robot root frame.""" + + return self.get_link_poses(current_q, [link_name])[link_name] + + def get_link_poses(self, current_q: torch.Tensor, link_names: list[str]) -> dict[str, Pose]: + """Get the poses of specific links in the robot root frame.""" + current_joint_state = JointState( position=self._to_curobo_device(current_q), joint_names=self.target_joint_names ) kin_state = self.motion_gen.compute_kinematics(current_joint_state) - return kin_state.link_poses[self.motion_gen.kinematics.ee_link] + + missing_link_names = [link_name for link_name in link_names if link_name not in kin_state.link_poses] + if missing_link_names: + raise ValueError( + f"Unknown cuRobo link name(s): {missing_link_names}. Available links:" + f" {list(kin_state.link_poses.keys())}" + ) + + return {link_name: kin_state.link_poses[link_name] for link_name in link_names} diff --git a/source/autosim/autosim/core/types.py b/source/autosim/autosim/core/types.py index 9395ab3..9be2ae9 100644 --- a/source/autosim/autosim/core/types.py +++ b/source/autosim/autosim/core/types.py @@ -86,17 +86,19 @@ class EnvExtraInfo: object_reach_target_poses: dict[str, list[torch.Tensor]] = field(default_factory=dict) """The reach target poses in the objects frame. each object can have a list of reach target poses [x, y, z, qw, qx, qy, qz] in the order of execution.""" - object_extra_reach_target_poses: dict[str, dict[str, list[torch.Tensor]]] = field(default_factory=dict) - """The extra reach target poses in the objects frame. each object can have a list of extra reach target poses [x, y, z, qw, qx, qy, qz] with ee_name as the key in the order of execution.""" object_navigate_sample_range: dict[str, tuple[float, float]] = field(default_factory=dict) """The sample range for the navigate skill. each object can have a tuple of (min_angle, max_angle) in radians.""" + cached_initial_extra_target_offsets: dict[str, tuple[torch.Tensor, torch.Tensor]] | None = None + """Cached primary-frame offsets for extra target links, reused across multiple reach-like skills.""" + def __post_init__(self): self.reset() def reset(self) -> None: """Reset the environment extra information.""" + self.cached_initial_extra_target_offsets = None self._reset_target_pose_iterators() def _reset_target_pose_iterators(self) -> None: @@ -105,13 +107,6 @@ def _reset_target_pose_iterators(self) -> None: object_name: self._build_iterator(reach_target_poses) for object_name, reach_target_poses in self.object_reach_target_poses.items() } - self._object_extra_reach_target_poses_iterator_dict = { - object_name: { - ee_name: self._build_iterator(extra_reach_target_poses) - for ee_name, extra_reach_target_poses in extra_reach_target_poses.items() - } - for object_name, extra_reach_target_poses in self.object_extra_reach_target_poses.items() - } def _build_iterator(self, value_list: list[torch.Tensor]) -> Iterator[torch.Tensor]: yield from value_list @@ -119,9 +114,6 @@ def _build_iterator(self, value_list: list[torch.Tensor]) -> Iterator[torch.Tens def get_next_reach_target_pose(self, object_name: str) -> torch.Tensor: return next(self._object_reach_target_poses_iterator_dict[object_name]) - def get_next_extra_reach_target_pose(self, object_name: str, ee_name: str) -> torch.Tensor: - return next(self._object_extra_reach_target_poses_iterator_dict[object_name][ee_name]) - def get_navigate_sample_range(self, object_name: str) -> tuple[float, float]: return self.object_navigate_sample_range.get(object_name, (0.0, 2 * np.pi)) diff --git a/source/autosim/autosim/skills/base_skill.py b/source/autosim/autosim/skills/base_skill.py index dcbfdf2..8042ffd 100644 --- a/source/autosim/autosim/skills/base_skill.py +++ b/source/autosim/autosim/skills/base_skill.py @@ -88,6 +88,21 @@ class CuroboSkillExtraCfg(SkillExtraCfg): debug_target_pose: bool = False """Whether to debug the target pose.""" + extra_target_link_names: list[str] = [] + """Additional cuRobo link names constrained during planning.""" + + extra_target_mode: str = "keep_current" + """How additional cuRobo link goals are generated.""" + + def __post_init__(self) -> None: + supported_modes = {"keep_current", "keep_relative_offset", "keep_initial_relative_offset"} + if self.extra_target_mode not in supported_modes: + raise ValueError( + f"Unsupported extra_target_mode: {self.extra_target_mode}. Supported modes: {sorted(supported_modes)}" + ) + if len(self.extra_target_link_names) != len(set(self.extra_target_link_names)): + raise ValueError("extra_target_link_names must not contain duplicates.") + class CuroboSkillBase(Skill): """Base class for skills dependent on curobo.""" diff --git a/source/autosim/autosim/skills/reach.py b/source/autosim/autosim/skills/reach.py index 4fb61de..d528ca6 100644 --- a/source/autosim/autosim/skills/reach.py +++ b/source/autosim/autosim/skills/reach.py @@ -42,6 +42,203 @@ def __init__(self, extra_cfg: CuroboSkillExtraCfg) -> None: self._trajectory = None self._step_idx = 0 + def _build_activate_joint_state( + self, full_sim_joint_names: list[str], full_sim_q: torch.Tensor, full_sim_qd: torch.Tensor | None = None + ) -> tuple[torch.Tensor, torch.Tensor | None]: + """Extract the planner's active joint state from full simulation joint state. + + cuRobo typically plans over a subset of "active joints" (`self._planner.target_joint_names`). + This helper slices the full simulation joint vectors into that active subset, ordered exactly + as the planner expects, returning `q` and (optionally) `qd`. + + Args: + full_sim_joint_names: Joint name list from simulation (index-aligned with `full_sim_q`/`full_sim_qd`). + full_sim_q: Full simulation joint positions, shape `[num_sim_joints]`. + full_sim_qd: Optional full simulation joint velocities, shape `[num_sim_joints]`. + + Returns: + A tuple `(activate_q, activate_qd)` where: + - `activate_q` is ordered by `self._planner.target_joint_names`, shape `[num_active_joints]`. + - `activate_qd` is the corresponding velocities if `full_sim_qd` is provided; otherwise `None`. + + Raises: + ValueError: If any planner target joint is missing from `full_sim_joint_names`. + """ + + activate_q, activate_qd = [], [] if full_sim_qd is not None else None + for joint_name in self._planner.target_joint_names: + if joint_name not in full_sim_joint_names: + raise ValueError( + f"Joint {joint_name} in planner activate joints is not in the full simulation joint names." + ) + sim_joint_idx = full_sim_joint_names.index(joint_name) + activate_q.append(full_sim_q[sim_joint_idx]) + if full_sim_qd is not None and activate_qd is not None: + activate_qd.append(full_sim_qd[sim_joint_idx]) + + activate_q_tensor = torch.stack(activate_q, dim=0) + if activate_qd is None: + return activate_q_tensor, None + return activate_q_tensor, torch.stack(activate_qd, dim=0) + + def _get_current_primary_and_extra_link_poses( + self, activate_q: torch.Tensor + ) -> tuple[tuple[torch.Tensor, torch.Tensor], dict[str, tuple[torch.Tensor, torch.Tensor]]]: + """Get current primary and extra link poses in robot root frame.""" + + current_link_poses = self._planner.get_link_poses( + activate_q, [self._planner.motion_gen.kinematics.ee_link, *self.cfg.extra_cfg.extra_target_link_names] + ) + primary_pose_in_robot_root = current_link_poses[self._planner.motion_gen.kinematics.ee_link] + primary_link_pose_in_robot_root = ( + primary_pose_in_robot_root.position, + primary_pose_in_robot_root.quaternion, + ) + extra_link_poses_in_robot_root = { + link_name: (pose.position, pose.quaternion) + for link_name, pose in current_link_poses.items() + if link_name != self._planner.motion_gen.kinematics.ee_link + } + return primary_link_pose_in_robot_root, extra_link_poses_in_robot_root + + def _compute_relative_extra_target_poses( + self, + primary_link_pose_in_robot_root: tuple[torch.Tensor, torch.Tensor], + extra_link_offsets_in_primary: dict[str, tuple[torch.Tensor, torch.Tensor]], + target_pose: torch.Tensor, + ) -> dict[str, torch.Tensor]: + """Project cached or current primary-frame offsets onto the target primary pose.""" + + primary_target_pos_in_robot_root = target_pose[:3].unsqueeze(0).to(primary_link_pose_in_robot_root[0].device) + primary_target_quat_in_robot_root = target_pose[3:].unsqueeze(0).to(primary_link_pose_in_robot_root[1].device) + + extra_target_poses = {} + for link_name, (link_pos_in_primary, link_quat_in_primary) in extra_link_offsets_in_primary.items(): + link_target_pos_in_robot_root, link_target_quat_in_robot_root = PoseUtils.combine_frame_transforms( + primary_target_pos_in_robot_root, + primary_target_quat_in_robot_root, + link_pos_in_primary, + link_quat_in_primary, + ) + self._logger.info( + f"Relative offset for {link_name} in primary frame: pos={link_pos_in_primary}," + f" quat={link_quat_in_primary}" + ) + self._logger.info( + f"Target pose for {link_name} in robot root frame: pos={link_target_pos_in_robot_root}," + f" quat={link_target_quat_in_robot_root}" + ) + extra_target_poses[link_name] = torch.cat( + (link_target_pos_in_robot_root, link_target_quat_in_robot_root), dim=-1 + ).squeeze(0) + + return extra_target_poses + + def _compute_relative_offsets_in_primary( + self, + primary_link_pose_in_robot_root: tuple[torch.Tensor, torch.Tensor], + extra_link_poses_in_robot_root: dict[str, tuple[torch.Tensor, torch.Tensor]], + ) -> dict[str, tuple[torch.Tensor, torch.Tensor]]: + """Compute the current rigid offset from the primary link frame to each extra link.""" + + primary_pos_in_robot_root, primary_quat_in_robot_root = primary_link_pose_in_robot_root + extra_link_offsets_in_primary = {} + for link_name, (link_pos_in_robot_root, link_quat_in_robot_root) in extra_link_poses_in_robot_root.items(): + link_pos_in_primary, link_quat_in_primary = PoseUtils.subtract_frame_transforms( + primary_pos_in_robot_root, + primary_quat_in_robot_root, + link_pos_in_robot_root, + link_quat_in_robot_root, + ) + extra_link_offsets_in_primary[link_name] = (link_pos_in_primary, link_quat_in_primary) + + return extra_link_offsets_in_primary + + def _build_extra_target_poses( + self, + activate_q: torch.Tensor, + target_pose: torch.Tensor, + env_extra_info: EnvExtraInfo, + ) -> dict[str, torch.Tensor] | None: + """Build link-level extra target poses based on configuration. + + This is the dispatcher for `extra_target_mode`. It returns a dict mapping link names to pose + tensors in `[x, y, z, qw, qx, qy, qz]` (single-sample), used as additional link goals/constraints + during planning. + """ + + if not self.cfg.extra_cfg.extra_target_link_names: + return None + + if self.cfg.extra_cfg.extra_target_mode == "keep_current": + return self._build_keep_current_extra_target_poses(activate_q) + if self.cfg.extra_cfg.extra_target_mode == "keep_relative_offset": + return self._build_keep_relative_offset_extra_target_poses(activate_q, target_pose) + if self.cfg.extra_cfg.extra_target_mode == "keep_initial_relative_offset": + return self._build_keep_initial_relative_offset_extra_target_poses(activate_q, target_pose, env_extra_info) + raise ValueError(f"Unsupported extra_target_mode: {self.cfg.extra_cfg.extra_target_mode}") + + def _build_keep_current_extra_target_poses(self, activate_q: torch.Tensor) -> dict[str, torch.Tensor] | None: + """Build "keep current pose" extra targets for configured links. + + In `keep_current` mode, this computes FK for each link in `extra_target_link_names` and uses + its current pose as the planning target, effectively constraining those links to remain fixed. + """ + + extra_target_poses = {} + for link_name, pose in self._planner.get_link_poses( + activate_q, self.cfg.extra_cfg.extra_target_link_names + ).items(): + extra_target_poses[link_name] = torch.cat((pose.position, pose.quaternion), dim=-1).squeeze(0) + + return extra_target_poses + + def _build_keep_relative_offset_extra_target_poses( + self, activate_q: torch.Tensor, target_pose: torch.Tensor + ) -> dict[str, torch.Tensor] | None: + """Build extra targets by preserving the current rigid transform from primary EE to each extra link.""" + + primary_link_pose_in_robot_root, extra_link_poses_in_robot_root = ( + self._get_current_primary_and_extra_link_poses(activate_q) + ) + + extra_link_offsets_in_primary = self._compute_relative_offsets_in_primary( + primary_link_pose_in_robot_root, extra_link_poses_in_robot_root + ) + return self._compute_relative_extra_target_poses( + primary_link_pose_in_robot_root, extra_link_offsets_in_primary, target_pose + ) + + def _build_keep_initial_relative_offset_extra_target_poses( + self, + activate_q: torch.Tensor, + target_pose: torch.Tensor, + env_extra_info: EnvExtraInfo, + ) -> dict[str, torch.Tensor] | None: + """Build extra targets by preserving the first observed rigid transform from primary EE to each extra link.""" + + primary_link_pose_in_robot_root, extra_link_poses_in_robot_root = ( + self._get_current_primary_and_extra_link_poses(activate_q) + ) + + if env_extra_info.cached_initial_extra_target_offsets is None: + env_extra_info.cached_initial_extra_target_offsets = self._compute_relative_offsets_in_primary( + primary_link_pose_in_robot_root, extra_link_poses_in_robot_root + ) + self._logger.info( + "Cached initial relative offsets for extra links:" + f" {list(env_extra_info.cached_initial_extra_target_offsets.keys())}" + ) + else: + self._logger.info( + "Reusing cached initial relative offsets for extra links:" + f" {list(env_extra_info.cached_initial_extra_target_offsets.keys())}" + ) + + return self._compute_relative_extra_target_poses( + primary_link_pose_in_robot_root, env_extra_info.cached_initial_extra_target_offsets, target_pose + ) + def extract_goal_from_info( self, skill_info: SkillInfo, env: ManagerBasedEnv, env_extra_info: EnvExtraInfo ) -> SkillGoal: @@ -78,28 +275,10 @@ def extract_goal_from_info( ) target_pose = torch.cat((reach_target_pos_in_robot_root, reach_target_quat_in_robot_root), dim=-1).squeeze(0) - - if target_object in env_extra_info.object_extra_reach_target_poses.keys(): - extra_target_poses = {} - for ee_name in env_extra_info.object_extra_reach_target_poses[target_object].keys(): - ee_target_pose = env_extra_info.get_next_extra_reach_target_pose(target_object, ee_name) - ee_target_pose = torch.as_tensor(ee_target_pose, device=env.device) - extra_target_pos_in_obj, extra_target_quat_in_obj = ee_target_pose[:3].unsqueeze(0), ee_target_pose[ - 3: - ].unsqueeze(0) - extra_target_pos_in_env, extra_target_quat_in_env = PoseUtils.combine_frame_transforms( - object_pos_in_env, object_quat_in_env, extra_target_pos_in_obj, extra_target_quat_in_obj - ) - self._logger.info(f"Extra target position for {ee_name} in environment: {extra_target_pos_in_env}") - self._logger.info(f"Extra target quaternion for {ee_name} in environment: {extra_target_quat_in_env}") - extra_target_pos_in_robot_root, extra_target_quat_in_robot_root = PoseUtils.subtract_frame_transforms( - robot_root_pos_in_env, robot_root_quat_in_env, extra_target_pos_in_env, extra_target_quat_in_env - ) - extra_target_poses[ee_name] = torch.cat( - (extra_target_pos_in_robot_root, extra_target_quat_in_robot_root), dim=-1 - ).squeeze(0) - else: - extra_target_poses = None + activate_q, _ = self._build_activate_joint_state( + robot.data.joint_names, robot.data.joint_pos[0], robot.data.joint_vel[0] + ) + extra_target_poses = self._build_extra_target_poses(activate_q, target_pose, env_extra_info) return SkillGoal(target_object=target_object, target_pose=target_pose, extra_target_poses=extra_target_poses) @@ -111,22 +290,12 @@ def execute_plan(self, state: WorldState, goal: SkillGoal) -> bool: target_pose = goal.target_pose # target pose in the robot root frame target_pos, target_quat = target_pose[:3], target_pose[3:] - full_sim_joint_names = state.sim_joint_names - full_sim_q = state.robot_joint_pos - full_sim_qd = state.robot_joint_vel - planner_activate_joints = self._planner.target_joint_names - - activate_q, activate_qd = [], [] - for joint_name in planner_activate_joints: - if joint_name in full_sim_joint_names: - activate_q.append(full_sim_q[full_sim_joint_names.index(joint_name)]) - activate_qd.append(full_sim_qd[full_sim_joint_names.index(joint_name)]) - else: - raise ValueError( - f"Joint {joint_name} in planner activate joints is not in the full simulation joint names." - ) - activate_q = torch.stack(activate_q, dim=0) - activate_qd = torch.stack(activate_qd, dim=0) + activate_q, activate_qd = self._build_activate_joint_state( + state.sim_joint_names, state.robot_joint_pos, state.robot_joint_vel + ) + if activate_qd is None: + raise ValueError("activate_qd should not be None when planning reach trajectories.") + self._trajectory = self._planner.plan_motion( target_pos, target_quat, diff --git a/source/autosim/autosim/skills/relative_reach.py b/source/autosim/autosim/skills/relative_reach.py index 99caf3a..c69b6fd 100644 --- a/source/autosim/autosim/skills/relative_reach.py +++ b/source/autosim/autosim/skills/relative_reach.py @@ -67,22 +67,11 @@ def extract_goal_from_info( def execute_plan(self, state: WorldState, goal: SkillGoal) -> bool: """Execute the plan of the relative reach skill.""" - full_sim_joint_names = state.sim_joint_names - full_sim_q = state.robot_joint_pos - full_sim_qd = state.robot_joint_vel - planner_activate_joints = self._planner.target_joint_names - - activate_q, activate_qd = [], [] - for joint_name in planner_activate_joints: - if joint_name in full_sim_joint_names: - activate_q.append(full_sim_q[full_sim_joint_names.index(joint_name)]) - activate_qd.append(full_sim_qd[full_sim_joint_names.index(joint_name)]) - else: - raise ValueError( - f"Joint {joint_name} in planner activate joints is not in the full simulation joint names." - ) - activate_q = torch.stack(activate_q, dim=0) - activate_qd = torch.stack(activate_qd, dim=0) + activate_q, activate_qd = self._build_activate_joint_state( + state.sim_joint_names, state.robot_joint_pos, state.robot_joint_vel + ) + if activate_qd is None: + raise ValueError("activate_qd should not be None when planning relative reach trajectories.") ee_pose = self._planner.get_ee_pose(activate_q) target_pos, target_quat = ee_pose.position.clone(), ee_pose.quaternion.clone() diff --git a/source/autosim/autosim/utils/debug_util.py b/source/autosim/autosim/utils/debug_util.py index b1293b5..b249b2f 100644 --- a/source/autosim/autosim/utils/debug_util.py +++ b/source/autosim/autosim/utils/debug_util.py @@ -57,34 +57,15 @@ def visualize_reach_target_poses(env_extra_info, env: ManagerBasedEnv) -> None: Creates markers for: - ``env_extra_info.object_reach_target_poses`` under the marker name ``"reach_target_poses"``. - - Each extra EE in ``env_extra_info.object_extra_reach_target_poses`` under - ``"reach_target_poses_{ee_name}"``. Must be called after the environment has been reset so that object poses are at their initial positions. """ - # Primary reach target poses primary_poses_w = _collect_world_poses(env_extra_info.object_reach_target_poses, env) if primary_poses_w is not None: create_marker("reach_target_poses") visualize_marker("reach_target_poses", primary_poses_w) - # Extra EE reach target poses (multi-arm) - # object_extra_reach_target_poses: dict[obj_name, dict[ee_name, list[Tensor]]] - ee_pose_lists: dict[str, dict[str, list[torch.Tensor]]] = {} - for obj_name, ee_dict in env_extra_info.object_extra_reach_target_poses.items(): - for ee_name, pose_list in ee_dict.items(): - if ee_name not in ee_pose_lists: - ee_pose_lists[ee_name] = {} - ee_pose_lists[ee_name][obj_name] = pose_list - - for ee_name, obj_poses in ee_pose_lists.items(): - extra_poses_w = _collect_world_poses(obj_poses, env) - if extra_poses_w is not None: - marker_name = f"reach_target_poses_{ee_name}" - create_marker(marker_name) - visualize_marker(marker_name, extra_poses_w) - def debug_visualize_goal_sampling( occupancy_map: OccupancyMap,