improved session recovery when outbound tcp connection breaks, reduces

probability of message loss at the price of a highly unlikely potential
    (single) message duplication
This commit is contained in:
Rainer Gerhards 2008-03-12 14:50:35 +00:00
parent 618a7f6a22
commit 1bc3337f00
4 changed files with 39 additions and 8 deletions

View File

@ -37,6 +37,9 @@ Version 3.12.1 (rgerhards), 2008-03-06
- added ability to compile on HP UX; verified that imudp worked on HP UX;
however, we are still in need of people trying out rsyslogd on HP UX,
so it can not yet be assumed it runs there
- improved session recovery when outbound tcp connection breaks, reduces
probability of message loss at the price of a highly unlikely potential
(single) message duplication
---------------------------------------------------------------------------
Version 3.12.0 (rgerhards), 2008-02-28
- added full expression support for filters; filters can now contain

View File

@ -259,12 +259,13 @@ static rsRetVal TCPSendFrame(void *pvData, char *msg, size_t len)
*/
static rsRetVal TCPSendPrepRetry(void *pvData)
{
DEFiRet;
instanceData *pData = (instanceData *) pvData;
assert(pData != NULL);
close(pData->sock);
pData->sock = -1;
return RS_RET_OK;
RETiRet;
}

View File

@ -303,12 +303,39 @@ Send(tcpclt_t *pThis, void *pData, char *msg, size_t len)
CHKiRet(pThis->initFunc(pData));
iRet = pThis->sendFunc(pData, msg, len);
if(iRet == RS_RET_OK || retry > 0) {
/* we are done - either we succeeded or the retry failed */
if(iRet == RS_RET_OK) {
/* we are done, we also use this as indication that the previous
* message was succesfully received (it's not always the case, but its at
* least our best shot at it -- rgerhards, 2008-03-12
*/
if(pThis->prevMsg != NULL)
free(pThis->prevMsg);
/* if we can not alloc a new buffer, we silently ignore it. The worst that
* happens is that we lose our message recovery buffer - anything else would
* be worse, so don't try anything ;) -- rgerhards, 2008-03-12
*/
if((pThis->prevMsg = malloc(len)) != NULL) {
memcpy(pThis->prevMsg, msg, len);
pThis->lenPrevMsg = len;
}
/* we are done with this record */
bDone = 1;
} else { /* OK, one retry */
} else {
if(retry == 0) { /* OK, one retry */
++retry;
CHKiRet(pThis->prepRetryFunc(pData)); /* try to recover */
/* now try to send our stored previous message (which most probably
* didn't make it
*/
if(pThis->prevMsg != NULL) {
CHKiRet(pThis->initFunc(pData));
CHKiRet(pThis->sendFunc(pData, pThis->prevMsg, pThis->lenPrevMsg));
}
} else {
/* OK, max number of retries reached, nothing we can do */
bDone = 1;
}
}
}
@ -414,7 +441,6 @@ ENDobjQueryInterface(tcpclt)
BEGINObjClassExit(tcpclt, OBJ_IS_LOADABLE_MODULE) /* CHANGE class also in END MACRO! */
CODESTARTObjClassExit(tcpclt)
/* release objects we no longer need */
//objRelease(net, LM_NET_FILENAME);
ENDObjClassExit(tcpclt)
@ -424,7 +450,6 @@ ENDObjClassExit(tcpclt)
*/
BEGINObjClassInit(tcpclt, 1, OBJ_IS_LOADABLE_MODULE) /* class, version - CHANGE class also in END MACRO! */
/* request objects we use */
//CHKiRet(objUse(net, LM_NET_FILENAME));
/* set our own handlers */
OBJSetMethodHandler(objMethod_CONSTRUCTION_FINALIZER, tcpcltConstructFinalize);

View File

@ -33,6 +33,8 @@
typedef struct tcpclt_s {
BEGINobjInstance; /**< Data to implement generic object - MUST be the first data element! */
TCPFRAMINGMODE tcp_framing;
char *prevMsg;
size_t lenPrevMsg;
/* session specific callbacks */
rsRetVal (*initFunc)(void*);
rsRetVal (*sendFunc)(void*, char*, size_t);