Source code for mfsetup.mfmodel

import os
import time
import warnings
from collections import defaultdict
from pathlib import Path

import flopy
import geopandas as gpd
import numpy as np
import pandas as pd
import pyproj
from packaging import version

fm = flopy.modflow
mf6 = flopy.mf6
import gisutils
import sfrmaker
from gisutils import get_shapefile_crs, get_values_at_points, project
from sfrmaker import Lines
from sfrmaker.utils import assign_layers

from mfsetup.bcs import (
    get_bc_package_cells,
    setup_basic_stress_data,
    setup_flopy_stress_period_data,
)
from mfsetup.config import validate_configuration
from mfsetup.fileio import (
    check_source_files,
    load,
    load_array,
    load_cfg,
    save_array,
    set_cfg_paths_to_absolute,
    setup_external_filepaths,
)
from mfsetup.grid import MFsetupGrid, get_ij, rasterize, setup_structured_grid
from mfsetup.interpolate import (
    get_source_dest_model_xys,
    interp_weights,
    interpolate,
    regrid,
)
from mfsetup.lakes import make_lakarr2d, setup_lake_fluxes, setup_lake_info
from mfsetup.mf5to6 import (
    get_model_length_units,
    get_model_time_units,
    get_package_name,
)
from mfsetup.model_version import get_versions
from mfsetup.sourcedata import TransientTabularSourceData, setup_array
from mfsetup.tdis import (
    concat_periodata_groups,
    get_parent_stress_periods,
    parse_perioddata_groups,
    setup_perioddata,
    setup_perioddata_group,
)
from mfsetup.tmr import Tmr
from mfsetup.units import convert_length_units, lenuni_text, lenuni_values
from mfsetup.utils import flatten, get_input_arguments, get_packages, update
from mfsetup.wells import setup_wel_data

if version.parse(gisutils.__version__) < version.parse('0.2.2'):
    warnings.warn('Automatic reprojection functionality requires gis-utils >= 0.2.2'
                  '\nPlease pip install --upgrade gis-utils')
if version.parse(sfrmaker.__version__) < version.parse('0.6'):
    warnings.warn('sfr: sfrmaker_options: add_outlet functionality requires sfrmaker >= 0.6'
                  '\nPlease pip install --upgrade sfrmaker')


