Source code for epa.aqs_tools

"""
Python module to download EPA AQS Data hosted at https://www.epa.gov/aqs

The module can be used as a library of functions
to be called from other python scripts.

The data is downloaded from https://aqs.epa.gov/aqsweb/airdata/download_files.html

The tool adds a column containing a uniquely generated Monitor Key

Probably the only method useful to an external user is :func:`download_aqs_data`

"""

#  Copyright (c) 2021. Harvard University
#
#  Developed by Research Software Engineering,
#  Faculty of Arts and Sciences, Research Computing (FAS RC)
#  Author: Michael A Bouzinier
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

import csv
import logging
from typing import List, Dict
import os

from epa import STATE_CODE, COUNTY_CODE, \
    SITE_NUM, PARAMETER_CODE, MONITOR, RECORD, add_record_num
from epa.aqs_ds_def import AQSContext, Parameter, Aggregation
from nsaph_utils.utils.io_utils import as_csv_reader, fopen, write_csv, \
    DownloadTask

# Root URL of the EPA AQS pre-generated download archives
BASE_AQS_EPA_URL = "https://aqs.epa.gov/aqsweb/airdata/"
# File name pattern for annual (by-monitor) summary archives
ANNUAL_URI = "annual_conc_by_monitor_{year}.zip"
# File name pattern for daily archives; parameter is an AQS parameter code
DAILY_URI = "daily_{parameter}_{year}.zip"
# Pattern for the generated Monitor Key: state code followed by
# zero-padded county code, a dash, and zero-padded site number
MONITOR_FORMAT = "{state}{county:03d}-{site:04d}"


def transfer(reader: csv.DictReader, writer: csv.DictWriter, flt=None,
             header: bool = True):
    """
    Specific for EPA AQS Data.

    Rewrites the CSV content, adding a Monitor Key and optionally
    filtering rows by a provided predicate.

    :param reader: Input data as an instance of csv.DictReader
    :param writer: Output target provided as a csv.DictWriter
    :param flt: Optionally, a callable function returning True
        for rows that should be written to the output and False
        for those that should be omitted
    :param header: whether to first write the header row
    :return: Nothing
    """

    write_csv(
        reader,
        writer,
        transformer=add_more_columns,
        filter=flt,
        write_header=header
    )
def add_more_columns(row: Dict):
    """Enrich the given row in place with the Monitor Key and record number."""

    add_monitor_key(row)
    add_record_key(row)


# Module-level counter used by add_record_key to number rows sequentially
record_index = 0
def add_monitor_key(row: Dict):
    """
    Internal method to generate and add a unique Monitor Key.

    :param row: a row of an AQS CSV file
    :return: Nothing, modifies the given row in place
    """

    row[MONITOR] = MONITOR_FORMAT.format(
        state=row[STATE_CODE],
        county=int(row[COUNTY_CODE]),
        site=int(row[SITE_NUM])
    )
def add_record_key(row: Dict):
    """
    Assign the next sequential record number to the given row, in place.

    Uses the module-level ``record_index`` counter, so record numbers
    are unique within a single process run.
    """

    global record_index
    record_index = record_index + 1
    add_record_num(row, record_index)
def download_data(task: DownloadTask):
    """
    A utility method to download the content of the URLs described by
    the given task into the task's destination file, appending the
    Monitor Key and record number columns to each row.

    :param task: a DownloadTask carrying the destination path, the list
        of source URLs and (as metadata) an optional list of EPA AQS
        parameter codes used to filter rows
    :return: Nothing
    """

    target = task.destination
    # metadata holds the list of parameter codes to keep (may be empty)
    parameters = task.metadata
    write_header = True
    for url in task.urls:
        print("{} => {}".format(url, target))
        # "at": append mode, so several URLs accumulate into one file
        with fopen(target, "at") as ostream:
            attempt = 0
            while True:
                try:
                    reader = as_csv_reader(url)
                    break
                except Exception:
                    # Retry transient download failures up to 3 times,
                    # then re-raise the last error
                    attempt += 1
                    if attempt > 3:
                        raise
                    logging.exception("Attempt {:d}: Error downloading {}".
                                      format(attempt, url))
            fieldnames = list(reader.fieldnames)
            # Extra columns generated by transfer()
            fieldnames.append(MONITOR)
            fieldnames.append(RECORD)
            writer = csv.DictWriter(ostream, fieldnames,
                                    quotechar='"',
                                    delimiter=',',
                                    quoting=csv.QUOTE_NONNUMERIC)
            if parameters:
                flt = lambda row: int(row[PARAMETER_CODE]) in parameters
            else:
                flt = None
            transfer(reader, writer, flt, write_header)
            # Only the first URL contributes a header row
            write_header = False
