# Source code for rtctools.simulation.csv_mixin

import logging
import os

import numpy as np

import rtctools.data.csv as csv
from rtctools._internal.caching import cached
from rtctools.simulation.io_mixin import IOMixin

logger = logging.getLogger("rtctools")


class CSVMixin(IOMixin):
    """
    Adds reading and writing of CSV timeseries and parameters to your simulation problem.

    During preprocessing, files named ``timeseries_import.csv``, ``initial_state.csv``,
    and ``parameters.csv`` are read from the ``input`` subfolder. The timeseries file is
    mandatory; ``parameters.csv`` and ``initial_state.csv`` are skipped when they cannot
    be opened.

    During postprocessing, a file named ``timeseries_export.csv`` is written to the
    ``output`` subfolder.

    :cvar csv_delimiter:
        Column delimiter used in CSV files. Default is ``,``.
    :cvar csv_validate_timeseries:
        Check consistency of timeseries. Default is ``True``.
    """

    #: Column delimiter used in CSV files
    csv_delimiter = ","

    #: Check consistency of timeseries
    csv_validate_timeseries = True

    def __init__(self, **kwargs):
        # Call parent class first for default behaviour.
        super().__init__(**kwargs)

    def read(self):
        """
        Read the CSV input files and store their contents.

        Timeseries and parameters are handed to ``self.io``; the initial state is kept
        on the instance for later use by :meth:`initial_state`.

        :raises Exception:
            If ``initial_state.csv`` holds more than one data row, or, when
            ``csv_validate_timeseries`` is set, if the time stamps are not strictly
            increasing or not equidistant.
        """
        # Call parent class first for default behaviour.
        super().read()

        # Helper function to check if initial state array actually defines
        # only the initial state
        def check_initial_state_array(initial_state):
            """
            Check length of initial state array, throw exception when larger than 1.
            """
            # An empty shape tuple (a single record) is falsy; any other shape is
            # treated as "more than one row".
            if initial_state.shape:
                raise Exception(
                    "CSVMixin: Initial state file {} contains more than one row of data. "
                    "Please remove the data row(s) that do not describe the initial "
                    "state.".format(os.path.join(self._input_folder, "initial_state.csv"))
                )

        # Read CSV files
        _timeseries = csv.load(
            os.path.join(self._input_folder, self.timeseries_import_basename + ".csv"),
            delimiter=self.csv_delimiter,
            with_time=True,
        )
        # The first column holds the time stamps, all remaining columns hold values.
        self.__timeseries_times = _timeseries[_timeseries.dtype.names[0]]

        self.io.reference_datetime = self.__timeseries_times[0]

        for key in _timeseries.dtype.names[1:]:
            self.io.set_timeseries(
                key, self.__timeseries_times, np.asarray(_timeseries[key], dtype=np.float64)
            )

        logger.debug("CSVMixin: Read timeseries.")

        # parameters.csv is optional: only the load itself is allowed to fail.
        try:
            _parameters = csv.load(
                os.path.join(self._input_folder, "parameters.csv"), delimiter=self.csv_delimiter
            )
        except IOError:
            pass
        else:
            for key in _parameters.dtype.names:
                self.io.set_parameter(key, float(_parameters[key]))
            logger.debug("CSVMixin: Read parameters.")

        # initial_state.csv is optional as well; fall back to an empty initial state.
        try:
            _initial_state = csv.load(
                os.path.join(self._input_folder, "initial_state.csv"), delimiter=self.csv_delimiter
            )
        except IOError:
            self.__initial_state = {}
        else:
            logger.debug("CSVMixin: Read initial state.")
            check_initial_state_array(_initial_state)
            self.__initial_state = {
                key: float(_initial_state[key]) for key in _initial_state.dtype.names
            }

        # Check for collisions in __initial_state and timeseries import (CSV)
        for collision in set(self.__initial_state) & set(_timeseries.dtype.names[1:]):
            if self.__initial_state[collision] != _timeseries[collision][0]:
                logger.warning(
                    "CSVMixin: Entry {} in initial_state.csv conflicts with "
                    "timeseries_import.csv".format(collision)
                )

        # Timestamp check
        if self.csv_validate_timeseries:
            times = self.__timeseries_times

            # Consecutive (previous, current) time stamp pairs. Empty when there are
            # fewer than two time stamps, in which case there is nothing to validate.
            steps = list(zip(times[:-1], times[1:]))

            for previous, current in steps:
                if previous >= current:
                    raise Exception("CSVMixin: Time stamps must be strictly increasing.")

            # Check if the timeseries are truly equidistant
            if steps:
                # The first step is the reference every other step has to match.
                dt = steps[0][1] - steps[0][0]
                for previous, current in steps:
                    if current - previous != dt:
                        raise Exception(
                            "CSVMixin: Expecting equidistant timeseries, the time step "
                            "towards {} is not the same as the time step(s) before. "
                            "Set equidistant=False if this is intended.".format(current)
                        )

    def write(self):
        """
        Write the simulation output to ``<timeseries_export_basename>.csv`` in the
        output folder, with a ``time`` column followed by the output variables in
        sorted order.
        """
        # Call parent class first for default behaviour.
        super().write()

        times = self._simulation_times

        # Write output
        names = ["time"] + sorted(set(self._io_output_variables))
        # Object column for the time stamps, 8-byte floats for every variable.
        formats = ["O"] + (len(names) - 1) * ["f8"]
        dtype = {"names": names, "formats": formats}
        data = np.zeros(len(times), dtype=dtype)
        data["time"] = self.io.sec_to_datetime(times, self.io.reference_datetime)
        for variable in self._io_output_variables:
            data[variable] = np.array(self._io_output[variable])

        fname = os.path.join(self._output_folder, self.timeseries_export_basename + ".csv")
        csv.save(fname, data, delimiter=self.csv_delimiter, with_time=True)

    @cached
    def initial_state(self):
        """
        The initial state. Includes entries from parent classes and initial_state.csv

        :returns: A dictionary of variable names and initial state (t0) values.
        """
        # Call parent class first for default values.
        initial_state = super().initial_state()

        # Set of model vars that are allowed to have an initial state
        valid_model_vars = set(self.get_state_variables()) | set(self.get_input_variables())

        # Load initial states from __initial_state
        for variable, value in self.__initial_state.items():
            # Get the canonical vars and signs
            canonical_var, sign = self.alias_relation.canonical_signed(variable)

            # Only store variables that are allowed to have an initial state
            if canonical_var in valid_model_vars:
                initial_state[canonical_var] = value * sign

                # isEnabledFor also covers loggers configured more verbosely than DEBUG.
                if logger.isEnabledFor(logging.DEBUG):
                    logger.debug("CSVMixin: Read initial state {} = {}".format(variable, value))
            else:
                logger.warning(
                    "CSVMixin: In initial_state.csv, {} is not an input or state variable.".format(
                        variable
                    )
                )
        return initial_state