Blob Blame History Raw
From: Alexey Lebedeff <alebedev@mirantis.com>
Date: Wed, 9 Mar 2016 18:09:04 +0300
Subject: [PATCH] Avoid RPC roundtrips in list commands

Current implementation of various `list_XXX` commands require cross-node
roundtrip for every processed item - because `rabbitmqctl` target node
is responsible for gathering global list of all items of
interest (channels etc.) and then processing them one by one.

For example, listing 10000 channels evenly distributed across 3 nodes
where network has 1ms delay takes more than 10 seconds on my
machine. And with the proposed change listing will take almost the same
time as it'll take to gather this info locally. E.g. in the case above
listing now takes 0.7 second on the same machine with same 1ms delay.

It works by invoking emitting_map on every node, where it should send
info about only local items to aggregator, in an async fashion - as no
reply from aggregator is needed.

diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index fb3da21..1a98dcc 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -23,7 +23,7 @@
          sync_queue/1, cancel_sync_queue/1, become/1,
          purge_queue/1]).
 
--import(rabbit_misc, [rpc_call/4, rpc_call/5, rpc_call/7]).
+-import(rabbit_misc, [rpc_call/4, rpc_call/5]).
 
 -define(EXTERNAL_CHECK_INTERVAL, 1000).
 
@@ -589,56 +589,74 @@ action(purge_queue, Node, [Q], Opts, Inform, Timeout) ->
 
 action(list_users, Node, [], _Opts, Inform, Timeout) ->
     Inform("Listing users", []),
-    call(Node, {rabbit_auth_backend_internal, list_users, []},
-         rabbit_auth_backend_internal:user_info_keys(), true, Timeout);
+    call_emitter(Node, {rabbit_auth_backend_internal, list_users, []},
+                 rabbit_auth_backend_internal:user_info_keys(),
+                 [{timeout, Timeout}, to_bin_utf8]);
 
 action(list_permissions, Node, [], Opts, Inform, Timeout) ->
     VHost = proplists:get_value(?VHOST_OPT, Opts),
     Inform("Listing permissions in vhost \"~s\"", [VHost]),
-    call(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]},
-         rabbit_auth_backend_internal:vhost_perms_info_keys(), true, Timeout,
-         true);
+    call_emitter(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]},
+                 rabbit_auth_backend_internal:vhost_perms_info_keys(),
+                 [{timeout, Timeout}, to_bin_utf8, is_escaped]);
 
 action(list_parameters, Node, [], Opts, Inform, Timeout) ->
     VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
     Inform("Listing runtime parameters", []),
-    call(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]},
-         rabbit_runtime_parameters:info_keys(), Timeout);
+    call_emitter(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]},
+                 rabbit_runtime_parameters:info_keys(),
+                 [{timeout, Timeout}]);
 
 action(list_policies, Node, [], Opts, Inform, Timeout) ->
     VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
     Inform("Listing policies", []),
-    call(Node, {rabbit_policy, list_formatted, [VHostArg]},
-         rabbit_policy:info_keys(), Timeout);
+    call_emitter(Node, {rabbit_policy, list_formatted, [VHostArg]},
+                 rabbit_policy:info_keys(),
+                 [{timeout, Timeout}]);
 
 action(list_vhosts, Node, Args, _Opts, Inform, Timeout) ->
     Inform("Listing vhosts", []),
     ArgAtoms = default_if_empty(Args, [name]),
-    call(Node, {rabbit_vhost, info_all, []}, ArgAtoms, true, Timeout);
+    call_emitter(Node, {rabbit_vhost, info_all, []}, ArgAtoms,
+                 [{timeout, Timeout}, to_bin_utf8]);
 
 action(list_user_permissions, _Node, _Args = [], _Opts, _Inform, _Timeout) ->
     {error_string,
      "list_user_permissions expects a username argument, but none provided."};
 action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) ->
     Inform("Listing permissions for user ~p", Args),
-    call(Node, {rabbit_auth_backend_internal, list_user_permissions, Args},
-         rabbit_auth_backend_internal:user_perms_info_keys(), true, Timeout,
-         true);
+    call_emitter(Node, {rabbit_auth_backend_internal, list_user_permissions, Args},
+                 rabbit_auth_backend_internal:user_perms_info_keys(),
+                 [{timeout, Timeout}, to_bin_utf8, is_escaped]);
 
 action(list_queues, Node, Args, Opts, Inform, Timeout) ->
