omkafka: Add ability to dump librdkafka statistics to a file
Use statsFile to specify the statistics output file; this also requires setting the statistics.interval.ms confParam to a non-zero value.
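A minimal configuration sketch (broker, topic, file path and interval value are
illustrative, not defaults):

    module(load="omkafka")
    action(type="omkafka" broker=["localhost:9092"] topic="mytopic"
           statsFile="/var/log/rsyslog/kafka-stats.json"
           confParam=["statistics.interval.ms=60000"])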
Setting the timestamp to 0 now lets kafka handle it.
Also added RD_KAFKA_V_KEY(NULL, 0) when no key is configured.
testbench: Changed the kafka server configuration property "log.retention.hours"
to 5000, which prevents the log cleaner from deleting our
records before they can be processed.
Fixed test configurations (socket timeouts with newer kafka).
Reactivated some kafka tests that work now.
omkafka: Fixed "closeTimeout" setting in action shutdown
commit 87c8f478ebd0111c9c8d521020acffa42a4508c9 fixed a missing mutex
definition, but did not do so via the macros originally intended for
this purpose. It was still very useful, as the patch made omkafka
compile on systems without atomic operations, and did so without
causing any real overhead.
Nevertheless, this commit now adjusts the code to use the way of doing
things intended by the rsyslog core.
omkafka emits many useful operational status messages only to the debug
log. After careful review, we have exposed many of these as user-visible error
and warning messages (for example: the librdkafka queue is full, so the user
knows why we suspend the plugin temporarily). This may have made the module too
chatty. If so, one can always filter out messages via configuration, as
sketched below. And if we really went overboard, we can undo those parts with
the next release. So IMHO it's better to give a bit more than less, as this
definitely eases troubleshooting for users.
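A minimal filtering sketch (the property test and message text are illustrative
and need to be adapted to the actual messages seen):

    # drop rsyslog-internal omkafka status messages we do not care about
    if $programname == "rsyslogd" and $msg contains "omkafka" then {
        stop
    }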
closes https://github.com/rsyslog/rsyslog/issues/2314
omkafka has several issues if multiple worker instances are used. This commit
makes the module use at most a single worker thread. Reasoning:
librdkafka creates background threads itself, so omkafka basically needs to move
memory buffers over to librdkafka, which then does the heavy lifting. As such, we
think that it is best to run at most one worker instance of omkafka -- otherwise we
just get additional locking (contention) overhead without any real gain. Consequently,
we use a global mutex for doAction which ensures only one worker can be active
at any given time. That mutex is also used to guard utility functions (like
tryResume) which may also be accessed by multiple workers in parallel.
Note: should this approach be changed, the kafka connection/suspension handling needs
to be refactored. The current code assumes that all workers share state information,
including librdkafka handles.
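For illustration, even an action queue configured with multiple workers gains no
extra parallelism here, because omkafka serializes doAction via its global mutex
(parameter values are illustrative):

    # the four queue workers still enter omkafka's doAction one at a time
    action(type="omkafka" broker=["localhost:9092"] topic="mytopic"
           queue.type="linkedlist" queue.workerThreads="4")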
closes https://github.com/rsyslog/rsyslog/issues/2313
If the kafka produce call fails when resubmitting messages, the message object
is duplicated. This potentially leads to a memory leak or message duplication
(not yet fully verified).
The failed-message list is improperly cleaned up. This is a regression
from recent commit 4eae19e089b5a83da679fe29398c6b2c10003793, which
was introduced in 8.31.0.
This problem is more likely to happen under heavy load or bad
connectivity, when the local librdkafka queue overruns or message
delivery times out.
closes https://github.com/rsyslog/rsyslog/issues/2184
closes https://github.com/rsyslog/rsyslog/issues/2067
These parameters were not even recognized when used and led to an error message at config startup:
* closeTimeout
* reopenOnHup
* resubmitOnFailure
* keepFailedMessages
* failedMsgFile
The reason was invalid use of upper case in the config parameter block; a usage sketch follows below.
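Configuration sketch using these parameters (all values are illustrative):

    action(type="omkafka" broker=["localhost:9092"] topic="mytopic"
           closeTimeout="2000"
           reopenOnHup="on"
           resubmitOnFailure="on"
           keepFailedMessages="on"
           failedMsgFile="/var/spool/rsyslog/omkafka-failed.data")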
fixes https://github.com/rsyslog/rsyslog/issues/2052
Whenever a message could (temporarily) not be delivered to kafka,
a non-trivial amount of memory was leaked. Over time, this could add up
to quite a large memory leak.
fixes https://github.com/rsyslog/rsyslog/issues/1991