From 989be897340eb458b00efedfd5e082bb362db79a Mon Sep 17 00:00:00 2001
From: Rich Megginson <rmeggins@redhat.com>
Date: Tue, 15 May 2018 16:03:25 -0600
Subject: [PATCH 11/11] omelasticsearch: write op types; bulk rejection retries
Add support for a 'create' write operation type in addition to
the default 'index'. Using create allows specifying a unique id
for each record, and allows duplicate document detection.
Add support for checking each record returned in a bulk index
request response. Allow specifying a ruleset to send each failed
record to. Add a local variable `omes` which contains the
information in the error response, so that users can control how
to handle responses e.g. retry, or send to an error file.
Add support for response stats - count successes, duplicates, and
different types of failures.
Add testing for bulk index rejections.
(cherry picked from commit 57dd368a2a915d79c94a8dc0de30c93a0bbdc8fe)
(cherry picked from commit 30a15621e1e7e393b2153e9fe5c13f724dea25b5)
---
plugins/omelasticsearch/omelasticsearch.c | 441 ++++++++++++++++++++++++++++--
1 file changed, 415 insertions(+), 26 deletions(-)
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index ed2b47535..ca61ae28f 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -51,6 +51,8 @@
#include "statsobj.h"
#include "cfsysline.h"
#include "unicode-helper.h"
+#include "ratelimit.h"
+#include "ruleset.h"
#ifndef O_LARGEFILE
# define O_LARGEFILE 0
@@ -64,6 +66,8 @@ MODULE_CNFNAME("omelasticsearch")
DEF_OMOD_STATIC_DATA
DEFobjCurrIf(errmsg)
DEFobjCurrIf(statsobj)
+DEFobjCurrIf(prop)
+DEFobjCurrIf(ruleset)
statsobj_t *indexStats;
STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit)
@@ -71,19 +75,35 @@ STATSCOUNTER_DEF(indexHTTPFail, mutIndexHTTPFail)
STATSCOUNTER_DEF(indexHTTPReqFail, mutIndexHTTPReqFail)
STATSCOUNTER_DEF(checkConnFail, mutCheckConnFail)
STATSCOUNTER_DEF(indexESFail, mutIndexESFail)
+STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess)
+STATSCOUNTER_DEF(indexBadResponse, mutIndexBadResponse)
+STATSCOUNTER_DEF(indexDuplicate, mutIndexDuplicate)
+STATSCOUNTER_DEF(indexBadArgument, mutIndexBadArgument)
+STATSCOUNTER_DEF(indexBulkRejection, mutIndexBulkRejection)
+STATSCOUNTER_DEF(indexOtherResponse, mutIndexOtherResponse)
+static prop_t *pInputName = NULL;
# define META_STRT "{\"index\":{\"_index\": \""
+# define META_STRT_CREATE "{\"create\":{\"_index\": \""
# define META_TYPE "\",\"_type\":\""
# define META_PARENT "\",\"_parent\":\""
# define META_ID "\", \"_id\":\""
# define META_END "\"}}\n"
+typedef enum {
+ ES_WRITE_INDEX,
+ ES_WRITE_CREATE,
+ ES_WRITE_UPDATE, /* not supported */
+ ES_WRITE_UPSERT /* not supported */
+} es_write_ops_t;
+
/* REST API for elasticsearch hits this URL:
* http://<hostName>:<restPort>/<searchIndex>/<searchType>
*/
+/* bulk API uses /_bulk */
typedef struct curl_slist HEADER;
-typedef struct _instanceData {
+typedef struct instanceConf_s {
int defaultPort;
int fdErrFile; /* error file fd or -1 if not open */
pthread_mutex_t mutErrFile;
@@ -113,8 +133,25 @@ typedef struct _instanceData {
uchar *caCertFile;
uchar *myCertFile;
uchar *myPrivKeyFile;
+ es_write_ops_t writeOperation;
+ sbool retryFailures;
+ int ratelimitInterval;
+ int ratelimitBurst;
+ /* for retries */
+ ratelimit_t *ratelimiter;
+ uchar *retryRulesetName;
+ ruleset_t *retryRuleset;
+ struct instanceConf_s *next;
} instanceData;
+typedef instanceConf_t instanceData;
+
+struct modConfData_s {
+ rsconf_t *pConf; /* our overall config object */
+ instanceConf_t *root, *tail;
+};
+static modConfData_t *loadModConf = NULL; /* modConf ptr to use for the current load process */
+
typedef struct wrkrInstanceData {
instanceData *pData;
int serverIndex;
@@ -160,7 +197,12 @@ static struct cnfparamdescr actpdescr[] = {
{ "allowunsignedcerts", eCmdHdlrBinary, 0 },
{ "tls.cacert", eCmdHdlrString, 0 },
{ "tls.mycert", eCmdHdlrString, 0 },
- { "tls.myprivkey", eCmdHdlrString, 0 }
+ { "tls.myprivkey", eCmdHdlrString, 0 },
+ { "writeoperation", eCmdHdlrGetWord, 0 },
+ { "retryfailures", eCmdHdlrBinary, 0 },
+ { "ratelimit.interval", eCmdHdlrInt, 0 },
+ { "ratelimit.burst", eCmdHdlrInt, 0 },
+ { "retryruleset", eCmdHdlrString, 0 }
};
static struct cnfparamblk actpblk =
{ CNFPARAMBLK_VERSION,
@@ -177,6 +219,9 @@ CODESTARTcreateInstance
pData->caCertFile = NULL;
pData->myCertFile = NULL;
pData->myPrivKeyFile = NULL;
+ pData->ratelimiter = NULL;
+ pData->retryRulesetName = NULL;
+ pData->retryRuleset = NULL;
ENDcreateInstance
BEGINcreateWrkrInstance
@@ -228,6 +273,9 @@ CODESTARTfreeInstance
free(pData->caCertFile);
free(pData->myCertFile);
free(pData->myPrivKeyFile);
+ free(pData->retryRulesetName);
+ if (pData->ratelimiter != NULL)
+ ratelimitDestruct(pData->ratelimiter);
ENDfreeInstance
BEGINfreeWrkrInstance
@@ -285,6 +333,10 @@ CODESTARTdbgPrintInstInfo
dbgprintf("\ttls.cacert='%s'\n", pData->caCertFile);
dbgprintf("\ttls.mycert='%s'\n", pData->myCertFile);
dbgprintf("\ttls.myprivkey='%s'\n", pData->myPrivKeyFile);
+ dbgprintf("\twriteoperation='%d'\n", pData->writeOperation);
+ dbgprintf("\tretryfailures='%d'\n", pData->retryFailures);
+ dbgprintf("\tratelimit.interval='%d'\n", pData->ratelimitInterval);
+ dbgprintf("\tratelimit.burst='%d'\n", pData->ratelimitBurst);
ENDdbgPrintInstInfo
@@ -557,7 +609,11 @@ finalize_it:
static size_t
computeMessageSize(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
{
- size_t r = sizeof(META_STRT)-1 + sizeof(META_TYPE)-1 + sizeof(META_END)-1 + sizeof("\n")-1;
+ size_t r = sizeof(META_TYPE)-1 + sizeof(META_END)-1 + sizeof("\n")-1;
+ if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
+ r += sizeof(META_STRT_CREATE)-1;
+ else
+ r += sizeof(META_STRT)-1;
uchar *searchIndex = 0;
uchar *searchType;
@@ -594,7 +650,10 @@ buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
DEFiRet;
getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId);
- r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1);
+ if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
+ r = es_addBuf(&pWrkrData->batch.data, META_STRT_CREATE, sizeof(META_STRT_CREATE)-1);
+ else
+ r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1);
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex,
ustrlen(searchIndex));
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
@@ -709,13 +768,20 @@ static int checkReplyStatus(fjson_object* ok) {
/*
* Context object for error file content creation or status check
+ * response_item - the full {"create":{"_index":"idxname",.....}}
+ * response_body - the inner hash of the response_item - {"_index":"idxname",...}
+ * status - the "status" field from the inner hash - "status":500
+ * should be able to use fjson_object_get_int(status) to get the http result code
*/
typedef struct exeContext{
int statusCheckOnly;
fjson_object *errRoot;
- rsRetVal (*prepareErrorFileContent)(struct exeContext *ctx,int itemStatus,char *request,char *response);
-
-
+ rsRetVal (*prepareErrorFileContent)(struct exeContext *ctx,int itemStatus,char *request,char *response,
+ fjson_object *response_item, fjson_object *response_body, fjson_object *status);
+ es_write_ops_t writeOperation;
+ ratelimit_t *ratelimiter;
+ ruleset_t *retryRuleset;
+ struct json_tokener *jTokener;
} context;
/*
@@ -728,8 +794,15 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
fjson_object *replyRoot = *pReplyRoot;
int i;
int numitems;
- fjson_object *items=NULL;
+ fjson_object *items=NULL, *jo_errors = NULL;
+ int errors = 0;
+ if(fjson_object_object_get_ex(replyRoot, "errors", &jo_errors)) {
+ errors = fjson_object_get_boolean(jo_errors);
+ if (!errors && pWrkrData->pData->retryFailures) {
+ return RS_RET_OK;
+ }
+ }
/*iterate over items*/
if(!fjson_object_object_get_ex(replyRoot, "items", &items)) {
@@ -741,7 +814,11 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
numitems = fjson_object_array_length(items);
- DBGPRINTF("omelasticsearch: Entire request %s\n",reqmsg);
+ if (reqmsg) {
+ DBGPRINTF("omelasticsearch: Entire request %s\n", reqmsg);
+ } else {
+ DBGPRINTF("omelasticsearch: Empty request\n");
+ }
const char *lastReqRead= (char*)reqmsg;
DBGPRINTF("omelasticsearch: %d items in reply\n", numitems);
@@ -769,8 +846,7 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
char *request =0;
char *response =0;
- if(ctx->statusCheckOnly)
- {
+ if(ctx->statusCheckOnly || (NULL == lastReqRead)) {
if(itemStatus) {
DBGPRINTF("omelasticsearch: error in elasticsearch reply: item %d, status is %d\n", i, fjson_object_get_int(ok));
DBGPRINTF("omelasticsearch: status check found error.\n");
@@ -795,7 +871,8 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
}
/*call the context*/
- rsRetVal ret = ctx->prepareErrorFileContent(ctx, itemStatus, request,response);
+ rsRetVal ret = ctx->prepareErrorFileContent(ctx, itemStatus, request,
+ response, item, result, ok);
/*free memory in any case*/
free(request);
@@ -818,11 +895,14 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
* Dumps only failed requests of bulk insert
*/
static rsRetVal
-getDataErrorOnly(context *ctx,int itemStatus,char *request,char *response)
+getDataErrorOnly(context *ctx,int itemStatus,char *request,char *response,
+ fjson_object *response_item, fjson_object *response_body, fjson_object *status)
{
DEFiRet;
- if(itemStatus)
- {
+ (void)response_item; /* unused */
+ (void)response_body; /* unused */
+ (void)status; /* unused */
+ if(itemStatus) {
fjson_object *onlyErrorResponses =NULL;
fjson_object *onlyErrorRequests=NULL;
@@ -855,9 +935,16 @@ static rsRetVal
getDataInterleaved(context *ctx,
int __attribute__((unused)) itemStatus,
char *request,
- char *response)
+ char *response,
+ fjson_object *response_item,
+ fjson_object *response_body,
+ fjson_object *status
+)
{
DEFiRet;
+ (void)response_item; /* unused */
+ (void)response_body; /* unused */
+ (void)status; /* unused */
fjson_object *interleaved =NULL;
if(!fjson_object_object_get_ex(ctx->errRoot, "response", &interleaved))
{
@@ -889,11 +976,13 @@ getDataInterleaved(context *ctx,
*/
static rsRetVal
-getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *response)
+getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *response,
+ fjson_object *response_item, fjson_object *response_body, fjson_object *status)
{
DEFiRet;
if (itemStatus) {
- if(getDataInterleaved(ctx, itemStatus,request,response)!= RS_RET_OK) {
+ if(getDataInterleaved(ctx, itemStatus,request,response,
+ response_item, response_body, status)!= RS_RET_OK) {
ABORT_FINALIZE(RS_RET_ERR);
}
}
@@ -902,6 +991,141 @@ getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *resp
RETiRet;
}
+/* request string looks like this:
+ * "{\"create\":{\"_index\": \"rsyslog_testbench\",\"_type\":\"test-type\",
+ * \"_id\":\"FAEAFC0D17C847DA8BD6F47BC5B3800A\"}}\n
+ * {\"msgnum\":\"x00000000\",\"viaq_msg_id\":\"FAEAFC0D17C847DA8BD6F47BC5B3800A\"}\n"
+ * we don't want the meta header, only the data part
+ * start = first \n + 1
+ * end = last \n
+ */
+static rsRetVal
+createMsgFromRequest(const char *request, context *ctx, smsg_t **msg)
+{
+ DEFiRet;
+ fjson_object *jo_msg = NULL;
+ const char *datastart, *dataend;
+ size_t datalen;
+ enum json_tokener_error json_error;
+
+ *msg = NULL;
+ if (!(datastart = strchr(request, '\n')) || (datastart[1] != '{')) {
+ LogError(0, RS_RET_ERR,
+ "omelasticsearch: malformed original request - "
+ "could not find start of original data [%s]",
+ request);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ datastart++; /* advance to { */
+ if (!(dataend = strchr(datastart, '\n')) || (dataend[1] != '\0')) {
+ LogError(0, RS_RET_ERR,
+ "omelasticsearch: malformed original request - "
+ "could not find end of original data [%s]",
+ request);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ datalen = dataend - datastart;
+ json_tokener_reset(ctx->jTokener);
+ fjson_object *jo_request = json_tokener_parse_ex(ctx->jTokener, datastart, datalen);
+ json_error = fjson_tokener_get_error(ctx->jTokener);
+ if (!jo_request || (json_error != fjson_tokener_success)) {
+ LogError(0, RS_RET_ERR,
+ "omelasticsearch: parse error [%s] - could not convert original "
+ "request JSON back into JSON object [%s]",
+ fjson_tokener_error_desc(json_error), request);
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+
+ CHKiRet(msgConstruct(msg));
+ MsgSetFlowControlType(*msg, eFLOWCTL_FULL_DELAY);
+ MsgSetInputName(*msg, pInputName);
+ if (fjson_object_object_get_ex(jo_request, "message", &jo_msg)) {
+ const char *rawmsg = json_object_get_string(jo_msg);
+ const size_t msgLen = (size_t)json_object_get_string_len(jo_msg);
+ MsgSetRawMsg(*msg, rawmsg, msgLen);
+ } else {
+ MsgSetRawMsg(*msg, request, strlen(request));
+ }
+ MsgSetMSGoffs(*msg, 0); /* we do not have a header... */
+ CHKiRet(msgAddJSON(*msg, (uchar*)"!", jo_request, 0, 0));
+
+ finalize_it:
+ RETiRet;
+
+}
+
+
+static rsRetVal
+getDataRetryFailures(context *ctx,int itemStatus,char *request,char *response,
+ fjson_object *response_item, fjson_object *response_body, fjson_object *status)
+{
+ DEFiRet;
+ fjson_object *omes = NULL, *jo = NULL;
+ int istatus = fjson_object_get_int(status);
+ int iscreateop = 0;
+ struct json_object_iterator it = json_object_iter_begin(response_item);
+ struct json_object_iterator itEnd = json_object_iter_end(response_item);
+ const char *optype = NULL;
+ smsg_t *msg = NULL;
+
+ (void)response;
+ (void)itemStatus;
+ CHKiRet(createMsgFromRequest(request, ctx, &msg));
+ CHKmalloc(msg);
+ /* add status as local variables */
+ omes = json_object_new_object();
+ if (!json_object_iter_equal(&it, &itEnd))
+ optype = json_object_iter_peek_name(&it);
+ if (optype && !strcmp("create", optype))
+ iscreateop = 1;
+ if (optype && !strcmp("index", optype) && (ctx->writeOperation == ES_WRITE_INDEX))
+ iscreateop = 1;
+ if (optype) {
+ jo = json_object_new_string(optype);
+ } else {
+ jo = json_object_new_string("unknown");
+ }
+ json_object_object_add(omes, "writeoperation", jo);
+
+ if (!optype) {
+ STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse);
+ } else if ((istatus == 200) || (istatus == 201)) {
+ STATSCOUNTER_INC(indexSuccess, mutIndexSuccess);
+ } else if ((istatus == 409) && iscreateop) {
+ STATSCOUNTER_INC(indexDuplicate, mutIndexDuplicate);
+ } else if (istatus == 400 || (istatus < 200)) {
+ STATSCOUNTER_INC(indexBadArgument, mutIndexBadArgument);
+ } else {
+ fjson_object *error = NULL, *errtype = NULL;
+ if(fjson_object_object_get_ex(response_body, "error", &error) &&
+ fjson_object_object_get_ex(error, "type", &errtype)) {
+ if (istatus == 429) {
+ STATSCOUNTER_INC(indexBulkRejection, mutIndexBulkRejection);
+ } else {
+ STATSCOUNTER_INC(indexOtherResponse, mutIndexOtherResponse);
+ }
+ } else {
+ STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse);
+ }
+ }
+ /* add response_body fields to local var omes */
+ it = json_object_iter_begin(response_body);
+ itEnd = json_object_iter_end(response_body);
+ while (!json_object_iter_equal(&it, &itEnd)) {
+ json_object_object_add(omes, json_object_iter_peek_name(&it),
+ json_object_get(json_object_iter_peek_value(&it)));
+ json_object_iter_next(&it);
+ }
+ CHKiRet(msgAddJSON(msg, (uchar*)".omes", omes, 0, 0));
+ omes = NULL;
+ MsgSetRuleset(msg, ctx->retryRuleset);
+ CHKiRet(ratelimitAddMsg(ctx->ratelimiter, NULL, msg));
+finalize_it:
+ if (omes)
+ json_object_put(omes);
+ RETiRet;
+}
+
/*
* get erroronly context
*/
@@ -979,6 +1203,23 @@ initializeErrorInterleavedConext(wrkrInstanceData_t *pWrkrData,context *ctx){
RETiRet;
}
+/*get retry failures context*/
+static rsRetVal
+initializeRetryFailuresContext(wrkrInstanceData_t *pWrkrData,context *ctx){
+ DEFiRet;
+ ctx->statusCheckOnly=0;
+ fjson_object *errRoot=NULL;
+ if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
+
+
+ fjson_object_object_add(errRoot, "url", fjson_object_new_string((char*)pWrkrData->restURL));
+ ctx->errRoot = errRoot;
+ ctx->prepareErrorFileContent= &getDataRetryFailures;
+ CHKmalloc(ctx->jTokener = json_tokener_new());
+ finalize_it:
+ RETiRet;
+}
+
/* write data error request/replies to separate error file
* Note: we open the file but never close it before exit. If it
@@ -994,6 +1235,10 @@ writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, fjson_object
char errStr[1024];
context ctx;
ctx.errRoot=0;
+ ctx.writeOperation = pWrkrData->pData->writeOperation;
+ ctx.ratelimiter = pWrkrData->pData->ratelimiter;
+ ctx.retryRuleset = pWrkrData->pData->retryRuleset;
+ ctx.jTokener = NULL;
DEFiRet;
if(pData->errorFile == NULL) {
@@ -1039,9 +1284,12 @@ writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, fjson_object
DBGPRINTF("omelasticsearch: error initializing error interleaved context.\n");
ABORT_FINALIZE(RS_RET_ERR);
}
- }
- else
- {
+ } else if(pData->retryFailures) {
+ if(initializeRetryFailuresContext(pWrkrData, &ctx) != RS_RET_OK) {
+ DBGPRINTF("omelasticsearch: error initializing retry failures context.\n");
+ ABORT_FINALIZE(RS_RET_ERR);
+ }
+ } else {
DBGPRINTF("omelasticsearch: None of the modes match file write. No data to write.\n");
ABORT_FINALIZE(RS_RET_ERR);
}
@@ -1082,25 +1330,38 @@ finalize_it:
if(bMutLocked)
pthread_mutex_unlock(&pData->mutErrFile);
fjson_object_put(ctx.errRoot);
+ if (ctx.jTokener)
+ json_tokener_free(ctx.jTokener);
+ free(rendered);
RETiRet;
}
static rsRetVal
-checkResultBulkmode(wrkrInstanceData_t *pWrkrData, fjson_object *root)
+checkResultBulkmode(wrkrInstanceData_t *pWrkrData, fjson_object *root, uchar *reqmsg)
{
DEFiRet;
context ctx;
- ctx.statusCheckOnly=1;
ctx.errRoot = 0;
- if(parseRequestAndResponseForContext(pWrkrData,&root,0,&ctx)!= RS_RET_OK)
- {
+ ctx.writeOperation = pWrkrData->pData->writeOperation;
+ ctx.ratelimiter = pWrkrData->pData->ratelimiter;
+ ctx.retryRuleset = pWrkrData->pData->retryRuleset;
+ ctx.statusCheckOnly=1;
+ ctx.jTokener = NULL;
+ if (pWrkrData->pData->retryFailures) {
+ ctx.statusCheckOnly=0;
+ CHKiRet(initializeRetryFailuresContext(pWrkrData, &ctx));
+ }
+ if(parseRequestAndResponseForContext(pWrkrData,&root,reqmsg,&ctx)!= RS_RET_OK) {
DBGPRINTF("omelasticsearch: error found in elasticsearch reply\n");
ABORT_FINALIZE(RS_RET_DATAFAIL);
}
- finalize_it:
- RETiRet;
+finalize_it:
+ fjson_object_put(ctx.errRoot);
+ if (ctx.jTokener)
+ json_tokener_free(ctx.jTokener);
+ RETiRet;
}
@@ -1118,7 +1378,7 @@ checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg)
}
if(pWrkrData->pData->bulkmode) {
- iRet = checkResultBulkmode(pWrkrData, root);
+ iRet = checkResultBulkmode(pWrkrData, root, reqmsg);
} else {
if(fjson_object_object_get_ex(root, "status", &status)) {
iRet = RS_RET_DATAFAIL;
@@ -1397,6 +1657,13 @@ setInstParamDefaults(instanceData *pData)
pData->caCertFile = NULL;
pData->myCertFile = NULL;
pData->myPrivKeyFile = NULL;
+ pData->writeOperation = ES_WRITE_INDEX;
+ pData->retryFailures = 0;
+ pData->ratelimitBurst = 20000;
+ pData->ratelimitInterval = 600;
+ pData->ratelimiter = NULL;
+ pData->retryRulesetName = NULL;
+ pData->retryRuleset = NULL;
}
BEGINnewActInst
@@ -1495,6 +1762,27 @@ CODESTARTnewActInst
} else {
fclose(fp);
}
+ } else if(!strcmp(actpblk.descr[i].name, "writeoperation")) {
+ char *writeop = es_str2cstr(pvals[i].val.d.estr, NULL);
+ if (writeop && !strcmp(writeop, "create")) {
+ pData->writeOperation = ES_WRITE_CREATE;
+ } else if (writeop && !strcmp(writeop, "index")) {
+ pData->writeOperation = ES_WRITE_INDEX;
+ } else if (writeop) {
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR,
+ "omelasticsearch: invalid value '%s' for writeoperation: "
+ "must be one of 'index' or 'create' - using default value 'index'", writeop);
+ pData->writeOperation = ES_WRITE_INDEX;
+ }
+ free(writeop);
+ } else if(!strcmp(actpblk.descr[i].name, "retryfailures")) {
+ pData->retryFailures = pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "ratelimit.burst")) {
+ pData->ratelimitBurst = (int) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "ratelimit.interval")) {
+ pData->ratelimitInterval = (int) pvals[i].val.d.n;
+ } else if(!strcmp(actpblk.descr[i].name, "retryruleset")) {
+ pData->retryRulesetName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
} else {
dbgprintf("omelasticsearch: program error, non-handled "
"param '%s'\n", actpblk.descr[i].name);
@@ -1661,6 +1949,27 @@ CODESTARTnewActInst
pData->searchIndex = (uchar*) strdup("system");
if(pData->searchType == NULL)
pData->searchType = (uchar*) strdup("events");
+
+ if ((pData->writeOperation != ES_WRITE_INDEX) && (pData->bulkId == NULL)) {
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR,
+ "omelasticsearch: writeoperation '%d' requires bulkid", pData->writeOperation);
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+ }
+
+ if (pData->retryFailures) {
+ CHKiRet(ratelimitNew(&pData->ratelimiter, "omelasticsearch", NULL));
+ ratelimitSetLinuxLike(pData->ratelimiter, pData->ratelimitInterval, pData->ratelimitBurst);
+ ratelimitSetNoTimeCache(pData->ratelimiter);
+ }
+
+ /* node created, let's add to list of instance configs for the module */
+ if(loadModConf->tail == NULL) {
+ loadModConf->tail = loadModConf->root = pData;
+ } else {
+ loadModConf->tail->next = pData;
+ loadModConf->tail = pData;
+ }
+
CODE_STD_FINALIZERnewActInst
cnfparamvalsDestruct(pvals, &actpblk);
if (serverParam)
@@ -1680,6 +1989,51 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
CODE_STD_FINALIZERparseSelectorAct
ENDparseSelectorAct
+
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+ loadModConf = pModConf;
+ pModConf->pConf = pConf;
+ pModConf->root = pModConf->tail = NULL;
+ENDbeginCnfLoad
+
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ loadModConf = NULL; /* done loading */
+ENDendCnfLoad
+
+
+BEGINcheckCnf
+ instanceConf_t *inst;
+CODESTARTcheckCnf
+ for(inst = pModConf->root ; inst != NULL ; inst = inst->next) {
+ ruleset_t *pRuleset;
+ rsRetVal localRet;
+
+ if (inst->retryRulesetName) {
+ localRet = ruleset.GetRuleset(pModConf->pConf, &pRuleset, inst->retryRulesetName);
+ if(localRet == RS_RET_NOT_FOUND) {
+ errmsg.LogError(0, localRet, "omelasticsearch: retryruleset '%s' not found - "
+ "no retry ruleset will be used", inst->retryRulesetName);
+ } else {
+ inst->retryRuleset = pRuleset;
+ }
+ }
+ }
+ENDcheckCnf
+
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ENDactivateCnf
+
+
+BEGINfreeCnf
+CODESTARTfreeCnf
+ENDfreeCnf
+
+
BEGINdoHUP
CODESTARTdoHUP
if(pData->fdErrFile != -1) {
@@ -1691,10 +2045,14 @@ ENDdoHUP
BEGINmodExit
CODESTARTmodExit
+ if(pInputName != NULL)
+ prop.Destruct(&pInputName);
curl_global_cleanup();
statsobj.Destruct(&indexStats);
objRelease(errmsg, CORE_COMPONENT);
- objRelease(statsobj, CORE_COMPONENT);
+ objRelease(statsobj, CORE_COMPONENT);
+ objRelease(prop, CORE_COMPONENT);
+ objRelease(ruleset, CORE_COMPONENT);
ENDmodExit
BEGINqueryEtryPt
@@ -1705,6 +2063,7 @@ CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
CODEqueryEtryPt_doHUP
CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
+CODEqueryEtryPt_STD_CONF2_QUERIES
ENDqueryEtryPt
@@ -1714,6 +2073,8 @@ CODESTARTmodInit
CODEmodInit_QueryRegCFSLineHdlr
CHKiRet(objUse(errmsg, CORE_COMPONENT));
CHKiRet(objUse(statsobj, CORE_COMPONENT));
+ CHKiRet(objUse(prop, CORE_COMPONENT));
+ CHKiRet(objUse(ruleset, CORE_COMPONENT));
if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled");
@@ -1739,7 +2100,28 @@ CODEmodInit_QueryRegCFSLineHdlr
STATSCOUNTER_INIT(indexESFail, mutIndexESFail);
CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.es",
ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexESFail));
+ STATSCOUNTER_INIT(indexSuccess, mutIndexSuccess);
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.success",
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexSuccess));
+ STATSCOUNTER_INIT(indexBadResponse, mutIndexBadResponse);
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.bad",
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBadResponse));
+ STATSCOUNTER_INIT(indexDuplicate, mutIndexDuplicate);
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.duplicate",
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexDuplicate));
+ STATSCOUNTER_INIT(indexBadArgument, mutIndexBadArgument);
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.badargument",
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBadArgument));
+ STATSCOUNTER_INIT(indexBulkRejection, mutIndexBulkRejection);
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.bulkrejection",
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBulkRejection));
+ STATSCOUNTER_INIT(indexOtherResponse, mutIndexOtherResponse);
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.other",
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexOtherResponse));
CHKiRet(statsobj.ConstructFinalize(indexStats));
+ CHKiRet(prop.Construct(&pInputName));
+ CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("omelasticsearch"), sizeof("omelasticsearch") - 1));
+ CHKiRet(prop.ConstructFinalize(pInputName));
ENDmodInit
/* vi:set ai: