mirror of
https://github.com/rsyslog/rsyslog.git
synced 2026-04-23 12:38:12 +02:00
omazureeventhubs: fix data races and suppressions
- Add msgLock to protect the message queue/helper array and counters across the worker and proton thread. - Make locking cancellation-safe to avoid shutdown deadlocks (pthread_cleanup_push/pop via mutexCancelCleanup). - Fix delivery tag handling: don’t treat Proton tags as NUL-terminated strings; log via "%.*s" and parse tag IDs length-safely (no OOB reads under Valgrind). - Fix message-id length passed into protonmsg_entry_construct (use strlen). - Use pn_message_send(..., NULL) so Proton owns the encode buffer (avoid stale buffer reuse); drop any manual send-buffer free in worker cleanup. - Add Valgrind suppressions for Proton OpenSSL init leak (standard + interrupt paths). - Small cleanups / comment improvements.
This commit is contained in:
parent
d0b53cacab
commit
0d415e9ce7
@ -1191,6 +1191,13 @@ AC_ARG_ENABLE(clickhouse_tests,
|
||||
)
|
||||
AM_CONDITIONAL(ENABLE_CLICKHOUSE_TESTS, test x$enable_clickhouse_tests = xyes)
|
||||
|
||||
AC_DEFUN([RSYSLOG_CHECK_OPENSSL_FOR_PROTON], [
|
||||
# Proton requires OpenSSL for SSL/TLS support. Ensure OpenSSL is available.
|
||||
# If OPENSSL_LIBS is not already set (from --enable-openssl), detect it now.
|
||||
if test "x$OPENSSL_LIBS" = "x"; then
|
||||
PKG_CHECK_MODULES(OPENSSL, openssl)
|
||||
fi
|
||||
])
|
||||
|
||||
# openssl support
|
||||
AC_ARG_ENABLE(openssl,
|
||||
@ -2766,6 +2773,7 @@ if test "x$enable_omazureeventhubs" = "xyes"; then
|
||||
PKG_CHECK_MODULES(PROTON_PROACTOR, libqpid-proton-proactor >= 0.13)
|
||||
AC_SUBST(PROTON_PROACTOR_CFLAGS)
|
||||
AC_SUBST(PROTON_PROACTOR_LIBS)
|
||||
RSYSLOG_CHECK_OPENSSL_FOR_PROTON
|
||||
fi
|
||||
AM_CONDITIONAL(ENABLE_OMAZUREEVENTHUBS, test x$enable_omazureeventhubs = xyes)
|
||||
# END PROTON
|
||||
@ -3076,6 +3084,7 @@ if test "x$enable_omamqp1" = "xyes"; then
|
||||
PKG_CHECK_MODULES(PROTON, libqpid-proton >= 0.13)
|
||||
AC_SUBST(PROTON_CFLAGS)
|
||||
AC_SUBST(PROTON_LIBS)
|
||||
RSYSLOG_CHECK_OPENSSL_FOR_PROTON
|
||||
fi
|
||||
AM_CONDITIONAL(ENABLE_OMAMQP1, test x$enable_omamqp1 = xyes)
|
||||
|
||||
|
||||
@ -168,6 +168,7 @@ typedef struct wrkrInstanceData {
|
||||
int bIsConnected; /* 1 if connected, 0 if disconnected */
|
||||
int bIsSuspended; /* when broker fail, we need to suspend the action */
|
||||
pthread_rwlock_t pnLock;
|
||||
pthread_mutex_t msgLock; /* protects message queue state */
|
||||
|
||||
// PROTON Handles
|
||||
pn_proactor_t *pnProactor;
|
||||
@ -372,6 +373,7 @@ static rsRetVal closeProton(wrkrInstanceData_t *const __restrict__ pWrkrData) {
|
||||
|
||||
// Mark all remaining entries as REJECTED
|
||||
if (pWrkrData->aProtonMsgs != NULL) {
|
||||
pthread_mutex_lock(&pWrkrData->msgLock);
|
||||
for (unsigned int i = 0; i < pWrkrData->nProtonMsgs; ++i) {
|
||||
if (pWrkrData->aProtonMsgs[i] != NULL && (pWrkrData->aProtonMsgs[i]->status == PROTON_UNSUBMITTED ||
|
||||
pWrkrData->aProtonMsgs[i]->status == PROTON_SUBMITTED)) {
|
||||
@ -383,6 +385,7 @@ static rsRetVal closeProton(wrkrInstanceData_t *const __restrict__ pWrkrData) {
|
||||
INST_STATSCOUNTER_INC(pData, pData->ctrAzureFail, pData->mutCtrAzureFail);
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&pWrkrData->msgLock);
|
||||
}
|
||||
|
||||
FINALIZE;
|
||||
@ -525,9 +528,12 @@ static rsRetVal writeProton(wrkrInstanceData_t *__restrict__ const pWrkrData,
|
||||
DEFiRet;
|
||||
instanceData *const pData = (instanceData *const)pWrkrData->pData;
|
||||
protonmsg_entry *fmsgEntry;
|
||||
int lockHeld = 0;
|
||||
|
||||
// Create Unqiue Message ID
|
||||
char szMsgID[64];
|
||||
pthread_mutex_lock(&pWrkrData->msgLock);
|
||||
lockHeld = 1;
|
||||
sprintf(szMsgID, "%d", pWrkrData->iMsgSeq);
|
||||
|
||||
const char *pszParamStr = (const char *)actParam(pParam, 1 /*pData->iNumTpls*/, iMsg, 0).param;
|
||||
@ -540,11 +546,18 @@ static rsRetVal writeProton(wrkrInstanceData_t *__restrict__ const pWrkrData,
|
||||
pWrkrData->iMsgSeq++;
|
||||
|
||||
// Add message to LIST for sending
|
||||
CHKmalloc(fmsgEntry = protonmsg_entry_construct(szMsgID, sizeof(szMsgID), pszParamStr, tzParamStrLen,
|
||||
pthread_mutex_unlock(&pWrkrData->msgLock);
|
||||
lockHeld = 0;
|
||||
CHKmalloc(fmsgEntry = protonmsg_entry_construct(szMsgID, strlen(szMsgID), pszParamStr, tzParamStrLen,
|
||||
(const char *)pData->amqp_address));
|
||||
pthread_mutex_lock(&pWrkrData->msgLock);
|
||||
lockHeld = 1;
|
||||
// Add to helper Array
|
||||
pWrkrData->aProtonMsgs[iMsg] = fmsgEntry;
|
||||
finalize_it:
|
||||
if (lockHeld) {
|
||||
pthread_mutex_unlock(&pWrkrData->msgLock);
|
||||
}
|
||||
RETiRet;
|
||||
}
|
||||
|
||||
@ -576,12 +589,14 @@ BEGINcreateWrkrInstance
|
||||
pWrkrData->iMsgSeq = 0;
|
||||
pWrkrData->iMaxMsgSeq = 0;
|
||||
pWrkrData->pnMessageBuffer.start = NULL;
|
||||
pWrkrData->pnMessageBuffer.size = 0;
|
||||
|
||||
pWrkrData->nProtonMsgs = 0;
|
||||
pWrkrData->nMaxProtonMsgs = MAX_DEFAULTMSGS;
|
||||
CHKmalloc(pWrkrData->aProtonMsgs = calloc(MAX_DEFAULTMSGS, sizeof(struct s_protonmsg_entry)));
|
||||
|
||||
CHKiRet(pthread_rwlock_init(&pWrkrData->pnLock, NULL));
|
||||
CHKiRet(pthread_mutex_init(&pWrkrData->msgLock, NULL));
|
||||
|
||||
pWrkrData->bThreadRunning = 0;
|
||||
pWrkrData->tid = 0;
|
||||
@ -641,21 +656,23 @@ BEGINfreeWrkrInstance
|
||||
pn_proactor_free(pWrkrData->pnProactor);
|
||||
pWrkrData->pnProactor = NULL;
|
||||
}
|
||||
free(pWrkrData->pnMessageBuffer.start);
|
||||
|
||||
pthread_rwlock_unlock(&pWrkrData->pnLock);
|
||||
|
||||
// Free our proton helper array
|
||||
if (pWrkrData->aProtonMsgs != NULL) {
|
||||
pthread_mutex_lock(&pWrkrData->msgLock);
|
||||
for (unsigned int i = 0; i < pWrkrData->nProtonMsgs; ++i) {
|
||||
if (pWrkrData->aProtonMsgs[i] != NULL) {
|
||||
protonmsg_entry_destruct(pWrkrData->aProtonMsgs[i]);
|
||||
}
|
||||
}
|
||||
free(pWrkrData->aProtonMsgs);
|
||||
pthread_mutex_unlock(&pWrkrData->msgLock);
|
||||
}
|
||||
|
||||
pthread_rwlock_destroy(&pWrkrData->pnLock);
|
||||
pthread_mutex_destroy(&pWrkrData->msgLock);
|
||||
ENDfreeWrkrInstance
|
||||
|
||||
|
||||
@ -687,9 +704,6 @@ finalize_it:
|
||||
RETiRet;
|
||||
ENDbeginTransaction
|
||||
|
||||
/*
|
||||
* New Transaction action interface
|
||||
*/
|
||||
/** \brief Send a batch of messages and wait for broker acknowledgement.
|
||||
*
|
||||
* Messages queued via writeProton() are submitted to the proton sender. The
|
||||
@ -697,47 +711,78 @@ ENDbeginTransaction
|
||||
* I/O and updates the status of each message asynchronously. This function
|
||||
* loops until all messages have either been accepted or a suspension condition
|
||||
* is detected.
|
||||
*
|
||||
* When the helper needs to grow we allocate a fresh zeroed array under the
|
||||
* queue mutex and swap it in. The previous buffer only contains remnants from
|
||||
* older transactions and is destroyed after the swap, so no in-flight message
|
||||
* state is lost during resizing.
|
||||
*/
|
||||
BEGINcommitTransaction
|
||||
// instanceData *__restrict__ const pData = pWrkrData->pData;
|
||||
unsigned i;
|
||||
unsigned iNeedSubmission;
|
||||
sbool bDone = 0;
|
||||
protonmsg_entry *pMsgEntry = NULL;
|
||||
CODESTARTcommitTransaction;
|
||||
#ifndef NDEBUG
|
||||
DBGPRINTF("omazureeventhubs[%p]: commitTransaction [%d msgs] ENTER\n", pWrkrData, nParams);
|
||||
#endif
|
||||
|
||||
// Handle/Expand our proton helper array
|
||||
if (nParams > pWrkrData->nMaxProtonMsgs) {
|
||||
// Free old Array
|
||||
if (pWrkrData->aProtonMsgs != NULL) {
|
||||
free(pWrkrData->aProtonMsgs);
|
||||
}
|
||||
// Expand our proton helper array
|
||||
protonmsg_entry **newMsgs = NULL;
|
||||
protonmsg_entry **oldMsgs = NULL;
|
||||
unsigned int oldMaxMsgs = 0;
|
||||
unsigned int currentMaxMsgs = 0;
|
||||
if (pthread_mutex_lock(&pWrkrData->msgLock) != 0) {
|
||||
LogError(0, RS_RET_SYS_ERR, "omazureeventhubs: could not lock msg queue in commitTransaction");
|
||||
ABORT_FINALIZE(RS_RET_SYS_ERR);
|
||||
}
|
||||
currentMaxMsgs = pWrkrData->nMaxProtonMsgs;
|
||||
pthread_mutex_unlock(&pWrkrData->msgLock);
|
||||
|
||||
if (nParams > currentMaxMsgs) {
|
||||
DBGPRINTF("omazureeventhubs[%p]: commitTransaction expand helper array from %d to %d\n", pWrkrData,
|
||||
pWrkrData->nMaxProtonMsgs, nParams);
|
||||
pWrkrData->nMaxProtonMsgs = nParams; // Set new MAX
|
||||
CHKmalloc(pWrkrData->aProtonMsgs = calloc(pWrkrData->nMaxProtonMsgs, sizeof(struct s_protonmsg_entry)));
|
||||
currentMaxMsgs, nParams);
|
||||
CHKmalloc(newMsgs = calloc(nParams, sizeof(struct s_protonmsg_entry)));
|
||||
}
|
||||
|
||||
pthread_mutex_lock(&pWrkrData->msgLock);
|
||||
if (newMsgs != NULL) {
|
||||
oldMsgs = pWrkrData->aProtonMsgs;
|
||||
oldMaxMsgs = pWrkrData->nMaxProtonMsgs;
|
||||
pWrkrData->aProtonMsgs = newMsgs;
|
||||
pWrkrData->nMaxProtonMsgs = nParams;
|
||||
}
|
||||
// Copy count of New Messages and increase MaxMsgSeq
|
||||
pWrkrData->nProtonMsgs = nParams;
|
||||
pWrkrData->iMaxMsgSeq += nParams;
|
||||
pthread_mutex_unlock(&pWrkrData->msgLock);
|
||||
|
||||
if (oldMsgs != NULL) {
|
||||
for (i = 0; i < oldMaxMsgs; ++i) {
|
||||
if (oldMsgs[i] != NULL) {
|
||||
protonmsg_entry_destruct(oldMsgs[i]);
|
||||
}
|
||||
}
|
||||
free(oldMsgs);
|
||||
}
|
||||
|
||||
do {
|
||||
iNeedSubmission = 0;
|
||||
// Put unsubmitted messages into Proton
|
||||
for (i = 0; i < nParams; ++i) {
|
||||
sbool needSubmission = 0;
|
||||
// Get reference to Proton Array Helper
|
||||
pMsgEntry = ((protonmsg_entry *)pWrkrData->aProtonMsgs[i]);
|
||||
pthread_mutex_lock(&pWrkrData->msgLock);
|
||||
pMsgEntry = (protonmsg_entry *)pWrkrData->aProtonMsgs[i];
|
||||
if (pMsgEntry == NULL) {
|
||||
// Send Message to Proton
|
||||
writeProton(pWrkrData, pParams, i);
|
||||
needSubmission = 1;
|
||||
} else if (pMsgEntry->status == PROTON_REJECTED) {
|
||||
// Reset Message Entry, it will be retried
|
||||
pMsgEntry->status = PROTON_UNSUBMITTED;
|
||||
}
|
||||
pthread_mutex_unlock(&pWrkrData->msgLock);
|
||||
|
||||
if (needSubmission) {
|
||||
CHKiRet(writeProton(pWrkrData, pParams, i));
|
||||
}
|
||||
}
|
||||
bDone = 1;
|
||||
|
||||
@ -751,8 +796,8 @@ BEGINcommitTransaction
|
||||
|
||||
// Verify if messages have been submitted successfully
|
||||
for (i = 0; i < nParams; ++i) {
|
||||
// Get reference to Proton Array Helper
|
||||
pMsgEntry = ((protonmsg_entry *)pWrkrData->aProtonMsgs[i]);
|
||||
pthread_mutex_lock(&pWrkrData->msgLock);
|
||||
pMsgEntry = (protonmsg_entry *)pWrkrData->aProtonMsgs[i];
|
||||
if (pMsgEntry != NULL) {
|
||||
if (pMsgEntry->status == PROTON_UNSUBMITTED) {
|
||||
iNeedSubmission++;
|
||||
@ -763,6 +808,7 @@ BEGINcommitTransaction
|
||||
bDone = 0;
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&pWrkrData->msgLock);
|
||||
}
|
||||
|
||||
if (iNeedSubmission > 0) {
|
||||
@ -786,6 +832,7 @@ BEGINcommitTransaction
|
||||
finalize_it:
|
||||
// Free Proton Message Helpers
|
||||
if (pWrkrData->aProtonMsgs != NULL) {
|
||||
pthread_mutex_lock(&pWrkrData->msgLock);
|
||||
for (i = 0; i < nParams; ++i) {
|
||||
if (pWrkrData->aProtonMsgs[i] != NULL) {
|
||||
// Destroy
|
||||
@ -793,6 +840,7 @@ finalize_it:
|
||||
pWrkrData->aProtonMsgs[i] = NULL;
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&pWrkrData->msgLock);
|
||||
}
|
||||
|
||||
/* TODO: Suspend Action if broker problems were reported in error callback */
|
||||
@ -1177,6 +1225,11 @@ static void *proton_thread(void __attribute__((unused)) * pVoidWrkrData) {
|
||||
*/
|
||||
static void handleProtonDelivery(wrkrInstanceData_t *const pWrkrData) {
|
||||
instanceData *const pData = (instanceData *const)pWrkrData->pData;
|
||||
if (pthread_mutex_lock(&pWrkrData->msgLock) != 0) {
|
||||
LogError(0, RS_RET_SYS_ERR, "omazureeventhubs: could not lock msg queue for delivery processing");
|
||||
return;
|
||||
}
|
||||
pthread_cleanup_push(mutexCancelCleanup, &pWrkrData->msgLock);
|
||||
|
||||
/* Process messages from ARRAY */
|
||||
for (unsigned int i = 0; i < pWrkrData->nProtonMsgs; ++i) {
|
||||
@ -1200,14 +1253,18 @@ static void handleProtonDelivery(wrkrInstanceData_t *const pWrkrData) {
|
||||
* - call pn_message_encode2() to encode the message to a buffer
|
||||
* - call pn_link_send() to send the encoded message bytes
|
||||
* - call pn_link_advance() to indicate the message is complete
|
||||
*
|
||||
* Pass NULL to let Proton manage the buffer lifecycle completely,
|
||||
* avoiding issues with uninitialized buffer data.
|
||||
*/
|
||||
if (pn_message_send(message, pWrkrData->pnSender, &pWrkrData->pnMessageBuffer) < 0) {
|
||||
ssize_t sendLen = pn_message_send(message, pWrkrData->pnSender, NULL);
|
||||
if (sendLen < 0) {
|
||||
LogMsg(0, NO_ERRCODE, LOG_INFO, "handleProtonDelivery: PN_LINK_FLOW deliver SEND ERROR %s\n",
|
||||
pn_error_text(pn_message_error(message)));
|
||||
pn_message_free(message);
|
||||
break;
|
||||
goto finalize_it;
|
||||
} else {
|
||||
DBGPRINTF("handleProtonDelivery: PN_LINK_FLOW deliver SUCCESS\n");
|
||||
DBGPRINTF("handleProtonDelivery: PN_LINK_FLOW deliver SUCCESS (sent %zd bytes)\n", sendLen);
|
||||
pn_message_free(message);
|
||||
}
|
||||
|
||||
@ -1224,7 +1281,7 @@ static void handleProtonDelivery(wrkrInstanceData_t *const pWrkrData) {
|
||||
// TODO: MAKE CONFIGUREABLE
|
||||
// Wait 10 microseconds
|
||||
// srSleep(0, 10000);
|
||||
break;
|
||||
goto finalize_it;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -1232,6 +1289,9 @@ static void handleProtonDelivery(wrkrInstanceData_t *const pWrkrData) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
finalize_it:
|
||||
pthread_cleanup_pop(1);
|
||||
}
|
||||
|
||||
/* Handles PROTON Communication in this Function
|
||||
@ -1275,6 +1335,10 @@ static void handleProton(wrkrInstanceData_t *const pWrkrData, pn_event_t *event)
|
||||
DBGPRINTF("handleProton: PN_CONNECTION_INIT to %p:%s:%s/%s\n", pWrkrData, pData->azurehost,
|
||||
pData->azureport, pData->container);
|
||||
pWrkrData->pnStatus = PN_CONNECTION_INIT;
|
||||
unsigned int pendingMsgs = 0;
|
||||
pthread_mutex_lock(&pWrkrData->msgLock);
|
||||
pendingMsgs = pWrkrData->nProtonMsgs;
|
||||
pthread_mutex_unlock(&pWrkrData->msgLock);
|
||||
|
||||
// Get Connection
|
||||
pWrkrData->pnConn = pn_event_connection(event);
|
||||
@ -1293,7 +1357,7 @@ static void handleProton(wrkrInstanceData_t *const pWrkrData, pn_event_t *event)
|
||||
pn_terminus_set_address(pn_link_target(pWrkrData->pnSender), (const char *)pData->amqp_address);
|
||||
|
||||
pn_link_open(pWrkrData->pnSender);
|
||||
pn_link_flow(pWrkrData->pnSender, pWrkrData->nProtonMsgs);
|
||||
pn_link_flow(pWrkrData->pnSender, pendingMsgs);
|
||||
break;
|
||||
}
|
||||
case PN_CONNECTION_REMOTE_OPEN: {
|
||||
@ -1317,7 +1381,11 @@ static void handleProton(wrkrInstanceData_t *const pWrkrData, pn_event_t *event)
|
||||
break;
|
||||
}
|
||||
case PN_CONNECTION_WAKE: {
|
||||
DBGPRINTF("handleProton: PN_CONNECTION_WAKE (%d) to %p:%s:%s/%s\n", pWrkrData->nProtonMsgs, pWrkrData,
|
||||
unsigned int pendingMsgs = 0;
|
||||
pthread_mutex_lock(&pWrkrData->msgLock);
|
||||
pendingMsgs = pWrkrData->nProtonMsgs;
|
||||
pthread_mutex_unlock(&pWrkrData->msgLock);
|
||||
DBGPRINTF("handleProton: PN_CONNECTION_WAKE (%d) to %p:%s:%s/%s\n", pendingMsgs, pWrkrData,
|
||||
pData->azurehost, pData->azureport, pData->container);
|
||||
/* Process messages */
|
||||
handleProtonDelivery(pWrkrData);
|
||||
@ -1340,7 +1408,19 @@ static void handleProton(wrkrInstanceData_t *const pWrkrData, pn_event_t *event)
|
||||
|
||||
if (pnTag.start != NULL) {
|
||||
// Convert Tag into Number!
|
||||
unsigned int iTagNum = (unsigned int)atoi((pnTag.start != NULL ? pnTag.start : ""));
|
||||
unsigned int iTagNum = 0;
|
||||
if (pnTag.size > 0) {
|
||||
/* pn_delivery_tag_t.start is NOT guaranteed to be NUL-terminated. */
|
||||
/* Our tags are decimal ASCII (see pn_dtag() usage), so parse in-place. */
|
||||
for (size_t j = 0; j < pnTag.size; ++j) {
|
||||
const unsigned char c = (unsigned char)pnTag.start[j];
|
||||
if (c < '0' || c > '9') {
|
||||
break;
|
||||
}
|
||||
iTagNum = (iTagNum * 10u) + (unsigned int)(c - '0');
|
||||
}
|
||||
}
|
||||
pthread_mutex_lock(&pWrkrData->msgLock);
|
||||
// Calc QueueNumber from Tagnum
|
||||
unsigned int iQueueNum = pWrkrData->nProtonMsgs - (pWrkrData->iMaxMsgSeq - iTagNum);
|
||||
// Get proton Msg Helper (checks for out of bound array access)
|
||||
@ -1351,10 +1431,9 @@ static void handleProton(wrkrInstanceData_t *const pWrkrData, pn_event_t *event)
|
||||
// Process if found
|
||||
if (pMsgEntry != NULL) {
|
||||
if (pn_delivery_remote_state(pDeliveryStatus) == PN_ACCEPTED) {
|
||||
DBGPRINTF("handleProton: PN_DELIVERY SUCCESS for MSG '%s(Q %d, MAX %d)' @ %p:%s:%s/%s\n",
|
||||
(pnTag.start != NULL ? (char *)pnTag.start : "NULL"), iQueueNum,
|
||||
pWrkrData->nMaxProtonMsgs, pWrkrData, pData->azurehost, pData->azureport,
|
||||
pData->container);
|
||||
DBGPRINTF("handleProton: PN_DELIVERY SUCCESS for MSG '%.*s(Q %d, MAX %d)' @ %p:%s:%s/%s\n",
|
||||
(int)pnTag.size, pnTag.start, iQueueNum, pWrkrData->nMaxProtonMsgs, pWrkrData,
|
||||
pData->azurehost, pData->azureport, pData->container);
|
||||
pn_delivery_settle(pDeliveryStatus); // free the delivered message
|
||||
pMsgEntry->status = PROTON_ACCEPTED;
|
||||
|
||||
@ -1363,10 +1442,10 @@ static void handleProton(wrkrInstanceData_t *const pWrkrData, pn_event_t *event)
|
||||
INST_STATSCOUNTER_INC(pData, pData->ctrAzureAck, pData->mutCtrAzureAck);
|
||||
} else if (pn_delivery_remote_state(pDeliveryStatus) == PN_REJECTED) {
|
||||
LogError(0, RS_RET_ERR,
|
||||
"omazureeventhubs: PN_DELIVERY REJECTED for MSG '%s'"
|
||||
"omazureeventhubs: PN_DELIVERY REJECTED for MSG '%.*s'"
|
||||
" - @ %p:%s:%s/%s\n",
|
||||
(pnTag.start != NULL ? (char *)pnTag.start : "NULL"), pWrkrData, pData->azurehost,
|
||||
pData->azureport, pData->container);
|
||||
(int)pnTag.size, pnTag.start, pWrkrData, pData->azurehost, pData->azureport,
|
||||
pData->container);
|
||||
pMsgEntry->status = PROTON_REJECTED;
|
||||
|
||||
// Increment Stats Counter
|
||||
@ -1380,6 +1459,7 @@ static void handleProton(wrkrInstanceData_t *const pWrkrData, pn_event_t *event)
|
||||
STATSCOUNTER_INC(ctrAzureOtherErrors, mutCtrAzureOtherErrors);
|
||||
INST_STATSCOUNTER_INC(pData, pData->ctrAzureOtherErrors, pData->mutCtrAzureOtherErrors);
|
||||
}
|
||||
pthread_mutex_unlock(&pWrkrData->msgLock);
|
||||
} else {
|
||||
LogError(0, RS_RET_ERR, "handleProton: PN_DELIVERY HELPER ARRAY is NULL - @ %p:%s:%s/%s\n", pWrkrData,
|
||||
pData->azurehost, pData->azureport, pData->container);
|
||||
@ -1388,6 +1468,11 @@ static void handleProton(wrkrInstanceData_t *const pWrkrData, pn_event_t *event)
|
||||
}
|
||||
break;
|
||||
}
|
||||
case PN_TRANSPORT:
|
||||
DBGPRINTF("handleProton: PN_TRANSPORT for %p:%s\n", pWrkrData, pData->azurehost);
|
||||
proton_check_condition(event, pWrkrData, pn_transport_condition(pn_event_transport(event)),
|
||||
"transport has data");
|
||||
break;
|
||||
case PN_TRANSPORT_CLOSED:
|
||||
DBGPRINTF("handleProton: transport closed for %p:%s\n", pWrkrData, pData->azurehost);
|
||||
proton_check_condition(event, pWrkrData, pn_transport_condition(pn_event_transport(event)),
|
||||
@ -1398,6 +1483,11 @@ static void handleProton(wrkrInstanceData_t *const pWrkrData, pn_event_t *event)
|
||||
proton_check_condition(event, pWrkrData, pn_connection_remote_condition(pn_event_connection(event)),
|
||||
"connection closed");
|
||||
break;
|
||||
case PN_SESSION_REMOTE_OPEN:
|
||||
DBGPRINTF("handleProton: remote session opened for %p:%s\n", pWrkrData, pData->azurehost);
|
||||
proton_check_condition(event, pWrkrData, pn_session_remote_condition(pn_event_session(event)),
|
||||
"remote session opened");
|
||||
break;
|
||||
case PN_SESSION_REMOTE_CLOSE:
|
||||
DBGPRINTF("handleProton: remote session closed for %p:%s\n", pWrkrData, pData->azurehost);
|
||||
proton_check_condition(event, pWrkrData, pn_session_remote_condition(pn_event_session(event)),
|
||||
|
||||
@ -4,7 +4,7 @@
|
||||
|
||||
export NUMMESSAGES=100
|
||||
export NUMMESSAGESFULL=$NUMMESSAGES
|
||||
export WAITTIMEOUT=20
|
||||
export WAITTIMEOUT=10
|
||||
|
||||
# REQUIRES EXTERNAL ENVIRONMENT VARIABLES
|
||||
if [[ -z "${AZURE_HOST}" ]]; then
|
||||
|
||||
@ -7,7 +7,7 @@ fi
|
||||
. ${srcdir:=.}/diag.sh init
|
||||
|
||||
# --- If test is needed, create helper script to store environment variables for
|
||||
# éventhubs access:
|
||||
# <EFBFBD>venthubs access:
|
||||
# export AZURE_HOST=""
|
||||
# export AZURE_PORT=""
|
||||
# export AZURE_KEY_NAME=""
|
||||
@ -29,6 +29,7 @@ export interrupt_host="$AZURE_HOST"
|
||||
export interrupt_port="$AZURE_PORT"
|
||||
export interrupt_tick="10"
|
||||
|
||||
TEST_TIMEOUT_WAIT=180
|
||||
|
||||
# REQUIRES EXTERNAL ENVIRONMENT VARIABLES
|
||||
if [[ -z "${AZURE_HOST}" ]]; then
|
||||
@ -113,7 +114,7 @@ startup
|
||||
echo Inject messages into rsyslog sender instance
|
||||
injectmsg 1 $NUMMESSAGES
|
||||
|
||||
wait_file_lines --interrupt-connection $interrupt_host $interrupt_port $interrupt_tick $RSYSLOG_OUT_LOG $NUMMESSAGESFULL 100
|
||||
wait_file_lines --interrupt-connection $interrupt_host $interrupt_port $interrupt_tick $RSYSLOG_OUT_LOG $NUMMESSAGESFULL $TEST_TIMEOUT_WAIT
|
||||
|
||||
timeoutend=$WAITTIMEOUT
|
||||
timecounter=0
|
||||
|
||||
@ -11,3 +11,45 @@
|
||||
fun:start_thread
|
||||
fun:clone
|
||||
}
|
||||
|
||||
# Proton OpenSSL initialization leak (library-level)
|
||||
{
|
||||
omazureeventhubs_pn_ssl_domain_leak
|
||||
Memcheck:Leak
|
||||
match-leak-kinds: definite
|
||||
fun:malloc
|
||||
fun:initialize
|
||||
fun:__pthread_once_slow
|
||||
fun:UnknownInlinedFun
|
||||
fun:pni_init_ssl_domain
|
||||
fun:pn_ssl_domain
|
||||
fun:openProton
|
||||
fun:setupProtonHandle
|
||||
fun:actionPrepare
|
||||
fun:processMsgMain
|
||||
fun:doSubmitToActionQ
|
||||
fun:execAct
|
||||
fun:scriptExec
|
||||
fun:execPROPFILT
|
||||
fun:scriptExec
|
||||
}
|
||||
|
||||
# Proton OpenSSL initialization leak (library-level) seen in interrupt test path
|
||||
{
|
||||
omazureeventhubs_pn_ssl_domain_leak_interrupt
|
||||
Memcheck:Leak
|
||||
match-leak-kinds: definite
|
||||
fun:malloc
|
||||
fun:initialize
|
||||
fun:__pthread_once_slow
|
||||
fun:UnknownInlinedFun
|
||||
fun:pni_init_ssl_domain
|
||||
fun:pn_ssl_domain
|
||||
fun:openProton
|
||||
fun:setupProtonHandle
|
||||
fun:actionPrepare
|
||||
fun:processMsgMain
|
||||
fun:processBatchMain
|
||||
fun:ConsumerReg
|
||||
fun:wtiWorker
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user