"""Fetch per-node power readings of a batch job and dump a CSV summary.

The readings come from the ``coe_mon`` PostgreSQL database; every time
bucket ("epoch") is reduced to a handful of statistics over all nodes of
the job.
"""
import os.path
from collections import OrderedDict

import numpy as np

# Default connection URL used by init_db().
DEFAULT_DB_URL = 'postgresql://hpc@hawk-monitor4:5432/coe_mon'

# Query template; `{jobid}` and `{interval}` are filled in by init_query().
# The pieces concatenate to exactly one space-separated SQL statement.
_QUERY_TEMPLATE = (
    "with job as ( "
    "select job_id, starttime, endtime, nodes from jobs "
    "where job_id='{jobid}.hawk-pbs5' "
    "), "
    "node_series as( "
    "select n.name, scmcavg.id as series_id from nodes n "
    "inner join (select * from label where key='node') l "
    "on n.id = l.value::int "
    "inner join series_cmc_power_racktraynodepoweravg scmcavg "
    "on l.id = scmcavg.labels[( "
    "select pos from label_key_position "
    "where metric_category= 'cmc_power' "
    "and metric_name = 'RackTrayNodePowerAvg' "
    "and key = 'node' "
    ")] "
    "where n.id = any((select nodes from job)::int[]) "
    ") "
    "select a.time, a.value, ns.name from ( "
    "select time_bucket("
    "extract ('epoch' from '{interval} seconds'::interval)::int*1000, "
    "cmcavg.ts) as time, "
    "cmcavg.series_id::varchar, avg(cmcavg.val) AS value "
    "from cmc_power_racktraynodepoweravg cmcavg "
    "where ts <= (select endtime from job) "
    "and ts >= (select starttime from job) "
    "and series_id = Any(select series_id from node_series) "
    "group by time, cmcavg.series_id order by time desc) a "
    "inner join node_series ns on a.series_id::int = ns.series_id; "
)


def init_db(url=DEFAULT_DB_URL, echo=False):
    """Open a connection to the monitoring database.

    Args:
        url: SQLAlchemy database URL; defaults to DEFAULT_DB_URL.
        echo: Passed to ``create_engine``; when true SQLAlchemy logs the
            statements it executes.

    Returns:
        The connection obtained from ``engine.connect()``. The caller is
        responsible for closing it.
    """
    import sqlalchemy as db

    engine = db.create_engine(url, echo=echo)
    return engine.connect()


def init_query(jobid, interval):
    """Build the power query for one job.

    Args:
        jobid: Job number without the ``.hawk-pbs5`` suffix (the suffix is
            appended by the query).
        interval: Width of the averaging time bucket in seconds.

    Returns:
        A ``sqlalchemy.text`` clause selecting ``(time, value, name)`` rows.
    """
    import sqlalchemy as db

    # NOTE(review): both values are interpolated into the SQL text with
    # str.format, not bound as parameters. Only pass trusted values here.
    return db.text(_QUERY_TEMPLATE.format(jobid=jobid, interval=interval))


def db_to_list(connection, query):
    """Execute `query` on `connection` and return all rows as a list."""
    return connection.execute(query).fetchall()


def db_to_pf(connection, query):
    """Execute `query` on `connection` and return the result via pandas."""
    # Imported locally, like sqlalchemy above, so that pandas is only
    # required when this function is actually used.
    import pandas as pd

    return pd.read_sql(query, con=connection)


