# Source code for rtctools.optimization.pi_mixin

import logging
from datetime import timedelta

import rtctools.data.pi as pi
import rtctools.data.rtc as rtc
from rtctools.optimization.io_mixin import IOMixin

logger = logging.getLogger("rtctools")

class PIMixin(IOMixin):
    """
    Adds `Delft-FEWS Published Interface
    <https://publicwiki.deltares.nl/display/FEWSDOC/The+Delft-Fews+Published+Interface>`_
    I/O to your optimization problem.

    During preprocessing, files named ``rtcDataConfig.xml``,
    ``timeseries_import.xml``, ``rtcParameterConfig.xml``, and
    ``rtcParameterConfig_Numerical.xml`` are read from the ``input``
    subfolder.  ``rtcDataConfig.xml`` maps tuples of FEWS identifiers,
    including location and parameter ID, to RTC-Tools time series
    identifiers.

    During postprocessing, a file named ``timeseries_export.xml`` is written
    to the ``output`` subfolder.

    :cvar pi_binary_timeseries:
        Whether to use PI binary timeseries format. Default is ``False``.
    :cvar pi_parameter_config_basenames:
        List of parameter config file basenames to read.
        Default is [``rtcParameterConfig``].
    :cvar pi_parameter_config_numerical_basename:
        Numerical config file basename to read.
        Default is ``rtcParameterConfig_Numerical``.
    :cvar pi_check_for_duplicate_parameters:
        Check if duplicate parameters are read. Default is ``True``.
    :cvar pi_validate_timeseries:
        Check consistency of timeseries. Default is ``True``.
    """

    #: Whether to use PI binary timeseries format
    pi_binary_timeseries = False

    #: Location of rtcParameterConfig files
    pi_parameter_config_basenames = ['rtcParameterConfig']
    pi_parameter_config_numerical_basename = 'rtcParameterConfig_Numerical'

    #: Check consistency of timeseries
    pi_validate_timeseries = True

    #: Check for duplicate parameters
    pi_check_for_duplicate_parameters = True

    def __init__(self, **kwargs):
        """
        Initialize the parent class and load ``rtcDataConfig.xml``.

        :param kwargs: Forwarded unchanged to the parent constructor.
        """
        # Call parent class first for default behaviour.
        super().__init__(**kwargs)

        # Load rtcDataConfig.xml.  We assume this file does not change over the
        # life time of this object.
        self.__data_config = rtc.DataConfig(self._input_folder)

    def read(self):
        """
        Read the PI input files and offer their contents to the data store.

        Reads every parameter config listed in
        ``pi_parameter_config_basenames``, the optional numerical parameter
        config, and the import timeseries; prepares the export timeseries
        object; optionally validates the imported time stamps; and finally
        stores the timeseries and parameters for every ensemble member.

        :raises Exception: If a parameter config file or the timeseries
            import file cannot be read, or (when ``pi_validate_timeseries``
            is set) if the time stamps are not strictly increasing or not
            equidistant while the import declares a time step.
        """
        # Call parent class first for default behaviour.
        super().read()

        # rtcParameterConfig
        self.__parameter_config = []
        for basename in self.pi_parameter_config_basenames:
            try:
                self.__parameter_config.append(
                    pi.ParameterConfig(self._input_folder, basename))
            except IOError as err:
                raise Exception(
                    "PIMixin: {}.xml not found in {}.".format(
                        basename, self._input_folder)) from err

        # The numerical (solver option) config is optional.
        try:
            self.__parameter_config_numerical = pi.ParameterConfig(
                self._input_folder, self.pi_parameter_config_numerical_basename)
        except IOError:
            self.__parameter_config_numerical = None

        try:
            self.__timeseries_import = pi.Timeseries(
                self.__data_config,
                self._input_folder,
                self.timeseries_import_basename,
                binary=self.pi_binary_timeseries,
                pi_validate_times=self.pi_validate_timeseries)
        except IOError as err:
            raise Exception(
                "PIMixin: {}.xml not found in {}.".format(
                    self.timeseries_import_basename, self._input_folder)) from err

        self.__timeseries_export = pi.Timeseries(
            self.__data_config,
            self._output_folder,
            self.timeseries_export_basename,
            binary=self.pi_binary_timeseries,
            pi_validate_times=False,
            make_new_file=True)

        timeseries_import_times = self.__timeseries_import.times

        if self.pi_validate_timeseries:
            # Timestamp check
            for earlier, later in zip(timeseries_import_times, timeseries_import_times[1:]):
                if earlier >= later:
                    raise Exception(
                        'PIMixin: Time stamps must be strictly increasing.')

            # Check if the timeseries are truly equidistant.  With fewer than
            # two time stamps there is no step to compare, so skip the check
            # instead of failing on the index lookup.
            if self.__timeseries_import.dt and len(timeseries_import_times) >= 2:
                dt = timeseries_import_times[1] - timeseries_import_times[0]
                for earlier, later in zip(timeseries_import_times, timeseries_import_times[1:]):
                    if later - earlier != dt:
                        raise Exception(
                            'PIMixin: Expecting equidistant timeseries, the time step '
                            'towards {} is not the same as the time step(s) before. Set '
                            'unit to nonequidistant if this is intended.'.format(later))

        # Offer input timeseries to IOMixin
        # NOTE(review): ``self.io`` is expected to be the data store supplied
        # by IOMixin -- confirm against the parent class.
        self.io.reference_datetime = self.__timeseries_import.forecast_datetime

        for ensemble_member in range(self.__timeseries_import.ensemble_size):
            for variable, values in self.__timeseries_import.items(ensemble_member):
                self.io.set_timeseries(
                    variable, timeseries_import_times, values, ensemble_member)

            # store the parameters in the internal data store. Note that we
            # are effectively broadcasting parameters, as ParameterConfig does
            # not support parameters varying per ensemble member
            for parameter_config in self.__parameter_config:
                for location_id, model_id, parameter_id, value in parameter_config:
                    try:
                        parameter = self.__data_config.parameter(
                            parameter_id, location_id, model_id)
                    except KeyError:
                        # No mapping in rtcDataConfig: fall back to the raw ID.
                        parameter = parameter_id

                    self.io.set_parameter(
                        parameter, value, ensemble_member,
                        check_duplicates=self.pi_check_for_duplicate_parameters)

    def solver_options(self):
        """
        Return the parent's solver options, overridden by the numerical config.

        :returns: The options returned by the parent class; when a numerical
            parameter config was read, each of its entries is written into
            the options under its option name.
        """
        # Call parent
        options = super().solver_options()

        # Only do this if we have a rtcParameterConfig_Numerical
        if self.__parameter_config_numerical is None:
            return options

        # Load solver options from parameter config
        for _location_id, _model, option, value in self.__parameter_config_numerical:
            options[option] = value

        # Done
        return options

    def write(self):
        """
        Write the results of all ensemble members to the PI export file.

        For every alias of every output variable the values are taken from
        the extracted results, or, failing that, from the stored timeseries;
        they are interpolated onto the output times when the lengths differ.
        Aliases without data are logged as errors and aliases without an ID
        mapping in ``rtcDataConfig.xml`` are logged at debug level; both are
        skipped.
        """
        # Call parent class first for default behaviour.
        super().write()

        # Get time stamps
        times = self.times()
        # A single distinct step size means the output is equidistant.
        if len(set(times[1:] - times[:-1])) == 1:
            dt = timedelta(seconds=times[1] - times[0])
        else:
            dt = None

        # Start of write output
        # Write the time range for the export file.
        t0 = self.__timeseries_import.times[self.__timeseries_import.forecast_index]
        self.__timeseries_export.times = [t0 + timedelta(seconds=s) for s in times]

        # Write other time settings
        self.__timeseries_export.forecast_datetime = self.__timeseries_import.forecast_datetime
        self.__timeseries_export.dt = dt
        self.__timeseries_export.timezone = self.__timeseries_import.timezone

        # Write the ensemble properties for the export file.
        self.__timeseries_export.ensemble_size = self.ensemble_size
        self.__timeseries_export.contains_ensemble = (self.ensemble_size > 1)

        # Start looping over the ensembles for extraction of the output values.
        for ensemble_member in range(self.ensemble_size):
            results = self.extract_results(ensemble_member)

            # For all variables that are output variables the values are
            # extracted from the results.
            for variable in [sym.name() for sym in self.output_variables]:
                for alias in self.alias_relation.aliases(variable):
                    try:
                        values = results[alias]
                        if len(values) != len(times):
                            values = self.interpolate(
                                times, self.times(alias), values,
                                self.interpolation_method(alias))
                    except KeyError:
                        # Not part of the results: fall back to the stored
                        # timeseries for this alias.
                        try:
                            ts = self.get_timeseries(alias, ensemble_member)
                            if len(ts.times) != len(times):
                                values = self.interpolate(
                                    times, ts.times, ts.values)
                            else:
                                values = ts.values
                        except KeyError:
                            logger.error(
                                'PIMixin: Output requested for non-existent alias {}. '
                                'Will not be in output file.'.format(alias))
                            continue

                    # Check if ID mapping is present
                    try:
                        self.__data_config.pi_variable_ids(alias)
                    except KeyError:
                        logger.debug(
                            'PIMixin: variable {} has no mapping defined in rtcDataConfig '
                            'so cannot be added to the output file.'.format(alias))
                        continue

                    # Add series to output file.
                    # NOTE: We use the unit of the zeroth ensemble member, as
                    # we might be outputting more ensembles than we read in.
                    self.__timeseries_export.set(
                        alias, values,
                        unit=self.__timeseries_import.get_unit(alias, ensemble_member=0),
                        ensemble_member=ensemble_member)

        # Write output file to disk
        self.__timeseries_export.write()

    def set_timeseries(self, variable: str, *args, unit: str = None, **kwargs):
        """
        Set a timeseries, optionally recording its unit on the import data.

        :param variable: Name of the timeseries variable.
        :param args: Forwarded unchanged to the parent ``set_timeseries``.
        :param unit: If not ``None``, the unit registered for ``variable`` on
            the import timeseries object.
        :param kwargs: Forwarded unchanged to the parent ``set_timeseries``.
        """
        if unit is not None:
            # NOTE(review): the trailing 0 is presumably the ensemble member,
            # matching the ``get_unit(..., ensemble_member=0)`` lookup used
            # when writing -- confirm against pi.Timeseries.set_unit.
            self.__timeseries_import.set_unit(variable, unit, 0)
        super().set_timeseries(variable, *args, **kwargs)

    @property
    def timeseries_import(self):
        """
        :class:`pi.Timeseries` object containing the input data.
        """
        return self.__timeseries_import

    @property
    def timeseries_import_times(self):
        """
        List of time stamps for which input data is specified.

        The time stamps are in seconds since t0, and may be negative.
        """
        return self.io.times_sec

    @property
    def timeseries_export(self):
        """
        :class:`pi.Timeseries` object for holding the output data.
        """
        return self.__timeseries_export