diff --git a/README.md b/README.md index 17af617..03c867d 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,7 @@ Running CAWSR CAWSR can both be ran `locally` or `distributed`. Due to the high-spec requirements, it is recommended to run distributed if you do not meet the following minimum specs: - At least **10GB** VRAM and a modern GPU (2080 ti or newer) - We currently supports 20 and 30 series GPUs. Compatibility with the 40 series is current WIP. + - At least **32GB** RAM - A modern Intel or AMD CPU with at least 8 cores. diff --git a/cawsr.py b/cawsr.py index eb952c1..a1489e2 100644 --- a/cawsr.py +++ b/cawsr.py @@ -37,6 +37,7 @@ from srunner.scenarioconfigs.route_scenario_configuration import ( RouteScenarioConfiguration, ) +from srunner.tools.route_visualisation import visualise_route from srunner.tools import route_manipulation from srunner.objects.ego_vehicle import EgoVehicle from srunner.tools.log import LogUtil @@ -161,11 +162,24 @@ def run_scenario( # update the world CarlaDataProvider.set_world(self.carla_world) - logger.info("Spawning ego...") - ego = EgoVehicle(env_config) - actor = ego.spawn() - self.ego_vehicles.append(actor) - logger.info(f"Spawned ego with id: {actor.id}") + # attempt to spawn the ego 3 times for redundancy + max_attempts = 3 + for attempt in range(max_attempts): + try: + logger.info("Spawning ego...") + ego = EgoVehicle(env_config) + actor = ego.spawn() + self.ego_vehicles.append(actor) + logger.info(f"Spawned ego with id: {actor.id}") + break # exit loop if successful + except Exception as e: + logger.error(f"Failed to spawn ego on attempt {attempt + 1}: {e}") + if attempt == max_attempts - 1: + logger.error( + "Failed to spawn ego: Terminating scenario. This is most likely an issue with CARLA" + ) + raise + time.sleep(2) # client must tick to spawn actors self._tick_carla() @@ -194,6 +208,9 @@ def run_scenario( ) route_config.agent.set_global_plan(gps_route, route) # set agent route + # visualise the route in CARLA + visualise_route([waypoint[0] for waypoint in route]) + ego.prepare_ego(route[0][0]) # set location to first waypoint # allow the agent X ticks to initialize sensors and set the route @@ -420,6 +437,7 @@ def _cawsr_process( else: seed = 0 + env_config.iteration = run scenario_process = multiprocessing.Process( target=self.run_scenario, # need to catch connection exception args=( @@ -446,11 +464,12 @@ def _cawsr_process( self.results_manager.cleanup_xml() # copy over the recording from CARLA container - time.sleep(1) # ensure file is written + time.sleep(1) # ensure file is finished writing CARLAManager.fetch_file( "/home/carla/recording.log", self.results_manager.last_scenario, ) + time.sleep(1) # allow some time for docker to copy it over # fetch execution status of the scenario (failure or success) status = result["status"] diff --git a/paper.md b/paper.md index ab46dfb..429a845 100644 --- a/paper.md +++ b/paper.md @@ -34,7 +34,7 @@ bibliography: paper.bib # Summary -`CAWSR` (**C**ARLA-**A**uto**W**are-**S**cenario **R**unner) facilitates the simulation-based testing of the open-source autonomous driving system, Autoware, within CARLA, the state-of-the-art open-source driving simulator. Building on existing tools, this project introduces a research-oriented testing framework for the execution of complex driving scenarios, as well as supporting implementation of a wide range of verification strategies. +`CAWSR` (**C**ARLA-**A**uto**W**are-**S**cenario **R**unner) facilitates the simulation-based testing of the open-source autonomous driving system, Autoware, within CARLA, the state-of-the-art open-source driving simulator. Building on existing tools, this project introduces a research-oriented testing framework for the execution of complex driving scenarios, as well as supporting the implementation of a wide range of verification strategies. # Statement of Need @@ -67,10 +67,23 @@ This facilitates a wide range of verification strategies based on common metrics Lastly, it is worth noting that simulators can often introduce unintended nondeterminism, which leads to inconsistent test results [@9793395; @osikowicz2025empirically]. Therefore, `CAWSR` is designed to minimise such nondeterminism throughout the evaluation pipeline. +# Research Impact -# Tool Overview +`CAWSR` provides a significant research impact by bridging the gap between the widely-used CARLA simulator and the industry-grade Autoware ADS. +It addresses a critical bottleneck in the ADS testing research community by offering a modern replacement for the outdated Apollo/LGSVL workflow, which has lacked official support since 2022. -`CAWSR` is a fully synchronous testing framework that directly integrates the CARLA simulator, Scenario Runner (as the scenario executor), and Autoware (as the System Under Test) to facilitate autonomous driving testing research. The tool is distributed as a containerized deployment using Docker and currently supports two modes of operation: +It enhances research reproducibility by implementing a fully synchronous evaluation pipeline that minimises unintended nondeterminism in simulation-based testing. +To ensure community readiness and ease of use, it is distributed as a Docker container. + +Furthermore, by adopting CARLA Leaderboard metrics, `CAWSR` enables researchers to directly compare Autoware with other state-of-the-art driving agents in the CARLA environment. +It provides an essential foundation for the systematic evaluation of various testing approaches for ADS. + + + +# Software Design + + +`CAWSR` is a fully synchronous testing framework that directly integrates the CARLA simulator, Scenario Runner (as the scenario executor), and Autoware (as the System Under Test) to facilitate autonomous driving testing research. The tool is distributed as a containerized deployment using Docker to manage complex dependencies and simplify the setup process. Currently, two modes of operation are supported: 1. *Scenario Generation Mode:* Enables the dynamic generation and execution of scenarios (e.g. iterative scenario generation) provided by a user-defined algorithm. This is particularly useful for assessing the performance of new simulation-based ADS testing techniques. 2. *Benchmark Mode:* Allows the execution of a predefined set of scenario definitions provided by the user. This is useful for standardised evaluations and comparisons between different driving agents. @@ -85,7 +98,8 @@ The evaluation pipeline is engineered to be fully synchronous, minimising uninte - JSON Parser: Translates the *scenario_definition* (see \autoref{fig:scenario_domain}) into a Behavior Tree (BT). It utilises Scenario Runner's *Atomic Behaviours* and *Atomic Conditions* as modular primitives to define discrete actions (e.g., spawning pedestrians) and logic triggers. -- ScenarioManager: Orchestrates the simulation loop by evaluating the BT to update actor states and triggering CARLA simulation ticks. Execution terminates based on CARLA Leaderboard criteria [@carla_leaderboard], as summarised in \autoref{tab:termination_criteria}. Post-execution, the module calculates the Driving Score (DS) according to the official leaderboard metrics. + +- ScenarioManager [@carla_scenario_runner_2025]: Orchestrates the simulation loop by evaluating the BT to update actor states and triggering CARLA simulation ticks. Execution terminates based on CARLA Leaderboard criteria [@carla_leaderboard], as summarised in \autoref{tab:termination_criteria}. Post-execution, the module calculates the Driving Score (DS) according to the official leaderboard metrics. - Agent and CarlaBridge: The Agent manages the ROS2 connection to Autoware. At each timestep, the CarlaBridge [@carlaautowarebridge] transforms CARLA snapshots and sensor data into the Autoware coordinate system. Autoware processes these inputs to issue control commands, which the Agent then applies to the ego vehicle. @@ -106,6 +120,10 @@ This model is based on the format introduced by Scenario Runner, facilitating su To summarise, `CAWSR` provides ADS testing research community an easy to use Autoware evaluation pipeline. We hope that this work can facilitate the evaluation of new testing approaches on a state of the art driving system. +# AI Usage Disclosure + + +Generative AI tools were used in this work solely to support high-level research concepts and structural ideas. All software implementation, including the source code, architecture, and deployment scripts, was authored entirely by the researchers without AI assistance. # Acknowledgements diff --git a/paper.pdf b/paper.pdf index e22b0df..3e71f59 100644 Binary files a/paper.pdf and b/paper.pdf differ diff --git a/srunner/autoagents/autoware_agent.py b/srunner/autoagents/autoware_agent.py index 5692b91..cc229eb 100644 --- a/srunner/autoagents/autoware_agent.py +++ b/srunner/autoagents/autoware_agent.py @@ -17,6 +17,7 @@ from srunner.autoagents.autoware_carla_interface.carla_autoware import ( InitializeInterface, ) +from srunner.tools.CARLA_manager import CARLAManager import threading import rclpy @@ -46,6 +47,7 @@ def setup(self, config: EnvironmentConfig) -> None: rclpy.init(args=None) self.config = config + self.tick_delta = CARLAManager.FIXED_DELTA_SECONDS # initialise autoware state object self.autoware_state = autoware_state.AutowareState("ego_vehicle", None) @@ -56,7 +58,9 @@ def setup(self, config: EnvironmentConfig) -> None: self.carla_interface = InitializeInterface(self.config, self._node) self.state_node = state_node.StateNode(self.autoware_state, self._node_state) - self.state_node.reset_autoware(self.config.town, self.config.ego_name) + if config.iteration == 1: + # only reset autoware on the first run + self.state_node.reset_autoware(self.config.town, self.config.ego_name) self.route_node = route_node.RouteNode(self.autoware_state, self._node_state) self.autoware_node = autoware_node.AutowareNode( @@ -92,7 +96,6 @@ def set_route(self) -> None: self.goal_pose_world = self._global_plan_world_coord[-1] self.waypoints_world = self._global_plan_world_coord[:-1] - # autoware cannot handle many waypoints, becomes unreliable n_waypoints = len(self.waypoints_world) segment_size = int(n_waypoints / 3) @@ -172,7 +175,7 @@ def run_step_init(self) -> bool: def run_step(self) -> None: """Tick method containing all logic based on autoware state""" self.counter += 1 - if self.counter % 20 == 0: + if self.counter % int(1 / self.tick_delta) == 0: logger.info( f"Ticked 1 second game-time, actual tick is {(time.perf_counter_ns() - self.last_tick) / 1e6}ms" ) diff --git a/srunner/autoagents/autoware_carla_interface/carla_ros.py b/srunner/autoagents/autoware_carla_interface/carla_ros.py index 398f759..220a1ab 100644 --- a/srunner/autoagents/autoware_carla_interface/carla_ros.py +++ b/srunner/autoagents/autoware_carla_interface/carla_ros.py @@ -26,7 +26,6 @@ from geometry_msgs.msg import Pose from geometry_msgs.msg import PoseWithCovarianceStamped import numpy -import datetime import pathlib from rosgraph_msgs.msg import Clock from sensor_msgs.msg import CameraInfo @@ -54,6 +53,8 @@ SensorInterface, ) +from srunner.tools.CARLA_manager import CARLAManager + class carla_ros2_interface(object): def __init__(self, node): @@ -80,9 +81,11 @@ def __init__(self, node): "pose": 2, } self.publish_prev_times = { - sensor: datetime.datetime.now() for sensor in self.sensor_frequencies + sensor: GameTime.get_time() for sensor in self.sensor_frequencies } + self.delta = CARLAManager.FIXED_DELTA_SECONDS # delta in seconds + self.ros2_node = node self.clock_publisher = self.ros2_node.create_publisher(Clock, "/clock", 10) @@ -137,6 +140,11 @@ def __init__(self, node): self.pub_camera_info = self.ros2_node.create_publisher( CameraInfo, "/sensing/camera/traffic_light/camera_info", 1 ) + + # self.pub_camera_yolo_info = self.ros2_node.create_publisher( + # CameraInfo, "/sensing/camera/0/camera_info", 1 + # ) + elif sensor["type"] == "sensor.lidar.ray_cast": if sensor["id"] in self.sensor_frequencies: self.pub_lidar[sensor["id"]] = self.ros2_node.create_publisher( @@ -165,9 +173,9 @@ def __call__(self): return control def checkFrequency(self, sensor): - time_delta = ( - datetime.datetime.now() - self.publish_prev_times[sensor] - ).microseconds / 1000000.0 + # implement frequency check based on game time + time_delta = GameTime.get_time() - self.publish_prev_times[sensor] + if 1.0 / time_delta >= self.sensor_frequencies[sensor]: return True return False @@ -185,7 +193,7 @@ def lidar(self, carla_lidar_measurement, id_): """Transform the received lidar measurement into a ROS point cloud message.""" if self.checkFrequency(id_): return - self.publish_prev_times[id_] = datetime.datetime.now() + self.publish_prev_times[id_] = GameTime.get_time() header = self.get_msg_header(frame_id="velodyne_top_changed") fields = [ @@ -258,7 +266,7 @@ def pose(self): """Transform odometry data to Pose and publish Pose with Covariance message.""" if self.checkFrequency("pose"): return - self.publish_prev_times["pose"] = datetime.datetime.now() + self.publish_prev_times["pose"] = GameTime.get_time() header = self.get_msg_header(frame_id="map") out_pose_with_cov = PoseWithCovarianceStamped() @@ -335,7 +343,7 @@ def camera(self, carla_camera_data): if self.checkFrequency("camera"): return - self.publish_prev_times["camera"] = datetime.datetime.now() + self.publish_prev_times["camera"] = GameTime.get_time() image_data_array = numpy.ndarray( shape=(carla_camera_data.height, carla_camera_data.width, 4), @@ -356,7 +364,7 @@ def imu(self, carla_imu_measurement): """Transform a received imu measurement into a ROS Imu message and publish Imu message.""" if self.checkFrequency("imu"): return - self.publish_prev_times["imu"] = datetime.datetime.now() + self.publish_prev_times["imu"] = GameTime.get_time() imu_msg = Imu() imu_msg.header = self.get_msg_header(frame_id="tamagawa/imu_link_changed") @@ -416,7 +424,7 @@ def ego_status(self): if self.checkFrequency("status"): return - self.publish_prev_times["status"] = datetime.datetime.now() + self.publish_prev_times["status"] = GameTime.get_time() # convert velocity from cartesian to ego frame trans_mat = numpy.array(self.ego_actor.get_transform().get_matrix()).reshape( diff --git a/srunner/autoagents/autoware_carla_interface/modules/carla_wrapper.py b/srunner/autoagents/autoware_carla_interface/modules/carla_wrapper.py index 0fa4276..48a861a 100644 --- a/srunner/autoagents/autoware_carla_interface/modules/carla_wrapper.py +++ b/srunner/autoagents/autoware_carla_interface/modules/carla_wrapper.py @@ -23,6 +23,8 @@ from srunner.scenariomanager.carla_data_provider import CarlaDataProvider +logger = logging.getLogger("scenario-runner") + # Sensor Wrapper for Agent class SensorReceivedNoData(Exception): @@ -54,7 +56,7 @@ def __call__(self, data): elif isinstance(data, GenericMeasurement): self._parse_pseudo_sensor(data, self._tag) else: - logging.error("No callback method for this sensor.") + logger.error("No callback method for this sensor.") # Parsing CARLA physical Sensors def _parse_image_cb(self, image, tag): @@ -216,6 +218,13 @@ def setup_sensors(self, vehicle, debug_mode=False): yaw=sensor_spec["spawn_point"]["yaw"] + 0.0364, ) + bp.set_attribute( + "sensor_tick", str(round(1.0 / sensor_spec["publish_frequency"], 2)) + ) + logger.info( + f"Sensor {sensor_spec['id']} tick rate set to {str(round(1.0 / sensor_spec['publish_frequency'], 2))}" + ) + # create sensor sensor_transform = carla.Transform(sensor_location, sensor_rotation) sensor = CarlaDataProvider.get_world().spawn_actor( diff --git a/srunner/autoagents/autoware_carla_interface/objects/sensors.json b/srunner/autoagents/autoware_carla_interface/objects/sensors.json index 7103118..3b4e4b0 100644 --- a/srunner/autoagents/autoware_carla_interface/objects/sensors.json +++ b/srunner/autoagents/autoware_carla_interface/objects/sensors.json @@ -13,7 +13,8 @@ }, "image_size_x": 1920, "image_size_y": 1080, - "fov": 90.0 + "fov": 90.0, + "publish_frequency": 11.0 }, { "type": "sensor.lidar.ray_cast", @@ -32,7 +33,8 @@ "upper_fov": 10.0, "lower_fov": -30.0, "rotation_frequency": 20, - "noise_stddev": 0.0 + "noise_stddev": 0.0, + "publish_frequency": 11.0 }, { "type": "sensor.other.gnss", @@ -44,7 +46,8 @@ "roll": 0.0, "pitch": 0.0, "yaw": 0.0 - } + }, + "publish_frequency": 2.0 }, { "type": "sensor.other.imu", @@ -56,7 +59,8 @@ "roll": 0.0, "pitch": 0.0, "yaw": 0.0 - } + }, + "publish_frequency": 50.0 } ] } diff --git a/srunner/scenarioconfigs/environment_configuration.py b/srunner/scenarioconfigs/environment_configuration.py index 28a3548..74c0495 100644 --- a/srunner/scenarioconfigs/environment_configuration.py +++ b/srunner/scenarioconfigs/environment_configuration.py @@ -26,3 +26,4 @@ def __init__(self) -> None: self.ego_spawn: carla.Transform | None = None self.sensor_config: list[DefaultSensor] = [] self.route_id: int = 0 + self.iteration: int = 0 diff --git a/srunner/scenariomanager/scenarioatomics/atomic_criteria.py b/srunner/scenariomanager/scenarioatomics/atomic_criteria.py index 45dc961..952084d 100644 --- a/srunner/scenariomanager/scenarioatomics/atomic_criteria.py +++ b/srunner/scenariomanager/scenarioatomics/atomic_criteria.py @@ -21,7 +21,6 @@ import shapely.geometry import carla -from agents.tools.misc import get_speed from srunner.scenariomanager.carla_data_provider import CarlaDataProvider from srunner.scenariomanager.timer import GameTime @@ -29,7 +28,6 @@ class Criterion(py_trees.behaviour.Behaviour): - """ Base class for all criteria used to evaluate a scenario for success/failure @@ -46,11 +44,7 @@ class Criterion(py_trees.behaviour.Behaviour): - units: units of the 'actual_value'. This is a string and is used by the result writter """ - def __init__(self, - name, - actor, - optional=False, - terminate_on_failure=False): + def __init__(self, name, actor, optional=False, terminate_on_failure=False): super(Criterion, self).__init__(name) self.logger.debug("%s.__init__()" % (self.__class__.__name__)) @@ -58,7 +52,9 @@ def __init__(self, self.actor = actor self.optional = optional self._terminate_on_failure = terminate_on_failure - self.test_status = "INIT" # Either "INIT", "RUNNING", "SUCCESS", "ACCEPTABLE" or "FAILURE" + self.test_status = ( + "INIT" # Either "INIT", "RUNNING", "SUCCESS", "ACCEPTABLE" or "FAILURE" + ) # Attributes to compare the current state (actual_value), with the expected ones self.success_value = 0 @@ -78,14 +74,16 @@ def terminate(self, new_status): """ Terminate the criterion. Can be extended by the user-derived class """ - if self.test_status in ('RUNNING', 'INIT'): + if self.test_status in ("RUNNING", "INIT"): self.test_status = "SUCCESS" - self.logger.debug("%s.terminate()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.terminate()[%s->%s]" + % (self.__class__.__name__, self.status, new_status) + ) class MaxVelocityTest(Criterion): - """ This class contains an atomic test for maximum velocity. @@ -95,7 +93,9 @@ class MaxVelocityTest(Criterion): - optional [optional]: If True, the result is not considered for an overall pass/fail result """ - def __init__(self, actor, max_velocity, optional=False, name="CheckMaximumVelocity"): + def __init__( + self, actor, max_velocity, optional=False, name="CheckMaximumVelocity" + ): """ Setup actor and maximum allowed velovity """ @@ -123,7 +123,9 @@ def update(self): if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE - self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status) + ) return new_status @@ -141,7 +143,14 @@ class DrivenDistanceTest(Criterion): - optional [optional]: If True, the result is not considered for an overall pass/fail result """ - def __init__(self, actor, distance, acceptable_distance=None, optional=False, name="CheckDrivenDistance"): + def __init__( + self, + actor, + distance, + acceptable_distance=None, + optional=False, + name="CheckDrivenDistance", + ): """ Setup actor """ @@ -177,8 +186,10 @@ def update(self): if self.actual_value > self.success_value: self.test_status = "SUCCESS" - elif (self.acceptable_value is not None and - self.actual_value > self.acceptable_value): + elif ( + self.acceptable_value is not None + and self.actual_value > self.acceptable_value + ): self.test_status = "ACCEPTABLE" else: self.test_status = "RUNNING" @@ -186,7 +197,9 @@ def update(self): if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE - self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status) + ) return new_status @@ -201,7 +214,6 @@ def terminate(self, new_status): class AverageVelocityTest(Criterion): - """ This class contains an atomic test for average velocity. @@ -214,8 +226,14 @@ class AverageVelocityTest(Criterion): - optional [optional]: If True, the result is not considered for an overall pass/fail result """ - def __init__(self, actor, velocity, acceptable_velocity=None, optional=False, - name="CheckAverageVelocity"): + def __init__( + self, + actor, + velocity, + acceptable_velocity=None, + optional=False, + name="CheckAverageVelocity", + ): """ Setup actor and average velovity expected """ @@ -256,8 +274,10 @@ def update(self): if self.actual_value > self.success_value: self.test_status = "SUCCESS" - elif (self.acceptable_value is not None and - self.actual_value > self.acceptable_value): + elif ( + self.acceptable_value is not None + and self.actual_value > self.acceptable_value + ): self.test_status = "ACCEPTABLE" else: self.test_status = "RUNNING" @@ -265,7 +285,9 @@ def update(self): if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE - self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status) + ) return new_status @@ -279,7 +301,6 @@ def terminate(self, new_status): class CollisionTest(Criterion): - """ This class contains an atomic test for collisions. @@ -293,11 +314,22 @@ class CollisionTest(Criterion): """ COLLISION_RADIUS = 5 # Two collisions that happen within this distance count as one - MAX_ID_TIME = 5 # Two collisions with the same id that happen within this time count as one - EPSILON = 0.1 # Collisions at lower this speed won't be counted as the actor's fault - - def __init__(self, actor, other_actor=None, other_actor_type=None, - optional=False, terminate_on_failure=False, name="CollisionTest"): + MAX_ID_TIME = ( + 5 # Two collisions with the same id that happen within this time count as one + ) + EPSILON = ( + 0.1 # Collisions at lower this speed won't be counted as the actor's fault + ) + + def __init__( + self, + actor, + other_actor=None, + other_actor_type=None, + optional=False, + terminate_on_failure=False, + name="CollisionTest", + ): """ Construction with sensor setup """ @@ -316,8 +348,10 @@ def initialise(self): """ Creates the sensor and callback""" world = CarlaDataProvider.get_world() - blueprint = world.get_blueprint_library().find('sensor.other.collision') - self._collision_sensor = world.spawn_actor(blueprint, carla.Transform(), attach_to=self.actor) + blueprint = world.get_blueprint_library().find("sensor.other.collision") + self._collision_sensor = world.spawn_actor( + blueprint, carla.Transform(), attach_to=self.actor + ) self._collision_sensor.listen(lambda event: self._count_collisions(event)) super(CollisionTest, self).initialise() @@ -342,7 +376,9 @@ def update(self): if elapsed_time > self.MAX_ID_TIME: self._collision_id = None - self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status) + ) return new_status @@ -356,7 +392,7 @@ def terminate(self, new_status): self._collision_sensor = None super(CollisionTest, self).terminate(new_status) - def _count_collisions(self, event): # pylint: disable=too-many-return-statements + def _count_collisions(self, event): # pylint: disable=too-many-return-statements """Update collision count""" actor_location = CarlaDataProvider.get_location(self.actor) @@ -366,10 +402,13 @@ def _count_collisions(self, event): # pylint: disable=too-many-return-statem if self._other_actor_type: if self._other_actor_type == "miscellaneous": # Special OpenScenario case - if "traffic" not in event.other_actor.type_id and "static" not in event.other_actor.type_id: + if ( + "traffic" not in event.other_actor.type_id + and "static" not in event.other_actor.type_id + ): return elif self._other_actor_type not in event.other_actor.type_id: - return + return # To avoid multiple counts of the same collision, filter some of them. if self._collision_id == event.other_actor.id: @@ -389,33 +428,40 @@ def _count_collisions(self, event): # pylint: disable=too-many-return-statem self._collision_time = GameTime.get_time() self._collision_location = actor_location - if event.other_actor.id != 0: # Number 0: static objects -> ignore it + if event.other_actor.id != 0: # Number 0: static objects -> ignore it self._collision_id = event.other_actor.id - if ('static' in event.other_actor.type_id or 'traffic' in event.other_actor.type_id) \ - and 'sidewalk' not in event.other_actor.type_id: + if ( + "static" in event.other_actor.type_id + or "traffic" in event.other_actor.type_id + ) and "sidewalk" not in event.other_actor.type_id: actor_type = TrafficEventType.COLLISION_STATIC - elif 'vehicle' in event.other_actor.type_id: + elif "vehicle" in event.other_actor.type_id: actor_type = TrafficEventType.COLLISION_VEHICLE - elif 'walker' in event.other_actor.type_id: + elif "walker" in event.other_actor.type_id: actor_type = TrafficEventType.COLLISION_PEDESTRIAN else: return - collision_event = TrafficEvent(event_type=actor_type, frame=GameTime.get_frame()) - collision_event.set_dict({'other_actor': event.other_actor, 'location': actor_location}) + collision_event = TrafficEvent( + event_type=actor_type, frame=GameTime.get_frame() + ) + collision_event.set_dict( + {"other_actor": event.other_actor, "location": actor_location} + ) collision_event.set_message( "Agent collided against object with type={} and id={} at (x={}, y={}, z={})".format( event.other_actor.type_id, event.other_actor.id, round(actor_location.x, 3), round(actor_location.y, 3), - round(actor_location.z, 3))) + round(actor_location.z, 3), + ) + ) self.events.append(collision_event) class ActorBlockedTest(Criterion): - """ This test will fail if the actor has had its linear velocity lower than a specific value for a specific amount of time @@ -426,7 +472,15 @@ class ActorBlockedTest(Criterion): - terminate_on_failure [optional]: If True, the complete scenario will terminate upon failure of this test """ - def __init__(self, actor, min_speed, max_time, name="ActorBlockedTest", optional=False, terminate_on_failure=False): + def __init__( + self, + actor, + min_speed, + max_time, + name="ActorBlockedTest", + optional=False, + terminate_on_failure=False, + ): """ Class constructor """ @@ -444,35 +498,46 @@ def update(self): new_status = py_trees.common.Status.RUNNING # Deactivate/Activate checking by blackboard message - active = py_trees.blackboard.Blackboard().get('AC_SwitchActorBlockedTest') + active = py_trees.blackboard.Blackboard().get("AC_SwitchActorBlockedTest") if active is not None: self._active = active self._time_last_valid_state = GameTime.get_time() - py_trees.blackboard.Blackboard().set("AC_SwitchActorBlockedTest", None, overwrite=True) + py_trees.blackboard.Blackboard().set( + "AC_SwitchActorBlockedTest", None, overwrite=True + ) if self._active: linear_speed = CarlaDataProvider.get_velocity(self.actor) if linear_speed is not None: if linear_speed < self._min_speed and self._time_last_valid_state: - if (GameTime.get_time() - self._time_last_valid_state) > self._max_time: + if ( + GameTime.get_time() - self._time_last_valid_state + ) > self._max_time: # The actor has been "blocked" for too long, save the data self.test_status = "FAILURE" vehicle_location = CarlaDataProvider.get_location(self.actor) - event = TrafficEvent(event_type=TrafficEventType.VEHICLE_BLOCKED, frame=GameTime.get_frame()) - event.set_message('Agent got blocked at (x={}, y={}, z={})'.format( - round(vehicle_location.x, 3), - round(vehicle_location.y, 3), - round(vehicle_location.z, 3)) + event = TrafficEvent( + event_type=TrafficEventType.VEHICLE_BLOCKED, + frame=GameTime.get_frame(), + ) + event.set_message( + "Agent got blocked at (x={}, y={}, z={})".format( + round(vehicle_location.x, 3), + round(vehicle_location.y, 3), + round(vehicle_location.z, 3), + ) ) - event.set_dict({'location': vehicle_location}) + event.set_dict({"location": vehicle_location}) self.events.append(event) else: self._time_last_valid_state = GameTime.get_time() if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE - self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status) + ) return new_status @@ -492,8 +557,10 @@ def __init__(self, actor, optional=False, name="CheckKeepLane"): super(KeepLaneTest, self).__init__(name, actor, optional) world = self.actor.get_world() - blueprint = world.get_blueprint_library().find('sensor.other.lane_invasion') - self._lane_sensor = world.spawn_actor(blueprint, carla.Transform(), attach_to=self.actor) + blueprint = world.get_blueprint_library().find("sensor.other.lane_invasion") + self._lane_sensor = world.spawn_actor( + blueprint, carla.Transform(), attach_to=self.actor + ) self._lane_sensor.listen(lambda event: self._count_lane_invasion(event)) def update(self): @@ -510,7 +577,9 @@ def update(self): if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE - self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status) + ) return new_status @@ -531,7 +600,6 @@ def _count_lane_invasion(self, event): class ReachedRegionTest(Criterion): - """ This class contains the reached region test The test is a success if the actor reaches a specified region @@ -565,7 +633,8 @@ def update(self): in_region = False if self.test_status != "SUCCESS": in_region = (location.x > self._min_x and location.x < self._max_x) and ( - location.y > self._min_y and location.y < self._max_y) + location.y > self._min_y and location.y < self._max_y + ) if in_region: self.test_status = "SUCCESS" else: @@ -574,12 +643,13 @@ def update(self): if self.test_status == "SUCCESS": new_status = py_trees.common.Status.SUCCESS - self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status) + ) return new_status class OffRoadTest(Criterion): - """ Atomic containing a test to detect when an actor deviates from the driving lanes. This atomic can fail when actor has spent a specific time outside driving lanes (defined by OpenDRIVE). Simplified @@ -594,7 +664,14 @@ class OffRoadTest(Criterion): terminate_on_failure (bool): If True, the atomic will fail when the duration condition has been met. """ - def __init__(self, actor, duration=0, optional=False, terminate_on_failure=False, name="OffRoadTest"): + def __init__( + self, + actor, + duration=0, + optional=False, + terminate_on_failure=False, + name="OffRoadTest", + ): """ Setup of the variables """ @@ -622,14 +699,9 @@ def update(self): current_location = CarlaDataProvider.get_location(self.actor) # Get the waypoint at the current location to see if the actor is offroad - drive_waypoint = self._map.get_waypoint( - current_location, - project_to_road=False - ) + drive_waypoint = self._map.get_waypoint(current_location, project_to_road=False) park_waypoint = self._map.get_waypoint( - current_location, - project_to_road=False, - lane_type=carla.LaneType.Parking + current_location, project_to_road=False, lane_type=carla.LaneType.Parking ) if drive_waypoint or park_waypoint: self._offroad = False @@ -653,13 +725,14 @@ def update(self): if self._terminate_on_failure and self.test_status == "FAILURE": new_status = py_trees.common.Status.FAILURE - self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status) + ) return new_status class EndofRoadTest(Criterion): - """ Atomic containing a test to detect when an actor has changed to a different road @@ -672,7 +745,14 @@ class EndofRoadTest(Criterion): terminate_on_failure (bool): If True, the atomic will fail when the duration condition has been met. """ - def __init__(self, actor, duration=0, optional=False, terminate_on_failure=False, name="EndofRoadTest"): + def __init__( + self, + actor, + duration=0, + optional=False, + terminate_on_failure=False, + name="EndofRoadTest", + ): """ Setup of the variables """ @@ -707,7 +787,6 @@ def update(self): else: # Wait until the actor has left the road if self._road_id != current_waypoint.road_id or self._start_time: - # Start counting if self._start_time is None: self._start_time = GameTime.get_time() @@ -721,12 +800,13 @@ def update(self): self.actual_value += 1 return py_trees.common.Status.SUCCESS - self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status) + ) return new_status class OnSidewalkTest(Criterion): - """ Atomic containing a test to detect sidewalk invasions of a specific actor. This atomic can fail when actor has spent a specific time outside driving lanes (defined by OpenDRIVE). @@ -740,11 +820,20 @@ class OnSidewalkTest(Criterion): terminate_on_failure (bool): If True, the atomic will fail when the duration condition has been met. """ - def __init__(self, actor, duration=0, optional=False, terminate_on_failure=False, name="OnSidewalkTest"): + def __init__( + self, + actor, + duration=0, + optional=False, + terminate_on_failure=False, + name="OnSidewalkTest", + ): """ Construction with sensor setup """ - super(OnSidewalkTest, self).__init__(name, actor, optional, terminate_on_failure) + super(OnSidewalkTest, self).__init__( + name, actor, optional, terminate_on_failure + ) self._map = CarlaDataProvider.get_map() self._onsidewalk_active = False @@ -787,12 +876,16 @@ def update(self): self._sidewalk_start_location = current_loc # Case 2) Not inside allowed zones (Driving and Parking) - elif current_wp.lane_type not in (carla.LaneType.Driving, carla.LaneType.Parking): - + elif current_wp.lane_type not in ( + carla.LaneType.Driving, + carla.LaneType.Parking, + ): # Get the vertices of the vehicle heading_vec = current_tra.get_forward_vector() heading_vec.z = 0 - heading_vec = heading_vec / math.sqrt(math.pow(heading_vec.x, 2) + math.pow(heading_vec.y, 2)) + heading_vec = heading_vec / math.sqrt( + math.pow(heading_vec.x, 2) + math.pow(heading_vec.y, 2) + ) perpendicular_vec = carla.Vector3D(-heading_vec.y, heading_vec.x, 0) extent = self.actor.bounding_box.extent @@ -802,23 +895,37 @@ def update(self): bbox = [ current_loc + carla.Location(x_boundary_vector - y_boundary_vector), current_loc + carla.Location(x_boundary_vector + y_boundary_vector), - current_loc + carla.Location(-1 * x_boundary_vector - y_boundary_vector), - current_loc + carla.Location(-1 * x_boundary_vector + y_boundary_vector)] + current_loc + + carla.Location(-1 * x_boundary_vector - y_boundary_vector), + current_loc + + carla.Location(-1 * x_boundary_vector + y_boundary_vector), + ] bbox_wp = [ self._map.get_waypoint(bbox[0], lane_type=carla.LaneType.Any), self._map.get_waypoint(bbox[1], lane_type=carla.LaneType.Any), self._map.get_waypoint(bbox[2], lane_type=carla.LaneType.Any), - self._map.get_waypoint(bbox[3], lane_type=carla.LaneType.Any)] + self._map.get_waypoint(bbox[3], lane_type=carla.LaneType.Any), + ] - lane_type_list = [bbox_wp[0].lane_type, bbox_wp[1].lane_type, bbox_wp[2].lane_type, bbox_wp[3].lane_type] + lane_type_list = [ + bbox_wp[0].lane_type, + bbox_wp[1].lane_type, + bbox_wp[2].lane_type, + bbox_wp[3].lane_type, + ] # Case 2.1) Not quite outside yet - if bbox_wp[0].lane_type == (carla.LaneType.Driving or carla.LaneType.Parking) \ - or bbox_wp[1].lane_type == (carla.LaneType.Driving or carla.LaneType.Parking) \ - or bbox_wp[2].lane_type == (carla.LaneType.Driving or carla.LaneType.Parking) \ - or bbox_wp[3].lane_type == (carla.LaneType.Driving or carla.LaneType.Parking): - + if ( + bbox_wp[0].lane_type + == (carla.LaneType.Driving or carla.LaneType.Parking) + or bbox_wp[1].lane_type + == (carla.LaneType.Driving or carla.LaneType.Parking) + or bbox_wp[2].lane_type + == (carla.LaneType.Driving or carla.LaneType.Parking) + or bbox_wp[3].lane_type + == (carla.LaneType.Driving or carla.LaneType.Parking) + ): self._onsidewalk_active = False self._outside_lane_active = False @@ -829,11 +936,12 @@ def update(self): self._sidewalk_start_location = current_loc else: - distance_vehicle_wp = current_loc.distance(current_wp.transform.location) + distance_vehicle_wp = current_loc.distance( + current_wp.transform.location + ) # Case 2.3) Outside lane if distance_vehicle_wp >= current_wp.lane_width / 2: - if not self._outside_lane_active: self._outside_lane_active = True self._outside_lane_start_location = current_loc @@ -848,15 +956,15 @@ def update(self): # Check for false positives at junctions if current_wp.is_junction: distance_vehicle_wp = math.sqrt( - math.pow(current_wp.transform.location.x - current_loc.x, 2) + - math.pow(current_wp.transform.location.y - current_loc.y, 2)) + math.pow(current_wp.transform.location.x - current_loc.x, 2) + + math.pow(current_wp.transform.location.y - current_loc.y, 2) + ) if distance_vehicle_wp <= current_wp.lane_width / 2: self._onsidewalk_active = False self._outside_lane_active = False # Else, do nothing, the waypoint is too far to consider it a correct position else: - self._onsidewalk_active = False self._outside_lane_active = False @@ -875,8 +983,12 @@ def update(self): self.test_status = "FAILURE" # Update the distances - distance_vector = CarlaDataProvider.get_location(self.actor) - self._actor_location - distance = math.sqrt(math.pow(distance_vector.x, 2) + math.pow(distance_vector.y, 2)) + distance_vector = ( + CarlaDataProvider.get_location(self.actor) - self._actor_location + ) + distance = math.sqrt( + math.pow(distance_vector.x, 2) + math.pow(distance_vector.y, 2) + ) if distance >= 0.02: # Used to avoid micro-changes adding to considerable sums self._actor_location = CarlaDataProvider.get_location(self.actor) @@ -889,14 +1001,22 @@ def update(self): # Register the sidewalk event if not self._onsidewalk_active and self._wrong_sidewalk_distance > 0: - self.actual_value += 1 - onsidewalk_event = TrafficEvent(event_type=TrafficEventType.ON_SIDEWALK_INFRACTION, frame=GameTime.get_frame()) + onsidewalk_event = TrafficEvent( + event_type=TrafficEventType.ON_SIDEWALK_INFRACTION, + frame=GameTime.get_frame(), + ) self._set_event_message( - onsidewalk_event, self._sidewalk_start_location, self._wrong_sidewalk_distance) + onsidewalk_event, + self._sidewalk_start_location, + self._wrong_sidewalk_distance, + ) self._set_event_dict( - onsidewalk_event, self._sidewalk_start_location, self._wrong_sidewalk_distance) + onsidewalk_event, + self._sidewalk_start_location, + self._wrong_sidewalk_distance, + ) self._onsidewalk_active = False self._wrong_sidewalk_distance = 0 @@ -904,20 +1024,30 @@ def update(self): # Register the outside of a lane event if not self._outside_lane_active and self._wrong_outside_lane_distance > 0: - self.actual_value += 1 - outsidelane_event = TrafficEvent(event_type=TrafficEventType.OUTSIDE_LANE_INFRACTION, frame=GameTime.get_frame()) + outsidelane_event = TrafficEvent( + event_type=TrafficEventType.OUTSIDE_LANE_INFRACTION, + frame=GameTime.get_frame(), + ) self._set_event_message( - outsidelane_event, self._outside_lane_start_location, self._wrong_outside_lane_distance) + outsidelane_event, + self._outside_lane_start_location, + self._wrong_outside_lane_distance, + ) self._set_event_dict( - outsidelane_event, self._outside_lane_start_location, self._wrong_outside_lane_distance) + outsidelane_event, + self._outside_lane_start_location, + self._wrong_outside_lane_distance, + ) self._outside_lane_active = False self._wrong_outside_lane_distance = 0 self.events.append(outsidelane_event) - self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status) + ) return new_status @@ -927,14 +1057,22 @@ def terminate(self, new_status): """ # If currently at a sidewalk, register the event if self._onsidewalk_active: - self.actual_value += 1 - onsidewalk_event = TrafficEvent(event_type=TrafficEventType.ON_SIDEWALK_INFRACTION, frame=GameTime.get_frame()) + onsidewalk_event = TrafficEvent( + event_type=TrafficEventType.ON_SIDEWALK_INFRACTION, + frame=GameTime.get_frame(), + ) self._set_event_message( - onsidewalk_event, self._sidewalk_start_location, self._wrong_sidewalk_distance) + onsidewalk_event, + self._sidewalk_start_location, + self._wrong_sidewalk_distance, + ) self._set_event_dict( - onsidewalk_event, self._sidewalk_start_location, self._wrong_sidewalk_distance) + onsidewalk_event, + self._sidewalk_start_location, + self._wrong_sidewalk_distance, + ) self._onsidewalk_active = False self._wrong_sidewalk_distance = 0 @@ -942,14 +1080,22 @@ def terminate(self, new_status): # If currently outside of our lane, register the event if self._outside_lane_active: - self.actual_value += 1 - outsidelane_event = TrafficEvent(event_type=TrafficEventType.OUTSIDE_LANE_INFRACTION, frame=GameTime.get_frame()) + outsidelane_event = TrafficEvent( + event_type=TrafficEventType.OUTSIDE_LANE_INFRACTION, + frame=GameTime.get_frame(), + ) self._set_event_message( - outsidelane_event, self._outside_lane_start_location, self._wrong_outside_lane_distance) + outsidelane_event, + self._outside_lane_start_location, + self._wrong_outside_lane_distance, + ) self._set_event_dict( - outsidelane_event, self._outside_lane_start_location, self._wrong_outside_lane_distance) + outsidelane_event, + self._outside_lane_start_location, + self._wrong_outside_lane_distance, + ) self._outside_lane_active = False self._wrong_outside_lane_distance = 0 @@ -962,27 +1108,28 @@ def _set_event_message(self, event, location, distance): Sets the message of the event """ if event.get_type() == TrafficEventType.ON_SIDEWALK_INFRACTION: - message_start = 'Agent invaded the sidewalk' + message_start = "Agent invaded the sidewalk" else: - message_start = 'Agent went outside the lane' + message_start = "Agent went outside the lane" event.set_message( - '{} for about {} meters, starting at (x={}, y={}, z={})'.format( + "{} for about {} meters, starting at (x={}, y={}, z={})".format( message_start, round(distance, 3), round(location.x, 3), round(location.y, 3), - round(location.z, 3))) + round(location.z, 3), + ) + ) def _set_event_dict(self, event, location, distance): """ Sets the dictionary of the event """ - event.set_dict({'location': location, 'distance': distance}) + event.set_dict({"location": location, "distance": distance}) class OutsideRouteLanesTest(Criterion): - """ Atomic to detect if the vehicle is either on a sidewalk or at a wrong lane. The distance spent outside is computed and it is returned as a percentage of the route distance traveled. @@ -993,10 +1140,16 @@ class OutsideRouteLanesTest(Criterion): optional (bool): If True, the result is not considered for an overall pass/fail result """ - ALLOWED_OUT_DISTANCE = 0.5 # At least 0.5, due to the mini-shoulder between lanes and sidewalks + ALLOWED_OUT_DISTANCE = ( + 0.5 # At least 0.5, due to the mini-shoulder between lanes and sidewalks + ) MAX_VEHICLE_ANGLE = 120.0 # Maximum angle between the yaw and waypoint lane - MAX_WAYPOINT_ANGLE = 150.0 # Maximum change between the yaw-lane angle between frames - WINDOWS_SIZE = 3 # Amount of additional waypoints checked (in case the first on fails) + MAX_WAYPOINT_ANGLE = ( + 150.0 # Maximum change between the yaw-lane angle between frames + ) + WINDOWS_SIZE = ( + 3 # Amount of additional waypoints checked (in case the first on fails) + ) def __init__(self, actor, route, optional=False, name="OutsideRouteLanesTest"): """ @@ -1040,20 +1193,26 @@ def update(self): return new_status # Deactivate / activate checking by blackboard message - active = py_trees.blackboard.Blackboard().get('AC_SwitchWrongDirectionTest') + active = py_trees.blackboard.Blackboard().get("AC_SwitchWrongDirectionTest") if active is not None: self._wrong_direction_active = active - py_trees.blackboard.Blackboard().set("AC_SwitchWrongDirectionTest", None, overwrite=True) + py_trees.blackboard.Blackboard().set( + "AC_SwitchWrongDirectionTest", None, overwrite=True + ) self._is_outside_driving_lanes(location) self._is_at_wrong_lane(location) - if self._outside_lane_active or (self._wrong_direction_active and self._wrong_lane_active): + if self._outside_lane_active or ( + self._wrong_direction_active and self._wrong_lane_active + ): self.test_status = "FAILURE" # Get the traveled distance - for index in range(self._current_index + 1, - min(self._current_index + self.WINDOWS_SIZE + 1, self._route_length)): + for index in range( + self._current_index + 1, + min(self._current_index + self.WINDOWS_SIZE + 1, self._route_length), + ): # Get the dot product to know if it has passed this location route_transform = self._route_transforms[index] route_location = route_transform.location @@ -1063,12 +1222,16 @@ def update(self): if wp_veh.dot(wp_dir) > 0: # Get the distance traveled and add it to the total distance - prev_route_location = self._route_transforms[self._current_index].location + prev_route_location = self._route_transforms[ + self._current_index + ].location new_dist = prev_route_location.distance(route_location) self._total_distance += new_dist # And to the wrong one if outside route lanes - if self._outside_lane_active or (self._wrong_direction_active and self._wrong_lane_active): + if self._outside_lane_active or ( + self._wrong_direction_active and self._wrong_lane_active + ): self._wrong_distance += new_dist if self._wrong_distance: @@ -1076,7 +1239,9 @@ def update(self): self._current_index = index - self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status) + ) return new_status def _set_traffic_event(self): @@ -1084,7 +1249,10 @@ def _set_traffic_event(self): Creates the traffic event / updates it """ if self._traffic_event is None: - self._traffic_event = TrafficEvent(event_type=TrafficEventType.OUTSIDE_ROUTE_LANES_INFRACTION, frame=GameTime.get_frame()) + self._traffic_event = TrafficEvent( + event_type=TrafficEventType.OUTSIDE_ROUTE_LANES_INFRACTION, + frame=GameTime.get_frame(), + ) self.events.append(self._traffic_event) percentage = self._wrong_distance / self._total_distance * 100 @@ -1093,13 +1261,13 @@ def _set_traffic_event(self): self._traffic_event.set_message( "Agent went outside its route lanes for about {} meters " "({}% of the completed route)".format( - round(self._wrong_distance, 3), - round(percentage, 2))) + round(self._wrong_distance, 3), round(percentage, 2) + ) + ) - self._traffic_event.set_dict({ - 'distance': self._wrong_distance, - 'percentage': percentage - }) + self._traffic_event.set_dict( + {"distance": self._wrong_distance, "percentage": percentage} + ) self._traffic_event.set_frame(GameTime.get_frame()) @@ -1114,7 +1282,7 @@ def _is_outside_driving_lanes(self, location): if parking_wp is not None: # Some towns have no parking parking_distance = location.distance(parking_wp.transform.location) else: - parking_distance = float('inf') + parking_distance = float("inf") if driving_distance >= parking_distance: distance = parking_distance @@ -1123,7 +1291,9 @@ def _is_outside_driving_lanes(self, location): distance = driving_distance lane_width = driving_wp.lane_width - self._outside_lane_active = bool(distance > (lane_width / 2 + self.ALLOWED_OUT_DISTANCE)) + self._outside_lane_active = bool( + distance > (lane_width / 2 + self.ALLOWED_OUT_DISTANCE) + ) def _is_at_wrong_lane(self, location): """ @@ -1137,14 +1307,15 @@ def _is_at_wrong_lane(self, location): if waypoint.is_junction: self._wrong_lane_active = False elif self._last_road_id != road_id or self._last_lane_id != lane_id: - if self._last_ego_waypoint.is_junction: # Just exited a junction, check the wp direction vs the ego's one wp_yaw = waypoint.transform.rotation.yaw % 360 actor_yaw = self.actor.get_transform().rotation.yaw % 360 angle = (wp_yaw - actor_yaw) % 360 - if angle < self.MAX_VEHICLE_ANGLE or angle > (360 - self.MAX_VEHICLE_ANGLE): + if angle < self.MAX_VEHICLE_ANGLE or angle > ( + 360 - self.MAX_VEHICLE_ANGLE + ): self._wrong_lane_active = False else: self._wrong_lane_active = True @@ -1155,8 +1326,9 @@ def _is_at_wrong_lane(self, location): wp_yaw = waypoint.transform.rotation.yaw % 360 angle = (last_wp_yaw - wp_yaw) % 360 - if angle > self.MAX_WAYPOINT_ANGLE and angle < (360 - self.MAX_WAYPOINT_ANGLE): - + if angle > self.MAX_WAYPOINT_ANGLE and angle < ( + 360 - self.MAX_WAYPOINT_ANGLE + ): # Is the ego vehicle going back to the lane, or going out? Take the opposite self._wrong_lane_active = not bool(self._wrong_lane_active) @@ -1167,7 +1339,6 @@ def _is_at_wrong_lane(self, location): class WrongLaneTest(Criterion): - """ This class contains an atomic test to detect invasions to wrong direction lanes. @@ -1175,6 +1346,7 @@ class WrongLaneTest(Criterion): - actor: CARLA actor to be used for this test - optional [optional]: If True, the result is not considered for an overall pass/fail result """ + MAX_ALLOWED_ANGLE = 120.0 MAX_WAYPOINT_ANGLE = 150.0 @@ -1209,8 +1381,10 @@ def update(self): current_lane_id = lane_waypoint.lane_id current_road_id = lane_waypoint.road_id - if (self._last_road_id != current_road_id or self._last_lane_id != current_lane_id) \ - and not lane_waypoint.is_junction: + if ( + self._last_road_id != current_road_id + or self._last_lane_id != current_lane_id + ) and not lane_waypoint.is_junction: next_waypoint = lane_waypoint.next(2.0)[0] if not next_waypoint: @@ -1218,18 +1392,33 @@ def update(self): # The waypoint route direction can be considered continuous. # Therefore just check for a big gap in waypoint directions. - previous_lane_direction = self._previous_lane_waypoint.transform.get_forward_vector() + previous_lane_direction = ( + self._previous_lane_waypoint.transform.get_forward_vector() + ) current_lane_direction = lane_waypoint.transform.get_forward_vector() - p_lane_vector = np.array([previous_lane_direction.x, previous_lane_direction.y]) - c_lane_vector = np.array([current_lane_direction.x, current_lane_direction.y]) + p_lane_vector = np.array( + [previous_lane_direction.x, previous_lane_direction.y] + ) + c_lane_vector = np.array( + [current_lane_direction.x, current_lane_direction.y] + ) waypoint_angle = math.degrees( - math.acos(np.clip(np.dot(p_lane_vector, c_lane_vector) / - (np.linalg.norm(p_lane_vector) * np.linalg.norm(c_lane_vector)), -1.0, 1.0))) + math.acos( + np.clip( + np.dot(p_lane_vector, c_lane_vector) + / ( + np.linalg.norm(p_lane_vector) + * np.linalg.norm(c_lane_vector) + ), + -1.0, + 1.0, + ) + ) + ) if waypoint_angle > self.MAX_WAYPOINT_ANGLE and self._in_lane: - self.test_status = "FAILURE" self._in_lane = False self.actual_value += 1 @@ -1241,18 +1430,34 @@ def update(self): # Continuity is broken after a junction so check vehicle-lane angle instead if self._previous_lane_waypoint.is_junction: - - vector_wp = np.array([next_waypoint.transform.location.x - lane_waypoint.transform.location.x, - next_waypoint.transform.location.y - lane_waypoint.transform.location.y]) - - vector_actor = np.array([math.cos(math.radians(self.actor.get_transform().rotation.yaw)), - math.sin(math.radians(self.actor.get_transform().rotation.yaw))]) + vector_wp = np.array( + [ + next_waypoint.transform.location.x + - lane_waypoint.transform.location.x, + next_waypoint.transform.location.y + - lane_waypoint.transform.location.y, + ] + ) + + vector_actor = np.array( + [ + math.cos(math.radians(self.actor.get_transform().rotation.yaw)), + math.sin(math.radians(self.actor.get_transform().rotation.yaw)), + ] + ) vehicle_lane_angle = math.degrees( - math.acos(np.clip(np.dot(vector_actor, vector_wp) / (np.linalg.norm(vector_wp)), -1.0, 1.0))) + math.acos( + np.clip( + np.dot(vector_actor, vector_wp) + / (np.linalg.norm(vector_wp)), + -1.0, + 1.0, + ) + ) + ) if vehicle_lane_angle > self.MAX_ALLOWED_ANGLE: - self.test_status = "FAILURE" self._in_lane = False self.actual_value += 1 @@ -1260,9 +1465,13 @@ def update(self): # Keep adding "meters" to the counter distance_vector = self.actor.get_location() - self._actor_location - distance = math.sqrt(math.pow(distance_vector.x, 2) + math.pow(distance_vector.y, 2)) + distance = math.sqrt( + math.pow(distance_vector.x, 2) + math.pow(distance_vector.y, 2) + ) - if distance >= 0.02: # Used to avoid micro-changes adding add to considerable sums + if ( + distance >= 0.02 + ): # Used to avoid micro-changes adding add to considerable sums self._actor_location = CarlaDataProvider.get_location(self.actor) if not self._in_lane and not lane_waypoint.is_junction: @@ -1270,12 +1479,24 @@ def update(self): # Register the event if self._in_lane and self._wrong_distance > 0: - - wrong_way_event = TrafficEvent(event_type=TrafficEventType.WRONG_WAY_INFRACTION, frame=GameTime.get_frame()) - self._set_event_message(wrong_way_event, self._wrong_lane_start_location, - self._wrong_distance, current_road_id, current_lane_id) - self._set_event_dict(wrong_way_event, self._wrong_lane_start_location, - self._wrong_distance, current_road_id, current_lane_id) + wrong_way_event = TrafficEvent( + event_type=TrafficEventType.WRONG_WAY_INFRACTION, + frame=GameTime.get_frame(), + ) + self._set_event_message( + wrong_way_event, + self._wrong_lane_start_location, + self._wrong_distance, + current_road_id, + current_lane_id, + ) + self._set_event_dict( + wrong_way_event, + self._wrong_lane_start_location, + self._wrong_distance, + current_road_id, + current_lane_id, + ) self.events.append(wrong_way_event) self._wrong_distance = 0 @@ -1285,7 +1506,9 @@ def update(self): self._last_road_id = current_road_id self._previous_lane_waypoint = lane_waypoint - self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status) + ) return new_status @@ -1294,16 +1517,28 @@ def terminate(self, new_status): If there is currently an event running, it is registered """ if not self._in_lane: - lane_waypoint = self._map.get_waypoint(self.actor.get_location()) current_lane_id = lane_waypoint.lane_id current_road_id = lane_waypoint.road_id - wrong_way_event = TrafficEvent(event_type=TrafficEventType.WRONG_WAY_INFRACTION, frame=GameTime.get_frame()) - self._set_event_message(wrong_way_event, self._wrong_lane_start_location, - self._wrong_distance, current_road_id, current_lane_id) - self._set_event_dict(wrong_way_event, self._wrong_lane_start_location, - self._wrong_distance, current_road_id, current_lane_id) + wrong_way_event = TrafficEvent( + event_type=TrafficEventType.WRONG_WAY_INFRACTION, + frame=GameTime.get_frame(), + ) + self._set_event_message( + wrong_way_event, + self._wrong_lane_start_location, + self._wrong_distance, + current_road_id, + current_lane_id, + ) + self._set_event_dict( + wrong_way_event, + self._wrong_lane_start_location, + self._wrong_distance, + current_road_id, + current_lane_id, + ) self._wrong_distance = 0 self._in_lane = True @@ -1324,21 +1559,25 @@ def _set_event_message(self, event, location, distance, road_id, lane_id): round(location.y, 3), round(location.z, 3), road_id, - lane_id)) + lane_id, + ) + ) def _set_event_dict(self, event, location, distance, road_id, lane_id): """ Sets the dictionary of the event """ - event.set_dict({ - 'location': location, - 'distance': distance, - 'road_id': road_id, - 'lane_id': lane_id}) + event.set_dict( + { + "location": location, + "distance": distance, + "road_id": road_id, + "lane_id": lane_id, + } + ) class InRadiusRegionTest(Criterion): - """ The test is a success if the actor is within a given radius of a specified region @@ -1348,12 +1587,11 @@ class InRadiusRegionTest(Criterion): """ def __init__(self, actor, x, y, radius, name="InRadiusRegionTest"): - """ - """ + """ """ super(InRadiusRegionTest, self).__init__(name, actor) self.logger.debug("%s.__init__()" % (self.__class__.__name__)) - self._x = x # pylint: disable=invalid-name - self._y = y # pylint: disable=invalid-name + self._x = x # pylint: disable=invalid-name + self._y = y # pylint: disable=invalid-name self._radius = radius def update(self): @@ -1367,10 +1605,18 @@ def update(self): return new_status if self.test_status != "SUCCESS": - in_radius = math.sqrt(((location.x - self._x)**2) + ((location.y - self._y)**2)) < self._radius + in_radius = ( + math.sqrt(((location.x - self._x) ** 2) + ((location.y - self._y) ** 2)) + < self._radius + ) if in_radius: - route_completion_event = TrafficEvent(event_type=TrafficEventType.ROUTE_COMPLETED, frame=GameTime.get_frame()) - route_completion_event.set_message("Destination was successfully reached") + route_completion_event = TrafficEvent( + event_type=TrafficEventType.ROUTE_COMPLETED, + frame=GameTime.get_frame(), + ) + route_completion_event.set_message( + "Destination was successfully reached" + ) self.events.append(route_completion_event) self.test_status = "SUCCESS" else: @@ -1379,13 +1625,14 @@ def update(self): if self.test_status == "SUCCESS": new_status = py_trees.common.Status.SUCCESS - self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status) + ) return new_status class InRouteTest(Criterion): - """ The test is a success if the actor is never outside route. The actor can go outside of the route but only for a certain amount of distance @@ -1397,13 +1644,23 @@ class InRouteTest(Criterion): - offroad_min: Maximum safe distance (in meters). Might eventually cause failure - terminate_on_failure [optional]: If True, the complete scenario will terminate upon failure of this test """ + MAX_ROUTE_PERCENTAGE = 30 # % WINDOWS_SIZE = 5 # Amount of additional waypoints checked - def __init__(self, actor, route, offroad_min=None, offroad_max=30, name="InRouteTest", terminate_on_failure=False): - """ - """ - super(InRouteTest, self).__init__(name, actor, terminate_on_failure=terminate_on_failure) + def __init__( + self, + actor, + route, + offroad_min=None, + offroad_max=30, + name="InRouteTest", + terminate_on_failure=False, + ): + """ """ + super(InRouteTest, self).__init__( + name, actor, terminate_on_failure=terminate_on_failure + ) self.units = None # We care about whether or not it fails, no units attached self._route = route @@ -1448,23 +1705,27 @@ def update(self): if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE - elif self.test_status in ('RUNNING', 'INIT'): - + elif self.test_status in ("RUNNING", "INIT"): off_route = True - shortest_distance = float('inf') + shortest_distance = float("inf") closest_index = -1 # Get the closest distance - for index in range(self._current_index, - min(self._current_index + self.WINDOWS_SIZE + 1, self._route_length)): + for index in range( + self._current_index, + min(self._current_index + self.WINDOWS_SIZE + 1, self._route_length), + ): ref_location = self._route_transforms[index].location - distance = math.sqrt(((location.x - ref_location.x) ** 2) + ((location.y - ref_location.y) ** 2)) + distance = math.sqrt( + ((location.x - ref_location.x) ** 2) + + ((location.y - ref_location.y) ** 2) + ) if distance <= shortest_distance: closest_index = index shortest_distance = distance - if closest_index == -1 or shortest_distance == float('inf'): + if closest_index == -1 or shortest_distance == float("inf"): return new_status # Check if the actor is out of route @@ -1474,13 +1735,17 @@ def update(self): # If actor advanced a step, record the distance if self._current_index != closest_index: - - new_dist = self._accum_meters[closest_index] - self._accum_meters[self._current_index] + new_dist = ( + self._accum_meters[closest_index] + - self._accum_meters[self._current_index] + ) # If too far from the route, add it and check if its value if not self._in_safe_route: self._out_route_distance += new_dist - out_route_percentage = 100 * self._out_route_distance / self._accum_meters[-1] + out_route_percentage = ( + 100 * self._out_route_distance / self._accum_meters[-1] + ) if out_route_percentage > self.MAX_ROUTE_PERCENTAGE: off_route = True @@ -1491,13 +1756,16 @@ def update(self): blackv = py_trees.blackboard.Blackboard() _ = blackv.set("InRoute", False) - route_deviation_event = TrafficEvent(event_type=TrafficEventType.ROUTE_DEVIATION, frame=GameTime.get_frame()) + route_deviation_event = TrafficEvent( + event_type=TrafficEventType.ROUTE_DEVIATION, + frame=GameTime.get_frame(), + ) route_deviation_event.set_message( "Agent deviated from the route at (x={}, y={}, z={})".format( - round(location.x, 3), - round(location.y, 3), - round(location.z, 3))) - route_deviation_event.set_dict({'location': location}) + round(location.x, 3), round(location.y, 3), round(location.z, 3) + ) + ) + route_deviation_event.set_dict({"location": location}) self.events.append(route_deviation_event) @@ -1505,13 +1773,14 @@ def update(self): self.actual_value += 1 new_status = py_trees.common.Status.FAILURE - self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status) + ) return new_status class RouteCompletionTest(Criterion): - """ Check at which stage of the route is the actor at each tick @@ -1520,16 +1789,20 @@ class RouteCompletionTest(Criterion): - route: Route to be checked - terminate_on_failure [optional]: If True, the complete scenario will terminate upon failure of this test """ + WINDOWS_SIZE = 2 # Thresholds to return that a route has been completed DISTANCE_THRESHOLD = 10.0 # meters PERCENTAGE_THRESHOLD = 99 # % - def __init__(self, actor, route, name="RouteCompletionTest", terminate_on_failure=False): - """ - """ - super(RouteCompletionTest, self).__init__(name, actor, terminate_on_failure=terminate_on_failure) + def __init__( + self, actor, route, name="RouteCompletionTest", terminate_on_failure=False + ): + """ """ + super(RouteCompletionTest, self).__init__( + name, actor, terminate_on_failure=terminate_on_failure + ) self.units = "%" self.success_value = 100 self._route = route @@ -1542,9 +1815,13 @@ def __init__(self, actor, route, name="RouteCompletionTest", terminate_on_failur self.target_location = self._route_transforms[-1].location - self._traffic_event = TrafficEvent(event_type=TrafficEventType.ROUTE_COMPLETION, frame=0) - self._traffic_event.set_dict({'route_completed': self.actual_value}) - self._traffic_event.set_message("Agent has completed {} of the route".format(self.actual_value)) + self._traffic_event = TrafficEvent( + event_type=TrafficEventType.ROUTE_COMPLETION, frame=0 + ) + self._traffic_event.set_dict({"route_completed": self.actual_value}) + self._traffic_event.set_message( + "Agent has completed {} of the route".format(self.actual_value) + ) self.events.append(self._traffic_event) def _get_acummulated_percentages(self): @@ -1574,32 +1851,42 @@ def update(self): if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE - elif self.test_status in ('RUNNING', 'INIT'): - - for index in range(self._index, min(self._index + self.WINDOWS_SIZE + 1, self._route_length)): + elif self.test_status in ("RUNNING", "INIT"): + for index in range( + self._index, + min(self._index + self.WINDOWS_SIZE + 1, self._route_length), + ): # Get the dot product to know if it has passed this location route_transform = self._route_transforms[index] route_location = route_transform.location - wp_dir = route_transform.get_forward_vector() # Waypoint's forward vector - wp_veh = location - route_location # vector route - vehicle + wp_dir = ( + route_transform.get_forward_vector() + ) # Waypoint's forward vector + wp_veh = location - route_location # vector route - vehicle if wp_veh.dot(wp_dir) > 0: self._index = index self.actual_value = self._route_accum_perc[self._index] self.actual_value = round(self.actual_value, 2) - self._traffic_event.set_dict({'route_completed': self.actual_value}) - self._traffic_event.set_message("Agent has completed {} of the route".format(self.actual_value)) - - if self.actual_value > self.PERCENTAGE_THRESHOLD \ - and location.distance(self.target_location) < self.DISTANCE_THRESHOLD: + self._traffic_event.set_dict({"route_completed": self.actual_value}) + self._traffic_event.set_message( + "Agent has completed {} of the route".format(self.actual_value) + ) + + if ( + self.actual_value > self.PERCENTAGE_THRESHOLD + and location.distance(self.target_location) < self.DISTANCE_THRESHOLD + ): self.test_status = "SUCCESS" self.actual_value = 100 elif self.test_status == "SUCCESS": new_status = py_trees.common.Status.SUCCESS - self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status) + ) return new_status @@ -1609,8 +1896,10 @@ def terminate(self, new_status): """ self.actual_value = round(self.actual_value, 2) - self._traffic_event.set_dict({'route_completed': self.actual_value}) - self._traffic_event.set_message("Agent has completed {} of the route".format(self.actual_value)) + self._traffic_event.set_dict({"route_completed": self.actual_value}) + self._traffic_event.set_message( + "Agent has completed {} of the route".format(self.actual_value) + ) if self.test_status == "INIT": self.test_status = "FAILURE" @@ -1618,7 +1907,6 @@ def terminate(self, new_status): class RunningRedLightTest(Criterion): - """ Check if an actor is running a red light @@ -1626,13 +1914,16 @@ class RunningRedLightTest(Criterion): - actor: CARLA actor to be used for this test - terminate_on_failure [optional]: If True, the complete scenario will terminate upon failure of this test """ + DISTANCE_LIGHT = 15 # m def __init__(self, actor, name="RunningRedLightTest", terminate_on_failure=False): """ Init """ - super(RunningRedLightTest, self).__init__(name, actor, terminate_on_failure=terminate_on_failure) + super(RunningRedLightTest, self).__init__( + name, actor, terminate_on_failure=terminate_on_failure + ) self._world = CarlaDataProvider.get_world() self._map = CarlaDataProvider.get_map() self._list_traffic_lights = [] @@ -1641,7 +1932,7 @@ def __init__(self, actor, name="RunningRedLightTest", terminate_on_failure=False all_actors = CarlaDataProvider.get_all_actors() for _actor in all_actors: - if 'traffic_light' in _actor.type_id: + if "traffic_light" in _actor.type_id: center, waypoints = self.get_traffic_light_waypoints(_actor) self._list_traffic_lights.append((_actor, center, waypoints)) @@ -1650,8 +1941,12 @@ def is_vehicle_crossing_line(self, seg1, seg2): """ check if vehicle crosses a line segment """ - line1 = shapely.geometry.LineString([(seg1[0].x, seg1[0].y), (seg1[1].x, seg1[1].y)]) - line2 = shapely.geometry.LineString([(seg2[0].x, seg2[0].y), (seg2[1].x, seg2[1].y)]) + line1 = shapely.geometry.LineString( + [(seg1[0].x, seg1[0].y), (seg1[1].x, seg1[1].y)] + ) + line2 = shapely.geometry.LineString( + [(seg2[0].x, seg2[0].y), (seg2[1].x, seg2[1].y)] + ) inter = line1.intersection(line2) return not inter.is_empty @@ -1669,14 +1964,17 @@ def update(self): veh_extent = self.actor.bounding_box.extent.x - tail_close_pt = self.rotate_point(carla.Vector3D(-0.8 * veh_extent, 0, 0), transform.rotation.yaw) + tail_close_pt = self.rotate_point( + carla.Vector3D(-0.8 * veh_extent, 0, 0), transform.rotation.yaw + ) tail_close_pt = location + carla.Location(tail_close_pt) - tail_far_pt = self.rotate_point(carla.Vector3D(-veh_extent - 1, 0, 0), transform.rotation.yaw) + tail_far_pt = self.rotate_point( + carla.Vector3D(-veh_extent - 1, 0, 0), transform.rotation.yaw + ) tail_far_pt = location + carla.Location(tail_far_pt) for traffic_light, center, waypoints in self._list_traffic_lights: - if self.debug: z = 2.1 if traffic_light.state == carla.TrafficLightState.Red: @@ -1685,16 +1983,27 @@ def update(self): color = carla.Color(0, 155, 0) else: color = carla.Color(155, 155, 0) - self._world.debug.draw_point(center + carla.Location(z=z), size=0.2, color=color, life_time=0.01) + self._world.debug.draw_point( + center + carla.Location(z=z), size=0.2, color=color, life_time=0.01 + ) for wp in waypoints: text = "{}.{}".format(wp.road_id, wp.lane_id) self._world.debug.draw_string( - wp.transform.location + carla.Location(x=1, z=z), text, color=color, life_time=0.01) + wp.transform.location + carla.Location(x=1, z=z), + text, + color=color, + life_time=0.01, + ) self._world.debug.draw_point( - wp.transform.location + carla.Location(z=z), size=0.1, color=color, life_time=0.01) + wp.transform.location + carla.Location(z=z), + size=0.1, + color=color, + life_time=0.01, + ) center_loc = carla.Location(center) + # skip traffic lights that have already been passed, are too far or are not red if self._last_red_light_id and self._last_red_light_id == traffic_light.id: continue if center_loc.distance(location) > self.DISTANCE_LIGHT: @@ -1703,39 +2012,56 @@ def update(self): continue for wp in waypoints: - tail_wp = self._map.get_waypoint(tail_far_pt) # Calculate the dot product (Might be unscaled, as only its sign is important) - ve_dir = CarlaDataProvider.get_transform(self.actor).get_forward_vector() + ve_dir = CarlaDataProvider.get_transform( + self.actor + ).get_forward_vector() wp_dir = wp.transform.get_forward_vector() # Check the lane until all the "tail" has passed - if tail_wp.road_id == wp.road_id and tail_wp.lane_id == wp.lane_id and ve_dir.dot(wp_dir) > 0: + if ( + tail_wp.road_id == wp.road_id + and tail_wp.lane_id == wp.lane_id + and ve_dir.dot(wp_dir) > 0 + ): # This light is red and is affecting our lane yaw_wp = wp.transform.rotation.yaw lane_width = wp.lane_width location_wp = wp.transform.location - lft_lane_wp = self.rotate_point(carla.Vector3D(0.6 * lane_width, 0, 0), yaw_wp + 90) + lft_lane_wp = self.rotate_point( + carla.Vector3D(0.6 * lane_width, 0, 0), yaw_wp + 90 + ) lft_lane_wp = location_wp + carla.Location(lft_lane_wp) - rgt_lane_wp = self.rotate_point(carla.Vector3D(0.6 * lane_width, 0, 0), yaw_wp - 90) + rgt_lane_wp = self.rotate_point( + carla.Vector3D(0.6 * lane_width, 0, 0), yaw_wp - 90 + ) rgt_lane_wp = location_wp + carla.Location(rgt_lane_wp) # Is the vehicle traversing the stop line? - if self.is_vehicle_crossing_line((tail_close_pt, tail_far_pt), (lft_lane_wp, rgt_lane_wp)): - + if self.is_vehicle_crossing_line( + (tail_close_pt, tail_far_pt), (lft_lane_wp, rgt_lane_wp) + ): self.test_status = "FAILURE" self.actual_value += 1 location = traffic_light.get_transform().location - red_light_event = TrafficEvent(event_type=TrafficEventType.TRAFFIC_LIGHT_INFRACTION, frame=GameTime.get_frame()) + red_light_event = TrafficEvent( + event_type=TrafficEventType.TRAFFIC_LIGHT_INFRACTION, + frame=GameTime.get_frame(), + ) red_light_event.set_message( "Agent ran a red light {} at (x={}, y={}, z={})".format( traffic_light.id, round(location.x, 3), round(location.y, 3), - round(location.z, 3))) - red_light_event.set_dict({'id': traffic_light.id, 'location': location}) + round(location.z, 3), + ) + ) + red_light_event.set_dict( + {"id": traffic_light.id, "location": location} + ) self.events.append(red_light_event) self._last_red_light_id = traffic_light.id @@ -1744,7 +2070,9 @@ def update(self): if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE - self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status) + ) return new_status @@ -1752,8 +2080,14 @@ def rotate_point(self, point, angle): """ rotate a given point by a given angle """ - x_ = math.cos(math.radians(angle)) * point.x - math.sin(math.radians(angle)) * point.y - y_ = math.sin(math.radians(angle)) * point.x + math.cos(math.radians(angle)) * point.y + x_ = ( + math.cos(math.radians(angle)) * point.x + - math.sin(math.radians(angle)) * point.y + ) + y_ = ( + math.sin(math.radians(angle)) * point.x + + math.cos(math.radians(angle)) * point.y + ) return carla.Vector3D(x_, y_, point.z) def get_traffic_light_waypoints(self, traffic_light): @@ -1766,7 +2100,9 @@ def get_traffic_light_waypoints(self, traffic_light): # Discretize the trigger box into points area_ext = traffic_light.trigger_volume.extent - x_values = np.arange(-0.9 * area_ext.x, 0.9 * area_ext.x, 1.0) # 0.9 to avoid crossing to adjacent lanes + x_values = np.arange( + -0.9 * area_ext.x, 0.9 * area_ext.x, 1.0 + ) # 0.9 to avoid crossing to adjacent lanes area = [] for x in x_values: @@ -1779,7 +2115,11 @@ def get_traffic_light_waypoints(self, traffic_light): for pt in area: wpx = self._map.get_waypoint(pt) # As x_values are arranged in order, only the last one has to be checked - if not ini_wps or ini_wps[-1].road_id != wpx.road_id or ini_wps[-1].lane_id != wpx.lane_id: + if ( + not ini_wps + or ini_wps[-1].road_id != wpx.road_id + or ini_wps[-1].lane_id != wpx.lane_id + ): ini_wps.append(wpx) # Advance them until the intersection @@ -1797,7 +2137,6 @@ def get_traffic_light_waypoints(self, traffic_light): class RunningStopTest(Criterion): - """ Check if an actor is running a stop sign @@ -1805,14 +2144,16 @@ class RunningStopTest(Criterion): - actor: CARLA actor to be used for this test - terminate_on_failure [optional]: If True, the complete scenario will terminate upon failure of this test """ + PROXIMITY_THRESHOLD = 4.0 # Stops closer than this distance will be detected [m] - SPEED_THRESHOLD = 0.1 # Minimum speed to consider the actor has stopped [m/s] + SPEED_THRESHOLD = 0.1 # Minimum speed to consider the actor has stopped [m/s] WAYPOINT_STEP = 0.5 # m def __init__(self, actor, name="RunningStopTest", terminate_on_failure=False): - """ - """ - super(RunningStopTest, self).__init__(name, actor, terminate_on_failure=terminate_on_failure) + """ """ + super(RunningStopTest, self).__init__( + name, actor, terminate_on_failure=terminate_on_failure + ) self._world = CarlaDataProvider.get_world() self._map = CarlaDataProvider.get_map() self._list_stop_signs = [] @@ -1822,16 +2163,25 @@ def __init__(self, actor, name="RunningStopTest", terminate_on_failure=False): self._last_failed_stop = None for _actor in CarlaDataProvider.get_all_actors(): - if 'traffic.stop' in _actor.type_id: + if "traffic.stop" in _actor.type_id: self._list_stop_signs.append(_actor) def point_inside_boundingbox(self, point, bb_center, bb_extent, multiplier=1.2): """Checks whether or not a point is inside a bounding box.""" # pylint: disable=invalid-name - A = carla.Vector2D(bb_center.x - multiplier * bb_extent.x, bb_center.y - multiplier * bb_extent.y) - B = carla.Vector2D(bb_center.x + multiplier * bb_extent.x, bb_center.y - multiplier * bb_extent.y) - D = carla.Vector2D(bb_center.x - multiplier * bb_extent.x, bb_center.y + multiplier * bb_extent.y) + A = carla.Vector2D( + bb_center.x - multiplier * bb_extent.x, + bb_center.y - multiplier * bb_extent.y, + ) + B = carla.Vector2D( + bb_center.x + multiplier * bb_extent.x, + bb_center.y - multiplier * bb_extent.y, + ) + D = carla.Vector2D( + bb_center.x - multiplier * bb_extent.x, + bb_center.y + multiplier * bb_extent.y, + ) M = carla.Vector2D(point.x, point.y) AB = B - A @@ -1859,7 +2209,9 @@ def is_actor_affected_by_stop(self, wp_list, stop): # Using more than one waypoint removes issues with small trigger volumes and backwards movement stop_extent = stop.trigger_volume.extent for actor_wp in wp_list: - if self.point_inside_boundingbox(actor_wp.transform.location, stop_location, stop_extent): + if self.point_inside_boundingbox( + actor_wp.transform.location, stop_location, stop_extent + ): return True return False @@ -1915,7 +2267,9 @@ def update(self): check_wps = self._get_waypoints(self.actor) if not self._target_stop_sign: - self._target_stop_sign = self._scan_for_stop_sign(actor_transform, check_wps) + self._target_stop_sign = self._scan_for_stop_sign( + actor_transform, check_wps + ) return new_status if not self._stop_completed: @@ -1924,19 +2278,29 @@ def update(self): self._stop_completed = True if not self.is_actor_affected_by_stop(check_wps, self._target_stop_sign): - if not self._stop_completed and self._last_failed_stop != self._target_stop_sign.id: + if ( + not self._stop_completed + and self._last_failed_stop != self._target_stop_sign.id + ): # did we stop? self.actual_value += 1 self.test_status = "FAILURE" stop_location = self._target_stop_sign.get_transform().location - running_stop_event = TrafficEvent(event_type=TrafficEventType.STOP_INFRACTION, frame=GameTime.get_frame()) + running_stop_event = TrafficEvent( + event_type=TrafficEventType.STOP_INFRACTION, + frame=GameTime.get_frame(), + ) running_stop_event.set_message( "Agent ran a stop with id={} at (x={}, y={}, z={})".format( self._target_stop_sign.id, round(stop_location.x, 3), round(stop_location.y, 3), - round(stop_location.z, 3))) - running_stop_event.set_dict({'id': self._target_stop_sign.id, 'location': stop_location}) + round(stop_location.z, 3), + ) + ) + running_stop_event.set_dict( + {"id": self._target_stop_sign.id, "location": stop_location} + ) self.events.append(running_stop_event) @@ -1949,13 +2313,14 @@ def update(self): if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE - self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status) + ) return new_status class MinimumSpeedRouteTest(Criterion): - """ Check at which stage of the route is the actor at each tick @@ -1964,12 +2329,19 @@ class MinimumSpeedRouteTest(Criterion): - route: Route to be checked - terminate_on_failure [optional]: If True, the complete scenario will terminate upon failure of this test """ + WINDOWS_SIZE = 2 RATIO = 1 - def __init__(self, actor, route, checkpoints=1, name="MinimumSpeedRouteTest", terminate_on_failure=False): - """ - """ + def __init__( + self, + actor, + route, + checkpoints=1, + name="MinimumSpeedRouteTest", + terminate_on_failure=False, + ): + """ """ super().__init__(name, actor, terminate_on_failure=terminate_on_failure) self.units = "%" self.success_value = 100 @@ -1999,7 +2371,6 @@ def __init__(self, actor, route, checkpoints=1, name="MinimumSpeedRouteTest", te self._index = 0 - def update(self): """ Check if the actor location is within trigger region @@ -2009,12 +2380,14 @@ def update(self): if self._terminate_on_failure and (self.test_status == "FAILURE"): new_status = py_trees.common.Status.FAILURE - # Check the actor progress through the route + # Check the actor progress through the route location = CarlaDataProvider.get_location(self.actor) if location is None: return new_status - for index in range(self._index, min(self._index + self.WINDOWS_SIZE + 1, self._route_length)): + for index in range( + self._index, min(self._index + self.WINDOWS_SIZE + 1, self._route_length) + ): # Get the dot product to know if it has passed this location route_transform = self._route[index][0] route_location = route_transform.location @@ -2037,8 +2410,10 @@ def update(self): return new_status # Get the speed of the surrounding Background Activity - all_vehicles = CarlaDataProvider.get_all_actors().filter('vehicle*') - background_vehicles = [v for v in all_vehicles if v.attributes['role_name'] == 'background'] + all_vehicles = CarlaDataProvider.get_all_actors().filter("vehicle*") + background_vehicles = [ + v for v in all_vehicles if v.attributes["role_name"] == "background" + ] if background_vehicles: frame_mean_speed = 0 @@ -2051,16 +2426,19 @@ def update(self): self._actor_speed += velocity self._speed_points += 1 - self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status) + ) return new_status def _set_traffic_event(self): - if self._speed_points > 0 and self._mean_speed: self._mean_speed /= self._speed_points self._actor_speed /= self._speed_points - checkpoint_value = round(self._actor_speed / (self.RATIO * self._mean_speed) * 100, 2) + checkpoint_value = round( + self._actor_speed / (self.RATIO * self._mean_speed) * 100, 2 + ) else: checkpoint_value = 100 @@ -2069,9 +2447,13 @@ def _set_traffic_event(self): else: self.test_status = "FAILURE" - self._traffic_event = TrafficEvent(TrafficEventType.MIN_SPEED_INFRACTION, GameTime.get_frame()) - self._traffic_event.set_dict({'percentage': checkpoint_value}) - self._traffic_event.set_message(f"Average speed is {checkpoint_value}% of the surrounding traffic's one") + self._traffic_event = TrafficEvent( + TrafficEventType.MIN_SPEED_INFRACTION, GameTime.get_frame() + ) + self._traffic_event.set_dict({"percentage": checkpoint_value}) + self._traffic_event.set_message( + f"Average speed is {checkpoint_value}% of the surrounding traffic's one" + ) self.events.append(self._traffic_event) self._checkpoint_values.append(checkpoint_value) @@ -2086,12 +2468,13 @@ def terminate(self, new_status): self._set_traffic_event() if len(self._checkpoint_values): - self.actual_value = round(sum(self._checkpoint_values) / len(self._checkpoint_values), 2) + self.actual_value = round( + sum(self._checkpoint_values) / len(self._checkpoint_values), 2 + ) super().terminate(new_status) class YieldToEmergencyVehicleTest(Criterion): - """ Atomic Criterion to detect if the actor yields its lane to the emergency vehicle behind it. Detection is done by checking if the ev is in front of the actor @@ -2102,7 +2485,7 @@ class YieldToEmergencyVehicleTest(Criterion): optional (bool): If True, the result is not considered for an overall pass/fail result """ - WAITING_TIME_THRESHOLD = 15 # Maximum time for actor to block ev + WAITING_TIME_THRESHOLD = 15 # Maximum time for actor to block ev def __init__(self, actor, ev, optional=False, name="YieldToEmergencyVehicleTest"): """ @@ -2136,7 +2519,9 @@ def update(self): else: self.test_status = "SUCCESS" - self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status) + ) return new_status def terminate(self, new_status): @@ -2144,8 +2529,12 @@ def terminate(self, new_status): # Terminates are called multiple times. Do this only once if not self._terminated: if self.test_status == "FAILURE": - traffic_event = TrafficEvent(TrafficEventType.YIELD_TO_EMERGENCY_VEHICLE, GameTime.get_frame()) - traffic_event.set_message("Agent failed to yield to an emergency vehicle") + traffic_event = TrafficEvent( + TrafficEventType.YIELD_TO_EMERGENCY_VEHICLE, GameTime.get_frame() + ) + traffic_event.set_message( + "Agent failed to yield to an emergency vehicle" + ) self.events.append(traffic_event) self._terminated = True @@ -2154,7 +2543,6 @@ def terminate(self, new_status): class ScenarioTimeoutTest(Criterion): - """ Atomic Criterion to detect if the actor has been incapable of finishing an scenario @@ -2163,7 +2551,9 @@ class ScenarioTimeoutTest(Criterion): optional (bool): If True, the result is not considered for an overall pass/fail result """ - def __init__(self, actor, scenario_name, optional=False, name="ScenarioTimeoutTest"): + def __init__( + self, actor, scenario_name, optional=False, name="ScenarioTimeoutTest" + ): """ Constructor """ @@ -2175,7 +2565,9 @@ def __init__(self, actor, scenario_name, optional=False, name="ScenarioTimeoutTe def update(self): """wait""" new_status = py_trees.common.Status.RUNNING - self.logger.debug("%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status)) + self.logger.debug( + "%s.update()[%s->%s]" % (self.__class__.__name__, self.status, new_status) + ) return new_status def terminate(self, new_status): @@ -2188,7 +2580,9 @@ def terminate(self, new_status): self.actual_value = 1 self.test_status = "FAILURE" - traffic_event = TrafficEvent(event_type=TrafficEventType.SCENARIO_TIMEOUT, frame=GameTime.get_frame()) + traffic_event = TrafficEvent( + event_type=TrafficEventType.SCENARIO_TIMEOUT, frame=GameTime.get_frame() + ) traffic_event.set_message("Agent timed out a scenario") self.events.append(traffic_event) py_trees.blackboard.Blackboard().set(blackboard_name, None, True) diff --git a/srunner/scenarios/open_scenario.py b/srunner/scenarios/open_scenario.py deleted file mode 100644 index c7f58d0..0000000 --- a/srunner/scenarios/open_scenario.py +++ /dev/null @@ -1,617 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2019-2020 Intel Corporation -# -# This work is licensed under the terms of the MIT license. -# For a copy, see . - -""" -Basic scenario class using the OpenSCENARIO definition -""" - -from __future__ import print_function - -from distutils.util import strtobool -import itertools -import os -import py_trees - -from srunner.scenariomanager.carla_data_provider import CarlaDataProvider -from srunner.scenariomanager.scenarioatomics.atomic_behaviors import ChangeWeather, ChangeRoadFriction, ChangeParameter, \ - ChangeActorLaneOffset, ChangeActorWaypoints, ChangeLateralDistance -from srunner.scenariomanager.scenarioatomics.atomic_behaviors import ChangeActorControl, ChangeActorTargetSpeed -from srunner.scenariomanager.timer import GameTime -from srunner.scenariomanager.weather_sim import OSCWeatherBehavior -from srunner.scenarios.basic_scenario import BasicScenario -from srunner.tools.openscenario_parser import OpenScenarioParser, oneshot_with_check, ParameterRef -from srunner.tools.py_trees_port import Decorator - - -def repeatable_behavior(behaviour, name=None): - """ - This behaviour allows a composite with oneshot ancestors to run multiple - times, resetting the oneshot variables after each execution - """ - if not name: - name = behaviour.name - clear_descendant_variables = ClearBlackboardVariablesStartingWith( - name="Clear Descendant Variables of {}".format(name), - variable_name_beginning=name + ">" - ) - # If it's a sequence, don't double-nest it in a redundant manner - if isinstance(behaviour, py_trees.composites.Sequence): - behaviour.add_child(clear_descendant_variables) - sequence = behaviour - else: - sequence = py_trees.composites.Sequence(name="RepeatableBehaviour of {}".format(name)) - sequence.add_children([behaviour, clear_descendant_variables]) - return sequence - - -class ClearBlackboardVariablesStartingWith(py_trees.behaviours.Success): - - """ - Clear the values starting with the specified string from the blackboard. - - Args: - name (:obj:`str`): name of the behaviour - variable_name_beginning (:obj:`str`): beginning of the names of variable to clear - """ - - def __init__(self, - name="Clear Blackboard Variable Starting With", - variable_name_beginning="dummy", - ): - super(ClearBlackboardVariablesStartingWith, self).__init__(name) - self.variable_name_beginning = variable_name_beginning - - def initialise(self): - """ - Delete the variables from the blackboard. - """ - blackboard_variables = [key for key, _ in py_trees.blackboard.Blackboard().__dict__.items( - ) if key.startswith(self.variable_name_beginning)] - for variable in blackboard_variables: - delattr(py_trees.blackboard.Blackboard(), variable) - - -class StoryElementStatusToBlackboard(Decorator): - - """ - Reflect the status of the decorator's child story element to the blackboard. - - Args: - child: the child behaviour or subtree - story_element_type: the element type [act,scene,maneuver,event,action] - element_name: the story element's name attribute - """ - - def __init__(self, child, story_element_type, element_name): - super(StoryElementStatusToBlackboard, self).__init__(name=child.name, child=child) - self.story_element_type = story_element_type - self.element_name = element_name - self.blackboard = py_trees.blackboard.Blackboard() - - def initialise(self): - """ - Record the elements's start time on the blackboard - """ - self.blackboard.set( - name="({}){}-{}".format(self.story_element_type.upper(), - self.element_name, "START"), - value=GameTime.get_time(), - overwrite=True - ) - - def update(self): - """ - Reflect the decorated child's status - Returns: the decorated child's status - """ - return self.decorated.status - - def terminate(self, new_status): - """ - Terminate and mark Blackboard entry with END - """ - # Report whether we ended with End or Cancel - # If we were ended or cancelled, our state will be INVALID and - # We will have an ancestor (a parallel SUCCESS_ON_ALL) with a successful child/children - # It's possible we ENDed AND CANCELled if both condition groups were true simultaneously - # NOTE 'py_trees.common.Status.INVALID' is the status of a behaviur which was terminated by a parent - rules = [] - if new_status == py_trees.common.Status.INVALID: - # We were terminated from above unnaturally - # Figure out if were ended or cancelled - terminating_ancestor = self.parent - while terminating_ancestor.status == py_trees.common.Status.INVALID: - terminating_ancestor = terminating_ancestor.parent - # We have found an ancestory which was not terminated by a parent - # Check what caused it to terminate its children - if terminating_ancestor.status == py_trees.common.Status.SUCCESS: - successful_children = [ - child.name - for child - in terminating_ancestor.children - if child.status == py_trees.common.Status.SUCCESS] - if "StopTrigger" in successful_children: - rules.append("END") - - # END is the default status unless we have a more detailed one - rules = rules or ["END"] - - for rule in rules: - self.blackboard.set( - name="({}){}-{}".format(self.story_element_type.upper(), - self.element_name, rule), - value=GameTime.get_time(), - overwrite=True - ) - - -def get_xml_path(tree, node): - """ - Extract the full path of a node within an XML tree - - Note: Catalogs are pulled from a separate file so the XML tree is split. - This means that in order to get the XML path, it must be done in 2 steps. - Some places in this python script do that by concatenating the results - of 2 get_xml_path calls with another ">". - Example: "Behavior>AutopilotSequence" + ">" + "StartAutopilot>StartAutopilot>StartAutopilot" - """ - - path = "" - parent_map = {c: p for p in tree.iter() for c in p} - - cur_node = node - while cur_node != tree: - path = "{}>{}".format(cur_node.attrib.get('name'), path) - cur_node = parent_map[cur_node] - - path = path[:-1] - return path - - -class OpenScenario(BasicScenario): - - """ - Implementation of the OpenSCENARIO scenario - """ - - def __init__(self, world, ego_vehicles, config, config_file, debug_mode=False, criteria_enable=True, timeout=300): - """ - Setup all relevant parameters and create scenario - """ - self.config = config - self.route = None - self.config_file = config_file - # Timeout of scenario in seconds - self.timeout = timeout - - super(OpenScenario, self).__init__(self.config.name, ego_vehicles=ego_vehicles, config=config, - world=world, debug_mode=debug_mode, - terminate_on_failure=False, criteria_enable=criteria_enable) - - def _initialize_parameters(self): - """ - Parse ParameterAction from Init and update global osc parameters. - """ - param_behavior = py_trees.composites.Parallel( - policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ALL, name="ParametersInit") - for i, global_action in enumerate(self.config.init.find('Actions').iter('GlobalAction')): - maneuver_name = 'InitParams' - if global_action.find('ParameterAction') is not None: - parameter_action = global_action.find('ParameterAction') - parameter_ref = parameter_action.attrib.get('parameterRef') - if parameter_action.find('ModifyAction') is not None: - action_rule = parameter_action.find('ModifyAction').find("Rule") - if action_rule.find("AddValue") is not None: - rule, value = '+', action_rule.find("AddValue").attrib.get('value') - else: - rule, value = '*', action_rule.find("MultiplyByValue").attrib.get('value') - else: - rule, value = None, parameter_action.find('SetAction').attrib.get('value') - parameter_update = ChangeParameter(parameter_ref, value=ParameterRef(value), rule=rule, - name=maneuver_name + '_%d' % i) - param_behavior.add_child(oneshot_with_check(variable_name="InitialParameters" + '_%d' % i, - behaviour=parameter_update)) - - return param_behavior - - def _initialize_environment(self, world): - """ - Initialization of weather and road friction. - """ - pass - - def _create_environment_behavior(self): - # Set the appropriate weather conditions - - env_behavior = py_trees.composites.Parallel( - policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ALL, name="EnvironmentBehavior") - - weather_update = ChangeWeather( - OpenScenarioParser.get_weather_from_env_action(self.config.init, self.config.catalogs)) - road_friction = ChangeRoadFriction( - OpenScenarioParser.get_friction_from_env_action(self.config.init, self.config.catalogs)) - env_behavior.add_child(oneshot_with_check(variable_name="InitialWeather", behaviour=weather_update)) - env_behavior.add_child(oneshot_with_check(variable_name="InitRoadFriction", behaviour=road_friction)) - - return env_behavior - - def _create_init_behavior(self): - - init_behavior = py_trees.composites.Parallel( - policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ALL, name="InitBehaviour") - - actor_list = self.other_actors + self.ego_vehicles + [None] - - for actor in self.config.other_actors + self.config.ego_vehicles: - for carla_actor in self.other_actors + self.ego_vehicles: - if (carla_actor is not None and 'role_name' in carla_actor.attributes and - carla_actor.attributes['role_name'] == actor.rolename): - actor_init_behavior = py_trees.composites.Sequence(name="InitActor{}".format(actor.rolename)) - - controller_atomic = None - atomic = None - for private in self.config.init.iter("Private"): - if private.attrib.get('entityRef', None) == actor.rolename: - for private_action in private.iter("PrivateAction"): - if private_action.find('ControllerAction') is not None: - for controller_action in private_action.iter('ControllerAction'): - module, args = OpenScenarioParser.get_controller( - controller_action, self.config.catalogs) - controller_atomic = ChangeActorControl( - carla_actor, control_py_module=module, args=args, - scenario_file_path=os.path.dirname(self.config.filename)) - - elif private_action.find('LateralAction') is not None: - private_action = private_action.find('LateralAction') - if private_action.find('LaneOffsetAction') is not None: - lat_maneuver = private_action.find('LaneOffsetAction') - continuous = bool(strtobool(lat_maneuver.attrib.get('continuous', "true"))) - # Parsing of the different Dynamic shapes is missing - lane_target_offset = lat_maneuver.find('LaneOffsetTarget') - if lane_target_offset.find('AbsoluteTargetLaneOffset') is not None: - absolute_offset = ParameterRef( - lane_target_offset.find('AbsoluteTargetLaneOffset').attrib.get('value', - 0)) - atomic = ChangeActorLaneOffset( - carla_actor, absolute_offset, continuous=continuous, - name='LaneOffsetAction') - - elif lane_target_offset.find('RelativeTargetLaneOffset') is not None: - relative_target_offset = lane_target_offset.find('RelativeTargetLaneOffset') - relative_offset = ParameterRef( - relative_target_offset.attrib.get('value', 0)) - relative_actor_name = relative_target_offset.attrib.get('entityRef', None) - relative_actor = None - for _actor in actor_list: - if _actor is not None and 'role_name' in _actor.attributes: - if relative_actor_name == _actor.attributes['role_name']: - relative_actor = _actor - break - if relative_actor is None: - raise AttributeError( - "Cannot find actor '{}' for condition".format(relative_actor_name)) - atomic = ChangeActorLaneOffset(carla_actor, relative_offset, relative_actor, - continuous=continuous, - name='LaneOffsetAction') - if private_action.find("LateralDistanceAction") is not None: - lat_maneuver = private_action.find('LateralDistanceAction') - maneuver_name = "LateralDistanceActionInit" - continuous = bool(strtobool(lat_maneuver.attrib.get('continuous', "false"))) - freespace = bool(strtobool(lat_maneuver.attrib.get('freespace', "false"))) - distance = ParameterRef(lat_maneuver.attrib.get('distance', float("inf"))) - constraints = lat_maneuver.find('DynamicConstraints') - max_speed = constraints.attrib.get('maxSpeed', - None) if constraints is not None else None - relative_actor = None - relative_actor_name = lat_maneuver.attrib.get('entityRef', None) - for _actor in actor_list: - if _actor is not None and 'role_name' in _actor.attributes: - if relative_actor_name == _actor.attributes['role_name']: - relative_actor = _actor - break - if relative_actor is None: - raise AttributeError( - "Cannot find actor '{}' for condition".format(relative_actor_name)) - atomic = ChangeLateralDistance(carla_actor, distance, relative_actor, - continuous=continuous, freespace=freespace, - name=maneuver_name) - - elif private_action.find('RoutingAction') is not None: - private_action = private_action.find('RoutingAction') - if private_action.find('AssignRouteAction') is not None: - route_action = private_action.find('AssignRouteAction') - waypoints = OpenScenarioParser.get_route(route_action, self.config.catalogs) - atomic = ChangeActorWaypoints(carla_actor, waypoints=waypoints, - name="AssignRouteAction") - elif private_action.find('FollowTrajectoryAction') is not None: - trajectory_action = private_action.find('FollowTrajectoryAction') - waypoints, times = OpenScenarioParser.get_trajectory(trajectory_action, - self.config.catalogs) - atomic = ChangeActorWaypoints(carla_actor, waypoints=list( - zip(waypoints, ['shortest'] * len(waypoints))), - times=times, name="FollowTrajectoryAction") - elif private_action.find('AcquirePositionAction') is not None: - route_action = private_action.find('AcquirePositionAction') - osc_position = route_action.find('Position') - waypoints = [(osc_position, 'fastest')] - atomic = ChangeActorWaypoints(carla_actor, waypoints=waypoints, - name="AcquirePositionAction") - - if controller_atomic is None: - controller_atomic = ChangeActorControl(carla_actor, control_py_module=None, args={}) - actor_init_behavior.add_child(controller_atomic) - if atomic is not None: - actor_init_behavior.add_child(atomic) - - if actor.speed > 0: - actor_init_behavior.add_child(ChangeActorTargetSpeed(carla_actor, actor.speed, init_speed=True)) - - init_behavior.add_child(actor_init_behavior) - break - - return init_behavior - - def _create_behavior(self): - """ - Basic behavior do nothing, i.e. Idle - """ - - stories_behavior = py_trees.composites.Parallel(policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ALL, - name="OSCStories") - joint_actor_list = self.other_actors + self.ego_vehicles + [None] - - for story in self.config.stories: - story_name = story.get("name") - story_behavior = py_trees.composites.Parallel(policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ALL, - name=story_name) - for act in story.iter("Act"): - - act_sequence = py_trees.composites.Sequence( - name="Act StartConditions and behaviours") - - start_conditions = py_trees.composites.Parallel( - policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE, name="StartConditions Group") - - parallel_behavior = py_trees.composites.Parallel( - policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE, name="Maneuver + EndConditions Group") - - parallel_sequences = py_trees.composites.Parallel( - policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ALL, name="Maneuvers") - - for sequence in act.iter("ManeuverGroup"): - sequence_behavior = py_trees.composites.Sequence(name=sequence.attrib.get('name')) - repetitions = sequence.attrib.get('maximumExecutionCount', 1) - - for _ in range(int(repetitions)): - - actor_ids = [] - for actor in sequence.iter("Actors"): - for entity in actor.iter("EntityRef"): - entity_name = entity.attrib.get('entityRef', None) - for k, _ in enumerate(joint_actor_list): - if (joint_actor_list[k] and - entity_name == joint_actor_list[k].attributes['role_name']): - actor_ids.append(k) - break - - if not actor_ids: - print("Warning: Maneuvergroup {} does not use reference actors!".format( - sequence.attrib.get('name'))) - actor_ids.append(len(joint_actor_list) - 1) - - # Collect catalog reference maneuvers in order to process them at the same time as normal maneuvers - catalog_maneuver_list = [] - for catalog_reference in sequence.iter("CatalogReference"): - catalog_maneuver = OpenScenarioParser.get_catalog_entry(self.config.catalogs, - catalog_reference) - catalog_maneuver_list.append(catalog_maneuver) - all_maneuvers = itertools.chain(iter(catalog_maneuver_list), sequence.iter("Maneuver")) - single_sequence_iteration = py_trees.composites.Parallel( - policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ALL, name=sequence_behavior.name) - for maneuver in all_maneuvers: # Iterates through both CatalogReferences and Maneuvers - maneuver_parallel = py_trees.composites.Parallel( - policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ALL, - name="Maneuver " + maneuver.attrib.get('name')) - for event in maneuver.iter("Event"): - event_sequence = py_trees.composites.Sequence( - name="Event " + event.attrib.get('name')) - parallel_actions = py_trees.composites.Parallel( - policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ALL, name="Actions") - for child in event.iter(): - if child.tag == "Action": - for actor_id in actor_ids: - maneuver_behavior = OpenScenarioParser.convert_maneuver_to_atomic( - child, joint_actor_list[actor_id], - joint_actor_list, self.config.catalogs, self.config) - maneuver_behavior = StoryElementStatusToBlackboard( - maneuver_behavior, "ACTION", child.attrib.get('name')) - parallel_actions.add_child( - oneshot_with_check(variable_name= # See note in get_xml_path - get_xml_path(story, sequence) + '>' + \ - get_xml_path(maneuver, child) + '>' + \ - str(actor_id), - behaviour=maneuver_behavior)) - - if child.tag == "StartTrigger": - # There is always one StartConditions block per Event - parallel_condition_groups = self._create_condition_container( - child, story, "Parallel Condition Groups", sequence, maneuver) - event_sequence.add_child( - parallel_condition_groups) - - parallel_actions = StoryElementStatusToBlackboard( - parallel_actions, "EVENT", event.attrib.get('name')) - event_sequence.add_child(parallel_actions) - maneuver_parallel.add_child( - oneshot_with_check(variable_name=get_xml_path(story, sequence) + '>' + - get_xml_path(maneuver, event), # See get_xml_path - behaviour=event_sequence)) - maneuver_parallel = StoryElementStatusToBlackboard( - maneuver_parallel, "MANEUVER", maneuver.attrib.get('name')) - single_sequence_iteration.add_child( - oneshot_with_check(variable_name=get_xml_path(story, sequence) + '>' + - maneuver.attrib.get('name'), # See get_xml_path - behaviour=maneuver_parallel)) - - # OpenSCENARIO refers to Sequences as Scenes in this instance - single_sequence_iteration = StoryElementStatusToBlackboard( - single_sequence_iteration, "SCENE", sequence.attrib.get('name')) - single_sequence_iteration = repeatable_behavior( - single_sequence_iteration, get_xml_path(story, sequence)) - - sequence_behavior.add_child(single_sequence_iteration) - - if sequence_behavior.children: - parallel_sequences.add_child( - oneshot_with_check(variable_name=get_xml_path(story, sequence), - behaviour=sequence_behavior)) - - if parallel_sequences.children: - parallel_sequences = StoryElementStatusToBlackboard( - parallel_sequences, "ACT", act.attrib.get('name')) - parallel_behavior.add_child(parallel_sequences) - - start_triggers = act.find("StartTrigger") - if list(start_triggers) is not None: - for start_condition in start_triggers: - parallel_start_criteria = self._create_condition_container(start_condition, - story, - "StartConditions") - if parallel_start_criteria.children: - start_conditions.add_child(parallel_start_criteria) - end_triggers = act.find("StopTrigger") - if end_triggers is not None and list(end_triggers) is not None: - for end_condition in end_triggers: - parallel_end_criteria = self._create_condition_container( - end_condition, story, "EndConditions", success_on_all=False) - if parallel_end_criteria.children: - parallel_behavior.add_child(parallel_end_criteria) - - if start_conditions.children: - act_sequence.add_child(start_conditions) - if parallel_behavior.children: - act_sequence.add_child(parallel_behavior) - - if act_sequence.children: - story_behavior.add_child(act_sequence) - - stories_behavior.add_child(oneshot_with_check(variable_name=get_xml_path(story, story) + '>' + - story_name, # See get_xml_path - behaviour=story_behavior)) - - # Build behavior tree - behavior = py_trees.composites.Parallel( - policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ALL, name="behavior") - - init_parameters = self._initialize_parameters() - if init_parameters is not None: - behavior.add_child(oneshot_with_check(variable_name="InitialParameterSettings", behaviour=init_parameters)) - - env_behavior = self._create_environment_behavior() - if env_behavior is not None: - behavior.add_child(oneshot_with_check(variable_name="InitialEnvironmentSettings", behaviour=env_behavior)) - - init_behavior = self._create_init_behavior() - if init_behavior is not None: - behavior.add_child(oneshot_with_check(variable_name="InitialActorSettings", behaviour=init_behavior)) - - behavior.add_child(stories_behavior) - - return behavior - - def _create_weather_behavior(self): - """ - Sets the osc weather behavior, which will monitor other behaviors, changing the weather - """ - return OSCWeatherBehavior() - - def _create_condition_container(self, node, story, name='Conditions Group', sequence=None, - maneuver=None, success_on_all=True): - """ - This is a generic function to handle conditions utilising ConditionGroups - Each ConditionGroup is represented as a Sequence of Conditions - The ConditionGroups are grouped under a SUCCESS_ON_ONE Parallel - """ - - parallel_condition_groups = py_trees.composites.Parallel(name, - policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE) - - for condition_group in node.iter("ConditionGroup"): - if success_on_all: - condition_group_sequence = py_trees.composites.Parallel( - name="Condition Group", policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ALL) - else: - condition_group_sequence = py_trees.composites.Parallel( - name="Condition Group", policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE) - for condition in condition_group.iter("Condition"): - criterion = OpenScenarioParser.convert_condition_to_atomic( - condition, self.other_actors + self.ego_vehicles) - if sequence is not None and maneuver is not None: - xml_path = get_xml_path(story, sequence) + '>' + \ - get_xml_path(maneuver, condition) # See note in get_xml_path - else: - xml_path = get_xml_path(story, condition) - criterion = oneshot_with_check(variable_name=xml_path, behaviour=criterion) - condition_group_sequence.add_child(criterion) - - if condition_group_sequence.children: - parallel_condition_groups.add_child(condition_group_sequence) - - return parallel_condition_groups - - def _create_test_criteria(self): - """ - A list of all test criteria will be created that is later used - in parallel behavior tree. - """ - parallel_criteria = py_trees.composites.Parallel("EndConditions (Criteria Group)", - policy=py_trees.common.ParallelPolicy.SUCCESS_ON_ONE) - - criteria = [] - for endcondition in self.config.storyboard.iter("StopTrigger"): - for condition in endcondition.iter("Condition"): - if condition.attrib.get('name').startswith('criteria_'): - criteria.append(condition) - - for condition in criteria: - criterion = OpenScenarioParser.convert_condition_to_atomic(condition, self.ego_vehicles) - parallel_criteria.add_child(criterion) - - return parallel_criteria - - def __del__(self): - """ - Remove all actors upon deletion - """ - self.remove_all_actors() - - def _initialize_actors(self, config): - """ - Override the superclass method to initialize other actors - """ - if config.other_actors: - for global_action in self.config.init.find("Actions").iter("GlobalAction"): - if global_action.find("EntityAction") is not None: - entity_action = global_action.find("EntityAction") - entity_ref = entity_action.attrib.get("entityRef") - if entity_action.find('AddEntityAction') is not None: - position = entity_action.find('AddEntityAction').find("Position") - actor_transform = OpenScenarioParser.convert_position_to_transform( - position, actor_list=config.other_actors + config.ego_vehicles) - for actor in config.other_actors: - if actor.rolename == entity_ref: - actor.transform = actor_transform - elif entity_action.find('DeleteEntityAction') is not None: - for actor in config.other_actors: - if actor.rolename == entity_ref: - config.other_actors.remove(actor) - - new_actors = CarlaDataProvider.request_new_actors(config.other_actors) - if not new_actors: - raise Exception("Error: Unable to add actors") - for new_actor in new_actors: - self.other_actors.append(new_actor) diff --git a/srunner/scenarios/osc2_scenario.py b/srunner/scenarios/osc2_scenario.py deleted file mode 100644 index f36dc6a..0000000 --- a/srunner/scenarios/osc2_scenario.py +++ /dev/null @@ -1,1447 +0,0 @@ -from __future__ import print_function - -import copy -import math -import operator -import random -import re -import sys -from typing import List, Tuple - -import py_trees -from agents.navigation.global_route_planner import GlobalRoutePlanner - -from srunner.osc2.ast_manager import ast_node -from srunner.osc2.ast_manager.ast_vistor import ASTVisitor - -# OSC2 -from srunner.osc2.symbol_manager.method_symbol import MethodSymbol -from srunner.osc2.symbol_manager.parameter_symbol import ParameterSymbol -from srunner.osc2.utils.log_manager import (LOG_INFO, LOG_ERROR, LOG_WARNING) -from srunner.osc2.utils.relational_operator import RelationalOperator -from srunner.osc2_dm.physical_types import Physical, Range - -# from sqlalchemy import true -# from srunner.osc2_stdlib import event, variables -from srunner.osc2_stdlib.modifier import ( - AccelerationModifier, - ChangeLaneModifier, - ChangeSpeedModifier, - LaneModifier, - PositionModifier, - SpeedModifier, -) - -# OSC2 -from srunner.scenarioconfigs.osc2_scenario_configuration import ( - OSC2ScenarioConfiguration, -) -from srunner.scenariomanager.carla_data_provider import CarlaDataProvider -from srunner.scenariomanager.scenarioatomics.atomic_behaviors import ( - ActorTransformSetter, - ChangeTargetSpeed, - LaneChange, - UniformAcceleration, - WaypointFollower, - calculate_distance, -) -from srunner.scenariomanager.scenarioatomics.atomic_criteria import CollisionTest -from srunner.scenariomanager.scenarioatomics.atomic_trigger_conditions import ( - IfTriggerer, - TimeOfWaitComparison, -) -from srunner.scenariomanager.timer import TimeOut -from srunner.scenarios.basic_scenario import BasicScenario -from srunner.tools.openscenario_parser import oneshot_with_check -from srunner.tools.osc2_helper import OSC2Helper - - -def para_type_str_sequence(config, arguments, line, column, node): - retrieval_name = "" - if isinstance(arguments, List): - for arg in arguments: - if isinstance(arg, Tuple): - if isinstance(arg[1], int): - retrieval_name = retrieval_name + "#int" - elif isinstance(arg[1], float): - retrieval_name = retrieval_name + "#float" - elif isinstance(arg[1], str): - retrieval_name = retrieval_name + "#str" - elif isinstance(arg[1], bool): - retrieval_name = retrieval_name + "#bool" - elif isinstance(arg[1], Physical): - physical_type_name = OSC2Helper.find_physical_type( - config.physical_dict, arg[1].unit.physical.si_base_exponent - ) - - if physical_type_name is None: - pass - else: - physical_type = ( - node.get_scope().resolve(physical_type_name).name - ) - retrieval_name += "#" + physical_type - else: - pass - elif isinstance(arg, str): - retrieval_name = retrieval_name + arg.split(".", 1)[-1] - else: - pass - elif isinstance(arguments, Tuple): - if isinstance(arguments[1], int): - retrieval_name = retrieval_name + "#int" - elif isinstance(arguments[1], float): - retrieval_name = retrieval_name + "#float" - elif isinstance(arguments[1], str): - retrieval_name = retrieval_name + "#str" - elif isinstance(arguments[1], bool): - retrieval_name = retrieval_name + "#bool" - elif isinstance(arguments[1], Physical): - physical_type_name = OSC2Helper.find_physical_type( - config.physical_dict, arguments[1].unit.physical.si_base_exponent - ) - - if physical_type_name is None: - pass - else: - physical_type = node.get_scope().resolve(physical_type_name).name - retrieval_name += "#" + physical_type - else: - pass - elif isinstance(arguments, int): - retrieval_name = retrieval_name + "#int" - elif isinstance(arguments, float): - retrieval_name = retrieval_name + "#float" - elif isinstance(arguments, str): - retrieval_name = retrieval_name + "#str" - elif isinstance(arguments, bool): - retrieval_name = retrieval_name + "#bool" - elif isinstance(arguments, Physical): - physical_type_name = OSC2Helper.find_physical_type( - config.physical_dict, arguments.unit.physical.si_base_exponent - ) - - if physical_type_name is None: - pass - else: - physical_type = node.get_scope().resolve(physical_type_name).name - retrieval_name += "#" + physical_type - else: - pass - return retrieval_name - - -def process_speed_modifier( - config, modifiers, duration: float, all_duration: float, father_tree -): - if not modifiers: - return - - for modifier in modifiers: - actor_name = modifier.get_actor_name() - - if isinstance(modifier, SpeedModifier): - # en_value_mps() The speed unit in Carla is m/s, so the default conversion unit is m/s - target_speed = modifier.get_speed().gen_physical_value() - # target_speed = float(modifier.get_speed())*0.27777778 - actor = CarlaDataProvider.get_actor_by_name(actor_name) - car_driving = WaypointFollower(actor, target_speed) - # car_driving.set_duration(duration) - - father_tree.add_child(car_driving) - - car_config = config.get_car_config(actor_name) - car_config.set_arg({"target_speed": target_speed}) - LOG_WARNING( - f"{actor_name} car speed will be set to {target_speed * 3.6} km/h" - ) - - # # _velocity speed, go straight down the driveway, and will hit the wall - # keep_speed = KeepVelocity(actor, target_speed, duration=father_duration.num) - elif isinstance(modifier, ChangeSpeedModifier): - # speed_delta indicates the increment of velocity - speed_delta = modifier.get_speed().gen_physical_value() - speed_delta = speed_delta * 3.6 - current_car_conf = config.get_car_config(actor_name) - current_car_speed = current_car_conf.get_arg("target_speed") - current_car_speed = current_car_speed * 3.6 - target_speed = current_car_speed + speed_delta - LOG_WARNING( - f"{actor_name} car speed will be changed to {target_speed} km/h" - ) - - actor = CarlaDataProvider.get_actor_by_name(actor_name) - change_speed = ChangeTargetSpeed(actor, target_speed) - - car_driving = WaypointFollower(actor) - # car_driving.set_duration(duration) - - father_tree.add_child(change_speed) - father_tree.add_child(car_driving) - elif isinstance(modifier, AccelerationModifier): - current_car_conf = config.get_car_config(actor_name) - current_car_speed = current_car_conf.get_arg("target_speed") - accelerate_speed = modifier.get_accelerate().gen_physical_value() - target_velocity = current_car_speed + accelerate_speed * duration - actor = CarlaDataProvider.get_actor_by_name(actor_name) - start_time = all_duration - duration - uniform_accelerate_speed = UniformAcceleration( - actor, current_car_speed, target_velocity, accelerate_speed, start_time - ) - print("END ACCELERATION") - car_driving = WaypointFollower(actor) - - father_tree.add_child(uniform_accelerate_speed) - father_tree.add_child(car_driving) - else: - LOG_WARNING("not implement modifier") - - -def process_location_modifier(config, modifiers, duration: float, father_tree): - # position([distance: ] | time: