omelasticsearch: upgrade to v8 ouptut module interface

Note that this code here has NOT yet been tested.
This commit is contained in:
Rainer Gerhards 2013-10-30 18:12:32 +01:00
parent 8546cd810e
commit 63edb11d37
2 changed files with 123 additions and 100 deletions

View File

@ -69,8 +69,8 @@ STATSCOUNTER_DEF(indexESFail, mutIndexESFail)
typedef struct curl_slist HEADER;
typedef struct _instanceData {
int port;
int replyLen;
int fdErrFile; /* error file fd or -1 if not open */
pthread_mutex_t mutErrFile;
uchar *server;
uchar *uid;
uchar *pwd;
@ -80,24 +80,29 @@ typedef struct _instanceData {
uchar *tplName;
uchar *timeout;
uchar *bulkId;
uchar *restURL; /* last used URL for error reporting */
uchar *errorFile;
char *reply;
sbool dynSrchIdx;
sbool dynSrchType;
sbool dynParent;
sbool dynBulkId;
sbool bulkmode;
sbool asyncRepl;
} instanceData;
typedef struct wrkrInstanceData {
instanceData *pData;
int replyLen;
char *reply;
CURL *curlHandle; /* libcurl session handle */
HEADER *postHeader; /* json POST request info */
uchar *restURL; /* last used URL for error reporting */
struct {
es_str_t *data;
int nmemb; /* number of messages in batch (for statistics counting) */
uchar *currTpl1;
uchar *currTpl2;
} batch;
CURL *curlHandle; /* libcurl session handle */
HEADER *postHeader; /* json POST request info */
} instanceData;
} wrkrInstanceData_t;
/* tables for interfacing with the v6 config system */
@ -127,12 +132,30 @@ static struct cnfparamblk actpblk =
actpdescr
};
static rsRetVal curlSetup(wrkrInstanceData_t *pWrkrData, instanceData *pData);
BEGINcreateInstance
CODESTARTcreateInstance
pData->restURL = NULL;
pData->fdErrFile = -1;
pthread_mutex_init(&pData->mutErrFile, NULL);
ENDcreateInstance
BEGINcreateWrkrInstance
CODESTARTcreateWrkrInstance
pWrkrData->restURL = NULL;
if(pData->bulkmode) {
pWrkrData->batch.currTpl1 = NULL;
pWrkrData->batch.currTpl2 = NULL;
if((pWrkrData->batch.data = es_newStr(1024)) == NULL) {
DBGPRINTF("omelasticsearch: error creating batch string "
"turned off bulk mode\n");
pData->bulkmode = 0; /* at least it works */
}
}
CHKiRet(curlSetup(pWrkrData, pWrkrData->pData));
finalize_it:
ENDcreateWrkrInstance
BEGINisCompatibleWithFeature
CODESTARTisCompatibleWithFeature
if(eFeat == sFEATURERepeatedMsgReduction)
@ -141,16 +164,9 @@ ENDisCompatibleWithFeature
BEGINfreeInstance
CODESTARTfreeInstance
if (pData->postHeader) {
curl_slist_free_all(pData->postHeader);
pData->postHeader = NULL;
}
if (pData->curlHandle) {
curl_easy_cleanup(pData->curlHandle);
pData->curlHandle = NULL;
}
if(pData->fdErrFile != -1)
close(pData->fdErrFile);
pthread_mutex_destroy(&pData->mutErrFile);
free(pData->server);
free(pData->uid);
free(pData->pwd);
@ -159,11 +175,23 @@ CODESTARTfreeInstance
free(pData->parent);
free(pData->tplName);
free(pData->timeout);
free(pData->restURL);
free(pData->errorFile);
free(pData->bulkId);
ENDfreeInstance
BEGINfreeWrkrInstance
CODESTARTfreeWrkrInstance
if(pWrkrData->postHeader) {
curl_slist_free_all(pWrkrData->postHeader);
pWrkrData->postHeader = NULL;
}
if(pWrkrData->curlHandle) {
curl_easy_cleanup(pWrkrData->curlHandle);
pWrkrData->curlHandle = NULL;
}
free(pWrkrData->restURL);
ENDfreeWrkrInstance
BEGINdbgPrintInstInfo
CODESTARTdbgPrintInstInfo
dbgprintf("omelasticsearch\n");
@ -211,7 +239,7 @@ setBaseURL(instanceData *pData, es_str_t **url)
static inline rsRetVal
checkConn(instanceData *pData)
checkConn(wrkrInstanceData_t *pWrkrData)
{
es_str_t *url;
CURL *curl = NULL;
@ -219,7 +247,7 @@ checkConn(instanceData *pData)
char *cstr;
DEFiRet;
setBaseURL(pData, &url);
setBaseURL(pWrkrData->pData, &url);
curl = curl_easy_init();
if(curl == NULL) {
DBGPRINTF("omelasticsearch: checkConn() curl_easy_init() failed\n");
@ -235,16 +263,16 @@ checkConn(instanceData *pData)
curl_easy_setopt(curl, CURLOPT_URL, cstr);
free(cstr);
pData->reply = NULL;
pData->replyLen = 0;
curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData);
pWrkrData->reply = NULL;
pWrkrData->replyLen = 0;
curl_easy_setopt(curl, CURLOPT_WRITEDATA, pWrkrData);
res = curl_easy_perform(curl);
if(res != CURLE_OK) {
DBGPRINTF("omelasticsearch: checkConn() curl_easy_perform() "
"failed: %s\n", curl_easy_strerror(res));
ABORT_FINALIZE(RS_RET_SUSPENDED);
}
free(pData->reply);
free(pWrkrData->reply);
DBGPRINTF("omelasticsearch: checkConn() completed with success\n");
finalize_it:
@ -257,7 +285,7 @@ finalize_it:
BEGINtryResume
CODESTARTtryResume
DBGPRINTF("omelasticsearch: tryResume called\n");
iRet = checkConn(pData);
iRet = checkConn(pWrkrData);
ENDtryResume
@ -330,7 +358,7 @@ getIndexTypeAndParent(instanceData *pData, uchar **tpls,
static rsRetVal
setCurlURL(instanceData *pData, uchar **tpls)
setCurlURL(wrkrInstanceData_t *pWrkrData, instanceData *pData, uchar **tpls)
{
char authBuf[1024];
uchar *searchIndex;
@ -368,11 +396,11 @@ setCurlURL(instanceData *pData, uchar **tpls)
if(r == 0) r = es_addBuf(&url, (char*)parent, ustrlen(parent));
}
free(pData->restURL);
pData->restURL = (uchar*)es_str2cstr(url, NULL);
curl_easy_setopt(pData->curlHandle, CURLOPT_URL, pData->restURL);
free(pWrkrData->restURL);
pWrkrData->restURL = (uchar*)es_str2cstr(url, NULL);
curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_URL, pWrkrData->restURL);
es_deleteStr(url);
DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pData->restURL);
DBGPRINTF("omelasticsearch: using REST URL: '%s'\n", pWrkrData->restURL);
if(pData->uid != NULL) {
rLocal = snprintf(authBuf, sizeof(authBuf), "%s:%s", pData->uid,
@ -383,8 +411,8 @@ setCurlURL(instanceData *pData, uchar **tpls)
rLocal);
ABORT_FINALIZE(RS_RET_ERR);
}
curl_easy_setopt(pData->curlHandle, CURLOPT_USERPWD, authBuf);
curl_easy_setopt(pData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_USERPWD, authBuf);
curl_easy_setopt(pWrkrData->curlHandle, CURLOPT_PROXYAUTH, CURLAUTH_ANY);
}
finalize_it:
RETiRet;
@ -396,7 +424,7 @@ finalize_it:
* index changes.
*/
static rsRetVal
buildBatch(instanceData *pData, uchar *message, uchar **tpls)
buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
{
int length = strlen((char *)message);
int r;
@ -411,29 +439,29 @@ buildBatch(instanceData *pData, uchar *message, uchar **tpls)
# define META_ID "\", \"_id\":\""
# define META_END "\"}}\n"
getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId);
r = es_addBuf(&pData->batch.data, META_STRT, sizeof(META_STRT)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchIndex,
getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId);
r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1);
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex,
ustrlen(searchIndex));
if(r == 0) r = es_addBuf(&pData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)searchType,
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchType,
ustrlen(searchType));
if(parent != NULL) {
if(r == 0) r = es_addBuf(&pData->batch.data, META_PARENT, sizeof(META_PARENT)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)parent, ustrlen(parent));
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_PARENT, sizeof(META_PARENT)-1);
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)parent, ustrlen(parent));
}
if(bulkId != NULL) {
if(r == 0) r = es_addBuf(&pData->batch.data, META_ID, sizeof(META_ID)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)bulkId, ustrlen(bulkId));
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_ID, sizeof(META_ID)-1);
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)bulkId, ustrlen(bulkId));
}
if(r == 0) r = es_addBuf(&pData->batch.data, META_END, sizeof(META_END)-1);
if(r == 0) r = es_addBuf(&pData->batch.data, (char*)message, length);
if(r == 0) r = es_addBuf(&pData->batch.data, "\n", sizeof("\n")-1);
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_END, sizeof(META_END)-1);
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)message, length);
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, "\n", sizeof("\n")-1);
if(r != 0) {
DBGPRINTF("omelasticsearch: growing batch failed with code %d\n", r);
ABORT_FINALIZE(RS_RET_ERR);
}
++pData->batch.nmemb;
++pWrkrData->batch.nmemb;
iRet = RS_RET_DEFER_COMMIT;
finalize_it:
@ -446,7 +474,7 @@ finalize_it:
* needs to be closed, HUP must be sent.
*/
static inline rsRetVal
writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg)
writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg)
{
char *rendered = NULL;
cJSON *errRoot;
@ -454,6 +482,7 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg)
cJSON *replyRoot = *pReplyRoot;
size_t toWrite;
ssize_t wrRet;
sbool bMutLocked = 0;
char errStr[1024];
DEFiRet;
@ -463,6 +492,9 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg)
FINALIZE;
}
pthread_mutex_lock(&pData->mutErrFile);
bMutLocked = 1;
if(pData->fdErrFile == -1) {
pData->fdErrFile = open((char*)pData->errorFile,
O_WRONLY|O_CREAT|O_APPEND|O_LARGEFILE|O_CLOEXEC,
@ -474,7 +506,7 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg)
}
}
if((req=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pData->restURL));
cJSON_AddItemToObject(req, "url", cJSON_CreateString((char*)pWrkrData->restURL));
cJSON_AddItemToObject(req, "postdata", cJSON_CreateString((char*)reqmsg));
if((errRoot=cJSON_CreateObject()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
@ -495,13 +527,15 @@ writeDataError(instanceData *pData, cJSON **pReplyRoot, uchar *reqmsg)
*pReplyRoot = NULL; /* tell caller not to delete once again! */
finalize_it:
if(bMutLocked)
pthread_mutex_unlock(&pData->mutErrFile);
free(rendered);
RETiRet;
}
static inline rsRetVal
checkResultBulkmode(instanceData *pData, cJSON *root)
checkResultBulkmode(wrkrInstanceData_t *pWrkrData, cJSON *root)
{
int i;
int numitems;
@ -515,7 +549,7 @@ checkResultBulkmode(instanceData *pData, cJSON *root)
if(items == NULL || items->type != cJSON_Array) {
DBGPRINTF("omelasticsearch: error in elasticsearch reply: "
"bulkmode insert does not return array, reply is: %s\n",
pData->reply);
pWrkrData->reply);
ABORT_FINALIZE(RS_RET_DATAFAIL);
}
numitems = cJSON_GetArraySize(items);
@ -547,20 +581,20 @@ finalize_it:
static inline rsRetVal
checkResult(instanceData *pData, uchar *reqmsg)
checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg)
{
cJSON *root;
cJSON *ok;
DEFiRet;
root = cJSON_Parse(pData->reply);
root = cJSON_Parse(pWrkrData->reply);
if(root == NULL) {
DBGPRINTF("omelasticsearch: could not parse JSON result \n");
ABORT_FINALIZE(RS_RET_ERR);
}
if(pData->bulkmode) {
iRet = checkResultBulkmode(pData, root);
if(pWrkrData->pData->bulkmode) {
iRet = checkResultBulkmode(pWrkrData, root);
} else {
ok = cJSON_GetObjectItem(root, "ok");
if(ok == NULL || ok->type != cJSON_True) {
@ -572,7 +606,7 @@ checkResult(instanceData *pData, uchar *reqmsg)
* these in any case.
*/
if(iRet == RS_RET_DATAFAIL) {
writeDataError(pData, &root, reqmsg);
writeDataError(pWrkrData, pWrkrData->pData, &root, reqmsg);
iRet = RS_RET_OK; /* we have handled the problem! */
}
@ -587,19 +621,19 @@ finalize_it:
static rsRetVal
curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls, int nmsgs)
curlPost(wrkrInstanceData_t *pWrkrData, uchar *message, int msglen, uchar **tpls, int nmsgs)
{
CURLcode code;
CURL *curl = pData->curlHandle;
CURL *curl = pWrkrData->curlHandle;
DEFiRet;
pData->reply = NULL;
pData->replyLen = 0;
pWrkrData->reply = NULL;
pWrkrData->replyLen = 0;
if(pData->dynSrchIdx || pData->dynSrchType || pData->dynParent)
CHKiRet(setCurlURL(pData, tpls));
if(pWrkrData->pData->dynSrchIdx || pWrkrData->pData->dynSrchType || pWrkrData->pData->dynParent)
CHKiRet(setCurlURL(pWrkrData, pWrkrData->pData, tpls));
curl_easy_setopt(curl, CURLOPT_WRITEDATA, pData);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, pWrkrData);
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, (char *)message);
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, msglen);
code = curl_easy_perform(curl);
@ -618,27 +652,27 @@ curlPost(instanceData *pData, uchar *message, int msglen, uchar **tpls, int nmsg
break;
}
DBGPRINTF("omelasticsearch: pData replyLen = '%d'\n", pData->replyLen);
if (pData->replyLen > 0) {
pData->reply[pData->replyLen] = '\0'; /* Append 0 Byte if replyLen is above 0 - byte has been reserved in malloc */
DBGPRINTF("omelasticsearch: pWrkrData replyLen = '%d'\n", pWrkrData->replyLen);
if(pWrkrData->replyLen > 0) {
pWrkrData->reply[pWrkrData->replyLen] = '\0'; /* Append 0 Byte if replyLen is above 0 - byte has been reserved in malloc */
}
DBGPRINTF("omelasticsearch: pData reply: '%s'\n", pData->reply);
DBGPRINTF("omelasticsearch: pWrkrData reply: '%s'\n", pWrkrData->reply);
CHKiRet(checkResult(pData, message));
CHKiRet(checkResult(pWrkrData, message));
finalize_it:
free(pData->reply);
free(pWrkrData->reply);
RETiRet;
}
BEGINbeginTransaction
CODESTARTbeginTransaction
dbgprintf("omelasticsearch: beginTransaction\n");
if(!pData->bulkmode) {
if(!pWrkrData->pData->bulkmode) {
FINALIZE;
}
es_emptyStr(pData->batch.data);
pData->batch.nmemb = 0;
es_emptyStr(pWrkrData->batch.data);
pWrkrData->batch.nmemb = 0;
finalize_it:
ENDbeginTransaction
@ -647,9 +681,9 @@ BEGINdoAction
CODESTARTdoAction
STATSCOUNTER_INC(indexSubmit, mutIndexSubmit);
if(pData->bulkmode) {
CHKiRet(buildBatch(pData, ppString[0], ppString));
CHKiRet(buildBatch(pWrkrData, ppString[0], ppString));
} else {
CHKiRet(curlPost(pData, ppString[0], strlen((char*)ppString[0]),
CHKiRet(curlPost(pWrkrData, ppString[0], strlen((char*)ppString[0]),
ppString, 1));
}
finalize_it:
@ -662,13 +696,13 @@ BEGINendTransaction
CODESTARTendTransaction
dbgprintf("omelasticsearch: endTransaction init\n");
/* End Transaction only if batch data is not empty */
if (pData->batch.data != NULL ) {
cstr = es_str2cstr(pData->batch.data, NULL);
if (pWrkrData->batch.data != NULL ) {
cstr = es_str2cstr(pWrkrData->batch.data, NULL);
dbgprintf("omelasticsearch: endTransaction, batch: '%s'\n", cstr);
CHKiRet(curlPost(pData, (uchar*) cstr, strlen(cstr), NULL, pData->batch.nmemb));
CHKiRet(curlPost(pWrkrData, (uchar*) cstr, strlen(cstr), NULL, pWrkrData->batch.nmemb));
}
else
dbgprintf("omelasticsearch: endTransaction, pData->batch.data is NULL, nothing to send. \n");
dbgprintf("omelasticsearch: endTransaction, pWrkrData->batch.data is NULL, nothing to send. \n");
finalize_it:
free(cstr);
dbgprintf("omelasticsearch: endTransaction done with %d\n", iRet);
@ -679,24 +713,24 @@ size_t
curlResult(void *ptr, size_t size, size_t nmemb, void *userdata)
{
char *p = (char *)ptr;
instanceData *pData = (instanceData*) userdata;
wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t*) userdata;
char *buf;
size_t newlen;
newlen = pData->replyLen + size*nmemb;
if((buf = realloc(pData->reply, newlen + 1)) == NULL) {
newlen = pWrkrData->replyLen + size*nmemb;
if((buf = realloc(pWrkrData->reply, newlen + 1)) == NULL) {
DBGPRINTF("omelasticsearch: realloc failed in curlResult\n");
return 0; /* abort due to failure */
}
memcpy(buf+pData->replyLen, p, size*nmemb);
pData->replyLen = newlen;
pData->reply = buf;
memcpy(buf+pWrkrData->replyLen, p, size*nmemb);
pWrkrData->replyLen = newlen;
pWrkrData->reply = buf;
return size*nmemb;
}
static rsRetVal
curlSetup(instanceData *pData)
curlSetup(wrkrInstanceData_t *pWrkrData, instanceData *pData)
{
HEADER *header;
CURL *handle;
@ -712,13 +746,13 @@ curlSetup(instanceData *pData)
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlResult);
curl_easy_setopt(handle, CURLOPT_POST, 1);
pData->curlHandle = handle;
pData->postHeader = header;
pWrkrData->curlHandle = handle;
pWrkrData->postHeader = header;
if( pData->bulkmode
|| (pData->dynSrchIdx == 0 && pData->dynSrchType == 0 && pData->dynParent == 0)) {
/* in this case, we know no tpls are involved in the request-->NULL OK! */
setCurlURL(pData, NULL);
setCurlURL(pWrkrData, pData, NULL);
}
if(Debug) {
@ -838,16 +872,6 @@ CODESTARTnewActInst
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
}
if(pData->bulkmode) {
pData->batch.currTpl1 = NULL;
pData->batch.currTpl2 = NULL;
if((pData->batch.data = es_newStr(1024)) == NULL) {
DBGPRINTF("omelasticsearch: error creating batch string "
"turned off bulk mode\n");
pData->bulkmode = 0; /* at least it works */
}
}
iNumTpls = 1;
if(pData->dynSrchIdx) ++iNumTpls;
if(pData->dynSrchType) ++iNumTpls;
@ -939,9 +963,6 @@ CODESTARTnewActInst
pData->searchIndex = (uchar*) strdup("system");
if(pData->searchType == NULL)
pData->searchType = (uchar*) strdup("events");
CHKiRet(curlSetup(pData));
CODE_STD_FINALIZERnewActInst
cnfparamvalsDestruct(pvals, &actpblk);
ENDnewActInst
@ -979,6 +1000,7 @@ ENDmodExit
BEGINqueryEtryPt
CODESTARTqueryEtryPt
CODEqueryEtryPt_STD_OMOD_QUERIES
CODEqueryEtryPt_STD_OMOD8_QUERIES
CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_doHUP

View File

@ -232,7 +232,7 @@ static rsRetVal isCompatibleWithFeature(syslogFeature __attribute__((unused)) eF
* introduced in v4.3.3 -- rgerhards, 2009-04-27
*/
#define BEGINbeginTransaction \
static rsRetVal beginTransaction(instanceData __attribute__((unused)) *pData)\
static rsRetVal beginTransaction(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\
{\
DEFiRet;
@ -422,8 +422,9 @@ static rsRetVal newInpInst(struct nvlst *lst)\
* rgerhard, 2007-08-02
*/
#define BEGINtryResume \
static rsRetVal tryResume(instanceData __attribute__((unused)) *pData)\
static rsRetVal tryResume(wrkrInstanceData_t __attribute__((unused)) *pWrkrData)\
{\
instanceData *pData = pWrkrData->pData; \
DEFiRet;
#define CODESTARTtryResume \