Blob Blame History Raw
From 989be897340eb458b00efedfd5e082bb362db79a Mon Sep 17 00:00:00 2001
From: Rich Megginson <rmeggins@redhat.com>
Date: Tue, 15 May 2018 16:03:25 -0600
Subject: [PATCH 11/11] omelasticsearch: write op types; bulk rejection retries

Add support for a 'create' write operation type in addition to
the default 'index'.  Using create allows specifying a unique id
for each record, and allows duplicate document detection.

Add support for checking each record returned in a bulk index
request response.  Allow specifying a ruleset to send each failed
record to.  Add a local variable `omes` which contains the
information in the error response, so that users can control how
to handle responses e.g. retry, or send to an error file.

Add support for response stats - count successes, duplicates, and
different types of failures.

Add testing for bulk index rejections.

(cherry picked from commit 57dd368a2a915d79c94a8dc0de30c93a0bbdc8fe)
(cherry picked from commit 30a15621e1e7e393b2153e9fe5c13f724dea25b5)
---
 plugins/omelasticsearch/omelasticsearch.c | 441 ++++++++++++++++++++++++++++--
 1 file changed, 415 insertions(+), 26 deletions(-)

diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
index ed2b47535..ca61ae28f 100644
--- a/plugins/omelasticsearch/omelasticsearch.c
+++ b/plugins/omelasticsearch/omelasticsearch.c
@@ -51,6 +51,8 @@
 #include "statsobj.h"
 #include "cfsysline.h"
 #include "unicode-helper.h"
+#include "ratelimit.h"
+#include "ruleset.h"
 
 #ifndef O_LARGEFILE
 #  define O_LARGEFILE 0
@@ -64,6 +66,8 @@ MODULE_CNFNAME("omelasticsearch")
 DEF_OMOD_STATIC_DATA
 DEFobjCurrIf(errmsg)
 DEFobjCurrIf(statsobj)
+DEFobjCurrIf(prop)
+DEFobjCurrIf(ruleset)
 
 statsobj_t *indexStats;
 STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit)
@@ -71,19 +75,35 @@ STATSCOUNTER_DEF(indexHTTPFail, mutIndexHTTPFail)
 STATSCOUNTER_DEF(indexHTTPReqFail, mutIndexHTTPReqFail)
 STATSCOUNTER_DEF(checkConnFail, mutCheckConnFail)
 STATSCOUNTER_DEF(indexESFail, mutIndexESFail)
+STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess)
+STATSCOUNTER_DEF(indexBadResponse, mutIndexBadResponse)
+STATSCOUNTER_DEF(indexDuplicate, mutIndexDuplicate)
+STATSCOUNTER_DEF(indexBadArgument, mutIndexBadArgument)
+STATSCOUNTER_DEF(indexBulkRejection, mutIndexBulkRejection)
+STATSCOUNTER_DEF(indexOtherResponse, mutIndexOtherResponse)
 
+static prop_t *pInputName = NULL;
 
 #	define META_STRT "{\"index\":{\"_index\": \""
+#	define META_STRT_CREATE "{\"create\":{\"_index\": \""
 #	define META_TYPE "\",\"_type\":\""
 #	define META_PARENT "\",\"_parent\":\""
 #	define META_ID "\", \"_id\":\""
 #	define META_END  "\"}}\n"
 
+typedef enum {
+	ES_WRITE_INDEX,
+	ES_WRITE_CREATE,
+	ES_WRITE_UPDATE, /* not supported */
+	ES_WRITE_UPSERT /* not supported */
+} es_write_ops_t;
+
 /* REST API for elasticsearch hits this URL:
  * http://<hostName>:<restPort>/<searchIndex>/<searchType>
  */
+/* bulk API uses /_bulk */
 typedef struct curl_slist HEADER;
