# Source code for dataretrieval.wateruse

"""Retrieve USGS water-use data from the National Water Availability
Assessment Data Companion (NWDC).

The NWDC web services provide national-scale, USGS-modeled water-use data that
underlie the `USGS National Water Availability Assessment
<https://water.usgs.gov/nwaa-data/>`_. Estimates are served on a HUC12
(12-digit hydrologic unit) spatial grid and can be queried for any county,
state, or hydrologic unit. This is the modern replacement for the defunct
legacy NWIS water-use service (``nwis.get_water_use``).

Unlike the main Water Data getters (:mod:`dataretrieval.waterdata`) and NGWMN
(:mod:`dataretrieval.ngwmn`), the NWDC is a plain CSV REST service rather than
an OGC API Features collection. This module supplies the NWDC-specific bits —
request building, CSV parsing, the ``Link``-header cursor, and the ``{detail}``
error envelope — but reuses the OGC engine's generic, API-agnostic pagination
and sync-from-async plumbing (:func:`~dataretrieval.ogc.engine._paginate` and
:func:`~dataretrieval.ogc.engine._run_sync`) rather than re-implementing it. It
follows the same conventions: shared request headers
(:func:`~dataretrieval.utils._default_headers`), the typed
:class:`~dataretrieval.exceptions.DataRetrievalError` taxonomy, and a
``(DataFrame, BaseMetadata)`` return.

See https://api.water.usgs.gov/docs/nwaa-data/ for the API reference and
https://water.usgs.gov/nwaa-data/ for the catalog of available models and
variables.

Examples
--------
.. code-block:: python

    from dataretrieval import wateruse

    # Monthly public-supply withdrawals for Rhode Island, 2020 onward.
    df, md = wateruse.get_wateruse(
        model="wu-public-supply-wd",
        variable=["pswdtot", "pswdgw", "pswdsw"],
        state="RI",
        start_date="2020-01",
        time_resolution="monthly",
    )

"""

from __future__ import annotations

import asyncio
import io
from collections.abc import Callable, Iterable
from typing import Any

import httpx
import pandas as pd

from dataretrieval.codes.states import to_state
from dataretrieval.exceptions import DataRetrievalError
from dataretrieval.ogc.engine import _paginate, _run_sync
from dataretrieval.ogc.planning import _combine_chunk_frames, _combine_chunk_responses
from dataretrieval.utils import (
    HTTPX_DEFAULTS,
    BaseMetadata,
    _default_headers,
    _raise_for_status,
    to_str,
)

#: Endpoint of the NWDC data service. ``get_wateruse`` issues its initial GET
#: request for each location against this URL, and also reports it as the
#: ``error_url`` for connection-level failures.
WATERUSE_URL = "https://api.water.usgs.gov/nwaa-data/data"

#: Water-use models (categories) served by the NWDC. The catalog at
#: https://water.usgs.gov/nwaa-data/ lists the variables available within each.
MODELS = (
    "wu-public-supply-wd",  # public-supply withdrawals
    "wu-public-supply-cu",  # public-supply consumptive use
    "wu-thermoelectric",  # thermoelectric-power water use
    "wu-irrigation-wd",  # irrigation withdrawals
    "wu-irrigation-cu",  # irrigation consumptive use
)

#: Temporal resolutions: monthly, annual calendar year, annual water year.
TIME_RESOLUTIONS = ("monthly", "annualcy", "annualwy")

#: Maximum locations fetched concurrently when a list of state/county/huc
#: selectors is fanned out (one request per location). Kept conservative
#: because this module intentionally carries no request backoff/retry; the
#: NWDC tolerates this level of concurrency without rate-limit errors (verified
#: by stress test). Set ``wateruse.MAX_CONCURRENT_REQUESTS = 1`` for serial.
MAX_CONCURRENT_REQUESTS = 4

# Page responses carry the HUC12 identifier in this column; it must stay a
# string so leading zeros (e.g. "010900020502") survive the round trip.
_HUC12_COLUMN = "huc12_id"


