MIZUTA Takeshi 0b830970c2 Don't refer to errno when the pthread library fails
When the pthread library fails, errno is referenced even though errno is not set.
Fix to refer to the return code of the pthread library instead of errno.
2022-03-03 10:34:54 +09:00

574 lines
18 KiB
C

/* wti.c
*
* This file implements the worker thread instance (wti) class.
*
* File begun on 2008-01-20 by RGerhards based on functions from the
* previous queue object class (the wti functions have been extracted)
*
* There is some in-depth documentation available in doc/dev_queue.html
* (and in the web doc set on https://www.rsyslog.com/doc/). Be sure to read it
* if you are getting aquainted to the object.
*
* Copyright 2008-2019 Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* -or-
* see COPYING.ASL20 in the source distribution
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <signal.h>
#include <pthread.h>
#include <errno.h>
#include "rsyslog.h"
#include "stringbuf.h"
#include "srUtils.h"
#include "errmsg.h"
#include "wtp.h"
#include "wti.h"
#include "obj.h"
#include "glbl.h"
#include "action.h"
#include "atomic.h"
#include "rsconf.h"
/* static data */
DEFobjStaticHelpers
DEFobjCurrIf(glbl)
pthread_key_t thrd_wti_key;
/* forward-definitions */
/* methods */
/* get the header for debug messages
* The caller must NOT free or otherwise modify the returned string!
*/
uchar * ATTR_NONNULL()
wtiGetDbgHdr(const wti_t *const pThis)
{
ISOBJ_TYPE_assert(pThis, wti);
if(pThis->pszDbgHdr == NULL)
return (uchar*) "wti"; /* should not normally happen */
else
return pThis->pszDbgHdr;
}
/* return the current worker processing state. For the sake of
* simplicity, we do not use the iRet interface. -- rgerhards, 2009-07-17
*/
int ATTR_NONNULL()
wtiGetState(wti_t *pThis)
{
return ATOMIC_FETCH_32BIT(&pThis->bIsRunning, &pThis->mutIsRunning);
}
/* join terminated worker thread
* This may be called in any thread state, it will be a NOP if the
* thread is not to join.
*/
void ATTR_NONNULL()
wtiJoinThrd(wti_t *const pThis)
{
int r;
ISOBJ_TYPE_assert(pThis, wti);
if(wtiGetState(pThis) == WRKTHRD_WAIT_JOIN) {
DBGPRINTF("%s: joining terminated worker\n", wtiGetDbgHdr(pThis));
if((r = pthread_join(pThis->thrdID, NULL)) != 0) {
LogMsg(r, RS_RET_INTERNAL_ERROR, LOG_WARNING,
"rsyslog bug? wti cannot join terminated wrkr");
}
DBGPRINTF("%s: worker fully terminated\n", wtiGetDbgHdr(pThis));
wtiSetState(pThis, WRKTHRD_STOPPED);
if(dbgTimeoutToStderr) {
fprintf(stderr, "rsyslog debug: %s: thread joined\n",
wtiGetDbgHdr(pThis));
}
}
}
/* Set this thread to "always running" state (can not be unset)
* rgerhards, 2009-07-20
*/
rsRetVal ATTR_NONNULL()
wtiSetAlwaysRunning(wti_t *pThis)
{
ISOBJ_TYPE_assert(pThis, wti);
pThis->bAlwaysRunning = RSTRUE;
return RS_RET_OK;
}
/* Set status (thread is running or not), actually an property of
* use for wtp, but we need to have it per thread instance (thus it
* is inside wti). -- rgerhards, 2009-07-17
*/
rsRetVal ATTR_NONNULL()
wtiSetState(wti_t *pThis, const int newVal)
{
ISOBJ_TYPE_assert(pThis, wti);
if(newVal == WRKTHRD_STOPPED) {
ATOMIC_STORE_0_TO_INT(&pThis->bIsRunning, &pThis->mutIsRunning);
} else {
ATOMIC_OR_INT_TO_INT(&pThis->bIsRunning, &pThis->mutIsRunning, newVal);
}
return RS_RET_OK;
}
/* advise all workers to start by interrupting them. That should unblock all srSleep()
* calls.
*/
rsRetVal
wtiWakeupThrd(wti_t *pThis)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, wti);
if(wtiGetState(pThis)) {
/* we first try the cooperative "cancel" interface */
pthread_kill(pThis->thrdID, SIGTTIN);
DBGPRINTF("sent SIGTTIN to worker thread %p\n", (void*) pThis->thrdID);
}
RETiRet;
}
/* Cancel the thread. If the thread is not running. But it is save and legal to
* call wtiCancelThrd() in such situations. This function only returns when the
* thread has terminated. Else we may get race conditions all over the code...
* Note that when waiting for the thread to terminate, we do a busy wait, checking
* progress every 10ms. It is very unlikely that we will ever cancel a thread
* and, if so, it will only happen at the end of the rsyslog run. So doing this
* kind of non-optimal wait is considered preferable over using condition variables.
* rgerhards, 2008-02-26
*/
rsRetVal ATTR_NONNULL()
wtiCancelThrd(wti_t *pThis, const uchar *const cancelobj)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, wti);
wtiJoinThrd(pThis);
if(wtiGetState(pThis) != WRKTHRD_STOPPED) {
LogMsg(0, RS_RET_ERR, LOG_WARNING, "%s: need to do cooperative cancellation "
"- some data may be lost, increase timeout?", cancelobj);
/* we first try the cooperative "cancel" interface */
pthread_kill(pThis->thrdID, SIGTTIN);
DBGPRINTF("sent SIGTTIN to worker thread %p, giving it a chance to terminate\n",
(void *) pThis->thrdID);
srSleep(0, 50000);
wtiJoinThrd(pThis);
}
if(wtiGetState(pThis) != WRKTHRD_STOPPED) {
LogMsg(0, RS_RET_ERR, LOG_WARNING, "%s: need to do hard cancellation", cancelobj);
if(dbgTimeoutToStderr) {
fprintf(stderr, "rsyslog debug: %s: need to do hard cancellation\n",
cancelobj);
}
pthread_cancel(pThis->thrdID);
pthread_kill(pThis->thrdID, SIGTTIN);
DBGPRINTF("cooperative worker termination failed, using cancellation...\n");
DBGOPRINT((obj_t*) pThis, "canceling worker thread\n");
pthread_cancel(pThis->thrdID);
/* now wait until the thread terminates... */
while(wtiGetState(pThis) != WRKTHRD_STOPPED && wtiGetState(pThis) != WRKTHRD_WAIT_JOIN) {
DBGOPRINT((obj_t*) pThis, "waiting on termination, state %d\n", wtiGetState(pThis));
srSleep(0, 10000);
}
}
wtiJoinThrd(pThis);
RETiRet;
}
/* note: this function is only called once in action.c */
rsRetVal
wtiNewIParam(wti_t *const pWti, action_t *const pAction, actWrkrIParams_t **piparams)
{
actWrkrInfo_t *const wrkrInfo = &(pWti->actWrkrInfo[pAction->iActionNbr]);
actWrkrIParams_t *iparams;
int newMax;
DEFiRet;
if(wrkrInfo->p.tx.currIParam == wrkrInfo->p.tx.maxIParams) {
/* we need to extend */
newMax = (wrkrInfo->p.tx.maxIParams == 0) ? CONF_IPARAMS_BUFSIZE
: 2 * wrkrInfo->p.tx.maxIParams;
CHKmalloc(iparams = realloc(wrkrInfo->p.tx.iparams,
sizeof(actWrkrIParams_t) * pAction->iNumTpls * newMax));
memset(iparams + (wrkrInfo->p.tx.currIParam * pAction->iNumTpls), 0,
sizeof(actWrkrIParams_t) * pAction->iNumTpls * (newMax - wrkrInfo->p.tx.maxIParams));
wrkrInfo->p.tx.iparams = iparams;
wrkrInfo->p.tx.maxIParams = newMax;
}
*piparams = wrkrInfo->p.tx.iparams + wrkrInfo->p.tx.currIParam * pAction->iNumTpls;
++wrkrInfo->p.tx.currIParam;
finalize_it:
RETiRet;
}
/* Destructor */
BEGINobjDestruct(wti) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(wti)
if(wtiGetState(pThis) != WRKTHRD_STOPPED) {
DBGPRINTF("%s: rsyslog bug: worker not stopped during shutdown\n",
wtiGetDbgHdr(pThis));
if(dbgTimeoutToStderr) {
fprintf(stderr, "RSYSLOG BUG: %s: worker not stopped during shutdown\n",
wtiGetDbgHdr(pThis));
} else {
assert(wtiGetState(pThis) == WRKTHRD_STOPPED);
}
}
/* actual destruction */
batchFree(&pThis->batch);
free(pThis->actWrkrInfo);
pthread_cond_destroy(&pThis->pcondBusy);
DESTROY_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
free(pThis->pszDbgHdr);
ENDobjDestruct(wti)
/* Standard-Constructor for the wti object
*/
BEGINobjConstruct(wti) /* be sure to specify the object type also in END macro! */
INIT_ATOMIC_HELPER_MUT(pThis->mutIsRunning);
pthread_cond_init(&pThis->pcondBusy, NULL);
ENDobjConstruct(wti)
/* Construction finalizer
* rgerhards, 2008-01-17
*/
rsRetVal
wtiConstructFinalize(wti_t *pThis)
{
DEFiRet;
int iDeqBatchSize;
ISOBJ_TYPE_assert(pThis, wti);
DBGPRINTF("%s: finalizing construction of worker instance data (for %d actions)\n",
wtiGetDbgHdr(pThis), runConf->actions.iActionNbr);
/* initialize our thread instance descriptor (no concurrency here) */
pThis->bIsRunning = WRKTHRD_STOPPED;
/* must use calloc as we need zero-init */
CHKmalloc(pThis->actWrkrInfo = calloc(runConf->actions.iActionNbr, sizeof(actWrkrInfo_t)));
if(pThis->pWtp == NULL) {
dbgprintf("wtiConstructFinalize: pWtp not set, this may be intentional\n");
FINALIZE;
}
/* we now alloc the array for user pointers. We obtain the max from the queue itself. */
CHKiRet(pThis->pWtp->pfGetDeqBatchSize(pThis->pWtp->pUsr, &iDeqBatchSize));
CHKiRet(batchInit(&pThis->batch, iDeqBatchSize));
finalize_it:
RETiRet;
}
/* cancellation cleanup handler for queueWorker ()
* Most importantly, it must bring back the batch into a consistent state.
* Keep in mind that cancellation is disabled if we run into
* the cancel cleanup handler (and have been cancelled).
* rgerhards, 2008-01-16
*/
static void
wtiWorkerCancelCleanup(void *arg)
{
wti_t *pThis = (wti_t*) arg;
wtp_t *pWtp;
ISOBJ_TYPE_assert(pThis, wti);
pWtp = pThis->pWtp;
ISOBJ_TYPE_assert(pWtp, wtp);
DBGPRINTF("%s: cancellation cleanup handler called.\n", wtiGetDbgHdr(pThis));
pWtp->pfObjProcessed(pWtp->pUsr, pThis);
DBGPRINTF("%s: done cancellation cleanup handler.\n", wtiGetDbgHdr(pThis));
}
/* wait for queue to become non-empty or timeout
* this is introduced as helper to support queue minimum batch sizes, but may
* also be used for other cases. This function waits until the queue is non-empty
* or a timeout occurs. The timeout must be passed in as absolute value.
* @returns 0 if timeout occurs (queue still empty), something else otherwise
*/
int ATTR_NONNULL()
wtiWaitNonEmpty(wti_t *const pThis, const struct timespec timeout)
{
wtp_t *__restrict__ const pWtp = pThis->pWtp;
int r;
DBGOPRINT((obj_t*) pThis, "waiting on queue to become non-empty\n");
if(d_pthread_cond_timedwait(&pThis->pcondBusy, pWtp->pmutUsr, &timeout) != 0) {
r = 0;
} else {
r = 1;
}
DBGOPRINT((obj_t*) pThis, "waited on queue to become non-empty, result %d\n", r);
return r;
}
/* wait for queue to become non-empty or timeout
* helper to wtiWorker. Note the the predicate is
* re-tested by the caller, so it is OK to NOT do it here.
* rgerhards, 2009-05-20
*/
static void ATTR_NONNULL()
doIdleProcessing(wti_t *const pThis, wtp_t *const pWtp, int *const pbInactivityTOOccurred)
{
struct timespec t;
DBGPRINTF("%s: worker IDLE, waiting for work.\n", wtiGetDbgHdr(pThis));
if(pThis->bAlwaysRunning) {
/* never shut down any started worker */
d_pthread_cond_wait(&pThis->pcondBusy, pWtp->pmutUsr);
} else {
timeoutComp(&t, pWtp->toWrkShutdown);/* get absolute timeout */
if(d_pthread_cond_timedwait(&pThis->pcondBusy, pWtp->pmutUsr, &t) != 0) {
DBGPRINTF("%s: inactivity timeout, worker terminating...\n", wtiGetDbgHdr(pThis));
*pbInactivityTOOccurred = 1; /* indicate we had a timeout */
}
}
DBGOPRINT((obj_t*) pThis, "worker awoke from idle processing\n");
}
/* generic worker thread framework. Note that we prohibit cancellation
* during almost all times, because it can have very undesired side effects.
* However, we may need to cancel a thread if the consumer blocks for too
* long (during shutdown). So what we do is block cancellation, and every
* consumer must enable it during the periods where it is safe.
*/
PRAGMA_DIAGNOSTIC_PUSH
PRAGMA_IGNORE_Wempty_body
rsRetVal
wtiWorker(wti_t *__restrict__ const pThis)
{
wtp_t *__restrict__ const pWtp = pThis->pWtp; /* our worker thread pool -- shortcut */
action_t *__restrict__ pAction;
rsRetVal localRet;
rsRetVal terminateRet;
actWrkrInfo_t *__restrict__ wrkrInfo;
int iCancelStateSave;
int i, j, k;
DEFiRet;
dbgSetThrdName(pThis->pszDbgHdr);
pthread_cleanup_push(wtiWorkerCancelCleanup, pThis);
int bInactivityTOOccurred = 0;
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &iCancelStateSave);
DBGPRINTF("wti %p: worker starting\n", pThis);
/* now we have our identity, on to real processing */
/* note: in this loop, the mutex is "never" unlocked. Of course,
* this is not true: it actually is unlocked when the actual processing
* is done, as part of pWtp->pfDoWork() processing. Note that this
* function is required to re-lock it when done. We cannot do the
* lock/unlock here ourselfs, as pfDoWork() needs to access queue
* structures itself.
* The same goes for pfRateLimiter(). While we could unlock/lock when
* we call it, in practice the function is often called without any
* ratelimiting actually done. Only the rate limiter itself knows
* that. As such, it needs to bear the burden of doing the locking
* when required. -- rgerhards, 2013-11-20
*/
d_pthread_mutex_lock(pWtp->pmutUsr);
while(1) { /* loop will be broken below */
if(pWtp->pfRateLimiter != NULL) { /* call rate-limiter, if defined */
pWtp->pfRateLimiter(pWtp->pUsr);
}
/* first check if we are in shutdown process (but evaluate a bit later) */
terminateRet = wtpChkStopWrkr(pWtp, MUTEX_ALREADY_LOCKED);
if(terminateRet == RS_RET_TERMINATE_NOW) {
/* we now need to free the old batch */
localRet = pWtp->pfObjProcessed(pWtp->pUsr, pThis);
DBGOPRINT((obj_t*) pThis, "terminating worker because of "
"TERMINATE_NOW mode, del iRet %d\n", localRet);
break;
}
/* try to execute and process whatever we have */
localRet = pWtp->pfDoWork(pWtp->pUsr, pThis);
if(localRet == RS_RET_ERR_QUEUE_EMERGENCY) {
break; /* end of loop */
} else if(localRet == RS_RET_IDLE) {
if(terminateRet == RS_RET_TERMINATE_WHEN_IDLE || bInactivityTOOccurred) {
DBGOPRINT((obj_t*) pThis, "terminating worker terminateRet=%d, "
"bInactivityTOOccurred=%d\n", terminateRet, bInactivityTOOccurred);
break; /* end of loop */
}
doIdleProcessing(pThis, pWtp, &bInactivityTOOccurred);
continue; /* request next iteration */
}
bInactivityTOOccurred = 0; /* reset for next run */
}
d_pthread_mutex_unlock(pWtp->pmutUsr);
DBGPRINTF("DDDD: wti %p: worker cleanup action instances\n", pThis);
for(i = 0 ; i < runConf->actions.iActionNbr ; ++i) {
wrkrInfo = &(pThis->actWrkrInfo[i]);
dbgprintf("wti %p, action %d, ptr %p\n", pThis, i, wrkrInfo->actWrkrData);
if(wrkrInfo->actWrkrData != NULL) {
pAction = wrkrInfo->pAction;
actionRemoveWorker(pAction, wrkrInfo->actWrkrData);
pAction->pMod->mod.om.freeWrkrInstance(wrkrInfo->actWrkrData);
if(pAction->isTransactional) {
/* free iparam "cache" - we need to go through to max! */
for(j = 0 ; j < wrkrInfo->p.tx.maxIParams ; ++j) {
for(k = 0 ; k < pAction->iNumTpls ; ++k) {
free(actParam(wrkrInfo->p.tx.iparams,
pAction->iNumTpls, j, k).param);
}
}
free(wrkrInfo->p.tx.iparams);
wrkrInfo->p.tx.iparams = NULL;
wrkrInfo->p.tx.currIParam = 0;
wrkrInfo->p.tx.maxIParams = 0;
} else {
releaseDoActionParams(pAction, pThis, 1);
}
wrkrInfo->actWrkrData = NULL; /* re-init for next activation */
}
}
/* indicate termination */
pthread_cleanup_pop(0); /* remove cleanup handler */
pthread_setcancelstate(iCancelStateSave, NULL);
dbgprintf("wti %p: exiting\n", pThis);
RETiRet;
}
PRAGMA_DIAGNOSTIC_POP
/* some simple object access methods */
DEFpropSetMeth(wti, pWtp, wtp_t*)
/* set the debug header message
* The passed-in string is duplicated. So if the caller does not need
* it any longer, it must free it. Must be called only before object is finalized.
* rgerhards, 2008-01-09
*/
rsRetVal
wtiSetDbgHdr(wti_t *pThis, uchar *pszMsg, const size_t lenMsg)
{
DEFiRet;
ISOBJ_TYPE_assert(pThis, wti);
assert(pszMsg != NULL);
if(lenMsg < 1)
ABORT_FINALIZE(RS_RET_PARAM_ERROR);
if(pThis->pszDbgHdr != NULL) {
free(pThis->pszDbgHdr);
}
if((pThis->pszDbgHdr = malloc(lenMsg + 1)) == NULL)
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
memcpy(pThis->pszDbgHdr, pszMsg, lenMsg + 1); /* always think about the \0! */
finalize_it:
RETiRet;
}
/* This function returns (and creates if necessary) a dummy wti suitable
* for use by the rule engine. It is intended to be used for direct-mode
* main queues (folks, don't do that!). Once created, data is stored in
* thread-specific storage.
* Note: we do NOT do error checking -- if this functions fails, all the
* rest will fail as well... (also, it will only fail under OOM, so...).
* Memleak: we leak pWti's when run in direct mode. However, this is only
* a cosmetic leak, as we need them until all inputs are terminated,
* what means essentially until rsyslog itself is terminated. So we
* don't care -- it's just not nice in valgrind, but that's it.
*/
wti_t *
wtiGetDummy(void)
{
wti_t *pWti;
pWti = (wti_t*) pthread_getspecific(thrd_wti_key);
if(pWti == NULL) {
wtiConstruct(&pWti);
if(pWti != NULL)
wtiConstructFinalize(pWti);
if(pthread_setspecific(thrd_wti_key, pWti) != 0) {
DBGPRINTF("wtiGetDummy: error setspecific thrd_wti_key\n");
}
}
return pWti;
}
/* dummy */
static rsRetVal wtiQueryInterface(interface_t __attribute__((unused)) *i) { return RS_RET_NOT_IMPLEMENTED; }
/* exit our class
*/
BEGINObjClassExit(wti, OBJ_IS_CORE_MODULE) /* CHANGE class also in END MACRO! */
CODESTARTObjClassExit(nsdsel_gtls)
/* release objects we no longer need */
objRelease(glbl, CORE_COMPONENT);
pthread_key_delete(thrd_wti_key);
ENDObjClassExit(wti)
/* Initialize the wti class. Must be called as the very first method
* before anything else is called inside this class.
* rgerhards, 2008-01-09
*/
BEGINObjClassInit(wti, 1, OBJ_IS_CORE_MODULE) /* one is the object version (most important for persisting) */
int r;
/* request objects we use */
CHKiRet(objUse(glbl, CORE_COMPONENT));
r = pthread_key_create(&thrd_wti_key, NULL);
if(r != 0) {
dbgprintf("wti.c: pthread_key_create failed\n");
ABORT_FINALIZE(RS_RET_ERR);
}
ENDObjClassInit(wti)