import collections
import re
from pathlib import Path
from typing import Optional, Union

import cftime
from xarray import open_dataset

from ..config import commands as rc
from .conventions import RAVEN_OUTPUT_FMT
[docs]
def parse_diagnostics(fn: Path):
    """Return a dictionary of performance metrics read from a diagnostics CSV file.

    Parameters
    ----------
    fn : Path
        Path to the CSV file. The first row is the header.

    Returns
    -------
    collections.defaultdict
        Maps each header name to the list of values found in that column,
        one entry per data row. Columns whose name contains "DIAG" are
        converted to float; other columns are kept as strings. A column
        with an empty name (e.g. produced by a trailing comma) is dropped.
        An empty file yields an empty mapping.

    Raises
    ------
    FileNotFoundError
        If `fn` does not exist.
    ValueError
        If a value in a "DIAG" column cannot be converted to float.
    """
    import csv

    if not fn.exists():
        raise FileNotFoundError(fn)

    out = collections.defaultdict(list)

    # newline="" lets the csv module do its own line-ending handling.
    with open(fn, newline="") as f:
        reader = csv.reader(f)
        header = next(reader, None)
        if header is None:
            # Empty file: nothing to parse.
            return out

        for row in reader:
            for key, val in zip(header, row):
                if "DIAG" in key:
                    val = float(val)  # type: ignore
                out[key].append(val)

    # The empty-named column only exists when rows end with a trailing
    # delimiter; do not fail when it is absent.
    out.pop("", None)

    return out
[docs]
def parse_nc(fn: Path):
    """Open a netCDF dataset with xarray.

    Parameters
    ----------
    fn : Path
        Path to a file with the ".nc" suffix.

    Returns
    -------
    The object returned by `xarray.open_dataset(fn)`.

    Raises
    ------
    FileNotFoundError
        If `fn` does not exist.
    NotImplementedError
        If `fn` exists but its suffix is not ".nc".
    """
    if not fn.exists():
        raise FileNotFoundError(fn)

    if fn.suffix != ".nc":
        raise NotImplementedError

    return open_dataset(fn)
[docs]
def parse_solution(fn: Path, calendar: str = "PROLEPTIC_GREGORIAN"):
    """Return command objects parsed from the model output `solution.rvc`.

    Parameters
    ----------
    fn : Path
        Path to a file with the ".rvc" suffix.
    calendar : str
        Calendar name used to build the start date.

    Returns
    -------
    dict
        Dictionary with three entries:
        - hru_state_variable_table: result of `rc.HRUStateVariableTable.parse`
        - basin_state_variables: result of `rc.BasinStateVariables.parse`
        - start_date: date built from the `:TimeStamp` entry of the file

    Raises
    ------
    FileNotFoundError
        If `fn` does not exist.
    NotImplementedError
        If `fn` exists but its suffix is not ".rvc".
    """
    if not fn.exists():
        raise FileNotFoundError(fn)

    if fn.suffix != ".rvc":
        raise NotImplementedError

    text = fn.read_text()
    return {
        "hru_state_variable_table": rc.HRUStateVariableTable.parse(text),
        "basin_state_variables": rc.BasinStateVariables.parse(text),
        "start_date": _time_stamp_from_solution(text, calendar),
    }
[docs]
def parse_raven_messages(path):
    """Extract the messages found in a Raven_errors file, grouped by type.

    Parameters
    ----------
    path
        Object exposing `read_text()` (e.g. a `pathlib.Path`) for the
        messages file.

    Returns
    -------
    dict
        "ERROR", "WARNING" and "ADVISORY" map to lists of message strings;
        "SIMULATION COMPLETE" maps to a bool that is True when such a line
        was found.

    Raises
    ------
    KeyError
        If a line carries an upper-case prefix other than the four above.
    """
    messages = {
        "ERROR": [],
        "WARNING": [],
        "ADVISORY": [],
        "SIMULATION COMPLETE": False,
    }

    # Each entry looks like "<TYPE> :<text>". The optional trailing group
    # captures a continuation line starting with whitespace: the message for
    # an unknown command is exceptionally spread over two lines.
    entries = re.findall("^([A-Z ]+) :(.+)(?:\n (.+))?", path.read_text(), re.M)

    for kind, first_line, continuation in entries:
        if kind == "SIMULATION COMPLETE":
            messages["SIMULATION COMPLETE"] = True
        else:
            text = f"{first_line} {continuation}".strip()
            # This one only points back to the file being parsed, so skip it.
            if text != "Errors found in input data. See Raven_errors.txt for details":
                messages[kind].append(text)  # type: ignore

    return messages
[docs]
def output_files(run_name: str, path: Path):
    """Return the path of every output file that exists.

    Parameters
    ----------
    run_name : str
        Run name; followed by "_" it is substituted for the `run_name`
        field of each file name template. An empty string is substituted
        if None.
    path : Path
        Directory in which the files are looked up.

    Returns
    -------
    dict
        Maps each key of RAVEN_OUTPUT_FMT whose file exists to its path.
    """
    prefix = "" if run_name is None else run_name + "_"
    candidates = {
        name: path / template.format(run_name=prefix)
        for name, template in RAVEN_OUTPUT_FMT.items()
    }
    return {name: fp for name, fp in candidates.items() if fp.exists()}
[docs]
def parse_outputs(
    run_name: Optional[str] = None,
    outputdir: Optional[Union[str, Path]] = None,
):
    """Parse outputs from model execution.

    Parameters
    ----------
    run_name : str, optional
        RunName value identifying model outputs.
    outputdir : str or Path, optional
        Path to model output directory. Current directory if None.

    Returns
    -------
    dict
        Dictionary holding model outputs, keyed as in RAVEN_OUTPUT_FMT.
        Files that have an entry in RAVEN_OUTPUT_PARSERS are parsed:

        - hydrograph: xarray.Dataset
        - storage: xarray.Dataset
        - solution: Dict[str, Command]
        - diagnostics: Dict[str, list]
        - messages: Dict[str, list or bool]

        Any other output file is returned as its text content.

    Notes
    -----
    Keys are omitted if no file is found.
    """
    # Path() accepts both str and Path, so the directory is always a Path
    # by the time it is joined with the output file names.
    outputdir = Path.cwd() if outputdir is None else Path(outputdir)

    parsers = RAVEN_OUTPUT_PARSERS
    out = {}

    for key, path in output_files(run_name, outputdir).items():
        # Skip files that disappeared since they were listed.
        if not path.exists():
            continue
        parse = parsers.get(key)
        out[key] = path.read_text() if parse is None else parse(path)

    return out
def _time_stamp_from_solution(solution: str, calendar: str) -> cftime.datetime:
    """Return the datetime given by the `:TimeStamp` entry of a solution.

    Parameters
    ----------
    solution : str
        Text content of the solution file.
    calendar : str
        Calendar name passed on to `cftime.datetime`.

    Returns
    -------
    cftime.datetime or None
        None when the text holds no `:TimeStamp` entry made of two
        whitespace-free tokens.
    """
    found = re.search(r":TimeStamp (\S+ \S+)", solution)
    if found is None:
        return None

    # The captured text is split into date components by cftime's parser.
    date_parts = cftime._parse_date(found.group(1))
    return cftime.datetime(*date_parts, calendar=calendar)
# Parser used for each kind of output file, looked up by output key.
# Both netCDF outputs (storage and hydrograph) share the same parser.
RAVEN_OUTPUT_PARSERS = dict(
    solution=parse_solution,
    diagnostics=parse_diagnostics,
    storage=parse_nc,
    hydrograph=parse_nc,
    messages=parse_raven_messages,
)