rflow is a package that allows caching of function results in memory, on disk, or in both.

Problems addressed by rflow

A common problem when processing data as part of a pipeline is avoiding unnecessary calculations. For example, if a function is called repeatedly with the same arguments, it should not recalculate the result each time; instead, it should return the cached (pre-computed) result.
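The idea of returning a pre-computed result can be sketched in a few lines of base R. This is only an illustration of the general technique, not how rflow is implemented; `memoise_simple`, `slow_add` and `fast_add` are hypothetical names introduced for the example.

```r
# Minimal memoisation sketch in base R (illustrative; not rflow's implementation).
# `memoise_simple` is a hypothetical helper.
memoise_simple <- function(fn) {
  cache <- new.env(parent = emptyenv())
  function(...) {
    # Build a cache key from the deparsed arguments
    key <- paste(deparse(list(...)), collapse = "")
    if (!exists(key, envir = cache, inherits = FALSE)) {
      assign(key, fn(...), envir = cache)
    }
    get(key, envir = cache, inherits = FALSE)
  }
}

n_calls <- 0
slow_add <- function(a, b) {
  n_calls <<- n_calls + 1   # count how often the body really runs
  a + b
}

fast_add <- memoise_simple(slow_add)
fast_add(1, 2)   # computed
#> [1] 3
fast_add(1, 2)   # same arguments: served from the cache
#> [1] 3
n_calls
#> [1] 1
```

Deparsing the arguments is a simple way to build a key for small inputs; for large inputs it becomes expensive, which is the second problem discussed next.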

While caching of the function output resolves the first problem, a second issue occurs when large data sets are being processed. In this case, hashing the input arguments on every call might take too long. This issue can be solved by hashing the data only once (as output) and then detecting changes in the hash received by the downstream function. In other words, it is not the data that flows through the pipeline (as is the case with standard functions), but hashes of the data.
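A rough base R sketch of "hash once, pass the hash downstream" might look as follows. It is a conceptual illustration under stated assumptions, not rflow's internals: `hash_obj`, `upstream` and `make_downstream` are hypothetical helpers, and the hash is an MD5 of the serialized object written to a temporary file.

```r
# Illustrative only: these helpers are hypothetical, not rflow functions.
hash_obj <- function(x) {
  tmp <- tempfile()
  on.exit(unlink(tmp))
  writeBin(serialize(x, NULL), tmp)   # serialize the object to bytes
  unname(tools::md5sum(tmp))          # MD5 of those bytes
}

# Upstream step: compute the value and hash it once, as output
upstream <- function(n) {
  value <- seq_len(n)
  list(value = value, hash = hash_obj(value))
}

# Downstream step: remembers the last hash seen and recomputes only on change
make_downstream <- function(fn) {
  last_hash <- NULL
  last_out <- NULL
  n_runs <- 0
  list(
    run = function(input) {
      if (!identical(input$hash, last_hash)) {
        last_out <<- fn(input$value)
        last_hash <<- input$hash
        n_runs <<- n_runs + 1
      }
      last_out
    },
    runs = function() n_runs
  )
}

x <- upstream(100)
y <- upstream(101)
total <- make_downstream(sum)
total$run(x)    # computed
#> [1] 5050
total$run(x)    # same hash: no rehashing, cached result returned
#> [1] 5050
total$run(y)    # different hash: recomputed
#> [1] 5151
total$runs()
#> [1] 2
```

The downstream step never hashes the data itself; it only compares the short hash string it receives.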

A third issue is output sub-setting. When working with a pipeline (e.g. ETL, Machine Learning), it is often the case that we need to pass the whole data frame, but the function is going to use only a subset (e.g. a CV fold). If the main data frame has changed anywhere, the cached result is invalidated even when the subset of interest is unchanged, so caching is no longer efficient. The solution involves hashing the subset of interest, which can be done by introducing additional intermediate functions in the pipeline. However, there is a loss of efficiency due to excessive rehashing as the main data frame passes through many functions.
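The effect of hashing only the subset of interest can be seen in a small base R sketch. The helper `hash_obj` and the toy data frame are assumptions made for illustration; rflow's own hashing may differ.

```r
# Illustrative only: `hash_obj` is a hypothetical helper, not an rflow function.
hash_obj <- function(x) {
  tmp <- tempfile()
  on.exit(unlink(tmp))
  writeBin(serialize(x, NULL), tmp)
  unname(tools::md5sum(tmp))
}

df <- data.frame(id = letters[1:6], value = c(10, 20, 30, 40, 50, 60))
fold <- 1:3   # rows actually used by the downstream function

hash_full_before <- hash_obj(df)
hash_fold_before <- hash_obj(df[fold, ])

# Change a row that lies outside the fold
df$value[6] <- 99

identical(hash_obj(df), hash_full_before)           # whole-frame hash changed
#> [1] FALSE
identical(hash_obj(df[fold, ]), hash_fold_before)   # subset hash is unchanged
#> [1] TRUE
```

A cache keyed on the whole data frame would be invalidated here, while a cache keyed on the fold would still be valid.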

The package rflow addresses these inefficiencies and makes pipelines as easy to use as in tidyverse.

Basics

Here’s how a simple rflow pipeline works with functions.

library(rflow)

x1 <- 10
x2 <- 0.5
x3 <- 2

f1 <- function(a, b, c = 1) {a * b + c}
f2 <- function(d, e) {d / e}

# passing the results downstream using functions
(o1 <- f1(x1, x2))  
#> [1] 6
(o2 <- f2(o1, x3)) 
#> [1] 3


# variant 1: declaring flows for each function using default options
rf1 <- flow_fn(x1, x2, fn = f1)
rf2 <- flow_fn(rf1, x3, fn = f2)

# collecting the results
collect(rf1)         
#> [1] 6
collect(rf2)         
#> [1] 3


# variant 2: arguments and functions within one call
library(dplyr)                          # makes life easier 
flow_fn(x1, x2, fn = f1) %>%            # reuses cache created by rf1
  flow_fn(x3, fn = f2) %>%              # reuses cache created by rf2
  collect()                             # no actual re-calc takes place
#> [1] 3

Eddy functions

An eddy is an R6 object in which the rflow data is stored. It also contains information regarding the type of cache used (memory, file or file-memory). Once an eddy is set, all data from the flows created afterwards is kept in it implicitly. Examples of eddy functions:

library(rflow)

# create a new eddy
new_eddy("new_eddy_example")
get_eddy("new_eddy_example")

# set "new_eddy_example" as current
set_current_eddy("new_eddy_example")

# the eddy used to store flowed_fn data will be "new_eddy_example"
fn <- function(x, y) { x + y }
flowed_fn <- flow_fn(1, 2, fn = fn)

# creates a new eddy or re-uses it if present
use_eddy("use_eddy")

# the eddy used to store flowed_fn data will now be "use_eddy"
fn <- function(x, y) { x + y }
flowed_fn <- flow_fn(1, 2, fn = fn)

delete_eddy("use_eddy")

# set the eddy to be used in future flow calls
set_current_eddy("new_eddy_example")
get_current_eddy()

# notice the message received due to creating a flow with the same function
# and the same arguments in "new_eddy_example"
fn <- function(x, y) { x + y }
flowed_fn <- flow_fn(1, 2, fn = fn)

# remove the eddy object
delete_eddy("new_eddy_example")

# create custom file cache used by a future eddy
cache <- cache_file("cache_dir")

new_eddy("custom_eddy",
         cache = cache)
delete_eddy("custom_eddy")
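To make the notion of a "current" named store more concrete, here is a conceptual base R sketch. It only mimics the pattern of registering named stores and writing implicitly to the current one; all names (`registry`, `new_store`, `set_current_store`, `put`, `keys`) are hypothetical and say nothing about how rflow implements eddies.

```r
# Conceptual sketch only; hypothetical names, not rflow's API or internals.
registry <- new.env(parent = emptyenv())   # store name -> store (an environment)
state <- new.env(parent = emptyenv())      # remembers the name of the current store

new_store <- function(name) {
  assign(name, new.env(parent = emptyenv()), envir = registry)
  invisible(name)
}

set_current_store <- function(name) {
  stopifnot(exists(name, envir = registry, inherits = FALSE))
  state$current <- name
  invisible(name)
}

# Writes go to whichever store is current at call time
put <- function(key, value) {
  store <- get(state$current, envir = registry)
  assign(key, value, envir = store)
}

keys <- function(name) ls(get(name, envir = registry))

new_store("a")
new_store("b")
set_current_store("a")
put("k1", 1)
set_current_store("b")
put("k2", 2)

keys("a")
#> [1] "k1"
keys("b")
#> [1] "k2"
```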

Flow functions

Flow functions encapsulate the main functionality of the rflow package. They allow caching of function output and return a pre-computed result for a function called with the same arguments. Use cases of the flow functions are presented below.

library(rflow)
library(dplyr)  # provides the %>% pipe used below

use_eddy("rflow_examples")
# using flow_fn is strongly recommended:
# it implicitly caches both the function and the given flow call.
# Note that the arguments fn, fn_id and flow_options must be named.
# fn_id allows the user to suppress console messages and to indicate
# explicitly whether to reuse the old cache or create a new one.
fn1 <- function(x, y) { x + y + 1 }
flowed_fn <- flow_fn(2, 3, fn = fn1)
# by passing the created flow to the collect function, the pre-computed
# result is obtained
collected_result <- flowed_fn %>%
    collect() # [1] 6

# if the previously created function changes its body,
# the flow object will update its state
# when called again with the same arguments
fn1 <- function(x, y) { x + y }
flowed_fn <- flow_fn(2, 3, fn = fn1)
# by passing the created flow to the collect function, the pre-computed
# result is obtained
collected_result <- flowed_fn %>%
    collect() # [1] 5

# the flow object created previously can also be passed as an argument
# to another flow_fn call
fn2 <- function(x, y) { x * y }
collected_pipe_result <- flowed_fn %>%
    flow_fn(2, fn = fn2) %>%
    collect() # [1] 10

# make_flow_fn creates an explicit cache of a function. In order to use the 
# functionality of an R6flow object, the output of make_flow_fn has to be 
# collected first
fn3 <- function(x, y) { x + y + 2 }
make_flow_function <- make_flow_fn(fn3)
rflow_function <- make_flow_function(2, 3)
flow_result <- rflow_function %>% 
        collect() # [1] 7

# usage with rflow pipes
make_flow_function2 <- make_flow_fn(fn2)
collected_pipe_flow <- make_flow_function(1, 2) %>%
    make_flow_function2(2) %>%
    collect() # [1] 10

# flow_call creates an implicit cache of a function and of the given call.
# its use is not recommended
fn4 <- function(x, y) { x + y + 3 }
call_flow <- flow_call(fn4(x = 1, y = 2))
collected_result <- call_flow %>% 
                collect() # [1] 6
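The examples above note that a flow updates its state when the body of the function changes. One way to picture this, sketched in base R under the assumption that the cache key includes the deparsed function body, is shown below. `call_key`, `cache` and `cached_call` are hypothetical names; rflow may detect function changes differently.

```r
# Illustrative sketch; hypothetical helpers, not part of rflow.
call_key <- function(fn, ...) {
  # The key combines the function body and the arguments
  paste(c(deparse(body(fn)), deparse(list(...))), collapse = " ")
}

cache <- new.env(parent = emptyenv())

cached_call <- function(fn, ...) {
  key <- call_key(fn, ...)
  if (!exists(key, envir = cache, inherits = FALSE)) {
    assign(key, fn(...), envir = cache)
  }
  get(key, envir = cache, inherits = FALSE)
}

fn1 <- function(x, y) { x + y + 1 }
cached_call(fn1, 2, 3)
#> [1] 6

fn1 <- function(x, y) { x + y }   # same name, new body
cached_call(fn1, 2, 3)            # new key, so the function is evaluated again
#> [1] 5

length(ls(cache))                 # one entry per distinct (body, arguments) pair
#> [1] 2
```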

Flow elements

An R6Flow element represents an object containing cache information regarding an element from a named list returned by a flowed function.

library(rflow)

fn_list_output <- function(x, y) { list(x = x, y = y, Z = 6) }
flowed_fn <- flow_fn(2, 3, fn = fn_list_output)  
flow_element <- element(flowed_fn, "x")

# the name of the element can be extracted by using the '[' selector 
# as presented below
element_name <- flowed_fn["x"]$elem_name
# [1] "x"

Flow source and flow sink

The function flow_file_source creates a flow object that watches one or more files. This flow object does not throw an error if a watched file is missing; instead, it changes its state. Hence, it can be used to trigger a downstream flow object when a file appears, changes or goes missing.

# write content to the file for the first time and create the flow
file_temp <- tempfile(pattern = "example_source")
write.csv(head(mtcars), file_temp, row.names = FALSE)
rflow_source <- flow_file_source(file_temp)
 
# write other content in the same file
# now the flow object will update its state
write.csv(tail(mtcars), file_temp, row.names = FALSE)
rflow_source <- flow_file_source(file_temp)
unlink(file_temp)
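The underlying idea of watching a file for changes can be approximated in base R with `tools::md5sum`, which returns `NA` for a missing file instead of raising an error. The helper `file_state` is a hypothetical name used only for this sketch; it does not describe how flow_file_source is implemented.

```r
# Sketch using base R only; `file_state` is a hypothetical helper.
file_state <- function(path) unname(tools::md5sum(path))   # NA if the file is missing

path <- tempfile(fileext = ".csv")
s_missing <- file_state(path)                      # file does not exist yet

write.csv(head(mtcars), path, row.names = FALSE)
s_first <- file_state(path)

write.csv(tail(mtcars), path, row.names = FALSE)   # overwrite with other content
s_second <- file_state(path)
unlink(path)

is.na(s_missing)               # a missing file is a state, not an error
#> [1] TRUE
identical(s_first, s_second)   # content changed, so the state changed
#> [1] FALSE
```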

The function flow_ns_sink writes a value to a namespace only if the value has changed. The argument x is the value to assign, var_name is the name (as a string) of the variable, and ns is the namespace, either an environment or a shiny::reactiveValues object. An example of how flow_ns_sink works with R6Flow objects is presented below.

fn <- function(x, y) { x + y }
flowed_fn <- flow_fn(1, 2, fn = fn)
sunk_flow <- flow_ns_sink(flowed_fn, "test_sink_flow", new.env()) 
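The "write only if changed" behaviour can be illustrated for plain environments with a short base R sketch. `assign_if_changed` is a hypothetical helper that borrows the argument names x, var_name and ns from the description above; it is not the rflow implementation and does not handle reactiveValues.

```r
# `assign_if_changed` is a hypothetical helper illustrating the idea.
assign_if_changed <- function(x, var_name, ns) {
  changed <- !exists(var_name, envir = ns, inherits = FALSE) ||
    !identical(get(var_name, envir = ns, inherits = FALSE), x)
  if (changed) assign(var_name, x, envir = ns)
  invisible(changed)   # TRUE if a write took place
}

ns <- new.env()
first  <- assign_if_changed(3, "total", ns)   # variable created
second <- assign_if_changed(3, "total", ns)   # same value: nothing written
third  <- assign_if_changed(4, "total", ns)   # value changed: written

c(first, second, third)
#> [1]  TRUE FALSE  TRUE
```

Skipping the write when nothing changed matters most with reactive namespaces, where every assignment can trigger downstream updates.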

Flow for data frame and grouped data frame

The function flow_dfr performs row-wise caching of operations on a data frame. The function fn operates on a data frame received as an argument. fn receives only the changed rows; it may drop some of the rows, but must not add any new rows. fn may return fewer or more columns or modify existing columns, as long as it always returns a consistent schema (i.e., the same column data types and names) for all calls. The data frame df passed to fn includes one additional column, ..row_hash.., that must be returned as is in order to identify changes.

Arguments fn, fn_id and flow_options, when provided, must be named. Argument fn must always be provided.

library(rflow)
library(dplyr)  # provides the %>% pipe used below
df_fn <- function(df, i = NULL) {
    if (is.null(i)) {
        dfi <- df
        dfi$rm <- rowMeans(dfi[1:10])
    } else {
        dfi <- df[i, , drop = FALSE]
    }
    dfi
}

# the flow element can also become input for another flow_dfr function
# in order to allow multiple, chained computations
dfr_flow <- flow_dfr(mtcars, 1, fn = df_fn)
collected_dfr <- dfr_flow %>%
                 collect()

# if one row of the dataframe changes, the R6Flow will change its state
mtcars[1, "mpg"] <- 22
dfr_flow <- flow_dfr(mtcars, 1, fn = df_fn)
collected_dfr <- dfr_flow %>%
                collect()
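Row-wise change detection can be pictured with a base R sketch that builds one key per row and compares keys between two versions of a data frame. `row_keys` is a hypothetical helper that pastes the column values together; rflow uses its own row hashes (the ..row_hash.. column), so this is only an analogy.

```r
# `row_keys` is a hypothetical helper: one string key per row.
row_keys <- function(df) {
  do.call(paste, c(unname(as.list(df)), sep = "\r"))
}

old <- head(mtcars)
new <- old
new[1, "mpg"] <- 22                 # modify a single row

changed <- row_keys(new) != row_keys(old)
rownames(new)[changed]
#> [1] "Mazda RX4"

# Only the changed rows would need to be processed again
rows_to_process <- new[changed, , drop = FALSE]
nrow(rows_to_process)
#> [1] 1
```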

The function flow_dfg performs group-wise caching of operations on a data frame. The function fn receives only the changed rows and groups; it may drop some of the rows, but must not add any new rows. fn may return fewer or more columns or modify existing columns, as long as it always returns a consistent schema (i.e., the same column data types and names) for all calls.

dfg_fn <- function(df) {
    df <- df %>%
        dplyr::mutate(Sepal.Length = Sepal.Length * 2)
}

dfg_fn2 <- function(df) {
    df <- df %>%
        dplyr::mutate(Petal.Length = Petal.Length * 3)
}

iris <- iris %>%
    dplyr::group_by(Species)
dfg_flow <- flow_dfg(iris, fn = dfg_fn)
collected_dfg <- dfg_flow %>% collect()

# when a change in group is made, the flow object changes
iris[1, "Species"] <- "virginica"
dfg_flow <- flow_dfg(iris, fn = dfg_fn)
collected_dfg <- dfg_flow %>% collect()

# the flow element can also become input for another flow_dfg function
# in order to allow multiple, chained computations
collected_dfg2 <- dfg_flow %>%
   flow_dfg(fn = dfg_fn2, group_by = "Species") %>%
   collect()
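The group-wise case can be sketched in the same spirit: compute one key per group and re-process only the groups whose key changed. `group_key` is a hypothetical helper and the sketch uses the unmodified `datasets::iris`; it is an analogy for the behaviour described above, not rflow's implementation.

```r
# `group_key` is a hypothetical helper: one string key per group.
group_key <- function(df) {
  rows <- do.call(paste, c(unname(as.list(df)), sep = "\r"))
  paste(rows, collapse = "\n")
}

old <- datasets::iris
new <- old
new[1, "Species"] <- "virginica"    # move one row from setosa to virginica

old_keys <- vapply(split(old, old$Species), group_key, character(1))
new_keys <- vapply(split(new, new$Species), group_key, character(1))

# Groups whose content changed: "setosa" lost a row, "virginica" gained one
changed_groups <- names(new_keys)[new_keys != old_keys]
changed_groups
```

The versicolor group is untouched, so its cached result could be reused as is.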

Shiny

Shiny from RStudio uses reactive values to know what changes took place and what to recompute. It is thus possible to use a series of reactive elements in Shiny to prevent expensive re-computations from taking place.

library(shiny)
library(rflow)
library(ggplot2)

fn_add_mpg <- function(df, mpg) {
    df$mpg <- df$mpg + mpg
    df
}
fn_add_cyl <- function(df, cyl) {
    df$cyl <- df$cyl + cyl
    df
}

if (interactive()) {
    options(device.ask.default = FALSE)

    app <- shinyApp(
        ui = fluidPage(
            numericInput("mpg", "mpg", 1), #add mpg
            numericInput("cyl", "cyl", 2), #add cyl
            plotOutput("mpg_vs_cyl")
        ),
        server = function(input, output) {
            rv_fn_add_mpg <- reactive({ #add mpg
                # splitting the work across two reactives avoids
                # re-running this step when only cyl changes
                fn_add_mpg(mtcars, input$mpg)
            })
            # re-runs when cyl changes or when the mpg reactive above changes
            rv_fn_add_cyl <- reactive({
                fn_add_cyl(rv_fn_add_mpg(), input$cyl)
            })
            rv_plot <- reactive({
                mtcars2 <- rv_fn_add_cyl()
                mtcars_plot <- ggplot2::ggplot(mtcars2, aes(x = cyl, y = mpg)) +
                    ggplot2::geom_point()
                mtcars_plot
            })
            output$mpg_vs_cyl <- renderPlot(rv_plot())
        }
    )

    runApp(app)
}

The downside is that we need one reactive element for each function in the pipeline - this makes data processing dependent on UI / Shiny. Using rflow, we can separate the UI from the data processing, maintaining the caching not only for the current state but for all previously computed states.

library(shiny)
library(rflow)
library(dplyr)  # provides the %>% pipe used below
library(ggplot2)

fn_add_mpg <- function(df, mpg) {
    df$mpg <- df$mpg + mpg
    df
}
fn_add_cyl <- function(df, cyl) {
    df$cyl <- df$cyl + cyl
    df
}

if (interactive()) {
    options(device.ask.default = FALSE)
    
    app <- shinyApp(
        ui = fluidPage(
            numericInput("mpg", "mpg", 1), #add mpg
            numericInput("cyl", "cyl", 2), #add cyl
            plotOutput("mpg_vs_cyl")
        ),
        server = function(input, output) {
            # reacts to changes in both mpg and cyl; if nothing has
            # changed, the result is taken from the cache.
            # If only one input has changed, only the steps that depend
            # on it are recalculated
            
            rv_plot <- reactive({
                mtcars2 <- mtcars %>%
                    flow_fn(input$mpg, fn = fn_add_mpg) %>%
                    flow_fn(input$cyl, fn = fn_add_cyl) %>%
                    collect() 
                mtcars_plot <- ggplot2::ggplot(mtcars2, aes(x = cyl, y = mpg)) +
                    ggplot2::geom_point()
                mtcars_plot
            })
            output$mpg_vs_cyl <- renderPlot(rv_plot())
        }
    )
    
    runApp(app)
}

While a similar workflow can be achieved with the memoise package, memoise has several limitations, listed below.

Memoise

Package memoise by Hadley Wickham, Jim Hester and others was the main source of inspiration. Memoise is elegant, fast, simple to use, but it suffers from certain limitations that we hope to overcome in this package:

  • excessive rehashing of inputs
  • only one cache layer (although its cache framework is extensible)
  • no input/output sub-setting, it uses the complete set of arguments provided
  • no reactivity (yet to be implemented in rflow)

Drake

Package drake by Will Landau and others provides a complete framework for large data sets, including using files as inputs and outputs. The downside is that it requires additional overhead to get started and its focus is on the pipeline as a whole. If your work requires many hours of computations (which increases the value of each result), the overhead due to the setup has a relatively lower cost - in this scenario drake is an excellent choice.

Summary

Package rflow is somewhere between memoise and drake:

  • one can start using rflow right away, with minimal overhead
  • allows focusing on the data processing (e.g., EDA) and not on the framework