rsyslog/runtime/netstrms.c
Rainer Gerhards ad1fd213a7
imtcp: major multithreading and performance improvements
This commit significantly enhances imtcp by introducing a fully
functional worker thread pool, enabling true multi-threaded
processing for better scalability under high loads. This is
particularly beneficial when using TLS connections.

Notable changes:
- Implemented a complete worker pool for imtcp.
- Introduced the `workerthreads` config parameter for tuning
  concurrency.
- Improved epoll efficiency by enabling edge-triggered mode.
- Added starvation handling via `starvationProtection.maxReads`.
- Refactored session accept logic and optimized network object
  handling.
- Removed an obsolete network driver layer for event notification.
- Fixed multiple issues related to message timing, EPOLLERR
  handling, and tests.
- Improved performance in poll() mode by reducing redundant
  allocations.
- Introduced new CI tests for imtcp without epoll.
- Allowed disabling imtcp tests via a new configure switch.
- Added new impstats counters for worker thread pool statistics.

Details:
- The worker pool replaces an outdated experimental
  implementation.
- If `workerthreads=1`, no worker pool is created to minimize
  context switches.
- Moves worker pool variables inside `tcpsrv` instance to
  prevent conflicts.
- Extracts session `accept()` logic into a dedicated function
  for clarity.
- Fixes message ordering inconsistencies in multi-threaded
  scenarios.
- Properly handles `EPOLLERR` notifications to improve error
  resilience.
- Optimizes poll() mode by avoiding unnecessary reallocation
  of file descriptors.
- Replaces the old network driver layer for event notification
  with a streamlined solution.
  - Now uses **conditional compilation** to select the best
    method (epoll or poll) at build time.
  - This significantly reduces code complexity, improves
    maintainability, and boosts performance.
- The previous "thread pool" was a rough experiment that did
  not perform significantly better than single-threaded mode.
  - The **new implementation** allows multiple worker threads
    on platforms with `epoll`.
  - On non-epoll systems, an optimized **poll() based
    single-threaded approach** is used, which is expected to
    perform better than the old "thread pool."
- Adds `pthread_setname_np` only when available to improve
  portability.
- Fixes test cases that assumed strict message timing, which
  was unreliable.
- Reduces test parallelism for TSAN CI runs to prevent
  resource exhaustion.
- Moves a test case to `imdiag` to ensure stable execution.
- Provides a new CI environment to verify `imtcp` behavior
  without epoll.
- Introduces `--enable-imtcp-tests` configure switch for test
  flexibility.
- Improves debug logging and adds better error handling for
  worker pool startup.

New configuration parameters:
- `workerthreads`: Defines the number of worker threads for
  imtcp. If set to 1, no worker pool is created.
- `starvationProtection.maxReads`: Defines the maximum number
  of consecutive reads a worker can perform before being
  interrupted to allow other sessions to be processed.

New impstats counters (emitted only when `workerthreads > 1`):
- `runs`: Number of times the worker thread has been invoked.
- `read`: Number of read calls performed by the worker.
  For TLS, this includes read/write calls.
- `accept`: Number of `accept()` calls handled by the worker.
- `starvation_protect`: Number of times a socket was sent
  back to the queue due to reaching the maximum number of
  consecutive requests, ensuring fair scheduling of sessions.

These changes significantly enhance rsyslog’s TCP handling
performance and stability, particularly in high-volume
environments.

Closes #5529, #5532, #5578, #5580.
2025-03-01 14:01:20 +01:00

559 lines
14 KiB
C

/* netstrms.c
*
* Work on this module begun 2008-04-23 by Rainer Gerhards.
*
* Copyright 2008-2025 Rainer Gerhards and Adiscon GmbH.
*
* This file is part of the rsyslog runtime library.
*
* The rsyslog runtime library is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* The rsyslog runtime library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with the rsyslog runtime library. If not, see <http://www.gnu.org/licenses/>.
*
* A copy of the GPL can be found in the file "COPYING" in this distribution.
* A copy of the LGPL can be found in the file "COPYING.LESSER" in this distribution.
*/
#include "config.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <errno.h>
#include "rsyslog.h"
#include "module-template.h"
#include "obj.h"
#include "nsd.h"
#include "netstrm.h"
#include "netstrms.h"
#include "rsconf.h"
/* module type declarations; the plumbing at the end of this file
 * makes us a library module.
 */
MODULE_TYPE_LIB
MODULE_TYPE_NOKEEP
/* static data */
DEFobjStaticHelpers
/* interface objects used in this file: glbl (lookup of the default
 * netstream driver name) and netstrm (construction and destruction
 * of stream objects)
 */
DEFobjCurrIf(glbl)
DEFobjCurrIf(netstrm)
/* Load our low-level driver. This must be done before any
 * driver-specific functions (almost all...) can be carried
 * out. Note that the driver's .ifIsLoaded is correctly
 * initialized by calloc() and we depend on that.
 *
 * The module name is "lmnsd_" followed by the base driver name; if no
 * base name has been set on the object, the system default is used.
 * On success pThis->pDrvrName holds a heap copy of the module name
 * (the destructor uses it as the "driver is loaded" indicator). On
 * failure pThis->pDrvrName is left NULL.
 *
 * Returns RS_RET_DRVRNAME_TOO_LONG if the module name does not fit into
 * the local buffer (or cannot be formatted), an out-of-memory state if
 * the name cannot be duplicated, or whatever obj.UseObj() returns.
 *
 * WARNING: this code is mostly identical to similar code in
 * nssel.c - TODO: abstract it and move it to some common place.
 * rgerhards, 2008-04-18
 */
static rsRetVal
loadDrvr(netstrms_t *pThis)
{
	DEFiRet;
	uchar *pBaseDrvrName;
	uchar szDrvrName[48]; /* 48 shall be large enough */
	int lenDrvrName;

	pBaseDrvrName = pThis->pBaseDrvrName;
	if(pBaseDrvrName == NULL) /* if no drvr name is set, use system default */
		pBaseDrvrName = glbl.GetDfltNetstrmDrvr(runConf);

	/* snprintf() returns the length the complete string WOULD have had,
	 * so every value >= the buffer size means truncation. A negative
	 * value signals an output error. Both cases must be rejected, else
	 * we would try to load a module with a mangled name.
	 */
	lenDrvrName = snprintf((char*)szDrvrName, sizeof(szDrvrName), "lmnsd_%s", pBaseDrvrName);
	if(lenDrvrName < 0 || (size_t) lenDrvrName >= sizeof(szDrvrName))
		ABORT_FINALIZE(RS_RET_DRVRNAME_TOO_LONG);
	CHKmalloc(pThis->pDrvrName = (uchar*) strdup((char*)szDrvrName));

	pThis->Drvr.ifVersion = nsdCURR_IF_VERSION;
	/* The pDrvrName+2 below is a hack to obtain the object name. It
	 * saves us from having yet another variable with the name without "lm"
	 * in front of it. If we change the module load interface, we may re-think
	 * about this hack, but for the time being it is efficient and clean
	 * enough. -- rgerhards, 2008-04-18
	 */
	CHKiRet(obj.UseObj(__FILE__, szDrvrName+2, szDrvrName, (void*) &pThis->Drvr));

finalize_it:
	if(iRet != RS_RET_OK) {
		/* no driver loaded, so the load indicator must not stay set */
		if(pThis->pDrvrName != NULL) {
			free(pThis->pDrvrName);
			pThis->pDrvrName = NULL;
		}
	}
	RETiRet;
}
/* Standard-Constructor
 * No class-specific initialization is done between the BEGIN and END
 * macros. Note that loadDrvr() documents that it depends on the driver
 * interface structure having been zero-initialized by calloc().
 */
