omkafka: Added support to set kafka timestamp

We are using rd_kafka_producev now.
Only works with librdkafka higher than 0.9.4. When using older vresion of
librdkafka, old api is being used (rd_kafka_produce).

closes https://github.com/rsyslog/rsyslog/issues/1559
This commit is contained in:
Andre Lorbach 2017-05-16 16:08:12 +02:00
parent a65096dee2
commit 43c782dc1c

View File

@ -39,6 +39,7 @@
#include "atomic.h"
#include "statsobj.h"
#include "unicode-helper.h"
#include "datetime.h"
MODULE_TYPE_OUTPUT
MODULE_TYPE_NOKEEP
@ -47,6 +48,7 @@ MODULE_CNFNAME("omkafka")
/* internal structures
*/
DEF_OMOD_STATIC_DATA
DEFobjCurrIf(datetime)
DEFobjCurrIf(errmsg)
DEFobjCurrIf(statsobj)
@ -86,6 +88,9 @@ getClockTopicAccess(void)
#endif
}
/* Needed for Kafka timestamp librdkafka > 0.9.4 */
#define KAFKA_TimeStamp "\"%timestamp:::date-unixtimestamp%\""
static int closeTimeout = 1000;
static pthread_mutex_t closeTimeoutMut = PTHREAD_MUTEX_INITIALIZER;
@ -798,15 +803,19 @@ ENDtryResume
/* must be called with read(rkLock) */
static rsRetVal
writeKafka(instanceData *pData, uchar *msg, uchar *topic)
writeKafka(instanceData *pData, uchar *msg, uchar *msgTimestamp, uchar *topic)
{
DEFiRet;
const int partition = getPartition(pData);
rd_kafka_topic_t *rkt = NULL;
pthread_rwlock_t *dynTopicLock = NULL;
int msg_enqueue_status = 0;
#if RD_KAFKA_VERSION >= 0x00090400
rd_kafka_resp_err_t msg_kafka_response;
int64_t ttMsgTimestamp;
#endif
DBGPRINTF("omkafka: trying to send: key:'%s', msg:'%s'\n", pData->key, msg);
DBGPRINTF("omkafka: trying to send: key:'%s', msg:'%s', timestamp:'%s'\n", pData->key, msg, msgTimestamp);
if(pData->dynaTopic) {
DBGPRINTF("omkafka: topic to insert to: %s\n", topic);
@ -815,17 +824,55 @@ writeKafka(instanceData *pData, uchar *msg, uchar *topic)
rkt = pData->pTopic;
}
#if RD_KAFKA_VERSION >= 0x00090400
ttMsgTimestamp = atoi((char*)msgTimestamp); /* Convert timestamp into int */
ttMsgTimestamp *= 1000; /* Timestamp in Milliseconds for kafka */
DBGPRINTF("omkafka: rd_kafka_producev timestamp=%s/%" PRId64 "\n", msgTimestamp, ttMsgTimestamp);
/* Using new kafka producev API, includes Timestamp! */
if (pData->key == NULL) {
msg_kafka_response = rd_kafka_producev(pData->rk,
RD_KAFKA_V_RKT(rkt),
RD_KAFKA_V_PARTITION(partition),
RD_KAFKA_V_VALUE(msg, strlen((char*)msg)),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_TIMESTAMP(ttMsgTimestamp),
RD_KAFKA_V_END);
} else {
msg_kafka_response = rd_kafka_producev(pData->rk,
RD_KAFKA_V_RKT(rkt),
RD_KAFKA_V_PARTITION(partition),
RD_KAFKA_V_VALUE(msg, strlen((char*)msg)),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_TIMESTAMP(ttMsgTimestamp),
RD_KAFKA_V_KEY(pData->key,strlen((char*)pData->key)),
RD_KAFKA_V_END);
}
if (msg_kafka_response != RD_KAFKA_RESP_ERR_NO_ERROR ) {
errmsg.LogError(0, RS_RET_KAFKA_PRODUCE_ERR,
"omkafka: Failed to produce to topic '%s' (rd_kafka_producev)"
"partition %d: %s\n",
rd_kafka_topic_name(rkt), partition,
rd_kafka_err2str(msg_kafka_response));
}
#else
DBGPRINTF("omkafka: rd_kafka_produce\n");
/* Using old kafka produce API */
msg_enqueue_status = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY,
msg, strlen((char*)msg), pData->key,
pData->key == NULL ? 0 : strlen((char*)pData->key),
NULL);
if(msg_enqueue_status == -1) {
errmsg.LogError(0, RS_RET_KAFKA_PRODUCE_ERR,
"omkafka: Failed to produce to topic '%s' "
"omkafka: Failed to produce to topic '%s' (rd_kafka_produce) "
"partition %d: %s\n",
rd_kafka_topic_name(rkt), partition,
rd_kafka_err2str(rd_kafka_errno2err(errno)));
}
#endif
const int callbacksCalled = rd_kafka_poll(pData->rk, 0); /* call callbacks */
if (pData->dynaTopic) {
pthread_rwlock_unlock(dynTopicLock);/* dynamic topic can't be used beyond this pt */
@ -862,9 +909,9 @@ CODESTARTdoAction
/* support dynamic topic */
if(pData->dynaTopic)
iRet = writeKafka(pData, ppString[0], ppString[1]);
iRet = writeKafka(pData, ppString[0], ppString[1], ppString[2]);
else
iRet = writeKafka(pData, ppString[0], pData->topic);
iRet = writeKafka(pData, ppString[0], ppString[1], pData->topic);
pthread_rwlock_unlock(&pData->rkLock);
finalize_it:
@ -995,14 +1042,18 @@ CODESTARTnewActInst
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
}
iNumTpls = 1;
iNumTpls = 2;
if(pData->dynaTopic) ++iNumTpls;
CODE_STD_STRING_REQUESTnewActInst(iNumTpls);
CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ?
"RSYSLOG_FileFormat" : (char*)pData->tplName),
OMSR_NO_RQD_TPL_OPTS));
CHKiRet(OMSRsetEntry(*ppOMSR, 1, (uchar*)strdup(" KAFKA_TimeStamp"),
OMSR_NO_RQD_TPL_OPTS));
if(pData->dynaTopic) {
CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->topic),
CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->topic),
OMSR_NO_RQD_TPL_OPTS));
CHKmalloc(pData->dynCache = (dynaTopicCacheEntry**)
calloc(pData->iDynaTopicCacheSize, sizeof(dynaTopicCacheEntry*)));
@ -1063,9 +1114,11 @@ ENDqueryEtryPt
BEGINmodInit()
CODESTARTmodInit
uchar *pTmp;
INITLegCnfVars
*ipIFVersProvided = CURR_MOD_IF_VERSION;
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(datetime, CORE_COMPONENT));
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(statsobj, CORE_COMPONENT));
@ -1095,5 +1148,10 @@ CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"topicdynacache.evicted",
ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrCacheEvict));
CHKiRet(statsobj.ConstructFinalize(kafkaStats));
DBGPRINTF("omkafka: Add KAFKA_TimeStamp to template system ONCE\n");
pTmp = (uchar*) KAFKA_TimeStamp;
tplAddLine(ourConf, " KAFKA_TimeStamp", &pTmp);
CODEmodInit_QueryRegCFSLineHdlr
ENDmodInit