[docs] class MFsetupMixin(): """Mixin class for shared functionality between MF6model and MFnwtModel. Meant to be inherited by both those classes and not be called directly. https://stackoverflow.com/questions/533631/what-is-a-mixin-and-why-are-they-useful """ source_path = Path(__file__).parent """ -1 : well 0 : no lake 1 : lak package lake (lakarr > 0) 2 : high-k lake 3 : ghb 4 : sfr""" # package variable name: number bc_numbers = {'wel': -1, 'lak': 1, 'high-k lake': 2, 'ghb': 3, 'sfr': 4, 'riv': 5 } model_type = "mfsetup" def __init__(self, parent): # property attributes self._cfg = None self._nper = None self._perioddata = None self._sr = None self._modelgrid = None self._bbox = None self._parent = parent self._parent_layers = None self._parent_default_source_data = False self._parent_mask = None self._lakarr_2d = None self._isbc_2d = None self._lakarr = None self._isbc = None self._lake_bathymetry = None self._high_k_lake_recharge = None self._nodata_value = -9999 self._model_ws = None self._abs_model_ws = None self._model_version = None # semantic version of model self._longname = None # long name for model (short name is self.name) self._header = None # header for files and repr self.inset = None # dictionary of inset models attached to LGR parent self._is_lgr = False # flag for lgr inset models self.lgr = None # holds flopy Lgr utility object self._lgr_idomain2d = None # array of Lgr inset model locations within parent grid self.tmr = None # holds TMR class instance for TMR-type perimeter boundaries self._load = False # whether model is being made or loaded from existing files self.lake_info = None self.lake_fluxes = None # flopy settings self._mg_resync = False self._features = {} # dictionary for caching shapefile datasets in memory # arrays remade during this session self.updated_arrays = set() # cache of interpolation weights to speed up regridding self._interp_weights = None def __repr__(self): header = f'{self.header}\n' txt = '' if self.parent is not None: txt += 'Parent model: {}/{}\n'.format(self.parent.model_ws, self.parent.name) if self._modelgrid is not None: txt += f'{self._modelgrid.__repr__()}' txt += 'Packages:' for pkg in self.get_package_list(): txt += ' {}'.format(pkg.lower()) txt += '\n' txt += f'{self.nper:d} period(s):\n' if self._perioddata is not None: cols = ['per', 'start_datetime', 'end_datetime', 'perlen', 'steady', 'nstp'] txt += self.perioddata[cols].head(3).to_string(index=False) txt += '\n ...\n' tail = self.perioddata[cols].tail(1).to_string(index=False) txt += tail.split('\n')[1] txt = header + txt return txt def __eq__(self, other): """Test for equality to another model object.""" if not isinstance(other, self.__class__): return False # kludge: skip obs packages for now # - obs packages aren't read in with same name under which they were created # - also SFR_OBS package is handled by SFRmaker instead of Flopy; # a loaded version of a model might have SFR_OBS, # where a freshly made version may not (even though SFRmaker will write it) # all_packages = set(self.get_package_list()).union(other.get_package_list()) exceptions = {p for p in all_packages if p.lower().startswith('obs') or p.lower().endswith('obs')} other_packages = [s for s in sorted(other.get_package_list()) if s not in exceptions] packages = [s for s in sorted(self.get_package_list()) if s not in exceptions] if other_packages != packages: return False if other.modelgrid != self.modelgrid: return False if other.nlay != self.nlay: return False if not np.array_equal(other.perioddata, self.perioddata): return False # TODO: add checks of actual array values and other parameters for k, v in self.__dict__.items(): if k in ['cfg', 'sfrdata', '_load', '_packagelist', '_package_paths', 'package_key_dict', 'package_type_dict', 'package_name_dict', '_ftype_num_dict']: continue elif k not in other.__dict__: return False elif type(v) == bool: if not v == other.__dict__[k]: return False elif k == 'cfg': continue elif type(v) in [str, int, float, dict, list]: if v != other.__dict__[k]: pass continue return True @property def nper(self): if self.perioddata is not None: return len(self.perioddata) @property def nrow(self): if self.modelgrid.grid_type == 'structured': return self.modelgrid.nrow @property def ncol(self): if self.modelgrid.grid_type == 'structured': return self.modelgrid.ncol @property def modelgrid(self): if self._modelgrid is None: self.setup_grid() # trap for instance where default (base) modelgrid # instance is attached to the flopy model # (because the grid hasn't been set up with) # self._modelgrid.nlay will error in this case # because of NotImplementedError in base class elif self._modelgrid.grid_type is None: pass # add layer tops and bottoms and idomain to the model grid # if they haven't been yet elif self._modelgrid.nlay is None and 'DIS' in self.get_package_list(): self._modelgrid._top = self.dis.top.array self._modelgrid._botm = self.dis.botm.array if self.version == 'mf6': self._modelgrid._idomain = self.dis.idomain.array elif 'bas6' in self.get_package_list(): self._modelgrid._idomain = self.bas6.ibound.array #self.setup_grid() return self._modelgrid @property def bbox(self): if self._bbox is None and self.modelgrid is not None: self._bbox = self.modelgrid.bbox return self._bbox #@property #def perioddata(self): # """DataFrame summarizing stress period information. # # Columns: # # start_date_time : pandas datetimes; start date/time of each stress period # (does not include steady-state periods) # end_date_time : pandas datetimes; end date/time of each stress period # (does not include steady-state periods) # time : float; cumulative MODFLOW time (includes steady-state periods) # per : zero-based stress period # perlen : stress period length in model time units # nstp : number of timesteps in the stress period # tsmult : timestep multiplier for stress period # steady : True=steady-state, False=Transient # oc : MODFLOW-6 output control options # """ # if self._perioddata is None: # perioddata = setup_perioddata(self) # return self._perioddata @property def parent(self): return self._parent @property def parent_layers(self): """Mapping between layers in source model and layers in destination model. Returns ------- parent_layers : dict {inset layer : parent layer} """ if self._parent_layers is None: parent_layers = None botm_source_data = self.cfg['dis'].get('source_data', {}).get('botm', {}) nlay = self.modelgrid.nlay if nlay is None: nlay = self.cfg['dis']['dimensions']['nlay'] if self.cfg['parent'].get('inset_layer_mapping') is not None: parent_layers = self.cfg['parent'].get('inset_layer_mapping') elif isinstance(botm_source_data, dict) and 'from_parent' in botm_source_data: parent_layers = botm_source_data.get('from_parent') elif self.parent is not None and (self.parent.modelgrid.nlay == nlay): parent_layers = dict(zip(range(self.parent.modelgrid.nlay), range(nlay))) else: #parent_layers = dict(zip(range(self.parent.modelgrid.nlay), range(self.parent.modelgrid.nlay))) parent_layers = None self._parent_layers = parent_layers return self._parent_layers @property def parent_stress_periods(self): """Mapping between stress periods in source model and stress periods in destination model. Returns ------- parent_stress_periods : dict {inset stress period : parent stress period} """ return dict(zip(self.perioddata['per'], self.perioddata['parent_sp'])) @property def package_list(self): """Definitive list of packages. Get from namefile input first (as in mf6 input), then look under model input. """ packages = self.cfg.get('nam', {}).get('packages', []) if len(packages) == 0: packages = self.cfg['model'].get('packages', []) return [p for p in self._package_setup_order if p in packages] @property def perimeter_bc_type(self): """Dictates how perimeter boundaries are set up. if 'head'; a constant head package is created from the parent model starting heads if 'flux'; a specified flux boundary is created from parent model cell by cell flow output """ perimeter_boundary_type = self.cfg['model'].get('perimeter_boundary_type') if perimeter_boundary_type is not None: if 'head' in perimeter_boundary_type: return 'head' if 'flux' in perimeter_boundary_type: return 'flux' @property def model_ws(self): if self._model_ws is None: self._model_ws = Path(self._get_model_ws()) return self._model_ws @model_ws.setter def model_ws(self, model_ws): self._model_ws = model_ws self._abs_model_ws = os.path.normpath(os.path.abspath(model_ws)) @property def model_version(self): """Semantic version of model, using a hacked version of the versioneer. Version is reported using git tags for the model repository or a start_version: key specified in the configuration file (default 0). The start_version or tag is then appended by the remaining information in a pep440-post style version tag (e.g. most recent git commit hash for the model repository + "dirty" if the model repository has uncommited changes) References ---------- https://github.com/warner/python-versioneer https://github.com/warner/python-versioneer/blob/master/details.md """ if self._model_version is None: self._model_version = get_versions(path=self.model_ws, start_version=str(self.cfg['metadata']['start_version'])) return self._model_version @property def longname(self): if self._longname is None: longname = self.cfg['metadata'].get('longname') if longname is None: longname = f'{self.name} model' self._longname = longname return self._longname @property def header(self): if self._header is None: version_str = self.model_version['version'] header = f'{self.longname} version {version_str}' self._header = header return self._header @property def tmpdir(self): #abspath = os.path.abspath( # self.cfg['intermediate_data']['output_folder']) abspath = self.model_ws / 'original-arrays' self.cfg['intermediate_data']['output_folder'] = str(abspath) abspath.mkdir(exist_ok=True) #if not os.path.isdir(abspath): # os.makedirs(abspath) tmpdir = abspath if self.relative_external_paths: #tmpdir = os.path.relpath(abspath) tmpdir = abspath.relative_to(self.model_ws) #else: # do we need to normalize with Pathlib?? # tmpdir = os.path.normpath(abspath) return tmpdir @property def external_path(self): abspath = os.path.abspath( self.cfg.get('model', {}).get('external_path', 'external')) if not os.path.isdir(abspath): os.makedirs(abspath) if self.relative_external_paths: ext_path = os.path.relpath(abspath) else: ext_path = os.path.normpath(abspath) return ext_path @external_path.setter def external_path(self, x): pass # bypass any setting in parent class @property def interp_weights(self): """For a given parent, only calculate interpolation weights once to speed up re-gridding of arrays to pfl_nwt.""" if self._interp_weights is None: parent_xy, inset_xy = get_source_dest_model_xys(self.parent, self) self._interp_weights = interp_weights(parent_xy, inset_xy) return self._interp_weights @property def parent_mask(self): """Boolean array indicating window in parent model grid (subset of cells) that encompass the inset model domain, with a surrounding buffer. Used to speed up interpolation of parent grid values onto inset model grid.""" if self._parent_mask is None: x, y = np.squeeze(self.bbox.exterior.coords.xy) pi, pj = get_ij(self.parent.modelgrid, x, y) pad = 3 i0 = np.max([pi.min() - pad, 0]) i1 = np.min([pi.max() + pad + 1, self.parent.modelgrid.nrow]) j0 = np.max([pj.min() - pad, 0]) j1 = np.min([pj.max() + pad + 1, self.parent.modelgrid.ncol]) mask = np.zeros((self.parent.modelgrid.nrow, self.parent.modelgrid.ncol), dtype=bool) mask[i0:i1, j0:j1] = True self._parent_mask = mask return self._parent_mask @property def nlakes(self): if self.lakarr is not None: return int(np.max(self.lakarr)) else: return 0 @property def _lakarr2d(self): """2-D array of areal extent of lakes. Non-zero values correspond to lak package IDs.""" if self._lakarr_2d is None: self._set_lakarr2d() return self._lakarr_2d @property def lakarr(self): """3-D array of lake extents in each layer. Non-zero values correspond to lak package IDs. Extent of lake in each layer is based on bathymetry and model layer thickness. """ if self._lakarr is None: self.setup_external_filepaths('lak', 'lakarr', self.cfg['lak']['{}_filename_fmt'.format('lakarr')], file_numbers=list(range(self.nlay))) if self.isbc is None: return None else: self._set_lakarr() return self._lakarr @property def _isbc2d(self): """2-D array indicating the i, j locations of boundary conditions. -1 : well 0 : no lake 1 : lak package lake (lakarr > 0) 2 : high-k lake 3 : ghb 4 : sfr 5 : riv see also the .bc_numbers attibute """ if self._isbc_2d is None: self._set_isbc2d() return self._isbc_2d @property def isbc(self): """3D array indicating which cells have a lake in each layer. -1 : well 0 : no lake 1 : lak package lake (lakarr > 0) 2 : high-k lake 3 : ghb 4 : sfr 5 : riv see also the .bc_numbers attibute """ # DIS package is needed to set up the isbc array # (to compare lake bottom elevations to layer bottoms) if self.get_package('dis') is None: return None if self._isbc is None: self._set_isbc() return self._isbc @property def lake_bathymetry(self): """Put lake bathymetry setup logic here instead of DIS package. """ if self._lake_bathymetry is None: self._set_lake_bathymetry() return self._lake_bathymetry @property def high_k_lake_recharge(self): """Recharge value to apply to high-K lakes, in model units. """ if self._high_k_lake_recharge is None and self.cfg['high_k_lakes']['simulate_high_k_lakes']: if self.lake_info is None: self.lake_info = setup_lake_info(self) if self.lake_info is not None: self.lake_fluxes = setup_lake_fluxes(self, block='high_k_lakes') self._high_k_lake_recharge = self.lake_fluxes.groupby('per').mean()['highk_lake_rech'].sort_index() return self._high_k_lake_recharge def load_array(self, filename): if isinstance(filename, list): arrays = [] for f in filename: arrays.append(load_array(f, shape=(self.nrow, self.ncol), nodata=self._nodata_value ) ) return np.array(arrays) return load_array(filename, shape=(self.nrow, self.ncol))
[docs] def load_features(self, filename, bbox_filter=None, id_column=None, include_ids=None, cache=True): """Load vector and attribute data from a shapefile; cache it to the _features dictionary. """ if isinstance(filename, str): features_file = [filename] dfs_list = [] for f in features_file: if f not in self._features.keys(): if os.path.exists(f): features_crs = get_shapefile_crs(f) if bbox_filter is None: if self.bbox is not None: bbox = self.bbox elif self.parent.modelgrid is not None: bbox = self.parent.modelgrid.bbox model_crs = self.parent.modelgrid.crs assert model_crs is not None if features_crs != self.modelgrid.crs: bbox_filter = project(bbox, self.modelgrid.crs, features_crs).bounds else: bbox_filter = bbox.bounds # implement automatic reprojection in gis-utils # maintaining backwards compatibility df = gpd.read_file(f) df.to_crs(self.modelgrid.crs, inplace=True) df.columns = [c.lower() for c in df.columns] if cache: print('caching data in {}...'.format(f)) self._features[f] = df else: print('feature input file {} not found'.format(f)) return else: df = self._features[f] if id_column is not None: id_column = id_column.lower() # convert any floating point dtypes to integer if df[id_column].dtype == float: df[id_column] = df[id_column].astype('int64') df.index = df[id_column] if include_ids is not None: df = df.loc[include_ids].copy() dfs_list.append(df) df = pd.concat(dfs_list) if len(df) == 0: warnings.warn('No features loaded from {}!'.format(filename)) return df
[docs] def get_boundary_cells(self, exclude_inactive=False): """Get the i, j locations of cells along the model perimeter. Returns ------- k, i, j : 1D numpy arrays of ints zero-based layer, row, column locations of boundary cells """ # top row, right side, left side, bottom row i_top = [0] * self.ncol j_top = list(range(self.ncol)) i_left = list(range(1, self.nrow - 1)) j_left = [0] * (self.nrow - 2) i_right = i_left j_right = [self.ncol - 1] * (self.nrow - 2) i_botm = [self.nrow - 1] * self.ncol j_botm = j_top i = i_top + i_left + i_right + i_botm j = j_top + j_left + j_right + j_botm assert len(i) == 2 * self.nrow + 2 * self.ncol - 4 nlaycells = len(i) k = np.array(sorted(list(range(self.nlay)) * len(i))) i = np.array(i * self.nlay) j = np.array(j * self.nlay) assert np.sum(k[nlaycells:nlaycells * 2]) == nlaycells if exclude_inactive: if self.version == 'mf6': active_cells = self.idomain[k, i, j] >= 1 else: active_cells = self.ibound[k, i, j] >= 1 k = k[active_cells].copy() i = i[active_cells].copy() j = j[active_cells].copy() return k, i, j
[docs] def regrid_from_parent(self, parent_array, mask=None, method='linear'): """Interpolate values in parent array onto the pfl_nwt model grid, using model grid instances attached to the parent and pfl_nwt models. Parameters ---------- parent_array : ndarray Values from parent model to be interpolated to pfl_nwt grid. 1 or 2-D numpy array of same sizes as a layer of the parent model. mask : ndarray (bool) 1 or 2-D numpy array of same sizes as a layer of the parent model. True values indicate cells to include in interpolation, False values indicate cells that will be dropped. method : str ('linear', 'nearest') Interpolation method. """ if mask is not None: return regrid(parent_array, self.parent.modelgrid, self.modelgrid, mask1=mask, method=method) if method == 'linear': #parent_values = parent_array.flatten()[self.parent_mask.flatten()] parent_values = parent_array[self.parent_mask].flatten() regridded = interpolate(parent_values, *self.interp_weights) elif method == 'nearest': regridded = regrid(parent_array, self.parent.modelgrid, self.modelgrid, method='nearest') regridded = np.reshape(regridded, (self.nrow, self.ncol)) return regridded
[docs] def setup_external_filepaths(self, package, variable_name, filename_format, file_numbers=None): """Set up external file paths for a MODFLOW package variable. Sets paths for intermediate files, which are written from the (processed) source data. Intermediate files are supplied to Flopy as external files for a given package variable. Flopy writes external files to a specified location when the MODFLOW package file is written. This method gets the external file paths that will be written by FloPy, and puts them in the configuration dictionary under their respective variables. Parameters ---------- package : str Three-letter package abreviation (e.g. 'DIS' for discretization) variable_name : str FloPy name of variable represented by external files (e.g. 'top' or 'botm') filename_format : str File path to the external file(s). Can be a string representing a single file (e.g. 'top.dat'), or for variables where a file is written for each layer or stress period, a format string that will be formated with the zero-based layer number (e.g. 'botm{}.dat') for files botm0.dat, botm1.dat, ... file_numbers : list of ints List of numbers for the external files. Usually these represent zero-based layers or stress periods. relative_external_paths : bool If true, external paths will be specified relative to model_ws, otherwise, they will be absolute paths Returns ------- filepaths : list List of external file paths Adds intermediated file paths to model.cfg[<package>]['intermediate_data'] Adds external file paths to model.cfg[<package>][<variable_name>] """ # for lgr models, add the model name to the external filename # if lgr parent or lgr inset if self.lgr or self._is_lgr: filename_format = '{}_{}'.format(self.name, filename_format) return setup_external_filepaths(self, package, variable_name, filename_format, file_numbers=file_numbers, relative_external_paths=self.relative_external_paths)
def _get_model_ws(self, cfg=None): if cfg is None: cfg = self.cfg if self.version == 'mf6': abspath = os.path.abspath(cfg.get('simulation', {}).get('sim_ws', '.')) else: abspath = os.path.abspath(cfg.get('model', {}).get('model_ws', '.')) if not os.path.exists(abspath): os.makedirs(abspath) self._abs_model_ws = os.path.normpath(abspath) os.chdir(abspath) # within a session, modflow-setup operates in the model_ws if self.relative_external_paths: model_ws = os.path.relpath(abspath) else: model_ws = os.path.normpath(abspath) return Path(model_ws) def _reset_bc_arrays(self): """Reset the boundary condition property arrays in order. _lakarr2d (depends on _lakarr_2d _isbc2d (depends on _lakarr2d) _lake_bathymetry (depends on _isbc2d) _isbc (depends on _isbc2d) _lakarr (depends on _isbc and _lakarr2d) """ self._lakarr_2d = None self._isbc_2d = None # (depends on _lakarr2d) self._lake_bathymetry = None # (depends on _isbc2d) self._isbc = None # (depends on _isbc2d) self._lakarr = None # #self._set_lakarr2d() # calls self._set_isbc2d(), which calls self._set_lake_bathymetry() #self._set_isbc() # calls self._set_lakarr() def _set_cfg(self, user_specified_cfg): """Load configuration file; update dictionary. """ #self.cfg = defaultdict(dict) self.cfg = defaultdict(dict, self.cfg) if isinstance(user_specified_cfg, str) or \ isinstance(user_specified_cfg, Path): raise ValueError("Configuration should have already been loaded") # convert to an absolute path #user_specified_cfg = Path(user_specified_cfg).resolve() #assert user_specified_cfg.exists(), \ # "config file {} not found".format(user_specified_cfg) #updates = load(user_specified_cfg) #updates['filename'] = user_specified_cfg elif isinstance(user_specified_cfg, dict): updates = user_specified_cfg.copy() elif user_specified_cfg is None: return else: raise TypeError("unrecognized input for cfg") # if the user specifies a complexity option for IMS or NWT, # don't import any defaults ims_cfg = updates.get('ims', {}) if ims_cfg.get('options', {}).get('complexity'): # delete the defaults for default_block in 'nonlinear', 'linear': if default_block in self.cfg['ims']: del self.cfg['ims'][default_block] nwt_cfg = updates.get('nwt', {}) if nwt_cfg.get('options', 'specified').lower() != 'specified': keep_args = {'headtol', 'fluxtol', 'maxiterout', 'thickfact', 'linmeth', 'iprnwt', 'ibotav', 'Continue', 'use_existing_file'} self.cfg['nwt'] = {k: v for k, v in self.cfg['nwt'].items() if k in keep_args} update(self.cfg, updates) # make sure empty variables get initialized as dicts for k, v in self.cfg.items(): if v is None: self.cfg[k] = {} if 'filename' in self.cfg: config_file_path = Path(self.cfg['filename']) if config_file_path.is_absolute(): self.cfg = set_cfg_paths_to_absolute(self.cfg, config_file_path.parent) # mf6 models: set up or load the simulation if self.version == 'mf6': kwargs = self.cfg['simulation'].copy() kwargs.update(self.cfg['simulation']['options']) if os.path.exists('{}.nam'.format(kwargs['sim_name'])) and self._load: try: kwargs = get_input_arguments(kwargs, mf6.MFSimulation.load, warn=False) self._sim = mf6.MFSimulation.load(**kwargs) except: # create simulation kwargs = get_input_arguments(kwargs, mf6.MFSimulation, warn=False) self._sim = mf6.MFSimulation(**kwargs) else: # create simulation kwargs = get_input_arguments(kwargs, mf6.MFSimulation, warn=False) self._sim = mf6.MFSimulation(**kwargs) # load the parent model (skip if already attached) if 'namefile' in self.cfg.get('parent', {}).keys(): self._set_parent() output_paths = self.cfg['postprocessing']['output_folders'] for name, folder_path in output_paths.items(): if not os.path.exists(folder_path): os.makedirs(folder_path) setattr(self, '_{}_path'.format(name), folder_path) # absolute path to config file self._config_path = os.path.split(os.path.abspath(str(self.cfg['filename'])))[0] # set package keys to default dicts for pkg in self._package_setup_order: self.cfg[pkg] = defaultdict(dict, self.cfg.get(pkg, {})) # other variables self.cfg['external_files'] = {} # validate the configuration validate_configuration(self.cfg) def _get_high_k_lakes(self): """Get the i, j locations of any high-k lakes within the model grid. """ lakesdata = None lakes_shapefile = self.cfg['high_k_lakes'].get('source_data', {}).get('lakes_shapefile') if lakes_shapefile is not None: if isinstance(lakes_shapefile, str): lakes_shapefile = {'filename': lakes_shapefile} kwargs = get_input_arguments(lakes_shapefile, self.load_features) if 'include_ids' in kwargs: # load all lakes in shapefile kwargs.pop('include_ids') lakesdata = self.load_features(**kwargs) if lakesdata is not None: is_high_k_lake = rasterize(lakesdata, self.modelgrid) return is_high_k_lake > 0 def _set_isbc2d(self): """Set up the _isbc2d array, that indicates the i,j locations of boundary conditions. """ isbc = np.zeros((self.nrow, self.ncol), dtype=int) # high-k lakes if self.cfg['high_k_lakes']['simulate_high_k_lakes']: is_high_k_lake = self._get_high_k_lakes() if is_high_k_lake is not None: isbc[is_high_k_lake] = 2 # lake package lakes isbc[self._lakarr2d > 0] = 1 # add other bcs for packagename, bcnumber in self.bc_numbers.items(): if 'lak' not in packagename: package = self.get_package(packagename) if package is not None: # handle multiple instances of package # (in MODFLOW-6) if isinstance(package, flopy.pakbase.PackageInterface): packages = [package] else: packages = package for package in packages: k, i, j = get_bc_package_cells(package) not_a_lake = np.where(isbc[i, j] != 1) i = i[not_a_lake] j = j[not_a_lake] isbc[i, j] = bcnumber self._isbc_2d = isbc self._set_lake_bathymetry() def _set_isbc(self): isbc = np.zeros((self.nlay, self.nrow, self.ncol), dtype=int) isbc[0] = self._isbc2d # in mf6 models, the model top is set to the lake botm # and any layers originally above the lake botm # are also reset to the lake botm (given zero-thickness) lake_botm_elevations = self.dis.top.array below = self.dis.botm.array >= lake_botm_elevations if not self.version == 'mf6': lake_botm_elevations = self.dis.top.array - self.lake_bathymetry layer_tops = np.concatenate([[self.dis.top.array], self.dis.botm.array[:-1]]) # lakes must be at least 10% into a layer to get simulated in that layer below = layer_tops > lake_botm_elevations + 0.1 for i, ibelow in enumerate(below[1:]): if np.any(ibelow): isbc[i+1][ibelow] = self._isbc2d[ibelow] # add other bcs for packagename, bcnumber in self.bc_numbers.items(): if 'lak' not in packagename: package = self.get_package(packagename) if package is not None: # handle multiple instances of package # (in MODFLOW-6) if isinstance(package, flopy.pakbase.PackageInterface): packages = [package] else: packages = package for package in packages: k, i, j = get_bc_package_cells(package) not_a_lake = np.where(isbc[k, i, j] != 1) k = k[not_a_lake] i = i[not_a_lake] j = j[not_a_lake] isbc[k, i, j] = bcnumber self._isbc = isbc self._set_lakarr() def _set_lakarr2d(self): lakarr2d = np.zeros((self.nrow, self.ncol), dtype=int) if 'lak' in self.package_list: lakes_shapefile = self.cfg['lak'].get('source_data', {}).get('lakes_shapefile', {}).copy() if lakes_shapefile: kwargs = get_input_arguments(lakes_shapefile, self.load_features) lakesdata = self.load_features(**kwargs) # caches loaded features lakes_shapefile['lakesdata'] = lakesdata lakes_shapefile.pop('filename') kwargs = get_input_arguments(lakes_shapefile, make_lakarr2d) lakarr2d = make_lakarr2d(self.modelgrid, **kwargs) self._lakarr_2d = lakarr2d self._set_isbc2d() def _set_lakarr(self): self.setup_external_filepaths('lak', 'lakarr', self.cfg['lak']['{}_filename_fmt'.format('lakarr')], file_numbers=list(range(self.nlay))) # assign lakarr values from 3D isbc array lakarr = np.zeros((self.nlay, self.nrow, self.ncol), dtype=int) for k in range(self.nlay): lakarr[k][self.isbc[k] == 1] = self._lakarr2d[self.isbc[k] == 1] for k, ilakarr in enumerate(lakarr): save_array(self.cfg['intermediate_data']['lakarr'][0][k], ilakarr, fmt='%d') self._lakarr = lakarr def _set_lake_bathymetry(self): bathymetry_file = self.cfg.get('lak', {}).get('source_data', {}).get('bathymetry_raster') default_lake_depth = self.cfg['model'].get('default_lake_depth', 2) if bathymetry_file is not None: lmult = 1.0 if isinstance(bathymetry_file, dict): lmult = convert_length_units(bathymetry_file.get('length_units', 0), self.length_units) bathymetry_file = bathymetry_file['filename'] # sample pre-made bathymetry at grid points bathy = get_values_at_points(bathymetry_file, x=self.modelgrid.xcellcenters.ravel(), y=self.modelgrid.ycellcenters.ravel(), points_crs=self.modelgrid.crs, out_of_bounds_errors='coerce') bathy = np.reshape(bathy, (self.nrow, self.ncol)) * lmult bathy[(bathy < 0) | np.isnan(bathy)] = 0 # fill bathymetry grid in remaining lake cells with default lake depth # also ensure that all non lake cells have bathy=0 fill = (bathy == 0) & (self._isbc2d > 0) & (self._isbc2d < 3) bathy[fill] = default_lake_depth bathy[(self._isbc2d > 1) & (self._isbc2d > 2)] = 0 else: bathy = np.zeros((self.nrow, self.ncol)) self._lake_bathymetry = bathy def _set_parent_modelgrid(self, mg_kwargs=None): """Reset the parent model grid from keyword arguments or existing modelgrid, and DIS package. """ if mg_kwargs is not None: kwargs = mg_kwargs.copy() else: kwargs = {'xoff': self.parent.modelgrid.xoffset, 'yoff': self.parent.modelgrid.yoffset, 'angrot': self.parent.modelgrid.angrot, 'crs': self.parent.modelgrid.crs, 'epsg': self.parent.modelgrid.epsg, #'proj4': self.parent.modelgrid.proj4, } parent_units = get_model_length_units(self.parent) if 'lenuni' in self.cfg['parent']: parent_units = lenuni_text[self.cfg['parent']['lenuni']] elif 'length_units' in self.cfg['parent']: parent_units = self.cfg['parent']['length_units'] if self.version == 'mf6': self.parent.dis.length_units = parent_units else: self.parent.dis.lenuni = lenuni_values[parent_units] # make sure crs is populated, then get CRS units for the grid from gisutils import get_authority_crs if kwargs.get('crs') is not None: kwargs['crs'] = get_authority_crs(kwargs['crs']) elif kwargs.get('epsg') is not None: kwargs['crs'] = get_authority_crs(kwargs['epsg']) # no parent CRS info, assume the parent model is in the same CRS elif self.cfg['setup_grid'].get('crs') is not None: kwargs['crs'] = get_authority_crs(self.cfg['setup_grid']['crs']) # no parent CRS info, assume the parent model is in the same CRS elif self.cfg['setup_grid'].get('epsg') is not None: kwargs['crs'] = get_authority_crs(self.cfg['setup_grid']['epsg']) else: raise ValueError('No coordinate reference input in setup_grid: or parent: ' 'SpatialReference: blocks of configuration file. Supply ' 'at least coordinate reference information to ' 'setup_grid: crs: item.') parent_grid_units = kwargs['crs'].axis_info[0].unit_name if 'foot' in parent_grid_units.lower() or 'feet' in parent_grid_units.lower(): parent_grid_units = 'feet' elif 'metre' in parent_grid_units.lower() or 'meter' in parent_grid_units.lower(): parent_grid_units = 'meters' else: raise ValueError(f'unrecognized CRS units {parent_grid_units}: CRS must be projected in feet or meters') # assume that model grid is in a projected CRS of meters lmult = convert_length_units(parent_units, parent_grid_units) kwargs['delr'] = self.parent.dis.delr.array * lmult kwargs['delc'] = self.parent.dis.delc.array * lmult kwargs['top'] = self.parent.dis.top.array kwargs['botm'] = self.parent.dis.botm.array if hasattr(self.parent.dis, 'laycbd'): kwargs['laycbd'] = self.parent.dis.laycbd.array # renames for parent modelgrid renames = {'rotation': 'angrot'} for k, v in renames.items(): if k in kwargs: kwargs[v] = kwargs.pop(k) kwargs = get_input_arguments(kwargs, MFsetupGrid, warn=False) self._parent._mg_resync = False self._parent._modelgrid = MFsetupGrid(**kwargs) def _set_parent(self): """Set attributes related to a parent or source model if one is specified. """ # if it's an LGR model (where parent is also being created) # set up the parent DIS package if self._is_lgr and isinstance(self.parent, MFsetupMixin): if 'DIS' not in self.parent.get_package_list(): dis = self.parent.setup_dis() kwargs = self.cfg['parent'].copy() if kwargs is not None: kwargs = kwargs.copy() # load MF6 or MF2005 parent if self.parent is None: print('loading parent model {}...'.format(os.path.join(kwargs['model_ws'], kwargs['namefile']))) t0 = time.time() # load only specified packages that the parent model has packages_in_parent_namefile = get_packages(os.path.join(kwargs['model_ws'], kwargs['namefile'])) # load at least these packages # so that there is complete information on model time and space dis default_parent_packages = {'dis', 'tdis'} specified_packages = set(self.cfg['model'].get('packages', set())) specified_packages.update(default_parent_packages) # get equivalent packages to load if parent is another MODFLOW version; # then flatten (a package may have more than one equivalent) parent_packages = [get_package_name(p, kwargs['version']) for p in specified_packages] parent_packages = {item for subset in parent_packages for item in subset} if kwargs['version'] == 'mf6': parent_packages.add('sto') load_only = list(set(packages_in_parent_namefile).intersection(parent_packages)) if 'load_only' not in kwargs: kwargs['load_only'] = load_only if 'skip_load' in kwargs: kwargs['skip_load'] = [s.lower() for s in kwargs['skip_load']] kwargs['load_only'] = [pckg for pckg in kwargs['load_only'] if pckg not in kwargs['skip_load']] if self.cfg['parent']['version'] == 'mf6': sim_kwargs = kwargs.copy() if 'sim_name' not in kwargs: sim_kwargs['sim_name'] = kwargs.get('simulation', 'mfsim') if 'sim_ws' not in kwargs: sim_kwargs['sim_ws'] = sim_kwargs.get('model_ws', '.') sim_kwargs = get_input_arguments(sim_kwargs, mf6.MFSimulation.load, warn=False) parent_sim = mf6.MFSimulation.load(**sim_kwargs) modelname, _ = os.path.splitext(kwargs['namefile']) self._parent = parent_sim.get_model(modelname) else: kwargs['f'] = kwargs.pop('namefile') kwargs = get_input_arguments(kwargs, fm.Modflow.load, warn=False) self._parent = fm.Modflow.load(**kwargs) print("finished in {:.2f}s\n".format(time.time() - t0)) # set parent model units in config if not entered if 'length_units' not in self.cfg['parent']: self.cfg['parent']['length_units'] = get_model_length_units(self.parent) if 'time_units' not in self.cfg['parent']: self.cfg['parent']['time_units'] = get_model_time_units(self.parent) # set the parent model grid from mg_kwargs if not None # otherwise, convert parent model grid to MFsetupGrid mg_kwargs = self.cfg['parent'].get('SpatialReference', self.cfg['parent'].get('modelgrid', None)) # check configuration file input # for consistency with parent model DIS package input # (configuration file input may be different if an existing model # doesn't have a valid spatial reference in the DIS package) mf6_names = { 'rotation': 'angrot', 'xoff': 'xorigin', 'yoff': 'yorigin' } if mg_kwargs is not None and (self.parent.version == 'mf6') and not\ mg_kwargs.get('override_dis_package_input', False): for variable, mf6_name in mf6_names.items(): if (variable in mg_kwargs) and\ ('DIS' in self.parent.get_package_list()): dis_value = getattr(self.parent.dis, mf6_name).array if not np.allclose(mg_kwargs[variable], dis_value): raise ValueError( "Configuration file entry parent: SpatialReference: " f"{variable}: {mg_kwargs[variable]} does not match {mf6_name}={dis_value} " "specified in the parent model DIS package file. Either make " "these consistent or specify override_dis_package_input: True " "in the parent: SpatialReference: configuration block.") self._set_parent_modelgrid(mg_kwargs) # setup parent model perioddata table if getattr(self.parent, 'perioddata', None) is None: kwargs = self.cfg['parent'].copy() kwargs['model_time_units'] = self.cfg['parent']['time_units'] if self.parent.version == 'mf6': for var in ['perlen', 'nstp', 'tsmult']: kwargs[var] = getattr(self.parent.modeltime, var) kwargs['steady'] = self.parent.modeltime.steady_state kwargs['nper'] = self.parent.simulation.tdis.nper.array else: for var in ['perlen', 'steady', 'nstp', 'tsmult']: kwargs[var] = self.parent.dis.__dict__[var].array kwargs['nper'] = self.parent.dis.nper kwargs = get_input_arguments(kwargs, setup_perioddata_group) kwargs['oc_saverecord'] = {} if hasattr(self.parent, '_perioddata'): self._parent._perioddata = setup_perioddata_group(**kwargs) else: self._parent.perioddata = setup_perioddata_group(**kwargs) # default_source_data, where omitted configuration input is # obtained from parent model by default # Set default_source_data to True by default if it isn't specified if self.cfg['parent'].get('default_source_data') is None: self.cfg['parent']['default_source_data'] = True if self.cfg['parent'].get('default_source_data'): self._parent_default_source_data = True # set number of layers from parent if not specified if self.version == 'mf6' and self.cfg['dis']['dimensions'].get('nlay') is None: self.cfg['dis']['dimensions']['nlay'] = getattr(self.parent.dis.nlay, 'array', self.parent.dis.nlay) elif self.cfg['dis'].get('nlay') is None: self.cfg['dis']['nlay'] = getattr(self.parent.dis.nlay, 'array', self.parent.dis.nlay) # set start date/time from parent if not specified if not self._is_lgr: parent_start_date_time = self.cfg.get('parent', {}).get('start_date_time') if self.version == 'mf6': if self.cfg['tdis']['options'].get('start_date_time', '1970-01-01') == '1970-01-01' \ and parent_start_date_time is not None: self.cfg['tdis']['options']['start_date_time'] = self.cfg['parent']['start_date_time'] else: if self.cfg['dis'].get('start_date_time', '1970-01-01') == '1970-01-01' \ and parent_start_date_time is not None: self.cfg['dis']['start_date_time'] = self.cfg['parent']['start_date_time'] # only get time dis information from parent if # no periodata groups are specified, and nper is not specified under dimensions tdis_package = 'tdis' if self.version == 'mf6' else 'dis' # check if any item within perioddata block is a dictionary # (groups are subblocks within perioddata block) has_perioddata_groups = any([isinstance(k, dict) for k in self.cfg[tdis_package]['perioddata'].values()]) # get the number of inset model periods if not has_perioddata_groups: if self.version == 'mf6': if self.cfg['tdis']['dimensions'].get('nper') is None: self.cfg['tdis']['dimensions']['nper'] = self.parent.modeltime.nper nper = self.cfg['tdis']['dimensions']['nper'] else: if self.cfg['dis']['nper'] is None: self.cfg['dis']['nper'] = self.dis.nper nper = self.cfg['dis']['nper'] # get the periods that are shared with the parent model parent_periods = get_parent_stress_periods(self.parent, nper=nper, parent_stress_periods=self.cfg['parent'][ 'copy_stress_periods']) # get time discretization info. from the parent model if self.version == 'mf6': for var in ['perlen', 'nstp', 'tsmult']: if self.cfg['tdis']['perioddata'].get(var) is None: self.cfg['tdis']['perioddata'][var] = getattr(self.parent.modeltime, var)[ parent_periods] # 'steady' can be specified under sto package (as in MODFLOW-6) # or within perioddata group blocks # but not in the tdis perioddata block itset if self.cfg['sto'].get('steady') is None: self.cfg['sto']['steady'] = self.parent.modeltime.steady_state[parent_periods] else: for var in ['perlen', 'nstp', 'tsmult', 'steady']: if self.cfg['dis'].get(var) is None: self.cfg['dis'][var] = self.parent.dis.__dict__[var].array[parent_periods] def _setup_array(self, package, var, vmin=-1e30, vmax=1e30, source_model=None, source_package=None, **kwargs): return setup_array(self, package, var, vmin=vmin, vmax=vmax, source_model=source_model, source_package=source_package, **kwargs) def _setup_basic_stress_package(self, package, flopy_package_class, variable_columns, rivdata=None, **kwargs): print(f'\nSetting up {package.upper()} package...') t0 = time.time() # possible future support to # handle filenames of multiple packages # leave this out for now because of additional complexity # from multiple sets of external files #existing_packages = getattr(self, package, None) #filename = f"{self.name}.{package}" #if existing_packages is not None: # try: # len(existing_packages) # suffix = len(existing_packages) + 1 # except: # suffix = 1 # filename = f"{self.name}-{suffix}.{package}" # perimeter boundary (CHD or WEL) dfs = [] if 'perimeter_boundary' in kwargs: perimeter_cfg = kwargs['perimeter_boundary'] if package == 'chd': perimeter_cfg['boundary_type'] = 'head' boundname = 'perimeter-heads' elif package == 'wel': perimeter_cfg['boundary_type'] = 'flux' boundname = 'perimeter-fluxes' else: raise ValueError(f'Unsupported package for perimeter_boundary: {package.upper()}') if 'inset_parent_period_mapping' not in perimeter_cfg: perimeter_cfg['inset_parent_period_mapping'] = self.parent_stress_periods if 'parent_start_time' not in perimeter_cfg: perimeter_cfg['parent_start_date_time'] = self.parent.perioddata['start_datetime'][0] self.tmr = Tmr(self.parent, self, **perimeter_cfg) df = self.tmr.get_inset_boundary_values() # add boundname to allow boundary flux to be tracked as observation df['boundname'] = boundname dfs.append(df) # RIV package converted from SFR input elif rivdata is not None: if 'name' in rivdata.stress_period_data.columns: rivdata.stress_period_data['boundname'] = rivdata.stress_period_data['name'] dfs.append(rivdata.stress_period_data) # set up package from user input df_sd = None if 'source_data' in kwargs: if package == 'wel': dropped_wells_file =\ kwargs.get('output_files', {})\ .get('dropped_wells_file', '{}_dropped_wells.csv').format(self.name) df_sd = setup_wel_data(self, source_data=kwargs['source_data'], dropped_wells_file=dropped_wells_file) else: df_sd = setup_basic_stress_data(self, **kwargs['source_data'], **kwargs.get('mfsetup_options', dict())) if df_sd is not None and len(df_sd) > 0: dfs.append(df_sd) # set up package from parent model elif self.cfg['parent'].get('default_source_data') and\ hasattr(self.parent, package): if package == 'wel': dropped_wells_file =\ kwargs['output_files']['dropped_wells_file'].format(self.name) df_sd = setup_wel_data(self, dropped_wells_file=dropped_wells_file) else: print(f'Skipping setup of {package.upper()} Package from parent model-- not implemented.') if df_sd is not None and len(df_sd) > 0: dfs.append(df_sd) if len(dfs) == 0: print(f"{package.upper()} package:\n" "No input specified or package configuration file input " "not understood. See the Configuration " "File Gallery in the online docs for example input " "Note that direct input to basic stress period packages " "is currently not supported.") return else: df = pd.concat(dfs, axis=0) # option to write stress_period_data to external files if self.version == 'mf6': external_files = self.cfg[package]['mfsetup_options'].get('external_files', True) else: # external list or tabular type files not supported for MODFLOW-NWT # adding support for this may require changes to Flopy external_files = False external_filename_fmt = self.cfg[package]['mfsetup_options']['external_filename_fmt'] spd = setup_flopy_stress_period_data(self, package, df, flopy_package_class=flopy_package_class, variable_columns=variable_columns, external_files=external_files, external_filename_fmt=external_filename_fmt) kwargs = self.cfg[package] if isinstance(self.cfg[package]['options'], dict): kwargs.update(self.cfg[package]['options']) #kwargs['filename'] = filename # add observation for perimeter BCs # and any user input with a boundname col obslist = [] obsfile = f'{self.name}.{package}.obs.output.csv' if 'perimeter_boundary' in kwargs: perimeter_btype = f"perimeter-{perimeter_cfg['boundary_type']}" obslist.append((perimeter_btype, package, perimeter_btype)) if 'boundname' in df.columns: unique_boundnames = df['boundname'].unique() for bname in unique_boundnames: obslist.append((bname, package, bname)) if len(obslist) > 0: kwargs['observations'] = {obsfile: obslist} kwargs = get_input_arguments(kwargs, flopy_package_class) if not external_files: kwargs['stress_period_data'] = spd pckg = flopy_package_class(self, **kwargs) print("finished in {:.2f}s\n".format(time.time() - t0)) return pckg
[docs] def setup_grid(self): """Set up the attached modelgrid instance from configuration input """ if self.cfg['grid']: cfg = self.cfg['grid'] cfg['rotation'] = self.cfg['grid']['angrot'] else: cfg = self.cfg['setup_grid'] #.copy() # update grid configuration with any information supplied to dis package # (so that settings specified for DIS package have priority) self._update_grid_configuration_with_dis() if not cfg['structured']: raise NotImplementedError('Support for unstructured grids') features_shapefile = cfg.get('source_data', {}).get('features_shapefile') if features_shapefile is not None and 'features_shapefile' not in cfg: features_shapefile['features_shapefile'] = features_shapefile['filename'] del features_shapefile['filename'] cfg.update(features_shapefile) cfg['parent_model'] = self.parent cfg['model_length_units'] = self.length_units output_files = self.cfg['setup_grid']['output_files'] cfg['grid_file'] = output_files['grid_file'].format(self.name) bbox_shapefile_name = Path(output_files['bbox_shapefile'].format(self.name)).name cfg['bbox_shapefile'] = Path(self._shapefiles_path) / bbox_shapefile_name if 'DIS' in self.get_package_list(): cfg['top'] = self.dis.top.array cfg['botm'] = self.dis.botm.array # if model is an LGR inset with the default rotation=0 # and the LGR parent is rotated # assume that the inset model rotation should == parent # (different LGR parent/inset rotations not allowed) if self._is_lgr and (cfg['rotation'] == 0) and\ self.parent.modelgrid.angrot != 0: cfg['rotation'] = self.parent.modelgrid.angrot if os.path.exists(cfg['grid_file']) and self._load: print('Loading model grid definition from {}'.format(cfg['grid_file'])) cfg.update(load(cfg['grid_file'])) self.cfg['grid'] = cfg kwargs = get_input_arguments(self.cfg['grid'], MFsetupGrid) self._modelgrid = MFsetupGrid(**kwargs) self._modelgrid.cfg = self.cfg['grid'] else: kwargs = get_input_arguments(cfg, setup_structured_grid) if not set(kwargs.keys()).intersection({ 'features_shapefile', 'features', 'xoff', 'yoff', 'xul', 'yul'}): raise ValueError( "No features_shapefile or xoff, yoff supplied " "to setup_grid: block. Check configuration file input, " "including for accidental indentation of the setup_grid: block.") self._modelgrid = setup_structured_grid(**kwargs) self.cfg['grid'] = self._modelgrid.cfg # update DIS package configuration if self.version == 'mf6': self.cfg['dis']['dimensions']['nrow'] = self.cfg['grid']['nrow'] self.cfg['dis']['dimensions']['ncol'] = self.cfg['grid']['ncol'] else: self.cfg['dis']['nrow'] = self.cfg['grid']['nrow'] self.cfg['dis']['ncol'] = self.cfg['grid']['ncol'] self._reset_bc_arrays() # set up local grid refinement if 'lgr' in self.cfg['setup_grid'].keys(): if self.version != 'mf6': raise TypeError('LGR only supported for MODFLOW-6 models.') if not self.lgr: self.lgr = True for key, cfg in self.cfg['setup_grid']['lgr'].items(): existing_inset_models = set() if isinstance(self.inset, dict): existing_inset_models = {k for k, v in self.inset.items()} if key not in existing_inset_models: self.create_lgr_models()
[docs] def load_grid(self, gridfile=None): """Load model grid information from a json or yml file.""" if gridfile is None: if os.path.exists(self.cfg['setup_grid']['grid_file']): gridfile = self.cfg['setup_grid']['grid_file'] print('Loading model grid information from {}'.format(gridfile)) self.cfg['grid'] = load(gridfile)
def setup_sfr(self, **kwargs): package = 'sfr' print('\nSetting up {} package...'.format(package.upper())) t0 = time.time() # input flowlines = self.cfg['sfr'].get('source_data', {}).get('flowlines') if flowlines is not None: if 'nhdplus_paths' in flowlines.keys(): nhdplus_paths = flowlines['nhdplus_paths'] for f in nhdplus_paths: if not os.path.exists(f): print('SFR setup: missing input file: {}'.format(f)) nhdplus_paths.remove(f) if len(nhdplus_paths) == 0: return # create an sfrmaker.lines instance bbox_filter = project(self.bbox, self.modelgrid.crs, 'epsg:4269').bounds lines = Lines.from_nhdplus_v2(NHDPlus_paths=nhdplus_paths, bbox_filter=bbox_filter) else: for key in ['filename', 'filenames']: if key in flowlines: kwargs = flowlines.copy() kwargs['shapefile'] = kwargs.pop(key) check_source_files(kwargs['shapefile']) if 'epsg' not in kwargs: try: from gisutils import get_shapefile_crs shapefile_crs = get_shapefile_crs(kwargs['shapefile']) except Exception as e: print(e) msg = ('Need gis-utils >= 0.2 to get crs' ' for shapefile: {}\nPlease pip install ' '--upgrade gis-utils'.format(kwargs['shapefile'])) print(msg) else: shapefile_crs = pyproj.crs.CRS.from_epsg(kwargs['epsg']) authority = shapefile_crs.to_authority() if authority is not None: shapefile_crs = pyproj.CRS.from_user_input(shapefile_crs.to_authority()) bbox_filter = self.bbox.bounds if shapefile_crs != self.modelgrid.crs: bbox_filter = project(self.bbox, self.modelgrid.crs, shapefile_crs).bounds kwargs['bbox_filter'] = bbox_filter # create an sfrmaker.lines instance kwargs = get_input_arguments(kwargs, Lines.from_shapefile) lines = Lines.from_shapefile(**kwargs) break else: return # output output_path = self.cfg['sfr'].get('output_path') if output_path is not None: if not os.path.isdir(output_path): os.makedirs(output_path) else: output_path = self.cfg['postprocessing']['output_folders']['shapefiles'] self.cfg['sfr']['output_path'] = output_path # create isfr array (where SFR cells will be populated) if self.version == 'mf6': active_cells = np.sum(self.idomain >= 1, axis=0) > 0 # For models with LGR, set the LGR area to isfr=0 # to prevent SFR from being generated within the LGR area # needed for LGR models that only have refinement # in some layers (in other words, active parent model cells # below the LGR inset) if self.lgr: active_cells[self._lgr_idomain2d == 0] = 0 else: active_cells = np.sum(self.ibound >= 1, axis=0) > 0 #active_cells = self.ibound.sum(axis=0) > 0 # only include active cells that don't have another boundary condition # (besides the wel package) isfr = active_cells & (self._isbc2d <= 0) # kludge to get sfrmaker to work with modelgrid self.modelgrid.model_length_units = self.length_units # create an sfrmaker.sfrdata instance from the lines instance to_sfr_kwargs = self.cfg['sfr'].copy() if not self.cfg['sfr'].get('sfrmaker_options'): self.cfg['sfr']['sfrmaker_options'] = {} to_sfr_kwargs.update(self.cfg['sfr']['sfrmaker_options']) #to_sfr_kwargs = get_input_arguments(to_sfr_kwargs, Lines.to_sfr) sfr = lines.to_sfr(grid=self.modelgrid, isfr=isfr, model=self, **to_sfr_kwargs) if self.cfg['sfr'].get('set_streambed_top_elevations_from_dem'): warnings.warn('sfr: set_streambed_top_elevations_from_dem option is now under sfr: sfrmaker_options', DeprecationWarning) self.cfg['sfr']['sfrmaker_options']['set_streambed_top_elevations_from_dem'] = True if self.cfg['sfr']['sfrmaker_options'].get('set_streambed_top_elevations_from_dem'): dem_kwargs = self.cfg['sfr']['sfrmaker_options'].get('set_streambed_top_elevations_from_dem') if not isinstance(dem_kwargs, dict): dem_kwargs = {} error_msg = ( "If set_streambed_top_elevations_from_dem=True, " "need a dem block in source_data for SFR package. " "Otherwise set_streambed_top_elevations_from_dem should be" "a block with arguments to " "sfrmaker.SFRData.set_streambed_top_elevations_from_dem") assert 'dem' in self.cfg['sfr'].get('source_data', {}), error_msg dem_kwargs.update(self.cfg['sfr']['source_data']['dem']) sfr.set_streambed_top_elevations_from_dem(**dem_kwargs) else: sfr.reach_data['strtop'] = sfr.interpolate_to_reaches('elevup', 'elevdn') # assign layers to the sfr reaches botm = self.dis.botm.array.copy() if self.version == 'mf6': idomain = self.dis.idomain.array else: idomain = self.bas6.ibound.array layers, new_botm = assign_layers(sfr.reach_data, botm_array=botm, idomain=idomain) sfr.reach_data['k'] = layers if new_botm is not None: # run thru setup_array so that DIS input remains open/close self._setup_array('dis', 'botm', data={i: arr for i, arr in enumerate(new_botm)}, datatype='array3d', write_fmt='%.2f', dtype=int) # reset the bottom array in flopy (and in memory) # is this necessary? = self.dis.botm = new_botm # set bottom array to external files if self.version == 'mf6': self.dis.botm = self.cfg['dis']['griddata']['botm'] else: self.dis.botm = self.cfg['dis']['botm'] print('\nModel cell bottom elevations adjusted after assigning ' 'SFR reaches to layers\n(to accommodate SFR reach bottoms ' 'below the previous model bottom)\n') # option to convert reaches to the River Package if self.cfg['sfr'].get('to_riv'): warnings.warn('sfr: to_riv option is now under sfr: sfrmaker_options', DeprecationWarning) self.cfg['sfr']['sfrmaker_options']['to_riv'] = self.cfg['sfr'].get('to_riv') if self.cfg['sfr'].get('sfrmaker_options', {}).get('to_riv'): rivdata = sfr.to_riv(line_ids=self.cfg['sfr']['sfrmaker_options']['to_riv'], drop_in_sfr=True) # setup of RIV package from SFRmaker-derived RIVdata # and any user input # do this instead of 2 seperate packages # to avoid having two sets of external files self.setup_riv(rivdata, **self.cfg['riv'], **self.cfg['riv']['mfsetup_options']) rivdata_filename = self.cfg['riv']['output_files']['rivdata_file'].format(self.name) rivdata.write_table(os.path.join(self._tables_path, rivdata_filename)) rivdata.write_shapefiles('{}/{}'.format(self._shapefiles_path, self.name)) # optional routing input # (for a complete representation of a larger or more detailed # stream network that may be culled in SFR package) sd = self.cfg['sfr'].get('source_data', {}) routing_input_key = [k for k in sd.keys() if 'routing' in k] routing_input = None if len(routing_input_key) > 0: routing_input = sd.get(routing_input_key[0]) routing = pd.read_csv(routing_input['filename']) routing = dict(zip(routing[routing_input['id_column']], routing[routing_input['routing_column']])) # set any values (downstream lines) not in keys (upstream lines) # to 0 (outlet condition) routing = {k: v if v in routing.keys() else 0 for k, v in routing.items()} # use _original_routing attached to Lines instance as default else: routing = lines._original_routing # add inflows inflows_input = self.cfg['sfr'].get('source_data', {}).get('inflows') if inflows_input is not None: # resample inflows to model stress periods inflows_input['id_column'] = inflows_input['line_id_column'] sd = TransientTabularSourceData.from_config(inflows_input, dest_model=self) inflows_by_stress_period = sd.get_data() missing_sites = set(inflows_by_stress_period[inflows_input['id_column']]). \ difference(routing.keys()) if any(missing_sites): # cast IDs to strings for compatibility with SFRmaker > 0.11.3 # for now, assume IDs are numeric; future updates to SFRmaker # may eventually allow for alpha numeric IDs inflows_by_stress_period[inflows_input['id_column']] =\ inflows_by_stress_period[inflows_input['id_column']].astype(int).astype(str) # check if all inflow sites are included in sfr network missing_sites = set(inflows_by_stress_period[inflows_input['id_column']]). \ difference(routing.keys()) # if there are missing sites, try using the supplied routing if any(missing_sites): raise KeyError(('inflow sites {} are not within the model sfr network. ' 'Please supply an inflows_routing source_data block ' '(see shellmound example config file)'.format(missing_sites))) # add resampled inflows to SFR package inflows_input['data'] = inflows_by_stress_period inflows_input['flowline_routing'] = routing if self.version == 'mf6': inflows_input['variable'] = 'inflow' method = sfr.add_to_perioddata else: inflows_input['variable'] = 'flow' method = sfr.add_to_segment_data kwargs = get_input_arguments(inflows_input.copy(), method) method(**kwargs) # add runoff runoff_input = self.cfg['sfr'].get('source_data', {}).get('runoff') if runoff_input is not None: # resample inflows to model stress periods runoff_input['id_column'] = runoff_input['line_id_column'] sd = TransientTabularSourceData.from_config(runoff_input, dest_model=self) runoff_by_stress_period = sd.get_data() # check if all sites are included in sfr network missing_sites = set(runoff_by_stress_period[runoff_input['id_column']]). \ difference(routing.keys()) if any(missing_sites): warnings.warn(('runoff sites {} are not within the model sfr network. ' 'Please supply an inflows_routing source_data block ' '(see shellmound example config file)'.format(missing_sites)), UserWarning) # add resampled inflows to SFR package runoff_input['data'] = runoff_by_stress_period runoff_input['flowline_routing'] = routing runoff_input['variable'] = 'runoff' runoff_input['distribute_flows_to_reaches'] = True if self.version == 'mf6': method = sfr.add_to_perioddata else: method = sfr.add_to_segment_data kwargs = get_input_arguments(runoff_input.copy(), method) method(**kwargs) # add observations observations_input = self.cfg['sfr'].get('source_data', {}).get('observations') if self.version != 'mf6': sfr.gage_starting_unit_number = self.cfg['gag']['starting_unit_number'] if observations_input is not None: key = 'filename' if 'filename' in observations_input else 'filenames' observations_input['data'] = observations_input[key] kwargs = get_input_arguments(observations_input.copy(), sfr.add_observations) obsdata = sfr.add_observations(**kwargs) # resample observations to model stress periods; write to table # write reach and segment data tables sfr.write_tables('{}/{}'.format(self._tables_path, self.name)) # export shapefiles of lines, routing, cell polygons, inlets and outlets sfr.write_shapefiles('{}/{}'.format(self._shapefiles_path, self.name)) # create the flopy SFR package instance sfr.create_modflow_sfr2(model=self, istcb2=223) if self.version != 'mf6': sfr_package = sfr.modflow_sfr2 else: # pass options kwargs through to mf6 constructor kwargs = flatten({k:v for k, v in self.cfg[package].items() if k not in {'source_data', 'flowlines', 'inflows', 'observations', 'inflows_routing', 'dem', 'sfrmaker_options'}}) kwargs = get_input_arguments(kwargs, mf6.ModflowGwfsfr) sfr_package = sfr.create_mf6sfr(model=self, **kwargs) # monkey patch ModflowGwfsfr instance to behave like ModflowSfr2 sfr_package.reach_data = sfr.modflow_sfr2.reach_data # attach the sfrmaker.sfrdata instance as an attribute self.sfrdata = sfr # reset dependent arrays self._reset_bc_arrays() if self.version == 'mf6': self._set_idomain() print("finished in {:.2f}s\n".format(time.time() - t0)) return sfr_package def setup_solver(self): if self.version == 'mf6': solver_package = 'ims' else: solver_package = 'nwt' assert solver_package not in self.package_list setup_method_name = 'setup_{}'.format(solver_package) package_setup = getattr(self, setup_method_name, None) package_setup() def setup_packages(self, reset_existing=True): package_list = self.package_list #['sfr'] #m.package_list # ['tdis', 'dis', 'npf', 'oc'] if not reset_existing: package_list = [p for p in package_list if p.upper() not in self.get_package_list()] for pkg in package_list: setup_method_name = f'setup_{pkg}' package_setup = getattr(self, setup_method_name, None) if package_setup is None: print('{} package not supported for MODFLOW version={}'.format(pkg.upper(), self.version)) continue if not callable(package_setup): package_setup = getattr(MFsetupMixin, 'setup_{}'.format(pkg.strip('6'))) # avoid multiple package instances for now, except for obs if self.version != 'mf6' or pkg == 'obs' or not hasattr(self, pkg): package_setup(**self.cfg[pkg], **self.cfg[pkg]['mfsetup_options'])
[docs] @classmethod def load_cfg(cls, yamlfile, verbose=False): """Loads a configuration file, with default settings specific to the MFnwtModel or MF6model class. Parameters ---------- yamlfile : str (filepath) Configuration file in YAML format with pfl_nwt setup information. verbose : bool Returns ------- cfg : dict (configuration dictionary) """ return load_cfg(yamlfile, verbose=verbose, default_file=cls.default_file)
[docs] @classmethod def setup_from_yaml(cls, yamlfile, verbose=False): """Make a model from scratch, using information in a yamlfile. Parameters ---------- yamlfile : str (filepath) Configuration file in YAML format with pfl_nwt setup information. verbose : bool Returns ------- m : model instance """ cfg = cls.load_cfg(yamlfile, verbose=verbose) return cls.setup_from_cfg(cfg, verbose=verbose)
[docs] @classmethod def setup_from_cfg(cls, cfg, verbose=False): """Make a model from scratch, using information in a configuration dictionary. Parameters ---------- cfg : dict Configuration dictionary, as produced by the model.load_cfg method. verbose : bool Returns ------- m : model instance """ cfg_filename = Path(cfg.get('filename', '')).name msg = f"\nSetting up {cfg['model']['modelname']} model" if len(cfg_filename) > 0: msg += f" from configuration in {cfg_filename}" print(msg) t0 = time.time() m = cls(cfg=cfg) #, **kwargs) # make a grid if one isn't already specified if 'grid' not in m.cfg.keys(): m.setup_grid() # establish time discretization, including TDIS setup for MODFLOW-6 m.setup_tdis() # set up the solver m.setup_solver() # set up all of the packages specified in the config file m.setup_packages(reset_existing=False) # LGR inset model(s) if m.inset is not None: for k, v in m.inset.items(): if v._is_lgr: v.setup_packages() m.setup_lgr_exchanges() print('finished setting up model in {:.2f}s'.format(time.time() - t0)) print('\n{}'.format(m)) return m