From: Alexey Lebedeff Date: Wed, 9 Mar 2016 14:55:02 +0300 Subject: [PATCH] Avoid RPC roundtrips while listing items - Emit info about particular items in parallel on every node, with results delivered directly to a `rabbitmqctl` instance. - `rabbit_control_misc:wait_for_info_messages/5` can wait for results of more than one emitting map. - Stop passing arround InfoItemKeys in `rabbit_control_misc:wait_for_info_messages/5`, the same information could be directly encoded in DisplayFun closure. - Add `emit` to function names, to avoid confusion with regular ones which return result directly. Part of https://github.com/rabbitmq/rabbitmq-server/pull/683 diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 6d4e52e..60e94c0 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -25,10 +25,10 @@ check_exclusive_access/2, with_exclusive_access_or_die/3, stat/1, deliver/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, - info_all/6]). + emit_info_all/5, list_local/1]). -export([list_down/1]). -export([force_event_refresh/1, notify_policy_changed/1]). --export([consumers/1, consumers_all/1, consumers_all/3, consumer_info_keys/0]). +-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]). -export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2]). -export([notify_down_all/2, notify_down_all/3, activate_limit_all/2, credit/5]). @@ -41,7 +41,8 @@ %% internal -export([internal_declare/2, internal_delete/1, run_backing_queue/3, - set_ram_duration_target/2, set_maximum_since_use/2]). + set_ram_duration_target/2, set_maximum_since_use/2, + emit_info_local/4, emit_info_down/4, emit_consumers_local/3]). -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -117,10 +118,6 @@ -spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()]. -spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) -> [rabbit_types:infos()]. --spec info_all - (rabbit_types:vhost(), rabbit_types:info_keys(), boolean(), boolean(), - reference(), pid()) -> - 'ok'. -spec force_event_refresh(reference()) -> 'ok'. -spec notify_policy_changed(rabbit_types:amqqueue()) -> 'ok'. -spec consumers(rabbit_types:amqqueue()) -> @@ -130,7 +127,6 @@ -spec consumers_all(rabbit_types:vhost()) -> [{name(), pid(), rabbit_types:ctag(), boolean(), non_neg_integer(), rabbit_framing:amqp_table()}]. --spec consumers_all(rabbit_types:vhost(), reference(), pid()) -> 'ok'. -spec stat(rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}. -spec delete_immediately(qpids()) -> 'ok'. @@ -627,16 +623,23 @@ info_all(VHostPath, Items) -> map(list(VHostPath), fun (Q) -> info(Q, Items) end) ++ map(list_down(VHostPath), fun (Q) -> info_down(Q, Items, down) end). -info_all(VHostPath, Items, NeedOnline, NeedOffline, Ref, AggregatorPid) -> - NeedOnline andalso rabbit_control_misc:emitting_map_with_exit_handler( - AggregatorPid, Ref, fun(Q) -> info(Q, Items) end, list(VHostPath), - continue), - NeedOffline andalso rabbit_control_misc:emitting_map_with_exit_handler( - AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end, - list_down(VHostPath), - continue), - %% Previous maps are incomplete, finalize emission - rabbit_control_misc:emitting_map(AggregatorPid, Ref, fun(_) -> no_op end, []). +emit_info_local(VHostPath, Items, Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map_with_exit_handler( + AggregatorPid, Ref, fun(Q) -> info(Q, Items) end, list_local(VHostPath)). + +emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) -> + Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, AggregatorPid]) || Node <- Nodes ], + rabbit_control_misc:await_emitters_termination(Pids). + +emit_info_down(VHostPath, Items, Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map_with_exit_handler( + AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end, + list_down(VHostPath)). + +list_local(VHostPath) -> + [ Q || #amqqueue{state = State, pid=QPid} = Q <- list(VHostPath), + State =/= crashed, + node() =:= node(QPid) ]. force_event_refresh(Ref) -> [gen_server2:cast(Q#amqqueue.pid, @@ -656,12 +659,17 @@ consumers_all(VHostPath) -> map(list(VHostPath), fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end)). -consumers_all(VHostPath, Ref, AggregatorPid) -> +emit_consumers_all(Nodes, VHostPath, Ref, AggregatorPid) -> + Pids = [ spawn_link(Node, rabbit_amqqueue, emit_consumers_local, [VHostPath, Ref, AggregatorPid]) || Node <- Nodes ], + rabbit_control_misc:await_emitters_termination(Pids), + ok. + +emit_consumers_local(VHostPath, Ref, AggregatorPid) -> ConsumerInfoKeys = consumer_info_keys(), rabbit_control_misc:emitting_map( AggregatorPid, Ref, fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end, - list(VHostPath)). + list_local(VHostPath)). get_queue_consumer_info(Q, ConsumerInfoKeys) -> [lists:zip(ConsumerInfoKeys, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 53b9fe8..e115b76 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -56,7 +56,7 @@ -export([send_command/2, deliver/4, deliver_reply/2, send_credit_reply/2, send_drained/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1, - info_all/3]). + emit_info_all/4]). -export([refresh_config_local/0, ready_for_close/1]). -export([force_event_refresh/1]). @@ -64,7 +64,7 @@ handle_info/2, handle_pre_hibernate/1, prioritise_call/4, prioritise_cast/3, prioritise_info/3, format_message_queue/2]). %% Internal --export([list_local/0, deliver_reply_local/3]). +-export([list_local/0, emit_info_local/3, deliver_reply_local/3]). -export([get_vhost/1, get_user/1]). -record(ch, { @@ -220,7 +220,6 @@ -spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos(). -spec info_all() -> [rabbit_types:infos()]. -spec info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()]. --spec info_all(rabbit_types:info_keys(), reference(), pid()) -> 'ok'. -spec refresh_config_local() -> 'ok'. -spec ready_for_close(pid()) -> 'ok'. -spec force_event_refresh(reference()) -> 'ok'. @@ -326,9 +325,16 @@ info_all() -> info_all(Items) -> rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). -info_all(Items, Ref, AggregatorPid) -> +emit_info_all(Nodes, Items, Ref, AggregatorPid) -> + Pids = [ spawn_link(Node, rabbit_channel, emit_info_local, [Items, Ref, AggregatorPid]) || Node <- Nodes ], + rabbit_control_misc:await_emitters_termination(Pids). + +emit_info_local(Items, Ref, AggregatorPid) -> + emit_info(list_local(), Items, Ref, AggregatorPid). + +emit_info(PidList, InfoItems, Ref, AggregatorPid) -> rabbit_control_misc:emitting_map_with_exit_handler( - AggregatorPid, Ref, fun(C) -> info(C, Items) end, list()). + AggregatorPid, Ref, fun(C) -> info(C, InfoItems) end, PidList). refresh_config_local() -> rabbit_misc:upmap( diff --git a/src/rabbit_control_misc.erl b/src/rabbit_control_misc.erl index 0d2de1f..2e7a16c 100644 --- a/src/rabbit_control_misc.erl +++ b/src/rabbit_control_misc.erl @@ -17,7 +17,8 @@ -module(rabbit_control_misc). -export([emitting_map/4, emitting_map/5, emitting_map_with_exit_handler/4, - emitting_map_with_exit_handler/5, wait_for_info_messages/5, + emitting_map_with_exit_handler/5, wait_for_info_messages/6, + spawn_emitter_caller/7, await_emitters_termination/1, print_cmd_result/2]). -spec emitting_map(pid(), reference(), fun(), list()) -> 'ok'. @@ -25,7 +26,14 @@ -spec emitting_map_with_exit_handler (pid(), reference(), fun(), list()) -> 'ok'. -spec emitting_map_with_exit_handler - (pid(), reference(), fun(), list(), atom()) -> 'ok'. + (pid(), reference(), fun(), list(), 'continue') -> 'ok'. + +-type fold_fun() :: fun ((term(), term()) -> term()). + +-spec wait_for_info_messages (pid(), reference(), fold_fun(), term(), timeout(), non_neg_integer()) -> {'ok', term()} | {'error', term()}. +-spec spawn_emitter_caller (node(), module(), atom(), [term()], reference(), pid(), timeout()) -> 'ok'. +-spec await_emitters_termination ([pid()]) -> 'ok'. + -spec print_cmd_result(atom(), term()) -> 'ok'. emitting_map(AggregatorPid, Ref, Fun, List) -> @@ -65,27 +73,108 @@ step_with_exit_handler(AggregatorPid, Ref, Fun, Item) -> ok end. -wait_for_info_messages(Pid, Ref, ArgAtoms, DisplayFun, Timeout) -> - _ = notify_if_timeout(Pid, Ref, Timeout), - wait_for_info_messages(Ref, ArgAtoms, DisplayFun). +%% Invokes RPC for async info collection in separate (but linked to +%% the caller) process. Separate process waits for RPC to finish and +%% in case of errors sends them in wait_for_info_messages/5-compatible +%% form to aggregator process. Calling process is then expected to +%% do blocking call of wait_for_info_messages/5. +%% +%% Remote function MUST use calls to emitting_map/4 (and other +%% emitting_map's) to properly deliver requested information to an +%% aggregator process. +%% +%% If for performance reasons several parallel emitting_map's need to +%% be run, remote function MUST NOT return until all this +%% emitting_map's are done. And during all this time remote RPC +%% process MUST be linked to emitting +%% processes. await_emitters_termination/1 helper can be used as a +%% last statement of remote function to ensure this behaviour. +spawn_emitter_caller(Node, Mod, Fun, Args, Ref, Pid, Timeout) -> + spawn_monitor( + fun () -> + case rpc_call_emitter(Node, Mod, Fun, Args, Ref, Pid, Timeout) of + {error, _} = Error -> + Pid ! {Ref, error, Error}; + {bad_argument, _} = Error -> + Pid ! {Ref, error, Error}; + {badrpc, _} = Error -> + Pid ! {Ref, error, Error}; + _ -> + ok + end + end), + ok. + +rpc_call_emitter(Node, Mod, Fun, Args, Ref, Pid, Timeout) -> + rabbit_misc:rpc_call(Node, Mod, Fun, Args++[Ref, Pid], Timeout). + +%% Agregator process expects correct numbers of explicits ACKs about +%% finished emission process. While everything is linked, we still +%% need somehow to wait for termination of all emitters before +%% returning from RPC call - otherwise links will be just broken with +%% reason 'normal' and we can miss some errors, and subsequentially +%% hang. +await_emitters_termination(Pids) -> + Monitors = [erlang:monitor(process, Pid) || Pid <- Pids], + collect_monitors(Monitors). -wait_for_info_messages(Ref, InfoItemKeys, DisplayFun) when is_reference(Ref) -> +collect_monitors([]) -> + ok; +collect_monitors([Monitor|Rest]) -> receive - {Ref, finished} -> - ok; - {Ref, {timeout, T}} -> + {'DOWN', Monitor, _Pid, normal} -> + collect_monitors(Rest); + {'DOWN', Monitor, _Pid, noproc} -> + %% There is a link and a monitor to a process. Matching + %% this clause means that process has gracefully + %% terminated even before we've started monitoring. + collect_monitors(Rest); + {'DOWN', _, Pid, Reason} -> + exit({emitter_exit, Pid, Reason}) + end. + +%% Wait for result of one or more calls to emitting_map-family +%% functions. +%% +%% Number of expected acknowledgments is specified by ChunkCount +%% argument. Most common usage will be with ChunkCount equals to +%% number of live nodes, but it's not mandatory - thus more generic +%% name of 'ChunkCount' was chosen. +wait_for_info_messages(Pid, Ref, Fun, Acc0, Timeout, ChunkCount) -> + notify_if_timeout(Pid, Ref, Timeout), + wait_for_info_messages(Ref, Fun, Acc0, ChunkCount). + +wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft) -> + receive + {Ref, finished} when ChunksLeft =:= 1 -> + {ok, Acc0}; + {Ref, finished} -> + wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft - 1); + {Ref, {timeout, T}} -> exit({error, {timeout, (T / 1000)}}); - {Ref, []} -> - wait_for_info_messages(Ref, InfoItemKeys, DisplayFun); - {Ref, Result, continue} -> - DisplayFun(Result, InfoItemKeys), - wait_for_info_messages(Ref, InfoItemKeys, DisplayFun); - {error, Error} -> - Error; - _ -> - wait_for_info_messages(Ref, InfoItemKeys, DisplayFun) + {Ref, []} -> + wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft); + {Ref, Result, continue} -> + wait_for_info_messages(Ref, Fun, Fun(Result, Acc0), ChunksLeft); + {Ref, error, Error} -> + {error, simplify_emission_error(Error)}; + {'DOWN', _MRef, process, _Pid, normal} -> + wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft); + {'DOWN', _MRef, process, _Pid, Reason} -> + {error, simplify_emission_error(Reason)}; + _Msg -> + wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft) end. +simplify_emission_error({badrpc, {'EXIT', {{nocatch, EmissionError}, _Stacktrace}}}) -> + EmissionError; +simplify_emission_error({{nocatch, EmissionError}, _Stacktrace}) -> + EmissionError; +simplify_emission_error(Anything) -> + {error, Anything}. + +notify_if_timeout(_, _, infinity) -> + ok; notify_if_timeout(Pid, Ref, Timeout) -> timer:send_after(Timeout, Pid, {Ref, {timeout, Timeout}}). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 9750ba8..02ffeb7 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -75,7 +75,7 @@ -export([get_env/3]). -export([get_channel_operation_timeout/0]). -export([random/1]). --export([rpc_call/4, rpc_call/5, rpc_call/7]). +-export([rpc_call/4, rpc_call/5]). -export([report_default_thread_pool_size/0]). %% Horrible macro to use in guards @@ -262,8 +262,6 @@ -spec random(non_neg_integer()) -> non_neg_integer(). -spec rpc_call(node(), atom(), atom(), [any()]) -> any(). -spec rpc_call(node(), atom(), atom(), [any()], number()) -> any(). --spec rpc_call - (node(), atom(), atom(), [any()], reference(), pid(), number()) -> any(). -spec report_default_thread_pool_size() -> 'ok'. %%---------------------------------------------------------------------------- @@ -1173,9 +1171,6 @@ rpc_call(Node, Mod, Fun, Args, Timeout) -> rpc:call(Node, Mod, Fun, Args, Timeout) end. -rpc_call(Node, Mod, Fun, Args, Ref, Pid, Timeout) -> - rpc_call(Node, Mod, Fun, Args++[Ref, Pid], Timeout). - guess_number_of_cpu_cores() -> case erlang:system_info(logical_processors_available) of unknown -> % Happens on Mac OS X. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 5bf30ff..63e3ed0 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -33,7 +33,8 @@ node_listeners/1, register_connection/1, unregister_connection/1, connections/0, connection_info_keys/0, connection_info/1, connection_info/2, - connection_info_all/0, connection_info_all/1, connection_info_all/3, + connection_info_all/0, connection_info_all/1, + emit_connection_info_all/4, emit_connection_info_local/3, close_connection/2, force_connection_event_refresh/1, tcp_host/1]). %% Used by TCP-based transports, e.g. STOMP adapter @@ -89,8 +90,6 @@ -spec connection_info_all() -> [rabbit_types:infos()]. -spec connection_info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()]. --spec connection_info_all(rabbit_types:info_keys(), reference(), pid()) -> - 'ok'. -spec close_connection(pid(), string()) -> 'ok'. -spec force_connection_event_refresh(reference()) -> 'ok'. @@ -365,10 +364,15 @@ connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items). connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end). connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end). -connection_info_all(Items, Ref, AggregatorPid) -> +emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> + Pids = [ spawn_link(Node, rabbit_networking, emit_connection_info_local, [Items, Ref, AggregatorPid]) || Node <- Nodes ], + rabbit_control_misc:await_emitters_termination(Pids), + ok. + +emit_connection_info_local(Items, Ref, AggregatorPid) -> rabbit_control_misc:emitting_map_with_exit_handler( AggregatorPid, Ref, fun(Q) -> connection_info(Q, Items) end, - connections()). + connections_local()). close_connection(Pid, Explanation) -> rabbit_log:info("Closing connection ~p because ~p~n", [Pid, Explanation]),