mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-16 09:50:40 +01:00
testbench: Kafka plumbing
- Removed all sleeps where possible. - Moved all kafka start/stop/download logic into functions. - Moved kafka/zookeeper stop into error_exit and exit_test. - Kafka/Zookeeper cleanup only done on success now. - Kafka/Zookeeper logfiles automatically dumped on error_exit only now. - Added cleanup for Kafka/Zookeeper instances into CI/buildbot_cleanup.sh
This commit is contained in:
parent
389d517e34
commit
b08a8e2b51
@ -614,10 +614,8 @@ TESTS += \
|
||||
# sndrcv_kafka_multi.sh
|
||||
if HAVE_VALGRIND
|
||||
TESTS += \
|
||||
sndrcv_kafka-vg-rcvr.sh
|
||||
# sndrcv_kafka-vg-sender.sh
|
||||
# Note: -sender had some issues. If the re-appear, see
|
||||
# see https://github.com/rsyslog/rsyslog/issues/2434
|
||||
omkafka-vg.sh \
|
||||
imkafka-vg.sh
|
||||
endif
|
||||
endif
|
||||
endif
|
||||
@ -1628,12 +1626,12 @@ EXTRA_DIST= \
|
||||
mysql-actq-mt-withpause.sh \
|
||||
mysql-actq-mt-withpause-vg.sh \
|
||||
omkafka.sh \
|
||||
omkafka-vg.sh \
|
||||
imkafka.sh \
|
||||
imkafka-vg.sh \
|
||||
imkafka_multi_single.sh \
|
||||
sndrcv_kafka.sh \
|
||||
sndrcv_kafka_multi_topics.sh \
|
||||
sndrcv_kafka-vg-rcvr.sh \
|
||||
sndrcv_kafka-vg-sender.sh \
|
||||
testsuites/kafka-server.properties \
|
||||
testsuites/kafka-server.dep_wrk1.properties \
|
||||
testsuites/kafka-server.dep_wrk2.properties \
|
||||
|
||||
531
tests/diag.sh
531
tests/diag.sh
@ -630,19 +630,24 @@ function error_exit() {
|
||||
RSYSLOG_DEBUG=$RSYSLOG_DEBUG_SAVE
|
||||
rm IN_AUTO_DEBUG
|
||||
fi
|
||||
# Extended debug output for dependencies started by testbench
|
||||
if [[ "$EXTRA_EXITCHECK" == 'dumpkafkalogs' ]]; then
|
||||
# Dump Zookeeper log
|
||||
. $srcdir/diag.sh dump-zookeeper-serverlog
|
||||
# Dump Kafka log
|
||||
. $srcdir/diag.sh dump-kafka-serverlog
|
||||
fi
|
||||
# output listening ports as a temporay debug measure (2018-09-08 rgerhards)
|
||||
if [ $(uname) == "Linux" ]; then
|
||||
netstat -tlp
|
||||
else
|
||||
netstat
|
||||
fi
|
||||
|
||||
# Extended debug output for dependencies started by testbench
|
||||
if [[ "$EXTRA_EXITCHECK" == 'dumpkafkalogs' ]]; then
|
||||
# Dump Zookeeper log
|
||||
dump_zookeeper_serverlog
|
||||
# Dump Kafka log
|
||||
dump_kafka_serverlog
|
||||
fi
|
||||
|
||||
# Extended Exit handling for kafka / zookeeper instances
|
||||
kafka_exit_handling "false"
|
||||
|
||||
# we need to do some minimal cleanup so that "make distcheck" does not
|
||||
# complain too much
|
||||
exit $1
|
||||
@ -731,6 +736,10 @@ function exit_test() {
|
||||
rm -f tmp.qi nocert
|
||||
rm -fr $RSYSLOG_DYNNAME* # delete all of our dynamic files
|
||||
unset TCPFLOOD_EXTRA_OPTS
|
||||
|
||||
# Extended Exit handling for kafka / zookeeper instances
|
||||
kafka_exit_handling "true"
|
||||
|
||||
printf "Test SUCCESFUL\n"
|
||||
echo -------------------------------------------------------------------------------
|
||||
}
|
||||
@ -796,6 +805,292 @@ dep_work_dir=$(pwd)/.dep_wrk
|
||||
|
||||
#END: ext kafka config
|
||||
|
||||
function kafka_exit_handling() {
|
||||
|
||||
# Extended Exit handling for kafka / zookeeper instances
|
||||
if [[ "$EXTRA_EXIT" == 'kafka' ]]; then
|
||||
|
||||
echo stop kafka instance
|
||||
stop_kafka '.dep_wrk' $1
|
||||
|
||||
echo stop zookeeper instance
|
||||
stop_zookeeper '.dep_wrk' $1
|
||||
fi
|
||||
|
||||
# Extended Exit handling for kafka / zookeeper instances
|
||||
if [[ "$EXTRA_EXIT" == 'kafkamulti' ]]; then
|
||||
echo stop kafka instances
|
||||
stop_kafka '.dep_wrk1' $1
|
||||
stop_kafka '.dep_wrk2' $1
|
||||
stop_kafka '.dep_wrk3' $1
|
||||
|
||||
echo stop zookeeper instances
|
||||
stop_zookeeper '.dep_wrk1' $1
|
||||
stop_zookeeper '.dep_wrk2' $1
|
||||
stop_zookeeper '.dep_wrk3' $1
|
||||
fi
|
||||
}
|
||||
|
||||
function download_kafka() {
|
||||
if [ ! -d $dep_cache_dir ]; then
|
||||
echo "Creating dependency cache dir $dep_cache_dir"
|
||||
mkdir $dep_cache_dir
|
||||
fi
|
||||
if [ ! -f $dep_zk_cached_file ]; then
|
||||
echo "Downloading zookeeper"
|
||||
wget -q $dep_zk_url -O $dep_zk_cached_file
|
||||
if [ $? -ne 0 ]
|
||||
then
|
||||
echo error during wget, retry:
|
||||
wget $dep_zk_url -O $dep_zk_cached_file
|
||||
if [ $? -ne 0 ]
|
||||
then
|
||||
error_exit 1
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
if [ ! -f $dep_kafka_cached_file ]; then
|
||||
echo "Downloading kafka"
|
||||
wget -q $dep_kafka_url -O $dep_kafka_cached_file
|
||||
if [ $? -ne 0 ]
|
||||
then
|
||||
echo error during wget, retry:
|
||||
wget $dep_kafka_url -O $dep_kafka_cached_file
|
||||
if [ $? -ne 0 ]
|
||||
then
|
||||
error_exit 1
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
}
|
||||
|
||||
function stop_kafka() {
|
||||
if [ "x$1" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
else
|
||||
dep_work_dir=$(readlink -f $srcdir/$1)
|
||||
fi
|
||||
if [ ! -d $dep_work_dir/kafka ]; then
|
||||
echo "Kafka work-dir $dep_work_dir/kafka does not exist, no action needed"
|
||||
else
|
||||
echo "Stopping Kafka instance $1"
|
||||
(cd $dep_work_dir/kafka && ./bin/kafka-server-stop.sh)
|
||||
if [[ "$2" == 'true' ]]; then
|
||||
$TESTTOOL_DIR/msleep 2000
|
||||
cleanup_kafka $1
|
||||
fi
|
||||
fi
|
||||
}
|
||||
|
||||
function cleanup_kafka() {
|
||||
if [ "x$1" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
else
|
||||
dep_work_dir=$(readlink -f $srcdir/$1)
|
||||
fi
|
||||
if [ ! -d $dep_work_dir/kafka ]; then
|
||||
echo "Kafka work-dir $dep_work_dir/kafka does not exist, no action needed"
|
||||
else
|
||||
echo "Cleanup Kafka instance $1"
|
||||
rm -rf $dep_work_dir/kafka
|
||||
fi
|
||||
}
|
||||
|
||||
function stop_zookeeper() {
|
||||
if [ "x$1" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
else
|
||||
dep_work_dir=$(readlink -f $srcdir/$1)
|
||||
fi
|
||||
(cd $dep_work_dir/zk &> /dev/null && ./bin/zkServer.sh stop)
|
||||
if [[ "$2" == 'true' ]]; then
|
||||
$TESTTOOL_DIR/msleep 2000
|
||||
cleanup_zookeeper $1
|
||||
fi
|
||||
}
|
||||
|
||||
function cleanup_zookeeper() {
|
||||
if [ "x$1" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
else
|
||||
dep_work_dir=$(readlink -f $srcdir/$1)
|
||||
fi
|
||||
rm -rf $dep_work_dir/zk
|
||||
}
|
||||
|
||||
function start_zookeeper() {
|
||||
if [ "x$1" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
dep_work_tk_config="zoo.cfg"
|
||||
else
|
||||
dep_work_dir=$(readlink -f $srcdir/$1)
|
||||
dep_work_tk_config="zoo$1.cfg"
|
||||
fi
|
||||
|
||||
if [ ! -f $dep_zk_cached_file ]; then
|
||||
echo "Dependency-cache does not have zookeeper package, did you download dependencies?"
|
||||
error_exit 77
|
||||
fi
|
||||
if [ ! -d $dep_work_dir ]; then
|
||||
echo "Creating dependency working directory"
|
||||
mkdir -p $dep_work_dir
|
||||
fi
|
||||
if [ -d $dep_work_dir/zk ]; then
|
||||
(cd $dep_work_dir/zk && ./bin/zkServer.sh stop)
|
||||
$TESTTOOL_DIR/msleep 2000
|
||||
fi
|
||||
rm -rf $dep_work_dir/zk
|
||||
(cd $dep_work_dir && tar -zxvf $dep_zk_cached_file --xform $dep_zk_dir_xform_pattern --show-transformed-names) > /dev/null
|
||||
cp -f $srcdir/testsuites/$dep_work_tk_config $dep_work_dir/zk/conf/zoo.cfg
|
||||
echo "Starting Zookeeper instance $1"
|
||||
(cd $dep_work_dir/zk && ./bin/zkServer.sh start)
|
||||
$TESTTOOL_DIR/msleep 2000
|
||||
}
|
||||
|
||||
function start_kafka() {
|
||||
# Force IPv4 usage of Kafka!
|
||||
export KAFKA_OPTS="-Djava.net.preferIPv4Stack=True"
|
||||
if [ "x$1" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
dep_work_kafka_config="kafka-server.properties"
|
||||
else
|
||||
dep_work_dir=$(readlink -f $1)
|
||||
dep_work_kafka_config="kafka-server$1.properties"
|
||||
fi
|
||||
|
||||
if [ ! -f $dep_kafka_cached_file ]; then
|
||||
echo "Dependency-cache does not have kafka package, did you download dependencies?"
|
||||
error_exit 77
|
||||
fi
|
||||
if [ ! -d $dep_work_dir ]; then
|
||||
echo "Creating dependency working directory"
|
||||
mkdir -p $dep_work_dir
|
||||
fi
|
||||
rm -rf $dep_work_dir/kafka
|
||||
( cd $dep_work_dir &&
|
||||
tar -zxvf $dep_kafka_cached_file --xform $dep_kafka_dir_xform_pattern --show-transformed-names) > /dev/null
|
||||
cp -f $srcdir/testsuites/$dep_work_kafka_config $dep_work_dir/kafka/config/
|
||||
echo "Starting Kafka instance $dep_work_kafka_config"
|
||||
(cd $dep_work_dir/kafka && ./bin/kafka-server-start.sh -daemon ./config/$dep_work_kafka_config)
|
||||
$TESTTOOL_DIR/msleep 4000
|
||||
|
||||
# Check if kafka instance came up!
|
||||
kafkapid=`ps aux | grep -i $dep_work_kafka_config | grep java | grep -v grep | awk '{print $1}'`
|
||||
if [[ "" != "$kafkapid" ]];
|
||||
then
|
||||
echo "Kafka instance $dep_work_kafka_config started with PID $kafkapid"
|
||||
else
|
||||
echo "Starting Kafka instance $dep_work_kafka_config, SECOND ATTEMPT!"
|
||||
(cd $dep_work_dir/kafka && ./bin/kafka-server-start.sh -daemon ./config/$dep_work_kafka_config)
|
||||
$TESTTOOL_DIR/msleep 4000
|
||||
|
||||
kafkapid=`ps aux | grep -i $dep_work_kafka_config | grep java | grep -v grep | awk '{print $1}'`
|
||||
if [[ "" != "$kafkapid" ]];
|
||||
then
|
||||
echo "Kafka instance $dep_work_kafka_config started with PID $kafkapid"
|
||||
else
|
||||
echo "Failed to start Kafka instance for $dep_work_kafka_config"
|
||||
error_exit 77
|
||||
fi
|
||||
fi
|
||||
}
|
||||
|
||||
function create_kafka_topic() {
|
||||
if [ "x$2" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
else
|
||||
dep_work_dir=$(readlink -f $2)
|
||||
fi
|
||||
if [ "x$3" == "x" ]; then
|
||||
dep_work_port='2181'
|
||||
else
|
||||
dep_work_port=$3
|
||||
fi
|
||||
if [ ! -d $dep_work_dir/kafka ]; then
|
||||
echo "Kafka work-dir $dep_work_dir/kafka does not exist, did you start kafka?"
|
||||
exit 1
|
||||
fi
|
||||
if [ "x$1" == "x" ]; then
|
||||
echo "Topic-name not provided."
|
||||
exit 1
|
||||
fi
|
||||
(cd $dep_work_dir/kafka && ./bin/kafka-topics.sh --zookeeper localhost:$dep_work_port/kafka --create --topic $1 --replication-factor 1 --partitions 2 )
|
||||
(cd $dep_work_dir/kafka && ./bin/kafka-topics.sh --zookeeper localhost:$dep_work_port/kafka --alter --topic $1 --delete-config retention.ms)
|
||||
(cd $dep_work_dir/kafka && ./bin/kafka-topics.sh --zookeeper localhost:$dep_work_port/kafka --alter --topic $1 --delete-config retention.bytes)
|
||||
}
|
||||
|
||||
function delete_kafka_topic() {
|
||||
if [ "x$2" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
else
|
||||
dep_work_dir=$(readlink -f $srcdir/$2)
|
||||
fi
|
||||
if [ "x$3" == "x" ]; then
|
||||
dep_work_port='2181'
|
||||
else
|
||||
dep_work_port=$3
|
||||
fi
|
||||
|
||||
echo "deleting kafka-topic $1"
|
||||
(cd $dep_work_dir/kafka && ./bin/kafka-topics.sh --delete --zookeeper localhost:$dep_work_port/kafka --topic $1)
|
||||
}
|
||||
|
||||
function dump_kafka_topic() {
|
||||
if [ "x$2" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
dep_kafka_log_dump=$(readlink -f rsyslog.out.kafka.log)
|
||||
else
|
||||
dep_work_dir=$(readlink -f $srcdir/$2)
|
||||
dep_kafka_log_dump=$(readlink -f rsyslog.out.kafka$2.log)
|
||||
fi
|
||||
if [ "x$3" == "x" ]; then
|
||||
dep_work_port='2181'
|
||||
else
|
||||
dep_work_port=$3
|
||||
fi
|
||||
|
||||
echo "dumping kafka-topic $1"
|
||||
if [ ! -d $dep_work_dir/kafka ]; then
|
||||
echo "Kafka work-dir does not exist, did you start kafka?"
|
||||
exit 1
|
||||
fi
|
||||
if [ "x$1" == "x" ]; then
|
||||
echo "Topic-name not provided."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
(cd $dep_work_dir/kafka && ./bin/kafka-console-consumer.sh --timeout-ms 2000 --from-beginning --zookeeper localhost:$dep_work_port/kafka --topic $1 > $dep_kafka_log_dump)
|
||||
}
|
||||
|
||||
function dump_kafka_serverlog() {
|
||||
if [ "x$1" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
else
|
||||
dep_work_dir=$(readlink -f $srcdir/$1)
|
||||
fi
|
||||
if [ ! -d $dep_work_dir/kafka ]; then
|
||||
echo "Kafka work-dir $dep_work_dir/kafka does not exist, no kafka debuglog"
|
||||
else
|
||||
echo "Dumping server.log from Kafka instance $1"
|
||||
echo "========================================="
|
||||
cat $dep_work_dir/kafka/logs/server.log
|
||||
echo "========================================="
|
||||
fi
|
||||
}
|
||||
|
||||
function dump_zookeeper_serverlog() {
|
||||
if [ "x$1" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
else
|
||||
dep_work_dir=$(readlink -f $srcdir/$1)
|
||||
fi
|
||||
echo "Dumping zookeeper.out from Zookeeper instance $1"
|
||||
echo "========================================="
|
||||
cat $dep_work_dir/zk/zookeeper.out
|
||||
echo "========================================="
|
||||
}
|
||||
|
||||
|
||||
case $1 in
|
||||
'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason
|
||||
# for (solaris) load debugging, uncomment next 2 lines:
|
||||
@ -1123,38 +1418,6 @@ case $1 in
|
||||
exit 77
|
||||
fi
|
||||
;;
|
||||
'download-kafka')
|
||||
if [ ! -d $dep_cache_dir ]; then
|
||||
echo "Creating dependency cache dir $dep_cache_dir"
|
||||
mkdir $dep_cache_dir
|
||||
fi
|
||||
if [ ! -f $dep_zk_cached_file ]; then
|
||||
echo "Downloading zookeeper"
|
||||
wget -q $dep_zk_url -O $dep_zk_cached_file
|
||||
if [ $? -ne 0 ]
|
||||
then
|
||||
echo error during wget, retry:
|
||||
wget $dep_zk_url -O $dep_zk_cached_file
|
||||
if [ $? -ne 0 ]
|
||||
then
|
||||
error_exit 1
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
if [ ! -f $dep_kafka_cached_file ]; then
|
||||
echo "Downloading kafka"
|
||||
wget -q $dep_kafka_url -O $dep_kafka_cached_file
|
||||
if [ $? -ne 0 ]
|
||||
then
|
||||
echo error during wget, retry:
|
||||
wget $dep_kafka_url -O $dep_kafka_cached_file
|
||||
if [ $? -ne 0 ]
|
||||
then
|
||||
error_exit 1
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
;;
|
||||
'download-elasticsearch')
|
||||
if [ ! -d $dep_cache_dir ]; then
|
||||
echo "Creating dependency cache dir $dep_cache_dir"
|
||||
@ -1171,80 +1434,6 @@ case $1 in
|
||||
fi
|
||||
fi
|
||||
;;
|
||||
'start-zookeeper')
|
||||
if [ "x$2" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
dep_work_tk_config="zoo.cfg"
|
||||
else
|
||||
dep_work_dir=$(readlink -f $srcdir/$2)
|
||||
dep_work_tk_config="zoo$2.cfg"
|
||||
fi
|
||||
|
||||
if [ ! -f $dep_zk_cached_file ]; then
|
||||
echo "Dependency-cache does not have zookeeper package, did you download dependencies?"
|
||||
exit 77
|
||||
fi
|
||||
if [ ! -d $dep_work_dir ]; then
|
||||
echo "Creating dependency working directory"
|
||||
mkdir -p $dep_work_dir
|
||||
fi
|
||||
if [ -d $dep_work_dir/zk ]; then
|
||||
(cd $dep_work_dir/zk && ./bin/zkServer.sh stop)
|
||||
$TESTTOOL_DIR/msleep 2000
|
||||
fi
|
||||
rm -rf $dep_work_dir/zk
|
||||
(cd $dep_work_dir && tar -zxvf $dep_zk_cached_file --xform $dep_zk_dir_xform_pattern --show-transformed-names) > /dev/null
|
||||
cp -f $srcdir/testsuites/$dep_work_tk_config $dep_work_dir/zk/conf/zoo.cfg
|
||||
echo "Starting Zookeeper instance $2"
|
||||
(cd $dep_work_dir/zk && ./bin/zkServer.sh start)
|
||||
$TESTTOOL_DIR/msleep 2000
|
||||
;;
|
||||
'start-kafka')
|
||||
# Force IPv4 usage of Kafka!
|
||||
export KAFKA_OPTS="-Djava.net.preferIPv4Stack=True"
|
||||
if [ "x$2" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
dep_work_kafka_config="kafka-server.properties"
|
||||
else
|
||||
dep_work_dir=$(readlink -f $2)
|
||||
dep_work_kafka_config="kafka-server$2.properties"
|
||||
fi
|
||||
|
||||
if [ ! -f $dep_kafka_cached_file ]; then
|
||||
echo "Dependency-cache does not have kafka package, did you download dependencies?"
|
||||
exit 77
|
||||
fi
|
||||
if [ ! -d $dep_work_dir ]; then
|
||||
echo "Creating dependency working directory"
|
||||
mkdir -p $dep_work_dir
|
||||
fi
|
||||
rm -rf $dep_work_dir/kafka
|
||||
(cd $dep_work_dir && tar -zxvf $dep_kafka_cached_file --xform $dep_kafka_dir_xform_pattern --show-transformed-names) > /dev/null
|
||||
cp -f $srcdir/testsuites/$dep_work_kafka_config $dep_work_dir/kafka/config/
|
||||
echo "Starting Kafka instance $dep_work_kafka_config"
|
||||
(cd $dep_work_dir/kafka && ./bin/kafka-server-start.sh -daemon ./config/$dep_work_kafka_config)
|
||||
$TESTTOOL_DIR/msleep 4000
|
||||
|
||||
# Check if kafka instance came up!
|
||||
kafkapid=`ps aux | grep -i $dep_work_kafka_config | grep java | grep -v grep | awk '{print $2}'`
|
||||
if [[ "" != "$kafkapid" ]];
|
||||
then
|
||||
echo "Kafka instance $dep_work_kafka_config started with PID $kafkapid"
|
||||
else
|
||||
echo "Starting Kafka instance $dep_work_kafka_config, SECOND ATTEMPT!"
|
||||
(cd $dep_work_dir/kafka && ./bin/kafka-server-start.sh -daemon ./config/$dep_work_kafka_config)
|
||||
$TESTTOOL_DIR/msleep 4000
|
||||
|
||||
kafkapid=`ps aux | grep -i $dep_work_kafka_config | grep java | grep -v grep | awk '{print $2}'`
|
||||
if [[ "" != "$kafkapid" ]];
|
||||
then
|
||||
echo "Kafka instance $dep_work_kafka_config started with PID $kafkapid"
|
||||
else
|
||||
echo "Failed to start Kafka instance for $dep_work_kafka_config"
|
||||
error_exit 77
|
||||
fi
|
||||
fi
|
||||
;;
|
||||
'prepare-elasticsearch') # $2, if set, is the number of additional ES instances
|
||||
# Heap Size (limit to 128MB for testbench! defaults is way to HIGH)
|
||||
export ES_JAVA_OPTS="-Xms128m -Xmx128m"
|
||||
@ -1331,58 +1520,6 @@ case $1 in
|
||||
$TESTTOOL_DIR/msleep 2000
|
||||
echo ES startup succeeded
|
||||
;;
|
||||
'dump-kafka-serverlog')
|
||||
if [ "x$2" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
else
|
||||
dep_work_dir=$(readlink -f $srcdir/$2)
|
||||
fi
|
||||
if [ ! -d $dep_work_dir/kafka ]; then
|
||||
echo "Kafka work-dir $dep_work_dir/kafka does not exist, no kafka debuglog"
|
||||
else
|
||||
echo "Dumping server.log from Kafka instance $2"
|
||||
echo "========================================="
|
||||
cat $dep_work_dir/kafka/logs/server.log
|
||||
echo "========================================="
|
||||
fi
|
||||
;;
|
||||
|
||||
'dump-zookeeper-serverlog')
|
||||
if [ "x$2" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
else
|
||||
dep_work_dir=$(readlink -f $srcdir/$2)
|
||||
fi
|
||||
echo "Dumping zookeeper.out from Zookeeper instance $2"
|
||||
echo "========================================="
|
||||
cat $dep_work_dir/zk/zookeeper.out
|
||||
echo "========================================="
|
||||
;;
|
||||
'stop-kafka')
|
||||
if [ "x$2" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
else
|
||||
dep_work_dir=$(readlink -f $srcdir/$2)
|
||||
fi
|
||||
if [ ! -d $dep_work_dir/kafka ]; then
|
||||
echo "Kafka work-dir $dep_work_dir/kafka does not exist, no action needed"
|
||||
else
|
||||
echo "Stopping Kafka instance $2"
|
||||
(cd $dep_work_dir/kafka && ./bin/kafka-server-stop.sh)
|
||||
$TESTTOOL_DIR/msleep 2000
|
||||
rm -rf $dep_work_dir/kafka
|
||||
fi
|
||||
;;
|
||||
'stop-zookeeper')
|
||||
if [ "x$2" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
else
|
||||
dep_work_dir=$(readlink -f $srcdir/$2)
|
||||
fi
|
||||
(cd $dep_work_dir/zk &> /dev/null && ./bin/zkServer.sh stop)
|
||||
$TESTTOOL_DIR/msleep 2000
|
||||
rm -rf $dep_work_dir/zk
|
||||
;;
|
||||
'stop-elasticsearch')
|
||||
if [ "x$2" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
@ -1405,70 +1542,6 @@ case $1 in
|
||||
rm -f $dep_work_es_pidfile
|
||||
rm -rf $dep_work_dir/es
|
||||
;;
|
||||
'create-kafka-topic')
|
||||
if [ "x$3" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
else
|
||||
dep_work_dir=$(readlink -f $3)
|
||||
fi
|
||||
if [ "x$4" == "x" ]; then
|
||||
dep_work_port='2181'
|
||||
else
|
||||
dep_work_port=$4
|
||||
fi
|
||||
if [ ! -d $dep_work_dir/kafka ]; then
|
||||
echo "Kafka work-dir $dep_work_dir/kafka does not exist, did you start kafka?"
|
||||
exit 1
|
||||
fi
|
||||
if [ "x$2" == "x" ]; then
|
||||
echo "Topic-name not provided."
|
||||
exit 1
|
||||
fi
|
||||
(cd $dep_work_dir/kafka && ./bin/kafka-topics.sh --zookeeper localhost:$dep_work_port/kafka --create --topic $2 --replication-factor 1 --partitions 2 )
|
||||
(cd $dep_work_dir/kafka && ./bin/kafka-topics.sh --zookeeper localhost:$dep_work_port/kafka --alter --topic $2 --delete-config retention.ms)
|
||||
(cd $dep_work_dir/kafka && ./bin/kafka-topics.sh --zookeeper localhost:$dep_work_port/kafka --alter --topic $2 --delete-config retention.bytes)
|
||||
;;
|
||||
'delete-kafka-topic')
|
||||
if [ "x$3" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
else
|
||||
dep_work_dir=$(readlink -f $srcdir/$3)
|
||||
fi
|
||||
if [ "x$4" == "x" ]; then
|
||||
dep_work_port='2181'
|
||||
else
|
||||
dep_work_port=$4
|
||||
fi
|
||||
|
||||
echo "deleting kafka-topic $2"
|
||||
(cd $dep_work_dir/kafka && ./bin/kafka-topics.sh --delete --zookeeper localhost:$dep_work_port/kafka --topic $2)
|
||||
;;
|
||||
'dump-kafka-topic')
|
||||
if [ "x$3" == "x" ]; then
|
||||
dep_work_dir=$(readlink -f .dep_wrk)
|
||||
dep_kafka_log_dump=$(readlink -f rsyslog.out.kafka.log)
|
||||
else
|
||||
dep_work_dir=$(readlink -f $srcdir/$3)
|
||||
dep_kafka_log_dump=$(readlink -f rsyslog.out.kafka$3.log)
|
||||
fi
|
||||
if [ "x$4" == "x" ]; then
|
||||
dep_work_port='2181'
|
||||
else
|
||||
dep_work_port=$4
|
||||
fi
|
||||
|
||||
echo "dumping kafka-topic $2"
|
||||
if [ ! -d $dep_work_dir/kafka ]; then
|
||||
echo "Kafka work-dir does not exist, did you start kafka?"
|
||||
exit 1
|
||||
fi
|
||||
if [ "x$2" == "x" ]; then
|
||||
echo "Topic-name not provided."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
(cd $dep_work_dir/kafka && ./bin/kafka-console-consumer.sh --timeout-ms 2000 --from-beginning --zookeeper localhost:$dep_work_port/kafka --topic $2 > $dep_kafka_log_dump)
|
||||
;;
|
||||
'check-inotify') # Check for inotify/fen support
|
||||
if [ -n "$(find /usr/include -name 'inotify.h' -print -quit)" ]; then
|
||||
echo [inotify mode]
|
||||
|
||||
89
tests/imkafka-vg.sh
Executable file
89
tests/imkafka-vg.sh
Executable file
@ -0,0 +1,89 @@
|
||||
#!/bin/bash
|
||||
# added 2018-08-29 by alorbach
|
||||
# This file is part of the rsyslog project, released under ASL 2.0
|
||||
echo Init Testbench
|
||||
. $srcdir/diag.sh init
|
||||
check_command_available kafkacat
|
||||
|
||||
# *** ==============================================================================
|
||||
export TESTMESSAGES=10000
|
||||
export TESTMESSAGESFULL=$TESTMESSAGES
|
||||
|
||||
# Generate random topic name
|
||||
export RANDTOPIC=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
|
||||
|
||||
# Set EXTRA_EXITCHECK to dump kafka/zookeeperlogfiles on failure only.
|
||||
export EXTRA_EXITCHECK=dumpkafkalogs
|
||||
export EXTRA_EXIT=kafka
|
||||
echo ===============================================================================
|
||||
echo Check and Stop previous instances of kafka/zookeeper
|
||||
download_kafka
|
||||
stop_zookeeper
|
||||
stop_kafka
|
||||
|
||||
echo Create kafka/zookeeper instance and $RANDTOPIC topic
|
||||
start_zookeeper
|
||||
start_kafka
|
||||
# create new topic
|
||||
create_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
# --- Create imkafka receiver config
|
||||
export RSYSLOG_DEBUGLOG="log"
|
||||
generate_conf
|
||||
add_conf '
|
||||
main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
|
||||
|
||||
module(load="../plugins/imkafka/.libs/imkafka")
|
||||
/* Polls messages from kafka server!*/
|
||||
input( type="imkafka"
|
||||
topic="'$RANDTOPIC'"
|
||||
broker="localhost:29092"
|
||||
consumergroup="default"
|
||||
confParam=[ "compression.codec=none",
|
||||
"session.timeout.ms=10000",
|
||||
"socket.timeout.ms=5000",
|
||||
"socket.keepalive.enable=true",
|
||||
"reconnect.backoff.jitter.ms=1000",
|
||||
"enable.partition.eof=false" ]
|
||||
)
|
||||
|
||||
template(name="outfmt" type="string" string="%msg:F,58:2%\n")
|
||||
|
||||
if ($msg contains "msgnum:") then {
|
||||
action( type="omfile" file=`echo $RSYSLOG_OUT_LOG` template="outfmt" )
|
||||
}
|
||||
'
|
||||
# ---
|
||||
|
||||
# --- Start imkafka receiver config
|
||||
echo Starting receiver instance [imkafka]
|
||||
startup_vg
|
||||
# ---
|
||||
|
||||
# --- Fill Kafka Server with messages
|
||||
# Can properly be done in a better way?!
|
||||
for i in {00000001..0010000}
|
||||
do
|
||||
echo " msgnum:$i" >> $RSYSLOG_OUT_LOG.in
|
||||
done
|
||||
|
||||
echo Inject messages into kafka
|
||||
cat $RSYSLOG_OUT_LOG.in | kafkacat -P -b localhost:29092 -t $RANDTOPIC
|
||||
# ---
|
||||
|
||||
echo Give imkafka some time to start...
|
||||
sleep 5
|
||||
|
||||
echo Stopping sender instance [omkafka]
|
||||
shutdown_when_empty
|
||||
wait_shutdown_vg
|
||||
check_exit_vg
|
||||
|
||||
# Delete topic to remove old traces before
|
||||
delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
# Do the final sequence check
|
||||
seq_check 1 $TESTMESSAGESFULL -d
|
||||
|
||||
echo success
|
||||
exit_test
|
||||
@ -1,33 +1,32 @@
|
||||
#!/bin/bash
|
||||
# added 2018-08-29 by alorbach
|
||||
# This file is part of the rsyslog project, released under ASL 2.0
|
||||
echo Init Testbench
|
||||
. $srcdir/diag.sh init
|
||||
check_command_available kafkacat
|
||||
|
||||
# *** ==============================================================================
|
||||
export TESTMESSAGES=100000
|
||||
export TESTMESSAGESFULL=$TESTMESSAGES
|
||||
|
||||
# Generate random topic name
|
||||
export RANDTOPIC=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
|
||||
|
||||
# enable the EXTRA_EXITCHECK only if really needed - otherwise spams the test log
|
||||
# too much
|
||||
#export EXTRA_EXITCHECK=dumpkafkalogs
|
||||
# Set EXTRA_EXITCHECK to dump kafka/zookeeperlogfiles on failure only.
|
||||
export EXTRA_EXITCHECK=dumpkafkalogs
|
||||
export EXTRA_EXIT=kafka
|
||||
echo ===============================================================================
|
||||
echo Create kafka/zookeeper instance and $RANDTOPIC topic
|
||||
. $srcdir/diag.sh download-kafka
|
||||
. $srcdir/diag.sh stop-zookeeper
|
||||
. $srcdir/diag.sh stop-kafka
|
||||
|
||||
echo Init Testbench
|
||||
. $srcdir/diag.sh init
|
||||
check_command_available kafkacat
|
||||
echo Check and Stop previous instances of kafka/zookeeper
|
||||
download_kafka
|
||||
stop_zookeeper
|
||||
stop_kafka
|
||||
|
||||
echo Create kafka/zookeeper instance and $RANDTOPIC topic
|
||||
. $srcdir/diag.sh start-zookeeper
|
||||
. $srcdir/diag.sh start-kafka
|
||||
start_zookeeper
|
||||
start_kafka
|
||||
|
||||
# create new topic
|
||||
. $srcdir/diag.sh create-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
echo Give Kafka some time to process topic create ...
|
||||
sleep 5
|
||||
create_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
# --- Create imkafka receiver config
|
||||
export RSYSLOG_DEBUGLOG="log"
|
||||
@ -81,13 +80,7 @@ shutdown_when_empty
|
||||
wait_shutdown
|
||||
|
||||
# Delete topic to remove old traces before
|
||||
. $srcdir/diag.sh delete-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
echo stop kafka instance
|
||||
. $srcdir/diag.sh stop-kafka
|
||||
|
||||
# STOP ZOOKEEPER in any case
|
||||
. $srcdir/diag.sh stop-zookeeper
|
||||
delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
# Do the final sequence check
|
||||
seq_check 1 $TESTMESSAGESFULL -d
|
||||
|
||||
@ -1,47 +1,40 @@
|
||||
#!/bin/bash
|
||||
# added 2018-08-29 by alorbach
|
||||
# This file is part of the rsyslog project, released under ASL 2.0
|
||||
echo Init Testbench
|
||||
. $srcdir/diag.sh init
|
||||
check_command_available kafkacat
|
||||
|
||||
# *** ==============================================================================
|
||||
export TESTMESSAGES=100000
|
||||
export TESTMESSAGESFULL=100000
|
||||
|
||||
# Generate random topic name
|
||||
export RANDTOPIC=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
|
||||
|
||||
echo Init Testbench
|
||||
. $srcdir/diag.sh init
|
||||
|
||||
# Check for kafkacat
|
||||
check_command_available kafkacat
|
||||
|
||||
# enable the EXTRA_EXITCHECK only if really needed - otherwise spams the test log
|
||||
# too much
|
||||
#export EXTRA_EXITCHECK=dumpkafkalogs
|
||||
# Set EXTRA_EXITCHECK to dump kafka/zookeeperlogfiles on failure only.
|
||||
export EXTRA_EXITCHECK=dumpkafkalogs
|
||||
export EXTRA_EXIT=kafkamulti
|
||||
echo ===============================================================================
|
||||
echo Check and Stop previous instances of kafka/zookeeper
|
||||
download_kafka
|
||||
stop_zookeeper '.dep_wrk1'
|
||||
stop_zookeeper '.dep_wrk2'
|
||||
stop_zookeeper '.dep_wrk3'
|
||||
stop_kafka '.dep_wrk1'
|
||||
stop_kafka '.dep_wrk2'
|
||||
stop_kafka '.dep_wrk3'
|
||||
|
||||
echo Create kafka/zookeeper instance and $RANDTOPIC topic
|
||||
. $srcdir/diag.sh download-kafka
|
||||
. $srcdir/diag.sh stop-zookeeper '.dep_wrk1'
|
||||
. $srcdir/diag.sh stop-zookeeper '.dep_wrk2'
|
||||
. $srcdir/diag.sh stop-zookeeper '.dep_wrk3'
|
||||
. $srcdir/diag.sh stop-kafka '.dep_wrk1'
|
||||
. $srcdir/diag.sh stop-kafka '.dep_wrk2'
|
||||
. $srcdir/diag.sh stop-kafka '.dep_wrk3'
|
||||
|
||||
echo Create kafka/zookeeper instance and topics
|
||||
. $srcdir/diag.sh start-zookeeper '.dep_wrk1'
|
||||
. $srcdir/diag.sh start-zookeeper '.dep_wrk2'
|
||||
. $srcdir/diag.sh start-zookeeper '.dep_wrk3'
|
||||
. $srcdir/diag.sh start-kafka '.dep_wrk1'
|
||||
. $srcdir/diag.sh start-kafka '.dep_wrk2'
|
||||
. $srcdir/diag.sh start-kafka '.dep_wrk3'
|
||||
|
||||
echo Give Kafka some time to sync...
|
||||
sleep 5
|
||||
start_zookeeper '.dep_wrk1'
|
||||
start_zookeeper '.dep_wrk2'
|
||||
start_zookeeper '.dep_wrk3'
|
||||
start_kafka '.dep_wrk1'
|
||||
start_kafka '.dep_wrk2'
|
||||
start_kafka '.dep_wrk3'
|
||||
|
||||
# create new topic
|
||||
. $srcdir/diag.sh create-kafka-topic $RANDTOPIC '.dep_wrk1' '22181'
|
||||
|
||||
echo Give Kafka some time to process topic create ...
|
||||
sleep 5
|
||||
create_kafka_topic $RANDTOPIC '.dep_wrk1' '22181'
|
||||
|
||||
# --- Create imkafka receiver config
|
||||
export RSYSLOG_DEBUGLOG="log"
|
||||
@ -138,15 +131,7 @@ TIMEDIFF=$(echo "$TIMEEND - $TIMESTART" | bc)
|
||||
echo "*** imkafka time to process all data: $TIMEDIFF seconds!"
|
||||
|
||||
# Delete topic to remove old traces before
|
||||
. $srcdir/diag.sh delete-kafka-topic $RANDTOPIC '.dep_wrk1' '22181'
|
||||
|
||||
echo \[sndrcv_kafka.sh\]: stop kafka instances
|
||||
. $srcdir/diag.sh stop-kafka '.dep_wrk1'
|
||||
. $srcdir/diag.sh stop-kafka '.dep_wrk2'
|
||||
. $srcdir/diag.sh stop-kafka '.dep_wrk3'
|
||||
. $srcdir/diag.sh stop-zookeeper '.dep_wrk1'
|
||||
. $srcdir/diag.sh stop-zookeeper '.dep_wrk2'
|
||||
. $srcdir/diag.sh stop-zookeeper '.dep_wrk3'
|
||||
delete_kafka_topic $RANDTOPIC '.dep_wrk1' '22181'
|
||||
|
||||
# Do the final sequence check
|
||||
seq_check 1 $TESTMESSAGESFULL -d
|
||||
|
||||
@ -1,39 +1,32 @@
|
||||
#!/bin/bash
|
||||
# added 2018-08-29 by alorbach
|
||||
# This file is part of the rsyslog project, released under ASL 2.0
|
||||
echo Init Testbench
|
||||
. $srcdir/diag.sh init
|
||||
check_command_available kafkacat
|
||||
|
||||
# *** ==============================================================================
|
||||
export TESTMESSAGES=100000
|
||||
export TESTMESSAGESFULL=100000
|
||||
|
||||
# Generate random topic name
|
||||
export RANDTOPIC=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
|
||||
|
||||
echo Init Testbench
|
||||
. $srcdir/diag.sh init
|
||||
|
||||
# Check for kafkacat
|
||||
check_command_available kafkacat
|
||||
|
||||
# enable the EXTRA_EXITCHECK only if really needed - otherwise spams the test log
|
||||
# too much
|
||||
#export EXTRA_EXITCHECK=dumpkafkalogs
|
||||
# Set EXTRA_EXITCHECK to dump kafka/zookeeperlogfiles on failure only.
|
||||
export EXTRA_EXITCHECK=dumpkafkalogs
|
||||
export EXTRA_EXIT=kafka
|
||||
echo ===============================================================================
|
||||
echo Check and Stop previous instances of kafka/zookeeper
|
||||
download_kafka
|
||||
stop_zookeeper
|
||||
stop_kafka
|
||||
|
||||
echo Create kafka/zookeeper instance and $RANDTOPIC topic
|
||||
. $srcdir/diag.sh download-kafka
|
||||
. $srcdir/diag.sh stop-zookeeper
|
||||
. $srcdir/diag.sh stop-kafka
|
||||
|
||||
echo Create kafka/zookeeper instance and topics
|
||||
. $srcdir/diag.sh start-zookeeper
|
||||
. $srcdir/diag.sh start-kafka
|
||||
|
||||
echo Give Kafka some time to sync...
|
||||
sleep 5
|
||||
start_zookeeper
|
||||
start_kafka
|
||||
|
||||
# create new topic
|
||||
. $srcdir/diag.sh create-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
echo Give Kafka some time to process topic create ...
|
||||
sleep 5
|
||||
create_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
# --- Create imkafka receiver config
|
||||
export RSYSLOG_DEBUGLOG="log"
|
||||
@ -178,11 +171,7 @@ TIMEDIFF=$(echo "$TIMEEND - $TIMESTART" | bc)
|
||||
echo "*** imkafka time to process all data: $TIMEDIFF seconds!"
|
||||
|
||||
# Delete topic to remove old traces before
|
||||
. $srcdir/diag.sh delete-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
echo \[sndrcv_kafka.sh\]: stop kafka instances
|
||||
. $srcdir/diag.sh stop-kafka
|
||||
. $srcdir/diag.sh stop-zookeeper
|
||||
delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
# Do the final sequence check
|
||||
seq_check 1 $TESTMESSAGESFULL -d
|
||||
|
||||
87
tests/omkafka-vg.sh
Executable file
87
tests/omkafka-vg.sh
Executable file
@ -0,0 +1,87 @@
|
||||
#!/bin/bash
|
||||
# added 2017-05-03 by alorbach
|
||||
# This file is part of the rsyslog project, released under ASL 2.0
|
||||
echo Init Testbench
|
||||
. $srcdir/diag.sh init
|
||||
check_command_available kafkacat
|
||||
|
||||
# *** ==============================================================================
|
||||
export TESTMESSAGES=10000
|
||||
export TESTMESSAGESFULL=$TESTMESSAGES
|
||||
|
||||
# Generate random topic name
|
||||
export RANDTOPIC=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
|
||||
|
||||
# Set EXTRA_EXITCHECK to dump kafka/zookeeperlogfiles on failure only.
|
||||
export EXTRA_EXITCHECK=dumpkafkalogs
|
||||
export EXTRA_EXIT=kafka
|
||||
echo ===============================================================================
|
||||
echo Check and Stop previous instances of kafka/zookeeper
|
||||
download_kafka
|
||||
stop_zookeeper
|
||||
stop_kafka
|
||||
|
||||
echo Create kafka/zookeeper instance and $RANDTOPIC topic
|
||||
start_zookeeper
|
||||
start_kafka
|
||||
# create new topic
|
||||
create_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
# --- Create/Start omkafka sender config
|
||||
export RSYSLOG_DEBUGLOG="log"
|
||||
generate_conf
|
||||
add_conf '
|
||||
main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
|
||||
$imdiagInjectDelayMode full
|
||||
|
||||
module(load="../plugins/omkafka/.libs/omkafka")
|
||||
|
||||
template(name="outfmt" type="string" string="%msg:F,58:2%\n")
|
||||
|
||||
local4.* action( name="kafka-fwd"
|
||||
type="omkafka"
|
||||
topic="'$RANDTOPIC'"
|
||||
broker="localhost:29092"
|
||||
template="outfmt"
|
||||
confParam=[ "compression.codec=none",
|
||||
"socket.timeout.ms=10000",
|
||||
"socket.keepalive.enable=true",
|
||||
"reconnect.backoff.jitter.ms=1000",
|
||||
"queue.buffering.max.messages=10000",
|
||||
"enable.auto.commit=true",
|
||||
"message.send.max.retries=1"]
|
||||
topicConfParam=["message.timeout.ms=10000"]
|
||||
partitions.auto="on"
|
||||
closeTimeout="60000"
|
||||
resubmitOnFailure="on"
|
||||
keepFailedMessages="on"
|
||||
failedMsgFile="'$RSYSLOG_OUT_LOG'-failed-'$RANDTOPIC'.data"
|
||||
action.resumeInterval="1"
|
||||
action.resumeRetryCount="2"
|
||||
queue.saveonshutdown="on"
|
||||
)
|
||||
'
|
||||
|
||||
echo Starting sender instance [omkafka]
|
||||
startup_vg
|
||||
# ---
|
||||
|
||||
echo Inject messages into rsyslog sender instance
|
||||
injectmsg 1 $TESTMESSAGES
|
||||
|
||||
echo Stopping sender instance [omkafka]
|
||||
shutdown_when_empty
|
||||
wait_shutdown_vg
|
||||
check_exit_vg
|
||||
|
||||
kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s'> $RSYSLOG_OUT_LOG
|
||||
kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p@%o:%k:%s' > $RSYSLOG_OUT_LOG.extra
|
||||
|
||||
# Delete topic to remove old traces before
|
||||
delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
# Do the final sequence check
|
||||
seq_check 1 $TESTMESSAGESFULL -d
|
||||
|
||||
echo success
|
||||
exit_test
|
||||
@ -1,33 +1,30 @@
|
||||
#!/bin/bash
|
||||
# added 2017-05-03 by alorbach
|
||||
# This file is part of the rsyslog project, released under ASL 2.0
|
||||
echo Init Testbench
|
||||
. $srcdir/diag.sh init
|
||||
check_command_available kafkacat
|
||||
|
||||
# *** ==============================================================================
|
||||
export TESTMESSAGES=100000
|
||||
export TESTMESSAGESFULL=$TESTMESSAGES
|
||||
|
||||
# Generate random topic name
|
||||
export RANDTOPIC=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
|
||||
|
||||
# enable the EXTRA_EXITCHECK only if really needed - otherwise spams the test log
|
||||
# too much
|
||||
#export EXTRA_EXITCHECK=dumpkafkalogs
|
||||
# Set EXTRA_EXITCHECK to dump kafka/zookeeperlogfiles on failure only.
|
||||
export EXTRA_EXITCHECK=dumpkafkalogs
|
||||
export EXTRA_EXIT=kafka
|
||||
echo ===============================================================================
|
||||
echo Check and Stop previous instances of kafka/zookeeper
|
||||
. $srcdir/diag.sh download-kafka
|
||||
. $srcdir/diag.sh stop-zookeeper
|
||||
. $srcdir/diag.sh stop-kafka
|
||||
|
||||
echo Init Testbench
|
||||
. $srcdir/diag.sh init
|
||||
check_command_available kafkacat
|
||||
download_kafka
|
||||
stop_zookeeper
|
||||
stop_kafka
|
||||
|
||||
echo Create kafka/zookeeper instance and $RANDTOPIC topic
|
||||
. $srcdir/diag.sh start-zookeeper
|
||||
. $srcdir/diag.sh start-kafka
|
||||
# create new topic
|
||||
. $srcdir/diag.sh create-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
echo Give Kafka some time to process topic create ...
|
||||
sleep 5
|
||||
start_zookeeper
|
||||
start_kafka
|
||||
create_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
# --- Create/Start omkafka sender config
|
||||
export RSYSLOG_DEBUGLOG="log"
|
||||
@ -79,16 +76,10 @@ kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%s'> $RSYSLOG_O
|
||||
kafkacat -b localhost:29092 -e -C -o beginning -t $RANDTOPIC -f '%p@%o:%k:%s' > $RSYSLOG_OUT_LOG.extra
|
||||
|
||||
# Delete topic to remove old traces before
|
||||
# . $srcdir/diag.sh delete-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
# Dump Kafka log | uncomment if needed
|
||||
# . $srcdir/diag.sh dump-kafka-serverlog
|
||||
|
||||
echo stop kafka instance
|
||||
. $srcdir/diag.sh stop-kafka
|
||||
|
||||
# STOP ZOOKEEPER in any case
|
||||
. $srcdir/diag.sh stop-zookeeper
|
||||
# dump_kafka_serverlog
|
||||
|
||||
# Do the final sequence check
|
||||
seq_check 1 $TESTMESSAGESFULL -d
|
||||
|
||||
@ -1,31 +1,30 @@
|
||||
#!/bin/bash
|
||||
# added 2017-05-03 by alorbach
|
||||
# This file is part of the rsyslog project, released under ASL 2.0
|
||||
echo Init Testbench
|
||||
. $srcdir/diag.sh init
|
||||
|
||||
# *** ==============================================================================
|
||||
export TESTMESSAGES=100000
|
||||
export TESTMESSAGESFULL=100000
|
||||
|
||||
# Generate random topic name
|
||||
export RANDTOPIC=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
|
||||
|
||||
# enable the EXTRA_EXITCHECK only if really needed - otherwise spams the test log
|
||||
# too much
|
||||
# export EXTRA_EXITCHECK=dumpkafkalogs
|
||||
# Set EXTRA_EXITCHECK to dump kafka/zookeeperlogfiles on failure only.
|
||||
export EXTRA_EXITCHECK=dumpkafkalogs
|
||||
export EXTRA_EXIT=kafka
|
||||
echo ===============================================================================
|
||||
echo Check and Stop previous instances of kafka/zookeeper
|
||||
. $srcdir/diag.sh download-kafka
|
||||
. $srcdir/diag.sh stop-zookeeper
|
||||
. $srcdir/diag.sh stop-kafka
|
||||
|
||||
echo Init Testbench
|
||||
. $srcdir/diag.sh init
|
||||
download_kafka
|
||||
stop_zookeeper
|
||||
stop_kafka
|
||||
|
||||
echo Create kafka/zookeeper instance and topics
|
||||
. $srcdir/diag.sh start-zookeeper
|
||||
. $srcdir/diag.sh start-kafka
|
||||
start_zookeeper
|
||||
start_kafka
|
||||
# create new topic
|
||||
. $srcdir/diag.sh create-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
echo Give Kafka some time to process topic create ...
|
||||
sleep 5
|
||||
create_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
# --- Create/Start omkafka sender config
|
||||
export RSYSLOG_DEBUGLOG="log"
|
||||
@ -112,16 +111,10 @@ shutdown_when_empty 2
|
||||
wait_shutdown 2
|
||||
|
||||
# Delete topic to remove old traces before
|
||||
. $srcdir/diag.sh delete-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
# Dump Kafka log | uncomment if needed
|
||||
# . $srcdir/diag.sh dump-kafka-serverlog
|
||||
|
||||
echo stop kafka instance
|
||||
. $srcdir/diag.sh stop-kafka
|
||||
|
||||
# STOP ZOOKEEPER in any case
|
||||
. $srcdir/diag.sh stop-zookeeper
|
||||
# dump_kafka_serverlog
|
||||
|
||||
# Do the final sequence check
|
||||
seq_check 1 $TESTMESSAGESFULL -d
|
||||
|
||||
@ -2,6 +2,10 @@
|
||||
# added 2017-05-18 by alorbach
|
||||
# This test only tests what happens when kafka cluster fails
|
||||
# This file is part of the rsyslog project, released under ASL 2.0
|
||||
echo Init Testbench
|
||||
. $srcdir/diag.sh init
|
||||
|
||||
# *** ==============================================================================
|
||||
export TESTMESSAGES=50000
|
||||
export TESTMESSAGES2=50001
|
||||
export TESTMESSAGESFULL=100000
|
||||
@ -9,25 +13,22 @@ export TESTMESSAGESFULL=100000
|
||||
# Generate random topic name
|
||||
export RANDTOPIC=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
|
||||
|
||||
# Set EXTRA_EXITCHECK to dump kafka/zookeeperlogfiles on failure only.
|
||||
export EXTRA_EXITCHECK=dumpkafkalogs
|
||||
export EXTRA_EXIT=kafka
|
||||
echo ===============================================================================
|
||||
echo Check and Stop previous instances of kafka/zookeeper
|
||||
. $srcdir/diag.sh download-kafka
|
||||
. $srcdir/diag.sh stop-zookeeper
|
||||
. $srcdir/diag.sh stop-kafka
|
||||
|
||||
echo Init Testbench
|
||||
. $srcdir/diag.sh init
|
||||
download_kafka
|
||||
stop_zookeeper
|
||||
stop_kafka
|
||||
|
||||
echo Create kafka/zookeeper instance and topics
|
||||
. $srcdir/diag.sh start-zookeeper
|
||||
. $srcdir/diag.sh start-kafka
|
||||
. $srcdir/diag.sh create-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
echo Give Kafka some time to process topic create ...
|
||||
sleep 5
|
||||
start_zookeeper
|
||||
start_kafka
|
||||
create_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
echo Stopping kafka cluster instance
|
||||
. $srcdir/diag.sh stop-kafka
|
||||
stop_kafka
|
||||
|
||||
# --- Create imkafka receiver config
|
||||
export RSYSLOG_DEBUGLOG="log"
|
||||
@ -103,7 +104,7 @@ echo Inject messages into rsyslog sender instance
|
||||
injectmsg 1 $TESTMESSAGES
|
||||
|
||||
echo Starting kafka cluster instance
|
||||
. $srcdir/diag.sh start-kafka
|
||||
start_kafka
|
||||
|
||||
echo Sleep to give rsyslog instances time to process data ...
|
||||
sleep 5
|
||||
@ -123,13 +124,7 @@ shutdown_when_empty
|
||||
wait_shutdown
|
||||
|
||||
echo delete kafka topics
|
||||
. $srcdir/diag.sh delete-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
echo stop kafka instance
|
||||
. $srcdir/diag.sh stop-kafka
|
||||
|
||||
# STOP ZOOKEEPER in any case
|
||||
. $srcdir/diag.sh stop-zookeeper
|
||||
delete_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
# Do the final sequence check
|
||||
seq_check2 1 $TESTMESSAGESFULL -d
|
||||
|
||||
@ -2,28 +2,29 @@
|
||||
# added 2017-06-06 by alorbach
|
||||
# This tests the keepFailedMessages feature in omkafka
|
||||
# This file is part of the rsyslog project, released under ASL 2.0
|
||||
echo Init Testbench
|
||||
. $srcdir/diag.sh init
|
||||
|
||||
# *** ==============================================================================
|
||||
export TESTMESSAGES=50000
|
||||
export TESTMESSAGESFULL=50000
|
||||
|
||||
# Generate random topic name
|
||||
export RANDTOPIC=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
|
||||
|
||||
# Set EXTRA_EXITCHECK to dump kafka/zookeeperlogfiles on failure only.
|
||||
export EXTRA_EXITCHECK=dumpkafkalogs
|
||||
export EXTRA_EXIT=kafka
|
||||
echo ===============================================================================
|
||||
echo Check and Stop previous instances of kafka/zookeeper
|
||||
. $srcdir/diag.sh download-kafka
|
||||
. $srcdir/diag.sh stop-zookeeper
|
||||
. $srcdir/diag.sh stop-kafka
|
||||
|
||||
echo Init Testbench
|
||||
. $srcdir/diag.sh init
|
||||
download_kafka
|
||||
stop_zookeeper
|
||||
stop_kafka
|
||||
|
||||
echo Create kafka/zookeeper instance and topics
|
||||
. $srcdir/diag.sh start-zookeeper
|
||||
. $srcdir/diag.sh start-kafka
|
||||
. $srcdir/diag.sh create-kafka-topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
echo Give Kafka some time to process topic create ...
|
||||
sleep 5
|
||||
start_zookeeper
|
||||
start_kafka
|
||||
create_kafka_topic $RANDTOPIC '.dep_wrk' '22181'
|
||||
|
||||
# --- Create omkafka receiver config
|
||||
export RSYSLOG_DEBUGLOG="log"
|
||||
@ -95,14 +96,14 @@ echo Inject messages into rsyslog sender instance
|
||||
injectmsg 1 $TESTMESSAGES
|
||||
|
||||
echo Stopping kafka cluster instance
|
||||
. $srcdir/diag.sh stop-kafka
|
||||
stop_kafka
|
||||
|
||||
echo Stopping sender instance [imkafka]
|
||||
shutdown_when_empty 2
|
||||
wait_shutdown 2
|
||||
|
||||
echo Starting kafka cluster instance
|
||||
. $srcdir/diag.sh start-kafka
|
||||
start_kafka
|
||||
|
||||
echo Sleep to give rsyslog instances time to process data ...
|
||||
sleep 5
|
||||
@ -123,13 +124,7 @@ shutdown_when_empty
|
||||
wait_shutdown
|
||||
|
||||
echo delete kafka topics
|
||||
. $srcdir/diag.sh delete-kafka-topic 'static' '.dep_wrk' '22181'
|
||||
|
||||
echo stop kafka instance
|
||||
. $srcdir/diag.sh stop-kafka
|
||||
|
||||
# STOP ZOOKEEPER in any case
|
||||
. $srcdir/diag.sh stop-zookeeper
|
||||
delete_kafka_topic 'static' '.dep_wrk' '22181'
|
||||
|
||||
# Do the final sequence check
|
||||
seq_check 1 $TESTMESSAGESFULL
|
||||
|
||||
@ -1,35 +1,37 @@
|
||||
#!/bin/bash
|
||||
# added 2017-05-08 by alorbach
|
||||
# This file is part of the rsyslog project, released under ASL 2.0
|
||||
echo Init Testbench
|
||||
. $srcdir/diag.sh init
|
||||
|
||||
# *** ==============================================================================
|
||||
export TESTMESSAGES=100000
|
||||
export TESTMESSAGESFULL=100000
|
||||
|
||||
# Generate random topic name
|
||||
export RANDTOPIC=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
|
||||
|
||||
# Set EXTRA_EXITCHECK to dump kafka/zookeeperlogfiles on failure only.
|
||||
export EXTRA_EXITCHECK=dumpkafkalogs
|
||||
export EXTRA_EXIT=kafkamulti
|
||||
echo ===============================================================================
|
||||
echo Check and Stop previous instances of kafka/zookeeper
|
||||
. $srcdir/diag.sh download-kafka
|
||||
. $srcdir/diag.sh stop-zookeeper '.dep_wrk1'
|
||||
. $srcdir/diag.sh stop-zookeeper '.dep_wrk2'
|
||||
. $srcdir/diag.sh stop-zookeeper '.dep_wrk3'
|
||||
. $srcdir/diag.sh stop-kafka '.dep_wrk1'
|
||||
. $srcdir/diag.sh stop-kafka '.dep_wrk2'
|
||||
. $srcdir/diag.sh stop-kafka '.dep_wrk3'
|
||||
|
||||
. $srcdir/diag.sh init
|
||||
download_kafka
|
||||
stop_zookeeper '.dep_wrk1'
|
||||
stop_zookeeper '.dep_wrk2'
|
||||
stop_zookeeper '.dep_wrk3'
|
||||
stop_kafka '.dep_wrk1'
|
||||
stop_kafka '.dep_wrk2'
|
||||
stop_kafka '.dep_wrk3'
|
||||
|
||||
echo Create kafka/zookeeper instance and topics
|
||||
. $srcdir/diag.sh start-zookeeper '.dep_wrk1'
|
||||
. $srcdir/diag.sh start-zookeeper '.dep_wrk2'
|
||||
. $srcdir/diag.sh start-zookeeper '.dep_wrk3'
|
||||
. $srcdir/diag.sh start-kafka '.dep_wrk1'
|
||||
. $srcdir/diag.sh start-kafka '.dep_wrk2'
|
||||
. $srcdir/diag.sh start-kafka '.dep_wrk3'
|
||||
. $srcdir/diag.sh create-kafka-topic $RANDTOPIC '.dep_wrk1' '22181'
|
||||
|
||||
echo Give Kafka some time to process topic create ...
|
||||
sleep 5
|
||||
start_zookeeper '.dep_wrk1'
|
||||
start_zookeeper '.dep_wrk2'
|
||||
start_zookeeper '.dep_wrk3'
|
||||
start_kafka '.dep_wrk1'
|
||||
start_kafka '.dep_wrk2'
|
||||
start_kafka '.dep_wrk3'
|
||||
create_kafka_topic $RANDTOPIC '.dep_wrk1' '22181'
|
||||
|
||||
# --- Create omkafka sender config
|
||||
export RSYSLOG_DEBUGLOG="log"
|
||||
@ -119,15 +121,7 @@ shutdown_when_empty 2
|
||||
wait_shutdown 2
|
||||
|
||||
echo delete kafka topics
|
||||
. $srcdir/diag.sh delete-kafka-topic $RANDTOPIC '.dep_wrk1' '22181'
|
||||
|
||||
echo stop kafka instances
|
||||
. $srcdir/diag.sh stop-kafka '.dep_wrk1'
|
||||
. $srcdir/diag.sh stop-kafka '.dep_wrk2'
|
||||
. $srcdir/diag.sh stop-kafka '.dep_wrk3'
|
||||
. $srcdir/diag.sh stop-zookeeper '.dep_wrk1'
|
||||
. $srcdir/diag.sh stop-zookeeper '.dep_wrk2'
|
||||
. $srcdir/diag.sh stop-zookeeper '.dep_wrk3'
|
||||
delete_kafka_topic $RANDTOPIC '.dep_wrk1' '22181'
|
||||
|
||||
# Do the final sequence check
|
||||
seq_check 1 $TESTMESSAGESFULL -d
|
||||
|
||||
@ -1,6 +1,10 @@
|
||||
#!/bin/bash
|
||||
# added 2018-08-13 by alorbach
|
||||
# This file is part of the rsyslog project, released under ASL 2.0
|
||||
echo Init Testbench
|
||||
. $srcdir/diag.sh init
|
||||
|
||||
# *** ==============================================================================
|
||||
export TESTMESSAGES=50000
|
||||
export TESTMESSAGESFULL=100000
|
||||
|
||||
@ -8,25 +12,20 @@ export TESTMESSAGESFULL=100000
|
||||
export RANDTOPIC1=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
|
||||
export RANDTOPIC2=$(cat /dev/urandom | tr -dc 'a-zA-Z0-9' | fold -w 8 | head -n 1)
|
||||
|
||||
# enable the EXTRA_EXITCHECK only if really needed - otherwise spams the test log
|
||||
# too much
|
||||
#export EXTRA_EXITCHECK=dumpkafkalogs
|
||||
# Set EXTRA_EXITCHECK to dump kafka/zookeeperlogfiles on failure only.
|
||||
export EXTRA_EXITCHECK=dumpkafkalogs
|
||||
export EXTRA_EXIT=kafka
|
||||
echo ===============================================================================
|
||||
echo Check and Stop previous instances of kafka/zookeeper
|
||||
. $srcdir/diag.sh download-kafka
|
||||
. $srcdir/diag.sh stop-zookeeper
|
||||
. $srcdir/diag.sh stop-kafka
|
||||
|
||||
echo Init Testbench
|
||||
. $srcdir/diag.sh init
|
||||
download_kafka
|
||||
stop_zookeeper
|
||||
stop_kafka
|
||||
|
||||
echo Create kafka/zookeeper instance and topics
|
||||
. $srcdir/diag.sh start-zookeeper
|
||||
. $srcdir/diag.sh start-kafka
|
||||
. $srcdir/diag.sh create-kafka-topic $RANDTOPIC1 '.dep_wrk' '22181'
|
||||
. $srcdir/diag.sh create-kafka-topic $RANDTOPIC2 '.dep_wrk' '22181'
|
||||
|
||||
echo Give Kafka some time to process topic create ...
|
||||
sleep 5
|
||||
start_zookeeper
|
||||
start_kafka
|
||||
create_kafka_topic $RANDTOPIC1 '.dep_wrk' '22181'
|
||||
create_kafka_topic $RANDTOPIC2 '.dep_wrk' '22181'
|
||||
|
||||
# --- Create omkafka sender config
|
||||
export RSYSLOG_DEBUGLOG="log"
|
||||
@ -146,17 +145,11 @@ shutdown_when_empty 2
|
||||
wait_shutdown 2
|
||||
|
||||
echo delete kafka topics
|
||||
. $srcdir/diag.sh delete-kafka-topic $RANDTOPIC1 '.dep_wrk' '22181'
|
||||
. $srcdir/diag.sh delete-kafka-topic $RANDTOPIC2 '.dep_wrk' '22181'
|
||||
delete_kafka_topic $RANDTOPIC1 '.dep_wrk' '22181'
|
||||
delete_kafka_topic $RANDTOPIC2 '.dep_wrk' '22181'
|
||||
|
||||
# Dump Kafka log | uncomment if needed
|
||||
# . $srcdir/diag.sh dump-kafka-serverlog
|
||||
|
||||
echo stop kafka instance
|
||||
. $srcdir/diag.sh stop-kafka
|
||||
|
||||
# STOP ZOOKEEPER in any case
|
||||
. $srcdir/diag.sh stop-zookeeper
|
||||
# dump_kafka_serverlog
|
||||
|
||||
# Do the final sequence check
|
||||
seq_check 1 $TESTMESSAGES -d
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user