Source code for pycap.solutions

import sys
import warnings
import numpy as np
import pandas as pd
import scipy.integrate as integrate
import scipy.special as sps
from scipy.special import gammaln

from pycap.pycap_exceptions import PycapException


def _time_dist_error(funcname):
    """Function for trying to call both time and distance
    as arrays in a function
    """
    raise PycapException(
        "cannot have both time and distance as arrays\n"
        + f"in the {funcname} method.  Need to externally loop\n"
        + "over one of the arrays and pass the other"
    )


def _make_arrays(a):
    """private function to force values to
    arrays from lists or scalars
    """
    if isinstance(a, np.ndarray):
        return a.astype(float)
    else:
        return np.atleast_1d(a).astype(float)


# suppress divide by zero errors
np.seterr(divide="ignore", invalid="ignore")

""" File of drawdown and stream depletion analytical solutions
    as part of the pycap suite.

"""


def _check_nones(all_vars, var_dict):
    """Function to check if any of the required parameters are
    set to None (default) value. Call to this function
    is added to all solutions that require parameters in
    addition to T, S, time, dist and Q.

    Parameters
    ----------
    all_vars: dictionary
        dictionary of variable values passed from calling
        routine, can be generated using locals()
    var_dict: dictionary
        key is the function name and value is a list of
        required parameter names.
    """
    fxn_name = list(var_dict.keys())[0]

    nonevars = {
        k: v
        for k, v in all_vars.items()
        if (k in var_dict[fxn_name]) & (v is None)
    }
    if len(nonevars) > 0:
        raise PycapException(
            f"The function: {fxn_name} requires the following\n"
            + "additional arguments which were missing\n"
            + "in the function call:\n"
            + ", ".join(nonevars.keys())
        )


