# Source code for census.tigerweb

"""
tigerweb.py
=================================================
Code for interacting with the Census TIGERweb API: querying areas and downloading
shape files.
"""

#  Copyright (c) 2022. Harvard University
#
#  Developed by Harvard T.H. Chan School of Public Health
#  (HSPH) and Research Software Engineering,
#  Faculty of Arts and Sciences, Research Computing (FAS RC)
#  Author: Ben Sabath (https://github.com/mbsabath)
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

import logging
import os

import pandas as pd
import requests

from .data import load_state_codes
from .exceptions import CensusException
from .query import _prep_vars

# Number appended to the TIGERweb MapServer URL for each supported geometry
# (see _tigerweb_endpoint).
GEOMETRY_CODES = {
    "zcta": 2,
    "tract": 8,
    "block group": 10,
    "state": 82,
    "county": 84,
}

# Directory name used for each geometry when building TIGER/Line download URLs
# (see _tiger_line_url).
TIGER_NAMES = {
    "zcta": "ZCTA5",
    "tract": "TRACT",
    "county": "COUNTY",
    "state": "STATE",
    "block group": "BG",
}

# Module-level logger shared by every function in this file.
LOG = logging.getLogger(__name__)


class _BBox:
    """
    Internal helper describing an axis-aligned bounding box by its four edges.

    Calling the constructor with no arguments yields the box that
    ``_tigerweb_params`` uses as the full query extent.
    """

    def __init__(self, xmin=-1.96724487545E7, ymin=-1678452.6019, xmax=1.62682738027E7, ymax=1.15436424852E7):
        self.xmin, self.ymin = xmin, ymin
        self.xmax, self.ymax = xmax, ymax

    def __str__(self):
        # Rendered as "xmin,ymin,xmax,ymax"
        edges = (self.xmin, self.ymin, self.xmax, self.ymax)
        return ",".join(str(edge) for edge in edges)

    def subdivide(self, factor=2):
        """
        Split this box into a grid of equally sized child boxes by cutting each axis into
        `factor` pieces, so a factor of :math:`n` produces :math:`n^2` children.

        :param factor: Number of pieces to cut each axis into
        :return: List of child boxes, ordered by x position first and y position second
        """
        x_step = (self.xmax - self.xmin)/factor
        y_step = (self.ymax - self.ymin)/factor

        return [
            _BBox(xmin=self.xmin + (col * x_step),
                  ymin=self.ymin + (row * y_step),
                  xmax=self.xmin + ((col+1) * x_step),
                  ymax=self.ymin + ((row+1) * y_step))
            for col in range(factor)
            for row in range(factor)
        ]


def _tigerweb_endpoint(geometry):
    """
    Build the TIGERweb REST URL used to query features of one census geometry.

    :param geometry: type of census geometry to use; must be a key of ``GEOMETRY_CODES``
    :return: string of rest API URL endpoint
    """
    layer_code = GEOMETRY_CODES[geometry]
    service_root = "https://tigerweb.geo.census.gov/arcgis/rest/services/TIGERweb/tigerWMS_ACS2019/MapServer/"

    return f"{service_root}{layer_code}/query"


def _tigerweb_params(attributes=None, split_factor: int = None):
    """
    Create a list of dictionaries of the necessary parameters to query the census tigerweb API. Returns
    a list to enable combining of queries that return sets larger than the maximum number of objects

    :param attributes: List of names of attributes to include. Defaults to ``["GEOID"]``. You should
      always include "GEOID" to enable linking of features with other census data
    :param split_factor: Factor to divide the bounding box by to enable splitting up of large queries.
      A falsy value (``None`` or ``0``) means a single query over the whole bounding box
    :return: list of dictionary of needed parameters, one per bounding box
    """
    # A None sentinel replaces the former mutable list default, which was shared across calls.
    if attributes is None:
        attributes = ["GEOID"]

    bbox = _BBox()
    boxes = bbox.subdivide(split_factor) if split_factor else [bbox]

    out = []
    for box in boxes:
        # Every box gets the same request, restricted to its own envelope.
        out.append({
            "geometry": str(box),
            "geometryType": "esriGeometryEnvelope",
            "spatialRel": "esriSpatialRelIntersects",
            "outFields": _prep_vars(attributes),
            "returnTrueCurves": "false",
            "returnIdsOnly": "false",
            "returnCountOnly": "false",
            "returnZ": "false",
            "returnM": "false",
            "returnExtentsOnly": "false",
            "returnGeometry": "false",
            "f": "pjson",
        })

    return out