BEGINobjConstruct(netstrms) /* be sure to specify the object type also in END macro! */
ENDobjConstruct(netstrms)
/* Destructor for the netstrms object: releases the driver (if one was
 * loaded) and frees all strings owned by the object. The permitted peers
 * pointer is not touched here.
 */
BEGINobjDestruct(netstrms) /* be sure to specify the object type also in END and CODESTART macros! */
CODESTARTobjDestruct(netstrms)
	/* The driver name string doubles as "driver is loaded" indicator;
	 * we need that very string to release the driver, so it can only
	 * be freed afterwards.
	 */
	if(pThis->pDrvrName != NULL) {
		obj.ReleaseObj(__FILE__, pThis->pDrvrName+2, pThis->pDrvrName, (void*) &pThis->Drvr);
		free(pThis->pDrvrName);
	}

	/* free(NULL) is a no-op, so no guards are required below */
	free(pThis->pszDrvrAuthMode);
	pThis->pszDrvrAuthMode = NULL;

	free(pThis->pszDrvrPermitExpiredCerts);
	pThis->pszDrvrPermitExpiredCerts = NULL;

	free((void*)pThis->pszDrvrCAFile);
	pThis->pszDrvrCAFile = NULL;

	free((void*)pThis->pszDrvrCRLFile);
	pThis->pszDrvrCRLFile = NULL;

	free((void*)pThis->pszDrvrKeyFile);
	pThis->pszDrvrKeyFile = NULL;

	free((void*)pThis->pszDrvrCertFile);
	pThis->pszDrvrCertFile = NULL;

	free(pThis->pBaseDrvrName);
	pThis->pBaseDrvrName = NULL;

	free(pThis->gnutlsPriorityString);
	pThis->gnutlsPriorityString = NULL;
ENDobjDestruct(netstrms)
/* ConstructionFinalizer
 * Completes object construction by loading the network stream driver.
 * The result of loadDrvr() is handed back to the caller unchanged.
 */
static rsRetVal
netstrmsConstructFinalize(netstrms_t *pThis)
{
	DEFiRet;

	ISOBJ_TYPE_assert(pThis, netstrms);
	CHKiRet(loadDrvr(pThis));

finalize_it:
	RETiRet;
}
/* Store the SYN backlog value in the netstrms object. The value is only
 * recorded here; no validation is done.
 */
static rsRetVal
SetSynBacklog(netstrms_t *pThis, const int iSynBacklog)
{
	DEFiRet;

	ISOBJ_TYPE_assert(pThis, netstrms);
	pThis->iSynBacklog = iSynBacklog;

	RETiRet;
}
/* Set the base driver name. Any previously set name is always discarded.
 * If pszName is NULL, no new name is stored, which results in the system
 * default being used when the driver is loaded. Otherwise the object
 * keeps its own copy of pszName. If that copy cannot be allocated, an
 * error is returned and no name is set.
 * rgerhards, 2008-05-05
 */
static rsRetVal
SetDrvrName(netstrms_t *pThis, uchar *pszName)
{
	DEFiRet;

	ISOBJ_TYPE_assert(pThis, netstrms);

	/* drop the old name first (free() accepts NULL) */
	free(pThis->pBaseDrvrName);
	pThis->pBaseDrvrName = NULL;

	if(pszName != NULL) {
		CHKmalloc(pThis->pBaseDrvrName = (uchar*) strdup((char*) pszName));
	}

finalize_it:
	RETiRet;
}
/* Set the driver's permitted peers. Only the pointer is stored; the
 * list is not copied.
 * rgerhards, 2008-05-19
 */