-    [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
     Inform("Listing queues", []),
+    %% User options
+    [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
     VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
     ArgAtoms = default_if_empty(Args, [name, messages]),
-    call(Node, {rabbit_amqqueue, info_all, [VHostArg, ArgAtoms, Online, Offline]},
-         ArgAtoms, Timeout);
+
+    %% Data for emission
+    Nodes = nodes_in_cluster(Node, Timeout),
+    OnlineChunks = if Online -> length(Nodes); true -> 0 end,
+    OfflineChunks = if Offline -> 1; true -> 0 end,
+    ChunksOpt = {chunks, OnlineChunks + OfflineChunks},
+    TimeoutOpt = {timeout, Timeout},
+    EmissionRef = make_ref(),
+    EmissionRefOpt = {ref, EmissionRef},
+
+    _ = Online andalso start_emission(Node, {rabbit_amqqueue, emit_info_all, [Nodes, VHostArg, ArgAtoms]},
+                                      [TimeoutOpt, EmissionRefOpt]),
+    _ = Offline andalso start_emission(Node, {rabbit_amqqueue, emit_info_down, [VHostArg, ArgAtoms]},
+                                       [TimeoutOpt, EmissionRefOpt]),
+    display_emission_result(EmissionRef, ArgAtoms, [ChunksOpt, TimeoutOpt]);
 
 action(list_exchanges, Node, Args, Opts, Inform, Timeout) ->
     Inform("Listing exchanges", []),
     VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
     ArgAtoms = default_if_empty(Args, [name, type]),
-    call(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]},
-         ArgAtoms, Timeout);
+    call_emitter(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]},
+                 ArgAtoms, [{timeout, Timeout}]);
 
 action(list_bindings, Node, Args, Opts, Inform, Timeout) ->
     Inform("Listing bindings", []),
@@ -646,27 +664,31 @@ action(list_bindings, Node, Args, Opts, Inform, Timeout) ->
     ArgAtoms = default_if_empty(Args, [source_name, source_kind,
                                        destination_name, destination_kind,
                                        routing_key, arguments]),
-    call(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]},
-         ArgAtoms, Timeout);
+    call_emitter(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]},
+                 ArgAtoms, [{timeout, Timeout}]);
 
 action(list_connections, Node, Args, _Opts, Inform, Timeout) ->
     Inform("Listing connections", []),
     ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]),
-    call(Node, {rabbit_networking, connection_info_all, [ArgAtoms]},
-         ArgAtoms, Timeout);
+    Nodes = nodes_in_cluster(Node, Timeout),
+    call_emitter(Node, {rabbit_networking, emit_connection_info_all, [Nodes, ArgAtoms]},
+                 ArgAtoms, [{timeout, Timeout}, {chunks, length(Nodes)}]);
 
 action(list_channels, Node, Args, _Opts, Inform, Timeout) ->
     Inform("Listing channels", []),
     ArgAtoms = default_if_empty(Args, [pid, user, consumer_count,
                                        messages_unacknowledged]),
-    call(Node, {rabbit_channel, info_all, [ArgAtoms]},
-         ArgAtoms, Timeout);
+    Nodes = nodes_in_cluster(Node, Timeout),
+    call_emitter(Node, {rabbit_channel, emit_info_all, [Nodes, ArgAtoms]}, ArgAtoms,
+                 [{timeout, Timeout}, {chunks, length(Nodes)}]);
 
 action(list_consumers, Node, _Args, Opts, Inform, Timeout) ->
     Inform("Listing consumers", []),
     VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
-    call(Node, {rabbit_amqqueue, consumers_all, [VHostArg]},
-         rabbit_amqqueue:consumer_info_keys(), Timeout).
+    Nodes = nodes_in_cluster(Node, Timeout),
+    call_emitter(Node, {rabbit_amqqueue, emit_consumers_all, [Nodes, VHostArg]},
+                 rabbit_amqqueue:consumer_info_keys(),
+                 [{timeout, Timeout}, {chunks, length(Nodes)}]).
 
 format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)).
 
@@ -772,17 +794,18 @@ display_info_message_row(IsEscaped, Result, InfoItemKeys) ->
                        {X, Value} -> Value
                    end, IsEscaped) || X <- InfoItemKeys]).
 
-display_info_message(IsEscaped) ->
+display_info_message(IsEscaped, InfoItemKeys) ->
     fun ([], _) ->
             ok;
-        ([FirstResult|_] = List, InfoItemKeys) when is_list(FirstResult) ->
+        ([FirstResult|_] = List, _) when is_list(FirstResult) ->
             lists:foreach(fun(Result) ->
                                   display_info_message_row(IsEscaped, Result, InfoItemKeys)
                           end,
                           List),
             ok;
