import copy
import itertools
import logging
import math
from collections import OrderedDict
from typing import List, Union
import casadi as ca
import numpy as np
import pkg_resources
import pymoca
import pymoca.backends.casadi.api
from rtctools._internal.alias_tools import AliasDict
from rtctools._internal.caching import cached
from rtctools._internal.debug_check_helpers import DebugLevel
from rtctools.data.storage import DataStoreAccessor
logger = logging.getLogger("rtctools")
class Variable:
"""
Modeled after the Variable class in pymoca.backends.casadi.model, with modifications to make it
easier for the common case in RTC-Tools to instantiate them.
That means:
- pass in name instead of ca.MX symbol
- only scalars are allowed (shape = (1, 1))
- no aliases
- no "python_type"
- able to specify nominal/min/max in constructor
"""
def __init__(self, name, /, min=-np.inf, max=np.inf, nominal=1.0):
self.name = name
self.min = min
self.max = max
self.nominal = nominal
self._symbol = ca.MX.sym(name)
@property
def symbol(self):
return self._symbol
[docs]
class SimulationProblem(DataStoreAccessor):
"""
Implements the `BMI <http://csdms.colorado.edu/wiki/BMI_Description>`_ Interface.
Base class for all Simulation problems. Loads the Modelica Model.
:cvar modelica_library_folders: Folders containing any referenced Modelica libraries. Default
is an empty list.
"""
_debug_check_level = DebugLevel.MEDIUM
_debug_check_options = {}
# Folders in which the referenced Modelica libraries are found
modelica_library_folders = []
# Force workaround for delay support by assuming zero delay. This flag
# will be removed when proper delay support is added.
_force_zero_delay = False
def __init__(self, **kwargs):
# Check arguments
assert "model_folder" in kwargs
# Log pymoca version
logger.debug("Using pymoca {}.".format(pymoca.__version__))
# Transfer model from the Modelica .mo file to CasADi using pymoca
if "model_name" in kwargs:
model_name = kwargs["model_name"]
else:
if hasattr(self, "model_name"):
model_name = self.model_name
else:
model_name = self.__class__.__name__
# Load model from pymoca backend
self.__pymoca_model = pymoca.backends.casadi.api.transfer_model(
kwargs["model_folder"], model_name, self.compiler_options()
)
# Extract the CasADi MX variables used in the model
self.__mx = {}
self.__mx["time"] = [self.__pymoca_model.time]
self.__mx["states"] = [v.symbol for v in self.__pymoca_model.states]
self.__mx["derivatives"] = [v.symbol for v in self.__pymoca_model.der_states]
self.__mx["algebraics"] = [v.symbol for v in self.__pymoca_model.alg_states]
self.__mx["parameters"] = [v.symbol for v in self.__pymoca_model.parameters]
self.__mx["constant_inputs"] = []
self.__mx["lookup_tables"] = []
for v in self.__pymoca_model.inputs:
if v.symbol.name() in self.__pymoca_model.delay_states:
# Delayed feedback variables are local to each ensemble, and
# therefore belong to the collection of algebraic variables,
# rather than to the control inputs.
self.__mx["algebraics"].append(v.symbol)
else:
if v.symbol.name() in kwargs.get("lookup_tables", []):
self.__mx["lookup_tables"].append(v.symbol)
else:
# All inputs are constant inputs
self.__mx["constant_inputs"].append(v.symbol)
# Set timestep size
self._dt_is_fixed = False
self.__dt = None
fixed_dt = kwargs.get("fixed_dt", None)
if fixed_dt is not None:
self._dt_is_fixed = True
self.__dt = fixed_dt
# Add auxiliary variables for keeping track of delay expressions to the algebraic states
n_delay_states = len(self.__pymoca_model.delay_states)
self.__delay_times = []
if n_delay_states > 0:
if fixed_dt is None and not self._force_zero_delay:
raise ValueError("fixed_dt should be set when using delay equations.")
self.__delay_times = self._get_delay_times()
delay_expression_states = self._create_delay_expression_states()
self.__mx["algebraics"] += delay_expression_states
# Log variables in debug mode
if logger.getEffectiveLevel() == logging.DEBUG:
logger.debug(
"SimulationProblem: Found states {}".format(
", ".join([var.name() for var in self.__mx["states"]])
)
)
logger.debug(
"SimulationProblem: Found derivatives {}".format(
", ".join([var.name() for var in self.__mx["derivatives"]])
)
)
logger.debug(
"SimulationProblem: Found algebraics {}".format(
", ".join([var.name() for var in self.__mx["algebraics"]])
)
)
logger.debug(
"SimulationProblem: Found constant inputs {}".format(
", ".join([var.name() for var in self.__mx["constant_inputs"]])
)
)
logger.debug(
"SimulationProblem: Found parameters {}".format(
", ".join([var.name() for var in self.__mx["parameters"]])
)
)
# Get the extra variables that are user defined
self.__extra_variables = self.extra_variables()
self.__extra_variables_symbols = [v.symbol for v in self.__extra_variables]
# Store the types in an AliasDict
self.__python_types = AliasDict(self.alias_relation)
model_variable_types = [
"states",
"der_states",
"alg_states",
"inputs",
"constants",
"parameters",
]
for t in model_variable_types:
for v in getattr(self.__pymoca_model, t):
self.__python_types[v.symbol.name()] = v.python_type
# Store the nominals in an AliasDict
self.__nominals = AliasDict(self.alias_relation)
for v in itertools.chain(self.__pymoca_model.states, self.__pymoca_model.alg_states):
sym_name = v.symbol.name()
# If the nominal is 0.0 or 1.0 or -1.0, ignore: get_variable_nominal returns a default
# of 1.0
# TODO: handle nominal vectors (update() will need to load them)
if (
ca.MX(v.nominal).is_zero()
or ca.MX(v.nominal - 1).is_zero()
or ca.MX(v.nominal + 1).is_zero()
):
continue
else:
if ca.MX(v.nominal).size1() != 1:
logger.error("Vector Nominals not supported yet. ({})".format(sym_name))
self.__nominals[sym_name] = ca.fabs(v.nominal)
if logger.getEffectiveLevel() == logging.DEBUG:
logger.debug(
"SimulationProblem: Setting nominal value for variable {} to {}".format(
sym_name, self.__nominals[sym_name]
)
)
for v in self.__extra_variables:
self.__nominals[v.name] = v.nominal
# Initialize DAE and initial residuals
variable_lists = ["states", "der_states", "alg_states", "inputs", "constants", "parameters"]
function_arguments = [self.__pymoca_model.time] + [
ca.veccat(*[v.symbol for v in getattr(self.__pymoca_model, variable_list)])
for variable_list in variable_lists
]
self.__dae_residual = self.__pymoca_model.dae_residual_function(*function_arguments)
if self.__dae_residual is None:
# DAE is empty, that can happen if we add the only (non-aliasing) equations
# in Python.
self.__dae_residual = ca.MX()
self.__initial_residual = self.__pymoca_model.initial_residual_function(*function_arguments)
if self.__initial_residual is None:
self.__initial_residual = ca.MX()
# Construct state vector
self.__sym_list = (
self.__mx["states"]
+ self.__mx["algebraics"]
+ self.__mx["derivatives"]
+ self.__extra_variables_symbols
+ self.__mx["time"]
+ self.__mx["constant_inputs"]
+ self.__mx["parameters"]
)
n_elements = np.array([var.numel() for var in self.__sym_list])
i_end = n_elements.cumsum()
i_start = np.array([0, *(i_end[:-1])])
self.__state_vector = np.full(n_elements.sum(), np.nan)
# A very handy index
self.__n_state_symbols = (
len(self.__mx["states"])
+ len(self.__mx["algebraics"])
+ len(self.__mx["derivatives"])
+ len(self.__extra_variables)
)
self.__n_states = i_end[self.__n_state_symbols - 1]
# NOTE: Backwards compatibility allowing set_var() for parameters. These
# variables check that this is only done before calling initialize().
self.__parameters = AliasDict(self.alias_relation)
self.__parameters.update({v.name(): v for v in self.__mx["parameters"]})
self.__parameters_set_var = True
# Construct a dict to look up symbols by name (or iterate over)
self.__sym_dict = OrderedDict(((sym.name(), sym) for sym in self.__sym_list))
# Generate a dictionary that we can use to lookup the index in the state vector.
# To avoid repeated and relatively expensive `canonical_signed` calls, we
# make a dictionary for all variables and their aliases.
self.__i_start = {}
self.__i_end = {}
for i, k in enumerate(self.__sym_dict.keys()):
for alias in self.alias_relation.aliases(k):
if alias.startswith("-"):
self.__i_start[alias[1:]] = (i_start[i], -1.0)
self.__i_end[alias[1:]] = (i_end[i], -1.0)
else:
self.__i_start[alias] = (i_start[i], 1.0)
self.__i_end[alias] = (i_end[i], 1.0)
self.__indices = self.__i_start
# Call parent class for default behaviour.
super().__init__(**kwargs)
def _get_delay_times(self):
"""
Get the delay times for each delay equation.
"""
if self._force_zero_delay:
return [0] * len(self.__pymoca_model.delay_states)
parameter_symbols = [v.symbol for v in self.__pymoca_model.parameters]
parameter_values = [v.value for v in self.__pymoca_model.parameters]
delay_time_expressions = [
delay_arg.duration for delay_arg in self.__pymoca_model.delay_arguments
]
delay_time_fun = ca.Function(
"delay_time_function", parameter_symbols, delay_time_expressions
)
delay_time_values = delay_time_fun(*parameter_values)
if len(delay_time_expressions) == 1:
return [delay_time_values]
return list(delay_time_values)
def _create_delay_expression_states(self):
"""
Create auxiliary states for delay equations.
Create states to keep track of the history of delay expressions.
For example, if we have a delay equation of the form
.. math::
x = delay(5 * y, 2 * dt),
Then we need variables to store :math:`5 * y` at time :math`t` and time :math:`t - dt`
(For each state, we also store the previous value,
so if we have a state for :math:`5 * y` at :math:`t - dt`,
then its previous value is :math:`5 * y` at :math:`t - 2 * dt`).
"""
delay_expression_states = []
for delay_state, delay_time in zip(self.__pymoca_model.delay_states, self.__delay_times):
if delay_time > 0:
n_previous_values = int(np.ceil(delay_time / self.get_time_step()))
else:
n_previous_values = 1
expression_state = delay_state + "_expr"
expression_symbol = ca.MX.sym(expression_state, n_previous_values)
delay_expression_states.append(expression_symbol)
return delay_expression_states
[docs]
def initialize(self, config_file=None):
"""
Initialize state vector with default values
:param config_file: Path to an initialization file.
"""
if config_file:
# TODO read start and stop time from config_file and call:
# self.setup_experiment(start,stop)
# for now, assume that setup_experiment was called beforehand
raise NotImplementedError
# Short-hand notation for the model
model = self.__pymoca_model
# Set values of parameters defined in the model into the state vector
for var in self.__pymoca_model.parameters:
# First check to see if parameter is already set (this allows child classes to override
# model defaults)
if np.isfinite(self.get_var(var.symbol.name())):
continue
# Also test to see if the value is constant
if isinstance(var.value, ca.MX) and not var.value.is_constant():
continue
# Try to extract the value
try:
# Extract the value as a python type
val = var.python_type(var.value)
except ValueError:
# var.value is a float NaN being cast to non-float
continue
else:
# If val is finite, we set it
if np.isfinite(val):
logger.debug(
"SimulationProblem: Setting parameter {} = {}".format(
var.symbol.name(), val
)
)
self.set_var(var.symbol.name(), val)
# Nominals can be symbolic, written in terms of parameters. After all
# parameter values are known, we evaluate the numeric values of the
# nominals.
nominal_vars = list(self.__nominals.keys())
symbolic_nominals = ca.vertcat(*[self.get_variable_nominal(v) for v in nominal_vars])
nominal_evaluator = ca.Function(
"nominal_evaluator", self.__mx["parameters"], [symbolic_nominals]
)
n_parameters = len(self.__mx["parameters"])
if n_parameters > 0:
[evaluated_nominals] = nominal_evaluator.call(self.__state_vector[-n_parameters:])
else:
[evaluated_nominals] = nominal_evaluator.call([])
evaluated_nominals = np.array(evaluated_nominals).ravel()
nominal_dict = dict(zip(nominal_vars, evaluated_nominals))
self.__nominals.update(nominal_dict)
# The variables that need a mutually consistent initial condition
X = ca.vertcat(*self.__sym_list[: self.__n_state_symbols])
X_prev = ca.vertcat(
*[
ca.MX.sym(sym.name() + "_prev", sym.shape)
for sym in self.__sym_list[: self.__n_state_symbols]
]
)
# Assemble initial residuals and set values from start attributes into the state vector
minimized_residuals = []
for var in itertools.chain(self.__pymoca_model.states, self.__pymoca_model.alg_states):
var_name = var.symbol.name()
var_nominal = self.get_variable_nominal(var_name)
# Attempt to cast var.start to python type
mx_start = ca.MX(var.start)
if mx_start.is_constant():
# cast var.start to python type
start_val = var.python_type(mx_start.to_DM())
else:
# var.start is a symbolic expression with unknown value
start_val = None
if start_val == 0.0 and not var.fixed:
# To make initialization easier, we allow setting initial states by providing
# timeseries with names that match a symbol in the model. We only check for this
# matching if the start and fixed attributes were left as default
try:
start_val = self.initial_state()[var_name]
except KeyError:
pass
else:
# An initial state was found- add it to the constrained residuals
logger.debug(
"Initialize: Added {} = {} to initial equations "
"(found matching timeseries).".format(var_name, start_val)
)
# Set var to be fixed
var.fixed = True
if not var.fixed:
# To make initialization easier, we allow setting initial guesses by providing
# timeseries with names that match a symbol in the model. We only check for this
# matching if the start and fixed attributes were left as default
try:
start_val = self.seed()[var_name]
except KeyError:
pass
else:
# An initial state was found- add it to the constrained residuals
logger.debug(
"Initialize: Added {} = {} as initial guess "
"(found matching timeseries).".format(var_name, start_val)
)
# Attempt to set start_val in the state vector. Default to zero if unknown.
try:
self.set_var(var_name, start_val if start_val is not None else 0.0)
except KeyError:
logger.warning(
"Initialize: {} not found in state vector. "
"Initial value of {} not set.".format(var_name, start_val)
)
# Add a residual for the difference between the state and its starting expression
start_expr = start_val if start_val is not None else var.start
if var.fixed:
# Set bounds to be equal to each other, such that IPOPT can
# turn the decision variable into a parameter.
var.min = start_expr
var.max = start_expr
else:
# minimize residual
minimized_residuals.append((var.symbol - start_expr) / var_nominal)
# Default start var for ders is zero
for der_var in self.__mx["derivatives"]:
self.set_var(der_var.name(), 0.0)
# Residuals for initial values for the delay states / expressions.
for delay_state, delay_argument in zip(model.delay_states, model.delay_arguments):
expression_state = delay_state + "_expr"
i_delay_state, _ = self.__indices[delay_state]
i_expr_start, _ = self.__i_start[expression_state]
i_expr_end, _ = self.__i_end[expression_state]
minimized_residuals.append(X[i_expr_start:i_expr_end] - delay_argument.expr)
minimized_residuals.append(X[i_delay_state] - delay_argument.expr)
# Warn for nans in state vector (verify we didn't miss anything)
self.__warn_for_nans()
# Optionally encourage a steady-state initial condition
if getattr(self, "encourage_steady_state_initial_conditions", False):
# add penalty for der(var) != 0.0
for d in self.__mx["derivatives"]:
logger.debug("Added {} to the minimized residuals.".format(d.name()))
minimized_residuals.append(d)
# Make minimized_residuals into a single symbolic object
if minimized_residuals:
minimized_residual = ca.vertcat(*minimized_residuals)
else:
# DAE is empty
minimized_residual = ca.MX(0)
# Extra equations
extra_equations = self.extra_equations()
# Assemble symbolics needed to make a function describing the initial condition of the model
# We constrain every entry in this MX to zero
equality_constraints = ca.vertcat(
self.__dae_residual, self.__initial_residual, *extra_equations
)
# Make a list of unscaled symbols and a list of their scaled equivalent
unscaled_symbols = []
scaled_symbols = []
for sym_name, nominal in self.__nominals.items():
# Note that sym_name is always a canonical state
index, _ = self.__indices[sym_name]
# If the symbol is a state, Add the symbol to the lists
if index <= self.__n_states:
unscaled_symbols.append(X[index])
scaled_symbols.append(X[index] * nominal)
# Also scale previous states
unscaled_symbols.append(X_prev[index])
scaled_symbols.append(X_prev[index] * nominal)
unscaled_symbols = ca.vertcat(*unscaled_symbols)
scaled_symbols = ca.vertcat(*scaled_symbols)
# Substitute unscaled terms for scaled terms
equality_constraints = ca.substitute(equality_constraints, unscaled_symbols, scaled_symbols)
minimized_residual = ca.substitute(minimized_residual, unscaled_symbols, scaled_symbols)
logger.debug("SimulationProblem: Initial Equations are " + str(equality_constraints))
logger.debug("SimulationProblem: Minimized Residuals are " + str(minimized_residual))
# State bounds can be symbolic, written in terms of parameters. After all
# parameter values are known, we evaluate the numeric values of bounds.
bound_vars = (
self.__pymoca_model.states
+ self.__pymoca_model.alg_states
+ self.__pymoca_model.der_states
+ self.__extra_variables
)
symbolic_bounds = ca.vertcat(*[ca.horzcat(v.min, v.max) for v in bound_vars])
bound_evaluator = ca.Function("bound_evaluator", self.__mx["parameters"], [symbolic_bounds])
# Evaluate bounds using values of parameters
n_parameters = len(self.__mx["parameters"])
if n_parameters > 0:
[evaluated_bounds] = bound_evaluator.call(self.__state_vector[-n_parameters:])
else:
[evaluated_bounds] = bound_evaluator.call([])
# Scale the bounds with the nominals
nominals = []
for var in bound_vars:
nominals.append(self.get_variable_nominal(var.symbol.name()))
evaluated_bounds = np.array(evaluated_bounds) / np.array(nominals)[:, None]
# Update with the bounds of delayed states / expressions
if model.delay_states:
i_start_first_delay_state, _ = self.__indices[model.delay_states[0]]
i_end_last_delay_expr, _ = self.__i_end[model.delay_states[-1] + "_expr"]
n_delay = i_end_last_delay_expr - i_start_first_delay_state
delay_bounds = np.array([-np.inf, np.inf] * n_delay).reshape((n_delay, 2))
# offset = len(self.__pymoca_model.states) + len(self.__pymoca_model.alg_states)
offset = i_start_first_delay_state
evaluated_bounds = np.vstack(
(evaluated_bounds[:offset, :], delay_bounds, evaluated_bounds[offset:, :])
)
# Construct arrays of state bounds (used in the initialize() nlp, but not in __do_step
# rootfinder)
self.__lbx = evaluated_bounds[:, 0]
self.__ubx = evaluated_bounds[:, 1]
# Constrain model equation residuals to zero
lbg = np.zeros(equality_constraints.size1())
ubg = np.zeros(equality_constraints.size1())
# Construct objective function from the input residual
objective_function = ca.dot(minimized_residual, minimized_residual)
# Substitute constants and parameters
const_and_par = ca.vertcat(
*self.__mx["time"], *self.__mx["constant_inputs"], *self.__mx["parameters"]
)
const_and_par_values = self.__state_vector[self.__n_states :]
objective_function = ca.substitute(objective_function, const_and_par, const_and_par_values)
equality_constraints = ca.substitute(
equality_constraints, const_and_par, const_and_par_values
)
# Construct nlp and solver to find initial state using ipopt
# Note that some operations cannot be expanded, e.g. ca.interpolant. So
# we _try_ to expand, but fall back on ca.MX evaluation if we cannot.
try:
expand_f_g = ca.Function("f", [X], [objective_function, equality_constraints]).expand()
except RuntimeError as e:
if "eval_sx" not in str(e):
raise
else:
logger.info(
"Cannot expand objective/constraints to SX, falling back to MX evaluation"
)
nlp = {"x": X, "f": objective_function, "g": equality_constraints}
else:
X_sx = ca.SX.sym("X", X.shape)
objective_function_sx, equality_constraints_sx = expand_f_g(X_sx)
nlp = {"x": X_sx, "f": objective_function_sx, "g": equality_constraints_sx}
solver = ca.nlpsol("solver", "ipopt", nlp, self.solver_options())
# Construct guess
guess = ca.vertcat(*np.nan_to_num(self.__state_vector[: self.__n_states]))
# Find initial state
initial_state = solver(x0=guess, lbx=self.__lbx, ubx=self.__ubx, lbg=lbg, ubg=ubg)
# If unsuccessful, stop.
return_status = solver.stats()["return_status"]
if return_status not in {"Solve_Succeeded", "Solved_To_Acceptable_Level"}:
raise Exception('Initialization Failed with return status "{}"'.format(return_status))
# Update state vector with initial conditions
self.__state_vector[: self.__n_states] = initial_state["x"][: self.__n_states].T
# make a copy of the initialized initial state vector in case we want to run the model again
self.__initialized_state_vector = copy.deepcopy(self.__state_vector)
# Warn for nans in state vector after initialization
self.__warn_for_nans()
# No longer allow setting parameters with set_var(), as we want to be
# clear that that does not work
self.__parameters_set_var = False
self.__parameter_names_including_aliases = set()
for p in self.__parameters.keys():
self.__parameter_names_including_aliases |= self.alias_relation.aliases(p)
# Construct the rootfinder
# Assemble some symbolics, including those needed for a backwards Euler derivative
# approximation
dt = ca.MX.sym("delta_t")
parameters = ca.vertcat(*self.__mx["parameters"])
if n_parameters > 0:
constants = ca.vertcat(X_prev, *self.__sym_list[self.__n_state_symbols : -n_parameters])
else:
constants = ca.vertcat(X_prev, *self.__sym_list[self.__n_state_symbols :])
# Make a list of derivative approximations using backwards Euler formulation
derivative_approximation_residuals = []
for index, derivative_state in enumerate(self.__mx["derivatives"]):
derivative_approximation_residuals.append(
derivative_state - (X[index] - X_prev[index]) / dt
)
# Delayed feedback
delay_equations = []
for delay_state, delay_argument, delay_time in zip(
model.delay_states,
model.delay_arguments,
self.__delay_times,
):
expression_state = delay_state + "_expr"
i_delay_state, _ = self.__indices[delay_state]
i_expr_start, _ = self.__i_start[expression_state]
i_expr_end, _ = self.__i_end[expression_state]
delay_equations.append(X[i_expr_start] - delay_argument.expr)
delay_equations.append(
X[i_expr_start + 1 : i_expr_end] - X_prev[i_expr_start : i_expr_end - 1]
)
n_previous_values = self.__sym_dict[expression_state].numel()
interpolation_weight = n_previous_values - delay_time / self.__dt
delay_equations.append(
X[i_delay_state]
- interpolation_weight * X[i_expr_end - 1]
- (1 - interpolation_weight) * X_prev[i_expr_end - 1]
)
# Append residuals for derivative approximations
dae_residual = ca.vertcat(
self.__dae_residual,
*derivative_approximation_residuals,
*delay_equations,
*extra_equations,
)
# TODO: implement lookup_tables
# Substitute unscaled terms for scaled terms
dae_residual = ca.substitute(dae_residual, unscaled_symbols, scaled_symbols)
# Substitute the parameters
if n_parameters > 0:
parameters_values = self.__state_vector[-n_parameters:]
dae_residual = ca.substitute(dae_residual, parameters, parameters_values)
if logger.getEffectiveLevel() == logging.DEBUG:
logger.debug("SimulationProblem: DAE Residual is " + str(dae_residual))
if X.size1() != dae_residual.size1():
logger.error(
"Formulation Error: Number of states ({}) "
"does not equal number of equations ({})".format(X.size1(), dae_residual.size1())
)
# Construct a function res_vals that returns the numerical residuals of a numerical state
self.__res_vals = ca.Function("res_vals", [X, dt, constants], [dae_residual])
try:
self.__res_vals = self.__res_vals.expand()
except RuntimeError as e:
if "eval_sx" not in str(e):
raise
else:
pass
# Use rootfinder() to make a function that takes a step forward in time by trying to zero
# res_vals()
options = self.rootfinder_options()
solver = options["solver"]
solver_options = options["solver_options"]
self.__do_step = ca.rootfinder("next_state", solver, self.__res_vals, solver_options)
[docs]
def pre(self):
"""
Any preprocessing takes place here.
"""
pass
[docs]
def post(self):
"""
Any postprocessing takes place here.
"""
pass
[docs]
def setup_experiment(self, start, stop, dt):
"""
Method for subclasses (PIMixin, CSVMixin, or user classes) to set timing information for a
simulation run.
:param start: Start time for the simulation.
:param stop: Final time for the simulation.
:param dt: Time step size.
"""
# Set class vars with start/stop/dt values
self.__start = start
self.__stop = stop
self.set_time_step(dt)
# Set time in state vector
self.set_var("time", start)
[docs]
def update(self, dt):
"""
Performs one timestep.
The methods ``setup_experiment`` and ``initialize`` must have been called before.
:param dt: Time step size.
"""
if dt > 0:
self.set_time_step(dt)
dt = self.get_time_step()
logger.debug("Taking a step at {} with size {}".format(self.get_current_time(), dt))
# increment time
self.set_var("time", self.get_current_time() + dt)
# take a step
guess = self.__state_vector[: self.__n_states]
if len(self.__mx["parameters"]) > 0:
next_state = self.__do_step(
guess, dt, self.__state_vector[: -len(self.__mx["parameters"])]
)
else:
next_state = self.__do_step(guess, dt, self.__state_vector)
# Check convergence of rootfinder
rootfinder_stats = self.__do_step.stats()
if not rootfinder_stats["success"]:
message = (
"Simulation has failed to converge at time {}. Solver failed with status {}"
).format(self.get_current_time(), rootfinder_stats["nlpsol"]["return_status"])
logger.error(message)
raise Exception(message)
if logger.getEffectiveLevel() == logging.DEBUG:
# compute max residual
largest_res = ca.norm_inf(
self.__res_vals(
next_state, self.__dt, self.__state_vector[: -len(self.__mx["parameters"])]
)
)
logger.debug("Residual maximum magnitude: {:.2E}".format(float(largest_res)))
# Update state vector
self.__state_vector[: self.__n_states] = next_state.toarray().ravel()
[docs]
def simulate(self):
"""
Run model from start_time to end_time.
"""
# Do any preprocessing, which may include changing parameter values on
# the model
logger.info("Preprocessing")
self.pre()
# Initialize model
logger.info("Initializing")
self.initialize()
# Perform all timesteps
logger.info("Running")
while self.get_current_time() < self.get_end_time():
self.update(-1)
# Do any postprocessing
logger.info("Postprocessing")
self.post()
[docs]
def reset(self):
"""
Reset the FMU.
"""
self.__state_vector = copy.deepcopy(self.__initialized_state_vector)
[docs]
def get_start_time(self):
"""
Return start time of experiment.
:returns: The start time of the experiment.
"""
return self.__start
[docs]
def get_end_time(self):
"""
Return end time of experiment.
:returns: The end time of the experiment.
"""
return self.__stop
[docs]
def get_current_time(self):
"""
Return current time of simulation.
:returns: The current simulation time.
"""
return self.get_var("time")
def get_time_step(self):
"""
Return simulation timestep.
:returns: The simulation timestep.
"""
return self.__dt
[docs]
def get_var(self, name):
"""
Return a numpy array from FMU.
:param name: Variable name.
:returns: The value of the variable.
"""
# Get the index of the canonical state and sign
index, sign = self.__indices[name]
value = self.__state_vector[index]
# Adjust sign if needed
if sign < 0:
value *= sign
# Adjust for nominal value if not default
if index <= self.__n_states:
nominal = self.get_variable_nominal(name)
value *= nominal
return value
[docs]
def get_var_count(self):
"""
Return the number of variables in the model.
:returns: The number of variables in the model.
"""
return len(self.get_variables())
[docs]
def get_var_name(self, i):
"""
Returns the name of a variable.
:param i: Index in ordered dictionary returned by method get_variables.
:returns: The name of the variable.
"""
return list(self.get_variables())[i]
[docs]
def get_var_type(self, name):
"""
Return type, compatible with numpy.
:param name: String variable name.
:returns: The numpy-compatible type of the variable.
:raises: KeyError
"""
return self.__python_types[name]
[docs]
def get_var_rank(self, name):
"""
Not implemented
"""
raise NotImplementedError
[docs]
def get_var_shape(self, name):
"""
Not implemented
"""
raise NotImplementedError
[docs]
def get_variables(self):
"""
Return all variables (both internal and user defined)
:returns: An ordered dictionary of all variables supported by the model.
"""
return AliasDict(self.alias_relation, self.__sym_dict)
@cached
def get_state_variables(self):
return AliasDict(
self.alias_relation,
{sym.name(): sym for sym in (self.__mx["states"] + self.__mx["algebraics"])},
)
@cached
def get_parameter_variables(self):
return AliasDict(self.alias_relation, {sym.name(): sym for sym in self.__mx["parameters"]})
@cached
def get_input_variables(self):
return AliasDict(
self.alias_relation, {sym.name(): sym for sym in self.__mx["constant_inputs"]}
)
@cached
def get_output_variables(self):
return self.__pymoca_model.outputs
def __warn_for_nans(self):
"""
Test state vector for missing values and warn
"""
value_is_nan = np.isnan(self.__state_vector)
if any(value_is_nan):
for sym, isnan in zip(self.__sym_list, value_is_nan):
if isnan:
logger.warning("Variable {} has no value.".format(sym))
def set_time_step(self, dt):
"""
Set the timestep size.
:param dt: Timestep size of the simulation.
"""
if self._dt_is_fixed:
assert math.isclose(
self.__dt, dt
), "Timestep size dt is marked as constant and cannot be changed."
else:
self.__dt = dt
def set_var(self, name, value):
"""
Set the value of the given variable.
:param name: Name of variable to set.
:param value: Value(s).
"""
# TODO: sanitize input
# Check if it is a parameter, and if it is allowed to be set
if not self.__parameters_set_var:
if name in self.__parameter_names_including_aliases:
raise Exception("Cannot set parameters after initialize() has been called.")
# Get the index of the canonical state and sign
index, sign = self.__indices[name]
if sign < 0:
value *= sign
# Adjust for nominal value if not default
if index <= self.__n_states:
nominal = self.get_variable_nominal(name)
value /= nominal
# Store value in state vector
self.__state_vector[index] = value
def set_var_slice(self, name, start, count, var):
"""
Not implemented.
"""
raise NotImplementedError
def set_var_index(self, name, index, var):
"""
Not implemented.
"""
raise NotImplementedError
def inq_compound(self, name):
"""
Not implemented.
"""
raise NotImplementedError
def inq_compound_field(self, name, index):
"""
Not implemented.
"""
raise NotImplementedError
def solver_options(self):
"""
Returns a dictionary of CasADi nlpsol() solver options.
:returns: A dictionary of CasADi :class:`nlpsol` options. See the CasADi
documentation for details.
"""
return {
"ipopt.fixed_variable_treatment": "make_parameter",
"ipopt.print_level": 0,
"print_time": False,
"error_on_fail": False,
}
def rootfinder_options(self):
"""
Returns a dictionary of CasADi rootfinder() options.
The dictionary has the following items:
* solver: the solver used for rootfinding, e.g. nlpsol, fast_newton, etc.
* solver_options: options for the solver
See the CasADi documentation for details on solvers and solver options.
:returns: A dictionary of CasADi :class:`rootfinder` options.
"""
return {
"solver": "nlpsol",
"solver_options": {
"nlpsol": "ipopt",
"nlpsol_options": self.solver_options(),
"error_on_fail": False,
},
}
def get_variable_nominal(self, variable) -> Union[float, ca.MX]:
"""
Get the value of the nominal attribute of a variable
NOTE: Due to backwards compatibility for allowing parameters to be set
with set_var() instead of overriding parameters(), this method can
return a symbolic value for nominals defined in the Modelica file. It
can only do so until the initializion() method in this class is
called/completed, after which it will return numeric values only.
"""
return self.__nominals.get(variable, 1.0)
def timeseries_at(self, variable, t):
"""
Get value of timeseries variable at time t: should be overridden by pi or csv mixin
"""
raise NotImplementedError
@cached
def initial_state(self) -> AliasDict[str, float]:
"""
The initial state.
:returns: A dictionary of variable names and initial state (t0) values.
"""
t0 = self.get_start_time()
initial_state_dict = AliasDict(self.alias_relation)
for variable in list(self.get_state_variables()) + list(self.get_input_variables()):
try:
initial_state_dict[variable] = self.timeseries_at(variable, t0)
except KeyError:
pass
except NotImplementedError:
pass
else:
if logger.getEffectiveLevel() == logging.DEBUG:
logger.debug("Read intial state for {}".format(variable))
return initial_state_dict
@cached
def seed(self) -> AliasDict[str, float]:
"""
Seed values providing an initial guess for the t0 states.
:returns: A dictionary of variable names and seed (t0) values.
"""
return AliasDict(self.alias_relation)
@cached
def parameters(self):
"""
Return a dictionary of parameter values extracted from the Modelica model
"""
# Create AliasDict
parameters = AliasDict(self.alias_relation)
# Update with model parameters
parameters.update({p.symbol.name(): p.value for p in self.__pymoca_model.parameters})
return parameters
def extra_variables(self) -> List[Variable]:
return []
def extra_equations(self) -> List[ca.MX]:
return []
@property
@cached
def alias_relation(self):
return self.__pymoca_model.alias_relation
@cached
def compiler_options(self):
"""
Subclasses can configure the `pymoca <http://github.com/pymoca/pymoca>`_ compiler options
here.
:returns:
A dictionary of pymoca compiler options. See the pymoca documentation for details.
"""
# Default options
compiler_options = {}
# Expand vector states to multiple scalar component states.
compiler_options["expand_vectors"] = True
# Where imported model libraries are located.
library_folders = self.modelica_library_folders.copy()
for ep in pkg_resources.iter_entry_points(group="rtctools.libraries.modelica"):
if ep.name == "library_folder":
library_folders.append(pkg_resources.resource_filename(ep.module_name, ep.attrs[0]))
compiler_options["library_folders"] = library_folders
# Eliminate equations of the type 'var = const'.
compiler_options["eliminate_constant_assignments"] = True
# Eliminate constant symbols from model, replacing them with the values
# specified in the model.
compiler_options["replace_constant_values"] = True
# Replace any constant expressions into the model.
compiler_options["replace_constant_expressions"] = True
# Replace any parameter expressions into the model.
compiler_options["replace_parameter_expressions"] = True
# Eliminate variables starting with underscores.
compiler_options["eliminable_variable_expression"] = r"(.*[.]|^)_\w+(\[[\d,]+\])?\Z"
# Pymoca currently requires `expand_mx` to be set for
# `eliminable_variable_expression` to work.
compiler_options["expand_mx"] = True
# Automatically detect and eliminate alias variables.
compiler_options["detect_aliases"] = True
# Cache the model on disk
compiler_options["cache"] = True
# Done
return compiler_options