Parallel computing

William Michael Landau

2017-11-05


Drake has extensive high-performance computing support, from local multicore computing on your laptop to serious supercomputing across multiple nodes of a large cluster. In make(), just set the jobs argument to something greater than 1. That unlocks local multicore parallelism. For large-scale distributed parallelism, set parallelism to "Makefile" and stay tuned for an explanation.
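
For a quick preview, assuming my_plan is your workflow plan data frame (the basic example below provides one):

make(my_plan, jobs = 4)                           # Local multicore parallelism.
make(my_plan, parallelism = "Makefile", jobs = 4) # Distributed Makefile parallelism.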

1 The approach

Drake’s approach to parallelism relies on the network graph of the targets and imports.

library(drake)
clean() # Clear out previous results, if any.
load_basic_example()
make(my_plan, jobs = 2, verbose = FALSE) # Parallelize over 2 jobs.
# Change a dependency.
reg2 <- function(d) {
  d$x3 <- d$x ^ 3
  lm(y ~ x3, data = d)
}
# Hover, click, drag, zoom, and pan.
plot_graph(my_plan, width = "100%", height = "500px")

When you call make(my_plan, jobs = 4), the work proceeds in chronological order from left to right in the graph above. The items are built or imported column by column in sequence, and up-to-date targets are skipped. Within each column, the targets and objects are all independent of each other conditional on the previous steps, so they are distributed over the 4 available parallel jobs. Assuming the targets are rate-limiting (as opposed to the imports), the next make(..., jobs = 4) should be faster than make(..., jobs = 1), but it would be superfluous to use more than 4 jobs.

2 How many parallel jobs should you use?

2.1 Not too many!

Be mindful of the maximum number of simultaneous parallel jobs you deploy. At best, too many jobs is poor etiquette on a system with many users and limited resources. At worst, too many jobs can crash a system. The jobs argument to make() sets the maximum number of simultaneous jobs for most of drake's parallel backends, but not all.

There are ways to break the pattern, however. For example, make(..., parallelism = "Makefile", jobs = 2, args = "--jobs=4") uses at most 2 jobs for the imports and at most 4 jobs for the targets, because args overrides jobs for the targets. For make(..., parallelism = "future_lapply"), the jobs argument is ignored altogether. Instead, you might limit the maximum number of jobs by setting options(mc.cores = 2) before calling make(). Depending on the future backend you select with backend() or future::plan(), you might also make use of one of the environment variables listed in ?future::future.options.
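
For example, here is a sketch of capping "future_lapply" parallelism at 2 workers, assuming a multicore future backend (see section 3.3) that honors the mc.cores option:

library(future)
backend(multicore)    # Select a future backend.
options(mc.cores = 2) # Cap the number of parallel workers.
make(my_plan, parallelism = "future_lapply")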

2.2 Drake can suggest a maximum number of useful jobs

For drake, the maximum number of useful jobs is the number of items in the largest parallelizable stage. Unless you set from_scratch = TRUE in max_useful_jobs(), all up-to-date targets are ignored.

library(drake)
load_basic_example()
plot_graph(my_plan) # Set targets_only to TRUE for smaller graphs.
max_useful_jobs(my_plan) # 8
max_useful_jobs(my_plan, imports = "files") # 8
max_useful_jobs(my_plan, imports = "all") # 8
max_useful_jobs(my_plan, imports = "none") # 8
make(my_plan, jobs = 4)
plot_graph(my_plan)
# Ignore the targets already built.
max_useful_jobs(my_plan) # 1
max_useful_jobs(my_plan, imports = "files") # 1
max_useful_jobs(my_plan, imports = "all") # 8
max_useful_jobs(my_plan, imports = "none") # 0
# Change a function so some targets are now out of date.
reg2 <- function(d){
  d$x3 <- d$x ^ 3
  lm(y ~ x3, data = d)
}
plot_graph(my_plan)
max_useful_jobs(my_plan) # 4
max_useful_jobs(my_plan, from_scratch = TRUE) # 8
max_useful_jobs(my_plan, imports = "files") # 4
max_useful_jobs(my_plan, imports = "all") # 8
max_useful_jobs(my_plan, imports = "none") # 4

3 Parallel backends

Drake has multiple parallel backends, i.e. separate mechanisms for achieving parallelism. Some are low-overhead and limited, others are high-overhead and scalable. Just set the parallelism argument of make() to choose a backend. The best choice usually depends on your project's scale and stage of deployment.

parallelism_choices()
## [1] "parLapply"     "mclapply"      "Makefile"      "future_lapply"
parallelism_choices(distributed_only = TRUE)
## [1] "Makefile"      "future_lapply"
?parallelism_choices  # Read an explanation of each backend.
default_parallelism() # "parLapply" on Windows, "mclapply" everywhere else

3.1 mclapply

The mclapply backend is powered by the mclapply() function from the parallel package. It forks multiple processes on your local machine to take advantage of multicore computing. It spins up quickly, but it lacks scalability, and it does not work on Windows. If you try to call make(..., parallelism = "mclapply", jobs = 2) on a Windows machine, drake will warn you and then demote the number of jobs to 1.

make(..., parallelism = "mclapply", jobs = 2)

3.2 parLapply

The parLapply backend is powered by the parLapply() function from the parallel package. Like the mclapply backend, parLapply only scales up to a handful of jobs on your local machine. However, it works on all platforms. The tradeoff is overhead: parLapply is fast once it gets going, but setup takes a long time because each call to make() creates a new parallel socket cluster and transfers all your data and session info to each parallel worker individually. So if jobs is less than 2, make() does not bother setting up a cluster and uses lapply() instead. Because mclapply does not work on Windows, the default parallel backend is parLapply on Windows machines and mclapply everywhere else.

make(..., parallelism = "parLapply", jobs = 2)
default_parallelism() # "parLapply" on Windows, "mclapply" everywhere else

3.3 future_lapply

The future package unlocks a wide array of powerful parallel backends. The idea is to set up a future backend in advance (with drake::backend() or future::plan()) and then call make(parallelism = "future_lapply").

library(future)
backend() # same as future::plan()
## sequential:
## - args: function (expr, envir = parent.frame(), substitute = TRUE, lazy = FALSE, seed = NULL, globals = TRUE, local = TRUE, earlySignal = FALSE, label = NULL, ...)
## - tweaked: FALSE
## - call: plan("default", .init = FALSE)
backend(multicore)
backend()
## multicore:
## - args: function (expr, envir = parent.frame(), substitute = TRUE, lazy = FALSE, seed = NULL, globals = TRUE, workers = availableCores(constraints = "multicore"), earlySignal = FALSE, label = NULL, ...)
## - tweaked: FALSE
## - call: future::plan(...)

make() knows which future backend you selected.

make(my_plan, parallelism = "future_lapply")

You can try different backends in an R session. Here are examples for forked processes

backend(multicore)
make(my_plan, parallelism = "future_lapply")

and multiple R sessions.

backend(multisession)
make(my_plan, parallelism = "future_lapply")

You can even deploy to your own PSOCK clusters. We recommend future::makeClusterPSOCK() rather than parallel::makePSOCKcluster().

cl <- future::makeClusterPSOCK(2L)
backend(cluster, workers = cl)
make(my_plan, parallelism = "future_lapply")

This approach should allow you to deploy targets to a Docker container.

## Setup of Docker worker running rocker and r-base
## (requires installation of future package)
cl <- future::makeClusterPSOCK(
  "localhost",
  ## Launch Rscript inside Docker container
  rscript = c(
    "docker", "run", "--net=host", "rocker/r-base",
    "Rscript"
  ),
  ## Install drake
  rscript_args = c(
    "-e", shQuote("install.packages('drake')")
  )
)
backend(cluster, workers = cl)
make(my_plan, parallelism = "future_lapply")

The future.batchtools package has even more parallel backends, particularly for popular job schedulers such as SLURM, TORQUE, and the Univa Grid Engine.

library(future.batchtools)
backend(batchtools_local)
make(my_plan, parallelism = "future_lapply")

You can even nest parallelism strategies together. In the following example, targets are submitted as jobs on the Univa Grid Engine, and then future-style multiprocess parallelism is applied to each target's command individually.

backend(list(batchtools_sge, multiprocess))
make(my_plan, parallelism = "future_lapply")

For parallelism on clusters and job schedulers, special batchtools *.tmpl configuration files are required; the technique is described in the batchtools documentation. It is your responsibility to configure these files for your job scheduler. You can find examples in the inst/templates folders of the batchtools and future.batchtools GitHub repositories. Drake also ships prepackaged example workflows. See examples_drake() to view your options, and then example_drake() to write the files for an example.

example_drake("sge")   # Sun/Univa Grid Engine workflow and supporting files
example_drake("slurm") # SLURM workflow and supporting files

Be sure to heed the previously-mentioned cautionary note about deploying too many jobs. In "future_lapply" parallelism, the jobs argument is totally ignored. In at least some cases, you can limit the maximum number of jobs to 2 by calling options(mc.cores = 2) before make(). Depending on the future backend you select with backend() or future::plan(), you might make use of one of the other environment variables listed in ?future::future.options.

3.4 Makefile

The Makefile backend uses proper Makefiles to distribute targets across different R sessions. After processing all the imports in parallel using the default backend, make(..., parallelism = "Makefile") spins up a whole new R session for each individual target. The Makefile acts as a job scheduler, waiting until the dependencies are finished before initiating the next targets at each parallelizable stage. Thanks to a clever idea by Kirill Müller, drake communicates with the Makefile by writing hidden dummy files in the cache whose only job is to hold a timestamp. The Makefile sees these timestamps and knows which jobs to run and which ones to skip.

Unlike the other backends, which sometimes build targets before or simultaneously with independent imports, the Makefile backend processes all the imports before beginning the first target. During import processing, make() uses your system's default parallelism (mclapply or parLapply) and the number of jobs you supplied to the jobs argument. Stay tuned for how to use different numbers of jobs for imports versus targets.

3.4.1 Basic Makefile parallelism

Before running Makefile parallelism, Windows users need to download and install Rtools. For everyone else, just make sure Make is installed. Then, in the next make(), simply set the parallelism and jobs arguments as before.

make(my_plan, parallelism = "Makefile", jobs = 2)

You will see a Makefile written to your working directory. Do not run this Makefile by itself: it depends on transient dummy timestamp files created by make(), so it will not work correctly on its own.

Makefile parallelism has its own kind of flexibility. You can now use the args argument to send custom arguments to the Makefile. For example, you could use 4 parallel jobs for the imports and 6 parallel jobs for the targets.

make(my_plan, parallelism = "Makefile", jobs = 4, args = "--jobs=6 --silent")

The args also let you generate the Makefile without building any targets, which helps during troubleshooting.

make(my_plan, parallelism = "Makefile", args = c("--touch", "--silent"))

In addition, you can use a program other than GNU Make to run the Makefile. You may be interested in lsmake as an alternative, for example.

make(my_plan, parallelism = "Makefile", jobs = 4, command = "lsmake")
default_Makefile_command()
## [1] "make"

For finer control over the build process, use the recipe_command argument. By default, the recipe_command is "Rscript -e 'R_RECIPE'".

default_recipe_command()
## [1] "Rscript -e 'R_RECIPE'"
r_recipe_wildcard()
## [1] "R_RECIPE"

The R_RECIPE wildcard is replaced by drake::mk("your_target", "path_to_cache") in the Makefile. That way, a target named your_target is built with the Makefile recipe,

Rscript -e 'drake::mk("your_target", "path_to_cache")'

You can change the recipe with the recipe_command argument. For example, to save some time and skip the loading of the methods package, you might use "R -e 'R_RECIPE' -q".

make(my_plan, parallelism = "Makefile", jobs = 4,
  recipe_command = "R -e 'R_RECIPE' -q")

The Makefile recipe for your_target becomes

R -e 'drake::mk("your_target", "path_to_cache")' -q

That particular recipe fails on Windows, but you have flexibility.

Use the Makefile_recipe() function to show and tweak Makefile recipes in advance.

Makefile_recipe()
## Rscript -e 'drake::mk(target = "your_target", cache_path = "C:/Users/c240390/AppData/Local/Temp/RtmpSQkVhU/Rbuild2db86da6234/drake/vignettes/.drake")'
Makefile_recipe(
  recipe_command = "R -e 'R_RECIPE' -q",
  target = "this_target",
  cache_path = "custom_cache"
)
## R -e 'drake::mk(target = "this_target", cache_path = "custom_cache")' -q

If recipe_command contains no mention of R_RECIPE, then R_RECIPE is single-quoted and appended automatically.

Makefile_recipe(recipe_command = "R -q -e")
## R -q -e 'drake::mk(target = "your_target", cache_path = "C:/Users/c240390/AppData/Local/Temp/RtmpSQkVhU/Rbuild2db86da6234/drake/vignettes/.drake")'

Try each of the following and look at the generated Makefile after each call to make(). To see the recipes printed to the console, run clean() between each make() and leave verbose equal to TRUE (default).

make(my_plan, parallelism = "Makefile", jobs = 4)
make(my_plan, parallelism = "Makefile", jobs = 4,
  recipe_command = "Rscript -e")
make(my_plan, parallelism = "Makefile", jobs = 4,
  recipe_command = "Rscript -e 'R_RECIPE'")

But do not try the following on Windows.

make(my_plan, parallelism = "Makefile", jobs = 4,
  recipe_command = "R -e 'R_RECIPE' -q")
make(my_plan, parallelism = "Makefile", jobs = 4,
  recipe_command = "R -q -e 'R_RECIPE'")
make(my_plan, parallelism = "Makefile", jobs = 4,
  recipe_command = "R -q -e")

3.4.2 Makefile parallelism on a cluster

For the recommended approach to supercomputing with drake, you need a new configuration file to tell the Makefile how to talk to the cluster. The shell_file() function writes a starter.
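
For instance, from your project's working directory:

shell_file() # Writes a starter shell.sh file.

The starter file looks like this.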

#!/bin/bash
shift
echo "module load R; $*" | qsub -sync y -cwd -j y

This file acts as the "shell" of the Makefile in place of the ordinary Unix shell. It is a mechanism for tricking the Makefile into submitting each target as a job on your cluster rather than starting a new R session on your local machine. You may need to configure shell.sh for your system, such as changing module load R to reference the version of R installed on the compute nodes of the cluster.
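
For example, if the compute nodes provide R through a versioned environment module, the tweaked shell.sh might look like the following sketch. The module name R/3.4.2 is hypothetical; use whatever your cluster provides.

#!/bin/bash
shift
# "R/3.4.2" is a hypothetical module name. Match your cluster's R installation.
echo "module load R/3.4.2; $*" | qsub -sync y -cwd -j y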

To tell the Makefile to use shell.sh, you need to add the line SHELL=./shell.sh to the top of the Makefile. Rather than editing the Makefile by hand, use the prepend argument of make().

make(my_plan, parallelism = "Makefile", jobs = 2, prepend = "SHELL=./shell.sh")

SLURM users may be able to invoke srun and dispense with shell.sh altogether, though success may vary depending on the SLURM system. You will probably also need to set resource allocation parameters such as upper bounds on memory and runtime. See man srun for the possible .SHELLFLAGS.

make(
  my_plan,
  parallelism = "Makefile",
  jobs = 2,
  prepend = c(
    "SHELL=srun",
    ".SHELLFLAGS=-N1 -n1 bash -c"
  )
)

And you may be able to use recipe_command to talk to the cluster rather than prepend (though most job schedulers require a script file).

make(my_plan, parallelism = "Makefile", jobs = 4,
  recipe_command = "tell_cluster_to_submit Rscript -e")

If you are interested in Makefile parallelism on a cluster, then you likely have a project that takes several hours or more to run. In that case, we recommend that you submit a master job on the login node that runs persistently until your work is complete. To do so, just save your call to make() in an R script, say my_script.R, and then deploy your work from the Linux terminal with the following.

nohup nice -19 R CMD BATCH my_script.R &

4 Final thoughts

4.1 Zombies

Some parallel backends, particularly mclapply and future::multicore, may create zombie processes. Zombie children are not usually harmful, but you may wish to kill them yourself. The following function by Carl Boneri should work on Unix-like systems. For a discussion, see drake issue 116.

fork_kill_zombies <- function(){
  # Requires the inline package. Reaps exited child processes via waitpid().
  includes <- "#include <sys/wait.h>"
  code <- "int wstat; while (waitpid(-1, &wstat, WNOHANG) > 0) {};"

  wait <- inline::cfunction(
    body = code,
    includes = includes,
    convention = ".C"
  )

  invisible(wait())
}
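
For example, you might reap leftover zombies right after a multicore make() (a sketch assuming the basic example):

make(my_plan, parallelism = "mclapply", jobs = 4)
fork_kill_zombies() # Reap any zombie child processes left behind.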

4.2 More resources

See the timing vignette for explanations of rate_limiting_times() and predict_runtime(), which can help predict the potential speed gains of running multiple jobs in parallel. If you suspect drake itself is slowing down your project, you may want to read the storage vignette to learn how to set your project's hashing algorithms.
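
For example, here is a sketch using the basic example. Note that predict_runtime() relies on build times cached by previous calls to make(), and the jobs argument below is an assumption, so check ?predict_runtime for the exact signature in your version of drake.

library(drake)
load_basic_example()
make(my_plan) # Cache the build times that inform the predictions below.
predict_runtime(my_plan, jobs = 2) # Rough predicted runtime with 2 parallel jobs.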