mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-20 13:10:43 +01:00
partial ability to read a disk queue back in (not completed, but would like
to save source for the weekend)
This commit is contained in:
parent
a80f7776c5
commit
366060a51d
2
msg.c
2
msg.c
@ -394,7 +394,9 @@ static rsRetVal MsgSerialize(msg_t *pThis, strm_t *pStrm)
|
||||
assert(pThis != NULL);
|
||||
assert(pStrm != NULL);
|
||||
|
||||
dbgprintf("MsgSerialize\n");
|
||||
CHKiRet(objBeginSerialize(pStrm, (obj_t*) pThis));
|
||||
dbgprintf("post MsgSerialize\n");
|
||||
objSerializeSCALAR(pStrm, iProtocolVersion, SHORT);
|
||||
objSerializeSCALAR(pStrm, iSeverity, SHORT);
|
||||
objSerializeSCALAR(pStrm, iFacility, SHORT);
|
||||
|
||||
54
obj.c
54
obj.c
@ -89,6 +89,7 @@ rsRetVal objInfoConstruct(objInfo_t **ppThis, objID_t objID, uchar *pszName, int
|
||||
|
||||
pThis->pszName = pszName;
|
||||
pThis->iObjVers = iObjVers;
|
||||
fprintf(stderr, "objid %d set for %s\n", objID, pszName);
|
||||
pThis->objID = objID;
|
||||
|
||||
pThis->objMethods[0] = pConstruct;
|
||||
@ -109,7 +110,6 @@ rsRetVal objInfoSetMethod(objInfo_t *pThis, objMethod_t objMethod, rsRetVal (*pH
|
||||
{
|
||||
assert(pThis != NULL);
|
||||
assert(objMethod > 0 && objMethod < OBJ_NUM_METHODS);
|
||||
|
||||
pThis->objMethods[objMethod] = pHandler;
|
||||
|
||||
return RS_RET_OK;
|
||||
@ -164,6 +164,7 @@ rsRetVal objBeginSerialize(strm_t *pStrm, obj_t *pObj)
|
||||
{
|
||||
DEFiRet;
|
||||
|
||||
dbgprintf("objBeginSerialize obj type: %x\n", objGetObjID(pStrm));
|
||||
ISOBJ_TYPE_assert(pStrm, strm);
|
||||
ISOBJ_assert(pObj);
|
||||
|
||||
@ -212,6 +213,7 @@ rsRetVal objSerializeProp(strm_t *pStrm, uchar *pszPropName, propertyType_t prop
|
||||
assert(pszPropName != NULL);
|
||||
|
||||
/*dbgprintf("objSerializeProp: strm %p, propName '%s', type %d, pUsr %p\n", pStrm, pszPropName, propType, pUsr);*/
|
||||
dbgprintf("objSerializeProp: strm %p, propName '%s', type %d, pUsr %p\n", pStrm, pszPropName, propType, pUsr);
|
||||
/* if we have no user pointer, there is no need to write this property.
|
||||
* TODO: think if that's the righ point of view
|
||||
* rgerhards, 2008-01-06
|
||||
@ -586,13 +588,14 @@ finalize_it:
|
||||
* of the trailer. Header must already have been processed.
|
||||
* rgerhards, 2008-01-11
|
||||
*/
|
||||
rsRetVal objDeserializeProperties(obj_t *pObj, objID_t oID, strm_t *pStrm)
|
||||
static rsRetVal objDeserializeProperties(obj_t *pObj, objID_t oID, strm_t *pStrm)
|
||||
{
|
||||
DEFiRet;
|
||||
property_t propBuf;
|
||||
|
||||
ISOBJ_assert(pObj);
|
||||
ISOBJ_TYPE_assert(pStrm, strm);
|
||||
assert(oID > 0 && oID < OBJ_NUM_IDS);
|
||||
|
||||
iRet = objDeserializeProperty(&propBuf, pStrm);
|
||||
while(iRet == RS_RET_OK) {
|
||||
@ -627,7 +630,7 @@ rsRetVal objDeserialize(void *ppObj, objID_t objTypeExpected, strm_t *pStrm)
|
||||
|
||||
assert(ppObj != NULL);
|
||||
assert(objTypeExpected > 0 && objTypeExpected < OBJ_NUM_IDS);
|
||||
assert(pStrm != NULL);
|
||||
ISOBJ_TYPE_assert(pStrm, strm);
|
||||
|
||||
/* we de-serialize the header. if all goes well, we are happy. However, if
|
||||
* we experience a problem, we try to recover. We do this by skipping to
|
||||
@ -662,6 +665,50 @@ finalize_it:
|
||||
}
|
||||
|
||||
|
||||
/* De-Serialize an object, but treat it as property bag.
|
||||
* rgerhards, 2008-01-11
|
||||
*/
|
||||
rsRetVal objDeserializeObjAsPropBag(obj_t *pObj, strm_t *pStrm)
|
||||
{
|
||||
DEFiRet;
|
||||
rsRetVal iRetLocal;
|
||||
objID_t oID = 0; /* this assignment is just to supress a compiler warning - this saddens me */
|
||||
int oVers = 0; /* after all, it is totally useless but takes up some execution time... */
|
||||
|
||||
dbgprintf("objDese...AsPropBag 0\n");
|
||||
ISOBJ_assert(pObj);
|
||||
dbgprintf("objDese...AsPropBag 0a\n");
|
||||
ISOBJ_TYPE_assert(pStrm, strm);
|
||||
dbgprintf("objDese...AsPropBag 1\n");
|
||||
|
||||
/* we de-serialize the header. if all goes well, we are happy. However, if
|
||||
* we experience a problem, we try to recover. We do this by skipping to
|
||||
* the next object header. This is defined via the line-start cookies. In
|
||||
* worst case, we exhaust the queue, but then we receive EOF return state
|
||||
* from objDeserializeTryRecover(), what will cause us to ultimately give up.
|
||||
* rgerhards, 2008-07-08
|
||||
*/
|
||||
do {
|
||||
iRetLocal = objDeserializeHeader((uchar*) "Obj", &oID, &oVers, pStrm);
|
||||
if(iRetLocal != RS_RET_OK) {
|
||||
dbgprintf("objDeserializeObjAsPropBag error %d during header - trying to recover\n", iRetLocal);
|
||||
CHKiRet(objDeserializeTryRecover(pStrm));
|
||||
}
|
||||
} while(iRetLocal != RS_RET_OK);
|
||||
|
||||
dbgprintf("objDese...AsPropBag 2\n");
|
||||
if(oID != objGetObjID(pObj))
|
||||
ABORT_FINALIZE(RS_RET_INVALID_OID);
|
||||
|
||||
/* we got the object, now we need to fill the properties */
|
||||
CHKiRet(objDeserializeProperties(pObj, oID, pStrm));
|
||||
|
||||
finalize_it:
|
||||
return iRet;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/* De-Serialize an object property bag. As a property bag contains only partial properties,
|
||||
* it is not instanciable. Thus, the caller must provide a pointer of an already-instanciated
|
||||
* object of the correct type.
|
||||
@ -719,6 +766,7 @@ rsRetVal objRegisterObj(objID_t oID, objInfo_t *pInfo)
|
||||
DEFiRet;
|
||||
|
||||
assert(pInfo != NULL);
|
||||
assert(arrObjInfo[oID] == NULL);
|
||||
if(oID < 1 || oID > OBJ_NUM_IDS)
|
||||
ABORT_FINALIZE(RS_RET_INVALID_OID);
|
||||
|
||||
|
||||
1
obj.h
1
obj.h
@ -72,6 +72,7 @@
|
||||
/* the next macro MUST be called in Constructors: */
|
||||
#ifndef NDEBUG /* this means if debug... */
|
||||
# define objConstructSetObjInfo(pThis) \
|
||||
assert(pThis->pObjInfo == NULL); \
|
||||
((obj_t*) (pThis))->pObjInfo = pObjInfoOBJ; \
|
||||
((obj_t*) (pThis))->iObjCooCKiE = 0xBADEFEE
|
||||
#else
|
||||
|
||||
71
queue.c
71
queue.c
@ -479,6 +479,68 @@ queueWorker(void *arg)
|
||||
pthread_exit(0);
|
||||
}
|
||||
|
||||
|
||||
/* This method checks if there is any persistent information on the
|
||||
* queue and, if so, tries to load it. This method can only legally be
|
||||
* called from the destructor (I moved it out from there to keep the
|
||||
* Constructor code somewhat smaller). -- rgerhards, 2008-01-11
|
||||
*/
|
||||
static rsRetVal
|
||||
queueTryLoadPersistedInfo(queue_t *pThis)
|
||||
{
|
||||
DEFiRet;
|
||||
strm_t *psQIF = NULL;
|
||||
uchar pszQIFNam[MAXFNAME];
|
||||
size_t lenQIFNam;
|
||||
struct stat stat_buf;
|
||||
|
||||
ISOBJ_TYPE_assert(pThis, queue);
|
||||
|
||||
/* Construct file name */
|
||||
lenQIFNam = snprintf((char*)pszQIFNam, sizeof(pszQIFNam) / sizeof(uchar), "%s/%s.qi",
|
||||
(char*) glblGetWorkDir(), (char*)pThis->pszFilePrefix);
|
||||
|
||||
/* check if the file exists */
|
||||
dbgprintf("stat '%s'\n", pszQIFNam);
|
||||
if(stat((char*) pszQIFNam, &stat_buf) == -1) {
|
||||
if(errno == ENOENT) {
|
||||
dbgprintf("Queue 0x%lx: clean startup, no .qi file found\n", queueGetID(pThis));
|
||||
ABORT_FINALIZE(RS_RET_OK);
|
||||
} else {
|
||||
dbgprintf("Queue 0x%lx: error %d trying to access .qi file\n", queueGetID(pThis), errno);
|
||||
ABORT_FINALIZE(RS_RET_IO_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
/* If we reach this point, we have a .qi file */
|
||||
|
||||
CHKiRet(strmConstruct(&psQIF));
|
||||
CHKiRet(strmSetDir(psQIF, glblGetWorkDir(), strlen((char*)glblGetWorkDir())));
|
||||
CHKiRet(strmSettOperationsMode(psQIF, STREAMMODE_READ));
|
||||
CHKiRet(strmSetsType(psQIF, STREAMTYPE_FILE_SINGLE));
|
||||
CHKiRet(strmSetFName(psQIF, pszQIFNam, lenQIFNam));
|
||||
CHKiRet(strmConstructFinalize(psQIF));
|
||||
|
||||
/* first, we try to read the property bag for ourselfs */
|
||||
CHKiRet(objDeserializePropBag((obj_t*) pThis, psQIF));
|
||||
|
||||
/* and now the stream objects (some order as when persisted!) */
|
||||
CHKiRet(objDeserializeObjAsPropBag(pThis->tVars.disk.pWrite, psQIF));
|
||||
CHKiRet(objDeserializeObjAsPropBag(pThis->tVars.disk.pRead, psQIF));
|
||||
|
||||
finalize_it:
|
||||
if(psQIF != NULL)
|
||||
strmDestruct(psQIF);
|
||||
|
||||
if(iRet != RS_RET_OK) {
|
||||
dbgprintf("Queue 0x%lx: error %d reading .qi file - can not start queue\n",
|
||||
queueGetID(pThis), iRet);
|
||||
}
|
||||
|
||||
return iRet;
|
||||
}
|
||||
|
||||
|
||||
/* Constructor for the queue object
|
||||
* This constructs the data structure, but does not yet start the queue. That
|
||||
* is done by queueStart(). The reason is that we want to give the caller a chance
|
||||
@ -561,7 +623,7 @@ finalize_it:
|
||||
/* start up the queue - it must have been constructed and parameters defined
|
||||
* before.
|
||||
*/
|
||||
rsRetVal queueStart(queue_t *pThis)
|
||||
rsRetVal queueStart(queue_t *pThis) /* this is the ConstructionFinalizer */
|
||||
{
|
||||
DEFiRet;
|
||||
int iState;
|
||||
@ -569,6 +631,9 @@ rsRetVal queueStart(queue_t *pThis)
|
||||
|
||||
assert(pThis != NULL);
|
||||
|
||||
/* and now check if there is some persistent information that needs to be read in */
|
||||
CHKiRet(queueTryLoadPersistedInfo(pThis));
|
||||
|
||||
dbgprintf("Queue 0x%lx: type %d, maxFileSz %ld starting\n", (unsigned long) pThis, pThis->qType,
|
||||
pThis->iMaxFileSize);
|
||||
|
||||
@ -650,10 +715,12 @@ static rsRetVal queuePersist(queue_t *pThis)
|
||||
objSerializeSCALAR_VAR(psQIF, qType, INT, i);
|
||||
objSerializeSCALAR(psQIF, iQueueSize, INT);
|
||||
CHKiRet(objEndSerialize(psQIF));
|
||||
dbgprintf("queue serial 1\n");
|
||||
|
||||
/* this is disk specific and must be moved to a function */
|
||||
CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF));
|
||||
CHKiRet(strmSerialize(pThis->tVars.disk.pRead, psQIF));
|
||||
dbgprintf("queue serial 2\n");
|
||||
|
||||
/* persist queue object itself */
|
||||
|
||||
@ -852,7 +919,7 @@ BEGINObjClassInit(queue, 1)
|
||||
//OBJSetMethodHandler(objMethod_SERIALIZE, strmSerialize);
|
||||
OBJSetMethodHandler(objMethod_SETPROPERTY, queueSetProperty);
|
||||
//OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, strmConstructFinalize);
|
||||
ENDObjClassInit(strm)
|
||||
ENDObjClassInit(queue)
|
||||
|
||||
/*
|
||||
* vi:set ai:
|
||||
|
||||
@ -107,6 +107,8 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
|
||||
RS_RET_FILE_PREFIX_MISSING = -2036, /**< a required file prefix (parameter?) is missing */
|
||||
RS_RET_INVALID_HEADER_RECTYPE = -2037, /**< invalid record type in header or invalid header */
|
||||
RS_RET_QTYPE_MISMATCH = -2038, /**< different qType when reading back a property type */
|
||||
RS_RET_NO_FILE_ACCESS = -2039, /**< covers EACCES error on file open() */
|
||||
RS_RET_FILE_NOT_FOUND = -2040, /**< file not found */
|
||||
RS_RET_OK_DELETE_LISTENTRY = 1, /**< operation successful, but callee requested the deletion of an entry (special state) */
|
||||
RS_RET_TERMINATE_NOW = 2, /**< operation successful, function is requested to terminate (mostly used with threads) */
|
||||
RS_RET_NO_RUN = 3, /**< operation successful, but function does not like to be executed */
|
||||
|
||||
44
stream.c
44
stream.c
@ -88,6 +88,15 @@ static rsRetVal strmOpenFile(strm_t *pThis)
|
||||
iFlags = O_WRONLY | O_TRUNC | O_CREAT | O_APPEND;
|
||||
|
||||
pThis->fd = open((char*)pThis->pszCurrFName, iFlags, pThis->tOpenMode);
|
||||
if(pThis->fd == -1) {
|
||||
int ierrnoSave = errno;
|
||||
dbgprintf("Stream 0x%lx: open error %d\n", (unsigned long) pThis, errno);
|
||||
if(ierrnoSave == ENOENT)
|
||||
ABORT_FINALIZE(RS_RET_FILE_NOT_FOUND);
|
||||
else
|
||||
ABORT_FINALIZE(RS_RET_IO_ERROR);
|
||||
}
|
||||
|
||||
pThis->iCurrOffs = 0;
|
||||
|
||||
dbgprintf("Stream 0x%lx: opened file '%s' for %s (0x%x) as %d\n", (unsigned long) pThis,
|
||||
@ -621,28 +630,39 @@ rsRetVal strmSerialize(strm_t *pThis, strm_t *pStrm)
|
||||
int i;
|
||||
long l;
|
||||
|
||||
assert(pThis != NULL);
|
||||
assert(pStrm != NULL);
|
||||
ISOBJ_TYPE_assert(pThis, strm);
|
||||
ISOBJ_TYPE_assert(pStrm, strm);
|
||||
|
||||
dbgprintf("strmSerialize 1\n");
|
||||
CHKiRet(objBeginSerialize(pStrm, (obj_t*) pThis));
|
||||
|
||||
i = pThis->sType;
|
||||
objSerializeSCALAR_VAR(pStrm, sType, INT, i);
|
||||
dbgprintf("strmSerialize 2\n");
|
||||
objSerializeSCALAR(pStrm, iCurrFNum, INT);
|
||||
objSerializePTR(pStrm, pszFName, PSZ);
|
||||
i = pThis->tOperationsMode;
|
||||
objSerializeSCALAR_VAR(pStrm, tOperationsMode, INT, i);
|
||||
i = pThis->tOpenMode;
|
||||
objSerializeSCALAR_VAR(pStrm, tOpenMode, INT, i);
|
||||
l = (long) pThis->iMaxFileSize;
|
||||
objSerializeSCALAR_VAR(pStrm, iMaxFileSize, LONG, l);
|
||||
objSerializeSCALAR(pStrm, iMaxFiles, INT);
|
||||
objSerializeSCALAR(pStrm, iFileNumDigits, INT);
|
||||
objSerializeSCALAR(pStrm, bDeleteOnClose, INT);
|
||||
|
||||
i = pThis->sType;
|
||||
objSerializeSCALAR_VAR(pStrm, sType, INT, i);
|
||||
|
||||
i = pThis->tOperationsMode;
|
||||
objSerializeSCALAR_VAR(pStrm, tOperationsMode, INT, i);
|
||||
|
||||
i = pThis->tOpenMode;
|
||||
objSerializeSCALAR_VAR(pStrm, tOpenMode, INT, i);
|
||||
|
||||
l = (long) pThis->iCurrOffs;
|
||||
objSerializeSCALAR_VAR(pStrm, iCurrOffs, LONG, l);
|
||||
|
||||
// TODO: really serialize?
|
||||
//l = (long) pThis->iMaxFileSize;
|
||||
//objSerializeSCALAR_VAR(pStrm, iMaxFileSize, LONG, l);
|
||||
|
||||
CHKiRet(objEndSerialize(pStrm));
|
||||
|
||||
finalize_it:
|
||||
dbgprintf("strmSerialize out %d\n", iRet);
|
||||
return iRet;
|
||||
}
|
||||
|
||||
@ -656,7 +676,7 @@ rsRetVal strmSetProperty(strm_t *pThis, property_t *pProp)
|
||||
{
|
||||
DEFiRet;
|
||||
|
||||
ISOBJ_TYPE_assert(pThis, Msg);
|
||||
ISOBJ_TYPE_assert(pThis, strm);
|
||||
assert(pProp != NULL);
|
||||
|
||||
if(isProp("sType")) {
|
||||
@ -669,6 +689,8 @@ rsRetVal strmSetProperty(strm_t *pThis, property_t *pProp)
|
||||
CHKiRet(strmSettOperationsMode(pThis, pProp->val.vInt));
|
||||
} else if(isProp("tOpenMode")) {
|
||||
CHKiRet(strmSettOpenMode(pThis, pProp->val.vInt));
|
||||
} else if(isProp("iCurrOffs")) {
|
||||
pThis->iCurrOffs = pProp->val.vLong;
|
||||
} else if(isProp("iMaxFileSize")) {
|
||||
CHKiRet(strmSetiMaxFileSize(pThis, pProp->val.vLong));
|
||||
} else if(isProp("iMaxFiles")) {
|
||||
|
||||
2
stream.h
2
stream.h
@ -73,12 +73,12 @@ typedef struct strm_s {
|
||||
int iMaxFiles; /* maximum number of files if a circular mode is in use */
|
||||
int iFileNumDigits;/* min number of digits to use in file number (only in circular mode) */
|
||||
int bDeleteOnClose; /* set to 1 to auto-delete on close -- be careful with that setting! */
|
||||
size_t iCurrOffs;/* current offset */
|
||||
/* dynamic properties, valid only during file open, not to be persistet */
|
||||
size_t sIOBufSize;/* size of IO buffer */
|
||||
uchar *pszDir; /* Directory */
|
||||
int lenDir;
|
||||
int fd; /* the file descriptor, -1 if closed */
|
||||
size_t iCurrOffs;/* current offset */
|
||||
uchar *pszCurrFName; /* name of current file (if open) */
|
||||
uchar *pIOBuf; /* io Buffer */
|
||||
size_t iBufPtrMax; /* current max Ptr in Buffer (if partial read!) */
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user