Source code for nsaph.loader.loader_config

"""
Domain Loader Configurator

Intended to configure loading of a single column-formatted file, or a set of
such files, into the NSAPH PostgreSQL database.
Input (aka source) files can be in either FST or CSV format.

The configurator assumes that the database schema is defined as a YAML or
JSON file. A separate tool is available to introspect source files and
infer a possible database schema.
"""

#  Copyright (c) 2021. Harvard University
#
#  Developed by Research Software Engineering,
#  Faculty of Arts and Sciences, Research Computing (FAS RC)
#  Author: Michael A Bouzinier
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

from enum import Enum
from typing import Optional

from nsaph_utils.utils.context import Context, Argument, Cardinality

from nsaph.loader.common import CommonConfig


class Parallelization(Enum):
    lines = "lines"
    files = "files"
    none = "none"

class DataLoaderAction(Enum):
    drop = "drop"
    load = "load"
    insert = "insert"
    print = "print"
    #default = "default"

    @classmethod
    def new(cls, value: str):
        if value is None:
            return None
        return cls(value)
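
# For illustration (not part of the original module): DataLoaderAction.new
# wraps the plain Enum constructor so that an omitted command-line value
# passes through as None instead of raising a ValueError:
#
#   >>> DataLoaderAction.new("load")
#   <DataLoaderAction.load: 'load'>
#   >>> DataLoaderAction.new(None) is None
#   True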

class LoaderConfig(CommonConfig):
    """
    Configurator class for data loader
    """

    _action = Argument(
        "action",
        help = "Action to perform",
        type = str,
        required = False,
        valid_values = [v.value for v in DataLoaderAction]
    )
    _data = Argument("data",
        help = "Path to a data file or directory. Can be a "
            + "single CSV, gzipped CSV or FST file or a directory "
            + "recursively containing CSV files. Can also be a tar, "
            + "tar.gz (or tgz) or zip archive containing CSV files",
        type = str,
        required = False,
        cardinality = Cardinality.multiple
    )
    _pattern = Argument("pattern",
        help = "pattern for files in a directory or an archive, "
            + "e.g. \"**/maxdata_*_ps_*.csv\"",
        type = str,
        required = False,
        cardinality = Cardinality.multiple
    )
    _reset = Argument("reset",
        help = "Force recreating table(s) if it/they already exist",
        type = bool,
        default = False,
        cardinality = Cardinality.single
    )
    _incremental = Argument("incremental",
        help = "Commit every file and skip over files that "
            + "have already been ingested",
        type = bool,
        default = False,
        cardinality = Cardinality.single
    )
    _sloppy = Argument("sloppy",
        help = "Do not update existing tables",
        type = bool,
        default = False,
        cardinality = Cardinality.single
    )
    _page = Argument(
        "page",
        help = "Explicit page size for the database",
        required = False,
        type = int
    )
    _log = Argument(
        "log",
        help = "Explicit interval for logging",
        required = False,
        type = int
    )
    _limit = Argument(
        "limit",
        help = "Load at most specified number of records",
        required = False,
        type = str
    )
    _buffer = Argument(
        "buffer",
        help = "Buffer size for converting fst files",
        required = False,
        type = int
    )
    _threads = Argument(
        "threads",
        help = "Number of threads writing into the database",
        default = 1,
        type = int
    )
    _parallelization = Argument(
        "parallelization",
        help = "Type of parallelization, if any",
        default = "lines",
        cardinality = Cardinality.single,
        valid_values = [v.value for v in Parallelization]
    )

    def __init__(self, doc):
        self.action: Optional[DataLoaderAction] = None
        """ Action to perform """

        self.data = None
        """
        Path to a data file or directory. Can be a single CSV,
        gzipped CSV or FST file or a directory recursively containing
        CSV files. Can also be a tar, tar.gz (or tgz) or zip archive
        containing CSV files
        """

        self.reset = None
        ''' Force recreating table(s) if it/they already exist '''

        self.page = None
        ''' Explicit page size for the database '''

        self.log = None
        ''' Explicit interval for logging '''

        self.limit = None
        ''' Load at most specified number of records '''

        self.buffer = None
        ''' Buffer size for converting fst files '''

        self.threads = None
        ''' Number of threads writing into the database '''

        self.parallelization = None
        ''' Type of parallelization, if any '''

        self.pattern = None
        """
        pattern for files in a directory or an archive,
        e.g., \"\*\*/maxdata_\*_ps_\*.csv\"
        """

        self.incremental = None
        """
        Commit every file and skip over files that have
        already been ingested
        """

        self.sloppy = False
        '''Do not update existing tables and views'''

        super().__init__(LoaderConfig, doc)

    def validate(self, attr, value):
        value = super().validate(attr, value)
        if attr == self._parallelization.name:
            return Parallelization(value)
        if attr == self._action.name:
            return DataLoaderAction.new(value)
        return value
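
# A minimal usage sketch (illustration only). It assumes, as is typical for
# the nsaph_utils Context base class, that the constructor parses command-line
# arguments and that validate() is applied to each raw value; the enum
# conversions follow directly from validate() above:
#
#   >>> cfg = LoaderConfig(__doc__)      # e.g. invoked with --action load
#   >>> cfg.validate("action", "load")   # assuming super() returns the value unchanged
#   <DataLoaderAction.load: 'load'>
#   >>> cfg.validate("parallelization", "files")
#   <Parallelization.files: 'files'>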