Source code for nsaph.loader

#  Copyright (c) 2021. Harvard University
#  Developed by Research Software Engineering,
#  Faculty of Arts and Sciences, Research Computing (FAS RC)
#  Author: Michael A Bouzinier
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  See the License for the specific language governing permissions and
#  limitations under the License.

import glob
import logging
import os
import threading
import time
from abc import ABC
from pathlib import Path

import sys
from typing import Iterable, Callable

from nsaph_utils.utils.io_utils import is_yaml_or_json, is_dir
from psycopg2.extensions import connection

from nsaph import init_logging, app_name
from nsaph.data_model.domain import Domain
from nsaph.loader.common import CommonConfig
from nsaph.db import Connection
from nsaph.loader.loader_config import LoaderConfig
from nsaph.loader.monitor import DBActivityMonitor

[docs]def diff(files: Iterable[str]) -> bool: """ Given a list of files, check if they are all identical :param files: a list of files :return: True if all files in the list are identical one to another """ ff = list(files) for i in range(len(ff) - 1): ret = os.system("diff {} {}".format(ff[i], ff[i+1])) if ret != 0: return False return True
[docs]class LoaderBase(ABC): """ Base class for tools responsible for data loading and processing """
[docs] @classmethod def find_file_in_path(cls, fname: str): files = set() for d in sys.path: files.update(glob.glob( os.path.join(d, "**", fname), recursive=True )) if not files: raise ValueError("File {} not found".format(fname)) if len(files) > 1: if not diff(files): raise ValueError("Ambiguous files {}".format(';'.join(files))) return files.pop()
[docs] @classmethod def get_domain(cls, name: str, registry: str = None) -> Domain: src = None registry_path = None if registry: if is_yaml_or_json(registry): if os.path.dirname(registry) or os.path.exists(registry): registry_path = os.path.abspath(registry) else: registry_path = cls.find_file_in_path(registry) if not os.path.isfile(registry_path): raise ValueError("{} - is not a file".format(registry_path)) elif not is_dir(registry): raise ValueError("{} - is not a valid registry path".format(registry)) elif os.path.isdir(registry): src = registry else: raise NotImplementedError("Not Implemented: registry in an archive") if not src: src = Path(__file__).parents[3] if not registry_path: registry_path = os.path.join(src, "yml", name + ".yaml") if not os.path.isfile(registry_path): registry_path = cls.find_file_in_path(name + ".yaml") if not os.path.isfile(registry_path): raise ValueError("File {} does not exist".format(os.path.abspath(registry_path))) domain = Domain(registry_path, name) return domain
def __init__(self, context: LoaderConfig): app = app_name() if context.table: name = context.table + '-' + app else: name = app init_logging(name=app) self.context = None self.domain = self.get_domain(context.domain, context.registry) if isinstance(context, LoaderConfig) and context.sloppy: self.domain.set_sloppy() self.domain.init() self.table = context.table self.monitor = DBActivityMonitor(context) self.exception = None def _connect(self) -> connection: c = Connection(self.context.db, self.context.connection)\ .connect(self.context.autocommit) return c
[docs] def get_pid(self, connxn: connection) -> int: with connxn.cursor() as cursor: cursor.execute("SELECT pg_backend_pid()") for row in cursor: return row[0]
[docs] def execute_with_monitor(self, what: Callable, connxn: connection = None, on_monitor: Callable = None): if not on_monitor: pid = self.get_pid(connxn) on_monitor = lambda: self.monitor.log_activity(pid) try: DBActivityMonitor.execute(what, on_monitor) except Exception as x: self.exception = x