Utility to fetch from psql database and export to csv like format #1
2 changed files with 203 additions and 0 deletions
8
monitoring/db/elklikeoutput/compile.sh
Normal file
8
monitoring/db/elklikeoutput/compile.sh
Normal file
|
@ -0,0 +1,8 @@
|
|||
#!/bin/bash
|
||||
|
||||
module load gcc/13.1.0
|
||||
module list
|
||||
|
||||
PG_DIR=/quobyte/qb1/hpc34576/hpc34576/postgres
|
||||
echo "Before running, use \"export LD_LIBRARY_PATH=/quobyte/qb1/hpc34576/hpc34576/postgres/lib:\$LD_LIBRARY_PATH\""
|
||||
gcc -O3 -I${PG_DIR}/include -L${PG_DIR}/lib poc.c -lpq
|
195
monitoring/db/elklikeoutput/db-query.c
Normal file
195
monitoring/db/elklikeoutput/db-query.c
Normal file
|
@ -0,0 +1,195 @@
|
|||
/* License? */
|
||||
|
||||
#include"libpq-fe.h"
|
||||
#include<stdio.h>
|
||||
#include<stdlib.h>
|
||||
#include<string.h>
|
||||
|
||||
#define MIN(a,b) ((a)<(b)?(a):(b))
|
||||
|
||||
static int GetNumNodes(PGconn *const conn, const char *const jobid)
|
||||
{
|
||||
int n= 0;
|
||||
char cmd[84];
|
||||
sprintf(cmd, "select node_amount from jobs where job_id='%7s.hawk-pbs5'", jobid);
|
||||
PGresult *res= PQexec(conn, cmd);
|
||||
if(PGRES_TUPLES_OK!= PQresultStatus(res)) {
|
||||
goto bye;
|
||||
}
|
||||
const int nfields= PQnfields(res);
|
||||
const int ntups= PQntuples(res);
|
||||
if(1!= nfields|| 1!= ntups) {
|
||||
goto bye;
|
||||
}
|
||||
n= atoi(PQgetvalue(res, 0, 0));
|
||||
|
||||
bye:
|
||||
PQclear(res);
|
||||
return n;
|
||||
}
|
||||
|
||||
static int NumNodes= 0;
|
||||
static char (*NodeNames)[32]= NULL;
|
||||
inline static int GetNodeIx(const char *const name)
|
||||
{
|
||||
int i;
|
||||
for(i= 0; i< NumNodes; ++i) {
|
||||
if(strlen(NodeNames[i])> 0) {
|
||||
if(0== strcmp(NodeNames[i], name)) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
strcpy(NodeNames[i], name);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if(i>= NumNodes) {
|
||||
printf("Strange situation with node-name!\n");
|
||||
}
|
||||
return i;
|
||||
}
|
||||
|
||||
inline static void WriteTimeVal(FILE *const fp, const long long tp, const double *const vals, const int n)
|
||||
{
|
||||
fprintf(fp, " %lld", tp);
|
||||
for(int i= 0; i< n; ++i) {
|
||||
fprintf(fp, " %.1lf", vals[i]);
|
||||
}
|
||||
fprintf(fp, "\n");
|
||||
}
|
||||
|
||||
/* Somebody took time to write this query at <https://kb.hlrs.de/monitoring/index.php/TimescaleDB#Job_data> */
|
||||
/* Used the query as is instead of building the query as the database code is expected to be most optimised */
|
||||
char QString[1170]=
|
||||
"with job as (\n"
|
||||
" select job_id, starttime, endtime, nodes from jobs where job_id='xxxxxxx.hawk-pbs5'\n"
|
||||
"),\n"
|
||||
"node_series as (\n"
|
||||
" select n.name, scmcavg.id as series_id from nodes n\n"
|
||||
" inner join (select * from label where key='node') l on n.id = l.value::int\n"
|
||||
" inner join series_cmc_power_racktraynodepoweravg scmcavg on l.id = scmcavg.labels[(\n"
|
||||
" select pos from label_key_position\n"
|
||||
" where metric_category= 'cmc_power'\n"
|
||||
" and metric_name = 'RackTrayNodePowerAvg'\n"
|
||||
" and key = 'node'\n"
|
||||
" )]\n"
|
||||
" where n.id = any((select nodes from job)::int[])\n"
|
||||
")\n"
|
||||
"select a.time, a.value, ns.name from (\n"
|
||||
" select\n"
|
||||
" time_bucket(extract ('epoch' from '5 seconds'::interval)::int*1000, cmcavg.ts) as time,\n"
|
||||
" cmcavg.series_id::varchar,\n"
|
||||
" avg(cmcavg.val) AS value\n"
|
||||
" from cmc_power_racktraynodepoweravg cmcavg\n"
|
||||
"where\n"
|
||||
" ts <= (select endtime from job)\n"
|
||||
" and ts >= (select starttime from job)\n"
|
||||
" and series_id = Any(select series_id from node_series)\n"
|
||||
"group by time, cmcavg.series_id order by time desc) a\n"
|
||||
"inner join node_series ns on a.series_id::int = ns.series_id;\n";
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
if(argc< 2|| 7!= strlen(argv[1])|| 0== atoi(argv[1])) {
|
||||
printf("Invalid job-id (%s)\n", argc< 2? "(null)": argv[1]);
|
||||
return 0;
|
||||
}
|
||||
|
||||
PGconn *conn= PQsetdbLogin("hawk-monitor4", "5432", "", "", "coe_mon", "hpc", "");
|
||||
if(CONNECTION_OK!= PQstatus(conn)) {
|
||||
printf("Cannot connect to the database!\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
NumNodes= GetNumNodes(conn, argv[1]);
|
||||
if(0== NumNodes) {
|
||||
printf("Cannot retrieve number of nodes for the job-id: %d!\n", argv[1]);
|
||||
goto bye;
|
||||
}
|
||||
|
||||
char *ptr= strstr(QString, "xxxxxxx.hawk-pbs5");
|
||||
strncpy(ptr, argv[1], 7);
|
||||
|
||||
PGresult *res= PQexec(conn, QString);
|
||||
if(PGRES_TUPLES_OK!= PQresultStatus(res)) {
|
||||
printf("Problem here!\n");
|
||||
PQclear(res);
|
||||
}
|
||||
|
||||
NodeNames= (char (*)[32]) malloc(sizeof(char[32])* NumNodes);
|
||||
for(int i= 0; i< NumNodes; ++i) {
|
||||
memset(NodeNames[i], 0, sizeof(char[32]));
|
||||
}
|
||||
|
||||
const int nfields= PQnfields(res);
|
||||
/* printf("%d fields\n", nfields); */
|
||||
int itime= -1, inname= -1, ival= -1;
|
||||
for(int i= 0; i< nfields; ++i) {
|
||||
const char *fieldname= PQfname(res, i);
|
||||
if(0== strcmp("time", fieldname)) {
|
||||
itime= i;
|
||||
} else if(0== strcmp("name", fieldname)) {
|
||||
inname= i;
|
||||
} else if(0== strcmp("value", fieldname)) {
|
||||
ival= i;
|
||||
} else {
|
||||
printf("Do not know how to read field \"%s\"\n", fieldname);
|
||||
}
|
||||
}
|
||||
if(-1== itime|| -1== inname|| -1== ival) {
|
||||
printf("Unknown fields causing program to exit!\n");
|
||||
PQclear(res);
|
||||
goto bye;
|
||||
}
|
||||
/* printf("itime= %d, inname= %d, ival= %d\n", itime, inname, ival); */
|
||||
|
||||
int ntups= PQntuples(res);
|
||||
/* printf("%d tuples\n", ntups); */
|
||||
if(0!= ntups% NumNodes) {
|
||||
printf("#entries (%d) is not integral multiple of #nodes (%d)\n", ntups, NumNodes);
|
||||
}
|
||||
|
||||
int tup0= 0, tup1= ntups, tupstep= 1;
|
||||
long long tp_min= atoll(PQgetvalue(res, tup0, itime));
|
||||
if(tp_min> atoll(PQgetvalue(res, ntups- 1, itime))) {
|
||||
printf("Time in reverse order!\n");
|
||||
tup0= ntups- 1; tup1= -1; tupstep= -1;
|
||||
tp_min= atoll(PQgetvalue(res, tup0, itime));
|
||||
}
|
||||
|
||||
long long tp, last_tp= tp_min;
|
||||
double *vals= malloc(sizeof(double)* NumNodes);
|
||||
memset(vals, 0, sizeof(double)* NumNodes);
|
||||
char nname[32];
|
||||
FILE *fp= fopen("oldstyleoutput.dat", "w");
|
||||
for(int it= tup0; tup1!= it; it+= tupstep) {
|
||||
tp= atoll(PQgetvalue(res, it, itime)); /* - tp_min; */
|
||||
if(tp!= last_tp&& tp< last_tp) { /* some sanity check because I don't understand how the database querry works! */
|
||||
printf("Strange ordering!\n");
|
||||
}
|
||||
if(tp> last_tp) { /* write out the last time-point */
|
||||
WriteTimeVal(fp, last_tp, vals, NumNodes);
|
||||
memset(vals, 0, sizeof(double)* NumNodes);
|
||||
last_tp= tp;
|
||||
}
|
||||
int nodeix= GetNodeIx(PQgetvalue(res, it, inname));
|
||||
vals[nodeix]= atof(PQgetvalue(res, it, ival));
|
||||
}
|
||||
WriteTimeVal(fp, last_tp, vals, NumNodes);
|
||||
fprintf(fp, "#offsetted-time-point (ms)");
|
||||
for(int i= 0; i< NumNodes; ++i) {
|
||||
fprintf(fp, " %s", NodeNames[i]);
|
||||
}
|
||||
fprintf(fp, "\n");
|
||||
fclose(fp); fp= NULL;
|
||||
|
||||
if(NULL!= vals) { free(vals); vals= NULL; }
|
||||
if(NULL!= NodeNames) { free(NodeNames); NodeNames= NULL; }
|
||||
|
||||
PQclear(res);
|
||||
|
||||
bye:
|
||||
PQfinish(conn);
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Reference in a new issue