Source code for gridmet.task

"""
This module contains classes used to define, schedule and execute
long-running computations used by gridMET pipelines.
"""

#  Copyright (c) 2021. Harvard University
#
#  Developed by Research Software Engineering,
#  Faculty of Arts and Sciences, Research Computing (FAS RC)
#  Author: Michael A Bouzinier
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

import csv
import logging
import os
from abc import ABC, abstractmethod
from datetime import date, timedelta, datetime
from enum import Enum
from typing import List, Dict

from netCDF4 import Dataset
from rasterstats.io import Raster
from tqdm import tqdm

from gridmet.config import GridmetVariable, GridMETContext, Shape
from gridmet.gridmet_tools import find_shape_file, get_nkn_url, get_variable, get_days, \
    get_affine_transform, disaggregate
from gridmet.netCDF_tools import NCViewer
from gridmet.prof import ProfilingData
from nsaph_gis.compute_shape import StatsCounter
from nsaph_gis.constants import Geography, RasterizationStrategy
from nsaph_gis.geometry import PointInRaster
from nsaph_utils.utils.io_utils import DownloadTask, fopen, as_stream
from nsaph_utils.utils.profile_utils import mem

NO_DATA = 32767.0  # The value filled in masked arrays in NetCDF files
# for the masked cells


def count_lines(f):
    with fopen(f, "r") as x:
        return sum(1 for line in x)


def quote(s: str) -> str:
    return '"' + s + '"'


class Parallel(Enum):
    # Axes along which the computation can be parallelized
    points = "points"
    bands = "bands"
    days = "days"

class Collector(ABC):
    """Abstract interface for objects that collect computed data rows"""

    def __init__(self):
        pass

    @abstractmethod
    def writerow(self, data: List):
        pass

    def flush(self):
        pass

class CSVWriter(Collector):
    """Collector that writes rows to a CSV stream"""

    def __init__(self, out_stream):
        super().__init__()
        self.out = out_stream
        self.writer = csv.writer(out_stream, delimiter=',',
                                 quoting=csv.QUOTE_NONE)

    def writerow(self, row: List):
        self.writer.writerow(row)

    def flush(self):
        self.out.flush()

class ListCollector(Collector):
    """Collector that accumulates rows in a list in memory"""

    def __init__(self):
        super().__init__()
        self.collection = []

    def writerow(self, data: List):
        self.collection.append(data)

    def get_result(self):
        return self.collection

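# A minimal sketch of how a collector is used (illustrative values only;
# ListCollector and CSVWriter are interchangeable behind the same interface):
#
#     collector = ListCollector()
#     collector.writerow([12.5, "2020-01-01", "02138"])
#     collector.flush()
#     rows = collector.get_result()   # [[12.5, "2020-01-01", "02138"]]
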
class ComputeGridmetTask(ABC):
    """
    An abstract class for a computational task that processes data in
    `Unidata netCDF (Version 4) format
    <https://www.unidata.ucar.edu/software/netcdf/>`_
    """

    origin = date(1900, 1, 1)

    def __init__(self, year: int,
                 variable: GridmetVariable,
                 infile: str,
                 outfile: str,
                 date_filter=None,
                 ram: int = 0):
        """
        :param year: year
        :param variable: gridMET band (variable)
        :param infile: File with source data in NetCDF4 format
        :param outfile: Resulting CSV file
        :param date_filter: Optional filter selecting the dates to process
        :param ram: Amount of RAM available for the computation; used to
            choose an optimal downscaling factor
        """

        self.year = year
        self.infile = infile
        self.outfile = outfile
        self.band = variable
        self.factor = 1
        self.affine = None
        self.dataset = None
        self.variable = None
        self.parallel = {Parallel.points}
        self.date_filter = date_filter
        self.perf = ProfilingData()
        self.missing_value = None
        self.ram = ram
    @classmethod
    def get_variable(cls, dataset: Dataset, variable: GridmetVariable):
        return get_variable(dataset, variable.value)

    @abstractmethod
    def get_key(self):
        pass

    def get_days(self) -> Dict:
        all_days = get_days(self.dataset)
        days = dict()
        for i in range(len(all_days)):
            day = all_days[i]
            if self.date_filter:
                if self.date_filter.accept(self.to_date(day)):
                    days[day] = i
            else:
                days[day] = i
        return days

    def prepare(self) -> Dict:
        logging.info("%s => %s", self.infile, self.outfile)
        self.dataset = Dataset(self.infile)
        days = self.get_days()
        self.variable = self.get_variable(self.dataset, self.band)
        viewer = NCViewer(self.infile)
        if viewer.missing_value:
            self.missing_value = viewer.missing_value
        self.factor = viewer.get_optimal_downscaling_factor(self.ram)
        self.on_prepare()
        if not self.affine:
            self.affine = get_affine_transform(self.infile, self.factor)
        return days

    def on_prepare(self):
        pass
    def execute(self, mode: str = "wt"):
        """
        Executes computational task

        :param mode: mode to use opening result file
        :type mode: str
        :return:
        """

        days = self.prepare()
        with fopen(self.outfile, mode) as out:
            writer = CSVWriter(out)
            if 'a' not in mode:
                writer.writerow(
                    [self.band.value, "date", self.get_key().lower()]
                )
            self.collect_data(days, writer)

    def collect_data(self, days: Dict, collector: Collector):
        t0 = datetime.now()
        for day in days:
            idx = days[day]
            layer = self.dataset[self.variable][idx, :, :]
            # layer = layer[layer.mask == False]
            t1 = datetime.now()
            self.compute_one_day(collector, day, layer)
            collector.flush()
            t3 = datetime.now()
            t = datetime.now() - t0
            logging.info(" \t{} [{}]".format(str(t3 - t1), str(t)))
        return collector
    @abstractmethod
    def compute_one_day(self, writer: Collector, day, layer):
        """
        Computes required statistics for a single day.
        This method is called by `execute()` and is implemented in
        specific subclasses

        :param writer: CSV Writer to output the result
        :param day: day
        :param layer: layer corresponding to the day
        :return: Nothing
        """

        pass
    def to_date(self, day) -> date:
        return self.origin + timedelta(days=day)
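    # For example, with origin = date(1900, 1, 1), a netCDF time value of
    # 43828 maps to:
    #     date(1900, 1, 1) + timedelta(days=43828) == date(2019, 12, 31)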

class ComputeShapesTask(ComputeGridmetTask):
    """
    Class describing a compute task that aggregates data over geography
    shapes. The data is expected in
    `Unidata netCDF (Version 4) format
    <https://www.unidata.ucar.edu/software/netcdf/>`_
    """

    def __init__(self, year: int,
                 variable: GridmetVariable,
                 infile: str,
                 outfile: str,
                 strategy: RasterizationStrategy,
                 shapefile: str,
                 geography: Geography,
                 date_filter=None,
                 ram=0):
        """
        :param year: year
        :param variable: gridMET band (variable)
        :param infile: File with source data in NetCDF4 format
        :param outfile: Resulting CSV file
        :param strategy: Rasterization strategy to use
        :param shapefile: Shapefile for the collection of geographies used
        :param geography: Type of geography, e.g. zip code or county
        :param date_filter: Optional filter selecting the dates to process
        :param ram: Amount of RAM available for the computation
        """

        super().__init__(year, variable, infile, outfile, date_filter, ram)
        self.strategy = strategy
        self.shapefile = shapefile
        self.geography = geography
    def on_prepare(self):
        if self.strategy in [
            RasterizationStrategy.default, RasterizationStrategy.all_touched
        ]:
            self.factor = 1

    def get_key(self):
        return self.geography.value.upper()
    def compute_one_day(self, writer: Collector, day, layer):
        dt = self.to_date(day)
        start_ts = datetime.now()
        if self.factor > 1:
            logging.info("Downscaling by the factor of " + str(self.factor))
            layer = disaggregate(layer, self.factor)
            if self.factor > self.perf.factor:
                self.perf.factor = self.factor
        x = layer.shape[0]
        y = layer.shape[1]
        if x > self.perf.shape_x:
            self.perf.shape_x = x
        if y > self.perf.shape_y:
            self.perf.shape_y = y
        logging.info(
            "%s:%s:%s:%s: layer shape %s",
            str(start_ts), self.geography.value, self.band.value, dt,
            str(layer.shape)
        )
        aggr_ts = datetime.now()
        for record in StatsCounter.process(
            self.strategy, self.shapefile, self.affine, layer,
            self.geography, self.missing_value
        ):
            writer.writerow([record.value, dt.strftime("%Y-%m-%d"),
                             record.prop])
        now = datetime.now()
        delta_ts = now - start_ts
        delta_aggr_ts = now - aggr_ts
        logging.debug(
            "Completed in %s, aggregation: %s", delta_ts, delta_aggr_ts
        )
        self.perf.update_mem_time(
            StatsCounter.max_mem_used, delta_ts, delta_aggr_ts
        )
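    # A minimal usage sketch (hypothetical paths; assumes the GridmetVariable
    # enum defines a "pr" member and the Geography enum defines "zip"):
    #
    #     task = ComputeShapesTask(
    #         2020, GridmetVariable.pr,
    #         "downloads/pr_2020.nc", "out/pr_zip_2020.csv",
    #         RasterizationStrategy.default, "shapes/zip_2020.shp",
    #         Geography.zip
    #     )
    #     task.execute()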

class ComputePointsTask(ComputeGridmetTask):
    """
    Class describing a compute task that assigns data to a collection of
    points. The data is expected in
    `Unidata netCDF (Version 4) format
    <https://www.unidata.ucar.edu/software/netcdf/>`_
    """

    force_standard_api = False

    def __init__(self, year: int,
                 variable: GridmetVariable,
                 infile: str,
                 outfile: str,
                 points_file: str,
                 coordinates: List,
                 metadata: List,
                 date_filter=None,
                 ram=0):
        """
        :param year: year
        :param variable: gridMET band (variable)
        :param infile: File with source data in NetCDF4 format
        :param outfile: Resulting CSV file
        :param points_file: Path to a file containing coordinates of points
            in CSV format
        :param coordinates: A two-element list of column names in the CSV
            corresponding to the coordinates
        :param metadata: A list of column names in the CSV that should be
            interpreted as metadata (e.g. ZIP, site_id, etc.)
        :param date_filter: Optional filter selecting the dates to process
        :param ram: Amount of RAM available for the computation
        """

        super().__init__(year, variable, infile, outfile, date_filter, ram)
        self.points_file = points_file
        assert len(coordinates) == 2
        self.coordinates = coordinates
        self.metadata = metadata
        self.first_layer = None
    def get_key(self):
        return self.metadata[0]

    def on_prepare(self):
        self.first_layer = Raster(
            self.dataset[self.variable][0, :, :],
            self.affine,
            nodata=-NO_DATA,
        )
        return

    def make_point(self, row) -> PointInRaster:
        x = float(row[self.coordinates[0]])
        y = float(row[self.coordinates[1]])
        point = PointInRaster(self.first_layer, self.affine, x, y)
        return point
    def execute(self, mode: str = "w") -> None:
        days = self.prepare()
        logging.info("Prepare rasters")
        rasters = {}
        # Rasters are keyed by the position of the day in the (possibly
        # filtered) dictionary returned by prepare(); days maps each day
        # offset to its index in the time dimension of the file
        for position, day in enumerate(tqdm(days, total=len(days))):
            layer = self.dataset[self.variable][days[day], :, :]
            raster = Raster(layer, self.affine, nodata=NO_DATA)
            rasters[position] = raster
        logging.info("Process points")
        with fopen(self.points_file, "r") as points_file, \
                fopen(self.outfile, "wt") as out:
            reader = csv.DictReader(points_file)
            writer = CSVWriter(out)
            writer.writerow([self.band.value, "date", self.get_key().lower()])
            for n, row in enumerate(tqdm(reader)):
                point = self.make_point(row)
                if point.is_masked():
                    continue
                metadata = [row[p] for p in self.metadata]
                for position, day in enumerate(days):
                    dt = self.origin + timedelta(days=day)
                    mean = point.bilinear(rasters[position])
                    writer.writerow([mean, dt] + metadata)
                if n % 10_000 == 0:
                    # flush periodically
                    writer.flush()
    def compute_one_day(self, writer: Collector, day, layer):
        # Not used: this class overrides execute() directly
        pass
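    # A minimal usage sketch (hypothetical paths and column names; assumes
    # the GridmetVariable enum defines a "pr" member):
    #
    #     task = ComputePointsTask(
    #         2020, GridmetVariable.pr,
    #         "downloads/pr_2020.nc", "out/pr_points_2020.csv",
    #         "points.csv", ["longitude", "latitude"], ["site_id"]
    #     )
    #     task.execute()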

class DownloadGridmetTask:
    """
    Task to download a source file in NetCDF4 format
    """

    BLOCK_SIZE = 65536

    @classmethod
    def get_url(cls, year: int, variable: GridmetVariable) -> str:
        """
        Constructs URL given a year and a band

        :param year: year
        :param variable: gridMET band (variable)
        :return: URL for download
        """

        return get_nkn_url(variable.value, year)

    def __init__(self, year: int,
                 variable: GridmetVariable,
                 destination: str):
        """
        :param year: year
        :param variable: gridMET band (variable)
        :param destination: Destination directory for all downloads
        """

        if not os.path.isdir(destination):
            os.makedirs(destination)
        url = self.get_url(year, variable)
        target = os.path.join(destination, url.split('/')[-1])
        self.download_task = DownloadTask(target, [url])
        self.perf = ProfilingData()
        # self.max_mem_used = 0

    def target(self):
        """
        :return: File path for downloaded data
        """

        return self.download_task.destination

    def execute(self):
        """
        Executes the task

        :return: None
        """

        logging.info(str(self.download_task))
        if self.download_task.is_up_to_date():
            logging.info("Up to date")
            return
        buffer = bytearray(self.BLOCK_SIZE)
        start_ts = datetime.now()
        with fopen(self.target(), "wb") as writer, \
                as_stream(self.download_task.urls[0]) as reader:
            n = 0
            while True:
                ret = reader.readinto(buffer)
                if not ret:
                    break
                writer.write(buffer[:ret])
                n += 1
                if (n % 20) == 0:
                    print("*", end='')
        self.perf.update_mem_time(mem(), datetime.now() - start_ts)
        return
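    # A minimal usage sketch (hypothetical destination directory; assumes
    # the GridmetVariable enum defines a "pr" member):
    #
    #     download = DownloadGridmetTask(2020, GridmetVariable.pr, "downloads")
    #     download.execute()
    #     print(download.target())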

class GridmetTask:
    """
    Defines a task to download and process data for a single year and
    variable.

    Instances of this class can be used to parallelize processing
    """
    @classmethod
    def destination_file_name(cls, context: GridMETContext,
                              year: int,
                              variable: GridmetVariable):
        """
        Constructs a file name for a given set of parameters

        :param context: Configuration object for the pipeline
        :param year: year
        :param variable: gridMET band (variable)
        :return: `variable_geography_shape_year.csv[.gz]`
        """

        g = context.geography.value
        s = context.shapes[0].value if len(context.shapes) == 1 else "all"
        f = "{}_{}_{}_{:d}.csv".format(variable.value, g, s, year)
        if context.compress:
            f += ".gz"
        return os.path.join(context.destination, f)
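    # For example (illustrative values): variable "pr", geography "zip",
    # a single "polygon" shape, year 2019 and compress=True produce
    # "pr_zip_polygon_2019.csv.gz" under context.destination.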
    @classmethod
    def find_shape_file(cls, context: GridMETContext, year: int,
                        shape: Shape):
        """
        Finds a shapefile for the given type of geographies for the
        closest available year

        :param context: Configuration object for the pipeline
        :param year: year
        :param shape: Shape type
        :return: a shape file for the given year if it exists, or for the
            latest year before the given one
        """

        parent_dir = context.shapes_dir
        geo_type = context.geography.value
        shape_type = shape.value
        return find_shape_file(parent_dir, year, geo_type, shape_type)
    def __init__(self, context: GridMETContext,
                 year: int,
                 variable: GridmetVariable):
        """
        :param context: Configuration object for the pipeline
        :param year: year
        :param variable: gridMET band (variable)
        """

        if os.path.isfile(context.raw_downloads):
            self.download_task = None
            self.raw_download = context.raw_downloads
        else:
            dest = context.raw_downloads
            _, ext = os.path.splitext(dest)
            if not os.path.isdir(dest) or ext:
                dest = os.path.dirname(dest)
            self.download_task = DownloadGridmetTask(year, variable, dest)
            self.raw_download = self.download_task.target()

        destination = context.destination
        if not os.path.isdir(destination):
            os.makedirs(destination)
        result = self.destination_file_name(context, year, variable)

        # self.raw_download is valid whether or not a download is required
        self.compute_tasks = []
        if context.shape_files:
            self.compute_tasks = [
                ComputeShapesTask(year, variable, self.raw_download, result,
                                  context.strategy, shape_filename,
                                  context.geography, context.dates,
                                  ram=context.ram)
                for shape_filename in context.shape_files
            ]
        elif Shape.polygon in context.shapes or not context.points:
            self.compute_tasks = [
                ComputeShapesTask(year, variable, self.raw_download, result,
                                  context.strategy, shape_file,
                                  context.geography, context.dates,
                                  ram=context.ram)
                for shape_file in [
                    self.find_shape_file(context, year, shape)
                    for shape in context.shapes
                ]
            ]
        if Shape.point in context.shapes and context.points:
            self.compute_tasks += [
                ComputePointsTask(year, variable, self.raw_download, result,
                                  context.points, context.coordinates,
                                  context.metadata, ram=context.ram)
            ]
        if not self.compute_tasks:
            raise Exception("Invalid combination of arguments")
        self.perf = ProfilingData()
    def execute(self):
        """
        Executes the task.

        First the download subtask is executed, unless the corresponding
        file has already been downloaded. Then the compute tasks are
        executed.

        :return: None
        """

        if self.download_task is not None:
            self.download_task.execute()
            self.perf.update(self.download_task.perf)
        for task in self.compute_tasks:
            task.execute()
            self.perf.update(task.perf)
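    # A minimal end-to-end sketch (assumes `context` is a fully populated
    # GridMETContext and that GridmetVariable defines a "pr" member):
    #
    #     task = GridmetTask(context, 2020, GridmetVariable.pr)
    #     task.execute()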