In this note we will show how to speed up work in R by partitioning data and process-level parallelization. We will show the technique with three different R packages: rqdatatable, data.table, and dplyr. The methods shown will also work with base-R and other packages.

For each of the above packages we speed up work by using wrapr::execute_parallel, which in turn uses wrapr::partition_tables to partition unrelated data.frame rows and then distribute them to different processors to be executed. rqdatatable::ex_data_table_parallel conveniently bundles all of these steps together when working with rquery pipelines.
The partitioning is specified by the user, who prepares a grouping column that tells the system which sets of rows must be kept together for a correct calculation. We are going to try to demonstrate everything with simple code examples and minimal discussion.
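As a toy illustration of the grouping idea, using base-R split() as a stand-in for the partitioning wrapr performs (the small data frame here is hypothetical):

# rows sharing a key_group value must land in the same partition
d <- data.frame(key       = c("a", "a", "b", "c"),
                key_group = c("g1", "g1", "g2", "g2"),
                stringsAsFactors = FALSE)
split(d, d$key_group)  # one sub-table per group; groups can go to different workers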
Keep in mind: unless the pipeline steps have non-trivial cost, the overhead of partitioning and distributing the work may overwhelm any parallel speedup. Also, data.table itself already seems to exploit some thread-level parallelism (notice user time is greater than elapsed time). That being said, in this note we will demonstrate a synthetic example where computation is expensive due to a blow-up in an intermediate join step.
First we set up our execution environment and example (some details: OSX 10.13.4 on a 2.8 GHz Intel Core i5 Mac Mini (Late 2015 model) with 8GB RAM and hybrid disk drive).
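The package load that produced the message below did not survive extraction; it was presumably:

library("rqdatatable")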
Loading required package: rquery
ncore <- parallel::detectCores()
print(ncore)
[1] 4
set.seed(2362)

mk_example <- function(nkey, nrep, ngroup = 20) {
  keys <- paste0("key_", seq_len(nkey))
  key_group <- sample(as.character(seq_len(ngroup)),
                      length(keys), replace = TRUE)
  names(key_group) <- keys
  key_table <- data.frame(
    key = rep(keys, nrep),
    stringsAsFactors = FALSE)
  key_table$data <- runif(nrow(key_table))
  instance_table <- data.frame(
    key = rep(keys, nrep),
    stringsAsFactors = FALSE)
  instance_table$id <- seq_len(nrow(instance_table))
  instance_table$info <- runif(nrow(instance_table))
  # groups should be no finer than keys
  key_table$key_group <- key_group[key_table$key]
  instance_table$key_group <- key_group[instance_table$key]
  list(key_table = key_table,
       instance_table = instance_table)
}
dlist <- mk_example(10, 10)
data <- dlist$instance_table
annotation <- dlist$key_table
# possible data lookup: find rows that
# have lookup data <= info
optree <- local_td(data) %.>%
  natural_join(.,
               local_td(annotation),
               jointype = "INNER",
               by = "key") %.>%
  select_rows_nse(., data <= info) %.>%
  pick_top_k(.,
             k = 1,
             partitionby = "id",
             orderby = "data",
             reverse = "data",
             keep_order_column = FALSE) %.>%
  orderby(., "id")

cat(format(optree))
res1 <- ex_data_table(optree)
head(res1)
And we can execute the operations in parallel.
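The cluster construction and the parallel call did not survive extraction; a minimal reconstruction (assuming a local cluster of ncore workers, with each worker's search path echoed below) is:

cl <- parallel::makeCluster(ncore)
# attach rqdatatable on each worker
parallel::clusterEvalQ(cl, library("rqdatatable"))

res2 <- ex_data_table_parallel(optree, "key_group", cl = cl)
head(res2)
nrow(res2)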
[[1]]
[1] "rqdatatable" "rquery"      "stats"       "graphics"    "grDevices"
[6] "utils"       "datasets"    "methods"     "base"

[[2]]
[1] "rqdatatable" "rquery"      "stats"       "graphics"    "grDevices"
[6] "utils"       "datasets"    "methods"     "base"

[[3]]
[1] "rqdatatable" "rquery"      "stats"       "graphics"    "grDevices"
[6] "utils"       "datasets"    "methods"     "base"

[[4]]
[1] "rqdatatable" "rquery"      "stats"       "graphics"    "grDevices"
[6] "utils"       "datasets"    "methods"     "base"
data id info key key_group
1: 0.9152014 1 0.9860654 key_1 20
2: 0.5599810 2 0.5857570 key_2 8
3: 0.3011882 3 0.3334490 key_3 10
4: 0.3650987 4 0.3960980 key_4 5
5: 0.1469254 5 0.1753649 key_5 14
6: 0.2567631 6 0.3510280 key_6 7
[1] 94
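To compare implementations we also need dplyr and data.table attached. The library calls themselves did not survive extraction; the masking message that follows implies they were (reconstructed):

library("dplyr")
library("data.table")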
Attaching package: 'data.table'

The following objects are masked from 'package:dplyr':

    between, first, last

packageVersion("data.table")

[1] '1.11.4'
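The data.table version of the pipeline also did not survive extraction. Below is a hedged reconstruction: the function name data_table_f and the exact sequence of steps are assumptions, chosen only to be consistent with the printed columns (id, key, info, key_group.x, data, key_group.y) shown below.

data_table_f <- function(data, annotation) {
  data <- data.table::as.data.table(data)
  annotation <- data.table::as.data.table(annotation)
  # the repeated keys blow up the join, hence allow.cartesian
  joined <- merge(data, annotation,
                  by = "key",
                  allow.cartesian = TRUE)
  joined <- joined[data <= info, ]
  # order by id ascending, then data descending, keep first row per id
  data.table::setorderv(joined, c("id", "data"), order = c(1L, -1L))
  joined <- joined[, .SD[1], by = "id"]
  joined
}

resdt <- data_table_f(data, annotation)
head(resdt)
nrow(resdt)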
id key info key_group.x data key_group.y
1: 1 key_1 0.9860654 20 0.9152014 20
2: 2 key_2 0.5857570 8 0.5599810 8
3: 3 key_3 0.3334490 10 0.3011882 10
4: 4 key_4 0.3960980 5 0.3650987 5
5: 5 key_5 0.1753649 14 0.1469254 14
6: 6 key_6 0.3510280 7 0.2567631 7
[1] 94
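The parallel data.table code is likewise reconstructed (the names data_table_parallel_f and the inline wrapper are assumptions). It follows the same wrapr::execute_parallel pattern the note uses for dplyr below: export the pipeline function to the workers, attach data.table there, execute per-partition, then bind and re-order.

# make the pipeline function available on the workers,
# and attach data.table there (worker search paths echoed below)
parallel::clusterExport(cl, "data_table_f")
parallel::clusterEvalQ(cl, library("data.table"))

data_table_parallel_f <- function(data, annotation) {
  respdt <- wrapr::execute_parallel(
    tables = list(data = data, annotation = annotation),
    f = function(tables_list) {
      data_table_f(tables_list$data, tables_list$annotation)
    },
    partition_column = "key_group",
    cl = cl)
  respdt <- data.table::rbindlist(respdt)
  data.table::setorderv(respdt, "id")
  respdt
}

respdt <- data_table_parallel_f(data, annotation)
head(respdt)
nrow(respdt)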
[[1]]
[1] "data.table"  "rqdatatable" "rquery"      "stats"       "graphics"
[6] "grDevices"   "utils"       "datasets"    "methods"     "base"

[[2]]
[1] "data.table"  "rqdatatable" "rquery"      "stats"       "graphics"
[6] "grDevices"   "utils"       "datasets"    "methods"     "base"

[[3]]
[1] "data.table"  "rqdatatable" "rquery"      "stats"       "graphics"
[6] "grDevices"   "utils"       "datasets"    "methods"     "base"

[[4]]
[1] "data.table"  "rqdatatable" "rquery"      "stats"       "graphics"
[6] "grDevices"   "utils"       "datasets"    "methods"     "base"
id key info key_group.x data key_group.y
1: 1 key_1 0.9860654 20 0.9152014 20
2: 2 key_2 0.5857570 8 0.5599810 8
3: 3 key_3 0.3334490 10 0.3011882 10
4: 4 key_4 0.3960980 5 0.3650987 5
5: 5 key_5 0.1753649 14 0.1469254 14
6: 6 key_6 0.3510280 7 0.2567631 7
[1] 94
dplyr_pipeline <- function(data, annotation) {
  res <- data %>%
    inner_join(annotation, by = "key") %>%
    filter(data <= info) %>%
    group_by(id) %>%
    arrange(-data) %>%
    mutate(rownum = row_number()) %>%
    ungroup() %>%
    filter(rownum == 1) %>%
    arrange(id)
  res
}
resd <- dplyr_pipeline(data, annotation)
head(resd)
And we can use wrapr::execute_parallel to parallelize the dplyr solution.
parallel::clusterEvalQ(cl, library("dplyr"))
parallel::clusterExport(cl, "dplyr_pipeline")
dplyr_f <- function(tables_list) {
  data <- tables_list$data
  annotation <- tables_list$annotation
  dplyr_pipeline(data, annotation)
}
dplyr_parallel_f <- function(data, annotation) {
  respdt <- wrapr::execute_parallel(
    tables = list(data = data, annotation = annotation),
    f = dplyr_f,
    partition_column = "key_group",
    cl = cl) %>%
    dplyr::bind_rows() %>%
    arrange(id)
  respdt  # return the bound, ordered result explicitly
}

respdplyr <- dplyr_parallel_f(data, annotation)
head(respdplyr)
We can benchmark the various realizations.
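The benchmarking code did not survive extraction; the sketch below is a plausible microbenchmark reconstruction (the nrow() wrappers, used to force the work while keeping the returned values small, are an assumption). The timings that follow are the article's original results, and were presumably produced on a larger instance of mk_example than the 10-by-10 one above, since they run in seconds.

library("microbenchmark")

# the original presumably rebuilt a larger example before timing, e.g.:
# dlist <- mk_example(300, 300)   # hypothetical sizes

timings <- microbenchmark(
  data_table_parallel  = nrow(data_table_parallel_f(data, annotation)),
  data_table           = nrow(data_table_f(data, annotation)),
  rqdatatable_parallel = nrow(ex_data_table_parallel(optree, "key_group", cl = cl)),
  rqdatatable          = nrow(ex_data_table(optree)),
  dplyr_parallel       = nrow(dplyr_parallel_f(data, annotation)),
  dplyr                = nrow(dplyr_pipeline(data, annotation)),
  times = 10L)
print(timings)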
Unit: seconds
                 expr       min        lq      mean    median        uq       max neval
  data_table_parallel  5.274560  5.457105  5.609827  5.546554  5.686829  6.265888    10
           data_table  9.401677  9.496280  9.701807  9.541218  9.748159 10.419316    10
 rqdatatable_parallel  7.165216  7.497561  7.587663  7.563883  7.761987  7.949404    10
          rqdatatable 12.490469 12.700474 13.320480 12.898154 14.229233 14.282269    10
       dplyr_parallel  6.492262  6.572062  6.784865  6.787277  6.875076  7.328223    10
                dplyr 20.056555 20.450064 20.647073 20.564529 20.800350 21.332103    10
The benchmark timings show parallelized data.table is the fastest, followed by parallelized dplyr, and then parallelized rqdatatable. In the non-parallelized case data.table is the fastest, followed by rqdatatable, and then dplyr.
A reason dplyr sees a greater speedup relative to its own non-parallel implementation (yet does not beat data.table) is that data.table starts out already multi-threaded: it is exploiting some parallelism even before we add the process-level parallelism, and hence sees less of a speedup, though it remains the fastest.
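One can inspect data.table's own thread usage directly (getDTthreads() is part of data.table's exported API):

# number of threads data.table will use for its internal parallelism
data.table::getDTthreads()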
rquery pipelines exhibit superior performance on big data systems (Spark, PostgreSQL, Amazon Redshift, and hopefully soon Google BigQuery), and rqdatatable supplies a very good in-memory implementation of the rquery system based on data.table. rquery also speeds up solution development by supplying higher-order operators and early debugging features.
In this note we have demonstrated simple procedures to reliably parallelize any of rqdatatable, data.table, or dplyr.
Note: we did not include alternatives such as multidplyr or dtplyr in the timings, as they did not appear to work on this example.
Note: Matt Summersgill pointed out that for this sort of problem data.table is far more effective than I originally showed. rqdatatable cannot take advantage of this technique, because I have not yet truly implemented theta-joins in the rqdatatable dialect of rquery (it is on my TODO list).
A re-rendering of this article can be found here, source code here, and raw timings here.