Add prototype to query job power from database
This commit is contained in:
parent
8a60c2dd9f
commit
831a85639b
2 changed files with 191 additions and 0 deletions
|
@ -1,3 +1,20 @@
|
|||
# hawk-utils-scripts/monitoring/db
|
||||
|
||||
Use this directory for scripts, etc., that operate on or extract data from the database.
|
||||
|
||||
## scripts/get_detailed_power.py
|
||||
|
||||
Python script which queries the database for a list of jobids and produces a file for each jobid.
|
||||
|
||||
Those files contain time-resolved power consumption data and are meant to be consumed by the utility in [monitoring/logs/scripts/plot_energy_logs.sh](../logs/README.md#scripts/plot_energy_logs.sh).
|
||||
|
||||
Requirements:
|
||||
- Python module sqlalchemy `python -m pip install sqlalchemy`
|
||||
|
||||
Usage:
|
||||
```bash
|
||||
ssh monitoring
|
||||
./get_detailed_power.py 234556 767869.hawk-pbs5
|
||||
Saved file detailed_power.234556.hawk-pbs5.100000-1000004.csv
|
||||
Saved file detailed_power.767869.hawk-pbs5.2432345-423423.csv
|
||||
```
|
||||
|
|
174
monitoring/db/scripts/get_detailed_power.py
Normal file
174
monitoring/db/scripts/get_detailed_power.py
Normal file
|
@ -0,0 +1,174 @@
|
|||
import sqlalchemy as db
|
||||
import numpy as np
|
||||
from collections import OrderedDict
|
||||
import os.path
|
||||
|
||||
def init_db():
    """Open a connection to the monitoring database and return it.

    The connection target is hard-coded. The caller owns the returned
    connection and is responsible for closing it.
    """
    # Passed to create_engine() as ``echo``; flip to True while debugging.
    echo_sql = False
    url = 'postgresql://hpc@hawk-monitor4:5432/coe_mon'
    return db.create_engine(url, echo=echo_sql).connect()
|
||||
|
||||
def init_query(jobid, interval):
    """Build the statement that fetches time-bucketed node power for one job.

    The statement yields one row per (time bucket, node) with the columns
    ``time``, ``value`` (average of the readings in the bucket) and ``name``
    (node name), ordered by descending time before the final join.

    Args:
        jobid: Job id, either bare (``"2260215"``) or already carrying a
            server suffix (``"2260215.hawk-pbs5"``). A bare id gets
            ``.hawk-pbs5`` appended, as before.
        interval: Width of one time bucket in seconds.

    Returns:
        A SQLAlchemy text clause with ``jobid`` and ``interval`` bound as
        parameters, ready to be passed to ``connection.execute``.
    """
    jobid = str(jobid)
    # Only append the server suffix when the caller did not supply one;
    # otherwise "123.hawk-pbs5" would turn into "123.hawk-pbs5.hawk-pbs5".
    if "." not in jobid:
        jobid += ".hawk-pbs5"

    # Values are passed as bound parameters (:jobid, :interval) instead of
    # being formatted into the SQL text, so quotes or other SQL fragments in
    # the inputs cannot alter the statement. PostgreSQL "::" casts are left
    # untouched by SQLAlchemy's bind-parameter parsing.
    query_string = """with job as (
    select job_id, starttime, endtime, nodes from jobs where job_id = :jobid
),
node_series as (
    select n.name, scmcavg.id as series_id from nodes n
    inner join (select * from label where key='node') l on n.id = l.value::int
    inner join series_cmc_power_racktraynodepoweravg scmcavg on l.id = scmcavg.labels[(
        select pos from label_key_position
        where metric_category = 'cmc_power'
        and metric_name = 'RackTrayNodePowerAvg'
        and key = 'node'
    )]
    where n.id = any((select nodes from job)::int[])
)
select a.time, a.value, ns.name from (
    select
        time_bucket(extract('epoch' from (:interval * interval '1 second'))::int*1000, cmcavg.ts) as time,
        cmcavg.series_id::varchar,
        avg(cmcavg.val) AS value
    from cmc_power_racktraynodepoweravg cmcavg
    where
        ts <= (select endtime from job)
        and ts >= (select starttime from job)
        and series_id = Any(select series_id from node_series)
    group by time, cmcavg.series_id order by time desc) a
inner join node_series ns on a.series_id::int = ns.series_id;
"""
    return db.text(query_string).bindparams(jobid=jobid, interval=interval)
|
||||
|
||||
def db_to_list(connection, query):
    """Execute *query* on *connection* and return all result rows."""
    result = connection.execute(query)
    rows = result.fetchall()
    return rows
|
||||
|
||||
def db_to_pf(connection, query):
    """Run *query* on *connection* and return the result as a pandas DataFrame.

    Args:
        connection: Open database connection accepted by ``pandas.read_sql``.
        query: SQL statement to execute.

    Returns:
        A ``pandas.DataFrame`` with one row per result row.
    """
    # The module never imported pandas, so the name ``pd`` was undefined and
    # every call raised NameError. Importing locally fixes that while keeping
    # pandas optional for users who only need db_to_list().
    import pandas as pd

    return pd.read_sql(query, con=connection)


# Alias with the conventional "df" (DataFrame) spelling.
db_to_df = db_to_pf
|
||||
|
||||
class Power:
    """Time-resolved power readings of one job, grouped by timestamp.

    Each "epoch" is a pair ``(timestamp, values)`` where ``values`` is the
    list of power readings (one per node row) recorded for that timestamp.
    """

    def __init__(self, nodes):
        """Create an empty container for the given collection of node names."""
        # Node names that contributed readings; currently informational only.
        self.nodes = nodes
        # Maps timestamp -> list of power values, in insertion order.
        self.epochs = OrderedDict()
        # Timestamp of the first inserted epoch; None while no epoch exists.
        self.first_ts = None
        # Timestamp of the most recently inserted epoch; None while empty.
        self.last_ts = None

    def insert_epoch(self, ts, values):
        """Store *values* under timestamp *ts* and update first/last markers."""
        self.epochs[ts] = values
        # Compare against None explicitly: a timestamp of 0 is falsy but valid.
        if self.first_ts is None:
            self.first_ts = ts
        self.last_ts = ts

    @classmethod
    def from_list(cls, data):
        """Build an instance from rows of ``(timestamp, value, node)``.

        Rows are grouped by timestamp (keeping row order within a group) and
        the groups are inserted in ascending timestamp order.
        """
        nodes = list({row[2] for row in data})
        # Instantiate through cls so that subclasses get their own type back.
        power = cls(nodes)

        # For now the association between a value and its node is ignored.
        grouped = {}
        for row in data:
            grouped.setdefault(row[0], []).append(row[1])

        for ts in sorted(grouped):
            power.insert_epoch(ts, grouped[ts])

        return power

    def header(self):
        """Return the CSV header: two comment lines plus the column names."""
        hd = "# all timestamp have unit miliseconds since unix epoch\n"
        hd += "# all power values have unit Watt\n"
        hd += "timestamp,delta_t,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_node_power"
        # Placeholder column until per-node values are written out.
        hd += ",NO_NODE_NAMES_YET\n"
        return hd

    @staticmethod
    def summarize_values(val):
        """Return ``(head, avg, median, min, max, stddev)`` of the values.

        ``head`` is the first value of the sequence. Raises IndexError when
        *val* is empty.
        """
        values = np.asarray(val)
        head = values[0]
        # Local names chosen so the builtins min()/max() are not shadowed.
        lowest, highest = values.min(), values.max()
        avg, stddev = values.mean(), values.std()
        median = np.median(values)
        return head, avg, median, lowest, highest, stddev

    @staticmethod
    def summarize_time(ts):
        """Return ``(ts, delta_t)``; delta_t is not computed yet and is -1."""
        return ts, -1

    @staticmethod
    def summarize_epoch(epoch):
        """Return the flat summary tuple for one ``(timestamp, values)`` pair."""
        ts, values = epoch
        return Power.summarize_time(ts) + Power.summarize_values(values)

    @staticmethod
    def pretty_print(args):
        """Format *args* as one comma-separated, newline-terminated line."""
        return ",".join(str(a) for a in args) + '\n'

    def body(self):
        """Return one CSV line per epoch, in insertion order."""
        # join() avoids the quadratic cost of repeated string concatenation.
        return "".join(
            Power.pretty_print(self.summarize_epoch(epoch))
            for epoch in self.epochs.items()
        )

    def filename(self, jobid):
        """Return the output file name for *jobid* and the covered time span."""
        fname = "detailed_power_{jobid}.hawk-pbs5.{first}-{last}.csv".format(
            jobid=jobid, first=self.first_ts, last=self.last_ts
        )
        return fname

    def to_file(self, jobid):
        """Write header and body to the file named by ``filename(jobid)``.

        An existing file is never overwritten.

        Returns:
            The file name on success, None if the file already exists or
            cannot be written (an error message is printed in both cases).
        """
        fname = self.filename(jobid)
        # Render everything first so a failure while summarizing does not
        # leave an empty file behind.
        content = self.header() + self.body()

        try:
            # Mode "x" creates the file exclusively, so the existence check and
            # the creation are one atomic step (no check-then-open race).
            with open(fname, "x") as f:
                f.write(content)
        except FileExistsError:
            print("Error: cowardly refusing to overwrite file ", fname)
            return None
        except OSError:
            # The original referenced the undefined name ``filename`` here,
            # which raised NameError instead of reporting the I/O failure.
            print("Error: could not write to file ", fname)
            return None

        return fname
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Prototype driver: fetch the power data of one hard-coded job, print a
    # few diagnostics for the first epoch and dump everything to a CSV file.
    jobid = "2260215"
    interval = 5  # bucket width in seconds
    query = init_query(jobid, interval)

    conn = init_db()
    try:
        all_list = db_to_list(conn, query)
    finally:
        # Release the database connection even if the query fails.
        conn.close()

    power = Power.from_list(all_list)
    print("#epochs", len(power.epochs))

    print(power.header())

    if not power.epochs:
        # Without this guard next() below dies with a bare StopIteration.
        raise SystemExit("Error: no power data found for job {}".format(jobid))

    ts, values = next(iter(power.epochs.items()))
    print(ts, values)
    print('time:', power.summarize_time(ts))
    print('values:', power.summarize_values(values))
    print('epoch:', power.pretty_print(power.summarize_epoch((ts, values))))
    print("filename: ", power.to_file(jobid))
|
||||
|
||||
|
||||
|
||||
|
Loading…
Reference in a new issue