pm25_yearly_download.cwl

#!/usr/bin/env cwl-runner
### Pipeline to aggregate data in NetCDF format over given geographies
#  Copyright (c) 2021-2022. Harvard University
#
#  Developed by Research Software Engineering,
#  Faculty of Arts and Sciences, Research Computing (FAS RC)
#  Author: Michael A Bouzinier
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

cwlVersion: v1.2
class: Workflow

requirements:
  SubworkflowFeatureRequirement: {}
  StepInputExpressionRequirement: {}
  InlineJavascriptRequirement: {}
  ScatterFeatureRequirement: {}
  MultipleInputFeatureRequirement: {}


doc: |
  Workflow to aggregate pollution data coming in NetCDF format
  over given geographies (zip codes or counties) and output the
  results as CSV files. This is a wrapper around the aggregation
  of a single file that scatters (parallelizes) the aggregation
  over years.

  The outputs of the workflow are gzipped CSV files containing
  the aggregated data.

  Optionally, the aggregated data can be ingested into a database
  specified by the connection parameters:

  * `database.ini`: a file containing connection descriptions
  * `connection_name`: a string referring to a section in the `database.ini`
    file, identifying the specific connection to be used (see the sketch
    below)

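  A minimal `database.ini` might look like the following sketch; the section
  name matches `connection_name`, and all connection values shown here are
  illustrative placeholders, not actual credentials:

      [dorieh]
      host=localhost
      port=5432
      database=nsaph
      user=postgres
      password=*****
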
  The workflow can be invoked either by providing command line options
  as in the following example:

      toil-cwl-runner --retryCount 1 --cleanWorkDir never \
          --outdir /scratch/work/exposures/outputs \
          --workDir /scratch/work/exposures \
          pm25_yearly_download.cwl \
          --database /opt/local/database.ini \
          --connection_name dorieh \
          --downloads s3://nsaph-public/data/exposures/wustl/ \
          --strategy default \
          --geography zcta \
          --shape_file_collection tiger \
          --table pm25_annual_components_mean

  Or, by providing a YAML job file (see [example](../test_exposure_job))
  with similar options:

      toil-cwl-runner --retryCount 1 --cleanWorkDir never \
          --outdir /scratch/work/exposures/outputs \
          --workDir /scratch/work/exposures \
          pm25_yearly_download.cwl test_exposure_job.yml
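
  Such a job file is a YAML dictionary keyed by the input names of this
  workflow. Below is a minimal sketch mirroring the command line options
  above; all paths and values are illustrative:

      database:
        class: File
        path: /opt/local/database.ini
      connection_name: dorieh
      downloads:
        class: Directory
        location: s3://nsaph-public/data/exposures/wustl/
      strategy: default
      geography: zcta
      shape_file_collection: tiger
      table: pm25_annual_components_mean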
inputs:
  proxy:
    type: string?
    default: ""
    doc: HTTP/HTTPS proxy, if required
  downloads:
    type: Directory
    doc: |
      Local or AWS bucket folder containing netCDF grid files, downloaded
      and unpacked from the Washington University in St. Louis (WUSTL) Box
      site. Annual and monthly data repositories are described by the
      [WUSTL Atmospheric Composition Analysis Group](https://sites.wustl.edu/acag/datasets/surface-pm2-5/).

      The annual data for PM2.5 is also available in
      a Harvard URC AWS bucket: `s3://nsaph-public/data/exposures/wustl/`
  geography:
    type: string
    doc: |
      Type of geography: zip codes or counties.
      Supported values: "zip", "zcta", or "county"
  years:
    type: int[]
    default: [2000,2001,2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016,2017]
  variable:
    type: string
    default: PM25
    doc: |
      The main variable that is being aggregated over shapes. We have tested
      the pipeline for PM25.
  component:
    type: string[]
    default: [BC, NH4, NIT, OM, SO4, SOIL, SS]
    doc: |
      Optional components provided as percentages in a separate set
      of netCDF files
  strategy:
    type: string
    default: auto
    doc: |
      Rasterization strategy, see the
      [documentation](https://nsaph-data-platform.github.io/nsaph-platform-docs/common/gridmet/doc/strategy.html)
      for the list of supported values and explanations
  ram:
    type: string
    default: 2GB
    doc: Runtime memory available to the process

  shape_file_collection:
    type: string
    default: tiger
    doc: |
      [Collection of shapefiles](https://www2.census.gov/geo/tiger),
      either GENZ or TIGER
  database:
    type: File
    doc: |
      Path to the database connection file, usually `database.ini`.
      This argument is ignored if `connection_name` == `None`.
    default:
      path: database.ini
      class: File

  connection_name:
    type: string
    doc: |
      The name of the section in the database.ini file, or the literal
      `None` to skip the database ingestion steps
  table:
    type: string
    doc: The name of the table to store the aggregated data in
    default: pm25_aggregated


steps:
  initdb:
    run: initdb.cwl
    doc: Ensures that the database utilities are at their latest version
    in:
      database: database
      connection_name: connection_name
    out:
      - log
      - err

  process:
    doc: Downloads raw data and aggregates it over shapes and time
    scatter:
      - year
    run: aggregate_one_file.cwl
    in:
      proxy: proxy
      downloads: downloads
      geography: geography
      shape_file_collection: shape_file_collection
      year: years
      variable: variable
      component: component
      strategy: strategy
      ram: ram
      table: table
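      # depends_on is wired to the log of initdb, forcing this step
      # to wait until initdb has completed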
      depends_on: initdb/log
    out:
      - shapes
      - aggregate_data
      - consolidated_data
      - aggregate_log
      - aggregate_err
      - data_dictionary

  extract_data_dictionary:
    run:
      class: ExpressionTool
      inputs:
        yaml_files:
          type: File[]
      outputs:
        data_dictionary:
          type: File
      expression: |
        ${
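          // The scattered "process" step emits one data dictionary per
          // year; only the first one is passed downstream.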
          return {data_dictionary: inputs.yaml_files[0]}
        }
    in:
      yaml_files: process/data_dictionary
    out:
      - data_dictionary

  ingest:
    run: ingest.cwl
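    # This step and the following index and vacuum steps are skipped
    # when connection_name is the literal "None" (case-insensitive)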
    when: $(inputs.connection_name.toLowerCase() != 'none')
    doc: Uploads data into the database
    in:
      registry: extract_data_dictionary/data_dictionary
      domain:
        valueFrom: "exposures"
      table: table
      input: process/aggregate_data
      database: database
      connection_name: connection_name
    out: [log, errors]

  index:
    run: index.cwl
    when: $(inputs.connection_name.toLowerCase() != 'none')
    in:
      depends_on: ingest/log
      registry: extract_data_dictionary/data_dictionary
      domain:
        valueFrom: "exposures"
      table: table
      database: database
      connection_name: connection_name
    out: [log, errors]

  vacuum:
    run: vacuum.cwl
    when: $(inputs.connection_name.toLowerCase() != 'none')
    in:
      depends_on: index/log
      registry: extract_data_dictionary/data_dictionary
      domain:
        valueFrom: "exposures"
      table: table
      database: database
      connection_name: connection_name
    out: [log, errors]


outputs:
  aggregate_data:
    type: File[]
    outputSource: process/aggregate_data
  data_dictionary:
    type: File
    outputSource: extract_data_dictionary/data_dictionary
    doc: Data dictionary file, in YAML format, describing the output variables
  consolidated_data:
    type: File[]
    outputSource: process/consolidated_data
  shapes:
    type:
      type: array
      items:
        type: array
        items: File
    outputSource: process/shapes

  aggregate_log:
    type:
      type: array
      items: Any
    outputSource: process/aggregate_log
  aggregate_err:
    type: File[]
    outputSource: process/aggregate_err

  ingest_log:
    type: File?
    outputSource: ingest/log
  index_log:
    type: File?
    outputSource: index/log
  vacuum_log:
    type: File?
    outputSource: vacuum/log
  ingest_err:
    type: File?
    outputSource: ingest/errors
  index_err:
    type: File?
    outputSource: index/errors
  vacuum_err:
    type: File?
    outputSource: vacuum/errors