Blame SOURCES/0001-pull-Ensure-we-always-process-queue-only-from-main-t.patch

b5b830
From 0271ce020f3e8c30044632f4e4bfa673b051f233 Mon Sep 17 00:00:00 2001
b5b830
From: Colin Walters <walters@verbum.org>
b5b830
Date: Thu, 16 Jun 2016 22:16:27 -0400
b5b830
Subject: [PATCH] pull: Ensure we always process queue only from main thread
b5b830
b5b830
I was easily reproducing a hang on pulls with thousands of requests on
b5b830
current git master.  The initial symptom seemed to be that there are
b5b830
multiple code paths where we don't invoke
b5b830
`session_thread_process_pending_queue()`.  We really need to do
b5b830
that any time we remove something from the outstanding queue,
b5b830
to ensure it gets filled again.
b5b830
b5b830
A further issue is that we were tying the lifecycle of the pending
b5b830
object to the `GTask`, but the task could be unref'd from the main
b5b830
thread (via a `GSource` on the main thread), and that introduced
b5b830
threadsafety issues, because the hash table and other data suddenly
b5b830
could be concurrently modified.
b5b830
b5b830
Both of these need to be fixed together.  First, we introduce
b5b830
`Arc<Pending>`, and ensure that both the main and worker threads hold
b5b830
references.
b5b830
b5b830
Second, we ensure that we re-process the queue *immediately* whenever
b5b830
a task is done, inside the worker thread, rather than doing it
b5b830
incidentally via an unref.  This architecture is quite similar to what
b5b830
the outside pull code is doing.
b5b830
---
b5b830
 src/libostree/ostree-fetcher.c | 55 ++++++++++++++++++++++++++++++++++++------
b5b830
 1 file changed, 47 insertions(+), 8 deletions(-)
b5b830
b5b830
diff --git a/src/libostree/ostree-fetcher.c b/src/libostree/ostree-fetcher.c
b5b830
index d956d95..313df6a 100644
b5b830
--- a/src/libostree/ostree-fetcher.c
b5b830
+++ b/src/libostree/ostree-fetcher.c
b5b830
@@ -67,7 +67,12 @@ typedef struct {
b5b830
   guint64 total_downloaded;
b5b830
 } ThreadClosure;
b5b830
 
