|
|
5b2912 |
From: Alexey Lebedeff <alebedev@mirantis.com>
|
|
|
5b2912 |
Date: Wed, 9 Mar 2016 14:55:02 +0300
|
|
|
5b2912 |
Subject: [PATCH] Avoid RPC roundtrips while listing items
|
|
|
5b2912 |
|
|
|
5b2912 |
- Emit info about particular items in parallel on every node, with
|
|
|
5b2912 |
results delivered directly to a `rabbitmqctl` instance.
|
|
|
5b2912 |
- `rabbit_control_misc:wait_for_info_messages/5` can wait for results of
|
|
|
5b2912 |
more than one emitting map.
|
|
|
5b2912 |
- Stop passing arround InfoItemKeys in
|
|
|
5b2912 |
`rabbit_control_misc:wait_for_info_messages/5`, the same information
|
|
|
5b2912 |
could be directly encoded in DisplayFun closure.
|
|
|
5b2912 |
- Add `emit` to function names, to avoid confusion with regular ones
|
|
|
5b2912 |
which return result directly.
|
|
|
5b2912 |
|
|
|
5b2912 |
Part of https://github.com/rabbitmq/rabbitmq-server/pull/683
|
|
|
5b2912 |
|
|
|
5b2912 |
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
|
|
|
5b2912 |
index 6d4e52e..60e94c0 100644
|
|
|
5b2912 |
--- a/src/rabbit_amqqueue.erl
|
|
|
5b2912 |
+++ b/src/rabbit_amqqueue.erl
|
|
|
5b2912 |
@@ -25,10 +25,10 @@
|
|
|
5b2912 |
check_exclusive_access/2, with_exclusive_access_or_die/3,
|
|
|
5b2912 |
stat/1, deliver/2, requeue/3, ack/3, reject/4]).
|
|
|
5b2912 |
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2,
|
|
|
5b2912 |
- info_all/6]).
|
|
|
5b2912 |
+ emit_info_all/5, list_local/1]).
|
|
|
5b2912 |
-export([list_down/1]).
|
|
|
5b2912 |
-export([force_event_refresh/1, notify_policy_changed/1]).
|
|
|
5b2912 |
--export([consumers/1, consumers_all/1, consumers_all/3, consumer_info_keys/0]).
|
|
|
5b2912 |
+-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
|
|
|
5b2912 |
-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
|
|
|
5b2912 |
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
|
|
|
5b2912 |
-export([notify_down_all/2, notify_down_all/3, activate_limit_all/2, credit/5]).
|
|
|
5b2912 |
@@ -41,7 +41,8 @@
|
|
|
5b2912 |
|
|
|
5b2912 |
%% internal
|
|
|
5b2912 |
-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
|
|
|
5b2912 |
- set_ram_duration_target/2, set_maximum_since_use/2]).
|
|
|
5b2912 |
+ set_ram_duration_target/2, set_maximum_since_use/2,
|
|
|
5b2912 |
+ emit_info_local/4, emit_info_down/4, emit_consumers_local/3]).
|
|
|
5b2912 |
|
|
|
5b2912 |
-include("rabbit.hrl").
|
|
|
5b2912 |
-include_lib("stdlib/include/qlc.hrl").
|
|
|
5b2912 |
@@ -117,10 +118,6 @@
|
|
|
5b2912 |
-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()].
|
|
|
5b2912 |
-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) ->
|
|
|
5b2912 |
[rabbit_types:infos()].
|
|
|
5b2912 |
--spec info_all
|
|
|
5b2912 |
- (rabbit_types:vhost(), rabbit_types:info_keys(), boolean(), boolean(),
|
|
|
5b2912 |
- reference(), pid()) ->
|
|
|
5b2912 |
- 'ok'.
|
|
|
5b2912 |
-spec force_event_refresh(reference()) -> 'ok'.
|
|
|
5b2912 |
-spec notify_policy_changed(rabbit_types:amqqueue()) -> 'ok'.
|
|
|
5b2912 |
-spec consumers(rabbit_types:amqqueue()) ->
|
|
|
5b2912 |
@@ -130,7 +127,6 @@
|
|
|
5b2912 |
-spec consumers_all(rabbit_types:vhost()) ->
|
|
|
5b2912 |
[{name(), pid(), rabbit_types:ctag(), boolean(),
|
|
|
5b2912 |
non_neg_integer(), rabbit_framing:amqp_table()}].
|
|
|
5b2912 |
--spec consumers_all(rabbit_types:vhost(), reference(), pid()) -> 'ok'.
|
|
|
5b2912 |
-spec stat(rabbit_types:amqqueue()) ->
|
|
|
5b2912 |
{'ok', non_neg_integer(), non_neg_integer()}.
|
|
|
5b2912 |
-spec delete_immediately(qpids()) -> 'ok'.
|
|
|
5b2912 |
@@ -627,16 +623,23 @@ info_all(VHostPath, Items) ->
|
|
|
5b2912 |
map(list(VHostPath), fun (Q) -> info(Q, Items) end) ++
|
|
|
5b2912 |
map(list_down(VHostPath), fun (Q) -> info_down(Q, Items, down) end).
|
|
|
5b2912 |
|
|
|
5b2912 |
-info_all(VHostPath, Items, NeedOnline, NeedOffline, Ref, AggregatorPid) ->
|
|
|
5b2912 |
- NeedOnline andalso rabbit_control_misc:emitting_map_with_exit_handler(
|
|
|
5b2912 |
- AggregatorPid, Ref, fun(Q) -> info(Q, Items) end, list(VHostPath),
|
|
|
5b2912 |
- continue),
|
|
|
5b2912 |
- NeedOffline andalso rabbit_control_misc:emitting_map_with_exit_handler(
|
|
|
5b2912 |
- AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end,
|
|
|
5b2912 |
- list_down(VHostPath),
|
|
|
5b2912 |
- continue),
|
|
|
5b2912 |
- %% Previous maps are incomplete, finalize emission
|
|
|
5b2912 |
- rabbit_control_misc:emitting_map(AggregatorPid, Ref, fun(_) -> no_op end, []).
|
|
|
5b2912 |
+emit_info_local(VHostPath, Items, Ref, AggregatorPid) ->
|
|
|
5b2912 |
+ rabbit_control_misc:emitting_map_with_exit_handler(
|
|
|
5b2912 |
+ AggregatorPid, Ref, fun(Q) -> info(Q, Items) end, list_local(VHostPath)).
|
|
|
5b2912 |
+
|
|
|
5b2912 |
+emit_info_all(Nodes, VHostPath, Items, Ref, AggregatorPid) ->
|
|
|
5b2912 |
+ Pids = [ spawn_link(Node, rabbit_amqqueue, emit_info_local, [VHostPath, Items, Ref, AggregatorPid]) || Node <- Nodes ],
|
|
|
5b2912 |
+ rabbit_control_misc:await_emitters_termination(Pids).
|
|
|
5b2912 |
+
|
|
|
5b2912 |
+emit_info_down(VHostPath, Items, Ref, AggregatorPid) ->
|
|
|
5b2912 |
+ rabbit_control_misc:emitting_map_with_exit_handler(
|
|
|
5b2912 |
+ AggregatorPid, Ref, fun(Q) -> info_down(Q, Items, down) end,
|
|
|
5b2912 |
+ list_down(VHostPath)).
|
|
|
5b2912 |
+
|
|
|
5b2912 |
+list_local(VHostPath) ->
|
|
|
5b2912 |
+ [ Q || #amqqueue{state = State, pid=QPid} = Q <- list(VHostPath),
|
|
|
5b2912 |
+ State =/= crashed,
|
|
|
5b2912 |
+ node() =:= node(QPid) ].
|
|
|
5b2912 |
|
|
|
5b2912 |
force_event_refresh(Ref) ->
|
|
|
5b2912 |
[gen_server2:cast(Q#amqqueue.pid,
|
|
|
5b2912 |
@@ -656,12 +659,17 @@ consumers_all(VHostPath) ->
|
|
|
5b2912 |
map(list(VHostPath),
|
|
|
5b2912 |
fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end)).
|
|
|
5b2912 |
|
|
|
5b2912 |
-consumers_all(VHostPath, Ref, AggregatorPid) ->
|
|
|
5b2912 |
+emit_consumers_all(Nodes, VHostPath, Ref, AggregatorPid) ->
|
|
|
5b2912 |
+ Pids = [ spawn_link(Node, rabbit_amqqueue, emit_consumers_local, [VHostPath, Ref, AggregatorPid]) || Node <- Nodes ],
|
|
|
5b2912 |
+ rabbit_control_misc:await_emitters_termination(Pids),
|
|
|
5b2912 |
+ ok.
|
|
|
5b2912 |
+
|
|
|
5b2912 |
+emit_consumers_local(VHostPath, Ref, AggregatorPid) ->
|
|
|
5b2912 |
ConsumerInfoKeys = consumer_info_keys(),
|
|
|
5b2912 |
rabbit_control_misc:emitting_map(
|
|
|
5b2912 |
AggregatorPid, Ref,
|
|
|
5b2912 |
fun(Q) -> get_queue_consumer_info(Q, ConsumerInfoKeys) end,
|
|
|
5b2912 |
- list(VHostPath)).
|
|
|
5b2912 |
+ list_local(VHostPath)).
|
|
|
5b2912 |
|
|
|
5b2912 |
get_queue_consumer_info(Q, ConsumerInfoKeys) ->
|
|
|
5b2912 |
[lists:zip(ConsumerInfoKeys,
|
|
|
5b2912 |
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
|
|
|
5b2912 |
index 53b9fe8..e115b76 100644
|
|
|
5b2912 |
--- a/src/rabbit_channel.erl
|
|
|
5b2912 |
+++ b/src/rabbit_channel.erl
|
|
|
5b2912 |
@@ -56,7 +56,7 @@
|
|
|
5b2912 |
-export([send_command/2, deliver/4, deliver_reply/2,
|
|
|
5b2912 |
send_credit_reply/2, send_drained/2]).
|
|
|
5b2912 |
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1,
|
|
|
5b2912 |
- info_all/3]).
|
|
|
5b2912 |
+ emit_info_all/4]).
|
|
|
5b2912 |
-export([refresh_config_local/0, ready_for_close/1]).
|
|
|
5b2912 |
-export([force_event_refresh/1]).
|
|
|
5b2912 |
|
|
|
5b2912 |
@@ -64,7 +64,7 @@
|
|
|
5b2912 |
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
|
|
|
5b2912 |
prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
|
|
|
5b2912 |
%% Internal
|
|
|
5b2912 |
--export([list_local/0, deliver_reply_local/3]).
|
|
|
5b2912 |
+-export([list_local/0, emit_info_local/3, deliver_reply_local/3]).
|
|
|
5b2912 |
-export([get_vhost/1, get_user/1]).
|
|
|
5b2912 |
|
|
|
5b2912 |
-record(ch, {
|
|
|
5b2912 |
@@ -220,7 +220,6 @@
|
|
|
5b2912 |
-spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos().
|
|
|
5b2912 |
-spec info_all() -> [rabbit_types:infos()].
|
|
|
5b2912 |
-spec info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()].
|
|
|
5b2912 |
--spec info_all(rabbit_types:info_keys(), reference(), pid()) -> 'ok'.
|
|
|
5b2912 |
-spec refresh_config_local() -> 'ok'.
|
|
|
5b2912 |
-spec ready_for_close(pid()) -> 'ok'.
|
|
|
5b2912 |
-spec force_event_refresh(reference()) -> 'ok'.
|
|
|
5b2912 |
@@ -326,9 +325,16 @@ info_all() ->
|
|
|
5b2912 |
info_all(Items) ->
|
|
|
5b2912 |
rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
|
|
|
5b2912 |
|
|
|
5b2912 |
-info_all(Items, Ref, AggregatorPid) ->
|
|
|
5b2912 |
+emit_info_all(Nodes, Items, Ref, AggregatorPid) ->
|
|
|
5b2912 |
+ Pids = [ spawn_link(Node, rabbit_channel, emit_info_local, [Items, Ref, AggregatorPid]) || Node <- Nodes ],
|
|
|
5b2912 |
+ rabbit_control_misc:await_emitters_termination(Pids).
|
|
|
5b2912 |
+
|
|
|
5b2912 |
+emit_info_local(Items, Ref, AggregatorPid) ->
|
|
|
5b2912 |
+ emit_info(list_local(), Items, Ref, AggregatorPid).
|
|
|
5b2912 |
+
|
|
|
5b2912 |
+emit_info(PidList, InfoItems, Ref, AggregatorPid) ->
|
|
|
5b2912 |
rabbit_control_misc:emitting_map_with_exit_handler(
|
|
|
5b2912 |
- AggregatorPid, Ref, fun(C) -> info(C, Items) end, list()).
|
|
|
5b2912 |
+ AggregatorPid, Ref, fun(C) -> info(C, InfoItems) end, PidList).
|
|
|
5b2912 |
|
|
|
5b2912 |
refresh_config_local() ->
|
|
|
5b2912 |
rabbit_misc:upmap(
|
|
|
5b2912 |
diff --git a/src/rabbit_control_misc.erl b/src/rabbit_control_misc.erl
|
|
|
5b2912 |
index 0d2de1f..2e7a16c 100644
|
|
|
5b2912 |
--- a/src/rabbit_control_misc.erl
|
|
|
5b2912 |
+++ b/src/rabbit_control_misc.erl
|
|
|
5b2912 |
@@ -17,7 +17,8 @@
|
|
|
5b2912 |
-module(rabbit_control_misc).
|
|
|
5b2912 |
|
|
|
5b2912 |
-export([emitting_map/4, emitting_map/5, emitting_map_with_exit_handler/4,
|
|
|
5b2912 |
- emitting_map_with_exit_handler/5, wait_for_info_messages/5,
|
|
|
5b2912 |
+ emitting_map_with_exit_handler/5, wait_for_info_messages/6,
|
|
|
5b2912 |
+ spawn_emitter_caller/7, await_emitters_termination/1,
|
|
|
5b2912 |
print_cmd_result/2]).
|
|
|
5b2912 |
|
|
|
5b2912 |
-spec emitting_map(pid(), reference(), fun(), list()) -> 'ok'.
|
|
|
5b2912 |
@@ -25,7 +26,14 @@
|
|
|
5b2912 |
-spec emitting_map_with_exit_handler
|
|
|
5b2912 |
(pid(), reference(), fun(), list()) -> 'ok'.
|
|
|
5b2912 |
-spec emitting_map_with_exit_handler
|
|
|
5b2912 |
- (pid(), reference(), fun(), list(), atom()) -> 'ok'.
|
|
|
5b2912 |
+ (pid(), reference(), fun(), list(), 'continue') -> 'ok'.
|
|
|
5b2912 |
+
|
|
|
5b2912 |
+-type fold_fun() :: fun ((term(), term()) -> term()).
|
|
|
5b2912 |
+
|
|
|
5b2912 |
+-spec wait_for_info_messages (pid(), reference(), fold_fun(), term(), timeout(), non_neg_integer()) -> {'ok', term()} | {'error', term()}.
|
|
|
5b2912 |
+-spec spawn_emitter_caller (node(), module(), atom(), [term()], reference(), pid(), timeout()) -> 'ok'.
|
|
|
5b2912 |
+-spec await_emitters_termination ([pid()]) -> 'ok'.
|
|
|
5b2912 |
+
|
|
|
5b2912 |
-spec print_cmd_result(atom(), term()) -> 'ok'.
|
|
|
5b2912 |
|
|
|
5b2912 |
emitting_map(AggregatorPid, Ref, Fun, List) ->
|
|
|
5b2912 |
@@ -65,27 +73,108 @@ step_with_exit_handler(AggregatorPid, Ref, Fun, Item) ->
|
|
|
5b2912 |
ok
|
|
|
5b2912 |
end.
|
|
|
5b2912 |
|
|
|
5b2912 |
-wait_for_info_messages(Pid, Ref, ArgAtoms, DisplayFun, Timeout) ->
|
|
|
5b2912 |
- _ = notify_if_timeout(Pid, Ref, Timeout),
|
|
|
5b2912 |
- wait_for_info_messages(Ref, ArgAtoms, DisplayFun).
|
|
|
5b2912 |
+%% Invokes RPC for async info collection in separate (but linked to
|
|
|
5b2912 |
+%% the caller) process. Separate process waits for RPC to finish and
|
|
|
5b2912 |
+%% in case of errors sends them in wait_for_info_messages/5-compatible
|
|
|
5b2912 |
+%% form to aggregator process. Calling process is then expected to
|
|
|
5b2912 |
+%% do blocking call of wait_for_info_messages/5.
|
|
|
5b2912 |
+%%
|
|
|
5b2912 |
+%% Remote function MUST use calls to emitting_map/4 (and other
|
|
|
5b2912 |
+%% emitting_map's) to properly deliver requested information to an
|
|
|
5b2912 |
+%% aggregator process.
|
|
|
5b2912 |
+%%
|
|
|
5b2912 |
+%% If for performance reasons several parallel emitting_map's need to
|
|
|
5b2912 |
+%% be run, remote function MUST NOT return until all this
|
|
|
5b2912 |
+%% emitting_map's are done. And during all this time remote RPC
|
|
|
5b2912 |
+%% process MUST be linked to emitting
|
|
|
5b2912 |
+%% processes. await_emitters_termination/1 helper can be used as a
|
|
|
5b2912 |
+%% last statement of remote function to ensure this behaviour.
|
|
|
5b2912 |
+spawn_emitter_caller(Node, Mod, Fun, Args, Ref, Pid, Timeout) ->
|
|
|
5b2912 |
+ spawn_monitor(
|
|
|
5b2912 |
+ fun () ->
|
|
|
5b2912 |
+ case rpc_call_emitter(Node, Mod, Fun, Args, Ref, Pid, Timeout) of
|
|
|
5b2912 |
+ {error, _} = Error ->
|
|
|
5b2912 |
+ Pid ! {Ref, error, Error};
|
|
|
5b2912 |
+ {bad_argument, _} = Error ->
|
|
|
5b2912 |
+ Pid ! {Ref, error, Error};
|
|
|
5b2912 |
+ {badrpc, _} = Error ->
|
|
|
5b2912 |
+ Pid ! {Ref, error, Error};
|
|
|
5b2912 |
+ _ ->
|
|
|
5b2912 |
+ ok
|
|
|
5b2912 |
+ end
|
|
|
5b2912 |
+ end),
|
|
|
5b2912 |
+ ok.
|
|
|
5b2912 |
+
|
|
|
5b2912 |
+rpc_call_emitter(Node, Mod, Fun, Args, Ref, Pid, Timeout) ->
|
|
|
5b2912 |
+ rabbit_misc:rpc_call(Node, Mod, Fun, Args++[Ref, Pid], Timeout).
|
|
|
5b2912 |
+
|
|
|
5b2912 |
+%% Agregator process expects correct numbers of explicits ACKs about
|
|
|
5b2912 |
+%% finished emission process. While everything is linked, we still
|
|
|
5b2912 |
+%% need somehow to wait for termination of all emitters before
|
|
|
5b2912 |
+%% returning from RPC call - otherwise links will be just broken with
|
|
|
5b2912 |
+%% reason 'normal' and we can miss some errors, and subsequentially
|
|
|
5b2912 |
+%% hang.
|
|
|
5b2912 |
+await_emitters_termination(Pids) ->
|
|
|
5b2912 |
+ Monitors = [erlang:monitor(process, Pid) || Pid <- Pids],
|
|
|
5b2912 |
+ collect_monitors(Monitors).
|
|
|
5b2912 |
|
|
|
5b2912 |
-wait_for_info_messages(Ref, InfoItemKeys, DisplayFun) when is_reference(Ref) ->
|
|
|
5b2912 |
+collect_monitors([]) ->
|
|
|
5b2912 |
+ ok;
|
|
|
5b2912 |
+collect_monitors([Monitor|Rest]) ->
|
|
|
5b2912 |
receive
|
|
|
5b2912 |
- {Ref, finished} ->
|
|
|
5b2912 |
- ok;
|
|
|
5b2912 |
- {Ref, {timeout, T}} ->
|
|
|
5b2912 |
+ {'DOWN', Monitor, _Pid, normal} ->
|
|
|
5b2912 |
+ collect_monitors(Rest);
|
|
|
5b2912 |
+ {'DOWN', Monitor, _Pid, noproc} ->
|
|
|
5b2912 |
+ %% There is a link and a monitor to a process. Matching
|
|
|
5b2912 |
+ %% this clause means that process has gracefully
|
|
|
5b2912 |
+ %% terminated even before we've started monitoring.
|
|
|
5b2912 |
+ collect_monitors(Rest);
|
|
|
5b2912 |
+ {'DOWN', _, Pid, Reason} ->
|
|
|
5b2912 |
+ exit({emitter_exit, Pid, Reason})
|
|
|
5b2912 |
+ end.
|
|
|
5b2912 |
+
|
|
|
5b2912 |
+%% Wait for result of one or more calls to emitting_map-family
|
|
|
5b2912 |
+%% functions.
|
|
|
5b2912 |
+%%
|
|
|
5b2912 |
+%% Number of expected acknowledgments is specified by ChunkCount
|
|
|
5b2912 |
+%% argument. Most common usage will be with ChunkCount equals to
|
|
|
5b2912 |
+%% number of live nodes, but it's not mandatory - thus more generic
|
|
|
5b2912 |
+%% name of 'ChunkCount' was chosen.
|
|
|
5b2912 |
+wait_for_info_messages(Pid, Ref, Fun, Acc0, Timeout, ChunkCount) ->
|
|
|
5b2912 |
+ notify_if_timeout(Pid, Ref, Timeout),
|
|
|
5b2912 |
+ wait_for_info_messages(Ref, Fun, Acc0, ChunkCount).
|
|
|
5b2912 |
+
|
|
|
5b2912 |
+wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft) ->
|
|
|
5b2912 |
+ receive
|
|
|
5b2912 |
+ {Ref, finished} when ChunksLeft =:= 1 ->
|
|
|
5b2912 |
+ {ok, Acc0};
|
|
|
5b2912 |
+ {Ref, finished} ->
|
|
|
5b2912 |
+ wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft - 1);
|
|
|
5b2912 |
+ {Ref, {timeout, T}} ->
|
|
|
5b2912 |
exit({error, {timeout, (T / 1000)}});
|
|
|
5b2912 |
- {Ref, []} ->
|
|
|
5b2912 |
- wait_for_info_messages(Ref, InfoItemKeys, DisplayFun);
|
|
|
5b2912 |
- {Ref, Result, continue} ->
|
|
|
5b2912 |
- DisplayFun(Result, InfoItemKeys),
|
|
|
5b2912 |
- wait_for_info_messages(Ref, InfoItemKeys, DisplayFun);
|
|
|
5b2912 |
- {error, Error} ->
|
|
|
5b2912 |
- Error;
|
|
|
5b2912 |
- _ ->
|
|
|
5b2912 |
- wait_for_info_messages(Ref, InfoItemKeys, DisplayFun)
|
|
|
5b2912 |
+ {Ref, []} ->
|
|
|
5b2912 |
+ wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft);
|
|
|
5b2912 |
+ {Ref, Result, continue} ->
|
|
|
5b2912 |
+ wait_for_info_messages(Ref, Fun, Fun(Result, Acc0), ChunksLeft);
|
|
|
5b2912 |
+ {Ref, error, Error} ->
|
|
|
5b2912 |
+ {error, simplify_emission_error(Error)};
|
|
|
5b2912 |
+ {'DOWN', _MRef, process, _Pid, normal} ->
|
|
|
5b2912 |
+ wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft);
|
|
|
5b2912 |
+ {'DOWN', _MRef, process, _Pid, Reason} ->
|
|
|
5b2912 |
+ {error, simplify_emission_error(Reason)};
|
|
|
5b2912 |
+ _Msg ->
|
|
|
5b2912 |
+ wait_for_info_messages(Ref, Fun, Acc0, ChunksLeft)
|
|
|
5b2912 |
end.
|
|
|
5b2912 |
|
|
|
5b2912 |
+simplify_emission_error({badrpc, {'EXIT', {{nocatch, EmissionError}, _Stacktrace}}}) ->
|
|
|
5b2912 |
+ EmissionError;
|
|
|
5b2912 |
+simplify_emission_error({{nocatch, EmissionError}, _Stacktrace}) ->
|
|
|
5b2912 |
+ EmissionError;
|
|
|
5b2912 |
+simplify_emission_error(Anything) ->
|
|
|
5b2912 |
+ {error, Anything}.
|
|
|
5b2912 |
+
|
|
|
5b2912 |
+notify_if_timeout(_, _, infinity) ->
|
|
|
5b2912 |
+ ok;
|
|
|
5b2912 |
notify_if_timeout(Pid, Ref, Timeout) ->
|
|
|
5b2912 |
timer:send_after(Timeout, Pid, {Ref, {timeout, Timeout}}).
|
|
|
5b2912 |
|
|
|
5b2912 |
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
|
|
|
5b2912 |
index 9750ba8..02ffeb7 100644
|
|
|
5b2912 |
--- a/src/rabbit_misc.erl
|
|
|
5b2912 |
+++ b/src/rabbit_misc.erl
|
|
|
5b2912 |
@@ -75,7 +75,7 @@
|
|
|
5b2912 |
-export([get_env/3]).
|
|
|
5b2912 |
-export([get_channel_operation_timeout/0]).
|
|
|
5b2912 |
-export([random/1]).
|
|
|
5b2912 |
--export([rpc_call/4, rpc_call/5, rpc_call/7]).
|
|
|
5b2912 |
+-export([rpc_call/4, rpc_call/5]).
|
|
|
5b2912 |
-export([report_default_thread_pool_size/0]).
|
|
|
5b2912 |
|
|
|
5b2912 |
%% Horrible macro to use in guards
|
|
|
5b2912 |
@@ -262,8 +262,6 @@
|
|
|
5b2912 |
-spec random(non_neg_integer()) -> non_neg_integer().
|
|
|
5b2912 |
-spec rpc_call(node(), atom(), atom(), [any()]) -> any().
|
|
|
5b2912 |
-spec rpc_call(node(), atom(), atom(), [any()], number()) -> any().
|
|
|
5b2912 |
--spec rpc_call
|
|
|
5b2912 |
- (node(), atom(), atom(), [any()], reference(), pid(), number()) -> any().
|
|
|
5b2912 |
-spec report_default_thread_pool_size() -> 'ok'.
|
|
|
5b2912 |
|
|
|
5b2912 |
%%----------------------------------------------------------------------------
|
|
|
5b2912 |
@@ -1173,9 +1171,6 @@ rpc_call(Node, Mod, Fun, Args, Timeout) ->
|
|
|
5b2912 |
rpc:call(Node, Mod, Fun, Args, Timeout)
|
|
|
5b2912 |
end.
|
|
|
5b2912 |
|
|
|
5b2912 |
-rpc_call(Node, Mod, Fun, Args, Ref, Pid, Timeout) ->
|
|
|
5b2912 |
- rpc_call(Node, Mod, Fun, Args++[Ref, Pid], Timeout).
|
|
|
5b2912 |
-
|
|
|
5b2912 |
guess_number_of_cpu_cores() ->
|
|
|
5b2912 |
case erlang:system_info(logical_processors_available) of
|
|
|
5b2912 |
unknown -> % Happens on Mac OS X.
|
|
|
5b2912 |
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
|
|
|
5b2912 |
index 5bf30ff..63e3ed0 100644
|
|
|
5b2912 |
--- a/src/rabbit_networking.erl
|
|
|
5b2912 |
+++ b/src/rabbit_networking.erl
|
|
|
5b2912 |
@@ -33,7 +33,8 @@
|
|
|
5b2912 |
node_listeners/1, register_connection/1, unregister_connection/1,
|
|
|
5b2912 |
connections/0, connection_info_keys/0,
|
|
|
5b2912 |
connection_info/1, connection_info/2,
|
|
|
5b2912 |
- connection_info_all/0, connection_info_all/1, connection_info_all/3,
|
|
|
5b2912 |
+ connection_info_all/0, connection_info_all/1,
|
|
|
5b2912 |
+ emit_connection_info_all/4, emit_connection_info_local/3,
|
|
|
5b2912 |
close_connection/2, force_connection_event_refresh/1, tcp_host/1]).
|
|
|
5b2912 |
|
|
|
5b2912 |
%% Used by TCP-based transports, e.g. STOMP adapter
|
|
|
5b2912 |
@@ -89,8 +90,6 @@
|
|
|
5b2912 |
-spec connection_info_all() -> [rabbit_types:infos()].
|
|
|
5b2912 |
-spec connection_info_all(rabbit_types:info_keys()) ->
|
|
|
5b2912 |
[rabbit_types:infos()].
|
|
|
5b2912 |
--spec connection_info_all(rabbit_types:info_keys(), reference(), pid()) ->
|
|
|
5b2912 |
- 'ok'.
|
|
|
5b2912 |
-spec close_connection(pid(), string()) -> 'ok'.
|
|
|
5b2912 |
-spec force_connection_event_refresh(reference()) -> 'ok'.
|
|
|
5b2912 |
|
|
|
5b2912 |
@@ -365,10 +364,15 @@ connection_info(Pid, Items) -> rabbit_reader:info(Pid, Items).
|
|
|
5b2912 |
connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end).
|
|
|
5b2912 |
connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end).
|
|
|
5b2912 |
|
|
|
5b2912 |
-connection_info_all(Items, Ref, AggregatorPid) ->
|
|
|
5b2912 |
+emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
|
|
|
5b2912 |
+ Pids = [ spawn_link(Node, rabbit_networking, emit_connection_info_local, [Items, Ref, AggregatorPid]) || Node <- Nodes ],
|
|
|
5b2912 |
+ rabbit_control_misc:await_emitters_termination(Pids),
|
|
|
5b2912 |
+ ok.
|
|
|
5b2912 |
+
|
|
|
5b2912 |
+emit_connection_info_local(Items, Ref, AggregatorPid) ->
|
|
|
5b2912 |
rabbit_control_misc:emitting_map_with_exit_handler(
|
|
|
5b2912 |
AggregatorPid, Ref, fun(Q) -> connection_info(Q, Items) end,
|
|
|
5b2912 |
- connections()).
|
|
|
5b2912 |
+ connections_local()).
|
|
|
5b2912 |
|
|
|
5b2912 |
close_connection(Pid, Explanation) ->
|
|
|
5b2912 |
rabbit_log:info("Closing connection ~p because ~p~n", [Pid, Explanation]),
|