Blame SOURCES/rsyslog-8.24.0-rhbz1565214-omelasticsearch-write-op-types-bulk-rejection-retries.patch

fde8c1
From 989be897340eb458b00efedfd5e082bb362db79a Mon Sep 17 00:00:00 2001
fde8c1
From: Rich Megginson <rmeggins@redhat.com>
fde8c1
Date: Tue, 15 May 2018 16:03:25 -0600
fde8c1
Subject: [PATCH 11/11] omelasticsearch: write op types; bulk rejection retries
fde8c1
fde8c1
Add support for a 'create' write operation type in addition to
fde8c1
the default 'index'.  Using create allows specifying a unique id
fde8c1
for each record, and allows duplicate document detection.
fde8c1
fde8c1
Add support for checking each record returned in a bulk index
fde8c1
request response.  Allow specifying a ruleset to send each failed
fde8c1
record to.  Add a local variable `omes` which contains the
fde8c1
information in the error response, so that users can control how
fde8c1
to handle responses e.g. retry, or send to an error file.
fde8c1
fde8c1
Add support for response stats - count successes, duplicates, and
fde8c1
different types of failures.
fde8c1
fde8c1
Add testing for bulk index rejections.
fde8c1
fde8c1
(cherry picked from commit 57dd368a2a915d79c94a8dc0de30c93a0bbdc8fe)
fde8c1
(cherry picked from commit 30a15621e1e7e393b2153e9fe5c13f724dea25b5)
fde8c1
---
fde8c1
 plugins/omelasticsearch/omelasticsearch.c | 441 ++++++++++++++++++++++++++++--
fde8c1
 1 file changed, 415 insertions(+), 26 deletions(-)
fde8c1
fde8c1
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
fde8c1
index ed2b47535..ca61ae28f 100644
fde8c1
--- a/plugins/omelasticsearch/omelasticsearch.c
fde8c1
+++ b/plugins/omelasticsearch/omelasticsearch.c
fde8c1
@@ -51,6 +51,8 @@
fde8c1
 #include "statsobj.h"
fde8c1
 #include "cfsysline.h"
fde8c1
 #include "unicode-helper.h"
fde8c1
+#include "ratelimit.h"
fde8c1
+#include "ruleset.h"
fde8c1
 
fde8c1
 #ifndef O_LARGEFILE
fde8c1
 #  define O_LARGEFILE 0
fde8c1
@@ -64,6 +66,8 @@ MODULE_CNFNAME("omelasticsearch")
fde8c1
 DEF_OMOD_STATIC_DATA
fde8c1
 DEFobjCurrIf(errmsg)
fde8c1
 DEFobjCurrIf(statsobj)
fde8c1
+DEFobjCurrIf(prop)
fde8c1
+DEFobjCurrIf(ruleset)
fde8c1
 
fde8c1
 statsobj_t *indexStats;
fde8c1
 STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit)
fde8c1
@@ -71,19 +75,35 @@ STATSCOUNTER_DEF(indexHTTPFail, mutIndexHTTPFail)
fde8c1
 STATSCOUNTER_DEF(indexHTTPReqFail, mutIndexHTTPReqFail)
fde8c1
 STATSCOUNTER_DEF(checkConnFail, mutCheckConnFail)
fde8c1
 STATSCOUNTER_DEF(indexESFail, mutIndexESFail)
fde8c1
+STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess)
fde8c1
+STATSCOUNTER_DEF(indexBadResponse, mutIndexBadResponse)
fde8c1
+STATSCOUNTER_DEF(indexDuplicate, mutIndexDuplicate)
fde8c1
+STATSCOUNTER_DEF(indexBadArgument, mutIndexBadArgument)
fde8c1
+STATSCOUNTER_DEF(indexBulkRejection, mutIndexBulkRejection)
fde8c1
+STATSCOUNTER_DEF(indexOtherResponse, mutIndexOtherResponse)
fde8c1
 
fde8c1
+static prop_t *pInputName = NULL;
fde8c1
 
fde8c1
 #	define META_STRT "{\"index\":{\"_index\": \""
fde8c1
+#	define META_STRT_CREATE "{\"create\":{\"_index\": \""
fde8c1
 #	define META_TYPE "\",\"_type\":\""
fde8c1
 #	define META_PARENT "\",\"_parent\":\""
fde8c1
 #	define META_ID "\", \"_id\":\""
fde8c1
 #	define META_END  "\"}}\n"
fde8c1
 
fde8c1
+typedef enum {
fde8c1
+	ES_WRITE_INDEX,
fde8c1
+	ES_WRITE_CREATE,
fde8c1
+	ES_WRITE_UPDATE, /* not supported */
fde8c1
+	ES_WRITE_UPSERT /* not supported */
fde8c1
+} es_write_ops_t;
fde8c1
+
fde8c1
 /* REST API for elasticsearch hits this URL:
fde8c1
  * http://<hostName>:<restPort>/<searchIndex>/<searchType>
fde8c1
  */
fde8c1
+/* bulk API uses /_bulk */
fde8c1
 typedef struct curl_slist HEADER;
