Blob Blame History Raw
From 3987cd929d859f900318b393133c3bdde8dfffd5 Mon Sep 17 00:00:00 2001
From: Rich Megginson <rmeggins@redhat.com>
Date: Tue, 28 Aug 2018 12:44:23 -0600
Subject: [PATCH] mmkubertnetes: action fails preparation cycle if kubernetes
 API destroys resource during bootup sequence

The plugin was not handling 404 Not Found correctly when looking
up pods and namespaces.  In this case, we assume the pod/namespace
was deleted, annotate the record with whatever metadata we have,
and cache the fact that the pod/namespace is missing so we don't
attempt to look it up again.
In addition, the plugin was not handling error 429 Busy correctly.
In this case, it should also annotate the record with whatever
metadata it has, and _not_ cache anything.  By default the plugin
will retry every 5 seconds to connect to Kubernetes.  This
behavior is controlled by the new config param `busyretryinterval`.
This commit also adds impstats counters so that admins can
view the state of the plugin to see if the lookups are working
or are returning errors.  The stats are reported per-instance
or per-action to facilitate using multiple different actions
for different Kubernetes servers.
This commit also adds support for client cert auth to
Kubernetes via the two new config params `tls.mycert` and
`tls.myprivkey`.
---
 contrib/mmkubernetes/mmkubernetes.c | 296 ++++++++++++++++++++++++----
 7 files changed, 160 insertions(+), 36 deletions(-)

diff --git a/contrib/mmkubernetes/mmkubernetes.c b/contrib/mmkubernetes/mmkubernetes.c
index 422cb2577..5bf5b049d 100644
--- a/contrib/mmkubernetes/mmkubernetes.c
+++ b/contrib/mmkubernetes/mmkubernetes.c
@@ -52,9 +52,12 @@
 #include "syslogd-types.h"
 #include "module-template.h"
 #include "errmsg.h"
+#include "statsobj.h"
 #include "regexp.h"
 #include "hashtable.h"
 #include "srUtils.h"
+#include "unicode-helper.h"
+#include "datetime.h"
 
 /* static data */
 MODULE_TYPE_OUTPUT /* this is technically an output plugin */
@@ -62,6 +65,8 @@ MODULE_TYPE_KEEP /* releasing the module would cause a leak through libcurl */
 MODULE_CNFNAME("mmkubernetes")
 DEF_OMOD_STATIC_DATA
 DEFobjCurrIf(regexp)
+DEFobjCurrIf(statsobj)
+DEFobjCurrIf(datetime)
 
 #define HAVE_LOADSAMPLESFROMSTRING 1
 #if defined(NO_LOADSAMPLESFROMSTRING)
@@ -95,12 +100,14 @@ DEFobjCurrIf(regexp)
 #define DFLT_CONTAINER_NAME "$!CONTAINER_NAME" /* name of variable holding CONTAINER_NAME value */
 #define DFLT_CONTAINER_ID_FULL "$!CONTAINER_ID_FULL" /* name of variable holding CONTAINER_ID_FULL value */
 #define DFLT_KUBERNETES_URL "https://kubernetes.default.svc.cluster.local:443"
