diff --git a/configure.ac b/configure.ac
index e12942de7..080f0fd8b 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1706,6 +1706,14 @@ if test "x$enable_omkafka" = "xyes"; then
 			])
 		])
 	])
+	PKG_CHECK_MODULES([LIBLZ4], [liblz4],, [
+		AC_CHECK_LIB([lz4], [LZ4_compress], [
+			AC_MSG_WARN([liblz4 is missing but library present, using -llz4])
+			LIBRDKAFKA_LIBS=-llz4
+		], [
+			AC_MSG_ERROR([could not find liblz4 library])
+		])
+	])
 	AC_CHECK_HEADERS([librdkafka/rdkafka.h])
 fi
 
@@ -1719,6 +1727,15 @@ if test "x$enable_imkafka" = "xyes"; then
 			AC_MSG_ERROR([could not find rdkafka library])
 		])
 	])
+	PKG_CHECK_MODULES([LIBLZ4], [liblz4],, [
+		AC_CHECK_LIB([lz4], [LZ4_compress], [
+			AC_MSG_WARN([liblz4 is missing but library present, using -llz4])
+			LIBRDKAFKA_LIBS=-llz4
+		], [
+			AC_MSG_ERROR([could not find liblz4 library])
+		])
+	])
+	AC_CHECK_HEADERS([librdkafka/rdkafka.h])
 fi
 
diff --git a/plugins/omkafka/omkafka.c b/plugins/omkafka/omkafka.c
index 612f52b82..3236c69bb 100644
--- a/plugins/omkafka/omkafka.c
+++ b/plugins/omkafka/omkafka.c
@@ -275,6 +275,9 @@ rd_kafka_topic_t** topic) {
 		ABORT_FINALIZE(RS_RET_KAFKA_ERROR);
 	}
 	for(int i = 0 ; i < pData->nTopicConfParams ; ++i) {
+		DBGPRINTF("omkafka: setting custom topic configuration parameter: %s:%s\n",
+			pData->topicConfParams[i].name,
+			pData->topicConfParams[i].val);
 		if(rd_kafka_topic_conf_set(topicconf,
 				pData->topicConfParams[i].name,
 				pData->topicConfParams[i].val,
@@ -539,10 +542,10 @@ static void
 do_rd_kafka_destroy(instanceData *const __restrict__ pData)
 {
 	if (pData->rk == NULL) {
-		DBGPRINTF("omkafka: can't close, handle wasn't open\n");
+		DBGPRINTF("omkafka: onDestroy can't close, handle wasn't open\n");
 	} else {
 		int queuedCount = rd_kafka_outq_len(pData->rk);
-		DBGPRINTF("omkafka: closing - items left in outqueue: %d\n", queuedCount);
+		DBGPRINTF("omkafka: onDestroy closing - items left in outqueue: %d\n", queuedCount);
 
 		struct timespec tOut;
 		timeoutComp(&tOut, pData->closeTimeout);
@@ -550,7 +553,23 @@ do_rd_kafka_destroy(instanceData *const __restrict__ pData)
 		while (timeoutVal(&tOut) > 0) {
 			queuedCount = rd_kafka_outq_len(pData->rk);
 			if (queuedCount > 0) {
-				rd_kafka_poll(pData->rk, 10);
+				/* Flush all remaining kafka messages (rd_kafka_poll is called inside) */
+				const int flushStatus = rd_kafka_flush(pData->rk, 5000);
+				if (flushStatus != RD_KAFKA_RESP_ERR_NO_ERROR) /* TODO: Handle unsent messages here! */ {
+					errmsg.LogError(0, RS_RET_KAFKA_ERROR,
+						"omkafka: onDestroy failed to send remaining '%d' messages to topic '%s' on shutdown with error: '%s'",
+						queuedCount,
+						rd_kafka_topic_name(pData->pTopic),
+						rd_kafka_err2str(flushStatus));
+				} else {
+					DBGPRINTF("omkafka: onDestroy flushed remaining '%d' messages to kafka topic '%s'\n",
+						queuedCount, rd_kafka_topic_name(pData->pTopic));
+
+					/* Trigger callbacks one last time before shutdown */
+					const int callbacksCalled = rd_kafka_poll(pData->rk, 0); /* call callbacks */
+					DBGPRINTF("omkafka: onDestroy kafka outqueue length: %d, callbacks called %d\n",
+						rd_kafka_outq_len(pData->rk), callbacksCalled);
+				}
 			} else {
 				break;
 			}
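The hunk above replaces a bare polling loop with rd_kafka_flush(), which serves delivery callbacks while it waits for the producer's outqueue to drain. The fragment below is only a minimal standalone sketch of that shutdown pattern, assuming librdkafka >= 0.9.2 (where rd_kafka_flush() is available); the helper name shutdown_producer is illustrative and not part of the patch.

#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch: drain and destroy a producer handle; not the omkafka code itself. */
static void shutdown_producer(rd_kafka_t *rk, int timeout_ms)
{
	/* rd_kafka_flush() keeps serving delivery callbacks until the
	 * outqueue is empty or the timeout expires */
	const rd_kafka_resp_err_t err = rd_kafka_flush(rk, timeout_ms);
	if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
		/* typically RD_KAFKA_RESP_ERR__TIMED_OUT: some messages were not delivered */
		fprintf(stderr, "flush failed, %d messages still queued: %s\n",
			rd_kafka_outq_len(rk), rd_kafka_err2str(err));
	}
	rd_kafka_poll(rk, 0);	/* serve any callbacks that are still pending */
	rd_kafka_destroy(rk);	/* release the handle */
}

A timeout from rd_kafka_flush() is exactly the case the patch reports through errmsg.LogError(), since whatever is still queued at that point will be lost on shutdown.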
@@ -651,6 +670,9 @@ openKafka(instanceData *const __restrict__ pData)
 #endif
 
 	for(int i = 0 ; i < pData->nConfParams ; ++i) {
+		DBGPRINTF("omkafka: setting custom configuration parameter: %s:%s\n",
+			pData->confParams[i].name,
+			pData->confParams[i].val);
 		if(rd_kafka_conf_set(conf,
 				pData->confParams[i].name,
 				pData->confParams[i].val,
@@ -827,6 +849,7 @@ CODESTARTtryResume
 	if ((iKafkaRet = rd_kafka_metadata(pWrkrData->pData->rk, 0, NULL, &metadata, 1000)) != RD_KAFKA_RESP_ERR_NO_ERROR) {
 		DBGPRINTF("omkafka: tryResume failed, brokers down %d,%s\n",
 			iKafkaRet, rd_kafka_err2str(iKafkaRet));
+		ABORT_FINALIZE(RS_RET_SUSPENDED);
 	} else {
 		DBGPRINTF("omkafka: tryResume success, %d brokers UP\n", metadata->broker_cnt);
 		/* Reset suspended state */
@@ -848,10 +871,11 @@ writeKafka(instanceData *pData, uchar *msg, uchar *msgTimestamp, uchar *topic)
 	const int partition = getPartition(pData);
 	rd_kafka_topic_t *rkt = NULL;
 	pthread_rwlock_t *dynTopicLock = NULL;
-	int msg_enqueue_status = 0;
 #if RD_KAFKA_VERSION >= 0x00090400
 	rd_kafka_resp_err_t msg_kafka_response;
 	int64_t ttMsgTimestamp;
+#else
+	int msg_enqueue_status = 0;
 #endif
 	DBGPRINTF("omkafka: trying to send: key:'%s', msg:'%s', timestamp:'%s'\n",
 		pData->key, msg, msgTimestamp);
@@ -916,10 +940,14 @@ writeKafka(instanceData *pData, uchar *msg, uchar *msgTimestamp, uchar *topic)
 	if (pData->dynaTopic) {
 		pthread_rwlock_unlock(dynTopicLock);/* dynamic topic can't be used beyond this pt */
 	}
-	DBGPRINTF("omkafka: kafka outqueue length: %d, callbacks called %d\n",
+	DBGPRINTF("omkafka: writeKafka kafka outqueue length: %d, callbacks called %d\n",
 		rd_kafka_outq_len(pData->rk), callbacksCalled);
+#if RD_KAFKA_VERSION >= 0x00090400
+	if (msg_kafka_response != RD_KAFKA_RESP_ERR_NO_ERROR) {
+#else
 	if (msg_enqueue_status == -1) {
+#endif
 		STATSCOUNTER_INC(ctrKafkaFail, mutCtrKafkaFail);
 		ABORT_FINALIZE(RS_RET_KAFKA_PRODUCE_ERR);
 		/* ABORT_FINALIZE isn't absolutely necessary as of now,
@@ -944,22 +972,34 @@ CODESTARTdoAction
 	if (!pData->bIsOpen)
 		CHKiRet(setupKafkaHandle(pData, 0));
 
-	/* Suspend Action if broker problems were reported in error callback */
-	if (pData->bIsSuspended) {
-		DBGPRINTF("omkafka: broker failure detected, suspending action\n");
-		ABORT_FINALIZE(RS_RET_SUSPENDED);
-	}
-
+	/* Lock here to prevent msg loss */
 	pthread_rwlock_rdlock(&pData->rkLock);
+	/* We need to trigger callbacks first in order to suspend the Action properly on failure */
+	const int callbacksCalled = rd_kafka_poll(pData->rk, 0); /* call callbacks */
+	DBGPRINTF("omkafka: doAction kafka outqueue length: %d, callbacks called %d\n",
+		rd_kafka_outq_len(pData->rk), callbacksCalled);
+
 	/* support dynamic topic */
 	if(pData->dynaTopic)
 		iRet = writeKafka(pData, ppString[0], ppString[1], ppString[2]);
 	else
 		iRet = writeKafka(pData, ppString[0], ppString[1], pData->topic);
+	/* Unlock now */
 	pthread_rwlock_unlock(&pData->rkLock);
 
 finalize_it:
+	if(iRet != RS_RET_OK) {
+		DBGPRINTF("omkafka: doAction failed with status %d\n", iRet);
+	}
+
+	/* Suspend Action if broker problems were reported in error callback */
+	if (pData->bIsSuspended) {
+		DBGPRINTF("omkafka: doAction broker failure detected, suspending action\n");
+
+		/* Suspend Action now */
+		iRet = RS_RET_SUSPENDED;
+	}
 ENDdoAction
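The doAction and tryResume changes above rely on librdkafka callbacks, which only run when rd_kafka_poll() (or rd_kafka_flush()) is invoked, to raise pData->bIsSuspended; that is why the patch polls before producing and re-checks the flag in finalize_it. The sketch below shows roughly how such a flag is wired through the error callback; the struct, callback body, and chosen error codes are illustrative only, not copied from omkafka.c.

#include <librdkafka/rdkafka.h>

/* Illustrative stand-in for the module's per-action state. */
typedef struct {
	volatile int bIsSuspended;
} instanceData;

/* Error callback: librdkafka invokes this from rd_kafka_poll()/rd_kafka_flush(). */
static void error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
{
	instanceData *pData = (instanceData *)opaque;
	(void)rk; (void)reason;
	if (err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN ||
	    err == RD_KAFKA_RESP_ERR__TRANSPORT)
		pData->bIsSuspended = 1;	/* doAction sees this after writeKafka() */
}

static void wire_callbacks(rd_kafka_conf_t *conf, instanceData *pData)
{
	rd_kafka_conf_set_opaque(conf, pData);		/* handed back as 'opaque' */
	rd_kafka_conf_set_error_cb(conf, error_cb);
	/* Callbacks fire only from rd_kafka_poll()/rd_kafka_flush(), hence the
	 * extra rd_kafka_poll(pData->rk, 0) at the start of doAction above. */
}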
diff --git a/tests/diag.sh b/tests/diag.sh
index 3717f8503..bdef57f51 100755
--- a/tests/diag.sh
+++ b/tests/diag.sh
@@ -641,7 +641,18 @@ case $1 in
 		cp $srcdir/testsuites/$dep_work_kafka_config $dep_work_dir/kafka/config/
 		echo "Starting Kafka instance $2"
 		(cd $dep_work_dir/kafka && ./bin/kafka-server-start.sh -daemon ./config/$dep_work_kafka_config)
-		./msleep 2000
+		./msleep 4000
+
+		# Check if kafka instance came up!
+		kafkapid=`ps aux | grep -i $dep_work_kafka_config | grep java | grep -v grep | awk '{print $2}'`
+		# kafkapid=$(ps aux | grep -i '$dep_work_kafka_config' | grep java | grep -v grep | awk '{print $2}')
+		if [[ "" != "$kafkapid" ]];
+		then
+			echo "Kafka instance $dep_work_kafka_config started with PID $kafkapid"
+		else
+			echo "Failed to start Kafka instance for $dep_work_kafka_config"
+			. $srcdir/diag.sh error-exit 1
+		fi
 		;;
 	'stop-kafka')
 		if [ "x$2" == "x" ]; then
diff --git a/tests/sndrcv_kafka_fail.sh b/tests/sndrcv_kafka_fail.sh
index 99c5b45f4..931f93d3b 100755
--- a/tests/sndrcv_kafka_fail.sh
+++ b/tests/sndrcv_kafka_fail.sh
@@ -44,7 +44,7 @@ echo \[sndrcv_kafka_fail.sh\]: Inject messages into rsyslog sender instance
 . $srcdir/diag.sh tcpflood -m$TESTMESSAGES -i1001
 
 echo \[sndrcv_kafka_fail.sh\]: Sleep to give rsyslog instances time to process data ...
-sleep 5
+sleep 10
 
 echo \[sndrcv_kafka_fail.sh\]: Stopping sender instance [imkafka]
 . $srcdir/diag.sh shutdown-when-empty 2
@@ -60,4 +60,6 @@ echo \[sndrcv_kafka_fail.sh\]: Stopping receiver instance [omkafka]
 echo \[sndrcv_kafka_fail.sh\]: stop kafka instance
 . $srcdir/diag.sh delete-kafka-topic 'static' '.dep_wrk' '22181'
 . $srcdir/diag.sh stop-kafka
+
+# STOP ZOOKEEPER in any case
 . $srcdir/diag.sh stop-zookeeper
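The sender test configuration below tightens librdkafka's buffering and retry behaviour through omkafka's confParam/topicConfParam arrays, which the instrumented loops earlier in the patch hand to rd_kafka_conf_set() and rd_kafka_topic_conf_set(). A minimal sketch of that mapping, reusing the property values from the test config; the helper name apply_test_params is illustrative only.

#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Sketch: apply the test-config producer properties to librdkafka conf objects. */
static int apply_test_params(rd_kafka_conf_t *conf, rd_kafka_topic_conf_t *tconf)
{
	char errstr[512];

	if (rd_kafka_conf_set(conf, "queue.buffering.max.messages", "10000",
			errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
	    rd_kafka_conf_set(conf, "message.send.max.retries", "5",
			errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK ||
	    rd_kafka_topic_conf_set(tconf, "message.timeout.ms", "10000",
			errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
		fprintf(stderr, "kafka config error: %s\n", errstr);
		return -1;
	}
	return 0;
}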
diff --git a/tests/testsuites/sndrcv_kafka_sender.conf b/tests/testsuites/sndrcv_kafka_sender.conf
index 603ea76a4..e79525021 100644
--- a/tests/testsuites/sndrcv_kafka_sender.conf
+++ b/tests/testsuites/sndrcv_kafka_sender.conf
@@ -15,8 +15,11 @@ action( name="kafka-fwd"
 	   "socket.timeout.ms=1000",
 	   "socket.keepalive.enable=true",
 	   "reconnect.backoff.jitter.ms=1000",
-	   "message.send.max.retries=10"]
+	   "queue.buffering.max.messages=10000",
+	   "message.send.max.retries=5"]
+	topicConfParam=["message.timeout.ms=10000"]
 	partitions.auto="on"
-#	action.resumeInterval="1"
-	action.resumeRetryCount="2"
+	action.resumeInterval="5"
+	action.resumeRetryCount="1"
+	queue.saveonshutdown="on"
 )
diff --git a/tests/testsuites/zoo.cfg b/tests/testsuites/zoo.cfg
index 509cc47f5..b3a3cac5d 100644
--- a/tests/testsuites/zoo.cfg
+++ b/tests/testsuites/zoo.cfg
@@ -1,5 +1,5 @@
-tickTime=2000
-initLimit=10
-syncLimit=5
+tickTime=1000
+initLimit=5
+syncLimit=2
 dataDir=zk_data_dir
 clientPort=22181
diff --git a/tests/travis/install.sh b/tests/travis/install.sh
index c0d9b7758..b4221ff76 100755
--- a/tests/travis/install.sh
+++ b/tests/travis/install.sh
@@ -20,6 +20,7 @@ fi
 
 # As travis has no xenial images, we always need to install librdkafka from source
 if [ "x$KAFKA" == "xYES" ]; then
+	sudo apt-get install -qq liblz4-dev
 	set -ex
 	git clone https://github.com/edenhill/librdkafka
 	echo $CFLAGS