Blame rabbitmq-server-0005-Avoid-RPC-roundtrips-in-list-commands.patch

5b2912
From: Alexey Lebedeff <alebedev@mirantis.com>
5b2912
Date: Wed, 9 Mar 2016 18:09:04 +0300
5b2912
Subject: [PATCH] Avoid RPC roundtrips in list commands
5b2912
5b2912
Current implementation of various `list_XXX` commands require cross-node
5b2912
roundtrip for every processed item - because `rabbitmqctl` target node
5b2912
is responsible for gathering global list of all items of
5b2912
interest (channels etc.) and then processing them one by one.
5b2912
5b2912
For example, listing 10000 channels evenly distributed across 3 nodes
5b2912
where network has 1ms delay takes more than 10 seconds on my
5b2912
machine. And with the proposed change listing will take almost the same
5b2912
time as it'll take to gather this info locally. E.g. in the case above
5b2912
listing now takes 0.7 second on the same machine with same 1ms delay.
5b2912
5b2912
It works by invoking emitting_map on every node, where it should send
5b2912
info about only local items to aggregator, in an async fashion - as no
5b2912
reply from aggregator is needed.
5b2912
5b2912
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
5b2912
index fb3da21..1a98dcc 100644
5b2912
--- a/src/rabbit_control_main.erl
5b2912
+++ b/src/rabbit_control_main.erl
5b2912
@@ -23,7 +23,7 @@
5b2912
          sync_queue/1, cancel_sync_queue/1, become/1,
5b2912
          purge_queue/1]).
5b2912
 
5b2912
--import(rabbit_misc, [rpc_call/4, rpc_call/5, rpc_call/7]).
5b2912
+-import(rabbit_misc, [rpc_call/4, rpc_call/5]).
5b2912
 
5b2912
 -define(EXTERNAL_CHECK_INTERVAL, 1000).
5b2912
 
5b2912
@@ -589,56 +589,74 @@ action(purge_queue, Node, [Q], Opts, Inform, Timeout) ->
5b2912
 
5b2912
 action(list_users, Node, [], _Opts, Inform, Timeout) ->
5b2912
     Inform("Listing users", []),
5b2912
-    call(Node, {rabbit_auth_backend_internal, list_users, []},
5b2912
-         rabbit_auth_backend_internal:user_info_keys(), true, Timeout);
5b2912
+    call_emitter(Node, {rabbit_auth_backend_internal, list_users, []},
5b2912
+                 rabbit_auth_backend_internal:user_info_keys(),
5b2912
+                 [{timeout, Timeout}, to_bin_utf8]);
5b2912
 
5b2912
 action(list_permissions, Node, [], Opts, Inform, Timeout) ->
5b2912
     VHost = proplists:get_value(?VHOST_OPT, Opts),
5b2912
     Inform("Listing permissions in vhost \"~s\"", [VHost]),
5b2912
-    call(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]},
5b2912
-         rabbit_auth_backend_internal:vhost_perms_info_keys(), true, Timeout,
5b2912
-         true);
5b2912
+    call_emitter(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]},
5b2912
+                 rabbit_auth_backend_internal:vhost_perms_info_keys(),
5b2912
+                 [{timeout, Timeout}, to_bin_utf8, is_escaped]);
5b2912
 
5b2912
 action(list_parameters, Node, [], Opts, Inform, Timeout) ->
5b2912
     VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
5b2912
     Inform("Listing runtime parameters", []),
5b2912
-    call(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]},
5b2912
-         rabbit_runtime_parameters:info_keys(), Timeout);
5b2912
+    call_emitter(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]},
5b2912
+                 rabbit_runtime_parameters:info_keys(),
5b2912
+                 [{timeout, Timeout}]);
5b2912
 
5b2912
 action(list_policies, Node, [], Opts, Inform, Timeout) ->
5b2912
     VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
5b2912
     Inform("Listing policies", []),
5b2912
-    call(Node, {rabbit_policy, list_formatted, [VHostArg]},
5b2912
-         rabbit_policy:info_keys(), Timeout);
5b2912
+    call_emitter(Node, {rabbit_policy, list_formatted, [VHostArg]},
5b2912
+                 rabbit_policy:info_keys(),
5b2912
+                 [{timeout, Timeout}]);
5b2912
 
