Blob Blame History Raw
From 59627f23bee26f3acec19d491d5884bcd1fb672e Mon Sep 17 00:00:00 2001
From: Rainer Gerhards <rgerhards@adiscon.com>
Date: Wed, 6 Jun 2018 17:30:21 +0200
Subject: [PATCH] core: fix message loss on target unavailibility during
 shutdown

Triggering condition:
- action queue in disk mode (or DA)
- batch is being processed by failed action in retry mode
- rsyslog is shut down without resuming action

In these cases messages may be lost by not properly writing them
back to the disk queue.

closes https://github.com/rsyslog/rsyslog/issues/2760
---
 action.c        | 11 +++++++++--
 runtime/queue.c |  3 +++
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/action.c b/action.c
index a9f886a43..39fcb1c19 100644
--- a/action.c
+++ b/action.c
@@ -1554,8 +1554,15 @@ processBatchMain(void *__restrict__ const pVoid,
 			/* we do not check error state below, because aborting would be
 			 * more harmful than continuing.
 			 */
-			processMsgMain(pAction, pWti, pBatch->pElem[i].pMsg, &ttNow);
-			batchSetElemState(pBatch, i, BATCH_STATE_COMM);
+			rsRetVal localRet = processMsgMain(pAction, pWti, pBatch->pElem[i].pMsg, &ttNow);
+			DBGPRINTF("processBatchMain: i %d, processMsgMain iRet %d\n", i, localRet);
+			if(   localRet == RS_RET_OK
+			   || localRet == RS_RET_DEFER_COMMIT
+			   || localRet == RS_RET_ACTION_FAILED
+			   || localRet == RS_RET_PREVIOUS_COMMITTED ) {
+				batchSetElemState(pBatch, i, BATCH_STATE_COMM);
+				DBGPRINTF("processBatchMain: i %d, COMM state set\n", i);
+			}
 		}
 	}
 
diff --git a/runtime/queue.c b/runtime/queue.c
index 74cc217d1..fd163a49f 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1666,6 +1666,7 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
 
 	for(i = 0 ; i < pBatch->nElem ; ++i) {
 		pMsg = pBatch->pElem[i].pMsg;
+		DBGPRINTF("DeleteProcessedBatch: etry %d state %d\n", i, pBatch->eltState[i]);
 		if(   pBatch->eltState[i] == BATCH_STATE_RDY
 		   || pBatch->eltState[i] == BATCH_STATE_SUB) {
 			localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
@@ -1778,6 +1779,8 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
 	/* it is sufficient to persist only when the bulk of work is done */
 	qqueueChkPersist(pThis, nDequeued+nDiscarded+nDeleted);
 
+	DBGOPRINT((obj_t*) pThis, "dequeued %d consumable elements, szlog %d sz phys %d\n",
+		nDequeued, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
 	pWti->batch.nElem = nDequeued;
 	pWti->batch.nElemDeq = nDequeued + nDiscarded;
 	pWti->batch.deqID = getNextDeqID(pThis);