-typedef struct _instanceData {
+typedef struct instanceConf_s {
 	int defaultPort;
 	int fdErrFile;		/* error file fd or -1 if not open */
 	pthread_mutex_t mutErrFile;
@@ -113,8 +133,25 @@ typedef struct _instanceData {
 	uchar *caCertFile;
 	uchar *myCertFile;
 	uchar *myPrivKeyFile;
+	es_write_ops_t writeOperation;
+	sbool retryFailures;
+	int ratelimitInterval;
+	int ratelimitBurst;
+	/* for retries */
+	ratelimit_t *ratelimiter;
+	uchar *retryRulesetName;
+	ruleset_t *retryRuleset;
+	struct instanceConf_s *next;
 } instanceData;
 
+typedef instanceConf_t instanceData;
+
+struct modConfData_s {
+	rsconf_t *pConf;		/* our overall config object */
+	instanceConf_t *root, *tail;
+};
+static modConfData_t *loadModConf = NULL;	/* modConf ptr to use for the current load process */
+
 typedef struct wrkrInstanceData {
 	instanceData *pData;
 	int serverIndex;
@@ -160,7 +197,12 @@ static struct cnfparamdescr actpdescr[] = {
 	{ "allowunsignedcerts", eCmdHdlrBinary, 0 },
 	{ "tls.cacert", eCmdHdlrString, 0 },
 	{ "tls.mycert", eCmdHdlrString, 0 },
-	{ "tls.myprivkey", eCmdHdlrString, 0 }
+	{ "tls.myprivkey", eCmdHdlrString, 0 },
+	{ "writeoperation", eCmdHdlrGetWord, 0 },
+	{ "retryfailures", eCmdHdlrBinary, 0 },
+	{ "ratelimit.interval", eCmdHdlrInt, 0 },
+	{ "ratelimit.burst", eCmdHdlrInt, 0 },
+	{ "retryruleset", eCmdHdlrString, 0 }
 };
 static struct cnfparamblk actpblk =
 	{ CNFPARAMBLK_VERSION,
@@ -177,6 +219,9 @@ CODESTARTcreateInstance
 	pData->caCertFile = NULL;
 	pData->myCertFile = NULL;
 	pData->myPrivKeyFile = NULL;
+	pData->ratelimiter = NULL;
+	pData->retryRulesetName = NULL;
+	pData->retryRuleset = NULL;
 ENDcreateInstance
 
 BEGINcreateWrkrInstance
@@ -228,6 +273,9 @@ CODESTARTfreeInstance
 	free(pData->caCertFile);
 	free(pData->myCertFile);
 	free(pData->myPrivKeyFile);
+	free(pData->retryRulesetName);
+	if (pData->ratelimiter != NULL)
+		ratelimitDestruct(pData->ratelimiter);
 ENDfreeInstance
 
 BEGINfreeWrkrInstance
@@ -285,6 +333,10 @@ CODESTARTdbgPrintInstInfo
 	dbgprintf("\ttls.cacert='%s'\n", pData->caCertFile);
 	dbgprintf("\ttls.mycert='%s'\n", pData->myCertFile);
 	dbgprintf("\ttls.myprivkey='%s'\n", pData->myPrivKeyFile);
+	dbgprintf("\twriteoperation='%d'\n", pData->writeOperation);
+	dbgprintf("\tretryfailures='%d'\n", pData->retryFailures);
+	dbgprintf("\tratelimit.interval='%d'\n", pData->ratelimitInterval);
+	dbgprintf("\tratelimit.burst='%d'\n", pData->ratelimitBurst);
 ENDdbgPrintInstInfo
 
 
@@ -557,7 +609,11 @@ finalize_it:
 static size_t
 computeMessageSize(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
 {
-	size_t r = sizeof(META_STRT)-1 + sizeof(META_TYPE)-1 + sizeof(META_END)-1 + sizeof("\n")-1;
+	size_t r = sizeof(META_TYPE)-1 + sizeof(META_END)-1 + sizeof("\n")-1;
+	if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
+		r += sizeof(META_STRT_CREATE)-1;
+	else
+		r += sizeof(META_STRT)-1;
 
 	uchar *searchIndex = 0;
 	uchar *searchType;
@@ -594,7 +650,10 @@ buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
 	DEFiRet;
 
 	getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId);
-	r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1);
+	if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
+		r = es_addBuf(&pWrkrData->batch.data, META_STRT_CREATE, sizeof(META_STRT_CREATE)-1);
+	else
+		r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1);
 	if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex,
 				 ustrlen(searchIndex));
 	if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
@@ -709,13 +768,20 @@ static int checkReplyStatus(fjson_object* ok) {
 
 /*
  * Context object for error file content creation or status check
+ * response_item - the full {"create":{"_index":"idxname",.....}}
+ * response_body - the inner hash of the response_item - {"_index":"idxname",...}
+ * status - the "status" field from the inner hash - "status":500
+ *          should be able to use fjson_object_get_int(status) to get the http result code
  */
 typedef struct exeContext{
 	int statusCheckOnly;
 	fjson_object *errRoot;
-	rsRetVal (*prepareErrorFileContent)(struct exeContext *ctx,int itemStatus,char *request,char *response);
-
-
+	rsRetVal (*prepareErrorFileContent)(struct exeContext *ctx,int itemStatus,char *request,char *response,
+			fjson_object *response_item, fjson_object *response_body, fjson_object *status);
+	es_write_ops_t writeOperation;
+	ratelimit_t *ratelimiter;
+	ruleset_t *retryRuleset;
+	struct json_tokener *jTokener;
 } context;
 
 /*
@@ -728,8 +794,15 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
 	fjson_object *replyRoot = *pReplyRoot;
 	int i;
 	int numitems;
-	fjson_object *items=NULL;
+	fjson_object *items=NULL, *jo_errors = NULL;
+	int errors = 0;
 
+	if(fjson_object_object_get_ex(replyRoot, "errors", &jo_errors)) {
+		errors = fjson_object_get_boolean(jo_errors);
+		if (!errors && pWrkrData->pData->retryFailures) {
+			return RS_RET_OK;
+		}
+	}
 
 	/*iterate over items*/
 	if(!fjson_object_object_get_ex(replyRoot, "items", &items)) {
@@ -741,7 +814,11 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
 
 	numitems = fjson_object_array_length(items);
 
-	DBGPRINTF("omelasticsearch: Entire request %s\n",reqmsg);
+	if (reqmsg) {
+		DBGPRINTF("omelasticsearch: Entire request %s\n", reqmsg);
+	} else {
+		DBGPRINTF("omelasticsearch: Empty request\n");
+	}
 	const char *lastReqRead= (char*)reqmsg;
 
 	DBGPRINTF("omelasticsearch: %d items in reply\n", numitems);
@@ -769,8 +846,7 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
 		
 		char *request =0;
 		char *response =0;
-		if(ctx->statusCheckOnly)
-		{
+		if(ctx->statusCheckOnly || (NULL == lastReqRead)) {
 			if(itemStatus) {
 				DBGPRINTF("omelasticsearch: error in elasticsearch reply: item %d, status is %d\n", i, fjson_object_get_int(ok));
 				DBGPRINTF("omelasticsearch: status check found error.\n");
@@ -795,7 +871,8 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
 			}
 
 			/*call the context*/
-			rsRetVal ret = ctx->prepareErrorFileContent(ctx, itemStatus, request,response);
+			rsRetVal ret = ctx->prepareErrorFileContent(ctx, itemStatus, request,
+					response, item, result, ok);
 
 			/*free memory in any case*/
 			free(request);
@@ -818,11 +895,14 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
  * Dumps only failed requests of bulk insert
  */
 static rsRetVal
-getDataErrorOnly(context *ctx,int itemStatus,char *request,char *response)
+getDataErrorOnly(context *ctx,int itemStatus,char *request,char *response,
+		fjson_object *response_item, fjson_object *response_body, fjson_object *status)
 {
 	DEFiRet;
-	if(itemStatus)
-	{
+	(void)response_item; /* unused */
+	(void)response_body; /* unused */
+	(void)status; /* unused */
+	if(itemStatus) {
 		fjson_object *onlyErrorResponses =NULL;
 		fjson_object *onlyErrorRequests=NULL;
 
@@ -855,9 +935,16 @@ static rsRetVal
 getDataInterleaved(context *ctx,
 	int __attribute__((unused)) itemStatus,
 	char *request,
-	char *response)
+	char *response,
+	fjson_object *response_item,
+	fjson_object *response_body,
+	fjson_object *status
+)
 {
 	DEFiRet;
+	(void)response_item; /* unused */
+	(void)response_body; /* unused */
+	(void)status; /* unused */
 	fjson_object *interleaved =NULL;
 	if(!fjson_object_object_get_ex(ctx->errRoot, "response", &interleaved))
 	{
@@ -889,11 +976,13 @@ getDataInterleaved(context *ctx,
  */
 
 static rsRetVal
-getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *response)
+getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *response,
+		fjson_object *response_item, fjson_object *response_body, fjson_object *status)
 {
 	DEFiRet;
 	if (itemStatus) {
-		if(getDataInterleaved(ctx, itemStatus,request,response)!= RS_RET_OK) {
+		if(getDataInterleaved(ctx, itemStatus,request,response,
+				response_item, response_body, status)!= RS_RET_OK) {
 			ABORT_FINALIZE(RS_RET_ERR);
 		}
 	}
@@ -902,6 +991,141 @@ getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *resp
 		RETiRet;
 }
 
+/* request string looks like this:
+ * "{\"create\":{\"_index\": \"rsyslog_testbench\",\"_type\":\"test-type\",
+ *   \"_id\":\"FAEAFC0D17C847DA8BD6F47BC5B3800A\"}}\n
+ * {\"msgnum\":\"x00000000\",\"viaq_msg_id\":\"FAEAFC0D17C847DA8BD6F47BC5B3800A\"}\n"
+ * we don't want the meta header, only the data part
+ * start = first \n + 1
+ * end = last \n
+ */
+static rsRetVal
+createMsgFromRequest(const char *request, context *ctx, smsg_t **msg)
+{
+	DEFiRet;
+	fjson_object *jo_msg = NULL;
+	const char *datastart, *dataend;
+	size_t datalen;
+	enum json_tokener_error json_error;
+
+	*msg = NULL;
+	if (!(datastart = strchr(request, '\n')) || (datastart[1] != '{')) {
+		LogError(0, RS_RET_ERR,
+			"omelasticsearch: malformed original request - "
+			"could not find start of original data [%s]",
+			request);
+		ABORT_FINALIZE(RS_RET_ERR);
+	}
+	datastart++; /* advance to { */
+	if (!(dataend = strchr(datastart, '\n')) || (dataend[1] != '\0')) {
+		LogError(0, RS_RET_ERR,
+			"omelasticsearch: malformed original request - "
+			"could not find end of original data [%s]",
+			request);
+		ABORT_FINALIZE(RS_RET_ERR);
+	}
+	datalen = dataend - datastart;
+	json_tokener_reset(ctx->jTokener);
+	fjson_object *jo_request = json_tokener_parse_ex(ctx->jTokener, datastart, datalen);
+	json_error = fjson_tokener_get_error(ctx->jTokener);
+	if (!jo_request || (json_error != fjson_tokener_success)) {
+		LogError(0, RS_RET_ERR,
+			"omelasticsearch: parse error [%s] - could not convert original "
+			"request JSON back into JSON object [%s]",
+			fjson_tokener_error_desc(json_error), request);
+		ABORT_FINALIZE(RS_RET_ERR);
+	}
+
+	CHKiRet(msgConstruct(msg));
+	MsgSetFlowControlType(*msg, eFLOWCTL_FULL_DELAY);
+	MsgSetInputName(*msg, pInputName);
+	if (fjson_object_object_get_ex(jo_request, "message", &jo_msg)) {
+		const char *rawmsg = json_object_get_string(jo_msg);
+		const size_t msgLen = (size_t)json_object_get_string_len(jo_msg);
+		MsgSetRawMsg(*msg, rawmsg, msgLen);
+	} else {
+		MsgSetRawMsg(*msg, request, strlen(request));
+	}
+	MsgSetMSGoffs(*msg, 0);	/* we do not have a header... */
+	CHKiRet(msgAddJSON(*msg, (uchar*)"!", jo_request, 0, 0));
+
+	finalize_it:
+		RETiRet;
+
+}
+
+
+static rsRetVal
+getDataRetryFailures(context *ctx,int itemStatus,char *request,char *response,
+		fjson_object *response_item, fjson_object *response_body, fjson_object *status)
+{
+	DEFiRet;
+	fjson_object *omes = NULL, *jo = NULL;
+	int istatus = fjson_object_get_int(status);
+	int iscreateop = 0;
+	struct json_object_iterator it = json_object_iter_begin(response_item);
+	struct json_object_iterator itEnd = json_object_iter_end(response_item);
+	const char *optype = NULL;
+	smsg_t *msg = NULL;
+
+	(void)response;
+	(void)itemStatus;
+	CHKiRet(createMsgFromRequest(request, ctx, &msg));
+	CHKmalloc(msg);
+	/* add status as local variables */
+	omes = json_object_new_object();
+	if (!json_object_iter_equal(&it, &itEnd))
+		optype = json_object_iter_peek_name(&it);
+	if (optype && !strcmp("create", optype))
+		iscreateop = 1;
+	if (optype && !strcmp("index", optype) && (ctx->writeOperation == ES_WRITE_INDEX))
+		iscreateop = 1;
+	if (optype) {
+		jo = json_object_new_string(optype);
+	} else {
+		jo = json_object_new_string("unknown");
+	}
+	json_object_object_add(omes, "writeoperation", jo);
+
+	if (!optype) {
+		STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse);
+	} else if ((istatus == 200) || (istatus == 201)) {
+		STATSCOUNTER_INC(indexSuccess, mutIndexSuccess);
+	} else if ((istatus == 409) && iscreateop) {
+		STATSCOUNTER_INC(indexDuplicate, mutIndexDuplicate);
+	} else if (istatus == 400 || (istatus < 200)) {
+		STATSCOUNTER_INC(indexBadArgument, mutIndexBadArgument);
+	} else {
+		fjson_object *error = NULL, *errtype = NULL;
+		if(fjson_object_object_get_ex(response_body, "error", &error) &&
+		   fjson_object_object_get_ex(error, "type", &errtype)) {
+			if (istatus == 429) {
+				STATSCOUNTER_INC(indexBulkRejection, mutIndexBulkRejection);
+			} else {
+				STATSCOUNTER_INC(indexOtherResponse, mutIndexOtherResponse);
+			}
+		} else {
+			STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse);
+		}
+	}
+	/* add response_body fields to local var omes */
+	it = json_object_iter_begin(response_body);
+	itEnd = json_object_iter_end(response_body);
+	while (!json_object_iter_equal(&it, &itEnd)) {
+		json_object_object_add(omes, json_object_iter_peek_name(&it),
+			json_object_get(json_object_iter_peek_value(&it)));
+		json_object_iter_next(&it);
+	}
+	CHKiRet(msgAddJSON(msg, (uchar*)".omes", omes, 0, 0));
+	omes = NULL;
+	MsgSetRuleset(msg, ctx->retryRuleset);
+	CHKiRet(ratelimitAddMsg(ctx->ratelimiter, NULL, msg));
+finalize_it:
+	if (omes)
+		json_object_put(omes);
+	RETiRet;
+}
+
 /*
  * get erroronly context
  */
@@ -979,6 +1203,23 @@ initializeErrorInterleavedConext(wrkrInstanceData_t *pWrkrData,context *ctx){
 		RETiRet;
 }
 
+/*get retry failures context*/
+static rsRetVal
+initializeRetryFailuresContext(wrkrInstanceData_t *pWrkrData,context *ctx){
+	DEFiRet;
+	ctx->statusCheckOnly=0;
+	fjson_object *errRoot=NULL;
+	if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
+
+
+	fjson_object_object_add(errRoot, "url", fjson_object_new_string((char*)pWrkrData->restURL));
+	ctx->errRoot = errRoot;
+	ctx->prepareErrorFileContent= &getDataRetryFailures;
+	CHKmalloc(ctx->jTokener = json_tokener_new());
+	finalize_it:
+		RETiRet;
+}
+
 
 /* write data error request/replies to separate error file
  * Note: we open the file but never close it before exit. If it
@@ -994,6 +1235,10 @@ writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, fjson_object
 	char errStr[1024];
 	context ctx;
 	ctx.errRoot=0;
+	ctx.writeOperation = pWrkrData->pData->writeOperation;
+	ctx.ratelimiter = pWrkrData->pData->ratelimiter;
+	ctx.retryRuleset = pWrkrData->pData->retryRuleset;
+	ctx.jTokener = NULL;
 	DEFiRet;
 
 	if(pData->errorFile == NULL) {
@@ -1039,9 +1284,12 @@ writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, fjson_object
 				DBGPRINTF("omelasticsearch: error initializing error interleaved context.\n");
 				ABORT_FINALIZE(RS_RET_ERR);
 			}
-		}
-		else
-		{
+		} else if(pData->retryFailures) {
+			if(initializeRetryFailuresContext(pWrkrData, &ctx) != RS_RET_OK) {
+				DBGPRINTF("omelasticsearch: error initializing retry failures context.\n");
+				ABORT_FINALIZE(RS_RET_ERR);
+			}
+		} else {
 			DBGPRINTF("omelasticsearch: None of the modes match file write. No data to write.\n");
 			ABORT_FINALIZE(RS_RET_ERR);
 		}
@@ -1082,25 +1330,38 @@ finalize_it:
 	if(bMutLocked)
 		pthread_mutex_unlock(&pData->mutErrFile);
 	fjson_object_put(ctx.errRoot);
+	if (ctx.jTokener)
+		json_tokener_free(ctx.jTokener);
+	free(rendered);
 	RETiRet;
 }
 
 
 static rsRetVal
-checkResultBulkmode(wrkrInstanceData_t *pWrkrData, fjson_object *root)
+checkResultBulkmode(wrkrInstanceData_t *pWrkrData, fjson_object *root, uchar *reqmsg)
 {
 	DEFiRet;
 	context ctx;
-	ctx.statusCheckOnly=1;
 	ctx.errRoot = 0;
-	if(parseRequestAndResponseForContext(pWrkrData,&root,0,&ctx)!= RS_RET_OK)
-	{
+	ctx.writeOperation = pWrkrData->pData->writeOperation;
+	ctx.ratelimiter = pWrkrData->pData->ratelimiter;
+	ctx.retryRuleset = pWrkrData->pData->retryRuleset;
+	ctx.statusCheckOnly=1;
+	ctx.jTokener = NULL;
+	if (pWrkrData->pData->retryFailures) {
+		ctx.statusCheckOnly=0;
+		CHKiRet(initializeRetryFailuresContext(pWrkrData, &ctx));
+	}
+	if(parseRequestAndResponseForContext(pWrkrData,&root,reqmsg,&ctx)!= RS_RET_OK) {
 		DBGPRINTF("omelasticsearch: error found in elasticsearch reply\n");
 		ABORT_FINALIZE(RS_RET_DATAFAIL);
 	}
 
-	finalize_it:
-		RETiRet;
+finalize_it:
+	fjson_object_put(ctx.errRoot);
+	if (ctx.jTokener)
+		json_tokener_free(ctx.jTokener);
+	RETiRet;
 }
 
 
@@ -1118,7 +1378,7 @@ checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg)
 	}
 
 	if(pWrkrData->pData->bulkmode) {
-		iRet = checkResultBulkmode(pWrkrData, root);
+		iRet = checkResultBulkmode(pWrkrData, root, reqmsg);
 	} else {
 		if(fjson_object_object_get_ex(root, "status", &status)) {
 			iRet = RS_RET_DATAFAIL;
@@ -1397,6 +1657,13 @@ setInstParamDefaults(instanceData *pData)
 	pData->caCertFile = NULL;
 	pData->myCertFile = NULL;
 	pData->myPrivKeyFile = NULL;
+	pData->writeOperation = ES_WRITE_INDEX;
+	pData->retryFailures = 0;
+	pData->ratelimitBurst = 20000;
+	pData->ratelimitInterval = 600;
+	pData->ratelimiter = NULL;
+	pData->retryRulesetName = NULL;
+	pData->retryRuleset = NULL;
 }
 
 BEGINnewActInst
@@ -1495,6 +1762,27 @@ CODESTARTnewActInst
 			} else {
 				fclose(fp);
 			}
+		} else if(!strcmp(actpblk.descr[i].name, "writeoperation")) {
+			char *writeop = es_str2cstr(pvals[i].val.d.estr, NULL);
+			if (writeop && !strcmp(writeop, "create")) {
+				pData->writeOperation = ES_WRITE_CREATE;
+			} else if (writeop && !strcmp(writeop, "index")) {
+				pData->writeOperation = ES_WRITE_INDEX;
+			} else if (writeop) {
+				errmsg.LogError(0, RS_RET_CONFIG_ERROR,
+					"omelasticsearch: invalid value '%s' for writeoperation: "
+					"must be one of 'index' or 'create' - using default value 'index'", writeop);
+				pData->writeOperation = ES_WRITE_INDEX;
+			}
+			free(writeop);
+		} else if(!strcmp(actpblk.descr[i].name, "retryfailures")) {
+			pData->retryFailures = pvals[i].val.d.n;
+		} else if(!strcmp(actpblk.descr[i].name, "ratelimit.burst")) {
+			pData->ratelimitBurst = (int) pvals[i].val.d.n;
+		} else if(!strcmp(actpblk.descr[i].name, "ratelimit.interval")) {
+			pData->ratelimitInterval = (int) pvals[i].val.d.n;
+		} else if(!strcmp(actpblk.descr[i].name, "retryruleset")) {
+			pData->retryRulesetName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
 		} else {
 			dbgprintf("omelasticsearch: program error, non-handled "
 			  "param '%s'\n", actpblk.descr[i].name);
@@ -1661,6 +1949,27 @@ CODESTARTnewActInst
 		pData->searchIndex = (uchar*) strdup("system");
 	if(pData->searchType == NULL)
 		pData->searchType = (uchar*) strdup("events");
+
+	if ((pData->writeOperation != ES_WRITE_INDEX) && (pData->bulkId == NULL)) {
+		errmsg.LogError(0, RS_RET_CONFIG_ERROR,
+			"omelasticsearch: writeoperation '%d' requires bulkid", pData->writeOperation);
+		ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+	}
+
+	if (pData->retryFailures) {
+		CHKiRet(ratelimitNew(&pData->ratelimiter, "omelasticsearch", NULL));
+		ratelimitSetLinuxLike(pData->ratelimiter, pData->ratelimitInterval, pData->ratelimitBurst);
+		ratelimitSetNoTimeCache(pData->ratelimiter);
+	}
+
+	/* node created, let's add to list of instance configs for the module */
+	if(loadModConf->tail == NULL) {
+		loadModConf->tail = loadModConf->root = pData;
+	} else {
+		loadModConf->tail->next = pData;
+		loadModConf->tail = pData;
+	}
+
 CODE_STD_FINALIZERnewActInst
 	cnfparamvalsDestruct(pvals, &actpblk);
 	if (serverParam)
@@ -1680,6 +1989,51 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
 CODE_STD_FINALIZERparseSelectorAct
 ENDparseSelectorAct
 
+
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+	loadModConf = pModConf;
+	pModConf->pConf = pConf;
+	pModConf->root = pModConf->tail = NULL;
+ENDbeginCnfLoad
+
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+	loadModConf = NULL; /* done loading */
+ENDendCnfLoad
+
+
+BEGINcheckCnf
+	instanceConf_t *inst;
+CODESTARTcheckCnf
+	for(inst = pModConf->root ; inst != NULL ; inst = inst->next) {
+		ruleset_t *pRuleset;
+		rsRetVal localRet;
+
+		if (inst->retryRulesetName) {
+			localRet = ruleset.GetRuleset(pModConf->pConf, &pRuleset, inst->retryRulesetName);
+			if(localRet == RS_RET_NOT_FOUND) {
+				errmsg.LogError(0, localRet, "omelasticsearch: retryruleset '%s' not found - "
+						"no retry ruleset will be used", inst->retryRulesetName);
+			} else {
+				inst->retryRuleset = pRuleset;
+			}
+		}
+	}
+ENDcheckCnf
+
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+ENDactivateCnf
+
+
+BEGINfreeCnf
+CODESTARTfreeCnf
+ENDfreeCnf
+
+
 BEGINdoHUP
 CODESTARTdoHUP
 	if(pData->fdErrFile != -1) {
@@ -1691,10 +2045,14 @@ ENDdoHUP
 
 BEGINmodExit
 CODESTARTmodExit
+	if(pInputName != NULL)
+		prop.Destruct(&pInputName);
 	curl_global_cleanup();
 	statsobj.Destruct(&indexStats);
 	objRelease(errmsg, CORE_COMPONENT);
-        objRelease(statsobj, CORE_COMPONENT);
+	objRelease(statsobj, CORE_COMPONENT);
+	objRelease(prop, CORE_COMPONENT);
+	objRelease(ruleset, CORE_COMPONENT);
 ENDmodExit
 
 BEGINqueryEtryPt
@@ -1705,6 +2063,7 @@ CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
 CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
 CODEqueryEtryPt_doHUP
 CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
+CODEqueryEtryPt_STD_CONF2_QUERIES
 ENDqueryEtryPt
 
 
@@ -1714,6 +2073,8 @@ CODESTARTmodInit
 CODEmodInit_QueryRegCFSLineHdlr
 	CHKiRet(objUse(errmsg, CORE_COMPONENT));
 	CHKiRet(objUse(statsobj, CORE_COMPONENT));
+	CHKiRet(objUse(prop, CORE_COMPONENT));
+	CHKiRet(objUse(ruleset, CORE_COMPONENT));
 
 	if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
 		errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled");
@@ -1739,7 +2100,28 @@ CODEmodInit_QueryRegCFSLineHdlr
 	STATSCOUNTER_INIT(indexESFail, mutIndexESFail);
 	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.es",
 		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexESFail));
+	STATSCOUNTER_INIT(indexSuccess, mutIndexSuccess);
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.success",
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexSuccess));
+	STATSCOUNTER_INIT(indexBadResponse, mutIndexBadResponse);
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.bad",
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBadResponse));
+	STATSCOUNTER_INIT(indexDuplicate, mutIndexDuplicate);
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.duplicate",
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexDuplicate));
+	STATSCOUNTER_INIT(indexBadArgument, mutIndexBadArgument);
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.badargument",
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBadArgument));
+	STATSCOUNTER_INIT(indexBulkRejection, mutIndexBulkRejection);
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.bulkrejection",
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBulkRejection));
+	STATSCOUNTER_INIT(indexOtherResponse, mutIndexOtherResponse);
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.other",
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexOtherResponse));
 	CHKiRet(statsobj.ConstructFinalize(indexStats));
+	CHKiRet(prop.Construct(&pInputName));
+	CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("omelasticsearch"), sizeof("omelasticsearch") - 1));
+	CHKiRet(prop.ConstructFinalize(pInputName));
 ENDmodInit
 
 /* vi:set ai: