omkafka: Enhanced support to detect kafka broker problems.

Also fixed tryResume and doAction handling when the action
is in suspended state.

Call Kafka callbacks (rd_kafka_poll) more often.
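
A minimal sketch of the detection pattern, assuming a librdkafka error callback that marks
the instance as suspended (the struct, field and function names here are illustrative
stand-ins, not the actual omkafka code):

#include <stdio.h>
#include <librdkafka/rdkafka.h>

typedef struct { volatile int bIsSuspended; } kafka_inst_t; /* hypothetical stand-in for omkafka's instanceData */

static void broker_error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
{
	kafka_inst_t *const inst = (kafka_inst_t *)opaque;
	(void)rk;
	if (err == RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN
	    || err == RD_KAFKA_RESP_ERR__TRANSPORT) {
		inst->bIsSuspended = 1; /* doAction/tryResume check this flag and suspend the action */
	}
	fprintf(stderr, "kafka error %d (%s): %s\n",
		err, rd_kafka_err2str((rd_kafka_resp_err_t)err), reason ? reason : "");
}

/* During setup, before rd_kafka_new():
 *   rd_kafka_conf_set_opaque(conf, inst);
 *   rd_kafka_conf_set_error_cb(conf, broker_error_cb);
 * The callback only fires from rd_kafka_poll(), which is why doAction now calls
 * rd_kafka_poll(pData->rk, 0) before producing (see the omkafka.c diff below).
 */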

Adjusted kafka fail test settings.

diag.sh script now detects whether the Kafka server comes up properly.
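
A minimal sketch of such a startup check; this variant uses pgrep instead of the ps/grep
pipeline in the actual diag.sh diff below (pgrep availability on the test hosts is an assumption):

# after starting the broker, verify the Kafka JVM really came up
./msleep 4000
kafkapid=$(pgrep -f "$dep_work_kafka_config" | head -n1)
if [ -n "$kafkapid" ]; then
	echo "Kafka instance $dep_work_kafka_config started with PID $kafkapid"
else
	echo "Failed to start Kafka instance for $dep_work_kafka_config"
	. $srcdir/diag.sh error-exit 1
fi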

Added liblz4 to the dependencies for omkafka/imkafka; it is
needed for static linking.
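
The configure.ac hunks below implement this as PKG_CHECK_MODULES with an AC_CHECK_LIB
fallback; a rough manual equivalent for probing the dependency on a build host (the
compiler invocation and flags are assumptions, not part of the commit):

pkg-config --exists liblz4 && pkg-config --libs liblz4   # preferred: pkg-config metadata
echo 'int main(void){return 0;}' | cc -x c - -llz4 -o /dev/null \
	&& echo "-llz4 links"                            # link probe, roughly what AC_CHECK_LIB does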

This addresses the omkafka queue problem from issue:
https://github.com/rsyslog/rsyslog/issues/1052
It does not fully solve the message problem yet,
but I am working on this.
This commit is contained in:
Andre Lorbach 2017-05-22 17:30:08 +02:00
parent fa775e7590
commit 732d0e21a7
7 changed files with 93 additions and 19 deletions

View File

@@ -1706,6 +1706,14 @@ if test "x$enable_omkafka" = "xyes"; then
])
])
])
PKG_CHECK_MODULES([LIBLZ4], [liblz4],, [
AC_CHECK_LIB([lz4], [LZ4_compress], [
AC_MSG_WARN([liblz4 pkg-config module missing, but library is present; using -llz4])
LIBRDKAFKA_LIBS="$LIBRDKAFKA_LIBS -llz4"
], [
AC_MSG_ERROR([could not find liblz4 library])
])
])
AC_CHECK_HEADERS([librdkafka/rdkafka.h])
fi
@@ -1719,6 +1727,15 @@ if test "x$enable_imkafka" = "xyes"; then
AC_MSG_ERROR([could not find rdkafka library])
])
])
PKG_CHECK_MODULES([LIBLZ4], [liblz4],, [
AC_CHECK_LIB([lz4], [LZ4_compress], [
AC_MSG_WARN([liblz4 pkg-config module missing, but library is present; using -llz4])
LIBRDKAFKA_LIBS="$LIBRDKAFKA_LIBS -llz4"
], [
AC_MSG_ERROR([could not find liblz4 library])
])
])
AC_CHECK_HEADERS([librdkafka/rdkafka.h])
fi

View File

@@ -275,6 +275,9 @@ rd_kafka_topic_t** topic) {
ABORT_FINALIZE(RS_RET_KAFKA_ERROR);
}
for(int i = 0 ; i < pData->nTopicConfParams ; ++i) {
DBGPRINTF("omkafka: setting custom topic configuration parameter: %s:%s\n",
pData->topicConfParams[i].name,
pData->topicConfParams[i].val);
if(rd_kafka_topic_conf_set(topicconf,
pData->topicConfParams[i].name,
pData->topicConfParams[i].val,
@@ -539,10 +542,10 @@ static void
do_rd_kafka_destroy(instanceData *const __restrict__ pData)
{
if (pData->rk == NULL) {
DBGPRINTF("omkafka: can't close, handle wasn't open\n");
DBGPRINTF("omkafka: onDestroy can't close, handle wasn't open\n");
} else {
int queuedCount = rd_kafka_outq_len(pData->rk);
DBGPRINTF("omkafka: closing - items left in outqueue: %d\n", queuedCount);
DBGPRINTF("omkafka: onDestroy closing - items left in outqueue: %d\n", queuedCount);
struct timespec tOut;
timeoutComp(&tOut, pData->closeTimeout);
@@ -550,7 +553,23 @@ do_rd_kafka_destroy(instanceData *const __restrict__ pData)
while (timeoutVal(&tOut) > 0) {
queuedCount = rd_kafka_outq_len(pData->rk);
if (queuedCount > 0) {
rd_kafka_poll(pData->rk, 10);
/* Flush all remaining kafka messages (rd_kafka_poll is called inside) */
const int flushStatus = rd_kafka_flush(pData->rk, 5000);
if (flushStatus != RD_KAFKA_RESP_ERR_NO_ERROR) { /* TODO: handle unsent messages here! */
errmsg.LogError(0, RS_RET_KAFKA_ERROR,
"omkafka: onDestroy failed to send remaining '%d' messages to topic '%s' on shutdown with error: '%s'",
queuedCount,
rd_kafka_topic_name(pData->pTopic),
rd_kafka_err2str(flushStatus));
} else {
DBGPRINTF("omkafka: onDestroyflushed remaining '%d' messages to kafka topic '%s'\n",
queuedCount, rd_kafka_topic_name(pData->pTopic));
/* Trigger callbacks a last time before shutdown */
const int callbacksCalled = rd_kafka_poll(pData->rk, 0); /* call callbacks */
DBGPRINTF("omkafka: onDestroy kafka outqueue length: %d, callbacks called %d\n",
rd_kafka_outq_len(pData->rk), callbacksCalled);
}
} else {
break;
}
@@ -651,6 +670,9 @@ openKafka(instanceData *const __restrict__ pData)
#endif
for(int i = 0 ; i < pData->nConfParams ; ++i) {
DBGPRINTF("omkafka: setting custom configuration parameter: %s:%s\n",
pData->confParams[i].name,
pData->confParams[i].val);
if(rd_kafka_conf_set(conf,
pData->confParams[i].name,
pData->confParams[i].val,
@@ -827,6 +849,7 @@ CODESTARTtryResume
if ((iKafkaRet = rd_kafka_metadata(pWrkrData->pData->rk, 0, NULL, &metadata, 1000)) != RD_KAFKA_RESP_ERR_NO_ERROR) {
DBGPRINTF("omkafka: tryResume failed, brokers down %d,%s\n", iKafkaRet, rd_kafka_err2str(iKafkaRet));
ABORT_FINALIZE(RS_RET_SUSPENDED);
} else {
DBGPRINTF("omkafka: tryResume success, %d brokers UP\n", metadata->broker_cnt);
/* Reset suspended state */
@@ -848,10 +871,11 @@ writeKafka(instanceData *pData, uchar *msg, uchar *msgTimestamp, uchar *topic)
const int partition = getPartition(pData);
rd_kafka_topic_t *rkt = NULL;
pthread_rwlock_t *dynTopicLock = NULL;
int msg_enqueue_status = 0;
#if RD_KAFKA_VERSION >= 0x00090400
rd_kafka_resp_err_t msg_kafka_response;
int64_t ttMsgTimestamp;
#else
int msg_enqueue_status = 0;
#endif
DBGPRINTF("omkafka: trying to send: key:'%s', msg:'%s', timestamp:'%s'\n", pData->key, msg, msgTimestamp);
@@ -916,10 +940,14 @@ writeKafka(instanceData *pData, uchar *msg, uchar *msgTimestamp, uchar *topic)
if (pData->dynaTopic) {
pthread_rwlock_unlock(dynTopicLock);/* dynamic topic can't be used beyond this pt */
}
DBGPRINTF("omkafka: kafka outqueue length: %d, callbacks called %d\n",
DBGPRINTF("omkafka: writeKafka kafka outqueue length: %d, callbacks called %d\n",
rd_kafka_outq_len(pData->rk), callbacksCalled);
#if RD_KAFKA_VERSION >= 0x00090400
if (msg_kafka_response != RD_KAFKA_RESP_ERR_NO_ERROR) {
#else
if (msg_enqueue_status == -1) {
#endif
STATSCOUNTER_INC(ctrKafkaFail, mutCtrKafkaFail);
ABORT_FINALIZE(RS_RET_KAFKA_PRODUCE_ERR);
/* ABORT_FINALIZE isn't absolutely necessary as of now,
@@ -944,22 +972,34 @@ CODESTARTdoAction
if (! pData->bIsOpen)
CHKiRet(setupKafkaHandle(pData, 0));
/* Suspend Action if broker problems were reported in error callback */
if (pData->bIsSuspended) {
DBGPRINTF("omkafka: broker failure detected, suspending action\n");
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
/* Lock here to prevent msg loss */
pthread_rwlock_rdlock(&pData->rkLock);
/* We need to trigger callbacks first in order to suspend the Action properly on failure */
const int callbacksCalled = rd_kafka_poll(pData->rk, 0); /* call callbacks */
DBGPRINTF("omkafka: doAction kafka outqueue length: %d, callbacks called %d\n",
rd_kafka_outq_len(pData->rk), callbacksCalled);
/* support dynamic topic */
if(pData->dynaTopic)
iRet = writeKafka(pData, ppString[0], ppString[1], ppString[2]);
else
iRet = writeKafka(pData, ppString[0], ppString[1], pData->topic);
/* Unlock now */
pthread_rwlock_unlock(&pData->rkLock);
finalize_it:
if(iRet != RS_RET_OK) {
DBGPRINTF("omkafka: doAction failed with status %d\n", iRet);
}
/* Suspend Action if broker problems were reported in error callback */
if (pData->bIsSuspended) {
DBGPRINTF("omkafka: doAction broker failure detected, suspending action\n");
/* Suspend Action now */
iRet = RS_RET_SUSPENDED;
}
ENDdoAction

View File

@@ -641,7 +641,18 @@ case $1 in
cp $srcdir/testsuites/$dep_work_kafka_config $dep_work_dir/kafka/config/
echo "Starting Kafka instance $2"
(cd $dep_work_dir/kafka && ./bin/kafka-server-start.sh -daemon ./config/$dep_work_kafka_config)
./msleep 2000
./msleep 4000
# Check if kafka instance came up!
kafkapid=`ps aux | grep -i $dep_work_kafka_config | grep java | grep -v grep | awk '{print $2}'`
# kafkapid=$(ps aux | grep -i '$dep_work_kafka_config' | grep java | grep -v grep | awk '{print $2}')
if [[ "" != "$kafkapid" ]];
then
echo "Kafka instance $dep_work_kafka_config started with PID $kafkapid"
else
echo "Failed to start Kafka instance for $dep_work_kafka_config"
. $srcdir/diag.sh error-exit 1
fi
;;
'stop-kafka')
if [ "x$2" == "x" ]; then

View File

@@ -44,7 +44,7 @@ echo \[sndrcv_kafka_fail.sh\]: Inject messages into rsyslog sender instance
. $srcdir/diag.sh tcpflood -m$TESTMESSAGES -i1001
echo \[sndrcv_kafka_fail.sh\]: Sleep to give rsyslog instances time to process data ...
sleep 5
sleep 10
echo \[sndrcv_kafka_fail.sh\]: Stopping sender instance [imkafka]
. $srcdir/diag.sh shutdown-when-empty 2
@@ -60,4 +60,6 @@ echo \[sndrcv_kafka_fail.sh\]: Stopping receiver instance [omkafka]
echo \[sndrcv_kafka_fail.sh\]: stop kafka instance
. $srcdir/diag.sh delete-kafka-topic 'static' '.dep_wrk' '22181'
. $srcdir/diag.sh stop-kafka
# STOP ZOOKEEPER in any case
. $srcdir/diag.sh stop-zookeeper

View File

@@ -15,8 +15,11 @@ action( name="kafka-fwd"
"socket.timeout.ms=1000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"message.send.max.retries=10"]
"queue.buffering.max.messages=10000",
"message.send.max.retries=5"]
topicConfParam=["message.timeout.ms=10000"]
partitions.auto="on"
# action.resumeInterval="1"
action.resumeRetryCount="2"
action.resumeInterval="5"
action.resumeRetryCount="1"
queue.saveonshutdown="on"
)

View File

@@ -1,5 +1,5 @@
tickTime=2000
initLimit=10
syncLimit=5
tickTime=1000
initLimit=5
syncLimit=2
dataDir=zk_data_dir
clientPort=22181

View File

@@ -20,6 +20,7 @@ fi
# As travis has no xenial images, we always need to install librdkafka from source
if [ "x$KAFKA" == "xYES" ]; then
sudo apt-get install -qq liblz4-dev
set -ex
git clone https://github.com/edenhill/librdkafka
echo $CFLAGS