"""
This module processes layers of data containing geographic coordinates
and observed values for a certain parameter (band)
and returns records containing
an identifier for a geographic shape and the value of the band
aggregated over the shape.
"""
import os.path
import sys
# Copyright (c) 2021. Harvard University
#
# Developed by Research Software Engineering,
# Faculty of Arts and Sciences, Research Computing (FAS RC)
# Author: Michael A Bouzinier
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from dataclasses import dataclass
from typing import Iterable, Optional, Tuple, List, Callable, Dict
import rasterio
from rasterstats import zonal_stats, gen_zonal_stats
from tqdm import tqdm, trange
import shapefile
from nsaph_utils.utils.profile_utils import mem, qmem, qqmem
from .constants import RasterizationStrategy, Geography
NO_DATA = 32767.0 # The default value filled in masked arrays in NetCDF files
# for the masked cells
# This value is overridden if it is defined by a property "missing_value"
# in the metadata of the NetCDF file
[docs]@dataclass
class Record:
value: Optional[float]
prop: str
[docs]@dataclass
class MultiRecord:
values: List[Optional[float]]
prop: str
[docs]class StatsCounter:
statistics = "mean"
max_mem_used = 0
[docs] @classmethod
def get_key_for_geography(cls, shpfile: str, geography: Geography) -> Tuple:
shape = shapefile.Reader(shpfile)
fields = [f[0] for f in shape.fields]
if geography == Geography.zip:
key = cls._determine_zip_key(fields)
elif geography == Geography.zcta:
key = cls._determine_zcta_key(fields)
elif geography == Geography.county:
key = cls._determine_county_key(fields)
else:
raise ValueError("Unsupported geography: " + str(geography))
return key
[docs] @classmethod
def prepare_stats(cls, strategy: RasterizationStrategy, shpfile: str,
affine: rasterio.Affine, layer: Iterable, no_data,
is_generator: bool) -> List:
"""
Given a layer, i.e. a slice of a dataframe, and a shapefile
returns an iterable of records, containing aggregated values
of the observations over the shapes in the shapefile.
:param no_data: Value taht is treated as missing value
:param strategy: Rasterization strategy to be used
:param shpfile: A path to shapefile to be used
:param affine: An optional affine transformation to be applied to
the coordinates
:param layer: A slice of dataframe, containing coordinates and values
:return: A list of statistics compute objects
"""
if is_generator:
stats_function: Callable = gen_zonal_stats
else:
stats_function: Callable = zonal_stats
non_all_touched_strategies = [
RasterizationStrategy.default,
RasterizationStrategy.combined,
RasterizationStrategy.downscale,
]
all_touched_strategies = [
RasterizationStrategy.all_touched,
RasterizationStrategy.auto,
RasterizationStrategy.combined,
]
stats = []
if strategy in non_all_touched_strategies:
stats.append(
stats_function(
shpfile,
layer,
stats=cls.statistics,
affine=affine,
geojson_out=True,
all_touched=False,
nodata=no_data,
)
)
if strategy in all_touched_strategies:
stats.append(
stats_function(
shpfile,
layer,
stats=cls.statistics,
affine=affine,
geojson_out=True,
all_touched=True,
nodata=no_data,
)
)
return stats
[docs] @classmethod
def process(
cls,
strategy: RasterizationStrategy,
shpfile: str,
affine: rasterio.Affine,
layer: Iterable,
geography: Geography,
no_data=None
) -> Iterable[Record]:
"""
Given a layer, i.e. a slice of a dataframe, and a shapefile
returns an iterable of records, containing aggregated values
of the observations over the shapes in the shapefile.
:param no_data: Values that is treated as missing value
:param strategy: Rasterization strategy to be used
:param shpfile: A path to shapefile to be used
:param affine: An optional affine transformation to be applied to
the coordinates
:param layer: A slice of dataframe, containing coordinates and values
:param geography: WHat type of geography is to be used: zip codes
or counties
:return: An iterable of records, containing an identifier
of a certain shape with the aggregated value of the observation
for this shape
"""
if no_data is None:
no_data = NO_DATA
key = cls.get_key_for_geography(shpfile, geography)
stats = cls.prepare_stats(
strategy, shpfile, affine, layer, no_data, True
)
n = 0
step = 100
if len(stats) == 2:
# Combined strategy
iterator = zip(stats[0], stats[1])
zipped = True
else:
iterator = stats[0]
zipped = False
label = os.path.basename(shpfile)
cls.max_mem_used = 0
pid = os.getpid()
with tqdm(
file=sys.stdout,
desc=f"Aggregating over {label} using '{strategy.value}' strategy"
) as pbar:
for s in iterator:
if zipped:
s1, s2 = s
record = cls._combine(key, s1, s2)
else:
if isinstance(cls.statistics, list):
values = [
s['properties'][stat] for stat in cls.statistics
]
mean = ':'.join([
"{}={}".format(cls.statistics[i], str(values[i]))
for i in range(len(cls.statistics))
])
else:
mean = s['properties'][cls.statistics]
props = [s['properties'][subkey] for subkey in key]
prop = "".join(props)
record = Record(value=mean, prop=prop)
m = qqmem(pid)
if m > cls.max_mem_used:
cls.max_mem_used = m
if (n % step) == 0:
m = mem()
if m > cls.max_mem_used:
cls.max_mem_used = m
pbar.update(step)
if (n % step) == 0:
pbar.update(step)
n += 1
yield record
[docs] @classmethod
def process_layers(
cls,
strategy: RasterizationStrategy,
shpfile: str,
affine: rasterio.Affine,
layers: List[Iterable],
geography: Geography
) -> Iterable[MultiRecord]:
"""
Given a layer, i.e. a slice of a dataframe, and a shapefile
returns an iterable of records, containing aggregated values
of the observations over the shapes in the shapefile.
:param strategy: Rasterization strategy to be used
:param shpfile: A path to shapefile to be used
:param affine: An optional affine transformation to be applied to
the coordinates
:param layers: A list of slices of dataframe, containing coordinates
and values
:param geography: WHat type of geography is to be used: zip codes
or counties
:return: An iterable of records, containing an identifier
of a certain shape with the aggregated value of the observation
for this shape
"""
if strategy == RasterizationStrategy.combined:
raise IncompatibleArgumentsError(
"Combining all_tocuhed with default and multiple "
"variables is not implemented"
)
key = cls.get_key_for_geography(shpfile, geography)
stats = (
cls.prepare_stats(strategy, shpfile, affine, layer, NO_DATA,
True)[0]
for layer in layers
)
n = 0
step = 10
iterator = zip(*stats)
label = os.path.basename(shpfile)
cls.max_mem_used = 0
pid = os.getpid()
with tqdm(
file=sys.stdout,
desc=f"Aggregating over {label} using '{strategy.value}' strategy"
) as pbar:
for ss in iterator:
means = [s['properties'][cls.statistics] for s in ss]
props_array = [
[
s['properties'][subkey] for subkey in key
] for s in ss
]
props = {"".join(p) for p in props_array}
if len(props) != 1:
raise AggregationError("Conflicting geo ids: " + str(props))
prop = next(iter(props))
record = MultiRecord(values=means, prop=prop)
m = qqmem(pid)
if m > cls.max_mem_used:
cls.max_mem_used = m
if (n % step) == 0:
m = mem()
if m > cls.max_mem_used:
cls.max_mem_used = m
pbar.update(step)
n += 1
yield record
[docs] @classmethod
def process_layers_return_dict(
cls,
strategy: RasterizationStrategy,
shpfile: str,
affine: rasterio.Affine,
layers: List[Iterable],
geography: Geography
) -> Dict[str, MultiRecord]:
"""
Given a layer, i.e. a slice of a dataframe, and a shapefile
returns an iterable of records, containing aggregated values
of the observations over the shapes in the shapefile.
:param strategy: Rasterization strategy to be used
:param shpfile: A path to shapefile to be used
:param affine: An optional affine transformation to be applied to
the coordinates
:param layers: A list of slices of dataframe, containing coordinates
and values
:param geography: WHat type of geography is to be used: zip codes
or counties
:return: A List of records, containing an identifier
of a certain shape with the aggregated value of the observation
for this shape
"""
key = cls.get_key_for_geography(shpfile, geography)
records = dict()
for l1 in range(len(layers)):
layer = layers[l1]
stats = cls.prepare_stats(strategy, shpfile, affine, layer, NO_DATA,
False)
for i in range(len(stats[0])):
if len(stats) == 2:
# Combined strategy
mean, prop = cls._combine(key, stats[0][i], stats[1][i])
else:
mean = stats[0][i]['properties'][cls.statistics]
props = [stats[0][i]['properties'][subkey] for subkey in
key]
prop = "".join(props)
if prop in records:
record = records[prop]
else:
record = MultiRecord([None for _ in range(l1)], prop)
records[prop] = record
record.values.append(mean)
print('*', end=None)
print()
return records
@classmethod
def _determine_zip_key(cls, row) -> Tuple:
candidates = ["ZIP"]
return cls._determine_key(row, candidates),
@classmethod
def _determine_zcta_key(cls, row) -> Tuple:
candidates = ["ZCTA5", "ZCTA5CE00", "ZCTA5CE10", "ZCTA5CE20"]
return cls._determine_key(row, candidates),
@classmethod
def _determine_county_key(cls, row) -> Tuple:
candidates = ["COUNTY", "COUNTYFP"]
c = cls._determine_key(row, candidates)
s = cls._determine_key(row, ["STATE", "STATEFP"])
return s, c
@staticmethod
def _determine_key(row, candidates) -> str:
props = None
for candidate in candidates:
if isinstance(row, dict):
if candidate in row['properties']:
return candidate
props = str([key for key in row['properties']])
elif isinstance(row, list):
if candidate in row:
return candidate
props = row
else:
raise ValueError("Unknown type of row: " + str(row))
raise ValueError(
f"None of the expected properties found ('{candidates}'). "
+ f"Available: '{props}'"
)
@classmethod
def _combine(cls, key, r1, r2) -> Record:
prop1 = "".join([r1['properties'][subkey] for subkey in key])
prop2 = "".join([r2['properties'][subkey] for subkey in key])
assert prop1 == prop2
m1 = r1['properties'][cls.statistics]
m2 = r2['properties'][cls.statistics]
if m1 and m2:
mean = (m1 + m2) / 2
elif m2:
mean = m2
elif m1:
raise AssertionError("m1 && !m2")
else:
mean = None
return Record(value=mean, prop=prop1)
[docs]class AggregationError(RuntimeError):
pass
[docs]class IncompatibleArgumentsError(NotImplementedError):
pass