mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-17 08:10:43 +01:00
changed Rcv-Interface in tcpsrv subsystem
It is now iRet based. This enables us to communicate more in-depth information to the upper peers. This is needed to handle the EGAIN case on rcv (not yet implemented)
This commit is contained in:
parent
716ab25446
commit
7b1a570d54
@ -68,7 +68,7 @@ MODULE_TYPE_INPUT
|
|||||||
static rsRetVal addGSSListener(void __attribute__((unused)) *pVal, uchar *pNewVal);
|
static rsRetVal addGSSListener(void __attribute__((unused)) *pVal, uchar *pNewVal);
|
||||||
static int TCPSessGSSInit(void);
|
static int TCPSessGSSInit(void);
|
||||||
static void TCPSessGSSClose(tcps_sess_t* pSess);
|
static void TCPSessGSSClose(tcps_sess_t* pSess);
|
||||||
static int TCPSessGSSRecv(tcps_sess_t *pSess, void *buf, size_t buf_len);
|
static rsRetVal TCPSessGSSRecv(tcps_sess_t *pSess, void *buf, size_t buf_len, ssize_t *);
|
||||||
static rsRetVal onSessAccept(tcpsrv_t *pThis, tcps_sess_t *ppSess);
|
static rsRetVal onSessAccept(tcpsrv_t *pThis, tcps_sess_t *ppSess);
|
||||||
static rsRetVal OnSessAcceptGSS(tcpsrv_t *pThis, tcps_sess_t *ppSess);
|
static rsRetVal OnSessAcceptGSS(tcpsrv_t *pThis, tcps_sess_t *ppSess);
|
||||||
|
|
||||||
@ -274,25 +274,28 @@ finalize_it:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int
|
static rsRetVal
|
||||||
doRcvData(tcps_sess_t *pSess, char *buf, size_t lenBuf)
|
doRcvData(tcps_sess_t *pSess, char *buf, size_t lenBuf, ssize_t *piLenRcvd)
|
||||||
{
|
{
|
||||||
ssize_t state;
|
DEFiRet;
|
||||||
int allowedMethods;
|
int allowedMethods;
|
||||||
gss_sess_t *pGSess;
|
gss_sess_t *pGSess;
|
||||||
|
|
||||||
assert(pSess != NULL);
|
assert(pSess != NULL);
|
||||||
assert(pSess->pUsr != NULL);
|
assert(pSess->pUsr != NULL);
|
||||||
pGSess = (gss_sess_t*) pSess->pUsr;
|
pGSess = (gss_sess_t*) pSess->pUsr;
|
||||||
|
assert(piLenRcvd != NULL);
|
||||||
|
|
||||||
allowedMethods = pGSess->allowedMethods;
|
allowedMethods = pGSess->allowedMethods;
|
||||||
if(allowedMethods & ALLOWEDMETHOD_GSS)
|
if(allowedMethods & ALLOWEDMETHOD_GSS) {
|
||||||
state = TCPSessGSSRecv(pSess, buf, lenBuf);
|
CHKiRet(TCPSessGSSRecv(pSess, buf, lenBuf, piLenRcvd));
|
||||||
else {
|
} else {
|
||||||
if(netstrm.Rcv(pSess->pStrm, (uchar*) buf, &state) != RS_RET_OK)
|
*piLenRcvd = lenBuf;
|
||||||
state = -1; // TODO: move this function to an iRet interface! 2008-05-05
|
CHKiRet(netstrm.Rcv(pSess->pStrm, (uchar*) buf, piLenRcvd) != RS_RET_OK);
|
||||||
}
|
}
|
||||||
return state;
|
|
||||||
|
finalize_it:
|
||||||
|
RETiRet;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -526,25 +529,26 @@ finalize_it:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* returns: number of bytes read or -1 on error
|
/* Replaces recv() for gssapi connections.
|
||||||
* Replaces recv() for gssapi connections.
|
|
||||||
*/
|
*/
|
||||||
int TCPSessGSSRecv(tcps_sess_t *pSess, void *buf, size_t buf_len)
|
int TCPSessGSSRecv(tcps_sess_t *pSess, void *buf, size_t buf_len, ssize_t *piLenRcvd)
|
||||||
{
|
{
|
||||||
|
DEFiRet;
|
||||||
gss_buffer_desc xmit_buf, msg_buf;
|
gss_buffer_desc xmit_buf, msg_buf;
|
||||||
gss_ctx_id_t *context;
|
gss_ctx_id_t *context;
|
||||||
OM_uint32 maj_stat, min_stat;
|
OM_uint32 maj_stat, min_stat;
|
||||||
int fdSess;
|
int fdSess;
|
||||||
int conf_state;
|
int conf_state;
|
||||||
int state, len;
|
int state;
|
||||||
gss_sess_t *pGSess;
|
gss_sess_t *pGSess;
|
||||||
|
|
||||||
assert(pSess->pUsr != NULL);
|
assert(pSess->pUsr != NULL);
|
||||||
|
assert(piLenRcvd != NULL);
|
||||||
pGSess = (gss_sess_t*) pSess->pUsr;
|
pGSess = (gss_sess_t*) pSess->pUsr;
|
||||||
|
|
||||||
netstrm.GetSock(pSess->pStrm, &fdSess); // TODO: method access, CHKiRet!
|
netstrm.GetSock(pSess->pStrm, &fdSess); // TODO: method access, CHKiRet!
|
||||||
if ((state = gssutil.recv_token(fdSess, &xmit_buf)) <= 0)
|
if ((state = gssutil.recv_token(fdSess, &xmit_buf)) <= 0)
|
||||||
return state;
|
ABORT_FINALIZE(RS_RET_GSS_ERR);
|
||||||
|
|
||||||
context = &pGSess->gss_context;
|
context = &pGSess->gss_context;
|
||||||
maj_stat = gss_unwrap(&min_stat, *context, &xmit_buf, &msg_buf,
|
maj_stat = gss_unwrap(&min_stat, *context, &xmit_buf, &msg_buf,
|
||||||
@ -555,18 +559,19 @@ int TCPSessGSSRecv(tcps_sess_t *pSess, void *buf, size_t buf_len)
|
|||||||
free(xmit_buf.value);
|
free(xmit_buf.value);
|
||||||
xmit_buf.value = 0;
|
xmit_buf.value = 0;
|
||||||
}
|
}
|
||||||
return (-1);
|
ABORT_FINALIZE(RS_RET_GSS_ERR);
|
||||||
}
|
}
|
||||||
if (xmit_buf.value) {
|
if (xmit_buf.value) {
|
||||||
free(xmit_buf.value);
|
free(xmit_buf.value);
|
||||||
xmit_buf.value = 0;
|
xmit_buf.value = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
len = msg_buf.length < buf_len ? msg_buf.length : buf_len;
|
*piLenRcvd = msg_buf.length < buf_len ? msg_buf.length : buf_len;
|
||||||
memcpy(buf, msg_buf.value, len);
|
memcpy(buf, msg_buf.value, *piLenRcvd);
|
||||||
gss_release_buffer(&min_stat, &msg_buf);
|
gss_release_buffer(&min_stat, &msg_buf);
|
||||||
|
|
||||||
return len;
|
finalize_it:
|
||||||
|
RETiRet;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -101,16 +101,17 @@ doOpenLstnSocks(tcpsrv_t *pSrv)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int
|
static rsRetVal
|
||||||
doRcvData(tcps_sess_t *pSess, char *buf, size_t lenBuf)
|
doRcvData(tcps_sess_t *pSess, char *buf, size_t lenBuf, ssize_t *piLenRcvd)
|
||||||
{
|
{
|
||||||
ssize_t state;
|
DEFiRet;
|
||||||
assert(pSess != NULL);
|
assert(pSess != NULL);
|
||||||
|
assert(piLenRcvd != NULL);
|
||||||
|
|
||||||
state = lenBuf;
|
*piLenRcvd = lenBuf;
|
||||||
if(netstrm.Rcv(pSess->pStrm, (uchar*) buf, &state) != RS_RET_OK)
|
CHKiRet(netstrm.Rcv(pSess->pStrm, (uchar*) buf, piLenRcvd) != RS_RET_OK);
|
||||||
state = -1; // TODO: move this function to an iRet interface! 2008-04-23
|
finalize_it:
|
||||||
return state;
|
RETiRet;
|
||||||
}
|
}
|
||||||
|
|
||||||
static rsRetVal
|
static rsRetVal
|
||||||
@ -167,7 +168,6 @@ static rsRetVal addTCPListener(void __attribute__((unused)) *pVal, uchar *pNewVa
|
|||||||
CHKiRet(tcpsrv.SetDrvrMode(pOurTcpsrv, iStrmDrvrMode));
|
CHKiRet(tcpsrv.SetDrvrMode(pOurTcpsrv, iStrmDrvrMode));
|
||||||
/* now set optional params, but only if they were actually configured */
|
/* now set optional params, but only if they were actually configured */
|
||||||
if(pszStrmDrvrAuthMode != NULL) {
|
if(pszStrmDrvrAuthMode != NULL) {
|
||||||
RUNLOG_VAR("%s", pszStrmDrvrAuthMode);
|
|
||||||
CHKiRet(tcpsrv.SetDrvrAuthMode(pOurTcpsrv, pszStrmDrvrAuthMode));
|
CHKiRet(tcpsrv.SetDrvrAuthMode(pOurTcpsrv, pszStrmDrvrAuthMode));
|
||||||
}
|
}
|
||||||
if(pPermPeersRoot != NULL) {
|
if(pPermPeersRoot != NULL) {
|
||||||
|
|||||||
@ -1388,9 +1388,15 @@ Rcv(nsd_t *pNsd, uchar *pBuf, ssize_t *pLenBuf)
|
|||||||
/* in TLS mode now */
|
/* in TLS mode now */
|
||||||
lenRcvd = gnutls_record_recv(pThis->sess, pBuf, *pLenBuf);
|
lenRcvd = gnutls_record_recv(pThis->sess, pBuf, *pLenBuf);
|
||||||
if(lenRcvd < 0) {
|
if(lenRcvd < 0) {
|
||||||
int gnuRet; /* TODO: build a specific function for GnuTLS error reporting */
|
if(lenRcvd == GNUTLS_E_AGAIN || lenRcvd == GNUTLS_E_INTERRUPTED) {
|
||||||
*pLenBuf = -1;
|
pThis->rtryCall = gtlsRtry_recv;
|
||||||
CHKgnutls(lenRcvd); /* this will abort the function */
|
dbgprintf("GnuTLS receive requires a retry (this most probably is OK and no error condition)\n");
|
||||||
|
iRet = RS_RET_RETRY;
|
||||||
|
} else {
|
||||||
|
int gnuRet; /* TODO: build a specific function for GnuTLS error reporting */
|
||||||
|
*pLenBuf = -1;
|
||||||
|
CHKgnutls(lenRcvd); /* this will abort the function */
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
*pLenBuf = lenRcvd;
|
*pLenBuf = lenRcvd;
|
||||||
|
|||||||
@ -28,7 +28,8 @@
|
|||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
gtlsRtry_None = 0, /**< no call needs to be retried */
|
gtlsRtry_None = 0, /**< no call needs to be retried */
|
||||||
gtlsRtry_handshake = 1
|
gtlsRtry_handshake = 1,
|
||||||
|
gtlsRtry_recv = 2
|
||||||
} gtlsRtryCall_t; /**< IDs of calls that needs to be retried */
|
} gtlsRtryCall_t; /**< IDs of calls that needs to be retried */
|
||||||
|
|
||||||
typedef nsd_if_t nsd_gtls_if_t; /* we just *implement* this interface */
|
typedef nsd_if_t nsd_gtls_if_t; /* we just *implement* this interface */
|
||||||
|
|||||||
@ -238,6 +238,9 @@ enum rsRetVal_ /** return value. All methods return this if not specified oth
|
|||||||
RS_RET_FILE_NO_STAT = -2096, /**< can not stat() a file */
|
RS_RET_FILE_NO_STAT = -2096, /**< can not stat() a file */
|
||||||
RS_RET_FILE_TOO_LARGE = -2097, /**< a file is larger than permitted */
|
RS_RET_FILE_TOO_LARGE = -2097, /**< a file is larger than permitted */
|
||||||
RS_RET_INVALID_WILDCARD = -2098, /**< a wildcard entry is invalid */
|
RS_RET_INVALID_WILDCARD = -2098, /**< a wildcard entry is invalid */
|
||||||
|
RS_RET_CLOSED = -2099, /**< connection was closed */
|
||||||
|
RS_RET_RETRY = -2100, /**< call should be retried (e.g. EGAIN on recv) */
|
||||||
|
RS_RET_GSS_ERR = -2101, /**< generic error occured in GSSAPI subsystem */
|
||||||
|
|
||||||
/* RainerScript error messages (range 1000.. 1999) */
|
/* RainerScript error messages (range 1000.. 1999) */
|
||||||
RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */
|
RS_RET_SYSVAR_NOT_FOUND = 1001, /**< system variable could not be found (maybe misspelled) */
|
||||||
|
|||||||
13
tcpsrv.c
13
tcpsrv.c
@ -404,6 +404,7 @@ Run(tcpsrv_t *pThis)
|
|||||||
tcps_sess_t *pNewSess;
|
tcps_sess_t *pNewSess;
|
||||||
nssel_t *pSel;
|
nssel_t *pSel;
|
||||||
int state;
|
int state;
|
||||||
|
ssize_t iRcvd;
|
||||||
|
|
||||||
ISOBJ_TYPE_assert(pThis, tcpsrv);
|
ISOBJ_TYPE_assert(pThis, tcpsrv);
|
||||||
|
|
||||||
@ -452,11 +453,13 @@ Run(tcpsrv_t *pThis)
|
|||||||
dbgprintf("netstream %p with new data\n", pThis->pSessions[iTCPSess]->pStrm);
|
dbgprintf("netstream %p with new data\n", pThis->pSessions[iTCPSess]->pStrm);
|
||||||
|
|
||||||
/* Receive message */
|
/* Receive message */
|
||||||
state = pThis->pRcvData(pThis->pSessions[iTCPSess], buf, sizeof(buf));
|
iRet = pThis->pRcvData(pThis->pSessions[iTCPSess], buf, sizeof(buf), &iRcvd);
|
||||||
if(state == 0) {
|
if(iRet == RS_RET_CLOSED) {
|
||||||
pThis->pOnRegularClose(pThis->pSessions[iTCPSess]);
|
pThis->pOnRegularClose(pThis->pSessions[iTCPSess]);
|
||||||
tcps_sess.Destruct(&pThis->pSessions[iTCPSess]);
|
tcps_sess.Destruct(&pThis->pSessions[iTCPSess]);
|
||||||
} else if(state == -1) {
|
} else if(iRet == RS_RET_RETRY) {
|
||||||
|
/* we simply ignore retry - this is not an error, but we also have not received anything */
|
||||||
|
} else if(iRet == RS_RET_OK) {
|
||||||
errno = 0;
|
errno = 0;
|
||||||
errmsg.LogError(NO_ERRCODE, "netstream session %p will be closed due to error\n",
|
errmsg.LogError(NO_ERRCODE, "netstream session %p will be closed due to error\n",
|
||||||
pThis->pSessions[iTCPSess]->pStrm);
|
pThis->pSessions[iTCPSess]->pStrm);
|
||||||
@ -464,7 +467,7 @@ Run(tcpsrv_t *pThis)
|
|||||||
tcps_sess.Destruct(&pThis->pSessions[iTCPSess]);
|
tcps_sess.Destruct(&pThis->pSessions[iTCPSess]);
|
||||||
} else {
|
} else {
|
||||||
/* valid data received, process it! */
|
/* valid data received, process it! */
|
||||||
if(tcps_sess.DataRcvd(pThis->pSessions[iTCPSess], buf, state) != RS_RET_OK) {
|
if(tcps_sess.DataRcvd(pThis->pSessions[iTCPSess], buf, iRcvd) != RS_RET_OK) {
|
||||||
/* in this case, something went awfully wrong.
|
/* in this case, something went awfully wrong.
|
||||||
* We are instructed to terminate the session.
|
* We are instructed to terminate the session.
|
||||||
*/
|
*/
|
||||||
@ -563,7 +566,7 @@ SetCBIsPermittedHost(tcpsrv_t *pThis, int (*pCB)(struct sockaddr *addr, char *fr
|
|||||||
}
|
}
|
||||||
|
|
||||||
static rsRetVal
|
static rsRetVal
|
||||||
SetCBRcvData(tcpsrv_t *pThis, int (*pRcvData)(tcps_sess_t*, char*, size_t))
|
SetCBRcvData(tcpsrv_t *pThis, rsRetVal (*pRcvData)(tcps_sess_t*, char*, size_t, ssize_t*))
|
||||||
{
|
{
|
||||||
DEFiRet;
|
DEFiRet;
|
||||||
pThis->pRcvData = pRcvData;
|
pThis->pRcvData = pRcvData;
|
||||||
|
|||||||
4
tcpsrv.h
4
tcpsrv.h
@ -40,7 +40,7 @@ struct tcpsrv_s {
|
|||||||
void *pUsr; /**< a user-settable pointer (provides extensibility for "derived classes")*/
|
void *pUsr; /**< a user-settable pointer (provides extensibility for "derived classes")*/
|
||||||
/* callbacks */
|
/* callbacks */
|
||||||
int (*pIsPermittedHost)(struct sockaddr *addr, char *fromHostFQDN, void*pUsrSrv, void*pUsrSess);
|
int (*pIsPermittedHost)(struct sockaddr *addr, char *fromHostFQDN, void*pUsrSrv, void*pUsrSess);
|
||||||
int (*pRcvData)(tcps_sess_t*, char*, size_t);
|
rsRetVal (*pRcvData)(tcps_sess_t*, char*, size_t, ssize_t *);
|
||||||
rsRetVal (*OpenLstnSocks)(struct tcpsrv_s*);
|
rsRetVal (*OpenLstnSocks)(struct tcpsrv_s*);
|
||||||
rsRetVal (*pOnListenDeinit)(void*);
|
rsRetVal (*pOnListenDeinit)(void*);
|
||||||
rsRetVal (*OnDestruct)(void*);
|
rsRetVal (*OnDestruct)(void*);
|
||||||
@ -67,7 +67,7 @@ BEGINinterface(tcpsrv) /* name must also be changed in ENDinterface macro! */
|
|||||||
rsRetVal (*SetUsrP)(tcpsrv_t*, void*);
|
rsRetVal (*SetUsrP)(tcpsrv_t*, void*);
|
||||||
rsRetVal (*SetCBIsPermittedHost)(tcpsrv_t*, int (*) (struct sockaddr *addr, char*, void*, void*));
|
rsRetVal (*SetCBIsPermittedHost)(tcpsrv_t*, int (*) (struct sockaddr *addr, char*, void*, void*));
|
||||||
rsRetVal (*SetCBOpenLstnSocks)(tcpsrv_t *, rsRetVal (*)(tcpsrv_t*));
|
rsRetVal (*SetCBOpenLstnSocks)(tcpsrv_t *, rsRetVal (*)(tcpsrv_t*));
|
||||||
rsRetVal (*SetCBRcvData)(tcpsrv_t *, int (*)(tcps_sess_t*, char*, size_t));
|
rsRetVal (*SetCBRcvData)(tcpsrv_t *pThis, rsRetVal (*pRcvData)(tcps_sess_t*, char*, size_t, ssize_t*));
|
||||||
rsRetVal (*SetCBOnListenDeinit)(tcpsrv_t*, rsRetVal (*)(void*));
|
rsRetVal (*SetCBOnListenDeinit)(tcpsrv_t*, rsRetVal (*)(void*));
|
||||||
rsRetVal (*SetCBOnDestruct)(tcpsrv_t*, rsRetVal (*) (void*));
|
rsRetVal (*SetCBOnDestruct)(tcpsrv_t*, rsRetVal (*) (void*));
|
||||||
rsRetVal (*SetCBOnRegularClose)(tcpsrv_t*, rsRetVal (*) (tcps_sess_t*));
|
rsRetVal (*SetCBOnRegularClose)(tcpsrv_t*, rsRetVal (*) (tcps_sess_t*));
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user