"""
Useful utilities for data munging.
"""
import warnings
from collections.abc import Iterable
import pandas as pd
import requests
import dataretrieval
from dataretrieval.codes import tz
def to_str(listlike, delimiter=","):
    """Convert a list-like object into a delimited string.

    Parameters
    ----------
    listlike: list-like object
        A list or list-like container
        (e.g., ``pandas.core.series.Series``).
    delimiter: string, optional
        Separator inserted between entries of ``listlike`` when it is
        joined into a string. Defaults to a comma.

    Returns
    -------
    listlike: string
        The entries of ``listlike`` joined by ``delimiter``. Strings pass
        through unchanged; non-iterable inputs yield ``None``.

    Examples
    --------
    .. doctest::

        >>> dataretrieval.utils.to_str([1, "a", 2])
        '1,a,2'

        >>> dataretrieval.utils.to_str([0, 10, 42], delimiter="+")
        '0+10+42'
    """
    # A str is itself Iterable, so it must be short-circuited first to
    # avoid joining its individual characters.
    if isinstance(listlike, str):
        return listlike
    if isinstance(listlike, Iterable):
        return delimiter.join(str(entry) for entry in listlike)
    return None
# Suffix pairs (time-suffix, tz-suffix) that accompany a "<prefix>Date"
# column in WQP/Samples CSV responses.
_TIME_TZ_SUFFIXES = (
    # WQX3 / Samples naming, e.g.
    #   Activity_StartDate / Activity_StartTime / Activity_StartTimeZone
    ("Time", "TimeZone"),
    # Legacy WQP slash-separated naming, e.g.
    #   ActivityStartDate / ActivityStartTime/Time
    #   / ActivityStartTime/TimeZoneCode
    ("Time/Time", "Time/TimeZoneCode"),
)
def _build_utc_datetime(
    date_series: pd.Series, time_series: pd.Series, tz_series: pd.Series
) -> pd.Series:
    """Combine date, time, and tz-abbreviation columns into a UTC Series.

    Rows whose timezone code is absent from :data:`dataretrieval.codes.tz`,
    or that are missing any of the three values, come back as ``NaT``. The
    input columns are never mutated.
    """
    # Map tz abbreviations (e.g. "EST") to UTC offsets; unknown codes
    # become <NA> and propagate through to NaT below.
    utc_offsets = tz_series.map(tz).astype("string")
    date_text = date_series.astype("string")
    time_text = time_series.astype("string")
    stamp_text = date_text + " " + time_text + " " + utc_offsets
    # errors="coerce" turns unparseable/missing rows into NaT rather
    # than raising.
    return pd.to_datetime(
        stamp_text, format="%Y-%m-%d %H:%M:%S %z", utc=True, errors="coerce"
    )