5b2912
 action(list_vhosts, Node, Args, _Opts, Inform, Timeout) ->
5b2912
     Inform("Listing vhosts", []),
5b2912
     ArgAtoms = default_if_empty(Args, [name]),
5b2912
-    call(Node, {rabbit_vhost, info_all, []}, ArgAtoms, true, Timeout);
5b2912
+    call_emitter(Node, {rabbit_vhost, info_all, []}, ArgAtoms,
5b2912
+                 [{timeout, Timeout}, to_bin_utf8]);
5b2912
 
5b2912
 action(list_user_permissions, _Node, _Args = [], _Opts, _Inform, _Timeout) ->
5b2912
     {error_string,
5b2912
      "list_user_permissions expects a username argument, but none provided."};
5b2912
 action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) ->
5b2912
     Inform("Listing permissions for user ~p", Args),
5b2912
-    call(Node, {rabbit_auth_backend_internal, list_user_permissions, Args},
5b2912
-         rabbit_auth_backend_internal:user_perms_info_keys(), true, Timeout,
5b2912
-         true);
5b2912
+    call_emitter(Node, {rabbit_auth_backend_internal, list_user_permissions, Args},
5b2912
+                 rabbit_auth_backend_internal:user_perms_info_keys(),
5b2912
+                 [{timeout, Timeout}, to_bin_utf8, is_escaped]);
5b2912
 
5b2912
 action(list_queues, Node, Args, Opts, Inform, Timeout) ->
5b2912
-    [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
5b2912
     Inform("Listing queues", []),
5b2912
+    %% User options
5b2912
+    [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
5b2912
     VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
5b2912
     ArgAtoms = default_if_empty(Args, [name, messages]),
5b2912
-    call(Node, {rabbit_amqqueue, info_all, [VHostArg, ArgAtoms, Online, Offline]},
5b2912
-         ArgAtoms, Timeout);
5b2912
+
5b2912
+    %% Data for emission
5b2912
+    Nodes = nodes_in_cluster(Node, Timeout),
5b2912
+    OnlineChunks = if Online -> length(Nodes); true -> 0 end,
5b2912
+    OfflineChunks = if Offline -> 1; true -> 0 end,
5b2912
+    ChunksOpt = {chunks, OnlineChunks + OfflineChunks},
5b2912
+    TimeoutOpt = {timeout, Timeout},
5b2912
+    EmissionRef = make_ref(),
5b2912
+    EmissionRefOpt = {ref, EmissionRef},
5b2912
+
5b2912
+    _ = Online andalso start_emission(Node, {rabbit_amqqueue, emit_info_all, [Nodes, VHostArg, ArgAtoms]},
5b2912
+                                      [TimeoutOpt, EmissionRefOpt]),
5b2912
+    _ = Offline andalso start_emission(Node, {rabbit_amqqueue, emit_info_down, [VHostArg, ArgAtoms]},
5b2912
+                                       [TimeoutOpt, EmissionRefOpt]),
5b2912
+    display_emission_result(EmissionRef, ArgAtoms, [ChunksOpt, TimeoutOpt]);
5b2912
 
5b2912
 action(list_exchanges, Node, Args, Opts, Inform, Timeout) ->
5b2912
     Inform("Listing exchanges", []),
5b2912
     VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
5b2912
     ArgAtoms = default_if_empty(Args, [name, type]),
5b2912
-    call(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]},
5b2912
-         ArgAtoms, Timeout);
5b2912
+    call_emitter(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]},
5b2912
+                 ArgAtoms, [{timeout, Timeout}]);
5b2912
 
5b2912
 action(list_bindings, Node, Args, Opts, Inform, Timeout) ->
5b2912
     Inform("Listing bindings", []),
5b2912
@@ -646,27 +664,31 @@ action(list_bindings, Node, Args, Opts, Inform, Timeout) ->
5b2912
     ArgAtoms = default_if_empty(Args, [source_name, source_kind,
5b2912
                                        destination_name, destination_kind,
5b2912
                                        routing_key, arguments]),
5b2912
-    call(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]},
5b2912
-         ArgAtoms, Timeout);
5b2912
+    call_emitter(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]},
5b2912
+                 ArgAtoms, [{timeout, Timeout}]);
5b2912
 
5b2912
 action(list_connections, Node, Args, _Opts, Inform, Timeout) ->
