# Source code for dataretrieval.waterdata.ratings

"""USGS rating-curve retrieval via the Water Data STAC catalog.

Wraps ``https://api.waterdata.usgs.gov/stac/v0/search`` and the per-feature
RDB downloads that follow. The STAC endpoint hosts standard NWIS rating
files (``exsa``, ``base``, ``corr``) for active streamgages — see the
service overview at https://api.waterdata.usgs.gov/docs/stac/ and the
WDFN announcement at https://waterdata.usgs.gov/blog/wdfn-rating-curves/.

The R analogue is ``read_waterdata_ratings`` in
https://github.com/DOI-USGS/dataRetrieval/.
"""

from __future__ import annotations

import logging
import os
from collections.abc import Iterable
from typing import Any, Literal, get_args

import pandas as pd
import requests

from dataretrieval.rdb import extract_rdb_comment, read_rdb

from .utils import (
    _DURATION_RE,
    BASE_URL,
    _check_monitoring_location_id,
    _default_headers,
    _format_api_dates,
)

# Module-level logger; ``get_ratings`` reports per-file download / parse
# failures through it instead of raising.
logger = logging.getLogger(__name__)

# Root of the versioned STAC API, served under the Water Data base URL.
STAC_URL = "{}/stac/v0".format(BASE_URL)

# Rating file types hosted by the catalog. The Literal is used for static
# typing; the runtime tuple is derived from it so validation in
# ``get_ratings`` can never drift out of sync with the annotation.
RATING_FILE_TYPE = Literal["exsa", "base", "corr"]
_VALID_FILE_TYPES = tuple(get_args(RATING_FILE_TYPE))


def get_ratings(
    monitoring_location_id: str | Iterable[str] | None = None,
    file_type: RATING_FILE_TYPE | list[RATING_FILE_TYPE] = "exsa",
    file_path: str | None = None,
    time: str | list[str] | None = None,
    bbox: list[float] | None = None,
    limit: int = 10000,
    download_and_parse: bool = True,
    ssl_check: bool = True,
) -> dict[str, pd.DataFrame] | list[dict[str, Any]]:
    """Get USGS stage-discharge rating curves from the Water Data STAC catalog.

    Returns the current rating tables for one or more active USGS
    streamgages. The catalog hosts three file types:

    - ``"exsa"`` — expanded shift-adjusted rating (default). Adds a
      ``SHIFT`` column to ``"base"`` indicating the current shift for each
      ``INDEP``.
    - ``"base"`` — three columns: ``INDEP`` (typically gage height, ft);
      ``DEP`` (typically discharge, ft^3/s); ``STOR`` ("``*``" marks fixed
      points of the rating).
    - ``"corr"`` — three columns: ``INDEP``; ``CORR`` (correction for that
      value); ``CORRINDEP`` (corrected INDEP).

    See https://api.waterdata.usgs.gov/docs/stac/ for the upstream service
    docs and https://waterdata.usgs.gov/blog/wdfn-rating-curves/ for the
    background announcement. The R analogue is ``read_waterdata_ratings`` in
    https://github.com/DOI-USGS/dataRetrieval/.

    Parameters
    ----------
    monitoring_location_id : string or iterable of strings, optional
        One or more identifiers in ``AGENCY-ID`` form (e.g.
        ``"USGS-01104475"``). If omitted, the spatial / temporal filters
        determine the result set.
    file_type : ``"exsa"``, ``"base"``, ``"corr"``, or a list, default ``"exsa"``
        Which rating file(s) to request. Repeated entries are ignored; an
        empty list is rejected.
    file_path : string, optional
        Directory the downloaded RDB files are written to (created if
        missing). If ``None`` (the default), the parsed ``DataFrame`` is
        returned without persisting the bytes to disk; ``df.attrs["url"]``
        still records where each rating came from.
    time : string or list of strings, optional
        STAC ``datetime`` filter (passed through verbatim under that name) —
        a single date / datetime, or an interval (``"start/end"``,
        optionally half-bounded with ``..``). ISO 8601 *durations*
        (``"P1M"``, ``"PT36H"``, …) are **not** supported by the
        rating-curve service; passing one raises ``ValueError``.
    bbox : list of numbers, optional
        Only features whose geometry intersects the bounding box are
        selected. Format: ``[xmin, ymin, xmax, ymax]`` in CRS 4326
        (longitude / latitude, west-south-east-north).
    limit : int, default 10000
        Page size for the STAC ``/search`` request. Must be positive;
        values above 10000 are clamped to 10000.
    download_and_parse : bool, default ``True``
        If ``True``, download every matching RDB file and parse it into a
        ``DataFrame``. If ``False``, return the raw list of STAC feature
        dicts so the caller can inspect what's available before pulling
        bytes.
    ssl_check : bool, default ``True``
        Verify the server's SSL certificate.

    Returns
    -------
    dict[str, pandas.DataFrame] or list[dict]
        When ``download_and_parse=True`` (the default), a dict keyed by
        feature ID (e.g. ``"USGS-01104475.exsa.rdb"``) mapping to a parsed
        ``DataFrame``. Each frame carries provenance in
        ``df.attrs["comment"]`` (the RDB ``#``-prefixed header lines, like
        rating id, parameter, last-shifted timestamp) and ``df.attrs["url"]``
        (the asset URL it was fetched from). Features that fail to download
        or parse, or that have no ID, are skipped with a logged warning.

        When ``download_and_parse=False``, the raw list of STAC feature
        dicts as returned by the search endpoint.

    Raises
    ------
    ValueError
        For an unrecognized or empty ``file_type``, a non-positive
        ``limit``, or an ISO 8601 duration in ``time``.

    Examples
    --------
    .. code::

        >>> # Default exsa ratings for two sites
        >>> ratings = dataretrieval.waterdata.get_ratings(
        ...     monitoring_location_id=["USGS-01104475", "USGS-01104460"],
        ...     file_type="exsa",
        ... )
        >>> ratings["USGS-01104475.exsa.rdb"].head()

        >>> # Both exsa and corr files for the same two sites
        >>> ratings = dataretrieval.waterdata.get_ratings(
        ...     monitoring_location_id=["USGS-01104475", "USGS-01104460"],
        ...     file_type=["exsa", "corr"],
        ... )

        >>> # Bounding-box query, listing what's available without downloading
        >>> features = dataretrieval.waterdata.get_ratings(
        ...     bbox=[-95.0, 40.0, -92.0, 42.0],
        ...     download_and_parse=False,
        ... )

        >>> # Restrict to features in a date range (durations not supported)
        >>> features = dataretrieval.waterdata.get_ratings(
        ...     bbox=[-95.0, 40.0, -92.0, 42.0],
        ...     time=["2026-04-29", ".."],
        ...     download_and_parse=False,
        ... )
    """
    monitoring_location_id = _check_monitoring_location_id(monitoring_location_id)

    file_types = _as_list(file_type)
    if not file_types:
        # An empty selection would otherwise run a search and silently
        # return nothing.
        raise ValueError(
            "file_type must name at least one of "
            f"{list(_VALID_FILE_TYPES)}; got an empty list."
        )
    invalid = [ft for ft in file_types if ft not in _VALID_FILE_TYPES]
    if invalid:
        raise ValueError(
            f"Invalid file_type {invalid!r}; "
            f"valid options are {list(_VALID_FILE_TYPES)}."
        )
    # Drop repeats (order-preserving) so e.g. ["exsa", "exsa"] counts as a
    # single type and can still be pinned server-side below.
    file_types = list(dict.fromkeys(file_types))

    if time is not None and any(_DURATION_RE.match(str(v)) for v in _as_list(time)):
        raise ValueError(
            "ISO 8601 durations (e.g. 'P7D') are not supported in `time` "
            "for the rating-curve service. Provide a date or interval instead."
        )
    time_str = _format_api_dates(time) if time is not None else None

    if limit is not None:
        if limit < 1:
            raise ValueError(f"limit must be a positive integer; got {limit!r}.")
        # Enforce the documented per-page maximum client-side.
        limit = min(limit, 10000)

    # Mirror R: pin file_type server-side only when one type is requested.
    server_file_type = file_types[0] if len(file_types) == 1 else None
    filter_str = _build_filter(monitoring_location_id, server_file_type)
    features = _search(filter_str, time_str, bbox, limit, ssl_check)
    if not download_and_parse:
        return features

    # With several types the server filter is not pinned, so narrow the
    # result set client-side to exactly the requested types. ``or {}``
    # also tolerates an explicit ``"properties": null``.
    requested = set(file_types)
    matching = [
        f
        for f in features
        if (f.get("properties") or {}).get("file_type") in requested
    ]

    if file_path is not None:
        os.makedirs(file_path, exist_ok=True)

    out: dict[str, pd.DataFrame] = {}
    for feature in matching:
        fid = feature.get("id")
        if not fid:
            # Without an id there is no output key (or file name) to use.
            logger.warning("Skipping STAC feature with no id.")
            continue
        try:
            out[fid] = _download_and_parse(feature, file_path, ssl_check)
        except (requests.RequestException, ValueError, OSError, KeyError) as e:
            # Best effort: one bad rating file must not abort the whole batch
            # (KeyError covers features missing their data asset).
            logger.warning("Failed to download / parse %s: %s", fid, e)
    return out
def _as_list(x: str | Iterable[str]) -> list[str]: """Normalize a string or iterable-of-strings to a list.""" return [x] if isinstance(x, str) else list(x) def _quote_cql_str(value: str) -> str: """Escape a single-quoted CQL literal by doubling embedded quotes. Defends against malformed filters / injection on arbitrary user input, even though valid USGS monitoring-location IDs cannot contain a quote. """ return value.replace("'", "''") def _build_filter( monitoring_location_id: str | list[str] | None, file_type: str | None, ) -> str | None: """Compose the CQL filter sent to STAC ``/search``. Returns ``None`` when neither argument constrains the search. """ parts: list[str] = [] if monitoring_location_id is not None: ids = _as_list(monitoring_location_id) joined = "', '".join(_quote_cql_str(i) for i in ids) parts.append(f"monitoring_location_id IN ('{joined}')") if file_type is not None: parts.append(f"file_type = '{_quote_cql_str(file_type)}'") return " AND ".join(parts) if parts else None def _search( filter_str: str | None, time_str: str | None, bbox: list[float] | None, limit: int, ssl_check: bool, ) -> list[dict[str, Any]]: """Run a single STAC ``/search`` request and return its features.""" params: dict[str, Any] = {"limit": limit} if filter_str is not None: params["filter"] = filter_str if time_str is not None: params["datetime"] = time_str if bbox is not None: params["bbox"] = ",".join(map(str, bbox)) response = requests.get( f"{STAC_URL}/search", params=params, headers=_default_headers(), verify=ssl_check, ) response.raise_for_status() return response.json().get("features", []) def _download_and_parse( feature: dict[str, Any], file_path: str | None, ssl_check: bool, ) -> pd.DataFrame: """Fetch the feature's data asset, parse RDB, optionally persist to disk.""" url = feature["assets"]["data"]["href"] response = requests.get(url, headers=_default_headers(), verify=ssl_check) response.raise_for_status() if file_path is not None: with 
open(os.path.join(file_path, feature["id"]), "w") as f: f.write(response.text) df = read_rdb(response.text) df.attrs["comment"] = extract_rdb_comment(response.text) df.attrs["url"] = url return df