def _attach_datetime_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Append ``<prefix>DateTime`` UTC columns for Date/Time/TimeZone
    triplets and sort rows by the activity-start datetime.

    Two naming patterns from USGS Samples and Water Quality Portal CSV
    responses are recognized:

    * **WQX3** — ``<prefix>Date``, ``<prefix>Time``, ``<prefix>TimeZone``
    * **Legacy WQP** — ``<prefix>Date``, ``<prefix>Time/Time``,
      ``<prefix>Time/TimeZoneCode``

    Each detected triplet contributes a new ``<prefix>DateTime`` column
    holding a UTC ``Timestamp`` (offsets resolved via
    :data:`dataretrieval.codes.tz`). Existing Date/Time/TimeZone columns
    are untouched, and a pre-existing ``<prefix>DateTime`` column is
    never overwritten.

    The result is sorted (index reset) on the canonical activity-start
    datetime when available — ``Activity_StartDateTime`` (WQX3) or
    ``ActivityStartDateTime`` (legacy WQP) — otherwise on the first
    ``*Date`` column found. This mirrors the end-of-pipeline sort in
    R ``dataRetrieval``'s ``importWQP.R``.

    Parameters
    ----------
    df : ``pandas.DataFrame``
        DataFrame returned from a Samples or WQP CSV endpoint.

    Returns
    -------
    df : ``pandas.DataFrame``
        New DataFrame with derivable ``<prefix>DateTime`` columns added
        and rows sorted by the activity-start datetime (when any date
        column was detected).
    """
    existing = set(df.columns)
    derived = {}
    fallback_sort_col = None
    for date_col in df.columns:
        if not date_col.endswith("Date"):
            continue
        # Remember the first *Date column as the sort fallback.
        if fallback_sort_col is None:
            fallback_sort_col = date_col
        stem = date_col.removesuffix("Date")
        datetime_col = stem + "DateTime"
        if datetime_col in existing or datetime_col in derived:
            continue
        for time_suffix, zone_suffix in _TIME_TZ_SUFFIXES:
            time_col = stem + time_suffix
            zone_col = stem + zone_suffix
            if time_col not in existing or zone_col not in existing:
                continue
            derived[datetime_col] = _build_utc_datetime(
                df[date_col], df[time_col], df[zone_col]
            )
            break
    if derived:
        # Single concat — assigning columns one-by-one on a wide
        # CSV-derived frame triggers pandas' fragmentation
        # PerformanceWarning.
        df = pd.concat([df, pd.DataFrame(derived, index=df.index)], axis=1)
    # Prefer the canonical activity-start datetime columns for sorting.
    for candidate in ("Activity_StartDateTime", "ActivityStartDateTime"):
        if candidate in df.columns:
            sort_col = candidate
            break
    else:
        sort_col = fallback_sort_col
    if sort_col is not None:
        df = df.sort_values(by=sort_col, ignore_index=True)
    return df
def query(url, payload, delimiter=",", ssl_check=True):
    """Send a query.

    Wrapper for requests.get that handles errors, converts listed
    query parameters to comma separated strings, and returns response.

    Parameters
    ----------
    url: string
        URL to query
    payload: dict
        query parameters passed to ``requests.get``; the caller's dict is
        not modified
    delimiter: string
        delimiter to use with lists
    ssl_check: bool
        If True, check SSL certificates, if False, do not check SSL,
        default is True

    Returns
    -------
    string: query response
        The response from the API query ``requests.get`` function call.

    Raises
    ------
    ValueError
        On HTTP 400, 404, 414, and 500/502/503 responses.
    NoSitesError
        When the service reports that no sites/data matched the query.
    """
    # Serialize list-like parameter values into delimited strings in a
    # NEW dict so the caller's payload is never mutated in place.
    params = {key: to_str(value, delimiter) for key, value in payload.items()}

    # define the user agent for the query
    user_agent = {"user-agent": f"python-dataretrieval/{dataretrieval.__version__}"}

    response = requests.get(url, params=params, headers=user_agent, verify=ssl_check)

    if response.status_code == 400:
        raise ValueError(
            f"Bad Request, check that your parameters are correct. URL: {response.url}"
        )
    elif response.status_code == 404:
        raise ValueError(
            "Page Not Found Error. May be the result of an empty query. "
            + f"URL: {response.url}"
        )
    elif response.status_code == 414:
        _reason = response.reason
        _example = """
        # n is the number of chunks to divide the query into \n
        split_list = np.array_split(site_list, n)
        data_list = []  # list to store chunk results in \n
        # loop through chunks and make requests \n
        for site_list in split_list: \n
            data = nwis.get_record(sites=site_list, service='dv', \n
                                   start=start, end=end) \n
            data_list.append(data)  # append results to list"""
        raise ValueError(
            "Request URL too long. Modify your query to use fewer sites. "
            + f"API response reason: {_reason}. Pseudo-code example of how to "
            + f"split your query: \n {_example}"
        )
    elif response.status_code in [500, 502, 503]:
        raise ValueError(
            f"Service Unavailable: {response.status_code} {response.reason}. "
            + f"The service at {response.url} may be down or experiencing issues."
        )

    if response.text.startswith("No sites/data"):
        raise NoSitesError(response.url)

    return response
class NoSitesError(Exception):
    """Custom error class used when selection criteria returns no sites/data."""

    def __init__(self, url):
        # Forward the URL to Exception so ``args`` is populated and the
        # exception pickles/reprs correctly (the original skipped this).
        super().__init__(url)
        # URL of the query that returned no sites/data.
        self.url = url

    def __str__(self):
        return (
            "No sites/data found using the selection criteria specified in "
            f"url: {self.url}"
        )