5b2912
     Inform("Listing connections", []),
5b2912
     ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]),
5b2912
-    call(Node, {rabbit_networking, connection_info_all, [ArgAtoms]},
5b2912
-         ArgAtoms, Timeout);
5b2912
+    Nodes = nodes_in_cluster(Node, Timeout),
5b2912
+    call_emitter(Node, {rabbit_networking, emit_connection_info_all, [Nodes, ArgAtoms]},
5b2912
+                 ArgAtoms, [{timeout, Timeout}, {chunks, length(Nodes)}]);
5b2912
 
5b2912
 action(list_channels, Node, Args, _Opts, Inform, Timeout) ->
5b2912
     Inform("Listing channels", []),
5b2912
     ArgAtoms = default_if_empty(Args, [pid, user, consumer_count,
5b2912
                                        messages_unacknowledged]),
5b2912
-    call(Node, {rabbit_channel, info_all, [ArgAtoms]},
5b2912
-         ArgAtoms, Timeout);
5b2912
+    Nodes = nodes_in_cluster(Node, Timeout),
5b2912
+    call_emitter(Node, {rabbit_channel, emit_info_all, [Nodes, ArgAtoms]}, ArgAtoms,
5b2912
+                 [{timeout, Timeout}, {chunks, length(Nodes)}]);
5b2912
 
5b2912
 action(list_consumers, Node, _Args, Opts, Inform, Timeout) ->
5b2912
     Inform("Listing consumers", []),
5b2912
     VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
5b2912
-    call(Node, {rabbit_amqqueue, consumers_all, [VHostArg]},
5b2912
-         rabbit_amqqueue:consumer_info_keys(), Timeout).
5b2912
+    Nodes = nodes_in_cluster(Node, Timeout),
5b2912
+    call_emitter(Node, {rabbit_amqqueue, emit_consumers_all, [Nodes, VHostArg]},
5b2912
+                 rabbit_amqqueue:consumer_info_keys(),
5b2912
+                 [{timeout, Timeout}, {chunks, length(Nodes)}]).
5b2912
 
5b2912
 format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)).
5b2912
 
5b2912
@@ -772,17 +794,18 @@ display_info_message_row(IsEscaped, Result, InfoItemKeys) ->
5b2912
                        {X, Value} -> Value
5b2912
                    end, IsEscaped) || X <- InfoItemKeys]).
5b2912
 
5b2912
-display_info_message(IsEscaped) ->
5b2912
+display_info_message(IsEscaped, InfoItemKeys) ->
5b2912
     fun ([], _) ->
5b2912
             ok;
5b2912
-        ([FirstResult|_] = List, InfoItemKeys) when is_list(FirstResult) ->
5b2912
+        ([FirstResult|_] = List, _) when is_list(FirstResult) ->
5b2912
             lists:foreach(fun(Result) ->
5b2912
                                   display_info_message_row(IsEscaped, Result, InfoItemKeys)
5b2912
                           end,
5b2912
                           List),
5b2912
             ok;
5b2912
-        (Result, InfoItemKeys) ->
5b2912
-            display_info_message_row(IsEscaped, Result, InfoItemKeys)
5b2912
+        (Result, _) ->
5b2912
+            display_info_message_row(IsEscaped, Result, InfoItemKeys),
5b2912
+            ok
5b2912
     end.
5b2912
 
5b2912
 display_info_list(Results, InfoItemKeys) when is_list(Results) ->
5b2912
@@ -839,7 +862,10 @@ display_call_result(Node, MFA) ->
5b2912
     end.
5b2912
 
5b2912
 unsafe_rpc(Node, Mod, Fun, Args) ->
5b2912
-    case rpc_call(Node, Mod, Fun, Args) of
5b2912
+    unsafe_rpc(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
5b2912
+
5b2912
+unsafe_rpc(Node, Mod, Fun, Args, Timeout) ->
5b2912
+    case rpc_call(Node, Mod, Fun, Args, Timeout) of
5b2912
         {badrpc, _} = Res -> throw(Res);
5b2912
         Normal            -> Normal
5b2912
     end.
5b2912
@@ -858,33 +884,42 @@ ensure_app_running(Node) ->
5b2912
 call(Node, {Mod, Fun, Args}) ->
5b2912
     rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)).
5b2912
 
