Source code for soccer_trajectories.soccer_trajectories

#!/usr/bin/env python3
import copy
import csv
import os
from typing import Optional

if "ROS_NAMESPACE" not in os.environ:
    os.environ["ROS_NAMESPACE"] = "/robot1"

import rospy
from rospy import ROSException
from scipy.interpolate import interp1d
from sensor_msgs.msg import JointState
from std_msgs.msg import Empty

from soccer_msgs.msg import FixedTrajectoryCommand, RobotState


[docs]class Trajectory: """Interpolates a CSV trajectory for multiple joints.""" RATE = 100
[docs] def __init__(self, trajectory_path: str, mirror=False): """Initialize a Trajectory from a CSV file at trajectory_path. if it's getup trajectory append the desired final pose so the robot is ready for next action expects rectangular shape for csv table""" self.terminate = False self.mirror = mirror self.splines = {} self.step_map = {} self.time_to_last_pose = 2 # seconds self.trajectory_path = trajectory_path last_joint_state = JointState() try: last_joint_state = rospy.wait_for_message("joint_states", JointState, timeout=2) except (ROSException, AttributeError) as ex: rospy.logerr(ex) except ValueError as ex: print(ex) with open(trajectory_path) as f: csv_traj = csv.reader(f) for row in csv_traj: joint_name = row[0] if joint_name == "comment": continue if joint_name == "time": self.times = list(map(float, row[1:])) self.times = [0] + self.times # + [self.times[-1] + self.time_to_last_pose] self.max_time = self.times[-1] else: joint_values = list(map(float, row[1:])) last_pose_value = 0 if joint_name in last_joint_state.name: last_pose_value = last_joint_state.position[last_joint_state.name.index(joint_name)] joint_values = [last_pose_value] + joint_values # + [last_pose_value] self.splines[joint_name] = interp1d(self.times, joint_values)
[docs] def get_setpoint(self, timestamp): """Get the position of each joint at timestamp. If timestamp < 0 or timestamp > self.total_time this will throw a ValueError. """ return {joint: spline(timestamp) for joint, spline in self.splines.items()}
[docs] def joints(self): """Returns a list of joints in this trajectory.""" return self.splines.keys()
def run(self, real_time=True): pub_all_motor = rospy.Publisher("joint_command", JointState, queue_size=2) rate = rospy.Rate(Trajectory.RATE) t = 0 while not rospy.is_shutdown() and t <= self.max_time + 0.01 and not self.terminate: js = JointState() js.header.stamp = rospy.Time.now() for joint, setpoint in self.get_setpoint(round(t * 100) / 100).items(): if self.mirror: if "left" in joint: joint = joint.replace("left", "right") elif "right" in joint: joint = joint.replace("right", "left") js.name.append(joint) js.position.append(float(setpoint)) try: pub_all_motor.publish(js) except ROSException as ex: print(ex) exit(0) t += 0.01 if int(t + 0.01) != int(t): print(f"Trajectory at t={t}") if real_time: rate.sleep()
[docs]class TrajectoryManager:
[docs] def __init__(self): use_sim_time_prefix = "_sim" if rospy.get_param("use_sim_time", "false") == "true" else "" self.trajectory_path = ( rospy.get_param_cached( "~trajectory_path", os.path.join(os.path.dirname(__file__), "../../trajectories/") + rospy.get_param_cached("robot_model", "bez1") ) + use_sim_time_prefix ) self.trajectory: Optional[Trajectory] = None self.command_subscriber = rospy.Subscriber("command", FixedTrajectoryCommand, self.command_callback, queue_size=1) self.robot_state_subscriber = rospy.Subscriber("state", RobotState, self.robot_state_callback, queue_size=1) self.finish_trajectory_publisher = rospy.Publisher("action_complete", Empty, queue_size=1)
def robot_state_callback(self, state: RobotState): if state.status in [RobotState.STATUS_PENALIZED]: if self.trajectory is not None: self.trajectory.terminate = True def command_callback(self, command: FixedTrajectoryCommand): if self.trajectory is not None: return path = self.trajectory_path + "/" + command.trajectory_name + ".csv" if not os.path.exists(path): rospy.logerr(f"Trajectory doesn't exist in path {path}") return self.trajectory = Trajectory(path, command.mirror)
if __name__ == "__main__": rospy.init_node("soccer_trajectories") trajectory_class = TrajectoryManager() r = rospy.Rate(100) while not rospy.is_shutdown(): if trajectory_class.trajectory is not None: rospy.loginfo("Running Trajectory: " + trajectory_class.trajectory.trajectory_path + f" {trajectory_class.trajectory.mirror}") trajectory_class.trajectory.run() rospy.loginfo("Finished Trajectory: " + trajectory_class.trajectory.trajectory_path) trajectory_class.finish_trajectory_publisher.publish() trajectory_class.trajectory = None r.sleep()