Source code for gridmet.task

This module contains classes used to define, schedule and execute
long running computations used by gridMET pipelines

#  Copyright (c) 2021. Harvard University
#  Developed by Research Software Engineering,
#  Faculty of Arts and Sciences, Research Computing (FAS RC)
#  Author: Michael A Bouzinier
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  See the License for the specific language governing permissions and
#  limitations under the License.

import csv
import logging
import os
from abc import ABC, abstractmethod
from datetime import date, timedelta, datetime
from enum import Enum
from typing import List, Dict

from netCDF4._netCDF4 import Dataset
from import Raster
from tqdm import tqdm

from gridmet.config import GridmetVariable, GridMETContext, Shape
from gridmet.gridmet_tools import find_shape_file, get_nkn_url, get_variable, get_days, \
    get_affine_transform, disaggregate
from gridmet.netCDF_tools import NCViewer
from import ProfilingData
from nsaph_gis.compute_shape import StatsCounter
from nsaph_gis.constants import Geography, RasterizationStrategy
from nsaph_gis.geometry import PointInRaster
from nsaph_utils.utils.io_utils import DownloadTask, fopen, as_stream
from nsaph_utils.utils.profile_utils import mem

NO_DATA = 32767.0  # The value filled in masked arrays in NetCDF files
# for the masked cells