5b2912
-call(Node, {Mod, Fun, Args}, InfoKeys, Timeout) ->
5b2912
-    call(Node, {Mod, Fun, Args}, InfoKeys, false, Timeout, false).
5b2912
+call_emitter(Node, {Mod, Fun, Args}, InfoKeys, Opts) ->
5b2912
+    Ref = start_emission(Node, {Mod, Fun, Args}, Opts),
5b2912
+    display_emission_result(Ref, InfoKeys, Opts).
5b2912
+
5b2912
+start_emission(Node, {Mod, Fun, Args}, Opts) ->
5b2912
+    ToBinUtf8 = proplists:get_value(to_bin_utf8, Opts, false),
5b2912
+    Timeout = proplists:get_value(timeout, Opts, infinity),
5b2912
+    Ref = proplists:get_value(ref, Opts, make_ref()),
5b2912
+    rabbit_control_misc:spawn_emitter_caller(
5b2912
+      Node, Mod, Fun, prepare_call_args(Args, ToBinUtf8),
5b2912
+      Ref, self(), Timeout),
5b2912
+    Ref.
5b2912
+
5b2912
+display_emission_result(Ref, InfoKeys, Opts) ->
5b2912
+    IsEscaped = proplists:get_value(is_escaped, Opts, false),
5b2912
+    Chunks = proplists:get_value(chunks, Opts, 1),
5b2912
+    Timeout = proplists:get_value(timeout, Opts, infinity),
5b2912
+    EmissionStatus = rabbit_control_misc:wait_for_info_messages(
5b2912
+                       self(), Ref, display_info_message(IsEscaped, InfoKeys), ok, Timeout, Chunks),
5b2912
+    emission_to_action_result(EmissionStatus).
5b2912
+
5b2912
+%% Convert rabbit_control_misc:wait_for_info_messages/6 return value
5b2912
+%% into form expected by rabbit_cli:main/3.
5b2912
+emission_to_action_result({ok, ok}) ->
5b2912
+    ok;
5b2912
+emission_to_action_result({error, Error}) ->
5b2912
+    Error.
5b2912
 
5b2912
-call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout) ->
5b2912
-    call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, false).
5b2912
+prepare_call_args(Args, ToBinUtf8) ->
5b2912
+    case ToBinUtf8 of
5b2912
+        true  -> valid_utf8_args(Args);
5b2912
+        false -> Args
5b2912
+    end.
5b2912
 
5b2912
-call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, IsEscaped) ->
5b2912
-    Args0 = case ToBinUtf8 of
5b2912
-                true  -> lists:map(fun list_to_binary_utf8/1, Args);
5b2912
-                false -> Args
5b2912
-            end,
5b2912
-    Ref = make_ref(),
5b2912
-    Pid = self(),
5b2912
-    spawn_link(
5b2912
-      fun () ->
5b2912
-              case rabbit_cli:rpc_call(Node, Mod, Fun, Args0,
5b2912
-                                       Ref, Pid, Timeout) of
5b2912
-                  {error, _} = Error        ->
5b2912
-                      Pid ! {error, Error};
5b2912
-                  {bad_argument, _} = Error ->
5b2912
-                      Pid ! {error, Error};
5b2912
-                  _                         ->
5b2912
-                      ok
5b2912
-              end
5b2912
-      end),
5b2912
-    rabbit_control_misc:wait_for_info_messages(
5b2912
-      Pid, Ref, InfoKeys, display_info_message(IsEscaped), Timeout).
5b2912
+valid_utf8_args(Args) ->
5b2912
+    lists:map(fun list_to_binary_utf8/1, Args).
5b2912
 
5b2912
 list_to_binary_utf8(L) ->
5b2912
     B = list_to_binary(L),
5b2912
@@ -934,7 +969,10 @@ split_list([_])        -> exit(even_list_needed);
5b2912
 split_list([A, B | T]) -> [{A, B} | split_list(T)].
5b2912
 
5b2912
 nodes_in_cluster(Node) ->
5b2912
-    unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]).
5b2912
+    unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], ?RPC_TIMEOUT).
5b2912
+
5b2912
+nodes_in_cluster(Node, Timeout) ->
5b2912
+    unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], Timeout).
5b2912
 
5b2912
 alarms_by_node(Name) ->
5b2912
     Status = unsafe_rpc(Name, rabbit, status, []),