CI: many kafkatests were not executed b/c kafkacat was renamed

kafkacat has been renamed to kcat, as it looks for trademark issues.
Kafka module tests depend on that utility and are skipped if command
is not available. This was now always the case for newer development
containers.

Name was now adjusted in all tests.
This commit is contained in:
Rainer Gerhards 2025-09-04 17:18:42 +02:00
parent fe55b31521
commit 697f7d8378
No known key found for this signature in database
GPG Key ID: 0CB6B2A8BE80B499
18 changed files with 56 additions and 56 deletions

View File

@ -445,11 +445,11 @@ kafka_check_broken_broker() {
fi fi
} }
# inject messages via kafkacat tool (for imkafka tests) # inject messages via kcat tool (for imkafka tests)
# $1 == "--wait" means wait for rsyslog to receive TESTMESSAGES lines in RSYSLOG_OUT_LOG # $1 == "--wait" means wait for rsyslog to receive TESTMESSAGES lines in RSYSLOG_OUT_LOG
# $TESTMESSAGES contains number of messages to inject # $TESTMESSAGES contains number of messages to inject
# $RANDTOPIC contains topic to produce to # $RANDTOPIC contains topic to produce to
injectmsg_kafkacat() { injectmsg_kcat() {
if [ "$1" == "--wait" ]; then if [ "$1" == "--wait" ]; then
wait="YES" wait="YES"
shift shift
@ -458,7 +458,7 @@ injectmsg_kafkacat() {
printf 'TESTBENCH ERROR: TESTMESSAGES env var not set!\n' printf 'TESTBENCH ERROR: TESTMESSAGES env var not set!\n'
error_exit 1 error_exit 1
fi fi
MAXATONCE=25000 # how many msgs should kafkacat send? - hint: current version errs out above ~70000 MAXATONCE=25000 # how many msgs should kcat send? - hint: current version errs out above ~70000
i=1 i=1
while (( i<=TESTMESSAGES )); do while (( i<=TESTMESSAGES )); do
currmsgs=0 currmsgs=0
@ -466,14 +466,14 @@ injectmsg_kafkacat() {
printf ' msgnum:%8.8d\n' $i; printf ' msgnum:%8.8d\n' $i;
i=$((i + 1)) i=$((i + 1))
currmsgs=$((currmsgs+1)) currmsgs=$((currmsgs+1))
done > "$RSYSLOG_DYNNAME.kafkacat.in" done > "$RSYSLOG_DYNNAME.kcat.in"
set -e set -e
kafkacat -P -b localhost:29092 -t $RANDTOPIC <"$RSYSLOG_DYNNAME.kafkacat.in" 2>&1 | tee >$RSYSLOG_DYNNAME.kafkacat.log kcat -P -b localhost:29092 -t $RANDTOPIC <"$RSYSLOG_DYNNAME.kcat.in" 2>&1 | tee >$RSYSLOG_DYNNAME.kcat.log
set +e set +e
printf 'kafkacat injected %d msgs so far\n' $((i - 1)) printf 'kcat injected %d msgs so far\n' $((i - 1))
kafka_check_broken_broker $RSYSLOG_DYNNAME.kafkacat.log kafka_check_broken_broker $RSYSLOG_DYNNAME.kcat.log
check_not_present "ERROR" $RSYSLOG_DYNNAME.kafkacat.log check_not_present "ERROR" $RSYSLOG_DYNNAME.kcat.log
cat $RSYSLOG_DYNNAME.kafkacat.log cat $RSYSLOG_DYNNAME.kcat.log
done done
if [ "$wait" == "YES" ]; then if [ "$wait" == "YES" ]; then

View File

@ -2,7 +2,7 @@
# added 2018-10-24 by rgerhards # added 2018-10-24 by rgerhards
# This file is part of the rsyslog project, released under ASL 2.0 # This file is part of the rsyslog project, released under ASL 2.0
. ${srcdir:=.}/diag.sh init . ${srcdir:=.}/diag.sh init
check_command_available kafkacat check_command_available kcat
export KEEP_KAFKA_RUNNING="YES" export KEEP_KAFKA_RUNNING="YES"
export TESTMESSAGES=100000 export TESTMESSAGES=100000
@ -50,7 +50,7 @@ if ($msg contains "msgnum:") then {
export RSTB_DAEMONIZE="YES" export RSTB_DAEMONIZE="YES"
startup startup
injectmsg_kafkacat --wait 1 $TESTMESSAGES -d injectmsg_kcat --wait 1 $TESTMESSAGES -d
shutdown_when_empty shutdown_when_empty
wait_shutdown wait_shutdown

View File

@ -2,7 +2,7 @@
# added 2018-08-29 by alorbach # added 2018-08-29 by alorbach
# This file is part of the rsyslog project, released under ASL 2.0 # This file is part of the rsyslog project, released under ASL 2.0
. ${srcdir:=.}/diag.sh init . ${srcdir:=.}/diag.sh init
check_command_available kafkacat check_command_available kcat
export KEEP_KAFKA_RUNNING="YES" export KEEP_KAFKA_RUNNING="YES"
export TESTMESSAGES=1000 export TESTMESSAGES=1000
@ -46,7 +46,7 @@ startup
# We inject messages, even though we know this will not work. The reason # We inject messages, even though we know this will not work. The reason
# is that we want to ensure we do not get a segfault in such an error case # is that we want to ensure we do not get a segfault in such an error case
injectmsg_kafkacat injectmsg_kcat
shutdown_when_empty shutdown_when_empty
wait_shutdown wait_shutdown

View File

@ -2,7 +2,7 @@
# added 2018-10-26 by rgerhards # added 2018-10-26 by rgerhards
# This file is part of the rsyslog project, released under ASL 2.0 # This file is part of the rsyslog project, released under ASL 2.0
. ${srcdir:=.}/diag.sh init . ${srcdir:=.}/diag.sh init
check_command_available kafkacat check_command_available kcat
export KEEP_KAFKA_RUNNING="YES" export KEEP_KAFKA_RUNNING="YES"
export TESTMESSAGES=100000 export TESTMESSAGES=100000
@ -44,7 +44,7 @@ if ($msg contains "msgnum:") then {
' '
startup startup
injectmsg_kafkacat --wait 1 $TESTMESSAGES -d injectmsg_kcat --wait 1 $TESTMESSAGES -d
shutdown_when_empty shutdown_when_empty
wait_shutdown wait_shutdown

View File

@ -2,7 +2,7 @@
# added 2018-08-29 by alorbach # added 2018-08-29 by alorbach
# This file is part of the rsyslog project, released under ASL 2.0 # This file is part of the rsyslog project, released under ASL 2.0
. ${srcdir:=.}/diag.sh init . ${srcdir:=.}/diag.sh init
check_command_available kafkacat check_command_available kcat
export KEEP_KAFKA_RUNNING="YES" export KEEP_KAFKA_RUNNING="YES"
export TESTMESSAGES=100000 export TESTMESSAGES=100000
@ -46,7 +46,7 @@ if ($msg contains "msgnum:") then {
} }
' '
startup startup
injectmsg_kafkacat --wait 1 $TESTMESSAGESFULL -d injectmsg_kcat --wait 1 $TESTMESSAGESFULL -d
shutdown_when_empty shutdown_when_empty
wait_shutdown wait_shutdown

View File

@ -2,7 +2,7 @@
# added 2018-08-29 by alorbach # added 2018-08-29 by alorbach
# This file is part of the rsyslog project, released under ASL 2.0 # This file is part of the rsyslog project, released under ASL 2.0
. ${srcdir:=.}/diag.sh init . ${srcdir:=.}/diag.sh init
check_command_available kafkacat check_command_available kcat
export KEEP_KAFKA_RUNNING="YES" export KEEP_KAFKA_RUNNING="YES"
# False positive codefactor.io # False positive codefactor.io
export RSYSLOG_OUT_LOG_1="${RSYSLOG_OUT_LOG:-default}.1" export RSYSLOG_OUT_LOG_1="${RSYSLOG_OUT_LOG:-default}.1"
@ -86,7 +86,7 @@ startup 2
TIMESTART=$(date +%s.%N) TIMESTART=$(date +%s.%N)
injectmsg_kafkacat injectmsg_kcat
# special case: number of test messages differs from file output # special case: number of test messages differs from file output
wait_file_lines $RSYSLOG_OUT_LOG $((TESTMESSAGES)) ${RETRIES:-200} wait_file_lines $RSYSLOG_OUT_LOG $((TESTMESSAGES)) ${RETRIES:-200}
# Check that at least 25% messages are in both logfiles, otherwise load balancing hasn't worked # Check that at least 25% messages are in both logfiles, otherwise load balancing hasn't worked

View File

@ -3,7 +3,7 @@
# This file is part of the rsyslog project, released under ASL 2.0 # This file is part of the rsyslog project, released under ASL 2.0
echo Init Testbench echo Init Testbench
. ${srcdir:=.}/diag.sh init . ${srcdir:=.}/diag.sh init
check_command_available kafkacat check_command_available kcat
# *** ============================================================================== # *** ==============================================================================
export TESTMESSAGES=100000 export TESTMESSAGES=100000
@ -115,7 +115,7 @@ do
done done
echo Inject messages into kafka echo Inject messages into kafka
kafkacat <$RSYSLOG_OUT_LOG.in -P -b localhost:29092 -t $RANDTOPIC kcat <$RSYSLOG_OUT_LOG.in -P -b localhost:29092 -t $RANDTOPIC
# --- # ---
echo Give imkafka some time to start... echo Give imkafka some time to start...

View File

@ -2,7 +2,7 @@
# added 2018-08-29 by alorbach # added 2018-08-29 by alorbach
# This file is part of the rsyslog project, released under ASL 2.0 # This file is part of the rsyslog project, released under ASL 2.0
. ${srcdir:=.}/diag.sh init . ${srcdir:=.}/diag.sh init
check_command_available kafkacat check_command_available kcat
export KEEP_KAFKA_RUNNING="YES" export KEEP_KAFKA_RUNNING="YES"
export TESTMESSAGES=100000 export TESTMESSAGES=100000
@ -134,7 +134,7 @@ startup
TIMESTART=$(date +%s.%N) TIMESTART=$(date +%s.%N)
injectmsg_kafkacat injectmsg_kcat
# special case: number of test messages differs from file output # special case: number of test messages differs from file output
wait_file_lines $RSYSLOG_OUT_LOG $((TESTMESSAGES * 8)) ${RETRIES:-200} wait_file_lines $RSYSLOG_OUT_LOG $((TESTMESSAGES * 8)) ${RETRIES:-200}
shutdown_when_empty shutdown_when_empty

View File

@ -2,7 +2,7 @@
# added 2018-10-26 by Rainer Gerhards # added 2018-10-26 by Rainer Gerhards
# This file is part of the rsyslog project, released under ASL 2.0 # This file is part of the rsyslog project, released under ASL 2.0
. ${srcdir:=.}/diag.sh init . ${srcdir:=.}/diag.sh init
check_command_available kafkacat check_command_available kcat
export KEEP_KAFKA_RUNNING="YES" export KEEP_KAFKA_RUNNING="YES"
export TESTMESSAGES=100000 export TESTMESSAGES=100000
@ -20,19 +20,19 @@ start_zookeeper
start_kafka start_kafka
create_kafka_topic $RANDTOPIC '.dep_wrk' '22181' create_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
printf 'injecting messages via kafkacat\n' printf 'injecting messages via kcat\n'
injectmsg_kafkacat injectmsg_kcat
# experimental: wait until kafkacat receives everything # experimental: wait until kcat receives everything
timeoutend=10 timeoutend=10
timecounter=0 timecounter=0
printf 'receiving messages via kafkacat\n' printf 'receiving messages via kcat\n'
while [ $timecounter -lt $timeoutend ]; do while [ $timecounter -lt $timeoutend ]; do
(( timecounter++ )) (( timecounter++ ))
kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s\n' > $RSYSLOG_OUT_LOG kcat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s\n' > $RSYSLOG_OUT_LOG
count=$(wc -l < ${RSYSLOG_OUT_LOG}) count=$(wc -l < ${RSYSLOG_OUT_LOG})
if [ $count -eq $TESTMESSAGES ]; then if [ $count -eq $TESTMESSAGES ]; then
printf '**** wait-kafka-lines success, have %d lines ****\n\n' "$TESTMESSAGES" printf '**** wait-kafka-lines success, have %d lines ****\n\n' "$TESTMESSAGES"

View File

@ -75,7 +75,7 @@ injectmsg 1 $NUMMESSAGES
wait_file_lines $RSYSLOG_OUT_LOG $NUMMESSAGESFULL 100 wait_file_lines $RSYSLOG_OUT_LOG $NUMMESSAGESFULL 100
# experimental: wait until kafkacat receives everything # experimental: wait until kcat receives everything
timeoutend=$WAITTIMEOUT timeoutend=$WAITTIMEOUT
timecounter=0 timecounter=0

View File

@ -80,7 +80,7 @@ injectmsg 1 $NUMMESSAGES
wait_file_lines $RSYSLOG_OUT_LOG $NUMMESSAGESFULL 100 wait_file_lines $RSYSLOG_OUT_LOG $NUMMESSAGESFULL 100
# experimental: wait until kafkacat receives everything # experimental: wait until kcat receives everything
timeoutend=$WAITTIMEOUT timeoutend=$WAITTIMEOUT
timecounter=0 timecounter=0

View File

@ -94,7 +94,7 @@ tcpflood -m$NUMMESSAGES -i1
wait_file_lines $RSYSLOG_OUT_LOG $NUMMESSAGESFULL 100 wait_file_lines $RSYSLOG_OUT_LOG $NUMMESSAGESFULL 100
# experimental: wait until kafkacat receives everything # experimental: wait until kcat receives everything
timeoutend=$WAITTIMEOUT timeoutend=$WAITTIMEOUT
timecounter=0 timecounter=0

View File

@ -91,7 +91,7 @@ startup
echo Inject messages into rsyslog sender instance echo Inject messages into rsyslog sender instance
injectmsg 1 $NUMMESSAGES injectmsg 1 $NUMMESSAGES
# experimental: wait until kafkacat receives everything # experimental: wait until kcat receives everything
timeoutend=$WAITTIMEOUT timeoutend=$WAITTIMEOUT
timecounter=0 timecounter=0

View File

@ -3,7 +3,7 @@
# This file is part of the rsyslog project, released under ASL 2.0 # This file is part of the rsyslog project, released under ASL 2.0
. ${srcdir:=.}/diag.sh init . ${srcdir:=.}/diag.sh init
test_status unreliable 'https://github.com/rsyslog/rsyslog/issues/3197' test_status unreliable 'https://github.com/rsyslog/rsyslog/issues/3197'
check_command_available kafkacat check_command_available kcat
export KEEP_KAFKA_RUNNING="YES" export KEEP_KAFKA_RUNNING="YES"
export TESTMESSAGES=100000 export TESTMESSAGES=100000
@ -80,7 +80,7 @@ injectmsg 1 $TESTMESSAGES
wait_file_lines $RSYSLOG_OUT_LOG $TESTMESSAGESFULL 100 wait_file_lines $RSYSLOG_OUT_LOG $TESTMESSAGESFULL 100
# experimental: wait until kafkacat receives everything # experimental: wait until kcat receives everything
timeoutend=100 timeoutend=100
timecounter=0 timecounter=0
@ -88,7 +88,7 @@ timecounter=0
while [ $timecounter -lt $timeoutend ]; do while [ $timecounter -lt $timeoutend ]; do
(( timecounter++ )) (( timecounter++ ))
kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG kcat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG
count=$(wc -l < ${RSYSLOG_OUT_LOG}) count=$(wc -l < ${RSYSLOG_OUT_LOG})
if [ $count -eq $TESTMESSAGESFULL ]; then if [ $count -eq $TESTMESSAGESFULL ]; then
printf '**** wait-kafka-lines success, have %d lines ****\n\n' "$TESTMESSAGESFULL" printf '**** wait-kafka-lines success, have %d lines ****\n\n' "$TESTMESSAGESFULL"
@ -117,8 +117,8 @@ echo Stopping sender instance [omkafka]
shutdown_when_empty shutdown_when_empty
wait_shutdown wait_shutdown
#kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG #kcat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG
#kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p@%o:%k:%s' > $RSYSLOG_OUT_LOG.extra #kcat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p@%o:%k:%s' > $RSYSLOG_OUT_LOG.extra
# Delete topic to remove old traces before # Delete topic to remove old traces before
delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181' delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181'

View File

@ -3,7 +3,7 @@
# This file is part of the rsyslog project, released under ASL 2.0 # This file is part of the rsyslog project, released under ASL 2.0
. ${srcdir:=.}/diag.sh init . ${srcdir:=.}/diag.sh init
test_status unreliable 'https://github.com/rsyslog/rsyslog/issues/3197' test_status unreliable 'https://github.com/rsyslog/rsyslog/issues/3197'
check_command_available kafkacat check_command_available kcat
export KEEP_KAFKA_RUNNING="YES" export KEEP_KAFKA_RUNNING="YES"
export TESTMESSAGES=100000 export TESTMESSAGES=100000
@ -78,7 +78,7 @@ injectmsg 1 $TESTMESSAGES
wait_file_lines $RSYSLOG_OUT_LOG $TESTMESSAGESFULL 100 wait_file_lines $RSYSLOG_OUT_LOG $TESTMESSAGESFULL 100
# experimental: wait until kafkacat receives everything # experimental: wait until kcat receives everything
timeoutend=100 timeoutend=100
timecounter=0 timecounter=0
@ -86,7 +86,7 @@ timecounter=0
while [ $timecounter -lt $timeoutend ]; do while [ $timecounter -lt $timeoutend ]; do
(( timecounter++ )) (( timecounter++ ))
kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG kcat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG
count=$(wc -l < ${RSYSLOG_OUT_LOG}) count=$(wc -l < ${RSYSLOG_OUT_LOG})
if [ $count -eq $TESTMESSAGESFULL ]; then if [ $count -eq $TESTMESSAGESFULL ]; then
printf '**** wait-kafka-lines success, have %d lines ****\n\n' "$TESTMESSAGESFULL" printf '**** wait-kafka-lines success, have %d lines ****\n\n' "$TESTMESSAGESFULL"
@ -115,8 +115,8 @@ echo Stopping sender instance [omkafka]
shutdown_when_empty shutdown_when_empty
wait_shutdown wait_shutdown
#kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG #kcat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG
#kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p@%o:%k:%s' > $RSYSLOG_OUT_LOG.extra #kcat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p@%o:%k:%s' > $RSYSLOG_OUT_LOG.extra
# Delete topic to remove old traces before # Delete topic to remove old traces before
delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181' delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181'

View File

@ -3,7 +3,7 @@
# This file is part of the rsyslog project, released under ASL 2.0 # This file is part of the rsyslog project, released under ASL 2.0
. ${srcdir:=.}/diag.sh init . ${srcdir:=.}/diag.sh init
test_status unreliable 'https://github.com/rsyslog/rsyslog/issues/3197' test_status unreliable 'https://github.com/rsyslog/rsyslog/issues/3197'
check_command_available kafkacat check_command_available kcat
export KEEP_KAFKA_RUNNING="YES" export KEEP_KAFKA_RUNNING="YES"
export TESTMESSAGES=100000 export TESTMESSAGES=100000
@ -78,7 +78,7 @@ injectmsg 1 $TESTMESSAGES
wait_file_lines $RSYSLOG_OUT_LOG $TESTMESSAGESFULL 100 wait_file_lines $RSYSLOG_OUT_LOG $TESTMESSAGESFULL 100
# experimental: wait until kafkacat receives everything # experimental: wait until kcat receives everything
timeoutend=100 timeoutend=100
timecounter=0 timecounter=0
@ -86,7 +86,7 @@ timecounter=0
while [ $timecounter -lt $timeoutend ]; do while [ $timecounter -lt $timeoutend ]; do
(( timecounter++ )) (( timecounter++ ))
kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG kcat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG
count=$(wc -l < ${RSYSLOG_OUT_LOG}) count=$(wc -l < ${RSYSLOG_OUT_LOG})
if [ $count -eq $TESTMESSAGESFULL ]; then if [ $count -eq $TESTMESSAGESFULL ]; then
printf '**** wait-kafka-lines success, have %d lines ****\n\n' "$TESTMESSAGESFULL" printf '**** wait-kafka-lines success, have %d lines ****\n\n' "$TESTMESSAGESFULL"
@ -115,8 +115,8 @@ echo Stopping sender instance [omkafka]
shutdown_when_empty shutdown_when_empty
wait_shutdown wait_shutdown
#kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG #kcat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG
#kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p@%o:%k:%s' > $RSYSLOG_OUT_LOG.extra #kcat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p@%o:%k:%s' > $RSYSLOG_OUT_LOG.extra
# Delete topic to remove old traces before # Delete topic to remove old traces before
delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181' delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181'

View File

@ -3,7 +3,7 @@
# This file is part of the rsyslog project, released under ASL 2.0 # This file is part of the rsyslog project, released under ASL 2.0
. ${srcdir:=.}/diag.sh init . ${srcdir:=.}/diag.sh init
test_status unreliable 'https://github.com/rsyslog/rsyslog/issues/3197' test_status unreliable 'https://github.com/rsyslog/rsyslog/issues/3197'
check_command_available kafkacat check_command_available kcat
export KEEP_KAFKA_RUNNING="YES" export KEEP_KAFKA_RUNNING="YES"
export TESTMESSAGES=100000 export TESTMESSAGES=100000
@ -84,7 +84,7 @@ injectmsg 1 $TESTMESSAGES
wait_file_lines $RSYSLOG_OUT_LOG $TESTMESSAGESFULL 100 wait_file_lines $RSYSLOG_OUT_LOG $TESTMESSAGESFULL 100
# experimental: wait until kafkacat receives everything # experimental: wait until kcat receives everything
timeoutend=100 timeoutend=100
timecounter=0 timecounter=0
@ -92,11 +92,11 @@ timecounter=0
while [ $timecounter -lt $timeoutend ]; do while [ $timecounter -lt $timeoutend ]; do
(( timecounter++ )) (( timecounter++ ))
kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG kcat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG
count=$(wc -l < ${RSYSLOG_OUT_LOG}) count=$(wc -l < ${RSYSLOG_OUT_LOG})
if [ $count -eq $TESTMESSAGESFULL ]; then if [ $count -eq $TESTMESSAGESFULL ]; then
printf '**** wait-kafka-lines success, have %d lines ****\n\n' "$TESTMESSAGESFULL" printf '**** wait-kafka-lines success, have %d lines ****\n\n' "$TESTMESSAGESFULL"
kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p %k\n' | sort | uniq > "$RSYSLOG_OUT_LOG.extra" kcat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p %k\n' | sort | uniq > "$RSYSLOG_OUT_LOG.extra"
count=$(wc -l < "${RSYSLOG_OUT_LOG}.extra") count=$(wc -l < "${RSYSLOG_OUT_LOG}.extra")
if [ $count -eq 10 ]; then if [ $count -eq 10 ]; then
printf '**** partition check success, have 10 partition-key combinations ****\n\n' printf '**** partition check success, have 10 partition-key combinations ****\n\n'
@ -106,7 +106,7 @@ while [ $timecounter -lt $timeoutend ]; do
wait_shutdown wait_shutdown
printf '\n\nERROR: partition check failed, expected 10 got %s\n' "$count" printf '\n\nERROR: partition check failed, expected 10 got %s\n' "$count"
printf '\ņRAW DATA:\n' printf '\ņRAW DATA:\n'
kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p %k\n' kcat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p %k\n'
printf '\nCHECKED OUTPUT:\n' printf '\nCHECKED OUTPUT:\n'
cat "$RSYSLOG_OUT_LOG.extra" cat "$RSYSLOG_OUT_LOG.extra"
error_exit 1 error_exit 1
@ -138,8 +138,8 @@ echo Stopping sender instance [omkafka]
shutdown_when_empty shutdown_when_empty
wait_shutdown wait_shutdown
#kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG #kcat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s' > $RSYSLOG_OUT_LOG
#kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p@%o:%k:%s' > $RSYSLOG_OUT_LOG.extra #kcat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p@%o:%k:%s' > $RSYSLOG_OUT_LOG.extra
# Delete topic to remove old traces before # Delete topic to remove old traces before
delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181' delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181'

View File

@ -37,7 +37,7 @@ sudo apt-get install -qq libmaxminddb-dev libmongoc-dev libbson-dev
# As travis has no xenial images, we always need to install librdkafka from source # As travis has no xenial images, we always need to install librdkafka from source
if [ "x$KAFKA" == "xYES" ]; then if [ "x$KAFKA" == "xYES" ]; then
sudo apt-get install -qq liblz4-dev sudo apt-get install -qq liblz4-dev
# For kafka testbench, "kafkacat" package is needed! # For kafka testbench, "kcat" package is needed!
git clone https://github.com/edenhill/librdkafka > /dev/null git clone https://github.com/edenhill/librdkafka > /dev/null
(unset CFLAGS; cd librdkafka ; ./configure --prefix=/usr --CFLAGS="-g" > /dev/null ; make -j2 > /dev/null ; sudo make install > /dev/null) (unset CFLAGS; cd librdkafka ; ./configure --prefix=/usr --CFLAGS="-g" > /dev/null ; make -j2 > /dev/null ; sudo make install > /dev/null)
rm -rf librdkafka # get rid of source, e.g. for line length check rm -rf librdkafka # get rid of source, e.g. for line length check