added flow control options to other input sources

This commit is contained in:
Rainer Gerhards 2008-03-19 07:50:47 +00:00
parent 860bcbfe3d
commit 13c32d03f3
7 changed files with 23 additions and 12 deletions

View File

@ -1,5 +1,7 @@
---------------------------------------------------------------------------
Version 3.12.4 (rgerhards), 2008-03-??
- added flow control options to other input sources
- bugfix/doc: removed no longer supported -h option from man page
---------------------------------------------------------------------------
Version 3.12.3 (rgerhards), 2008-03-18
- added advanced flow control for congestion cases (mode depending on message

View File

@ -133,7 +133,7 @@ static rsRetVal writeSyslogV(int iPRI, const char *szFmt, va_list va)
/* here we must create our message object and supply it to the message queue
*/
CHKiRet(parseAndSubmitMessage(LocalHostName, msgBuf, strlen(msgBuf), MSG_DONT_PARSE_HOSTNAME, NOFLAG));
CHKiRet(parseAndSubmitMessage(LocalHostName, msgBuf, strlen(msgBuf), MSG_DONT_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_LIGHT_DELAY));
finalize_it:
RETiRet;

View File

@ -192,7 +192,7 @@ CODESTARTrunInput
if(net.isAllowedSender(net.pAllowedSenders_UDP,
(struct sockaddr *)&frominet, (char*)fromHostFQDN)) {
parseAndSubmitMessage((char*)fromHost, (char*) pRcvBuf, l,
MSG_PARSE_HOSTNAME, NOFLAG);
MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_NO_DELAY);
} else {
dbgprintf("%s is not an allowed sender\n", (char*)fromHostFQDN);
if(option_DisallowWarning) {

View File

@ -181,7 +181,7 @@ static rsRetVal readSocket(int fd, int bParseHost, int flags)
iRcvd = recv(fd, line, MAXLINE - 1, 0);
dbgprintf("Message from UNIX socket: #%d\n", fd);
if (iRcvd > 0) {
parseAndSubmitMessage(LocalHostName, line, iRcvd, bParseHost, flags);
parseAndSubmitMessage(LocalHostName, line, iRcvd, bParseHost, flags, eFLOWCTL_LIGHT_DELAY);
} else if (iRcvd < 0 && errno != EINTR) {
char errStr[1024];
rs_strerror_r(errno, errStr, sizeof(errStr));

View File

@ -630,8 +630,12 @@ void untty(void)
* printchopped(). rgerhards 2005-10-06
* rgerhards: 2008-03-06: added "flags" to allow an input module to specify
* flags, most importantly to request ignoring the messages' timestamp.
*
* rgerhards, 2008-03-19:
* I added an additional calling parameter to permit specifying the flow
* control capability of the source.
*/
rsRetVal printline(char *hname, char *msg, int bParseHost, int flags)
rsRetVal printline(char *hname, char *msg, int bParseHost, int flags, flowControl_t flowCtlType)
{
DEFiRet;
register char *p;
@ -641,6 +645,7 @@ rsRetVal printline(char *hname, char *msg, int bParseHost, int flags)
/* Now it is time to create the message object (rgerhards)
*/
CHKiRet(msgConstruct(&pMsg));
MsgSetFlowControlType(pMsg, flowCtlType);
MsgSetRawMsg(pMsg, msg);
pMsg->bParseHOSTNAME = bParseHost;
@ -716,9 +721,13 @@ finalize_it:
* It also has been adopted to our usual calling interface, but currently does
* not provide any useful return states. But we now have the hook and things can
* improve in the future. <-- TODO!
*
* rgerhards, 2008-03-19:
* I added an additional calling parameter to permit specifying the flow
* control capability of the source.
*/
rsRetVal
parseAndSubmitMessage(char *hname, char *msg, int len, int bParseHost, int flags)
parseAndSubmitMessage(char *hname, char *msg, int len, int bParseHost, int flags, flowControl_t flowCtlType)
{
DEFiRet;
register int iMsg;
@ -817,7 +826,7 @@ parseAndSubmitMessage(char *hname, char *msg, int len, int bParseHost, int flags
*/
if(iMsg == MAXLINE) {
*(pMsg + iMsg) = '\0'; /* space *is* reserved for this! */
printline(hname, tmpline, bParseHost, flags);
printline(hname, tmpline, bParseHost, flags, flowCtlType);
} else {
/* This case in theory never can happen. If it happens, we have
* a logic error. I am checking for it, because if I would not,
@ -869,7 +878,7 @@ parseAndSubmitMessage(char *hname, char *msg, int len, int bParseHost, int flags
*(pMsg + iMsg) = '\0'; /* space *is* reserved for this! */
/* typically, we should end up here! */
printline(hname, tmpline, bParseHost, flags);
printline(hname, tmpline, bParseHost, flags, flowCtlType);
finalize_it:
RETiRet;

View File

@ -118,7 +118,7 @@ typedef struct filed selector_t; /* new type name */
#define MSG_PARSE_HOSTNAME 1
#define MSG_DONT_PARSE_HOSTNAME 0
rsRetVal parseAndSubmitMessage(char *hname, char *msg, int len, int bParseHost, int flags);
rsRetVal parseAndSubmitMessage(char *hname, char *msg, int len, int bParseHost, int flags, flowControl_t flowCtlType);
#include "net.h" /* TODO: remove when you remoe isAllowedSender from here! */
void untty(void);
rsRetVal selectorConstruct(selector_t **ppThis);

View File

@ -209,7 +209,7 @@ PrepareClose(tcps_sess_t *pThis)
* this case.
*/
dbgprintf("Extra data at end of stream in legacy syslog/tcp message - processing\n");
parseAndSubmitMessage(pThis->fromHost, pThis->msg, pThis->iMsg, MSG_PARSE_HOSTNAME, NOFLAG);
parseAndSubmitMessage(pThis->fromHost, pThis->msg, pThis->iMsg, MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_LIGHT_DELAY);
pThis->bAtStrtOfFram = 1;
}
@ -290,7 +290,7 @@ processDataRcvd(tcps_sess_t *pThis, char c)
if(pThis->iMsg >= MAXLINE) {
/* emergency, we now need to flush, no matter if we are at end of message or not... */
dbgprintf("error: message received is larger than MAXLINE, we split it\n");
parseAndSubmitMessage(pThis->fromHost, pThis->msg, pThis->iMsg, MSG_PARSE_HOSTNAME, NOFLAG);
parseAndSubmitMessage(pThis->fromHost, pThis->msg, pThis->iMsg, MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_LIGHT_DELAY);
pThis->iMsg = 0;
/* we might think if it is better to ignore the rest of the
* message than to treat it as a new one. Maybe this is a good
@ -300,7 +300,7 @@ processDataRcvd(tcps_sess_t *pThis, char c)
}
if(c == '\n' && pThis->eFraming == TCP_FRAMING_OCTET_STUFFING) { /* record delemiter? */
parseAndSubmitMessage(pThis->fromHost, pThis->msg, pThis->iMsg, MSG_PARSE_HOSTNAME, NOFLAG);
parseAndSubmitMessage(pThis->fromHost, pThis->msg, pThis->iMsg, MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_LIGHT_DELAY);
pThis->iMsg = 0;
pThis->inputState = eAtStrtFram;
} else {
@ -318,7 +318,7 @@ processDataRcvd(tcps_sess_t *pThis, char c)
pThis->iOctetsRemain--;
if(pThis->iOctetsRemain < 1) {
/* we have end of frame! */
parseAndSubmitMessage(pThis->fromHost, pThis->msg, pThis->iMsg, MSG_PARSE_HOSTNAME, NOFLAG);
parseAndSubmitMessage(pThis->fromHost, pThis->msg, pThis->iMsg, MSG_PARSE_HOSTNAME, NOFLAG, eFLOWCTL_LIGHT_DELAY);
pThis->iMsg = 0;
pThis->inputState = eAtStrtFram;
}