static rsRetVal
SetDrvrPermPeers(netstrms_t *pThis, permittedPeers_t *pPermPeers)
{
	DEFiRet;

	ISOBJ_TYPE_assert(pThis, netstrms);
	pThis->pPermPeers = pPermPeers;

	RETiRet;
}
/* Return the driver's permitted peers (the pointer that was stored via
 * SetDrvrPermPeers(), may be NULL).
 * We use non-standard calling conventions because it makes an awful lot
 * of sense here.
 * rgerhards, 2008-05-19
 */
static permittedPeers_t*
GetDrvrPermPeers(netstrms_t *pThis)
{
	permittedPeers_t *pPeers;

	ISOBJ_TYPE_assert(pThis, netstrms);
	pPeers = pThis->pPermPeers;
	return pPeers;
}
/* Set the driver auth mode. The object keeps its own copy of mode.
 * A previously set mode is freed, so calling this setter repeatedly
 * does not leak memory. Passing NULL clears the mode (same convention
 * as SetDrvrName()). If the copy cannot be allocated, an error is
 * returned and the previous mode is left untouched.
 * rgerhards, 2008-05-19
 */
static rsRetVal
SetDrvrAuthMode(netstrms_t *pThis, uchar *mode)
{
	DEFiRet;
	uchar *newMode = NULL;

	ISOBJ_TYPE_assert(pThis, netstrms);
	if(mode != NULL) {
		/* duplicate first, so that the old value survives an OOM condition */
		CHKmalloc(newMode = (uchar*)strdup((char*)mode));
	}
	free(pThis->pszDrvrAuthMode);
	pThis->pszDrvrAuthMode = newMode;

finalize_it:
	RETiRet;
}
/* Return the driver auth mode string held by the object (not a copy).
 * We use non-standard calling conventions because it makes an awful lot
 * of sense here.
 * rgerhards, 2008-05-19
 */
static uchar*
GetDrvrAuthMode(netstrms_t *pThis)
{
	uchar *pszMode;

	ISOBJ_TYPE_assert(pThis, netstrms);
	pszMode = pThis->pszDrvrAuthMode;
	return pszMode;
}
/* Return the driver permitexpiredcerts mode string held by the object
 * (not a copy).
 * We use non-standard calling conventions because it makes an awful lot
 * of sense here.
 * alorbach, 2018-12-21
 */
static uchar*
GetDrvrPermitExpiredCerts(netstrms_t *pThis)
{
	uchar *pszMode;

	ISOBJ_TYPE_assert(pThis, netstrms);
	pszMode = pThis->pszDrvrPermitExpiredCerts;
	return pszMode;
}
/* Set the driver permitexpiredcerts mode. The object keeps its own copy
 * of mode. A NULL mode is ignored (the current setting stays as is).
 * A previously set mode is freed before the new one is stored, so
 * calling this setter repeatedly does not leak memory. If the copy
 * cannot be allocated, an error is returned and the previous mode is
 * left untouched.
 * alorbach, 2018-12-20
 */
static rsRetVal
SetDrvrPermitExpiredCerts(netstrms_t *pThis, uchar *mode)
{
	DEFiRet;
	uchar *newMode;

	ISOBJ_TYPE_assert(pThis, netstrms);
	if(mode != NULL) {
		CHKmalloc(newMode = (uchar*) strdup((char*)mode));
		free(pThis->pszDrvrPermitExpiredCerts); /* drop previous value, if any */
		pThis->pszDrvrPermitExpiredCerts = newMode;
	}

finalize_it:
	RETiRet;
}
/* Set the TLS CA file name. The object keeps its own copy of the
 * string. A NULL argument is ignored (the current setting stays as is).
 * A previously set name is freed before the new one is stored, so
 * calling this setter repeatedly does not leak memory. If the copy
 * cannot be allocated, an error is returned and the previous name is
 * left untouched.
 */
