yeahuh / rpms / qemu-kvm

Forked from rpms/qemu-kvm 2 years ago
Clone
ae23c9
From 7ccb50f6be019258bfe5bb501142e39c5a4f52e4 Mon Sep 17 00:00:00 2001
ae23c9
From: Kevin Wolf <kwolf@redhat.com>
ae23c9
Date: Tue, 26 Jun 2018 09:48:22 +0200
ae23c9
Subject: [PATCH 114/268] job: Move transactions to Job
ae23c9
ae23c9
RH-Author: Kevin Wolf <kwolf@redhat.com>
ae23c9
Message-id: <20180626094856.6924-40-kwolf@redhat.com>
ae23c9
Patchwork-id: 81097
ae23c9
O-Subject: [RHV-7.6 qemu-kvm-rhev PATCH v2 39/73] job: Move transactions to Job
ae23c9
Bugzilla: 1513543
ae23c9
RH-Acked-by: Jeffrey Cody <jcody@redhat.com>
ae23c9
RH-Acked-by: Max Reitz <mreitz@redhat.com>
ae23c9
RH-Acked-by: Fam Zheng <famz@redhat.com>
ae23c9
ae23c9
This moves the logic that implements job transactions from BlockJob to
ae23c9
Job.
ae23c9
ae23c9
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
ae23c9
Reviewed-by: Max Reitz <mreitz@redhat.com>
ae23c9
(cherry picked from commit 7eaa8fb57da96301f4a8ce176ad503f80efc7cc0)
ae23c9
Signed-off-by: Kevin Wolf <kwolf@redhat.com>
ae23c9
Signed-off-by: Miroslav Rezanina <mrezanin@redhat.com>
ae23c9
---
ae23c9
 blockdev.c                   |   6 +-
ae23c9
 blockjob.c                   | 238 +------------------------------------------
ae23c9
 include/block/blockjob.h     |  54 ----------
ae23c9
 include/block/blockjob_int.h |  10 --
ae23c9
 include/qemu/job.h           |  71 +++++++++++--
ae23c9
 job.c                        | 234 ++++++++++++++++++++++++++++++++++++++++--
ae23c9
 tests/test-blockjob-txn.c    |  12 +--
ae23c9
 tests/test-blockjob.c        |   2 +-
ae23c9
 8 files changed, 303 insertions(+), 324 deletions(-)
ae23c9
ae23c9
diff --git a/blockdev.c b/blockdev.c
ae23c9
index 6efdb30..9aa2e79 100644
ae23c9
--- a/blockdev.c
ae23c9
+++ b/blockdev.c
ae23c9
@@ -2256,7 +2256,7 @@ void qmp_transaction(TransactionActionList *dev_list,
ae23c9
      */
ae23c9
     props = get_transaction_properties(props);
ae23c9
     if (props->completion_mode != ACTION_COMPLETION_MODE_INDIVIDUAL) {
ae23c9
-        block_job_txn = block_job_txn_new();
ae23c9
+        block_job_txn = job_txn_new();
ae23c9
     }
ae23c9
 
ae23c9
     /* drain all i/o before any operations */
ae23c9
@@ -2315,7 +2315,7 @@ exit:
ae23c9
     if (!has_props) {
ae23c9
         qapi_free_TransactionProperties(props);
ae23c9
     }
ae23c9
-    block_job_txn_unref(block_job_txn);
ae23c9
+    job_txn_unref(block_job_txn);
ae23c9
 }
ae23c9
 
ae23c9
 void qmp_eject(bool has_device, const char *device,
ae23c9
@@ -3909,7 +3909,7 @@ void qmp_block_job_finalize(const char *id, Error **errp)
ae23c9
     }
ae23c9
 
ae23c9
     trace_qmp_block_job_finalize(job);
ae23c9
-    block_job_finalize(job, errp);
ae23c9
+    job_finalize(&job->job, errp);
ae23c9
     aio_context_release(aio_context);
ae23c9
 }
ae23c9
 
ae23c9
diff --git a/blockjob.c b/blockjob.c
ae23c9
index bd35c4f..14b21c8 100644
ae23c9
--- a/blockjob.c
ae23c9
+++ b/blockjob.c
ae23c9
@@ -36,19 +36,6 @@
ae23c9
 #include "qemu/coroutine.h"
ae23c9
 #include "qemu/timer.h"
ae23c9
 
