Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ class MidLevelPlannerOutput:
target_point_metric: np.ndarray
path_shapely: shapely.LineString
path_waypoints_metric: list
global_path_target_point_metric: np.ndarray


class MidLevelPlanner:
Expand Down Expand Up @@ -304,6 +305,9 @@ def plan_path(self, high_level_path_metric):
),
path_shapely=shapely.LineString(high_level_path_metric[:, :2]),
path_waypoints_metric=[],
global_path_target_point_metric=self.grid_cell_to_global_pose(
target_point_grid
),
)

# plan using a_star
Expand All @@ -321,7 +325,10 @@ def plan_path(self, high_level_path_metric):
self.grid_cell_to_global_pose((pt[0], pt[1])) for pt in a_star_path_grid
]
a_star_path_metric = np.array(a_star_path_metric).reshape(-1, 4)
a_star_path_metric = a_star_path_metric[:, :2]
a_star_path_metric = a_star_path_metric[:, :2].reshape(-1, 2)
# TODO(multy) temp fix for visualization if a star only output one point
# in fact, this is probably reaching a local min we plan can't go farther.
a_star_path_metric = np.vstack((self.robot_pose[:, :2], a_star_path_metric))
a_star_path_execute = a_star_path_metric
output.path_shapely = shapely.LineString(a_star_path_execute)
output.path_waypoints_metric = a_star_path_metric
Expand Down Expand Up @@ -547,5 +554,6 @@ def plan_path(self, high_level_path_metric):
target_point_metric=None,
path_shapely=shapely.LineString(high_level_path_metric[:, :2]),
path_waypoints_metric=None,
global_path_target_point_metric=None,
)
return True, output
4 changes: 1 addition & 3 deletions spot_tools/src/spot_executor/fake_spot.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,7 @@ def set_pose(self, pose):
self.pose = pose

def get_state(self):
raise NotImplementedError(
"get_state not implemented for FakeSpot (what is it supposed to return?)"
)
return self.state_client.get_robot_state()

