#!/usr/bin/env python3
"""Produce detailed power usage data for a list of PBS job IDs.

For every job ID a CSV file is written that contains, per timestamp, summary
statistics over all nodes of the job followed by the individual node values.
Power data is queried from the monitoring database.
"""

import argparse
import numpy as np
from collections import OrderedDict
import os.path


def parse_arguments(args):
    """Parse command-line arguments.

    :param args: list of argument strings (typically ``sys.argv[1:]``)
    :return: ``argparse.Namespace`` with ``verbose``, ``interval``,
        ``hawk_ai`` and ``jobid`` (list of normalized job ID strings)
    """
    parser = argparse.ArgumentParser(
        description='Produce detailed power usage data for a list of jobids.')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Show database querries, etc.')
    parser.add_argument('-t', '--interval', action='store', type=float, default=5.0,
                        help="Interval between power values in seconds")
    parser.add_argument('--hawk-ai', action='store_true',
                        help="Job did run on Hawk-AI")
    parser.add_argument('jobid', type=parse_jobid, nargs='+',
                        help='Job ID such as "2260215" or "2260215.hawk-pbs5"')
    return parser.parse_args(args)


def parse_jobid(s):
    """Normalize a job ID given on the command line.

    Strips a trailing ``.hawk-pbs5`` suffix and checks that the remainder is
    non-empty and consists only of digits and square brackets (array jobs
    look like ``2444420[201]``).

    :param s: raw job ID, e.g. ``"2260215"`` or ``"2260215.hawk-pbs5"``
    :return: the bare job ID, e.g. ``"2260215"``
    :raises argparse.ArgumentTypeError: if the job ID is invalid. The result
        is formatted into an SQL query, so this check also keeps it safe.
    """
    import re
    # Escaped dot + end anchor: strip only a literal ".hawk-pbs5" suffix,
    # not e.g. "Xhawk-pbs5" or an occurrence in the middle of the string.
    hawkpbs = r'\.hawk-pbs5$'
    jobid = re.sub(hawkpbs, '', s)
    not_allowed = r'[^0-9\[\]]'
    # Jobid can be more complex than just digits, eg 2444420[201]
    if not jobid or re.search(not_allowed, jobid):
        raise argparse.ArgumentTypeError(f'invalid job ID "{s}"')
    return jobid