fde8c1
-typedef struct _instanceData {
fde8c1
+typedef struct instanceConf_s {
fde8c1
 	int defaultPort;
fde8c1
 	int fdErrFile;		/* error file fd or -1 if not open */
fde8c1
 	pthread_mutex_t mutErrFile;
fde8c1
@@ -113,8 +133,25 @@ typedef struct _instanceData {
fde8c1
 	uchar *caCertFile;
fde8c1
 	uchar *myCertFile;
fde8c1
 	uchar *myPrivKeyFile;
fde8c1
+	es_write_ops_t writeOperation;
fde8c1
+	sbool retryFailures;
fde8c1
+	int ratelimitInterval;
fde8c1
+	int ratelimitBurst;
fde8c1
+	/* for retries */
fde8c1
+	ratelimit_t *ratelimiter;
fde8c1
+	uchar *retryRulesetName;
fde8c1
+	ruleset_t *retryRuleset;
fde8c1
+	struct instanceConf_s *next;
fde8c1
 } instanceData;
fde8c1
 
fde8c1
+typedef instanceConf_t instanceData;
fde8c1
+
fde8c1
+struct modConfData_s {
fde8c1
+	rsconf_t *pConf;		/* our overall config object */
fde8c1
+	instanceConf_t *root, *tail;
fde8c1
+};
fde8c1
+static modConfData_t *loadModConf = NULL;	/* modConf ptr to use for the current load process */
fde8c1
+
fde8c1
 typedef struct wrkrInstanceData {
fde8c1
 	instanceData *pData;
fde8c1
 	int serverIndex;
fde8c1
@@ -160,7 +197,12 @@ static struct cnfparamdescr actpdescr[] = {
fde8c1
 	{ "allowunsignedcerts", eCmdHdlrBinary, 0 },
fde8c1
 	{ "tls.cacert", eCmdHdlrString, 0 },
fde8c1
 	{ "tls.mycert", eCmdHdlrString, 0 },
fde8c1
-	{ "tls.myprivkey", eCmdHdlrString, 0 }
fde8c1
+	{ "tls.myprivkey", eCmdHdlrString, 0 },
fde8c1
+	{ "writeoperation", eCmdHdlrGetWord, 0 },
fde8c1
+	{ "retryfailures", eCmdHdlrBinary, 0 },
fde8c1
+	{ "ratelimit.interval", eCmdHdlrInt, 0 },
fde8c1
+	{ "ratelimit.burst", eCmdHdlrInt, 0 },
fde8c1
+	{ "retryruleset", eCmdHdlrString, 0 }
fde8c1
 };
fde8c1
 static struct cnfparamblk actpblk =
fde8c1
 	{ CNFPARAMBLK_VERSION,
fde8c1
@@ -177,6 +219,9 @@ CODESTARTcreateInstance
fde8c1
 	pData->caCertFile = NULL;
fde8c1
 	pData->myCertFile = NULL;
fde8c1
 	pData->myPrivKeyFile = NULL;
fde8c1
+	pData->ratelimiter = NULL;
fde8c1
+	pData->retryRulesetName = NULL;
fde8c1
+	pData->retryRuleset = NULL;
fde8c1
 ENDcreateInstance
fde8c1
 
fde8c1
 BEGINcreateWrkrInstance
fde8c1
@@ -228,6 +273,9 @@ CODESTARTfreeInstance
fde8c1
 	free(pData->caCertFile);
fde8c1
 	free(pData->myCertFile);
fde8c1
 	free(pData->myPrivKeyFile);
fde8c1
+	free(pData->retryRulesetName);
fde8c1
+	if (pData->ratelimiter != NULL)
fde8c1
+		ratelimitDestruct(pData->ratelimiter);
fde8c1
 ENDfreeInstance
fde8c1
 
fde8c1
 BEGINfreeWrkrInstance
fde8c1
@@ -285,6 +333,10 @@ CODESTARTdbgPrintInstInfo
fde8c1
 	dbgprintf("\ttls.cacert='%s'\n", pData->caCertFile);
fde8c1
 	dbgprintf("\ttls.mycert='%s'\n", pData->myCertFile);
fde8c1
 	dbgprintf("\ttls.myprivkey='%s'\n", pData->myPrivKeyFile);
fde8c1
+	dbgprintf("\twriteoperation='%d'\n", pData->writeOperation);
fde8c1
+	dbgprintf("\tretryfailures='%d'\n", pData->retryFailures);
fde8c1
+	dbgprintf("\tratelimit.interval='%d'\n", pData->ratelimitInterval);
fde8c1
+	dbgprintf("\tratelimit.burst='%d'\n", pData->ratelimitBurst);
fde8c1
 ENDdbgPrintInstInfo
fde8c1
 
fde8c1
 
fde8c1
@@ -557,7 +609,11 @@ finalize_it:
fde8c1
 static size_t
fde8c1
 computeMessageSize(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
fde8c1
 {
fde8c1
-	size_t r = sizeof(META_STRT)-1 + sizeof(META_TYPE)-1 + sizeof(META_END)-1 + sizeof("\n")-1;
fde8c1
+	size_t r = sizeof(META_TYPE)-1 + sizeof(META_END)-1 + sizeof("\n")-1;
fde8c1
+	if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
fde8c1
+		r += sizeof(META_STRT_CREATE)-1;
fde8c1
+	else
fde8c1
+		r += sizeof(META_STRT)-1;
fde8c1
 
fde8c1
 	uchar *searchIndex = 0;
fde8c1
 	uchar *searchType;
fde8c1
@@ -594,7 +650,10 @@ buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
fde8c1
 	DEFiRet;
fde8c1
 
fde8c1
 	getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId);
fde8c1
-	r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1);
fde8c1
+	if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
fde8c1
+		r = es_addBuf(&pWrkrData->batch.data, META_STRT_CREATE, sizeof(META_STRT_CREATE)-1);
fde8c1
+	else
fde8c1
+		r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1);
fde8c1
 	if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex,
fde8c1
 				 ustrlen(searchIndex));
fde8c1
 	if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
fde8c1
@@ -709,13 +768,20 @@ static int checkReplyStatus(fjson_object* ok) {
fde8c1
 
fde8c1
 /*
fde8c1
  * Context object for error file content creation or status check
fde8c1
+ * response_item - the full {"create":{"_index":"idxname",.....}}
fde8c1
+ * response_body - the inner hash of the response_item - {"_index":"idxname",...}
fde8c1
+ * status - the "status" field from the inner hash - "status":500
fde8c1
+ *          should be able to use fjson_object_get_int(status) to get the http result code
fde8c1
  */
fde8c1
 typedef struct exeContext{
fde8c1
 	int statusCheckOnly;
fde8c1
 	fjson_object *errRoot;
fde8c1
-	rsRetVal (*prepareErrorFileContent)(struct exeContext *ctx,int itemStatus,char *request,char *response);
fde8c1
-
fde8c1
-
fde8c1
+	rsRetVal (*prepareErrorFileContent)(struct exeContext *ctx,int itemStatus,char *request,char *response,
fde8c1
+			fjson_object *response_item, fjson_object *response_body, fjson_object *status);
fde8c1
+	es_write_ops_t writeOperation;
fde8c1
+	ratelimit_t *ratelimiter;
fde8c1
+	ruleset_t *retryRuleset;
fde8c1
+	struct json_tokener *jTokener;
fde8c1
 } context;
fde8c1
 
fde8c1
 /*
fde8c1
@@ -728,8 +794,15 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
fde8c1
 	fjson_object *replyRoot = *pReplyRoot;
fde8c1
 	int i;
fde8c1
 	int numitems;
fde8c1
-	fjson_object *items=NULL;
fde8c1
+	fjson_object *items=NULL, *jo_errors = NULL;
fde8c1
+	int errors = 0;
fde8c1
 
fde8c1
+	if(fjson_object_object_get_ex(replyRoot, "errors", &jo_errors)) {
fde8c1
+		errors = fjson_object_get_boolean(jo_errors);
fde8c1
+		if (!errors && pWrkrData->pData->retryFailures) {
fde8c1
+			return RS_RET_OK;
fde8c1
+		}
fde8c1
+	}
fde8c1
 
fde8c1
 	/*iterate over items*/
fde8c1
 	if(!fjson_object_object_get_ex(replyRoot, "items", &items)) {
fde8c1
@@ -741,7 +814,11 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
fde8c1
 
fde8c1
 	numitems = fjson_object_array_length(items);
fde8c1
 
fde8c1
-	DBGPRINTF("omelasticsearch: Entire request %s\n",reqmsg);
fde8c1
+	if (reqmsg) {
fde8c1
+		DBGPRINTF("omelasticsearch: Entire request %s\n", reqmsg);
fde8c1
+	} else {
fde8c1
+		DBGPRINTF("omelasticsearch: Empty request\n");
fde8c1
+	}
fde8c1
 	const char *lastReqRead= (char*)reqmsg;
fde8c1
 
fde8c1
 	DBGPRINTF("omelasticsearch: %d items in reply\n", numitems);
fde8c1
@@ -769,8 +846,7 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
fde8c1
 		
fde8c1
 		char *request =0;
fde8c1
 		char *response =0;
fde8c1
-		if(ctx->statusCheckOnly)
fde8c1
-		{
fde8c1
+		if(ctx->statusCheckOnly || (NULL == lastReqRead)) {
fde8c1
 			if(itemStatus) {
fde8c1
 				DBGPRINTF("omelasticsearch: error in elasticsearch reply: item %d, status is %d\n", i, fjson_object_get_int(ok));
fde8c1
 				DBGPRINTF("omelasticsearch: status check found error.\n");
fde8c1
@@ -795,7 +871,8 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
fde8c1
 			}
fde8c1
 
fde8c1
 			/*call the context*/
fde8c1
-			rsRetVal ret = ctx->prepareErrorFileContent(ctx, itemStatus, request,response);
fde8c1
+			rsRetVal ret = ctx->prepareErrorFileContent(ctx, itemStatus, request,
fde8c1
+					response, item, result, ok);
fde8c1
 
fde8c1
 			/*free memory in any case*/
fde8c1
 			free(request);
fde8c1
@@ -818,11 +895,14 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
fde8c1
  * Dumps only failed requests of bulk insert
fde8c1
  */
fde8c1
 static rsRetVal
fde8c1
-getDataErrorOnly(context *ctx,int itemStatus,char *request,char *response)
fde8c1
+getDataErrorOnly(context *ctx,int itemStatus,char *request,char *response,
fde8c1
+		fjson_object *response_item, fjson_object *response_body, fjson_object *status)
fde8c1
 {
fde8c1
 	DEFiRet;
fde8c1
-	if(itemStatus)
fde8c1
-	{
fde8c1
+	(void)response_item; /* unused */
fde8c1
+	(void)response_body; /* unused */
fde8c1
+	(void)status; /* unused */
fde8c1
+	if(itemStatus) {
fde8c1
 		fjson_object *onlyErrorResponses =NULL;
fde8c1
 		fjson_object *onlyErrorRequests=NULL;
fde8c1
 
fde8c1
@@ -855,9 +935,16 @@ static rsRetVal
fde8c1
 getDataInterleaved(context *ctx,
fde8c1
 	int __attribute__((unused)) itemStatus,
fde8c1
 	char *request,
fde8c1
-	char *response)
fde8c1
+	char *response,
fde8c1
+	fjson_object *response_item,
fde8c1
+	fjson_object *response_body,
fde8c1
+	fjson_object *status
fde8c1
+)
fde8c1
 {
fde8c1
 	DEFiRet;
fde8c1
+	(void)response_item; /* unused */
fde8c1
+	(void)response_body; /* unused */
fde8c1
+	(void)status; /* unused */
fde8c1
 	fjson_object *interleaved =NULL;
fde8c1
 	if(!fjson_object_object_get_ex(ctx->errRoot, "response", &interleaved))
fde8c1
 	{
fde8c1
@@ -889,11 +976,13 @@ getDataInterleaved(context *ctx,
fde8c1
  */
fde8c1
 
fde8c1
 static rsRetVal
fde8c1
-getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *response)
fde8c1
+getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *response,
fde8c1
+		fjson_object *response_item, fjson_object *response_body, fjson_object *status)
fde8c1
 {
fde8c1
 	DEFiRet;
fde8c1
 	if (itemStatus) {
fde8c1
-		if(getDataInterleaved(ctx, itemStatus,request,response)!= RS_RET_OK) {
fde8c1
+		if(getDataInterleaved(ctx, itemStatus,request,response,
fde8c1
+				response_item, response_body, status)!= RS_RET_OK) {
fde8c1
 			ABORT_FINALIZE(RS_RET_ERR);
fde8c1
 		}
fde8c1
 	}
fde8c1
@@ -902,6 +991,141 @@ getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *resp
fde8c1
 		RETiRet;
fde8c1
 }
fde8c1
 
fde8c1
+/* request string looks like this:
fde8c1
+ * "{\"create\":{\"_index\": \"rsyslog_testbench\",\"_type\":\"test-type\",
fde8c1
+ *   \"_id\":\"FAEAFC0D17C847DA8BD6F47BC5B3800A\"}}\n
fde8c1
+ * {\"msgnum\":\"x00000000\",\"viaq_msg_id\":\"FAEAFC0D17C847DA8BD6F47BC5B3800A\"}\n"
fde8c1
+ * we don't want the meta header, only the data part
fde8c1
+ * start = first \n + 1
fde8c1
+ * end = last \n
fde8c1
+ */
fde8c1
+static rsRetVal
fde8c1
+createMsgFromRequest(const char *request, context *ctx, smsg_t **msg)
fde8c1
+{
fde8c1
+	DEFiRet;
fde8c1
+	fjson_object *jo_msg = NULL;
fde8c1
+	const char *datastart, *dataend;
fde8c1
+	size_t datalen;
fde8c1
+	enum json_tokener_error json_error;
fde8c1
+
fde8c1
+	*msg = NULL;
fde8c1
+	if (!(datastart = strchr(request, '\n')) || (datastart[1] != '{')) {
fde8c1
+		LogError(0, RS_RET_ERR,
fde8c1
+			"omelasticsearch: malformed original request - "
fde8c1
+			"could not find start of original data [%s]",
fde8c1
+			request);
fde8c1
+		ABORT_FINALIZE(RS_RET_ERR);
fde8c1
+	}
fde8c1
+	datastart++; /* advance to { */
fde8c1
+	if (!(dataend = strchr(datastart, '\n')) || (dataend[1] != '\0')) {
fde8c1
+		LogError(0, RS_RET_ERR,
fde8c1
+			"omelasticsearch: malformed original request - "
fde8c1
+			"could not find end of original data [%s]",
fde8c1
+			request);
fde8c1
+		ABORT_FINALIZE(RS_RET_ERR);
fde8c1
+	}
fde8c1
+	datalen = dataend - datastart;
fde8c1
+	json_tokener_reset(ctx->jTokener);
fde8c1
+	fjson_object *jo_request = json_tokener_parse_ex(ctx->jTokener, datastart, datalen);
fde8c1
+	json_error = fjson_tokener_get_error(ctx->jTokener);
fde8c1
+	if (!jo_request || (json_error != fjson_tokener_success)) {
fde8c1
+		LogError(0, RS_RET_ERR,
fde8c1
+			"omelasticsearch: parse error [%s] - could not convert original "
fde8c1
+			"request JSON back into JSON object [%s]",
fde8c1
+			fjson_tokener_error_desc(json_error), request);
fde8c1
+		ABORT_FINALIZE(RS_RET_ERR);
fde8c1
+	}
fde8c1
+
fde8c1
+	CHKiRet(msgConstruct(msg));
fde8c1
+	MsgSetFlowControlType(*msg, eFLOWCTL_FULL_DELAY);
fde8c1
+	MsgSetInputName(*msg, pInputName);
fde8c1
+	if (fjson_object_object_get_ex(jo_request, "message", &jo_msg)) {
fde8c1
+		const char *rawmsg = json_object_get_string(jo_msg);
fde8c1
+		const size_t msgLen = (size_t)json_object_get_string_len(jo_msg);
fde8c1
+		MsgSetRawMsg(*msg, rawmsg, msgLen);
fde8c1
+	} else {
fde8c1
+		MsgSetRawMsg(*msg, request, strlen(request));
fde8c1
+	}
fde8c1
+	MsgSetMSGoffs(*msg, 0);	/* we do not have a header... */
fde8c1
+	CHKiRet(msgAddJSON(*msg, (uchar*)"!", jo_request, 0, 0));
fde8c1
+
fde8c1
+	finalize_it:
fde8c1
+		RETiRet;
fde8c1
+
fde8c1
+}
fde8c1
+
fde8c1
+
fde8c1
+static rsRetVal
fde8c1
+getDataRetryFailures(context *ctx,int itemStatus,char *request,char *response,
fde8c1
+		fjson_object *response_item, fjson_object *response_body, fjson_object *status)
fde8c1
+{
fde8c1
+	DEFiRet;
fde8c1
+	fjson_object *omes = NULL, *jo = NULL;
fde8c1
+	int istatus = fjson_object_get_int(status);
fde8c1
+	int iscreateop = 0;
fde8c1
+	struct json_object_iterator it = json_object_iter_begin(response_item);
fde8c1
+	struct json_object_iterator itEnd = json_object_iter_end(response_item);
fde8c1
+	const char *optype = NULL;
fde8c1
+	smsg_t *msg = NULL;
fde8c1
+
fde8c1
+	(void)response;
fde8c1
+	(void)itemStatus;
fde8c1
+	CHKiRet(createMsgFromRequest(request, ctx, &msg));
fde8c1
+	CHKmalloc(msg);
fde8c1
+	/* add status as local variables */
fde8c1
+	omes = json_object_new_object();
fde8c1
+	if (!json_object_iter_equal(&it, &itEnd))
fde8c1
+		optype = json_object_iter_peek_name(&it);
fde8c1
+	if (optype && !strcmp("create", optype))
fde8c1
+		iscreateop = 1;
fde8c1
+	if (optype && !strcmp("index", optype) && (ctx->writeOperation == ES_WRITE_INDEX))
fde8c1
+		iscreateop = 1;
fde8c1
+	if (optype) {
fde8c1
+		jo = json_object_new_string(optype);
fde8c1
+	} else {
fde8c1
+		jo = json_object_new_string("unknown");
fde8c1
+	}
fde8c1
+	json_object_object_add(omes, "writeoperation", jo);
fde8c1
+
fde8c1
+	if (!optype) {
fde8c1
+		STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse);
fde8c1
+	} else if ((istatus == 200) || (istatus == 201)) {
fde8c1
+		STATSCOUNTER_INC(indexSuccess, mutIndexSuccess);
fde8c1
+	} else if ((istatus == 409) && iscreateop) {
fde8c1
+		STATSCOUNTER_INC(indexDuplicate, mutIndexDuplicate);
fde8c1
+	} else if (istatus == 400 || (istatus < 200)) {
fde8c1
+		STATSCOUNTER_INC(indexBadArgument, mutIndexBadArgument);
fde8c1
+	} else {
fde8c1
+		fjson_object *error = NULL, *errtype = NULL;
fde8c1
+		if(fjson_object_object_get_ex(response_body, "error", &error) &&
fde8c1
+		   fjson_object_object_get_ex(error, "type", &errtype)) {
fde8c1
+			if (istatus == 429) {
fde8c1
+				STATSCOUNTER_INC(indexBulkRejection, mutIndexBulkRejection);
fde8c1
+			} else {
fde8c1
+				STATSCOUNTER_INC(indexOtherResponse, mutIndexOtherResponse);
fde8c1
+			}
fde8c1
+		} else {
fde8c1
+			STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse);
fde8c1
+		}
fde8c1
+	}
fde8c1
+	/* add response_body fields to local var omes */
fde8c1
+	it = json_object_iter_begin(response_body);
fde8c1
+	itEnd = json_object_iter_end(response_body);
fde8c1
+	while (!json_object_iter_equal(&it, &itEnd)) {
fde8c1
+		json_object_object_add(omes, json_object_iter_peek_name(&it),
fde8c1
+			json_object_get(json_object_iter_peek_value(&it)));
fde8c1
+		json_object_iter_next(&it);
fde8c1
+	}
fde8c1
+	CHKiRet(msgAddJSON(msg, (uchar*)".omes", omes, 0, 0));
fde8c1
+	omes = NULL;
fde8c1
+	MsgSetRuleset(msg, ctx->retryRuleset);
fde8c1
+	CHKiRet(ratelimitAddMsg(ctx->ratelimiter, NULL, msg));
fde8c1
+finalize_it:
fde8c1
+	if (omes)
fde8c1
+		json_object_put(omes);
fde8c1
+	RETiRet;
fde8c1
+}
fde8c1
+
fde8c1
 /*
fde8c1
  * get erroronly context
fde8c1
  */
fde8c1
@@ -979,6 +1203,23 @@ initializeErrorInterleavedConext(wrkrInstanceData_t *pWrkrData,context *ctx){
fde8c1
 		RETiRet;
fde8c1
 }
fde8c1
 
fde8c1
+/*get retry failures context*/
fde8c1
+static rsRetVal
fde8c1
+initializeRetryFailuresContext(wrkrInstanceData_t *pWrkrData,context *ctx){
fde8c1
+	DEFiRet;
fde8c1
+	ctx->statusCheckOnly=0;
fde8c1
+	fjson_object *errRoot=NULL;
fde8c1
+	if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
fde8c1
+
fde8c1
+
fde8c1
+	fjson_object_object_add(errRoot, "url", fjson_object_new_string((char*)pWrkrData->restURL));
fde8c1
+	ctx->errRoot = errRoot;
fde8c1
+	ctx->prepareErrorFileContent= &getDataRetryFailures;
fde8c1
+	CHKmalloc(ctx->jTokener = json_tokener_new());
fde8c1
+	finalize_it:
fde8c1
+		RETiRet;
fde8c1
+}
fde8c1
+
fde8c1
 
fde8c1
 /* write data error request/replies to separate error file
fde8c1
  * Note: we open the file but never close it before exit. If it
fde8c1
@@ -994,6 +1235,10 @@ writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, fjson_object
fde8c1
 	char errStr[1024];
fde8c1
 	context ctx;
fde8c1
 	ctx.errRoot=0;
fde8c1
+	ctx.writeOperation = pWrkrData->pData->writeOperation;
fde8c1
+	ctx.ratelimiter = pWrkrData->pData->ratelimiter;
fde8c1
+	ctx.retryRuleset = pWrkrData->pData->retryRuleset;
fde8c1
+	ctx.jTokener = NULL;
fde8c1
 	DEFiRet;
fde8c1
 
fde8c1
 	if(pData->errorFile == NULL) {
fde8c1
@@ -1039,9 +1284,12 @@ writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, fjson_object
fde8c1
 				DBGPRINTF("omelasticsearch: error initializing error interleaved context.\n");
fde8c1
 				ABORT_FINALIZE(RS_RET_ERR);
fde8c1
 			}
fde8c1
-		}
fde8c1
-		else
fde8c1
-		{
fde8c1
+		} else if(pData->retryFailures) {
fde8c1
+			if(initializeRetryFailuresContext(pWrkrData, &ctx) != RS_RET_OK) {
fde8c1
+				DBGPRINTF("omelasticsearch: error initializing retry failures context.\n");
fde8c1
+				ABORT_FINALIZE(RS_RET_ERR);
fde8c1
+			}
fde8c1
+		} else {
fde8c1
 			DBGPRINTF("omelasticsearch: None of the modes match file write. No data to write.\n");
fde8c1
 			ABORT_FINALIZE(RS_RET_ERR);
fde8c1
 		}
fde8c1
@@ -1082,25 +1330,38 @@ finalize_it:
fde8c1
 	if(bMutLocked)
fde8c1
 		pthread_mutex_unlock(&pData->mutErrFile);
fde8c1
 	fjson_object_put(ctx.errRoot);
fde8c1
+	if (ctx.jTokener)
fde8c1
+		json_tokener_free(ctx.jTokener);
fde8c1
+	free(rendered);
fde8c1
 	RETiRet;
fde8c1
 }
fde8c1
 
fde8c1
 
fde8c1
 static rsRetVal
fde8c1
-checkResultBulkmode(wrkrInstanceData_t *pWrkrData, fjson_object *root)
fde8c1
+checkResultBulkmode(wrkrInstanceData_t *pWrkrData, fjson_object *root, uchar *reqmsg)
fde8c1
 {
fde8c1
 	DEFiRet;
fde8c1
 	context ctx;
fde8c1
-	ctx.statusCheckOnly=1;
fde8c1
 	ctx.errRoot = 0;
fde8c1
-	if(parseRequestAndResponseForContext(pWrkrData,&root,0,&ctx)!= RS_RET_OK)
fde8c1
-	{
fde8c1
+	ctx.writeOperation = pWrkrData->pData->writeOperation;
fde8c1
+	ctx.ratelimiter = pWrkrData->pData->ratelimiter;
fde8c1
+	ctx.retryRuleset = pWrkrData->pData->retryRuleset;
fde8c1
+	ctx.statusCheckOnly=1;
fde8c1
+	ctx.jTokener = NULL;
fde8c1
+	if (pWrkrData->pData->retryFailures) {
fde8c1
+		ctx.statusCheckOnly=0;
fde8c1
+		CHKiRet(initializeRetryFailuresContext(pWrkrData, &ctx));
fde8c1
+	}
fde8c1
+	if(parseRequestAndResponseForContext(pWrkrData,&root,reqmsg,&ctx)!= RS_RET_OK) {
fde8c1
 		DBGPRINTF("omelasticsearch: error found in elasticsearch reply\n");
fde8c1
 		ABORT_FINALIZE(RS_RET_DATAFAIL);
fde8c1
 	}
fde8c1
 
fde8c1
-	finalize_it:
fde8c1
-		RETiRet;
fde8c1
+finalize_it:
fde8c1
+	fjson_object_put(ctx.errRoot);
fde8c1
+	if (ctx.jTokener)
fde8c1
+		json_tokener_free(ctx.jTokener);
fde8c1
+	RETiRet;
fde8c1
 }
fde8c1
 
fde8c1
 
fde8c1
@@ -1118,7 +1378,7 @@ checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg)
fde8c1
 	}
fde8c1
 
fde8c1
 	if(pWrkrData->pData->bulkmode) {
fde8c1
-		iRet = checkResultBulkmode(pWrkrData, root);
fde8c1
+		iRet = checkResultBulkmode(pWrkrData, root, reqmsg);
fde8c1
 	} else {
fde8c1
 		if(fjson_object_object_get_ex(root, "status", &status)) {
fde8c1
 			iRet = RS_RET_DATAFAIL;
fde8c1
@@ -1397,6 +1657,13 @@ setInstParamDefaults(instanceData *pData)
fde8c1
 	pData->caCertFile = NULL;
fde8c1
 	pData->myCertFile = NULL;
fde8c1
 	pData->myPrivKeyFile = NULL;
fde8c1
+	pData->writeOperation = ES_WRITE_INDEX;
fde8c1
+	pData->retryFailures = 0;
fde8c1
+	pData->ratelimitBurst = 20000;
fde8c1
+	pData->ratelimitInterval = 600;
fde8c1
+	pData->ratelimiter = NULL;
fde8c1
+	pData->retryRulesetName = NULL;
fde8c1
+	pData->retryRuleset = NULL;
fde8c1
 }
fde8c1
 
fde8c1
 BEGINnewActInst
fde8c1
@@ -1495,6 +1762,27 @@ CODESTARTnewActInst
fde8c1
 			} else {
fde8c1
 				fclose(fp);
fde8c1
 			}
fde8c1
+		} else if(!strcmp(actpblk.descr[i].name, "writeoperation")) {
fde8c1
+			char *writeop = es_str2cstr(pvals[i].val.d.estr, NULL);
fde8c1
+			if (writeop && !strcmp(writeop, "create")) {
fde8c1
+				pData->writeOperation = ES_WRITE_CREATE;
fde8c1
+			} else if (writeop && !strcmp(writeop, "index")) {
fde8c1
+				pData->writeOperation = ES_WRITE_INDEX;
fde8c1
+			} else if (writeop) {
fde8c1
+				errmsg.LogError(0, RS_RET_CONFIG_ERROR,
fde8c1
+					"omelasticsearch: invalid value '%s' for writeoperation: "
fde8c1
+					"must be one of 'index' or 'create' - using default value 'index'", writeop);
fde8c1
+				pData->writeOperation = ES_WRITE_INDEX;
fde8c1
+			}
fde8c1
+			free(writeop);
fde8c1
+		} else if(!strcmp(actpblk.descr[i].name, "retryfailures")) {
fde8c1
+			pData->retryFailures = pvals[i].val.d.n;
fde8c1
+		} else if(!strcmp(actpblk.descr[i].name, "ratelimit.burst")) {
fde8c1
+			pData->ratelimitBurst = (int) pvals[i].val.d.n;
fde8c1
+		} else if(!strcmp(actpblk.descr[i].name, "ratelimit.interval")) {
fde8c1
+			pData->ratelimitInterval = (int) pvals[i].val.d.n;
fde8c1
+		} else if(!strcmp(actpblk.descr[i].name, "retryruleset")) {
fde8c1
+			pData->retryRulesetName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
fde8c1
 		} else {
fde8c1
 			dbgprintf("omelasticsearch: program error, non-handled "
fde8c1
 			  "param '%s'\n", actpblk.descr[i].name);
fde8c1
@@ -1661,6 +1949,27 @@ CODESTARTnewActInst
fde8c1
 		pData->searchIndex = (uchar*) strdup("system");
fde8c1
 	if(pData->searchType == NULL)
fde8c1
 		pData->searchType = (uchar*) strdup("events");
fde8c1
+
fde8c1
+	if ((pData->writeOperation != ES_WRITE_INDEX) && (pData->bulkId == NULL)) {
fde8c1
+		errmsg.LogError(0, RS_RET_CONFIG_ERROR,
fde8c1
+			"omelasticsearch: writeoperation '%d' requires bulkid", pData->writeOperation);
fde8c1
+		ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
fde8c1
+	}
fde8c1
+
fde8c1
+	if (pData->retryFailures) {
fde8c1
+		CHKiRet(ratelimitNew(&pData->ratelimiter, "omelasticsearch", NULL));
fde8c1
+		ratelimitSetLinuxLike(pData->ratelimiter, pData->ratelimitInterval, pData->ratelimitBurst);
fde8c1
+		ratelimitSetNoTimeCache(pData->ratelimiter);
fde8c1
+	}
fde8c1
+
fde8c1
+	/* node created, let's add to list of instance configs for the module */
fde8c1
+	if(loadModConf->tail == NULL) {
fde8c1
+		loadModConf->tail = loadModConf->root = pData;
fde8c1
+	} else {
fde8c1
+		loadModConf->tail->next = pData;
fde8c1
+		loadModConf->tail = pData;
fde8c1
+	}
fde8c1
+
fde8c1
 CODE_STD_FINALIZERnewActInst
fde8c1
 	cnfparamvalsDestruct(pvals, &actpblk);
fde8c1
 	if (serverParam)
fde8c1
@@ -1680,6 +1989,51 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
fde8c1
 CODE_STD_FINALIZERparseSelectorAct
fde8c1
 ENDparseSelectorAct
fde8c1
 
fde8c1
+
fde8c1
+BEGINbeginCnfLoad
fde8c1
+CODESTARTbeginCnfLoad
fde8c1
+	loadModConf = pModConf;
fde8c1
+	pModConf->pConf = pConf;
fde8c1
+	pModConf->root = pModConf->tail = NULL;
fde8c1
+ENDbeginCnfLoad
fde8c1
+
fde8c1
+
fde8c1
+BEGINendCnfLoad
fde8c1
+CODESTARTendCnfLoad
fde8c1
+	loadModConf = NULL; /* done loading */
fde8c1
+ENDendCnfLoad
fde8c1
+
fde8c1
+
fde8c1
+BEGINcheckCnf
fde8c1
+	instanceConf_t *inst;
fde8c1
+CODESTARTcheckCnf
fde8c1
+	for(inst = pModConf->root ; inst != NULL ; inst = inst->next) {
fde8c1
+		ruleset_t *pRuleset;
fde8c1
+		rsRetVal localRet;
fde8c1
+
fde8c1
+		if (inst->retryRulesetName) {
fde8c1
+			localRet = ruleset.GetRuleset(pModConf->pConf, &pRuleset, inst->retryRulesetName);
fde8c1
+			if(localRet == RS_RET_NOT_FOUND) {
fde8c1
+				errmsg.LogError(0, localRet, "omelasticsearch: retryruleset '%s' not found - "
fde8c1
+						"no retry ruleset will be used", inst->retryRulesetName);
fde8c1
+			} else {
fde8c1
+				inst->retryRuleset = pRuleset;
fde8c1
+			}
fde8c1
+		}
fde8c1
+	}
fde8c1
+ENDcheckCnf
fde8c1
+
fde8c1
+
fde8c1
+BEGINactivateCnf
fde8c1
+CODESTARTactivateCnf
fde8c1
+ENDactivateCnf
fde8c1
+
fde8c1
+
fde8c1
+BEGINfreeCnf
fde8c1
+CODESTARTfreeCnf
fde8c1
+ENDfreeCnf
fde8c1
+
fde8c1
+
fde8c1
 BEGINdoHUP
fde8c1
 CODESTARTdoHUP
fde8c1
 	if(pData->fdErrFile != -1) {
fde8c1
@@ -1691,10 +2045,14 @@ ENDdoHUP
fde8c1
 
fde8c1
 BEGINmodExit
fde8c1
 CODESTARTmodExit
fde8c1
+	if(pInputName != NULL)
fde8c1
+		prop.Destruct(&pInputName);
fde8c1
 	curl_global_cleanup();
fde8c1
 	statsobj.Destruct(&indexStats);
fde8c1
 	objRelease(errmsg, CORE_COMPONENT);
fde8c1
-        objRelease(statsobj, CORE_COMPONENT);
fde8c1
+	objRelease(statsobj, CORE_COMPONENT);
fde8c1
+	objRelease(prop, CORE_COMPONENT);
fde8c1
+	objRelease(ruleset, CORE_COMPONENT);
fde8c1
 ENDmodExit
fde8c1
 
fde8c1
 BEGINqueryEtryPt
fde8c1
@@ -1705,6 +2063,7 @@ CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
fde8c1
 CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
fde8c1
 CODEqueryEtryPt_doHUP
fde8c1
 CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
fde8c1
+CODEqueryEtryPt_STD_CONF2_QUERIES
fde8c1
 ENDqueryEtryPt
fde8c1
 
fde8c1
 
fde8c1
@@ -1714,6 +2073,8 @@ CODESTARTmodInit
fde8c1
 CODEmodInit_QueryRegCFSLineHdlr
fde8c1
 	CHKiRet(objUse(errmsg, CORE_COMPONENT));
fde8c1
 	CHKiRet(objUse(statsobj, CORE_COMPONENT));
fde8c1
+	CHKiRet(objUse(prop, CORE_COMPONENT));
fde8c1
+	CHKiRet(objUse(ruleset, CORE_COMPONENT));
fde8c1
 
fde8c1
 	if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
fde8c1
 		errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled");
fde8c1
@@ -1739,7 +2100,28 @@ CODEmodInit_QueryRegCFSLineHdlr
fde8c1
 	STATSCOUNTER_INIT(indexESFail, mutIndexESFail);
fde8c1
 	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.es",
fde8c1
 		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexESFail));
fde8c1
+	STATSCOUNTER_INIT(indexSuccess, mutIndexSuccess);
fde8c1
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.success",
fde8c1
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexSuccess));
fde8c1
+	STATSCOUNTER_INIT(indexBadResponse, mutIndexBadResponse);
fde8c1
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.bad",
fde8c1
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBadResponse));
fde8c1
+	STATSCOUNTER_INIT(indexDuplicate, mutIndexDuplicate);
fde8c1
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.duplicate",
fde8c1
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexDuplicate));
fde8c1
+	STATSCOUNTER_INIT(indexBadArgument, mutIndexBadArgument);
fde8c1
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.badargument",
fde8c1
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBadArgument));
fde8c1
+	STATSCOUNTER_INIT(indexBulkRejection, mutIndexBulkRejection);
fde8c1
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.bulkrejection",
fde8c1
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBulkRejection));
fde8c1
+	STATSCOUNTER_INIT(indexOtherResponse, mutIndexOtherResponse);
fde8c1
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.other",
fde8c1
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexOtherResponse));
fde8c1
 	CHKiRet(statsobj.ConstructFinalize(indexStats));
fde8c1
+	CHKiRet(prop.Construct(&pInputName));
fde8c1
+	CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("omelasticsearch"), sizeof("omelasticsearch") - 1));
fde8c1
+	CHKiRet(prop.ConstructFinalize(pInputName));
fde8c1
 ENDmodInit
fde8c1
 
fde8c1
 /* vi:set ai: