From b321a25fb077ab4b85953906eefe65b73a4a439a Mon Sep 17 00:00:00 2001
From: Kingshuk Haldar
Date: Wed, 9 Aug 2023 17:39:31 +0200
Subject: [PATCH] First commit for getting data from PG-SQL

---
 monitoring/db/elklikeoutput/compile.sh |   8 +
 monitoring/db/elklikeoutput/db-query.c | 276 +++++++++++++++++++++++++
 2 files changed, 284 insertions(+)
 create mode 100644 monitoring/db/elklikeoutput/compile.sh
 create mode 100644 monitoring/db/elklikeoutput/db-query.c

diff --git a/monitoring/db/elklikeoutput/compile.sh b/monitoring/db/elklikeoutput/compile.sh
new file mode 100644
index 0000000..d819bc5
--- /dev/null
+++ b/monitoring/db/elklikeoutput/compile.sh
@@ -0,0 +1,8 @@
+#!/bin/bash
+
+module load gcc/13.1.0
+module list
+
+PG_DIR=/quobyte/qb1/hpc34576/hpc34576/postgres
+echo "Before running, use \"export LD_LIBRARY_PATH=${PG_DIR}/lib:\$LD_LIBRARY_PATH\""
+gcc -O3 -I"${PG_DIR}/include" -L"${PG_DIR}/lib" db-query.c -lpq
diff --git a/monitoring/db/elklikeoutput/db-query.c b/monitoring/db/elklikeoutput/db-query.c
new file mode 100644
index 0000000..1d4ddaa
--- /dev/null
+++ b/monitoring/db/elklikeoutput/db-query.c
@@ -0,0 +1,276 @@
+/* License? */
+
+#include "libpq-fe.h"
+#include <limits.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#define MIN(a,b) ((a)<(b)?(a):(b))
+
+/* Both queries look a job up by the key "<7-digit id>.hawk-pbs5". */
+#define JOB_ID_SUFFIX ".hawk-pbs5"
+
+enum {
+    JOB_ID_LEN = 7,                                       /* digits in a job id */
+    JOB_KEY_CAP = JOB_ID_LEN + (int)sizeof JOB_ID_SUFFIX, /* key plus terminator */
+    NODE_NAME_LEN = 32                                    /* bytes per node-name slot */
+};
+
+/*
+ * Build the database key "<jobid>.hawk-pbs5" in dst (capacity cap).
+ * Returns 0 on success, -1 if jobid is not exactly JOB_ID_LEN decimal digits
+ * or the key does not fit.
+ */
+static int BuildJobKey(char *const dst, const size_t cap, const char *const jobid)
+{
+    if (NULL == jobid || (size_t)JOB_ID_LEN != strlen(jobid) ||
+        (size_t)JOB_ID_LEN != strspn(jobid, "0123456789")) {
+        return -1;
+    }
+    const int len = snprintf(dst, cap, "%s%s", jobid, JOB_ID_SUFFIX);
+    return (len < 0 || (size_t)len >= cap) ? -1 : 0;
+}
+
+/*
+ * Ask the database how many nodes the job ran on.
+ * Returns the node count, or 0 when the id is malformed, the query fails or
+ * the answer is not a single positive integer.
+ */
+static int GetNumNodes(PGconn *const conn, const char *const jobid)
+{
+    char key[JOB_KEY_CAP];
+    if (0 != BuildJobKey(key, sizeof key, jobid)) {
+        return 0;
+    }
+
+    /* The key is bound as a parameter, so it is never parsed as SQL. */
+    const char *const params[1] = { key };
+    PGresult *res = PQexecParams(conn, "select node_amount from jobs where job_id=$1",
+                                 1, NULL, params, NULL, NULL, 0);
+    int n = 0;
+    if (PGRES_TUPLES_OK == PQresultStatus(res) && 1 == PQnfields(res) && 1 == PQntuples(res)) {
+        const long v = strtol(PQgetvalue(res, 0, 0), NULL, 10);
+        if (v > 0 && v <= INT_MAX) {
+            n = (int)v;
+        }
+    }
+    PQclear(res);
+    return n;
+}
+
+static int NumNodes = 0;                        /* number of slots in NodeNames */
+static char (*NodeNames)[NODE_NAME_LEN] = NULL; /* one slot per node, empty until claimed */
+
+/*
+ * Return the column index of a node, claiming the first free slot the first
+ * time a name is seen (an empty slot marks the end of the known names).
+ * Returns -1 when the name is empty, does not fit in a slot, or every slot
+ * already holds a different name.
+ */
+static int GetNodeIx(const char *const name)
+{
+    const size_t len = strlen(name);
+    if (0 < len && len < (size_t)NODE_NAME_LEN) {
+        for (int i = 0; i < NumNodes; ++i) {
+            if ('\0' == NodeNames[i][0]) {
+                memcpy(NodeNames[i], name, len + 1);
+                return i;
+            }
+            if (0 == strcmp(NodeNames[i], name)) {
+                return i;
+            }
+        }
+    }
+    printf("Strange situation with node-name!\n");
+    return -1;
+}
+
+/* Write one output line: the time stamp followed by n values with one decimal each. */
+static void WriteTimeVal(FILE *const fp, const long long tp, const double *const vals, const int n)
+{
+    fprintf(fp, " %lld", tp);
+    for (int i = 0; i < n; ++i) {
+        fprintf(fp, " %.1lf", vals[i]);
+    }
+    fprintf(fp, "\n");
+}
+
+/*
+ * Somebody took the time to write this query (the link to its source is missing here).
+ * It is used as is, instead of being built up in code, as the database side is expected
+ * to be the most optimised; only the job key is bound as parameter $1.
+ * Rows come back newest first ("order by time desc").
+ */
+static const char QString[] =
+"with job as (\n"
+" select job_id, starttime, endtime, nodes from jobs where job_id=$1\n"
+"),\n"
+"node_series as (\n"
+" select n.name, scmcavg.id as series_id from nodes n\n"
+" inner join (select * from label where key='node') l on n.id = l.value::int\n"
+" inner join series_cmc_power_racktraynodepoweravg scmcavg on l.id = scmcavg.labels[(\n"
+" select pos from label_key_position\n"
+" where metric_category= 'cmc_power'\n"
+" and metric_name = 'RackTrayNodePowerAvg'\n"
+" and key = 'node'\n"
+" )]\n"
+" where n.id = any((select nodes from job)::int[])\n"
+")\n"
+"select a.time, a.value, ns.name from (\n"
+" select\n"
+" time_bucket(extract ('epoch' from '5 seconds'::interval)::int*1000, cmcavg.ts) as time,\n"
+" cmcavg.series_id::varchar,\n"
+" avg(cmcavg.val) AS value\n"
+" from cmc_power_racktraynodepoweravg cmcavg\n"
+"where\n"
+" ts <= (select endtime from job)\n"
+" and ts >= (select starttime from job)\n"
+" and series_id = Any(select series_id from node_series)\n"
+"group by time, cmcavg.series_id order by time desc) a\n"
+"inner join node_series ns on a.series_id::int = ns.series_id;\n";
+
+/*
+ * Find the "time", "name" and "value" columns of the result.
+ * Returns 0 when all three are present, -1 otherwise.
+ */
+static int FindColumns(const PGresult *const res, int *const itime, int *const inname, int *const ival)
+{
+    *itime = *inname = *ival = -1;
+    const int nfields = PQnfields(res);
+    for (int i = 0; i < nfields; ++i) {
+        const char *fieldname = PQfname(res, i);
+        if (0 == strcmp("time", fieldname)) {
+            *itime = i;
+        } else if (0 == strcmp("name", fieldname)) {
+            *inname = i;
+        } else if (0 == strcmp("value", fieldname)) {
+            *ival = i;
+        } else {
+            printf("Do not know how to read field \"%s\"\n", fieldname);
+        }
+    }
+    return (-1 == *itime || -1 == *inname || -1 == *ival) ? -1 : 0;
+}
+
+/*
+ * Write the result to fp in ascending time order: one line per time stamp with
+ * one value per node, then a trailing "#" line naming the node columns.
+ * vals is scratch space for NumNodes values; the result must hold at least one row.
+ */
+static void WriteSeries(FILE *const fp, const PGresult *const res,
+                        const int itime, const int inname, const int ival, double *const vals)
+{
+    int tup0 = 0, tup1 = PQntuples(res), tupstep = 1;
+    long long last_tp = atoll(PQgetvalue(res, tup0, itime));
+    if (last_tp > atoll(PQgetvalue(res, tup1 - 1, itime))) {
+        printf("Time in reverse order!\n");
+        tup0 = tup1 - 1; tup1 = -1; tupstep = -1;
+        last_tp = atoll(PQgetvalue(res, tup0, itime));
+    }
+
+    memset(vals, 0, sizeof *vals * (size_t)NumNodes);
+    for (int it = tup0; tup1 != it; it += tupstep) {
+        const long long tp = atoll(PQgetvalue(res, it, itime));
+        if (tp < last_tp) { /* sanity check: rows should now be in ascending time */
+            printf("Strange ordering!\n");
+        }
+        if (tp > last_tp) { /* a new time stamp starts: write out the finished one */
+            WriteTimeVal(fp, last_tp, vals, NumNodes);
+            memset(vals, 0, sizeof *vals * (size_t)NumNodes);
+            last_tp = tp;
+        }
+        const int nodeix = GetNodeIx(PQgetvalue(res, it, inname));
+        if (nodeix >= 0) { /* rows for nodes that cannot be placed are skipped */
+            vals[nodeix] = atof(PQgetvalue(res, it, ival));
+        }
+    }
+    WriteTimeVal(fp, last_tp, vals, NumNodes);
+
+    /* The times are written as returned; no offset is subtracted. */
+    fprintf(fp, "#offsetted-time-point (ms)");
+    for (int i = 0; i < NumNodes; ++i) {
+        fprintf(fp, " %s", NodeNames[i]);
+    }
+    fprintf(fp, "\n");
+}
+
+/*
+ * Usage: <program> <7-digit job id>
+ * Writes the per-node time series returned by QString to "oldstyleoutput.dat".
+ * Exits with EXIT_SUCCESS when the file was written, EXIT_FAILURE otherwise.
+ */
+int main(int argc, char *argv[])
+{
+    int rc = EXIT_FAILURE;
+    int itime = -1, inname = -1, ival = -1, ntups = 0;
+    char key[JOB_KEY_CAP];
+    const char *const params[1] = { key };
+    PGconn *conn = NULL;
+    PGresult *res = NULL;
+    double *vals = NULL;
+    FILE *fp = NULL;
+
+    if (argc < 2 || 0 != BuildJobKey(key, sizeof key, argv[1]) || 0 == atoi(argv[1])) {
+        printf("Invalid job-id (%s)\n", argc < 2 ? "(null)" : argv[1]);
+        return rc;
+    }
+
+    conn = PQsetdbLogin("hawk-monitor4", "5432", "", "", "coe_mon", "hpc", "");
+    if (CONNECTION_OK != PQstatus(conn)) {
+        printf("Cannot connect to the database!\n");
+        goto cleanup;
+    }
+
+    NumNodes = GetNumNodes(conn, argv[1]);
+    if (0 == NumNodes) {
+        printf("Cannot retrieve number of nodes for the job-id: %s!\n", argv[1]);
+        goto cleanup;
+    }
+
+    res = PQexecParams(conn, QString, 1, NULL, params, NULL, NULL, 0);
+    if (PGRES_TUPLES_OK != PQresultStatus(res)) {
+        printf("Problem here!\n");
+        goto cleanup;
+    }
+    if (0 != FindColumns(res, &itime, &inname, &ival)) {
+        printf("Unknown fields causing program to exit!\n");
+        goto cleanup;
+    }
+
+    ntups = PQntuples(res);
+    if (0 == ntups) {
+        printf("No data returned for the job!\n");
+        goto cleanup;
+    }
+    if (0 != ntups % NumNodes) {
+        printf("#entries (%d) is not integral multiple of #nodes (%d)\n", ntups, NumNodes);
+    }
+
+    NodeNames = calloc((size_t)NumNodes, sizeof *NodeNames);
+    vals = malloc(sizeof *vals * (size_t)NumNodes);
+    if (NULL == NodeNames || NULL == vals) {
+        printf("Out of memory!\n");
+        goto cleanup;
+    }
+
+    fp = fopen("oldstyleoutput.dat", "w");
+    if (NULL == fp) {
+        perror("oldstyleoutput.dat");
+        goto cleanup;
+    }
+    WriteSeries(fp, res, itime, inname, ival, vals);
+    if (0 == fclose(fp)) { /* fclose flushes, so a failure here means lost output */
+        rc = EXIT_SUCCESS;
+    } else {
+        perror("oldstyleoutput.dat");
+    }
+
+cleanup:
+    free(vals);
+    free(NodeNames);
+    NodeNames = NULL;
+    PQclear(res);
+    PQfinish(conn);
+    return rc;
+}