From 831a85639bed812bf6817bb5a477531261694654 Mon Sep 17 00:00:00 2001
From: Jose Gracia
Date: Thu, 26 Oct 2023 16:19:16 +0200
Subject: [PATCH] Add prototype to query job power from database

---
 monitoring/db/README.md                     | 17 ++
 monitoring/db/scripts/get_detailed_power.py | 174 ++++++++++++++++++++
 2 files changed, 191 insertions(+)
 create mode 100644 monitoring/db/scripts/get_detailed_power.py

diff --git a/monitoring/db/README.md b/monitoring/db/README.md
index 9190f9b..e29ecd3 100644
--- a/monitoring/db/README.md
+++ b/monitoring/db/README.md
@@ -1,3 +1,20 @@
 # hawk-utils-scripts/monitoring/db
 
 Use this directory for scripts, etc, that operate on or extract data from the the database.
+
+## scripts/get_detailed_power.py
+
+Python script which queries the database for a list of jobids and produces a file for each jobid.
+
+Those files contain time-resolved power consumption data and are meant to be consumed by the utility in [monitoring/logs/scripts/plot_energy_logs.sh](../logs/README.md#scripts/plot_energy_logs.sh).
+ +Requirements: + - Python module sqlalchemy `python -m pip install sqlalchemy` + + Usage: + ```bash + ssh monitoring + ./get_detailed_power.py 234556 767869.hawk-pbs5 + Saved file detailed_power.234556.hawk-pbs5.100000-1000004.csv + Saved file detailed_power.767869.hawk-pbs5.2432345-423423.csv + ``` diff --git a/monitoring/db/scripts/get_detailed_power.py b/monitoring/db/scripts/get_detailed_power.py new file mode 100644 index 0000000..023e4ba --- /dev/null +++ b/monitoring/db/scripts/get_detailed_power.py @@ -0,0 +1,174 @@ +import sqlalchemy as db +import numpy as np +from collections import OrderedDict +import os.path + +def init_db(): + _verbose = False #True + engine = db.create_engine('postgresql://hpc@hawk-monitor4:5432/coe_mon', echo=_verbose) + conn = engine.connect() + return conn + +def init_query(jobid, interval): + query_string = """with job as ( + select job_id, starttime, endtime, nodes from jobs where job_id='{jobid}.hawk-pbs5' +), +node_series as( + select n.name, scmcavg.id as series_id from nodes n + inner join (select * from label where key='node') l on n.id = l.value::int + inner join series_cmc_power_racktraynodepoweravg scmcavg on l.id = scmcavg.labels[( + select pos from label_key_position + where metric_category= 'cmc_power' + and metric_name = 'RackTrayNodePowerAvg' + and key = 'node' + )] + where n.id = any((select nodes from job)::int[]) +) +select a.time, a.value, ns.name from ( + select + time_bucket(extract ('epoch' from '{interval} seconds'::interval)::int*1000, cmcavg.ts) as time, + cmcavg.series_id::varchar, + avg(cmcavg.val) AS value + from cmc_power_racktraynodepoweravg cmcavg +where + ts <= (select endtime from job) + and ts >= (select starttime from job) + and series_id = Any(select series_id from node_series) +group by time, cmcavg.series_id order by time desc) a +inner join node_series ns on a.series_id::int = ns.series_id; +""" + return db.text(query_string.format(jobid=jobid, interval=interval)) + +def 
db_to_list(connection, query): + return connection.execute(query).fetchall() + +def db_to_pf(connection, query): + return pd.read_sql(query, con=connection) + +class Power: + def __init__(self, nodes): + self.nodes = nodes + self.epochs = OrderedDict() + self.first_ts = None + self.last_ts = None + + def insert_epoch(self, ts, values): + self.epochs[ts] = values + if not self.first_ts: + self.first_ts = ts + self.last_ts = ts + + @classmethod + def from_list(cls, data): + """Assumes data is a list of tuples (timestamp, value, node)""" + nodes = list(set([line[2] for line in data])) + cls = Power(nodes) + + #times = list(set([line[0] for line in data])) + + # for now ignore order to nodes + values = {} + for l in data: + ts = l[0] + if ts not in values: + values[ts] = [] + # node = l[1] + power = l[1] + values[ts].append(power) + + epochs = sorted(values.keys()) + for epoch in epochs: + cls.insert_epoch(epoch, values[epoch]) + + return cls + + def header(self): + hd = "# all timestamp have unit miliseconds since unix epoch\n" + hd += "# all power values have unit Watt\n" + hd += "timestamp,delta_t,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_node_power" + # add node names here instead + hd += ",NO_NODE_NAMES_YET\n" + return hd + + @staticmethod + def summarize_values(val): + values = np.asarray(val) + head = values[0] + min, max = values.min(), values.max() + avg, stddev = values.mean(), values.std() + median = np.median(values) + return head, avg, median, min, max, stddev + + + @staticmethod + def summarize_time(ts): + return ts, -1 + + @staticmethod + def summarize_epoch(epoch): + ts, values = epoch + return Power.summarize_time(ts) \ + + Power.summarize_values(values) + # + values + + @staticmethod + def pretty_print(args): + return ",".join(str(a) for a in args) + '\n' + + + def body(self): + _body = "" + for epoch in self.epochs.items(): + _body += Power.pretty_print(self.summarize_epoch(epoch)) + return _body + + def 
filename(self, jobid): + fname = "detailed_power_{jobid}.hawk-pbs5.{first}-{last}.csv".format( + jobid=jobid, first=self.first_ts, last=self.last_ts + ) + return fname + + + def to_file(self, jobid): + """Dumps power data to file. Returns filename is succesfull and None if unsucessfull.""" + fname = self.filename(jobid) + if os.path.exists(fname): + print("Error: cowardly refusing to overwrite file ", fname) + return None + + try: + with open(fname, "w+") as f: + f.write(self.header()) + f.write(self.body()) + except IOError: + print("Error: could not write to file ", filename) + fname = None + + return fname + + +if __name__ == "__main__": + conn = init_db() + jobid = "2260215" + interval = 5 + query = init_query(jobid, interval) + + all_list = db_to_list(conn, query) + #all_df = db_to_df(conn, query) + + power = Power.from_list(all_list) # , jobid, interval) + print("#epochs", len(power.epochs)) + + print(power.header()) + + epochs_iter = iter(power.epochs.items()) + ts, values = next(epochs_iter) + print(ts, values) + print('time:', power.summarize_time(ts)) + print('values:', power.summarize_values(values)) + print('epoch:', power.pretty_print(power.summarize_epoch((ts, values)))) + print("filename: ", power.to_file(jobid)) + + + +