Source code for training_ml_control.control

from typing import Protocol

import numpy as np
from do_mpc.controller import LQR, MPC
from do_mpc.model import LinearModel, Model
from gymnasium import Env
from numpy.typing import NDArray

# Public names of this module: the two structural interfaces, the open-loop
# signal generators, and the do_mpc controller factories.
__all__ = [
    "FeedbackController",
    "Observer",
    "ConstantController",
    "SineController",
    "SumOfSineController",
    "SchroederSweepController",
    "PRBSController",
    "RandomController",
    "build_lqr_controller",
    "build_mpc_controller",
]


class FeedbackController(Protocol):
    """Structural interface for objects that turn a measurement into an input.

    Any object with a compatible ``control`` method satisfies this protocol;
    no inheritance is required.
    """

    # NOTE(review): the signal-generator classes in this module expose ``act``
    # rather than ``control`` -- confirm which method name callers rely on.
    def control(self, measurement: NDArray) -> NDArray:
        """Return the control input computed from ``measurement``."""
        ...
class Observer(Protocol):
    """Structural interface for objects that process a measurement.

    Any object with a compatible ``observe`` method satisfies this protocol;
    no inheritance is required.
    """

    def observe(self, measurement: NDArray) -> NDArray:
        """Return the array derived from ``measurement``."""
        ...
[docs] class ConstantController: def __init__(self, u: NDArray = np.zeros(1)) -> None: self.u = u
[docs] def act(self, measurement: NDArray) -> NDArray: return self.u
[docs] class SineController: def __init__( self, env: Env, u_max: NDArray = np.asarray([10]), frequency: float = 1 ) -> None: self.dt = env.unwrapped.dt self.u_max = u_max self.frequency = frequency self.i = 0
[docs] def act(self, measurement: NDArray) -> NDArray: t = self.dt * self.i self.i += 1 u = self.u_max * np.sin(2 * np.pi * self.frequency * t) return u
[docs] class SumOfSineController: def __init__( self, env: Env, u_max: NDArray = np.asarray([10]), frequencies: list[float] = [1.0], ) -> None: self.dt = env.unwrapped.dt self.u_max = u_max self.frequencies = frequencies self.i = 0
[docs] def act(self, measurement: NDArray) -> NDArray: t = self.dt * self.i self.i += 1 u = np.asarray([0.0]) for frequency in self.frequencies: u += np.sin(2 * np.pi * frequency * t) u *= self.u_max return u
class SchroederSweepController:
    """Open-loop controller producing a sum of phase-shifted cosine harmonics.

    The output at time ``t`` is
    ``amplitude * sum_k cos(2*pi*k*frequency*t + phis[k-1])`` for
    ``k = 1 .. n_harmonics``, with
    ``amplitude = sqrt(input_power / n_harmonics)``.

    Args:
        env: Environment whose ``unwrapped.dt`` is used as the time step
            between successive calls to :meth:`act`.
        n_time_steps: Divisor used in the phase recursion.
        input_power: Value spread over the harmonics to obtain the amplitude.
        n_harmonics: Number of cosine terms.
        frequency: Base frequency; harmonic ``k`` oscillates at
            ``k * frequency``.
    """

    def __init__(
        self,
        env: Env,
        n_time_steps: int = 200,
        input_power: float = 10,
        n_harmonics: int = 3,
        frequency: float = 1,
    ) -> None:
        self.dt = env.unwrapped.dt
        self.input_power = input_power
        self.n_time_steps = n_time_steps
        self.n_harmonics = n_harmonics
        self.frequency = frequency
        self.amplitude = np.sqrt(input_power / n_harmonics)

        # The first phase is zero; each later phase is the previous one
        # shifted by ``-pi * k**2 / n_time_steps``.
        phases = np.zeros(n_harmonics)
        for index in range(1, n_harmonics):
            shift = np.pi * index**2 / n_time_steps
            phases[index] = phases[index - 1] - shift
        self.phis = phases

        # Number of calls to ``act`` so far; the signal time is ``dt * i``.
        self.i = 0

    def act(self, measurement: NDArray) -> NDArray:
        """Return the next sample of the multi-harmonic signal.

        ``measurement`` is ignored. Each call advances the internal step
        counter by one.
        """
        step = self.i
        self.i = step + 1
        t = self.dt * step

        total = np.zeros(1)
        for harmonic, phase in enumerate(self.phis, start=1):
            total += np.cos(2 * np.pi * harmonic * self.frequency * t + phase)
        total *= self.amplitude
        return total
