"""
This module supports building indices on domain tables
and monitoring the build progress
"""
# Copyright (c) 2021. Harvard University
#
# Developed by Research Software Engineering,
# Faculty of Arts and Sciences, Research Computing (FAS RC)
# Author: Michael A Bouzinier
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
from datetime import datetime
import nsaph.dictionary.element
from nsaph.loader import LoaderBase, CommonConfig
from nsaph_utils.utils.context import Argument, Cardinality
from nsaph.loader.monitor import DBActivityMonitor
[docs]def find_name(sql):
x = sql.split(" ")
for i in range(0, len(x)):
if x[i] == "ON":
return x[i-1]
raise Exception(sql)
[docs]class IndexerConfig(CommonConfig):
"""
Configurator class for index builder
"""
_reset = Argument("reset",
help = "Force rebuilding indices it/they already exist",
type = bool,
aliases=["r"],
default = False,
cardinality = Cardinality.single
)
_incremental = Argument("incremental",
help = "Skip over existing indices",
aliases=["i"],
type = bool,
default = False,
cardinality = Cardinality.single
)
def __init__(self, doc):
self.reset = None
self.incremental = None
super().__init__(IndexerConfig, doc)
[docs]class IndexBuilder(LoaderBase):
"""
Index Builder Class
"""
def __init__(self, context: IndexerConfig = None):
if not context:
context = IndexerConfig(__doc__).instantiate()
super().__init__(context)
self.context: IndexerConfig = context
[docs] def run(self):
self.execute_with_monitor(self.execute, on_monitor=self.print_stat)
if self.exception is not None:
raise self.exception
[docs] def execute(self):
try:
self._execute()
except BaseException as ex:
logging.exception("Exception building indices")
self.exception = ex
raise
def _execute(self):
domain = self.domain
if self.context.table is not None:
indices = domain.indices_by_table[
domain.fqn(self.context.table)]
else:
indices = domain.indices
print(indices)
if self.context.autocommit:
for index in indices:
with self._connect() as cnxn:
self.build(index, cnxn)
else:
with self._connect() as cnxn:
for index in indices:
self.build(index, cnxn)
logging.info("All indices have been built")
[docs] def build(self, index, cnxn):
name = find_name(index)
fqn = self.domain.fqn(name)
with (cnxn.cursor()) as cursor:
if self.context.reset:
sql = "DROP INDEX IF EXISTS {name}".format(name=fqn)
logging.info(str(datetime.now()) + ": " + sql)
cursor.execute(sql)
if self.context.incremental:
sql = index.replace(name, "IF NOT EXISTS " + name)
else:
sql = index
logging.info(str(datetime.now()) + ": " + sql)
cursor.execute(sql)
logging.info(str(datetime.now()) + ": Index " +
name + " is ready.")
[docs] def print_stat(self):
for msg in self.monitor.get_indexing_progress():
logging.info(msg)
[docs] def drop(self, schema: str, table: str = None):
with self._connect() as connection:
self.drop_all(connection, schema, table)
[docs] @classmethod
def drop_all (cls, connection, schema: str, table: str = None, act = True):
query = """
SELECT
i.relname,
n.nspname,
c.relname
FROM
"pg_catalog"."pg_index" as x
JOIN pg_class as i ON x.indexrelid = i.oid
JOIN pg_class as c ON x.indrelid = c.oid
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE nspname = '{}'
""".format(schema)
if table is not None:
query += " AND c.relname = '{}'".format(table)
with (connection.cursor()) as cursor:
cursor.execute(query)
indices = [row[0] for row in cursor]
logging.info("Found {:d} indices".format(len(indices)))
with (connection.cursor()) as cursor:
for index in indices:
sql = "DROP INDEX {}.{}".format(schema, index)
logging.info(sql)
if act:
cursor.execute(sql)
return
if __name__ == '__main__':
IndexBuilder().run()