input stmt: add core engine plumbing

This commit is contained in:
Rainer Gerhards 2012-09-26 12:25:10 +02:00
parent b715e86769
commit 7c2183ee32
5 changed files with 96 additions and 1 deletions

View File

@ -353,6 +353,24 @@ finalize_it:\
}
/* newInpInst()
* This is basically the equivalent to newActInst() for creating input
* module (listener) instances.
*/
#define BEGINnewInpInst \
static rsRetVal newInpInst(struct nvlst *lst)\
{\
DEFiRet;
#define CODESTARTnewInpInst \
#define CODE_STD_FINALIZERnewInpInst
#define ENDnewInpInst \
RETiRet;\
}
/* tryResume()
* This entry point is called to check if a module can resume operations. This
* happens when a module requested that it be suspended. In suspended state,
@ -521,6 +539,16 @@ static rsRetVal queryEtryPt(uchar *name, rsRetVal (**pEtryPoint)())\
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
/* the following block is to be added for input modules that support the v2
* config system. The config name is also provided.
*/
#define CODEqueryEtryPt_STD_CONF2_IMOD_QUERIES \
else if(!strcmp((char*) name, "newInpInst")) {\
*pEtryPoint = newInpInst;\
} \
CODEqueryEtryPt_STD_CONF2_CNFNAME_QUERIES
/* the following block is to be added for modules that require
* pre priv drop activation support.
*/

View File

@ -607,6 +607,12 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_
CHKiRet((*pNew->modQueryEtryPt)((uchar*)"willRun", &pNew->mod.im.willRun));
CHKiRet((*pNew->modQueryEtryPt)((uchar*)"afterRun", &pNew->mod.im.afterRun));
pNew->mod.im.bCanRun = 0;
localRet = (*pNew->modQueryEtryPt)((uchar*)"newInpInst", &pNew->mod.im.newInpInst);
if(localRet == RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) {
pNew->mod.om.newActInst = NULL;
} else if(localRet != RS_RET_OK) {
ABORT_FINALIZE(localRet);
}
break;
case eMOD_OUT:
CHKiRet((*pNew->modQueryEtryPt)((uchar*)"freeInstance", &pNew->freeInstance));
@ -625,7 +631,8 @@ doModInit(rsRetVal (*modInit)(int, int*, rsRetVal(**)(), rsRetVal(*)(), modInfo_
else if(localRet != RS_RET_OK)
ABORT_FINALIZE(localRet);
localRet = (*pNew->modQueryEtryPt)((uchar*)"endTransaction", &pNew->mod.om.endTransaction);
localRet = (*pNew->modQueryEtryPt)((uchar*)"endTransaction",
&pNew->mod.om.endTransaction);
if(localRet == RS_RET_MODULE_ENTRY_POINT_NOT_FOUND) {
pNew->mod.om.endTransaction = dummyEndTransaction;
} else if(localRet != RS_RET_OK) {

View File

@ -131,6 +131,7 @@ struct modInfo_s {
/* TODO: remove? */rsRetVal (*willRun)(void); /* check if the current config will be able to run*/
rsRetVal (*runInput)(thrdInfo_t*); /* function to gather input and submit to queue */
rsRetVal (*afterRun)(thrdInfo_t*); /* function to gather input and submit to queue */
rsRetVal (*newInpInst)(struct nvlst *lst);
int bCanRun; /* cached value of whether willRun() succeeded */
} im;
struct {/* data for output modules */

View File

@ -99,6 +99,18 @@ static uchar template_SysklogdFileFormat[] = "\"%TIMESTAMP% %HOSTNAME% %syslogta
static uchar template_StdJSONFmt[] = "\"{\\\"message\\\":\\\"%msg:::json%\\\",\\\"fromhost\\\":\\\"%HOSTNAME:::json%\\\",\\\"facility\\\":\\\"%syslogfacility-text%\\\",\\\"priority\\\":\\\"%syslogpriority-text%\\\",\\\"timereported\\\":\\\"%timereported:::date-rfc3339%\\\",\\\"timegenerated\\\":\\\"%timegenerated:::date-rfc3339%\\\"}\"";
/* end templates */
/* tables for interfacing with the v6 config system (as far as we need to) */
static struct cnfparamdescr inppdescr[] = {
{ "name", eCmdHdlrGetWord, 0 },
{ "type", eCmdHdlrString, CNFPARAM_REQUIRED }
};
static struct cnfparamblk inppblk =
{ CNFPARAMBLK_VERSION,
sizeof(inppdescr)/sizeof(struct cnfparamdescr),
inppdescr
};
/* forward-definitions */
void cnfDoCfsysline(char *ln);
/* Standard-Constructor
@ -373,6 +385,49 @@ finalize_it:
return estr;
}
/* Process input() objects */
rsRetVal
inputProcessCnf(struct cnfobj *o)
{
struct cnfparamvals *pvals;
modInfo_t *pMod;
uchar *cnfModName = NULL;
void *pModData;
action_t *pAction;
int typeIdx;
DEFiRet;
pvals = nvlstGetParams(o->nvlst, &inppblk, NULL);
if(pvals == NULL) {
ABORT_FINALIZE(RS_RET_ERR);
}
DBGPRINTF("input param blk after inputProcessCnf:\n");
cnfparamsPrint(&inppblk, pvals);
typeIdx = cnfparamGetIdx(&inppblk, "type");
if(pvals[typeIdx].bUsed == 0) {
errmsg.LogError(0, RS_RET_CONF_RQRD_PARAM_MISSING, "input type missing");
ABORT_FINALIZE(RS_RET_CONF_RQRD_PARAM_MISSING); // TODO: move this into rainerscript handlers
}
cnfModName = (uchar*)es_str2cstr(pvals[typeIdx].val.d.estr, NULL);
if((pMod = module.FindWithCnfName(loadConf, cnfModName, eMOD_IN)) == NULL) {
errmsg.LogError(0, RS_RET_MOD_UNKNOWN, "input module name '%s' is unknown", cnfModName);
ABORT_FINALIZE(RS_RET_MOD_UNKNOWN);
}
if(pMod->mod.im.newInpInst == NULL) {
errmsg.LogError(0, RS_RET_MOD_NO_INPUT_STMT,
"input module '%s' does not support input() statement", cnfModName);
ABORT_FINALIZE(RS_RET_MOD_NO_INPUT_STMT);
}
dbgprintf("DDDD: ready to roll...\n");
CHKiRet(pMod->mod.im.newInpInst(o->nvlst));
dbgprintf("DDDD: done calling module entry point\n");
finalize_it:
free(cnfModName);
cnfparamvalsDestruct(pvals, &inppblk);
RETiRet;
}
/*------------------------------ interface to flex/bison parser ------------------------------*/
extern int yylineno;
@ -416,6 +471,9 @@ void cnfDoObj(struct cnfobj *o)
case CNFOBJ_ACTION:
actionProcessCnf(o);
break;
case CNFOBJ_INPUT:
inputProcessCnf(o);
break;
case CNFOBJ_TPL:
tplProcessCnf(o);
break;

View File

@ -381,6 +381,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_MODULE_ALREADY_IN_CONF = -2221, /**< module already in current configuration */
RS_RET_PARAM_NOT_PERMITTED = -2222, /**< legacy parameter no longer permitted (usally already set by v2) */
RS_RET_NO_JSON_PASSING = -2223, /**< rsyslog core does not support JSON-passing plugin API */
RS_RET_MOD_NO_INPUT_STMT = -2224, /**< (input) module does not support input() statement */
/* RainerScript error messages (range 1000.. 1999) */
RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */