"""
API used by data modelling and data loading utilities
"""
# Copyright (c) 2021. Harvard University
#
# Developed by Research Software Engineering,
# Faculty of Arts and Sciences, Research Computing (FAS RC)
# Author: Michael A Bouzinier
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import os
import re
from typing import Any, List
from nsaph_utils.utils.fwf import FWFReader
from nsaph_utils.utils.pyfst import FSTReader
from nsaph_utils.utils.io_utils import fopen
import csv
from sas7bdat import SAS7BDAT
def split(node) -> (str, dict):
    """
    Given a node in a list produced from a JSON/YAML structure,
    returns the name and the definition associated with the node.

    A node is either a plain string (a name without any definition) or
    a mapping whose first key is the name and whose value is the
    definition. The definition may be ``None`` (treated as an empty
    definition), a string (shorthand for ``{"type": <string>}``) or
    a dictionary, which is returned as is.

    :param node: a list node, usually from a YAML or JSON file
    :return: a tuple consisting of the name and a dictionary containing
        the definitions
    :raises Exception: if the node is neither a string nor a non-empty
        dictionary, or if the definition has an unsupported type
    """
    if isinstance(node, str):
        return node, {}
    # An empty mapping carries no name, hence it is rejected together with
    # all other unsupported node types instead of failing on a lookup
    if not isinstance(node, dict) or not node:
        raise Exception("Unsupported type for column spec: " + str(node))
    # Only the first key of the mapping is significant
    name = next(iter(node))
    spec = node[name]
    if spec is None:
        return name, {}
    if isinstance(spec, str):
        return name, {"type": spec}
    if not isinstance(spec, dict):
        # The name is not guaranteed to be a string (e.g. numeric YAML key)
        raise Exception("Unsupported spec type for node: " + str(name))
    return name, spec
def basename(table):
    """
    Strips the qualifier from a fully qualified table name.

    For a name like ``schema.table`` the result is ``table``; a name
    without dots is returned unchanged.

    :param table: a fully qualified table name
    :return: the part of the name that follows the last dot
    """
    _, _, unqualified = table.rpartition('.')
    return unqualified
def entry_to_path(entry: Any) -> str:
    """
    Returns a path for an archive entry or a file system entry.

    An archive entry is a two-element tuple whose first element is either
    a path string or an object exposing the path as its ``name``
    attribute; the second element is ignored here. Anything that is not
    a tuple is converted to a string.

    :param entry: an archive entry or a path to a file on the file system
    :return: path within the archive or on the file system
    """
    if not isinstance(entry, tuple):
        return str(entry)
    target, _ = entry
    if isinstance(target, str):
        return target
    return target.name
class CSVLikeJsonReader:
    """
    Class providing a CSV Reader interface for JSON files.
    Helps abstracting reading different file types.

    Every line of the underlying stream is parsed as a separate JSON
    document. Depending on ``returns_mapping`` a row is either the parsed
    mapping itself or a list of values ordered as ``columns``.

    The class is an iterator and a context manager: entering the context
    opens the stream, leaving it closes the stream.
    """

    def __init__(self, path:str, columns: List, returns_mapping = False):
        """
        :param path: path to the file, handed over to ``fopen`` by
            :meth:`open`
        :param columns: names of the keys to extract from every record,
            in the order in which they should appear in a row
        :param returns_mapping: if True, rows are returned as parsed
            mappings and ``columns`` is not consulted
        """
        self.path = path
        self.stream = None
        self.current = None
        self.columns = columns
        self.returns_mapping = returns_mapping

    def __next__(self):
        """
        Reads the next line from the stream and converts it to a row.

        :return: the parsed mapping if ``returns_mapping`` is set,
            otherwise a list with one value per column; keys missing
            from the record yield ``None``
        """
        line = next(self.stream)
        data: dict = json.loads(line)
        if self.returns_mapping:
            return data
        return [data.get(column, None) for column in self.columns]

    def open(self):
        """
        Opens the file denoted by ``path`` in text mode for reading
        """
        self.stream = fopen(self.path, "rt")

    def close(self):
        """
        Closes the underlying stream. Does nothing if the reader has
        never been opened.
        """
        # Guard against close() being called before open()
        if self.stream is not None:
            self.stream.close()

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __iter__(self):
        return self
