add module mmsequence

This commit is contained in:
Pavel Levshin 2013-10-16 13:39:35 +02:00 committed by Rainer Gerhards
parent 6dea3e9104
commit 1064e566bc
6 changed files with 574 additions and 2 deletions

View File

@ -249,6 +249,10 @@ if ENABLE_MMCOUNT
SUBDIRS += plugins/mmcount
endif
if ENABLE_MMSEQUENCE
SUBDIRS += plugins/mmsequence
endif
if ENABLE_MMFIELDS
SUBDIRS += plugins/mmfields
endif

View File

@ -993,8 +993,8 @@ AM_CONDITIONAL(ENABLE_MMUTF8FIX, test x$enable_mmutf8fix = xyes)
AC_ARG_ENABLE(mmcount,
[AS_HELP_STRING([--enable-mmcount],[Enable message counting @<:@default=no@:>@])],
[case "${enableval}" in
yes) enable_xmpp="yes" ;;
no) enable_xmpp="no" ;;
yes) enable_mmcount="yes" ;;
no) enable_mmcount="no" ;;
*) AC_MSG_ERROR(bad value ${enableval} for --enable-mmcount) ;;
esac],
[enable_mmcount=no]
@ -1002,6 +1002,19 @@ AC_ARG_ENABLE(mmcount,
AM_CONDITIONAL(ENABLE_MMCOUNT, test x$enable_mmcount = xyes)
# mmsequence
AC_ARG_ENABLE(mmsequence,
[AS_HELP_STRING([--enable-mmsequence],[Enable sequence generator @<:@default=no@:>@])],
[case "${enableval}" in
yes) enable_mmsequence="yes" ;;
no) enable_mmsequence="no" ;;
*) AC_MSG_ERROR(bad value ${enableval} for --enable-mmsequence) ;;
esac],
[enable_mmsequence=no]
)
AM_CONDITIONAL(ENABLE_MMSEQUENCE, test x$enable_mmsequence = xyes)
# mmfields
AC_ARG_ENABLE(mmfields,
[AS_HELP_STRING([--enable-mmfields],[Enable building mmfields support @<:@default=no@:>@])],
@ -1564,6 +1577,7 @@ AC_CONFIG_FILES([Makefile \
plugins/mmanon/Makefile \
plugins/mmutf8fix/Makefile \
plugins/mmcount/Makefile \
plugins/mmsequence/Makefile \
plugins/mmfields/Makefile \
plugins/mmpstrucdata/Makefile \
plugins/mmrfc5424addhmac/Makefile \
@ -1633,6 +1647,7 @@ echo " mmsnmptrapd module will be compiled: $enable_mmsnmptrapd"
echo " mmutf8fix enabled: $enable_mmutf8fix"
echo " mmrfc5424addhmac enabled: $enable_mmrfc5424addhmac"
echo " mmpstrucdata enabled: $enable_mmpstrucdata"
echo " mmsequence enabled: $enable_mmsequence"
echo
echo "---{ strgen modules }---"
echo " sm_cust_bindcdr module will be compiled: $enable_sm_cust_bindcdr"

148
doc/mmsequence.html Normal file
View File

@ -0,0 +1,148 @@
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html><head>
<meta http-equiv="Content-Language" content="en">
<title>mmsequence</title></head>
<body>
<a href="rsyslog_conf_modules.html">back</a>
<h1>Number generator and counter module (mmsequence)</h1>
<p><b>Module Name:&nbsp;&nbsp;&nbsp; mmsequence</b></p>
<p><b>Author: </b>Pavel Levshin &lt;pavel@levshin.spb.ru&gt;</p>
<p><b>Status: </b>Non project-supported module - contact author
or rsyslog mailing list for questions</p>
<p><b>Available since</b>: 7.5.6</p>
<p><b>Description</b>:</p>
<p>This module generates numeric sequences of different kinds. It can be used
to count messages up to a limit and to number them. It can generate random
numbers in a given range.</p>
<p>This module is implemented via the output module interface, so it is
called just as an action. The number generated is stored in a variable.</p>
<p>&nbsp;</p>
<p><b>Action Parameters</b>:</p>
<ul>
<li><b>mode</b> "random" or "instance" or "key"
<p>Specifies mode of the action. In "random" mode, the module
generates uniformly distributed integer numbers in a range defined
by "from" and "to".</p>
<p>In "instance" mode, which is default, the action produces a counter
in range [from, to). This counter is specific to this action instance.</p>
<p>In "key" mode, the counter can be shared between multiple instances.
This counter is identified by a name, which is defined with "key"
parameter.</p>
</li>
<li><b>from</b> [non-negative integer], default "0"
<p>Starting value for counters and lower margin for random generator.</p>
</li>
<li><b>to</b> [positive integer], default "INT_MAX"
<p>Upper margin for all sequences. Note that this margin is not
inclusive. When next value for a counter is equal or greater than
this parameter, the counter resets to the starting value.</p>
</li>
<li><b>step</b> [non-negative integer], default "1"
<p>Increment for counters. If step is "0", it can be used to fetch
current value without modification. The latter not applies to
"random" mode. This is useful in "key" mode or to get constant
values in "instance" mode.</p>
</li>
<li><b>key</b> [word], default ""
<p>Name of the global counter which is used in this action.</p>
</li>
<li><b>var</b> [word], default "$!mmsequence"
<p>Name of the variable where the number will be stored.
Should start with "$".</p>
</li>
</ul>
<p><b>Sample</b>:</p>
<pre>
# load balance
Ruleset(
name="logd"
queue.workerthreads="5"
){
Action(
type="mmsequence"
mode="instance"
from="0"
to="2"
var="$.seq"
)
if $.seq == "0" then {
Action(
type="mmnormalize"
userawmsg="on"
rulebase="/etc/rsyslog.d/rules.rb"
)
} else {
Action(
type="mmnormalize"
userawmsg="on"
rulebase="/etc/rsyslog.d/rules.rb"
)
}
# output logic here
}
# generate random numbers
action(
type="mmsequence"
mode="random"
to="100"
var="$!rndz"
)
# count from 0 to 99
action(
type="mmsequence"
mode="instance"
to="100"
var="$!cnt1"
)
# the same as before but the counter is global
action(
type="mmsequence"
mode="key"
key="key1"
to="100"
var="$!cnt2"
)
# count specific messages but place the counter in every message
if $msg contains "txt" then
action(
type="mmsequence"
mode="key"
to="100"
var="$!cnt3"
)
else
action(
type="mmsequence"
mode="key"
to="100"
step="0"
var="$!cnt3"
key=""
)
</pre>
<p><b>Legacy Configuration Directives</b>:</p>
<p>Not supported.</p>
<p>[<a href="rsyslog_conf.html">rsyslog.conf overview</a>] [<a href="manual.html">manual
index</a>] [<a href="http://www.rsyslog.com/">rsyslog site</a>]</p>
<p><font size="2">This documentation is part of the
<a href="http://www.rsyslog.com/">rsyslog</a> project.<br>
Copyright &copy; 2008-2013 by <a href="http://www.gerhards.net/rainer">Rainer Gerhards</a> and
<a href="http://www.adiscon.com/">Adiscon</a>. Released under the GNU GPL
version 3 or higher.</font></p>
</body></html>

View File

@ -126,6 +126,7 @@ the output module interface.
<li><a href="mmutf8fix.html">mmutf8fix</a> - used to fix invalid UTF-8 character sequences
<li><a href="mmrfc5424addhmac.html">mmrfc5424addhmac</a> - custom module for adding HMACs to
rfc5424-formatted messages if not already present
<li><a href="mmsequence.html">mmsequence</a> - sequence generator and counter plugin
</ul>
<a name="lm"></a><h2>String Generator Modules</h2>

View File

@ -0,0 +1,8 @@
pkglib_LTLIBRARIES = mmsequence.la
mmsequence_la_SOURCES = mmsequence.c
mmsequence_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS)
mmsequence_la_LDFLAGS = -module -avoid-version
mmsequence_la_LIBADD =
EXTRA_DIST =

View File

@ -0,0 +1,396 @@
/* mmsequence.c
* Generate a number based on some sequence.
*
* Copyright 2013 pavel@levshin.spb.ru.
*
* Based on: mmcount.c
* Copyright 2013 Red Hat Inc.
*
* This file is part of rsyslog.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* -or-
* see COPYING.ASL20 in the source distribution
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "config.h"
#include "rsyslog.h"
#include <stdio.h>
#include <stdarg.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <signal.h>
#include <errno.h>
#include <unistd.h>
#include <stdint.h>
#include <time.h>
#include <limits.h>
#include <json/json.h>
#include <pthread.h>
#include "conf.h"
#include "syslogd-types.h"
#include "srUtils.h"
#include "template.h"
#include "module-template.h"
#include "errmsg.h"
#include "hashtable.h"
#define JSON_VAR_NAME "$!mmsequence"
enum mmSequenceModes {
mmSequenceRandom,
mmSequencePerInstance,
mmSequencePerKey
};
MODULE_TYPE_OUTPUT
MODULE_TYPE_NOKEEP
MODULE_CNFNAME("mmsequence")
DEFobjCurrIf(errmsg);
DEF_OMOD_STATIC_DATA
/* config variables */
typedef struct _instanceData {
enum mmSequenceModes mode;
int valueFrom;
int valueTo;
int step;
unsigned int seed;
int value;
char *pszKey;
char *pszVar;
} instanceData;
struct modConfData_s {
rsconf_t *pConf; /* our overall config object */
};
static modConfData_t *loadModConf = NULL;/* modConf ptr to use for the current load process */
static modConfData_t *runModConf = NULL;/* modConf ptr to use for the current exec process */
/* tables for interfacing with the v6 config system */
/* action (instance) parameters */
static struct cnfparamdescr actpdescr[] = {
{ "mode", eCmdHdlrGetWord, 0 },
{ "from", eCmdHdlrNonNegInt, 0 },
{ "to", eCmdHdlrPositiveInt, 0 },
{ "step", eCmdHdlrNonNegInt, 0 },
{ "key", eCmdHdlrGetWord, 0 },
{ "var", eCmdHdlrGetWord, 0 },
};
static struct cnfparamblk actpblk =
{ CNFPARAMBLK_VERSION,
sizeof(actpdescr)/sizeof(struct cnfparamdescr),
actpdescr
};
/* table for key-counter pairs */
static struct hashtable *ght;
static pthread_mutex_t ght_mutex = PTHREAD_MUTEX_INITIALIZER;
BEGINbeginCnfLoad
CODESTARTbeginCnfLoad
loadModConf = pModConf;
pModConf->pConf = pConf;
ENDbeginCnfLoad
BEGINendCnfLoad
CODESTARTendCnfLoad
ENDendCnfLoad
BEGINcheckCnf
CODESTARTcheckCnf
ENDcheckCnf
BEGINactivateCnf
CODESTARTactivateCnf
runModConf = pModConf;
ENDactivateCnf
BEGINfreeCnf
CODESTARTfreeCnf
ENDfreeCnf
BEGINcreateInstance
CODESTARTcreateInstance
ENDcreateInstance
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
ENDisCompatibleWithFeature
BEGINfreeInstance
CODESTARTfreeInstance
ENDfreeInstance
static inline void
setInstParamDefaults(instanceData *pData)
{
pData->mode = mmSequencePerInstance;
pData->valueFrom = 0;
pData->valueTo = INT_MAX;
pData->step = 1;
pData->pszKey = "";
pData->pszVar = JSON_VAR_NAME;
}
BEGINnewActInst
struct cnfparamvals *pvals;
int i;
char *cstr;
CODESTARTnewActInst
DBGPRINTF("newActInst (mmsequence)\n");
if((pvals = nvlstGetParams(lst, &actpblk, NULL)) == NULL) {
ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
}
CODE_STD_STRING_REQUESTnewActInst(1)
CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
CHKiRet(createInstance(&pData));
setInstParamDefaults(pData);
for(i = 0 ; i < actpblk.nParams ; ++i) {
if(!pvals[i].bUsed)
continue;
if(!strcmp(actpblk.descr[i].name, "mode")) {
if(!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"random",
sizeof("random")-1)) {
pData->mode = mmSequenceRandom;
} else if (!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"instance",
sizeof("instance")-1)) {
pData->mode = mmSequencePerInstance;
} else if (!es_strbufcmp(pvals[i].val.d.estr, (uchar*)"key",
sizeof("key")-1)) {
pData->mode = mmSequencePerKey;
} else {
cstr = es_str2cstr(pvals[i].val.d.estr, NULL);
errmsg.LogError(0, RS_RET_INVLD_MODE,
"mmsequence: invalid mode '%s' - ignored",
cstr);
free(cstr);
}
continue;
}
if(!strcmp(actpblk.descr[i].name, "from")) {
pData->valueFrom = pvals[i].val.d.n;
continue;
}
if(!strcmp(actpblk.descr[i].name, "to")) {
pData->valueTo = pvals[i].val.d.n;
continue;
}
if(!strcmp(actpblk.descr[i].name, "step")) {
pData->step = pvals[i].val.d.n;
continue;
}
if(!strcmp(actpblk.descr[i].name, "key")) {
pData->pszKey = es_str2cstr(pvals[i].val.d.estr, NULL);
continue;
}
if(!strcmp(actpblk.descr[i].name, "var")) {
cstr = es_str2cstr(pvals[i].val.d.estr, NULL);
if (strlen(cstr) < 3) {
errmsg.LogError(0, RS_RET_VALUE_NOT_SUPPORTED,
"mmsequence: valid variable name should be at least "
"3 symbols long, got %s", cstr);
free(cstr);
} else if (cstr[0] != '$') {
errmsg.LogError(0, RS_RET_VALUE_NOT_SUPPORTED,
"mmsequence: valid variable name should start with $,"
"got %s", cstr);
free(cstr);
} else {
pData->pszVar = cstr;
}
continue;
}
dbgprintf("mmsequence: program error, non-handled "
"param '%s'\n", actpblk.descr[i].name);
}
switch(pData->mode) {
case mmSequenceRandom:
pData->seed = (unsigned int)(intptr_t)pData ^ (unsigned int)time(NULL);
break;
case mmSequencePerInstance:
pData->value = pData->valueTo;
break;
case mmSequencePerKey:
if (pthread_mutex_lock(&ght_mutex)) {
DBGPRINTF("mmsequence: mutex lock has failed!\n");
ABORT_FINALIZE(RS_RET_ERR);
}
if (ght == NULL) {
if(NULL == (ght = create_hashtable(100, hash_from_string, key_equals_string, NULL))) {
pthread_mutex_unlock(&ght_mutex);
DBGPRINTF("mmsequence: error creating hash table!\n");
ABORT_FINALIZE(RS_RET_ERR);
}
}
pthread_mutex_unlock(&ght_mutex);
break;
default:
errmsg.LogError(0, RS_RET_INVLD_MODE,
"mmsequence: this mode is not currently implemented");
}
CODE_STD_FINALIZERnewActInst
cnfparamvalsDestruct(pvals, &actpblk);
ENDnewActInst
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
ENDdbgPrintInstInfo
BEGINtryResume
CODESTARTtryResume
ENDtryResume
static int *
getCounter(struct hashtable *ht, char *str, int initial) {
int *pCounter;
char *pStr;
pCounter = hashtable_search(ht, str);
if(pCounter) {
return pCounter;
}
/* counter is not found for the str, so add new entry and
return the counter */
if(NULL == (pStr = strdup(str))) {
DBGPRINTF("mmsequence: memory allocation for key failed\n");
return NULL;
}
if(NULL == (pCounter = (int*)malloc(sizeof(*pCounter)))) {
DBGPRINTF("mmsequence: memory allocation for value failed\n");
free(pStr);
return NULL;
}
*pCounter = initial;
if(!hashtable_insert(ht, pStr, pCounter)) {
DBGPRINTF("mmsequence: inserting element into hashtable failed\n");
free(pStr);
free(pCounter);
return NULL;
}
return pCounter;
}
BEGINdoAction
msg_t *pMsg;
struct json_object *json;
int val = 0;
int *pCounter;
CODESTARTdoAction
pMsg = (msg_t*) ppString[0];
switch(pData->mode) {
case mmSequenceRandom:
val = pData->valueFrom + (rand_r(&pData->seed) %
(pData->valueTo - pData->valueFrom));
break;
case mmSequencePerInstance:
if (pData->value >= pData->valueTo - pData->step) {
pData->value = pData->valueFrom;
} else {
pData->value += pData->step;
}
val = pData->value;
break;
case mmSequencePerKey:
if (!pthread_mutex_lock(&ght_mutex)) {
pCounter = getCounter(ght, pData->pszKey, pData->valueTo);
if(pCounter) {
if (*pCounter >= pData->valueTo - pData->step
|| *pCounter < pData->valueFrom ) {
*pCounter = pData->valueFrom;
} else {
*pCounter += pData->step;
}
val = *pCounter;
} else {
errmsg.LogError(0, RS_RET_NOT_FOUND,
"mmsequence: unable to fetch the counter from hash");
}
pthread_mutex_unlock(&ght_mutex);
} else {
errmsg.LogError(0, RS_RET_ERR,
"mmsequence: mutex lock has failed!");
}
break;
default:
errmsg.LogError(0, RS_RET_NOT_IMPLEMENTED,
"mmsequence: this mode is not currently implemented");
}
/* finalize_it: */
json = json_object_new_int(val);
if (json == NULL) {
errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED,
"mmsequence: unable to create JSON");
} else if (RS_RET_OK != msgAddJSON(pMsg, (uchar *)pData->pszVar + 1, json)) {
errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED,
"mmsequence: unable to pass out the value");
json_object_put(json);
}
ENDdoAction
BEGINparseSelectorAct
CODESTARTparseSelectorAct
CODE_STD_STRING_REQUESTparseSelectorAct(1)
if(strncmp((char*) p, ":mmsequence:", sizeof(":mmsequence:") - 1)) {
errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
"mmsequence supports only v6+ config format, use: "
"action(type=\"mmsequence\" ...)");
}
ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
BEGINmodExit
CODESTARTmodExit
objRelease(errmsg, CORE_COMPONENT);
ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_QUERIES
ENDqueryEtryPt
BEGINmodInit()
CODESTARTmodInit
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
CODEmodInit_QueryRegCFSLineHdlr
DBGPRINTF("mmsequence: module compiled with rsyslog version %s.\n", VERSION);
CHKiRet(objUse(errmsg, CORE_COMPONENT));
ENDmodInit