Blob Blame History Raw
commit c7ca5eccb5709c75797579f508c3f9ae968cd55e
Author: Andrew Beekhof <andrew@beekhof.net>
Date:   Fri Jan 31 10:53:35 2014 +1100

    Fix: Bug rhbz#1057697 - Use native DBus library for systemd async support to avoid problematic use of threads
    
    (cherry picked from commit 2f90aad962c63eba313ed466580703434c80bd1a)

diff --git a/lib/services/dbus.c b/lib/services/dbus.c
index 69ea6ac..a3286f2 100644
--- a/lib/services/dbus.c
+++ b/lib/services/dbus.c
@@ -4,6 +4,9 @@
 #include <dbus/dbus.h>
 #include <pcmk-dbus.h>
 
+#define BUS_PROPERTY_IFACE "org.freedesktop.DBus.Properties"
+
+
 static bool pcmk_dbus_error_check(DBusError *err, const char *prefix, const char *function, int line) 
 {
     if (err && dbus_error_is_set(err)) {
@@ -43,36 +46,18 @@ bool pcmk_dbus_append_arg(DBusMessage *msg, int dtype, const void *value)
     return TRUE;
 }
 
-DBusMessage *pcmk_dbus_send_recv(DBusMessage *msg, DBusConnection *connection, char **e)
+bool
+pcmk_dbus_find_error(const char *method, DBusPendingCall* pending, DBusMessage *reply, DBusError *ret)
 {
     DBusError error;
-    const char *method = NULL;
-    DBusMessage *reply = NULL;
-    DBusPendingCall* pending = NULL;
 
     dbus_error_init(&error);
 
-    CRM_ASSERT(dbus_message_get_type (msg) == DBUS_MESSAGE_TYPE_METHOD_CALL);
-    method = dbus_message_get_member (msg);
+    if(pending == NULL) {
+        error.name = "org.clusterlabs.pacemaker.NoRequest";
+        error.message = "No request sent";
 
-    // send message and get a handle for a reply
-    if (!dbus_connection_send_with_reply (connection, msg, &pending, -1)) { // -1 is default timeout
-        crm_err("Send with reply failed");
-        return NULL;
-    }
-    if (NULL == pending) {
-        crm_err("No pending call found");
-        return NULL;
-    }
-
-    dbus_connection_flush(connection);
-
-    /* block until we receive a reply */
-    dbus_pending_call_block(pending);
-
-    /* get the reply message */
-    reply = dbus_pending_call_steal_reply(pending);
-    if(reply == NULL) {
+    } else if(reply == NULL) {
         error.name = "org.clusterlabs.pacemaker.NoReply";
         error.message = "No reply";
 
@@ -80,7 +65,6 @@ DBusMessage *pcmk_dbus_send_recv(DBusMessage *msg, DBusConnection *connection, c
         DBusMessageIter args;
         int dtype = dbus_message_get_type(reply);
 
-
         switch(dtype) {
             case DBUS_MESSAGE_TYPE_METHOD_RETURN:
                 dbus_message_iter_init(reply, &args);
@@ -104,7 +88,7 @@ DBusMessage *pcmk_dbus_send_recv(DBusMessage *msg, DBusConnection *connection, c
 
             case DBUS_MESSAGE_TYPE_ERROR:
                 dbus_set_error_from_message (&error, reply);
-                crm_err("%s error '%s': %s", method, error.name, error.message);
+                crm_info("%s error '%s': %s", method, error.name, error.message);
                 break;
             default:
                 error.message = "Unknown reply type";
@@ -113,23 +97,86 @@ DBusMessage *pcmk_dbus_send_recv(DBusMessage *msg, DBusConnection *connection, c
         }
     }
 
-    if(error.name) {
-        if(e) {
-            *e = strdup(error.name);
+    if(ret && (error.name || error.message)) {
+        *ret = error;
+        return TRUE;
+    }
+
+    return FALSE;
+}
+
+DBusMessage *pcmk_dbus_send_recv(DBusMessage *msg, DBusConnection *connection, DBusError *error)
+{
+    const char *method = NULL;
+    DBusMessage *reply = NULL;
+    DBusPendingCall* pending = NULL;
+
+    CRM_ASSERT(dbus_message_get_type (msg) == DBUS_MESSAGE_TYPE_METHOD_CALL);
+    method = dbus_message_get_member (msg);
+
+    // send message and get a handle for a reply
+    if (!dbus_connection_send_with_reply (connection, msg, &pending, -1)) { // -1 is default timeout
+        if(error) {
+            error->message = "Call to dbus_connection_send_with_reply() failed";
+            error->name = "org.clusterlabs.pacemaker.SendFailed";
         }
+        crm_err("Error sending %s request", method);
+        return NULL;
+    }
+
+    dbus_connection_flush(connection);
+
+    if(pending) {
+        /* block until we receive a reply */
+        dbus_pending_call_block(pending);
+
+        /* get the reply message */
+        reply = dbus_pending_call_steal_reply(pending);
+    }
+
+    if(pcmk_dbus_find_error(method, pending, reply, error)) {
+        crm_trace("Was error: '%s' '%s'", error->name, error->message);
         if(reply) {
             dbus_message_unref(reply);
             reply = NULL;
         }
-    } else if(e) {
-        *e = NULL;
+    }
+    crm_trace("Was error: '%s' '%s'", error->name, error->message);
+
+    if(pending) {
+        /* free the pending message handle */
+        dbus_pending_call_unref(pending);
     }
 
-    /* free the pending message handle */
-    dbus_pending_call_unref(pending);
     return reply;
 }
 
+bool pcmk_dbus_send(DBusMessage *msg, DBusConnection *connection,
+                    void(*done)(DBusPendingCall *pending, void *user_data), void *user_data)
+{
+    DBusError error;
+    const char *method = NULL;
+    DBusPendingCall* pending = NULL;
+
+    dbus_error_init(&error);
+
+    CRM_ASSERT(dbus_message_get_type (msg) == DBUS_MESSAGE_TYPE_METHOD_CALL);
+    method = dbus_message_get_member (msg);
+
+    // send message and get a handle for a reply
+    if (!dbus_connection_send_with_reply (connection, msg, &pending, -1)) { // -1 is default timeout
+        crm_err("Send with reply failed for %s", method);
+        return FALSE;
+
+    } else if (pending == NULL) {
+        crm_err("No pending call found for %s", method);
+        return FALSE;
+
+    }
+    CRM_ASSERT(dbus_pending_call_set_notify(pending, done, user_data, NULL));
+    return TRUE;
+}
+
 bool pcmk_dbus_type_check(DBusMessage *msg, DBusMessageIter *field, int expected, const char *function, int line)
 {
     int dtype = dbus_message_iter_get_arg_type(field);
@@ -147,8 +194,6 @@ bool pcmk_dbus_type_check(DBusMessage *msg, DBusMessageIter *field, int expected
     return TRUE;
 }
 
-#define BUS_PROPERTY_IFACE "org.freedesktop.DBus.Properties"
-
 char *
 pcmk_dbus_get_property(
     DBusConnection *connection, const char *target, const char *obj, const gchar * iface, const char *name)
@@ -160,10 +205,11 @@ pcmk_dbus_get_property(
     /* DBusBasicValue value; */
     const char *method = "GetAll";
     char *output = NULL;
-    char *error = NULL;
+    DBusError error;
 
         /* desc = systemd_unit_property(path, BUS_NAME ".Unit", "Description"); */
 
+    dbus_error_init(&error);
     crm_info("Calling: %s on %s", method, target);
     msg = dbus_message_new_method_call(target, // target for the method call
                                        obj, // object to call on
@@ -180,7 +226,7 @@ pcmk_dbus_get_property(
     reply = pcmk_dbus_send_recv(msg, connection, &error);
     dbus_message_unref(msg);
 
-    if(reply == NULL) {
+    if(error.name) {
         crm_err("Call to %s for %s failed: No reply", method, iface);
         return NULL;
 
@@ -242,20 +288,105 @@ pcmk_dbus_get_property(
     return output;
 }
 
+static void pcmk_dbus_connection_dispatch(DBusConnection *connection, DBusDispatchStatus new_status, void *data){
+    crm_trace("status %d for %p", new_status, data);
+    if (new_status == DBUS_DISPATCH_DATA_REMAINS){
+        dbus_connection_dispatch(connection);
+    }
+}
+
+static int
+pcmk_dbus_watch_dispatch(gpointer userdata)
+{
+    DBusWatch *watch = userdata;
+    int flags = dbus_watch_get_flags(watch);
 
+    crm_trace("Dispatching %p with flags %d", watch, flags);
+    if(flags & DBUS_WATCH_READABLE) {
+        dbus_watch_handle(watch, DBUS_WATCH_READABLE);
+    } else {
+        dbus_watch_handle(watch, DBUS_WATCH_ERROR);
+    }
+    return 0;
+}
+
+static void
+pcmk_dbus_watch_destroy(gpointer userdata)
+{
+    crm_trace("Destroyed %p", userdata);
+}
 
 
+struct mainloop_fd_callbacks pcmk_dbus_cb = {
+    .dispatch = pcmk_dbus_watch_dispatch,
+    .destroy = pcmk_dbus_watch_destroy,
+};
 
-int dbus_watch_get_unix_fd	(	DBusWatch * 	watch	);
+static dbus_bool_t
+pcmk_dbus_watch_add(DBusWatch *watch, void *data){
+    int fd = dbus_watch_get_unix_fd(watch);
 
+    mainloop_io_t *client = mainloop_add_fd(
+        "dbus", G_PRIORITY_DEFAULT, fd, watch, &pcmk_dbus_cb);
 
-/* http://dbus.freedesktop.org/doc/api/html/group__DBusConnection.html#gaebf031eb444b4f847606aa27daa3d8e6 */
-    
-DBUS_EXPORT dbus_bool_t dbus_connection_set_watch_functions(
-    DBusConnection * 	connection,
-    DBusAddWatchFunction 	add_function,
-    DBusRemoveWatchFunction 	remove_function,
-    DBusWatchToggledFunction 	toggled_function,
-    void * 	data,
-    DBusFreeFunction 	free_data_function 
-    );
+    crm_trace("Added %p with fd=%d", watch, fd);
+    dbus_watch_set_data(watch, client, NULL);
+    return TRUE;
+}
+
+static void
+pcmk_dbus_watch_remove(DBusWatch *watch, void *data){
+    mainloop_io_t *client = dbus_watch_get_data(watch);
+
+    crm_trace("Removed %p", watch);
+    mainloop_del_fd(client);
+}
+
+static gboolean
+pcmk_dbus_timeout_dispatch(gpointer data)
+{
+    crm_trace("Timeout for %p");
+    dbus_timeout_handle(data);
+    return FALSE;
+}
+
+static dbus_bool_t
+pcmk_dbus_timeout_add(DBusTimeout *timeout, void *data){
+    guint id = g_timeout_add(dbus_timeout_get_interval(timeout), pcmk_dbus_timeout_dispatch, timeout);
+
+    if(id) {
+        dbus_timeout_set_data(timeout, GUINT_TO_POINTER(id), NULL);
+    }
+    return TRUE;
+}
+
+static void
+pcmk_dbus_timeout_remove(DBusTimeout *timeout, void *data){
+    void *vid = dbus_timeout_get_data(timeout);
+    guint id = GPOINTER_TO_UINT(vid);
+
+    if(id) {
+        g_source_remove(id);
+        dbus_timeout_set_data(timeout, 0, NULL);
+    }
+}
+
+static void
+pcmk_dbus_timeout_toggle(DBusTimeout *timeout, void *data){
+    if(dbus_timeout_get_enabled(timeout)) {
+        pcmk_dbus_timeout_add(timeout, data);
+    } else {
+        pcmk_dbus_timeout_remove(timeout, data);
+    }
+}
+
+/* Inspired by http://www.kolej.mff.cuni.cz/~vesej3am/devel/dbus-select.c */
+
+void pcmk_dbus_connection_setup_with_select(DBusConnection *c){
+	dbus_connection_set_timeout_functions(
+            c, pcmk_dbus_timeout_add, pcmk_dbus_timeout_remove, pcmk_dbus_timeout_toggle, NULL, NULL);
+	dbus_connection_set_watch_functions(c, pcmk_dbus_watch_add, pcmk_dbus_watch_remove, NULL, NULL, NULL);
+	dbus_connection_set_dispatch_status_function(c, pcmk_dbus_connection_dispatch, NULL, NULL);
+
+	pcmk_dbus_connection_dispatch(c, dbus_connection_get_dispatch_status(c), NULL);
+}
diff --git a/lib/services/pcmk-dbus.h b/lib/services/pcmk-dbus.h
index 27ac737..c8d2234 100644
--- a/lib/services/pcmk-dbus.h
+++ b/lib/services/pcmk-dbus.h
@@ -1,7 +1,13 @@
 DBusConnection *pcmk_dbus_connect(void);
+void pcmk_dbus_connection_setup_with_select(DBusConnection *c);
 void pcmk_dbus_disconnect(DBusConnection *connection);
 
-DBusMessage *pcmk_dbus_send_recv(DBusMessage *msg, DBusConnection *connection, char **error);
+bool pcmk_dbus_send(DBusMessage *msg, DBusConnection *connection,
+                    void(*done)(DBusPendingCall *pending, void *user_data), void *user_data);
+DBusMessage *pcmk_dbus_send_recv(DBusMessage *msg, DBusConnection *connection, DBusError *error);
 bool pcmk_dbus_append_arg(DBusMessage *msg, int dtype, const void *value);
 bool pcmk_dbus_type_check(DBusMessage *msg, DBusMessageIter *field, int expected, const char *function, int line);
 char *pcmk_dbus_get_property(DBusConnection *connection, const char *target, const char *obj, const gchar * iface, const char *name);
+
+bool pcmk_dbus_find_error(const char *method, DBusPendingCall* pending, DBusMessage *reply, DBusError *error);
+
diff --git a/lib/services/systemd.c b/lib/services/systemd.c
index 9aa5b03..a06d547 100644
--- a/lib/services/systemd.c
+++ b/lib/services/systemd.c
@@ -69,6 +69,7 @@ systemd_init(void)
     if (need_init) {
         need_init = 0;
         systemd_proxy = pcmk_dbus_connect();
+        pcmk_dbus_connection_setup_with_select(systemd_proxy);
     }
     if (systemd_proxy == NULL) {
         return FALSE;
@@ -122,7 +123,7 @@ systemd_unit_by_name(const gchar * arg_name, gchar ** out_unit)
     DBusMessage *reply = NULL;
     const char *method = "GetUnit";
     char *name = NULL;
-    char *error = NULL;
+    DBusError error;
 
 /*
   <method name="GetUnit">
@@ -144,13 +145,12 @@ systemd_unit_by_name(const gchar * arg_name, gchar ** out_unit)
 
         pcmk_dbus_append_arg(msg, DBUS_TYPE_STRING, &name);
 
+        dbus_error_init(&error);
         reply = pcmk_dbus_send_recv(msg, systemd_proxy, &error);
         dbus_message_unref(msg);
 
-        if(error) {
-            crm_info("Call to %s failed: %s", method, error);
-            free(error);
-            error = NULL;
+        if(error.name) {
+            crm_info("Call to %s failed: %s", method, error.name);
 
         } else if (dbus_message_iter_init(reply, &args)) {
 
@@ -192,7 +192,7 @@ systemd_unit_listall(void)
     DBusMessage *msg = NULL;
     DBusMessage *reply = NULL;
     const char *method = "ListUnits";
-    char *error = NULL;
+    DBusError error;
 
     if (systemd_init() == FALSE) {
         return NULL;
@@ -204,15 +204,15 @@ systemd_unit_listall(void)
         "  </method>\n"                                                 \
 */
 
+    dbus_error_init(&error);
     msg = systemd_new_method(BUS_NAME".Manager", method);
     CRM_ASSERT(msg != NULL);
 
     reply = pcmk_dbus_send_recv(msg, systemd_proxy, &error);
     dbus_message_unref(msg);
 
-    if(error) {
-        crm_err("Call to %s failed: %s", method, error);
-        free(error);
+    if(error.name) {
+        crm_err("Call to %s failed: %s", method, error.name);
         return NULL;
 
     } else if (!dbus_message_iter_init(reply, &args)) {
@@ -318,64 +318,84 @@ systemd_unit_metadata(const char *name)
     return meta;
 }
 
-#if 0
+static bool
+systemd_mask_error(svc_action_t *op, const char *error)
+{
+    crm_trace("Could not issue %s for %s: %s", op->action, op->rsc, error);
+    if(strstr(error, "org.freedesktop.systemd1.InvalidName")
+       || strstr(error, "org.freedesktop.systemd1.LoadFailed")
+       || strstr(error, "org.freedesktop.systemd1.NoSuchUnit")) {
+
+        if (safe_str_eq(op->action, "stop")) {
+            crm_trace("Masking %s failure for %s: unknown services are stopped", op->action, op->rsc);
+            op->rc = PCMK_OCF_OK;
+
+        } else {
+            crm_trace("Mapping %s failure for %s: unknown services are not installed", op->action, op->rsc);
+            op->rc = PCMK_OCF_NOT_INSTALLED;
+            op->status = PCMK_LRM_OP_NOT_INSTALLED;
+        }
+        return TRUE;
+    }
+
+    return FALSE;
+}
+
 static void
-systemd_unit_exec_done(GObject * source_object, GAsyncResult * res, gpointer user_data)
+systemd_async_dispatch(DBusPendingCall *pending, void *user_data)
 {
-    GError *error = NULL;
-    GVariant *_ret = NULL;
+    DBusError error;
+    DBusMessage *reply = NULL;
     svc_action_t *op = user_data;
-    GDBusProxy *proxy = G_DBUS_PROXY(source_object);
 
-    /* Obtain rc and stderr/out */
-    _ret = g_dbus_proxy_call_finish(proxy, res, &error);
+    dbus_error_init(&error);
+    if(pending) {
+        reply = dbus_pending_call_steal_reply(pending);
+    }
+    if(pcmk_dbus_find_error(op->action, pending, reply, &error)) {
 
-    if (error) {
         /* ignore "already started" or "not running" errors */
-        crm_trace("Could not issue %s for %s: %s", op->action, op->rsc, error->message);
-        if (strstr(error->message, "systemd1.LoadFailed")
-            || strstr(error->message, "systemd1.InvalidName")) {
+        if (!systemd_mask_error(op, error.name)) {
+            crm_err("%s for %s: %s", op->action, op->rsc, error.message);
+        }
 
-            if (safe_str_eq(op->action, "stop")) {
-                crm_trace("Masking Stop failure for %s: unknown services are stopped", op->rsc);
-                op->rc = PCMK_OCF_OK;
+    } else {
+        DBusMessageIter args;
 
-            } else {
-                op->rc = PCMK_OCF_NOT_INSTALLED;
-                op->status = PCMK_LRM_OP_NOT_INSTALLED;
-            }
+        if(!dbus_message_iter_init(reply, &args)) {
+            crm_err("Call to %s failed: no arguments", op->action);
 
-        } else {
-            crm_err("Could not issue %s for %s: %s", op->action, op->rsc, error->message);
-        }
-        g_error_free(error);
-
-    } else if(g_variant_is_of_type (_ret, G_VARIANT_TYPE("(o)"))) {
-        char *path = NULL;
+        } else if(!pcmk_dbus_type_check(reply, &args, DBUS_TYPE_OBJECT_PATH, __FUNCTION__, __LINE__)) {
+            crm_warn("Call to %s passed but return type was unexpected", op->action);
+            op->rc = PCMK_OCF_OK;
 
-        g_variant_get(_ret, "(o)", &path);
-        crm_info("Call to %s passed: type '%s' %s", op->action, g_variant_get_type_string(_ret),
-                 path);
-        op->rc = PCMK_OCF_OK;
+        } else {
+            const char *path = NULL;
 
-    } else {
-        crm_err("Call to %s passed but return type was '%s' not '(o)'", op->action, g_variant_get_type_string(_ret));
-        op->rc = PCMK_OCF_OK;
+            dbus_message_get_args (reply, NULL,
+                                   DBUS_TYPE_OBJECT_PATH, &path,
+                                   DBUS_TYPE_INVALID);
+            crm_info("Call to %s passed: %s", op->action, path);
+            op->rc = PCMK_OCF_OK;
+        }
     }
 
     operation_finalize(op);
-    if (_ret) {
-        g_variant_unref(_ret);
+
+    if(pending) {
+        dbus_pending_call_unref(pending);
+    }
+    if(reply) {
+        dbus_message_unref(reply);
     }
 }
-#endif
 
 #define SYSTEMD_OVERRIDE_ROOT "/run/systemd/system/"
 
 gboolean
 systemd_unit_exec(svc_action_t * op, gboolean synchronous)
 {
-    char *error = NULL;
+    DBusError error;
     char *unit = NULL;
     const char *replace_s = "replace";
     gboolean pass = FALSE;
@@ -384,7 +404,8 @@ systemd_unit_exec(svc_action_t * op, gboolean synchronous)
     DBusMessage *msg = NULL;
     DBusMessage *reply = NULL;
     DBusMessageIter args;
-    
+
+    dbus_error_init(&error);
     op->rc = PCMK_OCF_UNKNOWN_ERROR;
     CRM_ASSERT(systemd_init());
 
@@ -465,16 +486,6 @@ systemd_unit_exec(svc_action_t * op, gboolean synchronous)
 
     crm_debug("Calling %s for %s: %s", method, op->rsc, unit);
 
-#if 0
-    if (synchronous == FALSE) {
-        g_dbus_proxy_call(systemd_proxy, method, g_variant_new("(ss)", name, "replace"),
-                          G_DBUS_CALL_FLAGS_NONE, op->timeout, NULL, systemd_unit_exec_done, op);
-        free(unit);
-        free(name);
-        return TRUE;
-    }
-#endif
-
     msg = systemd_new_method(BUS_NAME".Manager", method);
     CRM_ASSERT(msg != NULL);
 
@@ -482,43 +493,47 @@ systemd_unit_exec(svc_action_t * op, gboolean synchronous)
     pcmk_dbus_append_arg(msg, DBUS_TYPE_STRING, &name);
     pcmk_dbus_append_arg(msg, DBUS_TYPE_STRING, &replace_s);
 
+    if (synchronous == FALSE) {
+        free(unit);
+        free(name);
+        return pcmk_dbus_send(msg, systemd_proxy, systemd_async_dispatch, op);
+    }
+
     reply = pcmk_dbus_send_recv(msg, systemd_proxy, &error);
-    dbus_message_unref(msg);
 
-    if(error) {
+    if(error.name) {
         /* ignore "already started" or "not running" errors */
-        if (safe_str_eq(op->action, "stop")
-            && (strstr(error, "org.freedesktop.systemd1.InvalidName")
-                || strstr(error, "org.freedesktop.systemd1.NoSuchUnit"))) {
-            crm_trace("Masking Stop failure for %s: unknown services are stopped", op->rsc);
-            op->rc = PCMK_OCF_OK;
-        } else {
-            crm_err("Could not issue %s for %s: %s (%s)", method, op->rsc, error, unit);
+        if(!systemd_mask_error(op, error.name)) {
+            crm_err("Could not issue %s for %s: %s (%s)", method, op->rsc, error.name, unit);
         }
         goto cleanup;
 
     } else if(!dbus_message_iter_init(reply, &args)) {
         crm_err("Call to %s failed: no arguments", method);
         goto cleanup;
-    }
 
-    /* (o) */
-    if(!pcmk_dbus_type_check(reply, &args, DBUS_TYPE_OBJECT_PATH, __FUNCTION__, __LINE__)) {
-        crm_err("Call to %s failed: Message has invalid arguments", method);
+    } else if(!pcmk_dbus_type_check(reply, &args, DBUS_TYPE_OBJECT_PATH, __FUNCTION__, __LINE__)) {
+        crm_warn("Call to %s passed but return type was unexpected", op->action);
+        op->rc = PCMK_OCF_OK;
 
     } else {
-        DBusBasicValue value;
+        const char *path = NULL;
 
-        dbus_message_iter_get_basic(&args, &value);
-        crm_info("Call to %s passed: %s", op->action, value.str);
+        dbus_message_get_args (reply, NULL,
+                               DBUS_TYPE_OBJECT_PATH, &path,
+                               DBUS_TYPE_INVALID);
+        crm_info("Call to %s passed: %s", op->action, path);
         op->rc = PCMK_OCF_OK;
     }
 
   cleanup:
-    free(error);
     free(unit);
     free(name);
 
+    if(msg) {
+        dbus_message_unref(msg);
+    }
+
     if(reply) {
         dbus_message_unref(reply);
     }