Compare commits

...

11 commits

2 changed files with 141 additions and 126 deletions

View file

@ -16,7 +16,7 @@ Requirements:
```bash ```bash
ssh monitoring ssh monitoring
./get_detailed_power.py 234556 767869.hawk-pbs5 ./get_detailed_power.py 2335001 7678069.hawk-pbs5
Saved file detailed_power.234556.hawk-pbs5.100000-1000004.csv Created file detailed_power.2335001.hawk-pbs5.100000-1000004.csv
Saved file detailed_power.767869.hawk-pbs5.2432345-423423.csv Created file detailed_power.7678069.hawk-pbs5.2432345-423423.csv
``` ```

View file

@ -1,3 +1,4 @@
import argparse
import numpy as np import numpy as np
from collections import OrderedDict from collections import OrderedDict
import os.path import os.path
@ -9,8 +10,8 @@ def parse_arguments():
parser.add_argument('-v', '--verbose', action='store_true', parser.add_argument('-v', '--verbose', action='store_true',
help='Show database querries, etc.') help='Show database querries, etc.')
parser.add_argument('jobid', type=parse_jobid, parser.add_argument('jobid', type=parse_jobid,
# nargs='+', nargs='+',
help='Job ID such as "2260215" or "226015.hawk-pbs5"') help='Job ID such as "2260215" or "2260215.hawk-pbs5"')
return parser.parse_args() return parser.parse_args()
@ -22,49 +23,6 @@ def parse_jobid(s):
raise argparse.ArgumentTypeError(f'invalid job ID "{s}"') raise argparse.ArgumentTypeError(f'invalid job ID "{s}"')
return jobid return jobid
def init_db():
    """Open a connection to the Hawk monitoring database.

    Returns a live SQLAlchemy connection; the caller owns it and is
    responsible for closing it.
    """
    import sqlalchemy as db
    echo_queries = False  # flip to True to log every emitted SQL statement
    engine = db.create_engine('postgresql://hpc@hawk-monitor4:5432/coe_mon',
                              echo=echo_queries)
    return engine.connect()
def init_query(jobid, interval):
    """Build the per-node power time-series query for one job.

    Parameters:
        jobid: numeric job ID (without the '.hawk-pbs5' suffix); it is
            interpolated into the SQL text, so it must already be
            validated (see parse_jobid).
        interval: time_bucket() width in seconds for averaging samples.

    Returns a sqlalchemy TextClause ready to execute.
    """
    import sqlalchemy as db
    # NOTE(review): values are interpolated via str.format rather than
    # bound parameters -- safe only while parse_jobid restricts jobid.
    query_string = """with job as (
select job_id, starttime, endtime, nodes from jobs where job_id='{jobid}.hawk-pbs5'
),
node_series as(
select n.name, scmcavg.id as series_id from nodes n
inner join (select * from label where key='node') l on n.id = l.value::int
inner join series_cmc_power_racktraynodepoweravg scmcavg on l.id = scmcavg.labels[(
select pos from label_key_position
where metric_category= 'cmc_power'
and metric_name = 'RackTrayNodePowerAvg'
and key = 'node'
)]
where n.id = any((select nodes from job)::int[])
)
select a.time, a.value, ns.name from (
select
time_bucket(extract ('epoch' from '{interval} seconds'::interval)::int*1000, cmcavg.ts) as time,
cmcavg.series_id::varchar,
avg(cmcavg.val) AS value
from cmc_power_racktraynodepoweravg cmcavg
where
ts <= (select endtime from job)
and ts >= (select starttime from job)
and series_id = Any(select series_id from node_series)
group by time, cmcavg.series_id order by time desc) a
inner join node_series ns on a.series_id::int = ns.series_id;
"""
    return db.text(query_string.format(jobid=jobid, interval=interval))
def db_to_list(connection, query):
    """Execute *query* on *connection* and return every row as a list."""
    result = connection.execute(query)
    return result.fetchall()
def db_to_pf(connection, query):
    """Execute *query* and return the result set as a pandas DataFrame.

    Fix: `pd` was referenced but never imported anywhere in this file
    (only argparse/numpy/OrderedDict/os.path are imported), so this
    function raised NameError when called. Import pandas locally so the
    function is self-contained.
    """
    import pandas as pd
    return pd.read_sql(query, con=connection)
class Power: class Power:
def __init__(self, nodes): def __init__(self, nodes):
@ -73,20 +31,12 @@ class Power:
self.first_ts = None self.first_ts = None
self.last_ts = None self.last_ts = None
def insert_epoch(self, ts, values):
    """Record one sample epoch: map timestamp *ts* to the list of node
    power *values*, and track the first/last timestamp seen (used later
    to build the output filename)."""
    self.epochs[ts] = values
    # first_ts is set only once; last_ts follows every insert
    if not self.first_ts:
        self.first_ts = ts
    self.last_ts = ts
