Source code for soccer_strategy.game_engine_2d_with_referee

import copy
import time

import rospy

from soccer_msgs.msg import GameState
from soccer_strategy.game_engine_2d import GameEngine2D
from soccer_strategy.game_engine_3d import decide_next_strategy
from soccer_strategy.robot import Robot
from soccer_strategy.robot_controlled_2d import RobotControlled2D


[docs]class GameEngine2DWithReferee(GameEngine2D): """ 2D simualtor for the game engine, used for testing strategy quickly without interfacing with webots """ PHYSICS_UPDATE_INTERVAL = 0.1 DISPLAY_UPDATE_INTERVAL = 0.01 GAMESTATE_LOOKUP = {getattr(GameState, key): key for key in dir(GameState) if key.startswith("GAMESTATE")} SECONDARY_STATE_LOOKUP = {getattr(GameState, key): key for key in dir(GameState) if key.startswith("STATE")} SECONDARY_STATE_MODE_LOOKUP = {getattr(GameState, key): key for key in dir(GameState) if key.startswith("MODE")}
[docs] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.game_state_subscribers = {} self.individual_robot_gamestates = {} self.individual_robot_last_strategy_update_time = {} self.last_display_update_time = rospy.Time.now() for team in [self.team1, self.team2]: for robot in team.robots: if team == self.team1: team_id = 16 else: team_id = 5 robot_id = robot.robot_id self.game_state_subscribers[(team, robot)] = rospy.Subscriber( f"/team_{team_id}/robot{robot_id}/gamestate", GameState, self.gamestate_callback_robot, robot ) self.individual_robot_gamestates[robot] = copy.deepcopy(self.gameState) self.individual_robot_last_strategy_update_time[robot] = rospy.Time.now()
[docs] def run(self): if self.display: for i in range(10): self.scene.update(self.team1.robots + self.team2.robots, self.ball) while not rospy.is_shutdown(): t_start = time.time() self.update_estimated_physics(self.team1.robots + self.team2.robots, self.ball) t1 = time.time() for teama, teamb in zip([self.team1, self.team2], [self.team2, self.team1]): for robot in teama.robots: strategy = self.robot_strategies[(robot.team, robot.robot_id)] gameState = self.individual_robot_gamestates[robot] new_strategy = decide_next_strategy(strategy=strategy, gameState=gameState, this_robot=robot) if type(strategy) != new_strategy: self.robot_strategies[(robot.team, robot.robot_id)] = new_strategy() rospy.loginfo(f"Team {robot.team} Robot {robot.robot_id} transitioned to strategy {str(new_strategy.__name__)}") if rospy.Time.now() - self.individual_robot_last_strategy_update_time[robot] > rospy.Duration(strategy.update_frequency): robot.active = True strategy.step_strategy(teama, teamb, gameState) robot.active = False self.individual_robot_last_strategy_update_time[robot] = rospy.Time.now() t2 = time.time() self.scene.update(self.team1.robots + self.team2.robots, self.ball) self.last_display_update_time = rospy.Time.now() rospy.loginfo_throttle(5, f"Physics Update Time {t1 - t_start} Strategy Update Time {t2 - t1}") return
[docs] def gamestate_callback_robot(self, gameState: GameState, robot: RobotControlled2D): """ Callback function for gamestate information from the webot's game controller :param gameState: Contains information about the game, whether it is playing or paused etc """ if robot.robot_id == 1 and robot.team == Robot.Team.FRIENDLY: if ( self.gameState.gameState != gameState.gameState or self.gameState.secondaryState != gameState.secondaryState or self.gameState.secondaryStateMode != gameState.secondaryStateMode ): self.gameState = gameState print( f"\033[92mGame State Changed: {GameEngine2DWithReferee.GAMESTATE_LOOKUP[gameState.gameState]}, Secondary State: {GameEngine2DWithReferee.SECONDARY_STATE_LOOKUP[gameState.secondaryState]}, Secondary State Mode: {GameEngine2DWithReferee.SECONDARY_STATE_MODE_LOOKUP[gameState.secondaryStateMode]}\033[0m" ) if gameState.penalty != GameState.PENALTY_NONE: robot.status = Robot.Status.PENALIZED elif robot.status in [Robot.Status.DISCONNECTED, Robot.Status.PENALIZED]: robot.status = Robot.Status.DETERMINING_SIDE self.individual_robot_gamestates[robot] = gameState