diff --git a/lsy_drone_racing/control/attitude_controller.py b/lsy_drone_racing/control/attitude_controller.py index e1a6c5aa..dab7d758 100644 --- a/lsy_drone_racing/control/attitude_controller.py +++ b/lsy_drone_racing/control/attitude_controller.py @@ -112,7 +112,7 @@ def compute_control( target_thrust[2] += self.drone_mass * self.g # Update z_axis to the current orientation of the drone - z_axis = R.from_euler("xyz", obs["rpy"]).as_matrix()[:, 2] + z_axis = R.from_quat(obs["quat"]).as_matrix()[:, 2] # update current thrust thrust_desired = target_thrust.dot(z_axis) diff --git a/lsy_drone_racing/envs/drone_racing_deploy_env.py b/lsy_drone_racing/envs/drone_racing_deploy_env.py index 4658f6bf..1b4ef595 100644 --- a/lsy_drone_racing/envs/drone_racing_deploy_env.py +++ b/lsy_drone_racing/envs/drone_racing_deploy_env.py @@ -76,6 +76,7 @@ def __init__(self, config: dict | ConfigDict): Args: config: The configuration of the environment. """ + raise NotImplementedError("The deployment environment is currently not functional.") super().__init__() self.config = config self.action_space = gymnasium.spaces.Box(low=-1, high=1, shape=(13,)) @@ -86,12 +87,12 @@ def __init__(self, config: dict | ConfigDict): self.observation_space = spaces.Dict( { "pos": spaces.Box(low=-np.inf, high=np.inf, shape=(3,)), - "rpy": spaces.Box(low=-np.inf, high=np.inf, shape=(3,)), + "quat": spaces.Box(low=-1, high=1, shape=(4,)), "vel": spaces.Box(low=-np.inf, high=np.inf, shape=(3,)), "ang_vel": spaces.Box(low=-np.inf, high=np.inf, shape=(3,)), "target_gate": spaces.Discrete(n_gates, start=-1), "gates_pos": spaces.Box(low=-np.inf, high=np.inf, shape=(n_gates, 3)), - "gates_rpy": spaces.Box(low=-np.pi, high=np.pi, shape=(n_gates, 3)), + "gates_quat": spaces.Box(low=-1, high=1, shape=(n_gates, 4)), "gates_visited": spaces.Box(low=0, high=1, shape=(n_gates,), dtype=np.bool_), "obstacles_pos": spaces.Box(low=-np.inf, high=np.inf, shape=(n_obstacles, 3)), "obstacles_visited": spaces.Box( @@ -199,13 +200,11 @@ def close(self): def obs(self) -> dict: """Return the observation of the environment.""" drone = self.vicon.drone_name - rpy = self.vicon.rpy[drone] - ang_vel = R.from_euler("xyz", rpy).inv().apply(self.vicon.ang_vel[drone]) obs = { "pos": self.vicon.pos[drone].astype(np.float32), - "rpy": rpy.astype(np.float32), + "quat": self.vicon.quat[drone].astype(np.float32), "vel": self.vicon.vel[drone].astype(np.float32), - "ang_vel": ang_vel.astype(np.float32), + "ang_vel": self.vicon.ang_vel[drone].astype(np.float32), } sensor_range = self.config.env.sensor_range @@ -246,7 +245,7 @@ def obs(self) -> dict: obs["obstacles_visited"] = self.obstacles_visited obs["gates_pos"] = gates_pos.astype(np.float32) - obs["gates_rpy"] = gates_rpy.astype(np.float32) + obs["gates_quat"] = R.from_euler("xyz", gates_rpy).as_quat().astype(np.float32) obs["obstacles_pos"] = obstacles_pos.astype(np.float32) self._obs = obs return obs @@ -268,7 +267,7 @@ def gate_passed(self, pos: NDArray[np.floating], prev_pos: NDArray[np.floating]) # Real gates measure 0.4m x 0.4m, we account for meas. error gate_size = (0.56, 0.56) gate_pos = self._obs["gates_pos"][self.target_gate] - gate_rot = R.from_euler("xyz", self._obs["gates_rpy"][self.target_gate]) + gate_rot = R.from_quat(self._obs["gates_quat"][self.target_gate]) return check_gate_pass(gate_pos, gate_rot, gate_size, pos, prev_pos) return False diff --git a/lsy_drone_racing/envs/race_core.py b/lsy_drone_racing/envs/race_core.py index aced6056..a2d91b66 100644 --- a/lsy_drone_racing/envs/race_core.py +++ b/lsy_drone_racing/envs/race_core.py @@ -63,8 +63,6 @@ class EnvData: contact_masks: Array pos_limit_low: Array pos_limit_high: Array - rpy_limit_low: Array - rpy_limit_high: Array gate_mj_ids: Array obstacle_mj_ids: Array max_episode_steps: Array @@ -84,8 +82,6 @@ def create( sensor_range: float, pos_limit_low: Array, pos_limit_high: Array, - rpy_limit_low: Array, - rpy_limit_high: Array, device: Device, ) -> EnvData: """Create a new environment data struct with default values.""" @@ -100,8 +96,6 @@ def create( steps=jp.zeros(n_envs, dtype=int, device=device), pos_limit_low=jp.array(pos_limit_low, dtype=np.float32, device=device), pos_limit_high=jp.array(pos_limit_high, dtype=np.float32, device=device), - rpy_limit_low=jp.array(rpy_limit_low, dtype=np.float32, device=device), - rpy_limit_high=jp.array(rpy_limit_high, dtype=np.float32, device=device), gate_mj_ids=jp.array(gate_mj_ids, dtype=int, device=device), obstacle_mj_ids=jp.array(obstacle_mj_ids, dtype=int, device=device), max_episode_steps=jp.array([max_episode_steps], dtype=int, device=device), @@ -124,12 +118,12 @@ def build_observation_space(n_gates: int, n_obstacles: int) -> spaces.Dict: """Create the observation space for the environment.""" obs_spec = { "pos": spaces.Box(low=-np.inf, high=np.inf, shape=(3,)), - "rpy": spaces.Box(low=-np.pi, high=np.pi, shape=(3,)), + "quat": spaces.Box(low=-1, high=1, shape=(4,)), "vel": spaces.Box(low=-np.inf, high=np.inf, shape=(3,)), "ang_vel": spaces.Box(low=-np.inf, high=np.inf, shape=(3,)), "target_gate": spaces.Discrete(n_gates, start=-1), "gates_pos": spaces.Box(low=-np.inf, high=np.inf, shape=(n_gates, 3)), - "gates_rpy": spaces.Box(low=-np.pi, high=np.pi, shape=(n_gates, 3)), + "gates_quat": spaces.Box(low=-1, high=1, shape=(n_gates, 4)), "gates_visited": spaces.Box(low=0, high=1, shape=(n_gates,), dtype=bool), "obstacles_pos": spaces.Box(low=-np.inf, high=np.inf, shape=(n_obstacles, 3)), "obstacles_visited": spaces.Box(low=0, high=1, shape=(n_obstacles,), dtype=bool), @@ -160,15 +154,15 @@ class RaceCoreEnv: The observation space is a dictionary with the following keys: - "pos": Drone position - - "rpy": Drone orientation (roll, pitch, yaw) + - "quat": Drone orientation as a quaternion (x, y, z, w) - "vel": Drone linear velocity - "ang_vel": Drone angular velocity - - "gates.pos": Positions of the gates - - "gates.rpy": Orientations of the gates - - "gates.visited": Flags indicating if the drone already was/ is in the sensor range of the + - "gates_pos": Positions of the gates + - "gates_quat": Orientations of the gates + - "gates_visited": Flags indicating if the drone already was/ is in the sensor range of the gates and the true position is known - - "obstacles.pos": Positions of the obstacles - - "obstacles.visited": Flags indicating if the drone already was/ is in the sensor range of the + - "obstacles_pos": Positions of the obstacles + - "obstacles_visited": Flags indicating if the drone already was/ is in the sensor range of the obstacles and the true position is known - "target_gate": The current target gate index @@ -253,7 +247,6 @@ def __init__( gate_mj_ids, obstacle_mj_ids = self.gates["mj_ids"], self.obstacles["mj_ids"] pos_limit_low = jp.array([-3, -3, 0], dtype=np.float32, device=self.device) pos_limit_high = jp.array([3, 3, 2.5], dtype=np.float32, device=self.device) - rpy_limit = jp.array([jp.pi / 2, jp.pi / 2, jp.pi], dtype=jp.float32, device=self.device) self.data = EnvData.create( n_envs, n_drones, @@ -266,8 +259,6 @@ def __init__( sensor_range, pos_limit_low, pos_limit_high, - -rpy_limit, - rpy_limit, self.device, ) @@ -315,16 +306,14 @@ def _step( self.sim.data = self._warp_disabled_drones(self.sim.data, self.data.disabled_drones) # Apply the environment logic. Check which drones are now disabled, check which gates have # been passed, and update the target gate. - drone_pos, drone_quat = self.sim.data.states.pos, self.sim.data.states.quat + drone_pos = self.sim.data.states.pos mocap_pos, mocap_quat = self.sim.data.mjx_data.mocap_pos, self.sim.data.mjx_data.mocap_quat contacts = self.sim.contacts() # Get marked_for_reset before it is updated, because the autoreset needs to be based on the # previous flags, not the ones from the current step marked_for_reset = self.data.marked_for_reset # Apply the environment logic with updated simulation data. - self.data = self._step_env( - self.data, drone_pos, drone_quat, mocap_pos, mocap_quat, contacts - ) + self.data = self._step_env(self.data, drone_pos, mocap_pos, mocap_quat, contacts) # Auto-reset envs. Add configuration option to disable for single-world envs if self.autoreset and marked_for_reset.any(): self._reset(mask=marked_for_reset) @@ -357,27 +346,25 @@ def obs(self) -> dict[str, NDArray[np.floating]]: """Return the observation of the environment.""" # Add the gate and obstacle poses to the info. If gates or obstacles are in sensor range, # use the actual pose, otherwise use the nominal pose. - gates_pos, gates_rpy, obstacles_pos = self._obs( + gates_pos, gates_quat, obstacles_pos = self._obs( self.sim.data.mjx_data.mocap_pos, self.sim.data.mjx_data.mocap_quat, self.data.gates_visited, self.gates["mj_ids"], self.gates["nominal_pos"], - self.gates["nominal_rpy"], + self.gates["nominal_quat"], self.data.obstacles_visited, self.obstacles["mj_ids"], self.obstacles["nominal_pos"], ) - quat = self.sim.data.states.quat - rpy = R.from_quat(quat.reshape(-1, 4)).as_euler("xyz").reshape((*quat.shape[:-1], 3)) obs = { "pos": np.array(self.sim.data.states.pos, dtype=np.float32), - "rpy": rpy.astype(np.float32), + "quat": np.array(self.sim.data.states.quat, dtype=np.float32), "vel": np.array(self.sim.data.states.vel, dtype=np.float32), "ang_vel": np.array(self.sim.data.states.ang_vel, dtype=np.float32), "target_gate": np.array(self.data.target_gate, dtype=int), "gates_pos": np.asarray(gates_pos, dtype=np.float32), - "gates_rpy": np.asarray(gates_rpy, dtype=np.float32), + "gates_quat": np.asarray(gates_quat, dtype=np.float32), "gates_visited": np.asarray(self.data.gates_visited, dtype=bool), "obstacles_pos": np.asarray(obstacles_pos, dtype=np.float32), "obstacles_visited": np.asarray(self.data.obstacles_visited, dtype=bool), @@ -447,16 +434,11 @@ def _reset_env_data(data: EnvData, drone_pos: Array, mask: Array | None = None) @staticmethod @jax.jit def _step_env( - data: EnvData, - drone_pos: Array, - drone_quat: Array, - mocap_pos: Array, - mocap_quat: Array, - contacts: Array, + data: EnvData, drone_pos: Array, mocap_pos: Array, mocap_quat: Array, contacts: Array ) -> EnvData: """Step the environment data.""" n_gates = len(data.gate_mj_ids) - disabled_drones = RaceCoreEnv._disabled_drones(drone_pos, drone_quat, contacts, data) + disabled_drones = RaceCoreEnv._disabled_drones(drone_pos, contacts, data) gates_pos = mocap_pos[:, data.gate_mj_ids] obstacles_pos = mocap_pos[:, data.obstacle_mj_ids] # We need to convert the mocap quat from MuJoCo order to scipy order @@ -498,27 +480,24 @@ def _obs( gates_visited: Array, gate_mocap_ids: Array, nominal_gate_pos: NDArray, - nominal_gate_rpy: NDArray, + nominal_gate_quat: NDArray, obstacles_visited: Array, obstacle_mocap_ids: Array, nominal_obstacle_pos: NDArray, ) -> tuple[Array, Array]: """Get the nominal or real gate positions and orientations depending on the sensor range.""" mask, real_pos = gates_visited[..., None], mocap_pos[:, gate_mocap_ids] - real_rpy = JaxR.from_quat(mocap_quat[:, gate_mocap_ids][..., [1, 2, 3, 0]]).as_euler("xyz") + real_quat = mocap_quat[:, gate_mocap_ids][..., [1, 2, 3, 0]] gates_pos = jp.where(mask, real_pos[:, None], nominal_gate_pos[None, None]) - gates_rpy = jp.where(mask, real_rpy[:, None], nominal_gate_rpy[None, None]) + gates_quat = jp.where(mask, real_quat[:, None], nominal_gate_quat[None, None]) mask, real_pos = obstacles_visited[..., None], mocap_pos[:, obstacle_mocap_ids] obstacles_pos = jp.where(mask, real_pos[:, None], nominal_obstacle_pos[None, None]) - return gates_pos, gates_rpy, obstacles_pos + return gates_pos, gates_quat, obstacles_pos @staticmethod - def _disabled_drones(pos: Array, quat: Array, contacts: Array, data: EnvData) -> Array: - rpy = JaxR.from_quat(quat).as_euler("xyz") - disabled = jp.logical_or(data.disabled_drones, jp.all(pos < data.pos_limit_low, axis=-1)) - disabled = jp.logical_or(disabled, jp.all(pos > data.pos_limit_high, axis=-1)) - disabled = jp.logical_or(disabled, jp.all(rpy < data.rpy_limit_low, axis=-1)) - disabled = jp.logical_or(disabled, jp.all(rpy > data.rpy_limit_high, axis=-1)) + def _disabled_drones(pos: Array, contacts: Array, data: EnvData) -> Array: + disabled = jp.logical_or(data.disabled_drones, jp.any(pos < data.pos_limit_low, axis=-1)) + disabled = jp.logical_or(disabled, jp.any(pos > data.pos_limit_high, axis=-1)) disabled = jp.logical_or(disabled, data.target_gate == -1) contacts = jp.any(jp.logical_and(contacts[:, None, :], data.contact_masks), axis=-1) disabled = jp.logical_or(disabled, contacts) @@ -539,8 +518,13 @@ def _warp_disabled_drones(data: SimData, mask: Array) -> SimData: def _load_track(self, track: dict) -> tuple[dict, dict, dict]: """Load the track from the config file.""" gate_pos = np.array([g["pos"] for g in track.gates]) - gate_rpy = np.array([g["rpy"] for g in track.gates]) - gates = {"pos": gate_pos, "rpy": gate_rpy, "nominal_pos": gate_pos, "nominal_rpy": gate_rpy} + gate_quat = R.from_euler("xyz", np.array([g["rpy"] for g in track.gates])).as_quat() + gates = { + "pos": gate_pos, + "quat": gate_quat, + "nominal_pos": gate_pos, + "nominal_quat": gate_quat, + } obstacle_pos = np.array([o["pos"] for o in track.obstacles]) obstacles = {"pos": obstacle_pos, "nominal_pos": obstacle_pos} drone_keys = ("pos", "rpy", "vel", "ang_vel") @@ -578,7 +562,7 @@ def _load_track_into_sim(self, gate_spec: MjSpec, obstacle_spec: MjSpec): gate = frame.attach_body(gate_spec.find_body("gate"), "", f":{i}") gate.pos = self.gates["pos"][i] # Convert from scipy order to MuJoCo order - gate.quat = R.from_euler("xyz", self.gates["rpy"][i]).as_quat()[[3, 0, 1, 2]] + gate.quat = self.gates["quat"][i][[3, 0, 1, 2]] gate.mocap = True # Make mocap to modify the position of static bodies during sim for i in range(n_obstacles): obstacle = frame.attach_body(obstacle_spec.find_body("obstacle"), "", f":{i}") diff --git a/lsy_drone_racing/utils/ros_utils.py b/lsy_drone_racing/utils/ros_utils.py index 4b637ae4..9f61943c 100644 --- a/lsy_drone_racing/utils/ros_utils.py +++ b/lsy_drone_racing/utils/ros_utils.py @@ -34,9 +34,9 @@ def check_race_track(config: ConfigDict): assert rng_info.obstacle_pos.type == "uniform", "Race track checks expect uniform distributions" for i, gate in enumerate(config.env.track.gates): name = f"gate{i + 1}" - gate_pos, gate_rot = vicon.pos[name], R.from_euler("xyz", vicon.rpy[name]) + gate_pos, gate_rot = vicon.pos[name], R.from_quat(vicon.quat[name]) check_bounds(name, gate_pos, gate.pos, rng_info.gate_pos.low, rng_info.gate_pos.high) - check_rotation(name, gate_rot, R.from_euler("xyz", gate.rpy), ang_tol) + check_rotation(name, gate_rot, R.from_quat(gate.quat), ang_tol) for i, obstacle in enumerate(config.env.track.obstacles): name = f"obstacle{i + 1}" diff --git a/lsy_drone_racing/vicon.py b/lsy_drone_racing/vicon.py index 66c5012e..507e1fe9 100644 --- a/lsy_drone_racing/vicon.py +++ b/lsy_drone_racing/vicon.py @@ -60,7 +60,7 @@ def __init__( self.track_names = track_names # Register the Vicon subscribers for the drone and any other tracked object self.pos: dict[str, np.ndarray] = {} - self.rpy: dict[str, np.ndarray] = {} + self.quat: dict[str, np.ndarray] = {} self.vel: dict[str, np.ndarray] = {} self.ang_vel: dict[str, np.ndarray] = {} self.time: dict[str, float] = {} @@ -90,8 +90,7 @@ def estimator_callback(self, data: StateVector): if self.drone_name is None: return self.pos[self.drone_name] = np.array(data.pos) - rpy = R.from_quat(data.quat).as_euler("xyz") - self.rpy[self.drone_name] = np.array(rpy) + self.quat[self.drone_name] = np.array(data.quat) self.vel[self.drone_name] = np.array(data.vel) self.ang_vel[self.drone_name] = np.array(data.omega_b) @@ -110,10 +109,10 @@ def tf_callback(self, data: TFMessage): continue T, Rot = tf.transform.translation, tf.transform.rotation pos = np.array([T.x, T.y, T.z]) - rpy = R.from_quat([Rot.x, Rot.y, Rot.z, Rot.w]).as_euler("xyz") + quat = np.array([Rot.x, Rot.y, Rot.z, Rot.w]) self.time[name] = time.time() self.pos[name] = pos - self.rpy[name] = rpy + self.quat[name] = quat def pose(self, name: str) -> tuple[np.ndarray, np.ndarray]: """Get the latest pose of a tracked object. @@ -122,14 +121,14 @@ def pose(self, name: str) -> tuple[np.ndarray, np.ndarray]: name: The name of the object. Returns: - The position and rotation of the object. The rotation is in roll-pitch-yaw format. + The position and orientation (as xyzw quaternion) of the object. """ - return self.pos[name], self.rpy[name] + return self.pos[name], self.quat[name] @property def poses(self) -> tuple[np.ndarray, np.ndarray]: """Get the latest poses of all objects.""" - return np.stack(self.pos.values()), np.stack(self.rpy.values()) + return np.stack(self.pos.values()), np.stack(self.quat.values()) @property def names(self) -> list[str]: