"""Resumable chunk-interruption exceptions — the public resume contract.
When a transparently-chunked request fails mid-stream (a 429, a 5xx, or a
bare transport error), the work already completed is preserved and the call
is resumable: the raised exception carries a ``.call`` handle whose
``resume()`` re-issues only the still-pending sub-requests. These exception
types are that contract, re-exported at the top level
(``from dataretrieval import ChunkInterrupted``). The execution machinery
that raises and resumes them lives in :mod:`dataretrieval.ogc.chunking`.
"""
from __future__ import annotations
from collections.abc import Awaitable, Callable
from typing import TYPE_CHECKING, Any, ClassVar
import httpx
import pandas as pd
from dataretrieval.exceptions import DataRetrievalError
if TYPE_CHECKING:
from dataretrieval.ogc.chunking import ChunkedCall
# ``_Fetch`` is the per-sub-request fetcher the decorator wraps and
# ``ChunkedCall`` drives: an ``async def fetch(args) -> (df, response)``.
_Fetch = Callable[[dict[str, Any]], Awaitable[tuple[pd.DataFrame, httpx.Response]]]
# Caller-supplied transform applied to the combined chunk result, so a
# resumed call returns the same shape as an un-interrupted one rather than
# the chunker's raw ``(frame, httpx.Response)``. This keeps the chunker
# generic: the OGC getters inject their post-processing (type coercion,
# column arrangement, ``BaseMetadata``) through ``utils._finalize_ogc``.
# The default is identity, so direct ``ChunkedCall`` use is unaffected.
_Finalize = Callable[[pd.DataFrame, httpx.Response], tuple[pd.DataFrame, Any]]
def _passthrough_result(
frame: pd.DataFrame, response: httpx.Response
) -> tuple[pd.DataFrame, Any]:
"""Default :data:`_Finalize`: return the raw combined pair unchanged."""
return frame, response
[docs]
class ChunkInterrupted(DataRetrievalError):
"""
Base class for mid-stream chunk failures whose completed work is
preserved and resumable.
A ``ChunkInterrupted`` subclass means: a sub-request failed, but
``ChunkedCall`` still owns whatever completed successfully before
the failure. Call ``self.call.resume()`` to pick up where the
failure stopped you — only still-pending sub-requests are
re-issued.
Subclasses describe *why* ``ChunkedCall`` stopped so callers can
pick a retry policy: :class:`QuotaExhausted` for 429 (wait for the
rate-limit window), :class:`ServiceInterrupted` for 5xx (wait for
the upstream to recover). The ``.call`` handle is the same object
across every interruption of a single chunked call — frames
accumulate across retries.
Attributes
----------
call : ChunkedCall or None
Resumable handle into the ``ChunkedCall`` that raised this
exception. ``None`` only on hand-constructed exceptions (test
fixtures), where ``.call``-derived accessors degrade to
empty/``None``.
retry_after : float or None
Seconds the server suggested waiting (``Retry-After`` header).
``None`` when the server gave no hint.
completed_chunks : int
Number of sub-requests successfully completed before the failure.
total_chunks : int
Total sub-requests in the plan.
partial_frame : pandas.DataFrame
Combined frame of work completed by the moment this exception
was raised. Snapshot at raise time — does NOT advance on a
later ``call.resume()`` (use ``exc.call.partial_frame`` for
the live view).
partial_response : httpx.Response or None
Raw aggregate response covering the completed sub-requests at
raise time; ``None`` if nothing had completed yet. Same snapshot
semantics as ``partial_frame``. (Raw, not finalized — use
``exc.call.resume()`` for the finalized ``(df, metadata)`` result.)
Examples
--------
Retry on any transient interruption, honoring the server's
``Retry-After`` hint when present and falling back to a fixed wait
otherwise. Each new interruption keeps the already-completed work
intact — only the still-pending sub-requests are re-issued.
.. code-block:: python
import time
from dataretrieval import ChunkInterrupted
# ``getter`` is any chunked OGC getter — e.g.
# ``waterdata.get_daily`` or ``ngwmn.get_water_level``.
try:
df, md = getter(monitoring_location_id=long_list_of_sites)
except ChunkInterrupted as exc:
while True:
time.sleep(exc.retry_after or 5 * 60)
try:
df, md = exc.call.resume()
break
except ChunkInterrupted as next_exc:
exc = next_exc
"""
# Subclasses override with a ``str.format`` template; the format
# call sees ``completed_chunks`` and ``total_chunks`` as kwargs.
_MESSAGE_TEMPLATE: ClassVar[str] = (
"Chunked request interrupted after {completed_chunks}/"
"{total_chunks} sub-requests; call .call.resume() to continue."
)
def __init__(
self,
*,
completed_chunks: int,
total_chunks: int,
call: ChunkedCall | None = None,
retry_after: float | None = None,
cause: BaseException | None = None,
) -> None:
message = self._MESSAGE_TEMPLATE.format(
completed_chunks=completed_chunks, total_chunks=total_chunks
)
if cause is not None:
cause_msg = str(cause) or type(cause).__name__
message = f"{message} Cause: {type(cause).__name__}: {cause_msg}"
super().__init__(message)
self.completed_chunks = completed_chunks
self.total_chunks = total_chunks
self.call = call
self.retry_after = retry_after
# Snapshot partial state at raise time so the exception's view stays
# stable across later ``call.resume()`` advances (the live view is on
# ``call.partial_frame`` / ``.partial_response``). ``.copy()`` guards
# the single-chunk fast path, where the frame may be returned verbatim.
if call is None:
self.partial_frame: pd.DataFrame = pd.DataFrame()
self.partial_response: httpx.Response | None = None
else:
self.partial_frame = call.partial_frame.copy()
self.partial_response = call.partial_response
def __getstate__(self) -> dict[str, Any]:
# Drop the live ChunkedCall before pickling: its ``.fetch`` is an
# undecorated module function pickle can't reference by name, so the
# interruption can't cross a process boundary with ``.call`` attached.
# The degraded ``call=None`` form keeps the counts, retry hint, and
# partial frame / response; only ``.resume()`` is lost (cross-process
# resume was never possible anyway).
return {**super().__getstate__(), "call": None}
[docs]
class QuotaExhausted(ChunkInterrupted):
"""
A sub-request returned HTTP 429 — the per-key rate-limit window
is exhausted. Subclass of :class:`ChunkInterrupted`.
The completed sub-requests are preserved on ``.call``; once the
rate-limit window resets, ``.call.resume()`` re-issues only the
still-pending work. ``partial_frame`` holds what completed
before the 429.
"""
_MESSAGE_TEMPLATE = (
"HTTP 429 after {completed_chunks}/{total_chunks} sub-requests; "
"catch QuotaExhausted (or ChunkInterrupted) to access "
".partial_frame or .call.resume() once the rate-limit "
"window has rolled over."
)
[docs]
class ServiceInterrupted(ChunkInterrupted):
"""
A sub-request returned HTTP 5xx — the upstream service failed
transiently. Subclass of :class:`ChunkInterrupted`.
The completed sub-requests are preserved on ``.call``; once the
upstream recovers, ``.call.resume()`` resumes only the
still-pending work.
"""
_MESSAGE_TEMPLATE = (
"Service error after {completed_chunks}/{total_chunks} "
"sub-requests; catch ServiceInterrupted (or ChunkInterrupted) "
"and call .call.resume() once the upstream service recovers."
)