Compare commits

17 commits: 73fc7d4ba7 ... 5f4765d196
| Author | SHA1 | Date |
|---|---|---|
| | 5f4765d196 | |
| | c0f6adcf67 | |
| | 035803fbe2 | |
| | ee106619ea | |
| | 36bfb06228 | |
| | f9ebd68654 | |
| | 9294b04291 | |
| | bacc9e63c9 | |
| | 8596fb4184 | |
| | f90b3fc1da | |
| | 3a8170e7c9 | |
| | 294a4e1273 | |
| | b54e31f4fd | |
| | 3d52b1b2c7 | |
| | 831a85639b | |
| | 8a60c2dd9f | |
| | b321a25fb0 | |
4 changed files with 434 additions and 0 deletions

monitoring/db/README.md

@@ -1,3 +1,22 @@

# hawk-utils-scripts/monitoring/db

Use this directory for scripts, etc., that operate on or extract data from the database.

## scripts/get_detailed_power.py

Python script which queries the database for a list of jobids and produces a file for each jobid.

Those files contain time-resolved power consumption data and are meant to be consumed by the utility in [monitoring/logs/scripts/plot_energy_logs.sh](../logs/README.md#scripts/plot_energy_logs.sh).

Requirements:

- Python module sqlalchemy: `python -m pip install sqlalchemy`

Usage:

```bash
ssh monitoring
./get_detailed_power.py 2335001 7678069.hawk-pbs5
Created file detailed_power.2335001.hawk-pbs5.100000-1000004.csv
Created file detailed_power.7678069.hawk-pbs5.2432345-423423.csv
```
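For orientation, each produced CSV begins with the header written by `Power.header()` in the script further down this diff; inspecting a result file would look roughly like this (file name taken from the usage above, `head` invocation illustrative):

```bash
head -3 detailed_power.2335001.hawk-pbs5.100000-1000004.csv
# all timestamps have unit milliseconds since unix epoch
# all power values have unit Watt
timestamp,delta_t,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_node_power,NO_NODE_NAMES_YET
```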
monitoring/db/elklikeoutput/compile.sh (new file, 8 lines)

@@ -0,0 +1,8 @@

```bash
#!/bin/bash

module load gcc/13.1.0
module list

PG_DIR=/quobyte/qb1/hpc34576/hpc34576/postgres
echo "Before running, use \"export LD_LIBRARY_PATH=${PG_DIR}/lib:\$LD_LIBRARY_PATH\""
gcc -O3 -I${PG_DIR}/include -L${PG_DIR}/lib db-query.c -lpq
```
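A plausible build-and-run sequence for this directory; `a.out` is gcc's default output name (compile.sh passes no `-o`), and the 7-digit numeric job ID requirement comes from `main()` in db-query.c below:

```bash
# sketch only: a.out is gcc's default output name
export LD_LIBRARY_PATH=/quobyte/qb1/hpc34576/hpc34576/postgres/lib:$LD_LIBRARY_PATH
./compile.sh
./a.out 2335001   # writes oldstyleoutput.dat to the current directory
```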
monitoring/db/elklikeoutput/db-query.c (new file, 195 lines)

@@ -0,0 +1,195 @@

```c
/* License? */

#include "libpq-fe.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define MIN(a,b) ((a)<(b)?(a):(b))

/* Look up the number of nodes allocated to a job; returns 0 on any failure. */
static int GetNumNodes(PGconn *const conn, const char *const jobid)
{
    int n = 0;
    char cmd[84];
    /* %.7s caps the copied length; main() has already validated a 7-digit job ID */
    sprintf(cmd, "select node_amount from jobs where job_id='%.7s.hawk-pbs5'", jobid);
    PGresult *res = PQexec(conn, cmd);
    if (PGRES_TUPLES_OK != PQresultStatus(res)) {
        goto bye;
    }
    const int nfields = PQnfields(res);
    const int ntups = PQntuples(res);
    if (1 != nfields || 1 != ntups) {
        goto bye;
    }
    n = atoi(PQgetvalue(res, 0, 0));

bye:
    PQclear(res);
    return n;
}

static int NumNodes = 0;
static char (*NodeNames)[32] = NULL;

/* Return the column index of a node name, registering it in the first free slot. */
inline static int GetNodeIx(const char *const name)
{
    int i;
    for (i = 0; i < NumNodes; ++i) {
        if (strlen(NodeNames[i]) > 0) {
            if (0 == strcmp(NodeNames[i], name)) {
                break;
            }
        } else {
            strcpy(NodeNames[i], name);
            break;
        }
    }
    if (i >= NumNodes) {
        printf("Strange situation with node-name!\n");
    }
    return i;
}

/* Write one output line: timestamp followed by one power value per node. */
inline static void WriteTimeVal(FILE *const fp, const long long tp, const double *const vals, const int n)
{
    fprintf(fp, " %lld", tp);
    for (int i = 0; i < n; ++i) {
        fprintf(fp, " %.1lf", vals[i]);
    }
    fprintf(fp, "\n");
}

/* Somebody took time to write this query at <https://kb.hlrs.de/monitoring/index.php/TimescaleDB#Job_data> */
/* Used the query as is instead of building it up, as the database code is expected to be most optimised */
char QString[1170] =
    "with job as (\n"
    " select job_id, starttime, endtime, nodes from jobs where job_id='xxxxxxx.hawk-pbs5'\n"
    "),\n"
    "node_series as (\n"
    " select n.name, scmcavg.id as series_id from nodes n\n"
    " inner join (select * from label where key='node') l on n.id = l.value::int\n"
    " inner join series_cmc_power_racktraynodepoweravg scmcavg on l.id = scmcavg.labels[(\n"
    " select pos from label_key_position\n"
    " where metric_category= 'cmc_power'\n"
    " and metric_name = 'RackTrayNodePowerAvg'\n"
    " and key = 'node'\n"
    " )]\n"
    " where n.id = any((select nodes from job)::int[])\n"
    ")\n"
    "select a.time, a.value, ns.name from (\n"
    " select\n"
    " time_bucket(extract ('epoch' from '5 seconds'::interval)::int*1000, cmcavg.ts) as time,\n"
    " cmcavg.series_id::varchar,\n"
    " avg(cmcavg.val) AS value\n"
    " from cmc_power_racktraynodepoweravg cmcavg\n"
    "where\n"
    " ts <= (select endtime from job)\n"
    " and ts >= (select starttime from job)\n"
    " and series_id = Any(select series_id from node_series)\n"
    "group by time, cmcavg.series_id order by time desc) a\n"
    "inner join node_series ns on a.series_id::int = ns.series_id;\n";

int main(int argc, char *argv[])
{
    if (argc < 2 || 7 != strlen(argv[1]) || 0 == atoi(argv[1])) {
        printf("Invalid job-id (%s)\n", argc < 2 ? "(null)" : argv[1]);
        return 0;
    }

    PGconn *conn = PQsetdbLogin("hawk-monitor4", "5432", "", "", "coe_mon", "hpc", "");
    if (CONNECTION_OK != PQstatus(conn)) {
        printf("Cannot connect to the database!\n");
        return 0;
    }

    NumNodes = GetNumNodes(conn, argv[1]);
    if (0 == NumNodes) {
        printf("Cannot retrieve number of nodes for the job-id: %s!\n", argv[1]);
        goto bye;
    }

    /* Patch the 7-digit job ID over the placeholder in the canned query. */
    char *ptr = strstr(QString, "xxxxxxx.hawk-pbs5");
    strncpy(ptr, argv[1], 7);

    PGresult *res = PQexec(conn, QString);
    if (PGRES_TUPLES_OK != PQresultStatus(res)) {
        printf("Problem here!\n");
        PQclear(res);
        goto bye; /* do not touch res after a failed query */
    }

    NodeNames = (char (*)[32]) malloc(sizeof(char[32]) * NumNodes);
    for (int i = 0; i < NumNodes; ++i) {
        memset(NodeNames[i], 0, sizeof(char[32]));
    }

    const int nfields = PQnfields(res);
    /* printf("%d fields\n", nfields); */
    int itime = -1, inname = -1, ival = -1;
    for (int i = 0; i < nfields; ++i) {
        const char *fieldname = PQfname(res, i);
        if (0 == strcmp("time", fieldname)) {
            itime = i;
        } else if (0 == strcmp("name", fieldname)) {
            inname = i;
        } else if (0 == strcmp("value", fieldname)) {
            ival = i;
        } else {
            printf("Do not know how to read field \"%s\"\n", fieldname);
        }
    }
    if (-1 == itime || -1 == inname || -1 == ival) {
        printf("Unknown fields causing program to exit!\n");
        PQclear(res);
        goto bye;
    }
    /* printf("itime= %d, inname= %d, ival= %d\n", itime, inname, ival); */

    int ntups = PQntuples(res);
    /* printf("%d tuples\n", ntups); */
    if (0 != ntups % NumNodes) {
        printf("#entries (%d) is not an integral multiple of #nodes (%d)\n", ntups, NumNodes);
    }

    /* Walk the tuples oldest-first, regardless of the order the query returned them in. */
    int tup0 = 0, tup1 = ntups, tupstep = 1;
    long long tp_min = atoll(PQgetvalue(res, tup0, itime));
    if (tp_min > atoll(PQgetvalue(res, ntups - 1, itime))) {
        printf("Time in reverse order!\n");
        tup0 = ntups - 1; tup1 = -1; tupstep = -1;
        tp_min = atoll(PQgetvalue(res, tup0, itime));
    }

    long long tp, last_tp = tp_min;
    double *vals = malloc(sizeof(double) * NumNodes);
    memset(vals, 0, sizeof(double) * NumNodes);
    FILE *fp = fopen("oldstyleoutput.dat", "w");
    for (int it = tup0; tup1 != it; it += tupstep) {
        tp = atoll(PQgetvalue(res, it, itime)); /* - tp_min; */
        if (tp != last_tp && tp < last_tp) { /* some sanity check because I don't understand how the database query works! */
            printf("Strange ordering!\n");
        }
        if (tp > last_tp) { /* write out the last time-point */
            WriteTimeVal(fp, last_tp, vals, NumNodes);
            memset(vals, 0, sizeof(double) * NumNodes);
            last_tp = tp;
        }
        int nodeix = GetNodeIx(PQgetvalue(res, it, inname));
        vals[nodeix] = atof(PQgetvalue(res, it, ival));
    }
    WriteTimeVal(fp, last_tp, vals, NumNodes);
    fprintf(fp, "#offsetted-time-point (ms)");
    for (int i = 0; i < NumNodes; ++i) {
        fprintf(fp, " %s", NodeNames[i]);
    }
    fprintf(fp, "\n");
    fclose(fp); fp = NULL;

    if (NULL != vals) { free(vals); vals = NULL; }
    if (NULL != NodeNames) { free(NodeNames); NodeNames = NULL; }

    PQclear(res);

bye:
    PQfinish(conn);

    return 0;
}
```
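For reference, `main()` above writes `oldstyleoutput.dat` as one line per time bucket (the bucket timestamp followed by one average wattage per node), with a closing comment line naming the columns. A sketch of the tail of such a file; all values and node names below are made up for illustration:

```bash
tail -3 oldstyleoutput.dat
 1689950000000 210.0 195.5
 1689950005000 208.2 197.1
#offsetted-time-point (ms) node1 node2
```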
monitoring/db/scripts/get_detailed_power.py (new file, 212 lines)

@@ -0,0 +1,212 @@

```python
import argparse
import numpy as np
from collections import OrderedDict
import os.path


def parse_arguments():
    parser = argparse.ArgumentParser(
        description='Produce detailed power usage data for a list of jobids.')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Show database queries, etc.')
    parser.add_argument('jobid', type=parse_jobid,
                        nargs='+',
                        help='Job ID such as "2260215" or "2260215.hawk-pbs5"')

    return parser.parse_args()


def parse_jobid(s):
    import re
    hawkpbs = r'\.hawk-pbs5'  # escaped dot: strip only the literal suffix
    jobid = re.sub(hawkpbs, '', s)
    if not jobid.isdigit():
        raise argparse.ArgumentTypeError(f'invalid job ID "{s}"')
    return jobid


class Power:
    def __init__(self, nodes):
        self.nodes = nodes
        self.epochs = OrderedDict()
        self.first_ts = None
        self.last_ts = None

    @classmethod
    def from_list(cls, data):
        """Assumes data is a list of tuples (timestamp, value, node)"""
        nodes = list(set(line[2] for line in data))
        power = cls(nodes)

        # for now ignore the order of nodes
        values = {}
        for l in data:
            ts = l[0]
            if ts not in values:
                values[ts] = []
            # node = l[2]
            value = l[1]
            values[ts].append(value)

        epochs = sorted(values.keys())
        for epoch in epochs:
            power.insert_epoch(epoch, values[epoch])

        return power

    @classmethod
    def from_db(cls, db, jobid, interval):
        all_list = db.db_to_list(jobid, interval)
        if not all_list:
            raise RuntimeError

        return Power.from_list(all_list)

    def to_file(self, jobid):
        """Dumps power data to file. Returns the filename if successful and None if unsuccessful."""
        fname = self.filename(jobid)
        if os.path.exists(fname):
            print("Error: cowardly refusing to overwrite file ", fname)
            return None

        try:
            with open(fname, "w+") as f:
                f.write(self.header())
                f.write(self.body())
        except IOError:
            print("Error: could not write to file ", fname)
            fname = None

        return fname

    def insert_epoch(self, ts, values):
        self.epochs[ts] = values
        if not self.first_ts:
            self.first_ts = ts
        self.last_ts = ts

    def header(self):
        hd = "# all timestamps have unit milliseconds since unix epoch\n"
        hd += "# all power values have unit Watt\n"
        hd += "timestamp,delta_t,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_node_power"
        # add node names here instead
        hd += ",NO_NODE_NAMES_YET\n"
        return hd

    def body(self):
        _body = ""
        for epoch in self.epochs.items():
            _body += Power.pretty_print(self.summarize_epoch(epoch))
        return _body

    @staticmethod
    def summarize_time(ts):
        # delta_t is not computed yet; -1 is a placeholder
        return ts, -1

    @staticmethod
    def summarize_values(val):
        values = np.asarray(val)
        head = values[0]
        vmin, vmax = values.min(), values.max()
        avg, stddev = values.mean(), values.std()
        median = np.median(values)
        return head, avg, median, vmin, vmax, stddev

    @staticmethod
    def summarize_epoch(epoch):
        ts, values = epoch
        return Power.summarize_time(ts) \
            + Power.summarize_values(values)
        # + values

    @staticmethod
    def pretty_print(args):
        return ",".join(str(a) for a in args) + '\n'

    def filename(self, jobid):
        # matches the naming documented in the README:
        # detailed_power.<jobid>.hawk-pbs5.<first>-<last>.csv
        fname = "detailed_power.{jobid}.hawk-pbs5.{first}-{last}.csv".format(
            jobid=jobid, first=self.first_ts, last=self.last_ts
        )
        return fname


class MonitoringDB:
    def __init__(self, verbose):
        self.connection = self.init_db(verbose)

    @staticmethod
    def init_db(verbose):
        import sqlalchemy as db
        engine = db.create_engine('postgresql://hpc@hawk-monitor4:5432/coe_mon',
                                  echo=verbose)
        connection = engine.connect()
        return connection

    def close_db(self):
        return self.connection.close()

    @staticmethod
    def build_query(jobid, interval):
        import sqlalchemy as db
        query_string = """with job as (
            select job_id, starttime, endtime, nodes from jobs where job_id='{jobid}.hawk-pbs5'
        ),
        node_series as (
            select n.name, scmcavg.id as series_id from nodes n
            inner join (select * from label where key='node') l on n.id = l.value::int
            inner join series_cmc_power_racktraynodepoweravg scmcavg on l.id = scmcavg.labels[(
                select pos from label_key_position
                where metric_category= 'cmc_power'
                and metric_name = 'RackTrayNodePowerAvg'
                and key = 'node'
            )]
            where n.id = any((select nodes from job)::int[])
        )
        select a.time, a.value, ns.name from (
            select
                time_bucket(extract ('epoch' from '{interval} seconds'::interval)::int*1000, cmcavg.ts) as time,
                cmcavg.series_id::varchar,
                avg(cmcavg.val) AS value
            from cmc_power_racktraynodepoweravg cmcavg
            where
                ts <= (select endtime from job)
                and ts >= (select starttime from job)
                and series_id = Any(select series_id from node_series)
            group by time, cmcavg.series_id order by time desc) a
        inner join node_series ns on a.series_id::int = ns.series_id;
        """
        return db.text(query_string.format(jobid=jobid, interval=interval))

    def db_to_list(self, jobid, interval):
        query = self.build_query(jobid, interval)
        return self.connection.execute(query).fetchall()

    def db_to_pf(self, jobid, interval):
        import pandas as pd
        query = self.build_query(jobid, interval)
        return pd.read_sql(query, con=self.connection)


class App:
    def __init__(self, config):
        self.config = config
        self.db = MonitoringDB(self.config.verbose)

    def run_all(self):
        for jobid in self.config.jobid:
            try:
                power = Power.from_db(self.db, jobid, self.config.interval)
            except RuntimeError:
                print('No data found for job ID "{}"'.format(jobid))
                continue

            fn = power.to_file(jobid)
            if fn:
                print('Created file {fn}'.format(fn=fn))


if __name__ == "__main__":
    config = parse_arguments()
    config.interval = 5

    main = App(config)
    main.run_all()
```
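Taken together with the README, the intended flow is to generate a per-job CSV with this script and then hand it to the plotting utility. A hedged sketch of that hand-off; `plot_energy_logs.sh`'s exact command line is not part of this diff, so the final call is hypothetical:

```bash
ssh monitoring
./get_detailed_power.py 2335001
# hypothetical invocation: the plot script's CLI is not shown in this diff
monitoring/logs/scripts/plot_energy_logs.sh detailed_power.2335001.hawk-pbs5.100000-1000004.csv
```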