stream.c: Moved doSizeLimitProcessing check to strmWrite

The check was done in strmPhysWrite before which caused syslog
messages to split in the middle if the syslog message batch exceeded
the default IO Buffer size.

closes: https://github.com/rsyslog/rsyslog/issues/4233
This commit is contained in:
Andre lorbach 2020-03-30 14:34:08 +02:00
parent 26bd4b61bb
commit b84c9debea
3 changed files with 73 additions and 4 deletions

View File

@ -81,6 +81,7 @@ DEFobjCurrIf(zlibw)
static rsRetVal strmFlushInternal(strm_t *pThis, int bFlushZip);
static rsRetVal strmWrite(strm_t *__restrict__ const pThis, const uchar *__restrict__ const pBuf,
const size_t lenBuf);
static rsRetVal strmOpenFile(strm_t *pThis);
static rsRetVal strmCloseFile(strm_t *pThis);
static void *asyncWriterThread(void *pPtr);
static rsRetVal doZipWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf, int bFlush);
@ -278,7 +279,10 @@ doPhysOpen(strm_t *pThis)
iFlags |= O_NONBLOCK;
}
if(pThis->bAsyncWrite)d_pthread_mutex_lock(&pThis->mut);
pThis->fd = open((char*)pThis->pszCurrFName, iFlags | O_LARGEFILE, pThis->tOpenMode);
if(pThis->bAsyncWrite) d_pthread_mutex_unlock(&pThis->mut);
const int errno_save = errno; /* dbgprintf can mangle it! */
DBGPRINTF("file '%s' opened as #%d with mode %d\n", pThis->pszCurrFName,
pThis->fd, (int) pThis->tOpenMode);
@ -1238,6 +1242,7 @@ ENDobjConstruct(strm)
*/
static rsRetVal strmConstructFinalize(strm_t *pThis)
{
pthread_mutexattr_t mutAttr;
rsRetVal localRet;
int i;
DEFiRet;
@ -1283,7 +1288,12 @@ static rsRetVal strmConstructFinalize(strm_t *pThis)
/* if we work asynchronously, we need a couple of synchronization objects */
if(pThis->bAsyncWrite) {
pthread_mutex_init(&pThis->mut, 0);
/* the mutex must be recursive, because objects may call into other
* object identifiers recursively.
*/
pthread_mutexattr_init(&mutAttr);
pthread_mutexattr_settype(&mutAttr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&pThis->mut, &mutAttr);
pthread_cond_init(&pThis->notFull, 0);
pthread_cond_init(&pThis->notEmpty, 0);
pthread_cond_init(&pThis->isEmpty, 0);
@ -1766,8 +1776,6 @@ strmPhysWrite(strm_t *pThis, uchar *pBuf, size_t lenBuf)
if(pThis->sType == STREAMTYPE_FILE_CIRCULAR) {
CHKiRet(strmCheckNextOutputFile(pThis));
} else if(pThis->iSizeLimit != 0) {
CHKiRet(doSizeLimitProcessing(pThis));
}
finalize_it:
@ -2151,6 +2159,10 @@ strmWrite(strm_t *__restrict__ const pThis, const uchar *__restrict__ const pBuf
if(pThis->iBufPtr == pThis->sIOBufSize) {
CHKiRet(strmFlushInternal(pThis, 0)); /* get a new buffer for rest of data */
}
if(pThis->fd != -1 && pThis->iSizeLimit != 0) { /* Only check if fd already set */
CHKiRet(doSizeLimitProcessing(pThis));
}
finalize_it:
if(pThis->bAsyncWrite) {

View File

@ -458,6 +458,7 @@ TESTS += \
parsertest-parse-nodate-udp.sh \
parsertest-snare_ccoff_udp.sh \
parsertest-snare_ccoff_udp2.sh
if ENABLE_LIBGCRYPT
TESTS += \
queue-encryption-disk.sh \
@ -1002,7 +1003,8 @@ TESTS += \
rscript_random.sh \
rscript_hash32.sh \
rscript_hash64.sh \
rscript_replace.sh
rscript_replace.sh \
omfile-outchannel-many.sh
if HAVE_VALGRIND
TESTS += \
imptcp_conndrop-vg.sh
@ -1737,6 +1739,7 @@ EXTRA_DIST= \
omfile-whitespace-filename.sh \
omfile-read-only.sh \
omfile-outchannel.sh \
omfile-outchannel-many.sh \
omfile_both_files_set.sh \
omfile_hup.sh \
omrabbitmq_no_params.sh \

54
tests/omfile-outchannel-many.sh Executable file
View File

@ -0,0 +1,54 @@
#!/bin/bash
# addd 2018-08-02 by RGerhards, released under ASL 2.0
. ${srcdir:=.}/diag.sh init
export NUMMESSAGES=500000
echo "ls -l $RSYSLOG_DYNNAME.channel.*
mv -f $RSYSLOG_DYNNAME.channel.log.prev.9 $RSYSLOG_DYNNAME.channel.log.prev.10 2>/dev/null
mv -f $RSYSLOG_DYNNAME.channel.log.prev.8 $RSYSLOG_DYNNAME.channel.log.prev.9 2>/dev/null
mv -f $RSYSLOG_DYNNAME.channel.log.prev.7 $RSYSLOG_DYNNAME.channel.log.prev.8 2>/dev/null
mv -f $RSYSLOG_DYNNAME.channel.log.prev.6 $RSYSLOG_DYNNAME.channel.log.prev.7 2>/dev/null
mv -f $RSYSLOG_DYNNAME.channel.log.prev.5 $RSYSLOG_DYNNAME.channel.log.prev.6 2>/dev/null
mv -f $RSYSLOG_DYNNAME.channel.log.prev.4 $RSYSLOG_DYNNAME.channel.log.prev.5 2>/dev/null
mv -f $RSYSLOG_DYNNAME.channel.log.prev.3 $RSYSLOG_DYNNAME.channel.log.prev.4 2>/dev/null
mv -f $RSYSLOG_DYNNAME.channel.log.prev.2 $RSYSLOG_DYNNAME.channel.log.prev.3 2>/dev/null
mv -f $RSYSLOG_DYNNAME.channel.log.prev.1 $RSYSLOG_DYNNAME.channel.log.prev.2 2>/dev/null
mv -f $RSYSLOG_DYNNAME.channel.log.prev $RSYSLOG_DYNNAME.channel.log.prev.1 2>/dev/null
mv -f $RSYSLOG_DYNNAME.channel.log $RSYSLOG_DYNNAME.channel.log.prev
" > $RSYSLOG_DYNNAME.rotate.sh
chmod +x $RSYSLOG_DYNNAME.rotate.sh
generate_conf
add_conf '
main_queue(
queue.workerthreads="4"
queue.timeoutWorkerthreadShutdown="-1"
queue.workerThreadMinimumMessages="10"
)
module(load="../plugins/imptcp/.libs/imptcp")
input(type="imptcp" port="0" listenPortFileName="'$RSYSLOG_DYNNAME'.tcpflood_port" ruleset="tcp")
template(name="outfmt" type="string" string="%msg:F,58:2%\n")
module(load="builtin:omfile" template="outfmt")
$outchannel log_rotation,'$RSYSLOG_DYNNAME.channel.log', $NUMMESSAGES,./'$RSYSLOG_DYNNAME.rotate.sh'
if $msg contains "msgnum:" then {
# if $/num % 2 == 0 then
:omfile:$log_rotation
# else
# :omfile:$log_rotation2
# set $/num = $/num + 1;
}
ruleset(name="tcp") {
:omfile:$log_rotation2
}
'
startup
./tcpflood -p$TCPFLOOD_PORT -m$NUMMESSAGES & # TCPFlood needs to run async!
#./msleep 2500
injectmsg
sleep 1
shutdown_when_empty
wait_shutdown
ls -l $RSYSLOG_DYNNAME.channel.*
cat $RSYSLOG_DYNNAME.channel.* > $RSYSLOG_OUT_LOG
seq_check
exit_test