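"""
Useful utilities for data munging (list-to-string conversion, date/time column
assembly for USGS Samples / WQP responses) and for sending HTTP queries with
typed error handling.
"""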
from __future__ import annotations
import warnings
from collections.abc import Iterable
from typing import Any
import httpx
import pandas as pd
import dataretrieval
from dataretrieval.codes import tz
from dataretrieval.exceptions import (
NetworkError,
NoSitesError,
URLTooLong,
error_for_status,
)
# Default keyword arguments for this module's httpx requests. The explicit
# ``dict[str, Any]`` annotation (instead of the inferred ``dict[str, object]``)
# lets ``**HTTPX_DEFAULTS`` type-check when splatted into ``httpx.get`` or
# ``httpx.AsyncClient``, since the values are a mixed bag of httpx kwargs.
HTTPX_DEFAULTS: dict[str, Any] = dict(
    follow_redirects=True,
    # 60 s default for read/write/pool phases, but only 10 s to connect.
    timeout=httpx.Timeout(60.0, connect=10.0),
)
[docs]
def to_str(listlike: object, delimiter: str = ",") -> str | None:
    """Join the items of a list-like object into one delimited string.

    Parameters
    ----------
    listlike: list-like object
        A list or other iterable (e.g., ``pandas.core.series.Series``).
        A ``str`` is returned unchanged rather than split into characters.
    delimiter: string, optional
        Separator placed between the string form of each item. Defaults to
        a comma.

    Returns
    -------
    listlike: string or None
        The items, each converted with ``str``, joined by ``delimiter``; or
        ``None`` when ``listlike`` is neither a string nor iterable (for
        example an ``int``, a ``bool`` or ``None``).

    Examples
    --------
    .. doctest::

        >>> dataretrieval.utils.to_str([1, "a", 2])
        '1,a,2'

        >>> dataretrieval.utils.to_str([0, 10, 42], delimiter="+")
        '0+10+42'
    """
    # A string is itself iterable; hand it back as-is instead of joining
    # its characters.
    if isinstance(listlike, str):
        return listlike
    if not isinstance(listlike, Iterable):
        return None
    return delimiter.join(str(item) for item in listlike)
# Suffix pairs tried, in order, for each "<prefix>Date" column: the first
# element completes the time-of-day column name, the second the timezone
# column name.
_TIME_TZ_SUFFIXES = (
    # WQX3 / Samples:
    #   Activity_StartDate / Activity_StartTime / Activity_StartTimeZone
    ("Time", "TimeZone"),
    # Legacy WQP (slash-separated):
    #   ActivityStartDate / ActivityStartTime/Time / ActivityStartTime/TimeZoneCode
    ("Time/Time", "Time/TimeZoneCode"),
)
[docs]
def _build_utc_datetime(
    date_series: pd.Series, time_series: pd.Series, tz_series: pd.Series
) -> pd.Series:
    """Assemble a UTC timestamp Series from separate date, time and tz columns.

    Each row is rendered as ``"<date> <time> <offset>"``, with the offset
    looked up from the timezone code via :data:`dataretrieval.codes.tz`.
    The result is then parsed with the ``"%Y-%m-%d %H:%M:%S %z"`` format.
    Rows come back as ``NaT`` when any of the following holds:

    * the timezone code is not in that table;
    * any of the three values is missing;
    * the text does not match the format.

    The input Series are not modified.
    """
    # The nullable "string" dtype carries missing values through the
    # concatenation as <NA>, which to_datetime then coerces to NaT.
    date_txt = date_series.astype("string")
    time_txt = time_series.astype("string")
    offset_txt = tz_series.map(tz).astype("string")
    stamps = date_txt + " " + time_txt + " " + offset_txt
    return pd.to_datetime(
        stamps, format="%Y-%m-%d %H:%M:%S %z", utc=True, errors="coerce"
    )
[docs]
def _attach_datetime_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Add ``<prefix>DateTime`` UTC columns for any Date/Time/TimeZone triplets
    and sort the frame by the activity-start datetime.

    Detects two naming patterns that appear in USGS Samples and Water Quality
    Portal CSV responses:

    * **WQX3** — ``<prefix>Date``, ``<prefix>Time``, ``<prefix>TimeZone``
    * **Legacy WQP** — ``<prefix>Date``, ``<prefix>Time/Time``,
      ``<prefix>Time/TimeZoneCode``

    For every triplet present, a new ``<prefix>DateTime`` column is appended.
    It holds a UTC ``Timestamp``, with offsets resolved via
    :data:`dataretrieval.codes.tz`. The original Date/Time/TimeZone columns
    are left intact, and an existing ``<prefix>DateTime`` column is never
    overwritten. Non-string column labels are ignored.

    Rows are sorted, and the index reset, by the canonical activity-start
    datetime when present: ``Activity_StartDateTime`` (WQX3) or
    ``ActivityStartDateTime`` (legacy WQP). Otherwise the sort falls back to
    the first detected ``*Date`` column. The sort is stable, so rows sharing
    a timestamp keep their original relative order, and missing values sort
    last. Mirrors R ``dataRetrieval``'s end-of-pipeline sort in
    ``importWQP.R``.

    Parameters
    ----------
    df : ``pandas.DataFrame``
        DataFrame returned from a Samples or WQP CSV endpoint.

    Returns
    -------
    df : ``pandas.DataFrame``
        A new DataFrame with derivable ``<prefix>DateTime`` columns appended
        and rows sorted by the activity-start datetime (if any date column
        was detected). If no date column is detected, ``df`` itself is
        returned unchanged.
    """
    columns = set(df.columns)
    new_columns: dict[str, pd.Series] = {}
    first_date_col: str | None = None

    for col in df.columns:
        # Non-string labels (e.g. integer columns of a header-less frame)
        # cannot name a Date column; skip them instead of crashing on
        # ``endswith``.
        if not isinstance(col, str) or not col.endswith("Date"):
            continue
        if first_date_col is None:
            first_date_col = col
        prefix = col.removesuffix("Date")
        target = prefix + "DateTime"
        if target in columns or target in new_columns:
            continue
        # First suffix pair whose time and tz columns both exist wins.
        for time_suffix, tz_suffix in _TIME_TZ_SUFFIXES:
            time_col = prefix + time_suffix
            tz_col = prefix + tz_suffix
            if time_col in columns and tz_col in columns:
                new_columns[target] = _build_utc_datetime(
                    df[col], df[time_col], df[tz_col]
                )
                break

    if new_columns:
        # Concat in one shot — per-column assignment on a wide CSV-derived
        # frame triggers pandas' fragmentation PerformanceWarning.
        df = pd.concat([df, pd.DataFrame(new_columns, index=df.index)], axis=1)

    sort_key: str | None
    if "Activity_StartDateTime" in df.columns:
        sort_key = "Activity_StartDateTime"
    elif "ActivityStartDateTime" in df.columns:
        sort_key = "ActivityStartDateTime"
    else:
        sort_key = first_date_col

    if sort_key is not None:
        # pandas' default ``kind="quicksort"`` is not stable, which would
        # reorder rows that share a timestamp (typically several results per
        # activity). A stable sort keeps their source order, like R's order().
        df = df.sort_values(by=sort_key, kind="stable", ignore_index=True)
    return df
# Pseudo-code appended to the message built by ``_url_too_long_error``. It
# shows how to split a long site list into several smaller requests and
# collect the chunk results in a list.
# NOTE(review): the literal ``\n`` escapes inside this triple-quoted string add
# a second newline on top of the physical line break after those lines;
# presumably intended as spacing in the printed message — confirm.
_URL_TOO_LONG_EXAMPLE = """
# n is the number of chunks to divide the query into \n
split_list = np.array_split(site_list, n)
data_list = [] # list to store chunk results in \n
# loop through chunks and make requests \n
for site_list in split_list: \n
data = nwis.get_record(sites=site_list, service='dv', \n
start=start, end=end) \n
data_list.append(data) # append results to list"""
def _url_too_long_error(detail: str) -> URLTooLong:
    """Build a :class:`~dataretrieval.exceptions.URLTooLong` for an over-long URL.

    The message carries ``detail`` (what rejected the URL) followed by the
    ``_URL_TOO_LONG_EXAMPLE`` pseudo-code showing how to split the query.
    The exception is returned, not raised, so callers can ``raise ... from``.
    """
    parts = (
        "Request URL too long. Modify your query to use fewer sites. ",
        f"{detail}. Pseudo-code example of how to split your query: ",
        "\n ",
        _URL_TOO_LONG_EXAMPLE,
    )
    return URLTooLong("".join(parts))
[docs]
def _network_error(url: str | httpx.URL, exc: httpx.TransportError) -> NetworkError:
    """Wrap a transport-level failure as a typed ``NetworkError``.

    ``exc`` is an httpx error raised when no HTTP response came back at all
    (timeout, DNS failure, refused connection). The returned
    :class:`~dataretrieval.exceptions.NetworkError` names ``url`` and
    describes the failure; it is returned rather than raised.
    """
    detail = str(exc)
    if not detail:
        # Some httpx errors (e.g. a bare ``ConnectTimeout()``) stringify to
        # "", so use the class name to keep the message informative.
        detail = type(exc).__name__
    return NetworkError(f"Could not reach the service at {url}: {detail}")
[docs]
def _get(url: str | httpx.URL, **kwargs: Any) -> httpx.Response:
    """Call ``httpx.get(url, **kwargs)``, converting transport failures.

    Used by the single-shot request paths. Any ``httpx.TransportError`` is
    re-raised as :class:`~dataretrieval.exceptions.NetworkError`, with the
    original exception chained as ``__cause__``. The chunked downloader
    deliberately bypasses this wrapper because it reports its own transport
    failures as resumable interruptions.
    """
    try:
        response = httpx.get(url, **kwargs)
    except httpx.TransportError as err:
        raise _network_error(url, err) from err
    return response
[docs]
def _raise_for_status(response: httpx.Response) -> None:
    """Turn an HTTP error ``response`` into a typed exception; no-op otherwise.

    Used by the legacy :func:`query` path as well as ``nadp`` and
    ``streamstats``. Any status below 400 returns ``None`` without further
    inspection. Status 413 or 414 raises
    :class:`~dataretrieval.exceptions.URLTooLong`. That error carries the same
    "split your query" pseudo-code guidance that :func:`query` attaches when
    httpx rejects an over-long URL client-side, rather than a bare
    ``HTTP 414``. Every other error status is passed to
    :func:`dataretrieval.exceptions.error_for_status`, and the exception it
    returns is raised.

    Raises
    ------
    URLTooLong
        For HTTP 413 / 414.
    DataRetrievalError
        The subclass chosen by ``error_for_status`` for any other status
        >= 400.
    """
    code = response.status_code
    if code >= 400:
        if code in (413, 414):
            raise _url_too_long_error(
                f"API response reason: {response.reason_phrase}"
            )
        # rstrip drops the dangling space left when the reason phrase is empty.
        headline = f"HTTP {code} {response.reason_phrase}".rstrip()
        raise error_for_status(code, f"{headline} (URL: {response.url})")
[docs]
def query(
    url: str,
    payload: dict[str, Any],
    delimiter: str = ",",
    ssl_check: bool = True,
) -> httpx.Response:
    """Send a query.

    Wrapper for httpx.get that handles errors, converts listed
    query parameters to comma separated strings, and returns response.

    Parameters
    ----------
    url: string
        URL to query
    payload: dict
        Query parameters passed to ``httpx.get``. List-like values are joined
        with ``delimiter`` via :func:`to_str`. Values that :func:`to_str`
        maps to ``None`` are left out of the request: ``None`` itself and
        non-iterable scalars such as ``int`` or ``bool``. The caller's dict
        is not modified.
    delimiter: string
        delimiter to use with lists
    ssl_check: bool
        If True, check SSL certificates, if False, do not check SSL,
        default is True

    Returns
    -------
    response: ``httpx.Response``
        The response from the API query ``httpx.get`` function call.

    Raises
    ------
    URLTooLong
        When httpx rejects the URL client-side (``httpx.InvalidURL``, chained
        as ``__cause__``), or the server answers 413 / 414.
    DataRetrievalError
        On an HTTP error response, the typed subclass for the status (see
        :func:`dataretrieval.exceptions.error_for_status` for the mapping); or
        :class:`~dataretrieval.exceptions.NoSitesError` when a 200 response
        reports no data matched; or :class:`~dataretrieval.exceptions.NetworkError`
        on a connection-level failure (timeout, DNS), with the underlying
        ``httpx`` exception on ``__cause__``.
    """
    # Build a fresh params dict instead of rewriting ``payload`` in place, so
    # a caller that reuses its dict does not find it stringified or full of
    # None entries afterwards.
    params: dict[str, str] = {}
    for key, value in payload.items():
        text = to_str(value, delimiter)
        # httpx serializes None params as ``foo=``; USGS rejects with 400.
        # Drop them. (``to_str`` returns None for non-iterable scalars like bools.)
        if text is not None:
            params[key] = text

    user_agent = {"user-agent": f"python-dataretrieval/{dataretrieval.__version__}"}
    try:
        response = _get(
            url,
            params=params,
            headers=user_agent,
            verify=ssl_check,
            **HTTPX_DEFAULTS,
        )
    except httpx.InvalidURL as exc:
        raise _url_too_long_error(f"httpx rejected the URL client-side: {exc}") from exc

    _raise_for_status(response)

    # USGS waterservices signals an empty result with a 200 whose body starts
    # "No sites/data ..." (its legacy wording); surface it as NoSitesError.
    if response.text.startswith("No sites/data"):
        raise NoSitesError(response.url)
    return response