"""Tools for reading and writing geospatial data formats."""
import logging
import os
import tarfile
import tempfile
import warnings
import zipfile
from pathlib import Path
from re import search
from typing import List, Optional, Sequence, Tuple, Union
from . import gis_import_error_message
try:
import fiona
import rasterio
from pyproj import CRS
from shapely.geometry import shape
except (ImportError, ModuleNotFoundError) as e:
msg = gis_import_error_message.format(Path(__file__).stem)
raise ImportError(msg) from e
# Logger used by the functions below; records are emitted under the "RavenPy" name.
LOGGER = logging.getLogger("RavenPy")
# EPSG code that crs_sniffer falls back to when a single file yields no CRS.
WGS84 = 4326
# Function addressing exploit CVE-2007-4559
[docs]
def is_within_directory(
    directory: Union[str, os.PathLike], target: Union[str, os.PathLike]
) -> bool:
    """Check whether a target path resolves to a location inside a directory.

    Both paths are made absolute (and normalized) before comparison, so
    ``..`` components in `target` cannot be used to escape `directory`.

    Parameters
    ----------
    directory : str or os.PathLike
        The directory that is expected to contain `target`.
    target : str or os.PathLike
        The path to test.

    Returns
    -------
    bool
        True if `target` is `directory` itself or lies beneath it.
    """
    abs_directory = os.path.abspath(directory)
    abs_target = os.path.abspath(target)
    try:
        # commonpath compares whole path components. commonprefix compares
        # character by character and would accept "/a/foobar" as being
        # inside "/a/foo".
        common = os.path.commonpath([abs_directory, abs_target])
    except ValueError:
        # Raised when the paths share no common root (e.g. different drives).
        return False
    return common == abs_directory
# Function addressing exploit CVE-2007-4559
[docs]
def address_append(address: Union[str, Path]) -> str:
    """Prefix an archive URL/URI with its scheme for libraries such as "rasterstats".

    The address is searched for the substrings ``.zip`` and ``.tar`` (anywhere
    in the string, not only at the end); ``.zip`` takes precedence when both
    are present.

    Parameters
    ----------
    address : Union[str, Path]
        URL/URI to a potential zip or tar file.

    Returns
    -------
    str
        The address prefixed with ``zip://`` or ``tar://``, or the unchanged
        address as a string when neither pattern is found.
    """
    location = str(address)
    looks_zipped = search(r"(\.zip)", location)
    looks_tarred = search(r"(\.tar)", location)
    try:
        if looks_zipped:
            return f"zip://{address}"
        if looks_tarred:
            return f"tar://{address}"
        LOGGER.info("No prefixes needed for address.")
        return location
    except Exception:
        LOGGER.error("Failed to prefix or parse URL %s." % address)
        raise
[docs]
def archive_sniffer(
    archives: Union[str, Path, List[Union[str, Path]]],
    working_dir: Optional[Union[str, Path]] = None,
    extensions: Optional[Sequence[str]] = None,
) -> List[Union[str, Path]]:
    """Extract archive(s) and list the extracted files having a wanted extension.

    Parameters
    ----------
    archives : str or Path or list of str or Path
        Archive location or list of archive locations.
    working_dir : str or Path, optional
        Location handed to the extraction step as its output directory.
    extensions : Sequence of str, optional
        Accepted extensions. When empty or omitted, defaults to
        ``[".gml", ".shp", ".geojson", ".gpkg", ".json"]``.

    Returns
    -------
    list of str or Path
        Extracted files whose suffix matches an accepted extension, in the
        order the extraction step returned them.

    Notes
    -----
    A file is kept when any accepted extension is a *substring* of its final
    suffix (e.g. ``".tif"`` also matches a ``".tiff"`` file).
    """
    wanted = (
        extensions
        if extensions
        else [".gml", ".shp", ".geojson", ".gpkg", ".json"]
    )
    extracted = generic_extract_archive(archives, output_dir=working_dir)
    return [
        candidate
        for candidate in extracted
        if any(ext in Path(candidate).suffix for ext in wanted)
    ]
[docs]
def crs_sniffer(
    *args: Union[str, Path, Sequence[Union[str, Path]]]
) -> Union[List[Union[str, int]], str, int]:
    """Return the CRS (as EPSG codes) found in one or more files.

    Vector files (``.gml``, ``.shp``, ``.geojson``, ``.gpkg``, ``.json``) are
    read with fiona and rasters (``.tif``, ``.tiff``) with rasterio. A ``.zip``
    input is extracted first and its first supported member is examined.

    Parameters
    ----------
    *args : str or Path
        Path(s) to the file(s) to examine.

    Returns
    -------
    int or list
        For a single input, its EPSG code, or ``WGS84`` (with a warning) when
        no CRS could be determined. For several inputs, a list with one entry
        per input; an entry is ``False`` when reading that file raised a
        ``RuntimeError``.

    Raises
    ------
    FileNotFoundError
        If no files are given.
    Exception
        If a file has an unsupported suffix or cannot be found, or if a
        GeoPackage holds more than one layer.
    """
    if not args:
        # The original guard (`crs_list is None`) could never fire, so a call
        # without files silently returned an empty list.
        msg = f"No CRS definitions found in {args}."
        raise FileNotFoundError(msg)

    crs_list = []
    vectors = (".gml", ".shp", ".geojson", ".gpkg", ".json")
    rasters = (".tif", ".tiff")
    all_files = vectors + rasters

    for file in args:
        found_crs = False
        suffix = Path(file).suffix.lower()
        try:
            if suffix == ".zip":
                # Examine the first supported file found inside the archive.
                file = archive_sniffer(file, extensions=all_files)[0]
                suffix = Path(file).suffix.lower()

            if suffix in vectors:
                if suffix == ".gpkg":
                    if len(fiona.listlayers(file)) > 1:
                        raise NotImplementedError
                with fiona.open(file, "r") as src:
                    found_crs = CRS.from_wkt(src.crs_wkt).to_epsg()
            elif suffix in rasters:
                with rasterio.open(file, "r") as src:
                    found_crs = CRS.from_user_input(src.crs).to_epsg()
            else:
                raise FileNotFoundError("Invalid filename suffix")
        except FileNotFoundError as e:
            msg = f"{e}: Unable to open file {args}"
            LOGGER.warning(msg)
            raise Exception(msg) from e
        except NotImplementedError as e:
            msg = f"{e}: Multilayer GeoPackages are currently unsupported"
            LOGGER.error(msg)
            raise Exception(msg) from e
        except RuntimeError as e:
            # Best effort: keep going and record this file as having no CRS.
            LOGGER.debug("Could not read a CRS from %s: %s", file, e)
        crs_list.append(found_crs)

    if len(crs_list) == 1:
        if not crs_list[0]:
            msg = f"No CRS definitions found in {args}. Assuming {WGS84}."
            LOGGER.warning(msg)
            warnings.warn(msg, UserWarning)
            return WGS84
        return crs_list[0]
    return crs_list
[docs]
def raster_datatype_sniffer(file: Union[str, Path]) -> str:
    """Return the data type of the first band of a raster file.

    Parameters
    ----------
    file : Union[str, Path]
        Path to file.

    Returns
    -------
    str
        rasterio datatype of array values (first entry of the dataset's
        ``dtypes``).

    Raises
    ------
    ValueError
        If rasterio raises a ``RasterioError`` while opening or reading the
        file; the rasterio error is attached as the cause.
    """
    try:
        with rasterio.open(file, "r") as src:
            return src.dtypes[0]
    except rasterio.errors.RasterioError as err:
        msg = f"Unable to read data type from {file}."
        LOGGER.exception(msg)
        # Chain explicitly so the rasterio failure is reported as the cause.
        raise ValueError(msg) from err
[docs]
def get_bbox(
    vector: Union[str, Path], all_features: bool = True
) -> Tuple[float, float, float, float]:
    """Compute the bounding box of a vector file, or of its first feature only.

    Parameters
    ----------
    vector : str or Path
        A path to file storing vector features.
    all_features : bool
        When True (the default), return the bounds of the whole collection.
        When False, return the bounds of the first feature; if the file holds
        no features, the bounds of the collection are returned instead.

    Returns
    -------
    float, float, float, float
        Geographic coordinates of the bounding box (lon0, lat0, lon1, lat1).
    """
    if not all_features:
        with fiona.open(vector, "r") as collection:
            first = next(iter(collection), None)
            if first is not None:
                first_geometry = shape(first["geometry"])
                return first_geometry.bounds

    # Whole-collection bounds; also reached when the file has no features.
    with fiona.open(vector, "r") as collection:
        return collection.bounds