From: Jiri Vymazal Date: Wed, 14 Mar 2018 90:05:01 -0500 modification and merge of below patches for RHEL consumers, also modified journal invalidate/rotation handling to keep possibility to continue after switch of persistent journal original: % %From a99f9b4b42d261c384aee09306fc421df2cca7a5 Mon Sep 17 00:00:00 2001 %From: Peter Portante %Date: Wed, 24 Jan 2018 19:34:41 -0500 %Subject: [PATCH] Proposed fix for handling journal correctly % %The fix is to immediately setup the inotify file descriptor via %`sd_journal_get_fd()` right after a journal open, and then %periodically call `sd_journal_process()` to give the client API %library a chance to detect deleted journal files on disk that need to %be closed so they can be properly erased by the file system. % %We remove the open/close dance and simplify that code as a result. % %Fixes issue #2436. and also: %From 27f96c84d34ee000fbb5d45b00233f2ec3cf2d8a Mon Sep 17 00:00:00 2001 %From: Rainer Gerhards %Date: Tue, 24 Oct 2017 16:14:13 +0200 %Subject: [PATCH] imjournal bugfix: do not disable itself on error % %If some functions calls inside the main loop failed, imjournal exited %with an error code, actually disabling all logging from the journal. %This was probably never intended. % %This patch makes imjournal recover the situation instead. % %closes https://github.com/rsyslog/rsyslog/issues/1895 --- plugins/imjournal/imjournal.c | 206 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------------------------------------------------------------------------------ 1 file changed, 104 insertions(+), 102 deletions(-) --- a/plugins/imjournal/imjournal.c +++ b/plugins/imjournal/imjournal.c @@ -114,6 +114,10 @@ /* module-global parameters */ static const char *pid_field_name; /* read-only after startup */ static ratelimit_t *ratelimiter = NULL; static sd_journal *j; +static int j_inotify_fd; +static char *last_cursor = NULL; + +#define J_PROCESS_PERIOD 1024 /* Call sd_journal_process() every 1,024 records */ static rsRetVal persistJournalState(void); static rsRetVal loadJournalState(void); @@ -123,6 +127,14 @@ openJournal(sd_journal** jj) if (sd_journal_open(jj, SD_JOURNAL_LOCAL_ONLY) < 0) iRet = RS_RET_IO_ERROR; + int r; + + if ((r = sd_journal_get_fd(j)) < 0) { + errmsg.LogError(-r, RS_RET_IO_ERROR, "imjournal: sd_journal_get_fd() failed"); + iRet = RS_RET_IO_ERROR; + } else { + j_inotify_fd = r; + } RETiRet; } @@ -132,6 +144,7 @@ closeJournal(sd_journal** jj) persistJournalState(); } sd_journal_close(*jj); + j_inotify_fd = 0; } @@ -262,6 +275,7 @@ readjournal(void) char *message = NULL; char *sys_iden = NULL; char *sys_iden_help = NULL; + char *c = NULL; const void *get; const void *pidget; @@ -393,6 +407,12 @@ readjournal(void) tv.tv_usec = timestamp % 1000000; } + sd_journal_get_cursor(j, &c); + if (c) { + free(last_cursor); + last_cursor = c; + } + /* submit message */ enqMsg((uchar *)message, (uchar *) sys_iden_help, facility, severity, &tv, json, 0); @@ -413,44 +433,41 @@ persistJournalState (void) DEFiRet; FILE *sf; /* state file */ char tmp_sf[MAXFNAME]; - char *cursor; - int ret = 0; + int r = 0; - /* On success, sd_journal_get_cursor() returns 1 in systemd - 197 or older and 0 in systemd 198 or newer */ - if ((ret = sd_journal_get_cursor(j, &cursor)) >= 0) { - /* we create a temporary name by adding a ".tmp" - * suffix to the end of our state file's name - */ - snprintf(tmp_sf, sizeof(tmp_sf), "%s.tmp", cs.stateFile); - if ((sf = fopen(tmp_sf, "wb")) != NULL) { - if (fprintf(sf, "%s", cursor) < 0) { - iRet = RS_RET_IO_ERROR; - } - fclose(sf); - free(cursor); - /* change the name of the file to the configured one */ - if (iRet == RS_RET_OK && rename(tmp_sf, cs.stateFile) == -1) { - char errStr[256]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - iRet = RS_RET_IO_ERROR; - errmsg.LogError(0, iRet, "rename() failed: " - "'%s', new path: '%s'\n", errStr, cs.stateFile); - } + if (!last_cursor) + ABORT_FINALIZE(RS_RET_OK); - } else { - char errStr[256]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - errmsg.LogError(0, RS_RET_FOPEN_FAILURE, "fopen() failed: " - "'%s', path: '%s'\n", errStr, tmp_sf); - iRet = RS_RET_FOPEN_FAILURE; - } - } else { - char errStr[256]; - rs_strerror_r(-(ret), errStr, sizeof(errStr)); - errmsg.LogError(0, RS_RET_ERR, "sd_journal_get_cursor() failed: '%s'\n", errStr); - iRet = RS_RET_ERR; - } + /* we create a temporary name by adding a ".tmp" + * suffix to the end of our state file's name + */ + snprintf(tmp_sf, sizeof(tmp_sf), "%s.tmp", cs.stateFile); + + sf = fopen(tmp_sf, "wb"); + if (!sf) { + errmsg.LogError(errno, RS_RET_FOPEN_FAILURE, "imjournal: fopen() failed for path: '%s'", tmp_sf); + ABORT_FINALIZE(RS_RET_FOPEN_FAILURE); + } + + r = fprintf(sf, "%s", last_cursor); + if (r < 0) { + errmsg.LogError(errno, RS_RET_FOPEN_FAILURE, "imjournal: failed to save cursor to: '%s'", tmp_sf); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + + r = fclose(sf); + if (r < 0) { + errmsg.LogError(errno, iRet, "imjournal: fclose() failed for path: '%s'", tmp_sf); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + + r = rename(tmp_sf, cs.stateFile); + if (r < 0) { + errmsg.LogError(errno, iRet, "imjournal: rename() failed for new path: '%s'", cs.stateFile); + ABORT_FINALIZE(RS_RET_IO_ERROR); + } + +finalize_it: RETiRet; } @@ -473,64 +473,29 @@ * except for the special handling of EINTR. */ -#define POLL_TIMEOUT 1000 /* timeout for poll is 1s */ +#define POLL_TIMEOUT 900000 /* timeout for poll is 900ms */ static rsRetVal pollJournal(void) { DEFiRet; - struct pollfd pollfd; - int pr = 0; - int jr = 0; - - pollfd.fd = sd_journal_get_fd(j); - pollfd.events = sd_journal_get_events(j); - pr = poll(&pollfd, 1, POLL_TIMEOUT); - if (pr == -1) { - if (errno == EINTR) { - /* EINTR is also received during termination - * so return now to check the term state. - */ - ABORT_FINALIZE(RS_RET_OK); - } else { - char errStr[256]; - - rs_strerror_r(errno, errStr, sizeof(errStr)); - errmsg.LogError(0, RS_RET_ERR, - "poll() failed: '%s'", errStr); - ABORT_FINALIZE(RS_RET_ERR); - } - } + int r; + for (;;) { + r = sd_journal_wait(j, POLL_TIMEOUT); + break; + } - jr = sd_journal_process(j); - - if (pr == 1 && jr == SD_JOURNAL_INVALIDATE) { - /* do not persist stateFile sd_journal_get_cursor will fail! */ - char* tmp = cs.stateFile; - cs.stateFile = NULL; + if (r == SD_JOURNAL_INVALIDATE) { closeJournal(&j); - cs.stateFile = tmp; iRet = openJournal(&j); - if (iRet != RS_RET_OK) { - char errStr[256]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - errmsg.LogError(0, RS_RET_IO_ERROR, - "sd_journal_open() failed: '%s'", errStr); + if (iRet != RS_RET_OK) ABORT_FINALIZE(RS_RET_ERR); - } - if(cs.stateFile != NULL){ + if (cs.stateFile) iRet = loadJournalState(); - } - LogMsg(0, RS_RET_OK, LOG_NOTICE, "imjournal: journal reloaded..."); - } else if (jr < 0) { - char errStr[256]; - rs_strerror_r(errno, errStr, sizeof(errStr)); - errmsg.LogError(0, RS_RET_ERR, - "sd_journal_process() failed: '%s'", errStr); - ABORT_FINALIZE(RS_RET_ERR); + errmsg.LogMsg(0, RS_RET_OK, LOG_NOTICE, "imjournal: journal reloaded..."); } finalize_it: @@ -631,8 +612,17 @@ loadJournalState(void) RETiRet; } +static void +tryRecover(void) { + errmsg.LogMsg(0, RS_RET_OK, LOG_INFO, "imjournal: trying to recover from unexpected " + "journal error"); + closeJournal(&j); + srSleep(10, 0); // do not hammer machine with too-frequent retries + openJournal(&j); +} + BEGINrunInput - int count = 0; + uint64_t count = 0; CODESTARTrunInput CHKiRet(ratelimitNew(&ratelimiter, "imjournal", NULL)); dbgprintf("imjournal: ratelimiting burst %d, interval %d\n", cs.ratelimitBurst, @@ -665,26 +655,38 @@ CODESTARTrunInput r = sd_journal_next(j); if (r < 0) { - char errStr[256]; - - rs_strerror_r(errno, errStr, sizeof(errStr)); - errmsg.LogError(0, RS_RET_ERR, - "sd_journal_next() failed: '%s'", errStr); - ABORT_FINALIZE(RS_RET_ERR); + tryRecover(); + continue; } if (r == 0) { /* No new messages, wait for activity. */ - CHKiRet(pollJournal()); + if (pollJournal() != RS_RET_OK) { + tryRecover(); + } continue; } - CHKiRet(readjournal()); + if (readjournal() != RS_RET_OK) { + tryRecover(); + continue; + } + + count++; + + if ((count % J_PROCESS_PERIOD) == 0) { + /* Give the journal a periodic chance to detect rotated journal files to be cleaned up. */ + r = sd_journal_process(j); + if (r < 0) { + errmsg.LogError(-r, RS_RET_ERR, "imjournal: sd_journal_process() failed"); + tryRecover(); + continue; + } + } + if (cs.stateFile) { /* can't persist without a state file */ /* TODO: This could use some finer metric. */ - count++; - if (count == cs.iPersistStateInterval) { - count = 0; + if ((count % cs.iPersistStateInterval) == 0) { persistJournalState(); } }