Blob Blame History Raw
From 0271ce020f3e8c30044632f4e4bfa673b051f233 Mon Sep 17 00:00:00 2001
From: Colin Walters <walters@verbum.org>
Date: Thu, 16 Jun 2016 22:16:27 -0400
Subject: [PATCH] pull: Ensure we always process queue only from main thread

I was easily reproducing a hang on pulls with thousands of requests on
current git master.  The initial symptom seemed to be that there are
multiple code paths where we don't invoke
`session_thread_process_pending_queue()`.  We really need to do
that any time we remove something from the outstanding queue,
to ensure it gets filled again.

A further issue is that we were tying the lifecycle of the pending
object to the `GTask`, but the task could be unref'd from the main
thread (via a `GSource` dispatched there), and that introduced
thread-safety issues, because the hash table and other data could
suddenly be modified concurrently.

Both of these need to be fixed together.  First, we give the pending
object an atomic reference count (in effect an `Arc<Pending>`), and
ensure that both the main and worker threads hold references.

Second, we ensure that we re-process the queue *immediately* whenever
a task is done, inside the worker thread, rather than doing it
incidentally via an unref.  This architecture is quite similar to what
the outside pull code is doing.
---
 src/libostree/ostree-fetcher.c | 55 ++++++++++++++++++++++++++++++++++++------
 1 file changed, 47 insertions(+), 8 deletions(-)

diff --git a/src/libostree/ostree-fetcher.c b/src/libostree/ostree-fetcher.c
index d956d95..313df6a 100644
--- a/src/libostree/ostree-fetcher.c
+++ b/src/libostree/ostree-fetcher.c
@@ -67,7 +67,12 @@ typedef struct {
   guint64 total_downloaded;
 } ThreadClosure;
 
+static void
+session_thread_process_pending_queue (ThreadClosure *thread_closure);
+
 typedef struct {
+  volatile int ref_count;
+
   ThreadClosure *thread_closure;
   SoupURI *uri;
 
@@ -186,10 +191,22 @@ pending_task_compare (gconstpointer a,
          (priority_a < priority_b) ? -1 : 1;
 }
 
+static OstreeFetcherPendingURI *
+pending_uri_ref (OstreeFetcherPendingURI *pending)
+{
+  g_return_val_if_fail (pending != NULL, NULL);
+  g_return_val_if_fail (pending->ref_count > 0, NULL);
+
+  g_atomic_int_inc (&pending->ref_count);
+
+  return pending;
+}
+
 static void
-pending_uri_free (OstreeFetcherPendingURI *pending)
+pending_uri_unref (OstreeFetcherPendingURI *pending)
 {
-  g_hash_table_remove (pending->thread_closure->outstanding, pending);
+  if (!g_atomic_int_dec_and_test (&pending->ref_count))
+    return;
 
   g_clear_pointer (&pending->thread_closure, thread_closure_unref);
 
@@ -331,8 +348,7 @@ session_thread_process_pending_queue (ThreadClosure *thread_closure)
       pending = g_task_get_task_data (task);
       cancellable = g_task_get_cancellable (task);
 
-      /* pending_uri_free() removes this. */
-      g_hash_table_add (thread_closure->outstanding, pending);
+      g_hash_table_add (thread_closure->outstanding, pending_uri_ref (pending));
 
       soup_request_send_async (pending->request,
                                cancellable,
@@ -540,7 +556,7 @@ _ostree_fetcher_constructed (GObject *object)
   self->thread_closure->tmpdir_dfd = -1;
   self->thread_closure->tmpdir_lock = empty_lockfile;
 
-  self->thread_closure->outstanding = g_hash_table_new (NULL, NULL);
+  self->thread_closure->outstanding = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify)pending_uri_unref);
   self->thread_closure->output_stream_set = g_hash_table_new_full (NULL, NULL,
                                                                    (GDestroyNotify) NULL,
                                                                    (GDestroyNotify) g_object_unref);
@@ -743,6 +759,18 @@ on_stream_read (GObject        *object,
                 gpointer        user_data);
 
 static void
+remove_pending_rerun_queue (OstreeFetcherPendingURI *pending)
+{
+  /* Removal from outstanding drops the table's ref; hold a temporary one
+   * so pending->thread_closure is still valid for the queue re-run below.
+   */
+  pending_uri_ref (pending);
+  g_hash_table_remove (pending->thread_closure->outstanding, pending);
+  session_thread_process_pending_queue (pending->thread_closure);
+  pending_uri_unref (pending);
+}
+
+static void
 on_out_splice_complete (GObject        *object,
                         GAsyncResult   *result,
                         gpointer        user_data) 
@@ -770,7 +798,10 @@ on_out_splice_complete (GObject        *object,
 
  out:
   if (local_error)
-    g_task_return_error (task, local_error);
+    {
+      g_task_return_error (task, local_error);
+      remove_pending_rerun_queue (pending);
+    }
 
   g_object_unref (task);
 }
@@ -802,6 +833,7 @@ on_stream_read (GObject        *object,
       g_task_return_pointer (task,
                              g_strdup (pending->out_tmpfile),
                              (GDestroyNotify) g_free);
+      remove_pending_rerun_queue (pending);
     }
   else
     {
@@ -837,7 +869,10 @@ on_stream_read (GObject        *object,
 
  out:
   if (local_error)
-    g_task_return_error (task, local_error);
+    {
+      g_task_return_error (task, local_error);
+      remove_pending_rerun_queue (pending);
+    }
 
   g_object_unref (task);
 }
@@ -883,6 +918,7 @@ on_request_sent (GObject        *object,
                                      g_strdup (pending->out_tmpfile),
                                      (GDestroyNotify) g_free);
             }
+          remove_pending_rerun_queue (pending);
           goto out;
         }
       else if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
@@ -947,6 +983,7 @@ on_request_sent (GObject        *object,
       g_task_return_pointer (task,
                              g_object_ref (pending->request_body),
                              (GDestroyNotify) g_object_unref);
+      remove_pending_rerun_queue (pending);
     }
   
  out:
@@ -955,6 +992,7 @@ on_request_sent (GObject        *object,
       if (pending->request_body)
         (void) g_input_stream_close (pending->request_body, NULL, NULL);
       g_task_return_error (task, local_error);
+      remove_pending_rerun_queue (pending);
     }
 
   g_object_unref (task);
@@ -979,6 +1017,7 @@ ostree_fetcher_request_uri_internal (OstreeFetcher         *self,
 
   /* SoupRequest is created in session thread. */
   pending = g_new0 (OstreeFetcherPendingURI, 1);
+  pending->ref_count = 1;
   pending->thread_closure = thread_closure_ref (self->thread_closure);
   pending->uri = soup_uri_copy (uri);
   pending->max_size = max_size;
@@ -986,7 +1025,7 @@ ostree_fetcher_request_uri_internal (OstreeFetcher         *self,
 
   task = g_task_new (self, cancellable, callback, user_data);
   g_task_set_source_tag (task, source_tag);
-  g_task_set_task_data (task, pending, (GDestroyNotify) pending_uri_free);
+  g_task_set_task_data (task, pending, (GDestroyNotify) pending_uri_unref);
 
   /* We'll use the GTask priority for our own priority queue. */
   g_task_set_priority (task, priority);
-- 
2.5.5