Blame SOURCES/rsyslog-8.24.0-rhbz1565214-omelasticsearch-write-op-types-bulk-rejection-retries.patch

c17bfd
From 989be897340eb458b00efedfd5e082bb362db79a Mon Sep 17 00:00:00 2001
c17bfd
From: Rich Megginson <rmeggins@redhat.com>
c17bfd
Date: Tue, 15 May 2018 16:03:25 -0600
c17bfd
Subject: [PATCH 11/11] omelasticsearch: write op types; bulk rejection retries
c17bfd
c17bfd
Add support for a 'create' write operation type in addition to
c17bfd
the default 'index'.  Using create allows specifying a unique id
c17bfd
for each record, and allows duplicate document detection.
c17bfd
c17bfd
Add support for checking each record returned in the response to
c17bfd
a bulk index request.  Allow specifying a ruleset to which each
c17bfd
failed record is sent.  Add a local variable `omes` which contains
c17bfd
the information in the error response, so that users can control
c17bfd
how to handle responses, e.g. retry or send to an error file.
c17bfd
c17bfd
Add support for response stats - count successes, duplicates, and
c17bfd
different types of failures.
c17bfd
c17bfd
Add testing for bulk index rejections.
c17bfd
c17bfd
(cherry picked from commit 57dd368a2a915d79c94a8dc0de30c93a0bbdc8fe)
c17bfd
(cherry picked from commit 30a15621e1e7e393b2153e9fe5c13f724dea25b5)
c17bfd
---
c17bfd
 plugins/omelasticsearch/omelasticsearch.c | 441 ++++++++++++++++++++++++++++--
c17bfd
 1 file changed, 415 insertions(+), 26 deletions(-)
c17bfd
c17bfd
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
c17bfd
index ed2b47535..ca61ae28f 100644
c17bfd
--- a/plugins/omelasticsearch/omelasticsearch.c
c17bfd
+++ b/plugins/omelasticsearch/omelasticsearch.c
c17bfd
@@ -51,6 +51,8 @@
c17bfd
 #include "statsobj.h"
c17bfd
 #include "cfsysline.h"
c17bfd
 #include "unicode-helper.h"
c17bfd
+#include "ratelimit.h"
c17bfd
+#include "ruleset.h"
c17bfd
 
c17bfd
 #ifndef O_LARGEFILE
c17bfd
 #  define O_LARGEFILE 0
c17bfd
@@ -64,6 +66,8 @@ MODULE_CNFNAME("omelasticsearch")
c17bfd
 DEF_OMOD_STATIC_DATA
c17bfd
 DEFobjCurrIf(errmsg)
c17bfd
 DEFobjCurrIf(statsobj)
c17bfd
+DEFobjCurrIf(prop)
c17bfd
+DEFobjCurrIf(ruleset)
c17bfd
 
c17bfd
 statsobj_t *indexStats;
c17bfd
 STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit)
c17bfd
@@ -71,19 +75,35 @@ STATSCOUNTER_DEF(indexHTTPFail, mutIndexHTTPFail)
c17bfd
 STATSCOUNTER_DEF(indexHTTPReqFail, mutIndexHTTPReqFail)
c17bfd
 STATSCOUNTER_DEF(checkConnFail, mutCheckConnFail)
c17bfd
 STATSCOUNTER_DEF(indexESFail, mutIndexESFail)
c17bfd
+STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess)
c17bfd
+STATSCOUNTER_DEF(indexBadResponse, mutIndexBadResponse)
c17bfd
+STATSCOUNTER_DEF(indexDuplicate, mutIndexDuplicate)
c17bfd
+STATSCOUNTER_DEF(indexBadArgument, mutIndexBadArgument)
c17bfd
+STATSCOUNTER_DEF(indexBulkRejection, mutIndexBulkRejection)
c17bfd
+STATSCOUNTER_DEF(indexOtherResponse, mutIndexOtherResponse)
c17bfd
 
c17bfd
+static prop_t *pInputName = NULL;
c17bfd
 
c17bfd
 #	define META_STRT "{\"index\":{\"_index\": \""
c17bfd
+#	define META_STRT_CREATE "{\"create\":{\"_index\": \""
c17bfd
 #	define META_TYPE "\",\"_type\":\""
c17bfd
 #	define META_PARENT "\",\"_parent\":\""
c17bfd
 #	define META_ID "\", \"_id\":\""
c17bfd
 #	define META_END  "\"}}\n"
c17bfd
 
c17bfd
+typedef enum {
c17bfd
+	ES_WRITE_INDEX,
c17bfd
+	ES_WRITE_CREATE,
c17bfd
+	ES_WRITE_UPDATE, /* not supported */
c17bfd
+	ES_WRITE_UPSERT /* not supported */
c17bfd
+} es_write_ops_t;
c17bfd
+
c17bfd
 /* REST API for elasticsearch hits this URL:
c17bfd
  * http://<hostName>:<restPort>/<searchIndex>/<searchType>
c17bfd
  */
c17bfd
+/* bulk API uses /_bulk */
c17bfd
 typedef struct curl_slist HEADER;
