mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-15 10:30:40 +01:00
improved testbench / solved imdiag race condition
imdiag/imtcp had a modload race condition (as imdiag is a testing aid, this has no implications for production deployments). Also, I replaced netcat by a custom program to talk to imdiag. This, for the first time ever, is now a Java program. I plan to add some GUI troubleshooting tools and thought it is a good idea to start doing things in Java that can simply be done in that language.
This commit is contained in:
parent
aaef9aa018
commit
7a7ec37f99
@ -1,4 +1,7 @@
|
||||
---------------------------------------------------------------------------
|
||||
Version 4.3.? [DEVEL] (rgerhards), 2009-??-??
|
||||
- bugfix: imdiag/imtcp had a race condition
|
||||
---------------------------------------------------------------------------
|
||||
Version 4.3.1 [DEVEL] (rgerhards), 2009-05-25
|
||||
- added capability to run multiple tcp listeners (on different ports)
|
||||
- performance enhancement: imtcp calls parser no longer on input thread
|
||||
|
||||
@ -131,5 +131,5 @@ SUBDIRS += tests
|
||||
# temporarily be removed below. The intent behind forcing everthing to compile
|
||||
# in a make distcheck is so that we detect code that accidently was not updated
|
||||
# when some global update happened.
|
||||
DISTCHECK_CONFIGURE_FLAGS=--enable-gssapi_krb5 --enable-imfile --enable-snmp --enable-pgsql --enable-libdbi --enable-mysql --enable-omtemplate --enable-imtemplate --enable-relp --enable-rsyslogd --enable-mail --enable-klog --enable-diagtools --enable-gnutls --enable-omstdout --enable-omprog --enable-imdiag
|
||||
DISTCHECK_CONFIGURE_FLAGS=--enable-gssapi_krb5 --enable-imfile --enable-snmp --enable-pgsql --enable-libdbi --enable-mysql --enable-omtemplate --enable-imtemplate --enable-relp --enable-rsyslogd --enable-mail --enable-klog --enable-diagtools --enable-gnutls --enable-omstdout --enable-omprog --enable-imdiag --enable-shave
|
||||
ACLOCAL_AMFLAGS = -I m4
|
||||
|
||||
@ -40,6 +40,7 @@
|
||||
#include <time.h>
|
||||
#include <assert.h>
|
||||
#include <errno.h>
|
||||
#include <pthread.h>
|
||||
#ifdef OS_BSD
|
||||
# include "libgen.h"
|
||||
#endif
|
||||
@ -61,6 +62,14 @@
|
||||
DEFobjStaticHelpers
|
||||
DEFobjCurrIf(errmsg)
|
||||
|
||||
/* we must ensure that only one thread at one time tries to load or unload
|
||||
* modules, otherwise we may see race conditions. This first came up with
|
||||
* imdiag/imtcp, which both use the same stream drivers. Below is the mutex
|
||||
* for that handling.
|
||||
* rgerhards, 2009-05-25
|
||||
*/
|
||||
static pthread_mutex_t mutLoadUnload;
|
||||
|
||||
static modInfo_t *pLoadedModules = NULL; /* list of currently-loaded modules */
|
||||
static modInfo_t *pLoadedModulesLast = NULL; /* tail-pointer */
|
||||
|
||||
@ -479,6 +488,8 @@ modUnlinkAndDestroy(modInfo_t **ppThis)
|
||||
pThis = *ppThis;
|
||||
assert(pThis != NULL);
|
||||
|
||||
pthread_mutex_lock(&mutLoadUnload);
|
||||
|
||||
/* first check if we are permitted to unload */
|
||||
if(pThis->eType == eMOD_LIB) {
|
||||
if(pThis->uRefCnt > 0) {
|
||||
@ -513,6 +524,7 @@ modUnlinkAndDestroy(modInfo_t **ppThis)
|
||||
moduleDestruct(pThis);
|
||||
|
||||
finalize_it:
|
||||
pthread_mutex_unlock(&mutLoadUnload);
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
@ -587,6 +599,8 @@ Load(uchar *pModName)
|
||||
assert(pModName != NULL);
|
||||
dbgprintf("Requested to load module '%s'\n", pModName);
|
||||
|
||||
pthread_mutex_lock(&mutLoadUnload);
|
||||
|
||||
iModNameLen = strlen((char *) pModName);
|
||||
if(iModNameLen > 3 && !strcmp((char *) pModName + iModNameLen - 3, ".so")) {
|
||||
iModNameLen -= 3;
|
||||
@ -696,6 +710,7 @@ Load(uchar *pModName)
|
||||
}
|
||||
|
||||
finalize_it:
|
||||
pthread_mutex_unlock(&mutLoadUnload);
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
@ -791,6 +806,7 @@ BEGINObjClassExit(module, OBJ_IS_LOADABLE_MODULE) /* CHANGE class also in END MA
|
||||
CODESTARTObjClassExit(module)
|
||||
/* release objects we no longer need */
|
||||
objRelease(errmsg, CORE_COMPONENT);
|
||||
pthread_mutex_destroy(&mutLoadUnload);
|
||||
|
||||
# ifdef DEBUG
|
||||
modUsrPrintAll(); /* debug aid - TODO: integrate with debug.c, at least the settings! */
|
||||
@ -833,6 +849,7 @@ ENDobjQueryInterface(module)
|
||||
*/
|
||||
BEGINAbstractObjClassInit(module, 1, OBJ_IS_CORE_MODULE) /* class, version - CHANGE class also in END MACRO! */
|
||||
uchar *pModPath;
|
||||
pthread_mutexattr_t mutAttr;
|
||||
|
||||
/* use any module load path specified in the environment */
|
||||
if((pModPath = (uchar*) getenv("RSYSLOG_MODDIR")) != NULL) {
|
||||
@ -850,6 +867,10 @@ BEGINAbstractObjClassInit(module, 1, OBJ_IS_CORE_MODULE) /* class, version - CHA
|
||||
SetModDir(glblModPath);
|
||||
}
|
||||
|
||||
pthread_mutexattr_init(&mutAttr);
|
||||
pthread_mutexattr_settype(&mutAttr, PTHREAD_MUTEX_RECURSIVE);
|
||||
pthread_mutex_init(&mutLoadUnload, &mutAttr);
|
||||
|
||||
/* request objects we use */
|
||||
CHKiRet(objUse(errmsg, CORE_COMPONENT));
|
||||
ENDObjClassInit(module)
|
||||
|
||||
23
tcps_sess.c
23
tcps_sess.c
@ -58,7 +58,6 @@ static int iMaxLine; /* maximum size of a single message */
|
||||
|
||||
/* forward definitions */
|
||||
static rsRetVal Close(tcps_sess_t *pThis);
|
||||
static rsRetVal defaultDoSubmitMessage(tcps_sess_t *pThis, uchar*, int);
|
||||
|
||||
|
||||
/* Standard-Constructor */
|
||||
@ -66,7 +65,6 @@ BEGINobjConstruct(tcps_sess) /* be sure to specify the object type also in END m
|
||||
pThis->iMsg = 0; /* just make sure... */
|
||||
pThis->bAtStrtOfFram = 1; /* indicate frame header expected */
|
||||
pThis->eFraming = TCP_FRAMING_OCTET_STUFFING; /* just make sure... */
|
||||
pThis->DoSubmitMessage = defaultDoSubmitMessage;
|
||||
/* now allocate the message reception buffer */
|
||||
CHKmalloc(pThis->pMsg = (uchar*) malloc(sizeof(uchar) * iMaxLine + 1));
|
||||
finalize_it:
|
||||
@ -228,11 +226,8 @@ SetOnMsgReceive(tcps_sess_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t*, uchar
|
||||
* rgerhards, 2009-04-23
|
||||
*/
|
||||
static rsRetVal
|
||||
defaultDoSubmitMessage(tcps_sess_t *pThis, uchar *pszMsg, int iLenMsg)
|
||||
defaultDoSubmitMessage(tcps_sess_t *pThis)
|
||||
{
|
||||
// TODO: make calling this overridable so that the diag module can ask to be called
|
||||
// and so it can do its work right in this entry point (but we need to check that
|
||||
// we have the capability to send a reply at this point).
|
||||
msg_t *pMsg;
|
||||
struct syslogTime stTime;
|
||||
time_t ttGenTime;
|
||||
@ -240,6 +235,11 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, uchar *pszMsg, int iLenMsg)
|
||||
|
||||
ISOBJ_TYPE_assert(pThis, tcps_sess);
|
||||
|
||||
if(pThis->DoSubmitMessage != NULL) {
|
||||
pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg);
|
||||
FINALIZE;
|
||||
}
|
||||
|
||||
//TODO: if((iTimeRequery == 0) || (iNbrTimeUsed++ % iTimeRequery) == 0) {
|
||||
datetime.getCurrTime(&stTime, &ttGenTime);
|
||||
//}
|
||||
@ -247,7 +247,7 @@ defaultDoSubmitMessage(tcps_sess_t *pThis, uchar *pszMsg, int iLenMsg)
|
||||
CHKiRet(msgConstructWithTime(&pMsg, &stTime, ttGenTime));
|
||||
/* first trim the buffer to what we have actually received */
|
||||
CHKmalloc(pMsg->pszRawMsg = malloc(sizeof(uchar) * pThis->iMsg));
|
||||
memcpy(pMsg->pszRawMsg, pszMsg, iLenMsg);
|
||||
memcpy(pMsg->pszRawMsg, pThis->pMsg, pThis->iMsg);
|
||||
pMsg->iLenRawMsg = pThis->iMsg;
|
||||
MsgSetInputName(pMsg, pThis->pLstnInfo->pszInputName);
|
||||
MsgSetFlowControlType(pMsg, eFLOWCTL_LIGHT_DELAY);
|
||||
@ -266,6 +266,7 @@ finalize_it:
|
||||
}
|
||||
|
||||
|
||||
|
||||
/* This should be called before a normal (non forced) close
|
||||
* of a TCP session. This function checks if there is any unprocessed
|
||||
* message left in the TCP stream. Such a message is probably a
|
||||
@ -305,7 +306,7 @@ PrepareClose(tcps_sess_t *pThis)
|
||||
* this case.
|
||||
*/
|
||||
dbgprintf("Extra data at end of stream in legacy syslog/tcp message - processing\n");
|
||||
pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg);
|
||||
defaultDoSubmitMessage(pThis);
|
||||
}
|
||||
|
||||
finalize_it:
|
||||
@ -386,7 +387,7 @@ processDataRcvd(tcps_sess_t *pThis, char c)
|
||||
if(pThis->iMsg >= iMaxLine) {
|
||||
/* emergency, we now need to flush, no matter if we are at end of message or not... */
|
||||
dbgprintf("error: message received is larger than max msg size, we split it\n");
|
||||
pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg);
|
||||
defaultDoSubmitMessage(pThis);
|
||||
/* we might think if it is better to ignore the rest of the
|
||||
* message than to treat it as a new one. Maybe this is a good
|
||||
* candidate for a configuration parameter...
|
||||
@ -397,7 +398,7 @@ processDataRcvd(tcps_sess_t *pThis, char c)
|
||||
if(( (c == '\n')
|
||||
|| ((pThis->pSrv->addtlFrameDelim != TCPSRV_NO_ADDTL_DELIMITER) && (c == pThis->pSrv->addtlFrameDelim))
|
||||
) && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delimiter? */
|
||||
pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg);
|
||||
defaultDoSubmitMessage(pThis);
|
||||
pThis->inputState = eAtStrtFram;
|
||||
} else {
|
||||
/* IMPORTANT: here we copy the actual frame content to the message - for BOTH framing modes!
|
||||
@ -414,7 +415,7 @@ processDataRcvd(tcps_sess_t *pThis, char c)
|
||||
pThis->iOctetsRemain--;
|
||||
if(pThis->iOctetsRemain < 1) {
|
||||
/* we have end of frame! */
|
||||
pThis->DoSubmitMessage(pThis, pThis->pMsg, pThis->iMsg);
|
||||
defaultDoSubmitMessage(pThis);
|
||||
pThis->inputState = eAtStrtFram;
|
||||
}
|
||||
}
|
||||
|
||||
43
tests/DiagTalker.java
Normal file
43
tests/DiagTalker.java
Normal file
@ -0,0 +1,43 @@
|
||||
//package com.rsyslog.diag;
|
||||
import java.io.*;
|
||||
import java.net.*;
|
||||
|
||||
public class DiagTalker {
|
||||
public static void main(String[] args) throws IOException {
|
||||
|
||||
Socket diagSocket = null;
|
||||
PrintWriter out = null;
|
||||
BufferedReader in = null;
|
||||
final String host = "127.0.0.1";
|
||||
final int port = 13500;
|
||||
|
||||
try {
|
||||
diagSocket = new Socket(host, port);
|
||||
out = new PrintWriter(diagSocket.getOutputStream(), true);
|
||||
in = new BufferedReader(new InputStreamReader(
|
||||
diagSocket.getInputStream()));
|
||||
} catch (UnknownHostException e) {
|
||||
System.err.println("can not resolve " + host + "!");
|
||||
System.exit(1);
|
||||
} catch (IOException e) {
|
||||
System.err.println("Couldn't get I/O for "
|
||||
+ "the connection to: " + host + ".");
|
||||
System.exit(1);
|
||||
}
|
||||
|
||||
BufferedReader stdIn = new BufferedReader(
|
||||
new InputStreamReader(System.in));
|
||||
String userInput;
|
||||
|
||||
while ((userInput = stdIn.readLine()) != null) {
|
||||
out.println(userInput);
|
||||
System.out.println("imdiag returns: " + in.readLine());
|
||||
}
|
||||
|
||||
out.close();
|
||||
in.close();
|
||||
stdIn.close();
|
||||
diagSocket.close();
|
||||
}
|
||||
}
|
||||
|
||||
@ -5,9 +5,12 @@ if ENABLE_OMSTDOUT
|
||||
TESTS += omod-if-array.sh parsertest.sh inputname.sh fieldtest.sh
|
||||
endif
|
||||
TESTS_ENVIRONMENT = RSYSLOG_MODDIR='$(abs_top_builddir)'/runtime/.libs/
|
||||
DISTCLEANFILES=rsyslog.pid
|
||||
DISTCLEANFILES=rsyslog.pid '$(abs_top_builddir)'/DiagTalker.class
|
||||
test_files = testbench.h runtime-dummy.c
|
||||
|
||||
check_JAVA = DiagTalker.java
|
||||
#dist_java_JAVA = DiagTalker.java
|
||||
|
||||
EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \
|
||||
cfg1.cfgtest \
|
||||
cfg1.testin \
|
||||
@ -47,6 +50,7 @@ EXTRA_DIST= 1.rstest 2.rstest 3.rstest err1.rstest \
|
||||
testsuites/1.inputname_imtcp_12516 \
|
||||
omod-if-array.sh \
|
||||
waitqueueempty.sh \
|
||||
DiagTalker.java \
|
||||
cfg.sh
|
||||
|
||||
ourtail_SOURCES = ourtail.c
|
||||
|
||||
@ -4,6 +4,10 @@
|
||||
# memory to disk mode for DA queues.
|
||||
# added 2009-04-17 by Rgerhards
|
||||
# This file is part of the rsyslog project, released under GPLv3
|
||||
# uncomment for debugging support:
|
||||
#set -o xtrace
|
||||
#export RSYSLOG_DEBUG="debug nostdout"
|
||||
#export RSYSLOG_DEBUGLOG="tmp"
|
||||
echo testing queue disk-only mode
|
||||
rm -rf test-spool
|
||||
mkdir test-spool
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
# wait until main message queue is empty. This is currently done in
|
||||
# a separate shell script so that we can change the implementation
|
||||
# at some later point. -- rgerhards, 2009-05-25
|
||||
echo WaitMainQueueEmpty | nc 127.0.0.1 13500
|
||||
#echo WaitMainQueueEmpty | nc 127.0.0.1 13500
|
||||
echo WaitMainQueueEmpty | java -classpath $abs_top_builddir DiagTalker
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user