Blame SOURCES/rsyslog-8.24.0-rhbz1538372-imjournal-duplicates.patch

fde8c1
From: Jiri Vymazal <jvymazal@redhat.com>
fde8c1
Date: Wed, 25 Jul 2018 15:05:01 -0500
fde8c1
fde8c1
modification and merge of below patches for RHEL consumers, 
fde8c1
also modified journal invalidate/rotation handling to keep possibility
fde8c1
to continue after switch of persistent journal
fde8c1
original:
fde8c1
%From 3bede5ba768975c8b6fe3d1f3e11075910f52fdd Mon Sep 17 00:00:00 2001
fde8c1
%From: Jiri Vymazal <jvymazal@redhat.com>
fde8c1
%Date: Wed, 7 Mar 2018 11:57:29 +0100
fde8c1
%Subject: [PATCH] Fetching cursor on readJournal() and simplified pollJournal()
fde8c1
%
fde8c1
%Fetching journal cursor in persistJournal could cause us to save
fde8c1
%invalid cursor leading to duplicating messages further on, now we are
fde8c1
%saving it on each readJournal() where we now that the state is good.
fde8c1
%This result in simplyfing persisJournalState() a bit as well.
fde8c1
%
fde8c1
%pollJournal() is now cleaner and faster, correctly handles INVALIDATE
fde8c1
%status from journald and is able to continue polling after journal
fde8c1
%flush. Also reduced POLL_TIMEOUT a bit as it caused rsyslog to exit
fde8c1
%with error in corner cases for some ppc when left at full second.
fde8c1
plus
fde8c1
%
fde8c1
%From a99f9b4b42d261c384aee09306fc421df2cca7a5 Mon Sep 17 00:00:00 2001
fde8c1
%From: Peter Portante <peter.a.portante@gmail.com>
fde8c1
%Date: Wed, 24 Jan 2018 19:34:41 -0500
fde8c1
%Subject: [PATCH] Proposed fix for handling journal correctly
fde8c1
%
fde8c1
%The fix is to immediately setup the inotify file descriptor via
fde8c1
%`sd_journal_get_fd()` right after a journal open, and then
fde8c1
%periodically call `sd_journal_process()` to give the client API
fde8c1
%library a chance to detect deleted journal files on disk that need to
fde8c1
%be closed so they can be properly erased by the file system.
fde8c1
%
fde8c1
%We remove the open/close dance and simplify that code as a result.
fde8c1
%
fde8c1
%Fixes issue #2436.
fde8c1
and also:
fde8c1
%From 27f96c84d34ee000fbb5d45b00233f2ec3cf2d8a Mon Sep 17 00:00:00 2001
fde8c1
%From: Rainer Gerhards <rgerhards@adiscon.com>
fde8c1
%Date: Tue, 24 Oct 2017 16:14:13 +0200
fde8c1
%Subject: [PATCH] imjournal bugfix: do not disable itself on error
fde8c1
%
fde8c1
%If some functions calls inside the main loop failed, imjournal exited
fde8c1
%with an error code, actually disabling all logging from the journal.
fde8c1
%This was probably never intended.
fde8c1
%
fde8c1
%This patch makes imjournal recover the situation instead.
fde8c1
%
fde8c1
%closes https://github.com/rsyslog/rsyslog/issues/1895
fde8c1
---
fde8c1
 plugins/imjournal/imjournal.c | 211 ++++++++++++++++++++++--------------------
fde8c1
 1 file changed, 110 insertions(+), 102 deletions(-)
fde8c1
fde8c1
--- a/plugins/imjournal/imjournal.c
fde8c1
+++ b/plugins/imjournal/imjournal.c
fde8c1
@@ -80,6 +80,7 @@ static struct configSettings_s {
fde8c1
 	int iDfltFacility;
fde8c1
 	int bUseJnlPID;
fde8c1
	char *dfltTag;
fde8c1
+	int bWorkAroundJournalBug;
fde8c1
 } cs;
fde8c1
 
fde8c1
 static rsRetVal facilityHdlr(uchar **pp, void *pVal);
fde8c1
@@ -95,6 +96,7 @@ static struct cnfparamdescr modpdescr[] = {
fde8c1
 	{ "defaultfacility", eCmdHdlrString, 0 },
fde8c1
 	{ "usepidfromsystem", eCmdHdlrBinary, 0 },
fde8c1
	{ "defaulttag", eCmdHdlrGetWord, 0 },
fde8c1
+	{ "workaroundjournalbug", eCmdHdlrBinary, 0 }
fde8c1
 };