ae23c9
-/* Transactional group of block jobs */
ae23c9
-struct JobTxn {
ae23c9
-
ae23c9
-    /* Is this txn being cancelled? */
ae23c9
-    bool aborting;
ae23c9
-
ae23c9
-    /* List of jobs */
ae23c9
-    QLIST_HEAD(, Job) jobs;
ae23c9
-
ae23c9
-    /* Reference count */
ae23c9
-    int refcnt;
ae23c9
-};
ae23c9
-
ae23c9
 /*
ae23c9
  * The block job API is composed of two categories of functions.
ae23c9
  *
ae23c9
@@ -94,48 +81,6 @@ BlockJob *block_job_get(const char *id)
ae23c9
     }
ae23c9
 }
ae23c9
 
ae23c9
-JobTxn *block_job_txn_new(void)
ae23c9
-{
ae23c9
-    JobTxn *txn = g_new0(JobTxn, 1);
ae23c9
-    QLIST_INIT(&txn->jobs);
ae23c9
-    txn->refcnt = 1;
ae23c9
-    return txn;
ae23c9
-}
ae23c9
-
ae23c9
-static void block_job_txn_ref(JobTxn *txn)
ae23c9
-{
ae23c9
-    txn->refcnt++;
ae23c9
-}
ae23c9
-
ae23c9
-void block_job_txn_unref(JobTxn *txn)
ae23c9
-{
ae23c9
-    if (txn && --txn->refcnt == 0) {
ae23c9
-        g_free(txn);
ae23c9
-    }
ae23c9
-}
ae23c9
-
ae23c9
-void block_job_txn_add_job(JobTxn *txn, BlockJob *job)
ae23c9
-{
ae23c9
-    if (!txn) {
ae23c9
-        return;
ae23c9
-    }
ae23c9
-
ae23c9
-    assert(!job->txn);
ae23c9
-    job->txn = txn;
ae23c9
-
ae23c9
-    QLIST_INSERT_HEAD(&txn->jobs, &job->job, txn_list);
ae23c9
-    block_job_txn_ref(txn);
ae23c9
-}
ae23c9
-
ae23c9
-void block_job_txn_del_job(BlockJob *job)
ae23c9
-{
ae23c9
-    if (job->txn) {
ae23c9
-        QLIST_REMOVE(&job->job, txn_list);
ae23c9
-        block_job_txn_unref(job->txn);
ae23c9
-        job->txn = NULL;
ae23c9
-    }
ae23c9
-}
ae23c9
-
ae23c9
 static void block_job_attached_aio_context(AioContext *new_context,
ae23c9
                                            void *opaque);
ae23c9
 static void block_job_detach_aio_context(void *opaque);
ae23c9
@@ -145,8 +90,6 @@ void block_job_free(Job *job)
ae23c9
     BlockJob *bjob = container_of(job, BlockJob, job);
ae23c9
     BlockDriverState *bs = blk_bs(bjob->blk);
ae23c9
 
ae23c9
-    assert(!bjob->txn);
ae23c9
-
ae23c9
     bs->job = NULL;
ae23c9
     block_job_remove_all_bdrv(bjob);
ae23c9
     blk_remove_aio_context_notifier(bjob->blk,
ae23c9
@@ -261,158 +204,6 @@ const BlockJobDriver *block_job_driver(BlockJob *job)
ae23c9
     return job->driver;
ae23c9
 }
ae23c9
 
ae23c9
-static int block_job_prepare(BlockJob *job)
ae23c9
-{
ae23c9
-    if (job->job.ret == 0 && job->driver->prepare) {
ae23c9
-        job->job.ret = job->driver->prepare(job);
ae23c9
-    }
ae23c9
-    return job->job.ret;
ae23c9
-}
ae23c9
-
ae23c9
-static void job_cancel_async(Job *job, bool force)
ae23c9
-{
ae23c9
-    if (job->user_paused) {
ae23c9
-        /* Do not call job_enter here, the caller will handle it.  */
ae23c9
-        job->user_paused = false;
ae23c9
-        if (job->driver->user_resume) {
ae23c9
-            job->driver->user_resume(job);
ae23c9
-        }
ae23c9
-        assert(job->pause_count > 0);
ae23c9
-        job->pause_count--;
ae23c9
-    }
ae23c9
-    job->cancelled = true;
ae23c9
-    /* To prevent 'force == false' overriding a previous 'force == true' */
ae23c9
-    job->force_cancel |= force;
ae23c9
-}
ae23c9
-
ae23c9
-static int block_job_txn_apply(JobTxn *txn, int fn(BlockJob *), bool lock)
ae23c9
-{
ae23c9
-    AioContext *ctx;
ae23c9
-    Job *job, *next;
ae23c9
-    BlockJob *bjob;
ae23c9
-    int rc = 0;
ae23c9
-
ae23c9
-    QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
ae23c9
-        assert(is_block_job(job));
ae23c9
-        bjob = container_of(job, BlockJob, job);
ae23c9
-
ae23c9
-        if (lock) {
ae23c9
-            ctx = job->aio_context;
ae23c9
-            aio_context_acquire(ctx);
ae23c9
-        }
ae23c9
-        rc = fn(bjob);
ae23c9
-        if (lock) {
ae23c9
-            aio_context_release(ctx);
ae23c9
-        }
ae23c9
-        if (rc) {
ae23c9
-            break;
ae23c9
-        }
ae23c9
-    }
ae23c9
-    return rc;
ae23c9
-}
ae23c9
-
ae23c9
-static void block_job_completed_txn_abort(BlockJob *job)
ae23c9
-{
ae23c9
-    AioContext *ctx;
ae23c9
-    JobTxn *txn = job->txn;
ae23c9
-    Job *other_job;
ae23c9
-
ae23c9
-    if (txn->aborting) {
ae23c9
-        /*
ae23c9
-         * We are cancelled by another job, which will handle everything.
ae23c9
-         */
ae23c9
-        return;
ae23c9
-    }
ae23c9
-    txn->aborting = true;
ae23c9
-    block_job_txn_ref(txn);
ae23c9
-
ae23c9
-    /* We are the first failed job. Cancel other jobs. */
ae23c9
-    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
ae23c9
-        ctx = other_job->aio_context;
ae23c9
-        aio_context_acquire(ctx);
ae23c9
-    }
ae23c9
-
ae23c9
-    /* Other jobs are effectively cancelled by us, set the status for
ae23c9
-     * them; this job, however, may or may not be cancelled, depending
ae23c9
-     * on the caller, so leave it. */
ae23c9
-    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
ae23c9
-        if (other_job != &job->job) {
ae23c9
-            job_cancel_async(other_job, false);
ae23c9
-        }
ae23c9
-    }
ae23c9
-    while (!QLIST_EMPTY(&txn->jobs)) {
ae23c9
-        other_job = QLIST_FIRST(&txn->jobs);
ae23c9
-        ctx = other_job->aio_context;
ae23c9
-        if (!job_is_completed(other_job)) {
ae23c9
-            assert(job_is_cancelled(other_job));
ae23c9
-            job_finish_sync(other_job, NULL, NULL);
ae23c9
-        }
ae23c9
-        job_finalize_single(other_job);
ae23c9
-        aio_context_release(ctx);
ae23c9
-    }
ae23c9
-
ae23c9
-    block_job_txn_unref(txn);
ae23c9
-}
ae23c9
-
ae23c9
-static int block_job_needs_finalize(BlockJob *job)
ae23c9
-{
ae23c9
-    return !job->job.auto_finalize;
ae23c9
-}
ae23c9
-
ae23c9
-static int block_job_finalize_single(BlockJob *job)
ae23c9
-{
ae23c9
-    return job_finalize_single(&job->job);
ae23c9
-}
ae23c9
-
ae23c9
-static void block_job_do_finalize(BlockJob *job)
ae23c9
-{
ae23c9
-    int rc;
ae23c9
-    assert(job && job->txn);
ae23c9
-
ae23c9
-    /* prepare the transaction to complete */
ae23c9
-    rc = block_job_txn_apply(job->txn, block_job_prepare, true);
ae23c9
-    if (rc) {
ae23c9
-        block_job_completed_txn_abort(job);
ae23c9
-    } else {
ae23c9
-        block_job_txn_apply(job->txn, block_job_finalize_single, true);
ae23c9
-    }
ae23c9
-}
ae23c9
-
ae23c9
-static int block_job_transition_to_pending(BlockJob *job)
ae23c9
-{
ae23c9
-    job_state_transition(&job->job, JOB_STATUS_PENDING);
ae23c9
-    if (!job->job.auto_finalize) {
ae23c9
-        job_event_pending(&job->job);
ae23c9
-    }
ae23c9
-    return 0;
ae23c9
-}
ae23c9
-
ae23c9
-static void block_job_completed_txn_success(BlockJob *job)
ae23c9
-{
ae23c9
-    JobTxn *txn = job->txn;
ae23c9
-    Job *other_job;
ae23c9
-
ae23c9
-    job_state_transition(&job->job, JOB_STATUS_WAITING);
ae23c9
-
ae23c9
-    /*
ae23c9
-     * Successful completion, see if there are other running jobs in this
ae23c9
-     * txn.
ae23c9
-     */
ae23c9
-    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
ae23c9
-        if (!job_is_completed(other_job)) {
ae23c9
-            return;
ae23c9
-        }
ae23c9
-        assert(other_job->ret == 0);
ae23c9
-    }
ae23c9
-
ae23c9
-    block_job_txn_apply(txn, block_job_transition_to_pending, false);
ae23c9
-
ae23c9
-    /* If no jobs need manual finalization, automatically do so */
ae23c9
-    if (block_job_txn_apply(txn, block_job_needs_finalize, false) == 0) {
ae23c9
-        block_job_do_finalize(job);
ae23c9
-    }
ae23c9
-}
ae23c9
-
ae23c9
 /* Assumes the job_mutex is held */
