From: Jiri Vymazal Date: Wed, 25 Jul 2018 15:05:01 -0500 modification and merge of below patches for RHEL consumers, also modified journal invalidate/rotation handling to keep possibility to continue after switch of persistent journal original: %From 3bede5ba768975c8b6fe3d1f3e11075910f52fdd Mon Sep 17 00:00:00 2001 %From: Jiri Vymazal %Date: Wed, 7 Mar 2018 11:57:29 +0100 %Subject: [PATCH] Fetching cursor on readJournal() and simplified pollJournal() % %Fetching journal cursor in persistJournal could cause us to save %invalid cursor leading to duplicating messages further on, now we are %saving it on each readJournal() where we now that the state is good. %This result in simplyfing persisJournalState() a bit as well. % %pollJournal() is now cleaner and faster, correctly handles INVALIDATE %status from journald and is able to continue polling after journal %flush. Also reduced POLL_TIMEOUT a bit as it caused rsyslog to exit %with error in corner cases for some ppc when left at full second. plus % %From a99f9b4b42d261c384aee09306fc421df2cca7a5 Mon Sep 17 00:00:00 2001 %From: Peter Portante %Date: Wed, 24 Jan 2018 19:34:41 -0500 %Subject: [PATCH] Proposed fix for handling journal correctly % %The fix is to immediately setup the inotify file descriptor via %`sd_journal_get_fd()` right after a journal open, and then %periodically call `sd_journal_process()` to give the client API %library a chance to detect deleted journal files on disk that need to %be closed so they can be properly erased by the file system. % %We remove the open/close dance and simplify that code as a result. % %Fixes issue #2436. and also: %From 27f96c84d34ee000fbb5d45b00233f2ec3cf2d8a Mon Sep 17 00:00:00 2001 %From: Rainer Gerhards %Date: Tue, 24 Oct 2017 16:14:13 +0200 %Subject: [PATCH] imjournal bugfix: do not disable itself on error % %If some functions calls inside the main loop failed, imjournal exited %with an error code, actually disabling all logging from the journal. %This was probably never intended. % %This patch makes imjournal recover the situation instead. % %closes https://github.com/rsyslog/rsyslog/issues/1895 --- plugins/imjournal/imjournal.c | 211 ++++++++++++++++++++++-------------------- 1 file changed, 110 insertions(+), 102 deletions(-) --- a/plugins/imjournal/imjournal.c +++ b/plugins/imjournal/imjournal.c @@ -80,6 +80,7 @@ static struct configSettings_s { int iDfltFacility; int bUseJnlPID; char *dfltTag; + int bWorkAroundJournalBug; } cs; static rsRetVal facilityHdlr(uchar **pp, void *pVal); @@ -95,6 +96,7 @@ static struct cnfparamdescr modpdescr[] = { { "defaultfacility", eCmdHdlrString, 0 }, { "usepidfromsystem", eCmdHdlrBinary, 0 }, { "defaulttag", eCmdHdlrGetWord, 0 }, + { "workaroundjournalbug", eCmdHdlrBinary, 0 } }; static struct cnfparamblk modpblk = { CNFPARAMBLK_VERSION, @@ -114,6 +114,10 @@ /* module-global parameters */ static const char *pid_field_name; /* read-only after startup */ static ratelimit_t *ratelimiter = NULL; static sd_journal *j; +static int j_inotify_fd; +static char *last_cursor = NULL; + +#define J_PROCESS_PERIOD 1024 /* Call sd_journal_process() every 1,024 records */ static rsRetVal persistJournalState(void); static rsRetVal loadJournalState(void); @@ -123,6 +127,14 @@ openJournal(sd_journal** jj) if (sd_journal_open(jj, SD_JOURNAL_LOCAL_ONLY) < 0) iRet = RS_RET_IO_ERROR; + int r; + + if ((r = sd_journal_get_fd(j)) < 0) { + errmsg.LogError(-r, RS_RET_IO_ERROR, "imjournal: sd_journal_get_fd() failed"); + iRet = RS_RET_IO_ERROR; + } else { + j_inotify_fd = r; + } RETiRet; } @@ -132,6 +144,7 @@ closeJournal(sd_journal** jj) persistJournalState(); } sd_journal_close(*jj); + j_inotify_fd = 0; } @@ -262,6 +275,7 @@ readjournal(void) char *message = NULL; char *sys_iden = NULL; char *sys_iden_help = NULL; + char *c = NULL; const void *get; const void *pidget; @@ -433,6 +437,15 @@ readjournal(void) tv.tv_usec = timestamp % 1000000; } + if (cs.bWorkAroundJournalBug) { + /* save journal cursor (at this point we can be sure it is valid) */ + sd_journal_get_cursor(j, &c); + if (c) { + free(last_cursor); + last_cursor = c; + } + } + /* submit message */ enqMsg((uchar *)message, (uchar *) sys_iden_help, facility, severity, &tv, json, 0); @@ -413,44 +433,49 @@ persistJournalState (void) DEFiRet; FILE *sf; /* state file */ char tmp_sf[MAXFNAME]; - char *cursor; int ret = 0; - /* On success, sd_journal_get_cursor() returns 1 in systemd - 197 or older and 0 in systemd 198 or newer */ - if ((ret = sd_journal_get_cursor(j, &cursor)) >= 0) { - /* we create a temporary name by adding a ".tmp" - * suffix to the end of our state file's name - */ - snprintf(tmp_sf, sizeof(tmp_sf), "%s.tmp", cs.stateFile); - if ((sf = fopen(tmp_sf, "wb")) != NULL) { - if (fprintf(sf, "%s", cursor) < 0) { - iRet = RS_RET_IO_ERROR; - } - fclose(sf); - free(cursor); - /* change the name of the file to the configured one */ - if (iRet == RS_RET_OK && rename(tmp_sf, cs.stateFile) == -1) { - char errStr[256]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - iRet = RS_RET_IO_ERROR; - errmsg.LogError(0, iRet, "rename() failed: " - "'%s', new path: '%s'\n", errStr, cs.stateFile); - } + if (cs.bWorkAroundJournalBug) { + if (!last_cursor) + ABORT_FINALIZE(RS_RET_OK); - } else { - char errStr[256]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - errmsg.LogError(0, RS_RET_FOPEN_FAILURE, "fopen() failed: " - "'%s', path: '%s'\n", errStr, tmp_sf); - iRet = RS_RET_FOPEN_FAILURE; - } - } else { + } else if ((ret = sd_journal_get_cursor(j, &last_cursor)) < 0) { char errStr[256]; rs_strerror_r(-(ret), errStr, sizeof(errStr)); errmsg.LogError(0, RS_RET_ERR, "sd_journal_get_cursor() failed: '%s'\n", errStr); - iRet = RS_RET_ERR; + ABORT_FINALIZE(RS_RET_ERR); } + /* we create a temporary name by adding a ".tmp" + * suffix to the end of our state file's name + */ + snprintf(tmp_sf, sizeof(tmp_sf), "%s.tmp", cs.stateFile); + + sf = fopen(tmp_sf, "wb"); + if (!sf) { + errmsg.LogError(errno, RS_RET_FOPEN_FAILURE, "imjournal: fopen() failed for path: '%s'", tmp_sf); + ABORT_FINALIZE(RS_RET_FOPEN_FAILURE); + } + + ret = fputs(last_cursor, sf); + if (ret < 0) { + errmsg.LogError(errno, RS_RET_IO_ERROR, "imjournal: failed to save cursor to: '%s'", tmp_sf); + ret = fclose(sf); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + + ret = fclose(sf); + if (ret < 0) { + errmsg.LogError(errno, RS_RET_IO_ERROR, "imjournal: fclose() failed for path: '%s'", tmp_sf); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + + ret = rename(tmp_sf, cs.stateFile); + if (ret < 0) { + errmsg.LogError(errno, iRet, "imjournal: rename() failed for new path: '%s'", cs.stateFile); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + +finalize_it: RETiRet; } @@ -473,64 +473,26 @@ * except for the special handling of EINTR. */ -#define POLL_TIMEOUT 1000 /* timeout for poll is 1s */ +#define POLL_TIMEOUT 900000 /* timeout for poll is 900ms */ static rsRetVal pollJournal(void) { DEFiRet; - struct pollfd pollfd; - int pr = 0; - int jr = 0; - - pollfd.fd = sd_journal_get_fd(j); - pollfd.events = sd_journal_get_events(j); - pr = poll(&pollfd, 1, POLL_TIMEOUT); - if (pr == -1) { - if (errno == EINTR) { - /* EINTR is also received during termination - * so return now to check the term state. - */ - ABORT_FINALIZE(RS_RET_OK); - } else { - char errStr[256]; - - rs_strerror_r(errno, errStr, sizeof(errStr)); - errmsg.LogError(0, RS_RET_ERR, - "poll() failed: '%s'", errStr); - ABORT_FINALIZE(RS_RET_ERR); - } - } + int r; + r = sd_journal_wait(j, POLL_TIMEOUT); - jr = sd_journal_process(j); - - if (pr == 1 && jr == SD_JOURNAL_INVALIDATE) { - /* do not persist stateFile sd_journal_get_cursor will fail! */ - char* tmp = cs.stateFile; - cs.stateFile = NULL; + if (r == SD_JOURNAL_INVALIDATE) { closeJournal(&j); - cs.stateFile = tmp; iRet = openJournal(&j); - if (iRet != RS_RET_OK) { - char errStr[256]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - errmsg.LogError(0, RS_RET_IO_ERROR, - "sd_journal_open() failed: '%s'", errStr); + if (iRet != RS_RET_OK) ABORT_FINALIZE(RS_RET_ERR); - } - if(cs.stateFile != NULL){ + if (cs.stateFile) iRet = loadJournalState(); - } - LogMsg(0, RS_RET_OK, LOG_NOTICE, "imjournal: journal reloaded..."); - } else if (jr < 0) { - char errStr[256]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - errmsg.LogError(0, RS_RET_ERR, - "sd_journal_process() failed: '%s'", errStr); - ABORT_FINALIZE(RS_RET_ERR); + errmsg.LogMsg(0, RS_RET_OK, LOG_NOTICE, "imjournal: journal reloaded..."); } finalize_it: @@ -631,8 +612,17 @@ loadJournalState(void) RETiRet; } +static void +tryRecover(void) { + errmsg.LogMsg(0, RS_RET_OK, LOG_INFO, "imjournal: trying to recover from unexpected " + "journal error"); + closeJournal(&j); + srSleep(10, 0); // do not hammer machine with too-frequent retries + openJournal(&j); +} + BEGINrunInput - int count = 0; + uint64_t count = 0; CODESTARTrunInput CHKiRet(ratelimitNew(&ratelimiter, "imjournal", NULL)); dbgprintf("imjournal: ratelimiting burst %d, interval %d\n", cs.ratelimitBurst, @@ -665,26 +655,38 @@ CODESTARTrunInput r = sd_journal_next(j); if (r < 0) { - char errStr[256]; - - rs_strerror_r(errno, errStr, sizeof(errStr)); - errmsg.LogError(0, RS_RET_ERR, - "sd_journal_next() failed: '%s'", errStr); - ABORT_FINALIZE(RS_RET_ERR); + tryRecover(); + continue; } if (r == 0) { /* No new messages, wait for activity. */ - CHKiRet(pollJournal()); + if (pollJournal() != RS_RET_OK) { + tryRecover(); + } continue; } - CHKiRet(readjournal()); + if (readjournal() != RS_RET_OK) { + tryRecover(); + continue; + } + + count++; + + if ((count % J_PROCESS_PERIOD) == 0) { + /* Give the journal a periodic chance to detect rotated journal files to be cleaned up. */ + r = sd_journal_process(j); + if (r < 0) { + errmsg.LogError(-r, RS_RET_ERR, "imjournal: sd_journal_process() failed"); + tryRecover(); + continue; + } + } + if (cs.stateFile) { /* can't persist without a state file */ /* TODO: This could use some finer metric. */ - count++; - if (count == cs.iPersistStateInterval) { - count = 0; + if ((count % cs.iPersistStateInterval) == 0) { persistJournalState(); } } @@ -901,6 +909,8 @@ CODESTARTsetModCnf cs.bUseJnlPID = (int) pvals[i].val.d.n; } else if (!strcmp(modpblk.descr[i].name, "defaulttag")) { cs.dfltTag = (char *)es_str2cstr(pvals[i].val.d.estr, NULL); + } else if (!strcmp(modpblk.descr[i].name, "workaroundjournalbug")) { + cs.bWorkAroundJournalBug = (int) pvals[i].val.d.n; } else { dbgprintf("imjournal: program error, non-handled " "param '%s' in beginCnfLoad\n", modpblk.descr[i].name); @@ -961,6 +971,8 @@ CODEmodInit_QueryRegCFSLineHdlr NULL, &cs.bUseJnlPID, STD_LOADABLE_MODULE_ID)); CHKiRet(omsdRegCFSLineHdlr((uchar *)"imjournaldefaulttag", 0, eCmdHdlrGetWord, NULL, &cs.dfltTag, STD_LOADABLE_MODULE_ID)); + CHKiRet(omsdRegCFSLineHdlr((uchar *)"workaroundjournalbug", 0, eCmdHdlrBinary, + NULL, &cs.bWorkAroundJournalBug, STD_LOADABLE_MODULE_ID)); ENDmodInit /* vim:set ai: */