Source code for soccer_object_detection.object_detect_node

#!/usr/bin/env python3

import enum
import os
from math import nan

import numpy as np
from matplotlib import pyplot as plt
from rospy.impl.tcpros_base import DEFAULT_BUFF_SIZE

if "ROS_NAMESPACE" not in os.environ:
    os.environ["ROS_NAMESPACE"] = "/robot1"

from argparse import ArgumentParser

import cv2
import rospy
import torch
from cv_bridge import CvBridge
from rospy import ROSException
from sensor_msgs.msg import Image

from soccer_common.camera import Camera
from soccer_msgs.msg import BoundingBox, BoundingBoxes, GameState, RobotState


[docs]class Label(enum.IntEnum): # Defines output channels of model # Refer to class name enumeration in soccer_object_detection/config/Torso21.yaml BALL = 0 GOALPOST = 1 ROBOT = 2 L_INTERSECTION = 3 T_INTERSECTION = 4 X_INTERSECTION = 5 TOPBAR = 6
[docs]class bcolors: HEADER = "\033[95m" OKBLUE = "\033[94m" OKCYAN = "\033[96m" OKGREEN = "\033[92m" MAGENTA = "\u001b[35m" WARNING = "\033[93m" FAIL = "\033[91m" ENDC = "\033[0m" BOLD = "\033[1m" UNDERLINE = "\033[4m"
[docs]class ObjectDetectionNode(object): """ Detect ball, robot publish bounding boxes input: 480x640x4 bgra8 -> output: 3x200x150 """
[docs] def __init__(self, model_path): self.SOCCER_BALL = 0 self.CONFIDENCE_THRESHOLD = rospy.get_param("~ball_confidence_threshold", 0.75) torch.hub._validate_not_a_forked_repo = ( lambda a, b, c: True ) # https://discuss.pytorch.org/t/help-for-http-error-403-rate-limit-exceeded/125907 self.model = torch.hub.load("ultralytics/yolov5", "custom", path=model_path) if torch.cuda.is_available(): rospy.loginfo(f"{bcolors.OKGREEN}Using CUDA for object detection{bcolors.ENDC}") self.model.cuda() else: rospy.logwarn("Not using CUDA") self.robot_name = rospy.get_namespace()[1:-1] # remove '/' self.camera = Camera(self.robot_name) self.camera.reset_position() # Params self.br = CvBridge() self.pub_detection = rospy.Publisher("detection_image", Image, queue_size=1, latch=True) self.pub_boundingbox = rospy.Publisher("object_bounding_boxes", BoundingBoxes, queue_size=1, latch=True) self.image_subscriber = rospy.Subscriber( "camera/image_raw", Image, self.callback, queue_size=1, buff_size=DEFAULT_BUFF_SIZE * 64 ) # Large buff size (https://answers.ros.org/question/220502/image-subscriber-lag-despite-queue-1/) self.robot_state_subscriber = rospy.Subscriber("state", RobotState, self.robot_state_callback) self.robot_state = RobotState() self.game_state_subscriber = rospy.Subscriber("gamestate", GameState, self.game_state_callback) self.game_state = GameState()
def game_state_callback(self, game_state: GameState): self.game_state = game_state def robot_state_callback(self, robot_state: RobotState): self.robot_state = robot_state def callback(self, msg: Image): # webots: 480x640x4pixels if self.robot_state.status not in [ RobotState.STATUS_LOCALIZING, RobotState.STATUS_READY, RobotState.ROLE_UNASSIGNED, ]: return if self.game_state.gameState != GameState.GAMESTATE_PLAYING: return rospy.loginfo_once("Object Detection Receiving image") # width x height x channels (bgra8) image = self.br.imgmsg_to_cv2(msg) self.camera.reset_position(timestamp=msg.header.stamp) # cover horizon to help robot ignore things outside field cover_horizon_up_threshold = rospy.get_param("cover_horizon_up_threshold", 30) h = max(self.camera.calculateHorizonCoverArea() - cover_horizon_up_threshold, 0) if image is not None: # 1. preprocess image img = image[:, :, :3] # get rid of alpha channel img = img[..., ::-1] # convert bgr to rgb img = img[max(0, h + 1) :, :] # 2. inference results = self.model(img) bbs_msg = BoundingBoxes() id = 0 for prediction in results.xyxy[0]: x1, y1, x2, y2, confidence, img_class = prediction.cpu().numpy() y1 += h + 1 y2 += h + 1 if img_class in [label.value for label in Label] and confidence > self.CONFIDENCE_THRESHOLD: bb_msg = BoundingBox() bb_msg.xmin = round(x1) # top left of bounding box bb_msg.ymin = round(y1) bb_msg.xmax = round(x2) # bottom right of bounding box bb_msg.ymax = round(y2) bb_msg.probability = confidence bb_msg.id = id bb_msg.Class = str(int(img_class)) # TODO Joanne look the pixels of the image in addition to the bounding box, # calculate likely foot coordinate xy if bb_msg.Class == "2": # --- simple version just draw the box in bottom ratio of box to detect feet position--- # only look at bottom 1/3 of bounding box (assumption: bounding box is of a standing robot) # Calculate ymin value to start checking for black pixels if bb_msg.ymax < self.camera.resolution_y - 5: temp_ymin = round(bb_msg.ymax * 0.85 + bb_msg.ymin * 0.15) midpoint = [(bb_msg.xmax + bb_msg.xmin) / 2, (bb_msg.ymax + temp_ymin) / 2] bb_msg.ybase = round(midpoint[1]) bb_msg.xbase = round(midpoint[0]) bb_msg.obstacle_detected = True bbs_msg.bounding_boxes.append(bb_msg) id += 1 bbs_msg.header = msg.header try: if self.pub_detection.get_num_connections() > 0: detection_image = np.squeeze(results.render()) detection_image = np.concatenate((np.zeros((h + 1, msg.width, 3), detection_image.dtype), detection_image)) detection_image = detection_image[..., ::-1] # convert rgb to bgr self.pub_detection.publish(self.br.cv2_to_imgmsg(detection_image, encoding="bgr8")) if self.pub_boundingbox.get_num_connections() > 0 and len(bbs_msg.bounding_boxes) > 0: self.pub_boundingbox.publish(bbs_msg) except ROSException as re: print(re) exit(0)
if __name__ == "__main__": parser = ArgumentParser() parser.add_argument("--model", dest="model_path", default="../../models/July14.pt", help="pytorch model") parser.add_argument("--num-feat", dest="num_feat", default=10, help="specify model size of the neural network") args, unknown = parser.parse_known_args() rospy.init_node("object_detector") my_node = ObjectDetectionNode(args.model_path) try: rospy.spin() except ROSException as rx: exit(0)