# Source code for cms.duplicates

"""
This module analyzes duplicate records in CMS Medicaid data.

Original R function to remove duplicates:
[de_duplicate](https://github.com/NSAPH/NSAPHplatform/blob/master/R/intake.R#L29-L45)

It selects a random record from each set of duplicate records.

Original R code that has been used to create Demographics files:
https://github.com/NSAPH/data_model/blob/master/scripts/medicaid_scripts/processed_data/1_create_demographics_data.R

calls de_duplicate function:

    de_duplicate(demographics, "BENE_ID", seed = 987987)


If I understand this code correctly, it:

* Removes all beneficiaries that have multiple records with inconsistent death dates (el_dod)
    (https://github.com/NSAPH/data_model/blob/master/scripts/medicaid_scripts/processed_data/1_create_demographics_data.R#L50)
    See also https://github.com/NSAPH/data_requests/tree/master/request_projects/dec2019_medicaid_platform_cvd#varying-death-dates
* For remaining beneficiaries with multiple records, randomly selects a single record
    (https://github.com/NSAPH/data_model/blob/master/scripts/medicaid_scripts/processed_data/1_create_demographics_data.R#L44)
    as stated here: https://github.com/NSAPH/data_requests/tree/master/request_projects/dec2019_medicaid_platform_cvd#variation-in-demographic-information-across-years

More details about duplicates:

* Within states: https://github.com/NSAPH/data_requests/tree/master/request_projects/dec2019_medicaid_platform_cvd#duplicates-within-states
* Across states: https://github.com/NSAPH/data_requests/tree/master/request_projects/dec2019_medicaid_platform_cvd#duplicates-across-states

Official documentation
https://www2.ccwdata.org/documents/10280/19002246/ccw-max-user-guide.pdf

Section Assignment of a Beneficiary Identifier

To construct the CCW BENE_ID, the CMS CCW team developed an internal cross-reference file
consisting of historical Medicaid and Medicare enrollment information using CMS data sources
such as the Enterprise Cross Reference (ECR) file. When a new MAX PS file is received, the
MSIS_ID, STATE_CD, SSN, DOB, Gender and other beneficiary identifying information is compared
against the historical enrollment file. If there is a single record in the historical enrollment file
that “best matches” the information in the MAX PS record, then the BENE_ID on that historical
record is assigned to the MAX PS record. If there is no match or no “best match” after CCW has
exhausted a stringent matching process, a null (or missing) BENE_ID is assigned to the MAX PS
record. For any given year, approximately 7% to 8% of MAX PS records have a BENE_ID that is
null. Once a BENE_ID is assigned to a MAX PS record for a particular year (with the exception of
those assigned to a null value), it will not change. When a new MAX PS file is received, CCW
attempts to reassign those with missing BENE_IDs.

Also, see: https://resdac.org/cms-data/variables/encrypted-723-ccw-beneficiary-id

"""
#  Copyright (c) 2021. Harvard University
#
#  Developed by Research Software Engineering,
#  Faculty of Arts and Sciences, Research Computing (FAS RC)
#  Author: Michael A Bouzinier
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

import gzip
import json
import os.path
from argparse import ArgumentParser
from datetime import date, timedelta
from typing import Dict

from nsaph_utils.utils.io_utils import fopen

from nsaph.db import Connection

# Query listing every (bene_id, FILE) pair that occurs more than once in
# cms.ps, together with the number of occurrences, most frequent first.
# Rows with a NULL bene_id are excluded.
FIND_DUPLICATES = '''
SELECT
    bene_id,
    FILE,
    COUNT(*)
FROM
    cms.ps
WHERE  bene_id IS NOT NULL   
GROUP BY FILE, bene_id
HAVING COUNT(*) > 1
ORDER BY COUNT(*) desc
'''


# Query template returning all columns of all cms.ps records for a single
# beneficiary. The ``{id}`` placeholder is filled in with ``str.format``
# before execution.
EXPLORE_BENE = '''
SELECT
    *
FROM 
    cms.ps
WHERE bene_id = '{id}'    
'''


