Source code for rtctools.simulation.io_mixin

import bisect
import logging
from abc import ABCMeta, abstractmethod
from math import isfinite

import numpy as np

from rtctools._internal.alias_tools import AliasDict
from rtctools._internal.caching import cached
from rtctools.simulation.simulation_problem import SimulationProblem

logger = logging.getLogger("rtctools")


class IOMixin(SimulationProblem, metaclass=ABCMeta):
    """
    Base class for all IO methods of optimization problems.
    """

    def __init__(self, **kwargs):
        # Call parent class first for default behaviour.
        super().__init__(**kwargs)

        self._simulation_times = []

        self.__first_update_call = True

    def pre(self) -> None:
        # Call read method to read all input
        self.read()

        self._simulation_times = []

    @abstractmethod
    def read(self) -> None:
        """
        Reads input data from files, storing it in the internal data store through the various set
        or add methods
        """
        pass

    def post(self) -> None:
        # Call write method to write all output
        self.write()

    @abstractmethod
    def write(self) -> None:
        """
        Writes output data to files, getting the data from the data store through the various get
        methods
        """
        pass

    def initialize(self, config_file=None):
        # Set up experiment
        timeseries_import_times = self.io.times_sec
        self.__dt = timeseries_import_times[1] - timeseries_import_times[0]
        self.setup_experiment(0, timeseries_import_times[-1], self.__dt)

        parameter_variables = set(self.get_parameter_variables())

        logger.debug("Model parameters are {}".format(parameter_variables))

        for parameter, value in self.io.parameters().items():
            if parameter in parameter_variables:
                logger.debug("IOMixin: Setting parameter {} = {}".format(parameter, value))
                self.set_var(parameter, value)

        # Load input variable names
        self.__input_variables = set(self.get_input_variables().keys())

        # Set input values
        t_idx = bisect.bisect_left(timeseries_import_times, 0.0)
        self.__set_input_variables(t_idx)

        logger.debug("Model inputs are {}".format(self.__input_variables))

        # Set first timestep
        self._simulation_times.append(self.get_current_time())

        # Empty output
        self._io_output_variables = self.get_output_variables()
        self._io_output = AliasDict(self.alias_relation)

        # Call super, which will also initialize the model itself
        super().initialize(config_file)

        # Extract consistent t0 values
        for variable in self._io_output_variables:
            self._io_output[variable] = [self.get_var(variable)]

    def __set_input_variables(self, t_idx, use_cache=False):
        if not use_cache:
            self.__cache_loop_timeseries = {}

            timeseries_names = set(self.io.get_timeseries_names(0))
            for v in self.get_variables():
                if v in timeseries_names:
                    _, values = self.io.get_timeseries_sec(v)
                    self.__cache_loop_timeseries[v] = values

        for variable, values in self.__cache_loop_timeseries.items():
            value = values[t_idx]
            if isfinite(value):
                self.set_var(variable, value)
            else:
                logger.debug(
                    "IOMixin: Found bad value {} at index [{}] "
                    "in timeseries aliased to input {}".format(value, t_idx, variable)
                )

    def update(self, dt):
        # Time step
        if dt < 0:
            dt = self.__dt

        # Current time stamp
        t = self.get_current_time()
        self._simulation_times.append(t + dt)

        # Get current time index
        t_idx = bisect.bisect_left(self.io.times_sec, t + dt)

        # Set input values
        self.__set_input_variables(t_idx, not self.__first_update_call)

        # Call super
        super().update(dt)

        # Extract results
        for variable, values in self._io_output.items():
            values.append(self.get_var(variable))

        self.__first_update_call = False

    def extract_results(self):
        """
        Extracts the results of output

        :returns: An AliasDict of output variables and results array format.
        """
        io_outputs_arrays = self._io_output.copy()
        for k in io_outputs_arrays.keys():
            io_outputs_arrays[k] = np.array(io_outputs_arrays[k])

        return io_outputs_arrays

    @cached
    def parameters(self):
        """
        Return a dictionary of parameters, including parameters in the input files files.

        :returns: Dictionary of parameters
        """
        # Call parent class first for default values.
        parameters = super().parameters()

        # Load parameters from input files (stored in internal data store)
        for parameter_name, value in self.io.parameters().items():
            parameters[parameter_name] = value

        if logger.getEffectiveLevel() == logging.DEBUG:
            for parameter_name in self.io.parameters().keys():
                logger.debug("IOMixin: Read parameter {}".format(parameter_name))

        return parameters

    def times(self, variable=None):
        """
        Return a list of all the timesteps in seconds.

        :param variable: Variable name.

        :returns: List of all the timesteps in seconds.
        """
        idx = bisect.bisect_left(self.io.datetimes, self.io.reference_datetime)
        return self.io.times_sec[idx:]

    def timeseries_at(self, variable, t):
        """
        Return the value of a time series at the given time.

        :param variable: Variable name.
        :param t: Time.

        :returns: The interpolated value of the time series.

        :raises: KeyError
        """
        timeseries_times_sec, values = self.io.get_timeseries_sec(variable)
        t_idx = bisect.bisect_left(timeseries_times_sec, t)
        if timeseries_times_sec[t_idx] == t:
            return values[t_idx]
        else:
            return np.interp(t, timeseries_times_sec, values)