Source code for nsaph.data_model.utils

"""
API used by data modelling and data loading utilities
"""

#  Copyright (c) 2021. Harvard University
#
#  Developed by Research Software Engineering,
#  Faculty of Arts and Sciences, Research Computing (FAS RC)
#  Author: Michael A Bouzinier
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

import json
import os
import re
from typing import Any, List

from nsaph_utils.utils.fwf import FWFReader
from nsaph_utils.utils.pyfst import FSTReader
from nsaph_utils.utils.io_utils import fopen
import csv

from sas7bdat import SAS7BDAT


def split(node) -> (str, dict):
    """
    Given a node in an array produced from a JSON/YAML structure,
    returns the name and the definition associated with the node

    :param node: an array node, usually from a YAML or JSON file
    :return: a tuple consisting of a name and a dictionary
        containing the definitions
    """

    if isinstance(node, str):
        return node, {}
    if not isinstance(node, dict):
        raise Exception("Unsupported type for column spec: " + str(node))
    name = None
    for entry in node:
        name = entry
        break
    node = node[name]
    if node is None:
        return name, {}
    if isinstance(node, str):
        node = {"type": node}
    if not isinstance(node, dict):
        raise Exception("Unsupported spec type for node: " + name)
    return name, node
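

# A hedged usage sketch (the column name and type below are made up):
#
#     >>> split({"zip": "VARCHAR"})
#     ('zip', {'type': 'VARCHAR'})
#     >>> split("zip")
#     ('zip', {})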


def basename(table):
    """
    Given a fully qualified table name (schema.table),
    returns just the base name of the table

    :param table: a fully qualified table name
    :return: the base name of the table, i.e. the part of the name
        after the last dot
    """

    return table.split('.')[-1]
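

# A hedged usage sketch (the table name is illustrative):
#
#     >>> basename("public.us_counties")
#     'us_counties'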


def entry_to_path(entry: Any) -> str:
    """
    Returns a valid path for an archive entry

    :param entry: an archive entry or a path to a file on the file system
    :return: path within the archive or file system
    """

    if isinstance(entry, tuple):
        path, _ = entry
        return path if isinstance(path, str) else path.name
    return str(entry)
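

# A hedged usage sketch (paths are illustrative). For an archive entry,
# the first element of the tuple is either a path string or an object
# with a ``name`` attribute, such as a tarfile member:
#
#     >>> entry_to_path(("data/file.csv", None))
#     'data/file.csv'
#     >>> entry_to_path("data/file.csv")
#     'data/file.csv'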


class CSVLikeJsonReader:
    """
    Class providing a CSV reader interface for JSON files.
    Helps abstract away reading different file types.
    """

    def __init__(self, path: str, columns: List, returns_mapping=False):
        self.path = path
        self.stream = None
        self.current = None
        self.columns = columns
        self.returns_mapping = returns_mapping

    def __next__(self):
        line = next(self.stream)
        data: dict = json.loads(line)
        if self.returns_mapping:
            row = data
        else:
            row = [data.get(column, None) for column in self.columns]
        return row

    def open(self):
        self.stream = fopen(self.path, "rt")
        return

    def close(self):
        self.stream.close()
        return

    def __enter__(self):
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __iter__(self):
        return self
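

# A hedged usage sketch (assumes a newline-delimited JSON file; the path
# and column names are made up). Missing keys are returned as None:
#
#     with CSVLikeJsonReader("data.json", ["id", "name"]) as reader:
#         for row in reader:
#             print(row)  # e.g. [1, "Alice"]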


class DataReader:
    """
    Generalized reader for column-structured files,
    such as CSV, FST and SAS7BDAT.

    This class is a context manager and can be used in a ``with`` block.

    Example::

        with DataReader(file_path) as reader:
            for row in reader.rows():
                print(row)
    """

    def __init__(self, path: str, buffer_size=None, quoting=None,
                 has_header=None, columns=None, delimiter=None):
        self.path = path
        self.reader = None
        self._reader = None
        self.columns = columns
        self.to_close = None
        self.size = None
        self.count = None
        self.delimiter = delimiter
        self.buffer_size = buffer_size
        if quoting is not None:
            if isinstance(quoting, int):
                self.quoting = quoting
            elif isinstance(quoting, str):
                quoting = quoting.upper()
                if quoting in ["QUOTE_ALL", "ALL"]:
                    self.quoting = csv.QUOTE_ALL
                elif quoting in ["QUOTE_MINIMAL", "MINIMAL"]:
                    self.quoting = csv.QUOTE_MINIMAL
                elif quoting in ["QUOTE_NONNUMERIC", "NONNUMERIC"]:
                    self.quoting = csv.QUOTE_NONNUMERIC
                elif quoting in ["QUOTE_NONE", "NONE"]:
                    self.quoting = csv.QUOTE_NONE
                else:
                    # Guard against silently leaving self.quoting unset
                    raise ValueError("Unsupported quoting: " + quoting)
        else:
            self.quoting = csv.QUOTE_NONNUMERIC
        if has_header is not None:
            self.has_header = has_header
        else:
            self.has_header = True

    def open_fst(self, name=None):
        if name is None:
            name = self.path
        bs = self.buffer_size if self.buffer_size else 100000
        self.reader = FSTReader(name, bs)
        self.reader.open()
        self.to_close = self.reader
        self.columns = list(self.reader.columns.keys())

    def open_csv(self, path, f=lambda s: fopen(s, "rt")):
        self.to_close = f(path)
        if self.delimiter is not None:
            self.reader = csv.reader(self.to_close, quoting=self.quoting,
                                     delimiter=self.delimiter)
        else:
            self.reader = csv.reader(self.to_close, quoting=self.quoting)
        # self.reader = csv.reader(
        #     (line.replace('\0', ' ') for line in self.to_close),
        #     quoting=self.quoting
        # )
        if self.has_header:
            header = next(self.reader)
            self.columns = header
        return

    def open_sas7bdat(self, name=None):
        if name is None:
            name = self.path
        self._reader = SAS7BDAT(name, skip_header=True)
        self.to_close = self._reader
        sas_columns = self._reader.columns
        self.columns = [
            column.name if isinstance(column.name, str)
            else column.name.decode("utf-8")
            for column in sas_columns
        ]
        header = self._reader.header.properties
        self.count = header.row_count
        self.size = header.page_count * header.page_length
        self.reader = iter(self._reader)
        return

    def open_fwf(self, reader):
        self.reader = reader
        self.reader.open()
        self.to_close = self.reader

    def open_json(self, path):
        self.reader = CSVLikeJsonReader(path, self.columns)
        self.reader.open()
        self.to_close = self.reader
        return

    def get_path(self) -> str:
        return entry_to_path(self.path)

    def __enter__(self):
        opened = False
        name = self.path
        if isinstance(self.path, tuple):
            path, f = self.path
            if isinstance(path, FWFReader):
                self.open_fwf(path)
                return self
            name = path if isinstance(path, str) else path.name
            nlower = name.lower()
            if nlower.endswith(".fst") or nlower.endswith(".sas7bdat"):
                if not os.path.isfile(name):
                    raise Exception(
                        "Not implemented: reading FST or SAS files from archive"
                    )
            elif isinstance(path, str) and ".json" in path.lower():
                self.open_json(path)
                opened = True
            elif isinstance(path, str):
                self.open_csv(path, f)
                opened = True
        if not opened:
            if isinstance(name, FWFReader):
                self.open_fwf(name)
            elif name.lower().endswith(".fst"):
                self.open_fst(name)
            elif ".csv" in name.lower():
                self.open_csv(name)
            elif name.lower().endswith(".sas7bdat"):
                self.open_sas7bdat(name)
            elif ".json" in name.lower():
                self.open_json(name)
            else:
                raise Exception("Unsupported file format: " + self.path)
        return self

    def rows(self):
        return self.reader

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.to_close.close()
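

# A hedged usage sketch (file name and options are illustrative). The
# format is inferred from the file name; ``quoting`` may be a csv module
# constant or a string such as "MINIMAL":
#
#     with DataReader("measures.csv", quoting="MINIMAL", delimiter=",") as r:
#         print(r.columns)       # header row when has_header is True
#         for row in r.rows():
#             print(row)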


def regex(pattern: str):
    """
    Compiles a glob-like pattern (with '*' wildcards) into a regular
    expression. Note that '.' in the pattern is replaced with '_'
    before compiling.
    """

    # The 'A'/'Z' sentinels ensure that leading and trailing '*' produce
    # proper segments; the [1:-1] slice strips them after escaping
    pattern = 'A' + pattern.replace('.', '_') + 'Z'
    x = pattern.split('*')
    y = [re.escape(s) for s in x]
    regexp = ".*".join(y)[1:-1]
    return re.compile(regexp)
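

# A hedged usage sketch (the pattern is made up); note that '.' in the
# pattern is rewritten to '_', so it matches a literal underscore:
#
#     >>> regex("temp*max").fullmatch("temperature_max") is not None
#     True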