"""
Converter for CMS DAT files described by FTS to CSV
"""
# Copyright (c) 2022-2022. Harvard University
#
# Developed by Research Software Engineering,
# Faculty of Arts and Sciences, Research Computing (FAS RC)
# Author: Michael A Bouzinier
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
import concurrent
import glob
import os
import sys
import traceback
from argparse import ArgumentParser
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional
from cms.tools.mcr_file import MedicareFile
class MedParFileSet:
    """
    One data set to convert: an FTS descriptor together with the DAT
    file(s) that belong to it.

    The data set year is inferred from the directory that contains the
    FTS file: a path component is accepted as the year when it consists
    of digits only, lies strictly between 1999 and 2030, and also occurs
    in the base name of the FTS file.
    """

    def __init__(self, fts: str, dat: List[str], destination: str):
        """
        :param fts: path to the FTS file
        :param dat: paths to the DAT files associated with the FTS file
        :param destination: root output directory; the reader is given
            the sub-directory ``<destination>/<year>`` as its ``dest``
        :raises ValueError: if no year can be inferred from the path
        """
        self.fts = fts
        self.dat = dat
        self.dir = os.path.dirname(fts)
        self.name, _ = os.path.splitext(os.path.basename(fts))
        self.year: Optional[int] = self._find_year(self.dir, self.name)
        if self.year is None:
            raise ValueError("Could not find year for " + fts)
        self.reader: MedicareFile = MedicareFile(
            dir_path=self.dir,
            name=self.name,
            year=str(self.year),
            dest=os.path.join(destination, str(self.year))
        )

    @staticmethod
    def _find_year(directory: str, name: str) -> Optional[int]:
        """
        Walk ``directory`` upwards looking for a component that is a year.

        :param directory: directory containing the FTS file
        :param name: base name of the FTS file, without extension
        :return: the year as an integer, or None if no component qualifies
        """
        year = None
        current = directory
        # Paths of three characters or fewer (e.g. "/", "C:\\") are not
        # inspected, which also ends the walk at a file system root.
        while len(current) > 3:
            parent, component = os.path.split(current)
            if parent == current:
                # os.path.split made no progress (a head consisting only
                # of separators, or a UNC share root); without this guard
                # the loop would never terminate.
                break
            current = parent
            if component.isdigit():
                candidate = int(component)
                if 1999 < candidate < 2030 and component in name:
                    # No early exit: when several components qualify,
                    # the one closest to the root wins.
                    year = candidate
        return year

    def __str__(self) -> str:
        """Return ``<year>: <name>.(fts|<number of DAT files>-dat)``."""
        return "{:d}: {}.(fts|{:d}-dat)".format(
            self.year, self.name, len(self.dat)
        )
