Source code for training_ml_control.hyperopt

from typing import Callable

import numpy as np
import optuna
import pykoopman as pk
import pysindy as ps
from gymnasium import Env
from numpy.typing import NDArray
from sklearn.metrics import mean_squared_error

__all__ = ["create_sindy_objective", "create_dmd_objective"]


[docs] def create_sindy_objective( env: Env, X_train: NDArray, X_test: NDArray, U_train: NDArray, U_test: NDArray, t_train: NDArray, ) -> Callable[[optuna.Trial], tuple[float, float]]: def objective(trial: optuna.Trial) -> tuple[float, float]: threshold = trial.suggest_float("threshold", 0.05, 0.5) alpha = trial.suggest_float("threshold", 0.01, 0.5) max_iter = trial.suggest_int("max_iter", 50, 300) opt = ps.STLSQ(threshold=threshold, max_iter=max_iter, alpha=alpha) feature_library_cls_name = trial.suggest_categorical( "feature_library", ["polynomial", "fourier", "hybrid"] ) if feature_library_cls_name == "polynomial": degree = trial.suggest_int("degree", 1, 5) feature_library = ps.PolynomialLibrary(degree=degree) elif feature_library_cls_name == "fourier": n_frequencies = trial.suggest_int("n_frequencies", 1, 5) feature_library = ps.FourierLibrary(n_frequencies=n_frequencies) elif feature_library_cls_name == "hybrid": degree = trial.suggest_int("degree", 1, 5) n_frequencies = trial.suggest_int("n_frequencies", 1, 5) feature_library = ps.PolynomialLibrary(degree=degree) + ps.FourierLibrary( n_frequencies=n_frequencies ) else: raise ValueError(f"Unknown feature_library: {feature_library_cls_name}") order = trial.suggest_int("order", 1, 3) differentiation_method = ps.FiniteDifference(order=order) model = ps.SINDy( optimizer=opt, feature_library=feature_library, differentiation_method=differentiation_method, ) model.fit(X_train, u=U_train, t=t_train) score = model.score( X_test, u=U_test, t=env.dt, metric=mean_squared_error ).item() complexity = model.complexity coefficients = model.coefficients() coefficients_rows_all_zero_count = len(np.where(~coefficients.any(axis=1))[0]) if coefficients_rows_all_zero_count > 0: raise optuna.TrialPruned() return score, complexity return objective
[docs] def create_dmd_objective( env: Env, X_train: NDArray, X_test: NDArray, U_train: NDArray, U_test: NDArray ) -> Callable[[optuna.Trial], float]: def objective(trial: optuna.Trial) -> float: observable_cls_name = trial.suggest_categorical( "observable", ["polynomial", "time-delay", "fourier"] ) if observable_cls_name == "polynomial": degree = trial.suggest_int("degree", 1, 5) obsv = pk.observables.Polynomial(degree=degree) elif observable_cls_name == "time-delay": delay = trial.suggest_int("delay", 1, 5) n_delays = trial.suggest_int("n_delays", 1, 5) obsv = pk.observables.TimeDelay(delay=delay, n_delays=n_delays) elif observable_cls_name == "fourier": gamma = trial.suggest_float("gamma", 0.1, 2.0) D = trial.suggest_int("D", 1, 5) obsv = pk.observables.RandomFourierFeatures( include_state=True, gamma=gamma, D=D ) else: raise ValueError(f"Unknown observable: {observable_cls_name}") EDMDc = pk.regression.EDMDc() model = pk.Koopman(observables=obsv, regressor=EDMDc) try: model.fit(X_train, u=U_train, dt=env.dt) except: raise optuna.TrialPruned() X_dmd = model.simulate(X_test[0], U_test, n_steps=X_test.shape[0] - 1) X_dmd = np.vstack([X_test[0][np.newaxis, :], X_dmd]) score = mean_squared_error(X_test, X_dmd) return score return objective