omelasticsearch: add dynamic pipline support

This commit is contained in:
Niels Becker 2017-10-26 14:29:57 +02:00
parent 0f5b086c71
commit 7931581d49

View File

@ -108,6 +108,7 @@ typedef struct _instanceData {
sbool dynSrchType;
sbool dynParent;
sbool dynBulkId;
sbool dynPipelineName;
sbool bulkmode;
size_t maxbytes;
sbool useHttps;
@ -149,13 +150,14 @@ static struct cnfparamdescr actpdescr[] = {
{ "bulkmode", eCmdHdlrBinary, 0 },
{ "maxbytes", eCmdHdlrSize, 0 },
{ "asyncrepl", eCmdHdlrGoneAway, 0 },
{ "usehttps", eCmdHdlrBinary, 0 },
{ "usehttps", eCmdHdlrBinary, 0 },
{ "timeout", eCmdHdlrGetWord, 0 },
{ "errorfile", eCmdHdlrGetWord, 0 },
{ "erroronly", eCmdHdlrBinary, 0 },
{ "interleaved", eCmdHdlrBinary, 0 },
{ "template", eCmdHdlrGetWord, 0 },
{ "dynbulkid", eCmdHdlrBinary, 0 },
{ "dynpipelinename", eCmdHdlrBinary, 0 },
{ "bulkid", eCmdHdlrGetWord, 0 },
{ "allowunsignedcerts", eCmdHdlrBinary, 0 }
};
@ -260,6 +262,7 @@ CODESTARTdbgPrintInstInfo
dbgprintf("\tsearch index='%s'\n", pData->searchIndex);
dbgprintf("\tsearch type='%s'\n", pData->searchType);
dbgprintf("\tpipeline name='%s'\n", pData->pipelineName);
dbgprintf("\tdynamic pipeline name=%d\n", pData->dynPipelineName);
dbgprintf("\tparent='%s'\n", pData->parent);
dbgprintf("\ttimeout='%s'\n", pData->timeout);
dbgprintf("\tdynamic search index=%d\n", pData->dynSrchIdx);
@ -408,75 +411,39 @@ ENDtryResume
static void
getIndexTypeAndParent(instanceData *pData, uchar **tpls,
uchar **srchIndex, uchar **srchType, uchar **parent,
uchar **bulkId)
uchar **bulkId, uchar **pipelineName)
{
*srchIndex = pData->searchIndex;
*parent = pData->parent;
*srchType = pData->searchType;
*bulkId = pData->bulkId;
*pipelineName = pData->pipelineName;
if(tpls == NULL) {
*srchIndex = pData->searchIndex;
*parent = pData->parent;
*srchType = pData->searchType;
*bulkId = NULL;
goto done;
}
int iNumTpls = 1;
if(pData->dynSrchIdx) {
*srchIndex = tpls[1];
if(pData->dynSrchType) {
*srchType = tpls[2];
if(pData->dynParent) {
*parent = tpls[3];
if(pData->dynBulkId) {
*bulkId = tpls[4];
}
} else {
*parent = pData->parent;
if(pData->dynBulkId) {
*bulkId = tpls[3];
}
}
} else {
*srchType = pData->searchType;
if(pData->dynParent) {
*parent = tpls[2];
if(pData->dynBulkId) {
*bulkId = tpls[3];
}
} else {
*parent = pData->parent;
if(pData->dynBulkId) {
*bulkId = tpls[2];
}
}
}
} else {
*srchIndex = pData->searchIndex;
if(pData->dynSrchType) {
*srchType = tpls[1];
if(pData->dynParent) {
*parent = tpls[2];
if(pData->dynBulkId) {
*bulkId = tpls[3];
}
} else {
*parent = pData->parent;
if(pData->dynBulkId) {
*bulkId = tpls[2];
}
}
} else {
*srchType = pData->searchType;
if(pData->dynParent) {
*parent = tpls[1];
if(pData->dynBulkId) {
*bulkId = tpls[2];
}
} else {
*parent = pData->parent;
if(pData->dynBulkId) {
*bulkId = tpls[1];
}
}
}
*srchIndex = tpls[iNumTpls];
++iNumTpls;
}
if(pData->dynSrchType) {
*srchType = tpls[iNumTpls];
++iNumTpls;
}
if(pData->dynParent) {
*parent = tpls[iNumTpls];
++iNumTpls;
}
if(pData->dynBulkId) {
*bulkId = tpls[iNumTpls];
++iNumTpls;
}
if(pData->dynPipelineName) {
*pipelineName = tpls[iNumTpls];
++iNumTpls;
}
done: return;
}
@ -504,13 +471,12 @@ setPostURL(wrkrInstanceData_t *pWrkrData, instanceData *pData, uchar **tpls)
}
separator = '?';
pipelineName = pData->pipelineName;
if(bulkmode) {
r = es_addBuf(&url, "_bulk", sizeof("_bulk")-1);
parent = NULL;
} else {
getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId);
getIndexTypeAndParent(pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName);
r = es_addBuf(&url, (char*)searchIndex, ustrlen(searchIndex));
if(r == 0) r = es_addChar(&url, '/');
if(r == 0) r = es_addBuf(&url, (char*)searchType, ustrlen(searchType));
@ -561,9 +527,9 @@ computeMessageSize(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
uchar *searchType;
uchar *parent = NULL;
uchar *bulkId = NULL;
uchar *pipelineName = pWrkrData->pData->pipelineName;
uchar *pipelineName;
getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId);
getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName);
r += ustrlen((char *)message) + ustrlen(searchIndex) + ustrlen(searchType);
if(parent != NULL) {
@ -593,10 +559,10 @@ buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
uchar *searchType;
uchar *parent = NULL;
uchar *bulkId = NULL;
uchar *pipelineName = pWrkrData->pData->pipelineName;
uchar *pipelineName;
DEFiRet;
getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId);
getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId, &pipelineName);
r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1);
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex,
ustrlen(searchIndex));
@ -1383,6 +1349,7 @@ setInstParamDefaults(instanceData *pData)
pData->searchIndex = NULL;
pData->searchType = NULL;
pData->pipelineName = NULL;
pData->dynPipelineName = 0;
pData->parent = NULL;
pData->timeout = NULL;
pData->dynSrchIdx = 0;
@ -1421,9 +1388,9 @@ CODESTARTnewActInst
servers = pvals[i].val.d.ar;
} else if(!strcmp(actpblk.descr[i].name, "errorfile")) {
pData->errorFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
}else if(!strcmp(actpblk.descr[i].name, "erroronly")) {
} else if(!strcmp(actpblk.descr[i].name, "erroronly")) {
pData->errorOnly = pvals[i].val.d.n;
}else if(!strcmp(actpblk.descr[i].name, "interleaved")) {
} else if(!strcmp(actpblk.descr[i].name, "interleaved")) {
pData->interleaved = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "serverport")) {
pData->defaultPort = (int) pvals[i].val.d.n;
@ -1439,6 +1406,8 @@ CODESTARTnewActInst
pData->searchType = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "pipelinename")) {
pData->pipelineName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "dynpipelinename")) {
pData->dynPipelineName = pvals[i].val.d.n;
} else if(!strcmp(actpblk.descr[i].name, "parent")) {
pData->parent = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else if(!strcmp(actpblk.descr[i].name, "dynsearchindex")) {
@ -1499,6 +1468,12 @@ CODESTARTnewActInst
"name for bulkid template given - action definition invalid");
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
}
if(pData->dynPipelineName && pData->pipelineName == NULL) {
errmsg.LogError(0, RS_RET_CONFIG_ERROR,
"omelasticsearch: requested dynamic pipeline name, but no "
"name for pipelineName template given - action definition invalid");
ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
}
if (pData->uid != NULL)
CHKiRet(computeAuthHeader((char*) pData->uid, (char*) pData->pwd, &pData->authBuf));
@ -1508,6 +1483,7 @@ CODESTARTnewActInst
if(pData->dynSrchType) ++iNumTpls;
if(pData->dynParent) ++iNumTpls;
if(pData->dynBulkId) ++iNumTpls;
if(pData->dynPipelineName) ++iNumTpls;
DBGPRINTF("omelasticsearch: requesting %d templates\n", iNumTpls);
CODE_STD_STRING_REQUESTnewActInst(iNumTpls)
@ -1520,73 +1496,33 @@ CODESTARTnewActInst
* it will always be string 1. Type may be 1 or 2, depending on whether search
* index is dynamic as well. Rule needs to be followed throughout the module.
*/
iNumTpls = 1;
if(pData->dynSrchIdx) {
CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchIndex),
CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->searchIndex),
OMSR_NO_RQD_TPL_OPTS));
if(pData->dynSrchType) {
CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->searchType),
OMSR_NO_RQD_TPL_OPTS));
if(pData->dynParent) {
CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->parent),
OMSR_NO_RQD_TPL_OPTS));
if(pData->dynBulkId) {
CHKiRet(OMSRsetEntry(*ppOMSR, 4, ustrdup(pData->bulkId),
OMSR_NO_RQD_TPL_OPTS));
}
} else {
if(pData->dynBulkId) {
CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId),
OMSR_NO_RQD_TPL_OPTS));
}
}
} else {
if(pData->dynParent) {
CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent),
OMSR_NO_RQD_TPL_OPTS));
if(pData->dynBulkId) {
CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId),
OMSR_NO_RQD_TPL_OPTS));
}
} else {
if(pData->dynBulkId) {
CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId),
OMSR_NO_RQD_TPL_OPTS));
}
}
}
} else {
if(pData->dynSrchType) {
CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->searchType),
OMSR_NO_RQD_TPL_OPTS));
if(pData->dynParent) {
CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->parent),
OMSR_NO_RQD_TPL_OPTS));
if(pData->dynBulkId) {
CHKiRet(OMSRsetEntry(*ppOMSR, 3, ustrdup(pData->bulkId),
OMSR_NO_RQD_TPL_OPTS));
}
} else {
if(pData->dynBulkId) {
CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId),
OMSR_NO_RQD_TPL_OPTS));
}
}
} else {
if(pData->dynParent) {
CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->parent),
OMSR_NO_RQD_TPL_OPTS));
if(pData->dynBulkId) {
CHKiRet(OMSRsetEntry(*ppOMSR, 2, ustrdup(pData->bulkId),
OMSR_NO_RQD_TPL_OPTS));
}
} else {
if(pData->dynBulkId) {
CHKiRet(OMSRsetEntry(*ppOMSR, 1, ustrdup(pData->bulkId),
OMSR_NO_RQD_TPL_OPTS));
}
}
}
++iNumTpls;
}
if(pData->dynSrchType) {
CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->searchType),
OMSR_NO_RQD_TPL_OPTS));
++iNumTpls;
}
if(pData->dynParent) {
CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->parent),
OMSR_NO_RQD_TPL_OPTS));
++iNumTpls;
}
if(pData->dynBulkId) {
CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->bulkId),
OMSR_NO_RQD_TPL_OPTS));
++iNumTpls;
}
if(pData->dynPipelineName) {
CHKiRet(OMSRsetEntry(*ppOMSR, iNumTpls, ustrdup(pData->pipelineName),
OMSR_NO_RQD_TPL_OPTS));
++iNumTpls;
}
if (servers != NULL) {
pData->numServers = servers->nmemb;