ae23c9
 static bool job_timer_pending(Job *job)
ae23c9
 {
ae23c9
@@ -451,15 +242,6 @@ int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n)
ae23c9
     return ratelimit_calculate_delay(&job->limit, n);
ae23c9
 }
ae23c9
 
ae23c9
-void block_job_finalize(BlockJob *job, Error **errp)
ae23c9
-{
ae23c9
-    assert(job && job->job.id);
ae23c9
-    if (job_apply_verb(&job->job, JOB_VERB_FINALIZE, errp)) {
ae23c9
-        return;
ae23c9
-    }
ae23c9
-    block_job_do_finalize(job);
ae23c9
-}
ae23c9
-
ae23c9
 void block_job_dismiss(BlockJob **jobptr, Error **errp)
ae23c9
 {
ae23c9
     BlockJob *job = *jobptr;
ae23c9
@@ -483,7 +265,7 @@ void block_job_cancel(BlockJob *job, bool force)
ae23c9
     if (!job_started(&job->job)) {
ae23c9
         block_job_completed(job, -ECANCELED);
ae23c9
     } else if (job->job.deferred_to_main_loop) {
ae23c9
-        block_job_completed_txn_abort(job);
ae23c9
+        job_completed_txn_abort(&job->job);
ae23c9
     } else {
ae23c9
         block_job_enter(job);
ae23c9
     }
ae23c9
@@ -656,7 +438,7 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
ae23c9
         return NULL;
ae23c9
     }
ae23c9
 
ae23c9
-    job = job_create(job_id, &driver->job_driver, blk_get_aio_context(blk),
ae23c9
+    job = job_create(job_id, &driver->job_driver, txn, blk_get_aio_context(blk),
ae23c9
                      flags, cb, opaque, errp);
ae23c9
     if (job == NULL) {
ae23c9
         blk_unref(blk);
ae23c9
@@ -703,30 +485,20 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
ae23c9
         }
ae23c9
     }
ae23c9
 
ae23c9
-    /* Single jobs are modeled as single-job transactions for sake of
ae23c9
-     * consolidating the job management logic */
ae23c9
-    if (!txn) {
ae23c9
-        txn = block_job_txn_new();
ae23c9
-        block_job_txn_add_job(txn, job);
ae23c9
-        block_job_txn_unref(txn);
ae23c9
-    } else {
ae23c9
-        block_job_txn_add_job(txn, job);
ae23c9
-    }
ae23c9
-
ae23c9
     return job;
ae23c9
 }
ae23c9
 
ae23c9
 void block_job_completed(BlockJob *job, int ret)
ae23c9
 {
ae23c9
-    assert(job && job->txn && !job_is_completed(&job->job));
ae23c9
+    assert(job && job->job.txn && !job_is_completed(&job->job));
ae23c9
     assert(blk_bs(job->blk)->job == job);
ae23c9
     job->job.ret = ret;
ae23c9
     job_update_rc(&job->job);
ae23c9
     trace_block_job_completed(job, ret, job->job.ret);
ae23c9
     if (job->job.ret) {
ae23c9
-        block_job_completed_txn_abort(job);
ae23c9
+        job_completed_txn_abort(&job->job);
ae23c9
     } else {
ae23c9
-        block_job_completed_txn_success(job);
ae23c9
+        job_completed_txn_success(&job->job);
ae23c9
     }
ae23c9
 }
