Blame SOURCES/rsyslog-8.37.0-rhbz1622768-kubernetes-404-handling.patch

45f82e
From 3987cd929d859f900318b393133c3bdde8dfffd5 Mon Sep 17 00:00:00 2001
45f82e
From: Rich Megginson <rmeggins@redhat.com>
45f82e
Date: Tue, 28 Aug 2018 12:44:23 -0600
45f82e
Subject: [PATCH] mmkubertnetes: action fails preparation cycle if kubernetes
45f82e
 API destroys resource during bootup sequence
45f82e
45f82e
The plugin was not handling 404 Not Found correctly when looking
45f82e
up pods and namespaces.  In this case, we assume the pod/namespace
45f82e
was deleted, annotate the record with whatever metadata we have,
45f82e
and cache the fact that the pod/namespace is missing so we don't
45f82e
attempt to look it up again.
45f82e
In addition, the plugin was not handling error 429 Busy correctly.
45f82e
In this case, it should also annotate the record with whatever
45f82e
metadata it has, and _not_ cache anything.  By default the plugin
45f82e
will retry every 5 seconds to connect to Kubernetes.  This
45f82e
behavior is controlled by the new config param `busyretryinterval`.
45f82e
This commit also adds impstats counters so that admins can
45f82e
view the state of the plugin to see if the lookups are working
45f82e
or are returning errors.  The stats are reported per-instance
45f82e
or per-action to facilitate using multiple different actions
45f82e
for different Kubernetes servers.
45f82e
This commit also adds support for client cert auth to
45f82e
Kubernetes via the two new config params `tls.mycert` and
45f82e
`tls.myprivkey`.
45f82e
---
45f82e
 contrib/mmkubernetes/mmkubernetes.c | 296 ++++++++++++++++++++++++----
45f82e
 7 files changed, 160 insertions(+), 36 deletions(-)
45f82e
45f82e
diff --git a/contrib/mmkubernetes/mmkubernetes.c b/contrib/mmkubernetes/mmkubernetes.c
45f82e
index 422cb2577..5bf5b049d 100644
45f82e
--- a/contrib/mmkubernetes/mmkubernetes.c
45f82e
+++ b/contrib/mmkubernetes/mmkubernetes.c
45f82e
@@ -52,9 +52,12 @@
45f82e
 #include "syslogd-types.h"
45f82e
 #include "module-template.h"
45f82e
 #include "errmsg.h"
45f82e
+#include "statsobj.h"
45f82e
 #include "regexp.h"
45f82e
 #include "hashtable.h"
45f82e
 #include "srUtils.h"
45f82e
+#include "unicode-helper.h"
45f82e
+#include "datetime.h"
45f82e
 
45f82e
 /* static data */
45f82e
 MODULE_TYPE_OUTPUT /* this is technically an output plugin */
45f82e
@@ -62,6 +65,8 @@ MODULE_TYPE_KEEP /* releasing the module would cause a leak through libcurl */
45f82e
 MODULE_CNFNAME("mmkubernetes")
45f82e
 DEF_OMOD_STATIC_DATA
45f82e
 DEFobjCurrIf(regexp)
45f82e
+DEFobjCurrIf(statsobj)
45f82e
+DEFobjCurrIf(datetime)
45f82e
 
45f82e
 #define HAVE_LOADSAMPLESFROMSTRING 1
45f82e
 #if defined(NO_LOADSAMPLESFROMSTRING)
45f82e
@@ -95,12 +100,14 @@ DEFobjCurrIf(regexp)
45f82e
 #define DFLT_CONTAINER_NAME "$!CONTAINER_NAME" /* name of variable holding CONTAINER_NAME value */
45f82e
 #define DFLT_CONTAINER_ID_FULL "$!CONTAINER_ID_FULL" /* name of variable holding CONTAINER_ID_FULL value */
45f82e
 #define DFLT_KUBERNETES_URL "https://kubernetes.default.svc.cluster.local:443"
45f82e
+#define DFLT_BUSY_RETRY_INTERVAL 5 /* retry every 5 seconds */
45f82e
 
45f82e
 static struct cache_s {
45f82e
 	const uchar *kbUrl;
45f82e
 	struct hashtable *mdHt;
45f82e
 	struct hashtable *nsHt;
45f82e
 	pthread_mutex_t *cacheMtx;
45f82e
+	int lastBusyTime;
45f82e
 } **caches;
45f82e
 