class DataReader:
    """
    Generalized reader for column-structured files, such as CSV, FST,
    sas7bdat, fixed width (FWF) and line-delimited JSON.

    This class is a context manager and can be used in a ``with`` block.
    Example::

        with DataReader(file_path) as reader:
            for row in reader.rows():
                print(row)
    """

    # Accepted textual names of the quoting modes, without the optional
    # "QUOTE_" prefix
    _QUOTING_BY_NAME = {
        "ALL": csv.QUOTE_ALL,
        "MINIMAL": csv.QUOTE_MINIMAL,
        "NONNUMERIC": csv.QUOTE_NONNUMERIC,
        "NONE": csv.QUOTE_NONE,
    }

    def __init__(self, path: str,
                 buffer_size = None,
                 quoting=None,
                 has_header = None,
                 columns = None,
                 delimiter = None):
        """
        :param path: what to read: a path to a file, an ``FWFReader``
            instance, or a two-element tuple ``(path, opener)``; in the
            tuple the first element is a path string, an ``FWFReader``
            or an object with a ``name`` attribute, and the second is
            a callable used to open CSV content
        :param buffer_size: buffer size passed to the FST reader,
            100000 is used if not given
        :param quoting: CSV quoting mode, either one of the ``csv.QUOTE_*``
            constants or its case-insensitive name with or without the
            ``QUOTE_`` prefix (e.g. ``"minimal"``);
            defaults to ``csv.QUOTE_NONNUMERIC``
        :param has_header: whether the first row of a CSV file contains
            column names, defaults to True
        :param columns: names of the columns; replaced by the names found
            in the file for FST and sas7bdat files and for CSV files
            with a header
        :param delimiter: CSV delimiter; the default of the ``csv``
            module is used if not given
        :raises ValueError: if ``quoting`` is not a recognized mode
        """
        self.path = path
        self.reader = None
        self._reader = None
        self.columns = columns
        self.to_close = None
        self.size = None
        self.count = None
        self.delimiter = delimiter
        self.buffer_size = buffer_size
        self.quoting = self._resolve_quoting(quoting)
        self.has_header = True if has_header is None else has_header

    @classmethod
    def _resolve_quoting(cls, quoting):
        """
        Translates user supplied quoting mode into a ``csv.QUOTE_*`` value
        """
        if quoting is None:
            return csv.QUOTE_NONNUMERIC
        if isinstance(quoting, int):
            return quoting
        if isinstance(quoting, str):
            name = quoting.upper()
            if name.startswith("QUOTE_"):
                name = name[len("QUOTE_"):]
            if name in cls._QUOTING_BY_NAME:
                return cls._QUOTING_BY_NAME[name]
        # Fail early: otherwise the problem would only surface later,
        # as a missing attribute while opening a CSV file
        raise ValueError("Unsupported quoting mode: " + repr(quoting))

    def open_fst(self, name = None):
        """
        Opens an FST file and takes the column names from it

        :param name: path to the file, defaults to the path of this reader
        """
        if name is None:
            name = self.path
        bs = self.buffer_size if self.buffer_size else 100000
        self.reader = FSTReader(name, bs)
        self.reader.open()
        self.to_close = self.reader
        self.columns = list(self.reader.columns.keys())

    def open_csv(self, path, f= lambda s: fopen(s, "rt")):
        """
        Opens a CSV file. If the file is expected to have a header,
        the first row is consumed and used as the list of column names.

        :param path: path to the file
        :param f: callable that takes the path and returns an open text
            stream; by default the file is opened with ``fopen``
        """
        self.to_close = f(path)
        if self.delimiter is not None:
            self.reader = csv.reader(self.to_close, quoting=self.quoting,
                                     delimiter = self.delimiter)
        else:
            self.reader = csv.reader(self.to_close, quoting=self.quoting)
        if self.has_header:
            header = next(self.reader)
            self.columns = header
        return

    def open_sas7bdat(self, name = None):
        """
        Opens a sas7bdat file; takes column names, row count and size
        from the file header. The header row is skipped during iteration.

        :param name: path to the file, defaults to the path of this reader
        """
        if name is None:
            name = self.path
        self._reader = SAS7BDAT(name, skip_header=True)
        self.to_close = self._reader
        sas_columns = self._reader.columns
        # Column names may be given either as str or as bytes
        self.columns = [
            column.name if isinstance(column.name, str)
            else column.name.decode("utf-8")
            for column in sas_columns
        ]
        header = self._reader.header.properties
        self.count = header.row_count
        self.size = header.page_count * header.page_length
        self.reader = iter(self._reader)
        return

    def open_fwf(self, reader):
        """
        Uses the given fixed width file reader as the source of rows

        :param reader: a reader object, it is opened by this method
        """
        self.reader = reader
        self.reader.open()
        self.to_close = self.reader

    def open_json(self, path):
        """
        Opens a file with one JSON record per line; the column list
        of this reader is passed to the underlying JSON reader

        :param path: path to the file
        """
        self.reader = CSVLikeJsonReader(path, self.columns)
        self.reader.open()
        self.to_close = self.reader
        return

    def get_path(self) -> str:
        """
        :return: path of the data source as a string
        """
        return entry_to_path(self.path)

    def __enter__(self):
        """
        Opens the data source with the reader selected by the type
        of the path and by the file name.

        :return: this object
        :raises Exception: if the format is not supported or if an FST
            or sas7bdat entry does not exist as a regular file
        """
        opened = False
        name = self.path
        if isinstance(self.path, tuple):
            path, f = self.path
            if isinstance(path, FWFReader):
                self.open_fwf(path)
                return self
            name = path if isinstance(path, str) else path.name
            nlower = name.lower()
            if nlower.endswith(".fst") or nlower.endswith(".sas7bdat"):
                # These formats are read by name further down,
                # hence the file must exist on the file system
                if not os.path.isfile(name):
                    raise Exception(
                        "Not implemented: reading FST or SAS files from archive"
                    )
            elif isinstance(path, str) and ".json" in path.lower():
                self.open_json(path)
                opened = True
            elif isinstance(path, str):
                self.open_csv(path, f)
                opened = True
        if not opened:
            if isinstance(name, FWFReader):
                self.open_fwf(name)
            elif name.lower().endswith(".fst"):
                self.open_fst(name)
            elif ".csv" in name.lower():
                self.open_csv(name)
            elif name.lower().endswith(".sas7bdat"):
                self.open_sas7bdat(name)
            elif ".json" in name.lower():
                self.open_json(name)
            else:
                # name is a string here, while self.path can be a tuple
                raise Exception("Unsupported file format: " + name)
        return self

    def rows(self):
        """
        :return: the underlying reader, an iterable over the rows;
            None if the data source has not been opened
        """
        return self.reader

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Closes whatever has been opened by this reader, if anything
        """
        if self.to_close is not None:
            self.to_close.close()
def regex(pattern: str):
    """
    Compiles a wildcard pattern into a regular expression.

    Every dot in the pattern is replaced by an underscore, every
    asterisk stands for an arbitrary (possibly empty) sequence of
    characters, everything else is matched literally.

    :param pattern: a pattern, where ``*`` is used as a wildcard
    :return: compiled regular expression
    """
    segments = pattern.replace('.', '_').split('*')
    expression = ".*".join(map(re.escape, segments))
    return re.compile(expression)