fde8c1
 static struct cnfparamblk modpblk =
fde8c1
 	{ CNFPARAMBLK_VERSION,
fde8c1
@@ -114,6 +114,10 @@ /* module-global parameters */
fde8c1
 static const char *pid_field_name;	/* read-only after startup */
fde8c1
 static ratelimit_t *ratelimiter = NULL;
fde8c1
 static sd_journal *j;
fde8c1
+static int j_inotify_fd;
fde8c1
+static char *last_cursor = NULL;
fde8c1
+
fde8c1
+#define J_PROCESS_PERIOD 1024  /* Call sd_journal_process() every 1,024 records */
fde8c1
 
fde8c1
 static rsRetVal persistJournalState(void);
fde8c1
 static rsRetVal loadJournalState(void);
fde8c1
@@ -123,6 +127,14 @@ openJournal(sd_journal** jj)
fde8c1
 
fde8c1
 	if (sd_journal_open(jj, SD_JOURNAL_LOCAL_ONLY) < 0)
fde8c1
 		iRet = RS_RET_IO_ERROR;
fde8c1
+	int r;
fde8c1
+
fde8c1
+	if ((r = sd_journal_get_fd(j)) < 0) {
fde8c1
+		errmsg.LogError(-r, RS_RET_IO_ERROR, "imjournal: sd_journal_get_fd() failed");
fde8c1
+		iRet = RS_RET_IO_ERROR;
fde8c1
+	} else {
fde8c1
+		j_inotify_fd = r;
fde8c1
+	}	
fde8c1
 	RETiRet;
fde8c1
 }
fde8c1
 
fde8c1
@@ -132,6 +144,7 @@ closeJournal(sd_journal** jj)
fde8c1
 		persistJournalState();
fde8c1
 	}
fde8c1
 	sd_journal_close(*jj);
fde8c1
+	j_inotify_fd = 0;
fde8c1
 }
fde8c1
 
fde8c1
 
fde8c1
@@ -262,6 +275,7 @@ readjournal(void)
fde8c1
 	char *message = NULL;
fde8c1
 	char *sys_iden = NULL;
fde8c1
 	char *sys_iden_help = NULL;
fde8c1
+	char *c = NULL;
fde8c1
 
fde8c1
 	const void *get;
fde8c1
 	const void *pidget;
fde8c1
@@ -433,6 +437,15 @@ readjournal(void)
fde8c1
 		tv.tv_usec = timestamp % 1000000;
fde8c1
 	}
fde8c1
 
fde8c1
+	if (cs.bWorkAroundJournalBug) {
fde8c1
+		/* save journal cursor (at this point we can be sure it is valid) */
fde8c1
+		sd_journal_get_cursor(j, &c);
fde8c1
+		if (c) {
fde8c1
+			free(last_cursor);
fde8c1
+			last_cursor = c;
fde8c1
+		}
fde8c1
+	}
fde8c1
+
fde8c1
 	/* submit message */
fde8c1
 	enqMsg((uchar *)message, (uchar *) sys_iden_help, facility, severity, &tv, json, 0);
fde8c1
 
fde8c1
@@ -413,44 +433,49 @@ persistJournalState (void)
fde8c1
 	DEFiRet;
fde8c1
 	FILE *sf; /* state file */
fde8c1
 	char tmp_sf[MAXFNAME];
fde8c1
-	char *cursor;
fde8c1
 	int ret = 0;
fde8c1
 