static rsRetVal
SetDrvrTlsCAFile(netstrms_t *pThis, const uchar *mode)
{
	DEFiRet;
	uchar *newFile;

	ISOBJ_TYPE_assert(pThis, netstrms);
	if(mode != NULL) {
		CHKmalloc(newFile = (uchar*) strdup((const char*)mode));
		free((void*)pThis->pszDrvrCAFile); /* drop previous value, if any */
		pThis->pszDrvrCAFile = newFile;
	}

finalize_it:
	RETiRet;
}
/* Set the TLS CRL file name. The object keeps its own copy of the
 * string. A NULL argument is ignored (the current setting stays as is).
 * A previously set name is freed before the new one is stored, so
 * calling this setter repeatedly does not leak memory. If the copy
 * cannot be allocated, an error is returned and the previous name is
 * left untouched.
 */
static rsRetVal
SetDrvrTlsCRLFile(netstrms_t *pThis, const uchar *mode)
{
	DEFiRet;
	uchar *newFile;

	ISOBJ_TYPE_assert(pThis, netstrms);
	if(mode != NULL) {
		CHKmalloc(newFile = (uchar*) strdup((const char*)mode));
		free((void*)pThis->pszDrvrCRLFile); /* drop previous value, if any */
		pThis->pszDrvrCRLFile = newFile;
	}

finalize_it:
	RETiRet;
}
/* Set the TLS key file name. The object keeps its own copy of the
 * string. A NULL argument is ignored (the current setting stays as is).
 * A previously set name is freed before the new one is stored, so
 * calling this setter repeatedly does not leak memory. If the copy
 * cannot be allocated, an error is returned and the previous name is
 * left untouched.
 */
static rsRetVal
SetDrvrTlsKeyFile(netstrms_t *pThis, const uchar *mode)
{
	DEFiRet;
	uchar *newFile;

	ISOBJ_TYPE_assert(pThis, netstrms);
	if(mode != NULL) {
		CHKmalloc(newFile = (uchar*) strdup((const char*)mode));
		free((void*)pThis->pszDrvrKeyFile); /* drop previous value, if any */
		pThis->pszDrvrKeyFile = newFile;
	}

finalize_it:
	RETiRet;
}
/* Set the TLS certificate file name. The object keeps its own copy of
 * the string. A NULL argument is ignored (the current setting stays as
 * is). A previously set name is freed before the new one is stored, so
 * calling this setter repeatedly does not leak memory. If the copy
 * cannot be allocated, an error is returned and the previous name is
 * left untouched.
 */
static rsRetVal
SetDrvrTlsCertFile(netstrms_t *pThis, const uchar *mode)
{
	DEFiRet;
	uchar *newFile;

	ISOBJ_TYPE_assert(pThis, netstrms);
	if(mode != NULL) {
		CHKmalloc(newFile = (uchar*) strdup((const char*)mode));
		free((void*)pThis->pszDrvrCertFile); /* drop previous value, if any */
		pThis->pszDrvrCertFile = newFile;
	}

finalize_it:
	RETiRet;
}
/* Set the priorityString. The object keeps its own copy of iVal.
 * A previously set string is freed, so calling this setter repeatedly
 * does not leak memory. Passing NULL clears the string (same convention
 * as SetDrvrName()). If the copy cannot be allocated, an error is
 * returned and the previous string is left untouched.
 * PascalWithopf 2017-08-16
 */
static rsRetVal
SetDrvrGnutlsPriorityString(netstrms_t *pThis, uchar *iVal)
{
	DEFiRet;
	uchar *newPrio = NULL;

	ISOBJ_TYPE_assert(pThis, netstrms);
	if(iVal != NULL) {
		/* duplicate first, so that the old value survives an OOM condition */
		CHKmalloc(newPrio = (uchar*)strdup((char*)iVal));
	}
	free(pThis->gnutlsPriorityString);
	pThis->gnutlsPriorityString = newPrio;

finalize_it:
	RETiRet;
}
/* Return the priorityString held by the object (not a copy).
 * PascalWithopf, 2017-08-16
 */
