pm25_yearly_download.cwl

#!/usr/bin/env cwl-runner
### Pipeline to aggregate data in NetCDF format over given geographies
#  Copyright (c) 2021-2022. Harvard University
#
#  Developed by Research Software Engineering,
#  Faculty of Arts and Sciences, Research Computing (FAS RC)
#  Author: Michael A Bouzinier
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

cwlVersion: v1.2
class: Workflow

requirements:
  SubworkflowFeatureRequirement: {}
  StepInputExpressionRequirement: {}
  InlineJavascriptRequirement: {}
  ScatterFeatureRequirement: {}
  MultipleInputFeatureRequirement: {}


doc: |
  Workflow to aggregate pollution data arriving in NetCDF format
  over given geographies (zip codes or counties) and to output the
  results as CSV files. It is a wrapper around the aggregation of a
  single file, scattering (parallelizing) the aggregation over years.

  The outputs of the workflow are gzipped CSV files containing the
  aggregated data.
  
  Optionally, the aggregated data can be ingested into a database
  specified by the connection parameters:

  * `database.ini`: a file containing connection descriptions
  * `connection_name`: a string referring to a section in the
    `database.ini` file that identifies the specific connection to be used

  The workflow can be invoked either by providing command line options 
  as in the following example:
  
      toil-cwl-runner --retryCount 1 --cleanWorkDir never \
          --outdir /scratch/work/exposures/outputs \
          --workDir /scratch/work/exposures \
          pm25_yearly_download.cwl \
          --database /opt/local/database.ini \
          --connection_name dorieh \
          --downloads s3://nsaph-public/data/exposures/wustl/ \
          --strategy default \
          --geography zcta \
          --shape_file_collection tiger \
          --table pm25_annual_components_mean

  Or, by providing a YAML file (see [example](../test_exposure_job))
  with similar options:
  
      toil-cwl-runner --retryCount 1 --cleanWorkDir never \
          --outdir /scratch/work/exposures/outputs \
          --workDir /scratch/work/exposures \
          pm25_yearly_download.cwl test_exposure_job.yml
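
  For reference, a job file mirroring the command line options above might
  look like this (a sketch, assuming standard CWL job file syntax; the
  actual file in the linked example may differ):

      # inputs not listed here (years, variable, component, ram, proxy)
      # fall back to their defaults
      database:
        class: File
        path: /opt/local/database.ini
      connection_name: dorieh
      downloads:
        class: Directory
        location: s3://nsaph-public/data/exposures/wustl/
      strategy: default
      geography: zcta
      shape_file_collection: tiger
      table: pm25_annual_components_mean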
  

inputs:
  proxy:
    type: string?
    default: ""
    doc: HTTP/HTTPS Proxy if required
  downloads:
    type: Directory
    doc: |
      Local or AWS bucket folder containing netCDF grid files, downloaded
      and unpacked from the Washington University in St. Louis (WUSTL) Box
      site. Annual and monthly data repositories are described on the
      [WUSTL Atmospheric Composition Analysis Group](https://sites.wustl.edu/acag/datasets/surface-pm2-5/)
      site.
      
      The annual data for PM2.5 is also available in 
      a Harvard URC AWS Bucket: `s3://nsaph-public/data/exposures/wustl/`
  geography:
    type: string
    doc: |
      Type of geography: zip codes or counties
      Supported values: "zip", "zcta" or "county"
  years:
    type: int[]
    default: [2000,2001,2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016,2017]
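    doc: Calendar years for which the data is to be downloaded and aggregated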
  variable:
    type: string
    default: PM25
    doc: |
      The main variable that is being aggregated over shapes. We have
      tested the pipeline for PM25.
  component:
    type: string[]
    default: [BC, NH4, NIT, OM, SO4, SOIL, SS]
    doc: |
      Optional components provided as percentages in a separate set 
      of netCDF files
  strategy:
    type: string
    default: auto
    doc: |
      Rasterization strategy, see
      [documentation](https://nsaph-data-platform.github.io/nsaph-platform-docs/common/gridmet/doc/strategy.html)
      for the list of supported values and explanations
  ram:
    type: string
    default: 2GB
    doc: Runtime memory available to the process

  shape_file_collection:
    type: string
    default: tiger
    doc: |
      [Collection of shapefiles](https://www2.census.gov/geo/tiger), 
      either GENZ or TIGER
  database:
    type: File
    doc: |
      Path to the database connection file, usually `database.ini`.
      This argument is ignored if `connection_name` is `None`
    default:
      path: database.ini
      class: File

  connection_name:
    type: string
    doc: |
      The name of a section in the `database.ini` file, or the literal
      `None` to skip the database ingestion steps
  table:
    type: string
    doc: The name of the table in which to store the aggregated data
    default: pm25_aggregated


steps:
  initdb:
    run: initdb.cwl
    doc: Ensures that database utilities are at their latest version
    in:
      database: database
      connection_name: connection_name
    out:
      - log
      - err

  process:
    doc: Downloads raw data and aggregates it over shapes and time
    scatter:
      - year
    run: aggregate_one_file.cwl
    in:
      proxy: proxy
      downloads: downloads
      geography: geography
      shape_file_collection: shape_file_collection
      year: years
      variable: variable
      component: component
      strategy: strategy
      ram: ram
      table: table
      depends_on: initdb/log
    out:
      - shapes
      - aggregate_data
      - consolidated_data
      - aggregate_log
      - aggregate_err
      - data_dictionary

  extract_data_dictionary:
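    doc: |
      Extracts a single data dictionary from the list produced by the
      scattered aggregation step; the dictionaries are presumed to be
      identical across years, so only the first one is kept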
    run:
      class: ExpressionTool
      inputs:
        yaml_files:
          type: File[]
      outputs:
        data_dictionary:
          type: File
      expression: |
        ${
          return {data_dictionary: inputs.yaml_files[0]}
        }
    in:
      yaml_files: process/data_dictionary
    out:
      - data_dictionary

  ingest:
    run: ingest.cwl
    when: $(inputs.connection_name.toLowerCase() != 'none')
    doc: Uploads data into the database
    in:
      registry: extract_data_dictionary/data_dictionary
      domain:
        valueFrom: "exposures"
      table: table
      input: process/aggregate_data
      database: database
      connection_name: connection_name
    out: [log, errors]

  index:
    run: index.cwl
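    doc: Builds indices on the table containing the ingested data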
    when: $(inputs.connection_name.toLowerCase() != 'none')
    in:
      depends_on: ingest/log
      registry: extract_data_dictionary/data_dictionary
      domain:
        valueFrom: "exposures"
      table: table
      database: database
      connection_name: connection_name
    out: [log, errors]

  vacuum:
    run: vacuum.cwl
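    doc: Vacuums the table after ingestion and indexing to optimize query performance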
    when: $(inputs.connection_name.toLowerCase() != 'none')
    in:
      depends_on: index/log
      registry: extract_data_dictionary/data_dictionary
      domain:
        valueFrom: "exposures"
      table: table
      database: database
      connection_name: connection_name
    out: [log, errors]



outputs:
  aggregate_data:
    type: File[]
    outputSource: process/aggregate_data
  data_dictionary:
    type: File
    outputSource: extract_data_dictionary/data_dictionary
    doc: Data dictionary file, in YAML format, describing the output variables
  consolidated_data:
    type: File[]
    outputSource: process/consolidated_data
  shapes:
    type:
      type: array
      items:
        type: array
        items: File
    outputSource: process/shapes

  aggregate_log:
    type:
      type: array
      items: Any
    outputSource: process/aggregate_log
  aggregate_err:
    type: File[]
    outputSource: process/aggregate_err

  ingest_log:
    type: File
    outputSource: ingest/log
  index_log:
    type: File
    outputSource: index/log
  vacuum_log:
    type: File
    outputSource: vacuum/log
  ingest_err:
    type: File
    outputSource: ingest/errors
  index_err:
    type: File
    outputSource: index/errors
  vacuum_err:
    type: File
    outputSource: vacuum/errors