Blame SOURCES/rsyslog-8.24.0-rhbz1591819-msg-loss-shutdown.patch

c17bfd
From 59627f23bee26f3acec19d491d5884bcd1fb672e Mon Sep 17 00:00:00 2001
c17bfd
From: Rainer Gerhards <rgerhards@adiscon.com>
c17bfd
Date: Wed, 6 Jun 2018 17:30:21 +0200
c17bfd
Subject: [PATCH] core: fix message loss on target unavailability during
c17bfd
 shutdown
c17bfd
c17bfd
Triggering condition:
c17bfd
- action queue in disk mode (or DA)
c17bfd
- batch is being processed by failed action in retry mode
c17bfd
- rsyslog is shut down without resuming action
c17bfd
c17bfd
In these cases messages may be lost because they are not properly written
c17bfd
back to the disk queue.
c17bfd
c17bfd
closes https://github.com/rsyslog/rsyslog/issues/2760
c17bfd
---
c17bfd
 action.c        | 11 +++++++++--
c17bfd
 runtime/queue.c |  3 +++
c17bfd
 2 files changed, 12 insertions(+), 2 deletions(-)
c17bfd
c17bfd
diff --git a/action.c b/action.c
c17bfd
index a9f886a43..39fcb1c19 100644
c17bfd
--- a/action.c
c17bfd
+++ b/action.c
c17bfd
@@ -1554,8 +1554,15 @@ processBatchMain(void *__restrict__ const pVoid,
c17bfd
 			/* we do not check error state below, because aborting would be
c17bfd
 			 * more harmful than continuing.
c17bfd
 			 */
c17bfd
-			processMsgMain(pAction, pWti, pBatch->pElem[i].pMsg, &ttNow);
c17bfd
-			batchSetElemState(pBatch, i, BATCH_STATE_COMM);
c17bfd
+			rsRetVal localRet = processMsgMain(pAction, pWti, pBatch->pElem[i].pMsg, &ttNow);
c17bfd
+			DBGPRINTF("processBatchMain: i %d, processMsgMain iRet %d\n", i, localRet);
c17bfd
+			if(   localRet == RS_RET_OK
c17bfd
+			   || localRet == RS_RET_DEFER_COMMIT
c17bfd
+			   || localRet == RS_RET_ACTION_FAILED
c17bfd
+			   || localRet == RS_RET_PREVIOUS_COMMITTED ) {
c17bfd
+				batchSetElemState(pBatch, i, BATCH_STATE_COMM);
c17bfd
+				DBGPRINTF("processBatchMain: i %d, COMM state set\n", i);
c17bfd
+			}
c17bfd
 		}
c17bfd
 	}
c17bfd
 
c17bfd
diff --git a/runtime/queue.c b/runtime/queue.c
c17bfd
index 74cc217d1..fd163a49f 100644
c17bfd
--- a/runtime/queue.c
c17bfd
+++ b/runtime/queue.c
c17bfd
@@ -1666,6 +1666,7 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
c17bfd
 
c17bfd
 	for(i = 0 ; i < pBatch->nElem ; ++i) {
c17bfd
 		pMsg = pBatch->pElem[i].pMsg;
c17bfd
+		DBGPRINTF("DeleteProcessedBatch: etry %d state %d\n", i, pBatch->eltState[i]);
c17bfd
 		if(   pBatch->eltState[i] == BATCH_STATE_RDY
c17bfd
 		   || pBatch->eltState[i] == BATCH_STATE_SUB) {
c17bfd
 			localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
c17bfd
@@ -1778,6 +1779,8 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
c17bfd
 	/* it is sufficient to persist only when the bulk of work is done */
c17bfd
 	qqueueChkPersist(pThis, nDequeued+nDiscarded+nDeleted);
c17bfd
 
c17bfd
+	DBGOPRINT((obj_t*) pThis, "dequeued %d consumable elements, szlog %d sz phys %d\n",
c17bfd
+		nDequeued, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
c17bfd
 	pWti->batch.nElem = nDequeued;
c17bfd
 	pWti->batch.nElemDeq = nDequeued + nDiscarded;
c17bfd
 	pWti->batch.deqID = getNextDeqID(pThis);