From 989be897340eb458b00efedfd5e082bb362db79a Mon Sep 17 00:00:00 2001 From: Rich Megginson Date: Tue, 15 May 2018 16:03:25 -0600 Subject: [PATCH 11/11] omelasticsearch: write op types; bulk rejection retries Add support for a 'create' write operation type in addition to the default 'index'. Using create allows specifying a unique id for each record, and allows duplicate document detection. Add support for checking each record returned in a bulk index request response. Allow specifying a ruleset to send each failed record to. Add a local variable `omes` which contains the information in the error response, so that users can control how to handle responses e.g. retry, or send to an error file. Add support for response stats - count successes, duplicates, and different types of failures. Add testing for bulk index rejections. (cherry picked from commit 57dd368a2a915d79c94a8dc0de30c93a0bbdc8fe) (cherry picked from commit 30a15621e1e7e393b2153e9fe5c13f724dea25b5) --- plugins/omelasticsearch/omelasticsearch.c | 441 ++++++++++++++++++++++++++++-- 1 file changed, 415 insertions(+), 26 deletions(-) diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c index ed2b47535..ca61ae28f 100644 --- a/plugins/omelasticsearch/omelasticsearch.c +++ b/plugins/omelasticsearch/omelasticsearch.c @@ -51,6 +51,8 @@ #include "statsobj.h" #include "cfsysline.h" #include "unicode-helper.h" +#include "ratelimit.h" +#include "ruleset.h" #ifndef O_LARGEFILE # define O_LARGEFILE 0 @@ -64,6 +66,8 @@ MODULE_CNFNAME("omelasticsearch") DEF_OMOD_STATIC_DATA DEFobjCurrIf(errmsg) DEFobjCurrIf(statsobj) +DEFobjCurrIf(prop) +DEFobjCurrIf(ruleset) statsobj_t *indexStats; STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit) @@ -71,19 +75,35 @@ STATSCOUNTER_DEF(indexHTTPFail, mutIndexHTTPFail) STATSCOUNTER_DEF(indexHTTPReqFail, mutIndexHTTPReqFail) STATSCOUNTER_DEF(checkConnFail, mutCheckConnFail) STATSCOUNTER_DEF(indexESFail, mutIndexESFail) +STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess) +STATSCOUNTER_DEF(indexBadResponse, mutIndexBadResponse) +STATSCOUNTER_DEF(indexDuplicate, mutIndexDuplicate) +STATSCOUNTER_DEF(indexBadArgument, mutIndexBadArgument) +STATSCOUNTER_DEF(indexBulkRejection, mutIndexBulkRejection) +STATSCOUNTER_DEF(indexOtherResponse, mutIndexOtherResponse) +static prop_t *pInputName = NULL; # define META_STRT "{\"index\":{\"_index\": \"" +# define META_STRT_CREATE "{\"create\":{\"_index\": \"" # define META_TYPE "\",\"_type\":\"" # define META_PARENT "\",\"_parent\":\"" # define META_ID "\", \"_id\":\"" # define META_END "\"}}\n" +typedef enum { + ES_WRITE_INDEX, + ES_WRITE_CREATE, + ES_WRITE_UPDATE, /* not supported */ + ES_WRITE_UPSERT /* not supported */ +} es_write_ops_t; + /* REST API for elasticsearch hits this URL: * http://:// */ +/* bulk API uses /_bulk */ typedef struct curl_slist HEADER; -typedef struct _instanceData { +typedef struct instanceConf_s { int defaultPort; int fdErrFile; /* error file fd or -1 if not open */ pthread_mutex_t mutErrFile; @@ -113,8 +133,25 @@ typedef struct _instanceData { uchar *caCertFile; uchar *myCertFile; uchar *myPrivKeyFile; + es_write_ops_t writeOperation; + sbool retryFailures; + int ratelimitInterval; + int ratelimitBurst; + /* for retries */ + ratelimit_t *ratelimiter; + uchar *retryRulesetName; + ruleset_t *retryRuleset; + struct instanceConf_s *next; } instanceData; +typedef instanceConf_t instanceData; + +struct modConfData_s { + rsconf_t *pConf; /* our overall config object */ + instanceConf_t *root, *tail; +}; +static modConfData_t *loadModConf = NULL; /* modConf ptr to use for the current load process */ + typedef struct wrkrInstanceData { instanceData *pData; int serverIndex; @@ -160,7 +197,12 @@ static struct cnfparamdescr actpdescr[] = { { "allowunsignedcerts", eCmdHdlrBinary, 0 }, { "tls.cacert", eCmdHdlrString, 0 }, { "tls.mycert", eCmdHdlrString, 0 }, - { "tls.myprivkey", eCmdHdlrString, 0 } + { "tls.myprivkey", eCmdHdlrString, 0 }, + { "writeoperation", eCmdHdlrGetWord, 0 }, + { "retryfailures", eCmdHdlrBinary, 0 }, + { "ratelimit.interval", eCmdHdlrInt, 0 }, + { "ratelimit.burst", eCmdHdlrInt, 0 }, + { "retryruleset", eCmdHdlrString, 0 } }; static struct cnfparamblk actpblk = { CNFPARAMBLK_VERSION, @@ -177,6 +219,9 @@ CODESTARTcreateInstance pData->caCertFile = NULL; pData->myCertFile = NULL; pData->myPrivKeyFile = NULL; + pData->ratelimiter = NULL; + pData->retryRulesetName = NULL; + pData->retryRuleset = NULL; ENDcreateInstance BEGINcreateWrkrInstance @@ -228,6 +273,9 @@ CODESTARTfreeInstance free(pData->caCertFile); free(pData->myCertFile); free(pData->myPrivKeyFile); + free(pData->retryRulesetName); + if (pData->ratelimiter != NULL) + ratelimitDestruct(pData->ratelimiter); ENDfreeInstance BEGINfreeWrkrInstance @@ -285,6 +333,10 @@ CODESTARTdbgPrintInstInfo dbgprintf("\ttls.cacert='%s'\n", pData->caCertFile); dbgprintf("\ttls.mycert='%s'\n", pData->myCertFile); dbgprintf("\ttls.myprivkey='%s'\n", pData->myPrivKeyFile); + dbgprintf("\twriteoperation='%d'\n", pData->writeOperation); + dbgprintf("\tretryfailures='%d'\n", pData->retryFailures); + dbgprintf("\tratelimit.interval='%d'\n", pData->ratelimitInterval); + dbgprintf("\tratelimit.burst='%d'\n", pData->ratelimitBurst); ENDdbgPrintInstInfo @@ -557,7 +609,11 @@ finalize_it: static size_t computeMessageSize(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls) { - size_t r = sizeof(META_STRT)-1 + sizeof(META_TYPE)-1 + sizeof(META_END)-1 + sizeof("\n")-1; + size_t r = sizeof(META_TYPE)-1 + sizeof(META_END)-1 + sizeof("\n")-1; + if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE) + r += sizeof(META_STRT_CREATE)-1; + else + r += sizeof(META_STRT)-1; uchar *searchIndex = 0; uchar *searchType; @@ -594,7 +650,10 @@ buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls) DEFiRet; getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId); - r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1); + if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE) + r = es_addBuf(&pWrkrData->batch.data, META_STRT_CREATE, sizeof(META_STRT_CREATE)-1); + else + r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1); if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex, ustrlen(searchIndex)); if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1); @@ -709,13 +768,20 @@ static int checkReplyStatus(fjson_object* ok) { /* * Context object for error file content creation or status check + * response_item - the full {"create":{"_index":"idxname",.....}} + * response_body - the inner hash of the response_item - {"_index":"idxname",...} + * status - the "status" field from the inner hash - "status":500 + * should be able to use fjson_object_get_int(status) to get the http result code */ typedef struct exeContext{ int statusCheckOnly; fjson_object *errRoot; - rsRetVal (*prepareErrorFileContent)(struct exeContext *ctx,int itemStatus,char *request,char *response); - - + rsRetVal (*prepareErrorFileContent)(struct exeContext *ctx,int itemStatus,char *request,char *response, + fjson_object *response_item, fjson_object *response_body, fjson_object *status); + es_write_ops_t writeOperation; + ratelimit_t *ratelimiter; + ruleset_t *retryRuleset; + struct json_tokener *jTokener; } context; /* @@ -728,8 +794,15 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p fjson_object *replyRoot = *pReplyRoot; int i; int numitems; - fjson_object *items=NULL; + fjson_object *items=NULL, *jo_errors = NULL; + int errors = 0; + if(fjson_object_object_get_ex(replyRoot, "errors", &jo_errors)) { + errors = fjson_object_get_boolean(jo_errors); + if (!errors && pWrkrData->pData->retryFailures) { + return RS_RET_OK; + } + } /*iterate over items*/ if(!fjson_object_object_get_ex(replyRoot, "items", &items)) { @@ -741,7 +814,11 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p numitems = fjson_object_array_length(items); - DBGPRINTF("omelasticsearch: Entire request %s\n",reqmsg); + if (reqmsg) { + DBGPRINTF("omelasticsearch: Entire request %s\n", reqmsg); + } else { + DBGPRINTF("omelasticsearch: Empty request\n"); + } const char *lastReqRead= (char*)reqmsg; DBGPRINTF("omelasticsearch: %d items in reply\n", numitems); @@ -769,8 +846,7 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p char *request =0; char *response =0; - if(ctx->statusCheckOnly) - { + if(ctx->statusCheckOnly || (NULL == lastReqRead)) { if(itemStatus) { DBGPRINTF("omelasticsearch: error in elasticsearch reply: item %d, status is %d\n", i, fjson_object_get_int(ok)); DBGPRINTF("omelasticsearch: status check found error.\n"); @@ -795,7 +871,8 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p } /*call the context*/ - rsRetVal ret = ctx->prepareErrorFileContent(ctx, itemStatus, request,response); + rsRetVal ret = ctx->prepareErrorFileContent(ctx, itemStatus, request, + response, item, result, ok); /*free memory in any case*/ free(request); @@ -818,11 +895,14 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p * Dumps only failed requests of bulk insert */ static rsRetVal -getDataErrorOnly(context *ctx,int itemStatus,char *request,char *response) +getDataErrorOnly(context *ctx,int itemStatus,char *request,char *response, + fjson_object *response_item, fjson_object *response_body, fjson_object *status) { DEFiRet; - if(itemStatus) - { + (void)response_item; /* unused */ + (void)response_body; /* unused */ + (void)status; /* unused */ + if(itemStatus) { fjson_object *onlyErrorResponses =NULL; fjson_object *onlyErrorRequests=NULL; @@ -855,9 +935,16 @@ static rsRetVal getDataInterleaved(context *ctx, int __attribute__((unused)) itemStatus, char *request, - char *response) + char *response, + fjson_object *response_item, + fjson_object *response_body, + fjson_object *status +) { DEFiRet; + (void)response_item; /* unused */ + (void)response_body; /* unused */ + (void)status; /* unused */ fjson_object *interleaved =NULL; if(!fjson_object_object_get_ex(ctx->errRoot, "response", &interleaved)) { @@ -889,11 +976,13 @@ getDataInterleaved(context *ctx, */ static rsRetVal -getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *response) +getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *response, + fjson_object *response_item, fjson_object *response_body, fjson_object *status) { DEFiRet; if (itemStatus) { - if(getDataInterleaved(ctx, itemStatus,request,response)!= RS_RET_OK) { + if(getDataInterleaved(ctx, itemStatus,request,response, + response_item, response_body, status)!= RS_RET_OK) { ABORT_FINALIZE(RS_RET_ERR); } } @@ -902,6 +991,141 @@ getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *resp RETiRet; } +/* request string looks like this: + * "{\"create\":{\"_index\": \"rsyslog_testbench\",\"_type\":\"test-type\", + * \"_id\":\"FAEAFC0D17C847DA8BD6F47BC5B3800A\"}}\n + * {\"msgnum\":\"x00000000\",\"viaq_msg_id\":\"FAEAFC0D17C847DA8BD6F47BC5B3800A\"}\n" + * we don't want the meta header, only the data part + * start = first \n + 1 + * end = last \n + */ +static rsRetVal +createMsgFromRequest(const char *request, context *ctx, smsg_t **msg) +{ + DEFiRet; + fjson_object *jo_msg = NULL; + const char *datastart, *dataend; + size_t datalen; + enum json_tokener_error json_error; + + *msg = NULL; + if (!(datastart = strchr(request, '\n')) || (datastart[1] != '{')) { + LogError(0, RS_RET_ERR, + "omelasticsearch: malformed original request - " + "could not find start of original data [%s]", + request); + ABORT_FINALIZE(RS_RET_ERR); + } + datastart++; /* advance to { */ + if (!(dataend = strchr(datastart, '\n')) || (dataend[1] != '\0')) { + LogError(0, RS_RET_ERR, + "omelasticsearch: malformed original request - " + "could not find end of original data [%s]", + request); + ABORT_FINALIZE(RS_RET_ERR); + } + datalen = dataend - datastart; + json_tokener_reset(ctx->jTokener); + fjson_object *jo_request = json_tokener_parse_ex(ctx->jTokener, datastart, datalen); + json_error = fjson_tokener_get_error(ctx->jTokener); + if (!jo_request || (json_error != fjson_tokener_success)) { + LogError(0, RS_RET_ERR, + "omelasticsearch: parse error [%s] - could not convert original " + "request JSON back into JSON object [%s]", + fjson_tokener_error_desc(json_error), request); + ABORT_FINALIZE(RS_RET_ERR); + } + + CHKiRet(msgConstruct(msg)); + MsgSetFlowControlType(*msg, eFLOWCTL_FULL_DELAY); + MsgSetInputName(*msg, pInputName); + if (fjson_object_object_get_ex(jo_request, "message", &jo_msg)) { + const char *rawmsg = json_object_get_string(jo_msg); + const size_t msgLen = (size_t)json_object_get_string_len(jo_msg); + MsgSetRawMsg(*msg, rawmsg, msgLen); + } else { + MsgSetRawMsg(*msg, request, strlen(request)); + } + MsgSetMSGoffs(*msg, 0); /* we do not have a header... */ + CHKiRet(msgAddJSON(*msg, (uchar*)"!", jo_request, 0, 0)); + + finalize_it: + RETiRet; + +} + + +static rsRetVal +getDataRetryFailures(context *ctx,int itemStatus,char *request,char *response, + fjson_object *response_item, fjson_object *response_body, fjson_object *status) +{ + DEFiRet; + fjson_object *omes = NULL, *jo = NULL; + int istatus = fjson_object_get_int(status); + int iscreateop = 0; + struct json_object_iterator it = json_object_iter_begin(response_item); + struct json_object_iterator itEnd = json_object_iter_end(response_item); + const char *optype = NULL; + smsg_t *msg = NULL; + + (void)response; + (void)itemStatus; + CHKiRet(createMsgFromRequest(request, ctx, &msg)); + CHKmalloc(msg); + /* add status as local variables */ + omes = json_object_new_object(); + if (!json_object_iter_equal(&it, &itEnd)) + optype = json_object_iter_peek_name(&it); + if (optype && !strcmp("create", optype)) + iscreateop = 1; + if (optype && !strcmp("index", optype) && (ctx->writeOperation == ES_WRITE_INDEX)) + iscreateop = 1; + if (optype) { + jo = json_object_new_string(optype); + } else { + jo = json_object_new_string("unknown"); + } + json_object_object_add(omes, "writeoperation", jo); + + if (!optype) { + STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse); + } else if ((istatus == 200) || (istatus == 201)) { + STATSCOUNTER_INC(indexSuccess, mutIndexSuccess); + } else if ((istatus == 409) && iscreateop) { + STATSCOUNTER_INC(indexDuplicate, mutIndexDuplicate); + } else if (istatus == 400 || (istatus < 200)) { + STATSCOUNTER_INC(indexBadArgument, mutIndexBadArgument); + } else { + fjson_object *error = NULL, *errtype = NULL; + if(fjson_object_object_get_ex(response_body, "error", &error) && + fjson_object_object_get_ex(error, "type", &errtype)) { + if (istatus == 429) { + STATSCOUNTER_INC(indexBulkRejection, mutIndexBulkRejection); + } else { + STATSCOUNTER_INC(indexOtherResponse, mutIndexOtherResponse); + } + } else { + STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse); + } + } + /* add response_body fields to local var omes */ + it = json_object_iter_begin(response_body); + itEnd = json_object_iter_end(response_body); + while (!json_object_iter_equal(&it, &itEnd)) { + json_object_object_add(omes, json_object_iter_peek_name(&it), + json_object_get(json_object_iter_peek_value(&it))); + json_object_iter_next(&it); + } + CHKiRet(msgAddJSON(msg, (uchar*)".omes", omes, 0, 0)); + omes = NULL; + MsgSetRuleset(msg, ctx->retryRuleset); + CHKiRet(ratelimitAddMsg(ctx->ratelimiter, NULL, msg)); +finalize_it: + if (omes) + json_object_put(omes); + RETiRet; +} + /* * get erroronly context */ @@ -979,6 +1203,23 @@ initializeErrorInterleavedConext(wrkrInstanceData_t *pWrkrData,context *ctx){ RETiRet; } +/*get retry failures context*/ +static rsRetVal +initializeRetryFailuresContext(wrkrInstanceData_t *pWrkrData,context *ctx){ + DEFiRet; + ctx->statusCheckOnly=0; + fjson_object *errRoot=NULL; + if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR); + + + fjson_object_object_add(errRoot, "url", fjson_object_new_string((char*)pWrkrData->restURL)); + ctx->errRoot = errRoot; + ctx->prepareErrorFileContent= &getDataRetryFailures; + CHKmalloc(ctx->jTokener = json_tokener_new()); + finalize_it: + RETiRet; +} + /* write data error request/replies to separate error file * Note: we open the file but never close it before exit. If it @@ -994,6 +1235,10 @@ writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, fjson_object char errStr[1024]; context ctx; ctx.errRoot=0; + ctx.writeOperation = pWrkrData->pData->writeOperation; + ctx.ratelimiter = pWrkrData->pData->ratelimiter; + ctx.retryRuleset = pWrkrData->pData->retryRuleset; + ctx.jTokener = NULL; DEFiRet; if(pData->errorFile == NULL) { @@ -1039,9 +1284,12 @@ writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, fjson_object DBGPRINTF("omelasticsearch: error initializing error interleaved context.\n"); ABORT_FINALIZE(RS_RET_ERR); } - } - else - { + } else if(pData->retryFailures) { + if(initializeRetryFailuresContext(pWrkrData, &ctx) != RS_RET_OK) { + DBGPRINTF("omelasticsearch: error initializing retry failures context.\n"); + ABORT_FINALIZE(RS_RET_ERR); + } + } else { DBGPRINTF("omelasticsearch: None of the modes match file write. No data to write.\n"); ABORT_FINALIZE(RS_RET_ERR); } @@ -1082,25 +1330,38 @@ finalize_it: if(bMutLocked) pthread_mutex_unlock(&pData->mutErrFile); fjson_object_put(ctx.errRoot); + if (ctx.jTokener) + json_tokener_free(ctx.jTokener); + free(rendered); RETiRet; } static rsRetVal -checkResultBulkmode(wrkrInstanceData_t *pWrkrData, fjson_object *root) +checkResultBulkmode(wrkrInstanceData_t *pWrkrData, fjson_object *root, uchar *reqmsg) { DEFiRet; context ctx; - ctx.statusCheckOnly=1; ctx.errRoot = 0; - if(parseRequestAndResponseForContext(pWrkrData,&root,0,&ctx)!= RS_RET_OK) - { + ctx.writeOperation = pWrkrData->pData->writeOperation; + ctx.ratelimiter = pWrkrData->pData->ratelimiter; + ctx.retryRuleset = pWrkrData->pData->retryRuleset; + ctx.statusCheckOnly=1; + ctx.jTokener = NULL; + if (pWrkrData->pData->retryFailures) { + ctx.statusCheckOnly=0; + CHKiRet(initializeRetryFailuresContext(pWrkrData, &ctx)); + } + if(parseRequestAndResponseForContext(pWrkrData,&root,reqmsg,&ctx)!= RS_RET_OK) { DBGPRINTF("omelasticsearch: error found in elasticsearch reply\n"); ABORT_FINALIZE(RS_RET_DATAFAIL); } - finalize_it: - RETiRet; +finalize_it: + fjson_object_put(ctx.errRoot); + if (ctx.jTokener) + json_tokener_free(ctx.jTokener); + RETiRet; } @@ -1118,7 +1378,7 @@ checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg) } if(pWrkrData->pData->bulkmode) { - iRet = checkResultBulkmode(pWrkrData, root); + iRet = checkResultBulkmode(pWrkrData, root, reqmsg); } else { if(fjson_object_object_get_ex(root, "status", &status)) { iRet = RS_RET_DATAFAIL; @@ -1397,6 +1657,13 @@ setInstParamDefaults(instanceData *pData) pData->caCertFile = NULL; pData->myCertFile = NULL; pData->myPrivKeyFile = NULL; + pData->writeOperation = ES_WRITE_INDEX; + pData->retryFailures = 0; + pData->ratelimitBurst = 20000; + pData->ratelimitInterval = 600; + pData->ratelimiter = NULL; + pData->retryRulesetName = NULL; + pData->retryRuleset = NULL; } BEGINnewActInst @@ -1495,6 +1762,27 @@ CODESTARTnewActInst } else { fclose(fp); } + } else if(!strcmp(actpblk.descr[i].name, "writeoperation")) { + char *writeop = es_str2cstr(pvals[i].val.d.estr, NULL); + if (writeop && !strcmp(writeop, "create")) { + pData->writeOperation = ES_WRITE_CREATE; + } else if (writeop && !strcmp(writeop, "index")) { + pData->writeOperation = ES_WRITE_INDEX; + } else if (writeop) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, + "omelasticsearch: invalid value '%s' for writeoperation: " + "must be one of 'index' or 'create' - using default value 'index'", writeop); + pData->writeOperation = ES_WRITE_INDEX; + } + free(writeop); + } else if(!strcmp(actpblk.descr[i].name, "retryfailures")) { + pData->retryFailures = pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "ratelimit.burst")) { + pData->ratelimitBurst = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "ratelimit.interval")) { + pData->ratelimitInterval = (int) pvals[i].val.d.n; + } else if(!strcmp(actpblk.descr[i].name, "retryruleset")) { + pData->retryRulesetName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); } else { dbgprintf("omelasticsearch: program error, non-handled " "param '%s'\n", actpblk.descr[i].name); @@ -1661,6 +1949,27 @@ CODESTARTnewActInst pData->searchIndex = (uchar*) strdup("system"); if(pData->searchType == NULL) pData->searchType = (uchar*) strdup("events"); + + if ((pData->writeOperation != ES_WRITE_INDEX) && (pData->bulkId == NULL)) { + errmsg.LogError(0, RS_RET_CONFIG_ERROR, + "omelasticsearch: writeoperation '%d' requires bulkid", pData->writeOperation); + ABORT_FINALIZE(RS_RET_CONFIG_ERROR); + } + + if (pData->retryFailures) { + CHKiRet(ratelimitNew(&pData->ratelimiter, "omelasticsearch", NULL)); + ratelimitSetLinuxLike(pData->ratelimiter, pData->ratelimitInterval, pData->ratelimitBurst); + ratelimitSetNoTimeCache(pData->ratelimiter); + } + + /* node created, let's add to list of instance configs for the module */ + if(loadModConf->tail == NULL) { + loadModConf->tail = loadModConf->root = pData; + } else { + loadModConf->tail->next = pData; + loadModConf->tail = pData; + } + CODE_STD_FINALIZERnewActInst cnfparamvalsDestruct(pvals, &actpblk); if (serverParam) @@ -1680,6 +1989,51 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1) CODE_STD_FINALIZERparseSelectorAct ENDparseSelectorAct + +BEGINbeginCnfLoad +CODESTARTbeginCnfLoad + loadModConf = pModConf; + pModConf->pConf = pConf; + pModConf->root = pModConf->tail = NULL; +ENDbeginCnfLoad + + +BEGINendCnfLoad +CODESTARTendCnfLoad + loadModConf = NULL; /* done loading */ +ENDendCnfLoad + + +BEGINcheckCnf + instanceConf_t *inst; +CODESTARTcheckCnf + for(inst = pModConf->root ; inst != NULL ; inst = inst->next) { + ruleset_t *pRuleset; + rsRetVal localRet; + + if (inst->retryRulesetName) { + localRet = ruleset.GetRuleset(pModConf->pConf, &pRuleset, inst->retryRulesetName); + if(localRet == RS_RET_NOT_FOUND) { + errmsg.LogError(0, localRet, "omelasticsearch: retryruleset '%s' not found - " + "no retry ruleset will be used", inst->retryRulesetName); + } else { + inst->retryRuleset = pRuleset; + } + } + } +ENDcheckCnf + + +BEGINactivateCnf +CODESTARTactivateCnf +ENDactivateCnf + + +BEGINfreeCnf +CODESTARTfreeCnf +ENDfreeCnf + + BEGINdoHUP CODESTARTdoHUP if(pData->fdErrFile != -1) { @@ -1691,10 +2045,14 @@ ENDdoHUP BEGINmodExit CODESTARTmodExit + if(pInputName != NULL) + prop.Destruct(&pInputName); curl_global_cleanup(); statsobj.Destruct(&indexStats); objRelease(errmsg, CORE_COMPONENT); - objRelease(statsobj, CORE_COMPONENT); + objRelease(statsobj, CORE_COMPONENT); + objRelease(prop, CORE_COMPONENT); + objRelease(ruleset, CORE_COMPONENT); ENDmodExit BEGINqueryEtryPt @@ -1705,6 +2063,7 @@ CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES CODEqueryEtryPt_doHUP CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */ +CODEqueryEtryPt_STD_CONF2_QUERIES ENDqueryEtryPt @@ -1714,6 +2073,8 @@ CODESTARTmodInit CODEmodInit_QueryRegCFSLineHdlr CHKiRet(objUse(errmsg, CORE_COMPONENT)); CHKiRet(objUse(statsobj, CORE_COMPONENT)); + CHKiRet(objUse(prop, CORE_COMPONENT)); + CHKiRet(objUse(ruleset, CORE_COMPONENT)); if (curl_global_init(CURL_GLOBAL_ALL) != 0) { errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled"); @@ -1739,7 +2100,28 @@ CODEmodInit_QueryRegCFSLineHdlr STATSCOUNTER_INIT(indexESFail, mutIndexESFail); CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.es", ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexESFail)); + STATSCOUNTER_INIT(indexSuccess, mutIndexSuccess); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.success", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexSuccess)); + STATSCOUNTER_INIT(indexBadResponse, mutIndexBadResponse); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.bad", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBadResponse)); + STATSCOUNTER_INIT(indexDuplicate, mutIndexDuplicate); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.duplicate", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexDuplicate)); + STATSCOUNTER_INIT(indexBadArgument, mutIndexBadArgument); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.badargument", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBadArgument)); + STATSCOUNTER_INIT(indexBulkRejection, mutIndexBulkRejection); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.bulkrejection", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBulkRejection)); + STATSCOUNTER_INIT(indexOtherResponse, mutIndexOtherResponse); + CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.other", + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexOtherResponse)); CHKiRet(statsobj.ConstructFinalize(indexStats)); + CHKiRet(prop.Construct(&pInputName)); + CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("omelasticsearch"), sizeof("omelasticsearch") - 1)); + CHKiRet(prop.ConstructFinalize(pInputName)); ENDmodInit /* vi:set ai: