Source code for rtctools.util

import cProfile
import logging
import os
import pstats
import re
import sys

import casadi

from . import __version__
from ._internal.alias_tools import OrderedSet
from .data import pi
from .optimization.csv_mixin import CSVMixin as OptimizationCSVMixin
from .optimization.io_mixin import IOMixin as OptimizationIOMixin
from .optimization.pi_mixin import PIMixin as OptimizationPIMixin
from .simulation.csv_mixin import CSVMixin as SimulationCSVMixin
from .simulation.io_mixin import IOMixin as SimulationIOMixin
from .simulation.pi_mixin import PIMixin as SimulationPIMixin


def _resolve_folder(kwargs, base_folder, subfolder_kw, default):
    subfolder = kwargs.pop(subfolder_kw, default)
    if os.path.isabs(subfolder):
        return subfolder
    else:
        return os.path.join(base_folder, subfolder)


[docs]def run_optimization_problem(optimization_problem_class, base_folder='..', log_level=logging.INFO, profile=False, **kwargs): """ Sets up and solves an optimization problem. This function makes the following assumptions: 1. That the ``base_folder`` contains subfolders ``input``, ``output``, and ``model``, containing input data, output data, and the model, respectively. 2. When using :class:`.CSVLookupTableMixin`, that the base folder contains a subfolder ``lookup_tables``. 3. When using :class:`.ModelicaMixin`, that the base folder contains a subfolder ``model``. 4. When using :class:`.ModelicaMixin`, that the toplevel Modelica model name equals the class name. :param optimization_problem_class: Optimization problem class to solve. :param base_folder: Base folder. :param log_level: The log level to use. :param profile: Whether or not to enable profiling. :returns: :class:`.OptimizationProblem` instance. """ # Do some checks on the problem class if issubclass(optimization_problem_class, SimulationCSVMixin): raise ValueError( "Optimization problem class cannot be derived from simulation.csv_mixin.CSVMixin.") if issubclass(optimization_problem_class, SimulationIOMixin): raise ValueError( "Optimization problem class cannot be derived from simulation.csv_mixin.IOMixin.") if issubclass(optimization_problem_class, SimulationPIMixin): raise ValueError( "Optimization problem class cannot be derived from simulation.csv_mixin.PIMixin.") # Set input/output folders if not os.path.isabs(base_folder): # Resolve base folder relative to script folder base_folder = os.path.join(sys.path[0], base_folder) model_folder = _resolve_folder(kwargs, base_folder, 'model_folder', 'model') input_folder = _resolve_folder(kwargs, base_folder, 'input_folder', 'input') output_folder = _resolve_folder(kwargs, base_folder, 'output_folder', 'output') # Set up logging logger = logging.getLogger("rtctools") # Add stream handler if it does not already exist. if not logger.hasHandlers() and not any((isinstance(h, logging.StreamHandler) for h in logger.handlers)): handler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) # Add pi.DiagHandler, if using PIMixin. Only add it if it does not already exist. if (issubclass(optimization_problem_class, OptimizationPIMixin) and not any((isinstance(h, pi.DiagHandler) for h in logger.handlers))): handler = pi.DiagHandler(output_folder) logger.addHandler(handler) # Set log level logger.setLevel(log_level) # Log version info logger.info( "Using RTC-Tools {}.".format(__version__)) logger.debug( "Using CasADi {}.".format(casadi.__version__)) # Check for some common mistakes in inheritance order suggested_order = OrderedSet([ 'HomotopyMixin', 'MinAbsGoalProgrammingMixin', 'LinearizedOrderGoalProgrammingMixin', 'SinglePassGoalProgrammingMixin', 'GoalProgrammingMixin', 'PIMixin', 'CSVMixin', 'ModelicaMixin', 'ControlTreeMixin', 'CollocatedIntegratedOptimizationProblem', 'OptimizationProblem']) base_names = OrderedSet([b.__name__ for b in optimization_problem_class.__bases__]) if suggested_order & base_names != base_names & suggested_order: msg = 'Please inherit from base classes in the following order: {}'.format(list(base_names & suggested_order)) logger.error(msg) raise Exception(msg) # Run try: prob = optimization_problem_class( model_folder=model_folder, input_folder=input_folder, output_folder=output_folder, **kwargs) if profile: filename = os.path.join(base_folder, "profile.prof") cProfile.runctx("prob.optimize()", globals(), locals(), filename) s = pstats.Stats(filename) s.strip_dirs().sort_stats("time").print_stats() else: prob.optimize() return prob except Exception as e: logger.error(str(e)) if isinstance(e, TypeError): exc_info = sys.exc_info() value = exc_info[1] try: failed_class = re.search( "Can't instantiate (.*) with abstract methods", str(value)).group(1) abstract_method = re.search( ' with abstract methods (.*)', str(value)).group(1) logger.error( 'The {} is missing a mixin. Please add a mixin that instantiates ' 'abstract method {}, so that the optimizer can run.'.format( failed_class, abstract_method)) except Exception: pass for handler in logger.handlers: handler.flush() raise
[docs]def run_simulation_problem(simulation_problem_class, base_folder='..', log_level=logging.INFO, **kwargs): """ Sets up and runs a simulation problem. :param simulation_problem_class: Simulation problem class to solve. :param base_folder: Folder within which subfolders "input", "output", and "model" exist, containing input and output data, and the model, respectively. :param log_level: The log level to use. :returns: :class:`SimulationProblem` instance. """ # Do some checks on the problem class if issubclass(simulation_problem_class, OptimizationCSVMixin): raise ValueError( "Simulation problem class cannot be derived from optimization.csv_mixin.CSVMixin.") if issubclass(simulation_problem_class, OptimizationIOMixin): raise ValueError( "Simulation problem class cannot be derived from optimization.csv_mixin.IOMixin.") if issubclass(simulation_problem_class, OptimizationPIMixin): raise ValueError( "Simulation problem class cannot be derived from optimization.csv_mixin.PIMixin.") # Set input/output folders if base_folder is None: # Check command line arguments if len(sys.argv) != 2: raise Exception("Usage: {} BASE_FOLDER".format(sys.argv[0])) base_folder = sys.argv[1] else: if not os.path.isabs(base_folder): # Resolve base folder relative to script folder base_folder = os.path.join(sys.path[0], base_folder) model_folder = _resolve_folder(kwargs, base_folder, 'model_folder', 'model') input_folder = _resolve_folder(kwargs, base_folder, 'input_folder', 'input') output_folder = _resolve_folder(kwargs, base_folder, 'output_folder', 'output') # Set up logging logger = logging.getLogger("rtctools") if not logger.hasHandlers() and not any((isinstance(h, logging.StreamHandler) for h in logger.handlers)): handler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') handler.setFormatter(formatter) logger.addHandler(handler) # Add pi.DiagHandler, if using PIMixin. Only add it if it does not already exist. if (issubclass(simulation_problem_class, SimulationPIMixin) and not any((isinstance(h, pi.DiagHandler) for h in logger.handlers))): handler = pi.DiagHandler(output_folder) logger.addHandler(handler) logger.setLevel(log_level) logger.info( 'Using RTC-Tools {}'.format(__version__)) logger.debug( 'Using CasADi {}.'.format(casadi.__version__)) # Run prob = simulation_problem_class( model_folder=model_folder, input_folder=input_folder, output_folder=output_folder, **kwargs) prob.simulate() return prob