-        (Result, InfoItemKeys) ->
-            display_info_message_row(IsEscaped, Result, InfoItemKeys)
+        (Result, _) ->
+            display_info_message_row(IsEscaped, Result, InfoItemKeys),
+            ok
     end.
 
 display_info_list(Results, InfoItemKeys) when is_list(Results) ->
@@ -839,7 +862,10 @@ display_call_result(Node, MFA) ->
     end.
 
 unsafe_rpc(Node, Mod, Fun, Args) ->
-    case rpc_call(Node, Mod, Fun, Args) of
+    unsafe_rpc(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
+
+unsafe_rpc(Node, Mod, Fun, Args, Timeout) ->
+    case rpc_call(Node, Mod, Fun, Args, Timeout) of
         {badrpc, _} = Res -> throw(Res);
         Normal            -> Normal
     end.
@@ -858,33 +884,42 @@ ensure_app_running(Node) ->
 call(Node, {Mod, Fun, Args}) ->
     rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)).
 
-call(Node, {Mod, Fun, Args}, InfoKeys, Timeout) ->
-    call(Node, {Mod, Fun, Args}, InfoKeys, false, Timeout, false).
+call_emitter(Node, {Mod, Fun, Args}, InfoKeys, Opts) ->
+    Ref = start_emission(Node, {Mod, Fun, Args}, Opts),
+    display_emission_result(Ref, InfoKeys, Opts).
+
+start_emission(Node, {Mod, Fun, Args}, Opts) ->
+    ToBinUtf8 = proplists:get_value(to_bin_utf8, Opts, false),
+    Timeout = proplists:get_value(timeout, Opts, infinity),
+    Ref = proplists:get_value(ref, Opts, make_ref()),
+    rabbit_control_misc:spawn_emitter_caller(
+      Node, Mod, Fun, prepare_call_args(Args, ToBinUtf8),
+      Ref, self(), Timeout),
+    Ref.
+
+display_emission_result(Ref, InfoKeys, Opts) ->
+    IsEscaped = proplists:get_value(is_escaped, Opts, false),
+    Chunks = proplists:get_value(chunks, Opts, 1),
+    Timeout = proplists:get_value(timeout, Opts, infinity),
+    EmissionStatus = rabbit_control_misc:wait_for_info_messages(
+                       self(), Ref, display_info_message(IsEscaped, InfoKeys), ok, Timeout, Chunks),
+    emission_to_action_result(EmissionStatus).
+
+%% Convert rabbit_control_misc:wait_for_info_messages/6 return value
+%% into form expected by rabbit_cli:main/3.
+emission_to_action_result({ok, ok}) ->
+    ok;
+emission_to_action_result({error, Error}) ->
+    Error.
 
-call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout) ->
-    call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, false).
+prepare_call_args(Args, ToBinUtf8) ->
+    case ToBinUtf8 of
+        true  -> valid_utf8_args(Args);
+        false -> Args
+    end.
 
-call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, IsEscaped) ->
-    Args0 = case ToBinUtf8 of
-                true  -> lists:map(fun list_to_binary_utf8/1, Args);
-                false -> Args
-            end,
-    Ref = make_ref(),
-    Pid = self(),
-    spawn_link(
-      fun () ->
-              case rabbit_cli:rpc_call(Node, Mod, Fun, Args0,
-                                       Ref, Pid, Timeout) of
-                  {error, _} = Error        ->
-                      Pid ! {error, Error};
-                  {bad_argument, _} = Error ->
-                      Pid ! {error, Error};
-                  _                         ->
-                      ok
-              end
-      end),
-    rabbit_control_misc:wait_for_info_messages(
-      Pid, Ref, InfoKeys, display_info_message(IsEscaped), Timeout).
+valid_utf8_args(Args) ->
+    lists:map(fun list_to_binary_utf8/1, Args).
 
 list_to_binary_utf8(L) ->
     B = list_to_binary(L),
@@ -934,7 +969,10 @@ split_list([_])        -> exit(even_list_needed);
 split_list([A, B | T]) -> [{A, B} | split_list(T)].
 
 nodes_in_cluster(Node) ->
-    unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]).
+    unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], ?RPC_TIMEOUT).
+
+nodes_in_cluster(Node, Timeout) ->
+    unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], Timeout).
 
 alarms_by_node(Name) ->
     Status = unsafe_rpc(Name, rabbit, status, []),