From 59627f23bee26f3acec19d491d5884bcd1fb672e Mon Sep 17 00:00:00 2001
From: Rainer Gerhards <rgerhards@adiscon.com>
Date: Wed, 6 Jun 2018 17:30:21 +0200
Subject: [PATCH] core: fix message loss on target unavailibility during
shutdown
Triggering condition:
- action queue in disk mode (or DA)
- batch is being processed by failed action in retry mode
- rsyslog is shut down without resuming action
In these cases messages may be lost by not properly writing them
back to the disk queue.
closes https://github.com/rsyslog/rsyslog/issues/2760
---
action.c | 11 +++++++++--
runtime/queue.c | 3 +++
2 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/action.c b/action.c
index a9f886a43..39fcb1c19 100644
--- a/action.c
+++ b/action.c
@@ -1554,8 +1554,15 @@ processBatchMain(void *__restrict__ const pVoid,
/* we do not check error state below, because aborting would be
* more harmful than continuing.
*/
- processMsgMain(pAction, pWti, pBatch->pElem[i].pMsg, &ttNow);
- batchSetElemState(pBatch, i, BATCH_STATE_COMM);
+ rsRetVal localRet = processMsgMain(pAction, pWti, pBatch->pElem[i].pMsg, &ttNow);
+ DBGPRINTF("processBatchMain: i %d, processMsgMain iRet %d\n", i, localRet);
+ if( localRet == RS_RET_OK
+ || localRet == RS_RET_DEFER_COMMIT
+ || localRet == RS_RET_ACTION_FAILED
+ || localRet == RS_RET_PREVIOUS_COMMITTED ) {
+ batchSetElemState(pBatch, i, BATCH_STATE_COMM);
+ DBGPRINTF("processBatchMain: i %d, COMM state set\n", i);
+ }
}
}
diff --git a/runtime/queue.c b/runtime/queue.c
index 74cc217d1..fd163a49f 100644
--- a/runtime/queue.c
+++ b/runtime/queue.c
@@ -1666,6 +1666,7 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
for(i = 0 ; i < pBatch->nElem ; ++i) {
pMsg = pBatch->pElem[i].pMsg;
+ DBGPRINTF("DeleteProcessedBatch: etry %d state %d\n", i, pBatch->eltState[i]);
if( pBatch->eltState[i] == BATCH_STATE_RDY
|| pBatch->eltState[i] == BATCH_STATE_SUB) {
localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
@@ -1778,6 +1779,8 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
/* it is sufficient to persist only when the bulk of work is done */
qqueueChkPersist(pThis, nDequeued+nDiscarded+nDeleted);
+ DBGOPRINT((obj_t*) pThis, "dequeued %d consumable elements, szlog %d sz phys %d\n",
+ nDequeued, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
pWti->batch.nElem = nDequeued;
pWti->batch.nElemDeq = nDequeued + nDiscarded;
pWti->batch.deqID = getNextDeqID(pThis);