Compare commits
12 commits
f8b0f86f22
...
7f0228a905
Author | SHA1 | Date | |
---|---|---|---|
7f0228a905 | |||
bc6a5a3018 | |||
2436928e7a | |||
d1bae309a9 | |||
711daa3a5d | |||
c2b5732be8 | |||
20cf200053 | |||
277b3e936e | |||
5b07b407de | |||
c22959f4b9 | |||
eeba6f7942 | |||
d355d7b348 |
1 changed file with 76 additions and 67 deletions
|
@ -1,7 +1,7 @@
|
||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
import numpy as np
|
import pandas as pd
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
import os.path
|
import os.path
|
||||||
|
|
||||||
|
@ -32,13 +32,6 @@ def parse_jobid(s):
|
||||||
|
|
||||||
|
|
||||||
class Power:
|
class Power:
|
||||||
def __init__(self, nodes):
|
|
||||||
self.nodes = nodes
|
|
||||||
self.epochs = OrderedDict()
|
|
||||||
self.first_ts = None
|
|
||||||
self.last_ts = None
|
|
||||||
self.warnings = ""
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_list(cls, data):
|
def from_list(cls, data):
|
||||||
"""
|
"""
|
||||||
|
@ -46,51 +39,43 @@ class Power:
|
||||||
|
|
||||||
Assumptions:
|
Assumptions:
|
||||||
- data is sorted by timestamp ascending
|
- data is sorted by timestamp ascending
|
||||||
- for each timestamp, there is the same set of nodes and in the same order
|
|
||||||
"""
|
"""
|
||||||
|
df = pd.DataFrame(data, columns=['time', 'node', 'power'])
|
||||||
idx_ts = 0; idx_node = 1; idx_value = 2
|
power = cls(df, columns={})
|
||||||
nodes = list(OrderedDict.fromkeys([line[idx_node] for line in data])) # preserves order of nodes
|
|
||||||
power = Power(nodes)
|
|
||||||
|
|
||||||
values = {}
|
|
||||||
for l in data:
|
|
||||||
ts = l[idx_ts]
|
|
||||||
if ts not in values:
|
|
||||||
values[ts] = []
|
|
||||||
value = l[idx_value]
|
|
||||||
values[ts].append(value)
|
|
||||||
|
|
||||||
epochs = values.keys()
|
|
||||||
for epoch in epochs:
|
|
||||||
power.insert_epoch(epoch, values[epoch])
|
|
||||||
|
|
||||||
# check implicit assumptions: 1) ts/epochs are sorted
|
|
||||||
e = list(epochs)
|
|
||||||
k = list(values.keys())
|
|
||||||
if not e == k:
|
|
||||||
power.warnings += "# Warning: Unexpected unsorted timestamps.\n"
|
|
||||||
|
|
||||||
# check implicit assumptions: 2) each line has #nodes values
|
|
||||||
nnodes = len(nodes)
|
|
||||||
for epoch in epochs:
|
|
||||||
actual = len(values[epoch])
|
|
||||||
if actual != nnodes:
|
|
||||||
power.warnings += "# Warning: Unexpected number of nodes ({actual}/{expected})\n".format(actual=actual, expected=nnodes)
|
|
||||||
break
|
|
||||||
|
|
||||||
return power
|
return power
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_db(cls, db, jobid, interval, hawk_ai):
|
def from_db(cls, db, jobid, interval, hawk_ai):
|
||||||
all_list = db.db_to_list(jobid, interval, hawk_ai)
|
df = db.db_to_pf(jobid, interval, hawk_ai)
|
||||||
if not all_list:
|
if df.empty:
|
||||||
raise RuntimeError
|
raise RuntimeError
|
||||||
|
power = cls(df, {'time': 'time', 'name': 'node', 'avg': 'power'})
|
||||||
power = cls.from_list(all_list)
|
|
||||||
|
|
||||||
return power
|
return power
|
||||||
|
|
||||||
|
def __init__(self, dataframe, columns={}):
|
||||||
|
if columns:
|
||||||
|
dataframe.rename(columns=columns, inplace=True)
|
||||||
|
_required_cols = {'time', 'node', 'power'}
|
||||||
|
if not _required_cols.issubset(set(dataframe.columns)):
|
||||||
|
raise RuntimeError
|
||||||
|
if not dataframe['time'].is_monotonic_increasing:
|
||||||
|
raise RuntimeError
|
||||||
|
|
||||||
|
by_node = dataframe.groupby('node')
|
||||||
|
nodes = list(by_node.groups.keys())
|
||||||
|
epochs = dataframe.groupby('time')
|
||||||
|
times = list(epochs.groups.keys())
|
||||||
|
|
||||||
|
self.dataframe = dataframe
|
||||||
|
self.nodes = nodes
|
||||||
|
self.epochs = epochs
|
||||||
|
self.by_node = by_node
|
||||||
|
self.first_ts, self.last_ts = times[0], times[-1]
|
||||||
|
self.warnings = "" # add check for warning, i.e. data gaps due to missing nodes
|
||||||
|
self.energy = self._summarize_energy()
|
||||||
|
|
||||||
def to_file(self, jobid, header=""):
|
def to_file(self, jobid, header=""):
|
||||||
"""Dumps power data to file. Returns filename if successful and None if unsuccessful."""
|
"""Dumps power data to file. Returns filename if successful and None if unsuccessful."""
|
||||||
fname = self.filename(jobid)
|
fname = self.filename(jobid)
|
||||||
|
@ -99,6 +84,7 @@ class Power:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
header += self.warnings
|
header += self.warnings
|
||||||
|
header += self.energy
|
||||||
try:
|
try:
|
||||||
with open(fname, "w+") as f:
|
with open(fname, "w+") as f:
|
||||||
f.write(header + self.header())
|
f.write(header + self.header())
|
||||||
|
@ -109,16 +95,10 @@ class Power:
|
||||||
|
|
||||||
return fname
|
return fname
|
||||||
|
|
||||||
def insert_epoch(self, ts, values):
|
|
||||||
self.epochs[ts] = values
|
|
||||||
if not self.first_ts:
|
|
||||||
self.first_ts = ts
|
|
||||||
self.last_ts = ts
|
|
||||||
|
|
||||||
def header(self):
|
def header(self):
|
||||||
hd = "# all timestamp have unit miliseconds since unix epoch\n"
|
hd = "# all timestamp have unit miliseconds since unix epoch\n"
|
||||||
hd += "# all power values have unit Watt\n"
|
hd += "# all power values have unit Watt\n"
|
||||||
hd += "timestamp,RESERVED,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_node_power"
|
hd += "timestamp,RESERVED,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_sample_node_power"
|
||||||
# add node names here instead
|
# add node names here instead
|
||||||
hd += "," + ",".join(self.nodes)
|
hd += "," + ",".join(self.nodes)
|
||||||
hd += "\n"
|
hd += "\n"
|
||||||
|
@ -126,31 +106,59 @@ class Power:
|
||||||
|
|
||||||
def body(self):
|
def body(self):
|
||||||
_body = ""
|
_body = ""
|
||||||
for epoch in self.epochs.items():
|
for epoch in self.epochs:
|
||||||
_body += self.pretty_print(self.summarize_epoch(epoch))
|
_body += self.pretty_print(*self.summarize_epoch(epoch))
|
||||||
return _body
|
return _body
|
||||||
|
|
||||||
def summarize_time(self, ts):
|
def _summarize_time(self, ts):
|
||||||
return ts, ""
|
return Power.to_csv(ts, "")
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def summarize_values(val):
|
def _summarize_values(df):
|
||||||
values = np.asarray(val)
|
values = df['power']
|
||||||
head = values[0]
|
head = values.iloc[0]
|
||||||
min, max = values.min(), values.max()
|
min, max = values.min(), values.max()
|
||||||
avg, stddev = values.mean(), values.std()
|
avg, stddev = values.mean(), values.std()
|
||||||
median = np.median(values)
|
median = values.median()
|
||||||
return head, avg, median, min, max, stddev
|
return Power.to_csv(head, avg, median, min, max, stddev)
|
||||||
|
|
||||||
def summarize_epoch(self, epoch):
|
def summarize_epoch(self, epoch):
|
||||||
ts, values = epoch
|
ts, values = epoch
|
||||||
return self.summarize_time(ts) \
|
return self._summarize_time(ts), \
|
||||||
+ self.summarize_values(values) \
|
self._summarize_values(values), \
|
||||||
+ tuple(values)
|
self._all_values(values)
|
||||||
|
|
||||||
|
def _all_values(self, values):
|
||||||
|
# reindex frame to get all nodes; introduces gaps
|
||||||
|
values = values[['node', 'power']].set_index('node').reindex(self.nodes)
|
||||||
|
# hack to_csv() to transpose array
|
||||||
|
csv = values.to_csv(header=False, index=False, line_terminator=',', na_rep=' ')
|
||||||
|
csv = csv[:-1] # strip line terminator ',' from end of string
|
||||||
|
return csv
|
||||||
|
|
||||||
|
def energy_total(self):
|
||||||
|
energy = None
|
||||||
|
if hasattr(self, "by_node"):
|
||||||
|
energy = self.by_node.apply(self._energy_node).sum()
|
||||||
|
return energy
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def pretty_print(args):
|
def _energy_node(group):
|
||||||
return ",".join(str(a) for a in args) + '\n'
|
"""Left-sided Riemann sum is enough, as time is lower bound of bucket"""
|
||||||
|
delta_t = group["time"].diff().shift(-1)/1000. # in seconds
|
||||||
|
pow = group['power']
|
||||||
|
return (delta_t * pow).iloc[:-1].sum()
|
||||||
|
|
||||||
|
def _summarize_energy(self):
|
||||||
|
return "# Total energy consumed by job: {energy:.0f} J\n".format(energy=self.energy_total())
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def to_csv(*args):
|
||||||
|
return ",".join(str(a) for a in args)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def pretty_print(*args):
|
||||||
|
return Power.to_csv(*args) + '\n'
|
||||||
|
|
||||||
def filename(self, jobid):
|
def filename(self, jobid):
|
||||||
fname = "detailed_power_{jobid}.hawk-pbs5.{first}-{last}.csv".format(
|
fname = "detailed_power_{jobid}.hawk-pbs5.{first}-{last}.csv".format(
|
||||||
|
@ -235,7 +243,7 @@ class App:
|
||||||
if warnings:
|
if warnings:
|
||||||
print(warnings)
|
print(warnings)
|
||||||
|
|
||||||
header = f"# {config.datetime}: {config.cmd}\n"
|
header = f"# {self.config.datetime}: {self.config.cmd}\n"
|
||||||
if warnings:
|
if warnings:
|
||||||
header += f"{warnings}\n"
|
header += f"{warnings}\n"
|
||||||
header += "#\n"
|
header += "#\n"
|
||||||
|
@ -252,7 +260,8 @@ class App:
|
||||||
print('Created file {fn}'.format(fn=fn))
|
print('Created file {fn}'.format(fn=fn))
|
||||||
if power.warnings:
|
if power.warnings:
|
||||||
print(power.warnings)
|
print(power.warnings)
|
||||||
|
if power.energy:
|
||||||
|
print(power.energy)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
import sys
|
import sys
|
||||||
|
|
Loading…
Reference in a new issue