import enum
import warnings
from abc import abstractmethod
from typing import List, Optional, Set, Union
import numpy as np
from commonroad.common.validity import (
is_real_number,
is_real_number_vector,
is_valid_orientation,
)
from commonroad.geometry.shape import (
Circle,
Polygon,
Rectangle,
Shape,
occupancy_shape_from_state,
shape_group_occupancy_shape_from_state,
)
from commonroad.prediction.prediction import (
Occupancy,
Prediction,
SetBasedPrediction,
TrajectoryPrediction,
)
from commonroad.scenario.state import (
InitialState,
MetaInformationState,
SignalState,
TraceState,
)
from commonroad.visualization.draw_params import (
DynamicObstacleParams,
EnvironmentObstacleParams,
OptionalSpecificOrAllDrawParams,
PhantomObstacleParams,
StaticObstacleParams,
)
from commonroad.visualization.drawable import IDrawable
from commonroad.visualization.renderer import IRenderer
[docs]@enum.unique
class ObstacleRole(enum.Enum):
"""Enum containing all possible obstacle roles defined in CommonRoad."""
STATIC = "static"
DYNAMIC = "dynamic"
ENVIRONMENT = "environment"
Phantom = "phantom"
[docs]@enum.unique
class ObstacleType(enum.Enum):
"""Enum containing all possible obstacle types defined in CommonRoad."""
UNKNOWN = "unknown"
CAR = "car"
TRUCK = "truck"
BUS = "bus"
BICYCLE = "bicycle"
PEDESTRIAN = "pedestrian"
PRIORITY_VEHICLE = "priorityVehicle"
PARKED_VEHICLE = "parkedVehicle"
CONSTRUCTION_ZONE = "constructionZone"
TRAIN = "train"
ROAD_BOUNDARY = "roadBoundary"
MOTORCYCLE = "motorcycle"
TAXI = "taxi"
BUILDING = "building"
PILLAR = "pillar"
MEDIAN_STRIP = "median_strip"
[docs]class Obstacle(IDrawable):
"""Superclass for dynamic and static obstacles holding common properties
defined in CommonRoad."""
def __init__(
self,
obstacle_id: int,
obstacle_role: ObstacleRole,
obstacle_type: ObstacleType,
obstacle_shape: Shape,
initial_state: InitialState = None,
initial_center_lanelet_ids: Optional[Set[int]] = None,
initial_shape_lanelet_ids: Optional[Set[int]] = None,
initial_signal_state: Optional[SignalState] = None,
signal_series: Optional[List[SignalState]] = None,
):
"""
:param obstacle_id: unique ID of the obstacle
:param obstacle_role: obstacle role as defined in CommonRoad
:param obstacle_type: obstacle type as defined in CommonRoad (e.g. PARKED_VEHICLE)
:param obstacle_shape: occupied area of the obstacle
:param initial_state: initial state of the obstacle
:param initial_center_lanelet_ids: initial IDs of lanelets the obstacle center is on
:param initial_shape_lanelet_ids: initial IDs of lanelets the obstacle shape is on
:param initial_signal_state: initial signal state of obstacle
:param signal_series: list of signal states over time
"""
self._initial_occupancy_shape: Optional[Shape] = None
self.obstacle_id: int = obstacle_id
self.obstacle_role: ObstacleRole = obstacle_role
self.obstacle_type: ObstacleType = obstacle_type
self.obstacle_shape: Shape = obstacle_shape
self.initial_state: InitialState = initial_state
self.initial_center_lanelet_ids: Optional[Set[int]] = initial_center_lanelet_ids
self.initial_shape_lanelet_ids: Optional[Set[int]] = initial_shape_lanelet_ids
self.initial_signal_state: Optional[SignalState] = initial_signal_state
self.signal_series: Optional[List[SignalState]] = signal_series
def __eq__(self, other):
if not isinstance(other, Obstacle):
warnings.warn(f"Inequality between Obstacle {repr(self)} and different type {type(other)}")
return False
initial_center_lanelet_ids = (
list() if self._initial_center_lanelet_ids is None else list(self._initial_center_lanelet_ids)
)
initial_center_lanelet_ids_other = (
list() if other.initial_center_lanelet_ids is None else list(other.initial_center_lanelet_ids)
)
initial_shape_lanelet_ids = (
list() if self._initial_shape_lanelet_ids is None else list(self._initial_shape_lanelet_ids)
)
initial_shape_lanelet_ids_other = (
list() if other.initial_shape_lanelet_ids is None else list(other.initial_shape_lanelet_ids)
)
obstacle_eq = (
self._obstacle_id == other.obstacle_id
and self._obstacle_role == other.obstacle_role
and self._obstacle_type == other.obstacle_type
and self._obstacle_shape == other.obstacle_shape
and self._initial_state == other.initial_state
and initial_center_lanelet_ids == initial_center_lanelet_ids_other
and initial_shape_lanelet_ids == initial_shape_lanelet_ids_other
and self._initial_signal_state == other.initial_signal_state
and self._signal_series == other.signal_series
)
return obstacle_eq
def __hash__(self):
return hash(
(
self._obstacle_id,
self._obstacle_role,
self._obstacle_type,
self._obstacle_shape,
self._initial_state,
frozenset(self.initial_center_lanelet_ids),
frozenset(self.initial_shape_lanelet_ids),
self._initial_signal_state,
tuple(self.signal_series),
)
)
@property
def obstacle_id(self) -> int:
"""Unique ID of the obstacle."""
return self._obstacle_id
@obstacle_id.setter
def obstacle_id(self, obstacle_id: int):
assert isinstance(
obstacle_id, int
), "<Obstacle/obstacle_id>: argument obstacle_id of wrong type." "Expected type: %s. Got type: %s." % (
int,
type(obstacle_id),
)
if not hasattr(self, "_obstacle_id"):
self._obstacle_id = obstacle_id
else:
warnings.warn("<Obstacle/obstacle_id>: Obstacle ID is immutable.")
@property
def obstacle_role(self) -> ObstacleRole:
"""Obstacle role as defined in CommonRoad."""
return self._obstacle_role
@obstacle_role.setter
def obstacle_role(self, obstacle_role: ObstacleRole):
assert isinstance(
obstacle_role, ObstacleRole
), "<Obstacle/obstacle_role>: argument obstacle_role of wrong " "type. Expected type: %s. Got type: %s." % (
ObstacleRole,
type(obstacle_role),
)
if not hasattr(self, "_obstacle_role"):
self._obstacle_role = obstacle_role
else:
warnings.warn("<Obstacle/obstacle_role>: Obstacle role is immutable.")
@property
def obstacle_type(self) -> ObstacleType:
"""Obstacle type as defined in CommonRoad."""
return self._obstacle_type
@obstacle_type.setter
def obstacle_type(self, obstacle_type: ObstacleType):
assert isinstance(
obstacle_type, ObstacleType
), "<Obstacle/obstacle_type>: argument obstacle_type of wrong " "type. Expected type: %s. Got type: %s." % (
ObstacleType,
type(obstacle_type),
)
if not hasattr(self, "_obstacle_type"):
self._obstacle_type = obstacle_type
else:
warnings.warn("<Obstacle/obstacle_type>: Obstacle type is immutable.")
@property
def obstacle_shape(self) -> Union[Shape, Rectangle, Circle, Polygon]:
"""Obstacle shape as defined in CommonRoad."""
return self._obstacle_shape
@obstacle_shape.setter
def obstacle_shape(self, shape: Union[Shape, Rectangle, Circle, Polygon]):
assert isinstance(
shape, (type(None), Shape)
), "<Obstacle/obstacle_shape>: argument shape of wrong type. Expected " "type %s. Got type %s." % (
Shape,
type(shape),
)
if not hasattr(self, "_obstacle_shape"):
self._obstacle_shape = shape
else:
warnings.warn("<Obstacle/obstacle_shape>: Obstacle shape is immutable.")
@property
def initial_state(self) -> InitialState:
"""Initial state of the obstacle, e.g., obtained through sensor measurements."""
return self._initial_state
@initial_state.setter
def initial_state(self, initial_state: InitialState):
assert isinstance(
initial_state, InitialState
), "<Obstacle/initial_state>: argument initial_state of wrong type. " "Expected types: %s. Got type: %s." % (
InitialState,
type(initial_state),
)
self._initial_state = initial_state
self._initial_occupancy_shape = occupancy_shape_from_state(self._obstacle_shape, initial_state)
if not hasattr(self, "wheelbase_lengths"):
return
shapes = self.obstacle_shape.shapes
self._initial_occupancy_shape = shape_group_occupancy_shape_from_state(
shapes, initial_state, self.wheelbase_lengths
)
@property
def initial_center_lanelet_ids(self) -> Union[None, Set[int]]:
"""Initial lanelets of obstacle center, e.g., obtained through localization."""
return self._initial_center_lanelet_ids
@initial_center_lanelet_ids.setter
def initial_center_lanelet_ids(self, initial_center_lanelet_ids: Union[None, Set[int]]):
assert isinstance(initial_center_lanelet_ids, (set, type(None))), (
"<Obstacle/initial_center_lanelet_ids>: argument initial_lanelet_ids of wrong type. "
"Expected types: %s, %s. Got type: %s." % (set, type(None), type(initial_center_lanelet_ids))
)
if initial_center_lanelet_ids is not None:
for lanelet_id in initial_center_lanelet_ids:
assert isinstance(lanelet_id, int), (
"<Obstacle/initial_center_lanelet_ids>: argument initial_lanelet of wrong type. "
"Expected types: %s. Got type: %s." % (int, type(lanelet_id))
)
self._initial_center_lanelet_ids = initial_center_lanelet_ids
@property
def initial_shape_lanelet_ids(self) -> Union[None, Set[int]]:
"""Initial lanelets of obstacle shape, e.g., obtained through localization."""
return self._initial_shape_lanelet_ids
@initial_shape_lanelet_ids.setter
def initial_shape_lanelet_ids(self, initial_shape_lanelet_ids: Union[None, Set[int]]):
assert isinstance(initial_shape_lanelet_ids, (set, type(None))), (
"<Obstacle/initial_shape_lanelet_ids>: argument initial_lanelet_ids of wrong type. "
"Expected types: %s, %s. Got type: %s." % (set, type(None), type(initial_shape_lanelet_ids))
)
if initial_shape_lanelet_ids is not None:
for lanelet_id in initial_shape_lanelet_ids:
assert isinstance(lanelet_id, int), (
"<Obstacle/initial_shape_lanelet_ids>: argument initial_lanelet of wrong type. "
"Expected types: %s. Got type: %s." % (int, type(lanelet_id))
)
self._initial_shape_lanelet_ids = initial_shape_lanelet_ids
@property
def initial_signal_state(self) -> SignalState:
"""Signal state as defined in CommonRoad."""
return self._initial_signal_state
@initial_signal_state.setter
def initial_signal_state(self, initial_signal_state: SignalState):
assert isinstance(initial_signal_state, (SignalState, type(None))), (
"<Obstacle/initial_signal_state>: "
"argument initial_signal_state of wrong "
"type. Expected types: %s, %s. Got type: %s." % (SignalState, type(None), type(initial_signal_state))
)
self._initial_signal_state = initial_signal_state
@property
def signal_series(self) -> List[SignalState]:
"""Signal series as defined in CommonRoad."""
return self._signal_series
@signal_series.setter
def signal_series(self, signal_series: List[SignalState]):
assert isinstance(signal_series, (list, type(None))), (
"<Obstacle/initial_signal_state>: "
"argument initial_signal_state of wrong "
"type. Expected types: %s, %s. Got type: %s." % (list, type(None), type(signal_series))
)
self._signal_series = signal_series
@abstractmethod
def occupancy_at_time(self, time_step: int) -> Union[None, Occupancy]:
pass
@abstractmethod
def state_at_time(self, time_step: int) -> Union[None, TraceState]:
pass
@abstractmethod
def translate_rotate(self, translation: np.ndarray, angle: float):
pass
[docs] def signal_state_at_time_step(self, time_step: int) -> Union[SignalState, None]:
"""
Extracts signal state at a time step
:param time_step: time step of interest
:returns: signal state or None if time step does not exist
"""
if self.initial_signal_state is not None and time_step == self.initial_signal_state.time_step:
return self.initial_signal_state
elif self.signal_series is None:
return None
else:
for state in self.signal_series:
if state.time_step == time_step:
return state
return None
[docs]class StaticObstacle(Obstacle):
"""Class representing static obstacles as defined in CommonRoad."""
def __init__(
self,
obstacle_id: int,
obstacle_type: ObstacleType,
obstacle_shape: Shape,
initial_state: InitialState,
initial_center_lanelet_ids: Union[None, Set[int]] = None,
initial_shape_lanelet_ids: Union[None, Set[int]] = None,
initial_signal_state: Union[None, SignalState] = None,
signal_series: List[SignalState] = None,
):
"""
:param obstacle_id: unique ID of the obstacle
:param obstacle_type: type of obstacle (e.g. PARKED_VEHICLE)
:param obstacle_shape: shape of the static obstacle
:param initial_state: initial state of the static obstacle
:param initial_center_lanelet_ids: initial IDs of lanelets the obstacle center is on
:param initial_shape_lanelet_ids: initial IDs of lanelets the obstacle shape is on
:param initial_signal_state: initial signal state of static obstacle
:param signal_series: list of signal states over time
"""
Obstacle.__init__(
self,
obstacle_id=obstacle_id,
obstacle_role=ObstacleRole.STATIC,
obstacle_type=obstacle_type,
obstacle_shape=obstacle_shape,
initial_state=initial_state,
initial_center_lanelet_ids=initial_center_lanelet_ids,
initial_shape_lanelet_ids=initial_shape_lanelet_ids,
initial_signal_state=initial_signal_state,
signal_series=signal_series,
)
def __eq__(self, other):
if not isinstance(other, StaticObstacle):
warnings.warn(f"Inequality between StaticObstacle {repr(self)} and different type {type(other)}")
return False
return Obstacle.__eq__(self, other)
def __hash__(self):
return Obstacle.__hash__(self)
[docs] def translate_rotate(self, translation: np.ndarray, angle: float):
"""First translates the static obstacle, then rotates the static obstacle around the origin.
:param translation: translation vector [x_off, y_off] in x- and y-direction
:param angle: rotation angle in radian (counter-clockwise)
"""
assert is_real_number_vector(translation, 2), (
"<StaticObstacle/translate_rotate>: argument translation is " "not a vector of real numbers of length 2."
)
assert is_real_number(angle), (
"<StaticObstacle/translate_rotate>: argument angle must be a scalar. " "angle = %s" % angle
)
assert is_valid_orientation(angle), (
"<StaticObstacle/translate_rotate>: argument angle must be within the "
"interval [-2pi, 2pi]. angle = %s" % angle
)
self.initial_state = self._initial_state.translate_rotate(translation, angle)
[docs] def occupancy_at_time(self, time_step: int) -> Occupancy:
"""
Returns the predicted occupancy of the obstacle at a specific time step.
:param time_step: discrete time step
:return: occupancy of the static obstacle at time step
"""
return Occupancy(time_step=time_step, shape=self._initial_occupancy_shape)
[docs] def state_at_time(self, time_step: int) -> TraceState:
"""
Returns the state the obstacle at a specific time step.
:param time_step: discrete time step
:return: state of the static obstacle at time step
"""
return self.initial_state
def __str__(self):
obs_str = "Static Obstacle:\n"
obs_str += "\nid: {}".format(self.obstacle_id)
obs_str += "\ntype: {}".format(self.obstacle_type.value)
obs_str += "\ninitial state: {}".format(self.initial_state)
return obs_str
[docs] def draw(self, renderer: IRenderer, draw_params: OptionalSpecificOrAllDrawParams[StaticObstacleParams] = None):
renderer.draw_static_obstacle(self, draw_params)
[docs]class DynamicObstacle(Obstacle):
"""Class representing dynamic obstacles as defined in CommonRoad. Each dynamic obstacle has stored its predicted
movement in future time steps.
"""
def __init__(
self,
obstacle_id: int,
obstacle_type: ObstacleType,
obstacle_shape: Shape,
initial_state: InitialState,
prediction: Union[None, Prediction, TrajectoryPrediction, SetBasedPrediction] = None,
initial_center_lanelet_ids: Optional[Set[int]] = None,
initial_shape_lanelet_ids: Optional[Set[int]] = None,
initial_signal_state: Optional[SignalState] = None,
signal_series: List[SignalState] = None,
initial_meta_information_state: MetaInformationState = None,
meta_information_series: List[MetaInformationState] = None,
external_dataset_id: int = None,
history: Optional[List[TraceState]] = None,
signal_history: Optional[List[SignalState]] = None,
center_lanelet_ids_history: Optional[List[Set[int]]] = None,
shape_lanelet_ids_history: Optional[List[Set[int]]] = None,
**kwargs,
):
"""
:param obstacle_id: unique ID of the obstacle
:param obstacle_type: type of obstacle (e.g. PARKED_VEHICLE)
:param obstacle_shape: shape of the static obstacle
:param initial_state: initial state of the static obstacle
:param prediction: predicted movement of the dynamic obstacle
:param initial_center_lanelet_ids: initial IDs of lanelets the obstacle center is on
:param initial_shape_lanelet_ids: initial IDs of lanelets the obstacle shape is on
:param initial_signal_state: initial signal state of static obstacle
:param signal_series: list of signal states over time
:param wheelbase: list of wheelbase lengths
:param initial_meta_information_state: meta information of the dynamic obstacle
:param meta_information_series: list of meta information
:param external_dataset_id: ID of the external dataset
:param history: History of actual states
:param signal_history: History of signal states
"""
for field, value in kwargs.items():
setattr(self, field, value)
Obstacle.__init__(
self,
obstacle_id=obstacle_id,
obstacle_role=ObstacleRole.DYNAMIC,
obstacle_type=obstacle_type,
obstacle_shape=obstacle_shape,
initial_state=initial_state,
initial_center_lanelet_ids=initial_center_lanelet_ids,
initial_shape_lanelet_ids=initial_shape_lanelet_ids,
initial_signal_state=initial_signal_state,
signal_series=signal_series,
)
self.prediction: Prediction = prediction
self.initial_meta_information_state = initial_meta_information_state
self.meta_information_series = meta_information_series
self.external_dataset_id = external_dataset_id
self.history = history or []
self.signal_history = signal_history or []
self.center_lanelet_ids_history = center_lanelet_ids_history or []
self.shape_lanelet_ids_history = shape_lanelet_ids_history or []
def __eq__(self, other):
if not isinstance(other, DynamicObstacle):
warnings.warn(f"Inequality between DynamicObstacle {repr(self)} and different type {type(other)}")
return False
return (
self._prediction == other.prediction
and Obstacle.__eq__(self, other)
and self._initial_meta_information_state == other.initial_meta_information_state
and self._meta_information_series == other.meta_information_series
and self._external_dataset_id == other.external_dataset_id
and self.history == other.history
and self.signal_history == other.signal_history
and self.center_lanelet_ids_history == other.center_lanelet_ids_history
and self.shape_lanelet_ids_history == other.shape_lanelet_ids_history
)
def __hash__(self):
center_lanelet_ids_history = (
None
if self.center_lanelet_ids_history is None
else tuple(frozenset(value) for value in self.center_lanelet_ids_history)
)
shape_lanelet_ids_history = (
None
if self.shape_lanelet_ids_history is None
else tuple(frozenset(value) for value in self.shape_lanelet_ids_history)
)
return hash(
(
self._prediction,
self._initial_meta_information_state,
tuple(self._meta_information_series),
self._external_dataset_id,
tuple(self.history),
tuple(self.signal_history),
center_lanelet_ids_history,
shape_lanelet_ids_history,
Obstacle.__hash__(self),
)
)
@property
def prediction(self) -> Union[None, Prediction, TrajectoryPrediction, SetBasedPrediction]:
"""Prediction describing the movement of the dynamic obstacle over time."""
return self._prediction
@prediction.setter
def prediction(self, prediction: Union[Prediction, TrajectoryPrediction, SetBasedPrediction, None]):
assert isinstance(prediction, (Prediction, type(None))), (
"<DynamicObstacle/prediction>: argument prediction "
"of wrong type. Expected types: %s, %s. Got type: "
"%s." % (Prediction, type(None), type(prediction))
)
self._prediction = prediction
@property
def initial_meta_information_state(self) -> Union[None, MetaInformationState]:
"""Meta information of the dynamic obstacle."""
return self._initial_meta_information_state
@initial_meta_information_state.setter
def initial_meta_information_state(self, initial_meta_information_state: Union[MetaInformationState, None]):
assert isinstance(initial_meta_information_state, (MetaInformationState, type(None))), (
"<DynamicObstacle/initial_meta_information_state>: argument prediction "
"of wrong type. Expected types: %s, %s. Got type: %s."
% (MetaInformationState, type(None), type(initial_meta_information_state))
)
self._initial_meta_information_state = initial_meta_information_state
@property
def meta_information_series(self) -> Union[None, List[MetaInformationState]]:
"""List of meta information."""
return self._meta_information_series
@meta_information_series.setter
def meta_information_series(self, meta_information_series: Union[List[MetaInformationState], None]):
assert isinstance(meta_information_series, (List, type(None))), (
"<DynamicObstacle/meta_information_series>: argument prediction "
"of wrong type. Expected types: %s, %s. Got type: %s." % (List, type(None), type(meta_information_series))
)
self._meta_information_series = meta_information_series
@property
def external_dataset_id(self) -> Union[int, None]:
"""ID of the external dataset."""
return self._external_dataset_id
@external_dataset_id.setter
def external_dataset_id(self, external_dataset_id: Union[int, None]):
assert isinstance(external_dataset_id, (int, type(None))), (
"<DynamicObstacle/external_dataset_id>: argument prediction of wrong type. "
"Expected types: %s, %s. Got type: %s." % (int, type(None), type(external_dataset_id))
)
self._external_dataset_id = external_dataset_id
[docs] def occupancy_at_time(self, time_step: int) -> Union[None, Occupancy]:
"""
Returns the predicted occupancy of the obstacle at a specific time step.
:param time_step: discrete time step
:return: predicted occupancy of the obstacle at time step
"""
occupancy = None
if time_step == self.initial_state.time_step:
occupancy = Occupancy(time_step, self._initial_occupancy_shape)
elif time_step > self.initial_state.time_step and self._prediction is not None:
occupancy = self._prediction.occupancy_at_time_step(time_step)
return occupancy
[docs] def state_at_time(self, time_step: int) -> Union[None, TraceState]:
"""
Returns the predicted state of the obstacle at a specific time step.
:param time_step: discrete time step
:return: predicted state of the obstacle at time step
"""
if time_step == self.initial_state.time_step:
return self.initial_state
elif type(self._prediction) is SetBasedPrediction:
warnings.warn("<DynamicObstacle/state_at_time>: Set-based prediction is used. State cannot be returned!")
return None
elif time_step > self.initial_state.time_step and self._prediction is not None:
return self.prediction.trajectory.state_at_time_step(time_step)
else:
return None
[docs] def translate_rotate(self, translation: np.ndarray, angle: float):
"""First translates the dynamic obstacle, then rotates the dynamic obstacle around the origin.
:param translation: translation vector [x_off, y_off] in x- and y-direction
:param angle: rotation angle in radian (counter-clockwise)
"""
assert is_real_number_vector(translation, 2), (
"<DynamicObstacle/translate_rotate>: argument translation is " "not a vector of real numbers of length 2."
)
assert is_real_number(angle), (
"<DynamicObstacle/translate_rotate>: argument angle must be a scalar. " "angle = %s" % angle
)
assert is_valid_orientation(angle), (
"<DynamicObstacle/translate_rotate>: argument angle must be within the "
"interval [-2pi, 2pi]. angle = %s" % angle
)
if self._prediction is not None:
self.prediction.translate_rotate(translation, angle)
self.initial_state = self._initial_state.translate_rotate(translation, angle)
[docs] def update_initial_state(
self,
current_state: TraceState,
current_signal_state: Optional[SignalState] = None,
current_center_lanelet_ids: Optional[Set[int]] = None,
current_shape_lanelet_ids: Optional[Set[int]] = None,
max_history_length: int = 6000,
):
"""Updates the initial state to the given current state, appends the current initial state to the history, and
invalidates the prediction.
:param current_state: current state of the dynamic obstacle, will become the new initial state
:param current_signal_state: current signal state of the dynamic obstacle, will become the new initial signal
state
:param current_center_lanelet_ids: current center lanelet ids of the dynamic obstacle, will become the new
initial center lanelet ids
:param current_shape_lanelet_ids: current shape lanelet ids of the dynamic obstacle, will become the new
initial shape lanelet ids
:param max_history_length: maximum length of the history, if the history exceeds this it will be truncated,
dropping the oldest elements first; must be greater than 0
"""
assert max_history_length > 0, (
f"<DynamicObstacle/update_initial_state>: argument max_history_length must be"
f"greater than 0. max_history_length = {max_history_length}"
)
# append current initial state to history
self.history.append(self.initial_state)
self.signal_history.append(self.initial_signal_state)
self.center_lanelet_ids_history.append(self.initial_center_lanelet_ids)
self.shape_lanelet_ids_history.append(self.initial_shape_lanelet_ids)
# set initial state to current state
self.initial_state = current_state
self.initial_signal_state = current_signal_state
self.initial_center_lanelet_ids = current_center_lanelet_ids
self.initial_shape_lanelet_ids = current_shape_lanelet_ids
# invalidate prediction
self.prediction = None
self.signal_series = None
# truncate history if it is longer than desired max. length
if len(self.history) > max_history_length:
self.history = self.history[-max_history_length:]
self.signal_history = self.signal_history[-max_history_length:]
self.center_lanelet_ids_history = self.center_lanelet_ids_history[-max_history_length:]
self.shape_lanelet_ids_history = self.shape_lanelet_ids_history[-max_history_length:]
[docs] def update_prediction(
self,
prediction: Union[Prediction, SetBasedPrediction, TrajectoryPrediction],
signal_series: Optional[List[SignalState]] = None,
):
"""Updates the prediction of the dynamic obstacle.
:param prediction: updated movement prediction of the dynamic obstacle
:param signal_series: updated prediction of the signal state of the dynamic obstacle
"""
self.prediction = prediction
self.signal_series = signal_series
def __str__(self):
obs_str = "Dynamic Obstacle:\n"
obs_str += "\nid: {}".format(self.obstacle_id)
obs_str += "\ntype: {}".format(self.obstacle_type.value)
obs_str += "\ninitial state: {}".format(self.initial_state)
return obs_str
[docs] def draw(self, renderer: IRenderer, draw_params: OptionalSpecificOrAllDrawParams[DynamicObstacleParams] = None):
renderer.draw_dynamic_obstacle(self, draw_params)
class PhantomObstacle(IDrawable):
"""Class representing phantom obstacles as defined in CommonRoad. Each phantom obstacle has stored its predicted
movement in future time steps as occupancy set.
"""
def __init__(self, obstacle_id: int, prediction: SetBasedPrediction = None):
"""
Constructor of PhantomObstacle object.
:param obstacle_id: unique ID of the obstacle
:param prediction: set-based prediction of phantom obstacle
"""
self.obstacle_id = obstacle_id
self.prediction: SetBasedPrediction = prediction
self.obstacle_role: ObstacleRole = ObstacleRole.Phantom
def __eq__(self, other):
if not isinstance(other, PhantomObstacle):
warnings.warn(f"Inequality between PhantomObstacle {repr(self)} and different type {type(other)}")
return False
obstacle_eq = self.obstacle_id == other.obstacle_id and self.prediction == other.prediction
return obstacle_eq
def __hash__(self):
return hash((self.obstacle_id, self._prediction))
@property
def prediction(self) -> Union[SetBasedPrediction, None]:
"""Prediction describing the movement of the dynamic obstacle over time."""
return self._prediction
@prediction.setter
def prediction(self, prediction: Union[Prediction, TrajectoryPrediction, SetBasedPrediction, None]):
assert isinstance(prediction, (SetBasedPrediction, type(None))), (
"<PhantomObstacle/prediction>: argument prediction of wrong type. Expected types: %s, %s. Got type: "
"%s." % (SetBasedPrediction, type(None), type(prediction))
)
self._prediction = prediction
@property
def obstacle_role(self) -> ObstacleRole:
"""Obstacle role as defined in CommonRoad."""
return self._obstacle_role
@obstacle_role.setter
def obstacle_role(self, obstacle_role: ObstacleRole):
assert isinstance(
obstacle_role, ObstacleRole
), "<Obstacle/obstacle_role>: argument obstacle_role of wrong " "type. Expected type: %s. Got type: %s." % (
ObstacleRole,
type(obstacle_role),
)
if not hasattr(self, "_obstacle_role"):
self._obstacle_role = obstacle_role
else:
warnings.warn("<Obstacle/obstacle_role>: Obstacle role is immutable.")
def occupancy_at_time(self, time_step: int) -> Union[None, Occupancy]:
"""
Returns the predicted occupancy of the obstacle at a specific time step.
:param time_step: discrete time step
:return: predicted occupancy of the obstacle at time step
"""
occupancy = None
if self._prediction is not None and self._prediction.occupancy_at_time_step(time_step) is not None:
occupancy = self._prediction.occupancy_at_time_step(time_step)
else:
warnings.warn("<PhantomObstacle/occupancy_at_time>: Time step does not exist!")
return occupancy
@staticmethod
def state_at_time() -> Union[None, TraceState]:
"""
Returns the predicted state of the obstacle at a specific time step.
:return: predicted state of the obstacle at time step
"""
warnings.warn("<PhantomObstacle/state_at_time>: Set-based prediction is used. State cannot be returned!")
return None
def translate_rotate(self, translation: np.ndarray, angle: float):
"""First translates the dynamic obstacle, then rotates the dynamic obstacle around the origin.
:param translation: translation vector [x_off, y_off] in x- and y-direction
:param angle: rotation angle in radian (counter-clockwise)
"""
assert is_real_number_vector(translation, 2), (
"<DynamicObstacle/translate_rotate>: argument translation is " "not a vector of real numbers of length 2."
)
assert is_real_number(angle), (
"<DynamicObstacle/translate_rotate>: argument angle must be a scalar. " "angle = %s" % angle
)
assert is_valid_orientation(angle), (
"<DynamicObstacle/translate_rotate>: argument angle must be within the "
"interval [-2pi, 2pi]. angle = %s" % angle
)
if self._prediction is not None:
self.prediction.translate_rotate(translation, angle)
def __str__(self):
obs_str = "Phantom Obstacle:\n"
obs_str += "\nid: {}".format(self.obstacle_id)
return obs_str
def draw(self, renderer: IRenderer, draw_params: OptionalSpecificOrAllDrawParams[PhantomObstacleParams] = None):
renderer.draw_phantom_obstacle(self, draw_params)
class EnvironmentObstacle(IDrawable):
"""Class representing environment obstacles as defined in CommonRoad."""
def __init__(self, obstacle_id: int, obstacle_type: ObstacleType, obstacle_shape: Shape):
"""
:param obstacle_id: unique ID of the obstacle
:param obstacle_type: type of obstacle (e.g. BUILDING)
:param obstacle_shape: shape of the static obstacle
"""
self.obstacle_id: int = obstacle_id
self.obstacle_role: ObstacleRole = ObstacleRole.ENVIRONMENT
self.obstacle_type: ObstacleType = obstacle_type
self.obstacle_shape: Shape = obstacle_shape
def __eq__(self, other):
if not isinstance(other, EnvironmentObstacle):
warnings.warn(f"Inequality between EnvironmentObstacle {repr(self)} and different type {type(other)}")
return False
obstacle_eq = (
self._obstacle_id == other.obstacle_id
and self._obstacle_role == other.obstacle_role
and self._obstacle_type == other.obstacle_type
and self._obstacle_shape == other.obstacle_shape
)
return obstacle_eq
def __hash__(self):
return hash((self._obstacle_id, self._obstacle_role, self._obstacle_type, self._obstacle_shape))
@property
def obstacle_id(self) -> int:
"""Unique ID of the obstacle."""
return self._obstacle_id
@obstacle_id.setter
def obstacle_id(self, obstacle_id: int):
assert isinstance(
obstacle_id, int
), "<Obstacle/obstacle_id>: argument obstacle_id of wrong type." "Expected type: %s. Got type: %s." % (
int,
type(obstacle_id),
)
if not hasattr(self, "_obstacle_id"):
self._obstacle_id = obstacle_id
else:
warnings.warn("<Obstacle/obstacle_id>: Obstacle ID is immutable.")
@property
def obstacle_role(self) -> ObstacleRole:
"""Obstacle role as defined in CommonRoad."""
return self._obstacle_role
@obstacle_role.setter
def obstacle_role(self, obstacle_role: ObstacleRole):
assert isinstance(
obstacle_role, ObstacleRole
), "<Obstacle/obstacle_role>: argument obstacle_role of wrong " "type. Expected type: %s. Got type: %s." % (
ObstacleRole,
type(obstacle_role),
)
if not hasattr(self, "_obstacle_role"):
self._obstacle_role = obstacle_role
else:
warnings.warn("<Obstacle/obstacle_role>: Obstacle role is immutable.")
@property
def obstacle_type(self) -> ObstacleType:
"""Obstacle type as defined in CommonRoad."""
return self._obstacle_type
@obstacle_type.setter
def obstacle_type(self, obstacle_type: ObstacleType):
assert isinstance(
obstacle_type, ObstacleType
), "<Obstacle/obstacle_type>: argument obstacle_type of wrong " "type. Expected type: %s. Got type: %s." % (
ObstacleType,
type(obstacle_type),
)
if not hasattr(self, "_obstacle_type"):
self._obstacle_type = obstacle_type
else:
warnings.warn("<Obstacle/obstacle_type>: Obstacle type is immutable.")
@property
def obstacle_shape(self) -> Union[Shape, Polygon, Circle, Rectangle]:
"""Obstacle shape as defined in CommonRoad."""
return self._obstacle_shape
@obstacle_shape.setter
def obstacle_shape(self, shape: Union[Shape, Polygon, Circle, Rectangle]):
assert isinstance(
shape, (type(None), Shape)
), "<Obstacle/obstacle_shape>: argument shape of wrong type. Expected " "type %s. Got type %s." % (
Shape,
type(shape),
)
if not hasattr(self, "_obstacle_shape"):
self._obstacle_shape = shape
else:
warnings.warn("<Obstacle/obstacle_shape>: Obstacle shape is immutable.")
def occupancy_at_time(self, time_step: int) -> Occupancy:
"""
Returns the predicted occupancy of the obstacle at a specific time step.
:param time_step: discrete time step
:return: occupancy of the static obstacle at time step
"""
return Occupancy(time_step=time_step, shape=self._obstacle_shape)
def __str__(self):
obs_str = "Environment Obstacle:\n"
obs_str += "\nid: {}".format(self.obstacle_id)
return obs_str
def draw(self, renderer: IRenderer, draw_params: OptionalSpecificOrAllDrawParams[EnvironmentObstacleParams] = None):
renderer.draw_environment_obstacle(self, draw_params)