|
|
c17bfd |
From 59627f23bee26f3acec19d491d5884bcd1fb672e Mon Sep 17 00:00:00 2001
|
|
|
c17bfd |
From: Rainer Gerhards <rgerhards@adiscon.com>
|
|
|
c17bfd |
Date: Wed, 6 Jun 2018 17:30:21 +0200
|
|
|
c17bfd |
Subject: [PATCH] core: fix message loss on target unavailability during
|
|
|
c17bfd |
shutdown
|
|
|
c17bfd |
|
|
|
c17bfd |
Triggering condition:
|
|
|
c17bfd |
- action queue in disk mode (or DA)
|
|
|
c17bfd |
- batch is being processed by failed action in retry mode
|
|
|
c17bfd |
- rsyslog is shut down without resuming action
|
|
|
c17bfd |
|
|
|
c17bfd |
In these cases messages may be lost by not properly writing them
|
|
|
c17bfd |
back to the disk queue.
|
|
|
c17bfd |
|
|
|
c17bfd |
closes https://github.com/rsyslog/rsyslog/issues/2760
|
|
|
c17bfd |
---
|
|
|
c17bfd |
action.c | 11 +++++++++--
|
|
|
c17bfd |
runtime/queue.c | 3 +++
|
|
|
c17bfd |
2 files changed, 12 insertions(+), 2 deletions(-)
|
|
|
c17bfd |
|
|
|
c17bfd |
diff --git a/action.c b/action.c
|
|
|
c17bfd |
index a9f886a43..39fcb1c19 100644
|
|
|
c17bfd |
--- a/action.c
|
|
|
c17bfd |
+++ b/action.c
|
|
|
c17bfd |
@@ -1554,8 +1554,15 @@ processBatchMain(void *__restrict__ const pVoid,
|
|
|
c17bfd |
/* we do not check error state below, because aborting would be
|
|
|
c17bfd |
* more harmful than continuing.
|
|
|
c17bfd |
*/
|
|
|
c17bfd |
- processMsgMain(pAction, pWti, pBatch->pElem[i].pMsg, &ttNow);
|
|
|
c17bfd |
- batchSetElemState(pBatch, i, BATCH_STATE_COMM);
|
|
|
c17bfd |
+ rsRetVal localRet = processMsgMain(pAction, pWti, pBatch->pElem[i].pMsg, &ttNow);
|
|
|
c17bfd |
+ DBGPRINTF("processBatchMain: i %d, processMsgMain iRet %d\n", i, localRet);
|
|
|
c17bfd |
+ if( localRet == RS_RET_OK
|
|
|
c17bfd |
+ || localRet == RS_RET_DEFER_COMMIT
|
|
|
c17bfd |
+ || localRet == RS_RET_ACTION_FAILED
|
|
|
c17bfd |
+ || localRet == RS_RET_PREVIOUS_COMMITTED ) {
|
|
|
c17bfd |
+ batchSetElemState(pBatch, i, BATCH_STATE_COMM);
|
|
|
c17bfd |
+ DBGPRINTF("processBatchMain: i %d, COMM state set\n", i);
|
|
|
c17bfd |
+ }
|
|
|
c17bfd |
}
|
|
|
c17bfd |
}
|
|
|
c17bfd |
|
|
|
c17bfd |
diff --git a/runtime/queue.c b/runtime/queue.c
|
|
|
c17bfd |
index 74cc217d1..fd163a49f 100644
|
|
|
c17bfd |
--- a/runtime/queue.c
|
|
|
c17bfd |
+++ b/runtime/queue.c
|
|
|
c17bfd |
@@ -1666,6 +1666,7 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch)
|
|
|
c17bfd |
|
|
|
c17bfd |
for(i = 0 ; i < pBatch->nElem ; ++i) {
|
|
|
c17bfd |
pMsg = pBatch->pElem[i].pMsg;
|
|
|
c17bfd |
+ DBGPRINTF("DeleteProcessedBatch: etry %d state %d\n", i, pBatch->eltState[i]);
|
|
|
c17bfd |
if( pBatch->eltState[i] == BATCH_STATE_RDY
|
|
|
c17bfd |
|| pBatch->eltState[i] == BATCH_STATE_SUB) {
|
|
|
c17bfd |
localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg));
|
|
|
c17bfd |
@@ -1778,6 +1779,8 @@ DequeueConsumableElements(qqueue_t *pThis, wti_t *pWti, int *piRemainingQueueSiz
|
|
|
c17bfd |
/* it is sufficient to persist only when the bulk of work is done */
|
|
|
c17bfd |
qqueueChkPersist(pThis, nDequeued+nDiscarded+nDeleted);
|
|
|
c17bfd |
|
|
|
c17bfd |
+ DBGOPRINT((obj_t*) pThis, "dequeued %d consumable elements, szlog %d sz phys %d\n",
|
|
|
c17bfd |
+ nDequeued, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis));
|
|
|
c17bfd |
pWti->batch.nElem = nDequeued;
|
|
|
c17bfd |
pWti->batch.nElemDeq = nDequeued + nDiscarded;
|
|
|
c17bfd |
pWti->batch.deqID = getNextDeqID(pThis);
|