# Source code for rtctools.optimization.control_tree_mixin

```
import logging
from typing import Dict, List, Tuple, Union
import numpy as np
from .optimization_problem import OptimizationProblem
logger = logging.getLogger("rtctools")
[docs]
class ControlTreeMixin(OptimizationProblem):
"""
Adds a stochastic control tree to your optimization problem.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__branches = {}
[docs]
def control_tree_options(self) -> Dict[str, Union[List[str], List[float], int]]:
"""
Returns a dictionary of options controlling the creation of a k-ary stochastic tree.
+------------------------+---------------------+-----------------------+
| Option | Type | Default value |
+========================+=====================+=======================+
| ``forecast_variables`` | ``list`` of strings | All constant inputs |
+------------------------+---------------------+-----------------------+
| ``branching_times`` | ``list`` of floats | ``self.times()`` |
+------------------------+---------------------+-----------------------+
| ``k`` | ``int`` | ``2`` |
+------------------------+---------------------+-----------------------+
A ``k``-ary tree is generated, branching at every interior branching time.
Ensemble members are clustered to paths through the tree based on average
distance over all forecast variables.
:returns: A dictionary of control tree generation options.
"""
options = {}
options["forecast_variables"] = [
var.name() for var in self.dae_variables["constant_inputs"]
]
options["branching_times"] = self.times()[1:]
options["k"] = 2
return options
def discretize_control(self, variable, ensemble_member, times, offset):
control_indices = np.zeros(len(times), dtype=np.int16)
for branch, members in self.__branches.items():
if ensemble_member not in members:
continue
branching_time_0 = self.__branching_times[len(branch) + 0]
branching_time_1 = self.__branching_times[len(branch) + 1]
els = np.logical_and(times >= branching_time_0, times < branching_time_1)
nnz = np.count_nonzero(els)
try:
control_indices[els] = self.__discretize_controls_cache[(variable, branch)]
except KeyError:
control_indices[els] = list(range(offset, offset + nnz))
self.__discretize_controls_cache[(variable, branch)] = control_indices[els]
offset += nnz
return control_indices
def discretize_controls(self, resolved_bounds):
self.__discretize_controls_cache = {}
# Collect options
options = self.control_tree_options()
# Make sure branching times contain initial and final time. The
# presence of these is assumed below.
times = self.times()
t0 = self.initial_time
self.__branching_times = options["branching_times"]
n_branching_times = len(self.__branching_times)
if n_branching_times > len(times) - 1:
raise Exception("Too many branching points specified")
self.__branching_times = np.concatenate(([t0], self.__branching_times, [np.inf]))
logger.debug("ControlTreeMixin: Branching times:")
logger.debug(self.__branching_times)
# Branches start at branching times, so that the tree looks like the following:
#
# *-----
# *-----
# *-----
#
# t0 t1
#
# with branching time t1.
branches = {}
def branch(current_branch: Tuple[int]):
if len(current_branch) >= n_branching_times:
return
# Branch stats
n_branch_members = len(branches[current_branch])
if n_branch_members == 0:
# Nothing to do
return
distances = np.zeros((n_branch_members, n_branch_members))
# Decide branching on a segment of the time horizon
branching_time_0 = self.__branching_times[len(current_branch) + 1]
branching_time_1 = self.__branching_times[len(current_branch) + 2]
# Compute reverse ensemble member index-to-distance index map.
reverse = {}
for i, member_i in enumerate(branches[current_branch]):
reverse[member_i] = i
# Compute distances between ensemble members, summed for all
# forecast variables
for forecast_variable in options["forecast_variables"]:
# We assume the time stamps of the forecasts in all ensemble
# members to be identical
timeseries = self.constant_inputs(ensemble_member=0)[forecast_variable]
els = np.logical_and(
timeseries.times >= branching_time_0, timeseries.times < branching_time_1
)
# Compute distance between ensemble members
for i, member_i in enumerate(branches[current_branch]):
timeseries_i = self.constant_inputs(ensemble_member=member_i)[forecast_variable]
for j, member_j in enumerate(branches[current_branch]):
timeseries_j = self.constant_inputs(ensemble_member=member_j)[
forecast_variable
]
distances[i, j] += np.linalg.norm(
timeseries_i.values[els] - timeseries_j.values[els]
)
# Keep track of ensemble members that have not yet been allocated
# to a new branch
available = set(branches[current_branch])
# We first select the scenario with the max distance to any other branch
idx = np.argmax(np.amax(distances, axis=0))
for i in range(options["k"]):
if idx >= 0:
branches[current_branch + (i,)] = [branches[current_branch][idx]]
available.remove(branches[current_branch][idx])
# We select the scenario with the max min distance to the other branches
min_distances = np.array(
[
min(
[np.inf]
+ [
distances[j, k]
for j, member_j in enumerate(branches[current_branch])
if member_j not in available and member_k in available
]
)
for k, member_k in enumerate(branches[current_branch])
],
dtype=np.float64,
)
min_distances[np.where(min_distances == np.inf)] = -np.inf
idx = np.argmax(min_distances)
if min_distances[idx] <= 0:
idx = -1
else:
branches[current_branch + (i,)] = []
# Cluster remaining ensemble members to branches
for member_i in available:
min_i = 0
min_distance = np.inf
for i in range(options["k"]):
branch2 = branches[current_branch + (i,)]
if len(branch2) > 0:
distance = distances[reverse[member_i], reverse[branch2[0]]]
if distance < min_distance:
min_distance = distance
min_i = i
branches[current_branch + (min_i,)].append(member_i)
# Recurse
for i in range(options["k"]):
branch(current_branch + (i,))
current_branch = ()
branches[current_branch] = list(range(self.ensemble_size))
branch(current_branch)
logger.debug("ControlTreeMixin: Control tree is:")
logger.debug(branches)
self.__branches = branches
# By now, the tree branches have been set up. We now rely
# on the default discretization logic to call discretize_control()
# for each (control variable, ensemble member) pair.
return super().discretize_controls(resolved_bounds)
@property
def control_tree_branches(self) -> Dict[Tuple[int], List[int]]:
"""
Returns a dictionary mapping the branch id (a Tuple of ints) to a list
of ensemble members in said branch.
Note that the root branch is an empty tuple containing all ensemble
members.
"""
return self.__branches.copy()
```