b5b830
+static void
b5b830
+session_thread_process_pending_queue (ThreadClosure *thread_closure);
b5b830
+
b5b830
 typedef struct {
b5b830
+  volatile int ref_count;
b5b830
+
b5b830
   ThreadClosure *thread_closure;
b5b830
   SoupURI *uri;
b5b830
 
b5b830
@@ -186,10 +191,22 @@ pending_task_compare (gconstpointer a,
b5b830
          (priority_a < priority_b) ? -1 : 1;
b5b830
 }
b5b830
 
b5b830
+static OstreeFetcherPendingURI *
b5b830
+pending_uri_ref (OstreeFetcherPendingURI *pending)
b5b830
+{
b5b830
+  g_return_val_if_fail (pending != NULL, NULL);
b5b830
+  g_return_val_if_fail (pending->ref_count > 0, NULL);
b5b830
+
b5b830
+  g_atomic_int_inc (&pending->ref_count);
b5b830
+
b5b830
+  return pending;
b5b830
+}
b5b830
+
b5b830
 static void
b5b830
-pending_uri_free (OstreeFetcherPendingURI *pending)
b5b830
+pending_uri_unref (OstreeFetcherPendingURI *pending)
b5b830
 {
b5b830
-  g_hash_table_remove (pending->thread_closure->outstanding, pending);
b5b830
+  if (!g_atomic_int_dec_and_test (&pending->ref_count))
b5b830
+    return;
b5b830
 
b5b830
   g_clear_pointer (&pending->thread_closure, thread_closure_unref);
b5b830
 
b5b830
@@ -331,8 +348,7 @@ session_thread_process_pending_queue (ThreadClosure *thread_closure)
b5b830
       pending = g_task_get_task_data (task);
b5b830
       cancellable = g_task_get_cancellable (task);
b5b830
 
b5b830
-      /* pending_uri_free() removes this. */
b5b830
-      g_hash_table_add (thread_closure->outstanding, pending);
b5b830
+      g_hash_table_add (thread_closure->outstanding, pending_uri_ref (pending));
b5b830
 
b5b830
       soup_request_send_async (pending->request,
b5b830
                                cancellable,
b5b830
@@ -540,7 +556,7 @@ _ostree_fetcher_constructed (GObject *object)
b5b830
   self->thread_closure->tmpdir_dfd = -1;
b5b830
   self->thread_closure->tmpdir_lock = empty_lockfile;
b5b830
 
b5b830
-  self->thread_closure->outstanding = g_hash_table_new (NULL, NULL);
b5b830
+  self->thread_closure->outstanding = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify)pending_uri_unref);
b5b830
   self->thread_closure->output_stream_set = g_hash_table_new_full (NULL, NULL,
b5b830
                                                                    (GDestroyNotify) NULL,
b5b830
                                                                    (GDestroyNotify) g_object_unref);
b5b830
@@ -743,6 +759,18 @@ on_stream_read (GObject        *object,
b5b830
                 gpointer        user_data);
b5b830
 
b5b830
 static void
b5b830
+remove_pending_rerun_queue (OstreeFetcherPendingURI *pending)
b5b830
+{
b5b830
+  /* Hold a temporary ref to ensure the reference to
b5b830
+   * pending->thread_closure is valid.
b5b830
+   */
b5b830
+  pending_uri_ref (pending);
b5b830
+  g_hash_table_remove (pending->thread_closure->outstanding, pending);
b5b830
+  session_thread_process_pending_queue (pending->thread_closure);
b5b830
+  pending_uri_unref (pending);
b5b830
+}
b5b830
+
b5b830
+static void
b5b830
 on_out_splice_complete (GObject        *object,
b5b830
                         GAsyncResult   *result,
b5b830
                         gpointer        user_data) 
b5b830
@@ -770,7 +798,10 @@ on_out_splice_complete (GObject        *object,
b5b830
 
b5b830
  out:
b5b830
   if (local_error)
b5b830
-    g_task_return_error (task, local_error);
b5b830
+    {
b5b830
+      g_task_return_error (task, local_error);
b5b830
+      remove_pending_rerun_queue (pending);
b5b830
+    }
b5b830
 
b5b830
   g_object_unref (task);
b5b830
 }
b5b830
@@ -802,6 +833,7 @@ on_stream_read (GObject        *object,
b5b830
       g_task_return_pointer (task,
b5b830
                              g_strdup (pending->out_tmpfile),
b5b830
                              (GDestroyNotify) g_free);
b5b830
+      remove_pending_rerun_queue (pending);
b5b830
     }
b5b830
   else
b5b830
     {
b5b830
@@ -837,7 +869,10 @@ on_stream_read (GObject        *object,
b5b830
 
b5b830
  out:
b5b830
   if (local_error)
b5b830
-    g_task_return_error (task, local_error);
b5b830
+    {
b5b830
+      g_task_return_error (task, local_error);
b5b830
+      remove_pending_rerun_queue (pending);
b5b830
+    }
b5b830
 
b5b830
   g_object_unref (task);
b5b830
 }
b5b830
@@ -883,6 +918,7 @@ on_request_sent (GObject        *object,
b5b830
                                      g_strdup (pending->out_tmpfile),
b5b830
                                      (GDestroyNotify) g_free);
b5b830
             }
b5b830
+          remove_pending_rerun_queue (pending);
b5b830
           goto out;
b5b830
         }
b5b830
       else if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
b5b830
@@ -947,6 +983,7 @@ on_request_sent (GObject        *object,
b5b830
       g_task_return_pointer (task,
b5b830
                              g_object_ref (pending->request_body),
b5b830
                              (GDestroyNotify) g_object_unref);
b5b830
+      remove_pending_rerun_queue (pending);
b5b830
     }
b5b830
   
b5b830
  out:
b5b830
@@ -955,6 +992,7 @@ on_request_sent (GObject        *object,
b5b830
       if (pending->request_body)
b5b830
         (void) g_input_stream_close (pending->request_body, NULL, NULL);
b5b830
       g_task_return_error (task, local_error);
b5b830
+      remove_pending_rerun_queue (pending);
b5b830
     }
b5b830
 
b5b830
   g_object_unref (task);
b5b830
@@ -979,6 +1017,7 @@ ostree_fetcher_request_uri_internal (OstreeFetcher         *self,
b5b830
 
b5b830
   /* SoupRequest is created in session thread. */
b5b830
   pending = g_new0 (OstreeFetcherPendingURI, 1);
b5b830
+  pending->ref_count = 1;
b5b830
   pending->thread_closure = thread_closure_ref (self->thread_closure);
b5b830
   pending->uri = soup_uri_copy (uri);
b5b830
   pending->max_size = max_size;
b5b830
@@ -986,7 +1025,7 @@ ostree_fetcher_request_uri_internal (OstreeFetcher         *self,
b5b830
 
b5b830
   task = g_task_new (self, cancellable, callback, user_data);
b5b830
   g_task_set_source_tag (task, source_tag);
b5b830
-  g_task_set_task_data (task, pending, (GDestroyNotify) pending_uri_free);
b5b830
+  g_task_set_task_data (task, pending, (GDestroyNotify) pending_uri_unref);
b5b830
 
b5b830
   /* We'll use the GTask priority for our own priority queue. */
b5b830
   g_task_set_priority (task, priority);
b5b830
-- 
b5b830
2.5.5
b5b830