by Edward Ma and Vishrut Gupta (Hewlett Packard Enterprise)
A few weeks ago, we revealed ddR (Distributed Data-structures in R), an exciting new project started by R-Core, Hewlett Packard Enterprise, and others that provides a fresh new set of computational primitives for distributed and parallel computing in R. The package sets the seed for what may become a standardized and easy way to write parallel algorithms in R, regardless of the computational engine of choice.
In designing ddR, we wanted to keep things simple and familiar. We expose only a small number of new user functions that are very close in semantics and API to their R counterparts. You can read the introductory material about the package here. In this post, we show how to use ddR functions.
Classes dlist, darray, and dframe: These classes are the distributed equivalents of list, matrix, and data.frame, respectively. Keeping their APIs similar to those for the vanilla R classes, we implemented operators and functions that work on these functions in the same ways. The example below creates two distributed lists -- one of five 3s and one out of the elements 1 through 5.
a <- dmapply(function(x) { x }, rep(3,5))
b <- dlist(1,2,3,4,5,nparts=1L)
The argument nparts specifies the number of partitions to split the resulting dlist b into. For darrays and dframes, which are two-dimensional, nparts also permits a two-element vector, which specifies the two-dimensional partitioning of the output.
Functions dmapply and dlapply: Following R’s functional-programming paradigm, we have created these two functions as the distributed equivalents of R’s mapply and lapply. One can supply any combination of distributed objects and regular R args into dmapply:
addThenSubtract <- function(x,y,z) { x + y - z}
c <- dmapply(addThenSubtract,a,b,MoreArgs=list(z=5))
Functions parts and collect: The parts construct gives users the ability to partition data in a manner that is very explicit. parts is often used in conjunction with dmapply to achieve partition-level parallelism. To fetch data, the collect keyword is used. So, if we wanted to check our result in c from our previous example, we may do:
collect(c)
## [[1]]
## [1] 2
##
## [[2]]
## [1] 3
##
## [[3]]
## [1] 4
##
## [[4]]
## [1] 5
##
## [[5]]
## [1] 6
Backends can easily provide custom implementations of dlist, darray, and dframe, as well as for dmapply. At a minimum, backends define only a couple of new custom classes (extending ddR’s classes), as well as the definitions for a couple of generic functions, including dmapply.
With these definitions in place, ddR knows how to properly dispatch work to backends where behaviors differ, whilst taking care of the rest of the work -- since most of these other operations can be defined using just dmapply. For example, colSums should automatically work on any darray created by a backend that has defined dmapply!
Putting it to Work: RandomForest written in ddR
In addition to adding new backend drivers for ddR (e.g., for Spark), part of this initiative is to develop an initial suite of algorithms written in ddR, such that they are portable to all ddR backends. RandomForest.ddR is one such algorithm that we have completed, now available on CRAN. ddR packages for K-Means and GLM (generalized linear models) are now also available.
Random Forest is an algorithm that can be parallelized in a very simple way by asking each worker to create a subset of the trees:
simple_RF <-function(formula, data, ntree = 500, ..., nparts = 2)
{
execute_randomForest_parallel <- function(ntree, formula, data, inputArgs)
{
inputArgs$formula <- formula
inputArgs$data <- data
inputArgs$ntree <- ntree
suppressMessages(requireNamespace("randomForest"))
model <- do.call(randomForest::randomForest,inputArgs)
}
dmodel <- dmapply(execute_randomForest_parallel,
ntree = rep(ceiling(500/nparts),nparts),
MoreArgs = list(formula=formula,
data=data,inputArgs=list(...)),
output.type = "dlist", nparts = nparts)
model <- do.call(randomForest::combine, collect(dmodel))
}
model <- simple_RF(Species ~ ., iris)
The main dmapply in the above code snippet simply broadcasts all the objects passed to the function to the workers and calls randomForest with the same parameters. An important point here is that even if ‘data’ is a distributed object, it will still be broadcast because it is listed in MoreArgs, which accepts a key-value list of either distributed objects or normal R objects.
Here is a sample performance plot of running randomforest:
We tested the randomForest.ddR package on a medium sized dataset to measure speedup when increasing the number of cores. From the graph, it is clear that up until 4 cores, there is great improvement and only then does it start to reach the point of diminishing returns. Since most computers these days have several cores, the randomForest.ddR package should be helpful for most people. On a single node you can use parallel which stops at 24 cores which corresponds to all the cores of the test machine. You can use DistributedR to continue to scale beyond 24 cores, as it can utilize multiple machines.
To read up a bit more on ddR and its semantics, visit our GitHub page here or read the user guide on CRAN.