trajectory_supervisor.supervisor module

class trajectory_supervisor.supervisor.Supervisor(module_enabled: dict, supmod_config_path: str, veh_params: dict, logging_param: Optional[dict] = None, zone_file_path: Optional[str] = None, occ_map_path: Optional[str] = None, use_mp: bool = False)

Bases: object

Supervisor class, holding the core functionality and execution steps for an online verification of motion primitives. This class handles the following core tasks:

  • Holds methods to retrieve the latest parameter updates (e.g. trajectories, object list, …)

  • Handles the temporal synchronisation of trajectories and object-list

  • Triggers calculation and retrieves results of safety scores

Authors
Created on

14.06.2019

Parameters
  • module_enabled – dict of dicts specifying which modules of the supervisor are enabled

  • supmod_config_path – path pointing to the config file of the Supervisor modules

  • logging_param – dict of logging parameters (e.g. file paths); must hold the following keys:

    • file_log_level - string specifying the file log level (e.g. INFO, WARNING, CRITICAL)

    • console_log_level - string specifying the console log level (e.g. INFO, WARNING, CRITICAL)

    • log_path_data - string for path of file where the data log should be generated

    • log_path_msg - string for path of file where the msg log should be generated

    • log_path_map - string for path of file where the map log should be generated

    NOTE:

    • all log files should reside in the same folder; for the log viewer to automatically locate the most recent log, stick to the example on GitHub

    • if “None” is provided, logging is disabled, ignoring the setting in the config

  • veh_params – dict of vehicle parameters; must hold the following keys:

    • veh_width - width of the ego-vehicle [in m] (NOTE: also used for offline pre-calculations of other vehicles)

    • veh_length - length of the ego-vehicle [in m] (NOTE: also used for offline pre-calculations of other vehicles)

    • turn_rad - turning radius of vehicle [in m]

    • dyn_model_exp - exponent used in vehicle dynamics model, range [1.0, 2.0] (NOTE: 2.0 -> ideal friction circle; 1.0 -> clean diamond shape)

    • drag_coeff - drag coeff. incl. all constants (0.5*c_w*A_front*rho_air) [in m2*kg/m3]; set zero to disable

    • m_veh - vehicle mass [in kg]

  • zone_file_path – (optional) path pointing to a Roborace zone spec. (e.g. for reach set reduct.)

  • occ_map_path – (optional) path pointing to location where occupation map can be stored to avoid offline calculation on every launch

  • use_mp – (optional) if set to true and executed on a Linux machine, multiprocessing is used
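As a rough illustration of the two dictionaries described above, the following sketch assembles `veh_params` and `logging_param` and checks that the documented keys are present. All numeric values, the log folder and the file names are made-up examples, and `missing_keys` is a hypothetical helper that is not part of the package.

```python
# Keys taken from the parameter description; everything else is illustrative.
REQUIRED_VEH_KEYS = ("veh_width", "veh_length", "turn_rad",
                     "dyn_model_exp", "drag_coeff", "m_veh")
REQUIRED_LOG_KEYS = ("file_log_level", "console_log_level",
                     "log_path_data", "log_path_msg", "log_path_map")


def missing_keys(params: dict, required: tuple) -> list:
    """Return the required keys absent from 'params' (hypothetical helper)."""
    return [key for key in required if key not in params]


# drag_coeff bundles all constants: 0.5 * c_w * A_front * rho_air
c_w, a_front, rho_air = 0.3, 1.0, 1.2  # assumed example values

veh_params = {
    "veh_width": 2.0,       # [m]
    "veh_length": 5.0,      # [m]
    "turn_rad": 11.0,       # [m]
    "dyn_model_exp": 2.0,   # 2.0 -> ideal friction circle, 1.0 -> diamond
    "drag_coeff": 0.5 * c_w * a_front * rho_air,  # set to 0.0 to disable
    "m_veh": 1200.0,        # [kg]
}

# All log files share one folder (folder and file names are assumptions).
log_dir = "logs"
logging_param = {
    "file_log_level": "INFO",
    "console_log_level": "WARNING",
    "log_path_data": log_dir + "/data_log",
    "log_path_msg": log_dir + "/msg_log",
    "log_path_map": log_dir + "/map_log",
}

assert not missing_keys(veh_params, REQUIRED_VEH_KEYS)
assert not missing_keys(logging_param, REQUIRED_LOG_KEYS)
assert 1.0 <= veh_params["dyn_model_exp"] <= 2.0
```

Checking the keys before constructing the class is only a convenience; passing `logging_param=None` disables logging altogether, as noted in the parameter description.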

get_fired_modules() → list

Return a list of strings for the modules fired in the preceding processing step.

Returns

get_safe_trajectory(traj_perf: Optional[dict] = None, traj_em: Optional[dict] = None) → dict

Selects a safety trajectory based on the provided new performance and emergency trajectory and a previously stored emergency trajectory. If in the first call no safe emergency trajectory is provided, an error is raised.

Note

If no trajectory is provided, the last calculated internal set of trajectories is used

Parameters
  • traj_perf – (optional) performance trajectory in form of a dict with the following entries:

    • ‘traj’: trajectory data as numpy array with columns: s, x, y, head, curv, vel, acc

    • ‘id’: unique id of the trajectory (int)

    • ‘time’: time stamp of the trajectory in seconds

    • ‘valid’: boolean flag indicating safety

  • traj_em – (optional) emergency trajectory in form of a dict (same entries / format as traj_perf)

Returns

  • safe_traj - safe trajectory to be executed (either performance or emergency) in form of dict
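The exact selection rule is not spelled out in this description. The sketch below shows one plausible policy under stated assumptions: the performance trajectory is chosen only when both new trajectories are rated valid, otherwise the most recent valid emergency trajectory is used. `SafeTrajectorySelector` is a hypothetical stand-in, not the Supervisor's implementation, and it does not model the fallback to the internally stored trajectory set.

```python
from typing import Optional


class SafeTrajectorySelector:
    """Hypothetical stand-in illustrating one possible selection policy."""

    def __init__(self) -> None:
        # Most recent emergency trajectory that was rated safe.
        self._last_safe_em: Optional[dict] = None

    def select(self, traj_perf: Optional[dict],
               traj_em: Optional[dict]) -> dict:
        em_ok = traj_em is not None and bool(traj_em.get("valid"))
        perf_ok = traj_perf is not None and bool(traj_perf.get("valid"))

        # Remember the newest emergency trajectory that was rated safe.
        if em_ok:
            self._last_safe_em = traj_em

        # Without any safe emergency trajectory there is nothing to fall
        # back on, so an error is raised (as described for the first call).
        if self._last_safe_em is None:
            raise ValueError("no safe emergency trajectory available")

        # Assumed rule: follow the performance trajectory only if both
        # new trajectories are safe, else use the stored emergency one.
        if perf_ok and em_ok:
            return traj_perf
        return self._last_safe_em
```

With this policy, a step in which both new trajectories are rated unsafe returns the emergency trajectory stored in an earlier step.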

init_supmods() → None

Initialize all activated SUPervisor MODules.

log(traj_perf_ref: Optional[numpy.ndarray] = None, traj_em_ref: Optional[numpy.ndarray] = None, objects_ref: Optional[dict] = None) → None

Log relevant data to file (if parameterized). In addition to the trajectories provided via “set_inputs”, a reference trajectory can be provided for the performance and for the emergency trajectory. This is helpful when using the “sync_data” helper function, since both the synced and the original trajectory can then be logged. The same scheme applies to a reference object-list.

Parameters
  • traj_perf_ref – (optional) reference trajectory for the internal performance trajectory

  • traj_em_ref – (optional) reference trajectory for the internal emergency trajectory

  • objects_ref – (optional) reference object-list for the internal object list

Returns

process_data() → tuple

Triggers the calculation of the safety metric with all activated supervisor modules. If parallelization is activated in the configuration, the safety scores for the performance and the emergency trajectory are each calculated in a separate thread.

This method returns trajectory-dicts with embedded safety rating (key ‘valid’).

Returns

  • traj_perf - performance trajectory in form of a dict with the following entries:
    • ‘traj’: trajectory data as numpy array with columns: s, x, y, head, curv, vel, acc

    • ‘id’: unique id of the trajectory (int)

    • ‘time’: time stamp of the trajectory in seconds

    • ‘valid’: ‘True’ if the trajectory is rated as safe, else ‘False’

    • ‘valid_dyn’: ‘True’ if rated as safe w.r.t. the dynamic environment, else ‘False’

    • ‘valid_stat’: ‘True’ if rated as safe w.r.t. the static environment, else ‘False’

    • ‘time_safety’: time.time() stamp at which safety score was concluded

  • traj_em - emergency trajectory in form of a dict (same entries / format as traj_perf)

Note

The returned values default to ‘None’, if no (new) data to process was handed to the class.
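To make the parallel evaluation concrete, here is a minimal sketch that rates a performance and an emergency trajectory in two threads and collects the results from a queue. It is not the library's code: `rate`, `process_both` and `speed_ok` are hypothetical names, the speed limit is an arbitrary placeholder check, and plain lists of rows stand in for the numpy array to keep the sketch dependency-free.

```python
import queue
import threading

VEL_COL = 5  # column order: s, x, y, head, curv, vel, acc


def speed_ok(traj: dict) -> bool:
    """Placeholder check: no point exceeds 60 m/s (arbitrary limit)."""
    return max(row[VEL_COL] for row in traj["traj"]) <= 60.0


def rate(traj_type: str, traj: dict, checks: list, out_q: queue.Queue) -> None:
    """Rate one trajectory and push (traj_type, valid) to the queue."""
    out_q.put((traj_type, all(check(traj) for check in checks)))


def process_both(traj_perf: dict, traj_em: dict, checks: list) -> tuple:
    """Rate both trajectories concurrently, embed the result as 'valid'."""
    out_q = queue.Queue()
    threads = [
        threading.Thread(target=rate, args=(traj_type, traj, checks, out_q))
        for traj_type, traj in (("perf", traj_perf), ("emerg", traj_em))
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    # Results may arrive in any order, so they are keyed by trajectory type.
    results = dict(out_q.get() for _ in threads)
    return (dict(traj_perf, valid=results["perf"]),
            dict(traj_em, valid=results["emerg"]))


perf = {"traj": [(0.0, 0.0, 0.0, 0.0, 0.0, 50.0, 0.0)], "id": 1, "time": 0.0}
emerg = {"traj": [(0.0, 0.0, 0.0, 0.0, 0.0, 70.0, -5.0)], "id": 2, "time": 0.0}
rated_perf, rated_em = process_both(perf, emerg, [speed_ok])
print(rated_perf["valid"], rated_em["valid"])
```

Keying the queue entries by trajectory type avoids any dependence on which thread finishes first.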

process_data_simple(traj_perf: dict, traj_em: dict, objects: dict) → tuple

Calls the internal methods “set_inputs()” and “process_data()” and returns a simplified safety rating (boolean True/False for the performance and the emergency trajectory, respectively).

Note: if “None” is provided instead of a dict, the previous value remains. However, the trajectories must always be provided in pairs.

Parameters
  • traj_perf – performance trajectory in form of a dict with the following entries:

    • ‘traj’: trajectory data as numpy array with columns: s, x, y, head, curv, vel, acc

    • ‘id’: unique id of the trajectory (int)

    • ‘time’: time stamp of the trajectory in seconds

  • traj_em – emergency trajectory in form of a dict (same entries / format as traj_perf)

  • objects – object-list dict with each key being a dedicated object id and each object hosting a dict with (at least) the following information:

    • ‘id’: unique identifier

    • ‘X’: x position of cg

    • ‘Y’: y position of cg

    • ‘theta’: heading (north = 0, +pi -pi)

    • ‘v_x’: x velocity

    • ‘type’: “car”, …

    • ‘form’: “rectangle” / “circle”

    • ‘width’: width (if rectangle)

    • ‘length’: length (if rectangle)

    • ‘diameter’: diameter (if circle)

    • ‘time’: time-stamp

Returns

  • safety_perf - ‘True’ if the provided performance trajectory is safe, ‘False’ otherwise; ‘None’ if not provided

  • safety_em - ‘True’ if the provided emergency trajectory is safe, ‘False’ otherwise; ‘None’ if not provided

Note

The returned values default to ‘None’, if no (new) data to process was handed to the class.
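The reduction from rated trajectory dicts to the simplified return values can be pictured as follows. This is a sketch under the assumption that the boolean is read from the ‘valid’ entry; `simple_rating` is a hypothetical helper, not a function of the package.

```python
from typing import Optional


def simple_rating(traj: Optional[dict]) -> Optional[bool]:
    """Map a rated trajectory dict to True/False, or None if absent."""
    if traj is None:
        return None
    return bool(traj["valid"])


# A rated performance trajectory and no (new) emergency trajectory.
safety_perf = simple_rating({"id": 7, "time": 12.5, "valid": True})
safety_em = simple_rating(None)
print(safety_perf, safety_em)
```

Callers therefore need to distinguish three outcomes per trajectory: safe, unsafe, and not rated.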

set_environment(bound_left: numpy.ndarray, bound_right: numpy.ndarray, ax_max_machines: numpy.ndarray, localgg: numpy.ndarray)

Set map information including performance limits applicable on the given map.

Depending on the enabled modules, this triggers a (re)initialization that may take some time. For online update of maps, evaluate performance first.

Parameters
  • bound_left – left track boundary coordinates as numpy array with columns [x, y]

  • bound_right – right track boundary coordinates as numpy array with columns [x, y]

  • ax_max_machines – acceleration profile of the machine of the vehicles in use (if multiple vehicle types are present, use the fastest one); the acceleration for a given velocity is extracted from this profile. Provide a numpy array with columns [v, a]

  • localgg – track specific acceleration limits as numpy array with columns [x, y, s_m, ax, ay] NOTE: currently only the maximum of ax and ay is used globally (worst case estimate)
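The array layouts listed above can be illustrated with a short sketch. The geometry and limits are invented for a straight 100 m section, the lookup of the acceleration at a given velocity via linear interpolation is an assumption about how the profile might be evaluated, and the final call is shown only as a comment because it needs an initialized Supervisor instance.

```python
import numpy as np

# Straight 100 m track section, 10 m wide (made-up geometry).
x = np.linspace(0.0, 100.0, 11)
bound_left = np.column_stack((x, np.full_like(x, 5.0)))    # columns [x, y]
bound_right = np.column_stack((x, np.full_like(x, -5.0)))  # columns [x, y]

# Machine acceleration limit over velocity, columns [v, a].
ax_max_machines = np.array([[0.0, 8.0],
                            [40.0, 6.0],
                            [80.0, 2.0]])

# Track-specific limits on the centre line, columns [x, y, s_m, ax, ay].
# On a straight line along x, the arc length s_m equals x.
localgg = np.column_stack((x, np.zeros_like(x), x,
                           np.full_like(x, 10.0), np.full_like(x, 12.0)))
localgg[5, 3:] = (11.0, 13.0)  # one point with higher limits

# Assumption: acceleration at a given velocity via linear interpolation.
a_at_60 = float(np.interp(60.0, ax_max_machines[:, 0], ax_max_machines[:, 1]))

# Only the maximum of ax and ay is used globally (see the NOTE above).
ax_global = float(localgg[:, 3].max())
ay_global = float(localgg[:, 4].max())
print(a_at_60, ax_global, ay_global)

# supervisor.set_environment(bound_left=bound_left, bound_right=bound_right,
#                            ax_max_machines=ax_max_machines, localgg=localgg)
```

Because only the global maxima of `ax` and `ay` enter the calculation, a single point with high limits raises the limit assumed for the whole track.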

set_inputs(traj_perf: dict, traj_em: dict, objects: dict) → None

Set new input data.

Note: if “None” is provided instead of a dict, the previous value remains. However, the trajectories must always be provided in pairs.

Parameters
  • traj_perf – performance trajectory in form of a dict with the following entries:

    • ‘traj’: trajectory data as numpy array with columns: s, x, y, head, curv, vel, acc

    • ‘id’: unique id of the trajectory (int)

    • ‘time’: time stamp of the trajectory in seconds

  • traj_em – emergency trajectory in form of a dict (same entries / format as traj_perf)

  • objects – object-list dict with each key being a dedicated object id and each object hosting a dict with (at least) the following information:

    • ‘X’: x position of cg

    • ‘Y’: y position of cg

    • ‘theta’: heading (north = 0, +pi -pi)

    • ‘v_x’: x velocity

    • ‘type’: “car”, …

    • ‘form’: “rectangle” / “circle”

    • ‘width’: width (if rectangle)

    • ‘length’: length (if rectangle)

    • ‘diameter’: diameter (if circle)

    • ‘time’: time-stamp
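A minimal sketch of input data in the format described above might look as follows. All values are invented, the object key `veh_1` and the trajectory ids are arbitrary, the use of `time.time()` for the time stamps is an assumption, and the heading and curvature columns are filled with placeholder zeros. The call itself is shown as a comment, since it requires an initialized Supervisor instance.

```python
import time

import numpy as np

now = time.time()  # assumed time base for all stamps
n = 5
s = np.linspace(0.0, 20.0, n)  # arc length [m]
zeros = np.zeros(n)

# Column order: s, x, y, head, curv, vel, acc
# Performance trajectory: constant 10 m/s on a straight line along x.
perf = np.column_stack((s, s, zeros, zeros, zeros, np.full(n, 10.0), zeros))

# Emergency trajectory: brake from 10 m/s to standstill within 20 m,
# i.e. constant a = -2.5 m/s^2 and v(s) = sqrt(v0^2 + 2*a*s).
vel_em = np.sqrt(10.0 ** 2 + 2.0 * (-2.5) * s)
emerg = np.column_stack((s, s, zeros, zeros, zeros, vel_em, np.full(n, -2.5)))

traj_perf = {"traj": perf, "id": 0, "time": now}
traj_em = {"traj": emerg, "id": 1, "time": now}

# One rectangular object; the dict key serves as the object id.
objects = {
    "veh_1": {
        "X": 30.0, "Y": 2.0,   # position of cg
        "theta": 0.0,          # heading, north = 0
        "v_x": 8.0,            # x velocity
        "type": "car",
        "form": "rectangle",
        "width": 2.0, "length": 5.0,
        "time": now,
    }
}

# supervisor.set_inputs(traj_perf=traj_perf, traj_em=traj_em, objects=objects)
```

Both trajectories are created together here because they must always be handed over as a pair.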

trajectory_supervisor.supervisor.default(obj)

trajectory_supervisor.supervisor.safety_rating(mod_dict_q_in: multiprocessing.context.BaseContext.Queue, mod_enabled: dict, traj: numpy.ndarray, objects_sync: dict, traj_type: str, mod_dict_q_out: multiprocessing.context.BaseContext.Queue, mp_queue: multiprocessing.context.BaseContext.Queue) → None

This function generates the safety score for a given trajectory. To this end, all supervisor modules activated for the trajectory type at hand (‘traj_type’) are called. The final safety rating is the conjunction of the scores returned by all called supervisor modules.

Parameters
  • mod_dict_q_in – queue holding dictionary with initialized module classes (used for calculations)

  • mod_enabled – dictionary specifying whether a module is activated for the ‘perf’ and / or ‘emerg’ traj.

  • traj – trajectory data with columns [s, x, y, heading, curv, vel, acc]

  • objects_sync – synced object dictionary

  • traj_type – string describing the trajectory type (‘perf’ or ‘emerg’)

  • mod_dict_q_out – queue holding dictionary with initialized module classes (updated with executed calculation)

  • mp_queue – queue that will receive all results
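The conjunction of module scores can be sketched in a few lines. The layout of `mod_enabled` (module name mapped to flags per trajectory type), the module names and the check functions are assumptions made for illustration; `safety_conjunction` is a hypothetical helper and leaves out the queue handling of the real function.

```python
def safety_conjunction(mod_enabled: dict, module_checks: dict,
                       traj: dict, traj_type: str) -> tuple:
    """Run all modules enabled for 'traj_type' and AND their scores."""
    scores = {
        name: bool(check(traj))
        for name, check in module_checks.items()
        if mod_enabled.get(name, {}).get(traj_type, False)
    }
    # all() over an empty dict is True: with no enabled module,
    # nothing objects to the trajectory.
    return all(scores.values()), scores


# Hypothetical modules: one enabled for both types, one for 'perf' only.
mod_enabled = {
    "module_a": {"perf": True, "emerg": True},
    "module_b": {"perf": True, "emerg": False},
}
module_checks = {
    "module_a": lambda traj: traj["max_vel"] <= 60.0,
    "module_b": lambda traj: traj["max_acc"] <= 10.0,
}

traj = {"max_vel": 50.0, "max_acc": 12.0}  # simplified stand-in data
valid_perf, scores_perf = safety_conjunction(mod_enabled, module_checks, traj, "perf")
valid_em, scores_em = safety_conjunction(mod_enabled, module_checks, traj, "emerg")
print(valid_perf, scores_perf)
print(valid_em, scores_em)
```

A single failing module is enough to mark the trajectory as unsafe, which is why the same data can pass as an emergency trajectory and fail as a performance trajectory when different modules are enabled.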

Authors
Created on

30.03.2020