Skip to contents

Increasingly there are high frequency sensor data available for water quality data. There’s a common need to join the sensor and discrete data by the closest time. This article will discuss how to do that with common tools, then we’ll put those techniques together in a function.

dataRetrieval

Let’s look at site “01646500”, and a nearby site with a real-time nitrate-plus-nitrite sensor.

library(dataRetrieval)

site_uv <- "01646500"
site_qw <- "USGS-01646580"
pcode_uv <- "99133"
pcode_qw <- "00631"
uv_data <- readNWISuv(site_uv, pcode_uv)
start_sensor_date <- as.Date(min(uv_data$dateTime))
qw_data <- readWQPqw(site_qw, pcode_qw,
                     startDate = start_sensor_date)

First let’s trim down the data sets so that they are easy to print in this document.

The sensor data (“uv” data) has 2 columns of data that are important. The first task is to combine those columns. This is rather unique to this particular site and probably won’t need to be done generally.

library(dplyr)

uv_trim <- uv_data |> 
  select(uv_date = dateTime, 
         val1 = X_SUNA...Discontinued._99133_00000,
         val2 = X_SUNA_99133_00000) |> 
  mutate(val_uv = if_else(is.na(val1), val2, val1)) |> 
  select(-val1, -val2)
uv_date val_uv
2011-12-03 05:00:00 1.29
2011-12-03 05:15:00 1.29
2011-12-03 05:30:00 1.29
2011-12-03 05:45:00 1.28
2011-12-03 06:00:00 1.27
2011-12-03 06:15:00 1.27

Next we’ll clean up the discrete water quality “qw” data to make it easy to follow in this tutorial.

qw_trim <- qw_data |> 
  filter(ActivityTypeCode == "Sample-Routine",
         !is.na(ActivityStartDateTime)) |> 
  arrange(ActivityStartDateTime) |> 
  select(qw_date = ActivityStartDateTime,
         val_qw = ResultMeasureValue,
         det_txt = ResultDetectionConditionText)
qw_date val_qw det_txt
2012-01-11 14:45:00 1.810 NA
2012-02-01 14:45:00 1.570 NA
2012-03-01 18:15:00 1.140 NA
2012-03-06 14:45:00 1.120 NA
2012-04-03 13:45:00 1.100 NA
2012-05-01 14:15:00 0.864 NA

Now we’ll use the data.table package to do a join to the nearest date. The code to do that is here:

library(data.table)
# coerce to data.table and append join columns to preserve the original columns 
setDT(qw_trim)[, join_date := qw_date]

setDT(uv_trim)[, join_date := uv_date]
# rolling join
closest_dt <- uv_trim[qw_trim, on = .(join_date), roll = "nearest"]

closest_dt is a data.table object. It similar to a data.frame, but not identical. We can convert it to a data.frame and then use dplyr commands. Note: the whole analysis can be done via data.table, but most examples in EGRET have used dplyr, which is why we bring it back to data.frame. dplyr also has a join_by(closest()) option, but it is more complicated because you can only specify the closeness in either the forward or backwards direction (and we want either direction).

We first calculate “delta_time” - the difference in time between the uv and qw data. Then, if that difference is greater than 24 hours, we’ll substitute NA.

qw_closest <- data.frame(closest_dt) |> 
  mutate(delta_time = difftime(qw_date, uv_date, units="hours"),
         val_uv = if_else(abs(as.numeric(delta_time)) >= 24, NA, val_uv)) |> 
  select(-join_date)
uv_date val_uv qw_date val_qw det_txt delta_time
2012-01-11 14:45:00 1.75 2012-01-11 14:45:00 1.810 NA 0 hours
2012-02-01 14:45:00 1.51 2012-02-01 14:45:00 1.570 NA 0 hours
2012-03-01 18:15:00 1.14 2012-03-01 18:15:00 1.140 NA 0 hours
2012-03-06 14:45:00 1.15 2012-03-06 14:45:00 1.120 NA 0 hours
2012-04-03 13:45:00 1.06 2012-04-03 13:45:00 1.100 NA 0 hours
2012-05-01 14:15:00 0.89 2012-05-01 14:15:00 0.864 NA 0 hours

