Source code for rtctools.optimization.collocated_integrated_optimization_problem

import itertools
import logging
import warnings
from abc import ABCMeta, abstractmethod
from typing import Dict, Union

import casadi as ca
import numpy as np

from rtctools._internal.alias_tools import AliasDict
from rtctools._internal.casadi_helpers import (
    interpolate,
    is_affine,
    nullvertcat,
    reduce_matvec,
    substitute_in_external,
)
from rtctools._internal.debug_check_helpers import DebugLevel, debug_check

from .optimization_problem import OptimizationProblem
from .timeseries import Timeseries

logger = logging.getLogger("rtctools")


[docs] class CollocatedIntegratedOptimizationProblem(OptimizationProblem, metaclass=ABCMeta): """ Discretizes your model using a mixed collocation/integration scheme. Collocation means that the discretized model equations are included as constraints between state variables in the optimization problem. Integration means that the model equations are solved from one time step to the next in a sequential fashion, using a rootfinding algorithm at each and every step. The results of the integration procedure feature as inputs to the objective functions as well as to any constraints that do not originate from the DAE model. .. note:: To ensure that your optimization problem only has globally optimal solutions, any model equations that are collocated must be linear. By default, all model equations are collocated, and linearity of the model equations is verified. Working with non-linear models is possible, but discouraged. :cvar check_collocation_linearity: If ``True``, check whether collocation constraints are linear. Default is ``True``. """ #: Check whether the collocation constraints are linear check_collocation_linearity = True #: Whether or not the collocation constraints are linear (affine) linear_collocation = None def __init__(self, **kwargs): # Variables that will be optimized self.dae_variables["free_variables"] = ( self.dae_variables["states"] + self.dae_variables["algebraics"] + self.dae_variables["control_inputs"] ) # Cache names of states self.__differentiated_states = [ variable.name() for variable in self.dae_variables["states"] ] self.__differentiated_states_map = { v: i for i, v in enumerate(self.__differentiated_states) } self.__algebraic_states = [variable.name() for variable in self.dae_variables["algebraics"]] self.__algebraic_states_map = {v: i for i, v in enumerate(self.__algebraic_states)} self.__controls = [variable.name() for variable in self.dae_variables["control_inputs"]] self.__controls_map = {v: i for i, v in enumerate(self.__controls)} self.__derivative_names = [ variable.name() for variable in self.dae_variables["derivatives"] ] self.__initial_derivative_names = [ "initial_" + variable for variable in self.__derivative_names ] self.__initial_derivative_nominals = {} # DAE cache self.__integrator_step_function = None self.__dae_residual_function_collocated = None self.__initial_residual_with_params_fun_map = None # Create dictionary of variables so that we have O(1) state lookup available self.__variables = AliasDict(self.alias_relation) for var in itertools.chain( self.dae_variables["states"], self.dae_variables["algebraics"], self.dae_variables["control_inputs"], self.dae_variables["constant_inputs"], self.dae_variables["parameters"], self.dae_variables["time"], ): self.__variables[var.name()] = var # Call super super().__init__(**kwargs)
[docs] @abstractmethod def times(self, variable=None): """ List of time stamps for variable (to optimize for). :param variable: Variable name. :returns: A list of time stamps for the given variable. """ pass
[docs] def interpolation_method(self, variable=None): """ Interpolation method for variable. :param variable: Variable name. :returns: Interpolation method for the given variable. """ return self.INTERPOLATION_LINEAR
@property def integrate_states(self): """ TRUE if all states are to be integrated rather than collocated. """ return False @property def theta(self): r""" RTC-Tools discretizes differential equations of the form .. math:: \dot{x} = f(x, u) using the :math:`\theta`-method .. math:: x_{i+1} = x_i + \Delta t \left[\theta f(x_{i+1}, u_{i+1}) + (1 - \theta) f(x_i, u_i)\right] The default is :math:`\theta = 1`, resulting in the implicit or backward Euler method. Note that in this case, the control input at the initial time step is not used. Set :math:`\theta = 0` to use the explicit or forward Euler method. Note that in this case, the control input at the final time step is not used. .. warning:: This is an experimental feature for :math:`0 < \theta < 1`. .. deprecated:: 2.4 Support for semi-explicit collocation (theta < 1) will be removed in a future release. """ # Default to implicit Euler collocation, which is cheaper to evaluate # than the trapezoidal method, while being A-stable. # # N.B. Setting theta to 0 will cause problems with algebraic equations, # unless a consistent initialization is supplied for the algebraics. # N.B. Setting theta to any value strictly between 0 and 1 will cause # algebraic equations to be solved in an average sense. This may # induce unexpected oscillations. # TODO Fix these issue by performing index reduction and splitting DAE into ODE and # algebraic parts. Theta then only applies to the ODE part. return 1.0
[docs] def map_options(self) -> Dict[str, Union[str, int]]: """ Returns a dictionary of CasADi ``map()`` options. +---------------+-----------+---------------+ | Option | Type | Default value | +===============+===========+===============+ | ``mode`` | ``str` | ``openmp`` | +---------------+-----------+---------------+ | ``n_threads`` | ``int`` | ``None`` | +---------------+-----------+---------------+ The ``mode`` option controls the mode of the ``map()`` call. Valid values include ``openmp``, ``thread``, and ``unroll``. See the CasADi and documentation for detailed documentation on these modes. The ``n_threads`` option controls the number of threads used when in ``thread`` mode. .. note:: Not every CasADi build has support for OpenMP enabled. For such builds, the `thread` mode offers an alternative parallelization mode. .. note:: The use of ``expand=True`` in ``solver_options()`` may negate the parallelization benefits obtained using ``map()``. :returns: A dictionary of options for the `map()` call used to evaluate constraints on every time stamp. """ return {"mode": "openmp"}
def transcribe(self): # DAE residual dae_residual = self.dae_residual # Initial residual initial_residual = self.initial_residual logger.info( f"Transcribing problem with a DAE of {dae_residual.size1()} equations, " f"{len(self.times())} collocation points, " f"and {len(self.dae_variables['free_variables'])} free variables" ) # Reset dictionary of variables for var in itertools.chain(self.path_variables, self.extra_variables): self.__variables[var.name()] = var # Split the constant inputs into those used in the DAE, and additional # ones used for just the objective and/or constraints dae_constant_inputs_names = [x.name() for x in self.dae_variables["constant_inputs"]] extra_constant_inputs_name_and_size = [] for ensemble_member in range(self.ensemble_size): extra_constant_inputs_name_and_size.extend( [ (x, v.values.shape[1] if v.values.ndim > 1 else 1) for x, v in self.constant_inputs(ensemble_member).items() if x not in dae_constant_inputs_names ] ) self.__extra_constant_inputs = [] for var_name, size in extra_constant_inputs_name_and_size: var = ca.MX.sym(var_name, size) self.__variables[var_name] = var self.__extra_constant_inputs.append(var) # Cache extra and path variable names, and variable sizes self.__path_variable_names = [variable.name() for variable in self.path_variables] self.__extra_variable_names = [variable.name() for variable in self.extra_variables] # Cache the variable sizes, as a repeated call to .name() and .size1() # is expensive due to SWIG call overhead. self.__variable_sizes = {} for variable in itertools.chain( self.differentiated_states, self.algebraic_states, self.controls, self.__initial_derivative_names, ): self.__variable_sizes[variable] = 1 for mx_symbol, variable in zip(self.path_variables, self.__path_variable_names): self.__variable_sizes[variable] = mx_symbol.size1() for mx_symbol, variable in zip(self.extra_variables, self.__extra_variable_names): self.__variable_sizes[variable] = mx_symbol.size1() # Calculate nominals for the initial derivatives. We assume that the # history has (roughly) identical time steps for the entire ensemble. self.__initial_derivative_nominals = {} history_0 = self.history(0) for variable, initial_der_name in zip( self.__differentiated_states, self.__initial_derivative_names ): times = self.times(variable) default_time_step_size = 0 if len(times) > 1: default_time_step_size = times[1] - times[0] try: h = history_0[variable] if h.times[0] == times[0] or len(h.values) == 1: dt = default_time_step_size else: assert h.times[-1] == times[0] dt = h.times[-1] - h.times[-2] except KeyError: dt = default_time_step_size if dt > 0: self.__initial_derivative_nominals[initial_der_name] = ( self.variable_nominal(variable) / dt ) else: self.__initial_derivative_nominals[initial_der_name] = self.variable_nominal( variable ) # Check that the removed (because broken) integrated_states option is not used try: _ = self.integrated_states except AttributeError: # We expect there to be an error as users should use self.integrate_states pass else: raise Exception( "The integrated_states property is no longer supported. " "Use integrate_states instead." ) # Variables that are integrated states are not yet allowed to have size > 1 if self.integrate_states: self.__integrated_states = [*self.differentiated_states, *self.algebraic_states] for variable in self.__integrated_states: if self.__variable_sizes.get(variable, 1) > 1: raise NotImplementedError( "Vector symbol not supported for integrated state '{}'".format(variable) ) else: self.__integrated_states = [] # The same holds for controls for variable in self.controls: if self.__variable_sizes.get(variable, 1) > 1: raise NotImplementedError( "Vector symbol not supported for control state '{}'".format(variable) ) # Collocation times collocation_times = self.times() n_collocation_times = len(collocation_times) # Dynamic parameters dynamic_parameters = self.dynamic_parameters() dynamic_parameter_names = set() # Parameter symbols symbolic_parameters = ca.vertcat(*self.dae_variables["parameters"]) def _interpolate_constant_inputs(variables, raw_constant_inputs): constant_inputs_interpolated = {} for variable in variables: variable = variable.name() try: constant_input = raw_constant_inputs[variable] except KeyError: raise Exception("No values found for constant input {}".format(variable)) else: values = constant_input.values interpolation_method = self.interpolation_method(variable) constant_inputs_interpolated[variable] = self.interpolate( collocation_times, constant_input.times, values, 0.0, 0.0, interpolation_method, ) return constant_inputs_interpolated # Create a store of all ensemble-member-specific data for all ensemble members # N.B. Don't use n * [{}], as it creates n refs to the same dict. ensemble_store = [{} for i in range(self.ensemble_size)] for ensemble_member in range(self.ensemble_size): ensemble_data = ensemble_store[ensemble_member] # Store parameters parameters = self.parameters(ensemble_member) parameter_values = [None] * len(self.dae_variables["parameters"]) for i, symbol in enumerate(self.dae_variables["parameters"]): variable = symbol.name() try: parameter_values[i] = parameters[variable] except KeyError: raise Exception("No value specified for parameter {}".format(variable)) if len(dynamic_parameters) > 0: jac_1 = ca.jacobian(symbolic_parameters, ca.vertcat(*dynamic_parameters)) jac_2 = ca.jacobian(ca.vertcat(*parameter_values), ca.vertcat(*dynamic_parameters)) for i, symbol in enumerate(self.dae_variables["parameters"]): if jac_1[i, :].nnz() > 0 or jac_2[i, :].nnz() > 0: dynamic_parameter_names.add(symbol.name()) if np.any( [isinstance(value, ca.MX) and not value.is_constant() for value in parameter_values] ): parameter_values = nullvertcat(*parameter_values) [parameter_values] = substitute_in_external( [parameter_values], self.dae_variables["parameters"], parameter_values ) else: parameter_values = nullvertcat(*parameter_values) if ensemble_member == 0: # Store parameter values of member 0, as variable bounds may depend on these. self.__parameter_values_ensemble_member_0 = parameter_values ensemble_data["parameters"] = parameter_values # Store constant inputs raw_constant_inputs = self.constant_inputs(ensemble_member) ensemble_data["constant_inputs"] = _interpolate_constant_inputs( self.dae_variables["constant_inputs"], raw_constant_inputs ) ensemble_data["extra_constant_inputs"] = _interpolate_constant_inputs( self.__extra_constant_inputs, raw_constant_inputs ) # Handle all extra constant input data uniformly as 2D arrays for k, v in ensemble_data["extra_constant_inputs"].items(): if v.ndim == 1: ensemble_data["extra_constant_inputs"][k] = v[:, None] bounds = self.bounds() # Initialize control discretization ( control_size, discrete_control, lbx_control, ubx_control, x0_control, indices_control, ) = self.discretize_controls(bounds) # Initialize state discretization ( state_size, discrete_state, lbx_state, ubx_state, x0_state, indices_state, ) = self.discretize_states(bounds) # Merge state vector offset dictionary self.__indices = indices_control for ensemble_member in range(self.ensemble_size): for key, value in indices_state[ensemble_member].items(): if isinstance(value, slice): value = slice(value.start + control_size, value.stop + control_size) else: value += control_size self.__indices[ensemble_member][key] = value # Initialize vector of optimization symbols X = ca.MX.sym("X", control_size + state_size) self.__solver_input = X # Later on, we will be slicing MX/SX objects a few times for vectorized operations (to # reduce the overhead induced for each CasADi call). When slicing MX/SX objects, we want # to do that with a list of Python ints. Slicing with something else (e.g. a list of # np.int32, or a numpy array) is significantly slower. x_inds = list(range(X.size1())) self.__indices_as_lists = [{} for ensemble_member in range(self.ensemble_size)] for ensemble_member in range(self.ensemble_size): for k, v in self.__indices[ensemble_member].items(): if isinstance(v, slice): self.__indices_as_lists[ensemble_member][k] = x_inds[v] elif isinstance(v, int): self.__indices_as_lists[ensemble_member][k] = [v] else: self.__indices_as_lists[ensemble_member][k] = [int(i) for i in v] # Initialize bound and seed vectors discrete = np.zeros(X.size1(), dtype=bool) lbx = -np.inf * np.ones(X.size1()) ubx = np.inf * np.ones(X.size1()) x0 = np.zeros(X.size1()) discrete[: len(discrete_control)] = discrete_control discrete[len(discrete_control) :] = discrete_state lbx[: len(lbx_control)] = lbx_control lbx[len(lbx_control) :] = lbx_state ubx[: len(ubx_control)] = ubx_control ubx[len(lbx_control) :] = ubx_state x0[: len(x0_control)] = x0_control x0[len(x0_control) :] = x0_state # Provide a state for self.state_at() and self.der() to work with. self.__control_size = control_size self.__state_size = state_size self.__symbol_cache = {} # Free variables for the collocated optimization problem if self.integrate_states: integrated_variables = self.dae_variables["states"] + self.dae_variables["algebraics"] collocated_variables = [] else: integrated_variables = [] collocated_variables = self.dae_variables["states"] + self.dae_variables["algebraics"] collocated_variables += self.dae_variables["control_inputs"] if logger.getEffectiveLevel() == logging.DEBUG: logger.debug("Integrating variables {}".format(repr(integrated_variables))) logger.debug("Collocating variables {}".format(repr(collocated_variables))) integrated_variable_names = [v.name() for v in integrated_variables] integrated_variable_nominals = np.array( [self.variable_nominal(v) for v in integrated_variable_names] ) collocated_variable_names = [v.name() for v in collocated_variables] collocated_variable_nominals = np.array( [self.variable_nominal(v) for v in collocated_variable_names] ) # Split derivatives into "integrated" and "collocated" lists. if self.integrate_states: integrated_derivatives = self.dae_variables["derivatives"][:] collocated_derivatives = [] else: integrated_derivatives = [] collocated_derivatives = self.dae_variables["derivatives"][:] self.__algebraic_and_control_derivatives = [] for var in self.dae_variables["algebraics"]: sym = ca.MX.sym("der({})".format(var.name())) self.__algebraic_and_control_derivatives.append(sym) if self.integrate_states: integrated_derivatives.append(sym) else: collocated_derivatives.append(sym) for var in self.dae_variables["control_inputs"]: sym = ca.MX.sym("der({})".format(var.name())) self.__algebraic_and_control_derivatives.append(sym) collocated_derivatives.append(sym) # Path objective path_objective = self.path_objective(0) # Path constraints path_constraints = self.path_constraints(0) path_constraint_expressions = ca.vertcat( *[f_constraint for (f_constraint, lb, ub) in path_constraints] ) # Delayed feedback delayed_feedback_expressions, delayed_feedback_states, delayed_feedback_durations = ( [], [], [], ) delayed_feedback = self.delayed_feedback() if delayed_feedback: delayed_feedback_expressions, delayed_feedback_states, delayed_feedback_durations = zip( *delayed_feedback ) # Make sure the original data cannot be used anymore, because it will # become incorrect/stale with the inlining of constant parameters. del delayed_feedback # Initial time t0 = self.initial_time # Establish integrator theta theta = self.theta if theta < 1: warnings.warn( ( "Explicit collocation/integration is deprecated " "and will be removed in a future version." ), FutureWarning, stacklevel=1, ) # Set CasADi function options options = self.solver_options() function_options = {"max_num_dir": options["optimized_num_dir"]} # Update the store of all ensemble-member-specific data for all ensemble members # with initial states, derivatives, and path variables. # Use vectorized approach to avoid SWIG call overhead for each CasADi call. n = len(integrated_variables) + len(collocated_variables) for ensemble_member in range(self.ensemble_size): ensemble_data = ensemble_store[ensemble_member] initial_state_indices = [None] * n # Derivatives take a bit more effort to vectorize, as we can have # both constant values and elements in the state vector initial_derivatives = ca.MX.zeros((n, 1)) init_der_variable = [] init_der_variable_indices = [] init_der_variable_nominals = [] init_der_constant = [] init_der_constant_values = [] history = self.history(ensemble_member) for j, variable in enumerate(integrated_variable_names + collocated_variable_names): initial_state_indices[j] = self.__indices_as_lists[ensemble_member][variable][0] try: i = self.__differentiated_states_map[variable] initial_der_name = self.__initial_derivative_names[i] init_der_variable_nominals.append(self.variable_nominal(initial_der_name)) init_der_variable_indices.append( self.__indices[ensemble_member][initial_der_name] ) init_der_variable.append(j) except KeyError: # We do interpolation here instead of relying on der_at. This is faster because: # 1. We can reuse the history variable. # 2. We know that "variable" is a canonical state # 3. We know that we are only dealing with history (numeric values, not # symbolics) try: h = history[variable] if h.times[0] == t0 or len(h.values) == 1: init_der = 0.0 else: assert h.times[-1] == t0 init_der = (h.values[-1] - h.values[-2]) / (h.times[-1] - h.times[-2]) except KeyError: init_der = 0.0 init_der_constant_values.append(init_der) init_der_constant.append(j) initial_derivatives[init_der_variable] = X[init_der_variable_indices] * np.array( init_der_variable_nominals ) if len(init_der_constant_values) > 0: initial_derivatives[init_der_constant] = init_der_constant_values ensemble_data["initial_state"] = X[initial_state_indices] * np.concatenate( (integrated_variable_nominals, collocated_variable_nominals) ) ensemble_data["initial_derivatives"] = initial_derivatives # Store initial path variables initial_path_variable_inds = [] path_variables_size = sum(self.__variable_sizes[v] for v in self.__path_variable_names) path_variables_nominals = np.ones(path_variables_size) offset = 0 for variable in self.__path_variable_names: step = len(self.times(variable)) initial_path_variable_inds.extend( self.__indices_as_lists[ensemble_member][variable][0::step] ) variable_size = self.__variable_sizes[variable] path_variables_nominals[offset : offset + variable_size] = self.variable_nominal( variable ) offset += variable_size ensemble_data["initial_path_variables"] = ( X[initial_path_variable_inds] * path_variables_nominals ) # Replace parameters which are constant across the entire ensemble constant_parameters = [] constant_parameter_values = [] ensemble_parameters = [] ensemble_parameter_values = [[] for i in range(self.ensemble_size)] for i, parameter in enumerate(self.dae_variables["parameters"]): values = [ ensemble_store[ensemble_member]["parameters"][i] for ensemble_member in range(self.ensemble_size) ] if ( len(values) == 1 or (np.all(values) == values[0]) ) and parameter.name() not in dynamic_parameter_names: constant_parameters.append(parameter) constant_parameter_values.append(values[0]) else: ensemble_parameters.append(parameter) for ensemble_member in range(self.ensemble_size): ensemble_parameter_values[ensemble_member].append(values[ensemble_member]) symbolic_parameters = ca.vertcat(*ensemble_parameters) # Inline constant parameter values if constant_parameters: delayed_feedback_expressions = ca.substitute( delayed_feedback_expressions, constant_parameters, constant_parameter_values ) delayed_feedback_durations = ca.substitute( delayed_feedback_durations, constant_parameters, constant_parameter_values ) path_objective, path_constraint_expressions = ca.substitute( [path_objective, path_constraint_expressions], constant_parameters, constant_parameter_values, ) # Collect extra variable symbols symbolic_extra_variables = ca.vertcat(*self.extra_variables) # Aggregate ensemble data ensemble_aggregate = {} ensemble_aggregate["parameters"] = ca.horzcat( *[nullvertcat(*p) for p in ensemble_parameter_values] ) ensemble_aggregate["initial_constant_inputs"] = ca.horzcat( *[ nullvertcat( *[ float(d["constant_inputs"][variable.name()][0]) for variable in self.dae_variables["constant_inputs"] ] ) for d in ensemble_store ] ) ensemble_aggregate["initial_extra_constant_inputs"] = ca.horzcat( *[ nullvertcat( *[ d["extra_constant_inputs"][variable.name()][0, :] for variable in self.__extra_constant_inputs ] ) for d in ensemble_store ] ) ensemble_aggregate["initial_state"] = ca.horzcat( *[d["initial_state"] for d in ensemble_store] ) ensemble_aggregate["initial_state"] = reduce_matvec( ensemble_aggregate["initial_state"], self.solver_input ) ensemble_aggregate["initial_derivatives"] = ca.horzcat( *[d["initial_derivatives"] for d in ensemble_store] ) ensemble_aggregate["initial_derivatives"] = reduce_matvec( ensemble_aggregate["initial_derivatives"], self.solver_input ) ensemble_aggregate["initial_path_variables"] = ca.horzcat( *[d["initial_path_variables"] for d in ensemble_store] ) ensemble_aggregate["initial_path_variables"] = reduce_matvec( ensemble_aggregate["initial_path_variables"], self.solver_input ) if (self.__dae_residual_function_collocated is None) and ( self.__integrator_step_function is None ): # Insert lookup tables. No support yet for different lookup tables per ensemble member. lookup_tables = self.lookup_tables(0) for sym in self.dae_variables["lookup_tables"]: sym_name = sym.name() try: lookup_table = lookup_tables[sym_name] except KeyError: raise Exception("Unable to find lookup table function for {}".format(sym_name)) else: input_syms = [ self.variable(input_sym.name()) for input_sym in lookup_table.inputs ] value = lookup_table.function(*input_syms) [dae_residual] = ca.substitute([dae_residual], [sym], [value]) if len(self.dae_variables["lookup_tables"]) > 0 and self.ensemble_size > 1: logger.warning("Using lookup tables of ensemble member #0 for all members.") # Insert constant parameter values dae_residual, initial_residual = ca.substitute( [dae_residual, initial_residual], constant_parameters, constant_parameter_values ) # Allocate DAE to an integrated or to a collocated part if self.integrate_states: dae_residual_integrated = dae_residual dae_residual_collocated = ca.MX() else: dae_residual_integrated = ca.MX() dae_residual_collocated = dae_residual # Check linearity of collocated part if self.check_collocation_linearity and dae_residual_collocated.size1() > 0: # Check linearity of collocation constraints, which is a necessary condition for the # optimization problem to be convex self.linear_collocation = True # Aside from decision variables, the DAE expression also contains parameters # and constant inputs. We need to inline them before we do the affinity check. # Note that this not an exhaustive check, as other values for the # parameters/constant inputs may result in a non-affine DAE (or vice-versa). np.random.seed(42) fixed_vars = ca.vertcat( *self.dae_variables["time"], *self.dae_variables["constant_inputs"], ca.MX(symbolic_parameters), ) fixed_var_values = np.random.rand(fixed_vars.size1()) if not is_affine( ca.substitute(dae_residual_collocated, fixed_vars, fixed_var_values), ca.vertcat( *collocated_variables + integrated_variables + collocated_derivatives + integrated_derivatives ), ): self.linear_collocation = False logger.warning( "The DAE residual contains equations that are not affine. " "There is therefore no guarantee that the optimization problem is convex. " "This will, in general, result in the existence of multiple local optima " "and trouble finding a feasible initial solution." ) # Transcribe DAE using theta method collocation if self.integrate_states: I = ca.MX.sym("I", len(integrated_variables)) # noqa: E741 I0 = ca.MX.sym("I0", len(integrated_variables)) C0 = [ca.MX.sym("C0[{}]".format(i)) for i in range(len(collocated_variables))] CI0 = [ ca.MX.sym("CI0[{}]".format(i)) for i in range(len(self.dae_variables["constant_inputs"])) ] dt_sym = ca.MX.sym("dt") integrated_finite_differences = (I - I0) / dt_sym [dae_residual_integrated_0] = ca.substitute( [dae_residual_integrated], ( integrated_variables + collocated_variables + integrated_derivatives + self.dae_variables["constant_inputs"] + self.dae_variables["time"] ), ( [I0[i] for i in range(len(integrated_variables))] + [C0[i] for i in range(len(collocated_variables))] + [ integrated_finite_differences[i] for i in range(len(integrated_derivatives)) ] + [CI0[i] for i in range(len(self.dae_variables["constant_inputs"]))] + [self.dae_variables["time"][0] - dt_sym] ), ) [dae_residual_integrated_1] = ca.substitute( [dae_residual_integrated], (integrated_variables + integrated_derivatives), ( [I[i] for i in range(len(integrated_variables))] + [ integrated_finite_differences[i] for i in range(len(integrated_derivatives)) ] ), ) if theta == 0: dae_residual_integrated = dae_residual_integrated_0 elif theta == 1: dae_residual_integrated = dae_residual_integrated_1 else: dae_residual_integrated = ( 1 - theta ) * dae_residual_integrated_0 + theta * dae_residual_integrated_1 dae_residual_function_integrated = ca.Function( "dae_residual_function_integrated", [ I, I0, symbolic_parameters, ca.vertcat( *( [C0[i] for i in range(len(collocated_variables))] + [ CI0[i] for i in range(len(self.dae_variables["constant_inputs"])) ] + [dt_sym] + collocated_variables + collocated_derivatives + self.dae_variables["constant_inputs"] + self.dae_variables["time"] ) ), ], [dae_residual_integrated], function_options, ) try: dae_residual_function_integrated = dae_residual_function_integrated.expand() except RuntimeError as e: # We only expect to fail if the DAE was an external function if "'eval_sx' not defined for External" in str(e): pass else: raise options = self.integrator_options() self.__integrator_step_function = ca.rootfinder( "integrator_step_function", "fast_newton", dae_residual_function_integrated, options, ) # Initialize a Function for the DAE residual (collocated part) elif len(collocated_variables) > 0: self.__dae_residual_function_collocated = ca.Function( "dae_residual_function_collocated", [ symbolic_parameters, ca.vertcat( *( collocated_variables + collocated_derivatives + self.dae_variables["constant_inputs"] + self.dae_variables["time"] ) ), ], [dae_residual_collocated], function_options, ) try: self.__dae_residual_function_collocated = ( self.__dae_residual_function_collocated.expand() ) except RuntimeError as e: # We only expect to fail if the DAE was an external function if "'eval_sx' not defined for External" in str(e): pass else: raise if self.integrate_states: integrator_step_function = self.__integrator_step_function dae_residual_collocated_size = 0 elif len(collocated_variables) > 0: dae_residual_function_collocated = self.__dae_residual_function_collocated dae_residual_collocated_size = dae_residual_function_collocated.mx_out(0).size1() else: dae_residual_collocated_size = 0 # Note that this list is stored, such that it can be reused in the # map_path_expression() method. self.__func_orig_inputs = [ symbolic_parameters, ca.vertcat( *integrated_variables, *collocated_variables, *integrated_derivatives, *collocated_derivatives, *self.dae_variables["constant_inputs"], *self.dae_variables["time"], *self.path_variables, *self.__extra_constant_inputs, ), symbolic_extra_variables, ] # Initialize a Function for the path objective # Note that we assume that the path objective expression is the same for all ensemble # members path_objective_function = ca.Function( "path_objective", self.__func_orig_inputs, [path_objective], function_options ) path_objective_function = path_objective_function.expand() # Initialize a Function for the path constraints # Note that we assume that the path constraint expression is the same for all ensemble # members path_constraints_function = ca.Function( "path_constraints", self.__func_orig_inputs, [path_constraint_expressions], function_options, ) path_constraints_function = path_constraints_function.expand() # Initialize a Function for the delayed feedback delayed_feedback_function = ca.Function( "delayed_feedback", self.__func_orig_inputs, delayed_feedback_expressions, function_options, ) delayed_feedback_function = delayed_feedback_function.expand() # Set up accumulation over time (integration, and generation of # collocation constraints) if self.integrate_states: accumulated_X = ca.MX.sym("accumulated_X", len(integrated_variables)) else: accumulated_X = ca.MX.sym("accumulated_X", 0) path_variables_size = sum(x.size1() for x in self.path_variables) extra_constant_inputs_size = sum(x.size1() for x in self.__extra_constant_inputs) accumulated_U = ca.MX.sym( "accumulated_U", ( 2 * (len(collocated_variables) + len(self.dae_variables["constant_inputs"]) + 1) + path_variables_size + extra_constant_inputs_size ), ) integrated_states_0 = accumulated_X[0 : len(integrated_variables)] integrated_states_1 = ca.MX.sym("integrated_states_1", len(integrated_variables)) collocated_states_0 = accumulated_U[0 : len(collocated_variables)] collocated_states_1 = accumulated_U[ len(collocated_variables) : 2 * len(collocated_variables) ] constant_inputs_0 = accumulated_U[ 2 * len(collocated_variables) : 2 * len(collocated_variables) + len(self.dae_variables["constant_inputs"]) ] constant_inputs_1 = accumulated_U[ 2 * len(collocated_variables) + len(self.dae_variables["constant_inputs"]) : 2 * len(collocated_variables) + 2 * len(self.dae_variables["constant_inputs"]) ] offset = 2 * (len(collocated_variables) + len(self.dae_variables["constant_inputs"])) collocation_time_0 = accumulated_U[offset + 0] collocation_time_1 = accumulated_U[offset + 1] path_variables_1 = accumulated_U[offset + 2 : offset + 2 + len(self.path_variables)] extra_constant_inputs_1 = accumulated_U[offset + 2 + len(self.path_variables) :] # Approximate derivatives using backwards finite differences dt = collocation_time_1 - collocation_time_0 integrated_finite_differences = ca.MX() # Overwritten later if integrate_states is True collocated_finite_differences = (collocated_states_1 - collocated_states_0) / dt # We use ca.vertcat to compose the list into an MX. This is, in # CasADi 2.4, faster. accumulated_Y = [] # Integrate integrated states if self.integrate_states: # Perform step by computing implicit function # CasADi shares subexpressions that are bundled into the same Function. # The first argument is the guess for the new value of # integrated_states. [integrated_states_1] = integrator_step_function.call( [ integrated_states_0, integrated_states_0, symbolic_parameters, ca.vertcat( collocated_states_0, constant_inputs_0, dt, collocated_states_1, collocated_finite_differences, constant_inputs_1, collocation_time_1 - t0, ), ], False, True, ) accumulated_Y.append(integrated_states_1) # Recompute finite differences with computed new state. # We don't use substititute() for this, as it becomes expensive over long # integration horizons. integrated_finite_differences = (integrated_states_1 - integrated_states_0) / dt # Call DAE residual at collocation point # Time stamp following paragraph 3.6.7 of the Modelica # specifications, version 3.3. elif len(collocated_variables) > 0: if theta < 1: # Obtain state vector [dae_residual_0] = dae_residual_function_collocated.call( [ symbolic_parameters, ca.vertcat( collocated_states_0, collocated_finite_differences, constant_inputs_0, collocation_time_0 - t0, ), ], False, True, ) if theta > 0: # Obtain state vector [dae_residual_1] = dae_residual_function_collocated.call( [ symbolic_parameters, ca.vertcat( collocated_states_1, collocated_finite_differences, constant_inputs_1, collocation_time_1 - t0, ), ], False, True, ) if theta == 0: accumulated_Y.append(dae_residual_0) elif theta == 1: accumulated_Y.append(dae_residual_1) else: accumulated_Y.append((1 - theta) * dae_residual_0 + theta * dae_residual_1) self.__func_inputs_implicit = [ symbolic_parameters, ca.vertcat( integrated_states_1, collocated_states_1, integrated_finite_differences, collocated_finite_differences, constant_inputs_1, collocation_time_1 - t0, path_variables_1, extra_constant_inputs_1, ), symbolic_extra_variables, ] accumulated_Y.extend(path_objective_function.call(self.__func_inputs_implicit, False, True)) accumulated_Y.extend( path_constraints_function.call(self.__func_inputs_implicit, False, True) ) accumulated_Y.extend( delayed_feedback_function.call(self.__func_inputs_implicit, False, True) ) # Save the accumulated inputs such that can be used later in map_path_expression() self.__func_accumulated_inputs = ( accumulated_X, accumulated_U, ca.veccat(symbolic_parameters, symbolic_extra_variables), ) # Use map/mapaccum to capture integration and collocation constraint generation over the # entire time horizon with one symbolic operation. This saves a lot of memory. if n_collocation_times > 1: if self.integrate_states: accumulated = ca.Function( "accumulated", self.__func_accumulated_inputs, [accumulated_Y[0], ca.vertcat(*accumulated_Y[1:])], function_options, ) accumulation = accumulated.mapaccum("accumulation", n_collocation_times - 1) else: # Fully collocated problem. Use map(), so that we can use # parallelization along the time axis. accumulated = ca.Function( "accumulated", self.__func_accumulated_inputs, [ca.vertcat(*accumulated_Y)], function_options, ) options = self.map_options() if options["mode"] == "thread": accumulation = accumulated.map( n_collocation_times - 1, options["mode"], options["n_threads"] ) else: accumulation = accumulated.map(n_collocation_times - 1, options["mode"]) else: accumulation = None # Start collecting constraints f = [] g = [] lbg = [] ubg = [] # Add constraints for initial conditions if self.__initial_residual_with_params_fun_map is None: initial_residual_with_params_fun = ca.Function( "initial_residual_total", [ symbolic_parameters, ca.vertcat( *( self.dae_variables["states"] + self.dae_variables["algebraics"] + self.dae_variables["control_inputs"] + integrated_derivatives + collocated_derivatives + self.dae_variables["constant_inputs"] + self.dae_variables["time"] ) ), ], [ca.veccat(dae_residual, initial_residual)], function_options, ) self.__initial_residual_with_params_fun_map = initial_residual_with_params_fun.map( self.ensemble_size ) initial_residual_with_params_fun_map = self.__initial_residual_with_params_fun_map [res] = initial_residual_with_params_fun_map.call( [ ensemble_aggregate["parameters"], ca.vertcat( *[ ensemble_aggregate["initial_state"], ensemble_aggregate["initial_derivatives"], ensemble_aggregate["initial_constant_inputs"], ca.repmat([0.0], 1, self.ensemble_size), ] ), ], False, True, ) res = ca.vec(res) g.append(res) zeros = [0.0] * res.size1() lbg.extend(zeros) ubg.extend(zeros) # The initial values and the interpolated mapped arguments are saved # such that can be reused in map_path_expression(). self.__func_initial_inputs = [] self.__func_map_args = [] # Integrators are saved for result extraction later on self.__integrators = [] # Process the objectives and constraints for each ensemble member separately. # Note that we don't use map here for the moment, so as to allow each ensemble member to # define its own constraints and objectives. Path constraints are applied for all ensemble # members simultaneously at the moment. We can get rid of map again, and allow every # ensemble member to specify its own path constraints as well, once CasADi has some kind # of loop detection. for ensemble_member in range(self.ensemble_size): logger.info( "Transcribing ensemble member {}/{}".format(ensemble_member + 1, self.ensemble_size) ) initial_state = ensemble_aggregate["initial_state"][:, ensemble_member] initial_derivatives = ensemble_aggregate["initial_derivatives"][:, ensemble_member] initial_path_variables = ensemble_aggregate["initial_path_variables"][ :, ensemble_member ] initial_constant_inputs = ensemble_aggregate["initial_constant_inputs"][ :, ensemble_member ] initial_extra_constant_inputs = ensemble_aggregate["initial_extra_constant_inputs"][ :, ensemble_member ] parameters = ensemble_aggregate["parameters"][:, ensemble_member] extra_variables = ca.vertcat( *[self.extra_variable(var.name(), ensemble_member) for var in self.extra_variables] ) constant_inputs = ensemble_store[ensemble_member]["constant_inputs"] extra_constant_inputs = ensemble_store[ensemble_member]["extra_constant_inputs"] # Initial conditions specified in history timeseries history = self.history(ensemble_member) for variable in itertools.chain( self.differentiated_states, self.algebraic_states, self.controls ): try: history_timeseries = history[variable] except KeyError: pass else: interpolation_method = self.interpolation_method(variable) val = self.interpolate( t0, history_timeseries.times, history_timeseries.values, np.nan, np.nan, interpolation_method, ) val /= self.variable_nominal(variable) if not np.isnan(val): idx = self.__indices_as_lists[ensemble_member][variable][0] if val < lbx[idx] or val > ubx[idx]: logger.warning( "Initial value {} for variable '{}' outside bounds.".format( val, variable ) ) lbx[idx] = ubx[idx] = val initial_derivative_constraints = [] for i, variable in enumerate(self.differentiated_states): try: history_timeseries = history[variable] except KeyError: pass else: if len(history_timeseries.times) <= 1 or np.isnan( history_timeseries.values[-2] ): continue assert history_timeseries.times[-1] == t0 if np.isnan(history_timeseries.values[-1]): t0_val = self.state_vector(variable, ensemble_member=ensemble_member)[0] t0_val *= self.variable_nominal(variable) val = (t0_val - history_timeseries.values[-2]) / ( t0 - history_timeseries.times[-2] ) sym = initial_derivatives[i] initial_derivative_constraints.append(sym - val) else: interpolation_method = self.interpolation_method(variable) t0_val = self.interpolate( t0, history_timeseries.times, history_timeseries.values, np.nan, np.nan, interpolation_method, ) initial_der_name = self.__initial_derivative_names[i] val = (t0_val - history_timeseries.values[-2]) / ( t0 - history_timeseries.times[-2] ) val /= self.variable_nominal(initial_der_name) idx = self.__indices[ensemble_member][initial_der_name] lbx[idx] = ubx[idx] = val if len(initial_derivative_constraints) > 0: g.append(ca.vertcat(*initial_derivative_constraints)) lbg.append(np.zeros(len(initial_derivative_constraints))) ubg.append(np.zeros(len(initial_derivative_constraints))) # Initial conditions for integrator accumulation_X0 = [] if self.integrate_states: for variable in integrated_variable_names: value = self.state_vector(variable, ensemble_member=ensemble_member)[0] nominal = self.variable_nominal(variable) if nominal != 1: value *= nominal accumulation_X0.append(value) accumulation_X0 = ca.vertcat(*accumulation_X0) # Input for map logger.info("Interpolating states") accumulation_U = [None] * ( 1 + 2 * len(self.dae_variables["constant_inputs"]) + 3 + len(self.__extra_constant_inputs) ) # Most variables have collocation times equal to the global # collocation times. Use a vectorized approach to process them. interpolated_states_explicit = [] interpolated_states_implicit = [] place_holder = [-1] * n_collocation_times for variable in collocated_variable_names: var_inds = self.__indices_as_lists[ensemble_member][variable] # If the variable times != collocation times, what we do here is just a placeholder if len(var_inds) != n_collocation_times: var_inds = var_inds.copy() var_inds.extend(place_holder) var_inds = var_inds[:n_collocation_times] interpolated_states_explicit.extend(var_inds[:-1]) interpolated_states_implicit.extend(var_inds[1:]) repeated_nominals = np.tile( np.repeat(collocated_variable_nominals, n_collocation_times - 1), 2 ) interpolated_states = ( ca.vertcat(X[interpolated_states_explicit], X[interpolated_states_implicit]) * repeated_nominals ) interpolated_states = interpolated_states.reshape( (n_collocation_times - 1, len(collocated_variables) * 2) ) # Handle variables that have different collocation times. for j, variable in enumerate(collocated_variable_names): times = self.times(variable) if n_collocation_times == len(times): # Already handled continue interpolation_method = self.interpolation_method(variable) values = self.state_vector(variable, ensemble_member=ensemble_member) interpolated = interpolate( times, values, collocation_times, False, interpolation_method ) nominal = self.variable_nominal(variable) if nominal != 1: interpolated *= nominal interpolated_states[:, j] = interpolated[:-1] interpolated_states[:, len(collocated_variables) + j] = interpolated[1:] # We do not cache the Jacobians, as the structure may change from ensemble member to # member, and from goal programming/homotopy run to run. # We could, of course, pick the states apart into controls and states, and generate # Jacobians for each set separately and for each ensemble member separately, but in # this case the increased complexity may well offset the performance gained by # caching. interpolated_states = reduce_matvec(interpolated_states, self.solver_input) accumulation_U[0] = interpolated_states for j, variable in enumerate(self.dae_variables["constant_inputs"]): variable = variable.name() constant_input = constant_inputs[variable] accumulation_U[1 + j] = ca.MX(constant_input[0 : n_collocation_times - 1]) accumulation_U[1 + len(self.dae_variables["constant_inputs"]) + j] = ca.MX( constant_input[1:n_collocation_times] ) accumulation_U[1 + 2 * len(self.dae_variables["constant_inputs"])] = ca.MX( collocation_times[0 : n_collocation_times - 1] ) accumulation_U[1 + 2 * len(self.dae_variables["constant_inputs"]) + 1] = ca.MX( collocation_times[1:n_collocation_times] ) path_variables = [None] * len(self.path_variables) for j, variable in enumerate(self.__path_variable_names): variable_size = self.__variable_sizes[variable] values = self.state_vector(variable, ensemble_member=ensemble_member) nominal = self.variable_nominal(variable) if isinstance(nominal, np.ndarray): nominal = ( np.broadcast_to(nominal, (n_collocation_times, variable_size)) .transpose() .ravel() ) values *= nominal elif nominal != 1: values *= nominal path_variables[j] = values.reshape((n_collocation_times, variable_size))[1:, :] path_variables = reduce_matvec(ca.horzcat(*path_variables), self.solver_input) accumulation_U[1 + 2 * len(self.dae_variables["constant_inputs"]) + 2] = path_variables for j, variable in enumerate(self.__extra_constant_inputs): variable = variable.name() constant_input = extra_constant_inputs[variable] accumulation_U[1 + 2 * len(self.dae_variables["constant_inputs"]) + 3 + j] = ca.MX( constant_input[1:n_collocation_times, :] ) # Construct matrix using O(states) CasADi operations # This is faster than using blockcat, presumably because of the # row-wise scaling operations. logger.info("Aggregating and de-scaling variables") accumulation_U = [var for var in accumulation_U if var.numel() > 0] accumulation_U = ca.transpose(ca.horzcat(*accumulation_U)) # Map to all time steps logger.info("Mapping") # Save these inputs such that can be used later in map_path_expression() self.__func_initial_inputs.append( [ parameters, ca.vertcat( initial_state, initial_derivatives, initial_constant_inputs, 0.0, initial_path_variables, initial_extra_constant_inputs, ), extra_variables, ] ) if accumulation is not None: integrators_and_collocation_and_path_constraints = accumulation( accumulation_X0, accumulation_U, ca.repmat(ca.vertcat(parameters, extra_variables), 1, n_collocation_times - 1), ) else: integrators_and_collocation_and_path_constraints = None if accumulation is not None and self.integrate_states: integrators = integrators_and_collocation_and_path_constraints[0] integrators_and_collocation_and_path_constraints = ( integrators_and_collocation_and_path_constraints[1] ) if ( accumulation is not None and integrators_and_collocation_and_path_constraints.numel() > 0 ): collocation_constraints = ca.vec( integrators_and_collocation_and_path_constraints[ :dae_residual_collocated_size, 0 : n_collocation_times - 1 ] ) discretized_path_objective = ca.vec( integrators_and_collocation_and_path_constraints[ dae_residual_collocated_size : dae_residual_collocated_size + path_objective.size1(), 0 : n_collocation_times - 1, ] ) discretized_path_constraints = ca.vec( integrators_and_collocation_and_path_constraints[ dae_residual_collocated_size + path_objective.size1() : dae_residual_collocated_size + path_objective.size1() + path_constraint_expressions.size1(), 0 : n_collocation_times - 1, ] ) discretized_delayed_feedback = integrators_and_collocation_and_path_constraints[ dae_residual_collocated_size + path_objective.size1() + path_constraint_expressions.size1() :, 0 : n_collocation_times - 1, ] else: collocation_constraints = ca.MX() discretized_path_objective = ca.MX() discretized_path_constraints = ca.MX() discretized_delayed_feedback = ca.MX() logger.info("Composing NLP segment") # Store integrators for result extraction if self.integrate_states: # Store integrators for result extraction self.__integrators.append( { variable: integrators[i, :] for i, variable in enumerate(integrated_variable_names) } ) else: # Add collocation constraints g.append(collocation_constraints) zeros = np.zeros(collocation_constraints.size1()) lbg.extend(zeros) ubg.extend(zeros) # Prepare arguments for map_path_expression() calls to ca.map() if len(integrated_variables) + len(collocated_variables) > 0: if self.integrate_states: # Inputs states_and_algebraics_and_controls = ca.vertcat( *[ self.variable_nominal(variable) * self.__integrators[ensemble_member][variable] for variable in integrated_variable_names ], interpolated_states[ :, len(collocated_variables) :, ].T, ) states_and_algebraics_and_controls_derivatives = ( ( states_and_algebraics_and_controls - ca.horzcat( ensemble_store[ensemble_member]["initial_state"], states_and_algebraics_and_controls[:, :-1], ) ).T / (collocation_times[1:] - collocation_times[:-1]) ).T else: states_and_algebraics_and_controls = interpolated_states[ :, len(collocated_variables) : ].T states_and_algebraics_and_controls_derivatives = ( ( interpolated_states[:, len(collocated_variables) :] - interpolated_states[:, : len(collocated_variables)] ) / (collocation_times[1:] - collocation_times[:-1]) ).T else: states_and_algebraics_and_controls = ca.MX() states_and_algebraics_and_controls_derivatives = ca.MX() self.__func_map_args.append( [ ca.repmat( ca.vertcat(*ensemble_parameter_values[ensemble_member]), 1, n_collocation_times - 1, ), ca.vertcat( states_and_algebraics_and_controls, states_and_algebraics_and_controls_derivatives, *[ ca.horzcat(*constant_inputs[variable][1:]) for variable in dae_constant_inputs_names ], ca.horzcat(*collocation_times[1:]), path_variables.T if path_variables.numel() > 0 else ca.MX(), *[ ca.horzcat(*extra_constant_inputs[variable][1:]) for (variable, _) in extra_constant_inputs_name_and_size ], ), ca.repmat(extra_variables, 1, n_collocation_times - 1), ] ) # Delayed feedback # Make an array of all unique times in history series history_times = np.unique( np.hstack( (np.array([]), *[history_series.times for history_series in history.values()]) ) ) # By convention, the last timestep in history series is the initial time. We drop this # index history_times = history_times[:-1] # Find the historical values of states, extrapolating backward if necessary history_values = np.empty( (history_times.shape[0], len(integrated_variables) + len(collocated_variables)) ) if history_times.shape[0] > 0: for j, var in enumerate(integrated_variables + collocated_variables): var_name = var.name() try: history_series = history[var_name] except KeyError: history_values[:, j] = np.nan else: interpolation_method = self.interpolation_method(var_name) history_values[:, j] = self.interpolate( history_times, history_series.times, history_series.values, np.nan, np.nan, interpolation_method, ) # Calculate the historical derivatives of historical values history_derivatives = ca.repmat(np.nan, 1, history_values.shape[1]) if history_times.shape[0] > 1: history_derivatives = ca.vertcat( history_derivatives, np.diff(history_values, axis=0) / np.diff(history_times)[:, None], ) # Find the historical values of constant inputs, extrapolating backward if necessary constant_input_values = np.empty( (history_times.shape[0], len(self.dae_variables["constant_inputs"])) ) if history_times.shape[0] > 0: for j, var in enumerate(self.dae_variables["constant_inputs"]): var_name = var.name() try: constant_input_series = raw_constant_inputs[var_name] except KeyError: constant_input_values[:, j] = np.nan else: interpolation_method = self.interpolation_method(var_name) constant_input_values[:, j] = self.interpolate( history_times, constant_input_series.times, constant_input_series.values, np.nan, np.nan, interpolation_method, ) if len(delayed_feedback_expressions) > 0: delayed_feedback_history = np.zeros( (history_times.shape[0], len(delayed_feedback_expressions)) ) for i, time in enumerate(history_times): history_delayed_feedback_res = delayed_feedback_function.call( [ parameters, ca.veccat( ca.transpose(history_values[i, :]), ca.transpose(history_derivatives[i, :]), ca.transpose(constant_input_values[i, :]), time, ca.repmat(np.nan, len(self.path_variables)), ca.repmat(np.nan, len(self.__extra_constant_inputs)), ), ca.repmat(np.nan, len(self.extra_variables)), ] ) delayed_feedback_history[i, :] = [ float(val) for val in history_delayed_feedback_res ] initial_delayed_feedback = delayed_feedback_function.call( self.__func_initial_inputs[ensemble_member], False, True ) path_variables_nominal = np.ones(path_variables_size) offset = 0 for variable in self.__path_variable_names: variable_size = self.__variable_sizes[variable] path_variables_nominal[offset : offset + variable_size] = self.variable_nominal( variable ) offset += variable_size nominal_delayed_feedback = delayed_feedback_function.call( [ parameters, ca.vertcat( [ self.variable_nominal(var.name()) for var in integrated_variables + collocated_variables ], np.zeros((initial_derivatives.size1(), 1)), initial_constant_inputs, 0.0, path_variables_nominal, initial_extra_constant_inputs, ), extra_variables, ] ) if delayed_feedback_expressions: # Resolve delay values # First, substitute parameters for values all at once. Make # sure substitute() gets called with the right signature. This # means we need at least one element that is of type MX. delayed_feedback_durations = list(delayed_feedback_durations) delayed_feedback_durations[0] = ca.MX(delayed_feedback_durations[0]) substituted_delay_durations = ca.substitute( delayed_feedback_durations, [ca.vertcat(symbolic_parameters)], [ca.vertcat(parameters)], ) # Use mapped function to evaluate delay in terms of constant inputs mapped_delay_function = ca.Function( "delay_values", self.dae_variables["time"] + self.dae_variables["constant_inputs"], substituted_delay_durations, ).map(len(collocation_times)) # Call mapped delay function with inputs as arrays evaluated_delay_durations = mapped_delay_function.call( [collocation_times] + [constant_inputs[v.name()] for v in self.dae_variables["constant_inputs"]] ) for i in range(len(delayed_feedback_expressions)): in_variable_name = delayed_feedback_states[i] expression = delayed_feedback_expressions[i] delay = evaluated_delay_durations[i] # Resolve aliases in_canonical, in_sign = self.alias_relation.canonical_signed(in_variable_name) in_times = self.times(in_canonical) in_nominal = self.variable_nominal(in_canonical) in_values = in_nominal * self.state_vector( in_canonical, ensemble_member=ensemble_member ) if in_sign < 0: in_values *= in_sign # Cast delay from DM to np.array delay = delay.toarray().flatten() assert np.all( np.isfinite(delay) ), "Delay duration must be resolvable to real values at transcribe()" out_times = np.concatenate([history_times, collocation_times]) out_values = ca.veccat( delayed_feedback_history[:, i], initial_delayed_feedback[i], ca.transpose(discretized_delayed_feedback[i, :]), ) # Check whether enough history has been specified, and that no # needed history values are missing hist_earliest = np.min(collocation_times - delay) hist_start_ind = np.searchsorted(out_times, hist_earliest) if out_times[hist_start_ind] != hist_earliest: # We need an earlier value to interpolate with hist_start_ind -= 1 if hist_start_ind < 0 or np.any( np.isnan(delayed_feedback_history[hist_start_ind:, i]) ): logger.warning( "Incomplete history for delayed expression {}. " "Extrapolating t0 value backwards in time.".format(expression) ) out_times = out_times[len(history_times) :] out_values = out_values[len(history_times) :] # Set up delay constraints if len(collocation_times) != len(in_times): interpolation_method = self.interpolation_method(in_canonical) x_in = interpolate( in_times, in_values, collocation_times, False, interpolation_method ) else: x_in = in_values interpolation_method = self.interpolation_method(in_canonical) x_out_delayed = interpolate( out_times, out_values, collocation_times - delay, False, interpolation_method, ) nominal = nominal_delayed_feedback[i] g.append((x_in - x_out_delayed) / nominal) zeros = np.zeros(n_collocation_times) lbg.extend(zeros) ubg.extend(zeros) # Objective f_member = self.objective(ensemble_member) if f_member.size1() == 0: f_member = 0 if path_objective.size1() > 0: initial_path_objective = path_objective_function.call( self.__func_initial_inputs[ensemble_member], False, True ) f_member += initial_path_objective[0] + ca.sum1(discretized_path_objective) f.append(self.ensemble_member_probability(ensemble_member) * f_member) if logger.getEffectiveLevel() == logging.DEBUG: logger.debug("Adding objective {}".format(f_member)) # Constraints constraints = self.constraints(ensemble_member) if constraints is None: raise Exception( "The `constraints` method returned None, but should always return a list." ) if logger.getEffectiveLevel() == logging.DEBUG: for constraint in constraints: logger.debug("Adding constraint {}, {}, {}".format(*constraint)) if constraints: g_constraint, lbg_constraint, ubg_constraint = list(zip(*constraints)) lbg_constraint = list(lbg_constraint) ubg_constraint = list(ubg_constraint) # Broadcast lbg/ubg if it's a vector constraint for i, (g_i, lbg_i, ubg_i) in enumerate( zip(g_constraint, lbg_constraint, ubg_constraint) ): s = g_i.size1() if s > 1: if not isinstance(lbg_i, np.ndarray) or lbg_i.shape[0] == 1: lbg_constraint[i] = np.full(s, lbg_i) elif lbg_i.shape[0] != g_i.shape[0]: raise Exception( "Shape mismatch between constraint " "#{} ({},) and its lower bound ({},)".format( i, g_i.shape[0], lbg_i.shape[0] ) ) if not isinstance(ubg_i, np.ndarray) or ubg_i.shape[0] == 1: ubg_constraint[i] = np.full(s, ubg_i) elif ubg_i.shape[0] != g_i.shape[0]: raise Exception( "Shape mismatch between constraint " "#{} ({},) and its upper bound ({},)".format( i, g_i.shape[0], ubg_i.shape[0] ) ) g.extend(g_constraint) lbg.extend(lbg_constraint) ubg.extend(ubg_constraint) # Path constraints # We need to call self.path_constraints() again here, # as the bounds may change from ensemble member to member. if ensemble_member > 0: path_constraints = self.path_constraints(ensemble_member) if len(path_constraints) > 0: # We need to evaluate the path constraints at t0, as the initial time is not # included in the accumulation. [initial_path_constraints] = path_constraints_function.call( self.__func_initial_inputs[ensemble_member], False, True ) g.append(initial_path_constraints) g.append(discretized_path_constraints) lbg_path_constraints = np.empty( (path_constraint_expressions.size1(), n_collocation_times) ) ubg_path_constraints = np.empty( (path_constraint_expressions.size1(), n_collocation_times) ) j = 0 for path_constraint in path_constraints: if logger.getEffectiveLevel() == logging.DEBUG: logger.debug("Adding path constraint {}, {}, {}".format(*path_constraint)) s = path_constraint[0].size1() lb = path_constraint[1] if isinstance(lb, ca.MX) and not lb.is_constant(): [lb] = ca.substitute( [lb], symbolic_parameters, self.__parameter_values_ensemble_member_0 ) elif isinstance(lb, Timeseries): lb = self.interpolate( collocation_times, lb.times, lb.values, -np.inf, -np.inf ).transpose() elif isinstance(lb, np.ndarray): lb = np.broadcast_to(lb, (n_collocation_times, s)).transpose() ub = path_constraint[2] if isinstance(ub, ca.MX) and not ub.is_constant(): [ub] = ca.substitute( [ub], symbolic_parameters, self.__parameter_values_ensemble_member_0 ) elif isinstance(ub, Timeseries): ub = self.interpolate( collocation_times, ub.times, ub.values, np.inf, np.inf ).transpose() elif isinstance(ub, np.ndarray): ub = np.broadcast_to(ub, (n_collocation_times, s)).transpose() lbg_path_constraints[j : j + s, :] = lb ubg_path_constraints[j : j + s, :] = ub j += s lbg.extend(lbg_path_constraints.transpose().ravel()) ubg.extend(ubg_path_constraints.transpose().ravel()) # NLP function logger.info("Creating NLP dictionary") nlp = {"x": X, "f": ca.sum1(ca.vertcat(*f)), "g": ca.vertcat(*g)} # Done logger.info("Done transcribing problem") # Debug check coefficients self.__debug_check_transcribe_linear_coefficients(discrete, lbx, ubx, lbg, ubg, x0, nlp) return discrete, lbx, ubx, lbg, ubg, x0, nlp def clear_transcription_cache(self): """ Clears the DAE ``Function``s that were cached by ``transcribe``. """ self.__dae_residual_function_collocated = None self.__integrator_step_function = None self.__initial_residual_with_params_fun_map = None def extract_results(self, ensemble_member=0): logger.info("Extracting results") # Gather results in a dictionary control_results = self.extract_controls(ensemble_member) state_results = self.extract_states(ensemble_member) # Merge dictionaries results = AliasDict(self.alias_relation) results.update(control_results) results.update(state_results) logger.info("Done extracting results") # Return results dictionary return results @property def solver_input(self): return self.__solver_input def solver_options(self): options = super(CollocatedIntegratedOptimizationProblem, self).solver_options() solver = options["solver"] assert solver in ["bonmin", "ipopt"] # Set the option in both cases, to avoid one inadvertently remaining in the cache. options[solver]["jac_c_constant"] = "yes" if self.linear_collocation else "no" return options
[docs] def integrator_options(self): """ Configures the implicit function used for time step integration. :returns: A dictionary of CasADi :class:`rootfinder` options. See the CasADi documentation for details. """ return {}
@property def controls(self): return self.__controls def _collint_get_lbx_ubx(self, count, indices): bounds = self.bounds() lbx = np.full(count, -np.inf, dtype=np.float64) ubx = np.full(count, np.inf, dtype=np.float64) # Variables that are not collocated, and only have a single entry in the state vector scalar_variables_set = set(self.__extra_variable_names) | set(self.__integrated_states) variable_sizes = self.__variable_sizes # Bounds, defaulting to +/- inf, if not set for ensemble_member in range(self.ensemble_size): for variable, inds in indices[ensemble_member].items(): variable_size = variable_sizes[variable] if variable in scalar_variables_set: times = self.initial_time n_times = 1 else: times = self.times(variable) n_times = len(times) try: bound = bounds[variable] except KeyError: pass else: nominal = self.variable_nominal(variable) interpolation_method = self.interpolation_method(variable) if isinstance(nominal, np.ndarray): nominal = ( np.broadcast_to(nominal, (n_times, variable_size)).transpose().ravel() ) if bound[0] is not None: if isinstance(bound[0], Timeseries): lower_bound = self.interpolate( times, bound[0].times, bound[0].values, -np.inf, -np.inf, interpolation_method, ).ravel() elif isinstance(bound[0], np.ndarray): lower_bound = ( np.broadcast_to(bound[0], (n_times, variable_size)) .transpose() .ravel() ) else: lower_bound = bound[0] lbx[inds] = lower_bound / nominal if bound[1] is not None: if isinstance(bound[1], Timeseries): upper_bound = self.interpolate( times, bound[1].times, bound[1].values, +np.inf, +np.inf, interpolation_method, ).ravel() elif isinstance(bound[1], np.ndarray): upper_bound = ( np.broadcast_to(bound[1], (n_times, variable_size)) .transpose() .ravel() ) else: upper_bound = bound[1] ubx[inds] = upper_bound / nominal # Warn for NaNs if np.any(np.isnan(lbx[inds])): logger.error("Lower bound on variable {} contains NaN".format(variable)) if np.any(np.isnan(ubx[inds])): logger.error("Upper bound on variable {} contains NaN".format(variable)) return lbx, ubx def _collint_get_x0(self, count, indices): x0 = np.zeros(count, dtype=np.float64) # Variables that are not collocated, and only have a single entry in the state vector scalar_variables_set = set(self.__extra_variable_names) | set(self.__integrated_states) variable_sizes = self.__variable_sizes for ensemble_member in range(self.ensemble_size): seed = self.seed(ensemble_member) for variable, inds in indices[ensemble_member].items(): variable_size = variable_sizes[variable] if variable in scalar_variables_set: times = self.initial_time n_times = 1 else: times = self.times(variable) n_times = len(times) try: seed_k = seed[variable] nominal = self.variable_nominal(variable) interpolation_method = self.interpolation_method(variable) if isinstance(nominal, np.ndarray): nominal = ( np.broadcast_to(nominal, (n_times, variable_size)).transpose().ravel() ) if isinstance(seed_k, Timeseries): x0[inds] = ( self.interpolate( times, seed_k.times, seed_k.values, 0, 0, interpolation_method ) .transpose() .ravel() ) else: x0[inds] = seed_k x0[inds] /= nominal except KeyError: pass return x0 def _collint_get_discrete(self, count, indices): discrete = np.zeros(count, dtype=bool) for ensemble_member in range(self.ensemble_size): for variable, inds in indices[ensemble_member].items(): discrete[inds] = self.variable_is_discrete(variable) return discrete def discretize_control(self, variable, ensemble_member, times, offset): # Default implementation: One single set of control inputs for all # ensembles try: return self.__discretize_control_cache[variable] except KeyError: control_indices = slice(offset, offset + len(times)) self.__discretize_control_cache[variable] = control_indices return control_indices def discretize_controls(self, bounds): self.__discretize_control_cache = {} indices = [{} for ensemble_member in range(self.ensemble_size)] count = 0 for variable in self.controls: times = self.times(variable) for ensemble_member in range(self.ensemble_size): control_indices = self.discretize_control(variable, ensemble_member, times, count) indices[ensemble_member][variable] = control_indices control_indices_stop = ( control_indices.stop if isinstance(control_indices, slice) else (int(np.max(control_indices)) + 1) ) # indices need not be ordered count = max(count, control_indices_stop) discrete = self._collint_get_discrete(count, indices) lbx, ubx = self._collint_get_lbx_ubx(count, indices) x0 = self._collint_get_x0(count, indices) # Return number of control variables return count, discrete, lbx, ubx, x0, indices def extract_controls(self, ensemble_member=0): X = self.solver_output.copy() indices = self.__indices[ensemble_member] results = {} for variable in self.controls: inds = indices[variable] results[variable] = self.variable_nominal(variable) * X[inds] return results def control_at(self, variable, t, ensemble_member=0, scaled=False, extrapolate=True): canonical, sign = self.alias_relation.canonical_signed(variable) if canonical not in self.__controls_map: raise KeyError(variable) return self.state_at(variable, t, ensemble_member, scaled, extrapolate) @property def differentiated_states(self): return self.__differentiated_states @property def algebraic_states(self): return self.__algebraic_states def discretize_states(self, bounds): # Default implementation: States for all ensemble members variable_sizes = self.__variable_sizes # Space for collocated states ensemble_member_size = 0 if self.integrate_states: n_model_states = len(self.differentiated_states) + len(self.algebraic_states) if len(self.__integrated_states) != n_model_states: error_msg = ( "CollocatedIntegratedOptimizationProblem: " "integrated_states should specify all model states, or none at all" ) logger.error(error_msg) raise Exception(error_msg) # Count initial states only ensemble_member_size += n_model_states else: # Count discretised states over optimization horizon for variable in itertools.chain(self.differentiated_states, self.algebraic_states): ensemble_member_size += variable_sizes[variable] * len(self.times(variable)) # Count any additional path variables (which cannot be integrated) for variable in self.__path_variable_names: ensemble_member_size += variable_sizes[variable] * len(self.times(variable)) # Space for extra variables for variable in self.__extra_variable_names: ensemble_member_size += variable_sizes[variable] # Space for initial states and derivatives ensemble_member_size += len(self.dae_variables["derivatives"]) # Total space requirement count = self.ensemble_size * ensemble_member_size # Allocate arrays indices = [{} for ensemble_member in range(self.ensemble_size)] for ensemble_member in range(self.ensemble_size): offset = ensemble_member * ensemble_member_size for variable in itertools.chain(self.differentiated_states, self.algebraic_states): variable_size = variable_sizes[variable] if self.integrate_states: assert variable_size == 1 indices[ensemble_member][variable] = offset offset += 1 else: times = self.times(variable) n_times = len(times) indices[ensemble_member][variable] = slice( offset, offset + n_times * variable_size ) offset += n_times * variable_size for variable in self.__path_variable_names: variable_size = variable_sizes[variable] times = self.times(variable) n_times = len(times) indices[ensemble_member][variable] = slice(offset, offset + n_times * variable_size) offset += n_times * variable_size for extra_variable in self.__extra_variable_names: variable_size = variable_sizes[extra_variable] indices[ensemble_member][extra_variable] = slice(offset, offset + variable_size) offset += variable_size for initial_der_name in self.__initial_derivative_names: indices[ensemble_member][initial_der_name] = offset offset += 1 discrete = self._collint_get_discrete(count, indices) lbx, ubx = self._collint_get_lbx_ubx(count, indices) x0 = self._collint_get_x0(count, indices) # Return number of state variables return count, discrete, lbx, ubx, x0, indices def extract_states(self, ensemble_member=0): # Solver output X = self.solver_output.copy() indices = self.__indices[ensemble_member] # Extract control inputs results = {} # Perform integration, in order to extract integrated variables # We bundle all integrations into a single Function, so that subexpressions # are evaluated only once. if self.integrate_states: # Use integrators to facilitate common subexpression # elimination. f = ca.Function( "f", [self.solver_input], [ ca.vertcat( *[ self.__integrators[ensemble_member][variable] for variable in self.__integrated_states ] ) ], ) integrators_output = f(X) j = 0 for variable in self.__integrated_states: inds = indices[variable] initial_value = X[inds] n = self.__integrators[ensemble_member][variable].size1() results[variable] = self.variable_nominal(variable) * np.concatenate( [[initial_value], np.array(integrators_output[j : j + n, :]).ravel()] ) j += n # Extract initial derivatives for initial_der_name in self.__initial_derivative_names: inds = indices[initial_der_name] try: nominal = self.variable_nominal(initial_der_name) results[initial_der_name] = nominal * X[inds].ravel() except KeyError: pass # Extract all other variables variable_sizes = self.__variable_sizes for variable in itertools.chain( self.differentiated_states, self.algebraic_states, self.__path_variable_names, self.__extra_variable_names, ): if variable in results: continue inds = indices[variable] variable_size = variable_sizes[variable] if variable_size > 1: results[variable] = X[inds].reshape((variable_size, -1)).transpose() else: results[variable] = X[inds] results[variable] = self.variable_nominal(variable) * results[variable] # Extract constant input aliases constant_inputs = self.constant_inputs(ensemble_member) for variable in self.dae_variables["constant_inputs"]: variable = variable.name() try: constant_input = constant_inputs[variable] except KeyError: pass else: results[variable] = np.interp( self.times(variable), constant_input.times, constant_input.values ) return results def state_vector(self, variable, ensemble_member=0): indices = self.__indices[ensemble_member][variable] return self.solver_input[indices] def state_at(self, variable, t, ensemble_member=0, scaled=False, extrapolate=True): if isinstance(variable, ca.MX): variable = variable.name() if self.__variable_sizes.get(variable, 1) > 1: raise NotImplementedError("state_at() not supported for vector states") name = "{}[{},{}]{}".format( variable, ensemble_member, t - self.initial_time, "S" if scaled else "" ) if extrapolate: name += "E" try: return self.__symbol_cache[name] except KeyError: # Look up transcribe_problem() state. t0 = self.initial_time X = self.solver_input # Fetch appropriate symbol, or value. canonical, sign = self.alias_relation.canonical_signed(variable) found = False # Check if it is in the state vector try: inds = self.__indices[ensemble_member][canonical] except KeyError: pass else: times = self.times(canonical) if self.integrate_states: nominal = 1 if t == self.initial_time: sym = sign * X[inds] found = True else: variable_values = ca.horzcat( sign * X[inds], self.__integrators[ensemble_member][canonical] ).T else: nominal = self.variable_nominal(canonical) variable_values = X[inds] if not found: f_left, f_right = np.nan, np.nan if t < t0: history = self.history(ensemble_member) try: history_timeseries = history[canonical] except KeyError: if extrapolate: sym = variable_values[0] else: sym = np.nan else: if extrapolate: f_left = history_timeseries.values[0] f_right = history_timeseries.values[-1] interpolation_method = self.interpolation_method(canonical) sym = self.interpolate( t, history_timeseries.times, history_timeseries.values, f_left, f_right, interpolation_method, ) if scaled and nominal != 1: sym /= nominal else: if not extrapolate and (t < times[0] or t > times[-1]): raise Exception( "Cannot interpolate for {}: " "Point {} outside of range [{}, {}]".format( canonical, t, times[0], times[-1] ) ) interpolation_method = self.interpolation_method(canonical) sym = interpolate(times, variable_values, [t], False, interpolation_method) if not scaled and nominal != 1: sym *= nominal if sign < 0: sym *= -1 found = True if not found: constant_inputs = self.constant_inputs(ensemble_member) try: constant_input = constant_inputs[variable] found = True except KeyError: pass else: times = self.times(variable) f_left, f_right = np.nan, np.nan if extrapolate: f_left = constant_input.values[0] f_right = constant_input.values[-1] interpolation_method = self.interpolation_method(variable) sym = self.interpolate( t, constant_input.times, constant_input.values, f_left, f_right, interpolation_method, ) if not found: parameters = self.parameters(ensemble_member) try: sym = parameters[variable] found = True except KeyError: pass if not found: raise KeyError(variable) # Cache symbol. self.__symbol_cache[name] = sym return sym def variable(self, variable): return self.__variables[variable] def variable_nominal(self, variable): try: return self.__initial_derivative_nominals[variable] except KeyError: return super().variable_nominal(variable) def extra_variable(self, extra_variable, ensemble_member=0): indices = self.__indices[ensemble_member][extra_variable] return self.solver_input[indices] * self.variable_nominal(extra_variable) def __states_times_in(self, variable, t0=None, tf=None, ensemble_member=0): # Time stamps for this variable times = self.times(variable) # Set default values if t0 is None: t0 = times[0] if tf is None: tf = times[-1] # Find canonical variable canonical, sign = self.alias_relation.canonical_signed(variable) nominal = self.variable_nominal(canonical) state = self.state_vector(canonical, ensemble_member) if self.integrate_states and canonical in self.__integrators[ensemble_member]: state = ca.vertcat(state, ca.transpose(self.__integrators[ensemble_member][canonical])) state *= nominal if sign < 0: state *= -1 # Compute combined points if t0 < times[0]: history = self.history(ensemble_member) try: history_timeseries = history[canonical] except KeyError: raise Exception( "No history found for variable {}, but a historical value was requested".format( variable ) ) else: history_times = history_timeseries.times[:-1] history = history_timeseries.values[:-1] if sign < 0: history *= -1 else: history_times = np.empty(0) history = np.empty(0) # Collect time stamps and states, "knots". (indices,) = np.where(np.logical_and(times >= t0, times <= tf)) (history_indices,) = np.where(np.logical_and(history_times >= t0, history_times <= tf)) if (t0 not in times[indices]) and (t0 not in history_times[history_indices]): x0 = self.state_at(variable, t0, ensemble_member) else: t0 = x0 = ca.MX() if (tf not in times[indices]) and (tf not in history_times[history_indices]): xf = self.state_at(variable, tf, ensemble_member) else: tf = xf = ca.MX() t = ca.vertcat(t0, history_times[history_indices], times[indices], tf) x = ca.vertcat(x0, history[history_indices], state[indices[0] : indices[-1] + 1], xf) return x, t def states_in(self, variable, t0=None, tf=None, ensemble_member=0): x, _ = self.__states_times_in(variable, t0, tf, ensemble_member) return x def integral(self, variable, t0=None, tf=None, ensemble_member=0): x, t = self.__states_times_in(variable, t0, tf, ensemble_member) if x.size1() > 1: # Integrate knots using trapezoid rule x_avg = 0.5 * (x[: x.size1() - 1] + x[1:]) dt = t[1:] - t[: x.size1() - 1] return ca.sum1(x_avg * dt) else: return ca.MX(0) def der(self, variable): # Look up the derivative variable for the given non-derivative variable canonical, sign = self.alias_relation.canonical_signed(variable) try: i = self.__differentiated_states_map[canonical] return sign * self.dae_variables["derivatives"][i] except KeyError: try: i = self.__algebraic_states_map[canonical] except KeyError: i = len(self.algebraic_states) + self.__controls_map[canonical] return sign * self.__algebraic_and_control_derivatives[i] def der_at(self, variable, t, ensemble_member=0): # Special case t being t0 for differentiated states if t == self.initial_time: # We have a special symbol for t0 derivatives X = self.solver_input canonical, sign = self.alias_relation.canonical_signed(variable) try: i = self.__differentiated_states_map[canonical] except KeyError: # Fall through, in case 'variable' is not a differentiated state. pass else: initial_der_name = self.__initial_derivative_names[i] nominal = self.variable_nominal(initial_der_name) idx = self.__indices[ensemble_member][initial_der_name] return nominal * sign * X[idx] # Time stamps for this variable times = self.times(variable) if t <= self.initial_time: # Derivative requested for t0 or earlier. We need the history. history = self.history(ensemble_member) try: htimes = history[variable].times[:-1] history_and_times = np.hstack((htimes, times)) except KeyError: history_and_times = times else: history_and_times = times # Special case t being the initial available point. In this case, we have # no derivative information available. if t == history_and_times[0]: return 0.0 # Handle t being an interior point, or t0 for a non-differentiated # state for i in range(len(history_and_times)): # Use finite differences when between collocation points, and # backward finite differences when on one. if t > history_and_times[i] and t <= history_and_times[i + 1]: dx = self.state_at( variable, history_and_times[i + 1], ensemble_member=ensemble_member ) - self.state_at(variable, history_and_times[i], ensemble_member=ensemble_member) dt = history_and_times[i + 1] - history_and_times[i] return dx / dt # t does not belong to any collocation point interval raise IndexError def map_path_expression(self, expr, ensemble_member): f = ca.Function("f", self.__func_orig_inputs, [expr]).expand() initial_values = f(*self.__func_initial_inputs[ensemble_member]) # Map number_of_timeslots = len(self.times()) if number_of_timeslots > 1: fmap = f.map(number_of_timeslots - 1) values = fmap(*self.__func_map_args[ensemble_member]) all_values = ca.horzcat(initial_values, values) else: all_values = initial_values return ca.transpose(all_values) def solver_success(self, *args, **kwargs): self.__debug_check_state_output_scaling() return super().solver_success(*args, **kwargs) def _debug_get_named_nlp(self, nlp): x = nlp["x"] f = nlp["f"] g = nlp["g"] expand_f_g = ca.Function("f_g", [x], [f, g]).expand() x_sx = ca.SX.sym("X", *x.shape) f_sx, g_sx = expand_f_g(x_sx) x, f, g = x_sx, f_sx, g_sx # Build a vector of symbols with the descriptive names for useful # logging of constraints. Some decision variables may be shared # between ensemble members, so first we build a complete mapping of # state_index -> (canonical name, ensemble members, time step index) state_index_map = {} for ensemble_member in range(self.ensemble_size): indices = self.__indices_as_lists[ensemble_member] for k, v in indices.items(): for t_i, i in enumerate(v): if i in state_index_map: # Shared state vector entry between ensemble members assert k == state_index_map[i][0] assert t_i == state_index_map[i][2] state_index_map[i][1].append(ensemble_member) else: state_index_map[i] = [k, [ensemble_member], t_i] assert len(state_index_map) == x.size1() # Build descriptive decision variables for each state vector entry var_names = [] for i in range(len(state_index_map)): var_name, ensemble_members, t_i = state_index_map[i] if len(ensemble_members) == 1: ensemble_members = ensemble_members[0] else: ensemble_members = "[{}]".format( ",".join((str(x) for x in sorted(ensemble_members))) ) var_names.append("{}__e{}__t{}".format(var_name, ensemble_members, t_i)) # Create named versions of the constraints named_x = ca.vertcat(*(ca.SX.sym(v) for v in var_names)) named_g = ca.vertsplit(ca.Function("tmp", [x], [g])(named_x)) named_f = ca.vertsplit(ca.Function("tmp", [x], [f])(named_x))[0] return var_names, named_x, named_f, named_g @debug_check(DebugLevel.VERYHIGH) def __debug_check_transcribe_linear_coefficients( self, discrete, lbx, ubx, lbg, ubg, x0, nlp, tol_rhs=1e6, tol_zero=1e-12, tol_up=1e2, tol_down=1e-2, tol_range=1e3, evaluate_at_x0=False, ): nlp = nlp.copy() expand_f_g = ca.Function("f_g", [nlp["x"]], [nlp["f"], nlp["g"]]).expand() X_sx = ca.SX.sym("X", *nlp["x"].shape) f_sx, g_sx = expand_f_g(X_sx) nlp["x"] = X_sx nlp["f"] = f_sx nlp["g"] = g_sx lbg = np.array(ca.veccat(*lbg)).ravel() ubg = np.array(ca.veccat(*ubg)).ravel() var_names, named_x, named_f, named_g = self._debug_get_named_nlp(nlp) def constr_to_str(i): c_str = str(named_g[i]) lb, ub = lbg[i], ubg[i] if np.isfinite(lb) and np.isfinite(ub) and lb == ub: c_str = "{} = {}".format(c_str, lb) elif np.isfinite(lb) and np.isfinite(ub): c_str = "{} <= {} <= {}".format(lb, c_str, ub) elif np.isfinite(lb): c_str = "{} >= {}".format(c_str, lb) elif np.isfinite(ub): c_str = "{} <= {}".format(c_str, ub) return c_str # Checking for right hand side of constraints logger.info("Sanity check of lbg and ubg, checking for small values (<{})".format(tol_zero)) lbg_abs_no_zero = np.abs(lbg.copy()) lbg_abs_no_zero[lbg_abs_no_zero == 0.0] = +np.inf ind = np.argmin(lbg_abs_no_zero) if np.any(np.isfinite(lbg_abs_no_zero)): logger.info("Smallest (absolute) lbg coefficient {}".format(lbg_abs_no_zero[ind])) logger.info("E.g., {}".format(constr_to_str(ind))) lbg_inds = lbg_abs_no_zero < tol_zero if np.any(lbg_inds): logger.info("Too small of a (absolute) lbg found: {}".format(min(lbg[lbg_inds]))) ubg_abs_no_zero = np.abs(ubg.copy()) ubg_abs_no_zero[ubg_abs_no_zero == 0.0] = +np.inf ind = np.argmin(ubg_abs_no_zero) if np.any(np.isfinite(ubg_abs_no_zero)): logger.info("Smallest (absolute) ubg coefficient {}".format(ubg_abs_no_zero[ind])) logger.info("E.g., {}".format(constr_to_str(ind))) ubg_inds = ubg_abs_no_zero < tol_zero if np.any(ubg_inds): logger.info("Too small of a (absolute) ubg found: {}".format(min(ubg[ubg_inds]))) logger.info("Sanity check of lbg and ubg, checking for large values (>{})".format(tol_rhs)) lbg_abs_no_inf = np.abs(lbg.copy()) lbg_abs_no_inf[~np.isfinite(lbg_abs_no_inf)] = -np.inf ind = np.argmax(lbg_abs_no_inf) if np.any(np.isfinite(lbg_abs_no_inf)): logger.info("Largest (absolute) lbg coefficient {}".format(lbg_abs_no_inf[ind])) logger.info("E.g., {}".format(constr_to_str(ind))) lbg_inds = lbg_abs_no_inf > tol_rhs if np.any(lbg_inds): raise Exception("Too large of a (absolute) lbg found: {}".format(max(lbg[lbg_inds]))) ubg_abs_no_inf = np.abs(ubg.copy()) ubg_abs_no_inf[~np.isfinite(ubg)] = -np.inf ind = np.argmax(ubg_abs_no_inf) if np.any(np.isfinite(ubg_abs_no_inf)): logger.info("Largest (absolute) ubg coefficient {}".format(ubg_abs_no_inf[ind])) logger.info("E.g., {}".format(constr_to_str(ind))) ubg_inds = ubg_abs_no_inf > tol_rhs if np.any(ubg_inds): raise Exception("Too large of a (absolute) ubg found: {}".format(max(ubg[ubg_inds]))) eval_point = x0 if evaluate_at_x0 else 1.0 eval_point_str = "x0" if evaluate_at_x0 else "1.0" # Check coefficient matrix logger.info( "Sanity check on objective and constraints Jacobian matrix" "/constant coefficients values" ) in_var = nlp["x"] out = [] for o in [nlp["f"], nlp["g"]]: Af = ca.Function("Af", [in_var], [ca.jacobian(o, in_var)]) bf = ca.Function("bf", [in_var], [o]) A = Af(eval_point) A = ca.sparsify(A) b = bf(0) b = ca.sparsify(b) out.append((A.tocsc().tocoo(), b.tocsc().tocoo())) # Objective A_obj, b_obj = out[0] logger.info( "Statistics of objective: max & min of abs(jac(f, {}))) f({}), constants".format( eval_point_str, eval_point_str ) ) max_obj_A = max(np.abs(A_obj.data), default=None) min_obj_A = min(np.abs(A_obj.data[A_obj.data != 0.0]), default=None) obj_x0 = np.array(ca.Function("tmp", [in_var], [nlp["f"]])(eval_point)).ravel()[0] obj_b = b_obj.data[0] if len(b_obj.data) > 0 else 0.0 logger.info("{} & {}, {}, {}".format(max_obj_A, min_obj_A, obj_x0, obj_b)) if abs(obj_b) > tol_up: logger.info( "Constant '{}' in objective exceeds upper tolerance of '{}'".format(obj_b, tol_up) ) if abs(obj_b) > tol_up: logger.info( "Objective value at x0 '{}' exceeds upper tolerance of '{}'".format(obj_x0, tol_up) ) # Constraints A_constr, b_constr = out[1] logger.info( "Statistics of constraints: max & min of abs(jac(g, x0))), max & min of abs(g(x0))" ) max_constr_A = max(np.abs(A_constr.data), default=None) min_constr_A = min(np.abs(A_constr.data[A_constr.data != 0.0]), default=None) max_constr_b = max(np.abs(b_constr.data), default=None) min_constr_b = min(np.abs(b_constr.data[b_constr.data != 0.0]), default=None) logger.info( "{} & {}, {} & {}".format(max_constr_A, min_constr_A, max_constr_b, min_constr_b) ) maxs = [x for x in [max_constr_A, max_constr_b, max_obj_A, obj_b] if x is not None] mins = [x for x in [min_constr_A, min_constr_b, min_obj_A, obj_b] if x is not None] if (maxs and max(maxs) > tol_up) or (mins and min(mins) < tol_down): logger.info("Jacobian matrix /constants coefficients values outside typical range!") # Check on individual constraints. (Only check values of constraint's Jacobian.) A_constr_csr = A_constr.tocsr() exceedences = [] for i in range(A_constr_csr.shape[0]): r = A_constr_csr.getrow(i) data = r.data try: max_r = max(np.abs(data)) min_r = min(np.abs(data)) except ValueError: # Emtpy constraint? continue assert min_r != 0.0 if max_r > tol_up or min_r < tol_down or max_r / min_r > tol_range: c_str = constr_to_str(i) exceedences.append((i, max_r, min_r, c_str)) if exceedences: logger.info( "Exceedence in jacobian of constraints evaluated at x0" " (max > {:g}, min < {:g}, or max / min > {:g}):".format( tol_up, tol_down, tol_range ) ) exceedences = sorted(exceedences, key=lambda x: x[1] / x[2], reverse=True) for i, (r, max_r, min_r, c) in enumerate(exceedences): logger.info( "row {} (max: {}, min: {}, range: {}): {}".format( r, max_r, min_r, max_r / min_r, c ) ) if i >= 9: logger.info( "Too many warnings of same type ({} others remain).".format( len(exceedences) - 10 ) ) break # Columns A_constr_csc = A_constr.tocsc() coeffs = [] max_range_found = 1.0 logger.info( "Checking for range exceedence for each variable (i.e., check Jacobian matrix columns)" ) exceedences = [] for c in range(A_constr_csc.shape[1]): cur_col = A_constr_csc.getcol(c) cur_coeffs = cur_col.data if len(cur_coeffs) == 0: coeffs.append(None) continue abs_coeffs = np.abs(cur_coeffs) max_r_i = np.argmax(abs_coeffs) min_r_i = np.argmin(abs_coeffs) max_r = abs_coeffs[max_r_i] min_r = abs_coeffs[min_r_i] assert min_r != 0.0 max_range_found = max(max_r / min_r, max_range_found) if max_r / min_r > tol_range: inds = cur_col.indices c_min = inds[min_r_i] c_max = inds[max_r_i] r = A_constr_csr.getrow(c_min) c_min_str = constr_to_str(c_min) r = A_constr_csr.getrow(c_max) c_max_str = constr_to_str(c_max) exceedences.append((c, max_r / min_r, min_r, max_r, c_min_str, c_max_str)) coeffs.append((min_r, max_r)) exceedences = sorted(exceedences, key=lambda x: x[1], reverse=True) logger.info("Max range found: {}".format(max_range_found)) if exceedences: logger.info("Exceedence in range per column (max / min > {:g}):".format(tol_range)) for i, (c, exc, min_, max_, c_min_str, c_max_str) in enumerate(exceedences): logger.info( "col {} ({}): range {}, min {}, max {}".format( c, var_names[c], exc, min_, max_ ) ) logger.info(c_min_str) logger.info(c_max_str) if i >= 9: logger.info( "Too many warnings of same type ({} others remain).".format( len(exceedences) - 10 ) ) break logger.info("Checking for range exceedence for variables in the objective function") max_range_found = 1.0 exceedences = [] for c, d in zip(A_obj.col, A_obj.data): cofc = coeffs[c] if cofc is None: # Variable does not appear in constraints continue min_r, max_r = cofc obj_coeff = abs(d) max_range = max(obj_coeff / min_r, max_r / obj_coeff) max_range_found = max(max_range, max_range_found) if max_range > tol_range: exceedences.append((c, max_range, obj_coeff, min_r, max_r)) logger.info("Max range found: {}".format(max_range_found)) if exceedences: logger.info( "Exceedence in range of objective variable (range > {:g}):".format(tol_range) ) for i, (c, max_range, obj_coeff, min_r, max_r) in enumerate(exceedences): logger.info( "col {} ({}): range: {}, obj: {}, min constr: {}, max constr {}".format( c, var_names[c], max_range, obj_coeff, min_r, max_r ) ) if i >= 9: logger.info( "Too many warnings of same type ({} others remain).".format( len(exceedences) - 10 ) ) break @debug_check(DebugLevel.VERYHIGH) def __debug_check_state_output_scaling(self, tol_up=1e4, tol_down=1e-2, ignore_all_zero=True): """ Check the scaling using the resulting/optimized solver output. Exceedences of the (absolute) state vector of `tol_up` are rather unambiguously bad. If a certain variable has _any_ violation, we report it. Exceedences on `tol_down` are more difficult as maybe the scaling is correct, but the answer just happened to be (almost) zero. We only report if _all_ values are in violation (and even then can't really be certain). """ abs_output = np.abs(self.solver_output) inds_up = np.flatnonzero(abs_output >= tol_up) inds_down = np.flatnonzero(abs_output <= tol_down) indices = self.__indices_as_lists variable_to_all_indices = {k: set(v) for k, v in indices[0].items()} for ensemble_indices in indices[1:]: for k, v in ensemble_indices.items(): variable_to_all_indices[k] |= v if len(inds_up) > 0: exceedences = [] for k, v in variable_to_all_indices.items(): inds = v.intersection(inds_up) if inds: exceedences.append((k, max(abs_output[list(inds)]))) exceedences = sorted(exceedences, key=lambda x: x[1], reverse=True) if exceedences: logger.info( "Variables with at least one (absolute) state vector entry/entries " "larger than {}".format(tol_up) ) for k, v in exceedences: logger.info("{}: abs max = {}".format(k, v)) if len(inds_down) > 0: exceedences = [] for k, v in variable_to_all_indices.items(): if v.issubset(inds_down): exceedences.append((k, max(abs_output[list(v)]))) exceedences = sorted(exceedences, key=lambda x: x[1], reverse=True) if next((v for k, v in exceedences if not ignore_all_zero or v > 0.0), None): ignore_all_zero_string = " (but not all zero)" if ignore_all_zero else "" logger.info( "Variables with all (absolute) state vector entry/entries " "smaller than {}{}".format(tol_down, ignore_all_zero_string) ) for k, v in exceedences: if ignore_all_zero and v == 0.0: continue logger.info("{}: abs max = {}".format(k, v))