@classmethod @classmethod
def from_list(cls, data): def from_list(cls, data):
"""Assumes data is a list of tuples (timestamp, value, node)""" """Assumes data is a list of tuples (timestamp, value, node)"""
nodes = list(set([line[2] for line in data])) nodes = list(set([line[2] for line in data]))
cls = Power(nodes) cls = Power(nodes)
#times = list(set([line[0] for line in data]))
# for now ignore order to nodes # for now ignore order to nodes
values = {} values = {}
for l in data: for l in data:
@ -103,52 +53,13 @@ class Power:
return cls return cls
def header(self): @classmethod
hd = "# all timestamp have unit miliseconds since unix epoch\n" def from_db(cls, db, jobid, interval):
hd += "# all power values have unit Watt\n" all_list = db.db_to_list(jobid, interval)
hd += "timestamp,delta_t,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_node_power" if not all_list:
# add node names here instead raise RuntimeError
hd += ",NO_NODE_NAMES_YET\n"
return hd
# --- Power statistics/output helpers (verbatim from this revision) ---

@staticmethod
def summarize_values(val):
    # Reduce one epoch's node power readings to summary statistics.
    # Returns (head, avg, median, min, max, stddev); head is the first value.
    values = np.asarray(val)
    head = values[0]
    # NOTE(review): shadows the builtins min/max inside this scope
    min, max = values.min(), values.max()
    avg, stddev = values.mean(), values.std()
    median = np.median(values)
    return head, avg, median, min, max, stddev

@staticmethod
def summarize_time(ts):
    # delta_t is not computed yet; -1 is a placeholder
    return ts, -1

@staticmethod
def summarize_epoch(epoch):
    # Flatten one (timestamp, values) pair into a single stats tuple.
    ts, values = epoch
    return Power.summarize_time(ts) \
        + Power.summarize_values(values)
    # + values

@staticmethod
def pretty_print(args):
    # Comma-join the tuple fields and terminate the CSV line.
    return ",".join(str(a) for a in args) + '\n'

def body(self):
    # Render every stored epoch as one CSV line.
    _body = ""
    for epoch in self.epochs.items():
        _body += Power.pretty_print(self.summarize_epoch(epoch))
    return _body

def filename(self, jobid):
    # Output filename encodes job ID and first/last observed timestamps.
    fname = "detailed_power_{jobid}.hawk-pbs5.{first}-{last}.csv".format(
        jobid=jobid, first=self.first_ts, last=self.last_ts
    )
    return fname
return Power.from_list(all_list)
def to_file(self, jobid): def to_file(self, jobid):
"""Dumps power data to file. Returns filename is succesfull and None if unsucessfull.""" """Dumps power data to file. Returns filename is succesfull and None if unsucessfull."""
@ -167,31 +78,135 @@ class Power:
return fname return fname
def insert_epoch(self, ts, values):
    """Store the node power readings *values* under timestamp *ts*.

    Keeps first_ts/last_ts up to date so filename() can report the
    covered time range.
    """
    self.epochs[ts] = values
    # `or` keeps the original falsy-check semantics: set once, then frozen
    self.first_ts = self.first_ts or ts
    self.last_ts = ts
def header(self):
    """Return the CSV header: two comment lines plus the column names."""
    parts = [
        "# all timestamp have unit miliseconds since unix epoch\n",
        "# all power values have unit Watt\n",
        "timestamp,delta_t,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_node_power",
        # add node names here instead
        ",NO_NODE_NAMES_YET\n",
    ]
    return "".join(parts)
def body(self):
    """Render every stored epoch as one CSV line and concatenate them."""
    rows = (Power.pretty_print(self.summarize_epoch(epoch))
            for epoch in self.epochs.items())
    return "".join(rows)
@staticmethod
def summarize_time(ts):
return ts, -1
@staticmethod
def summarize_values(val):
values = np.asarray(val)
head = values[0]
min, max = values.min(), values.max()
avg, stddev = values.mean(), values.std()
median = np.median(values)
return head, avg, median, min, max, stddev
@staticmethod
def summarize_epoch(epoch):
    """Flatten one (timestamp, values) pair into a single stats tuple."""
    ts, values = epoch
    time_part = Power.summarize_time(ts)
    value_part = Power.summarize_values(values)
    return time_part + value_part
