Data Processing Pipelines

Introduction

For most computational studies, the data acquisition workflow can be represented as a data processing pipeline. A data processing pipeline consists of steps, each step being a script, a binary executable, or a specific data transformation within a data warehouse. Some steps depend on the results of other steps, so every workflow can be represented as a directed acyclic graph (DAG), in which steps are the nodes and dependencies are the edges. Pipeline topologies can become complex because they allow for massive parallelization and many interdependencies. When pipeline logic is expressed in a procedural programming language, the result is a convoluted program that is expensive to maintain. Fortunately, alternatives to procedural languages have been proposed and are widely used.
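
For example, a two-step pipeline in which an aggregation step depends on the output of a download step could be sketched in CWL roughly as follows (the step, tool, and input names are hypothetical, for illustration only); the reference download/downloaded is the dependency edge of the DAG:

cwlVersion: v1.2
class: Workflow

inputs:
  source_url: string          # hypothetical workflow input

outputs:
  summary:
    type: File
    outputSource: aggregate/report

steps:
  download:                   # node 1: fetch the raw data
    run: download.cwl         # hypothetical tool definition
    in:
      url: source_url
    out: [downloaded]
  aggregate:                  # node 2: depends on the output of "download"
    run: aggregate.cwl        # hypothetical tool definition
    in:
      data: download/downloaded
    out: [report]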

Repeatability and reproducibility of data processing pipelines are best addressed by special descriptive domain-specific languages (DSLs). Three such languages have been developed by different communities working primarily in bioinformatics. The most widely adopted is the Common Workflow Language (CWL): it leads in the number of studies that use it, the number of published workflows, and the number of runtime platforms that can execute workflows defined in it. The pipelines we publish here are all written in CWL.

Descriptive workflow languages focus on an explicit definition of the pipeline topology and on insulating the topology, inputs, requirements, and outputs from the actual processing algorithms. Developed primarily by bioinformatics communities and mostly used by bioinformatics projects, they expect the inputs to be in a few well-defined formats and orchestrate a limited set of known tools. In other areas, for example in population health research, the data comes from much more diverse sources, in diverse and often incompatible formats. Therefore, many steps (nodes in the pipeline topology) are responsible for various data transformation operations.
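
To illustrate this insulation, an individual data transformation step can be wrapped as a CWL CommandLineTool that declares only its inputs and outputs, while the transformation algorithm itself stays in the wrapped script (the script name and the input/output names below are hypothetical):

cwlVersion: v1.2
class: CommandLineTool
baseCommand: [python, transform.py]   # hypothetical transformation script
inputs:
  input_file:
    type: File
    inputBinding:
      position: 1
outputs:
  transformed:
    type: File
    outputBinding:
      glob: "*.csv"                   # whatever files the script produces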

Running Pipelines

Tested runners

CWL is a “write once, run anywhere” language. If a pipeline runs on the developer’s laptop, it is more or less guaranteed to run in any on-premises cluster or cloud environment, provided a runner supporting that environment is used. The list of currently supported runners can be found on the CWL Implementations page.

We have used cwltool, CWL-Airflow, and Toil in our development and production. Toil’s output is a little too verbose, but it has great features such as the ability to restart a pipeline from a failed step (even after minor corrections) and native support for AWS Batch. CWL-Airflow provides a nice graphical user interface.
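
For example, the same workflow and job file (the ones used in the example later on this page) can be executed by either runner without changes:

cwltool pm25_yearly_download.cwl test_exposure_job.yml
toil-cwl-runner pm25_yearly_download.cwl test_exposure_job.yml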

See the Toil documentation for details on running CWL workflows.

Providing parameters to the pipelines

Parameters can be provided either as command line options prefixed with two dashes (--), or in a YAML or JSON file; an example of the command line form is given after the YAML snippet below.

Note that when using a YAML file, files and directories have to be specified in the following way:

my_file:
  path: /path/to/data.nc
  class: File
my_directory:
  path: /path/to/data/downloads
  class: Directory
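
Alternatively, the same inputs can be supplied as command line options after the workflow file. A sketch, assuming the workflow declares inputs named my_file and my_directory as in the hypothetical YAML above:

toil-cwl-runner pm25_yearly_download.cwl \
  --my_file /path/to/data.nc \
  --my_directory /path/to/data/downloads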

Using Toil

A few hints if you are using Toil:

  1. Install Toil with aws and cwl options:

     pip install toil[aws,cwl]
    
  2. If you want to be able to restart a pipeline from a failed step, you need to provide the --jobStore option. To restart, give exactly the same options and add --restart (see the restart example below).

Here is an example of a command to execute a pipeline:

toil-cwl-runner --retryCount 1 --cleanWorkDir never \
  --outdir /scratch/work/exposures/outputs \
  --workDir /scratch/work/exposures \
  --jobStore /scratch/work/someDir123 \
  pm25_yearly_download.cwl test_exposure_job.yml
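
If this run fails, it can be restarted from the failed step by repeating exactly the same command and adding --restart:

toil-cwl-runner --retryCount 1 --cleanWorkDir never \
  --outdir /scratch/work/exposures/outputs \
  --workDir /scratch/work/exposures \
  --jobStore /scratch/work/someDir123 \
  --restart \
  pm25_yearly_download.cwl test_exposure_job.yml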

Testing pipelines

Pipelines can be tested using the included DBT Pipeline Testing Framework.

Installing Python dependencies

The following requirements.txt file can be used to install all Python packages included in the platform:

git+https://github.com/NSAPH-Data-Platform/nsaph-gridmet.git@develop
git+https://github.com/NSAPH-Data-Platform/nsaph-cms.git@develop
git+https://github.com/NSAPH-Data-Platform/nsaph-epa.git@develop
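
Assuming these lines are saved as requirements.txt, the packages can then be installed with:

pip install -r requirements.txt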

If you are running pipelines from a specific domain only, you can install just that domain’s package, e.g., for exposures and climate:

pip install git+https://github.com/NSAPH-Data-Platform/nsaph-gridmet.git@develop

or, for CMS data:

pip install git+https://github.com/NSAPH-Data-Platform/nsaph-cms.git@develop