Source code for rtctools.optimization.min_abs_goal_programming_mixin

import functools
import itertools
from typing import List

import casadi as ca
import numpy as np

from .goal_programming_mixin import GoalProgrammingMixin
from .goal_programming_mixin_base import (
    Goal,
    StateGoal,
    _EmptyEnsembleList,
    _EmptyEnsembleOrderedDict,
    _GoalConstraint,
    _GoalProgrammingMixinBase,
)
from .single_pass_goal_programming_mixin import SinglePassGoalProgrammingMixin
from .timeseries import Timeseries


class MinAbsGoal(Goal):
    """
    Goal whose (linear) goal function is minimized in absolute value.

    Unlike its super class, this goal defaults to ``order = 1``, because
    absolute minimization is typically desired for fully linear problems.
    """

    # First-order by default; see the class docstring for the rationale.
    order = 1
class MinAbsStateGoal(StateGoal, MinAbsGoal):
    """
    Absolute minimization goal that combines :py:class:`StateGoal` with
    :py:class:`MinAbsGoal`. It adds no members of its own.
    """
class _ConvertedMinAbsGoal(Goal):
    """
    Internal replacement goal whose goal function is the auxiliary variable
    created for an absolute minimization goal.

    The original goal is kept in ``orig_goal``; its size, weight, priority
    and (nominal-scaled) relaxation are copied onto this goal.
    """

    order = 1

    def __init__(self, abs_variable, is_path_goal, orig_goal):
        """
        :param abs_variable: Symbol of the auxiliary variable (must provide ``name()``).
        :param is_path_goal: Whether the auxiliary variable is a path variable
            (``True``) or an extra variable (``False``).
        :param orig_goal: The goal this one was converted from.
        """
        self.abs_variable = abs_variable
        self.is_path_goal = is_path_goal
        self.orig_goal = orig_goal

        # Mirror the properties of the goal we are standing in for. The
        # relaxation is divided by the original goal's function nominal.
        self.size = orig_goal.size
        self.weight = orig_goal.weight
        self.relaxation = orig_goal.relaxation / orig_goal.function_nominal
        self.priority = orig_goal.priority

    def function(self, optimization_problem, ensemble_member):
        """
        Return the auxiliary variable as looked up on ``optimization_problem``.

        Path goals are looked up with ``variable()``, all other goals with
        ``extra_variable()`` for the given ensemble member.
        """
        variable_name = self.abs_variable.name()
        if self.is_path_goal:
            return optimization_problem.variable(variable_name)
        return optimization_problem.extra_variable(variable_name, ensemble_member)
