From 59627f23bee26f3acec19d491d5884bcd1fb672e Mon Sep 17 00:00:00 2001 From: Rainer Gerhards Date: Wed, 6 Jun 2018 17:30:21 +0200 Subject: [PATCH] core: fix message loss on target unavailability during shutdown Triggering condition: - action queue in disk mode (or DA) - batch is being processed by failed action in retry mode - rsyslog is shut down without resuming action In these cases messages may be lost by not properly writing them back to the disk queue. closes https://github.com/rsyslog/rsyslog/issues/2760 --- action.c | 11 +++++++++-- runtime/queue.c | 3 +++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/action.c b/action.c index a9f886a43..39fcb1c19 100644 --- a/action.c +++ b/action.c @@ -1554,8 +1554,15 @@ processBatchMain(void *__restrict__ const pVoid, /* we do not check error state below, because aborting would be * more harmful than continuing. */ - processMsgMain(pAction, pWti, pBatch->pElem[i].pMsg, &ttNow); - batchSetElemState(pBatch, i, BATCH_STATE_COMM); + rsRetVal localRet = processMsgMain(pAction, pWti, pBatch->pElem[i].pMsg, &ttNow); + DBGPRINTF("processBatchMain: i %d, processMsgMain iRet %d\n", i, localRet); + if( localRet == RS_RET_OK + || localRet == RS_RET_DEFER_COMMIT + || localRet == RS_RET_ACTION_FAILED + || localRet == RS_RET_PREVIOUS_COMMITTED ) { + batchSetElemState(pBatch, i, BATCH_STATE_COMM); + DBGPRINTF("processBatchMain: i %d, COMM state set\n", i); + } } } diff --git a/runtime/queue.c b/runtime/queue.c index 74cc217d1..fd163a49f 100644 --- a/runtime/queue.c +++ b/runtime/queue.c @@ -1666,6 +1666,7 @@ DeleteProcessedBatch(qqueue_t *pThis, batch_t *pBatch) for(i = 0 ; i < pBatch->nElem ; ++i) { pMsg = pBatch->pElem[i].pMsg; + DBGPRINTF("DeleteProcessedBatch: etry %d state %d\n", i, pBatch->eltState[i]); if( pBatch->eltState[i] == BATCH_STATE_RDY || pBatch->eltState[i] == BATCH_STATE_SUB) { localRet = doEnqSingleObj(pThis, eFLOWCTL_NO_DELAY, MsgAddRef(pMsg)); @@ -1778,6 +1779,8 @@ DequeueConsumableElements(qqueue_t 
*pThis, wti_t *pWti, int *piRemainingQueueSiz /* it is sufficient to persist only when the bulk of work is done */ qqueueChkPersist(pThis, nDequeued+nDiscarded+nDeleted); + DBGOPRINT((obj_t*) pThis, "dequeued %d consumable elements, szlog %d sz phys %d\n", + nDequeued, getLogicalQueueSize(pThis), getPhysicalQueueSize(pThis)); pWti->batch.nElem = nDequeued; pWti->batch.nElemDeq = nDequeued + nDiscarded; pWti->batch.deqID = getNextDeqID(pThis);