Merge branch 'master' into v8-stable

This commit is contained in:
Rainer Gerhards 2023-06-19 12:58:19 +02:00
commit a29e469524
73 changed files with 3671 additions and 306 deletions

View File

@ -29,7 +29,7 @@ on:
jobs:
run:
runs-on: ubuntu-18.04
runs-on: ubuntu-latest
timeout-minutes: 50
strategy:
# When set to true, cancel all in-progress jobs if any matrix job fails.

View File

@ -62,7 +62,7 @@ jobs:
software-properties-common \
valgrind \
wget \
zstd
zstd
- name: git checkout project
uses: actions/checkout@v1

View File

@ -29,7 +29,7 @@ on:
jobs:
check_run:
runs-on: ubuntu-18.04
runs-on: ubuntu-latest
timeout-minutes: 50
steps:

View File

@ -1,4 +1,72 @@
----------------------------------------------------------------------------------------
Scheduled Release 8.2306.0 (aka 2023.06) 2023-06-??
- 2023-06-19: mmnormalize bugfix: if msg cannot be parsed, parser chain is stopped
When an parser is not able to parse a message, it should indicate this
to rsyslog core, which then activates the next parser(s) inside the
configured parser chain.
Unfortunatley, mmnormalize always tells core "success", and so no
other parsers are activated.
closes https://github.com/rsyslog/rsyslog/issues/5148
- 2023-06-19: [i/o]mhiredis: various fixes and enhancements
please see the change log for details. Among others, suspending of the modules
has been fixed. Also a new "stream" mode has been added.
Thanks to Théo Bertin (frikilax) for the patch.
- 2023-06-19: testbench/bug: mmexternal-SegFault-empty-jroot-vg.sh fails due to typo
Fix the typo that makes the test fail.
Thanks to Paul Fertser for the patch.
- 2023-06-16: imjournal: Add FileCreateMode module parameter
FileCreateMode allows to set the default file mode bits
when creating new files. As of now, it has only impact on the state file.
Add test suite as well.
Minor indentation fix in run_journal.yml
Thanks to Attila Lakatos for the patch.
- 2023-06-16: core bugfix: potential segfault on busy systems
This was discovered by Konstantin J. Chernov in a practicaly deployment.
Here, msg object tag processing caused sporadic segfaults. We did not
hear from similiar cases, but there clearly is potential for problems
because a mutex lock had insufficient range, thus leading to a potential
race.
The patch is directly from Konstantin J. Chernov, thanks for that.
Please note that the mutex lock could be minimized as it is not strictly
needed for the pM == NULL case, but this cause is extremely exotic
and the resulting code would be harder to understand. Thus we opt
to do the locking on funtion level (as usual).
Descriptiond edited by Rainer Gerhards
closes: https://github.com/rsyslog/rsyslog/issues/5110
- 2023-06-16: Add new global config option "libcapng.default"
Defines how rsyslog should behave in case something went wrong
when capabilities were to be dropped. Default value is "on",
in which case rsyslog exits on a libcapng related error.
Thanks to Attila Lakatos for the patch.
Closes https://github.com/rsyslog/rsyslog/issues/5096
- 2023-06-05: imfile bugfix: file handle leak, primarily in kubernetes context
At this point there is a code imfile.c#L919 that adds an inotify observer to the
parent of the symbolic link target. But there is no such code that removes this
observer in the case when inotify events do not occur in the directory tree above.
This may be if the directory tree of the symbolic link target and the directory tree
of the symbolic link itself are divided into different subtrees somewhere at the levels
above.
For example, in the rsyslog configuration, an imfile with the
template /var/log/containers/*.log is configured and there is the following directory
tree:
/var/log/pods/pod-1/a/0.log
/var/log/containers/pod-1-a-0.log -> /var/log/pods/pod-1/a/0.log
In this example, kubernetes cron jobs will permanently delete directories at the
/var/log/pods/pod-* level. And thus, inotify observer on the parent object of the
symbolic link target (/var/log/pods/pod-1/a/0.log) looking at the directory
/var/log/pods/pod-1/a will constantly leak.
This is due to the fact that the list of active objects in the edge with path
/var/log/containers, where the parent object of the target symbolic link is added,
is not checked. Verification and deletion will occur only in the case of an inotify
event in the upper nodes of the directory tree, in /var/log and above.
Thanks to Sergey Kacheev for the patch!
- 2023-06-05: GNUTls Driver: Fix memory leaks in gtlsInitCred
Missing CA Certificate or multiple Connections caused
a memory leak in pThis->xcred as it was allocated each time in
gtlsInitCred by gnutls_certificate_allocate_credentials
closes: https://github.com/rsyslog/rsyslog/issues/5135
- 2023-05-24: CI: update base ubuntu image for github actions
----------------------------------------------------------------------------------------
Scheduled Release 8.2304.0 (aka 2023.04) 2023-04-18
- 2023-04-17: imptcp bugfix: spam log on oversize message
If an oversize message was received by imptcp, imptcp reported

View File

@ -2,9 +2,9 @@
# Process this file with autoconf to produce a configure script.
AC_PREREQ(2.61)
AC_INIT([rsyslog],[8.2304.0],[rsyslog@lists.adiscon.com]) # UPDATE on release
AC_INIT([rsyslog],[8.2306.0.master],[rsyslog@lists.adiscon.com]) # UPDATE on release
AC_DEFINE(VERSION_YEAR, 23, [year part of real rsyslog version]) # UPDATE on release
AC_DEFINE(VERSION_MONTH, 4, [month part of real rsyslog version]) # UPDATE on release
AC_DEFINE(VERSION_MONTH, 6, [month part of real rsyslog version]) # UPDATE on release
AM_INIT_AUTOMAKE([subdir-objects])
@ -2519,6 +2519,17 @@ AC_ARG_ENABLE(omhiredis,
[enable_omhiredis=no]
)
AC_ARG_ENABLE(redis_tests,
[AS_HELP_STRING([--enable-redis-tests],[Enable redis tests, needs redis-server @<:@default=no@:>@])],
[case "${enableval}" in
yes) enable_redis_tests="yes" ;;
no) enable_redis_tests="no" ;;
*) AC_MSG_ERROR(bad value ${enableval} for --enable-redis-tests) ;;
esac],
[enable_redis_tests=no]
)
AM_CONDITIONAL(ENABLE_REDIS_TESTS, test x$enable_redis_tests = xyes)
if test "x$enable_omhiredis" = "xyes" -o "x$enable_imhiredis" = "xyes" ; then
PKG_CHECK_MODULES(HIREDIS, hiredis >= 0.10.1, [],
[AC_SEARCH_LIBS(redisConnectWithTimeout, hiredis,
@ -2555,6 +2566,21 @@ if test "x$enable_imhiredis" = "xyes" ; then
[AC_MSG_ERROR([no libevent >= 2.0 found with pthreads support, imhiredis cannot use pub/sub])])
fi
if test "x$enable_imhiredis" = "xyes" || test "x$enable_omhiredis" = "xyes"; then
if test "x$enable_redis_tests" = "xyes"; then
AC_CHECK_PROG(REDIS, [redis-server], [yes], [no])
if test "x${REDIS}" = "xno"; then
AC_MSG_FAILURE([redis-server, which is a redis-tests dependency, not found])
fi
fi
else
if test "x$enable_redis_tests" = "xyes"; then
AC_MSG_WARN([redis-tests can not be enabled without imhiredis or omhiredis support.
Disabling enable_redis_tests...])
enable_redis_tests="no"
fi
fi
AM_CONDITIONAL(ENABLE_OMHIREDIS, test x$enable_omhiredis = xyes)
AM_CONDITIONAL(ENABLE_IMHIREDIS, test x$enable_imhiredis = xyes)
@ -2891,6 +2917,7 @@ echo " omhttpfs module will be compiled: $enable_omhttpfs"
echo " omamqp1 module will be compiled: $enable_omamqp1"
echo " omtcl module will be compiled: $enable_omtcl"
echo " omkafka module will be compiled: $enable_omkafka"
echo " omhiredis module will be compiled: $enable_omhiredis"
echo
echo "---{ parser modules }---"
echo " pmlastmsg module will be compiled: $enable_pmlastmsg"
@ -2957,6 +2984,7 @@ echo " Elasticsearch Tests: $enable_elasticsearch_tests"
echo " ClickHouse Tests: $enable_clickhouse_tests"
echo " PostgreSQL Tests enabled: $enable_pgsql_tests"
echo " Kafka Tests enabled: $enable_kafka_tests"
echo " Redis Tests enabled: $enable_redis_tests"
echo " Imdocker Tests enabled: $enable_imdocker_tests"
echo " gnutls tests enabled: $enable_gnutls_tests"
echo " imfile tests enabled: $enable_imfile_tests"

File diff suppressed because it is too large Load Diff

View File

@ -30,6 +30,7 @@
#include <assert.h>
#include <signal.h>
#include <time.h>
#include <math.h>
#include <hiredis/hiredis.h>
#include "rsyslog.h"
@ -53,6 +54,7 @@ DEF_OMOD_STATIC_DATA
#define OMHIREDIS_MODE_QUEUE 1
#define OMHIREDIS_MODE_PUBLISH 2
#define OMHIREDIS_MODE_SET 3
#define OMHIREDIS_MODE_STREAM 4
/* our instance data.
* this will be accessable
@ -64,10 +66,34 @@ typedef struct _instanceData {
uchar *tplName; /* template name */
char *modeDescription; /* mode description */
int mode; /* mode constant */
uchar *key; /* key for QUEUE and PUBLISH modes */
uchar *key; /* key for QUEUE, PUBLISH and STREAM modes */
uchar *streamKeyAck; /* key name for STREAM ACKs (when enabled) */
uchar *streamGroupAck; /* group name for STREAM ACKs (when enabled) */
uchar *streamIndexAck; /* index name for STREAM ACKs (when enabled) */
int expiration; /* expiration value for SET/SETEX mode */
sbool dynaKey; /* Should we treat the key as a template? */
sbool streamDynaKeyAck; /* Should we treat the groupAck as a template? */
sbool streamDynaGroupAck; /* Should we treat the groupAck as a template? */
sbool streamDynaIndexAck; /* Should we treat the IndexAck as a template? */
sbool useRPush; /* Should we use RPUSH instead of LPUSH? */
uchar *streamOutField; /* Field to place message into (for stream insertions only) */
uint streamCapacityLimit; /* zero means stream is not capped (default)
setting a non-zero value ultimately activates the approximate MAXLEN option '~'
(see Redis XADD docs)*/
sbool streamAck; /* Should the module send an XACK for each inserted message?
This feature requires that 3 infos are present in the '$.' object of the log:
- $.redis!stream
- $.redis!group
- $.redis!index
Those 3 infos can either be provided through usage of imhiredis
or set manually with Rainerscript */
sbool streamDel; /* Should the module send an XDEL for each inserted message?
This feature requires that 2 infos are present in the '$.' object of the log:
- $.redis!stream
- $.redis!index
Those 2 infos can either be provided through usage of imhiredis
or set manually with Rainerscript */
} instanceData;
typedef struct wrkrInstanceData {
@ -86,6 +112,16 @@ static struct cnfparamdescr actpdescr[] = {
{ "expiration", eCmdHdlrInt, 0 },
{ "dynakey", eCmdHdlrBinary, 0 },
{ "userpush", eCmdHdlrBinary, 0 },
{ "stream.outField", eCmdHdlrGetWord, 0 },
{ "stream.capacityLimit", eCmdHdlrNonNegInt, 0 },
{ "stream.ack", eCmdHdlrBinary, 0 },
{ "stream.del", eCmdHdlrBinary, 0 },
{ "stream.keyAck", eCmdHdlrGetWord, 0 },
{ "stream.groupAck", eCmdHdlrGetWord, 0 },
{ "stream.indexAck", eCmdHdlrGetWord, 0 },
{ "stream.dynaKeyAck", eCmdHdlrBinary, 0 },
{ "stream.dynaGroupAck", eCmdHdlrBinary, 0 },
{ "stream.dynaIndexAck", eCmdHdlrBinary, 0 },
};
static struct cnfparamblk actpblk = {
@ -127,6 +163,11 @@ CODESTARTfreeInstance
free(pData->key);
free(pData->modeDescription);
free(pData->serverpassword);
free(pData->tplName);
free(pData->streamKeyAck);
free(pData->streamGroupAck);
free(pData->streamIndexAck);
free(pData->streamOutField);
ENDfreeInstance
BEGINfreeWrkrInstance
@ -143,14 +184,14 @@ ENDdbgPrintInstInfo
static rsRetVal initHiredis(wrkrInstanceData_t *pWrkrData, int bSilent)
{
char *server;
char *serverpasswd;
redisReply *reply = NULL;
DEFiRet;
server = (pWrkrData->pData->server == NULL) ? (char *)"127.0.0.1" :
(char*) pWrkrData->pData->server;
DBGPRINTF("omhiredis: trying connect to '%s' at port %d\n", server,
pWrkrData->pData->port);
struct timeval timeout = { 1, 500000 }; /* 1.5 seconds */
pWrkrData->conn = redisConnectWithTimeout(server, pWrkrData->pData->port,
timeout);
@ -162,18 +203,55 @@ static rsRetVal initHiredis(wrkrInstanceData_t *pWrkrData, int bSilent)
}
if (pWrkrData->pData->serverpassword != NULL) {
serverpasswd = (char*) pWrkrData->pData->serverpassword;
int rc;
rc = redisAppendCommand(pWrkrData->conn, "AUTH %s", serverpasswd);
if (rc == REDIS_ERR) {
LogError(0, NO_ERRCODE, "omhiredis: %s", pWrkrData->conn->errstr);
reply = redisCommand(pWrkrData->conn, "AUTH %s", (char*) pWrkrData->pData->serverpassword);
if (reply == NULL) {
DBGPRINTF("omhiredis: could not get reply from AUTH command\n");
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
else if (reply->type == REDIS_REPLY_ERROR) {
LogError(0, NO_ERRCODE, "omhiredis: error while authenticating: %s", reply->str);
ABORT_FINALIZE(RS_RET_ERR);
} else {
pWrkrData->count++;
}
}
finalize_it:
if (iRet != RS_RET_OK && pWrkrData-> conn != NULL) {
redisFree(pWrkrData->conn);
pWrkrData->conn = NULL;
}
if (reply != NULL) freeReplyObject(reply);
RETiRet;
}
static rsRetVal isMaster(wrkrInstanceData_t *pWrkrData) {
DEFiRet;
redisReply *reply = NULL;
assert(pWrkrData->conn != NULL);
reply = redisCommand(pWrkrData->conn, "ROLE");
if (reply == NULL) {
DBGPRINTF("omhiredis: could not get reply from ROLE command\n");
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
else if (reply->type == REDIS_REPLY_ERROR) {
LogMsg(0, RS_RET_REDIS_ERROR, LOG_WARNING, "omhiredis: got an error while querying role -> "
"%s\n", reply->str);
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
else if (reply->type != REDIS_REPLY_ARRAY || reply->element[0]->type != REDIS_REPLY_STRING) {
LogMsg(0, RS_RET_REDIS_ERROR, LOG_ERR, "omhiredis: did not get a proper reply from ROLE command");
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
else {
if (strncmp(reply->element[0]->str, "master", 6)) {
LogMsg(0, RS_RET_OK, LOG_WARNING, "omhiredis: current connected node is not a master");
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
}
finalize_it:
free(reply);
RETiRet;
}
@ -220,6 +298,20 @@ static rsRetVal writeHiredis(uchar* key, uchar *message, wrkrInstanceData_t *pWr
rc = REDIS_ERR;
}
break;
case OMHIREDIS_MODE_STREAM:
if (pWrkrData->pData->streamCapacityLimit != 0) {
rc = redisAppendCommand(pWrkrData->conn, "XADD %s MAXLEN ~ %d * %s %s",
key,
pWrkrData->pData->streamCapacityLimit,
pWrkrData->pData->streamOutField,
message);
} else {
rc = redisAppendCommand(pWrkrData->conn, "XADD %s * %s %s",
key,
pWrkrData->pData->streamOutField,
message);
}
break;
default:
dbgprintf("omhiredis: mode %d is invalid something is really wrong\n",
pWrkrData->pData->mode);
@ -239,12 +331,47 @@ finalize_it:
RETiRet;
}
static rsRetVal ackHiredisStreamIndex(wrkrInstanceData_t *pWrkrData, uchar *key, uchar *group, uchar *index) {
DEFiRet;
if (REDIS_ERR == redisAppendCommand(pWrkrData->conn, "XACK %s %s %s", key, group, index)) {
LogError(0, NO_ERRCODE, "omhiredis: %s", pWrkrData->conn->errstr);
DBGPRINTF("omhiredis: %s\n", pWrkrData->conn->errstr);
ABORT_FINALIZE(RS_RET_ERR);
} else {
pWrkrData->count++;
}
finalize_it:
RETiRet;
}
static rsRetVal delHiredisStreamIndex(wrkrInstanceData_t *pWrkrData, uchar *key, uchar *index) {
DEFiRet;
if (REDIS_ERR == redisAppendCommand(pWrkrData->conn, "XDEL %s %s", key, index)) {
LogError(0, NO_ERRCODE, "omhiredis: %s", pWrkrData->conn->errstr);
DBGPRINTF("omhiredis: %s\n", pWrkrData->conn->errstr);
ABORT_FINALIZE(RS_RET_ERR);
} else {
pWrkrData->count++;
}
finalize_it:
RETiRet;
}
/* called when resuming from suspended state.
* try to restablish our connection to redis */
BEGINtryResume
CODESTARTtryResume
if(pWrkrData->conn == NULL)
iRet = initHiredis(pWrkrData, 0);
closeHiredis(pWrkrData);
CHKiRet(initHiredis(pWrkrData, 0));
// Must get a master node for all modes, except 'publish'
if(pWrkrData->pData->mode != OMHIREDIS_MODE_PUBLISH) {
CHKiRet(isMaster(pWrkrData));
}
finalize_it:
ENDtryResume
/* begin a transaction.
@ -261,13 +388,25 @@ ENDbeginTransaction
* which appends it as a command to the
* current pipeline */
BEGINdoAction
uchar *message, *key, *keyNameAck, *groupNameAck, *IndexNameAck;
int inputIndex = 0;
CODESTARTdoAction
if(pWrkrData->pData->dynaKey) {
CHKiRet(writeHiredis(ppString[1], ppString[0], pWrkrData));
// Don't change the order of conditions/assignations here without changing the end of the newActInst function!
message = ppString[inputIndex++];
key = pWrkrData->pData->dynaKey ? ppString[inputIndex++] : pWrkrData->pData->key;
keyNameAck = pWrkrData->pData->streamDynaKeyAck ? ppString[inputIndex++] : pWrkrData->pData->streamKeyAck;
groupNameAck = pWrkrData->pData->streamDynaGroupAck ? ppString[inputIndex++] : pWrkrData->pData->streamGroupAck;
IndexNameAck = pWrkrData->pData->streamDynaIndexAck ? ppString[inputIndex++] : pWrkrData->pData->streamIndexAck;
CHKiRet(writeHiredis(key, message, pWrkrData));
if(pWrkrData->pData->streamAck) {
CHKiRet(ackHiredisStreamIndex(pWrkrData, keyNameAck, groupNameAck, IndexNameAck));
}
else {
CHKiRet(writeHiredis(pWrkrData->pData->key, ppString[0], pWrkrData));
if(pWrkrData->pData->streamDel) {
CHKiRet(delHiredisStreamIndex(pWrkrData, keyNameAck, IndexNameAck));
}
iRet = RS_RET_DEFER_COMMIT;
finalize_it:
ENDdoAction
@ -284,13 +423,19 @@ CODESTARTendTransaction
redisReply *reply;
int i;
for ( i = 0; i < pWrkrData->count; i++ ) {
redisGetReply ( pWrkrData->conn, (void*)&reply);
if( pWrkrData->conn->err ){
if( REDIS_OK != redisGetReply( pWrkrData->conn, (void*)&reply) || pWrkrData->conn->err ) {
dbgprintf("omhiredis: %s\n", pWrkrData->conn->errstr);
LogError(0, RS_RET_REDIS_ERROR, "Error while processing replies: %s", pWrkrData->conn->errstr);
closeHiredis(pWrkrData);
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
else {
if (reply->type == REDIS_REPLY_ERROR) {
LogError(0, RS_RET_REDIS_ERROR, "Received error from redis -> %s", reply->str);
closeHiredis(pWrkrData);
freeReplyObject(reply);
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
freeReplyObject(reply);
}
}
@ -313,7 +458,18 @@ setInstParamDefaults(instanceData *pData)
pData->expiration = 0;
pData->modeDescription = NULL;
pData->key = NULL;
pData->dynaKey = 0;
pData->useRPush = 0;
pData->streamOutField = NULL;
pData->streamKeyAck = NULL;
pData->streamDynaKeyAck = 0;
pData->streamGroupAck = NULL;
pData->streamDynaGroupAck = 0;
pData->streamIndexAck = NULL;
pData->streamDynaIndexAck = 0;
pData->streamCapacityLimit = 0;
pData->streamAck = 0;
pData->streamDel = 0;
}
/* here is where the work to set up a new instance
@ -324,7 +480,7 @@ BEGINnewActInst
struct cnfparamvals *pvals;
int i;
int iNumTpls;
uchar *keydup = NULL;
uchar *strDup = NULL;
CODESTARTnewActInst
if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL)
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
@ -335,7 +491,7 @@ CODESTARTnewActInst
for(i = 0 ; i < actpblk.nParams ; ++i) {
if(!pvals[i].bUsed)
continue;
if(!strcmp(actpblk.descr[i].name, "server")) {
pData->server = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "serverport")) {
@ -348,6 +504,26 @@ CODESTARTnewActInst
pData->dynaKey = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "userpush")) {
pData->useRPush = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "stream.outField")) {
pData->streamOutField = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "stream.keyAck")) {
pData->streamKeyAck = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "stream.dynaKeyAck")) {
pData->streamDynaKeyAck = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "stream.groupAck")) {
pData->streamGroupAck = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "stream.dynaGroupAck")) {
pData->streamDynaGroupAck = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "stream.indexAck")) {
pData->streamIndexAck = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "stream.dynaIndexAck")) {
pData->streamDynaIndexAck = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "stream.capacityLimit")) {
pData->streamCapacityLimit = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "stream.ack")) {
pData->streamAck = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "stream.del")) {
pData->streamDel = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "mode")) {
pData->modeDescription = es_str2cstr(pvals[i].val.d.estr, NULL);
if (!strcmp(pData->modeDescription, "template")) {
@ -358,6 +534,8 @@ CODESTARTnewActInst
pData->mode = OMHIREDIS_MODE_PUBLISH;
} else if (!strcmp(pData->modeDescription, "set")) {
pData->mode = OMHIREDIS_MODE_SET;
} else if (!strcmp(pData->modeDescription, "stream")) {
pData->mode = OMHIREDIS_MODE_STREAM;
} else {
dbgprintf("omhiredis: unsupported mode %s\n", actpblk.descr[i].name);
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
@ -380,51 +558,167 @@ CODESTARTnewActInst
pData->mode = OMHIREDIS_MODE_TEMPLATE;
}
/* check config sanity for selected mode */
switch(pData->mode) {
case OMHIREDIS_MODE_QUEUE:
case OMHIREDIS_MODE_PUBLISH:
case OMHIREDIS_MODE_SET:
if (pData->key == NULL) {
dbgprintf("omhiredis: mode %s requires a key\n", pData->modeDescription);
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
}
if (pData->tplName == NULL) {
dbgprintf("omhiredis: using default RSYSLOG_ForwardFormat template\n");
CHKmalloc(pData->tplName = ustrdup("RSYSLOG_ForwardFormat"));
}
if (pData->expiration && strcmp(pData->modeDescription, "set")) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: expiration set but mode is not "\
"'set', expiration will be ignored");
}
break;
case OMHIREDIS_MODE_TEMPLATE:
if (pData->tplName == NULL) {
dbgprintf("omhiredis: selected mode requires template\n");
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
}
break;
if (pData->mode == OMHIREDIS_MODE_STREAM && !pData->streamOutField) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: no stream.outField set, "\
"using 'msg' as default");
pData->streamOutField = ustrdup("msg");
}
if (pData->tplName == NULL) {
if(pData->mode == OMHIREDIS_MODE_TEMPLATE) {
LogError(0, RS_RET_CONF_PARSE_ERROR, "omhiredis: selected mode requires a template");
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
} else {
CHKmalloc(pData->tplName = ustrdup("RSYSLOG_ForwardFormat"));
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: no template set, "\
"using RSYSLOG_ForwardFormat as default");
}
}
if (pData->mode != OMHIREDIS_MODE_TEMPLATE && pData->key == NULL) {
LogError(0, RS_RET_CONF_PARSE_ERROR,
"omhiredis: mode %s requires a key", pData->modeDescription);
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
}
if (pData->expiration && pData->mode != OMHIREDIS_MODE_SET) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: expiration set but mode is not "\
"'set', expiration will be ignored");
}
if (pData->mode != OMHIREDIS_MODE_STREAM) {
if (pData->streamOutField) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.outField set "\
"but mode is not 'stream', field will be ignored");
}
if (pData->streamAck) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.ack set "\
"but mode is not 'stream', XACK will be ignored");
}
if (pData->streamDel) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.del set "\
"but mode is not 'stream', XDEL will be ignored");
}
if (pData->streamCapacityLimit) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.capacityLimit set "\
"but mode is not 'stream', stream trimming will be ignored");
}
if (pData->streamKeyAck) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.keyAck set "\
"but mode is not 'stream', parameter will be ignored");
}
if (pData->streamDynaKeyAck) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.dynaKeyAck set "\
"but mode is not 'stream', parameter will be ignored");
}
if (pData->streamGroupAck) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.groupAck set "\
"but mode is not 'stream', parameter will be ignored");
}
if (pData->streamDynaGroupAck) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.dynaGroupAck set "\
"but mode is not 'stream', parameter will be ignored");
}
if (pData->streamIndexAck) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.indexAck set "\
"but mode is not 'stream', parameter will be ignored");
}
if (pData->streamDynaIndexAck) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: stream.dynaIndexAck set "\
"but mode is not 'stream', parameter will be ignored");
}
} else {
if(pData->streamAck) {
if(!pData->streamKeyAck || !pData->streamGroupAck || !pData->streamIndexAck) {
LogError(0, RS_RET_CONF_PARSE_ERROR,
"omhiredis: 'stream.ack' is set but one of "\
"'stream.keyAck', 'stream.groupAck' or 'stream.indexAck' is missing");
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
}
}
if(pData->streamDel) {
if(!pData->streamKeyAck || !pData->streamIndexAck) {
LogError(0, RS_RET_CONF_PARSE_ERROR,
"omhiredis: 'stream.del' is set but one of "\
"'stream.keyAck' or 'stream.indexAck' is missing");
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
}
}
}
if (pData->streamDynaKeyAck && pData->streamKeyAck == NULL) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: 'stream.dynaKeyAck' set "\
"but 'stream.keyAck' is empty, disabling");
pData->streamDynaKeyAck = 0;
}
if (pData->streamDynaGroupAck && pData->streamGroupAck == NULL) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: 'stream.dynaGroupAck' set "\
"but 'stream.groupAck' is empty, disabling");
pData->streamDynaGroupAck = 0;
}
if (pData->streamDynaIndexAck && pData->streamIndexAck == NULL) {
LogError(0, RS_RET_CONF_PARSE_WARNING, "omhiredis: 'stream.dynaGroupAck' set "\
"but 'stream.indexAck' is empty, disabling");
pData->streamDynaIndexAck = 0;
}
iNumTpls = 1;
if (pData->dynaKey) {
assert(pData->key != NULL);
iNumTpls = 2;
iNumTpls += 1;
}
if (pData->streamDynaKeyAck) {
assert(pData->streamKeyAck != NULL);
iNumTpls += 1;
}
if (pData->streamDynaGroupAck) {
assert(pData->streamGroupAck != NULL);
iNumTpls += 1;
}
if (pData->streamDynaIndexAck) {
assert(pData->streamIndexAck != NULL);
iNumTpls += 1;
}
CODE_STD_STRING_REQUESTnewActInst(iNumTpls);
CHKiRet(OMSRsetEntry(*ppOMSR, 0, (uchar*)pData->tplName, OMSR_NO_RQD_TPL_OPTS));
/* Insert templates in opposite order (keep in sync with doAction), order will be
* - tplName
* - key
* - streamKeyAck
* - streamGroupAck
* - streamIndexAck
*/
if (pData->streamDynaIndexAck) {
CHKmalloc(strDup = ustrdup(pData->streamIndexAck));
CHKiRet(OMSRsetEntry(*ppOMSR, --iNumTpls, strDup, OMSR_NO_RQD_TPL_OPTS));
strDup = NULL; /* handed over */
}
if (pData->streamDynaGroupAck) {
CHKmalloc(strDup = ustrdup(pData->streamGroupAck));
CHKiRet(OMSRsetEntry(*ppOMSR, --iNumTpls, strDup, OMSR_NO_RQD_TPL_OPTS));
strDup = NULL; /* handed over */
}
if (pData->streamDynaKeyAck) {
CHKmalloc(strDup = ustrdup(pData->streamKeyAck));
CHKiRet(OMSRsetEntry(*ppOMSR, --iNumTpls, strDup, OMSR_NO_RQD_TPL_OPTS));
strDup = NULL; /* handed over */
}
if (pData->dynaKey) {
CHKmalloc(keydup = ustrdup(pData->key));
CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->key), OMSR_NO_RQD_TPL_OPTS));
keydup = NULL; /* handed over */
CHKmalloc(strDup = ustrdup(pData->key));
CHKiRet(OMSRsetEntry(*ppOMSR, --iNumTpls, strDup, OMSR_NO_RQD_TPL_OPTS));
strDup = NULL; /* handed over */
}
CHKiRet(OMSRsetEntry(*ppOMSR, --iNumTpls, ustrdup(pData->tplName), OMSR_NO_RQD_TPL_OPTS));
CODE_STD_FINALIZERnewActInst
cnfparamvalsDestruct(pvals, &actpblk);
free(keydup);
free(strDup);
ENDnewActInst