static uchar*
GetDrvrGnutlsPriorityString(netstrms_t *pThis)
{
	uchar *pszPrio;

	ISOBJ_TYPE_assert(pThis, netstrms);
	pszPrio = pThis->gnutlsPriorityString;
	return pszPrio;
}
/* Set the driver mode. The value is stored as given, without
 * validation.
 * rgerhards, 2008-04-30
 */
static rsRetVal
SetDrvrMode(netstrms_t *pThis, int iMode)
{
	DEFiRet;

	ISOBJ_TYPE_assert(pThis, netstrms);
	pThis->iDrvrMode = iMode;

	RETiRet;
}
/* Return the driver mode as stored by SetDrvrMode().
 * We use non-standard calling conventions because it makes an awful lot
 * of sense here.
 * rgerhards, 2008-04-30
 */
static int
GetDrvrMode(netstrms_t *pThis)
{
	int iMode;

	ISOBJ_TYPE_assert(pThis, netstrms);
	iMode = pThis->iDrvrMode;
	return iMode;
}
/* Set the driver's certificate extended key usage check setting. The
 * value is stored as given, without validation.
 * jvymazal, 2019-08-16
 */
static rsRetVal
SetDrvrCheckExtendedKeyUsage(netstrms_t *pThis, int ChkExtendedKeyUsage)
{
	DEFiRet;

	ISOBJ_TYPE_assert(pThis, netstrms);
	pThis->DrvrChkExtendedKeyUsage = ChkExtendedKeyUsage;

	RETiRet;
}
/* Return the driver's certificate extended key usage check setting.
 * jvymazal, 2019-08-16
 */
static int
GetDrvrCheckExtendedKeyUsage(netstrms_t *pThis)
{
	int chkExtKeyUsage;

	ISOBJ_TYPE_assert(pThis, netstrms);
	chkExtKeyUsage = pThis->DrvrChkExtendedKeyUsage;
	return chkExtKeyUsage;
}
/* Set the driver's name checking policy (prioritize SAN). The value is
 * stored as given, without validation.
 * jvymazal, 2019-08-16
 */
static rsRetVal
SetDrvrPrioritizeSAN(netstrms_t *pThis, int prioritizeSan)
{
	DEFiRet;

	ISOBJ_TYPE_assert(pThis, netstrms);
	pThis->DrvrPrioritizeSan = prioritizeSan;

	RETiRet;
}
/* Return the driver's name checking policy (prioritize SAN).
 * jvymazal, 2019-08-16
 */
static int
GetDrvrPrioritizeSAN(netstrms_t *pThis)
{
	int prioritizeSan;

	ISOBJ_TYPE_assert(pThis, netstrms);
	prioritizeSan = pThis->DrvrPrioritizeSan;
	return prioritizeSan;
}
/* Set the driver's TlsVerifyDepth. The value is stored as given,
 * without validation.
 * alorbach, 2019-12-20
 */
static rsRetVal
SetDrvrTlsVerifyDepth(netstrms_t *pThis, int verifyDepth)
{
	DEFiRet;

	ISOBJ_TYPE_assert(pThis, netstrms);
	pThis->DrvrVerifyDepth = verifyDepth;

	RETiRet;
}
/* Return the driver's TlsVerifyDepth.
 * alorbach, 2019-12-20
 */
static int
GetDrvrTlsVerifyDepth(netstrms_t *pThis)
{
	int verifyDepth;

	ISOBJ_TYPE_assert(pThis, netstrms);
	verifyDepth = pThis->DrvrVerifyDepth;
	return verifyDepth;
}
/* Return the TLS CA file name held by the object (not a copy, may be
 * NULL if none was set).
 */
static const uchar *
GetDrvrTlsCAFile(netstrms_t *pThis)
{
	const uchar *pszFile;

	ISOBJ_TYPE_assert(pThis, netstrms);
	pszFile = pThis->pszDrvrCAFile;
	return pszFile;
}
/* Return the TLS CRL file name held by the object (not a copy, may be
 * NULL if none was set).
 */
