"""
A utility that prints the statistics about
currently running indexing processes
"""
# Copyright (c) 2021. Harvard University
#
# Developed by Research Software Engineering,
# Faculty of Arts and Sciences, Research Computing (FAS RC)
# Author: Michael A Bouzinier
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import datetime
import logging
import threading
import time
from typing import List, Dict, Optional, Callable
from psycopg2.extras import RealDictCursor
from psycopg2.extensions import connection
from nsaph.db import Connection
from nsaph.loader.common import DBConnectionConfig
from nsaph_utils.utils.context import Argument, Cardinality
INDEX_MON_SQL12 = """
SELECT
now()::TIME(0),
p.command,
a.query,
p.phase,
p.blocks_total,
p.blocks_done,
p.tuples_total,
p.tuples_done,
p.pid
FROM pg_stat_progress_create_index p
JOIN pg_stat_activity a ON p.pid = a.pid
"""
INDEX_MON_SQL11 = """
SELECT
now()::TIME(0),
a.query,
a.state
FROM pg_stat_activity a
WHERE a.query LIKE 'CREATE%INDEX%'
"""
ACTIVITY_QUERY = """
SELECT
"datname",
"pid",
"leader_pid",
"usename",
"application_name",
"client_addr",
"backend_start",
"xact_start",
"backend_xid",
"query_start",
"state_change",
"wait_event_type",
"wait_event",
"state",
"backend_xmin",
"query"
FROM
"pg_catalog"."pg_stat_activity"
"""
LOCK_QUERY = """
SELECT blocked_locks.pid AS blocked_pid,
blocked_activity.usename AS blocked_user,
blocking_locks.pid AS blocking_pid,
blocking_activity.usename AS blocking_user,
blocked_activity.application_name AS blocked_application,
blocking_activity.application_name AS blocking_application
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity
ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks
ON blocking_locks.locktype = blocked_locks.locktype
AND blocking_locks.DATABASE IS NOT DISTINCT FROM blocked_locks.DATABASE
AND blocking_locks.relation IS NOT DISTINCT FROM blocked_locks.relation
AND blocking_locks.page IS NOT DISTINCT FROM blocked_locks.page
AND blocking_locks.tuple IS NOT DISTINCT FROM blocked_locks.tuple
AND blocking_locks.virtualxid IS NOT DISTINCT FROM blocked_locks.virtualxid
AND blocking_locks.transactionid IS NOT DISTINCT FROM blocked_locks.transactionid
AND blocking_locks.classid IS NOT DISTINCT FROM blocked_locks.classid
AND blocking_locks.objid IS NOT DISTINCT FROM blocked_locks.objid
AND blocking_locks.objsubid IS NOT DISTINCT FROM blocked_locks.objsubid
AND blocking_locks.pid != blocked_locks.pid
JOIN pg_catalog.pg_stat_activity blocking_activity
ON blocking_activity.pid = blocking_locks.pid
WHERE
NOT blocked_locks.GRANTED;
"""
ACTIVITY_BY_PID = ACTIVITY_QUERY + "WHERE pid = {pid} or leader_pid = {pid}"
ACTIVITY_BY_DB = ACTIVITY_QUERY + "WHERE datname = '{}'"
[docs]class Lock:
def __init__(self, data: List):
self.blocked_pid = data[0]
self.blocking_pid = data[2]
self.blocked_user = data[1]
self.blocking_user = data[3]
self.blocked_app = data[4]
self.blocking_app = data[5]
def __str__(self) -> str:
msg = "{}[{:d}] is blocked by {}@{}[{}]".format(
self.blocked_app, self.blocked_pid,
self.blocking_user, self.blocking_app, self.blocking_pid
)
return super().__str__()
[docs]class DBMonitorConfig(DBConnectionConfig):
_pid = Argument("pid",
help = "Display monitoring information only for selected process ids",
type = int,
required = False,
aliases = ["p"],
default = None,
cardinality = Cardinality.multiple
)
_state = Argument("state",
help = "Show only processes in the given state",
type = str,
required = False,
default=None,
cardinality = Cardinality.single
)
def __init__(self, subclass, doc):
self.pid:List[int] = []
''' process id list '''
self.state:Optional[str] = None
''' Display only processes in the given state '''
if subclass is None:
super().__init__(DBMonitorConfig, doc)
else:
super().__init__(subclass, doc)
self._attrs += [
attr[1:] for attr in DBMonitorConfig.__dict__
if attr[0] == '_' and attr[1] != '_'
]
[docs]class DBActivityMonitor:
[docs] @classmethod
def get_instance (cls, context: DBConnectionConfig) -> DBMonitorConfig:
if isinstance(context, DBMonitorConfig):
return context
if isinstance(context, DBConnectionConfig):
obj = DBMonitorConfig(None, __doc__)
obj.connection = context.connection
obj.db = context.db
obj.verbose = context.verbose
return obj
raise TypeError(
"{} cannot be cast to DBMonitorConfig"
.format(str(context))
)
def __init__(self, context: DBConnectionConfig = None):
if context:
context = self.get_instance(context)
else:
context = DBMonitorConfig(None, __doc__).instantiate()
self.context = context
self.connection = None
self.blocks: Dict[int,Lock] = dict()
[docs] def run(self):
self.get_blocks()
for lock in self.blocks.values():
print(str(lock))
for msg in self.get_indexing_progress():
print(msg)
if self.context.pid:
for pid in self.context.pid:
for msg in self.get_activity(pid):
print(msg)
else:
for msg in self.get_activity():
print(msg)
def _connect(self):
self.connection = Connection(self.context.db,
self.context.connection,
silent=True,
app_name_postfix=".monitor")
return self.connection.connect()
[docs] def get_blocks(self):
with self._connect() as cnxn:
with cnxn.cursor() as cursor:
cursor.execute(LOCK_QUERY)
for row in cursor:
lock = Lock(row)
self.blocks[lock.blocked_pid] = lock
return
[docs] def get_indexing_progress(self) -> List[str]:
with self._connect() as cnxn:
cursor = cnxn.cursor()
version = cnxn.info.server_version
if version > 120000:
sql = INDEX_MON_SQL12
else:
sql = INDEX_MON_SQL11
cursor.execute(sql)
msgs = []
for row in cursor:
if version > 120000:
t = row[0]
c = row[1]
q = row[2][len(c):].strip().split(" ")
if q:
n = "None"
for x in q:
if x not in ["IF", "NOT", "EXISTS"]:
n = x
break
else:
n = "?"
p = row[3]
b = row[5] * 100.0 / row[4] if row[4] else 0
tp = row[7] * 100.0 / row[6] if row[6] else 0
pid = row[8]
msgs.append("[{}] {}: {}. Blocks: {:2.0f}%, Tuples: {:2.0f}%. PID = {:d}"
.format(str(t), p, n, b, tp, pid))
else:
t = row[0]
q = row[2]
s = row[2]
msgs.append("[{}] {}: {}".format(t, s, q))
return msgs
[docs] def get_activity(self, pid: int = None) -> List[str]:
msgs = []
leaders: List[Dict] = []
workers: List[Dict] = []
with self._connect() as c:
with c.cursor() as cursor:
cursor.execute("SELECT now()")
for row in cursor:
now = row[0]
break
with c.cursor(cursor_factory=RealDictCursor) as cursor:
if pid:
sql = ACTIVITY_BY_PID.format(pid = pid)
else:
db = self.connection.parameters["database"]
sql = ACTIVITY_BY_DB.format(db)
if (self.context.verbose):
print(sql)
cursor.execute(sql)
for row in cursor:
if row["leader_pid"]:
workers.append(row.copy())
else:
leaders.append(row.copy())
for l in [leaders, workers]:
for p in l:
if self.context.state is not None:
if p["state"] != self.context.state:
continue
if self.context.verbose:
activity = Activity(p, self.blocks, now, -1)
else:
activity = Activity(p, self.blocks, now)
msgs.append(str(activity))
return msgs
[docs] @staticmethod
def execute(what: Callable, on_monitor: Callable):
x = threading.Thread(target=what)
x.start()
n = 0
step = 100
while x.is_alive():
time.sleep(0.1)
n += 1
if (n % step) == 0:
on_monitor()
if n > 100000:
step = 6000
elif n > 10000:
step = 600
x.join()
[docs] def log_activity(self, pid: int):
activity = self.get_activity(pid)
for msg in activity:
logging.info(msg)
return
[docs]class Activity:
def __init__(self, activity: Dict, known_blocks, now: datetime, msg_len=32):
self.now = now
self.msg_len = msg_len
self.database = activity["datname"]
self.pid = int(activity["pid"])
self.leader = int(activity["leader_pid"]) if activity["leader_pid"] else None
self.app = activity["application_name"]
self.state = activity["state"] if activity["state"] else "wait"
self.blocked_by = ""
if self.state == "wait" or (
self.state == "active"
and activity["wait_event_type"]
and activity["wait_event"]
):
self.wait = "{} waiting for {}".format(
activity["wait_event"], activity["wait_event_type"]
)
if self.pid in known_blocks:
self.blocked_by = " [{}]".format(known_blocks[self.pid])
else:
self.wait = ""
self.xid = activity["backend_xid"]
self.in_transaction = True if self.xid else False
self.query = activity["query"]
self.start = activity["backend_start"]
self.last = activity["state_change"]
if self.in_transaction:
self.xact_start = activity["xact_start"]
else:
self.xact_start = None
if self.query:
self.query_start = activity["query_start"]
else:
self.query_start = None
def __str__(self):
msg = "{:d}".format(self.pid)
if self.leader:
msg += " <= {}".format(self.leader)
msg += " {}".format(self.state)
if self.app:
if self.database:
app = " [{}:{}]".format(self.app, self.database)
else:
app = " [{}]".format(self.app)
msg += app
msg += ". Started at {}, running for {}".format(
str(self.start),
str(self.now - self.start)
)
if self.in_transaction:
msg += ". Transaction {} started at {}, running {}".format(
self.xid, str(self.xact_start),
str(self.now - self.xact_start)
)
if self.wait:
if self.now and self.last:
wait_duration = self.now - self.last
else:
wait_duration = "Unknown"
msg += ". {} from {} for {}".format(
self.wait.capitalize(),
str(self.last),
str(wait_duration)
)
if self.blocked_by:
msg += self.blocked_by
if self.query:
if self.msg_len < 0:
msg += ". Executing: \n{}".format(self.query)
else:
msg += ". Executing: {}".format(self.query[:self.msg_len])
return msg
if __name__ == '__main__':
DBActivityMonitor().run()