45f82e
 typedef struct {
45f82e
@@ -116,6 +123,8 @@ struct modConfData_s {
45f82e
 	uchar *srcMetadataPath;	/* where to get data for kubernetes queries */
45f82e
 	uchar *dstMetadataPath;	/* where to put metadata obtained from kubernetes */
45f82e
 	uchar *caCertFile; /* File holding the CA cert (+optional chain) of CA that issued the Kubernetes server cert */
45f82e
+	uchar *myCertFile; /* File holding cert corresponding to private key used for client cert auth */
45f82e
+	uchar *myPrivKeyFile; /* File holding private key corresponding to cert used for client cert auth */
45f82e
 	sbool allowUnsignedCerts; /* For testing/debugging - do not check for CA certs (CURLOPT_SSL_VERIFYPEER FALSE) */
45f82e
 	uchar *token; /* The token value to use to authenticate to Kubernetes - takes precedence over tokenFile */
45f82e
 	uchar *tokenFile; /* The file whose contents is the token value to use to authenticate to Kubernetes */
45f82e
@@ -127,6 +136,7 @@ struct modConfData_s {
45f82e
 	uchar *fnRulebase; /* lognorm rulebase filename for container log filename match */
45f82e
 	char *contRules; /* lognorm rules for CONTAINER_NAME value match */
45f82e
 	uchar *contRulebase; /* lognorm rulebase filename for CONTAINER_NAME value match */
45f82e
+	int busyRetryInterval; /* how to handle 429 response - 0 means error, non-zero means retry every N seconds */
45f82e
 };
45f82e
 
45f82e
 /* action (instance) configuration data */
45f82e
@@ -135,6 +145,8 @@ typedef struct _instanceData {
45f82e
 	msgPropDescr_t *srcMetadataDescr;	/* where to get data for kubernetes queries */
45f82e
 	uchar *dstMetadataPath;	/* where to put metadata obtained from kubernetes */
45f82e
 	uchar *caCertFile; /* File holding the CA cert (+optional chain) of CA that issued the Kubernetes server cert */
45f82e
+	uchar *myCertFile; /* File holding cert corresponding to private key used for client cert auth */
45f82e
+	uchar *myPrivKeyFile; /* File holding private key corresponding to cert used for client cert auth */
45f82e
 	sbool allowUnsignedCerts; /* For testing/debugging - do not check for CA certs (CURLOPT_SSL_VERIFYPEER FALSE) */
45f82e
 	uchar *token; /* The token value to use to authenticate to Kubernetes - takes precedence over tokenFile */
45f82e
 	uchar *tokenFile; /* The file whose contents is the token value to use to authenticate to Kubernetes */
45f82e
@@ -151,6 +163,7 @@ typedef struct _instanceData {
45f82e
 	msgPropDescr_t *contNameDescr; /* CONTAINER_NAME field */
45f82e
 	msgPropDescr_t *contIdFullDescr; /* CONTAINER_ID_FULL field */
45f82e
 	struct cache_s *cache;
45f82e
+	int busyRetryInterval; /* how to handle 429 response - 0 means error, non-zero means retry every N seconds */
45f82e
 } instanceData;
45f82e
 
45f82e
 typedef struct wrkrInstanceData {
45f82e
@@ -159,6 +172,16 @@ typedef struct wrkrInstanceData {
45f82e
 	struct curl_slist *curlHdr;
45f82e
 	char *curlRply;
45f82e
 	size_t curlRplyLen;
45f82e
+	statsobj_t *stats; /* stats for this instance */
45f82e
+	STATSCOUNTER_DEF(k8sRecordSeen, mutK8sRecordSeen)
45f82e
+	STATSCOUNTER_DEF(namespaceMetadataSuccess, mutNamespaceMetadataSuccess)
45f82e
+	STATSCOUNTER_DEF(namespaceMetadataNotFound, mutNamespaceMetadataNotFound)
45f82e
+	STATSCOUNTER_DEF(namespaceMetadataBusy, mutNamespaceMetadataBusy)
45f82e
+	STATSCOUNTER_DEF(namespaceMetadataError, mutNamespaceMetadataError)
45f82e
+	STATSCOUNTER_DEF(podMetadataSuccess, mutPodMetadataSuccess)
45f82e
+	STATSCOUNTER_DEF(podMetadataNotFound, mutPodMetadataNotFound)
45f82e
+	STATSCOUNTER_DEF(podMetadataBusy, mutPodMetadataBusy)
45f82e
+	STATSCOUNTER_DEF(podMetadataError, mutPodMetadataError)
45f82e
 } wrkrInstanceData_t;
45f82e
 
45f82e
 /* module parameters (v6 config format) */
45f82e
@@ -167,6 +190,8 @@ static struct cnfparamdescr modpdescr[] = {
45f82e
 	{ "srcmetadatapath", eCmdHdlrString, 0 },
45f82e
 	{ "dstmetadatapath", eCmdHdlrString, 0 },
45f82e
 	{ "tls.cacert", eCmdHdlrString, 0 },
45f82e
+	{ "tls.mycert", eCmdHdlrString, 0 },
45f82e
+	{ "tls.myprivkey", eCmdHdlrString, 0 },
45f82e
 	{ "allowunsignedcerts", eCmdHdlrBinary, 0 },
45f82e
 	{ "token", eCmdHdlrString, 0 },
45f82e
 	{ "tokenfile", eCmdHdlrString, 0 },
45f82e
@@ -174,7 +199,8 @@ static struct cnfparamdescr modpdescr[] = {
45f82e
 	{ "de_dot", eCmdHdlrBinary, 0 },
45f82e
 	{ "de_dot_separator", eCmdHdlrString, 0 },
45f82e
 	{ "filenamerulebase", eCmdHdlrString, 0 },
45f82e
-	{ "containerrulebase", eCmdHdlrString, 0 }
45f82e
+	{ "containerrulebase", eCmdHdlrString, 0 },
45f82e
+	{ "busyretryinterval", eCmdHdlrInt, 0 }
45f82e
 #if HAVE_LOADSAMPLESFROMSTRING == 1
45f82e
 	,
45f82e
 	{ "filenamerules", eCmdHdlrArray, 0 },
45f82e
@@ -193,6 +219,8 @@ static struct cnfparamdescr actpdescr[] = {
45f82e
 	{ "srcmetadatapath", eCmdHdlrString, 0 },
45f82e
 	{ "dstmetadatapath", eCmdHdlrString, 0 },
45f82e
 	{ "tls.cacert", eCmdHdlrString, 0 },
45f82e
+	{ "tls.mycert", eCmdHdlrString, 0 },
45f82e
+	{ "tls.myprivkey", eCmdHdlrString, 0 },
45f82e
 	{ "allowunsignedcerts", eCmdHdlrBinary, 0 },
45f82e
 	{ "token", eCmdHdlrString, 0 },
45f82e
 	{ "tokenfile", eCmdHdlrString, 0 },
45f82e
@@ -200,7 +228,8 @@ static struct cnfparamdescr actpdescr[] = {
45f82e
 	{ "de_dot", eCmdHdlrBinary, 0 },
45f82e
 	{ "de_dot_separator", eCmdHdlrString, 0 },
45f82e
 	{ "filenamerulebase", eCmdHdlrString, 0 },
45f82e
-	{ "containerrulebase", eCmdHdlrString, 0 }
45f82e
+	{ "containerrulebase", eCmdHdlrString, 0 },
45f82e
+	{ "busyretryinterval", eCmdHdlrInt, 0 }
45f82e
 #if HAVE_LOADSAMPLESFROMSTRING == 1
45f82e
 	,
45f82e
 	{ "filenamerules", eCmdHdlrArray, 0 },
45f82e
@@ -493,8 +522,9 @@ ENDbeginCnfLoad
45f82e
 BEGINsetModCnf
45f82e
 	struct cnfparamvals *pvals = NULL;
45f82e
 	int i;
45f82e
-	FILE *fp;
45f82e
+	FILE *fp = NULL;
45f82e
 	int ret;
45f82e
+	char errStr[1024];
45f82e
 CODESTARTsetModCnf
45f82e
 	pvals = nvlstGetParams(lst, &modpblk, NULL);
45f82e
 	if(pvals == NULL) {
45f82e
@@ -509,6 +539,7 @@ CODESTARTsetModCnf
45f82e
 	}
45f82e
 
45f82e
 	loadModConf->de_dot = DFLT_DE_DOT;
45f82e
+	loadModConf->busyRetryInterval = DFLT_BUSY_RETRY_INTERVAL;
45f82e
 	for(i = 0 ; i < modpblk.nParams ; ++i) {
45f82e
 		if(!pvals[i].bUsed) {
45f82e
 			continue;
45f82e
@@ -528,15 +559,42 @@ CODESTARTsetModCnf
45f82e
 			loadModConf->caCertFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
45f82e
 			fp = fopen((const char*)loadModConf->caCertFile, "r");
45f82e
 			if(fp == NULL) {
45f82e
-				char errStr[1024];
45f82e
 				rs_strerror_r(errno, errStr, sizeof(errStr));
45f82e
 				iRet = RS_RET_NO_FILE_ACCESS;
45f82e
 				LogError(0, iRet,
45f82e
-						"error: certificate file %s couldn't be accessed: %s\n",
45f82e
+						"error: 'tls.cacert' file %s couldn't be accessed: %s\n",
45f82e
 						loadModConf->caCertFile, errStr);
45f82e
 				ABORT_FINALIZE(iRet);
45f82e
 			} else {
45f82e
 				fclose(fp);
45f82e
+				fp = NULL;
45f82e
+			}
45f82e
+		} else if(!strcmp(modpblk.descr[i].name, "tls.mycert")) {
45f82e
+			free(loadModConf->myCertFile);
45f82e
+			loadModConf->myCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
45f82e
+			fp = fopen((const char*)loadModConf->myCertFile, "r");
45f82e
+			if(fp == NULL) {
45f82e
+				rs_strerror_r(errno, errStr, sizeof(errStr));
45f82e
+				iRet = RS_RET_NO_FILE_ACCESS;
45f82e
+				LogError(0, iRet,
45f82e
+						"error: 'tls.mycert' file %s couldn't be accessed: %s\n",
45f82e
+						loadModConf->myCertFile, errStr);
45f82e
+			} else {
45f82e
+				fclose(fp);
45f82e
+				fp = NULL;
45f82e
+			}
45f82e
+		} else if(!strcmp(modpblk.descr[i].name, "tls.myprivkey")) {
45f82e
+			loadModConf->myPrivKeyFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
45f82e
+			fp = fopen((const char*)loadModConf->myPrivKeyFile, "r");
45f82e
+			if(fp == NULL) {
45f82e
+				rs_strerror_r(errno, errStr, sizeof(errStr));
45f82e
+				iRet = RS_RET_NO_FILE_ACCESS;
45f82e
+				LogError(0, iRet,
45f82e
+						"error: 'tls.myprivkey' file %s couldn't be accessed: %s\n",
45f82e
+						loadModConf->myPrivKeyFile, errStr);
45f82e
+			} else {
45f82e
+				fclose(fp);
45f82e
+				fp = NULL;
45f82e
 			}
45f82e
 		} else if(!strcmp(modpblk.descr[i].name, "allowunsignedcerts")) {
45f82e
 			loadModConf->allowUnsignedCerts = pvals[i].val.d.n;
45f82e
@@ -548,7 +606,6 @@ CODESTARTsetModCnf
45f82e
 			loadModConf->tokenFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
45f82e
 			fp = fopen((const char*)loadModConf->tokenFile, "r");
45f82e
 			if(fp == NULL) {
45f82e
-				char errStr[1024];
45f82e
 				rs_strerror_r(errno, errStr, sizeof(errStr));
45f82e
 				iRet = RS_RET_NO_FILE_ACCESS;
45f82e
 				LogError(0, iRet,
45f82e
@@ -557,6 +614,7 @@ CODESTARTsetModCnf
45f82e
 				ABORT_FINALIZE(iRet);
45f82e
 			} else {
45f82e
 				fclose(fp);
45f82e
+				fp = NULL;
45f82e
 			}
45f82e
 		} else if(!strcmp(modpblk.descr[i].name, "annotation_match")) {
45f82e
 			free_annotationmatch(&loadModConf->annotation_match);
45f82e
@@ -577,7 +635,6 @@ CODESTARTsetModCnf
45f82e
 			loadModConf->fnRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
45f82e
 			fp = fopen((const char*)loadModConf->fnRulebase, "r");
45f82e
 			if(fp == NULL) {
45f82e
-				char errStr[1024];
45f82e
 				rs_strerror_r(errno, errStr, sizeof(errStr));
45f82e
 				iRet = RS_RET_NO_FILE_ACCESS;
45f82e
 				LogError(0, iRet,
45f82e
@@ -586,6 +643,7 @@ CODESTARTsetModCnf
45f82e
 				ABORT_FINALIZE(iRet);
45f82e
 			} else {
45f82e
 				fclose(fp);
45f82e
+				fp = NULL;
45f82e
 			}
45f82e
 #if HAVE_LOADSAMPLESFROMSTRING == 1
45f82e
 		} else if(!strcmp(modpblk.descr[i].name, "containerrules")) {
45f82e
@@ -597,7 +655,6 @@ CODESTARTsetModCnf
45f82e
 			loadModConf->contRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
45f82e
 			fp = fopen((const char*)loadModConf->contRulebase, "r");
45f82e
 			if(fp == NULL) {
45f82e
-				char errStr[1024];
45f82e
 				rs_strerror_r(errno, errStr, sizeof(errStr));
45f82e
 				iRet = RS_RET_NO_FILE_ACCESS;
45f82e
 				LogError(0, iRet,
45f82e
@@ -606,7 +663,10 @@ CODESTARTsetModCnf
45f82e
 				ABORT_FINALIZE(iRet);
45f82e
 			} else {
45f82e
 				fclose(fp);
45f82e
+				fp = NULL;
45f82e
 			}
45f82e
+		} else if(!strcmp(modpblk.descr[i].name, "busyretryinterval")) {
45f82e
+			loadModConf->busyRetryInterval = pvals[i].val.d.n;
45f82e
 		} else {
45f82e
 			dbgprintf("mmkubernetes: program error, non-handled "
45f82e
 				"param '%s' in module() block\n", modpblk.descr[i].name);
45f82e
@@ -650,6 +710,8 @@ CODESTARTsetModCnf
45f82e
 	caches = calloc(1, sizeof(struct cache_s *));
45f82e
 
45f82e
 finalize_it:
45f82e
+	if (fp)
45f82e
+		fclose(fp);
45f82e
 	if(pvals != NULL)
45f82e
 		cnfparamvalsDestruct(pvals, &modpblk);
45f82e
 ENDsetModCnf
45f82e
@@ -667,6 +729,8 @@ CODESTARTfreeInstance
45f82e
 	free(pData->srcMetadataDescr);
45f82e
 	free(pData->dstMetadataPath);
45f82e
 	free(pData->caCertFile);
45f82e
+	free(pData->myCertFile);
45f82e
+	free(pData->myPrivKeyFile);
45f82e
 	free(pData->token);
45f82e
 	free(pData->tokenFile);
45f82e
 	free(pData->fnRules);
45f82e
@@ -710,6 +774,45 @@ CODESTARTcreateWrkrInstance
45f82e
 	char *tokenHdr = NULL;
45f82e
 	FILE *fp = NULL;
45f82e
 	char *token = NULL;
45f82e
+	char *statsName = NULL;
45f82e
+
45f82e
+	CHKiRet(statsobj.Construct(&(pWrkrData->stats)));
45f82e
+	if ((-1 == asprintf(&statsName, "mmkubernetes(%s)", pWrkrData->pData->kubernetesUrl)) ||
45f82e
+		(!statsName)) {
45f82e
+		ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
45f82e
+	}
45f82e
+	CHKiRet(statsobj.SetName(pWrkrData->stats, (uchar *)statsName));
45f82e
+	free(statsName);
45f82e
+	statsName = NULL;
45f82e
+	CHKiRet(statsobj.SetOrigin(pWrkrData->stats, UCHAR_CONSTANT("mmkubernetes")));
45f82e
+	STATSCOUNTER_INIT(pWrkrData->k8sRecordSeen, pWrkrData->mutK8sRecordSeen);
45f82e
+	CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("recordseen"),
45f82e
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->k8sRecordSeen)));
45f82e
+	STATSCOUNTER_INIT(pWrkrData->namespaceMetadataSuccess, pWrkrData->mutNamespaceMetadataSuccess);
45f82e
+	CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("namespacemetadatasuccess"),
45f82e
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->namespaceMetadataSuccess)));
45f82e
+	STATSCOUNTER_INIT(pWrkrData->namespaceMetadataNotFound, pWrkrData->mutNamespaceMetadataNotFound);
45f82e
+	CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("namespacemetadatanotfound"),
45f82e
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->namespaceMetadataNotFound)));
45f82e
+	STATSCOUNTER_INIT(pWrkrData->namespaceMetadataBusy, pWrkrData->mutNamespaceMetadataBusy);
45f82e
+	CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("namespacemetadatabusy"),
45f82e
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->namespaceMetadataBusy)));
45f82e
+	STATSCOUNTER_INIT(pWrkrData->namespaceMetadataError, pWrkrData->mutNamespaceMetadataError);
45f82e
+	CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("namespacemetadataerror"),
45f82e
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->namespaceMetadataError)));
45f82e
+	STATSCOUNTER_INIT(pWrkrData->podMetadataSuccess, pWrkrData->mutPodMetadataSuccess);
45f82e
+	CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("podmetadatasuccess"),
45f82e
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->podMetadataSuccess)));
45f82e
+	STATSCOUNTER_INIT(pWrkrData->podMetadataNotFound, pWrkrData->mutPodMetadataNotFound);
45f82e
+	CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("podmetadatanotfound"),
45f82e
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->podMetadataNotFound)));
45f82e
+	STATSCOUNTER_INIT(pWrkrData->podMetadataBusy, pWrkrData->mutPodMetadataBusy);
45f82e
+	CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("podmetadatabusy"),
45f82e
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->podMetadataBusy)));
45f82e
+	STATSCOUNTER_INIT(pWrkrData->podMetadataError, pWrkrData->mutPodMetadataError);
45f82e
+	CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("podmetadataerror"),
45f82e
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->podMetadataError)));
45f82e
+	CHKiRet(statsobj.ConstructFinalize(pWrkrData->stats));
45f82e
 