Putting it together

A more realistic scenario might be that we want to create an EGRET Sample data frame, and get the real-time water quality and flow values joined by the closest date/time.

We’ll re-pull the real-time data, and this time include discharge:

library(EGRET)

# Sensor discharge:
uv_flow_qw <- readNWISuv(site_uv, c(pcode_uv, "00060"), 
                         startDate = start_sensor_date)

#special cleanup needed at this site:
uv_flow_qw2 <- uv_flow_qw |> 
  rename(val1 = X_SUNA...Discontinued._99133_00000,
         val2 = X_SUNA_99133_00000,
         rmk1 = X_SUNA...Discontinued._99133_00000_cd,
         rmk2 = X_SUNA_99133_00000_cd) |> 
  mutate(qw_val_uv = if_else(is.na(val1), val2, val1),
         qw_rmk_uv = if_else(is.na(rmk1), rmk2, rmk1)) |> 
  select(-val1, -val2, -rmk1, -rmk2)

Here we create a function that does the data.table join, and then continues the steps necessary to obtain a Sample data.frame.

The inputs are:

Argument Description
qw_data Data frame with discrete water quality data
uv_flow_qw Data frame with real-time (high frequency) water quality and/or flow data
hour_threshold Number of hours that the dates need to be within to match up
join_by_qw Name of the date/time column in the qw_data data frame to join by
join_by_uv Name of the date/time column in the uv_flow_qw data frame to join by
qw_val Name of the water quality value column in the qw_data data frame
qw_rmk Name of the water quality remark column in the qw_data data frame
qw_det_val Name of the
qw_val_uv Name of the water quality value column in the uv_flow_qw data frame
qw_rmk_uv Name of the water quality remark column in the uv_flow_qw data frame
flow_val Name of the flow value column in the uv_flow_qw data frame
flow_rmk Name of the flow remark column in the uv_flow_qw data frame

Default values are provided that match the output of readWQPqw.

