Compare commits

..

7 commits

View file

@ -13,6 +13,8 @@ def parse_arguments(args):
help='Show database querries, etc.') help='Show database querries, etc.')
parser.add_argument('-t', '--interval', action='store', type=float, default=5.0, parser.add_argument('-t', '--interval', action='store', type=float, default=5.0,
help="Interval between power values in seconds") help="Interval between power values in seconds")
parser.add_argument('--hawk-ai', action='store_true',
help="Job did run on Hawk-AI")
parser.add_argument('jobid', type=parse_jobid, parser.add_argument('jobid', type=parse_jobid,
nargs='+', nargs='+',
help='Job ID such as "2260215" or "2260215.hawk-pbs5"') help='Job ID such as "2260215" or "2260215.hawk-pbs5"')
@ -38,18 +40,19 @@ class Power:
@classmethod @classmethod
def from_list(cls, data): def from_list(cls, data):
"""Assumes data is a list of tuples (timestamp, value, node)""" """Assumes data is a list of tuples (timestamp, node, value)"""
nodes = list(set([line[2] for line in data])) idx_ts = 0; idx_node = 1; idx_value = 2
nodes = list(set([line[idx_node] for line in data]))
cls = Power(nodes) cls = Power(nodes)
# for now ignore order to nodes # for now ignore order to nodes
values = {} values = {}
for l in data: for l in data:
ts = l[0] ts = l[idx_ts]
if ts not in values: if ts not in values:
values[ts] = [] values[ts] = []
# node = l[1] # node = l[1]
power = l[1] power = l[idx_value]
values[ts].append(power) values[ts].append(power)
epochs = sorted(values.keys()) epochs = sorted(values.keys())
@ -59,8 +62,8 @@ class Power:
return cls return cls
@classmethod @classmethod
def from_db(cls, db, jobid, interval): def from_db(cls, db, jobid, interval, hawk_ai):
all_list = db.db_to_list(jobid, interval) all_list = db.db_to_list(jobid, interval, hawk_ai)
if not all_list: if not all_list:
raise RuntimeError raise RuntimeError
@ -69,7 +72,7 @@ class Power:
return power return power
def to_file(self, jobid): def to_file(self, jobid, header=""):
"""Dumps power data to file. Returns filename is succesfull and None if unsucessfull.""" """Dumps power data to file. Returns filename is succesfull and None if unsucessfull."""
fname = self.filename(jobid) fname = self.filename(jobid)
if os.path.exists(fname): if os.path.exists(fname):
@ -78,7 +81,7 @@ class Power:
try: try:
with open(fname, "w+") as f: with open(fname, "w+") as f:
f.write(self.header()) f.write(header + self.header())
f.write(self.body()) f.write(self.body())
except IOError: except IOError:
print("Error: could not write to file ", fname) print("Error: could not write to file ", fname)
@ -139,13 +142,36 @@ class Power:
class MonitoringDB: class MonitoringDB:
QUERY_STRING_HAWK = """
-- For description of get_job_data(), see https://kb.hlrs.de/monitoring/index.php/TimescaleDB_-_Query_Guidelines#Function:_get_job_data_and_get_ai_job_data
select * from get_job_data(
'{jobid}.hawk-pbs5',
'cmc_power_racktraynodepoweravg', -- power data source
'{interval} seconds',
array['avg'], -- aggregation: average across samples in bucket
array['time','node'] -- sort by time first than node (ascending)
)
as t(time bigint, name varchar, avg double precision);
"""
QUERY_STRING_HAWK_AI = """
-- For description of get_ai_job_data(), see https://kb.hlrs.de/monitoring/index.php/TimescaleDB_-_Query_Guidelines#Function:_get_job_data_and_get_ai_job_data
select * from get_ai_job_data(
'{jobid}.hawk-pbs5',
'telegraf_ipmi_power_meter', -- power data source
'{interval} seconds',
array['avg'], -- aggregation: average across samples in bucket
array['time','node'] -- sort by time first than node (ascending)
)
as t(time bigint, name varchar, avg double precision);
"""
def __init__(self, verbose): def __init__(self, verbose):
self.connection = self.init_db(verbose) self.connection = self.init_db(verbose)
@staticmethod @staticmethod
def init_db(verbose): def init_db(verbose):
import sqlalchemy as db import sqlalchemy as db
_verbose = False #True
engine = db.create_engine('postgresql://hpc@hawk-monitor4:5432/coe_mon', echo=verbose) engine = db.create_engine('postgresql://hpc@hawk-monitor4:5432/coe_mon', echo=verbose)
connection = engine.connect() connection = engine.connect()
return connection return connection
@ -153,45 +179,22 @@ class MonitoringDB:
def close_db(self): def close_db(self):
return self.connection.close() return self.connection.close()
@staticmethod @classmethod
def build_query(jobid, interval): def build_query(cls, jobid, interval, hawk_ai):
"""Query taken from https://kb.hlrs.de/monitoring/index.php/TimescaleDB_-_Query_Guidelines#Power_by_Job_Query"""
import sqlalchemy as db import sqlalchemy as db
query_string = """with _job as ( if hawk_ai:
select job_id, starttime, endtime, nodes from job where job_id='{jobid}.hawk-pbs5' query_string = cls.QUERY_STRING_HAWK_AI
), else:
node_series as( query_string = cls.QUERY_STRING_HAWK
select n.name, scmcavg.id as series_id from node n
inner join (select * from label where key='node') l on n.id = l.value::int
inner join series_cmc_power_racktraynodepoweravg scmcavg on l.id = scmcavg.labels[(
select pos from label_key_position
where metric_category= 'cmc_power'
and metric_name = 'RackTrayNodePowerAvg'
and key = 'node'
)]
where n.id = any((select nodes from _job)::int[])
)
select a.time, a.value, ns.name from (
select
time_bucket(extract ('epoch' from '{interval} seconds'::interval)::int*1000, cmcavg.ts) as time,
cmcavg.series_id::varchar,
avg(cmcavg.val) AS value
from cmc_power_racktraynodepoweravg cmcavg
where
ts <= (select endtime from _job)
and ts >= (select starttime from _job)
and series_id = Any(select series_id from node_series)
group by time, cmcavg.series_id order by time desc) a
inner join node_series ns on a.series_id::int = ns.series_id;
"""
return db.text(query_string.format(jobid=jobid, interval=interval)) return db.text(query_string.format(jobid=jobid, interval=interval))
def db_to_list(self, jobid, interval): def db_to_list(self, jobid, interval, hawk_ai):
query = self.build_query(jobid, interval) query = self.build_query(jobid, interval, hawk_ai)
return self.connection.execute(query).fetchall() return self.connection.execute(query).fetchall()
def db_to_pf(self, jobid, inerval): def db_to_pf(self, jobid, interval, hawk_ai):
query = self.build_query(jobid, interval) import pandas as pd
query = self.build_query(jobid, interval, hawk_ai)
return pd.read_sql(query, con=self.connection) return pd.read_sql(query, con=self.connection)
@ -200,22 +203,43 @@ class App:
self.config = config self.config = config
self.db = MonitoringDB(self.config.verbose) self.db = MonitoringDB(self.config.verbose)
@staticmethod
def warnings(config):
warn = ""
if not config.hawk_ai and config.interval < 5:
warn += '# Warning: interval<5 is very small and may lead to data gaps.'
if config.hawk_ai and config.interval < 60:
warn += '# Warning: interval<60 is very small for Hawk-AI nodes and may lead to data gaps.'
return warn
def run_all(self): def run_all(self):
warnings = self.warnings(self.config)
if warnings:
print(warnings)
header = f"# {config.datetime}: {config.cmd}\n"
if warnings:
header += f"{warnings}\n"
header += "#\n"
for jobid in self.config.jobid: for jobid in self.config.jobid:
try: try:
power = Power.from_db(self.db, jobid, self.config.interval) power = Power.from_db(self.db, jobid, self.config.interval, self.config.hawk_ai)
except RuntimeError: except RuntimeError:
print('No data found for job ID "{}"'.format(jobid)) print('No data found for job ID "{}"'.format(jobid))
continue continue
fn = power.to_file(jobid) fn = power.to_file(jobid, header)
if fn: if fn:
print('Created file {fn}'.format(fn=fn)) print('Created file {fn}'.format(fn=fn))
if __name__ == "__main__": if __name__ == "__main__":
import sys import sys
from datetime import datetime
config = parse_arguments(sys.argv[1:]) config = parse_arguments(sys.argv[1:])
config.cmd = " ".join(sys.argv)
config.datetime = f"{datetime.now()}"
main = App(config) main = App(config)
main.run_all() main.run_all()