#!/usr/bin/env cwl-runner
### Pipeline to aggregate data in NetCDF format over given geographies
# Copyright (c) 2021-2022. Harvard University
#
# Developed by Research Software Engineering,
# Faculty of Arts and Sciences, Research Computing (FAS RC)
# Author: Michael A Bouzinier
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

cwlVersion: v1.2
class: Workflow

requirements:
  SubworkflowFeatureRequirement: {}
  StepInputExpressionRequirement: {}
  InlineJavascriptRequirement: {}
  ScatterFeatureRequirement: {}
  MultipleInputFeatureRequirement: {}

doc: |
  Workflow to aggregate pollution data coming in NetCDF format
  over given geographies (zip codes or counties) and output the
  results as CSV files. This is a wrapper around the aggregation
  of a single file that allows the aggregation to be scattered
  (parallelized) over years.

  The outputs of the workflow are gzipped CSV files containing
  the aggregated data.

  Optionally, the aggregated data can be ingested into a database
  specified by the connection parameters:

  * `database.ini`: a file containing connection descriptions
  * `connection_name`: a string referring to a section in the
    `database.ini` file, identifying the specific connection to be used
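
  A minimal sketch of what `database.ini` might contain (hypothetical:
  the section name is arbitrary and the exact set of parameters depends
  on the target database):

      [dorieh]
      host=localhost
      port=5432
      database=nsaph
      user=postgres
      password=*****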

  The workflow can be invoked either by providing command line options,
  as in the following example:

      toil-cwl-runner --retryCount 1 --cleanWorkDir never \
          --outdir /scratch/work/exposures/outputs \
          --workDir /scratch/work/exposures \
          pm25_yearly_download.cwl \
          --database /opt/local/database.ini \
          --connection_name dorieh \
          --downloads s3://nsaph-public/data/exposures/wustl/ \
          --strategy default \
          --geography zcta \
          --shape_file_collection tiger \
          --table pm25_annual_components_mean

  or by providing a YAML job file (see [example](../test_exposure_job))
  with the same options:

      toil-cwl-runner --retryCount 1 --cleanWorkDir never \
          --outdir /scratch/work/exposures/outputs \
          --workDir /scratch/work/exposures \
          pm25_yearly_download.cwl test_exposure_job.yml
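
  A job file equivalent to the command line options above might look
  like this (a sketch; the linked example file is authoritative):

      database:
        class: File
        path: /opt/local/database.ini
      connection_name: dorieh
      downloads:
        class: Directory
        location: s3://nsaph-public/data/exposures/wustl/
      strategy: default
      geography: zcta
      shape_file_collection: tiger
      table: pm25_annual_components_mean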


inputs:
  proxy:
    type: string?
    default: ""
    doc: HTTP/HTTPS proxy, if required
  downloads:
    type: Directory
    doc: |
      Local or AWS bucket folder containing netCDF grid files, downloaded
      and unpacked from the Washington University in St. Louis (WUSTL) Box
      site. The annual and monthly data repositories are described by the
      [WUSTL Atmospheric Composition Analysis Group](https://sites.wustl.edu/acag/datasets/surface-pm2-5/).

      The annual data for PM2.5 are also available in
      a Harvard URC AWS bucket: `s3://nsaph-public/data/exposures/wustl/`
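
      For a local run, the bucket contents can be mirrored with the
      AWS CLI, for example (assuming the CLI is installed and configured;
      the destination folder is arbitrary):

          aws s3 sync s3://nsaph-public/data/exposures/wustl/ ./data/wustl/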
  geography:
    type: string
    doc: |
      Type of geography: zip codes or counties.
      Supported values: "zip", "zcta" or "county"
  years:
    type: int[]
    default: [2000,2001,2002,2003,2004,2005,2006,2007,2008,2009,2010,2011,2012,2013,2014,2015,2016,2017]
  variable:
    type: string
    default: PM25
    doc: |
      The main variable to be aggregated over the shapes. The pipeline
      has been tested with PM25
  component:
    type: string[]
    default: [BC, NH4, NIT, OM, SO4, SOIL, SS]
    doc: |
      Optional components, provided as percentages in a separate set
      of netCDF files
  strategy:
    type: string
    default: auto
    doc: |
      Rasterization strategy; see the
      [documentation](https://nsaph-data-platform.github.io/nsaph-platform-docs/common/gridmet/doc/strategy.html)
      for the list of supported values and their explanations
  ram:
    type: string
    default: 2GB
    doc: Runtime memory available to the process

  shape_file_collection:
    type: string
    default: tiger
    doc: |
      [Collection of shapefiles](https://www2.census.gov/geo/tiger),
      either GENZ or TIGER
  database:
    type: File
    doc: |
      Path to the database connection file, usually `database.ini`.
      This argument is ignored if `connection_name` is `None`
    default:
      path: database.ini
      class: File
  connection_name:
    type: string
    doc: |
      The name of a section in the `database.ini` file, or the literal
      `None` to skip the database ingestion steps
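
      For example, to aggregate the data without ingesting it into a
      database (a hypothetical invocation; other options as described
      above can be added):

          toil-cwl-runner pm25_yearly_download.cwl \
              --downloads s3://nsaph-public/data/exposures/wustl/ \
              --geography zcta \
              --connection_name None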
  table:
    type: string
    doc: The name of the table in which to store the aggregated data
    default: pm25_aggregated


steps:
  initdb:
    run: initdb.cwl
    doc: Ensure that database utilities are at their latest version
    in:
      database: database
      connection_name: connection_name
    out:
      - log
      - err

  process:
    doc: Downloads raw data and aggregates it over shapes and time
    scatter:
      - year
    run: aggregate_one_file.cwl
    in:
      proxy: proxy
      downloads: downloads
      geography: geography
      shape_file_collection: shape_file_collection
      year: years
      variable: variable
      component: component
      strategy: strategy
      ram: ram
      table: table
      depends_on: initdb/log
    out:
      - shapes
      - aggregate_data
      - consolidated_data
      - aggregate_log
      - aggregate_err
      - data_dictionary

  extract_data_dictionary:
    # The scattered `process` step produces one data dictionary per year;
    # they are expected to be identical, so we keep only the first one
    run:
      class: ExpressionTool
      inputs:
        yaml_files:
          type: File[]
      outputs:
        data_dictionary:
          type: File
      expression: |
        ${
          return {data_dictionary: inputs.yaml_files[0]}
        }
    in:
      yaml_files: process/data_dictionary
    out:
      - data_dictionary

  ingest:
    run: ingest.cwl
    when: $(inputs.connection_name.toLowerCase() != 'none')
    doc: Uploads data into the database
    in:
      registry: extract_data_dictionary/data_dictionary
      domain:
        valueFrom: "exposures"
      table: table
      input: process/aggregate_data
      database: database
      connection_name: connection_name
    out: [log, errors]

  index:
    run: index.cwl
    when: $(inputs.connection_name.toLowerCase() != 'none')
    doc: Builds indices on the table to speed up queries
    in:
      depends_on: ingest/log
      registry: extract_data_dictionary/data_dictionary
      domain:
        valueFrom: "exposures"
      table: table
      database: database
      connection_name: connection_name
    out: [log, errors]

  vacuum:
    run: vacuum.cwl
    when: $(inputs.connection_name.toLowerCase() != 'none')
    doc: Vacuums and analyzes the table to optimize query performance
    in:
      depends_on: index/log
      registry: extract_data_dictionary/data_dictionary
      domain:
        valueFrom: "exposures"
      table: table
      database: database
      connection_name: connection_name
    out: [log, errors]


outputs:
  aggregate_data:
    type: File[]
    outputSource: process/aggregate_data
  data_dictionary:
    type: File
    outputSource: extract_data_dictionary/data_dictionary
    doc: Data dictionary file, in YAML format, describing the output variables
  consolidated_data:
    type: File[]
    outputSource: process/consolidated_data
  shapes:
    type:
      type: array
      items:
        type: array
        items: File
    outputSource: process/shapes

  aggregate_log:
    type:
      type: array
      items: Any
    outputSource: process/aggregate_log
  aggregate_err:
    type: File[]
    outputSource: process/aggregate_err

  # Outputs of conditional (`when`-guarded) steps are null when the step
  # is skipped, hence the optional (File?) types below
  ingest_log:
    type: File?
    outputSource: ingest/log
  index_log:
    type: File?
    outputSource: index/log
  vacuum_log:
    type: File?
    outputSource: vacuum/log
  ingest_err:
    type: File?
    outputSource: ingest/errors
  index_err:
    type: File?
    outputSource: index/errors
  vacuum_err:
    type: File?
    outputSource: vacuum/errors