- begun some work on Msg Object serializiation

- created a kind of general base class
This commit is contained in:
Rainer Gerhards 2008-01-04 16:05:42 +00:00
parent b95b5ab284
commit faf8e5a384
9 changed files with 295 additions and 5 deletions

View File

@ -14,6 +14,7 @@ Version 3.10.0 (rgerhards), 2008-01-??
- all inputs are now implemented as loadable plugins
- enhanced threading model: each input module now runs on its own thread
- enhanced message queue which now supports different queueing methods
(among others, this can be used for performance fine-tuning)
- added a large number of new configuration directives for the new
input modules
- ability to bind UDP listeners to specific local interfaces/ports and

View File

@ -27,6 +27,8 @@ rsyslogd_SOURCES = \
sync.h \
net.c \
net.h \
obj.c \
obj.h \
msg.c \
msg.h \
expr.c \

66
msg.c
View File

@ -41,6 +41,8 @@
#include "template.h"
#include "msg.h"
DEFobjStaticHelpers
static syslogCODE rs_prioritynames[] =
{
{ "alert", LOG_ALERT },
@ -118,6 +120,7 @@ msg_t* MsgConstruct(void)
pM->iSeverity = -1;
pM->iFacility = -1;
getCurrTime(&(pM->tRcvdAt));
objConstructSetObjInfo(pM);
}
/* DEV debugging only! dbgprintf("MsgConstruct\t0x%x, ref 1\n", (int)pM);*/
@ -271,6 +274,61 @@ msg_t* MsgDup(msg_t* pOld)
#undef tmpCOPYCSTR
/* This method serializes a message object. That means the whole
* object is modified into text form. That text form is suitable for
* later reconstruction of the object by calling MsgDeSerialize().
* The most common use case for this method is the creation of an
* on-disk representation of the message object.
* We do not serialize the cache properties. We re-create them when needed.
* This saves us a lot of memory. Performance is no concern, as serializing
* is a so slow operation that recration of the caches does not count.
* rgerhards, 2008-01-03
*/
rsRetVal MsgSerialize(uchar **ppOutBuf, size_t *pLenBuf, void *pUsr)
{
DEFiRet;
msg_t* pThis = pUsr;
rsCStrObj *pCStr;
assert(ppOutBuf != NULL);
assert(pLenBuf != NULL);
assert(pThis != NULL);
if((pCStr = rsCStrConstruct()) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
CHKiRet(rsCStrAppendStr(pCStr, (uchar*) "$MSG v1\n"));
/*
if(rsCStrAppendChar(pStrB, (escapeMode == 0) ? '\'' : '\\') != RS_RET_OK)
pNew->iSyslogVers = pOld->iSyslogVers;
pNew->bParseHOSTNAME = pOld->bParseHOSTNAME;
pNew->iSeverity = pOld->iSeverity;
pNew->iFacility = pOld->iFacility;
pNew->bParseHOSTNAME = pOld->bParseHOSTNAME;
pNew->msgFlags = pOld->msgFlags;
pNew->iProtocolVersion = pOld->iProtocolVersion;
memcpy(&pNew->tRcvdAt, &pOld->tRcvdAt, sizeof(struct syslogTime));
memcpy(&pNew->tTIMESTAMP, &pOld->tTIMESTAMP, sizeof(struct syslogTime));
tmpCOPYSZ(RawMsg);
tmpCOPYSZ(MSG);
tmpCOPYSZ(UxTradMsg);
tmpCOPYSZ(TAG);
tmpCOPYSZ(HOSTNAME);
tmpCOPYSZ(RcvFrom);
tmpCOPYCSTR(ProgName);
tmpCOPYCSTR(StrucData);
tmpCOPYCSTR(APPNAME);
tmpCOPYCSTR(PROCID);
tmpCOPYCSTR(MSGID);
*/
finalize_it:
return iRet;
}
/* Increment reference count - see description of the "msg"
* structure for details. As a convenience to developers,
* this method returns the msg pointer that is passed to it.
@ -1869,6 +1927,14 @@ char *MsgGetProp(msg_t *pMsg, struct templateEntry *pTpe,
}
/* Initialize the message class. Must be called as the very first method
* before anything else is called inside this class.
* rgerhards, 2008-01-04
*/
BEGINObjClassInit(Msg)
OBJSetMethodHandler(objMethod_SERIALIZE, MsgSerialize);
ENDObjClassInit
/*
* vi:set ai:
*/

4
msg.h
View File

@ -25,6 +25,7 @@
#ifndef MSG_H_INCLUDED
#define MSG_H_INCLUDED 1
#include "obj.h"
#include "syslogd-types.h"
#include "template.h"
@ -44,6 +45,7 @@
* called each time a "copy" is stored somewhere.
*/
struct msg {
BEGINobjInstance; /* Data to implement generic object - MUST be the first data element! */
int iRefCount; /* reference counter (0 = unused) */
short iSyslogVers; /* version of syslog protocol
* 0 - RFC 3164
@ -104,8 +106,10 @@ typedef struct msg msg_t; /* new name */
/* function prototypes
*/
PROTOTYPEObjClassInit(Msg);
char* getProgramName(msg_t*);
msg_t* MsgConstruct(void);
rsRetVal MsgSerialize(uchar **ppOutBuf, size_t *pLenBuf, void *pUsr);
void MsgDestruct(msg_t * pM);
msg_t* MsgDup(msg_t* pOld);
msg_t *MsgAddRef(msg_t *pM);

97
obj.c Normal file
View File

@ -0,0 +1,97 @@
/* obj.c
*
* This file implements a generic object "class". All other classes can
* use the service of this base class here to include auto-destruction and
* other capabilities in a generic manner.
*
* File begun on 2008-01-04 by RGerhards
*
* Copyright 2008 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
* Rsyslog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Rsyslog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
*
* A copy of the GPL can be found in the file "COPYING" in this distribution.
*/
#include "config.h"
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include "rsyslog.h"
#include "obj.h"
/* static data */
/* methods */
/* This is a dummy method to be used when a standard method has not been
* implemented by an object. Having it allows us to simply call via the
* jump table without any NULL pointer checks - which gains quite
* some performance. -- rgerhards, 2008-01-04
*/
static rsRetVal objInfoNotImplementedDummy(void __attribute__((unused)) *pThis)
{
return RS_RET_NOT_IMPLEMENTED;
}
/* construct an object Info object. Each class shall do this on init. The
* resulting object shall be cached during the lifetime of the class and each
* object shall receive a reference. A constructor MUST be provided for all
* objects, thus it is in the parameter list.
* pszName must point to constant pool memory. It is never freed.
*/
rsRetVal objInfoConstruct(objInfo_t **ppThis, objID_t objID, uchar *pszName, rsRetVal (*pDestruct)(void *))
{
DEFiRet;
int i;
objInfo_t *pThis;
assert(ppThis != NULL);
assert(pDestruct != NULL);
if((pThis = calloc(1, sizeof(objInfo_t))) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
pThis->pszName = pszName;
pThis->objID = objID;
for(i = 0 ; i < OBJ_NUM_METHODS ; ++i) {
pThis->objMethods[i] = objInfoNotImplementedDummy;
}
*ppThis = pThis;
finalize_it:
return iRet;
}
/* set a method handler */
rsRetVal objInfoSetMethod(objInfo_t *pThis, objMethod_t objMethod, rsRetVal (*pHandler)(void*))
{
assert(pThis != NULL);
assert(objMethod > 0 && objMethod < OBJ_NUM_METHODS);
pThis->objMethods[objMethod] = pHandler;
return RS_RET_OK;
}
/*
* vi:set ai:
*/

82
obj.h Normal file
View File

@ -0,0 +1,82 @@
/* Definition of the generic obj class module.
*
* This module relies heavily on preprocessor macros in order to
* provide fast execution time AND ease of use.
*
* Copyright 2008 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of rsyslog.
*
* Rsyslog is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Rsyslog is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Rsyslog. If not, see <http://www.gnu.org/licenses/>.
*
* A copy of the GPL can be found in the file "COPYING" in this distribution.
*/
#ifndef OBJ_H_INCLUDED
#define OBJ_H_INCLUDED
typedef enum { /* IDs of known object "types/classes" */
objNull = 0, /* no valid object (we do not start at zero so we can detect calloc()) */
objMsg = 1
} objID_t;
typedef enum { /* IDs of base methods supported by all objects - used for jump table, so
* they must start at zero and be incremented. -- rgerahrds, 2008-01-04
*/
objMethod_DESTRUCT = 0,
objMethod_SERIALIZE = 1,
objMethod_DESERIALIZE = 2,
objMethod_DEBUGPRINT = 3
} objMethod_t;
#define OBJ_NUM_METHODS 4 /* must be updated to contain the max number of methods supported */
typedef struct objInfo_s {
objID_t objID;
uchar *pszName;
rsRetVal (*objMethods[OBJ_NUM_METHODS])(void *pThis);
} objInfo_t;
typedef struct obj { /* the dummy struct that each derived class can be casted to */
objInfo_t *pObjInfo;
} obj_t;
/* macros */
#define DEFobjStaticHelpers static objInfo_t *pObjInfoOBJ = NULL;
#define BEGINobjInstance objInfo_t *pObjInfo
/* must be called in Constructor: */
#define objConstructSetObjInfo(pThis) ((obj_t*) (pThis))->pObjInfo = pObjInfoOBJ;
#define objDestruct(pThis) ((objInfo_t*) (pThis)->objMethods[objMethod_DESTRUCT])
/* class initializer */
#define PROTOTYPEObjClassInit(objName) rsRetVal objName##ClassInit(void)
#define BEGINObjClassInit(objName) \
rsRetVal objName##ClassInit(void) \
{ \
DEFiRet; \
CHKiRet(objInfoConstruct(&pObjInfoOBJ, obj##objName, (uchar*) #objName, (rsRetVal (*)(void*))objName##Destruct));
#define ENDObjClassInit \
finalize_it: \
return iRet; \
}
#define OBJSetMethodHandler(methodID, pHdlr) \
CHKiRet(objInfoSetMethod(pObjInfoOBJ, methodID, (rsRetVal (*)(void*)) pHdlr))
/* prototypes */
rsRetVal objInfoConstruct(objInfo_t **ppThis, objID_t objID, uchar *pszName, rsRetVal (*pDestruct)(void *));
rsRetVal objInfoSetMethod(objInfo_t *pThis, objMethod_t objMethod, rsRetVal (*pHandler)(void*));
#endif /* #ifndef OBJ_H_INCLUDED */

10
queue.c
View File

@ -230,9 +230,12 @@ rsRetVal qAddDisk(queue_t *pThis, void* pUsr)
{
DEFiRet;
int i;
long lenBuf;
uchar *pBuf;
assert(pThis != NULL);
dbgprintf("writing to file %d\n", pThis->tVars.disk.fd);
CHKiRet(pThis->serializer(pBuf, &lenBuf, pUsr)); // TODO: hier weiter machen!
i = write(pThis->tVars.disk.fd, "entry\n", 6);
dbgprintf("write wrote %d bytes, errno: %d, err %s\n", i, errno, strerror(errno));
@ -354,7 +357,10 @@ queueWorker(void *arg)
}
/* Constructor for the queue object */
rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, rsRetVal (*pConsumer)(void*))
rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, rsRetVal (*pConsumer)(void*),
rsRetVal (*serializer)(uchar **ppOutBuf, size_t *lenBuf, void *pUsr),
rsRetVal (*deSerializer)(void *ppUsr, uchar *ppBuf, size_t lenBuf)
)
{
DEFiRet;
queue_t *pThis;
@ -378,6 +384,8 @@ rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize,
pThis->notEmpty = (pthread_cond_t *) malloc (sizeof (pthread_cond_t));
pthread_cond_init (pThis->notEmpty, NULL);
pThis->qType = qType;
pThis->serializer = serializer;
pThis->deSerializer = deSerializer;
/* set type-specific handlers */
switch(qType) {

13
queue.h
View File

@ -51,6 +51,12 @@ typedef struct queue_s {
rsRetVal (*qDestruct)(struct queue_s *pThis);
rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr);
rsRetVal (*qDel)(struct queue_s *pThis, void **ppUsr);
/* the following two are currently only required for disk queuing, but
* we keep them global because we otherwise needed to change the interface
* too much.
*/
rsRetVal (*serializer)(uchar **ppOutBuf, size_t *lenBuf, void *pUsr);
rsRetVal (*deSerializer)(void *ppUsr, uchar *ppBuf, size_t lenBuf);
/* end type-specific handler */
/* synchronization variables */
pthread_mutex_t *mut;
@ -66,8 +72,6 @@ typedef struct queue_s {
qLinkedList_t *pLast;
} linklist;
struct {
rsRetVal (*serializer)(uchar **ppOutBuf, size_t *lenBuf, void *pUsr);
rsRetVal (*deSerializer)(void *ppUsr, uchar *ppBuf, size_t lenBuf);
uchar *pszSpoolDir;
size_t lenSpoolDir;
uchar *pszFilePrefix;
@ -82,8 +86,11 @@ typedef struct queue_s {
/* prototypes */
rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, rsRetVal (*pConsumer)(void*));
rsRetVal queueDestruct(queue_t *pThis);
rsRetVal queueEnqObj(queue_t *pThis, void *pUsr);
rsRetVal queueConstruct(queue_t **ppThis, queueType_t qType, int iMaxQueueSize, rsRetVal (*pConsumer)(void*),
rsRetVal (*serializer)(uchar **ppOutBuf, size_t *lenBuf, void *pUsr),
rsRetVal (*deSerializer)(void *ppUsr, uchar *ppBuf, size_t lenBuf)
);
#endif /* #ifndef QUEUE_H_INCLUDED */

View File

@ -3353,7 +3353,7 @@ init(void)
}
/* create message queue */
CHKiRet_Hdlr(queueConstruct(&pMsgQueue, MainMsgQueType, iMainMsgQueueSize, msgConsumer)) {
CHKiRet_Hdlr(queueConstruct(&pMsgQueue, MainMsgQueType, iMainMsgQueueSize, msgConsumer, MsgSerialize, NULL)) {
/* no queue is fatal, we need to give up in that case... */
fprintf(stderr, "fatal error %d: could not create message queue - rsyslogd can not run!\n", iRet);
exit(1);
@ -4608,11 +4608,27 @@ static void mainThread()
}
/* Method to initialize all global classes.
* rgerhards, 2008-01-04
*/
static rsRetVal InitGlobalClasses(void)
{
DEFiRet;
CHKiRet(MsgClassInit());
finalize_it:
return iRet;
}
/* This is the main entry point into rsyslogd. Over time, we should try to
* modularize it a bit more...
*/
int main(int argc, char **argv)
{
DEFiRet;
register int i;
register char *p;
int num_fds;
@ -4627,6 +4643,8 @@ int main(int argc, char **argv)
* or put in conditional compilation. 2005-01-18 RGerhards */
#endif
CHKiRet(InitGlobalClasses());
ppid = getpid();
if(chdir ("/") != 0)
@ -4883,6 +4901,11 @@ int main(int argc, char **argv)
die(bFinished);
thrdExit();
finalize_it:
if(iRet != RS_RET_OK)
fprintf(stderr, "rsyslogd run failed with error %d.\n", iRet);
return 0;
}