Compare commits

...

17 commits

SHA1 Message Date
5f4765d196 Update of README 2023-11-22 11:18:36 +01:00
c0f6adcf67 Remove debug output and fix whitespace 2023-11-22 11:03:56 +01:00
035803fbe2 Error handling for jobid without data 2023-11-22 10:52:49 +01:00
ee106619ea Outline main into class 2023-11-22 10:30:36 +01:00
36bfb06228 Add factory function for power from data base 2023-11-22 09:01:05 +01:00
f9ebd68654 Refactor database class for multiple querries 2023-11-20 16:50:22 +01:00
9294b04291 Introduce class for database handling 2023-11-20 16:23:36 +01:00
bacc9e63c9 Use config object 2023-11-20 16:19:34 +01:00
8596fb4184 Remove/comment debug output 2023-11-20 16:04:19 +01:00
f90b3fc1da Rearrange class methods 2023-11-20 15:53:52 +01:00
3a8170e7c9 Add missing import statement 2023-11-20 11:39:11 +01:00
294a4e1273 Add argument parser 2023-11-17 16:25:51 +01:00
b54e31f4fd Lazy import of sqlalchemy 2023-11-17 16:24:00 +01:00
3d52b1b2c7 Format README 2023-11-17 16:20:57 +01:00
831a85639b Add prototype to query job power from database 2023-10-26 16:19:16 +02:00
8a60c2dd9f Merge pull request 'Utility to fetch from psql database and export to csv like format' (#1) from hpckhald/hawk-utils-scripts:pgsql-db into pgsql-db
Reviewed-on: #1
2023-09-14 12:18:49 +00:00
b321a25fb0 First commit for getting data from PG-SQL 2023-08-09 17:39:31 +02:00
4 changed files with 434 additions and 0 deletions

@@ -1,3 +1,22 @@
# hawk-utils-scripts/monitoring/db
Use this directory for scripts, etc., that operate on or extract data from the database.
## scripts/get_detailed_power.py
Python script which queries the database for a list of job IDs and produces one file per job ID.
Those files contain time-resolved power consumption data (see the format sketch below) and are meant to be consumed by the utility in [monitoring/logs/scripts/plot_energy_logs.sh](../logs/README.md#scripts/plot_energy_logs.sh).

Requirements:
- Python module sqlalchemy (`python -m pip install sqlalchemy`)

Usage:
```bash
ssh monitoring
./get_detailed_power.py 2335001 7678069.hawk-pbs5
Created file detailed_power_2335001.hawk-pbs5.100000-1000004.csv
Created file detailed_power_7678069.hawk-pbs5.2432345-423423.csv
```
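Each produced CSV opens with the comment header written by `Power.header()`, followed by one summary row per time bucket. A sketch with made-up values (note that the per-node columns announced by the trailing `NO_NODE_NAMES_YET` header field are not emitted yet):

```
# all timestamps have unit milliseconds since unix epoch
# all power values have unit Watt
timestamp,delta_t,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_node_power,NO_NODE_NAMES_YET
1700000000000,-1,312.0,305.4,304.9,287.3,322.6,9.8
1700000005000,-1,311.5,304.8,304.1,286.9,321.0,9.6
```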

@@ -0,0 +1,8 @@
#!/bin/bash
# Build poc.c against the PostgreSQL client library (libpq) installed under PG_DIR.
module load gcc/13.1.0
module list
PG_DIR=/quobyte/qb1/hpc34576/hpc34576/postgres
# libpq is linked dynamically, hence the LD_LIBRARY_PATH hint for run time:
echo "Before running, use \"export LD_LIBRARY_PATH=/quobyte/qb1/hpc34576/hpc34576/postgres/lib:\$LD_LIBRARY_PATH\""
gcc -O3 -I${PG_DIR}/include -L${PG_DIR}/lib poc.c -lpq
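A possible invocation after building (a sketch: it assumes the default `a.out` name produced by the compile line above and a valid seven-digit job ID):

```bash
export LD_LIBRARY_PATH=/quobyte/qb1/hpc34576/hpc34576/postgres/lib:$LD_LIBRARY_PATH
./a.out 2335001   # writes per-node power over time to oldstyleoutput.dat
```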

@@ -0,0 +1,195 @@
/* License? */
#include"libpq-fe.h"
#include<stdio.h>
#include<stdlib.h>
#include<string.h>

#define MIN(a,b) ((a)<(b)?(a):(b))

static int GetNumNodes(PGconn *const conn, const char *const jobid)
{
    int n= 0;
    char cmd[84];
    sprintf(cmd, "select node_amount from jobs where job_id='%7s.hawk-pbs5'", jobid);
    PGresult *res= PQexec(conn, cmd);
    if(PGRES_TUPLES_OK!= PQresultStatus(res)) {
        goto bye;
    }
    const int nfields= PQnfields(res);
    const int ntups= PQntuples(res);
    if(1!= nfields|| 1!= ntups) {
        goto bye;
    }
    n= atoi(PQgetvalue(res, 0, 0));
bye:
    PQclear(res);
    return n;
}
static int NumNodes= 0;
static char (*NodeNames)[32]= NULL;

inline static int GetNodeIx(const char *const name)
{
    int i;
    for(i= 0; i< NumNodes; ++i) {
        if(strlen(NodeNames[i])> 0) {
            if(0== strcmp(NodeNames[i], name)) {
                break;
            }
        } else {
            strcpy(NodeNames[i], name);
            break;
        }
    }
    if(i>= NumNodes) {
        printf("Strange situation with node-name!\n");
    }
    return i;
}
inline static void WriteTimeVal(FILE *const fp, const long long tp, const double *const vals, const int n)
{
    fprintf(fp, " %lld", tp);
    for(int i= 0; i< n; ++i) {
        fprintf(fp, " %.1lf", vals[i]);
    }
    fprintf(fp, "\n");
}
/* Somebody took time to write this query at <https://kb.hlrs.de/monitoring/index.php/TimescaleDB#Job_data> */
/* The query is used as is instead of being assembled piecewise, since the database is expected to be best optimised for this form */
char QString[1170]=
    "with job as (\n"
    " select job_id, starttime, endtime, nodes from jobs where job_id='xxxxxxx.hawk-pbs5'\n"
    "),\n"
    "node_series as (\n"
    " select n.name, scmcavg.id as series_id from nodes n\n"
    " inner join (select * from label where key='node') l on n.id = l.value::int\n"
    " inner join series_cmc_power_racktraynodepoweravg scmcavg on l.id = scmcavg.labels[(\n"
    " select pos from label_key_position\n"
    " where metric_category= 'cmc_power'\n"
    " and metric_name = 'RackTrayNodePowerAvg'\n"
    " and key = 'node'\n"
    " )]\n"
    " where n.id = any((select nodes from job)::int[])\n"
    ")\n"
    "select a.time, a.value, ns.name from (\n"
    " select\n"
    " time_bucket(extract ('epoch' from '5 seconds'::interval)::int*1000, cmcavg.ts) as time,\n"
    " cmcavg.series_id::varchar,\n"
    " avg(cmcavg.val) AS value\n"
    " from cmc_power_racktraynodepoweravg cmcavg\n"
    "where\n"
    " ts <= (select endtime from job)\n"
    " and ts >= (select starttime from job)\n"
    " and series_id = Any(select series_id from node_series)\n"
    "group by time, cmcavg.series_id order by time desc) a\n"
    "inner join node_series ns on a.series_id::int = ns.series_id;\n";
int main(int argc, char *argv[])
{
    if(argc< 2|| 7!= strlen(argv[1])|| 0== atoi(argv[1])) {
        printf("Invalid job-id (%s)\n", argc< 2? "(null)": argv[1]);
        return 0;
    }
    PGconn *conn= PQsetdbLogin("hawk-monitor4", "5432", "", "", "coe_mon", "hpc", "");
    if(CONNECTION_OK!= PQstatus(conn)) {
        printf("Cannot connect to the database!\n");
        return 0;
    }
    NumNodes= GetNumNodes(conn, argv[1]);
    if(0== NumNodes) {
        printf("Cannot retrieve number of nodes for the job-id: %s!\n", argv[1]);
        goto bye;
    }
    char *ptr= strstr(QString, "xxxxxxx.hawk-pbs5");
    strncpy(ptr, argv[1], 7);
    PGresult *res= PQexec(conn, QString);
    if(PGRES_TUPLES_OK!= PQresultStatus(res)) {
        printf("Problem here!\n");
        PQclear(res);
        goto bye; /* res must not be used after a failed query */
    }
    NodeNames= (char (*)[32]) malloc(sizeof(char[32])* NumNodes);
    for(int i= 0; i< NumNodes; ++i) {
        memset(NodeNames[i], 0, sizeof(char[32]));
    }
    const int nfields= PQnfields(res);
    /* printf("%d fields\n", nfields); */
    int itime= -1, inname= -1, ival= -1;
    for(int i= 0; i< nfields; ++i) {
        const char *fieldname= PQfname(res, i);
        if(0== strcmp("time", fieldname)) {
            itime= i;
        } else if(0== strcmp("name", fieldname)) {
            inname= i;
        } else if(0== strcmp("value", fieldname)) {
            ival= i;
        } else {
            printf("Do not know how to read field \"%s\"\n", fieldname);
        }
    }
    if(-1== itime|| -1== inname|| -1== ival) {
        printf("Unknown fields causing program to exit!\n");
        PQclear(res);
        goto bye;
    }
    /* printf("itime= %d, inname= %d, ival= %d\n", itime, inname, ival); */
    int ntups= PQntuples(res);
    /* printf("%d tuples\n", ntups); */
    if(0!= ntups% NumNodes) {
        printf("#entries (%d) is not integral multiple of #nodes (%d)\n", ntups, NumNodes);
    }
    int tup0= 0, tup1= ntups, tupstep= 1;
    long long tp_min= atoll(PQgetvalue(res, tup0, itime));
    if(tp_min> atoll(PQgetvalue(res, ntups- 1, itime))) {
        printf("Time in reverse order!\n");
        tup0= ntups- 1; tup1= -1; tupstep= -1;
        tp_min= atoll(PQgetvalue(res, tup0, itime));
    }
    long long tp, last_tp= tp_min;
    double *vals= malloc(sizeof(double)* NumNodes);
    memset(vals, 0, sizeof(double)* NumNodes);
    FILE *fp= fopen("oldstyleoutput.dat", "w");
    for(int it= tup0; tup1!= it; it+= tupstep) {
        tp= atoll(PQgetvalue(res, it, itime)); /* - tp_min; */
        if(tp< last_tp) { /* some sanity check because I don't understand how the database query works! */
            printf("Strange ordering!\n");
        }
        if(tp> last_tp) { /* write out the last time-point */
            WriteTimeVal(fp, last_tp, vals, NumNodes);
            memset(vals, 0, sizeof(double)* NumNodes);
            last_tp= tp;
        }
        int nodeix= GetNodeIx(PQgetvalue(res, it, inname));
        vals[nodeix]= atof(PQgetvalue(res, it, ival));
    }
    WriteTimeVal(fp, last_tp, vals, NumNodes);
    fprintf(fp, "#offsetted-time-point (ms)");
    for(int i= 0; i< NumNodes; ++i) {
        fprintf(fp, " %s", NodeNames[i]);
    }
    fprintf(fp, "\n");
    fclose(fp); fp= NULL;
    if(NULL!= vals) { free(vals); vals= NULL; }
    if(NULL!= NodeNames) { free(NodeNames); NodeNames= NULL; }
    PQclear(res);
bye:
    PQfinish(conn);
    return 0;
}
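For reference, a sketch of the layout of `oldstyleoutput.dat`: one line per 5-second time bucket (the timestamp in milliseconds, then one averaged power value per node), with the column-name line written last. Node names and values here are made up:

```
 1700000000000 312.0 298.7
 1700000005000 311.5 300.2
#offsetted-time-point (ms) node-a node-b
```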

@@ -0,0 +1,212 @@
import argparse
import numpy as np
from collections import OrderedDict
import os.path


def parse_arguments():
    parser = argparse.ArgumentParser(
        description='Produce detailed power usage data for a list of jobids.')
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Show database queries, etc.')
    parser.add_argument('jobid', type=parse_jobid,
                        nargs='+',
                        help='Job ID such as "2260215" or "2260215.hawk-pbs5"')
    return parser.parse_args()


def parse_jobid(s):
    import re
    hawkpbs = r'\.hawk-pbs5'  # escape the dot so only the literal suffix matches
    jobid = re.sub(hawkpbs, '', s)
    if not jobid.isdigit():
        raise argparse.ArgumentTypeError(f'invalid job ID "{s}"')
    return jobid


class Power:
    def __init__(self, nodes):
        self.nodes = nodes
        self.epochs = OrderedDict()
        self.first_ts = None
        self.last_ts = None

    @classmethod
    def from_list(cls, data):
        """Assumes data is a list of tuples (timestamp, value, node)."""
        nodes = list(set([line[2] for line in data]))
        power = cls(nodes)
        # for now ignore the order of the nodes
        values = {}
        for l in data:
            ts = l[0]
            if ts not in values:
                values[ts] = []
            # node name is l[2]; not used yet
            power_val = l[1]
            values[ts].append(power_val)
        epochs = sorted(values.keys())
        for epoch in epochs:
            power.insert_epoch(epoch, values[epoch])
        return power

    @classmethod
    def from_db(cls, db, jobid, interval):
        all_list = db.db_to_list(jobid, interval)
        if not all_list:
            raise RuntimeError
        return Power.from_list(all_list)

    def to_file(self, jobid):
        """Dumps power data to a file. Returns the filename if successful, None otherwise."""
        fname = self.filename(jobid)
        if os.path.exists(fname):
            print("Error: cowardly refusing to overwrite file ", fname)
            return None
        try:
            with open(fname, "w+") as f:
                f.write(self.header())
                f.write(self.body())
        except IOError:
            print("Error: could not write to file ", fname)
            fname = None
        return fname

    def insert_epoch(self, ts, values):
        self.epochs[ts] = values
        if not self.first_ts:
            self.first_ts = ts
        self.last_ts = ts

    def header(self):
        hd = "# all timestamps have unit milliseconds since unix epoch\n"
        hd += "# all power values have unit Watt\n"
        hd += "timestamp,delta_t,head_node_power,avg_node_power,median_node_power,min_node_power,max_node_power,std_dev_node_power"
        # add node names here instead
        hd += ",NO_NODE_NAMES_YET\n"
        return hd

    def body(self):
        _body = ""
        for epoch in self.epochs.items():
            _body += Power.pretty_print(self.summarize_epoch(epoch))
        return _body

    @staticmethod
    def summarize_time(ts):
        # delta_t is not computed yet, hence the -1 placeholder
        return ts, -1

    @staticmethod
    def summarize_values(val):
        values = np.asarray(val)
        head = values[0]
        vmin, vmax = values.min(), values.max()
        avg, stddev = values.mean(), values.std()
        median = np.median(values)
        return head, avg, median, vmin, vmax, stddev

    @staticmethod
    def summarize_epoch(epoch):
        ts, values = epoch
        return Power.summarize_time(ts) \
            + Power.summarize_values(values)
        # + values

    @staticmethod
    def pretty_print(args):
        return ",".join(str(a) for a in args) + '\n'

    def filename(self, jobid):
        fname = "detailed_power_{jobid}.hawk-pbs5.{first}-{last}.csv".format(
            jobid=jobid, first=self.first_ts, last=self.last_ts
        )
        return fname


class MonitoringDB:
    def __init__(self, verbose):
        self.connection = self.init_db(verbose)

    @staticmethod
    def init_db(verbose):
        import sqlalchemy as db
        engine = db.create_engine('postgresql://hpc@hawk-monitor4:5432/coe_mon', echo=verbose)
        connection = engine.connect()
        return connection

    def close_db(self):
        return self.connection.close()

    @staticmethod
    def build_query(jobid, interval):
        import sqlalchemy as db
        query_string = """with job as (
            select job_id, starttime, endtime, nodes from jobs where job_id='{jobid}.hawk-pbs5'
        ),
        node_series as (
            select n.name, scmcavg.id as series_id from nodes n
            inner join (select * from label where key='node') l on n.id = l.value::int
            inner join series_cmc_power_racktraynodepoweravg scmcavg on l.id = scmcavg.labels[(
                select pos from label_key_position
                where metric_category= 'cmc_power'
                and metric_name = 'RackTrayNodePowerAvg'
                and key = 'node'
            )]
            where n.id = any((select nodes from job)::int[])
        )
        select a.time, a.value, ns.name from (
            select
            time_bucket(extract ('epoch' from '{interval} seconds'::interval)::int*1000, cmcavg.ts) as time,
            cmcavg.series_id::varchar,
            avg(cmcavg.val) AS value
            from cmc_power_racktraynodepoweravg cmcavg
            where
            ts <= (select endtime from job)
            and ts >= (select starttime from job)
            and series_id = Any(select series_id from node_series)
            group by time, cmcavg.series_id order by time desc) a
        inner join node_series ns on a.series_id::int = ns.series_id;
        """
        return db.text(query_string.format(jobid=jobid, interval=interval))

    def db_to_list(self, jobid, interval):
        query = self.build_query(jobid, interval)
        return self.connection.execute(query).fetchall()

    def db_to_df(self, jobid, interval):
        # lazy import, like sqlalchemy above; pandas is only needed for this path
        import pandas as pd
        query = self.build_query(jobid, interval)
        return pd.read_sql(query, con=self.connection)


class App:
    def __init__(self, config):
        self.config = config
        self.db = MonitoringDB(self.config.verbose)

    def run_all(self):
        for jobid in self.config.jobid:
            try:
                power = Power.from_db(self.db, jobid, self.config.interval)
            except RuntimeError:
                print('No data found for job ID "{}"'.format(jobid))
                continue
            fn = power.to_file(jobid)
            if fn:
                print('Created file {fn}'.format(fn=fn))


if __name__ == "__main__":
    config = parse_arguments()
    config.interval = 5
    main = App(config)
    main.run_all()
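For completeness, a minimal programmatic use of the two classes, equivalent to what `App.run_all` does for a single job ID (a sketch: it assumes the script is importable as a module named `get_detailed_power` and that the monitoring database is reachable; the job ID is made up):

```python
from get_detailed_power import MonitoringDB, Power

db = MonitoringDB(verbose=False)
try:
    power = Power.from_db(db, jobid='2335001', interval=5)  # 5-second buckets
    print(power.to_file('2335001'))  # prints the CSV filename, or None on failure
finally:
    db.close_db()
```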