#!/usr/bin/env python3
import argparse
import os.path
import re

import pandas as pd


def parse_arguments(args):
    parser = argparse.ArgumentParser(
        description='Produce detailed power usage data for a list of jobids.')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Show database queries, etc.')
    parser.add_argument('-t', '--interval', action='store', type=float, default=5.0,
                        help='Interval between power values in seconds')
    parser.add_argument('--hawk-ai', action='store_true',
                        help='Job did run on Hawk-AI')
    parser.add_argument('--report-nodes', action='store_true',
                        help='Report power of all nodes, not just statistics across nodes.')
    parser.add_argument('--energy-only', action='store_true',
                        help='Return only total consumed energy, not detailed power')
    parser.add_argument('jobid', type=parse_jobid, nargs='+',
                        help='Job ID such as "2260215" or "2260215.hawk-pbs5"')
    return parser.parse_args(args)


def parse_jobid(s):
    hawkpbs = r'\.hawk-pbs5'  # escape the dot so only the literal suffix is stripped
    jobid = re.sub(hawkpbs, '', s)
    not_allowed = r'[^0-9\[\]]'  # jobids can be more complex than just digits, e.g. 2444420[201]
    if re.search(not_allowed, jobid):
        raise argparse.ArgumentTypeError(f'invalid job ID "{s}"')
    return jobid


class Power:
    @classmethod
    def from_list(cls, data):
        """Returns a Power instance from a list of tuples (timestamp, node, value).

        Assumptions:
        - data is sorted by timestamp ascending
        """
        df = pd.DataFrame(data, columns=['time', 'node', 'power'])
        return cls(df)

    @classmethod
    def from_db(cls, db, jobid, interval, hawk_ai):
        df = db.db_to_pf(jobid, interval, hawk_ai)
        if df.empty:
            raise RuntimeError(f'no data for job ID "{jobid}"')

        return cls(df, columns={'time': 'time', 'name': 'node', 'avg': 'power'})

    def __init__(self, dataframe, columns=None):
        # default is None rather than a mutable default argument
        if columns:
            dataframe.rename(columns=columns, inplace=True)

        _required_cols = {'time', 'node', 'power'}
        if not _required_cols.issubset(dataframe.columns):
            raise RuntimeError('missing required columns: {}'.format(
                _required_cols - set(dataframe.columns)))
        if not dataframe['time'].is_monotonic_increasing:
            raise RuntimeError('power data is not sorted by time (ascending)')

        by_node = dataframe.groupby('node')
        nodes = list(by_node.groups.keys())
        epochs = dataframe.groupby('time')
        times = list(epochs.groups.keys())

        self.dataframe = dataframe
        self.nodes = nodes
        self.epochs = epochs
        self.by_node = by_node
        self.first_ts, self.last_ts = times[0], times[-1]
        self.warnings = ""  # TODO: add check for warnings, i.e. data gaps due to missing nodes
        self.energy = self._summarize_energy()
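    # Example (sketch) of building a Power instance from raw tuples; the node
    # name and sample values below are hypothetical, timestamps are
    # milliseconds since the unix epoch:
    #
    #   data = [(1700000000000, 'node1', 100.0),
    #           (1700000005000, 'node1', 110.0)]
    #   power = Power.from_list(data)
    #   print(power.energy)  # "# Total energy consumed by job: 500 J"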
    def to_file(self, jobid, header=""):
        """Dumps power data to file.

        Returns the filename if successful and None if unsuccessful.
        """
        fname = self.filename(jobid)
        if os.path.exists(fname):
            print("Error: cowardly refusing to overwrite file ", fname)
            return None

        header += self.warnings
        header += self.energy
        try:
            with open(fname, "w+") as f:
                f.write(header + self.header())
                f.write(self.body())
        except IOError:
            print("Error: could not write to file ", fname)
            fname = None

        return fname

    def header(self):
        hd = "# all timestamps have unit seconds since unix epoch\n"
        hd += "# all power values have unit Watt\n"
        hd += "timestamp,RESERVED,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_sample_node_power"
        hd += "," + ",".join(self.nodes)
        hd += "\n"
        return hd

    def body(self):
        _body = ""
        for epoch in self.epochs:
            _body += self.pretty_print(*self.summarize_epoch(epoch))
        return _body

    def _summarize_time(self, ts):
        # time in seconds; the empty string fills the RESERVED column
        return Power.to_csv(ts/1000., "")

    @staticmethod
    def _summarize_values(df):
        values = df['power']
        head = values.iloc[0]
        vmin, vmax = values.min(), values.max()  # renamed to avoid shadowing builtins
        avg, stddev = values.mean(), values.std()
        median = values.median()
        return Power.to_csv(head, avg, median, vmin, vmax, stddev)

    def summarize_epoch(self, epoch):
        ts, values = epoch
        return self._summarize_time(ts), \
            self._summarize_values(values), \
            self._all_values(values)

    def _all_values(self, values):
        # reindex frame to get all nodes; introduces gaps
        values = values[['node', 'power']].set_index('node').reindex(self.nodes)
        # hack to_csv() to transpose array; the keyword is 'lineterminator' in pandas >= 2.0
        csv = values.to_csv(header=False, index=False, line_terminator=',', na_rep=' ')
        csv = csv[:-1]  # strip line terminator ',' from end of string
        return csv

    def energy_total(self):
        energy = None
        if hasattr(self, "by_node"):
            energy = self.by_node.apply(self._energy_node).sum()
        return energy

    @staticmethod
    def _energy_node(group):
        """Left-sided Riemann sum is enough, as time is lower bound of bucket"""
        delta_t = group["time"].diff().shift(-1)/1000.  # in seconds
        power = group['power']
        return (delta_t * power).iloc[:-1].sum()
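    # Worked example with made-up numbers: one node sampled at
    # time = [0, 5000, 10000] ms with power = [100, 110, 120] W gives
    # delta_t = [5.0, 5.0, NaN] s, so the left-sided sum over the first two
    # buckets is 5*100 + 5*110 = 1050 J; the last sample only closes the
    # final bucket and contributes no area of its own.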
    def _summarize_energy(self):
        return "# Total energy consumed by job: {energy:.0f} J\n".format(energy=self.energy_total())

    @staticmethod
    def to_csv(*args):
        return ",".join(str(a) for a in args)

    @staticmethod
    def pretty_print(*args):
        return Power.to_csv(*args) + '\n'

    def filename(self, jobid):
        fname = "detailed_power_{jobid}.hawk-pbs5.{first}-{last}.csv".format(
            jobid=jobid, first=self.first_ts, last=self.last_ts)
        return fname


class MonitoringDB:
    QUERY_STRING_HAWK = """
-- For description of get_job_data(), see
-- https://kb.hlrs.de/monitoring/index.php/TimescaleDB_-_Query_Guidelines#Function:_get_job_data_and_get_ai_job_data
select * from get_job_data(
    '{jobid}.hawk-pbs5',
    'cmc_power_racktraynodepoweravg',  -- power data source
    '{interval} seconds',
    array['avg'],           -- aggregation: average across samples in bucket
    array['time', 'node']   -- sort by time first, then node (ascending)
) as t(time bigint, name varchar, avg double precision);
"""
    QUERY_STRING_HAWK_AI = """
-- For description of get_ai_job_data(), see
-- https://kb.hlrs.de/monitoring/index.php/TimescaleDB_-_Query_Guidelines#Function:_get_job_data_and_get_ai_job_data
select * from get_ai_job_data(
    '{jobid}.hawk-pbs5',
    'telegraf_ipmi_power_meter',  -- power data source
    '{interval} seconds',
    array['avg'],           -- aggregation: average across samples in bucket
    array['time', 'node']   -- sort by time first, then node (ascending)
) as t(time bigint, name varchar, avg double precision);
"""

    def __init__(self, verbose):
        self.connection = self.init_db(verbose)

    @staticmethod
    def init_db(verbose):
        import sqlalchemy as db
        engine = db.create_engine('postgresql://hpc@hawk-monitor4:5432/coe_mon', echo=verbose)
        connection = engine.connect()
        return connection

    def close_db(self):
        return self.connection.close()

    @classmethod
    def build_query(cls, jobid, interval, hawk_ai):
        import sqlalchemy as db
        if hawk_ai:
            query_string = cls.QUERY_STRING_HAWK_AI
        else:
            query_string = cls.QUERY_STRING_HAWK
        return db.text(query_string.format(jobid=jobid, interval=interval))

    def db_to_list(self, jobid, interval, hawk_ai):
        query = self.build_query(jobid, interval, hawk_ai)
        return self.connection.execute(query).fetchall()

    def db_to_pf(self, jobid, interval, hawk_ai):
        query = self.build_query(jobid, interval, hawk_ai)
        return pd.read_sql(query, con=self.connection)
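# Example (sketch) of using MonitoringDB and Power directly, assuming the
# monitoring database on hawk-monitor4 is reachable:
#
#   db = MonitoringDB(verbose=False)
#   power = Power.from_db(db, '2260215', interval=5.0, hawk_ai=False)
#   print(power.energy)  # "# Total energy consumed by job: ... J"
#   power.to_file('2260215', header='# example header\n')
#   db.close_db()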
class App:
    def __init__(self, config):
        self.config = config
        self.db = MonitoringDB(self.config.verbose)
        if not self.config.report_nodes:
            # Monkey patching class Power; what a hack!
            Power._all_values = lambda self, values: ""

    @staticmethod
    def warnings(config):
        warn = ""
        if not config.hawk_ai and config.interval < 5:
            warn += '# Warning: interval<5 is very small and may lead to data gaps.'
        if config.hawk_ai and config.interval < 60:
            warn += '# Warning: interval<60 is very small for Hawk-AI nodes and may lead to data gaps.'
        return warn

    def run_all(self):
        warnings = self.warnings(self.config)
        if warnings:
            print(warnings)

        header = f"# {self.config.datetime}: {self.config.cmd}\n"
        if warnings:
            header += f"{warnings}\n"
        header += "#\n"

        for jobid in self.config.jobid:
            try:
                power = Power.from_db(self.db, jobid, self.config.interval, self.config.hawk_ai)
            except RuntimeError:
                print('No data found for job ID "{}"'.format(jobid))
                continue

            if self.config.energy_only:
                if power.energy:
                    print(power.energy)
                else:
                    print('No total energy for job ID "{}"'.format(jobid))
                continue

            fn = power.to_file(jobid, header)
            if fn:
                print('Created file {fn}'.format(fn=fn))
            if power.warnings:
                print(power.warnings)
            if power.energy:
                print(power.energy)


if __name__ == "__main__":
    import sys
    from datetime import datetime

    config = parse_arguments(sys.argv[1:])
    config.cmd = " ".join(sys.argv)
    config.datetime = f"{datetime.now()}"
    if os.path.basename(__file__) == 'get_energy.py':
        config.energy_only = True

    main = App(config)
    main.run_all()
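# Example invocations (sketch; the script filename is an assumption):
#
#   ./get_detailed_power.py 2260215 2260215.hawk-pbs5 --interval 10
#   ./get_detailed_power.py --hawk-ai --interval 60 '2444420[201]'
#
# Installed under the name 'get_energy.py', the same script implies
# --energy-only and reports only the total consumed energy per job.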