Blame SOURCES/rsyslog-8.24.0-rhbz1565214-omelasticsearch-write-op-types-bulk-rejection-retries.patch

f656cf
From 989be897340eb458b00efedfd5e082bb362db79a Mon Sep 17 00:00:00 2001
f656cf
From: Rich Megginson <rmeggins@redhat.com>
f656cf
Date: Tue, 15 May 2018 16:03:25 -0600
f656cf
Subject: [PATCH 11/11] omelasticsearch: write op types; bulk rejection retries
f656cf
f656cf
Add support for a 'create' write operation type in addition to
f656cf
the default 'index'.  Using create allows specifying a unique id
f656cf
for each record, and allows duplicate document detection.
f656cf
f656cf
Add support for checking each record returned in a bulk index
f656cf
request response.  Allow specifying a ruleset to send each failed
f656cf
record to.  Add a local variable `omes` which contains the
f656cf
information in the error response, so that users can control how
f656cf
to handle responses e.g. retry, or send to an error file.
f656cf
f656cf
Add support for response stats - count successes, duplicates, and
f656cf
different types of failures.
f656cf
f656cf
Add testing for bulk index rejections.
f656cf
f656cf
(cherry picked from commit 57dd368a2a915d79c94a8dc0de30c93a0bbdc8fe)
f656cf
(cherry picked from commit 30a15621e1e7e393b2153e9fe5c13f724dea25b5)
f656cf
---
f656cf
 plugins/omelasticsearch/omelasticsearch.c | 441 ++++++++++++++++++++++++++++--
f656cf
 1 file changed, 415 insertions(+), 26 deletions(-)
f656cf
f656cf
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
f656cf
index ed2b47535..ca61ae28f 100644
f656cf
--- a/plugins/omelasticsearch/omelasticsearch.c
f656cf
+++ b/plugins/omelasticsearch/omelasticsearch.c
f656cf
@@ -51,6 +51,8 @@
f656cf
 #include "statsobj.h"
f656cf
 #include "cfsysline.h"
f656cf
 #include "unicode-helper.h"
f656cf
+#include "ratelimit.h"
f656cf
+#include "ruleset.h"
f656cf
 
f656cf
 #ifndef O_LARGEFILE
f656cf
 #  define O_LARGEFILE 0
f656cf
@@ -64,6 +66,8 @@ MODULE_CNFNAME("omelasticsearch")
f656cf
 DEF_OMOD_STATIC_DATA
f656cf
 DEFobjCurrIf(errmsg)
f656cf
 DEFobjCurrIf(statsobj)
f656cf
+DEFobjCurrIf(prop)
f656cf
+DEFobjCurrIf(ruleset)
f656cf
 
f656cf
 statsobj_t *indexStats;
f656cf
 STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit)
f656cf
@@ -71,19 +75,35 @@ STATSCOUNTER_DEF(indexHTTPFail, mutIndexHTTPFail)
f656cf
 STATSCOUNTER_DEF(indexHTTPReqFail, mutIndexHTTPReqFail)
f656cf
 STATSCOUNTER_DEF(checkConnFail, mutCheckConnFail)
f656cf
 STATSCOUNTER_DEF(indexESFail, mutIndexESFail)
f656cf
+STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess)
f656cf
+STATSCOUNTER_DEF(indexBadResponse, mutIndexBadResponse)
f656cf
+STATSCOUNTER_DEF(indexDuplicate, mutIndexDuplicate)
f656cf
+STATSCOUNTER_DEF(indexBadArgument, mutIndexBadArgument)
f656cf
+STATSCOUNTER_DEF(indexBulkRejection, mutIndexBulkRejection)
f656cf
+STATSCOUNTER_DEF(indexOtherResponse, mutIndexOtherResponse)
f656cf
 
f656cf
+static prop_t *pInputName = NULL;
f656cf
 
f656cf
 #	define META_STRT "{\"index\":{\"_index\": \""
f656cf
+#	define META_STRT_CREATE "{\"create\":{\"_index\": \""
f656cf
 #	define META_TYPE "\",\"_type\":\""
f656cf
 #	define META_PARENT "\",\"_parent\":\""
f656cf
 #	define META_ID "\", \"_id\":\""
f656cf
 #	define META_END  "\"}}\n"
f656cf
 
f656cf
+typedef enum {
f656cf
+	ES_WRITE_INDEX,
f656cf
+	ES_WRITE_CREATE,
f656cf
+	ES_WRITE_UPDATE, /* not supported */
f656cf
+	ES_WRITE_UPSERT /* not supported */
f656cf
+} es_write_ops_t;
f656cf
+
f656cf
 /* REST API for elasticsearch hits this URL:
f656cf
  * http://<hostName>:<restPort>/<searchIndex>/<searchType>
f656cf
  */
f656cf
+/* bulk API uses /_bulk */
f656cf
 typedef struct curl_slist HEADER;
