From 3987cd929d859f900318b393133c3bdde8dfffd5 Mon Sep 17 00:00:00 2001 From: Rich Megginson Date: Tue, 28 Aug 2018 12:44:23 -0600 Subject: [PATCH] mmkubertnetes: action fails preparation cycle if kubernetes API destroys resource during bootup sequence The plugin was not handling 404 Not Found correctly when looking up pods and namespaces. In this case, we assume the pod/namespace was deleted, annotate the record with whatever metadata we have, and cache the fact that the pod/namespace is missing so we don't attempt to look it up again. In addition, the plugin was not handling error 429 Busy correctly. In this case, it should also annotate the record with whatever metadata it has, and _not_ cache anything. By default the plugin will retry every 5 seconds to connect to Kubernetes. This behavior is controlled by the new config param `busyretryinterval`. This commit also adds impstats counters so that admins can view the state of the plugin to see if the lookups are working or are returning errors. The stats are reported per-instance or per-action to facilitate using multiple different actions for different Kubernetes servers. This commit also adds support for client cert auth to Kubernetes via the two new config params `tls.mycert` and `tls.myprivkey`. --- contrib/mmkubernetes/mmkubernetes.c | 296 ++++++++++++++++++++++++---- 7 files changed, 160 insertions(+), 36 deletions(-) diff --git a/contrib/mmkubernetes/mmkubernetes.c b/contrib/mmkubernetes/mmkubernetes.c index 422cb2577..5bf5b049d 100644 --- a/contrib/mmkubernetes/mmkubernetes.c +++ b/contrib/mmkubernetes/mmkubernetes.c @@ -52,9 +52,12 @@ #include "syslogd-types.h" #include "module-template.h" #include "errmsg.h" +#include "statsobj.h" #include "regexp.h" #include "hashtable.h" #include "srUtils.h" +#include "unicode-helper.h" +#include "datetime.h" /* static data */ MODULE_TYPE_OUTPUT /* this is technically an output plugin */ @@ -62,6 +65,8 @@ MODULE_TYPE_KEEP /* releasing the module would cause a leak through libcurl */ MODULE_CNFNAME("mmkubernetes") DEF_OMOD_STATIC_DATA DEFobjCurrIf(regexp) +DEFobjCurrIf(statsobj) +DEFobjCurrIf(datetime) #define HAVE_LOADSAMPLESFROMSTRING 1 #if defined(NO_LOADSAMPLESFROMSTRING) @@ -95,12 +100,14 @@ DEFobjCurrIf(regexp) #define DFLT_CONTAINER_NAME "$!CONTAINER_NAME" /* name of variable holding CONTAINER_NAME value */ #define DFLT_CONTAINER_ID_FULL "$!CONTAINER_ID_FULL" /* name of variable holding CONTAINER_ID_FULL value */ #define DFLT_KUBERNETES_URL "https://kubernetes.default.svc.cluster.local:443" +#define DFLT_BUSY_RETRY_INTERVAL 5 /* retry every 5 seconds */ static struct cache_s { const uchar *kbUrl; struct hashtable *mdHt; struct hashtable *nsHt; pthread_mutex_t *cacheMtx; + int lastBusyTime; } **caches; typedef struct { @@ -116,6 +123,8 @@ struct modConfData_s { uchar *srcMetadataPath; /* where to get data for kubernetes queries */ uchar *dstMetadataPath; /* where to put metadata obtained from kubernetes */ uchar *caCertFile; /* File holding the CA cert (+optional chain) of CA that issued the Kubernetes server cert */ + uchar *myCertFile; /* File holding cert corresponding to private key used for client cert auth */ + uchar *myPrivKeyFile; /* File holding private key corresponding to cert used for client cert auth */ sbool allowUnsignedCerts; /* For testing/debugging - do not check for CA certs (CURLOPT_SSL_VERIFYPEER FALSE) */ uchar *token; /* The token value to use to authenticate to Kubernetes - takes precedence over tokenFile */ uchar *tokenFile; /* The file whose contents is the token value to use to authenticate to Kubernetes */ @@ -127,6 +136,7 @@ struct modConfData_s { uchar *fnRulebase; /* lognorm rulebase filename for container log filename match */ char *contRules; /* lognorm rules for CONTAINER_NAME value match */ uchar *contRulebase; /* lognorm rulebase filename for CONTAINER_NAME value match */ + int busyRetryInterval; /* how to handle 429 response - 0 means error, non-zero means retry every N seconds */ }; /* action (instance) configuration data */ @@ -135,6 +145,8 @@ typedef struct _instanceData { msgPropDescr_t *srcMetadataDescr; /* where to get data for kubernetes queries */ uchar *dstMetadataPath; /* where to put metadata obtained from kubernetes */ uchar *caCertFile; /* File holding the CA cert (+optional chain) of CA that issued the Kubernetes server cert */ + uchar *myCertFile; /* File holding cert corresponding to private key used for client cert auth */ + uchar *myPrivKeyFile; /* File holding private key corresponding to cert used for client cert auth */ sbool allowUnsignedCerts; /* For testing/debugging - do not check for CA certs (CURLOPT_SSL_VERIFYPEER FALSE) */ uchar *token; /* The token value to use to authenticate to Kubernetes - takes precedence over tokenFile */ uchar *tokenFile; /* The file whose contents is the token value to use to authenticate to Kubernetes */ @@ -151,6 +163,7 @@ typedef struct _instanceData { msgPropDescr_t *contNameDescr; /* CONTAINER_NAME field */ msgPropDescr_t *contIdFullDescr; /* CONTAINER_ID_FULL field */ struct cache_s *cache; + int busyRetryInterval; /* how to handle 429 response - 0 means error, non-zero means retry every N seconds */ } instanceData; typedef struct wrkrInstanceData { @@ -159,6 +172,16 @@ typedef struct wrkrInstanceData { struct curl_slist *curlHdr; char *curlRply; size_t curlRplyLen; + statsobj_t *stats; /* stats for this instance */ + STATSCOUNTER_DEF(k8sRecordSeen, mutK8sRecordSeen) + STATSCOUNTER_DEF(namespaceMetadataSuccess, mutNamespaceMetadataSuccess) + STATSCOUNTER_DEF(namespaceMetadataNotFound, mutNamespaceMetadataNotFound) + STATSCOUNTER_DEF(namespaceMetadataBusy, mutNamespaceMetadataBusy) + STATSCOUNTER_DEF(namespaceMetadataError, mutNamespaceMetadataError) + STATSCOUNTER_DEF(podMetadataSuccess, mutPodMetadataSuccess) + STATSCOUNTER_DEF(podMetadataNotFound, mutPodMetadataNotFound) + STATSCOUNTER_DEF(podMetadataBusy, mutPodMetadataBusy) + STATSCOUNTER_DEF(podMetadataError, mutPodMetadataError) } wrkrInstanceData_t; /* module parameters (v6 config format) */ @@ -167,6 +190,8 @@ static struct cnfparamdescr modpdescr[] = { { "srcmetadatapath", eCmdHdlrString, 0 }, { "dstmetadatapath", eCmdHdlrString, 0 }, { "tls.cacert", eCmdHdlrString, 0 }, + { "tls.mycert", eCmdHdlrString, 0 }, + { "tls.myprivkey", eCmdHdlrString, 0 }, { "allowunsignedcerts", eCmdHdlrBinary, 0 }, { "token", eCmdHdlrString, 0 }, { "tokenfile", eCmdHdlrString, 0 }, @@ -174,7 +199,8 @@ static struct cnfparamdescr modpdescr[] = { { "de_dot", eCmdHdlrBinary, 0 }, { "de_dot_separator", eCmdHdlrString, 0 }, { "filenamerulebase", eCmdHdlrString, 0 }, - { "containerrulebase", eCmdHdlrString, 0 } + { "containerrulebase", eCmdHdlrString, 0 }, + { "busyretryinterval", eCmdHdlrInt, 0 } #if HAVE_LOADSAMPLESFROMSTRING == 1 , { "filenamerules", eCmdHdlrArray, 0 }, @@ -193,6 +219,8 @@ static struct cnfparamdescr actpdescr[] = { { "srcmetadatapath", eCmdHdlrString, 0 }, { "dstmetadatapath", eCmdHdlrString, 0 }, { "tls.cacert", eCmdHdlrString, 0 }, + { "tls.mycert", eCmdHdlrString, 0 }, + { "tls.myprivkey", eCmdHdlrString, 0 }, { "allowunsignedcerts", eCmdHdlrBinary, 0 }, { "token", eCmdHdlrString, 0 }, { "tokenfile", eCmdHdlrString, 0 }, @@ -200,7 +228,8 @@ static struct cnfparamdescr actpdescr[] = { { "de_dot", eCmdHdlrBinary, 0 }, { "de_dot_separator", eCmdHdlrString, 0 }, { "filenamerulebase", eCmdHdlrString, 0 }, - { "containerrulebase", eCmdHdlrString, 0 } + { "containerrulebase", eCmdHdlrString, 0 }, + { "busyretryinterval", eCmdHdlrInt, 0 } #if HAVE_LOADSAMPLESFROMSTRING == 1 , { "filenamerules", eCmdHdlrArray, 0 }, @@ -493,8 +522,9 @@ ENDbeginCnfLoad BEGINsetModCnf struct cnfparamvals *pvals = NULL; int i; - FILE *fp; + FILE *fp = NULL; int ret; + char errStr[1024]; CODESTARTsetModCnf pvals = nvlstGetParams(lst, &modpblk, NULL); if(pvals == NULL) { @@ -509,6 +539,7 @@ CODESTARTsetModCnf } loadModConf->de_dot = DFLT_DE_DOT; + loadModConf->busyRetryInterval = DFLT_BUSY_RETRY_INTERVAL; for(i = 0 ; i < modpblk.nParams ; ++i) { if(!pvals[i].bUsed) { continue; @@ -528,15 +559,42 @@ CODESTARTsetModCnf loadModConf->caCertFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); fp = fopen((const char*)loadModConf->caCertFile, "r"); if(fp == NULL) { - char errStr[1024]; rs_strerror_r(errno, errStr, sizeof(errStr)); iRet = RS_RET_NO_FILE_ACCESS; LogError(0, iRet, - "error: certificate file %s couldn't be accessed: %s\n", + "error: 'tls.cacert' file %s couldn't be accessed: %s\n", loadModConf->caCertFile, errStr); ABORT_FINALIZE(iRet); } else { fclose(fp); + fp = NULL; + } + } else if(!strcmp(modpblk.descr[i].name, "tls.mycert")) { + free(loadModConf->myCertFile); + loadModConf->myCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + fp = fopen((const char*)loadModConf->myCertFile, "r"); + if(fp == NULL) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + iRet = RS_RET_NO_FILE_ACCESS; + LogError(0, iRet, + "error: 'tls.mycert' file %s couldn't be accessed: %s\n", + loadModConf->myCertFile, errStr); + } else { + fclose(fp); + fp = NULL; + } + } else if(!strcmp(modpblk.descr[i].name, "tls.myprivkey")) { + loadModConf->myPrivKeyFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + fp = fopen((const char*)loadModConf->myPrivKeyFile, "r"); + if(fp == NULL) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + iRet = RS_RET_NO_FILE_ACCESS; + LogError(0, iRet, + "error: 'tls.myprivkey' file %s couldn't be accessed: %s\n", + loadModConf->myPrivKeyFile, errStr); + } else { + fclose(fp); + fp = NULL; } } else if(!strcmp(modpblk.descr[i].name, "allowunsignedcerts")) { loadModConf->allowUnsignedCerts = pvals[i].val.d.n; @@ -548,7 +606,6 @@ CODESTARTsetModCnf loadModConf->tokenFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); fp = fopen((const char*)loadModConf->tokenFile, "r"); if(fp == NULL) { - char errStr[1024]; rs_strerror_r(errno, errStr, sizeof(errStr)); iRet = RS_RET_NO_FILE_ACCESS; LogError(0, iRet, @@ -557,6 +614,7 @@ CODESTARTsetModCnf ABORT_FINALIZE(iRet); } else { fclose(fp); + fp = NULL; } } else if(!strcmp(modpblk.descr[i].name, "annotation_match")) { free_annotationmatch(&loadModConf->annotation_match); @@ -577,7 +635,6 @@ CODESTARTsetModCnf loadModConf->fnRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); fp = fopen((const char*)loadModConf->fnRulebase, "r"); if(fp == NULL) { - char errStr[1024]; rs_strerror_r(errno, errStr, sizeof(errStr)); iRet = RS_RET_NO_FILE_ACCESS; LogError(0, iRet, @@ -586,6 +643,7 @@ CODESTARTsetModCnf ABORT_FINALIZE(iRet); } else { fclose(fp); + fp = NULL; } #if HAVE_LOADSAMPLESFROMSTRING == 1 } else if(!strcmp(modpblk.descr[i].name, "containerrules")) { @@ -597,7 +655,6 @@ CODESTARTsetModCnf loadModConf->contRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); fp = fopen((const char*)loadModConf->contRulebase, "r"); if(fp == NULL) { - char errStr[1024]; rs_strerror_r(errno, errStr, sizeof(errStr)); iRet = RS_RET_NO_FILE_ACCESS; LogError(0, iRet, @@ -606,7 +663,10 @@ CODESTARTsetModCnf ABORT_FINALIZE(iRet); } else { fclose(fp); + fp = NULL; } + } else if(!strcmp(modpblk.descr[i].name, "busyretryinterval")) { + loadModConf->busyRetryInterval = pvals[i].val.d.n; } else { dbgprintf("mmkubernetes: program error, non-handled " "param '%s' in module() block\n", modpblk.descr[i].name); @@ -650,6 +710,8 @@ CODESTARTsetModCnf caches = calloc(1, sizeof(struct cache_s *)); finalize_it: + if (fp) + fclose(fp); if(pvals != NULL) cnfparamvalsDestruct(pvals, &modpblk); ENDsetModCnf @@ -667,6 +729,8 @@ CODESTARTfreeInstance free(pData->srcMetadataDescr); free(pData->dstMetadataPath); free(pData->caCertFile); + free(pData->myCertFile); + free(pData->myPrivKeyFile); free(pData->token); free(pData->tokenFile); free(pData->fnRules); @@ -710,6 +774,45 @@ CODESTARTcreateWrkrInstance char *tokenHdr = NULL; FILE *fp = NULL; char *token = NULL; + char *statsName = NULL; + + CHKiRet(statsobj.Construct(&(pWrkrData->stats))); + if ((-1 == asprintf(&statsName, "mmkubernetes(%s)", pWrkrData->pData->kubernetesUrl)) || + (!statsName)) { + ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY); + } + CHKiRet(statsobj.SetName(pWrkrData->stats, (uchar *)statsName)); + free(statsName); + statsName = NULL; + CHKiRet(statsobj.SetOrigin(pWrkrData->stats, UCHAR_CONSTANT("mmkubernetes"))); + STATSCOUNTER_INIT(pWrkrData->k8sRecordSeen, pWrkrData->mutK8sRecordSeen); + CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("recordseen"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->k8sRecordSeen))); + STATSCOUNTER_INIT(pWrkrData->namespaceMetadataSuccess, pWrkrData->mutNamespaceMetadataSuccess); + CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("namespacemetadatasuccess"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->namespaceMetadataSuccess))); + STATSCOUNTER_INIT(pWrkrData->namespaceMetadataNotFound, pWrkrData->mutNamespaceMetadataNotFound); + CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("namespacemetadatanotfound"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->namespaceMetadataNotFound))); + STATSCOUNTER_INIT(pWrkrData->namespaceMetadataBusy, pWrkrData->mutNamespaceMetadataBusy); + CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("namespacemetadatabusy"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->namespaceMetadataBusy))); + STATSCOUNTER_INIT(pWrkrData->namespaceMetadataError, pWrkrData->mutNamespaceMetadataError); + CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("namespacemetadataerror"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->namespaceMetadataError))); + STATSCOUNTER_INIT(pWrkrData->podMetadataSuccess, pWrkrData->mutPodMetadataSuccess); + CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("podmetadatasuccess"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->podMetadataSuccess))); + STATSCOUNTER_INIT(pWrkrData->podMetadataNotFound, pWrkrData->mutPodMetadataNotFound); + CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("podmetadatanotfound"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->podMetadataNotFound))); + STATSCOUNTER_INIT(pWrkrData->podMetadataBusy, pWrkrData->mutPodMetadataBusy); + CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("podmetadatabusy"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->podMetadataBusy))); + STATSCOUNTER_INIT(pWrkrData->podMetadataError, pWrkrData->mutPodMetadataError); + CHKiRet(statsobj.AddCounter(pWrkrData->stats, UCHAR_CONSTANT("podmetadataerror"), + ctrType_IntCtr, CTR_FLAG_RESETTABLE, &(pWrkrData->podMetadataError))); + CHKiRet(statsobj.ConstructFinalize(pWrkrData->stats)); hdr = curl_slist_append(hdr, "Content-Type: text/json; charset=utf-8"); if (pWrkrData->pData->token) { @@ -749,12 +852,20 @@ CODESTARTcreateWrkrInstance curl_easy_setopt(ctx, CURLOPT_WRITEDATA, pWrkrData); if(pWrkrData->pData->caCertFile) curl_easy_setopt(ctx, CURLOPT_CAINFO, pWrkrData->pData->caCertFile); + if(pWrkrData->pData->myCertFile) + curl_easy_setopt(ctx, CURLOPT_SSLCERT, pWrkrData->pData->myCertFile); + if(pWrkrData->pData->myPrivKeyFile) + curl_easy_setopt(ctx, CURLOPT_SSLKEY, pWrkrData->pData->myPrivKeyFile); if(pWrkrData->pData->allowUnsignedCerts) curl_easy_setopt(ctx, CURLOPT_SSL_VERIFYPEER, 0); pWrkrData->curlCtx = ctx; finalize_it: free(token); + free(statsName); + if ((iRet != RS_RET_OK) && pWrkrData->stats) { + statsobj.Destruct(&(pWrkrData->stats)); + } if (fp) { fclose(fp); } @@ -765,6 +876,7 @@ BEGINfreeWrkrInstance CODESTARTfreeWrkrInstance curl_easy_cleanup(pWrkrData->curlCtx); curl_slist_free_all(pWrkrData->curlHdr); + statsobj.Destruct(&(pWrkrData->stats)); ENDfreeWrkrInstance @@ -790,6 +902,8 @@ cacheNew(const uchar *const url) key_equals_string, hashtable_json_object_put); cache->nsHt = create_hashtable(100, hash_from_string, key_equals_string, hashtable_json_object_put); + dbgprintf("mmkubernetes: created cache mdht [%p] nsht [%p]\n", + cache->mdHt, cache->nsHt); cache->cacheMtx = malloc(sizeof(pthread_mutex_t)); if (!cache->mdHt || !cache->nsHt || !cache->cacheMtx) { free (cache); @@ -797,6 +911,7 @@ cacheNew(const uchar *const url) FINALIZE; } pthread_mutex_init(cache->cacheMtx, NULL); + cache->lastBusyTime = 0; finalize_it: return cache; @@ -816,9 +931,10 @@ static void cacheFree(struct cache_s *cache) BEGINnewActInst struct cnfparamvals *pvals = NULL; int i; - FILE *fp; + FILE *fp = NULL; char *rxstr = NULL; char *srcMetadataPath = NULL; + char errStr[1024]; CODESTARTnewActInst DBGPRINTF("newActInst (mmkubernetes)\n"); @@ -840,6 +956,7 @@ CODESTARTnewActInst pData->de_dot = loadModConf->de_dot; pData->allowUnsignedCerts = loadModConf->allowUnsignedCerts; + pData->busyRetryInterval = loadModConf->busyRetryInterval; for(i = 0 ; i < actpblk.nParams ; ++i) { if(!pvals[i].bUsed) { continue; @@ -863,7 +980,6 @@ CODESTARTnewActInst pData->caCertFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); fp = fopen((const char*)pData->caCertFile, "r"); if(fp == NULL) { - char errStr[1024]; rs_strerror_r(errno, errStr, sizeof(errStr)); iRet = RS_RET_NO_FILE_ACCESS; LogError(0, iRet, @@ -872,6 +988,33 @@ CODESTARTnewActInst ABORT_FINALIZE(iRet); } else { fclose(fp); + fp = NULL; + } + } else if(!strcmp(actpblk.descr[i].name, "tls.mycert")) { + pData->myCertFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + fp = fopen((const char*)pData->myCertFile, "r"); + if(fp == NULL) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + iRet = RS_RET_NO_FILE_ACCESS; + LogError(0, iRet, + "error: 'tls.mycert' file %s couldn't be accessed: %s\n", + pData->myCertFile, errStr); + } else { + fclose(fp); + fp = NULL; + } + } else if(!strcmp(actpblk.descr[i].name, "tls.myprivkey")) { + pData->myPrivKeyFile = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL); + fp = fopen((const char*)pData->myPrivKeyFile, "r"); + if(fp == NULL) { + rs_strerror_r(errno, errStr, sizeof(errStr)); + iRet = RS_RET_NO_FILE_ACCESS; + LogError(0, iRet, + "error: 'tls.myprivkey' file %s couldn't be accessed: %s\n", + pData->myPrivKeyFile, errStr); + } else { + fclose(fp); + fp = NULL; } } else if(!strcmp(actpblk.descr[i].name, "allowunsignedcerts")) { pData->allowUnsignedCerts = pvals[i].val.d.n; @@ -883,7 +1026,6 @@ CODESTARTnewActInst pData->tokenFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); fp = fopen((const char*)pData->tokenFile, "r"); if(fp == NULL) { - char errStr[1024]; rs_strerror_r(errno, errStr, sizeof(errStr)); iRet = RS_RET_NO_FILE_ACCESS; LogError(0, iRet, @@ -892,6 +1034,7 @@ CODESTARTnewActInst ABORT_FINALIZE(iRet); } else { fclose(fp); + fp = NULL; } } else if(!strcmp(actpblk.descr[i].name, "annotation_match")) { free_annotationmatch(&pData->annotation_match); @@ -912,7 +1055,6 @@ CODESTARTnewActInst pData->fnRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); fp = fopen((const char*)pData->fnRulebase, "r"); if(fp == NULL) { - char errStr[1024]; rs_strerror_r(errno, errStr, sizeof(errStr)); iRet = RS_RET_NO_FILE_ACCESS; LogError(0, iRet, @@ -921,6 +1063,7 @@ CODESTARTnewActInst ABORT_FINALIZE(iRet); } else { fclose(fp); + fp = NULL; } #if HAVE_LOADSAMPLESFROMSTRING == 1 } else if(!strcmp(modpblk.descr[i].name, "containerrules")) { @@ -932,7 +1075,6 @@ CODESTARTnewActInst pData->contRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL); fp = fopen((const char*)pData->contRulebase, "r"); if(fp == NULL) { - char errStr[1024]; rs_strerror_r(errno, errStr, sizeof(errStr)); iRet = RS_RET_NO_FILE_ACCESS; LogError(0, iRet, @@ -941,7 +1083,10 @@ CODESTARTnewActInst ABORT_FINALIZE(iRet); } else { fclose(fp); + fp = NULL; } + } else if(!strcmp(actpblk.descr[i].name, "busyretryinterval")) { + pData->busyRetryInterval = pvals[i].val.d.n; } else { dbgprintf("mmkubernetes: program error, non-handled " "param '%s' in action() block\n", actpblk.descr[i].name); @@ -982,6 +1127,10 @@ CODESTARTnewActInst pData->dstMetadataPath = (uchar *) strdup((char *) loadModConf->dstMetadataPath); if(pData->caCertFile == NULL && loadModConf->caCertFile) pData->caCertFile = (uchar *) strdup((char *) loadModConf->caCertFile); + if(pData->myCertFile == NULL && loadModConf->myCertFile) + pData->myCertFile = (uchar *) strdup((char *) loadModConf->myCertFile); + if(pData->myPrivKeyFile == NULL && loadModConf->myPrivKeyFile) + pData->myPrivKeyFile = (uchar *) strdup((char *) loadModConf->myPrivKeyFile); if(pData->token == NULL && loadModConf->token) pData->token = (uchar *) strdup((char *) loadModConf->token); if(pData->tokenFile == NULL && loadModConf->tokenFile) @@ -1018,6 +1167,8 @@ CODESTARTnewActInst CODE_STD_FINALIZERnewActInst if(pvals != NULL) cnfparamvalsDestruct(pvals, &actpblk); + if(fp) + fclose(fp); free(rxstr); free(srcMetadataPath); ENDnewActInst @@ -1061,6 +1212,8 @@ CODESTARTfreeCnf free(pModConf->srcMetadataPath); free(pModConf->dstMetadataPath); free(pModConf->caCertFile); + free(pModConf->myCertFile); + free(pModConf->myPrivKeyFile); free(pModConf->token); free(pModConf->tokenFile); free(pModConf->de_dot_separator); @@ -1069,8 +1222,11 @@ CODESTARTfreeCnf free(pModConf->contRules); free(pModConf->contRulebase); free_annotationmatch(&pModConf->annotation_match); - for(i = 0; caches[i] != NULL; i++) + for(i = 0; caches[i] != NULL; i++) { + dbgprintf("mmkubernetes: freeing cache [%d] mdht [%p] nsht [%p]\n", + i, caches[i]->mdHt, caches[i]->nsHt); cacheFree(caches[i]); + } free(caches); ENDfreeCnf @@ -1082,6 +1238,8 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tsrcMetadataPath='%s'\n", pData->srcMetadataDescr->name); dbgprintf("\tdstMetadataPath='%s'\n", pData->dstMetadataPath); dbgprintf("\ttls.cacert='%s'\n", pData->caCertFile); + dbgprintf("\ttls.mycert='%s'\n", pData->myCertFile); + dbgprintf("\ttls.myprivkey='%s'\n", pData->myPrivKeyFile); dbgprintf("\tallowUnsignedCerts='%d'\n", pData->allowUnsignedCerts); dbgprintf("\ttoken='%s'\n", pData->token); dbgprintf("\ttokenFile='%s'\n", pData->tokenFile); @@ -1093,6 +1251,7 @@ CODESTARTdbgPrintInstInfo dbgprintf("\tfilenamerules='%s'\n", pData->fnRules); dbgprintf("\tcontainerrules='%s'\n", pData->contRules); #endif + dbgprintf("\tbusyretryinterval='%d'\n", pData->busyRetryInterval); ENDdbgPrintInstInfo @@ -1206,6 +1365,24 @@ queryKB(wrkrInstanceData_t *pWrkrData, char *url, struct json_object **rply) struct json_object *jo; long resp_code = 400; + if (pWrkrData->pData->cache->lastBusyTime) { + time_t now; + datetime.GetTime(&now); + now -= pWrkrData->pData->cache->lastBusyTime; + if (now < pWrkrData->pData->busyRetryInterval) { + LogMsg(0, RS_RET_RETRY, LOG_DEBUG, + "mmkubernetes: Waited [%ld] of [%d] seconds for the requested url [%s]\n", + now, pWrkrData->pData->busyRetryInterval, url); + ABORT_FINALIZE(RS_RET_RETRY); + } else { + LogMsg(0, RS_RET_OK, LOG_DEBUG, + "mmkubernetes: Cleared busy status after [%d] seconds - " + "will retry the requested url [%s]\n", + pWrkrData->pData->busyRetryInterval, url); + pWrkrData->pData->cache->lastBusyTime = 0; + } + } + /* query kubernetes for pod info */ ccode = curl_easy_setopt(pWrkrData->curlCtx, CURLOPT_URL, url); if(ccode != CURLE_OK) @@ -1238,17 +1415,23 @@ queryKB(wrkrInstanceData_t *pWrkrData, char *url, struct json_object **rply) ABORT_FINALIZE(RS_RET_ERR); } if(resp_code == 404) { - LogMsg(0, RS_RET_ERR, LOG_ERR, + LogMsg(0, RS_RET_NOT_FOUND, LOG_INFO, "mmkubernetes: Not Found: the resource does not exist at url [%s]\n", url); - ABORT_FINALIZE(RS_RET_ERR); + ABORT_FINALIZE(RS_RET_NOT_FOUND); } if(resp_code == 429) { - LogMsg(0, RS_RET_ERR, LOG_ERR, + if (pWrkrData->pData->busyRetryInterval) { + time_t now; + datetime.GetTime(&now); + pWrkrData->pData->cache->lastBusyTime = now; + } + + LogMsg(0, RS_RET_RETRY, LOG_INFO, "mmkubernetes: Too Many Requests: the server is too heavily loaded " "to provide the data for the requested url [%s]\n", url); - ABORT_FINALIZE(RS_RET_ERR); + ABORT_FINALIZE(RS_RET_RETRY); } if(resp_code != 200) { LogMsg(0, RS_RET_ERR, LOG_ERR, @@ -1299,12 +1482,14 @@ BEGINdoAction char *mdKey = NULL; struct json_object *jMetadata = NULL, *jMetadataCopy = NULL, *jMsgMeta = NULL, *jo = NULL; - int add_ns_metadata = 0; + int add_pod_metadata = 1; CODESTARTdoAction CHKiRet_Hdlr(extractMsgMetadata(pMsg, pWrkrData->pData, &jMsgMeta)) { ABORT_FINALIZE((iRet == RS_RET_NOT_FOUND) ? RS_RET_OK : iRet); } + STATSCOUNTER_INC(pWrkrData->k8sRecordSeen, pWrkrData->mutK8sRecordSeen); + if (fjson_object_object_get_ex(jMsgMeta, "pod_name", &jo)) podName = json_object_get_string(jo); if (fjson_object_object_get_ex(jMsgMeta, "namespace_name", &jo)) @@ -1347,28 +1532,49 @@ CODESTARTdoAction } iRet = queryKB(pWrkrData, url, &jReply); free(url); - /* todo: implement support for the .orphaned namespace */ - if (iRet != RS_RET_OK) { + if (iRet == RS_RET_NOT_FOUND) { + /* negative cache namespace - make a dummy empty namespace metadata object */ + jNsMeta = json_object_new_object(); + STATSCOUNTER_INC(pWrkrData->namespaceMetadataNotFound, + pWrkrData->mutNamespaceMetadataNotFound); + } else if (iRet == RS_RET_RETRY) { + /* server is busy - retry or error */ + STATSCOUNTER_INC(pWrkrData->namespaceMetadataBusy, + pWrkrData->mutNamespaceMetadataBusy); + if (0 == pWrkrData->pData->busyRetryInterval) { + pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx); + ABORT_FINALIZE(RS_RET_ERR); + } + add_pod_metadata = 0; /* don't cache pod metadata either - retry both */ + } else if (iRet != RS_RET_OK) { + /* hard error - something the admin needs to fix e.g. network, config, auth */ json_object_put(jReply); jReply = NULL; + STATSCOUNTER_INC(pWrkrData->namespaceMetadataError, + pWrkrData->mutNamespaceMetadataError); pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx); FINALIZE; - } - - if(fjson_object_object_get_ex(jReply, "metadata", &jNsMeta)) { + } else if (fjson_object_object_get_ex(jReply, "metadata", &jNsMeta)) { jNsMeta = json_object_get(jNsMeta); parse_labels_annotations(jNsMeta, &pWrkrData->pData->annotation_match, pWrkrData->pData->de_dot, (const char *)pWrkrData->pData->de_dot_separator, pWrkrData->pData->de_dot_separator_len); - add_ns_metadata = 1; + STATSCOUNTER_INC(pWrkrData->namespaceMetadataSuccess, + pWrkrData->mutNamespaceMetadataSuccess); } else { /* namespace with no metadata??? */ LogMsg(0, RS_RET_ERR, LOG_INFO, "mmkubernetes: namespace [%s] has no metadata!\n", ns); - jNsMeta = NULL; + /* negative cache namespace - make a dummy empty namespace metadata object */ + jNsMeta = json_object_new_object(); + STATSCOUNTER_INC(pWrkrData->namespaceMetadataSuccess, + pWrkrData->mutNamespaceMetadataSuccess); } + if(jNsMeta) { + hashtable_insert(pWrkrData->pData->cache->nsHt, strdup(ns), jNsMeta); + } json_object_put(jReply); jReply = NULL; } @@ -1381,14 +1587,28 @@ CODESTARTdoAction } iRet = queryKB(pWrkrData, url, &jReply); free(url); - if(iRet != RS_RET_OK) { - if(jNsMeta && add_ns_metadata) { - hashtable_insert(pWrkrData->pData->cache->nsHt, strdup(ns), jNsMeta); + if (iRet == RS_RET_NOT_FOUND) { + /* negative cache pod - make a dummy empty pod metadata object */ + iRet = RS_RET_OK; + STATSCOUNTER_INC(pWrkrData->podMetadataNotFound, pWrkrData->mutPodMetadataNotFound); + } else if (iRet == RS_RET_RETRY) { + /* server is busy - retry or error */ + STATSCOUNTER_INC(pWrkrData->podMetadataBusy, pWrkrData->mutPodMetadataBusy); + if (0 == pWrkrData->pData->busyRetryInterval) { + pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx); + ABORT_FINALIZE(RS_RET_ERR); } + add_pod_metadata = 0; /* do not cache so that we can retry */ + iRet = RS_RET_OK; + } else if(iRet != RS_RET_OK) { + /* hard error - something the admin needs to fix e.g. network, config, auth */ json_object_put(jReply); jReply = NULL; + STATSCOUNTER_INC(pWrkrData->podMetadataError, pWrkrData->mutPodMetadataError); pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx); FINALIZE; + } else { + STATSCOUNTER_INC(pWrkrData->podMetadataSuccess, pWrkrData->mutPodMetadataSuccess); } jo = json_object_new_object(); @@ -1435,11 +1655,9 @@ CODESTARTdoAction json_object_object_add(jo, "container_id", json_object_get(jo2)); json_object_object_add(jMetadata, "docker", jo); - hashtable_insert(pWrkrData->pData->cache->mdHt, mdKey, jMetadata); - mdKey = NULL; - if(jNsMeta && add_ns_metadata) { - hashtable_insert(pWrkrData->pData->cache->nsHt, strdup(ns), jNsMeta); - ns = NULL; + if (add_pod_metadata) { + hashtable_insert(pWrkrData->pData->cache->mdHt, mdKey, jMetadata); + mdKey = NULL; } } @@ -1450,6 +1668,11 @@ CODESTARTdoAction * outside of the cache lock */ jMetadataCopy = json_tokener_parse(json_object_get_string(jMetadata)); + if (!add_pod_metadata) { + /* jMetadata object was created from scratch and not cached */ + json_object_put(jMetadata); + jMetadata = NULL; + } pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx); /* the +1 is there to skip the leading '$' */ msgAddJSON(pMsg, (uchar *) pWrkrData->pData->dstMetadataPath + 1, jMetadataCopy, 0, 0); @@ -1470,7 +1693,9 @@ BEGINmodExit CODESTARTmodExit curl_global_cleanup(); + objRelease(datetime, CORE_COMPONENT); objRelease(regexp, LM_REGEXP_FILENAME); + objRelease(statsobj, CORE_COMPONENT); ENDmodExit @@ -1489,8 +1714,9 @@ CODESTARTmodInit *ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */ CODEmodInit_QueryRegCFSLineHdlr DBGPRINTF("mmkubernetes: module compiled with rsyslog version %s.\n", VERSION); + CHKiRet(objUse(statsobj, CORE_COMPONENT)); CHKiRet(objUse(regexp, LM_REGEXP_FILENAME)); - + CHKiRet(objUse(datetime, CORE_COMPONENT)); /* CURL_GLOBAL_ALL initializes more than is needed but the * libcurl documentation discourages use of other values */