Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 6 additions & 80 deletions examples/visualization/reach_target_pose.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,6 @@
...
],
...
},
"object_extra_reach_target_poses": {
"<object_name>": {
"<ee_name>": [
[x, y, z, qw, qx, qy, qz],
...
],
...
},
...
}
}

Expand All @@ -53,8 +43,7 @@
default="reach_target_poses_debug.json",
help=(
"If provided, the script will export the current `object_reach_target_poses` "
"(and `object_extra_reach_target_poses`) to this JSON file after `reset_env()`, "
"and reload it on every file change."
"to this JSON file after `reset_env()`, and reload it on every file change."
),
)
parser.add_argument(
Expand All @@ -76,22 +65,12 @@
from autosim.utils.debug_util import visualize_reach_target_poses


def _load_env_extra_poses_json(
path: str,
) -> tuple[dict[str, list[list[float]]], dict[str, dict[str, list[list[float]]]]]:
"""Load env extra poses from the exported debug JSON.

We expect the payload format produced by this script:
{
"object_reach_target_poses": { "<obj>": [[7], ...] },
"object_extra_reach_target_poses": { "<obj>": { "<ee>": [[7], ...] } }
}
"""
def _load_env_extra_poses_json(path: str) -> dict[str, list[list[float]]]:
"""Load reach target poses from the exported debug JSON."""
with open(path, encoding="utf-8") as f:
data = json.load(f)

object_reach_target_poses: dict[str, list[list[float]]] = {}
object_extra_reach_target_poses: dict[str, dict[str, list[list[float]]]] = {}

if not isinstance(data, dict):
raise ValueError("Debug JSON root must be an object.")
Expand All @@ -111,43 +90,17 @@ def _load_env_extra_poses_json(
normalized.append([float(v) for v in pose])
object_reach_target_poses[obj_name] = normalized

extra = data.get("object_extra_reach_target_poses", {})
if extra is None:
extra = {}
if not isinstance(extra, dict):
raise ValueError("`object_extra_reach_target_poses` must be an object mapping.")
for obj_name, ee_dict in extra.items():
if not isinstance(obj_name, str):
raise ValueError("Extra: object names must be strings.")
if not isinstance(ee_dict, dict):
raise ValueError(f"Extra: `{obj_name}` must map ee_name -> pose_list.")
object_extra_reach_target_poses[obj_name] = {}
for ee_name, pose_list in ee_dict.items():
if not isinstance(ee_name, str):
raise ValueError("Extra: ee_name must be a string.")
if not isinstance(pose_list, list):
raise ValueError(f"Extra: `{obj_name}.{ee_name}` must be a list.")
normalized: list[list[float]] = []
for pose in pose_list:
if not (isinstance(pose, list) and len(pose) == 7):
raise ValueError(f"Extra: each pose for `{obj_name}.{ee_name}` must be list length 7.")
normalized.append([float(v) for v in pose])
object_extra_reach_target_poses[obj_name][ee_name] = normalized

return object_reach_target_poses, object_extra_reach_target_poses
return object_reach_target_poses


def _apply_live_poses(*, poses_path: str, pipeline) -> None:
"""Update `pipeline._env_extra_info.*` from JSON."""
"""Update `pipeline._env_extra_info.object_reach_target_poses` from JSON."""
env = pipeline._env
env_extra_info = pipeline._env_extra_info
object_reach_target_poses, object_extra_reach_target_poses = _load_env_extra_poses_json(poses_path)
object_reach_target_poses = _load_env_extra_poses_json(poses_path)

# Replace dicts entirely so removed keys in the JSON disappear from visualization.
env_extra_info.object_reach_target_poses = {}
env_extra_info.object_extra_reach_target_poses = {}

# Update primary reach targets
for obj_name, pose_list in object_reach_target_poses.items():
if obj_name not in env.scene.keys():
continue
Expand All @@ -158,45 +111,20 @@ def _apply_live_poses(*, poses_path: str, pipeline) -> None:
torch.tensor(pose, device=device, dtype=dtype) for pose in pose_list
]

# Update extra EE reach targets (if present)
for obj_name, ee_dict in object_extra_reach_target_poses.items():
if obj_name not in env.scene.keys():
continue
obj_pose_w = env.scene[obj_name].data.root_pose_w[0] # [7]
device = obj_pose_w.device
dtype = obj_pose_w.dtype
if obj_name not in env_extra_info.object_extra_reach_target_poses:
env_extra_info.object_extra_reach_target_poses[obj_name] = {}
for ee_name, pose_list in ee_dict.items():
env_extra_info.object_extra_reach_target_poses[obj_name][ee_name] = [
torch.tensor(pose, device=device, dtype=dtype) for pose in pose_list
]


def _export_env_extra_poses_to_json(*, out_path: str, pipeline) -> None:
"""Export current env_extra_info reach targets to JSON."""
env_extra_info = pipeline._env_extra_info

def _tensor_pose_to_list(p: list) -> list[float]:
# p is torch.Tensor(7,) but we want plain python floats.
return [float(x) for x in p]

object_reach_target_poses: dict[str, list[list[float]]] = {}
for obj_name, pose_list in env_extra_info.object_reach_target_poses.items():
object_reach_target_poses[obj_name] = [_tensor_pose_to_list(pose.tolist()) for pose in pose_list]

object_extra_reach_target_poses: dict[str, dict[str, list[list[float]]]] = {}
if hasattr(env_extra_info, "object_extra_reach_target_poses"):
for obj_name, ee_dict in env_extra_info.object_extra_reach_target_poses.items():
object_extra_reach_target_poses[obj_name] = {}
for ee_name, pose_list in ee_dict.items():
object_extra_reach_target_poses[obj_name][ee_name] = [
_tensor_pose_to_list(pose.tolist()) for pose in pose_list
]

payload = {
"object_reach_target_poses": object_reach_target_poses,
"object_extra_reach_target_poses": object_extra_reach_target_poses,
}
with open(out_path, "w", encoding="utf-8") as f:
json.dump(payload, f, indent=2)
Expand All @@ -209,11 +137,9 @@ def main():

debug_path = os.path.abspath(args_cli.debug_poses_path)

# Always export first so user can edit immediately after startup.
_export_env_extra_poses_to_json(out_path=debug_path, pipeline=pipeline)
print(f"[reach_target_pose] Exported debug poses to: {debug_path}")

# Apply once (in case user edited the file between process start and export completion).
try:
_apply_live_poses(poses_path=debug_path, pipeline=pipeline)
except Exception as e:
Expand Down
113 changes: 40 additions & 73 deletions source/autosim/autosim/calibration/plan_sweep.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from autosim.core.pipeline import AutoSimPipeline
from autosim.core.registration import SkillRegistry
from autosim.skills.reach import ReachSkill

from .pose_sampler import OffsetSampler, PoseSampler

Expand All @@ -32,6 +33,27 @@ def _fmt_pose(vals: list[float]) -> str:
return "[" + ", ".join(f"{v:.4f}" for v in vals) + "]"


def _build_extra_target_link_goals(
reach_skill: ReachSkill,
activate_q: torch.Tensor,
target_poses_r: torch.Tensor,
env_extra_info,
) -> dict[str, torch.Tensor] | None:
"""Build batched extra link goals by reusing ReachSkill logic."""
if not reach_skill.cfg.extra_cfg.extra_target_link_names:
return None

goal_list = []
for target_pose in target_poses_r:
goal_list.append(reach_skill._build_extra_target_poses(activate_q, target_pose, env_extra_info)) # noqa: SLF001

first_goal = goal_list[0]
if first_goal is None:
return None

return {link_name: torch.stack([goal[link_name] for goal in goal_list], dim=0) for link_name in first_goal.keys()}


def reach_plan_sweep(pipeline: AutoSimPipeline, cfg: ReachPlanSweepCfg) -> list[dict[str, Any]]:
"""
Execute the pipeline step by step. When the reach_skill_index-th reach skill
Expand All @@ -41,14 +63,9 @@ def reach_plan_sweep(pipeline: AutoSimPipeline, cfg: ReachPlanSweepCfg) -> list[
All skills before the target reach skill are executed normally, so the
environment reflects the actual state at the point of interest.

For multi-arm tasks (i.e. when `EnvExtraInfo.object_extra_reach_target_poses` is set),
extra EE goals are swept in lock-step with the main EE: both use the same sampled
offsets, so the relative configuration between arms is preserved across all candidates.

Returns:
Top-k result rows sorted by plan quality. Each row contains:
"pose_oe": list[float] — main EE pose in object frame [x,y,z,qw,qx,qy,qz]
"extra_pose_oe/<ee_name>": list[float] — extra EE poses (multi-arm only)
"plan_success": bool
"traj_len": int | None — trajectory length (full planning only)
"position_error": float | None — IK position error (IK-only mode)
Expand All @@ -63,7 +80,6 @@ def reach_plan_sweep(pipeline: AutoSimPipeline, cfg: ReachPlanSweepCfg) -> list[
python reach_plan_sweep.py --reach_skill_index 1 ...
"""

# modified from pipeline.run() to only execute the reach skill at the specified index
pipeline.initialize()

decompose_result = pipeline.decompose()
Expand All @@ -88,7 +104,7 @@ def reach_plan_sweep(pipeline: AutoSimPipeline, cfg: ReachPlanSweepCfg) -> list[
if is_reach and reach_skill_counter == cfg.reach_skill_index:
obj_name = skill_info.target_object
obj_pose_idx = reach_count_per_object.get(obj_name, 0)
return _sweep(pipeline, cfg, obj_name, obj_pose_idx)
return _sweep(pipeline, cfg, obj_name, obj_pose_idx, skill)

goal = skill.extract_goal_from_info(skill_info, pipeline._env, pipeline._env_extra_info)
if is_reach:
Expand All @@ -110,24 +126,20 @@ def reach_plan_sweep(pipeline: AutoSimPipeline, cfg: ReachPlanSweepCfg) -> list[
)


def _sweep(pipeline: AutoSimPipeline, cfg: ReachPlanSweepCfg, obj_name: str, obj_pose_idx: int) -> list[dict[str, Any]]:
def _sweep(
pipeline: AutoSimPipeline,
cfg: ReachPlanSweepCfg,
obj_name: str,
obj_pose_idx: int,
reach_skill: ReachSkill,
) -> list[dict[str, Any]]:
"""
Core sweep logic. Called once the environment is in the correct pre-reach state.

Samples K candidate poses around the base reach target (object frame), transforms them to
robot root frame, then batch-plans with cuRobo. If `object_extra_reach_target_poses` is
defined for this object, extra EE goals are sampled with the same offsets (OffsetSampler
resets its RNG from `self.seed` on every `sample()` call, so identical offsets are applied
to every EE) and passed to the planner as `link_goals`.

Args:
pipeline: The pipeline providing env, planner, and extra info.
cfg: Sweep configuration (sampler, top_k, ik_only).
obj_name: Name of the target object in the scene.
obj_pose_idx: Index into `object_reach_target_poses[obj_name]` — which reach call this is.

Returns:
Top-k result rows (see `reach_plan_sweep` for row schema), sorted by plan quality.
robot root frame, then batch-plans with cuRobo. When configured, extra link goals are
generated from the live joint state using the same extra-target strategy as
`ReachSkill.extract_goal_from_info()`.
"""

env = pipeline._env
Expand All @@ -141,17 +153,13 @@ def _sweep(pipeline: AutoSimPipeline, cfg: ReachPlanSweepCfg, obj_name: str, obj
raise ValueError(f"pose index {obj_pose_idx} out of range for object '{obj_name}' ({len(pose_list)} poses).")

base_pose_oe = torch.as_tensor(pose_list[obj_pose_idx], device=env.device, dtype=torch.float32).view(7)

# Sample object-frame candidate poses around base
poses_oe = cfg.sampling.sample(base_pose_oe)
k = int(poses_oe.shape[0])

# Read live object pose in world frame
obj_pose_w = env.scene[obj_name].data.root_pose_w[env_id]
obj_pos_w = obj_pose_w[:3].view(1, 3).repeat(k, 1)
obj_quat_w = obj_pose_w[3:].view(1, 4).repeat(k, 1)

# object frame -> world frame -> robot root frame
target_pos_w, target_quat_w = PoseUtils.combine_frame_transforms(
obj_pos_w, obj_quat_w, poses_oe[:, :3], poses_oe[:, 3:]
)
Expand All @@ -160,52 +168,16 @@ def _sweep(pipeline: AutoSimPipeline, cfg: ReachPlanSweepCfg, obj_name: str, obj
rr_quat_w = robot_root_pose_w[3:].view(1, 4).repeat(k, 1)
target_pos_r, target_quat_r = PoseUtils.subtract_frame_transforms(rr_pos_w, rr_quat_w, target_pos_w, target_quat_w)

# Build current joint state in planner joint order (name-based mapping)
state = pipeline._build_world_state()
full_sim_joint_names = state.sim_joint_names
full_q = state.robot_joint_pos
full_qd = state.robot_joint_vel
activate_q, activate_qd = [], []
for joint_name in planner.target_joint_names:
if joint_name not in full_sim_joint_names:
raise ValueError(f"Planner joint '{joint_name}' not found in simulation joint names.")
idx = full_sim_joint_names.index(joint_name)
activate_q.append(full_q[idx])
activate_qd.append(full_qd[idx])
activate_q = torch.stack(activate_q, dim=0)
activate_qd = torch.stack(activate_qd, dim=0)

# Build extra EE goals from object_extra_reach_target_poses (multi-arm only).
# OffsetSampler.sample() resets its RNG from self.seed on every call, so sample i of any
# extra EE receives the same dx/dy/dz/dyaw offset as sample i of the main EE — keeping all
# arms coherently displaced across the entire candidate batch.
extra_poses_oe_dict: dict[str, torch.Tensor] = {}
link_goals: dict[str, torch.Tensor] | None = None

extra_pose_map = env_extra_info.object_extra_reach_target_poses.get(obj_name, {})
if extra_pose_map:
extra_link_pos_r: dict[str, torch.Tensor] = {}
extra_link_quat_r: dict[str, torch.Tensor] = {}
for ee_name, ee_pose_list in extra_pose_map.items():
if not (0 <= obj_pose_idx < len(ee_pose_list)):
raise ValueError(
f"extra pose index {obj_pose_idx} out of range for '{obj_name}'/'{ee_name}'"
f" ({len(ee_pose_list)} poses)."
)
ee_base = torch.as_tensor(ee_pose_list[obj_pose_idx], device=env.device, dtype=torch.float32).view(7)
ee_poses_oe = cfg.sampling.sample(ee_base)
extra_poses_oe_dict[ee_name] = ee_poses_oe

ee_pos_w, ee_quat_w = PoseUtils.combine_frame_transforms(
obj_pos_w, obj_quat_w, ee_poses_oe[:, :3], ee_poses_oe[:, 3:]
)
ee_pos_r, ee_quat_r = PoseUtils.subtract_frame_transforms(rr_pos_w, rr_quat_w, ee_pos_w, ee_quat_w)
extra_link_pos_r[ee_name] = ee_pos_r
extra_link_quat_r[ee_name] = ee_quat_r
activate_q, activate_qd = reach_skill._build_activate_joint_state( # noqa: SLF001
state.sim_joint_names, state.robot_joint_pos, state.robot_joint_vel
)
if activate_qd is None:
raise ValueError("activate_qd should not be None when sweep planning reach trajectories.")

link_goals = {ee: torch.cat([extra_link_pos_r[ee], extra_link_quat_r[ee]], dim=-1) for ee in extra_link_pos_r}
target_poses_r = torch.cat((target_pos_r, target_quat_r), dim=-1)
link_goals = _build_extra_target_link_goals(reach_skill, activate_q, target_poses_r, env_extra_info)

# Batch plan
t0 = time.time()
if cfg.ik_only:
result = planner.solve_ik_batch(target_pos_r, target_quat_r, link_goals=link_goals)
Expand All @@ -225,7 +197,6 @@ def _sweep(pipeline: AutoSimPipeline, cfg: ReachPlanSweepCfg, obj_name: str, obj
for i in range(k):
rows.append({
"pose_oe": _tensor_to_list(poses_oe[i]),
**{f"extra_pose_oe/{ee}": _tensor_to_list(extra_poses_oe_dict[ee][i]) for ee in extra_poses_oe_dict},
"plan_success": bool(success[i].item()),
"traj_len": int(traj_last[i]) if traj_last is not None else None,
"position_error": float(pos_err[i].item()) if pos_err is not None else None,
Expand All @@ -250,10 +221,6 @@ def _sort_key(r):
mark = "✓" if r["plan_success"] else "✗"
metric = f"traj_len={r['traj_len']}" if r["traj_len"] is not None else f"pos_err={r['position_error']:.4f}"
print(f" [{rank}] {mark} {_fmt_pose(r['pose_oe'])} # {metric}")
for ee in extra_poses_oe_dict:
print(f" {ee}: {_fmt_pose(r[f'extra_pose_oe/{ee}'])}")
if extra_poses_oe_dict:
print()
print(_SEP)

return top_k
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ def plan_motion(
current_q = current_q[:dof_needed]
current_qd = current_qd[:dof_needed]

joint_limits = self.motion_gen.kinematics.get_joint_limits()
current_q = torch.clamp(
self._to_curobo_device(current_q), joint_limits.position[0], joint_limits.position[1]
).to(current_q.device)

# build the target pose
goal = Pose(
position=self._to_curobo_device(target_pos),
Expand Down Expand Up @@ -385,8 +390,26 @@ def reset(self):
def get_ee_pose(self, current_q: torch.Tensor) -> Pose:
"""Get the end-effector pose of the robot."""

return self.get_link_pose(current_q, self.motion_gen.kinematics.ee_link)

def get_link_pose(self, current_q: torch.Tensor, link_name: str) -> Pose:
"""Get the pose of a specific link in the robot root frame."""

return self.get_link_poses(current_q, [link_name])[link_name]

def get_link_poses(self, current_q: torch.Tensor, link_names: list[str]) -> dict[str, Pose]:
"""Get the poses of specific links in the robot root frame."""

current_joint_state = JointState(
position=self._to_curobo_device(current_q), joint_names=self.target_joint_names
)
kin_state = self.motion_gen.compute_kinematics(current_joint_state)
return kin_state.link_poses[self.motion_gen.kinematics.ee_link]

missing_link_names = [link_name for link_name in link_names if link_name not in kin_state.link_poses]
if missing_link_names:
raise ValueError(
f"Unknown cuRobo link name(s): {missing_link_names}. Available links:"
f" {list(kin_state.link_poses.keys())}"
)

return {link_name: kin_state.link_poses[link_name] for link_name in link_names}
Loading
Loading