import datetime as dt
import logging
import os
import re
import warnings
from pathlib import Path
from typing import Any, List, Tuple, Union
from urllib.parse import urljoin
import pandas as pd
import xarray as xr
from pandas import DatetimeIndex, Series, Timestamp
from xarray import Dataset
from . import gis_import_error_message
try:
import fiona
except (ImportError, ModuleNotFoundError) as e:
msg = gis_import_error_message.format(Path(__file__).stem)
raise ImportError(msg) from e
LOGGER = logging.getLogger("PYWPS")
# Can be set at runtime with `$ env RAVENPY_THREDDS_URL=https://xx.yy.zz/geoserver/ ...`.
THREDDS_URL = os.environ.get(
"RAVENPY_THREDDS_URL", "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/"
)
if not THREDDS_URL.endswith("/"):
THREDDS_URL = f"{THREDDS_URL}/"
[docs]
def get_hindcast_day(region_coll: fiona.Collection, date, climate_model="GEPS"):
"""Generate a forecast dataset that can be used to run raven.
Data comes from the CASPAR archive and must be aggregated such that each file
contains forecast data for a single day, but for all forecast timesteps and
all members.
The code takes the region shapefile, the forecast date required, and the
climate_model to use, here GEPS by default, but eventually could be GEPS, GDPS, REPS or RDPS.
"""
# Get the file locations and filenames as a function of the climate model and date
[ds, times] = get_CASPAR_dataset(climate_model, date)
return get_subsetted_forecast(region_coll, ds, times, True)
[docs]
def get_CASPAR_dataset(
climate_model: str,
date: dt.datetime,
thredds: str = THREDDS_URL,
directory: str = "dodsC/birdhouse/disk2/caspar/daily/",
) -> Tuple[
xr.Dataset, List[Union[Union[DatetimeIndex, Series, Timestamp, Timestamp], Any]]
]:
"""Return CASPAR dataset.
Parameters
----------
climate_model : str
Type of climate model, for now only "GEPS" is supported.
date : dt.datetime
The date of the forecast.
thredds : str
The thredds server url. Default: "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/"
directory : str
The directory on the thredds server where the data is stored. Default: "dodsC/birdhouse/disk2/caspar/daily/"
Returns
-------
xr.Dataset
The forecast dataset.
"""
if thredds[-1] != "/":
warnings.warn(
"The thredds url should end with a slash. Appending it to the url."
)
thredds = f"{thredds}/"
if climate_model == "GEPS":
d = dt.datetime.strftime(date, "%Y%m%d")
file_location = urljoin(directory, f"GEPS_{d}.nc")
file_url = urljoin(thredds, file_location)
ds = xr.open_dataset(file_url)
# Here we also extract the times at 6-hour intervals as Raven must have
# constant timesteps and GEPS goes to 6 hours
start = pd.to_datetime(ds.time[0].values)
times = [start + dt.timedelta(hours=n) for n in range(0, 384, 6)]
else:
# Eventually: GDPS, RDPS and REPS
raise NotImplementedError("Only the GEPS model is currently supported")
# Checking that these exist.
for f in ["pr", "tas"]:
if f not in ds:
raise AttributeError(f"'{f}' not present in dataset")
return ds, times
[docs]
def get_ECCC_dataset(
climate_model: str,
thredds: str = THREDDS_URL,
directory: str = "dodsC/datasets/forecasts/eccc_geps/",
) -> Tuple[
Dataset, List[Union[Union[DatetimeIndex, Series, Timestamp, Timestamp], Any]]
]:
"""Return latest GEPS forecast dataset.
Parameters
----------
climate_model : str
Type of climate model, for now only "GEPS" is supported.
thredds : str
The thredds server url. Default: "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/"
directory : str
The directory on the thredds server where the data is stored. Default: "dodsC/datasets/forecasts/eccc_geps/"
Returns
-------
xr.Dataset
The forecast dataset.
"""
if thredds[-1] != "/":
warnings.warn(
"The thredds url should end with a slash. Appending it to the url."
)
thredds = f"{thredds}/"
if climate_model == "GEPS":
# Eventually the file will find a permanent home, until then let's use the test folder.
file_location = urljoin(directory, "GEPS_latest.ncml")
file_url = urljoin(thredds, file_location)
ds = xr.open_dataset(file_url)
# Here we also extract the times at 6-hour intervals as Raven must have
# constant timesteps and GEPS goes to 6 hours
start = pd.to_datetime(ds.time[0].values)
times = [start + dt.timedelta(hours=n) for n in range(0, 384, 6)]
else:
# Eventually: GDPS, RDPS and REPS
raise NotImplementedError("Only the GEPS model is currently supported")
# Checking that these exist. IF the files are still processing, possible that one or both are not available!
for f in ["pr", "tas"]:
if f not in ds:
raise AttributeError(f"'{f}' not present in dataset")
return ds, times
[docs]
def get_recent_ECCC_forecast(
region_coll: fiona.Collection, climate_model: str = "GEPS"
) -> xr.Dataset:
"""Generate a forecast dataset that can be used to run raven.
Data comes from the ECCC datamart and collected daily. It is aggregated
such that each file contains forecast data for a single day, but for all
forecast timesteps and all members.
The code takes the region shapefile and the climate_model to use, here GEPS
by default, but eventually could be GEPS, GDPS, REPS or RDPS.
Parameters
----------
region_coll : fiona.Collection
The region vectors.
climate_model : str
Type of climate model, for now only "GEPS" is supported.
Returns
-------
xr.Dataset
The forecast dataset.
"""
[ds, times] = get_ECCC_dataset(climate_model)
# Make the variable name compatible with the hindcasting tools.
ds = ds.rename({"member": "members"})
return get_subsetted_forecast(region_coll, ds, times, False)
[docs]
def get_subsetted_forecast(
region_coll: fiona.Collection,
ds: xr.Dataset,
times: Union[dt.datetime, xr.DataArray],
is_caspar: bool,
) -> xr.Dataset:
"""Get Subsetted Forecast.
This function takes a dataset, a region and the time sampling array and returns
the subsetted values for the given region and times.
Parameters
----------
region_coll : fiona.Collection
The region vectors.
ds : xr.Dataset
The dataset containing the raw, worldwide forecast data
times : dt.datetime or xr.DataArray
The array of times required to do the forecast.
is_caspar : bool
True if the data comes from Caspar, false otherwise. Used to define lat/lon on rotated grid.
Returns
-------
xr.Dataset
The forecast dataset.
"""
# Extract the bounding box to subset the entire forecast grid to something
# more manageable
lon_min = region_coll.bounds[0]
lon_max = region_coll.bounds[2]
lat_min = region_coll.bounds[1]
lat_max = region_coll.bounds[3]
# Add a very simple lon wraparound if data suggests for it
if ((ds.lon.min() >= 0) and (ds.lon.max() <= 360)) and (lon_max < 0):
lon_min += 360
lon_max += 360
# Subset the data to the desired location (bounding box) and times
ds = ds.where(
(ds.lon <= lon_max)
& (ds.lon >= lon_min)
& (ds.lat <= lat_max)
& (ds.lat >= lat_min),
drop=True,
).sel(time=times)
# Rioxarray requires CRS definitions for variables
# Get CRS, e.g. 4326
crs = int(re.match(r"epsg:(\d+)", region_coll.crs["init"]).group(1))
# Here the name of the variable could differ based on the Caspar file processing
tas = ds.tas.rio.write_crs(crs)
pr = ds.pr.rio.write_crs(crs)
ds = xr.merge([tas, pr])
# Now apply the mask of the basin contour and average the values to get a single time series
if is_caspar:
ds.rio.set_spatial_dims("rlon", "rlat")
ds["rlon"] = ds["rlon"] - 360
# clip the netcdf and average across space.
shdf = [next(iter(region_coll))["geometry"]]
forecast = ds.rio.clip(shdf, crs=crs)
forecast = forecast.mean(dim={"rlat", "rlon"}, keep_attrs=True)
else:
ds.rio.set_spatial_dims("lon", "lat")
ds["lon"] = ds["lon"] - 360
# clip the netcdf and average across space.
shdf = [next(iter(region_coll))["geometry"]]
forecast = ds.rio.clip(shdf, crs=crs)
forecast = forecast.mean(dim={"lat", "lon"}, keep_attrs=True)
return forecast