def get_area(geometry, sq_mi=True):
    """
    Create a data frame of Census GEOIDs and Area.

    One request is made per parameter set returned by ``_tigerweb_params``; block groups are
    queried with the bounding box split by a factor of 10, every other geometry in a single
    query. Non-empty results are combined with an outer merge on GEOID and AREALAND.

    :param geometry: type of census geometry to use
    :param sq_mi: Should areas be converted to square miles?
    :return: pandas data frame with lower-cased column names (``geoid``, ``arealand``)
    :raises CensusException: if the API reports an error, or if a single query returns the
      maximum number of features (meaning the result was probably truncated)
    """
    max_features = 100000  # per-query feature limit of the Tigerweb API

    url = _tigerweb_endpoint(geometry)
    split_factor = 10 if geometry == "block group" else None
    param_list = _tigerweb_params(["GEOID", "AREALAND"], split_factor)

    out = None
    for query_num, params in enumerate(param_list, start=1):
        LOG.debug("Area query %s of %s", query_num, len(param_list))

        # Parse the response body once and reuse it.
        payload = requests.get(url, params).json()
        if "error" in payload:
            if payload["error"]["code"] == 404:
                raise CensusException(f"Url { url } not found")
            # Any other API error would otherwise surface as an opaque KeyError on 'features'.
            raise CensusException("TIGERweb query failed: " + str(payload["error"]))

        result = pd.DataFrame([feature['attributes'] for feature in payload['features']])

        # Check the limit before merging and for every result, including the first one;
        # previously this check sat in an unreachable elif branch.
        if len(result.index) >= max_features:
            LOG.error("Max rows hit, increase split factor, ending")
            raise CensusException("Max rows hit, increase split factor, ending")

        # Boxes with no features yield a frame without columns, which cannot be merged on.
        if len(result.index) == 0:
            continue

        if out is None:
            out = result
        else:
            out = pd.merge(out, result, how="outer", on=["GEOID", "AREALAND"])

    if out is None:
        # No query returned any features: hand back an empty frame with the expected columns.
        out = pd.DataFrame({"GEOID": pd.Series(dtype="object"),
                            "AREALAND": pd.Series(dtype="float64")})

    if sq_mi:
        out['AREALAND'] = out['AREALAND']/2589988  # 2589988 square meters to a square mile

    out.columns = out.columns.str.lower()

    return out
def _tiger_line_url(geometry, year):
    """
    Return URL (or URLs) of zip file(s) containing shape files for a given census geography

    ZCTA, county and state each resolve to a single nationwide file; tract and block group
    resolve to one file per state code returned by ``load_state_codes``. Any year before 2010
    is served from the 2000 vintage files stored under the TIGER2010 directory.

    :param geometry: name of census geometry to download
    :param year: year of geometry to download
    :return: List of URLs
    :raises CensusException: if ``geometry`` is not a supported geography
    """
    # Validate up front: the TIGER_NAMES lookup below used to raise a bare KeyError
    # before the invalid-geography branch could ever run.
    if geometry not in TIGER_NAMES:
        message = "invalid geography: " + str(geometry) + " provided"
        LOG.error(message)
        raise CensusException(message)

    if geometry == "zcta" and year == 2011:
        year = 2010  # No ZCTAs listed in 2011 (for some reason)

    tiger_name = TIGER_NAMES[geometry]
    base = "https://www2.census.gov/geo/tiger/"
    if year >= 2010:
        base += "TIGER" + str(year) + "/" + tiger_name + "/"
        if year == 2010:
            base += "2010/"
    else:
        year = 2000
        base += "TIGER2010/" + tiger_name + "/2000/"

    # The 2000 vintage files are published with a 2010 file-name prefix.
    file_year = "2010" if year == 2000 else str(year)

    # Two-digit vintage tag appended to the file stem for the 2000 and 2010 files only.
    if year == 2000:
        vintage = "00"
    elif year == 2010:
        vintage = "10"
    else:
        vintage = ""

    if geometry == "zcta":
        # NOTE(review): every year from 2010 on uses the "zcta510" stem, as before;
        # confirm this is still the right file name for 2020 and later.
        stem = "zcta500" if year == 2000 else "zcta510"
        return [base + "tl_" + file_year + "_us_" + stem + ".zip"]

    if geometry in ("county", "state"):
        return [base + "tl_" + file_year + "_us_" + geometry + vintage + ".zip"]

    # Remaining geometries (tract, block group) are published as one file per state.
    stem = "bg" if geometry == "block group" else "tract"
    prefix = base + "tl_" + file_year + "_"
    return [prefix + state + "_" + stem + vintage + ".zip"
            for state in load_state_codes()['state']]


def _download_file(url, out_dir):
    """
    Stream the file at ``url`` into ``out_dir``, keeping the last path segment of the URL
    as the file name.

    :param url: URL of the file to download
    :param out_dir: existing directory to write the file into
    :return: path of the downloaded file
    :raises requests.HTTPError: if the server answers with an error status
    """
    local_filename = os.path.join(out_dir, url.split('/')[-1])
    # stream=True avoids loading the whole archive into memory; it is written in 1 MiB chunks.
    # The call previously used an undefined name `r` instead of the `requests` module.
    with requests.get(url, stream=True) as result:
        result.raise_for_status()
        with open(local_filename, 'wb') as f:
            for chunk in result.iter_content(chunk_size=1024**2):
                f.write(chunk)
    return local_filename
def download_geometry(geometry, year=2019, out_dir="."):
    """
    Download the zipped shape file(s) for a census geometry and save them to disk

    :param geometry: type of census geometry to use
    :param year: Year to get geometry for
    :param out_dir: Directory to save downloaded files in. Note that due to requiring
        multiple downloads, tract and block group downloads will create a directory
        ("tract" or "bg") if no out_dir is defined.
    :return: None, downloads files only
    """
    if out_dir == ".":
        if geometry == "tract":
            out_dir = "tract"
        elif geometry == "block group":
            out_dir = "bg"

    # Resolve the URLs first so an unsupported geometry fails before any directory is created.
    urls = _tiger_line_url(geometry, year)

    # exist_ok avoids the check-then-create race of a separate isdir() test.
    os.makedirs(out_dir, exist_ok=True)

    for url in urls:
        _download_file(url, out_dir)