In this note we will show how to speed up work in R by partitioning data and using process-level parallelization. We will show the technique with three different R packages: rqdatatable, data.table, and dplyr. The methods shown will also work with base R and other packages.
For each of the above packages we speed up work by using wrapr::execute_parallel, which in turn uses wrapr::partition_tables to partition unrelated data.frame rows and then distributes the pieces to different processors to be executed. rqdatatable::ex_data_table_parallel conveniently bundles all of these steps together when working with rquery pipelines.
The partitioning is specified by the user preparing a grouping column that tells the system which sets of rows must be kept together for the calculation to be correct. We are going to try to demonstrate everything with simple code examples and minimal discussion.
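To make the notion of a partition concrete, here is a tiny base-R illustration (not the wrapr implementation) of splitting rows by a grouping column; wrapr::partition_tables performs an analogous split consistently across a list of tables:

```r
# Illustration only: rows sharing a key_group value stay together,
# so each group can later be processed independently (and in parallel).
d <- data.frame(x = 1:6,
                key_group = c("a", "b", "a", "c", "b", "a"),
                stringsAsFactors = FALSE)
parts <- split(d, d$key_group)  # one data.frame per group
str(parts, max.level = 1)
```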
Keep in mind: unless the pipeline steps have non-trivial cost, the overhead of partitioning and distributing the work may overwhelm any parallel speedup. Also data.table itself already seems to exploit some thread-level parallelism (notice user time is greater than elapsed time). That being said, in this note we will demonstrate a synthetic example where computation is expensive due to a blow-up in an intermediate join step.
First we set up our execution environment and example (some details: OSX 10.13.4 on a 2.8 GHz Intel Core i5 Mac Mini (Late 2015 model) with 8GB RAM and hybrid disk drive).
```r
library("rqdatatable")
```

```
Loading required package: rquery
```

```r
ncore <- parallel::detectCores()
print(ncore)
```

```
[1] 4
```
```r
set.seed(2362)
mk_example <- function(nkey, nrep, ngroup = 20) {
  keys <- paste0("key_", seq_len(nkey))
  key_group <- sample(as.character(seq_len(ngroup)),
                      length(keys), replace = TRUE)
  names(key_group) <- keys
  key_table <- data.frame(
    key = rep(keys, nrep),
    stringsAsFactors = FALSE)
  key_table$data <- runif(nrow(key_table))
  instance_table <- data.frame(
    key = rep(keys, nrep),
    stringsAsFactors = FALSE)
  instance_table$id <- seq_len(nrow(instance_table))
  instance_table$info <- runif(nrow(instance_table))
  # groups should be no finer than keys
  key_table$key_group <- key_group[key_table$key]
  instance_table$key_group <- key_group[instance_table$key]
  list(key_table = key_table, instance_table = instance_table)
}

dlist <- mk_example(10, 10)
data <- dlist$instance_table
annotation <- dlist$key_table
```
```r
# possible data lookup: find rows that
# have lookup data <= info
optree <- local_td(data) %.>%
  natural_join(.,
               local_td(annotation),
               jointype = "INNER",
               by = "key") %.>%
  select_rows_nse(., data <= info) %.>%
  pick_top_k(.,
             k = 1,
             partitionby = "id",
             orderby = "data",
             reverse = "data",
             keep_order_column = FALSE) %.>%
  orderby(., "id")

cat(format(optree))
```
```r
res1 <- ex_data_table(optree)
head(res1)
```
And we can execute the operations in parallel.
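First we set up a cluster of workers and load rqdatatable on each; the following is a minimal sketch consistent with the worker package listings shown below:

```r
# start one worker process per core
cl <- parallel::makeCluster(ncore)
# load rqdatatable on each worker; the returned lists show each
# worker's attached packages after the load
parallel::clusterEvalQ(cl, library("rqdatatable"))
```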
```
[[1]]
[1] "rqdatatable" "rquery"      "stats"       "graphics"    "grDevices"
[6] "utils"       "datasets"    "methods"     "base"

[[2]]
[1] "rqdatatable" "rquery"      "stats"       "graphics"    "grDevices"
[6] "utils"       "datasets"    "methods"     "base"

[[3]]
[1] "rqdatatable" "rquery"      "stats"       "graphics"    "grDevices"
[6] "utils"       "datasets"    "methods"     "base"

[[4]]
[1] "rqdatatable" "rquery"      "stats"       "graphics"    "grDevices"
[6] "utils"       "datasets"    "methods"     "base"
```
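With the cluster ready, rqdatatable::ex_data_table_parallel runs the operator tree on each partition and binds the results back together. A sketch of the call, assuming the signature ex_data_table_parallel(optree, partition_column, cl):

```r
res2 <- ex_data_table_parallel(optree, "key_group", cl)
head(res2)
nrow(res2)
```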
```
        data id      info   key key_group
1: 0.9152014  1 0.9860654 key_1        20
2: 0.5599810  2 0.5857570 key_2         8
3: 0.3011882  3 0.3334490 key_3        10
4: 0.3650987  4 0.3960980 key_4         5
5: 0.1469254  5 0.1753649 key_5        14
6: 0.2567631  6 0.3510280 key_6         7
```

```
[1] 94
```
```r
library("dplyr")
library("data.table")
```

```
Attaching package: 'data.table'

The following objects are masked from 'package:dplyr':

    between, first, last
```

```r
packageVersion("data.table")
```

```
[1] '1.11.4'
```
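For comparison, the task can be written directly in data.table. The following is a minimal sketch of one such implementation (the name data_table_f and the exact steps are our reconstruction, not necessarily the article's original function); it reproduces the join blow-up, the filter, and the top-1-per-id selection:

```r
# Hypothetical reconstruction of a direct data.table solution.
data_table_f <- function(data, annotation) {
  data <- data.table::as.data.table(data)
  annotation <- data.table::as.data.table(annotation)
  # inner join on key; allow.cartesian permits the many-to-many blow-up
  joined <- merge(data, annotation,
                  by = "key", allow.cartesian = TRUE)
  joined <- joined[data <= info, ]
  # order by id ascending, then data descending, and keep one row per id
  data.table::setorderv(joined, c("id", "data"), order = c(1L, -1L))
  res <- joined[, .SD[1], by = "id"]
  data.table::setorderv(res, "id")
  res
}

res_dt <- data_table_f(data, annotation)
head(res_dt)
nrow(res_dt)
```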
```
   id   key      info key_group.x      data key_group.y
1:  1 key_1 0.9860654          20 0.9152014          20
2:  2 key_2 0.5857570           8 0.5599810           8
3:  3 key_3 0.3334490          10 0.3011882          10
4:  4 key_4 0.3960980           5 0.3650987           5
5:  5 key_5 0.1753649          14 0.1469254          14
6:  6 key_6 0.3510280           7 0.2567631           7
```

```
[1] 94
```
```r
parallel::clusterEvalQ(cl, library("data.table"))
```

```
[[1]]
 [1] "data.table"  "rqdatatable" "rquery"      "stats"       "graphics"
 [6] "grDevices"   "utils"       "datasets"    "methods"     "base"

[[2]]
 [1] "data.table"  "rqdatatable" "rquery"      "stats"       "graphics"
 [6] "grDevices"   "utils"       "datasets"    "methods"     "base"

[[3]]
 [1] "data.table"  "rqdatatable" "rquery"      "stats"       "graphics"
 [6] "grDevices"   "utils"       "datasets"    "methods"     "base"

[[4]]
 [1] "data.table"  "rqdatatable" "rquery"      "stats"       "graphics"
 [6] "grDevices"   "utils"       "datasets"    "methods"     "base"
```
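The parallel data.table run follows the same wrapr::execute_parallel pattern as the dplyr version below. A sketch, reusing the hypothetical data_table_f from above (dt_f and data_table_parallel_f are our names, not the article's):

```r
# make the hypothetical data_table_f available on the workers
parallel::clusterExport(cl, "data_table_f")

# adapter: execute_parallel hands each worker a named list of table pieces
dt_f <- function(tables_list) {
  data_table_f(tables_list$data, tables_list$annotation)
}

data_table_parallel_f <- function(data, annotation) {
  res <- wrapr::execute_parallel(
    tables = list(data = data, annotation = annotation),
    f = dt_f,
    partition_column = "key_group",
    cl = cl)
  res <- data.table::rbindlist(res)  # reassemble the per-group results
  data.table::setorderv(res, "id")
  res
}

res_dt_parallel <- data_table_parallel_f(data, annotation)
head(res_dt_parallel)
nrow(res_dt_parallel)
```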
```
   id   key      info key_group.x      data key_group.y
1:  1 key_1 0.9860654          20 0.9152014          20
2:  2 key_2 0.5857570           8 0.5599810           8
3:  3 key_3 0.3334490          10 0.3011882          10
4:  4 key_4 0.3960980           5 0.3650987           5
5:  5 key_5 0.1753649          14 0.1469254          14
6:  6 key_6 0.3510280           7 0.2567631           7
```

```
[1] 94
```
```r
dplyr_pipeline <- function(data, annotation) {
  res <- data %>%
    inner_join(annotation, by = "key") %>%
    filter(data <= info) %>%
    group_by(id) %>%
    arrange(-data) %>%
    mutate(rownum = row_number()) %>%
    ungroup() %>%
    filter(rownum == 1) %>%
    arrange(id)
  res
}

resd <- dplyr_pipeline(data, annotation)
head(resd)
```
And we can use wrapr::execute_parallel to parallelize the dplyr solution.
```r
parallel::clusterEvalQ(cl, library("dplyr"))
```
```r
parallel::clusterExport(cl, "dplyr_pipeline")

dplyr_f <- function(tables_list) {
  data <- tables_list$data
  annotation <- tables_list$annotation
  dplyr_pipeline(data, annotation)
}

dplyr_parallel_f <- function(data, annotation) {
  respdt <- wrapr::execute_parallel(
    tables = list(data = data, annotation = annotation),
    f = dplyr_f,
    partition_column = "key_group",
    cl = cl) %>%
    dplyr::bind_rows() %>%
    arrange(id)
  respdt
}

respdplyr <- dplyr_parallel_f(data, annotation)
head(respdplyr)
```
We can benchmark the various realizations.
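The timings below have the shape of microbenchmark::microbenchmark() output; a call of roughly the following form would produce such a table (it uses our hypothetical data_table_f and data_table_parallel_f, and the timing run presumably used a larger instance of the mk_example() data than the small demonstration above, given the multi-second results):

```r
library("microbenchmark")

timings <- microbenchmark(
  data_table_parallel  = nrow(data_table_parallel_f(data, annotation)),
  data_table           = nrow(data_table_f(data, annotation)),
  rqdatatable_parallel = nrow(ex_data_table_parallel(optree, "key_group", cl)),
  rqdatatable          = nrow(ex_data_table(optree)),
  dplyr_parallel       = nrow(dplyr_parallel_f(data, annotation)),
  dplyr                = nrow(dplyr_pipeline(data, annotation)),
  times = 10L)
print(timings)
```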
```
Unit: seconds
                 expr       min        lq      mean    median        uq       max neval
  data_table_parallel  5.274560  5.457105  5.609827  5.546554  5.686829  6.265888    10
           data_table  9.401677  9.496280  9.701807  9.541218  9.748159 10.419316    10
 rqdatatable_parallel  7.165216  7.497561  7.587663  7.563883  7.761987  7.949404    10
          rqdatatable 12.490469 12.700474 13.320480 12.898154 14.229233 14.282269    10
       dplyr_parallel  6.492262  6.572062  6.784865  6.787277  6.875076  7.328223    10
                dplyr 20.056555 20.450064 20.647073 20.564529 20.800350 21.332103    10
```
The benchmark timings show parallelized data.table is the fastest, followed by parallelized dplyr, and then parallelized rqdatatable. In the non-parallelized case data.table is the fastest, followed by rqdatatable, and then dplyr.
One reason dplyr sees a greater speedup relative to its own non-parallel implementation (yet does not beat parallelized data.table) is that data.table starts out multi-threaded: it is already exploiting some parallelism before we add the process-level parallelism, and so sees less additional speedup, though it remains the fastest overall.
rquery pipelines exhibit superior performance on big-data systems (Spark, PostgreSQL, Amazon Redshift, and hopefully soon Google BigQuery), and rqdatatable supplies a very good in-memory implementation of the rquery system based on data.table. rquery also speeds up solution development by supplying higher-order operators and early debugging features.
In this note we have demonstrated simple procedures to reliably parallelize any of rqdatatable, data.table, or dplyr.
Note: we did not include alternatives such as multidplyr or dtplyr in the timings, as they did not appear to work on this example.
Note: Matt Summersgill pointed out that for this sort of problem data.table is far more effective than I originally showed. rqdatatable cannot take advantage of this technique, because I have not yet truly implemented theta-joins in the rqdatatable dialect of rquery (it is on my TODO list).
A re-rendering of this article can be found here, source code here, and raw timings here.