"""
This module introspects columnar data to infer the types of
the columns
"""
# Copyright (c) 2021. Harvard University
#
# Developed by Research Software Engineering,
# Faculty of Arts and Sciences, Research Computing (FAS RC)
# Author: Michael A Bouzinier
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import csv
import datetime
import glob
import json
import logging
import numbers
import os
import re
import sys
import tarfile
from collections import OrderedDict
from typing import Dict, List, Optional, Tuple

import yaml
from sas7bdat import SAS7BDAT

from nsaph.pg_keywords import *
from nsaph_utils.utils.io_utils import fopen, SpecialValues, is_dir, get_entries
from nsaph_utils.utils.pyfst import FSTReader
PG_MAXINT = 2147483647

# Regular expressions used to recognize the type of string values
integer = re.compile(r"-?\d+")
float_number = re.compile(r"(-?\d*)\.(\d+)([eE][-+]?\d+)?")
exponent = re.compile(r"(-?\d+)([eE][-+]?\d+)")
_date = r"([12]\d{3}-(0[1-9]|1[0-2])-(0[1-9]|[12]\d|3[01]))"
date = re.compile(_date)
timestamp = re.compile(_date + r"[Tt][0-9]{2}:[0-9]{2}")
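
# A minimal usage sketch (assumes a local "data.csv" file, which is
# hypothetical and not part of this module):
#
#     introspector = Introspector("data.csv")
#     introspector.introspect()
#     for column in introspector.get_columns():
#         print(column)
#
# get_columns() returns one dictionary per column, mapping the SQL
# column name to its inferred PostgreSQL type and, when it differs,
# to the original source header.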


class Introspector:
    """
    Reads a sample of the data and infers a database type
    for every column
    """

    @staticmethod
    def load_range(n, f) -> int:
        """Calls f() up to n times; returns the number of calls
        that succeeded before StopIteration was raised"""
        for i in range(0, n):
            try:
                f()
            except StopIteration:
                return i
        return n

    @staticmethod
    def name(path) -> str:
        if isinstance(path, tarfile.TarInfo):
            full_name = path.name
        else:
            full_name = str(path)
        # name, _ = os.path.splitext(os.path.basename(full_name))
        return full_name

    @staticmethod
    def csv_reader(data, unquote=True):
        if unquote:
            q = csv.QUOTE_ALL
        else:
            q = csv.QUOTE_NONE
        return csv.reader(data, quotechar='"', delimiter=',',
                          quoting=q, skipinitialspace=True)

    @staticmethod
    def unquote(s: str) -> str:
        return s.strip().strip('"')

    def __init__(self,
                 data_file: str,
                 column_name_replacement: Dict = None):
        self.entries = None
        self.lines_to_load = 10000
        if is_dir(data_file):
            self.entries, self.open_entry_function = get_entries(data_file)
        else:
            self.open_entry_function = lambda x: fopen(x, "rt")
        self.file_path = data_file
        self.has_commas = False
        self.quoted_values = False
        self.sql_columns = []
        self.csv_columns = []
        self.types = []
        self.descriptions = None
        self.column_map = column_name_replacement \
            if column_name_replacement else dict()
        self.appended_columns = []
        return
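
    # The optional column_name_replacement argument maps lower-cased
    # source headers to the desired SQL column names, e.g.
    # (hypothetical):
    #
    #     Introspector("data.csv", {"zip code": "zipcode"})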

    def fopen(self, source):
        entry = self.open_entry_function(source)
        return entry

    def handle_csv(self, entry):
        rows, lines = self.load_csv(entry)
        if not rows:
            raise Exception("No data in {}".format(self.file_path))
        for row in rows:
            if any(isinstance(cell, str) and ',' in cell for cell in row):
                self.has_commas = True
                break
        self.guess_types(rows, lines)

    def handle_json(self, entry):
        rows = self.load_json(entry)
        if not rows:
            raise Exception("No data in {}".format(self.file_path))
        m = len(rows)
        n = len(self.csv_columns)
        for c in range(0, n):
            c_type = None
            scale = 0
            precision = 0
            max_val = 0
            for l in range(0, m):
                v = rows[l][c]
                if v is None:
                    continue
                if isinstance(v, int):
                    t = PG_INT_TYPE
                    max_val = max(max_val, v)
                elif isinstance(v, float):
                    t = PG_NUMERIC_TYPE
                elif isinstance(v, str):
                    t, scale, precision, max_val = self.guess_str(
                        v, None, scale, precision, max_val
                    )
                else:
                    raise ValueError(v)
                try:
                    c_type = self.reconcile(t, c_type)
                except InconsistentTypes:
                    msg = "Inconsistent type for column {:d} [{:s}]. " \
                        .format(c + 1, self.csv_columns[c])
                    msg += "Up to line {:d}: {:s}, for line={:d}: {:s}. " \
                        .format(l - 1, c_type, l, t)
                    msg += "Value = {}".format(v)
                    raise Exception(msg)
            self.types.append(self.db_type(c_type, max_val, precision, scale))
        return

    @classmethod
    def sas2db_type(cls, column, rows):
        if column.type == "number":
            values = [row[column.col_id] for row in rows]
            # datetime.datetime is a subclass of datetime.date,
            # so timestamps have to be tested first
            is_ts = all(
                isinstance(v, datetime.datetime) or v is None for v in values
            )
            if is_ts:
                return PG_TS_TYPE
            is_date = all(
                isinstance(v, datetime.date) or v is None for v in values
            )
            if is_date:
                return PG_DATE_TYPE
            max_value = max(values)
            is_int = all(
                isinstance(v, numbers.Integral) or v is None for v in values
            )
            t = PG_INT_TYPE if is_int else PG_NUMERIC_TYPE
            return cls.db_type(t, max_value, None, None)
        if column.type == "string":
            return "{}({:d})".format(PG_STR_TYPE, column.length)
        raise ValueError("Unknown SAS datatype: {}".format(column.type))

    def handle_sas(self, entry):
        reader = SAS7BDAT(entry, skip_header=True)
        rows = self.load_sas(reader)
        sas_columns = reader.columns
        self.csv_columns = [
            column.name.decode("utf-8") for column in sas_columns
        ]
        self.descriptions = [
            column.label.decode("utf-8") for column in sas_columns
        ]
        self.types = [self.sas2db_type(column, rows) for column in sas_columns]

    def introspect(self, entry=None):
        if not entry:
            if self.entries is not None:
                entry = self.entries[0]
            else:
                entry = self.file_path
        logging.info("Using for data analysis: " + self.name(entry))
        if ".json" in self.name(entry).lower():
            self.handle_json(entry)
        elif self.name(entry).lower().endswith(".sas7bdat"):
            self.handle_sas(entry)
        else:
            self.handle_csv(entry)
        self.sql_columns = []
        for i, c in enumerate(self.csv_columns):
            if not c:
                self.append_sql_column("Col{:03d}".format(i + 1))
            elif c.lower() in self.column_map:
                self.append_sql_column(self.column_map[c.lower()])
            else:
                if c[0].isdigit():
                    c = "c_" + c
                self.append_sql_column(
                    c.replace('.', '_').replace(' ', '_').lower()
                )
        return

    def append_sql_column(self, name: str):
        # Deduplicate: if the name is already in use, suffix both the
        # existing column and the new one with positional indices
        fmt = "{}_{:03d}"
        if name in self.sql_columns:
            n = len(self.sql_columns)
            i = self.sql_columns.index(name)
            self.sql_columns[i] = fmt.format(name, i + 1)
            name = fmt.format(name, n)
        self.sql_columns.append(name)
        return

    def load_csv(self, entry) \
            -> Tuple[List[List[str]], Optional[List[List[str]]]]:
        if isinstance(entry, str) and entry.lower().endswith(".fst"):
            return self.load_fst(entry)
        # First pass: values with quotes stripped
        with self.fopen(entry) as data:
            reader = self.csv_reader(data, True)
            row = next(reader)
            self.csv_columns = [self.unquote(c) for c in row]
            rows = []
            self.load_range(self.lines_to_load,
                            lambda: rows.append(next(reader)))
        # Second pass: raw values, used to detect quoted strings
        with self.fopen(entry) as data:
            reader = self.csv_reader(data, False)
            next(reader)
            lines = []
            self.load_range(self.lines_to_load,
                            lambda: lines.append(next(reader)))
        return rows, lines

    def load_fst(self, entry) \
            -> Tuple[List[List[str]], Optional[List[List[str]]]]:
        with FSTReader(entry, buffer_size=self.lines_to_load + 10) as reader:
            self.csv_columns = [c for c in reader.columns]
            rows = []
            self.load_range(self.lines_to_load,
                            lambda: rows.append(next(reader)))
        lines = None
        return rows, lines

    def load_sas(self, reader: SAS7BDAT) -> List[List]:
        rows = []
        for row in reader:
            rows.append(row)
            if len(rows) >= self.lines_to_load:
                break
        return rows

    def load_json(self, entry) -> List[List]:
        headers = OrderedDict()
        records = []
        counter = 0
        with self.fopen(entry) as data:
            for line in data:
                record = json.loads(line)
                for h in record:
                    if h not in headers:
                        headers[h] = 1
                records.append(record)
                counter += 1
                if counter > self.lines_to_load:
                    break
        self.csv_columns = list(headers.keys())
        rows = [
            [record.get(h, None) for h in self.csv_columns]
            for record in records
        ]
        return rows
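
    # load_json() expects JSON Lines input, one object per line,
    # e.g. (hypothetical data):
    #
    #     {"zip": "02138", "pm25": 7.4, "date": "2017-01-01"}
    #     {"zip": "02139", "pm25": 8.1, "date": "2017-01-01"}
    #
    # Keys missing from a record yield None in the corresponding row.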

    def guess_str(self, v, v2: Optional[str], scale, precision, max_val):
        if timestamp.fullmatch(v):
            t = PG_TS_TYPE
        elif date.fullmatch(v):
            t = PG_DATE_TYPE
        elif v2 and v2 == '"{}"'.format(v):
            # The raw line has the value in quotes: treat it as a string
            t = PG_STR_TYPE
            self.quoted_values = True
        elif SpecialValues.is_untyped(v):
            t = "0"
        else:
            f = float_number.fullmatch(v)
            if f:
                t = PG_NUMERIC_TYPE
                s = len(f.group(2))
                p = len(f.group(1))
                scale = max(scale, s)
                precision = max(precision, p)
            elif exponent.fullmatch(v):
                t = PG_NUMERIC_TYPE
            elif integer.fullmatch(v):
                t = PG_INT_TYPE
                max_val = max(max_val, abs(int(v)))
            else:
                t = PG_STR_TYPE
        if t == PG_STR_TYPE:
            max_val = max(max_val, len(v))
        return t, scale, precision, max_val
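
    # Illustrative results (types shown as the PG_*_TYPE constants):
    #
    #     "2017-03-15"  -> PG_DATE_TYPE
    #     "12.345"      -> PG_NUMERIC_TYPE, scale >= 3, precision >= 2
    #     "42"          -> PG_INT_TYPE, max_val >= 42
    #     values recognized by SpecialValues.is_untyped() -> "0",
    #         an untyped placeholder resolved later by reconcile()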

    def guess_types(self, rows: list, lines: list):
        m = len(rows)
        n = len(rows[0])
        self.types.clear()
        for c in range(0, n):
            c_type = None
            precision = 0
            scale = 0
            max_val = 0
            for l in range(0, m):
                cell = rows[l][c]
                if isinstance(cell, numbers.Number):
                    if isinstance(cell, numbers.Integral):
                        t = PG_INT_TYPE
                        # Track the magnitude, so that db_type() can
                        # promote the column to BIGINT when needed
                        max_val = max(max_val, abs(cell))
                    else:
                        t = PG_NUMERIC_TYPE
                elif SpecialValues.is_missing(cell):
                    t = "0"
                else:
                    v = cell.strip()
                    if lines:
                        try:
                            v2 = lines[l][c].strip()
                        except IndexError:
                            v2 = None
                    else:
                        v2 = None
                    t, scale, precision, max_val = \
                        self.guess_str(v, v2, scale, precision, max_val)
                if t == "0":
                    continue
                try:
                    c_type = self.reconcile(t, c_type)
                except InconsistentTypes:
                    msg = "Inconsistent type for column {:d} [{:s}]. " \
                        .format(c + 1, self.csv_columns[c])
                    msg += "Up to line {:d}: {:s}, for line={:d}: {:s}. " \
                        .format(l - 1, c_type, l, t)
                    msg += "Value = {}".format(cell)
                    raise Exception(msg)
            self.types.append(self.db_type(c_type, max_val, precision, scale))
        return

    @staticmethod
    def reconcile(cell_type, column_type) -> str:
        """Combines the type of the current cell with the type
        inferred so far, widening INT -> NUMERIC -> VARCHAR"""
        if column_type == "0":
            column_type = cell_type
        elif column_type == PG_NUMERIC_TYPE and cell_type == PG_INT_TYPE:
            return column_type
        elif column_type == PG_STR_TYPE \
                and cell_type in [PG_INT_TYPE, PG_NUMERIC_TYPE]:
            return column_type
        elif column_type == PG_INT_TYPE and cell_type == PG_NUMERIC_TYPE:
            column_type = cell_type
        elif column_type in [PG_INT_TYPE, PG_NUMERIC_TYPE] \
                and cell_type == PG_STR_TYPE:
            column_type = cell_type
        elif column_type and column_type != cell_type:
            raise InconsistentTypes
        else:
            column_type = cell_type
        return column_type
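
    # The widening behaves like a small lattice, e.g.:
    #
    #     reconcile(PG_NUMERIC_TYPE, PG_INT_TYPE) -> PG_NUMERIC_TYPE
    #     reconcile(PG_STR_TYPE, PG_INT_TYPE)     -> PG_STR_TYPE
    #     reconcile(PG_DATE_TYPE, PG_INT_TYPE)    -> InconsistentTypes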

    @staticmethod
    def db_type(column_type, max_val, precision, scale) -> str:
        # An order of magnitude of headroom over the sampled maximum
        if column_type == PG_INT_TYPE and max_val * 10 > PG_MAXINT:
            column_type = PG_BIGINT_TYPE
        if column_type == PG_NUMERIC_TYPE and precision and scale:
            column_type = column_type + "({:d},{:d})".format(
                precision + scale + 2, scale
            )
        if column_type == "0":
            column_type = PG_NUMERIC_TYPE
        if not column_type:
            column_type = PG_STR_TYPE
        if column_type == PG_STR_TYPE and max_val > 256:
            column_type = PG_TXT_TYPE
        return column_type
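
    # For NUMERIC columns the declared precision is padded with two
    # extra digits: precision=5 and scale=3 yield NUMERIC(10,3),
    # assuming PG_NUMERIC_TYPE is "NUMERIC"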

    def get_columns(self) -> List[Dict]:
        # Copy, so that repeated calls do not grow appended_columns
        columns = list(self.appended_columns)
        for i, c in enumerate(self.sql_columns):
            t = self.types[i]
            s = self.csv_columns[i]
            column = {
                c: {
                    "type": t,
                }
            }
            if self.descriptions is not None:
                column[c]["description"] = self.descriptions[i]
            if not s:
                column[c]["source"] = i
            elif s != c:
                column[c]["source"] = s
            columns.append(column)
        return columns

    def append_file_column(self):
        self.appended_columns.append({
            "FILE": {
                "description": "original file name",
                "index": {
                    "required_before_loading_data": True
                },
                "source": {
                    "type": "file"
                },
                "type": "VARCHAR(128)"
            }
        })

    def append_record_column(self):
        self.appended_columns.append({
            "RECORD": {
                "description": "Record (line) number in the file",
                "index": True,
                "type": PG_SERIAL_TYPE
            }
        })

    @classmethod
    def classify(cls, files):
        classes = []
        for f in files:
            print(f)
            introspector = Introspector(f)
            introspector.introspect()
            cc = introspector.get_columns()
            new = True
            for c in classes:
                if c[0] == cc:
                    c[1].append(f)
                    new = False
                    break
            if new:
                classes.append([cc, [f]])
        print("Found {:d} classes".format(len(classes)))
        for i, c in enumerate(classes):
            print("{:d}:".format(i + 1))
            for f in c[1]:
                print("\t{}".format(f))


class InconsistentTypes(Exception):
    pass


def test():
    if len(sys.argv) == 2:
        args = glob.glob(sys.argv[1])
    else:
        args = sys.argv[1:]
    if len(args) > 1:
        Introspector.classify(args)
    else:
        arg = args[0]
        print(arg)
        introspector = Introspector(arg)
        introspector.introspect()
        columns = introspector.get_columns()
        print(yaml.dump(columns))


if __name__ == '__main__':
    test()