static const uchar *
GetDrvrTlsCRLFile(netstrms_t *pThis)
{
	const uchar *pszFile;

	ISOBJ_TYPE_assert(pThis, netstrms);
	pszFile = pThis->pszDrvrCRLFile;
	return pszFile;
}
/* Return the TLS key file name held by the object (not a copy, may be
 * NULL if none was set).
 */
static const uchar *
GetDrvrTlsKeyFile(netstrms_t *pThis)
{
	const uchar *pszFile;

	ISOBJ_TYPE_assert(pThis, netstrms);
	pszFile = pThis->pszDrvrKeyFile;
	return pszFile;
}
/* Return the TLS certificate file name held by the object (not a copy,
 * may be NULL if none was set).
 */
static const uchar *
GetDrvrTlsCertFile(netstrms_t *pThis)
{
	const uchar *pszFile;

	ISOBJ_TYPE_assert(pThis, netstrms);
	pszFile = pThis->pszDrvrCertFile;
	return pszFile;
}
/* Create an instance of a netstrm object. It is initialized with default
 * values and uses the driver currently loaded into pThis. The caller may
 * set netstrm properties and must call ConstructFinalize().
 *
 * On success, *ppStrm receives the new stream. On failure, a partially
 * constructed stream is destructed again and *ppStrm is not written.
 */
static rsRetVal
CreateStrm(netstrms_t *pThis, netstrm_t **ppStrm)
{
	DEFiRet;
	netstrm_t *pNewStrm = NULL;

	CHKiRet(objUse(netstrm, DONT_LOAD_LIB));
	CHKiRet(netstrm.Construct(&pNewStrm));

	/* We copy over our driver structure. We could provide a pointer to
	 * ourselves instead, but that costs some performance on each driver
	 * invocation. As we already have hefty indirection (and thus a
	 * performance toll), copying the function pointers is preferred.
	 * -- rgerhards, 2008-04-23
	 */
	memcpy(&pNewStrm->Drvr, &pThis->Drvr, sizeof(pThis->Drvr));
	pNewStrm->pNS = pThis;
	*ppStrm = pNewStrm;

finalize_it:
	if(iRet != RS_RET_OK && pNewStrm != NULL) {
		netstrm.Destruct(&pNewStrm);
	}
	RETiRet;
}
/* queryInterface function
 * Fills the caller-provided interface structure with our method pointers.
 * Only the current interface version is supported; any other version
 * is rejected with RS_RET_INTERFACE_NOT_SUPPORTED.
 */
