Blame SOURCES/rsyslog-8.24.0-rhbz1539193-mmkubernetes-new-plugin.patch

c17bfd
From: Jiri Vymazal <jvymazal@redhat.com>
c17bfd
Date: Mon, 28 Jun 2018 12:07:55 +0100
c17bfd
Subject: Kubernetes Metadata plugin - mmkubernetes
c17bfd
c17bfd
This plugin is used to annotate records logged by Kubernetes containers.
c17bfd
It will add the namespace uuid, pod uuid, pod and namespace labels and
c17bfd
annotations, and other metadata associated with the pod and namespace.
c17bfd
It will work with either log files in `/var/log/containers/*.log` or
c17bfd
with journald entries with `CONTAINER_NAME` and `CONTAINER_ID_FULL`.
c17bfd
c17bfd
For usage and configuration see syslog-doc
c17bfd
c17bfd
*Credits*
c17bfd
c17bfd
This work is based on https://github.com/fabric8io/fluent-plugin-kubernetes_metadata_filter
c17bfd
and has many of the same features.
c17bfd
c17bfd
(cherry picked from commit a6264bf8f91975c9bc0fc602dcdc6881486f1579)
c17bfd
(cherry picked from commit b8e68366422052dca9e0a9409baa410f20ae88c8)
c17bfd
c17bfd
(cherry picked from commit 77886e21292d8220f93b3404236da0e8f7159255)
c17bfd
(cherry picked from commit e4d1c7b3832eedc8a1545c2ee6bf022f545d0c76)
c17bfd
(cherry picked from commit 3d9f820642b0edc78da0b5bed818590dcd31fa9c)
c17bfd
(cherry picked from commit 1d49aac5cb101704486bfb065fac362ca69f06bc)
c17bfd
(cherry picked from commit fc2ad45f78dd666b8c9e706ad88c17aaff146d2d)
c17bfd
(cherry picked from commit 8cf87f64f6c74a4544112ec7fddc5bf4d43319a7)
c17bfd
---
c17bfd
 Makefile.am                                      |    5 +
c17bfd
 configure.ac                                     |   35 +
c17bfd
 contrib/mmkubernetes/Makefile.am                 |    6 +
c17bfd
 contrib/mmkubernetes/k8s_container_name.rulebase |    3 +
c17bfd
 contrib/mmkubernetes/k8s_filename.rulebase       |    2 +
c17bfd
 contrib/mmkubernetes/mmkubernetes.c              | 1491 +++++++++++++++++++++++
c17bfd
 contrib/mmkubernetes/sample.conf                 |    7 +
c17bfd
 7 files changed, 1549 insertions(+)
c17bfd
 create mode 100644 contrib/mmkubernetes/Makefile.am
c17bfd
 create mode 100644 contrib/mmkubernetes/k8s_container_name.rulebase
c17bfd
 create mode 100644 contrib/mmkubernetes/k8s_filename.rulebase
c17bfd
 create mode 100644 contrib/mmkubernetes/mmkubernetes.c
c17bfd
 create mode 100644 contrib/mmkubernetes/sample.conf
c17bfd
c17bfd
diff --git a/Makefile.am b/Makefile.am
c17bfd
index a276ef9ea..b58ebaf93 100644
c17bfd
--- a/Makefile.am
c17bfd
+++ b/Makefile.am
c17bfd
@@ -275,6 +275,11 @@ if ENABLE_OMTCL
c17bfd
 SUBDIRS += contrib/omtcl
c17bfd
 endif
c17bfd
 
c17bfd
+# mmkubernetes
c17bfd
+if ENABLE_MMKUBERNETES
c17bfd
+SUBDIRS += contrib/mmkubernetes
c17bfd
+endif
c17bfd
+
c17bfd
 # tests are added as last element, because tests may need different
c17bfd
 # modules that need to be generated first
c17bfd
 SUBDIRS += tests
c17bfd
diff --git a/configure.ac b/configure.ac
c17bfd
index a9411f4be..c664222b9 100644
c17bfd
--- a/configure.ac
c17bfd
+++ b/configure.ac
c17bfd
@@ -1889,6 +1889,39 @@ AM_CONDITIONAL(ENABLE_OMTCL, test x$enable_omtcl = xyes)
c17bfd
 
c17bfd
 # END TCL SUPPORT
c17bfd
 
c17bfd
+# mmkubernetes - Kubernetes metadata support
c17bfd
+
c17bfd
+AC_ARG_ENABLE(mmkubernetes,
c17bfd
+        [AS_HELP_STRING([--enable-mmkubernetes],
c17bfd
+            [Enable compilation of the mmkubernetes module @<:@default=no@:>@])],
c17bfd
+        [case "${enableval}" in
c17bfd
+         yes) enable_mmkubernetes="yes" ;;
c17bfd
+          no) enable_mmkubernetes="no" ;;
c17bfd
+           *) AC_MSG_ERROR(bad value ${enableval} for --enable-mmkubernetes) ;;
c17bfd
+         esac],
c17bfd
+        [enable_mmkubernetes=no]
c17bfd
+)
c17bfd
+if test "x$enable_mmkubernetes" = "xyes"; then
c17bfd
+        PKG_CHECK_MODULES([CURL], [libcurl])
c17bfd
+        PKG_CHECK_MODULES(LIBLOGNORM, lognorm >= 2.0.3)
c17bfd
+
c17bfd
+        save_CFLAGS="$CFLAGS"
c17bfd
+        save_LIBS="$LIBS"
c17bfd
+
c17bfd
+        CFLAGS="$CFLAGS $LIBLOGNORM_CFLAGS"
c17bfd
+        LIBS="$LIBS $LIBLOGNORM_LIBS"
c17bfd
+
c17bfd
+        AC_CHECK_FUNC([ln_loadSamplesFromString],
c17bfd
+                      [AC_DEFINE([HAVE_LOADSAMPLESFROMSTRING], [1], [Define if ln_loadSamplesFromString exists.])],
c17bfd
+                      [AC_DEFINE([NO_LOADSAMPLESFROMSTRING], [1], [Define if ln_loadSamplesFromString does not exist.])])
c17bfd
+
c17bfd
+        CFLAGS="$save_CFLAGS"
c17bfd
+        LIBS="$save_LIBS"
c17bfd
+fi
c17bfd
+AM_CONDITIONAL(ENABLE_MMKUBERNETES, test x$enable_mmkubernetes = xyes)
c17bfd
+
c17bfd
+# END Kubernetes metadata support
c17bfd
+
c17bfd
 # man pages
c17bfd
 AC_CHECKING([if required man pages already exist])
c17bfd
 have_to_generate_man_pages="no"
c17bfd
@@ -2016,6 +2035,7 @@ AC_CONFIG_FILES([Makefile \
c17bfd
 		contrib/omhttpfs/Makefile \
c17bfd
 		contrib/omamqp1/Makefile \
c17bfd
 		contrib/omtcl/Makefile \
c17bfd
+		contrib/mmkubernetes/Makefile \
c17bfd
 		tests/Makefile])
c17bfd
 AC_OUTPUT
c17bfd
 
c17bfd
@@ -2090,6 +2110,7 @@ echo "    mmrfc5424addhmac enabled:                 $enable_mmrfc5424addhmac"
c17bfd
 echo "    mmpstrucdata enabled:                     $enable_mmpstrucdata"
c17bfd
 echo "    mmsequence enabled:                       $enable_mmsequence"
c17bfd
 echo "    mmdblookup enabled:                       $enable_mmdblookup"
c17bfd
+echo "    mmkubernetes enabled:                     $enable_mmkubernetes"
c17bfd
 echo
c17bfd
 echo "---{ database support }---"
c17bfd
 echo "    MySql support enabled:                    $enable_mysql"
