Drake has extensive high-performance computing support, from local multicore parallelism to serious distributed computing across multiple nodes of a cluster. Control it with the parallelism and jobs arguments to make(), and use future::plan() if parallelism is "future_lapply".
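
As a minimal sketch (using the basic example that ships with drake), you could run the same workflow with local parallelism first and a future backend second:

library(drake)
load_basic_example()                   # Get my_plan and its supporting functions.
make(my_plan, jobs = 2)                # Local parallelism with up to 2 jobs.
clean()                                # Start from scratch.
future::plan(future::multisession)     # Pick a future backend.
make(my_plan, parallelism = "future_lapply")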

The concept

Drake's approach to parallelism relies on the network graph representation of a project.

library(drake)
clean()
load_basic_example()
config <- make(my_plan, jobs = 2, verbose = FALSE) # Parallelize over 2 jobs.
# Change a dependency.
reg2 <- function(d) {
  d$x3 <- d$x ^ 3
  lm(y ~ x3, data = d)
}
# Hover, click, drag, zoom, and pan.
vis_drake_graph(config, width = "100%", height = "500px")

The nodes in each column above are conditionally independent given the dependencies to the left. So in general, the targets and imports are processed column by column from left to right, and everything within a column is executed in parallel. When some targets are already up to date, drake searches ahead in the graph to maximize the number of outdated targets in each parallelizable stage.

To show the parallelizable stages of the next make() programmatically, use the parallel_stages() function. All the targets/imports in a stage are processed in parallel before moving on to the next stage.

parallel_stages(config)
##                      item imported  file stage
## 1            "report.Rmd"     TRUE  TRUE     1
## 2              data.frame     TRUE FALSE     1
## 3                    knit     TRUE FALSE     1
## 4                      lm     TRUE FALSE     1
## 5                  mtcars     TRUE FALSE     1
## 6                    nrow     TRUE FALSE     1
## 7              sample.int     TRUE FALSE     1
## 8                 summary     TRUE FALSE     1
## 9        suppressWarnings     TRUE FALSE     1
## 10            random_rows     TRUE FALSE     2
## 11                   reg1     TRUE FALSE     2
## 12                   reg2     TRUE FALSE     2
## 13               simulate     TRUE FALSE     3
## 14      regression2_large    FALSE FALSE     4
## 15      regression2_small    FALSE FALSE     4
## 16 coef_regression2_large    FALSE FALSE     5
## 17 coef_regression2_small    FALSE FALSE     5
## 18 summ_regression2_large    FALSE FALSE     5
## 19 summ_regression2_small    FALSE FALSE     5
## 20            "report.md"    FALSE  TRUE     6

How many parallel jobs should you use?

Not too many!

Be mindful of the maximum number of simultaneous parallel jobs you deploy. Consequences of greed and carelessness range from poor etiquette to system crashes. In most cases, the jobs argument to make() sets the maximum number of simultaneous jobs, but it does not apply to the parallel execution of targets when parallelism is "future_lapply". If you use "future_lapply" parallelism, see the workers argument of the functions you pass to future::plan() (for example, future::plan(multisession(workers = 2))). Depending on the future backend you select with future::plan(), you might also make use of one of the environment variables listed in ?future::future.options.
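
For example, this sketch caps the targets at 2 simultaneous workers while the imports independently respect jobs:

library(future)
# Targets: at most 2 simultaneous workers via the future backend.
future::plan(multisession(workers = 2))
# Imports: at most 2 simultaneous jobs via the jobs argument.
make(my_plan, parallelism = "future_lapply", jobs = 2)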

Drake can report the maximum number of useful simultaneous jobs

The max_useful_jobs() function analyzes your project and recommends a maximum value for the jobs argument to the next make() (or the workers argument to a backend function in future). The number returned by max_useful_jobs() is only an upper bound, not necessarily the number of jobs you should choose.

library(drake)
load_basic_example()
config <- drake_config(my_plan)
vis_drake_graph(config) # Set targets_only to TRUE for smaller graphs.
max_useful_jobs(config) # 8
max_useful_jobs(config, imports = "files") # 8
max_useful_jobs(config, imports = "all") # 8
max_useful_jobs(config, imports = "none") # 8
config <- make(my_plan, jobs = 4)
vis_drake_graph(config)
# Ignore the targets already built.
max_useful_jobs(config) # 1
max_useful_jobs(config, imports = "files") # 1
max_useful_jobs(config, imports = "all") # 8
max_useful_jobs(config, imports = "none") # 0
# Change a function so some targets are now out of date.
reg2 <- function(d){
  d$x3 <- d$x ^ 3
  lm(y ~ x3, data = d)
}
vis_drake_graph(config)
max_useful_jobs(config) # 4
max_useful_jobs(config, from_scratch = TRUE) # 8
max_useful_jobs(config, imports = "files") # 4
max_useful_jobs(config, imports = "all") # 8
max_useful_jobs(config, imports = "none") # 4

Caveats

Drake claims that it can

  1. Build and cache your targets in parallel (in stages).
  2. Build and cache your targets in the correct order, finishing dependencies before starting targets that depend on them.
  3. Deploy your targets to the parallel backend of your choice.

However, the practical efficiency of the parallel computing functionality remains to be verified rigorously; serious performance studies are left to future work. In addition, each project has its own best parallel computing setup, which the user needs to optimize on a case-by-case basis. Some general considerations follow.

Parallel backends

Drake has multiple parallel backends, i.e. separate mechanisms for achieving parallelism. Some are low-overhead and limited, others are high-overhead and scalable. Just set the parallelism argument of make() to choose a backend. The best choice usually depends on your project's intended scale and stage of deployment.

parallelism_choices()
## [1] "mclapply"      "parLapply"     "future"        "future_lapply"
## [5] "Makefile"

parallelism_choices(distributed_only = TRUE)
## [1] "future"        "future_lapply" "Makefile"
?parallelism_choices  # Read an explanation of each backend.
default_parallelism() # "parLapply" on Windows, "mclapply" everywhere else
## [1] "mclapply"

mclapply

The mclapply backend is powered by the mclapply() function from the parallel package, and it forks multiple processes on your local machine. It spins up quickly, but it lacks scalability, and it does not work on Windows. If you try to call make(..., parallelism = "mclapply", jobs = 2) on a Windows machine, drake will warn you and then demote jobs to 1.

make(my_plan, parallelism = "mclapply", jobs = 2)

parLapply

The parLapply backend is powered by the parLapply() function from the parallel package. Like the mclapply backend, parLapply only scales up to a handful of jobs on your local machine. parLapply parallelism works on all platforms, but it takes a few seconds to initialize during each make(). If jobs is less than 2, make() does not bother setting up a parallel socket cluster, opting instead for lapply() to reduce overhead. The default parallel backend is parLapply on Windows machines and mclapply everywhere else.

make(my_plan, parallelism = "parLapply", jobs = 2)
default_parallelism() # "parLapply" on Windows, "mclapply" everywhere else

future_lapply

The future package unlocks a wide array of powerful parallel backends. The idea is to set up a future::plan() in advance and then call make(parallelism = "future_lapply").

library(future)
future::plan()
## sequential:
## - args: function (expr, envir = parent.frame(), substitute = TRUE, lazy = FALSE, seed = NULL, globals = TRUE, local = TRUE, earlySignal = FALSE, label = NULL, ...)
## - tweaked: FALSE
## - call: plan("default", .init = FALSE)

future::plan(multicore)
future::plan()
## multicore:
## - args: function (expr, envir = parent.frame(), substitute = TRUE, lazy = FALSE, seed = NULL, globals = TRUE, workers = availableCores(constraints = "multicore"), earlySignal = FALSE, label = NULL, ...)
## - tweaked: FALSE
## - call: future::plan(multicore)

make() knows which future::plan() you selected.

make(my_plan, parallelism = "future_lapply")

The multicore plan is the analogue of mclapply parallelism, and the multisession plan is the analogue of parLapply parallelism.

future::plan(multisession(workers = 4)) # Use a max of 4 parallel jobs at a time. # nolint
make(my_plan, parallelism = "future_lapply")

You can even deploy targets to your own parallel socket cluster. Here, use future::makeClusterPSOCK() rather than parallel::makePSOCKcluster().

cl <- future::makeClusterPSOCK(2L) # Launch 2 PSOCK workers on localhost.
future::plan(cluster, workers = cl)
make(my_plan, parallelism = "future_lapply")

This approach should allow you to deploy targets to a Docker container.

## Setup of Docker worker running rocker and r-base # nolint
## (requires installation of future package)
cl <- future::makeClusterPSOCK(
  "localhost",
  ## Launch Rscript inside Docker container
  rscript = c(
    "docker", "run", "--net=host", "rocker/r-base",
    "Rscript"
  ),
  ## Install drake
  rscript_args = c(
    "-e", shQuote("install.packages('drake')")
  )
)
future::plan(cluster, workers = cl)
make(my_plan, parallelism = "future_lapply")

The future.batchtools package unlocks even more parallel computing functionality, particularly for popular job schedulers such as SLURM, TORQUE, and the Univa Grid Engine.

library(future.batchtools)
drake_batchtools_tmpl_file("slurm") # Write batchtools.slurm.tmpl.
future::plan(
  batchtools_slurm,
  template = "batchtools.slurm.tmpl",
  workers = 16
)
make(my_plan, parallelism = "future_lapply")

You can even nest parallelism strategies together. In the following example, targets are submitted as jobs to the Univa Grid Engine, and then future-style multicore parallelism is applied to each target's command individually.

drake_batchtools_tmpl_file("sge") # Write sge-simple.tmpl.
future::plan(
  list(
    tweak(batchtools_sge, template = "sge-simple.tmpl"),
    multiprocess
  )
)
make(my_plan, parallelism = "future_lapply")

For parallelism on clusters and job schedulers, special batchtools *.tmpl configuration files are required, and the technique is described in the documentation of batchtools. It is your responsibility to configure these files for your job scheduler. You can find some examples in the inst/templates folders of the batchtools and future.batchtools GitHub repositories. Drake has some built-in prepackaged example workflows as well. See drake_examples() to view your options, and then drake_example() to write the files for an example.

drake_example("sge")   # Sun/Univa Grid Engine workflow and supporting files
drake_example("slurm") # SLURM workflow and supporting files

To write just the batchtools *.tmpl file for an example, use

drake_batchtools_tmpl_file("sge")   # Writes sge-simple.tmpl
drake_batchtools_tmpl_file("slurm") # Writes batchtools.slurm.tmpl

Be sure to heed the previously-mentioned cautionary note about deploying too many jobs at once. In "future_lapply" parallelism, the jobs argument applies to the imports, but not the targets. Functions passed to future::plan(), such as multisession() and batchtools_slurm(), usually have a workers argument for this purpose. Depending on the future backend you select with future::plan(), you might also make use of one of the environment variables listed in ?future::future.options.

future

The future backend is experimental and needs more real-world testing. It is similar to future_lapply, except that individual futures are launched and managed by drake's own job scheduler: jobs are submitted as soon as workers become available, which overcomes an inefficiency of the usual staged parallelism. And with the optional evaluator column of the workflow plan data frame, you can use different computing resources for different targets. (See the evaluator argument of future::future().)

library(future)
library(drake)
load_basic_example()
remote <- future::multisession
local <- future::multicore
# Make the targets with the multisession future backend...
evaluator <- rep(list(remote), nrow(my_plan))
# ...except for the R Markdown report.
evaluator[[1]] <- local
my_plan$evaluator <- evaluator
make(my_plan, parallelism = "future", jobs = 8)

In addition, you can set the caching argument to control which process caches the values of the targets: "worker" for the individual workers (default) and "master" for the master process. If you let the workers do the caching, you can take advantage of parallelism when targets are stored. On the other hand, "master" is a better option if the workers do not have cache access or you are using a custom cache that is not thread-safe (e.g. storr::storr_dbi()).
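
For instance, a brief sketch of both options with the future backend described above:

# Let each worker cache its own targets (the default).
make(my_plan, parallelism = "future", jobs = 8, caching = "worker")
# Funnel all caching through the master process instead.
make(my_plan, parallelism = "future", jobs = 8, caching = "master")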

Makefile

Makefile parallelism uses proper Makefiles to distribute targets across different R sessions. Similarly to future_lapply parallelism, it is a mechanism for distributing targets at scale.

Basic Makefile parallelism

Before running Makefile parallelism, Windows users need to download and install Rtools. For everyone else, just make sure Make is installed. Then, in the next make(), simply set the parallelism and jobs arguments as before.

make(my_plan, parallelism = "Makefile", jobs = 2)

You will see a Makefile written to your working directory. Do not run this Makefile separately from drake. It will not work correctly by itself because it depends on the transient dummy timestamp files created by make().

Makefile parallelism has its own flexibility. You can use the args argument to send custom arguments to the Makefile. For example, you could use 4 parallel jobs for the imports and 6 parallel jobs for the targets.

make(my_plan, parallelism = "Makefile", jobs = 4, args = "--jobs=6 --silent")

The args also let you generate and inspect the Makefile without building any targets, which helps during troubleshooting.

make(my_plan, parallelism = "Makefile", args = c("--touch", "--silent"))

In addition, you can use a program other than GNU Make, such as lsmake, to run the Makefile.

make(my_plan, parallelism = "Makefile", jobs = 4, command = "lsmake")
default_Makefile_command()
## [1] "make"

For finer control over the build process, use the recipe_command argument. By default, the recipe_command is "Rscript -e 'R_RECIPE'".

default_recipe_command()
## [1] "Rscript -e 'R_RECIPE'"

r_recipe_wildcard()
## [1] "R_RECIPE"

The R_RECIPE wildcard is replaced by drake::mk("your_target", "path_to_cache") in the Makefile. That way, a target named your_target is built with the Makefile recipe,

Rscript -e 'drake::mk("your_target", "path_to_cache")'

You can change the recipe with the recipe_command argument to make(). For example, to save some time and skip the loading of the methods package, you might use "R -e 'R_RECIPE' -q".

make(my_plan, parallelism = "Makefile", jobs = 4,
  recipe_command = "R -e 'R_RECIPE' -q")

The Makefile recipe for your_target becomes

R -e 'drake::mk("your_target", "path_to_cache")' -q

But be warned: that particular recipe fails on Windows.

Use the Makefile_recipe() function to show and tweak Makefile recipes in advance.

Makefile_recipe(cache_path = "just_use_the_default")
## Rscript -e 'drake::mk(target = "your_target", cache_path = "just_use_the_default")'

Makefile_recipe(
  recipe_command = "R -e 'R_RECIPE' -q",
  target = "this_target",
  cache_path = "custom_cache"
)
## R -e 'drake::mk(target = "this_target", cache_path = "custom_cache")' -q

If recipe_command contains no mention of R_RECIPE, then R_RECIPE is single-quoted and appended automatically.

Makefile_recipe(recipe_command = "R -q -e", cache_path = "supplied_by_default")
## R -q -e 'drake::mk(target = "your_target", cache_path = "supplied_by_default")'

Try each of the following and look at the generated Makefile after each call to make(). To see the recipes printed to the console, run clean() between calls to make() and leave verbose equal to TRUE (the default).

make(my_plan, parallelism = "Makefile", jobs = 4)
make(my_plan, parallelism = "Makefile", jobs = 4,
  recipe_command = "Rscript -e")
make(my_plan, parallelism = "Makefile", jobs = 4,
  recipe_command = "Rscript -e 'R_RECIPE'")

But do not try the following on Windows.

make(my_plan, parallelism = "Makefile", jobs = 4,
  recipe_command = "R -e 'R_RECIPE' -q")
make(my_plan, parallelism = "Makefile", jobs = 4,
  recipe_command = "R -q -e 'R_RECIPE'")
make(my_plan, parallelism = "Makefile", jobs = 4,
  recipe_command = "R -q -e")

Makefile parallelism on a cluster

In the general case, you will need a new configuration file to tell the Makefile how to talk to the cluster. The shell_file() function writes a starter.
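
For example:

shell_file() # Write the starter file shell.sh shown below.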

#!/bin/bash
shift
echo "module load R; $*" | qsub -sync y -cwd -j y

This file acts as the “shell” for the Makefile instead of a typical Unix shell. It is a mechanism for tricking the Makefile into submitting each target as a job on a cluster rather than your local machine. You may need to configure shell.sh for your system, possibly changing module load R to point to the appropriate copy of R.
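
For example, if your cluster provides R through a versioned environment module (the module name below is hypothetical), the edited shell.sh might read:

#!/bin/bash
shift
echo "module load R/3.4.3; $*" | qsub -sync y -cwd -j y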

To tell the Makefile to use shell.sh, you add the line SHELL=./shell.sh to the top of the Makefile using the prepend argument to make().

make(my_plan, parallelism = "Makefile", jobs = 2, prepend = "SHELL=./shell.sh")

SLURM users may be able to invoke srun and dispense with shell.sh altogether, though success may vary depending on the SLURM system. You will probably also need to set resource allocation parameters governing memory, runtime, etc. See man srun for the possible .SHELLFLAGS.

make(
  my_plan,
  parallelism = "Makefile",
  jobs = 2,
  prepend = c(
    "SHELL=srun",
    ".SHELLFLAGS=-N1 -n1 bash -c"
  )
)

In some cases, you may be able to use recipe_command to talk to the cluster rather than prepend.

make(my_plan, parallelism = "Makefile", jobs = 4,
  recipe_command = "tell_cluster_to_submit Rscript -e")

Finally, to deploy your work, just save the call to make() in an R script (say, my_script.R) and then launch it from the Linux terminal.

nohup nice -19 R CMD BATCH my_script.R &

Drake as an ordinary job scheduler

If you do not care about reproducibility and you want drake to be an ordinary job scheduler, consider using alternative triggers.

load_basic_example()
make(my_plan, trigger = "missing") # Also consider "always".

Above, drake only builds the missing targets. This skips much of the time-consuming hashing that ordinarily detects which targets are out of date.
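
If your version of drake supports the per-target triggers described in the debug vignette, a hedged sketch of mixing triggers might look like the following (the trigger column is an assumption based on that interface):

my_plan$trigger <- "missing"   # Default trigger for every target.
my_plan$trigger[1] <- "always" # Always rebuild the first target.
make(my_plan)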

Final thoughts

Debugging

For large workflows, downsizing and debugging tools become especially important. See the "debug" vignette for help diagnosing problems with a workflow. Triggers and cached error logs in particular speed up development and testing.
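
As a hedged sketch of those cached error logs (assuming drake's failed() and diagnose() helpers):

make(my_plan)               # Suppose a target errors out.
failed()                    # Which targets failed during the last make()?
diagnose(regression2_small) # Retrieve the cached error log of a target.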

Zombies

In versions of R prior to 3.5.0, some parallel backends, particularly mclapply and future::multicore, may create zombie processes. This issue is fixed in R versions 3.5.0 and later. Zombie children are not usually harmful, but you may wish to kill them yourself. The following function by Carl Boneri should work on Unix-like systems. For a discussion, see drake issue 116.

fork_kill_zombies <- function(){
  require(inline)
  includes <- "#include <sys/wait.h>"
  code <- "int wstat; while (waitpid(-1, &wstat, WNOHANG) > 0) {};"

  wait <- inline::cfunction(
    body = code,
    includes = includes,
    convention = ".C"
  )

  invisible(wait())
}

More resources

See the timing vignette for explanations of functions rate_limiting_times() and predict_runtime(), which can help predict the possible speed gains of having multiple independent jobs. If you suspect drake itself is slowing down your project, you may want to read the storage vignette to learn how to set the hashing algorithms of your project.
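
For instance, a hedged sketch (argument names for these functions have varied across drake versions):

config <- drake_config(my_plan)
predict_runtime(config, jobs = 2) # Rough runtime estimate with 2 parallel jobs.
rate_limiting_times(config)       # Targets that dominate each parallel stage.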