Due to its integration with the future package, you can run massive computing tasks on a Google Compute Engine cluster with just a few lines of code.
More examples of using future can be found here, including one that uses fractals. For other platforms, see also an Azure example here on Revolution Analytics.
This workflow takes advantage of the future integration to run your local R functions within a cluster of GCE machines. You can use this to offload expensive computations, spinning up a cluster and tearing it down again once you are done.
In summary, this workflow:
1. Launches a cluster of VMs with gce_vm_cluster()
2. Registers those VMs as a future cluster
3. Sends your R functions to the cluster, for example via %<-% or future.apply
4. Shuts the VMs down once the results have been returned
The example below uses the default rocker/r-parallel Docker image, but you can also supply a custom image pulled from the Container Registry via the docker_image argument if required.
Instead of the more generic gce_vm(), which is intended for interactive use, we create the instances directly using gce_vm_cluster(). This creates a cluster, uploads any SSH settings you have, tests the connection, and then returns a list of VMs suitable for use in future::cluster().
By default it creates a cluster of 3 VMs named r-cluster-1, r-cluster-2 and r-cluster-3:
library(future)
library(googleComputeEngineR)
vms <- gce_vm_cluster()
#2019-03-29 23:24:54> # Creating cluster with these arguments: template = r-base, dynamic_image = rocker/r-parallel, wait = FALSE, predefined_type = n1-standard-1
#2019-03-29 23:25:04> Operation running...
#2019-03-29 23:25:07> Operation running...
#2019-03-29 23:25:10> Operation running...
#2019-03-29 23:25:17> Operation complete in 13 secs
#2019-03-29 23:25:20> Operation complete in 13 secs
#2019-03-29 23:25:23> Operation complete in 14 secs
#2019-03-29 23:25:25> r-cluster-1 VM running
#2019-03-29 23:25:27> r-cluster-2 VM running
#2019-03-29 23:25:29> r-cluster-3 VM running
#2019-03-29 23:25:37> Public SSH key uploaded to instance
#2019-03-29 23:25:45> Public SSH key uploaded to instance
#2019-03-29 23:25:53> Public SSH key uploaded to instance
#2019-03-29 23:25:53> # Testing cluster:
#Warning: Permanently added '35.233.25.199' (ED25519) to the list of known hosts.
#r-cluster-1 ssh working
#Warning: Permanently added '35.187.54.41' (ED25519) to the list of known hosts.
#r-cluster-2 ssh working
#Warning: Permanently added '35.205.66.124' (ED25519) to the list of known hosts.
#r-cluster-3 ssh working
We now register the VMs as a future cluster, as per the details given in the future README.
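The registration code itself is not reproduced above; a minimal sketch, using the vms object returned by gce_vm_cluster() and the same as.cluster() call as the fuller examples further down, would be:
## register the cluster of VMs as the future backend
plan(cluster, workers = as.cluster(vms))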
You can pass your own arguments to gce_vm_cluster(), such as which Docker image to use, the VM name prefix, and any custom SSH arguments you may have. See the function documentation for details.
The default uses rocker/r-parallel as its image, but if you want a custom image then create your own Docker image based on that one, for example via this tutorial using Google Build Triggers. This will give you a Docker image name such as gcr.io/my-project/my-r - use a version of the code below to use this in your cluster.
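As a sketch (the my-r image and my-project project are placeholders; gce_tag_container() and the docker_image argument are used in the same way in the full example further down):
## hypothetical custom image built via Build Triggers
my_docker <- gce_tag_container("my-r", project = "my-project")
## launch the cluster using the custom image
vms <- gce_vm_cluster(docker_image = my_docker)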
You can also customise the Rscript command that launches your script, but always make sure to include --net=host as shown in the default arguments, so that the Docker container uses the host VM's SSH ports (i.e. it can respond to your SSH commands).
The cluster is now ready to receive jobs. You can send them by simply using %<-% instead of <-. Another useful function is future.apply::future_lapply(), which lets you loop over the cluster. Consult the future.apply documentation for details.
## use %<-% to send functions to work on cluster
## See future README for details: https://github.com/HenrikBengtsson/future
a %<-% Sys.getpid()
## make a big function to run asynchronously
f <- function(my_data, args = NULL){
  ## ...expensive computations that produce `result`...
  result
}
## send to cluster
result %<-% f(my_data)
For long-running jobs you can use future::resolved() to check on their progress.
## check if resolved
resolved(result)
#[1] TRUE
(Contributed by Grant McDermott.)
The above setup will parallelize across the VMs in your cluster. However, each VM will still only run tasks sequentially. In order to parallelize tasks on the VMs too, we need to tell our remote cluster (via future) to use a nested parallelization strategy. At a high level this will involve two steps:
1. Define a nested plan() with two future topology layers: an outer layer that uses the cluster of remote VMs, and an inner layer that uses the cores within each VM.
2. Chunk the input data so that each VM works on a separate part of the problem, sending the jobs with nested future_sapply() calls.
To illustrate, consider an example where we wish to run a slow function across a cluster of three VMs that each have eight cores (yielding 24 cores in total). We again use gce_vm_cluster()
to set this up. Note in passing that the default instantiation of the rocker/r-parallel Docker image on each VM is what allows us to run parallel processes on these machines themselves without having to install any additional packages.
library(googleComputeEngineR)
library(future.apply)
## Emulate a slow function that can be sped up in parallel
slow_square <-
function(x = 1) {
x_sq <- x^2
Sys.sleep(5)
return(x_sq)
}
## Set up our cluster: 3 VMs with 8 cores each
vms_nested <-
gce_vm_cluster(
vm_prefix = "nested-cluster",
cluster_size = 3,
#docker_image = "rocker/r-parallel", ## Default
predefined_type = "n1-highcpu-8",
scheduling = list(preemptible = TRUE) ## Optional: Use cheaper, preemptible machines
)
#2019-10-25 12:56:21> # Creating cluster with settings: predefined_type = n1-highcpu-8, scheduling = list(preemptible = TRUE), template = r-base, dynamic_image = rocker/r-parallel, wait = FALSE
#2019-10-25 12:56:29> Operation running...
#2019-10-25 12:56:35> Operation complete in 6 secs
#2019-10-25 12:56:38> Operation complete in 4 secs
#2019-10-25 12:56:42> Operation complete in 6 secs
#2019-10-25 12:56:43> nested-cluster1 VM running
#2019-10-25 12:56:44> nested-cluster2 VM running
#2019-10-25 12:56:46> nested-cluster3 VM running
#2019-10-25 12:56:54> Public SSH key uploaded to instance
#2019-10-25 12:57:01> Public SSH key uploaded to instance
#2019-10-25 12:57:09> Public SSH key uploaded to instance
#2019-10-25 12:57:09> # Testing cluster:
#Warning: Permanently added 'XX.XX.XX.XXX' (ED25519) to the list of known hosts.
#nested-cluster1 ssh working
#Warning: Permanently added 'YY.YY.YYY.YY' (ED25519) to the list of known hosts.
#nested-cluster2 ssh working
#Warning: Permanently added 'ZZ.ZZ.ZZZ.ZZ' (ED25519) to the list of known hosts.
#nested-cluster3 ssh working
Next, we must tweak the cluster plan so that future
is aware of the nested parallel structure. Nesting in the future
framework is operationalised by defining a series of so-called future “topologies”. In this case, we are going to define two topology layers:
1. Tell future to use the cluster of three remote VMs.
2. Tell future to use all eight cores on each VM via the multiprocess option.
plan(list(
## Topology 1: Use the cluster of remote VMs
tweak(cluster, workers = as.cluster(vms_nested)),
## Topology 2: Use all CPUs on each VM
tweak(multiprocess)
))
The final thing that we'll do before using our cluster is to define a convenience function for chunking the input arguments (i.e. data) to our function. Remember, we need to do this or else each VM will duplicate effort by working on the same parts of the problem. R provides lots of ways to split data into chunks, but here's one that I've borrowed from StackOverflow. The advantages of this particular function are that it only requires base R operations and is robust to complications like unequal chunk lengths and different input types (e.g. factors vs numeric).
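The original helper isn't reproduced above, but a minimal base R sketch that matches the chunk_func() call used below (splitting a vector x into n roughly equal, ordered chunks) would be:
## split x into n roughly equal chunks, preserving order and input type
chunk_func <- function(x, n) {
  split(x, cut(seq_along(x), n, labels = FALSE))
}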
We are now ready to use all 24 cores of the cluster. For this particular example, I'm going to loop over 48 iterations of our slow_square() function. (Note that this would take four minutes to run sequentially.) I'll use future.apply::future_sapply() to run things in parallel and will also use the tictoc package to record timing. I'm going to comment the next code block quite extensively, but just to quickly highlight the key conceptual stages:
1. Parallelise over the three VMs with an outer future_sapply() call.
2. Split the input data into distinct chunks, so that each VM works on a separate part of the problem (VM1 gets 1:16, VM2 gets 17:32, and VM3 gets 33:48).
3. Parallelise within each VM, across all of its cores, with an inner future_sapply() call.
## Input data (vector to be iterated over by our function)
input_data <- 1:48
## Run the function in nested parallel on our cluster and record timing
tictoc::tic()
ans <-
## Parallelise over the three VMs
future_sapply(seq_along(vms_nested), function(v) {
## Split the input data into distinct chunks for each VM
input_chunk <- chunk_func(input_data, length(vms_nested))[[v]]
## Parallelise within each of the VMs
future_sapply(input_chunk, slow_square)
})
tictoc::toc()
#11.451 sec elapsed
## Show that it worked
as.vector(ans)
# [1] 1 4 9 16 25 36 49 64 81 100 121 144 169 196 225
#[16] 256 289 324 361 400 441 484 529 576 625 676 729 784 841 900
#[31] 961 1024 1089 1156 1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025
#[46] 2116 2209 2304
And there you have it: a 21x speed-up compared to the sequential option. (There’s a tiny bit of overhead, which is why we don’t achieve the full theoretical speed-up of 24x. But those margins will improve as the scale of the problem increases.) The same general approach demonstrated here can be adapted fairly easily to greatly reduce computation times for even complex forms of analysis.
Since we have been using preemptible machines for this example, they will automatically be stopped within 24 hours anyway. However, we'll delete them manually, since there's no point incurring additional charges now that we're done with them.
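The clean-up code itself is not shown above; assuming the vms_nested object created earlier, it would be along the lines of:
## delete each of the preemptible VMs now that the job is done
lapply(vms_nested, FUN = gce_vm_delete)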
The below splits a dataset into chunks that are each run on a separate VM, using a custom Docker image that has the necessary packages installed, for instance via build triggers. Optimise by including the future package in these Docker images.
library(future.apply) ## Will automatically load future too
library(googleComputeEngineR)
my_docker <- gce_tag_container("custom-image", project = "my-project")
vms <- gce_vm_cluster("r-vm", cluster_size = 3, docker_image = my_docker)
## create the future cluster
plan(cluster,
workers = as.cluster(vms,
docker_image=my_docker))
## create the list of data to run on the cluster
## here we assume they are in a folder of CSVs
## and there are as many files as VMs to run it upon
my_files <- list.files("myfolder", full.names = TRUE)
my_data <- lapply(my_files, read.csv)
## make a big function to run asynchronously
cluster_f <- function(my_data, args = 4){
forecast::forecast(forecast::auto.arima(ts(my_data, frequency = args)))
}
## send to cluster
result <- future.apply::future_lapply(my_data, cluster_f, args = 4)
## once done this will be TRUE
resolved(result)
## Your list of forecasts are now available
result
This is from @ctlamb's GitHub issue #93, which uses a custom Dockerfile to install the raster package.
The custom Dockerfile was set up in this GitHub repo and then made into an image using Build Triggers.
Make sure the VMs are created in the same project as the build triggers so that authentication is smooth.
The example code is shown below, assuming your custom Docker image is available at gcr.io/your-project/raster:
library(raster)
library(googleComputeEngineR)
library(future)
library(future.apply)
library(SpaDES.tools)
gce_global_project("your-project")
## create raster
row <- 8
col <- 8
r <- raster(nrows=row, ncols=col,
xmn=0, xmx=row,
ymn=0, ymx=col,
vals=c(1:(row*col)))
plot(r)
## Split
r_split <- splitRaster(r, nx=2, ny=2)
## create model
df <- data.frame(y=c(1:10),layer=c(1:5,7,6,8:10))
mod <- glm(y~layer, data=df)
## create the cluster VMs - here we customise the machine type
vms <- gce_vm_cluster("myvms", predefined_type = "n1-highmem-2")
## once all launched, add to the future cluster plan with the custom Docker image
## (use plan(sequential) for local testing)
my_image <- "gcr.io/your-project/raster"
plan(cluster, workers = as.cluster(vms, docker_image = my_image))
## make the vector of stuff to send to nodes
o <- lapply(r_split, readAll)
## the action you want to perform on the elements in the cluster
my_single_function <- function(x){
raster::predict(x, mod)
}
## run the function across the cluster in parallel
result <- future_lapply(o, my_single_function)
## tidy up
gce_vm_stop(vms)
Preemptible VMs are a lot cheaper (around 80%) than normal instances, but Google reserves the right to stop them at any time. They are intended for non-critical jobs where, if they shut down, you can account for it and launch another.
To create them, you need to pass scheduling = list(preemptible = TRUE) to the gce_vm_create() family of creation functions.
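For example, reusing the scheduling argument shown in the nested-cluster example above (the preempt-cluster prefix is just a placeholder name):
## a cluster of cheaper, preemptible VMs
vms <- gce_vm_cluster(
  vm_prefix = "preempt-cluster",
  scheduling = list(preemptible = TRUE)
)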
Make sure you can cope with results not being returned: over-provision the VMs and ensure your script can deal with redoing jobs that didn't complete.
You can launch as many VMs as you have quota for in your account. Quotas vary by region, from roughly 240 to 720. You can apply for more quota if you need it.
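If you want to check your quota from R, one option (an assumption, not part of the original example) is to inspect the project resource, which the Compute Engine API returns with a quotas element:
## inspect the current project's quotas (assumes a quotas field on the project resource)
proj <- gce_get_project("my-project")
proj$quotas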