"""``get_nearest_continuous``: nearest-timestamp convenience on top of
``get_continuous``. Built on the CQL ``filter`` passthrough; only
``get_nearest_continuous`` is public — everything else is package-private.
"""
from __future__ import annotations

from typing import Literal, get_args

import pandas as pd

from dataretrieval.utils import BaseMetadata
from dataretrieval.waterdata.api import get_continuous

OnTie = Literal["first", "last", "mean"]
_VALID_ON_TIE: tuple[OnTie, ...] = get_args(OnTie)
def get_nearest_continuous(
targets,
monitoring_location_id: str | list[str] | None = None,
parameter_code: str | list[str] | None = None,
*,
window: str | pd.Timedelta = "PT7M30S",
on_tie: OnTie = "first",
**kwargs,
) -> tuple[pd.DataFrame, BaseMetadata]:
"""For each target timestamp, return the nearest continuous observation.
Builds one bracketed ``(time >= t-window AND time <= t+window)`` clause
per target, joins them as a top-level CQL ``OR`` filter, and lets
``get_continuous`` (with its auto-chunking) fetch every observation
that falls in any window. Then, per ``(monitoring_location_id, target)``
pair, picks the single observation with the smallest ``|time - target|``.
The USGS continuous endpoint matches ``time`` parameters exactly rather
than fuzzily, and it does not implement ``sortby`` for arbitrary fields;
this function is the single-round-trip way to ask "what reading is
nearest this timestamp?" for many timestamps at once.
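For example, two targets with the default window expand to a single
CQL filter of the form::

    (time >= '2023-06-15T10:23:01Z' AND time <= '2023-06-15T10:38:01Z')
    OR (time >= '2023-06-15T13:59:42Z' AND time <= '2023-06-15T14:14:42Z')
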
Parameters
----------
targets : list-like of datetime-convertible
Target timestamps. Naive datetimes are treated as UTC. Accepts a
list, ``pandas.Series``, ``pandas.DatetimeIndex``, ``numpy.ndarray``,
or anything ``pandas.to_datetime`` consumes.
monitoring_location_id : string or list of strings, optional
Forwarded to ``get_continuous``.
parameter_code : string or list of strings, optional
Forwarded to ``get_continuous``.
window : string or ``pandas.Timedelta``, default ``"PT7M30S"``
Half-window around each target, as an ISO 8601 duration
(``"PT7M30S"``, ``"PT15M"``, ``"PT1H"``, etc.). Also accepts
any other form ``pandas.Timedelta`` parses — ``HH:MM:SS``
(``"00:07:30"``), pandas shorthand (``"7min30s"``,
``"450s"``), or a ``pd.Timedelta`` directly. See the
`pandas.Timedelta docs
<https://pandas.pydata.org/docs/reference/api/pandas.Timedelta.html>`_
for the full grammar.
Must be small enough that every target's window captures
roughly one observation at the service cadence. The default
matches a 15-minute continuous gauge; widen (e.g.
``"PT15M"``) for irregular cadences or resilience to data
gaps.
on_tie : {"first", "last", "mean"}, default ``"first"``
How to resolve ties when two observations are exactly equidistant
from a target (which happens when the target falls at the midpoint
between grid points — e.g. target ``10:22:30`` for a 15-minute
gauge).

- ``"first"``: keep the earlier observation.
- ``"last"``: keep the later observation.
- ``"mean"``: average numeric columns; set the ``time`` column to
the target, since no real observation exists at the midpoint.

**kwargs
Additional keyword arguments forwarded to ``get_continuous``
(e.g. ``statistic_id``, ``approval_status``, ``properties``).
Passing ``time``, ``filter``, or ``filter_lang`` raises
``TypeError`` — this function builds those itself.

Returns
-------
df : ``pandas.DataFrame``
One row per ``(target, monitoring_location_id)`` combination that
had at least one observation in its window. Rows are augmented
with a ``target_time`` column indicating which target they
correspond to. Targets with no observations in their window are
silently dropped.
md : :class:`~dataretrieval.utils.BaseMetadata`
Metadata from the underlying ``get_continuous`` call.

Notes
-----
*Window sizing and ties.* When ``window`` is exactly half the service
cadence, most targets' windows contain a single observation and
``on_tie`` is moot. Ties arise only when two observations sit exactly on
the window edges, i.e. the target falls midway between grid points;
rare in practice, but possible. Setting ``window`` to a
full cadence (or larger) guarantees at least one observation per
target in steady state at the cost of more bytes per response.
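Concretely, a target of ``10:22:30`` on a 15-minute gauge with the
default window brackets the ``10:15:00`` and ``10:30:00`` observations
equidistantly: ``on_tie="first"`` keeps the ``10:15:00`` row, ``"last"``
keeps the ``10:30:00`` row, and ``"mean"`` averages their numeric
columns and sets ``time`` to ``10:22:30`` itself.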
*Why windowed CQL rather than sort+limit.* The API's advertised
``sortby`` parameter would make this a one-liner per target (``filter``
by ``time <= t`` and ``limit 1``), but sorting and limiting apply per
query, so you would need one HTTP round-trip per target. The CQL
``OR``-chain approach folds all N targets into one request
(auto-chunked when the URL is long).

Examples
--------
.. code::

>>> import pandas as pd
>>> from dataretrieval import waterdata
>>> # Pair three off-grid timestamps with nearby observations
>>> targets = pd.to_datetime(
... [
... "2023-06-15T10:30:31Z",
... "2023-06-15T14:07:12Z",
... "2023-06-16T03:45:19Z",
... ]
... )
>>> df, md = waterdata.get_nearest_continuous(
... targets,
... monitoring_location_id="USGS-02238500",
... parameter_code="00060",
... )
>>> # Widen the window for an irregular-cadence gauge
>>> df, md = waterdata.get_nearest_continuous(
... targets,
... monitoring_location_id="USGS-02238500",
... parameter_code="00060",
... window="PT30M",
... on_tie="mean",
... )
"""
_check_nearest_kwargs(kwargs, on_tie)
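# Normalize to a tz-aware UTC DatetimeIndex; naive inputs are treated as UTC.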
targets = pd.DatetimeIndex(pd.to_datetime(targets, utc=True))
window_td = pd.Timedelta(window)
if len(targets) == 0:
raise ValueError("targets must contain at least one timestamp")
filter_expr = _build_window_or_filter(targets, window_td)
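# A single request covers every window; get_continuous auto-chunks the
# filter if the OR-chain would make the URL too long.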
df, md = get_continuous(
monitoring_location_id=monitoring_location_id,
parameter_code=parameter_code,
filter=filter_expr,
filter_lang="cql-text",
**kwargs,
)
if df.empty:
return _empty_nearest_result(df), md
df = df.assign(time=pd.to_datetime(df["time"], utc=True))
site_groups = (
df.groupby("monitoring_location_id", sort=False)
if "monitoring_location_id" in df.columns
else [(None, df)]
)
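# For each (site, target) pair keep the nearest in-window row; the walrus
# binds the picked row, or None when a window held no observations.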
selected = [
row
for _, site_df in site_groups
for target in targets
if (row := _pick_nearest_row(site_df, target, window_td, on_tie)) is not None
]
if not selected:
return _empty_nearest_result(df), md
return pd.DataFrame(selected).reset_index(drop=True), md
def _check_nearest_kwargs(kwargs: dict, on_tie: OnTie) -> None:
"""Reject kwargs the helper owns; validate ``on_tie``."""
for forbidden in ("time", "filter", "filter_lang"):
if forbidden in kwargs:
raise TypeError(
f"get_nearest_continuous constructs its own {forbidden!r}; "
"do not pass it directly"
)
if on_tie not in _VALID_ON_TIE:
raise ValueError(f"on_tie must be one of {_VALID_ON_TIE}; got {on_tie!r}")
def _build_window_or_filter(targets: pd.DatetimeIndex, window_td: pd.Timedelta) -> str:
"""Build the CQL OR-chain of ``time >= ... AND time <= ...`` windows.
``get_continuous`` auto-chunks the result if the full URL would
exceed the server's length limit, so this is always safe to build
as one string even for many targets.
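For illustration, one target with the default 7.5-minute half-window
yields:

>>> _build_window_or_filter(
...     pd.DatetimeIndex(["2023-06-15T10:30:00"], tz="UTC"),
...     pd.Timedelta("PT7M30S"),
... )
"(time >= '2023-06-15T10:22:30Z' AND time <= '2023-06-15T10:37:30Z')"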
"""
fmt = "%Y-%m-%dT%H:%M:%SZ"
lowers = (targets - window_td).strftime(fmt)
uppers = (targets + window_td).strftime(fmt)
return " OR ".join(
f"(time >= '{lo}' AND time <= '{up}')" for lo, up in zip(lowers, uppers)
)
def _pick_nearest_row(
site_df: pd.DataFrame,
target: pd.Timestamp,
window_td: pd.Timedelta,
on_tie: OnTie,
) -> pd.Series | None:
"""Return the single row within ``window_td`` of ``target``, or ``None``.
Resolves ties (two rows equidistant from ``target``) per ``on_tie``.
The returned row carries a ``target_time`` column identifying which
target it was selected for.
"""
in_window = site_df[
(site_df["time"] >= target - window_td)
& (site_df["time"] <= target + window_td)
]
if in_window.empty:
return None
deltas = (in_window["time"] - target).abs()
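# Every row tied for the minimum distance; sorting by time makes iloc[0]
# the earlier and iloc[-1] the later observation of a tie pair.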
candidates = in_window[deltas == deltas.min()].sort_values("time")
if len(candidates) == 1 or on_tie == "first":
row = candidates.iloc[0].copy()
elif on_tie == "last":
row = candidates.iloc[-1].copy()
else: # "mean" — average numeric cols, set time to the target.
row = candidates.iloc[0].copy()
for col in candidates.select_dtypes("number").columns:
row[col] = candidates[col].mean()
row["time"] = target
row["target_time"] = target
return row
def _empty_nearest_result(template: pd.DataFrame) -> pd.DataFrame:
"""Empty frame matching ``template``'s columns plus a ``target_time``."""
base = template.iloc[0:0].copy()
base["target_time"] = pd.Series(dtype="datetime64[ns, UTC]")
return base