# Source code for gridmet.netCDF_file_processor

#  Copyright (c) 2023.  Harvard University
#   Developed by Research Software Engineering,
#   Harvard University Research Computing and Data (RCD) Services.
#   Author: Michael A Bouzinier
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#          http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

"""
An entry point to a command line utility aggregating grid data
provided as NetCDF file over a set of shape files, assigning
labels defined in the shape files to the aggregated values
"""


import logging
import os
from datetime import datetime
from typing import Optional, Dict

import yaml
from nsaph_utils.utils.io_utils import sizeof_fmt

from gridmet.gridmet_tools import find_shape_file
from nsaph import init_logging
from nsaph_gis.compute_shape import StatsCounter

from gridmet.config import GridContext, OutputType
from gridmet.aggregator import Aggregator, GeoTiffAggregator, NetCDFAggregator

class NetCDFFile:
    """
    Command line task aggregating one gridded input file over the
    shapes of a single shape file.

    The input is the file named by ``context.raw_downloads``. Call
    :meth:`prepare` to configure the aggregator, then :meth:`execute`
    to produce the outputs requested in ``context.output``.
    """

    def __init__(self, context: Optional["GridContext"] = None):
        """
        Creates a new instance

        :param context: An optional GridContext object; if not specified,
            it is constructed from the command line arguments
        """
        if not context:
            context = GridContext(doc=__doc__).instantiate()
        self.context = context
        self.file_type = None
        # The logger is named after the input file's base name
        # (everything before the first dot).
        log = os.path.basename(self.context.raw_downloads).split('.')[0]
        init_logging(name="aggr-" + log, level=logging.INFO)
        self.aggregator: Optional[Aggregator] = None
        self.infile = self.context.raw_downloads
        self.extra_columns = None
        # Class-level setting: applies to every StatsCounter user
        # in the process, not only to this instance.
        StatsCounter.statistics = context.statistics

    def on_prepare(self):
        """
        Hook called by :meth:`prepare` after the aggregator class has been
        selected and before the aggregator is instantiated.

        This method can be overwritten by subclasses to configure
        proper aggregation. The default implementation does nothing.
        """
        pass

    def get_aggregation_year(self):
        """
        Returns the value used as the year when looking up a shape file.

        :return: ``context.years`` as is; :meth:`prepare` converts
            it with ``int()``
        """
        return self.context.years

    def prepare(self):
        """
        Configures the aggregator for the input file.

        Selects the aggregator class by the input file extension, builds
        the output file path inside ``context.destination`` (creating the
        directory if needed), resolves the shape file and instantiates
        the aggregator into ``self.aggregator``.

        :raises ValueError: if the input file has an unsupported extension
            while aggregation output is requested, if there is not
            exactly one shape file, or if no variables are specified
        """
        if self.infile.endswith(".nc"):
            self.file_type = "nc"
            aggregator = NetCDFAggregator
        elif self.infile.endswith((".tif", ".tiff")):
            self.file_type = 'tiff'
            aggregator = GeoTiffAggregator
        elif OutputType.aggregation not in self.context.output:
            # Aggregation output is not requested, so execute() will not
            # ask the aggregator to read the input; fall back to NetCDF.
            self.file_type = "nc"
            aggregator = NetCDFAggregator
        else:
            raise ValueError("NetCDF file is expected (extension .nc)")
        self.on_prepare()

        # Output name: <input base name>_<geography>.csv[.gz]
        of, _ = os.path.splitext(os.path.basename(self.infile))
        of += '_' + self.context.geography.value + ".csv"
        os.makedirs(self.context.destination, exist_ok=True)
        of = os.path.join(self.context.destination, of)
        if self.context.compress:
            of += ".gz"

        if not self.context.shape_files and self.context.shapes_dir:
            self.context.shape_files = find_shape_file(
                self.context.shapes_dir,
                int(self.get_aggregation_year()),
                str(self.context.geography.value),
                "polygon"
            )
        # Treat a missing (None) list like an empty one so that the
        # caller gets the ValueError below rather than a TypeError.
        shape_files = self.context.shape_files or []
        if len(shape_files) != 1:
            raise ValueError(
                "Shape type is required and only one "
                "shape type is allowed for aggregation. "
                "len(self.context.shape_files)={:d}"
                .format(len(shape_files))
            )
        shape_file = shape_files[0]

        if not self.context.variables:
            raise ValueError("No variables are specified")
        variable = self.context.variables

        self.aggregator = aggregator(
            infile=self.infile,
            variable=variable,
            outfile=of,
            strategy=self.context.strategy,
            shapefile=shape_file,
            geography=self.context.geography,
            extra_columns=self.extra_columns,
            ram=self.context.ram
        )

    def get_domain_name(self):
        """
        Returns the domain name used for the data dictionary.

        :return: the constant string ``"exposures"``
        """
        return "exposures"

    def get_table_name(self):
        """
        Returns the table name used for the data dictionary.

        :return: ``context.table`` if it is set, otherwise the base name
            of the aggregator's output file up to the first dot
        """
        if self.context.table is not None:
            return self.context.table
        return os.path.basename(self.aggregator.outfile).split('.')[0]

    def execute(self):
        """
        Produces the outputs requested in ``context.output``.

        If aggregation is requested, runs the aggregator when the input
        file exists; otherwise only the header is written to the output
        file. If the data dictionary is requested, dumps the registry as
        ``<domain name>.yaml`` into ``context.destination``. Finally, the
        elapsed time is stored in the aggregator's performance record
        and logged.

        :meth:`prepare` must be called first: ``self.aggregator`` is
        used unconditionally.
        """
        # NOTE(review): the logging calls and the datetime.now() calls in
        # this method were reconstructed from a damaged copy of the
        # source; confirm the log levels against the repository.
        start = datetime.now()
        if OutputType.aggregation in self.context.output:
            if os.path.isfile(self.infile):
                self.aggregator.execute()
                print(
                    "Aggregation of data from {} by {} has been executed. "
                    "Output: {}"
                    .format(
                        self.infile,
                        self.context.geography.value,
                        self.aggregator.outfile
                    )
                )
            else:
                of = self.aggregator.write_header()
                logging.warning(
                    "Input file was not found. Created empty file: {}"
                    .format(os.path.abspath(of))
                )
        if OutputType.data_dictionary in self.context.output:
            registry = self.get_registry()
            of = os.path.join(
                self.context.destination,
                self.get_domain_name() + ".yaml"
            )
            with open(of, "wt") as out:
                yaml.dump(registry, out)
            logging.info("Created data dictionary: " + os.path.abspath(of))
        # Info:
        end = datetime.now()
        self.aggregator.perf.total_time = end - start
        self.aggregator.perf.log("Resources: ")

    def get_registry(self) -> Dict:
        """
        Returns the data dictionary (registry) for the aggregated table.

        :return: whatever the aggregator's ``get_registry`` returns for
            this domain name, table name and ``context.description``
        """
        return self.aggregator.get_registry(
            self.get_domain_name(),
            self.get_table_name(),
            description=self.context.description
        )
# Script entry point: build the task from the command line arguments,
# configure its aggregator, then produce the requested outputs.
if __name__ == '__main__':
    processor = NetCDFFile()
    processor.prepare()
    processor.execute()