Skip to content

Commit

Permalink
Implemented Mellinger Attitude Controller in UKF
Browse files Browse the repository at this point in the history
  • Loading branch information
Rather1337 committed Dec 19, 2024
1 parent ccda7c9 commit 0e15d22
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 7 deletions.
167 changes: 162 additions & 5 deletions lsy_drone_racing/utils/disturbance_observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def __init__(self, dt: np.floating, initial_obs: dict[str, NDArray[np.floating]]
dt: Time step between callings.
initial_obs: Optional, the initial observation of the environment's state. See the environment's observation space for details.
"""
self.f_x = f_x_attitude_dyn # TODO initialize this while creating UKF object
self.f_x = f_x_att_ctrl # TODO initialize this while creating UKF object
self.f_x_continuous = True

dim_x = 18 # (pos, rpy, vel, rpy rates, f_dis, t_dis)
Expand All @@ -211,8 +211,8 @@ def __init__(self, dt: np.floating, initial_obs: dict[str, NDArray[np.floating]]
# self._UKF.Q[12:15, 12:15] = self._UKF.Q[12:15, 12:15] * 1e1 # Force
# self._UKF.Q[15:18, 15:18] = self._UKF.Q[15:18, 15:18] * 1e1 # Torque

self._varQ = 1e-1
self._varR = 1e-2
self._varQ = 1e-2
self._varR = 1e-3

# self._UKF.Q = Q_discrete_white_noise(dim=3, dt=self._dt, var=self._varQ, block_size=6, order_by_dim=False) # TODO manually setup matrix
# self._UKF.Q[12:15, 12:15] = self._UKF.Q[12:15, 12:15] * 5e0 # Force
Expand Down Expand Up @@ -346,6 +346,20 @@ def f_x_dyn(x: NDArray[np.floating], u: NDArray[np.floating], dt: np.floating |

return x_dot

def f_x_att_ctrl(x: NDArray[np.floating], u: NDArray[np.floating], dt: np.floating) -> NDArray[np.floating]:
"""Example dynamics of the drone. Extension of the above dyn model to attitude inputs for the Mellinger controller.
Args:
x: State of the system, batched with shape (batches, states)
u: input of the system, (batches, inputs). [Thrust, R, P, Y]
dt: time step size
Return:
The derivative of x for given x and u
"""
u_RPM, _ = attitude_controller(x, u, dt)
return f_x_dyn(x, u_RPM, dt)

def f_x_attitude_dyn(x: NDArray[np.floating], u: NDArray[np.floating], dt: np.floating) -> NDArray[np.floating]:
"""Example dynamics of the drone. Extension of the above dyn model to attitude inputs.
Expand Down Expand Up @@ -394,11 +408,20 @@ def f_x_attitude_dyn(x: NDArray[np.floating], u: NDArray[np.floating], dt: np.fl
rpms = pwm2rpm(pwm)
return f_x_dyn(x, rpms, dt)

def rot2frame(rot: R, yaw: np.floating) -> NDArray[np.floating]:
"""Converts a rotation object with a given yaw into a base frame."""
z_axis_desired = rot.apply(np.array([0,0,1]), inverse=True) # desired z axis in body frame
x_axis_desired = np.array([np.cos(yaw), np.sin(yaw), 0]) # temporary x axis from yaw angle
y_axis_desired = np.cross(z_axis_desired, x_axis_desired) # desired y axis in body frame
y_axis_desired = y_axis_desired/np.linalg.norm(y_axis_desired) # normalizing y_axis
x_axis_desired = np.cross(y_axis_desired, z_axis_desired) # desired x axis in body frame


def thrust2pwm(thrust: NDArray[np.floating]) -> NDArray[np.floating]:
"""Convert the desired thrust into motor PWM.
Args:
thrust: The desired thrust per motor.
thrust: The desired thrust *per motor*.
Returns:
The motors' PWMs to apply to the quadrotor.
Expand All @@ -415,13 +438,55 @@ def thrust2pwm(thrust: NDArray[np.floating]) -> NDArray[np.floating]:
thrust = np.clip(thrust, MIN_THRUST, MAX_THRUST) # Protect against NaN values
return np.clip((np.sqrt(thrust / KF) - PWM2RPM_CONST) / PWM2RPM_SCALE, MIN_PWM, MAX_PWM)

def pwms_to_thrust(pwms: NDArray[np.floating]) -> NDArray[np.floating]:

Check failure on line 441 in lsy_drone_racing/utils/disturbance_observer.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (D103)

lsy_drone_racing/utils/disturbance_observer.py:441:5: D103 Missing docstring in public function
pwm2rpm_scale = 0.2685
pwm2rpm_const = 4070.3
return kf * (pwm2rpm_scale * pwms + pwm2rpm_const) ** 2

def thrust_to_pwm(thrust):

Check failure on line 446 in lsy_drone_racing/utils/disturbance_observer.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (ANN201)

lsy_drone_racing/utils/disturbance_observer.py:446:5: ANN201 Missing return type annotation for public function `thrust_to_pwm`

Check failure on line 446 in lsy_drone_racing/utils/disturbance_observer.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (D103)

lsy_drone_racing/utils/disturbance_observer.py:446:5: D103 Missing docstring in public function

Check failure on line 446 in lsy_drone_racing/utils/disturbance_observer.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (ANN001)

lsy_drone_racing/utils/disturbance_observer.py:446:19: ANN001 Missing type annotation for function argument `thrust`
# from drone.py
thrust_curve_a = -1.1264
thrust_curve_b = 2.2541
thrust_curve_c = 0.0209 # Thrust curve parameters for brushed motors
max_pwm = 65535
pwm = thrust_curve_a * thrust * thrust + thrust_curve_b * thrust + thrust_curve_c
return np.clip(pwm, 0, 1) * max_pwm

def thrust_to_rpm(thrust: NDArray[np.floating]) -> NDArray[np.floating]:

Check failure on line 455 in lsy_drone_racing/utils/disturbance_observer.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (D103)

lsy_drone_racing/utils/disturbance_observer.py:455:5: D103 Missing docstring in public function
# From sim.py
min_pwm = 20000
max_pwm = 65535
pwm2rpm_scale = 0.2685
pwm2rpm_const = 4070.3
min_rpm = pwm2rpm_scale * min_pwm + pwm2rpm_const
max_rpm = pwm2rpm_scale * max_pwm + pwm2rpm_const
min_thrust = kf * min_rpm**2
max_thrust = kf * max_rpm**2

thrust = np.clip(thrust, min_thrust, max_thrust)

pwm = (np.sqrt(thrust / kf) - pwm2rpm_const) / pwm2rpm_scale
pwm = np.clip(pwm, min_pwm, max_pwm)
return pwm2rpm_const + pwm2rpm_scale * pwm


def pwm2rpm(pwm: NDArray[np.floating]) -> NDArray[np.floating]:
"""Convert the motors' PWMs into RPMs."""
PWM2RPM_SCALE = 0.2685
PWM2RPM_CONST = 4070.3
return PWM2RPM_CONST + PWM2RPM_SCALE * pwm

def batComp(pwm: NDArray[np.floating]) -> NDArray[np.floating]:
"""Compensate the battery voltage."""
supply_voltage = 3.0 # assumed supply voltage. Taken from drone.py
max_pwm = 65535
pwm = pwm/max_pwm*60
pwm_curve_a = -0.0006239 # PWM curve parameters for brushed motors
pwm_curve_b = 0.088 # PWM curve parameters for brushed motors
voltage = pwm_curve_a * pwm**2 + pwm_curve_b * pwm
percentage = np.minimum(1, voltage/supply_voltage)
return percentage * max_pwm

def f_x_fitted(x: NDArray[np.floating], u: NDArray[np.floating], dt: np.floating) -> NDArray[np.floating]:
"""Fitted Dynamics of the drone from Haocheng.
Expand Down Expand Up @@ -560,4 +625,96 @@ def f_x_identified(x: NDArray[np.floating], u: NDArray[np.floating], dt: np.floa
x_dot[i, 6:9] = acc
x_dot[i, 9:12] = rot.apply(rpy_rates)/dt # model doesn't really have the derivative, so it's calculated manually

return x_dot
return x_dot

def attitude_controller(x: NDArray[np.floating], u: NDArray[np.floating], dt: np.floating) -> NDArray[np.floating]:
"""Simulates the attitude controller of the Mellinger controller.
Args:
x: State of the system, batched with shape (batches, states)
u: input of the system, (batches, inputs). [Thrust, R, P, Y]
dt: time step size
Return:
Commanded RPMs
"""
# From firmware controller_mellinger
# l. 52
mass_thrust = 132000
# l. 66-79
# Attitude
kR_xy = 70000 # P
kw_xy = 20000 # D
ki_m_xy = 0.0 # I
i_range_m_xy = 1.0

# Yaw
kR_z = 60000 # P
kw_z = 12000 # D
ki_m_z = 500 # I
i_range_m_z = 1500

# roll and pitch angular velocity
kd_omega_rp = 200 # D

### Calculate RPMs for first x (mean of UKF estimate) & u to keep RPM command constant
rpy = x[0, 3:6]
angular_vel = x[0, 9:12]
thrust_des = u[0, 0]
rpy_des = u[0, 1:]

### From firmware controller_mellinger:
# l. 220 ff
rot = R.from_euler("xyz", rpy)
rot_des = R.from_euler("xyz", rpy_des)
R_act = rot.as_matrix()
R_des = rot_des.as_matrix()
eR = 0.5 * ( R_des.T@R_act - R_act.T@R_des )
eR = np.array([eR[2,1], -eR[0,2], eR[1,0]]) # vee operator (SO3 to R3), the -y is to account for the frame of the crazyflie
# l.256 ff
angular_vel_des = np.zeros(3) # for now assuming angular_vel_des = 0 (would need to be given as input)
ew = angular_vel - angular_vel_des
ew[1] = -ew[1]
# l. 259 ff
prev_angular_vel_des = np.zeros(3) # zero for now (would need to be stored)
prev_angular_vel = np.zeros(3) # for now zeros, would need to be stored outside
err_d = ( (angular_vel_des - prev_angular_vel_des) - (angular_vel - prev_angular_vel) )/dt *0 # set to zeros cause not used
err_d[1] = -err_d[1]
# l. 268 ff
i_error_m = np.zeros(3) # this part is usually longer, but we say the error is zeros for now
# l. 279 ff
Mx = -kR_xy * eR[0] + kw_xy * ew[0] + ki_m_xy * i_error_m[0] + kd_omega_rp * err_d[0]
My = -kR_xy * eR[1] + kw_xy * ew[1] + ki_m_xy * i_error_m[1] + kd_omega_rp * err_d[1]
Mz = -kR_z * eR[2] + kw_z * ew[2] + ki_m_z * i_error_m[2]
# l. 297 ff
if thrust_des > 0:
cmd_roll = np.clip(Mx, -32000, 32000)
cmd_pitch = np.clip(My, -32000, 32000)
cmd_yaw = np.clip(-Mz, -32000, 32000)
else:
cmd_roll = 0
cmd_pitch = 0
cmd_yaw = 0
# From firmware powerDistributionLegacy()
# cmd_PWM = thrust2pwm(thrust_des/4) # /4 because we got total thrust, but function takes single thrust
cmd_PWM = thrust_to_pwm(thrust_des)
# cmd_PWM = mass_thrust*thrust_des
cmd_roll = cmd_roll/2
cmd_pitch = cmd_pitch/2
cmd_PWM = cmd_PWM + np.array([-cmd_roll + cmd_pitch + cmd_yaw,
-cmd_roll - cmd_pitch - cmd_yaw,
+cmd_roll - cmd_pitch + cmd_yaw,
+cmd_roll + cmd_pitch - cmd_yaw])
cmd_PWM = np.clip(cmd_PWM, 20000, 65535)

# From firmware motors.c motorsSetRatio()
# l. 236 ff
cmd_PWM = batComp(cmd_PWM)

# cmd_RPMs = pwm2rpm(cmd_PWM)
cmd_RPMs = thrust_to_rpm((pwms_to_thrust(cmd_PWM)))

### Stack RPMs to fit
u_RPM = np.zeros_like(u)
u_RPM += cmd_RPMs
return u_RPM, cmd_PWM
22 changes: 20 additions & 2 deletions scripts/sim_dob.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from lsy_drone_racing.sim.noise import ExternalForceGrid
from lsy_drone_racing.utils import load_config, load_controller
from lsy_drone_racing.utils.disturbance_observer import UKF, FxTDO
from lsy_drone_racing.utils.disturbance_observer import UKF, FxTDO, attitude_controller

if TYPE_CHECKING:
from munch import Munch
Expand Down Expand Up @@ -68,14 +68,16 @@ def simulate(
controller_cls = load_controller(controller_path) # This returns a class, not an instance
# Create the racing environment
env: DroneRacingEnv = gymnasium.make(env_id or config.env.id, config=config)
dist = ExternalForceGrid(3, [True, True, False], max_force=0.01, grid_size=1.0)
dist = ExternalForceGrid(3, [True, True, False], max_force=0.05, grid_size=1.0)
env.sim.disturbances["dynamics"].append(dist) #WARN: env.sim to get variables from other wrappers is deprecated and will be removed in v1.0, to get this variable you can do `env.unwrapped.sim` for environment variables or `env.get_wrapper_attr('sim')` that will search the reminding wrappers.

ep_times = []
ukf_times = []
gui_timer = None
fxtdo_predictions = [] # x, v, v_hat, f_dis, f_hat
ukf_predictions = []
rpms_list = []
pwms_list = []
for _ in range(n_runs): # Run n_runs episodes with the controller
done = False
obs, info = env.reset()
Expand Down Expand Up @@ -103,6 +105,11 @@ def simulate(


rpms = env.sim.drone.rpm
pwms = env.sim.drone._pwms # _pwms # _setpoint.thrust
x = np.array([np.concatenate( (obs["pos"], obs["rpy"], obs["vel"], obs["ang_vel"]) )])
rpms_est, pwm_est = attitude_controller(x, np.array([action]), dt=1 / config.env.freq) #, obs["vel"], obs["ang_vel"]
rpms_list.append([np.array(rpms), rpms_est[0]])
pwms_list.append([np.mean(pwms), np.mean(pwm_est)])
t_ukf = time.perf_counter()
ukf_pred = ukf.step(obs, u=action) #action, rpms
t_ukf = (time.perf_counter() - t_ukf)
Expand Down Expand Up @@ -140,6 +147,17 @@ def simulate(
controller.episode_reset()
ep_times.append(curr_time if obs["target_gate"] == -1 else None)

rpms_list = np.array(rpms_list)
pwms_list = np.array(pwms_list)
# print(inputs)
fig, ax = plt.subplots(2, 1, figsize=(10, 8))
ax[0].plot(np.arange(len(rpms_list)), np.mean(rpms_list[:,0,:], axis=1), label="True RPM Command")
ax[0].plot(np.arange(len(rpms_list)), np.mean(rpms_list[:,1,:], axis=1), label="Est RPM Command")
ax[0].legend()
ax[1].plot(np.arange(len(pwms_list)), pwms_list[:,0], label="True PWM Command")
ax[1].plot(np.arange(len(pwms_list)), pwms_list[:,1], label="Est PWM Command")
ax[1].legend()
plt.show()
plot_predictions(fxtdo_predictions, 1 / config.env.freq)
plot_predictions(ukf_predictions, 1 / config.env.freq)
plt.show()
Expand Down

0 comments on commit 0e15d22

Please sign in to comment.