#!/usr/bin/env python3
"""
Created on Fri Jul 17 09:11:58 2020
@author: ets
"""
import datetime as dt
import logging
import os
import tempfile
import warnings
from copy import deepcopy
from pathlib import Path
from typing import List, Union
from urllib.parse import urlparse
import climpred
import xarray as xr
from climpred import HindcastEnsemble
from ravenpy import Emulator, EnsembleReader
from ravenpy.config import commands as rc
from ravenpy.config.rvs import Config
# Module-level logger for the PyWPS service.
LOGGER = logging.getLogger("PYWPS")

# Can be set at runtime with `$ env RAVENPY_THREDDS_URL=https://xx.yy.zz/thredds/ ...`.
THREDDS_URL = os.environ.get(
    "RAVENPY_THREDDS_URL", "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/"
)
# Normalize so downstream code can assume a trailing slash.
if not THREDDS_URL.endswith("/"):
    THREDDS_URL = f"{THREDDS_URL}/"
def climatology_esp(
    config,
    workdir: Union[str, Path] = None,
    years: List[int] = None,
    overwrite: bool = False,
) -> EnsembleReader:
    """Ensemble Streamflow Prediction based on historical variability.

    Run the model using forcing for different years.

    No model warm-up is performed by this function, make sure the initial states are
    consistent with the start date.

    Parameters
    ----------
    config : ravenpy.config.rvs.Config
        Model configuration. Note: `start_date`, `duration` and `end_date` are
        mutated in place by this function.
    years : List[int]
        Years from which forcing time series will be drawn.
        If None, run for all years where forcing data is available.
    workdir : str or Path
        The path to rv files and model outputs. If None, create a temporary directory.
    overwrite : bool
        Whether to overwrite existing values or not. Default: False

    Returns
    -------
    EnsembleReader
        Class facilitating the analysis of multiple Raven outputs.
    """
    workdir = Path(workdir or tempfile.mkdtemp())

    # Time from meteo forcing files.
    time = _get_time(config)

    if years is None:
        # Default to all years covered by the forcing data.
        years = list(range(time[0].dt.year.values, time[-1].dt.year.values + 1))

    # Define duration and end_date.
    # Make sure configuration uses duration instead of end_date.
    if config.duration is None:
        if config.end_date is None or config.start_date is None:
            raise ValueError("StartDate and Duration must be configured.")
        # Bug fix: keep `duration` as an integer number of days. The original
        # code left it as a `timedelta`, which made the `len(...) < duration`
        # check below raise a TypeError (int vs. timedelta comparison).
        duration = (config.end_date - config.start_date).days
        end = config.end_date
        config.duration = duration
        config.end_date = None
    else:
        if config.start_date is None:
            raise ValueError("StartDate must be configured.")
        duration = config.duration
        end = config.start_date + dt.timedelta(days=duration)

    # Define start date; Feb 29 has no equivalent in non-leap years.
    start = config.start_date
    if start.month == 2 and start.day == 29:
        start = start.replace(day=28)

    # Keep an immutable copy of the start date (config.start_date is mutated below).
    startdate_fixed = deepcopy(start)

    # Run the model once per year of forcing.
    ensemble = []
    for year in years:
        # Prepare model instance.
        config.start_date = s = startdate_fixed.replace(year=year)

        # Skip years whose forcing does not cover the full simulation window.
        if len(time.sel(time=slice(str(s), str(end.replace(year=year))))) < duration:
            continue

        out = Emulator(config=config, workdir=workdir / f"Y{year}").run(
            overwrite=overwrite
        )

        # Format output so that `time` is set to the forecast time, not the forcing time.
        if "hydrograph" in out.files:
            out.files["hydrograph"] = _shift_esp_time(
                out.files["hydrograph"], startdate_fixed.year
            )
        if "storage" in out.files:
            out.files["storage"] = _shift_esp_time(
                out.files["storage"], startdate_fixed.year
            )

        ensemble.append(out)

    return EnsembleReader(run_name=config.run_name, runs=ensemble)
def to_climpred_hindcast_ensemble(
    hindcast: xr.Dataset, observations: xr.Dataset
) -> climpred.HindcastEnsemble:
    """Create a hindcasting object that can be used by the `climpred` toolbox for hindcast verification.

    Parameters
    ----------
    hindcast : xarray.Dataset
        The hindcasted streamflow data for a given period.
    observations : xarray.Dataset
        The streamflow observations that are used to verify the hindcasts.

    Returns
    -------
    climpred.HindcastEnsemble
        The hindcast ensemble formatted to be used in climpred.

    Raises
    ------
    ValueError
        If `hindcast` and `observations` share no variable name.
    """
    # Make sure at least one variable name is common to both datasets.
    var_names = set(hindcast.data_vars.keys()).intersection(
        observations.data_vars.keys()
    )
    if len(var_names) == 0:
        raise ValueError(
            "`hindcast` and `observations` should have a variable with the same name."
        )

    # Todo: Add verification of sizes, catch and return message.

    # Make the HindcastEnsemble object for the hindcast data.
    hindcast_obj = HindcastEnsemble(hindcast)
    # Add the observations to that HindcastEnsemble object for verification.
    hindcast_obj = hindcast_obj.add_observations(observations)

    return hindcast_obj
def warm_up(
    config, duration: int, workdir: Union[str, Path] = None, overwrite: bool = False
) -> Config:
    """Run the model on a time series preceding the start date.

    Parameters
    ----------
    config : ravenpy.config.rvs.Config
        Model configuration.
    duration : int
        Number of days the warm-up simulation should last *before* the start date.
    workdir : str or Path
        Work directory.
    overwrite : bool
        If True, overwrite existing files.

    Returns
    -------
    ravenpy.config.rvs.Config
        Model configuration with initial state set by running the model prior to the start date.
    """
    # Work on a deep copy so the caller's configuration is left untouched.
    wup = config.model_copy(deep=True)
    wup.start_date = config.start_date - dt.timedelta(days=duration)
    wup.duration = duration
    wup.end_date = None  # `duration` and `end_date` are mutually exclusive.

    out = Emulator(wup, workdir=workdir).run(overwrite=overwrite)

    # Seed the original configuration with the warm-up run's final state.
    return config.set_solution(out.files["solution"])
def hindcast_climatology_esp(
    config: Config,
    warm_up_duration: int,
    years: List[int] = None,
    hindcast_years: List[int] = None,
    workdir: Union[str, Path] = None,
    overwrite: bool = False,
) -> xr.Dataset:
    """Hindcast of Ensemble Prediction Streamflow.

    This function runs an emulator initialized for each year in `hindcast_years`,
    using the forcing time series for each year in `years`. This allows an assessment
    of the performance of the ESP. The total number of simulations is given by
    `len(years) * len(hindcast_years)`.

    Parameters
    ----------
    config : ravenpy.config.rvs.Config
        Model configuration. Initial states will be overwritten.
    warm_up_duration : int
        Number of days to run the model prior to the starting date to initialize the state variables.
    workdir : Path
        Work directory. If None, creates a temporary directory.
    years : List[int]
        Years from which forcing time series will be drawn. If None, run for all years where
        forcing data is available.
    hindcast_years : List[int]
        Years for which the model will be initialized and the `climatology_esp` function run.
        Defaults to all years when forcing data is available.
    overwrite : bool
        If True, overwrite existing files.

    Returns
    -------
    xarray.Dataset
        The dataset containing the (init, member, lead) dimensions ready for using in climpred. (qsim)

    Notes
    -----
    The dataset output dimensions are
    - `init`: hindcast issue date,
    - `member`: ESP members of the hindcasting experiment,
    - `lead`: number of lead days of the forecast.
    """
    workdir = Path(workdir or tempfile.mkdtemp())

    # `hindcast_years` defaults to all available years.
    time = _get_time(config)
    if hindcast_years is None:
        # Create list of years
        hindcast_years = list(
            range(time[0].dt.year.values, time[-1].dt.year.values + 1)
        )

    # Define start date; Feb 29 has no equivalent in non-leap years.
    start = config.start_date
    if start.month == 2 and start.day == 29:
        start = start.replace(day=28)

    # Run climatology_esp for each year in `hindcast_years`, with initial conditions set
    # by a warm-up simulation.
    q_sims = {}
    for year in hindcast_years:
        # Compute initial state for each year.
        config.start_date = e = start.replace(year=year)
        s = e - dt.timedelta(days=warm_up_duration)

        # Skip years lacking enough forcing data for the warm-up window.
        if len(time.sel(time=slice(str(s), str(e)))) < warm_up_duration:
            continue

        # Run warm-up simulation to set initial state.
        conf = warm_up(
            config,
            duration=warm_up_duration,
            workdir=workdir / "wup" / f"Y{year}",
            overwrite=overwrite,
        )

        # Run climatology ESP.
        esp = climatology_esp(
            conf, years=years, workdir=workdir / "esp" / f"Y{year}", overwrite=overwrite
        )

        # Format results for climpred: add the `init` (issue date) dimension
        # and rename `time` to `lead` with 1-based day indices.
        q_sim = esp.hydrograph.q_sim.expand_dims(
            init=[
                config.start_date,
            ]
        )
        q_sim = q_sim.rename({"time": "lead"})
        q_sim.lead.attrs["units"] = "days"
        q_sim = q_sim.assign_coords(lead=list(range(1, len(q_sim.lead) + 1)))
        q_sims[year] = q_sim

    # Concatenate the per-year hindcasts along the `init` dimension.
    q_sims = xr.concat(q_sims.values(), dim="init")
    # Re-apply units: attrs may be dropped by `concat`.
    q_sims.lead.attrs["units"] = "days"
    q_sims = q_sims.to_dataset()

    return q_sims
def _shift_esp_time(nc, year, dim="member"):
    """Modify netCDF time to facilitate ensemble analysis out of ESP forecasts.

    Modify time such that it starts on the given year, and add member dimension with original year as value.
    """
    ds = xr.open_dataset(nc, use_cftime=True)

    original_start = ds.time.data[0]
    step = xr.infer_freq(ds.time)

    # Rebuild the time axis so it begins in `year`, preserving length and frequency.
    ds["time"] = xr.cftime_range(
        original_start.replace(year=year), periods=len(ds.time), freq=step
    )

    # Store the original simulation year along a new coordinate dimension.
    shifted = ds.expand_dims({dim: [original_start.year]})

    # Persist the shifted dataset next to the source file.
    out_path = nc.parent / f"{nc.stem}_shifted{nc.suffix}"
    shifted.to_netcdf(out_path, mode="w")
    return out_path
def _get_time(config: Config) -> xr.DataArray:
    """Return time DataArray."""
    # Probe the possible forcing sources in order of precedence; each entry
    # names the config attribute and the dataset attribute holding the data.
    sources = (
        ("gauge", "ds"),
        ("station_forcing", "da"),
        ("gridded_forcing", "da"),
    )
    for attr, holder in sources:
        forcing = getattr(config, attr)
        if forcing is not None:
            return getattr(forcing[0], holder).time
    raise NotImplementedError()
def ensemble_prediction(
    config,
    forecast: Union[str, Path],
    ens_dim: str = "member",
    workdir=None,
    overwrite=True,
    **kwds,
) -> EnsembleReader:
    r"""Ensemble Streamflow Prediction based on historical weather forecasts (CASPAR or other).

    Run the model using forcing for different years.

    No model warm-up is performed by this function, make sure the initial states are consistent with the start date.

    Parameters
    ----------
    config : ravenpy.config.rvs.Config
        Model configuration.
    forecast : str or Path
        Forecast subsetted to the catchment location (.nc).
    ens_dim : str
        Name of dimension to iterate over.
    workdir : str or Path
        The path to rv files and model outputs. If None, create temporary directory.
    overwrite : bool
        Overwrite files when writing to disk.
    \*\*kwds
        Keywords for the `Gauge.from_nc` function.

    Returns
    -------
    EnsembleReader
        Class facilitating the analysis of multiple Raven outputs.
    """
    workdir = Path(workdir or tempfile.mkdtemp())

    # Count ensemble members; close the dataset promptly to avoid leaking the
    # file handle (the original code never closed it).
    with xr.open_dataset(forecast, use_cftime=True) as forecast_ds:
        n_members = len(forecast_ds[ens_dim])

    # Run the model once per forecast member.
    ensemble = []
    for member in range(n_members):
        # Prepare a model instance pointing at this member's forcing.
        out = Emulator(
            config=config.model_copy(
                update={
                    "gauge": [
                        rc.Gauge.from_nc(
                            forecast,
                            station_idx=member + 1,  # station indices are 1-based
                            **kwds,
                        ),
                    ]
                },
            ),
            workdir=workdir / f"Y{member}",
        ).run(overwrite=overwrite)

        # Append to the ensemble.
        ensemble.append(out)

    return EnsembleReader(run_name=config.run_name, runs=ensemble, dim=ens_dim)


# Alias
hindcast_from_meteo_forecast = ensemble_prediction
def compute_forecast_flood_risk(
    forecast: xr.Dataset, flood_level: float, thredds: str = THREDDS_URL
) -> xr.Dataset:
    """Returns the empirical exceedance probability for each forecast day based on a flood level threshold.

    Parameters
    ----------
    forecast : xr.Dataset
        Ensemble or deterministic streamflow forecast.
    flood_level : float
        Flood level threshold. Will be used to determine if forecasts exceed
        this specified flood threshold. Should be in the same units as the forecasted streamflow.
    thredds : str
        The thredds server url. Default: "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/"

    Returns
    -------
    xr.Dataset
        Time series of probabilities of flood level exceedance.
    """
    # `endswith` also handles an empty string safely (`thredds[-1]` would
    # raise an IndexError).
    if not thredds.endswith("/"):
        warnings.warn(
            "The thredds url should end with a slash. Appending it to the url."
        )
        thredds = f"{thredds}/"

    # ---- Calculations ---- #
    # Ensemble: for each day, calculate the percentage of members that are above the threshold.
    if "member" in forecast.coords:
        # Get number of members originally.
        number_members = len(forecast.member)

        # Now compute the ratio of cases that are above the threshold.
        pct = (
            forecast.where(forecast > flood_level)
            .notnull()
            .sum(dim="member", keep_attrs=True)
            / number_members
        )
    # It's deterministic:
    else:
        pct = (
            forecast.where(forecast > flood_level).notnull() / 1.0
        )  # This is needed to return values instead of floats

    domain = urlparse(thredds).netloc

    out = pct.to_dataset(name="exceedance_probability")
    out.attrs["source"] = f"PAVICS-Hydro flood risk forecasting tool, {domain}"
    # `datetime.utcnow()` is deprecated; use an aware UTC timestamp instead
    # (strftime output is identical for this format).
    out.attrs["history"] = (
        f"File created on {dt.datetime.now(dt.timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} "
        f"UTC on the PAVICS-Hydro service available at {domain}."
    )
    out.attrs["title"] = (
        "Identification of ensemble members that exceed a certain flow threshold."
    )

    return out