testbench: Added new kafka tests using kafkacat for better debugging.

Fixed some issues with sndrcv kafka tests.
Generating kafka topics dynamically now it kafka tests.

Limited messagecount in some tests to 50000 for now.
This commit is contained in:
Andre Lorbach 2018-08-30 17:56:20 +02:00
parent 3cb6adc5e8
commit 9d0933bccf
19 changed files with 546 additions and 366 deletions

View File

@ -597,12 +597,14 @@ if ENABLE_OMKAFKA
if ENABLE_IMKAFKA
if ENABLE_KAFKA_TESTS
TESTS += \
omkafka.sh \
imkafka.sh \
sndrcv_kafka.sh \
sndrcv_kafka_multi.sh
sndrcv_kafka_multi_topics.sh
# Tests below need to be stable first!
# sndrcv_kafka_fail.sh \
# sndrcv_kafka_failresume.sh
# sndrcv_kafka_multi.sh
if HAVE_VALGRIND
TESTS += \
sndrcv_kafka-vg-rcvr.sh
@ -1585,6 +1587,8 @@ EXTRA_DIST= \
mysql-actq-mt.sh \
mysql-actq-mt-withpause.sh \
mysql-actq-mt-withpause-vg.sh \
omkafka.sh \
imkafka.sh \
sndrcv_kafka.sh \
sndrcv_kafka_multi_topics.sh \
sndrcv_kafka-vg-rcvr.sh \

View File