c17bfd
-typedef struct _instanceData {
c17bfd
+typedef struct instanceConf_s {
c17bfd
 	int defaultPort;
c17bfd
 	int fdErrFile;		/* error file fd or -1 if not open */
c17bfd
 	pthread_mutex_t mutErrFile;
c17bfd
@@ -113,8 +133,25 @@ typedef struct _instanceData {
c17bfd
 	uchar *caCertFile;
c17bfd
 	uchar *myCertFile;
c17bfd
 	uchar *myPrivKeyFile;
c17bfd
+	es_write_ops_t writeOperation;
c17bfd
+	sbool retryFailures;
c17bfd
+	int ratelimitInterval;
c17bfd
+	int ratelimitBurst;
c17bfd
+	/* for retries */
c17bfd
+	ratelimit_t *ratelimiter;
c17bfd
+	uchar *retryRulesetName;
c17bfd
+	ruleset_t *retryRuleset;
c17bfd
+	struct instanceConf_s *next;
c17bfd
 } instanceData;
c17bfd
 
c17bfd
+typedef instanceConf_t instanceData;
c17bfd
+
c17bfd
+struct modConfData_s {
c17bfd
+	rsconf_t *pConf;		/* our overall config object */
c17bfd
+	instanceConf_t *root, *tail;
c17bfd
+};
c17bfd
+static modConfData_t *loadModConf = NULL;	/* modConf ptr to use for the current load process */
c17bfd
+
c17bfd
 typedef struct wrkrInstanceData {
c17bfd
 	instanceData *pData;
c17bfd
 	int serverIndex;
c17bfd
@@ -160,7 +197,12 @@ static struct cnfparamdescr actpdescr[] = {
c17bfd
 	{ "allowunsignedcerts", eCmdHdlrBinary, 0 },
c17bfd
 	{ "tls.cacert", eCmdHdlrString, 0 },
c17bfd
 	{ "tls.mycert", eCmdHdlrString, 0 },
c17bfd
-	{ "tls.myprivkey", eCmdHdlrString, 0 }
c17bfd
+	{ "tls.myprivkey", eCmdHdlrString, 0 },
c17bfd
+	{ "writeoperation", eCmdHdlrGetWord, 0 },
c17bfd
+	{ "retryfailures", eCmdHdlrBinary, 0 },
c17bfd
+	{ "ratelimit.interval", eCmdHdlrInt, 0 },
c17bfd
+	{ "ratelimit.burst", eCmdHdlrInt, 0 },
c17bfd
+	{ "retryruleset", eCmdHdlrString, 0 }
c17bfd
 };
c17bfd
 static struct cnfparamblk actpblk =
c17bfd
 	{ CNFPARAMBLK_VERSION,
c17bfd
@@ -177,6 +219,9 @@ CODESTARTcreateInstance
c17bfd
 	pData->caCertFile = NULL;
c17bfd
 	pData->myCertFile = NULL;
c17bfd
 	pData->myPrivKeyFile = NULL;
c17bfd
+	pData->ratelimiter = NULL;
c17bfd
+	pData->retryRulesetName = NULL;
c17bfd
+	pData->retryRuleset = NULL;
c17bfd
 ENDcreateInstance
c17bfd
 
c17bfd
 BEGINcreateWrkrInstance
c17bfd
@@ -228,6 +273,9 @@ CODESTARTfreeInstance
c17bfd
 	free(pData->caCertFile);
c17bfd
 	free(pData->myCertFile);
c17bfd
 	free(pData->myPrivKeyFile);
c17bfd
+	free(pData->retryRulesetName);
c17bfd
+	if (pData->ratelimiter != NULL)
c17bfd
+		ratelimitDestruct(pData->ratelimiter);
c17bfd
 ENDfreeInstance
c17bfd
 
c17bfd
 BEGINfreeWrkrInstance
c17bfd
@@ -285,6 +333,10 @@ CODESTARTdbgPrintInstInfo
c17bfd
 	dbgprintf("\ttls.cacert='%s'\n", pData->caCertFile);
c17bfd
 	dbgprintf("\ttls.mycert='%s'\n", pData->myCertFile);
c17bfd
 	dbgprintf("\ttls.myprivkey='%s'\n", pData->myPrivKeyFile);
c17bfd
+	dbgprintf("\twriteoperation='%d'\n", pData->writeOperation);
c17bfd
+	dbgprintf("\tretryfailures='%d'\n", pData->retryFailures);
c17bfd
+	dbgprintf("\tratelimit.interval='%d'\n", pData->ratelimitInterval);
c17bfd
+	dbgprintf("\tratelimit.burst='%d'\n", pData->ratelimitBurst);
c17bfd
 ENDdbgPrintInstInfo
c17bfd
 
c17bfd
 
c17bfd
@@ -557,7 +609,11 @@ finalize_it:
c17bfd
 static size_t
c17bfd
 computeMessageSize(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
c17bfd
 {
c17bfd
-	size_t r = sizeof(META_STRT)-1 + sizeof(META_TYPE)-1 + sizeof(META_END)-1 + sizeof("\n")-1;
c17bfd
+	size_t r = sizeof(META_TYPE)-1 + sizeof(META_END)-1 + sizeof("\n")-1;
c17bfd
+	if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
c17bfd
+		r += sizeof(META_STRT_CREATE)-1;
c17bfd
+	else
c17bfd
+		r += sizeof(META_STRT)-1;
c17bfd
 
c17bfd
 	uchar *searchIndex = 0;
c17bfd
 	uchar *searchType;
c17bfd
@@ -594,7 +650,10 @@ buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
c17bfd
 	DEFiRet;
c17bfd
 
c17bfd
 	getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId);
c17bfd
-	r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1);
c17bfd
+	if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
c17bfd
+		r = es_addBuf(&pWrkrData->batch.data, META_STRT_CREATE, sizeof(META_STRT_CREATE)-1);
c17bfd
+	else
c17bfd
+		r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1);
c17bfd
 	if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex,
c17bfd
 				 ustrlen(searchIndex));
c17bfd
 	if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
c17bfd
@@ -709,13 +768,20 @@ static int checkReplyStatus(fjson_object* ok) {
c17bfd
 
c17bfd
 /*
c17bfd
  * Context object for error file content creation or status check
c17bfd
+ * response_item - the full {"create":{"_index":"idxname",.....}}
c17bfd
+ * response_body - the inner hash of the response_item - {"_index":"idxname",...}
c17bfd
+ * status - the "status" field from the inner hash - "status":500
c17bfd
+ *          fjson_object_get_int(status) yields the HTTP result code for the item
c17bfd
  */
c17bfd
 typedef struct exeContext{
c17bfd
 	int statusCheckOnly;
c17bfd
 	fjson_object *errRoot;
c17bfd
-	rsRetVal (*prepareErrorFileContent)(struct exeContext *ctx,int itemStatus,char *request,char *response);
c17bfd
-
c17bfd
-
c17bfd
+	rsRetVal (*prepareErrorFileContent)(struct exeContext *ctx,int itemStatus,char *request,char *response,
c17bfd
+			fjson_object *response_item, fjson_object *response_body, fjson_object *status);
c17bfd
+	es_write_ops_t writeOperation;
c17bfd
+	ratelimit_t *ratelimiter;
c17bfd
+	ruleset_t *retryRuleset;
c17bfd
+	struct json_tokener *jTokener;
c17bfd
 } context;
c17bfd
 
c17bfd
 /*
c17bfd
@@ -728,8 +794,15 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
c17bfd
 	fjson_object *replyRoot = *pReplyRoot;
c17bfd
 	int i;
c17bfd
 	int numitems;
c17bfd
-	fjson_object *items=NULL;
c17bfd
+	fjson_object *items=NULL, *jo_errors = NULL;
c17bfd
+	int errors = 0;
c17bfd
 
c17bfd
+	if(fjson_object_object_get_ex(replyRoot, "errors", &jo_errors)) {
c17bfd
+		errors = fjson_object_get_boolean(jo_errors);
c17bfd
+		if (!errors && pWrkrData->pData->retryFailures) {
c17bfd
+			return RS_RET_OK;
c17bfd
+		}
c17bfd
+	}
c17bfd
 
c17bfd
 	/*iterate over items*/
c17bfd
 	if(!fjson_object_object_get_ex(replyRoot, "items", &items)) {
c17bfd
@@ -741,7 +814,11 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
c17bfd
 
c17bfd
 	numitems = fjson_object_array_length(items);
c17bfd
 
c17bfd
-	DBGPRINTF("omelasticsearch: Entire request %s\n",reqmsg);
c17bfd
+	if (reqmsg) {
c17bfd
+		DBGPRINTF("omelasticsearch: Entire request %s\n", reqmsg);
c17bfd
+	} else {
c17bfd
+		DBGPRINTF("omelasticsearch: Empty request\n");
c17bfd
+	}
c17bfd
 	const char *lastReqRead= (char*)reqmsg;
c17bfd
 
c17bfd
 	DBGPRINTF("omelasticsearch: %d items in reply\n", numitems);
c17bfd
@@ -769,8 +846,7 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
c17bfd
 		
c17bfd
 		char *request =0;
c17bfd
 		char *response =0;
c17bfd
-		if(ctx->statusCheckOnly)
c17bfd
-		{
c17bfd
+		if(ctx->statusCheckOnly || (NULL == lastReqRead)) {
c17bfd
 			if(itemStatus) {
c17bfd
 				DBGPRINTF("omelasticsearch: error in elasticsearch reply: item %d, status is %d\n", i, fjson_object_get_int(ok));
c17bfd
 				DBGPRINTF("omelasticsearch: status check found error.\n");
c17bfd
@@ -795,7 +871,8 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
c17bfd
 			}
c17bfd
 
c17bfd
 			/*call the context*/
c17bfd
-			rsRetVal ret = ctx->prepareErrorFileContent(ctx, itemStatus, request,response);
c17bfd
+			rsRetVal ret = ctx->prepareErrorFileContent(ctx, itemStatus, request,
c17bfd
+					response, item, result, ok);
c17bfd
 
c17bfd
 			/*free memory in any case*/
c17bfd
 			free(request);
c17bfd
@@ -818,11 +895,14 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
c17bfd
  * Dumps only failed requests of bulk insert
c17bfd
  */
c17bfd
 static rsRetVal
c17bfd
-getDataErrorOnly(context *ctx,int itemStatus,char *request,char *response)
c17bfd
+getDataErrorOnly(context *ctx,int itemStatus,char *request,char *response,
c17bfd
+		fjson_object *response_item, fjson_object *response_body, fjson_object *status)
c17bfd
 {
c17bfd
 	DEFiRet;
c17bfd
-	if(itemStatus)
c17bfd
-	{
c17bfd
+	(void)response_item; /* unused */
c17bfd
+	(void)response_body; /* unused */
c17bfd
+	(void)status; /* unused */
c17bfd
+	if(itemStatus) {
c17bfd
 		fjson_object *onlyErrorResponses =NULL;
c17bfd
 		fjson_object *onlyErrorRequests=NULL;
c17bfd
 
c17bfd
@@ -855,9 +935,16 @@ static rsRetVal
c17bfd
 getDataInterleaved(context *ctx,
c17bfd
 	int __attribute__((unused)) itemStatus,
c17bfd
 	char *request,
c17bfd
-	char *response)
c17bfd
+	char *response,
c17bfd
+	fjson_object *response_item,
c17bfd
+	fjson_object *response_body,
c17bfd
+	fjson_object *status
c17bfd
+)
c17bfd
 {
c17bfd
 	DEFiRet;
c17bfd
+	(void)response_item; /* unused */
c17bfd
+	(void)response_body; /* unused */
c17bfd
+	(void)status; /* unused */
c17bfd
 	fjson_object *interleaved =NULL;
c17bfd
 	if(!fjson_object_object_get_ex(ctx->errRoot, "response", &interleaved))
c17bfd
 	{
c17bfd
@@ -889,11 +976,13 @@ getDataInterleaved(context *ctx,
c17bfd
  */
c17bfd
 
c17bfd
 static rsRetVal
c17bfd
-getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *response)
c17bfd
+getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *response,
c17bfd
+		fjson_object *response_item, fjson_object *response_body, fjson_object *status)
c17bfd
 {
c17bfd
 	DEFiRet;
c17bfd
 	if (itemStatus) {
c17bfd
-		if(getDataInterleaved(ctx, itemStatus,request,response)!= RS_RET_OK) {
c17bfd
+		if(getDataInterleaved(ctx, itemStatus,request,response,
c17bfd
+				response_item, response_body, status)!= RS_RET_OK) {
c17bfd
 			ABORT_FINALIZE(RS_RET_ERR);
c17bfd
 		}
c17bfd
 	}
c17bfd
@@ -902,6 +991,141 @@ getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *resp
c17bfd
 		RETiRet;
c17bfd
 }
c17bfd
 
c17bfd
+/* request string looks like this:
c17bfd
+ * "{\"create\":{\"_index\": \"rsyslog_testbench\",\"_type\":\"test-type\",
c17bfd
+ *   \"_id\":\"FAEAFC0D17C847DA8BD6F47BC5B3800A\"}}\n
c17bfd
+ * {\"msgnum\":\"x00000000\",\"viaq_msg_id\":\"FAEAFC0D17C847DA8BD6F47BC5B3800A\"}\n"
c17bfd
+ * we don't want the meta header, only the data part
c17bfd
+ * start = the character after the first \n (must be '{')
c17bfd
+ * end = the \n terminating the data line (must be the last character)
c17bfd
+ */
c17bfd
+static rsRetVal
c17bfd
+createMsgFromRequest(const char *request, context *ctx, smsg_t **msg)
c17bfd
+{
c17bfd
+	DEFiRet;
c17bfd
+	fjson_object *jo_msg = NULL;
c17bfd
+	const char *datastart, *dataend;
c17bfd
+	size_t datalen;
c17bfd
+	enum json_tokener_error json_error;
c17bfd
+
c17bfd
+	*msg = NULL;
c17bfd
+	if (!(datastart = strchr(request, '\n')) || (datastart[1] != '{')) {
c17bfd
+		LogError(0, RS_RET_ERR,
c17bfd
+			"omelasticsearch: malformed original request - "
c17bfd
+			"could not find start of original data [%s]",
c17bfd
+			request);
c17bfd
+		ABORT_FINALIZE(RS_RET_ERR);
c17bfd
+	}
c17bfd
+	datastart++; /* advance to { */
c17bfd
+	if (!(dataend = strchr(datastart, '\n')) || (dataend[1] != '\0')) {
c17bfd
+		LogError(0, RS_RET_ERR,
c17bfd
+			"omelasticsearch: malformed original request - "
c17bfd
+			"could not find end of original data [%s]",
c17bfd
+			request);
c17bfd
+		ABORT_FINALIZE(RS_RET_ERR);
c17bfd
+	}
c17bfd
+	datalen = dataend - datastart;
c17bfd
+	json_tokener_reset(ctx->jTokener);
c17bfd
+	fjson_object *jo_request = json_tokener_parse_ex(ctx->jTokener, datastart, datalen);
c17bfd
+	json_error = fjson_tokener_get_error(ctx->jTokener);
c17bfd
+	if (!jo_request || (json_error != fjson_tokener_success)) {
c17bfd
+		LogError(0, RS_RET_ERR,
c17bfd
+			"omelasticsearch: parse error [%s] - could not convert original "
c17bfd
+			"request JSON back into JSON object [%s]",
c17bfd
+			fjson_tokener_error_desc(json_error), request);
c17bfd
+		ABORT_FINALIZE(RS_RET_ERR);
c17bfd
+	}
c17bfd
+
c17bfd
+	CHKiRet(msgConstruct(msg));
c17bfd
+	MsgSetFlowControlType(*msg, eFLOWCTL_FULL_DELAY);
c17bfd
+	MsgSetInputName(*msg, pInputName);
c17bfd
+	if (fjson_object_object_get_ex(jo_request, "message", &jo_msg)) {
c17bfd
+		const char *rawmsg = json_object_get_string(jo_msg);
c17bfd
+		const size_t msgLen = (size_t)json_object_get_string_len(jo_msg);
c17bfd
+		MsgSetRawMsg(*msg, rawmsg, msgLen);
c17bfd
+	} else {
c17bfd
+		MsgSetRawMsg(*msg, request, strlen(request));
c17bfd
+	}
c17bfd
+	MsgSetMSGoffs(*msg, 0);	/* we do not have a header... */
c17bfd
+	CHKiRet(msgAddJSON(*msg, (uchar*)"!", jo_request, 0, 0));
c17bfd
+
c17bfd
+	finalize_it:
c17bfd
+		RETiRet;
c17bfd
+
c17bfd
+}
c17bfd
+
c17bfd
+
c17bfd
+static rsRetVal
c17bfd
+getDataRetryFailures(context *ctx,int itemStatus,char *request,char *response,
c17bfd
+		fjson_object *response_item, fjson_object *response_body, fjson_object *status)
c17bfd
+{
c17bfd
+	DEFiRet;
c17bfd
+	fjson_object *omes = NULL, *jo = NULL;
c17bfd
+	int istatus = fjson_object_get_int(status);
c17bfd
+	int iscreateop = 0;
c17bfd
+	struct json_object_iterator it = json_object_iter_begin(response_item);
c17bfd
+	struct json_object_iterator itEnd = json_object_iter_end(response_item);
c17bfd
+	const char *optype = NULL;
c17bfd
+	smsg_t *msg = NULL;
c17bfd
+
c17bfd
+	(void)response;
c17bfd
+	(void)itemStatus;
c17bfd
+	CHKiRet(createMsgFromRequest(request, ctx, &msg));
c17bfd
+	CHKmalloc(msg);
c17bfd
+	/* add status as local variables */
c17bfd
+	omes = json_object_new_object();
c17bfd
+	if (!json_object_iter_equal(&it, &itEnd))
c17bfd
+		optype = json_object_iter_peek_name(&it);
c17bfd
+	if (optype && !strcmp("create", optype))
c17bfd
+		iscreateop = 1;
c17bfd
+	if (optype && !strcmp("index", optype) && (ctx->writeOperation == ES_WRITE_INDEX))
c17bfd
+		iscreateop = 1;
c17bfd
+	if (optype) {
c17bfd
+		jo = json_object_new_string(optype);
c17bfd
+	} else {
c17bfd
+		jo = json_object_new_string("unknown");
c17bfd
+	}
c17bfd
+	json_object_object_add(omes, "writeoperation", jo);
c17bfd
+
c17bfd
+	if (!optype) {
c17bfd
+		STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse);
c17bfd
+	} else if ((istatus == 200) || (istatus == 201)) {
c17bfd
+		STATSCOUNTER_INC(indexSuccess, mutIndexSuccess);
c17bfd
+	} else if ((istatus == 409) && iscreateop) {
c17bfd
+		STATSCOUNTER_INC(indexDuplicate, mutIndexDuplicate);
c17bfd
+	} else if (istatus == 400 || (istatus < 200)) {
c17bfd
+		STATSCOUNTER_INC(indexBadArgument, mutIndexBadArgument);
c17bfd
+	} else {
c17bfd
+		fjson_object *error = NULL, *errtype = NULL;
c17bfd
+		if(fjson_object_object_get_ex(response_body, "error", &error) &&
c17bfd
+		   fjson_object_object_get_ex(error, "type", &errtype)) {
c17bfd
+			if (istatus == 429) {
c17bfd
+				STATSCOUNTER_INC(indexBulkRejection, mutIndexBulkRejection);
c17bfd
+			} else {
c17bfd
+				STATSCOUNTER_INC(indexOtherResponse, mutIndexOtherResponse);
c17bfd
+			}
c17bfd
+		} else {
c17bfd
+			STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse);
c17bfd
+		}
c17bfd
+	}
c17bfd
+	/* add response_body fields to local var omes */
c17bfd
+	it = json_object_iter_begin(response_body);
c17bfd
+	itEnd = json_object_iter_end(response_body);
c17bfd
+	while (!json_object_iter_equal(&it, &itEnd)) {
c17bfd
+		json_object_object_add(omes, json_object_iter_peek_name(&it),
c17bfd
+			json_object_get(json_object_iter_peek_value(&it)));
c17bfd
+		json_object_iter_next(&it);
c17bfd
+	}
c17bfd
+	CHKiRet(msgAddJSON(msg, (uchar*)".omes", omes, 0, 0));
c17bfd
+	omes = NULL;
c17bfd
+	MsgSetRuleset(msg, ctx->retryRuleset);
c17bfd
+	CHKiRet(ratelimitAddMsg(ctx->ratelimiter, NULL, msg));
c17bfd
+finalize_it:
c17bfd
+	if (omes)
c17bfd
+		json_object_put(omes);
c17bfd
+	RETiRet;
c17bfd
+}
c17bfd
+
c17bfd
 /*
c17bfd
  * get erroronly context
c17bfd
  */
c17bfd
@@ -979,6 +1203,23 @@ initializeErrorInterleavedConext(wrkrInstanceData_t *pWrkrData,context *ctx){
c17bfd
 		RETiRet;
c17bfd
 }
c17bfd
 
c17bfd
+/* set up the context for retryfailures mode: error root, retry callback, JSON tokener */
c17bfd
+static rsRetVal
c17bfd
+initializeRetryFailuresContext(wrkrInstanceData_t *pWrkrData,context *ctx){
c17bfd
+	DEFiRet;
c17bfd
+	ctx->statusCheckOnly=0;
c17bfd
+	fjson_object *errRoot=NULL;
c17bfd
+	if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
c17bfd
+
c17bfd
+
c17bfd
+	fjson_object_object_add(errRoot, "url", fjson_object_new_string((char*)pWrkrData->restURL));
c17bfd
+	ctx->errRoot = errRoot;
c17bfd
+	ctx->prepareErrorFileContent= &getDataRetryFailures;
c17bfd
+	CHKmalloc(ctx->jTokener = json_tokener_new());
c17bfd
+	finalize_it:
c17bfd
+		RETiRet;
c17bfd
+}
c17bfd
+
c17bfd
 
c17bfd
 /* write data error request/replies to separate error file
c17bfd
  * Note: we open the file but never close it before exit. If it
c17bfd
@@ -994,6 +1235,10 @@ writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, fjson_object
c17bfd
 	char errStr[1024];
c17bfd
 	context ctx;
c17bfd
 	ctx.errRoot=0;
c17bfd
+	ctx.writeOperation = pWrkrData->pData->writeOperation;
c17bfd
+	ctx.ratelimiter = pWrkrData->pData->ratelimiter;
c17bfd
+	ctx.retryRuleset = pWrkrData->pData->retryRuleset;
c17bfd
+	ctx.jTokener = NULL;
c17bfd
 	DEFiRet;
c17bfd
 
c17bfd
 	if(pData->errorFile == NULL) {
c17bfd
@@ -1039,9 +1284,12 @@ writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, fjson_object
c17bfd
 				DBGPRINTF("omelasticsearch: error initializing error interleaved context.\n");
c17bfd
 				ABORT_FINALIZE(RS_RET_ERR);
c17bfd
 			}
c17bfd
-		}
c17bfd
-		else
c17bfd
-		{
c17bfd
+		} else if(pData->retryFailures) {
c17bfd
+			if(initializeRetryFailuresContext(pWrkrData, &ctx) != RS_RET_OK) {
c17bfd
+				DBGPRINTF("omelasticsearch: error initializing retry failures context.\n");
c17bfd
+				ABORT_FINALIZE(RS_RET_ERR);
c17bfd
+			}
c17bfd
+		} else {
c17bfd
 			DBGPRINTF("omelasticsearch: None of the modes match file write. No data to write.\n");
c17bfd
 			ABORT_FINALIZE(RS_RET_ERR);
c17bfd
 		}
c17bfd
@@ -1082,25 +1330,38 @@ finalize_it:
c17bfd
 	if(bMutLocked)
c17bfd
 		pthread_mutex_unlock(&pData->mutErrFile);
c17bfd
 	fjson_object_put(ctx.errRoot);
c17bfd
+	if (ctx.jTokener)
c17bfd
+		json_tokener_free(ctx.jTokener);
c17bfd
+	free(rendered);
c17bfd
 	RETiRet;
c17bfd
 }
c17bfd
 
c17bfd
 
c17bfd
 static rsRetVal
c17bfd
-checkResultBulkmode(wrkrInstanceData_t *pWrkrData, fjson_object *root)
c17bfd
+checkResultBulkmode(wrkrInstanceData_t *pWrkrData, fjson_object *root, uchar *reqmsg)
c17bfd
 {
c17bfd
 	DEFiRet;
c17bfd
 	context ctx;
c17bfd
-	ctx.statusCheckOnly=1;
c17bfd
 	ctx.errRoot = 0;
c17bfd
-	if(parseRequestAndResponseForContext(pWrkrData,&root,0,&ctx)!= RS_RET_OK)
c17bfd
-	{
c17bfd
+	ctx.writeOperation = pWrkrData->pData->writeOperation;
c17bfd
+	ctx.ratelimiter = pWrkrData->pData->ratelimiter;
c17bfd
+	ctx.retryRuleset = pWrkrData->pData->retryRuleset;
c17bfd
+	ctx.statusCheckOnly=1;
c17bfd
+	ctx.jTokener = NULL;
c17bfd
+	if (pWrkrData->pData->retryFailures) {
c17bfd
+		ctx.statusCheckOnly=0;
c17bfd
+		CHKiRet(initializeRetryFailuresContext(pWrkrData, &ctx));
c17bfd
+	}
c17bfd
+	if(parseRequestAndResponseForContext(pWrkrData,&root,reqmsg,&ctx)!= RS_RET_OK) {
c17bfd
 		DBGPRINTF("omelasticsearch: error found in elasticsearch reply\n");
c17bfd
 		ABORT_FINALIZE(RS_RET_DATAFAIL);
c17bfd
 	}
c17bfd
 
c17bfd
-	finalize_it:
c17bfd
-		RETiRet;
c17bfd
+finalize_it:
c17bfd
+	fjson_object_put(ctx.errRoot);
c17bfd
+	if (ctx.jTokener)
c17bfd
+		json_tokener_free(ctx.jTokener);
c17bfd
+	RETiRet;
c17bfd
 }
c17bfd
 
c17bfd
 
c17bfd
@@ -1118,7 +1378,7 @@ checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg)
c17bfd
 	}
c17bfd
 
c17bfd
 	if(pWrkrData->pData->bulkmode) {
c17bfd
-		iRet = checkResultBulkmode(pWrkrData, root);
c17bfd
+		iRet = checkResultBulkmode(pWrkrData, root, reqmsg);
c17bfd
 	} else {
c17bfd
 		if(fjson_object_object_get_ex(root, "status", &status)) {
c17bfd
 			iRet = RS_RET_DATAFAIL;
c17bfd
@@ -1397,6 +1657,13 @@ setInstParamDefaults(instanceData *pData)
c17bfd
 	pData->caCertFile = NULL;
c17bfd
 	pData->myCertFile = NULL;
c17bfd
 	pData->myPrivKeyFile = NULL;
c17bfd
+	pData->writeOperation = ES_WRITE_INDEX;
c17bfd
+	pData->retryFailures = 0;
c17bfd
+	pData->ratelimitBurst = 20000;
c17bfd
+	pData->ratelimitInterval = 600;
c17bfd
+	pData->ratelimiter = NULL;
c17bfd
+	pData->retryRulesetName = NULL;
c17bfd
+	pData->retryRuleset = NULL;
c17bfd
 }
c17bfd
 
c17bfd
 BEGINnewActInst
c17bfd
@@ -1495,6 +1762,27 @@ CODESTARTnewActInst
c17bfd
 			} else {
c17bfd
 				fclose(fp);
c17bfd
 			}
c17bfd
+		} else if(!strcmp(actpblk.descr[i].name, "writeoperation")) {
c17bfd
+			char *writeop = es_str2cstr(pvals[i].val.d.estr, NULL);
c17bfd
+			if (writeop && !strcmp(writeop, "create")) {
c17bfd
+				pData->writeOperation = ES_WRITE_CREATE;
c17bfd
+			} else if (writeop && !strcmp(writeop, "index")) {
c17bfd
+				pData->writeOperation = ES_WRITE_INDEX;
c17bfd
+			} else if (writeop) {
c17bfd
+				errmsg.LogError(0, RS_RET_CONFIG_ERROR,
c17bfd
+					"omelasticsearch: invalid value '%s' for writeoperation: "
c17bfd
+					"must be one of 'index' or 'create' - using default value 'index'", writeop);
c17bfd
+				pData->writeOperation = ES_WRITE_INDEX;
c17bfd
+			}
c17bfd
+			free(writeop);
c17bfd
+		} else if(!strcmp(actpblk.descr[i].name, "retryfailures")) {
c17bfd
+			pData->retryFailures = pvals[i].val.d.n;
c17bfd
+		} else if(!strcmp(actpblk.descr[i].name, "ratelimit.burst")) {
c17bfd
+			pData->ratelimitBurst = (int) pvals[i].val.d.n;
c17bfd
+		} else if(!strcmp(actpblk.descr[i].name, "ratelimit.interval")) {
c17bfd
+			pData->ratelimitInterval = (int) pvals[i].val.d.n;
c17bfd
+		} else if(!strcmp(actpblk.descr[i].name, "retryruleset")) {
c17bfd
+			pData->retryRulesetName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
c17bfd
 		} else {
c17bfd
 			dbgprintf("omelasticsearch: program error, non-handled "
c17bfd
 			  "param '%s'\n", actpblk.descr[i].name);
c17bfd
@@ -1661,6 +1949,27 @@ CODESTARTnewActInst
c17bfd
 		pData->searchIndex = (uchar*) strdup("system");
c17bfd
 	if(pData->searchType == NULL)
c17bfd
 		pData->searchType = (uchar*) strdup("events");
c17bfd
+
c17bfd
+	if ((pData->writeOperation != ES_WRITE_INDEX) && (pData->bulkId == NULL)) {
c17bfd
+		errmsg.LogError(0, RS_RET_CONFIG_ERROR,
c17bfd
+			"omelasticsearch: writeoperation '%d' requires bulkid", pData->writeOperation);
c17bfd
+		ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
c17bfd
+	}
c17bfd
+
c17bfd
+	if (pData->retryFailures) {
c17bfd
+		CHKiRet(ratelimitNew(&pData->ratelimiter, "omelasticsearch", NULL));
c17bfd
+		ratelimitSetLinuxLike(pData->ratelimiter, pData->ratelimitInterval, pData->ratelimitBurst);
c17bfd
+		ratelimitSetNoTimeCache(pData->ratelimiter);
c17bfd
+	}
c17bfd
+
c17bfd
+	/* node created, let's add to list of instance configs for the module */
c17bfd
+	if(loadModConf->tail == NULL) {
c17bfd
+		loadModConf->tail = loadModConf->root = pData;
c17bfd
+	} else {
c17bfd
+		loadModConf->tail->next = pData;
c17bfd
+		loadModConf->tail = pData;
c17bfd
+	}
c17bfd
+
c17bfd
 CODE_STD_FINALIZERnewActInst
c17bfd
 	cnfparamvalsDestruct(pvals, &actpblk);
c17bfd
 	if (serverParam)
c17bfd
@@ -1680,6 +1989,51 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
c17bfd
 CODE_STD_FINALIZERparseSelectorAct
c17bfd
 ENDparseSelectorAct
c17bfd
 
c17bfd
+
c17bfd
+BEGINbeginCnfLoad
c17bfd
+CODESTARTbeginCnfLoad
c17bfd
+	loadModConf = pModConf;
c17bfd
+	pModConf->pConf = pConf;
c17bfd
+	pModConf->root = pModConf->tail = NULL;
c17bfd
+ENDbeginCnfLoad
c17bfd
+
c17bfd
+
c17bfd
+BEGINendCnfLoad
c17bfd
+CODESTARTendCnfLoad
c17bfd
+	loadModConf = NULL; /* done loading */
c17bfd
+ENDendCnfLoad
c17bfd
+
c17bfd
+
c17bfd
+BEGINcheckCnf
c17bfd
+	instanceConf_t *inst;
c17bfd
+CODESTARTcheckCnf
c17bfd
+	for(inst = pModConf->root ; inst != NULL ; inst = inst->next) {
c17bfd
+		ruleset_t *pRuleset;
c17bfd
+		rsRetVal localRet;
c17bfd
+
c17bfd
+		if (inst->retryRulesetName) {
c17bfd
+			localRet = ruleset.GetRuleset(pModConf->pConf, &pRuleset, inst->retryRulesetName);
c17bfd
+			if(localRet == RS_RET_NOT_FOUND) {
c17bfd
+				errmsg.LogError(0, localRet, "omelasticsearch: retryruleset '%s' not found - "
c17bfd
+						"no retry ruleset will be used", inst->retryRulesetName);
c17bfd
+			} else {
c17bfd
+				inst->retryRuleset = pRuleset;
c17bfd
+			}
c17bfd
+		}
c17bfd
+	}
c17bfd
+ENDcheckCnf
c17bfd
+
c17bfd
+
c17bfd
+BEGINactivateCnf
c17bfd
+CODESTARTactivateCnf
c17bfd
+ENDactivateCnf
c17bfd
+
c17bfd
+
c17bfd
+BEGINfreeCnf
c17bfd
+CODESTARTfreeCnf
c17bfd
+ENDfreeCnf
c17bfd
+
c17bfd
+
c17bfd
 BEGINdoHUP
c17bfd
 CODESTARTdoHUP
c17bfd
 	if(pData->fdErrFile != -1) {
c17bfd
@@ -1691,10 +2045,14 @@ ENDdoHUP
c17bfd
 
c17bfd
 BEGINmodExit
c17bfd
 CODESTARTmodExit
c17bfd
+	if(pInputName != NULL)
c17bfd
+		prop.Destruct(&pInputName);
c17bfd
 	curl_global_cleanup();
c17bfd
 	statsobj.Destruct(&indexStats);
c17bfd
 	objRelease(errmsg, CORE_COMPONENT);
c17bfd
-        objRelease(statsobj, CORE_COMPONENT);
c17bfd
+	objRelease(statsobj, CORE_COMPONENT);
c17bfd
+	objRelease(prop, CORE_COMPONENT);
c17bfd
+	objRelease(ruleset, CORE_COMPONENT);
c17bfd
 ENDmodExit
c17bfd
 
c17bfd
 BEGINqueryEtryPt
c17bfd
@@ -1705,6 +2063,7 @@ CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
c17bfd
 CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
c17bfd
 CODEqueryEtryPt_doHUP
c17bfd
 CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
c17bfd
+CODEqueryEtryPt_STD_CONF2_QUERIES
c17bfd
 ENDqueryEtryPt
c17bfd
 
c17bfd
 
c17bfd
@@ -1714,6 +2073,8 @@ CODESTARTmodInit
c17bfd
 CODEmodInit_QueryRegCFSLineHdlr
c17bfd
 	CHKiRet(objUse(errmsg, CORE_COMPONENT));
c17bfd
 	CHKiRet(objUse(statsobj, CORE_COMPONENT));
c17bfd
+	CHKiRet(objUse(prop, CORE_COMPONENT));
c17bfd
+	CHKiRet(objUse(ruleset, CORE_COMPONENT));
c17bfd
 
c17bfd
 	if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
c17bfd
 		errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled");
c17bfd
@@ -1739,7 +2100,28 @@ CODEmodInit_QueryRegCFSLineHdlr
c17bfd
 	STATSCOUNTER_INIT(indexESFail, mutIndexESFail);
c17bfd
 	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.es",
c17bfd
 		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexESFail));
c17bfd
+	STATSCOUNTER_INIT(indexSuccess, mutIndexSuccess);
c17bfd
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.success",
c17bfd
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexSuccess));
c17bfd
+	STATSCOUNTER_INIT(indexBadResponse, mutIndexBadResponse);
c17bfd
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.bad",
c17bfd
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBadResponse));
c17bfd
+	STATSCOUNTER_INIT(indexDuplicate, mutIndexDuplicate);
c17bfd
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.duplicate",
c17bfd
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexDuplicate));
c17bfd
+	STATSCOUNTER_INIT(indexBadArgument, mutIndexBadArgument);
c17bfd
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.badargument",
c17bfd
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBadArgument));
c17bfd
+	STATSCOUNTER_INIT(indexBulkRejection, mutIndexBulkRejection);
c17bfd
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.bulkrejection",
c17bfd
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBulkRejection));
c17bfd
+	STATSCOUNTER_INIT(indexOtherResponse, mutIndexOtherResponse);
c17bfd
+	CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.other",
c17bfd
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexOtherResponse));
c17bfd
 	CHKiRet(statsobj.ConstructFinalize(indexStats));
c17bfd
+	CHKiRet(prop.Construct(&pInputName));
c17bfd
+	CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("omelasticsearch"), sizeof("omelasticsearch") - 1));
c17bfd
+	CHKiRet(prop.ConstructFinalize(pInputName));
c17bfd
 ENDmodInit
c17bfd
 
c17bfd
 /* vi:set ai: