mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-20 06:10:42 +01:00
removed queue's UngetObj() call
... which is no longer needed thanks to the new queue design.
This commit is contained in:
parent
93f873277b
commit
fe5bea77ac
@ -433,11 +433,11 @@ message-caused. This is under the assumption that any reasonable responsive
|
||||
admin will hopefully test his configuration at least once before turning it
|
||||
into production. And config SQL errors should manifest immediately, so I
|
||||
expect these to be fixed before a configuration runs in production. So it is
|
||||
the chore of the output module to interpret the return code it received from
|
||||
its API and decide whether this is more likely action-caused or
|
||||
the duty of the output module to interpret the return code it received from
|
||||
the API call and decide whether the failure is more likely action-caused or
|
||||
message-caused. For database outputs, I would assume that it is always easy
|
||||
to classify failures that can only be action-caused, especially in the
|
||||
dominating case of a failed network connection or a failed server.
|
||||
to classify failures that must be action-caused, especially in the
|
||||
dominating cases of failed network connections or failed servers.
|
||||
|
||||
For other outputs it may not be as easy. But, for example, all stream network
|
||||
outputs can detect a broken connection, so this also is a sure fit.
|
||||
|
||||
110
runtime/queue.c
110
runtime/queue.c
@ -73,7 +73,6 @@ static rsRetVal GetDeqBatchSize(qqueue_t *pThis, int *pVal);
|
||||
static int qqueueIsIdleDA(qqueue_t *pThis);
|
||||
static rsRetVal ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave);
|
||||
static rsRetVal ConsumerCancelCleanup(void *arg1, void *arg2);
|
||||
static rsRetVal UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex);
|
||||
|
||||
/* some constants for queuePersist () */
|
||||
#define QUEUE_CHECKPOINT 1
|
||||
@ -169,7 +168,7 @@ finalize_it:
|
||||
/* methods */
|
||||
|
||||
|
||||
/* get the overall queue size, which includes ungotten objects. Must only be called
|
||||
/* get the overall queue size. Must only be called
|
||||
* while mutex is locked!
|
||||
* rgerhards, 2008-01-29
|
||||
*/
|
||||
@ -178,11 +177,11 @@ qqueueGetOverallQueueSize(qqueue_t *pThis)
|
||||
{
|
||||
#if 0 /* leave a bit in for debugging -- rgerhards, 2008-01-30 */
|
||||
BEGINfunc
|
||||
dbgoprint((obj_t*) pThis, "queue size: %d (regular %d, ungotten %d)\n",
|
||||
pThis->iQueueSize + pThis->iUngottenObjs, pThis->iQueueSize, pThis->iUngottenObjs);
|
||||
dbgoprint((obj_t*) pThis, "queue size: %d (regular %d)\n",
|
||||
pThis->iQueueSize, pThis->iQueueSize);
|
||||
ENDfunc
|
||||
#endif
|
||||
return pThis->iQueueSize + pThis->iUngottenObjs;
|
||||
return pThis->iQueueSize;
|
||||
}
|
||||
|
||||
|
||||
@ -837,8 +836,6 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
|
||||
uchar pszQIFNam[MAXFNAME];
|
||||
size_t lenQIFNam;
|
||||
struct stat stat_buf;
|
||||
int iUngottenObjs;
|
||||
obj_t *pUsr;
|
||||
|
||||
ISOBJ_TYPE_assert(pThis, qqueue);
|
||||
|
||||
@ -868,18 +865,7 @@ qqueueTryLoadPersistedInfo(qqueue_t *pThis)
|
||||
/* first, we try to read the property bag for ourselfs */
|
||||
CHKiRet(obj.DeserializePropBag((obj_t*) pThis, psQIF));
|
||||
|
||||
/* then the ungotten object queue */
|
||||
iUngottenObjs = pThis->iUngottenObjs;
|
||||
pThis->iUngottenObjs = 0; /* will be incremented when we add objects! */
|
||||
|
||||
while(iUngottenObjs > 0) {
|
||||
/* fill the queue from disk */
|
||||
CHKiRet(obj.Deserialize((void*) &pUsr, (uchar*)"msg", psQIF, NULL, NULL));
|
||||
UngetObj(pThis, pUsr, MUTEX_ALREADY_LOCKED);
|
||||
--iUngottenObjs; /* one less */
|
||||
}
|
||||
|
||||
/* and now the stream objects (some order as when persisted!) */
|
||||
/* then the stream objects (same order as when persisted!) */
|
||||
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pWrite, (uchar*) "strm", psQIF,
|
||||
(rsRetVal(*)(obj_t*,void*))qqueueLoadPersStrmInfoFixup, pThis));
|
||||
CHKiRet(obj.Deserialize(&pThis->tVars.disk.pReadDel, (uchar*) "strm", psQIF,
|
||||
@ -1122,57 +1108,6 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
|
||||
/* --------------- end type-specific handlers -------------------- */
|
||||
|
||||
|
||||
/* unget a user pointer that has been dequeued. This functionality is especially important
|
||||
* for consumer cancel cleanup handlers. To support it, a short list of ungotten user pointers
|
||||
* is maintened in memory.
|
||||
* rgerhards, 2008-01-20
|
||||
*/
|
||||
static rsRetVal
|
||||
UngetObj(qqueue_t *pThis, obj_t *pUsr, int bLockMutex)
|
||||
{
|
||||
DEFiRet;
|
||||
DEFVARS_mutexProtection;
|
||||
|
||||
ISOBJ_TYPE_assert(pThis, qqueue);
|
||||
ISOBJ_assert(pUsr); /* TODO: we aborted right at this place at least 3 times -- race? 2008-02-28, -03-10, -03-15
|
||||
The second time I noticed it the queue was in destruction with NO worker threads
|
||||
running. The pUsr ptr was totally off and provided no clue what it may be pointing
|
||||
at (except that it looked like the static data pool). Both times, the abort happend
|
||||
inside an action queue */
|
||||
|
||||
dbgoprint((obj_t*) pThis, "ungetting user object %s\n", obj.GetName(pUsr));
|
||||
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, bLockMutex);
|
||||
iRet = qqueueAddLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, pUsr);
|
||||
++pThis->iUngottenObjs; /* indicate one more */
|
||||
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
|
||||
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
|
||||
/* dequeues a user pointer from the ungotten queue. Pointers from there should always be
|
||||
* dequeued first.
|
||||
*
|
||||
* This function must only be called when the mutex is locked!
|
||||
*
|
||||
* rgerhards, 2008-01-29
|
||||
*/
|
||||
static rsRetVal
|
||||
GetUngottenObj(qqueue_t *pThis, obj_t **ppUsr)
|
||||
{
|
||||
DEFiRet;
|
||||
|
||||
ISOBJ_TYPE_assert(pThis, qqueue);
|
||||
ASSERT(ppUsr != NULL);
|
||||
|
||||
iRet = qqueueDelLinkedList(&pThis->pUngetRoot, &pThis->pUngetLast, ppUsr);
|
||||
--pThis->iUngottenObjs; /* indicate one less */
|
||||
dbgoprint((obj_t*) pThis, "dequeued ungotten user object %s\n", obj.GetName(*ppUsr));
|
||||
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
|
||||
/* generic code to add a queue entry
|
||||
* We use some specific code to most efficiently support direct mode
|
||||
* queues. This is justified in spite of the gain and the need to do some
|
||||
@ -1198,8 +1133,6 @@ finalize_it:
|
||||
|
||||
|
||||
/* generic code to remove a queue entry
|
||||
* rgerhards, 2008-01-29: we must first see if there is any object in the
|
||||
* ungotten list and, if so, dequeue it first.
|
||||
*/
|
||||
static rsRetVal
|
||||
qqueueDel(qqueue_t *pThis, void *pUsr)
|
||||
@ -1213,13 +1146,8 @@ qqueueDel(qqueue_t *pThis, void *pUsr)
|
||||
* If we decrement, however, we may lose a message. But that is better than
|
||||
* losing the whole process because it loops... -- rgerhards, 2008-01-03
|
||||
*/
|
||||
if(pThis->iUngottenObjs > 0) {
|
||||
iRet = GetUngottenObj(pThis, (obj_t**) pUsr);
|
||||
} else {
|
||||
iRet = pThis->qDeq(pThis, pUsr);
|
||||
// TODO: ULTRA iRet = pThis->qDel(pThis, pUsr);
|
||||
ATOMIC_DEC(pThis->iQueueSize);
|
||||
}
|
||||
iRet = pThis->qDeq(pThis, pUsr);
|
||||
ATOMIC_DEC(pThis->iQueueSize);
|
||||
|
||||
dbgoprint((obj_t*) pThis, "entry deleted, state %d, size now %d entries\n",
|
||||
iRet, pThis->iQueueSize);
|
||||
@ -1528,6 +1456,8 @@ finalize_it:
|
||||
static rsRetVal
|
||||
ConsumerCancelCleanup(void *arg1, void *arg2)
|
||||
{
|
||||
//TODO: looks like we no longer need it!
|
||||
/*
|
||||
DEFiRet;
|
||||
|
||||
qqueue_t *pThis = (qqueue_t*) arg1;
|
||||
@ -1535,14 +1465,9 @@ ConsumerCancelCleanup(void *arg1, void *arg2)
|
||||
|
||||
ISOBJ_TYPE_assert(pThis, qqueue);
|
||||
|
||||
if(pUsr != NULL) {
|
||||
/* make sure the data element is not lost */
|
||||
dbgoprint((obj_t*) pThis, "cancelation cleanup handler consumer called, we need to unget one user data element\n");
|
||||
CHKiRet(UngetObj(pThis, pUsr, LOCK_MUTEX));
|
||||
}
|
||||
|
||||
finalize_it:
|
||||
RETiRet;
|
||||
*/
|
||||
return RS_RET_OK;
|
||||
}
|
||||
|
||||
|
||||
@ -2188,7 +2113,6 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
|
||||
strm_t *psQIF = NULL; /* Queue Info File */
|
||||
uchar pszQIFNam[MAXFNAME];
|
||||
size_t lenQIFNam;
|
||||
obj_t *pUsr;
|
||||
|
||||
ASSERT(pThis != NULL);
|
||||
|
||||
@ -2235,20 +2159,10 @@ static rsRetVal qqueuePersist(qqueue_t *pThis, int bIsCheckpoint)
|
||||
*/
|
||||
CHKiRet(obj.BeginSerializePropBag(psQIF, (obj_t*) pThis));
|
||||
objSerializeSCALAR(psQIF, iQueueSize, INT);
|
||||
objSerializeSCALAR(psQIF, iUngottenObjs, INT);
|
||||
objSerializeSCALAR(psQIF, tVars.disk.sizeOnDisk, INT64);
|
||||
objSerializeSCALAR(psQIF, tVars.disk.bytesRead, INT64);
|
||||
CHKiRet(obj.EndSerialize(psQIF));
|
||||
|
||||
/* now we must persist all objects on the ungotten queue - they can not go to
|
||||
* to the regular files. -- rgerhards, 2008-01-29
|
||||
*/
|
||||
while(pThis->iUngottenObjs > 0) {
|
||||
CHKiRet(GetUngottenObj(pThis, &pUsr));
|
||||
CHKiRet((objSerialize(pUsr))(pUsr, psQIF));
|
||||
objDestruct(pUsr);
|
||||
}
|
||||
|
||||
/* now persist the stream info */
|
||||
CHKiRet(strmSerialize(pThis->tVars.disk.pWrite, psQIF));
|
||||
CHKiRet(strmSerialize(pThis->tVars.disk.pReadDel, psQIF));
|
||||
@ -2615,8 +2529,6 @@ static rsRetVal qqueueSetProperty(qqueue_t *pThis, var_t *pProp)
|
||||
|
||||
if(isProp("iQueueSize")) {
|
||||
pThis->iQueueSize = pProp->val.num;
|
||||
} else if(isProp("iUngottenObjs")) {
|
||||
pThis->iUngottenObjs = pProp->val.num;
|
||||
} else if(isProp("tVars.disk.sizeOnDisk")) {
|
||||
pThis->tVars.disk.sizeOnDisk = pProp->val.num;
|
||||
} else if(isProp("tVars.disk.bytesRead")) {
|
||||
|
||||
@ -152,12 +152,6 @@ typedef struct queue_s {
|
||||
struct queue_s *pqDA; /* queue for disk-assisted modes */
|
||||
struct queue_s *pqParent;/* pointer to the parent (if this is a child queue) */
|
||||
int bDAEnqOnly; /* EnqOnly setting for DA queue */
|
||||
/* some data elements for the queueUngetObj() functionality. This list should always be short
|
||||
* and is always kept in memory
|
||||
*/
|
||||
qLinkedList_t *pUngetRoot;
|
||||
qLinkedList_t *pUngetLast;
|
||||
int iUngottenObjs; /* number of objects currently in the "ungotten" list */
|
||||
/* now follow queueing mode specific data elements */
|
||||
union { /* different data elements based on queue type (qType) */
|
||||
struct {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user