Source code for rtctools.optimization.modelica_mixin

import itertools
import logging
from typing import Dict, Union

import casadi as ca

import numpy as np

import pkg_resources

import pymoca
import pymoca.backends.casadi.api

from rtctools._internal.alias_tools import AliasDict
from rtctools._internal.caching import cached
from rtctools._internal.casadi_helpers import substitute_in_external

from .optimization_problem import OptimizationProblem
from .timeseries import Timeseries

logger = logging.getLogger("rtctools")


[docs]class ModelicaMixin(OptimizationProblem): """ Adds a `Modelica <http://www.modelica.org/>`_ model to your optimization problem. During preprocessing, the Modelica files located inside the ``model`` subfolder are loaded. :cvar modelica_library_folders: Folders in which any referenced Modelica libraries are to be found. Default is an empty list. """ # Folders in which the referenced Modelica libraries are found modelica_library_folders = [] def __init__(self, **kwargs): # Check arguments assert ('model_folder' in kwargs) # Log pymoca version logger.debug("Using pymoca {}.".format(pymoca.__version__)) # Transfer model from the Modelica .mo file to CasADi using pymoca if 'model_name' in kwargs: model_name = kwargs['model_name'] else: if hasattr(self, 'model_name'): model_name = self.model_name else: model_name = self.__class__.__name__ self.__pymoca_model = pymoca.backends.casadi.api.transfer_model( kwargs['model_folder'], model_name, self.compiler_options()) # Extract the CasADi MX variables used in the model self.__mx = {} self.__mx['time'] = [self.__pymoca_model.time] self.__mx['states'] = [v.symbol for v in self.__pymoca_model.states] self.__mx['derivatives'] = [v.symbol for v in self.__pymoca_model.der_states] self.__mx['algebraics'] = [v.symbol for v in self.__pymoca_model.alg_states] self.__mx['parameters'] = [v.symbol for v in self.__pymoca_model.parameters] self.__mx['string_parameters'] = [v.name for v in (*self.__pymoca_model.string_parameters, *self.__pymoca_model.string_constants)] self.__mx['control_inputs'] = [] self.__mx['constant_inputs'] = [] self.__mx['lookup_tables'] = [] # Merge with user-specified delayed feedback for v in self.__pymoca_model.inputs: if v.symbol.name() in self.__pymoca_model.delay_states: # Delayed feedback variables are local to each ensemble, and # therefore belong to the collection of algebraic variables, # rather than to the control inputs. self.__mx['algebraics'].append(v.symbol) else: if v.symbol.name() in kwargs.get('lookup_tables', []): self.__mx['lookup_tables'].append(v.symbol) elif v.fixed: self.__mx['constant_inputs'].append(v.symbol) else: self.__mx['control_inputs'].append(v.symbol) # Initialize nominals and types # These are not in @cached dictionary properties for backwards compatibility. self.__python_types = AliasDict(self.alias_relation) for v in itertools.chain( self.__pymoca_model.states, self.__pymoca_model.alg_states, self.__pymoca_model.inputs): self.__python_types[v.symbol.name()] = v.python_type # Initialize dae, initial residuals, as well as delay arguments # These are not in @cached dictionary properties so that we need to create the list # of function arguments only once. variable_lists = ['states', 'der_states', 'alg_states', 'inputs', 'constants', 'parameters'] function_arguments = [self.__pymoca_model.time] + [ ca.veccat(*[v.symbol for v in getattr(self.__pymoca_model, variable_list)]) for variable_list in variable_lists] self.__dae_residual = self.__pymoca_model.dae_residual_function(*function_arguments) if self.__dae_residual is None: self.__dae_residual = ca.MX() self.__initial_residual = self.__pymoca_model.initial_residual_function(*function_arguments) if self.__initial_residual is None: self.__initial_residual = ca.MX() # Log variables in debug mode if logger.getEffectiveLevel() == logging.DEBUG: logger.debug("ModelicaMixin: Found states {}".format( ', '.join([var.name() for var in self.__mx['states']]))) logger.debug("ModelicaMixin: Found derivatives {}".format( ', '.join([var.name() for var in self.__mx['derivatives']]))) logger.debug("ModelicaMixin: Found algebraics {}".format( ', '.join([var.name() for var in self.__mx['algebraics']]))) logger.debug("ModelicaMixin: Found control inputs {}".format( ', '.join([var.name() for var in self.__mx['control_inputs']]))) logger.debug("ModelicaMixin: Found constant inputs {}".format( ', '.join([var.name() for var in self.__mx['constant_inputs']]))) logger.debug("ModelicaMixin: Found parameters {}".format( ', '.join([var.name() for var in self.__mx['parameters']]))) # Call parent class first for default behaviour. super().__init__(**kwargs) @cached def compiler_options(self) -> Dict[str, Union[str, bool]]: """ Subclasses can configure the `pymoca <http://github.com/pymoca/pymoca>`_ compiler options here. :returns: A dictionary of pymoca compiler options. See the pymoca documentation for details. """ # Default options compiler_options = {} # Expand vector states to multiple scalar component states. compiler_options['expand_vectors'] = True # Where imported model libraries are located. library_folders = self.modelica_library_folders.copy() for ep in pkg_resources.iter_entry_points(group='rtctools.libraries.modelica'): if ep.name == "library_folder": library_folders.append( pkg_resources.resource_filename(ep.module_name, ep.attrs[0])) compiler_options['library_folders'] = library_folders # Eliminate equations of the type 'var = const'. compiler_options['eliminate_constant_assignments'] = True # Eliminate constant symbols from model, replacing them with the values # specified in the model. compiler_options['replace_constant_values'] = True # Replace any constant expressions into the model. compiler_options['replace_constant_expressions'] = True # Replace any parameter expressions into the model. compiler_options['replace_parameter_expressions'] = True # Eliminate variables starting with underscores. compiler_options['eliminable_variable_expression'] = r'(.*[.]|^)_\w+(\[[\d,]+\])?\Z' # Pymoca currently requires `expand_mx` to be set for # `eliminable_variable_expression` to work. compiler_options['expand_mx'] = True # Automatically detect and eliminate alias variables. compiler_options['detect_aliases'] = True # Disallow aliasing to derivative states compiler_options['allow_derivative_aliases'] = False # Cache the model on disk compiler_options['cache'] = True # Done return compiler_options def delayed_feedback(self): delayed_feedback = super().delayed_feedback() # Create delayed feedback for delay_state, delay_argument in zip(self.__pymoca_model.delay_states, self.__pymoca_model.delay_arguments): delayed_feedback.append( (delay_argument.expr, delay_state, delay_argument.duration)) return delayed_feedback @property def dae_residual(self): return self.__dae_residual @property def dae_variables(self): return self.__mx @property @cached def output_variables(self): output_variables = [ca.MX.sym(variable) for variable in self.__pymoca_model.outputs] output_variables.extend(self.__mx['control_inputs']) return output_variables @cached def parameters(self, ensemble_member): # Call parent class first for default values. parameters = super().parameters(ensemble_member) # Return parameter values from pymoca model parameters.update({v.symbol.name(): v.value for v in self.__pymoca_model.parameters}) # Done return parameters @cached def string_parameters(self, ensemble_member): # Call parent class first for default values. parameters = super().string_parameters(ensemble_member) # Return parameter values from pymoca model parameters.update({v.name: v.value for v in self.__pymoca_model.string_parameters}) parameters.update({v.name: v.value for v in self.__pymoca_model.string_constants}) # Done return parameters @cached def history(self, ensemble_member): history = super().history(ensemble_member) initial_time = np.array([self.initial_time]) # Parameter values parameters = self.parameters(ensemble_member) parameter_values = [parameters.get(param.name(), param) for param in self.__mx['parameters']] # Initial conditions obtained from start attributes. for v in self.__pymoca_model.states: if v.fixed: sym_name = v.symbol.name() start = v.start if isinstance(start, ca.MX): # If start contains symbolics, try substituting parameter values if isinstance(start, ca.MX) and not start.is_constant(): [start] = substitute_in_external([start], self.__mx['parameters'], parameter_values) if not start.is_constant() or np.isnan(float(start)): raise Exception('ModelicaMixin: Could not resolve initial value for {}'.format(sym_name)) start = v.python_type(start) history[sym_name] = Timeseries(initial_time, start) if logger.getEffectiveLevel() == logging.DEBUG: logger.debug("ModelicaMixin: Initial state variable {} = {}".format(sym_name, start)) return history @property def initial_residual(self): return self.__initial_residual @cached def bounds(self): # Call parent class first for default values. bounds = super().bounds() # Parameter values parameters = self.parameters(0) parameter_values = [parameters.get(param.name(), param) for param in self.__mx['parameters']] # Load additional bounds from model for v in itertools.chain( self.__pymoca_model.states, self.__pymoca_model.alg_states, self.__pymoca_model.inputs): sym_name = v.symbol.name() try: (m, M) = bounds[sym_name] except KeyError: if self.__python_types.get(sym_name, float) == bool: (m, M) = (0, 1) else: (m, M) = (-np.inf, np.inf) m_ = v.min if isinstance(m_, ca.MX) and not m_.is_constant(): [m_] = substitute_in_external([m_], self.__mx['parameters'], parameter_values) if not m_.is_constant() or np.isnan(float(m_)): raise Exception('Could not resolve lower bound for variable {}'.format(sym_name)) m_ = float(m_) M_ = v.max if isinstance(M_, ca.MX) and not M_.is_constant(): [M_] = substitute_in_external([M_], self.__mx['parameters'], parameter_values) if not M_.is_constant() or np.isnan(float(M_)): raise Exception('Could not resolve upper bound for variable {}'.format(sym_name)) M_ = float(M_) # We take the intersection of all provided bounds m = max(m, m_) M = min(M, M_) bounds[sym_name] = (m, M) return bounds @cached def seed(self, ensemble_member): # Call parent class first for default values. seed = super().seed(ensemble_member) # Parameter values parameters = self.parameters(ensemble_member) parameter_values = [parameters.get(param.name(), param) for param in self.__mx['parameters']] # Load seeds for var in itertools.chain(self.__pymoca_model.states, self.__pymoca_model.alg_states): if var.fixed: # Values will be set from import timeseries continue start = var.start if isinstance(start, ca.MX) or start != 0.0: sym_name = var.symbol.name() # If start contains symbolics, try substituting parameter values if isinstance(start, ca.MX) and not start.is_constant(): [start] = substitute_in_external([start], self.__mx['parameters'], parameter_values) if not start.is_constant() or np.isnan(float(start)): logger.error('ModelicaMixin: Could not resolve seed value for {}'.format(sym_name)) continue times = self.times(sym_name) start = var.python_type(start) s = Timeseries(times, np.full_like(times, start)) if logger.getEffectiveLevel() == logging.DEBUG: logger.debug("ModelicaMixin: Seeded variable {} = {}".format(sym_name, start)) seed[sym_name] = s return seed def variable_is_discrete(self, variable): return self.__python_types.get(variable, float) != float @property @cached def alias_relation(self): return self.__pymoca_model.alias_relation @property @cached def __nominals(self): # Make the dict nominal_dict = AliasDict(self.alias_relation) # Grab parameters and their values parameters = self.parameters(0) parameter_values = [parameters.get(param.name(), param) for param in self.__mx['parameters']] # Iterate over nominalizable states for v in itertools.chain( self.__pymoca_model.states, self.__pymoca_model.alg_states, self.__pymoca_model.inputs): sym_name = v.symbol.name() nominal = v.nominal # If nominal contains parameter symbols, substitute them if isinstance(nominal, ca.MX) and not nominal.is_constant(): [nominal] = substitute_in_external([nominal], self.__mx['parameters'], parameter_values) if not nominal.is_constant() or np.isnan(float(nominal)): logger.error('ModelicaMixin: Could not resolve nominal value for {}'.format(sym_name)) continue nominal = float(nominal) if not np.isnan(nominal): # Take absolute value (nominal sign is meaningless- a nominal is a magnitude) nominal = abs(nominal) # If nominal is 0 or 1, we just use the default (1.0) if nominal in (0.0, 1.0): continue nominal_dict[sym_name] = nominal if logger.getEffectiveLevel() == logging.DEBUG: logger.debug("ModelicaMixin: Set nominal value for variable {} to {}".format( sym_name, nominal)) else: logger.warning("ModelicaMixin: Could not set nominal value for {}".format(sym_name)) return nominal_dict def variable_nominal(self, variable): try: return self.__nominals[variable] except KeyError: return super().variable_nominal(variable)