omkafka: fix potential memory leak

if kafka produce fails when resubmitting messages, the message object
is duplicated. This potentially leads to a mem leak or message duplication
(not fully checked yet).
This commit is contained in:
Rainer Gerhards 2017-12-28 19:17:46 +01:00
parent 02761dd0d8
commit c9543c6543

View File

@ -80,6 +80,10 @@ struct kafka_params {
#define O_LARGEFILE 0
#endif
/* flags for writeKafka: shall we resubmit a failed message? */
#define RESUBMIT 1
#define NO_RESUBMIT 0
#if HAVE_ATOMIC_BUILTINS64
static uint64 clockTopicAccess = 0;
#else
@ -562,10 +566,13 @@ finalize_it:
RETiRet;
}
/* must be called with read(rkLock) */
/* must be called with read(rkLock)
* b_do_resubmit tells if we shall resubmit on error or not. This is needed
* when we submit already resubmitted messages.
*/
static rsRetVal ATTR_NONNULL(1, 2)
writeKafka(instanceData *const pData, uchar *const msg,
uchar *const msgTimestamp, uchar *const topic)
uchar *const msgTimestamp, uchar *const topic, const int b_do_resubmit)
{
DEFiRet;
const int partition = getPartition(pData);
@ -630,7 +637,7 @@ writeKafka(instanceData *const pData, uchar *const msg,
if (msg_kafka_response != RD_KAFKA_RESP_ERR_NO_ERROR ) {
/* Put into kafka queue, again if configured! */
if (pData->bResubmitOnFailure) {
if (pData->bResubmitOnFailure && b_do_resubmit) {
DBGPRINTF("omkafka: Failed to produce to topic '%s' (rd_kafka_producev)"
"partition %d: '%d/%s' - adding MSG '%s' to failed for RETRY!\n",
rd_kafka_topic_name(rkt), partition, msg_kafka_response,
@ -656,7 +663,7 @@ writeKafka(instanceData *const pData, uchar *const msg,
NULL);
if(msg_enqueue_status == -1) {
/* Put into kafka queue, again if configured! */
if (pData->bResubmitOnFailure) {
if (pData->bResubmitOnFailure && b_do_resubmit) {
DBGPRINTF("omkafka: Failed to produce to topic '%s' (rd_kafka_produce)"
"partition %d: '%d/%s' - adding MSG '%s' to failed for RETRY!\n",
rd_kafka_topic_name(rkt), partition, rd_kafka_last_error(),
@ -998,7 +1005,7 @@ checkFailedMessages(instanceData *const __restrict__ pData)
fmsgEntry = SLIST_FIRST(&pData->failedmsg_head);
assert(fmsgEntry != NULL);
/* Put back into kafka! */
iRet = writeKafka(pData, (uchar*) fmsgEntry->payload, NULL, fmsgEntry->topicname);
iRet = writeKafka(pData, (uchar*) fmsgEntry->payload, NULL, fmsgEntry->topicname, NO_RESUBMIT);
if(iRet != RS_RET_OK) {
// TODO: LogError???
DBGPRINTF("omkafka: failed to delivery failed msg '%.*s' with status %d. "
@ -1338,7 +1345,7 @@ CODESTARTdoAction
}
/* support dynamic topic */
iRet = writeKafka(pData, ppString[0], ppString[1], pData->dynaTopic ? ppString[2] : pData->topic);
iRet = writeKafka(pData, ppString[0], ppString[1], pData->dynaTopic ? ppString[2] : pData->topic, RESUBMIT);
finalize_it:
if(need_unlock) {