45f82e
 	hdr = curl_slist_append(hdr, "Content-Type: text/json; charset=utf-8");
45f82e
 	if (pWrkrData->pData->token) {
45f82e
@@ -749,12 +852,20 @@ CODESTARTcreateWrkrInstance
45f82e
 	curl_easy_setopt(ctx, CURLOPT_WRITEDATA, pWrkrData);
45f82e
 	if(pWrkrData->pData->caCertFile)
45f82e
 		curl_easy_setopt(ctx, CURLOPT_CAINFO, pWrkrData->pData->caCertFile);
45f82e
+	if(pWrkrData->pData->myCertFile)
45f82e
+		curl_easy_setopt(ctx, CURLOPT_SSLCERT, pWrkrData->pData->myCertFile);
45f82e
+	if(pWrkrData->pData->myPrivKeyFile)
45f82e
+		curl_easy_setopt(ctx, CURLOPT_SSLKEY, pWrkrData->pData->myPrivKeyFile);
45f82e
 	if(pWrkrData->pData->allowUnsignedCerts)
45f82e
 		curl_easy_setopt(ctx, CURLOPT_SSL_VERIFYPEER, 0);
45f82e
 
45f82e
 	pWrkrData->curlCtx = ctx;
45f82e
 finalize_it:
45f82e
 	free(token);
45f82e
+	free(statsName);
45f82e
+	if ((iRet != RS_RET_OK) && pWrkrData->stats) {
45f82e
+		statsobj.Destruct(&(pWrkrData->stats));
45f82e
+	}
45f82e
 	if (fp) {
45f82e
 		fclose(fp);
45f82e
 	}
45f82e
@@ -765,6 +876,7 @@ BEGINfreeWrkrInstance
45f82e
 CODESTARTfreeWrkrInstance
45f82e
 	curl_easy_cleanup(pWrkrData->curlCtx);
45f82e
 	curl_slist_free_all(pWrkrData->curlHdr);
45f82e
+	statsobj.Destruct(&(pWrkrData->stats));
45f82e
 ENDfreeWrkrInstance
45f82e
 
45f82e
 
45f82e
@@ -790,6 +902,8 @@ cacheNew(const uchar *const url)
45f82e
 		key_equals_string, hashtable_json_object_put);