@staticmethod
def pretty_print(args):
return ",".join(str(a) for a in args) + '\n'
def filename(self, jobid):
    """Build the output CSV filename from the job ID and the first/last
    observed timestamps.

    NOTE(review): this produces 'detailed_power_<jobid>...' (underscore)
    while the README examples show 'detailed_power.<jobid>...' (dot) --
    confirm which spelling is intended.
    """
    return "detailed_power_{jobid}.hawk-pbs5.{first}-{last}.csv".format(
        jobid=jobid, first=self.first_ts, last=self.last_ts,
    )
class MonitoringDB:
    """Thin wrapper around the Hawk monitoring PostgreSQL database."""

    def __init__(self, verbose):
        # verbose is forwarded to SQLAlchemy's echo flag (logs emitted SQL)
        self.connection = self.init_db(verbose)

    @staticmethod
    def init_db(verbose):
        """Create the engine and return an open connection.

        Fix: removed the dead leftover `_verbose = False #True` local --
        echo is driven by the `verbose` parameter now.
        """
        import sqlalchemy as db
        engine = db.create_engine(
            'postgresql://hpc@hawk-monitor4:5432/coe_mon', echo=verbose)
        connection = engine.connect()
        return connection

    def close_db(self):
        """Close the underlying database connection."""
        return self.connection.close()

    @staticmethod
    def build_query(jobid, interval):
        """Build the per-node power time-series query for one job.

        jobid: numeric job ID (without '.hawk-pbs5'); must be pre-validated
            since it is interpolated into the SQL text, not bound.
        interval: time_bucket() width in seconds for averaging samples.

        Returns a sqlalchemy TextClause.
        """
        import sqlalchemy as db
        query_string = """with job as (
select job_id, starttime, endtime, nodes from jobs where job_id='{jobid}.hawk-pbs5'
),
node_series as(
select n.name, scmcavg.id as series_id from nodes n
inner join (select * from label where key='node') l on n.id = l.value::int
inner join series_cmc_power_racktraynodepoweravg scmcavg on l.id = scmcavg.labels[(
select pos from label_key_position
where metric_category= 'cmc_power'
and metric_name = 'RackTrayNodePowerAvg'
and key = 'node'
)]
where n.id = any((select nodes from job)::int[])
)
select a.time, a.value, ns.name from (
select
time_bucket(extract ('epoch' from '{interval} seconds'::interval)::int*1000, cmcavg.ts) as time,
cmcavg.series_id::varchar,
avg(cmcavg.val) AS value
from cmc_power_racktraynodepoweravg cmcavg
where
ts <= (select endtime from job)
and ts >= (select starttime from job)
and series_id = Any(select series_id from node_series)
group by time, cmcavg.series_id order by time desc) a
inner join node_series ns on a.series_id::int = ns.series_id;
"""
        return db.text(query_string.format(jobid=jobid, interval=interval))

    def db_to_list(self, jobid, interval):
        """Run the power query and return all rows as a list of tuples."""
        query = self.build_query(jobid, interval)
        return self.connection.execute(query).fetchall()

    def db_to_pf(self, jobid, interval):
        """Run the power query and return the result as a pandas DataFrame.

        Fixes: parameter was misspelled `inerval` while the body used
        `interval` (guaranteed NameError), and `pd` was never imported
        anywhere in this file.
        """
        import pandas as pd
        query = self.build_query(jobid, interval)
        return pd.read_sql(query, con=self.connection)
class App:
    """Command-line driver: fetch power data per job ID and write CSV files."""

    def __init__(self, config):
        self.config = config
        self.db = MonitoringDB(self.config.verbose)

    def run_all(self):
        """Process every configured job ID, skipping jobs without data."""
        for jobid in self.config.jobid:
            self._process_one(jobid)

    def _process_one(self, jobid):
        # One job: query, summarize, write; RuntimeError signals "no data".
        try:
            power = Power.from_db(self.db, jobid, self.config.interval)
        except RuntimeError:
            print('No data found for job ID "{}"'.format(jobid))
            return
        written = power.to_file(jobid)
        if written:
            print('Created file {fn}'.format(fn=written))
if __name__ == "__main__": if __name__ == "__main__":
args = parse_arguments() config = parse_arguments()
config.interval = 5
conn = init_db()
jobid = args.jobid
interval = 5
query = init_query(jobid, interval)
all_list = db_to_list(conn, query)
#all_df = db_to_df(conn, query)
power = Power.from_list(all_list) # , jobid, interval)
print("#epochs", len(power.epochs))
print(power.header())
epochs_iter = iter(power.epochs.items())
ts, values = next(epochs_iter)
print(ts, values)
print('time:', power.summarize_time(ts))
print('values:', power.summarize_values(values))
print('epoch:', power.pretty_print(power.summarize_epoch((ts, values))))
print("filename: ", power.to_file(jobid))
main = App(config)
main.run_all()