ae23c9
 
ae23c9
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
ae23c9
index 44df025..09e6bb4 100644
ae23c9
--- a/include/block/blockjob.h
ae23c9
+++ b/include/block/blockjob.h
ae23c9
@@ -33,7 +33,6 @@
ae23c9
 #define BLOCK_JOB_SLICE_TIME 100000000ULL /* ns */
ae23c9
 
ae23c9
 typedef struct BlockJobDriver BlockJobDriver;
ae23c9
-typedef struct JobTxn JobTxn;
ae23c9
 
ae23c9
 /**
ae23c9
  * BlockJob:
ae23c9
@@ -84,8 +83,6 @@ typedef struct BlockJob {
ae23c9
 
ae23c9
     /** BlockDriverStates that are involved in this block job */
ae23c9
     GSList *nodes;
ae23c9
-
ae23c9
-    JobTxn *txn;
ae23c9
 } BlockJob;
ae23c9
 
ae23c9
 /**
ae23c9
@@ -153,22 +150,6 @@ void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp);
ae23c9
 void block_job_cancel(BlockJob *job, bool force);
ae23c9
 
ae23c9
 /**
ae23c9
- * block_job_finalize:
ae23c9
- * @job: The job to fully commit and finish.
ae23c9
- * @errp: Error object.
ae23c9
- *
ae23c9
- * For jobs that have finished their work and are pending
ae23c9
- * awaiting explicit acknowledgement to commit their work,
ae23c9
- * This will commit that work.
ae23c9
- *
ae23c9
- * FIXME: Make the below statement universally true:
ae23c9
- * For jobs that support the manual workflow mode, all graph
ae23c9
- * changes that occur as a result will occur after this command
ae23c9
- * and before a successful reply.
ae23c9
- */
ae23c9
-void block_job_finalize(BlockJob *job, Error **errp);
ae23c9
-
ae23c9
-/**
ae23c9
  * block_job_dismiss:
ae23c9
  * @job: The job to be dismissed.
ae23c9
  * @errp: Error object.
ae23c9
@@ -260,41 +241,6 @@ int block_job_complete_sync(BlockJob *job, Error **errp);
ae23c9
 void block_job_iostatus_reset(BlockJob *job);
ae23c9
 
ae23c9
 /**
ae23c9
- * block_job_txn_new:
ae23c9
- *
ae23c9
- * Allocate and return a new block job transaction.  Jobs can be added to the
ae23c9
- * transaction using block_job_txn_add_job().
ae23c9
- *
ae23c9
- * The transaction is automatically freed when the last job completes or is
ae23c9
- * cancelled.
ae23c9
- *
ae23c9
- * All jobs in the transaction either complete successfully or fail/cancel as a
ae23c9
- * group.  Jobs wait for each other before completing.  Cancelling one job
ae23c9
- * cancels all jobs in the transaction.
ae23c9
- */
ae23c9
-JobTxn *block_job_txn_new(void);
ae23c9
-
ae23c9
-/**
ae23c9
- * block_job_txn_unref:
ae23c9
- *
ae23c9
- * Release a reference that was previously acquired with block_job_txn_add_job
ae23c9
- * or block_job_txn_new. If it's the last reference to the object, it will be
ae23c9
- * freed.
ae23c9
- */
ae23c9
-void block_job_txn_unref(JobTxn *txn);
ae23c9
-
ae23c9
-/**
ae23c9
- * block_job_txn_add_job:
ae23c9
- * @txn: The transaction (may be NULL)
ae23c9
- * @job: Job to add to the transaction
ae23c9
- *
ae23c9
- * Add @job to the transaction.  The @job must not already be in a transaction.
ae23c9
- * The caller must call either block_job_txn_unref() or block_job_completed()
ae23c9
- * to release the reference that is automatically grabbed here.
ae23c9
- */
ae23c9
-void block_job_txn_add_job(JobTxn *txn, BlockJob *job);
ae23c9
-
ae23c9
-/**
ae23c9
  * block_job_is_internal:
ae23c9
  * @job: The job to determine if it is user-visible or not.
ae23c9
  *
ae23c9
diff --git a/include/block/blockjob_int.h b/include/block/blockjob_int.h
ae23c9
index ce66a9b..29a2802 100644
ae23c9
--- a/include/block/blockjob_int.h
ae23c9
+++ b/include/block/blockjob_int.h
ae23c9
@@ -38,16 +38,6 @@ struct BlockJobDriver {
ae23c9
     /** Generic JobDriver callbacks and settings */
ae23c9
     JobDriver job_driver;
ae23c9
 
ae23c9
-    /**
ae23c9
-     * If the callback is not NULL, prepare will be invoked when all the jobs
ae23c9
-     * belonging to the same transaction complete; or upon this job's completion
ae23c9
-     * if it is not in a transaction.
ae23c9
-     *
ae23c9
-     * This callback will not be invoked if the job has already failed.
ae23c9
-     * If it fails, abort and then clean will be called.
ae23c9
-     */
ae23c9
-    int (*prepare)(BlockJob *job);
ae23c9
-
ae23c9
     /*
ae23c9
      * If the callback is not NULL, it will be invoked before the job is
ae23c9
      * resumed in a new AioContext.  This is the place to move any resources
ae23c9
diff --git a/include/qemu/job.h b/include/qemu/job.h
ae23c9
index d4aa7fa..39d74ab 100644
ae23c9
--- a/include/qemu/job.h
ae23c9
+++ b/include/qemu/job.h
ae23c9
@@ -32,6 +32,8 @@
ae23c9
 #include "block/aio.h"
ae23c9
 
ae23c9
 typedef struct JobDriver JobDriver;
ae23c9
+typedef struct JobTxn JobTxn;
ae23c9
+
ae23c9
 
ae23c9
 /**
ae23c9
  * Long-running operation.
ae23c9
@@ -133,6 +135,9 @@ typedef struct Job {
ae23c9
     /** Element of the list of jobs */