+#define DFLT_BUSY_RETRY_INTERVAL 5 /* retry every 5 seconds */
 
 static struct cache_s {
 	const uchar *kbUrl;
 	struct hashtable *mdHt;
 	struct hashtable *nsHt;
 	pthread_mutex_t *cacheMtx;
+	int lastBusyTime;
 } **caches;
 
 typedef struct {
@@ -116,6 +123,8 @@ struct modConfData_s {
 	uchar *srcMetadataPath;	/* where to get data for kubernetes queries */
 	uchar *dstMetadataPath;	/* where to put metadata obtained from kubernetes */
 	uchar *caCertFile; /* File holding the CA cert (+optional chain) of CA that issued the Kubernetes server cert */
+	uchar *myCertFile; /* File holding cert corresponding to private key used for client cert auth */
+	uchar *myPrivKeyFile; /* File holding private key corresponding to cert used for client cert auth */
 	sbool allowUnsignedCerts; /* For testing/debugging - do not check for CA certs (CURLOPT_SSL_VERIFYPEER FALSE) */
 	uchar *token; /* The token value to use to authenticate to Kubernetes - takes precedence over tokenFile */
 	uchar *tokenFile; /* The file whose contents is the token value to use to authenticate to Kubernetes */
@@ -127,6 +136,7 @@ struct modConfData_s {
 	uchar *fnRulebase; /* lognorm rulebase filename for container log filename match */
 	char *contRules; /* lognorm rules for CONTAINER_NAME value match */
 	uchar *contRulebase; /* lognorm rulebase filename for CONTAINER_NAME value match */
+	int busyRetryInterval; /* how to handle 429 response - 0 means error, non-zero means retry every N seconds */
 };
 
 /* action (instance) configuration data */
@@ -135,6 +145,8 @@ typedef struct _instanceData {
 	msgPropDescr_t *srcMetadataDescr;	/* where to get data for kubernetes queries */
 	uchar *dstMetadataPath;	/* where to put metadata obtained from kubernetes */
 	uchar *caCertFile; /* File holding the CA cert (+optional chain) of CA that issued the Kubernetes server cert */
+	uchar *myCertFile; /* File holding cert corresponding to private key used for client cert auth */
+	uchar *myPrivKeyFile; /* File holding private key corresponding to cert used for client cert auth */
 	sbool allowUnsignedCerts; /* For testing/debugging - do not check for CA certs (CURLOPT_SSL_VERIFYPEER FALSE) */
 	uchar *token; /* The token value to use to authenticate to Kubernetes - takes precedence over tokenFile */
 	uchar *tokenFile; /* The file whose contents is the token value to use to authenticate to Kubernetes */
@@ -151,6 +163,7 @@ typedef struct _instanceData {
 	msgPropDescr_t *contNameDescr; /* CONTAINER_NAME field */
 	msgPropDescr_t *contIdFullDescr; /* CONTAINER_ID_FULL field */
 	struct cache_s *cache;
+	int busyRetryInterval; /* how to handle 429 response - 0 means error, non-zero means retry every N seconds */
 } instanceData;
 
 typedef struct wrkrInstanceData {
@@ -159,6 +172,16 @@ typedef struct wrkrInstanceData {
 	struct curl_slist *curlHdr;
 	char *curlRply;
 	size_t curlRplyLen;
+	statsobj_t *stats; /* stats for this instance */
+	STATSCOUNTER_DEF(k8sRecordSeen, mutK8sRecordSeen)
+	STATSCOUNTER_DEF(namespaceMetadataSuccess, mutNamespaceMetadataSuccess)
+	STATSCOUNTER_DEF(namespaceMetadataNotFound, mutNamespaceMetadataNotFound)
+	STATSCOUNTER_DEF(namespaceMetadataBusy, mutNamespaceMetadataBusy)
+	STATSCOUNTER_DEF(namespaceMetadataError, mutNamespaceMetadataError)
+	STATSCOUNTER_DEF(podMetadataSuccess, mutPodMetadataSuccess)
+	STATSCOUNTER_DEF(podMetadataNotFound, mutPodMetadataNotFound)
+	STATSCOUNTER_DEF(podMetadataBusy, mutPodMetadataBusy)
+	STATSCOUNTER_DEF(podMetadataError, mutPodMetadataError)
 } wrkrInstanceData_t;
 
 /* module parameters (v6 config format) */
@@ -167,6 +190,8 @@ static struct cnfparamdescr modpdescr[] = {
 	{ "srcmetadatapath", eCmdHdlrString, 0 },
 	{ "dstmetadatapath", eCmdHdlrString, 0 },
 	{ "tls.cacert", eCmdHdlrString, 0 },
+	{ "tls.mycert", eCmdHdlrString, 0 },
+	{ "tls.myprivkey", eCmdHdlrString, 0 },
 	{ "allowunsignedcerts", eCmdHdlrBinary, 0 },
 	{ "token", eCmdHdlrString, 0 },
 	{ "tokenfile", eCmdHdlrString, 0 },
@@ -174,7 +199,8 @@ static struct cnfparamdescr modpdescr[] = {
 	{ "de_dot", eCmdHdlrBinary, 0 },
 	{ "de_dot_separator", eCmdHdlrString, 0 },
 	{ "filenamerulebase", eCmdHdlrString, 0 },
-	{ "containerrulebase", eCmdHdlrString, 0 }
+	{ "containerrulebase", eCmdHdlrString, 0 },
+	{ "busyretryinterval", eCmdHdlrInt, 0 }
 #if HAVE_LOADSAMPLESFROMSTRING == 1
 	,
 	{ "filenamerules", eCmdHdlrArray, 0 },
@@ -193,6 +219,8 @@ static struct cnfparamdescr actpdescr[] = {
 	{ "srcmetadatapath", eCmdHdlrString, 0 },
 	{ "dstmetadatapath", eCmdHdlrString, 0 },
 	{ "tls.cacert", eCmdHdlrString, 0 },
+	{ "tls.mycert", eCmdHdlrString, 0 },
+	{ "tls.myprivkey", eCmdHdlrString, 0 },
 	{ "allowunsignedcerts", eCmdHdlrBinary, 0 },
 	{ "token", eCmdHdlrString, 0 },
 	{ "tokenfile", eCmdHdlrString, 0 },
@@ -200,7 +228,8 @@ static struct cnfparamdescr actpdescr[] = {
 	{ "de_dot", eCmdHdlrBinary, 0 },
 	{ "de_dot_separator", eCmdHdlrString, 0 },
 	{ "filenamerulebase", eCmdHdlrString, 0 },
-	{ "containerrulebase", eCmdHdlrString, 0 }
+	{ "containerrulebase", eCmdHdlrString, 0 },
+	{ "busyretryinterval", eCmdHdlrInt, 0 }
 #if HAVE_LOADSAMPLESFROMSTRING == 1
 	,
 	{ "filenamerules", eCmdHdlrArray, 0 },
@@ -493,8 +522,9 @@ ENDbeginCnfLoad
 BEGINsetModCnf
 	struct cnfparamvals *pvals = NULL;
 	int i;
-	FILE *fp;
+	FILE *fp = NULL;
 	int ret;
+	char errStr[1024];
 CODESTARTsetModCnf
 	pvals = nvlstGetParams(lst, &modpblk, NULL);
 	if(pvals == NULL) {
@@ -509,6 +539,7 @@ CODESTARTsetModCnf
 	}
 
 	loadModConf->de_dot = DFLT_DE_DOT;
+	loadModConf->busyRetryInterval = DFLT_BUSY_RETRY_INTERVAL;
 	for(i = 0 ; i < modpblk.nParams ; ++i) {
 		if(!pvals[i].bUsed) {
 			continue;
@@ -528,15 +559,42 @@ CODESTARTsetModCnf
 			loadModConf->caCertFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
 			fp = fopen((const char*)loadModConf->caCertFile, "r");
 			if(fp == NULL) {
-				char errStr[1024];
 				rs_strerror_r(errno, errStr, sizeof(errStr));
 				iRet = RS_RET_NO_FILE_ACCESS;
 				LogError(0, iRet,
-						"error: certificate file %s couldn't be accessed: %s\n",
+						"error: 'tls.cacert' file %s couldn't be accessed: %s\n",
 						loadModConf->caCertFile, errStr);
 				ABORT_FINALIZE(iRet);
 			} else {
 				fclose(fp);
+				fp = NULL;
+			}
+		} else if(!strcmp(modpblk.descr[i].name, "tls.mycert")) {
+			free(loadModConf->myCertFile);
+			loadModConf->myCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+			fp = fopen((const char*)loadModConf->myCertFile, "r");
+			if(fp == NULL) {
+				rs_strerror_r(errno, errStr, sizeof(errStr));
+				iRet = RS_RET_NO_FILE_ACCESS;
+				LogError(0, iRet,
+						"error: 'tls.mycert' file %s couldn't be accessed: %s\n",
+						loadModConf->myCertFile, errStr);
+			} else {
+				fclose(fp);
+				fp = NULL;
+			}
+		} else if(!strcmp(modpblk.descr[i].name, "tls.myprivkey")) {
+			loadModConf->myPrivKeyFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+			fp = fopen((const char*)loadModConf->myPrivKeyFile, "r");
+			if(fp == NULL) {
+				rs_strerror_r(errno, errStr, sizeof(errStr));
+				iRet = RS_RET_NO_FILE_ACCESS;
+				LogError(0, iRet,
+						"error: 'tls.myprivkey' file %s couldn't be accessed: %s\n",
+						loadModConf->myPrivKeyFile, errStr);
+			} else {
+				fclose(fp);
+				fp = NULL;
 			}
 		} else if(!strcmp(modpblk.descr[i].name, "allowunsignedcerts")) {
 			loadModConf->allowUnsignedCerts = pvals[i].val.d.n;
@@ -548,7 +606,6 @@ CODESTARTsetModCnf
 			loadModConf->tokenFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
 			fp = fopen((const char*)loadModConf->tokenFile, "r");
 			if(fp == NULL) {
-				char errStr[1024];
 				rs_strerror_r(errno, errStr, sizeof(errStr));
 				iRet = RS_RET_NO_FILE_ACCESS;
 				LogError(0, iRet,
@@ -557,6 +614,7 @@ CODESTARTsetModCnf
 				ABORT_FINALIZE(iRet);
 			} else {
 				fclose(fp);
+				fp = NULL;
 			}
 		} else if(!strcmp(modpblk.descr[i].name, "annotation_match")) {
 			free_annotationmatch(&loadModConf->annotation_match);
@@ -577,7 +635,6 @@ CODESTARTsetModCnf
 			loadModConf->fnRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
 			fp = fopen((const char*)loadModConf->fnRulebase, "r");
 			if(fp == NULL) {
-				char errStr[1024];
 				rs_strerror_r(errno, errStr, sizeof(errStr));
 				iRet = RS_RET_NO_FILE_ACCESS;
 				LogError(0, iRet,
@@ -586,6 +643,7 @@ CODESTARTsetModCnf
 				ABORT_FINALIZE(iRet);
 			} else {
 				fclose(fp);
+				fp = NULL;
 			}
 #if HAVE_LOADSAMPLESFROMSTRING == 1
 		} else if(!strcmp(modpblk.descr[i].name, "containerrules")) {
@@ -597,7 +655,6 @@ CODESTARTsetModCnf
 			loadModConf->contRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
 			fp = fopen((const char*)loadModConf->contRulebase, "r");
 			if(fp == NULL) {
-				char errStr[1024];
 				rs_strerror_r(errno, errStr, sizeof(errStr));
 				iRet = RS_RET_NO_FILE_ACCESS;
 				LogError(0, iRet,
@@ -606,7 +663,10 @@ CODESTARTsetModCnf
 				ABORT_FINALIZE(iRet);
 			} else {
 				fclose(fp);
+				fp = NULL;
 			}
+		} else if(!strcmp(modpblk.descr[i].name, "busyretryinterval")) {
+			loadModConf->busyRetryInterval = pvals[i].val.d.n;
 		} else {
 			dbgprintf("mmkubernetes: program error, non-handled "
 				"param '%s' in module() block\n", modpblk.descr[i].name);
@@ -650,6 +710,8 @@ CODESTARTsetModCnf
 	caches = calloc(1, sizeof(struct cache_s *));
 
 finalize_it:
+	if (fp)
+		fclose(fp);
 	if(pvals != NULL)
 		cnfparamvalsDestruct(pvals, &modpblk);
 ENDsetModCnf
@@ -667,6 +729,8 @@ CODESTARTfreeInstance
 	free(pData->srcMetadataDescr);
 	free(pData->dstMetadataPath);
 	free(pData->caCertFile);
+	free(pData->myCertFile);
+	free(pData->myPrivKeyFile);
 	free(pData->token);
 	free(pData->tokenFile);
 	free(pData->fnRules);
@@ -710,6 +774,45 @@ CODESTARTcreateWrkrInstance
 	char *tokenHdr = NULL;
 	FILE *fp = NULL;
 	char *token = NULL;
+	char *statsName = NULL;
+
+	CHKiRet(statsobj.Construct(&(pWrkrData->stats)));
+	if ((-1 == asprintf(&statsName, "mmkubernetes(%s)", pWrkrData->pData->kubernetesUrl)) ||
+		(!statsName)) {
+		ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
+	}
+	CHKiRet(statsobj.SetName(pWrkrData->stats, (uchar *)statsName));
+	free(statsName);
+	statsName = NULL;
+	CHKiRet(statsobj.SetOrigin(pWrkrData->stats, UCHAR_CONSTANT("mmkubernetes")));
+	STATSCOUNTER_INIT(pWrkrData->k8sRecordSeen, pWrkrData->mutK8sRecordSeen);
+	CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("recordseen"),
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->k8sRecordSeen)));
+	STATSCOUNTER_INIT(pWrkrData->namespaceMetadataSuccess, pWrkrData->mutNamespaceMetadataSuccess);
+	CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("namespacemetadatasuccess"),
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->namespaceMetadataSuccess)));
+	STATSCOUNTER_INIT(pWrkrData->namespaceMetadataNotFound, pWrkrData->mutNamespaceMetadataNotFound);
+	CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("namespacemetadatanotfound"),
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->namespaceMetadataNotFound)));
+	STATSCOUNTER_INIT(pWrkrData->namespaceMetadataBusy, pWrkrData->mutNamespaceMetadataBusy);
+	CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("namespacemetadatabusy"),
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->namespaceMetadataBusy)));
+	STATSCOUNTER_INIT(pWrkrData->namespaceMetadataError, pWrkrData->mutNamespaceMetadataError);
+	CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("namespacemetadataerror"),
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->namespaceMetadataError)));
+	STATSCOUNTER_INIT(pWrkrData->podMetadataSuccess, pWrkrData->mutPodMetadataSuccess);
+	CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("podmetadatasuccess"),
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->podMetadataSuccess)));
+	STATSCOUNTER_INIT(pWrkrData->podMetadataNotFound, pWrkrData->mutPodMetadataNotFound);
+	CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("podmetadatanotfound"),
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->podMetadataNotFound)));
+	STATSCOUNTER_INIT(pWrkrData->podMetadataBusy, pWrkrData->mutPodMetadataBusy);
+	CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("podmetadatabusy"),
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->podMetadataBusy)));
+	STATSCOUNTER_INIT(pWrkrData->podMetadataError, pWrkrData->mutPodMetadataError);
+	CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("podmetadataerror"),
+		ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->podMetadataError)));
+	CHKiRet(statsobj.ConstructFinalize(pWrkrData->stats));
 
 	hdr = curl_slist_append(hdr, "Content-Type: text/json; charset=utf-8");
 	if (pWrkrData->pData->token) {
@@ -749,12 +852,20 @@ CODESTARTcreateWrkrInstance
 	curl_easy_setopt(ctx, CURLOPT_WRITEDATA, pWrkrData);
 	if(pWrkrData->pData->caCertFile)
 		curl_easy_setopt(ctx, CURLOPT_CAINFO, pWrkrData->pData->caCertFile);
+	if(pWrkrData->pData->myCertFile)
+		curl_easy_setopt(ctx, CURLOPT_SSLCERT, pWrkrData->pData->myCertFile);
+	if(pWrkrData->pData->myPrivKeyFile)
+		curl_easy_setopt(ctx, CURLOPT_SSLKEY, pWrkrData->pData->myPrivKeyFile);
 	if(pWrkrData->pData->allowUnsignedCerts)
 		curl_easy_setopt(ctx, CURLOPT_SSL_VERIFYPEER, 0);
 
 	pWrkrData->curlCtx = ctx;
 finalize_it:
 	free(token);
+	free(statsName);
+	if ((iRet != RS_RET_OK) && pWrkrData->stats) {
+		statsobj.Destruct(&(pWrkrData->stats));
+	}
 	if (fp) {
 		fclose(fp);
 	}
@@ -765,6 +876,7 @@ BEGINfreeWrkrInstance
 CODESTARTfreeWrkrInstance
 	curl_easy_cleanup(pWrkrData->curlCtx);
 	curl_slist_free_all(pWrkrData->curlHdr);
+	statsobj.Destruct(&(pWrkrData->stats));
 ENDfreeWrkrInstance
 
 
@@ -790,6 +902,8 @@ cacheNew(const uchar *const url)
 		key_equals_string, hashtable_json_object_put);
 	cache->nsHt = create_hashtable(100, hash_from_string,
 		key_equals_string, hashtable_json_object_put);
