mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-17 19:50:41 +01:00
while we solved valgrind false positives during shutdown in 2700badfda76c701fa9169c31c9ec4f7993dc0b8, we continue to see related messages for thread-local-storages. Rather than suppressing these as well we now refactored the code so that it does not use detached threads. This shall improve robustness. In case it does not, we have enhanced debug messages under CI runs, so potentially still-existing problem areas should be easier to diagnose. see also https://github.com/rsyslog/rsyslog/issues/2902
109 lines
4.4 KiB
C
109 lines
4.4 KiB
C
/* Definition of the worker thread pool (wtp) object.
|
|
*
|
|
* Copyright 2008-2018 Adiscon GmbH.
|
|
*
|
|
* This file is part of the rsyslog runtime library.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
* -or-
|
|
* see COPYING.ASL20 in the source distribution
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
#ifndef WTP_H_INCLUDED
|
|
#define WTP_H_INCLUDED
|
|
|
|
#include <pthread.h>
|
|
#include "obj.h"
|
|
#include "atomic.h"
|
|
|
|
/* states for worker threads.
|
|
* important: they need to be increasing with all previous state bits
|
|
* set. That is because we can only atomically or a value!
|
|
*/
|
|
#define WRKTHRD_STOPPED 0
|
|
#define WRKTHRD_INITIALIZING 1
|
|
#define WRKTHRD_RUNNING 3
|
|
#define WRKTHRD_WAIT_JOIN 7
|
|
|
|
|
|
/* possible states of a worker thread pool */
|
|
typedef enum {
|
|
wtpState_RUNNING = 0, /* runs in regular mode */
|
|
wtpState_SHUTDOWN = 1, /* worker threads shall shutdown when idle */
|
|
wtpState_SHUTDOWN_IMMEDIATE = 2 /* worker threads shall shutdown ASAP, even if not idle */
|
|
} wtpState_t;
|
|
|
|
|
|
/* the worker thread pool (wtp) object */
|
|
struct wtp_s {
|
|
BEGINobjInstance;
|
|
wtpState_t wtpState;
|
|
int iNumWorkerThreads;/* number of worker threads to use */
|
|
int iCurNumWrkThrd;/* current number of active worker threads */
|
|
struct wti_s **pWrkr;/* array with control structure for the worker thread(s) associated with this wtp */
|
|
int toWrkShutdown; /* timeout for idle workers in ms, -1 means indefinite (0 is immediate) */
|
|
rsRetVal (*pConsumer)(void *); /* user-supplied consumer function for dewtpd messages */
|
|
/* synchronization variables */
|
|
pthread_mutex_t mutWtp; /* mutex for the wtp's thread management */
|
|
pthread_cond_t condThrdInitDone; /* signalled when a new thread is ready for work */
|
|
pthread_cond_t condThrdTrm;/* signalled when threads terminate */
|
|
/* end sync variables */
|
|
/* user objects */
|
|
void *pUsr; /* pointer to user object (in this case, the queue the wtp belongs to) */
|
|
pthread_attr_t attrThrd;/* attribute for new threads (created just once and cached here) */
|
|
pthread_mutex_t *pmutUsr;
|
|
rsRetVal (*pfChkStopWrkr)(void *pUsr, int);
|
|
rsRetVal (*pfGetDeqBatchSize)(void *pUsr, int*); /* obtains max dequeue count from queue config */
|
|
rsRetVal (*pfObjProcessed)(void *pUsr, wti_t *pWti); /* indicate user object is processed */
|
|
rsRetVal (*pfRateLimiter)(void *pUsr);
|
|
rsRetVal (*pfDoWork)(void *pUsr, void *pWti);
|
|
/* end user objects */
|
|
uchar *pszDbgHdr; /* header string for debug messages */
|
|
DEF_ATOMIC_HELPER_MUT(mutCurNumWrkThrd)
|
|
DEF_ATOMIC_HELPER_MUT(mutWtpState)
|
|
};
|
|
|
|
/* some symbolic constants for easier reference */
|
|
|
|
|
|
#define DENY_WORKER_START_DURING_SHUTDOWN 0
|
|
#define PERMIT_WORKER_START_DURING_SHUTDOWN 1
|
|
|
|
/* prototypes */
|
|
rsRetVal wtpConstruct(wtp_t **ppThis);
|
|
rsRetVal wtpConstructFinalize(wtp_t *pThis);
|
|
rsRetVal wtpDestruct(wtp_t **ppThis);
|
|
rsRetVal wtpAdviseMaxWorkers(wtp_t *pThis, int nMaxWrkr, const int permit_during_shutdown);
|
|
rsRetVal wtpProcessThrdChanges(wtp_t *pThis);
|
|
rsRetVal wtpChkStopWrkr(wtp_t *pThis, int bLockUsrMutex);
|
|
rsRetVal wtpSetState(wtp_t *pThis, wtpState_t iNewState);
|
|
rsRetVal wtpWakeupAllWrkr(wtp_t *pThis);
|
|
rsRetVal wtpCancelAll(wtp_t *pThis, const uchar *const cancelobj);
|
|
rsRetVal wtpSetDbgHdr(wtp_t *pThis, uchar *pszMsg, size_t lenMsg);
|
|
rsRetVal wtpShutdownAll(wtp_t *pThis, wtpState_t tShutdownCmd, struct timespec *ptTimeout);
|
|
PROTOTYPEObjClassInit(wtp);
|
|
PROTOTYPEObjClassExit(wtp);
|
|
PROTOTYPEpropSetMethFP(wtp, pfChkStopWrkr, rsRetVal(*pVal)(void*, int));
|
|
PROTOTYPEpropSetMethFP(wtp, pfRateLimiter, rsRetVal(*pVal)(void*));
|
|
PROTOTYPEpropSetMethFP(wtp, pfGetDeqBatchSize, rsRetVal(*pVal)(void*, int*));
|
|
PROTOTYPEpropSetMethFP(wtp, pfDoWork, rsRetVal(*pVal)(void*, void*));
|
|
PROTOTYPEpropSetMethFP(wtp, pfObjProcessed, rsRetVal(*pVal)(void*, wti_t*));
|
|
PROTOTYPEpropSetMeth(wtp, toWrkShutdown, long);
|
|
PROTOTYPEpropSetMeth(wtp, wtpState, wtpState_t);
|
|
PROTOTYPEpropSetMeth(wtp, iMaxWorkerThreads, int);
|
|
PROTOTYPEpropSetMeth(wtp, pUsr, void*);
|
|
PROTOTYPEpropSetMeth(wtp, iNumWorkerThreads, int);
|
|
PROTOTYPEpropSetMethPTR(wtp, pmutUsr, pthread_mutex_t);
|
|
|
|
#endif /* #ifndef WTP_H_INCLUDED */
|