diff --git a/plugins/omkafka/omkafka.c b/plugins/omkafka/omkafka.c index c1bd2f190..6df454846 100644 --- a/plugins/omkafka/omkafka.c +++ b/plugins/omkafka/omkafka.c @@ -39,6 +39,7 @@ #include "atomic.h" #include "statsobj.h" #include "unicode-helper.h" +#include "datetime.h" MODULE_TYPE_OUTPUT MODULE_TYPE_NOKEEP @@ -47,6 +48,7 @@ MODULE_CNFNAME("omkafka") /* internal structures */ DEF_OMOD_STATIC_DATA +DEFobjCurrIf(datetime) DEFobjCurrIf(errmsg) DEFobjCurrIf(statsobj) @@ -86,6 +88,9 @@ getClockTopicAccess(void) #endif } +/* Needed for Kafka timestamp librdkafka > 0.9.4 */ +#define KAFKA_TimeStamp "\"%timestamp:::date-unixtimestamp%\"" + static int closeTimeout = 1000; static pthread_mutex_t closeTimeoutMut = PTHREAD_MUTEX_INITIALIZER; @@ -798,15 +803,19 @@ ENDtryResume /* must be called with read(rkLock) */ static rsRetVal -writeKafka(instanceData *pData, uchar *msg, uchar *topic) +writeKafka(instanceData *pData, uchar *msg, uchar *msgTimestamp, uchar *topic) { DEFiRet; const int partition = getPartition(pData); rd_kafka_topic_t *rkt = NULL; pthread_rwlock_t *dynTopicLock = NULL; int msg_enqueue_status = 0; +#if RD_KAFKA_VERSION >= 0x00090400 + rd_kafka_resp_err_t msg_kafka_response; + int64_t ttMsgTimestamp; +#endif - DBGPRINTF("omkafka: trying to send: key:'%s', msg:'%s'\n", pData->key, msg); + DBGPRINTF("omkafka: trying to send: key:'%s', msg:'%s', timestamp:'%s'\n", pData->key, msg, msgTimestamp); if(pData->dynaTopic) { DBGPRINTF("omkafka: topic to insert to: %s\n", topic); @@ -815,17 +824,55 @@ writeKafka(instanceData *pData, uchar *msg, uchar *topic) rkt = pData->pTopic; } +#if RD_KAFKA_VERSION >= 0x00090400 + ttMsgTimestamp = atoi((char*)msgTimestamp); /* Convert timestamp into int */ + ttMsgTimestamp *= 1000; /* Timestamp in Milliseconds for kafka */ + DBGPRINTF("omkafka: rd_kafka_producev timestamp=%s/%" PRId64 "\n", msgTimestamp, ttMsgTimestamp); + + /* Using new kafka producev API, includes Timestamp! */ + if (pData->key == NULL) { + msg_kafka_response = rd_kafka_producev(pData->rk, + RD_KAFKA_V_RKT(rkt), + RD_KAFKA_V_PARTITION(partition), + RD_KAFKA_V_VALUE(msg, strlen((char*)msg)), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + RD_KAFKA_V_TIMESTAMP(ttMsgTimestamp), + RD_KAFKA_V_END); + } else { + msg_kafka_response = rd_kafka_producev(pData->rk, + RD_KAFKA_V_RKT(rkt), + RD_KAFKA_V_PARTITION(partition), + RD_KAFKA_V_VALUE(msg, strlen((char*)msg)), + RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), + RD_KAFKA_V_TIMESTAMP(ttMsgTimestamp), + RD_KAFKA_V_KEY(pData->key,strlen((char*)pData->key)), + RD_KAFKA_V_END); + } + + if (msg_kafka_response != RD_KAFKA_RESP_ERR_NO_ERROR ) { + errmsg.LogError(0, RS_RET_KAFKA_PRODUCE_ERR, + "omkafka: Failed to produce to topic '%s' (rd_kafka_producev)" + "partition %d: %s\n", + rd_kafka_topic_name(rkt), partition, + rd_kafka_err2str(msg_kafka_response)); + } +#else + + DBGPRINTF("omkafka: rd_kafka_produce\n"); + /* Using old kafka produce API */ msg_enqueue_status = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, msg, strlen((char*)msg), pData->key, pData->key == NULL ? 0 : strlen((char*)pData->key), NULL); if(msg_enqueue_status == -1) { errmsg.LogError(0, RS_RET_KAFKA_PRODUCE_ERR, - "omkafka: Failed to produce to topic '%s' " + "omkafka: Failed to produce to topic '%s' (rd_kafka_produce) " "partition %d: %s\n", rd_kafka_topic_name(rkt), partition, rd_kafka_err2str(rd_kafka_errno2err(errno))); } +#endif + const int callbacksCalled = rd_kafka_poll(pData->rk, 0); /* call callbacks */ if (pData->dynaTopic) { pthread_rwlock_unlock(dynTopicLock);/* dynamic topic can't be used beyond this pt */ @@ -862,9 +909,9 @@ CODESTARTdoAction /* support dynamic topic */ if(pData->dynaTopic) - iRet = writeKafka(pData, ppString[0], ppString[1]); + iRet = writeKafka(pData, ppString[0], ppString[1], ppString[2]); else - iRet = writeKafka(pData, ppString[0], pData->topic); + iRet = writeKafka(pData, ppString[0], ppString[1], pData->topic); pthread_rwlock_unlock(&pData->rkLock); finalize_it: @@ -995,14 +1042,18 @@ CODESTARTnewActInst ABORT_FINALIZE(RS_RET_CONFIG_ERROR); } - iNumTpls = 1; + iNumTpls = 2; if(pData->dynaTopic) ++iNumTpls; CODE_STD_STRING_REQUESTnewActInst(iNumTpls); CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)strdup((pData->tplName == NULL) ? "RSYSLOG_FileFormat" : (char*)pData->tplName), OMSR_NO_RQD_TPL_OPTS)); + + CHKiRet(OMSRsetEntry(*ppOMSR, 1, (uchar*)strdup(" KAFKA_TimeStamp"), + OMSR_NO_RQD_TPL_OPTS)); + if(pData->dynaTopic) { - CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->topic), + CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->topic), OMSR_NO_RQD_TPL_OPTS)); CHKmalloc(pData->dynCache = (dynaTopicCacheEntry**) calloc(pData->iDynaTopicCacheSize, sizeof(dynaTopicCacheEntry*))); @@ -1063,9 +1114,11 @@ ENDqueryEtryPt BEGINmodInit() CODESTARTmodInit + uchar *pTmp; INITLegCnfVars *ipIFVersProvided = CURR_MOD_IF_VERSION; CODEmodInit_QueryRegCFSLineHdlr + CHKiRet(objUse(datetime, CORE_COMPONENT)); CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(statsobj, CORE_COMPONENT)); @@ -1095,5 +1148,10 @@ CODEmodInit_QueryRegCFSLineHdlr CHKiRet(statsobj.AddCounter(kafkaStats, (uchar *)"topicdynacache.evicted", ctrType_IntCtr, CTR_FLAG_RESETTABLE, &ctrCacheEvict)); CHKiRet(statsobj.ConstructFinalize(kafkaStats)); + + DBGPRINTF("omkafka: Add KAFKA_TimeStamp to template system ONCE\n"); + pTmp = (uchar*) KAFKA_TimeStamp; + tplAddLine(ourConf, " KAFKA_TimeStamp", &pTmp); + CODEmodInit_QueryRegCFSLineHdlr ENDmodInit