trajectory_supervisor.supervisor module¶
- class trajectory_supervisor.supervisor.Supervisor(module_enabled: dict, supmod_config_path: str, veh_params: dict, logging_param: Optional[dict] = None, zone_file_path: Optional[str] = None, occ_map_path: Optional[str] = None, use_mp: bool = False)[source]¶
Bases:
objectSupervisor class, holding the core functionality and execution steps for an online verification of motion primitives. This class handles the following core tasks:
Holds methods to retrieve the latest parameter updates (e.g. trajectories, object list, …)
Handles the temporal synchronisation of trajectories and object-list
Triggers calculation and retrieves results of safety scores
- Authors
Tim Stahl <tim.stahl@tum.de>
- Created on
14.06.2019
- Parameters
module_enabled – dict of dicts specifying which modules of the supervisor are enabled
supmod_config_path – path pointing to the config file of the Supervisor modules
logging_param –
dict of logging parameters (e.g. file paths), must hold the following keys: file_log_level - string spec. file log level (e.g. INFO, WARNING, CRITICAL) console_log_level - string spec. console log level (e.g. INFO, WARNING, CRITICAL) log_path_data - string for path of file where the data log should be generated log_path_msg - string for path of file where the msg log should be generated log_path_map - string for path of file where the map log should be generated NOTE: - all log files should reside in the same folder, for the log viewer to
automatically locate the most recent log, stick to the example on github
if “None” is provided, logging is disabled; ignoring setting in config
veh_params –
dict of vehicle parameters; must hold the following keys: veh_width - width of the ego-vehicle [in m]
NOTE: also used for offline pre-calculations of other vehicles
- veh_length - length of the ego-vehicle [in m]
NOTE: also used for offline pre-calculations of other vehicles
turn_rad - turning radius of vehicle [in m] dyn_model_exp - exponent used in vehicle dynamics model (range [1.0, 2.0])
NOTE: 2.0 -> ideal friction circle; 1.0 -> clean diamond shape
- drag_coeff - drag coeff. incl. all constants (0.5*c_w*A_front*rho_air)
set zero to disable [in m2*kg/m3]
m_veh - vehicle mass [in kg]
zone_file_path – (optional) path pointing to a Roborace zone spec. (e.g. for reach set reduct.)
occ_map_path – (optional) path pointing to location where occupation map can be stored to avoid offline calculation on every launch
use_mp – (optional) if set to true and executed on a Linux machine, multiprocessing is used
- get_fired_modules() → list[source]¶
Return a list of strings for the modules fired in the preceding processing step.
- Returns
- get_safe_trajectory(traj_perf: Optional[dict] = None, traj_em: Optional[dict] = None) → dict[source]¶
Selects a safety trajectory based on the provided new performance and emergency trajectory and a previously stored emergency trajectory. If in the first call no safe emergency trajectory is provided, an error is raised.
Note
If no trajectory is provided, the last calculated internal set of trajectories is used
- Parameters
traj_perf – (optional) performance trajectory in form of a dict with the following entries: - ‘traj’: trajectory data as numpy array with columns: s, x, y, head, curv, vel, acc - ‘id’: unique id of the trajectory (int) - ‘time’: time stamp of the trajectory in seconds - ‘valid’: boolean flag indicating safety
traj_em – (optional) emergency trajectory in form of a dict (same entries / format as traj_perf)
- Returns
safe_traj - safe trajectory to be executed (either performance or emergency) in form of dict
- log(traj_perf_ref: Optional[numpy.ndarray] = None, traj_em_ref: Optional[numpy.ndarray] = None, objects_ref: Optional[dict] = None) → None[source]¶
Log relevant data to file (if parameterized). A reference trajectory (to the ones provided via “set_inputs”) for the performance and emergency trajectory can be provided. This could be helpful when using the “sync_data” helper-function. That way one can log the synced as well as the original trajectory. The same scheme applies for a reference object-list
- Parameters
traj_perf_ref – (optional) reference trajectory for the internal performance trajectory
traj_em_ref – (optional) reference trajectory for the internal emergency trajectory
objects_ref – (optional) reference object-list for the internal object list
- Returns
- process_data() → tuple[source]¶
Triggers the calculation of the safety metric with all activated supervisor modules. If parallelization is activated in the configuration, the safety score for the performance and emergency trajectory is each calculated in an separate thread.
This method returns trajectory-dicts with embedded safety rating (key ‘valid’).
- Returns
- traj_perf - performance trajectory in form of a dict with the following entries:
’traj’: trajectory data numpy array with columns: s, x, y, head, curv, vel, acc
’id’: unique id of the trajectory (int)
’time’: time stamp of the trajectory in seconds
’valid’: ‘True’ if the trajectory is rated as safe, else ‘False’
’valid_dyn’: ‘True’ if rated as safe w.r.t. the dynamic environment, else ‘False’
’valid_stat’: ‘True’ if rated as safe w.r.t. the static environment, else ‘False’
’time_safety’: time.time() stamp at which safety score was concluded
traj_em - emergency trajectory in form of a dict (same entries / format as traj_perf)
Note
The returned values default to ‘None’, if no (new) data to process was handed to the class.
- process_data_simple(traj_perf: dict, traj_em: dict, objects: dict) → tuple[source]¶
Calls internal methods “set_inputs()” and “process_data()” and returns simplified safety rating (boolean T/F for performance and emergency trajectory respectively.
- Note: if provided “None” instead of a dict, the previous value remains. However, the trajectories must always
be provided in pairs.
- Parameters
traj_perf – performance trajectory in form of a dict with the following entries: - ‘traj’: trajectory data as numpy array with columns: s, x, y, head, curv, vel, acc - ‘id’: unique id of the trajectory (int) - ‘time’: time stamp of the trajectory in seconds
traj_em – emergency trajectory in form of a dict (same entries / format as traj_perf)
objects – object-list dict with each key being a dedicated object id and each object hosting a dict with (at least) the following information: - ‘id’: unique identifier - ‘X’: x position of cg - ‘Y’: y position of cg - ‘theta’: heading (north = 0, +pi -pi) - ‘v_x’: x velocity - ‘type’: “car”, … - ‘form’: “rectangle” / “circle” - ‘width’: width (if rectangle) - ‘length’: length (if rectangle) - ‘diameter’: diameter (if circle) - ‘time’: time-stamp
- Returns
safety_perf - ‘True’ if provided performance trajectory is safe, ‘False’ else - None if not provided
safety_em - ‘True’ if provided performance trajectory is safe, ‘False’ else - None if not provided
Note
The returned values default to ‘None’, if no (new) data to process was handed to the class.
- set_environment(bound_left: numpy.ndarray, bound_right: numpy.ndarray, ax_max_machines: numpy.ndarray, localgg: numpy.ndarray)[source]¶
Set map information including performance limits applicable on the given map.
Depending on the enabled modules this triggers an (re)initialization that may take some time. For online update of maps, evaluate performance first.
- Parameters
bound_left – left track boundary coordinates as numpy array with columns [x, y]
bound_right – right track boundary coordinates as numpy array with columns [x, y]
ax_max_machines – acceleration profile for the machine of the used vehicles, if multiple types use fastest the acc. for a given velocity is extracted - provide numpy array with columns [v, a]
localgg – track specific acceleration limits as numpy array with columns [x, y, s_m, ax, ay] NOTE: currently only the maximum of ax and ay is used globally (worst case estimate)
- set_inputs(traj_perf: dict, traj_em: dict, objects: dict) → None[source]¶
Set new input data. Note: if provided “None” instead of a dict, the previous value remains. However, the trajectories must always
be provided in pairs.
- Parameters
traj_perf – performance trajectory in form of a dict with the following entries: - ‘traj’: trajectory data as numpy array with columns: s, x, y, head, curv, vel, acc - ‘id’: unique id of the trajectory (int) - ‘time’: time stamp of the trajectory in seconds
traj_em – emergency trajectory in form of a dict (same entries / format as traj_perf)
objects – object-list dict with each key being a dedicated object id and each object hosting a dict with (at least) the following information: - ‘X’: x position of cg - ‘Y’: y position of cg - ‘theta’: heading (north = 0, +pi -pi) - ‘v_x’: x velocity - ‘type’: “car”, … - ‘form’: “rectangle” / “circle” - ‘width’: width (if rectangle) - ‘length’: length (if rectangle) - ‘diameter’: diameter (if circle) - ‘time’: time-stamp
- trajectory_supervisor.supervisor.safety_rating(mod_dict_q_in: multiprocessing.context.BaseContext.Queue, mod_enabled: dict, traj: numpy.ndarray, objects_sync: dict, traj_type: str, mod_dict_q_out: multiprocessing.context.BaseContext.Queue, mp_queue: multiprocessing.context.BaseContext.Queue) → None[source]¶
This function handles the generation of safety score for a given trajectory. Therefore, all activated supervisor modules for the trajectory type at hand (‘traj_type’) are called. The final safety rating is a conjunction of the returned scores of all passed supervisor modules.
- Parameters
mod_dict_q_in – queue holding dictionary with initialized module classes (used for calculations)
mod_enabled – dictionary specifying whether a module is activated for the ‘perf’ and / or ‘emerg’ traj.
traj – trajectory data with columns [s, x, y, heading, curv, vel, acc]
objects_sync – synced object dictionary
traj_type – string describing the trajectory type (‘perf’ or ‘emerg’)
mod_dict_q_out – queue holding dictionary with initialized module classes (updated with executed calculation)
mp_queue – queue that will receive all results
- Authors
Tim Stahl <tim.stahl@tum.de>
- Created on
30.03.2020