ae23c9
     QLIST_ENTRY(Job) job_list;
ae23c9
 
ae23c9
+    /** Transaction this job is part of */
ae23c9
+    JobTxn *txn;
ae23c9
+
ae23c9
     /** Element of the list of jobs in a job transaction */
ae23c9
     QLIST_ENTRY(Job) txn_list;
ae23c9
 } Job;
ae23c9
@@ -184,6 +189,16 @@ struct JobDriver {
ae23c9
     void (*drain)(Job *job);
ae23c9
 
ae23c9
     /**
ae23c9
+     * If the callback is not NULL, prepare will be invoked when all the jobs
ae23c9
+     * belonging to the same transaction complete; or upon this job's completion
ae23c9
+     * if it is not in a transaction.
ae23c9
+     *
ae23c9
+     * This callback will not be invoked if the job has already failed.
ae23c9
+     * If it fails, abort and then clean will be called.
ae23c9
+     */
ae23c9
+    int (*prepare)(Job *job);
ae23c9
+
ae23c9
+    /**
ae23c9
      * If the callback is not NULL, it will be invoked when all the jobs
ae23c9
      * belonging to the same transaction complete; or upon this job's
ae23c9
      * completion if it is not in a transaction. Skipped if NULL.
ae23c9
@@ -227,20 +242,52 @@ typedef enum JobCreateFlags {
ae23c9
     JOB_MANUAL_DISMISS = 0x04,
ae23c9
 } JobCreateFlags;
ae23c9
 
ae23c9
+/**
ae23c9
+ * Allocate and return a new job transaction. Jobs can be added to the
ae23c9
+ * transaction using job_txn_add_job().
ae23c9
+ *
ae23c9
+ * The transaction is automatically freed when the last job completes or is
ae23c9
+ * cancelled.
ae23c9
+ *
ae23c9
+ * All jobs in the transaction either complete successfully or fail/cancel as a
ae23c9
+ * group.  Jobs wait for each other before completing.  Cancelling one job
ae23c9
+ * cancels all jobs in the transaction.
ae23c9
+ */
ae23c9
+JobTxn *job_txn_new(void);
ae23c9
+
ae23c9
+/**
ae23c9
+ * Release a reference that was previously acquired with job_txn_add_job or
ae23c9
+ * job_txn_new. If it's the last reference to the object, it will be freed.
ae23c9
+ */
ae23c9
+void job_txn_unref(JobTxn *txn);
ae23c9
+
ae23c9
+/**
ae23c9
+ * @txn: The transaction (may be NULL)
ae23c9
+ * @job: Job to add to the transaction
ae23c9
+ *
ae23c9
+ * Add @job to the transaction.  The @job must not already be in a transaction.
ae23c9
+ * The caller must call either job_txn_unref() or block_job_completed() to
ae23c9
+ * release the reference that is automatically grabbed here.
ae23c9
+ *
ae23c9
+ * If @txn is NULL, the function does nothing.
ae23c9
+ */
ae23c9
+void job_txn_add_job(JobTxn *txn, Job *job);
ae23c9
 
ae23c9
 /**
ae23c9
  * Create a new long-running job and return it.
ae23c9
  *
ae23c9
  * @job_id: The id of the newly-created job, or %NULL for internal jobs
ae23c9
  * @driver: The class object for the newly-created job.
ae23c9
+ * @txn: The transaction this job belongs to, if any. %NULL otherwise.
ae23c9
  * @ctx: The AioContext to run the job coroutine in.
ae23c9
  * @flags: Creation flags for the job. See @JobCreateFlags.
ae23c9
  * @cb: Completion function for the job.
ae23c9
  * @opaque: Opaque pointer value passed to @cb.
ae23c9
  * @errp: Error object.
ae23c9
  */
ae23c9
-void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
ae23c9
-                 int flags, BlockCompletionFunc *cb, void *opaque, Error **errp);
ae23c9
+void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
ae23c9
+                 AioContext *ctx, int flags, BlockCompletionFunc *cb,
ae23c9
+                 void *opaque, Error **errp);
ae23c9
 
ae23c9
 /**
ae23c9
  * Add a reference to Job refcnt, it will be decreased with job_unref, and then
ae23c9
@@ -260,9 +307,6 @@ void job_event_cancelled(Job *job);
ae23c9
 /** To be called when a successfully completed job is finalised. */
ae23c9
 void job_event_completed(Job *job);
ae23c9
 
ae23c9
-/** To be called when the job transitions to PENDING */
ae23c9
-void job_event_pending(Job *job);
ae23c9
-
ae23c9
 /**
ae23c9
  * Conditionally enter the job coroutine if the job is ready to run, not
ae23c9
  * already busy and fn() returns true. fn() is called while under the job_lock
ae23c9
@@ -375,6 +419,16 @@ void job_early_fail(Job *job);
ae23c9
 /** Asynchronously complete the specified @job. */
ae23c9
 void job_complete(Job *job, Error **errp);;
ae23c9
 
ae23c9
+/**
ae23c9
+ * For a @job that has finished its work and is pending awaiting explicit
ae23c9
+ * acknowledgement to commit its work, this will commit that work.
ae23c9
+ *
ae23c9
+ * FIXME: Make the below statement universally true:
ae23c9
+ * For jobs that support the manual workflow mode, all graph changes that occur
ae23c9
+ * as a result will occur after this command and before a successful reply.
ae23c9
+ */
ae23c9
+void job_finalize(Job *job, Error **errp);
ae23c9
+
ae23c9
 typedef void JobDeferToMainLoopFn(Job *job, void *opaque);
ae23c9
 