class MedparConverter:
    """
    Finds data sets described by FTS files under a source path and
    reports their status or converts them, one task per data set,
    using a thread pool.
    """

    @classmethod
    def dataset(cls, fts, destination) -> Optional["MedParFileSet"]:
        """
        Build the data set described by a single FTS file.

        :param fts: path to the FTS file
        :param destination: root output directory passed on to the
            data set
        :return: the data set, or None if a ``*.csv.gz`` file sharing
            the FTS base path already exists (the file is then skipped)
        :raises ValueError: if no ``*.dat`` file shares the FTS base path
        """
        base, _ = os.path.splitext(fts)
        # The base path is literal text, not a pattern: escape it so that
        # characters such as "[" in a directory name are matched verbatim.
        literal_base = glob.escape(base)
        if glob.glob(literal_base + "*.csv.gz"):
            print("Skipping " + fts)
            return None
        dat = sorted(glob.glob(literal_base + "*.dat"))
        if not dat:
            raise ValueError(
                "Mismatch: {} does not have corresponding dat file(s)".
                format(fts)
            )
        return MedParFileSet(fts, dat, destination)

    @classmethod
    def find(cls, basepath: str, destination: str) -> List["MedParFileSet"]:
        """
        Recursively collect the data sets for every ``*.fts`` file
        under a directory, in sorted path order.

        :param basepath: directory to search
        :param destination: root output directory passed on to each
            data set
        :return: data sets that are not skipped by :meth:`dataset`
        :raises ValueError: propagated from :meth:`dataset`
        """
        pattern = os.path.join(glob.escape(basepath), "**", "*.fts")
        datasets: List[MedParFileSet] = []
        for fts in sorted(glob.glob(pattern, recursive=True)):
            ds = cls.dataset(fts, destination)
            if ds is not None:
                datasets.append(ds)
        return datasets

    def __init__(self, source_path: str,
                 destination: Optional[str] = None,
                 verbose: bool = True):
        """
        :param source_path: a directory to search for FTS files or the
            path to a single FTS file; if it is neither, the list of
            data sets stays empty
        :param destination: root output directory; defaults to
            ``source_path`` when that is a directory and is mandatory
            when it is a single file
        :param verbose: whether to display data set information before
            converting
        :raises ValueError: if ``source_path`` is a file and
            ``destination`` is not given
        """
        self.datasets: List[MedParFileSet] = []
        self.verbose = verbose
        if os.path.isdir(source_path):
            if destination is None:
                destination = source_path
            self.datasets = self.find(source_path, destination)
        elif os.path.isfile(source_path):
            if destination is None:
                raise ValueError(
                    "When source path is a single file, "
                    "destination must be defined"
                )
            ds = self.dataset(source_path, destination)
            # dataset() returns None for a file that is skipped; keeping
            # that None in the list would break status() and convert().
            if ds is not None:
                self.datasets = [ds]

    def list(self):
        """Print every data set, one per line."""
        for dataset in self.datasets:
            print(dataset)

    @staticmethod
    def convert_dataset(dataset: "MedParFileSet", verbose):
        """
        Convert one data set and describe the outcome.

        :param dataset: the data set to convert
        :param verbose: if true, call the reader's ``info()`` before
            exporting
        :return: ``"<fts>: SKIPPED[<status>]"`` when the reader status is
            READY, ERROR or contains MISMATCH; ``"<fts>: SUCCESS"`` after
            an export; ``"<fts>: FAILED"`` if any exception was raised
            (its traceback is printed)
        """
        try:
            status = dataset.reader.status()
            if status in ["READY", "ERROR"] or "MISMATCH" in status:
                return "{}: SKIPPED[{}]".format(dataset.fts, status)
            if verbose:
                dataset.reader.info()
            dataset.reader.export()
            return "{}: SUCCESS".format(dataset.fts)
        except Exception:
            # Boundary of a worker task: report the failure and let the
            # remaining data sets proceed.
            traceback.print_exc()
            return "{}: FAILED".format(dataset.fts)

    def convert(self):
        """
        Convert all data sets in a thread pool, printing each outcome
        as soon as its task completes.
        """
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(self.convert_dataset,
                                dataset=dataset,
                                verbose=self.verbose)
                for dataset in self.datasets
            ]
            for future in concurrent.futures.as_completed(futures):
                print(future.result())

    def status(self):
        """
        Obtain the reader's status message for all data sets in a thread
        pool, printing each message as soon as its task completes.
        """
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(dataset.reader.status_message)
                for dataset in self.datasets
            ]
            for future in concurrent.futures.as_completed(futures):
                print(future.result())
def args():
    """
    Parse the command line of the converter.

    :return: the parsed namespace with attributes ``input``, ``status``,
        ``convert``, ``verbose`` and ``destination``
    """
    # The text must be passed as ``description``: the first positional
    # parameter of ArgumentParser is ``prog``, which would replace the
    # program name in the usage line.
    parser = ArgumentParser(
        description="Converter for CMS dat files described by FTS to csv"
    )
    parser.add_argument(help="Path to a source directory or an FTS file",
                        dest="input")
    parser.add_argument("--status", "-s", action='store_true',
                        help="Display status and exit")
    parser.add_argument("--convert", "-c", action='store_true',
                        help="Do conversion")
    parser.add_argument("--verbose", "-v", action='store_true',
                        help="Display additional information")
    parser.add_argument("--destination", "-d",
                        help="Destination for converted files")
    arguments = parser.parse_args()
    return arguments
if __name__ == '__main__':
    # Command line entry point: build the converter from the parsed
    # arguments, then perform each requested action in turn. When neither
    # --status nor --convert is given, nothing beyond the optional
    # listing is done.
    my_args = args()
    converter = MedparConverter(source_path=my_args.input,
                                destination=my_args.destination,
                                verbose=my_args.verbose)
    if my_args.verbose:
        converter.list()
    if my_args.status:
        converter.status()
    if my_args.convert:
        converter.convert()