def get_wateruse(
    model: str,
    variable: str | Iterable[str] | None = None,
    state: str | int | Iterable[str | int] | None = None,
    county: str | Iterable[str] | None = None,
    huc: str | Iterable[str] | None = None,
    time_resolution: str | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
    intersection: str = "overlap",
    limit: int = 600,
    ssl_check: bool = True,
) -> tuple[pd.DataFrame, BaseMetadata]:
    """Get USGS water-use data from the NWDC web service.

    Fetches modeled water-use estimates from the USGS National Water
    Availability Assessment Data Companion. The query area is specified by
    exactly one of ``state``, ``county``, or ``huc``. Whatever the selector,
    the result is reported on the HUC12 grid as a long (tidy) frame holding
    one row per HUC12 and time step. When an area is too large for a single
    page (e.g. a whole region or a populous state), the service paginates;
    the pages are followed transparently and concatenated into one frame.

    Every selector may also be a list. The NWDC answers one area per request,
    so a list is fanned out into one request per value, with at most
    :data:`MAX_CONCURRENT_REQUESTS` running in parallel, and the per-value
    results are concatenated in the order the values were given.

    Parameters
    ----------
    model : string
        Water-use category to query; :data:`MODELS` lists the available
        options (e.g. ``"wu-public-supply-wd"``). The full catalog of models
        and their variables is at https://water.usgs.gov/nwaa-data/.
    variable : string or iterable of strings, optional
        One or more variable IDs within ``model`` (e.g. ``"pswdtot"`` for
        total public-supply withdrawals, or ``["pswdgw", "pswdsw"]`` for the
        groundwater and surface-water components). Multiple variables are
        comma-joined into a single request. The service requires at least one
        variable; leaving it out yields a 400 that lists the model's valid
        variable IDs (surfaced as a
        :class:`~dataretrieval.exceptions.DataRetrievalError`).
    state : string, int, or iterable, optional
        One or more US states/territories to query. Each may be a full name
        (``"Wisconsin"``), a two-letter postal code (``"WI"``), or a
        two-digit ANSI/FIPS code (``"55"`` or ``55``), mirroring
        :func:`dataretrieval.ngwmn.get_sites`.
    county : string or iterable, optional
        One or more five-digit county FIPS codes (state FIPS followed by
        county FIPS), e.g. ``"55025"`` for Dane County, Wisconsin.
    huc : string or iterable, optional
        One or more hydrologic unit codes. The level of each code follows
        from its length: 2 digits query a HUC2 region, 8 digits a HUC8
        subbasin, 12 digits a single HUC12, and so on (even lengths 2-12,
        e.g. ``"04"``, ``"07070005"``, ``"010900020502"``).

        Provide exactly one of ``state``, ``county``, or ``huc`` (each may be
        a single value or a list).
    time_resolution : string, optional
        Temporal resolution: ``"monthly"``, ``"annualcy"`` (annual, calendar
        year), or ``"annualwy"`` (annual, water year). See
        :data:`TIME_RESOLUTIONS`.
    start_date : string, optional
        Start of the query window, formatted ``"YYYY"`` for annual data or
        ``"YYYY-MM"`` for monthly data.
    end_date : string, optional
        End of the query window, in the same format as ``start_date``.
    intersection : string, optional
        How HUC12s straddling the boundary of the queried area are selected:
        ``"overlap"`` (any overlap, the default) or ``"envelop"`` (fully
        enclosed).
    limit : int, optional
        Maximum number of HUC12s returned per page. Queries covering more
        than ``limit`` HUC12s are split across pages and reassembled.
        Default 600.
    ssl_check : bool, optional
        If True (default), verify SSL certificates; set False to skip
        verification (e.g. behind a TLS-intercepting proxy).

    Returns
    -------
    df : ``pandas.DataFrame``
        Water-use estimates in long form: a ``huc12_id`` column (string,
        leading zeros preserved), a time column (``year_month`` for monthly
        data or ``year`` for annual data), and one value column per requested
        variable (suffixed with its unit, e.g. ``pswdtot_mgd`` for million
        gallons per day).
    md : :class:`dataretrieval.utils.BaseMetadata`
        Metadata describing the request (URL, query time, response headers).

    Raises
    ------
    ValueError
        If not exactly one of ``state``, ``county``, or ``huc`` is given, or
        a given selector is malformed (an unrecognized state, a county code
        that is not five digits, or a HUC of invalid length).
    DataRetrievalError
        On an HTTP error response, the typed subclass for the status (see
        :func:`dataretrieval.exceptions.error_for_status`); or
        :class:`~dataretrieval.exceptions.NetworkError` on a connection-level
        failure (timeout, DNS).

    Examples
    --------
    .. doctest::
        :skipif: True  # network

        >>> from dataretrieval import wateruse
        >>> df, md = wateruse.get_wateruse(
        ...     model="wu-public-supply-wd",
        ...     variable=["pswdtot", "pswdgw", "pswdsw"],
        ...     state="RI",
        ...     start_date="2020-01",
        ...     time_resolution="monthly",
        ... )

    """
    # Public arguments use snake_case (as ``waterdata.get_samples`` does),
    # whereas the NWDC wants compact lowercase query names; the translation
    # happens here. Pair order is the order the query string is built in.
    query_pairs = (
        ("format", "csv"),
        ("model", model),
        ("variable", to_str(variable)),
        ("timeres", time_resolution),
        ("startdate", start_date),
        ("enddate", end_date),
        ("intersection", intersection),
        ("limit", limit),
    )
    # Unset arguments are left out entirely: the service rejects empty values.
    base_params: dict[str, Any] = {
        key: value for key, value in query_pairs if value is not None
    }

    # One request per location, because the NWDC accepts a single location
    # per query. Each request is later paginated by the OGC engine's shared
    # pager and the per-location results are concatenated.
    headers = _default_headers()
    requests = []
    for location in _resolve_locations(state, county, huc):
        params = dict(base_params)
        params["location"] = location
        requests.append(
            httpx.Request("GET", WATERUSE_URL, params=params, headers=headers)
        )

    def _start_fan_out():
        # Builds the coroutine lazily so ``_run_sync`` decides where it runs.
        return _fan_out(requests, headers, ssl_check)

    # ``_run_sync`` drives the async fan-out through an anyio portal, which
    # keeps it safe inside an already-running event loop (e.g. a Jupyter
    # notebook). ``error_url`` is the host named in any connection-error
    # message, since this module builds its own requests and therefore has no
    # OGC request-builder base to take it from.
    df, response = _run_sync(
        _start_fan_out,
        service="wateruse",
        error_url=WATERUSE_URL,
    )
    return df, BaseMetadata(response)