ae23c9
 /**
ae23c9
@@ -407,10 +461,9 @@ void coroutine_fn job_do_yield(Job *job, uint64_t ns);
ae23c9
 bool job_should_pause(Job *job);
ae23c9
 bool job_started(Job *job);
ae23c9
 void job_do_dismiss(Job *job);
ae23c9
-int job_finalize_single(Job *job);
ae23c9
 void job_update_rc(Job *job);
ae23c9
-
ae23c9
-typedef struct BlockJob BlockJob;
ae23c9
-void block_job_txn_del_job(BlockJob *job);
ae23c9
+void job_cancel_async(Job *job, bool force);
ae23c9
+void job_completed_txn_abort(Job *job);
ae23c9
+void job_completed_txn_success(Job *job);
ae23c9
 
ae23c9
 #endif
ae23c9
diff --git a/job.c b/job.c
ae23c9
index aa74b4c..4f6fd73 100644
ae23c9
--- a/job.c
ae23c9
+++ b/job.c
ae23c9
@@ -60,6 +60,19 @@ bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
ae23c9
     [JOB_VERB_DISMISS]              = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
ae23c9
 };
ae23c9
 
ae23c9
+/* Transactional group of jobs */
ae23c9
+struct JobTxn {
ae23c9
+
ae23c9
+    /* Is this txn being cancelled? */
ae23c9
+    bool aborting;
ae23c9
+
ae23c9
+    /* List of jobs */
ae23c9
+    QLIST_HEAD(, Job) jobs;
ae23c9
+
ae23c9
+    /* Reference count */
ae23c9
+    int refcnt;
ae23c9
+};
ae23c9
+
ae23c9
 /* Right now, this mutex is only needed to synchronize accesses to job->busy
ae23c9
  * and job->sleep_timer, such as concurrent calls to job_do_yield and
ae23c9
  * job_enter. */
ae23c9
@@ -80,6 +93,71 @@ static void __attribute__((__constructor__)) job_init(void)
ae23c9
     qemu_mutex_init(&job_mutex);
ae23c9
 }
ae23c9
 
ae23c9
+JobTxn *job_txn_new(void)
ae23c9
+{
ae23c9
+    JobTxn *txn = g_new0(JobTxn, 1);
ae23c9
+    QLIST_INIT(&txn->jobs);
ae23c9
+    txn->refcnt = 1;
ae23c9
+    return txn;
ae23c9
+}
ae23c9
+
ae23c9
+static void job_txn_ref(JobTxn *txn)
ae23c9
+{
ae23c9
+    txn->refcnt++;
ae23c9
+}
ae23c9
+
ae23c9
+void job_txn_unref(JobTxn *txn)
ae23c9
+{
ae23c9
+    if (txn && --txn->refcnt == 0) {
ae23c9
+        g_free(txn);
ae23c9
+    }
ae23c9
+}
ae23c9
+
ae23c9
+void job_txn_add_job(JobTxn *txn, Job *job)
ae23c9
+{
ae23c9
+    if (!txn) {
ae23c9
+        return;
ae23c9
+    }
ae23c9
+
ae23c9
+    assert(!job->txn);
ae23c9
+    job->txn = txn;
ae23c9
+
ae23c9
+    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
ae23c9
+    job_txn_ref(txn);
ae23c9
+}
ae23c9
+
ae23c9
+static void job_txn_del_job(Job *job)
ae23c9
+{
ae23c9
+    if (job->txn) {
ae23c9
+        QLIST_REMOVE(job, txn_list);
ae23c9
+        job_txn_unref(job->txn);
ae23c9
+        job->txn = NULL;
ae23c9
+    }
ae23c9
+}
ae23c9
+
ae23c9
+static int job_txn_apply(JobTxn *txn, int fn(Job *), bool lock)
ae23c9
+{
ae23c9
+    AioContext *ctx;
ae23c9
+    Job *job, *next;
ae23c9
+    int rc = 0;
ae23c9
+
ae23c9
+    QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
ae23c9
+        if (lock) {
ae23c9
+            ctx = job->aio_context;
ae23c9
+            aio_context_acquire(ctx);
ae23c9
+        }
ae23c9
+        rc = fn(job);
ae23c9
+        if (lock) {
ae23c9
+            aio_context_release(ctx);
ae23c9
+        }
ae23c9
+        if (rc) {
ae23c9
+            break;
ae23c9
+        }
ae23c9
+    }
ae23c9
+    return rc;
ae23c9
+}
ae23c9
+
ae23c9
+
ae23c9
 /* TODO Make static once the whole state machine is in job.c */
ae23c9
 void job_state_transition(Job *job, JobStatus s1)
ae23c9
 {
ae23c9
@@ -181,8 +259,9 @@ static void job_sleep_timer_cb(void *opaque)
ae23c9
     job_enter(job);
ae23c9
 }
ae23c9
 
ae23c9
-void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
ae23c9
-                 int flags, BlockCompletionFunc *cb, void *opaque, Error **errp)
ae23c9
+void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
ae23c9
+                 AioContext *ctx, int flags, BlockCompletionFunc *cb,
ae23c9
+                 void *opaque, Error **errp)
ae23c9
 {
ae23c9
     Job *job;
ae23c9
 
ae23c9
@@ -228,6 +307,16 @@ void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
ae23c9
 
ae23c9
     QLIST_INSERT_HEAD(&jobs, job, job_list);
ae23c9
 
ae23c9
+    /* Single jobs are modeled as single-job transactions for sake of
ae23c9
+     * consolidating the job management logic */
ae23c9
+    if (!txn) {
ae23c9
+        txn = job_txn_new();
ae23c9
+        job_txn_add_job(txn, job);
ae23c9
+        job_txn_unref(txn);
ae23c9
+    } else {
ae23c9
+        job_txn_add_job(txn, job);
ae23c9
+    }
ae23c9
+
ae23c9
     return job;
ae23c9
 }
ae23c9
 
