BETA. It may be incomplete and/or inaccurate. Post suggestions to the Comments section and be sure to read about updates also within the Comments section.
This tutorial outlines the most straightforward way to run a GATK4 Spark tool using Google Cloud’s Dataproc. We cover how to spin up a Dataproc cluster via a browser (section 1) and also via a gcloud command (section 3). We use GATK4 FlagStatSpark as an example tool to survey SAM flags on a tiny 53MB BAM file (section 2) and on an entire 12.21GB exome BAM file (section 4). Both files are made available in a publicly accessible Google Cloud storage bucket. We conclude by examining the impact of adjusting some Spark parameters on job run times (section 5).
First things first. This tutorial costs about fourteen cents a.
Second, we remind you that GATK4 is in BETA as of this writing. Spark capabilities are still under active development and have yet to become full-featured across the tools. We write at this time to enable use of the spark capability of GermlineCNVCaller, a tool for germline copy number variant (CNV) detection that also runs in single-machine mode.
Prepare the following to be able to actively follow along. If you need more detailed instructions, search the GATK forum or Google Cloud documentation.
gcloud initinto your bash command line. Follow the directions for configuration. Be mindful in setting the region and zone b, and be sure to enable API compute. View the configuration with
gcloud config list.
Again, the data files are in Google Cloud Storage, as Multi-Regional data accessible globally. The point of this tutorial is that you need not download any large data. We keep the BAMs in the cloud and take our analysis to the cloud. Here are the cloud locations of the tutorial BAM files. FlagStat needs only the BAM.
53MB snippet from Tutorial#6484. This particular dataset is also downloadable from our FTP site.
12.21GB WES data from 1000 Genomes Project aligned to GRCh38DH. The 1000 Genomes Project provides GRCh37-aligned files at gs://genomics-public-data/1000-genomes/bam. The GRCh38 equivalent files are currently only available in CRAM format from the project’s FTP site and have known issues detailed in the site README. Because CRAM support is yet to become widely available for GATK4 tools, for the purposes of this tutorial we provide the decompressed BAM at:
Check if you can access these files with the following command.
gsutil ls -lr gs://gatk-test-data/exome_bam/1000G_wex_hg38/HG00133.alt_bwamem_GRCh38DH.20150826.GBR.exome.bam
You should see the size of the BAM listed at roughly 12.21GB.
You can either spin up a cluster (i) via the command line using gcloud or (ii) use a browser to click through Google Cloud Platform’s Dataproc form. Given the second option shows the possible parameters, it is useful to learn how to navigate. Let’s start with this latter approach.
In the browser, from your Google Cloud console, click on the main menu’s triple-bar icon that looks like an abstract hamburger in the upper-left corner. Navigate to Menu > Dataproc > Clusters.
If this is the first time you land here, then click the
Enable APIbutton and wait a few minutes as it enables.
Fill out the cluster creation form.
Regionsetting to the default
globalwhen creating Dataproc clusters, unless your job launches, data and clusters all reside within the same region. Then set it to the specific region b. Set the
Zoneto match what you have set for your Cloud SDK, which presumably in turn matches where most of your data resides.
n1-standard-1 (1 vCPU, 3.75 GB memory)and 10GB disk size for both the master and the worker nodes. We use the default minimum two worker nodes. For tools such as GermlineCNVCaller, you will want to use more resources, e.g select
n1-standard-16 (16 vCPU, 60.0 GB memory)for both master and worker nodes and ten worker nodes for 200 to 400 samples.
Image Versionchanges the Dataproc version, which by default is set to the latest stable version. As of this writing, the default
Image Versionis v1.1. Selecting another version, e.g. an earlier version, may cause the jobs in this tutorial to error. Of note is the option
Initialization actions. This argument allows you to run your own script(s), stored in a google cloud bucket, e.g. to install programs on the nodes.
It will take a few minutes for the cluster to provision. In this view, so long as the status of this VM is Running, it is incurring fees c.
View the individual cluster nodes under Menu > Compute Engine > VM instances. If there are many VMs, type the name of the cluster into the textbox to filter for specific cluster VMs. Each of the master and worker nodes list separately, e.g. parsley-m, parsley-w-0 and parsley-w1, where
m stands for master and
w stands for worker.
Now here is something handy. See the SSH menu under Connect? Select this and then select View gcloud command. Copy and paste this command into bash. You may need to click the right-hand HIDE INFO PANEL option to see the column.
gcloud compute --project "broad-dsde-dev" ssh --zone "us-central1-a" "parsley-m"
Another way to interact with the clusters, that this tutorial does not cover and that we recommend exploring, is to use it to monitor the job progress from the cluster’s Hadoop application. The application creates an HTML report that is viewable from a local browser. This requires creating a tunnel to the cloud VM. See instructions at https://cloud.google.com/dataproc/docs/concepts/cluster-web-interfaces. Similarly, and this is a digression, we could view BAM files with IGV d.
back to top
For runs that you expect to take a long time, be sure to enable TMUX, Screen or equivalent terminal multiplexer in your bash session.
Type following command to run FlagStatSpark on a publically accessible cloud BAM. For an introduction to SAM flags, see Blog#7019.
gatk-launch FlagStatSpark \ -I gs://gatk-tutorials/how-to/6484_snippet.bam \ --disableReadFilter WellformedReadFilter \ -- --sparkRunner GCS --cluster parsley
The first half of the command refers to tool-specific arguments and the second half to Spark-specific arguments. The command separates these sections with the standalone double-dashes
-Iand disable the
WellformedReadFilterread filter (more about this later).
--sparkRunner GCS, which refers to Google Cloud Service.
FlagStatSpark writes results to standard output (stdout), directly to the bash session. Amongst other standard messages, the SAM flag counts of interest are:
We also see the run completes in less than a minute. In the next sections, we create and use a new cluster that is identical in specs to parsley. You can opt to skip section 3 and to run section 4's command on parsley. Otherwise, click on the cluster from Menu>Dataproc>Clusters and click on Delete to delete it.
As the name implies, FlagStatSpark is the Spark version of FlagStat. Many GATK4 tools, if Spark-enabled, contain Spark within the tool name. However, there are a number of tool names that lack the Spark label that are in fact Spark-capable. See tool documentation for details.
On my local computer, if I run either FlagStatSpark or FlagStat on the same data, I will get the same counts. However, the stdouts are noticeably different. The Spark version of the tool automatically runs on my laptop using available cores. Both tools run quickly, in a matter of seconds. For more details on how to tweak a local Spark run, see https://github.com/broadinstitute/gatk#sparkcluster.
This GATK implementation of Samtools flagstat can give different results from the original tool. To get equivalent results, we must disable the
WellformedReadFilter that the GATK4 Engine uses to remove what it considers malformed reads. For details on filters, see Read Filters documentation. GATK4 employs the
WellformedReadFilter for FlagStat because most BAM processing tools use this filter. It is practical for GATK4 FlagStat to survey SAM flags only for reads that GATK4 tools actually consider.
back to top
Instead of clicking through a GUI on the browser to generate a cluster, we can use the gcloud program to generate clusters via the bash command line.
Here is the command that creates a new cluster we call
basil that is identical in specifications to
parsley. Be sure to fill in the
gcloud dataproc clusters create basil \ --zone us-central1-a \ --master-machine-type n1-standard-1 \ --master-boot-disk-size 10 \ --num-workers 2 \ --worker-machine-type n1-standard-1 \ --worker-boot-disk-size 10 \ --project project_id
Again, we are careful to match our
--zone to that of our local Cloud SDK setup, e.g. in my case
us-central1-a. By omitting the
--region argument, Dataproc sets it to the default
global. Refresh the browser to see the cluster appear in the console, or use
gcloud dataproc clusters list.
Next, we run FlagStatSpark on a 12.21GB 1000 Genomes Project WES BAM using the cluster
basil. The approach we are taking to run FlagStatSpark, using Dataproc, directly distributes the BAM record data to worker nodes. Spark distributes the data across the worker nodes in partitions called Resilient Distributed Datasets or RDDs. The cluster’s two worker nodes sum to 20GB is disk space and happen to be able to hold the size of the WES BAM’s data. However, for our case, this is not a necessary requirement e.
The FlagStatSpark command is pretty much the same as that in section 2, except we use a different cloud BAM and the new cluster.
gatk-launch FlagStatSpark \ -I gs://gatk-test-data/exome_bam/1000G_wex_hg38/HG00133.alt_bwamem_GRCh38DH.20150826.GBR.exome.bam \ --disableReadFilter WellformedReadFilter \ -- --sparkRunner GCS --cluster basil
We can view each node’s use of CPU, disk, disk operations, network and network packets under the console’s Compute Engine by clicking on the node. Similarly, we can see the same for the entire job under Dataproc by clicking on the cluster.
When we view
basil’s resource utilization charts, we observe something counterintuitive. Based on CPU utilization, it appears that one worker, w-0, performs most of the work, while the other worker, w-1, is mostly inactive. This comes as a surprise, because we expect some optimized distribution and throughput for the work, namely even distribution for shorter runtimes, given we enable dynamic allocation by omitting executor options f. If we increase the number of worker nodes, we see work distributes more equitably across all but one worker. We still observe the single worker that carries a minimal load. We discuss how to remedy such apparent lazy workers in section 5.
Finally, clean up so as not to incur unnecessary charges. Delete clusters as well as any log and staging directories the job creates in your storage bucket g. In the browser, toggle the box next to the cluster name and select Delete. From the command line use
gcloud dataproc clusters delete basil
So far, this tutorial familiarizes you to spinning up a Dataproc cluster and running a GATK4 Spark tool. It does not showcase Spark capabilities.
Multiple factors impact the performance of tools on Spark and these factors vary for different tools and data, e.g. a tool that counts versus performs algorithmic functions, network connections and the size and location of data. The appeal of Spark is in the ability of workers to each carry forward transformed data to the next analysis without having to read or write the intermediary data from and to files. GATK4 offers a number of pipelines, e.g. BQSRPipelineSpark, and running a pipeline should provide improvements to wall-clock performance compared to a pipeline consisting of the equivalent individual tools, whether Spark or non-Spark. Similarly, GATK4’s GermlineCNVCaller is a tool that in actuality is a pipeline that mirrors the functionality of the multiple tools in the Somatic CNV workflow outlined in Tutorial#9143.
We can amend our FlagStatSpark command to specify the following Spark components related to executors. A Spark executor runs tasks. It can run tasks multiply on two fronts--(i) over its lifetime and (ii) concurrently (see this Cloudera blogpost). A worker node can have multiple executors and these executors can share cores via task switching.
--executor-memorycontrols the executor heap size
--num-executorsspecifies the total number of executors for the cluster
--executor-coresis the number of cores per executor
When we specify the
--num-executors argument for a GATK Spark tool, we turn off the default dynamic allocation f.
We can tune the executor parameters to a combination that hits a sweet spot of efficient cluster utilization for our particular data. In testing combinations, we follow guidelines from a Cloudera blogpost by Sandy Ryza that gives a rule-of-thumb recommendation and that illustrates with bad and good allocations.
We start with identical cluster setups that use
n1-standard-4, 15GB nodes for the master and two workers. We vary the three executor parameters, and find we can reduce the wallclock time of running FlagStatSpark on the WES data by over half, from ~6 minutes with dynamic allocation to roughly two and a half minutes with six or more executors. Note that runtimes are subject to some amount of variance, so the same command run at different times can complete with different times. The exact cluster parameters for the data points in the chart above are in the table below.
For the datapoint at seven executors, the corresponding FlagStatSpark command is:
gatk-launch FlagStatSpark \ -I gs://gatk-test-data/exome_bam/1000G_wex_hg38/HG00133.alt_bwamem_GRCh38DH.20150826.GBR.exome.bam \ --disableReadFilter WellformedReadFilter \ -- --sparkRunner GCS --cluster parsley \ --num-executors 7 \ --executor-memory 4G \ --executor-cores 3
If we peruse the CPU utilization charts for these jobs, we see that the two workers carryout more equitable workloads. The comparison below shows CPU and I/O utilization by FlagStatSpark on the WES BAM for dynamically allocated versus seven executors.
Finally, we can test the effect of increasing the number of worker nodes, while keeping the number of executors proportional, by running FlagStatSpark on a ~113GB WGS BAM e. Here we see diminishing returns after a certain number of executors, i.e. forty and greater.
The simplest approach is to use a bash script. Run the script locally for authentication purposes.
Here is a simple interactive script that spins up a cluster, runs FlagStatSpark on a user-specified cloud BAM and cleans up afterwards by deleting the cluster: gatk4_flagstatspark_dataproc.txt. This script cannot run on the free trial account a. Download and save with
.sh extension. Navigate to the directory the script is saved in and run it by typing
If you have problems running the script, you may need to change the ownership with
chmod u+x gatk4_flagstatspark_dataproc.sh
[a] As of this writing (July 2017), there is a free Google Cloud Platform trial that gives you $300 in credit that is good for a year. I signed up in a few minutes for the explicit purpose of getting a sense of pricing. Because the account is free, it has limitations, namely we can only have eight CPU cores. For a minimal Dataproc cluster consisting of three identical nodes, this means we can only use two of the offered machine types. This is inadequate to demonstrate the efficiency of Spark, and so in section 5 we plot run times from some other setups by switching to a non-trial account.
Zone settings are important and can be a bit tricky. Check what these are set to with
gcloud config list compute/region and
gcloud config list compute/zone. If you haven’t set these, you can either choose to forge ahead without setting these or set them by initializing with
gcloud init. You can change these directly, e.g. with
gcloud config set compute/zone us-central1-a and
gcloud config set compute/region global.
Regionrefers to regions from which you can control the cluster, i.e. run jobs. Because we write this from the U.S. East Coast, from Boston, but our cluster resides in the Midwest (in
us-central1), we keep the
global. Otherwise, our job submissions will error about non-extant clusters, e.g.
ERROR: (gcloud.dataproc.jobs.submit.spark) NOT_FOUND: No current cluster for project id 'XYZ' with name 'ABC'
Conceptually, regions are made up of zones and so when we specify the
Zone, we are in fact also implying a region, which is what can make setting these two parameters confusing. To reiterate, leave the
Region setting to the default
global when creating Dataproc clusters, unless your job launches, data and clusters all reside within the same region. Then set it to the specific region. Be sure to set the
Zone to match what you have set for your Cloud SDK, which presumably in turn matches where most of your data resides. Accessing data within a zone is free while accessing data across zones incurs a fee, e.g. one cent per egress. See https://cloud.google.com/compute/pricing.
[c] As of this writing, Dataproc rounds billing up to ten minutes if you use less, and then bills by the minute.
[d] For viewing BAM files, SSH to install IGV on the VM, use IGV where the data resides and view by tunneling. Transmitting imagery through the tunnel expends less compute than transmitting read data across a network.
[e] The worker nodes do not necessarily have to hold all the starting data at the beginning. The job can run by streaming the data as it runs, i.e. it can rely on deliveries of data packets as it processes the data. FlagStat goes through BAM records to count SAM flags, which is a process that requires minimal compute. Furthermore, the tool need not write a large BAM, and needs only to write a small table of counts. So we can skip factoring for disk space to write results. We can in fact run FlagStatSpark on a 113GB WGS BAM on a cluster whose worker node disks sum to a fraction of the size without issue, e.g. where workers sum to 30GB of disk. If we use the n1-standard-4 machine for two 15GB disk workers and the seven executor setup from section 5, FlagStatSpark completes processing the WGS BAM in roughly 28 minutes. If we normalize the WGS runtime against the number of SAM records it contains compared to the WES data, its FlagStatSpark runtime takes 1.61 times longer. If we normalize using the sizes of the BAM file, a rough heuristic to factor for length of reads that assumes similar complexity of base quality profiles, then the WGS runtime takes 1.39 times longer. We conjecture this lag results from the additional time spent on ferrying over data as well as on clearing processed data.
[f] GATK4’s README as of the gatk4.beta.3 release states:
You can also omit the "--num-executors" argument to enable dynamic allocation if you configure the cluster properly (see the Spark website for instructions).
[g] What do we mean by log and staging directories? In our storage bucket, we see that Dataproc creates cluster and job logs for each cluster and each
gatk-launch command. If we look into the job log, we see the run saves the standard output, which for this tool actually contains FlagStat results of interest. Additionally, the staging folder contains the
gatk-package-4.beta.2-spark.jar, which is 131.6MB.