From 5b2912aeb7074fd47e296dd6abb0dc4a6c6bffa5 Mon Sep 17 00:00:00 2001 From: Peter Lemenkov Date: Jul 15 2016 15:08:13 +0000 Subject: Speedups and fixes for IPv6 - Avoid RPC roundtrips in list commands - Use proto_dist from config instead of always using default (inet_tcp) Signed-off-by: Peter Lemenkov --- diff --git a/rabbitmq-common-0001-Avoid-RPC-roundtrips-while-listing-items.patch b/rabbitmq-common-0001-Avoid-RPC-roundtrips-while-listing-items.patch new file mode 100644 index 0000000..48f7365 --- /dev/null +++ b/rabbitmq-common-0001-Avoid-RPC-roundtrips-while-listing-items.patch @@ -0,0 +1,393 @@ +From: Alexey Lebedeff +Date: Wed, 9 Mar 2016 14:55:02 +0300 +Subject: [PATCH] Avoid RPC roundtrips while listing items + +- Emit info about particular items in parallel on every node, with + results delivered directly to a `rabbitmqctl` instance. +- `rabbit_control_misc:wait_for_info_messages/5` can wait for results of + more than one emitting map. +- Stop passing arround InfoItemKeys in + `rabbit_control_misc:wait_for_info_messages/5`, the same information + could be directly encoded in DisplayFun closure. +- Add `emit` to function names, to avoid confusion with regular ones + which return result directly. + +Part of https://github.com/rabbitmq/rabbitmq-server/pull/683 + +diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl +index 6d4e52e..60e94c0 100644 +--- a/src/rabbit_amqqueue.erl ++++ b/src/rabbit_amqqueue.erl +@@ -25,10 +25,10 @@ + check_exclusive_access/2, with_exclusive_access_or_die/3, + stat/1, deliver/2, requeue/3, ack/3, reject/4]). + -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, +- info_all/6]). ++ emit_info_all/5, list_local/1]). + -export([list_down/1]). + -export([force_event_refresh/1, notify_policy_changed/1]). +--export([consumers/1, consumers_all/1, consumers_all/3, consumer_info_keys/0]). ++-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]). + -export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]). + -export([notify_sent/2, notify_sent_queue_down/1, resume/2]). + -export([notify_down_all/2, notify_down_all/3, activate_limit_all/2, credit/5]). +@@ -41,7 +41,8 @@ + + %% internal + -export([internal_declare/2, internal_delete/1, run_backing_queue/3, +- set_ram_duration_target/2, set_maximum_since_use/2]). ++ set_ram_duration_target/2, set_maximum_since_use/2, ++ emit_info_local/4, emit_info_down/4, emit_consumers_local/3]). + + -include("rabbit.hrl"). + -include_lib("stdlib/include/qlc.hrl"). +@@ -117,10 +118,6 @@ + -spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()]. + -spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) -> + [rabbit_types:infos()]. +--spec info_all +- (rabbit_types:vhost(), rabbit_types:info_keys(), boolean(), boolean(), +- reference(), pid()) -> +- 'ok'. + -spec force_event_refresh(reference()) -> 'ok'. + -spec notify_policy_changed(rabbit_types:amqqueue()) -> 'ok'. + -spec consumers(rabbit_types:amqqueue()) -> +@@ -130,7 +127,6 @@ + -spec consumers_all(rabbit_types:vhost()) -> + [{name(), pid(), rabbit_types:ctag(), boolean(), + non_neg_integer(), rabbit_framing:amqp_table()}]. +--spec consumers_all(rabbit_types:vhost(), reference(), pid()) -> 'ok'. + -spec stat(rabbit_types:amqqueue()) -> + {'ok', non_neg_integer(), non_neg_integer()}. + -spec delete_immediately(qpids()) -> 'ok'. +@@ -627,16 +623,23 @@ info_all(VHostPath, Items) -> + map(list(VHostPath), fun (Q) -> info(Q, Items) end) ++ + map(list_down(VHostPath), fun (Q) -> info_down(Q, Items, down) end). + +-info_all(VHostPath, Items, NeedOnline, NeedOffline, Ref, AggregatorPid) -> +- NeedOnline andalso rabbit_control_misc:emitting_map_with_exit_handler( +- AggregatorPid, Ref, fun(Q) -> info(Q, Items) end, list(VHostPath), +- continue), +- NeedOffline andalso rabbit_control_misc:emitting_map_with_exit_handler( +- AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end, +- list_down(VHostPath), +- continue), +- %% Previous maps are incomplete, finalize emission +- rabbit_control_misc:emitting_map(AggregatorPid, Ref, fun(_) -> no_op end, []). ++emit_info_local(VHostPath, Items, Ref, AggregatorPid) -> ++ rabbit_control_misc:emitting_map_with_exit_handler( ++ AggregatorPid, Ref, fun(Q) -> info(Q, Items) end, list_local(VHostPath)). ++ ++emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) -> ++ Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, AggregatorPid]) || Node <- Nodes ], ++ rabbit_control_misc:await_emitters_termination(Pids). ++ ++emit_info_down(VHostPath, Items, Ref, AggregatorPid) -> ++ rabbit_control_misc:emitting_map_with_exit_handler( ++ AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end, ++ list_down(VHostPath)). ++ ++list_local(VHostPath) -> ++ [ Q || #amqqueue{state = State, pid=QPid} = Q <- list(VHostPath), ++ State =/= crashed, ++ node() =:= node(QPid) ]. + + force_event_refresh(Ref) -> + [gen_server2:cast(Q#amqqueue.pid, +@@ -656,12 +659,17 @@ consumers_all(VHostPath) -> + map(list(VHostPath), + fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end)). + +-consumers_all(VHostPath, Ref, AggregatorPid) -> ++emit_consumers_all(Nodes, VHostPath, Ref, AggregatorPid) -> ++ Pids = [ spawn_link(Node, rabbit_amqqueue, emit_consumers_local, [VHostPath, Ref, AggregatorPid]) || Node <- Nodes ], ++ rabbit_control_misc:await_emitters_termination(Pids), ++ ok. ++ ++emit_consumers_local(VHostPath, Ref, AggregatorPid) -> + ConsumerInfoKeys = consumer_info_keys(), + rabbit_control_misc:emitting_map( + AggregatorPid, Ref, + fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end, +- list(VHostPath)). ++ list_local(VHostPath)). + + get_queue_consumer_info(Q, ConsumerInfoKeys) -> + [lists:zip(ConsumerInfoKeys, +diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl +index 53b9fe8..e115b76 100644 +--- a/src/rabbit_channel.erl ++++ b/src/rabbit_channel.erl +@@ -56,7 +56,7 @@ + -export([send_command/2, deliver/4, deliver_reply/2, + send_credit_reply/2, send_drained/2]). + -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1, +- info_all/3]). ++ emit_info_all/4]). + -export([refresh_config_local/0, ready_for_close/1]). + -export([force_event_refresh/1]). + +@@ -64,7 +64,7 @@ + handle_info/2, handle_pre_hibernate/1, prioritise_call/4, + prioritise_cast/3, prioritise_info/3, format_message_queue/2]). + %% Internal +--export([list_local/0, deliver_reply_local/3]). ++-export([list_local/0, emit_info_local/3, deliver_reply_local/3]). + -export([get_vhost/1, get_user/1]). + + -record(ch, { +@@ -220,7 +220,6 @@ + -spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos(). + -spec info_all() -> [rabbit_types:infos()]. + -spec info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()]. +--spec info_all(rabbit_types:info_keys(), reference(), pid()) -> 'ok'. + -spec refresh_config_local() -> 'ok'. + -spec ready_for_close(pid()) -> 'ok'. + -spec force_event_refresh(reference()) -> 'ok'. +@@ -326,9 +325,16 @@ info_all() -> + info_all(Items) -> + rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). + +-info_all(Items, Ref, AggregatorPid) -> ++emit_info_all(Nodes, Items, Ref, AggregatorPid) -> ++ Pids = [ spawn_link(Node, rabbit_channel, emit_info_local, [Items, Ref, AggregatorPid]) || Node <- Nodes ], ++ rabbit_control_misc:await_emitters_termination(Pids). ++ ++emit_info_local(Items, Ref, AggregatorPid) -> ++ emit_info(list_local(), Items, Ref, AggregatorPid). ++ ++emit_info(PidList, InfoItems, Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map_with_exit_handler( +- AggregatorPid, Ref, fun(C) -> info(C, Items) end, list()). ++ AggregatorPid, Ref, fun(C) -> info(C, InfoItems) end, PidList). + + refresh_config_local() -> + rabbit_misc:upmap( +diff --git a/src/rabbit_control_misc.erl b/src/rabbit_control_misc.erl +index 0d2de1f..2e7a16c 100644 +--- a/src/rabbit_control_misc.erl ++++ b/src/rabbit_control_misc.erl +@@ -17,7 +17,8 @@ + -module(rabbit_control_misc). + + -export([emitting_map/4, emitting_map/5, emitting_map_with_exit_handler/4, +- emitting_map_with_exit_handler/5, wait_for_info_messages/5, ++ emitting_map_with_exit_handler/5, wait_for_info_messages/6, ++ spawn_emitter_caller/7, await_emitters_termination/1, + print_cmd_result/2]). + + -spec emitting_map(pid(), reference(), fun(), list()) -> 'ok'. +@@ -25,7 +26,14 @@ + -spec emitting_map_with_exit_handler + (pid(), reference(), fun(), list()) -> 'ok'. + -spec emitting_map_with_exit_handler +- (pid(), reference(), fun(), list(), atom()) -> 'ok'. ++ (pid(), reference(), fun(), list(), 'continue') -> 'ok'. ++ ++-type fold_fun() :: fun ((term(), term()) -> term()). ++ ++-spec wait_for_info_messages (pid(), reference(), fold_fun(), term(), timeout(), non_neg_integer()) -> {'ok', term()} | {'error', term()}. ++-spec spawn_emitter_caller (node(), module(), atom(), [term()], reference(), pid(), timeout()) -> 'ok'. ++-spec await_emitters_termination ([pid()]) -> 'ok'. ++ + -spec print_cmd_result(atom(), term()) -> 'ok'. + + emitting_map(AggregatorPid, Ref, Fun, List) -> +@@ -65,27 +73,108 @@ step_with_exit_handler(AggregatorPid, Ref, Fun, Item) -> + ok + end. + +-wait_for_info_messages(Pid, Ref, ArgAtoms, DisplayFun, Timeout) -> +- _ = notify_if_timeout(Pid, Ref, Timeout), +- wait_for_info_messages(Ref, ArgAtoms, DisplayFun). ++%% Invokes RPC for async info collection in separate (but linked to ++%% the caller) process. Separate process waits for RPC to finish and ++%% in case of errors sends them in wait_for_info_messages/5-compatible ++%% form to aggregator process. Calling process is then expected to ++%% do blocking call of wait_for_info_messages/5. ++%% ++%% Remote function MUST use calls to emitting_map/4 (and other ++%% emitting_map's) to properly deliver requested information to an ++%% aggregator process. ++%% ++%% If for performance reasons several parallel emitting_map's need to ++%% be run, remote function MUST NOT return until all this ++%% emitting_map's are done. And during all this time remote RPC ++%% process MUST be linked to emitting ++%% processes. await_emitters_termination/1 helper can be used as a ++%% last statement of remote function to ensure this behaviour. ++spawn_emitter_caller(Node, Mod, Fun, Args, Ref, Pid, Timeout) -> ++ spawn_monitor( ++ fun () -> ++ case rpc_call_emitter(Node, Mod, Fun, Args, Ref, Pid, Timeout) of ++ {error, _} = Error -> ++ Pid ! {Ref, error, Error}; ++ {bad_argument, _} = Error -> ++ Pid ! {Ref, error, Error}; ++ {badrpc, _} = Error -> ++ Pid ! {Ref, error, Error}; ++ _ -> ++ ok ++ end ++ end), ++ ok. ++ ++rpc_call_emitter(Node, Mod, Fun, Args, Ref, Pid, Timeout) -> ++ rabbit_misc:rpc_call(Node, Mod, Fun, Args++[Ref, Pid], Timeout). ++ ++%% Agregator process expects correct numbers of explicits ACKs about ++%% finished emission process. While everything is linked, we still ++%% need somehow to wait for termination of all emitters before ++%% returning from RPC call - otherwise links will be just broken with ++%% reason 'normal' and we can miss some errors, and subsequentially ++%% hang. ++await_emitters_termination(Pids) -> ++ Monitors = [erlang:monitor(process, Pid) || Pid <- Pids], ++ collect_monitors(Monitors). + +-wait_for_info_messages(Ref, InfoItemKeys, DisplayFun) when is_reference(Ref) -> ++collect_monitors([]) -> ++ ok; ++collect_monitors([Monitor|Rest]) -> + receive +- {Ref, finished} -> +- ok; +- {Ref, {timeout, T}} -> ++ {'DOWN', Monitor, _Pid, normal} -> ++ collect_monitors(Rest); ++ {'DOWN', Monitor, _Pid, noproc} -> ++ %% There is a link and a monitor to a process. Matching ++ %% this clause means that process has gracefully ++ %% terminated even before we've started monitoring. ++ collect_monitors(Rest); ++ {'DOWN', _, Pid, Reason} -> ++ exit({emitter_exit, Pid, Reason}) ++ end. ++ ++%% Wait for result of one or more calls to emitting_map-family ++%% functions. ++%% ++%% Number of expected acknowledgments is specified by ChunkCount ++%% argument. Most common usage will be with ChunkCount equals to ++%% number of live nodes, but it's not mandatory - thus more generic ++%% name of 'ChunkCount' was chosen. ++wait_for_info_messages(Pid, Ref, Fun, Acc0, Timeout, ChunkCount) -> ++ notify_if_timeout(Pid, Ref, Timeout), ++ wait_for_info_messages(Ref, Fun, Acc0, ChunkCount). ++ ++wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft) -> ++ receive ++ {Ref, finished} when ChunksLeft =:= 1 -> ++ {ok, Acc0}; ++ {Ref, finished} -> ++ wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft - 1); ++ {Ref, {timeout, T}} -> + exit({error, {timeout, (T / 1000)}}); +- {Ref, []} -> +- wait_for_info_messages(Ref, InfoItemKeys, DisplayFun); +- {Ref, Result, continue} -> +- DisplayFun(Result, InfoItemKeys), +- wait_for_info_messages(Ref, InfoItemKeys, DisplayFun); +- {error, Error} -> +- Error; +- _ -> +- wait_for_info_messages(Ref, InfoItemKeys, DisplayFun) ++ {Ref, []} -> ++ wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft); ++ {Ref, Result, continue} -> ++ wait_for_info_messages(Ref, Fun, Fun(Result, Acc0), ChunksLeft); ++ {Ref, error, Error} -> ++ {error, simplify_emission_error(Error)}; ++ {'DOWN', _MRef, process, _Pid, normal} -> ++ wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft); ++ {'DOWN', _MRef, process, _Pid, Reason} -> ++ {error, simplify_emission_error(Reason)}; ++ _Msg -> ++ wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft) + end. + ++simplify_emission_error({badrpc, {'EXIT', {{nocatch, EmissionError}, _Stacktrace}}}) -> ++ EmissionError; ++simplify_emission_error({{nocatch, EmissionError}, _Stacktrace}) -> ++ EmissionError; ++simplify_emission_error(Anything) -> ++ {error, Anything}. ++ ++notify_if_timeout(_, _, infinity) -> ++ ok; + notify_if_timeout(Pid, Ref, Timeout) -> + timer:send_after(Timeout, Pid, {Ref, {timeout, Timeout}}). + +diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl +index 9750ba8..02ffeb7 100644 +--- a/src/rabbit_misc.erl ++++ b/src/rabbit_misc.erl +@@ -75,7 +75,7 @@ + -export([get_env/3]). + -export([get_channel_operation_timeout/0]). + -export([random/1]). +--export([rpc_call/4, rpc_call/5, rpc_call/7]). ++-export([rpc_call/4, rpc_call/5]). + -export([report_default_thread_pool_size/0]). + + %% Horrible macro to use in guards +@@ -262,8 +262,6 @@ + -spec random(non_neg_integer()) -> non_neg_integer(). + -spec rpc_call(node(), atom(), atom(), [any()]) -> any(). + -spec rpc_call(node(), atom(), atom(), [any()], number()) -> any(). +--spec rpc_call +- (node(), atom(), atom(), [any()], reference(), pid(), number()) -> any(). + -spec report_default_thread_pool_size() -> 'ok'. + + %%---------------------------------------------------------------------------- +@@ -1173,9 +1171,6 @@ rpc_call(Node, Mod, Fun, Args, Timeout) -> + rpc:call(Node, Mod, Fun, Args, Timeout) + end. + +-rpc_call(Node, Mod, Fun, Args, Ref, Pid, Timeout) -> +- rpc_call(Node, Mod, Fun, Args++[Ref, Pid], Timeout). +- + guess_number_of_cpu_cores() -> + case erlang:system_info(logical_processors_available) of + unknown -> % Happens on Mac OS X. +diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl +index 5bf30ff..63e3ed0 100644 +--- a/src/rabbit_networking.erl ++++ b/src/rabbit_networking.erl +@@ -33,7 +33,8 @@ + node_listeners/1, register_connection/1, unregister_connection/1, + connections/0, connection_info_keys/0, + connection_info/1, connection_info/2, +- connection_info_all/0, connection_info_all/1, connection_info_all/3, ++ connection_info_all/0, connection_info_all/1, ++ emit_connection_info_all/4, emit_connection_info_local/3, + close_connection/2, force_connection_event_refresh/1, tcp_host/1]). + + %% Used by TCP-based transports, e.g. STOMP adapter +@@ -89,8 +90,6 @@ + -spec connection_info_all() -> [rabbit_types:infos()]. + -spec connection_info_all(rabbit_types:info_keys()) -> + [rabbit_types:infos()]. +--spec connection_info_all(rabbit_types:info_keys(), reference(), pid()) -> +- 'ok'. + -spec close_connection(pid(), string()) -> 'ok'. + -spec force_connection_event_refresh(reference()) -> 'ok'. + +@@ -365,10 +364,15 @@ connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items). + connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end). + connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end). + +-connection_info_all(Items, Ref, AggregatorPid) -> ++emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> ++ Pids = [ spawn_link(Node, rabbit_networking, emit_connection_info_local, [Items, Ref, AggregatorPid]) || Node <- Nodes ], ++ rabbit_control_misc:await_emitters_termination(Pids), ++ ok. ++ ++emit_connection_info_local(Items, Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map_with_exit_handler( + AggregatorPid, Ref, fun(Q) -> connection_info(Q, Items) end, +- connections()). ++ connections_local()). + + close_connection(Pid, Explanation) -> + rabbit_log:info("Closing connection ~p because ~p~n", [Pid, Explanation]), diff --git a/rabbitmq-common-0002-Use-proto_dist-from-config-instead-of-always-using-d.patch b/rabbitmq-common-0002-Use-proto_dist-from-config-instead-of-always-using-d.patch new file mode 100644 index 0000000..f8c3496 --- /dev/null +++ b/rabbitmq-common-0002-Use-proto_dist-from-config-instead-of-always-using-d.patch @@ -0,0 +1,23 @@ +From: Peter Lemenkov +Date: Fri, 15 Jul 2016 16:01:08 +0200 +Subject: [PATCH] Use proto_dist from config instead of always using default + (inet_tcp) + +Signed-off-by: Peter Lemenkov + +diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl +index 70a5355..6bfce23 100644 +--- a/src/rabbit_nodes.erl ++++ b/src/rabbit_nodes.erl +@@ -221,9 +221,11 @@ set_cluster_name(Name) -> + ensure_epmd() -> + {ok, Prog} = init:get_argument(progname), + ID = rabbit_misc:random(1000000000), ++ ProtoDist = application:get_env(kernel, proto_dist, inet_tcp), + Port = open_port( + {spawn_executable, os:find_executable(Prog)}, + [{args, ["-sname", rabbit_misc:format("epmd-starter-~b", [ID]), ++ "-proto_dist", rabbit_misc:format("~p", [ProtoDist]), + "-noshell", "-eval", "halt()."]}, + exit_status, stderr_to_stdout, use_stdio]), + port_shutdown_loop(Port). diff --git a/rabbitmq-server-0005-Avoid-RPC-roundtrips-in-list-commands.patch b/rabbitmq-server-0005-Avoid-RPC-roundtrips-in-list-commands.patch new file mode 100644 index 0000000..9ab4b98 --- /dev/null +++ b/rabbitmq-server-0005-Avoid-RPC-roundtrips-in-list-commands.patch @@ -0,0 +1,280 @@ +From: Alexey Lebedeff +Date: Wed, 9 Mar 2016 18:09:04 +0300 +Subject: [PATCH] Avoid RPC roundtrips in list commands + +Current implementation of various `list_XXX` commands require cross-node +roundtrip for every processed item - because `rabbitmqctl` target node +is responsible for gathering global list of all items of +interest (channels etc.) and then processing them one by one. + +For example, listing 10000 channels evenly distributed across 3 nodes +where network has 1ms delay takes more than 10 seconds on my +machine. And with the proposed change listing will take almost the same +time as it'll take to gather this info locally. E.g. in the case above +listing now takes 0.7 second on the same machine with same 1ms delay. + +It works by invoking emitting_map on every node, where it should send +info about only local items to aggregator, in an async fashion - as no +reply from aggregator is needed. + +diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl +index fb3da21..1a98dcc 100644 +--- a/src/rabbit_control_main.erl ++++ b/src/rabbit_control_main.erl +@@ -23,7 +23,7 @@ + sync_queue/1, cancel_sync_queue/1, become/1, + purge_queue/1]). + +--import(rabbit_misc, [rpc_call/4, rpc_call/5, rpc_call/7]). ++-import(rabbit_misc, [rpc_call/4, rpc_call/5]). + + -define(EXTERNAL_CHECK_INTERVAL, 1000). + +@@ -589,56 +589,74 @@ action(purge_queue, Node, [Q], Opts, Inform, Timeout) -> + + action(list_users, Node, [], _Opts, Inform, Timeout) -> + Inform("Listing users", []), +- call(Node, {rabbit_auth_backend_internal, list_users, []}, +- rabbit_auth_backend_internal:user_info_keys(), true, Timeout); ++ call_emitter(Node, {rabbit_auth_backend_internal, list_users, []}, ++ rabbit_auth_backend_internal:user_info_keys(), ++ [{timeout, Timeout}, to_bin_utf8]); + + action(list_permissions, Node, [], Opts, Inform, Timeout) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Inform("Listing permissions in vhost \"~s\"", [VHost]), +- call(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]}, +- rabbit_auth_backend_internal:vhost_perms_info_keys(), true, Timeout, +- true); ++ call_emitter(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]}, ++ rabbit_auth_backend_internal:vhost_perms_info_keys(), ++ [{timeout, Timeout}, to_bin_utf8, is_escaped]); + + action(list_parameters, Node, [], Opts, Inform, Timeout) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Listing runtime parameters", []), +- call(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]}, +- rabbit_runtime_parameters:info_keys(), Timeout); ++ call_emitter(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]}, ++ rabbit_runtime_parameters:info_keys(), ++ [{timeout, Timeout}]); + + action(list_policies, Node, [], Opts, Inform, Timeout) -> + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + Inform("Listing policies", []), +- call(Node, {rabbit_policy, list_formatted, [VHostArg]}, +- rabbit_policy:info_keys(), Timeout); ++ call_emitter(Node, {rabbit_policy, list_formatted, [VHostArg]}, ++ rabbit_policy:info_keys(), ++ [{timeout, Timeout}]); + + action(list_vhosts, Node, Args, _Opts, Inform, Timeout) -> + Inform("Listing vhosts", []), + ArgAtoms = default_if_empty(Args, [name]), +- call(Node, {rabbit_vhost, info_all, []}, ArgAtoms, true, Timeout); ++ call_emitter(Node, {rabbit_vhost, info_all, []}, ArgAtoms, ++ [{timeout, Timeout}, to_bin_utf8]); + + action(list_user_permissions, _Node, _Args = [], _Opts, _Inform, _Timeout) -> + {error_string, + "list_user_permissions expects a username argument, but none provided."}; + action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) -> + Inform("Listing permissions for user ~p", Args), +- call(Node, {rabbit_auth_backend_internal, list_user_permissions, Args}, +- rabbit_auth_backend_internal:user_perms_info_keys(), true, Timeout, +- true); ++ call_emitter(Node, {rabbit_auth_backend_internal, list_user_permissions, Args}, ++ rabbit_auth_backend_internal:user_perms_info_keys(), ++ [{timeout, Timeout}, to_bin_utf8, is_escaped]); + + action(list_queues, Node, Args, Opts, Inform, Timeout) -> +- [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]), + Inform("Listing queues", []), ++ %% User options ++ [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + ArgAtoms = default_if_empty(Args, [name, messages]), +- call(Node, {rabbit_amqqueue, info_all, [VHostArg, ArgAtoms, Online, Offline]}, +- ArgAtoms, Timeout); ++ ++ %% Data for emission ++ Nodes = nodes_in_cluster(Node, Timeout), ++ OnlineChunks = if Online -> length(Nodes); true -> 0 end, ++ OfflineChunks = if Offline -> 1; true -> 0 end, ++ ChunksOpt = {chunks, OnlineChunks + OfflineChunks}, ++ TimeoutOpt = {timeout, Timeout}, ++ EmissionRef = make_ref(), ++ EmissionRefOpt = {ref, EmissionRef}, ++ ++ _ = Online andalso start_emission(Node, {rabbit_amqqueue, emit_info_all, [Nodes, VHostArg, ArgAtoms]}, ++ [TimeoutOpt, EmissionRefOpt]), ++ _ = Offline andalso start_emission(Node, {rabbit_amqqueue, emit_info_down, [VHostArg, ArgAtoms]}, ++ [TimeoutOpt, EmissionRefOpt]), ++ display_emission_result(EmissionRef, ArgAtoms, [ChunksOpt, TimeoutOpt]); + + action(list_exchanges, Node, Args, Opts, Inform, Timeout) -> + Inform("Listing exchanges", []), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + ArgAtoms = default_if_empty(Args, [name, type]), +- call(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]}, +- ArgAtoms, Timeout); ++ call_emitter(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]}, ++ ArgAtoms, [{timeout, Timeout}]); + + action(list_bindings, Node, Args, Opts, Inform, Timeout) -> + Inform("Listing bindings", []), +@@ -646,27 +664,31 @@ action(list_bindings, Node, Args, Opts, Inform, Timeout) -> + ArgAtoms = default_if_empty(Args, [source_name, source_kind, + destination_name, destination_kind, + routing_key, arguments]), +- call(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]}, +- ArgAtoms, Timeout); ++ call_emitter(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]}, ++ ArgAtoms, [{timeout, Timeout}]); + + action(list_connections, Node, Args, _Opts, Inform, Timeout) -> + Inform("Listing connections", []), + ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]), +- call(Node, {rabbit_networking, connection_info_all, [ArgAtoms]}, +- ArgAtoms, Timeout); ++ Nodes = nodes_in_cluster(Node, Timeout), ++ call_emitter(Node, {rabbit_networking, emit_connection_info_all, [Nodes, ArgAtoms]}, ++ ArgAtoms, [{timeout, Timeout}, {chunks, length(Nodes)}]); + + action(list_channels, Node, Args, _Opts, Inform, Timeout) -> + Inform("Listing channels", []), + ArgAtoms = default_if_empty(Args, [pid, user, consumer_count, + messages_unacknowledged]), +- call(Node, {rabbit_channel, info_all, [ArgAtoms]}, +- ArgAtoms, Timeout); ++ Nodes = nodes_in_cluster(Node, Timeout), ++ call_emitter(Node, {rabbit_channel, emit_info_all, [Nodes, ArgAtoms]}, ArgAtoms, ++ [{timeout, Timeout}, {chunks, length(Nodes)}]); + + action(list_consumers, Node, _Args, Opts, Inform, Timeout) -> + Inform("Listing consumers", []), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), +- call(Node, {rabbit_amqqueue, consumers_all, [VHostArg]}, +- rabbit_amqqueue:consumer_info_keys(), Timeout). ++ Nodes = nodes_in_cluster(Node, Timeout), ++ call_emitter(Node, {rabbit_amqqueue, emit_consumers_all, [Nodes, VHostArg]}, ++ rabbit_amqqueue:consumer_info_keys(), ++ [{timeout, Timeout}, {chunks, length(Nodes)}]). + + format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)). + +@@ -772,17 +794,18 @@ display_info_message_row(IsEscaped, Result, InfoItemKeys) -> + {X, Value} -> Value + end, IsEscaped) || X <- InfoItemKeys]). + +-display_info_message(IsEscaped) -> ++display_info_message(IsEscaped, InfoItemKeys) -> + fun ([], _) -> + ok; +- ([FirstResult|_] = List, InfoItemKeys) when is_list(FirstResult) -> ++ ([FirstResult|_] = List, _) when is_list(FirstResult) -> + lists:foreach(fun(Result) -> + display_info_message_row(IsEscaped, Result, InfoItemKeys) + end, + List), + ok; +- (Result, InfoItemKeys) -> +- display_info_message_row(IsEscaped, Result, InfoItemKeys) ++ (Result, _) -> ++ display_info_message_row(IsEscaped, Result, InfoItemKeys), ++ ok + end. + + display_info_list(Results, InfoItemKeys) when is_list(Results) -> +@@ -839,7 +862,10 @@ display_call_result(Node, MFA) -> + end. + + unsafe_rpc(Node, Mod, Fun, Args) -> +- case rpc_call(Node, Mod, Fun, Args) of ++ unsafe_rpc(Node, Mod, Fun, Args, ?RPC_TIMEOUT). ++ ++unsafe_rpc(Node, Mod, Fun, Args, Timeout) -> ++ case rpc_call(Node, Mod, Fun, Args, Timeout) of + {badrpc, _} = Res -> throw(Res); + Normal -> Normal + end. +@@ -858,33 +884,42 @@ ensure_app_running(Node) -> + call(Node, {Mod, Fun, Args}) -> + rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)). + +-call(Node, {Mod, Fun, Args}, InfoKeys, Timeout) -> +- call(Node, {Mod, Fun, Args}, InfoKeys, false, Timeout, false). ++call_emitter(Node, {Mod, Fun, Args}, InfoKeys, Opts) -> ++ Ref = start_emission(Node, {Mod, Fun, Args}, Opts), ++ display_emission_result(Ref, InfoKeys, Opts). ++ ++start_emission(Node, {Mod, Fun, Args}, Opts) -> ++ ToBinUtf8 = proplists:get_value(to_bin_utf8, Opts, false), ++ Timeout = proplists:get_value(timeout, Opts, infinity), ++ Ref = proplists:get_value(ref, Opts, make_ref()), ++ rabbit_control_misc:spawn_emitter_caller( ++ Node, Mod, Fun, prepare_call_args(Args, ToBinUtf8), ++ Ref, self(), Timeout), ++ Ref. ++ ++display_emission_result(Ref, InfoKeys, Opts) -> ++ IsEscaped = proplists:get_value(is_escaped, Opts, false), ++ Chunks = proplists:get_value(chunks, Opts, 1), ++ Timeout = proplists:get_value(timeout, Opts, infinity), ++ EmissionStatus = rabbit_control_misc:wait_for_info_messages( ++ self(), Ref, display_info_message(IsEscaped, InfoKeys), ok, Timeout, Chunks), ++ emission_to_action_result(EmissionStatus). ++ ++%% Convert rabbit_control_misc:wait_for_info_messages/6 return value ++%% into form expected by rabbit_cli:main/3. ++emission_to_action_result({ok, ok}) -> ++ ok; ++emission_to_action_result({error, Error}) -> ++ Error. + +-call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout) -> +- call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, false). ++prepare_call_args(Args, ToBinUtf8) -> ++ case ToBinUtf8 of ++ true -> valid_utf8_args(Args); ++ false -> Args ++ end. + +-call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, IsEscaped) -> +- Args0 = case ToBinUtf8 of +- true -> lists:map(fun list_to_binary_utf8/1, Args); +- false -> Args +- end, +- Ref = make_ref(), +- Pid = self(), +- spawn_link( +- fun () -> +- case rabbit_cli:rpc_call(Node, Mod, Fun, Args0, +- Ref, Pid, Timeout) of +- {error, _} = Error -> +- Pid ! {error, Error}; +- {bad_argument, _} = Error -> +- Pid ! {error, Error}; +- _ -> +- ok +- end +- end), +- rabbit_control_misc:wait_for_info_messages( +- Pid, Ref, InfoKeys, display_info_message(IsEscaped), Timeout). ++valid_utf8_args(Args) -> ++ lists:map(fun list_to_binary_utf8/1, Args). + + list_to_binary_utf8(L) -> + B = list_to_binary(L), +@@ -934,7 +969,10 @@ split_list([_]) -> exit(even_list_needed); + split_list([A, B | T]) -> [{A, B} | split_list(T)]. + + nodes_in_cluster(Node) -> +- unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]). ++ unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], ?RPC_TIMEOUT). ++ ++nodes_in_cluster(Node, Timeout) -> ++ unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], Timeout). + + alarms_by_node(Name) -> + Status = unsafe_rpc(Name, rabbit, status, []), diff --git a/rabbitmq-server.spec b/rabbitmq-server.spec index e9df6e3..f7fbb1e 100644 --- a/rabbitmq-server.spec +++ b/rabbitmq-server.spec @@ -5,7 +5,7 @@ Name: rabbitmq-server Version: 3.6.3 -Release: 1%{?dist} +Release: 2%{?dist} License: MPLv1.1 Group: Development/Libraries Source0: http://www.rabbitmq.com/releases/rabbitmq-server/v%{version}/%{name}-%{version}.tar.xz @@ -20,6 +20,10 @@ Patch1: rabbitmq-server-0001-Remove-excessive-sd_notify-code.patch Patch2: rabbitmq-server-0002-Add-systemd-notification-support.patch Patch3: rabbitmq-server-0003-Revert-Distinct-exit-codes-for-CLI-utilities.patch Patch4: rabbitmq-server-0004-Allow-guest-login-from-non-loopback-connections.patch +Patch5: rabbitmq-server-0005-Avoid-RPC-roundtrips-in-list-commands.patch +Patch101: rabbitmq-common-0001-Avoid-RPC-roundtrips-while-listing-items.patch +Patch102: rabbitmq-common-0002-Use-proto_dist-from-config-instead-of-always-using-d.patch + URL: http://www.rabbitmq.com/ BuildArch: noarch BuildRequires: erlang >= %{erlang_minver}, python-simplejson, xmlto, libxslt, python, zip @@ -56,6 +60,12 @@ scalable implementation of an AMQP broker. %patch2 -p1 %patch3 -p1 %patch4 -p1 +%patch5 -p1 + +cd deps/rabbit_common +%patch101 -p1 +%patch102 -p1 +cd ../.. # We have to remove it until common_test subpackage lands RHOS rm -f \ @@ -171,6 +181,10 @@ done %changelog +* Fri Jul 15 2016 Peter Lemenkov - 3.6.3-2 +- Avoid RPC roundtrips in list commands +- Use proto_dist from config instead of always using default (inet_tcp) + * Thu Jul 7 2016 Peter Lemenkov - 3.6.3-1 - Ver. 3.6.3