fde8c1
-	/* On success, sd_journal_get_cursor()  returns 1 in systemd
fde8c1
-	   197 or older and 0 in systemd 198 or newer */
fde8c1
-	if ((ret = sd_journal_get_cursor(j, &cursor)) >= 0) {
fde8c1
-               /* we create a temporary name by adding a ".tmp"
fde8c1
-                * suffix to the end of our state file's name
fde8c1
-                */
fde8c1
-               snprintf(tmp_sf, sizeof(tmp_sf), "%s.tmp", cs.stateFile);
fde8c1
-               if ((sf = fopen(tmp_sf, "wb")) != NULL) {
fde8c1
-			if (fprintf(sf, "%s", cursor) < 0) {
fde8c1
-				iRet = RS_RET_IO_ERROR;
fde8c1
-			}
fde8c1
-			fclose(sf);
fde8c1
-			free(cursor);
fde8c1
-                       /* change the name of the file to the configured one */
fde8c1
-                       if (iRet == RS_RET_OK && rename(tmp_sf, cs.stateFile) == -1) {
fde8c1
-                               char errStr[256];
fde8c1
-                               rs_strerror_r(errno, errStr, sizeof(errStr));
fde8c1
-                               iRet = RS_RET_IO_ERROR;
fde8c1
-                               errmsg.LogError(0, iRet, "rename() failed: "
fde8c1
-                                       "'%s', new path: '%s'\n", errStr, cs.stateFile);
fde8c1
-                       }
fde8c1
+	if (cs.bWorkAroundJournalBug) {
fde8c1
+		if (!last_cursor)
fde8c1
+			ABORT_FINALIZE(RS_RET_OK);
fde8c1
 
fde8c1
-		} else {
fde8c1
-			char errStr[256];
fde8c1
-			rs_strerror_r(errno, errStr, sizeof(errStr));
fde8c1
-			errmsg.LogError(0, RS_RET_FOPEN_FAILURE, "fopen() failed: "
fde8c1
-				"'%s', path: '%s'\n", errStr, tmp_sf);
fde8c1
-			iRet = RS_RET_FOPEN_FAILURE;
fde8c1
-		}
fde8c1
-	} else {
fde8c1
+	} else if ((ret = sd_journal_get_cursor(j, &last_cursor)) < 0) {
fde8c1
 		char errStr[256];
fde8c1
 		rs_strerror_r(-(ret), errStr, sizeof(errStr));
fde8c1
 		errmsg.LogError(0, RS_RET_ERR, "sd_journal_get_cursor() failed: '%s'\n", errStr);
fde8c1
-		iRet = RS_RET_ERR;
fde8c1
+		ABORT_FINALIZE(RS_RET_ERR);
fde8c1
	}
fde8c1
+	/* we create a temporary name by adding a ".tmp"
fde8c1
+	 * suffix to the end of our state file's name
fde8c1
+	 */
fde8c1
+	snprintf(tmp_sf, sizeof(tmp_sf), "%s.tmp", cs.stateFile);
fde8c1
+
fde8c1
+	sf = fopen(tmp_sf, "wb");
fde8c1
+	if (!sf) {
fde8c1
+		errmsg.LogError(errno, RS_RET_FOPEN_FAILURE, "imjournal: fopen() failed for path: '%s'", tmp_sf);
fde8c1
+		ABORT_FINALIZE(RS_RET_FOPEN_FAILURE);
fde8c1
+	}
fde8c1
+
fde8c1
+	ret = fputs(last_cursor, sf);
fde8c1
+	if (ret < 0) {
fde8c1
+		errmsg.LogError(errno, RS_RET_IO_ERROR, "imjournal: failed to save cursor to: '%s'", tmp_sf);
fde8c1
+		ret = fclose(sf);
fde8c1
+		ABORT_FINALIZE(RS_RET_IO_ERROR);
fde8c1
+	}
fde8c1
+
fde8c1
+	ret = fclose(sf);
fde8c1
+	if (ret < 0) {
fde8c1
+		errmsg.LogError(errno, RS_RET_IO_ERROR, "imjournal: fclose() failed for path: '%s'", tmp_sf);
fde8c1
+		ABORT_FINALIZE(RS_RET_IO_ERROR);
fde8c1
+	}
fde8c1
+
fde8c1
+	ret = rename(tmp_sf, cs.stateFile);
fde8c1
+	if (ret < 0) {
fde8c1
+		errmsg.LogError(errno, iRet, "imjournal: rename() failed for new path: '%s'", cs.stateFile);
fde8c1
+		ABORT_FINALIZE(RS_RET_IO_ERROR);
fde8c1
+	}
fde8c1
+
fde8c1
+finalize_it:
fde8c1
 	RETiRet;
fde8c1
 }
fde8c1
 
fde8c1
@@ -473,64 +473,26 @@
fde8c1
  * except for the special handling of EINTR.
fde8c1
  */
fde8c1
 
fde8c1
-#define POLL_TIMEOUT 1000 /* timeout for poll is 1s */
fde8c1
+#define POLL_TIMEOUT 900000 /* timeout for poll is 900ms */
fde8c1
 
