Source code for cms.tools.mcr_combine_tables

#  Copyright (c) 2022. Harvard University
#
#  Developed by Research Software Engineering,
#  Faculty of Arts and Sciences, Research Computing (FAS RC)
#  Author: Michael A Bouzinier
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

"""
This utility combines Medicare Patient Summary or Inpatient Admissions data,
which is originally stored as one table per year, into a single view.

It takes care of the different types and formats of the most common columns,
such as: year, state, DO[B/D], zip codes, state and county codes.

The actual schema is defined in ../models/medicare.yaml

This utility uses an extended syntax compared with general data modelling.
"""

import logging
import os
from pathlib import Path
from typing import List, Tuple, Optional, Dict

import yaml

from nsaph import init_logging
from nsaph.data_model.domain import Domain
from nsaph.data_model.utils import split

from nsaph.db import Connection
from nsaph.loader import DBActivityMonitor

from nsaph.loader.common import DBTableConfig


class MedicareCombinedView:
    """
    Builds and executes the SQL that combines a family of source tables
    (one table per year) into a single database object.

    The definition of the combined object is read from
    ``models/medicare.yaml``: the entry ``medicare.tables.<table>`` describes
    the target columns and, under its ``create`` key, the source table
    patterns (``from``), optional exclusions (``exclude``) and the kind of
    object to create (``type``, ``VIEW`` if omitted).
    """

    ps = "ps"
    ip = "ip"
    supported_tables = [ps, ip, "mbsf_d"]

    def __init__(self, context: "Optional[DBTableConfig]" = None):
        """
        :param context: configuration object; when omitted it is
            instantiated by ``DBTableConfig`` using the module docstring
            as the description
        :raises ValueError: if no table is specified or the table is not
            one of ``supported_tables``
        """

        if not context:
            context = DBTableConfig(None, __doc__).instantiate()
        self.context = context
        if not self.context.table:
            raise ValueError("'--table' is a required option")
        init_logging(name="combine-tables-into-" + self.context.table)
        self.view = self.context.table.lower()
        if self.view not in self.supported_tables:
            raise ValueError(
                "The only supported options are: "
                + ", ".join(self.supported_tables)
            )
        # The model file lives in the "models" directory next to
        # the package directory containing this module
        rp = Path(__file__).parents[1] / "models" / "medicare.yaml"
        with open(rp, encoding="utf-8") as f:
            content = yaml.safe_load(f)
        self.table = content["medicare"]["tables"][self.view]
        self.schema = content["medicare"]["schema"]
        self.sql = ""
        self.monitor = DBActivityMonitor(context)
        # Set by execute_sql() if the statement fails
        self.exception = None

    def print_sql(self):
        """
        Logs the SQL script, generating it first if it has not
        been generated yet.
        """

        if not self.sql:
            self.generate_sql()
        logging.info(self.sql)

    def execute(self):
        """
        Executes the SQL script (generating it first if necessary)
        and commits. Does nothing but printing a message in dry run mode.

        :raises Exception: re-raises any exception caught by
            execute_sql() while the statement was running
        """

        if self.context.dryrun:
            print("Dry run: nothing is done")
            return
        if not self.sql:
            self.generate_sql()
        # Forget a failure recorded by a previous call, otherwise
        # it would be re-raised even if this run succeeds
        self.exception = None
        with Connection(self.context.db, self.context.connection) as cnxn:
            with cnxn.cursor() as cursor:
                pid = Connection.get_pid(cnxn)
                self.monitor.execute(
                    lambda: self.execute_sql(cursor, self.sql),
                    lambda: self.monitor.log_activity(pid)
                )
                if self.exception is not None:
                    raise self.exception
            cnxn.commit()
        print("All Done")

    def execute_sql(self, cursor, sql: str):
        """
        Executes a statement; a failure is logged and stored in
        ``self.exception`` instead of being propagated, so that the
        caller (see execute()) can re-raise it.

        :param cursor: database cursor
        :param sql: statement to execute
        """

        try:
            cursor.execute(sql)
        except Exception as x:
            logging.exception("Failed to execute SQL statement")
            self.exception = x

    def generate_sql(self):
        """
        Generates the SQL script: drops the target object if it exists,
        creates the target schema if needed and creates the target object
        as a UNION of SELECT statements, one per source table.
        The result is stored in ``self.sql``.
        """

        with Connection(self.context.db, self.context.connection) as cnxn:
            with cnxn.cursor() as cursor:
                tables = self.get_tables(cursor)
                tt = [self.table_sql(cursor, t) for t in tables]
        if not tt:
            logging.warning(
                "No source tables found for %s.%s", self.schema, self.view
            )
        view = self.table["create"].get("type", "VIEW")
        sql = "DROP {} IF EXISTS {}.{} CASCADE;\n".format(
            view, self.schema, self.view
        )
        sql += "CREATE SCHEMA IF NOT EXISTS {};\n".format(self.schema)
        sql += "CREATE {} {}.{} AS \n".format(view, self.schema, self.view)
        sql += "\nUNION\n".join(tt)
        self.sql = sql

    def get_tables(self, cursor):
        """
        Finds the source tables matching the patterns listed under
        ``create.from``. A pattern is either ``schema.table`` or just
        ``table``; ``*`` is a wildcard. Tables whose name is listed
        under ``create.exclude`` are skipped.

        :param cursor: database cursor
        :return: list of tuples (schema name, table name)
        """

        create = self.table["create"]
        tables = create["from"]
        if isinstance(tables, str):
            tables = [tables]
        excluded = create.get("exclude") or []
        if isinstance(excluded, str):
            # A single name, not a collection of characters
            excluded = [excluded]
        exclusions = set(excluded)

        cc = []
        for t in tables:
            # SQL LIKE uses '%' where the model file uses '*'
            t = t.replace('*', '%')
            if '.' in t:
                tt = t.split('.')
                cc.append(
                    "table_schema LIKE '{}' AND table_name LIKE '{}'"
                    .format(tt[0], tt[1])
                )
            else:
                cc.append("table_name LIKE '{}'".format(t))
        if not cc:
            # "WHERE ()" would be invalid SQL
            return []
        sql = """
        SELECT table_schema, table_name
        FROM information_schema.tables
        WHERE ({})
        ORDER BY 1, 2
        """.format(" OR ".join(cc))
        logging.debug(sql)
        cursor.execute(sql)
        return [(t[0], t[1]) for t in cursor if t[1] not in exclusions]

    def table_sql(self, cursor, qtable: Tuple) -> str:
        """
        Generates the SELECT statement for a single source table.

        :param cursor: database cursor
        :param qtable: tuple (schema name, table name)
        :return: SELECT statement mapping source expressions to
            the target column names
        """

        schema, table = qtable
        columns = self.get_columns(cursor, table, schema)
        cc = ["{} AS {}".format(src, target) for target, src in columns]
        return "SELECT \n{} \nFROM {}.{}".format(
            ",\n\t".join(cc), schema, table
        )

    def get_columns(self, cursor, table: str,
                    schema: str = "cms") -> List[Tuple]:
        """
        For every column of the target object finds the expression that
        produces it from a given source table.

        :param cursor: database cursor
        :param table: name of the source table
        :param schema: schema of the source table
        :return: list of tuples (target column name, source expression)
        :raises ValueError: if a column that is not optional can not be
            found in the source table, or if a macro is used in a source
            candidate but is not defined for the column
        """

        columns: List[Tuple] = []
        column_defs = Domain.get_columns(self.table)
        for clmn in column_defs:
            n, c = split(clmn)
            opt = bool("optional" in c and c["optional"])
            src = c["source"] if "source" in c else [n]
            if isinstance(src, str):
                if src.strip().startswith('('):
                    # A parenthesized source is a ready-made
                    # expression, it is used verbatim
                    columns.append((n, src))
                    continue
                src = [src]
            if "type" in c and "cast" in c:
                ctype = (c["type"], c["cast"])
            else:
                ctype = None
            macros = dict()
            for candidate in src:
                if '$' in candidate:
                    # Macro name is the single character following '$'
                    var = candidate[candidate.find('$') + 1]
                    if var not in c:
                        raise ValueError(
                            "Macro {} is used but is not defined for column {}"
                            " and candidate {}".format(
                                var, n, candidate
                            )
                        )
                    macros[var] = c[var]
            source_column = self.get_column(
                cursor, table, src, ctype, macros, schema
            )
            if source_column is None:
                if not opt:
                    raise ValueError("{}.{}".format(table, n))
                if "type" in c:
                    source_column = "NULL::" + c["type"]
                else:
                    source_column = "NULL"
            columns.append((n, source_column))
        return columns

    @classmethod
    def get_column(cls, cursor, table: str, candidates: List[str],
                   ctype: Optional[Tuple], macros: Dict,
                   schema: str = "cms") -> Optional[str]:
        """
        Selects the source expression for one target column. Candidates
        containing a macro (``$``) are tried first; plain column names
        are looked up only if none of the macro candidates is found.

        :param cursor: database cursor
        :param table: name of the source table
        :param candidates: candidate source column names
        :param ctype: tuple (target type, dictionary of casts) or None
        :param macros: dictionary: macro name -> list of substitutions
        :param schema: schema of the source table
        :return: source expression or None if nothing is found
        :raises ValueError: if more than one candidate matches
        """

        simple_candidates = [c for c in candidates if '$' not in c]
        ext_candidates = [c for c in candidates if '$' in c]
        cols = []
        for candidate in ext_candidates:
            expr = cls.find_column2arr(
                cursor, table, candidate, macros, schema
            )
            if expr:
                cols.append(expr)
        if not cols:
            cols = cls.get_simple_column(
                cursor, table, simple_candidates, ctype, schema
            )
        if len(cols) > 1:
            raise ValueError(
                "Multiple options for table {}: {}".format(
                    table, ', '.join(cols)
                )
            )
        if not cols:
            return None
        return cols[0]

    @classmethod
    def get_simple_column(cls, cursor, table: str, candidates: List[str],
                          ctype: Optional[Tuple], schema: str = "cms"):
        """
        Finds which of the candidate columns exist in the source table
        and, when the actual data type differs from the target type,
        wraps the column into the cast expression defined for that type.

        :param cursor: database cursor
        :param table: name of the source table
        :param candidates: candidate source column names
        :param ctype: tuple (target type, dictionary of casts) or None.
            The dictionary maps a source data type (or ``*`` for any
            type) to a format string with ``{column_name}`` placeholder
        :param schema: schema of the source table
        :return: list of source expressions, empty if nothing is found
        :raises ValueError: if a cast is required but is not defined
        """

        if not candidates:
            # Nothing to look for; "WHERE ()" would be invalid SQL
            return []
        cc = ["column_name = '{}'".format(c) for c in candidates]
        sql = """
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE ({}) AND table_name = '{}' AND table_schema = '{}'
        """.format(" OR ".join(cc), table, schema)
        logging.debug(sql)
        cursor.execute(sql)
        if ctype is not None:
            target_type, casts = ctype
        else:
            target_type, casts = None, None
        cols = []
        for c in cursor:
            if ctype is None or c[1].upper() == target_type.upper():
                cols.append(c[0])
                continue
            if c[1] in casts:
                cast = casts[c[1]]
            elif '*' in casts:
                cast = casts['*']
            else:
                raise ValueError(
                    "Column {}.{}: cast from {} to {} is not defined"
                    .format(
                        table, c[0], c[1], target_type
                    )
                )
            cols.append(cast.format(column_name=c[0]))
        return cols

    @classmethod
    def find_column2arr(cls, cursor, table: str, candidate: str,
                        macros: Dict, schema: str = "cms"):
        """
        Expands the macros in a candidate name into a list of column
        names and, if all of them exist in the source table, combines
        them into an SQL ARRAY expression.

        :param cursor: database cursor
        :param table: name of the source table
        :param candidate: column name pattern containing macros
            in the form ``$<name>``
        :param macros: dictionary: macro name -> list of substitutions
        :param schema: schema of the source table
        :return: ``ARRAY[...]`` expression, or None if none or only
            some of the expanded columns exist
        """

        cc = [candidate]
        for var, subst in macros.items():
            # Every macro multiplies the list of names
            # by the number of its substitutions
            cc = [
                c.replace("${}".format(var), v)
                for c in cc
                for v in subst
            ]
        lst = ["'{}'".format(c) for c in cc]
        condition = "column_name IN ({})".format(', '.join(lst))
        sql = """
        SELECT column_name
        FROM information_schema.columns
        WHERE ({}) AND table_name = '{}' AND table_schema = '{}'
        """.format(condition, table, schema)
        logging.debug(sql)
        cursor.execute(sql)
        source_columns = [c[0] for c in cursor]
        if not source_columns:
            return None
        if len(source_columns) != len(cc):
            logging.warning(
                "Not all members [{:d}] were found for column {} in table {}"
                .format(len(source_columns), candidate, table)
            )
            return None
        return "ARRAY[{}]".format(','.join(cc))
if __name__ == '__main__':
    # Command line entry point: configure the tool from the command line
    # options, log the generated SQL and then run it against the database
    combined_view = MedicareCombinedView()
    combined_view.print_sql()
    combined_view.execute()