Blame SOURCES/rsyslog-8.24.0-rhbz1539193-mmkubernetes-new-plugin.patch

f656cf
From: Jiri Vymazal <jvymazal@redhat.com>
f656cf
Date: Mon, 28 Jun 2018 12:07:55 +0100
f656cf
Subject: Kubernetes Metadata plugin - mmkubernetes
f656cf
f656cf
This plugin is used to annotate records logged by Kubernetes containers.
f656cf
It will add the namespace uuid, pod uuid, pod and namespace labels and
f656cf
annotations, and other metadata associated with the pod and namespace.
f656cf
It will work with either log files in `/var/log/containers/*.log` or
f656cf
with journald entries with `CONTAINER_NAME` and `CONTAINER_ID_FULL`.
f656cf
f656cf
For usage and configuration see syslog-doc
f656cf
f656cf
*Credits*
f656cf
f656cf
This work is based on https://github.com/fabric8io/fluent-plugin-kubernetes_metadata_filter
f656cf
and has many of the same features.
f656cf
f656cf
(cherry picked from commit a6264bf8f91975c9bc0fc602dcdc6881486f1579)
f656cf
(cherry picked from commit b8e68366422052dca9e0a9409baa410f20ae88c8)
f656cf
f656cf
(cherry picked from commit 77886e21292d8220f93b3404236da0e8f7159255)
f656cf
(cherry picked from commit e4d1c7b3832eedc8a1545c2ee6bf022f545d0c76)
f656cf
(cherry picked from commit 3d9f820642b0edc78da0b5bed818590dcd31fa9c)
f656cf
(cherry picked from commit 1d49aac5cb101704486bfb065fac362ca69f06bc)
f656cf
(cherry picked from commit fc2ad45f78dd666b8c9e706ad88c17aaff146d2d)
f656cf
(cherry picked from commit 8cf87f64f6c74a4544112ec7fddc5bf4d43319a7)
f656cf
---
f656cf
 Makefile.am                                      |    5 +
f656cf
 configure.ac                                     |   35 +
f656cf
 contrib/mmkubernetes/Makefile.am                 |    6 +
f656cf
 contrib/mmkubernetes/k8s_container_name.rulebase |    3 +
f656cf
 contrib/mmkubernetes/k8s_filename.rulebase       |    2 +
f656cf
 contrib/mmkubernetes/mmkubernetes.c              | 1491 +++++++++++++++++++++++
f656cf
 contrib/mmkubernetes/sample.conf                 |    7 +
f656cf
 7 files changed, 1549 insertions(+)
f656cf
 create mode 100644 contrib/mmkubernetes/Makefile.am
f656cf
 create mode 100644 contrib/mmkubernetes/k8s_container_name.rulebase
f656cf
 create mode 100644 contrib/mmkubernetes/k8s_filename.rulebase
f656cf
 create mode 100644 contrib/mmkubernetes/mmkubernetes.c
f656cf
 create mode 100644 contrib/mmkubernetes/sample.conf
f656cf
f656cf
diff --git a/Makefile.am b/Makefile.am
f656cf
index a276ef9ea..b58ebaf93 100644
f656cf
--- a/Makefile.am
f656cf
+++ b/Makefile.am
f656cf
@@ -275,6 +275,11 @@ if ENABLE_OMTCL
f656cf
 SUBDIRS += contrib/omtcl
f656cf
 endif
f656cf
 
f656cf
+# mmkubernetes
f656cf
+if ENABLE_MMKUBERNETES
f656cf
+SUBDIRS += contrib/mmkubernetes
f656cf
+endif
f656cf
+
f656cf
 # tests are added as last element, because tests may need different
f656cf
 # modules that need to be generated first
f656cf
 SUBDIRS += tests
f656cf
diff --git a/configure.ac b/configure.ac
f656cf
index a9411f4be..c664222b9 100644
f656cf
--- a/configure.ac
f656cf
+++ b/configure.ac
f656cf
@@ -1889,6 +1889,39 @@ AM_CONDITIONAL(ENABLE_OMTCL, test x$enable_omtcl = xyes)
f656cf
 
f656cf
 # END TCL SUPPORT
f656cf
 
f656cf
+# mmkubernetes - Kubernetes metadata support
f656cf
+
f656cf
+AC_ARG_ENABLE(mmkubernetes,
f656cf
+        [AS_HELP_STRING([--enable-mmkubernetes],
f656cf
+            [Enable compilation of the mmkubernetes module @<:@default=no@:>@])],
f656cf
+        [case "${enableval}" in
f656cf
+         yes) enable_mmkubernetes="yes" ;;
f656cf
+          no) enable_mmkubernetes="no" ;;
f656cf
+           *) AC_MSG_ERROR(bad value ${enableval} for --enable-mmkubernetes) ;;
f656cf
+         esac],
f656cf
+        [enable_mmkubernetes=no]
f656cf
+)
f656cf
+if test "x$enable_mmkubernetes" = "xyes"; then
f656cf
+        PKG_CHECK_MODULES([CURL], [libcurl])
f656cf
+        PKG_CHECK_MODULES(LIBLOGNORM, lognorm >= 2.0.3)
f656cf
+
f656cf
+        save_CFLAGS="$CFLAGS"
f656cf
+        save_LIBS="$LIBS"
f656cf
+
f656cf
+        CFLAGS="$CFLAGS $LIBLOGNORM_CFLAGS"
f656cf
+        LIBS="$LIBS $LIBLOGNORM_LIBS"
f656cf
+
f656cf
+        AC_CHECK_FUNC([ln_loadSamplesFromString],
f656cf
+                      [AC_DEFINE([HAVE_LOADSAMPLESFROMSTRING], [1], [Define if ln_loadSamplesFromString exists.])],
f656cf
+                      [AC_DEFINE([NO_LOADSAMPLESFROMSTRING], [1], [Define if ln_loadSamplesFromString does not exist.])])
f656cf
+
f656cf
+        CFLAGS="$save_CFLAGS"
f656cf
+        LIBS="$save_LIBS"
f656cf
+fi
f656cf
+AM_CONDITIONAL(ENABLE_MMKUBERNETES, test x$enable_mmkubernetes = xyes)
f656cf
+
f656cf
+# END Kubernetes metadata support
f656cf
+
f656cf
 # man pages
f656cf
 AC_CHECKING([if required man pages already exist])
f656cf
 have_to_generate_man_pages="no"
f656cf
@@ -2016,6 +2035,7 @@ AC_CONFIG_FILES([Makefile \
f656cf
 		contrib/omhttpfs/Makefile \
f656cf
 		contrib/omamqp1/Makefile \
f656cf
 		contrib/omtcl/Makefile \
f656cf
+		contrib/mmkubernetes/Makefile \
f656cf
 		tests/Makefile])
f656cf
 AC_OUTPUT
f656cf
 
f656cf
@@ -2090,6 +2110,7 @@ echo "    mmrfc5424addhmac enabled:                 $enable_mmrfc5424addhmac"
f656cf
 echo "    mmpstrucdata enabled:                     $enable_mmpstrucdata"
f656cf
 echo "    mmsequence enabled:                       $enable_mmsequence"
f656cf
 echo "    mmdblookup enabled:                       $enable_mmdblookup"
f656cf
+echo "    mmkubernetes enabled:                     $enable_mmkubernetes"
f656cf
 echo
f656cf
 echo "---{ database support }---"
f656cf
 echo "    MySql support enabled:                    $enable_mysql"
