Source code for pycap.solutions

import sys

import numpy as np
import pandas as pd
import scipy.integrate as integrate
import scipy.special as sps
from scipy.special import gammaln

""" File of drawdown and stream depletion analytical solutions
    as part of the pycap suite.

"""


# define drawdown methods here
[docs]def theis(T, S, time, dist, Q, **kwargs): """Function to calculate Theis drawdown. Units are not specified, but should be consistent length and time. Calculates the Theis drawdown solution at specified times or distances from a pumping well. https://pubs.usgs.gov/publication/70198446 Parameters ---------- T: float transmissivity [L**2/T] S: float storage [dimensionless] time: float, optionally np.array or list time at which to calculate results [T] dist: float, optionally np.array or list distance at which to calculate results in [L] Q: float pumping rate (+ is extraction) [L**3/T] **kwargs: included to all drawdown methods for extra values required in some calls Returns ------- drawdown: float or array of floats drawdown values at input parameter times/distances [L] """ isarray = False if isinstance(time, list): time = np.array(time) if isinstance(dist, list): dist = np.array(dist) if isinstance(dist, pd.Series): dist = dist.values if isinstance(dist, np.ndarray): isarray = True # construct the well function argument # is dist is zero, then function does not exist # trap for dist==0 and set to small value if isarray: dist = np.where(dist == 0, 0.001, dist) else: if dist == 0.0: dist = 0.001 # compute u and then the Theis solution u = dist**2.0 * S / (4.0 * T * time) # calculate and return return (Q / (4.0 * np.pi * T)) * sps.exp1(u)
[docs]def hunt99ddwn( T, S, time, dist, Q, streambed_conductance=0, x=0, y=0, **kwargs ): """Function to calculate drawdown in an aquifer with a partially penetrating stream including streambed resistance (Hunt, 1999). Units are not specified, but should be consistent length and time. The solution becomes the Theis solution if streambed conductance is zero, and approaches an image-well solution from Theis or Glover and Balmer (1954) as streambed conductance gets very large. Note that the well is located at the location x,y = (l, 0) and the stream is aligned with y-axis at x=0. x and y locations provided are the points at which drawdown is calculated and reported. It is possible to provide x and y ndarrays generated with `numpy.meshgrid`. Hunt, B., 1999, Unsteady streamflow depletion from ground water pumping: Groundwater, v. 37, no. 1, pgs. 98-102, https://doi.org/10.1111/j.1745-6584.1999.tb00962.x Parameters ---------- T: float Transmissivity of aquifer [L**2/T] S: float Storativity of aquifer [dimensionless] time: float, optionally np.array or list time at which to calculate results [T] dist: float distance between well and stream in [L] Q : float pumping rate (+ is extraction) [L**3/T] streambed_conductance: float streambed conductance [ft/d] (lambda in the paper) x: float, optionally ndarray x locations at which to report calculated drawdown. y: float, optionally ndarray y locations at which to report calculated drawdown. **kwargs: included to all drawdown methods for extra values required in some calls Returns ------- drawdown: float single value, meshgrid of drawdowns, or np.array with shape (ntimes, meshgridxx, meshgridyy) depending on input form of x, y, and ntimes [L] """ # turn lists into np.array so they get handled correctly, # check if time or space is an array timescalar = True spacescalar = True if isinstance(time, list): time = np.array(time) if isinstance(time, np.ndarray): timescalar = False if isinstance(x, np.ndarray): spacescalar = False # compute a single x, y point at a given time if timescalar and spacescalar: [strmintegral, err] = integrate.quad( _ddwn2, 0.0, np.inf, args=(dist, x, y, T, streambed_conductance, time, S), ) return (Q / (4.0 * np.pi * T)) * ( _ddwn1(dist, x, y, T, streambed_conductance, time, S) - strmintegral ) # compute a vector of times for a given point if not timescalar and spacescalar: drawdowns = [] for tm in time: [strmintegral, err] = integrate.quad( _ddwn2, 0.0, np.inf, args=(dist, x, y, T, streambed_conductance, tm, S), ) drawdowns.append( (Q / (4.0 * np.pi * T)) * ( _ddwn1(dist, x, y, T, streambed_conductance, tm, S) - strmintegral ) ) return drawdowns # if meshgrid is passed, return an np.array with dimensions # ntimes, num_x, num_y if not spacescalar: numrow = np.shape(x)[0] numcol = np.shape(x)[1] if timescalar: time = np.array([time]) drawdowns = np.zeros(shape=(len(time), numrow, numcol)) for time_idx in range(0, len(time)): for i in range(0, numrow): for j in range(0, numcol): [strmintegral, err] = integrate.quad( _ddwn2, 0.0, np.inf, args=( l, x[i, j], y[i, j], T, streambed_conductance, time[time_idx], S, ), ) drawdowns[time_idx, i, j] = (Q / (4.0 * np.pi * T)) * ( _ddwn1( l, x[i, j], y[i, j], T, streambed_conductance, time[time_idx], S, ) - strmintegral ) return drawdowns
def _ddwn1(dist, x, y, T, streambed, time, S): """Internal method to calculate Theis drawdown function for a point (x,y) Used in computing Hunt, 1999 estimate of drawdown. Equation 30 from the paper. Variables described in hunt99ddwn function. """ if isinstance(dist, list): dist = np.array(dist) if isinstance(dist, pd.Series): dist = dist.values isarray = False if isinstance(dist, np.ndarray): isarray = True # construct the well function argument # if (l-x) is zero, then function does not exist # trap for (l-x)==0 and set to small value dist = dist - x if isarray: dist = np.where(dist == 0, 0.001, dist) else: if dist == 0.0: dist = 0.001 u1 = ((dist) ** 2 + y**2) / (4.0 * T * time / S) return sps.exp1(u1) def _ddwn2(theta, l, x, y, T, streambed, time, S): """Internal method to calculate function that gets integrated in the Hunt (1999) solution Equations 29 and 30 in the paper, theta is the constant of integration and the rest of the variables described in the hunt99ddwn function. """ if streambed == 0.0: return 0.0 u2 = ((l + np.abs(x) + 2 * T * theta / streambed) ** 2 + y**2) / ( 4.0 * T * time / S ) return np.exp(-theta) * sps.exp1(u2)
[docs]def WardLoughDrawdown( T1, S1, t, dist, Q, T2, S2, width, streambed_thick, streambed_K, aquitard_thick, aquitard_K, x, y, NSteh1=2, NSteh2=2, **kwargs, ): """Compute drawdown using Ward and Lough (2011) solution Ward and Lough (2011) presented a solution for streamflow depletion by a pumping well in a layered aquifer system. The stream is in the upper aquifer, and the pumping well is in a lower aquifer that is separated from the upper aquifer by a semi-confining aquitard layer. Ward, N.D.,and Lough, H., 2011, Stream depletion from pumping a semiconfined aquifer in a two-layer leaky aquifer system (technical note): Journal of Hydrologic Engineering ASCE, v. 16, no. 11, pgs. 955-959, https://doi.org/10.1061/(ASCE)HE.1943-5584.0000382. Parameters ---------- T: float Transmissivity in the upper aquifer [L**2/T] (K*D or T1 in the original paper) S: float Specific yield for upper aquifer [unitless] (S1 in the original paper) t: float, optionally np.array or list time at which to calculate results [T] dist: Distance between pumping well and stream [L] (L in the original paper) Q: float pumping rate (+ is extraction) [L**3/T] **kwargs: included to all drawdown methods for extra values required in some calls Returns ------- ddwn float, 2-column ndarray drawdown at specified location [L] in the shallow aquifer (column 0) and the deeper aquifer (column 1) Other Parameters ---------------- T2: float Transmissivity of deeper system S2: float Storativity of streambed_thick: float thickness of streambed streambed_K: float hydraulic conductivity of streambed, [L/T] aquitard_thick: float thickness of intervening leaky aquitard, [L] aquitard_K: float hydraulic conductivity of intervening leaky aquifer, [L/T] x: float x-coordinate of drawdown location (with origin being x=0 at stream location) [L] y: float y-coordinate of drawdown location (with origin being y=0 at pumping well location) [L] NSteh1: int Number of Stehfest series levels - algorithmic tuning parameter. Defaults to 2. NStehl2: int Number of Stehfest series levels - algorithmic tuning parameter. Defaults to 2. width: float stream width (b in paper) [L] """ # first nondimensionalize all the parameters x, y, t, T1, S1, K, lambd = _WardLoughNonDimensionalize( T1, T2, S1, S2, width, Q, dist, streambed_thick, streambed_K, aquitard_thick, aquitard_K, t, x, y, ) # Initialize output arrays s1 = np.zeros_like(t) s2 = np.zeros_like(t) # Inverse Fourier transform for ii in range(len(t)): try: s1[ii] = _StehfestCoeff(1, NSteh1) * _if1( T1, S1, K, lambd, x, y, np.log(2) / t[ii] ) for jj in range(2, NSteh1 + 1): s1[ii] += _StehfestCoeff(jj, NSteh1) * _if1( T1, S1, K, lambd, x, y, jj * np.log(2) / t[ii] ) s1[ii] *= np.log(2) / t[ii] except OverflowError as e: print(f"Overflow error in s1 calculation at index {ii}: {e}") s1[ii] = np.nan # Assign NaN if there's an overflow try: s2[ii] = _StehfestCoeff(1, NSteh2) * _if2( T1, S1, K, lambd, x, y, np.log(2) / t[ii] ) for jj in range(2, NSteh2 + 1): s2[ii] += _StehfestCoeff(jj, NSteh2) * _if2( T1, S1, K, lambd, x, y, jj * np.log(2) / t[ii] ) s2[ii] *= np.log(2) / t[ii] except OverflowError as e: print(f"Overflow error in s2 calculation at index {ii}: {e}") s2[ii] = np.nan # Assign NaN if there's an overflow return np.array(list(zip(s1 * Q / T2, s2 * Q / T2))) # re-dimensionalize
# define stream depletion methods here
[docs]def glover(T, S, time, dist, Q, **kwargs): """ Calculate Glover and Balmer (1954) solution for stream depletion Depletion solution for a well near a river where the river fully penetrates the aquifer and there is no streambed resistance. Glover, R.E. and Balmer, G.G., 1954, River depletion from pumping a well near a river, Eos Transactions of the American Geophysical Union, v. 35, no. 3, pg. 468-470, https://doi.org/10.1029/TR035i003p00468. Parameters ---------- T: float transmissivity [L**2/T] S: float storage [unitless] time: float, optionally np.array or list time at which to calculate results [T] dist: float, optionally np.array or list distance at which to calculate results in [ft] Q: float pumping rate (+ is extraction) [L**3/T] **kwargs: included to all depletion methods for extra values required in some calls Returns ------- drawdown: float depletion values at at input parameter times/distances """ z = dist / np.sqrt(4 * (T / S) * time) return Q * sps.erfc(z)
[docs]def sdf(T, S, dist, **kwargs): """ internal function for Stream Depletion Factor Stream Depletion Factor was defined by Jenkins (1968) and described in Jenkins as the time when the volume of stream depletion is 28 percent of the net volume pumped from the well. SDF = dist**2 * S/T. Jenkins, C.T., Computation of rate and volume of stream depletion by wells: U.S. Geological Survey Techniques of Water-Resources Investigations, Chapter D1, Book 4, https://pubs.usgs.gov/twri/twri4d1/. Parameters ---------- T: float transmissivity [L**2/T] S: float storage [unitless] dist: float, optionally np.array or list distance at which to calculate results in [L] **kwargs: included to all depletion methods for extra values required in some calls Returns ------- SDF: float Stream depletion factor [T] """ if isinstance(dist, list): dist = np.array(dist) return dist**2 * S / T
[docs]def walton(T, S, time, dist, Q, **kwargs): """ Calculate depletion using Walton (1987) PT-8 BASIC program logic Provides the Glover and Balmer (Jenkins) solution. Walton, W.C., Groundwater Pumping Tests: Lewis Publishers, Chelsea, Michigan, 201 p. Note that unlike the other depletion functions, this Walton function is unit-specific, using feet and days as dimensions. Parameters ---------- T: float transmissivity [gal per d per ft] S: float storage [unitless] time: float, optionally np.array or list time at which to calculate results [d] dist: float, optionally np.array or list distance at which to calculate results in [ft] Q: float pumping rate (+ is extraction) [ft**3/d] **kwargs: included to all depletion methods for extra values required in some calls Returns ------- drawdown: float depletion values at at input parameter times/distances """ if isinstance(time, list): time = np.array(time) if isinstance(time, pd.Series): time = time.values if dist > 0: # avoid divide by zero for time==0 # time = time.values G = np.zeros_like(time).astype(float) G[time != 0] = dist / np.sqrt((0.535 * time[time != 0] * T / S)) else: G = 0 I = 1 + 0.0705230784 * G + 0.0422820123 * (G**2) + 9.2705272e-03 * (G**3) J = ( I + 1.52014e-04 * (G**4) + 2.76567e-04 * (G**5) + 4.30638e-05 * (G**6) ) ** 16 ret_vals = Q * (1 / J) ret_vals[time == 0] = 0.0 return ret_vals
[docs]def hunt99(T, S, time, dist, Q, streambed_conductance, **kwargs): """Function for Hunt (1999) solution for streamflow depletion by a pumping well. Computes streamflow depletion by a pumping well for a partially penetrating stream with streambed resistance. Hunt, B., 1999, Unsteady streamflow depletion from ground water pumping: Groundwater, v. 37, no. 1, pgs. 98-102, https://doi.org/10.1111/j.1745-6584.1999.tb00962.x Parameters ---------- T: float transmissivity [L**2/T] S: float storage [unitless] time: float, optionally np.array or list time at which to calculate results [T] dist: float, optionally np.array or list distance at which to calculate results in [L] Q: float pumping rate (+ is extraction) [L**3/T] **kwargs: included to all depletion methods for extra values required in some calls Returns ------- Qs: float streamflow depletion rate, optionally np.array or list depending on input of time and dist [L**3/T] Other Parameters ---------------- streambed_conductance: float streambed_conductance conductance [L/T] (lambda in the paper) """ # turn lists into np.array so they get handled correctly if isinstance(time, list) and isinstance(dist, list): print("cannot have both time and distance as arrays") print("in the Hunt99 method. Need to externally loop") print("over one of the arrays and pass the other") sys.exit() elif isinstance(time, list): time = np.array(time) elif isinstance(dist, list): dist = np.array(dist) a = np.sqrt(S * dist**2 / (4.0 * T * time)) b = (streambed_conductance**2 * time) / (4 * S * T) c = (streambed_conductance * dist) / (2.0 * T) # Qs/Q = erfc(a) - exp(b+c)*erfc(sqrt(b) + a) # in order to calculate exp(x)erfc(y) # as values get big. Use numpy special erfcx(), # scaled complementary error function, which returns # exp(y**2)erfc(y) # then compute exp(x)/exp(y**2) * erfcx(y) # which may be computed as exp(x - y**2) * erfcx(y) # This approach gives a product that goes to zero # as the exp() term gets big and erfc() goes to zero y = np.sqrt(b) + a t1 = sps.erfcx(y) t2 = np.exp(b + c - y**2) depl = sps.erfc(a) - (t1 * t2) return Q * depl
[docs]def hunt2003( T, S, time, dist, Q, Bprime, Bdouble, aquitard_K, sigma, width, streambed_conductance, **kwargs, ): """Function for Hunt (2003) solution for streamflow depletion by a pumping well. Computes streamflow depletion by a pumping well in a semiconfined aquifer for a partially penetrating stream. The stream is in an upper semi-confining aquifer and pumping is in a lower aquifer. Hunt, B., 2003, Unsteady streamflow depletion when pumping from semiconfined aquifer: Journal of Hydrologic Engineering, v.8, no. 1, pgs 12-19. https://doi.org/10.1061/(ASCE)1084-0699(2003)8:1(12) Parameters ---------- T: float transmissivity [L**2/T] S: float storage [unitless] time: float, optionally np.array or list time at which to calculate results [T] dist: float, optionally np.array or list distance at which to calculate results in [L] Q: float pumping rate (+ is extraction) [L**3/T] **kwargs: included to all depletion methods for extra values required in some calls Returns ------- Qs: float streamflow depletion rate, optionally np.array or list depending on input of time and dist [L**3/T] Other Parameters ---------------- Bprime: float saturated thickness of semiconfining layer containing stream, [L] Bdouble: float distance from bottom of stream to bottom of semiconfining layer, [L] (aquitard thickness beneath the stream) aquitard_K: float hydraulic conductivity of semiconfining layer [L/T] sigma: float porosity of semiconfining layer width: float stream width (b in paper) [T] streambed_conductance: float streambed conductance [L/T] (lambda in the paper), only used if K is less than 1e-10 """ # turn lists into np.array so they get handled correctly if isinstance(time, list) and isinstance(dist, list): print("cannot have both time and distance as arrays") print("in the Hunt2003 method. Need to externally loop") print("over one of the arrays and pass the other") sys.exit() elif isinstance(time, list): time = np.array(time) elif isinstance(dist, list): dist = np.array(dist) # make dimensionless group used in equations dtime = (T * time) / (S * np.power(dist, 2)) # if K is really small, set streambed conductance to a value # so solution collapses to Hunt 1999 (confined aquifer solution) if aquitard_K < 1.0e-10: lam = streambed_conductance else: lam = aquitard_K * width / Bdouble dlam = lam * dist / T epsilon = S / sigma dK = (aquitard_K / Bprime) * np.power(dist, 2) / T # numerical integration of F() and G() functions to # get correction to Hunt(1999) estimate of streamflow depletion # because of storage in the semiconfining aquifer correction = [] for dt in dtime: [y, err] = integrate.quad( _integrand, 0.0, 1.0, args=(dlam, dt, epsilon, dK), limit=500 ) correction.append(dlam * y) # terms for depletion, similar to Hunt (1999) but repeated # here so it matches the 2003 paper. a = 1.0 / (2.0 * np.sqrt(dtime)) b = dlam / 2.0 + (dtime * np.power(dlam, 2) / 4.0) c = a + (dlam * np.sqrt(dtime) / 2.0) # use erfxc() function from scipy (see _hunt99 above) # for erf(b)*erfc(c) term t1 = sps.erfcx(c) t2 = np.exp(b - c**2) depl = sps.erfc(a) - (t1 * t2) ## corrected depletion for storage of upper semiconfining unit return Q * (depl - correction)
def _F(alpha, dlam, dtime): """F function from paper in equation (46) as given by equation (47) in Hunt (2003) Parameters ---------- alpha: float integration variable dlam: float dimensionless streambed/semiconfining unit conductance (width * K/B'') * distance/Transmissivity dt: float dimensionless time (time * transmissivity)/(storativity * distance**2) """ # Hunt uses an expansion if dimensionless time>3 z = alpha * dlam * np.sqrt(dtime) / 2.0 + 1.0 / ( 2.0 * alpha * np.sqrt(dtime) ) if np.abs(z) < 3.0: a = dlam / 2.0 + (dtime * np.power(alpha, 2) * np.power(dlam, 2) / 4.0) t1 = sps.erfcx(z) t2 = np.exp(a - z**2) b = -1.0 / (4 * dtime * alpha**2) # equation 47 in paper F = np.exp(b) * np.sqrt(dtime / np.pi) - ( alpha * dtime * dlam ) / 2.0 * (t1 * t2) else: t1 = np.exp(-(1.0 / (4.0 * dtime * alpha**2))) / ( 2.0 * alpha * z * np.sqrt(np.pi) ) t2 = 2.0 / (dlam * (1.0 + (1.0 / (dlam * dtime * alpha**2)) ** 2)) sumterm = ( 1 - (3.0 / (2 * z**2)) + (15.0 / (4.0 * z**4)) - (105.0 / (8 * z**6)) ) F = t1 * (1.0 + t2 * sumterm) # equation 53 in paper if np.isnan(F): print(f"alpha {alpha}, dtime {dtime}, dlam {dlam}") sys.exit() return F def _G(alpha, epsilon, dK, dtime): """G function from paper in equation (46) in Hunt (2003) This function is in equation (46) and expanded in equation (53). Function uses scipy special for incomplete Gamma Function (P(a,b)), binomial coefficient, and modified Bessel function of zero order (I0). Parameters ---------- alpha: float integration variable epsilon: float dimensionless storage storativity/porosity of semiconfining bed dK: float dimensionless conductivity of semiconfining unit (K * Bprime) * dist**2/Transmissivity """ # if dimensionless K is zero (check really small), return 0 # this avoids divide by zero error in terms that have divide by (a+b) if dK < 1.0e-10: return 0.0 a = epsilon * dK * dtime * (1.0 - alpha**2) b = dK * dtime * alpha**2 if (a + b) < 80.0: term1 = np.exp(-(a + b)) * sps.i0(2.0 * np.sqrt(a * b)) else: term1 = 0.0 abterm = np.sqrt(a * b) / (a + b) sum = 0 for n in range(0, 101): if n <= 8: addterm = ( sps.binom(2 * n, n) * sps.gammainc(2 * n + 1, a + b) * abterm ** (2 * n) ) else: bi_term = np.log(sps.binom(2 * n, n)) inc_gamma = np.log(sps.gammainc(2 * n + 1, a + b)) logab = (2 * n) * np.log(abterm) addterm = np.exp(bi_term + inc_gamma + logab) sum = sum + addterm if addterm < 1.0e-08: break eqn52 = 0.5 * (1.0 - term1 + ((b - a) / (a + b)) * sum) if eqn52 < 0: eqn52 = 0.0 if eqn52 > 1.0: eqn52 = 1.0 if np.isnan(eqn52): print("equation 52 is nan") sys.exit() return eqn52 def _integrand(alpha, dlam, dtime, epsilon, dK): """internal function returning product of F() and G() terms for numerical integration""" return _F(alpha, dlam, dtime) * _G(alpha, epsilon, dK, dtime) def _calc_deltaQ(Q): """internal function to parse the Q time series to find changes and their associated times Parameters ---------- Q: pandas Series time series of pumping Returns ------- deltaQ: pandas Series) times and changes in Q over time """ # find the differences in pumping dq = Q.copy() dq.iloc[1:] = np.diff(Q) # get the locations of changes deltaQ = dq.loc[dq != 0] # special case for starting with 0 pumping if Q.index[0] not in deltaQ.index: deltaQ.loc[Q.index[0]] = Q.iloc[0] deltaQ.sort_index(inplace=True) return deltaQ def _WardLoughNonDimensionalize( T1, T2, S1, S2, width, Q, dist, streambed_thick, streambed_K, aquitard_thick, aquitard_K, t, x, y, ): """Internal function to make non-dimensional groups for Ward and Lough solution""" t = np.array(t) # make sure not passing a list if x is not None: x /= dist if y is not None: y /= dist t = t * T2 / (S2 * (dist**2)) T1 /= T2 S1 /= S2 K = ((aquitard_K / aquitard_thick) * (dist**2)) / T2 lambd = ((streambed_K * width) / streambed_thick) * dist / T2 return x, y, t, T1, S1, K, lambd
[docs]def WardLoughDepletion( T1, S1, t, dist, Q, T2, S2, width, streambed_thick, streambed_K, aquitard_thick, aquitard_K, x=0, y=0, NSteh1=2, **kwargs, ): """ Compute streamflow depletion using Ward and Lough (2011) solution Ward and Lough (2011) presented a solution for streamflow depletion by a pumping well in a layered aquifer system. The stream is in the upper aquifer, and the pumping well is in a lower aquifer that is separated from the upper aquifer by a semi-confining aquitard layer. Ward, N.D.,and Lough, H., 2011, Stream depletion from pumping a semiconfined aquifer in a two-layer leaky aquifer system (techical note): Journal of Hydrologic Engineering ASCE, v. 16, no. 11, pgs. 955-959, https://doi.org/10.1061/(ASCE)HE.1943-5584.0000382. Parameters ---------- T: float transmissivity [L**2/T] storage [unitless] specific yield in the upper q aquifer time: float, optionally np.array or list time at which to calculate results [T] dist: float, optionally np.array or list distance at which to calculate results in [L] Q: float pumping rate (+ is extraction) [L**3/T] **kwargs: included to all depletion methods for extra values required in some calls Returns ------- Qs: float streamflow depletion rate, optionally np.array or list depending on input of time and dist [L**3/T] Other Parameters ---------------- T2: float Transmissivity of deeper system S2: float Storativity of streambed_thick: float thickness of streambed streambed_K: float hydraulic conductivity of streambed, [L/T] aquitard_thick: float thickness of intervening leaky aquitard, [L] aquitard_K: float hydraulic conductivity of intervening leaky aquifer, [L/T] x: float x-coordinate of drawdown location (with origin being x=0 at stream location) [L] y: float y-coordinate of drawdown location (with origin being y=0 at pumping well location) [L] NSteh1: int Number of Stehfest series levels - algorithmic tuning parameter. Defaults to 2. width: float stream width (b in paper) [L] """ # first nondimensionalize all the parameters x, y, t, T1, S1, K, lambd = _WardLoughNonDimensionalize( T1, T2, S1, S2, width, Q, dist, streambed_thick, streambed_K, aquitard_thick, aquitard_K, t, x, y, ) # Inverse Fourier transform DeltaQ = _StehfestCoeff(1, NSteh1) * _if1_dQ( T1, S1, K, lambd, np.log(2) / t, x, y ) for jj in range(2, NSteh1 + 1): DeltaQ += _StehfestCoeff(jj, NSteh1) * _if1_dQ( T1, S1, K, lambd, jj * np.log(2) / t, x, y ) DeltaQ = 2 * np.pi * lambd * DeltaQ * np.log(2) / t return DeltaQ * Q # convert back to CFS from CFD
def _if1_dQ(T1, S1, K, lambda_, p, x, y): """Internal function for Ward and Lough (2011) solution""" return _kernel1(T1, S1, K, lambda_, 0, 0, p) + _kernel2( T1, S1, K, lambda_, 0, 0, p ) def _if1(T1, S1, K, lambd, x, y, p): """Internal function for Ward and Lough (2011) solution""" G = ( lambda phi: 2 * ( _kernel1(T1, S1, K, lambd, x, np.tan(phi), p) + _kernel2(T1, S1, K, lambd, x, np.tan(phi), p) ) * np.cos(np.tan(phi) * y) / np.cos(phi) ** 2 ) s1InvFour, _ = integrate.quad( G, 0, np.pi / 2, epsrel=1e-1, epsabs=1e-1, limit=10000 ) return s1InvFour def _if2(T1, S1, K, lambd, x, y, p): """Internal function for Ward and Lough (2011) solution""" H = ( lambda phi: 2 * ( _coeff_s1_1(T1, S1, K, lambd, np.tan(phi), p) * _kernel1(T1, S1, K, lambd, x, np.tan(phi), p) + _coeff_s1_2(T1, S1, K, lambd, np.tan(phi), p) * _kernel2(T1, S1, K, lambd, x, np.tan(phi), p) ) * np.cos(np.tan(phi) * y) / np.cos(phi) ** 2 ) s2InvFour, errbnd = integrate.quad( H, 0, np.pi / 2, epsrel=1e-1, epsabs=1e-1, limit=10000 ) return s2InvFour def _coeff_s1_1(T1, S1, K, lambd, theta, p): """Internal function for Ward and Lough (2011) solution""" b11, b12, b22, mu1, mu2, l1, l2, beta1, beta2, A1, A2 = _coeffs( T1, S1, K, lambd, theta, p ) B1 = (mu1 * T1 - b11) / b12 return B1 def _coeff_s1_2(T1, S1, K, lambd, theta, p): """Internal function for Ward and Lough (2011) solution""" b11, b12, b22, mu1, mu2, l1, l2, beta1, beta2, A1, A2 = _coeffs( T1, S1, K, lambd, theta, p ) B2 = (mu2 * T1 - b11) / b12 return B2 def _kernel1(T1, S1, K, lambd, x, theta_or_y, p): """Internal function for Ward and Lough (2011) solution""" b11, b12, b22, mu1, mu2, l1, l2, beta1, beta2, A1, A2 = _coeffs( T1, S1, K, lambd, theta_or_y, p ) if x < 0: F1 = A1 * np.exp(x * np.sqrt(mu1)) elif 0 <= x <= 1: F1 = A1 * np.exp(-x * np.sqrt(mu1)) + beta1 / ( 2 * np.sqrt(mu1) * l1 ) * (np.exp((x - 1) * np.sqrt(mu1)) - np.exp(-(x + 1) * np.sqrt(mu1))) else: F1 = A1 * np.exp(-x * np.sqrt(mu1)) + beta1 / ( 2 * np.sqrt(mu1) * l1 ) * (np.exp((1 - x) * np.sqrt(mu1)) - np.exp(-(x + 1) * np.sqrt(mu1))) return F1 def _kernel2(T1, S1, K, lambd, x, theta_or_y, p): """Internal function for Ward and Lough (2011) solution""" b11, b12, b22, mu1, mu2, l1, l2, beta1, beta2, A1, A2 = _coeffs( T1, S1, K, lambd, theta_or_y, p ) if x < 0: F2 = A2 * np.exp(x * np.sqrt(mu2)) elif 0 <= x <= 1: F2 = A2 * np.exp(-x * np.sqrt(mu2)) + beta2 / ( 2 * np.sqrt(mu2) * l2 ) * (np.exp((x - 1) * np.sqrt(mu2)) - np.exp(-(x + 1) * np.sqrt(mu2))) else: F2 = A2 * np.exp(-x * np.sqrt(mu2)) + beta2 / ( 2 * np.sqrt(mu2) * l2 ) * (np.exp((1 - x) * np.sqrt(mu1)) - np.exp(-(x + 1) * np.sqrt(mu1))) return F2 def _coeffs(T1, S1, K, lambd, theta_or_y, p): """Internal function for Ward and Lough (2011) solution""" b11 = T1 * theta_or_y**2 + S1 * p + K b12 = -K b22 = theta_or_y**2 + p + K mu1 = (b11 / T1 + b22) / 2 + np.sqrt( (b11 / T1 + b22) ** 2 / 4 + (b12**2 - b11 * b22) / T1 ) mu2 = (b11 / T1 + b22) / 2 - np.sqrt( (b11 / T1 + b22) ** 2 / 4 + (b12**2 - b11 * b22) / T1 ) l1 = T1 + ((mu1 * T1 - b11) / b12) ** 2 l2 = T1 + ((mu2 * T1 - b11) / b12) ** 2 beta1 = (mu1 * T1 - b11) / (b12 * 2 * np.pi * p) beta2 = (mu2 * T1 - b11) / (b12 * 2 * np.pi * p) Delta = 4 * np.sqrt(mu1 * mu2) + 2 * lambd * ( np.sqrt(mu1) / l2 + np.sqrt(mu2) / l1 ) A1 = ( ( (lambd / l2 + 2 * np.sqrt(mu2)) * beta1 * np.exp(-np.sqrt(mu1)) - lambd * beta2 / l2 * np.exp(-np.sqrt(mu2)) ) / Delta / l1 ) A2 = ( ( -lambd * beta1 / l1 * np.exp(-np.sqrt(mu1)) + (lambd / l1 + 2 * np.sqrt(mu1)) * beta2 * np.exp(-np.sqrt(mu2)) ) / Delta / l2 ) return b11, b12, b22, mu1, mu2, l1, l2, beta1, beta2, A1, A2 def _safe_factorial(n): """Calculate factorial using logarithmic method to avoid overflow.""" if n < 0: return float("inf") elif n < 2: return 1 else: return np.exp(gammaln(n + 1)) def _StehfestCoeff(jj, N): """Internal function for Ward and Lough (2011) solution""" LowerLimit = (jj + 1) // 2 UpperLimit = min(jj, N // 2) V = 0 for kk in range(LowerLimit, UpperLimit + 1): denominator = ( _safe_factorial(N // 2 - kk) * _safe_factorial(kk) * _safe_factorial(kk - 1) * _safe_factorial(jj - kk) * _safe_factorial(2 * kk - jj) ) if denominator != 0: # Prevent division by zero V += kk ** (N // 2) * _safe_factorial(2 * kk) / denominator V *= (-1) ** (N // 2 + jj) return V # List drawdown and depletion methods so they can be called # programatically ALL_DD_METHODS = { "theis": theis, "hunt99ddwn": hunt99ddwn, "wardloughddwn": WardLoughDrawdown, } ALL_DEPL_METHODS = { "glover": glover, "walton": walton, "hunt99": hunt99, "hunt03": hunt2003, "wardlough": WardLoughDepletion, } GPM2CFD = 60 * 24 / 7.48 # factor to convert from GPM to CFD