def get_image_RGB(self, view="hand_color_image", show=False):
return self.get_image(view=view, show=show)
Expand Down
10 changes: 9 additions & 1 deletion spot_tools/src/spot_executor/spot_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,10 +352,18 @@ def execute_follow(self, command, feedback):
)
path_wp = planning_output.path_waypoints_metric
target_point_metric = planning_output.target_point_metric
target_point_global_path_metric = (
planning_output.global_path_target_point_metric
)
if not mlp_success:
feedback.print("INFO", "Mid-level planner failed to find a path")
if target_point_metric is not None:
feedback.path_follow_MLP_feedback(path_wp, target_point_metric)
feedback.path_follow_MLP_feedback(
path_wp,
target_point_metric,
target_point_global_path_metric,
command_to_send[-1],
)
else:
ret = follow_trajectory_continuous(
self.spot_interface,
Expand Down
28 changes: 19 additions & 9 deletions spot_tools/src/spot_skills/navigation_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ def follow_trajectory_continuous(
rate = 10
# TODO: reactive loop, yeild out the loop to get info
while 1:
curr_time = time.time()
feedback.print("INFO", f"Timeout progress {curr_time - t0} / {timeout}")
if curr_time - t0 > timeout:
# TODO: I think we need to tell Spot to stop?
# TODO: Also, we should probably have a finer-grained
# check about making progress
feedback.print("INFO", "follow_trajectory_continuous timeout")
return False

# if mid_level_planner is not None:
# update path every (couple?) loop
mlp_success, planning_output = mid_level_planner.plan_path(
Expand All @@ -130,25 +139,25 @@ def follow_trajectory_continuous(
path = planning_output.path_shapely
path_wp = planning_output.path_waypoints_metric
target_point_metric = planning_output.target_point_metric
target_point_global_path_metric = (
planning_output.global_path_target_point_metric
)

if feedback is not None and target_point_metric is not None:
feedback.print("INFO", f"target_point_metric: {target_point_metric}")
feedback.path_follow_MLP_feedback(path_wp, target_point_metric)
feedback.path_follow_MLP_feedback(
path_wp,
target_point_metric,
target_point_global_path_metric,
waypoints_list[-1],
)

if not mlp_success:
feedback.print(
"INFO", "Mid-level planner failed, following high-level path directly"
)
if time.time() - t0 > timeout:
return False

path = shapely.LineString(waypoints_list[:, :2])

if time.time() - t0 > timeout:
# TODO: I think we need to tell Spot to stop?
# TODO: Also, we should probably have a finer-grained
# check about making progress
return False
tform_body_in_vision = spot.get_pose()
distance_from_end = np.linalg.norm(
end_pt - np.array([tform_body_in_vision[0], tform_body_in_vision[1]])
Expand All @@ -167,6 +176,7 @@ def follow_trajectory_continuous(
current_point = shapely.Point(tform_body_in_vision[0], tform_body_in_vision[1])
progress_distance = shapely.line_locate_point(path, current_point)
progress_point = shapely.line_interpolate_point(path, progress_distance)

# 2. get line point at lookahead
target_distance = progress_distance + lookahead_distance
target_point = shapely.line_interpolate_point(path, target_distance)
Expand Down
91 changes: 84 additions & 7 deletions spot_tools_ros/src/spot_tools_ros/spot_executor_ros.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import os
import threading
import time
Expand Down Expand Up @@ -78,7 +79,7 @@ def build_markers(pts, namespaces, frames, colors):


class RosFeedbackCollector:
def __init__(self, odom_frame: str, output_dir: str):
def __init__(self, odom_frame: str, output_dir: str, log_to_file_level):
self.pick_confirmation_event = threading.Event()
# self.pick_confirmation_response = False

Expand All @@ -91,6 +92,37 @@ def __init__(self, odom_frame: str, output_dir: str):
self.odom_frame = odom_frame

self.output_dir = output_dir
self.log_to_file_level = log_to_file_level

if log_to_file_level != "":
self.str_log_file = os.path.join(self.output_dir, "str_log.txt")

match log_to_file_level:
case "DEBUG":
logging_level = logging.DEBUG
case "INFO":
logging_level = logging.INFO
case "WARNING":
logging_level = logging.WARNING
case "ERROR":
logging_level = logging.ERROR
case _:
raise ValueError(f"Invalid log level {log_to_file_level}")

# Create a custom logger
self.file_logger = logging.getLogger(__name__)
self.file_logger.setLevel(logging_level)

# Create a file handler
file_handler = logging.FileHandler(self.str_log_file)
file_handler.setLevel(logging_level)

# Create a formatter and add it to the handler
formatter = logging.Formatter("[%(asctime)s] - %(levelname)s - %(message)s")
file_handler.setFormatter(formatter)

# Add the handler to the logger
self.file_logger.addHandler(file_handler)

def bounding_box_detection_feedback(
self,
Expand Down Expand Up @@ -158,14 +190,38 @@ def path_following_progress_feedback(self, progress_point, target_point):
frames = [self.odom_frame] * 2
self.progress_point_pub.publish(build_markers(pts, namespaces, frames, colors))

def path_follow_MLP_feedback(self, path, target_point_metric):
def path_follow_MLP_feedback(
self,
path,
target_point_metric,
target_point_global_traj_metric,
subgoal_target_point_metric,
):
self.mlp_path_publisher.publish(waypoints_to_path(self.odom_frame, path))
if (
target_point_metric is None
or target_point_global_traj_metric is None
or subgoal_target_point_metric is None
):
return
target_point_metric_flattened = Point([p[0] for p in target_point_metric[:3]])
target_point_global_traj_metric_flattened = Point(
[p[0] for p in target_point_global_traj_metric[:3]]
)
subgoal_target_point_metric_flattened = Point(subgoal_target_point_metric[:3])

pts = [target_point_metric_flattened]
namespaces = ["projected target point"]
colors = [[1, 0, 1]]
frames = [self.odom_frame]
pts = [
target_point_global_traj_metric_flattened,
target_point_metric_flattened,
subgoal_target_point_metric_flattened,
]
namespaces = [
"projected target point",
"actual target point",
"subgoal target point",
]
colors = [[1, 0, 1], [0, 1, 1], [0, 0, 1]]
frames = [self.odom_frame] * 3
self.mlp_target_publisher.publish(
build_markers(pts, namespaces, frames, colors)
)
Expand All @@ -187,6 +243,21 @@ def print(self, level, string):
raise ValueError(f"Invalid log level {level}")
log_fn(str(string))

# TODO(multy): quick logic to log everything we print in the executor
if self.log_to_file_level != "":
match level:
case "DEBUG":
file_logger_fn = self.file_logger.debug
case "INFO":
file_logger_fn = self.file_logger.info
case "WARNING":
file_logger_fn = self.file_logger.warning
case "ERROR":
file_logger_fn = self.file_logger.error
case _:
raise ValueError(f"Invalid log level {level}")
file_logger_fn(str(string))

def feedback_viz_2(self, y):
pass

Expand Down Expand Up @@ -368,7 +439,13 @@ def __init__(self):
output_dir = self.get_parameter("output_dir").value
assert output_dir != ""

self.feedback_collector = RosFeedbackCollector(self.odom_frame, output_dir)
# TODO(multy): quick way to log everything feedback print to log file
self.declare_parameter("log_to_file", "")
log_to_file = self.get_parameter("log_to_file").value

self.feedback_collector = RosFeedbackCollector(
self.odom_frame, output_dir, log_to_file
)
self.feedback_collector.register_publishers(self)

self.tf_buffer = tf2_ros.Buffer()
Expand Down
Loading