Source code for gridmet.aggregator

"""
API to aggregate data over shapes

The Aggregator class expects a netCDF dataset, containing 3 variables:
value, latitude and longitude
"""
#  Copyright (c) 2022. Harvard University
#
#  Developed by Research Software Engineering,
#  Faculty of Arts and Sciences, Research Computing (FAS RC)
#  Author: Michael A Bouzinier
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
import logging
import math
import os
import sys
from abc import ABC, abstractmethod
from datetime import datetime
from typing import List, Tuple, Set, Any

from gridmet.netCDF_tools import NCViewer
from gridmet.prof import ProfilingData
from nsaph_utils.utils.profile_utils import mem, qmem, qqmem

import rasterio
from netCDF4 import Dataset
from nsaph import init_logging
from nsaph.pg_keywords import PG_NUMERIC_TYPE, PG_DATE_TYPE, PG_STR_TYPE, \
    PG_INT_TYPE
from nsaph_gis.compute_shape import StatsCounter
from nsaph_gis.constants import RasterizationStrategy, Geography
from nsaph_utils.utils.io_utils import fopen, CSVWriter, Collector, sizeof_fmt

from gridmet.gridmet_tools import get_affine_transform, disaggregate, \
    estimate_optimal_downscaling_factor
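
# A minimal usage sketch (illustrative only; the file names are hypothetical,
# the calls mirror the ``__main__`` block at the bottom of this module):
#
#     aggregator = NetCDFAggregator(
#         infile="pm25_2018.nc",
#         variable="PM25",
#         outfile="pm25_2018.csv.gz",
#         strategy=RasterizationStrategy.downscale,
#         shapefile="counties.shp",
#         geography=Geography.county,
#         extra_columns=(["Year", "Month"], ["2018", "12"])
#     )
#     aggregator.execute()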


class Aggregator(ABC):
    """
    Abstract class aggregating gridded raster values over geographic shapes
    """

    def __init__(self, infile: str, variable: str, outfile: str,
                 strategy: RasterizationStrategy, shapefile: str,
                 geography: Geography,
                 extra_columns: Tuple[List[str], List[str]] = None,
                 ram=0):
        """
        :param infile: Path to the file with raster data to be aggregated.
            Can be either a NetCDF or a GeoTIFF file
        :param variable: Name of the variable (or list of variables)
            to be aggregated
        :param outfile: Path to the output ".csv.gz" file
        :param strategy: Rasterization strategy
        :param shapefile: Path to the shapefile with polygons
        :param geography: Type of geography: US counties or ZIP/ZCTA codes
        :param extra_columns: Extra columns to add to the CSV, given as a
            tuple of (header names, constant values),
            e.g. (["Year", "Month"], ["2018", "12"])
        :param ram: Runtime memory available to the process
        """

        self.infile = infile
        self.outfile = outfile
        self.factor = 1
        self.affine = None
        self.dataset: Dataset = None
        if isinstance(variable, list):
            self.aggr_variables = variable
        else:
            self.aggr_variables = [str(variable)]
        self.shapefile = shapefile
        self.geography = geography
        if extra_columns:
            self.extra_headers, self.extra_values = extra_columns
        else:
            self.extra_headers, self.extra_values = None, None
        self.strategy = None
        self.missing_value = None
        self.ram = ram
        self.set_strategy(strategy)
        self.perf = ProfilingData()

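    # Illustrative output layout (not part of the original source): with
    # variable="PM25", geography=Geography.county and
    # extra_columns=(["Year", "Month"], ["2018", "12"]), the CSV header
    # produced by get_header() is roughly
    #
    #     PM25,county,Year,Month
    #
    # (the variable name may be case-adjusted by prepare() to match the
    # dataset), and compute() appends the constant values "2018" and "12"
    # to every data row.
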
    def set_strategy(self, strategy: RasterizationStrategy):
        self.strategy = strategy
        if self.strategy in [
            RasterizationStrategy.default, RasterizationStrategy.all_touched
        ]:
            self.factor = 1
            set_factor = False
        else:
            set_factor = True
        # The per-variable memory budget is scaled down by the square root
        # of the number of variables to be aggregated
        ram = int(self.ram / math.sqrt(len(self.aggr_variables)))
        self.on_set_strategy(ram, set_factor)

    def on_set_strategy(self, ram: int, set_factor: bool):
        """
        Hook called by set_strategy(); subclasses use it to estimate
        the optimal downscaling factor for the given memory budget
        """

        pass

    def prepare(self):
        if not self.affine:
            self.affine = get_affine_transform(self.infile, self.factor)
        logging.info("%s => %s", self.infile, self.outfile)
        self.open()
        variables = self.get_dataset_variables()
        for v in variables:
            if v in self.aggr_variables:
                return
        # Fall back to case-insensitive matching of variable names
        lv = [v.lower() for v in self.aggr_variables]
        for v in variables:
            if v.lower() in lv:
                idx = lv.index(v.lower())
                self.aggr_variables[idx] = v
                return
        vvv = [v for v in variables if v.lower() not in ['lat', 'lon']]
        raise ValueError(
            "Variable {} not found in the file {}. Available variables: {}"
            .format(self.aggr_variables, self.infile, ','.join(vvv))
        )

    @abstractmethod
    def open(self):
        pass

    @abstractmethod
    def get_dataset_variables(self) -> Set[str]:
        pass

    @abstractmethod
    def get_layer(self, var):
        pass

    def get_header(self) -> List[str]:
        key = str(self.geography.value).lower()
        headers = self.aggr_variables + [key]
        if self.extra_headers:
            headers += self.extra_headers
        return headers

    def write_header(self):
        with fopen(self.outfile, "wt") as out:
            writer = CSVWriter(out)
            headers = self.get_header()
            writer.writerow(headers)
        return self.outfile

    def get_registry(self, domain_name: str, table_name: str,
                     description: str = None):
        t0 = datetime.now()
        if description is None:
            description = "Dorieh data model for aggregation of a grid"
        key = str(self.geography.value).lower()
        domain = {
            domain_name: {
                "schema": domain_name,
                "index": "all",
                "description": description,
                "header": True,
                "quoting": 3,
                "tables": {}
            }
        }
        columns = [
            {var: {"type": PG_NUMERIC_TYPE}} for var in self.aggr_variables
        ]
        columns.append({key: {
            "type": PG_STR_TYPE,
            "index": True
        }})
        pk = None
        if self.extra_headers:
            for i in range(len(self.extra_headers)):
                c = self.extra_headers[i]
                v = self.extra_values[i]
                if isinstance(v, int):
                    t = PG_INT_TYPE
                else:
                    t = PG_STR_TYPE
                columns.append({c: {
                    "type": t,
                    "index": True
                }})
                if c.lower() == "year":
                    pk = [key, c]
        table = {
            "columns": columns
        }
        if pk is not None:
            table["primary_key"] = pk
        domain[domain_name]["tables"][table_name] = table
        m = qmem()
        self.perf.update_mem_time(m, datetime.now() - t0)
        return domain

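    # Illustrative shape of the dictionary returned by get_registry()
    # (hypothetical names: domain_name="gridmet", table_name="pm25_county",
    # a single aggregated variable "pm25", county geography and an integer
    # "Year" extra column):
    #
    #   {
    #       "gridmet": {
    #           "schema": "gridmet",
    #           "index": "all",
    #           "description": "Dorieh data model for aggregation of a grid",
    #           "header": True,
    #           "quoting": 3,
    #           "tables": {
    #               "pm25_county": {
    #                   "columns": [
    #                       {"pm25": {"type": PG_NUMERIC_TYPE}},
    #                       {"county": {"type": PG_STR_TYPE, "index": True}},
    #                       {"Year": {"type": PG_INT_TYPE, "index": True}}
    #                   ],
    #                   "primary_key": ["county", "Year"]
    #               }
    #           }
    #       }
    #   }
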
    def execute(self, mode: str = "wt"):
        """
        Executes the computational task

        :param mode: mode to use when opening the result file
        :type mode: str
        :return:
        """

        self.prepare()
        if 'a' not in mode:
            self.write_header()
        if 't' in mode:
            mode = 'at'
        else:
            mode = 'a'
        with fopen(self.outfile, mode) as out:
            writer = CSVWriter(out)
            self.collect_data(writer)
        m = qmem()
        self.perf.update_mem_only(m)

    def collect_data(self, collector: Collector):
        t0 = datetime.now()
        layers = [self.get_layer(var) for var in self.aggr_variables]
        t1 = datetime.now()
        self.compute(collector, layers)
        collector.flush()
        t3 = datetime.now()
        t = datetime.now() - t0
        logging.info(" \t{} [{}]".format(str(t3 - t1), str(t)))

    def downscale(self, layer):
        if self.factor > 1:
            logging.info("Downscaling by the factor of " + str(self.factor))
            layer = disaggregate(layer, self.factor)
        else:
            layer = layer[:]
        m = qmem()
        self.perf.update_mem_only(m)
        return layer

    def compute(self, writer: Collector, layers):
        fid, _ = os.path.splitext(os.path.basename(self.infile))
        now = datetime.now()
        shape = layers[0].shape
        logging.info(
            "%s:%s:strategy=%s:%s: %s: layer shape %s",
            str(now), self.geography.value, self.strategy.value,
            self.aggr_variables, fid, str(shape)
        )
        if self.factor > self.perf.factor:
            self.perf.factor = self.factor
        x = layers[0].shape[0]
        y = layers[0].shape[1]
        if x > self.perf.shape_x:
            self.perf.shape_x = x
        if y > self.perf.shape_y:
            self.perf.shape_y = y
        # Each output row: aggregated value(s), geography id, then any
        # extra (constant) values
        if len(layers) == 1:
            layer = layers[0]
            for record in StatsCounter.process(
                self.strategy, self.shapefile, self.affine, layer,
                self.geography
            ):
                row = [record.value, record.prop]
                if self.extra_values:
                    row += self.extra_values
                writer.writerow(row)
        else:
            for record in StatsCounter.process_layers(
                self.strategy, self.shapefile, self.affine, layers,
                self.geography
            ):
                row: List[Any] = [v for v in record.values]
                row.append(record.prop)
                if self.extra_values:
                    row += self.extra_values
                writer.writerow(row)
        dt = datetime.now() - now
        self.perf.update_mem_time(StatsCounter.max_mem_used, None, dt)
        logging.info(
            "%s: %s completed in %s, memory used: %s",
            str(datetime.now()), fid, dt,
            sizeof_fmt(StatsCounter.max_mem_used)
        )


class NetCDFAggregator(Aggregator):
    """
    Aggregator that reads the input raster from a NetCDF file
    """

    def open(self):
        self.dataset = Dataset(self.infile)

    def get_dataset_variables(self) -> Set[str]:
        return set(self.dataset.variables)

    def get_layer(self, var):
        logging.info("Extracting layer: " + var)
        return self.downscale(self.dataset[var])

    def on_set_strategy(self, ram: int, set_factor: bool):
        viewer = NCViewer(self.infile)
        if viewer.missing_value:
            self.missing_value = viewer.missing_value
        if set_factor:
            self.factor = viewer.get_optimal_downscaling_factor(ram)


class GeoTiffAggregator(Aggregator):
    """
    Aggregator that reads the input raster from a GeoTIFF file
    """

    def __init__(self, infile: str, variable: str, outfile: str,
                 strategy: RasterizationStrategy, shapefile: str,
                 geography: Geography,
                 extra_columns: Tuple[List[str], List[str]] = None,
                 ram: int = 0):
        super().__init__(infile, variable, outfile, strategy, shapefile,
                         geography, extra_columns, ram)
        self.array = None

    def open(self):
        if self.dataset is None:
            self.dataset = rasterio.open(self.infile)
        self.array = self.dataset.read()

    def get_dataset_variables(self) -> Set[str]:
        return set(self.dataset.descriptions)

    def get_layer(self, var):
        logging.info("Extracting layer: " + var)
        if var not in self.dataset.descriptions:
            raise ValueError(f'Variable {var} is not in the dataset')
        idx = self.dataset.descriptions.index(var)
        return self.downscale(self.array[idx])

    def on_set_strategy(self, ram: int, set_factor: bool):
        self.dataset = rasterio.open(self.infile)
        grid_size = self.dataset.shape[0] * self.dataset.shape[1]
        if set_factor:
            self.factor = estimate_optimal_downscaling_factor(grid_size, ram)


if __name__ == '__main__':
    init_logging(level=logging.INFO)
    fn = sys.argv[1]
    if not fn.endswith(".nc"):
        raise ValueError("NetCDF file is expected (extension .nc)")
    sf = sys.argv[2]
    of, _ = os.path.splitext(fn)
    of += ".csv.gz"
    a = NetCDFAggregator(
        infile=fn,
        variable="PM25",
        outfile=of,
        strategy=RasterizationStrategy.downscale,
        shapefile=sf,
        geography=Geography.county,
        extra_columns=(["Year", "Month"], ["2018", "12"])
    )
    a.execute()
    print("All Done")