Source code for nsaph.requests.hdf5_export

"""
A utility to export result of quering the database in HDF5 format
"""

#  Copyright (c) 2021. Harvard University
#
#  Developed by Research Software Engineering,
#  Faculty of Arts and Sciences, Research Computing (FAS RC)
#  Author: Michael A Bouzinier
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

import argparse
import os
from typing import List

import h5py
import numpy

from nsaph.db import ResultSetDeprecated
from nsaph.requests.query import Query


[docs]class Dataset: def __init__(self, t): self.data = [] self.indices = [] self.type = t self.max_len = 1
[docs] def clear(self): self.data.clear()
[docs] def add_index(self, idx): self.indices.append(idx)
[docs] def append(self, row: list): if self.type == float: values = [float(row[i]) for i in self.indices] else: values = [row[i] for i in self.indices] if self.type == str: self.max_len = max([self.max_len] + [len(v) for v in values]) self.data.append(values)
[docs] def type_name(self): return self.type.__name__
[docs] def to_hdf5(self, parent, name): ds_name = "{}_{}".format(name, self.type_name()) if self.type == str: dtype = "<S{}".format(self.max_len) h5 = parent.create_dataset(ds_name, data=self.data, dtype=dtype) else: h5 = parent.create_dataset(ds_name, data=self.data) for v in enumerate(self.indices): h5.attrs[v[1]] = v[0] return h5
[docs]def dtype(t): if t == int: return numpy.dtype(numpy.int32) if t == float: return numpy.dtype(numpy.float) return numpy.dtype(numpy.str)
[docs]def append(datasets: List, row): for dataset in datasets: dataset.append(row) return
[docs]def map2ds(rs: ResultSetDeprecated, groups: list) -> List: datasets = {} for i in range(0, len(rs.header)): h = rs.header[i] t = rs.types[i] if h not in groups: if t.startswith("int"): if int not in datasets: datasets[int] = Dataset(int) datasets[int].add_index(h) elif t.startswith("numeric") or t.startswith("float"): if float not in datasets: datasets[float] = Dataset(float) datasets[float].add_index(h) else: if str not in datasets: datasets[str] = Dataset(str) datasets[str].add_index(h) return list(datasets.values())
[docs]def store(parent, name:str, datasets: list, attrs): for dataset in datasets: h5ds = dataset.to_hdf5(parent, name) for attr in attrs: h5ds.attrs[attr] = attrs[attr] dataset.clear()
def _export(request: dict, rs: ResultSetDeprecated, path: str): """ Internal method. Use :func:export :param request: A dictionary containing user request, normally the result of parsed YAML file :param rs: ResultSet obtained by executing the generated query :param path: Output path where to export HDF5 file :return: None """ name = os.path.basename(path).split('.')[0] groups = None if "package" in request: if "group" in request["package"]: groups = request["package"]["group"] if isinstance(groups, str): groups = [groups] ng = len(groups) data_columns = {} i = 0 for h in rs.header: if h not in groups: data_columns[i] = h i += 1 row = next(rs) current = {g: row[g] for g in groups} with h5py.File(path, "w") as f: f.attrs["name"] = request["name"] f.attrs["file_name"] = name h5 = f for g in groups: h5.attrs["grouping"] = g h5 = h5.create_group(str(row[g])) g = groups[-1] h5.attrs["grouping"] = g datasets = map2ds(rs, groups) idx = 0 append(datasets, row) for row in rs: i = ng while i > 0: g = groups[i-1] if row[g] == current[g]: break i -= 1 if i < ng: attrs = {g: str(current[g]) for g in groups} store(h5, str(row[groups[-1]]), datasets, attrs) current = {g: row[g] for g in groups} ii = i for i in range(ii, ng): h5 = h5.parent for i in range(ii, ng): g = groups[i] h5.attrs["grouping"] = g h5 = h5.create_group(str(row[g])) idx += 1 append(datasets, row) attrs = {g: str(current[g]) for g in groups} store(h5, str(row[groups[-1]]), datasets, attrs)
[docs]def export(path_to_user_request: str, db_ini: str, connection_name: str): """ Executes query specified by a user request and exports results as HDF5 file :param path_to_user_request: a path to YAML file containing user request specification :param db_ini: a path to database.ini file, containing connection parameters :param connection_name: section name in the db.ini file :return: """ with Query(path_to_user_request, (db_ini, connection_name)) as q: print(q.sql) count = 0 rs = q.execute() _export(q.request, rs, q.request["name"] + ".hdf5") print("All done")
if __name__ == '__main__': #init_logging() parser = argparse.ArgumentParser (description="Create table and load data") parser.add_argument("--request", "-r", help="Path to a table definition file for a table", default=None, required=False) parser.add_argument("--db", help="Path to a database connection parameters file", default="database.ini", required=False) parser.add_argument("--section", "--connection_name", help="Section in the database connection parameters file", default="postgresql", required=False) args = parser.parse_args() if not args.request: d = os.path.dirname(__file__) args.request = os.path.join(d, "../../../yml/example_request.yaml") export(args.request, args.db, args.section) print("All done")