[docs]def count_lines(f): with fopen(f, "r") as x: return sum(1 for line in x)
[docs]def quote(s:str) -> str: return '"' + s + '"'
[docs]class Parallel(Enum): points = "points" bands = "bands" days = "days"
[docs]class Collector(ABC): def __init__(self): pass
[docs] @abstractmethod def writerow(self, data: List): pass
[docs] def flush(self): pass
[docs]class CSVWriter(Collector): def __init__(self, out_stream): super().__init__() self.out = out_stream self.writer = csv.writer(out_stream, delimiter=',', quoting=csv.QUOTE_NONE)
[docs] def writerow(self, row: List): self.writer.writerow(row)
[docs] def flush(self): self.out.flush()
[docs]class ListCollector(Collector): def __init__(self): super().__init__() self.collection = []
[docs] def writerow(self, data: List): self.collection.append(data)
[docs] def get_result(self): return self.collection
[docs]class ComputeGridmetTask(ABC): """ An abstract class for a computational task that processes data in `Unidata netCDF (Version 4) format <>`_ """ origin = date(1900, 1, 1) def __init__(self, year: int, variable: GridmetVariable, infile: str, outfile: str, date_filter=None, ram: int=0): """ :param ram: :param date_filter: :param year: year :param variable: Gridemt band (variable) :param infile: File with source data in NCDF4 format :param outfile: Resulting CSV file """ self.year = year self.infile = infile self.outfile = outfile = variable self.factor = 1 self.affine = None self.dataset = None self.variable = None self.parallel = {Parallel.points} self.date_filter = date_filter self.perf = ProfilingData() self.missing_value = None self.ram = ram
[docs] @classmethod def get_variable(cls, dataset: Dataset, variable: GridmetVariable): return get_variable(dataset, variable.value)
[docs] @abstractmethod def get_key(self): pass
[docs] def get_days(self) -> Dict: all_days = get_days(self.dataset) days = dict() for i in range(len(all_days)): day = all_days[i] if self.date_filter: if self.date_filter.accept(self.to_date(day)): days[day] = i else: days[day] = i return days
[docs] def prepare(self) -> Dict:"%s => %s", self.infile, self.outfile) self.dataset = Dataset(self.infile) days = self.get_days() self.variable = self.get_variable(self.dataset, viewer = NCViewer(self.infile) if viewer.missing_value: self.missing_value = viewer.missing_value self.factor = viewer.get_optimal_downscaling_factor(self.ram) self.on_prepare() if not self.affine: self.affine = get_affine_transform(self.infile, self.factor) return days
[docs] def on_prepare(self): pass
[docs] def execute(self, mode: str = "wt"): """ Executes computational task :param mode: mode to use opening result file :type mode: str :return: """ days = self.prepare() with fopen(self.outfile, mode) as out: writer = CSVWriter(out) if 'a' not in mode: writer.writerow([, "date", self.get_key().lower()]) self.collect_data(days, writer)
[docs] def collect_data(self, days: Dict, collector: Collector): t0 = for day in days: idx = days[day] layer = self.dataset[self.variable][idx, :, :] # layer = layer[layer.mask == False] t1 = self.compute_one_day(collector, day, layer) collector.flush() t3 = t = - t0" \t{} [{}]".format(str(t3 - t1), str(t))) return collector
[docs] @abstractmethod def compute_one_day(self, writer: Collector, day, layer): """ Computes required statistics for a single day. This method is called by `execute()` and is implemented in specific subclasses :param writer: CSV Writer to output the result :param day: day :param layer: layer, corresponding to the day :return: Nothing """ pass
[docs] def to_date(self, day) -> return self.origin + timedelta(days=day)
[docs]class ComputeShapesTask(ComputeGridmetTask): """ Class describes a compute task to aggregate data over geography shapes The data is expected in .. _Unidata netCDF (Version 4) format: """ def __init__(self, year: int, variable: GridmetVariable, infile: str, outfile: str, strategy: RasterizationStrategy, shapefile: str, geography: Geography, date_filter=None, ram=0): """ :param ram: :param date_filter: :param year: year :param variable: gridMET band (variable) :param infile: File with source data in NCDF4 format :param outfile: Resulting CSV file :param strategy: Rasterization strategy to use :param shapefile: Shapefile for used collection of geographies :param geography: Type of geography, e.g. zip code or county """ super().__init__(year, variable, infile, outfile, date_filter, ram) self.strategy = strategy self.shapefile = shapefile self.geography = geography
[docs] def on_prepare(self): if self.strategy in [ RasterizationStrategy.default, RasterizationStrategy.all_touched ]: self.factor = 1
[docs] def get_key(self): return self.geography.value.upper()
[docs] def compute_one_day(self, writer: Collector, day, layer): dt = self.to_date(day) start_ts = if self.factor > 1:"Downscaling by the factor of " + str(self.factor)) layer = disaggregate(layer, self.factor) if self.factor > self.perf.factor: self.perf.factor = self.factor x = layer.shape[0] y = layer.shape[1] if x > self.perf.shape_x: self.perf.shape_x = x if y > self.perf.shape_y: self.perf.shape_y = y "%s:%s:%s:%s: layer shape %s", str(start_ts), self.geography.value,, dt, str(layer.shape) ) aggr_ts = for record in StatsCounter.process( self.strategy, self.shapefile, self.affine, layer, self.geography, self.missing_value ): writer.writerow([record.value, dt.strftime("%Y-%m-%d"), record.prop]) now = delta_ts = now - start_ts delta_aggr_ts = now -aggr_ts logging.debug( "Completed in %s, aggregation: %s", delta_ts, delta_aggr_ts ) self.perf.update_mem_time( StatsCounter.max_mem_used, delta_ts, delta_aggr_ts )
[docs]class ComputePointsTask(ComputeGridmetTask): """ Class describes a compute task to assign data to a collection of points The data is expected in .. _Unidata netCDF (Version 4) format: """ force_standard_api = False def __init__(self, year: int, variable: GridmetVariable, infile: str, outfile: str, points_file: str, coordinates: List, metadata: List, date_filter=None, ram=0): """ :param ram: :param year: year :param variable: Gridemt band (variable) :param infile: File with source data in NCDF4 format :param outfile: Resulting CSV file :param points_file: path to a file containing coordinates of points in csv format. :param coordinates: A two element list of column names in csv corresponding to coordinates :param metadata: A list of column names in csv that should be interpreted as metadata (e.g. ZIP, site_id, etc.) """ super().__init__(year, variable, infile, outfile, date_filter, ram) self.points_file = points_file assert len(coordinates) == 2 self.coordinates = coordinates self.metadata = metadata self.first_layer = None
[docs] def get_key(self): return self.metadata[0]
[docs] def on_prepare(self): self.first_layer = Raster( self.dataset[self.variable][0, :, :], self.affine, nodata=-NO_DATA, ) return
[docs] def make_point(self, row) -> PointInRaster: x = float(row[self.coordinates[0]]) y = float(row[self.coordinates[1]]) point = PointInRaster(self.first_layer, self.affine, x, y) return point
[docs] def execute(self, mode: str = "w") -> None: days = self.prepare()"Prepare rasters") rasters = {} for day_number in tqdm(range(len(days)), total=len(days)): layer = self.dataset[self.variable][day_number, :, :] raster = Raster(layer, self.affine, nodata=NO_DATA) rasters[day_number] = raster"Process points") with fopen(self.points_file, "r") as points_file, \ fopen(self.outfile, "wt") as out: reader = csv.DictReader(points_file) writer = CSVWriter(out) writer.writerow([, "date", self.get_key().lower()]) for n, row in enumerate(tqdm(reader)): point = self.make_point(row) if point.is_masked(): continue metadata = [row[p] for p in self.metadata] for day_number in range(len(days)): dt = self.origin + timedelta(days=days[day_number]) mean = point.bilinear(rasters[day_number]) writer.writerow([mean, dt] + metadata) if n % 10_000: writer.flush()
[docs] def compute_one_day(self, writer: Collector, day, layer): pass
[docs]class DownloadGridmetTask: """ Task to download source file in NCDF4 format """ BLOCK_SIZE = 65536
[docs] @classmethod def get_url(cls, year:int, variable: GridmetVariable) -> str: """ Constructs URL given a year and band :param year: year :param variable: Gridmet band (variable) :return: URL for download """ return get_nkn_url(variable.value, year)
def __init__(self, year: int, variable: GridmetVariable, destination: str): """ :param year: year :param variable: Gridmet band (variable) :param destination: Destination directory for all downloads """ if not os.path.isdir(destination): os.makedirs(destination) url = self.get_url(year, variable) target = os.path.join(destination, url.split('/')[-1]) self.download_task = DownloadTask(target, [url]) self.perf = ProfilingData() #self.max_mem_used = 0
[docs] def target(self): """ :return: File path for downloaded data """ return self.download_task.destination
[docs] def execute(self): """ Executes the task :return: None """ if self.download_task.is_up_to_date():"Up to date") return buffer = bytearray(self.BLOCK_SIZE) start_ts = with fopen(, "wb") as writer, \ as_stream(self.download_task.urls[0]) as reader: n = 0 while True: ret = reader.readinto(buffer) if not ret: break writer.write(buffer[:ret]) n += 1 if (n % 20) == 0: print("*", end='') self.perf.update_mem_time(mem(), - start_ts) return
[docs]class GridmetTask: """ Defines a task to download and process data for a single year and variable Instances of this class can be used to parallelize processing """
[docs] @classmethod def destination_file_name(cls, context: GridMETContext, year: int, variable: GridmetVariable): """ Constructs a file name for a given set of parameters :param context: Configuration object for the pipeline :param year: year :param variable: Gridmet band (variable) :return: `variable_geography_year.csv[.gz]` """ g = context.geography.value s = context.shapes[0].value if len(context.shapes) == 1 else "all" f = "{}_{}_{}_{:d}.csv".format(variable.value, g, s, year) if context.compress: f += ".gz" return os.path.join(context.destination, f)
[docs] @classmethod def find_shape_file(cls, context: GridMETContext, year: int, shape: Shape): """ Finds shapefile for a given type of geographies for the closest available year :param context: Configuration object for the pipeline :param year: year :param shape: Shape type :return: a shape file for a given year if it exists or for the latest year before the given """ parent_dir = context.shapes_dir geo_type = context.geography.value shape_type = shape.value return find_shape_file(parent_dir, year, geo_type, shape_type)
def __init__(self, context: GridMETContext, year: int, variable: GridmetVariable): """ :param context: Configuration object for the pipeline :param year: year :param variable: gridMET band (variable) """ if os.path.isfile(context.raw_downloads): self.download_task = None self.raw_download = context.raw_downloads else: dest = context.raw_downloads _, ext = os.path.splitext(dest) if not os.path.isdir(dest) or ext: dest = os.path.dirname(dest) self.download_task = DownloadGridmetTask(year, variable, dest) self.raw_download = destination = context.destination if not os.path.isdir(destination): os.makedirs(destination) result = self.destination_file_name(context, year, variable) self.compute_tasks = [] if context.shape_files: self.compute_tasks = [ ComputeShapesTask(year, variable, self.raw_download, result, context.strategy, shape_filename, context.geography, context.dates, ram=context.ram) for shape_filename in context.shape_files ] elif Shape.polygon in context.shapes or not context.points: self.compute_tasks = [ ComputeShapesTask(year, variable,, result, context.strategy, shape_file, context.geography, context.dates, ram=context.ram) for shape_file in [ self.find_shape_file(context, year, shape) for shape in context.shapes ] ] if Shape.point in context.shapes and context.points: self.compute_tasks += [ ComputePointsTask(year, variable,, result, context.points, context.coordinates, context.metadata, ram=context.ram) ] if not self.compute_tasks: raise Exception("Invalid combination of arguments") self.perf = ProfilingData()
[docs] def execute(self): """ Executes the task. First the download subtask is executed unless the corresponding file has already been downloaded. Then the compute tasks are executed :return: None """ if self.download_task is not None: self.download_task.execute() self.perf.update(self.download_task.perf) for task in self.compute_tasks: task.execute() self.perf.update(task.perf)