Blob Blame History Raw
From e8d64cbd15fa84907dc23f8b52d6f2f847b46fec Mon Sep 17 00:00:00 2001
From: Rich Megginson <rmeggins@redhat.com>
Date: Mon, 10 Sep 2018 17:25:38 -0600
Subject: [PATCH] imfile: support for endmsg.regex

This adds support for endmsg.regex.  It is similar to
startmsg.regex except that it matches the line that denotes
the end of the message, rather than the start of the next message.
This is primarily for container log file use cases such as this:

    date stdout P start of message
    date stdout P  middle of message
    date stdout F  end of message

The `F` means this is the line which contains the final part of
the message.  The fully assembled message should be
`start of message middle of message end of message`.
`endmsg.regex="^[^ ]+ stdout F "` will match.

(cherry picked from commit c902a0938fe163b5351829d2b72001d024895c16)
(cherry picked from commit dd4a72c4d52d8da98ed6b86114868e1a450ccb41)
---
 plugins/imfile/imfile.c                      |  44 ++++--
 plugins/imptcp/imptcp.c                      |  10 +-
 runtime/stream.c                             |  28 +++-
 runtime/stream.h                             |   2 +-
 4 files changed, 62 insertions(+), 20 deletions(-)

diff --git a/plugins/imfile/imfile.c b/plugins/imfile/imfile.c
index 7767c9f02..87706082f 100644
--- a/plugins/imfile/imfile.c
+++ b/plugins/imfile/imfile.c
@@ -126,7 +126,9 @@ struct instanceConf_s {
 	sbool bRMStateOnDel;
 	uint8_t readMode;
 	uchar *startRegex;
-	regex_t end_preg;	/* compiled version of startRegex */
+	uchar *endRegex;
+	regex_t start_preg;	/* compiled version of startRegex */
+	regex_t end_preg;	/* compiled version of endRegex */
 	sbool discardTruncatedMsg;
 	sbool msgDiscardingError;
 	sbool escapeLF;
@@ -281,6 +283,7 @@ static struct cnfparamdescr inppdescr[] = {
 	{ "ruleset", eCmdHdlrString, 0 },
 	{ "readmode", eCmdHdlrInt, 0 },
 	{ "startmsg.regex", eCmdHdlrString, 0 },
+	{ "endmsg.regex", eCmdHdlrString, 0 },
 	{ "discardtruncatedmsg", eCmdHdlrBinary, 0 },
 	{ "msgdiscardingerror", eCmdHdlrBinary, 0 },
 	{ "escapelf", eCmdHdlrBinary, 0 },
@@ -1421,6 +1424,7 @@ pollFileReal(act_obj_t *act, cstr_t **pCStr)
 	int64 strtOffs;
 	DEFiRet;
 	int nProcessed = 0;
+	regex_t *start_preg = NULL, *end_preg = NULL;
 
 	DBGPRINTF("pollFileReal enter, pStrm %p, name '%s'\n", act->pStrm, act->name);
 	DBGPRINTF("pollFileReal enter, edge %p\n", act->edge);
@@ -1432,15 +1436,18 @@ pollFileReal(act_obj_t *act, cstr_t **pCStr)
 		CHKiRet(openFile(act)); /* open file */
 	}
 
+	start_preg = (inst->startRegex == NULL) ? NULL : &inst->start_preg;
+	end_preg = (inst->endRegex == NULL) ? NULL : &inst->end_preg;
+
 	/* loop below will be exited when strmReadLine() returns EOF */
 	while(glbl.GetGlobalInputTermState() == 0) {
 		if(inst->maxLinesAtOnce != 0 && nProcessed >= inst->maxLinesAtOnce)
 			break;
-		if(inst->startRegex == NULL) {
+		if((start_preg == NULL) && (end_preg == NULL)) {
 			CHKiRet(strm.ReadLine(act->pStrm, pCStr, inst->readMode, inst->escapeLF,
 				inst->trimLineOverBytes, &strtOffs));
 		} else {
-			CHKiRet(strmReadMultiLine(act->pStrm, pCStr, &inst->end_preg,
+			CHKiRet(strmReadMultiLine(act->pStrm, pCStr, start_preg, end_preg,
 				inst->escapeLF, inst->discardTruncatedMsg, inst->msgDiscardingError, &strtOffs));
 		}
 		++nProcessed;
@@ -1506,6 +1513,7 @@ createInstance(instanceConf_t **const pinst)
 	inst->iPersistStateInterval = 0;
 	inst->readMode = 0;
 	inst->startRegex = NULL;
+	inst->endRegex = NULL;
 	inst->discardTruncatedMsg = 0;
 	inst->msgDiscardingError = 1;
 	inst->bRMStateOnDel = 1;
@@ -1713,6 +1721,8 @@ CODESTARTnewInpInst
 			inst->readMode = (sbool) pvals[i].val.d.n;
 		} else if(!strcmp(inppblk.descr[i].name, "startmsg.regex")) {
 			inst->startRegex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
+		} else if(!strcmp(inppblk.descr[i].name, "endmsg.regex")) {
+			inst->endRegex = (uchar*)es_str2cstr(pvals[i].val.d.estr, NULL);
 		} else if(!strcmp(inppblk.descr[i].name, "discardtruncatedmsg")) {
 			inst->discardTruncatedMsg = (sbool) pvals[i].val.d.n;
 		} else if(!strcmp(inppblk.descr[i].name, "msgdiscardingerror")) {
@@ -1753,19 +1763,31 @@ CODESTARTnewInpInst
 			  "param '%s'\n", inppblk.descr[i].name);
 		}
 	}
-	if(inst->readMode != 0 &&  inst->startRegex != NULL) {
+	i = (inst->readMode > 0) ? 1 : 0;
+	i = (NULL != inst->startRegex) ? (i+1) : i;
+	i = (NULL != inst->endRegex) ? (i+1) : i;
+	if(i > 1) {
 		LogError(0, RS_RET_PARAM_NOT_PERMITTED,
-			"readMode and startmsg.regex cannot be set "
-			"at the same time --- remove one of them");
+			"only one of readMode or startmsg.regex or endmsg.regex can be set "
+			"at the same time");
 			ABORT_FINALIZE(RS_RET_PARAM_NOT_PERMITTED);
 	}
 
 	if(inst->startRegex != NULL) {
-		const int errcode = regcomp(&inst->end_preg, (char*)inst->startRegex, REG_EXTENDED);
+		const int errcode = regcomp(&inst->start_preg, (char*)inst->startRegex, REG_EXTENDED);
+		if(errcode != 0) {
+			char errbuff[512];
+			regerror(errcode, &inst->start_preg, errbuff, sizeof(errbuff));
+			parser_errmsg("imfile: error in startmsg.regex expansion: %s", errbuff);
+			ABORT_FINALIZE(RS_RET_ERR);
+		}
+	}
+	if(inst->endRegex != NULL) {
+		const int errcode = regcomp(&inst->end_preg, (char*)inst->endRegex, REG_EXTENDED);
 		if(errcode != 0) {
 			char errbuff[512];
 			regerror(errcode, &inst->end_preg, errbuff, sizeof(errbuff));
-			parser_errmsg("imfile: error in regex expansion: %s", errbuff);
+			parser_errmsg("imfile: error in endmsg.regex expansion: %s", errbuff);
 			ABORT_FINALIZE(RS_RET_ERR);
 		}
 	}
@@ -1970,9 +1992,13 @@ CODESTARTfreeCnf
 		free(inst->pszStateFile);
 		free(inst->pszFileName_forOldStateFile);
 		if(inst->startRegex != NULL) {
-			regfree(&inst->end_preg);
+			regfree(&inst->start_preg);
 			free(inst->startRegex);
 		}
+		if(inst->endRegex != NULL) {
+			regfree(&inst->end_preg);
+			free(inst->endRegex);
+		}
 		del = inst;
 		inst = inst->next;
 		free(del);
diff --git a/plugins/imptcp/imptcp.c b/plugins/imptcp/imptcp.c
index 9b6be0f40..a94b97f41 100644
--- a/plugins/imptcp/imptcp.c
+++ b/plugins/imptcp/imptcp.c
@@ -162,7 +162,7 @@ struct instanceConf_s {
 	int ratelimitInterval;
 	int ratelimitBurst;
 	uchar *startRegex;
-	regex_t end_preg;	/* compiled version of startRegex */
+	regex_t start_preg;	/* compiled version of startRegex */
 	struct instanceConf_s *next;
 };
 
@@ -961,7 +961,7 @@ processDataRcvd_regexFraming(ptcpsess_t *const __restrict__ pThis,
 	if(c == '\n') {
 		pThis->iCurrLine = pThis->iMsg;
 	} else {
-		const int isMatch = !regexec(&inst->end_preg, (char*)pThis->pMsg+pThis->iCurrLine, 0, NULL, 0);
+		const int isMatch = !regexec(&inst->start_preg, (char*)pThis->pMsg+pThis->iCurrLine, 0, NULL, 0);
 		if(isMatch) {
 			DBGPRINTF("regex match (%d), framing line: %s\n", pThis->iCurrLine, pThis->pMsg);
 			strcpy((char*)pThis->pMsg_save, (char*) pThis->pMsg+pThis->iCurrLine);
@@ -2188,10 +2188,10 @@ CODESTARTnewInpInst
 	}
 
 	if(inst->startRegex != NULL) {
-		const int errcode = regcomp(&inst->end_preg, (char*)inst->startRegex, REG_EXTENDED);
+		const int errcode = regcomp(&inst->start_preg, (char*)inst->startRegex, REG_EXTENDED);
 		if(errcode != 0) {
 			char errbuff[512];
-			regerror(errcode, &inst->end_preg, errbuff, sizeof(errbuff));
+			regerror(errcode, &inst->start_preg, errbuff, sizeof(errbuff));
 			parser_errmsg("imptcp: error in framing.delimiter.regex expansion: %s", errbuff);
 			ABORT_FINALIZE(RS_RET_ERR);
 		}
@@ -2348,7 +2348,7 @@ CODESTARTfreeCnf
 		free(inst->pszInputName);
 		free(inst->dfltTZ);
 		if(inst->startRegex != NULL) {
-			regfree(&inst->end_preg);
+			regfree(&inst->start_preg);
 			free(inst->startRegex);
 		}
 		del = inst;
diff --git a/runtime/stream.c b/runtime/stream.c
index 6b7e7028e..0f4197103 100644
--- a/runtime/stream.c
+++ b/runtime/stream.c
@@ -942,12 +942,12 @@ strmReadMultiLine_isTimedOut(const strm_t *const __restrict__ pThis)
 
 /* read a multi-line message from a strm file.
  * The multi-line message is terminated based on the user-provided
- * startRegex (Posix ERE). For performance reasons, the regex
+ * startRegex or endRegex (Posix ERE). For performance reasons, the regex
  * must already have been compiled by the user.
  * added 2015-05-12 rgerhards
  */
 rsRetVal
-strmReadMultiLine(strm_t *pThis, cstr_t **ppCStr, regex_t *preg, const sbool bEscapeLF,
+strmReadMultiLine(strm_t *pThis, cstr_t **ppCStr, regex_t *start_preg, regex_t *end_preg, const sbool bEscapeLF,
 	const sbool discardTruncatedMsg, const sbool msgDiscardingError, int64 *const strtOffs)
 {
 	uchar c;
@@ -979,9 +979,14 @@ strmReadMultiLine(strm_t *pThis, cstr_t **ppCStr, regex_t *preg, const sbool bEs
 		cstrFinalize(thisLine);
 
 		/* we have a line, now let's assemble the message */
-		const int isMatch = !regexec(preg, (char*)rsCStrGetSzStrNoNULL(thisLine), 0, NULL, 0);
-
-		if(isMatch) {
+		const int isStartMatch = start_preg ?
+				!regexec(start_preg, (char*)rsCStrGetSzStrNoNULL(thisLine), 0, NULL, 0) :
+				0;
+		const int isEndMatch = end_preg ?
+				!regexec(end_preg, (char*)rsCStrGetSzStrNoNULL(thisLine), 0, NULL, 0) :
+				0;
+
+		if(isStartMatch) {
 			/* in this case, the *previous* message is complete and we are
 			 * at the start of a new one.
 			 */
@@ -1047,6 +1052,19 @@ strmReadMultiLine(strm_t *pThis, cstr_t **ppCStr, regex_t *preg, const sbool bEs
 				}
 			}
 		}
+		if(isEndMatch) {
+			/* in this case, the *current* message is complete and we are
+			 * at the end of it.
+			 */
+			if(pThis->ignoringMsg == 0) {
+				if(pThis->prevMsgSegment != NULL) {
+					finished = 1;
+					*ppCStr = pThis->prevMsgSegment;
+					pThis->prevMsgSegment= NULL;
+				}
+			}
+			pThis->ignoringMsg = 0;
+		}
 		cstrDestruct(&thisLine);
 	} while(finished == 0);
 
diff --git a/runtime/stream.h b/runtime/stream.h
index 71596879e..7dc597ff5 100644
--- a/runtime/stream.h
+++ b/runtime/stream.h
@@ -225,7 +225,7 @@ ENDinterface(strm)
 /* prototypes */
 PROTOTYPEObjClassInit(strm);
 rsRetVal strmMultiFileSeek(strm_t *pThis, unsigned int fileNum, off64_t offs, off64_t *bytesDel);
-rsRetVal strmReadMultiLine(strm_t *pThis, cstr_t **ppCStr, regex_t *preg,
+rsRetVal strmReadMultiLine(strm_t *pThis, cstr_t **ppCStr, regex_t *start_preg, regex_t *end_preg,
 	sbool bEscapeLF, sbool discardTruncatedMsg, sbool msgDiscardingError, int64 *const strtOffs);
 int strmReadMultiLine_isTimedOut(const strm_t *const __restrict__ pThis);
 void strmDebugOutBuf(const strm_t *const pThis);