diff --git a/plugins/omkafka/omkafka.c b/plugins/omkafka/omkafka.c index ae74fb178..251eb094b 100644 --- a/plugins/omkafka/omkafka.c +++ b/plugins/omkafka/omkafka.c @@ -969,12 +969,11 @@ checkFailedMessages(instanceData *const __restrict__ pData) DEFiRet; /* Loop through failed messages, reprocess them first! */ - while (!LIST_EMPTY(&pData->failedmsg_head)) { - fmsgEntry = LIST_FIRST(&pData->failedmsg_head); - //assert(fmsgEntry == NULL); /* Avoids false positives in CLANG*/ + while ((fmsgEntry = LIST_FIRST(&pData->failedmsg_head)) != NULL) { /* Put back into kafka! */ iRet = writeKafka(pData, (uchar*) fmsgEntry->payload, NULL, fmsgEntry->topicname); if(iRet != RS_RET_OK) { + // TODO: LogError??? DBGPRINTF("omkafka: failed to delivery failed msg '%.*s' with status %d. " "- suspending AGAIN!\n", (int)(strlen((char*)fmsgEntry->payload)-1), @@ -986,6 +985,7 @@ checkFailedMessages(instanceData *const __restrict__ pData) (char*)fmsgEntry->payload); LIST_REMOVE(fmsgEntry, entries); free(fmsgEntry); + fmsgEntry = NULL; } } @@ -994,49 +994,43 @@ finalize_it: } /* This function persists failed messages into a data file, so they can - * Be resend on next startup. + * be resend on next startup. * alorbach, 2017-06-02 */ -static rsRetVal +static rsRetVal ATTR_NONNULL(1) persistFailedMsgs(instanceData *const __restrict__ pData) { DEFiRet; int fdMsgFile = -1; ssize_t nwritten; - /* Clear Failed Msg List */ - failedmsg_entry* fmsgEntry; - - fmsgEntry = LIST_FIRST(&pData->failedmsg_head); + failedmsg_entry* fmsgEntry = LIST_FIRST(&pData->failedmsg_head); if (fmsgEntry != NULL) { fdMsgFile = open((char*)pData->failedMsgFile, O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP); if(fdMsgFile == -1) { - char errStr[1024]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - DBGPRINTF("omkafka: persistFailedMsgs error opening failed msg file: %s\n", errStr); + LogError(errno, RS_RET_ERR, "omkafka: persistFailedMsgs error opening failed msg file"); ABORT_FINALIZE(RS_RET_ERR); } - /* Loop through failed messages, reprocess them first! */ while (fmsgEntry != NULL) { - /* Put into into kafka! */ - write(fdMsgFile, fmsgEntry->topicname, ustrlen(fmsgEntry->topicname) ); - write(fdMsgFile, "\t", 1); - nwritten = write(fdMsgFile, fmsgEntry->payload, ustrlen(fmsgEntry->payload) ); + nwritten = write(fdMsgFile, fmsgEntry->topicname, ustrlen(fmsgEntry->topicname) ); + if(nwritten != -1) + write(fdMsgFile, "\t", 1); + if(nwritten != -1) + nwritten = write(fdMsgFile, fmsgEntry->payload, ustrlen(fmsgEntry->payload) ); if(nwritten == -1) { - DBGPRINTF("omkafka: persistFailedMsgs error %d writing failed msg file\n", errno); + LogError(errno, RS_RET_ERR, "omkafka: persistFailedMsgs error writing failed msg file"); ABORT_FINALIZE(RS_RET_ERR); } else { DBGPRINTF("omkafka: persistFailedMsgs successfully written loaded msg '%.*s' for topic '%s'\n", (int)(strlen((char*)fmsgEntry->payload)-1), fmsgEntry->payload, fmsgEntry->topicname); } - /* Get next item */ fmsgEntry = LIST_NEXT(fmsgEntry, entries); } } else { - DBGPRINTF("omkafka: persistFailedMsgs We do not need to persist failed messages.\n"); + DBGPRINTF("omkafka: persistFailedMsgs: We do not need to persist failed messages.\n"); } finalize_it: if(fdMsgFile != -1) { @@ -1196,7 +1190,6 @@ CODESTARTfreeInstance /* Helpers for Failed Msg List */ failedmsg_entry* fmsgEntry1; failedmsg_entry* fmsgEntry2; - /* Close error file */ if(pData->fdErrFile != -1) close(pData->fdErrFile); /* Closing Kafka first! */ @@ -1213,13 +1206,13 @@ CODESTARTfreeInstance pthread_rwlock_unlock(&pData->rkLock); /* Delete Linked List for failed msgs */ - fmsgEntry1 = LIST_FIRST(&pData->failedmsg_head); + fmsgEntry1 = LIST_FIRST(&pData->failedmsg_head); while (fmsgEntry1 != NULL) { - fmsgEntry2 = LIST_NEXT(fmsgEntry1, entries); + fmsgEntry2 = LIST_NEXT(fmsgEntry1, entries); free(fmsgEntry1->payload); free(fmsgEntry1->topicname); free(fmsgEntry1); - fmsgEntry1 = fmsgEntry2; + fmsgEntry1 = fmsgEntry2; } LIST_INIT(&pData->failedmsg_head); /* Free other mem */