Source code for soccer_strategy.robot_controlled_3d

import os
import time

import numpy as np
import rospy
import tf2_py
import tf.transformations
from geometry_msgs.msg import Pose, PoseArray, PoseStamped, PoseWithCovarianceStamped
from sensor_msgs.msg import Imu, Range
from std_msgs.msg import Bool, Empty

from soccer_common import Transformation
from soccer_msgs.msg import FixedTrajectoryCommand, RobotState
from soccer_strategy.ball import Ball
from soccer_strategy.obstacle import Obstacle
from soccer_strategy.robot import Robot
from soccer_strategy.robot_controlled import RobotControlled


[docs]class RobotControlled3D(RobotControlled):
[docs] def __init__(self, team, role, status): self.position_default = np.array([0, 0, 0]) super().__init__(team=team, role=role, status=status, position=self.position_default) # Subscribers self.amcl_pose_subscriber = rospy.Subscriber("amcl_pose", PoseWithCovarianceStamped, self.amcl_pose_callback) self.amcl_pose = None self.imu_subscriber = rospy.Subscriber("imu_filtered", Imu, self.imu_callback) self.action_completed_subscriber = rospy.Subscriber("action_complete", Empty, self.action_completed_callback, queue_size=1) self.head_centered_on_ball_subscriber = rospy.Subscriber("head_centered_on_ball", Empty, self.head_centered_on_ball_callback, queue_size=1) self.reset_robot_subscriber = rospy.Subscriber("reset_robot", PoseStamped, self.reset_robot_callback, queue_size=1) # Publishers self.robot_initial_pose_publisher = rospy.Publisher("initialpose", PoseWithCovarianceStamped, queue_size=1, latch=True) self.goal_publisher = rospy.Publisher("goal", PoseStamped, queue_size=1, latch=True) self.trajectory_publisher = rospy.Publisher("command", FixedTrajectoryCommand, queue_size=1, latch=True) self.kicking_range_publisher = rospy.Publisher("kicking_angle", Range, queue_size=1, latch=True) self.tf_listener = tf.TransformListener() self.robot_id = rospy.get_param("robot_id") self.robot_name = "robot " + str(self.robot_id) self.time_since_action_completed = rospy.Time(0) self.obstacles = PoseArray() self.update_robot_state_timer = rospy.Timer(rospy.Duration(0.2), self.update_robot_state, reset=True) self.robot_state_publisher = rospy.Publisher("state", RobotState, queue_size=1) self.active = True self.delocalized_threshold = rospy.get_param("delocalized_threshold", 0.15) self.relocalized_threshold = rospy.get_param("relocalized_threshold", 0.05) self.node_init_time = rospy.Time.now()
[docs] def set_navigation_position(self, goal_position): goal_position = self.shorten_navigation_position(goal_position) if not super().set_navigation_position(goal_position): return False p = PoseStamped() p.header.stamp = rospy.Time.now() p.header.frame_id = "world" p.pose.position.x = goal_position[0] p.pose.position.y = goal_position[1] p.pose.position.z = 0 angle_fixed = goal_position[2] q = tf.transformations.quaternion_about_axis(angle_fixed, (0, 0, 1)) p.pose.orientation.x = q[0] p.pose.orientation.y = q[1] p.pose.orientation.z = q[2] p.pose.orientation.w = q[3] rospy.loginfo("Sending New Goal: " + str(goal_position)) self.goal_position = goal_position self.goal_publisher.publish(p) return True
def reset_robot_callback(self, pose: PoseStamped): q = tf.transformations.euler_from_quaternion( [pose.pose.orientation.w, pose.pose.orientation.x, pose.pose.orientation.y, pose.pose.orientation.z] ) self.position = np.array([pose.pose.position.x, pose.pose.position.y, q[2]]) rospy.loginfo(f"Robot Reset Called to {pose.pose.position.x} {pose.pose.position.y} {q[2]} (self.position = {self.position}") if self.role == Robot.Role.UNASSIGNED: self.role = Robot.Role.STRIKER self.localized = True self.status = Robot.Status.READY self.reset_initial_position() def update_robot_state(self, _): # Get Ball Position from TF ground_truth = not bool(os.getenv("COMPETITION", True)) if ground_truth: rospy.loginfo_once("Using Ground Truth") else: rospy.loginfo_once("Using Actual Measurements") try: self.observed_ball = Ball() if ground_truth: self.observed_ball.last_observed_time_stamp = self.tf_listener.getLatestCommonTime("world", "robot" + str(self.robot_id) + "/ball_gt") ball_pose = self.tf_listener.lookupTransform( "world", "robot" + str(self.robot_id) + "/ball_gt", self.observed_ball.last_observed_time_stamp ) else: self.observed_ball.last_observed_time_stamp = self.tf_listener.getLatestCommonTime("world", "robot" + str(self.robot_id) + "/ball") ball_pose = self.tf_listener.lookupTransform( "world", "robot" + str(self.robot_id) + "/ball", self.observed_ball.last_observed_time_stamp ) self.observed_ball.position = np.array([ball_pose[0][0], ball_pose[0][1]]) except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException, tf2_py.TransformException): rospy.loginfo_throttle(30, "Still looking for ball in TF Tree") self.observed_ball = None # Get Obstacles from TF self.observed_obstacles.clear() more_obstacles = True obstacle_num = 0 while more_obstacles: try: obstacle_pose = self.tf_listener.lookupTransform( "world", "robot" + str(self.robot_id) + "/obstacle_" + str(obstacle_num), rospy.Time(0) ) o = Obstacle() o.position = np.array([obstacle_pose[0][0], obstacle_pose[0][1]]) self.observed_obstacles.append(o) except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException, tf2_py.TransformException): more_obstacles = False obstacle_num += 1 # Get Robot Position from TF trans = [self.position[0], self.position[1], 0] rot = tf.transformations.quaternion_from_euler(0, 0, self.position[2]) if self.status not in [Robot.Status.FALLEN_BACK, Robot.Status.FALLEN_SIDE, Robot.Status.FALLEN_FRONT, Robot.Status.GETTING_BACK_UP]: try: if ground_truth: (trans, rot) = self.tf_listener.lookupTransform("world", "robot" + str(self.robot_id) + "/base_footprint_gt", rospy.Time(0)) else: (trans, rot) = self.tf_listener.lookupTransform("world", "robot" + str(self.robot_id) + "/base_footprint", rospy.Time(0)) eul = tf.transformations.euler_from_quaternion(rot) self.position = np.array([trans[0], trans[1], eul[2]]) except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException): if rospy.Time.now() - self.node_init_time > rospy.Duration(5): rospy.logwarn_throttle(5, "Unable to locate robot in TF tree") # Publish Robot state info r = RobotState() r.header.stamp = rospy.Time.now() r.player_id = self.robot_id r.status = self.status r.role = self.role r.localized = self.localized r.pose.position.x = trans[0] r.pose.position.y = trans[1] r.pose.position.z = trans[2] r.pose.orientation.x = rot[0] r.pose.orientation.y = rot[1] r.pose.orientation.z = rot[2] r.pose.orientation.w = rot[3] if self.observed_ball is not None: r.ball_detected = True r.ball_pose.x = self.observed_ball.position[0] r.ball_pose.y = self.observed_ball.position[1] r.ball_pose.theta = 0 else: r.ball_detected = False r.ball_pose.x = 0 r.ball_pose.y = 0 r.ball_pose.theta = 0 self.robot_state_publisher.publish(r) pass def amcl_pose_callback(self, amcl_pose: PoseWithCovarianceStamped): self.amcl_pose = amcl_pose if self.status == Robot.Status.LOCALIZING: covariance_trace = np.sqrt(amcl_pose.pose.covariance[0] ** 2 + amcl_pose.pose.covariance[7] ** 2) rospy.logwarn_throttle(1, "Relocalizing, current cov trace: " + str(covariance_trace)) if covariance_trace < self.relocalized_threshold: rospy.loginfo("Relocalized") if self.role != Robot.Role.UNASSIGNED and self.localized: self.status = Robot.Status.READY else: self.status = Robot.Status.DETERMINING_SIDE def action_completed_callback(self, data): if self.status == Robot.Status.GETTING_BACK_UP: self.reset_initial_position(variance=0.3) self.status = Robot.Status.LOCALIZING elif self.status in [ Robot.Status.WALKING, Robot.Status.TERMINATING_WALK, Robot.Status.KICKING, ]: self.goal_position = None if self.amcl_pose is not None: covariance_trace = np.sqrt(self.amcl_pose.pose.covariance[0] ** 2 + self.amcl_pose.pose.covariance[7] ** 2) else: covariance_trace = 0 if covariance_trace >= self.delocalized_threshold: rospy.logwarn("Robot Delocalized, Sending Robot back to localizing, current cov trace: " + str(covariance_trace)) self.status = Robot.Status.LOCALIZING else: if self.role == Robot.Role.UNASSIGNED or not self.localized: self.status = Robot.Status.DETERMINING_SIDE else: self.status = Robot.Status.READY self.time_since_action_completed = rospy.Time.now() elif self.status == Robot.Status.PENALIZED: self.goal_position = None self.time_since_action_completed = rospy.Time.now() else: rospy.logerr("Invalid Action Completed " + str(self.status)) def head_centered_on_ball_callback(self, data): self.robot_focused_on_ball_time = rospy.Time.now() def imu_callback(self, msg): angle_threshold = np.pi / 4 # in radian t = Transformation([0, 0, 0], [msg.orientation.x, msg.orientation.y, msg.orientation.z, msg.orientation.w]) yaw, pitch, roll = t.orientation_euler if self.status in [ Robot.Status.DETERMINING_SIDE, Robot.Status.READY, Robot.Status.WALKING, Robot.Status.TERMINATING_WALK, Robot.Status.KICKING, Robot.Status.LOCALIZING, ]: if pitch < -angle_threshold: rospy.logwarn_throttle(1, f"Fallen Back: (R: {roll}, P: {pitch}, Y: {yaw}), {t.quaternion}") self.status = Robot.Status.FALLEN_BACK elif pitch > angle_threshold: rospy.logwarn_throttle(1, f"Fallen Front: (R: {roll}, P: {pitch}, Y: {yaw}), {t.quaternion}") self.status = Robot.Status.FALLEN_FRONT elif roll < -angle_threshold or roll > angle_threshold: rospy.logwarn_throttle(1, f"Fallen Side: (R: {roll}, P: {pitch}, Y: {yaw}), {t.quaternion}") self.status = Robot.Status.FALLEN_SIDE
[docs] def reset_initial_position(self, variance=0.02): position = self.position p = PoseWithCovarianceStamped() p.header.frame_id = "world" p.header.stamp = rospy.get_rostime() p.pose.pose.position.x = position[0] p.pose.pose.position.y = position[1] p.pose.pose.position.z = 0 angle_fixed = position[2] q = tf.transformations.quaternion_about_axis(angle_fixed, (0, 0, 1)) p.pose.pose.orientation.x = q[0] p.pose.pose.orientation.y = q[1] p.pose.pose.orientation.z = q[2] p.pose.pose.orientation.w = q[3] rospy.loginfo_throttle_identical(10, "Setting " + self.robot_name + " localization position " + str(position) + " orientation " + str(q)) # fmt: off p.pose.covariance = [variance ** 2, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, variance ** 2, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, (variance * 4) ** 2] # fmt: on self.robot_initial_pose_publisher.publish(p) rospy.sleep(1)
[docs] def kick(self, kick_velocity): return self.run_fixed_trajectory("rightkick")
def run_fixed_trajectory(self, trajectory_name="rightkick"): f = FixedTrajectoryCommand() f.trajectory_name = trajectory_name if not self.kick_with_right_foot: f.mirror = True self.trajectory_publisher.publish(f) if "kick" in trajectory_name: self.status = Robot.Status.KICKING else: self.status = Robot.Status.GETTING_BACK_UP rospy.loginfo(self.robot_name + " " + f.trajectory_name) def get_detected_obstacles(self): # TODO know if they are friendly or enemy robot obstacles = [] for i in range(1, 10): try: (trans, rot) = self.tf_listener.lookupTransform("world", self.robot_name + "/detected_robot_" + str(i), rospy.Time(0)) header = self.tf_listener.getLatestCommonTime("world", self.robot_name + "/detected_robot_" + str(i)) except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException) as ex: continue if abs(rospy.Time.now() - header) > rospy.Duration(0.5): continue obstacle_pose = Pose() obstacle_pose.position = trans obstacle_pose.orientation = rot self.obstacles.poses.append(obstacle_pose) eul = tf.transformations.euler_from_quaternion(rot) obstacle_position = [trans[0], trans[1], eul[2]] obstacles.append(obstacle_position) pass return obstacles
[docs] def can_kick(self, *args, **kwargs): # Initialize and create a Range visualizer for kicking angle r = Range() r.header.stamp = rospy.Time.now() r.header.frame_id = f"robot{self.robot_id}/base_footprint" r.field_of_view = self.min_kick_angle * 2 r.min_range = 0 r.max_range = self.min_kick_angle r.range = self.min_kick_distance r.radiation_type = 1 self.kicking_range_publisher.publish(r) return super().can_kick(*args, **kwargs)