ae23c9
@@ -241,6 +330,7 @@ void job_unref(Job *job)
ae23c9
     if (--job->refcnt == 0) {
ae23c9
         assert(job->status == JOB_STATUS_NULL);
ae23c9
         assert(!timer_pending(&job->sleep_timer));
ae23c9
+        assert(!job->txn);
ae23c9
 
ae23c9
         if (job->driver->free) {
ae23c9
             job->driver->free(job);
ae23c9
@@ -263,7 +353,7 @@ void job_event_completed(Job *job)
ae23c9
     notifier_list_notify(&job->on_finalize_completed, job);
ae23c9
 }
ae23c9
 
ae23c9
-void job_event_pending(Job *job)
ae23c9
+static void job_event_pending(Job *job)
ae23c9
 {
ae23c9
     notifier_list_notify(&job->on_pending, job);
ae23c9
 }
ae23c9
@@ -469,8 +559,7 @@ void job_do_dismiss(Job *job)
ae23c9
     job->paused = false;
ae23c9
     job->deferred_to_main_loop = true;
ae23c9
 
ae23c9
-    /* TODO Don't assume it's a BlockJob */
ae23c9
-    block_job_txn_del_job((BlockJob*) job);
ae23c9
+    job_txn_del_job(job);
ae23c9
 
ae23c9
     job_state_transition(job, JOB_STATUS_NULL);
ae23c9
     job_unref(job);
ae23c9
@@ -523,7 +612,7 @@ static void job_clean(Job *job)
ae23c9
     }
ae23c9
 }
ae23c9
 
ae23c9
-int job_finalize_single(Job *job)
ae23c9
+static int job_finalize_single(Job *job)
ae23c9
 {
ae23c9
     assert(job_is_completed(job));
ae23c9
 
ae23c9
@@ -550,12 +639,141 @@ int job_finalize_single(Job *job)
ae23c9
         }
ae23c9
     }
ae23c9
 
ae23c9
-    /* TODO Don't assume it's a BlockJob */
ae23c9
-    block_job_txn_del_job((BlockJob*) job);
ae23c9
+    job_txn_del_job(job);
ae23c9
     job_conclude(job);
ae23c9
     return 0;
ae23c9
 }
ae23c9
 
ae23c9
+void job_cancel_async(Job *job, bool force)
ae23c9
+{
ae23c9
+    if (job->user_paused) {
ae23c9
+        /* Do not call job_enter here, the caller will handle it.  */
ae23c9
+        job->user_paused = false;
ae23c9
+        if (job->driver->user_resume) {
ae23c9
+            job->driver->user_resume(job);
ae23c9
+        }
ae23c9
+        assert(job->pause_count > 0);
ae23c9
+        job->pause_count--;
ae23c9
+    }
ae23c9
+    job->cancelled = true;
ae23c9
+    /* To prevent 'force == false' overriding a previous 'force == true' */
ae23c9
+    job->force_cancel |= force;
ae23c9
+}
ae23c9
+
ae23c9
+void job_completed_txn_abort(Job *job)
ae23c9
+{
ae23c9
+    AioContext *ctx;
ae23c9
+    JobTxn *txn = job->txn;
ae23c9
+    Job *other_job;
ae23c9
+
ae23c9
+    if (txn->aborting) {
ae23c9
+        /*
ae23c9
+         * We are cancelled by another job, which will handle everything.
ae23c9
+         */
ae23c9
+        return;
ae23c9
+    }
ae23c9
+    txn->aborting = true;
ae23c9
+    job_txn_ref(txn);
ae23c9
+
ae23c9
+    /* We are the first failed job. Cancel other jobs. */
ae23c9
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
ae23c9
+        ctx = other_job->aio_context;
ae23c9
+        aio_context_acquire(ctx);
ae23c9
+    }
ae23c9
+
ae23c9
+    /* Other jobs are effectively cancelled by us, set the status for
ae23c9
+     * them; this job, however, may or may not be cancelled, depending
ae23c9
+     * on the caller, so leave it. */
ae23c9
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
ae23c9
+        if (other_job != job) {
ae23c9
+            job_cancel_async(other_job, false);
ae23c9
+        }
ae23c9
+    }
ae23c9
+    while (!QLIST_EMPTY(&txn->jobs)) {
ae23c9
+        other_job = QLIST_FIRST(&txn->jobs);
ae23c9
+        ctx = other_job->aio_context;
ae23c9
+        if (!job_is_completed(other_job)) {
ae23c9
+            assert(job_is_cancelled(other_job));
ae23c9
+            job_finish_sync(other_job, NULL, NULL);
ae23c9
+        }
ae23c9
+        job_finalize_single(other_job);
ae23c9
+        aio_context_release(ctx);
ae23c9
+    }
ae23c9
+
ae23c9
+    job_txn_unref(txn);
ae23c9
+}
ae23c9
+
ae23c9
+static int job_prepare(Job *job)
ae23c9
+{
ae23c9
+    if (job->ret == 0 && job->driver->prepare) {
ae23c9
+        job->ret = job->driver->prepare(job);
ae23c9
+    }
ae23c9
+    return job->ret;
ae23c9
+}
ae23c9
+
ae23c9
+static int job_needs_finalize(Job *job)
ae23c9
+{
ae23c9
+    return !job->auto_finalize;
ae23c9
+}
ae23c9
+
ae23c9
+static void job_do_finalize(Job *job)
ae23c9
+{
ae23c9
+    int rc;
ae23c9
+    assert(job && job->txn);
ae23c9
+
ae23c9
+    /* prepare the transaction to complete */
ae23c9
+    rc = job_txn_apply(job->txn, job_prepare, true);
ae23c9
+    if (rc) {
ae23c9
+        job_completed_txn_abort(job);
ae23c9
+    } else {
ae23c9
+        job_txn_apply(job->txn, job_finalize_single, true);
ae23c9
+    }
ae23c9
+}
ae23c9
+
ae23c9
+void job_finalize(Job *job, Error **errp)
ae23c9
+{
ae23c9
+    assert(job && job->id);
ae23c9
+    if (job_apply_verb(job, JOB_VERB_FINALIZE, errp)) {
ae23c9
+        return;
ae23c9
+    }
ae23c9
+    job_do_finalize(job);
ae23c9
+}
ae23c9
+
ae23c9
+static int job_transition_to_pending(Job *job)
ae23c9
+{
ae23c9
+    job_state_transition(job, JOB_STATUS_PENDING);
ae23c9
+    if (!job->auto_finalize) {
ae23c9
+        job_event_pending(job);
ae23c9
+    }
ae23c9
+    return 0;
ae23c9
+}
ae23c9
+
ae23c9
+void job_completed_txn_success(Job *job)
ae23c9
+{
ae23c9
+    JobTxn *txn = job->txn;
ae23c9
+    Job *other_job;
ae23c9
+
ae23c9
+    job_state_transition(job, JOB_STATUS_WAITING);
ae23c9
+
ae23c9
+    /*
ae23c9
+     * Successful completion, see if there are other running jobs in this
ae23c9
+     * txn.
ae23c9
+     */
ae23c9
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
ae23c9
+        if (!job_is_completed(other_job)) {
ae23c9
+            return;
ae23c9
+        }
ae23c9
+        assert(other_job->ret == 0);
ae23c9
+    }
ae23c9
+
ae23c9
+    job_txn_apply(txn, job_transition_to_pending, false);
ae23c9
+
ae23c9
+    /* If no jobs need manual finalization, automatically do so */
ae23c9
+    if (job_txn_apply(txn, job_needs_finalize, false) == 0) {
ae23c9
+        job_do_finalize(job);
ae23c9
+    }
ae23c9
+}
ae23c9
+
ae23c9
 void job_complete(Job *job, Error **errp)
