From: Alexey Lebedeff Date: Wed, 9 Mar 2016 18:09:04 +0300 Subject: [PATCH] Avoid RPC roundtrips in list commands Current implementation of various `list_XXX` commands require cross-node roundtrip for every processed item - because `rabbitmqctl` target node is responsible for gathering global list of all items of interest (channels etc.) and then processing them one by one. For example, listing 10000 channels evenly distributed across 3 nodes where network has 1ms delay takes more than 10 seconds on my machine. And with the proposed change listing will take almost the same time as it'll take to gather this info locally. E.g. in the case above listing now takes 0.7 second on the same machine with same 1ms delay. It works by invoking emitting_map on every node, where it should send info about only local items to aggregator, in an async fashion - as no reply from aggregator is needed. diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index fb3da21..1a98dcc 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -23,7 +23,7 @@ sync_queue/1, cancel_sync_queue/1, become/1, purge_queue/1]). --import(rabbit_misc, [rpc_call/4, rpc_call/5, rpc_call/7]). +-import(rabbit_misc, [rpc_call/4, rpc_call/5]). -define(EXTERNAL_CHECK_INTERVAL, 1000). @@ -589,56 +589,74 @@ action(purge_queue, Node, [Q], Opts, Inform, Timeout) -> action(list_users, Node, [], _Opts, Inform, Timeout) -> Inform("Listing users", []), - call(Node, {rabbit_auth_backend_internal, list_users, []}, - rabbit_auth_backend_internal:user_info_keys(), true, Timeout); + call_emitter(Node, {rabbit_auth_backend_internal, list_users, []}, + rabbit_auth_backend_internal:user_info_keys(), + [{timeout, Timeout}, to_bin_utf8]); action(list_permissions, Node, [], Opts, Inform, Timeout) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Listing permissions in vhost \"~s\"", [VHost]), - call(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]}, - rabbit_auth_backend_internal:vhost_perms_info_keys(), true, Timeout, - true); + call_emitter(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]}, + rabbit_auth_backend_internal:vhost_perms_info_keys(), + [{timeout, Timeout}, to_bin_utf8, is_escaped]); action(list_parameters, Node, [], Opts, Inform, Timeout) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), Inform("Listing runtime parameters", []), - call(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]}, - rabbit_runtime_parameters:info_keys(), Timeout); + call_emitter(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]}, + rabbit_runtime_parameters:info_keys(), + [{timeout, Timeout}]); action(list_policies, Node, [], Opts, Inform, Timeout) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), Inform("Listing policies", []), - call(Node, {rabbit_policy, list_formatted, [VHostArg]}, - rabbit_policy:info_keys(), Timeout); + call_emitter(Node, {rabbit_policy, list_formatted, [VHostArg]}, + rabbit_policy:info_keys(), + [{timeout, Timeout}]); action(list_vhosts, Node, Args, _Opts, Inform, Timeout) -> Inform("Listing vhosts", []), ArgAtoms = default_if_empty(Args, [name]), - call(Node, {rabbit_vhost, info_all, []}, ArgAtoms, true, Timeout); + call_emitter(Node, {rabbit_vhost, info_all, []}, ArgAtoms, + [{timeout, Timeout}, to_bin_utf8]); action(list_user_permissions, _Node, _Args = [], _Opts, _Inform, _Timeout) -> {error_string, "list_user_permissions expects a username argument, but none provided."}; action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) -> Inform("Listing permissions for user ~p", Args), - call(Node, {rabbit_auth_backend_internal, list_user_permissions, Args}, - rabbit_auth_backend_internal:user_perms_info_keys(), true, Timeout, - true); + call_emitter(Node, {rabbit_auth_backend_internal, list_user_permissions, Args}, + rabbit_auth_backend_internal:user_perms_info_keys(), + [{timeout, Timeout}, to_bin_utf8, is_escaped]); action(list_queues, Node, Args, Opts, Inform, Timeout) -> - [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]), Inform("Listing queues", []), + %% User options + [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), ArgAtoms = default_if_empty(Args, [name, messages]), - call(Node, {rabbit_amqqueue, info_all, [VHostArg, ArgAtoms, Online, Offline]}, - ArgAtoms, Timeout); + + %% Data for emission + Nodes = nodes_in_cluster(Node, Timeout), + OnlineChunks = if Online -> length(Nodes); true -> 0 end, + OfflineChunks = if Offline -> 1; true -> 0 end, + ChunksOpt = {chunks, OnlineChunks + OfflineChunks}, + TimeoutOpt = {timeout, Timeout}, + EmissionRef = make_ref(), + EmissionRefOpt = {ref, EmissionRef}, + + _ = Online andalso start_emission(Node, {rabbit_amqqueue, emit_info_all, [Nodes, VHostArg, ArgAtoms]}, + [TimeoutOpt, EmissionRefOpt]), + _ = Offline andalso start_emission(Node, {rabbit_amqqueue, emit_info_down, [VHostArg, ArgAtoms]}, + [TimeoutOpt, EmissionRefOpt]), + display_emission_result(EmissionRef, ArgAtoms, [ChunksOpt, TimeoutOpt]); action(list_exchanges, Node, Args, Opts, Inform, Timeout) -> Inform("Listing exchanges", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), ArgAtoms = default_if_empty(Args, [name, type]), - call(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]}, - ArgAtoms, Timeout); + call_emitter(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]}, + ArgAtoms, [{timeout, Timeout}]); action(list_bindings, Node, Args, Opts, Inform, Timeout) -> Inform("Listing bindings", []), @@ -646,27 +664,31 @@ action(list_bindings, Node, Args, Opts, Inform, Timeout) -> ArgAtoms = default_if_empty(Args, [source_name, source_kind, destination_name, destination_kind, routing_key, arguments]), - call(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]}, - ArgAtoms, Timeout); + call_emitter(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]}, + ArgAtoms, [{timeout, Timeout}]); action(list_connections, Node, Args, _Opts, Inform, Timeout) -> Inform("Listing connections", []), ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]), - call(Node, {rabbit_networking, connection_info_all, [ArgAtoms]}, - ArgAtoms, Timeout); + Nodes = nodes_in_cluster(Node, Timeout), + call_emitter(Node, {rabbit_networking, emit_connection_info_all, [Nodes, ArgAtoms]}, + ArgAtoms, [{timeout, Timeout}, {chunks, length(Nodes)}]); action(list_channels, Node, Args, _Opts, Inform, Timeout) -> Inform("Listing channels", []), ArgAtoms = default_if_empty(Args, [pid, user, consumer_count, messages_unacknowledged]), - call(Node, {rabbit_channel, info_all, [ArgAtoms]}, - ArgAtoms, Timeout); + Nodes = nodes_in_cluster(Node, Timeout), + call_emitter(Node, {rabbit_channel, emit_info_all, [Nodes, ArgAtoms]}, ArgAtoms, + [{timeout, Timeout}, {chunks, length(Nodes)}]); action(list_consumers, Node, _Args, Opts, Inform, Timeout) -> Inform("Listing consumers", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - call(Node, {rabbit_amqqueue, consumers_all, [VHostArg]}, - rabbit_amqqueue:consumer_info_keys(), Timeout). + Nodes = nodes_in_cluster(Node, Timeout), + call_emitter(Node, {rabbit_amqqueue, emit_consumers_all, [Nodes, VHostArg]}, + rabbit_amqqueue:consumer_info_keys(), + [{timeout, Timeout}, {chunks, length(Nodes)}]). format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)). @@ -772,17 +794,18 @@ display_info_message_row(IsEscaped, Result, InfoItemKeys) -> {X, Value} -> Value end, IsEscaped) || X <- InfoItemKeys]). -display_info_message(IsEscaped) -> +display_info_message(IsEscaped, InfoItemKeys) -> fun ([], _) -> ok; - ([FirstResult|_] = List, InfoItemKeys) when is_list(FirstResult) -> + ([FirstResult|_] = List, _) when is_list(FirstResult) -> lists:foreach(fun(Result) -> display_info_message_row(IsEscaped, Result, InfoItemKeys) end, List), ok; - (Result, InfoItemKeys) -> - display_info_message_row(IsEscaped, Result, InfoItemKeys) + (Result, _) -> + display_info_message_row(IsEscaped, Result, InfoItemKeys), + ok end. display_info_list(Results, InfoItemKeys) when is_list(Results) -> @@ -839,7 +862,10 @@ display_call_result(Node, MFA) -> end. unsafe_rpc(Node, Mod, Fun, Args) -> - case rpc_call(Node, Mod, Fun, Args) of + unsafe_rpc(Node, Mod, Fun, Args, ?RPC_TIMEOUT). + +unsafe_rpc(Node, Mod, Fun, Args, Timeout) -> + case rpc_call(Node, Mod, Fun, Args, Timeout) of {badrpc, _} = Res -> throw(Res); Normal -> Normal end. @@ -858,33 +884,42 @@ ensure_app_running(Node) -> call(Node, {Mod, Fun, Args}) -> rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)). -call(Node, {Mod, Fun, Args}, InfoKeys, Timeout) -> - call(Node, {Mod, Fun, Args}, InfoKeys, false, Timeout, false). +call_emitter(Node, {Mod, Fun, Args}, InfoKeys, Opts) -> + Ref = start_emission(Node, {Mod, Fun, Args}, Opts), + display_emission_result(Ref, InfoKeys, Opts). + +start_emission(Node, {Mod, Fun, Args}, Opts) -> + ToBinUtf8 = proplists:get_value(to_bin_utf8, Opts, false), + Timeout = proplists:get_value(timeout, Opts, infinity), + Ref = proplists:get_value(ref, Opts, make_ref()), + rabbit_control_misc:spawn_emitter_caller( + Node, Mod, Fun, prepare_call_args(Args, ToBinUtf8), + Ref, self(), Timeout), + Ref. + +display_emission_result(Ref, InfoKeys, Opts) -> + IsEscaped = proplists:get_value(is_escaped, Opts, false), + Chunks = proplists:get_value(chunks, Opts, 1), + Timeout = proplists:get_value(timeout, Opts, infinity), + EmissionStatus = rabbit_control_misc:wait_for_info_messages( + self(), Ref, display_info_message(IsEscaped, InfoKeys), ok, Timeout, Chunks), + emission_to_action_result(EmissionStatus). + +%% Convert rabbit_control_misc:wait_for_info_messages/6 return value +%% into form expected by rabbit_cli:main/3. +emission_to_action_result({ok, ok}) -> + ok; +emission_to_action_result({error, Error}) -> + Error. -call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout) -> - call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, false). +prepare_call_args(Args, ToBinUtf8) -> + case ToBinUtf8 of + true -> valid_utf8_args(Args); + false -> Args + end. -call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, IsEscaped) -> - Args0 = case ToBinUtf8 of - true -> lists:map(fun list_to_binary_utf8/1, Args); - false -> Args - end, - Ref = make_ref(), - Pid = self(), - spawn_link( - fun () -> - case rabbit_cli:rpc_call(Node, Mod, Fun, Args0, - Ref, Pid, Timeout) of - {error, _} = Error -> - Pid ! {error, Error}; - {bad_argument, _} = Error -> - Pid ! {error, Error}; - _ -> - ok - end - end), - rabbit_control_misc:wait_for_info_messages( - Pid, Ref, InfoKeys, display_info_message(IsEscaped), Timeout). +valid_utf8_args(Args) -> + lists:map(fun list_to_binary_utf8/1, Args). list_to_binary_utf8(L) -> B = list_to_binary(L), @@ -934,7 +969,10 @@ split_list([_]) -> exit(even_list_needed); split_list([A, B | T]) -> [{A, B} | split_list(T)]. nodes_in_cluster(Node) -> - unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]). + unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], ?RPC_TIMEOUT). + +nodes_in_cluster(Node, Timeout) -> + unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], Timeout). alarms_by_node(Name) -> Status = unsafe_rpc(Name, rabbit, status, []),