-
Notifications
You must be signed in to change notification settings - Fork 119
add remote teleoperation support for SO101 leader arm. #146
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,169 @@ | ||
| # /// script | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this script include automatic detection and calibration logic, similar to the teleoperation in leisaac?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this might be a better UX instead of relying on another lerobot package. |
||
| # requires-python = ">=3.10" | ||
| # dependencies = ["leisaac[remote]"] | ||
| # /// | ||
| """Publish normalized SO101 leader arm joint states over ZMQ PUB. | ||
|
|
||
| Run this on the machine where the leader arm is physically connected. | ||
| A remote LeIsaac instance subscribes to receive joint states for teleoperation. | ||
|
|
||
| Usage: | ||
| python so101_joint_state_server.py --port /dev/ttyACM0 --id leader_arm --rate 50 | ||
| python so101_joint_state_server.py --port /dev/ttyACM0 --id leader_arm --recalibrate | ||
| """ | ||
|
|
||
| import argparse | ||
| import json | ||
| import os | ||
| import struct | ||
| import time | ||
|
|
||
| import zmq | ||
|
|
||
| try: | ||
| from leisaac.devices.lerobot.common.motors import ( | ||
| FeetechMotorsBus, | ||
| Motor, | ||
| MotorCalibration, | ||
| MotorNormMode, | ||
| ) | ||
| except ImportError as e: | ||
| raise ImportError("leisaac is required. Install with: pip install leisaac[remote]") from e | ||
|
|
||
| MOTORS = { | ||
| "shoulder_pan": Motor(1, "sts3215", MotorNormMode.RANGE_M100_100), | ||
| "shoulder_lift": Motor(2, "sts3215", MotorNormMode.RANGE_M100_100), | ||
| "elbow_flex": Motor(3, "sts3215", MotorNormMode.RANGE_M100_100), | ||
| "wrist_flex": Motor(4, "sts3215", MotorNormMode.RANGE_M100_100), | ||
| "wrist_roll": Motor(5, "sts3215", MotorNormMode.RANGE_M100_100), | ||
| "gripper": Motor(6, "sts3215", MotorNormMode.RANGE_0_100), | ||
| } | ||
| MOTOR_NAMES = list(MOTORS.keys()) | ||
| CACHE_DIR = os.path.join(os.path.dirname(__file__), ".cache") | ||
|
|
||
|
|
||
| def get_calibration_path(arm_id: str) -> str: | ||
| return os.path.join(CACHE_DIR, f"{arm_id}.json") | ||
|
|
||
|
|
||
| def load_calibration(path: str) -> dict[str, MotorCalibration]: | ||
| with open(path) as f: | ||
| data = json.load(f) | ||
| return { | ||
| name: MotorCalibration( | ||
| id=int(d["id"]), | ||
| drive_mode=int(d["drive_mode"]), | ||
| homing_offset=int(d["homing_offset"]), | ||
| range_min=int(d["range_min"]), | ||
| range_max=int(d["range_max"]), | ||
| ) | ||
| for name, d in data.items() | ||
| } | ||
|
|
||
|
|
||
| def save_calibration(path: str, calibration: dict[str, MotorCalibration]): | ||
| os.makedirs(os.path.dirname(path), exist_ok=True) | ||
| data = { | ||
| k: { | ||
| "id": v.id, | ||
| "drive_mode": v.drive_mode, | ||
| "homing_offset": v.homing_offset, | ||
| "range_min": v.range_min, | ||
| "range_max": v.range_max, | ||
| } | ||
| for k, v in calibration.items() | ||
| } | ||
| with open(path, "w") as f: | ||
| json.dump(data, f, indent=4) | ||
|
|
||
|
|
||
| def calibrate(port: str, arm_id: str) -> dict[str, MotorCalibration]: | ||
| """Run interactive calibration using leisaac's calibration logic.""" | ||
| print(f"\nRunning calibration for {arm_id} on {port}") | ||
|
|
||
| bus = FeetechMotorsBus(port=port, motors=MOTORS) | ||
| bus.connect() | ||
| bus.disable_torque() | ||
|
|
||
| input("Move the leader arm to the MIDDLE of its range of motion and press ENTER...") | ||
| homing_offsets = bus.set_half_turn_homings() | ||
|
|
||
| print("Move all joints through their FULL range of motion.") | ||
| print("Press ENTER when done...") | ||
| range_mins, range_maxes = bus.record_ranges_of_motion() | ||
|
|
||
| calibration = { | ||
| name: MotorCalibration( | ||
| id=MOTORS[name].id, | ||
| drive_mode=0, | ||
| homing_offset=homing_offsets[name], | ||
| range_min=range_mins[name], | ||
| range_max=range_maxes[name], | ||
| ) | ||
| for name in MOTOR_NAMES | ||
| } | ||
|
|
||
| path = get_calibration_path(arm_id) | ||
| save_calibration(path, calibration) | ||
| print(f"Calibration saved to {path}") | ||
|
|
||
| bus.disconnect() | ||
| return calibration | ||
|
|
||
|
|
||
| def main(): | ||
| parser = argparse.ArgumentParser(description="SO101 leader arm joint state publisher") | ||
| parser.add_argument("--port", default="/dev/ttyACM0") | ||
| parser.add_argument("--id", default="leader_arm", help="Calibration ID") | ||
| parser.add_argument("--bind", default="tcp://0.0.0.0:5556") | ||
| parser.add_argument("--rate", type=int, default=50, help="Publish rate in Hz") | ||
| parser.add_argument("--recalibrate", action="store_true", help="Force recalibration") | ||
| args = parser.parse_args() | ||
|
|
||
| calib_path = get_calibration_path(args.id) | ||
|
|
||
| if args.recalibrate or not os.path.exists(calib_path): | ||
| calibration = calibrate(args.port, args.id) | ||
| else: | ||
| calibration = load_calibration(calib_path) | ||
| print(f"Loaded calibration from {calib_path}") | ||
|
|
||
| bus = FeetechMotorsBus(port=args.port, motors=MOTORS, calibration=calibration) | ||
| bus.connect() | ||
| bus.disable_torque() | ||
|
|
||
| ctx = zmq.Context() | ||
| pub = ctx.socket(zmq.PUB) | ||
| pub.setsockopt(zmq.CONFLATE, 1) | ||
| pub.bind(args.bind) | ||
| print(f"Publishing on {args.bind} at {args.rate} Hz") | ||
| time.sleep(0.5) | ||
|
|
||
| interval = 1.0 / args.rate | ||
| count = 0 | ||
| next_t = time.monotonic() | ||
|
|
||
| try: | ||
| while True: | ||
| positions = bus.sync_read("Present_Position") | ||
| values = [positions[name] for name in MOTOR_NAMES] | ||
| pub.send(struct.pack("<6f", *values), zmq.NOBLOCK) | ||
|
|
||
| count += 1 | ||
| if count % (args.rate * 10) == 0: | ||
| print(f"{count} msgs sent") | ||
|
|
||
| next_t += interval | ||
| sleep_t = next_t - time.monotonic() | ||
| if sleep_t > 0: | ||
| time.sleep(sleep_t) | ||
| except KeyboardInterrupt: | ||
| print(f"\nDone: {count} msgs") | ||
| finally: | ||
| bus.disconnect() | ||
| pub.close() | ||
| ctx.term() | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| main() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,44 @@ | ||
| from .device_base import DeviceBase | ||
| from .gamepad import SO101Gamepad | ||
| from .keyboard import SO101Keyboard | ||
| from .lekiwi import LeKiwiGamepad, LeKiwiKeyboard, LeKiwiLeader | ||
| from .lerobot import BiSO101Leader, SO101Leader | ||
| from importlib import import_module | ||
| from typing import TYPE_CHECKING | ||
|
|
||
| if TYPE_CHECKING: | ||
| from .device_base import DeviceBase | ||
| from .gamepad import SO101Gamepad | ||
| from .keyboard import SO101Keyboard | ||
| from .lekiwi import LeKiwiGamepad, LeKiwiKeyboard, LeKiwiLeader | ||
| from .lerobot import BiSO101Leader, SO101Leader, SO101LeaderRemote | ||
|
|
||
| __all__ = [ | ||
| "DeviceBase", | ||
| "SO101Gamepad", | ||
| "SO101Keyboard", | ||
| "LeKiwiGamepad", | ||
| "LeKiwiKeyboard", | ||
| "LeKiwiLeader", | ||
| "BiSO101Leader", | ||
| "SO101Leader", | ||
| "SO101LeaderRemote", | ||
| ] | ||
|
|
||
| _LAZY_IMPORTS = { | ||
| "DeviceBase": (".device_base", "DeviceBase"), | ||
| "SO101Gamepad": (".gamepad", "SO101Gamepad"), | ||
| "SO101Keyboard": (".keyboard", "SO101Keyboard"), | ||
| "LeKiwiGamepad": (".lekiwi", "LeKiwiGamepad"), | ||
| "LeKiwiKeyboard": (".lekiwi", "LeKiwiKeyboard"), | ||
| "LeKiwiLeader": (".lekiwi", "LeKiwiLeader"), | ||
| "BiSO101Leader": (".lerobot", "BiSO101Leader"), | ||
| "SO101Leader": (".lerobot", "SO101Leader"), | ||
| "SO101LeaderRemote": (".lerobot", "SO101LeaderRemote"), | ||
| } | ||
|
|
||
|
|
||
| def __getattr__(name): | ||
| if name not in _LAZY_IMPORTS: | ||
| raise AttributeError(f"module {__name__!r} has no attribute {name!r}") | ||
|
|
||
| module_name, attr_name = _LAZY_IMPORTS[name] | ||
| module = import_module(module_name, __name__) | ||
| value = getattr(module, attr_name) | ||
| globals()[name] = value | ||
| return value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
uvfor now and keep it consistent with the project’s current environment management approach.pip install -e source/leisaac[remote]on the local machine?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These suggestions and questions are related. Let me explain why I used uv in the first place first. I used
uvbecause it supports PEP-508 which let me embed dependencies of the script. And you could run it withuv run xxx.pyand all dependencies will be installed in uv's venv. I do this because leisaac has a heavy set of dependencies. And since the local machine does not need any other features of leisaac except forwarding the motor states to the remote. It would be convenient for user to run the script withuv runwithout pulling a lot of dependencies.So back to your comments:
The dependency of leisaac is avoided on purpose. But I could change it to depend on leisaac if you thought it would better fit the project.
uvis only introduced for users to run the script easily. You could use any other tool that supports PEP-508. Or if you already have all dependencies installed, you could just run it directly withoutuv. Souvis not required, it's just my suggested way to run the script.No, you don't need to do any
pip installin the local machine.uv runtakes care of all dependencies embedded in the script. But I could improve the doc to make it more clear.Let me know what you think for 1., I will take your decision.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that installing a clean environment on the local machine (without Isaac Sim + Lab + torch) is better for joint_state_server.
The
pyproject.tomlin leisaac does not enforce the installation of Isaac Lab/Sim/torch. I think we can depend on leisaac (to reuse the calibration logic), and at the same time use try(ImportError)-except when importing related packages to skip them and print corresponding warning messages, or perform the imports within the functions where these dependencies are actually used.Ideally, users should only need to run
pip install leisaac[remote]without installing other complex dependencies to runso101_joint_state_server.py.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I will try this path. I was assuming Isaac Sim/torch is a dependency of leisaac without confirmation. Thanks for the correction.