|
|
|
@ -1,7 +1,7 @@
|
|
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
|
|
|
|
|
import argparse
|
|
|
|
|
import pandas as pd
|
|
|
|
|
import numpy as np
|
|
|
|
|
from collections import OrderedDict
|
|
|
|
|
import os.path
|
|
|
|
|
|
|
|
|
@ -13,12 +13,6 @@ def parse_arguments(args):
|
|
|
|
|
help='Show database querries, etc.')
|
|
|
|
|
parser.add_argument('-t', '--interval', action='store', type=float, default=5.0,
|
|
|
|
|
help="Interval between power values in seconds")
|
|
|
|
|
parser.add_argument('--hawk-ai', action='store_true',
|
|
|
|
|
help="Job did run on Hawk-AI")
|
|
|
|
|
parser.add_argument('--report-nodes', action='store_true',
|
|
|
|
|
help='Report power of all nodes, not just statisitcs across nodes.')
|
|
|
|
|
parser.add_argument('--energy-only', action='store_true',
|
|
|
|
|
help='Return only total consumed energy, not detailed power')
|
|
|
|
|
parser.add_argument('jobid', type=parse_jobid,
|
|
|
|
|
nargs='+',
|
|
|
|
|
help='Job ID such as "2260215" or "2260215.hawk-pbs5"')
|
|
|
|
@ -29,69 +23,62 @@ def parse_jobid(s):
|
|
|
|
|
import re
|
|
|
|
|
hawkpbs = r'.hawk-pbs5'
|
|
|
|
|
jobid = re.sub(hawkpbs, '', s)
|
|
|
|
|
not_allowed = r'[^0-9\[\]]' # Jobid can be more complex than just digits, eg 2444420[201]
|
|
|
|
|
if re.search(not_allowed, jobid):
|
|
|
|
|
if not jobid.isdigit():
|
|
|
|
|
raise argparse.ArgumentTypeError(f'invalid job ID "{s}"')
|
|
|
|
|
return jobid
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Power:
|
|
|
|
|
def __init__(self, nodes, interval=-1):
|
|
|
|
|
self.nodes = nodes
|
|
|
|
|
self.epochs = OrderedDict()
|
|
|
|
|
self.first_ts = None
|
|
|
|
|
self.last_ts = None
|
|
|
|
|
self.interval = interval
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def from_list(cls, data):
|
|
|
|
|
"""
|
|
|
|
|
Returns a Power instance from a list of tuples (timestamp, node, value).
|
|
|
|
|
"""Assumes data is a list of tuples (timestamp, value, node)"""
|
|
|
|
|
nodes = list(set([line[2] for line in data]))
|
|
|
|
|
cls = Power(nodes)
|
|
|
|
|
|
|
|
|
|
Assumptions:
|
|
|
|
|
- data is sorted by timestamp ascending
|
|
|
|
|
"""
|
|
|
|
|
df = pd.DataFrame(data, columns=['time', 'node', 'power'])
|
|
|
|
|
power = cls(df, columns={})
|
|
|
|
|
# for now ignore order to nodes
|
|
|
|
|
values = {}
|
|
|
|
|
for l in data:
|
|
|
|
|
ts = l[0]
|
|
|
|
|
if ts not in values:
|
|
|
|
|
values[ts] = []
|
|
|
|
|
# node = l[1]
|
|
|
|
|
power = l[1]
|
|
|
|
|
values[ts].append(power)
|
|
|
|
|
|
|
|
|
|
return power
|
|
|
|
|
epochs = sorted(values.keys())
|
|
|
|
|
for epoch in epochs:
|
|
|
|
|
cls.insert_epoch(epoch, values[epoch])
|
|
|
|
|
|
|
|
|
|
return cls
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def from_db(cls, db, jobid, interval, hawk_ai):
|
|
|
|
|
df = db.db_to_pf(jobid, interval, hawk_ai)
|
|
|
|
|
if df.empty:
|
|
|
|
|
def from_db(cls, db, jobid, interval):
|
|
|
|
|
all_list = db.db_to_list(jobid, interval)
|
|
|
|
|
if not all_list:
|
|
|
|
|
raise RuntimeError
|
|
|
|
|
power = cls(df, {'time': 'time', 'name': 'node', 'avg': 'power'})
|
|
|
|
|
|
|
|
|
|
power = cls.from_list(all_list)
|
|
|
|
|
power.set_interval(interval*1000) # milliseconds
|
|
|
|
|
|
|
|
|
|
return power
|
|
|
|
|
|
|
|
|
|
def __init__(self, dataframe, columns={}):
|
|
|
|
|
if columns:
|
|
|
|
|
dataframe.rename(columns=columns, inplace=True)
|
|
|
|
|
_required_cols = {'time', 'node', 'power'}
|
|
|
|
|
if not _required_cols.issubset(set(dataframe.columns)):
|
|
|
|
|
raise RuntimeError
|
|
|
|
|
if not dataframe['time'].is_monotonic_increasing:
|
|
|
|
|
raise RuntimeError
|
|
|
|
|
|
|
|
|
|
by_node = dataframe.groupby('node')
|
|
|
|
|
nodes = list(by_node.groups.keys())
|
|
|
|
|
epochs = dataframe.groupby('time')
|
|
|
|
|
times = list(epochs.groups.keys())
|
|
|
|
|
|
|
|
|
|
self.dataframe = dataframe
|
|
|
|
|
self.nodes = nodes
|
|
|
|
|
self.epochs = epochs
|
|
|
|
|
self.by_node = by_node
|
|
|
|
|
self.first_ts, self.last_ts = times[0], times[-1]
|
|
|
|
|
self.warnings = "" # add check for warning, i.e. data gaps due to missing nodes
|
|
|
|
|
self.energy = self._summarize_energy()
|
|
|
|
|
|
|
|
|
|
def to_file(self, jobid, header=""):
|
|
|
|
|
def to_file(self, jobid):
|
|
|
|
|
"""Dumps power data to file. Returns filename is succesfull and None if unsucessfull."""
|
|
|
|
|
fname = self.filename(jobid)
|
|
|
|
|
if os.path.exists(fname):
|
|
|
|
|
print("Error: cowardly refusing to overwrite file ", fname)
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
header += self.warnings
|
|
|
|
|
header += self.energy
|
|
|
|
|
try:
|
|
|
|
|
with open(fname, "w+") as f:
|
|
|
|
|
f.write(header + self.header())
|
|
|
|
|
f.write(self.header())
|
|
|
|
|
f.write(self.body())
|
|
|
|
|
except IOError:
|
|
|
|
|
print("Error: could not write to file ", fname)
|
|
|
|
@ -99,69 +86,50 @@ class Power:
|
|
|
|
|
|
|
|
|
|
return fname
|
|
|
|
|
|
|
|
|
|
def set_interval(self, interval):
|
|
|
|
|
self.interval = interval
|
|
|
|
|
|
|
|
|
|
def insert_epoch(self, ts, values):
|
|
|
|
|
self.epochs[ts] = values
|
|
|
|
|
if not self.first_ts:
|
|
|
|
|
self.first_ts = ts
|
|
|
|
|
self.last_ts = ts
|
|
|
|
|
|
|
|
|
|
def header(self):
|
|
|
|
|
hd = "# all timestamp have unit seconds since unix epoch\n"
|
|
|
|
|
hd = "# all timestamp have unit miliseconds since unix epoch\n"
|
|
|
|
|
hd += "# all power values have unit Watt\n"
|
|
|
|
|
hd += "timestamp,RESERVED,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_sample_node_power"
|
|
|
|
|
hd += "," + ",".join(self.nodes)
|
|
|
|
|
hd += "\n"
|
|
|
|
|
hd += "timestamp,delta_t,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_node_power"
|
|
|
|
|
# add node names here instead
|
|
|
|
|
hd += ",NO_NODE_NAMES_YET\n"
|
|
|
|
|
return hd
|
|
|
|
|
|
|
|
|
|
def body(self):
|
|
|
|
|
_body = ""
|
|
|
|
|
for epoch in self.epochs:
|
|
|
|
|
_body += self.pretty_print(*self.summarize_epoch(epoch))
|
|
|
|
|
for epoch in self.epochs.items():
|
|
|
|
|
_body += self.pretty_print(self.summarize_epoch(epoch))
|
|
|
|
|
return _body
|
|
|
|
|
|
|
|
|
|
def _summarize_time(self, ts):
|
|
|
|
|
return Power.to_csv(ts/1000., "") # time in seconds
|
|
|
|
|
def summarize_time(self, ts):
|
|
|
|
|
return ts, self.interval
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def _summarize_values(df):
|
|
|
|
|
values = df['power']
|
|
|
|
|
head = values.iloc[0]
|
|
|
|
|
def summarize_values(val):
|
|
|
|
|
values = np.asarray(val)
|
|
|
|
|
head = values[0]
|
|
|
|
|
min, max = values.min(), values.max()
|
|
|
|
|
avg, stddev = values.mean(), values.std()
|
|
|
|
|
median = values.median()
|
|
|
|
|
return Power.to_csv(head, avg, median, min, max, stddev)
|
|
|
|
|
median = np.median(values)
|
|
|
|
|
return head, avg, median, min, max, stddev
|
|
|
|
|
|
|
|
|
|
def summarize_epoch(self, epoch):
|
|
|
|
|
ts, values = epoch
|
|
|
|
|
return self._summarize_time(ts), \
|
|
|
|
|
self._summarize_values(values), \
|
|
|
|
|
self._all_values(values)
|
|
|
|
|
|
|
|
|
|
def _all_values(self, values):
|
|
|
|
|
# reindex frame to get all nodes; introduces gaps
|
|
|
|
|
values = values[['node', 'power']].set_index('node').reindex(self.nodes)
|
|
|
|
|
# hack to_csv() to transpose array
|
|
|
|
|
csv = values.to_csv(header=False, index=False, line_terminator=',', na_rep=' ')
|
|
|
|
|
csv = csv[:-1] # strip line terminator ',' from end of string
|
|
|
|
|
return csv
|
|
|
|
|
|
|
|
|
|
def energy_total(self):
|
|
|
|
|
energy = None
|
|
|
|
|
if hasattr(self, "by_node"):
|
|
|
|
|
energy = self.by_node.apply(self._energy_node).sum()
|
|
|
|
|
return energy
|
|
|
|
|
return self.summarize_time(ts) \
|
|
|
|
|
+ self.summarize_values(values)
|
|
|
|
|
# + values
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def _energy_node(group):
|
|
|
|
|
"""Left-sided Riemann sum is enough, as time is lower bound of bucket"""
|
|
|
|
|
delta_t = group["time"].diff().shift(-1)/1000. # in seconds
|
|
|
|
|
pow = group['power']
|
|
|
|
|
return (delta_t * pow).iloc[:-1].sum()
|
|
|
|
|
|
|
|
|
|
def _summarize_energy(self):
|
|
|
|
|
return "# Total energy consumed by job: {energy:.0f} J\n".format(energy=self.energy_total())
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def to_csv(*args):
|
|
|
|
|
return ",".join(str(a) for a in args)
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def pretty_print(*args):
|
|
|
|
|
return Power.to_csv(*args) + '\n'
|
|
|
|
|
def pretty_print(args):
|
|
|
|
|
return ",".join(str(a) for a in args) + '\n'
|
|
|
|
|
|
|
|
|
|
def filename(self, jobid):
|
|
|
|
|
fname = "detailed_power_{jobid}.hawk-pbs5.{first}-{last}.csv".format(
|
|
|
|
@ -171,36 +139,13 @@ class Power:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MonitoringDB:
|
|
|
|
|
QUERY_STRING_HAWK = """
|
|
|
|
|
-- For description of get_job_data(), see https://kb.hlrs.de/monitoring/index.php/TimescaleDB_-_Query_Guidelines#Function:_get_job_data_and_get_ai_job_data
|
|
|
|
|
select * from get_job_data(
|
|
|
|
|
'{jobid}.hawk-pbs5',
|
|
|
|
|
'cmc_power_racktraynodepoweravg', -- power data source
|
|
|
|
|
'{interval} seconds',
|
|
|
|
|
array['avg'], -- aggregation: average across samples in bucket
|
|
|
|
|
array['time','node'] -- sort by time first than node (ascending)
|
|
|
|
|
)
|
|
|
|
|
as t(time bigint, name varchar, avg double precision);
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
QUERY_STRING_HAWK_AI = """
|
|
|
|
|
-- For description of get_ai_job_data(), see https://kb.hlrs.de/monitoring/index.php/TimescaleDB_-_Query_Guidelines#Function:_get_job_data_and_get_ai_job_data
|
|
|
|
|
select * from get_ai_job_data(
|
|
|
|
|
'{jobid}.hawk-pbs5',
|
|
|
|
|
'telegraf_ipmi_power_meter', -- power data source
|
|
|
|
|
'{interval} seconds',
|
|
|
|
|
array['avg'], -- aggregation: average across samples in bucket
|
|
|
|
|
array['time','node'] -- sort by time first than node (ascending)
|
|
|
|
|
)
|
|
|
|
|
as t(time bigint, name varchar, avg double precision);
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(self, verbose):
|
|
|
|
|
self.connection = self.init_db(verbose)
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def init_db(verbose):
|
|
|
|
|
import sqlalchemy as db
|
|
|
|
|
_verbose = False #True
|
|
|
|
|
engine = db.create_engine('postgresql://hpc@hawk-monitor4:5432/coe_mon', echo=verbose)
|
|
|
|
|
connection = engine.connect()
|
|
|
|
|
return connection
|
|
|
|
@ -208,22 +153,44 @@ as t(time bigint, name varchar, avg double precision);
|
|
|
|
|
def close_db(self):
|
|
|
|
|
return self.connection.close()
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def build_query(cls, jobid, interval, hawk_ai):
|
|
|
|
|
@staticmethod
|
|
|
|
|
def build_query(jobid, interval):
|
|
|
|
|
import sqlalchemy as db
|
|
|
|
|
if hawk_ai:
|
|
|
|
|
query_string = cls.QUERY_STRING_HAWK_AI
|
|
|
|
|
else:
|
|
|
|
|
query_string = cls.QUERY_STRING_HAWK
|
|
|
|
|
query_string = """with job as (
|
|
|
|
|
select job_id, starttime, endtime, nodes from jobs where job_id='{jobid}.hawk-pbs5'
|
|
|
|
|
),
|
|
|
|
|
node_series as(
|
|
|
|
|
select n.name, scmcavg.id as series_id from nodes n
|
|
|
|
|
inner join (select * from label where key='node') l on n.id = l.value::int
|
|
|
|
|
inner join series_cmc_power_racktraynodepoweravg scmcavg on l.id = scmcavg.labels[(
|
|
|
|
|
select pos from label_key_position
|
|
|
|
|
where metric_category= 'cmc_power'
|
|
|
|
|
and metric_name = 'RackTrayNodePowerAvg'
|
|
|
|
|
and key = 'node'
|
|
|
|
|
)]
|
|
|
|
|
where n.id = any((select nodes from job)::int[])
|
|
|
|
|
)
|
|
|
|
|
select a.time, a.value, ns.name from (
|
|
|
|
|
select
|
|
|
|
|
time_bucket(extract ('epoch' from '{interval} seconds'::interval)::int*1000, cmcavg.ts) as time,
|
|
|
|
|
cmcavg.series_id::varchar,
|
|
|
|
|
avg(cmcavg.val) AS value
|
|
|
|
|
from cmc_power_racktraynodepoweravg cmcavg
|
|
|
|
|
where
|
|
|
|
|
ts <= (select endtime from job)
|
|
|
|
|
and ts >= (select starttime from job)
|
|
|
|
|
and series_id = Any(select series_id from node_series)
|
|
|
|
|
group by time, cmcavg.series_id order by time desc) a
|
|
|
|
|
inner join node_series ns on a.series_id::int = ns.series_id;
|
|
|
|
|
"""
|
|
|
|
|
return db.text(query_string.format(jobid=jobid, interval=interval))
|
|
|
|
|
|
|
|
|
|
def db_to_list(self, jobid, interval, hawk_ai):
|
|
|
|
|
query = self.build_query(jobid, interval, hawk_ai)
|
|
|
|
|
def db_to_list(self, jobid, interval):
|
|
|
|
|
query = self.build_query(jobid, interval)
|
|
|
|
|
return self.connection.execute(query).fetchall()
|
|
|
|
|
|
|
|
|
|
def db_to_pf(self, jobid, interval, hawk_ai):
|
|
|
|
|
import pandas as pd
|
|
|
|
|
query = self.build_query(jobid, interval, hawk_ai)
|
|
|
|
|
def db_to_pf(self, jobid, inerval):
|
|
|
|
|
query = self.build_query(jobid, interval)
|
|
|
|
|
return pd.read_sql(query, con=self.connection)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -231,59 +198,23 @@ class App:
|
|
|
|
|
def __init__(self, config):
|
|
|
|
|
self.config = config
|
|
|
|
|
self.db = MonitoringDB(self.config.verbose)
|
|
|
|
|
if not self.config.report_nodes:
|
|
|
|
|
# Monkey patching class Power; what a hack!
|
|
|
|
|
Power._all_values = lambda s,x: ""
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def warnings(config):
|
|
|
|
|
warn = ""
|
|
|
|
|
if not config.hawk_ai and config.interval < 5:
|
|
|
|
|
warn += '# Warning: interval<5 is very small and may lead to data gaps.'
|
|
|
|
|
if config.hawk_ai and config.interval < 60:
|
|
|
|
|
warn += '# Warning: interval<60 is very small for Hawk-AI nodes and may lead to data gaps.'
|
|
|
|
|
return warn
|
|
|
|
|
|
|
|
|
|
def run_all(self):
|
|
|
|
|
warnings = self.warnings(self.config)
|
|
|
|
|
if warnings:
|
|
|
|
|
print(warnings)
|
|
|
|
|
|
|
|
|
|
header = f"# {self.config.datetime}: {self.config.cmd}\n"
|
|
|
|
|
if warnings:
|
|
|
|
|
header += f"{warnings}\n"
|
|
|
|
|
header += "#\n"
|
|
|
|
|
|
|
|
|
|
for jobid in self.config.jobid:
|
|
|
|
|
try:
|
|
|
|
|
power = Power.from_db(self.db, jobid, self.config.interval, self.config.hawk_ai)
|
|
|
|
|
power = Power.from_db(self.db, jobid, self.config.interval)
|
|
|
|
|
except RuntimeError:
|
|
|
|
|
print('No data found for job ID "{}"'.format(jobid))
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
if self.config.energy_only:
|
|
|
|
|
if power.energy:
|
|
|
|
|
print(power.energy)
|
|
|
|
|
else:
|
|
|
|
|
print('No total energy for job ID "{}"'.format(jobid))
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
fn = power.to_file(jobid, header)
|
|
|
|
|
fn = power.to_file(jobid)
|
|
|
|
|
if fn:
|
|
|
|
|
print('Created file {fn}'.format(fn=fn))
|
|
|
|
|
if power.warnings:
|
|
|
|
|
print(power.warnings)
|
|
|
|
|
if power.energy:
|
|
|
|
|
print(power.energy)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
import sys
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
config = parse_arguments(sys.argv[1:])
|
|
|
|
|
config.cmd = " ".join(sys.argv)
|
|
|
|
|
config.datetime = f"{datetime.now()}"
|
|
|
|
|
if os.path.basename(__file__) == 'get_energy.py':
|
|
|
|
|
config.energy_only = True
|
|
|
|
|
|
|
|
|
|
main = App(config)
|
|
|
|
|
main.run_all()
|
|
|
|
|