From: Luke Bakken <lbakken@pivotal.io>
Date: Tue, 13 Mar 2018 09:00:50 -0700
Subject: [PATCH] Add special case in handle_other for normal TCP port exit
Handle noport at epmd monitor startup
Handle EXIT from TCP port more gracefully
Ensure that Parent pid is matched
diff --git a/src/rabbit_epmd_monitor.erl b/src/rabbit_epmd_monitor.erl
index 9d8044e6e..1a14a640d 100644
--- a/src/rabbit_epmd_monitor.erl
+++ b/src/rabbit_epmd_monitor.erl
@@ -48,16 +48,26 @@
%% epmd" as a shutdown or uninstall step.
%% ----------------------------------------------------------------------------
-start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
init([]) ->
{Me, Host} = rabbit_nodes:parts(node()),
Mod = net_kernel:epmd_module(),
- {port, Port, _Version} = Mod:port_please(Me, Host),
- {ok, ensure_timer(#state{mod = Mod,
- me = Me,
- host = Host,
- port = Port})}.
+ init_handle_port_please(Mod:port_please(Me, Host), Mod, Me, Host).
+
+init_handle_port_please(noport, Mod, Me, Host) ->
+ State = #state{mod = Mod,
+ me = Me,
+ host = Host,
+ port = undefined},
+ {ok, ensure_timer(State)};
+init_handle_port_please({port, Port, _Version}, Mod, Me, Host) ->
+ State = #state{mod = Mod,
+ me = Me,
+ host = Host,
+ port = Port},
+ {ok, ensure_timer(State)}.
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -65,9 +75,9 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info(check, State) ->
- check_epmd(State),
- {noreply, ensure_timer(State#state{timer = undefined})};
+handle_info(check, State0) ->
+ {ok, State1} = check_epmd(State0),
+ {noreply, ensure_timer(State1#state{timer = undefined})};
handle_info(_Info, State) ->
{noreply, State}.
@@ -83,15 +93,18 @@ code_change(_OldVsn, State, _Extra) ->
ensure_timer(State) ->
rabbit_misc:ensure_timer(State, #state.timer, ?CHECK_FREQUENCY, check).
-check_epmd(#state{mod = Mod,
- me = Me,
- host = Host,
- port = Port}) ->
- case Mod:port_please(Me, Host) of
- noport -> rabbit_log:warning(
- "epmd does not know us, re-registering ~s at port ~b~n",
- [Me, Port]),
- rabbit_nodes:ensure_epmd(),
- Mod:register_node(Me, Port);
- _ -> ok
- end.
+check_epmd(State = #state{mod = Mod,
+ me = Me,
+ host = Host,
+ port = Port}) ->
+ Port1 = case Mod:port_please(Me, Host) of
+ noport ->
+ rabbit_log:warning("epmd does not know us, re-registering ~s at port ~b~n",
+ [Me, Port]),
+ Port;
+ {port, NewPort, _Version} ->
+ NewPort
+ end,
+ rabbit_nodes:ensure_epmd(),
+ Mod:register_node(Me, Port1),
+ {ok, State#state{port = Port1}}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 24de35e7e..a6cca9438 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -563,9 +563,14 @@ handle_other({channel_closing, ChPid}, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
{_, State1} = channel_cleanup(ChPid, State),
maybe_close(control_throttle(State1));
+handle_other({'EXIT', Parent, normal}, State = #v1{parent = Parent}) ->
+ %% rabbitmq/rabbitmq-server#544
+ %% The connection port process has exited due to the TCP socket being closed.
+ %% Handle this case in the same manner as receiving {error, closed}
+ stop(closed, State);
handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
- terminate(io_lib:format("broker forced connection closure "
- "with reason '~w'", [Reason]), State),
+ Msg = io_lib:format("broker forced connection closure with reason '~w'", [Reason]),
+ terminate(Msg, State),
%% this is what we are expected to do according to
%% http://www.erlang.org/doc/man/sys.html
%%
@@ -794,7 +799,7 @@ wait_for_channel_termination(N, TimerRef,
wait_for_channel_termination(N-1, TimerRef, State1)
end;
{'EXIT', Sock, _Reason} ->
- [channel_cleanup(ChPid, State) || ChPid <- all_channels()],
+ clean_up_all_channels(State),
exit(normal);
cancel_wait ->
exit(channel_termination_timeout)
@@ -963,6 +968,12 @@ channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) ->
all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
+clean_up_all_channels(State) ->
+ CleanupFun = fun(ChPid) ->
+ channel_cleanup(ChPid, State)
+ end,
+ lists:foreach(CleanupFun, all_channels()).
+
%%--------------------------------------------------------------------------
handle_frame(Type, 0, Payload,