BEGINobjQueryInterface(netstrms)
CODESTARTobjQueryInterface(netstrms)
	if(pIf->ifVersion != netstrmsCURR_IF_VERSION) {/* check for current version, increment on each change */
		ABORT_FINALIZE(RS_RET_INTERFACE_NOT_SUPPORTED);
	}

	/* ok, we have the right interface, so let's fill it
	 * Please note that we may also do some backwards-compatibility
	 * work here (if we can support an older interface version - that,
	 * of course, also affects the "if" above).
	 */

	/* object life cycle */
	pIf->Construct = netstrmsConstruct;
	pIf->ConstructFinalize = netstrmsConstructFinalize;
	pIf->Destruct = netstrmsDestruct;
	pIf->CreateStrm = CreateStrm;

	/* property setters */
	pIf->SetSynBacklog = SetSynBacklog;
	pIf->SetDrvrName = SetDrvrName;
	pIf->SetDrvrMode = SetDrvrMode;
	pIf->SetDrvrAuthMode = SetDrvrAuthMode;
	pIf->SetDrvrPermitExpiredCerts = SetDrvrPermitExpiredCerts;
	pIf->SetDrvrGnutlsPriorityString = SetDrvrGnutlsPriorityString;
	pIf->SetDrvrPermPeers = SetDrvrPermPeers;
	pIf->SetDrvrCheckExtendedKeyUsage = SetDrvrCheckExtendedKeyUsage;
	pIf->SetDrvrPrioritizeSAN = SetDrvrPrioritizeSAN;
	pIf->SetDrvrTlsVerifyDepth = SetDrvrTlsVerifyDepth;
	pIf->SetDrvrTlsCAFile = SetDrvrTlsCAFile;
	pIf->SetDrvrTlsCRLFile = SetDrvrTlsCRLFile;
	pIf->SetDrvrTlsKeyFile = SetDrvrTlsKeyFile;
	pIf->SetDrvrTlsCertFile = SetDrvrTlsCertFile;

	/* property getters */
	pIf->GetDrvrMode = GetDrvrMode;
	pIf->GetDrvrAuthMode = GetDrvrAuthMode;
	pIf->GetDrvrPermitExpiredCerts = GetDrvrPermitExpiredCerts;
	pIf->GetDrvrGnutlsPriorityString = GetDrvrGnutlsPriorityString;
	pIf->GetDrvrPermPeers = GetDrvrPermPeers;
	pIf->GetDrvrCheckExtendedKeyUsage = GetDrvrCheckExtendedKeyUsage;
	pIf->GetDrvrPrioritizeSAN = GetDrvrPrioritizeSAN;
	pIf->GetDrvrTlsVerifyDepth = GetDrvrTlsVerifyDepth;
	pIf->GetDrvrTlsCAFile = GetDrvrTlsCAFile;
	pIf->GetDrvrTlsCRLFile = GetDrvrTlsCRLFile;
	pIf->GetDrvrTlsKeyFile = GetDrvrTlsKeyFile;
	pIf->GetDrvrTlsCertFile = GetDrvrTlsCertFile;
finalize_it:
ENDobjQueryInterface(netstrms)
/* exit our class
 * Releases the interface objects used by this class: glbl (requested in
 * the class initializer) and netstrm (requested in CreateStrm()).
 */
BEGINObjClassExit(netstrms, OBJ_IS_LOADABLE_MODULE) /* CHANGE class also in END MACRO! */
CODESTARTObjClassExit(netstrms)
/* release objects we no longer need */
objRelease(glbl, CORE_COMPONENT);
objRelease(netstrm, DONT_LOAD_LIB);
ENDObjClassExit(netstrms)
/* Initialize the netstrms class. Must be called as the very first method
 * before anything else is called inside this class.
 * Only the glbl interface is requested here; the netstrm interface is
 * requested on demand in CreateStrm().
 * rgerhards, 2008-02-19
 */
BEGINAbstractObjClassInit(netstrms, 1, OBJ_IS_CORE_MODULE) /* class, version */
/* request objects we use */
CHKiRet(objUse(glbl, CORE_COMPONENT));
/* set our own handlers */
/* (none are registered for this class) */
ENDObjClassInit(netstrms)
/* --------------- here now comes the plumbing that makes us a library module --------------- */
/* module exit: shut down the classes contained in this module, in the
 * reverse order of their initialization in modInit.
 */
BEGINmodExit
CODESTARTmodExit
netstrmsClassExit();
netstrmClassExit(); /* we use this object, so we must exit it after we are finished */
ENDmodExit
/* entry point query: only the CODEqueryEtryPt_STD_LIB_QUERIES set from the
 * module template is offered; no module-specific entry points are added here.
 */
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_LIB_QUERIES
ENDqueryEtryPt
/* module initialization: reports the module interface version we provide
 * and initializes the classes of this module. netstrm is initialized
 * before netstrms. Initialization is aborted on the first failing class.
 */
BEGINmodInit()
CODESTARTmodInit
*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
/* Initialize all classes that are in our module - this includes ourselves */
CHKiRet(netstrmClassInit(pModInfo));
CHKiRet(netstrmsClassInit(pModInfo));
ENDmodInit