moved thread termination code out to threads.c

This commit is contained in:
Rainer Gerhards 2007-12-17 09:06:23 +00:00
parent 0a8e339bfe
commit 18dbfe70d8
7 changed files with 62 additions and 31 deletions

View File

@ -459,7 +459,7 @@ static rsRetVal modExit(void)\
* if there is a module-internal need to do so.
*/
#define BEGINrunInput \
static rsRetVal runInput(void)\
static rsRetVal runInput(thrdInfo_t *pThrd)\
{\
DEFiRet;

View File

@ -78,7 +78,7 @@ typedef struct moduleInfo {
union {
struct {/* data for input modules */
eTermSyncType_t eTermSyncType;
rsRetVal (*runInput)(void); /* function to gather input and submit to queue */
rsRetVal (*runInput)(thrdInfo_t*); /* function to gather input and submit to queue */
} im;
struct {/* data for output modules */
/* below: perform the configured action

View File

@ -67,26 +67,22 @@ typedef struct _instanceData {
*/
BEGINrunInput
CODESTARTrunInput
struct timeval tvSelectTimeout;
sigset_t sigSet;
sigfillset(&sigSet);
pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
sigemptyset(&sigSet);
sigaddset(&sigSet, SIGUSR2);
pthread_sigmask(SIG_UNBLOCK, &sigSet, NULL);
while(!bFinished) {
dbgprintf("immark pre select\n");
tvSelectTimeout.tv_sec = 5;
tvSelectTimeout.tv_usec = 0;
select(0, NULL, NULL, NULL, &tvSelectTimeout);
if(bFinished)
break;
dbgprintf("immark post select, doing mark, bFinished: %d\n", bFinished);
/* this is an endless loop - it is terminated when the thread is
* signalled to do so. This, however, is handled by the framework,
* right into the sleep below.
*/
while(1) {
/* we do not need to handle the RS_RET_TERMINATE_NOW case any
* special because we just need to terminate. This may be different
* if a cleanup is needed. But for now, we can just use CHKiRet().
* rgerhards, 2007-12-17
*/
CHKiRet(thrdSleep(pThrd, 5, 0)); /* seconds, micro seconds */
logmsgInternal(LOG_INFO, "-- MARK --", ADDDATE);
//logmsgInternal(LOG_INFO, "-- MARK --", ADDDATE|MARK);
}
dbgprintf("immark: finished!\n");
return RS_RET_OK;
finalize_it:
return iRet;
ENDrunInput

View File

@ -93,6 +93,7 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
RS_RET_ADDRESS_UNKNOWN = -2020, /**< an address is unknown - not necessarily an error */
RS_RET_MALICIOUS_ENTITY = -2021, /**< there is an malicious entity involved */
RS_RET_OK_DELETE_LISTENTRY = 1, /**< operation successful, but callee requested the deletion of an entry (special state) */
RS_RET_TERMINATE_NOW = 2, /**< operation successful, function is requested to terminate (mostly used with threads) */
RS_RET_OK = 0 /**< operation successful */
};
typedef enum rsRetVal_ rsRetVal; /**< friendly type for global return value */

View File

@ -6418,12 +6418,6 @@ int main(int argc, char **argv)
#endif
/* do any de-init's that need to be done AFTER this comment */
#if IMMARK
dbgprintf("waiting to join thrdMain\n");
pthread_kill(thrdMain, SIGUSR2);
pthread_join(thrdMain, NULL);
dbgprintf("joined thrdMain\n");
#endif
dbgprintf("reaching die\n");
die(bFinished);

View File

@ -91,7 +91,8 @@ rsRetVal thrdTerminate(thrdInfo_t *pThis)
dbgprintf("Terminate thread %lx via method %d\n", pThis->thrdID, pThis->eTermTool);
if(pThis->eTermTool == eTermSync_SIGNAL) {
pthread_kill(pThis->thrdID, SIGUSR2);
pThis->bShallStop = 1; /* request termination */
pthread_kill(pThis->thrdID, SIGUSR2); /* get thread out ouf blocking calls */
pthread_join(pThis->thrdID, NULL);
/* TODO: TIMEOUT! */
} else if(pThis->eTermTool == eTermSync_NONE) {
@ -127,8 +128,22 @@ static void* thrdStarter(void *arg)
assert(pThis != NULL);
assert(pThis->pUsrThrdMain != NULL);
iRet = pThis->pUsrThrdMain();
dbgprintf("thrdStarter: usrThrdMain 0x%lx returned with iRet %d.\n", (unsigned long) pThis->thrdID, iRet);
/* block all signals except the one we need for graceful termination */
sigset_t sigSet;
sigfillset(&sigSet);
pthread_sigmask(SIG_BLOCK, &sigSet, NULL);
sigemptyset(&sigSet);
sigaddset(&sigSet, SIGUSR2);
pthread_sigmask(SIG_UNBLOCK, &sigSet, NULL);
/* setup complete, we are now ready to execute the user code. We will not
* regain control until the user code is finished, in which case we terminate
* the thread.
*/
iRet = pThis->pUsrThrdMain(pThis);
dbgprintf("thrdStarter: usrThrdMain 0x%lx returned with iRet %d, exiting now.\n", (unsigned long) pThis->thrdID, iRet);
pthread_exit(0);
}
@ -136,7 +151,7 @@ static void* thrdStarter(void *arg)
* executing threads. It is added at the end of the list.
* rgerhards, 2007-12-14
*/
rsRetVal thrdCreate(rsRetVal (*thrdMain)(void), eTermSyncType_t eTermSyncType)
rsRetVal thrdCreate(rsRetVal (*thrdMain)(thrdInfo_t*), eTermSyncType_t eTermSyncType)
{
DEFiRet;
thrdInfo_t *pThis;
@ -198,6 +213,29 @@ rsRetVal thrdExit(void)
}
/* thrdSleep() - a fairly portable way to put a thread to sleep. It
* will wake up when
* a) the wake-time is over
* b) the thread shall be terminated
* Returns RS_RET_OK if all went well, RS_RET_TERMINATE_NOW if the calling
* thread shall be terminated and any other state if an error happened.
* rgerhards, 2007-12-17
*/
rsRetVal
thrdSleep(thrdInfo_t *pThis, int iSeconds, int iuSeconds)
{
DEFiRet;
struct timeval tvSelectTimeout;
assert(pThis != NULL);
tvSelectTimeout.tv_sec = iSeconds;
tvSelectTimeout.tv_usec = iuSeconds; /* micro seconds */
select(0, NULL, NULL, NULL, &tvSelectTimeout);
if(pThis->bShallStop)
iRet = RS_RET_TERMINATE_NOW;
return iRet;
}
/* queue functions (may be migrated to some other file...)
*/

View File

@ -34,7 +34,8 @@ typedef enum eTermSyncType {
typedef struct thrdInfo {
eTermSyncType_t eTermTool;
int bIsActive; /* Is thread running? */
rsRetVal (*pUsrThrdMain)(void); /* user thread main to be called in new thread */
int bShallStop; /* set to 1 if the thread should be stopped ? */
rsRetVal (*pUsrThrdMain)(struct thrdInfo*); /* user thread main to be called in new thread */
pthread_t thrdID;
} thrdInfo_t;
@ -54,7 +55,8 @@ rsRetVal thrdExit(void);
rsRetVal thrdInit(void);
rsRetVal thrdTerminate(thrdInfo_t *pThis);
rsRetVal thrdTerminateAll(void);
rsRetVal thrdCreate(rsRetVal (*thrdMain)(void), eTermSyncType_t eTermSyncType);
rsRetVal thrdCreate(rsRetVal (*thrdMain)(thrdInfo_t*), eTermSyncType_t eTermSyncType);
rsRetVal thrdSleep(thrdInfo_t *pThis, int iSeconds, int iuSeconds);
msgQueue *queueInit (void);
void queueDelete (msgQueue *q);
void queueAdd (msgQueue *q, void* in);