45f82e
 	cache->nsHt = create_hashtable(100, hash_from_string,
45f82e
 		key_equals_string, hashtable_json_object_put);
45f82e
+	dbgprintf("mmkubernetes: created cache mdht [%p] nsht [%p]\n",
45f82e
+			cache->mdHt, cache->nsHt);
45f82e
 	cache->cacheMtx = malloc(sizeof(pthread_mutex_t));
45f82e
 	if (!cache->mdHt || !cache->nsHt || !cache->cacheMtx) {
45f82e
 		free (cache);
45f82e
@@ -797,6 +911,7 @@ cacheNew(const uchar *const url)
45f82e
 		FINALIZE;
45f82e
 	}
45f82e
 	pthread_mutex_init(cache->cacheMtx, NULL);
45f82e
+	cache->lastBusyTime = 0;
45f82e
 
45f82e
 finalize_it:
45f82e
 	return cache;
45f82e
@@ -816,9 +931,10 @@ static void cacheFree(struct cache_s *cache)
45f82e
 BEGINnewActInst
45f82e
 	struct cnfparamvals *pvals = NULL;
45f82e
 	int i;
45f82e
-	FILE *fp;
45f82e
+	FILE *fp = NULL;
45f82e
 	char *rxstr = NULL;
45f82e
 	char *srcMetadataPath = NULL;
45f82e
+	char errStr[1024];
45f82e
 CODESTARTnewActInst
45f82e
 	DBGPRINTF("newActInst (mmkubernetes)\n");
45f82e
 
45f82e
@@ -840,6 +956,7 @@ CODESTARTnewActInst
45f82e
 
45f82e
 	pData->de_dot = loadModConf->de_dot;
45f82e
 	pData->allowUnsignedCerts = loadModConf->allowUnsignedCerts;
45f82e
+	pData->busyRetryInterval = loadModConf->busyRetryInterval;
45f82e
 	for(i = 0 ; i < actpblk.nParams ; ++i) {
45f82e
 		if(!pvals[i].bUsed) {
45f82e
 			continue;
45f82e
@@ -863,7 +980,6 @@ CODESTARTnewActInst
45f82e
 			pData->caCertFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
45f82e
 			fp = fopen((const char*)pData->caCertFile, "r");
45f82e
 			if(fp == NULL) {
45f82e
-				char errStr[1024];
45f82e
 				rs_strerror_r(errno, errStr, sizeof(errStr));
45f82e
 				iRet = RS_RET_NO_FILE_ACCESS;
45f82e
 				LogError(0, iRet,
45f82e
@@ -872,6 +988,33 @@ CODESTARTnewActInst
45f82e
 				ABORT_FINALIZE(iRet);
45f82e
 			} else {
45f82e
 				fclose(fp);
45f82e
+				fp = NULL;
45f82e
+			}
45f82e
+		} else if(!strcmp(actpblk.descr[i].name, "tls.mycert")) {
45f82e
+			pData->myCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
45f82e
+			fp = fopen((const char*)pData->myCertFile, "r");
45f82e
+			if(fp == NULL) {
45f82e
+				rs_strerror_r(errno, errStr, sizeof(errStr));
45f82e
+				iRet = RS_RET_NO_FILE_ACCESS;
45f82e
+				LogError(0, iRet,
45f82e
+						"error: 'tls.mycert' file %s couldn't be accessed: %s\n",
45f82e
+						pData->myCertFile, errStr);
45f82e
+			} else {
45f82e
+				fclose(fp);
45f82e
+				fp = NULL;
45f82e
+			}
45f82e
+		} else if(!strcmp(actpblk.descr[i].name, "tls.myprivkey")) {
45f82e
+			pData->myPrivKeyFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
45f82e
+			fp = fopen((const char*)pData->myPrivKeyFile, "r");
45f82e
+			if(fp == NULL) {
45f82e
+				rs_strerror_r(errno, errStr, sizeof(errStr));
45f82e
+				iRet = RS_RET_NO_FILE_ACCESS;
45f82e
+				LogError(0, iRet,
45f82e
+						"error: 'tls.myprivkey' file %s couldn't be accessed: %s\n",
45f82e
+						pData->myPrivKeyFile, errStr);
45f82e
+			} else {
45f82e
+				fclose(fp);
45f82e
+				fp = NULL;
45f82e
 			}
45f82e
 		} else if(!strcmp(actpblk.descr[i].name, "allowunsignedcerts")) {
45f82e
 			pData->allowUnsignedCerts = pvals[i].val.d.n;
45f82e
@@ -883,7 +1026,6 @@ CODESTARTnewActInst
45f82e
 			pData->tokenFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
45f82e
 			fp = fopen((const char*)pData->tokenFile, "r");
45f82e
 			if(fp == NULL) {
45f82e
-				char errStr[1024];
45f82e
 				rs_strerror_r(errno, errStr, sizeof(errStr));
45f82e
 				iRet = RS_RET_NO_FILE_ACCESS;
45f82e
 				LogError(0, iRet,
45f82e
@@ -892,6 +1034,7 @@ CODESTARTnewActInst
45f82e
 				ABORT_FINALIZE(iRet);
45f82e
 			} else {
45f82e
 				fclose(fp);
45f82e
+				fp = NULL;
45f82e
 			}
45f82e
 		} else if(!strcmp(actpblk.descr[i].name, "annotation_match")) {
45f82e
 			free_annotationmatch(&pData->annotation_match);
45f82e
@@ -912,7 +1055,6 @@ CODESTARTnewActInst
45f82e
 			pData->fnRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
45f82e
 			fp = fopen((const char*)pData->fnRulebase, "r");
45f82e
 			if(fp == NULL) {
45f82e
-				char errStr[1024];
45f82e
 				rs_strerror_r(errno, errStr, sizeof(errStr));
45f82e
 				iRet = RS_RET_NO_FILE_ACCESS;
45f82e
 				LogError(0, iRet,
45f82e
@@ -921,6 +1063,7 @@ CODESTARTnewActInst
45f82e
 				ABORT_FINALIZE(iRet);
45f82e
 			} else {
45f82e
 				fclose(fp);
45f82e
+				fp = NULL;
45f82e
 			}
45f82e
 #if HAVE_LOADSAMPLESFROMSTRING == 1
45f82e
 		} else if(!strcmp(modpblk.descr[i].name, "containerrules")) {
45f82e
@@ -932,7 +1075,6 @@ CODESTARTnewActInst
45f82e
 			pData->contRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
45f82e
 			fp = fopen((const char*)pData->contRulebase, "r");
45f82e
 			if(fp == NULL) {
45f82e
-				char errStr[1024];
45f82e
 				rs_strerror_r(errno, errStr, sizeof(errStr));
45f82e
 				iRet = RS_RET_NO_FILE_ACCESS;
45f82e
 				LogError(0, iRet,
45f82e
@@ -941,7 +1083,10 @@ CODESTARTnewActInst
45f82e
 				ABORT_FINALIZE(iRet);
45f82e
 			} else {
45f82e
 				fclose(fp);
45f82e
+				fp = NULL;
45f82e
 			}
45f82e
+		} else if(!strcmp(actpblk.descr[i].name, "busyretryinterval")) {
45f82e
+			pData->busyRetryInterval = pvals[i].val.d.n;
45f82e
 		} else {
45f82e
 			dbgprintf("mmkubernetes: program error, non-handled "
45f82e
 				"param '%s' in action() block\n", actpblk.descr[i].name);
45f82e
@@ -982,6 +1127,10 @@ CODESTARTnewActInst
45f82e
 		pData->dstMetadataPath = (uchar *) strdup((char *) loadModConf->dstMetadataPath);
45f82e
 	if(pData->caCertFile == NULL && loadModConf->caCertFile)
45f82e
 		pData->caCertFile = (uchar *) strdup((char *) loadModConf->caCertFile);
45f82e
+	if(pData->myCertFile == NULL && loadModConf->myCertFile)
45f82e
+		pData->myCertFile = (uchar *) strdup((char *) loadModConf->myCertFile);
45f82e
+	if(pData->myPrivKeyFile == NULL && loadModConf->myPrivKeyFile)
45f82e
+		pData->myPrivKeyFile = (uchar *) strdup((char *) loadModConf->myPrivKeyFile);
45f82e
 	if(pData->token == NULL && loadModConf->token)
45f82e
 		pData->token = (uchar *) strdup((char *) loadModConf->token);
45f82e
 	if(pData->tokenFile == NULL && loadModConf->tokenFile)
45f82e
@@ -1018,6 +1167,8 @@ CODESTARTnewActInst
45f82e
 CODE_STD_FINALIZERnewActInst
45f82e
 	if(pvals != NULL)
45f82e
 		cnfparamvalsDestruct(pvals, &actpblk);
45f82e
+	if(fp)
45f82e
+		fclose(fp);
45f82e
 	free(rxstr);
45f82e
 	free(srcMetadataPath);
45f82e
 ENDnewActInst
45f82e
@@ -1061,6 +1212,8 @@ CODESTARTfreeCnf
45f82e
 	free(pModConf->srcMetadataPath);
45f82e
 	free(pModConf->dstMetadataPath);
45f82e
 	free(pModConf->caCertFile);
45f82e
+	free(pModConf->myCertFile);
45f82e
+	free(pModConf->myPrivKeyFile);
45f82e
 	free(pModConf->token);
45f82e
 	free(pModConf->tokenFile);
45f82e
 	free(pModConf->de_dot_separator);
45f82e
@@ -1069,8 +1222,11 @@ CODESTARTfreeCnf
45f82e
 	free(pModConf->contRules);
45f82e
 	free(pModConf->contRulebase);
45f82e
 	free_annotationmatch(&pModConf->annotation_match);
45f82e
-	for(i = 0; caches[i] != NULL; i++)
45f82e
+	for(i = 0; caches[i] != NULL; i++) {
45f82e
+		dbgprintf("mmkubernetes: freeing cache [%d] mdht [%p] nsht [%p]\n",
45f82e
+				i, caches[i]->mdHt, caches[i]->nsHt);
45f82e
 		cacheFree(caches[i]);
45f82e
+	}
45f82e
 	free(caches);
45f82e
 ENDfreeCnf
45f82e
 
45f82e
@@ -1082,6 +1238,8 @@ CODESTARTdbgPrintInstInfo
45f82e
 	dbgprintf("\tsrcMetadataPath='%s'\n", pData->srcMetadataDescr->name);
45f82e
 	dbgprintf("\tdstMetadataPath='%s'\n", pData->dstMetadataPath);
45f82e
 	dbgprintf("\ttls.cacert='%s'\n", pData->caCertFile);
45f82e
+	dbgprintf("\ttls.mycert='%s'\n", pData->myCertFile);
45f82e
+	dbgprintf("\ttls.myprivkey='%s'\n", pData->myPrivKeyFile);
45f82e
 	dbgprintf("\tallowUnsignedCerts='%d'\n", pData->allowUnsignedCerts);
45f82e
 	dbgprintf("\ttoken='%s'\n", pData->token);
45f82e
 	dbgprintf("\ttokenFile='%s'\n", pData->tokenFile);
45f82e
@@ -1093,6 +1251,7 @@ CODESTARTdbgPrintInstInfo
45f82e
 	dbgprintf("\tfilenamerules='%s'\n", pData->fnRules);
45f82e
 	dbgprintf("\tcontainerrules='%s'\n", pData->contRules);
45f82e
 #endif
45f82e
+	dbgprintf("\tbusyretryinterval='%d'\n", pData->busyRetryInterval);
45f82e
 ENDdbgPrintInstInfo
45f82e
 
45f82e
 
45f82e
@@ -1206,6 +1365,24 @@ queryKB(wrkrInstanceData_t *pWrkrData, char *url, struct json_object **rply)
45f82e
 	struct json_object *jo;
45f82e
 	long resp_code = 400;
45f82e
 
45f82e
+	if (pWrkrData->pData->cache->lastBusyTime) {
45f82e
+		time_t now;
45f82e
+		datetime.GetTime(&now;;
45f82e
+		now -= pWrkrData->pData->cache->lastBusyTime;
45f82e
+		if (now < pWrkrData->pData->busyRetryInterval) {
45f82e
+			LogMsg(0, RS_RET_RETRY, LOG_DEBUG,
45f82e
+				"mmkubernetes: Waited [%ld] of [%d] seconds for the requested url [%s]\n",
45f82e
+				now, pWrkrData->pData->busyRetryInterval, url);
45f82e
+			ABORT_FINALIZE(RS_RET_RETRY);
45f82e
+		} else {
45f82e
+			LogMsg(0, RS_RET_OK, LOG_DEBUG,
45f82e
+				"mmkubernetes: Cleared busy status after [%d] seconds - "
45f82e
+				"will retry the requested url [%s]\n",
45f82e
+				pWrkrData->pData->busyRetryInterval, url);
45f82e
+			pWrkrData->pData->cache->lastBusyTime = 0;
45f82e
+		}
45f82e
+	}
45f82e
+
45f82e
 	/* query kubernetes for pod info */
45f82e
 	ccode = curl_easy_setopt(pWrkrData->curlCtx, CURLOPT_URL, url);
45f82e
 	if(ccode != CURLE_OK)
45f82e
@@ -1238,17 +1415,23 @@ queryKB(wrkrInstanceData_t *pWrkrData, char *url, struct json_object **rply)
45f82e
 		ABORT_FINALIZE(RS_RET_ERR);
45f82e
 	}
45f82e
 	if(resp_code == 404) {
45f82e
-		LogMsg(0, RS_RET_ERR, LOG_ERR,
45f82e
+		LogMsg(0, RS_RET_NOT_FOUND, LOG_INFO,
45f82e
 			      "mmkubernetes: Not Found: the resource does not exist at url [%s]\n",
45f82e
 			      url);
45f82e
-		ABORT_FINALIZE(RS_RET_ERR);
45f82e
+		ABORT_FINALIZE(RS_RET_NOT_FOUND);
45f82e
 	}
45f82e
 	if(resp_code == 429) {
45f82e
-		LogMsg(0, RS_RET_ERR, LOG_ERR,
45f82e
+		if (pWrkrData->pData->busyRetryInterval) {
45f82e
+			time_t now;
45f82e
+			datetime.GetTime(&now;;
45f82e
+			pWrkrData->pData->cache->lastBusyTime = now;
45f82e
+		}
45f82e
+
45f82e
+		LogMsg(0, RS_RET_RETRY, LOG_INFO,
45f82e
 			      "mmkubernetes: Too Many Requests: the server is too heavily loaded "
45f82e
 			      "to provide the data for the requested url [%s]\n",
45f82e
 			      url);
45f82e
-		ABORT_FINALIZE(RS_RET_ERR);
45f82e
+		ABORT_FINALIZE(RS_RET_RETRY);
45f82e
 	}
45f82e
 	if(resp_code != 200) {
45f82e
 		LogMsg(0, RS_RET_ERR, LOG_ERR,
45f82e
@@ -1299,12 +1482,14 @@ BEGINdoAction
45f82e
 	char *mdKey = NULL;
45f82e
 	struct json_object *jMetadata = NULL, *jMetadataCopy = NULL, *jMsgMeta = NULL,
45f82e
 			*jo = NULL;
45f82e
-	int add_ns_metadata = 0;
45f82e
+	int add_pod_metadata = 1;
45f82e
 CODESTARTdoAction
45f82e
 	CHKiRet_Hdlr(extractMsgMetadata(pMsg, pWrkrData->pData, &jMsgMeta)) {
45f82e
 		ABORT_FINALIZE((iRet == RS_RET_NOT_FOUND) ? RS_RET_OK : iRet);
45f82e
 	}
45f82e
 
45f82e
+	STATSCOUNTER_INC(pWrkrData->k8sRecordSeen, pWrkrData->mutK8sRecordSeen);
45f82e
+
45f82e
 	if (fjson_object_object_get_ex(jMsgMeta, "pod_name", &jo))
45f82e
 		podName = json_object_get_string(jo);
45f82e
 	if (fjson_object_object_get_ex(jMsgMeta, "namespace_name", &jo))
45f82e
@@ -1347,28 +1532,49 @@ CODESTARTdoAction
45f82e
 			}
45f82e
 			iRet = queryKB(pWrkrData, url, &jReply);
45f82e
 			free(url);
45f82e
-			/* todo: implement support for the .orphaned namespace */
45f82e
-			if (iRet != RS_RET_OK) {
45f82e
+			if (iRet == RS_RET_NOT_FOUND) {
45f82e
+				/* negative cache namespace - make a dummy empty namespace metadata object */
45f82e
+				jNsMeta = json_object_new_object();
45f82e
+				STATSCOUNTER_INC(pWrkrData->namespaceMetadataNotFound,
45f82e
+						 pWrkrData->mutNamespaceMetadataNotFound);
45f82e
+			} else if (iRet == RS_RET_RETRY) {
45f82e
+				/* server is busy - retry or error */
45f82e
+				STATSCOUNTER_INC(pWrkrData->namespaceMetadataBusy,
45f82e
+						 pWrkrData->mutNamespaceMetadataBusy);
45f82e
+				if (0 == pWrkrData->pData->busyRetryInterval) {
45f82e
+					pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
45f82e
+					ABORT_FINALIZE(RS_RET_ERR);
45f82e
+				}
45f82e
+				add_pod_metadata = 0; /* don't cache pod metadata either - retry both */
45f82e
+			} else if (iRet != RS_RET_OK) {
45f82e
+				/* hard error - something the admin needs to fix e.g. network, config, auth */
45f82e
 				json_object_put(jReply);
45f82e
 				jReply = NULL;
45f82e
+				STATSCOUNTER_INC(pWrkrData->namespaceMetadataError,
45f82e
+						 pWrkrData->mutNamespaceMetadataError);
45f82e
 				pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
45f82e
 				FINALIZE;
45f82e
-			}
45f82e
-
45f82e
-			if(fjson_object_object_get_ex(jReply, "metadata", &jNsMeta)) {
45f82e
+			} else if (fjson_object_object_get_ex(jReply, "metadata", &jNsMeta)) {
45f82e
 				jNsMeta = json_object_get(jNsMeta);
45f82e
 				parse_labels_annotations(jNsMeta, &pWrkrData->pData->annotation_match,
45f82e
 					pWrkrData->pData->de_dot,
45f82e
 					(const char *)pWrkrData->pData->de_dot_separator,
45f82e
 					pWrkrData->pData->de_dot_separator_len);
45f82e
-				add_ns_metadata = 1;
45f82e
+				STATSCOUNTER_INC(pWrkrData->namespaceMetadataSuccess,
45f82e
+						 pWrkrData->mutNamespaceMetadataSuccess);
45f82e
 			} else {
45f82e
 				/* namespace with no metadata??? */
45f82e
 				LogMsg(0, RS_RET_ERR, LOG_INFO,
45f82e
 					      "mmkubernetes: namespace [%s] has no metadata!\n", ns);
45f82e
-				jNsMeta = NULL;
45f82e
+				/* negative cache namespace - make a dummy empty namespace metadata object */
45f82e
+				jNsMeta = json_object_new_object();
45f82e
+				STATSCOUNTER_INC(pWrkrData->namespaceMetadataSuccess,
45f82e
+						 pWrkrData->mutNamespaceMetadataSuccess);
45f82e
 			}
45f82e
 
45f82e
+			if(jNsMeta) {
45f82e
+				hashtable_insert(pWrkrData->pData->cache->nsHt, strdup(ns), jNsMeta);
45f82e
+			}
45f82e
 			json_object_put(jReply);
45f82e
 			jReply = NULL;
45f82e
 		}
45f82e
@@ -1381,14 +1587,28 @@ CODESTARTdoAction
45f82e
 		}
45f82e
 		iRet = queryKB(pWrkrData, url, &jReply);
45f82e
 		free(url);
45f82e
-		if(iRet != RS_RET_OK) {
45f82e
-			if(jNsMeta && add_ns_metadata) {
45f82e
-				hashtable_insert(pWrkrData->pData->cache->nsHt, strdup(ns), jNsMeta);
45f82e
+		if (iRet == RS_RET_NOT_FOUND) {
45f82e
+			/* negative cache pod - make a dummy empty pod metadata object */
45f82e
+			iRet = RS_RET_OK;
45f82e
+			STATSCOUNTER_INC(pWrkrData->podMetadataNotFound, pWrkrData->mutPodMetadataNotFound);
45f82e
+		} else if (iRet == RS_RET_RETRY) {
45f82e
+			/* server is busy - retry or error */
45f82e
+			STATSCOUNTER_INC(pWrkrData->podMetadataBusy, pWrkrData->mutPodMetadataBusy);
45f82e
+			if (0 == pWrkrData->pData->busyRetryInterval) {
45f82e
+				pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
45f82e
+				ABORT_FINALIZE(RS_RET_ERR);
45f82e
 			}
45f82e
+			add_pod_metadata = 0; /* do not cache so that we can retry */
45f82e
+			iRet = RS_RET_OK;
45f82e
+		} else if(iRet != RS_RET_OK) {
45f82e
+			/* hard error - something the admin needs to fix e.g. network, config, auth */
45f82e
 			json_object_put(jReply);
45f82e
 			jReply = NULL;
45f82e
+			STATSCOUNTER_INC(pWrkrData->podMetadataError, pWrkrData->mutPodMetadataError);
45f82e
 			pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
45f82e
 			FINALIZE;
45f82e
+		} else {
45f82e
+			STATSCOUNTER_INC(pWrkrData->podMetadataSuccess, pWrkrData->mutPodMetadataSuccess);
45f82e
 		}
45f82e
 
45f82e
 		jo = json_object_new_object();
45f82e
@@ -1435,11 +1655,9 @@ CODESTARTdoAction
45f82e
 			json_object_object_add(jo, "container_id", json_object_get(jo2));
45f82e
 		json_object_object_add(jMetadata, "docker", jo);
45f82e
 
45f82e
-		hashtable_insert(pWrkrData->pData->cache->mdHt, mdKey, jMetadata);
45f82e
-		mdKey = NULL;
45f82e
-		if(jNsMeta && add_ns_metadata) {
45f82e
-			hashtable_insert(pWrkrData->pData->cache->nsHt, strdup(ns), jNsMeta);
45f82e
-			ns = NULL;
45f82e
+		if (add_pod_metadata) {
45f82e
+			hashtable_insert(pWrkrData->pData->cache->mdHt, mdKey, jMetadata);
45f82e
+			mdKey = NULL;
45f82e
 		}
45f82e
 	}
45f82e
 
45f82e
@@ -1450,6 +1668,11 @@ CODESTARTdoAction
45f82e
 	 * outside of the cache lock
45f82e
 	 */
45f82e
 	jMetadataCopy = json_tokener_parse(json_object_get_string(jMetadata));
45f82e
+	if (!add_pod_metadata) {
45f82e
+		/* jMetadata object was created from scratch and not cached */
45f82e
+		json_object_put(jMetadata);
45f82e
+		jMetadata = NULL;
45f82e
+	}
45f82e
 	pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
45f82e
 	/* the +1 is there to skip the leading '$' */
45f82e
 	msgAddJSON(pMsg, (uchar *) pWrkrData->pData->dstMetadataPath + 1, jMetadataCopy, 0, 0);
45f82e
@@ -1470,7 +1693,9 @@ BEGINmodExit
45f82e
 CODESTARTmodExit
45f82e
 	curl_global_cleanup();
45f82e
 
45f82e
+	objRelease(datetime, CORE_COMPONENT);
45f82e
 	objRelease(regexp, LM_REGEXP_FILENAME);
45f82e
+	objRelease(statsobj, CORE_COMPONENT);
45f82e
 ENDmodExit
45f82e
 
45f82e
 
45f82e
@@ -1489,8 +1714,9 @@ CODESTARTmodInit
45f82e
 	*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
45f82e
 CODEmodInit_QueryRegCFSLineHdlr
45f82e
 	DBGPRINTF("mmkubernetes: module compiled with rsyslog version %s.\n", VERSION);
45f82e
+	CHKiRet(objUse(statsobj, CORE_COMPONENT));
45f82e
 	CHKiRet(objUse(regexp, LM_REGEXP_FILENAME));
45f82e
-
45f82e
+	CHKiRet(objUse(datetime, CORE_COMPONENT));
45f82e
 	/* CURL_GLOBAL_ALL initializes more than is needed but the
45f82e
 	 * libcurl documentation discourages use of other values
45f82e
 	 */