# Source code for gridmet.netCDF_file_processor


#  Copyright (c) 2023.  Harvard University
#
#   Developed by Research Software Engineering,
#   Harvard University Research Computing and Data (RCD) Services.
#
#   Author: Michael A Bouzinier
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#          http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
#

"""
An entry point to a command line utility aggregating grid data
provided as a NetCDF file over a set of shape files, assigning
labels defined in the shape files to the aggregated values.


See `NetCDF <https://www.unidata.ucar.edu/software/netcdf/>`__
"""

import logging
import os
from datetime import datetime
from typing import Optional, Dict

import yaml
from nsaph_utils.utils.io_utils import sizeof_fmt

from gridmet.gridmet_tools import find_shape_file
from nsaph import init_logging
from nsaph_gis.compute_shape import StatsCounter

from gridmet.config import GridContext, OutputType
from gridmet.aggregator import Aggregator, GeoTiffAggregator, NetCDFAggregator

class NetCDFFile:
    """
    Command-line task that aggregates grid data from a NetCDF
    (or GeoTiff) file over a set of shapes and labels the results.
    """

    def __init__(self, context: GridContext = None):
        """
        Creates a new instance.

        :param context: An optional GridmetContext object; if not specified,
            it is constructed from the command line arguments.
        """

        # Build the context from CLI arguments when none was supplied
        if not context:
            context = GridContext(doc=__doc__).instantiate()
        self.context = context
        self.file_type = None

        # Name the log after the input file's base name (without extension)
        log_name = os.path.basename(self.context.raw_downloads).split('.')[0]
        init_logging(name="aggr-" + log_name, level=logging.INFO)

        self.aggregator: Optional[Aggregator] = None
        self.infile = self.context.raw_downloads
        self.extra_columns = None
        # NOTE(review): this sets a class-level attribute, shared by all
        # StatsCounter users in the process
        StatsCounter.statistics = context.statistics
[docs] def on_prepare(self): """ This method can be overwritten by subclasses to configure proper aggregation """ pass
[docs] def get_aggregation_year(self): return self.context.years
    def prepare(self):
        """
        Selects the aggregator implementation based on the input file
        extension, computes the output file path, resolves the shape file
        to aggregate over and instantiates the aggregator.

        :raises ValueError: if the input file has an unsupported extension
            while aggregation output is requested, if not exactly one shape
            file is resolved, or if no variables are specified.
        """

        # Choose aggregator by file extension
        if self.infile.endswith(".nc"):
            self.file_type = "nc"
            aggregator = NetCDFAggregator
        elif self.infile.endswith(".tif") or self.infile.endswith(".tiff"):
            self.file_type = 'tiff'
            aggregator = GeoTiffAggregator
        elif OutputType.aggregation not in self.context.output:
            # No aggregation requested: default to NetCDF handling
            self.file_type = "nc"
            aggregator = NetCDFAggregator
        else:
            raise ValueError("NetCDF file is expected (extension .nc)")
        # Let subclasses adjust configuration before building the aggregator
        self.on_prepare()

        # Output name: <input basename>_<geography>.csv[.gz] in destination dir
        of, _ = os.path.splitext(os.path.basename(self.infile))
        of += '_' + self.context.geography.value + ".csv"
        if not os.path.isdir(self.context.destination):
            os.makedirs(self.context.destination, exist_ok=True)
        of = os.path.join(self.context.destination, of)
        if self.context.compress:
            of += ".gz"

        # Resolve shape files from the shapes directory when not given
        # explicitly. NOTE(review): assumes find_shape_file returns a
        # one-element list here — confirm against nsaph_gis
        if not self.context.shape_files and self.context.shapes_dir:
            self.context.shape_files = find_shape_file(
                self.context.shapes_dir,
                int(self.get_aggregation_year()),
                str(self.context.geography.value),
                "polygon"
            )
        if len(self.context.shape_files) != 1:
            raise ValueError("Shape type is required and only one "
                             "shape type is allowed for aggregation."
                             "len(self.context.shape_files)={:d}"
                             .format(len(self.context.shape_files)))
        shape_file = self.context.shape_files[0]

        # All configured variables are passed to the aggregator
        if len(self.context.variables) > 0:
            variable = self.context.variables
        else:
            raise ValueError("No variables are specified")

        self.aggregator = aggregator(
            infile=self.infile,
            variable=variable,
            outfile=of,
            strategy=self.context.strategy,
            shapefile=shape_file,
            geography=self.context.geography,
            extra_columns=self.extra_columns,
            ram=self.context.ram
        )
        return
[docs] def get_domain_name(self): return "exposures"
[docs] def get_table_name(self): if self.context.table is not None: return self.context.table of = os.path.basename(self.aggregator.outfile).split('.') return of[0]
    def execute(self):
        """
        Runs the task: performs the aggregation (when requested by the
        configured output types) and/or writes the data dictionary YAML,
        then records timing into the aggregator's performance counters.
        """

        start = datetime.now()
        if OutputType.aggregation in self.context.output:
            if os.path.isfile(self.infile):
                self.aggregator.execute()
                print(
                    "Aggregation of data from {} by {} has been executed. "
                    "Output: {}"
                    .format(
                        self.infile,
                        self.context.geography.value,
                        self.aggregator.outfile
                    ))
            else:
                # Missing input is not fatal: emit an empty output file
                # containing only the header
                of = self.aggregator.write_header()
                logging.info("Input file was not found. Created empty file: {}"
                             .format(os.path.abspath(of)))
        if OutputType.data_dictionary in self.context.output:
            # Dump the registry (data dictionary) as <domain>.yaml
            registry = self.get_registry()
            of = os.path.join(
                self.context.destination,
                self.get_domain_name() + ".yaml"
            )
            with open (of, "wt") as out:
                yaml.dump(registry, out)
            logging.info("Created data dictionary: " + os.path.abspath(of))
        # Info:
        # Record wall-clock time and log resource usage
        end = datetime.now()
        self.aggregator.perf.total_time = end - start
        self.aggregator.perf.log("Resources: ")
        return
[docs] def get_registry(self) -> Dict: return self.aggregator.get_registry( self.get_domain_name(), self.get_table_name(), description=self.context.description )
if __name__ == '__main__':
    # Build the task from command-line arguments, then run it
    processor = NetCDFFile()
    processor.prepare()
    processor.execute()