"""
Module used for converting CMS DAT files to CSV
"""
# Copyright (c) 2021-2022. Harvard University
#
# Developed by Research Software Engineering,
# Faculty of Arts and Sciences, Research Computing (FAS RC)
# Author: Michael A Bouzinier
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import csv
import datetime
import glob
import gzip
import os
import shutil
import traceback
from collections import OrderedDict
from typing import List

from dateutil import parser as date_parser


def log(s):
    """Appends a message to run.log in the current working directory"""
    with open("run.log", "at") as w:
        w.write(str(s) + '\n')


def width(s: str):
    """
    Parses a width specification from the FTS file, e.g. "12.2",
    into a (length, precision) tuple; precision is None if absent
    """
    if '.' in s:
        x = s.split('.')
        return int(x[0]), int(x[1])
    return int(s), None


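# A minimal illustration of width() (the values are hypothetical, not taken
# from a real FTS file): a specification with a decimal part yields both the
# length and the precision, while a plain integer yields only the length.
#
#   width("12.2")   # -> (12, 2)
#   width("8")      # -> (8, None)

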
class MedparParseException(Exception):
    """
    Exception raised if data in a DAT file cannot be parsed
    """

    def __init__(self, msg: str, pos: int):
        super(MedparParseException, self).__init__(msg, pos)
        self.pos = pos


class ColumnAttribute:
    """
    Attribute of a column as described in the FTS file
    """

    def __init__(self, start: int, end: int, conv):
        self.start = start
        self.end = end
        self.conv = conv

    def arg(self, line: str):
        try:
            return self.conv(line[self.start:self.end].strip())
        except Exception:
            # A value that cannot be converted is treated as missing
            return None


class ColumnDef:
    """
    Column definition as described in the FTS file
    """

    def __init__(self, pattern):
        # The pattern is the dashed separator line of the FTS column table;
        # the width of each dash group defines the width of the corresponding
        # fixed-width field in the column definition lines that follow
        fields = pattern.split(' ')
        assert len(fields) == 7
        self.attributes = []
        c = 0
        for i in range(0, len(fields)):
            l = len(fields[i])
            if i in [0, 4]:
                f = int      # ordinal and start position
            elif i in [5]:
                f = width    # field width, possibly with precision
            else:
                f = str
            self.attributes.append(ColumnAttribute(c, c + l, f))
            c += l + 1

    def read(self, line):
        attrs = [a.arg(line) for a in self.attributes]
        return Column(*attrs)


class Column:
    """
    Class representing column metadata required for generating
    database schema
    """

    def __init__(self, ord: int, long_name: str, short_name: str,
                 type: str, start: int, width, desc: str):
        self.name = short_name
        self.long_name = long_name
        self.type = type
        self.ord = ord
        self.start = start - 1          # FTS positions are 1-based
        self.length = width[0]
        self.end = self.start + self.length
        self.desc = desc
        self.d = width[1]               # decimal precision, if any

    def __str__(self) -> str:
        return "{}: [{}]".format(super().__str__(), self.name)


class MedicareFile:
    """
    Class to manipulate a single CMS DAT file
    """

    def __init__(self, dir_path: str, name: str,
                 year: str = None, dest: str = None):
        self.dir = dir_path
        if dest:
            if not os.path.exists(dest):
                os.makedirs(dest, exist_ok=True)
            self.dest = dest
        else:
            self.dest = self.dir
        self.name = os.path.join(self.dir, name)
        self.fts = '.'.join([self.name, "fts"])
        self.csv = os.path.join(self.dest, '.'.join([name, "csv.gz"]))
        if not os.path.isfile(self.fts):
            raise Exception("Not found: " + self.fts)
        pattern = "{}*.dat".format(name)
        self.dat = glob.glob(os.path.join(self.dir, pattern))
        self.line_length = None
        self.metadata = dict()
        self.columns = OrderedDict()
        self.init()
        block_size = self.metadata[
            "Exact File Record Length (Bytes in Variable Block)"
        ]
        block_size = block_size.strip()
        if ',' in block_size:
            print("[{}] Stripping commas: {}".format(self.fts, block_size))
            block_size = block_size.replace(',', '')
        self.block_size = int(block_size)
        if not year:
            year = name[-4:]
        self.year = year

    def init(self):
        """
        Parses the FTS file: collects "key: value" metadata lines until
        the dashed separator line, then reads the column definitions
        that follow it
        """
        with open(self.fts) as fts:
            lines = [line for line in fts]
        for i in range(0, len(lines)):
            line = lines[i]
            if line.startswith('---') and '------------------' in line:
                break
            if ':' in line:
                x = line.split(':', 1)
                self.metadata[x[0]] = x[1]
        cdef = ColumnDef(line)
        while i < len(lines) - 1:
            i += 1
            line = lines[i]
            if 'End of Document' in line:
                break
            if not line.strip():
                break
            if line.startswith("Note:"):
                break
            column = cdef.read(line)
            self.columns[column.name] = column

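    # For orientation, the FTS file is expected to look roughly like the
    # hypothetical sketch below (keys and layout are illustrative only; real
    # CMS FTS files contain many more metadata lines and columns):
    #
    #   Columns in File: 2
    #   Exact File Record Length (Bytes in Variable Block): 1,234
    #   ...
    #   ---  ------------------ ...        <- dashed separator line
    #   1    BENE_ID            ...        <- column definitions
    #   2    MEDPAR_YR_NUM      ...
    #
    #   End of Document
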
    def read_record(self, data, ln) -> List:
        """
        Reads one record from DAT file

        :param data: a slice of raw data from DAT file
        :param ln: line number, used for reporting
        :return: Record data as list
        """
        exception_count = 0
        pieces = {}
        for name in self.columns:
            column = self.columns[name]
            pieces[name] = data[column.start:column.end]
        record = []
        for name in self.columns:
            column = self.columns[name]
            s = pieces[name].decode("utf-8")
            try:
                if column.type == "NUM" and not column.d:
                    # Integer columns: empty values become NULLs
                    val = s.strip()
                    if val:
                        record.append(int(val))
                    else:
                        record.append(None)
                elif column.type == "DATE":
                    if s.strip():
                        record.append(date_parser.parse(s))
                    else:
                        record.append(None)
                else:
                    # Decimal and character columns are kept as strings
                    record.append(s)
            except Exception as x:
                log("{:d}: {}[{:d}]: - {}".format(
                    ln, column.name, column.ord, str(x))
                )
                record.append(s)
                exception_count += 1
                if exception_count > 3:
                    log(data)
                    raise MedparParseException("Too many exceptions",
                                               column.start)
        return record

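    # For example (values are hypothetical): a NUM field containing b"  7 "
    # becomes the integer 7, an empty NUM or DATE field becomes None, a DATE
    # field such as b"01/15/2015" is parsed with dateutil, and CHAR fields
    # (as well as NUM fields with a decimal precision) are kept as the raw,
    # padded strings.
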
    def validate(self, record):
        """
        Asserts that the given record is consistent with
        the file from which it is read.

        :param record: Record
        :return: None
        """
        yc = None
        if "BENE_ENROLLMT_REF_YR" in self.columns:
            yc = "BENE_ENROLLMT_REF_YR"
        if "MEDPAR_YR_NUM" in self.columns:
            yc = "MEDPAR_YR_NUM"
        if yc is None:
            raise AssertionError("Year column was not found in FTS")
        # Column ordinals in the FTS are 1-based
        assert record[self.columns[yc].ord - 1] == self.year

    def count_lines_in_source(self):
        """
        Counts the number of lines in the original DAT file(s)

        :return: number of lines
        """
        lines = 0
        blocks = 0
        bts = 0
        t1 = datetime.datetime.now()
        t0 = t1
        for dat in self.dat:
            print("{}: {}".format(t0.isoformat(), dat))
            counter = 0
            with open(dat, "rb") as source:
                while source.readable():
                    chunk = source.read(1024 * 1024)
                    if len(chunk) < 1:
                        break
                    blocks += 1
                    bts += len(chunk)
                    n = chunk.count(b'\n')
                    counter += n
                    t2 = datetime.datetime.now()
                    elapsed = t2 - t1
                    if elapsed > datetime.timedelta(minutes=10):
                        # Report progress every 10 minutes
                        t1 = t2
                        print(
                            (
                                "{} running for {}. Blocks = {:,}, lines = {:,}"
                                + ", bytes = {:,}"
                            ).format(dat, str(t2 - t0), blocks, counter, bts)
                        )
            print("{}: {:d}".format(os.path.basename(dat), counter))
            lines += counter
        print("{}[Total]: {:d}".format(self.name, lines))
        return lines

    def count_lines_in_dest(self):
        """
        Counts number of lines in the resulting CSV file

        :return: number of lines
        """
        lines = 0
        if not os.path.isfile(self.csv):
            return 0
        with gzip.open(self.csv, "rt") as out:
            for _ in out:
                lines += 1
        print("{}: {:d}".format(self.csv, lines))
        return lines

    def status(self) -> str:
        """
        Checks if a given DAT file has been successfully converted to CSV

        :return: Value of the status of the conversion
        """
        try:
            if not os.path.isfile(self.csv):
                return "NONE"
            l2 = self.count_lines_in_dest()
            if l2 < 1:
                return "EMPTY"
            l1 = self.count_lines_in_source()
            if l1 != l2:
                return "MISMATCH: {:d}=>{:d}".format(l1, l2)
            return "READY"
        except Exception as x:
            print(self.fts)
            traceback.print_exception(type(x), x, None)
            return "ERROR: " + str(x)

    def status_message(self):
        """
        Checks if a given DAT file has been successfully converted to CSV

        :return: Message, containing the status of the conversion
        """
        return "{}: {}".format(self.fts, self.status())

    def export(self):
        """
        Performs the actual conversion of the DAT file(s) to a gzipped CSV
        """
        if self.dir != self.dest:
            shutil.copy(self.fts, self.dest)
        t1 = datetime.datetime.now()
        t0 = t1
        with gzip.open(self.csv, "wt") as out:
            writer = csv.writer(out, quoting=csv.QUOTE_MINIMAL, delimiter='\t')
            for dat in self.dat:
                print(dat)
                counter = 0
                good = 0
                bad_lines = 0
                remainder = b''
                with open(dat, "rb") as source:
                    while source.readable():
                        # Read enough bytes to complete the next record,
                        # keeping whatever was left over from the previous read
                        l = self.block_size - len(remainder) + 100
                        block = remainder + source.read(l)
                        if len(block) < self.block_size:
                            break
                        idx = self.block_size
                        try:
                            record = self.read_record(block[:idx], counter)
                            self.validate(record)
                            writer.writerow(record)
                            good += 1
                        except MedparParseException as x:
                            log("Line = " + str(counter) + ':' + str(x.pos))
                            bad_lines += 1
                            log(x)
                            # Re-synchronize: skip forward to the next line break
                            for idx in range(x.pos, self.block_size):
                                if block[idx] in [10, 13]:
                                    break
                        except AssertionError as x:
                            log("Line = " + str(counter))
                            bad_lines += 1
                            log(x)
                        # Skip the CR/LF characters terminating the record
                        while idx < len(block) and block[idx] in [10, 13]:
                            idx += 1
                        remainder = block[idx:]
                        block = None
                        counter += 1
                        if (counter % 100000) == 0:
                            # Report progress every 100,000 records
                            t2 = datetime.datetime.now()
                            t1 = t2
                            print("{}[{}]: {:,}/{:,}/{:,}".format(
                                dat, str(t2 - t0),
                                counter, good, bad_lines
                            ))
                print("{} processed. Bad lines: {:,}"
                      .format(self.fts, bad_lines))

    def info(self):
        """
        Prints info about the DAT file (metadata) to standard output
        """
        for s in [
            "Columns in File",
            "Exact File Record Length (Bytes in Variable Block)"
        ]:
            print("{}: {}".format(s, self.metadata[s]))
        for name in self.columns:
            c = self.columns[name]
            print("{:d} - {} - {} - {:d}".format(c.ord, c.name, c.type, c.start))


if __name__ == '__main__':
    m = MedicareFile(os.curdir, "medpar_all_file_res000017155_req007087_2015")
    m.info()
    m.export()

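# A sketch of an alternative invocation (the paths and file name below are
# hypothetical): convert a file from one directory into another and report
# the outcome.
#
#   mf = MedicareFile("/data/incoming", "medpar_all_file_2016",
#                     year="2016", dest="/data/csv")
#   mf.export()
#   print(mf.status_message())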