rsyslog/tests/omkafka.sh
Andre Lorbach 847d4223ef testbench: Incremented input timeout to 60000 because kafka tests may fail on load otherwise.
Also fixed minor configuration and syntax issues.
Moved test init in kafka tests as far as possible to the top.
2018-09-07 11:52:57 +02:00

100 lines
2.8 KiB
Bash
Executable File

#!/bin/bash
# added 2017-05-03 by alorbach
# This file is part of the rsyslog project, released under ASL 2.0
export TESTMESSAGES=100000
export TESTMESSAGESFULL=$TESTMESSAGES
# Generate random topic name
export RANDTOPIC=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
# enable the EXTRA_EXITCHECK only if really needed - otherwise spams the test log
# too much
#export EXTRA_EXITCHECK=dumpkafkalogs
echo ===============================================================================
echo Check and Stop previous instances of kafka/zookeeper
. $srcdir/diag.sh download-kafka
. $srcdir/diag.sh stop-zookeeper
. $srcdir/diag.sh stop-kafka
echo Init Testbench
. $srcdir/diag.sh init
# Check for kafkacat
check_command_available kafkacat
echo Create kafka/zookeeper instance and $RANDTOPIC topic
. $srcdir/diag.sh start-zookeeper
. $srcdir/diag.sh start-kafka
# create new topic
. $srcdir/diag.sh create-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
echo Give Kafka some time to process topic create ...
sleep 5
# --- Create/Start omkafka sender config
export RSYSLOG_DEBUGLOG="log"
generate_conf
add_conf '
main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
module(load="../plugins/omkafka/.libs/omkafka")
module(load="../plugins/imtcp/.libs/imtcp")
input(type="imtcp" port="'$TCPFLOOD_PORT'") /* this port for tcpflood! */
template(name="outfmt" type="string" string="%msg:F,58:2%\n")
local4.* action( name="kafka-fwd"
type="omkafka"
topic="'$RANDTOPIC'"
broker="localhost:29092"
template="outfmt"
confParam=[ "compression.codec=none",
"socket.timeout.ms=10000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"queue.buffering.max.messages=10000",
"enable.auto.commit=true",
"message.send.max.retries=1"]
topicConfParam=["message.timeout.ms=10000"]
partitions.auto="on"
closeTimeout="60000"
resubmitOnFailure="on"
keepFailedMessages="on"
failedMsgFile="'$RSYSLOG_OUT_LOG'-failed-'$RANDTOPIC'.data"
action.resumeInterval="1"
action.resumeRetryCount="2"
queue.saveonshutdown="on"
)
'
echo Starting sender instance [omkafka]
startup
# ---
echo Inject messages into rsyslog sender instance
tcpflood -m$TESTMESSAGES -i1
echo Stopping sender instance [omkafka]
shutdown_when_empty
wait_shutdown
kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s'> $RSYSLOG_OUT_LOG
kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p@%o:%k:%s' > $RSYSLOG_OUT_LOG.extra
# Delete topic to remove old traces before
# . $srcdir/diag.sh delete-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
# Dump Kafka log | uncomment if needed
# . $srcdir/diag.sh dump-kafka-serverlog
echo stop kafka instance
. $srcdir/diag.sh stop-kafka
# STOP ZOOKEEPER in any case
. $srcdir/diag.sh stop-zookeeper
# Do the final sequence check
seq_check 1 $TESTMESSAGESFULL -d
echo success
exit_test