| From 7ccb50f6be019258bfe5bb501142e39c5a4f52e4 Mon Sep 17 00:00:00 2001 |
| From: Kevin Wolf <kwolf@redhat.com> |
| Date: Tue, 26 Jun 2018 09:48:22 +0200 |
| Subject: [PATCH 114/268] job: Move transactions to Job |
| |
| RH-Author: Kevin Wolf <kwolf@redhat.com> |
| Message-id: <20180626094856.6924-40-kwolf@redhat.com> |
| Patchwork-id: 81097 |
| O-Subject: [RHV-7.6 qemu-kvm-rhev PATCH v2 39/73] job: Move transactions to Job |
| Bugzilla: 1513543 |
| RH-Acked-by: Jeffrey Cody <jcody@redhat.com> |
| RH-Acked-by: Max Reitz <mreitz@redhat.com> |
| RH-Acked-by: Fam Zheng <famz@redhat.com> |
| |
| This moves the logic that implements job transactions from BlockJob to |
| Job. |
| |
| Signed-off-by: Kevin Wolf <kwolf@redhat.com> |
| Reviewed-by: Max Reitz <mreitz@redhat.com> |
| (cherry picked from commit 7eaa8fb57da96301f4a8ce176ad503f80efc7cc0) |
| Signed-off-by: Kevin Wolf <kwolf@redhat.com> |
| Signed-off-by: Miroslav Rezanina <mrezanin@redhat.com> |
| |
| blockdev.c | 6 +- |
| blockjob.c | 238 +------------------------------------------ |
| include/block/blockjob.h | 54 ---------- |
| include/block/blockjob_int.h | 10 -- |
| include/qemu/job.h | 71 +++++++++++-- |
| job.c | 234 ++++++++++++++++++++++++++++++++++++++++-- |
| tests/test-blockjob-txn.c | 12 +-- |
| tests/test-blockjob.c | 2 +- |
| 8 files changed, 303 insertions(+), 324 deletions(-) |
| |
| diff --git a/blockdev.c b/blockdev.c |
| index 6efdb30..9aa2e79 100644 |
| |
| |
| @@ -2256,7 +2256,7 @@ void qmp_transaction(TransactionActionList *dev_list, |
| */ |
| props = get_transaction_properties(props); |
| if (props->completion_mode != ACTION_COMPLETION_MODE_INDIVIDUAL) { |
| - block_job_txn = block_job_txn_new(); |
| + block_job_txn = job_txn_new(); |
| } |
| |
| /* drain all i/o before any operations */ |
| @@ -2315,7 +2315,7 @@ exit: |
| if (!has_props) { |
| qapi_free_TransactionProperties(props); |
| } |
| - block_job_txn_unref(block_job_txn); |
| + job_txn_unref(block_job_txn); |
| } |
| |
| void qmp_eject(bool has_device, const char *device, |
| @@ -3909,7 +3909,7 @@ void qmp_block_job_finalize(const char *id, Error **errp) |
| } |
| |
| trace_qmp_block_job_finalize(job); |
| - block_job_finalize(job, errp); |
| + job_finalize(&job->job, errp); |
| aio_context_release(aio_context); |
| } |
| |
| diff --git a/blockjob.c b/blockjob.c |
| index bd35c4f..14b21c8 100644 |
| |
| |
| @@ -36,19 +36,6 @@ |
| #include "qemu/coroutine.h" |
| #include "qemu/timer.h" |
| |
| -/* Transactional group of block jobs */ |
| -struct JobTxn { |
| - |
| - /* Is this txn being cancelled? */ |
| - bool aborting; |
| - |
| - /* List of jobs */ |
| - QLIST_HEAD(, Job) jobs; |
| - |
| - /* Reference count */ |
| - int refcnt; |
| -}; |
| - |
| /* |
| * The block job API is composed of two categories of functions. |
| * |
| @@ -94,48 +81,6 @@ BlockJob *block_job_get(const char *id) |
| } |
| } |
| |
| -JobTxn *block_job_txn_new(void) |
| -{ |
| - JobTxn *txn = g_new0(JobTxn, 1); |
| - QLIST_INIT(&txn->jobs); |
| - txn->refcnt = 1; |
| - return txn; |
| -} |
| - |
| -static void block_job_txn_ref(JobTxn *txn) |
| -{ |
| - txn->refcnt++; |
| -} |
| - |
| -void block_job_txn_unref(JobTxn *txn) |
| -{ |
| - if (txn && --txn->refcnt == 0) { |
| - g_free(txn); |
| - } |
| -} |
| - |
| -void block_job_txn_add_job(JobTxn *txn, BlockJob *job) |
| -{ |
| - if (!txn) { |
| - return; |
| - } |
| - |
| - assert(!job->txn); |
| - job->txn = txn; |
| - |
| - QLIST_INSERT_HEAD(&txn->jobs, &job->job, txn_list); |
| - block_job_txn_ref(txn); |
| -} |
| - |
| -void block_job_txn_del_job(BlockJob *job) |
| -{ |
| - if (job->txn) { |
| - QLIST_REMOVE(&job->job, txn_list); |
| - block_job_txn_unref(job->txn); |
| - job->txn = NULL; |
| - } |
| -} |
| - |
| static void block_job_attached_aio_context(AioContext *new_context, |
| void *opaque); |
| static void block_job_detach_aio_context(void *opaque); |
| @@ -145,8 +90,6 @@ void block_job_free(Job *job) |
| BlockJob *bjob = container_of(job, BlockJob, job); |
| BlockDriverState *bs = blk_bs(bjob->blk); |
| |
| - assert(!bjob->txn); |
| - |
| bs->job = NULL; |
| block_job_remove_all_bdrv(bjob); |
| blk_remove_aio_context_notifier(bjob->blk, |
| @@ -261,158 +204,6 @@ const BlockJobDriver *block_job_driver(BlockJob *job) |
| return job->driver; |
| } |
| |
| -static int block_job_prepare(BlockJob *job) |
| -{ |
| - if (job->job.ret == 0 && job->driver->prepare) { |
| - job->job.ret = job->driver->prepare(job); |
| - } |
| - return job->job.ret; |
| -} |
| - |
| -static void job_cancel_async(Job *job, bool force) |
| -{ |
| - if (job->user_paused) { |
| - /* Do not call job_enter here, the caller will handle it. */ |
| - job->user_paused = false; |
| - if (job->driver->user_resume) { |
| - job->driver->user_resume(job); |
| - } |
| - assert(job->pause_count > 0); |
| - job->pause_count--; |
| - } |
| - job->cancelled = true; |
| - /* To prevent 'force == false' overriding a previous 'force == true' */ |
| - job->force_cancel |= force; |
| -} |
| - |
| -static int block_job_txn_apply(JobTxn *txn, int fn(BlockJob *), bool lock) |
| -{ |
| - AioContext *ctx; |
| - Job *job, *next; |
| - BlockJob *bjob; |
| - int rc = 0; |
| - |
| - QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) { |
| - assert(is_block_job(job)); |
| - bjob = container_of(job, BlockJob, job); |
| - |
| - if (lock) { |
| - ctx = job->aio_context; |
| - aio_context_acquire(ctx); |
| - } |
| - rc = fn(bjob); |
| - if (lock) { |
| - aio_context_release(ctx); |
| - } |
| - if (rc) { |
| - break; |
| - } |
| - } |
| - return rc; |
| -} |
| - |
| -static void block_job_completed_txn_abort(BlockJob *job) |
| -{ |
| - AioContext *ctx; |
| - JobTxn *txn = job->txn; |
| - Job *other_job; |
| - |
| - if (txn->aborting) { |
| - /* |
| - * We are cancelled by another job, which will handle everything. |
| - */ |
| - return; |
| - } |
| - txn->aborting = true; |
| - block_job_txn_ref(txn); |
| - |
| - /* We are the first failed job. Cancel other jobs. */ |
| - QLIST_FOREACH(other_job, &txn->jobs, txn_list) { |
| - ctx = other_job->aio_context; |
| - aio_context_acquire(ctx); |
| - } |
| - |
| - /* Other jobs are effectively cancelled by us, set the status for |
| - * them; this job, however, may or may not be cancelled, depending |
| - * on the caller, so leave it. */ |
| - QLIST_FOREACH(other_job, &txn->jobs, txn_list) { |
| - if (other_job != &job->job) { |
| - job_cancel_async(other_job, false); |
| - } |
| - } |
| - while (!QLIST_EMPTY(&txn->jobs)) { |
| - other_job = QLIST_FIRST(&txn->jobs); |
| - ctx = other_job->aio_context; |
| - if (!job_is_completed(other_job)) { |
| - assert(job_is_cancelled(other_job)); |
| - job_finish_sync(other_job, NULL, NULL); |
| - } |
| - job_finalize_single(other_job); |
| - aio_context_release(ctx); |
| - } |
| - |
| - block_job_txn_unref(txn); |
| -} |
| - |
| -static int block_job_needs_finalize(BlockJob *job) |
| -{ |
| - return !job->job.auto_finalize; |
| -} |
| - |
| -static int block_job_finalize_single(BlockJob *job) |
| -{ |
| - return job_finalize_single(&job->job); |
| -} |
| - |
| -static void block_job_do_finalize(BlockJob *job) |
| -{ |
| - int rc; |
| - assert(job && job->txn); |
| - |
| - /* prepare the transaction to complete */ |
| - rc = block_job_txn_apply(job->txn, block_job_prepare, true); |
| - if (rc) { |
| - block_job_completed_txn_abort(job); |
| - } else { |
| - block_job_txn_apply(job->txn, block_job_finalize_single, true); |
| - } |
| -} |
| - |
| -static int block_job_transition_to_pending(BlockJob *job) |
| -{ |
| - job_state_transition(&job->job, JOB_STATUS_PENDING); |
| - if (!job->job.auto_finalize) { |
| - job_event_pending(&job->job); |
| - } |
| - return 0; |
| -} |
| - |
| -static void block_job_completed_txn_success(BlockJob *job) |
| -{ |
| - JobTxn *txn = job->txn; |
| - Job *other_job; |
| - |
| - job_state_transition(&job->job, JOB_STATUS_WAITING); |
| - |
| - /* |
| - * Successful completion, see if there are other running jobs in this |
| - * txn. |
| - */ |
| - QLIST_FOREACH(other_job, &txn->jobs, txn_list) { |
| - if (!job_is_completed(other_job)) { |
| - return; |
| - } |
| - assert(other_job->ret == 0); |
| - } |
| - |
| - block_job_txn_apply(txn, block_job_transition_to_pending, false); |
| - |
| - /* If no jobs need manual finalization, automatically do so */ |
| - if (block_job_txn_apply(txn, block_job_needs_finalize, false) == 0) { |
| - block_job_do_finalize(job); |
| - } |
| -} |
| - |
| /* Assumes the job_mutex is held */ |
| static bool job_timer_pending(Job *job) |
| { |
| @@ -451,15 +242,6 @@ int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n) |
| return ratelimit_calculate_delay(&job->limit, n); |
| } |
| |
| -void block_job_finalize(BlockJob *job, Error **errp) |
| -{ |
| - assert(job && job->job.id); |
| - if (job_apply_verb(&job->job, JOB_VERB_FINALIZE, errp)) { |
| - return; |
| - } |
| - block_job_do_finalize(job); |
| -} |
| - |
| void block_job_dismiss(BlockJob **jobptr, Error **errp) |
| { |
| BlockJob *job = *jobptr; |
| @@ -483,7 +265,7 @@ void block_job_cancel(BlockJob *job, bool force) |
| if (!job_started(&job->job)) { |
| block_job_completed(job, -ECANCELED); |
| } else if (job->job.deferred_to_main_loop) { |
| - block_job_completed_txn_abort(job); |
| + job_completed_txn_abort(&job->job); |
| } else { |
| block_job_enter(job); |
| } |
| @@ -656,7 +438,7 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver, |
| return NULL; |
| } |
| |
| - job = job_create(job_id, &driver->job_driver, blk_get_aio_context(blk), |
| + job = job_create(job_id, &driver->job_driver, txn, blk_get_aio_context(blk), |
| flags, cb, opaque, errp); |
| if (job == NULL) { |
| blk_unref(blk); |
| @@ -703,30 +485,20 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver, |
| } |
| } |
| |
| - /* Single jobs are modeled as single-job transactions for sake of |
| - * consolidating the job management logic */ |
| - if (!txn) { |
| - txn = block_job_txn_new(); |
| - block_job_txn_add_job(txn, job); |
| - block_job_txn_unref(txn); |
| - } else { |
| - block_job_txn_add_job(txn, job); |
| - } |
| - |
| return job; |
| } |
| |
| void block_job_completed(BlockJob *job, int ret) |
| { |
| - assert(job && job->txn && !job_is_completed(&job->job)); |
| + assert(job && job->job.txn && !job_is_completed(&job->job)); |
| assert(blk_bs(job->blk)->job == job); |
| job->job.ret = ret; |
| job_update_rc(&job->job); |
| trace_block_job_completed(job, ret, job->job.ret); |
| if (job->job.ret) { |
| - block_job_completed_txn_abort(job); |
| + job_completed_txn_abort(&job->job); |
| } else { |
| - block_job_completed_txn_success(job); |
| + job_completed_txn_success(&job->job); |
| } |
| } |
| |
| diff --git a/include/block/blockjob.h b/include/block/blockjob.h |
| index 44df025..09e6bb4 100644 |
| |
| |
| @@ -33,7 +33,6 @@ |
| #define BLOCK_JOB_SLICE_TIME 100000000ULL /* ns */ |
| |
| typedef struct BlockJobDriver BlockJobDriver; |
| -typedef struct JobTxn JobTxn; |
| |
| /** |
| * BlockJob: |
| @@ -84,8 +83,6 @@ typedef struct BlockJob { |
| |
| /** BlockDriverStates that are involved in this block job */ |
| GSList *nodes; |
| - |
| - JobTxn *txn; |
| } BlockJob; |
| |
| /** |
| @@ -153,22 +150,6 @@ void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp); |
| void block_job_cancel(BlockJob *job, bool force); |
| |
| /** |
| - * block_job_finalize: |
| - * @job: The job to fully commit and finish. |
| - * @errp: Error object. |
| - * |
| - * For jobs that have finished their work and are pending |
| - * awaiting explicit acknowledgement to commit their work, |
| - * This will commit that work. |
| - * |
| - * FIXME: Make the below statement universally true: |
| - * For jobs that support the manual workflow mode, all graph |
| - * changes that occur as a result will occur after this command |
| - * and before a successful reply. |
| - */ |
| -void block_job_finalize(BlockJob *job, Error **errp); |
| - |
| -/** |
| * block_job_dismiss: |
| * @job: The job to be dismissed. |
| * @errp: Error object. |
| @@ -260,41 +241,6 @@ int block_job_complete_sync(BlockJob *job, Error **errp); |
| void block_job_iostatus_reset(BlockJob *job); |
| |
| /** |
| - * block_job_txn_new: |
| - * |
| - * Allocate and return a new block job transaction. Jobs can be added to the |
| - * transaction using block_job_txn_add_job(). |
| - * |
| - * The transaction is automatically freed when the last job completes or is |
| - * cancelled. |
| - * |
| - * All jobs in the transaction either complete successfully or fail/cancel as a |
| - * group. Jobs wait for each other before completing. Cancelling one job |
| - * cancels all jobs in the transaction. |
| - */ |
| -JobTxn *block_job_txn_new(void); |
| - |
| -/** |
| - * block_job_txn_unref: |
| - * |
| - * Release a reference that was previously acquired with block_job_txn_add_job |
| - * or block_job_txn_new. If it's the last reference to the object, it will be |
| - * freed. |
| - */ |
| -void block_job_txn_unref(JobTxn *txn); |
| - |
| -/** |
| - * block_job_txn_add_job: |
| - * @txn: The transaction (may be NULL) |
| - * @job: Job to add to the transaction |
| - * |
| - * Add @job to the transaction. The @job must not already be in a transaction. |
| - * The caller must call either block_job_txn_unref() or block_job_completed() |
| - * to release the reference that is automatically grabbed here. |
| - */ |
| -void block_job_txn_add_job(JobTxn *txn, BlockJob *job); |
| - |
| -/** |
| * block_job_is_internal: |
| * @job: The job to determine if it is user-visible or not. |
| * |
| diff --git a/include/block/blockjob_int.h b/include/block/blockjob_int.h |
| index ce66a9b..29a2802 100644 |
| |
| |
| @@ -38,16 +38,6 @@ struct BlockJobDriver { |
| /** Generic JobDriver callbacks and settings */ |
| JobDriver job_driver; |
| |
| - /** |
| - * If the callback is not NULL, prepare will be invoked when all the jobs |
| - * belonging to the same transaction complete; or upon this job's completion |
| - * if it is not in a transaction. |
| - * |
| - * This callback will not be invoked if the job has already failed. |
| - * If it fails, abort and then clean will be called. |
| - */ |
| - int (*prepare)(BlockJob *job); |
| - |
| /* |
| * If the callback is not NULL, it will be invoked before the job is |
| * resumed in a new AioContext. This is the place to move any resources |
| diff --git a/include/qemu/job.h b/include/qemu/job.h |
| index d4aa7fa..39d74ab 100644 |
| |
| |
| @@ -32,6 +32,8 @@ |
| #include "block/aio.h" |
| |
| typedef struct JobDriver JobDriver; |
| +typedef struct JobTxn JobTxn; |
| + |
| |
| /** |
| * Long-running operation. |
| @@ -133,6 +135,9 @@ typedef struct Job { |
| /** Element of the list of jobs */ |
| QLIST_ENTRY(Job) job_list; |
| |
| + /** Transaction this job is part of */ |
| + JobTxn *txn; |
| + |
| /** Element of the list of jobs in a job transaction */ |
| QLIST_ENTRY(Job) txn_list; |
| } Job; |
| @@ -184,6 +189,16 @@ struct JobDriver { |
| void (*drain)(Job *job); |
| |
| /** |
| + * If the callback is not NULL, prepare will be invoked when all the jobs |
| + * belonging to the same transaction complete; or upon this job's completion |
| + * if it is not in a transaction. |
| + * |
| + * This callback will not be invoked if the job has already failed. |
| + * If it fails, abort and then clean will be called. |
| + */ |
| + int (*prepare)(Job *job); |
| + |
| + /** |
| * If the callback is not NULL, it will be invoked when all the jobs |
| * belonging to the same transaction complete; or upon this job's |
| * completion if it is not in a transaction. Skipped if NULL. |
| @@ -227,20 +242,52 @@ typedef enum JobCreateFlags { |
| JOB_MANUAL_DISMISS = 0x04, |
| } JobCreateFlags; |
| |
| +/** |
| + * Allocate and return a new job transaction. Jobs can be added to the |
| + * transaction using job_txn_add_job(). |
| + * |
| + * The transaction is automatically freed when the last job completes or is |
| + * cancelled. |
| + * |
| + * All jobs in the transaction either complete successfully or fail/cancel as a |
| + * group. Jobs wait for each other before completing. Cancelling one job |
| + * cancels all jobs in the transaction. |
| + */ |
| +JobTxn *job_txn_new(void); |
| + |
| +/** |
| + * Release a reference that was previously acquired with job_txn_add_job or |
| + * job_txn_new. If it's the last reference to the object, it will be freed. |
| + */ |
| +void job_txn_unref(JobTxn *txn); |
| + |
| +/** |
| + * @txn: The transaction (may be NULL) |
| + * @job: Job to add to the transaction |
| + * |
| + * Add @job to the transaction. The @job must not already be in a transaction. |
| + * The caller must call either job_txn_unref() or block_job_completed() to |
| + * release the reference that is automatically grabbed here. |
| + * |
| + * If @txn is NULL, the function does nothing. |
| + */ |
| +void job_txn_add_job(JobTxn *txn, Job *job); |
| |
| /** |
| * Create a new long-running job and return it. |
| * |
| * @job_id: The id of the newly-created job, or %NULL for internal jobs |
| * @driver: The class object for the newly-created job. |
| + * @txn: The transaction this job belongs to, if any. %NULL otherwise. |
| * @ctx: The AioContext to run the job coroutine in. |
| * @flags: Creation flags for the job. See @JobCreateFlags. |
| * @cb: Completion function for the job. |
| * @opaque: Opaque pointer value passed to @cb. |
| * @errp: Error object. |
| */ |
| -void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx, |
| - int flags, BlockCompletionFunc *cb, void *opaque, Error **errp); |
| +void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn, |
| + AioContext *ctx, int flags, BlockCompletionFunc *cb, |
| + void *opaque, Error **errp); |
| |
| /** |
| * Add a reference to Job refcnt, it will be decreased with job_unref, and then |
| @@ -260,9 +307,6 @@ void job_event_cancelled(Job *job); |
| /** To be called when a successfully completed job is finalised. */ |
| void job_event_completed(Job *job); |
| |
| -/** To be called when the job transitions to PENDING */ |
| -void job_event_pending(Job *job); |
| - |
| /** |
| * Conditionally enter the job coroutine if the job is ready to run, not |
| * already busy and fn() returns true. fn() is called while under the job_lock |
| @@ -375,6 +419,16 @@ void job_early_fail(Job *job); |
| /** Asynchronously complete the specified @job. */ |
| void job_complete(Job *job, Error **errp);; |
| |
| +/** |
| + * For a @job that has finished its work and is pending awaiting explicit |
| + * acknowledgement to commit its work, this will commit that work. |
| + * |
| + * FIXME: Make the below statement universally true: |
| + * For jobs that support the manual workflow mode, all graph changes that occur |
| + * as a result will occur after this command and before a successful reply. |
| + */ |
| +void job_finalize(Job *job, Error **errp); |
| + |
| typedef void JobDeferToMainLoopFn(Job *job, void *opaque); |
| |
| /** |
| @@ -407,10 +461,9 @@ void coroutine_fn job_do_yield(Job *job, uint64_t ns); |
| bool job_should_pause(Job *job); |
| bool job_started(Job *job); |
| void job_do_dismiss(Job *job); |
| -int job_finalize_single(Job *job); |
| void job_update_rc(Job *job); |
| - |
| -typedef struct BlockJob BlockJob; |
| -void block_job_txn_del_job(BlockJob *job); |
| +void job_cancel_async(Job *job, bool force); |
| +void job_completed_txn_abort(Job *job); |
| +void job_completed_txn_success(Job *job); |
| |
| #endif |
| diff --git a/job.c b/job.c |
| index aa74b4c..4f6fd73 100644 |
| |
| |
| @@ -60,6 +60,19 @@ bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = { |
| [JOB_VERB_DISMISS] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0}, |
| }; |
| |
| +/* Transactional group of jobs */ |
| +struct JobTxn { |
| + |
| + /* Is this txn being cancelled? */ |
| + bool aborting; |
| + |
| + /* List of jobs */ |
| + QLIST_HEAD(, Job) jobs; |
| + |
| + /* Reference count */ |
| + int refcnt; |
| +}; |
| + |
| /* Right now, this mutex is only needed to synchronize accesses to job->busy |
| * and job->sleep_timer, such as concurrent calls to job_do_yield and |
| * job_enter. */ |
| @@ -80,6 +93,71 @@ static void __attribute__((__constructor__)) job_init(void) |
| qemu_mutex_init(&job_mutex); |
| } |
| |
| +JobTxn *job_txn_new(void) |
| +{ |
| + JobTxn *txn = g_new0(JobTxn, 1); |
| + QLIST_INIT(&txn->jobs); |
| + txn->refcnt = 1; |
| + return txn; |
| +} |
| + |
| +static void job_txn_ref(JobTxn *txn) |
| +{ |
| + txn->refcnt++; |
| +} |
| + |
| +void job_txn_unref(JobTxn *txn) |
| +{ |
| + if (txn && --txn->refcnt == 0) { |
| + g_free(txn); |
| + } |
| +} |
| + |
| +void job_txn_add_job(JobTxn *txn, Job *job) |
| +{ |
| + if (!txn) { |
| + return; |
| + } |
| + |
| + assert(!job->txn); |
| + job->txn = txn; |
| + |
| + QLIST_INSERT_HEAD(&txn->jobs, job, txn_list); |
| + job_txn_ref(txn); |
| +} |
| + |
| +static void job_txn_del_job(Job *job) |
| +{ |
| + if (job->txn) { |
| + QLIST_REMOVE(job, txn_list); |
| + job_txn_unref(job->txn); |
| + job->txn = NULL; |
| + } |
| +} |
| + |
| +static int job_txn_apply(JobTxn *txn, int fn(Job *), bool lock) |
| +{ |
| + AioContext *ctx; |
| + Job *job, *next; |
| + int rc = 0; |
| + |
| + QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) { |
| + if (lock) { |
| + ctx = job->aio_context; |
| + aio_context_acquire(ctx); |
| + } |
| + rc = fn(job); |
| + if (lock) { |
| + aio_context_release(ctx); |
| + } |
| + if (rc) { |
| + break; |
| + } |
| + } |
| + return rc; |
| +} |
| + |
| + |
| /* TODO Make static once the whole state machine is in job.c */ |
| void job_state_transition(Job *job, JobStatus s1) |
| { |
| @@ -181,8 +259,9 @@ static void job_sleep_timer_cb(void *opaque) |
| job_enter(job); |
| } |
| |
| -void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx, |
| - int flags, BlockCompletionFunc *cb, void *opaque, Error **errp) |
| +void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn, |
| + AioContext *ctx, int flags, BlockCompletionFunc *cb, |
| + void *opaque, Error **errp) |
| { |
| Job *job; |
| |
| @@ -228,6 +307,16 @@ void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx, |
| |
| QLIST_INSERT_HEAD(&jobs, job, job_list); |
| |
| + /* Single jobs are modeled as single-job transactions for sake of |
| + * consolidating the job management logic */ |
| + if (!txn) { |
| + txn = job_txn_new(); |
| + job_txn_add_job(txn, job); |
| + job_txn_unref(txn); |
| + } else { |
| + job_txn_add_job(txn, job); |
| + } |
| + |
| return job; |
| } |
| |
| @@ -241,6 +330,7 @@ void job_unref(Job *job) |
| if (--job->refcnt == 0) { |
| assert(job->status == JOB_STATUS_NULL); |
| assert(!timer_pending(&job->sleep_timer)); |
| + assert(!job->txn); |
| |
| if (job->driver->free) { |
| job->driver->free(job); |
| @@ -263,7 +353,7 @@ void job_event_completed(Job *job) |
| notifier_list_notify(&job->on_finalize_completed, job); |
| } |
| |
| -void job_event_pending(Job *job) |
| +static void job_event_pending(Job *job) |
| { |
| notifier_list_notify(&job->on_pending, job); |
| } |
| @@ -469,8 +559,7 @@ void job_do_dismiss(Job *job) |
| job->paused = false; |
| job->deferred_to_main_loop = true; |
| |
| - /* TODO Don't assume it's a BlockJob */ |
| - block_job_txn_del_job((BlockJob*) job); |
| + job_txn_del_job(job); |
| |
| job_state_transition(job, JOB_STATUS_NULL); |
| job_unref(job); |
| @@ -523,7 +612,7 @@ static void job_clean(Job *job) |
| } |
| } |
| |
| -int job_finalize_single(Job *job) |
| +static int job_finalize_single(Job *job) |
| { |
| assert(job_is_completed(job)); |
| |
| @@ -550,12 +639,141 @@ int job_finalize_single(Job *job) |
| } |
| } |
| |
| - /* TODO Don't assume it's a BlockJob */ |
| - block_job_txn_del_job((BlockJob*) job); |
| + job_txn_del_job(job); |
| job_conclude(job); |
| return 0; |
| } |
| |
| +void job_cancel_async(Job *job, bool force) |
| +{ |
| + if (job->user_paused) { |
| + /* Do not call job_enter here, the caller will handle it. */ |
| + job->user_paused = false; |
| + if (job->driver->user_resume) { |
| + job->driver->user_resume(job); |
| + } |
| + assert(job->pause_count > 0); |
| + job->pause_count--; |
| + } |
| + job->cancelled = true; |
| + /* To prevent 'force == false' overriding a previous 'force == true' */ |
| + job->force_cancel |= force; |
| +} |
| + |
| +void job_completed_txn_abort(Job *job) |
| +{ |
| + AioContext *ctx; |
| + JobTxn *txn = job->txn; |
| + Job *other_job; |
| + |
| + if (txn->aborting) { |
| + /* |
| + * We are cancelled by another job, which will handle everything. |
| + */ |
| + return; |
| + } |
| + txn->aborting = true; |
| + job_txn_ref(txn); |
| + |
| + /* We are the first failed job. Cancel other jobs. */ |
| + QLIST_FOREACH(other_job, &txn->jobs, txn_list) { |
| + ctx = other_job->aio_context; |
| + aio_context_acquire(ctx); |
| + } |
| + |
| + /* Other jobs are effectively cancelled by us, set the status for |
| + * them; this job, however, may or may not be cancelled, depending |
| + * on the caller, so leave it. */ |
| + QLIST_FOREACH(other_job, &txn->jobs, txn_list) { |
| + if (other_job != job) { |
| + job_cancel_async(other_job, false); |
| + } |
| + } |
| + while (!QLIST_EMPTY(&txn->jobs)) { |
| + other_job = QLIST_FIRST(&txn->jobs); |
| + ctx = other_job->aio_context; |
| + if (!job_is_completed(other_job)) { |
| + assert(job_is_cancelled(other_job)); |
| + job_finish_sync(other_job, NULL, NULL); |
| + } |
| + job_finalize_single(other_job); |
| + aio_context_release(ctx); |
| + } |
| + |
| + job_txn_unref(txn); |
| +} |
| + |
| +static int job_prepare(Job *job) |
| +{ |
| + if (job->ret == 0 && job->driver->prepare) { |
| + job->ret = job->driver->prepare(job); |
| + } |
| + return job->ret; |
| +} |
| + |
| +static int job_needs_finalize(Job *job) |
| +{ |
| + return !job->auto_finalize; |
| +} |
| + |
| +static void job_do_finalize(Job *job) |
| +{ |
| + int rc; |
| + assert(job && job->txn); |
| + |
| + /* prepare the transaction to complete */ |
| + rc = job_txn_apply(job->txn, job_prepare, true); |
| + if (rc) { |
| + job_completed_txn_abort(job); |
| + } else { |
| + job_txn_apply(job->txn, job_finalize_single, true); |
| + } |
| +} |
| + |
| +void job_finalize(Job *job, Error **errp) |
| +{ |
| + assert(job && job->id); |
| + if (job_apply_verb(job, JOB_VERB_FINALIZE, errp)) { |
| + return; |
| + } |
| + job_do_finalize(job); |
| +} |
| + |
| +static int job_transition_to_pending(Job *job) |
| +{ |
| + job_state_transition(job, JOB_STATUS_PENDING); |
| + if (!job->auto_finalize) { |
| + job_event_pending(job); |
| + } |
| + return 0; |
| +} |
| + |
| +void job_completed_txn_success(Job *job) |
| +{ |
| + JobTxn *txn = job->txn; |
| + Job *other_job; |
| + |
| + job_state_transition(job, JOB_STATUS_WAITING); |
| + |
| + /* |
| + * Successful completion, see if there are other running jobs in this |
| + * txn. |
| + */ |
| + QLIST_FOREACH(other_job, &txn->jobs, txn_list) { |
| + if (!job_is_completed(other_job)) { |
| + return; |
| + } |
| + assert(other_job->ret == 0); |
| + } |
| + |
| + job_txn_apply(txn, job_transition_to_pending, false); |
| + |
| + /* If no jobs need manual finalization, automatically do so */ |
| + if (job_txn_apply(txn, job_needs_finalize, false) == 0) { |
| + job_do_finalize(job); |
| + } |
| +} |
| + |
| void job_complete(Job *job, Error **errp) |
| { |
| /* Should not be reachable via external interface for internal jobs */ |
| diff --git a/tests/test-blockjob-txn.c b/tests/test-blockjob-txn.c |
| index ec5d592..6ee31d5 100644 |
| |
| |
| @@ -125,7 +125,7 @@ static void test_single_job(int expected) |
| JobTxn *txn; |
| int result = -EINPROGRESS; |
| |
| - txn = block_job_txn_new(); |
| + txn = job_txn_new(); |
| job = test_block_job_start(1, true, expected, &result, txn); |
| job_start(&job->job); |
| |
| @@ -138,7 +138,7 @@ static void test_single_job(int expected) |
| } |
| g_assert_cmpint(result, ==, expected); |
| |
| - block_job_txn_unref(txn); |
| + job_txn_unref(txn); |
| } |
| |
| static void test_single_job_success(void) |
| @@ -164,7 +164,7 @@ static void test_pair_jobs(int expected1, int expected2) |
| int result1 = -EINPROGRESS; |
| int result2 = -EINPROGRESS; |
| |
| - txn = block_job_txn_new(); |
| + txn = job_txn_new(); |
| job1 = test_block_job_start(1, true, expected1, &result1, txn); |
| job2 = test_block_job_start(2, true, expected2, &result2, txn); |
| job_start(&job1->job); |
| @@ -173,7 +173,7 @@ static void test_pair_jobs(int expected1, int expected2) |
| /* Release our reference now to trigger as many nice |
| * use-after-free bugs as possible. |
| */ |
| - block_job_txn_unref(txn); |
| + job_txn_unref(txn); |
| |
| if (expected1 == -ECANCELED) { |
| block_job_cancel(job1, false); |
| @@ -226,7 +226,7 @@ static void test_pair_jobs_fail_cancel_race(void) |
| int result1 = -EINPROGRESS; |
| int result2 = -EINPROGRESS; |
| |
| - txn = block_job_txn_new(); |
| + txn = job_txn_new(); |
| job1 = test_block_job_start(1, true, -ECANCELED, &result1, txn); |
| job2 = test_block_job_start(2, false, 0, &result2, txn); |
| job_start(&job1->job); |
| @@ -247,7 +247,7 @@ static void test_pair_jobs_fail_cancel_race(void) |
| g_assert_cmpint(result1, ==, -ECANCELED); |
| g_assert_cmpint(result2, ==, -ECANCELED); |
| |
| - block_job_txn_unref(txn); |
| + job_txn_unref(txn); |
| } |
| |
| int main(int argc, char **argv) |
| diff --git a/tests/test-blockjob.c b/tests/test-blockjob.c |
| index e44c608..1e052c2 100644 |
| |
| |
| @@ -364,7 +364,7 @@ static void test_cancel_concluded(void) |
| } |
| assert(job->job.status == JOB_STATUS_PENDING); |
| |
| - block_job_finalize(job, &error_abort); |
| + job_finalize(&job->job, &error_abort); |
| assert(job->job.status == JOB_STATUS_CONCLUDED); |
| |
| cancel_common(s); |
| -- |
| 1.8.3.1 |
| |