From bae734abdd4fd8b880e4dfed15f3b2f09250ea3c Mon Sep 17 00:00:00 2001 From: Andre Lorbach Date: Mon, 1 Oct 2018 12:40:26 +0200 Subject: [PATCH] testbench: Changed kafka server to kafka_2.11-2.0.0.tgz Also adjusted some kafka server settings related to log retention. Hardened stop function for kafka and zookeeper. See also https://github.com/rsyslog/rsyslog/issues/3057 --- tests/diag.sh | 100 +++++++++++++++--- .../kafka-server.dep_wrk1.properties | 5 +- .../kafka-server.dep_wrk2.properties | 5 +- .../kafka-server.dep_wrk3.properties | 5 +- tests/testsuites/kafka-server.properties | 5 +- 5 files changed, 95 insertions(+), 25 deletions(-) diff --git a/tests/diag.sh b/tests/diag.sh index be89b63e6..37cf4cd7d 100755 --- a/tests/diag.sh +++ b/tests/diag.sh @@ -841,8 +841,8 @@ dep_zk_cached_file=$dep_cache_dir/zookeeper-3.4.13.tar.gz # makes creating testbench with single kafka instances difficult. # old version -> dep_kafka_url=http://www-us.apache.org/dist/kafka/0.10.2.2/kafka_2.12-0.10.2.2.tgz # old version -> dep_kafka_cached_file=$dep_cache_dir/kafka_2.12-0.10.2.2.tgz -dep_kafka_url=http://www-us.apache.org/dist/kafka/2.0.0/kafka_2.12-2.0.0.tgz -dep_kafka_cached_file=$dep_cache_dir/kafka_2.12-2.0.0.tgz +dep_kafka_url=http://www-us.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz +dep_kafka_cached_file=$dep_cache_dir/kafka_2.11-2.0.0.tgz if [ -z "$ES_DOWNLOAD" ]; then export ES_DOWNLOAD=elasticsearch-5.6.9.tar.gz @@ -868,21 +868,21 @@ function kafka_exit_handling() { # Extended Exit handling for kafka / zookeeper instances if [[ "$EXTRA_EXIT" == 'kafka' ]]; then - echo stop kafka instance + echo "stop kafka instance" stop_kafka '.dep_wrk' $1 - echo stop zookeeper instance + echo "stop zookeeper instance" stop_zookeeper '.dep_wrk' $1 fi # Extended Exit handling for kafka / zookeeper instances if [[ "$EXTRA_EXIT" == 'kafkamulti' ]]; then - echo stop kafka instances + echo "stop kafka instances" stop_kafka '.dep_wrk1' $1 stop_kafka '.dep_wrk2' $1 stop_kafka '.dep_wrk3' $1 - echo stop zookeeper instances + echo "stop zookeeper instances" stop_zookeeper '.dep_wrk1' $1 stop_zookeeper '.dep_wrk2' $1 stop_zookeeper '.dep_wrk3' $1 @@ -923,18 +923,47 @@ function download_kafka() { } function stop_kafka() { + i=0 if [ "x$1" == "x" ]; then dep_work_dir=$(readlink -f .dep_wrk) + dep_work_kafka_config="kafka-server.properties" else dep_work_dir=$(readlink -f $srcdir/$1) + if [[ ".dep_wrk" != "$1" ]]; then + dep_work_kafka_config="kafka-server$1.properties" + else + dep_work_kafka_config="kafka-server.properties" + fi fi if [ ! -d $dep_work_dir/kafka ]; then echo "Kafka work-dir $dep_work_dir/kafka does not exist, no action needed" else - echo "Stopping Kafka instance $1" - (cd $dep_work_dir/kafka && ./bin/kafka-server-stop.sh) + # Get kafka pid instance + kafkapid=$(ps aux | grep -i $dep_work_kafka_config | grep java | grep -v grep | awk '{print $2}') + + echo "Stopping Kafka instance $1 ($dep_work_kafka_config/$kafkapid)" + kill $kafkapid + + # Check if kafka instance went down! + while true; do +#echo "waiting for propper shutdown for $dep_work_kafka_config / $kafkapid " + kafkapid=$(ps aux | grep -i $dep_work_kafka_config | grep java | grep -v grep | awk '{print $2}') + if [[ "" != "$kafkapid" ]]; then + $TESTTOOL_DIR/msleep 100 # wait 100 milliseconds + if test $i -gt $TB_TIMEOUT_STARTSTOP; then + echo "Kafka instance $dep_work_kafka_config (PID $kafkapid) still running - Performing hard shutdown (-9)" + kill -9 $kafkapid + break + fi + let "i++" + else + # Break the loop + break + fi + done + if [[ "$2" == 'true' ]]; then - $TESTTOOL_DIR/msleep 2000 + # Prozess shutdown, do cleanup now cleanup_kafka $1 fi fi @@ -955,15 +984,52 @@ function cleanup_kafka() { } function stop_zookeeper() { + i=0 if [ "x$1" == "x" ]; then dep_work_dir=$(readlink -f .dep_wrk) + dep_work_tk_config="zoo.cfg" else dep_work_dir=$(readlink -f $srcdir/$1) + if [[ ".dep_wrk" != "$1" ]]; then + dep_work_tk_config="zoo$1.cfg" + else + dep_work_tk_config="zoo.cfg" + fi fi - (cd $dep_work_dir/zk &> /dev/null && ./bin/zkServer.sh stop) - if [[ "$2" == 'true' ]]; then - $TESTTOOL_DIR/msleep 2000 - cleanup_zookeeper $1 + + if [ ! -d $dep_work_dir/zk ]; then + echo "Zookeeper work-dir $dep_work_dir/zk does not exist, no action needed" + else + # Get Zookeeper pid instance + zkpid=$(ps aux | grep -i $dep_work_tk_config | grep java | grep -v grep | awk '{print $2}') + echo "Stopping Zookeeper instance $1 ($dep_work_tk_config/$zkpid)" + kill $zkpid + + # Check if Zookeeper instance went down! + zkpid=$(ps aux | grep -i $dep_work_tk_config | grep java | grep -v grep | awk '{print $2}') + if [[ "" != "$zkpid" ]]; then + while true; do +#echo "waiting for propper shutdown for $dep_work_tk_config / $zkpid " + zkpid=$(ps aux | grep -i $dep_work_tk_config | grep java | grep -v grep | awk '{print $2}') + if [[ "" != "$zkpid" ]]; then + $TESTTOOL_DIR/msleep 100 # wait 100 milliseconds + if test $i -gt $TB_TIMEOUT_STARTSTOP; then + echo "Zookeeper instance $dep_work_tk_config (PID $zkpid) still running - Performing hard shutdown (-9)" + kill -9 $zkpid + break + fi + let "i++" + else + # Break the loop + break + fi + done + fi + + if [[ "$2" == 'true' ]]; then + # Prozess shutdown, do cleanup now + cleanup_zookeeper $1 + fi fi } @@ -1033,19 +1099,19 @@ function start_kafka() { $TESTTOOL_DIR/msleep 4000 # Check if kafka instance came up! - kafkapid=$(ps aux | grep -i $dep_work_kafka_config | grep java | grep -v grep | awk '{print $1}') + kafkapid=$(ps aux | grep -i $dep_work_kafka_config | grep java | grep -v grep | awk '{print $2}') if [[ "" != "$kafkapid" ]]; then - echo "Kafka instance $dep_work_kafka_config started with PID $kafkapid" + echo "Kafka instance $dep_work_kafka_config (PID $kafkapid) started ... " else echo "Starting Kafka instance $dep_work_kafka_config, SECOND ATTEMPT!" (cd $dep_work_dir/kafka && ./bin/kafka-server-start.sh -daemon ./config/$dep_work_kafka_config) $TESTTOOL_DIR/msleep 4000 - kafkapid=$(ps aux | grep -i $dep_work_kafka_config | grep java | grep -v grep | awk '{print $1}') + kafkapid=$(ps aux | grep -i $dep_work_kafka_config | grep java | grep -v grep | awk '{print $2}') if [[ "" != "$kafkapid" ]]; then - echo "Kafka instance $dep_work_kafka_config started with PID $kafkapid" + echo "Kafka instance $dep_work_kafka_config (PID $kafkapid) started ... " else echo "Failed to start Kafka instance for $dep_work_kafka_config" error_exit 77 diff --git a/tests/testsuites/kafka-server.dep_wrk1.properties b/tests/testsuites/kafka-server.dep_wrk1.properties index c691bc426..c4f34c79d 100644 --- a/tests/testsuites/kafka-server.dep_wrk1.properties +++ b/tests/testsuites/kafka-server.dep_wrk1.properties @@ -27,8 +27,8 @@ log.index.size.max.bytes=104857600 log.message.timestamp.type=CreateTime log.retention.check.interval.ms=300000 log.retention.bytes=104857600 -log.retention.hours=5000 -log.roll.hours=168 +log.retention.hours=10000 +log.roll.hours=5000 message.max.bytes=1000000 num.network.threads=2 @@ -45,6 +45,7 @@ socket.send.buffer.bytes=102400 offsets.storage=kafka offsets.topic.num.partitions=1 offsets.topic.replication.factor=3 +offsets.retention.minutes=10080 transaction.state.log.num.partitions=1 replica.fetch.max.bytes=10485760 diff --git a/tests/testsuites/kafka-server.dep_wrk2.properties b/tests/testsuites/kafka-server.dep_wrk2.properties index a5d7f0574..0216391fc 100644 --- a/tests/testsuites/kafka-server.dep_wrk2.properties +++ b/tests/testsuites/kafka-server.dep_wrk2.properties @@ -27,8 +27,8 @@ log.index.size.max.bytes=104857600 log.message.timestamp.type=CreateTime log.retention.check.interval.ms=300000 log.retention.bytes=104857600 -log.retention.hours=5000 -log.roll.hours=168 +log.retention.hours=10000 +log.roll.hours=5000 message.max.bytes=1000000 num.network.threads=2 @@ -45,6 +45,7 @@ socket.send.buffer.bytes=102400 offsets.storage=kafka offsets.topic.num.partitions=1 offsets.topic.replication.factor=3 +offsets.retention.minutes=10080 transaction.state.log.num.partitions=1 replica.fetch.max.bytes=10485760 diff --git a/tests/testsuites/kafka-server.dep_wrk3.properties b/tests/testsuites/kafka-server.dep_wrk3.properties index 22a3e98ff..858da77f4 100644 --- a/tests/testsuites/kafka-server.dep_wrk3.properties +++ b/tests/testsuites/kafka-server.dep_wrk3.properties @@ -27,8 +27,8 @@ log.index.size.max.bytes=104857600 log.message.timestamp.type=CreateTime log.retention.check.interval.ms=300000 log.retention.bytes=104857600 -log.retention.hours=5000 -log.roll.hours=168 +log.retention.hours=10000 +log.roll.hours=5000 message.max.bytes=1000000 num.network.threads=2 @@ -45,6 +45,7 @@ socket.send.buffer.bytes=102400 offsets.storage=kafka offsets.topic.num.partitions=1 offsets.topic.replication.factor=3 +offsets.retention.minutes=10080 transaction.state.log.num.partitions=1 replica.fetch.max.bytes=10485760 diff --git a/tests/testsuites/kafka-server.properties b/tests/testsuites/kafka-server.properties index 60a12b5ca..bb79fadea 100644 --- a/tests/testsuites/kafka-server.properties +++ b/tests/testsuites/kafka-server.properties @@ -27,8 +27,8 @@ log.index.size.max.bytes=104857600 log.message.timestamp.type=CreateTime log.retention.check.interval.ms=300000 log.retention.bytes=104857600 -log.retention.hours=5000 -log.roll.hours=168 +log.retention.hours=10000 +log.roll.hours=5000 message.max.bytes=1000000 num.network.threads=2 @@ -45,6 +45,7 @@ socket.send.buffer.bytes=102400 offsets.storage=kafka offsets.topic.num.partitions=2 offsets.topic.replication.factor=1 +offsets.retention.minutes=10080 transaction.state.log.num.partitions=2 replica.fetch.max.bytes=10485760