Compare commits
11 commits
294a4e1273
...
5f4765d196
Author | SHA1 | Date | |
---|---|---|---|
5f4765d196 | |||
c0f6adcf67 | |||
035803fbe2 | |||
ee106619ea | |||
36bfb06228 | |||
f9ebd68654 | |||
9294b04291 | |||
bacc9e63c9 | |||
8596fb4184 | |||
f90b3fc1da | |||
3a8170e7c9 |
2 changed files with 141 additions and 126 deletions
|
@ -16,7 +16,7 @@ Requirements:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
ssh monitoring
|
ssh monitoring
|
||||||
./get_detailed_power.py 234556 767869.hawk-pbs5
|
./get_detailed_power.py 2335001 7678069.hawk-pbs5
|
||||||
Saved file detailed_power.234556.hawk-pbs5.100000-1000004.csv
|
Created file detailed_power.2335001.hawk-pbs5.100000-1000004.csv
|
||||||
Saved file detailed_power.767869.hawk-pbs5.2432345-423423.csv
|
Created file detailed_power.7678069.hawk-pbs5.2432345-423423.csv
|
||||||
```
|
```
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import argparse
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
import os.path
|
import os.path
|
||||||
|
@ -9,8 +10,8 @@ def parse_arguments():
|
||||||
parser.add_argument('-v', '--verbose', action='store_true',
|
parser.add_argument('-v', '--verbose', action='store_true',
|
||||||
help='Show database querries, etc.')
|
help='Show database querries, etc.')
|
||||||
parser.add_argument('jobid', type=parse_jobid,
|
parser.add_argument('jobid', type=parse_jobid,
|
||||||
# nargs='+',
|
nargs='+',
|
||||||
help='Job ID such as "2260215" or "226015.hawk-pbs5"')
|
help='Job ID such as "2260215" or "2260215.hawk-pbs5"')
|
||||||
|
|
||||||
return parser.parse_args()
|
return parser.parse_args()
|
||||||
|
|
||||||
|
@ -22,49 +23,6 @@ def parse_jobid(s):
|
||||||
raise argparse.ArgumentTypeError(f'invalid job ID "{s}"')
|
raise argparse.ArgumentTypeError(f'invalid job ID "{s}"')
|
||||||
return jobid
|
return jobid
|
||||||
|
|
||||||
def init_db():
|
|
||||||
import sqlalchemy as db
|
|
||||||
_verbose = False #True
|
|
||||||
engine = db.create_engine('postgresql://hpc@hawk-monitor4:5432/coe_mon', echo=_verbose)
|
|
||||||
conn = engine.connect()
|
|
||||||
return conn
|
|
||||||
|
|
||||||
def init_query(jobid, interval):
|
|
||||||
import sqlalchemy as db
|
|
||||||
query_string = """with job as (
|
|
||||||
select job_id, starttime, endtime, nodes from jobs where job_id='{jobid}.hawk-pbs5'
|
|
||||||
),
|
|
||||||
node_series as(
|
|
||||||
select n.name, scmcavg.id as series_id from nodes n
|
|
||||||
inner join (select * from label where key='node') l on n.id = l.value::int
|
|
||||||
inner join series_cmc_power_racktraynodepoweravg scmcavg on l.id = scmcavg.labels[(
|
|
||||||
select pos from label_key_position
|
|
||||||
where metric_category= 'cmc_power'
|
|
||||||
and metric_name = 'RackTrayNodePowerAvg'
|
|
||||||
and key = 'node'
|
|
||||||
)]
|
|
||||||
where n.id = any((select nodes from job)::int[])
|
|
||||||
)
|
|
||||||
select a.time, a.value, ns.name from (
|
|
||||||
select
|
|
||||||
time_bucket(extract ('epoch' from '{interval} seconds'::interval)::int*1000, cmcavg.ts) as time,
|
|
||||||
cmcavg.series_id::varchar,
|
|
||||||
avg(cmcavg.val) AS value
|
|
||||||
from cmc_power_racktraynodepoweravg cmcavg
|
|
||||||
where
|
|
||||||
ts <= (select endtime from job)
|
|
||||||
and ts >= (select starttime from job)
|
|
||||||
and series_id = Any(select series_id from node_series)
|
|
||||||
group by time, cmcavg.series_id order by time desc) a
|
|
||||||
inner join node_series ns on a.series_id::int = ns.series_id;
|
|
||||||
"""
|
|
||||||
return db.text(query_string.format(jobid=jobid, interval=interval))
|
|
||||||
|
|
||||||
def db_to_list(connection, query):
|
|
||||||
return connection.execute(query).fetchall()
|
|
||||||
|
|
||||||
def db_to_pf(connection, query):
|
|
||||||
return pd.read_sql(query, con=connection)
|
|
||||||
|
|
||||||
class Power:
|
class Power:
|
||||||
def __init__(self, nodes):
|
def __init__(self, nodes):
|
||||||
|
@ -73,20 +31,12 @@ class Power:
|
||||||
self.first_ts = None
|
self.first_ts = None
|
||||||
self.last_ts = None
|
self.last_ts = None
|
||||||
|
|
||||||
def insert_epoch(self, ts, values):
|
|
||||||
self.epochs[ts] = values
|
|
||||||
if not self.first_ts:
|
|
||||||
self.first_ts = ts
|
|
||||||
self.last_ts = ts
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_list(cls, data):
|
def from_list(cls, data):
|
||||||
"""Assumes data is a list of tuples (timestamp, value, node)"""
|
"""Assumes data is a list of tuples (timestamp, value, node)"""
|
||||||
nodes = list(set([line[2] for line in data]))
|
nodes = list(set([line[2] for line in data]))
|
||||||
cls = Power(nodes)
|
cls = Power(nodes)
|
||||||
|
|
||||||
#times = list(set([line[0] for line in data]))
|
|
||||||
|
|
||||||
# for now ignore order to nodes
|
# for now ignore order to nodes
|
||||||
values = {}
|
values = {}
|
||||||
for l in data:
|
for l in data:
|
||||||
|
@ -103,52 +53,13 @@ class Power:
|
||||||
|
|
||||||
return cls
|
return cls
|
||||||
|
|
||||||
def header(self):
|
@classmethod
|
||||||
hd = "# all timestamp have unit miliseconds since unix epoch\n"
|
def from_db(cls, db, jobid, interval):
|
||||||
hd += "# all power values have unit Watt\n"
|
all_list = db.db_to_list(jobid, interval)
|
||||||
hd += "timestamp,delta_t,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_node_power"
|
if not all_list:
|
||||||
# add node names here instead
|
raise RuntimeError
|
||||||
hd += ",NO_NODE_NAMES_YET\n"
|
|
||||||
return hd
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def summarize_values(val):
|
|
||||||
values = np.asarray(val)
|
|
||||||
head = values[0]
|
|
||||||
min, max = values.min(), values.max()
|
|
||||||
avg, stddev = values.mean(), values.std()
|
|
||||||
median = np.median(values)
|
|
||||||
return head, avg, median, min, max, stddev
|
|
||||||
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def summarize_time(ts):
|
|
||||||
return ts, -1
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def summarize_epoch(epoch):
|
|
||||||
ts, values = epoch
|
|
||||||
return Power.summarize_time(ts) \
|
|
||||||
+ Power.summarize_values(values)
|
|
||||||
# + values
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def pretty_print(args):
|
|
||||||
return ",".join(str(a) for a in args) + '\n'
|
|
||||||
|
|
||||||
|
|
||||||
def body(self):
|
|
||||||
_body = ""
|
|
||||||
for epoch in self.epochs.items():
|
|
||||||
_body += Power.pretty_print(self.summarize_epoch(epoch))
|
|
||||||
return _body
|
|
||||||
|
|
||||||
def filename(self, jobid):
|
|
||||||
fname = "detailed_power_{jobid}.hawk-pbs5.{first}-{last}.csv".format(
|
|
||||||
jobid=jobid, first=self.first_ts, last=self.last_ts
|
|
||||||
)
|
|
||||||
return fname
|
|
||||||
|
|
||||||
|
return Power.from_list(all_list)
|
||||||
|
|
||||||
def to_file(self, jobid):
|
def to_file(self, jobid):
|
||||||
"""Dumps power data to file. Returns filename is succesfull and None if unsucessfull."""
|
"""Dumps power data to file. Returns filename is succesfull and None if unsucessfull."""
|
||||||
|
@ -167,31 +78,135 @@ class Power:
|
||||||
|
|
||||||
return fname
|
return fname
|
||||||
|
|
||||||
|
def insert_epoch(self, ts, values):
|
||||||
|
self.epochs[ts] = values
|
||||||
|
if not self.first_ts:
|
||||||
|
self.first_ts = ts
|
||||||
|
self.last_ts = ts
|
||||||
|
|
||||||
|
def header(self):
|
||||||
|
hd = "# all timestamp have unit miliseconds since unix epoch\n"
|
||||||
|
hd += "# all power values have unit Watt\n"
|
||||||
|
hd += "timestamp,delta_t,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_node_power"
|
||||||
|
# add node names here instead
|
||||||
|
hd += ",NO_NODE_NAMES_YET\n"
|
||||||
|
return hd
|
||||||
|
|
||||||
|
def body(self):
|
||||||
|
_body = ""
|
||||||
|
for epoch in self.epochs.items():
|
||||||
|
_body += Power.pretty_print(self.summarize_epoch(epoch))
|
||||||
|
return _body
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def summarize_time(ts):
|
||||||
|
return ts, -1
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def summarize_values(val):
|
||||||
|
values = np.asarray(val)
|
||||||
|
head = values[0]
|
||||||
|
min, max = values.min(), values.max()
|
||||||
|
avg, stddev = values.mean(), values.std()
|
||||||
|
median = np.median(values)
|
||||||
|
return head, avg, median, min, max, stddev
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def summarize_epoch(epoch):
|
||||||
|
ts, values = epoch
|
||||||
|
return Power.summarize_time(ts) \
|
||||||
|
+ Power.summarize_values(values)
|
||||||
|
# + values
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def pretty_print(args):
|
||||||
|
return ",".join(str(a) for a in args) + '\n'
|
||||||
|
|
||||||
|
def filename(self, jobid):
|
||||||
|
fname = "detailed_power_{jobid}.hawk-pbs5.{first}-{last}.csv".format(
|
||||||
|
jobid=jobid, first=self.first_ts, last=self.last_ts
|
||||||
|
)
|
||||||
|
return fname
|
||||||
|
|
||||||
|
|
||||||
|
class MonitoringDB:
|
||||||
|
def __init__(self, verbose):
|
||||||
|
self.connection = self.init_db(verbose)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def init_db(verbose):
|
||||||
|
import sqlalchemy as db
|
||||||
|
_verbose = False #True
|
||||||
|
engine = db.create_engine('postgresql://hpc@hawk-monitor4:5432/coe_mon', echo=verbose)
|
||||||
|
connection = engine.connect()
|
||||||
|
return connection
|
||||||
|
|
||||||
|
def close_db(self):
|
||||||
|
return self.connection.close()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def build_query(jobid, interval):
|
||||||
|
import sqlalchemy as db
|
||||||
|
query_string = """with job as (
|
||||||
|
select job_id, starttime, endtime, nodes from jobs where job_id='{jobid}.hawk-pbs5'
|
||||||
|
),
|
||||||
|
node_series as(
|
||||||
|
select n.name, scmcavg.id as series_id from nodes n
|
||||||
|
inner join (select * from label where key='node') l on n.id = l.value::int
|
||||||
|
inner join series_cmc_power_racktraynodepoweravg scmcavg on l.id = scmcavg.labels[(
|
||||||
|
select pos from label_key_position
|
||||||
|
where metric_category= 'cmc_power'
|
||||||
|
and metric_name = 'RackTrayNodePowerAvg'
|
||||||
|
and key = 'node'
|
||||||
|
)]
|
||||||
|
where n.id = any((select nodes from job)::int[])
|
||||||
|
)
|
||||||
|
select a.time, a.value, ns.name from (
|
||||||
|
select
|
||||||
|
time_bucket(extract ('epoch' from '{interval} seconds'::interval)::int*1000, cmcavg.ts) as time,
|
||||||
|
cmcavg.series_id::varchar,
|
||||||
|
avg(cmcavg.val) AS value
|
||||||
|
from cmc_power_racktraynodepoweravg cmcavg
|
||||||
|
where
|
||||||
|
ts <= (select endtime from job)
|
||||||
|
and ts >= (select starttime from job)
|
||||||
|
and series_id = Any(select series_id from node_series)
|
||||||
|
group by time, cmcavg.series_id order by time desc) a
|
||||||
|
inner join node_series ns on a.series_id::int = ns.series_id;
|
||||||
|
"""
|
||||||
|
return db.text(query_string.format(jobid=jobid, interval=interval))
|
||||||
|
|
||||||
|
def db_to_list(self, jobid, interval):
|
||||||
|
query = self.build_query(jobid, interval)
|
||||||
|
return self.connection.execute(query).fetchall()
|
||||||
|
|
||||||
|
def db_to_pf(self, jobid, inerval):
|
||||||
|
query = self.build_query(jobid, interval)
|
||||||
|
return pd.read_sql(query, con=self.connection)
|
||||||
|
|
||||||
|
|
||||||
|
class App:
|
||||||
|
def __init__(self, config):
|
||||||
|
self.config = config
|
||||||
|
self.db = MonitoringDB(self.config.verbose)
|
||||||
|
|
||||||
|
def run_all(self):
|
||||||
|
for jobid in self.config.jobid:
|
||||||
|
try:
|
||||||
|
power = Power.from_db(self.db, jobid, self.config.interval)
|
||||||
|
except RuntimeError:
|
||||||
|
print('No data found for job ID "{}"'.format(jobid))
|
||||||
|
continue
|
||||||
|
|
||||||
|
fn = power.to_file(jobid)
|
||||||
|
if fn:
|
||||||
|
print('Created file {fn}'.format(fn=fn))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
args = parse_arguments()
|
config = parse_arguments()
|
||||||
|
config.interval = 5
|
||||||
conn = init_db()
|
|
||||||
jobid = args.jobid
|
|
||||||
interval = 5
|
|
||||||
query = init_query(jobid, interval)
|
|
||||||
|
|
||||||
all_list = db_to_list(conn, query)
|
|
||||||
#all_df = db_to_df(conn, query)
|
|
||||||
|
|
||||||
power = Power.from_list(all_list) # , jobid, interval)
|
|
||||||
print("#epochs", len(power.epochs))
|
|
||||||
|
|
||||||
print(power.header())
|
|
||||||
|
|
||||||
epochs_iter = iter(power.epochs.items())
|
|
||||||
ts, values = next(epochs_iter)
|
|
||||||
print(ts, values)
|
|
||||||
print('time:', power.summarize_time(ts))
|
|
||||||
print('values:', power.summarize_values(values))
|
|
||||||
print('epoch:', power.pretty_print(power.summarize_epoch((ts, values))))
|
|
||||||
print("filename: ", power.to_file(jobid))
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
main = App(config)
|
||||||
|
main.run_all()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue