|
|
fde8c1 |
From 989be897340eb458b00efedfd5e082bb362db79a Mon Sep 17 00:00:00 2001
|
|
|
fde8c1 |
From: Rich Megginson <rmeggins@redhat.com>
|
|
|
fde8c1 |
Date: Tue, 15 May 2018 16:03:25 -0600
|
|
|
fde8c1 |
Subject: [PATCH 11/11] omelasticsearch: write op types; bulk rejection retries
|
|
|
fde8c1 |
|
|
|
fde8c1 |
Add support for a 'create' write operation type in addition to
|
|
|
fde8c1 |
the default 'index'. Using create allows specifying a unique id
|
|
|
fde8c1 |
for each record, and allows duplicate document detection.
|
|
|
fde8c1 |
|
|
|
fde8c1 |
Add support for checking each record returned in a bulk index
|
|
|
fde8c1 |
request response. Allow specifying a ruleset to send each failed
|
|
|
fde8c1 |
record to. Add a local variable `omes` which contains the
|
|
|
fde8c1 |
information in the error response, so that users can control how
|
|
|
fde8c1 |
to handle responses e.g. retry, or send to an error file.
|
|
|
fde8c1 |
|
|
|
fde8c1 |
Add support for response stats - count successes, duplicates, and
|
|
|
fde8c1 |
different types of failures.
|
|
|
fde8c1 |
|
|
|
fde8c1 |
Add testing for bulk index rejections.
|
|
|
fde8c1 |
|
|
|
fde8c1 |
(cherry picked from commit 57dd368a2a915d79c94a8dc0de30c93a0bbdc8fe)
|
|
|
fde8c1 |
(cherry picked from commit 30a15621e1e7e393b2153e9fe5c13f724dea25b5)
|
|
|
fde8c1 |
---
|
|
|
fde8c1 |
plugins/omelasticsearch/omelasticsearch.c | 441 ++++++++++++++++++++++++++++--
|
|
|
fde8c1 |
1 file changed, 415 insertions(+), 26 deletions(-)
|
|
|
fde8c1 |
|
|
|
fde8c1 |
diff --git a/plugins/omelasticsearch/omelasticsearch.c b/plugins/omelasticsearch/omelasticsearch.c
|
|
|
fde8c1 |
index ed2b47535..ca61ae28f 100644
|
|
|
fde8c1 |
--- a/plugins/omelasticsearch/omelasticsearch.c
|
|
|
fde8c1 |
+++ b/plugins/omelasticsearch/omelasticsearch.c
|
|
|
fde8c1 |
@@ -51,6 +51,8 @@
|
|
|
fde8c1 |
#include "statsobj.h"
|
|
|
fde8c1 |
#include "cfsysline.h"
|
|
|
fde8c1 |
#include "unicode-helper.h"
|
|
|
fde8c1 |
+#include "ratelimit.h"
|
|
|
fde8c1 |
+#include "ruleset.h"
|
|
|
fde8c1 |
|
|
|
fde8c1 |
#ifndef O_LARGEFILE
|
|
|
fde8c1 |
# define O_LARGEFILE 0
|
|
|
fde8c1 |
@@ -64,6 +66,8 @@ MODULE_CNFNAME("omelasticsearch")
|
|
|
fde8c1 |
DEF_OMOD_STATIC_DATA
|
|
|
fde8c1 |
DEFobjCurrIf(errmsg)
|
|
|
fde8c1 |
DEFobjCurrIf(statsobj)
|
|
|
fde8c1 |
+DEFobjCurrIf(prop)
|
|
|
fde8c1 |
+DEFobjCurrIf(ruleset)
|
|
|
fde8c1 |
|
|
|
fde8c1 |
statsobj_t *indexStats;
|
|
|
fde8c1 |
STATSCOUNTER_DEF(indexSubmit, mutIndexSubmit)
|
|
|
fde8c1 |
@@ -71,19 +75,35 @@ STATSCOUNTER_DEF(indexHTTPFail, mutIndexHTTPFail)
|
|
|
fde8c1 |
STATSCOUNTER_DEF(indexHTTPReqFail, mutIndexHTTPReqFail)
|
|
|
fde8c1 |
STATSCOUNTER_DEF(checkConnFail, mutCheckConnFail)
|
|
|
fde8c1 |
STATSCOUNTER_DEF(indexESFail, mutIndexESFail)
|
|
|
fde8c1 |
+STATSCOUNTER_DEF(indexSuccess, mutIndexSuccess)
|
|
|
fde8c1 |
+STATSCOUNTER_DEF(indexBadResponse, mutIndexBadResponse)
|
|
|
fde8c1 |
+STATSCOUNTER_DEF(indexDuplicate, mutIndexDuplicate)
|
|
|
fde8c1 |
+STATSCOUNTER_DEF(indexBadArgument, mutIndexBadArgument)
|
|
|
fde8c1 |
+STATSCOUNTER_DEF(indexBulkRejection, mutIndexBulkRejection)
|
|
|
fde8c1 |
+STATSCOUNTER_DEF(indexOtherResponse, mutIndexOtherResponse)
|
|
|
fde8c1 |
|
|
|
fde8c1 |
+static prop_t *pInputName = NULL;
|
|
|
fde8c1 |
|
|
|
fde8c1 |
# define META_STRT "{\"index\":{\"_index\": \""
|
|
|
fde8c1 |
+# define META_STRT_CREATE "{\"create\":{\"_index\": \""
|
|
|
fde8c1 |
# define META_TYPE "\",\"_type\":\""
|
|
|
fde8c1 |
# define META_PARENT "\",\"_parent\":\""
|
|
|
fde8c1 |
# define META_ID "\", \"_id\":\""
|
|
|
fde8c1 |
# define META_END "\"}}\n"
|
|
|
fde8c1 |
|
|
|
fde8c1 |
+typedef enum {
|
|
|
fde8c1 |
+ ES_WRITE_INDEX,
|
|
|
fde8c1 |
+ ES_WRITE_CREATE,
|
|
|
fde8c1 |
+ ES_WRITE_UPDATE, /* not supported */
|
|
|
fde8c1 |
+ ES_WRITE_UPSERT /* not supported */
|
|
|
fde8c1 |
+} es_write_ops_t;
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
/* REST API for elasticsearch hits this URL:
|
|
|
fde8c1 |
* http://<hostName>:<restPort>/<searchIndex>/<searchType>
|
|
|
fde8c1 |
*/
|
|
|
fde8c1 |
+/* bulk API uses /_bulk */
|
|
|
fde8c1 |
typedef struct curl_slist HEADER;
|
|
|
fde8c1 |
-typedef struct _instanceData {
|
|
|
fde8c1 |
+typedef struct instanceConf_s {
|
|
|
fde8c1 |
int defaultPort;
|
|
|
fde8c1 |
int fdErrFile; /* error file fd or -1 if not open */
|
|
|
fde8c1 |
pthread_mutex_t mutErrFile;
|
|
|
fde8c1 |
@@ -113,8 +133,25 @@ typedef struct _instanceData {
|
|
|
fde8c1 |
uchar *caCertFile;
|
|
|
fde8c1 |
uchar *myCertFile;
|
|
|
fde8c1 |
uchar *myPrivKeyFile;
|
|
|
fde8c1 |
+ es_write_ops_t writeOperation;
|
|
|
fde8c1 |
+ sbool retryFailures;
|
|
|
fde8c1 |
+ int ratelimitInterval;
|
|
|
fde8c1 |
+ int ratelimitBurst;
|
|
|
fde8c1 |
+ /* for retries */
|
|
|
fde8c1 |
+ ratelimit_t *ratelimiter;
|
|
|
fde8c1 |
+ uchar *retryRulesetName;
|
|
|
fde8c1 |
+ ruleset_t *retryRuleset;
|
|
|
fde8c1 |
+ struct instanceConf_s *next;
|
|
|
fde8c1 |
} instanceData;
|
|
|
fde8c1 |
|
|
|
fde8c1 |
+typedef instanceConf_t instanceData;
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+struct modConfData_s {
|
|
|
fde8c1 |
+ rsconf_t *pConf; /* our overall config object */
|
|
|
fde8c1 |
+ instanceConf_t *root, *tail;
|
|
|
fde8c1 |
+};
|
|
|
fde8c1 |
+static modConfData_t *loadModConf = NULL; /* modConf ptr to use for the current load process */
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
typedef struct wrkrInstanceData {
|
|
|
fde8c1 |
instanceData *pData;
|
|
|
fde8c1 |
int serverIndex;
|
|
|
fde8c1 |
@@ -160,7 +197,12 @@ static struct cnfparamdescr actpdescr[] = {
|
|
|
fde8c1 |
{ "allowunsignedcerts", eCmdHdlrBinary, 0 },
|
|
|
fde8c1 |
{ "tls.cacert", eCmdHdlrString, 0 },
|
|
|
fde8c1 |
{ "tls.mycert", eCmdHdlrString, 0 },
|
|
|
fde8c1 |
- { "tls.myprivkey", eCmdHdlrString, 0 }
|
|
|
fde8c1 |
+ { "tls.myprivkey", eCmdHdlrString, 0 },
|
|
|
fde8c1 |
+ { "writeoperation", eCmdHdlrGetWord, 0 },
|
|
|
fde8c1 |
+ { "retryfailures", eCmdHdlrBinary, 0 },
|
|
|
fde8c1 |
+ { "ratelimit.interval", eCmdHdlrInt, 0 },
|
|
|
fde8c1 |
+ { "ratelimit.burst", eCmdHdlrInt, 0 },
|
|
|
fde8c1 |
+ { "retryruleset", eCmdHdlrString, 0 }
|
|
|
fde8c1 |
};
|
|
|
fde8c1 |
static struct cnfparamblk actpblk =
|
|
|
fde8c1 |
{ CNFPARAMBLK_VERSION,
|
|
|
fde8c1 |
@@ -177,6 +219,9 @@ CODESTARTcreateInstance
|
|
|
fde8c1 |
pData->caCertFile = NULL;
|
|
|
fde8c1 |
pData->myCertFile = NULL;
|
|
|
fde8c1 |
pData->myPrivKeyFile = NULL;
|
|
|
fde8c1 |
+ pData->ratelimiter = NULL;
|
|
|
fde8c1 |
+ pData->retryRulesetName = NULL;
|
|
|
fde8c1 |
+ pData->retryRuleset = NULL;
|
|
|
fde8c1 |
ENDcreateInstance
|
|
|
fde8c1 |
|
|
|
fde8c1 |
BEGINcreateWrkrInstance
|
|
|
fde8c1 |
@@ -228,6 +273,9 @@ CODESTARTfreeInstance
|
|
|
fde8c1 |
free(pData->caCertFile);
|
|
|
fde8c1 |
free(pData->myCertFile);
|
|
|
fde8c1 |
free(pData->myPrivKeyFile);
|
|
|
fde8c1 |
+ free(pData->retryRulesetName);
|
|
|
fde8c1 |
+ if (pData->ratelimiter != NULL)
|
|
|
fde8c1 |
+ ratelimitDestruct(pData->ratelimiter);
|
|
|
fde8c1 |
ENDfreeInstance
|
|
|
fde8c1 |
|
|
|
fde8c1 |
BEGINfreeWrkrInstance
|
|
|
fde8c1 |
@@ -285,6 +333,10 @@ CODESTARTdbgPrintInstInfo
|
|
|
fde8c1 |
dbgprintf("\ttls.cacert='%s'\n", pData->caCertFile);
|
|
|
fde8c1 |
dbgprintf("\ttls.mycert='%s'\n", pData->myCertFile);
|
|
|
fde8c1 |
dbgprintf("\ttls.myprivkey='%s'\n", pData->myPrivKeyFile);
|
|
|
fde8c1 |
+ dbgprintf("\twriteoperation='%d'\n", pData->writeOperation);
|
|
|
fde8c1 |
+ dbgprintf("\tretryfailures='%d'\n", pData->retryFailures);
|
|
|
fde8c1 |
+ dbgprintf("\tratelimit.interval='%d'\n", pData->ratelimitInterval);
|
|
|
fde8c1 |
+ dbgprintf("\tratelimit.burst='%d'\n", pData->ratelimitBurst);
|
|
|
fde8c1 |
ENDdbgPrintInstInfo
|
|
|
fde8c1 |
|
|
|
fde8c1 |
|
|
|
fde8c1 |
@@ -557,7 +609,11 @@ finalize_it:
|
|
|
fde8c1 |
static size_t
|
|
|
fde8c1 |
computeMessageSize(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
|
|
|
fde8c1 |
{
|
|
|
fde8c1 |
- size_t r = sizeof(META_STRT)-1 + sizeof(META_TYPE)-1 + sizeof(META_END)-1 + sizeof("\n")-1;
|
|
|
fde8c1 |
+ size_t r = sizeof(META_TYPE)-1 + sizeof(META_END)-1 + sizeof("\n")-1;
|
|
|
fde8c1 |
+ if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
|
|
|
fde8c1 |
+ r += sizeof(META_STRT_CREATE)-1;
|
|
|
fde8c1 |
+ else
|
|
|
fde8c1 |
+ r += sizeof(META_STRT)-1;
|
|
|
fde8c1 |
|
|
|
fde8c1 |
uchar *searchIndex = 0;
|
|
|
fde8c1 |
uchar *searchType;
|
|
|
fde8c1 |
@@ -594,7 +650,10 @@ buildBatch(wrkrInstanceData_t *pWrkrData, uchar *message, uchar **tpls)
|
|
|
fde8c1 |
DEFiRet;
|
|
|
fde8c1 |
|
|
|
fde8c1 |
getIndexTypeAndParent(pWrkrData->pData, tpls, &searchIndex, &searchType, &parent, &bulkId);
|
|
|
fde8c1 |
- r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1);
|
|
|
fde8c1 |
+ if (pWrkrData->pData->writeOperation == ES_WRITE_CREATE)
|
|
|
fde8c1 |
+ r = es_addBuf(&pWrkrData->batch.data, META_STRT_CREATE, sizeof(META_STRT_CREATE)-1);
|
|
|
fde8c1 |
+ else
|
|
|
fde8c1 |
+ r = es_addBuf(&pWrkrData->batch.data, META_STRT, sizeof(META_STRT)-1);
|
|
|
fde8c1 |
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, (char*)searchIndex,
|
|
|
fde8c1 |
ustrlen(searchIndex));
|
|
|
fde8c1 |
if(r == 0) r = es_addBuf(&pWrkrData->batch.data, META_TYPE, sizeof(META_TYPE)-1);
|
|
|
fde8c1 |
@@ -709,13 +768,20 @@ static int checkReplyStatus(fjson_object* ok) {
|
|
|
fde8c1 |
|
|
|
fde8c1 |
/*
|
|
|
fde8c1 |
* Context object for error file content creation or status check
|
|
|
fde8c1 |
+ * response_item - the full {"create":{"_index":"idxname",.....}}
|
|
|
fde8c1 |
+ * response_body - the inner hash of the response_item - {"_index":"idxname",...}
|
|
|
fde8c1 |
+ * status - the "status" field from the inner hash - "status":500
|
|
|
fde8c1 |
+ * should be able to use fjson_object_get_int(status) to get the http result code
|
|
|
fde8c1 |
*/
|
|
|
fde8c1 |
typedef struct exeContext{
|
|
|
fde8c1 |
int statusCheckOnly;
|
|
|
fde8c1 |
fjson_object *errRoot;
|
|
|
fde8c1 |
- rsRetVal (*prepareErrorFileContent)(struct exeContext *ctx,int itemStatus,char *request,char *response);
|
|
|
fde8c1 |
-
|
|
|
fde8c1 |
-
|
|
|
fde8c1 |
+ rsRetVal (*prepareErrorFileContent)(struct exeContext *ctx,int itemStatus,char *request,char *response,
|
|
|
fde8c1 |
+ fjson_object *response_item, fjson_object *response_body, fjson_object *status);
|
|
|
fde8c1 |
+ es_write_ops_t writeOperation;
|
|
|
fde8c1 |
+ ratelimit_t *ratelimiter;
|
|
|
fde8c1 |
+ ruleset_t *retryRuleset;
|
|
|
fde8c1 |
+ struct json_tokener *jTokener;
|
|
|
fde8c1 |
} context;
|
|
|
fde8c1 |
|
|
|
fde8c1 |
/*
|
|
|
fde8c1 |
@@ -728,8 +794,15 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
|
|
|
fde8c1 |
fjson_object *replyRoot = *pReplyRoot;
|
|
|
fde8c1 |
int i;
|
|
|
fde8c1 |
int numitems;
|
|
|
fde8c1 |
- fjson_object *items=NULL;
|
|
|
fde8c1 |
+ fjson_object *items=NULL, *jo_errors = NULL;
|
|
|
fde8c1 |
+ int errors = 0;
|
|
|
fde8c1 |
|
|
|
fde8c1 |
+ if(fjson_object_object_get_ex(replyRoot, "errors", &jo_errors)) {
|
|
|
fde8c1 |
+ errors = fjson_object_get_boolean(jo_errors);
|
|
|
fde8c1 |
+ if (!errors && pWrkrData->pData->retryFailures) {
|
|
|
fde8c1 |
+ return RS_RET_OK;
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
|
|
|
fde8c1 |
/*iterate over items*/
|
|
|
fde8c1 |
if(!fjson_object_object_get_ex(replyRoot, "items", &items)) {
|
|
|
fde8c1 |
@@ -741,7 +814,11 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
|
|
|
fde8c1 |
|
|
|
fde8c1 |
numitems = fjson_object_array_length(items);
|
|
|
fde8c1 |
|
|
|
fde8c1 |
- DBGPRINTF("omelasticsearch: Entire request %s\n",reqmsg);
|
|
|
fde8c1 |
+ if (reqmsg) {
|
|
|
fde8c1 |
+ DBGPRINTF("omelasticsearch: Entire request %s\n", reqmsg);
|
|
|
fde8c1 |
+ } else {
|
|
|
fde8c1 |
+ DBGPRINTF("omelasticsearch: Empty request\n");
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
const char *lastReqRead= (char*)reqmsg;
|
|
|
fde8c1 |
|
|
|
fde8c1 |
DBGPRINTF("omelasticsearch: %d items in reply\n", numitems);
|
|
|
fde8c1 |
@@ -769,8 +846,7 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
|
|
|
fde8c1 |
|
|
|
fde8c1 |
char *request =0;
|
|
|
fde8c1 |
char *response =0;
|
|
|
fde8c1 |
- if(ctx->statusCheckOnly)
|
|
|
fde8c1 |
- {
|
|
|
fde8c1 |
+ if(ctx->statusCheckOnly || (NULL == lastReqRead)) {
|
|
|
fde8c1 |
if(itemStatus) {
|
|
|
fde8c1 |
DBGPRINTF("omelasticsearch: error in elasticsearch reply: item %d, status is %d\n", i, fjson_object_get_int(ok));
|
|
|
fde8c1 |
DBGPRINTF("omelasticsearch: status check found error.\n");
|
|
|
fde8c1 |
@@ -795,7 +871,8 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
|
|
|
fde8c1 |
}
|
|
|
fde8c1 |
|
|
|
fde8c1 |
/*call the context*/
|
|
|
fde8c1 |
- rsRetVal ret = ctx->prepareErrorFileContent(ctx, itemStatus, request,response);
|
|
|
fde8c1 |
+ rsRetVal ret = ctx->prepareErrorFileContent(ctx, itemStatus, request,
|
|
|
fde8c1 |
+ response, item, result, ok);
|
|
|
fde8c1 |
|
|
|
fde8c1 |
/*free memory in any case*/
|
|
|
fde8c1 |
free(request);
|
|
|
fde8c1 |
@@ -818,11 +895,14 @@ parseRequestAndResponseForContext(wrkrInstanceData_t *pWrkrData,fjson_object **p
|
|
|
fde8c1 |
* Dumps only failed requests of bulk insert
|
|
|
fde8c1 |
*/
|
|
|
fde8c1 |
static rsRetVal
|
|
|
fde8c1 |
-getDataErrorOnly(context *ctx,int itemStatus,char *request,char *response)
|
|
|
fde8c1 |
+getDataErrorOnly(context *ctx,int itemStatus,char *request,char *response,
|
|
|
fde8c1 |
+ fjson_object *response_item, fjson_object *response_body, fjson_object *status)
|
|
|
fde8c1 |
{
|
|
|
fde8c1 |
DEFiRet;
|
|
|
fde8c1 |
- if(itemStatus)
|
|
|
fde8c1 |
- {
|
|
|
fde8c1 |
+ (void)response_item; /* unused */
|
|
|
fde8c1 |
+ (void)response_body; /* unused */
|
|
|
fde8c1 |
+ (void)status; /* unused */
|
|
|
fde8c1 |
+ if(itemStatus) {
|
|
|
fde8c1 |
fjson_object *onlyErrorResponses =NULL;
|
|
|
fde8c1 |
fjson_object *onlyErrorRequests=NULL;
|
|
|
fde8c1 |
|
|
|
fde8c1 |
@@ -855,9 +935,16 @@ static rsRetVal
|
|
|
fde8c1 |
getDataInterleaved(context *ctx,
|
|
|
fde8c1 |
int __attribute__((unused)) itemStatus,
|
|
|
fde8c1 |
char *request,
|
|
|
fde8c1 |
- char *response)
|
|
|
fde8c1 |
+ char *response,
|
|
|
fde8c1 |
+ fjson_object *response_item,
|
|
|
fde8c1 |
+ fjson_object *response_body,
|
|
|
fde8c1 |
+ fjson_object *status
|
|
|
fde8c1 |
+)
|
|
|
fde8c1 |
{
|
|
|
fde8c1 |
DEFiRet;
|
|
|
fde8c1 |
+ (void)response_item; /* unused */
|
|
|
fde8c1 |
+ (void)response_body; /* unused */
|
|
|
fde8c1 |
+ (void)status; /* unused */
|
|
|
fde8c1 |
fjson_object *interleaved =NULL;
|
|
|
fde8c1 |
if(!fjson_object_object_get_ex(ctx->errRoot, "response", &interleaved))
|
|
|
fde8c1 |
{
|
|
|
fde8c1 |
@@ -889,11 +976,13 @@ getDataInterleaved(context *ctx,
|
|
|
fde8c1 |
*/
|
|
|
fde8c1 |
|
|
|
fde8c1 |
static rsRetVal
|
|
|
fde8c1 |
-getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *response)
|
|
|
fde8c1 |
+getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *response,
|
|
|
fde8c1 |
+ fjson_object *response_item, fjson_object *response_body, fjson_object *status)
|
|
|
fde8c1 |
{
|
|
|
fde8c1 |
DEFiRet;
|
|
|
fde8c1 |
if (itemStatus) {
|
|
|
fde8c1 |
- if(getDataInterleaved(ctx, itemStatus,request,response)!= RS_RET_OK) {
|
|
|
fde8c1 |
+ if(getDataInterleaved(ctx, itemStatus,request,response,
|
|
|
fde8c1 |
+ response_item, response_body, status)!= RS_RET_OK) {
|
|
|
fde8c1 |
ABORT_FINALIZE(RS_RET_ERR);
|
|
|
fde8c1 |
}
|
|
|
fde8c1 |
}
|
|
|
fde8c1 |
@@ -902,6 +991,141 @@ getDataErrorOnlyInterleaved(context *ctx,int itemStatus,char *request,char *resp
|
|
|
fde8c1 |
RETiRet;
|
|
|
fde8c1 |
}
|
|
|
fde8c1 |
|
|
|
fde8c1 |
+/* request string looks like this:
|
|
|
fde8c1 |
+ * "{\"create\":{\"_index\": \"rsyslog_testbench\",\"_type\":\"test-type\",
|
|
|
fde8c1 |
+ * \"_id\":\"FAEAFC0D17C847DA8BD6F47BC5B3800A\"}}\n
|
|
|
fde8c1 |
+ * {\"msgnum\":\"x00000000\",\"viaq_msg_id\":\"FAEAFC0D17C847DA8BD6F47BC5B3800A\"}\n"
|
|
|
fde8c1 |
+ * we don't want the meta header, only the data part
|
|
|
fde8c1 |
+ * start = first \n + 1
|
|
|
fde8c1 |
+ * end = last \n
|
|
|
fde8c1 |
+ */
|
|
|
fde8c1 |
+static rsRetVal
|
|
|
fde8c1 |
+createMsgFromRequest(const char *request, context *ctx, smsg_t **msg)
|
|
|
fde8c1 |
+{
|
|
|
fde8c1 |
+ DEFiRet;
|
|
|
fde8c1 |
+ fjson_object *jo_msg = NULL;
|
|
|
fde8c1 |
+ const char *datastart, *dataend;
|
|
|
fde8c1 |
+ size_t datalen;
|
|
|
fde8c1 |
+ enum json_tokener_error json_error;
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+ *msg = NULL;
|
|
|
fde8c1 |
+ if (!(datastart = strchr(request, '\n')) || (datastart[1] != '{')) {
|
|
|
fde8c1 |
+ LogError(0, RS_RET_ERR,
|
|
|
fde8c1 |
+ "omelasticsearch: malformed original request - "
|
|
|
fde8c1 |
+ "could not find start of original data [%s]",
|
|
|
fde8c1 |
+ request);
|
|
|
fde8c1 |
+ ABORT_FINALIZE(RS_RET_ERR);
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
+ datastart++; /* advance to { */
|
|
|
fde8c1 |
+ if (!(dataend = strchr(datastart, '\n')) || (dataend[1] != '\0')) {
|
|
|
fde8c1 |
+ LogError(0, RS_RET_ERR,
|
|
|
fde8c1 |
+ "omelasticsearch: malformed original request - "
|
|
|
fde8c1 |
+ "could not find end of original data [%s]",
|
|
|
fde8c1 |
+ request);
|
|
|
fde8c1 |
+ ABORT_FINALIZE(RS_RET_ERR);
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
+ datalen = dataend - datastart;
|
|
|
fde8c1 |
+ json_tokener_reset(ctx->jTokener);
|
|
|
fde8c1 |
+ fjson_object *jo_request = json_tokener_parse_ex(ctx->jTokener, datastart, datalen);
|
|
|
fde8c1 |
+ json_error = fjson_tokener_get_error(ctx->jTokener);
|
|
|
fde8c1 |
+ if (!jo_request || (json_error != fjson_tokener_success)) {
|
|
|
fde8c1 |
+ LogError(0, RS_RET_ERR,
|
|
|
fde8c1 |
+ "omelasticsearch: parse error [%s] - could not convert original "
|
|
|
fde8c1 |
+ "request JSON back into JSON object [%s]",
|
|
|
fde8c1 |
+ fjson_tokener_error_desc(json_error), request);
|
|
|
fde8c1 |
+ ABORT_FINALIZE(RS_RET_ERR);
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+ CHKiRet(msgConstruct(msg));
|
|
|
fde8c1 |
+ MsgSetFlowControlType(*msg, eFLOWCTL_FULL_DELAY);
|
|
|
fde8c1 |
+ MsgSetInputName(*msg, pInputName);
|
|
|
fde8c1 |
+ if (fjson_object_object_get_ex(jo_request, "message", &jo_msg)) {
|
|
|
fde8c1 |
+ const char *rawmsg = json_object_get_string(jo_msg);
|
|
|
fde8c1 |
+ const size_t msgLen = (size_t)json_object_get_string_len(jo_msg);
|
|
|
fde8c1 |
+ MsgSetRawMsg(*msg, rawmsg, msgLen);
|
|
|
fde8c1 |
+ } else {
|
|
|
fde8c1 |
+ MsgSetRawMsg(*msg, request, strlen(request));
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
+ MsgSetMSGoffs(*msg, 0); /* we do not have a header... */
|
|
|
fde8c1 |
+ CHKiRet(msgAddJSON(*msg, (uchar*)"!", jo_request, 0, 0));
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+ finalize_it:
|
|
|
fde8c1 |
+ RETiRet;
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+}
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+static rsRetVal
|
|
|
fde8c1 |
+getDataRetryFailures(context *ctx,int itemStatus,char *request,char *response,
|
|
|
fde8c1 |
+ fjson_object *response_item, fjson_object *response_body, fjson_object *status)
|
|
|
fde8c1 |
+{
|
|
|
fde8c1 |
+ DEFiRet;
|
|
|
fde8c1 |
+ fjson_object *omes = NULL, *jo = NULL;
|
|
|
fde8c1 |
+ int istatus = fjson_object_get_int(status);
|
|
|
fde8c1 |
+ int iscreateop = 0;
|
|
|
fde8c1 |
+ struct json_object_iterator it = json_object_iter_begin(response_item);
|
|
|
fde8c1 |
+ struct json_object_iterator itEnd = json_object_iter_end(response_item);
|
|
|
fde8c1 |
+ const char *optype = NULL;
|
|
|
fde8c1 |
+ smsg_t *msg = NULL;
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+ (void)response;
|
|
|
fde8c1 |
+ (void)itemStatus;
|
|
|
fde8c1 |
+ CHKiRet(createMsgFromRequest(request, ctx, &msg));
|
|
|
fde8c1 |
+ CHKmalloc(msg);
|
|
|
fde8c1 |
+ /* add status as local variables */
|
|
|
fde8c1 |
+ omes = json_object_new_object();
|
|
|
fde8c1 |
+ if (!json_object_iter_equal(&it, &itEnd))
|
|
|
fde8c1 |
+ optype = json_object_iter_peek_name(&it);
|
|
|
fde8c1 |
+ if (optype && !strcmp("create", optype))
|
|
|
fde8c1 |
+ iscreateop = 1;
|
|
|
fde8c1 |
+ if (optype && !strcmp("index", optype) && (ctx->writeOperation == ES_WRITE_INDEX))
|
|
|
fde8c1 |
+ iscreateop = 1;
|
|
|
fde8c1 |
+ if (optype) {
|
|
|
fde8c1 |
+ jo = json_object_new_string(optype);
|
|
|
fde8c1 |
+ } else {
|
|
|
fde8c1 |
+ jo = json_object_new_string("unknown");
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
+ json_object_object_add(omes, "writeoperation", jo);
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+ if (!optype) {
|
|
|
fde8c1 |
+ STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse);
|
|
|
fde8c1 |
+ } else if ((istatus == 200) || (istatus == 201)) {
|
|
|
fde8c1 |
+ STATSCOUNTER_INC(indexSuccess, mutIndexSuccess);
|
|
|
fde8c1 |
+ } else if ((istatus == 409) && iscreateop) {
|
|
|
fde8c1 |
+ STATSCOUNTER_INC(indexDuplicate, mutIndexDuplicate);
|
|
|
fde8c1 |
+ } else if (istatus == 400 || (istatus < 200)) {
|
|
|
fde8c1 |
+ STATSCOUNTER_INC(indexBadArgument, mutIndexBadArgument);
|
|
|
fde8c1 |
+ } else {
|
|
|
fde8c1 |
+ fjson_object *error = NULL, *errtype = NULL;
|
|
|
fde8c1 |
+ if(fjson_object_object_get_ex(response_body, "error", &error) &&
|
|
|
fde8c1 |
+ fjson_object_object_get_ex(error, "type", &errtype)) {
|
|
|
fde8c1 |
+ if (istatus == 429) {
|
|
|
fde8c1 |
+ STATSCOUNTER_INC(indexBulkRejection, mutIndexBulkRejection);
|
|
|
fde8c1 |
+ } else {
|
|
|
fde8c1 |
+ STATSCOUNTER_INC(indexOtherResponse, mutIndexOtherResponse);
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
+ } else {
|
|
|
fde8c1 |
+ STATSCOUNTER_INC(indexBadResponse, mutIndexBadResponse);
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
+ /* add response_body fields to local var omes */
|
|
|
fde8c1 |
+ it = json_object_iter_begin(response_body);
|
|
|
fde8c1 |
+ itEnd = json_object_iter_end(response_body);
|
|
|
fde8c1 |
+ while (!json_object_iter_equal(&it, &itEnd)) {
|
|
|
fde8c1 |
+ json_object_object_add(omes, json_object_iter_peek_name(&it),
|
|
|
fde8c1 |
+ json_object_get(json_object_iter_peek_value(&it)));
|
|
|
fde8c1 |
+ json_object_iter_next(&it);
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
+ CHKiRet(msgAddJSON(msg, (uchar*)".omes", omes, 0, 0));
|
|
|
fde8c1 |
+ omes = NULL;
|
|
|
fde8c1 |
+ MsgSetRuleset(msg, ctx->retryRuleset);
|
|
|
fde8c1 |
+ CHKiRet(ratelimitAddMsg(ctx->ratelimiter, NULL, msg));
|
|
|
fde8c1 |
+finalize_it:
|
|
|
fde8c1 |
+ if (omes)
|
|
|
fde8c1 |
+ json_object_put(omes);
|
|
|
fde8c1 |
+ RETiRet;
|
|
|
fde8c1 |
+}
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
/*
|
|
|
fde8c1 |
* get erroronly context
|
|
|
fde8c1 |
*/
|
|
|
fde8c1 |
@@ -979,6 +1203,23 @@ initializeErrorInterleavedConext(wrkrInstanceData_t *pWrkrData,context *ctx){
|
|
|
fde8c1 |
RETiRet;
|
|
|
fde8c1 |
}
|
|
|
fde8c1 |
|
|
|
fde8c1 |
+/*get retry failures context*/
|
|
|
fde8c1 |
+static rsRetVal
|
|
|
fde8c1 |
+initializeRetryFailuresContext(wrkrInstanceData_t *pWrkrData,context *ctx){
|
|
|
fde8c1 |
+ DEFiRet;
|
|
|
fde8c1 |
+ ctx->statusCheckOnly=0;
|
|
|
fde8c1 |
+ fjson_object *errRoot=NULL;
|
|
|
fde8c1 |
+ if((errRoot=fjson_object_new_object()) == NULL) ABORT_FINALIZE(RS_RET_ERR);
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+ fjson_object_object_add(errRoot, "url", fjson_object_new_string((char*)pWrkrData->restURL));
|
|
|
fde8c1 |
+ ctx->errRoot = errRoot;
|
|
|
fde8c1 |
+ ctx->prepareErrorFileContent= &getDataRetryFailures;
|
|
|
fde8c1 |
+ CHKmalloc(ctx->jTokener = json_tokener_new());
|
|
|
fde8c1 |
+ finalize_it:
|
|
|
fde8c1 |
+ RETiRet;
|
|
|
fde8c1 |
+}
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
|
|
|
fde8c1 |
/* write data error request/replies to separate error file
|
|
|
fde8c1 |
* Note: we open the file but never close it before exit. If it
|
|
|
fde8c1 |
@@ -994,6 +1235,10 @@ writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, fjson_object
|
|
|
fde8c1 |
char errStr[1024];
|
|
|
fde8c1 |
context ctx;
|
|
|
fde8c1 |
ctx.errRoot=0;
|
|
|
fde8c1 |
+ ctx.writeOperation = pWrkrData->pData->writeOperation;
|
|
|
fde8c1 |
+ ctx.ratelimiter = pWrkrData->pData->ratelimiter;
|
|
|
fde8c1 |
+ ctx.retryRuleset = pWrkrData->pData->retryRuleset;
|
|
|
fde8c1 |
+ ctx.jTokener = NULL;
|
|
|
fde8c1 |
DEFiRet;
|
|
|
fde8c1 |
|
|
|
fde8c1 |
if(pData->errorFile == NULL) {
|
|
|
fde8c1 |
@@ -1039,9 +1284,12 @@ writeDataError(wrkrInstanceData_t *pWrkrData, instanceData *pData, fjson_object
|
|
|
fde8c1 |
DBGPRINTF("omelasticsearch: error initializing error interleaved context.\n");
|
|
|
fde8c1 |
ABORT_FINALIZE(RS_RET_ERR);
|
|
|
fde8c1 |
}
|
|
|
fde8c1 |
- }
|
|
|
fde8c1 |
- else
|
|
|
fde8c1 |
- {
|
|
|
fde8c1 |
+ } else if(pData->retryFailures) {
|
|
|
fde8c1 |
+ if(initializeRetryFailuresContext(pWrkrData, &ctx) != RS_RET_OK) {
|
|
|
fde8c1 |
+ DBGPRINTF("omelasticsearch: error initializing retry failures context.\n");
|
|
|
fde8c1 |
+ ABORT_FINALIZE(RS_RET_ERR);
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
+ } else {
|
|
|
fde8c1 |
DBGPRINTF("omelasticsearch: None of the modes match file write. No data to write.\n");
|
|
|
fde8c1 |
ABORT_FINALIZE(RS_RET_ERR);
|
|
|
fde8c1 |
}
|
|
|
fde8c1 |
@@ -1082,25 +1330,38 @@ finalize_it:
|
|
|
fde8c1 |
if(bMutLocked)
|
|
|
fde8c1 |
pthread_mutex_unlock(&pData->mutErrFile);
|
|
|
fde8c1 |
fjson_object_put(ctx.errRoot);
|
|
|
fde8c1 |
+ if (ctx.jTokener)
|
|
|
fde8c1 |
+ json_tokener_free(ctx.jTokener);
|
|
|
fde8c1 |
+ free(rendered);
|
|
|
fde8c1 |
RETiRet;
|
|
|
fde8c1 |
}
|
|
|
fde8c1 |
|
|
|
fde8c1 |
|
|
|
fde8c1 |
static rsRetVal
|
|
|
fde8c1 |
-checkResultBulkmode(wrkrInstanceData_t *pWrkrData, fjson_object *root)
|
|
|
fde8c1 |
+checkResultBulkmode(wrkrInstanceData_t *pWrkrData, fjson_object *root, uchar *reqmsg)
|
|
|
fde8c1 |
{
|
|
|
fde8c1 |
DEFiRet;
|
|
|
fde8c1 |
context ctx;
|
|
|
fde8c1 |
- ctx.statusCheckOnly=1;
|
|
|
fde8c1 |
ctx.errRoot = 0;
|
|
|
fde8c1 |
- if(parseRequestAndResponseForContext(pWrkrData,&root,0,&ctx)!= RS_RET_OK)
|
|
|
fde8c1 |
- {
|
|
|
fde8c1 |
+ ctx.writeOperation = pWrkrData->pData->writeOperation;
|
|
|
fde8c1 |
+ ctx.ratelimiter = pWrkrData->pData->ratelimiter;
|
|
|
fde8c1 |
+ ctx.retryRuleset = pWrkrData->pData->retryRuleset;
|
|
|
fde8c1 |
+ ctx.statusCheckOnly=1;
|
|
|
fde8c1 |
+ ctx.jTokener = NULL;
|
|
|
fde8c1 |
+ if (pWrkrData->pData->retryFailures) {
|
|
|
fde8c1 |
+ ctx.statusCheckOnly=0;
|
|
|
fde8c1 |
+ CHKiRet(initializeRetryFailuresContext(pWrkrData, &ctx));
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
+ if(parseRequestAndResponseForContext(pWrkrData,&root,reqmsg,&ctx)!= RS_RET_OK) {
|
|
|
fde8c1 |
DBGPRINTF("omelasticsearch: error found in elasticsearch reply\n");
|
|
|
fde8c1 |
ABORT_FINALIZE(RS_RET_DATAFAIL);
|
|
|
fde8c1 |
}
|
|
|
fde8c1 |
|
|
|
fde8c1 |
- finalize_it:
|
|
|
fde8c1 |
- RETiRet;
|
|
|
fde8c1 |
+finalize_it:
|
|
|
fde8c1 |
+ fjson_object_put(ctx.errRoot);
|
|
|
fde8c1 |
+ if (ctx.jTokener)
|
|
|
fde8c1 |
+ json_tokener_free(ctx.jTokener);
|
|
|
fde8c1 |
+ RETiRet;
|
|
|
fde8c1 |
}
|
|
|
fde8c1 |
|
|
|
fde8c1 |
|
|
|
fde8c1 |
@@ -1118,7 +1378,7 @@ checkResult(wrkrInstanceData_t *pWrkrData, uchar *reqmsg)
|
|
|
fde8c1 |
}
|
|
|
fde8c1 |
|
|
|
fde8c1 |
if(pWrkrData->pData->bulkmode) {
|
|
|
fde8c1 |
- iRet = checkResultBulkmode(pWrkrData, root);
|
|
|
fde8c1 |
+ iRet = checkResultBulkmode(pWrkrData, root, reqmsg);
|
|
|
fde8c1 |
} else {
|
|
|
fde8c1 |
if(fjson_object_object_get_ex(root, "status", &status)) {
|
|
|
fde8c1 |
iRet = RS_RET_DATAFAIL;
|
|
|
fde8c1 |
@@ -1397,6 +1657,13 @@ setInstParamDefaults(instanceData *pData)
|
|
|
fde8c1 |
pData->caCertFile = NULL;
|
|
|
fde8c1 |
pData->myCertFile = NULL;
|
|
|
fde8c1 |
pData->myPrivKeyFile = NULL;
|
|
|
fde8c1 |
+ pData->writeOperation = ES_WRITE_INDEX;
|
|
|
fde8c1 |
+ pData->retryFailures = 0;
|
|
|
fde8c1 |
+ pData->ratelimitBurst = 20000;
|
|
|
fde8c1 |
+ pData->ratelimitInterval = 600;
|
|
|
fde8c1 |
+ pData->ratelimiter = NULL;
|
|
|
fde8c1 |
+ pData->retryRulesetName = NULL;
|
|
|
fde8c1 |
+ pData->retryRuleset = NULL;
|
|
|
fde8c1 |
}
|
|
|
fde8c1 |
|
|
|
fde8c1 |
BEGINnewActInst
|
|
|
fde8c1 |
@@ -1495,6 +1762,27 @@ CODESTARTnewActInst
|
|
|
fde8c1 |
} else {
|
|
|
fde8c1 |
fclose(fp);
|
|
|
fde8c1 |
}
|
|
|
fde8c1 |
+ } else if(!strcmp(actpblk.descr[i].name, "writeoperation")) {
|
|
|
fde8c1 |
+ char *writeop = es_str2cstr(pvals[i].val.d.estr, NULL);
|
|
|
fde8c1 |
+ if (writeop && !strcmp(writeop, "create")) {
|
|
|
fde8c1 |
+ pData->writeOperation = ES_WRITE_CREATE;
|
|
|
fde8c1 |
+ } else if (writeop && !strcmp(writeop, "index")) {
|
|
|
fde8c1 |
+ pData->writeOperation = ES_WRITE_INDEX;
|
|
|
fde8c1 |
+ } else if (writeop) {
|
|
|
fde8c1 |
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR,
|
|
|
fde8c1 |
+ "omelasticsearch: invalid value '%s' for writeoperation: "
|
|
|
fde8c1 |
+ "must be one of 'index' or 'create' - using default value 'index'", writeop);
|
|
|
fde8c1 |
+ pData->writeOperation = ES_WRITE_INDEX;
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
+ free(writeop);
|
|
|
fde8c1 |
+ } else if(!strcmp(actpblk.descr[i].name, "retryfailures")) {
|
|
|
fde8c1 |
+ pData->retryFailures = pvals[i].val.d.n;
|
|
|
fde8c1 |
+ } else if(!strcmp(actpblk.descr[i].name, "ratelimit.burst")) {
|
|
|
fde8c1 |
+ pData->ratelimitBurst = (int) pvals[i].val.d.n;
|
|
|
fde8c1 |
+ } else if(!strcmp(actpblk.descr[i].name, "ratelimit.interval")) {
|
|
|
fde8c1 |
+ pData->ratelimitInterval = (int) pvals[i].val.d.n;
|
|
|
fde8c1 |
+ } else if(!strcmp(actpblk.descr[i].name, "retryruleset")) {
|
|
|
fde8c1 |
+ pData->retryRulesetName = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
|
|
|
fde8c1 |
} else {
|
|
|
fde8c1 |
dbgprintf("omelasticsearch: program error, non-handled "
|
|
|
fde8c1 |
"param '%s'\n", actpblk.descr[i].name);
|
|
|
fde8c1 |
@@ -1661,6 +1949,27 @@ CODESTARTnewActInst
|
|
|
fde8c1 |
pData->searchIndex = (uchar*) strdup("system");
|
|
|
fde8c1 |
if(pData->searchType == NULL)
|
|
|
fde8c1 |
pData->searchType = (uchar*) strdup("events");
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+ if ((pData->writeOperation != ES_WRITE_INDEX) && (pData->bulkId == NULL)) {
|
|
|
fde8c1 |
+ errmsg.LogError(0, RS_RET_CONFIG_ERROR,
|
|
|
fde8c1 |
+ "omelasticsearch: writeoperation '%d' requires bulkid", pData->writeOperation);
|
|
|
fde8c1 |
+ ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+ if (pData->retryFailures) {
|
|
|
fde8c1 |
+ CHKiRet(ratelimitNew(&pData->ratelimiter, "omelasticsearch", NULL));
|
|
|
fde8c1 |
+ ratelimitSetLinuxLike(pData->ratelimiter, pData->ratelimitInterval, pData->ratelimitBurst);
|
|
|
fde8c1 |
+ ratelimitSetNoTimeCache(pData->ratelimiter);
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+ /* node created, let's add to list of instance configs for the module */
|
|
|
fde8c1 |
+ if(loadModConf->tail == NULL) {
|
|
|
fde8c1 |
+ loadModConf->tail = loadModConf->root = pData;
|
|
|
fde8c1 |
+ } else {
|
|
|
fde8c1 |
+ loadModConf->tail->next = pData;
|
|
|
fde8c1 |
+ loadModConf->tail = pData;
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
CODE_STD_FINALIZERnewActInst
|
|
|
fde8c1 |
cnfparamvalsDestruct(pvals, &actpblk);
|
|
|
fde8c1 |
if (serverParam)
|
|
|
fde8c1 |
@@ -1680,6 +1989,51 @@ CODE_STD_STRING_REQUESTparseSelectorAct(1)
|
|
|
fde8c1 |
CODE_STD_FINALIZERparseSelectorAct
|
|
|
fde8c1 |
ENDparseSelectorAct
|
|
|
fde8c1 |
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+BEGINbeginCnfLoad
|
|
|
fde8c1 |
+CODESTARTbeginCnfLoad
|
|
|
fde8c1 |
+ loadModConf = pModConf;
|
|
|
fde8c1 |
+ pModConf->pConf = pConf;
|
|
|
fde8c1 |
+ pModConf->root = pModConf->tail = NULL;
|
|
|
fde8c1 |
+ENDbeginCnfLoad
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+BEGINendCnfLoad
|
|
|
fde8c1 |
+CODESTARTendCnfLoad
|
|
|
fde8c1 |
+ loadModConf = NULL; /* done loading */
|
|
|
fde8c1 |
+ENDendCnfLoad
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+BEGINcheckCnf
|
|
|
fde8c1 |
+ instanceConf_t *inst;
|
|
|
fde8c1 |
+CODESTARTcheckCnf
|
|
|
fde8c1 |
+ for(inst = pModConf->root ; inst != NULL ; inst = inst->next) {
|
|
|
fde8c1 |
+ ruleset_t *pRuleset;
|
|
|
fde8c1 |
+ rsRetVal localRet;
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+ if (inst->retryRulesetName) {
|
|
|
fde8c1 |
+ localRet = ruleset.GetRuleset(pModConf->pConf, &pRuleset, inst->retryRulesetName);
|
|
|
fde8c1 |
+ if(localRet == RS_RET_NOT_FOUND) {
|
|
|
fde8c1 |
+ errmsg.LogError(0, localRet, "omelasticsearch: retryruleset '%s' not found - "
|
|
|
fde8c1 |
+ "no retry ruleset will be used", inst->retryRulesetName);
|
|
|
fde8c1 |
+ } else {
|
|
|
fde8c1 |
+ inst->retryRuleset = pRuleset;
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
+ }
|
|
|
fde8c1 |
+ENDcheckCnf
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+BEGINactivateCnf
|
|
|
fde8c1 |
+CODESTARTactivateCnf
|
|
|
fde8c1 |
+ENDactivateCnf
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+BEGINfreeCnf
|
|
|
fde8c1 |
+CODESTARTfreeCnf
|
|
|
fde8c1 |
+ENDfreeCnf
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
+
|
|
|
fde8c1 |
BEGINdoHUP
|
|
|
fde8c1 |
CODESTARTdoHUP
|
|
|
fde8c1 |
if(pData->fdErrFile != -1) {
|
|
|
fde8c1 |
@@ -1691,10 +2045,14 @@ ENDdoHUP
|
|
|
fde8c1 |
|
|
|
fde8c1 |
BEGINmodExit
|
|
|
fde8c1 |
CODESTARTmodExit
|
|
|
fde8c1 |
+ if(pInputName != NULL)
|
|
|
fde8c1 |
+ prop.Destruct(&pInputName);
|
|
|
fde8c1 |
curl_global_cleanup();
|
|
|
fde8c1 |
statsobj.Destruct(&indexStats);
|
|
|
fde8c1 |
objRelease(errmsg, CORE_COMPONENT);
|
|
|
fde8c1 |
- objRelease(statsobj, CORE_COMPONENT);
|
|
|
fde8c1 |
+ objRelease(statsobj, CORE_COMPONENT);
|
|
|
fde8c1 |
+ objRelease(prop, CORE_COMPONENT);
|
|
|
fde8c1 |
+ objRelease(ruleset, CORE_COMPONENT);
|
|
|
fde8c1 |
ENDmodExit
|
|
|
fde8c1 |
|
|
|
fde8c1 |
BEGINqueryEtryPt
|
|
|
fde8c1 |
@@ -1705,6 +2063,7 @@ CODEqueryEtryPt_IsCompatibleWithFeature_IF_OMOD_QUERIES
|
|
|
fde8c1 |
CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
|
|
|
fde8c1 |
CODEqueryEtryPt_doHUP
|
|
|
fde8c1 |
CODEqueryEtryPt_TXIF_OMOD_QUERIES /* we support the transactional interface! */
|
|
|
fde8c1 |
+CODEqueryEtryPt_STD_CONF2_QUERIES
|
|
|
fde8c1 |
ENDqueryEtryPt
|
|
|
fde8c1 |
|
|
|
fde8c1 |
|
|
|
fde8c1 |
@@ -1714,6 +2073,8 @@ CODESTARTmodInit
|
|
|
fde8c1 |
CODEmodInit_QueryRegCFSLineHdlr
|
|
|
fde8c1 |
CHKiRet(objUse(errmsg, CORE_COMPONENT));
|
|
|
fde8c1 |
CHKiRet(objUse(statsobj, CORE_COMPONENT));
|
|
|
fde8c1 |
+ CHKiRet(objUse(prop, CORE_COMPONENT));
|
|
|
fde8c1 |
+ CHKiRet(objUse(ruleset, CORE_COMPONENT));
|
|
|
fde8c1 |
|
|
|
fde8c1 |
if (curl_global_init(CURL_GLOBAL_ALL) != 0) {
|
|
|
fde8c1 |
errmsg.LogError(0, RS_RET_OBJ_CREATION_FAILED, "CURL fail. -elasticsearch indexing disabled");
|
|
|
fde8c1 |
@@ -1739,7 +2100,28 @@ CODEmodInit_QueryRegCFSLineHdlr
|
|
|
fde8c1 |
STATSCOUNTER_INIT(indexESFail, mutIndexESFail);
|
|
|
fde8c1 |
CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"failed.es",
|
|
|
fde8c1 |
ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexESFail));
|
|
|
fde8c1 |
+ STATSCOUNTER_INIT(indexSuccess, mutIndexSuccess);
|
|
|
fde8c1 |
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.success",
|
|
|
fde8c1 |
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexSuccess));
|
|
|
fde8c1 |
+ STATSCOUNTER_INIT(indexBadResponse, mutIndexBadResponse);
|
|
|
fde8c1 |
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.bad",
|
|
|
fde8c1 |
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBadResponse));
|
|
|
fde8c1 |
+ STATSCOUNTER_INIT(indexDuplicate, mutIndexDuplicate);
|
|
|
fde8c1 |
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.duplicate",
|
|
|
fde8c1 |
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexDuplicate));
|
|
|
fde8c1 |
+ STATSCOUNTER_INIT(indexBadArgument, mutIndexBadArgument);
|
|
|
fde8c1 |
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.badargument",
|
|
|
fde8c1 |
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBadArgument));
|
|
|
fde8c1 |
+ STATSCOUNTER_INIT(indexBulkRejection, mutIndexBulkRejection);
|
|
|
fde8c1 |
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.bulkrejection",
|
|
|
fde8c1 |
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexBulkRejection));
|
|
|
fde8c1 |
+ STATSCOUNTER_INIT(indexOtherResponse, mutIndexOtherResponse);
|
|
|
fde8c1 |
+ CHKiRet(statsobj.AddCounter(indexStats, (uchar *)"response.other",
|
|
|
fde8c1 |
+ ctrType_IntCtr, CTR_FLAG_RESETTABLE, &indexOtherResponse));
|
|
|
fde8c1 |
CHKiRet(statsobj.ConstructFinalize(indexStats));
|
|
|
fde8c1 |
+ CHKiRet(prop.Construct(&pInputName));
|
|
|
fde8c1 |
+ CHKiRet(prop.SetString(pInputName, UCHAR_CONSTANT("omelasticsearch"), sizeof("omelasticsearch") - 1));
|
|
|
fde8c1 |
+ CHKiRet(prop.ConstructFinalize(pInputName));
|
|
|
fde8c1 |
ENDmodInit
|
|
|
fde8c1 |
|
|
|
fde8c1 |
/* vi:set ai:
|