# Valid HUC code lengths (digits) → the hydrologic-unit level they query. _HUC_LENGTHS = (2, 4, 6, 8, 10, 12) # Maps each selector to the NWDC ``location=<type>:<id>`` value(s) it produces. # A value may be a single code or a list; ``_as_list`` normalizes both (``state`` # additionally normalizes to the two-letter postal code, and ``to_state`` may # itself return a scalar or list, which ``_as_list`` flattens the same way). # Since NWDC takes one location per request, a list value fans out — one request # per location (see :func:`_fan_out`). _LOCATION_BUILDERS: dict[str, Callable[[Any], list[str]]] = { "state": lambda v: [f"stateCd:{c}" for c in _as_list(to_state(v, to="postal"))], "county": lambda v: [f"countyCd:{_validate_county(c)}" for c in _as_list(v)], "huc": lambda v: [f"huc{len(c)}:{c}" for c in map(_validate_huc, _as_list(v))], }
[docs] def _resolve_locations( state: str | int | Iterable[str | int] | None, county: str | Iterable[str] | None, huc: str | Iterable[str] | None, ) -> list[str]: """Build the NWDC ``location=<type>:<id>`` value(s) from the selectors. Exactly one of ``state`` / ``county`` / ``huc`` must be given; each may be a single value or a list. ``state`` is normalized to the two-letter postal code ``stateCd`` requires; ``county`` is a five-digit FIPS code; and a ``huc`` code's length selects its level (``huc2`` … ``huc12``). Returns one location string per value — the caller issues one request per location. """ selected = { name: value for name, value in (("state", state), ("county", county), ("huc", huc)) if value is not None } if len(selected) != 1: raise ValueError( "Specify exactly one of state, county, or huc " f"(got: {', '.join(selected) or 'none'})." ) [(name, value)] = selected.items() locations = _LOCATION_BUILDERS[name](value) if not locations: raise ValueError( "The chosen location selector is empty; pass at least one value." ) return locations
[docs] def _as_list(value: object) -> list[Any]: """A scalar becomes a one-element list; any non-string iterable (list, tuple, Series, ndarray, generator) is materialized to a list. A string is treated as a scalar so it isn't exploded into characters.""" if isinstance(value, Iterable) and not isinstance(value, str): return list(value) return [value]
[docs] def _validate_county(value: object) -> str: """Validate and normalize a five-digit state+county FIPS code.""" code = str(value).strip() if not (code.isdigit() and len(code) == 5): raise ValueError( "county must be a five-digit state+county FIPS code " f"(e.g. '55025'), got {value!r}." ) return code
[docs] def _validate_huc(value: object) -> str: """Validate a HUC code (even length 2-12 digits; level set by length).""" code = str(value).strip() if not (code.isdigit() and len(code) in _HUC_LENGTHS): raise ValueError( "huc must be a hydrologic unit code of even length 2-12 digits " f"(e.g. '04', '07070005', '010900020502'), got {value!r}." ) return code
async def _fan_out(
    requests: list[httpx.Request], headers: dict[str, str], ssl_check: bool
) -> tuple[pd.DataFrame, httpx.Response]:
    """Run all requests concurrently on one shared client, paginating each.

    Every request goes through the engine's
    :func:`~dataretrieval.ogc.engine._paginate`, configured with three
    NWDC-specific callbacks: one that parses a CSV page and extracts its
    ``Link``-header cursor, one that fetches the page a cursor points to, and
    one that raises the typed error carrying the NWDC ``detail`` message.

    A semaphore caps the number of requests in flight at
    :data:`MAX_CONCURRENT_REQUESTS` (never fewer than one). The results come
    back from ``asyncio.gather`` in the order of ``requests``. A single
    :class:`httpx.AsyncClient` serves all pages of all requests.

    Returns
    -------
    tuple
        The combined frame from ``_combine_chunk_frames`` and the combined
        response from ``_combine_chunk_responses``, the latter labelled with
        the URL of the first request.
    """

    def _parse_page(response: httpx.Response) -> tuple[pd.DataFrame, str | None]:
        frame = _read_csv_page(response)
        return frame, _next_page_url(response)

    async def _follow_cursor(cursor: str, sess: httpx.AsyncClient) -> httpx.Response:
        return await sess.get(cursor, headers=headers)

    def _check_status(response: httpx.Response) -> None:
        _raise_for_status(response, detail_from=_nwdc_error_detail)

    async with httpx.AsyncClient(verify=ssl_check, **HTTPX_DEFAULTS) as client:
        gate = asyncio.Semaphore(max(1, MAX_CONCURRENT_REQUESTS))

        async def _fetch(request: httpx.Request) -> tuple[pd.DataFrame, httpx.Response]:
            # Holding the semaphore for the whole pagination bounds the number
            # of locations being fetched at any one time.
            async with gate:
                return await _paginate(
                    request,
                    parse_response=_parse_page,
                    follow_up=_follow_cursor,
                    client=client,
                    raise_for_status=_check_status,
                )

        outcomes = await asyncio.gather(*map(_fetch, requests))

    # Split the (frame, response) pairs, then hand each pile to the engine's
    # combine helpers; the first request's URL identifies the overall query.
    frames = []
    responses = []
    for frame, resp in outcomes:
        frames.append(frame)
        responses.append(resp)
    combined_frame = _combine_chunk_frames(frames)
    combined_response = _combine_chunk_responses(responses, str(requests[0].url))
    return combined_frame, combined_response