class Power:
    """Power readings of a set of nodes, grouped by timestamp.

    Attributes are plain and public:
      nodes    -- list of node names the readings belong to
      epochs   -- OrderedDict mapping timestamp -> list of power values
      first_ts -- timestamp of the first inserted epoch (None while empty)
      last_ts  -- timestamp of the most recently inserted epoch
    """

    def __init__(self, nodes):
        self.nodes = nodes
        self.epochs = OrderedDict()
        self.first_ts = None
        self.last_ts = None

    def insert_epoch(self, ts, values):
        """Store `values` under timestamp `ts` and update first/last."""
        self.epochs[ts] = values
        # Compare against None explicitly: a timestamp of 0 is a valid
        # first epoch and must not be overwritten by later inserts.
        if self.first_ts is None:
            self.first_ts = ts
        self.last_ts = ts

    @classmethod
    def from_list(cls, data):
        """Build an instance from a list of (timestamp, value, node) tuples.

        Epochs are inserted in ascending timestamp order. Within an epoch
        the values keep the order in which they appear in `data`; they are
        not (yet) aligned with `nodes`.
        """
        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # node list is reproducible between runs.
        nodes = list(dict.fromkeys(row[2] for row in data))
        power = cls(nodes)

        grouped = {}
        for row in data:
            grouped.setdefault(row[0], []).append(row[1])

        for ts in sorted(grouped):
            power.insert_epoch(ts, grouped[ts])
        return power

    def header(self):
        """Return the CSV header: two comment lines plus the column names."""
        hd = "# all timestamp have unit miliseconds since unix epoch\n"
        hd += "# all power values have unit Watt\n"
        hd += "timestamp,delta_t,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_node_power"
        # TODO: add the node names here instead of the placeholder.
        hd += ",NO_NODE_NAMES_YET\n"
        return hd

    @staticmethod
    def summarize_values(val):
        """Reduce the power values of one epoch.

        Returns:
            Tuple ``(head, avg, median, min, max, stddev)`` where ``head``
            is simply the first element of `val`.
        """
        values = np.asarray(val)
        # NOTE(review): "head" relies on the first value belonging to the
        # head node; from_list does not enforce any node order -- confirm.
        head = values[0]
        lowest, highest = values.min(), values.max()
        avg, stddev = values.mean(), values.std()
        median = np.median(values)
        return head, avg, median, lowest, highest, stddev

    @staticmethod
    def summarize_time(ts):
        """Return ``(ts, delta_t)``; delta_t is the placeholder -1 for now."""
        return ts, -1

    @staticmethod
    def summarize_epoch(epoch):
        """Summarize one ``(timestamp, values)`` pair as a flat tuple."""
        ts, values = epoch
        return Power.summarize_time(ts) + Power.summarize_values(values)

    @staticmethod
    def pretty_print(args):
        """Format `args` as one comma-separated, newline-terminated line."""
        return ",".join(str(a) for a in args) + '\n'

    def body(self):
        """Return all epochs as CSV lines, in insertion order."""
        return "".join(
            Power.pretty_print(self.summarize_epoch(epoch))
            for epoch in self.epochs.items()
        )

    def filename(self, jobid):
        """Return the output file name for `jobid` and the covered time span."""
        return "detailed_power_{jobid}.hawk-pbs5.{first}-{last}.csv".format(
            jobid=jobid, first=self.first_ts, last=self.last_ts
        )

    def to_file(self, jobid):
        """Dump the power data to a CSV file in the current directory.

        An existing file is never overwritten.

        Returns:
            The file name if successful, None otherwise.
        """
        fname = self.filename(jobid)
        # Render first so that a formatting error cannot leave an empty file.
        content = self.header() + self.body()
        try:
            # Mode "x" creates the file exclusively: the existence check and
            # the creation happen in one step, without a race in between.
            with open(fname, "x") as f:
                f.write(content)
        except FileExistsError:
            print("Error: cowardly refusing to overwrite file ", fname)
            return None
        except IOError:
            print("Error: could not write to file ", fname)
            return None
        return fname


def main():
    """Fetch the power data of one hard-coded job, print a sample, dump it."""
    jobid = "2260215"
    interval = 5

    conn = init_db()
    try:
        all_list = db_to_list(conn, init_query(jobid, interval))
    finally:
        # The rows are fully fetched, so the connection is no longer needed.
        conn.close()

    power = Power.from_list(all_list)

    print("#epochs", len(power.epochs))
    print(power.header())

    epochs_iter = iter(power.epochs.items())
    ts, values = next(epochs_iter)
    print(ts, values)
    print('time:', power.summarize_time(ts))
    print('values:', power.summarize_values(values))
    print('epoch:', power.pretty_print(power.summarize_epoch((ts, values))))

    print("filename: ", power.to_file(jobid))


if __name__ == "__main__":
    main()