class DuplicatesExplorer:
    """
    Finds beneficiaries with duplicate records in ``cms.ps`` and examines
    which columns differ between the duplicated records.

    The findings are kept in ``self.duplicates``, a dictionary keyed by
    ``bene_id``. A value of ``None`` means "not explored yet"; otherwise the
    value is a dictionary mapping the name of every column whose value
    differs between the records to the list of stringified values, one per
    record (an empty dictionary means the records are identical).

    The dictionary is persisted as a gzip-compressed JSON report so that a
    lengthy exploration can be resumed by a later run.

    :param arguments: parsed command line arguments; the attributes
        ``db``, ``connection``, ``report`` and ``reset`` are used
    """

    def __init__(self, arguments):
        self.arguments = arguments
        # bene_id -> None (unexplored) or {column: [value per record]}
        self.duplicates = None
        # When set, ignore any stored report and query the database anew
        self.reset = arguments.reset
        # Results of find_duplicate_dates for "el_dod" / "el_dob"
        self.duplicate_deaths = None
        self.duplicate_births = None
        # True when self.duplicates holds changes not yet written to disk
        self.change = False

    def _report_path(self) -> str:
        """Returns the path `save` writes to: the report path ending in .gz"""
        fname = self.arguments.report
        if not fname.endswith(".gz"):
            fname += ".gz"
        return fname

    def _stored_report(self):
        """
        Returns the path of an existing report file or None if there is none.

        The compressed file maintained by `save` is preferred; the path
        exactly as given on the command line is accepted as a fallback.
        """
        for candidate in (self._report_path(), self.arguments.report):
            if os.path.isfile(candidate):
                return candidate
        return None

    def init(self):
        """
        Populates ``self.duplicates`` unless it is already populated.

        A stored report is loaded when one exists and ``reset`` is not
        requested; otherwise the database is queried for the identifiers
        of duplicated beneficiaries and a fresh report is saved.
        """
        if self.duplicates is not None:
            return
        if not self.reset and self._stored_report() is not None:
            self.load()
            return
        with Connection(self.arguments.db,
                        self.arguments.connection) as connection:
            with connection.cursor() as cursor:
                print("Examining database {}".format(
                    self.arguments.connection
                ))
                cursor.execute(FIND_DUPLICATES)
                # The query yields one row per (bene_id, FILE) pair, hence
                # the same bene_id may occur several times; the dictionary
                # keeps it once.
                self.duplicates = {row[0]: None for row in cursor}
        self.change = True
        self.save()

    def explore_one(self, id, cursor):
        """
        Fetches all records of one beneficiary and stores the columns
        whose values differ between the records.

        :param id: beneficiary identifier (``bene_id``)
        :param cursor: open database cursor used to run the query
        """
        # NOTE(review): the identifier is interpolated into the SQL text.
        # Identifiers originate from the database or from the report file;
        # consider a parameterized query if they can ever be untrusted.
        sql = EXPLORE_BENE.format(id=id)
        cursor.execute(sql)
        all_columns = [desc[0] for desc in cursor.description]
        data = [row for row in cursor]
        # zip(*data) transposes rows into per-column tuples of values
        self.duplicates[id] = {
            name: [str(value) for value in values]
            for name, values in zip(all_columns, zip(*data))
            if len(set(values)) > 1
        }
        self.change = True

    def explore_all(self):
        """
        Explores every beneficiary that has not been explored yet,
        printing the running count after every 100 explored beneficiaries.
        """
        # Only None marks an unexplored entry: an empty dictionary is the
        # valid outcome for records that are exact copies of each other.
        pending = [
            bene_id for bene_id, columns in self.duplicates.items()
            if columns is None
        ]
        if not pending:
            return
        with Connection(self.arguments.db,
                        self.arguments.connection) as connection:
            with connection.cursor() as cursor:
                for n, bene_id in enumerate(pending, start=1):
                    self.explore_one(bene_id, cursor)
                    if (n % 100) == 0:
                        print(n)

    def is_loaded(self):
        """
        Returns True if the duplicates are known and every one of them
        has been explored.
        """
        if self.duplicates is None:
            return False
        return all(
            columns is not None for columns in self.duplicates.values()
        )

    def load(self):
        """Reads ``self.duplicates`` from the stored report file."""
        # Fall back to the path as given so that a missing file is reported
        # by fopen against the name the user actually supplied
        path = self._stored_report() or self.arguments.report
        with fopen(path, "rt") as f:
            self.duplicates = json.load(f)

    def save(self):
        """
        Writes the report and, when available, the side reports.

        The main report is written (gzip-compressed JSON) only if there
        are unsaved changes. Duplicate births and deaths, if computed, are
        written as plain JSON next to it into ``<stem>_births.json`` and
        ``<stem>_deaths.json``, where ``<stem>`` is the report path without
        ``.gz`` and without the preceding extension, if any.
        """
        fname = self._report_path()
        if self.change:
            with gzip.open(fname, "wt") as f:
                json.dump(self.duplicates, f, indent=2)
            self.change = False
        # splitext only looks at the last path component, therefore dots
        # in directory names and single-extension names are handled
        name = os.path.splitext(fname[:-len(".gz")])[0]
        if self.duplicate_births is not None:
            with open(name + "_births.json", "wt") as f:
                json.dump(self.duplicate_births, f, indent=2)
        if self.duplicate_deaths is not None:
            with open(name + "_deaths.json", "wt") as f:
                json.dump(self.duplicate_deaths, f, indent=2)

    def report(self):
        """
        Builds the complete report: finds duplicates, explores those not
        explored yet, extracts inconsistent death and birth dates and
        saves everything.
        """
        self.init()
        print("Found {:d} duplicates.".format(len(self.duplicates)))
        if self.reset or not self.is_loaded():
            self.explore_all()
        self.duplicate_deaths = self.find_duplicate_dates("el_dod")
        self.duplicate_births = self.find_duplicate_dates("el_dob")
        self.save()

    def find_duplicate_dates(self, date_type) -> Dict:
        """
        Selects beneficiaries whose records disagree on a date column.

        :param date_type: name of the date column, e.g. ``el_dod`` or
            ``el_dob``. For ``el_dod`` missing values (``"None"``) are
            ignored, for any other column they count as a distinct value.
        :return: dictionary keyed by ``bene_id``; every value holds
            ``range`` - the sorted distinct dates, and ``MSIS`` - a mapping
            of ``msis_id`` to the date in the corresponding record. When
            the records share one ``msis_id`` (so the column is absent from
            the explored data) the mapping is keyed by the record position.
        """
        keep_none = date_type != "el_dod"
        report = dict()
        for bene_id, columns in self.duplicates.items():
            # Skip unexplored entries (None) and identical records ({})
            if not columns or date_type not in columns:
                continue
            dates = columns[date_type]
            date_range = sorted(
                {d for d in dates if keep_none or d != "None"}
            )
            if len(date_range) < 2:
                continue
            msis_ids = columns.get("msis_id")
            if msis_ids is None:
                msis_ids = [str(i) for i in range(len(dates))]
            report[bene_id] = {
                "range": date_range,
                "MSIS": dict(zip(msis_ids, dates)),
            }
        return report

    def analyze_inconsistent_age(self):
        """
        Prints the largest discrepancy between the dates of birth recorded
        for one beneficiary and the number of beneficiaries for whom the
        discrepancy exceeds one year (365 days).
        """
        self.init()
        if not self.is_loaded():
            # Dates are only known for explored beneficiaries
            self.explore_all()
            self.save()
        report = self.find_duplicate_dates("el_dob")
        max_delta = timedelta()
        max_bene = None
        num_age = 0
        for bene_id, entry in report.items():
            dates = sorted(
                date.fromisoformat(d) for d in entry["range"] if d != "None"
            )
            if len(dates) < 2:
                continue
            delta = dates[-1] - dates[0]
            if delta.days > 365:
                num_age += 1
            if delta > max_delta:
                max_delta = delta
                max_bene = bene_id
        print("Max delta is {} for bene_id {}".format(
            str(max_delta), max_bene
        ))
        print(
            "Beneficiaries with difference in age more than 1 year: {:d}"
            .format(num_age)
        )
