mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-20 14:20:42 +01:00
some cleanup
This commit is contained in:
parent
24cd5aee47
commit
553d1880d4
@ -239,7 +239,6 @@ qqueueAdviseMaxWorkers(qqueue_t *pThis)
|
|||||||
ISOBJ_TYPE_assert(pThis, qqueue);
|
ISOBJ_TYPE_assert(pThis, qqueue);
|
||||||
|
|
||||||
if(!pThis->bEnqOnly) {
|
if(!pThis->bEnqOnly) {
|
||||||
dbgprintf("AdviseMaxWorkers: log Queue Size: %d, high water mark %d\n",
|
|
||||||
getLogicalQueueSize(pThis) , pThis->iHighWtrMrk);
|
getLogicalQueueSize(pThis) , pThis->iHighWtrMrk);
|
||||||
if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) {
|
if(pThis->bIsDA && getLogicalQueueSize(pThis) >= pThis->iHighWtrMrk) {
|
||||||
wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
|
wtpAdviseMaxWorkers(pThis->pWtpDA, 1); /* disk queues have always one worker */
|
||||||
@ -486,26 +485,6 @@ static rsRetVal qDelFixedArray(qqueue_t *pThis)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* reset the logical dequeue pointer to the physical dequeue position.
|
|
||||||
* This is only needed after we cancelled workers (during queue shutdown).
|
|
||||||
*/
|
|
||||||
static rsRetVal
|
|
||||||
qUnDeqAllFixedArray(qqueue_t *pThis)
|
|
||||||
{
|
|
||||||
DEFiRet;
|
|
||||||
|
|
||||||
ISOBJ_TYPE_assert(pThis, qqueue);
|
|
||||||
|
|
||||||
DBGOPRINT((obj_t*) pThis, "resetting FixedArray deq index to %ld (was %ld), logical dequeue count %d\n",
|
|
||||||
pThis->tVars.farray.head, pThis->tVars.farray.deqhead, pThis->nLogDeq);
|
|
||||||
|
|
||||||
pThis->tVars.farray.deqhead = pThis->tVars.farray.head;
|
|
||||||
pThis->nLogDeq = 0;
|
|
||||||
|
|
||||||
RETiRet;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* -------------------- linked list -------------------- */
|
/* -------------------- linked list -------------------- */
|
||||||
|
|
||||||
|
|
||||||
@ -597,26 +576,6 @@ static rsRetVal qDelLinkedList(qqueue_t *pThis)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* reset the logical dequeue pointer to the physical dequeue position.
|
|
||||||
* This is only needed after we cancelled workers (during queue shutdown).
|
|
||||||
*/
|
|
||||||
static rsRetVal
|
|
||||||
qUnDeqAllLinkedList(qqueue_t *pThis)
|
|
||||||
{
|
|
||||||
DEFiRet;
|
|
||||||
|
|
||||||
ASSERT(pThis != NULL);
|
|
||||||
|
|
||||||
DBGOPRINT((obj_t*) pThis, "resetting LinkedList deq ptr to %p (was %p), logical dequeue count %d\n",
|
|
||||||
pThis->tVars.linklist.pDelRoot, pThis->tVars.linklist.pDeqRoot, pThis->nLogDeq);
|
|
||||||
|
|
||||||
pThis->tVars.linklist.pDeqRoot = pThis->tVars.linklist.pDelRoot;
|
|
||||||
pThis->nLogDeq = 0;
|
|
||||||
|
|
||||||
RETiRet;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* -------------------- disk -------------------- */
|
/* -------------------- disk -------------------- */
|
||||||
|
|
||||||
|
|
||||||
@ -863,16 +822,6 @@ finalize_it:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* This is a dummy function for disks - we do not need to reset anything
|
|
||||||
* because everything is already persisted...
|
|
||||||
*/
|
|
||||||
static rsRetVal
|
|
||||||
qUnDeqAllDisk(__attribute__((unused)) qqueue_t *pThis)
|
|
||||||
{
|
|
||||||
return RS_RET_OK;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* -------------------- direct (no queueing) -------------------- */
|
/* -------------------- direct (no queueing) -------------------- */
|
||||||
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis)
|
static rsRetVal qConstructDirect(qqueue_t __attribute__((unused)) *pThis)
|
||||||
{
|
{
|
||||||
@ -917,12 +866,6 @@ static rsRetVal qDelDirect(qqueue_t __attribute__((unused)) *pThis)
|
|||||||
return RS_RET_OK;
|
return RS_RET_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
static rsRetVal
|
|
||||||
qUnDeqAllDirect(__attribute__((unused)) qqueue_t *pThis)
|
|
||||||
{
|
|
||||||
return RS_RET_OK;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/* --------------- end type-specific handlers -------------------- */
|
/* --------------- end type-specific handlers -------------------- */
|
||||||
|
|
||||||
@ -1192,7 +1135,6 @@ ShutdownWorkers(qqueue_t *pThis)
|
|||||||
DBGOPRINT((obj_t*) pThis, "initiating worker thread shutdown sequence\n");
|
DBGOPRINT((obj_t*) pThis, "initiating worker thread shutdown sequence\n");
|
||||||
|
|
||||||
CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis));
|
CHKiRet(tryShutdownWorkersWithinQueueTimeout(pThis));
|
||||||
dbgprintf("YYY: physical queue size: %d\n", getPhysicalQueueSize(pThis));
|
|
||||||
|
|
||||||
if(getPhysicalQueueSize(pThis) > 0) {
|
if(getPhysicalQueueSize(pThis) > 0) {
|
||||||
CHKiRet(tryShutdownWorkersWithinActionTimeout(pThis));
|
CHKiRet(tryShutdownWorkersWithinActionTimeout(pThis));
|
||||||
@ -1260,7 +1202,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
|
|||||||
pThis->qAdd = qAddFixedArray;
|
pThis->qAdd = qAddFixedArray;
|
||||||
pThis->qDeq = qDeqFixedArray;
|
pThis->qDeq = qDeqFixedArray;
|
||||||
pThis->qDel = qDelFixedArray;
|
pThis->qDel = qDelFixedArray;
|
||||||
pThis->qUnDeqAll = qUnDeqAllFixedArray;
|
|
||||||
break;
|
break;
|
||||||
case QUEUETYPE_LINKEDLIST:
|
case QUEUETYPE_LINKEDLIST:
|
||||||
pThis->qConstruct = qConstructLinkedList;
|
pThis->qConstruct = qConstructLinkedList;
|
||||||
@ -1268,7 +1209,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
|
|||||||
pThis->qAdd = qAddLinkedList;
|
pThis->qAdd = qAddLinkedList;
|
||||||
pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
|
pThis->qDeq = (rsRetVal (*)(qqueue_t*,void**)) qDeqLinkedList;
|
||||||
pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
|
pThis->qDel = (rsRetVal (*)(qqueue_t*)) qDelLinkedList;
|
||||||
pThis->qUnDeqAll = qUnDeqAllLinkedList;
|
|
||||||
break;
|
break;
|
||||||
case QUEUETYPE_DISK:
|
case QUEUETYPE_DISK:
|
||||||
pThis->qConstruct = qConstructDisk;
|
pThis->qConstruct = qConstructDisk;
|
||||||
@ -1276,7 +1216,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
|
|||||||
pThis->qAdd = qAddDisk;
|
pThis->qAdd = qAddDisk;
|
||||||
pThis->qDeq = qDeqDisk;
|
pThis->qDeq = qDeqDisk;
|
||||||
pThis->qDel = qDelDisk;
|
pThis->qDel = qDelDisk;
|
||||||
pThis->qUnDeqAll = qUnDeqAllDisk;
|
|
||||||
/* special handling */
|
/* special handling */
|
||||||
pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
|
pThis->iNumWorkerThreads = 1; /* we need exactly one worker */
|
||||||
break;
|
break;
|
||||||
@ -1285,7 +1224,6 @@ rsRetVal qqueueConstruct(qqueue_t **ppThis, queueType_t qType, int iWorkerThread
|
|||||||
pThis->qDestruct = qDestructDirect;
|
pThis->qDestruct = qDestructDirect;
|
||||||
pThis->qAdd = qAddDirect;
|
pThis->qAdd = qAddDirect;
|
||||||
pThis->qDel = qDelDirect;
|
pThis->qDel = qDelDirect;
|
||||||
pThis->qUnDeqAll = qUnDeqAllDirect;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1471,7 +1409,6 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
|
|||||||
|
|
||||||
nDequeued = nDiscarded = 0;
|
nDequeued = nDiscarded = 0;
|
||||||
while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) {
|
while((iQueueSize = getLogicalQueueSize(pThis)) > 0 && nDequeued < pThis->iDeqBatchSize) {
|
||||||
dbgprintf("DequeueConsumableElements, index %d\n", nDequeued);
|
|
||||||
CHKiRet(qqueueDeq(pThis, &pUsr));
|
CHKiRet(qqueueDeq(pThis, &pUsr));
|
||||||
|
|
||||||
/* check if we should discard this element */
|
/* check if we should discard this element */
|
||||||
@ -1652,7 +1589,6 @@ DequeueForConsumer(qqueue_t *pThis, wti_t *pWti)
|
|||||||
ISOBJ_TYPE_assert(pThis, qqueue);
|
ISOBJ_TYPE_assert(pThis, qqueue);
|
||||||
ISOBJ_TYPE_assert(pWti, wti);
|
ISOBJ_TYPE_assert(pWti, wti);
|
||||||
|
|
||||||
dbgprintf("YYY: dequeue for consumer\n");
|
|
||||||
CHKiRet(DequeueConsumable(pThis, pWti));
|
CHKiRet(DequeueConsumable(pThis, pWti));
|
||||||
|
|
||||||
if(pWti->batch.nElem == 0)
|
if(pWti->batch.nElem == 0)
|
||||||
@ -2080,14 +2016,6 @@ CODESTARTobjDestruct(qqueue)
|
|||||||
if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL)
|
if(pThis->qType != QUEUETYPE_DIRECT && !pThis->bEnqOnly && pThis->pqParent == NULL)
|
||||||
ShutdownWorkers(pThis);
|
ShutdownWorkers(pThis);
|
||||||
|
|
||||||
/* now all workers are terminated. Messages may exist. Also, some logically dequeued
|
|
||||||
* messages may never have been processed because their worker was terminated. So
|
|
||||||
* we need to reset the logical dequeue pointer, persist the queue if configured to do
|
|
||||||
* so and then destruct everything. -- rgerhards, 2009-05-26
|
|
||||||
*/
|
|
||||||
RUNLOG_STR("XXX: NOT undequeueing entries!");
|
|
||||||
//CHKiRet(pThis->qUnDeqAll(pThis));
|
|
||||||
|
|
||||||
if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
|
if(pThis->bIsDA && getPhysicalQueueSize(pThis) > 0 && pThis->bSaveOnShutdown) {
|
||||||
CHKiRet(DoSaveOnShutdown(pThis));
|
CHKiRet(DoSaveOnShutdown(pThis));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -113,7 +113,6 @@ typedef struct queue_s {
|
|||||||
rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr);
|
rsRetVal (*qAdd)(struct queue_s *pThis, void *pUsr);
|
||||||
rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr);
|
rsRetVal (*qDeq)(struct queue_s *pThis, void **ppUsr);
|
||||||
rsRetVal (*qDel)(struct queue_s *pThis);
|
rsRetVal (*qDel)(struct queue_s *pThis);
|
||||||
rsRetVal (*qUnDeqAll)(struct queue_s *pThis);
|
|
||||||
/* end type-specific handler */
|
/* end type-specific handler */
|
||||||
/* synchronization variables */
|
/* synchronization variables */
|
||||||
pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */
|
pthread_mutex_t mutThrdMgmt; /* mutex for the queue's thread management */
|
||||||
|
|||||||
@ -119,7 +119,7 @@ wtiSetState(wti_t *pThis, bool bNewVal)
|
|||||||
* Note that when waiting for the thread to terminate, we do a busy wait, checking
|
* Note that when waiting for the thread to terminate, we do a busy wait, checking
|
||||||
* progress every 10ms. It is very unlikely that we will ever cancel a thread
|
* progress every 10ms. It is very unlikely that we will ever cancel a thread
|
||||||
* and, if so, it will only happen at the end of the rsyslog run. So doing this
|
* and, if so, it will only happen at the end of the rsyslog run. So doing this
|
||||||
* kind of not optimal wait is considered preferable over using condition variables.
|
* kind of non-optimal wait is considered preferable over using condition variables.
|
||||||
* rgerhards, 2008-02-26
|
* rgerhards, 2008-02-26
|
||||||
*/
|
*/
|
||||||
rsRetVal
|
rsRetVal
|
||||||
@ -134,7 +134,6 @@ wtiCancelThrd(wti_t *pThis)
|
|||||||
pthread_cancel(pThis->thrdID);
|
pthread_cancel(pThis->thrdID);
|
||||||
/* now wait until the thread terminates... */
|
/* now wait until the thread terminates... */
|
||||||
while(wtiGetState(pThis)) {
|
while(wtiGetState(pThis)) {
|
||||||
//fprintf(stderr, "sleep loop for getState\n");
|
|
||||||
srSleep(0, 10000);
|
srSleep(0, 10000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -271,7 +270,6 @@ wtiWorker(wti_t *pThis)
|
|||||||
|
|
||||||
/* first check if we are in shutdown process (but evaluate a bit later) */
|
/* first check if we are in shutdown process (but evaluate a bit later) */
|
||||||
terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED);
|
terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED);
|
||||||
RUNLOG_VAR("%d", terminateRet);
|
|
||||||
if(terminateRet == RS_RET_TERMINATE_NOW) {
|
if(terminateRet == RS_RET_TERMINATE_NOW) {
|
||||||
/* we now need to free the old batch */
|
/* we now need to free the old batch */
|
||||||
localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis);
|
localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis);
|
||||||
|
|||||||
@ -117,8 +117,7 @@ wtpConstructFinalize(wtp_t *pThis)
|
|||||||
/* alloc and construct workers - this can only be done in finalizer as we previously do
|
/* alloc and construct workers - this can only be done in finalizer as we previously do
|
||||||
* not know the max number of workers
|
* not know the max number of workers
|
||||||
*/
|
*/
|
||||||
if((pThis->pWrkr = MALLOC(sizeof(wti_t*) * pThis->iNumWorkerThreads)) == NULL)
|
CHKmalloc(pThis->pWrkr = MALLOC(sizeof(wti_t*) * pThis->iNumWorkerThreads));
|
||||||
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
|
|
||||||
|
|
||||||
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
|
for(i = 0 ; i < pThis->iNumWorkerThreads ; ++i) {
|
||||||
CHKiRet(wtiConstruct(&pThis->pWrkr[i]));
|
CHKiRet(wtiConstruct(&pThis->pWrkr[i]));
|
||||||
@ -190,10 +189,8 @@ wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex)
|
|||||||
wtpState = ATOMIC_FETCH_32BIT(pThis->wtpState);
|
wtpState = ATOMIC_FETCH_32BIT(pThis->wtpState);
|
||||||
|
|
||||||
if(wtpState == wtpState_SHUTDOWN_IMMEDIATE) {
|
if(wtpState == wtpState_SHUTDOWN_IMMEDIATE) {
|
||||||
RUNLOG_STR("WWW: ChkStopWrkr returns TERMINATE_NOW");
|
|
||||||
ABORT_FINALIZE(RS_RET_TERMINATE_NOW);
|
ABORT_FINALIZE(RS_RET_TERMINATE_NOW);
|
||||||
} else if(wtpState == wtpState_SHUTDOWN) {
|
} else if(wtpState == wtpState_SHUTDOWN) {
|
||||||
RUNLOG_STR("WWW: ChkStopWrkr returns TERMINATE_WHEN_IDLE");
|
|
||||||
ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE);
|
ABORT_FINALIZE(RS_RET_TERMINATE_WHEN_IDLE);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -429,7 +426,6 @@ wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr)
|
|||||||
|
|
||||||
ISOBJ_TYPE_assert(pThis, wtp);
|
ISOBJ_TYPE_assert(pThis, wtp);
|
||||||
|
|
||||||
int nMaxWrkrTmp = nMaxWrkr;
|
|
||||||
if(nMaxWrkr == 0)
|
if(nMaxWrkr == 0)
|
||||||
FINALIZE;
|
FINALIZE;
|
||||||
|
|
||||||
@ -437,10 +433,10 @@ int nMaxWrkrTmp = nMaxWrkr;
|
|||||||
nMaxWrkr = pThis->iNumWorkerThreads;
|
nMaxWrkr = pThis->iNumWorkerThreads;
|
||||||
|
|
||||||
nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd);
|
nMissing = nMaxWrkr - ATOMIC_FETCH_32BIT(pThis->iCurNumWrkThrd);
|
||||||
dbgprintf("wtpAdviseMaxWorkers, nmax: %d, curr %d, missing %d\n", nMaxWrkrTmp, pThis->iNumWorkerThreads, nMissing);
|
|
||||||
|
|
||||||
if(nMissing > 0) {
|
if(nMissing > 0) {
|
||||||
DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n", wtpGetDbgHdr(pThis), nMissing);
|
DBGPRINTF("%s: high activity - starting %d additional worker thread(s).\n",
|
||||||
|
wtpGetDbgHdr(pThis), nMissing);
|
||||||
/* start the rqtd nbr of workers */
|
/* start the rqtd nbr of workers */
|
||||||
for(i = 0 ; i < nMissing ; ++i) {
|
for(i = 0 ; i < nMissing ; ++i) {
|
||||||
CHKiRet(wtpStartWrkr(pThis));
|
CHKiRet(wtpStartWrkr(pThis));
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user