Source code for ravenpy.ravenpy

"""Main module."""

import collections
import os
import shutil
import subprocess
import tempfile
import warnings
from pathlib import Path
from typing import List, Optional, Union
from warnings import warn

import xarray as xr

from .config import parsers
from .config.rvs import Config

RAVEN_EXEC_PATH = os.getenv("RAVENPY_RAVEN_BINARY_PATH") or shutil.which("raven")


[docs] class Emulator: def __init__( self, config: Config, workdir: Optional[Union[str, os.PathLike]] = None, modelname: Optional[str] = None, overwrite: bool = False, ): """Convenience class to work with the Raven modeling framework. Parameters ---------- config : Config Emulator Config instance fully parameterized, i.e. without symbolic expressions. workdir : Path or str Path to rv files and model outputs. If None, create a temporary directory. modelname : str, None File name stem of configuration files: `<modelname>.rv*`. overwrite : bool If True, overwrite existing files. """ self._config = config.model_copy(deep=True) self._workdir = Path(workdir or tempfile.mkdtemp()) self._modelname = modelname self.overwrite = overwrite self._output = None # Model output path self._output_path = None # Write model config files self._rv = self._config.write_rv( workdir=self.workdir, modelname=self.modelname, overwrite=overwrite ) # Grab modelname in case it was set by config.write_rv self._modelname = self._rv["rvi"].stem
[docs] def run(self, overwrite: bool = False) -> "OutputReader": """Run the model. This will write RV files if not already done. Parameters ---------- overwrite : bool If True, overwrite existing files. """ if not (self.workdir / f"{self.modelname}.rvi").exists(): # FIXME: No attribute 'write_rv' on Emulator [attribute-error] self.write_rv(overwrite=overwrite) self._output_path = run( self.modelname, self.workdir, "output", overwrite=overwrite ) self._output = OutputReader(self.config.run_name, path=self._output_path) return self._output
@property def config(self) -> Config: """Read-only model configuration.""" return self._config @property def workdir(self) -> Path: """Path to RV files and output subdirectory.""" return self._workdir @property def output_path(self) -> Optional[Path]: """Path to model outputs.""" if self._output_path is not None: return self._output_path warnings.warn("`output_path` not set. Model must be run first") @property def modelname(self) -> str: """File name stem of configuration files.""" return self._modelname @property def output(self) -> "OutputReader": """Return simulation output object.""" return self._output
[docs] def resume(self, timestamp: bool = True) -> Config: """Return new model configuration using state variables from the end of the run. timestamp: bool If False, ignore time stamp information in the solution. If True, the solution will set StartDate to the solution's timestamp. """ return self.config.set_solution( self.output.files["solution"], timestamp=timestamp )
[docs] class OutputReader: def __init__( self, run_name: Optional[str] = None, path: Optional[Union[str, Path]] = None ): """Class facilitating access to Raven model output. Parameters ---------- run_name : str, optional Simulation name, if any is specified by the `RunName` configuration. path : str or Path Output directory where model results are stored. Defaults to the current directory. """ self._run_name = run_name self._path = Path(path) if path else Path.cwd() self._files = parsers.output_files(self._run_name, self._path) self._nc_hydrograph = None self._nc_storage = None # TODO: Check if no files are found. Otherwise we get cryptic errors. # if self._files["hydrograph"] @property def files(self) -> dict: """Return paths to output files.""" return self._files @property def solution(self) -> Optional[dict]: """Return solution file content.""" solution = self.files.get("solution") if solution: return parsers.parse_solution(solution) @property def diagnostics(self) -> Optional[dict]: """Return model diagnostics.""" diag = self.files.get("diagnostics") if diag: return parsers.parse_diagnostics(diag) @property def hydrograph(self) -> xr.Dataset: """Return the hydrograph.""" if self._nc_hydrograph is None: h = self.files.get("hydrograph") if h: self._nc_hydrograph = parsers.parse_nc(h) return self._nc_hydrograph @property def storage(self) -> xr.Dataset: """Return the storage variables.""" if self._nc_storage is None: s = self.files.get("storage") if s: self._nc_storage = parsers.parse_nc(s) return self._nc_storage @property def messages(self) -> Optional[str]: msg = self.files.get("messages") if msg: return msg.read_text() @property def path(self) -> Path: """Path to output directory.""" return self._path
[docs] class EnsembleReader: def __init__( self, *, run_name: Optional[str] = None, paths: Optional[List[Union[str, os.PathLike]]] = None, runs: List[OutputReader] = None, dim: str = "member", ): """ Class facilitating access to ensemble of Raven outputs. Parameters ---------- run_name : str, None Name given to simulation, if any. paths : list[str or Path], optional List of output paths. Defaults to all directories in current directory. runs : List[OutputReader] List of OutputReader instances. dim : str Name of concatenation dimension. """ self._dim = dim if runs is not None: self._outputs = runs self._paths = [o.path for o in self._outputs] else: if paths is None: paths = [p for p in Path.cwd().iterdir() if p.is_dir()] self._paths = [Path(p) for p in paths] self._outputs = [OutputReader(run_name, p) for p in self._paths] @property def files(self): out = collections.defaultdict(list) for o in self._outputs: for k, v in o.files.items(): out[k].append(v) return out @property def storage(self): if len(self.files["storage"]) == 0: raise ValueError( "No file found, make sure you have the right `run_name` and output `paths`." ) return xr.concat( [xr.open_dataset(f) for f in self.files["storage"]], dim=self._dim ) @property def hydrograph(self): if len(self.files["hydrograph"]) == 0: raise ValueError( "No file found, make sure you have the right `run_name` and output `paths`." ) return xr.concat( [xr.open_dataset(f) for f in self.files["hydrograph"]], dim=self._dim, coords="different", )
[docs] def run( modelname: str, configdir: Union[str, Path], outputdir: Optional[Union[str, Path]] = None, overwrite: bool = True, verbose: bool = False, ) -> Path: """Run Raven given the path to an existing model configuration. Parameters ---------- modelname : str Configuration files stem, i.e. the file name without extension. configdir : Path or str Path to configuration files directory. outputdir : Path or str, optional Path to model simulation output. If None, will write to configdir/output. overwrite : bool If True, overwrite existing files. verbose : bool If True, always display Raven warnings. If False, warnings will only be printed if an error occurs. Return ------ Path Path to model outputs. """ if not RAVEN_EXEC_PATH: raise RuntimeError( "Could not find raven binary in PATH, and RAVENPY_RAVEN_BINARY_PATH env variable is not set" ) # Confirm configdir exists configdir = Path(configdir) if not configdir.exists(): raise OSError("Workdir should include configuration files.") # Create outputdir outputdir = Path(outputdir or "output") if not outputdir.is_absolute(): outputdir = configdir / outputdir if overwrite and outputdir.exists(): shutil.rmtree(str(outputdir)) if not outputdir.exists(): os.makedirs(str(outputdir)) # Launch executable, wait for completion. cmd = [RAVEN_EXEC_PATH, modelname, "-o", str(outputdir)] process = subprocess.Popen( cmd, cwd=configdir, stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True, ) stdout, stderr = process.communicate(input="\n") returncode = process.wait() # Deal with errors and warnings messages = parsers.parse_raven_messages(outputdir / "Raven_errors.txt") if messages["ERROR"] or verbose: for msg in messages["WARNING"] + messages["ADVISORY"]: warn(msg, category=RavenWarning) if messages["ERROR"]: raise RavenError( "\n".join([f"Config directory: {configdir}"] + messages["ERROR"]) ) if returncode != 0: raise OSError(f"Raven segfaulted : \n{stdout}") return outputdir
[docs] class RavenError(Exception): """ This is an error that is meant to be raised whenever a message of type "ERROR" is found in the Raven_errors.txt file resulting from a Raven (i.e. the C program) run. """ pass
[docs] class RavenWarning(Warning): """ This is a warning corresponding to a message of type "WARNING" in the Raven_errors.txt file resulting from a Raven (i.e. the C program) run. """ pass