ae23c9
 {
ae23c9
     /* Should not be reachable via external interface for internal jobs */
ae23c9
diff --git a/tests/test-blockjob-txn.c b/tests/test-blockjob-txn.c
ae23c9
index ec5d592..6ee31d5 100644
ae23c9
--- a/tests/test-blockjob-txn.c
ae23c9
+++ b/tests/test-blockjob-txn.c
ae23c9
@@ -125,7 +125,7 @@ static void test_single_job(int expected)
ae23c9
     JobTxn *txn;
ae23c9
     int result = -EINPROGRESS;
ae23c9
 
ae23c9
-    txn = block_job_txn_new();
ae23c9
+    txn = job_txn_new();
ae23c9
     job = test_block_job_start(1, true, expected, &result, txn);
ae23c9
     job_start(&job->job);
ae23c9
 
ae23c9
@@ -138,7 +138,7 @@ static void test_single_job(int expected)
ae23c9
     }
ae23c9
     g_assert_cmpint(result, ==, expected);
ae23c9
 
ae23c9
-    block_job_txn_unref(txn);
ae23c9
+    job_txn_unref(txn);
ae23c9
 }
ae23c9
 
ae23c9
 static void test_single_job_success(void)
ae23c9
@@ -164,7 +164,7 @@ static void test_pair_jobs(int expected1, int expected2)
ae23c9
     int result1 = -EINPROGRESS;
ae23c9
     int result2 = -EINPROGRESS;
ae23c9
 
ae23c9
-    txn = block_job_txn_new();
ae23c9
+    txn = job_txn_new();
ae23c9
     job1 = test_block_job_start(1, true, expected1, &result1, txn);
ae23c9
     job2 = test_block_job_start(2, true, expected2, &result2, txn);
ae23c9
     job_start(&job1->job);
ae23c9
@@ -173,7 +173,7 @@ static void test_pair_jobs(int expected1, int expected2)
ae23c9
     /* Release our reference now to trigger as many nice
ae23c9
      * use-after-free bugs as possible.
ae23c9
      */
ae23c9
-    block_job_txn_unref(txn);
ae23c9
+    job_txn_unref(txn);
ae23c9
 
ae23c9
     if (expected1 == -ECANCELED) {
ae23c9
         block_job_cancel(job1, false);
ae23c9
@@ -226,7 +226,7 @@ static void test_pair_jobs_fail_cancel_race(void)
ae23c9
     int result1 = -EINPROGRESS;
ae23c9
     int result2 = -EINPROGRESS;
ae23c9
 
ae23c9
-    txn = block_job_txn_new();
ae23c9
+    txn = job_txn_new();
ae23c9
     job1 = test_block_job_start(1, true, -ECANCELED, &result1, txn);
ae23c9
     job2 = test_block_job_start(2, false, 0, &result2, txn);
ae23c9
     job_start(&job1->job);
ae23c9
@@ -247,7 +247,7 @@ static void test_pair_jobs_fail_cancel_race(void)
ae23c9
     g_assert_cmpint(result1, ==, -ECANCELED);
ae23c9
     g_assert_cmpint(result2, ==, -ECANCELED);
ae23c9
 
ae23c9
-    block_job_txn_unref(txn);
ae23c9
+    job_txn_unref(txn);
ae23c9
 }
ae23c9
 
ae23c9
 int main(int argc, char **argv)
ae23c9
diff --git a/tests/test-blockjob.c b/tests/test-blockjob.c
ae23c9
index e44c608..1e052c2 100644
ae23c9
--- a/tests/test-blockjob.c
ae23c9
+++ b/tests/test-blockjob.c
ae23c9
@@ -364,7 +364,7 @@ static void test_cancel_concluded(void)
ae23c9
     }
ae23c9
     assert(job->job.status == JOB_STATUS_PENDING);
ae23c9
 
ae23c9
-    block_job_finalize(job, &error_abort);
ae23c9
+    job_finalize(&job->job, &error_abort);
ae23c9
     assert(job->job.status == JOB_STATUS_CONCLUDED);
ae23c9
 
ae23c9
     cancel_common(s);
ae23c9
-- 
ae23c9
1.8.3.1
ae23c9