Source code for nsaph.loader.project_loader

#  Copyright (c) 2022. Harvard University
#
#  Developed by Research Software Engineering,
#  Faculty of Arts and Sciences, Research Computing (FAS RC)
#  Author: Michael A Bouzinier
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

"""
This module is a command-line tool that introspects a directory containing
CSV (or CSV-like, e.g. FST, JSON, SAS) files and ingests its contents into
a database. The directory can be structured, i.e. contain nested
subdirectories. All files matching a certain name pattern, at any level of
nesting, are included in the data set.

In the database, a schema is created based on the given project name.
For each file in the data set a table is created. The name of the table
is constructed from the relative path of the incoming data file, with OS
path separators (e.g. '/') replaced by underscores ('_').
"""

import logging
import os
from pathlib import PurePath

import yaml

from nsaph.data_model.domain import Domain
from nsaph.loader import LoaderConfig
from nsaph.loader.data_loader import DataLoader
from nsaph.loader.introspector import Introspector


def is_relative_to(p: PurePath, *other) -> bool:
    """Return True if the path is relative to another path, False otherwise."""
    try:
        p.relative_to(*other)
        return True
    except ValueError:
        return False
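
# A doctest-style sketch of is_relative_to with hypothetical paths
# (shown as a comment, not executed on import):
#
#     >>> is_relative_to(PurePath("/data/project/file.csv"), "/data/project")
#     True
#     >>> is_relative_to(PurePath("/elsewhere/file.csv"), "/data/project")
#     False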


def remove_ext(p: str) -> str:
    """Remove the extension(s) from a path: everything after the first '.'
    in the file name is stripped."""
    d, f = os.path.split(p)
    pos = f.find('.')
    if pos >= 0:
        return os.path.join(d, f[:pos])
    return os.path.join(d, f)
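
# For example (hypothetical file name), remove_ext("data/file.csv.gz")
# returns "data/file": the directory part is kept and everything after
# the first '.' in the file name is dropped, including compound extensions.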


class ProjectLoader(DataLoader):
    """
    Class implementing the Project Loading functionality: it introspects
    the data files of a project and ingests them into the database.
    """

    @classmethod
    def new_domain(cls, name: str, file_path: str):
        """Create a skeleton registry for a new domain (project) and write it
        to a YAML file."""
        domain = dict()
        domain[name] = dict()
        d = domain[name]
        d["schema"] = name
        d["header"] = True
        d["quoting"] = 'QUOTE_MINIMAL'
        d["index"] = "unless excluded"
        d["tables"] = dict()
        with open(file_path, "wt") as f:
            yaml.dump(domain, f, indent=2)
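
    # For a hypothetical call ProjectLoader.new_domain("climate", "climate.yaml"),
    # the generated registry file would look roughly like this:
    #
    #     climate:
    #       header: true
    #       index: unless excluded
    #       quoting: QUOTE_MINIMAL
    #       schema: climate
    #       tables: {}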

    def __init__(self, context: LoaderConfig = None):
        """
        :param context: Configuration options, usually provided as
            command-line arguments
        """
        if not context:
            context = LoaderConfig(__doc__).instantiate()
        if context.registry is not None and not os.path.exists(context.registry):
            self.new_domain(context.domain, context.registry)
        super().__init__(context)
        with open(context.registry) as f:
            self.registry = yaml.safe_load(f)
        return
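
    # A sketch of programmatic use (normally the loader is run as a
    # command-line tool, see the __main__ guard at the bottom of this module):
    #
    #     config = LoaderConfig(__doc__).instantiate()
    #     loader = ProjectLoader(config)
    #     loader.run()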

    def introspect(self, entry: str):
        """
        Introspect a single data file: derive the table name from the file's
        path relative to one of the data roots, infer the columns, and
        register the table in the domain registry.

        :param entry: path to the data file
        :return: the name of the table derived for the file
        """
        p = PurePath(entry)
        table = None
        for root in self.context.data:
            if is_relative_to(p, root):
                relpath = PurePath(os.path.relpath(remove_ext(entry), root))
                table = "_".join(relpath.parts)
        if table is None:
            raise ValueError(entry)
        introspector = Introspector(entry)
        introspector.lines_to_load = 20000
        introspector.introspect()
        introspector.append_file_column()
        introspector.append_record_column()
        columns = introspector.get_columns()
        self.registry[self.domain.domain]["tables"][table] = {
            "columns": columns,
            "primary_key": [
                "FILE",
                "RECORD"
            ]
        }
        return table
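
    # Illustrative result (hypothetical paths): for a data root "/data/climate"
    # and a file "/data/climate/temperature/2020.csv", the derived table name
    # is "temperature_2020" and the registry entry looks roughly like the
    # snippet below; the actual column definitions come from the Introspector:
    #
    #     temperature_2020:
    #       columns: [...]
    #       primary_key:
    #         - FILE
    #         - RECORD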

    def save_registry(self):
        """Write the updated registry to the YAML file and reinitialize
        the domain from it."""
        with open(self.context.registry, "wt") as f:
            yaml.dump(self.registry, f)
            f.write('\n')
        self.domain = Domain(self.registry, self.domain.domain)
        self.domain.init()
        return

    def run(self):
        """
        Run introspection and ingestion.

        :return: nothing
        """
        entries = self.get_files()
        tables = {
            self.introspect(entry[0]): entry[0] for entry in entries
        }
        self.save_registry()
        if self.context.dryrun:
            return
        inputs = self.context.data
        s = 0
        e = 0
        for table in tables:
            self.set_table(table)
            self.context.data = [tables[table]]
            self.reset()
            try:
                self.load()
                s += 1
            except Exception:
                self.drop()
                logging.exception("Failed to load data into table " + table)
                e += 1
        self.context.data = inputs
        logging.info(
            "All done. Tables loaded: {:d}, tables failed: {:d}.".format(s, e)
        )
        return


if __name__ == '__main__':
    loader = ProjectLoader()
    loader.run()