class MinAbsGoalProgrammingMixin(_GoalProgrammingMixinBase):
    """
    Similar behavior to :py:class:`.GoalProgrammingMixin`, but any
    :py:class:`MinAbsGoal` returned by :py:meth:`.min_abs_goals` or
    :py:meth:`.min_abs_path_goals` will be automatically converted to:

    1. An auxiliary minimization variable
    2. Two additional linear constraints relating the auxiliary variable to the goal function
    3. A new goal (of a different type) minimizing the auxiliary variable
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Currently active auxiliary constraints and variables for the
        # absolute minimization goals. Placeholders until optimize() runs.
        self.__problem_constraints = _EmptyEnsembleList()
        self.__problem_vars = []
        self.__problem_path_constraints = _EmptyEnsembleList()
        self.__problem_path_vars = []

        # Seeds for the auxiliary variables of the current priority.
        self.__seeds = _EmptyEnsembleOrderedDict()
        self.__path_seeds = _EmptyEnsembleOrderedDict()

        # True until the first call to priority_started() has completed.
        self.__first_run = True

    @property
    def extra_variables(self):
        """Extra variables of the parent, plus the active auxiliary variables."""
        return super().extra_variables + self.__problem_vars

    @property
    def path_variables(self):
        """Path variables of the parent, plus the active auxiliary path variables."""
        return super().path_variables + self.__problem_path_vars

    def bounds(self):
        """Return the parent's bounds, with every active auxiliary variable bounded to [0, inf)."""
        bounds = super().bounds()
        for abs_var in self.__problem_vars + self.__problem_path_vars:
            bounds[abs_var.name()] = (0.0, np.inf)
        return bounds

    def seed(self, ensemble_member):
        """
        Return the parent's seed, extended with seeds for the auxiliary
        variables of the current priority.

        Path seeds are wrapped in a :py:class:`Timeseries` over ``self.times()``.
        """
        seed = super().seed(ensemble_member)

        # Seed minimization variables of current priority (not those of
        # previous priorities, as those are handled by GoalProgrammingMixin).
        for abs_var, val in self.__seeds[ensemble_member].items():
            seed[abs_var] = val

        times = self.times()
        for abs_var, val in self.__path_seeds[ensemble_member].items():
            seed[abs_var] = Timeseries(times, val)

        return seed

    def constraints(self, ensemble_member):
        """Return the parent's constraints plus the active auxiliary constraints."""
        constraints = super().constraints(ensemble_member)

        for constraint in self.__problem_constraints[ensemble_member]:
            constraints.append((constraint.function(self), constraint.min, constraint.max))

        return constraints

    def path_constraints(self, ensemble_member):
        """Return the parent's path constraints plus the active auxiliary path constraints."""
        path_constraints = super().path_constraints(ensemble_member)

        for constraint in self.__problem_path_constraints[ensemble_member]:
            path_constraints.append((constraint.function(self), constraint.min, constraint.max))

        return path_constraints

    def __validate_goals(self, goals, is_path_goal):
        """
        Check that every goal is a usable absolute minimization goal.

        :param goals: Goals to validate.
        :param is_path_goal: Not used by the checks; kept for a uniform call signature.
        :raises Exception: If a goal is not a :py:class:`MinAbsGoal`, has a
            function range set, has an order other than 1, or has a
            non-positive weight.
        """
        # Sorting only determines which offending goal is reported first.
        goals = sorted(goals, key=lambda x: x.priority)

        for goal in goals:
            if not isinstance(goal, MinAbsGoal):
                raise Exception(
                    "Absolute goal not an instance of MinAbsGoal for goal {}".format(goal)
                )

            if goal.function_range != (np.nan, np.nan):
                raise Exception(
                    "Absolute goal function is only allowed for minimization for goal {}".format(
                        goal
                    )
                )

            if goal.order != 1:
                raise Exception(
                    "Absolute goal function is only allowed for order = 1 for goal {}".format(goal)
                )

            if goal.weight <= 0:
                raise Exception(
                    "Absolute goal function is only allowed for weight > 0 for goal {}".format(goal)
                )

    @staticmethod
    def __convert_goals(goals, sym_index, ensemble_size, is_path_goal):
        """
        Replace absolute minimization goals with goals on auxiliary variables.

        :param goals: List of :py:class:`MinAbsGoal` objects (not modified).
        :param sym_index: Index used in the auxiliary variable names.
        :param ensemble_size: Number of ensemble members to build constraints for.
        :param is_path_goal: Whether the goals are path goals.
        :returns: Tuple ``(converted_goals, constraints, variables)``, where
            ``constraints`` holds one list of :py:class:`_GoalConstraint`
            objects per ensemble member, and ``variables`` holds the
            auxiliary symbols.
        """
        # Replace absolute minimization goals with a new goal, and some
        # additional hard constraints.
        constraints = [[] for ensemble_member in range(ensemble_size)]
        variables = []

        # It is easier to modify goals in place, but we do not want to modify
        # the original input list of goals. Make a copy to work with and
        # return when we are done.
        goals = goals.copy()

        for j, goal in enumerate(goals):
            assert isinstance(goal, MinAbsGoal)

            abs_variable_name = "abs_{}_{}".format(sym_index, j)
            if is_path_goal:
                abs_variable_name = "path_" + abs_variable_name

            abs_variable = ca.MX.sym(abs_variable_name, goal.size)
            variables.append(abs_variable)

            # Set constraints on how the additional variable relates to the
            # original goal function, such that it corresponds to its absolute
            # value when minimizing.
            for ensemble_member in range(ensemble_size):

                # Loop variables are bound as defaults so that each closure
                # keeps the values of its own iteration.
                def _constraint_func(
                    problem,
                    sign,
                    abs_variable=abs_variable,
                    ensemble_member=ensemble_member,
                    goal=goal,
                    is_path_goal=is_path_goal,
                ):
                    if is_path_goal:
                        abs_variable = problem.variable(abs_variable.name())
                    else:
                        abs_variable = problem.extra_variable(abs_variable.name(), ensemble_member)

                    return (
                        abs_variable
                        + sign * goal.function(problem, ensemble_member) / goal.function_nominal
                    )

                _pos = functools.partial(_constraint_func, sign=1)
                _neg = functools.partial(_constraint_func, sign=-1)

                constraints[ensemble_member].append(_GoalConstraint(None, _pos, 0.0, np.inf, False))
                constraints[ensemble_member].append(_GoalConstraint(None, _neg, 0.0, np.inf, False))

            # Overwrite the original goal, such that it is just a minimization
            # of the additional variable.
            goals[j] = _ConvertedMinAbsGoal(abs_variable, is_path_goal, goal)

        return goals, constraints, variables

    def __calculate_seed(self, goals, is_path_goal):
        """
        Evaluate the original goal functions at the last solver output and
        return their absolute values as seeds for the auxiliary variables.

        :param goals: Converted goals (:py:class:`_ConvertedMinAbsGoal`).
        :param is_path_goal: Whether the goals are path goals.
        :returns: One ``{variable name: seed value}`` dictionary per ensemble member.
        """
        assert self.__first_run is False

        seed = [{} for ensemble_member in range(self.ensemble_size)]

        for goal in goals:
            assert isinstance(goal, _ConvertedMinAbsGoal)

            for ensemble_member in range(self.ensemble_size):
                if is_path_goal:
                    expr = self.map_path_expression(
                        goal.orig_goal.function(self, ensemble_member), ensemble_member
                    )
                else:
                    expr = goal.orig_goal.function(self, ensemble_member)

                function = ca.Function("f", [self.solver_input], [expr])
                value = np.array(function(self.solver_output))

                assert value.ndim == 2

                # Scalar goals: flatten to 1-D for path goals, to a plain
                # scalar otherwise.
                if goal.size == 1:
                    if is_path_goal:
                        value = value.ravel()
                    else:
                        value = value.item()

                # NOTE(review): the constraints relate the auxiliary variable
                # to function / function_nominal, while this seed is the
                # unscaled absolute value -- confirm this is intended.
                seed[ensemble_member][goal.abs_variable.name()] = np.abs(value)

        return seed

    def optimize(self, preprocessing=True, **kwargs):
        """
        Convert all absolute minimization goals and run the parent's optimize.

        :param preprocessing: Whether to call ``self.pre()`` first. The
            parent's ``optimize`` is always called with ``preprocessing=False``.
        :returns: Whatever the parent's ``optimize`` returns.
        """
        # Do pre-processing
        if preprocessing:
            self.pre()

        goals = self.min_abs_goals()
        path_goals = self.min_abs_path_goals()

        # Validate goal definitions
        self.__validate_goals(goals, is_path_goal=False)
        self.__validate_goals(path_goals, is_path_goal=True)

        # Reset per-run state, so that a repeated call to optimize() behaves
        # like the first one. Without this, priority_started() would treat the
        # first priority of a new run as a later one, and goals()/path_goals()
        # would still report the converted goals of the previous run.
        self.__first_run = True
        self.__seeds = _EmptyEnsembleOrderedDict()
        self.__path_seeds = _EmptyEnsembleOrderedDict()
        self.__converted_goals = []
        self.__converted_path_goals = []

        # List for absolute minimization goals. These will be incrementally
        # filled only just before we need them to.
        self.__problem_constraints = [[] for ensemble_member in range(self.ensemble_size)]
        self.__problem_vars = []
        self.__problem_path_constraints = [[] for ensemble_member in range(self.ensemble_size)]
        self.__problem_path_vars = []

        # Similar to the above, but these keep track of all auxiliary
        # variables and constraints of priorities we have had (and therefore
        # activated), and those yet to come (and have not yet activated).
        self.__subproblem_constraints = {}
        self.__subproblem_vars = {}
        self.__subproblem_abs_goals = {}
        self.__subproblem_path_constraints = {}
        self.__subproblem_path_vars = {}
        self.__subproblem_path_abs_goals = {}

        # We want to have consistent naming with GPMixin for our auxiliary
        # variables. We therefore need to loop over all priorities, regardless
        # of whether there are any MinAbsGoals in it or not.
        priorities = {
            int(goal.priority)
            for goal in itertools.chain(goals, path_goals, self.goals(), self.path_goals())
            if not goal.is_empty
        }

        subproblems = []
        for priority in sorted(priorities):
            subproblems.append(
                (
                    priority,
                    [
                        goal
                        for goal in goals
                        if int(goal.priority) == priority and not goal.is_empty
                    ],
                    [
                        goal
                        for goal in path_goals
                        if int(goal.priority) == priority and not goal.is_empty
                    ],
                )
            )

        # Rewrite absolute minimization goals.
        for i, (priority, goals, path_goals) in enumerate(subproblems):
            (
                goals,
                self.__subproblem_constraints[priority],
                self.__subproblem_vars[priority],
            ) = self.__convert_goals(goals, i, self.ensemble_size, False)
            self.__converted_goals.extend(goals)
            self.__subproblem_abs_goals[priority] = goals

            (
                path_goals,
                self.__subproblem_path_constraints[priority],
                self.__subproblem_path_vars[priority],
            ) = self.__convert_goals(path_goals, i, self.ensemble_size, True)
            self.__converted_path_goals.extend(path_goals)
            self.__subproblem_path_abs_goals[priority] = path_goals

        return super().optimize(**kwargs, preprocessing=False)

    def priority_started(self, priority):
        """
        Activate the auxiliary variables and constraints needed from
        ``priority`` onwards, and compute seeds for newly introduced variables.

        :raises Exception: If the instance is neither a
            :py:class:`GoalProgrammingMixin` nor a
            :py:class:`SinglePassGoalProgrammingMixin`.
        """
        super().priority_started(priority)

        # Enable constraints and auxiliary variables that we need starting
        # from this priority when using GoalProgrammingMixin. When using
        # SinglePassGoalProgrammingMixin, we need to add all constraints from
        # the start.
        if isinstance(self, GoalProgrammingMixin):
            priorities = [priority]
        elif isinstance(self, SinglePassGoalProgrammingMixin):
            if self.__first_run:
                priorities = self.__subproblem_constraints.keys()
            else:
                priorities = []
        else:
            # Previously this fell through with `priorities` unbound, which
            # surfaced as an UnboundLocalError further down.
            raise Exception(
                "MinAbsGoalProgrammingMixin has to be combined with GoalProgrammingMixin "
                "or SinglePassGoalProgrammingMixin"
            )

        for p in priorities:
            for a, b in zip(self.__problem_constraints, self.__subproblem_constraints[p]):
                a.extend(b)

            self.__problem_vars.extend(self.__subproblem_vars[p])

            for a, b in zip(self.__problem_path_constraints, self.__subproblem_path_constraints[p]):
                a.extend(b)

            self.__problem_path_vars.extend(self.__subproblem_path_vars[p])

        # Calculate the seed needed for goals/variables introduced in this
        # priority. We can only calculate a seed if this is not the first
        # priority.
        if not self.__first_run and isinstance(self, GoalProgrammingMixin):
            self.__seeds = self.__calculate_seed(self.__subproblem_abs_goals[priority], False)
            self.__path_seeds = self.__calculate_seed(
                self.__subproblem_path_abs_goals[priority], True
            )

        self.__first_run = False

    def min_abs_goals(self) -> List[MinAbsGoal]:
        """
        User problem returns list of :py:class:`MinAbsGoal` objects.

        :returns: A list of goals.
        """
        return []

    def goals(self):
        """Return the parent's goals, plus the converted goals once they exist."""
        goals = super().goals()
        try:
            return goals + self.__converted_goals
        except AttributeError:
            # optimize() has not created the converted goals yet.
            return goals

    def min_abs_path_goals(self) -> List[MinAbsGoal]:
        """
        User problem returns list of path :py:class:`MinAbsGoal` objects.

        :returns: A list of goals.
        """
        return []

    def path_goals(self):
        """Return the parent's path goals, plus the converted path goals once they exist."""
        goals = super().path_goals()
        try:
            return goals + self.__converted_path_goals
        except AttributeError:
            # optimize() has not created the converted path goals yet.
            return goals