f656cf
diff --git a/contrib/mmkubernetes/Makefile.am b/contrib/mmkubernetes/Makefile.am
f656cf
new file mode 100644
f656cf
index 000000000..3dcc235a6
f656cf
--- /dev/null
f656cf
+++ b/contrib/mmkubernetes/Makefile.am
f656cf
@@ -0,0 +1,6 @@
f656cf
+pkglib_LTLIBRARIES = mmkubernetes.la
f656cf
+
f656cf
+mmkubernetes_la_SOURCES = mmkubernetes.c
f656cf
+mmkubernetes_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CURL_CFLAGS) $(LIBLOGNORM_CFLAGS)
f656cf
+mmkubernetes_la_LDFLAGS = -module -avoid-version
f656cf
+mmkubernetes_la_LIBADD = $(CURL_LIBS) $(LIBLOGNORM_LIBS)
f656cf
diff --git a/contrib/mmkubernetes/k8s_container_name.rulebase b/contrib/mmkubernetes/k8s_container_name.rulebase
f656cf
new file mode 100644
f656cf
index 000000000..35fbb317c
f656cf
--- /dev/null
f656cf
+++ b/contrib/mmkubernetes/k8s_container_name.rulebase
f656cf
@@ -0,0 +1,3 @@
f656cf
+version=2
f656cf
+rule=:%k8s_prefix:char-to:_%_%container_name:char-to:.%.%container_hash:char-to:_%_%pod_name:char-to:_%_%namespace_name:char-to:_%_%not_used_1:char-to:_%_%not_used_2:rest%
f656cf
+rule=:%k8s_prefix:char-to:_%_%container_name:char-to:_%_%pod_name:char-to:_%_%namespace_name:char-to:_%_%not_used_1:char-to:_%_%not_used_2:rest%
f656cf
diff --git a/contrib/mmkubernetes/k8s_filename.rulebase b/contrib/mmkubernetes/k8s_filename.rulebase
f656cf
new file mode 100644
f656cf
index 000000000..24c0d9138
f656cf
--- /dev/null
f656cf
+++ b/contrib/mmkubernetes/k8s_filename.rulebase
f656cf
@@ -0,0 +1,2 @@
f656cf
+version=2
f656cf
+rule=:/var/log/containers/%pod_name:char-to:_%_%namespace_name:char-to:_%_%container_name_and_id:char-to:.%.log
f656cf
diff --git a/contrib/mmkubernetes/mmkubernetes.c b/contrib/mmkubernetes/mmkubernetes.c
f656cf
new file mode 100644
f656cf
index 000000000..5012c54f6
f656cf
--- /dev/null
f656cf
+++ b/contrib/mmkubernetes/mmkubernetes.c
f656cf
@@ -0,0 +1,1491 @@
f656cf
+/* mmkubernetes.c
f656cf
+ * This is a message modification module. It uses metadata obtained
f656cf
+ * from the message to query Kubernetes and obtain additional metadata
f656cf
+ * relating to the container instance.
f656cf
+ *
f656cf
+ * Inspired by:
f656cf
+ * https://github.com/fabric8io/fluent-plugin-kubernetes_metadata_filter
f656cf
+ *
f656cf
+ * NOTE: read comments in module-template.h for details on the calling interface!
f656cf
+ *
f656cf
+ * Copyright 2016 Red Hat Inc.
f656cf
+ *
f656cf
+ * This file is part of rsyslog.
f656cf
+ *
f656cf
+ * Licensed under the Apache License, Version 2.0 (the "License");
f656cf
+ * you may not use this file except in compliance with the License.
f656cf
+ * You may obtain a copy of the License at
f656cf
+ *
f656cf
+ *       http://www.apache.org/licenses/LICENSE-2.0
f656cf
+ *       -or-
f656cf
+ *       see COPYING.ASL20 in the source distribution
f656cf
+ *
f656cf
+ * Unless required by applicable law or agreed to in writing, software
f656cf
+ * distributed under the License is distributed on an "AS IS" BASIS,
f656cf
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
f656cf
+ * See the License for the specific language governing permissions and
f656cf
+ * limitations under the License.
f656cf
+ */
f656cf
+
f656cf
+/* needed for asprintf */
f656cf
+#ifndef _GNU_SOURCE
f656cf
+#  define _GNU_SOURCE
f656cf
+#endif
f656cf
+
f656cf
+#include "config.h"
f656cf
+#include "rsyslog.h"
f656cf
+#include <stdio.h>
f656cf
+#include <stdarg.h>
f656cf
+#include <stdlib.h>
f656cf
+#include <string.h>
f656cf
+#include <assert.h>
f656cf
+#include <errno.h>
f656cf
+#include <unistd.h>
f656cf
+#include <sys/stat.h>
f656cf
+#include <libestr.h>
f656cf
+#include <liblognorm.h>
f656cf
+#include <json.h>
f656cf
+#include <curl/curl.h>
f656cf
+#include <curl/easy.h>
f656cf
+#include <pthread.h>
f656cf
+#include "conf.h"
f656cf
+#include "syslogd-types.h"
f656cf
+#include "module-template.h"
f656cf
+#include "errmsg.h"
f656cf
+#include "regexp.h"
f656cf
+#include "hashtable.h"
f656cf
+#include "srUtils.h"
f656cf
+
f656cf
+/* static data */
f656cf
+MODULE_TYPE_OUTPUT /* this is technically an output plugin */
f656cf
+MODULE_TYPE_KEEP /* releasing the module would cause a leak through libcurl */
f656cf
+MODULE_CNFNAME("mmkubernetes")
f656cf
+DEF_OMOD_STATIC_DATA
f656cf
+DEFobjCurrIf(errmsg)
f656cf
+DEFobjCurrIf(regexp)
f656cf
+
f656cf
+#define HAVE_LOADSAMPLESFROMSTRING 1
f656cf
+#if defined(NO_LOADSAMPLESFROMSTRING)
f656cf
+#undef HAVE_LOADSAMPLESFROMSTRING
f656cf
+#endif
f656cf
+/* original from fluentd plugin:
f656cf
+ * 'var\.log\.containers\.(?<pod_name>[a-z0-9]([-a-z0-9]*[a-z0-9])?\
f656cf
+ *   (\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace>[^_]+)_\
f656cf
+ *   (?<container_name>.+)-(?<docker_id>[a-z0-9]{64})\.log$'
f656cf
+ * this is for _tag_ match, not actual filename match - in_tail turns filename
f656cf
+ * into a fluentd tag
f656cf
+ */
f656cf
+#define DFLT_FILENAME_LNRULES "rule=:/var/log/containers/%pod_name:char-to:_%_"\
f656cf
+	"%namespace_name:char-to:_%_%container_name:char-to:-%-%container_id:char-to:.%.log"
f656cf
+#define DFLT_FILENAME_RULEBASE "/etc/rsyslog.d/k8s_filename.rulebase"
f656cf
+/* original from fluentd plugin:
f656cf
+ *   '^(?<name_prefix>[^_]+)_(?<container_name>[^\._]+)\
f656cf
+ *     (\.(?<container_hash>[^_]+))?_(?<pod_name>[^_]+)_\
f656cf
+ *     (?<namespace>[^_]+)_[^_]+_[^_]+$'
f656cf
+ */
f656cf
+#define DFLT_CONTAINER_LNRULES "rule=:%k8s_prefix:char-to:_%_%container_name:char-to:.%."\
f656cf
+	"%container_hash:char-to:_%_"\
f656cf
+	"%pod_name:char-to:_%_%namespace_name:char-to:_%_%not_used_1:char-to:_%_%not_used_2:rest%\n"\
f656cf
+	"rule=:%k8s_prefix:char-to:_%_%container_name:char-to:_%_"\
f656cf
+	"%pod_name:char-to:_%_%namespace_name:char-to:_%_%not_used_1:char-to:_%_%not_used_2:rest%"
f656cf
+#define DFLT_CONTAINER_RULEBASE "/etc/rsyslog.d/k8s_container_name.rulebase"
f656cf
+#define DFLT_SRCMD_PATH "$!metadata!filename"
f656cf
+#define DFLT_DSTMD_PATH "$!"
f656cf
+#define DFLT_DE_DOT 1 /* true */
f656cf
+#define DFLT_DE_DOT_SEPARATOR "_"
f656cf
+#define DFLT_CONTAINER_NAME "$!CONTAINER_NAME" /* name of variable holding CONTAINER_NAME value */
f656cf
+#define DFLT_CONTAINER_ID_FULL "$!CONTAINER_ID_FULL" /* name of variable holding CONTAINER_ID_FULL value */
f656cf
+#define DFLT_KUBERNETES_URL "https://kubernetes.default.svc.cluster.local:443"
f656cf
+
f656cf
+static struct cache_s {
f656cf
+	const uchar *kbUrl;
f656cf
+	struct hashtable *mdHt;
f656cf
+	struct hashtable *nsHt;
f656cf
+	pthread_mutex_t *cacheMtx;
f656cf
+} **caches;
f656cf
+
f656cf
+typedef struct {
f656cf
+	int nmemb;
f656cf
+	uchar **patterns;
f656cf
+	regex_t *regexps;
f656cf
+} annotation_match_t;
f656cf
+
f656cf
+/* module configuration data */
f656cf
+struct modConfData_s {
f656cf
+	rsconf_t *pConf;	/* our overall config object */
f656cf
+	uchar *kubernetesUrl;	/* scheme, host, port, and optional path prefix for Kubernetes API lookups */
f656cf
+	uchar *srcMetadataPath;	/* where to get data for kubernetes queries */
f656cf
+	uchar *dstMetadataPath;	/* where to put metadata obtained from kubernetes */
f656cf
+	uchar *caCertFile; /* File holding the CA cert (+optional chain) of CA that issued the Kubernetes server cert */
f656cf
+	sbool allowUnsignedCerts; /* For testing/debugging - do not check for CA certs (CURLOPT_SSL_VERIFYPEER FALSE) */
f656cf
+	uchar *token; /* The token value to use to authenticate to Kubernetes - takes precedence over tokenFile */
f656cf
+	uchar *tokenFile; /* The file whose contents is the token value to use to authenticate to Kubernetes */
f656cf
+	sbool de_dot; /* If true (default), convert '.' characters in labels & annotations to de_dot_separator */
f656cf
+	uchar *de_dot_separator; /* separator character (default '_') to use for de_dotting */
f656cf
+	size_t de_dot_separator_len; /* length of separator character */
f656cf
+	annotation_match_t annotation_match; /* annotation keys must match these to be included in record */
f656cf
+	char *fnRules; /* lognorm rules for container log filename match */
f656cf
+	uchar *fnRulebase; /* lognorm rulebase filename for container log filename match */
f656cf
+	char *contRules; /* lognorm rules for CONTAINER_NAME value match */
f656cf
+	uchar *contRulebase; /* lognorm rulebase filename for CONTAINER_NAME value match */
f656cf
+};
f656cf
+
f656cf
+/* action (instance) configuration data */
f656cf
+typedef struct _instanceData {
f656cf
+	uchar *kubernetesUrl;	/* scheme, host, port, and optional path prefix for Kubernetes API lookups */
f656cf
+	msgPropDescr_t *srcMetadataDescr;	/* where to get data for kubernetes queries */
f656cf
+	uchar *dstMetadataPath;	/* where to put metadata obtained from kubernetes */
f656cf
+	uchar *caCertFile; /* File holding the CA cert (+optional chain) of CA that issued the Kubernetes server cert */
f656cf
+	sbool allowUnsignedCerts; /* For testing/debugging - do not check for CA certs (CURLOPT_SSL_VERIFYPEER FALSE) */
f656cf
+	uchar *token; /* The token value to use to authenticate to Kubernetes - takes precedence over tokenFile */
f656cf
+	uchar *tokenFile; /* The file whose contents is the token value to use to authenticate to Kubernetes */
f656cf
+	sbool de_dot; /* If true (default), convert '.' characters in labels & annotations to de_dot_separator */
f656cf
+	uchar *de_dot_separator; /* separator character (default '_') to use for de_dotting */
f656cf
+	size_t de_dot_separator_len; /* length of separator character */
f656cf
+	annotation_match_t annotation_match; /* annotation keys must match these to be included in record */
f656cf
+	char *fnRules; /* lognorm rules for container log filename match */
f656cf
+	uchar *fnRulebase; /* lognorm rulebase filename for container log filename match */
f656cf
+	ln_ctx fnCtxln;	/**< context to be used for liblognorm */
f656cf
+	char *contRules; /* lognorm rules for CONTAINER_NAME value match */
f656cf
+	uchar *contRulebase; /* lognorm rulebase filename for CONTAINER_NAME value match */
f656cf
+	ln_ctx contCtxln;	/**< context to be used for liblognorm */
f656cf
+	msgPropDescr_t *contNameDescr; /* CONTAINER_NAME field */
f656cf
+	msgPropDescr_t *contIdFullDescr; /* CONTAINER_ID_FULL field */
f656cf
+	struct cache_s *cache;
f656cf
+} instanceData;
f656cf
+
f656cf
+typedef struct wrkrInstanceData {
f656cf
+	instanceData *pData;
f656cf
+	CURL *curlCtx;
f656cf
+	struct curl_slist *curlHdr;
f656cf
+	char *curlRply;
f656cf
+	size_t curlRplyLen;
f656cf
+} wrkrInstanceData_t;
f656cf
+
f656cf
+/* module parameters (v6 config format) */
f656cf
+static struct cnfparamdescr modpdescr[] = {
f656cf
+	{ "kubernetesurl", eCmdHdlrString, 0 },
f656cf
+	{ "srcmetadatapath", eCmdHdlrString, 0 },
f656cf
+	{ "dstmetadatapath", eCmdHdlrString, 0 },
f656cf
+	{ "tls.cacert", eCmdHdlrString, 0 },
f656cf
+	{ "allowunsignedcerts", eCmdHdlrBinary, 0 },
f656cf
+	{ "token", eCmdHdlrString, 0 },
f656cf
+	{ "tokenfile", eCmdHdlrString, 0 },
f656cf
+	{ "annotation_match", eCmdHdlrArray, 0 },
f656cf
+	{ "de_dot", eCmdHdlrBinary, 0 },
f656cf
+	{ "de_dot_separator", eCmdHdlrString, 0 },
f656cf
+	{ "filenamerulebase", eCmdHdlrString, 0 },
f656cf
+	{ "containerrulebase", eCmdHdlrString, 0 }
f656cf
+#if HAVE_LOADSAMPLESFROMSTRING == 1
f656cf
+	,
f656cf
+	{ "filenamerules", eCmdHdlrArray, 0 },
f656cf
+	{ "containerrules", eCmdHdlrArray, 0 }
f656cf
+#endif
f656cf
+};
f656cf
+static struct cnfparamblk modpblk = {
f656cf
+	CNFPARAMBLK_VERSION,
f656cf
+	sizeof(modpdescr)/sizeof(struct cnfparamdescr),
f656cf
+	modpdescr
f656cf
+};
f656cf
+
f656cf
+/* action (instance) parameters (v6 config format) */
f656cf
+static struct cnfparamdescr actpdescr[] = {
f656cf
+	{ "kubernetesurl", eCmdHdlrString, 0 },
f656cf
+	{ "srcmetadatapath", eCmdHdlrString, 0 },
f656cf
+	{ "dstmetadatapath", eCmdHdlrString, 0 },
f656cf
+	{ "tls.cacert", eCmdHdlrString, 0 },
f656cf
+	{ "allowunsignedcerts", eCmdHdlrBinary, 0 },
f656cf
+	{ "token", eCmdHdlrString, 0 },
f656cf
+	{ "tokenfile", eCmdHdlrString, 0 },
f656cf
+	{ "annotation_match", eCmdHdlrArray, 0 },
f656cf
+	{ "de_dot", eCmdHdlrBinary, 0 },
f656cf
+	{ "de_dot_separator", eCmdHdlrString, 0 },
f656cf
+	{ "filenamerulebase", eCmdHdlrString, 0 },
f656cf
+	{ "containerrulebase", eCmdHdlrString, 0 }
f656cf
+#if HAVE_LOADSAMPLESFROMSTRING == 1
f656cf
+	,
f656cf
+	{ "filenamerules", eCmdHdlrArray, 0 },
f656cf
+	{ "containerrules", eCmdHdlrArray, 0 }
f656cf
+#endif
f656cf
+};
f656cf
+static struct cnfparamblk actpblk =
f656cf
+	{ CNFPARAMBLK_VERSION,
f656cf
+	  sizeof(actpdescr)/sizeof(struct cnfparamdescr),
f656cf
+	  actpdescr
f656cf
+	};
f656cf
+
f656cf
+static modConfData_t *loadModConf = NULL;	/* modConf ptr to use for the current load process */
f656cf
+static modConfData_t *runModConf = NULL;	/* modConf ptr to use for the current exec process */
f656cf
+
f656cf
+static void free_annotationmatch(annotation_match_t *match) {
f656cf
+	if (match) {
f656cf
+		for(int ii = 0 ; ii < match->nmemb; ++ii) {
f656cf
+			if (match->patterns)
f656cf
+				free(match->patterns[ii]);
f656cf
+			if (match->regexps)
f656cf
+				regexp.regfree(&match->regexps[ii]);
f656cf
+		}
f656cf
+		free(match->patterns);
f656cf
+		match->patterns = NULL;
f656cf
+		free(match->regexps);
f656cf
+		match->regexps = NULL;
f656cf
+		match->nmemb = 0;
f656cf
+	}
f656cf
+}
f656cf
+
f656cf
+static int init_annotationmatch(annotation_match_t *match, struct cnfarray *ar) {
f656cf
+	DEFiRet;
f656cf
+
f656cf
+	match->nmemb = ar->nmemb;
f656cf
+	CHKmalloc(match->patterns = calloc(sizeof(uchar*), match->nmemb));
f656cf
+	CHKmalloc(match->regexps = calloc(sizeof(regex_t), match->nmemb));
f656cf
+	for(int jj = 0; jj < ar->nmemb; ++jj) {
f656cf
+		int rexret = 0;
f656cf
+		match->patterns[jj] = (uchar*)es_str2cstr(ar->arr[jj], NULL);
f656cf
+		rexret = regexp.regcomp(&match->regexps[jj],
f656cf
+				(char *)match->patterns[jj], REG_EXTENDED|REG_NOSUB);
f656cf
+		if (0 != rexret) {
f656cf
+			char errMsg[512];
f656cf
+			regexp.regerror(rexret, &match->regexps[jj], errMsg, sizeof(errMsg));
f656cf
+			iRet = RS_RET_CONFIG_ERROR;
f656cf
+			errmsg.LogError(0, iRet,
f656cf
+					"error: could not compile annotation_match string [%s]"
f656cf
+					" into an extended regexp - %d: %s\n",
f656cf
+					match->patterns[jj], rexret, errMsg);
f656cf
+			break;
f656cf
+		}
f656cf
+	}
f656cf
+finalize_it:
f656cf
+	if (iRet)
f656cf
+		free_annotationmatch(match);
f656cf
+	RETiRet;
f656cf
+}
f656cf
+
f656cf
+static int copy_annotationmatch(annotation_match_t *src, annotation_match_t *dest) {
f656cf
+	DEFiRet;
f656cf
+
f656cf
+	dest->nmemb = src->nmemb;
f656cf
+	CHKmalloc(dest->patterns = malloc(sizeof(uchar*) * dest->nmemb));
f656cf
+	CHKmalloc(dest->regexps = calloc(sizeof(regex_t), dest->nmemb));
f656cf
+	for(int jj = 0 ; jj < src->nmemb ; ++jj) {
f656cf
+		CHKmalloc(dest->patterns[jj] = (uchar*)strdup((char *)src->patterns[jj]));
f656cf
+		/* assumes was already successfully compiled */
f656cf
+		regexp.regcomp(&dest->regexps[jj], (char *)dest->patterns[jj], REG_EXTENDED|REG_NOSUB);
f656cf
+	}
f656cf
+finalize_it:
f656cf
+    if (iRet)
f656cf
+    	free_annotationmatch(dest);
f656cf
+	RETiRet;
f656cf
+}
f656cf
+
f656cf
+/* takes a hash of annotations and returns another json object hash containing only the
f656cf
+ * keys that match - this logic is taken directly from fluent-plugin-kubernetes_metadata_filter
f656cf
+ * except that we do not add the key multiple times to the object to be returned
f656cf
+ */
f656cf
+static struct json_object *match_annotations(annotation_match_t *match,
f656cf
+		struct json_object *annotations) {
f656cf
+	struct json_object *ret = NULL;
f656cf
+
f656cf
+	for (int jj = 0; jj < match->nmemb; ++jj) {
f656cf
+		struct json_object_iterator it = json_object_iter_begin(annotations);
f656cf
+		struct json_object_iterator itEnd = json_object_iter_end(annotations);
f656cf
+		for (;!json_object_iter_equal(&it, &itEnd); json_object_iter_next(&it)) {
f656cf
+			const char *const key = json_object_iter_peek_name(&it);
f656cf
+			if (!ret || !fjson_object_object_get_ex(ret, key, NULL)) {
f656cf
+				if (!regexp.regexec(&match->regexps[jj], key, 0, NULL, 0)) {
f656cf
+					if (!ret) {
f656cf
+						ret = json_object_new_object();
f656cf
+					}
f656cf
+					json_object_object_add(ret, key,
f656cf
+						json_object_get(json_object_iter_peek_value(&it)));
f656cf
+				}
f656cf
+			}
f656cf
+		}
f656cf
+	}
f656cf
+	return ret;
f656cf
+}
f656cf
+
f656cf
+/* This will take a hash of labels or annotations and will de_dot the keys.
f656cf
+ * It will return a brand new hash.  AFAICT, there is no safe way to
f656cf
+ * iterate over the hash while modifying it in place.
f656cf
+ */
f656cf
+static struct json_object *de_dot_json_object(struct json_object *jobj,
f656cf
+		const char *delim, size_t delim_len) {
f656cf
+	struct json_object *ret = NULL;
f656cf
+	struct json_object_iterator it = json_object_iter_begin(jobj);
f656cf
+	struct json_object_iterator itEnd = json_object_iter_end(jobj);
f656cf
+	es_str_t *new_es_key = NULL;
f656cf
+	DEFiRet;
f656cf
+
f656cf
+	ret = json_object_new_object();
f656cf
+	while (!json_object_iter_equal(&it, &itEnd)) {
f656cf
+		const char *const key = json_object_iter_peek_name(&it);
f656cf
+		const char *cc = strstr(key, ".");
f656cf
+		if (NULL == cc) {
f656cf
+			json_object_object_add(ret, key,
f656cf
+					json_object_get(json_object_iter_peek_value(&it)));
f656cf
+		} else {
f656cf
+			char *new_key = NULL;
f656cf
+			const char *prevcc = key;
f656cf
+			new_es_key = es_newStrFromCStr(key, (es_size_t)(cc-prevcc));
f656cf
+			while (cc) {
f656cf
+				if (es_addBuf(&new_es_key, (char *)delim, (es_size_t)delim_len))
f656cf
+					ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
f656cf
+				cc += 1; /* one past . */
f656cf
+				prevcc = cc; /* beginning of next substring */
f656cf
+				if ((cc = strstr(prevcc, ".")) || (cc = strchr(prevcc, '\0'))) {
f656cf
+					if (es_addBuf(&new_es_key, (char *)prevcc, (es_size_t)(cc-prevcc)))
f656cf
+						ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
f656cf
+					if (!*cc)
f656cf
+						cc = NULL; /* EOS - done */
f656cf
+				}
f656cf
+			}
f656cf
+			new_key = es_str2cstr(new_es_key, NULL);
f656cf
+			es_deleteStr(new_es_key);
f656cf
+			new_es_key = NULL;
f656cf
+			json_object_object_add(ret, new_key,
f656cf
+					json_object_get(json_object_iter_peek_value(&it)));
f656cf
+			free(new_key);
f656cf
+		}
f656cf
+		json_object_iter_next(&it);
f656cf
+	}
f656cf
+finalize_it:
f656cf
+	if (iRet != RS_RET_OK) {
f656cf
+		json_object_put(ret);
f656cf
+		ret = NULL;
f656cf
+	}
f656cf
+	if (new_es_key)
f656cf
+		es_deleteStr(new_es_key);
f656cf
+	return ret;
f656cf
+}
f656cf
+
f656cf
+/* given a "metadata" object field, do
f656cf
+ * - make sure "annotations" field has only the matching keys
f656cf
+ * - de_dot the "labels" and "annotations" fields keys
f656cf
+ * This modifies the jMetadata object in place
f656cf
+ */
f656cf
+static void parse_labels_annotations(struct json_object *jMetadata,
f656cf
+		annotation_match_t *match, sbool de_dot,
f656cf
+		const char *delim, size_t delim_len) {
f656cf
+	struct json_object *jo = NULL;
f656cf
+
f656cf
+	if (fjson_object_object_get_ex(jMetadata, "annotations", &jo)) {
f656cf
+		if ((jo = match_annotations(match, jo)))
f656cf
+			json_object_object_add(jMetadata, "annotations", jo);
f656cf
+		else
f656cf
+			json_object_object_del(jMetadata, "annotations");
f656cf
+	}
f656cf
+	/* dedot labels and annotations */
f656cf
+	if (de_dot) {
f656cf
+		struct json_object *jo2 = NULL;
f656cf
+		if (fjson_object_object_get_ex(jMetadata, "annotations", &jo)) {
f656cf
+			if ((jo2 = de_dot_json_object(jo, delim, delim_len))) {
f656cf
+				json_object_object_add(jMetadata, "annotations", jo2);
f656cf
+			}
f656cf
+		}
f656cf
+		if (fjson_object_object_get_ex(jMetadata, "labels", &jo)) {
f656cf
+			if ((jo2 = de_dot_json_object(jo, delim, delim_len))) {
f656cf
+				json_object_object_add(jMetadata, "labels", jo2);
f656cf
+			}
f656cf
+		}
f656cf
+	}
f656cf
+}
f656cf
+
f656cf
+#if HAVE_LOADSAMPLESFROMSTRING == 1
f656cf
+static int array_to_rules(struct cnfarray *ar, char **rules) {
f656cf
+	DEFiRet;
f656cf
+	es_str_t *tmpstr = NULL;
f656cf
+	es_size_t size = 0;
f656cf
+
f656cf
+	if (rules == NULL)
f656cf
+		FINALIZE;
f656cf
+	*rules = NULL;
f656cf
+	if (!ar->nmemb)
f656cf
+		FINALIZE;
f656cf
+	for (int jj = 0; jj < ar->nmemb; jj++)
f656cf
+		size += es_strlen(ar->arr[jj]);
f656cf
+	if (!size)
f656cf
+		FINALIZE;
f656cf
+	CHKmalloc(tmpstr = es_newStr(size));
f656cf
+	CHKiRet((es_addStr(&tmpstr, ar->arr[0])));
f656cf
+	CHKiRet((es_addBufConstcstr(&tmpstr, "\n")));
f656cf
+	for(int jj=1; jj < ar->nmemb; ++jj) {
f656cf
+		CHKiRet((es_addStr(&tmpstr, ar->arr[jj])));
f656cf
+		CHKiRet((es_addBufConstcstr(&tmpstr, "\n")));
f656cf
+	}
f656cf
+	CHKiRet((es_addBufConstcstr(&tmpstr, "\0")));
f656cf
+	CHKmalloc(*rules = es_str2cstr(tmpstr, NULL));
f656cf
+finalize_it:
f656cf
+	if (tmpstr) {
f656cf
+		es_deleteStr(tmpstr);
f656cf
+	}
f656cf
+    if (iRet != RS_RET_OK) {
f656cf
+    	free(*rules);
f656cf
+    	*rules = NULL;
f656cf
+    }
f656cf
+	RETiRet;
f656cf
+}
f656cf
+#endif
f656cf
+
f656cf
+/* callback for liblognorm error messages */
f656cf
+static void
f656cf
+errCallBack(void __attribute__((unused)) *cookie, const char *msg,
f656cf
+	    size_t __attribute__((unused)) lenMsg)
f656cf
+{
f656cf
+	errmsg.LogError(0, RS_RET_ERR_LIBLOGNORM, "liblognorm error: %s", msg);
f656cf
+}
f656cf
+
f656cf
+static rsRetVal
f656cf
+set_lnctx(ln_ctx *ctxln, char *instRules, uchar *instRulebase, char *modRules, uchar *modRulebase)
f656cf
+{
f656cf
+	DEFiRet;
f656cf
+	if (ctxln == NULL)
f656cf
+		FINALIZE;
f656cf
+	CHKmalloc(*ctxln = ln_initCtx());
f656cf
+	ln_setErrMsgCB(*ctxln, errCallBack, NULL);
f656cf
+	if(instRules) {
f656cf
+#if HAVE_LOADSAMPLESFROMSTRING == 1
f656cf
+		if(ln_loadSamplesFromString(*ctxln, instRules) !=0) {
f656cf
+			errmsg.LogError(0, RS_RET_NO_RULEBASE, "error: normalization rules '%s' "
f656cf
+					"could not be loaded", instRules);
f656cf
+			ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_SAMPDB_LOAD);
f656cf
+		}
f656cf
+#else
f656cf
+		(void)instRules;
f656cf
+#endif
f656cf
+	} else if(instRulebase) {
f656cf
+		if(ln_loadSamples(*ctxln, (char*) instRulebase) != 0) {
f656cf
+			errmsg.LogError(0, RS_RET_NO_RULEBASE, "error: normalization rulebase '%s' "
f656cf
+					"could not be loaded", instRulebase);
f656cf
+			ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_SAMPDB_LOAD);
f656cf
+		}
f656cf
+	} else if(modRules) {
f656cf
+#if HAVE_LOADSAMPLESFROMSTRING == 1
f656cf
+		if(ln_loadSamplesFromString(*ctxln, modRules) !=0) {
f656cf
+			errmsg.LogError(0, RS_RET_NO_RULEBASE, "error: normalization rules '%s' "
f656cf
+					"could not be loaded", modRules);
f656cf
+			ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_SAMPDB_LOAD);
f656cf
+		}
f656cf
+#else
f656cf
+		(void)modRules;
f656cf
+#endif
f656cf
+	} else if(modRulebase) {
f656cf
+		if(ln_loadSamples(*ctxln, (char*) modRulebase) != 0) {
f656cf
+			errmsg.LogError(0, RS_RET_NO_RULEBASE, "error: normalization rulebase '%s' "
f656cf
+					"could not be loaded", modRulebase);
f656cf
+			ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_SAMPDB_LOAD);
f656cf
+		}
f656cf
+	}
f656cf
+finalize_it:
f656cf
+	if (iRet != RS_RET_OK){
f656cf
+		ln_exitCtx(*ctxln);
f656cf
+		*ctxln = NULL;
f656cf
+	}
f656cf
+	RETiRet;
f656cf
+}
f656cf
+
f656cf
+BEGINbeginCnfLoad
f656cf
+CODESTARTbeginCnfLoad
f656cf
+	loadModConf = pModConf;
f656cf
+	pModConf->pConf = pConf;
f656cf
+ENDbeginCnfLoad
f656cf
+
f656cf
+
f656cf
+BEGINsetModCnf
f656cf
+	struct cnfparamvals *pvals = NULL;
f656cf
+	int i;
f656cf
+	FILE *fp;
f656cf
+	int ret;
f656cf
+CODESTARTsetModCnf
f656cf
+	pvals = nvlstGetParams(lst, &modpblk, NULL);
f656cf
+	if(pvals == NULL) {
f656cf
+		errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "mmkubernetes: "
f656cf
+			"error processing module config parameters [module(...)]");
f656cf
+		ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
f656cf
+	}
f656cf
+
f656cf
+	if(Debug) {
f656cf
+		dbgprintf("module (global) param blk for mmkubernetes:\n");
f656cf
+		cnfparamsPrint(&modpblk, pvals);
f656cf
+	}
f656cf
+
f656cf
+	loadModConf->de_dot = DFLT_DE_DOT;
f656cf
+	for(i = 0 ; i < modpblk.nParams ; ++i) {
f656cf
+		if(!pvals[i].bUsed) {
f656cf
+			continue;
f656cf
+		} else if(!strcmp(modpblk.descr[i].name, "kubernetesurl")) {
f656cf
+			free(loadModConf->kubernetesUrl);
f656cf
+			loadModConf->kubernetesUrl = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
f656cf
+		} else if(!strcmp(modpblk.descr[i].name, "srcmetadatapath")) {
f656cf
+			free(loadModConf->srcMetadataPath);
f656cf
+			loadModConf->srcMetadataPath = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
f656cf
+			/* todo: sanitize the path */
f656cf
+		} else if(!strcmp(modpblk.descr[i].name, "dstmetadatapath")) {
f656cf
+			free(loadModConf->dstMetadataPath);
f656cf
+			loadModConf->dstMetadataPath = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
f656cf
+			/* todo: sanitize the path */
f656cf
+		} else if(!strcmp(modpblk.descr[i].name, "tls.cacert")) {
f656cf
+			free(loadModConf->caCertFile);
f656cf
+			loadModConf->caCertFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
f656cf
+			fp = fopen((const char*)loadModConf->caCertFile, "r");
f656cf
+			if(fp == NULL) {
f656cf
+				char errStr[1024];
f656cf
+				rs_strerror_r(errno, errStr, sizeof(errStr));
f656cf
+				iRet = RS_RET_NO_FILE_ACCESS;
f656cf
+				errmsg.LogError(0, iRet,
f656cf
+						"error: certificate file %s couldn't be accessed: %s\n",
f656cf
+						loadModConf->caCertFile, errStr);
f656cf
+				ABORT_FINALIZE(iRet);
f656cf
+			} else {
f656cf
+				fclose(fp);
f656cf
+			}
f656cf
+		} else if(!strcmp(modpblk.descr[i].name, "allowunsignedcerts")) {
f656cf
+			loadModConf->allowUnsignedCerts = pvals[i].val.d.n;
f656cf
+		} else if(!strcmp(modpblk.descr[i].name, "token")) {
f656cf
+			free(loadModConf->token);
f656cf
+			loadModConf->token = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
f656cf
+		} else if(!strcmp(modpblk.descr[i].name, "tokenfile")) {
f656cf
+			free(loadModConf->tokenFile);
f656cf
+			loadModConf->tokenFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
f656cf
+			fp = fopen((const char*)loadModConf->tokenFile, "r");
f656cf
+			if(fp == NULL) {
f656cf
+				char errStr[1024];
f656cf
+				rs_strerror_r(errno, errStr, sizeof(errStr));
f656cf
+				iRet = RS_RET_NO_FILE_ACCESS;
f656cf
+				errmsg.LogError(0, iRet,
f656cf
+						"error: token file %s couldn't be accessed: %s\n",
f656cf
+						loadModConf->tokenFile, errStr);
f656cf
+				ABORT_FINALIZE(iRet);
f656cf
+			} else {
f656cf
+				fclose(fp);
f656cf
+			}
f656cf
+		} else if(!strcmp(modpblk.descr[i].name, "annotation_match")) {
f656cf
+			free_annotationmatch(&loadModConf->annotation_match);
f656cf
+			if ((ret = init_annotationmatch(&loadModConf->annotation_match, pvals[i].val.d.ar)))
f656cf
+				ABORT_FINALIZE(ret);
f656cf
+		} else if(!strcmp(modpblk.descr[i].name, "de_dot")) {
f656cf
+			loadModConf->de_dot = pvals[i].val.d.n;
f656cf
+		} else if(!strcmp(modpblk.descr[i].name, "de_dot_separator")) {
f656cf
+			free(loadModConf->de_dot_separator);
f656cf
+			loadModConf->de_dot_separator = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
f656cf
+#if HAVE_LOADSAMPLESFROMSTRING == 1
f656cf
+		} else if(!strcmp(modpblk.descr[i].name, "filenamerules")) {
f656cf
+			free(loadModConf->fnRules);
f656cf
+			CHKiRet((array_to_rules(pvals[i].val.d.ar, &loadModConf->fnRules)));
f656cf
+#endif
f656cf
+		} else if(!strcmp(modpblk.descr[i].name, "filenamerulebase")) {
f656cf
+			free(loadModConf->fnRulebase);
f656cf
+			loadModConf->fnRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
f656cf
+			fp = fopen((const char*)loadModConf->fnRulebase, "r");
f656cf
+			if(fp == NULL) {
f656cf
+				char errStr[1024];
f656cf
+				rs_strerror_r(errno, errStr, sizeof(errStr));
f656cf
+				iRet = RS_RET_NO_FILE_ACCESS;
f656cf
+				errmsg.LogError(0, iRet,
f656cf
+						"error: filenamerulebase file %s couldn't be accessed: %s\n",
f656cf
+						loadModConf->fnRulebase, errStr);
f656cf
+				ABORT_FINALIZE(iRet);
f656cf
+			} else {
f656cf
+				fclose(fp);
f656cf
+			}
f656cf
+#if HAVE_LOADSAMPLESFROMSTRING == 1
f656cf
+		} else if(!strcmp(modpblk.descr[i].name, "containerrules")) {
f656cf
+			free(loadModConf->contRules);
f656cf
+			CHKiRet((array_to_rules(pvals[i].val.d.ar, &loadModConf->contRules)));
f656cf
+#endif
f656cf
+		} else if(!strcmp(modpblk.descr[i].name, "containerrulebase")) {
f656cf
+			free(loadModConf->contRulebase);
f656cf
+			loadModConf->contRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
f656cf
+			fp = fopen((const char*)loadModConf->contRulebase, "r");
f656cf
+			if(fp == NULL) {
f656cf
+				char errStr[1024];
f656cf
+				rs_strerror_r(errno, errStr, sizeof(errStr));
f656cf
+				iRet = RS_RET_NO_FILE_ACCESS;
f656cf
+				errmsg.LogError(0, iRet,
f656cf
+						"error: containerrulebase file %s couldn't be accessed: %s\n",
f656cf
+						loadModConf->contRulebase, errStr);
f656cf
+				ABORT_FINALIZE(iRet);
f656cf
+			} else {
f656cf
+				fclose(fp);
f656cf
+			}
f656cf
+		} else {
f656cf
+			dbgprintf("mmkubernetes: program error, non-handled "
f656cf
+				"param '%s' in module() block\n", modpblk.descr[i].name);
f656cf
+			/* todo: error message? */
f656cf
+		}
f656cf
+	}
f656cf
+
f656cf
+#if HAVE_LOADSAMPLESFROMSTRING == 1
f656cf
+	if (loadModConf->fnRules && loadModConf->fnRulebase) {
f656cf
+		errmsg.LogError(0, RS_RET_CONFIG_ERROR,
f656cf
+				"mmkubernetes: only 1 of filenamerules or filenamerulebase may be used");
f656cf
+		ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
f656cf
+	}
f656cf
+	if (loadModConf->contRules && loadModConf->contRulebase) {
f656cf
+		errmsg.LogError(0, RS_RET_CONFIG_ERROR,
f656cf
+				"mmkubernetes: only 1 of containerrules or containerrulebase may be used");
f656cf
+		ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
f656cf
+	}
f656cf
+#endif
f656cf
+
f656cf
+	/* set defaults */
f656cf
+	if(loadModConf->srcMetadataPath == NULL)
f656cf
+		loadModConf->srcMetadataPath = (uchar *) strdup(DFLT_SRCMD_PATH);
f656cf
+	if(loadModConf->dstMetadataPath == NULL)
f656cf
+		loadModConf->dstMetadataPath = (uchar *) strdup(DFLT_DSTMD_PATH);
f656cf
+	if(loadModConf->de_dot_separator == NULL)
f656cf
+		loadModConf->de_dot_separator = (uchar *) strdup(DFLT_DE_DOT_SEPARATOR);
f656cf
+	if(loadModConf->de_dot_separator)
f656cf
+		loadModConf->de_dot_separator_len = strlen((const char *)loadModConf->de_dot_separator);
f656cf
+#if HAVE_LOADSAMPLESFROMSTRING == 1
f656cf
+	if (loadModConf->fnRules == NULL && loadModConf->fnRulebase == NULL)
f656cf
+		loadModConf->fnRules = strdup(DFLT_FILENAME_LNRULES);
f656cf
+	if (loadModConf->contRules == NULL && loadModConf->contRulebase == NULL)
f656cf
+		loadModConf->contRules = strdup(DFLT_CONTAINER_LNRULES);
f656cf
+#else
f656cf
+	if (loadModConf->fnRulebase == NULL)
f656cf
+		loadModConf->fnRulebase = (uchar *)strdup(DFLT_FILENAME_RULEBASE);
f656cf
+	if (loadModConf->contRulebase == NULL)
f656cf
+		loadModConf->contRulebase = (uchar *)strdup(DFLT_CONTAINER_RULEBASE);
f656cf
+#endif
f656cf
+	caches = calloc(1, sizeof(struct cache_s *));
f656cf
+
f656cf
+finalize_it:
f656cf
+	if(pvals != NULL)
f656cf
+		cnfparamvalsDestruct(pvals, &modpblk);
f656cf
+ENDsetModCnf
f656cf
+
f656cf
+
f656cf
+BEGINcreateInstance
f656cf
+CODESTARTcreateInstance
f656cf
+ENDcreateInstance
f656cf
+
f656cf
+
f656cf
+BEGINfreeInstance
f656cf
+CODESTARTfreeInstance
f656cf
+	free(pData->kubernetesUrl);
f656cf
+	msgPropDescrDestruct(pData->srcMetadataDescr);
f656cf
+	free(pData->srcMetadataDescr);
f656cf
+	free(pData->dstMetadataPath);
f656cf
+	free(pData->caCertFile);
f656cf
+	free(pData->token);
f656cf
+	free(pData->tokenFile);
f656cf
+	free(pData->fnRules);
f656cf
+	free(pData->fnRulebase);
f656cf
+	ln_exitCtx(pData->fnCtxln);
f656cf
+	free(pData->contRules);
f656cf
+	free(pData->contRulebase);
f656cf
+	ln_exitCtx(pData->contCtxln);
f656cf
+	free_annotationmatch(&pData->annotation_match);
f656cf
+	free(pData->de_dot_separator);
f656cf
+	msgPropDescrDestruct(pData->contNameDescr);
f656cf
+	free(pData->contNameDescr);
f656cf
+	msgPropDescrDestruct(pData->contIdFullDescr);
f656cf
+	free(pData->contIdFullDescr);
f656cf
+ENDfreeInstance
f656cf
+
f656cf
+static size_t curlCB(char *data, size_t size, size_t nmemb, void *usrptr)
f656cf
+{
f656cf
+	DEFiRet;
f656cf
+	wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t *) usrptr;
f656cf
+	char * buf;
f656cf
+	size_t newlen;
f656cf
+
f656cf
+	newlen = pWrkrData->curlRplyLen + size * nmemb;
f656cf
+	CHKmalloc(buf = realloc(pWrkrData->curlRply, newlen));
f656cf
+	memcpy(buf + pWrkrData->curlRplyLen, data, size * nmemb);
f656cf
+	pWrkrData->curlRply = buf;
f656cf
+	pWrkrData->curlRplyLen = newlen;
f656cf
+
f656cf
+finalize_it:
f656cf
+	if (iRet != RS_RET_OK) {
f656cf
+		return 0;
f656cf
+	}
f656cf
+	return size * nmemb;
f656cf
+}
f656cf
+
f656cf
+BEGINcreateWrkrInstance
f656cf
+CODESTARTcreateWrkrInstance
f656cf
+	CURL *ctx;
f656cf
+	struct curl_slist *hdr = NULL;
f656cf
+	char *tokenHdr = NULL;
f656cf
+	FILE *fp = NULL;
f656cf
+	char *token = NULL;
f656cf
+
f656cf
+	hdr = curl_slist_append(hdr, "Content-Type: text/json; charset=utf-8");
f656cf
+	if (pWrkrData->pData->token) {
f656cf
+		if ((-1 == asprintf(&tokenHdr, "Authorization: Bearer %s", pWrkrData->pData->token)) ||
f656cf
+			(!tokenHdr)) {
f656cf
+			ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
f656cf
+		}
f656cf
+	} else if (pWrkrData->pData->tokenFile) {
f656cf
+		struct stat statbuf;
f656cf
+		fp = fopen((const char*)pWrkrData->pData->tokenFile, "r");
f656cf
+		if (fp && !fstat(fileno(fp), &statbuf)) {
f656cf
+			size_t bytesread;
f656cf
+			CHKmalloc(token = malloc((statbuf.st_size+1)*sizeof(char)));
f656cf
+			if (0 < (bytesread = fread(token, sizeof(char), statbuf.st_size, fp))) {
f656cf
+				token[bytesread] = '\0';
f656cf
+				if ((-1 == asprintf(&tokenHdr, "Authorization: Bearer %s", token)) ||
f656cf
+					(!tokenHdr)) {
f656cf
+					ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
f656cf
+				}
f656cf
+			}
f656cf
+			free(token);
f656cf
+			token = NULL;
f656cf
+		}
f656cf
+		if (fp) {
f656cf
+			fclose(fp);
f656cf
+			fp = NULL;
f656cf
+		}
f656cf
+	}
f656cf
+	if (tokenHdr) {
f656cf
+		hdr = curl_slist_append(hdr, tokenHdr);
f656cf
+		free(tokenHdr);
f656cf
+	}
f656cf
+	pWrkrData->curlHdr = hdr;
f656cf
+	ctx = curl_easy_init();
f656cf
+	curl_easy_setopt(ctx, CURLOPT_HTTPHEADER, hdr);
f656cf
+	curl_easy_setopt(ctx, CURLOPT_WRITEFUNCTION, curlCB);
f656cf
+	curl_easy_setopt(ctx, CURLOPT_WRITEDATA, pWrkrData);
f656cf
+	if(pWrkrData->pData->caCertFile)
f656cf
+		curl_easy_setopt(ctx, CURLOPT_CAINFO, pWrkrData->pData->caCertFile);
f656cf
+	if(pWrkrData->pData->allowUnsignedCerts)
f656cf
+		curl_easy_setopt(ctx, CURLOPT_SSL_VERIFYPEER, 0);
f656cf
+
f656cf
+	pWrkrData->curlCtx = ctx;
f656cf
+finalize_it:
f656cf
+	free(token);
f656cf
+	if (fp) {
f656cf
+		fclose(fp);
f656cf
+	}
f656cf
+ENDcreateWrkrInstance
f656cf
+
f656cf
+
f656cf
+BEGINfreeWrkrInstance
f656cf
+CODESTARTfreeWrkrInstance
f656cf
+	curl_easy_cleanup(pWrkrData->curlCtx);
f656cf
+	curl_slist_free_all(pWrkrData->curlHdr);
f656cf
+ENDfreeWrkrInstance
f656cf
+
f656cf
+
f656cf
+static struct cache_s *cacheNew(const uchar *url)
f656cf
+{
f656cf
+	struct cache_s *cache;
f656cf
+
f656cf
+	if (NULL == (cache = calloc(1, sizeof(struct cache_s)))) {
f656cf
+		FINALIZE;
f656cf
+	}
f656cf
+	cache->kbUrl = url;
f656cf
+	cache->mdHt = create_hashtable(100, hash_from_string,
f656cf
+		key_equals_string, (void (*)(void *)) json_object_put);
f656cf
+	cache->nsHt = create_hashtable(100, hash_from_string,
f656cf
+		key_equals_string, (void (*)(void *)) json_object_put);
f656cf
+	cache->cacheMtx = malloc(sizeof(pthread_mutex_t));
f656cf
+	if (!cache->mdHt || !cache->nsHt || !cache->cacheMtx) {
f656cf
+		free (cache);
f656cf
+		cache = NULL;
f656cf
+		FINALIZE;
f656cf
+	}
f656cf
+	pthread_mutex_init(cache->cacheMtx, NULL);
f656cf
+
f656cf
+finalize_it:
f656cf
+	return cache;
f656cf
+}
f656cf
+
f656cf
+
f656cf
+static void cacheFree(struct cache_s *cache)
f656cf
+{
f656cf
+	hashtable_destroy(cache->mdHt, 1);
f656cf
+	hashtable_destroy(cache->nsHt, 1);
f656cf
+	pthread_mutex_destroy(cache->cacheMtx);
f656cf
+	free(cache->cacheMtx);
f656cf
+	free(cache);
f656cf
+}
f656cf
+
f656cf
+
f656cf
+BEGINnewActInst
f656cf
+	struct cnfparamvals *pvals = NULL;
f656cf
+	int i;
f656cf
+	FILE *fp;
f656cf
+	char *rxstr = NULL;
f656cf
+	char *srcMetadataPath = NULL;
f656cf
+CODESTARTnewActInst
f656cf
+	DBGPRINTF("newActInst (mmkubernetes)\n");
f656cf
+
f656cf
+	pvals = nvlstGetParams(lst, &actpblk, NULL);
f656cf
+	if(pvals == NULL) {
f656cf
+		errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "mmkubernetes: "
f656cf
+			"error processing config parameters [action(...)]");
f656cf
+		ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
f656cf
+	}
f656cf
+
f656cf
+	if(Debug) {
f656cf
+		dbgprintf("action param blk in mmkubernetes:\n");
f656cf
+		cnfparamsPrint(&actpblk, pvals);
f656cf
+	}
f656cf
+
f656cf
+	CODE_STD_STRING_REQUESTnewActInst(1)
f656cf
+	CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
f656cf
+	CHKiRet(createInstance(&pData));
f656cf
+
f656cf
+	pData->de_dot = loadModConf->de_dot;
f656cf
+	pData->allowUnsignedCerts = loadModConf->allowUnsignedCerts;
f656cf
+	for(i = 0 ; i < actpblk.nParams ; ++i) {
f656cf
+		if(!pvals[i].bUsed) {
f656cf
+			continue;
f656cf
+		} else if(!strcmp(actpblk.descr[i].name, "kubernetesurl")) {
f656cf
+			free(pData->kubernetesUrl);
f656cf
+			pData->kubernetesUrl = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
f656cf
+		} else if(!strcmp(actpblk.descr[i].name, "srcmetadatapath")) {
f656cf
+			msgPropDescrDestruct(pData->srcMetadataDescr);
f656cf
+			free(pData->srcMetadataDescr);
f656cf
+			CHKmalloc(pData->srcMetadataDescr = MALLOC(sizeof(msgPropDescr_t)));
f656cf
+			srcMetadataPath = es_str2cstr(pvals[i].val.d.estr, NULL);
f656cf
+			CHKiRet(msgPropDescrFill(pData->srcMetadataDescr, (uchar *)srcMetadataPath,
f656cf
+				strlen(srcMetadataPath)));
f656cf
+			/* todo: sanitize the path */
f656cf
+		} else if(!strcmp(actpblk.descr[i].name, "dstmetadatapath")) {
f656cf
+			free(pData->dstMetadataPath);
f656cf
+			pData->dstMetadataPath = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
f656cf
+			/* todo: sanitize the path */
f656cf
+		} else if(!strcmp(actpblk.descr[i].name, "tls.cacert")) {
f656cf
+			free(pData->caCertFile);
f656cf
+			pData->caCertFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
f656cf
+			fp = fopen((const char*)pData->caCertFile, "r");
f656cf
+			if(fp == NULL) {
f656cf
+				char errStr[1024];
f656cf
+				rs_strerror_r(errno, errStr, sizeof(errStr));
f656cf
+				iRet = RS_RET_NO_FILE_ACCESS;
f656cf
+				errmsg.LogError(0, iRet,
f656cf
+						"error: certificate file %s couldn't be accessed: %s\n",
f656cf
+						pData->caCertFile, errStr);
f656cf
+				ABORT_FINALIZE(iRet);
f656cf
+			} else {
f656cf
+				fclose(fp);
f656cf
+			}
f656cf
+		} else if(!strcmp(actpblk.descr[i].name, "allowunsignedcerts")) {
f656cf
+			pData->allowUnsignedCerts = pvals[i].val.d.n;
f656cf
+		} else if(!strcmp(actpblk.descr[i].name, "token")) {
f656cf
+			free(pData->token);
f656cf
+			pData->token = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
f656cf
+		} else if(!strcmp(actpblk.descr[i].name, "tokenfile")) {
f656cf
+			free(pData->tokenFile);
f656cf
+			pData->tokenFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
f656cf
+			fp = fopen((const char*)pData->tokenFile, "r");
f656cf
+			if(fp == NULL) {
f656cf
+				char errStr[1024];
f656cf
+				rs_strerror_r(errno, errStr, sizeof(errStr));
f656cf
+				iRet = RS_RET_NO_FILE_ACCESS;
f656cf
+				errmsg.LogError(0, iRet,
f656cf
+						"error: token file %s couldn't be accessed: %s\n",
f656cf
+						pData->tokenFile, errStr);
f656cf
+				ABORT_FINALIZE(iRet);
f656cf
+			} else {
f656cf
+				fclose(fp);
f656cf
+			}
f656cf
+		} else if(!strcmp(actpblk.descr[i].name, "annotation_match")) {
f656cf
+			free_annotationmatch(&pData->annotation_match);
f656cf
+			if (RS_RET_OK != (iRet = init_annotationmatch(&pData->annotation_match, pvals[i].val.d.ar)))
f656cf
+				ABORT_FINALIZE(iRet);
f656cf
+		} else if(!strcmp(actpblk.descr[i].name, "de_dot")) {
f656cf
+			pData->de_dot = pvals[i].val.d.n;
f656cf
+		} else if(!strcmp(actpblk.descr[i].name, "de_dot_separator")) {
f656cf
+			free(pData->de_dot_separator);
f656cf
+			pData->de_dot_separator = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
f656cf
+#if HAVE_LOADSAMPLESFROMSTRING == 1
f656cf
+		} else if(!strcmp(modpblk.descr[i].name, "filenamerules")) {
f656cf
+			free(pData->fnRules);
f656cf
+			CHKiRet((array_to_rules(pvals[i].val.d.ar, &pData->fnRules)));
f656cf
+#endif
f656cf
+		} else if(!strcmp(modpblk.descr[i].name, "filenamerulebase")) {
f656cf
+			free(pData->fnRulebase);
f656cf
+			pData->fnRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
f656cf
+			fp = fopen((const char*)pData->fnRulebase, "r");
f656cf
+			if(fp == NULL) {
f656cf
+				char errStr[1024];
f656cf
+				rs_strerror_r(errno, errStr, sizeof(errStr));
f656cf
+				iRet = RS_RET_NO_FILE_ACCESS;
f656cf
+				errmsg.LogError(0, iRet,
f656cf
+						"error: filenamerulebase file %s couldn't be accessed: %s\n",
f656cf
+						pData->fnRulebase, errStr);
f656cf
+				ABORT_FINALIZE(iRet);
f656cf
+			} else {
f656cf
+				fclose(fp);
f656cf
+			}
f656cf
+#if HAVE_LOADSAMPLESFROMSTRING == 1
f656cf
+		} else if(!strcmp(modpblk.descr[i].name, "containerrules")) {
f656cf
+			free(pData->contRules);
f656cf
+			CHKiRet((array_to_rules(pvals[i].val.d.ar, &pData->contRules)));
f656cf
+#endif
f656cf
+		} else if(!strcmp(modpblk.descr[i].name, "containerrulebase")) {
f656cf
+			free(pData->contRulebase);
f656cf
+			pData->contRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
f656cf
+			fp = fopen((const char*)pData->contRulebase, "r");
f656cf
+			if(fp == NULL) {
f656cf
+				char errStr[1024];
f656cf
+				rs_strerror_r(errno, errStr, sizeof(errStr));
f656cf
+				iRet = RS_RET_NO_FILE_ACCESS;
f656cf
+				errmsg.LogError(0, iRet,
f656cf
+						"error: containerrulebase file %s couldn't be accessed: %s\n",
f656cf
+						pData->contRulebase, errStr);
f656cf
+				ABORT_FINALIZE(iRet);
f656cf
+			} else {
f656cf
+				fclose(fp);
f656cf
+			}
f656cf
+		} else {
f656cf
+			dbgprintf("mmkubernetes: program error, non-handled "
f656cf
+				"param '%s' in action() block\n", actpblk.descr[i].name);
f656cf
+			/* todo: error message? */
f656cf
+		}
f656cf
+	}
f656cf
+
f656cf
+#if HAVE_LOADSAMPLESFROMSTRING == 1
f656cf
+	if (pData->fnRules && pData->fnRulebase) {
f656cf
+		errmsg.LogError(0, RS_RET_CONFIG_ERROR,
f656cf
+		    "mmkubernetes: only 1 of filenamerules or filenamerulebase may be used");
f656cf
+		ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
f656cf
+	}
f656cf
+	if (pData->contRules && pData->contRulebase) {
f656cf
+		errmsg.LogError(0, RS_RET_CONFIG_ERROR,
f656cf
+			"mmkubernetes: only 1 of containerrules or containerrulebase may be used");
f656cf
+		ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
f656cf
+	}
f656cf
+#endif
f656cf
+	CHKiRet(set_lnctx(&pData->fnCtxln, pData->fnRules, pData->fnRulebase,
f656cf
+			loadModConf->fnRules, loadModConf->fnRulebase));
f656cf
+	CHKiRet(set_lnctx(&pData->contCtxln, pData->contRules, pData->contRulebase,
f656cf
+			loadModConf->contRules, loadModConf->contRulebase));
f656cf
+
f656cf
+	if(pData->kubernetesUrl == NULL) {
f656cf
+		if(loadModConf->kubernetesUrl == NULL) {
f656cf
+			CHKmalloc(pData->kubernetesUrl = (uchar *) strdup(DFLT_KUBERNETES_URL));
f656cf
+		} else {
f656cf
+			CHKmalloc(pData->kubernetesUrl = (uchar *) strdup((char *) loadModConf->kubernetesUrl));
f656cf
+		}
f656cf
+	}
f656cf
+	if(pData->srcMetadataDescr == NULL) {
f656cf
+		CHKmalloc(pData->srcMetadataDescr = MALLOC(sizeof(msgPropDescr_t)));
f656cf
+		CHKiRet(msgPropDescrFill(pData->srcMetadataDescr, loadModConf->srcMetadataPath,
f656cf
+			strlen((char *)loadModConf->srcMetadataPath)));
f656cf
+	}
f656cf
+	if(pData->dstMetadataPath == NULL)
f656cf
+		pData->dstMetadataPath = (uchar *) strdup((char *) loadModConf->dstMetadataPath);
f656cf
+	if(pData->caCertFile == NULL && loadModConf->caCertFile)
f656cf
+		pData->caCertFile = (uchar *) strdup((char *) loadModConf->caCertFile);
f656cf
+	if(pData->token == NULL && loadModConf->token)
f656cf
+		pData->token = (uchar *) strdup((char *) loadModConf->token);
f656cf
+	if(pData->tokenFile == NULL && loadModConf->tokenFile)
f656cf
+		pData->tokenFile = (uchar *) strdup((char *) loadModConf->tokenFile);
f656cf
+	if(pData->de_dot_separator == NULL && loadModConf->de_dot_separator)
f656cf
+		pData->de_dot_separator = (uchar *) strdup((char *) loadModConf->de_dot_separator);
f656cf
+	if((pData->annotation_match.nmemb == 0) && (loadModConf->annotation_match.nmemb > 0))
f656cf
+		copy_annotationmatch(&loadModConf->annotation_match, &pData->annotation_match);
f656cf
+
f656cf
+	if(pData->de_dot_separator)
f656cf
+		pData->de_dot_separator_len = strlen((const char *)pData->de_dot_separator);
f656cf
+
f656cf
+	CHKmalloc(pData->contNameDescr = MALLOC(sizeof(msgPropDescr_t)));
f656cf
+	CHKiRet(msgPropDescrFill(pData->contNameDescr, (uchar*) DFLT_CONTAINER_NAME,
f656cf
+			strlen(DFLT_CONTAINER_NAME)));
f656cf
+	CHKmalloc(pData->contIdFullDescr = MALLOC(sizeof(msgPropDescr_t)));
f656cf
+	CHKiRet(msgPropDescrFill(pData->contIdFullDescr, (uchar*) DFLT_CONTAINER_ID_FULL,
f656cf
+			strlen(DFLT_CONTAINER_NAME)));
f656cf
+
f656cf
+	/* get the cache for this url */
f656cf
+	for(i = 0; caches[i] != NULL; i++) {
f656cf
+		if(!strcmp((char *) pData->kubernetesUrl, (char *) caches[i]->kbUrl))
f656cf
+			break;
f656cf
+	}
f656cf
+	if(caches[i] != NULL) {
f656cf
+		pData->cache = caches[i];
f656cf
+	} else {
f656cf
+		CHKmalloc(pData->cache = cacheNew(pData->kubernetesUrl));
f656cf
+
f656cf
+		CHKmalloc(caches = realloc(caches, (i + 2) * sizeof(struct cache_s *)));
f656cf
+		caches[i] = pData->cache;
f656cf
+		caches[i + 1] = NULL;
f656cf
+	}
f656cf
+CODE_STD_FINALIZERnewActInst
f656cf
+	if(pvals != NULL)
f656cf
+		cnfparamvalsDestruct(pvals, &actpblk);
f656cf
+	free(rxstr);
f656cf
+	free(srcMetadataPath);
f656cf
+ENDnewActInst
f656cf
+
f656cf
+
f656cf
+/* legacy config format is not supported */
f656cf
+BEGINparseSelectorAct
f656cf
+CODESTARTparseSelectorAct
f656cf
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
f656cf
+	if(strncmp((char *) p, ":mmkubernetes:", sizeof(":mmkubernetes:") - 1)) {
f656cf
+		errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
f656cf
+			"mmkubernetes supports only v6+ config format, use: "
f656cf
+			"action(type=\"mmkubernetes\" ...)");
f656cf
+	}
f656cf
+	ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
f656cf
+CODE_STD_FINALIZERparseSelectorAct
f656cf
+ENDparseSelectorAct
f656cf
+
f656cf
+
f656cf
+BEGINendCnfLoad
f656cf
+CODESTARTendCnfLoad
f656cf
+ENDendCnfLoad
f656cf
+
f656cf
+
f656cf
+BEGINcheckCnf
f656cf
+CODESTARTcheckCnf
f656cf
+ENDcheckCnf
f656cf
+
f656cf
+
f656cf
+BEGINactivateCnf
f656cf
+CODESTARTactivateCnf
f656cf
+	runModConf = pModConf;
f656cf
+ENDactivateCnf
f656cf
+
f656cf
+
f656cf
+BEGINfreeCnf
f656cf
+CODESTARTfreeCnf
f656cf
+	int i;
f656cf
+
f656cf
+	free(pModConf->kubernetesUrl);
f656cf
+	free(pModConf->srcMetadataPath);
f656cf
+	free(pModConf->dstMetadataPath);
f656cf
+	free(pModConf->caCertFile);
f656cf
+	free(pModConf->token);
f656cf
+	free(pModConf->tokenFile);
f656cf
+	free(pModConf->de_dot_separator);
f656cf
+	free(pModConf->fnRules);
f656cf
+	free(pModConf->fnRulebase);
f656cf
+	free(pModConf->contRules);
f656cf
+	free(pModConf->contRulebase);
f656cf
+	free_annotationmatch(&pModConf->annotation_match);
f656cf
+	for(i = 0; caches[i] != NULL; i++)
f656cf
+		cacheFree(caches[i]);
f656cf
+	free(caches);
f656cf
+ENDfreeCnf
f656cf
+
f656cf
+
f656cf
+BEGINdbgPrintInstInfo
f656cf
+CODESTARTdbgPrintInstInfo
f656cf
+	dbgprintf("mmkubernetes\n");
f656cf
+	dbgprintf("\tkubernetesUrl='%s'\n", pData->kubernetesUrl);
f656cf
+	dbgprintf("\tsrcMetadataPath='%s'\n", pData->srcMetadataDescr->name);
f656cf
+	dbgprintf("\tdstMetadataPath='%s'\n", pData->dstMetadataPath);
f656cf
+	dbgprintf("\ttls.cacert='%s'\n", pData->caCertFile);
f656cf
+	dbgprintf("\tallowUnsignedCerts='%d'\n", pData->allowUnsignedCerts);
f656cf
+	dbgprintf("\ttoken='%s'\n", pData->token);
f656cf
+	dbgprintf("\ttokenFile='%s'\n", pData->tokenFile);
f656cf
+	dbgprintf("\tde_dot='%d'\n", pData->de_dot);
f656cf
+	dbgprintf("\tde_dot_separator='%s'\n", pData->de_dot_separator);
f656cf
+	dbgprintf("\tfilenamerulebase='%s'\n", pData->fnRulebase);
f656cf
+	dbgprintf("\tcontainerrulebase='%s'\n", pData->contRulebase);
f656cf
+#if HAVE_LOADSAMPLESFROMSTRING == 1
f656cf
+	dbgprintf("\tfilenamerules='%s'\n", pData->fnRules);
f656cf
+	dbgprintf("\tcontainerrules='%s'\n", pData->contRules);
f656cf
+#endif
f656cf
+ENDdbgPrintInstInfo
f656cf
+
f656cf
+
f656cf
+BEGINtryResume
f656cf
+CODESTARTtryResume
f656cf
+ENDtryResume
f656cf
+
f656cf
+static rsRetVal
f656cf
+extractMsgMetadata(smsg_t *pMsg, instanceData *pData, struct json_object **json)
f656cf
+{
f656cf
+	DEFiRet;
f656cf
+	uchar *filename = NULL, *container_name = NULL, *container_id_full = NULL;
f656cf
+	rs_size_t fnLen, container_name_len, container_id_full_len;
f656cf
+	unsigned short freeFn = 0, free_container_name = 0, free_container_id_full = 0;
f656cf
+	int lnret;
f656cf
+	struct json_object *cnid = NULL;
f656cf
+
f656cf
+	if (!json)
f656cf
+		FINALIZE;
f656cf
+	*json = NULL;
f656cf
+	/* extract metadata from the CONTAINER_NAME field and see if CONTAINER_ID_FULL is present */
f656cf
+	container_name = MsgGetProp(pMsg, NULL, pData->contNameDescr,
f656cf
+				    &container_name_len, &free_container_name, NULL);
f656cf
+	container_id_full = MsgGetProp(
f656cf
+		pMsg, NULL, pData->contIdFullDescr, &container_id_full_len, &free_container_id_full, NULL);
f656cf
+
f656cf
+	if (container_name && container_id_full && container_name_len && container_id_full_len) {
f656cf
+		dbgprintf("mmkubernetes: CONTAINER_NAME: '%s'  CONTAINER_ID_FULL: '%s'.\n",
f656cf
+			  container_name, container_id_full);
f656cf
+		if ((lnret = ln_normalize(pData->contCtxln, (char*)container_name,
f656cf
+					  container_name_len, json))) {
f656cf
+			if (LN_WRONGPARSER != lnret) {
f656cf
+				LogMsg(0, RS_RET_ERR, LOG_ERR,
f656cf
+					"mmkubernetes: error parsing container_name [%s]: [%d]",
f656cf
+					container_name, lnret);
f656cf
+
f656cf
+				ABORT_FINALIZE(RS_RET_ERR);
f656cf
+			}
f656cf
+			/* else assume parser didn't find a match and fall through */
f656cf
+		} else if (fjson_object_object_get_ex(*json, "pod_name", NULL) &&
f656cf
+			fjson_object_object_get_ex(*json, "namespace_name", NULL) &&
f656cf
+			fjson_object_object_get_ex(*json, "container_name", NULL)) {
f656cf
+			/* if we have fields for pod name, namespace name, container name,
f656cf
+			 * and container id, we are good to go */
f656cf
+			/* add field for container id */
f656cf
+			json_object_object_add(*json, "container_id",
f656cf
+				json_object_new_string_len((const char *)container_id_full,
f656cf
+							   container_id_full_len));
f656cf
+			ABORT_FINALIZE(RS_RET_OK);
f656cf
+		}
f656cf
+	}
f656cf
+
f656cf
+	/* extract metadata from the file name */
f656cf
+	filename = MsgGetProp(pMsg, NULL, pData->srcMetadataDescr, &fnLen, &freeFn, NULL);
f656cf
+	if((filename == NULL) || (fnLen == 0))
f656cf
+		ABORT_FINALIZE(RS_RET_NOT_FOUND);
f656cf
+
f656cf
+	dbgprintf("mmkubernetes: filename: '%s' len %d.\n", filename, fnLen);
f656cf
+	if ((lnret = ln_normalize(pData->fnCtxln, (char*)filename, fnLen, json))) {
f656cf
+		if (LN_WRONGPARSER != lnret) {
f656cf
+			LogMsg(0, RS_RET_ERR, LOG_ERR,
f656cf
+				"mmkubernetes: error parsing container_name [%s]: [%d]",
f656cf
+				filename, lnret);
f656cf
+
f656cf
+			ABORT_FINALIZE(RS_RET_ERR);
f656cf
+		} else {
f656cf
+			/* no match */
f656cf
+			ABORT_FINALIZE(RS_RET_NOT_FOUND);
f656cf
+		}
f656cf
+}
f656cf
+	/* if we have fields for pod name, namespace name, container name,
f656cf
+	 * and container id, we are good to go */
f656cf
+	if (fjson_object_object_get_ex(*json, "pod_name", NULL) &&
f656cf
+		fjson_object_object_get_ex(*json, "namespace_name", NULL) &&
f656cf
+		fjson_object_object_get_ex(*json, "container_name_and_id", &cnid)) {
f656cf
+		/* parse container_name_and_id into container_name and container_id */
f656cf
+		const char *container_name_and_id = json_object_get_string(cnid);
f656cf
+		const char *last_dash = NULL;
f656cf
+		if (container_name_and_id && (last_dash = strrchr(container_name_and_id, '-')) &&
f656cf
+			*(last_dash + 1) && (last_dash != container_name_and_id)) {
f656cf
+			json_object_object_add(*json, "container_name",
f656cf
+				json_object_new_string_len(container_name_and_id,
f656cf
+							   (int)(last_dash-container_name_and_id)));
f656cf
+			json_object_object_add(*json, "container_id",
f656cf
+					json_object_new_string(last_dash + 1));
f656cf
+			ABORT_FINALIZE(RS_RET_OK);
f656cf
+		}
f656cf
+	}
f656cf
+	ABORT_FINALIZE(RS_RET_NOT_FOUND);
f656cf
+finalize_it:
f656cf
+	if(freeFn)
f656cf
+		free(filename);
f656cf
+	if (free_container_name)
f656cf
+		free(container_name);
f656cf
+	if (free_container_id_full)
f656cf
+		free(container_id_full);
f656cf
+	if (iRet != RS_RET_OK) {
f656cf
+		json_object_put(*json);
f656cf
+		*json = NULL;
f656cf
+	}
f656cf
+	RETiRet;
f656cf
+}
f656cf
+
f656cf
+
f656cf
+static rsRetVal
f656cf
+queryKB(wrkrInstanceData_t *pWrkrData, char *url, struct json_object **rply)
f656cf
+{
f656cf
+	DEFiRet;
f656cf
+	CURLcode ccode;
f656cf
+	struct json_tokener *jt = NULL;
f656cf
+	struct json_object *jo;
f656cf
+	long resp_code = 400;
f656cf
+
f656cf
+	/* query kubernetes for pod info */
f656cf
+	ccode = curl_easy_setopt(pWrkrData->curlCtx, CURLOPT_URL, url);
f656cf
+	if(ccode != CURLE_OK)
f656cf
+		ABORT_FINALIZE(RS_RET_ERR);
f656cf
+	if(CURLE_OK != (ccode = curl_easy_perform(pWrkrData->curlCtx))) {
f656cf
+		errmsg.LogMsg(0, RS_RET_ERR, LOG_ERR,
f656cf
+			      "mmkubernetes: failed to connect to [%s] - %d:%s\n",
f656cf
+			      url, ccode, curl_easy_strerror(ccode));
f656cf
+		ABORT_FINALIZE(RS_RET_ERR);
f656cf
+	}
f656cf
+	if(CURLE_OK != (ccode = curl_easy_getinfo(pWrkrData->curlCtx,
f656cf
+					CURLINFO_RESPONSE_CODE, &resp_code))) {
f656cf
+		errmsg.LogMsg(0, RS_RET_ERR, LOG_ERR,
f656cf
+			      "mmkubernetes: could not get response code from query to [%s] - %d:%s\n",
f656cf
+			      url, ccode, curl_easy_strerror(ccode));
f656cf
+		ABORT_FINALIZE(RS_RET_ERR);
f656cf
+	}
f656cf
+	if(resp_code == 401) {
f656cf
+		errmsg.LogMsg(0, RS_RET_ERR, LOG_ERR,
f656cf
+			      "mmkubernetes: Unauthorized: not allowed to view url - "
f656cf
+			      "check token/auth credentials [%s]\n",
f656cf
+			      url);
f656cf
+		ABORT_FINALIZE(RS_RET_ERR);
f656cf
+	}
f656cf
+	if(resp_code == 403) {
f656cf
+		errmsg.LogMsg(0, RS_RET_ERR, LOG_ERR,
f656cf
+			      "mmkubernetes: Forbidden: no access - "
f656cf
+			      "check permissions to view url [%s]\n",
f656cf
+			      url);
f656cf
+		ABORT_FINALIZE(RS_RET_ERR);
f656cf
+	}
f656cf
+	if(resp_code == 404) {
f656cf
+		errmsg.LogMsg(0, RS_RET_ERR, LOG_ERR,
f656cf
+			      "mmkubernetes: Not Found: the resource does not exist at url [%s]\n",
f656cf
+			      url);
f656cf
+		ABORT_FINALIZE(RS_RET_ERR);
f656cf
+	}
f656cf
+	if(resp_code == 429) {
f656cf
+		errmsg.LogMsg(0, RS_RET_ERR, LOG_ERR,
f656cf
+			      "mmkubernetes: Too Many Requests: the server is too heavily loaded "
f656cf
+			      "to provide the data for the requested url [%s]\n",
f656cf
+			      url);
f656cf
+		ABORT_FINALIZE(RS_RET_ERR);
f656cf
+	}
f656cf
+	if(resp_code != 200) {
f656cf
+		errmsg.LogMsg(0, RS_RET_ERR, LOG_ERR,
f656cf
+			      "mmkubernetes: server returned unexpected code [%ld] for url [%s]\n",
f656cf
+			      resp_code, url);
f656cf
+		ABORT_FINALIZE(RS_RET_ERR);
f656cf
+	}
f656cf
+	/* parse retrieved data */
f656cf
+	jt = json_tokener_new();
f656cf
+	json_tokener_reset(jt);
f656cf
+	jo = json_tokener_parse_ex(jt, pWrkrData->curlRply, pWrkrData->curlRplyLen);
f656cf
+	json_tokener_free(jt);
f656cf
+	if(!json_object_is_type(jo, json_type_object)) {
f656cf
+		json_object_put(jo);
f656cf
+		jo = NULL;
f656cf
+		errmsg.LogMsg(0, RS_RET_JSON_PARSE_ERR, LOG_INFO,
f656cf
+			      "mmkubernetes: unable to parse string as JSON:[%.*s]\n",
f656cf
+			      (int)pWrkrData->curlRplyLen, pWrkrData->curlRply);
f656cf
+		ABORT_FINALIZE(RS_RET_JSON_PARSE_ERR);
f656cf
+	}
f656cf
+
f656cf
+	dbgprintf("mmkubernetes: queryKB reply:\n%s\n",
f656cf
+		json_object_to_json_string_ext(jo, JSON_C_TO_STRING_PRETTY));
f656cf
+
f656cf
+	*rply = jo;
f656cf
+
f656cf
+finalize_it:
f656cf
+	if(pWrkrData->curlRply != NULL) {
f656cf
+		free(pWrkrData->curlRply);
f656cf
+		pWrkrData->curlRply = NULL;
f656cf
+		pWrkrData->curlRplyLen = 0;
f656cf
+	}
f656cf
+	RETiRet;
f656cf
+}
f656cf
+
f656cf
+
f656cf
+/* versions < 8.16.0 don't support BEGINdoAction_NoStrings */
f656cf
+#if defined(BEGINdoAction_NoStrings)
f656cf
+BEGINdoAction_NoStrings
f656cf
+	smsg_t **ppMsg = (smsg_t **) pMsgData;
f656cf
+	smsg_t *pMsg = ppMsg[0];
f656cf
+#else
f656cf
+BEGINdoAction
f656cf
+	smsg_t *pMsg = (smsg_t*) ppString[0];
f656cf
+#endif
f656cf
+	const char *podName = NULL, *ns = NULL, *containerName = NULL,
f656cf
+		*containerID = NULL;
f656cf
+	char *mdKey = NULL;
f656cf
+	struct json_object *jMetadata = NULL, *jMetadataCopy = NULL, *jMsgMeta = NULL,
f656cf
+			*jo = NULL;
f656cf
+	int add_ns_metadata = 0;
f656cf
+CODESTARTdoAction
f656cf
+	CHKiRet_Hdlr(extractMsgMetadata(pMsg, pWrkrData->pData, &jMsgMeta)) {
f656cf
+		ABORT_FINALIZE((iRet == RS_RET_NOT_FOUND) ? RS_RET_OK : iRet);
f656cf
+	}
f656cf
+
f656cf
+	if (fjson_object_object_get_ex(jMsgMeta, "pod_name", &jo))
f656cf
+		podName = json_object_get_string(jo);
f656cf
+	if (fjson_object_object_get_ex(jMsgMeta, "namespace_name", &jo))
f656cf
+		ns = json_object_get_string(jo);
f656cf
+	if (fjson_object_object_get_ex(jMsgMeta, "container_name", &jo))
f656cf
+		containerName = json_object_get_string(jo);
f656cf
+	if (fjson_object_object_get_ex(jMsgMeta, "container_id", &jo))
f656cf
+		containerID = json_object_get_string(jo);
f656cf
+	assert(podName != NULL);
f656cf
+	assert(ns != NULL);
f656cf
+	assert(containerName != NULL);
f656cf
+	assert(containerID != NULL);
f656cf
+
f656cf
+	dbgprintf("mmkubernetes:\n  podName: '%s'\n  namespace: '%s'\n  containerName: '%s'\n"
f656cf
+		"  containerID: '%s'\n", podName, ns, containerName, containerID);
f656cf
+
f656cf
+	/* check cache for metadata */
f656cf
+	if ((-1 == asprintf(&mdKey, "%s_%s_%s", ns, podName, containerName)) ||
f656cf
+		(!mdKey)) {
f656cf
+		ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
f656cf
+	}
f656cf
+	pthread_mutex_lock(pWrkrData->pData->cache->cacheMtx);
f656cf
+	jMetadata = hashtable_search(pWrkrData->pData->cache->mdHt, mdKey);
f656cf
+
f656cf
+	if(jMetadata == NULL) {
f656cf
+		char *url = NULL;
f656cf
+		struct json_object *jReply = NULL, *jo2 = NULL, *jNsMeta = NULL, *jPodData = NULL;
f656cf
+
f656cf
+		/* check cache for namespace metadata */
f656cf
+		jNsMeta = hashtable_search(pWrkrData->pData->cache->nsHt, (char *)ns);
f656cf
+
f656cf
+		if(jNsMeta == NULL) {
f656cf
+			/* query kubernetes for namespace info */
f656cf
+			/* todo: move url definitions elsewhere */
f656cf
+			if ((-1 == asprintf(&url, "%s/api/v1/namespaces/%s",
f656cf
+				 (char *) pWrkrData->pData->kubernetesUrl, ns)) ||
f656cf
+				(!url)) {
f656cf
+				pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
f656cf
+				ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
f656cf
+			}
f656cf
+			iRet = queryKB(pWrkrData, url, &jReply);
f656cf
+			free(url);
f656cf
+			/* todo: implement support for the .orphaned namespace */
f656cf
+			if (iRet != RS_RET_OK) {
f656cf
+				json_object_put(jReply);
f656cf
+				jReply = NULL;
f656cf
+				pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
f656cf
+				FINALIZE;
f656cf
+			}
f656cf
+
f656cf
+			if(fjson_object_object_get_ex(jReply, "metadata", &jNsMeta)) {
f656cf
+				jNsMeta = json_object_get(jNsMeta);
f656cf
+				parse_labels_annotations(jNsMeta, &pWrkrData->pData->annotation_match,
f656cf
+					pWrkrData->pData->de_dot,
f656cf
+					(const char *)pWrkrData->pData->de_dot_separator,
f656cf
+					pWrkrData->pData->de_dot_separator_len);
f656cf
+				add_ns_metadata = 1;
f656cf
+			} else {
f656cf
+				/* namespace with no metadata??? */
f656cf
+				errmsg.LogMsg(0, RS_RET_ERR, LOG_INFO,
f656cf
+					      "mmkubernetes: namespace [%s] has no metadata!\n", ns);
f656cf
+				jNsMeta = NULL;
f656cf
+			}
f656cf
+
f656cf
+			json_object_put(jReply);
f656cf
+			jReply = NULL;
f656cf
+		}
f656cf
+
f656cf
+		if ((-1 == asprintf(&url, "%s/api/v1/namespaces/%s/pods/%s",
f656cf
+			 (char *) pWrkrData->pData->kubernetesUrl, ns, podName)) ||
f656cf
+			(!url)) {
f656cf
+			pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
f656cf
+			ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
f656cf
+		}
f656cf
+		iRet = queryKB(pWrkrData, url, &jReply);
f656cf
+		free(url);
f656cf
+		if(iRet != RS_RET_OK) {
f656cf
+			if(jNsMeta && add_ns_metadata) {
f656cf
+				hashtable_insert(pWrkrData->pData->cache->nsHt, strdup(ns), jNsMeta);
f656cf
+			}
f656cf
+			json_object_put(jReply);
f656cf
+			jReply = NULL;
f656cf
+			pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
f656cf
+			FINALIZE;
f656cf
+		}
f656cf
+
f656cf
+		jo = json_object_new_object();
f656cf
+		if(jNsMeta && fjson_object_object_get_ex(jNsMeta, "uid", &jo2))
f656cf
+			json_object_object_add(jo, "namespace_id", json_object_get(jo2));
f656cf
+		if(jNsMeta && fjson_object_object_get_ex(jNsMeta, "labels", &jo2))
f656cf
+			json_object_object_add(jo, "namespace_labels", json_object_get(jo2));
f656cf
+		if(jNsMeta && fjson_object_object_get_ex(jNsMeta, "annotations", &jo2))
f656cf
+			json_object_object_add(jo, "namespace_annotations", json_object_get(jo2));
f656cf
+		if(jNsMeta && fjson_object_object_get_ex(jNsMeta, "creationTimestamp", &jo2))
f656cf
+			json_object_object_add(jo, "creation_timestamp", json_object_get(jo2));
f656cf
+		if(fjson_object_object_get_ex(jReply, "metadata", &jPodData)) {
f656cf
+			if(fjson_object_object_get_ex(jPodData, "uid", &jo2))
f656cf
+				json_object_object_add(jo, "pod_id", json_object_get(jo2));
f656cf
+			parse_labels_annotations(jPodData, &pWrkrData->pData->annotation_match,
f656cf
+				pWrkrData->pData->de_dot,
f656cf
+				(const char *)pWrkrData->pData->de_dot_separator,
f656cf
+				pWrkrData->pData->de_dot_separator_len);
f656cf
+			if(fjson_object_object_get_ex(jPodData, "annotations", &jo2))
f656cf
+				json_object_object_add(jo, "annotations", json_object_get(jo2));
f656cf
+			if(fjson_object_object_get_ex(jPodData, "labels", &jo2))
f656cf
+				json_object_object_add(jo, "labels", json_object_get(jo2));
f656cf
+		}
f656cf
+		if(fjson_object_object_get_ex(jReply, "spec", &jPodData)) {
f656cf
+			if(fjson_object_object_get_ex(jPodData, "nodeName", &jo2)) {
f656cf
+				json_object_object_add(jo, "host", json_object_get(jo2));
f656cf
+			}
f656cf
+		}
f656cf
+		json_object_put(jReply);
f656cf
+		jReply = NULL;
f656cf
+
f656cf
+		if (fjson_object_object_get_ex(jMsgMeta, "pod_name", &jo2))
f656cf
+			json_object_object_add(jo, "pod_name", json_object_get(jo2));
f656cf
+		if (fjson_object_object_get_ex(jMsgMeta, "namespace_name", &jo2))
f656cf
+			json_object_object_add(jo, "namespace_name", json_object_get(jo2));
f656cf
+		if (fjson_object_object_get_ex(jMsgMeta, "container_name", &jo2))
f656cf
+			json_object_object_add(jo, "container_name", json_object_get(jo2));
f656cf
+		json_object_object_add(jo, "master_url",
f656cf
+			json_object_new_string((const char *)pWrkrData->pData->kubernetesUrl));
f656cf
+		jMetadata = json_object_new_object();
f656cf
+		json_object_object_add(jMetadata, "kubernetes", jo);
f656cf
+		jo = json_object_new_object();
f656cf
+		if (fjson_object_object_get_ex(jMsgMeta, "container_id", &jo2))
f656cf
+			json_object_object_add(jo, "container_id", json_object_get(jo2));
f656cf
+		json_object_object_add(jMetadata, "docker", jo);
f656cf
+
f656cf
+		hashtable_insert(pWrkrData->pData->cache->mdHt, mdKey, jMetadata);
f656cf
+		mdKey = NULL;
f656cf
+		if(jNsMeta && add_ns_metadata) {
f656cf
+			hashtable_insert(pWrkrData->pData->cache->nsHt, strdup(ns), jNsMeta);
f656cf
+			ns = NULL;
f656cf
+		}
f656cf
+	}
f656cf
+
f656cf
+	/* make a copy of the metadata for the msg to own */
f656cf
+	/* todo: use json_object_deep_copy when implementation available in libfastjson */
f656cf
+	/* yes, this is expensive - but there is no other way to make this thread safe - we
f656cf
+	 * can't allow the msg to have a shared pointer to an element inside the cache,
f656cf
+	 * outside of the cache lock
f656cf
+	 */
f656cf
+	jMetadataCopy = json_tokener_parse(json_object_get_string(jMetadata));
f656cf
+	pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
f656cf
+	/* the +1 is there to skip the leading '$' */
f656cf
+	msgAddJSON(pMsg, (uchar *) pWrkrData->pData->dstMetadataPath + 1, jMetadataCopy, 0, 0);
f656cf
+
f656cf
+finalize_it:
f656cf
+	json_object_put(jMsgMeta);
f656cf
+	free(mdKey);
f656cf
+ENDdoAction
f656cf
+
f656cf
+
f656cf
+BEGINisCompatibleWithFeature
f656cf
+CODESTARTisCompatibleWithFeature
f656cf
+ENDisCompatibleWithFeature
f656cf
+
f656cf
+
f656cf
+/* all the macros bellow have to be in a specific order */
f656cf
+BEGINmodExit
f656cf
+CODESTARTmodExit
f656cf
+	curl_global_cleanup();
f656cf
+
f656cf
+	objRelease(regexp, LM_REGEXP_FILENAME);
f656cf
+	objRelease(errmsg, CORE_COMPONENT);
f656cf
+ENDmodExit
f656cf
+
f656cf
+
f656cf
+BEGINqueryEtryPt
f656cf
+CODESTARTqueryEtryPt
f656cf
+CODEqueryEtryPt_STD_OMOD_QUERIES
f656cf
+CODEqueryEtryPt_STD_OMOD8_QUERIES
f656cf
+CODEqueryEtryPt_STD_CONF2_QUERIES
f656cf
+CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
f656cf
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
f656cf
+ENDqueryEtryPt
f656cf
+
f656cf
+
f656cf
+BEGINmodInit()
f656cf
+CODESTARTmodInit
f656cf
+	*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
f656cf
+CODEmodInit_QueryRegCFSLineHdlr
f656cf
+	DBGPRINTF("mmkubernetes: module compiled with rsyslog version %s.\n", VERSION);
f656cf
+	CHKiRet(objUse(errmsg, CORE_COMPONENT));
f656cf
+	CHKiRet(objUse(regexp, LM_REGEXP_FILENAME));
f656cf
+
f656cf
+	/* CURL_GLOBAL_ALL initializes more than is needed but the
f656cf
+	 * libcurl documentation discourages use of other values
f656cf
+	 */
f656cf
+	curl_global_init(CURL_GLOBAL_ALL);
f656cf
+ENDmodInit
f656cf
diff --git a/contrib/mmkubernetes/sample.conf b/contrib/mmkubernetes/sample.conf
f656cf
new file mode 100644
f656cf
index 000000000..4c400ed51
f656cf
--- /dev/null
f656cf
+++ b/contrib/mmkubernetes/sample.conf
f656cf
@@ -0,0 +1,7 @@
f656cf
+module(load="mmkubernetes") # see docs for all module and action parameters
f656cf
+
f656cf
+# $!metadata!filename added by imfile using addmetadata="on"
f656cf
+# e.g. input(type="imfile" file="/var/log/containers/*.log" tag="kubernetes" addmetadata="on")
f656cf
+# $!CONTAINER_NAME and $!CONTAINER_ID_FULL added by imjournal
f656cf
+
f656cf
+action(type="mmkubernetes")
f656cf
-- 
f656cf
2.14.4
f656cf