rflow is a package that allows caching of functions in memory, on disk, or in both memory and disk.
A common problem when processing data as part of a pipeline is avoiding unnecessary calculations. For example, if a function is called over and over with the same arguments, it should not recalculate the result each time but it should provide the cached (pre-computed) result.
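The idea can be sketched in a few lines of base R. This is only an illustration of caching by arguments, not how rflow is implemented; `memo_sketch`, `slow_add` and `n_calls` are hypothetical names introduced for the example.

```r
# Hypothetical helper: wrap a function so that repeated calls with the same
# arguments return the stored result instead of recomputing it.
memo_sketch <- function(fn) {
  cache <- new.env(parent = emptyenv())
  function(...) {
    # Build a lookup key from the deparsed arguments (fine for small inputs)
    key <- paste(deparse(list(...)), collapse = "")
    if (!exists(key, envir = cache, inherits = FALSE)) {
      assign(key, fn(...), envir = cache)
    }
    get(key, envir = cache, inherits = FALSE)
  }
}

n_calls <- 0
slow_add <- function(a, b) {
  n_calls <<- n_calls + 1  # count how often the body really runs
  a + b
}

cached_add <- memo_sketch(slow_add)
r1 <- cached_add(1, 2)  # computed
r2 <- cached_add(1, 2)  # served from the cache
r3 <- cached_add(2, 2)  # new arguments, computed again
```

After these three calls the body of `slow_add` has run only twice. Note that the key is rebuilt from the arguments on every call, which is cheap here but not for large inputs.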
While caching of the function output resolves the first problem, a second issue occurs when large data sets are being processed. In this case, hashing the input arguments on every call might take too long. This issue can be solved by hashing the data only once (as output) and then by noticing changes in the hash received by the downstream function. In other words, it is not the data that flows through the pipeline (as is the case with standard functions), but hashes of the data.
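A minimal base R sketch of hashes flowing through a pipeline, under the assumption that an MD5 of the serialized object is an acceptable fingerprint. The names `hash_object`, `stage` and `make_downstream` are hypothetical and are not part of rflow.

```r
# Hypothetical helper: fingerprint an R object using base R only.
hash_object <- function(x) {
  tmp <- tempfile()
  on.exit(unlink(tmp))
  writeBin(serialize(x, NULL), tmp)
  unname(tools::md5sum(tmp))
}

# Each stage returns its value together with the hash of that value,
# so the value is hashed once, at the point where it is produced.
stage <- function(value) list(value = value, hash = hash_object(value))

# A downstream stage remembers the upstream hash it saw last time and
# recomputes only when that hash differs.
make_downstream <- function(fn) {
  last_hash <- NULL
  last_out <- NULL
  n_runs <- 0
  list(
    run = function(upstream) {
      if (!identical(upstream$hash, last_hash)) {
        last_out <<- stage(fn(upstream$value))
        last_hash <<- upstream$hash
        n_runs <<- n_runs + 1
      }
      last_out
    },
    runs = function() n_runs
  )
}

big <- stage(data.frame(x = 1:1000))
total <- make_downstream(function(df) sum(df$x))

out1 <- total$run(big)  # first call: computes
out2 <- total$run(big)  # same upstream hash: cached, `big` is not rehashed

small <- stage(data.frame(x = 1:10))
out3 <- total$run(small)  # different upstream hash: computes again
```

The downstream stage only ever compares two short strings, regardless of how large the upstream data frame is.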
A third issue is output sub-setting. When working with a pipeline, it is often the case (e.g. ETL, Machine Learning) that we need to pass the whole data frame but the function is going to use only a subset (e.g. a CV fold). If the main data frame has changed anywhere, caching of the result is no longer efficient, even when the subset itself is unchanged. The solution involves hashing the subset of interest, which can be done by introducing additional intermediate functions in the pipeline. However, there is a loss of efficiency due to excessive rehashing as the main data frame passes through many functions.
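The effect of hashing only the subset of interest can be illustrated in base R. This is a sketch under the assumption that an MD5 of the serialized object serves as the hash; `hash_object` and `fold_rows` are hypothetical names, not rflow functions.

```r
# Hypothetical helper: fingerprint an R object using base R only.
hash_object <- function(x) {
  tmp <- tempfile()
  on.exit(unlink(tmp))
  writeBin(serialize(x, NULL), tmp)
  unname(tools::md5sum(tmp))
}

cars_v1 <- mtcars
cars_v2 <- mtcars
cars_v2[20, "mpg"] <- 99  # change a row outside the fold of interest

fold_rows <- 1:5          # hypothetical CV fold

# The whole data frame looks different ...
whole_changed <- hash_object(cars_v1) != hash_object(cars_v2)
# ... but the subset actually used by the function does not.
fold_changed <- hash_object(cars_v1[fold_rows, ]) !=
  hash_object(cars_v2[fold_rows, ])
```

A cache keyed on the whole data frame would be invalidated here, while a cache keyed on the fold would still be valid.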
The package rflow addresses these inefficiencies and makes pipelines as easy to use as in tidyverse.
Here’s how a simple rflow pipeline works with functions.
library(rflow)
x1 <- 10
x2 <- 0.5
x3 <- 2
f1 <- function(a, b, c = 1) {a * b + c}
f2 <- function(d, e) {d / e}
# passing the results downstream using functions
(o1 <- f1(x1, x2))
#> [1] 6
(o2 <- f2(o1, x3))
#> [1] 3
# variant 1: declaring flows for each function using default options
rf1 <- flow_fn(x1, x2, fn = f1)
rf2 <- flow_fn(rf1, x3, fn = f2)
# collecting the results
collect(rf1)
#> [1] 6
collect(rf2)
#> [1] 3
# variant 2: arguments and functions within one call
library(dplyr) # makes life easier
flow_fn(x1, x2, fn = f1) %>% # reuses cache created by rf1
  flow_fn(x3, fn = f2) %>% # reuses cache created by rf2
  collect() # no actual re-calc takes place
#> [1] 3
An eddy is an R6 object in which the rflow data is stored. It also contains information regarding the type of cache (memory, file or file-memory) used. Once an eddy is set, all data from the flows created afterwards is kept in it implicitly. Examples of eddy functions:
library(rflow)
# create a new eddy
new_eddy("new_eddy_example")
get_eddy("new_eddy_example")
# set "new_eddy_example" as current
set_current_eddy("new_eddy_example")
# the eddy used to store flowed_fn data will be "new_eddy_example"
fn <- function(x, y) { x + y }
flowed_fn <- flow_fn(1, 2, fn = fn)
# creates a new eddy or re-uses it if present
use_eddy("use_eddy")
# the eddy used to store flowed_fn data will now be "use_eddy"
fn <- function(x, y) { x + y }
flowed_fn <- flow_fn(1, 2, fn = fn)
delete_eddy("use_eddy")
# set the eddy to be used in future flow calls
set_current_eddy("new_eddy_example")
get_current_eddy()
# notice the message received due to creating a flow with the same function
# and the same arguments in "new_eddy_example"
fn <- function(x, y) { x + y }
flowed_fn <- flow_fn(1, 2, fn = fn)
# remove the eddy object
delete_eddy("new_eddy_example")
# create custom file cache used by a future eddy
cache <- cache_file("cache_dir")
new_eddy("custom_eddy",
         cache = cache)
delete_eddy("custom_eddy")
Flow functions encapsulate the main functionality of the rflow package. They allow caching of function output and return a pre-computed result for a function called with the same arguments. Use cases of the flow functions are presented below.
library(rflow)
use_eddy("rflow_examples")
# the usage of flow_fn is strongly recommended
# it creates an implicit cache of a function and of the given flow call
# Note that the arguments fn, fn_id and flow_options must be named
# fn_id allows the user to suppress
# console messages and to explicitly indicate whether to reuse the old
# cache or create a new one.
fn1 <- function(x, y) { x + y + 1 }
flowed_fn <- flow_fn(2, 3, fn = fn1)
# by passing the created flow to the collect function, the pre-computed
# result is obtained
collected_result <- flowed_fn %>%
  collect() # [1] 6
# if the previously created function changes its body,
# the flow object will update its state
# when called again with the same arguments
fn1 <- function(x, y) { x + y }
flowed_fn <- flow_fn(2, 3, fn = fn1)
# by passing the created flow to the collect function, the pre-computed
# result is obtained
collected_result <- flowed_fn %>%
  collect() # [1] 5
# the flow object created previously can also be passed as an argument to
# another flow_fn call
fn2 <- function(x, y) { x * y }
collected_pipe_result <- flowed_fn %>%
  flow_fn(2, fn = fn2) %>%
  collect() # [1] 10
# make_flow_fn creates an explicit cache of a function. In order to use the
# functionality of an R6Flow object, the output of make_flow_fn has to be
# collected first
fn3 <- function(x, y) { x + y + 2 }
make_flow_function <- make_flow_fn(fn3)
rflow_function <- make_flow_function(2, 3)
flow_result <- rflow_function %>%
  collect() # [1] 7
# usage with rflow pipes
make_flow_function2 <- make_flow_fn(fn2)
collected_pipe_flow <- make_flow_function(1, 2) %>%
  make_flow_function2(2) %>%
  collect() # [1] 10
# flow_call creates an implicit cache of a function and of the given call.
# its use is not recommended
fn4 <- function(x, y) { x + y + 3 }
call_flow <- flow_call(fn4(x = 1, y = 2))
collected_result <- call_flow %>%
  collect() # [1] 6
An R6Flow element represents an object containing cache information regarding an element from a named list returned by a flowed function.
library(rflow)
fn_list_output <- function(x, y) { list(x = x, y = y, Z = 6) }
flowed_fn <- flow_fn(2, 3, fn = fn_list_output)
flow_element <- element(flowed_fn, "x")
# the name of the element can be extracted by using the '[' selector
# as presented below
element_name <- flowed_fn["x"]$elem_name
# [1] "x"
The function flow_file_source creates a flow object that watches one or more files. This flow object does not throw an error if a file given in the file_path argument is missing, but it changes its state. Hence, it can be used to trigger a downstream flow object if the file is now present, changed or missing.
# write for the first time content in file and create flow
file_temp <- tempfile(pattern = "example_source")
write.csv(head(mtcars), file_temp, row.names = FALSE)
rflow_source <- flow_file_source(file_temp)
# write other content in the same file
# now the flow object will update its state
write.csv(tail(mtcars), file_temp, row.names = FALSE)
rflow_source <- flow_file_source(file_temp)
unlink(file_temp)
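The three states mentioned above (present, changed, missing) can be illustrated with a small base R sketch. It is not rflow's implementation; `file_state` is a hypothetical helper that uses the file's MD5 checksum as its state and a fixed label when the file does not exist.

```r
# Hypothetical helper: describe a file's state without failing when the
# file does not exist.
file_state <- function(path) {
  if (!file.exists(path)) return("missing")
  unname(tools::md5sum(path))
}

path <- tempfile(fileext = ".csv")
s_missing <- file_state(path)  # file not written yet

write.csv(head(mtcars), path, row.names = FALSE)
s_first <- file_state(path)

write.csv(head(mtcars), path, row.names = FALSE)
s_same <- file_state(path)     # same content, same state

write.csv(tail(mtcars), path, row.names = FALSE)
s_changed <- file_state(path)  # new content, new state

unlink(path)
s_gone <- file_state(path)     # file removed, state changes again
```

A downstream step that compares the current state with the previous one would rerun after the first write, after the content change and after the deletion, but not after the identical rewrite.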
The function flow_ns_sink writes a value to a namespace only if the value has changed. The argument x represents the value to assign, the argument var_name represents the name (as a string) of the variable, and ns is the namespace, either an environment or a shiny::reactiveValues object. An example of how flow_ns_sink works with R6Flow objects is presented below.
fn <- function(x, y) { x + y }
flowed_fn <- flow_fn(1, 2, fn = fn)
sunk_flow <- flow_ns_sink(flowed_fn, "test_sink_flow", new.env())
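The "write only if changed" behaviour can be sketched for a plain environment in base R. This is an illustration only; `sink_if_changed` is a hypothetical helper and does not reflect how flow_ns_sink is implemented.

```r
# Hypothetical helper: assign `value` to `var_name` in environment `ns`
# only when it differs from what is already stored there.
sink_if_changed <- function(value, var_name, ns) {
  unchanged <- exists(var_name, envir = ns, inherits = FALSE) &&
    identical(get(var_name, envir = ns, inherits = FALSE), value)
  if (!unchanged) assign(var_name, value, envir = ns)
  invisible(!unchanged)  # TRUE when a write took place
}

ns <- new.env()
w1 <- sink_if_changed(3, "result", ns)  # first write
w2 <- sink_if_changed(3, "result", ns)  # same value, skipped
w3 <- sink_if_changed(4, "result", ns)  # changed value, written
```

Returning whether a write happened makes it easy for a caller to decide if anything downstream needs to react.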
The function flow_dfr performs row-wise caching of operations on a data frame. The function fn operates on a data frame received as an argument. fn will receive only the rows changed; it may drop some of the rows, but will not add any new rows. The function fn may return fewer or more columns or modify existing columns as long as it always returns a consistent schema (i.e., the same column data types and names) for all calls. The data frame df passed to fn will include one additional column ..row_hash.. that must be returned as is in order to identify changes.
Arguments fn, fn_id and flow_options, when provided, must be named. Argument fn must always be provided.
library(rflow)
df_fn <- function(df, i = NULL) {
  if (is.null(i)) {
    dfi <- df
    dfi$rm <- rowMeans(dfi[1:10])
  } else {
    dfi <- df[i, , drop = FALSE]
  }
  dfi
}
# the flow element can also become input for another flow_dfr function
# in order to allow multiple, chained computations
dfr_flow <- flow_dfr(mtcars, 1, fn = df_fn)
collected_dfr <- dfr_flow %>%
  collect()
# if one row of the dataframe changes, the R6Flow will change its state
mtcars[1, "mpg"] <- 22
dfr_flow <- flow_dfr(mtcars, 1, fn = df_fn)
collected_dfr <- dfr_flow %>%
  collect()
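To make the row-wise idea concrete, here is a simplified base R sketch. It is not how flow_dfr works internally: the row key is a pasted string rather than a hash column such as ..row_hash.., and it assumes fn returns exactly one row per input row. The names `row_keys`, `make_row_cache` and `add_ratio` are hypothetical.

```r
# Hypothetical helper: one key per row, built from the row's values.
row_keys <- function(df) do.call(paste, c(unname(as.list(df)), sep = "|"))

# Cache results per row key; only rows with unseen keys are passed to fn.
make_row_cache <- function(fn) {
  store <- list()        # row key -> cached one-row result
  n_processed <- 0
  list(
    run = function(df) {
      keys <- row_keys(df)
      todo <- !(keys %in% names(store))
      if (any(todo)) {
        out <- fn(df[todo, , drop = FALSE])  # assumed: one row out per row in
        new_keys <- keys[todo]
        for (i in seq_along(new_keys)) {
          store[[new_keys[i]]] <<- out[i, , drop = FALSE]
        }
        n_processed <<- n_processed + sum(todo)
      }
      # Reassemble the full result in the order of the input rows
      do.call(rbind, unname(store[keys]))
    },
    processed = function() n_processed
  )
}

add_ratio <- function(df) {
  df$ratio <- df$hp / df$wt
  df
}

cache <- make_row_cache(add_ratio)
res1 <- cache$run(mtcars)  # every row is new, all rows are processed

cars2 <- mtcars
cars2[1, "mpg"] <- 22
res2 <- cache$run(cars2)   # only the changed first row is processed
```

The second call hands a single row to `add_ratio` and takes the remaining rows from the store.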
The function flow_dfg performs group-wise caching of operations on a data frame. The function fn will receive only the rows and groups changed; it may drop some of the rows, but will not add any new rows. The function fn may return fewer or more columns or modify existing columns as long as it always returns a consistent schema (i.e., the same column data types and names) for all calls.
dfg_fn <- function(df) {
  df <- df %>%
    dplyr::mutate(Sepal.Length = Sepal.Length * 2)
}
dfg_fn2 <- function(df) {
  df <- df %>%
    dplyr::mutate(Petal.Length = Petal.Length * 3)
}
iris <- iris %>%
  dplyr::group_by(Species)
dfg_flow <- flow_dfg(iris, fn = dfg_fn)
collected_dfg <- dfg_flow %>% collect()
# when a change in group is made, the flow object changes
iris[1, "Species"] <- "virginica"
dfg_flow <- flow_dfg(iris, fn = dfg_fn)
collected_dfg <- dfg_flow %>% collect()
# the flow element can also become input for another flow_dfg function
# in order to allow multiple, chained computations
collected_dfg2 <- dfg_flow %>%
  flow_dfg(fn = dfg_fn2, group_by = "Species") %>%
  collect()
Shiny from RStudio uses reactive values to know what changes took place and what to recompute. It is thus possible to use a series of reactive elements in Shiny to prevent expensive re-computations from taking place.
library(shiny)
library(rflow)
library(ggplot2)
fn_add_mpg <- function(df, mpg) {
  df$mpg <- df$mpg + mpg
  df
}
fn_add_cyl <- function(df, cyl) {
  df$cyl <- df$cyl + cyl
  df
}
if (interactive()) {
  options(device.ask.default = FALSE)
  app <- shinyApp(
    ui = fluidPage(
      numericInput("mpg", "mpg", 1), # add mpg
      numericInput("cyl", "cyl", 2), # add cyl
      plotOutput("mpg_vs_cyl")
    ),
    server = function(input, output) {
      rv_fn_add_mpg <- reactive({ # add mpg
        # use two reactives in order to not re-calculate
        # mtcars2 each time the input changes
        fn_add_mpg(mtcars, input$mpg)
      })
      # it only changes when cyl changes its value
      rv_fn_add_cyl <- reactive({
        fn_add_cyl(rv_fn_add_mpg(), input$cyl)
      })
      rv_plot <- reactive({
        mtcars2 <- rv_fn_add_cyl()
        mtcars_plot <- ggplot2::ggplot(mtcars2, aes(x = cyl, y = mpg)) +
          ggplot2::geom_point()
        mtcars_plot
      })
      output$mpg_vs_cyl <- renderPlot(rv_plot())
    }
  )
  runApp(app)
}
The downside is that we need one reactive element for each function in the pipeline, which makes data processing dependent on UI / Shiny. Using rflow, we can separate the UI from the data processing, maintaining the caching not only for the current state but for all previously computed states.
library(shiny)
library(rflow)
library(ggplot2)
fn_add_mpg <- function(df, mpg) {
  df$mpg <- df$mpg + mpg
  df
}
fn_add_cyl <- function(df, cyl) {
  df$cyl <- df$cyl + cyl
  df
}
if (interactive()) {
  options(device.ask.default = FALSE)
  app <- shinyApp(
    ui = fluidPage(
      numericInput("mpg", "mpg", 1), # add mpg
      numericInput("cyl", "cyl", 2), # add cyl
      plotOutput("mpg_vs_cyl")
    ),
    server = function(input, output) {
      # reacts to both mpg and cyl changes simultaneously but if nothing
      # has changed, the function result is taken from cache.
      # If only one value has changed, that value will be recalculated
      rv_plot <- reactive({
        mtcars2 <- mtcars %>%
          flow_fn(input$mpg, fn = fn_add_mpg) %>%
          flow_fn(input$cyl, fn = fn_add_cyl) %>%
          collect()
        mtcars_plot <- ggplot2::ggplot(mtcars2, aes(x = cyl, y = mpg)) +
          ggplot2::geom_point()
        mtcars_plot
      })
      output$mpg_vs_cyl <- renderPlot(rv_plot())
    }
  )
  runApp(app)
}
While a similar workflow can be achieved with package memoise, it suffers from several disadvantages (below).
Package memoise by Hadley Wickham, Jim Hester and others was the main source of inspiration. Memoise is elegant, fast, simple to use, but it suffers from certain limitations that we hope to overcome in this package:
Package drake by Will Landau and others provides a complete framework for large data sets, including using files as inputs and outputs. The downside is that it requires additional overhead to get started and its focus is on the pipeline as a whole. If your work requires many hours of computations (which increases the value of each result), the overhead due to the setup has a relatively lower cost. In this scenario drake is an excellent choice.