# Source code for ravenpy.config.utils

import os
import typing
from typing import Optional, Sequence, Union

import cf_xarray  # noqa: F401
import numpy as np
import xarray as xr

from .conventions import CF_RAVEN, MonthlyAverages
from .defaults import RAVEN_NO_DATA_VALUE


def nc_specs(
    fn: Union[str, os.PathLike[str]],
    data_type: str,
    station_idx: Optional[int] = None,
    alt_names: Optional[Union[str, Sequence[str]]] = None,
    mon_ave: bool = False,
    engine: str = "h5netcdf",
    # FIXME: Is this call signature still relevant?
    linear_transform=None,
) -> dict:
    """Extract specifications from netCDF file.

    Parameters
    ----------
    fn : str, Path
        NetCDF file path or DAP link.
    data_type : str
        Raven data type.
    station_idx : int, optional
        Index along station dimension. Starts at 1.
    alt_names : str, list
        Alternative variable names for data type if not the CF standard default.
    mon_ave : bool
        If True, compute the monthly average.
    engine : str
        The engine used to open the dataset. Default is 'h5netcdf'.

    Returns
    -------
    dict

    Notes
    -----
    Attributes: var_name_nc, dim_names_nc, units, linear_transform, latitude_var_name_nc,
    longitude_var_name_nc, elevation_var_name_nc
    latitude, longitude, elevation, name
    """
    # Imported lazily, presumably to avoid a circular import — TODO confirm.
    from ravenpy.utilities.coords import infer_scale_and_offset

    # Remote (DAP) links are passed through untouched; local paths are resolved,
    # anything else is an error.
    if isinstance(fn, str) and str(fn)[:4] == "http":
        pass
    elif os.path.exists(fn):
        # `strict` kwarg is not available in Python 3.9
        try:
            fn = os.path.realpath(fn, strict=True)
        except TypeError:
            fn = os.path.realpath(fn)
    else:
        raise ValueError("NetCDF file not found.")

    # Normalize alt_names to a tuple of candidate variable names.
    if alt_names is None:
        alt_names = ()
    elif isinstance(alt_names, str):
        alt_names = (alt_names,)

    attrs = {
        "file_name_nc": fn,
        "data_type": data_type,
        "forcing_type": data_type,
        "station_idx": station_idx,
    }

    with xr.open_dataset(fn, engine=engine) as ds:
        # CF-convention defaults for this data type, extended by user-supplied aliases.
        var_names = CF_RAVEN.get(data_type, ()) + tuple(alt_names)
        if len(var_names) == 0:
            raise ValueError(
                f"Provide alternative variable name with `alt_names` for {data_type}"
            )

        # Use the first candidate name actually present in the dataset.
        for v in var_names:
            if v in ds.data_vars:
                nc_var = ds[v]
                attrs["var_name_nc"] = v
                attrs["dim_names_nc"] = nc_var.dims
                attrs["_time_dim_name_nc"] = ds.cf["time"].name
                attrs["_dim_size_nc"] = dict(zip(nc_var.dims, nc_var.shape))
                attrs["units"] = nc_var.attrs.get("units")
                if attrs["units"] is not None:
                    # Derive the unit conversion Raven needs (scale/offset).
                    s, o = infer_scale_and_offset(nc_var, data_type)
                    attrs["linear_transform"] = dict(scale=s, offset=o)
                if mon_ave:
                    # Only data types with a registered monthly-average key get one.
                    ma = MonthlyAverages.get(data_type)
                    if ma:
                        attrs[ma] = nc_var.groupby("time.month").mean().values.tolist()
                break
        else:
            # No candidate matched any data variable.
            raise ValueError(f"No variable found for {data_type}.\n {ds.data_vars}")

        if station_idx is not None:
            # Convert to NumPy 0-based indexing
            i = station_idx - 1

            # Coordinates are optional: silently skip if the dataset has no
            # CF-identifiable latitude/longitude.
            try:
                attrs["latitude_var_name_nc"] = ds.cf["latitude"].name
                attrs["longitude_var_name_nc"] = ds.cf["longitude"].name
                attrs["latitude"] = ds.cf["latitude"][i]
                attrs["longitude"] = ds.cf["longitude"][i]
            except KeyError:
                pass

            # Prefer the CF "vertical" coordinate; fall back to a variable
            # literally named "elevation". The `finally` runs in both cases.
            try:
                nc_elev = ds.cf["vertical"].name
            except KeyError:
                nc_elev = "elevation" if "elevation" in ds else None
            finally:
                if nc_elev is not None:
                    attrs["elevation_var_name_nc"] = nc_elev
                    try:
                        attrs["elevation"] = ds[nc_elev][i]
                    except IndexError:
                        # Elevation is a scalar
                        attrs["elevation"] = ds[nc_elev].item(0)

            # Station name, when a per-station identifier variable exists.
            if "station_id" in ds:
                if ds["station_id"].shape and len(ds["station_id"]) > i:
                    attrs["name"] = ds["station_id"].values[i]

    return attrs
def filter_for(kls, attrs, **kwds):
    """Return the subset of ``attrs`` that are fields of the dataclass/model.

    Notes
    -----
    If attrs includes an attribute name and its Raven alias, e.g.
    `linear_transform` and `LinearTransform`, the latter will have priority.
    """
    from pydantic._internal._model_construction import ModelMetaclass

    from .commands import Command

    attrs.update(kwds)

    # Plain dataclasses: keep only the keys naming declared fields.
    if hasattr(kls, "__dataclass_fields__"):
        declared = kls.__dataclass_fields__.keys()
        return {name: value for name, value in attrs.items() if name in declared}

    # Pydantic models: the Raven alias takes priority over the Python field
    # name, and nested Command models are filtered recursively.
    selected = {}
    for name, field in kls.model_fields.items():
        if field.alias in attrs:
            selected[field.alias] = attrs[field.alias]
        elif name in attrs:
            selected[name] = attrs[name]
        elif isinstance(field.annotation, ModelMetaclass) and issubclass(
            field.annotation, Command
        ):
            selected[name] = filter_for(field.annotation, attrs)
    return selected
# return {k: v for (k, v) in attrs.items() if k in kls.__fields__.keys()}
def get_average_annual_runoff(
    nc_file_path: Union[str, os.PathLike[str]],
    area_in_m2: float,
    time_dim: str = "time",
    obs_var: str = "qobs",
    na_value: Union[int, float] = RAVEN_NO_DATA_VALUE,
):
    """Compute the average annual runoff from observed data.

    Discharge values equal to ``na_value`` are masked out before averaging.
    Returns the record-mean annual runoff depth in mm/yr.
    """
    with xr.open_dataset(nc_file_path) as ds:
        # Mask the no-data sentinel, then convert discharge m**3/s -> m**3/d.
        discharge = ds.where(ds[obs_var] != na_value)[obs_var] * 86400.0

        # NOTE(review): the axis index is taken from the pre-groupby dims —
        # assumes the "year" dim lands in the same position after grouping;
        # confirm against xarray's groupby-reduce dimension ordering.
        time_axis = discharge.dims.index(time_dim)

        # Mean daily runoff [m**3/d] for each year of record.
        per_year = np.nanmean(discharge.groupby("time.year").mean("time"), axis=time_axis)

        # Depth of runoff [mm/yr] over the basin area, then the record mean.
        per_year = per_year / area_in_m2 * 365 * 1000.0
        return np.mean(per_year)
def get_annotations(a):
    """Return all annotations inside [] or Union[...].

    Union members are flattened depth-first, left to right; non-Union
    arguments are yielded as-is.
    """
    # Iterative depth-first walk: Union arguments are expanded in place at the
    # front of the worklist, preserving left-to-right order.
    pending = list(typing.get_args(a))
    while pending:
        member = pending.pop(0)
        if typing.get_origin(member) == Union:
            pending[:0] = typing.get_args(member)
        else:
            yield member