omkafka: Add ability to dump librdkafka statistics to a file (#3400)

omkafka: Add ability to dump librdkafka statistics to a file

Use statsFile to specify statistics output file; also requires setting statistics.interval.ms confparam to a non-zero value.
This commit is contained in:
pcullen65 2019-01-15 11:08:37 -05:00 committed by Rainer Gerhards
parent 00ce629e7a
commit 10b8235650

View File

@ -174,6 +174,9 @@ typedef struct _instanceData {
int fdErrFile; /* error file fd or -1 if not open */
pthread_mutex_t mutErrFile;
uchar *statsFile;
int fdStatsFile; /* stats file fd or -1 if not open */
pthread_mutex_t mutStatsFile;
int bIsOpen;
int bIsSuspended; /* when broker fail, we need to suspend the action */
pthread_rwlock_t rkLock;
@ -201,6 +204,7 @@ static struct cnfparamdescr actpdescr[] = {
{ "confparam", eCmdHdlrArray, 0 },
{ "topicconfparam", eCmdHdlrArray, 0 },
{ "errorfile", eCmdHdlrGetWord, 0 },
{ "statsfile", eCmdHdlrGetWord, 0 },
{ "key", eCmdHdlrGetWord, 0 },
{ "template", eCmdHdlrGetWord, 0 },
{ "closetimeout", eCmdHdlrPositiveInt, 0 },
@ -586,6 +590,47 @@ finalize_it:
RETiRet;
}
/* write librdkafka stats object to a file
* Note: we open the file but never close it before exit. If it
* needs to be closed, HUP must be sent.
* Assumes pData->statsFile != NULL.
*/
static rsRetVal
writeStats(instanceData *const pData,
char *statsData,
const size_t lenData)
{
int bLocked = 0;
DEFiRet;
/* Protect the file write from operations due to other wrks & HUP */
pthread_mutex_lock(&pData->mutStatsFile);
bLocked = 1;
if(pData->fdStatsFile == -1) {
pData->fdStatsFile = open((char*)pData->statsFile,
O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC,
S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
if(pData->fdStatsFile == -1) {
LogError(errno, RS_RET_ERR, "omkafka: error opening stats file %s",
pData->statsFile);
ABORT_FINALIZE(RS_RET_ERR);
}
}
ssize_t nwritten = write(pData->fdStatsFile, statsData, lenData);
nwritten += write(pData->fdStatsFile, "\n", 1);
if(nwritten != (ssize_t) lenData + 1) {
LogError(errno, RS_RET_ERR,
"omkafka: error writing stats file, write returns %lld, expected %lld\n",
(long long) nwritten, (long long)(lenData + 1));
}
finalize_it:
if(bLocked)
pthread_mutex_unlock(&pData->mutStatsFile);
RETiRet;
}
/* identify and count specific types of kafka failures.
*/
static rsRetVal
@ -882,6 +927,7 @@ static int
statsCallback(rd_kafka_t __attribute__((unused)) *rk,
char *json, size_t __attribute__((unused)) json_len,
void __attribute__((unused)) *opaque) {
instanceData *const pData = (instanceData *) opaque;
char buf[2048];
char handler_name[1024] = "unknown";
int replyq = 0;
@ -937,6 +983,10 @@ statsCallback(rd_kafka_t __attribute__((unused)) *rk,
rtt_avg_usec, throttle_avg_msec, int_latency_avg_usec);
LogMsg(0, NO_ERRCODE, LOG_INFO, "%s\n", buf);
/* Write the entire json stats object, if requested */
if (pData->statsFile != NULL)
writeStats(pData, json, json_len);
return 0;
}
@ -1403,6 +1453,12 @@ CODESTARTdoHUP
pData->fdErrFile = -1;
}
pthread_mutex_unlock(&pData->mutErrFile);
pthread_mutex_lock(&pData->mutStatsFile);
if(pData->fdStatsFile != -1) {
close(pData->fdStatsFile);
pData->fdStatsFile = -1;
}
pthread_mutex_unlock(&pData->mutStatsFile);
if (pData->bReopenOnHup) {
CHKiRet(setupKafkaHandle(pData, 1));
}
@ -1415,6 +1471,7 @@ CODESTARTcreateInstance
pData->bIsOpen = 0;
pData->bIsSuspended = 0;
pData->fdErrFile = -1;
pData->fdStatsFile = -1;
pData->pTopic = NULL;
pData->bReportErrs = 1;
pData->bReopenOnHup = 1;
@ -1424,6 +1481,7 @@ CODESTARTcreateInstance
SLIST_INIT(&pData->failedmsg_head);
CHKiRet(pthread_mutex_init(&pData->mut_doAction, NULL));
CHKiRet(pthread_mutex_init(&pData->mutErrFile, NULL));
CHKiRet(pthread_mutex_init(&pData->mutStatsFile, NULL));
CHKiRet(pthread_rwlock_init(&pData->rkLock, NULL));
CHKiRet(pthread_mutex_init(&pData->mutDynCache, NULL));
INIT_ATOMIC_HELPER_MUT(pData->mutCurrPartition);
@ -1448,6 +1506,8 @@ CODESTARTfreeInstance
failedmsg_entry* fmsgEntry2;
if(pData->fdErrFile != -1)
close(pData->fdErrFile);
if(pData->fdStatsFile != -1)
close(pData->fdStatsFile);
/* Closing Kafka first! */
pthread_rwlock_wrlock(&pData->rkLock);
closeKafka(pData);
@ -1471,6 +1531,7 @@ CODESTARTfreeInstance
SLIST_INIT(&pData->failedmsg_head);
/* Free other mem */
free(pData->errorFile);
free(pData->statsFile);
free(pData->failedMsgFile);
free(pData->topic);
free(pData->brokers);
@ -1489,6 +1550,7 @@ CODESTARTfreeInstance
pthread_rwlock_destroy(&pData->rkLock);
pthread_mutex_destroy(&pData->mut_doAction);
pthread_mutex_destroy(&pData->mutErrFile);
pthread_mutex_destroy(&pData->mutStatsFile);
pthread_mutex_destroy(&pData->mutDynCache);
ENDfreeInstance
@ -1614,6 +1676,7 @@ setInstParamDefaults(instanceData *pData)
pData->nTopicConfParams = 0;
pData->topicConfParams = NULL;
pData->errorFile = NULL;
pData->statsFile = NULL;
pData->failedMsgFile = NULL;
pData->key = NULL;
pData->closeTimeout = 2000;
@ -1701,6 +1764,8 @@ CODESTARTnewActInst
}
} else if(!strcmp(actpblk.descr[i].name, "errorfile")) {
pData->errorFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "statsfile")) {
pData->statsFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "key")) {
pData->key = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "template")) {