import logging
from abc import ABCMeta, abstractmethod, abstractproperty
from typing import Any, Dict, Iterator, List, Tuple, Union
import casadi as ca
import numpy as np
from rtctools._internal.alias_tools import AliasDict
from rtctools._internal.debug_check_helpers import DebugLevel, debug_check
from rtctools.data.storage import DataStoreAccessor
from .timeseries import Timeseries
logger = logging.getLogger("rtctools")
# Typical type for a bound on a variable
BT = Union[float, np.ndarray, Timeseries]
class LookupTable:
"""
Base class for LookupTables.
"""
@property
def inputs(self) -> List[ca.MX]:
"""
List of lookup table input variables.
"""
raise NotImplementedError
@property
def function(self) -> ca.Function:
"""
Lookup table CasADi :class:`Function`.
"""
raise NotImplementedError
[docs]
class OptimizationProblem(DataStoreAccessor, metaclass=ABCMeta):
"""
Base class for all optimization problems.
"""
_debug_check_level = DebugLevel.MEDIUM
_debug_check_options = {}
def __init__(self, **kwargs):
# Call parent class first for default behaviour.
super().__init__(**kwargs)
self.__mixed_integer = False
[docs]
def optimize(
self,
preprocessing: bool = True,
postprocessing: bool = True,
log_solver_failure_as_error: bool = True,
) -> bool:
"""
Perform one initialize-transcribe-solve-finalize cycle.
:param preprocessing: True to enable a call to ``pre`` preceding the optimization.
:param postprocessing: True to enable a call to ``post`` following the optimization.
:returns: True on success.
"""
# Deprecations / removals
if hasattr(self, "initial_state"):
raise RuntimeError(
"Support for `initial_state()` has been removed. Please use `history()` instead."
)
logger.info("Entering optimize()")
# Do any preprocessing, which may include changing parameter values on
# the model
if preprocessing:
self.pre()
# Check if control inputs are bounded
self.__check_bounds_control_input()
else:
logger.debug("Skipping Preprocessing in OptimizationProblem.optimize()")
# Transcribe problem
discrete, lbx, ubx, lbg, ubg, x0, nlp = self.transcribe()
# Create an NLP solver
logger.debug("Collecting solver options")
self.__mixed_integer = np.any(discrete)
options = {}
options.update(self.solver_options()) # Create a copy
logger.debug("Creating solver")
if options.pop("expand", False):
# NOTE: CasADi only supports the "expand" option for nlpsol. To
# also be able to expand with e.g. qpsol, we do the expansion
# ourselves here.
logger.debug("Expanding objective and constraints to SX")
expand_f_g = ca.Function("f_g", [nlp["x"]], [nlp["f"], nlp["g"]]).expand()
X_sx = ca.SX.sym("X", *nlp["x"].shape)
f_sx, g_sx = expand_f_g(X_sx)
nlp["f"] = f_sx
nlp["g"] = g_sx
nlp["x"] = X_sx
# Debug check for non-linearity in constraints
self.__debug_check_linearity_constraints(nlp)
# Debug check for linear independence of the constraints
self.__debug_check_linear_independence(lbx, ubx, lbg, ubg, nlp)
# Solver option
my_solver = options["solver"]
del options["solver"]
# Already consumed
del options["optimized_num_dir"]
# Iteration callback
iteration_callback = options.pop("iteration_callback", None)
# CasADi solver to use
casadi_solver = options.pop("casadi_solver")
if isinstance(casadi_solver, str):
casadi_solver = getattr(ca, casadi_solver)
nlpsol_options = {**options}
if self.__mixed_integer:
nlpsol_options["discrete"] = discrete
if iteration_callback:
nlpsol_options["iteration_callback"] = iteration_callback
# Remove ipopt and bonmin defaults if they are not used
if my_solver != "ipopt":
nlpsol_options.pop("ipopt", None)
if my_solver != "bonmin":
nlpsol_options.pop("bonmin", None)
solver = casadi_solver("nlp", my_solver, nlp, nlpsol_options)
# Solve NLP
logger.info("Calling solver")
results = solver(x0=x0, lbx=lbx, ubx=ubx, lbg=ca.veccat(*lbg), ubg=ca.veccat(*ubg))
# Extract relevant stats
self.__objective_value = float(results["f"])
self.__solver_output = np.array(results["x"]).ravel()
self.__transcribed_problem = {
"lbx": lbx,
"ubx": ubx,
"lbg": lbg,
"ubg": ubg,
"x0": x0,
"nlp": nlp,
}
self.__lam_g = results.get("lam_g")
self.__lam_x = results.get("lam_x")
self.__solver_stats = solver.stats()
success, log_level = self.solver_success(self.__solver_stats, log_solver_failure_as_error)
return_status = self.__solver_stats["return_status"]
if "secondary_return_status" in self.__solver_stats:
return_status = "{}: {}".format(
return_status, self.__solver_stats["secondary_return_status"]
)
wall_clock_time = "elapsed time not read"
if "t_wall_total" in self.__solver_stats:
wall_clock_time = "{} seconds".format(self.__solver_stats["t_wall_total"])
elif "t_wall_solver" in self.__solver_stats:
wall_clock_time = "{} seconds".format(self.__solver_stats["t_wall_solver"])
if success:
logger.log(
log_level,
"Solver succeeded with status {} ({}).".format(return_status, wall_clock_time),
)
else:
try:
ii = [y[0] for y in self.loop_over_error].index(self.priority)
loop_error_indicator = self.loop_over_error[ii][1]
try:
loop_error = self.loop_over_error[ii][2]
if loop_error_indicator and loop_error in return_status:
log_level = logging.INFO
except IndexError:
if loop_error_indicator:
log_level = logging.INFO
logger.log(
log_level,
"Solver succeeded with status {} ({}).".format(return_status, wall_clock_time),
)
except (AttributeError, ValueError):
logger.log(
log_level,
"Solver succeeded with status {} ({}).".format(return_status, wall_clock_time),
)
# Do any postprocessing
if postprocessing:
self.post()
else:
logger.debug("Skipping Postprocessing in OptimizationProblem.optimize()")
# Done
logger.info("Done with optimize()")
return success
def __check_bounds_control_input(self) -> None:
# Checks if at the control inputs have bounds, log warning when a control input is not
# bounded.
bounds = self.bounds()
for variable in self.dae_variables["control_inputs"]:
variable = variable.name()
if variable not in bounds:
logger.warning(
"OptimizationProblem: control input {} has no bounds.".format(variable)
)
@abstractmethod
def transcribe(
self,
) -> Tuple[
np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, Dict[str, ca.MX]
]:
"""
Transcribe the continuous optimization problem to a discretized, solver-ready
optimization problem.
"""
pass
[docs]
def solver_options(self) -> Dict[str, Union[str, int, float, bool, str]]:
"""
Returns a dictionary of CasADi optimization problem solver options.
The default solver for continuous problems is `Ipopt
<https://projects.coin-or.org/Ipopt/>`_.
The default solver for mixed integer problems is `Bonmin
<http://projects.coin-or.org/Bonmin/>`_.
:returns: A dictionary of solver options. See the CasADi and
respective solver documentation for details.
"""
options = {"error_on_fail": False, "optimized_num_dir": 3, "casadi_solver": ca.nlpsol}
if self.__mixed_integer:
options["solver"] = "bonmin"
bonmin_options = options["bonmin"] = {}
bonmin_options["algorithm"] = "B-BB"
bonmin_options["nlp_solver"] = "Ipopt"
bonmin_options["nlp_log_level"] = 2
bonmin_options["linear_solver"] = "mumps"
else:
options["solver"] = "ipopt"
ipopt_options = options["ipopt"] = {}
ipopt_options["linear_solver"] = "mumps"
return options
[docs]
def solver_success(
self, solver_stats: Dict[str, Union[str, bool]], log_solver_failure_as_error: bool
) -> Tuple[bool, int]:
"""
Translates the returned solver statistics into a boolean and log level
to indicate whether the solve was succesful, and how to log it.
:param solver_stats: Dictionary containing information about the
solver status. See explanation below.
:param log_solver_failure_as_error: Indicates whether a solve failure
Should be logged as an error or info message.
``solver_stats`` typically consist of three fields:
* return_status: ``str``
* secondary_return_status: ``str``
* success: ``bool``
By default we rely on CasADi's interpretation of the return_status
(and secondary status) to the success variable, with an exception for
IPOPT (see below).
The logging level is typically ``logging.INFO`` for success, and
``logging.ERROR`` for failure. Only for IPOPT an exception is made for
`Not_Enough_Degrees_Of_Freedom`, which returns ``logging.WARNING`` instead.
For example, this can happen when too many goals are specified, and
lower priority goals cannot improve further on the current result.
:returns: A tuple indicating whether or not the solver has succeeded, and what level to log
it with.
"""
success = solver_stats["success"]
log_level = logging.INFO if success else logging.ERROR
if self.solver_options()["solver"].lower() in ["bonmin", "ipopt"] and solver_stats[
"return_status"
] in ["Not_Enough_Degrees_Of_Freedom"]:
log_level = logging.WARNING
if log_level == logging.ERROR and not log_solver_failure_as_error:
log_level = logging.INFO
return success, log_level
@abstractproperty
def solver_input(self) -> ca.MX:
"""
The symbolic input to the NLP solver.
"""
pass
@abstractmethod
def extract_results(self, ensemble_member: int = 0) -> Dict[str, np.ndarray]:
"""
Extracts state and control input time series from optimizer results.
:returns: A dictionary of result time series.
"""
pass
@property
def objective_value(self) -> float:
"""
The last obtained objective function value.
"""
return self.__objective_value
@property
def solver_output(self) -> np.ndarray:
"""
The raw output from the last NLP solver run.
"""
return self.__solver_output
@property
def solver_stats(self) -> Dict[str, Any]:
"""
The stats from the last NLP solver run.
"""
return self.__solver_stats
@property
def lagrange_multipliers(self) -> Tuple[Any, Any]:
"""
The lagrange multipliers at the solution.
"""
return self.__lam_g, self.__lam_x
@property
def transcribed_problem(self) -> Dict[str, Any]:
"""
The transcribed problem.
"""
return self.__transcribed_problem
[docs]
def pre(self) -> None:
"""
Preprocessing logic is performed here.
"""
pass
@abstractproperty
def dae_residual(self) -> ca.MX:
"""
Symbolic DAE residual of the model.
"""
pass
@abstractproperty
def dae_variables(self) -> Dict[str, List[ca.MX]]:
"""
Dictionary of symbolic variables for the DAE residual.
"""
pass
@property
def path_variables(self) -> List[ca.MX]:
"""
List of additional, time-dependent optimization variables, not covered by the DAE model.
"""
return []
@abstractmethod
def variable(self, variable: str) -> ca.MX:
"""
Returns an :class:`MX` symbol for the given variable.
:param variable: Variable name.
:returns: The associated CasADi :class:`MX` symbol.
"""
raise NotImplementedError
@property
def extra_variables(self) -> List[ca.MX]:
"""
List of additional, time-independent optimization variables, not covered by the DAE model.
"""
return []
@property
def output_variables(self) -> List[ca.MX]:
"""
List of variables that the user requests to be included in the output files.
"""
return []
[docs]
def delayed_feedback(self) -> List[Tuple[str, str, float]]:
"""
Returns the delayed feedback mappings. These are given as a list of triples
:math:`(x, y, \\tau)`, to indicate that :math:`y = x(t - \\tau)`.
:returns: A list of triples.
Example::
def delayed_feedback(self):
fb1 = ['x', 'y', 0.1]
fb2 = ['x', 'z', 0.2]
return [fb1, fb2]
"""
return []
@property
def ensemble_size(self) -> int:
"""
The number of ensemble members.
"""
return 1
[docs]
def ensemble_member_probability(self, ensemble_member: int) -> float:
"""
The probability of an ensemble member occurring.
:param ensemble_member: The ensemble member index.
:returns: The probability of an ensemble member occurring.
:raises: IndexError
"""
return 1.0
[docs]
def parameters(self, ensemble_member: int) -> AliasDict[str, Union[bool, int, float, ca.MX]]:
"""
Returns a dictionary of parameters.
:param ensemble_member: The ensemble member index.
:returns: A dictionary of parameter names and values.
"""
return AliasDict(self.alias_relation)
def string_parameters(self, ensemble_member: int) -> Dict[str, str]:
"""
Returns a dictionary of string parameters.
:param ensemble_member: The ensemble member index.
:returns: A dictionary of string parameter names and values.
"""
return {}
[docs]
def lookup_tables(self, ensemble_member: int) -> AliasDict[str, LookupTable]:
"""
Returns a dictionary of lookup tables.
:param ensemble_member: The ensemble member index.
:returns: A dictionary of variable names and lookup tables.
"""
return AliasDict(self.alias_relation)
[docs]
@staticmethod
def merge_bounds(a: Tuple[BT, BT], b: Tuple[BT, BT]) -> Tuple[BT, BT]:
"""
Returns a pair of bounds which is the intersection of the two pairs of
bounds given as input.
:param a: First pair ``(upper, lower)`` bounds
:param b: Second pair ``(upper, lower)`` bounds
:returns: A pair of ``(upper, lower)`` bounds which is the
intersection of the two input bounds.
"""
a, A = a
b, B = b
# Make sure we are dealing with the correct types
if __debug__:
for v in (a, A, b, B):
if isinstance(v, np.ndarray):
assert v.ndim == 1
assert np.issubdtype(v.dtype, np.number)
else:
assert isinstance(v, (float, int, Timeseries))
all_bounds = [a, A, b, B]
# First make sure that we treat single element vectors as scalars
for i, v in enumerate(all_bounds):
if isinstance(v, np.ndarray) and np.prod(v.shape) == 1:
all_bounds[i] = v.item()
# Upcast lower bounds to be of equal type, and upper bounds as well.
for i, j in [(0, 2), (2, 0), (1, 3), (3, 1)]:
v1 = all_bounds[i]
v2 = all_bounds[j]
# We only check for v1 being of a "smaller" type than v2, as we
# know we will encounter the reverse as well.
if isinstance(v1, type(v2)):
# Same type, nothing to do.
continue
elif isinstance(v1, (int, float)) and isinstance(v2, Timeseries):
all_bounds[i] = Timeseries(v2.times, np.full_like(v2.values, v1))
elif isinstance(v1, np.ndarray) and isinstance(v2, Timeseries):
if v2.values.ndim != 2 or len(v1) != v2.values.shape[1]:
raise Exception(
"Mismatching vector size when upcasting to Timeseries, {} vs. {}.".format(
v1, v2
)
)
all_bounds[i] = Timeseries(v2.times, np.broadcast_to(v1, v2.values.shape))
elif isinstance(v1, (int, float)) and isinstance(v2, np.ndarray):
all_bounds[i] = np.full_like(v2, v1)
a, A, b, B = all_bounds
assert isinstance(a, type(b))
assert isinstance(A, type(B))
# Merge the bounds
m, M = None, None
if isinstance(a, np.ndarray):
if not a.shape == b.shape:
raise Exception("Cannot merge vector minimum bounds of non-equal size")
m = np.maximum(a, b)
elif isinstance(a, Timeseries):
if len(a.times) != len(b.times):
raise Exception("Cannot merge Timeseries minimum bounds with different lengths")
elif not np.all(a.times == b.times):
raise Exception("Cannot merge Timeseries minimum bounds with non-equal times")
elif not a.values.shape == b.values.shape:
raise Exception("Cannot merge vector Timeseries minimum bounds of non-equal size")
m = Timeseries(a.times, np.maximum(a.values, b.values))
else:
m = max(a, b)
if isinstance(A, np.ndarray):
if not A.shape == B.shape:
raise Exception("Cannot merge vector maximum bounds of non-equal size")
M = np.minimum(A, B)
elif isinstance(A, Timeseries):
if len(A.times) != len(B.times):
raise Exception("Cannot merge Timeseries maximum bounds with different lengths")
elif not np.all(A.times == B.times):
raise Exception("Cannot merge Timeseries maximum bounds with non-equal times")
elif not A.values.shape == B.values.shape:
raise Exception("Cannot merge vector Timeseries maximum bounds of non-equal size")
M = Timeseries(A.times, np.minimum(A.values, B.values))
else:
M = min(A, B)
return m, M
[docs]
def bounds(self) -> AliasDict[str, Tuple[BT, BT]]:
"""
Returns variable bounds as a dictionary mapping variable names to a pair of bounds.
A bound may be a constant, or a time series.
:returns: A dictionary of variable names and ``(upper, lower)`` bound pairs.
The bounds may be numbers or :class:`.Timeseries` objects.
Example::
def bounds(self):
return {'x': (1.0, 2.0), 'y': (2.0, 3.0)}
"""
return AliasDict(self.alias_relation)
[docs]
def history(self, ensemble_member: int) -> AliasDict[str, Timeseries]:
"""
Returns the state history.
:param ensemble_member: The ensemble member index.
:returns:
A dictionary of variable names and historical time series (up to and including t0).
"""
return AliasDict(self.alias_relation)
def variable_is_discrete(self, variable: str) -> bool:
"""
Returns ``True`` if the provided variable is discrete.
:param variable: Variable name.
:returns: ``True`` if variable is discrete (integer).
"""
return False
def variable_nominal(self, variable: str) -> Union[float, np.ndarray]:
"""
Returns the nominal value of the variable. Variables are scaled by replacing them with
their nominal value multiplied by the new variable.
:param variable: Variable name.
:returns: The nominal value of the variable.
"""
return 1
@property
def initial_time(self) -> float:
"""
The initial time in seconds.
"""
return self.times()[0]
@property
def initial_residual(self) -> ca.MX:
"""
The initial equation residual.
Initial equations are used to find consistent initial conditions.
:returns: An :class:`MX` object representing F in the initial equation F = 0.
"""
return ca.MX(0)
[docs]
def seed(self, ensemble_member: int) -> AliasDict[str, Union[float, Timeseries]]:
"""
Seeding data. The optimization algorithm is seeded with the data returned by this method.
:param ensemble_member: The ensemble member index.
:returns: A dictionary of variable names and seed time series.
"""
return AliasDict(self.alias_relation)
[docs]
def objective(self, ensemble_member: int) -> ca.MX:
"""
The objective function for the given ensemble member.
Call :func:`OptimizationProblem.state_at` to return a symbol representing a model variable
at a given time.
:param ensemble_member: The ensemble member index.
:returns: An :class:`MX` object representing the objective function.
Example::
def objective(self, ensemble_member):
# Return value of state 'x' at final time:
times = self.times()
return self.state_at('x', times[-1], ensemble_member)
"""
return ca.MX(0)
[docs]
def path_objective(self, ensemble_member: int) -> ca.MX:
"""
Returns a path objective the given ensemble member.
Path objectives apply to all times and ensemble members simultaneously.
Call :func:`OptimizationProblem.state` to return a time- and ensemble-member-independent
symbol representing a model variable.
:param ensemble_member: The ensemble member index. This index is currently unused,
and here for future use only.
:returns: A :class:`MX` object representing the path objective.
Example::
def path_objective(self, ensemble_member):
# Minimize x(t) for all t
return self.state('x')
"""
return ca.MX(0)
[docs]
def constraints(
self, ensemble_member: int
) -> List[Tuple[ca.MX, Union[float, np.ndarray], Union[float, np.ndarray]]]:
"""
Returns a list of constraints for the given ensemble member.
Call :func:`OptimizationProblem.state_at` to return a symbol representing a model variable
at a given time.
:param ensemble_member: The ensemble member index.
:returns: A list of triples ``(f, m, M)``, with an :class:`MX` object representing
the constraint function ``f``, lower bound ``m``, and upper bound ``M``.
The bounds must be numbers.
Example::
def constraints(self, ensemble_member):
t = 1.0
constraint1 = (
2 * self.state_at('x', t, ensemble_member),
2.0, 4.0)
constraint2 = (
self.state_at('x', t, ensemble_member) + self.state_at('y', t, ensemble_member),
2.0, 3.0)
return [constraint1, constraint2]
"""
return []
[docs]
def path_constraints(
self, ensemble_member: int
) -> List[Tuple[ca.MX, Union[float, np.ndarray], Union[float, np.ndarray]]]:
"""
Returns a list of path constraints.
Path constraints apply to all times and ensemble members simultaneously.
Call :func:`OptimizationProblem.state` to return a time- and ensemble-member-independent
symbol representing a model variable.
:param ensemble_member: The ensemble member index. This index may only
be used to supply member-dependent bounds.
:returns: A list of triples ``(f, m, M)``, with an :class:`MX` object representing
the path constraint function ``f``, lower bound ``m``, and upper bound ``M``.
The bounds may be numbers or :class:`.Timeseries` objects.
Example::
def path_constraints(self, ensemble_member):
# 2 * x must lie between 2 and 4 for every time instance.
path_constraint1 = (2 * self.state('x'), 2.0, 4.0)
# x + y must lie between 2 and 3 for every time instance
path_constraint2 = (self.state('x') + self.state('y'), 2.0, 3.0)
return [path_constraint1, path_constraint2]
"""
return []
[docs]
def post(self) -> None:
"""
Postprocessing logic is performed here.
"""
pass
@property
def equidistant(self) -> bool:
"""
``True`` if all time series are equidistant.
"""
return False
INTERPOLATION_LINEAR = 0
INTERPOLATION_PIECEWISE_CONSTANT_FORWARD = 1
INTERPOLATION_PIECEWISE_CONSTANT_BACKWARD = 2
[docs]
def interpolate(
self,
t: Union[float, np.ndarray],
ts: np.ndarray,
fs: np.ndarray,
f_left: float = np.nan,
f_right: float = np.nan,
mode: int = INTERPOLATION_LINEAR,
) -> Union[float, np.ndarray]:
"""
Linear interpolation over time.
:param t: Time at which to evaluate the interpolant.
:type t: float or vector of floats
:param ts: Time stamps.
:type ts: numpy array
:param fs: Function values at time stamps ts.
:param f_left: Function value left of leftmost time stamp.
:param f_right: Function value right of rightmost time stamp.
:param mode: Interpolation mode.
:returns: The interpolated value.
"""
if isinstance(fs, np.ndarray) and fs.ndim == 2:
# 2-D array of values. Interpolate each column separately.
if len(t) == len(ts) and np.all(t == ts):
# Early termination; nothing to interpolate
return fs.copy()
fs_int = [
self.interpolate(t, ts, fs[:, i], f_left, f_right, mode) for i in range(fs.shape[1])
]
return np.stack(fs_int, axis=1)
elif hasattr(t, "__iter__"):
if len(t) == len(ts) and np.all(t == ts):
# Early termination; nothing to interpolate
return fs.copy()
return self.__interpolate(t, ts, fs, f_left, f_right, mode)
else:
if ts[0] == t:
# Early termination; nothing to interpolate
return fs[0]
return self.__interpolate(t, ts, fs, f_left, f_right, mode)
def __interpolate(self, t, ts, fs, f_left=np.nan, f_right=np.nan, mode=INTERPOLATION_LINEAR):
"""
Linear interpolation over time.
:param t: Time at which to evaluate the interpolant.
:type t: float or vector of floats
:param ts: Time stamps.
:type ts: numpy array
:param fs: Function values at time stamps ts.
:param f_left: Function value left of leftmost time stamp.
:param f_right: Function value right of rightmost time stamp.
:param mode: Interpolation mode.
Note that it is assumed that `ts` is sorted. No such assumption is made for `t`.
:returns: The interpolated value.
"""
if f_left is None:
if (min(t) if hasattr(t, "__iter__") else t) < ts[0]:
raise Exception("Interpolation: Point {} left of range".format(t))
if f_right is None:
if (max(t) if hasattr(t, "__iter__") else t) > ts[-1]:
raise Exception("Interpolation: Point {} right of range".format(t))
if mode == self.INTERPOLATION_LINEAR:
# No need to handle f_left / f_right; NumPy already does this for us
return np.interp(t, ts, fs, f_left, f_right)
elif mode == self.INTERPOLATION_PIECEWISE_CONSTANT_FORWARD:
v = fs[np.maximum(np.searchsorted(ts, t, side="right") - 1, 0)]
elif mode == self.INTERPOLATION_PIECEWISE_CONSTANT_BACKWARD:
v = fs[np.minimum(np.searchsorted(ts, t, side="left"), len(ts) - 1)]
else:
raise NotImplementedError
# Handle f_left / f_right:
if hasattr(t, "__iter__"):
v[t < ts[0]] = f_left
v[t > ts[-1]] = f_right
else:
if t < ts[0]:
v = f_left
elif t > ts[-1]:
v = f_right
return v
@abstractproperty
def controls(self) -> List[str]:
"""
List of names of the control variables (excluding aliases).
"""
pass
@abstractmethod
def discretize_controls(
self, resolved_bounds: AliasDict
) -> Tuple[int, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Performs the discretization of the control inputs, filling lower and upper
bound vectors for the resulting optimization variables, as well as an initial guess.
:param resolved_bounds: :class:`AliasDict` of numerical bound values. This is the
same dictionary as returned by :func:`bounds`, but with all
parameter symbols replaced with their numerical values.
:returns: The number of control variables in the optimization problem, a lower
bound vector, an upper bound vector, a seed vector, and a dictionary
of offset values.
"""
pass
def dynamic_parameters(self) -> List[ca.MX]:
"""
Returns a list of parameter symbols that may vary from run to run. The values
of these parameters are not cached.
:returns: A list of parameter symbols.
"""
return []
@abstractmethod
def extract_controls(self, ensemble_member: int = 0) -> Dict[str, np.ndarray]:
"""
Extracts state time series from optimizer results.
Must return a dictionary of result time series.
:param ensemble_member: The ensemble member index.
:returns: A dictionary of control input time series.
"""
pass
def control_vector(self, variable: str, ensemble_member: int = 0) -> Union[ca.MX, List[ca.MX]]:
"""
Return the optimization variables for the entire time horizon of the given state.
:param variable: Variable name.
:param ensemble_member: The ensemble member index.
:returns: A vector of control input symbols for the entire time horizon.
:raises: KeyError
"""
return self.state_vector(variable, ensemble_member)
[docs]
def control(self, variable: str) -> ca.MX:
"""
Returns an :class:`MX` symbol for the given control input, not bound to any time.
:param variable: Variable name.
:returns: :class:`MX` symbol for given control input.
:raises: KeyError
"""
return self.variable(variable)
[docs]
@abstractmethod
def control_at(
self, variable: str, t: float, ensemble_member: int = 0, scaled: bool = False
) -> ca.MX:
"""
Returns an :class:`MX` symbol representing the given control input at the given time.
:param variable: Variable name.
:param t: Time.
:param ensemble_member: The ensemble member index.
:param scaled: True to return the scaled variable.
:returns: :class:`MX` symbol representing the control input at the given time.
:raises: KeyError
"""
pass
@abstractproperty
def differentiated_states(self) -> List[str]:
"""
List of names of the differentiated state variables (excluding aliases).
"""
pass
@abstractproperty
def algebraic_states(self) -> List[str]:
"""
List of names of the algebraic state variables (excluding aliases).
"""
pass
@abstractmethod
def discretize_states(
self, resolved_bounds: AliasDict
) -> Tuple[int, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
"""
Perform the discretization of the states.
Fills lower and upper bound vectors for the resulting optimization
variables, as well as an initial guess.
:param resolved_bounds: :class:`AliasDict` of numerical bound values.
This is the same dictionary as returned by :func:`bounds`, but
with all parameter symbols replaced with their numerical values.
:returns: The number of control variables in the optimization problem,
a lower bound vector, an upper bound vector, a seed vector,
and a dictionary of vector offset values.
"""
pass
@abstractmethod
def extract_states(self, ensemble_member: int = 0) -> Dict[str, np.ndarray]:
"""
Extracts state time series from optimizer results.
Must return a dictionary of result time series.
:param ensemble_member: The ensemble member index.
:returns: A dictionary of state time series.
"""
pass
@abstractmethod
def state_vector(self, variable: str, ensemble_member: int = 0) -> Union[ca.MX, List[ca.MX]]:
"""
Return the optimization variables for the entire time horizon of the given state.
:param variable: Variable name.
:param ensemble_member: The ensemble member index.
:returns: A vector of state symbols for the entire time horizon.
:raises: KeyError
"""
pass
[docs]
def state(self, variable: str) -> ca.MX:
"""
Returns an :class:`MX` symbol for the given state, not bound to any time.
:param variable: Variable name.
:returns: :class:`MX` symbol for given state.
:raises: KeyError
"""
return self.variable(variable)
[docs]
@abstractmethod
def state_at(
self, variable: str, t: float, ensemble_member: int = 0, scaled: bool = False
) -> ca.MX:
"""
Returns an :class:`MX` symbol representing the given variable at the given time.
:param variable: Variable name.
:param t: Time.
:param ensemble_member: The ensemble member index.
:param scaled: True to return the scaled variable.
:returns: :class:`MX` symbol representing the state at the given time.
:raises: KeyError
"""
pass
@abstractmethod
def extra_variable(self, variable: str, ensemble_member: int = 0) -> ca.MX:
"""
Returns an :class:`MX` symbol representing the extra variable inside the state vector.
:param variable: Variable name.
:param ensemble_member: The ensemble member index.
:returns: :class:`MX` symbol representing the extra variable.
:raises: KeyError
"""
pass
[docs]
@abstractmethod
def states_in(
self, variable: str, t0: float = None, tf: float = None, ensemble_member: int = 0
) -> Iterator[ca.MX]:
"""
Iterates over symbols for states in the interval [t0, tf].
:param variable: Variable name.
:param t0: Left bound of interval. If equal to None, the initial time is used.
:param tf: Right bound of interval. If equal to None, the final time is used.
:param ensemble_member: The ensemble member index.
:raises: KeyError
"""
pass
[docs]
@abstractmethod
def integral(
self, variable: str, t0: float = None, tf: float = None, ensemble_member: int = 0
) -> ca.MX:
"""
Returns an expression for the integral over the interval [t0, tf].
:param variable: Variable name.
:param t0: Left bound of interval. If equal to None, the initial time is used.
:param tf: Right bound of interval. If equal to None, the final time is used.
:param ensemble_member: The ensemble member index.
:returns: :class:`MX` object representing the integral.
:raises: KeyError
"""
pass
[docs]
@abstractmethod
def der(self, variable: str) -> ca.MX:
"""
Returns an :class:`MX` symbol for the time derivative given state, not bound to any time.
:param variable: Variable name.
:returns: :class:`MX` symbol for given state.
:raises: KeyError
"""
pass
[docs]
@abstractmethod
def der_at(self, variable: str, t: float, ensemble_member: int = 0) -> ca.MX:
"""
Returns an expression for the time derivative of the specified variable at time t.
:param variable: Variable name.
:param t: Time.
:param ensemble_member: The ensemble member index.
:returns: :class:`MX` object representing the derivative.
:raises: KeyError
"""
pass
[docs]
def get_timeseries(self, variable: str, ensemble_member: int = 0) -> Timeseries:
"""
Looks up a timeseries from the internal data store.
:param variable: Variable name.
:param ensemble_member: The ensemble member index.
:returns: The requested time series.
:rtype: :class:`.Timeseries`
:raises: KeyError
"""
raise NotImplementedError
[docs]
def set_timeseries(
self,
variable: str,
timeseries: Timeseries,
ensemble_member: int = 0,
output: bool = True,
check_consistency: bool = True,
) -> None:
"""
Sets a timeseries in the internal data store.
:param variable: Variable name.
:param timeseries: Time series data.
:type timeseries: iterable of floats, or :class:`.Timeseries`
:param ensemble_member: The ensemble member index.
:param output: Whether to include this time series in output data files.
:param check_consistency: Whether to check consistency between the time stamps on
the new timeseries object and any existing time stamps.
"""
raise NotImplementedError
[docs]
def timeseries_at(self, variable: str, t: float, ensemble_member: int = 0) -> float:
"""
Return the value of a time series at the given time.
:param variable: Variable name.
:param t: Time.
:param ensemble_member: The ensemble member index.
:returns: The interpolated value of the time series.
:raises: KeyError
"""
raise NotImplementedError
def map_path_expression(self, expr: ca.MX, ensemble_member: int) -> ca.MX:
"""
Maps the path expression `expr` over the entire time horizon of the optimization problem.
:param expr: An :class:`MX` path expression.
:returns: An :class:`MX` expression evaluating `expr` over the entire time horizon.
"""
raise NotImplementedError
@debug_check(DebugLevel.HIGH)
def __debug_check_linearity_constraints(self, nlp):
x = nlp["x"]
f = nlp["f"]
g = nlp["g"]
expand_f_g = ca.Function("f_g", [x], [f, g]).expand()
X_sx = ca.SX.sym("X", *x.shape)
f_sx, g_sx = expand_f_g(X_sx)
jac = ca.Function("j", [X_sx], [ca.jacobian(g_sx, X_sx)]).expand()
if jac(np.nan).is_regular():
logger.info("The constraints are linear")
else:
hes = ca.Function("j", [X_sx], [ca.jacobian(ca.jacobian(g_sx, X_sx), X_sx)]).expand()
if hes(np.nan).is_regular():
logger.info("The constraints are quadratic")
else:
logger.info("The constraints are nonlinear")
@debug_check(DebugLevel.VERYHIGH)
def __debug_check_linear_independence(self, lbx, ubx, lbg, ubg, nlp):
x = nlp["x"]
f = nlp["f"]
g = nlp["g"]
expand_f_g = ca.Function("f_g", [x], [f, g]).expand()
x_sx = ca.SX.sym("X", *x.shape)
f_sx, g_sx = expand_f_g(x_sx)
x, f, g = x_sx, f_sx, g_sx
lbg = np.array(ca.vertsplit(ca.veccat(*lbg))).ravel()
ubg = np.array(ca.vertsplit(ca.veccat(*ubg))).ravel()
# Find the linear constraints
g_sjac = ca.Function("Af", [x], [ca.jtimes(g, x, x.ones(*x.shape))])
res = g_sjac(np.nan)
res = np.array(res).ravel()
g_is_linear = ~np.isnan(res)
# Find the rows in the jacobian with only a single entry
g_jac_csr = ca.DM(ca.Function("tmp", [x], [g]).sparsity_jac(0, 0)).tocsc().tocsr()
g_single_variable = np.diff(g_jac_csr.indptr) == 1
# Find the rows which are equality constraints
g_eq_constraint = lbg == ubg
# The intersection of all selections are constraints like we want
g_constant_assignment = g_is_linear & g_single_variable & g_eq_constraint
# Map of variable (index) to constraints/row numbers
var_index_assignment = {}
for i in range(g.size1()):
if g_constant_assignment[i]:
var_ind = g_jac_csr.getrow(i).indices[0]
var_index_assignment.setdefault(var_ind, []).append(i)
var_names, named_x, named_f, named_g = self._debug_get_named_nlp(nlp)
for vi, g_inds in var_index_assignment.items():
if len(g_inds) > 1:
logger.info(
"Variable '{}' has duplicate constraints setting its value:".format(
var_names[vi]
)
)
for g_i in g_inds:
logger.info("row {}: {} = {}".format(g_i, named_g[g_i], lbg[g_i]))
# Find variables for which the bounds are equal, but also an equality
# constraint is set. This would result in a constraint `1 = 1` with
# the default IPOPT option `fixed_variable_treatment = make_parameter`
x_inds = np.flatnonzero(lbx == ubx)
for vi in x_inds:
if vi in var_index_assignment:
logger.info(
"Variable '{}' has equal bounds (value = {}), "
"but also the following equality constraints:".format(var_names[vi], lbx[vi])
)
for g_i in var_index_assignment[vi]:
logger.info("row {}: {} = {}".format(g_i, named_g[g_i], lbg[g_i]))