mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-21 10:10:42 +01:00
continued implementing wti class
This commit is contained in:
parent
f553ede5d9
commit
c876b04da2
246
wti.c
246
wti.c
@ -50,7 +50,6 @@
|
||||
DEFobjStaticHelpers
|
||||
|
||||
/* forward-definitions */
|
||||
static void *wtiWorker(void *arg);
|
||||
|
||||
/* methods */
|
||||
|
||||
@ -72,13 +71,21 @@ wtiGetDbgHdr(wti_t *pThis)
|
||||
/* get the current worker state. For simplicity and speed, we have
|
||||
* NOT used our regular calling interface this time. I hope that won't
|
||||
* bite in the long term... -- rgerhards, 2008-01-17
|
||||
* TODO: may be performance optimized by atomic operations
|
||||
*/
|
||||
static inline qWrkCmd_t
|
||||
wtiGetState(wti_t *pThis)
|
||||
wtiGetState(wti_t *pThis, int bLockMutex)
|
||||
{
|
||||
DEFVARS_mutexProtection;
|
||||
qWrkCmd_t tCmd;
|
||||
|
||||
ISOBJ_TYPE_assert(pThis, wti);
|
||||
// TODO: lock mutex?
|
||||
return pThis->tCurrCmd;
|
||||
|
||||
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
|
||||
tCmd = pThis->tCurrCmd;
|
||||
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
|
||||
|
||||
return tCmd;
|
||||
}
|
||||
|
||||
|
||||
@ -88,11 +95,10 @@ wtiGetState(wti_t *pThis)
|
||||
* in an active state. -- rgerhards, 2008-01-20
|
||||
*/
|
||||
rsRetVal
|
||||
wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, bActiveOnly)
|
||||
wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, int bActiveOnly)
|
||||
{
|
||||
DEFiRet;
|
||||
DEFVARS_mutex_cancelsafeLock;
|
||||
int iState;
|
||||
|
||||
ISOBJ_TYPE_assert(pThis, wti);
|
||||
assert(tCmd <= eWRKTHRD_SHUTDOWN_IMMEDIATE);
|
||||
@ -108,9 +114,6 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, bActiveOnly)
|
||||
dbgprintf("%s: receiving command %d\n", wtiGetDbgHdr(pThis), tCmd);
|
||||
switch(tCmd) {
|
||||
case eWRKTHRD_RUN_CREATED:
|
||||
assert(pThis->tCurrCmd < eWRKTHRD_RUN_CREATED);
|
||||
iState = pthread_create(&(pThis->thrdID), NULL, wtiWorker, (void*) pThis);
|
||||
dbgprintf("wti: Worker thread %s, started with state %d.\n", wtiGetDbgHdr(pThis), iState);
|
||||
break;
|
||||
case eWRKTHRD_TERMINATING:
|
||||
/* TODO: re-enable meaningful debug msg! (via function callback?)
|
||||
@ -135,7 +138,7 @@ wtiSetState(wti_t *pThis, qWrkCmd_t tCmd, bActiveOnly)
|
||||
}
|
||||
|
||||
mutex_cancelsafe_unlock(&pThis->mut);
|
||||
return iRet;
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
|
||||
@ -167,7 +170,7 @@ rsRetVal wtiDestruct(wti_t **ppThis)
|
||||
/* back to normal */
|
||||
pthread_setcancelstate(iCancelStateSave, NULL);
|
||||
|
||||
return iRet;
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
|
||||
@ -197,28 +200,6 @@ wtiConstructFinalize(wti_t *pThis)
|
||||
}
|
||||
|
||||
|
||||
/* Waits until the specified worker thread
|
||||
* changed to full running state (aka has started up).
|
||||
* rgerhards, 2008-01-17
|
||||
*/
|
||||
static inline rsRetVal
|
||||
wtiWaitStartup(wti_t *pThis)
|
||||
{
|
||||
DEFVARS_mutex_cancelsafeLock;
|
||||
ISOBJ_TYPE_assert(pThis, wti);
|
||||
|
||||
mutex_cancelsafe_lock(&pThis->mut);
|
||||
if((pThis->tCurrCmd == eWRKTHRD_RUN_CREATED) || (pThis->tCurrCmd == eWRKTHRD_RUN_CREATED)) {
|
||||
dbgprintf("wti: waiting on worker thread %s startup\n", wtiGetDbgHdr(pThis));
|
||||
pthread_cond_wait(&pThis->condInitDone, &pThis->mut);
|
||||
dbgprintf("worker startup done!\n");
|
||||
}
|
||||
mutex_cancelsafe_unlock(&pThis->mut);
|
||||
|
||||
return RS_RET_OK;
|
||||
}
|
||||
|
||||
|
||||
/* join a specific worker thread
|
||||
* we do not lock the mutex, because join will sync anyways...
|
||||
*/
|
||||
@ -230,42 +211,199 @@ wtiJoinThrd(wti_t *pThis)
|
||||
ISOBJ_TYPE_assert(pThis, wti);
|
||||
dbgprintf("wti: waiting for worker %s termination, current state %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd);
|
||||
pthread_join(pThis->thrdID, NULL);
|
||||
wtiSetState(pThis, eWRKTHRD_STOPPED); /* back to virgin... */
|
||||
wtiSetState(pThis, eWRKTHRD_STOPPED, 0); /* back to virgin... */
|
||||
pThis->thrdID = 0; /* invalidate the thread ID so that we do not accidently find reused ones */
|
||||
dbgprintf("wti: worker %s has stopped\n", wtiGetDbgHdr(pThis));
|
||||
|
||||
return iRet;
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
|
||||
static void *
|
||||
wtiWorker(void *arg)
|
||||
{
|
||||
wti_t *pThis = (wti_t*) arg;
|
||||
|
||||
ISOBJ_TYPE_assert(pThis, wti);
|
||||
|
||||
// TODO: add logic!
|
||||
//
|
||||
pthread_exit(0);
|
||||
}
|
||||
|
||||
/* Starts a worker thread (on a specific index [i]!)
|
||||
/* check if we had a worker thread changes and, if so, act
|
||||
* on it. At a minimum, terminated threads are harvested (joined).
|
||||
*/
|
||||
rsRetVal
|
||||
wtiStart(wti_t *pThis)
|
||||
wtiProcessThrdChanges(wti_t *pThis, int bLockMutex)
|
||||
{
|
||||
DEFiRet;
|
||||
DEFVARS_mutexProtection;
|
||||
|
||||
ISOBJ_TYPE_assert(pThis, wti);
|
||||
wtiSetState(pThis, eWRKTHRD_RUN_CREATED);
|
||||
|
||||
return iRet;
|
||||
BEGIN_MTX_PROTECTED_OPERATIONS(&pThis->mut, bLockMutex);
|
||||
switch(pThis->tCurrCmd) {
|
||||
case eWRKTHRD_TERMINATING:
|
||||
iRet = wtiJoinThrd(pThis);
|
||||
break;
|
||||
/* these cases just to satisfy the compiler, we do not act an them: */
|
||||
case eWRKTHRD_STOPPED:
|
||||
case eWRKTHRD_RUN_CREATED:
|
||||
case eWRKTHRD_RUN_INIT:
|
||||
case eWRKTHRD_RUNNING:
|
||||
case eWRKTHRD_SHUTDOWN:
|
||||
case eWRKTHRD_SHUTDOWN_IMMEDIATE:
|
||||
/* DO NOTHING */
|
||||
break;
|
||||
}
|
||||
END_MTX_PROTECTED_OPERATIONS(&pThis->mut);
|
||||
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
|
||||
/* cancellation cleanup handler for queueWorker ()
|
||||
* Updates admin structure and frees ressources.
|
||||
* rgerhards, 2008-01-16
|
||||
*/
|
||||
static void
|
||||
wtiWorkerCancelCleanup(void *arg)
|
||||
{
|
||||
wti_t *pThis = (wti_t*) arg;
|
||||
wtp_t *pWtp;
|
||||
int iCancelStateSave;
|
||||
|
||||
ISOBJ_TYPE_assert(pThis, wti);
|
||||
pWtp = pThis->pWtp;
|
||||
ISOBJ_TYPE_assert(pWtp, wtp);
|
||||
|
||||
dbgprintf("%s: cancelation cleanup handler called.\n", wtiGetDbgHdr(pThis));
|
||||
|
||||
/* call user supplied handler (that one e.g. requeues the element) */
|
||||
pWtp->pfOnWorkerCancel(pThis->pWtp->pUsr);
|
||||
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
|
||||
d_pthread_mutex_lock(&pWtp->mut);
|
||||
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0);
|
||||
// TODO: sync access!
|
||||
pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
|
||||
|
||||
pthread_cond_signal(&pWtp->condThrdTrm); /* activate anyone waiting on thread shutdown */
|
||||
d_pthread_mutex_unlock(&pWtp->mut);
|
||||
pthread_setcancelstate(iCancelStateSave, NULL);
|
||||
}
|
||||
|
||||
|
||||
/* generic worker thread framework
|
||||
*
|
||||
* Some special comments below, so that they do not clutter the main function code:
|
||||
*
|
||||
* On the use of pthread_testcancel():
|
||||
* Now make sure we can get canceled - it is not specified if pthread_setcancelstate() is
|
||||
* a cancellation point in itself. As we run most of the time without cancel enabled, I fear
|
||||
* we may never get cancelled if we do not create a cancellation point ourselfs.
|
||||
*
|
||||
* On the use of pthread_yield():
|
||||
* We yield to give the other threads a chance to obtain the mutex. If we do not
|
||||
* do that, this thread may very well aquire the mutex again before another thread
|
||||
* has even a chance to run. The reason is that mutex operations are free to be
|
||||
* implemented in the quickest possible way (and they typically are!). That is, the
|
||||
* mutex lock/unlock most probably just does an atomic memory swap and does not necessarily
|
||||
* schedule other threads waiting on the same mutex. That can lead to the same thread
|
||||
* aquiring the mutex ever and ever again while all others are starving for it. We
|
||||
* have exactly seen this behaviour when we deliberately introduced a long-running
|
||||
* test action which basically did a sleep. I understand that with real actions the
|
||||
* likelihood of this starvation condition is very low - but it could still happen
|
||||
* and would be very hard to debug. The yield() is a sure fix, its performance overhead
|
||||
* should be well accepted given the above facts. -- rgerhards, 2008-01-10
|
||||
*/
|
||||
rsRetVal
|
||||
wtiWorker(wti_t *pThis)
|
||||
{
|
||||
DEFiRet;
|
||||
DEFVARS_mutexProtection;
|
||||
struct timespec t;
|
||||
wtp_t *pWtp; /* our worker thread pool */
|
||||
int bInactivityTOOccured = 0;
|
||||
|
||||
ISOBJ_TYPE_assert(pThis, wti);
|
||||
pWtp = pThis->pWtp; /* shortcut */
|
||||
ISOBJ_TYPE_assert(pWtp, wtp);
|
||||
|
||||
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
|
||||
|
||||
/* now we have our identity, on to real processing */
|
||||
while(1) { /* loop will be broken below - need to do mutex locks */
|
||||
dbgprintf("%s: start worker run, queue cmd currently %d\n", wtiGetDbgHdr(pThis), pThis->tCurrCmd);
|
||||
/* process any pending thread requests */
|
||||
wtpProcessThrdChanges(pWtp);
|
||||
pthread_testcancel(); /* see big comment in function header */
|
||||
pthread_yield(); /* see big comment in function header */
|
||||
|
||||
wtpSetInactivityGuard(pThis->pWtp, 0, LOCK_MUTEX); /* must be set before usr mutex is locked! */
|
||||
BEGIN_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr, LOCK_MUTEX);
|
||||
|
||||
if( (bInactivityTOOccured && pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED))
|
||||
|| wtpChkStopWrkr(pWtp, LOCK_MUTEX, MUTEX_ALREADY_LOCKED)) {
|
||||
END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
|
||||
break; /* end worker thread run */
|
||||
}
|
||||
bInactivityTOOccured = 0; /* reset for next run */
|
||||
|
||||
/* if we reach this point, we are still protected by the mutex */
|
||||
|
||||
if(pWtp->pfIsIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED)) {
|
||||
dbgprintf("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
|
||||
pWtp->pfOnIdle(pWtp->pUsr, MUTEX_ALREADY_LOCKED);
|
||||
|
||||
dbgprintf("%s: pre condwait ->notEmpty, worker shutdown %d\n",
|
||||
wtiGetDbgHdr(pThis), pThis->pWtp->toWrkShutdown); // DEL
|
||||
if(pWtp->toWrkShutdown == -1) {
|
||||
dbgprintf("worker never times out!\n"); // DEL
|
||||
/* never shut down any started worker */
|
||||
pthread_cond_wait(pWtp->pcondBusy, pWtp->pmutUsr);
|
||||
} else {
|
||||
timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
|
||||
if(pthread_cond_timedwait(pWtp->pcondBusy, pWtp->pmutUsr, &t) != 0) {
|
||||
dbgprintf("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
|
||||
bInactivityTOOccured = 1; /* indicate we had a timeout */
|
||||
}
|
||||
}
|
||||
dbgprintf("%s: post condwait ->notEmpty\n", wtiGetDbgHdr(pThis)); // DEL
|
||||
END_MTX_PROTECTED_OPERATIONS(pWtp->pmutUsr);
|
||||
continue; /* request next iteration */
|
||||
}
|
||||
|
||||
/* if we reach this point, we have a non-empty queue (and are still protected by mutex) */
|
||||
pWtp->pfDoWork(pThis, iCancelStateSave);
|
||||
|
||||
/* TODO: move this above into one of the chck Term functions */
|
||||
//if(Debug && (qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN) && pThis->iQueueSize > 0)
|
||||
// dbgprintf("%s: worker does not yet terminate because it still has "
|
||||
// " %d messages to process.\n", wtiGetDbgHdr(pThis), pThis->iQueueSize);
|
||||
}
|
||||
|
||||
/* indicate termination */
|
||||
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
|
||||
dbgprintf("%s: worker waiting for mutex\n", wtiGetDbgHdr(pThis));
|
||||
d_pthread_mutex_lock(&pThis->mut);
|
||||
pthread_cleanup_pop(0); /* remove cleanup handler */
|
||||
|
||||
// TODO: I think we no longer need that - but check!
|
||||
#if 0
|
||||
/* if we ever need finalize_it, here would be the place for it! */
|
||||
if(qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN ||
|
||||
qWrkrGetState(pWrkrInst) == eWRKTHRD_SHUTDOWN_IMMEDIATE ||
|
||||
qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_INIT ||
|
||||
qWrkrGetState(pWrkrInst) == eWRKTHRD_RUN_CREATED) {
|
||||
/* in shutdown case, we need to flag termination. All other commands
|
||||
* have a meaning to the thread harvester, so we can not overwrite them
|
||||
*/
|
||||
dbgprintf("%s: setting termination state\n", wtiGetDbgHdr(pThis));
|
||||
wtiSetState(pWrkrInst, eWRKTHRD_TERMINATING, 0);
|
||||
}
|
||||
#else
|
||||
wtiSetState(pThis, eWRKTHRD_TERMINATING, 0);
|
||||
#endif
|
||||
// TODO: call, mutex:
|
||||
pWtp->bThrdStateChanged = 1; /* indicate change, so harverster will be called */
|
||||
pthread_cond_signal(&pWtp->condThrdTrm); /* activate anyone waiting on thread shutdown */
|
||||
d_pthread_mutex_unlock(&pThis->mut);
|
||||
pthread_setcancelstate(iCancelStateSave, NULL);
|
||||
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
|
||||
/* some simple object access methods */
|
||||
DEFpropSetMeth(wti, toShutdown, int);
|
||||
|
||||
/* set the debug header message
|
||||
* The passed-in string is duplicated. So if the caller does not need
|
||||
@ -294,7 +432,7 @@ wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, size_t lenMsg)
|
||||
memcpy(pThis->pszDbgHdr, pszMsg, lenMsg + 1); /* always think about the \0! */
|
||||
|
||||
finalize_it:
|
||||
return iRet;
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
|
||||
|
||||
3
wti.h
3
wti.h
@ -34,7 +34,6 @@ typedef struct wti_s {
|
||||
qWrkCmd_t tCurrCmd; /* current command to be carried out by worker */
|
||||
obj_t *pUsr; /* current user object being processed (or NULL if none) */
|
||||
wtp_t *pWtp; /* my worker thread pool (important if only the work thread instance is passed! */
|
||||
int toShutdown; /* shutdown timeout, used when idle */
|
||||
pthread_cond_t condInitDone; /* signaled when the thread startup is done (once per thread existance) */
|
||||
pthread_mutex_t mut;
|
||||
uchar *pszDbgHdr; /* header string for debug messages */
|
||||
@ -47,9 +46,9 @@ typedef struct wti_s {
|
||||
rsRetVal wtiConstruct(wti_t **ppThis);
|
||||
rsRetVal wtiConstructFinalize(wti_t *pThis);
|
||||
rsRetVal wtiDestruct(wti_t **ppThis);
|
||||
rsRetVal wtiWorker(wti_t *pThis);
|
||||
PROTOTYPEObjClassInit(wti);
|
||||
PROTOTYPEpropSetMeth(wti, pszDbgHdr, uchar*);
|
||||
PROTOTYPEpropSetMeth(wti, toShutdown, int);
|
||||
#define wtiGetID(pThis) ((unsigned long) pThis)
|
||||
|
||||
#endif /* #ifndef WTI_H_INCLUDED */
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user