def run():
    """
    Command line entry point: parses the arguments and performs the
    requested action - the inconsistent age analysis for ``age``, the
    duplicates report for anything else.
    """
    options = args()
    explorer = DuplicatesExplorer(options)
    action = (
        explorer.analyze_inconsistent_age
        if options.action == "age"
        else explorer.report
    )
    action()
def args():
    """
    Defines and parses the command line arguments of the utility.

    :return: the namespace produced by ``ArgumentParser.parse_args`` with
        the attributes ``db``, ``connection``, ``report``, ``reset`` and
        ``action``
    """
    # The text must be passed as ``description``: the first positional
    # parameter of ArgumentParser is ``prog``, the program name shown in
    # the usage line.
    parser = ArgumentParser(
        description="Utility to explore duplicates in CMS medicaid data"
    )
    parser.add_argument("--db",
                        help="Path to a database connection parameters file",
                        default="database.ini",
                        required=False)
    parser.add_argument("--connection",
                        help="Section in the database connection parameters file",
                        default="nsaph2",
                        required=False)
    parser.add_argument("--report",
                        help="Path to a duplicates report file",
                        default="cms_ps_duplicates.json.gz",
                        required=False)
    parser.add_argument("--reset", action='store_true',
                        help="Force recreating duplicate report if it already exists")
    parser.add_argument("--action", default="report",
                        help="Action to perform: 'age' analyzes inconsistent"
                             " dates of birth, any other value builds the"
                             " duplicates report")
    return parser.parse_args()
# Run the utility only when the module is executed as a script
if __name__ == '__main__':
    run()