"""Produce detailed power usage data for a list of job IDs from the Hawk monitoring database."""

import argparse
import os.path
from collections import OrderedDict

import numpy as np


def parse_arguments():
    parser = argparse.ArgumentParser(
        description='Produce detailed power usage data for a list of jobids.')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Show database queries, etc.')
    parser.add_argument('jobid', type=parse_jobid, nargs='+',
                        help='Job ID such as "2260215" or "2260215.hawk-pbs5"')
    return parser.parse_args()


def parse_jobid(s):
    import re
    hawkpbs = r'\.hawk-pbs5'
    jobid = re.sub(hawkpbs, '', s)
    if not jobid.isdigit():
        raise argparse.ArgumentTypeError(f'invalid job ID "{s}"')
    return jobid


class Power:
    """Per-job power time series: one list of node power values per timestamp (epoch)."""

    def __init__(self, nodes):
        self.nodes = nodes
        self.epochs = OrderedDict()
        self.first_ts = None
        self.last_ts = None

    @classmethod
    def from_list(cls, data):
        """Assumes data is a list of tuples (timestamp, value, node)."""
        nodes = list(set(line[2] for line in data))
        power = cls(nodes)
        # For now the order of the nodes is ignored; values are only grouped by timestamp.
        values = {}
        for line in data:
            ts = line[0]
            val = line[1]
            values.setdefault(ts, []).append(val)
        for epoch in sorted(values):
            power.insert_epoch(epoch, values[epoch])
        return power

    @classmethod
    def from_db(cls, db, jobid, interval):
        all_list = db.db_to_list(jobid, interval)
        return cls.from_list(all_list)

    def to_file(self, jobid):
        """Dump power data to a CSV file.

        Returns the filename if successful and None otherwise.
        """
        fname = self.filename(jobid)
        if os.path.exists(fname):
            print("Error: cowardly refusing to overwrite file ", fname)
            return None

        try:
            with open(fname, "w+") as f:
                f.write(self.header())
                f.write(self.body())
        except IOError:
            print("Error: could not write to file ", fname)
            fname = None

        return fname

    def insert_epoch(self, ts, values):
        self.epochs[ts] = values
        if not self.first_ts:
            self.first_ts = ts
        self.last_ts = ts

    def header(self):
        hd = "# all timestamps have unit milliseconds since the Unix epoch\n"
        hd += "# all power values have unit Watt\n"
        hd += "timestamp,delta_t,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_node_power"
        # TODO: add the actual node names here instead
        hd += ",NO_NODE_NAMES_YET\n"
        return hd

    def body(self):
        _body = ""
        for epoch in self.epochs.items():
            _body += Power.pretty_print(self.summarize_epoch(epoch))
        return _body

    @staticmethod
    def summarize_time(ts):
        # delta_t is not computed yet; -1 is a placeholder
        return ts, -1

    @staticmethod
    def summarize_values(val):
        values = np.asarray(val)
        head = values[0]
        min_, max_ = values.min(), values.max()
        avg, stddev = values.mean(), values.std()
        median = np.median(values)
        return head, avg, median, min_, max_, stddev

    @staticmethod
    def summarize_epoch(epoch):
        ts, values = epoch
        return Power.summarize_time(ts) \
            + Power.summarize_values(values)
        # + values

    @staticmethod
    def pretty_print(args):
        return ",".join(str(a) for a in args) + '\n'

    def filename(self, jobid):
        fname = "detailed_power_{jobid}.hawk-pbs5.{first}-{last}.csv".format(
            jobid=jobid, first=self.first_ts, last=self.last_ts)
        return fname


class MonitoringDB:
    """Connection to the coe_mon PostgreSQL monitoring database on hawk-monitor4."""

    def __init__(self, verbose):
        self.connection = self.init_db(verbose)

    @staticmethod
    def init_db(verbose):
        import sqlalchemy as db
        engine = db.create_engine('postgresql://hpc@hawk-monitor4:5432/coe_mon',
                                  echo=verbose)
        connection = engine.connect()
        return connection

    def close_db(self):
        return self.connection.close()

    @staticmethod
    def build_query(jobid, interval):
        import sqlalchemy as db
        query_string = """\
with job as (
    select job_id, starttime, endtime, nodes from jobs
    where job_id='{jobid}.hawk-pbs5'
),
node_series as (
    select n.name, scmcavg.id as series_id
    from nodes n
    inner join (select * from label where key='node') l on n.id = l.value::int
    inner join series_cmc_power_racktraynodepoweravg scmcavg
        on l.id = scmcavg.labels[(
            select pos from label_key_position
            where metric_category = 'cmc_power'
              and metric_name = 'RackTrayNodePowerAvg'
              and key = 'node'
        )]
    where n.id = any((select nodes from job)::int[])
)
select a.time, a.value, ns.name from (
    select
        time_bucket(extract('epoch' from '{interval} seconds'::interval)::int*1000, cmcavg.ts) as time,
        cmcavg.series_id::varchar,
        avg(cmcavg.val) as value
    from cmc_power_racktraynodepoweravg cmcavg
    where ts <= (select endtime from job)
      and ts >= (select starttime from job)
      and series_id = any(select series_id from node_series)
    group by time, cmcavg.series_id
    order by time desc
) a
inner join node_series ns on a.series_id::int = ns.series_id;
"""
        return db.text(query_string.format(jobid=jobid, interval=interval))

    def db_to_list(self, jobid, interval):
        query = self.build_query(jobid, interval)
        return self.connection.execute(query).fetchall()

    def db_to_pf(self, jobid, interval):
        import pandas as pd
        query = self.build_query(jobid, interval)
        return pd.read_sql(query, con=self.connection)


class App:
    def __init__(self, config):
        self.config = config
        self.db = MonitoringDB(self.config.verbose)

    def run_all(self):
        for jobid in self.config.jobid:
            power = Power.from_db(self.db, jobid, self.config.interval)
            fn = power.to_file(jobid)
            if fn:
                print('Created file {fn}'.format(fn=fn))


if __name__ == "__main__":
    config = parse_arguments()
    config.interval = 5
    main = App(config)
    main.run_all()

    #power = Power.from_db(DB, config.jobid, config.interval)
    #print("#epochs", len(power.epochs))
    #
    #print(power.header())
    #
    #epochs_iter = iter(power.epochs.items())
    #ts, values = next(epochs_iter)
    #print(ts, values)
    #print('time:', power.summarize_time(ts))
    #print('values:', power.summarize_values(values))
    #print('epoch:', power.pretty_print(power.summarize_epoch((ts, values))))
    #print("filename: ", power.to_file(config.jobid))
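
# Example invocation (the script filename below is assumed; the job IDs mirror the
# argparse help text, and network access to the coe_mon database is required):
#
#   python detailed_power.py -v 2260215 2260216.hawk-pbs5
#
# Each job ID produces one CSV file named like
#   detailed_power_2260215.hawk-pbs5.<first_ts>-<last_ts>.csv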