core: add fromhost-port message property

Some deployments need to disambiguate multiple senders sharing an IP,
for example autossh or similar tunnel setups. Exposing the source port
improves observability and lets pipelines key on a stable tuple.

Impact: new property/JSON field; tcps_sess IF v4; out-of-tree modules
must rebuild.

Before: messages exposed fromhost and fromhost-ip only.
After:  messages also expose fromhost-port and jsonmesg includes it.

Introduce PROP_FROMHOST_PORT and wire it through msg.{h,c}. For TCP,
capture the remote port on accept, store it in tcps_sess, and attach it
to the msg on submit. For other inputs, resolveDNS derives the port from
the sockaddr when available; local inputs return an empty string. Add a
getter, duplication and destructor handling, and name<->ID mapping. Add
the field to jsonmesg output. Update docs, lexer keywords, and the
external plugin interface doc (property is modifiable). Bump
tcps_sessCURR_IF_VERSION to 4 and add SetHostPort() to the interface.
Include a focused test (fromhost-port.sh) that verifies the property.

Non-technical rationale: allow identification by (fromhost-ip,
fromhost-port) where IP alone is shared across systems (e.g., autossh).

With help from AI-Agents: ChatGPT
This commit is contained in:
Rainer Gerhards 2025-09-02 16:09:47 +02:00
parent bc9a587de3
commit c89113d531
No known key found for this signature in database
GPG Key ID: 0CB6B2A8BE80B499
13 changed files with 213 additions and 3 deletions

View File

@ -43,7 +43,7 @@ class RainerScriptLexer(RegexLexer):
'syslogtag', 'protocol-version', 'structured-data', 'app-name',
'procid', 'msgid', 'pri', 'pri-text', 'syslogfacility',
'syslogfacility-text', 'syslogseverity', 'syslogseverity-text',
'fromhost', 'fromhost-ip', 'remotehost', 'remotehost-ip', 'timereported',
'fromhost', 'fromhost-ip', 'fromhost-port', 'remotehost', 'remotehost-ip', 'timereported',
'timegenerated', 'timestamp', 'json', 'jsonmesg', 'jsonf'
]

View File

@ -66,6 +66,10 @@ The following message properties exist:
The same as fromhost, but always as an IP address. Local inputs (like
imklog) use 127.0.0.1 in this property.
**fromhost-port**
The same as fromhost, but contains the numeric source port of the
sender. Local inputs provide an empty string.
**syslogtag**
TAG from the message

View File

@ -327,6 +327,7 @@ Most message properties can be modified. Modifiable are:
* hostname (aliased "source")
* fromhost
* fromhost-ip
* fromhost-port
* all message variable ("$!" tree)
If the message variable tree is modified, new variables may also be *added*. Deletion

View File

