Source code for rtctools.optimization.optimization_problem

import logging
from abc import ABCMeta, abstractmethod, abstractproperty
from typing import Any, Dict, Iterator, List, Tuple, Union

import casadi as ca

import numpy as np

from rtctools._internal.alias_tools import AliasDict
from rtctools._internal.debug_check_helpers import DebugLevel, debug_check
from rtctools.data.storage import DataStoreAccessor

from .timeseries import Timeseries

logger = logging.getLogger("rtctools")


# Typical type for a bound on a variable
BT = Union[float, np.ndarray, Timeseries]


class LookupTable:
    """
    Base class for LookupTables.
    """

    @property
    def inputs(self) -> List[ca.MX]:
        """
        List of lookup table input variables.
        """
        raise NotImplementedError

    @property
    def function(self) -> ca.Function:
        """
        Lookup table CasADi :class:`Function`.
        """
        raise NotImplementedError


[docs]class OptimizationProblem(DataStoreAccessor, metaclass=ABCMeta): """ Base class for all optimization problems. """ _debug_check_level = DebugLevel.MEDIUM _debug_check_options = {} def __init__(self, **kwargs): # Call parent class first for default behaviour. super().__init__(**kwargs) self.__mixed_integer = False
[docs] def optimize(self, preprocessing: bool = True, postprocessing: bool = True, log_solver_failure_as_error: bool = True) -> bool: """ Perform one initialize-transcribe-solve-finalize cycle. :param preprocessing: True to enable a call to ``pre`` preceding the optimization. :param postprocessing: True to enable a call to ``post`` following the optimization. :returns: True on success. """ # Deprecations / removals if hasattr(self, 'initial_state'): raise RuntimeError("Support for `initial_state()` has been removed. Please use `history()` instead.") logger.info("Entering optimize()") # Do any preprocessing, which may include changing parameter values on # the model if preprocessing: self.pre() # Check if control inputs are bounded self.__check_bounds_control_input() else: logger.debug( 'Skipping Preprocessing in OptimizationProblem.optimize()') # Transcribe problem discrete, lbx, ubx, lbg, ubg, x0, nlp = self.transcribe() # Create an NLP solver logger.debug("Collecting solver options") self.__mixed_integer = np.any(discrete) options = {} options.update(self.solver_options()) # Create a copy logger.debug("Creating solver") if options.pop('expand', False): # NOTE: CasADi only supports the "expand" option for nlpsol. To # also be able to expand with e.g. qpsol, we do the expansion # ourselves here. logger.debug("Expanding objective and constraints to SX") expand_f_g = ca.Function('f_g', [nlp['x']], [nlp['f'], nlp['g']]).expand() X_sx = ca.SX.sym('X', *nlp['x'].shape) f_sx, g_sx = expand_f_g(X_sx) nlp['f'] = f_sx nlp['g'] = g_sx nlp['x'] = X_sx # Debug check for non-linearity in constraints self.__debug_check_linearity_constraints(nlp) # Debug check for linear independence of the constraints self.__debug_check_linear_independence(lbx, ubx, lbg, ubg, nlp) # Solver option my_solver = options['solver'] del options['solver'] # Already consumed del options['optimized_num_dir'] # Iteration callback iteration_callback = options.pop('iteration_callback', None) # CasADi solver to use casadi_solver = options.pop('casadi_solver') if isinstance(casadi_solver, str): casadi_solver = getattr(ca, casadi_solver) nlpsol_options = {**options} if self.__mixed_integer: nlpsol_options['discrete'] = discrete if iteration_callback: nlpsol_options['iteration_callback'] = iteration_callback # Remove ipopt and bonmin defaults if they are not used if my_solver != 'ipopt': nlpsol_options.pop('ipopt', None) if my_solver != 'bonmin': nlpsol_options.pop('bonmin', None) solver = casadi_solver('nlp', my_solver, nlp, nlpsol_options) # Solve NLP logger.info("Calling solver") results = solver(x0=x0, lbx=lbx, ubx=ubx, lbg=ca.veccat(*lbg), ubg=ca.veccat(*ubg)) # Extract relevant stats self.__objective_value = float(results['f']) self.__solver_output = np.array(results['x']).ravel() self.__transcribed_problem = {"lbx": lbx, "ubx": ubx, "lbg": lbg, "ubg": ubg, "x0": x0, "nlp": nlp} self.__lam_g = results.get("lam_g") self.__lam_x = results.get("lam_x") self.__solver_stats = solver.stats() success, log_level = self.solver_success(self.__solver_stats, log_solver_failure_as_error) return_status = self.__solver_stats['return_status'] if 'secondary_return_status' in self.__solver_stats: return_status = "{}: {}".format(return_status, self.__solver_stats['secondary_return_status']) wall_clock_time = "elapsed time not read" if 't_wall_total' in self.__solver_stats: wall_clock_time = "{} seconds".format(self.__solver_stats['t_wall_total']) elif 't_wall_solver' in self.__solver_stats: wall_clock_time = "{} seconds".format(self.__solver_stats['t_wall_solver']) if success: logger.log(log_level, "Solver succeeded with status {} ({}).".format( return_status, wall_clock_time)) else: try: ii = [y[0] for y in self.loop_over_error].index(self.priority) loop_error_indicator = self.loop_over_error[ii][1] try: loop_error = self.loop_over_error[ii][2] if loop_error_indicator and loop_error in return_status: log_level = logging.INFO except IndexError: if loop_error_indicator: log_level = logging.INFO logger.log(log_level, "Solver succeeded with status {} ({}).".format( return_status, wall_clock_time)) except (AttributeError, ValueError): logger.log(log_level, "Solver succeeded with status {} ({}).".format( return_status, wall_clock_time)) # Do any postprocessing if postprocessing: self.post() else: logger.debug( 'Skipping Postprocessing in OptimizationProblem.optimize()') # Done logger.info("Done with optimize()") return success
def __check_bounds_control_input(self) -> None: # Checks if at the control inputs have bounds, log warning when a control input is not bounded. bounds = self.bounds() for variable in self.dae_variables['control_inputs']: variable = variable.name() if variable not in bounds: logger.warning( "OptimizationProblem: control input {} has no bounds.".format(variable)) @abstractmethod def transcribe(self) -> Tuple[ np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, Dict[str, ca.MX]]: """ Transcribe the continuous optimization problem to a discretized, solver-ready optimization problem. """ pass
[docs] def solver_options(self) -> Dict[str, Union[str, int, float, bool, str]]: """ Returns a dictionary of CasADi optimization problem solver options. The default solver for continuous problems is `Ipopt <https://projects.coin-or.org/Ipopt/>`_. The default solver for mixed integer problems is `Bonmin <http://projects.coin-or.org/Bonmin/>`_. :returns: A dictionary of solver options. See the CasADi and respective solver documentation for details. """ options = {'error_on_fail': False, 'optimized_num_dir': 3, 'casadi_solver': ca.nlpsol} if self.__mixed_integer: options['solver'] = 'bonmin' bonmin_options = options['bonmin'] = {} bonmin_options['algorithm'] = 'B-BB' bonmin_options['nlp_solver'] = 'Ipopt' bonmin_options['nlp_log_level'] = 2 bonmin_options['linear_solver'] = 'mumps' else: options['solver'] = 'ipopt' ipopt_options = options['ipopt'] = {} ipopt_options['linear_solver'] = 'mumps' return options
[docs] def solver_success(self, solver_stats: Dict[str, Union[str, bool]], log_solver_failure_as_error: bool) -> Tuple[bool, int]: """ Translates the returned solver statistics into a boolean and log level to indicate whether the solve was succesful, and how to log it. :param solver_stats: Dictionary containing information about the solver status. See explanation below. :param log_solver_failure_as_error: Indicates whether a solve failure Should be logged as an error or info message. ``solver_stats`` typically consist of three fields: * return_status: ``str`` * secondary_return_status: ``str`` * success: ``bool`` By default we rely on CasADi's interpretation of the return_status (and secondary status) to the success variable, with an exception for IPOPT (see below). The logging level is typically ``logging.INFO`` for success, and ``logging.ERROR`` for failure. Only for IPOPT an exception is made for `Not_Enough_Degrees_Of_Freedom`, which returns ``logging.WARNING`` instead. For example, this can happen when too many goals are specified, and lower priority goals cannot improve further on the current result. :returns: A tuple indicating whether or not the solver has succeeded, and what level to log it with. """ success = solver_stats['success'] log_level = logging.INFO if success else logging.ERROR if (self.solver_options()['solver'].lower() in ['bonmin', 'ipopt'] and solver_stats['return_status'] in ['Not_Enough_Degrees_Of_Freedom']): log_level = logging.WARNING if log_level == logging.ERROR and not log_solver_failure_as_error: log_level = logging.INFO return success, log_level
@abstractproperty def solver_input(self) -> ca.MX: """ The symbolic input to the NLP solver. """ pass @abstractmethod def extract_results(self, ensemble_member: int = 0) -> Dict[str, np.ndarray]: """ Extracts state and control input time series from optimizer results. :returns: A dictionary of result time series. """ pass @property def objective_value(self) -> float: """ The last obtained objective function value. """ return self.__objective_value @property def solver_output(self) -> np.ndarray: """ The raw output from the last NLP solver run. """ return self.__solver_output @property def solver_stats(self) -> Dict[str, Any]: """ The stats from the last NLP solver run. """ return self.__solver_stats @property def lagrange_multipliers(self) -> Tuple[Any, Any]: """ The lagrange multipliers at the solution. """ return self.__lam_g, self.__lam_x @property def transcribed_problem(self) -> Dict[str, Any]: """ The transcribed problem. """ return self.__transcribed_problem
[docs] def pre(self) -> None: """ Preprocessing logic is performed here. """ pass
@abstractproperty def dae_residual(self) -> ca.MX: """ Symbolic DAE residual of the model. """ pass @abstractproperty def dae_variables(self) -> Dict[str, List[ca.MX]]: """ Dictionary of symbolic variables for the DAE residual. """ pass @property def path_variables(self) -> List[ca.MX]: """ List of additional, time-dependent optimization variables, not covered by the DAE model. """ return [] @abstractmethod def variable(self, variable: str) -> ca.MX: """ Returns an :class:`MX` symbol for the given variable. :param variable: Variable name. :returns: The associated CasADi :class:`MX` symbol. """ raise NotImplementedError @property def extra_variables(self) -> List[ca.MX]: """ List of additional, time-independent optimization variables, not covered by the DAE model. """ return [] @property def output_variables(self) -> List[ca.MX]: """ List of variables that the user requests to be included in the output files. """ return []
[docs] def delayed_feedback(self) -> List[Tuple[str, str, float]]: """ Returns the delayed feedback mappings. These are given as a list of triples :math:`(x, y, \\tau)`, to indicate that :math:`y = x(t - \\tau)`. :returns: A list of triples. Example:: def delayed_feedback(self): fb1 = ['x', 'y', 0.1] fb2 = ['x', 'z', 0.2] return [fb1, fb2] """ return []
@property def ensemble_size(self) -> int: """ The number of ensemble members. """ return 1
[docs] def ensemble_member_probability(self, ensemble_member: int) -> float: """ The probability of an ensemble member occurring. :param ensemble_member: The ensemble member index. :returns: The probability of an ensemble member occurring. :raises: IndexError """ return 1.0
[docs] def parameters(self, ensemble_member: int) -> AliasDict[str, Union[bool, int, float, ca.MX]]: """ Returns a dictionary of parameters. :param ensemble_member: The ensemble member index. :returns: A dictionary of parameter names and values. """ return AliasDict(self.alias_relation)
def string_parameters(self, ensemble_member: int) -> Dict[str, str]: """ Returns a dictionary of string parameters. :param ensemble_member: The ensemble member index. :returns: A dictionary of string parameter names and values. """ return {}
[docs] def constant_inputs(self, ensemble_member: int) -> AliasDict[str, Timeseries]: """ Returns a dictionary of constant inputs. :param ensemble_member: The ensemble member index. :returns: A dictionary of constant input names and time series. """ return AliasDict(self.alias_relation)
[docs] def lookup_tables(self, ensemble_member: int) -> AliasDict[str, LookupTable]: """ Returns a dictionary of lookup tables. :param ensemble_member: The ensemble member index. :returns: A dictionary of variable names and lookup tables. """ return AliasDict(self.alias_relation)
[docs] @staticmethod def merge_bounds(a: Tuple[BT, BT], b: Tuple[BT, BT]) -> Tuple[BT, BT]: """ Returns a pair of bounds which is the intersection of the two pairs of bounds given as input. :param a: First pair ``(upper, lower)`` bounds :param b: Second pair ``(upper, lower)`` bounds :returns: A pair of ``(upper, lower)`` bounds which is the intersection of the two input bounds. """ a, A = a b, B = b # Make sure we are dealing with the correct types if __debug__: for v in (a, A, b, B): if isinstance(v, np.ndarray): assert v.ndim == 1 assert np.issubdtype(v.dtype, np.number) else: assert isinstance(v, (float, int, Timeseries)) all_bounds = [a, A, b, B] # First make sure that we treat single element vectors as scalars for i, v in enumerate(all_bounds): if isinstance(v, np.ndarray) and np.prod(v.shape) == 1: all_bounds[i] = v.item() # Upcast lower bounds to be of equal type, and upper bounds as well. for i, j in [(0, 2), (2, 0), (1, 3), (3, 1)]: v1 = all_bounds[i] v2 = all_bounds[j] # We only check for v1 being of a "smaller" type than v2, as we # know we will encounter the reverse as well. if isinstance(v1, type(v2)): # Same type, nothing to do. continue elif isinstance(v1, (int, float)) and isinstance(v2, Timeseries): all_bounds[i] = Timeseries(v2.times, np.full_like(v2.values, v1)) elif isinstance(v1, np.ndarray) and isinstance(v2, Timeseries): if v2.values.ndim != 2 or len(v1) != v2.values.shape[1]: raise Exception( "Mismatching vector size when upcasting to Timeseries, {} vs. {}.".format(v1, v2)) all_bounds[i] = Timeseries(v2.times, np.broadcast_to(v1, v2.values.shape)) elif isinstance(v1, (int, float)) and isinstance(v2, np.ndarray): all_bounds[i] = np.full_like(v2, v1) a, A, b, B = all_bounds assert isinstance(a, type(b)) assert isinstance(A, type(B)) # Merge the bounds m, M = None, None if isinstance(a, np.ndarray): if not a.shape == b.shape: raise Exception("Cannot merge vector minimum bounds of non-equal size") m = np.maximum(a, b) elif isinstance(a, Timeseries): if len(a.times) != len(b.times): raise Exception("Cannot merge Timeseries minimum bounds with different lengths") elif not np.all(a.times == b.times): raise Exception("Cannot merge Timeseries minimum bounds with non-equal times") elif not a.values.shape == b.values.shape: raise Exception("Cannot merge vector Timeseries minimum bounds of non-equal size") m = Timeseries(a.times, np.maximum(a.values, b.values)) else: m = max(a, b) if isinstance(A, np.ndarray): if not A.shape == B.shape: raise Exception("Cannot merge vector maximum bounds of non-equal size") M = np.minimum(A, B) elif isinstance(A, Timeseries): if len(A.times) != len(B.times): raise Exception("Cannot merge Timeseries maximum bounds with different lengths") elif not np.all(A.times == B.times): raise Exception("Cannot merge Timeseries maximum bounds with non-equal times") elif not A.values.shape == B.values.shape: raise Exception("Cannot merge vector Timeseries maximum bounds of non-equal size") M = Timeseries(A.times, np.minimum(A.values, B.values)) else: M = min(A, B) return m, M
[docs] def bounds(self) -> AliasDict[str, Tuple[BT, BT]]: """ Returns variable bounds as a dictionary mapping variable names to a pair of bounds. A bound may be a constant, or a time series. :returns: A dictionary of variable names and ``(upper, lower)`` bound pairs. The bounds may be numbers or :class:`.Timeseries` objects. Example:: def bounds(self): return {'x': (1.0, 2.0), 'y': (2.0, 3.0)} """ return AliasDict(self.alias_relation)
[docs] def history(self, ensemble_member: int) -> AliasDict[str, Timeseries]: """ Returns the state history. :param ensemble_member: The ensemble member index. :returns: A dictionary of variable names and historical time series (up to and including t0). """ return AliasDict(self.alias_relation)
def variable_is_discrete(self, variable: str) -> bool: """ Returns ``True`` if the provided variable is discrete. :param variable: Variable name. :returns: ``True`` if variable is discrete (integer). """ return False def variable_nominal(self, variable: str) -> Union[float, np.ndarray]: """ Returns the nominal value of the variable. Variables are scaled by replacing them with their nominal value multiplied by the new variable. :param variable: Variable name. :returns: The nominal value of the variable. """ return 1 @property def initial_time(self) -> float: """ The initial time in seconds. """ return self.times()[0] @property def initial_residual(self) -> ca.MX: """ The initial equation residual. Initial equations are used to find consistent initial conditions. :returns: An :class:`MX` object representing F in the initial equation F = 0. """ return ca.MX(0)
[docs] def seed(self, ensemble_member: int) -> AliasDict[str, Union[float, Timeseries]]: """ Seeding data. The optimization algorithm is seeded with the data returned by this method. :param ensemble_member: The ensemble member index. :returns: A dictionary of variable names and seed time series. """ return AliasDict(self.alias_relation)
[docs] def objective(self, ensemble_member: int) -> ca.MX: """ The objective function for the given ensemble member. Call :func:`OptimizationProblem.state_at` to return a symbol representing a model variable at a given time. :param ensemble_member: The ensemble member index. :returns: An :class:`MX` object representing the objective function. Example:: def objective(self, ensemble_member): # Return value of state 'x' at final time: times = self.times() return self.state_at('x', times[-1], ensemble_member) """ return ca.MX(0)
[docs] def path_objective(self, ensemble_member: int) -> ca.MX: """ Returns a path objective the given ensemble member. Path objectives apply to all times and ensemble members simultaneously. Call :func:`OptimizationProblem.state` to return a time- and ensemble-member-independent symbol representing a model variable. :param ensemble_member: The ensemble member index. This index is currently unused, and here for future use only. :returns: A :class:`MX` object representing the path objective. Example:: def path_objective(self, ensemble_member): # Minimize x(t) for all t return self.state('x') """ return ca.MX(0)
[docs] def constraints(self, ensemble_member: int) -> List[ Tuple[ca.MX, Union[float, np.ndarray], Union[float, np.ndarray]]]: """ Returns a list of constraints for the given ensemble member. Call :func:`OptimizationProblem.state_at` to return a symbol representing a model variable at a given time. :param ensemble_member: The ensemble member index. :returns: A list of triples ``(f, m, M)``, with an :class:`MX` object representing the constraint function ``f``, lower bound ``m``, and upper bound ``M``. The bounds must be numbers. Example:: def constraints(self, ensemble_member): t = 1.0 constraint1 = ( 2 * self.state_at('x', t, ensemble_member), 2.0, 4.0) constraint2 = ( self.state_at('x', t, ensemble_member) + self.state_at('y', t, ensemble_member), 2.0, 3.0) return [constraint1, constraint2] """ return []
[docs] def path_constraints(self, ensemble_member: int) -> List[ Tuple[ca.MX, Union[float, np.ndarray], Union[float, np.ndarray]]]: """ Returns a list of path constraints. Path constraints apply to all times and ensemble members simultaneously. Call :func:`OptimizationProblem.state` to return a time- and ensemble-member-independent symbol representing a model variable. :param ensemble_member: The ensemble member index. This index may only be used to supply member-dependent bounds. :returns: A list of triples ``(f, m, M)``, with an :class:`MX` object representing the path constraint function ``f``, lower bound ``m``, and upper bound ``M``. The bounds may be numbers or :class:`.Timeseries` objects. Example:: def path_constraints(self, ensemble_member): # 2 * x must lie between 2 and 4 for every time instance. path_constraint1 = (2 * self.state('x'), 2.0, 4.0) # x + y must lie between 2 and 3 for every time instance path_constraint2 = (self.state('x') + self.state('y'), 2.0, 3.0) return [path_constraint1, path_constraint2] """ return []
[docs] def post(self) -> None: """ Postprocessing logic is performed here. """ pass
@property def equidistant(self) -> bool: """ ``True`` if all time series are equidistant. """ return False INTERPOLATION_LINEAR = 0 INTERPOLATION_PIECEWISE_CONSTANT_FORWARD = 1 INTERPOLATION_PIECEWISE_CONSTANT_BACKWARD = 2
[docs] def interpolate( self, t: Union[float, np.ndarray], ts: np.ndarray, fs: np.ndarray, f_left: float = np.nan, f_right: float = np.nan, mode: int = INTERPOLATION_LINEAR) -> Union[float, np.ndarray]: """ Linear interpolation over time. :param t: Time at which to evaluate the interpolant. :type t: float or vector of floats :param ts: Time stamps. :type ts: numpy array :param fs: Function values at time stamps ts. :param f_left: Function value left of leftmost time stamp. :param f_right: Function value right of rightmost time stamp. :param mode: Interpolation mode. :returns: The interpolated value. """ if isinstance(fs, np.ndarray) and fs.ndim == 2: # 2-D array of values. Interpolate each column separately. if len(t) == len(ts) and np.all(t == ts): # Early termination; nothing to interpolate return fs.copy() fs_int = [self.interpolate(t, ts, fs[:, i], f_left, f_right, mode) for i in range(fs.shape[1])] return np.stack(fs_int, axis=1) elif hasattr(t, '__iter__'): if len(t) == len(ts) and np.all(t == ts): # Early termination; nothing to interpolate return fs.copy() return self.__interpolate(t, ts, fs, f_left, f_right, mode) else: if ts[0] == t: # Early termination; nothing to interpolate return fs[0] return self.__interpolate(t, ts, fs, f_left, f_right, mode)
def __interpolate(self, t, ts, fs, f_left=np.nan, f_right=np.nan, mode=INTERPOLATION_LINEAR): """ Linear interpolation over time. :param t: Time at which to evaluate the interpolant. :type t: float or vector of floats :param ts: Time stamps. :type ts: numpy array :param fs: Function values at time stamps ts. :param f_left: Function value left of leftmost time stamp. :param f_right: Function value right of rightmost time stamp. :param mode: Interpolation mode. Note that it is assumed that `ts` is sorted. No such assumption is made for `t` . :returns: The interpolated value. """ if f_left is None: if (min(t) if hasattr(t, '__iter__') else t) < ts[0]: raise Exception("Interpolation: Point {} left of range".format(t)) if f_right is None: if (max(t) if hasattr(t, '__iter__') else t) > ts[-1]: raise Exception("Interpolation: Point {} right of range".format(t)) if mode == self.INTERPOLATION_LINEAR: # No need to handle f_left / f_right; NumPy already does this for us return np.interp(t, ts, fs, f_left, f_right) elif mode == self.INTERPOLATION_PIECEWISE_CONSTANT_FORWARD: v = fs[np.maximum(np.searchsorted(ts, t, side='right') - 1, 0)] elif mode == self.INTERPOLATION_PIECEWISE_CONSTANT_BACKWARD: v = fs[np.minimum(np.searchsorted(ts, t, side='left'), len(ts) - 1)] else: raise NotImplementedError # Handle f_left / f_right: if hasattr(t, "__iter__"): v[t < ts[0]] = f_left v[t > ts[-1]] = f_right else: if t < ts[0]: v = f_left elif t > ts[-1]: v = f_right return v @abstractproperty def controls(self) -> List[str]: """ List of names of the control variables (excluding aliases). """ pass @abstractmethod def discretize_controls(self, resolved_bounds: AliasDict) -> Tuple[ int, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Performs the discretization of the control inputs, filling lower and upper bound vectors for the resulting optimization variables, as well as an initial guess. :param resolved_bounds: :class:`AliasDict` of numerical bound values. This is the same dictionary as returned by :func:`bounds`, but with all parameter symbols replaced with their numerical values. :returns: The number of control variables in the optimization problem, a lower bound vector, an upper bound vector, a seed vector, and a dictionary of offset values. """ pass def dynamic_parameters(self) -> List[ca.MX]: """ Returns a list of parameter symbols that may vary from run to run. The values of these parameters are not cached. :returns: A list of parameter symbols. """ return [] @abstractmethod def extract_controls(self, ensemble_member: int = 0) -> Dict[str, np.ndarray]: """ Extracts state time series from optimizer results. Must return a dictionary of result time series. :param ensemble_member: The ensemble member index. :returns: A dictionary of control input time series. """ pass def control_vector(self, variable: str, ensemble_member: int = 0) -> Union[ca.MX, List[ca.MX]]: """ Return the optimization variables for the entire time horizon of the given state. :param variable: Variable name. :param ensemble_member: The ensemble member index. :returns: A vector of control input symbols for the entire time horizon. :raises: KeyError """ return self.state_vector(variable, ensemble_member)
[docs] def control(self, variable: str) -> ca.MX: """ Returns an :class:`MX` symbol for the given control input, not bound to any time. :param variable: Variable name. :returns: :class:`MX` symbol for given control input. :raises: KeyError """ return self.variable(variable)
[docs] @abstractmethod def control_at(self, variable: str, t: float, ensemble_member: int = 0, scaled: bool = False) -> ca.MX: """ Returns an :class:`MX` symbol representing the given control input at the given time. :param variable: Variable name. :param t: Time. :param ensemble_member: The ensemble member index. :param scaled: True to return the scaled variable. :returns: :class:`MX` symbol representing the control input at the given time. :raises: KeyError """ pass
@abstractproperty def differentiated_states(self) -> List[str]: """ List of names of the differentiated state variables (excluding aliases). """ pass @abstractproperty def algebraic_states(self) -> List[str]: """ List of names of the algebraic state variables (excluding aliases). """ pass @abstractmethod def discretize_states(self, resolved_bounds: AliasDict) -> Tuple[ int, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Perform the discretization of the states. Fills lower and upper bound vectors for the resulting optimization variables, as well as an initial guess. :param resolved_bounds: :class:`AliasDict` of numerical bound values. This is the same dictionary as returned by :func:`bounds`, but with all parameter symbols replaced with their numerical values. :returns: The number of control variables in the optimization problem, a lower bound vector, an upper bound vector, a seed vector, and a dictionary of vector offset values. """ pass @abstractmethod def extract_states(self, ensemble_member: int = 0) -> Dict[str, np.ndarray]: """ Extracts state time series from optimizer results. Must return a dictionary of result time series. :param ensemble_member: The ensemble member index. :returns: A dictionary of state time series. """ pass @abstractmethod def state_vector(self, variable: str, ensemble_member: int = 0) -> Union[ca.MX, List[ca.MX]]: """ Return the optimization variables for the entire time horizon of the given state. :param variable: Variable name. :param ensemble_member: The ensemble member index. :returns: A vector of state symbols for the entire time horizon. :raises: KeyError """ pass
[docs] def state(self, variable: str) -> ca.MX: """ Returns an :class:`MX` symbol for the given state, not bound to any time. :param variable: Variable name. :returns: :class:`MX` symbol for given state. :raises: KeyError """ return self.variable(variable)
[docs] @abstractmethod def state_at(self, variable: str, t: float, ensemble_member: int = 0, scaled: bool = False) -> ca.MX: """ Returns an :class:`MX` symbol representing the given variable at the given time. :param variable: Variable name. :param t: Time. :param ensemble_member: The ensemble member index. :param scaled: True to return the scaled variable. :returns: :class:`MX` symbol representing the state at the given time. :raises: KeyError """ pass
@abstractmethod def extra_variable(self, variable: str, ensemble_member: int = 0) -> ca.MX: """ Returns an :class:`MX` symbol representing the extra variable inside the state vector. :param variable: Variable name. :param ensemble_member: The ensemble member index. :returns: :class:`MX` symbol representing the extra variable. :raises: KeyError """ pass
[docs] @abstractmethod def states_in(self, variable: str, t0: float = None, tf: float = None, ensemble_member: int = 0) -> Iterator[ca.MX]: """ Iterates over symbols for states in the interval [t0, tf]. :param variable: Variable name. :param t0: Left bound of interval. If equal to None, the initial time is used. :param tf: Right bound of interval. If equal to None, the final time is used. :param ensemble_member: The ensemble member index. :raises: KeyError """ pass
[docs] @abstractmethod def integral(self, variable: str, t0: float = None, tf: float = None, ensemble_member: int = 0) -> ca.MX: """ Returns an expression for the integral over the interval [t0, tf]. :param variable: Variable name. :param t0: Left bound of interval. If equal to None, the initial time is used. :param tf: Right bound of interval. If equal to None, the final time is used. :param ensemble_member: The ensemble member index. :returns: :class:`MX` object representing the integral. :raises: KeyError """ pass
[docs] @abstractmethod def der(self, variable: str) -> ca.MX: """ Returns an :class:`MX` symbol for the time derivative given state, not bound to any time. :param variable: Variable name. :returns: :class:`MX` symbol for given state. :raises: KeyError """ pass
[docs] @abstractmethod def der_at(self, variable: str, t: float, ensemble_member: int = 0) -> ca.MX: """ Returns an expression for the time derivative of the specified variable at time t. :param variable: Variable name. :param t: Time. :param ensemble_member: The ensemble member index. :returns: :class:`MX` object representing the derivative. :raises: KeyError """ pass
[docs] def get_timeseries(self, variable: str, ensemble_member: int = 0) -> Timeseries: """ Looks up a timeseries from the internal data store. :param variable: Variable name. :param ensemble_member: The ensemble member index. :returns: The requested time series. :rtype: :class:`.Timeseries` :raises: KeyError """ raise NotImplementedError
[docs] def set_timeseries( self, variable: str, timeseries: Timeseries, ensemble_member: int = 0, output: bool = True, check_consistency: bool = True) -> None: """ Sets a timeseries in the internal data store. :param variable: Variable name. :param timeseries: Time series data. :type timeseries: iterable of floats, or :class:`.Timeseries` :param ensemble_member: The ensemble member index. :param output: Whether to include this time series in output data files. :param check_consistency: Whether to check consistency between the time stamps on the new timeseries object and any existing time stamps. """ raise NotImplementedError
[docs] def timeseries_at(self, variable: str, t: float, ensemble_member: int = 0) -> float: """ Return the value of a time series at the given time. :param variable: Variable name. :param t: Time. :param ensemble_member: The ensemble member index. :returns: The interpolated value of the time series. :raises: KeyError """ raise NotImplementedError
def map_path_expression(self, expr: ca.MX, ensemble_member: int) -> ca.MX: """ Maps the path expression `expr` over the entire time horizon of the optimization problem. :param expr: An :class:`MX` path expression. :returns: An :class:`MX` expression evaluating `expr` over the entire time horizon. """ raise NotImplementedError @debug_check(DebugLevel.HIGH) def __debug_check_linearity_constraints(self, nlp): x = nlp['x'] f = nlp['f'] g = nlp['g'] expand_f_g = ca.Function('f_g', [x], [f, g]).expand() X_sx = ca.SX.sym('X', *x.shape) f_sx, g_sx = expand_f_g(X_sx) jac = ca.Function('j', [X_sx], [ca.jacobian(g_sx, X_sx)]).expand() if jac(np.nan).is_regular(): logger.info("The constraints are linear") else: hes = ca.Function('j', [X_sx], [ca.jacobian(ca.jacobian(g_sx, X_sx), X_sx)]).expand() if hes(np.nan).is_regular(): logger.info("The constraints are quadratic") else: logger.info("The constraints are nonlinear") @debug_check(DebugLevel.VERYHIGH) def __debug_check_linear_independence(self, lbx, ubx, lbg, ubg, nlp): x = nlp['x'] f = nlp['f'] g = nlp['g'] expand_f_g = ca.Function('f_g', [x], [f, g]).expand() x_sx = ca.SX.sym('X', *x.shape) f_sx, g_sx = expand_f_g(x_sx) x, f, g = x_sx, f_sx, g_sx lbg = np.array(ca.vertsplit(ca.veccat(*lbg))).ravel() ubg = np.array(ca.vertsplit(ca.veccat(*ubg))).ravel() # Find the linear constraints g_sjac = ca.Function('Af', [x], [ca.jtimes(g, x, x.ones(*x.shape))]) res = g_sjac(np.nan) res = np.array(res).ravel() g_is_linear = ~np.isnan(res) # Find the rows in the jacobian with only a single entry g_jac_csr = ca.DM(ca.Function('tmp', [x], [g]).sparsity_jac(0, 0)).tocsc().tocsr() g_single_variable = (np.diff(g_jac_csr.indptr) == 1) # Find the rows which are equality constraints g_eq_constraint = (lbg == ubg) # The intersection of all selections are constraints like we want g_constant_assignment = g_is_linear & g_single_variable & g_eq_constraint # Map of variable (index) to constraints/row numbers var_index_assignment = {} for i in range(g.size1()): if g_constant_assignment[i]: var_ind = g_jac_csr.getrow(i).indices[0] var_index_assignment.setdefault(var_ind, []).append(i) var_names, named_x, named_f, named_g = self._debug_get_named_nlp(nlp) for vi, g_inds in var_index_assignment.items(): if len(g_inds) > 1: logger.info("Variable '{}' has duplicate constraints setting its value:".format(var_names[vi])) for g_i in g_inds: logger.info("row {}: {} = {}".format(g_i, named_g[g_i], lbg[g_i])) # Find variables for which the bounds are equal, but also an equality # constraint is set. This would result in a constraint `1 = 1` with # the default IPOPT option `fixed_variable_treatment = make_parameter` x_inds = np.flatnonzero(lbx == ubx) for vi in x_inds: if vi in var_index_assignment: logger.info("Variable '{}' has equal bounds (value = {}), but also the following equality constraints:" .format(var_names[vi], lbx[vi])) for g_i in var_index_assignment[vi]: logger.info("row {}: {} = {}".format(g_i, named_g[g_i], lbg[g_i]))