Compare commits
7 commits: 41dab7a647 ... ff7da8b8a2
Author | SHA1 | Date
---|---|---
 | ff7da8b8a2 |
 | 1e09e93400 |
 | a484711971 |
 | 8543446b91 |
 | c7c4460196 |
 | e73f69f25e |
 | ae00b04bbe |
1 changed file with 69 additions and 45 deletions
@@ -13,6 +13,8 @@ def parse_arguments(args):
                         help='Show database querries, etc.')
     parser.add_argument('-t', '--interval', action='store', type=float, default=5.0,
                         help="Interval between power values in seconds")
+    parser.add_argument('--hawk-ai', action='store_true',
+                        help="Job did run on Hawk-AI")
     parser.add_argument('jobid', type=parse_jobid,
                         nargs='+',
                         help='Job ID such as "2260215" or "2260215.hawk-pbs5"')
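The new `--hawk-ai` switch is a plain `store_true` flag, so everything downstream can branch on `config.hawk_ai`. A minimal sketch (not part of the diff, and omitting the custom `parse_jobid` type) of how the flag surfaces on the parsed namespace:

```python
import argparse

# Illustrative parser with only the options relevant to the new flag.
parser = argparse.ArgumentParser()
parser.add_argument('-t', '--interval', action='store', type=float, default=5.0)
parser.add_argument('--hawk-ai', action='store_true')    # new flag from this change
parser.add_argument('jobid', nargs='+')                   # real code uses type=parse_jobid

config = parser.parse_args(['--hawk-ai', '-t', '60', '2260215'])
print(config.hawk_ai, config.interval, config.jobid)      # True 60.0 ['2260215']
```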
@@ -38,18 +40,19 @@ class Power:
 
     @classmethod
     def from_list(cls, data):
-        """Assumes data is a list of tuples (timestamp, value, node)"""
-        nodes = list(set([line[2] for line in data]))
+        """Assumes data is a list of tuples (timestamp, node, value)"""
+        idx_ts = 0; idx_node = 1; idx_value = 2
+        nodes = list(set([line[idx_node] for line in data]))
         cls = Power(nodes)
 
         # for now ignore order to nodes
         values = {}
         for l in data:
-            ts = l[0]
+            ts = l[idx_ts]
             if ts not in values:
                 values[ts] = []
             # node = l[1]
-            power = l[1]
+            power = l[idx_value]
             values[ts].append(power)
 
         epochs = sorted(values.keys())
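With the corrected tuple order, `from_list` now reads each row as `(timestamp, node, value)` via the `idx_*` constants and buckets power values by timestamp. A standalone sketch (not part of the diff; sample data invented) of that grouping logic:

```python
# Rows in the shape the new queries return: (timestamp, node, value).
data = [
    (1000, 'r1n0', 210.0),
    (1000, 'r1n1', 190.0),
    (1005, 'r1n0', 215.0),
    (1005, 'r1n1', 188.0),
]
idx_ts, idx_node, idx_value = 0, 1, 2

nodes = list(set(line[idx_node] for line in data))        # e.g. ['r1n0', 'r1n1'], order unspecified
values = {}
for line in data:
    values.setdefault(line[idx_ts], []).append(line[idx_value])

epochs = sorted(values.keys())
print(epochs)           # [1000, 1005]
print(values[1000])     # [210.0, 190.0]
```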
@@ -59,8 +62,8 @@ class Power:
         return cls
 
     @classmethod
-    def from_db(cls, db, jobid, interval):
-        all_list = db.db_to_list(jobid, interval)
+    def from_db(cls, db, jobid, interval, hawk_ai):
+        all_list = db.db_to_list(jobid, interval, hawk_ai)
         if not all_list:
             raise RuntimeError
 
@@ -69,7 +72,7 @@ class Power:
 
         return power
 
-    def to_file(self, jobid):
+    def to_file(self, jobid, header=""):
         """Dumps power data to file. Returns filename is succesfull and None if unsucessfull."""
         fname = self.filename(jobid)
         if os.path.exists(fname):
@@ -78,7 +81,7 @@ class Power:
 
         try:
             with open(fname, "w+") as f:
-                f.write(self.header())
+                f.write(header + self.header())
                 f.write(self.body())
         except IOError:
             print("Error: could not write to file ", fname)
@@ -139,13 +142,36 @@ class Power:
 
 
 class MonitoringDB:
+    QUERY_STRING_HAWK = """
+    -- For description of get_job_data(), see https://kb.hlrs.de/monitoring/index.php/TimescaleDB_-_Query_Guidelines#Function:_get_job_data_and_get_ai_job_data
+    select * from get_job_data(
+        '{jobid}.hawk-pbs5',
+        'cmc_power_racktraynodepoweravg', -- power data source
+        '{interval} seconds',
+        array['avg'], -- aggregation: average across samples in bucket
+        array['time','node'] -- sort by time first than node (ascending)
+    )
+    as t(time bigint, name varchar, avg double precision);
+    """
+
+    QUERY_STRING_HAWK_AI = """
+    -- For description of get_ai_job_data(), see https://kb.hlrs.de/monitoring/index.php/TimescaleDB_-_Query_Guidelines#Function:_get_job_data_and_get_ai_job_data
+    select * from get_ai_job_data(
+        '{jobid}.hawk-pbs5',
+        'telegraf_ipmi_power_meter', -- power data source
+        '{interval} seconds',
+        array['avg'], -- aggregation: average across samples in bucket
+        array['time','node'] -- sort by time first than node (ascending)
+    )
+    as t(time bigint, name varchar, avg double precision);
+    """
+
     def __init__(self, verbose):
         self.connection = self.init_db(verbose)
 
     @staticmethod
     def init_db(verbose):
         import sqlalchemy as db
-        _verbose = False #True
         engine = db.create_engine('postgresql://hpc@hawk-monitor4:5432/coe_mon', echo=verbose)
         connection = engine.connect()
         return connection
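Both class-level templates defer to the monitoring database's `get_job_data`/`get_ai_job_data` helpers and differ only in the function name and power data source; `{jobid}` and `{interval}` are filled in later with `str.format`. A small sketch (not part of the diff; template abbreviated) of that substitution step:

```python
# Abbreviated copy of QUERY_STRING_HAWK, just to show the placeholder substitution.
QUERY_STRING_HAWK = """
select * from get_job_data(
    '{jobid}.hawk-pbs5',
    'cmc_power_racktraynodepoweravg',
    '{interval} seconds',
    array['avg'],
    array['time','node']
)
as t(time bigint, name varchar, avg double precision);
"""

sql = QUERY_STRING_HAWK.format(jobid=2260215, interval=5.0)
print(sql)   # SQL text with '2260215.hawk-pbs5' and '5.0 seconds' filled in
```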
@@ -153,45 +179,22 @@ class MonitoringDB:
     def close_db(self):
         return self.connection.close()
 
-    @staticmethod
-    def build_query(jobid, interval):
-        """Query taken from https://kb.hlrs.de/monitoring/index.php/TimescaleDB_-_Query_Guidelines#Power_by_Job_Query"""
+    @classmethod
+    def build_query(cls, jobid, interval, hawk_ai):
         import sqlalchemy as db
-        query_string = """with _job as (
-            select job_id, starttime, endtime, nodes from job where job_id='{jobid}.hawk-pbs5'
-        ),
-        node_series as(
-            select n.name, scmcavg.id as series_id from node n
-            inner join (select * from label where key='node') l on n.id = l.value::int
-            inner join series_cmc_power_racktraynodepoweravg scmcavg on l.id = scmcavg.labels[(
-                select pos from label_key_position
-                where metric_category= 'cmc_power'
-                and metric_name = 'RackTrayNodePowerAvg'
-                and key = 'node'
-            )]
-            where n.id = any((select nodes from _job)::int[])
-        )
-        select a.time, a.value, ns.name from (
-            select
-                time_bucket(extract ('epoch' from '{interval} seconds'::interval)::int*1000, cmcavg.ts) as time,
-                cmcavg.series_id::varchar,
-                avg(cmcavg.val) AS value
-            from cmc_power_racktraynodepoweravg cmcavg
-            where
-                ts <= (select endtime from _job)
-                and ts >= (select starttime from _job)
-                and series_id = Any(select series_id from node_series)
-            group by time, cmcavg.series_id order by time desc) a
-        inner join node_series ns on a.series_id::int = ns.series_id;
-        """
+        if hawk_ai:
+            query_string = cls.QUERY_STRING_HAWK_AI
+        else:
+            query_string = cls.QUERY_STRING_HAWK
         return db.text(query_string.format(jobid=jobid, interval=interval))
 
-    def db_to_list(self, jobid, interval):
-        query = self.build_query(jobid, interval)
+    def db_to_list(self, jobid, interval, hawk_ai):
+        query = self.build_query(jobid, interval, hawk_ai)
         return self.connection.execute(query).fetchall()
 
-    def db_to_pf(self, jobid, inerval):
-        query = self.build_query(jobid, interval)
+    def db_to_pf(self, jobid, interval, hawk_ai):
+        import pandas as pd
+        query = self.build_query(jobid, interval, hawk_ai)
         return pd.read_sql(query, con=self.connection)
 
 
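`build_query` is now a thin dispatcher between the two templates, and the `hawk_ai` flag is threaded through `db_to_list`/`db_to_pf` (the latter also gains its missing `pandas` import and a corrected `interval` parameter name). A usage sketch (not part of the diff; assumes the monitoring database is reachable and uses example job IDs):

```python
db = MonitoringDB(verbose=False)
try:
    # Regular Hawk job: rows of (time, node name, average power), sorted by time, then node.
    rows = db.db_to_list(jobid=2260215, interval=5.0, hawk_ai=False)
    # Hawk-AI job: same row shape, but fetched via get_ai_job_data().
    ai_rows = db.db_to_list(jobid=2345678, interval=60.0, hawk_ai=True)
finally:
    db.close_db()
```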
@@ -200,22 +203,43 @@ class App:
         self.config = config
         self.db = MonitoringDB(self.config.verbose)
 
+    @staticmethod
+    def warnings(config):
+        warn = ""
+        if not config.hawk_ai and config.interval < 5:
+            warn += '# Warning: interval<5 is very small and may lead to data gaps.'
+        if config.hawk_ai and config.interval < 60:
+            warn += '# Warning: interval<60 is very small for Hawk-AI nodes and may lead to data gaps.'
+        return warn
+
     def run_all(self):
+        warnings = self.warnings(self.config)
+        if warnings:
+            print(warnings)
+
+        header = f"# {config.datetime}: {config.cmd}\n"
+        if warnings:
+            header += f"{warnings}\n"
+        header += "#\n"
+
         for jobid in self.config.jobid:
             try:
-                power = Power.from_db(self.db, jobid, self.config.interval)
+                power = Power.from_db(self.db, jobid, self.config.interval, self.config.hawk_ai)
             except RuntimeError:
                 print('No data found for job ID "{}"'.format(jobid))
                 continue
 
-            fn = power.to_file(jobid)
+            fn = power.to_file(jobid, header)
             if fn:
                 print('Created file {fn}'.format(fn=fn))
 
 
 if __name__ == "__main__":
     import sys
+    from datetime import datetime
     config = parse_arguments(sys.argv[1:])
+    config.cmd = " ".join(sys.argv)
+    config.datetime = f"{datetime.now()}"
+
     main = App(config)
     main.run_all()
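The new `warnings` helper and the `header` string give each output file a provenance comment (timestamp plus the exact command line) and an interval sanity check that depends on whether the job ran on Hawk or Hawk-AI. A self-contained sketch (not part of the diff) reproducing that check with a stand-in config:

```python
from argparse import Namespace

def warnings(config):
    # Same thresholds as App.warnings in the diff above.
    warn = ""
    if not config.hawk_ai and config.interval < 5:
        warn += '# Warning: interval<5 is very small and may lead to data gaps.'
    if config.hawk_ai and config.interval < 60:
        warn += '# Warning: interval<60 is very small for Hawk-AI nodes and may lead to data gaps.'
    return warn

print(warnings(Namespace(hawk_ai=True, interval=30.0)))    # Hawk-AI warning fires
print(warnings(Namespace(hawk_ai=False, interval=5.0)))    # no warning, prints an empty line
```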