Source code for sfrmaker.observations

"""
Functions for handling observations of SFR package output.
"""
import os
from pathlib import Path
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import Polygon
from gisutils import shp2df
try:
    import flopy
    fm = flopy.modflow
except:
    flopy = False
from .gis import get_shapefile_crs, project
from .fileio import read_tables
from .routing import get_next_id_in_subset


[docs] def add_observations(sfrdata, data, flowline_routing=None, obstype=None, sfrlines_shapefile=None, rno_column_in_sfrlines='rno', x_location_column=None, y_location_column=None, line_id_column=None, rno_column=None, obstype_column=None, obsname_column='site_no'): """Add SFR observations to the observations DataFrame attribute of an sfrdata instance. Observations can by located on the SFR network by specifying reach number directly (rno_column), by x, y location (x_column_in_data and y_column in data), or by specifying the source hydrography lines that they are located on (line_id_column). Parameters ---------- sfrdata : sfrmaker.SFRData instance SFRData instance with reach_data table attribute. To add observations from x, y coordinates, the reach_data table must have a geometry column with LineStrings representing each reach, or an sfrlines_shapefile is required. Reach numbers are assumed to be in an 'rno' column. data : DataFrame, path to csv file, or list of DataFrames or file paths Table with information on the observation sites to be located. Must have either reach numbers (rno_column), line_ids (line_id_column), or x and y locations (x_column_in_data and y_column_in_data). obstype : str or list-like (optional) Type(s) of observation to record, for MODFLOW-6 (default 'downstream-flow'; see MODFLOW-6 IO documentation for more details). Alternatively, observation types can be specified by row in data, using the obstype_column_in_data argument. x_location_column : str (optional) Column in data with site x-coordinates (in same CRS as SFR network). y_location_column : str (optional) Column in data with site y-coordinates (in same CRS as SFR network). sfrlines_shapefile : str (optional) Shapefile version of SFRdata.reach_data. Only needed if SFRdata.reach_data doesn't have LineString geometries for the reaches. rno_column_in_sfrlines : str (optional) Column in sfrlines with reach numbers for matching lines with reaches in sfrdata, or reach numbers assigned to observation sites. (default 'rno') line_id_column : str Column in data matching observation sites to line_ids in the source hydrography data. rno_column : str Column in data matching observation sites to reach numbers in the SFR network. flowline_routing : dict Optional dictionary of routing for source hydrography. Only needed if locating by line_id, and SFR network is a subset of the full source hydrography (i.e. some lines were dropped in the creation of the SFR packge, or if the sites are inflow points corresponding to lines outside of the model perimeter). In this case, observation points referenced to line_ids that are missing from the SFR network are placed at the first reach corresponding to the next downstream line_id that is represented in the SFR network. obstype_column : str (optional) Column in data with MODFLOW-6 observation types. For adding observations of different types. If obstype and obstype_column_in_data are none, the default of 'downstream-flow' will be used. obsname_column : str Column in data with unique identifier (e.g. site number or name) for observation sites. Notes ----- Sites located by line_id (source hydrography) will be assigned to the last reach in the segment corresponding to the line_id. Locating by x, y or reach number is more accurate. """ sfrd = sfrdata reach_data = sfrdata.reach_data.copy() # allow input via a list of tables or single table data = read_tables(data, dtype={obsname_column: object})#, dtype={obsname_column: object}) # need a special case if allowing obsname_column to also be identifier if obsname_column == rno_column: obsname_column = f'obsnamecol_{obsname_column}' data[obsname_column] = data[rno_column].astype(object) elif obsname_column == line_id_column: obsname_column = f'obsnamecol_{obsname_column}' data[obsname_column] = data[line_id_column].astype(object) else: data[obsname_column] = data[obsname_column].astype(object) assert data[obsname_column].dtype == object # read reach geometries from a shapefile if sfrlines_shapefile is not None: sfrlines = shp2df(sfrlines_shapefile) geoms = dict(zip(sfrlines[rno_column_in_sfrlines], sfrlines['geometry'])) reach_data['geometry'] = [geoms[rno] for rno in reach_data['rno']] # if no reach number is provided msg = "Observation sites need reach number, (x,y) coordinates, or source hydrography IDs" if rno_column not in data.columns: rno_column = 'rno' # get reach numbers by x, y location of sites if x_location_column in data.columns and y_location_column in data.columns: locs = locate_sites(data, reach_data, x_column_in_data=x_location_column, y_column_in_data=y_location_column, reach_id_col='rno', # reach number column in reach_data site_number_col=obsname_column ) data[rno_column] = locs['rno'] # get reach number from site locations in source hydrography (line_ids) elif line_id_column in data.columns: # map NHDPlus COMIDs to reach numbers if flowline_routing is None: line_id = dict(zip(reach_data.iseg, reach_data.line_id)) sfr_routing = sfrdata.segment_routing.copy() # routing for source hydrography flowline_routing = {line_id.get(k, 0): line_id.get(v, 0) for k, v in sfr_routing.items()} # get the last reach in each segment r1 = reach_data.sort_values(by=['iseg', 'ireach'], axis=0).groupby('iseg').last() line_id_rno_mapping = dict(zip(r1['line_id'], r1['rno'])) line_ids = get_next_id_in_subset(r1.line_id, flowline_routing, data[line_id_column]) data[rno_column] = [line_id_rno_mapping[lid] for lid in line_ids] else: raise ValueError(msg) # create observations dataframe obsdata = pd.DataFrame(columns=sfrd.observations.columns) # remove duplicate locations data = data.groupby(rno_column).first().reset_index() obsdata['rno'] = data[rno_column] # segment and reach info iseg_ireach = dict(list(zip(reach_data.rno, zip(reach_data.iseg, reach_data.ireach)))) obsdata['iseg'] = [iseg_ireach[rno][0] for rno in obsdata.rno] obsdata['ireach'] = [iseg_ireach[rno][1] for rno in obsdata.rno] for col in ['rno', 'iseg', 'ireach']: obsdata[col] = obsdata[col].astype(int) if obstype is not None: if isinstance(obstype, str): obsdata['obstype'] = obstype obsdata['obsname'] = data[obsname_column].astype(str) else: obstypes = obstype dfs = [] for obstype in obstypes: df = obsdata.copy() df['obstype'] = obstype obsnme_suffix = obstype.split('-')[-1] df['obsname'] = [f"{obsnme}-{obsnme_suffix}" for obsnme in data[obsname_column].astype(str)] dfs.append(df) obsdata = pd.concat(dfs) elif obstype_column in data.columns: obsdata['obstype'] = data[obstype_column] # check if base observation names (with just site info) are unique # if not, formulate name based on type as well site_names = data[obsname_column].astype(str) if len(set(site_names)) < len(data): obsname_suffixes = [obstype.split('-')[-1] for obstype in obsdata['obstype']] obsdata['obsname'] = [f"{obsnme}-{obsnme_suffix}" for obsnme, obsnme_suffix in zip(site_names, obsname_suffixes)] else: obsdata['obsname'] = site_names else: obsdata['obstype'] = 'downstream-flow' obsdata['obsname'] = data[obsname_column].astype(str) return obsdata
[docs] def get_closest_reach(x, y, sfrlines, rno_column='rno'): """Get the SFR reach number closest to a point feature. Parameters ---------- x : scalar or list of scalars x-coordinate(s) of point feature(s) y : scalar or list of scalars y-coordinate(s) or point feature(s) sfrlines: dataframe DataFrame containing a geometry column with SFR line arcs, and a column rno_column with unique numbers for each reach. rno_column: str Column with unique number for each reach. default "rno" threshold : numeric Distance threshold (in CRS units). Only return reaches within this distance. Returns ------- rno : int or list of ints Reach numbers for reaches closest to each location defined by x, y. distance: int or list of ints Distance from each site in x, y to the corresponding closest SFR reach. """ scalar = False if np.isscalar(x): scalar = True x = [x] if np.isscalar(y): y = [y] geoms = sfrlines.geometry.values rno = sfrlines[rno_column].values allX = [] # all x coordinates in sfrlines allY = [] # all y coordinates in sfrlines all_rno = [] # reach number for each x, y point in sfrlines for i, g in enumerate(geoms): if 'Multi' not in g.type: g = [g] for part in g: gx, gy = part.coords.xy allX += gx allY += gy all_rno += [rno[i]]*len(gx) allX = np.array(allX) allY = np.array(allY) all_rno = np.array(all_rno) assert len(all_rno) == len(allX) rno = [] distance = [] for i in range(len(x)): distances = np.sqrt((allX - x[i]) ** 2 + (allY - y[i]) ** 2) idx = np.argmin(distances) rno.append(all_rno[idx]) distance.append(np.min(distances)) if scalar: return rno[0], distance[0] else: return rno, distance
[docs] def locate_sites(site_data, reach_data, active_area_shapefile=None, x_column_in_data=None, y_column_in_data=None, reach_id_col='rno', ireach_col='ireach', iseg_col='iseg', site_number_col='site_no', keep_columns=None, perimeter_buffer=1000, distance_threshold=1000 ): """Get SFR reach locations corresponding to x, y points (e.g. measurement site locations). Parameters ---------- site_data: (Geo)DataFrame or shapefile DataFrame or shapefile with point locations and attribute data for stream flow observation sites. Point locations can be specified in a DataFrame by either x_column_in_data and y_column_in_data, or a 'geometry' column of shapely points. If shapefiles or GeoDataFrames are provided for both site_data and reach_data, they can be in any CRS, but both must have valid CRS references (.prj file or .crs attribute); otherwise site_data and reach_data are assumed to be in the same CRS. reach_data: (Geo)DataFrame or shapefile SFRData.reach_data DataFrame, or shapefile equivalent with line-arcs representing all segments and/or reaches. If shapefiles or GeoDataFrames are provided for both site_data and reach_data, they can be in any CRS, but both must have valid CRS references (.prj file or .crs attribute); otherwise site_data and reach_data are assumed to be in the same CRS. active_area_shapefile: ESRI shapefile or shapely polygon (optional) Shapefile or polygon, in same CRS as sfr_lines_shapefile, defining areal extent (perimeter) of SFR network. x_column_in_data : str (optional) Column in data with site x-coordinates (in same CRS as SFR network). y_column_in_data : str (optional) Column in data with site y-coordinates (in same CRS as SFR network). reach_id_col: str Column with 1-based unique number for each stream line-arc. default "rno" ireach_col: str Column with 1-based reach number within segment (i.e. in the modflow-2005 SFR data model), by default, 'ireach' iseg_col: str Column with 1-based reach segment number (i.e. in the modflow-2005 SFR data model), by default, 'iseg' site_number_col : str Name of column in sites_shapefile with number identifying each site to be located. default "site_no" keep_columns: list of strings List of columns in sites_shapefile to retain when writing output_csv_file and output_shape_file. perimeter_buffer : scalar Exclude flows within this distance of perimeter defined by active_area. For example, a value of 1000 would mean that sites must be at least 1 km inside of the active area perimeter to be included. distance_threshold : scalar Only consider sites within this distance of a stream line-arc. Returns ------- locs : DataFrame """ sfr_crs = None locs_crs = None # read in sfr lines if not isinstance(reach_data, pd.DataFrame): sfrlines = gpd.read_file(reach_data) sfr_crs = sfrlines.crs elif isinstance(reach_data, pd.DataFrame): sfrlines = reach_data.copy() sfr_crs = getattr(sfrlines, 'crs', None) else: raise TypeError('Datatype for reach_data not understood: {}'.format(reach_data)) if reach_id_col not in sfrlines.columns: sfrlines.index = np.arange(len(sfrlines)) + 1 sfrlines[reach_id_col] = np.arange(len(sfrlines)) + 1 else: sfrlines.index = sfrlines[reach_id_col] # sites to locate if not isinstance(site_data, pd.DataFrame): locs = gpd.read_file(site_data) if isinstance(site_data, list): locs_crs = get_shapefile_crs(site_data[0]) else: locs_crs = get_shapefile_crs(site_data) locs['site_no'] = locs[site_number_col] # str_ids(locs.site_no) elif isinstance(site_data, pd.DataFrame): locs = site_data.copy() locs_crs = getattr(locs, 'crs', None) else: raise TypeError('Datatype for site_data not understood: {}'.format(site_data)) # to_crs if crs are available if locs_crs is not None and sfr_crs is not None: locs['geometry'] = project(locs.geometry.values, locs_crs, sfr_crs) # get the x and y coordinates if x_column_in_data is not None and y_column_in_data is not None: x = locs[x_column_in_data] y = locs[y_column_in_data] else: x = [p.x for p in locs.geometry] y = [p.y for p in locs.geometry] ids, distances = get_closest_reach(x, y, sfrlines, rno_column=reach_id_col) reach_id_col = reach_id_col.lower() locs[reach_id_col] = ids locs['distance'] = distances if iseg_col in sfrlines.columns: locs[iseg_col] = sfrlines.loc[ids, iseg_col].values if ireach_col in sfrlines.columns: locs[ireach_col] = sfrlines.loc[ids, ireach_col].values locs = locs.loc[locs['distance'] <= distance_threshold] # cull observations at or outside of model perimeter # to only those along model perimeter if active_area_shapefile is not None: active_area = active_area_shapefile if not isinstance(active_area_shapefile, Polygon): active_area = gpd.read_file(active_area_shapefile).geometry[0] perimeter = active_area.exterior.buffer(perimeter_buffer) perimeter_inside_buffer = Polygon(perimeter.interiors[0]) keep = [] for rn in locs[reach_id_col]: geom = sfrlines.loc[rn, 'geometry'] keep.append(geom.within(perimeter_inside_buffer)) else: keep = slice(None) if keep_columns is None: keep_columns = locs.columns.tolist() for c in [reach_id_col, iseg_col, ireach_col, 'geometry']: if c not in keep_columns and c in locs.columns: keep_columns.append(c) locs = locs.loc[keep, keep_columns] return locs
[docs] def write_gage_package(location_data, gage_package_filename=None, gage_namfile_entries_file=None, model=None, sitename_col='site_no', gage_package_unit=25, start_gage_unit=200): """ Parameters ---------- location_data : pandas.DataFrame Table of observation locations. Must have columns: 'iseg': segment number 'ireach': reach number sitename_col: specified by sitename_col argument (default 'site_no') gage_package_filename : str or pathlike Name for the gage package file, which will be written to the model workspace (``model.model_ws``) of the supplied flopy model instance. This must also be manually added to the MODFLOW .nam file, for example:: GAGE 25 br_trans.gage Or, a new namefile can be written from the supplied flopy model instance. gage_namfile_entries_file : str or pathlike Namefile entries for the gage package output files will be written to this file. The contents of this file must then be manually copied and pasted into the MODFLOW .nam file. By default, None, in which case this file is named after gage_package_filename. model : flopy model instance Used for writing the gage package input file via flopy. sitename_col : str Unique name or number for each gage site. By default, 'site_no' gage_package_unit : int MODFLOW unit number to assign to gage package. By default, 25 start_gage_unit : int Modflow unit numbers for each gage package output file will be assigned sequentially starting at this number. By default, 200 Returns ------- """ if gage_package_filename is None: if model is not None: gage_package_filename = '{}.gage'.format(model.modelname) else: gage_package_filename = 'observations.gage' if gage_namfile_entries_file is None: gage_namfile_entries_file = gage_package_filename + '.namefile_entries' if model is not None: gage_package_filename = os.path.join(model.model_ws, os.path.split(gage_package_filename)[1]) gage_namfile_entries_file = os.path.join(model.model_ws, os.path.split(gage_namfile_entries_file)[1]) # read in stream gage observation locations from locate_flux_targets_in_SFR.py df = location_data.copy() df.sort_values(by=[sitename_col], inplace=True) df['gagefile'] = [f'{sitename}.ggo' for sitename in df[sitename_col]] if model is not None and flopy: # create flopy gage package object gage_data = fm.ModflowGage.get_empty(ncells=len(df)) gage_data['gageloc'] = df['iseg'] gage_data['gagerch'] = df['ireach'] gage_data['unit'] = np.arange(start_gage_unit, start_gage_unit + len(df)) gag = fm.ModflowGage(model, numgage=len(gage_data), gage_data=gage_data, files=df.gagefile.tolist(), unitnumber=gage_package_unit ) gag.fn_path = str(gage_package_filename) gag.write_file() else: raise NotImplementedError('writing a gage package without Flopy') with open(gage_namfile_entries_file, 'w') as output: for i, f in enumerate(df.gagefile.values): output.write('DATA {:d} {}\n'.format(gage_data.unit[i], f)) return gag
[docs] def write_mf6_sfr_obsfile(observation_locations, filename, sfr_output_filename, digits=5, print_input=True): """Write MODFLOW-6 observation input for the SFR package. Parameters ---------- observation_locations : DataFrame or str (filepath to csv file) line_id : unique identifier for observation locations filename : str File path for MODFLOW-6 SFR observation input file sfr_output_filename : str File path that SFR observation output file digits : int the number of significant digits with which simulated values are written to the output file. print_input : bool keyword to indicate that the list of observation information will be written to the listing file immediately after it is read. Returns ------- writes filename """ locs = observation_locations if isinstance(observation_locations, str) or isinstance(observation_locations, Path): locs = pd.read_csv(observation_locations) with open(filename, 'w') as output: output.write('BEGIN OPTIONS\n DIGITS {:d}\n'.format(digits)) if print_input: output.write(' PRINT_INPUT\n') output.write('END OPTIONS\n') output.write('BEGIN CONTINUOUS FILEOUT {}\n'.format(sfr_output_filename)) output.write('# obsname obstype rno\n') for i, r in locs.iterrows(): output.write(' {} {} {:d}\n'.format(r.obsname, r.obstype, r.rno)) output.write('END CONTINUOUS\n') print('wrote {}'.format(filename))