[docs] class PRBSController: def __init__(self, u_max: NDArray = np.asarray([10]), seed: int = 16) -> None: self.u_max = u_max self.rng = np.random.default_rng(seed)
[docs] def act(self, measurement: NDArray) -> NDArray: return self.rng.choice([self.u_max, -self.u_max])
class RandomController:
    """Controller that draws every input from an environment's action space.

    Args:
        env: Environment whose ``action_space`` is sampled by :meth:`act`.
    """

    def __init__(self, env: Env) -> None:
        self.action_space = env.action_space

    def act(self, measurement: NDArray) -> NDArray:
        """Return ``action_space.sample()``; ``measurement`` is ignored."""
        sampled = self.action_space.sample()
        return sampled
def build_lqr_controller(
    model: LinearModel,
    t_step: float,
    n_horizon: int | None,
    setpoint: NDArray,
    Q: NDArray,
    R: NDArray,
) -> LQR:
    """Create, configure and set up a do_mpc ``LQR`` controller.

    Args:
        model: Linear model the controller is built for.
        t_step: Value assigned to the controller's ``t_step`` setting.
        n_horizon: Value assigned to the controller's ``n_horizon`` setting.
        setpoint: Setpoint passed to ``set_setpoint`` after setup.
        Q: Passed to ``set_objective`` as ``Q``.
        R: Passed to ``set_objective`` as ``R``.

    Returns:
        The controller after ``setup()`` and ``set_setpoint()`` were called.
    """
    controller = LQR(model)

    settings = controller.settings
    settings.t_step = t_step
    settings.n_horizon = n_horizon

    controller.set_objective(Q=Q, R=R)
    # The setpoint is applied only once the controller has been set up.
    controller.setup()
    controller.set_setpoint(setpoint)
    return controller
def build_mpc_controller(
    model: Model,
    t_step: float,
    n_horizon: int | None,
    terminal_cost,
    stage_cost,
    x_limits: dict[str, NDArray] | None = None,
    u_limits: dict[str, NDArray] | None = None,
    u_penalty: dict[str, float] | None = None,
    *,
    uncertainty_values: dict[str, NDArray] | None = None,
    n_robust: int = 1,
) -> MPC:
    """Create, configure and set up a do_mpc ``MPC`` controller.

    Args:
        model: Model the controller is built for.
        t_step: Passed to ``set_param`` as ``t_step``.
        n_horizon: Passed to ``set_param`` as ``n_horizon``.
        terminal_cost: Passed to ``set_objective`` as ``mterm``.
        stage_cost: Passed to ``set_objective`` as ``lterm``.
        x_limits: Optional mapping from state name to an indexable value whose
            element ``0`` is the lower bound and element ``1`` the upper bound.
        u_limits: Optional mapping from input name to bounds, laid out like
            ``x_limits``.
        u_penalty: Optional keyword arguments forwarded to ``set_rterm``.
        uncertainty_values: Optional keyword arguments forwarded to
            ``set_uncertainty_values``.
        n_robust: Passed to ``set_param`` as ``n_robust``.

    Returns:
        The controller after ``setup()`` was called.
    """
    controller = MPC(model)
    controller.set_param(
        n_horizon=n_horizon,
        t_step=t_step,
        n_robust=n_robust,
        state_discretization="collocation",
        collocation_type="radau",
        collocation_deg=3,
        collocation_ni=1,
        store_full_solution=True,
        # Linear solver handed to ipopt; the value used here is "mumps".
        nlpsol_opts={"ipopt.linear_solver": "mumps"},
    )

    controller.set_objective(mterm=terminal_cost, lterm=stage_cost)
    if u_penalty is not None:
        controller.set_rterm(**u_penalty)

    # State bounds are written first, then input bounds; for every variable
    # the lower bound is set before the upper bound.
    for variable_kind, limits in (("_x", x_limits), ("_u", u_limits)):
        if limits is None:
            continue
        for name, bounds in limits.items():
            controller.bounds["lower", variable_kind, name] = bounds[0]
            controller.bounds["upper", variable_kind, name] = bounds[1]

    # Parameter uncertainty
    if uncertainty_values is not None:
        controller.set_uncertainty_values(**uncertainty_values)

    controller.setup()
    return controller