c17bfd
diff --git a/contrib/mmkubernetes/Makefile.am b/contrib/mmkubernetes/Makefile.am
c17bfd
new file mode 100644
c17bfd
index 000000000..3dcc235a6
c17bfd
--- /dev/null
c17bfd
+++ b/contrib/mmkubernetes/Makefile.am
c17bfd
@@ -0,0 +1,6 @@
c17bfd
+pkglib_LTLIBRARIES = mmkubernetes.la
c17bfd
+
c17bfd
+mmkubernetes_la_SOURCES = mmkubernetes.c
c17bfd
+mmkubernetes_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CURL_CFLAGS) $(LIBLOGNORM_CFLAGS)
c17bfd
+mmkubernetes_la_LDFLAGS = -module -avoid-version
c17bfd
+mmkubernetes_la_LIBADD = $(CURL_LIBS) $(LIBLOGNORM_LIBS)
c17bfd
diff --git a/contrib/mmkubernetes/k8s_container_name.rulebase b/contrib/mmkubernetes/k8s_container_name.rulebase
c17bfd
new file mode 100644
c17bfd
index 000000000..35fbb317c
c17bfd
--- /dev/null
c17bfd
+++ b/contrib/mmkubernetes/k8s_container_name.rulebase
c17bfd
@@ -0,0 +1,3 @@
c17bfd
+version=2
c17bfd
+rule=:%k8s_prefix:char-to:_%_%container_name:char-to:.%.%container_hash:char-to:_%_%pod_name:char-to:_%_%namespace_name:char-to:_%_%not_used_1:char-to:_%_%not_used_2:rest%
c17bfd
+rule=:%k8s_prefix:char-to:_%_%container_name:char-to:_%_%pod_name:char-to:_%_%namespace_name:char-to:_%_%not_used_1:char-to:_%_%not_used_2:rest%
c17bfd
diff --git a/contrib/mmkubernetes/k8s_filename.rulebase b/contrib/mmkubernetes/k8s_filename.rulebase
c17bfd
new file mode 100644
c17bfd
index 000000000..24c0d9138
c17bfd
--- /dev/null
c17bfd
+++ b/contrib/mmkubernetes/k8s_filename.rulebase
c17bfd
@@ -0,0 +1,2 @@
c17bfd
+version=2
c17bfd
+rule=:/var/log/containers/%pod_name:char-to:_%_%namespace_name:char-to:_%_%container_name_and_id:char-to:.%.log
c17bfd
diff --git a/contrib/mmkubernetes/mmkubernetes.c b/contrib/mmkubernetes/mmkubernetes.c
c17bfd
new file mode 100644
c17bfd
index 000000000..5012c54f6
c17bfd
--- /dev/null
c17bfd
+++ b/contrib/mmkubernetes/mmkubernetes.c
c17bfd
@@ -0,0 +1,1491 @@
c17bfd
+/* mmkubernetes.c
c17bfd
+ * This is a message modification module. It uses metadata obtained
c17bfd
+ * from the message to query Kubernetes and obtain additional metadata
c17bfd
+ * relating to the container instance.
c17bfd
+ *
c17bfd
+ * Inspired by:
c17bfd
+ * https://github.com/fabric8io/fluent-plugin-kubernetes_metadata_filter
c17bfd
+ *
c17bfd
+ * NOTE: read comments in module-template.h for details on the calling interface!
c17bfd
+ *
c17bfd
+ * Copyright 2016 Red Hat Inc.
c17bfd
+ *
c17bfd
+ * This file is part of rsyslog.
c17bfd
+ *
c17bfd
+ * Licensed under the Apache License, Version 2.0 (the "License");
c17bfd
+ * you may not use this file except in compliance with the License.
c17bfd
+ * You may obtain a copy of the License at
c17bfd
+ *
c17bfd
+ *       http://www.apache.org/licenses/LICENSE-2.0
c17bfd
+ *       -or-
c17bfd
+ *       see COPYING.ASL20 in the source distribution
c17bfd
+ *
c17bfd
+ * Unless required by applicable law or agreed to in writing, software
c17bfd
+ * distributed under the License is distributed on an "AS IS" BASIS,
c17bfd
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
c17bfd
+ * See the License for the specific language governing permissions and
c17bfd
+ * limitations under the License.
c17bfd
+ */
c17bfd
+
c17bfd
+/* needed for asprintf */
c17bfd
+#ifndef _GNU_SOURCE
c17bfd
+#  define _GNU_SOURCE
c17bfd
+#endif
c17bfd
+
c17bfd
+#include "config.h"
c17bfd
+#include "rsyslog.h"
c17bfd
+#include <stdio.h>
c17bfd
+#include <stdarg.h>
c17bfd
+#include <stdlib.h>
c17bfd
+#include <string.h>
c17bfd
+#include <assert.h>
c17bfd
+#include <errno.h>
c17bfd
+#include <unistd.h>
c17bfd
+#include <sys/stat.h>
c17bfd
+#include <libestr.h>
c17bfd
+#include <liblognorm.h>
c17bfd
+#include <json.h>
c17bfd
+#include <curl/curl.h>
c17bfd
+#include <curl/easy.h>
c17bfd
+#include <pthread.h>
c17bfd
+#include "conf.h"
c17bfd
+#include "syslogd-types.h"
c17bfd
+#include "module-template.h"
c17bfd
+#include "errmsg.h"
c17bfd
+#include "regexp.h"
c17bfd
+#include "hashtable.h"
c17bfd
+#include "srUtils.h"
c17bfd
+
c17bfd
+/* static data */
c17bfd
+MODULE_TYPE_OUTPUT /* this is technically an output plugin */
c17bfd
+MODULE_TYPE_KEEP /* releasing the module would cause a leak through libcurl */
c17bfd
+MODULE_CNFNAME("mmkubernetes")
c17bfd
+DEF_OMOD_STATIC_DATA
c17bfd
+DEFobjCurrIf(errmsg)
c17bfd
+DEFobjCurrIf(regexp)
c17bfd
+
c17bfd
+#define HAVE_LOADSAMPLESFROMSTRING 1
c17bfd
+#if defined(NO_LOADSAMPLESFROMSTRING)
c17bfd
+#undef HAVE_LOADSAMPLESFROMSTRING
c17bfd
+#endif
c17bfd
+/* original from fluentd plugin:
c17bfd
+ * 'var\.log\.containers\.(?<pod_name>[a-z0-9]([-a-z0-9]*[a-z0-9])?\
c17bfd
+ *   (\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace>[^_]+)_\
c17bfd
+ *   (?<container_name>.+)-(?<docker_id>[a-z0-9]{64})\.log$'
c17bfd
+ * this is for _tag_ match, not actual filename match - in_tail turns filename
c17bfd
+ * into a fluentd tag
c17bfd
+ */
c17bfd
+#define DFLT_FILENAME_LNRULES "rule=:/var/log/containers/%pod_name:char-to:_%_"\
c17bfd
+	"%namespace_name:char-to:_%_%container_name:char-to:-%-%container_id:char-to:.%.log"
c17bfd
+#define DFLT_FILENAME_RULEBASE "/etc/rsyslog.d/k8s_filename.rulebase"
c17bfd
+/* original from fluentd plugin:
c17bfd
+ *   '^(?<name_prefix>[^_]+)_(?<container_name>[^\._]+)\
c17bfd
+ *     (\.(?<container_hash>[^_]+))?_(?<pod_name>[^_]+)_\
c17bfd
+ *     (?<namespace>[^_]+)_[^_]+_[^_]+$'
c17bfd
+ */
c17bfd
+#define DFLT_CONTAINER_LNRULES "rule=:%k8s_prefix:char-to:_%_%container_name:char-to:.%."\
c17bfd
+	"%container_hash:char-to:_%_"\
c17bfd
+	"%pod_name:char-to:_%_%namespace_name:char-to:_%_%not_used_1:char-to:_%_%not_used_2:rest%\n"\
c17bfd
+	"rule=:%k8s_prefix:char-to:_%_%container_name:char-to:_%_"\
c17bfd
+	"%pod_name:char-to:_%_%namespace_name:char-to:_%_%not_used_1:char-to:_%_%not_used_2:rest%"
c17bfd
+#define DFLT_CONTAINER_RULEBASE "/etc/rsyslog.d/k8s_container_name.rulebase"
c17bfd
+#define DFLT_SRCMD_PATH "$!metadata!filename"
c17bfd
+#define DFLT_DSTMD_PATH "$!"
c17bfd
+#define DFLT_DE_DOT 1 /* true */
c17bfd
+#define DFLT_DE_DOT_SEPARATOR "_"
c17bfd
+#define DFLT_CONTAINER_NAME "$!CONTAINER_NAME" /* name of variable holding CONTAINER_NAME value */
c17bfd
+#define DFLT_CONTAINER_ID_FULL "$!CONTAINER_ID_FULL" /* name of variable holding CONTAINER_ID_FULL value */
c17bfd
+#define DFLT_KUBERNETES_URL "https://kubernetes.default.svc.cluster.local:443"
c17bfd
+
c17bfd
+static struct cache_s {
c17bfd
+	const uchar *kbUrl;
c17bfd
+	struct hashtable *mdHt;
c17bfd
+	struct hashtable *nsHt;
c17bfd
+	pthread_mutex_t *cacheMtx;
c17bfd
+} **caches;
c17bfd
+
c17bfd
+typedef struct {
c17bfd
+	int nmemb;
c17bfd
+	uchar **patterns;
c17bfd
+	regex_t *regexps;
c17bfd
+} annotation_match_t;
c17bfd
+
c17bfd
+/* module configuration data */
c17bfd
+struct modConfData_s {
c17bfd
+	rsconf_t *pConf;	/* our overall config object */
c17bfd
+	uchar *kubernetesUrl;	/* scheme, host, port, and optional path prefix for Kubernetes API lookups */
c17bfd
+	uchar *srcMetadataPath;	/* where to get data for kubernetes queries */
c17bfd
+	uchar *dstMetadataPath;	/* where to put metadata obtained from kubernetes */
c17bfd
+	uchar *caCertFile; /* File holding the CA cert (+optional chain) of CA that issued the Kubernetes server cert */
c17bfd
+	sbool allowUnsignedCerts; /* For testing/debugging - do not check for CA certs (CURLOPT_SSL_VERIFYPEER FALSE) */
c17bfd
+	uchar *token; /* The token value to use to authenticate to Kubernetes - takes precedence over tokenFile */
c17bfd
+	uchar *tokenFile; /* The file whose contents is the token value to use to authenticate to Kubernetes */
c17bfd
+	sbool de_dot; /* If true (default), convert '.' characters in labels & annotations to de_dot_separator */
c17bfd
+	uchar *de_dot_separator; /* separator character (default '_') to use for de_dotting */
c17bfd
+	size_t de_dot_separator_len; /* length of separator character */
c17bfd
+	annotation_match_t annotation_match; /* annotation keys must match these to be included in record */
c17bfd
+	char *fnRules; /* lognorm rules for container log filename match */
c17bfd
+	uchar *fnRulebase; /* lognorm rulebase filename for container log filename match */
c17bfd
+	char *contRules; /* lognorm rules for CONTAINER_NAME value match */
c17bfd
+	uchar *contRulebase; /* lognorm rulebase filename for CONTAINER_NAME value match */
c17bfd
+};
c17bfd
+
c17bfd
+/* action (instance) configuration data */
c17bfd
+typedef struct _instanceData {
c17bfd
+	uchar *kubernetesUrl;	/* scheme, host, port, and optional path prefix for Kubernetes API lookups */
c17bfd
+	msgPropDescr_t *srcMetadataDescr;	/* where to get data for kubernetes queries */
c17bfd
+	uchar *dstMetadataPath;	/* where to put metadata obtained from kubernetes */
c17bfd
+	uchar *caCertFile; /* File holding the CA cert (+optional chain) of CA that issued the Kubernetes server cert */
c17bfd
+	sbool allowUnsignedCerts; /* For testing/debugging - do not check for CA certs (CURLOPT_SSL_VERIFYPEER FALSE) */
c17bfd
+	uchar *token; /* The token value to use to authenticate to Kubernetes - takes precedence over tokenFile */
c17bfd
+	uchar *tokenFile; /* The file whose contents is the token value to use to authenticate to Kubernetes */
c17bfd
+	sbool de_dot; /* If true (default), convert '.' characters in labels & annotations to de_dot_separator */
c17bfd
+	uchar *de_dot_separator; /* separator character (default '_') to use for de_dotting */
c17bfd
+	size_t de_dot_separator_len; /* length of separator character */
c17bfd
+	annotation_match_t annotation_match; /* annotation keys must match these to be included in record */
c17bfd
+	char *fnRules; /* lognorm rules for container log filename match */
c17bfd
+	uchar *fnRulebase; /* lognorm rulebase filename for container log filename match */
c17bfd
+	ln_ctx fnCtxln;	/**< context to be used for liblognorm */
c17bfd
+	char *contRules; /* lognorm rules for CONTAINER_NAME value match */
c17bfd
+	uchar *contRulebase; /* lognorm rulebase filename for CONTAINER_NAME value match */
c17bfd
+	ln_ctx contCtxln;	/**< context to be used for liblognorm */
c17bfd
+	msgPropDescr_t *contNameDescr; /* CONTAINER_NAME field */
c17bfd
+	msgPropDescr_t *contIdFullDescr; /* CONTAINER_ID_FULL field */
c17bfd
+	struct cache_s *cache;
c17bfd
+} instanceData;
c17bfd
+
c17bfd
+typedef struct wrkrInstanceData {
c17bfd
+	instanceData *pData;
c17bfd
+	CURL *curlCtx;
c17bfd
+	struct curl_slist *curlHdr;
c17bfd
+	char *curlRply;
c17bfd
+	size_t curlRplyLen;
c17bfd
+} wrkrInstanceData_t;
c17bfd
+
c17bfd
+/* module parameters (v6 config format) */
c17bfd
+static struct cnfparamdescr modpdescr[] = {
c17bfd
+	{ "kubernetesurl", eCmdHdlrString, 0 },
c17bfd
+	{ "srcmetadatapath", eCmdHdlrString, 0 },
c17bfd
+	{ "dstmetadatapath", eCmdHdlrString, 0 },
c17bfd
+	{ "tls.cacert", eCmdHdlrString, 0 },
c17bfd
+	{ "allowunsignedcerts", eCmdHdlrBinary, 0 },
c17bfd
+	{ "token", eCmdHdlrString, 0 },
c17bfd
+	{ "tokenfile", eCmdHdlrString, 0 },
c17bfd
+	{ "annotation_match", eCmdHdlrArray, 0 },
c17bfd
+	{ "de_dot", eCmdHdlrBinary, 0 },
c17bfd
+	{ "de_dot_separator", eCmdHdlrString, 0 },
c17bfd
+	{ "filenamerulebase", eCmdHdlrString, 0 },
c17bfd
+	{ "containerrulebase", eCmdHdlrString, 0 }
c17bfd
+#if HAVE_LOADSAMPLESFROMSTRING == 1
c17bfd
+	,
c17bfd
+	{ "filenamerules", eCmdHdlrArray, 0 },
c17bfd
+	{ "containerrules", eCmdHdlrArray, 0 }
c17bfd
+#endif
c17bfd
+};
c17bfd
+static struct cnfparamblk modpblk = {
c17bfd
+	CNFPARAMBLK_VERSION,
c17bfd
+	sizeof(modpdescr)/sizeof(struct cnfparamdescr),
c17bfd
+	modpdescr
c17bfd
+};
c17bfd
+
c17bfd
+/* action (instance) parameters (v6 config format) */
c17bfd
+static struct cnfparamdescr actpdescr[] = {
c17bfd
+	{ "kubernetesurl", eCmdHdlrString, 0 },
c17bfd
+	{ "srcmetadatapath", eCmdHdlrString, 0 },
c17bfd
+	{ "dstmetadatapath", eCmdHdlrString, 0 },
c17bfd
+	{ "tls.cacert", eCmdHdlrString, 0 },
c17bfd
+	{ "allowunsignedcerts", eCmdHdlrBinary, 0 },
c17bfd
+	{ "token", eCmdHdlrString, 0 },
c17bfd
+	{ "tokenfile", eCmdHdlrString, 0 },
c17bfd
+	{ "annotation_match", eCmdHdlrArray, 0 },
c17bfd
+	{ "de_dot", eCmdHdlrBinary, 0 },
c17bfd
+	{ "de_dot_separator", eCmdHdlrString, 0 },
c17bfd
+	{ "filenamerulebase", eCmdHdlrString, 0 },
c17bfd
+	{ "containerrulebase", eCmdHdlrString, 0 }
c17bfd
+#if HAVE_LOADSAMPLESFROMSTRING == 1
c17bfd
+	,
c17bfd
+	{ "filenamerules", eCmdHdlrArray, 0 },
c17bfd
+	{ "containerrules", eCmdHdlrArray, 0 }
c17bfd
+#endif
c17bfd
+};
c17bfd
+static struct cnfparamblk actpblk =
c17bfd
+	{ CNFPARAMBLK_VERSION,
c17bfd
+	  sizeof(actpdescr)/sizeof(struct cnfparamdescr),
c17bfd
+	  actpdescr
c17bfd
+	};
c17bfd
+
c17bfd
+static modConfData_t *loadModConf = NULL;	/* modConf ptr to use for the current load process */
c17bfd
+static modConfData_t *runModConf = NULL;	/* modConf ptr to use for the current exec process */
c17bfd
+
c17bfd
+static void free_annotationmatch(annotation_match_t *match) {
c17bfd
+	if (match) {
c17bfd
+		for(int ii = 0 ; ii < match->nmemb; ++ii) {
c17bfd
+			if (match->patterns)
c17bfd
+				free(match->patterns[ii]);
c17bfd
+			if (match->regexps)
c17bfd
+				regexp.regfree(&match->regexps[ii]);
c17bfd
+		}
c17bfd
+		free(match->patterns);
c17bfd
+		match->patterns = NULL;
c17bfd
+		free(match->regexps);
c17bfd
+		match->regexps = NULL;
c17bfd
+		match->nmemb = 0;
c17bfd
+	}
c17bfd
+}
c17bfd
+
c17bfd
+static int init_annotationmatch(annotation_match_t *match, struct cnfarray *ar) {
c17bfd
+	DEFiRet;
c17bfd
+
c17bfd
+	match->nmemb = ar->nmemb;
c17bfd
+	CHKmalloc(match->patterns = calloc(sizeof(uchar*), match->nmemb));
c17bfd
+	CHKmalloc(match->regexps = calloc(sizeof(regex_t), match->nmemb));
c17bfd
+	for(int jj = 0; jj < ar->nmemb; ++jj) {
c17bfd
+		int rexret = 0;
c17bfd
+		match->patterns[jj] = (uchar*)es_str2cstr(ar->arr[jj], NULL);
c17bfd
+		rexret = regexp.regcomp(&match->regexps[jj],
c17bfd
+				(char *)match->patterns[jj], REG_EXTENDED|REG_NOSUB);
c17bfd
+		if (0 != rexret) {
c17bfd
+			char errMsg[512];
c17bfd
+			regexp.regerror(rexret, &match->regexps[jj], errMsg, sizeof(errMsg));
c17bfd
+			iRet = RS_RET_CONFIG_ERROR;
c17bfd
+			errmsg.LogError(0, iRet,
c17bfd
+					"error: could not compile annotation_match string [%s]"
c17bfd
+					" into an extended regexp - %d: %s\n",
c17bfd
+					match->patterns[jj], rexret, errMsg);
c17bfd
+			break;
c17bfd
+		}
c17bfd
+	}
c17bfd
+finalize_it:
c17bfd
+	if (iRet)
c17bfd
+		free_annotationmatch(match);
c17bfd
+	RETiRet;
c17bfd
+}
c17bfd
+
c17bfd
+static int copy_annotationmatch(annotation_match_t *src, annotation_match_t *dest) {
c17bfd
+	DEFiRet;
c17bfd
+
c17bfd
+	dest->nmemb = src->nmemb;
c17bfd
+	CHKmalloc(dest->patterns = malloc(sizeof(uchar*) * dest->nmemb));
c17bfd
+	CHKmalloc(dest->regexps = calloc(sizeof(regex_t), dest->nmemb));
c17bfd
+	for(int jj = 0 ; jj < src->nmemb ; ++jj) {
c17bfd
+		CHKmalloc(dest->patterns[jj] = (uchar*)strdup((char *)src->patterns[jj]));
c17bfd
+		/* assumes was already successfully compiled */
c17bfd
+		regexp.regcomp(&dest->regexps[jj], (char *)dest->patterns[jj], REG_EXTENDED|REG_NOSUB);
c17bfd
+	}
c17bfd
+finalize_it:
c17bfd
+    if (iRet)
c17bfd
+    	free_annotationmatch(dest);
c17bfd
+	RETiRet;
c17bfd
+}
c17bfd
+
c17bfd
+/* takes a hash of annotations and returns another json object hash containing only the
c17bfd
+ * keys that match - this logic is taken directly from fluent-plugin-kubernetes_metadata_filter
c17bfd
+ * except that we do not add the key multiple times to the object to be returned
c17bfd
+ */
c17bfd
+static struct json_object *match_annotations(annotation_match_t *match,
c17bfd
+		struct json_object *annotations) {
c17bfd
+	struct json_object *ret = NULL;
c17bfd
+
c17bfd
+	for (int jj = 0; jj < match->nmemb; ++jj) {
c17bfd
+		struct json_object_iterator it = json_object_iter_begin(annotations);
c17bfd
+		struct json_object_iterator itEnd = json_object_iter_end(annotations);
c17bfd
+		for (;!json_object_iter_equal(&it, &itEnd); json_object_iter_next(&it)) {
c17bfd
+			const char *const key = json_object_iter_peek_name(&it);
c17bfd
+			if (!ret || !fjson_object_object_get_ex(ret, key, NULL)) {
c17bfd
+				if (!regexp.regexec(&match->regexps[jj], key, 0, NULL, 0)) {
c17bfd
+					if (!ret) {
c17bfd
+						ret = json_object_new_object();
c17bfd
+					}
c17bfd
+					json_object_object_add(ret, key,
c17bfd
+						json_object_get(json_object_iter_peek_value(&it)));
c17bfd
+				}
c17bfd
+			}
c17bfd
+		}
c17bfd
+	}
c17bfd
+	return ret;
c17bfd
+}
c17bfd
+
c17bfd
+/* This will take a hash of labels or annotations and will de_dot the keys.
c17bfd
+ * It will return a brand new hash.  AFAICT, there is no safe way to
c17bfd
+ * iterate over the hash while modifying it in place.
c17bfd
+ */
c17bfd
+static struct json_object *de_dot_json_object(struct json_object *jobj,
c17bfd
+		const char *delim, size_t delim_len) {
c17bfd
+	struct json_object *ret = NULL;
c17bfd
+	struct json_object_iterator it = json_object_iter_begin(jobj);
c17bfd
+	struct json_object_iterator itEnd = json_object_iter_end(jobj);
c17bfd
+	es_str_t *new_es_key = NULL;
c17bfd
+	DEFiRet;
c17bfd
+
c17bfd
+	ret = json_object_new_object();
c17bfd
+	while (!json_object_iter_equal(&it, &itEnd)) {
c17bfd
+		const char *const key = json_object_iter_peek_name(&it);
c17bfd
+		const char *cc = strstr(key, ".");
c17bfd
+		if (NULL == cc) {
c17bfd
+			json_object_object_add(ret, key,
c17bfd
+					json_object_get(json_object_iter_peek_value(&it)));
c17bfd
+		} else {
c17bfd
+			char *new_key = NULL;
c17bfd
+			const char *prevcc = key;
c17bfd
+			new_es_key = es_newStrFromCStr(key, (es_size_t)(cc-prevcc));
c17bfd
+			while (cc) {
c17bfd
+				if (es_addBuf(&new_es_key, (char *)delim, (es_size_t)delim_len))
c17bfd
+					ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
c17bfd
+				cc += 1; /* one past . */
c17bfd
+				prevcc = cc; /* beginning of next substring */
c17bfd
+				if ((cc = strstr(prevcc, ".")) || (cc = strchr(prevcc, '\0'))) {
c17bfd
+					if (es_addBuf(&new_es_key, (char *)prevcc, (es_size_t)(cc-prevcc)))
c17bfd
+						ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
c17bfd
+					if (!*cc)
c17bfd
+						cc = NULL; /* EOS - done */
c17bfd
+				}
c17bfd
+			}
c17bfd
+			new_key = es_str2cstr(new_es_key, NULL);
c17bfd
+			es_deleteStr(new_es_key);
c17bfd
+			new_es_key = NULL;
c17bfd
+			json_object_object_add(ret, new_key,
c17bfd
+					json_object_get(json_object_iter_peek_value(&it)));
c17bfd
+			free(new_key);
c17bfd
+		}
c17bfd
+		json_object_iter_next(&it);
c17bfd
+	}
c17bfd
+finalize_it:
c17bfd
+	if (iRet != RS_RET_OK) {
c17bfd
+		json_object_put(ret);
c17bfd
+		ret = NULL;
c17bfd
+	}
c17bfd
+	if (new_es_key)
c17bfd
+		es_deleteStr(new_es_key);
c17bfd
+	return ret;
c17bfd
+}
c17bfd
+
c17bfd
+/* given a "metadata" object field, do
c17bfd
+ * - make sure "annotations" field has only the matching keys
c17bfd
+ * - de_dot the "labels" and "annotations" fields keys
c17bfd
+ * This modifies the jMetadata object in place
c17bfd
+ */
c17bfd
+static void parse_labels_annotations(struct json_object *jMetadata,
c17bfd
+		annotation_match_t *match, sbool de_dot,
c17bfd
+		const char *delim, size_t delim_len) {
c17bfd
+	struct json_object *jo = NULL;
c17bfd
+
c17bfd
+	if (fjson_object_object_get_ex(jMetadata, "annotations", &jo)) {
c17bfd
+		if ((jo = match_annotations(match, jo)))
c17bfd
+			json_object_object_add(jMetadata, "annotations", jo);
c17bfd
+		else
c17bfd
+			json_object_object_del(jMetadata, "annotations");
c17bfd
+	}
c17bfd
+	/* dedot labels and annotations */
c17bfd
+	if (de_dot) {
c17bfd
+		struct json_object *jo2 = NULL;
c17bfd
+		if (fjson_object_object_get_ex(jMetadata, "annotations", &jo)) {
c17bfd
+			if ((jo2 = de_dot_json_object(jo, delim, delim_len))) {
c17bfd
+				json_object_object_add(jMetadata, "annotations", jo2);
c17bfd
+			}
c17bfd
+		}
c17bfd
+		if (fjson_object_object_get_ex(jMetadata, "labels", &jo)) {
c17bfd
+			if ((jo2 = de_dot_json_object(jo, delim, delim_len))) {
c17bfd
+				json_object_object_add(jMetadata, "labels", jo2);
c17bfd
+			}
c17bfd
+		}
c17bfd
+	}
c17bfd
+}
c17bfd
+
c17bfd
+#if HAVE_LOADSAMPLESFROMSTRING == 1
c17bfd
+static int array_to_rules(struct cnfarray *ar, char **rules) {
c17bfd
+	DEFiRet;
c17bfd
+	es_str_t *tmpstr = NULL;
c17bfd
+	es_size_t size = 0;
c17bfd
+
c17bfd
+	if (rules == NULL)
c17bfd
+		FINALIZE;
c17bfd
+	*rules = NULL;
c17bfd
+	if (!ar->nmemb)
c17bfd
+		FINALIZE;
c17bfd
+	for (int jj = 0; jj < ar->nmemb; jj++)
c17bfd
+		size += es_strlen(ar->arr[jj]);
c17bfd
+	if (!size)
c17bfd
+		FINALIZE;
c17bfd
+	CHKmalloc(tmpstr = es_newStr(size));
c17bfd
+	CHKiRet((es_addStr(&tmpstr, ar->arr[0])));
c17bfd
+	CHKiRet((es_addBufConstcstr(&tmpstr, "\n")));
c17bfd
+	for(int jj=1; jj < ar->nmemb; ++jj) {
c17bfd
+		CHKiRet((es_addStr(&tmpstr, ar->arr[jj])));
c17bfd
+		CHKiRet((es_addBufConstcstr(&tmpstr, "\n")));
c17bfd
+	}
c17bfd
+	CHKiRet((es_addBufConstcstr(&tmpstr, "\0")));
c17bfd
+	CHKmalloc(*rules = es_str2cstr(tmpstr, NULL));
c17bfd
+finalize_it:
c17bfd
+	if (tmpstr) {
c17bfd
+		es_deleteStr(tmpstr);
c17bfd
+	}
c17bfd
+    if (iRet != RS_RET_OK) {
c17bfd
+    	free(*rules);
c17bfd
+    	*rules = NULL;
c17bfd
+    }
c17bfd
+	RETiRet;
c17bfd
+}
c17bfd
+#endif
c17bfd
+
c17bfd
+/* callback for liblognorm error messages */
c17bfd
+static void
c17bfd
+errCallBack(void __attribute__((unused)) *cookie, const char *msg,
c17bfd
+	    size_t __attribute__((unused)) lenMsg)
c17bfd
+{
c17bfd
+	errmsg.LogError(0, RS_RET_ERR_LIBLOGNORM, "liblognorm error: %s", msg);
c17bfd
+}
c17bfd
+
c17bfd
+static rsRetVal
c17bfd
+set_lnctx(ln_ctx *ctxln, char *instRules, uchar *instRulebase, char *modRules, uchar *modRulebase)
c17bfd
+{
c17bfd
+	DEFiRet;
c17bfd
+	if (ctxln == NULL)
c17bfd
+		FINALIZE;
c17bfd
+	CHKmalloc(*ctxln = ln_initCtx());
c17bfd
+	ln_setErrMsgCB(*ctxln, errCallBack, NULL);
c17bfd
+	if(instRules) {
c17bfd
+#if HAVE_LOADSAMPLESFROMSTRING == 1
c17bfd
+		if(ln_loadSamplesFromString(*ctxln, instRules) !=0) {
c17bfd
+			errmsg.LogError(0, RS_RET_NO_RULEBASE, "error: normalization rules '%s' "
c17bfd
+					"could not be loaded", instRules);
c17bfd
+			ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_SAMPDB_LOAD);
c17bfd
+		}
c17bfd
+#else
c17bfd
+		(void)instRules;
c17bfd
+#endif
c17bfd
+	} else if(instRulebase) {
c17bfd
+		if(ln_loadSamples(*ctxln, (char*) instRulebase) != 0) {
c17bfd
+			errmsg.LogError(0, RS_RET_NO_RULEBASE, "error: normalization rulebase '%s' "
c17bfd
+					"could not be loaded", instRulebase);
c17bfd
+			ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_SAMPDB_LOAD);
c17bfd
+		}
c17bfd
+	} else if(modRules) {
c17bfd
+#if HAVE_LOADSAMPLESFROMSTRING == 1
c17bfd
+		if(ln_loadSamplesFromString(*ctxln, modRules) !=0) {
c17bfd
+			errmsg.LogError(0, RS_RET_NO_RULEBASE, "error: normalization rules '%s' "
c17bfd
+					"could not be loaded", modRules);
c17bfd
+			ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_SAMPDB_LOAD);
c17bfd
+		}
c17bfd
+#else
c17bfd
+		(void)modRules;
c17bfd
+#endif
c17bfd
+	} else if(modRulebase) {
c17bfd
+		if(ln_loadSamples(*ctxln, (char*) modRulebase) != 0) {
c17bfd
+			errmsg.LogError(0, RS_RET_NO_RULEBASE, "error: normalization rulebase '%s' "
c17bfd
+					"could not be loaded", modRulebase);
c17bfd
+			ABORT_FINALIZE(RS_RET_ERR_LIBLOGNORM_SAMPDB_LOAD);
c17bfd
+		}
c17bfd
+	}
c17bfd
+finalize_it:
c17bfd
+	if (iRet != RS_RET_OK){
c17bfd
+		ln_exitCtx(*ctxln);
c17bfd
+		*ctxln = NULL;
c17bfd
+	}
c17bfd
+	RETiRet;
c17bfd
+}
c17bfd
+
c17bfd
+BEGINbeginCnfLoad
c17bfd
+CODESTARTbeginCnfLoad
c17bfd
+	loadModConf = pModConf;
c17bfd
+	pModConf->pConf = pConf;
c17bfd
+ENDbeginCnfLoad
c17bfd
+
c17bfd
+
c17bfd
+BEGINsetModCnf
c17bfd
+	struct cnfparamvals *pvals = NULL;
c17bfd
+	int i;
c17bfd
+	FILE *fp;
c17bfd
+	int ret;
c17bfd
+CODESTARTsetModCnf
c17bfd
+	pvals = nvlstGetParams(lst, &modpblk, NULL);
c17bfd
+	if(pvals == NULL) {
c17bfd
+		errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "mmkubernetes: "
c17bfd
+			"error processing module config parameters [module(...)]");
c17bfd
+		ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
c17bfd
+	}
c17bfd
+
c17bfd
+	if(Debug) {
c17bfd
+		dbgprintf("module (global) param blk for mmkubernetes:\n");
c17bfd
+		cnfparamsPrint(&modpblk, pvals);
c17bfd
+	}
c17bfd
+
c17bfd
+	loadModConf->de_dot = DFLT_DE_DOT;
c17bfd
+	for(i = 0 ; i < modpblk.nParams ; ++i) {
c17bfd
+		if(!pvals[i].bUsed) {
c17bfd
+			continue;
c17bfd
+		} else if(!strcmp(modpblk.descr[i].name, "kubernetesurl")) {
c17bfd
+			free(loadModConf->kubernetesUrl);
c17bfd
+			loadModConf->kubernetesUrl = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
c17bfd
+		} else if(!strcmp(modpblk.descr[i].name, "srcmetadatapath")) {
c17bfd
+			free(loadModConf->srcMetadataPath);
c17bfd
+			loadModConf->srcMetadataPath = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
c17bfd
+			/* todo: sanitize the path */
c17bfd
+		} else if(!strcmp(modpblk.descr[i].name, "dstmetadatapath")) {
c17bfd
+			free(loadModConf->dstMetadataPath);
c17bfd
+			loadModConf->dstMetadataPath = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
c17bfd
+			/* todo: sanitize the path */
c17bfd
+		} else if(!strcmp(modpblk.descr[i].name, "tls.cacert")) {
c17bfd
+			free(loadModConf->caCertFile);
c17bfd
+			loadModConf->caCertFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
c17bfd
+			fp = fopen((const char*)loadModConf->caCertFile, "r");
c17bfd
+			if(fp == NULL) {
c17bfd
+				char errStr[1024];
c17bfd
+				rs_strerror_r(errno, errStr, sizeof(errStr));
c17bfd
+				iRet = RS_RET_NO_FILE_ACCESS;
c17bfd
+				errmsg.LogError(0, iRet,
c17bfd
+						"error: certificate file %s couldn't be accessed: %s\n",
c17bfd
+						loadModConf->caCertFile, errStr);
c17bfd
+				ABORT_FINALIZE(iRet);
c17bfd
+			} else {
c17bfd
+				fclose(fp);
c17bfd
+			}
c17bfd
+		} else if(!strcmp(modpblk.descr[i].name, "allowunsignedcerts")) {
c17bfd
+			loadModConf->allowUnsignedCerts = pvals[i].val.d.n;
c17bfd
+		} else if(!strcmp(modpblk.descr[i].name, "token")) {
c17bfd
+			free(loadModConf->token);
c17bfd
+			loadModConf->token = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
c17bfd
+		} else if(!strcmp(modpblk.descr[i].name, "tokenfile")) {
c17bfd
+			free(loadModConf->tokenFile);
c17bfd
+			loadModConf->tokenFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
c17bfd
+			fp = fopen((const char*)loadModConf->tokenFile, "r");
c17bfd
+			if(fp == NULL) {
c17bfd
+				char errStr[1024];
c17bfd
+				rs_strerror_r(errno, errStr, sizeof(errStr));
c17bfd
+				iRet = RS_RET_NO_FILE_ACCESS;
c17bfd
+				errmsg.LogError(0, iRet,
c17bfd
+						"error: token file %s couldn't be accessed: %s\n",
c17bfd
+						loadModConf->tokenFile, errStr);
c17bfd
+				ABORT_FINALIZE(iRet);
c17bfd
+			} else {
c17bfd
+				fclose(fp);
c17bfd
+			}
c17bfd
+		} else if(!strcmp(modpblk.descr[i].name, "annotation_match")) {
c17bfd
+			free_annotationmatch(&loadModConf->annotation_match);
c17bfd
+			if ((ret = init_annotationmatch(&loadModConf->annotation_match, pvals[i].val.d.ar)))
c17bfd
+				ABORT_FINALIZE(ret);
c17bfd
+		} else if(!strcmp(modpblk.descr[i].name, "de_dot")) {
c17bfd
+			loadModConf->de_dot = pvals[i].val.d.n;
c17bfd
+		} else if(!strcmp(modpblk.descr[i].name, "de_dot_separator")) {
c17bfd
+			free(loadModConf->de_dot_separator);
c17bfd
+			loadModConf->de_dot_separator = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
c17bfd
+#if HAVE_LOADSAMPLESFROMSTRING == 1
c17bfd
+		} else if(!strcmp(modpblk.descr[i].name, "filenamerules")) {
c17bfd
+			free(loadModConf->fnRules);
c17bfd
+			CHKiRet((array_to_rules(pvals[i].val.d.ar, &loadModConf->fnRules)));
c17bfd
+#endif
c17bfd
+		} else if(!strcmp(modpblk.descr[i].name, "filenamerulebase")) {
c17bfd
+			free(loadModConf->fnRulebase);
c17bfd
+			loadModConf->fnRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
c17bfd
+			fp = fopen((const char*)loadModConf->fnRulebase, "r");
c17bfd
+			if(fp == NULL) {
c17bfd
+				char errStr[1024];
c17bfd
+				rs_strerror_r(errno, errStr, sizeof(errStr));
c17bfd
+				iRet = RS_RET_NO_FILE_ACCESS;
c17bfd
+				errmsg.LogError(0, iRet,
c17bfd
+						"error: filenamerulebase file %s couldn't be accessed: %s\n",
c17bfd
+						loadModConf->fnRulebase, errStr);
c17bfd
+				ABORT_FINALIZE(iRet);
c17bfd
+			} else {
c17bfd
+				fclose(fp);
c17bfd
+			}
c17bfd
+#if HAVE_LOADSAMPLESFROMSTRING == 1
c17bfd
+		} else if(!strcmp(modpblk.descr[i].name, "containerrules")) {
c17bfd
+			free(loadModConf->contRules);
c17bfd
+			CHKiRet((array_to_rules(pvals[i].val.d.ar, &loadModConf->contRules)));
c17bfd
+#endif
c17bfd
+		} else if(!strcmp(modpblk.descr[i].name, "containerrulebase")) {
c17bfd
+			free(loadModConf->contRulebase);
c17bfd
+			loadModConf->contRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
c17bfd
+			fp = fopen((const char*)loadModConf->contRulebase, "r");
c17bfd
+			if(fp == NULL) {
c17bfd
+				char errStr[1024];
c17bfd
+				rs_strerror_r(errno, errStr, sizeof(errStr));
c17bfd
+				iRet = RS_RET_NO_FILE_ACCESS;
c17bfd
+				errmsg.LogError(0, iRet,
c17bfd
+						"error: containerrulebase file %s couldn't be accessed: %s\n",
c17bfd
+						loadModConf->contRulebase, errStr);
c17bfd
+				ABORT_FINALIZE(iRet);
c17bfd
+			} else {
c17bfd
+				fclose(fp);
c17bfd
+			}
c17bfd
+		} else {
c17bfd
+			dbgprintf("mmkubernetes: program error, non-handled "
c17bfd
+				"param '%s' in module() block\n", modpblk.descr[i].name);
c17bfd
+			/* todo: error message? */
c17bfd
+		}
c17bfd
+	}
c17bfd
+
c17bfd
+#if HAVE_LOADSAMPLESFROMSTRING == 1
c17bfd
+	if (loadModConf->fnRules && loadModConf->fnRulebase) {
c17bfd
+		errmsg.LogError(0, RS_RET_CONFIG_ERROR,
c17bfd
+				"mmkubernetes: only 1 of filenamerules or filenamerulebase may be used");
c17bfd
+		ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
c17bfd
+	}
c17bfd
+	if (loadModConf->contRules && loadModConf->contRulebase) {
c17bfd
+		errmsg.LogError(0, RS_RET_CONFIG_ERROR,
c17bfd
+				"mmkubernetes: only 1 of containerrules or containerrulebase may be used");
c17bfd
+		ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
c17bfd
+	}
c17bfd
+#endif
c17bfd
+
c17bfd
+	/* set defaults */
c17bfd
+	if(loadModConf->srcMetadataPath == NULL)
c17bfd
+		loadModConf->srcMetadataPath = (uchar *) strdup(DFLT_SRCMD_PATH);
c17bfd
+	if(loadModConf->dstMetadataPath == NULL)
c17bfd
+		loadModConf->dstMetadataPath = (uchar *) strdup(DFLT_DSTMD_PATH);
c17bfd
+	if(loadModConf->de_dot_separator == NULL)
c17bfd
+		loadModConf->de_dot_separator = (uchar *) strdup(DFLT_DE_DOT_SEPARATOR);
c17bfd
+	if(loadModConf->de_dot_separator)
c17bfd
+		loadModConf->de_dot_separator_len = strlen((const char *)loadModConf->de_dot_separator);
c17bfd
+#if HAVE_LOADSAMPLESFROMSTRING == 1
c17bfd
+	if (loadModConf->fnRules == NULL && loadModConf->fnRulebase == NULL)
c17bfd
+		loadModConf->fnRules = strdup(DFLT_FILENAME_LNRULES);
c17bfd
+	if (loadModConf->contRules == NULL && loadModConf->contRulebase == NULL)
c17bfd
+		loadModConf->contRules = strdup(DFLT_CONTAINER_LNRULES);
c17bfd
+#else
c17bfd
+	if (loadModConf->fnRulebase == NULL)
c17bfd
+		loadModConf->fnRulebase = (uchar *)strdup(DFLT_FILENAME_RULEBASE);
c17bfd
+	if (loadModConf->contRulebase == NULL)
c17bfd
+		loadModConf->contRulebase = (uchar *)strdup(DFLT_CONTAINER_RULEBASE);
c17bfd
+#endif
c17bfd
+	caches = calloc(1, sizeof(struct cache_s *));
c17bfd
+
c17bfd
+finalize_it:
c17bfd
+	if(pvals != NULL)
c17bfd
+		cnfparamvalsDestruct(pvals, &modpblk);
c17bfd
+ENDsetModCnf
c17bfd
+
c17bfd
+
c17bfd
+BEGINcreateInstance
c17bfd
+CODESTARTcreateInstance
c17bfd
+ENDcreateInstance
c17bfd
+
c17bfd
+
c17bfd
+BEGINfreeInstance
c17bfd
+CODESTARTfreeInstance
c17bfd
+	free(pData->kubernetesUrl);
c17bfd
+	msgPropDescrDestruct(pData->srcMetadataDescr);
c17bfd
+	free(pData->srcMetadataDescr);
c17bfd
+	free(pData->dstMetadataPath);
c17bfd
+	free(pData->caCertFile);
c17bfd
+	free(pData->token);
c17bfd
+	free(pData->tokenFile);
c17bfd
+	free(pData->fnRules);
c17bfd
+	free(pData->fnRulebase);
c17bfd
+	ln_exitCtx(pData->fnCtxln);
c17bfd
+	free(pData->contRules);
c17bfd
+	free(pData->contRulebase);
c17bfd
+	ln_exitCtx(pData->contCtxln);
c17bfd
+	free_annotationmatch(&pData->annotation_match);
c17bfd
+	free(pData->de_dot_separator);
c17bfd
+	msgPropDescrDestruct(pData->contNameDescr);
c17bfd
+	free(pData->contNameDescr);
c17bfd
+	msgPropDescrDestruct(pData->contIdFullDescr);
c17bfd
+	free(pData->contIdFullDescr);
c17bfd
+ENDfreeInstance
c17bfd
+
c17bfd
+static size_t curlCB(char *data, size_t size, size_t nmemb, void *usrptr)
c17bfd
+{
c17bfd
+	DEFiRet;
c17bfd
+	wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t *) usrptr;
c17bfd
+	char * buf;
c17bfd
+	size_t newlen;
c17bfd
+
c17bfd
+	newlen = pWrkrData->curlRplyLen + size * nmemb;
c17bfd
+	CHKmalloc(buf = realloc(pWrkrData->curlRply, newlen));
c17bfd
+	memcpy(buf + pWrkrData->curlRplyLen, data, size * nmemb);
c17bfd
+	pWrkrData->curlRply = buf;
c17bfd
+	pWrkrData->curlRplyLen = newlen;
c17bfd
+
c17bfd
+finalize_it:
c17bfd
+	if (iRet != RS_RET_OK) {
c17bfd
+		return 0;
c17bfd
+	}
c17bfd
+	return size * nmemb;
c17bfd
+}
c17bfd
+
c17bfd
+BEGINcreateWrkrInstance
c17bfd
+CODESTARTcreateWrkrInstance
c17bfd
+	CURL *ctx;
c17bfd
+	struct curl_slist *hdr = NULL;
c17bfd
+	char *tokenHdr = NULL;
c17bfd
+	FILE *fp = NULL;
c17bfd
+	char *token = NULL;
c17bfd
+
c17bfd
+	hdr = curl_slist_append(hdr, "Content-Type: text/json; charset=utf-8");
c17bfd
+	if (pWrkrData->pData->token) {
c17bfd
+		if ((-1 == asprintf(&tokenHdr, "Authorization: Bearer %s", pWrkrData->pData->token)) ||
c17bfd
+			(!tokenHdr)) {
c17bfd
+			ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
c17bfd
+		}
c17bfd
+	} else if (pWrkrData->pData->tokenFile) {
c17bfd
+		struct stat statbuf;
c17bfd
+		fp = fopen((const char*)pWrkrData->pData->tokenFile, "r");
c17bfd
+		if (fp && !fstat(fileno(fp), &statbuf)) {
c17bfd
+			size_t bytesread;
c17bfd
+			CHKmalloc(token = malloc((statbuf.st_size+1)*sizeof(char)));
c17bfd
+			if (0 < (bytesread = fread(token, sizeof(char), statbuf.st_size, fp))) {
c17bfd
+				token[bytesread] = '\0';
c17bfd
+				if ((-1 == asprintf(&tokenHdr, "Authorization: Bearer %s", token)) ||
c17bfd
+					(!tokenHdr)) {
c17bfd
+					ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
c17bfd
+				}
c17bfd
+			}
c17bfd
+			free(token);
c17bfd
+			token = NULL;
c17bfd
+		}
c17bfd
+		if (fp) {
c17bfd
+			fclose(fp);
c17bfd
+			fp = NULL;
c17bfd
+		}
c17bfd
+	}
c17bfd
+	if (tokenHdr) {
c17bfd
+		hdr = curl_slist_append(hdr, tokenHdr);
c17bfd
+		free(tokenHdr);
c17bfd
+	}
c17bfd
+	pWrkrData->curlHdr = hdr;
c17bfd
+	ctx = curl_easy_init();
c17bfd
+	curl_easy_setopt(ctx, CURLOPT_HTTPHEADER, hdr);
c17bfd
+	curl_easy_setopt(ctx, CURLOPT_WRITEFUNCTION, curlCB);
c17bfd
+	curl_easy_setopt(ctx, CURLOPT_WRITEDATA, pWrkrData);
c17bfd
+	if(pWrkrData->pData->caCertFile)
c17bfd
+		curl_easy_setopt(ctx, CURLOPT_CAINFO, pWrkrData->pData->caCertFile);
c17bfd
+	if(pWrkrData->pData->allowUnsignedCerts)
c17bfd
+		curl_easy_setopt(ctx, CURLOPT_SSL_VERIFYPEER, 0);
c17bfd
+
c17bfd
+	pWrkrData->curlCtx = ctx;
c17bfd
+finalize_it:
c17bfd
+	free(token);
c17bfd
+	if (fp) {
c17bfd
+		fclose(fp);
c17bfd
+	}
c17bfd
+ENDcreateWrkrInstance
c17bfd
+
c17bfd
+
c17bfd
+BEGINfreeWrkrInstance
c17bfd
+CODESTARTfreeWrkrInstance
c17bfd
+	curl_easy_cleanup(pWrkrData->curlCtx);
c17bfd
+	curl_slist_free_all(pWrkrData->curlHdr);
c17bfd
+ENDfreeWrkrInstance
c17bfd
+
c17bfd
+
c17bfd
+static struct cache_s *cacheNew(const uchar *url)
c17bfd
+{
c17bfd
+	struct cache_s *cache;
c17bfd
+
c17bfd
+	if (NULL == (cache = calloc(1, sizeof(struct cache_s)))) {
c17bfd
+		FINALIZE;
c17bfd
+	}
c17bfd
+	cache->kbUrl = url;
c17bfd
+	cache->mdHt = create_hashtable(100, hash_from_string,
c17bfd
+		key_equals_string, (void (*)(void *)) json_object_put);
c17bfd
+	cache->nsHt = create_hashtable(100, hash_from_string,
c17bfd
+		key_equals_string, (void (*)(void *)) json_object_put);
c17bfd
+	cache->cacheMtx = malloc(sizeof(pthread_mutex_t));
c17bfd
+	if (!cache->mdHt || !cache->nsHt || !cache->cacheMtx) {
c17bfd
+		free (cache);
c17bfd
+		cache = NULL;
c17bfd
+		FINALIZE;
c17bfd
+	}
c17bfd
+	pthread_mutex_init(cache->cacheMtx, NULL);
c17bfd
+
c17bfd
+finalize_it:
c17bfd
+	return cache;
c17bfd
+}
c17bfd
+
c17bfd
+
c17bfd
+static void cacheFree(struct cache_s *cache)
c17bfd
+{
c17bfd
+	hashtable_destroy(cache->mdHt, 1);
c17bfd
+	hashtable_destroy(cache->nsHt, 1);
c17bfd
+	pthread_mutex_destroy(cache->cacheMtx);
c17bfd
+	free(cache->cacheMtx);
c17bfd
+	free(cache);
c17bfd
+}
c17bfd
+
c17bfd
+
c17bfd
+BEGINnewActInst
c17bfd
+	struct cnfparamvals *pvals = NULL;
c17bfd
+	int i;
c17bfd
+	FILE *fp;
c17bfd
+	char *rxstr = NULL;
c17bfd
+	char *srcMetadataPath = NULL;
c17bfd
+CODESTARTnewActInst
c17bfd
+	DBGPRINTF("newActInst (mmkubernetes)\n");
c17bfd
+
c17bfd
+	pvals = nvlstGetParams(lst, &actpblk, NULL);
c17bfd
+	if(pvals == NULL) {
c17bfd
+		errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "mmkubernetes: "
c17bfd
+			"error processing config parameters [action(...)]");
c17bfd
+		ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
c17bfd
+	}
c17bfd
+
c17bfd
+	if(Debug) {
c17bfd
+		dbgprintf("action param blk in mmkubernetes:\n");
c17bfd
+		cnfparamsPrint(&actpblk, pvals);
c17bfd
+	}
c17bfd
+
c17bfd
+	CODE_STD_STRING_REQUESTnewActInst(1)
c17bfd
+	CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
c17bfd
+	CHKiRet(createInstance(&pData));
c17bfd
+
c17bfd
+	pData->de_dot = loadModConf->de_dot;
c17bfd
+	pData->allowUnsignedCerts = loadModConf->allowUnsignedCerts;
c17bfd
+	for(i = 0 ; i < actpblk.nParams ; ++i) {
c17bfd
+		if(!pvals[i].bUsed) {
c17bfd
+			continue;
c17bfd
+		} else if(!strcmp(actpblk.descr[i].name, "kubernetesurl")) {
c17bfd
+			free(pData->kubernetesUrl);
c17bfd
+			pData->kubernetesUrl = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
c17bfd
+		} else if(!strcmp(actpblk.descr[i].name, "srcmetadatapath")) {
c17bfd
+			msgPropDescrDestruct(pData->srcMetadataDescr);
c17bfd
+			free(pData->srcMetadataDescr);
c17bfd
+			CHKmalloc(pData->srcMetadataDescr = MALLOC(sizeof(msgPropDescr_t)));
c17bfd
+			srcMetadataPath = es_str2cstr(pvals[i].val.d.estr, NULL);
c17bfd
+			CHKiRet(msgPropDescrFill(pData->srcMetadataDescr, (uchar *)srcMetadataPath,
c17bfd
+				strlen(srcMetadataPath)));
c17bfd
+			/* todo: sanitize the path */
c17bfd
+		} else if(!strcmp(actpblk.descr[i].name, "dstmetadatapath")) {
c17bfd
+			free(pData->dstMetadataPath);
c17bfd
+			pData->dstMetadataPath = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
c17bfd
+			/* todo: sanitize the path */
c17bfd
+		} else if(!strcmp(actpblk.descr[i].name, "tls.cacert")) {
c17bfd
+			free(pData->caCertFile);
c17bfd
+			pData->caCertFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
c17bfd
+			fp = fopen((const char*)pData->caCertFile, "r");
c17bfd
+			if(fp == NULL) {
c17bfd
+				char errStr[1024];
c17bfd
+				rs_strerror_r(errno, errStr, sizeof(errStr));
c17bfd
+				iRet = RS_RET_NO_FILE_ACCESS;
c17bfd
+				errmsg.LogError(0, iRet,
c17bfd
+						"error: certificate file %s couldn't be accessed: %s\n",
c17bfd
+						pData->caCertFile, errStr);
c17bfd
+				ABORT_FINALIZE(iRet);
c17bfd
+			} else {
c17bfd
+				fclose(fp);
c17bfd
+			}
c17bfd
+		} else if(!strcmp(actpblk.descr[i].name, "allowunsignedcerts")) {
c17bfd
+			pData->allowUnsignedCerts = pvals[i].val.d.n;
c17bfd
+		} else if(!strcmp(actpblk.descr[i].name, "token")) {
c17bfd
+			free(pData->token);
c17bfd
+			pData->token = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
c17bfd
+		} else if(!strcmp(actpblk.descr[i].name, "tokenfile")) {
c17bfd
+			free(pData->tokenFile);
c17bfd
+			pData->tokenFile = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
c17bfd
+			fp = fopen((const char*)pData->tokenFile, "r");
c17bfd
+			if(fp == NULL) {
c17bfd
+				char errStr[1024];
c17bfd
+				rs_strerror_r(errno, errStr, sizeof(errStr));
c17bfd
+				iRet = RS_RET_NO_FILE_ACCESS;
c17bfd
+				errmsg.LogError(0, iRet,
c17bfd
+						"error: token file %s couldn't be accessed: %s\n",
c17bfd
+						pData->tokenFile, errStr);
c17bfd
+				ABORT_FINALIZE(iRet);
c17bfd
+			} else {
c17bfd
+				fclose(fp);
c17bfd
+			}
c17bfd
+		} else if(!strcmp(actpblk.descr[i].name, "annotation_match")) {
c17bfd
+			free_annotationmatch(&pData->annotation_match);
c17bfd
+			if (RS_RET_OK != (iRet = init_annotationmatch(&pData->annotation_match, pvals[i].val.d.ar)))
c17bfd
+				ABORT_FINALIZE(iRet);
c17bfd
+		} else if(!strcmp(actpblk.descr[i].name, "de_dot")) {
c17bfd
+			pData->de_dot = pvals[i].val.d.n;
c17bfd
+		} else if(!strcmp(actpblk.descr[i].name, "de_dot_separator")) {
c17bfd
+			free(pData->de_dot_separator);
c17bfd
+			pData->de_dot_separator = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
c17bfd
+#if HAVE_LOADSAMPLESFROMSTRING == 1
c17bfd
+		} else if(!strcmp(modpblk.descr[i].name, "filenamerules")) {
c17bfd
+			free(pData->fnRules);
c17bfd
+			CHKiRet((array_to_rules(pvals[i].val.d.ar, &pData->fnRules)));
c17bfd
+#endif
c17bfd
+		} else if(!strcmp(modpblk.descr[i].name, "filenamerulebase")) {
c17bfd
+			free(pData->fnRulebase);
c17bfd
+			pData->fnRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
c17bfd
+			fp = fopen((const char*)pData->fnRulebase, "r");
c17bfd
+			if(fp == NULL) {
c17bfd
+				char errStr[1024];
c17bfd
+				rs_strerror_r(errno, errStr, sizeof(errStr));
c17bfd
+				iRet = RS_RET_NO_FILE_ACCESS;
c17bfd
+				errmsg.LogError(0, iRet,
c17bfd
+						"error: filenamerulebase file %s couldn't be accessed: %s\n",
c17bfd
+						pData->fnRulebase, errStr);
c17bfd
+				ABORT_FINALIZE(iRet);
c17bfd
+			} else {
c17bfd
+				fclose(fp);
c17bfd
+			}
c17bfd
+#if HAVE_LOADSAMPLESFROMSTRING == 1
c17bfd
+		} else if(!strcmp(modpblk.descr[i].name, "containerrules")) {
c17bfd
+			free(pData->contRules);
c17bfd
+			CHKiRet((array_to_rules(pvals[i].val.d.ar, &pData->contRules)));
c17bfd
+#endif
c17bfd
+		} else if(!strcmp(modpblk.descr[i].name, "containerrulebase")) {
c17bfd
+			free(pData->contRulebase);
c17bfd
+			pData->contRulebase = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
c17bfd
+			fp = fopen((const char*)pData->contRulebase, "r");
c17bfd
+			if(fp == NULL) {
c17bfd
+				char errStr[1024];
c17bfd
+				rs_strerror_r(errno, errStr, sizeof(errStr));
c17bfd
+				iRet = RS_RET_NO_FILE_ACCESS;
c17bfd
+				errmsg.LogError(0, iRet,
c17bfd
+						"error: containerrulebase file %s couldn't be accessed: %s\n",
c17bfd
+						pData->contRulebase, errStr);
c17bfd
+				ABORT_FINALIZE(iRet);
c17bfd
+			} else {
c17bfd
+				fclose(fp);
c17bfd
+			}
c17bfd
+		} else {
c17bfd
+			dbgprintf("mmkubernetes: program error, non-handled "
c17bfd
+				"param '%s' in action() block\n", actpblk.descr[i].name);
c17bfd
+			/* todo: error message? */
c17bfd
+		}
c17bfd
+	}
c17bfd
+
c17bfd
+#if HAVE_LOADSAMPLESFROMSTRING == 1
c17bfd
+	if (pData->fnRules && pData->fnRulebase) {
c17bfd
+		errmsg.LogError(0, RS_RET_CONFIG_ERROR,
c17bfd
+		    "mmkubernetes: only 1 of filenamerules or filenamerulebase may be used");
c17bfd
+		ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
c17bfd
+	}
c17bfd
+	if (pData->contRules && pData->contRulebase) {
c17bfd
+		errmsg.LogError(0, RS_RET_CONFIG_ERROR,
c17bfd
+			"mmkubernetes: only 1 of containerrules or containerrulebase may be used");
c17bfd
+		ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
c17bfd
+	}
c17bfd
+#endif
c17bfd
+	CHKiRet(set_lnctx(&pData->fnCtxln, pData->fnRules, pData->fnRulebase,
c17bfd
+			loadModConf->fnRules, loadModConf->fnRulebase));
c17bfd
+	CHKiRet(set_lnctx(&pData->contCtxln, pData->contRules, pData->contRulebase,
c17bfd
+			loadModConf->contRules, loadModConf->contRulebase));
c17bfd
+
c17bfd
+	if(pData->kubernetesUrl == NULL) {
c17bfd
+		if(loadModConf->kubernetesUrl == NULL) {
c17bfd
+			CHKmalloc(pData->kubernetesUrl = (uchar *) strdup(DFLT_KUBERNETES_URL));
c17bfd
+		} else {
c17bfd
+			CHKmalloc(pData->kubernetesUrl = (uchar *) strdup((char *) loadModConf->kubernetesUrl));
c17bfd
+		}
c17bfd
+	}
c17bfd
+	if(pData->srcMetadataDescr == NULL) {
c17bfd
+		CHKmalloc(pData->srcMetadataDescr = MALLOC(sizeof(msgPropDescr_t)));
c17bfd
+		CHKiRet(msgPropDescrFill(pData->srcMetadataDescr, loadModConf->srcMetadataPath,
c17bfd
+			strlen((char *)loadModConf->srcMetadataPath)));
c17bfd
+	}
c17bfd
+	if(pData->dstMetadataPath == NULL)
c17bfd
+		pData->dstMetadataPath = (uchar *) strdup((char *) loadModConf->dstMetadataPath);
c17bfd
+	if(pData->caCertFile == NULL && loadModConf->caCertFile)
c17bfd
+		pData->caCertFile = (uchar *) strdup((char *) loadModConf->caCertFile);
c17bfd
+	if(pData->token == NULL && loadModConf->token)
c17bfd
+		pData->token = (uchar *) strdup((char *) loadModConf->token);
c17bfd
+	if(pData->tokenFile == NULL && loadModConf->tokenFile)
c17bfd
+		pData->tokenFile = (uchar *) strdup((char *) loadModConf->tokenFile);
c17bfd
+	if(pData->de_dot_separator == NULL && loadModConf->de_dot_separator)
c17bfd
+		pData->de_dot_separator = (uchar *) strdup((char *) loadModConf->de_dot_separator);
c17bfd
+	if((pData->annotation_match.nmemb == 0) && (loadModConf->annotation_match.nmemb > 0))
c17bfd
+		copy_annotationmatch(&loadModConf->annotation_match, &pData->annotation_match);
c17bfd
+
c17bfd
+	if(pData->de_dot_separator)
c17bfd
+		pData->de_dot_separator_len = strlen((const char *)pData->de_dot_separator);
c17bfd
+
c17bfd
+	CHKmalloc(pData->contNameDescr = MALLOC(sizeof(msgPropDescr_t)));
c17bfd
+	CHKiRet(msgPropDescrFill(pData->contNameDescr, (uchar*) DFLT_CONTAINER_NAME,
c17bfd
+			strlen(DFLT_CONTAINER_NAME)));
c17bfd
+	CHKmalloc(pData->contIdFullDescr = MALLOC(sizeof(msgPropDescr_t)));
c17bfd
+	CHKiRet(msgPropDescrFill(pData->contIdFullDescr, (uchar*) DFLT_CONTAINER_ID_FULL,
c17bfd
+			strlen(DFLT_CONTAINER_NAME)));
c17bfd
+
c17bfd
+	/* get the cache for this url */
c17bfd
+	for(i = 0; caches[i] != NULL; i++) {
c17bfd
+		if(!strcmp((char *) pData->kubernetesUrl, (char *) caches[i]->kbUrl))
c17bfd
+			break;
c17bfd
+	}
c17bfd
+	if(caches[i] != NULL) {
c17bfd
+		pData->cache = caches[i];
c17bfd
+	} else {
c17bfd
+		CHKmalloc(pData->cache = cacheNew(pData->kubernetesUrl));
c17bfd
+
c17bfd
+		CHKmalloc(caches = realloc(caches, (i + 2) * sizeof(struct cache_s *)));
c17bfd
+		caches[i] = pData->cache;
c17bfd
+		caches[i + 1] = NULL;
c17bfd
+	}
c17bfd
+CODE_STD_FINALIZERnewActInst
c17bfd
+	if(pvals != NULL)
c17bfd
+		cnfparamvalsDestruct(pvals, &actpblk);
c17bfd
+	free(rxstr);
c17bfd
+	free(srcMetadataPath);
c17bfd
+ENDnewActInst
c17bfd
+
c17bfd
+
c17bfd
+/* legacy config format is not supported */
c17bfd
+BEGINparseSelectorAct
c17bfd
+CODESTARTparseSelectorAct
c17bfd
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
c17bfd
+	if(strncmp((char *) p, ":mmkubernetes:", sizeof(":mmkubernetes:") - 1)) {
c17bfd
+		errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
c17bfd
+			"mmkubernetes supports only v6+ config format, use: "
c17bfd
+			"action(type=\"mmkubernetes\" ...)");
c17bfd
+	}
c17bfd
+	ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
c17bfd
+CODE_STD_FINALIZERparseSelectorAct
c17bfd
+ENDparseSelectorAct
c17bfd
+
c17bfd
+
c17bfd
+BEGINendCnfLoad
c17bfd
+CODESTARTendCnfLoad
c17bfd
+ENDendCnfLoad
c17bfd
+
c17bfd
+
c17bfd
+BEGINcheckCnf
c17bfd
+CODESTARTcheckCnf
c17bfd
+ENDcheckCnf
c17bfd
+
c17bfd
+
c17bfd
+BEGINactivateCnf
c17bfd
+CODESTARTactivateCnf
c17bfd
+	runModConf = pModConf;
c17bfd
+ENDactivateCnf
c17bfd
+
c17bfd
+
c17bfd
+BEGINfreeCnf
c17bfd
+CODESTARTfreeCnf
c17bfd
+	int i;
c17bfd
+
c17bfd
+	free(pModConf->kubernetesUrl);
c17bfd
+	free(pModConf->srcMetadataPath);
c17bfd
+	free(pModConf->dstMetadataPath);
c17bfd
+	free(pModConf->caCertFile);
c17bfd
+	free(pModConf->token);
c17bfd
+	free(pModConf->tokenFile);
c17bfd
+	free(pModConf->de_dot_separator);
c17bfd
+	free(pModConf->fnRules);
c17bfd
+	free(pModConf->fnRulebase);
c17bfd
+	free(pModConf->contRules);
c17bfd
+	free(pModConf->contRulebase);
c17bfd
+	free_annotationmatch(&pModConf->annotation_match);
c17bfd
+	for(i = 0; caches[i] != NULL; i++)
c17bfd
+		cacheFree(caches[i]);
c17bfd
+	free(caches);
c17bfd
+ENDfreeCnf
c17bfd
+
c17bfd
+
c17bfd
+BEGINdbgPrintInstInfo
c17bfd
+CODESTARTdbgPrintInstInfo
c17bfd
+	dbgprintf("mmkubernetes\n");
c17bfd
+	dbgprintf("\tkubernetesUrl='%s'\n", pData->kubernetesUrl);
c17bfd
+	dbgprintf("\tsrcMetadataPath='%s'\n", pData->srcMetadataDescr->name);
c17bfd
+	dbgprintf("\tdstMetadataPath='%s'\n", pData->dstMetadataPath);
c17bfd
+	dbgprintf("\ttls.cacert='%s'\n", pData->caCertFile);
c17bfd
+	dbgprintf("\tallowUnsignedCerts='%d'\n", pData->allowUnsignedCerts);
c17bfd
+	dbgprintf("\ttoken='%s'\n", pData->token);
c17bfd
+	dbgprintf("\ttokenFile='%s'\n", pData->tokenFile);
c17bfd
+	dbgprintf("\tde_dot='%d'\n", pData->de_dot);
c17bfd
+	dbgprintf("\tde_dot_separator='%s'\n", pData->de_dot_separator);
c17bfd
+	dbgprintf("\tfilenamerulebase='%s'\n", pData->fnRulebase);
c17bfd
+	dbgprintf("\tcontainerrulebase='%s'\n", pData->contRulebase);
c17bfd
+#if HAVE_LOADSAMPLESFROMSTRING == 1
c17bfd
+	dbgprintf("\tfilenamerules='%s'\n", pData->fnRules);
c17bfd
+	dbgprintf("\tcontainerrules='%s'\n", pData->contRules);
c17bfd
+#endif
c17bfd
+ENDdbgPrintInstInfo
c17bfd
+
c17bfd
+
c17bfd
+BEGINtryResume
c17bfd
+CODESTARTtryResume
c17bfd
+ENDtryResume
c17bfd
+
c17bfd
+static rsRetVal
c17bfd
+extractMsgMetadata(smsg_t *pMsg, instanceData *pData, struct json_object **json)
c17bfd
+{
c17bfd
+	DEFiRet;
c17bfd
+	uchar *filename = NULL, *container_name = NULL, *container_id_full = NULL;
c17bfd
+	rs_size_t fnLen, container_name_len, container_id_full_len;
c17bfd
+	unsigned short freeFn = 0, free_container_name = 0, free_container_id_full = 0;
c17bfd
+	int lnret;
c17bfd
+	struct json_object *cnid = NULL;
c17bfd
+
c17bfd
+	if (!json)
c17bfd
+		FINALIZE;
c17bfd
+	*json = NULL;
c17bfd
+	/* extract metadata from the CONTAINER_NAME field and see if CONTAINER_ID_FULL is present */
c17bfd
+	container_name = MsgGetProp(pMsg, NULL, pData->contNameDescr,
c17bfd
+				    &container_name_len, &free_container_name, NULL);
c17bfd
+	container_id_full = MsgGetProp(
c17bfd
+		pMsg, NULL, pData->contIdFullDescr, &container_id_full_len, &free_container_id_full, NULL);
c17bfd
+
c17bfd
+	if (container_name && container_id_full && container_name_len && container_id_full_len) {
c17bfd
+		dbgprintf("mmkubernetes: CONTAINER_NAME: '%s'  CONTAINER_ID_FULL: '%s'.\n",
c17bfd
+			  container_name, container_id_full);
c17bfd
+		if ((lnret = ln_normalize(pData->contCtxln, (char*)container_name,
c17bfd
+					  container_name_len, json))) {
c17bfd
+			if (LN_WRONGPARSER != lnret) {
c17bfd
+				LogMsg(0, RS_RET_ERR, LOG_ERR,
c17bfd
+					"mmkubernetes: error parsing container_name [%s]: [%d]",
c17bfd
+					container_name, lnret);
c17bfd
+
c17bfd
+				ABORT_FINALIZE(RS_RET_ERR);
c17bfd
+			}
c17bfd
+			/* else assume parser didn't find a match and fall through */
c17bfd
+		} else if (fjson_object_object_get_ex(*json, "pod_name", NULL) &&
c17bfd
+			fjson_object_object_get_ex(*json, "namespace_name", NULL) &&
c17bfd
+			fjson_object_object_get_ex(*json, "container_name", NULL)) {
c17bfd
+			/* if we have fields for pod name, namespace name, container name,
c17bfd
+			 * and container id, we are good to go */
c17bfd
+			/* add field for container id */
c17bfd
+			json_object_object_add(*json, "container_id",
c17bfd
+				json_object_new_string_len((const char *)container_id_full,
c17bfd
+							   container_id_full_len));
c17bfd
+			ABORT_FINALIZE(RS_RET_OK);
c17bfd
+		}
c17bfd
+	}
c17bfd
+
c17bfd
+	/* extract metadata from the file name */
c17bfd
+	filename = MsgGetProp(pMsg, NULL, pData->srcMetadataDescr, &fnLen, &freeFn, NULL);
c17bfd
+	if((filename == NULL) || (fnLen == 0))
c17bfd
+		ABORT_FINALIZE(RS_RET_NOT_FOUND);
c17bfd
+
c17bfd
+	dbgprintf("mmkubernetes: filename: '%s' len %d.\n", filename, fnLen);
c17bfd
+	if ((lnret = ln_normalize(pData->fnCtxln, (char*)filename, fnLen, json))) {
c17bfd
+		if (LN_WRONGPARSER != lnret) {
c17bfd
+			LogMsg(0, RS_RET_ERR, LOG_ERR,
c17bfd
+				"mmkubernetes: error parsing container_name [%s]: [%d]",
c17bfd
+				filename, lnret);
c17bfd
+
c17bfd
+			ABORT_FINALIZE(RS_RET_ERR);
c17bfd
+		} else {
c17bfd
+			/* no match */
c17bfd
+			ABORT_FINALIZE(RS_RET_NOT_FOUND);
c17bfd
+		}
c17bfd
+}
c17bfd
+	/* if we have fields for pod name, namespace name, container name,
c17bfd
+	 * and container id, we are good to go */
c17bfd
+	if (fjson_object_object_get_ex(*json, "pod_name", NULL) &&
c17bfd
+		fjson_object_object_get_ex(*json, "namespace_name", NULL) &&
c17bfd
+		fjson_object_object_get_ex(*json, "container_name_and_id", &cnid)) {
c17bfd
+		/* parse container_name_and_id into container_name and container_id */
c17bfd
+		const char *container_name_and_id = json_object_get_string(cnid);
c17bfd
+		const char *last_dash = NULL;
c17bfd
+		if (container_name_and_id && (last_dash = strrchr(container_name_and_id, '-')) &&
c17bfd
+			*(last_dash + 1) && (last_dash != container_name_and_id)) {
c17bfd
+			json_object_object_add(*json, "container_name",
c17bfd
+				json_object_new_string_len(container_name_and_id,
c17bfd
+							   (int)(last_dash-container_name_and_id)));
c17bfd
+			json_object_object_add(*json, "container_id",
c17bfd
+					json_object_new_string(last_dash + 1));
c17bfd
+			ABORT_FINALIZE(RS_RET_OK);
c17bfd
+		}
c17bfd
+	}
c17bfd
+	ABORT_FINALIZE(RS_RET_NOT_FOUND);
c17bfd
+finalize_it:
c17bfd
+	if(freeFn)
c17bfd
+		free(filename);
c17bfd
+	if (free_container_name)
c17bfd
+		free(container_name);
c17bfd
+	if (free_container_id_full)
c17bfd
+		free(container_id_full);
c17bfd
+	if (iRet != RS_RET_OK) {
c17bfd
+		json_object_put(*json);
c17bfd
+		*json = NULL;
c17bfd
+	}
c17bfd
+	RETiRet;
c17bfd
+}
c17bfd
+
c17bfd
+
c17bfd
+static rsRetVal
c17bfd
+queryKB(wrkrInstanceData_t *pWrkrData, char *url, struct json_object **rply)
c17bfd
+{
c17bfd
+	DEFiRet;
c17bfd
+	CURLcode ccode;
c17bfd
+	struct json_tokener *jt = NULL;
c17bfd
+	struct json_object *jo;
c17bfd
+	long resp_code = 400;
c17bfd
+
c17bfd
+	/* query kubernetes for pod info */
c17bfd
+	ccode = curl_easy_setopt(pWrkrData->curlCtx, CURLOPT_URL, url);
c17bfd
+	if(ccode != CURLE_OK)
c17bfd
+		ABORT_FINALIZE(RS_RET_ERR);
c17bfd
+	if(CURLE_OK != (ccode = curl_easy_perform(pWrkrData->curlCtx))) {
c17bfd
+		errmsg.LogMsg(0, RS_RET_ERR, LOG_ERR,
c17bfd
+			      "mmkubernetes: failed to connect to [%s] - %d:%s\n",
c17bfd
+			      url, ccode, curl_easy_strerror(ccode));
c17bfd
+		ABORT_FINALIZE(RS_RET_ERR);
c17bfd
+	}
c17bfd
+	if(CURLE_OK != (ccode = curl_easy_getinfo(pWrkrData->curlCtx,
c17bfd
+					CURLINFO_RESPONSE_CODE, &resp_code))) {
c17bfd
+		errmsg.LogMsg(0, RS_RET_ERR, LOG_ERR,
c17bfd
+			      "mmkubernetes: could not get response code from query to [%s] - %d:%s\n",
c17bfd
+			      url, ccode, curl_easy_strerror(ccode));
c17bfd
+		ABORT_FINALIZE(RS_RET_ERR);
c17bfd
+	}
c17bfd
+	if(resp_code == 401) {
c17bfd
+		errmsg.LogMsg(0, RS_RET_ERR, LOG_ERR,
c17bfd
+			      "mmkubernetes: Unauthorized: not allowed to view url - "
c17bfd
+			      "check token/auth credentials [%s]\n",
c17bfd
+			      url);
c17bfd
+		ABORT_FINALIZE(RS_RET_ERR);
c17bfd
+	}
c17bfd
+	if(resp_code == 403) {
c17bfd
+		errmsg.LogMsg(0, RS_RET_ERR, LOG_ERR,
c17bfd
+			      "mmkubernetes: Forbidden: no access - "
c17bfd
+			      "check permissions to view url [%s]\n",
c17bfd
+			      url);
c17bfd
+		ABORT_FINALIZE(RS_RET_ERR);
c17bfd
+	}
c17bfd
+	if(resp_code == 404) {
c17bfd
+		errmsg.LogMsg(0, RS_RET_ERR, LOG_ERR,
c17bfd
+			      "mmkubernetes: Not Found: the resource does not exist at url [%s]\n",
c17bfd
+			      url);
c17bfd
+		ABORT_FINALIZE(RS_RET_ERR);
c17bfd
+	}
c17bfd
+	if(resp_code == 429) {
c17bfd
+		errmsg.LogMsg(0, RS_RET_ERR, LOG_ERR,
c17bfd
+			      "mmkubernetes: Too Many Requests: the server is too heavily loaded "
c17bfd
+			      "to provide the data for the requested url [%s]\n",
c17bfd
+			      url);
c17bfd
+		ABORT_FINALIZE(RS_RET_ERR);
c17bfd
+	}
c17bfd
+	if(resp_code != 200) {
c17bfd
+		errmsg.LogMsg(0, RS_RET_ERR, LOG_ERR,
c17bfd
+			      "mmkubernetes: server returned unexpected code [%ld] for url [%s]\n",
c17bfd
+			      resp_code, url);
c17bfd
+		ABORT_FINALIZE(RS_RET_ERR);
c17bfd
+	}
c17bfd
+	/* parse retrieved data */
c17bfd
+	jt = json_tokener_new();
c17bfd
+	json_tokener_reset(jt);
c17bfd
+	jo = json_tokener_parse_ex(jt, pWrkrData->curlRply, pWrkrData->curlRplyLen);
c17bfd
+	json_tokener_free(jt);
c17bfd
+	if(!json_object_is_type(jo, json_type_object)) {
c17bfd
+		json_object_put(jo);
c17bfd
+		jo = NULL;
c17bfd
+		errmsg.LogMsg(0, RS_RET_JSON_PARSE_ERR, LOG_INFO,
c17bfd
+			      "mmkubernetes: unable to parse string as JSON:[%.*s]\n",
c17bfd
+			      (int)pWrkrData->curlRplyLen, pWrkrData->curlRply);
c17bfd
+		ABORT_FINALIZE(RS_RET_JSON_PARSE_ERR);
c17bfd
+	}
c17bfd
+
c17bfd
+	dbgprintf("mmkubernetes: queryKB reply:\n%s\n",
c17bfd
+		json_object_to_json_string_ext(jo, JSON_C_TO_STRING_PRETTY));
c17bfd
+
c17bfd
+	*rply = jo;
c17bfd
+
c17bfd
+finalize_it:
c17bfd
+	if(pWrkrData->curlRply != NULL) {
c17bfd
+		free(pWrkrData->curlRply);
c17bfd
+		pWrkrData->curlRply = NULL;
c17bfd
+		pWrkrData->curlRplyLen = 0;
c17bfd
+	}
c17bfd
+	RETiRet;
c17bfd
+}
c17bfd
+
c17bfd
+
c17bfd
+/* versions < 8.16.0 don't support BEGINdoAction_NoStrings */
c17bfd
+#if defined(BEGINdoAction_NoStrings)
c17bfd
+BEGINdoAction_NoStrings
c17bfd
+	smsg_t **ppMsg = (smsg_t **) pMsgData;
c17bfd
+	smsg_t *pMsg = ppMsg[0];
c17bfd
+#else
c17bfd
+BEGINdoAction
c17bfd
+	smsg_t *pMsg = (smsg_t*) ppString[0];
c17bfd
+#endif
c17bfd
+	const char *podName = NULL, *ns = NULL, *containerName = NULL,
c17bfd
+		*containerID = NULL;
c17bfd
+	char *mdKey = NULL;
c17bfd
+	struct json_object *jMetadata = NULL, *jMetadataCopy = NULL, *jMsgMeta = NULL,
c17bfd
+			*jo = NULL;
c17bfd
+	int add_ns_metadata = 0;
c17bfd
+CODESTARTdoAction
c17bfd
+	CHKiRet_Hdlr(extractMsgMetadata(pMsg, pWrkrData->pData, &jMsgMeta)) {
c17bfd
+		ABORT_FINALIZE((iRet == RS_RET_NOT_FOUND) ? RS_RET_OK : iRet);
c17bfd
+	}
c17bfd
+
c17bfd
+	if (fjson_object_object_get_ex(jMsgMeta, "pod_name", &jo))
c17bfd
+		podName = json_object_get_string(jo);
c17bfd
+	if (fjson_object_object_get_ex(jMsgMeta, "namespace_name", &jo))
c17bfd
+		ns = json_object_get_string(jo);
c17bfd
+	if (fjson_object_object_get_ex(jMsgMeta, "container_name", &jo))
c17bfd
+		containerName = json_object_get_string(jo);
c17bfd
+	if (fjson_object_object_get_ex(jMsgMeta, "container_id", &jo))
c17bfd
+		containerID = json_object_get_string(jo);
c17bfd
+	assert(podName != NULL);
c17bfd
+	assert(ns != NULL);
c17bfd
+	assert(containerName != NULL);
c17bfd
+	assert(containerID != NULL);
c17bfd
+
c17bfd
+	dbgprintf("mmkubernetes:\n  podName: '%s'\n  namespace: '%s'\n  containerName: '%s'\n"
c17bfd
+		"  containerID: '%s'\n", podName, ns, containerName, containerID);
c17bfd
+
c17bfd
+	/* check cache for metadata */
c17bfd
+	if ((-1 == asprintf(&mdKey, "%s_%s_%s", ns, podName, containerName)) ||
c17bfd
+		(!mdKey)) {
c17bfd
+		ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
c17bfd
+	}
c17bfd
+	pthread_mutex_lock(pWrkrData->pData->cache->cacheMtx);
c17bfd
+	jMetadata = hashtable_search(pWrkrData->pData->cache->mdHt, mdKey);
c17bfd
+
c17bfd
+	if(jMetadata == NULL) {
c17bfd
+		char *url = NULL;
c17bfd
+		struct json_object *jReply = NULL, *jo2 = NULL, *jNsMeta = NULL, *jPodData = NULL;
c17bfd
+
c17bfd
+		/* check cache for namespace metadata */
c17bfd
+		jNsMeta = hashtable_search(pWrkrData->pData->cache->nsHt, (char *)ns);
c17bfd
+
c17bfd
+		if(jNsMeta == NULL) {
c17bfd
+			/* query kubernetes for namespace info */
c17bfd
+			/* todo: move url definitions elsewhere */
c17bfd
+			if ((-1 == asprintf(&url, "%s/api/v1/namespaces/%s",
c17bfd
+				 (char *) pWrkrData->pData->kubernetesUrl, ns)) ||
c17bfd
+				(!url)) {
c17bfd
+				pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
c17bfd
+				ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
c17bfd
+			}
c17bfd
+			iRet = queryKB(pWrkrData, url, &jReply);
c17bfd
+			free(url);
c17bfd
+			/* todo: implement support for the .orphaned namespace */
c17bfd
+			if (iRet != RS_RET_OK) {
c17bfd
+				json_object_put(jReply);
c17bfd
+				jReply = NULL;
c17bfd
+				pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
c17bfd
+				FINALIZE;
c17bfd
+			}
c17bfd
+
c17bfd
+			if(fjson_object_object_get_ex(jReply, "metadata", &jNsMeta)) {
c17bfd
+				jNsMeta = json_object_get(jNsMeta);
c17bfd
+				parse_labels_annotations(jNsMeta, &pWrkrData->pData->annotation_match,
c17bfd
+					pWrkrData->pData->de_dot,
c17bfd
+					(const char *)pWrkrData->pData->de_dot_separator,
c17bfd
+					pWrkrData->pData->de_dot_separator_len);
c17bfd
+				add_ns_metadata = 1;
c17bfd
+			} else {
c17bfd
+				/* namespace with no metadata??? */
c17bfd
+				errmsg.LogMsg(0, RS_RET_ERR, LOG_INFO,
c17bfd
+					      "mmkubernetes: namespace [%s] has no metadata!\n", ns);
c17bfd
+				jNsMeta = NULL;
c17bfd
+			}
c17bfd
+
c17bfd
+			json_object_put(jReply);
c17bfd
+			jReply = NULL;
c17bfd
+		}
c17bfd
+
c17bfd
+		if ((-1 == asprintf(&url, "%s/api/v1/namespaces/%s/pods/%s",
c17bfd
+			 (char *) pWrkrData->pData->kubernetesUrl, ns, podName)) ||
c17bfd
+			(!url)) {
c17bfd
+			pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
c17bfd
+			ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
c17bfd
+		}
c17bfd
+		iRet = queryKB(pWrkrData, url, &jReply);
c17bfd
+		free(url);
c17bfd
+		if(iRet != RS_RET_OK) {
c17bfd
+			if(jNsMeta && add_ns_metadata) {
c17bfd
+				hashtable_insert(pWrkrData->pData->cache->nsHt, strdup(ns), jNsMeta);
c17bfd
+			}
c17bfd
+			json_object_put(jReply);
c17bfd
+			jReply = NULL;
c17bfd
+			pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
c17bfd
+			FINALIZE;
c17bfd
+		}
c17bfd
+
c17bfd
+		jo = json_object_new_object();
c17bfd
+		if(jNsMeta && fjson_object_object_get_ex(jNsMeta, "uid", &jo2))
c17bfd
+			json_object_object_add(jo, "namespace_id", json_object_get(jo2));
c17bfd
+		if(jNsMeta && fjson_object_object_get_ex(jNsMeta, "labels", &jo2))
c17bfd
+			json_object_object_add(jo, "namespace_labels", json_object_get(jo2));
c17bfd
+		if(jNsMeta && fjson_object_object_get_ex(jNsMeta, "annotations", &jo2))
c17bfd
+			json_object_object_add(jo, "namespace_annotations", json_object_get(jo2));
c17bfd
+		if(jNsMeta && fjson_object_object_get_ex(jNsMeta, "creationTimestamp", &jo2))
c17bfd
+			json_object_object_add(jo, "creation_timestamp", json_object_get(jo2));
c17bfd
+		if(fjson_object_object_get_ex(jReply, "metadata", &jPodData)) {
c17bfd
+			if(fjson_object_object_get_ex(jPodData, "uid", &jo2))
c17bfd
+				json_object_object_add(jo, "pod_id", json_object_get(jo2));
c17bfd
+			parse_labels_annotations(jPodData, &pWrkrData->pData->annotation_match,
c17bfd
+				pWrkrData->pData->de_dot,
c17bfd
+				(const char *)pWrkrData->pData->de_dot_separator,
c17bfd
+				pWrkrData->pData->de_dot_separator_len);
c17bfd
+			if(fjson_object_object_get_ex(jPodData, "annotations", &jo2))
c17bfd
+				json_object_object_add(jo, "annotations", json_object_get(jo2));
c17bfd
+			if(fjson_object_object_get_ex(jPodData, "labels", &jo2))
c17bfd
+				json_object_object_add(jo, "labels", json_object_get(jo2));
c17bfd
+		}
c17bfd
+		if(fjson_object_object_get_ex(jReply, "spec", &jPodData)) {
c17bfd
+			if(fjson_object_object_get_ex(jPodData, "nodeName", &jo2)) {
c17bfd
+				json_object_object_add(jo, "host", json_object_get(jo2));
c17bfd
+			}
c17bfd
+		}
c17bfd
+		json_object_put(jReply);
c17bfd
+		jReply = NULL;
c17bfd
+
c17bfd
+		if (fjson_object_object_get_ex(jMsgMeta, "pod_name", &jo2))
c17bfd
+			json_object_object_add(jo, "pod_name", json_object_get(jo2));
c17bfd
+		if (fjson_object_object_get_ex(jMsgMeta, "namespace_name", &jo2))
c17bfd
+			json_object_object_add(jo, "namespace_name", json_object_get(jo2));
c17bfd
+		if (fjson_object_object_get_ex(jMsgMeta, "container_name", &jo2))
c17bfd
+			json_object_object_add(jo, "container_name", json_object_get(jo2));
c17bfd
+		json_object_object_add(jo, "master_url",
c17bfd
+			json_object_new_string((const char *)pWrkrData->pData->kubernetesUrl));
c17bfd
+		jMetadata = json_object_new_object();
c17bfd
+		json_object_object_add(jMetadata, "kubernetes", jo);
c17bfd
+		jo = json_object_new_object();
c17bfd
+		if (fjson_object_object_get_ex(jMsgMeta, "container_id", &jo2))
c17bfd
+			json_object_object_add(jo, "container_id", json_object_get(jo2));
c17bfd
+		json_object_object_add(jMetadata, "docker", jo);
c17bfd
+
c17bfd
+		hashtable_insert(pWrkrData->pData->cache->mdHt, mdKey, jMetadata);
c17bfd
+		mdKey = NULL;
c17bfd
+		if(jNsMeta && add_ns_metadata) {
c17bfd
+			hashtable_insert(pWrkrData->pData->cache->nsHt, strdup(ns), jNsMeta);
c17bfd
+			ns = NULL;
c17bfd
+		}
c17bfd
+	}
c17bfd
+
c17bfd
+	/* make a copy of the metadata for the msg to own */
c17bfd
+	/* todo: use json_object_deep_copy when implementation available in libfastjson */
c17bfd
+	/* yes, this is expensive - but there is no other way to make this thread safe - we
c17bfd
+	 * can't allow the msg to have a shared pointer to an element inside the cache,
c17bfd
+	 * outside of the cache lock
c17bfd
+	 */
c17bfd
+	jMetadataCopy = json_tokener_parse(json_object_get_string(jMetadata));
c17bfd
+	pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
c17bfd
+	/* the +1 is there to skip the leading '$' */
c17bfd
+	msgAddJSON(pMsg, (uchar *) pWrkrData->pData->dstMetadataPath + 1, jMetadataCopy, 0, 0);
c17bfd
+
c17bfd
+finalize_it:
c17bfd
+	json_object_put(jMsgMeta);
c17bfd
+	free(mdKey);
c17bfd
+ENDdoAction
c17bfd
+
c17bfd
+
c17bfd
+BEGINisCompatibleWithFeature
c17bfd
+CODESTARTisCompatibleWithFeature
c17bfd
+ENDisCompatibleWithFeature
c17bfd
+
c17bfd
+
c17bfd
+/* all the macros bellow have to be in a specific order */
c17bfd
+BEGINmodExit
c17bfd
+CODESTARTmodExit
c17bfd
+	curl_global_cleanup();
c17bfd
+
c17bfd
+	objRelease(regexp, LM_REGEXP_FILENAME);
c17bfd
+	objRelease(errmsg, CORE_COMPONENT);
c17bfd
+ENDmodExit
c17bfd
+
c17bfd
+
c17bfd
+BEGINqueryEtryPt
c17bfd
+CODESTARTqueryEtryPt
c17bfd
+CODEqueryEtryPt_STD_OMOD_QUERIES
c17bfd
+CODEqueryEtryPt_STD_OMOD8_QUERIES
c17bfd
+CODEqueryEtryPt_STD_CONF2_QUERIES
c17bfd
+CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
c17bfd
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
c17bfd
+ENDqueryEtryPt
c17bfd
+
c17bfd
+
c17bfd
+BEGINmodInit()
c17bfd
+CODESTARTmodInit
c17bfd
+	*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
c17bfd
+CODEmodInit_QueryRegCFSLineHdlr
c17bfd
+	DBGPRINTF("mmkubernetes: module compiled with rsyslog version %s.\n", VERSION);
c17bfd
+	CHKiRet(objUse(errmsg, CORE_COMPONENT));
c17bfd
+	CHKiRet(objUse(regexp, LM_REGEXP_FILENAME));
c17bfd
+
c17bfd
+	/* CURL_GLOBAL_ALL initializes more than is needed but the
c17bfd
+	 * libcurl documentation discourages use of other values
c17bfd
+	 */
c17bfd
+	curl_global_init(CURL_GLOBAL_ALL);
c17bfd
+ENDmodInit
c17bfd
diff --git a/contrib/mmkubernetes/sample.conf b/contrib/mmkubernetes/sample.conf
c17bfd
new file mode 100644
c17bfd
index 000000000..4c400ed51
c17bfd
--- /dev/null
c17bfd
+++ b/contrib/mmkubernetes/sample.conf
c17bfd
@@ -0,0 +1,7 @@
c17bfd
+module(load="mmkubernetes") # see docs for all module and action parameters
c17bfd
+
c17bfd
+# $!metadata!filename added by imfile using addmetadata="on"
c17bfd
+# e.g. input(type="imfile" file="/var/log/containers/*.log" tag="kubernetes" addmetadata="on")
c17bfd
+# $!CONTAINER_NAME and $!CONTAINER_ID_FULL added by imjournal
c17bfd
+
c17bfd
+action(type="mmkubernetes")
c17bfd
-- 
c17bfd
2.14.4
c17bfd