join_qw_uv <- function(qw_data, # data from readWQP
                       uv_flow_qw, # data from readNWISuv
                       hour_threshold = 24, # hours threshold for joining
                       join_by_qw = "ActivityStartDateTime", 
                       join_by_uv = "dateTime",
                       qw_val = "ResultMeasureValue",
                       qw_rmk = "ResultDetectionConditionText",
                       qw_det_val = "DetectionQuantitationLimitMeasure.MeasureValue",
                       qw_val_uv, # water quality value column in uv data
                       qw_rmk_uv, # water quality remark column in uv data
                       flow_val = "X_00060_00000", # uv flow parameter
                       flow_rmk = "X_00060_00000_cd"){ # uv flow parameter cd
  
  req_cols <- c(join_by_qw, qw_val, qw_rmk, qw_det_val)
  if(!all(req_cols %in% names(qw_data))){
    stop(paste('qw_data missing columns:', req_cols[!req_cols %in% names(qw_data)]))
  }
  
  req_cols_uv <- c(join_by_uv)
  if(!all(req_cols_uv %in% names(uv_flow_qw))){
    stop(paste('uv_data missing columns:', req_cols_uv[!req_cols_uv %in% names(uv_flow_qw)]))
  }
  
  setDT(qw_data)[, eval(parse(text = paste("join_date :=", join_by_qw)))]
  
  x <- qw_data[order(join_by_qw)]
  
  setDT(uv_flow_qw)[, eval(parse(text = paste("join_date :=", join_by_uv)))]
  
  # rolling join
  x <- uv_flow_qw[qw_data, on = .(join_date), roll = "nearest"]
  
  setnames(x, c(qw_val, join_by_uv, join_by_qw, qw_rmk, qw_det_val),
           c("val_qw","uv_date", "qw_date", "qw_rmk", "qw_det_val"))
  
  x_tib <- as_tibble(x)
  
  if(!is.na(flow_val) | flow_val != ""){
    x_tib$flow_uv <- x_tib[[flow_val]]
  }
  if(!is.na(flow_rmk) | flow_rmk != ""){
    x_tib$flow_rmk_uv <- x_tib[[flow_rmk]]
  }
  
  if(!is.na(qw_val_uv) | qw_val_uv != ""){
    x_tib$qw_val_uv <- x_tib[[qw_val_uv]]
  }
  if(!is.na(qw_rmk_uv) | qw_rmk_uv != ""){
    x_tib$qw_rmk_uv <- x_tib[[qw_rmk_uv]]
  }

  toMatch <- c("NON-DETECT", "NON DETECT", "NOT DETECTED",
             "DETECTED NOT QUANTIFIED", "BELOW QUANTIFICATION LIMIT")
  
  x_tib <- x_tib |> 
    mutate(delta_time = difftime(qw_date, uv_date, units = "hours"),
         qw_val_uv = if_else(abs(as.numeric(delta_time)) >= hour_threshold, 
                          NA, qw_val_uv),
         qualifier = if_else(grepl(paste(toMatch,collapse="|"),
                                   toupper(qw_rmk)),
                             "<", ""),
         value = if_else(qualifier == "<", qw_det_val, val_qw),
         date = as.Date(qw_date)) |> 
    select(any_of(c("uv_date", "qw_date", "delta_time", "date",
           "qw_val_uv", "qw_rmk_uv",
           "value", "qualifier", 
           "flow_uv", "flow_rmk_uv"))) |> 
    rename(dateTime = qw_date) 
  

  compressedData <- EGRET::compressData(x_tib[, c("date",
                                                "qualifier",
                                                "value")],
                                        verbose = FALSE)
  Sample <- EGRET::populateSampleColumns(compressedData)
  Sample <- Sample |>
    left_join(x_tib |>
                select(-qualifier) |> 
                rename(qw_dateTime = dateTime,
                       uv_dateTime = uv_date,
                       Date = date, 
                       ConcHigh = value),
              by = c("Date", "ConcHigh"))
  
  return(Sample)
  
}

Running the function:

Sample <- join_qw_uv(qw_data = qw_data,
                     uv_flow_qw =  uv_flow_qw2, 
                     hour_threshold = 24,
                     join_by_qw = "ActivityStartDateTime",
                     join_by_uv = "dateTime",
                     qw_val_uv = "qw_val_uv",
                     qw_rmk_uv = "qw_rmk_uv",
                     flow_val = "X_00060_00000",
                     flow_rmk = "X_00060_00000_cd")

In addition to the standard Sample output, you’ll get:

Column Description
uv_dateTime The date time from the uv data that was closest to the qw data.
qw_dateTime The date time from the qw data frame.
delta_time The difference in time in hours.
qw_val_uv The water-quality uv value that was closest to the qw data.
qw_rmk_uv The water-quality uv remark that was closest to the qw data.
flow_uv The flow value closest to the qw data.
flow_rmk_uv The flow remark closest to the qw data.

Using the function without real-time flow data:

Sample2 <- join_qw_uv(qw_data = qw_data,
                     uv_flow_qw =  uv_flow_qw2, 
                     hour_threshold = 24,
                     join_by_qw = "ActivityStartDateTime",
                     join_by_uv = "dateTime",
                     qw_val_uv = "qw_val_uv",
                     qw_rmk_uv = "qw_rmk_uv",
                     flow_val = "",
                     flow_rmk = "")

Using the function without real-time qw data:

Sample3 <- join_qw_uv(qw_data = qw_data,
                     uv_flow_qw =  uv_flow_qw2, 
                     hour_threshold = 24,
                     join_by_qw = "ActivityStartDateTime",
                     join_by_uv = "dateTime",
                     qw_val_uv = "",
                     qw_rmk_uv = "",
                     flow_val = "X_00060_00000",
                     flow_rmk = "X_00060_00000_cd")