Source code for nsaph_gis.annotator

import csv
import os
from typing import List, Optional

import geopandas
import pandas
from numpy import nan
from shapely.geometry import Point

# Label columns that GISAnnotator takes from the zip (ZCTA) shapes.
ZIP_COLUMNS = {
    'ZCTA',
}

# Label columns that GISAnnotator takes from the county shapes.
COUNTY_COLUMNS = {
    'STATEFP',
    'COUNTYFP',
}

# Columns that GISAnnotator._add_calculated_columns derives from
# STATEFP / COUNTYFP instead of reading them from a shape file.
CALCULATED_COLUMNS = {
    'COUNTY',
    'STUSPS',
    'STATEISO',
    'STATE',
    'FIPS5',
}


class GISAnnotator:
    """
    Geographic Annotator adds columns to a provided data frame
    containing latitude and longitude (or other CRS) with labels
    coming from provided shape files, such as zip codes or
    county names (or FIPS codes)
    """

    def __init__(self, shape_files: List[str], columns: List[str],
                 crs='EPSG:4326'):
        """
        Create the Annotator

        :param shape_files: list of paths to shape files
        :param columns: List of columns to be added by the annotator
        :param crs: Coordinate reference system (CRS) used by the input data
        :raises ValueError: if a requested column is not one of
            ZIP_COLUMNS, COUNTY_COLUMNS or CALCULATED_COLUMNS
        """
        known_columns = ZIP_COLUMNS | COUNTY_COLUMNS | CALCULATED_COLUMNS
        if set(columns) - known_columns:
            raise ValueError('Unknown requested columns')
        self.shape_files = shape_files
        self.crs = crs
        self.columns = columns
        # Shape data frames, loaded lazily on the first call to join()
        self.zip_shapes = None
        self.county_shapes = None
        # Cache of rows of data/states.csv keyed by their STATEFP value
        self._states = {}

    def join(self, df: pandas.DataFrame, x='longitude',
             y='latitude') -> pandas.DataFrame:
        """
        Adds columns with the labels to the data

        :param df: Incoming data frame
        :param x: A column, containing longitude
        :param y: A column, containing latitude
        :return: data frame with added annotations; an empty incoming
            frame is returned as is
        :raises ValueError: if requested columns need a shape file that
            was not provided
        """
        if df.empty:
            return df
        self._load_shape_files()
        self._check_columns()

        # Calculated columns are built from STATEFP / COUNTYFP. When the
        # caller did not request those two explicitly, pull them in
        # temporarily and remove them again once the calculation is done.
        # Columns already present in the incoming frame are never treated
        # as temporary, so caller data is not dropped.
        requested = set(self.columns)
        helper_columns = []
        if requested & CALCULATED_COLUMNS:
            helper_columns = sorted(
                c for c in COUNTY_COLUMNS - requested
                if c not in df.columns
            )

        geometry = [Point(xy) for xy in zip(df[x], df[y])]
        if self.zip_shapes is not None:
            df = self._add_shape_columns(
                df, geometry, self.zip_shapes, ZIP_COLUMNS
            )
        if self.county_shapes is not None:
            df = self._add_shape_columns(
                df, geometry, self.county_shapes, COUNTY_COLUMNS,
                helper_columns
            )
        self._add_calculated_columns(df)
        if helper_columns:
            df = df.drop(columns=helper_columns, errors='ignore')
        return df

    def _check_columns(self):
        """
        Verify that a shape file was loaded for every group of
        requested columns.

        :raises ValueError: if a ZIP column is requested without zip
            shapes, or a county (or calculated) column is requested
            without county shapes
        """
        requested = set(self.columns)
        if requested & ZIP_COLUMNS and self.zip_shapes is None:
            raise ValueError(
                'ZIP column is requested, but no zip shape file found'
            )
        # Calculated columns are derived from the county shape columns,
        # hence they need the county shape file as well.
        county_based = COUNTY_COLUMNS | CALCULATED_COLUMNS
        if requested & county_based and self.county_shapes is None:
            raise ValueError(
                'County columns are requested, '
                'but no county shape file found'
            )

    @staticmethod
    def matches(columns: List[str], pattern: str) -> Optional[str]:
        """
        Find the first column whose name starts with the given prefix,
        ignoring case.

        :param columns: column names to search
        :param pattern: prefix to look for
        :return: the first matching column name or None if none matches
        """
        prefix = pattern.lower()
        for c in columns:
            if c.lower().startswith(prefix):
                return c
        return None

    def _load_shape_files(self):
        """
        Read the shape files, converting them to the CRS of the
        annotator, and classify them as zip and/or county shapes by
        the columns they contain. Does nothing if shapes have already
        been loaded.
        """
        if self.zip_shapes is not None or self.county_shapes is not None:
            return

        # Exact column names that identify a zip shape file,
        # in order of precedence
        known_zip_columns = ('ZIP', 'ZCTA', 'ZCTA5CE10', 'ZCTA5CE20')
        for filename in self.shape_files:
            data = geopandas.GeoDataFrame.from_file(filename)
            data = data.to_crs(self.crs)
            zip_column = next(
                (c for c in known_zip_columns if c in data.columns), None
            )
            if zip_column is None:
                # Fall back to any column that starts with "ZCTA"
                zip_column = self.matches(data.columns, "ZCTA")
            if zip_column is not None:
                if zip_column != 'ZCTA':
                    data.rename(columns={zip_column: 'ZCTA'}, inplace=True)
                self.zip_shapes = data
            if 'STATEFP' in data.columns and 'COUNTYFP' in data.columns:
                self.county_shapes = data

    def _add_shape_columns(
            self,
            df: pandas.DataFrame,
            geometry: List,
            shape: 'geopandas.GeoDataFrame',
            shape_columns: set,
            extra_columns=()
    ) -> 'geopandas.GeoDataFrame':
        """
        Spatially join the incoming data with the polygons of a shape
        and keep only the requested label columns.

        :param df: incoming data
        :param geometry: list of points, one per row of df
        :param shape: shapes (polygons) with label columns
        :param shape_columns: names of label columns this kind of shape
            can provide
        :param extra_columns: additional columns of the shape to keep
            even if they were not requested
        :return: data frame with the columns of df followed by
            the added label columns
        """
        # join incoming data with polygons
        # NOTE(review): if a point matches more than one polygon, the left
        # join presumably returns several rows for it and the result no
        # longer lines up with `geometry` -- confirm shapes do not overlap.
        points = geopandas.GeoDataFrame(df, geometry=geometry, crs=self.crs)
        pts = geopandas.sjoin(points, shape, how='left')

        # drop all columns except of requested; dict.fromkeys removes
        # duplicates while keeping the order given by the caller
        added = [c for c in dict.fromkeys(self.columns) if c in shape_columns]
        added += [c for c in extra_columns if c not in added]
        target_columns = list(df.columns) + added
        return geopandas.GeoDataFrame(
            pts[target_columns], geometry=geometry, crs=self.crs
        )

    def _add_calculated_columns(self, df: pandas.DataFrame) -> None:
        """
        Add, in place, the requested columns that are derived from
        STATEFP and COUNTYFP. Rows with a missing STATEFP get None.
        Does nothing if df has no STATEFP column.

        :param df: data frame to be modified
        """
        if 'STATEFP' not in df.columns:
            return

        requested = set(self.columns)
        state_codes = list(df['STATEFP'])

        # COUNTY and FIPS5 are both the concatenation of state and
        # county FIPS codes. pandas.isna covers nan, None and pandas.NA.
        if requested & {'COUNTY', 'FIPS5'}:
            fips5 = [
                None if pandas.isna(state_fp) else state_fp + county_fp
                for state_fp, county_fp in zip(state_codes, df['COUNTYFP'])
            ]
            if 'COUNTY' in requested:
                df['COUNTY'] = fips5
            if 'FIPS5' in requested:
                df['FIPS5'] = list(fips5)

        if 'STATE' in requested:
            df['STATE'] = [
                None if pandas.isna(state_fp) else state_fp
                for state_fp in state_codes
            ]

        if 'STUSPS' in requested:
            df['STUSPS'] = [
                None if pandas.isna(state_fp)
                else self._get_state_by_fips(state_fp)['STUSPS']
                for state_fp in state_codes
            ]

        if 'STATEISO' in requested:
            df['STATEISO'] = [
                None if pandas.isna(state_fp)
                else 'US-' + self._get_state_by_fips(state_fp)['STUSPS']
                for state_fp in state_codes
            ]

    def _get_state_by_fips(self, fips: str) -> dict:
        """
        Return the row of data/states.csv for the given state FIPS code,
        reading the file on first use.

        :param fips: value of STATEFP
        :return: the row as a dictionary keyed by CSV column names
        :raises KeyError: if the code is not listed in the file
        """
        if not self._states:
            self._read_states()
        return self._states[fips]

    def _read_states(self):
        """
        Fill the states cache from data/states.csv located next to
        this module.
        """
        states_filename = os.path.join(
            os.path.dirname(__file__), 'data', 'states.csv'
        )
        # newline='' is what the csv module asks for when reading files
        with open(states_filename, newline='') as csvfile:
            reader = csv.DictReader(csvfile, delimiter=',')
            for state in reader:
                self._states[state['STATEFP']] = state