omkafka: refactor failed delivery handling and error reporting

This commit is contained in:
Rainer Gerhards 2017-11-17 10:44:02 +01:00
parent 010e9e8156
commit 19cebbf092

View File

@ -969,12 +969,11 @@ checkFailedMessages(instanceData *const __restrict__ pData)
DEFiRet;
/* Loop through failed messages, reprocess them first! */
while (!LIST_EMPTY(&pData->failedmsg_head)) {
fmsgEntry = LIST_FIRST(&pData->failedmsg_head);
//assert(fmsgEntry == NULL); /* Avoids false positives in CLANG*/
while ((fmsgEntry = LIST_FIRST(&pData->failedmsg_head)) != NULL) {
/* Put back into kafka! */
iRet = writeKafka(pData, (uchar*) fmsgEntry->payload, NULL, fmsgEntry->topicname);
if(iRet != RS_RET_OK) {
// TODO: LogError???
DBGPRINTF("omkafka: failed to delivery failed msg '%.*s' with status %d. "
"- suspending AGAIN!\n",
(int)(strlen((char*)fmsgEntry->payload)-1),
@ -986,6 +985,7 @@ checkFailedMessages(instanceData *const __restrict__ pData)
(char*)fmsgEntry->payload);
LIST_REMOVE(fmsgEntry, entries);
free(fmsgEntry);
fmsgEntry = NULL;
}
}
@ -994,49 +994,43 @@ finalize_it:
}
/* This function persists failed messages into a data file, so they can
* Be resend on next startup.
* be resend on next startup.
* alorbach, 2017-06-02
*/
static rsRetVal
static rsRetVal ATTR_NONNULL(1)
persistFailedMsgs(instanceData *const __restrict__ pData)
{
DEFiRet;
int fdMsgFile = -1;
ssize_t nwritten;
/* Clear Failed Msg List */
failedmsg_entry* fmsgEntry;
fmsgEntry = LIST_FIRST(&pData->failedmsg_head);
failedmsg_entry* fmsgEntry = LIST_FIRST(&pData->failedmsg_head);
if (fmsgEntry != NULL) {
fdMsgFile = open((char*)pData->failedMsgFile,
O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC,
S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
if(fdMsgFile == -1) {
char errStr[1024];
rs_strerror_r(errno, errStr, sizeof(errStr));
DBGPRINTF("omkafka: persistFailedMsgs error opening failed msg file: %s\n", errStr);
LogError(errno, RS_RET_ERR, "omkafka: persistFailedMsgs error opening failed msg file");
ABORT_FINALIZE(RS_RET_ERR);
}
/* Loop through failed messages, reprocess them first! */
while (fmsgEntry != NULL) {
/* Put into into kafka! */
write(fdMsgFile, fmsgEntry->topicname, ustrlen(fmsgEntry->topicname) );
write(fdMsgFile, "\t", 1);
nwritten = write(fdMsgFile, fmsgEntry->payload, ustrlen(fmsgEntry->payload) );
nwritten = write(fdMsgFile, fmsgEntry->topicname, ustrlen(fmsgEntry->topicname) );
if(nwritten != -1)
write(fdMsgFile, "\t", 1);
if(nwritten != -1)
nwritten = write(fdMsgFile, fmsgEntry->payload, ustrlen(fmsgEntry->payload) );
if(nwritten == -1) {
DBGPRINTF("omkafka: persistFailedMsgs error %d writing failed msg file\n", errno);
LogError(errno, RS_RET_ERR, "omkafka: persistFailedMsgs error writing failed msg file");
ABORT_FINALIZE(RS_RET_ERR);
} else {
DBGPRINTF("omkafka: persistFailedMsgs successfully written loaded msg '%.*s' for topic '%s'\n",
(int)(strlen((char*)fmsgEntry->payload)-1), fmsgEntry->payload, fmsgEntry->topicname);
}
/* Get next item */
fmsgEntry = LIST_NEXT(fmsgEntry, entries);
}
} else {
DBGPRINTF("omkafka: persistFailedMsgs We do not need to persist failed messages.\n");
DBGPRINTF("omkafka: persistFailedMsgs: We do not need to persist failed messages.\n");
}
finalize_it:
if(fdMsgFile != -1) {
@ -1196,7 +1190,6 @@ CODESTARTfreeInstance
/* Helpers for Failed Msg List */
failedmsg_entry* fmsgEntry1;
failedmsg_entry* fmsgEntry2;
/* Close error file */
if(pData->fdErrFile != -1)
close(pData->fdErrFile);
/* Closing Kafka first! */
@ -1213,13 +1206,13 @@ CODESTARTfreeInstance
pthread_rwlock_unlock(&pData->rkLock);
/* Delete Linked List for failed msgs */
fmsgEntry1 = LIST_FIRST(&pData->failedmsg_head);
fmsgEntry1 = LIST_FIRST(&pData->failedmsg_head);
while (fmsgEntry1 != NULL) {
fmsgEntry2 = LIST_NEXT(fmsgEntry1, entries);
fmsgEntry2 = LIST_NEXT(fmsgEntry1, entries);
free(fmsgEntry1->payload);
free(fmsgEntry1->topicname);
free(fmsgEntry1);
fmsgEntry1 = fmsgEntry2;
fmsgEntry1 = fmsgEntry2;
}
LIST_INIT(&pData->failedmsg_head);
/* Free other mem */