def _read_csv_page(response: httpx.Response) -> pd.DataFrame:
    """Parse a single CSV page into a DataFrame.

    The ``huc12_id`` column is read as ``str`` so leading zeros are kept.

    Raises
    ------
    DataRetrievalError
        If the response body is empty (pandas reports ``EmptyDataError``).
    """
    body = io.BytesIO(response.content)
    column_types = {_HUC12_COLUMN: str}
    try:
        page = pd.read_csv(body, dtype=column_types)
    except pd.errors.EmptyDataError as err:
        # An empty body is not how the NWDC normally reports "no data" (that
        # is a 400 or rows of zeros), but should it happen, raise the
        # package's typed error instead of leaking a bare pandas exception.
        message = f"NWDC returned an empty response body (URL: {response.url})."
        raise DataRetrievalError(message) from err
    return page
[docs] def _next_page_url(response: httpx.Response) -> str | None: """Return the absolute URL of the next page, or None if this is the last. Reads the standard ``Link: <...>; rel="next"`` header (parsed by httpx into ``response.links``). A next link served against the bare ``water.usgs.gov`` host is normalized to the public ``api.water.usgs.gov`` gateway so the follow-up request reaches the API. """ url = response.links.get("next", {}).get("url") if not url: return None return url.replace("https://water.usgs.gov", "https://api.water.usgs.gov", 1)
[docs] def _nwdc_error_detail(response: httpx.Response) -> str | None: """Pull the ``detail`` message out of an NWDC JSON error envelope, if any. The NWDC reports errors as ``{"detail": "Invalid model name: ..."}``. Passed to :func:`~dataretrieval.utils._raise_for_status` as ``detail_from`` so the service's wording surfaces in the typed error message. """ try: body = response.json() except ValueError: return None return body.get("detail") if isinstance(body, dict) else None