Run massive parallel R jobs cheaply

Because googleComputeEngineR integrates with the future package, you can run massive computing tasks on a Google Compute Engine (GCE) cluster with just a few lines of code.

More examples of using future can be found here, with fractals as the worked example.

On other platforms, see also an Azure example here on Revolution Analytics.

Remote R cluster

This workflow uses the future integration to run your local R functions on a cluster of GCE machines.
It lets you offload expensive computations by spinning up a cluster and tearing it down again once you are done.

In summary, this workflow:

  1. Creates a GCE cluster
  2. Lets you perform computations
  3. Stops the VMs

Create the cluster

The example below uses the default r-base template, but you can also supply a dynamic_image pulled from the Container Registry if required.

Instead of the more generic gce_vm(), which is aimed at interactive use, we create the instances directly with gce_vm_template() and wait = FALSE. Each call then returns immediately rather than waiting for its VM to be created before starting the next, which matters if you have a lot of VMs. You can then use gce_get_zone_op() to check the job status, and gce_wait() to block until the jobs are done.

library(future)
library(googleComputeEngineR)

## names for your cluster
vm_names <- c("vm1","vm2","vm3")

## create the cluster using the default template for r-base
## each call returns a job that creates a VM in the background
## (add a dynamic_image argument here to use a custom Docker image)
jobs <- lapply(vm_names, function(x) {
    gce_vm_template(template = "r-base",
                    predefined_type = "n1-highmem-2",
                    name = x,
                    wait = FALSE)
})
jobs
# [[1]]
# ==Operation insert :  PENDING
# Started:  2016-11-16 06:52:58
# [[2]]
# ==Operation insert :  PENDING
# Started:  2016-11-16 06:53:04
# [[3]]
# ==Operation insert :  PENDING
# Started:  2016-11-16 06:53:09

## check status of jobs
lapply(jobs, gce_get_zone_op)
# [[1]]
# ==Operation insert :  DONE
# Started:  2016-11-16 06:52:58
# Ended: 2016-11-16 06:53:14 
# Operation complete in 16 secs 

# [[2]]
# ==Operation insert :  DONE
# Started:  2016-11-16 06:53:04
# Ended: 2016-11-16 06:53:20 
# Operation complete in 16 secs 

# [[3]]
# ==Operation insert :  DONE
# Started:  2016-11-16 06:53:09
# Ended: 2016-11-16 06:53:30 
# Operation complete in 21 secs


## wait for all the jobs to complete so the VMs are ready
## (gce_wait() takes the job objects, not the VM names)
lapply(jobs, gce_wait)


## get the VM objects
vms <- lapply(vm_names, gce_vm)

It is safest to set up the SSH keys separately for multiple instances using gce_ssh_setup(). This is normally called for you when you first connect to a VM.

## set up SSH for the VMs
vms <- lapply(vms, gce_ssh_setup)

We now make the VM cluster, following the details given in the future README.

## make a future cluster
plan(cluster, workers = vms)

Using your own Docker image

The default template uses rocker/r-base as its image. If you need a custom image, build your own Docker image, for example via this tutorial using Google Build Triggers.

This gives you a Docker image name such as gcr.io/my-project/my-r. Adapt the code below to use it in your cluster:

## the image name passed to docker run must match docker_image
plan(cluster, workers = as.cluster(
  vms, 
  docker_image = "gcr.io/my-project/my-r",
  rscript = c("docker", "run", c("--net=host","--shm-size=13G"),
              "gcr.io/my-project/my-r", 
              "Rscript")))

Using the cluster

The cluster is now ready to receive jobs. You can send them by simply using %<-% instead of <-. Another useful function is future_lapply(), which lets you loop over a cluster; it was originally part of future and now lives in the future.apply package. Consult the future documentation for details.

## use %<-% to send functions to work on cluster
## See future README for details: https://github.com/HenrikBengtsson/future
a %<-% Sys.getpid()

## make a big function to run asynchronously
f <- function(my_data, args = NULL){
   ## ....expensive...computations go here
   result <- my_data  ## placeholder for the real computation
   result
}

## send to cluster
result %<-% f(my_data)

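For long-running jobs, you can use future::resolved() to check whether a result is ready.

## check if resolved without blocking;
## futureOf() retrieves the future behind a %<-% assignment,
## whereas touching result directly would wait for the value
resolved(futureOf(result))
# [1] TRUE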
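
Before paying for VMs, it can help to try the same pattern on your own machine. The sketch below is a minimal local dry run, assuming the future and future.apply packages are installed. plan(sequential) stands in for the GCE cluster, and chunks and chunk_sum are made-up placeholders for your own data and function.

```r
library(future)
library(future.apply)

## run everything in the current R session; swap this for the
## plan(cluster, workers = vms) call once the VMs are ready
plan(sequential)

## made-up stand-ins for real data chunks and an expensive function
chunks <- list(1:10, 11:20, 21:30)
chunk_sum <- function(x) sum(x)

## same call shape as when looping over a remote cluster
totals <- future_lapply(chunks, chunk_sum)
unlist(totals)
# [1]  55 155 255
```

If the function works here, only the plan() call should need to change to run it on the cluster.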

Example - forecasting a large data set

The example below takes a dataset split into chunks and runs each chunk on a separate VM, using a custom Docker image that has the necessary packages installed, for instance via build triggers. For best results, include the future package in these Docker images.

library(future)
library(googleComputeEngineR)

## names for your cluster - just three for this example
vm_names <- c("vm1","vm2","vm3")

my_docker <- gce_tag_container("custom-image", project = "my-project")

## create the cluster using the custom Docker image
## each call returns a job that creates a VM in the background
jobs <- lapply(vm_names, function(x) {
    gce_vm_template(template = "r-base",
                    predefined_type = "n1-highmem-2",
                    name = x,
                    dynamic_image = my_docker,
                    wait = FALSE)
})
                     
## wait for all the jobs to complete so the VMs are ready
lapply(jobs, gce_wait)

## get the VM objects
vms <- lapply(vm_names, gce_vm)

## set up SSH for the VMs
vms <- lapply(vms, gce_ssh_setup)  # set any settings necessary here for SSH

## the Rscript command that will run in the cluster
## customise as needed, this for example sets shared RAM to 13GB
my_rscript <- c("docker", 
                "run", c("--net=host","--shm-size=13G"),
                my_docker, 
                "Rscript")
                
## create the future cluster
plan(cluster, 
     workers = as.cluster(vms, 
                          docker_image = my_docker,
                          rscript = my_rscript))

## create the list of data to run on the cluster
## here we assume they are in a folder of CSVs
## and there are as many files as VMs to run it upon
## (full.names = TRUE so read.csv gets the folder path too)
my_files <- list.files("myfolder", full.names = TRUE)

my_data <- lapply(my_files, read.csv)

## make a big function to run asynchronously
cluster_f <- function(my_data, args = 4){
   
   forecast::forecast(forecast::auto.arima(ts(my_data, frequency = args)))
   
}

## send to cluster
## (future_lapply() now lives in the future.apply package)
result <- future.apply::future_lapply(my_data, cluster_f, args = 4) 

## once done this will be TRUE
resolved(result)

## Your list of forecasts is now available
result
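
The example above assumes the data already sits in one CSV per VM. If you start from a single table instead, one way to get there is to cut it into as many contiguous chunks as you have VMs. This is only a sketch in base R: big_data is made up, and the chunking rule is an assumption rather than something the package provides.

```r
vm_names <- c("vm1", "vm2", "vm3")

## made-up data standing in for a large table
big_data <- data.frame(id = 1:10, value = (1:10) * 2)

## assign each row to one of length(vm_names) contiguous chunks
n <- nrow(big_data)
k <- length(vm_names)
chunk_id <- ceiling(seq_len(n) * k / n)

## one data frame per VM, in row order
my_data <- split(big_data, chunk_id)

## number of rows each VM would receive
vapply(my_data, nrow, integer(1))
```

Contiguous chunks keep neighbouring rows together, which matters if each chunk is treated as its own time series.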

Cleanup

Remember to shut down your cluster. You are charged per second of uptime for each instance.

## shutdown instances when finished
lapply(vms, gce_vm_stop)
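
Because forgotten VMs keep costing money, you may want the shutdown to happen even when the computation fails. The following is a minimal sketch using base R's on.exit(). with_cleanup is a hypothetical helper, not part of googleComputeEngineR, and the dry run uses a fake stop function so it can be tried without any VMs.

```r
## hypothetical helper: run work(vms) and always call stop_vm on
## every VM afterwards, even if work() throws an error
with_cleanup <- function(vms, work, stop_vm) {
  on.exit(lapply(vms, stop_vm), add = TRUE)
  work(vms)
}

## dry run with a fake stop function that only records which VMs it saw
record <- new.env()
record$stopped <- character(0)
fake_stop <- function(vm) record$stopped <- c(record$stopped, vm)

outcome <- tryCatch(
  with_cleanup(c("vm1", "vm2"),
               function(vms) stop("computation failed"),
               fake_stop),
  error = function(e) conditionMessage(e)
)

outcome          # the error message from the failed work
record$stopped   # both VMs were still passed to the stop function
```

In real use you would pass your list of VM objects as vms and gce_vm_stop as stop_vm.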

Preemptible VMs

Preemptible VMs are a lot cheaper (80%) than normal instances, but Google reserves the right to stop them at any time. They are intended for non-critical jobs where, if a VM shuts down, you can account for it and launch another.

To create them, pass scheduling = list(preemptible = TRUE) to the gce_vm_create() family of creation functions.

Make sure you can cope with results that may never be returned: over-provision the VMs and ensure your script can redo jobs that didn't complete.
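
One way to redo incomplete jobs is to track which inputs failed and send only those again. The sketch below is a hypothetical base R helper, not part of googleComputeEngineR or future. run_job stands in for whatever sends one chunk to the cluster, and flaky_square simulates a worker that is preempted on its first attempt.

```r
## hypothetical helper: re-run any job that raised an error,
## up to max_attempts rounds
run_with_retry <- function(inputs, run_job, max_attempts = 3) {
  results <- vector("list", length(inputs))
  pending <- seq_along(inputs)
  for (attempt in seq_len(max_attempts)) {
    if (length(pending) == 0) break
    for (i in pending) {
      ## store either the value or the error object
      results[i] <- list(tryCatch(run_job(inputs[[i]]),
                                  error = function(e) e))
    }
    failed <- vapply(results[pending], inherits, logical(1), what = "error")
    pending <- pending[failed]
  }
  if (length(pending) > 0) {
    warning("jobs still failing: ", paste(pending, collapse = ", "))
  }
  results
}

## simulated job: input 2 fails the first time it is tried
calls <- new.env()
flaky_square <- function(x) {
  key <- as.character(x)
  n <- if (is.null(calls[[key]])) 0 else calls[[key]]
  calls[[key]] <- n + 1
  if (x == 2 && n == 0) stop("worker was preempted")
  x^2
}

out <- run_with_retry(list(1, 2, 3), flaky_square)
unlist(out)
# [1] 1 4 9
```

Only the failed input is sent again on the second round, so completed work is not repeated.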

Quotas

You can launch as many VMs as you have quota for in your account. Quotas vary by region, from roughly 240 to 720. You can apply for more quota if you need it.