class Power:
    """Power measurements of one job: values per timestamp for a set of nodes.

    Instances are created through the alternative constructors
    (``from_db``, ``from_pandas``, ``from_list``); ``to_file`` writes the CSV.
    """

    def __init__(self, nodes):
        self.nodes = nodes
        self.epochs = None
        self.first_ts = None
        self.last_ts = None
        self.warnings = ""

    @classmethod
    def from_list(cls, data):
        """
        Returns a Power instance from a list of tuples (timestamp, node, value).

        Assumptions:
        - data is sorted by timestamp ascending
        - for each timestamp, there is the same set of nodes and in the same order

        Violations of these assumptions are not fatal; they are reported as
        comment lines in ``warnings``.

        NOTE: ``epochs`` here maps timestamp -> list of values, and no
        ``dataframe``/``energy`` is set, so ``to_file``/``body`` (which expect
        the structure built by ``from_pandas``) cannot be used on the result.
        """
        idx_ts, idx_node, idx_value = 0, 1, 2
        # preserves order of first appearance of nodes
        nodes = list(OrderedDict.fromkeys(line[idx_node] for line in data))
        power = cls(nodes)

        # group values by timestamp, keeping the order in which they arrive
        values = OrderedDict()
        for line in data:
            values.setdefault(line[idx_ts], []).append(line[idx_value])

        for epoch, epoch_values in values.items():
            power.insert_epoch(epoch, epoch_values)

        # check implicit assumptions: 1) ts/epochs are sorted
        timestamps = list(values.keys())
        if timestamps != sorted(timestamps):
            power.warnings += "# Warning: Unexpected unsorted timestamps.\n"

        # check implicit assumptions: 2) each timestamp has #nodes values
        nnodes = len(nodes)
        for epoch_values in values.values():
            actual = len(epoch_values)
            if actual != nnodes:
                power.warnings += "# Warning: Unexpected number of nodes ({actual}/{expected})\n".format(
                    actual=actual, expected=nnodes)
                break
        return power

    @classmethod
    def from_db(cls, db, jobid, interval, hawk_ai):
        """Query the power data of ``jobid`` and build a Power instance.

        :param db: a ``MonitoringDB`` instance
        :param jobid: bare job ID (as returned by ``parse_jobid``)
        :param interval: bucket width in seconds
        :param hawk_ai: use the Hawk-AI query instead of the Hawk one
        :raises RuntimeError: if the query returned no rows
        """
        df = db.db_to_pf(jobid, interval, hawk_ai)
        power = cls.from_pandas(df, {'time': 'time', 'name': 'node', 'avg': 'power'})
        return power

    @classmethod
    def from_pandas(cls, dataframe, columns):
        """Build a Power instance from a long-format DataFrame.

        :param dataframe: one row per (timestamp, node); after renaming with
            ``columns`` it must have the columns ``time`` (milliseconds since
            the unix epoch), ``node`` and ``power`` (Watt). The caller's
            frame is not modified.
        :param columns: mapping of original column names to the names above
        :raises RuntimeError: if ``dataframe`` has no rows
        """
        if dataframe.empty:
            # App.run_all reports RuntimeError as "No data found"
            raise RuntimeError("no power data")
        # rename a copy so the caller's DataFrame is left untouched
        dataframe = dataframe.rename(columns=columns)
        by_node = dataframe.groupby('node')
        nodes = list(by_node.groups.keys())
        power = cls(nodes)
        power.epochs = dataframe.groupby('time')
        power.dataframe = dataframe
        power.by_node = by_node
        # filename() needs these; insert_epoch() is not used on this path
        power.first_ts = int(dataframe['time'].min())
        power.last_ts = int(dataframe['time'].max())
        # same sanity check as from_list(): each timestamp should carry one
        # value per node, otherwise the CSV value columns do not line up with
        # the node names in the header
        nnodes = len(nodes)
        sizes = power.epochs.size()
        mismatched = sizes[sizes != nnodes]
        if not mismatched.empty:
            power.warnings += "# Warning: Unexpected number of nodes ({actual}/{expected})\n".format(
                actual=int(mismatched.iloc[0]), expected=nnodes)
        power.energy = power.summarize_energy()
        return power

    def to_file(self, jobid, header=""):
        """Dumps power data to file.

        The file name comes from ``filename(jobid)``. The file starts with
        ``header``, the warnings and the total energy, followed by the CSV
        column header and one line per epoch. Existing files are never
        overwritten.

        Returns filename if successful and None if unsuccessful."""
        fname = self.filename(jobid)
        if os.path.exists(fname):
            print("Error: cowardly refusing to overwrite file ", fname)
            return None
        header += self.warnings
        header += self.energy
        try:
            # mode "x" fails if the file appeared after the check above,
            # so an existing file can never be clobbered
            with open(fname, "x") as f:
                f.write(header + self.header())
                f.write(self.body())
        except OSError:
            print("Error: could not write to file ", fname)
            fname = None
        return fname

    def insert_epoch(self, ts, values):
        """Store the node ``values`` of timestamp ``ts``; track first/last ts."""
        if self.epochs is None:
            # __init__ leaves epochs unset; create the mapping on first use
            self.epochs = OrderedDict()
        self.epochs[ts] = values
        # "is None" so that a timestamp of 0 is kept as the first one
        if self.first_ts is None:
            self.first_ts = ts
        self.last_ts = ts

    def header(self):
        """Return the comment lines and the CSV column header line."""
        hd = "# all timestamp have unit miliseconds since unix epoch\n"
        hd += "# all power values have unit Watt\n"
        hd += "timestamp,RESERVED,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_node_power"
        # add node names here instead
        hd += "," + ",".join(self.nodes)
        hd += "\n"
        return hd

    def body(self):
        """Return one CSV line per epoch (requires the from_pandas structure)."""
        return "".join(self.pretty_print(self.summarize_epoch(epoch))
                       for epoch in self.epochs)

    def summarize_time(self, ts):
        """Return the timestamp and the (empty) RESERVED column."""
        return ts, ""

    @staticmethod
    def summarize_values(df):
        """Return (head, avg, median, min, max, stddev) of ``df['power']``.

        ``head`` is the value in the first row of ``df`` (written as
        head_node_power); ``stddev`` is numpy's default population std.
        """
        values = df['power'].values
        head = values[0]
        vmin, vmax = values.min(), values.max()
        avg, stddev = values.mean(), values.std()
        median = np.median(values)
        return head, avg, median, vmin, vmax, stddev

    def summarize_epoch(self, epoch):
        """Return all CSV fields for one ``(timestamp, group)`` epoch."""
        ts, values = epoch
        return self.summarize_time(ts) \
            + self.summarize_values(values) \
            + self.all_values(values)

    def all_values(self, values):
        """Return the power values of the epoch in row order."""
        # NOTE(review): assumes row order within an epoch matches the sorted
        # node names written by header() - the query sorts by node; confirm
        # that DB collation agrees with Python ordering.
        return tuple(values['power'].tolist())

    def energy_total(self):
        """Return the job's total energy in Joule, or None without by_node."""
        energy = None
        if hasattr(self, "by_node"):
            energy = self.by_node.apply(self.energy_node).sum()
        return energy

    @staticmethod
    def energy_node(group):
        """Left-sided Riemann sum is enough, as time is lower bound of bucket"""
        # Assumes rows are ordered by time (the DB query sorts by time).
        # Each sample lasts until the next one; the last sample has no
        # successor, so it is dropped.
        delta_t = group["time"].diff().shift(-1) / 1000.  # in seconds
        power = group['power']
        return (delta_t * power).iloc[:-1].sum()

    def summarize_energy(self):
        """Return the total energy as a CSV comment line."""
        return "# Total energy consumed by job: {energy:.0f} J\n".format(energy=self.energy_total())

    @staticmethod
    def pretty_print(args):
        """Join ``args`` into one comma-separated line."""
        return ",".join(str(a) for a in args) + '\n'

    def filename(self, jobid):
        """Return the output file name for ``jobid`` and the covered time range."""
        fname = "detailed_power_{jobid}.hawk-pbs5.{first}-{last}.csv".format(
            jobid=jobid, first=self.first_ts, last=self.last_ts
        )
        return fname