def destination_path(destination: str, path: str) -> str:
    """
    A utility method to construct a destination file path.

    :param destination: Destination directory
    :param path: Source path in URL (a ``.zip`` archive name)
    :return: Path on a file system, with the ``.zip`` extension
        replaced by ``.csv.gz``
    """

    compressed_name = path.replace(".zip", ".csv.gz")
    return os.path.join(destination, compressed_name)
def collect_annual_downloads(destination: str, path: str,
                             contiguous_year_segment: List,
                             parameters: List) -> DownloadTask:
    """
    A utility method to collect all URLs that should be downloaded
    for a given list of years and EPA AQS parameters (annual data).

    :param destination: Destination directory for downloads
    :param path: file name element of the destination path
    :param contiguous_year_segment: a list of contiguous years that can
        be saved in the same file
    :param parameters: List of EPA AQS Parameter codes
    :return: a DownloadTask covering every year of the segment
    """

    if parameters:
        # Embed the parameter codes in the file name so that different
        # parameter selections do not collide on disk
        suffix = '_'.join(str(p) for p in parameters)
        f = path[:-4] + '_' + suffix + ".csv.gz"
        target = os.path.join(destination, f)
    else:
        target = destination_path(destination, path)
    task = DownloadTask(target, metadata=[int(p) for p in parameters])
    for year in contiguous_year_segment:
        task.add_url(BASE_AQS_EPA_URL + ANNUAL_URI.format(year=year))
    return task
def collect_daily_downloads(destination: str, ylabel: str,
                            contiguous_year_segment: List,
                            parameter) -> DownloadTask:
    """
    A utility method to collect all URLs that should be downloaded
    for a given list of years and a single EPA AQS parameter (daily data).

    :param destination: Destination directory for downloads
    :param ylabel: a label to use for years in the destination path
    :param contiguous_year_segment: a list of contiguous years that can
        be saved in the same file
    :param parameter: an EPA AQS Parameter code, either an integer code
        or a mnemonic member of the Parameter enum
    :return: a DownloadTask covering every year of the segment
    """

    # Known (mnemonic) parameters label the destination file by name;
    # anything else is labeled by its numeric code
    if isinstance(parameter, Parameter) or parameter in Parameter.values():
        label = Parameter(parameter)
    else:
        label = int(parameter)
    target = destination_path(destination,
                              DAILY_URI.format(parameter=label, year=ylabel))
    task = DownloadTask(target)
    # The download URLs themselves use the numeric parameter code
    url_template = BASE_AQS_EPA_URL + DAILY_URI.format(
        parameter=int(parameter), year="{year}")
    for year in contiguous_year_segment:
        task.add_url(url_template.format(year=year))
    return task
def collect_aqs_download_tasks(context: AQSContext):
    """
    Main entry into the library: builds the list of download tasks
    described by the given context.

    :param context: an AQSContext instance carrying the aggregation type
        (annual or daily), the years to include, the destination
        directory, the list of EPA AQS Parameter codes (may be empty for
        annual aggregation; required for daily aggregation) and the
        merge_years flag
    :return: a list of DownloadTask objects
    """

    parameters = context.parameters
    if context.aggregation == Aggregation.DAILY:
        # Daily archives are published per parameter, hence at least
        # one parameter code is required
        assert len(parameters) > 0

    # Split the sorted years into runs of consecutive years; with
    # merge_years set, each run is stored in a single destination file
    years = sorted(context.years)
    contiguous_years = [[years[0]]]
    for previous, current in zip(years, years[1:]):
        if context.merge_years and previous == current - 1:
            contiguous_years[-1].append(current)
        else:
            contiguous_years.append([current])

    if parameters:
        parameters = sorted(parameters)

    downloads = []
    for run in contiguous_years:
        if len(run) == 1:
            y = str(run[0])
        else:
            y = "{}-{}".format(run[0], run[-1])
        if context.aggregation == Aggregation.ANNUAL:
            downloads.append(collect_annual_downloads(
                context.destination, ANNUAL_URI.format(year=y),
                run, parameters))
        elif context.aggregation == Aggregation.DAILY:
            for parameter in parameters:
                downloads.append(collect_daily_downloads(
                    context.destination, y, run, parameter))
    return downloads