mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-17 04:40:43 +01:00
some more fixes for queue engine
The enhanced testbench now runs without failures, again
This commit is contained in:
parent
fc3e56941c
commit
13d4a23e92
@ -121,6 +121,8 @@ static void getCurrTime(struct syslogTime *t, time_t *ttSeconds)
|
|||||||
t->OffsetMode = '+';
|
t->OffsetMode = '+';
|
||||||
t->OffsetHour = lBias / 3600;
|
t->OffsetHour = lBias / 3600;
|
||||||
t->OffsetMinute = lBias % 3600;
|
t->OffsetMinute = lBias % 3600;
|
||||||
|
|
||||||
|
t->timeType = 0; /* this is new and may cause format errors -- rgerhards, 2009-05-28 */
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -54,6 +54,7 @@
|
|||||||
#include "wtp.h"
|
#include "wtp.h"
|
||||||
#include "wti.h"
|
#include "wti.h"
|
||||||
#include "atomic.h"
|
#include "atomic.h"
|
||||||
|
#include "msg.h" /* TODO: remove one we removed MsgAddRef() call */
|
||||||
|
|
||||||
#ifdef OS_SOLARIS
|
#ifdef OS_SOLARIS
|
||||||
# include <sched.h>
|
# include <sched.h>
|
||||||
@ -248,7 +249,6 @@ static inline rsRetVal qqueueAdviseMaxWorkers(qqueue_t *pThis)
|
|||||||
} else {
|
} else {
|
||||||
iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
|
iMaxWorkers = getLogicalQueueSize(pThis) / pThis->iMinMsgsPerWrkr + 1;
|
||||||
}
|
}
|
||||||
dbgprintf("YYY: wtp advise max reg workers %d\n", iMaxWorkers);
|
|
||||||
wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */
|
wtpAdviseMaxWorkers(pThis->pWtpReg, iMaxWorkers); /* disk queues have always one worker */
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -294,7 +294,6 @@ TurnOffDAMode(qqueue_t *pThis)
|
|||||||
{
|
{
|
||||||
DEFiRet;
|
DEFiRet;
|
||||||
|
|
||||||
RUNLOG_STR("XXX: TurnOffDAMode\n");
|
|
||||||
ISOBJ_TYPE_assert(pThis, qqueue);
|
ISOBJ_TYPE_assert(pThis, qqueue);
|
||||||
ASSERT(pThis->bRunsDA);
|
ASSERT(pThis->bRunsDA);
|
||||||
|
|
||||||
@ -720,6 +719,7 @@ static rsRetVal qDeqLinkedList(qqueue_t *pThis, obj_t **ppUsr)
|
|||||||
DEFiRet;
|
DEFiRet;
|
||||||
|
|
||||||
pEntry = pThis->tVars.linklist.pDeqRoot;
|
pEntry = pThis->tVars.linklist.pDeqRoot;
|
||||||
|
ISOBJ_TYPE_assert(pEntry->pUsr, msg);
|
||||||
*ppUsr = pEntry->pUsr;
|
*ppUsr = pEntry->pUsr;
|
||||||
pThis->tVars.linklist.pDeqRoot = pEntry->pNext;
|
pThis->tVars.linklist.pDeqRoot = pEntry->pNext;
|
||||||
|
|
||||||
@ -1137,7 +1137,7 @@ finalize_it:
|
|||||||
/* generic code to dequeue a queue entry
|
/* generic code to dequeue a queue entry
|
||||||
*/
|
*/
|
||||||
static rsRetVal
|
static rsRetVal
|
||||||
qqueueDeq(qqueue_t *pThis, void *pUsr)
|
qqueueDeq(qqueue_t *pThis, void **ppUsr)
|
||||||
{
|
{
|
||||||
DEFiRet;
|
DEFiRet;
|
||||||
|
|
||||||
@ -1148,7 +1148,7 @@ qqueueDeq(qqueue_t *pThis, void *pUsr)
|
|||||||
* If we decrement, however, we may lose a message. But that is better than
|
* If we decrement, however, we may lose a message. But that is better than
|
||||||
* losing the whole process because it loops... -- rgerhards, 2008-01-03
|
* losing the whole process because it loops... -- rgerhards, 2008-01-03
|
||||||
*/
|
*/
|
||||||
iRet = pThis->qDeq(pThis, pUsr);
|
iRet = pThis->qDeq(pThis, ppUsr);
|
||||||
ATOMIC_INC(pThis->nLogDeq);
|
ATOMIC_INC(pThis->nLogDeq);
|
||||||
|
|
||||||
// dbgoprint((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n",
|
// dbgoprint((obj_t*) pThis, "entry deleted, size now log %d, phys %d entries\n",
|
||||||
@ -1162,6 +1162,8 @@ qqueueDeq(qqueue_t *pThis, void *pUsr)
|
|||||||
* Both the regular and DA queue (if it exists) is waited for, but on the same timeout.
|
* Both the regular and DA queue (if it exists) is waited for, but on the same timeout.
|
||||||
* After this function returns, the workers must either be finished or some force
|
* After this function returns, the workers must either be finished or some force
|
||||||
* to finish them must be applied.
|
* to finish them must be applied.
|
||||||
|
* This function also instructs the DA worker pool (if it exists) to terminate. This is done
|
||||||
|
* in preparation of final queue shutdown.
|
||||||
* rgerhards, 2009-05-27
|
* rgerhards, 2009-05-27
|
||||||
*/
|
*/
|
||||||
static rsRetVal
|
static rsRetVal
|
||||||
@ -1175,7 +1177,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
|
|||||||
ISOBJ_TYPE_assert(pThis, qqueue);
|
ISOBJ_TYPE_assert(pThis, qqueue);
|
||||||
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
|
ASSERT(pThis->pqParent == NULL); /* detect invalid calling sequence */
|
||||||
|
|
||||||
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
|
BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pThis->mut); /* some workers may be running in parallel! */
|
||||||
if(getPhysicalQueueSize(pThis) > 0) {
|
if(getPhysicalQueueSize(pThis) > 0) {
|
||||||
if(pThis->bRunsDA) {
|
if(pThis->bRunsDA) {
|
||||||
/* We may have waited on the low water mark. As it may have changed, we
|
/* We may have waited on the low water mark. As it may have changed, we
|
||||||
@ -1184,7 +1186,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
|
|||||||
wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
|
wtpAdviseMaxWorkers(pThis->pWtpDA, 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
|
END_MTX_PROTECTED_OPERATIONS_UNCOND(pThis->mut);
|
||||||
|
|
||||||
/* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
|
/* at this stage, we need to have the DA worker properly initialized and running (if there is one) */
|
||||||
if(pThis->bRunsDA) {
|
if(pThis->bRunsDA) {
|
||||||
@ -1217,9 +1219,7 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* OK, the worker for the regular queue is processed, on the the DA queue regular worker. */
|
/* OK, the worker for the regular queue is processed, on the the DA queue regular worker. */
|
||||||
BEGIN_MTX_PROTECTED_OPERATIONS(pThis->mut, LOCK_MUTEX); /* some workers may be running in parallel! */
|
|
||||||
if(pThis->bRunsDA) {
|
if(pThis->bRunsDA) {
|
||||||
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
|
|
||||||
dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n",
|
dbgoprint((obj_t*) pThis, "we have a DA queue (0x%lx), requesting its shutdown.\n",
|
||||||
qqueueGetID(pThis->pqDA));
|
qqueueGetID(pThis->pqDA));
|
||||||
/* we use the same absolute timeout as above, so we do not use more than the configured
|
/* we use the same absolute timeout as above, so we do not use more than the configured
|
||||||
@ -1232,8 +1232,16 @@ tryShutdownWorkersWithinQueueTimeout(qqueue_t *pThis)
|
|||||||
} else {
|
} else {
|
||||||
dbgoprint((obj_t*) pThis, "DA queue worker shut down.\n");
|
dbgoprint((obj_t*) pThis, "DA queue worker shut down.\n");
|
||||||
}
|
}
|
||||||
|
/* we also instruct the DA worker pool to shutdown ASAP. If we need it for persisting
|
||||||
|
* the queue, it is restarted at a later stage. We don't care here if a timeout happens.
|
||||||
|
*/
|
||||||
|
dbgoprint((obj_t*) pThis, "trying shutdown of regular worker of DA queue\n");
|
||||||
|
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN_IMMEDIATE, &tTimeout);
|
||||||
|
if(iRetLocal == RS_RET_TIMED_OUT) {
|
||||||
|
dbgoprint((obj_t*) pThis, "shutdown timed out on main queue DA worker pool (this is OK)\n");
|
||||||
} else {
|
} else {
|
||||||
END_MTX_PROTECTED_OPERATIONS(pThis->mut);
|
dbgoprint((obj_t*) pThis, "main queue DA worker pool shut down.\n");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
RETiRet;
|
RETiRet;
|
||||||
@ -1335,6 +1343,14 @@ cancelWorkers(qqueue_t *pThis)
|
|||||||
dbgoprint((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker "
|
dbgoprint((obj_t*) pThis, "unexpected iRet state %d trying to cancel DA queue worker "
|
||||||
"threads, continuing, but results are unpredictable\n", iRetLocal);
|
"threads, continuing, but results are unpredictable\n", iRetLocal);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* finally, we cancel the main queue's DA worker pool, if it still is running. It may be
|
||||||
|
* restarted later to persist the queue. But we stop it, because otherwise we get into
|
||||||
|
* big trouble when resetting the logical dequeue pointer. This operation can only be
|
||||||
|
* done when *no* worker is running. So time for a shutdown... -- rgerhards, 2009-05-28
|
||||||
|
*/
|
||||||
|
dbgoprint((obj_t*) pThis, "checking to see if we need to cancel the main queue's DA worker pool\n");
|
||||||
|
iRetLocal = wtpCancelAll(pThis->pWtpDA); /* returns immediately if all threads already have terminated */
|
||||||
}
|
}
|
||||||
|
|
||||||
RETiRet;
|
RETiRet;
|
||||||
@ -1600,23 +1616,15 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
|
|||||||
|
|
||||||
ISOBJ_TYPE_assert(pThis, qqueue);
|
ISOBJ_TYPE_assert(pThis, qqueue);
|
||||||
assert(pBatch != NULL);
|
assert(pBatch != NULL);
|
||||||
// TODO: ULTRA: lock qaueue mutex if instructed to do so
|
|
||||||
|
|
||||||
/* if the queue runs in DA mode, the DA worker already deleted the in-memory representation
|
|
||||||
* of the message. But in regular mode, we need to do it ourselfs. We differentiate between
|
|
||||||
* the two cases, because it is actually the easiest way to handle the destruct-Problem in
|
|
||||||
* a simple and pUsrp-Type agnostic way (else we would need an objAddRef() generic function).
|
|
||||||
*/
|
|
||||||
if(!pThis->bRunsDA) {
|
|
||||||
for(i = 0 ; i < pBatch->nElem ; ++i) {
|
for(i = 0 ; i < pBatch->nElem ; ++i) {
|
||||||
pUsr = pBatch->pElem[i].pUsrp;
|
pUsr = pBatch->pElem[i].pUsrp;
|
||||||
objDestruct(pUsr);
|
objDestruct(pUsr);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
iRet = DeleteBatchFromQStore(pThis, pBatch);
|
iRet = DeleteBatchFromQStore(pThis, pBatch);
|
||||||
|
|
||||||
pBatch->nElem = 0; /* reset batch */
|
pBatch->nElem = pBatch->nElemDeq = 0; /* reset batch */
|
||||||
|
|
||||||
RETiRet;
|
RETiRet;
|
||||||
}
|
}
|
||||||
@ -1908,8 +1916,13 @@ ConsumerDA(qqueue_t *pThis, wti_t *pWti, int iCancelStateSave)
|
|||||||
|
|
||||||
CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave));
|
CHKiRet(DequeueForConsumer(pThis, pWti, iCancelStateSave));
|
||||||
/* iterate over returned results and enqueue them in DA queue */
|
/* iterate over returned results and enqueue them in DA queue */
|
||||||
for(i = 0 ; i < pWti->batch.nElem ; i++)
|
for(i = 0 ; i < pWti->batch.nElem ; i++) {
|
||||||
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, pWti->batch.pElem[i].pUsrp));
|
/* TODO: we must add a generic "addRef" mechanism, because the disk queue enqueue destructs
|
||||||
|
* the message. So far, we simply assume we always have msg_t, what currently is always the case.
|
||||||
|
* rgerhards, 2009-05-28
|
||||||
|
*/
|
||||||
|
CHKiRet(qqueueEnqObj(pThis->pqDA, eFLOWCTL_NO_DELAY, (obj_t*)MsgAddRef((msg_t*)(pWti->batch.pElem[i].pUsrp))));
|
||||||
|
}
|
||||||
|
|
||||||
finalize_it:
|
finalize_it:
|
||||||
dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
|
dbgoprint((obj_t*) pThis, "DAConsumer returns with iRet %d\n", iRet);
|
||||||
@ -2306,16 +2319,12 @@ dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), g
|
|||||||
if(pThis->bRunsDA != 2) {
|
if(pThis->bRunsDA != 2) {
|
||||||
InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */
|
InitDA(pThis, QUEUE_MODE_ENQONLY, LOCK_MUTEX); /* switch to DA mode */
|
||||||
dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
|
dbgprintf("after InitDA, queue log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
|
||||||
//!!! TODO !!!das passiert wohl, wenn die queue empty wird! (aber es vorher noch nciht war)
|
|
||||||
RUNLOG_VAR("%d", pThis->bRunsDA);
|
|
||||||
RUNLOG_VAR("%d", pThis->pWtpDA->wtpState);
|
|
||||||
qqueueWaitDAModeInitialized(pThis); /* make sure DA mode is actually started, else we may have a race! */
|
qqueueWaitDAModeInitialized(pThis); /* make sure DA mode is actually started, else we may have a race! */
|
||||||
}
|
}
|
||||||
/* make sure we do not timeout before we are done */
|
/* make sure we do not timeout before we are done */
|
||||||
dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n");
|
dbgoprint((obj_t*) pThis, "bSaveOnShutdown configured, infinite timeout set\n");
|
||||||
timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
|
timeoutComp(&tTimeout, QUEUE_TIMEOUT_ETERNAL);
|
||||||
/* and run the primary queue's DA worker to drain the queue */
|
/* and run the primary queue's DA worker to drain the queue */
|
||||||
RUNLOG;
|
|
||||||
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
|
iRetLocal = wtpShutdownAll(pThis->pWtpDA, wtpState_SHUTDOWN, &tTimeout);
|
||||||
dbgoprint((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n",
|
dbgoprint((obj_t*) pThis, "end queue persistence run, iRet %d, queue size log %d, phys %d\n",
|
||||||
iRetLocal, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
|
iRetLocal, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
|
||||||
@ -2333,7 +2342,6 @@ BEGINobjDestruct(qqueue) /* be sure to specify the object type also in END and C
|
|||||||
CODESTARTobjDestruct(qqueue)
|
CODESTARTobjDestruct(qqueue)
|
||||||
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
|
pThis->bQueueInDestruction = 1; /* indicate we are in destruction (modifies some behaviour) */
|
||||||
|
|
||||||
RUNLOG_STR("XXX: queue destruct\n");
|
|
||||||
/* shut down all workers
|
/* shut down all workers
|
||||||
* We do not need to shutdown workers when we are in enqueue-only mode or we are a
|
* We do not need to shutdown workers when we are in enqueue-only mode or we are a
|
||||||
* direct queue - because in both cases we have none... ;)
|
* direct queue - because in both cases we have none... ;)
|
||||||
@ -2347,7 +2355,6 @@ RUNLOG_STR("XXX: queue destruct\n");
|
|||||||
* we need to reset the logical dequeue pointer, persist the queue if configured to do
|
* we need to reset the logical dequeue pointer, persist the queue if configured to do
|
||||||
* so and then destruct everything. -- rgerhards, 2009-05-26
|
* so and then destruct everything. -- rgerhards, 2009-05-26
|
||||||
*/
|
*/
|
||||||
//!!!! //CHKiRet(pThis->qUnDeqAll(pThis));
|
|
||||||
dbgprintf("XXX: pre unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
|
dbgprintf("XXX: pre unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
|
||||||
CHKiRet(pThis->qUnDeqAll(pThis));
|
CHKiRet(pThis->qUnDeqAll(pThis));
|
||||||
dbgprintf("XXX: post unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
|
dbgprintf("XXX: post unDeq disk log %d, phys %d\n", getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
|
||||||
|
|||||||
@ -392,7 +392,7 @@ wtiWorker(wti_t *pThis)
|
|||||||
dbgSetThrdName(pThis->pszDbgHdr);
|
dbgSetThrdName(pThis->pszDbgHdr);
|
||||||
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
|
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
|
||||||
|
|
||||||
pThis->batch.nElemDeq = 0; /* re-init dequeue count */
|
// TODO: if we have a problem, enable again! pThis->batch.nElemDeq = 0; /* re-init dequeue count */
|
||||||
BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
|
BEGIN_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
|
||||||
pWtp->pfOnWorkerStartup(pWtp->pUsr);
|
pWtp->pfOnWorkerStartup(pWtp->pUsr);
|
||||||
END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
|
END_MTX_PROTECTED_OPERATIONS_UNCOND(pWtp->pmutUsr);
|
||||||
|
|||||||
@ -294,17 +294,13 @@ wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout
|
|||||||
|
|
||||||
ISOBJ_TYPE_assert(pThis, wtp);
|
ISOBJ_TYPE_assert(pThis, wtp);
|
||||||
|
|
||||||
dbgprintf("XXX:10 wtp %p, state %d\n", pThis, pThis->wtpState);
|
|
||||||
wtpSetState(pThis, tShutdownCmd);
|
wtpSetState(pThis, tShutdownCmd);
|
||||||
dbgprintf("XXX:20 wtp %p, state %d\n", pThis, pThis->wtpState);
|
|
||||||
wtpWakeupAllWrkr(pThis);
|
wtpWakeupAllWrkr(pThis);
|
||||||
dbgprintf("XXX:30 wtp %p, state %d\n", pThis, pThis->wtpState);
|
|
||||||
|
|
||||||
/* see if we need to harvest (join) any terminated threads (even in timeout case,
|
/* see if we need to harvest (join) any terminated threads (even in timeout case,
|
||||||
* some may have terminated...
|
* some may have terminated...
|
||||||
*/
|
*/
|
||||||
wtpProcessThrdChanges(pThis);
|
wtpProcessThrdChanges(pThis);
|
||||||
dbgprintf("XXX:40 wtp %p, state %d\n", pThis, pThis->wtpState);
|
|
||||||
|
|
||||||
/* and wait for their termination */
|
/* and wait for their termination */
|
||||||
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
|
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
|
||||||
@ -312,9 +308,7 @@ dbgprintf("XXX:40 wtp %p, state %d\n", pThis, pThis->wtpState);
|
|||||||
pthread_cleanup_push(mutexCancelCleanup, &pThis->mut);
|
pthread_cleanup_push(mutexCancelCleanup, &pThis->mut);
|
||||||
pthread_setcancelstate(iCancelStateSave, NULL);
|
pthread_setcancelstate(iCancelStateSave, NULL);
|
||||||
bTimedOut = 0;
|
bTimedOut = 0;
|
||||||
dbgprintf("XXX:50 wtp %p, state %d\n", pThis, pThis->wtpState);
|
|
||||||
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
|
while(pThis->iCurNumWrkThrd > 0 && !bTimedOut) {
|
||||||
dbgprintf("XXX:60 wtp %p, state %d\n", pThis, pThis->wtpState);
|
|
||||||
dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n",
|
dbgprintf("%s: waiting %ldms on worker thread termination, %d still running\n",
|
||||||
wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd);
|
wtpGetDbgHdr(pThis), timeoutVal(ptTimeout), pThis->iCurNumWrkThrd);
|
||||||
|
|
||||||
|
|||||||
@ -34,6 +34,7 @@ public class DiagTalker {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
diagSocket = new Socket(host, port);
|
diagSocket = new Socket(host, port);
|
||||||
|
diagSocket.setSoTimeout(0); /* wait for lenghty operations */
|
||||||
out = new PrintWriter(diagSocket.getOutputStream(), true);
|
out = new PrintWriter(diagSocket.getOutputStream(), true);
|
||||||
in = new BufferedReader(new InputStreamReader(
|
in = new BufferedReader(new InputStreamReader(
|
||||||
diagSocket.getInputStream()));
|
diagSocket.getInputStream()));
|
||||||
|
|||||||
@ -27,8 +27,5 @@ source $srcdir/diag.sh injectmsg 2050 50
|
|||||||
|
|
||||||
# clean up and check test result
|
# clean up and check test result
|
||||||
source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
|
source $srcdir/diag.sh shutdown-when-empty # shut down rsyslogd when done processing messages
|
||||||
### currently, we get a stable abort if we use the former kill logic. With shutdown-when-empty, it hangs (but that still tells us there is a bug ;)) ###
|
|
||||||
#kill `cat rsyslog.pid`
|
|
||||||
echo seqchk?
|
|
||||||
source $srcdir/diag.sh seq-check 2099
|
source $srcdir/diag.sh seq-check 2099
|
||||||
source $srcdir/diag.sh exit
|
source $srcdir/diag.sh exit
|
||||||
|
|||||||
@ -19,7 +19,6 @@ $srcdir/diag.sh shutdown-immediate
|
|||||||
$srcdir/diag.sh wait-shutdown
|
$srcdir/diag.sh wait-shutdown
|
||||||
source $srcdir/diag.sh check-mainq-spool
|
source $srcdir/diag.sh check-mainq-spool
|
||||||
|
|
||||||
echo DEBUG EXIT!
|
|
||||||
#exit
|
#exit
|
||||||
|
|
||||||
# restart engine and have rest processed
|
# restart engine and have rest processed
|
||||||
|
|||||||
@ -9,7 +9,7 @@
|
|||||||
#valgrind="valgrind --tool=drd --log-fd=1"
|
#valgrind="valgrind --tool=drd --log-fd=1"
|
||||||
#valgrind="valgrind --tool=helgrind --log-fd=1"
|
#valgrind="valgrind --tool=helgrind --log-fd=1"
|
||||||
#set -o xtrace
|
#set -o xtrace
|
||||||
#export RSYSLOG_DEBUG="debug nostdout noprintmutexaction"
|
#export RSYSLOG_DEBUG="debug nostdout printmutexaction"
|
||||||
#export RSYSLOG_DEBUGLOG="log"
|
#export RSYSLOG_DEBUGLOG="log"
|
||||||
case $1 in
|
case $1 in
|
||||||
'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason
|
'init') $srcdir/killrsyslog.sh # kill rsyslogd if it runs for some reason
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user