hawk-utils-scripts/monitoring/db/scripts/get_detailed_power.py

228 lines
5.9 KiB
Python
Raw Normal View History

2023-11-22 15:04:30 +00:00
#!/usr/bin/env python3
2023-11-20 10:39:11 +00:00
import argparse
import numpy as np
from collections import OrderedDict
import os.path
2023-11-17 15:25:51 +00:00
2023-11-22 13:35:19 +00:00
def parse_arguments(args):
2023-11-17 15:25:51 +00:00
parser = argparse.ArgumentParser(
description='Produce detailed power usage data for a list of jobids.')
parser.add_argument('-v', '--verbose', action='store_true',
help='Show database querries, etc.')
parser.add_argument('-t', '--interval', action='store', type=float, default=5.0,
help="Interval between power values in seconds")
2023-11-17 15:25:51 +00:00
parser.add_argument('jobid', type=parse_jobid,
2023-11-22 09:30:36 +00:00
nargs='+',
2023-11-20 10:39:11 +00:00
help='Job ID such as "2260215" or "2260215.hawk-pbs5"')
2023-11-17 15:25:51 +00:00
2023-11-22 13:35:19 +00:00
return parser.parse_args(args)
2023-11-17 15:25:51 +00:00
def parse_jobid(s):
import re
hawkpbs = r'.hawk-pbs5'
jobid = re.sub(hawkpbs, '', s)
if not jobid.isdigit():
raise argparse.ArgumentTypeError(f'invalid job ID "{s}"')
return jobid
class Power:
def __init__(self, nodes, interval=-1):
self.nodes = nodes
self.epochs = OrderedDict()
self.first_ts = None
self.last_ts = None
self.interval = interval
@classmethod
def from_list(cls, data):
2024-02-07 09:20:13 +00:00
"""Assumes data is a list of tuples (timestamp, node, value)"""
idx_ts = 0; idx_node = 1; idx_value = 2
nodes = list(set([line[idx_node] for line in data]))
cls = Power(nodes)
2023-11-20 15:04:19 +00:00
# for now ignore order to nodes
values = {}
for l in data:
2024-02-07 09:20:13 +00:00
ts = l[idx_ts]
if ts not in values:
values[ts] = []
# node = l[1]
2024-02-07 09:20:13 +00:00
power = l[idx_value]
values[ts].append(power)
epochs = sorted(values.keys())
for epoch in epochs:
cls.insert_epoch(epoch, values[epoch])
return cls
2023-11-20 14:53:52 +00:00
@classmethod
def from_db(cls, db, jobid, interval):
all_list = db.db_to_list(jobid, interval)
2023-11-22 09:52:49 +00:00
if not all_list:
raise RuntimeError
power = cls.from_list(all_list)
power.set_interval(interval*1000) # milliseconds
return power
2024-02-07 10:31:30 +00:00
def to_file(self, jobid, header=""):
2023-11-20 14:53:52 +00:00
"""Dumps power data to file. Returns filename is succesfull and None if unsucessfull."""
fname = self.filename(jobid)
if os.path.exists(fname):
print("Error: cowardly refusing to overwrite file ", fname)
return None
try:
with open(fname, "w+") as f:
2024-02-07 10:31:30 +00:00
f.write(header + self.header())
2023-11-20 14:53:52 +00:00
f.write(self.body())
except IOError:
2023-11-22 13:46:51 +00:00
print("Error: could not write to file ", fname)
2023-11-20 14:53:52 +00:00
fname = None
return fname
def set_interval(self, interval):
self.interval = interval
2023-11-20 14:53:52 +00:00
def insert_epoch(self, ts, values):
self.epochs[ts] = values
if not self.first_ts:
self.first_ts = ts
self.last_ts = ts
def header(self):
hd = "# all timestamp have unit miliseconds since unix epoch\n"
hd += "# all power values have unit Watt\n"
hd += "timestamp,delta_t,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_node_power"
# add node names here instead
hd += ",NO_NODE_NAMES_YET\n"
return hd
2023-11-20 14:53:52 +00:00
def body(self):
_body = ""
for epoch in self.epochs.items():
2023-11-22 13:46:25 +00:00
_body += self.pretty_print(self.summarize_epoch(epoch))
2023-11-20 14:53:52 +00:00
return _body
def summarize_time(self, ts):
return ts, self.interval
2023-11-20 14:53:52 +00:00
@staticmethod
def summarize_values(val):
values = np.asarray(val)
head = values[0]
min, max = values.min(), values.max()
avg, stddev = values.mean(), values.std()
median = np.median(values)
return head, avg, median, min, max, stddev
def summarize_epoch(self, epoch):
ts, values = epoch
return self.summarize_time(ts) \
+ self.summarize_values(values)
# + values
@staticmethod
def pretty_print(args):
return ",".join(str(a) for a in args) + '\n'
def filename(self, jobid):
fname = "detailed_power_{jobid}.hawk-pbs5.{first}-{last}.csv".format(
jobid=jobid, first=self.first_ts, last=self.last_ts
)
return fname
2023-11-22 09:57:11 +00:00
2023-11-20 15:21:54 +00:00
class MonitoringDB:
2024-02-06 08:04:52 +00:00
QUERY_STRING_HAWK = """
-- For description of get_job_data(), see https://kb.hlrs.de/monitoring/index.php/TimescaleDB_-_Query_Guidelines#Function:_get_job_data_and_get_ai_job_data
select * from get_job_data(
'{jobid}.hawk-pbs5',
'cmc_power_racktraynodepoweravg', -- power data source
'{interval} seconds',
array['avg'], -- aggregation: average across samples in bucket
array['time','node'] -- sort by time first than node (ascending)
)
as t(time bigint, name varchar, avg double precision);
"""
2024-02-06 08:04:52 +00:00
def __init__(self, verbose):
self.connection = self.init_db(verbose)
@staticmethod
def init_db(verbose):
import sqlalchemy as db
_verbose = False #True
engine = db.create_engine('postgresql://hpc@hawk-monitor4:5432/coe_mon', echo=verbose)
connection = engine.connect()
return connection
def close_db(self):
return self.connection.close()
@classmethod
def build_query(cls, jobid, interval):
import sqlalchemy as db
query_string = cls.QUERY_STRING_HAWK
2023-11-20 15:21:54 +00:00
return db.text(query_string.format(jobid=jobid, interval=interval))
def db_to_list(self, jobid, interval):
query = self.build_query(jobid, interval)
return self.connection.execute(query).fetchall()
2023-11-20 15:21:54 +00:00
def db_to_pf(self, jobid, inerval):
query = self.build_query(jobid, interval)
return pd.read_sql(query, con=self.connection)
2023-11-22 09:57:11 +00:00
2023-11-22 09:30:36 +00:00
class App:
def __init__(self, config):
self.config = config
self.db = MonitoringDB(self.config.verbose)
2024-02-07 13:06:53 +00:00
@staticmethod
def warnings(config):
warn = ""
if config.interval < 5:
warn += '# Warning: interval<5 is very small and may lead to data gaps.'
return warn
2023-11-22 09:30:36 +00:00
def run_all(self):
2024-02-07 13:06:53 +00:00
warnings = self.warnings(self.config)
if warnings:
print(warnings)
header = f"# {config.datetime}: {config.cmd}\n"
if warnings:
header += f"{warnings}\n"
header += "#\n"
2023-11-22 09:30:36 +00:00
for jobid in self.config.jobid:
2023-11-22 09:52:49 +00:00
try:
power = Power.from_db(self.db, jobid, self.config.interval)
except RuntimeError:
print('No data found for job ID "{}"'.format(jobid))
continue
2023-11-22 09:57:11 +00:00
2024-02-07 10:31:30 +00:00
fn = power.to_file(jobid, header)
2023-11-22 09:30:36 +00:00
if fn:
print('Created file {fn}'.format(fn=fn))
if __name__ == "__main__":
2023-11-22 13:35:19 +00:00
import sys
2024-02-07 10:31:30 +00:00
from datetime import datetime
2023-11-22 13:35:19 +00:00
config = parse_arguments(sys.argv[1:])
2024-02-07 10:31:30 +00:00
config.cmd = " ".join(sys.argv)
config.datetime = f"{datetime.now()}"
2023-11-17 15:25:51 +00:00
2023-11-22 09:30:36 +00:00
main = App(config)
main.run_all()