Source code for rtctools.optimization.goal_programming_mixin_base

import functools
import logging
import sys
from abc import ABCMeta, abstractmethod
from collections import OrderedDict
from collections.abc import Callable

import casadi as ca
import numpy as np

from .optimization_problem import OptimizationProblem
from .timeseries import Timeseries

logger = logging.getLogger("rtctools")


class _EmptyEnsembleList(list):
    """
    An indexable object containing infinitely many empty lists.
    Only to be used as a placeholder.
    """

    def __getitem__(self, key):
        return []


class _EmptyEnsembleOrderedDict(OrderedDict):
    """
    An indexable object containing infinitely many empty OrderedDicts.
    Only to be used as a placeholder.
    """

    def __getitem__(self, key):
        return OrderedDict()


[docs] class Goal(metaclass=ABCMeta): r""" Base class for lexicographic goal programming goals. **Types of goals** There are 2 types of goals: minimization goals and target goals. *Minimization goals* Minimization goals are of the form: .. math:: \text{minimize } f(x). *Target goals* Target goals weakly enforce a constraint of the form .. math:: m_{target} \leq g(x) \leq M_{target}, by turning it into a minimization problem of the form .. math:: \text{minimize } & \epsilon^r, \\ \text{subject to } &g_{low}(\epsilon) \leq g(x) \leq g_{up}(\epsilon), \\ \text{and } &0 \leq \epsilon \leq 1, where .. math:: g_{low}(\epsilon) &:= (1-\epsilon) m_{target} + \epsilon m, \\ g_{up}(\epsilon) &:= (1-\epsilon) M_{target} + \epsilon M. Here, :math:`m` and :math:`M` are hard constraints for :math:`g(x)`, :math:`m_{target}` and :math:`M_{target}` are target bounds for :math:`g(x)`, :math:`\epsilon` is an auxiliary variable that indicates how strongly the target bounds are violated, and :math:`\epsilon^r` is a function that indicates the variation of :math:`\epsilon`, where the order :math:`r` is by default :math:`2`. We have .. math:: m < m_{target} \leq M_{target} < M. Note that when :math:`\epsilon=0`, the constraint on :math:`g(x)` becomes .. math:: m_{target} \leq g(x) \leq M_{target} and if :math:`\epsilon=1`, it becomes .. math:: m \leq g(x) \leq M. **Scaling goals** Goals can be scaled by a nominal value :math:`c_{nom}` to improve the performance of the solvers. In case of a minimization goal, the scaled problem is given by .. math:: \text{minimize } \hat{f}(x), where :math:`\hat{f}(x) := f(x) / c_{nom}`. In case of a target goal, the scaled problem is given by .. math:: \text{minimize } & \epsilon^r, \\ \text{subject to } &\hat{g}_{low}(\epsilon) \leq \hat{g}(x) \leq \hat{g}_{up}(\epsilon), \\ \text{and } &0 \leq \epsilon \leq 1, where :math:`\hat{g}(x) := g(x) / c_{nom}`, :math:`\hat{g}_{low}(\epsilon) := {g}_{low}(\epsilon) / c_{nom}`, and :math:`\hat{g}_{up}(\epsilon) := {g}_{up}(\epsilon) / c_{nom}`. **Implementing goals** A goal class is created by inheriting from the :py:class:Goal class and overriding the :func:`function` method. This method defines the goal function :math:`f(x)` in case of a minimization goal, and the goal function :math:`g(x)` in case of a target goal. A goal becomes a target goal if either the class attribute ``target_min`` or ``target_max`` is set. To further define a goal, the following class attributes can also be set. :cvar function_range: Range of goal function :math:`[m ,M]`. Only applies to target goals. Required for a target goal. :cvar function_nominal: Nominal value of a function :math:`c_{nom}`. Used for scaling. Default is ``1``. :cvar target_min: Desired lower bound for goal function :math:`m_{target}`. Default is ``numpy.nan``. :cvar target_max: Desired upper bound for goal function :math:`M_{target}`. Default is ``numpy.nan``. :cvar priority: Priority of a goal. Default is ``1``. :cvar weight: Optional weighting applied to the goal. Default is ``1.0``. :cvar order: Penalization order of goal violation :math:`r`. Default is ``2``. :cvar critical: If ``True``, the algorithm will abort if this goal cannot be fully met. Default is ``False``. :cvar relaxation: Amount of slack added to the hard constraints related to the goal. Must be a nonnegative value. The unit is equal to that of the goal function. Default is ``0.0``. When ``target_min`` is set, but not ``target_max``, the target goal becomes a lower bound target goal and the constraint on :math:`g(x)` becomes .. math:: g_{low}(\epsilon) \leq g(x). Similary, if ``target_max`` is set, but not ``target_min``, the target goal becomes a upper bound target goal and the constraint on :math:`g(x)` becomes .. math:: g(x) \leq g_{up}(\epsilon). Relaxation is used to loosen the constraints that are set after the optimization of the goal's priority. Notes: * If one is unsure about the function range, it is recommended to overestimate this interval. However, this will negatively influence how accurately the target bounds are met. * The function range should be strictly larger than the target range. In particular, :math:`m < m_{target}` and :math:`M_{target} < M`. * In a path goal, the target can be a Timeseries. * In case of multiple goals with the same priority, it is crucial that an accurate function nominal value is provided. This ensures that all goals are given similar importance. A goal can be written in vector form. In a vector goal: * The goal size determines how many goals there are. * The goal function has shape ``(goal size, 1)``. * The function is either minimized or has, possibly various, targets. * Function nominal can either be an array with as many entries as the goal size or have a single value. * Function ranges can either be an array with as many entries as the goal size or have a single value. * In a goal, the target can either be an array with as many entries as the goal size or have a single value. * In a path goal, the target can also be a Timeseries whose values are either a 1-dimensional vector or have as many columns as the goal size. **Examples** Example definition of the point goal :math:`x(t) \geq 1.1` for :math:`t=1.0` at priority 1:: class MyGoal(Goal): def function(self, optimization_problem, ensemble_member): # State 'x' at time t = 1.0 t = 1.0 return optimization_problem.state_at('x', t, ensemble_member) function_range = (1.0, 2.0) target_min = 1.1 priority = 1 Example definition of the path goal :math:`x(t) \geq 1.1` for all :math:`t` at priority 2:: class MyPathGoal(Goal): def function(self, optimization_problem, ensemble_member): # State 'x' at any point in time return optimization_problem.state('x') function_range = (1.0, 2.0) target_min = 1.1 priority = 2 **Note path goals** Note that for path goals, the ensemble member index is not passed to the call to :func:`OptimizationProblem.state`. This call returns a time-independent symbol that is also independent of the active ensemble member. Path goals are applied to all times and all ensemble members simultaneously. """
[docs] @abstractmethod def function(self, optimization_problem: OptimizationProblem, ensemble_member: int) -> ca.MX: """ This method returns a CasADi :class:`MX` object describing the goal function. :returns: A CasADi :class:`MX` object. """ pass
#: Range of goal function function_range = (np.nan, np.nan) #: Nominal value of function (used for scaling) function_nominal = 1.0 #: Desired lower bound for goal function target_min = np.nan #: Desired upper bound for goal function target_max = np.nan #: Lower priority goals take precedence over higher priority goals. priority = 1 #: Goals with the same priority are weighted off against each other in a #: single objective function. weight = 1.0 #: The goal violation value is taken to the order'th power in the objective #: function. order = 2 #: The size of the goal if it's a vector goal. size = 1 #: Critical goals must always be fully satisfied. critical = False #: Absolute relaxation applied to the optimized values of this goal relaxation = 0.0 #: Timeseries ID for function value data (optional) function_value_timeseries_id = None #: Timeseries ID for goal violation data (optional) violation_timeseries_id = None @property def has_target_min(self) -> bool: """ ``True`` if the user goal has min bounds. """ if isinstance(self.target_min, Timeseries): return True else: return np.any(np.isfinite(self.target_min)) @property def has_target_max(self) -> bool: """ ``True`` if the user goal has max bounds. """ if isinstance(self.target_max, Timeseries): return True else: return np.any(np.isfinite(self.target_max)) @property def has_target_bounds(self) -> bool: """ ``True`` if the user goal has min/max bounds. """ return self.has_target_min or self.has_target_max @property def is_empty(self) -> bool: target_min_set = isinstance(self.target_min, Timeseries) or np.any( np.isfinite(self.target_min) ) target_max_set = isinstance(self.target_max, Timeseries) or np.any( np.isfinite(self.target_max) ) if not target_min_set and not target_max_set: # A minimization goal return False target_min = self.target_min if isinstance(target_min, Timeseries): target_min = target_min.values target_max = self.target_max if isinstance(target_max, Timeseries): target_max = target_max.values min_empty = not np.any(np.isfinite(target_min)) max_empty = not np.any(np.isfinite(target_max)) return min_empty and max_empty
[docs] def get_function_key( self, optimization_problem: OptimizationProblem, ensemble_member: int ) -> str: """ Returns a key string uniquely identifying the goal function. This is used to eliminate linearly dependent constraints from the optimization problem. """ if hasattr(self, "function_key"): return self.function_key # This must be deterministic. See RTCTOOLS-485. if not hasattr(Goal, "_function_key_counter"): Goal._function_key_counter = 0 self.function_key = f"{self.__class__.__name__}_{Goal._function_key_counter}" Goal._function_key_counter += 1 return self.function_key
def __repr__(self) -> str: return ( f"{self.__class__}(priority={self.priority}, target_min={self.target_min}, " f"target_max={self.target_max}, function_range={self.function_range})" )
[docs] class StateGoal(Goal): r""" Base class for lexicographic goal programming path goals that act on a single model state. A state goal is defined by setting at least the ``state`` class variable. :cvar state: State on which the goal acts. *Required*. :cvar target_min: Desired lower bound for goal function. Default is ``numpy.nan``. :cvar target_max: Desired upper bound for goal function. Default is ``numpy.nan``. :cvar priority: Integer priority of goal. Default is ``1``. :cvar weight: Optional weighting applied to the goal. Default is ``1.0``. :cvar order: Penalization order of goal violation. Default is ``2``. :cvar critical: If ``True``, the algorithm will abort if this goal cannot be fully met. Default is ``False``. Example definition of the goal :math:`x(t) \geq 1.1` for all :math:`t` at priority 2:: class MyStateGoal(StateGoal): state = 'x' target_min = 1.1 priority = 2 Contrary to ordinary ``Goal`` objects, ``PathGoal`` objects need to be initialized with an ``OptimizationProblem`` instance to allow extraction of state metadata, such as bounds and nominal values. Consequently, state goals must be instantiated as follows:: my_state_goal = MyStateGoal(optimization_problem) Note that ``StateGoal`` is a helper class. State goals can also be defined using ``Goal`` as direct base class, by implementing the ``function`` method and providing the ``function_range`` and ``function_nominal`` class variables manually. """ #: The state on which the goal acts. state = None
[docs] def __init__(self, optimization_problem): """ Initialize the state goal object. :param optimization_problem: ``OptimizationProblem`` instance. """ # Check whether a state has been specified if self.state is None: raise Exception("Please specify a state.") # Extract state range from model if self.has_target_bounds: try: if optimization_problem.ensemble_specific_bounds: bounds = optimization_problem.bounds(0) bounds_state_ref = bounds[self.state] if np.array_equal(self.function_range, (np.nan, np.nan), equal_nan=True): # If the user has not set the function range themselves, we # try and set it automatically. This is only possible if the # bounds are the same for all ensemble members. for ensemble_member in range(optimization_problem.ensemble_size): bounds_state_ensemble = optimization_problem.bounds(ensemble_member)[ self.state ] # First, check if the types are equal, and then check if the values are # equal. For Timeseries and floats, we can do `==` comparison, for # arrays we need to use np.all. To simplify we wrap the `==` for floats # in an `np.all` as well. if type(bounds_state_ref) is not type( bounds_state_ensemble ) or not np.all(bounds_state_ref == bounds_state_ensemble): raise ValueError( f"Bounds for state {self.state} are not the same for all " f"ensemble members; please set the function_range explicitly" ) else: bounds = optimization_problem.bounds() self.function_range = bounds[self.state] except KeyError: raise Exception(f"State {self.state} has no bounds or does not exist in the model.") if self.function_range[0] is None: raise Exception(f"Please provide a lower bound for state {self.state}.") if self.function_range[1] is None: raise Exception(f"Please provide an upper bound for state {self.state}.") # Extract state nominal from model self.function_nominal = optimization_problem.variable_nominal(self.state) # Set function key canonical, sign = optimization_problem.alias_relation.canonical_signed(self.state) self.function_key = canonical if sign > 0.0 else "-" + canonical
def function(self, optimization_problem, ensemble_member): return optimization_problem.state(self.state) def __repr__(self): return ( f"{self.__class__}(priority={self.priority}, state={self.state}, " f"target_min={self.target_min}, target_max={self.target_max}, " f"function_range={self.function_range})" )
class _GoalConstraint: def __init__( self, goal: Goal, function: Callable[[OptimizationProblem], ca.MX], m: float | np.ndarray | Timeseries, M: float | np.ndarray | Timeseries, optimized: bool, ): assert isinstance(m, (float, np.ndarray, Timeseries)) assert isinstance(M, (float, np.ndarray, Timeseries)) assert type(m) is type(M) # NumPy arrays only allowed for vector goals if isinstance(m, np.ndarray): assert len(m) == goal.size assert len(M) == goal.size self.goal = goal self.function = function self.min = m self.max = M self.optimized = optimized def update_bounds(self, other, enforce="self"): # NOTE: a.update_bounds(b) is _not_ the same as b.update_bounds(a). # See how the 'enforce' parameter is used. min_, max_ = self.min, self.max other_min, other_max = other.min, other.max if isinstance(min_, Timeseries): assert isinstance(max_, Timeseries) assert isinstance(other_min, Timeseries) assert isinstance(other_max, Timeseries) min_ = min_.values max_ = max_.values other_min = other_min.values other_max = other_max.values min_ = np.maximum(min_, other_min) max_ = np.minimum(max_, other_max) # Ensure new constraint bounds do not loosen or shift # previous bounds due to numerical errors. if enforce == "self": min_ = np.minimum(max_, other_min) max_ = np.maximum(min_, other_max) else: min_ = np.minimum(min_, other_max) max_ = np.maximum(max_, other_min) # Ensure consistency of bounds. Bounds may become inconsistent due to # small numerical computation errors. min_ = np.minimum(min_, max_) if isinstance(self.min, Timeseries): self.min = Timeseries(self.min.times, min_) self.max = Timeseries(self.max.times, max_) else: self.min = min_ self.max = max_ class _GoalProgrammingMixinBase(OptimizationProblem, metaclass=ABCMeta): def _gp_n_objectives(self, subproblem_objectives, subproblem_path_objectives, ensemble_member): return ( ca.vertcat(*[o(self, ensemble_member) for o in subproblem_objectives]).size1() + ca.vertcat(*[o(self, ensemble_member) for o in subproblem_path_objectives]).size1() ) def _gp_objective(self, subproblem_objectives, n_objectives, ensemble_member): if len(subproblem_objectives) > 0: acc_objective = ca.sum1( ca.vertcat(*[o(self, ensemble_member) for o in subproblem_objectives]) ) if self.goal_programming_options()["scale_by_problem_size"]: acc_objective = acc_objective / n_objectives return acc_objective else: return ca.MX(0) def _gp_path_objective(self, subproblem_path_objectives, n_objectives, ensemble_member): if len(subproblem_path_objectives) > 0: acc_objective = ca.sum1( ca.vertcat(*[o(self, ensemble_member) for o in subproblem_path_objectives]) ) if self.goal_programming_options()["scale_by_problem_size"]: # Objective is already divided by number of active time steps # at this point when `scale_by_problem_size` is set. acc_objective = acc_objective / n_objectives return acc_objective else: return ca.MX(0) @abstractmethod def goal_programming_options(self) -> dict[str, float | bool]: raise NotImplementedError() def goals(self) -> list[Goal]: """ User problem returns list of :class:`Goal` objects. :returns: A list of goals. """ return [] def path_goals(self) -> list[Goal]: """ User problem returns list of path :class:`Goal` objects. :returns: A list of path goals. """ return [] def _gp_min_max_arrays(self, g, target_shape=None): """ Broadcasts the goal target minimum and target maximum to arrays of a desired target shape. Depending on whether g is a vector goal or not, the output shape differs: - A 2-D array of size (goal.size, target_shape or 1) if the goal size is larger than one, i.e. a vector goal - A 1-D array of size (target_shape or 1, ) otherwise """ times = self.times() m, M = None, None if isinstance(g.target_min, Timeseries): m = self.interpolate(times, g.target_min.times, g.target_min.values, -np.inf, -np.inf) if m.ndim > 1: m = m.transpose() elif isinstance(g.target_min, np.ndarray) and target_shape: m = np.broadcast_to(g.target_min, (target_shape, g.size)).transpose() elif target_shape: m = np.full(target_shape, g.target_min) else: m = np.array([g.target_min]).transpose() if isinstance(g.target_max, Timeseries): M = self.interpolate(times, g.target_max.times, g.target_max.values, np.inf, np.inf) if M.ndim > 1: M = M.transpose() elif isinstance(g.target_max, np.ndarray) and target_shape: M = np.broadcast_to(g.target_max, (target_shape, g.size)).transpose() elif target_shape: M = np.full(target_shape, g.target_max) else: M = np.array([g.target_max]).transpose() if g.size > 1 and m.ndim == 1: m = np.broadcast_to(m, (g.size, len(m))) if g.size > 1 and M.ndim == 1: M = np.broadcast_to(M, (g.size, len(M))) if g.size > 1: assert m.shape == (g.size, 1 if target_shape is None else target_shape) else: assert m.shape == (1 if target_shape is None else target_shape,) assert m.shape == M.shape return m, M def _gp_validate_goals(self, goals, is_path_goal): goals = sorted(goals, key=lambda x: x.priority) options = self.goal_programming_options() # Validate goal definitions for goal in goals: m, M = goal.function_range # The function range should not be a symbolic expression if isinstance(m, ca.MX): assert m.is_constant() if m.size1() == 1: m = float(m) else: m = np.array(m.to_DM()) if isinstance(M, ca.MX): assert M.is_constant() if M.size1() == 1: M = float(M) else: M = np.array(M.to_DM()) assert isinstance(m, (float, int, np.ndarray)) assert isinstance(M, (float, int, np.ndarray)) if np.any(goal.function_nominal <= 0): raise Exception(f"Nonpositive nominal value specified for goal {goal}") if goal.critical and not goal.has_target_bounds: raise Exception("Minimization goals cannot be critical") if goal.critical: # Allow a function range for backwards compatibility reasons. # Maybe raise a warning that its not actually used? pass elif goal.has_target_bounds: if not np.all(np.isfinite(m)) or not np.all(np.isfinite(M)): raise Exception(f"No function range specified for goal {goal}") if np.any(m >= M): raise Exception(f"Invalid function range for goal {goal}") if goal.weight <= 0: raise Exception(f"Goal weight should be positive for goal {goal}") else: if goal.function_range != (np.nan, np.nan): raise Exception(f"Specifying function range not allowed for goal {goal}") if not is_path_goal: if isinstance(goal.target_min, Timeseries): raise Exception(f"Target min cannot be a Timeseries for goal {goal}") if isinstance(goal.target_max, Timeseries): raise Exception(f"Target max cannot be a Timeseries for goal {goal}") try: int(goal.priority) except ValueError: raise Exception(f"Priority of not int or castable to int for goal {goal}") if options["keep_soft_constraints"]: if goal.relaxation != 0.0: raise Exception( f"Relaxation not allowed with `keep_soft_constraints` for goal {goal}" ) if goal.violation_timeseries_id is not None: raise Exception( "Violation timeseries id not allowed with " f"`keep_soft_constraints` for goal {goal}" ) else: if goal.size > 1: raise Exception( f"Option `keep_soft_constraints` needs to be set for vector goal {goal}" ) if goal.critical and goal.size > 1: raise Exception(f"Vector goal cannot be critical for goal {goal}") if is_path_goal: target_shape = len(self.times()) else: target_shape = None # Check consistency and monotonicity of goals. Scalar target min/max # of normal goals are also converted to arrays to unify checks with # path goals. if options["check_monotonicity"]: for e in range(self.ensemble_size): # Store the previous goal of a certain function key we # encountered, such that we can compare to it. fk_goal_map = {} for goal in goals: fk = goal.get_function_key(self, e) prev = fk_goal_map.get(fk) fk_goal_map[fk] = goal if prev is not None: goal_m, goal_M = self._gp_min_max_arrays(goal, target_shape) other_m, other_M = self._gp_min_max_arrays(prev, target_shape) indices = np.where( np.logical_not(np.logical_or(np.isnan(goal_m), np.isnan(other_m))) ) if goal.has_target_min: if np.any(goal_m[indices] < other_m[indices]): raise Exception( f"Target minimum of goal {goal} must be greater or equal than " f"target minimum of goal {prev}." ) indices = np.where( np.logical_not(np.logical_or(np.isnan(goal_M), np.isnan(other_M))) ) if goal.has_target_max: if np.any(goal_M[indices] > other_M[indices]): raise Exception( f"Target maximum of goal {goal} must be less or equal than " f"target maximum of goal {prev}" ) for goal in goals: goal_m, goal_M = self._gp_min_max_arrays(goal, target_shape) goal_lb = np.broadcast_to(goal.function_range[0], goal_m.shape[::-1]).transpose() goal_ub = np.broadcast_to(goal.function_range[1], goal_M.shape[::-1]).transpose() if goal.has_target_min and goal.has_target_max: indices = np.where( np.logical_not(np.logical_or(np.isnan(goal_m), np.isnan(goal_M))) ) if np.any(goal_m[indices] > goal_M[indices]): raise Exception(f"Target minimum exceeds target maximum for goal {goal}") if goal.has_target_min and not goal.critical: indices = np.where(np.isfinite(goal_m)) if np.any(goal_m[indices] <= goal_lb[indices]): raise Exception( "Target minimum should be greater than the lower bound " f"of the function range for goal {goal}" ) if np.any(goal_m[indices] > goal_ub[indices]): raise Exception( "Target minimum should not be greater than the upper bound " f"of the function range for goal {goal}" ) if goal.has_target_max and not goal.critical: indices = np.where(np.isfinite(goal_M)) if np.any(goal_M[indices] >= goal_ub[indices]): raise Exception( "Target maximum should be smaller than the upper bound " f"of the function range for goal {goal}" ) if np.any(goal_M[indices] < goal_lb[indices]): raise Exception( "Target maximum should not be smaller than the lower bound " f"of the function range for goal {goal}" ) if goal.relaxation < 0.0: raise Exception(f"Relaxation of goal {goal} should be a nonnegative value") def _gp_goal_constraints(self, goals, sym_index, options, is_path_goal): """ There are three ways in which a goal turns into objectives/constraints: 1. A goal with target bounds results in a part for the objective (the violation variable), and 1 or 2 constraints (target min, max, or both). 2. A goal without target bounds (i.e. minimization goal) results in just a part for the objective. 3. A critical goal results in just a (pair of) constraint(s). These are hard constraints, which need to be put in the constraint store to guarantee linear independence. """ epsilons = [] objectives = [] soft_constraints = [[] for ensemble_member in range(self.ensemble_size)] hard_constraints = [[] for ensemble_member in range(self.ensemble_size)] extra_constants = [] eps_format = "eps_{}_{}" min_format = "min_{}_{}" max_format = "max_{}_{}" if is_path_goal: eps_format = "path_" + eps_format min_format = "path_" + min_format max_format = "path_" + max_format for j, goal in enumerate(goals): if goal.critical: assert goal.size == 1, "Critical goals cannot be vector goals" epsilon = np.zeros(len(self.times()) if is_path_goal else 1) elif goal.has_target_bounds: epsilon = ca.MX.sym(eps_format.format(sym_index, j), goal.size) epsilons.append(epsilon) # Make symbols for the target bounds (if set) if goal.has_target_min: min_variable = min_format.format(sym_index, j) # NOTE: When using a vector goal, we want to be sure that its constraints # and objective end up _exactly_ equal to its non-vector equivalent. We # therefore have to get rid of any superfluous/trivial constraints that # would otherwise be generated by the vector goal. target_min_slice_inds = np.full(goal.size, True) if isinstance(goal.target_min, Timeseries): target_min = Timeseries(goal.target_min.times, goal.target_min.values) inds = np.logical_or( np.isnan(target_min.values), np.isneginf(target_min.values) ) target_min.values[inds] = -sys.float_info.max n_times = len(goal.target_min.times) target_min_slice_inds = ~np.all( np.broadcast_to(inds.transpose(), (goal.size, n_times)), axis=1 ) elif isinstance(goal.target_min, np.ndarray): target_min = goal.target_min.copy() inds = np.logical_or(np.isnan(target_min), np.isneginf(target_min)) target_min[inds] = -sys.float_info.max target_min_slice_inds = ~inds else: target_min = goal.target_min extra_constants.append((min_variable, target_min)) else: min_variable = None if goal.has_target_max: max_variable = max_format.format(sym_index, j) target_max_slice_inds = np.full(goal.size, True) if isinstance(goal.target_max, Timeseries): target_max = Timeseries(goal.target_max.times, goal.target_max.values) inds = np.logical_or( np.isnan(target_max.values), np.isposinf(target_max.values) ) target_max.values[inds] = sys.float_info.max n_times = len(goal.target_max.times) target_max_slice_inds = ~np.all( np.broadcast_to(inds.transpose(), (goal.size, n_times)), axis=1 ) elif isinstance(goal.target_max, np.ndarray): target_max = goal.target_max.copy() inds = np.logical_or(np.isnan(target_max), np.isposinf(target_max)) target_max[inds] = sys.float_info.max target_max_slice_inds = ~inds else: target_max = goal.target_max extra_constants.append((max_variable, target_max)) else: max_variable = None # Make objective for soft constraints and minimization goals if not goal.critical: if hasattr(goal, "_objective_func"): _objective_func = goal._objective_func elif goal.has_target_bounds: if is_path_goal and options["scale_by_problem_size"]: goal_m, goal_M = self._gp_min_max_arrays( goal, target_shape=len(self.times()) ) goal_active = np.isfinite(goal_m) | np.isfinite(goal_M) n_active = np.sum(goal_active.astype(int), axis=-1) # Avoid possible division by zero if goal is inactive n_active = np.maximum(n_active, 1) else: n_active = 1 def _objective_func( problem, ensemble_member, goal=goal, epsilon=epsilon, is_path_goal=is_path_goal, n_active=n_active, ): if is_path_goal: epsilon = problem.variable(epsilon.name()) else: epsilon = problem.extra_variable(epsilon.name(), ensemble_member) return goal.weight * ca.constpow(epsilon, goal.order) / n_active else: if is_path_goal and options["scale_by_problem_size"]: n_active = len(self.times()) else: n_active = 1 def _objective_func( problem, ensemble_member, goal=goal, is_path_goal=is_path_goal, n_active=n_active, ): f = goal.function(problem, ensemble_member) / goal.function_nominal return goal.weight * ca.constpow(f, goal.order) / n_active objectives.append(_objective_func) # Make constraints for goals with target bounds if goal.has_target_bounds: if goal.critical: for ensemble_member in range(self.ensemble_size): constraint = self._gp_goal_hard_constraint( goal, epsilon, None, ensemble_member, options, is_path_goal ) hard_constraints[ensemble_member].append(constraint) else: for ensemble_member in range(self.ensemble_size): # We use a violation variable formulation, with the violation # variables epsilon bounded between 0 and 1. def _soft_constraint_func( problem, target, bound, inds, goal=goal, epsilon=epsilon, ensemble_member=ensemble_member, is_path_constraint=is_path_goal, ): if is_path_constraint: target = problem.variable(target) eps = problem.variable(epsilon.name()) else: target = problem.parameters(ensemble_member)[target] eps = problem.extra_variable(epsilon.name(), ensemble_member) inds = inds.nonzero()[0].astype(int).tolist() f = goal.function(problem, ensemble_member) nominal = goal.function_nominal return ca.if_else( ca.fabs(target) < sys.float_info.max, (f - eps * (bound - target) - target) / nominal, 0.0, )[inds] if goal.has_target_min and np.any(target_min_slice_inds): _f = functools.partial( _soft_constraint_func, target=min_variable, bound=goal.function_range[0], inds=target_min_slice_inds, ) constraint = _GoalConstraint(goal, _f, 0.0, np.inf, False) soft_constraints[ensemble_member].append(constraint) if goal.has_target_max and np.any(target_max_slice_inds): _f = functools.partial( _soft_constraint_func, target=max_variable, bound=goal.function_range[1], inds=target_max_slice_inds, ) constraint = _GoalConstraint(goal, _f, -np.inf, 0.0, False) soft_constraints[ensemble_member].append(constraint) return epsilons, objectives, soft_constraints, hard_constraints, extra_constants def _gp_goal_hard_constraint( self, goal, epsilon, existing_constraint, ensemble_member, options, is_path_goal ): if not is_path_goal: epsilon = epsilon[:1] goal_m, goal_M = self._gp_min_max_arrays(goal, target_shape=epsilon.shape[0]) if goal.has_target_bounds: # We use a violation variable formulation, with the violation # variables epsilon bounded between 0 and 1. m, M = ( np.full_like(epsilon, -np.inf, dtype=np.float64), np.full_like(epsilon, np.inf, dtype=np.float64), ) # A function range does not have to be specified for critical # goals. Avoid multiplying with NaN in that case. if goal.has_target_min: m = ( epsilon * ((goal.function_range[0] - goal_m) if not goal.critical else 0.0) + goal_m - goal.relaxation ) / goal.function_nominal if goal.has_target_max: M = ( epsilon * ((goal.function_range[1] - goal_M) if not goal.critical else 0.0) + goal_M + goal.relaxation ) / goal.function_nominal if goal.has_target_min and goal.has_target_max: # Avoid comparing with NaN inds = ~(np.isnan(m) | np.isnan(M)) inds[inds] &= np.abs(m[inds] - M[inds]) < options["equality_threshold"] if np.any(inds): avg = 0.5 * (m + M) m[inds] = M[inds] = avg[inds] m[~np.isfinite(goal_m)] = -np.inf M[~np.isfinite(goal_M)] = np.inf inds = epsilon > options["violation_tolerance"] if np.any(inds): if is_path_goal: expr = self.map_path_expression( goal.function(self, ensemble_member), ensemble_member ) else: expr = goal.function(self, ensemble_member) function = ca.Function("f", [self.solver_input], [expr]) value = np.array(function(self.solver_output)) m[inds] = (value - goal.relaxation) / goal.function_nominal M[inds] = (value + goal.relaxation) / goal.function_nominal m -= options["constraint_relaxation"] M += options["constraint_relaxation"] else: # Epsilon encodes the position within the function range. if options["fix_minimized_values"] and goal.relaxation == 0.0: m = epsilon / goal.function_nominal M = epsilon / goal.function_nominal self.check_collocation_linearity = False self.linear_collocation = False else: m = -np.inf * np.ones(epsilon.shape) M = (epsilon + goal.relaxation) / goal.function_nominal + options[ "constraint_relaxation" ] if is_path_goal: m = Timeseries(self.times(), m) M = Timeseries(self.times(), M) else: m = m[0] M = M[0] constraint = _GoalConstraint( goal, lambda problem, ensemble_member=ensemble_member, goal=goal: ( goal.function(problem, ensemble_member) / goal.function_nominal ), m, M, True, ) # Epsilon is fixed. Override previous {min,max} constraints for this # state. if existing_constraint: constraint.update_bounds(existing_constraint, enforce="other") return constraint def _gp_update_constraint_store(self, constraint_store, constraints): for ensemble_member in range(self.ensemble_size): for other in constraints[ensemble_member]: fk = other.goal.get_function_key(self, ensemble_member) try: constraint_store[ensemble_member][fk].update_bounds(other) except KeyError: constraint_store[ensemble_member][fk] = other def priority_started(self, priority: int) -> None: """ Called when optimization for goals of certain priority is started. :param priority: The priority level that was started. """ self.skip_priority = False pass def priority_completed(self, priority: int) -> None: """ Called after optimization for goals of certain priority is completed. :param priority: The priority level that was completed. """ pass