class MonitoringDB:
    """Connection to the monitoring database plus the power queries."""

    QUERY_STRING_HAWK = """
-- For description of get_job_data(), see https://kb.hlrs.de/monitoring/index.php/TimescaleDB_-_Query_Guidelines#Function:_get_job_data_and_get_ai_job_data
select * from get_job_data(
    '{jobid}.hawk-pbs5',
    'cmc_power_racktraynodepoweravg', -- power data source
    '{interval} seconds',
    array['avg'], -- aggregation: average across samples in bucket
    array['time','node'] -- sort by time first than node (ascending)
) as t(time bigint, name varchar, avg double precision);
"""

    QUERY_STRING_HAWK_AI = """
-- For description of get_ai_job_data(), see https://kb.hlrs.de/monitoring/index.php/TimescaleDB_-_Query_Guidelines#Function:_get_job_data_and_get_ai_job_data
select * from get_ai_job_data(
    '{jobid}.hawk-pbs5',
    'telegraf_ipmi_power_meter', -- power data source
    '{interval} seconds',
    array['avg'], -- aggregation: average across samples in bucket
    array['time','node'] -- sort by time first than node (ascending)
) as t(time bigint, name varchar, avg double precision);
"""

    def __init__(self, verbose):
        self.connection = self.init_db(verbose)

    @staticmethod
    def init_db(verbose):
        """Open and return a connection; ``verbose`` echoes SQL statements."""
        import sqlalchemy as db
        engine = db.create_engine('postgresql://hpc@hawk-monitor4:5432/coe_mon', echo=verbose)
        connection = engine.connect()
        return connection

    def close_db(self):
        """Close the database connection."""
        return self.connection.close()

    @classmethod
    def build_query(cls, jobid, interval, hawk_ai):
        """Return the power query for ``jobid`` as an SQLAlchemy text clause.

        ``jobid`` is formatted into the SQL directly; callers must pass a
        validated ID (see ``parse_jobid``).
        """
        import sqlalchemy as db
        if hawk_ai:
            query_string = cls.QUERY_STRING_HAWK_AI
        else:
            query_string = cls.QUERY_STRING_HAWK
        return db.text(query_string.format(jobid=jobid, interval=interval))

    def db_to_list(self, jobid, interval, hawk_ai):
        """Run the power query and return all result rows."""
        query = self.build_query(jobid, interval, hawk_ai)
        return self.connection.execute(query).fetchall()

    def db_to_pf(self, jobid, interval, hawk_ai):
        """Run the power query and return the result as a pandas DataFrame."""
        import pandas as pd
        query = self.build_query(jobid, interval, hawk_ai)
        return pd.read_sql(query, con=self.connection)


class App:
    """Command-line application: write one power CSV file per job ID."""

    def __init__(self, config):
        self.config = config
        self.db = MonitoringDB(self.config.verbose)

    @staticmethod
    def warnings(config):
        """Return warning comments about a too small interval ("" if none)."""
        warn = ""
        if not config.hawk_ai and config.interval < 5:
            warn += '# Warning: interval<5 is very small and may lead to data gaps.'
        if config.hawk_ai and config.interval < 60:
            warn += '# Warning: interval<60 is very small for Hawk-AI nodes and may lead to data gaps.'
        return warn

    def run_all(self):
        """Query, summarize and write the power data of every configured job."""
        warnings = self.warnings(self.config)
        if warnings:
            print(warnings)

        header = f"# {self.config.datetime}: {self.config.cmd}\n"
        if warnings:
            header += f"{warnings}\n"
        header += "#\n"

        for jobid in self.config.jobid:
            try:
                power = Power.from_db(self.db, jobid, self.config.interval, self.config.hawk_ai)
            except RuntimeError:
                print('No data found for job ID "{}"'.format(jobid))
                continue

            fn = power.to_file(jobid, header)
            if fn:
                print('Created file {fn}'.format(fn=fn))
                if power.warnings:
                    print(power.warnings)
                if power.energy:
                    print(power.energy)


if __name__ == "__main__":
    import sys
    from datetime import datetime
    config = parse_arguments(sys.argv[1:])
    config.cmd = " ".join(sys.argv)
    config.datetime = f"{datetime.now()}"
    main = App(config)
    try:
        main.run_all()
    finally:
        # release the database connection even if processing a job fails
        main.db.close_db()