# define drawdown methods here
[docs]def theis_drawdown(T, S, time, dist, Q, **kwargs): """Function to calculate Theis drawdown. Units are not specified, but should be consistent length and time. Calculates the Theis drawdown solution at specified times or distances from a pumping well. https://pubs.usgs.gov/publication/70198446 Parameters ---------- T: float transmissivity [L**2/T] S: float storage [dimensionless] time: float, optionally np.array or list time at which to calculate results [T] dist: float, optionally np.array or list distance at which to calculate results in [L] Q: float pumping rate (+ is extraction) [L**3/T] **kwargs: included to all drawdown methods for extra values required in some calls Returns ------- drawdown: float or array of floats drawdown values at input parameter times/distances [L] """ time = _make_arrays(time) dist = _make_arrays(dist) if len(dist) > 1 and len(time) > 1: _time_dist_error("theis_drawdown") # construct the well function argument # is dist is zero, then function does not exist # trap for dist==0 and set to small value # sort out geometry of solution if len(dist) >= len(time): if time[0] == 0: return 0 else: ddn = np.zeros_like(dist) u = dist**2.0 * S / (4.0 * T * time) ddn[dist != 0] = (Q / (4.0 * np.pi * T)) * sps.exp1(u[dist != 0]) return ddn elif len(time) > len(dist): ddn = np.zeros_like(time) u = np.zeros_like(time) u[time != 0] = dist**2.0 * S / (4.0 * T * time[time != 0]) ddn[time != 0] = (Q / (4.0 * np.pi * T)) * sps.exp1(u[time != 0]) return ddn
[docs]def hunt_99_drawdown( T, S, time, dist, Q, streambed_conductance=None, x=None, y=None, **kwargs ): """Function to calculate drawdown in an aquifer with a partially penetrating stream including streambed resistance (Hunt, 1999). Units are not specified, but should be consistent length and time. The solution becomes the Theis solution if streambed conductance is zero, and approaches an image-well solution from Theis or Glover and Balmer (1954) as streambed conductance gets very large. Note that the well is located at the location x,y = (l, 0) and the stream is aligned with y-axis at x=0. x and y locations provided are the points at which drawdown is calculated and reported. It is possible to provide x and y ndarrays generated with `numpy.meshgrid`. Hunt, B., 1999, Unsteady streamflow depletion from ground water pumping: Groundwater, v. 37, no. 1, pgs. 98-102, https://doi.org/10.1111/j.1745-6584.1999.tb00962.x Parameters ---------- T: float Transmissivity of aquifer [L**2/T] S: float Storativity of aquifer [dimensionless] time: float, optionally np.array or list time at which to calculate results [T] dist: float distance between well and stream in [L] Q : float pumping rate (+ is extraction) [L**3/T] streambed_conductance: float streambed conductance [ft/d] (lambda in the paper) x: float, optionally ndarray x locations at which to report calculated drawdown. y: float, optionally ndarray y locations at which to report calculated drawdown. **kwargs: included to all drawdown methods for extra values required in some calls Returns ------- drawdown: float single value, meshgrid of drawdowns, or np.array with shape (ntimes, meshgridxx, meshgridyy) depending on input form of x, y, and ntimes [L] """ _check_nones( locals(), {"hunt_99_drawdown": ["streambed_conductance", "x", "y"]} ) # turn lists into np.array so they get handled correctly, # check if time or space is an array time = _make_arrays(time) dist = _make_arrays(dist) if len(dist) > 1: PycapException( "hunt_99_drawdown can only accept a single distance argument" ) spacescalar = True if isinstance(x, np.ndarray): spacescalar = False # compute a single x, y point at a given time if len(time) == 1 and spacescalar: # handle zero time if time[0] == 0: return 0 else: warnings.filterwarnings( "ignore", category=integrate.IntegrationWarning ) [strmintegral, err] = integrate.quad( _ddwn2, 0.0, np.inf, args=(dist[0], x, y, T, streambed_conductance, time[0], S), ) return (Q / (4.0 * np.pi * T)) * ( _ddwn1(dist[0], x, y, T, streambed_conductance, time[0], S) - strmintegral ) # compute a vector of times for a given point if len(time) > 1 and spacescalar: drawdowns = np.zeros_like(time) for i, tm in enumerate(time): # special case for zero time if tm != 0: warnings.filterwarnings( "ignore", category=integrate.IntegrationWarning ) [strmintegral, err] = integrate.quad( _ddwn2, 0.0, np.inf, args=(dist[0], x, y, T, streambed_conductance, tm, S), ) drawdowns[i] = (Q / (4.0 * np.pi * T)) * ( _ddwn1(dist[0], x, y, T, streambed_conductance, tm, S) - strmintegral ) return drawdowns # if meshgrid is passed, return an np.array with dimensions # ntimes, num_x, num_y if not spacescalar: numrow = np.shape(x)[0] numcol = np.shape(x)[1] drawdowns = np.zeros(shape=(len(time), numrow, numcol)) for time_idx in range(0, len(time)): for i in range(0, numrow): for j in range(0, numcol): # special case for zero time if time[time_idx] == 0: drawdowns[time_idx, i, j] = 0 else: warnings.filterwarnings( "ignore", category=integrate.IntegrationWarning ) [strmintegral, err] = integrate.quad( _ddwn2, 0.0, np.inf, args=( dist[0], x[i, j], y[i, j], T, streambed_conductance, time[time_idx], S, ), ) drawdowns[time_idx, i, j] = (Q / (4.0 * np.pi * T)) * ( _ddwn1( dist[0], x[i, j], y[i, j], T, streambed_conductance, time[time_idx], S, ) - strmintegral ) return drawdowns
def _ddwn1(dist, x, y, T, streambed, time, S): """Internal method to calculate Theis drawdown function for a point (x,y) Used in computing Hunt, 1999 estimate of drawdown. Equation 30 from the paper. Variables described in hunt_99_drawdown function. """ # construct the well function argument # if (l-x) is zero, then function does not exist # trap for (l-x)==0 and set to small value dist = dist - x if dist == 0.0: dist = 0.001 u1 = ((dist) ** 2 + y**2) / (4.0 * T * time / S) return sps.exp1(u1) def _ddwn2(theta, dist, x, y, T, streambed, time, S): """Internal method to calculate function that gets integrated in the Hunt (1999) solution Equations 29 and 30 in the paper, theta is the constant of integration and the rest of the variables described in the hunt_99_drawdown function. """ if streambed == 0.0: return 0.0 u2 = ((dist + np.abs(x) + 2 * T * theta / streambed) ** 2 + y**2) / ( 4.0 * T * time / S ) return np.exp(-theta) * sps.exp1(u2)
[docs]def ward_lough_drawdown( T1, S1, t, dist, Q, T2=None, S2=None, width=None, streambed_thick=None, streambed_K=None, aquitard_thick=None, aquitard_K=None, x=None, y=None, NSteh1=2, NSteh2=2, **kwargs, ): """Compute drawdown using Ward and Lough (2011) solution Ward and Lough (2011) presented a solution for streamflow depletion by a pumping well in a layered aquifer system. The stream is in the upper aquifer, and the pumping well is in a lower aquifer that is separated from the upper aquifer by a semi-confining aquitard layer. Ward, N.D.,and Lough, H., 2011, Stream depletion from pumping a semiconfined aquifer in a two-layer leaky aquifer system (technical note): Journal of Hydrologic Engineering ASCE, v. 16, no. 11, pgs. 955-959, https://doi.org/10.1061/(ASCE)HE.1943-5584.0000382. Parameters ---------- T: float Transmissivity in the upper aquifer [L**2/T] (K*D or T1 in the original paper) S: float Specific yield for upper aquifer [unitless] (S1 in the original paper) t: float, optionally np.array or list time at which to calculate results [T] dist: Distance between pumping well and stream [L] (L in the original paper) Q: float pumping rate (+ is extraction) [L**3/T] **kwargs: included to all drawdown methods for extra values required in some calls Returns ------- ddwn float, 2-column ndarray drawdown at specified location [L] in the shallow aquifer (column 0) and the deeper aquifer (column 1) Other Parameters ---------------- T2: float Transmissivity of deeper system S2: float Storativity of streambed_thick: float thickness of streambed streambed_K: float hydraulic conductivity of streambed, [L/T] aquitard_thick: float thickness of intervening leaky aquitard, [L] aquitard_K: float hydraulic conductivity of intervening leaky aquifer, [L/T] x: float x-coordinate of drawdown location (with origin being x=0 at stream location) [L] y: float y-coordinate of drawdown location (with origin being y=0 at pumping well location) [L] NSteh1: int Number of Stehfest series levels - algorithmic tuning parameter. Defaults to 2. NStehl2: int Number of Stehfest series levels - algorithmic tuning parameter. Defaults to 2. width: float stream width (b in paper) [L] """ t = _make_arrays(t) dist = _make_arrays(dist) if len(dist) > 1: PycapException( "ward_lough_drawdown can only accept a single distance argument" ) _check_nones( locals(), { "ward_lough_drawdown": [ "T2", "S2", "width", "streambed_thick", "streambed_K", "aquitard_thick", "aquitard_K", "x", "y", ] }, ) # first nondimensionalize all the parameters x, y, t, T1, S1, K, lambd = _WardLoughNonDimensionalize( T1, T2, S1, S2, width, Q, dist[0], streambed_thick, streambed_K, aquitard_thick, aquitard_K, t, x, y, ) # Initialize output arrays s1 = np.zeros_like(t) s2 = np.zeros_like(t) # Inverse Fourier transform for ii in range(len(t)): # special case for zero time if t[ii] == 0: s1[ii] = 0 s2[ii] = 0 else: try: s1[ii] = _StehfestCoeff(1, NSteh1) * _if1( T1, S1, K, lambd, x, y, np.log(2) / t[ii] ) for jj in range(2, NSteh1 + 1): s1[ii] += _StehfestCoeff(jj, NSteh1) * _if1( T1, S1, K, lambd, x, y, jj * np.log(2) / t[ii] ) s1[ii] *= np.log(2) / t[ii] except OverflowError as e: print(f"Overflow error in s1 calculation at index {ii}: {e}") s1[ii] = np.nan # Assign NaN if there's an overflow try: s2[ii] = _StehfestCoeff(1, NSteh2) * _if2( T1, S1, K, lambd, x, y, np.log(2) / t[ii] ) for jj in range(2, NSteh2 + 1): s2[ii] += _StehfestCoeff(jj, NSteh2) * _if2( T1, S1, K, lambd, x, y, jj * np.log(2) / t[ii] ) s2[ii] *= np.log(2) / t[ii] except OverflowError as e: print(f"Overflow error in s2 calculation at index {ii}: {e}") s2[ii] = np.nan # Assign NaN if there's an overflow return np.array(list(zip(s1 * Q / T2, s2 * Q / T2))) # re-dimensionalize
# define stream depletion methods here
[docs]def glover_depletion(T, S, time, dist, Q, **kwargs): """ Calculate Glover and Balmer (1954) solution for stream depletion Depletion solution for a well near a river where the river fully penetrates the aquifer and there is no streambed resistance. Glover, R.E. and Balmer, G.G., 1954, River depletion from pumping a well near a river, Eos Transactions of the American Geophysical Union, v. 35, no. 3, pg. 468-470, https://doi.org/10.1029/TR035i003p00468. Parameters ---------- T: float transmissivity [L**2/T] S: float storage [unitless] time: float, optionally np.array or list time at which to calculate results [T] dist: float, optionally np.array or list distance at which to calculate results in [ft] Q: float pumping rate (+ is extraction) [L**3/T] **kwargs: included to all depletion methods for extra values required in some calls Returns ------- drawdown: float depletion values at at input parameter times/distances """ # turn lists into np.array so they get handled correctly time = _make_arrays(time) dist = _make_arrays(dist) if len(dist) > 1 and len(time) > 1: _time_dist_error("glover_depletion") if len(time) == 1 and len(dist) == 1: if time == 0: return 0 else: return Q * sps.erfc(dist[0] / np.sqrt(4 * (T / S) * time[0])) elif len(time) == 1 and len(dist) > 1: # handle zero time condition for list-like times if time[0] == 0: return np.zeros_like(dist) else: z = dist / np.sqrt(4 * (T / S) * time) return Q * sps.erfc(z) elif len(time) > 1 and len(dist) == 1: # handle zero time condition for list-like times z = np.zeros_like(time) z[time != 0] = dist / np.sqrt(4 * (T / S) * time[time != 0]) depl = np.zeros_like(time) depl[time != 0] = Q * sps.erfc(z[time != 0]) return depl
[docs]def sdf(T, S, dist, **kwargs): """ internal function for Stream Depletion Factor Stream Depletion Factor was defined by Jenkins (1968) and described in Jenkins as the time when the volume of stream depletion is 28 percent of the net volume pumped from the well. SDF = dist**2 * S/T. Jenkins, C.T., Computation of rate and volume of stream depletion by wells: U.S. Geological Survey Techniques of Water-Resources Investigations, Chapter D1, Book 4, https://pubs.usgs.gov/twri/twri4d1/. Parameters ---------- T: float transmissivity [L**2/T] S: float storage [unitless] dist: float, optionally np.array or list distance at which to calculate results in [L] **kwargs: included to all depletion methods for extra values required in some calls Returns ------- SDF: float Stream depletion factor [T] """ if isinstance(dist, list): dist = np.array(dist) return dist**2 * S / T
[docs]def walton_depletion(T, S, time, dist, Q, **kwargs): """ Calculate depletion using Walton (1987) PT-8 BASIC program logic Provides the Glover and Balmer (Jenkins) solution. Walton, W.C., Groundwater Pumping Tests: Lewis Publishers, Chelsea, Michigan, 201 p. Note that unlike the other depletion functions, this Walton function is unit-specific, using feet and days as dimensions. Parameters ---------- T: float transmissivity [gal per d per ft] S: float storage [unitless] time: float, optionally np.array or list time at which to calculate results [d] dist: float, optionally np.array or list distance at which to calculate results in [ft] Q: float pumping rate (+ is extraction) [ft**3/d] **kwargs: included to all depletion methods for extra values required in some calls Returns ------- drawdown: float depletion values at at input parameter times/distances """ # turn lists into np.array so they get handled correctly time = _make_arrays(time) dist = _make_arrays(dist) if len(dist) > 1 and len(time) > 1: _time_dist_error("walton_depletion") if len(time) == 1: if time[0] == 0: return 0 else: # avoid divide by zero for time==0 # time = time.values G = dist / np.sqrt((0.535 * time * T / S)) elif len(dist) == 1: G = np.zeros_like(time).astype(float) G[time != 0] = dist / np.sqrt((0.535 * time[time != 0] * T / S)) I = ( 1 + 0.0705230784 * G + 0.0422820123 * (G**2) + 9.2705272e-03 * (G**3) ) J = ( I + 1.52014e-04 * (G**4) + 2.76567e-04 * (G**5) + 4.30638e-05 * (G**6) ) ** 16 depl = Q * (1 / J) # handle zero time condition depl[time == 0] = 0.0 if len(depl) == 1: return depl[0] else: return depl
[docs]def hunt_99_depletion( T, S, time, dist, Q, streambed_conductance=None, **kwargs ): """Function for Hunt (1999) solution for streamflow depletion by a pumping well. Computes streamflow depletion by a pumping well for a partially penetrating stream with streambed resistance. Hunt, B., 1999, Unsteady streamflow depletion from ground water pumping: Groundwater, v. 37, no. 1, pgs. 98-102, https://doi.org/10.1111/j.1745-6584.1999.tb00962.x Parameters ---------- T: float transmissivity [L**2/T] S: float storage [unitless] time: float, optionally np.array or list time at which to calculate results [T] dist: float, optionally np.array or list distance at which to calculate results in [L] Q: float pumping rate (+ is extraction) [L**3/T] **kwargs: included to all depletion methods for extra values required in some calls Returns ------- Qs: float streamflow depletion rate, optionally np.array or list depending on input of time and dist [L**3/T] Other Parameters ---------------- streambed_conductance: float streambed_conductance conductance [L/T] (lambda in the paper) """ _check_nones(locals(), {"hunt_99_depletion": ["streambed_conductance"]}) # turn lists into np.array so they get handled correctly time = _make_arrays(time) dist = _make_arrays(dist) if len(dist) > 1 and len(time) > 1: _time_dist_error("hunt_99_depletion") elif len(time) == 1: if time[0] == 0: return 0 else: a = np.sqrt(S * dist**2 / (4.0 * T * time)) b = (streambed_conductance**2 * time) / (4 * S * T) c = (streambed_conductance * dist) / (2.0 * T) y = np.sqrt(b) + a t1 = sps.erfcx(y) t2 = np.exp(b + c - y**2) depl = sps.erfc(a) - (t1 * t2) if len(dist) > 1: return depl * Q else: return depl[0] * Q elif len(dist) == 1: a = np.zeros_like(time) a[time != 0] = np.sqrt(S * dist**2 / (4.0 * T * time[time != 0])) b = (streambed_conductance**2 * time) / (4 * S * T) c = (streambed_conductance * dist) / (2.0 * T) y = np.sqrt(b) + a t1 = sps.erfcx(y) t2 = np.exp(b + c - y**2) depl = np.zeros_like(a) depl[time != 0] = sps.erfc(a[time != 0]) - ( t1[time != 0] * t2[time != 0] ) return depl * Q
[docs]def hunt_03_depletion( T, S, time, dist, Q, Bprime=None, Bdouble=None, aquitard_K=None, sigma=None, width=None, streambed_conductance=None, **kwargs, ): """Function for Hunt (2003) solution for streamflow depletion by a pumping well. Computes streamflow depletion by a pumping well in a semiconfined aquifer for a partially penetrating stream. The stream is in an upper semi-confining aquifer and pumping is in a lower aquifer. Hunt, B., 2003, Unsteady streamflow depletion when pumping from semiconfined aquifer: Journal of Hydrologic Engineering, v.8, no. 1, pgs 12-19. https://doi.org/10.1061/(ASCE)1084-0699(2003)8:1(12) Parameters ---------- T: float transmissivity [L**2/T] S: float storage [unitless] time: float, optionally np.array or list time at which to calculate results [T] dist: float, distance at which to calculate results in [L] Note, because of computation demand, only a single value for distance can be computed in a call Q: float pumping rate (+ is extraction) [L**3/T] **kwargs: included to all depletion methods for extra values required in some calls Returns ------- Qs: float streamflow depletion rate, optionally np.array or list depending on input of time and dist [L**3/T] Other Parameters ---------------- Bprime: float saturated thickness of semiconfining layer containing stream, [L] Bdouble: float distance from bottom of stream to bottom of semiconfining layer, [L] (aquitard thickness beneath the stream) aquitard_K: float hydraulic conductivity of semiconfining layer [L/T] sigma: float porosity of semiconfining layer width: float stream width (b in paper) [T] streambed_conductance: float streambed conductance [L/T] (lambda in the paper), only used if K is less than 1e-10 """ _check_nones( locals(), { "hunt_03_depletion": [ "Bprime", "Bdouble", "aquitard_K", "sigma", "width", "streambed_conductance", ] }, ) # turn lists into np.array so they get handled correctly time = _make_arrays(time) dist = _make_arrays(dist) if len(dist) > 1 and len(time) > 1: _time_dist_error("hunt_03_depletion") if len(dist) > 1: raise PycapException( "cannot have distance as an array\n" + f"in the hunt_03_depletion method. Need to externally loop\n" + "over distance" ) dist = dist[0] # make dimensionless group used in equations dtime = (T * time) / (S * np.power(dist, 2)) # if K is really small, set streambed conductance to a value # so solution collapses to Hunt 1999 (confined aquifer solution) if aquitard_K < 1.0e-10: lam = streambed_conductance else: lam = aquitard_K * width / Bdouble dlam = lam * dist / T epsilon = S / sigma dK = (aquitard_K / Bprime) * np.power(dist, 2) / T # numerical integration of F() and G() functions to # get correction to Hunt(1999) estimate of streamflow depletion # because of storage in the semiconfining aquifer correction = [] for dt in dtime: # correcting for zero time if dt == 0: correction.append(0) else: warnings.filterwarnings( "ignore", category=integrate.IntegrationWarning ) # note that err from fixed_quad gets returned as None y, err = integrate.fixed_quad( _integrand, 0.0, 1.0, args=(dlam, dt, epsilon, dK), n=100 ) correction.append(dlam * y) # terms for depletion, similar to Hunt (1999) but repeated # here so it matches the 2003 paper. # note correcting for zero time a = np.zeros_like(dtime) a[dtime != 0] = 1.0 / (2.0 * np.sqrt(dtime[dtime != 0])) b = dlam / 2.0 + (dtime * np.power(dlam, 2) / 4.0) c = a + (dlam * np.sqrt(dtime) / 2.0) # use erfxc() function from scipy (see hunt_99_depletion above) # for erf(b)*erfc(c) term t1 = sps.erfcx(c) t2 = np.exp(b - c**2) # note correcting for zero time depl = np.zeros_like(dtime) depl[dtime != 0] = sps.erfc(a[dtime != 0]) - ( t1[dtime != 0] * t2[dtime != 0] ) # corrected depletion for storage of upper semiconfining unit if len(depl) == 1: return Q * (depl[0] - correction[0]) else: return Q * (depl - correction)
def _F(alpha, dlam, dtime): """F function from paper in equation (46) as given by equation (47) in Hunt (2003) Parameters ---------- alpha: float integration variable dlam: float dimensionless streambed/semiconfining unit conductance (width * K/B'') * distance/Transmissivity dt: float dimensionless time (time * transmissivity)/(storativity * distance**2) """ # Hunt uses an expansion if dimensionless time>3 z = alpha * dlam * np.sqrt(dtime) / 2.0 + 1.0 / ( 2.0 * alpha * np.sqrt(dtime) ) F = np.where(np.abs(z)<3.0, _f1(z, dlam, dtime, alpha), _f2(z, dlam, dtime, alpha)) return F def _f1(z, dlam, dtime, alpha): """function for splitting up _F above""" a = dlam / 2.0 + (dtime * np.power(alpha, 2) * np.power(dlam, 2) / 4.0) t1 = sps.erfcx(z) t2 = np.exp(a - z**2) b = -1.0 / (4 * dtime * alpha**2) # equation 47 in paper F = np.exp(b) * np.sqrt(dtime / np.pi) - ( alpha * dtime * dlam ) / 2.0 * (t1 * t2) return F def _f2(z, dlam, dtime, alpha): """function for splitting up _F above""" t1 = np.exp(-(1.0 / (4.0 * dtime * alpha**2))) / ( 2.0 * alpha * z * np.sqrt(np.pi) ) t2 = 2.0 / (dlam * (1.0 + (1.0 / (dlam * dtime * alpha**2)) ** 2)) sumterm = ( 1 - (3.0 / (2 * z**2)) + (15.0 / (4.0 * z**4)) - (105.0 / (8 * z**6)) ) F = t1 * (1.0 + t2 * sumterm) # equation 53 in paper return F def _fgt(n, ab, abterm): """function for splitting up _G below""" n2 = n*2 return np.exp( np.log(sps.binom(n2, n)) + np.log(sps.gammainc(n2 + 1, ab)) + (n2) * np.log(abterm) ) def _flt(n, ab, abterm): """function for splitting up _G below""" n2 = n*2 return ( sps.binom(n2, n) * sps.gammainc(n2 + 1, ab) * abterm ** (n2) ) def _G(alpha, epsilon, dK, dtime): """G function from paper in equation (46) in Hunt (2003) This function is in equation (46) and expanded in equation (53). Function uses scipy special for incomplete Gamma Function (P(a,b)), binomial coefficient, and modified Bessel function of zero order (I0). Parameters ---------- alpha: float integration variable epsilon: float dimensionless storage storativity/porosity of semiconfining bed dK: float dimensionless conductivity of semiconfining unit (K * Bprime) * dist**2/Transmissivity """ # if dimensionless K is zero (check really small), return 0 # this avoids divide by zero error in terms that have divide by (a+b) if dK < 1.0e-10: return 0.0 alpha2 = alpha**2 dKdtime = dK * dtime a = epsilon * dKdtime * (1.0 - alpha2) b = dKdtime * alpha2 ab = a + b atb = a * b sqrt_atb = np.sqrt(atb) term1 = np.where(ab<80, np.exp(-ab) * sps.i0(2.0 * sqrt_atb), 0.0) abterm = sqrt_atb / ab n = np.arange(60, dtype=np.float64) sum1 = 0. for n in np.arange(60, dtype=np.float64): sum1 = sum1 + np.where(n <= 8, _flt(n, ab, abterm), _fgt(n, ab, abterm)) eqn52 = 0.5 * (1.0 - term1 + ((b - a) / ab) * sum1) eqn52 = np.where(eqn52< 0., 0., eqn52) eqn52 = np.where(eqn52> 1., 1., eqn52) return eqn52 def _integrand(alpha, dlam, dtime, epsilon, dK): """internal function returning product of F() and G() terms for numerical integration""" return _F(alpha, dlam, dtime) * _G(alpha, epsilon, dK, dtime) def _calc_deltaQ(Q): """internal function to parse the Q time series to find changes and their associated times Parameters ---------- Q: pandas Series time series of pumping Returns ------- deltaQ: pandas Series) times and changes in Q over time """ # find the differences in pumping dq = Q.copy() dq.iloc[1:] = np.diff(Q) # get the locations of changes deltaQ = dq.loc[dq != 0] # special case for starting with 0 pumping if Q.index[0] not in deltaQ.index: deltaQ.loc[Q.index[0]] = Q.iloc[0] deltaQ.sort_index(inplace=True) return deltaQ def _WardLoughNonDimensionalize( T1, T2, S1, S2, width, Q, dist, streambed_thick, streambed_K, aquitard_thick, aquitard_K, t, x=0, y=0, ): """Internal function to make non-dimensional groups for Ward and Lough solution""" t = np.array(t) # make sure not passing a list if x is not None: x /= dist if y is not None: y /= dist t = t * T2 / (S2 * (dist**2)) T1 /= T2 S1 /= S2 K = ((aquitard_K / aquitard_thick) * (dist**2)) / T2 lambd = ((streambed_K * width) / streambed_thick) * dist / T2 return x, y, t, T1, S1, K, lambd
[docs]def ward_lough_depletion( T1, S1, t, dist, Q, T2=None, S2=None, width=None, streambed_thick=None, streambed_K=None, aquitard_thick=None, aquitard_K=None, NSteh1=2, **kwargs, ): """ Compute streamflow depletion using Ward and Lough (2011) solution Ward and Lough (2011) presented a solution for streamflow depletion by a pumping well in a layered aquifer system. The stream is in the upper aquifer, and the pumping well is in a lower aquifer that is separated from the upper aquifer by a semi-confining aquitard layer. Ward, N.D.,and Lough, H., 2011, Stream depletion from pumping a semiconfined aquifer in a two-layer leaky aquifer system (techical note): Journal of Hydrologic Engineering ASCE, v. 16, no. 11, pgs. 955-959, https://doi.org/10.1061/(ASCE)HE.1943-5584.0000382. Parameters ---------- T: float transmissivity [L**2/T] storage [unitless] specific yield in the upper q aquifer time: float, optionally np.array or list time at which to calculate results [T] dist: float, optionally np.array or list distance at which to calculate results in [L] Q: float pumping rate (+ is extraction) [L**3/T] **kwargs: included to all depletion methods for extra values required in some calls Returns ------- Qs: float streamflow depletion rate, optionally np.array or list depending on input of time and dist [L**3/T] Other Parameters ---------------- T2: float Transmissivity of deeper system S2: float Storativity of streambed_thick: float thickness of streambed streambed_K: float hydraulic conductivity of streambed, [L/T] aquitard_thick: float thickness of intervening leaky aquitard, [L] aquitard_K: float hydraulic conductivity of intervening leaky aquifer, [L/T] NSteh1: int Number of Stehfest series levels - algorithmic tuning parameter. Defaults to 2. width: float stream width (b in paper) [L] """ _check_nones( locals(), { "ward_lough_depletion": [ "T2", "S2", "width", "streambed_thick", "streambed_K", "aquitard_thick", "aquitard_K", ] }, ) # first nondimensionalize all the parameters x, y, t, T1, S1, K, lambd = _WardLoughNonDimensionalize( T1, T2, S1, S2, width, Q, dist, streambed_thick, streambed_K, aquitard_thick, aquitard_K, t, 0, 0, ) # Inverse Fourier transform if isinstance(t, int) or isinstance(t, float): if t == 0: return 0 else: if isinstance(t, list): t = np.array(t) DeltaQ = np.zeros_like(t) DeltaQ[t != 0] = _StehfestCoeff(1, NSteh1) * _if1_dQ( T1, S1, K, lambd, np.log(2) / t[t != 0] ) for jj in range(2, NSteh1 + 1): DeltaQ[t != 0] += _StehfestCoeff(jj, NSteh1) * _if1_dQ( T1, S1, K, lambd, jj * np.log(2) / t[t != 0] ) DeltaQ[t != 0] = ( 2 * np.pi * lambd * DeltaQ[t != 0] * np.log(2) / t[t != 0] ) return DeltaQ * Q # redimentionalize
def _if1_dQ(T1, S1, K, lambda_, p): """Internal function for Ward and Lough (2011) solution""" return _kernel1(T1, S1, K, lambda_, 0, 0, p) + _kernel2( T1, S1, K, lambda_, 0, 0, p ) def _if1(T1, S1, K, lambd, x, y, p): """Internal function for Ward and Lough (2011) solution""" G = ( lambda phi: 2 * ( _kernel1(T1, S1, K, lambd, x, np.tan(phi), p) + _kernel2(T1, S1, K, lambd, x, np.tan(phi), p) ) * np.cos(np.tan(phi) * y) / np.cos(phi) ** 2 ) warnings.filterwarnings("ignore", category=integrate.IntegrationWarning) s1InvFour, _ = integrate.quad( G, 0, np.pi / 2, epsrel=1e-1, epsabs=1e-1, limit=10000 ) return s1InvFour def _if2(T1, S1, K, lambd, x, y, p): """Internal function for Ward and Lough (2011) solution""" H = ( lambda phi: 2 * ( _coeff_s1_1(T1, S1, K, lambd, np.tan(phi), p) * _kernel1(T1, S1, K, lambd, x, np.tan(phi), p) + _coeff_s1_2(T1, S1, K, lambd, np.tan(phi), p) * _kernel2(T1, S1, K, lambd, x, np.tan(phi), p) ) * np.cos(np.tan(phi) * y) / np.cos(phi) ** 2 ) warnings.filterwarnings("ignore", category=integrate.IntegrationWarning) s2InvFour, errbnd = integrate.quad( H, 0, np.pi / 2, epsrel=1e-1, epsabs=1e-1, limit=10000 ) return s2InvFour def _coeff_s1_1(T1, S1, K, lambd, theta, p): """Internal function for Ward and Lough (2011) solution""" b11, b12, b22, mu1, mu2, l1, l2, beta1, beta2, A1, A2 = _coeffs( T1, S1, K, lambd, theta, p ) B1 = (mu1 * T1 - b11) / b12 return B1 def _coeff_s1_2(T1, S1, K, lambd, theta, p): """Internal function for Ward and Lough (2011) solution""" b11, b12, b22, mu1, mu2, l1, l2, beta1, beta2, A1, A2 = _coeffs( T1, S1, K, lambd, theta, p ) B2 = (mu2 * T1 - b11) / b12 return B2 def _kernel1(T1, S1, K, lambd, x, theta_or_y, p): """Internal function for Ward and Lough (2011) solution""" b11, b12, b22, mu1, mu2, l1, l2, beta1, beta2, A1, A2 = _coeffs( T1, S1, K, lambd, theta_or_y, p ) if x < 0: F1 = A1 * np.exp(x * np.sqrt(mu1)) elif 0 <= x <= 1: F1 = A1 * np.exp(-x * np.sqrt(mu1)) + beta1 / ( 2 * np.sqrt(mu1) * l1 ) * (np.exp((x - 1) * np.sqrt(mu1)) - np.exp(-(x + 1) * np.sqrt(mu1))) else: F1 = A1 * np.exp(-x * np.sqrt(mu1)) + beta1 / ( 2 * np.sqrt(mu1) * l1 ) * (np.exp((1 - x) * np.sqrt(mu1)) - np.exp(-(x + 1) * np.sqrt(mu1))) return F1 def _kernel2(T1, S1, K, lambd, x, theta_or_y, p): """Internal function for Ward and Lough (2011) solution""" b11, b12, b22, mu1, mu2, l1, l2, beta1, beta2, A1, A2 = _coeffs( T1, S1, K, lambd, theta_or_y, p ) if x < 0: F2 = A2 * np.exp(x * np.sqrt(mu2)) elif 0 <= x <= 1: F2 = A2 * np.exp(-x * np.sqrt(mu2)) + beta2 / ( 2 * np.sqrt(mu2) * l2 ) * (np.exp((x - 1) * np.sqrt(mu2)) - np.exp(-(x + 1) * np.sqrt(mu2))) else: F2 = A2 * np.exp(-x * np.sqrt(mu2)) + beta2 / ( 2 * np.sqrt(mu2) * l2 ) * (np.exp((1 - x) * np.sqrt(mu1)) - np.exp(-(x + 1) * np.sqrt(mu1))) return F2 def _coeffs(T1, S1, K, lambd, theta_or_y, p): """Internal function for Ward and Lough (2011) solution""" b11 = T1 * theta_or_y**2 + S1 * p + K b12 = -K b22 = theta_or_y**2 + p + K mu1 = (b11 / T1 + b22) / 2 + np.sqrt( (b11 / T1 + b22) ** 2 / 4 + (b12**2 - b11 * b22) / T1 ) mu2 = (b11 / T1 + b22) / 2 - np.sqrt( (b11 / T1 + b22) ** 2 / 4 + (b12**2 - b11 * b22) / T1 ) l1 = T1 + ((mu1 * T1 - b11) / b12) ** 2 l2 = T1 + ((mu2 * T1 - b11) / b12) ** 2 beta1 = (mu1 * T1 - b11) / (b12 * 2 * np.pi * p) beta2 = (mu2 * T1 - b11) / (b12 * 2 * np.pi * p) Delta = 4 * np.sqrt(mu1 * mu2) + 2 * lambd * ( np.sqrt(mu1) / l2 + np.sqrt(mu2) / l1 ) A1 = ( ( (lambd / l2 + 2 * np.sqrt(mu2)) * beta1 * np.exp(-np.sqrt(mu1)) - lambd * beta2 / l2 * np.exp(-np.sqrt(mu2)) ) / Delta / l1 ) A2 = ( ( -lambd * beta1 / l1 * np.exp(-np.sqrt(mu1)) + (lambd / l1 + 2 * np.sqrt(mu1)) * beta2 * np.exp(-np.sqrt(mu2)) ) / Delta / l2 ) return b11, b12, b22, mu1, mu2, l1, l2, beta1, beta2, A1, A2 def _safe_factorial(n): """Calculate factorial using logarithmic method to avoid overflow.""" if n < 0: return float("inf") elif n < 2: return 1 else: return np.exp(gammaln(n + 1)) def _StehfestCoeff(jj, N): """Internal function for Ward and Lough (2011) solution""" LowerLimit = (jj + 1) // 2 UpperLimit = min(jj, N // 2) V = 0 for kk in range(LowerLimit, UpperLimit + 1): denominator = ( _safe_factorial(N // 2 - kk) * _safe_factorial(kk) * _safe_factorial(kk - 1) * _safe_factorial(jj - kk) * _safe_factorial(2 * kk - jj) ) if denominator != 0: # Prevent division by zero V += kk ** (N // 2) * _safe_factorial(2 * kk) / denominator V *= (-1) ** (N // 2 + jj) return V # List drawdown and depletion methods so they can be called # programatically ALL_DD_METHODS = { "theis_drawdown": theis_drawdown, "hunt_99_drawdown": hunt_99_drawdown, "ward_lough_drawdown": ward_lough_drawdown, } ALL_DEPL_METHODS = { "glover_depletion": glover_depletion, "walton_depletion": walton_depletion, "hunt_99_depletion": hunt_99_depletion, "hunt_03_depletion": hunt_03_depletion, "ward_lough_depletion": ward_lough_depletion, } GPM2CFD = 60 * 24 / 7.48 # factor to convert from GPM to CFD CFD2GPM = 1 / GPM2CFD # factor to convert from CFD to GPM SEC2DAY = 60 * 60 * 24 # factor to conver x/sec to x/day