"""Tools for hydrological regionalization."""
import logging
import tempfile
from pathlib import Path
from typing import Any, List, Tuple, Union
import numpy as np
import pandas as pd
import statsmodels.api as sm
import xarray as xr
from haversine import haversine_vector
from statsmodels.regression.linear_model import RegressionResults
from ravenpy.config import Config
from ravenpy.ravenpy import Emulator, EnsembleReader
from . import coords
LOGGER = logging.getLogger("PYWPS")
regionalisation_data_dir = Path(__file__).parent.parent / "data" / "regionalisation"
[docs]
def regionalize(
config: Config,
method: str,
nash: pd.Series,
params: pd.DataFrame = None,
props: pd.DataFrame = None,
target_props: Union[pd.Series, dict] = None,
size: int = 5,
min_NSE: float = 0.6,
workdir: Union[str, Path] = None,
overwrite: bool = False,
**kwds,
) -> Tuple[xr.DataArray, xr.Dataset]:
"""Perform regionalization for catchment whose outlet is defined by coordinates.
Parameters
----------
config : ravenpy.config.rvs.Config
Symbolic emulator configuration. Only GR4JCN, HMETS and Mohyse are supported.
method : {'MLR', 'SP', 'PS', 'SP_IDW', 'PS_IDW', 'SP_IDW_RA', 'PS_IDW_RA'}
Name of the regionalization method to use.
nash : pd.Series
NSE values for the parameters of gauged catchments.
params : pd.DataFrame
Model parameters of gauged catchments. Needed for all but MRL method.
props : pd.DataFrame
Properties of gauged catchments to be analyzed for the regionalization. Needed for MLR and RA methods.
target_props : pd.Series or dict
Properties of ungauged catchment. Needed for MLR and RA methods.
size : int
Number of catchments to use in the regionalization.
min_NSE : float
Minimum calibration NSE value required to be considered as a donor.
workdir : Union[str, Path]
Work directory. If None, a temporary directory will be created.
overwrite : bool
If True, existing files will be overwritten.
**kwds
Model configuration parameters, including the forcing files (ts).
Returns
-------
(qsim, ensemble)
qsim : DataArray (time, )
Multi-donor averaged predicted streamflow.
ensemble : Dataset
q_sim : DataArray (realization, time)
Ensemble of members based on number of donors.
parameter : DataArray (realization, param)
Parameters used to run the model.
"""
name = config.__class__.__name__
if name not in ["GR4JCN", "HMETS", "Mohyse"]:
raise ValueError(f"Emulator {name} is not supported for regionalization.")
if not config.is_symbolic:
raise ValueError("config should be a symbolic configuration.")
# TODO: Include list of available properties in docstring.
# TODO: Add error checking for source, target stuff wrt method chosen.
workdir = Path(workdir or tempfile.mkdtemp())
# Select properties based on those available in the ungauged properties DataFrame.
if isinstance(target_props, dict):
ungauged_properties = pd.Series(target_props)
elif isinstance(target_props, pd.Series):
ungauged_properties = target_props
elif isinstance(target_props, pd.DataFrame):
ungauged_properties = target_props.to_series()
else:
raise ValueError
cr = coords.realization(1 if method == "MLR" else size)
cp = coords.param(name)
# Filter on NSE
valid = nash > min_NSE
filtered_params = params.where(valid).dropna()
filtered_prop = props.where(valid).dropna()
# Check to see if we have enough data, otherwise raise error
if len(filtered_prop) < size and method != "MLR":
raise ValueError(
"Hydrological_model and minimum NSE threshold \
combination is too strict for the number of donor \
basins. Please reduce the number of donor basins OR \
reduce the minimum NSE threshold."
)
# Rank the matrix according to the similarity or distance.
if method in ["PS", "PS_IDW", "PS_IDW_RA"]: # Physical similarity
dist = similarity(filtered_prop, ungauged_properties)
else: # Geographical distance.
dist = distance(filtered_prop, ungauged_properties)
# Series of distances for the first `size` best donors
sdist = dist.sort_values().iloc[:size]
# Pick the donors' model parameters and catchment properties
sparams = filtered_params.loc[sdist.index]
sprop = filtered_prop.loc[sdist.index]
# Get the list of parameters to run
reg_params = regionalization_params(
method, sparams, sprop, ungauged_properties, filtered_params, filtered_prop
)
# Run the model over all parameters and create ensemble DataArray
ensemble = []
for i, rparams in enumerate(reg_params):
model_config_tmp = config.set_params(rparams)
out = Emulator(model_config_tmp, workdir=workdir / f"donor_{i}").run(
overwrite=overwrite
)
# Append to the ensemble.
ensemble.append(out)
qsim_obj = EnsembleReader(runs=ensemble, dim="members")
qsims = qsim_obj.hydrograph.q_sim
# 3. Aggregate runs into a single result -> dataset
if method in [
"MLR",
"SP",
"PS",
]: # Average (one realization for MLR, so no effect).
qsim = qsims.mean(dim="members", keep_attrs=True)
elif (
"IDW" in method
): # Here we are replacing the mean by the IDW average, keeping attributes and dimensions.
qsim = IDW(qsims, sdist)
else:
raise ValueError(f"No matching algorithm for {method}")
# Metadata handling
# TODO: Store the basin_name
# Create a DataArray for the parameters used in the regionalization
param_da = xr.DataArray(
reg_params,
dims=("members", "param"),
coords={"param": cp, "members": cr},
attrs={"long_name": "Model parameters used in the regionalization."},
)
ens = xr.Dataset(
data_vars={"q_sim": qsims, "parameter": param_da},
attrs={
"title": "Regionalization ensemble",
# "institution": "",
"source": f"Hydrological model {name}",
"history": "Created by ravenpy regionalize.",
# "references": "",
"comment": f"Regionalization method: {method}",
},
)
# TODO: Add global attributes (model name, date, version, etc)
return qsim, ens
[docs]
def read_gauged_properties(properties) -> pd.DataFrame:
"""Return table of gauged catchments properties over North America.
Returns
-------
pd.DataFrame
Catchment properties keyed by catchment ID.
"""
f = regionalisation_data_dir / "gauged_catchment_properties.csv"
proptable = pd.read_csv(f, index_col="ID")
return proptable[properties]
[docs]
def read_gauged_params(model):
"""Return table of NASH-Sutcliffe Efficiency values and model parameters for North American catchments.
Returns
-------
pd.DataFrame
Nash-Sutcliffe Efficiency keyed by catchment ID.
pd.DataFrame
Model parameters keyed by catchment ID.
"""
f = regionalisation_data_dir / f"{model}_parameters.csv"
params = pd.read_csv(f, index_col="ID")
return params["NASH"], params.iloc[:, 1:]
[docs]
def distance(gauged: pd.DataFrame, ungauged: pd.Series) -> pd.Series:
"""Return geographic distance [km] between ungauged and database of gauged catchments.
Parameters
----------
gauged : pd.DataFrame
Table containing columns for longitude and latitude of catchment's centroid.
ungauged : pd.Series
Coordinates of the ungauged catchment.
Returns
-------
pd.Series
"""
gauged_array = np.array(list(zip(gauged.latitude.values, gauged.longitude.values)))
return pd.Series(
data=haversine_vector(
gauged_array, np.array([ungauged.latitude, ungauged.longitude]), comb=True
)[0],
index=gauged.index,
)
[docs]
def similarity(
gauged: pd.DataFrame, ungauged: pd.DataFrame, kind: str = "ptp"
) -> pd.Series:
"""Return similarity measure between gauged and ungauged catchments.
Parameters
----------
gauged : pd.DataFrame
Gauged catchment properties.
ungauged : pd.DataFrame
Ungauged catchment properties
kind : {'ptp', 'std', 'iqr'}
Normalization method: peak to peak (maximum - minimum), standard deviation, inter-quartile range.
Returns
-------
pd.Series
"""
stats = gauged.describe()
if kind == "ptp":
spread = stats.loc["max"] - stats.loc["min"]
elif kind == "std":
spread = stats.loc["std"]
elif kind == "iqr":
spread = stats.loc["75%"] - stats.loc["25%"]
d = ungauged.values - gauged.values
n = np.abs(d) / spread.values
return pd.Series(data=n.sum(axis=1), index=gauged.index)
[docs]
def regionalization_params(
method: str,
gauged_params: pd.DataFrame,
gauged_properties: pd.DataFrame,
ungauged_properties: pd.DataFrame,
filtered_params: pd.DataFrame,
filtered_prop: pd.DataFrame,
) -> List[float]:
"""Return the model parameters to use for the regionalization.
Parameters
----------
method : {'MLR', 'SP', 'PS', 'SP_IDW', 'PS_IDW', 'SP_IDW_RA', 'PS_IDW_RA'}
Name of the regionalization method to use.
gauged_params : pd.DataFrame
A DataFrame of parameters for donor catchments (size = number of donors)
gauged_properties : pd.DataFrame
A DataFrame of properties of the donor catchments (size = number of donors)
ungauged_properties : pd.DataFrame
A DataFrame of properties of the ungauged catchment (size = 1)
filtered_params : pd.DataFrame
A DataFrame of parameters of all filtered catchments (size = all catchments with NSE > min_NSE)
filtered_prop : pd.DataFrame
A DataFrame of properties of all filtered catchments (size = all catchments with NSE > min_NSE)
Returns
-------
list
List of model parameters to be used for the regionalization.
"""
if method == "MLR" or "RA" in method:
mlr_params, r2 = multiple_linear_regression(
filtered_prop, filtered_params, ungauged_properties.to_frame().T
)
if method == "MLR": # Return the multiple linear regression parameters.
out = [
mlr_params,
]
elif "RA" in method:
gp = gauged_params.copy()
for p, r, col in zip(mlr_params, r2, gauged_params):
# If we have an R2 > 0.5 then we consider this to be a better estimator
if r > 0.5:
gp[col] = p
out = gp.values
else:
out = gauged_params.values
return out
[docs]
def IDW(qsims: xr.DataArray, dist: pd.Series) -> xr.DataArray:
"""Inverse distance weighting.
Parameters
----------
qsims : xr.DataArray
Ensemble of hydrogram stacked along the `members` dimension.
dist : pd.Series
Distance from catchment which generated each hydrogram to target catchment.
Returns
-------
xr.DataArray
Inverse distance weighted average of ensemble.
"""
# In IDW, weights are 1 / distance
weights = xr.DataArray(
1.0 / dist, dims="members", coords={"members": qsims.members}
)
# Make weights sum to one
weights /= weights.sum(axis=0)
# Calculate weighted average.
out = qsims.dot(weights)
out.name = qsims.name
out.attrs = qsims.attrs
return out
[docs]
def multiple_linear_regression(
source: pd.DataFrame, params: pd.DataFrame, target: pd.DataFrame
) -> Tuple[List[Any], List[int]]:
"""Multiple Linear Regression for model parameters over catchment properties.
Uses known catchment properties and model parameters to estimate model parameter over an
ungauged catchment using its properties.
Parameters
----------
source : pd.DataFrame
Properties of gauged catchments.
params : pd.DataFrame
Model parameters of gauged catchments.
target : pd.DataFrame
Properties of the ungauged catchment.
Returns
-------
list of Any, list of int
A named tuple of the estimated model parameters and the R2 of the linear regression.
"""
# Add constants to the gauged predictors
x = sm.add_constant(source)
# Add the constant 1 for the ungauged catchment predictors
predictors = sm.add_constant(target, prepend=True, has_constant="add")
# Perform regression for each parameter
regression = [sm.OLS(params[param].values, x).fit() for param in params]
# Perform prediction on each parameter based on the predictors
mlr_parameters = [r.predict(exog=predictors)[0] for r in regression]
# Extract the adjusted r_squared value for each parameter
r2 = [r.rsquared_adj for r in regression]
return mlr_parameters, r2