@ -36,6 +36,7 @@
#include <assert.h>
#include <ctype.h>
#include <sys/socket.h>
#include <netinet/in.h>
#ifdef HAVE_SYSINFO_UPTIME
#include <sys/sysinfo.h>
#endif
@ -262,6 +263,11 @@ static inline void MsgSetRcvFromIPWithoutAddRef(smsg_t *pThis, prop_t *new) {
pThis->pRcvFromIP = new;
}
static inline void MsgSetRcvFromPortWithoutAddRef(smsg_t *pThis, prop_t *new) {
if (pThis->pRcvFromPort != NULL) prop.Destruct(&pThis->pRcvFromPort);
pThis->pRcvFromPort = new;
}
/* set RcvFrom name in msg object WITHOUT calling AddRef.
* rgerhards, 2013-01-22
@ -305,6 +311,9 @@ static rsRetVal resolveDNS(smsg_t *const pMsg) {
prop_t *propFromHost = NULL;
prop_t *ip;
prop_t *localName;
prop_t *port = NULL;
char portbuf[8];
uint16_t pnum;
DEFiRet;
MsgLock(pMsg);
@ -319,6 +328,19 @@ static rsRetVal resolveDNS(smsg_t *const pMsg) {
/* we pass down the props, so no need for AddRef */
MsgSetRcvFromWithoutAddRef(pMsg, localName);
MsgSetRcvFromIPWithoutAddRef(pMsg, ip);
if (pMsg->pRcvFromPort == NULL) {
if (pMsg->rcvFrom.pfrominet->ss_family == AF_INET)
pnum = ntohs(((struct sockaddr_in *)pMsg->rcvFrom.pfrominet)->sin_port);
else if (pMsg->rcvFrom.pfrominet->ss_family == AF_INET6)
pnum = ntohs(((struct sockaddr_in6 *)pMsg->rcvFrom.pfrominet)->sin6_port);
else
pnum = 0;
snprintf(portbuf, sizeof(portbuf), "%u", pnum);
CHKiRet(prop.CreateStringProp(&port, (uchar *)portbuf, strlen(portbuf)));
MsgSetRcvFromPortWithoutAddRef(pMsg, port);
port = NULL;
}
}
}
finalize_it:
@ -329,6 +351,7 @@ finalize_it:
}
MsgUnlock(pMsg);
if (propFromHost != NULL) prop.Destruct(&propFromHost);
if (port != NULL) prop.Destruct(&port);
RETiRet;
}
@ -358,6 +381,21 @@ static uchar *getRcvFromIP(smsg_t *const pM) {
return psz;
}
static uchar *getRcvFromPort(smsg_t *const pM) {
uchar *psz;
int len;
if (pM == NULL) {
psz = UCHAR_CONSTANT("");
} else {
resolveDNS(pM);
if (pM->pRcvFromPort == NULL)
psz = UCHAR_CONSTANT("");
else
prop.GetString(pM->pRcvFromPort, &psz, &len);
}
return psz;
}
/* map a property name (string) to a property ID */
rsRetVal propNameToID(const uchar *const pName, propid_t *const pPropID) {
@ -383,6 +421,8 @@ rsRetVal propNameToID(const uchar *const pName, propid_t *const pPropID) {
*pPropID = PROP_FROMHOST;
} else if (!strcasecmp((char *)pName, "fromhost-ip")) {
*pPropID = PROP_FROMHOST_IP;
} else if (!strcasecmp((char *)pName, "fromhost-port")) {
*pPropID = PROP_FROMHOST_PORT;
} else if (!strcasecmp((char *)pName, "pri")) {
*pPropID = PROP_PRI;
} else if (!strcasecmp((char *)pName, "pri-text")) {
@ -505,6 +545,8 @@ uchar *propIDToName(propid_t propID) {
return UCHAR_CONSTANT("fromhost");
case PROP_FROMHOST_IP:
return UCHAR_CONSTANT("fromhost-ip");
case PROP_FROMHOST_PORT:
return UCHAR_CONSTANT("fromhost-port");
case PROP_PRI:
return UCHAR_CONSTANT("pri");
case PROP_PRI_TEXT:
@ -660,6 +702,7 @@ static rsRetVal msgBaseConstruct(smsg_t **ppThis) {
pM->pCSMSGID = NULL;
pM->pInputName = NULL;
pM->pRcvFromIP = NULL;
pM->pRcvFromPort = NULL;
pM->rcvFrom.pRcvFrom = NULL;
pM->pRuleset = NULL;
pM->json = NULL;
@ -790,6 +833,7 @@ rsRetVal msgDestruct(smsg_t **ppThis) {
free(pThis->rcvFrom.pfrominet);
}
if (pThis->pRcvFromIP != NULL) prop.Destruct(&pThis->pRcvFromIP);
if (pThis->pRcvFromPort != NULL) prop.Destruct(&pThis->pRcvFromPort);
free(pThis->pszRcvdAt3164);
free(pThis->pszRcvdAt3339);
free(pThis->pszRcvdAt_MySQL);
@ -911,6 +955,10 @@ ENDobjDestruct
pNew->pRcvFromIP = pOld->pRcvFromIP;
prop.AddRef(pNew->pRcvFromIP);
}
if (pOld->pRcvFromPort != NULL) {
pNew->pRcvFromPort = pOld->pRcvFromPort;
prop.AddRef(pNew->pRcvFromPort);
}
if (pOld->pInputName != NULL) {
pNew->pInputName = pOld->pInputName;
prop.AddRef(pNew->pInputName);
@ -2103,6 +2151,9 @@ const uchar *msgGetJSONMESG(smsg_t *__restrict__ const pMsg) {
jval = json_object_new_string((char *)getRcvFromIP(pMsg));
json_object_object_add(json, "fromhost-ip", jval);
jval = json_object_new_string((char *)getRcvFromPort(pMsg));
json_object_object_add(json, "fromhost-port", jval);
jval = json_object_new_string(getPRI(pMsg));
json_object_object_add(json, "pri", jval);
@ -2489,6 +2540,25 @@ finalize_it:
RETiRet;
}
rsRetVal MsgSetRcvFromPort(smsg_t *pThis, prop_t *new) {
assert(pThis != NULL);
prop.AddRef(new);
MsgSetRcvFromPortWithoutAddRef(pThis, new);
return RS_RET_OK;
}
rsRetVal MsgSetRcvFromPortStr(smsg_t *const pThis, const uchar *psz, const int len, prop_t **ppProp) {
DEFiRet;
assert(pThis != NULL);
CHKiRet(prop.CreateOrReuseStringProp(ppProp, psz, len));
MsgSetRcvFromPort(pThis, *ppProp);
finalize_it:
RETiRet;
}
/* rgerhards 2004-11-09: set HOSTNAME in msg object
* rgerhards, 2007-06-21:
@ -3339,6 +3409,9 @@ uchar *MsgGetProp(smsg_t *__restrict__ const pMsg,
case PROP_FROMHOST_IP:
pRes = getRcvFromIP(pMsg);
break;
case PROP_FROMHOST_PORT:
pRes = getRcvFromPort(pMsg);
break;
case PROP_PRI:
pRes = (uchar *)getPRI(pMsg);
break;

View File

@ -95,6 +95,7 @@ struct msg {
cstr_t *pCSMSGID; /* MSGID */
prop_t *pInputName; /* input name property */
prop_t *pRcvFromIP; /* IP of system message was received from */
prop_t *pRcvFromPort; /* port of system message was received from */
union {
prop_t *pRcvFrom; /* name of system message was received from */
struct sockaddr_storage *pfrominet; /* unresolved name */
@ -194,6 +195,8 @@ void MsgSetRcvFrom(smsg_t *pMsg, prop_t *);
void MsgSetRcvFromStr(smsg_t *const pMsg, const uchar *pszRcvFrom, const int, prop_t **);
rsRetVal MsgSetRcvFromIP(smsg_t *pMsg, prop_t *);
rsRetVal MsgSetRcvFromIPStr(smsg_t *const pThis, const uchar *psz, const int len, prop_t **ppProp);
rsRetVal MsgSetRcvFromPort(smsg_t *pMsg, prop_t *);
rsRetVal MsgSetRcvFromPortStr(smsg_t *const pThis, const uchar *psz, const int len, prop_t **ppProp);
void MsgSetHOSTNAME(smsg_t *pMsg, const uchar *pszHOSTNAME, const int lenHOSTNAME);
rsRetVal MsgSetAfterPRIOffs(smsg_t *pMsg, int offs);
void MsgSetMSGoffs(smsg_t *pMsg, int offs);

View File

@ -68,6 +68,9 @@ BEGINobjConstruct(tcps_sess) /* be sure to specify the object type also in END m
pThis->inputState = eAtStrtFram; /* indicate frame header expected */
pThis->eFraming = TCP_FRAMING_OCTET_STUFFING; /* just make sure... */
pthread_mutex_init(&pThis->mut, NULL);
pThis->fromHost = NULL;
pThis->fromHostIP = NULL;
pThis->fromHostPort = NULL;
/* now allocate the message reception buffer */
CHKmalloc(pThis->pMsg = (uchar *)malloc(pThis->iMaxLine + 1));
finalize_it:
@ -100,6 +103,7 @@ BEGINobjDestruct(tcps_sess) /* be sure to specify the object type also in END an
/* now destruct our own properties */
if (pThis->fromHost != NULL) CHKiRet(prop.Destruct(&pThis->fromHost));
if (pThis->fromHostIP != NULL) CHKiRet(prop.Destruct(&pThis->fromHostIP));
if (pThis->fromHostPort != NULL) CHKiRet(prop.Destruct(&pThis->fromHostPort));
free(pThis->pMsg);
ENDobjDestruct(tcps_sess)
@ -144,6 +148,17 @@ static rsRetVal SetHostIP(tcps_sess_t *pThis, prop_t *ip) {
RETiRet;
}
static rsRetVal SetHostPort(tcps_sess_t *pThis, prop_t *port) {
DEFiRet;
ISOBJ_TYPE_assert(pThis, tcps_sess);
if (pThis->fromHostPort != NULL) {
prop.Destruct(&pThis->fromHostPort);
}
pThis->fromHostPort = port;
RETiRet;
}
static rsRetVal SetStrm(tcps_sess_t *pThis, netstrm_t *pStrm) {
DEFiRet;
ISOBJ_TYPE_assert(pThis, tcps_sess);
@ -236,6 +251,7 @@ static rsRetVal defaultDoSubmitMessage(tcps_sess_t *pThis,
pMsg->msgFlags = NEEDS_PARSING | PARSE_HOSTNAME;
MsgSetRcvFrom(pMsg, pThis->fromHost);
CHKiRet(MsgSetRcvFromIP(pMsg, pThis->fromHostIP));
CHKiRet(MsgSetRcvFromPort(pMsg, pThis->fromHostPort));
MsgSetRuleset(pMsg, cnf_params->pRuleset);
STATSCOUNTER_INC(pThis->pLstnInfo->ctrSubmit, pThis->pLstnInfo->mutCtrSubmit);
@ -552,6 +568,7 @@ BEGINobjQueryInterface(tcps_sess)
pIf->SetLstnInfo = SetLstnInfo;
pIf->SetHost = SetHost;
pIf->SetHostIP = SetHostIP;
pIf->SetHostPort = SetHostPort;
pIf->SetStrm = SetStrm;
pIf->SetMsgIdx = SetMsgIdx;
pIf->SetOnMsgReceive = SetOnMsgReceive;

View File

@ -44,6 +44,7 @@ struct tcps_sess_s {
uchar *pMsg; /* message (fragment) received */
prop_t *fromHost; /* host name we received messages from */
prop_t *fromHostIP;
prop_t *fromHostPort;
void *pUsr; /* a user-pointer */
rsRetVal (*DoSubmitMessage)(tcps_sess_t *, uchar *, int); /* submit message callback */
int iMaxLine; /* fast lookup buffer for config property */
@ -66,17 +67,20 @@ BEGINinterface(tcps_sess) /* name must also be changed in ENDinterface macro! */
rsRetVal (*SetUsrP)(tcps_sess_t *, void *);
rsRetVal (*SetHost)(tcps_sess_t *pThis, uchar *);
rsRetVal (*SetHostIP)(tcps_sess_t *pThis, prop_t *);
rsRetVal (*SetHostPort)(tcps_sess_t *pThis, prop_t *);
rsRetVal (*SetStrm)(tcps_sess_t *pThis, netstrm_t *);
rsRetVal (*SetMsgIdx)(tcps_sess_t *pThis, int);
rsRetVal (*SetOnMsgReceive)(tcps_sess_t *pThis, rsRetVal (*OnMsgReceive)(tcps_sess_t *, uchar *, int));
ENDinterface(tcps_sess)
#define tcps_sessCURR_IF_VERSION 3 /* increment whenever you change the interface structure! */
#define tcps_sessCURR_IF_VERSION 4 /* increment whenever you change the interface structure! */
/* interface changes
* to version v2, rgerhards, 2009-05-22
* - Data structures changed
* - SetLstnInfo entry point added
* version 3, rgerhards, 2013-01-21:
* - signature of SetHostIP() changed
* version 4, 2025-01-??:
* - SetHostPort() entry point added
*/

View File

@ -656,6 +656,7 @@ static ATTR_NONNULL() rsRetVal SessAccept(tcpsrv_t *const pThis,
struct sockaddr_storage *addr;
uchar *fromHostFQDN = NULL;
prop_t *fromHostIP = NULL;
prop_t *fromHostPort = NULL;
ISOBJ_TYPE_assert(pThis, tcpsrv);
assert(pLstnInfo != NULL);
@ -699,6 +700,18 @@ static ATTR_NONNULL() rsRetVal SessAccept(tcpsrv_t *const pThis,
}
CHKiRet(netstrm.GetRemoteIP(pNewStrm, &fromHostIP));
CHKiRet(netstrm.GetRemAddr(pNewStrm, &addr));
char portbuf[8];
uint16_t port;
if (addr->ss_family == AF_INET)
port = ntohs(((struct sockaddr_in *)addr)->sin_port);
else if (addr->ss_family == AF_INET6)
port = ntohs(((struct sockaddr_in6 *)addr)->sin6_port);
else
port = 0;
snprintf(portbuf, sizeof(portbuf), "%u", port);
CHKiRet(prop.Construct(&fromHostPort));
CHKiRet(prop.SetString(fromHostPort, (uchar *)portbuf, strlen(portbuf)));
CHKiRet(prop.ConstructFinalize(fromHostPort));
/* Here we check if a host is permitted to send us messages. If it isn't, we do not further
* process the message but log a warning (if we are configured to do this).
* rgerhards, 2005-09-26
@ -721,6 +734,8 @@ static ATTR_NONNULL() rsRetVal SessAccept(tcpsrv_t *const pThis,
CHKiRet(tcps_sess.SetHost(pSess, fromHostFQDN));
fromHostFQDN = NULL; /* we handed this string over */
CHKiRet(tcps_sess.SetHostIP(pSess, fromHostIP));
CHKiRet(tcps_sess.SetHostPort(pSess, fromHostPort));
fromHostPort = NULL;
CHKiRet(tcps_sess.SetStrm(pSess, pNewStrm));
pNewStrm = NULL; /* prevent it from being freed in error handler, now done in tcps_sess! */
CHKiRet(tcps_sess.SetMsgIdx(pSess, 0));
@ -752,6 +767,7 @@ finalize_it:
if (pSess != NULL) tcps_sess.Destruct(&pSess);
if (pNewStrm != NULL) netstrm.Destruct(&pNewStrm);
if (fromHostIP != NULL) prop.Destruct(&fromHostIP);
if (fromHostPort != NULL) prop.Destruct(&fromHostPort);
free(fromHostFQDN);
}

View File

@ -216,6 +216,7 @@ typedef uintTiny propid_t;
#define PROP_PARSESUCCESS 23
#define PROP_JSONMESG 24
#define PROP_RAWMSG_AFTER_PRI 25
#define PROP_FROMHOST_PORT 26
#define PROP_SYS_NOW 150
#define PROP_SYS_YEAR 151
#define PROP_SYS_MONTH 152

View File

@ -612,7 +612,9 @@ endif # ENABLE_DEFAULT_TESTS
if ENABLE_IMTCP_TESTS
TESTS += \
imtcp-listen-port-file-2.sh \
fromhost-port.sh \
fromhost-port-tuple.sh \
fromhost-port-async-ruleset.sh \
allowed-sender-tcp-ok.sh \
allowed-sender-tcp-fail.sh \
allowed-sender-tcp-hostname-ok.sh \
@ -2506,6 +2508,9 @@ EXTRA_DIST= \
msgdup_props.sh \
empty-ruleset.sh \
ruleset-direct-queue.sh \
fromhost-port.sh \
fromhost-port-tuple.sh \
fromhost-port-async-ruleset.sh \
imtcp-listen-port-file-2.sh \
allowed-sender-tcp-ok.sh \
allowed-sender-tcp-fail.sh \

View File

@ -0,0 +1,32 @@
#!/bin/bash
## fromhost-port.sh
## Check that fromhost-port property records sender port and that it
## can properly be carried over to a second asnc ruleset.
. ${srcdir:=.}/diag.sh init
export NUMMESSAGES=1
export QUEUE_EMPTY_CHECK_FUNC=wait_file_lines
generate_conf
add_conf '
module(load="../plugins/imtcp/.libs/imtcp")
input(type="imtcp" port="0" listenPortFileName="'$RSYSLOG_DYNNAME'.tcpflood_port")
template(name="outfmt" type="list") {
property(name="fromhost-port")
constant(value="\n")
}
call async
# Note: a disk-type queue is selected to test even more rsyslog core features
ruleset(name="async" queue.type="disk") {
:msg, contains, "msgnum:" action(type="omfile" template="outfmt"
file="'$RSYSLOG_OUT_LOG'")
}
'
startup
tcpflood -m $NUMMESSAGES -w "${RSYSLOG_DYNNAME}.tcpflood-port"
shutdown_when_empty
wait_shutdown
export EXPECTED="$(cat "${RSYSLOG_DYNNAME}.tcpflood-port")"
cmp_exact
exit_test

28
tests/fromhost-port-tuple.sh Executable file
View File

@ -0,0 +1,28 @@
#!/bin/bash
## fromhost-port.sh
## Check that fromhost-port property records sender port
. ${srcdir:=.}/diag.sh init
export NUMMESSAGES=1
export QUEUE_EMPTY_CHECK_FUNC=wait_file_lines
generate_conf
add_conf '
module(load="../plugins/imtcp/.libs/imtcp")
input(type="imtcp" port="0" listenPortFileName="'$RSYSLOG_DYNNAME'.tcpflood_port")
template(name="outfmt" type="list") {
property(name="$.ip_port")
constant(value="\n")
}
set $.ip_port = $fromhost-ip & ":" & $fromhost-port;
:msg, contains, "msgnum:" action(type="omfile" template="outfmt"
file="'$RSYSLOG_OUT_LOG'")
'
startup
tcpflood -m $NUMMESSAGES -w "${RSYSLOG_DYNNAME}.tcpflood-port"
shutdown_when_empty
wait_shutdown
export EXPECTED="127.0.0.1:$(cat "${RSYSLOG_DYNNAME}.tcpflood-port")"
cmp_exact
exit_test

26
tests/fromhost-port.sh Executable file
View File

@ -0,0 +1,26 @@
#!/bin/bash
## fromhost-port.sh
## Check that fromhost-port property records sender port
. ${srcdir:=.}/diag.sh init
export NUMMESSAGES=1
export QUEUE_EMPTY_CHECK_FUNC=wait_file_lines
generate_conf
add_conf '
module(load="../plugins/imtcp/.libs/imtcp")
input(type="imtcp" port="0" listenPortFileName="'$RSYSLOG_DYNNAME'.tcpflood_port")
template(name="outfmt" type="list") {
property(name="fromhost-port")
constant(value="\n")
}
:msg, contains, "msgnum:" action(type="omfile" template="outfmt"
file="'$RSYSLOG_OUT_LOG'")
'
startup
tcpflood -m $NUMMESSAGES -w "${RSYSLOG_DYNNAME}.tcpflood-port"
shutdown_when_empty
wait_shutdown
export EXPECTED="$(cat "${RSYSLOG_DYNNAME}.tcpflood-port")"
cmp_exact
exit_test