#!/usr/bin/env cwl-runner
### Pipeline to aggregate data from Climatology Lab
# Copyright (c) 2021-2022. Harvard University
#
# Developed by Research Software Engineering,
# Faculty of Arts and Sciences, Research Computing (FAS RC)
# Author: Michael A Bouzinier
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
cwlVersion: v1.2
class: Workflow
requirements:
SubworkflowFeatureRequirement: {}
StepInputExpressionRequirement: {}
InlineJavascriptRequirement: {}
ScatterFeatureRequirement: {}
MultipleInputFeatureRequirement: {}
doc: |
This workflow downloads NetCDF datasets from
[University of Idaho Gridded Surface Meteorological Dataset](https://www.northwestknowledge.net/metdata/data/),
aggregates the gridded data to daily mean values over chosen geographies.
The outputs of the workflow are gzipped CSV files containing
the aggregated data.
Optionally, the aggregated data can be ingested into a database
specified by the connection parameters:
* `database.ini`: a file containing connection descriptions
* `connection_name`: a string referring to a section in the
`database.ini` file, identifying the specific connection to be used.
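For reference, a `database.ini` section for the connection used in the
examples below might look like this; the keys shown are typical for a
PostgreSQL connection and the values are placeholders:
[dorieh]
host=localhost
port=5432
database=dorieh
user=postgres
password=<password>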
The workflow can be invoked either by providing command line options
as in the following example:
toil-cwl-runner --retryCount 1 --cleanWorkDir never \
--outdir /scratch/work/exposures/outputs \
--workDir /scratch/work/exposures \
gridmet.cwl \
--database /opt/local/database.ini \
--connection_name dorieh \
--bands rmin rmax \
--strategy auto \
--geography zcta \
--ram 8GB
Or by providing a YAML file (see [example](../test_gridmet_job))
with similar options:
toil-cwl-runner --retryCount 1 --cleanWorkDir never \
--outdir /scratch/work/exposures/outputs \
--workDir /scratch/work/exposures \
gridmet.cwl test_gridmet_job.yml
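A job file with options equivalent to the command line example above
might look like this (a sketch; paths and values are placeholders):
database:
  class: File
  path: /opt/local/database.ini
connection_name: dorieh
bands: [rmin, rmax]
strategy: auto
geography: zcta
ram: 8GB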
inputs:
proxy:
type: string?
default: ""
doc: HTTP/HTTPS Proxy if required
shapes:
type: Directory?
doc: Do we even need this parameter, given that the workflow downloads the shapes instead?
geography:
type: string
doc: |
Type of geography: zip codes, ZCTA codes, or counties
Valid values: "zip", "zcta", or "county"
years:
type: string[]
default: ['1999', '2000', '2001', '2002', '2003', '2004', '2005', '2006', '2007', '2008', '2009', '2010', '2011', '2012', '2013', '2014', '2015', '2016', '2017', '2018', '2019', '2020']
bands:
doc: |
University of Idaho Gridded Surface Meteorological Dataset
[bands](https://developers.google.com/earth-engine/datasets/catalog/IDAHO_EPSCOR_GRIDMET#bands)
type: string[]
# default: ['bi', 'erc', 'etr', 'fm100', 'fm1000', 'pet', 'pr', 'rmax', 'rmin', 'sph', 'srad', 'th', 'tmmn', 'tmmx', 'vpd', 'vs']
strategy:
type: string
default: auto
doc: |
[Rasterization strategy](https://nsaph-data-platform.github.io/nsaph-platform-docs/common/gridmet/doc/strategy.html)
used for spatial aggregation
ram:
type: string
default: 2GB
doc: |
Runtime memory available to the process. When the aggregation
strategy is `auto`, this value is used to calculate the optimal
downscaling factor for the available resources.
database:
type: File
doc: Path to database connection file, usually database.ini
connection_name:
type: string
doc: The name of the section in the database.ini file
dates:
type: string?
doc: 'Optional restriction of the dates to process; for testing purposes only'
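# The domain name is also used as the name of the database schema
# created by the init_db_schema step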
domain:
type: string
default: climate
steps:
init_db_schema:
doc: This step has to run first because the tables are later created in parallel
run:
class: CommandLineTool
baseCommand: [python, -m, nsaph.util.psql]
doc: |
This tool executes an SQL statement in the database to create
the schema for the given domain, if it does not already exist
inputs:
database:
type: File
doc: Path to database connection file, usually database.ini
inputBinding:
prefix: --db
connection_name:
type: string
doc: The name of the section in the database.ini file
inputBinding:
prefix: --connection
domain:
type: string
#default: climate
arguments:
- valueFrom: $("CREATE SCHEMA IF NOT EXISTS " + inputs.domain + ';')
position: 3
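# With the default domain, the statement executed by this tool is:
#   CREATE SCHEMA IF NOT EXISTS climate;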
outputs:
log:
type: stdout
err:
type: stderr
stderr: "schema.err"
stdout: "schema.log"
in:
database: database
connection_name: connection_name
domain: domain
out:
- log
- err
make_registry:
run: registry.cwl
doc: Writes a YAML file describing the database model
in:
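# depends_on only enforces execution order: this step runs
# after init_db_schema has completed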
depends_on: init_db_schema/log
domain: domain
out:
- model
- log
- errors
init_tables:
doc: Creates or recreates database tables, one for each band
scatter:
- band
run:
class: Workflow
inputs:
registry:
type: File
table:
type: string
domain:
type: string
database:
type: File
connection_name:
type: string
steps:
reset:
run: reset.cwl
in:
registry: registry
domain: domain
database: database
connection_name: connection_name
table: table
out:
- log
- errors
index:
run: index.cwl
in:
depends_on: reset/log
registry: registry
domain: domain
table: table
database: database
connection_name: connection_name
out: [log, errors]
outputs:
reset_log:
type: File
outputSource: reset/log
reset_err:
type: File
outputSource: reset/errors
index_log:
type: File
outputSource: index/log
index_err:
type: File
outputSource: index/errors
in:
registry: make_registry/model
database: database
connection_name: connection_name
band: bands
geography: geography
domain: domain
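# The table name is constructed from the geography and the band:
# e.g., geography "zcta" and band "rmin" produce the table name "zcta_rmin"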
table:
valueFrom: $(inputs.geography + '_' + inputs.band)
out:
- reset_log
- reset_err
- index_log
- index_err
process:
run: gridmet_one_file.cwl
doc: Downloads raw data and aggregates it over shapes and time
scatter:
- band
- year
scatterMethod: nested_crossproduct
in:
proxy: proxy
depends_on: init_tables/index_log
model: make_registry/model
shapes: shapes
geography: geography
strategy: strategy
ram: ram
year: years
dates: dates
band: bands
database: database
connection_name: connection_name
domain: domain
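# All 12 months of each year are processed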
months:
valueFrom: $([1,2,3,4,5,6,7,8,9,10,11,12])
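# Same table naming convention as in init_tables, e.g., "zcta_rmin"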
table:
valueFrom: $(inputs.geography + '_' + inputs.band)
out:
- download_log
- download_err
- add_data_aggregate_errors
- add_data_data
- add_data_aggregate_log
- add_data_ingest_log
- add_data_ingest_errors
- vacuum_log
- vacuum_err
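# Because this step scatters over bands and years (nested cross-product),
# each of its outputs is a two-dimensional array; the outputs that
# gridmet_one_file.cwl produces per month carry an additional inner dimension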
outputs:
registry:
type: File?
outputSource: make_registry/model
registry_log:
type: File?
outputSource: make_registry/log
registry_err:
type: File?
outputSource: make_registry/errors
data:
type:
type: array
items:
type: array
items:
type: array
items: [File]
outputSource: process/add_data_data
download_log:
type:
type: array
items:
type: array
items: [File]
outputSource: process/download_log
download_err:
type:
type: array
items:
type: array
items: [File]
outputSource: process/download_err
process_log:
type:
type: array
items:
type: array
items:
type: array
items: [File]
outputSource: process/add_data_aggregate_log
process_err:
type:
type: array
items:
type: array
items:
type: array
items: [File]
outputSource: process/add_data_aggregate_errors
ingest_log:
type:
type: array
items:
type: array
items:
type: array
items: [File]
outputSource: process/add_data_ingest_log
ingest_err:
type:
type: array
items:
type: array
items:
type: array
items: [File]
outputSource: process/add_data_ingest_errors
reset_log:
type:
type: array
items: [File]
outputSource: init_tables/reset_log
reset_err:
type:
type: array
items: [File]
outputSource: init_tables/reset_err
index_log:
type:
type: array
items: [File]
outputSource: init_tables/index_log
index_err:
type:
type: array
items: [File]
outputSource: init_tables/index_err
vacuum_log:
type:
type: array
items:
type: array
items: [File]
outputSource: process/vacuum_log
vacuum_err:
type:
type: array
items:
type: array
items: [File]
outputSource: process/vacuum_err