fde8c1
 static rsRetVal
fde8c1
 pollJournal(void)
fde8c1
 {
fde8c1
 	DEFiRet;
fde8c1
-	struct pollfd pollfd;
fde8c1
-	int pr = 0;
fde8c1
-	int jr = 0;
fde8c1
-
fde8c1
-	pollfd.fd = sd_journal_get_fd(j);
fde8c1
-	pollfd.events = sd_journal_get_events(j);
fde8c1
-	pr = poll(&pollfd, 1, POLL_TIMEOUT);
fde8c1
-	if (pr == -1) {
fde8c1
-		if (errno == EINTR) {
fde8c1
-			/* EINTR is also received during termination
fde8c1
-			 * so return now to check the term state.
fde8c1
-			 */
fde8c1
-			ABORT_FINALIZE(RS_RET_OK);
fde8c1
-		} else {
fde8c1
-			char errStr[256];
fde8c1
-
fde8c1
-			rs_strerror_r(errno, errStr, sizeof(errStr));
fde8c1
-			errmsg.LogError(0, RS_RET_ERR,
fde8c1
-				"poll() failed: '%s'", errStr);
fde8c1
-			ABORT_FINALIZE(RS_RET_ERR);
fde8c1
-		}
fde8c1
-	}
fde8c1
+	int r;
fde8c1
 
fde8c1
+	r = sd_journal_wait(j, POLL_TIMEOUT);
fde8c1
 
fde8c1
-	jr = sd_journal_process(j);
fde8c1
-	
fde8c1
-	if (pr == 1 && jr == SD_JOURNAL_INVALIDATE) {
fde8c1
-		/* do not persist stateFile sd_journal_get_cursor will fail! */
fde8c1
-		char* tmp = cs.stateFile;
fde8c1
-		cs.stateFile = NULL;
fde8c1
+	if (r == SD_JOURNAL_INVALIDATE) {
fde8c1
 		closeJournal(&j);
fde8c1
-		cs.stateFile = tmp;
fde8c1
 
fde8c1
 		iRet = openJournal(&j);
fde8c1
-		if (iRet != RS_RET_OK) {
fde8c1
-			char errStr[256];
fde8c1
-			rs_strerror_r(errno, errStr, sizeof(errStr));
fde8c1
-			errmsg.LogError(0, RS_RET_IO_ERROR,
fde8c1
-				"sd_journal_open() failed: '%s'", errStr);
fde8c1
+		if (iRet != RS_RET_OK)
fde8c1
 			ABORT_FINALIZE(RS_RET_ERR);
fde8c1
-		}
fde8c1
 
fde8c1
-		if(cs.stateFile != NULL){
fde8c1
+		if (cs.stateFile)
fde8c1
 			iRet = loadJournalState();
fde8c1
-		}
fde8c1
-		LogMsg(0, RS_RET_OK, LOG_NOTICE, "imjournal: journal reloaded...");
fde8c1
-	} else if (jr < 0) {
fde8c1
-		char errStr[256];
fde8c1
-		rs_strerror_r(errno, errStr, sizeof(errStr));
fde8c1
-		errmsg.LogError(0, RS_RET_ERR,
fde8c1
-			"sd_journal_process() failed: '%s'", errStr);
fde8c1
-		ABORT_FINALIZE(RS_RET_ERR);
fde8c1
+		errmsg.LogMsg(0, RS_RET_OK, LOG_NOTICE, "imjournal: journal reloaded...");
fde8c1
 	}
fde8c1
 
fde8c1
 finalize_it:
fde8c1
@@ -631,8 +612,17 @@ loadJournalState(void)
fde8c1
 	RETiRet;
fde8c1
 }
fde8c1
 
fde8c1
+static void
fde8c1
+tryRecover(void) {
fde8c1
+	errmsg.LogMsg(0, RS_RET_OK, LOG_INFO, "imjournal: trying to recover from unexpected "
fde8c1
+		"journal error");
fde8c1
+	closeJournal(&j);
fde8c1
+	srSleep(10, 0);	// do not hammer machine with too-frequent retries
fde8c1
+	openJournal(&j);
fde8c1
+}
fde8c1
+
fde8c1
 BEGINrunInput
fde8c1
-	int count = 0;
fde8c1
+	uint64_t count = 0;
fde8c1
 CODESTARTrunInput
fde8c1
 	CHKiRet(ratelimitNew(&ratelimiter, "imjournal", NULL));
fde8c1
 	dbgprintf("imjournal: ratelimiting burst %d, interval %d\n", cs.ratelimitBurst,
fde8c1
@@ -665,26 +655,38 @@ CODESTARTrunInput
fde8c1
 
fde8c1
 		r = sd_journal_next(j);
fde8c1
 		if (r < 0) {
fde8c1
-			char errStr[256];
fde8c1
-
fde8c1
-			rs_strerror_r(errno, errStr, sizeof(errStr));
fde8c1
-			errmsg.LogError(0, RS_RET_ERR,
fde8c1
-				"sd_journal_next() failed: '%s'", errStr);
fde8c1
-			ABORT_FINALIZE(RS_RET_ERR);
fde8c1
+			tryRecover();
fde8c1
+			continue;
fde8c1
 		}
fde8c1
 
fde8c1
 		if (r == 0) {
fde8c1
 			/* No new messages, wait for activity. */
fde8c1
-			CHKiRet(pollJournal());
fde8c1
+			if (pollJournal() != RS_RET_OK) {
fde8c1
+ 				tryRecover();
fde8c1
+ 			}
fde8c1
 			continue;
fde8c1
 		}
fde8c1
 
fde8c1
-		CHKiRet(readjournal());
fde8c1
+		if (readjournal() != RS_RET_OK) {
fde8c1
+ 			tryRecover();
fde8c1
+ 			continue;
fde8c1
+ 		}
fde8c1
+
fde8c1
+		count++;
fde8c1
+
fde8c1
+		if ((count % J_PROCESS_PERIOD) == 0) {
fde8c1
+			/* Give the journal a periodic chance to detect rotated journal files to be cleaned up. */
fde8c1
+			r = sd_journal_process(j);
fde8c1
+			if (r < 0) {
fde8c1
+				errmsg.LogError(-r, RS_RET_ERR, "imjournal: sd_journal_process() failed");
fde8c1
+				tryRecover();
fde8c1
+				continue;
fde8c1
+			}
fde8c1
+		}
fde8c1
+
fde8c1
 		if (cs.stateFile) { /* can't persist without a state file */
fde8c1
 			/* TODO: This could use some finer metric. */
fde8c1
-			count++;
fde8c1
-			if (count == cs.iPersistStateInterval) {
fde8c1
-				count = 0;
fde8c1
+			if ((count % cs.iPersistStateInterval) == 0) {
fde8c1
 				persistJournalState();
fde8c1
 			}
fde8c1
 		}
fde8c1
@@ -901,6 +909,8 @@ CODESTARTsetModCnf
fde8c1
 			cs.bUseJnlPID = (int) pvals[i].val.d.n;
fde8c1
		} else if (!strcmp(modpblk.descr[i].name, "defaulttag")) {
fde8c1
			cs.dfltTag = (char *)es_str2cstr(pvals[i].val.d.estr, NULL);
fde8c1
+		} else if (!strcmp(modpblk.descr[i].name, "workaroundjournalbug")) {
fde8c1
+			cs.bWorkAroundJournalBug = (int) pvals[i].val.d.n;
fde8c1
 		} else {
fde8c1
 			dbgprintf("imjournal: program error, non-handled "
fde8c1
 				"param '%s' in beginCnfLoad\n", modpblk.descr[i].name);
fde8c1
@@ -961,6 +971,8 @@ CODEmodInit_QueryRegCFSLineHdlr
fde8c1
		NULL, &cs.bUseJnlPID, STD_LOADABLE_MODULE_ID));
fde8c1
	CHKiRet(omsdRegCFSLineHdlr((uchar *)"imjournaldefaulttag", 0, eCmdHdlrGetWord,
fde8c1
		NULL, &cs.dfltTag, STD_LOADABLE_MODULE_ID));
fde8c1
+	CHKiRet(omsdRegCFSLineHdlr((uchar *)"workaroundjournalbug", 0, eCmdHdlrBinary,
fde8c1
+		NULL, &cs.bWorkAroundJournalBug, STD_LOADABLE_MODULE_ID));
fde8c1
 ENDmodInit
fde8c1
 /* vim:set ai:
fde8c1
  */