Source code for rtctools.simulation.pi_mixin

import logging
from datetime import timedelta

import numpy as np

import rtctools.data.pi as pi
import rtctools.data.rtc as rtc
from rtctools.simulation.io_mixin import IOMixin

logger = logging.getLogger("rtctools")


[docs] class PIMixin(IOMixin): """ Adds `Delft-FEWS Published Interface <https://publicwiki.deltares.nl/display/FEWSDOC/The+Delft-Fews+Published+Interface>`_ I/O to your simulation problem. During preprocessing, files named ``rtcDataConfig.xml``, ``timeseries_import.xml``, and``rtcParameterConfig.xml`` are read from the ``input`` subfolder. ``rtcDataConfig.xml`` maps tuples of FEWS identifiers, including location and parameter ID, to RTC-Tools time series identifiers. During postprocessing, a file named ``timeseries_export.xml`` is written to the ``output`` subfolder. :cvar pi_binary_timeseries: Whether to use PI binary timeseries format. Default is ``False``. :cvar pi_parameter_config_basenames: List of parameter config file basenames to read. Default is [``rtcParameterConfig``]. :cvar pi_check_for_duplicate_parameters: Check if duplicate parameters are read. Default is ``True``. :cvar pi_validate_timeseries: Check consistency of timeseries. Default is ``True``. """ #: Whether to use PI binary timeseries format pi_binary_timeseries = False #: Location of rtcParameterConfig files pi_parameter_config_basenames = ["rtcParameterConfig"] #: Check consistency of timeseries pi_validate_timeseries = True #: Check for duplicate parameters pi_check_for_duplicate_parameters = True #: Ensemble member to read from input pi_ensemble_member = 0 def __init__(self, **kwargs): # Call parent class first for default behaviour. super().__init__(**kwargs) # Load rtcDataConfig.xml. We assume this file does not change over the # life time of this object. self.__data_config = rtc.DataConfig(self._input_folder) def read(self): # Call parent class first for default behaviour. super().read() # rtcParameterConfig self.__parameter_config = [] try: for pi_parameter_config_basename in self.pi_parameter_config_basenames: self.__parameter_config.append( pi.ParameterConfig(self._input_folder, pi_parameter_config_basename) ) except FileNotFoundError: raise FileNotFoundError( "PIMixin: {}.xml not found in {}.".format( pi_parameter_config_basename, self._input_folder ) ) # Make a parameters dict for later access for parameter_config in self.__parameter_config: for location_id, model_id, parameter_id, value in parameter_config: try: parameter = self.__data_config.parameter(parameter_id, location_id, model_id) except KeyError: parameter = parameter_id self.io.set_parameter(parameter, value) try: self.__timeseries_import = pi.Timeseries( self.__data_config, self._input_folder, self.timeseries_import_basename, binary=self.pi_binary_timeseries, pi_validate_times=self.pi_validate_timeseries, ) except FileNotFoundError: raise FileNotFoundError( "PIMixin: {}.xml not found in {}".format( self.timeseries_import_basename, self._input_folder ) ) self.__timeseries_export = pi.Timeseries( self.__data_config, self._output_folder, self.timeseries_export_basename, binary=self.pi_binary_timeseries, pi_validate_times=False, make_new_file=True, ) # Convert timeseries timestamps to seconds since t0 for internal use timeseries_import_times = self.__timeseries_import.times # Timestamp check if self.pi_validate_timeseries: for i in range(len(timeseries_import_times) - 1): if timeseries_import_times[i] >= timeseries_import_times[i + 1]: raise ValueError("PIMixin: Time stamps must be strictly increasing.") # Check if the timeseries are equidistant dt = timeseries_import_times[1] - timeseries_import_times[0] if self.pi_validate_timeseries: for i in range(len(timeseries_import_times) - 1): if timeseries_import_times[i + 1] - timeseries_import_times[i] != dt: raise ValueError( "PIMixin: Expecting equidistant timeseries, the time step " "towards {} is not the same as the time step(s) before. Set " "unit to nonequidistant if this is intended.".format( timeseries_import_times[i + 1] ) ) # Stick timeseries into an AliasDict self.io.reference_datetime = self.__timeseries_import.forecast_datetime debug = logger.getEffectiveLevel() == logging.DEBUG for variable, values in self.__timeseries_import.items(self.pi_ensemble_member): self.io.set_timeseries(variable, timeseries_import_times, values) if debug and variable in self.get_variables(): logger.debug( "PIMixin: Timeseries {} replaced another aliased timeseries.".format(variable) ) def write(self): # Call parent class first for default behaviour. super().write() times = self._simulation_times if len(set(np.diff(times))) == 1: dt = timedelta(seconds=times[1] - times[0]) else: dt = None # Start of write output # Write the time range for the export file. self.__timeseries_export.times = [ self.io.reference_datetime + timedelta(seconds=s) for s in times ] # Write other time settings self.__timeseries_export.forecast_datetime = self.io.reference_datetime self.__timeseries_export.dt = dt self.__timeseries_export.timezone = self.__timeseries_import.timezone # Write the ensemble properties for the export file. self.__timeseries_export.ensemble_size = 1 self.__timeseries_export.contains_ensemble = self.__timeseries_import.contains_ensemble # For all variables that are output variables the values are # extracted from the results. for variable in self._io_output_variables: values = np.array(self._io_output[variable]) # Check if ID mapping is present try: self.__data_config.pi_variable_ids(variable) except KeyError: logger.debug( "PIMixin: variable {} has no mapping defined in rtcDataConfig " "so cannot be added to the output file.".format(variable) ) continue # Add series to output file self.__timeseries_export.set( variable, values, unit=self.__timeseries_import.get_unit(variable) ) # Write output file to disk self.__timeseries_export.write() @property def timeseries_import(self): """ :class:`pi.Timeseries` object containing the input data. """ return self.__timeseries_import @property def timeseries_import_times(self): """ List of time stamps for which input data is specified. The time stamps are in seconds since t0, and may be negative. """ return self.io.times_sec @property def timeseries_export(self): """ :class:`pi.Timeseries` object for holding the output data. """ return self.__timeseries_export def set_timeseries(self, variable, values, output=True, check_consistency=True, unit=None): if check_consistency: if len(self.times()) != len(values): raise ValueError( "PIMixin: Trying to set/append values {} with a different " "length than the forecast length. Please make sure the " "values cover forecastDate through endDate with timestep {}.".format( variable, self.__timeseries_import.dt ) ) if unit is None: unit = self.__timeseries_import.get_unit(variable) if output: try: self.__data_config.pi_variable_ids(variable) except KeyError: logger.debug( "PIMixin: variable {} has no mapping defined in rtcDataConfig " "so cannot be added to the output file.".format(variable) ) else: self.__timeseries_export.set(variable, values, unit=unit) self.__timeseries_import.set(variable, values, unit=unit) self.io.set_timeseries(variable, self.io.datetimes, values) def get_timeseries(self, variable): _, values = self.io.get_timeseries(variable) return values def set_unit(self, variable: str, unit: str): """ Set the unit of a time series. :param variable: Time series ID. :param unit: Unit. """ assert hasattr( self, "_PIMixin__timeseries_import" ), "set_unit can only be called after read() in pre() has finished." self.__timeseries_import.set_unit(variable, unit, 0) self.__timeseries_export.set_unit(variable, unit, 0)