Source code for rtctools.optimization.homotopy_mixin

import logging
from typing import Dict, Union

from .optimization_problem import OptimizationProblem
from .timeseries import Timeseries

logger = logging.getLogger("rtctools")


[docs] class HomotopyMixin(OptimizationProblem): """ Adds homotopy to your optimization problem. A homotopy is a continuous transformation between two optimization problems, parametrized by a single parameter :math:`\\theta \\in [0, 1]`. Homotopy may be used to solve non-convex optimization problems, by starting with a convex approximation at :math:`\\theta = 0.0` and ending with the non-convex problem at :math:`\\theta = 1.0`. .. note:: It is advised to look for convex reformulations of your problem, before resorting to a use of the (potentially expensive) homotopy process. """ def seed(self, ensemble_member): seed = super().seed(ensemble_member) options = self.homotopy_options() # Overwrite the seed only when the results of the latest run are # stored within this class. That is, when the GoalProgrammingMixin # class is not used or at the first run of the goal programming loop. if self.__theta > options["theta_start"] and getattr(self, "_gp_first_run", True): for key, result in self.__results[ensemble_member].items(): times = self.times(key) if (result.ndim == 1 and len(result) == len(times)) or ( result.ndim == 2 and result.shape[0] == len(times) ): # Only include seed timeseries which are consistent # with the specified time stamps. seed[key] = Timeseries(times, result) elif (result.ndim == 1 and len(result) == 1) or ( result.ndim == 2 and result.shape[0] == 1 ): seed[key] = result return seed def parameters(self, ensemble_member): parameters = super().parameters(ensemble_member) options = self.homotopy_options() try: # Only set the theta if we are in the optimization loop. We want # to avoid accidental usage of the parameter value in e.g. pre(). # Note that we use a try-except here instead of hasattr, to avoid # explicit name mangling. parameters[options["homotopy_parameter"]] = self.__theta except AttributeError: pass return parameters
[docs] def homotopy_options(self) -> Dict[str, Union[str, float]]: """ Returns a dictionary of options controlling the homotopy process. +------------------------+------------+---------------+ | Option | Type | Default value | +========================+============+===============+ | ``theta_start`` | ``float`` | ``0.0`` | +------------------------+------------+---------------+ | ``delta_theta_0`` | ``float`` | ``1.0`` | +------------------------+------------+---------------+ | ``delta_theta_min`` | ``float`` | ``0.01`` | +------------------------+------------+---------------+ | ``homotopy_parameter`` | ``string`` | ``theta`` | +------------------------+------------+---------------+ The homotopy process is controlled by the homotopy parameter in the model, specified by the option ``homotopy_parameter``. The homotopy parameter is initialized to ``theta_start``, and increases to a value of ``1.0`` with a dynamically changing step size. This step size is initialized with the value of the option ``delta_theta_0``. If this step size is too large, i.e., if the problem with the increased homotopy parameter fails to converge, the step size is halved. The process of halving terminates when the step size falls below the minimum value specified by the option ``delta_theta_min``. :returns: A dictionary of homotopy options. """ return { "theta_start": 0.0, "delta_theta_0": 1.0, "delta_theta_min": 0.01, "homotopy_parameter": "theta", }
def dynamic_parameters(self): dynamic_parameters = super().dynamic_parameters() if self.__theta > 0: # For theta = 0, we don't mark the homotopy parameter as being dynamic, # so that the correct sparsity structure is obtained for the linear model. options = self.homotopy_options() dynamic_parameters.append(self.variable(options["homotopy_parameter"])) return dynamic_parameters def optimize(self, preprocessing=True, postprocessing=True, log_solver_failure_as_error=True): # Pre-processing if preprocessing: self.pre() options = self.homotopy_options() delta_theta = options["delta_theta_0"] # Homotopy loop self.__theta = options["theta_start"] while self.__theta <= 1.0: logger.info("Solving with homotopy parameter theta = {}.".format(self.__theta)) success = super().optimize( preprocessing=False, postprocessing=False, log_solver_failure_as_error=False ) if success: self.__results = [ self.extract_results(ensemble_member) for ensemble_member in range(self.ensemble_size) ] if self.__theta == 0.0: self.check_collocation_linearity = False self.linear_collocation = False # Recompute the sparsity structure for the nonlinear model family. self.clear_transcription_cache() else: if self.__theta == options["theta_start"]: break self.__theta -= delta_theta delta_theta /= 2 if delta_theta < options["delta_theta_min"]: failure_message = ( "Solver failed with homotopy parameter theta = {}. Theta cannot " "be decreased further, as that would violate the minimum delta " "theta of {}.".format(self.__theta, options["delta_theta_min"]) ) if log_solver_failure_as_error: logger.error(failure_message) else: # In this case we expect some higher level process to deal # with the solver failure, so we only log it as info here. logger.info(failure_message) break self.__theta += delta_theta # Post-processing if postprocessing: self.post() return success