Compare commits

...

12 commits


@@ -1,7 +1,7 @@
 #!/usr/bin/env python3
 import argparse
-import numpy as np
+import pandas as pd
 from collections import OrderedDict
 import os.path
@@ -32,13 +32,6 @@ def parse_jobid(s):
 class Power:
-    def __init__(self, nodes):
-        self.nodes = nodes
-        self.epochs = OrderedDict()
-        self.first_ts = None
-        self.last_ts = None
-        self.warnings = ""
-
     @classmethod
     def from_list(cls, data):
         """
@@ -46,51 +39,43 @@ class Power:
         Assumptions:
           - data is sorted by timestamp ascending
-          - for each timestamp, there is the same set of nodes and in the same order
         """
-        idx_ts = 0; idx_node = 1; idx_value = 2
-        nodes = list(OrderedDict.fromkeys([line[idx_node] for line in data]))  # preserves order of nodes
-        power = Power(nodes)
-        values = {}
-        for l in data:
-            ts = l[idx_ts]
-            if ts not in values:
-                values[ts] = []
-            value = l[idx_value]
-            values[ts].append(value)
-        epochs = values.keys()
-        for epoch in epochs:
-            power.insert_epoch(epoch, values[epoch])
-        # check implicit assumptions: 1) ts/epochs are sorted
-        e = list(epochs)
-        k = list(values.keys())
-        if not e == k:
-            power.warnings += "# Warning: Unexpected unsorted timestamps.\n"
-        # check implicit assumptions: 2) each line has #nodes values
-        nnodes = len(nodes)
-        for epoch in epochs:
-            actual = len(values[epoch])
-            if actual != nnodes:
-                power.warnings += "# Warning: Unexpected number of nodes ({actual}/{expected})\n".format(actual=actual, expected=nnodes)
-                break
+        df = pd.DataFrame(data, columns=['time', 'node', 'power'])
+        power = cls(df, columns={})
         return power
 
     @classmethod
     def from_db(cls, db, jobid, interval, hawk_ai):
-        all_list = db.db_to_list(jobid, interval, hawk_ai)
-        if not all_list:
+        df = db.db_to_pf(jobid, interval, hawk_ai)
+        if df.empty:
             raise RuntimeError
-        power = cls.from_list(all_list)
+        power = cls(df, {'time': 'time', 'name': 'node', 'avg': 'power'})
         return power
 
+    def __init__(self, dataframe, columns={}):
+        if columns:
+            dataframe.rename(columns=columns, inplace=True)
+        _required_cols = {'time', 'node', 'power'}
+        if not _required_cols.issubset(set(dataframe.columns)):
+            raise RuntimeError
+        if not dataframe['time'].is_monotonic_increasing:
+            raise RuntimeError
+        by_node = dataframe.groupby('node')
+        nodes = list(by_node.groups.keys())
+        epochs = dataframe.groupby('time')
+        times = list(epochs.groups.keys())
+        self.dataframe = dataframe
+        self.nodes = nodes
+        self.epochs = epochs
+        self.by_node = by_node
+        self.first_ts, self.last_ts = times[0], times[-1]
+        self.warnings = ""  # add check for warning, i.e. data gaps due to missing nodes
+        self.energy = self._summarize_energy()
+
     def to_file(self, jobid, header=""):
         """Dumps power data to file. Returns filename if successful and None if unsuccessful."""
         fname = self.filename(jobid)
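
Note: the reworked constructor relies on a handful of standard pandas operations visible above: a rename() to map the database columns onto time/node/power, a subset check on the column names, an is_monotonic_increasing check on the timestamps, and groupby() on node and time. A minimal, self-contained sketch of that pattern; the sample rows and node names are invented for illustration and are not taken from the repository:

import pandas as pd

# Illustrative rows in the (time, name, avg) layout that from_db() appears to receive.
rows = [(1000, "n1", 300.0), (1000, "n2", 310.0), (2000, "n1", 305.0), (2000, "n2", 315.0)]
df = pd.DataFrame(rows, columns=["time", "name", "avg"])

# Same column mapping as the cls(df, {'time': 'time', 'name': 'node', 'avg': 'power'}) call above.
df.rename(columns={"time": "time", "name": "node", "avg": "power"}, inplace=True)

# The two sanity checks the new __init__ performs before grouping.
assert {"time", "node", "power"}.issubset(set(df.columns))
assert df["time"].is_monotonic_increasing

by_node = df.groupby("node")   # one group per node, used for the energy sum
epochs = df.groupby("time")    # one group per timestamp, used for per-epoch statistics
print(list(by_node.groups.keys()))  # ['n1', 'n2']
print(list(epochs.groups.keys()))   # [1000, 2000]
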
@@ -99,6 +84,7 @@ class Power:
             return None
         header += self.warnings
+        header += self.energy
         try:
             with open(fname, "w+") as f:
                 f.write(header + self.header())
@@ -108,17 +94,11 @@ class Power:
             fname = None
         return fname
 
-    def insert_epoch(self, ts, values):
-        self.epochs[ts] = values
-        if not self.first_ts:
-            self.first_ts = ts
-        self.last_ts = ts
-
     def header(self):
         hd = "# all timestamps have unit milliseconds since unix epoch\n"
         hd += "# all power values have unit Watt\n"
-        hd += "timestamp,RESERVED,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_node_power"
+        hd += "timestamp,RESERVED,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_sample_node_power"
         # add node names here instead
         hd += "," + ",".join(self.nodes)
         hd += "\n"
@@ -126,31 +106,59 @@ class Power:
     def body(self):
         _body = ""
-        for epoch in self.epochs.items():
-            _body += self.pretty_print(self.summarize_epoch(epoch))
+        for epoch in self.epochs:
+            _body += self.pretty_print(*self.summarize_epoch(epoch))
         return _body
 
-    def summarize_time(self, ts):
-        return ts, ""
+    def _summarize_time(self, ts):
+        return Power.to_csv(ts, "")
 
     @staticmethod
-    def summarize_values(val):
-        values = np.asarray(val)
-        head = values[0]
+    def _summarize_values(df):
+        values = df['power']
+        head = values.iloc[0]
         min, max = values.min(), values.max()
         avg, stddev = values.mean(), values.std()
-        median = np.median(values)
-        return head, avg, median, min, max, stddev
+        median = values.median()
+        return Power.to_csv(head, avg, median, min, max, stddev)
 
     def summarize_epoch(self, epoch):
         ts, values = epoch
-        return self.summarize_time(ts) \
-            + self.summarize_values(values) \
-            + tuple(values)
+        return self._summarize_time(ts), \
+            self._summarize_values(values), \
+            self._all_values(values)
+
+    def _all_values(self, values):
+        # reindex frame to get all nodes; introduces gaps
+        values = values[['node', 'power']].set_index('node').reindex(self.nodes)
+        # hack to_csv() to transpose array
+        csv = values.to_csv(header=False, index=False, line_terminator=',', na_rep=' ')
+        csv = csv[:-1]  # strip line terminator ',' from end of string
+        return csv
+
+    def energy_total(self):
+        energy = None
+        if hasattr(self, "by_node"):
+            energy = self.by_node.apply(self._energy_node).sum()
+        return energy
 
     @staticmethod
-    def pretty_print(args):
-        return ",".join(str(a) for a in args) + '\n'
+    def _energy_node(group):
+        """Left-sided Riemann sum is enough, as time is lower bound of bucket"""
+        delta_t = group["time"].diff().shift(-1)/1000.  # in seconds
+        pow = group['power']
+        return (delta_t * pow).iloc[:-1].sum()
+
+    def _summarize_energy(self):
+        return "# Total energy consumed by job: {energy:.0f} J\n".format(energy=self.energy_total())
+
+    @staticmethod
+    def to_csv(*args):
+        return ",".join(str(a) for a in args)
+
+    @staticmethod
+    def pretty_print(*args):
+        return Power.to_csv(*args) + '\n'
 
     def filename(self, jobid):
         fname = "detailed_power_{jobid}.hawk-pbs5.{first}-{last}.csv".format(
@@ -235,7 +243,7 @@ class App:
         if warnings:
             print(warnings)
 
-        header = f"# {config.datetime}: {config.cmd}\n"
+        header = f"# {self.config.datetime}: {self.config.cmd}\n"
         if warnings:
             header += f"{warnings}\n"
         header += "#\n"
@@ -252,7 +260,8 @@
         print('Created file {fn}'.format(fn=fn))
         if power.warnings:
             print(power.warnings)
+        if power.energy:
+            print(power.energy)
 
 
 if __name__ == "__main__":
     import sys