testbench: Changed kafka server to kafka_2.11-2.0.0.tgz

Also adjusted some kafka server settings related to log retention.
Hardened stop function for kafka and zookeeper.
See also https://github.com/rsyslog/rsyslog/issues/3057
This commit is contained in:
Andre Lorbach 2018-10-01 12:40:26 +02:00
parent 98289652be
commit bae734abdd
5 changed files with 95 additions and 25 deletions

View File

@ -841,8 +841,8 @@ dep_zk_cached_file=$dep_cache_dir/zookeeper-3.4.13.tar.gz
# makes creating testbench with single kafka instances difficult. # makes creating testbench with single kafka instances difficult.
# old version -> dep_kafka_url=http://www-us.apache.org/dist/kafka/0.10.2.2/kafka_2.12-0.10.2.2.tgz # old version -> dep_kafka_url=http://www-us.apache.org/dist/kafka/0.10.2.2/kafka_2.12-0.10.2.2.tgz
# old version -> dep_kafka_cached_file=$dep_cache_dir/kafka_2.12-0.10.2.2.tgz # old version -> dep_kafka_cached_file=$dep_cache_dir/kafka_2.12-0.10.2.2.tgz
dep_kafka_url=http://www-us.apache.org/dist/kafka/2.0.0/kafka_2.12-2.0.0.tgz dep_kafka_url=http://www-us.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz
dep_kafka_cached_file=$dep_cache_dir/kafka_2.12-2.0.0.tgz dep_kafka_cached_file=$dep_cache_dir/kafka_2.11-2.0.0.tgz
if [ -z "$ES_DOWNLOAD" ]; then if [ -z "$ES_DOWNLOAD" ]; then
export ES_DOWNLOAD=elasticsearch-5.6.9.tar.gz export ES_DOWNLOAD=elasticsearch-5.6.9.tar.gz
@ -868,21 +868,21 @@ function kafka_exit_handling() {
# Extended Exit handling for kafka / zookeeper instances # Extended Exit handling for kafka / zookeeper instances
if [[ "$EXTRA_EXIT" == 'kafka' ]]; then if [[ "$EXTRA_EXIT" == 'kafka' ]]; then
echo stop kafka instance echo "stop kafka instance"
stop_kafka '.dep_wrk' $1 stop_kafka '.dep_wrk' $1
echo stop zookeeper instance echo "stop zookeeper instance"
stop_zookeeper '.dep_wrk' $1 stop_zookeeper '.dep_wrk' $1
fi fi
# Extended Exit handling for kafka / zookeeper instances # Extended Exit handling for kafka / zookeeper instances
if [[ "$EXTRA_EXIT" == 'kafkamulti' ]]; then if [[ "$EXTRA_EXIT" == 'kafkamulti' ]]; then
echo stop kafka instances echo "stop kafka instances"
stop_kafka '.dep_wrk1' $1 stop_kafka '.dep_wrk1' $1
stop_kafka '.dep_wrk2' $1 stop_kafka '.dep_wrk2' $1
stop_kafka '.dep_wrk3' $1 stop_kafka '.dep_wrk3' $1
echo stop zookeeper instances echo "stop zookeeper instances"
stop_zookeeper '.dep_wrk1' $1 stop_zookeeper '.dep_wrk1' $1
stop_zookeeper '.dep_wrk2' $1 stop_zookeeper '.dep_wrk2' $1
stop_zookeeper '.dep_wrk3' $1 stop_zookeeper '.dep_wrk3' $1
@ -923,18 +923,47 @@ function download_kafka() {
} }
function stop_kafka() { function stop_kafka() {
i=0
if [ "x$1" == "x" ]; then if [ "x$1" == "x" ]; then
dep_work_dir=$(readlink -f .dep_wrk) dep_work_dir=$(readlink -f .dep_wrk)
dep_work_kafka_config="kafka-server.properties"
else else
dep_work_dir=$(readlink -f $srcdir/$1) dep_work_dir=$(readlink -f $srcdir/$1)
if [[ ".dep_wrk" != "$1" ]]; then
dep_work_kafka_config="kafka-server$1.properties"
else
dep_work_kafka_config="kafka-server.properties"
fi
fi fi
if [ ! -d $dep_work_dir/kafka ]; then if [ ! -d $dep_work_dir/kafka ]; then
echo "Kafka work-dir $dep_work_dir/kafka does not exist, no action needed" echo "Kafka work-dir $dep_work_dir/kafka does not exist, no action needed"
else else
echo "Stopping Kafka instance $1" # Get kafka pid instance
(cd $dep_work_dir/kafka && ./bin/kafka-server-stop.sh) kafkapid=$(ps aux | grep -i $dep_work_kafka_config | grep java | grep -v grep | awk '{print $2}')
echo "Stopping Kafka instance $1 ($dep_work_kafka_config/$kafkapid)"
kill $kafkapid
# Check if kafka instance went down!
while true; do
#echo "waiting for propper shutdown for $dep_work_kafka_config / $kafkapid "
kafkapid=$(ps aux | grep -i $dep_work_kafka_config | grep java | grep -v grep | awk '{print $2}')
if [[ "" != "$kafkapid" ]]; then
$TESTTOOL_DIR/msleep 100 # wait 100 milliseconds
if test $i -gt $TB_TIMEOUT_STARTSTOP; then
echo "Kafka instance $dep_work_kafka_config (PID $kafkapid) still running - Performing hard shutdown (-9)"
kill -9 $kafkapid
break
fi
let "i++"
else
# Break the loop
break
fi
done
if [[ "$2" == 'true' ]]; then if [[ "$2" == 'true' ]]; then
$TESTTOOL_DIR/msleep 2000 # Prozess shutdown, do cleanup now
cleanup_kafka $1 cleanup_kafka $1
fi fi
fi fi
@ -955,15 +984,52 @@ function cleanup_kafka() {
} }
function stop_zookeeper() { function stop_zookeeper() {
i=0
if [ "x$1" == "x" ]; then if [ "x$1" == "x" ]; then
dep_work_dir=$(readlink -f .dep_wrk) dep_work_dir=$(readlink -f .dep_wrk)
dep_work_tk_config="zoo.cfg"
else else
dep_work_dir=$(readlink -f $srcdir/$1) dep_work_dir=$(readlink -f $srcdir/$1)
if [[ ".dep_wrk" != "$1" ]]; then
dep_work_tk_config="zoo$1.cfg"
else
dep_work_tk_config="zoo.cfg"
fi
fi fi
(cd $dep_work_dir/zk &> /dev/null && ./bin/zkServer.sh stop)
if [[ "$2" == 'true' ]]; then if [ ! -d $dep_work_dir/zk ]; then
$TESTTOOL_DIR/msleep 2000 echo "Zookeeper work-dir $dep_work_dir/zk does not exist, no action needed"
cleanup_zookeeper $1 else
# Get Zookeeper pid instance
zkpid=$(ps aux | grep -i $dep_work_tk_config | grep java | grep -v grep | awk '{print $2}')
echo "Stopping Zookeeper instance $1 ($dep_work_tk_config/$zkpid)"
kill $zkpid
# Check if Zookeeper instance went down!
zkpid=$(ps aux | grep -i $dep_work_tk_config | grep java | grep -v grep | awk '{print $2}')
if [[ "" != "$zkpid" ]]; then
while true; do
#echo "waiting for propper shutdown for $dep_work_tk_config / $zkpid "
zkpid=$(ps aux | grep -i $dep_work_tk_config | grep java | grep -v grep | awk '{print $2}')
if [[ "" != "$zkpid" ]]; then
$TESTTOOL_DIR/msleep 100 # wait 100 milliseconds
if test $i -gt $TB_TIMEOUT_STARTSTOP; then
echo "Zookeeper instance $dep_work_tk_config (PID $zkpid) still running - Performing hard shutdown (-9)"
kill -9 $zkpid
break
fi
let "i++"
else
# Break the loop
break
fi
done
fi
if [[ "$2" == 'true' ]]; then
# Prozess shutdown, do cleanup now
cleanup_zookeeper $1
fi
fi fi
} }
@ -1033,19 +1099,19 @@ function start_kafka() {
$TESTTOOL_DIR/msleep 4000 $TESTTOOL_DIR/msleep 4000
# Check if kafka instance came up! # Check if kafka instance came up!
kafkapid=$(ps aux | grep -i $dep_work_kafka_config | grep java | grep -v grep | awk '{print $1}') kafkapid=$(ps aux | grep -i $dep_work_kafka_config | grep java | grep -v grep | awk '{print $2}')
if [[ "" != "$kafkapid" ]]; if [[ "" != "$kafkapid" ]];
then then
echo "Kafka instance $dep_work_kafka_config started with PID $kafkapid" echo "Kafka instance $dep_work_kafka_config (PID $kafkapid) started ... "
else else
echo "Starting Kafka instance $dep_work_kafka_config, SECOND ATTEMPT!" echo "Starting Kafka instance $dep_work_kafka_config, SECOND ATTEMPT!"
(cd $dep_work_dir/kafka && ./bin/kafka-server-start.sh -daemon ./config/$dep_work_kafka_config) (cd $dep_work_dir/kafka && ./bin/kafka-server-start.sh -daemon ./config/$dep_work_kafka_config)
$TESTTOOL_DIR/msleep 4000 $TESTTOOL_DIR/msleep 4000
kafkapid=$(ps aux | grep -i $dep_work_kafka_config | grep java | grep -v grep | awk '{print $1}') kafkapid=$(ps aux | grep -i $dep_work_kafka_config | grep java | grep -v grep | awk '{print $2}')
if [[ "" != "$kafkapid" ]]; if [[ "" != "$kafkapid" ]];
then then
echo "Kafka instance $dep_work_kafka_config started with PID $kafkapid" echo "Kafka instance $dep_work_kafka_config (PID $kafkapid) started ... "
else else
echo "Failed to start Kafka instance for $dep_work_kafka_config" echo "Failed to start Kafka instance for $dep_work_kafka_config"
error_exit 77 error_exit 77

View File

@ -27,8 +27,8 @@ log.index.size.max.bytes=104857600
log.message.timestamp.type=CreateTime log.message.timestamp.type=CreateTime
log.retention.check.interval.ms=300000 log.retention.check.interval.ms=300000
log.retention.bytes=104857600 log.retention.bytes=104857600
log.retention.hours=5000 log.retention.hours=10000
log.roll.hours=168 log.roll.hours=5000
message.max.bytes=1000000 message.max.bytes=1000000
num.network.threads=2 num.network.threads=2
@ -45,6 +45,7 @@ socket.send.buffer.bytes=102400
offsets.storage=kafka offsets.storage=kafka
offsets.topic.num.partitions=1 offsets.topic.num.partitions=1
offsets.topic.replication.factor=3 offsets.topic.replication.factor=3
offsets.retention.minutes=10080
transaction.state.log.num.partitions=1 transaction.state.log.num.partitions=1
replica.fetch.max.bytes=10485760 replica.fetch.max.bytes=10485760

View File

@ -27,8 +27,8 @@ log.index.size.max.bytes=104857600
log.message.timestamp.type=CreateTime log.message.timestamp.type=CreateTime
log.retention.check.interval.ms=300000 log.retention.check.interval.ms=300000
log.retention.bytes=104857600 log.retention.bytes=104857600
log.retention.hours=5000 log.retention.hours=10000
log.roll.hours=168 log.roll.hours=5000
message.max.bytes=1000000 message.max.bytes=1000000
num.network.threads=2 num.network.threads=2
@ -45,6 +45,7 @@ socket.send.buffer.bytes=102400
offsets.storage=kafka offsets.storage=kafka
offsets.topic.num.partitions=1 offsets.topic.num.partitions=1
offsets.topic.replication.factor=3 offsets.topic.replication.factor=3
offsets.retention.minutes=10080
transaction.state.log.num.partitions=1 transaction.state.log.num.partitions=1
replica.fetch.max.bytes=10485760 replica.fetch.max.bytes=10485760

View File

@ -27,8 +27,8 @@ log.index.size.max.bytes=104857600
log.message.timestamp.type=CreateTime log.message.timestamp.type=CreateTime
log.retention.check.interval.ms=300000 log.retention.check.interval.ms=300000
log.retention.bytes=104857600 log.retention.bytes=104857600
log.retention.hours=5000 log.retention.hours=10000
log.roll.hours=168 log.roll.hours=5000
message.max.bytes=1000000 message.max.bytes=1000000
num.network.threads=2 num.network.threads=2
@ -45,6 +45,7 @@ socket.send.buffer.bytes=102400
offsets.storage=kafka offsets.storage=kafka
offsets.topic.num.partitions=1 offsets.topic.num.partitions=1
offsets.topic.replication.factor=3 offsets.topic.replication.factor=3
offsets.retention.minutes=10080
transaction.state.log.num.partitions=1 transaction.state.log.num.partitions=1
replica.fetch.max.bytes=10485760 replica.fetch.max.bytes=10485760

View File

@ -27,8 +27,8 @@ log.index.size.max.bytes=104857600
log.message.timestamp.type=CreateTime log.message.timestamp.type=CreateTime
log.retention.check.interval.ms=300000 log.retention.check.interval.ms=300000
log.retention.bytes=104857600 log.retention.bytes=104857600
log.retention.hours=5000 log.retention.hours=10000
log.roll.hours=168 log.roll.hours=5000
message.max.bytes=1000000 message.max.bytes=1000000
num.network.threads=2 num.network.threads=2
@ -45,6 +45,7 @@ socket.send.buffer.bytes=102400
offsets.storage=kafka offsets.storage=kafka
offsets.topic.num.partitions=2 offsets.topic.num.partitions=2
offsets.topic.replication.factor=1 offsets.topic.replication.factor=1
offsets.retention.minutes=10080
transaction.state.log.num.partitions=2 transaction.state.log.num.partitions=2
replica.fetch.max.bytes=10485760 replica.fetch.max.bytes=10485760