f656cf
-typedef struct _instanceData {
f656cf
+typedef struct instanceConf_s {
f656cf
 	int defaultPort;
f656cf
 	int fdErrFile;		/* error file fd or -1 if not open */
f656cf
 	pthread_mutex_t mutErrFile;
f656cf
@@ -113,8 +133,25 @@ typedef struct _instanceData {
f656cf
 	uchar *caCertFile;
f656cf
 	uchar *myCertFile;
f656cf
 	uchar *myPrivKeyFile;
f656cf
+	es_write_ops_t writeOperation;
f656cf
+	sbool retryFailures;
f656cf
+	int ratelimitInterval;
f656cf
+	int ratelimitBurst;
f656cf
+	/* for retries */
f656cf
+	ratelimit_t *ratelimiter;
f656cf
+	uchar *retryRulesetName;
f656cf
+	ruleset_t *retryRuleset;
f656cf
+	struct instanceConf_s *next;
f656cf
 } instanceData;
f656cf
 
f656cf
+typedef instanceConf_t instanceData;
f656cf
+
f656cf
+struct modConfData_s {
f656cf
+	rsconf_t *pConf;		/* our overall config object */
f656cf
+	instanceConf_t *root, *tail;
f656cf
+};
f656cf
+static modConfData_t *loadModConf = NULL;	/* modConf ptr to use for the current load process */
f656cf
+
f656cf
 typedef struct wrkrInstanceData {
f656cf
 	instanceData *pData;
f656cf
 	int serverIndex;
f656cf
@@ -160,7 +197,12 @@ static struct cnfparamdescr actpdescr[] = {
f656cf
 	{ "allowunsignedcerts", eCmdHdlrBinary, 0 },
f656cf
 	{ "tls.cacert", eCmdHdlrString, 0 },
f656cf
 	{ "tls.mycert", eCmdHdlrString, 0 },
f656cf
-	{ "tls.myprivkey", eCmdHdlrString, 0 }
f656cf
+	{ "tls.myprivkey", eCmdHdlrString, 0 },
f656cf
+	{ "writeoperation", eCmdHdlrGetWord, 0 },
f656cf
+	{ "retryfailures", eCmdHdlrBinary, 0 },
f656cf
+	{ "ratelimit.interval", eCmdHdlrInt, 0 },
f656cf
+	{ "ratelimit.burst", eCmdHdlrInt, 0 },
f656cf
+	{ "retryruleset", eCmdHdlrString, 0 }
f656cf
 };
f656cf
 static struct cnfparamblk actpblk =
f656cf
 	{ CNFPARAMBLK_VERSION,
f656cf
@@ -177,6 +219,9 @@ CODESTARTcreateInstance
f656cf
 	pData->caCertFile = NULL;
f656cf
 	pData->myCertFile = NULL;
f656cf
 	pData->myPrivKeyFile = NULL;
f656cf
+	pData->ratelimiter = NULL;
f656cf
+	pData->retryRulesetName = NULL;
f656cf
+	pData->retryRuleset = NULL;
f656cf
 ENDcreateInstance
f656cf
 
f656cf
 BEGINcreateWrkrInstance
f656cf
@@ -228,6 +273,9 @@ CODESTARTfreeInstance
f656cf
 	free(pData->caCertFile);
f656cf
 	free(pData->myCertFile);
f656cf
 	free(pData->myPrivKeyFile);
f656cf
+	free(pData->retryRulesetName);
f656cf
+	if (pData->ratelimiter != NULL)
f656cf
+		ratelimitDestruct(pData->ratelimiter);
f656cf
 ENDfreeInstance
f656cf
 
f656cf
 BEGINfreeWrkrInstance
f656cf
@@ -285,6 +333,10 @@ CODESTARTdbgPrintInstInfo
f656cf
 	dbgprintf("\ttls.cacert='%s'\n", pData->caCertFile);
f656cf
 	dbgprintf("\ttls.mycert='%s'\n", pData->myCertFile);
f656cf
 	dbgprintf("\ttls.myprivkey='%s'\n", pData->myPrivKeyFile);
f656cf
+	dbgprintf("\twriteoperation='%d'\n", pData->writeOperation);
f656cf
+	dbgprintf("\tretryfailures='%d'\n", pData->retryFailures);
f656cf
+	dbgprintf("\tratelimit.interval='%d'\n", pData->ratelimitInterval);
f656cf
+	dbgprintf("\tratelimit.burst='%d'\n", pData->ratelimitBurst);
f656cf
 ENDdbgPrintInstInfo
f656cf
 
f656cf
 
f656cf
@@ -557,7 +609,11 @@ finalize_it:
f656cf
 static size_t
f656cf
 computeMessageSize(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
f656cf
 {
f656cf
-	size_t r = sizeof(META_STRT)-1 + sizeof(META_TYPE)-1 + sizeof(META_END)-1 + sizeof("\n")-1;
f656cf
+	size_t r = sizeof(META_TYPE)-1 + sizeof(META_END)-1 + sizeof("\n")-1;
f656cf
+	if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
f656cf
+		r += sizeof(META_STRT_CREATE)-1;
f656cf
+	else
f656cf
+		r += sizeof(META_STRT)-1;
f656cf
 
f656cf
 	uchar *searchIndex = 0;
f656cf
 	uchar *searchType;
f656cf
@@ -594,7 +650,10 @@ buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
f656cf
 	DEFiRet;
f656cf
 
f656cf
 	getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId);
f656cf
-	r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1);
f656cf
+	if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
f656cf
+		r = es_addBuf(&pWrkrData->batch.data, META_STRT_CREATE, sizeof(META_STRT_CREATE)-1);
f656cf
+	else
f656cf
+		r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1);
f656cf
 	if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex,
f656cf
 				 ustrlen(searchIndex));
f656cf
 	if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
f656cf
@@ -709,13 +768,20 @@ static int checkReplyStatus(fjson_object* ok) {
f656cf
 
f656cf
 /*
f656cf
  * Context object for error file content creation or status check
f656cf
+ * response_item - the full {"create":{"_index":"idxname",.....}}
f656cf
+ * response_body - the inner hash of the response_item - {"_index":"idxname",...}
f656cf
+ * status - the "status" field from the inner hash - "status":500
f656cf
+ *          use fjson_object_get_int(status) to get the HTTP result code as an int
f656cf
  */
f656cf
 typedef struct exeContext{
f656cf
 	int statusCheckOnly;
f656cf
 	fjson_object *errRoot;
f656cf
-	rsRetVal (*prepareErrorFileContent)(struct exeContext *ctx,int itemStatus,char *request,char *response);
f656cf
-
f656cf
-
f656cf
+	rsRetVal (*prepareErrorFileContent)(struct exeContext *ctx,int itemStatus,char *request,char *response,
f656cf
+			fjson_object *response_item, fjson_object *response_body, fjson_object *status);
f656cf
+	es_write_ops_t writeOperation;
f656cf
+	ratelimit_t *ratelimiter;
f656cf
+	ruleset_t *retryRuleset;
f656cf
+	struct json_tokener *jTokener;
f656cf
 } context;
f656cf
 
f656cf
 /*
f656cf
@@ -728,8 +794,15 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
f656cf
 	fjson_object *replyRoot = *pReplyRoot;
f656cf
 	int i;
f656cf
 	int numitems;
f656cf
-	fjson_object *items=NULL;
f656cf
+	fjson_object *items=NULL, *jo_errors = NULL;
f656cf
+	int errors = 0;
f656cf
 
f656cf
+	if(fjson_object_object_get_ex(replyRoot, "errors", &jo_errors)) {
f656cf
+		errors = fjson_object_get_boolean(jo_errors);
f656cf
+		if (!errors && pWrkrData->pData->retryFailures) {
f656cf
+			return RS_RET_OK;
f656cf
+		}
f656cf
+	}
f656cf
 
f656cf
 	/*iterate over items*/
f656cf
 	if(!fjson_object_object_get_ex(replyRoot, "items", &items)) {
f656cf
@@ -741,7 +814,11 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
f656cf
 
f656cf
 	numitems = fjson_object_array_length(items);
f656cf
 
f656cf
-	DBGPRINTF("omelasticsearch: Entire request %s\n",reqmsg);
f656cf
+	if (reqmsg) {
f656cf
+		DBGPRINTF("omelasticsearch: Entire request %s\n", reqmsg);
f656cf
+	} else {
f656cf
+		DBGPRINTF("omelasticsearch: Empty request\n");
f656cf
+	}
f656cf
 	const char *lastReqRead= (char*)reqmsg;
f656cf
 
f656cf
 	DBGPRINTF("omelasticsearch: %d items in reply\n", numitems);
f656cf
@@ -769,8 +846,7 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
f656cf
 		
f656cf
 		char *request =0;
f656cf
 		char *response =0;
f656cf
-		if(ctx->statusCheckOnly)
f656cf
-		{
f656cf
+		if(ctx->statusCheckOnly || (NULL == lastReqRead)) {
f656cf
 			if(itemStatus) {
f656cf
 				DBGPRINTF("omelasticsearch: error in elasticsearch reply: item %d, status is %d\n", i, fjson_object_get_int(ok));
f656cf
 				DBGPRINTF("omelasticsearch: status check found error.\n");
f656cf
@@ -795,7 +871,8 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
f656cf
 			}
f656cf
 
f656cf
 			/*call the context*/
f656cf
-			rsRetVal ret = ctx->prepareErrorFileContent(ctx, itemStatus, request,response);
f656cf
+			rsRetVal ret = ctx->prepareErrorFileContent(ctx, itemStatus, request,
f656cf
+					response, item, result, ok);
f656cf
 
f656cf
 			/*free memory in any case*/
f656cf
 			free(request);
f656cf
@@ -818,11 +895,14 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
f656cf
  * Dumps only failed requests of bulk insert
f656cf
  */
f656cf
 static rsRetVal
f656cf
-getDataErrorOnly(context *ctx,int itemStatus,char *request,char *response)
f656cf
+getDataErrorOnly(context *ctx,int itemStatus,char *request,char *response,
f656cf
+		fjson_object *response_item, fjson_object *response_body, fjson_object *status)
f656cf
 {
f656cf
 	DEFiRet;
f656cf
-	if(itemStatus)
f656cf
-	{
f656cf
+	(void)response_item; /* unused */
f656cf
+	(void)response_body; /* unused */
f656cf
+	(void)status; /* unused */
f656cf
+	if(itemStatus) {
f656cf
 		fjson_object *onlyErrorResponses =NULL;
f656cf
 		fjson_object *onlyErrorRequests=NULL;
f656cf
 
f656cf
@@ -855,9 +935,16 @@ static rsRetVal
f656cf
 getDataInterleaved(context *ctx,
f656cf
 	int __attribute__((unused)) itemStatus,
f656cf
 	char *request,
f656cf
-	char *response)
f656cf
+	char *response,
f656cf
+	fjson_object *response_item,
f656cf
+	fjson_object *response_body,
f656cf
+	fjson_object *status
f656cf
+)
f656cf
 {
f656cf
 	DEFiRet;
f656cf
+	(void)response_item; /* unused */
f656cf
+	(void)response_body; /* unused */
f656cf
+	(void)status; /* unused */
f656cf
 	fjson_object *interleaved =NULL;
f656cf
 	if(!fjson_object_object_get_ex(ctx->errRoot, "response", &interleaved))
f656cf
 	{
f656cf
@@ -889,11 +976,13 @@ getDataInterleaved(context *ctx,
f656cf
  */
f656cf
 
f656cf
 static rsRetVal
f656cf
-getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *response)
f656cf
+getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *response,
f656cf
+		fjson_object *response_item, fjson_object *response_body, fjson_object *status)
f656cf
 {
f656cf
 	DEFiRet;
f656cf
 	if (itemStatus) {
f656cf
-		if(getDataInterleaved(ctx, itemStatus,request,response)!= RS_RET_OK) {
f656cf
+		if(getDataInterleaved(ctx, itemStatus,request,response,
f656cf
+				response_item, response_body, status)!= RS_RET_OK) {
f656cf
 			ABORT_FINALIZE(RS_RET_ERR);
f656cf
 		}
f656cf
 	}
f656cf
@@ -902,6 +991,141 @@ getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *resp
f656cf
 		RETiRet;
f656cf
 }
f656cf
 
f656cf
+/* request string looks like this:
f656cf
+ * "{\"create\":{\"_index\": \"rsyslog_testbench\",\"_type\":\"test-type\",
f656cf
+ *   \"_id\":\"FAEAFC0D17C847DA8BD6F47BC5B3800A\"}}\n
f656cf
+ * {\"msgnum\":\"x00000000\",\"viaq_msg_id\":\"FAEAFC0D17C847DA8BD6F47BC5B3800A\"}\n"
f656cf
+ * we don't want the meta header, only the data part
f656cf
+ * start = first \n + 1
f656cf
+ * end = last \n
f656cf
+ */
f656cf
+static rsRetVal
f656cf
+createMsgFromRequest(const char *request, context *ctx, smsg_t **msg)
f656cf
+{
f656cf
+	DEFiRet;
f656cf
+	fjson_object *jo_msg = NULL;
f656cf
+	const char *datastart, *dataend;
f656cf
+	size_t datalen;
f656cf
+	enum json_tokener_error json_error;
f656cf
+
f656cf
+	*msg = NULL;
f656cf
+	if (!(datastart = strchr(request, '\n')) || (datastart[1] != '{')) {
f656cf
+		LogError(0, RS_RET_ERR,
f656cf
+			"omelasticsearch: malformed original request - "
f656cf
+			"could not find start of original data [%s]",
f656cf
+			request);
f656cf
+		ABORT_FINALIZE(RS_RET_ERR);
f656cf
+	}
f656cf
+	datastart++; /* advance to { */
f656cf
+	if (!(dataend = strchr(datastart, '\n')) || (dataend[1] != '\0')) {
f656cf
+		LogError(0, RS_RET_ERR,
f656cf
+			"omelasticsearch: malformed original request - "
f656cf
+			"could not find end of original data [%s]",
f656cf
+			request);
f656cf
+		ABORT_FINALIZE(RS_RET_ERR);
f656cf
+	}
f656cf
+	datalen = dataend - datastart;
f656cf
+	json_tokener_reset(ctx->jTokener);
f656cf
+	fjson_object *jo_request = json_tokener_parse_ex(ctx->jTokener, datastart, datalen);
f656cf
+	json_error = fjson_tokener_get_error(ctx->jTokener);
f656cf
+	if (!jo_request || (json_error != fjson_tokener_success)) {
f656cf
+		LogError(0, RS_RET_ERR,
f656cf
+			"omelasticsearch: parse error [%s] - could not convert original "
f656cf
+			"request JSON back into JSON object [%s]",
f656cf
+			fjson_tokener_error_desc(json_error), request);
f656cf
+		ABORT_FINALIZE(RS_RET_ERR);
f656cf
+	}
f656cf
+
f656cf
+	CHKiRet(msgConstruct(msg));
f656cf
+	MsgSetFlowControlType(*msg, eFLOWCTL_FULL_DELAY);
f656cf
+	MsgSetInputName(*msg, pInputName);
f656cf
+	if (fjson_object_object_get_ex(jo_request, "message", &jo_msg)) {
f656cf
+		const char *rawmsg = json_object_get_string(jo_msg);
f656cf
+		const size_t msgLen = (size_t)json_object_get_string_len(jo_msg);
f656cf
+		MsgSetRawMsg(*msg, rawmsg, msgLen);
f656cf
+	} else {
f656cf
+		MsgSetRawMsg(*msg, request, strlen(request));
f656cf
+	}
f656cf
+	MsgSetMSGoffs(*msg, 0);	/* we do not have a header... */
f656cf
+	CHKiRet(msgAddJSON(*msg, (uchar*)"!", jo_request, 0, 0));
f656cf
+
f656cf
+	finalize_it:
f656cf
+		RETiRet;
f656cf
+
f656cf
+}
f656cf
+
f656cf
+
f656cf
+static rsRetVal
f656cf
+getDataRetryFailures(context *ctx,int itemStatus,char *request,char *response,
f656cf
+		fjson_object *response_item, fjson_object *response_body, fjson_object *status)
f656cf
+{
f656cf
+	DEFiRet;
f656cf
+	fjson_object *omes = NULL, *jo = NULL;
f656cf
+	int istatus = fjson_object_get_int(status);
f656cf
+	int iscreateop = 0;
f656cf
+	struct json_object_iterator it = json_object_iter_begin(response_item);
f656cf
+	struct json_object_iterator itEnd = json_object_iter_end(response_item);
f656cf
+	const char *optype = NULL;
f656cf
+	smsg_t *msg = NULL;
f656cf
+
f656cf
+	(void)response;
f656cf
+	(void)itemStatus;
f656cf
+	CHKiRet(createMsgFromRequest(request, ctx, &msg));
f656cf
+	CHKmalloc(msg);
f656cf
+	/* add status as local variables */
f656cf
+	omes = json_object_new_object();
f656cf
+	if (!json_object_iter_equal(&it, &itEnd))
f656cf
+		optype = json_object_iter_peek_name(&it);
f656cf
+	if (optype && !strcmp("create", optype))
f656cf
+		iscreateop = 1;
f656cf
+	if (optype && !strcmp("index", optype) && (ctx->writeOperation == ES_WRITE_INDEX))
f656cf
+		iscreateop = 1;
f656cf
+	if (optype) {
f656cf
+		jo = json_object_new_string(optype);
f656cf
+	} else {
f656cf
+		jo = json_object_new_string("unknown");
f656cf
+	}
f656cf
+	json_object_object_add(omes, "writeoperation", jo);
f656cf
+
f656cf
+	if (!optype) {
f656cf
+		STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse);
f656cf
+	} else if ((istatus == 200) || (istatus == 201)) {
f656cf
+		STATSCOUNTER_INC(indexSuccess, mutIndexSuccess);
f656cf
+	} else if ((istatus == 409) && iscreateop) {
f656cf
+		STATSCOUNTER_INC(indexDuplicate, mutIndexDuplicate);
f656cf
+	} else if (istatus == 400 || (istatus < 200)) {
f656cf
+		STATSCOUNTER_INC(indexBadArgument, mutIndexBadArgument);
f656cf
+	} else {
f656cf
+		fjson_object *error = NULL, *errtype = NULL;
f656cf
+		if(fjson_object_object_get_ex(response_body, "error", &error) &&
f656cf
+		   fjson_object_object_get_ex(error, "type", &errtype)) {
f656cf
+			if (istatus == 429) {
f656cf
+				STATSCOUNTER_INC(indexBulkRejection, mutIndexBulkRejection);
f656cf
+			} else {
f656cf
+				STATSCOUNTER_INC(indexOtherResponse, mutIndexOtherResponse);
f656cf
+			}
f656cf
+		} else {
f656cf
+			STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse);
f656cf
+		}
f656cf
+	}
f656cf
+	/* add response_body fields to local var omes */
f656cf
+	it = json_object_iter_begin(response_body);
f656cf
+	itEnd = json_object_iter_end(response_body);
f656cf
+	while (!json_object_iter_equal(&it, &itEnd)) {
f656cf
+		json_object_object_add(omes, json_object_iter_peek_name(&it),
f656cf
+			json_object_get(json_object_iter_peek_value(&it)));
f656cf
+		json_object_iter_next(&it);
f656cf
+	}
f656cf
+	CHKiRet(msgAddJSON(msg, (uchar*)".omes", omes, 0, 0));
f656cf
+	omes = NULL;
f656cf
+	MsgSetRuleset(msg, ctx->retryRuleset);
f656cf
+	CHKiRet(ratelimitAddMsg(ctx->ratelimiter, NULL, msg));
f656cf
+finalize_it:
f656cf
+	if (omes)
f656cf
+		json_object_put(omes);
f656cf
+	RETiRet;
f656cf
+}
f656cf
+
f656cf
 /*
f656cf
  * get erroronly context
f656cf
  */
f656cf
@@ -979,6 +1203,23 @@ initializeErrorInterleavedConext(wrkrInstanceData_t *pWrkrData,context *ctx){
f656cf
 		RETiRet;
f656cf
 }
f656cf
 
f656cf
+/* set up a retry-failures context; items are handled by getDataRetryFailures */
f656cf
+static rsRetVal
f656cf
+initializeRetryFailuresContext(wrkrInstanceData_t *pWrkrData,context *ctx){
f656cf
+	DEFiRet;
f656cf
+	ctx->statusCheckOnly=0;
f656cf
+	fjson_object *errRoot=NULL;
f656cf
+	if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
f656cf
+
f656cf
+
f656cf
+	fjson_object_object_add(errRoot, "url", fjson_object_new_string((char*)pWrkrData->restURL));
f656cf
+	ctx->errRoot = errRoot;
f656cf
+	ctx->prepareErrorFileContent= &getDataRetryFailures;
f656cf
+	CHKmalloc(ctx->jTokener = json_tokener_new());
f656cf
+	finalize_it:
f656cf
+		RETiRet;
f656cf
+}
f656cf
+
f656cf
 
f656cf
 /* write data error request/replies to separate error file
f656cf
  * Note: we open the file but never close it before exit. If it
f656cf
@@ -994,6 +1235,10 @@ writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, fjson_object
f656cf
 	char errStr[1024];
f656cf
 	context ctx;
f656cf
 	ctx.errRoot=0;
f656cf
+	ctx.writeOperation = pWrkrData->pData->writeOperation;
f656cf
+	ctx.ratelimiter = pWrkrData->pData->ratelimiter;
f656cf
+	ctx.retryRuleset = pWrkrData->pData->retryRuleset;
f656cf
+	ctx.jTokener = NULL;
f656cf
 	DEFiRet;
f656cf
 
f656cf
 	if(pData->errorFile == NULL) {
f656cf
@@ -1039,9 +1284,12 @@ writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, fjson_object
f656cf
 				DBGPRINTF("omelasticsearch: error initializing error interleaved context.\n");
f656cf
 				ABORT_FINALIZE(RS_RET_ERR);
f656cf
 			}
f656cf
-		}
f656cf
-		else
f656cf
-		{
f656cf
+		} else if(pData->retryFailures) {
f656cf
+			if(initializeRetryFailuresContext(pWrkrData, &ctx) != RS_RET_OK) {
f656cf
+				DBGPRINTF("omelasticsearch: error initializing retry failures context.\n");
f656cf
+				ABORT_FINALIZE(RS_RET_ERR);
f656cf
+			}
f656cf
+		} else {
f656cf
 			DBGPRINTF("omelasticsearch: None of the modes match file write. No data to write.\n");
f656cf
 			ABORT_FINALIZE(RS_RET_ERR);
f656cf
 		}
f656cf
@@ -1082,25 +1330,38 @@ finalize_it:
f656cf
 	if(bMutLocked)
f656cf
 		pthread_mutex_unlock(&pData->mutErrFile);
f656cf
 	fjson_object_put(ctx.errRoot);
f656cf
+	if (ctx.jTokener)
f656cf
+		json_tokener_free(ctx.jTokener);
f656cf
+	free(rendered);
f656cf
 	RETiRet;
f656cf
 }
f656cf
 
f656cf
 
f656cf
 static rsRetVal
f656cf
-checkResultBulkmode(wrkrInstanceData_t *pWrkrData, fjson_object *root)
f656cf
+checkResultBulkmode(wrkrInstanceData_t *pWrkrData, fjson_object *root, uchar *reqmsg)
f656cf
 {
f656cf
 	DEFiRet;
f656cf
 	context ctx;
f656cf
-	ctx.statusCheckOnly=1;
f656cf
 	ctx.errRoot = 0;
f656cf
-	if(parseRequestAndResponseForContext(pWrkrData,&root,0,&ctx)!= RS_RET_OK)
f656cf
-	{
f656cf
+	ctx.writeOperation = pWrkrData->pData->writeOperation;
f656cf
+	ctx.ratelimiter = pWrkrData->pData->ratelimiter;
f656cf
+	ctx.retryRuleset = pWrkrData->pData->retryRuleset;
f656cf
+	ctx.statusCheckOnly=1;
f656cf
+	ctx.jTokener = NULL;
f656cf
+	if (pWrkrData->pData->retryFailures) {
f656cf
+		ctx.statusCheckOnly=0;
f656cf
+		CHKiRet(initializeRetryFailuresContext(pWrkrData, &ctx));
f656cf
+	}
f656cf
+	if(parseRequestAndResponseForContext(pWrkrData,&root,reqmsg,&ctx)!= RS_RET_OK) {
f656cf
 		DBGPRINTF("omelasticsearch: error found in elasticsearch reply\n");
f656cf
 		ABORT_FINALIZE(RS_RET_DATAFAIL);
f656cf
 	}
f656cf
 
f656cf
-	finalize_it:
f656cf
-		RETiRet;
f656cf
+finalize_it:
f656cf
+	fjson_object_put(ctx.errRoot);
f656cf
+	if (ctx.jTokener)
f656cf
+		json_tokener_free(ctx.jTokener);
f656cf
+	RETiRet;
f656cf
 }
f656cf
 
f656cf
 
f656cf
@@ -1118,7 +1378,7 @@ checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg)
f656cf
 	}
f656cf
 
f656cf
 	if(pWrkrData->pData->bulkmode) {
f656cf
-		iRet = checkResultBulkmode(pWrkrData, root);
f656cf
+		iRet = checkResultBulkmode(pWrkrData, root, reqmsg);
f656cf
 	} else {
f656cf
 		if(fjson_object_object_get_ex(root, "status", &status)) {
f656cf
 			iRet = RS_RET_DATAFAIL;
f656cf
@@ -1397,6 +1657,13 @@ setInstParamDefaults(instanceData *pData)
f656cf
 	pData->caCertFile = NULL;
f656cf
 	pData->myCertFile = NULL;
f656cf
 	pData->myPrivKeyFile = NULL;
f656cf
+	pData->writeOperation = ES_WRITE_INDEX;
f656cf
+	pData->retryFailures = 0;
f656cf
+	pData->ratelimitBurst = 20000;
f656cf
+	pData->ratelimitInterval = 600;
f656cf
+	pData->ratelimiter = NULL;
f656cf
+	pData->retryRulesetName = NULL;
f656cf
+	pData->retryRuleset = NULL;
f656cf
 }
f656cf
 
f656cf
 BEGINnewActInst
f656cf
@@ -1495,6 +1762,27 @@ CODESTARTnewActInst
f656cf
 			} else {
f656cf
 				fclose(fp);
f656cf
 			}
f656cf
+		} else if(!strcmp(actpblk.descr[i].name, "writeoperation")) {
f656cf
+			char *writeop = es_str2cstr(pvals[i].val.d.estr, NULL);
f656cf
+			if (writeop && !strcmp(writeop, "create")) {
f656cf
+				pData->writeOperation = ES_WRITE_CREATE;
f656cf
+			} else if (writeop && !strcmp(writeop, "index")) {
f656cf
+				pData->writeOperation = ES_WRITE_INDEX;
f656cf
+			} else if (writeop) {
f656cf
+				errmsg.LogError(0, RS_RET_CONFIG_ERROR,
f656cf
+					"omelasticsearch: invalid value '%s' for writeoperation: "
f656cf
+					"must be one of 'index' or 'create' - using default value 'index'", writeop);
f656cf
+				pData->writeOperation = ES_WRITE_INDEX;
f656cf
+			}
f656cf
+			free(writeop);
f656cf
+		} else if(!strcmp(actpblk.descr[i].name, "retryfailures")) {
f656cf
+			pData->retryFailures = pvals[i].val.d.n;
f656cf
+		} else if(!strcmp(actpblk.descr[i].name, "ratelimit.burst")) {
f656cf
+			pData->ratelimitBurst = (int) pvals[i].val.d.n;
f656cf
+		} else if(!strcmp(actpblk.descr[i].name, "ratelimit.interval")) {
f656cf
+			pData->ratelimitInterval = (int) pvals[i].val.d.n;
f656cf
+		} else if(!strcmp(actpblk.descr[i].name, "retryruleset")) {
f656cf
+			pData->retryRulesetName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
f656cf
 		} else {
f656cf
 			dbgprintf("omelasticsearch: program error, non-handled "
f656cf
 			  "param '%s'\n", actpblk.descr[i].name);
f656cf
@@ -1661,6 +1949,27 @@ CODESTARTnewActInst
f656cf
 		pData->searchIndex = (uchar*) strdup("system");
f656cf
 	if(pData->searchType == NULL)
f656cf
 		pData->searchType = (uchar*) strdup("events");
f656cf
+
f656cf
+	if ((pData->writeOperation != ES_WRITE_INDEX) && (pData->bulkId == NULL)) {
f656cf
+		errmsg.LogError(0, RS_RET_CONFIG_ERROR,
f656cf
+			"omelasticsearch: writeoperation '%d' requires bulkid", pData->writeOperation);
f656cf
+		ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
f656cf
+	}
f656cf
+
f656cf
+	if (pData->retryFailures) {
f656cf
+		CHKiRet(ratelimitNew(&pData->ratelimiter, "omelasticsearch", NULL));
f656cf
+		ratelimitSetLinuxLike(pData->ratelimiter, pData->ratelimitInterval, pData->ratelimitBurst);
f656cf
+		ratelimitSetNoTimeCache(pData->ratelimiter);
f656cf
+	}
f656cf
+
f656cf
+	/* node created, let's add to list of instance configs for the module */
f656cf
+	if(loadModConf->tail == NULL) {
f656cf
+		loadModConf->tail = loadModConf->root = pData;
f656cf
+	} else {
f656cf
+		loadModConf->tail->next = pData;
f656cf
+		loadModConf->tail = pData;
f656cf
+	}
f656cf
+
f656cf
 CODE_STD_FINALIZERnewActInst
f656cf
 	cnfparamvalsDestruct(pvals, &actpblk);
f656cf
 	if (serverParam)
f656cf
@@ -1680,6 +1989,51 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
f656cf
 CODE_STD_FINALIZERparseSelectorAct
f656cf
 ENDparseSelectorAct
f656cf
 
f656cf
+
f656cf
+BEGINbeginCnfLoad
f656cf
+CODESTARTbeginCnfLoad
f656cf
+	loadModConf = pModConf;
f656cf
+	pModConf->pConf = pConf;
f656cf
+	pModConf->root = pModConf->tail = NULL;
f656cf
+ENDbeginCnfLoad
f656cf
+
f656cf
+
f656cf
+BEGINendCnfLoad
f656cf
+CODESTARTendCnfLoad
f656cf
+	loadModConf = NULL; /* done loading */
f656cf
+ENDendCnfLoad
f656cf
+
f656cf
+
f656cf
+BEGINcheckCnf
f656cf
+	instanceConf_t *inst;
f656cf
+CODESTARTcheckCnf
f656cf
+	for(inst = pModConf->root ; inst != NULL ; inst = inst->next) {
f656cf
+		ruleset_t *pRuleset;
f656cf
+		rsRetVal localRet;
f656cf
+
f656cf
+		if (inst->retryRulesetName) {
f656cf
+			localRet = ruleset.GetRuleset(pModConf->pConf, &pRuleset, inst->retryRulesetName);
f656cf
+			if(localRet == RS_RET_NOT_FOUND) {
f656cf
+				errmsg.LogError(0, localRet, "omelasticsearch: retryruleset '%s' not found - "
f656cf
+						"no retry ruleset will be used", inst->retryRulesetName);
f656cf
+			} else {
f656cf
+				inst->retryRuleset = pRuleset;
f656cf
+			}
f656cf
+		}
f656cf
+	}
f656cf
+ENDcheckCnf
f656cf
+
f656cf
+
f656cf
+BEGINactivateCnf
f656cf
+CODESTARTactivateCnf
f656cf
+ENDactivateCnf
f656cf
+
f656cf
+
f656cf
+BEGINfreeCnf
f656cf
+CODESTARTfreeCnf
f656cf
+ENDfreeCnf
f656cf
+
f656cf
+
f656cf
 BEGINdoHUP
f656cf
 CODESTARTdoHUP
f656cf
 	if(pData->fdErrFile != -1) {
f656cf
@@ -1691,10 +2045,14 @@ ENDdoHUP
f656cf
 
f656cf
 BEGINmodExit
f656cf
 CODESTARTmodExit
f656cf
+	if(pInputName != NULL)
f656cf
+		prop.Destruct(&pInputName);
f656cf
 	curl_global_cleanup();
f656cf
 	statsobj.Destruct(&indexStats);
f656cf
 	objRelease(errmsg, CORE_COMPONENT);
f656cf
-        objRelease(statsobj, CORE_COMPONENT);
f656cf
+	objRelease(statsobj, CORE_COMPONENT);
f656cf
+	objRelease(prop, CORE_COMPONENT);
f656cf
+	objRelease(ruleset, CORE_COMPONENT);
f656cf
 ENDmodExit
f656cf
 
f656cf
 BEGINqueryEtryPt
f656cf
@@ -1705,6 +2063,7 @@ CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
f656cf
 CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
f656cf
 CODEqueryEtryPt_doHUP
f656cf
 CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
f656cf
+CODEqueryEtryPt_STD_CONF2_QUERIES
f656cf
 ENDqueryEtryPt
f656cf
 
f656cf
 
f656cf
@@ -1714,6 +2073,8 @@ CODESTARTmodInit
f656cf
 CODEmodInit_QueryRegCFSLineHdlr
f656cf
 	CHKiRet(objUse(errmsg, CORE_COMPONENT));
f656cf
 	CHKiRet(objUse(statsobj, CORE_COMPONENT));
f656cf
+	CHKiRet(objUse(prop, CORE_COMPONENT));
f656cf
+	CHKiRet(objUse(ruleset, CORE_COMPONENT));
f656cf
 
f656cf
 	if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
f656cf
 		errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled");
f656cf
@@ -1739,7 +2100,28 @@ CODEmodInit_QueryRegCFSLineHdlr
f656cf
 	STATSCOUNTER_INIT(indexESFail, mutIndexESFail);
f656cf
 	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.es",
f656cf
 		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexESFail));
f656cf
+	STATSCOUNTER_INIT(indexSuccess, mutIndexSuccess);
f656cf
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.success",
f656cf
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexSuccess));
f656cf
+	STATSCOUNTER_INIT(indexBadResponse, mutIndexBadResponse);
f656cf
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.bad",
f656cf
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBadResponse));
f656cf
+	STATSCOUNTER_INIT(indexDuplicate, mutIndexDuplicate);
f656cf
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.duplicate",
f656cf
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexDuplicate));
f656cf
+	STATSCOUNTER_INIT(indexBadArgument, mutIndexBadArgument);
f656cf
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.badargument",
f656cf
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBadArgument));
f656cf
+	STATSCOUNTER_INIT(indexBulkRejection, mutIndexBulkRejection);
f656cf
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.bulkrejection",
f656cf
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBulkRejection));
f656cf
+	STATSCOUNTER_INIT(indexOtherResponse, mutIndexOtherResponse);
f656cf
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.other",
f656cf
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexOtherResponse));
f656cf
 	CHKiRet(statsobj.ConstructFinalize(indexStats));
f656cf
+	CHKiRet(prop.Construct(&pInputName));
f656cf
+	CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("omelasticsearch"), sizeof("omelasticsearch") - 1));
f656cf
+	CHKiRet(prop.ConstructFinalize(pInputName));
f656cf
 ENDmodInit
f656cf
 
f656cf
 /* vi:set ai: