Posted on Categories Coding, data science, Exciting Techniques, math programming, Programming, TutorialsTags ,

A gentle introduction to parallel computing in R

Let’s talk about the use and benefits of parallel computation in R.


NewImage

IBM’s Blue Gene/P massively parallel supercomputer (Wikipedia).

Parallel computing is a type of computation in which many calculations are carried out simultaneously.”

Wikipedia quoting: Gottlieb, Allan; Almasi, George S. (1989). Highly parallel computing

The reason we care is: by making the computer work harder (perform many calculations simultaneously) we wait less time for our experiments and can run more experiments. This is especially important when doing data science (as we often do using the R analysis platform) as we often need to repeat variations of large analyses to learn things, infer parameters, and estimate model stability.

Typically to get the computer to work a harder the analyst, programmer, or library designer must themselves work a bit hard to arrange calculations in a parallel friendly manner. In the best circumstances somebody has already done this for you:

  • Good parallel libraries, such as the multi-threaded BLAS/LAPACK libraries included in Revolution R Open (RRO, now Microsoft R Open) (see here).
  • Specialized parallel extensions that supply their own high performance implementations of important procedures such as rx methods from RevoScaleR or h2o methods from h2o.ai.
  • Parallelization abstraction frameworks such as Thrust/Rth (see here).
  • Using R application libraries that dealt with parallelism on their own (examples include gbm, boot and our own vtreat). (Some of these libraries do not attempt parallel operation until you specify a parallel execution environment.)

In addition to having a task ready to “parallelize” you need a facility willing to work on it in a parallel manner. Examples include:

  • Your own machine. Even a laptop computer usually now has four our more cores. Potentially running four times faster, or equivalently waiting only one fourth the time, is big.
  • Graphics processing units (GPUs). Many machines have a one or more powerful graphics cards already installed. For some numerical task these cards are 10 to 100 times faster than the basic Central Processing Unit (CPU) you normally use for computation (see here).
  • Clusters of computers (such as Amazon ec2, Hadoop backends and more).

Obviously parallel computation with R is a vast and specialized topic. It can seem impossible to quickly learn how to use all this magic to run your own calculation more quickly.

In this tutorial we will demonstrate how to speed up a calculation of your own choosing using basic R.

First you must have a problem that is amenable to parallelism. The most obvious such problems involve repetition (what the cognoscenti call “embarrassingly parallel”):

  • Tuning model parameters by repeated re-fitting of models (such as is done by the caret package).
  • Apply a transform to many different variables (such as is done by the vtreat package).
  • Estimating model quality through cross-validation, bootstrap or other repeated sampling techniques.

So we are going to assume we have a task in mind that has already been cup up into a number of simple repetitions. Note: this conceptual cutting up is not necessarily always easy, but it is the step needed to start the processes.

Here is our example task: fitting a predictive model onto a small dataset. We load the data set and some definitions into our workspace:

d <- iris # let "d" refer to one of R's built in data sets
vars <- c('Sepal.Length','Sepal.Width','Petal.Length')
yName <- 'Species'
yLevels <- sort(unique(as.character(d[[yName]])))
print(yLevels)
## [1] "setosa"     "versicolor" "virginica" 

(We are using the convention that any line starting with “##” is a printed result from the R command above itself.)

We encounter a small modeling issue: the variable we are trying to predict takes on three levels. The modeling technique we were going to use (glm(family='binomial')) is not specialized to predict “multinomial outcomes” (though other libraries are). So we decide to solve this using a “one versus rest” strategy and build a series of classifiers: each separating one target from the rest of the outcomes. This is where we see a task that is an obvious candidate for parallelization. Let’s wrap building a single target model into a function for neatness:

fitOneTargetModel <- function(yName,yLevel,vars,data) {
  formula <- paste('(',yName,'=="',yLevel,'") ~ ',
                   paste(vars,collapse=' + '),sep='')
  glm(as.formula(formula),family=binomial,data=data)
}

Then the usual “serial” way to build all the models would look like the following:

for(yLevel in yLevels) {
  print("*****")
  print(yLevel)
  print(fitOneTargetModel(yName,yLevel,vars,d))
}

Or we could wrap our procedure into a new single argument function (this pattern is called Currying) and then use R’s elegant lapply() notation:

worker <- function(yLevel) {
  fitOneTargetModel(yName,yLevel,vars,d)
}
models <- lapply(yLevels,worker)
names(models) <- yLevels
print(models)

The advantage of the lapply() notation is it emphasizes the independent nature of each calculation, exactly the sort of isolation we need to parallelize our calculations. Think of the for-loop as accidentally over-specifying the calculation by introducing a needless order or sequence of operations.

Re-organizing our calculation functionally has gotten us ready to use a parallel library and perform the calculation in parallel. First we set up a parallel cluster to do the work:

# Start up a parallel cluster
parallelCluster <- parallel::makeCluster(parallel::detectCores())
print(parallelCluster)
## socket cluster with 4 nodes on host ‘localhost’

Notice we created a “socket cluster.” The socket cluster is a crude-seeming “course grained parallel” cluster that is extremely flexible. A socket cluster is crude in that is fairly slow to send jobs to it (so you want to send work in big “coarse” chunks) but amazingly flexible as it can be implemented as any of: multiple cores on a single machine, multiple cores on multiple machines on a shared network, or on top of other systems such as an MPI cluster.

At this point we would expect code like below to work (see here for details on tryCatch).

tryCatch(
  models <- parallel::parLapply(parallelCluster,
                                yLevels,worker),
  error = function(e) print(e)
)
## <simpleError in checkForRemoteErrors(val):
##   3 nodes produced errors; first error: 
##      could not find function "fitOneTargetModel">

Instead of results, we got the error “could not find function "fitOneTargetModel">.”

The issue is: on a socket cluster arguments to parallel::parLapply are copied to each processing node over a communications socket. However, the entirety of the current execution environment (in our case the so-called “global environment) is not copied over (or copied back, only values are returned). So our function worker() when transported to the parallel nodes must have an altered lexical closure (as it can not point back to our execution environment) and it appears this new lexical closure no longer contains references to the needed values yName, vars, d and fitOneTargetModel. This is unfortunate, but makes sense. R uses entire execution environments to implement the concept of lexical closures, and R has no way of knowing which values in a given environment are actually needed by a given function.

So we know what is wrong, how do we fix it? We fix it by using an environment that is not the global environment to transport the values we need. The easiest way to do this is to use our own lexical closure. To do this we wrap the entire process inside a function (so we are executing in a controlled environment). The code that works is as follows:

# build the single argument function we are going to pass to parallel
mkWorker <- function(yName,vars,d) {
  # make sure each of the three values we need passed 
  # are available in this environment
  force(yName)
  force(vars)
  force(d)
  # define any and every function our worker function 
  # needs in this environment
  fitOneTargetModel <- function(yName,yLevel,vars,data) {
    formula <- paste('(',yName,'=="',yLevel,'") ~ ',
                     paste(vars,collapse=' + '),sep='')
    glm(as.formula(formula),family=binomial,data=data)
  }
  # Finally: define and return our worker function.
  # The function worker's "lexical closure" 
  # (where it looks for unbound variables)
  # is mkWorker's activation/execution environment 
  # and not the usual Global environment.
  # The parallel library is willing to transport 
  # this environment (which it does not
  # do for the Global environment).
  worker <- function(yLevel) {
    fitOneTargetModel(yName,yLevel,vars,d)
  }
  return(worker)
}

models <- parallel::parLapply(parallelCluster,yLevels,
                              mkWorker(yName,vars,d))
names(models) <- yLevels
print(models)

The above works because we forced the values we needed into the new execution environment and defined the function we were going to use directly in that environment. Obviously it is incredibly tedious and wasteful to have to re-define every function we need every time we need it (though we could also have passed in the wrapper as we did with the other values). A more versatile pattern is: use a helper function we supply called “bindToEnv” to do some of the work. With bindToEnv the code looks like the following.

source('bindToEnv.R') # Download from: http://winvector.github.io/Parallel/bindToEnv.R
# build the single argument function we are going to pass to parallel
mkWorker <- function() {
  bindToEnv(objNames=c('yName','vars','d','fitOneTargetModel'))
  function(yLevel) {
    fitOneTargetModel(yName,yLevel,vars,d)
  }
}

models <- parallel::parLapply(parallelCluster,yLevels,
                              mkWorker())
names(models) <- yLevels
print(models)

The above pattern is concise and works very well. A few caveats to remember are:

  • Remember each parallel worker is a remote environment. Make sure libraries you need are defined on each remote machine.
  • Non-core libraries loaded in the home environment are not necessarily loaded in the remote environment. Prefer package-style notation such as stats::glm() when calling library functions (as calling library(...) on each remote node would be wasteful).
  • Our bindToEnv function is itself directly altering environments of passed functions (so they can refered to the values we are transporting). This can cause its own problems in damaging previously curried environments. Some notes on working around this can be found here.

This is a bit to think about. However, I think you will find adding about eight lines of wrapper/boilerplate code is amply paid back by the four or more times speedup you will routinely see in your work.

Also: when you are finished, you should remember to shutdown your cluster reference:

# Shutdown cluster neatly
if(!is.null(parallelCluster)) {
  parallel::stopCluster(parallelCluster)
  parallelCluster <- c()
}

And that concludes our initial tutorial. We will followup, in a later article, with how to quickly build socket clusters using many machines, and using Amazon ec2.


The bindToEnv function itself is fairly simple:

#' Copy arguments into env and re-bind any function's lexical scope to bindTargetEnv .
#' 
#' See http://winvector.github.io/Parallel/PExample.html for example use.
#' 
#' 
#' Used to send data along with a function in situations such as parallel execution 
#' (when the global environment would not be available).  Typically called within 
#' a function that constructs the worker function to pass to the parallel processes
#' (so we have a nice lexical closure to work with).
#' 
#' @param bindTargetEnv environment to bind to
#' @param objNames additional names to lookup in parent environment and bind
#' @param names of functions to NOT rebind the lexical environments of
bindToEnv <- function(bindTargetEnv=parent.frame(),objNames,doNotRebind=c()) {
  # Bind the values into environment
  # and switch any functions to this environment!
  for(var in objNames) {
    val <- get(var,envir=parent.frame())
    if(is.function(val) && (!(var %in% doNotRebind))) {
      # replace function's lexical environment with our target (DANGEROUS)
      environment(val) <- bindTargetEnv
    }
    # assign object to target environment, only after any possible alteration
    assign(var,val,envir=bindTargetEnv)
  }
}

It also can be dowloaded from here.

One of the pains of using parallelism is this way is you always seem to need one more function or data item. One way around this is to use R’s ls() command to build the list of names you want to send. Saving the results of an ls() right after sourcing files that define functions and important global values is especially effective. Without some such strategy you find your self adding items to lists in a painful piecemeal fashion as illustrated in the following video:


“The Jerk” remembering what values needs to pack for parallel computation.

For working at larger scale: rough instructions for spinning up multiple machines as R servers on ec2 can be found here. I have also found remembering the ssh credentials of local machines lets you perform informal network cluster calculations quite handily.

Some more references:

4 thoughts on “A gentle introduction to parallel computing in R”

  1. A good overview. I find mclapply from the parallel package the easiest to use, e.g. (from your first example) models <- mclapply(yLevels, mc.cores=4, worker). The only caveat is: you need to be on a Linux system to use it.

  2. Nice post, I wrote a year ago a post where I looked into the basic parallel functions in R (http://gforge.se/2015/02/how-to-go-parallel-in-r-basics-tips/). I’m intrigued by the neat bindToEnv function – a really neat way of dealing with the variables although I guess you are still stuck with loading the libraries in your new environment. I’m a little unclear to how this affects memory – since I often go parallel when working on large problems the memory often becomes a pressing issue – have you done any benchmarking?

    1. Nice article, thanks for sharing.

      For libraries I mostly re-write code to use the “::” notation which lets you use most libraries if they are merely installed (not loaded). Each node needs the same libraries installed (so you may need to send a parallel job to do that once), and some libraries do need to be loaded (so you would have to include that in your worker).

      As to memory- running from a clean environment can also be used to limit what gets sent to each worker node. We’ve looked into trying to cut down extra references (see http://www.win-vector.com/blog/2014/05/trimming-the-fat-from-glm-models-in-r/ and http://www.win-vector.com/blog/2015/03/using-closures-as-objects-in-r/ ).

      We’ve been using parallel mostly for hyper-paremeter tuning and plotting variations of expensive machine learning steps. So we have seen maybe 1 second set-up time and then 2 minutes run time per job. Thus we have been seeing speedups of 4 to 32 times depending on what sort of machines we are using and how many we organize into our informal cluster.

Comments are closed.