Source code for dataretrieval.utils

"""
Useful utilities for data munging.
"""

from __future__ import annotations

import warnings
from collections.abc import Iterable
from typing import Any

import httpx
import pandas as pd

import dataretrieval
from dataretrieval.codes import tz
from dataretrieval.exceptions import (
    NetworkError,
    NoSitesError,
    URLTooLong,
    error_for_status,
)

# Typed as ``dict[str, Any]`` (not the inferred ``dict[str, object]``) so that
# splatting it as ``**HTTPX_DEFAULTS`` into ``httpx.get`` / ``httpx.AsyncClient``
# type-checks: the values are a heterogeneous bag of httpx keyword arguments.
HTTPX_DEFAULTS: dict[str, Any] = {
    "follow_redirects": True,
    "timeout": httpx.Timeout(60.0, connect=10.0),
}


[docs] def to_str(listlike: object, delimiter: str = ",") -> str | None: """Translates list-like objects into strings. Parameters ---------- listlike: list-like object An object that is a list, or list-like (e.g., ``pandas.core.series.Series``) delimiter: string, optional The delimiter that is placed between entries in listlike when it is turned into a string. Default value is a comma. Returns ------- listlike: string The listlike object as string separated by the delimiter Examples -------- .. doctest:: >>> dataretrieval.utils.to_str([1, "a", 2]) '1,a,2' >>> dataretrieval.utils.to_str([0, 10, 42], delimiter="+") '0+10+42' """ if isinstance(listlike, str): return listlike if isinstance(listlike, Iterable): return delimiter.join(map(str, listlike)) return None
[docs] def format_datetime( df: pd.DataFrame, date_field: str, time_field: str, tz_field: str ) -> pd.DataFrame: """Creates a datetime field from separate date, time, and time zone fields. Assumes ISO 8601. Parameters ---------- df: ``pandas.DataFrame`` A data frame containing date, time, and timezone fields. date_field: string Name of date column in df. time_field: string Name of time column in df. tz_field: string Name of time zone column in df. Returns ------- df: ``pandas.DataFrame`` The data frame with a formatted 'datetime' column """ # create a datetime index from the columns in qwdata response df[tz_field] = df[tz_field].map(tz) df["datetime"] = pd.to_datetime( df[date_field] + " " + df[time_field] + " " + df[tz_field], format="mixed", utc=True, ) # if there are any incomplete dates, warn the user if df["datetime"].isna().any(): count = df["datetime"].isna().sum() warnings.warn( f"Warning: {count} incomplete dates found, " + "consider setting datetime_index to False.", UserWarning, stacklevel=2, ) return df
# (time-suffix, tz-suffix) pairs that follow a "<prefix>Date" column. _TIME_TZ_SUFFIXES = ( # WQX3 / Samples, e.g. # Activity_StartDate / Activity_StartTime / Activity_StartTimeZone ("Time", "TimeZone"), # Legacy WQP (slash-separated), e.g. # ActivityStartDate / ActivityStartTime/Time / ActivityStartTime/TimeZoneCode ("Time/Time", "Time/TimeZoneCode"), )
[docs] def _build_utc_datetime( date_series: pd.Series, time_series: pd.Series, tz_series: pd.Series ) -> pd.Series: """Combine date + time + tz-abbreviation columns into a UTC pandas Series. Unknown timezone codes (and rows missing any of the three values) yield ``NaT``. The input columns are not mutated. """ offsets = tz_series.map(tz) combined = ( date_series.astype("string") + " " + time_series.astype("string") + " " + offsets.astype("string") ) return pd.to_datetime( combined, format="%Y-%m-%d %H:%M:%S %z", utc=True, errors="coerce" )
[docs] def _attach_datetime_columns(df: pd.DataFrame) -> pd.DataFrame: """Add ``<prefix>DateTime`` UTC columns for any Date/Time/TimeZone triplets and sort the frame by the activity-start datetime. Detects two naming patterns that appear in USGS Samples and Water Quality Portal CSV responses: * **WQX3** — ``<prefix>Date``, ``<prefix>Time``, ``<prefix>TimeZone`` * **Legacy WQP** — ``<prefix>Date``, ``<prefix>Time/Time``, ``<prefix>Time/TimeZoneCode`` For every triplet present, a new ``<prefix>DateTime`` column is appended holding a UTC ``Timestamp`` (offsets resolved via :data:`dataretrieval.codes.tz`). The original Date/Time/TimeZone columns are left intact, and an existing ``<prefix>DateTime`` column is never overwritten. Rows are sorted (and the index reset) by the canonical activity-start datetime when present — ``Activity_StartDateTime`` (WQX3) or ``ActivityStartDateTime`` (legacy WQP) — falling back to the first detected ``*Date`` column. Mirrors R ``dataRetrieval``'s end-of-pipeline sort in ``importWQP.R``. Parameters ---------- df : ``pandas.DataFrame`` DataFrame returned from a Samples or WQP CSV endpoint. Returns ------- df : ``pandas.DataFrame`` A new DataFrame with derivable ``<prefix>DateTime`` columns appended and rows sorted by the activity-start datetime (if any date column was detected). """ columns = set(df.columns) new_columns = {} first_date_col = None for col in df.columns: if not col.endswith("Date"): continue if first_date_col is None: first_date_col = col prefix = col.removesuffix("Date") target = prefix + "DateTime" if target in columns or target in new_columns: continue for time_suffix, tz_suffix in _TIME_TZ_SUFFIXES: time_col = prefix + time_suffix tz_col = prefix + tz_suffix if time_col in columns and tz_col in columns: new_columns[target] = _build_utc_datetime( df[col], df[time_col], df[tz_col] ) break if new_columns: # Concat in one shot — per-column assignment on a wide CSV-derived # frame triggers pandas' fragmentation PerformanceWarning. df = pd.concat([df, pd.DataFrame(new_columns, index=df.index)], axis=1) sort_key: str | None if "Activity_StartDateTime" in df.columns: sort_key = "Activity_StartDateTime" elif "ActivityStartDateTime" in df.columns: sort_key = "ActivityStartDateTime" else: sort_key = first_date_col if sort_key is not None: df = df.sort_values(by=sort_key, ignore_index=True) return df
[docs] class BaseMetadata: """Base class for metadata. Attributes ---------- url : str Response url query_time: datetime.timedelta Response elapsed time header: httpx.Headers Response headers """
[docs] def __init__(self, response: httpx.Response) -> None: """Generates a standard set of metadata informed by the response. Parameters ---------- response: ``httpx.Response`` Response object from the ``httpx`` module. """ # Coerce httpx.URL -> str: BaseMetadata.url has always been str. self.url = str(response.url) self.query_time = response.elapsed self.header = response.headers self.comment: str | None = None
# # not sure what statistic_info is # self.statistic_info = None # # disclaimer seems to be only part of importWaterML1 # self.disclaimer = None # These properties are to be set by `nwis` or `wqp`-specific metadata classes. @property def site_info(self) -> Any: raise NotImplementedError( "site_info must be implemented by utils.BaseMetadata children" ) @property def variable_info(self) -> Any: raise NotImplementedError( "variable_info must be implemented by utils.BaseMetadata children" )
[docs] def __repr__(self) -> str: return f"{type(self).__name__}(url={self.url})"
_URL_TOO_LONG_EXAMPLE = """ # n is the number of chunks to divide the query into \n split_list = np.array_split(site_list, n) data_list = [] # list to store chunk results in \n # loop through chunks and make requests \n for site_list in split_list: \n data = nwis.get_record(sites=site_list, service='dv', \n start=start, end=end) \n data_list.append(data) # append results to list""" def _url_too_long_error(detail: str) -> URLTooLong: return URLTooLong( "Request URL too long. Modify your query to use fewer sites. " f"{detail}. Pseudo-code example of how to split your query: " f"\n {_URL_TOO_LONG_EXAMPLE}" )
[docs] def _network_error(url: str | httpx.URL, exc: httpx.TransportError) -> NetworkError: """Build the :class:`~dataretrieval.exceptions.NetworkError` for a failed round-trip ``exc`` (no HTTP response: timeout, DNS, refused connection).""" # Some httpx transport errors stringify empty (e.g. ``ConnectTimeout()``); # fall back to the class name so the message is always informative. detail = str(exc) or type(exc).__name__ return NetworkError(f"Could not reach the service at {url}: {detail}")
[docs] def _get(url: str | httpx.URL, **kwargs: Any) -> httpx.Response: """``httpx.get`` for the single-shot paths, surfacing a transport failure as a typed :class:`~dataretrieval.exceptions.NetworkError` (the chunker wraps its own as resumable interruptions, so it stays off this wrapper).""" try: return httpx.get(url, **kwargs) except httpx.TransportError as exc: raise _network_error(url, exc) from exc
[docs] def _raise_for_status(response: httpx.Response) -> None: """Raise the typed :class:`DataRetrievalError` for an HTTP error response; return ``None`` on success. Shared by the legacy :func:`query` path (and ``nadp`` / ``streamstats``). Delegates the status-to-type mapping to :func:`dataretrieval.exceptions.error_for_status`, except a too-long-URL status (413 / 414): that gets the same actionable "split your query" remediation as the client-side over-long-URL case below, rather than a bare ``HTTP 414`` (both still raise :class:`~dataretrieval.exceptions.URLTooLong`). """ status = response.status_code if status < 400: return if status in (413, 414): raise _url_too_long_error(f"API response reason: {response.reason_phrase}") raise error_for_status( status, f"HTTP {status} {response.reason_phrase}".rstrip() + f" (URL: {response.url})", )
[docs] def query( url: str, payload: dict[str, Any], delimiter: str = ",", ssl_check: bool = True, ) -> httpx.Response: """Send a query. Wrapper for httpx.get that handles errors, converts listed query parameters to comma separated strings, and returns response. Parameters ---------- url: string URL to query payload: dict query parameters passed to ``httpx.get`` delimiter: string delimiter to use with lists ssl_check: bool If True, check SSL certificates, if False, do not check SSL, default is True Returns ------- response: ``httpx.Response`` The response from the API query ``httpx.get`` function call. Raises ------ DataRetrievalError On an HTTP error response, the typed subclass for the status (see :func:`dataretrieval.exceptions.error_for_status` for the mapping); or :class:`~dataretrieval.exceptions.NoSitesError` when a 200 response reports no data matched; or :class:`~dataretrieval.exceptions.NetworkError` on a connection-level failure (timeout, DNS), with the underlying ``httpx`` exception on ``__cause__``. """ for key, value in payload.items(): payload[key] = to_str(value, delimiter) # httpx serializes None params as ``foo=``; USGS rejects with 400. # Drop them. (``to_str`` returns None for non-iterable scalars like bools.) payload = {k: v for k, v in payload.items() if v is not None} user_agent = {"user-agent": f"python-dataretrieval/{dataretrieval.__version__}"} try: response = _get( url, params=payload, headers=user_agent, verify=ssl_check, **HTTPX_DEFAULTS, ) except httpx.InvalidURL as exc: raise _url_too_long_error(f"httpx rejected the URL client-side: {exc}") from exc _raise_for_status(response) # USGS waterservices signals an empty result with a 200 whose body starts # "No sites/data ..." (its legacy wording); surface it as NoSitesError. if response.text.startswith("No sites/data"): raise NoSitesError(response.url) return response