@ -619,7 +619,7 @@ function gzip_seq_check() {
function tcpflood() {
eval ./tcpflood -p$TCPFLOOD_PORT "$@" $TCPFLOOD_EXTRA_OPTS
if [ "$?" -ne "0" ]; then
echo "error during tcpflood! see ${RSYSLOG_OUT_LOG}.save for what was written"
echo "error during tcpflood on port ${TCPFLOOD_PORT}! see ${RSYSLOG_OUT_LOG}.save for what was written"
cp ${RSYSLOG_OUT_LOG} ${RSYSLOG_OUT_LOG}.save
error_exit 1 stacktrace
fi
@ -694,8 +694,11 @@ function presort() {
dep_cache_dir=$(pwd)/.dep_cache
dep_zk_url=http://www-us.apache.org/dist/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
dep_zk_cached_file=$dep_cache_dir/zookeeper-3.4.13.tar.gz
# dep_kafka_url=http://www-us.apache.org/dist/kafka/0.10.2.2/kafka_2.12-0.10.2.2.tgz
# dep_kafka_cached_file=$dep_cache_dir/kafka_2.12-0.10.2.2.tgz
# byANDRE: We stay with kafka 0.10.x for now. Newer Kafka Versions have changes that
# makes creating testbench with single kafka instances difficult.
# old version -> dep_kafka_url=http://www-us.apache.org/dist/kafka/0.10.2.2/kafka_2.12-0.10.2.2.tgz
# old version -> dep_kafka_cached_file=$dep_cache_dir/kafka_2.12-0.10.2.2.tgz
dep_kafka_url=http://www-us.apache.org/dist/kafka/2.0.0/kafka_2.12-2.0.0.tgz
dep_kafka_cached_file=$dep_cache_dir/kafka_2.12-2.0.0.tgz
@ -1160,6 +1163,8 @@ case $1 in
$TESTTOOL_DIR/msleep 2000
;;
'start-kafka')
# Force IPv4 usage of Kafka!
export KAFKA_OPTS="-Djava.net.preferIPv4Stack=True"
if [ "x$2" == "x" ]; then
dep_work_dir=$(readlink -f .dep_wrk)
dep_work_kafka_config="kafka-server.properties"
@ -1382,7 +1387,9 @@ case $1 in
echo "Topic-name not provided."
exit 1
fi
(cd $dep_work_dir/kafka && ./bin/kafka-topics.sh --create --zookeeper localhost:$dep_work_port/kafka --topic $2 --partitions 2 --replication-factor 1)
(cd $dep_work_dir/kafka && ./bin/kafka-topics.sh --zookeeper localhost:$dep_work_port/kafka --create --topic $2 --replication-factor 1 --partitions 2 )
(cd $dep_work_dir/kafka && ./bin/kafka-topics.sh --zookeeper localhost:$dep_work_port/kafka --alter --topic $2 --delete-config retention.ms)
(cd $dep_work_dir/kafka && ./bin/kafka-topics.sh --zookeeper localhost:$dep_work_port/kafka --alter --topic $2 --delete-config retention.bytes)
;;
'delete-kafka-topic')
if [ "x$3" == "x" ]; then

93
tests/imkafka.sh Executable file
View File

@ -0,0 +1,93 @@
#!/bin/bash
# added 2018-08-29 by alorbach
# This file is part of the rsyslog project, released under ASL 2.0
export TESTMESSAGES=100000
export TESTMESSAGESFULL=$TESTMESSAGES
# Generate random topic name
export RANDTOPIC=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
# enable the EXTRA_EXITCHECK only if really needed - otherwise spams the test log
# too much
#export EXTRA_EXITCHECK=dumpkafkalogs
echo ===============================================================================
echo \[imkafka.sh\]: Create kafka/zookeeper instance and $RANDTOPIC topic
. $srcdir/diag.sh download-kafka
. $srcdir/diag.sh stop-zookeeper
. $srcdir/diag.sh stop-kafka
. $srcdir/diag.sh start-zookeeper
. $srcdir/diag.sh start-kafka
# create new topic
. $srcdir/diag.sh create-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
echo \[imkafka.sh\]: Give Kafka some time to process topic create ...
sleep 5
echo \[imkafka.sh\]: Init Testbench
. $srcdir/diag.sh init
# --- Create imkafka receiver config
export RSYSLOG_DEBUGLOG="log"
generate_conf
add_conf '
main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
module(load="../plugins/imkafka/.libs/imkafka")
/* Polls messages from kafka server!*/
input( type="imkafka"
topic="'$RANDTOPIC'"
broker="localhost:29092"
consumergroup="default"
confParam=[ "compression.codec=none",
"session.timeout.ms=10000",
"socket.timeout.ms=5000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"enable.partition.eof=false" ]
)
template(name="outfmt" type="string" string="%msg:F,58:2%\n")
if ($msg contains "msgnum:") then {
action( type="omfile" file=`echo $RSYSLOG_OUT_LOG` template="outfmt" )
}
'
# ---
# --- Start imkafka receiver config
echo \[imkafka.sh\]: Starting receiver instance [imkafka]
startup
# ---
# --- Fill Kafka Server with messages
# Can properly be done in a better way?!
for i in {00000001..00100000}
do
echo " msgnum:$i" >> $RSYSLOG_OUT_LOG.in
done
echo \[imkafka.sh\]: Inject messages into kafka
cat $RSYSLOG_OUT_LOG.in | kafkacat -P -b localhost:29092 -t $RANDTOPIC
# ---
echo \[imkafka.sh\]: Give imkafka some time to start...
sleep 5
echo \[imkafka.sh\]: Stopping sender instance [omkafka]
shutdown_when_empty
wait_shutdown
# Delete topic to remove old traces before
. $srcdir/diag.sh delete-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
echo \[imkafka.sh\]: stop kafka instance
. $srcdir/diag.sh stop-kafka
# STOP ZOOKEEPER in any case
. $srcdir/diag.sh stop-zookeeper
# Do the final sequence check
seq_check 1 $TESTMESSAGESFULL -d
echo success
exit_test

95
tests/omkafka.sh Executable file
View File

@ -0,0 +1,95 @@
#!/bin/bash
# added 2017-05-03 by alorbach
# This file is part of the rsyslog project, released under ASL 2.0
export TESTMESSAGES=50000
export TESTMESSAGESFULL=$TESTMESSAGES
# Generate random topic name
export RANDTOPIC=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
# enable the EXTRA_EXITCHECK only if really needed - otherwise spams the test log
# too much
#export EXTRA_EXITCHECK=dumpkafkalogs
echo ===============================================================================
echo \[omkafka.sh\]: Create kafka/zookeeper instance and $RANDTOPIC topic
. $srcdir/diag.sh download-kafka
. $srcdir/diag.sh stop-zookeeper
. $srcdir/diag.sh stop-kafka
. $srcdir/diag.sh start-zookeeper
. $srcdir/diag.sh start-kafka
# create new topic
. $srcdir/diag.sh create-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
echo \[omkafka.sh\]: Give Kafka some time to process topic create ...
sleep 5
echo \[omkafka.sh\]: Init Testbench
. $srcdir/diag.sh init
# --- Create/Start omkafka sender config
export RSYSLOG_DEBUGLOG="log"
generate_conf
add_conf '
main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
module(load="../plugins/omkafka/.libs/omkafka")
module(load="../plugins/imtcp/.libs/imtcp")
input(type="imtcp" port="'$TCPFLOOD_PORT'") /* this port for tcpflood! */
template(name="outfmt" type="string" string="%msg:F,58:2%\n")
local4.* action( name="kafka-fwd"
type="omkafka"
topic="'$RANDTOPIC'"
broker="localhost:29092"
template="outfmt"
confParam=[ "compression.codec=none",
"socket.timeout.ms=10000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"queue.buffering.max.messages=10000",
"enable.auto.commit=true",
"message.send.max.retries=1"]
topicConfParam=["message.timeout.ms=10000"]
partitions.auto="on"
closeTimeout="60000"
resubmitOnFailure="on"
keepFailedMessages="on"
failedMsgFile="omkafka-failed.data"
action.resumeInterval="1"
action.resumeRetryCount="2"
queue.saveonshutdown="on"
)
'
echo \[omkafka.sh\]: Starting sender instance [omkafka]
startup
# ---
echo \[omkafka.sh\]: Inject messages into rsyslog sender instance
tcpflood -m$TESTMESSAGES -i1
echo \[omkafka.sh\]: Stopping sender instance [omkafka]
shutdown_when_empty
wait_shutdown
kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s'> $RSYSLOG_OUT_LOG
kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p@%o:%k:%s' > $RSYSLOG_OUT_LOG.extra
# Delete topic to remove old traces before
# . $srcdir/diag.sh delete-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
# Dump Kafka log
# uncomment if needed . $srcdir/diag.sh dump-kafka-serverlog
echo \[omkafka.sh\]: stop kafka instance
. $srcdir/diag.sh stop-kafka
# STOP ZOOKEEPER in any case
. $srcdir/diag.sh stop-zookeeper
# Do the final sequence check
seq_check 1 $TESTMESSAGESFULL -d
echo success
exit_test

View File

@ -3,17 +3,22 @@
# This file is part of the rsyslog project, released under ASL 2.0
export TESTMESSAGES=100000
export TESTMESSAGESFULL=100000
# Generate random topic name
export RANDTOPIC=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
# enable the EXTRA_EXITCHECK only if really needed - otherwise spams the test log
# too much
#export EXTRA_EXITCHECK=dumpkafkalogs
echo ===============================================================================
echo \[sndrcv_kafka.sh\]: Create kafka/zookeeper instance and static topic
echo \[sndrcv_kafka.sh\]: Create kafka/zookeeper instance and $RANDTOPIC topic
. $srcdir/diag.sh download-kafka
. $srcdir/diag.sh stop-zookeeper
. $srcdir/diag.sh stop-kafka
. $srcdir/diag.sh start-zookeeper
. $srcdir/diag.sh start-kafka
. $srcdir/diag.sh create-kafka-topic 'static' '.dep_wrk' '22181'
# create new topic
. $srcdir/diag.sh create-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
echo \[sndrcv_kafka.sh\]: Give Kafka some time to process topic create ...
sleep 5
@ -25,7 +30,7 @@ echo \[sndrcv_kafka.sh\]: Init Testbench
export RSYSLOG_DEBUGLOG="log"
generate_conf
add_conf '
main_queue(queue.timeoutactioncompletion="10000" queue.timeoutshutdown="60000")
main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
module(load="../plugins/omkafka/.libs/omkafka")
module(load="../plugins/imtcp/.libs/imtcp")
@ -35,24 +40,24 @@ template(name="outfmt" type="string" string="%msg%\n")
local4.* action( name="kafka-fwd"
type="omkafka"
topic="static"
topic="'$RANDTOPIC'"
broker="localhost:29092"
template="outfmt"
confParam=[ "compression.codec=none",
"socket.timeout.ms=10000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"queue.buffering.max.messages=20000",
"queue.buffering.max.messages=10000",
"enable.auto.commit=true",
"message.send.max.retries=1"]
topicConfParam=["message.timeout.ms=10000"]
partitions.auto="on"
closeTimeout="30000"
closeTimeout="60000"
resubmitOnFailure="on"
keepFailedMessages="on"
failedMsgFile="omkafka-failed.data"
action.resumeInterval="1"
action.resumeRetryCount="-1"
action.resumeRetryCount="2"
queue.saveonshutdown="on"
)
'
@ -68,14 +73,17 @@ tcpflood -m$TESTMESSAGES -i1
export RSYSLOG_DEBUGLOG="log2"
generate_conf 2
add_conf '
main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
module(load="../plugins/imkafka/.libs/imkafka")
/* Polls messages from kafka server!*/
input( type="imkafka"
topic="static"
topic="'$RANDTOPIC'"
broker="localhost:29092"
consumergroup="default"
confParam=[ "compression.codec=none",
"socket.timeout.ms=10000",
"session.timeout.ms=10000",
"socket.timeout.ms=5000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"enable.partition.eof=false" ]
@ -92,9 +100,6 @@ echo \[sndrcv_kafka.sh\]: Starting receiver instance [imkafka]
startup 2
# ---
#echo \[sndrcv_kafka.sh\]: Sleep to give rsyslog instances time to process data ...
#sleep 20
echo \[sndrcv_kafka.sh\]: Stopping sender instance [omkafka]
shutdown_when_empty
wait_shutdown
@ -103,11 +108,8 @@ echo \[sndrcv_kafka.sh\]: Stopping receiver instance [imkafka]
shutdown_when_empty 2
wait_shutdown 2
# Do the final sequence check
seq_check 1 $TESTMESSAGESFULL -d
echo \[sndrcv_kafka.sh\]: delete kafka topics
. $srcdir/diag.sh delete-kafka-topic 'static' '.dep_wrk' '22181'
# Delete topic to remove old traces before
. $srcdir/diag.sh delete-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
echo \[sndrcv_kafka.sh\]: stop kafka instance
. $srcdir/diag.sh stop-kafka
@ -115,5 +117,8 @@ echo \[sndrcv_kafka.sh\]: stop kafka instance
# STOP ZOOKEEPER in any case
. $srcdir/diag.sh stop-zookeeper
# Do the final sequence check
seq_check 1 $TESTMESSAGESFULL -d
echo success
exit_test

View File

@ -2,18 +2,21 @@
# added 2017-05-18 by alorbach
# This test only tests what happens when kafka cluster fails
# This file is part of the rsyslog project, released under ASL 2.0
export TESTMESSAGES=10000
export TESTMESSAGES2=10001
export TESTMESSAGESFULL=10000
export TESTMESSAGES=50000
export TESTMESSAGES2=50001
export TESTMESSAGESFULL=100000
# Generate random topic name
export RANDTOPIC=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
echo ===============================================================================
echo \[sndrcv_kafka_fail.sh\]: Create kafka/zookeeper instance and static topic
echo \[sndrcv_kafka_fail.sh\]: Create kafka/zookeeper instance and $RANDTOPIC topic
. $srcdir/diag.sh download-kafka
. $srcdir/diag.sh stop-zookeeper
. $srcdir/diag.sh stop-kafka
. $srcdir/diag.sh start-zookeeper
. $srcdir/diag.sh start-kafka
. $srcdir/diag.sh create-kafka-topic 'static' '.dep_wrk' '22181'
. $srcdir/diag.sh create-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
echo \[sndrcv_kafka_fail.sh\]: Give Kafka some time to process topic create ...
sleep 5
@ -24,18 +27,24 @@ echo \[sndrcv_kafka_fail.sh\]: Init Testbench
echo \[sndrcv_kafka_fail.sh\]: Stopping kafka cluster instance
. $srcdir/diag.sh stop-kafka
# --- Create omkafka receiver config
# --- Create imkafka receiver config
export RSYSLOG_DEBUGLOG="log"
generate_conf
add_conf '
main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
module(load="../plugins/imkafka/.libs/imkafka")
/* Polls messages from kafka server!*/
input( type="imkafka"
topic="static"
topic="'$RANDTOPIC'"
broker="localhost:29092"
consumergroup="default"
confParam=[ "compression.codec=none",
"session.timeout.ms=10000",
"socket.timeout.ms=5000",
"socket.keepalive.enable=true"]
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"enable.partition.eof=false" ]
)
template(name="outfmt" type="string" string="%msg:F,58:2%\n")
@ -45,15 +54,15 @@ if ($msg contains "msgnum:") then {
}
'
echo \[sndrcv_kafka_fail.sh\]: Starting receiver instance [omkafka]
export RSYSLOG_DEBUGLOG="log"
echo \[sndrcv_kafka_fail.sh\]: Starting receiver instance [imkafka]
startup
# ---
# --- Create omkafka sender config
export RSYSLOG_DEBUGLOG="log2"
generate_conf 2
add_conf '
main_queue(queue.timeoutactioncompletion="5000" queue.timeoutshutdown="60000")
main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
module(load="../plugins/omkafka/.libs/omkafka")
module(load="../plugins/imtcp/.libs/imtcp")
@ -63,30 +72,29 @@ template(name="outfmt" type="string" string="%msg%\n")
local4.* action( name="kafka-fwd"
type="omkafka"
topic="static"
topic="'$RANDTOPIC'"
broker="localhost:29092"
template="outfmt"
confParam=[ "compression.codec=none",
"socket.timeout.ms=5000",
"socket.timeout.ms=10000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"queue.buffering.max.messages=20000",
"enable.auto.commit=true",
"message.send.max.retries=1"]
topicConfParam=["message.timeout.ms=5000"]
topicConfParam=["message.timeout.ms=10000"]
partitions.auto="on"
closeTimeout="30000"
closeTimeout="60000"
resubmitOnFailure="on"
keepFailedMessages="on"
failedMsgFile="omkafka-failed.data"
action.resumeInterval="2"
action.resumeRetryCount="-1"
action.resumeRetryCount="2"
queue.saveonshutdown="on"
)
' 2
echo \[sndrcv_kafka_fail.sh\]: Starting sender instance [imkafka]
export RSYSLOG_DEBUGLOG="log2"
echo \[sndrcv_kafka_fail.sh\]: Starting sender instance [omkafka]
startup 2
# ---
@ -109,18 +117,12 @@ echo \[sndrcv_kafka_fail.sh\]: Stopping sender instance [imkafka]
shutdown_when_empty 2
wait_shutdown 2
#echo \[sndrcv_kafka_fail.sh\]: Sleep to give rsyslog receiver time to receive data ...
#sleep 20
echo \[sndrcv_kafka_fail.sh\]: Stopping receiver instance [omkafka]
shutdown_when_empty
wait_shutdown
# Do the final sequence check
seq_check2 1 $TESTMESSAGESFULL -d
echo \[sndrcv_kafka_fail.sh\]: delete kafka topics
. $srcdir/diag.sh delete-kafka-topic 'static' '.dep_wrk' '22181'
. $srcdir/diag.sh delete-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
echo \[sndrcv_kafka_fail.sh\]: stop kafka instance
. $srcdir/diag.sh stop-kafka
@ -128,5 +130,8 @@ echo \[sndrcv_kafka_fail.sh\]: stop kafka instance
# STOP ZOOKEEPER in any case
. $srcdir/diag.sh stop-zookeeper
# Do the final sequence check
seq_check2 1 $TESTMESSAGESFULL -d
echo success
exit_test

View File

@ -2,8 +2,11 @@
# added 2017-06-06 by alorbach
# This tests the keepFailedMessages feature in omkafka
# This file is part of the rsyslog project, released under ASL 2.0
export TESTMESSAGES=1000
export TESTMESSAGESFULL=1000
export TESTMESSAGES=50000
export TESTMESSAGESFULL=50000
# Generate random topic name
export RANDTOPIC=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
echo ===============================================================================
echo \[sndrcv_kafka_failresume.sh\]: Create kafka/zookeeper instance and static topic
@ -12,7 +15,7 @@ echo \[sndrcv_kafka_failresume.sh\]: Create kafka/zookeeper instance and static
. $srcdir/diag.sh stop-kafka
. $srcdir/diag.sh start-zookeeper
. $srcdir/diag.sh start-kafka
. $srcdir/diag.sh create-kafka-topic 'static' '.dep_wrk' '22181'
. $srcdir/diag.sh create-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
echo \[sndrcv_kafka_failresume.sh\]: Give Kafka some time to process topic create ...
sleep 5
@ -21,17 +24,21 @@ echo \[sndrcv_kafka_failresume.sh\]: Init Testbench
. $srcdir/diag.sh init
# --- Create omkafka receiver config
export RSYSLOG_DEBUGLOG="log"
generate_conf
add_conf '
module(load="../plugins/imkafka/.libs/imkafka")
/* Polls messages from kafka server!*/
input( type="imkafka"
topic="static"
topic="'$RANDTOPIC'"
broker="localhost:29092"
consumergroup="default"
confParam=[ "compression.codec=none",
"session.timeout.ms=10000",
"socket.timeout.ms=5000",
"socket.keepalive.enable=true"]
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"enable.partition.eof=false" ]
)
template(name="outfmt" type="string" string="%msg:F,58:2%\n")
@ -40,9 +47,13 @@ if ($msg contains "msgnum:") then {
action( type="omfile" file=`echo $RSYSLOG_OUT_LOG` template="outfmt" )
}
'
echo \[sndrcv_kafka_failresume.sh\]: Starting receiver instance [omkafka]
startup
# ---
# --- Create omkafka sender config
export RSYSLOG_DEBUGLOG="log2"
generate_conf 2
add_conf '
main_queue(queue.timeoutactioncompletion="10000" queue.timeoutshutdown="60000")
@ -55,7 +66,7 @@ template(name="outfmt" type="string" string="%msg%\n")
action( name="kafka-fwd"
type="omkafka"
topic="static"
topic="'$RANDTOPIC'"
broker="localhost:29092"
template="outfmt"
confParam=[ "compression.codec=none",
@ -74,15 +85,11 @@ action( name="kafka-fwd"
queue.saveonshutdown="on"
)
' 2
# ---
echo \[sndrcv_kafka_failresume.sh\]: Starting sender instance [imkafka]
export RSYSLOG_DEBUGLOG="log"
startup 2
echo \[sndrcv_kafka_failresume.sh\]: Starting receiver instance [omkafka]
export RSYSLOG_DEBUGLOG="log2"
startup
# ---
>>>>>>> testbench: Added new kafka tests using kafkacat for better debugging.
echo \[sndrcv_kafka_failresume.sh\]: Inject messages into rsyslog sender instance
tcpflood -m$TESTMESSAGES -i1
@ -111,9 +118,6 @@ echo \[sndrcv_kafka_failresume.sh\]: Stopping sender instance [imkafka]
shutdown_when_empty 2
wait_shutdown 2
echo \[sndrcv_kafka_failresume.sh\]: Sleep to give rsyslog receiver time to receive data ...
sleep 20
echo \[sndrcv_kafka_failresume.sh\]: Stopping receiver instance [omkafka]
shutdown_when_empty
wait_shutdown
@ -121,14 +125,14 @@ wait_shutdown
echo \[sndrcv_kafka_failresume.sh\]: delete kafka topics
. $srcdir/diag.sh delete-kafka-topic 'static' '.dep_wrk' '22181'
# Do the final sequence check
seq_check 1 $TESTMESSAGESFULL
echo \[sndrcv_kafka_failresume.sh\]: stop kafka instance
. $srcdir/diag.sh stop-kafka
# STOP ZOOKEEPER in any case
. $srcdir/diag.sh stop-zookeeper
# Do the final sequence check
seq_check 1 $TESTMESSAGESFULL
echo success
exit_test

View File

@ -1,8 +1,11 @@
#!/bin/bash
# added 2017-05-08 by alorbach
# This file is part of the rsyslog project, released under ASL 2.0
export TESTMESSAGES=1000
export TESTMESSAGESFULL=1000
export TESTMESSAGES=100000
export TESTMESSAGESFULL=100000
# Generate random topic name
export RANDTOPIC=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
echo ===============================================================================
echo \[sndrcv_kafka_multi.sh\]: Create multiple kafka/zookeeper instances and static topic
@ -19,37 +22,20 @@ echo \[sndrcv_kafka_multi.sh\]: Create multiple kafka/zookeeper instances and st
. $srcdir/diag.sh start-kafka '.dep_wrk1'
. $srcdir/diag.sh start-kafka '.dep_wrk2'
. $srcdir/diag.sh start-kafka '.dep_wrk3'
. $srcdir/diag.sh create-kafka-topic 'static' '.dep_wrk1' '22181'
. $srcdir/diag.sh create-kafka-topic $RANDTOPIC '.dep_wrk1' '22181'
echo \[sndrcv_kafka_multi.sh\]: Give Kafka some time to process topic create ...
sleep 5
echo \[sndrcv_kafka_multi.sh\]: Init Testbench
. $srcdir/diag.sh init
# --- Create omkafka receiver config
# --- Create omkafka sender config
export RSYSLOG_DEBUGLOG="log"
generate_conf
add_conf '
module(load="../plugins/imkafka/.libs/imkafka")
/* Polls messages from kafka server!*/
input( type="imkafka"
topic="static"
broker=["localhost:29092", "localhost:29093", "localhost:29094"]
# broker="localhost:29092"
consumergroup="default"
confParam=[ "compression.codec=none",
"socket.timeout.ms=5000",
"enable.partition.eof=false",
"socket.keepalive.enable=true"]
)
main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
template(name="outfmt" type="string" string="%msg:F,58:2%\n")
if ($msg contains "msgnum:") then {
action( type="omfile" file=`echo $RSYSLOG_OUT_LOG` template="outfmt" )
}
'
# ---
# --- Create omkafka sender config
generate_conf 2
add_conf '
module(load="../plugins/omkafka/.libs/omkafka")
module(load="../plugins/imtcp/.libs/imtcp")
input(type="imtcp" port="'$TCPFLOOD_PORT'" Ruleset="omkafka") /* this port for tcpflood! */
@ -60,50 +46,63 @@ ruleset(name="omkafka") {
action( type="omkafka"
name="kafka-fwd"
broker=["localhost:29092", "localhost:29093", "localhost:29094"]
topic="static"
topic="'$RANDTOPIC'"
template="outfmt"
confParam=[ "compression.codec=none",
"socket.timeout.ms=5000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"queue.buffering.max.messages=20000",
"queue.buffering.max.messages=10000",
"message.send.max.retries=1"]
partitions.auto="on"
partitions.scheme="random"
closeTimeout="60000"
queue.size="1000000"
queue.type="LinkedList"
action.repeatedmsgcontainsoriginalmsg="off"
action.resumeRetryCount="-1"
action.resumeInterval="1"
action.resumeRetryCount="2"
action.reportSuspension="on"
action.reportSuspensionContinuation="on" )
}
ruleset(name="omkafka1") {
action(name="kafka-fwd" type="omkafka" topic="static" broker="localhost:29092" template="outfmt" partitions.auto="on")
}
ruleset(name="omkafka2") {
action(name="kafka-fwd" type="omkafka" topic="static" broker="localhost:29093" template="outfmt" partitions.auto="on")
}
ruleset(name="omkafka3") {
action(name="kafka-fwd" type="omkafka" topic="static" broker="localhost:29094" template="outfmt" partitions.auto="on")
}
' 2
# ---
'
echo \[sndrcv_kafka_multi.sh\]: Starting sender instance [omkafka]
export RSYSLOG_DEBUGLOG="log"
startup
# now inject the messages into instance 2. It will connect to instance 1, and that instance will record the data.
tcpflood -m$TESTMESSAGES -i1
echo \[sndrcv_kafka_multi.sh\]: Starting receiver instance [imkafka]
# --- Create omkafka receiver config
export RSYSLOG_DEBUGLOG="log2"
startup 2
generate_conf 2
add_conf '
main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
#echo \[sndrcv_kafka_multi.sh\]: Sleep to give rsyslog instances time to process data ...
#sleep 20
module(load="../plugins/imkafka/.libs/imkafka")
/* Polls messages from kafka server!*/
input( type="imkafka"
topic="'$RANDTOPIC'"
broker=["localhost:29092", "localhost:29093", "localhost:29094"]
consumergroup="default"
confParam=[ "compression.codec=none",
"session.timeout.ms=10000",
"socket.timeout.ms=5000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"enable.partition.eof=false" ]
)
template(name="outfmt" type="string" string="%msg:F,58:2%\n")
if ($msg contains "msgnum:") then {
action( type="omfile" file=`echo '$RSYSLOG_OUT_LOG'` template="outfmt" )
}
' 2
echo \[sndrcv_kafka_multi.sh\]: Starting receiver instance [imkafka]
startup 2
echo \[sndrcv_kafka_multi.sh\]: Stopping sender instance [omkafka]
shutdown_when_empty
@ -114,10 +113,7 @@ shutdown_when_empty 2
wait_shutdown 2
echo \[sndrcv_kafka_multi.sh\]: delete kafka topics
. $srcdir/diag.sh delete-kafka-topic 'static' '.dep_wrk1' '22181'
# Do the final sequence check
seq_check 1 $TESTMESSAGESFULL -d
. $srcdir/diag.sh delete-kafka-topic $RANDTOPIC '.dep_wrk1' '22181'
echo \[sndrcv_kafka.sh\]: stop kafka instances
. $srcdir/diag.sh stop-kafka '.dep_wrk1'
@ -127,5 +123,8 @@ echo \[sndrcv_kafka.sh\]: stop kafka instances
. $srcdir/diag.sh stop-zookeeper '.dep_wrk2'
. $srcdir/diag.sh stop-zookeeper '.dep_wrk3'
# Do the final sequence check
seq_check 1 $TESTMESSAGESFULL -d
echo success
exit_test

View File

@ -1,8 +1,13 @@
#!/bin/bash
# added 2018-08-13 by alorbach
# This file is part of the rsyslog project, released under ASL 2.0
export TESTMESSAGES=5000
export TESTMESSAGESFULL=10000
export TESTMESSAGES=50000
export TESTMESSAGESFULL=100000
# Generate random topic name
export RANDTOPIC1=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
export RANDTOPIC2=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
# enable the EXTRA_EXITCHECK only if really needed - otherwise spams the test log
# too much
#export EXTRA_EXITCHECK=dumpkafkalogs
@ -13,8 +18,8 @@ echo \[sndrcv_kafka_multi_topics.sh\]: Create kafka/zookeeper instance and stati
. $srcdir/diag.sh stop-kafka
. $srcdir/diag.sh start-zookeeper
. $srcdir/diag.sh start-kafka
. $srcdir/diag.sh create-kafka-topic 'static1' '.dep_wrk' '22181'
. $srcdir/diag.sh create-kafka-topic 'static2' '.dep_wrk' '22181'
. $srcdir/diag.sh create-kafka-topic $RANDTOPIC1 '.dep_wrk' '22181'
. $srcdir/diag.sh create-kafka-topic $RANDTOPIC2 '.dep_wrk' '22181'
echo \[sndrcv_kafka_multi_topics.sh\]: Give Kafka some time to process topic create ...
sleep 5
@ -23,88 +28,95 @@ echo \[sndrcv_kafka_fail.sh\]: Init Testbench
. $srcdir/diag.sh init
# --- Create omkafka sender config
export RSYSLOG_DEBUGLOG="log"
generate_conf
add_conf '
main_queue(queue.timeoutactioncompletion="10000" queue.timeoutshutdown="60000")
main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
module(load="../plugins/omkafka/.libs/omkafka")
module(load="../plugins/imtcp/.libs/imtcp")
input(type="imtcp" port="13514") /* this port for tcpflood! */
input(type="imtcp" port="'$TCPFLOOD_PORT'") /* this port for tcpflood! */
template(name="outfmt" type="string" string="%msg%\n")
local4.* action( name="kafka-fwd"
type="omkafka"
topic="static1"
topic="'$RANDTOPIC1'"
broker="localhost:29092"
template="outfmt"
confParam=[ "compression.codec=none",
"socket.timeout.ms=10000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"queue.buffering.max.messages=20000",
"queue.buffering.max.messages=10000",
"enable.auto.commit=true",
"message.send.max.retries=1"]
topicConfParam=["message.timeout.ms=10000"]
partitions.auto="on"
closeTimeout="30000"
closeTimeout="60000"
resubmitOnFailure="on"
keepFailedMessages="on"
failedMsgFile="omkafka-failed1.data"
action.resumeInterval="2"
action.resumeRetryCount="-1"
action.resumeInterval="1"
action.resumeRetryCount="2"
queue.saveonshutdown="on"
)
local4.* action( name="kafka-fwd"
type="omkafka"
topic="static2"
topic="'$RANDTOPIC2'"
broker="localhost:29092"
template="outfmt"
confParam=[ "compression.codec=none",
"socket.timeout.ms=10000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"queue.buffering.max.messages=20000",
"queue.buffering.max.messages=10000",
"enable.auto.commit=true",
"message.send.max.retries=1"]
topicConfParam=["message.timeout.ms=10000"]
partitions.auto="on"
closeTimeout="30000"
closeTimeout="60000"
resubmitOnFailure="on"
keepFailedMessages="on"
failedMsgFile="omkafka-failed2.data"
action.resumeInterval="2"
action.resumeRetryCount="-1"
action.resumeInterval="1"
action.resumeRetryCount="2"
queue.saveonshutdown="on"
)
'
echo \[sndrcv_kafka_multi_topics.sh\]: Starting sender instance [omkafka]
export RSYSLOG_DEBUGLOG="log"
startup
# ---
echo \[sndrcv_kafka.sh\]: Inject messages into rsyslog sender instance
tcpflood -m$TESTMESSAGES -i1
# --- Create omkafka sender config
# --- Create omkafka receiver config
export RSYSLOG_DEBUGLOG="log2"
generate_conf 2
add_conf '
main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
module(load="../plugins/imkafka/.libs/imkafka")
/* Polls messages from kafka server!*/
input( type="imkafka"
topic="static1"
topic="'$RANDTOPIC1'"
broker="localhost:29092"
consumergroup="default1"
confParam=[ "compression.codec=none",
"session.timeout.ms=10000",
"socket.timeout.ms=10000",
"enable.partition.eof=false",
"reconnect.backoff.jitter.ms=1000",
"socket.keepalive.enable=true"]
)
input( type="imkafka"
topic="static2"
topic="'$RANDTOPIC2'"
broker="localhost:29092"
consumergroup="default2"
confParam=[ "compression.codec=none",
"session.timeout.ms=10000",
"socket.timeout.ms=10000",
"enable.partition.eof=false",
"reconnect.backoff.jitter.ms=1000",
@ -119,31 +131,20 @@ if ($msg contains "msgnum:") then {
' 2
echo \[sndrcv_kafka_multi_topics.sh\]: Starting receiver instance [imkafka]
export RSYSLOG_DEBUGLOG="log2"
startup 2
# ---
echo \[sndrcv_kafka.sh\]: Inject messages into rsyslog sender instance
tcpflood -m$TESTMESSAGES -i1
#echo \[sndrcv_kafka.sh\]: Sleep to give rsyslog instances time to process data ...
sleep 5
echo \[sndrcv_kafka.sh\]: Stopping sender instance [omkafka]
shutdown_when_empty 2
wait_shutdown 2
echo \[sndrcv_kafka.sh\]: Stopping receiver instance [imkafka]
shutdown_when_empty
wait_shutdown
# Do the final sequence check
seq_check 1 $TESTMESSAGES -d
content_check_with_count "000" $TESTMESSAGESFULL
echo \[sndrcv_kafka.sh\]: Stopping receiver instance [imkafka]
shutdown_when_empty 2
wait_shutdown 2
echo \[sndrcv_kafka.sh\]: delete kafka topics
. $srcdir/diag.sh delete-kafka-topic 'static1' '.dep_wrk' '22181'
. $srcdir/diag.sh delete-kafka-topic 'static2' '.dep_wrk' '22181'
. $srcdir/diag.sh delete-kafka-topic $RANDTOPIC1 '.dep_wrk' '22181'
. $srcdir/diag.sh delete-kafka-topic $RANDTOPIC2 '.dep_wrk' '22181'
echo \[sndrcv_kafka.sh\]: stop kafka instance
. $srcdir/diag.sh stop-kafka
@ -151,5 +152,9 @@ echo \[sndrcv_kafka.sh\]: stop kafka instance
# STOP ZOOKEEPER in any case
. $srcdir/diag.sh stop-zookeeper
# Do the final sequence check
seq_check 1 $TESTMESSAGES -d
content_check_with_count "000" $TESTMESSAGESFULL
echo success
exit_test

View File

@ -1,12 +1,12 @@
broker.id=1
listeners=PLAINTEXT://:29092
listeners=plaintext://localhost:29092
auto.create.topics.enable=true
auto.leader.rebalance.enable=true
background.threads=2
compression.type=producer
controlled.shutdown.enable=true
default.replication.factor=3
default.replication.factor=1
delete.topic.enable=true
dual.commit.enabled=false
@ -14,52 +14,54 @@ dual.commit.enabled=false
leader.imbalance.check.interval.seconds=10
leader.imbalance.per.broker.percentage=10
#10 MB is sufficient for testing
log.segment.bytes=10485760
log.cleaner.enable=true
#100 MB is sufficient for testing
log.segment.bytes=104857600
log.cleaner.enable=false
log.cleanup.policy=delete
log.retention.hours=1
log.dirs=kafka-logs
log.flush.interval.messages=1000
log.flush.interval.ms=1000
log.flush.scheduler.interval.ms=1000
log.flush.interval.messages=10000
log.flush.interval.ms=10000
log.flush.scheduler.interval.ms=10000
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.index.size.max.bytes=104857600
log.message.timestamp.type=CreateTime
log.retention.check.interval.ms=300000
log.retention.bytes=104857600
log.retention.hours=168
log.roll.hours=168
message.max.bytes=1000000
num.network.threads=2
num.io.threads=2
num.partitions=100
num.partitions=2
num.recovery.threads.per.data.dir=1
num.replica.fetchers=1
min.insync.replicas=2
min.insync.replicas=1
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.request.max.bytes=10485760
socket.send.buffer.bytes=102400
offsets.storage=kafka
offsets.topic.num.partitions=50
offsets.topic.num.partitions=1
offsets.topic.replication.factor=3
transaction.state.log.num.partitions=1
replica.fetch.max.bytes=1048576
replica.fetch.max.bytes=10485760
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.lag.time.max.ms=10000
replica.socket.receive.buffer.bytes=65536
replica.socket.timeout.ms=5000
unclean.leader.election.enable=false
queued.max.requests=500
# Allow unclean leader election!
unclean.leader.election.enable=true
queued.max.requests=10000
zookeeper.connect=localhost:22181/kafka,localhost:22182/kafka,localhost:22183/kafka
zookeeper.connection.timeout.ms=10000
zookeeper.session.timeout.ms=5000
zookeeper.sync.time.ms=2000
zookeeper.sync.time.ms=5000
group.id="default"

View File

@ -1,12 +1,12 @@
broker.id=2
listeners=PLAINTEXT://:29093
listeners=plaintext://localhost:29093
auto.create.topics.enable=true
auto.leader.rebalance.enable=true
background.threads=2
compression.type=producer
controlled.shutdown.enable=true
default.replication.factor=3
default.replication.factor=1
delete.topic.enable=true
dual.commit.enabled=false
@ -14,52 +14,54 @@ dual.commit.enabled=false
leader.imbalance.check.interval.seconds=10
leader.imbalance.per.broker.percentage=10
#10 MB is sufficient for testing
log.segment.bytes=10485760
log.cleaner.enable=true
#100 MB is sufficient for testing
log.segment.bytes=104857600
log.cleaner.enable=false
log.cleanup.policy=delete
log.retention.hours=1
log.dirs=kafka-logs
log.flush.interval.messages=1000
log.flush.interval.ms=1000
log.flush.scheduler.interval.ms=1000
log.flush.interval.messages=10000
log.flush.interval.ms=10000
log.flush.scheduler.interval.ms=10000
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.index.size.max.bytes=104857600
log.message.timestamp.type=CreateTime
log.retention.check.interval.ms=300000
log.retention.bytes=104857600
log.retention.hours=168
log.roll.hours=168
message.max.bytes=1000000
num.network.threads=2
num.io.threads=2
num.partitions=100
num.partitions=2
num.recovery.threads.per.data.dir=1
num.replica.fetchers=1
min.insync.replicas=2
min.insync.replicas=1
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.request.max.bytes=10485760
socket.send.buffer.bytes=102400
offsets.storage=kafka
offsets.topic.num.partitions=1
offsets.topic.replication.factor=3
transaction.state.log.num.partitions=1
replica.fetch.max.bytes=1048576
replica.fetch.max.bytes=10485760
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.lag.time.max.ms=10000
replica.socket.receive.buffer.bytes=65536
replica.socket.timeout.ms=5000
unclean.leader.election.enable=false
queued.max.requests=500
# Allow unclean leader election!
unclean.leader.election.enable=true
queued.max.requests=10000
zookeeper.connect=localhost:22181/kafka,localhost:22182/kafka,localhost:22183/kafka
zookeeper.connection.timeout.ms=10000
zookeeper.session.timeout.ms=5000
zookeeper.sync.time.ms=2000
zookeeper.sync.time.ms=5000
group.id="default"

View File

@ -1,12 +1,12 @@
broker.id=3
listeners=PLAINTEXT://:29094
listeners=plaintext://localhost:29094
auto.create.topics.enable=true
auto.leader.rebalance.enable=true
background.threads=2
compression.type=producer
controlled.shutdown.enable=true
default.replication.factor=3
default.replication.factor=1
delete.topic.enable=true
dual.commit.enabled=false
@ -14,52 +14,54 @@ dual.commit.enabled=false
leader.imbalance.check.interval.seconds=10
leader.imbalance.per.broker.percentage=10
#10 MB is sufficient for testing
log.segment.bytes=10485760
log.cleaner.enable=true
#100 MB is sufficient for testing
log.segment.bytes=104857600
log.cleaner.enable=false
log.cleanup.policy=delete
log.retention.hours=1
log.dirs=kafka-logs
log.flush.interval.messages=1000
log.flush.interval.ms=1000
log.flush.scheduler.interval.ms=1000
log.flush.interval.messages=10000
log.flush.interval.ms=10000
log.flush.scheduler.interval.ms=10000
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.index.size.max.bytes=104857600
log.message.timestamp.type=CreateTime
log.retention.check.interval.ms=300000
log.retention.bytes=104857600
log.retention.hours=168
log.roll.hours=168
message.max.bytes=1000000
num.network.threads=2
num.io.threads=2
num.partitions=100
num.partitions=2
num.recovery.threads.per.data.dir=1
num.replica.fetchers=1
min.insync.replicas=2
min.insync.replicas=1
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.request.max.bytes=10485760
socket.send.buffer.bytes=102400
offsets.storage=kafka
offsets.topic.num.partitions=1
offsets.topic.replication.factor=3
transaction.state.log.num.partitions=1
replica.fetch.max.bytes=1048576
replica.fetch.max.bytes=10485760
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.lag.time.max.ms=10000
replica.socket.receive.buffer.bytes=65536
replica.socket.timeout.ms=5000
unclean.leader.election.enable=false
queued.max.requests=500
# Allow unclean leader election!
unclean.leader.election.enable=true
queued.max.requests=10000
zookeeper.connect=localhost:22181/kafka,localhost:22182/kafka,localhost:22183/kafka
zookeeper.connection.timeout.ms=10000
zookeeper.session.timeout.ms=5000
zookeeper.sync.time.ms=2000
zookeeper.sync.time.ms=5000
group.id="default"

View File

@ -1,5 +1,5 @@
broker.id=0
listeners=PLAINTEXT://:29092
listeners=plaintext://localhost:29092
auto.create.topics.enable=true
auto.leader.rebalance.enable=true
@ -9,52 +9,53 @@ controlled.shutdown.enable=true
default.replication.factor=1
delete.topic.enable=true
dual.commit.enabled=true
dual.commit.enabled=false
leader.imbalance.check.interval.seconds=10
leader.imbalance.per.broker.percentage=10
#100 MB is sufficient for testing
log.segment.bytes=536870912
log.segment.bytes=104857600
log.cleaner.enable=false
log.cleanup.policy=delete
log.retention.hours=1
log.dirs=kafka-logs
log.flush.interval.messages=1000
log.flush.interval.ms=1000
log.flush.scheduler.interval.ms=1000
log.flush.interval.messages=10000
log.flush.interval.ms=10000
log.flush.scheduler.interval.ms=10000
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.index.size.max.bytes=104857600
log.message.timestamp.type=CreateTime
log.retention.check.interval.ms=300000
log.retention.bytes=104857600
log.retention.hours=168
log.roll.hours=168
message.max.bytes=1000000
num.network.threads=2
num.io.threads=2
num.partitions=1
num.partitions=2
num.recovery.threads.per.data.dir=1
num.replica.fetchers=1
min.insync.replicas=1
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.request.max.bytes=10485760
socket.send.buffer.bytes=102400
offsets.storage=kafka
offsets.topic.num.partitions=1
#offsets.topic.replication.factor=3
transaction.state.log.num.partitions=1
offsets.topic.num.partitions=2
offsets.topic.replication.factor=1
transaction.state.log.num.partitions=2
replica.fetch.max.bytes=1048576
replica.fetch.max.bytes=10485760
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.lag.time.max.ms=10000
replica.socket.receive.buffer.bytes=65536
replica.socket.timeout.ms=5000
unclean.leader.election.enable=false
# Allow unclean leader election!
unclean.leader.election.enable=true
queued.max.requests=10000
zookeeper.connect=localhost:22181/kafka

View File

@ -1,20 +1,5 @@
#--- Do we need this for the test?
#server.1=localhost:2889:3888
#server.2=localhost:3889:4888
#server.3=localhost:4889:5888
#---
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
tickTime=1000
initLimit=5
syncLimit=2
dataDir=zk_data_dir
# the port at which the clients will connect
clientPort=22181

View File

@ -1,20 +1,5 @@
#--- Do we need this for the test?
#server.1=localhost:2889:3888
#server.2=localhost:3889:4888
#server.3=localhost:4889:5888
#---
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
tickTime=1000
initLimit=5
syncLimit=2
dataDir=zk_data_dir
# the port at which the clients will connect
clientPort=22182

View File

@ -1,20 +1,5 @@
#--- Do we need this for the test?
#server.1=localhost:2889:3888
#server.2=localhost:3889:4888
#server.3=localhost:4889:5888
#---
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
tickTime=1000
initLimit=5
syncLimit=2
dataDir=zk_data_dir
# the port at which the clients will connect
clientPort=22183

View File

@ -53,6 +53,7 @@ fi
# As travis has no xenial images, we always need to install librdkafka from source
if [ "x$KAFKA" == "xYES" ]; then
sudo apt-get install -qq liblz4-dev
# For kafka testbench, "kafkacat" package is needed!
git clone https://github.com/edenhill/librdkafka > /dev/null
(unset CFLAGS; cd librdkafka ; ./configure --prefix=/usr --CFLAGS="-g" > /dev/null ; make -j2 > /dev/null ; sudo make install > /dev/null)
rm -rf librdkafka # get rid of source, e.g. for line length check