Blame SOURCES/rsyslog-8.24.0-rhbz1591819-msg-loss-shutdown.patch

f656cf
From 59627f23bee26f3acec19d491d5884bcd1fb672e Mon Sep 17 00:00:00 2001
f656cf
From: Rainer Gerhards <rgerhards@adiscon.com>
f656cf
Date: Wed, 6 Jun 2018 17:30:21 +0200
f656cf
Subject: [PATCH] core: fix message loss on target unavailibility during
f656cf
 shutdown
f656cf
f656cf
Triggering condition:
f656cf
- action queue in disk mode (or DA)
f656cf
- batch is being processed by failed action in retry mode
f656cf
- rsyslog is shut down without resuming action
f656cf
f656cf
In these cases messages may be lost by not properly writing them
f656cf
back to the disk queue.
f656cf
f656cf
closes https://github.com/rsyslog/rsyslog/issues/2760
f656cf
---
f656cf
 action.c        | 11 +++++++++--
f656cf
 runtime/queue.c |  3 +++
f656cf
 2 files changed, 12 insertions(+), 2 deletions(-)
f656cf
f656cf
diff --git a/action.c b/action.c
f656cf
index a9f886a43..39fcb1c19 100644
f656cf
--- a/action.c
f656cf
+++ b/action.c
f656cf
@@ -1554,8 +1554,15 @@ processBatchMain(void *__restrict__ const pVoid,
f656cf
 			/* we do not check error state below, because aborting would be
f656cf
 			 * more harmful than continuing.
f656cf
 			 */
f656cf
-			processMsgMain(pAction, pWti, pBatch->pElem[i].pMsg, &ttNow);
f656cf
-			batchSetElemState(pBatch, i, BATCH_STATE_COMM);
f656cf
+			rsRetVal localRet = processMsgMain(pAction, pWti, pBatch->pElem[i].pMsg, &ttNow);
f656cf
+			DBGPRINTF("processBatchMain: i %d, processMsgMain iRet %d\n", i, localRet);
f656cf
+			if(   localRet == RS_RET_OK
f656cf
+			   || localRet == RS_RET_DEFER_COMMIT
f656cf
+			   || localRet == RS_RET_ACTION_FAILED
f656cf
+			   || localRet == RS_RET_PREVIOUS_COMMITTED ) {
f656cf
+				batchSetElemState(pBatch, i, BATCH_STATE_COMM);
f656cf
+				DBGPRINTF("processBatchMain: i %d, COMM state set\n", i);
f656cf
+			}
f656cf
 		}
f656cf
 	}
f656cf
 
f656cf
diff --git a/runtime/queue.c b/runtime/queue.c
f656cf
index 74cc217d1..fd163a49f 100644
f656cf
--- a/runtime/queue.c
f656cf
+++ b/runtime/queue.c
f656cf
@@ -1666,6 +1666,7 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
f656cf
 
f656cf
 	for(i = 0 ; i < pBatch->nElem ; ++i) {
f656cf
 		pMsg = pBatch->pElem[i].pMsg;
f656cf
+		DBGPRINTF("DeleteProcessedBatch: etry %d state %d\n", i, pBatch->eltState[i]);
f656cf
 		if(   pBatch->eltState[i] == BATCH_STATE_RDY
f656cf
 		   || pBatch->eltState[i] == BATCH_STATE_SUB) {
f656cf
 			localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
f656cf
@@ -1778,6 +1779,8 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
f656cf
 	/* it is sufficient to persist only when the bulk of work is done */
f656cf
 	qqueueChkPersist(pThis, nDequeued+nDiscarded+nDeleted);
f656cf
 
f656cf
+	DBGOPRINT((obj_t*) pThis, "dequeued %d consumable elements, szlog %d sz phys %d\n",
f656cf
+		nDequeued, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
f656cf
 	pWti->batch.nElem = nDequeued;
f656cf
 	pWti->batch.nElemDeq = nDequeued + nDiscarded;
f656cf
 	pWti->batch.deqID = getNextDeqID(pThis);