Skip to content

Commit

Permalink
Fix spaces. Fix tests. Set gymnasium seed. Fix linting. Add docs
Browse files Browse the repository at this point in the history
  • Loading branch information
amacati committed Jan 21, 2025
1 parent 4542ba0 commit 53d8b36
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 47 deletions.
3 changes: 2 additions & 1 deletion lsy_drone_racing/control/attitude_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ class AttitudeController(BaseController):
"""

def __init__(self, obs: dict[str, NDArray[np.floating]], info: dict, config: dict):
"""Initialization of the controller.
"""Initialize the attitude controller.
Args:
obs: The initial observation of the environment's state. See the environment's
observation space for details.
info: Additional environment information from the reset.
config: The configuration of the environment.
"""
super().__init__(obs, info, config)
self.freq = config.env.freq
Expand Down
75 changes: 73 additions & 2 deletions lsy_drone_racing/envs/drone_race.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@


class DroneRaceEnv(RaceCoreEnv, Env):
"""Single-agent drone racing environment."""

def __init__(
self,
freq: int,
Expand All @@ -31,6 +33,21 @@ def __init__(
max_episode_steps: int = 1500,
device: Literal["cpu", "gpu"] = "cpu",
):
"""Initialize the single-agent drone racing environment.
Args:
freq: Environment step frequency.
sim_config: Simulation configuration.
sensor_range: Sensor range.
action_space: Control mode for the drones. See `build_action_space` for details.
track: Track configuration.
disturbances: Disturbance configuration.
randomizations: Randomization configuration.
random_resets: Flag to reset the environment randomly.
seed: Random seed.
max_episode_steps: Maximum number of steps per episode.
device: Device used for the environment and the simulation.
"""
super().__init__(
n_envs=1,
n_drones=1,
Expand All @@ -52,19 +69,38 @@ def __init__(
self.autoreset = False

def reset(self, seed: int | None = None, options: dict | None = None) -> tuple[dict, dict]:
"""Reset the environment.
Args:
seed: Random seed.
options: Additional reset options. Not used.
Returns:
The initial observation and info.
"""
obs, info = super().reset(seed=seed, options=options)
obs = {k: v[0, 0] for k, v in obs.items()}
info = {k: v[0, 0] for k, v in info.items()}
return obs, info

def step(self, action: NDArray[np.floating]) -> tuple[dict, float, bool, bool, dict]:
"""Step the environment.
Args:
action: Action for the drone.
Returns:
Observation, reward, terminated, truncated, and info.
"""
obs, reward, terminated, truncated, info = super().step(action)
obs = {k: v[0, 0] for k, v in obs.items()}
info = {k: v[0, 0] for k, v in info.items()}
return obs, reward[0, 0], terminated[0, 0], truncated[0, 0], info
return obs, float(reward[0, 0]), bool(terminated[0, 0]), bool(truncated[0, 0]), info


class VecDroneRaceEnv(RaceCoreEnv, VectorEnv):
"""Vectorized single-agent drone racing environment."""

def __init__(
self,
num_envs: int,
Expand All @@ -80,6 +116,22 @@ def __init__(
max_episode_steps: int = 1500,
device: Literal["cpu", "gpu"] = "cpu",
):
"""Initialize the vectorized single-agent drone racing environment.
Args:
num_envs: Number of worlds in the vectorized environment.
freq: Environment step frequency.
sim_config: Simulation configuration.
sensor_range: Sensor range.
action_space: Control mode for the drones. See `build_action_space` for details.
track: Track configuration.
disturbances: Disturbance configuration.
randomizations: Randomization configuration.
random_resets: Flag to reset the environment randomly.
seed: Random seed.
max_episode_steps: Maximum number of steps per episode.
device: Device used for the environment and the simulation.
"""
super().__init__(
n_envs=num_envs,
n_drones=1,
Expand All @@ -102,12 +154,31 @@ def __init__(
self.observation_space = batch_space(self.single_observation_space, num_envs)

def reset(self, seed: int | None = None, options: dict | None = None) -> tuple[dict, dict]:
"""Reset the environment in all worlds.
Args:
seed: Random seed.
options: Additional reset options. Not used.
Returns:
The initial observation and info.
"""
obs, info = super().reset(seed=seed, options=options)
obs = {k: v[:, 0] for k, v in obs.items()}
info = {k: v[:, 0] for k, v in info.items()}
return obs, info

def step(self, action: NDArray[np.floating]) -> tuple[dict, float, bool, bool, dict]:
def step(
self, action: NDArray[np.floating]
) -> tuple[dict, NDArray[np.floating], NDArray[np.bool_], NDArray[np.bool_], dict]:
"""Step the environment in all worlds.
Args:
action: Action for all worlds, i.e., a batch of (n_envs, action_dim) arrays.
Returns:
Observation, reward, terminated, truncated, and info.
"""
obs, reward, terminated, truncated, info = super().step(action)
obs = {k: v[:, 0] for k, v in obs.items()}
info = {k: v[:, 0] for k, v in info.items()}
Expand Down
61 changes: 61 additions & 0 deletions lsy_drone_racing/envs/multi_drone_race.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@


class MultiDroneRaceEnv(RaceCoreEnv, Env):
"""Multi-agent drone racing environment.
This environment enables multiple agents to simultaneously compete with each other on the same
track.
"""

def __init__(
self,
n_drones: int,
Expand All @@ -28,6 +34,22 @@ def __init__(
max_episode_steps: int = 1500,
device: Literal["cpu", "gpu"] = "cpu",
):
"""Initialize the multi-agent drone racing environment.
Args:
n_drones: Number of drones.
freq: Environment step frequency.
sim_config: Simulation configuration.
sensor_range: Sensor range.
action_space: Control mode for the drones. See `build_action_space` for details.
track: Track configuration.
disturbances: Disturbance configuration.
randomizations: Randomization configuration.
random_resets: Flag to reset the environment randomly.
seed: Random seed.
max_episode_steps: Maximum number of steps per episode.
device: Device used for the environment and the simulation.
"""
super().__init__(
n_envs=1,
n_drones=n_drones,
Expand All @@ -51,6 +73,15 @@ def __init__(
self.autoreset = False

def reset(self, seed: int | None = None, options: dict | None = None) -> tuple[dict, dict]:
"""Reset the environment for all drones.
Args:
seed: Random seed.
options: Additional reset options. Not used.
Returns:
Observation and info for all drones.
"""
obs, info = super().reset(seed=seed, options=options)
obs = {k: v[0] for k, v in obs.items()}
info = {k: v[0] for k, v in info.items()}
Expand All @@ -59,13 +90,26 @@ def reset(self, seed: int | None = None, options: dict | None = None) -> tuple[d
def step(
self, action: NDArray[np.floating]
) -> tuple[dict, NDArray[np.floating], NDArray[np.bool_], NDArray[np.bool_], dict]:
"""Step the environment for all drones.
Args:
action: Action for all drones, i.e., a batch of (n_drones, action_dim) arrays.
Returns:
Observation, reward, terminated, truncated, and info for all drones.
"""
obs, reward, terminated, truncated, info = super().step(action)
obs = {k: v[0] for k, v in obs.items()}
info = {k: v[0] for k, v in info.items()}
return obs, reward[0], terminated[0], truncated[0], info


class VecMultiDroneRaceEnv(RaceCoreEnv, VectorEnv):
"""Vectorized multi-agent drone racing environment.
This environment enables vectorized training of multi-agent drone racing agents.
"""

def __init__(
self,
num_envs: int,
Expand All @@ -82,6 +126,23 @@ def __init__(
max_episode_steps: int = 1500,
device: Literal["cpu", "gpu"] = "cpu",
):
"""Vectorized multi-agent drone racing environment.
Args:
num_envs: Number of worlds in the vectorized environment.
n_drones: Number of drones in each world.
freq: Environment step frequency.
sim_config: Simulation configuration.
sensor_range: Sensor range.
action_space: Control mode for the drones. See `build_action_space` for details.
track: Track configuration.
disturbances: Disturbance configuration.
randomizations: Randomization configuration.
random_resets: Flag to reset the environment randomly.
seed: Random seed.
max_episode_steps: Maximum number of steps per episode.
device: Device used for the environment and the simulation.
"""
super().__init__(
n_envs=num_envs,
n_drones=n_drones,
Expand Down
18 changes: 13 additions & 5 deletions lsy_drone_racing/envs/race_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,11 @@ def create(


def build_action_space(control_mode: Literal["state", "attitude"]) -> spaces.Box:
"""Create the action space for the environment."""
if control_mode == "state":
return spaces.Box(low=-1, high=1, shape=(13,))
elif control_mode == "attitude":
lim = np.array([1, np.pi, np.pi, np.pi])
lim = np.array([1, np.pi, np.pi, np.pi], dtype=np.float32)
return spaces.Box(low=-lim, high=lim)
else:
raise ValueError(f"Invalid control mode: {control_mode}")
Expand All @@ -122,10 +123,10 @@ def build_observation_space(n_gates: int, n_obstacles: int) -> spaces.Dict:
"""Create the observation space for the environment."""
obs_spec = {
"pos": spaces.Box(low=-np.inf, high=np.inf, shape=(3,)),
"rpy": spaces.Box(low=-np.inf, high=np.inf, shape=(3,)),
"rpy": spaces.Box(low=-np.pi, high=np.pi, shape=(3,)),
"vel": spaces.Box(low=-np.inf, high=np.inf, shape=(3,)),
"ang_vel": spaces.Box(low=-np.inf, high=np.inf, shape=(3,)),
"target_gate": spaces.MultiDiscrete([n_gates], start=[-1]),
"target_gate": spaces.Discrete(n_gates, start=-1),
"gates_pos": spaces.Box(low=-np.inf, high=np.inf, shape=(n_gates, 3)),
"gates_rpy": spaces.Box(low=-np.pi, high=np.pi, shape=(n_gates, 3)),
"gates_visited": spaces.Box(low=0, high=1, shape=(n_gates,), dtype=bool),
Expand Down Expand Up @@ -197,15 +198,20 @@ def __init__(
"""Initialize the DroneRacingEnv.
Args:
n_drones: Number of drones in the environment.
freq: Environment frequency.
n_envs: Number of worlds in the vectorized environment.
n_drones: Number of drones.
freq: Environment step frequency.
sim_config: Configuration dictionary for the simulation.
sensor_range: Sensor range for gate and obstacle detection.
action_space: Control mode for the drones. See `build_action_space` for details.
track: Track configuration.
disturbances: Disturbance configuration.
randomizations: Randomization configuration.
random_resets: Flag to randomize the environment on reset.
seed: Random seed of the environment.
max_episode_steps: Maximum number of steps per episode. Needs to be tracked manually for
vectorized environments.
device: Device used for the environment and the simulation.
"""
super().__init__()
self.sim = Sim(
Expand Down Expand Up @@ -271,6 +277,7 @@ def reset(
Args:
seed: Random seed.
options: Additional reset options. Not used.
mask: Mask of worlds to reset.
Returns:
Expand All @@ -279,6 +286,7 @@ def reset(
# TODO: Allow per-world sim seeding
if seed is not None:
self.sim.seed(seed)
self._np_random = np.random.default_rng(seed) # Also update gymnasium's rng
elif not self.random_resets:
self.sim.seed(self.seed)
# Randomization of gates, obstacles and drones is compiled into the sim reset function with
Expand Down
Binary file removed models/ppo/model.zip
Binary file not shown.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ target-version = "py38"

[tool.ruff.lint]
select = ["E4", "E7", "E9", "F", "I", "D", "TCH", "ANN"]
ignore = ["ANN101", "ANN401"]
ignore = ["ANN401"]
fixable = ["ALL"]
unfixable = []

Expand Down
4 changes: 3 additions & 1 deletion tests/integration/test_controllers.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,14 @@ def test_attitude_controller(physics: str):

@pytest.mark.integration
@pytest.mark.parametrize("yaw", [0, np.pi / 2, np.pi, 3 * np.pi / 2])
@pytest.mark.parametrize("physics", ["analytical", "sys_id"])
@pytest.mark.parametrize("physics", ["analytical"])
def test_trajectory_controller_finish(yaw: float, physics: str):
"""Test if the trajectory controller can finish the track.
To catch bugs that only occur with orientations other than the unit quaternion, we test if the
controller can finish the track with different desired yaws.
Does not work for sys_id physics mode, since it assumes a 0 yaw angle.
"""
config = load_config(Path(__file__).parents[2] / "config/level0.toml")
config.sim.physics = physics
Expand Down
40 changes: 4 additions & 36 deletions tests/unit/envs/test_envs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import gymnasium
import pytest
from gymnasium.utils.passive_env_checker import env_reset_passive_checker, env_step_passive_checker
from gymnasium.utils.env_checker import check_env

from lsy_drone_racing.utils import load_config

Expand All @@ -17,50 +17,18 @@ def test_passive_checker_wrapper_warnings(action_space: str):
still seen, and even raises them to an exception.
"""
config = load_config(Path(__file__).parents[3] / "config/level0.toml")
with warnings.catch_warnings(record=True) as w:
with warnings.catch_warnings(record=True): # Catch unnecessary warnings from gymnasium
env = gymnasium.make(
"DroneRacing-v0",
freq=config.env.freq,
sim_config=config.sim,
sensor_range=config.env.sensor_range,
action_space=action_space,
track=config.env.track,
disturbances=config.env.get("disturbances"),
randomizations=config.env.get("randomizations"),
random_resets=config.env.random_resets,
seed=config.env.seed,
disable_env_checker=False,
)
env_reset_passive_checker(env)
env_step_passive_checker(env, env.action_space.sample())
# Filter out any warnings about 2D Box observation spaces.
w = list(filter(lambda i: "neither an image, nor a 1D vector" not in i.message.args[0], w))
assert len(w) == 0, f"No warnings should be raised, got: {[i.message.args[0] for i in w]}"


@pytest.mark.unit
@pytest.mark.parametrize("action_space", ["state", "attitude"])
def test_vector_passive_checker_wrapper_warnings(action_space: str):
"""Check passive env checker wrapper warnings.
We disable the passive env checker by default. This test ensures that unexpected warnings are
still seen, and even raises them to an exception.
"""
config = load_config(Path(__file__).parents[3] / "config/level0.toml")
with warnings.catch_warnings(record=True) as w:
env = gymnasium.make_vec(
"DroneRacing-v0",
num_envs=2,
freq=config.env.freq,
sim_config=config.sim,
sensor_range=config.env.sensor_range,
track=config.env.track,
disturbances=config.env.get("disturbances"),
randomizations=config.env.get("randomizations"),
random_resets=config.env.random_resets,
seed=config.env.seed,
)
env_reset_passive_checker(env)
env_step_passive_checker(env, env.action_space.sample())
# Filter out any warnings about 2D Box observation spaces.
w = list(filter(lambda i: "neither an image, nor a 1D vector" not in i.message.args[0], w))
assert len(w) == 0, f"No warnings should be raised, got: {[i.message.args[0] for i in w]}"
check_env(env.unwrapped)
Loading

0 comments on commit 53d8b36

Please sign in to comment.