+	dbgprintf("mmkubernetes: created cache mdht [%p] nsht [%p]\n",
+			cache->mdHt, cache->nsHt);
 	cache->cacheMtx = malloc(sizeof(pthread_mutex_t));
 	if (!cache->mdHt || !cache->nsHt || !cache->cacheMtx) {
 		free (cache);
@@ -797,6 +911,7 @@ cacheNew(const uchar *const url)
 		FINALIZE;
 	}
 	pthread_mutex_init(cache->cacheMtx, NULL);
+	cache->lastBusyTime = 0;
 
 finalize_it:
 	return cache;
@@ -816,9 +931,10 @@ static void cacheFree(struct cache_s *cache)
 BEGINnewActInst
 	struct cnfparamvals *pvals = NULL;
 	int i;
-	FILE *fp;
+	FILE *fp = NULL;
 	char *rxstr = NULL;
 	char *srcMetadataPath = NULL;
+	char errStr[1024];
 CODESTARTnewActInst
 	DBGPRINTF("newActInst (mmkubernetes)\n");
 
@@ -840,6 +956,7 @@ CODESTARTnewActInst
 
 	pData->de_dot = loadModConf->de_dot;
 	pData->allowUnsignedCerts = loadModConf->allowUnsignedCerts;
+	pData->busyRetryInterval = loadModConf->busyRetryInterval;
 	for(i = 0 ; i < actpblk.nParams ; ++i) {
 		if(!pvals[i].bUsed) {
 			continue;
@@ -863,7 +980,6 @@ CODESTARTnewActInst
 			pData->caCertFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
 			fp = fopen((const char*)pData->caCertFile, "r");
 			if(fp == NULL) {
-				char errStr[1024];
 				rs_strerror_r(errno, errStr, sizeof(errStr));
 				iRet = RS_RET_NO_FILE_ACCESS;
 				LogError(0, iRet,
@@ -872,6 +988,33 @@ CODESTARTnewActInst
 				ABORT_FINALIZE(iRet);
 			} else {
 				fclose(fp);
+				fp = NULL;
+			}
+		} else if(!strcmp(actpblk.descr[i].name, "tls.mycert")) {
+			pData->myCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+			fp = fopen((const char*)pData->myCertFile, "r");
+			if(fp == NULL) {
+				rs_strerror_r(errno, errStr, sizeof(errStr));
+				iRet = RS_RET_NO_FILE_ACCESS;
+				LogError(0, iRet,
+						"error: 'tls.mycert' file %s couldn't be accessed: %s\n",
+						pData->myCertFile, errStr);
+			} else {
+				fclose(fp);
+				fp = NULL;
+			}
+		} else if(!strcmp(actpblk.descr[i].name, "tls.myprivkey")) {
+			pData->myPrivKeyFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+			fp = fopen((const char*)pData->myPrivKeyFile, "r");
+			if(fp == NULL) {
+				rs_strerror_r(errno, errStr, sizeof(errStr));
+				iRet = RS_RET_NO_FILE_ACCESS;
+				LogError(0, iRet,
+						"error: 'tls.myprivkey' file %s couldn't be accessed: %s\n",
+						pData->myPrivKeyFile, errStr);
+			} else {
+				fclose(fp);
+				fp = NULL;
 			}
 		} else if(!strcmp(actpblk.descr[i].name, "allowunsignedcerts")) {
 			pData->allowUnsignedCerts = pvals[i].val.d.n;
@@ -883,7 +1026,6 @@ CODESTARTnewActInst
 			pData->tokenFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
 			fp = fopen((const char*)pData->tokenFile, "r");
 			if(fp == NULL) {
-				char errStr[1024];
 				rs_strerror_r(errno, errStr, sizeof(errStr));
 				iRet = RS_RET_NO_FILE_ACCESS;
 				LogError(0, iRet,
@@ -892,6 +1034,7 @@ CODESTARTnewActInst
 				ABORT_FINALIZE(iRet);
 			} else {
 				fclose(fp);
+				fp = NULL;
 			}
 		} else if(!strcmp(actpblk.descr[i].name, "annotation_match")) {
 			free_annotationmatch(&pData->annotation_match);
@@ -912,7 +1055,6 @@ CODESTARTnewActInst
 			pData->fnRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
 			fp = fopen((const char*)pData->fnRulebase, "r");
 			if(fp == NULL) {
-				char errStr[1024];
 				rs_strerror_r(errno, errStr, sizeof(errStr));
 				iRet = RS_RET_NO_FILE_ACCESS;
 				LogError(0, iRet,
@@ -921,6 +1063,7 @@ CODESTARTnewActInst
 				ABORT_FINALIZE(iRet);
 			} else {
 				fclose(fp);
+				fp = NULL;
 			}
 #if HAVE_LOADSAMPLESFROMSTRING == 1
 		} else if(!strcmp(modpblk.descr[i].name, "containerrules")) {
@@ -932,7 +1075,6 @@ CODESTARTnewActInst
 			pData->contRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
 			fp = fopen((const char*)pData->contRulebase, "r");
 			if(fp == NULL) {
-				char errStr[1024];
 				rs_strerror_r(errno, errStr, sizeof(errStr));
 				iRet = RS_RET_NO_FILE_ACCESS;
 				LogError(0, iRet,
@@ -941,7 +1083,10 @@ CODESTARTnewActInst
 				ABORT_FINALIZE(iRet);
 			} else {
 				fclose(fp);
+				fp = NULL;
 			}
+		} else if(!strcmp(actpblk.descr[i].name, "busyretryinterval")) {
+			pData->busyRetryInterval = pvals[i].val.d.n;
 		} else {
 			dbgprintf("mmkubernetes: program error, non-handled "
 				"param '%s' in action() block\n", actpblk.descr[i].name);
@@ -982,6 +1127,10 @@ CODESTARTnewActInst
 		pData->dstMetadataPath = (uchar *) strdup((char *) loadModConf->dstMetadataPath);
 	if(pData->caCertFile == NULL && loadModConf->caCertFile)
 		pData->caCertFile = (uchar *) strdup((char *) loadModConf->caCertFile);
+	if(pData->myCertFile == NULL && loadModConf->myCertFile)
+		pData->myCertFile = (uchar *) strdup((char *) loadModConf->myCertFile);
+	if(pData->myPrivKeyFile == NULL && loadModConf->myPrivKeyFile)
+		pData->myPrivKeyFile = (uchar *) strdup((char *) loadModConf->myPrivKeyFile);
 	if(pData->token == NULL && loadModConf->token)
 		pData->token = (uchar *) strdup((char *) loadModConf->token);
 	if(pData->tokenFile == NULL && loadModConf->tokenFile)
@@ -1018,6 +1167,8 @@ CODESTARTnewActInst
 CODE_STD_FINALIZERnewActInst
 	if(pvals != NULL)
 		cnfparamvalsDestruct(pvals, &actpblk);
+	if(fp)
+		fclose(fp);
 	free(rxstr);
 	free(srcMetadataPath);
 ENDnewActInst
@@ -1061,6 +1212,8 @@ CODESTARTfreeCnf
 	free(pModConf->srcMetadataPath);
 	free(pModConf->dstMetadataPath);
 	free(pModConf->caCertFile);
+	free(pModConf->myCertFile);
+	free(pModConf->myPrivKeyFile);
 	free(pModConf->token);
 	free(pModConf->tokenFile);
 	free(pModConf->de_dot_separator);
@@ -1069,8 +1222,11 @@ CODESTARTfreeCnf
 	free(pModConf->contRules);
 	free(pModConf->contRulebase);
 	free_annotationmatch(&pModConf->annotation_match);
-	for(i = 0; caches[i] != NULL; i++)
+	for(i = 0; caches[i] != NULL; i++) {
+		dbgprintf("mmkubernetes: freeing cache [%d] mdht [%p] nsht [%p]\n",
+				i, caches[i]->mdHt, caches[i]->nsHt);
 		cacheFree(caches[i]);
+	}
 	free(caches);
 ENDfreeCnf
 
@@ -1082,6 +1238,8 @@ CODESTARTdbgPrintInstInfo
 	dbgprintf("\tsrcMetadataPath='%s'\n", pData->srcMetadataDescr->name);
 	dbgprintf("\tdstMetadataPath='%s'\n", pData->dstMetadataPath);
 	dbgprintf("\ttls.cacert='%s'\n", pData->caCertFile);
+	dbgprintf("\ttls.mycert='%s'\n", pData->myCertFile);
+	dbgprintf("\ttls.myprivkey='%s'\n", pData->myPrivKeyFile);
 	dbgprintf("\tallowUnsignedCerts='%d'\n", pData->allowUnsignedCerts);
 	dbgprintf("\ttoken='%s'\n", pData->token);
 	dbgprintf("\ttokenFile='%s'\n", pData->tokenFile);
@@ -1093,6 +1251,7 @@ CODESTARTdbgPrintInstInfo
 	dbgprintf("\tfilenamerules='%s'\n", pData->fnRules);
 	dbgprintf("\tcontainerrules='%s'\n", pData->contRules);
 #endif
+	dbgprintf("\tbusyretryinterval='%d'\n", pData->busyRetryInterval);
 ENDdbgPrintInstInfo
 
 
@@ -1206,6 +1365,24 @@ queryKB(wrkrInstanceData_t *pWrkrData, char *url, struct json_object **rply)
 	struct json_object *jo;
 	long resp_code = 400;
 
+	if (pWrkrData->pData->cache->lastBusyTime) {
+		time_t now;
+		datetime.GetTime(&now);
+		now -= pWrkrData->pData->cache->lastBusyTime;
+		if (now < pWrkrData->pData->busyRetryInterval) {
+			LogMsg(0, RS_RET_RETRY, LOG_DEBUG,
+				"mmkubernetes: Waited [%ld] of [%d] seconds for the requested url [%s]\n",
+				now, pWrkrData->pData->busyRetryInterval, url);
+			ABORT_FINALIZE(RS_RET_RETRY);
+		} else {
+			LogMsg(0, RS_RET_OK, LOG_DEBUG,
+				"mmkubernetes: Cleared busy status after [%d] seconds - "
+				"will retry the requested url [%s]\n",
+				pWrkrData->pData->busyRetryInterval, url);
+			pWrkrData->pData->cache->lastBusyTime = 0;
+		}
+	}
+
 	/* query kubernetes for pod info */
 	ccode = curl_easy_setopt(pWrkrData->curlCtx, CURLOPT_URL, url);
 	if(ccode != CURLE_OK)
@@ -1238,17 +1415,23 @@ queryKB(wrkrInstanceData_t *pWrkrData, char *url, struct json_object **rply)
 		ABORT_FINALIZE(RS_RET_ERR);
 	}
 	if(resp_code == 404) {
-		LogMsg(0, RS_RET_ERR, LOG_ERR,
+		LogMsg(0, RS_RET_NOT_FOUND, LOG_INFO,
 			      "mmkubernetes: Not Found: the resource does not exist at url [%s]\n",
 			      url);
-		ABORT_FINALIZE(RS_RET_ERR);
+		ABORT_FINALIZE(RS_RET_NOT_FOUND);
 	}
 	if(resp_code == 429) {
-		LogMsg(0, RS_RET_ERR, LOG_ERR,
+		if (pWrkrData->pData->busyRetryInterval) {
+			time_t now;
+			datetime.GetTime(&now);
+			pWrkrData->pData->cache->lastBusyTime = now;
+		}
+
+		LogMsg(0, RS_RET_RETRY, LOG_INFO,
 			      "mmkubernetes: Too Many Requests: the server is too heavily loaded "
 			      "to provide the data for the requested url [%s]\n",
 			      url);
-		ABORT_FINALIZE(RS_RET_ERR);
+		ABORT_FINALIZE(RS_RET_RETRY);
 	}
 	if(resp_code != 200) {
 		LogMsg(0, RS_RET_ERR, LOG_ERR,
@@ -1299,12 +1482,14 @@ BEGINdoAction
 	char *mdKey = NULL;
 	struct json_object *jMetadata = NULL, *jMetadataCopy = NULL, *jMsgMeta = NULL,
 			*jo = NULL;
-	int add_ns_metadata = 0;
+	int add_pod_metadata = 1;
 CODESTARTdoAction
 	CHKiRet_Hdlr(extractMsgMetadata(pMsg, pWrkrData->pData, &jMsgMeta)) {
 		ABORT_FINALIZE((iRet == RS_RET_NOT_FOUND) ? RS_RET_OK : iRet);
 	}
 
+	STATSCOUNTER_INC(pWrkrData->k8sRecordSeen, pWrkrData->mutK8sRecordSeen);
+
 	if (fjson_object_object_get_ex(jMsgMeta, "pod_name", &jo))
 		podName = json_object_get_string(jo);
 	if (fjson_object_object_get_ex(jMsgMeta, "namespace_name", &jo))
@@ -1347,28 +1532,49 @@ CODESTARTdoAction
 			}
 			iRet = queryKB(pWrkrData, url, &jReply);
 			free(url);
-			/* todo: implement support for the .orphaned namespace */
-			if (iRet != RS_RET_OK) {
+			if (iRet == RS_RET_NOT_FOUND) {
+				/* negative cache namespace - make a dummy empty namespace metadata object */
+				jNsMeta = json_object_new_object();
+				STATSCOUNTER_INC(pWrkrData->namespaceMetadataNotFound,
+						 pWrkrData->mutNamespaceMetadataNotFound);
+			} else if (iRet == RS_RET_RETRY) {
+				/* server is busy - retry or error */
+				STATSCOUNTER_INC(pWrkrData->namespaceMetadataBusy,
+						 pWrkrData->mutNamespaceMetadataBusy);
+				if (0 == pWrkrData->pData->busyRetryInterval) {
+					pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
+					ABORT_FINALIZE(RS_RET_ERR);
+				}
+				add_pod_metadata = 0; /* don't cache pod metadata either - retry both */
+			} else if (iRet != RS_RET_OK) {
+				/* hard error - something the admin needs to fix e.g. network, config, auth */
 				json_object_put(jReply);
 				jReply = NULL;
+				STATSCOUNTER_INC(pWrkrData->namespaceMetadataError,
+						 pWrkrData->mutNamespaceMetadataError);
 				pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
 				FINALIZE;
-			}
-
-			if(fjson_object_object_get_ex(jReply, "metadata", &jNsMeta)) {
+			} else if (fjson_object_object_get_ex(jReply, "metadata", &jNsMeta)) {
 				jNsMeta = json_object_get(jNsMeta);
 				parse_labels_annotations(jNsMeta, &pWrkrData->pData->annotation_match,
 					pWrkrData->pData->de_dot,
 					(const char *)pWrkrData->pData->de_dot_separator,
 					pWrkrData->pData->de_dot_separator_len);
-				add_ns_metadata = 1;
+				STATSCOUNTER_INC(pWrkrData->namespaceMetadataSuccess,
+						 pWrkrData->mutNamespaceMetadataSuccess);
 			} else {
 				/* namespace with no metadata??? */
 				LogMsg(0, RS_RET_ERR, LOG_INFO,
 					      "mmkubernetes: namespace [%s] has no metadata!\n", ns);
-				jNsMeta = NULL;
+				/* negative cache namespace - make a dummy empty namespace metadata object */
+				jNsMeta = json_object_new_object();
+				STATSCOUNTER_INC(pWrkrData->namespaceMetadataSuccess,
+						 pWrkrData->mutNamespaceMetadataSuccess);
 			}
 
+			if(jNsMeta) {
+				hashtable_insert(pWrkrData->pData->cache->nsHt, strdup(ns), jNsMeta);
+			}
 			json_object_put(jReply);
 			jReply = NULL;
 		}
@@ -1381,14 +1587,28 @@ CODESTARTdoAction
 		}
 		iRet = queryKB(pWrkrData, url, &jReply);
 		free(url);
-		if(iRet != RS_RET_OK) {
-			if(jNsMeta && add_ns_metadata) {
-				hashtable_insert(pWrkrData->pData->cache->nsHt, strdup(ns), jNsMeta);
+		if (iRet == RS_RET_NOT_FOUND) {
+			/* negative cache pod - make a dummy empty pod metadata object */
+			iRet = RS_RET_OK;
+			STATSCOUNTER_INC(pWrkrData->podMetadataNotFound, pWrkrData->mutPodMetadataNotFound);
+		} else if (iRet == RS_RET_RETRY) {
+			/* server is busy - retry or error */
+			STATSCOUNTER_INC(pWrkrData->podMetadataBusy, pWrkrData->mutPodMetadataBusy);
+			if (0 == pWrkrData->pData->busyRetryInterval) {
+				pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
+				ABORT_FINALIZE(RS_RET_ERR);
 			}
+			add_pod_metadata = 0; /* do not cache so that we can retry */
+			iRet = RS_RET_OK;
+		} else if(iRet != RS_RET_OK) {
+			/* hard error - something the admin needs to fix e.g. network, config, auth */
 			json_object_put(jReply);
 			jReply = NULL;
+			STATSCOUNTER_INC(pWrkrData->podMetadataError, pWrkrData->mutPodMetadataError);
 			pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
 			FINALIZE;
+		} else {
+			STATSCOUNTER_INC(pWrkrData->podMetadataSuccess, pWrkrData->mutPodMetadataSuccess);
 		}
 
 		jo = json_object_new_object();
@@ -1435,11 +1655,9 @@ CODESTARTdoAction
 			json_object_object_add(jo, "container_id", json_object_get(jo2));
 		json_object_object_add(jMetadata, "docker", jo);
 
-		hashtable_insert(pWrkrData->pData->cache->mdHt, mdKey, jMetadata);
-		mdKey = NULL;
-		if(jNsMeta && add_ns_metadata) {
-			hashtable_insert(pWrkrData->pData->cache->nsHt, strdup(ns), jNsMeta);
-			ns = NULL;
+		if (add_pod_metadata) {
+			hashtable_insert(pWrkrData->pData->cache->mdHt, mdKey, jMetadata);
+			mdKey = NULL;
 		}
 	}
 
@@ -1450,6 +1668,11 @@ CODESTARTdoAction
 	 * outside of the cache lock
 	 */
 	jMetadataCopy = json_tokener_parse(json_object_get_string(jMetadata));
+	if (!add_pod_metadata) {
+		/* jMetadata object was created from scratch and not cached */
+		json_object_put(jMetadata);
+		jMetadata = NULL;
+	}
 	pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
 	/* the +1 is there to skip the leading '$' */
 	msgAddJSON(pMsg, (uchar *) pWrkrData->pData->dstMetadataPath + 1, jMetadataCopy, 0, 0);
@@ -1470,7 +1693,9 @@ BEGINmodExit
 CODESTARTmodExit
 	curl_global_cleanup();
 
+	objRelease(datetime, CORE_COMPONENT);
 	objRelease(regexp, LM_REGEXP_FILENAME);
+	objRelease(statsobj, CORE_COMPONENT);
 ENDmodExit
 
 
@@ -1489,8 +1714,9 @@ CODESTARTmodInit
 	*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
 CODEmodInit_QueryRegCFSLineHdlr
 	DBGPRINTF("mmkubernetes: module compiled with rsyslog version %s.\n", VERSION);
+	CHKiRet(objUse(statsobj, CORE_COMPONENT));
 	CHKiRet(objUse(regexp, LM_REGEXP_FILENAME));
-
+	CHKiRet(objUse(datetime, CORE_COMPONENT));
 	/* CURL_GLOBAL_ALL initializes more than is needed but the
 	 * libcurl documentation discourages use of other values
 	 */