Source code for nsaph.dictionary.tables

#  Copyright (c) 2023. Harvard University
#  Developed by Research Software Engineering,
#  Faculty of Arts and Sciences, Research Computing (FAS RC)
#  Author: Michael A Bouzinier
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  See the License for the specific language governing permissions and
#  limitations under the License.

Module to create list of tables from the Domain Data Model
import math
import re
from typing import Dict, Optional, List, Set
import html
import sqlparse

from nsaph.data_model.domain import Domain
from nsaph.dictionary.columns import Column
from nsaph.dictionary.element import HTML, DataModelElement, Graph, fqn, qstr, \
    attrs2string, add_row, start_table, add_html_row, end_table, add_header_row, \
    hr, start_invisible_row, end_invisible_row

[docs]class Table(DataModelElement): def __init__(self, schema: str, name: str, domain: Domain, domain_dict): = name self.schema = schema self.qualified_name = fqn(self.schema, super().__init__(domain.find(self.qualified_name)) print("Adding: " + fqn(schema, name)) self.domain = domain self.domain_dict = domain_dict self.domain_dict.tables[fqn(schema, name)] = self self.aggregation:Optional[Aggregation] = None self.union:Optional[Union] = None self.join:Optional[Join] = None self.parent = None self.type: str = "Table" self.master = None create_block = None if "create" in self.block: create_block:Dict[str, Dict] = self.block["create"] self.aggregation = self.get_aggregation(schema, name, create_block) if "type" in create_block: self.type = str(create_block["type"]) if "from" in create_block: from_block = create_block["from"] if self.aggregation is None: if isinstance(from_block, list): self.union = Union(schema, from_block, self.domain_dict) elif isinstance(from_block, str): if "join" in from_block: self.join = Join(schema, from_block, self.domain_dict) else: tname = fqn(schema, from_block) self.master = self.domain_dict.tables[tname] if "children" in self.block: children_block = self.block["children"] assert isinstance(children_block, dict) for child in children_block: child_table = Table(schema, child, self.domain, self.domain_dict) child_table.parent = self relation = Relation("parent-child", self, child_table) self.domain_dict.relations.append(relation) if self.aggregation is not None: relation = Relation( "aggregation", self.aggregation.parent, self, self.aggregation.on ) elif self.parent is not None: relation = Relation("parent-child", self.parent, self) elif self.master is not None: relation = Relation("transformation", self.master, self) elif self.join is not None: for t in self.join.tables: relation = Relation("join", t, self, self.join.on) self.domain_dict.relations.append(relation) relation = None elif self.union is not None: for t in self.union.tables: relation = Relation("union", t, self) self.domain_dict.relations.append(relation) relation = None else: relation = None if relation is not None: self.domain_dict.relations.append(relation) self.columns = dict() if "columns" in self.block: columns_block = self.block["columns"] for column_block in columns_block: cblocks = Column.expand_macro1(column_block) for cblock in cblocks: column = Column(self.qualified_name, cblock) self.columns[] = column if self.master is not None and create_block and "select" in create_block: self.add_master_columns() self.all_predecessors = None self.base_predecessors = None
[docs] def add_master_columns(self): identifiers = Domain.get_select_from(self.block) if not identifiers: return for identifier in identifiers: if isinstance(identifier, sqlparse.sql.Identifier): c = identifier.get_name() if c in self.master.columns: self.add_column_from_master(c) else: self.add_column_from_sql(identifier) elif identifier.ttype[0] == "Wildcard": for c in self.master.columns: self.add_column_from_master(c)
[docs] def add_column_from_master(self, c: str): p_column = self.master.columns[c] column_block = {c: p_column.block} column = Column(self.qualified_name, column_block) column.copied = True self.columns[] = column
[docs] def add_column_from_sql(self, identifier: sqlparse.sql.Identifier): c = identifier.get_name() column = Column(self.qualified_name, {c: None}) column.expression = [identifier.value] column.copied = False identifiers = self.get_source_columns(identifier) column.predecessors.update(identifiers) self.columns[] = column
[docs] def get_source_columns(self, identifier: sqlparse.sql.Identifier) -> Set[str]: identifiers = set() for t in identifier.tokens: if t.is_keyword and t.value.lower() == "as": break if isinstance(t, sqlparse.sql.Identifier) or t.is_group: tname = t.get_name() if tname in self.master.columns: identifiers.add(tname) elif len(t.tokens) > 1: identifiers.update(self.get_source_columns(t)) return identifiers
[docs] def get_aggregation(self, schema: str, name: str, create_block: dict): if "group by" not in create_block: return None group_by_block :Dict[str, Dict] = create_block["group by"] assert isinstance(group_by_block, list) parent = create_block["from"] assert isinstance(parent, str) return Aggregation(schema, name, parent, group_by_block, self.domain_dict)
def __str__(self) -> str: return self.type + " " + self.qualified_name def __eq__(self, __value): if isinstance(__value, Table): if == and self.schema == __value.schema: return True return False
[docs] def is_downstream(self, predecessor: str): if predecessor not in self.domain_dict.tables: raise ValueError(f"Unknown requirement {predecessor}") t = self.domain_dict.tables[predecessor] for r in self.domain_dict.relations: if r.x == t and r.y == self: return True return False
[docs] def add_predecessors_for_column(self, c: str, lst: List): for t in self.columns[c.lower()].requires: if self.is_downstream(t): continue table = self.domain_dict.tables[t] relation = Relation("custom-select", table, self) self.domain_dict.relations.append(relation) lst.append(relation) return
[docs] def get_predecessors(self) -> List: if self.base_predecessors is None: self.base_predecessors = [] for relation in self.domain_dict.relations: if relation.y == self: self.base_predecessors.append(relation) self.all_predecessors = [] self.all_predecessors.extend(self.base_predecessors) for c in self.columns: self.add_predecessors_for_column(c, self.all_predecessors) return self.all_predecessors
[docs] def get_predecessors_for_column(self, column: str) -> List: if self.base_predecessors is None: self.get_predecessors() predecessors = [] predecessors.extend(self.base_predecessors) self.add_predecessors_for_column(column, predecessors) return predecessors
[docs] def calculate_column_lineage(self, column: Column, graph: Graph): links = self.get_column_links(column) if len(links) > 3: group = "parent_" + column.qualified_name else: group = None for link in links: graph.add_edge(link) parent = qstr(link.x.qualified_name) graph.add_node(parent, repr(link.x), group) link.relation.x.calculate_column_lineage(link.x, graph) return
[docs] def column_lineage_to_dot(self, column_name: str, out): column = self.columns[column_name] graph = Graph() graph.add_node(qstr(column.qualified_name), repr(column)) self.calculate_column_lineage(column, graph) graph.print(indent=1, file=out)
[docs] def describe_column_type(self, column: Column): if column.column_type: return column.column_type if column.copied: return "copied" if column.expression: return "computed" if self.aggregation: if in self.aggregation.columns: return "grouping" else: return "aggregated" if column.is_transformed(): return "transformed" return ""
[docs] def describe_html(self, basedir) -> str: text = start_table(border=0) text += start_invisible_row() text += start_table(border=2) header = f'<FONT POINT-SIZE="20"><b>{self.type}: {self.qualified_name}</b></FONT>' text += add_html_row([header], border=2) if self.reference: text += f'<tr><td align = "center" border = "0"><i>For more information see: {self.reference}</i></td></tr>\n' if self.description and "text" in self.description: value = html.escape(self.description["text"]) text += add_row([value]) if self.parent: name = self.parent.qualified_name text += add_row([f"Child table of {self.link_to_table(name)}"]) if self.aggregation: name = self.aggregation.parent.qualified_name text += add_row([f"Aggregated from {self.link_to_table(name)} on {self.aggregation.on}"]) if self.master: name = self.master.qualified_name text += add_row([f"Transformed from {self.link_to_table(name)}"]) elif self.join: name = self.join.sql text += add_row([f"Join of tables: {name}"]) if "primary_key" in self.block: pk = self.block["primary_key"] pk_text = ", ".join(pk) text += add_row([f"Primary Key: {pk_text}"]) text += end_table() text += end_invisible_row() text += hr() text += start_invisible_row() text += start_table(border=2) text += add_header_row(["Column name", "Column type", "Datatype"]) for c in sorted(self.columns): column = self.columns[c] text += add_row([ self.link_to_column(, basedir), self.describe_column_type(column), column.datatype ]) text += end_table() text += end_invisible_row() text += end_table() return text
[docs] def html(self, of: str, svg = None): body = self.describe_html(self.qualified_name) if svg: body += "<hr/>\n" body += f'<object data="{svg}" type="image/svg+xml"> </object>' block = HTML.format( title = f"Table {self.qualified_name}", body = body ) with open(of, "wt") as out: print(block, file=out)
[docs]class Aggregation: def __init__(self, schema: str, name: str, parent: str, columns: List, domain_dict): self.domain_dict = domain_dict self.table = self.domain_dict.tables[fqn(schema, name)] if '.' not in parent: parent = fqn(schema, parent) self.parent = self.domain_dict.tables[parent] self.columns = columns self.on = "On " + ", ".join(self.columns)
[docs]class Union: def __init__(self, schema: str, tables: List[str], domain_dict): self.domain_dict = domain_dict self.tables = [] for t in tables: if '.' not in t: t = fqn(schema, t) pattern = re.compile(t.replace('*', ".*")) for t2 in self.domain_dict.tables: if pattern.fullmatch(t2): self.tables.append(self.domain_dict.tables[t2]) return
[docs]class Join: def __init__(self, schema: str, s: str, domain_dict): self.domain_dict = domain_dict self.sql = s natural_join = "natural" in s s = s.lower() for keyword in ["natural", "left", "right", "full"]: s = s.replace(keyword, '') i = s.find("on") if i > 0: s = s[:i] self.on = s[i:] elif natural_join: self.on = "natural" else: self.on = None tables = s.split("join") self.tables = [] for table in tables: table = table.strip() if '.' not in table: table = fqn(schema, table) self.tables.append(self.domain_dict.tables[table]) return
[docs]class Relation: def __init__(self, reltype: str, x: Table, y: Table, data = None): self.type = reltype self.x = x self.y = y = data
[docs] def label(self): return if is not None else ''
[docs] def as_edge_attr(self) -> Dict: attrs = dict() if is None: attrs["label"] = '\t' + qstr(self.type) else: attrs["label"] = f'"\t{self.type} {self.label()}"' if self.type == "aggregation": attrs["penwidth"] = 5 attrs["color"] = "blue" elif self.type == "union": attrs["penwidth"] = 2 attrs["color"] = "chocolate" elif self.type == "parent-child": attrs["penwidth"] = 2 attrs["color"] = "darkorchid" elif self.type == "custom-select": attrs["penwidth"] = 1 attrs["color"] = "chartreuse" else: attrs["penwidth"] = 1 attrs["color"] = "black" return attrs
def __str__(self) -> str: data = self.label() return f'{self.type}: {str(self.x)} => {str(self.y)} {data}'