View File

@ -852,10 +852,13 @@ detect_updates(fs_edge_t *const edge)
* the old file in case a process is still writing into it until the FILE_DELETE_DELAY
* is reached OR the inode has changed (see elseif below). In most cases, the
* delay will never be reached and the file will be closed when the inode has changed.
* Directories are deleted without delay.
*/
if (act->time_to_delete + FILE_DELETE_DELAY < ttNow) {
DBGPRINTF("detect_updates obj gone away, unlinking: '%s', ttDelete: %lds, ttNow:%ld\n",
act->name, ttNow - (act->time_to_delete + FILE_DELETE_DELAY), ttNow);
sbool is_file = act->edge->is_file;
if (!is_file || act->time_to_delete + FILE_DELETE_DELAY < ttNow) {
DBGPRINTF("detect_updates obj gone away, unlinking: "
"'%s', ttDelete: %lds, ttNow:%ld isFile: %d\n",
act->name, ttNow - (act->time_to_delete + FILE_DELETE_DELAY), ttNow, is_file);
act_obj_unlink(act);
restart = 1;
} else {
@ -1038,9 +1041,9 @@ act_obj_destroy(act_obj_t *const act, const int is_deleted)
act_obj_t *target_act;
for(target_act = act->edge->active ; target_act != NULL ; target_act = target_act->next) {
if(target_act->source_name && !strcmp(target_act->source_name, act->name)) {
DBGPRINTF("act_obj_destroy: unlinking slink target %s of %s "
"symlink\n", target_act->name, act->name);
act_obj_unlink(target_act);
DBGPRINTF("act_obj_destroy: detect_updates for parent of target %s of %s symlink\n",
target_act->name, act->name);
detect_updates(target_act->edge->parent->root->edges);
break;
}
}

View File

@ -35,6 +35,7 @@
#include <sys/socket.h>
#include <errno.h>
#include <systemd/sd-journal.h>
#include <fcntl.h>
#include "dirty.h"
#include "cfsysline.h"
@ -73,6 +74,7 @@ struct modConfData_s {
static struct configSettings_s {
char *stateFile;
int fCreateMode; /* default mode to use when creating new files, e.g. stateFile */
int iPersistStateInterval;
unsigned int ratelimitInterval;
unsigned int ratelimitBurst;
@ -92,6 +94,7 @@ static rsRetVal facilityHdlr(uchar **pp, void *pVal);
/* module-global parameters */
static struct cnfparamdescr modpdescr[] = {
{ "statefile", eCmdHdlrGetWord, 0 },
{ "filecreatemode", eCmdHdlrFileCreateMode, 0 },
{ "ratelimit.interval", eCmdHdlrInt, 0 },
{ "ratelimit.burst", eCmdHdlrInt, 0 },
{ "persiststateinterval", eCmdHdlrInt, 0 },
@ -523,8 +526,10 @@ static rsRetVal
persistJournalState(void)
{
DEFiRet;
FILE *sf = NULL; /* state file */
char tmp_sf[MAXFNAME];
int fd = -1;
size_t len;
ssize_t wr_ret;
DBGPRINTF("Persisting journal position, cursor: %s, at head? %d\n",
journalContext.cursor, journalContext.atHead);
@ -556,19 +561,20 @@ persistJournalState(void)
(int)(sizeof(tmp_sf) - sizeof(IM_SF_TMP_SUFFIX)),
cs.stateFile, IM_SF_TMP_SUFFIX);
sf = fopen(tmp_sf, "wb");
if (sf == NULL) {
LogError(errno, RS_RET_FOPEN_FAILURE, "imjournal: fopen() failed for path: '%s'", tmp_sf);
ABORT_FINALIZE(RS_RET_FOPEN_FAILURE);
fd = open((char*) tmp_sf, O_WRONLY|O_CREAT|O_CLOEXEC, cs.fCreateMode);
if (fd == -1) {
LogError(errno, RS_RET_FILE_OPEN_ERROR, "imjournal: open() failed for path: '%s'", tmp_sf);
ABORT_FINALIZE(RS_RET_FILE_OPEN_ERROR);
}
if(fputs(journalContext.cursor, sf) == EOF) {
LogError(errno, RS_RET_IO_ERROR, "imjournal: failed to save cursor to: '%s'", tmp_sf);
len = strlen(journalContext.cursor);
wr_ret = write(fd, journalContext.cursor, len);
if (wr_ret != (ssize_t)len) {
LogError(errno, RS_RET_IO_ERROR, "imjournal: failed to save cursor to: '%s',"
"write returned %zd, expected %zu", cs.stateFile, wr_ret, len);
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
fflush(sf);
/* change the name of the file to the configured one */
if (rename(tmp_sf, cs.stateFile) < 0) {
LogError(errno, iRet, "imjournal: rename() failed for new path: '%s'", cs.stateFile);
@ -576,7 +582,7 @@ persistJournalState(void)
}
if (cs.bFsync) {
if (fsync(fileno(sf)) != 0) {
if (fsync(fd) != 0) {
LogError(errno, RS_RET_IO_ERROR, "imjournal: fsync on '%s' failed", cs.stateFile);
ABORT_FINALIZE(RS_RET_IO_ERROR);
}
@ -599,9 +605,9 @@ persistJournalState(void)
DBGPRINTF("Persisted journal to '%s'\n", cs.stateFile);
finalize_it:
if (sf != NULL) {
if (fclose(sf) == EOF) {
LogError(errno, RS_RET_IO_ERROR, "imjournal: fclose() failed for path: '%s'", tmp_sf);
if (fd != -1) {
if (close(fd) == -1) {
LogError(errno, RS_RET_IO_ERROR, "imjournal: close() failed for path: '%s'", tmp_sf);
iRet = RS_RET_IO_ERROR;
}
}
@ -898,6 +904,7 @@ CODESTARTbeginCnfLoad
cs.bIgnoreNonValidStatefile = 1;
cs.iPersistStateInterval = DFLT_persiststateinterval;
cs.stateFile = NULL;
cs.fCreateMode = -1;
cs.ratelimitBurst = 20000;
cs.ratelimitInterval = 600;
cs.iDfltSeverity = DFLT_SEVERITY;
@ -1039,6 +1046,8 @@ CODESTARTsetModCnf
cs.iPersistStateInterval = (int) pvals[i].val.d.n;
} else if (!strcmp(modpblk.descr[i].name, "statefile")) {
cs.stateFile = (char *)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(modpblk.descr[i].name, "filecreatemode")) {
cs.fCreateMode = (int) pvals[i].val.d.n;
} else if(!strcmp(modpblk.descr[i].name, "ratelimit.burst")) {
cs.ratelimitBurst = (unsigned int) pvals[i].val.d.n;
} else if(!strcmp(modpblk.descr[i].name, "ratelimit.interval")) {
@ -1074,6 +1083,14 @@ CODESTARTsetModCnf
}
}
/* File create mode is not set */
if (cs.fCreateMode == -1) {
const int fCreateMode = 0644;
LogMsg(0, RS_RET_OK_WARN, LOG_WARNING, "imjournal: filecreatemode is not set, "
"using default %04o", fCreateMode);
cs.fCreateMode = fCreateMode;
}
finalize_it:
if (pvals != NULL)
cnfparamvalsDestruct(pvals, &modpblk);

View File

@ -234,10 +234,11 @@ CODESTARTparse2
"json: %s\n", r, fjson_object_to_json_string(json));
}
fjson_object_put(json);
ABORT_FINALIZE(RS_RET_COULD_NOT_PARSE);
} else {
iRet = MsgSetPropsViaJSON_Object(pMsg, json);
}
finalize_it:
ENDparse2

View File

@ -178,7 +178,8 @@ static struct cnfparamdescr cnfparamdescr[] = {
{ "parser.supportcompressionextension", eCmdHdlrBinary, 0 },
{ "shutdown.queue.doublesize", eCmdHdlrBinary, 0 },
{ "debug.files", eCmdHdlrArray, 0 },
{ "debug.whitelist", eCmdHdlrBinary, 0 }
{ "debug.whitelist", eCmdHdlrBinary, 0 },
{ "libcapng.default", eCmdHdlrBinary, 0 }
};
static struct cnfparamblk paramblk =
{ CNFPARAMBLK_VERSION,
@ -1183,6 +1184,13 @@ glblDoneLoadCnf(void)
if(!strcmp(paramblk.descr[i].name, "workdirectory")) {
cstr = (uchar*) es_str2cstr(cnfparamvals[i].val.d.estr, NULL);
setWorkDir(NULL, cstr);
} else if(!strcmp(paramblk.descr[i].name, "libcapng.default")) {
#ifdef ENABLE_LIBCAPNG
loadConf->globals.bAbortOnFailedLibcapngSetup = (int) cnfparamvals[i].val.d.n;
#else
LogError(0, RS_RET_ERR, "rsyslog wasn't "
"compiled with libcap-ng support.");
#endif
} else if(!strcmp(paramblk.descr[i].name, "variables.casesensitive")) {
const int val = (int) cnfparamvals[i].val.d.n;
fjson_global_do_case_sensitive_comparison(val);

View File

@ -2552,12 +2552,15 @@ tryEmulateTAG(smsg_t *const pM, const sbool bLockMutex)
void ATTR_NONNULL(2,3)
getTAG(smsg_t * const pM, uchar **const ppBuf, int *const piLen, const sbool bLockMutex)
{
if(bLockMutex == LOCK_MUTEX)
MsgLock(pM);
if(pM == NULL) {
*ppBuf = UCHAR_CONSTANT("");
*piLen = 0;
} else {
if(pM->iLenTAG == 0)
tryEmulateTAG(pM, bLockMutex);
tryEmulateTAG(pM, MUTEX_ALREADY_LOCKED);
if(pM->iLenTAG == 0) {
*ppBuf = UCHAR_CONSTANT("");
*piLen = 0;
@ -2566,6 +2569,9 @@ getTAG(smsg_t * const pM, uchar **const ppBuf, int *const piLen, const sbool bLo
*piLen = pM->iLenTAG;
}
}
if(bLockMutex == LOCK_MUTEX)
MsgUnlock(pM);
}

View File

@ -711,7 +711,10 @@ gtlsInitCred(nsd_gtls_t *const pThis )
DEFiRet;
/* X509 stuff */
CHKgnutls(gnutls_certificate_allocate_credentials(&pThis->xcred));
if (pThis->xcred == NULL) {
/* Allocate only ONCE */
CHKgnutls(gnutls_certificate_allocate_credentials(&pThis->xcred));
}
/* sets the trusted cas file */
cafile = (pThis->pszCAFile == NULL) ? glbl.GetDfltNetstrmDrvrCAF(runConf) : pThis->pszCAFile;
@ -2277,7 +2280,12 @@ finalize_it:
if(pThis->bHaveSess) {
gnutls_deinit(pThis->sess);
pThis->bHaveSess = 0;
/* Free memory using gnutls api first*/
gnutls_certificate_free_credentials(pThis->xcred);
pThis->xcred = NULL;
/* Free other memory */
free(pThis->pszConnectHost);
pThis->pszConnectHost = NULL;
}
}

View File

@ -159,6 +159,9 @@ int rsconfNeedDropPriv(rsconf_t *const cnf)
static void cnfSetDefaults(rsconf_t *pThis)
{
#ifdef ENABLE_LIBCAPNG
pThis->globals.bAbortOnFailedLibcapngSetup = 1;
#endif
pThis->globals.bAbortOnUncleanConfig = 0;
pThis->globals.bAbortOnFailedQueueStartup = 0;
pThis->globals.bReduceRepeatMsgs = 0;

View File

@ -84,6 +84,9 @@ struct parsercnf_s {
* be re-set as often as the user likes).
*/
struct globals_s {
#ifdef ENABLE_LIBCAPNG
int bAbortOnFailedLibcapngSetup;
#endif
int bDebugPrintTemplateList;
int bDebugPrintModuleList;
int bDebugPrintCfSysLineHandlerList;

View File

@ -1055,6 +1055,74 @@ TESTS += \
endif # HAVE_VALGRIND
endif # ENABLE_OMRABBITMQ
if ENABLE_REDIS_TESTS
if ENABLE_IMHIREDIS
TESTS += \
imhiredis-queue.sh \
imhiredis-queue-lpop.sh \
imhiredis-redis-restart.sh \
imhiredis-redis-start-after.sh \
imhiredis-subscribe.sh \
imhiredis-stream.sh \
imhiredis-stream-from-beginning.sh \
imhiredis-stream-consumerGroup-ack.sh \
imhiredis-stream-consumerGroup-noack.sh \
imhiredis-stream-consumerGroup-reclaim.sh
if HAVE_VALGRIND
TESTS += \
imhiredis-queue-vg.sh \
imhiredis-queue-lpop-vg.sh \
imhiredis-redis-restart-vg.sh \
imhiredis-redis-start-after-vg.sh \
imhiredis-subscribe-vg.sh \
imhiredis-stream-vg.sh \
imhiredis-stream-from-beginning-vg.sh \
imhiredis-stream-consumerGroup-ack-vg.sh \
imhiredis-stream-consumerGroup-noack-vg.sh \
imhiredis-stream-consumerGroup-reclaim-vg.sh
endif # HAVE_VALGRIND
endif # ENABLE_IMHIREDIS
if ENABLE_OMHIREDIS
TESTS += \
mmdb-reload.sh \
omhiredis-dynakey.sh \
omhiredis-publish.sh \
omhiredis-queue-rpush.sh \
omhiredis-queue.sh \
omhiredis-set.sh \
omhiredis-setex.sh \
omhiredis-template.sh \
omhiredis-withpass.sh \
omhiredis-wrongpass.sh \
omhiredis-stream-ack.sh \
omhiredis-stream-capped.sh \
omhiredis-stream-del.sh \
omhiredis-stream-dynack.sh \
omhiredis-stream-outfield.sh \
omhiredis-stream.sh
if HAVE_VALGRIND
TESTS += \
mmdb-reload-vg.sh \
omhiredis-dynakey-vg.sh \
omhiredis-publish-vg.sh \
omhiredis-queue-rpush-vg.sh \
omhiredis-queue-vg.sh \
omhiredis-set-vg.sh \
omhiredis-setex-vg.sh \
omhiredis-template-vg.sh \
omhiredis-withpass-vg.sh \
omhiredis-wrongpass-vg.sh \
omhiredis-stream-ack-vg.sh \
omhiredis-stream-capped-vg.sh \
omhiredis-stream-del-vg.sh \
omhiredis-stream-dynack-vg.sh \
omhiredis-stream-outfield-vg.sh \
omhiredis-stream-vg.sh
endif # HAVE_VALGRIND
endif # ENABLE_OMHIREDIS
endif # ENABLE_REDIS_TESTS
if ENABLE_IMPSTATS
TESTS += \
impstats-hup.sh \
@ -1573,6 +1641,7 @@ TESTS += \
imfile-rename.sh \
imfile-symlink.sh \
imfile-symlink-multi.sh \
imfile-symlink-ext-tmp-dir-tree.sh \
imfile-logrotate.sh \
imfile-logrotate-async.sh \
imfile-logrotate-multiple.sh \
@ -2033,6 +2102,26 @@ EXTRA_DIST= \
omrabbitmq_error_server3.sh \
omrabbitmq_json.sh \
omrabbitmq_raw.sh \
imhiredis-queue.sh \
imhiredis-queue-vg.sh \
imhiredis-queue-lpop.sh \
imhiredis-queue-lpop-vg.sh \
imhiredis-redis-restart.sh \
imhiredis-redis-restart-vg.sh \
imhiredis-redis-start-after.sh \
imhiredis-redis-start-after-vg.sh \
imhiredis-subscribe.sh \
imhiredis-subscribe-vg.sh \
imhiredis-stream.sh \
imhiredis-stream-vg.sh \
imhiredis-stream-from-beginning.sh \
imhiredis-stream-from-beginning-vg.sh \
imhiredis-stream-consumerGroup-ack.sh \
imhiredis-stream-consumerGroup-ack-vg.sh \
imhiredis-stream-consumerGroup-noack.sh \
imhiredis-stream-consumerGroup-noack-vg.sh \
imhiredis-stream-consumerGroup-reclaim.sh \
imhiredis-stream-consumerGroup-reclaim-vg.sh \
msgvar-concurrency.sh \
msgvar-concurrency-array.sh \
testsuites/msgvar-concurrency-array.rulebase \
@ -2589,6 +2678,7 @@ EXTRA_DIST= \
imfile-rename.sh \
imfile-symlink.sh \
imfile-symlink-multi.sh \
imfile-symlink-ext-tmp-dir-tree.sh \
imfile-logrotate.sh \
imfile-logrotate-async.sh \
imfile-logrotate-copytruncate.sh \

View File

@ -41,15 +41,19 @@ make check
Running named tests
===================
make testname.sh.log
make testname.log
For example, to run the imfile-basic.sh test, use
make imfile-basic.sh.log
make imfile-basic.log
Test output is in imfile-basic.sh.log
Test output is in imfile-basic.log
To re-run the test, first remove imfile-basic.sh.log then make again
To re-run the test, first remove imfile-basic.log then make again
Or an alternative option is to run
make check TESTS='imfile-basic.sh'
* Using gdb to debug rsyslog during a test run

View File

@ -893,6 +893,39 @@ check_journal_testmsg_received() {
fi;
}
# checks that among the open files found in /proc/<PID>/fd/*
# there is or is not, depending on the calling mode,
# a link with the specified suffix in the target name
check_fd_for_pid() {
local pid="$1" mode="$2" suffix="$3" target seen
seen="false"
for fd in $(echo /proc/$pid/fd/*); do
target="$(readlink -m "$fd")"
if [[ "$target" != *$RSYSLOG_DYNNAME* ]]; then
continue
fi
if ((i % 10 == 0)); then
echo "INFO: check target='$target'"
fi
if [[ "$target" == *$suffix ]]; then
seen="true"
if [[ "$mode" == "exists" ]]; then
echo "PASS: check fd for pid=$pid mode='$mode' suffix='$suffix'"
return 0
fi
fi
done
if [[ "$seen" == "false" ]] && [[ "$mode" == "absent" ]]; then
echo "PASS: check fd for pid=$pid mode='$mode' suffix='$suffix'"
return 0
fi
echo "FAIL: check fd for pid=$pid mode='$mode' suffix='$suffix'"
if [[ "$mode" != "ignore" ]]; then
return 1
fi
return 0
}
# wait for main message queue to be empty. $1 is the instance.
# we run in a loop to ensure rsyslog is *really* finished when a
# function for the "finished predicate" is defined. This is done
@ -1316,6 +1349,11 @@ error_exit() {
# Extended Exit handling for kafka / zookeeper instances
kafka_exit_handling "false"
# Ensure redis instance is stopped
if [ -n "$REDIS_DYN_DIR" ]; then
stop_redis
fi
error_stats $1 # Report error to rsyslog testbench stats
do_cleanup
@ -1535,6 +1573,9 @@ exit_test() {
# Extended Exit handling for kafka / zookeeper instances
kafka_exit_handling "true"
# Ensure redis is stopped
stop_redis
printf '%s Test %s SUCCESSFUL (took %s seconds)\n' "$(tb_timestamp)" "$0" "$(( $(date +%s) - TB_STARTTEST ))"
echo -------------------------------------------------------------------------------
exit 0
@ -2421,6 +2462,97 @@ mysql_cleanup_test() {
}
start_redis() {
check_command_available redis-server
export REDIS_DYN_CONF="${RSYSLOG_DYNNAME}.redis.conf"
export REDIS_DYN_DIR="$(pwd)/${RSYSLOG_DYNNAME}-redis"
# Only set a random port if not set (useful when Redis must be restarted during a test)
if [ -z "$REDIS_RANDOM_PORT" ]; then
export REDIS_RANDOM_PORT="$(get_free_port)"
fi
cp $srcdir/testsuites/redis.conf $REDIS_DYN_CONF
mkdir -p $REDIS_DYN_DIR
sed -itemp "s+<tmpdir>+${REDIS_DYN_DIR}+g" $REDIS_DYN_CONF
sed -itemp "s+<rndport>+${REDIS_RANDOM_PORT}+g" $REDIS_DYN_CONF
# Start the server
echo "Starting redis with conf file $REDIS_DYN_CONF"
redis-server $REDIS_DYN_CONF &
$TESTTOOL_DIR/msleep 2000
# Wait for Redis to be fully up
timeoutend=10
until nc -w1 -z 127.0.0.1 $REDIS_RANDOM_PORT; do
echo "Waiting for Redis to start..."
$TESTTOOL_DIR/msleep 1000
(( timeseconds=timeseconds + 2 ))
if [ "$timeseconds" -gt "$timeoutend" ]; then
echo "--- TIMEOUT ( $timeseconds ) reached!!!"
if [ ! -d ${REDIS_DYN_DIR}/redis.log ]; then
echo "no Redis logs"
else
echo "Dumping ${REDIS_DYN_DIR}/redis.log"
echo "========================================="
cat ${REDIS_DYN_DIR}/redis.log
echo "========================================="
fi
error_exit 1
fi
done
}
cleanup_redis() {
if [ -d ${REDIS_DYN_DIR} ]; then
rm -rf ${REDIS_DYN_DIR}
fi
if [ -f ${REDIS_DYN_CONF} ]; then
rm -f ${REDIS_DYN_CONF}
fi
}
stop_redis() {
if [ -f "$REDIS_DYN_DIR/redis.pid" ]; then
redispid=$(cat $REDIS_DYN_DIR/redis.pid)
echo "Stopping Redis instance"
kill $redispid
i=0
# Check if redis instance went down!
while [ -f $REDIS_DYN_DIR/redis.pid ]; do
redispid=$(cat $REDIS_DYN_DIR/redis.pid)
if [[ "" != "$redispid" ]]; then
$TESTTOOL_DIR/msleep 100 # wait 100 milliseconds
if test $i -gt $TB_TIMEOUT_STARTSTOP; then
echo "redis server (PID $redispid) still running - Performing hard shutdown (-9)"
kill -9 $redispid
break
fi
(( i++ ))
else
# Break the loop
break
fi
done
fi
}
redis_command() {
check_command_available redis-cli
if [ -z "$1" ]; then
echo "redis_command: no command provided!"
error_exit 1
fi
printf "$1\n" | redis-cli -p "$REDIS_RANDOM_PORT"
}
# $1 - replacement string
# $2 - start search string
# $3 - file name

View File

@ -0,0 +1,80 @@
#!/bin/bash
# This test creates multiple symlinks (all watched by rsyslog via wildcard)
# chained to target files via additional symlinks and checks that all files
# are recorded with correct corresponding metadata (name of symlink
# matching configuration).
# This is part of the rsyslog testbench, released under ASL 2.0
. "${srcdir:=.}"/diag.sh init
. "$srcdir"/diag.sh check-inotify
# #define FILE_DELETE_DELAY 5 /* how many seconds to wait before finally deleting a gone file */
export RSYSLOG_DEBUG="debug nologfuncflow noprintmutexaction nostdout"
export RSYSLOG_DEBUGLOG="log"
export TEST_TIMEOUT=30
# generate input files first. Note that rsyslog processes it as
# soon as it start up (so the file should exist at that point).
generate_conf
add_conf '
# comment out if you need more debug info:
global( debug.whitelist="on" debug.files=["imfile.c"]
workDirectory="./'"$RSYSLOG_DYNNAME"'.work"
)
module(load="../plugins/imfile/.libs/imfile" mode="inotify")
input(type="imfile" File="./'"$RSYSLOG_DYNNAME"'.links/*.log" Tag="file:"
Severity="error" Facility="local7" addMetadata="on")
template(name="outfmt" type="list") {
constant(value="HEADER ")
property(name="msg" format="json")
constant(value=", filename: ")
property(name="$!metadata!filename")
constant(value=", fileoffset: ")
property(name="$!metadata!fileoffset")
constant(value="\n")
}
if $msg contains "msgnum:" then
action( type="omfile" file="'"$RSYSLOG_DYNNAME.out/$RSYSLOG_OUT_LOG"'" template="outfmt")
'
mkdir "$RSYSLOG_DYNNAME".links "$RSYSLOG_DYNNAME".work "$RSYSLOG_DYNNAME".out
printf '\ncreating %s\n' "$RSYSLOG_DYNNAME".targets/container-1/logs/0.log
mkdir -p "$RSYSLOG_DYNNAME".targets/container-1/logs
./inputfilegen -m 1 >"$RSYSLOG_DYNNAME".targets/container-1/logs/0.log
ls -l "$RSYSLOG_DYNNAME".targets/container-1/logs/0.log
ln -sv "$PWD/$RSYSLOG_DYNNAME".targets/container-1/logs/0.log "$PWD/$RSYSLOG_DYNNAME".links/container-1.log
printf '%s generated link %s\n' "$(tb_timestamp)" "container-1"
ls -l "$RSYSLOG_DYNNAME".links/container-1.log
# Start rsyslog now
startup
PID=$(cat "$RSYSLOG_PIDBASE".pid)
echo "Rsyslog pid $RSYSLOG_PIDBASE.pid=$PID"
if [[ "$PID" == "" ]]; then
error_exit 1
fi
echo "INFO: check files"
# wait until this files has been opened
check_fd_for_pid "$PID" exists "container-1/logs/0.log"
check_fd_for_pid "$PID" exists "container-1/logs"
echo "INFO: remove watched files"
rm -vr "$RSYSLOG_DYNNAME".targets/container-1
rm -v "$RSYSLOG_DYNNAME".links/container-1.log
until check_fd_for_pid "$PID" absent "container-1/logs (deleted)"; do
if ((_wait_for_absent++ > TEST_TIMEOUT)); then
error_exit 1
fi
echo "INFO: trigger fd unlinking"
./inputfilegen -m 1 >"$RSYSLOG_DYNNAME".links/gogogo.log
./msleep 1000
rm -v "$RSYSLOG_DYNNAME".links/gogogo.log
./msleep 10
done
shutdown_when_empty
wait_shutdown
exit_test

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/imhiredis-queue-lpop.sh

52
tests/imhiredis-queue-lpop.sh Executable file
View File

@ -0,0 +1,52 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
redis_command "RPUSH mykey message1"
redis_command "RPUSH mykey message2"
redis_command "RPUSH mykey message3"
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/imhiredis/.libs/imhiredis")
template(name="outfmt" type="string" string="%$/num% %msg%\n")
input(type="imhiredis"
server="127.0.0.1"
port="'$REDIS_RANDOM_PORT'"
key="mykey"
mode="queue"
uselpop="on"
ruleset="redis")
ruleset(name="redis") {
set $/num = cnum($/num + 1);
action(type="omfile"
file="'$RSYSLOG_OUT_LOG'"
template="outfmt")
}
action(type="omfile" file="'$RSYSLOG_OUT_LOG'" template="outfmt")
'
startup
shutdown_when_empty
wait_shutdown
stop_redis
# Same order
content_check '1 message1'
content_check '2 message2'
content_check '3 message3'
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

7
tests/imhiredis-queue-vg.sh Executable file
View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/imhiredis-queue.sh

51
tests/imhiredis-queue.sh Executable file
View File

@ -0,0 +1,51 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
redis_command "RPUSH mykey message1"
redis_command "RPUSH mykey message2"
redis_command "RPUSH mykey message3"
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/imhiredis/.libs/imhiredis")
template(name="outfmt" type="string" string="%$/num% %msg%\n")
input(type="imhiredis"
server="127.0.0.1"
port="'$REDIS_RANDOM_PORT'"
key="mykey"
mode="queue"
ruleset="redis")
ruleset(name="redis") {
set $/num = cnum($/num + 1);
action(type="omfile"
file="'$RSYSLOG_OUT_LOG'"
template="outfmt")
}
action(type="omfile" file="'$RSYSLOG_OUT_LOG'" template="outfmt")
'
startup
shutdown_when_empty
wait_shutdown
stop_redis
# Opposite order
content_check '1 message3'
content_check '2 message2'
content_check '3 message1'
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/imhiredis-redis-restart.sh

View File

@ -0,0 +1,69 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
# Start redis once to be able to generate configuration
start_redis
redis_command "RPUSH mykey message1"
redis_command "RPUSH mykey message2"
redis_command "RPUSH mykey message3"
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/imhiredis/.libs/imhiredis")
template(name="outfmt" type="string" string="%$/num% %msg%\n")
input(type="imhiredis"
server="127.0.0.1"
port="'$REDIS_RANDOM_PORT'"
key="mykey"
mode="queue"
ruleset="redis")
ruleset(name="redis") {
set $/num = cnum($/num + 1);
action(type="omfile"
file="'$RSYSLOG_OUT_LOG'"
template="outfmt")
}
action(type="omfile" file="'$RSYSLOG_OUT_LOG'" template="outfmt")
'
startup
wait_content '3 message1'
# Stop Redis and wait a short moment for imhiredis to notice Redis went down
stop_redis
rst_msleep 1500
start_redis
redis_command "RPUSH mykey message4"
redis_command "RPUSH mykey message5"
redis_command "RPUSH mykey message6"
wait_content '4 message6'
shutdown_when_empty
wait_shutdown
stop_redis
content_check '1 message3'
content_check '2 message2'
content_check '3 message1'
content_check 'sleeping 10 seconds before retrying'
content_check '4 message6'
content_check '5 message5'
content_check '6 message4'
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/imhiredis-redis-start-after.sh

View File

@ -0,0 +1,55 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
# Start redis once to be able to generate configuration
start_redis
stop_redis
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/imhiredis/.libs/imhiredis")
template(name="outfmt" type="string" string="%$/num% %msg%\n")
input(type="imhiredis"
server="127.0.0.1"
port="'$REDIS_RANDOM_PORT'"
key="mykey"
mode="queue"
ruleset="redis")
ruleset(name="redis") {
set $/num = cnum($/num + 1);
action(type="omfile"
file="'$RSYSLOG_OUT_LOG'"
template="outfmt")
}
action(type="omfile" file="'$RSYSLOG_OUT_LOG'" template="outfmt")
'
startup
start_redis
redis_command "RPUSH mykey message1"
redis_command "RPUSH mykey message2"
redis_command "RPUSH mykey message3"
wait_content '1 message3'
shutdown_when_empty
wait_shutdown
stop_redis
content_check '1 message3'
content_check '2 message2'
content_check '3 message1'
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/imhiredis-stream-consumerGroup-ack.sh

View File

@ -0,0 +1,67 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/imhiredis/.libs/imhiredis")
template(name="outfmt" type="string" string="%$/num% %$!msg%\n")
input(type="imhiredis"
server="127.0.0.1"
port="'$REDIS_RANDOM_PORT'"
key="mystream"
mode="stream"
stream.consumerGroup="mygroup"
stream.consumerName="myName"
ruleset="redis")
ruleset(name="redis") {
set $/num = cnum($/num + 1);
action(type="omfile"
file="'$RSYSLOG_OUT_LOG'"
template="outfmt")
}
action(type="omfile" file="'$RSYSLOG_OUT_LOG'" template="outfmt")
'
startup
redis_command "XADD mystream * msg message1"
redis_command "XADD mystream * msg message2"
redis_command "XADD mystream * msg message3"
shutdown_when_empty
wait_shutdown
output="$(redis_command 'hello 3\nXINFO groups mystream' | grep 'pending')"
if [ -z "$output" ]; then
echo "Could not get group result from redis, cannot tell if entries ware acknowledged!"
error_exit 1
fi
if ! echo "$output" | grep -q "pending 0"; then
echo "ERROR: entries werent acknowledged!"
echo "ERROR: output from Redis is '$output'"
echo "ERROR: expected 'pending 0'"
error_exit 1
fi
stop_redis
content_check '1 message1'
content_check '2 message2'
content_check '3 message3'
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/imhiredis-stream-consumerGroup-noack.sh

View File

@ -0,0 +1,68 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/imhiredis/.libs/imhiredis")
template(name="outfmt" type="string" string="%$/num% %$!msg%\n")
input(type="imhiredis"
server="127.0.0.1"
port="'$REDIS_RANDOM_PORT'"
key="mystream"
mode="stream"
stream.consumerGroup="mygroup"
stream.consumerName="myName"
stream.consumerACK="off"
ruleset="redis")
ruleset(name="redis") {
set $/num = cnum($/num + 1);
action(type="omfile"
file="'$RSYSLOG_OUT_LOG'"
template="outfmt")
}
action(type="omfile" file="'$RSYSLOG_OUT_LOG'" template="outfmt")
'
startup
redis_command "XADD mystream * msg message1"
redis_command "XADD mystream * msg message2"
redis_command "XADD mystream * msg message3"
shutdown_when_empty
wait_shutdown
output="$(redis_command 'hello 3\nXINFO groups mystream' | grep 'pending')"
if [ -z "$output" ]; then
echo "Could not get group result from redis, cannot tell if entries ware acknowledged!"
error_exit 1
fi
if ! echo "$output" | grep -q "pending 3"; then
echo "ERROR: entries were acknowledged, they shouldn't have!"
echo "ERROR: output from Redis is '$output'"
echo "ERROR: expected 'pending 3'"
error_exit 1
fi
stop_redis
content_check '1 message1'
content_check '2 message2'
content_check '3 message3'
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/imhiredis-stream-consumerGroup-reclaim.sh

View File

@ -0,0 +1,83 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/imhiredis/.libs/imhiredis")
template(name="outfmt" type="string" string="%$/num% %$!msg%\n")
input(type="imhiredis"
server="127.0.0.1"
port="'$REDIS_RANDOM_PORT'"
key="mystream"
mode="stream"
stream.consumerGroup="mygroup"
stream.consumerName="myName"
stream.autoclaimIdleTime="5000" #5 seconds
ruleset="redis")
ruleset(name="redis") {
set $/num = cnum($/num + 1);
action(type="omfile"
file="'$RSYSLOG_OUT_LOG'"
template="outfmt")
}
action(type="omfile" file="'$RSYSLOG_OUT_LOG'" template="outfmt")
'
redis_command "XADD mystream * msg message1"
redis_command "XADD mystream * msg message2"
redis_command "XADD mystream * msg message3"
redis_command "XADD mystream * msg message4"
redis_command "XADD mystream * msg message5"
redis_command "XADD mystream * msg message6"
redis_command "XGROUP CREATE mystream mygroup 0-0"
# Read and claim message1 and message2
redis_command "XREADGROUP GROUP mygroup otherConsumer COUNT 2 STREAMS mystream >"
rst_msleep 5500
# Read and claim message3 and message4
redis_command "XREADGROUP GROUP mygroup otherConsumer COUNT 2 STREAMS mystream >"
startup
shutdown_when_empty
wait_shutdown
output="$(redis_command 'hello 3\nXINFO groups mystream' | grep 'pending')"
if [ -z "$output" ]; then
echo "Could not get group result from redis, cannot tell if entries ware acknowledged!"
error_exit 1
fi
# Should still have 2 pending messages: message3 and message4
if ! echo "$output" | grep -q "pending 2"; then
echo "ERROR: entries weren't acknowledged!"
echo "ERROR: output from Redis is '$output'"
echo "ERROR: expected 'pending 2'"
error_exit 1
fi
stop_redis
# Should reclaim message1 and message2
# then claim and acknowledge message5 and message6 normally
content_check '1 message1'
content_check '2 message2'
content_check '3 message5'
content_check '4 message6'
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/imhiredis-stream-from-beginning.sh

View File

@ -0,0 +1,60 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
# WILL be logged by Rsyslog
redis_command "XADD mystream * msg message1"
redis_command "XADD mystream * msg message2"
redis_command "XADD mystream * msg message3"
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/imhiredis/.libs/imhiredis")
template(name="outfmt" type="string" string="%$/num% %$!msg%\n")
input(type="imhiredis"
server="127.0.0.1"
port="'$REDIS_RANDOM_PORT'"
key="mystream"
mode="stream"
stream.readFrom="0-0"
ruleset="redis")
ruleset(name="redis") {
set $/num = cnum($/num + 1);
action(type="omfile"
file="'$RSYSLOG_OUT_LOG'"
template="outfmt")
}
action(type="omfile" file="'$RSYSLOG_OUT_LOG'" template="outfmt")
'
startup
redis_command "XADD mystream * msg message4"
redis_command "XADD mystream * msg message5"
redis_command "XADD mystream * msg message6"
shutdown_when_empty
wait_shutdown
stop_redis
content_check '1 message1'
content_check '2 message2'
content_check '3 message3'
content_check '4 message4'
content_check '5 message5'
content_check '6 message6'
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

7
tests/imhiredis-stream-vg.sh Executable file
View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/imhiredis-stream.sh

60
tests/imhiredis-stream.sh Executable file
View File

@ -0,0 +1,60 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
# Won't be logged by Rsyslog
redis_command "XADD mystream * msg message1"
redis_command "XADD mystream * msg message2"
redis_command "XADD mystream * msg message3"
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/imhiredis/.libs/imhiredis")
template(name="outfmt" type="string" string="%$/num% %$!msg%\n")
input(type="imhiredis"
server="127.0.0.1"
port="'$REDIS_RANDOM_PORT'"
key="mystream"
mode="stream"
ruleset="redis")
ruleset(name="redis") {
set $/num = cnum($/num + 1);
action(type="omfile"
file="'$RSYSLOG_OUT_LOG'"
template="outfmt")
}
action(type="omfile" file="'$RSYSLOG_OUT_LOG'" template="outfmt")
'
startup
redis_command "XADD mystream * msg message4"
redis_command "XADD mystream * msg message5"
redis_command "XADD mystream * msg message6"
shutdown_when_empty
wait_shutdown
stop_redis
check_not_present "message1"
check_not_present "message2"
check_not_present "message3"
content_check '1 message4'
content_check '2 message5'
content_check '3 message6'
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/imhiredis-subscribe.sh

60
tests/imhiredis-subscribe.sh Executable file
View File

@ -0,0 +1,60 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
# Won't be logged by Rsyslog
redis_command "PUBLISH mychannel message1"
redis_command "PUBLISH mychannel message2"
redis_command "PUBLISH mychannel message3"
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/imhiredis/.libs/imhiredis")
template(name="outfmt" type="string" string="%$/num% %msg%\n")
input(type="imhiredis"
server="127.0.0.1"
port="'$REDIS_RANDOM_PORT'"
key="mychannel"
mode="subscribe"
ruleset="redis")
ruleset(name="redis") {
set $/num = cnum($/num + 1);
action(type="omfile"
file="'$RSYSLOG_OUT_LOG'"
template="outfmt")
}
action(type="omfile" file="'$RSYSLOG_OUT_LOG'" template="outfmt")
'
startup
redis_command "PUBLISH mychannel message4"
redis_command "PUBLISH mychannel message5"
redis_command "PUBLISH mychannel message6"
shutdown_when_empty
wait_shutdown
stop_redis
check_not_present "message1"
check_not_present "message2"
check_not_present "message3"
content_check '1 message4'
content_check '2 message5'
content_check '3 message6'
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

View File

@ -0,0 +1,37 @@
#!/bin/bash
# Test for checking the fileCreateMode imjournal parameter
# Basically we set 3 different file creation modes for the state file
# and test if those are really set
#
. ${srcdir:=.}/diag.sh init
# $1 - the file create mode that we set for the state file
test_imjournal_filecreatemode() {
local EXPECTED=$1
generate_conf
add_conf '
global(workDirectory="'$RSYSLOG_DYNNAME.spool'")
module(
load="../plugins/imjournal/.libs/imjournal"
StateFile="imjournal.state"
fileCreateMode="'$EXPECTED'"
)
'
startup
local ACTUAL="$(stat -c "%#a" "$RSYSLOG_DYNNAME.spool/imjournal.state")"
if [ "$ACTUAL" != "$EXPECTED" ]; then
echo "imjournal fileCreateMode failed, incorrect permissions on state file: expected($EXPECTED) != actual($ACTUAL)"
error_exit
fi
shutdown_when_empty
wait_shutdown
ls -l "$RSYSLOG_DYNNAME.spool/imjournal.state"
}
test_imjournal_filecreatemode "0600"
test_imjournal_filecreatemode "0640"
test_imjournal_filecreatemode "0644"
exit_test

View File

@ -14,7 +14,7 @@ if $msg contains "msgnum:" then {
}
'
startup_vg
injectmsg litteral "<129>Mar 10 01:00:00 172.20.245.8 tag:msgnum:1"
injectmsg literal "<129>Mar 10 01:00:00 172.20.245.8 tag:msgnum:1"
shutdown_when_empty
wait_shutdown_vg
check_exit_vg

7
tests/omhiredis-dynakey-vg.sh Executable file
View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhiredis-dynakey.sh

59
tests/omhiredis-dynakey.sh Executable file
View File

@ -0,0 +1,59 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
# export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/omhiredis/.libs/omhiredis")
template(name="outfmt" type="string" string="%msg%")
template(name="dynakey" type="string" string="%$!dynaKey%")
local4.* {
set $!dynaKey = "myDynaKey";
action(type="omhiredis"
server="127.0.0.1"
serverport="'$REDIS_RANDOM_PORT'"
mode="set"
dynakey="on"
key="dynakey"
template="outfmt")
stop
}
action(type="omfile" file="'$RSYSLOG_DYNNAME.othermsg'" template="outfmt")
'
# Should get nothing
redis_command "GET myDynaKey" > $RSYSLOG_OUT_LOG
startup
# Inject 1 message
injectmsg 1 1
shutdown_when_empty
wait_shutdown
# Should get ' msgnum:00000001:'
redis_command "GET myDynaKey" >> $RSYSLOG_OUT_LOG
# The first get is before inserting, the second is after
export EXPECTED="/usr/bin/redis-cli
/usr/bin/redis-cli
msgnum:00000001:"
cmp_exact $RSYSLOG_OUT_LOG
stop_redis
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

7
tests/omhiredis-publish-vg.sh Executable file
View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhiredis-publish.sh

60
tests/omhiredis-publish.sh Executable file
View File

@ -0,0 +1,60 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
# export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/omhiredis/.libs/omhiredis")
template(name="outfmt" type="string" string="%msg%")
local4.* {
action(type="omhiredis"
server="127.0.0.1"
serverport="'$REDIS_RANDOM_PORT'"
mode="publish"
key="myChannel"
template="outfmt")
stop
}
action(type="omfile" file="'$RSYSLOG_DYNNAME.othermsg'" template="outfmt")
'
(redis_command "SUBSCRIBE myChannel" > $RSYSLOG_OUT_LOG) &
startup
# Inject 3 messages
injectmsg 1 3
shutdown_when_empty
wait_shutdown
# The SUBSCRIBE command gets the result of the subscribing and the 3 subsequent messages published by omhiredis
export EXPECTED="/usr/bin/redis-cli
subscribe
myChannel
1
message
myChannel
msgnum:00000001:
message
myChannel
msgnum:00000002:
message
myChannel
msgnum:00000003:"
cmp_exact $RSYSLOG_OUT_LOG
stop_redis
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhiredis-queue-rpush.sh

58
tests/omhiredis-queue-rpush.sh Executable file
View File

@ -0,0 +1,58 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
# export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/omhiredis/.libs/omhiredis")
template(name="outfmt" type="string" string="%msg%")
local4.* {
action(type="omhiredis"
server="127.0.0.1"
serverport="'$REDIS_RANDOM_PORT'"
mode="queue"
userpush="on"
key="myKey"
template="outfmt")
stop
}
action(type="omfile" file="'$RSYSLOG_DYNNAME.othermsg'" template="outfmt")
'
startup
# Inject 5 messages
injectmsg 1 5
shutdown_when_empty
wait_shutdown
redis_command "LLEN myKey" > $RSYSLOG_OUT_LOG
# try to get 6 (should get only 5)
redis_command "LPOP myKey 6" >> $RSYSLOG_OUT_LOG
# Messages should be retrieved in order (as they were inserted using RPUSH)
export EXPECTED="/usr/bin/redis-cli
5
/usr/bin/redis-cli
msgnum:00000001:
msgnum:00000002:
msgnum:00000003:
msgnum:00000004:
msgnum:00000005:"
cmp_exact $RSYSLOG_OUT_LOG
stop_redis
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

7
tests/omhiredis-queue-vg.sh Executable file
View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhiredis-queue.sh

62
tests/omhiredis-queue.sh Executable file
View File

@ -0,0 +1,62 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
# export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/omhiredis/.libs/omhiredis")
template(name="outfmt" type="string" string="%msg%")
local4.* {
action(type="omhiredis"
server="127.0.0.1"
serverport="'$REDIS_RANDOM_PORT'"
mode="queue"
key="myKey"
template="outfmt")
stop
}
action(type="omfile" file="'$RSYSLOG_DYNNAME.othermsg'" template="outfmt")
'
startup
# Inject 10 messages
injectmsg 1 10
shutdown_when_empty
wait_shutdown
redis_command "LLEN myKey" > $RSYSLOG_OUT_LOG
# try to get 11 (should get only 10)
redis_command "LPOP myKey 11" >> $RSYSLOG_OUT_LOG
# Messages should be retrieved in reverse order (as they were inserted using LPUSH)
export EXPECTED="/usr/bin/redis-cli
10
/usr/bin/redis-cli
msgnum:00000010:
msgnum:00000009:
msgnum:00000008:
msgnum:00000007:
msgnum:00000006:
msgnum:00000005:
msgnum:00000004:
msgnum:00000003:
msgnum:00000002:
msgnum:00000001:"
cmp_exact $RSYSLOG_OUT_LOG
stop_redis
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

7
tests/omhiredis-set-vg.sh Executable file
View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhiredis-set.sh

56
tests/omhiredis-set.sh Executable file
View File

@ -0,0 +1,56 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
# export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/omhiredis/.libs/omhiredis")
template(name="outfmt" type="string" string="%msg%")
local4.* {
action(type="omhiredis"
server="127.0.0.1"
serverport="'$REDIS_RANDOM_PORT'"
mode="set"
key="outKey"
template="outfmt")
stop
}
action(type="omfile" file="'$RSYSLOG_DYNNAME.othermsg'" template="outfmt")
'
# Should get nothing
redis_command "GET outKey" > $RSYSLOG_OUT_LOG
startup
# Inject 1 message
injectmsg 1 1
shutdown_when_empty
wait_shutdown
# Should get ' msgnum:00000001:'
redis_command "GET outKey" >> $RSYSLOG_OUT_LOG
# The first get is before inserting, the second is after
export EXPECTED="/usr/bin/redis-cli
/usr/bin/redis-cli
msgnum:00000001:"
cmp_exact $RSYSLOG_OUT_LOG
stop_redis
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

7
tests/omhiredis-setex-vg.sh Executable file
View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhiredis-setex.sh

75
tests/omhiredis-setex.sh Executable file
View File

@ -0,0 +1,75 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
# export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
EXPIRATION="3"
start_redis
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/omhiredis/.libs/omhiredis")
template(name="outfmt" type="string" string="%msg%")
local4.* {
action(type="omhiredis"
server="127.0.0.1"
serverport="'$REDIS_RANDOM_PORT'"
mode="set"
key="outKey"
expiration="'$EXPIRATION'"
template="outfmt")
stop
}
action(type="omfile" file="'$RSYSLOG_DYNNAME.othermsg'" template="outfmt")
'
# Should get nothing
redis_command "GET outKey" > $RSYSLOG_OUT_LOG
startup
# Inject 1 message
injectmsg 1 1
shutdown_when_empty
wait_shutdown
# Should get the remaining expiration time (over -1, under 5)
ttl=$(redis_command "TTL outKey" | sed '1d')
# Should get ' msgnum:00000001:'
redis_command "GET outKey" >> $RSYSLOG_OUT_LOG
sleep $EXPIRATION
# Should get nothing
redis_command "GET outKey" >> $RSYSLOG_OUT_LOG
if [ $ttl -lt 0 ] || [ $ttl -gt $EXPIRATION ]; then
echo "ERROR: expiration is not in [0:$EXPIRATION] -> $ttl"
error_exit 1
fi
# The first get is before inserting
# The third is while the key is still valid
# The fourth is after the key expired
export EXPECTED="/usr/bin/redis-cli
/usr/bin/redis-cli
msgnum:00000001:
/usr/bin/redis-cli
"
cmp_exact $RSYSLOG_OUT_LOG
stop_redis
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhiredis-stream-ack.sh

97
tests/omhiredis-stream-ack.sh Executable file
View File

@ -0,0 +1,97 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
# export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/omhiredis/.libs/omhiredis")
template(name="outfmt" type="string" string="%msg%")
local4.* {
action(type="omhiredis"
server="127.0.0.1"
serverport="'$REDIS_RANDOM_PORT'"
mode="stream"
key="outStream"
stream.ack="on"
stream.keyAck="inStream"
stream.groupAck="group"
stream.indexAck="1-0"
template="outfmt")
stop
}
action(type="omfile" file="'$RSYSLOG_DYNNAME.othermsg'" template="outfmt")
'
redis_command "XGROUP CREATE inStream group 0 MKSTREAM" > $RSYSLOG_OUT_LOG
redis_command "XADD inStream 1-0 key value" >> $RSYSLOG_OUT_LOG
redis_command "XREADGROUP GROUP group consumerName COUNT 1 STREAMS inStream >" >> $RSYSLOG_OUT_LOG
redis_command "XINFO GROUPS inStream" >> $RSYSLOG_OUT_LOG
startup
# Inject 1 message
injectmsg 1 1
shutdown_when_empty
wait_shutdown
redis_command "XINFO GROUPS inStream" >> $RSYSLOG_OUT_LOG
# 1. create group and stream
# 2. add entry to stream (with index 1-0)
# 3. read it from a consumer group -> index is now pending
# 4. show group infos -> pending shows 1 pending entry
# 4.2. start Rsyslog and send message -> omhiredis acknowledges index 1-0 on group 'group' for stream 'inStream'
# 5. show group infos again -> pending now shows 0 pending entries
export EXPECTED="/usr/bin/redis-cli
OK
/usr/bin/redis-cli
1-0
/usr/bin/redis-cli
inStream
1-0
key
value
/usr/bin/redis-cli
name
group
consumers
1
pending
1
last-delivered-id
1-0
entries-read
1
lag
0
/usr/bin/redis-cli
name
group
consumers
1
pending
0
last-delivered-id
1-0
entries-read
1
lag
0"
cmp_exact $RSYSLOG_OUT_LOG
stop_redis
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhiredis-stream-capped.sh

View File

@ -0,0 +1,53 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
# export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/omhiredis/.libs/omhiredis")
template(name="outfmt" type="string" string="%msg%")
local4.* {
action(type="omhiredis"
server="127.0.0.1"
serverport="'$REDIS_RANDOM_PORT'"
mode="stream"
key="outStream"
stream.capacityLimit="128"
template="outfmt")
stop
}
action(type="omfile" file="'$RSYSLOG_DYNNAME.othermsg'" template="outfmt")
'
startup
# Inject 1000 messages
injectmsg 1 1000
shutdown_when_empty
wait_shutdown
count=$(redis_command "XLEN outStream" | grep -o "[0-9]*")
# Should be less than 1000 (close to 128, but not 128)
# 500 still means that stream length was capped
if [ ! "${count}" -le 500 ]; then
echo "error: stream has too much entries -> $count"
error_exit 1
fi
stop_redis
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhiredis-stream-del.sh

70
tests/omhiredis-stream-del.sh Executable file
View File

@ -0,0 +1,70 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
# export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/omhiredis/.libs/omhiredis")
template(name="outfmt" type="string" string="%msg%")
local4.* {
action(type="omhiredis"
server="127.0.0.1"
serverport="'$REDIS_RANDOM_PORT'"
mode="stream"
key="outStream"
stream.del="on"
stream.keyAck="inStream"
stream.indexAck="1-0"
template="outfmt")
stop
}
action(type="omfile" file="'$RSYSLOG_DYNNAME.othermsg'" template="outfmt")
'
redis_command "XADD inStream 1-0 key value" >> $RSYSLOG_OUT_LOG
redis_command "XREAD STREAMS inStream 0" >> $RSYSLOG_OUT_LOG
redis_command "XLEN inStream" >> $RSYSLOG_OUT_LOG
startup
# Inject 1 message
injectmsg 1 1
shutdown_when_empty
wait_shutdown
redis_command "XLEN inStream" >> $RSYSLOG_OUT_LOG
# 2. add entry to stream (with index 1-0)
# 3. read it -> index is read (but not deleted)
# 4. show stream length -> should be 1
# 4.2. start Rsyslog and send message -> omhiredis deletes index 1-0 on stream 'inStream'
# 5. show stream length again -> should be 0
export EXPECTED="/usr/bin/redis-cli
1-0
/usr/bin/redis-cli
inStream
1-0
key
value
/usr/bin/redis-cli
1
/usr/bin/redis-cli
0"
cmp_exact $RSYSLOG_OUT_LOG
stop_redis
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhiredis-stream-dynack.sh

107
tests/omhiredis-stream-dynack.sh Executable file
View File

@ -0,0 +1,107 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
# export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/omhiredis/.libs/omhiredis")
template(name="outfmt" type="string" string="%msg%")
template(name="redis-key" type="string" string="%$.redis!key%")
template(name="redis-group" type="string" string="%$.redis!group%")
template(name="redis-index" type="string" string="%$.redis!index%")
local4.* {
set $.redis!key = "inputStream";
set $.redis!group = "myGroup";
set $.redis!index = "2-0";
action(type="omhiredis"
server="127.0.0.1"
serverport="'$REDIS_RANDOM_PORT'"
mode="stream"
key="outStream"
stream.ack="on"
stream.dynaKeyAck="on"
stream.dynaGroupAck="on"
stream.dynaIndexAck="on"
stream.keyAck="redis-key"
stream.groupAck="redis-group"
stream.indexAck="redis-index"
template="outfmt")
stop
}
action(type="omfile" file="'$RSYSLOG_DYNNAME.othermsg'" template="outfmt")
'
redis_command "XGROUP CREATE inputStream myGroup 0 MKSTREAM" > $RSYSLOG_OUT_LOG
redis_command "XADD inputStream 2-0 key value" >> $RSYSLOG_OUT_LOG
redis_command "XREADGROUP GROUP myGroup consumerName COUNT 1 STREAMS inputStream >" >> $RSYSLOG_OUT_LOG
redis_command "XINFO GROUPS inputStream" >> $RSYSLOG_OUT_LOG
startup
# Inject 1 message
injectmsg 1 1
shutdown_when_empty
wait_shutdown
redis_command "XINFO GROUPS inputStream" >> $RSYSLOG_OUT_LOG
# 1. create group and stream
# 2. add entry to stream (with index 2-0)
# 3. read it from a consumer group -> index is now pending
# 4. show group infos -> pending shows 1 pending entry
# 4.2. start Rsyslog and send message -> omhiredis acknowledges index 2-0 on group 'myGroup' for stream 'inputStream'
# 5. show group infos again -> pending now shows 0 pending entries
export EXPECTED="/usr/bin/redis-cli
OK
/usr/bin/redis-cli
2-0
/usr/bin/redis-cli
inputStream
2-0
key
value
/usr/bin/redis-cli
name
myGroup
consumers
1
pending
1
last-delivered-id
2-0
entries-read
1
lag
0
/usr/bin/redis-cli
name
myGroup
consumers
1
pending
0
last-delivered-id
2-0
entries-read
1
lag
0"
cmp_exact $RSYSLOG_OUT_LOG
stop_redis
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhiredis-stream-outfield.sh

View File

@ -0,0 +1,57 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
# export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/omhiredis/.libs/omhiredis")
template(name="outfmt" type="string" string="%msg%")
local4.* {
action(type="omhiredis"
server="127.0.0.1"
serverport="'$REDIS_RANDOM_PORT'"
mode="stream"
key="outStream"
stream.outField="custom_field"
template="outfmt")
stop
}
action(type="omfile" file="'$RSYSLOG_DYNNAME.othermsg'" template="outfmt")
'
startup
# Inject 2 messages
injectmsg 1 2
shutdown_when_empty
wait_shutdown
# Should get '2'
redis_command "XLEN outStream" >> $RSYSLOG_OUT_LOG
# Should get 2 entries
redis_command "XREAD COUNT 3 STREAMS outStream 0" >> $RSYSLOG_OUT_LOG
# Cannot check for full reply as it includes entries' unix timestamp
content_count_check "outStream" 1 $RSYSLOG_OUT_LOG
content_count_check " msgnum:00000001:" 1 $RSYSLOG_OUT_LOG
content_count_check " msgnum:00000002:" 1 $RSYSLOG_OUT_LOG
# 2 data should be inserted in the 'custom_field' field inside every message
content_count_check "custom_field" 2 $RSYSLOG_OUT_LOG
content_count_check "msg" 2 $RSYSLOG_OUT_LOG
content_count_check "msgnum" 2 $RSYSLOG_OUT_LOG
stop_redis
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

7
tests/omhiredis-stream-vg.sh Executable file
View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhiredis-stream.sh

63
tests/omhiredis-stream.sh Executable file
View File

@ -0,0 +1,63 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
# export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/omhiredis/.libs/omhiredis")
template(name="outfmt" type="string" string="%msg%")
local4.* {
action(type="omhiredis"
server="127.0.0.1"
serverport="'$REDIS_RANDOM_PORT'"
mode="stream"
key="outStream"
template="outfmt")
stop
}
action(type="omfile" file="'$RSYSLOG_DYNNAME.othermsg'" template="outfmt")
'
# Should get 'ERR no such key'
redis_command "XINFO GROUPS outStream" > $RSYSLOG_OUT_LOG
startup
# Inject 2 messages
injectmsg 1 2
shutdown_when_empty
wait_shutdown
# Should get an empty string (no ERR)
redis_command "XINFO GROUPS outStream" >> $RSYSLOG_OUT_LOG
# Should get '2'
redis_command "XLEN outStream" >> $RSYSLOG_OUT_LOG
# Should get 2 entries
redis_command "XREAD COUNT 3 STREAMS outStream 0" >> $RSYSLOG_OUT_LOG
# Cannot check for full reply as it includes entries' unix timestamp
content_count_check "ERR no such key" 1 $RSYSLOG_OUT_LOG
content_count_check "outStream" 1 $RSYSLOG_OUT_LOG
content_count_check " msgnum:00000001:" 1 $RSYSLOG_OUT_LOG
content_count_check " msgnum:00000002:" 1 $RSYSLOG_OUT_LOG
# 2 for the name of the field where the message was inserted, 2 for the 'msgnum' part
content_count_check "msg" 4 $RSYSLOG_OUT_LOG
content_count_check "msgnum" 2 $RSYSLOG_OUT_LOG
stop_redis
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

7
tests/omhiredis-template-vg.sh Executable file
View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhiredis-template.sh

56
tests/omhiredis-template.sh Executable file
View File

@ -0,0 +1,56 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
# export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/omhiredis/.libs/omhiredis")
template(name="outfmt" type="string" string="%msg%")
template(name="redis_command" type="string" string="INCRBY counter 3")
local4.* {
action(type="omhiredis"
server="127.0.0.1"
serverport="'$REDIS_RANDOM_PORT'"
mode="template"
template="redis_command")
stop
}
action(type="omfile" file="'$RSYSLOG_DYNNAME.othermsg'" template="outfmt")
'
# Should get nothing
redis_command "GET counter" > $RSYSLOG_OUT_LOG
startup
# Inject 3 messages
injectmsg 1 3
shutdown_when_empty
wait_shutdown
# Should get '9'
redis_command "GET counter" >> $RSYSLOG_OUT_LOG
# The first get is before inserting, the second is after
export EXPECTED="/usr/bin/redis-cli
/usr/bin/redis-cli
9"
cmp_exact $RSYSLOG_OUT_LOG
stop_redis
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

7
tests/omhiredis-withpass-vg.sh Executable file
View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhiredis-withpass.sh

63
tests/omhiredis-withpass.sh Executable file
View File

@ -0,0 +1,63 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
# export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
REDIS_PASSWORD="mySuperSecretPasswd"
# Set a password on started Redis server
redis_command "CONFIG SET requirepass ${REDIS_PASSWORD}"
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/omhiredis/.libs/omhiredis")
template(name="outfmt" type="string" string="%msg%")
local4.* {
action(type="omhiredis"
server="127.0.0.1"
serverport="'$REDIS_RANDOM_PORT'"
serverpassword="'${REDIS_PASSWORD}'"
mode="set"
key="outKey"
template="outfmt")
stop
}
action(type="omfile" file="'$RSYSLOG_DYNNAME.othermsg'" template="outfmt")
'
# Client MUST authentiate here!
redis_command "AUTH ${REDIS_PASSWORD} \n GET outKey" > $RSYSLOG_OUT_LOG
startup
# Inject 1 message
injectmsg 1 1
shutdown_when_empty
wait_shutdown
# Should get ' msgnum:00000001:'
redis_command "AUTH ${REDIS_PASSWORD} \n GET outKey" >> $RSYSLOG_OUT_LOG
# the "OK" replies are for authentication of the redis cli client
export EXPECTED="/usr/bin/redis-cli
OK
/usr/bin/redis-cli
OK
msgnum:00000001:"
cmp_exact $RSYSLOG_OUT_LOG
stop_redis
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

View File

@ -0,0 +1,7 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
#export RS_REDIR=-d
export USE_VALGRIND="YES"
source ${srcdir:=.}/omhiredis-wrongpass.sh

49
tests/omhiredis-wrongpass.sh Executable file
View File

@ -0,0 +1,49 @@
#!/usr/bin/env bash
# added 2023-04-20 by Théo Bertin, released under ASL 2.0
## Uncomment for debugging
# export RS_REDIR=-d
. ${srcdir:=.}/diag.sh init
start_redis
REDIS_PASSWORD="mySuperSecretPasswd"
# Set a password on started Redis server
redis_command "CONFIG SET requirepass ${REDIS_PASSWORD}"
generate_conf
add_conf '
global(localhostname="server")
module(load="../contrib/omhiredis/.libs/omhiredis")
template(name="outfmt" type="string" string="%msg%")
local4.* {
action(type="omhiredis"
server="127.0.0.1"
serverport="'$REDIS_RANDOM_PORT'"
serverpassword="ThatsNotMyPassword"
mode="set"
key="outKey"
template="outfmt")
stop
}
action(type="omfile" file="'$RSYSLOG_OUT_LOG'" template="outfmt")
'
startup
# Inject 1 message
injectmsg 1 1
shutdown_when_empty
wait_shutdown
content_check "error while authenticating: WRONGPASS invalid username-password pair or user is disabled." $RSYSLOG_OUT_LOG
stop_redis
# Removes generated configuration file, log and pid files
cleanup_redis
exit_test

View File

@ -0,0 +1,26 @@
daemonize no
# PID file and port used
pidfile "<tmpdir>/redis.pid"
port <rndport>
bind 127.0.0.1
protected-mode no
# Close the connection after a client is idle for 300 seconds
timeout 300
tcp-backlog 1024
tcp-keepalive 15
# Logging
loglevel warning
logfile "<tmpdir>/redis.log"
databases 1
# Do not save anything to disk
dir "<tmpdir>/"
save ""
appendonly no
maxmemory 1gb
maxmemory-policy volatile-lru

View File

@ -1565,6 +1565,52 @@ initAll(int argc, char **argv)
resetErrMsgsFlag();
localRet = rsconf.Load(&ourConf, ConfFile);
#ifdef ENABLE_LIBCAPNG
/*
* Drop capabilities to the necessary set
*/
int capng_rc, capng_failed = 0;
capng_clear(CAPNG_SELECT_BOTH);
if ((capng_rc = capng_updatev(CAPNG_ADD, CAPNG_EFFECTIVE|CAPNG_PERMITTED,
CAP_BLOCK_SUSPEND,
CAP_CHOWN,
CAP_IPC_LOCK,
CAP_LEASE,
CAP_NET_ADMIN,
CAP_NET_BIND_SERVICE,
CAP_DAC_OVERRIDE,
CAP_SETGID,
CAP_SETUID,
CAP_SYS_ADMIN,
CAP_SYS_CHROOT,
CAP_SYS_RESOURCE,
CAP_SYSLOG,
-1
)) != 0) {
LogError(0, RS_RET_LIBCAPNG_ERR,
"could not update the internal posix capabilities settings "
"based on the options passed to it, capng_updatev=%d", capng_rc);
capng_failed = 1;
}
if ((capng_rc = capng_apply(CAPNG_SELECT_BOTH)) != 0) {
LogError(0, RS_RET_LIBCAPNG_ERR,
"could not transfer the specified internal posix capabilities "
"settings to the kernel, capng_apply=%d", capng_rc);
capng_failed = 1;
}
if (capng_failed) {
DBGPRINTF("Capabilities were not dropped successfully.\n");
if (loadConf->globals.bAbortOnFailedLibcapngSetup) {
ABORT_FINALIZE(RS_RET_LIBCAPNG_ERR);
}
} else {
DBGPRINTF("Capabilities were dropped successfully\n");
}
#endif
if(fp_rs_full_conf_output != NULL) {
if(fp_rs_full_conf_output != stdout) {
fclose(fp_rs_full_conf_output);
@ -2171,44 +2217,6 @@ main(int argc, char **argv)
dbgClassInit();
#ifdef ENABLE_LIBCAPNG
/*
* Drop capabilities to the necessary set
*/
int capng_rc;
capng_clear(CAPNG_SELECT_BOTH);
if ((capng_rc = capng_updatev(CAPNG_ADD, CAPNG_EFFECTIVE|CAPNG_PERMITTED,
CAP_BLOCK_SUSPEND,
CAP_CHOWN,
CAP_IPC_LOCK,
CAP_LEASE,
CAP_NET_ADMIN,
CAP_NET_BIND_SERVICE,
CAP_DAC_OVERRIDE,
CAP_SETGID,
CAP_SETUID,
CAP_SYS_ADMIN,
CAP_SYS_CHROOT,
CAP_SYS_RESOURCE,
CAP_SYSLOG,
-1
)) != 0) {
LogError(0, RS_RET_LIBCAPNG_ERR,
"could not update the internal posix capabilities settings "
"based on the options passed to it, capng_updatev=%d\n", capng_rc);
exit(-1);
}
if ((capng_rc = capng_apply(CAPNG_SELECT_BOTH)) != 0) {
LogError(0, RS_RET_LIBCAPNG_ERR,
"could not transfer the specified internal posix capabilities "
"settings to the kernel, capng_apply=%d\n", capng_rc);
exit(-1);
}
DBGPRINTF("Capabilities were dropped successfully\n");
#endif
initAll(argc, argv);
#ifdef HAVE_LIBSYSTEMD
sd_notify(0, "READY=1");