mirror of
https://github.com/rsyslog/rsyslog.git
synced 2025-12-13 03:40:41 +01:00
omazureeventhubs: Fix implementation of amqp_address parameter
Parameter amqp_address is now handeled correctly. Added testcase for amqp_address parameter. closes: https://github.com/rsyslog/rsyslog/issues/5413
This commit is contained in:
parent
a467e9651d
commit
bf60befc54
@ -208,10 +208,10 @@ static rsRetVal writeProton(wrkrInstanceData_t *__restrict__ const pWrkrData,
|
||||
/* tables for interfacing with the v6 config system */
|
||||
/* action (instance) parameters */
|
||||
static struct cnfparamdescr actpdescr[] = {
|
||||
{ "azurehost", eCmdHdlrString, CNFPARAM_REQUIRED },
|
||||
{ "azureport", eCmdHdlrString, CNFPARAM_REQUIRED },
|
||||
{ "azure_key_name", eCmdHdlrString, CNFPARAM_REQUIRED },
|
||||
{ "azure_key", eCmdHdlrString, CNFPARAM_REQUIRED },
|
||||
{ "azurehost", eCmdHdlrString, 0 },
|
||||
{ "azureport", eCmdHdlrString, 0 },
|
||||
{ "azure_key_name", eCmdHdlrString, 0 },
|
||||
{ "azure_key", eCmdHdlrString, 0 },
|
||||
{ "amqp_address", eCmdHdlrString, 0 },
|
||||
{ "container", eCmdHdlrString, 0 },
|
||||
{ "eventproperties", eCmdHdlrArray, 0 },
|
||||
@ -872,20 +872,6 @@ CODESTARTnewActInst
|
||||
}
|
||||
}
|
||||
|
||||
if(pData->azure_key_name == NULL || pData->azure_key == NULL) {
|
||||
LogError(0, RS_RET_CONFIG_ERROR,
|
||||
"omazureeventhubs: azure_key_name and azure_key are requires to access azure eventhubs"
|
||||
" - action definition invalid");
|
||||
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
|
||||
}
|
||||
|
||||
if(pData->container == NULL) {
|
||||
LogError(0, RS_RET_CONFIG_ERROR,
|
||||
"omazureeventhubs: Event Hubs \"container\" parameter (which is instance) not specified "
|
||||
" - action definition invalid");
|
||||
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
|
||||
}
|
||||
|
||||
if(pData->amqp_address == NULL) {
|
||||
if(pData->azurehost == NULL) {
|
||||
LogMsg(0, NO_ERRCODE, LOG_INFO, "omazureeventhubs: \"azurehost\" parameter not specified "
|
||||
@ -896,6 +882,18 @@ CODESTARTnewActInst
|
||||
// Set default
|
||||
CHKmalloc(pData->azureport = (uchar *) strdup("5671"));
|
||||
}
|
||||
if(pData->azure_key_name == NULL || pData->azure_key == NULL) {
|
||||
LogError(0, RS_RET_CONFIG_ERROR,
|
||||
"omazureeventhubs: azure_key_name and azure_key are requires to access azure eventhubs"
|
||||
" - action definition invalid");
|
||||
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
|
||||
}
|
||||
if(pData->container == NULL) {
|
||||
LogError(0, RS_RET_CONFIG_ERROR,
|
||||
"omazureeventhubs: Event Hubs \"container\" parameter (which is instance) not specified"
|
||||
" - action definition invalid");
|
||||
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
|
||||
}
|
||||
|
||||
// Create amqps URL from parameters
|
||||
char szAddress[1024];
|
||||
@ -906,6 +904,85 @@ CODESTARTnewActInst
|
||||
pData->azureport,
|
||||
pData->container);
|
||||
CHKmalloc(pData->amqp_address = (uchar*) strdup(szAddress));
|
||||
} else {
|
||||
// Free if set first
|
||||
pData->azurehost = NULL;
|
||||
pData->azureport = NULL;
|
||||
pData->azure_key_name = NULL;
|
||||
pData->azure_key = NULL;
|
||||
pData->container = NULL;
|
||||
|
||||
// Remove the protocol part
|
||||
char* amqp_address_dup = strdup((char*)pData->amqp_address);
|
||||
char* startstr = strstr(amqp_address_dup, "amqps://");
|
||||
char* endstr = NULL;
|
||||
if (!startstr) {
|
||||
LogError(0, RS_RET_CONFIG_ERROR,
|
||||
"omazureeventhubs: \"amqp_address\" parameter (URL) invalid "
|
||||
" - could not find prefix in URL");
|
||||
free(amqp_address_dup);
|
||||
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
|
||||
}
|
||||
startstr += strlen("amqps://");
|
||||
|
||||
// Parse AccessKeyName
|
||||
endstr = strchr(startstr, ':');
|
||||
if (!endstr) {
|
||||
LogError(0, RS_RET_CONFIG_ERROR,
|
||||
"omazureeventhubs: \"amqp_address\" parameter (URL) invalid "
|
||||
" - could not find azure_key_name in URL");
|
||||
free(amqp_address_dup);
|
||||
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
|
||||
}
|
||||
*endstr = '\0';
|
||||
pData->azure_key_name = (uchar*) strdup(startstr);
|
||||
|
||||
// Parse AccessKey
|
||||
startstr = endstr + 1;
|
||||
endstr = strchr(startstr, '@');
|
||||
if (!endstr) {
|
||||
LogError(0, RS_RET_CONFIG_ERROR,
|
||||
"omazureeventhubs: \"amqp_address\" parameter (URL) invalid "
|
||||
" - could not find azure_key in URL");
|
||||
free(amqp_address_dup);
|
||||
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
|
||||
}
|
||||
*endstr = '\0';
|
||||
pData->azure_key = (uchar*) strdup(startstr);
|
||||
|
||||
// Parse EventHubsNamespace and EventHubsInstance
|
||||
startstr = endstr + 1;
|
||||
endstr = strchr(startstr, '/');
|
||||
if (!endstr) {
|
||||
LogError(0, RS_RET_CONFIG_ERROR,
|
||||
"omazureeventhubs: \"amqp_address\" parameter (URL) invalid "
|
||||
" - could not find azurehost in URL");
|
||||
free(amqp_address_dup);
|
||||
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
|
||||
}
|
||||
*endstr = '\0';
|
||||
pData->azurehost = (uchar*) strdup(startstr);
|
||||
|
||||
// Set port to default 5671 (amqps)
|
||||
pData->azureport = (uchar*) strdup("5671");
|
||||
|
||||
// Copy Rest into container (EventHubsInstance)
|
||||
startstr = endstr + 1;
|
||||
pData->container = (uchar*) strdup(startstr);
|
||||
|
||||
// Free dup memory
|
||||
free(amqp_address_dup);
|
||||
|
||||
// Output Debug Information
|
||||
DBGPRINTF(
|
||||
"newActInst: parsed amqp_address parameters for %p@key_name=%s key=%s host=%s port=%s container=%s\n",
|
||||
pData,
|
||||
pData->azure_key_name,
|
||||
pData->azure_key,
|
||||
pData->azurehost,
|
||||
pData->azureport,
|
||||
pData->container
|
||||
);
|
||||
}
|
||||
|
||||
iNumTpls = 1;
|
||||
|
||||
@ -997,6 +997,7 @@ if ENABLE_OMAZUREEVENTHUBS
|
||||
if ENABLE_OMAZUREEVENTHUBS_TESTS
|
||||
TESTS += \
|
||||
omazureeventhubs-basic.sh \
|
||||
omazureeventhubs-basic-url.sh \
|
||||
omazureeventhubs-list.sh \
|
||||
omazureeventhubs-stress.sh \
|
||||
omazureeventhubs-interrupt.sh
|
||||
@ -2866,6 +2867,7 @@ EXTRA_DIST= \
|
||||
testsuites/zoo.dep_wrk2.cfg \
|
||||
testsuites/zoo.dep_wrk3.cfg \
|
||||
omazureeventhubs-basic.sh \
|
||||
omazureeventhubs-basic-url.sh \
|
||||
omazureeventhubs-list.sh \
|
||||
omazureeventhubs-stress.sh \
|
||||
omazureeventhubs-interrupt.sh \
|
||||
|
||||
118
tests/omazureeventhubs-basic-url.sh
Executable file
118
tests/omazureeventhubs-basic-url.sh
Executable file
@ -0,0 +1,118 @@
|
||||
#!/bin/bash
|
||||
# This file is part of the rsyslog project, released under ASL 2.0
|
||||
. ${srcdir:=.}/diag.sh init
|
||||
|
||||
export NUMMESSAGES=100
|
||||
export NUMMESSAGESFULL=$NUMMESSAGES
|
||||
export WAITTIMEOUT=20
|
||||
|
||||
# REQUIRES EXTERNAL ENVIRONMENT VARIABLES
|
||||
if [[ -z "${AZURE_HOST}" ]]; then
|
||||
echo "SKIP: AZURE_HOST environment variable not SET! Example: <yourname>.servicebus.windows.net - SKIPPING"
|
||||
exit 77
|
||||
fi
|
||||
if [[ -z "${AZURE_PORT}" ]]; then
|
||||
echo "SKIP: AZURE_PORT environment variable not SET! Example: 5671 - SKIPPING"
|
||||
exit 77
|
||||
fi
|
||||
if [[ -z "${AZURE_KEY_NAME}" ]]; then
|
||||
echo "SKIP: AZURE_KEY_NAME environment variable not SET! Example: <yourkeyname> - SKIPPING"
|
||||
exit 77
|
||||
fi
|
||||
if [[ -z "${AZURE_KEY}" ]]; then
|
||||
echo "SKIP: AZURE_KEY environment variable not SET! Example: <yourlongkey> - SKIPPING"
|
||||
exit 77
|
||||
fi
|
||||
if [[ -z "${AZURE_CONTAINER}" ]]; then
|
||||
echo "SKIP: AZURE_CONTAINER environment variable not SET! Example: <youreventhubsname> - SKIPPING"
|
||||
exit 77
|
||||
fi
|
||||
|
||||
export AMQPS_ADRESS="amqps://$AZURE_KEY_NAME:$AZURE_KEY@$AZURE_HOST:$AZURE_PORT/$AZURE_NAME"
|
||||
export AZURE_ENDPOINT="Endpoint=sb://$AZURE_HOST/;SharedAccessKeyName=$AZURE_KEY_NAME;SharedAccessKey=$AZURE_KEY;EntityPath=$AZURE_NAME"
|
||||
|
||||
# --- Create/Start omazureeventhubs sender config
|
||||
|
||||
generate_conf
|
||||
add_conf '
|
||||
global(
|
||||
debug.whitelist="on"
|
||||
debug.files=["omazureeventhubs.c", "modules.c", "errmsg.c", "action.c"]
|
||||
)
|
||||
|
||||
# impstats in order to gain insight into error cases
|
||||
module(load="../plugins/impstats/.libs/impstats"
|
||||
log.file="'$RSYSLOG_DYNNAME.pstats'"
|
||||
interval="1" log.syslog="off")
|
||||
$imdiagInjectDelayMode full
|
||||
|
||||
# Load mods
|
||||
module(load="../plugins/omazureeventhubs/.libs/omazureeventhubs")
|
||||
|
||||
# templates
|
||||
template(name="outfmt" type="string" string="%msg:F,58:2%\n")
|
||||
|
||||
local4.* {
|
||||
action( name="omazureeventhubs"
|
||||
type="omazureeventhubs"
|
||||
amqp_address="amqps://'$AZURE_KEY_NAME':'$AZURE_KEY'@'$AZURE_HOST'/'$AZURE_CONTAINER'"
|
||||
template="outfmt"
|
||||
action.resumeInterval="1"
|
||||
action.resumeRetryCount="2"
|
||||
)
|
||||
|
||||
action( type="omfile" file="'$RSYSLOG_OUT_LOG'")
|
||||
stop
|
||||
}
|
||||
|
||||
action( type="omfile" file="'$RSYSLOG_DYNNAME.othermsg'")
|
||||
'
|
||||
echo Starting sender instance [omazureeventhubs]
|
||||
startup
|
||||
|
||||
echo Inject messages into rsyslog sender instance
|
||||
injectmsg 1 $NUMMESSAGES
|
||||
|
||||
wait_file_lines $RSYSLOG_OUT_LOG $NUMMESSAGESFULL 100
|
||||
|
||||
# experimental: wait until kafkacat receives everything
|
||||
timeoutend=$WAITTIMEOUT
|
||||
timecounter=0
|
||||
|
||||
echo "CHECK $RSYSLOG_DYNNAME.pstats"
|
||||
while [ $timecounter -lt $timeoutend ]; do
|
||||
(( timecounter++ ))
|
||||
|
||||
if [ -f "$RSYSLOG_DYNNAME.pstats" ] ; then
|
||||
# Read IMPSTATS for verification
|
||||
IMPSTATSLINE=$(cat $RSYSLOG_DYNNAME.pstats | grep "origin\=omazureeventhubs" | tail -1 | cut -d: -f5)
|
||||
SUBMITTED_MSG=$(echo $IMPSTATSLINE | grep "submitted" | cut -d" " -f2 | cut -d"=" -f2)
|
||||
FAILED_MSG=$(echo $IMPSTATSLINE | grep "failures" | cut -d" " -f3 | cut -d"=" -f2)
|
||||
ACCEPTED_MSG=$(echo $IMPSTATSLINE | grep "accepted" | cut -d" " -f4 | cut -d"=" -f2)
|
||||
|
||||
if ! [[ $SUBMITTED_MSG =~ $re ]] ; then
|
||||
echo "**** omazureeventhubs WAITING FOR IMPSTATS"
|
||||
else
|
||||
if [ "$SUBMITTED_MSG" -ge "$NUMMESSAGESFULL" ]; then
|
||||
if [ "$ACCEPTED_MSG" -eq "$NUMMESSAGESFULL" ]; then
|
||||
echo "**** omazureeventhubs SUCCESS: NUMMESSAGESFULL: $NUMMESSAGESFULL, SUBMITTED_MSG:$SUBMITTED_MSG, ACCEPTED_MSG: $ACCEPTED_MSG, FAILED_MSG: $FAILED_MSG"
|
||||
shutdown_when_empty
|
||||
wait_shutdown
|
||||
#cp $RSYSLOG_DEBUGLOG DEBUGDEBUG.log
|
||||
exit_test
|
||||
else
|
||||
echo "**** omazureeventhubs FAIL: NUMMESSAGESFULL: $NUMMESSAGESFULL, SUBMITTED/WAITING: SUBMITTED_MSG:$SUBMITTED_MSG, ACCEPTED_MSG: $ACCEPTED_MSG, FAILED_MSG: $FAILED_MSG"
|
||||
fi
|
||||
else
|
||||
echo "**** omazureeventhubs WAITING: SUBMITTED_MSG:$SUBMITTED_MSG, ACCEPTED_MSG: $ACCEPTED_MSG, FAILED_MSG: $FAILED_MSG"
|
||||
fi
|
||||
fi
|
||||
fi
|
||||
|
||||
$TESTTOOL_DIR/msleep 1000
|
||||
done
|
||||
unset count
|
||||
|
||||
shutdown_when_empty
|
||||
wait_shutdown
|
||||
error_exit 1
|
||||
Loading…
x
Reference in New Issue
Block a user