From: Luke Bakken <lbakken@pivotal.io>
Date: Tue, 13 Mar 2018 09:00:50 -0700
Subject: [PATCH] Add special case in handle_other for normal TCP port exit

Handle noport at epmd monitor startup

Handle EXIT from TCP port more gracefully

Ensure that Parent pid is matched

diff --git a/src/rabbit_epmd_monitor.erl b/src/rabbit_epmd_monitor.erl
index 9d8044e6e..1a14a640d 100644
--- a/src/rabbit_epmd_monitor.erl
+++ b/src/rabbit_epmd_monitor.erl
@@ -48,16 +48,26 @@
 %%    epmd" as a shutdown or uninstall step.
 %% ----------------------------------------------------------------------------
 
-start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+start_link() ->
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
 
 init([]) ->
     {Me, Host} = rabbit_nodes:parts(node()),
     Mod = net_kernel:epmd_module(),
-    {port, Port, _Version} = Mod:port_please(Me, Host),
-    {ok, ensure_timer(#state{mod  = Mod,
-                             me   = Me,
-                             host = Host,
-                             port = Port})}.
+    init_handle_port_please(Mod:port_please(Me, Host), Mod, Me, Host).
+
+init_handle_port_please(noport, Mod, Me, Host) ->
+    State = #state{mod = Mod,
+                   me = Me,
+                   host = Host,
+                   port = undefined},
+    {ok, ensure_timer(State)};
+init_handle_port_please({port, Port, _Version}, Mod, Me, Host) ->
+    State = #state{mod = Mod,
+                   me = Me,
+                   host = Host,
+                   port = Port},
+    {ok, ensure_timer(State)}.
 
 handle_call(_Request, _From, State) ->
     {noreply, State}.
@@ -65,9 +75,9 @@ handle_call(_Request, _From, State) ->
 handle_cast(_Msg, State) ->
     {noreply, State}.
 
-handle_info(check, State) ->
-    check_epmd(State),
-    {noreply, ensure_timer(State#state{timer = undefined})};
+handle_info(check, State0) ->
+    {ok, State1} = check_epmd(State0),
+    {noreply, ensure_timer(State1#state{timer = undefined})};
 
 handle_info(_Info, State) ->
     {noreply, State}.
@@ -83,15 +93,18 @@ code_change(_OldVsn, State, _Extra) ->
 ensure_timer(State) ->
     rabbit_misc:ensure_timer(State, #state.timer, ?CHECK_FREQUENCY, check).
 
-check_epmd(#state{mod  = Mod,
-                  me   = Me,
-                  host = Host,
-                  port = Port}) ->
-    case Mod:port_please(Me, Host) of
-        noport -> rabbit_log:warning(
-                    "epmd does not know us, re-registering ~s at port ~b~n",
-                    [Me, Port]),
-                  rabbit_nodes:ensure_epmd(),
-                  Mod:register_node(Me, Port);
-        _      -> ok
-    end.
+check_epmd(State = #state{mod  = Mod,
+                          me   = Me,
+                          host = Host,
+                          port = Port}) ->
+    Port1 = case Mod:port_please(Me, Host) of
+                noport ->
+                    rabbit_log:warning("epmd does not know us, re-registering ~s at port ~b~n",
+                                       [Me, Port]),
+                    Port;
+                {port, NewPort, _Version} ->
+                    NewPort
+            end,
+    rabbit_nodes:ensure_epmd(),
+    Mod:register_node(Me, Port1),
+    {ok, State#state{port = Port1}}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 24de35e7e..a6cca9438 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -563,9 +563,14 @@ handle_other({channel_closing, ChPid}, State) ->
     ok = rabbit_channel:ready_for_close(ChPid),
     {_, State1} = channel_cleanup(ChPid, State),
     maybe_close(control_throttle(State1));
+handle_other({'EXIT', Parent, normal}, State = #v1{parent = Parent}) ->
+    %% rabbitmq/rabbitmq-server#544
+    %% The connection port process has exited due to the TCP socket being closed.
+    %% Handle this case in the same manner as receiving {error, closed}
+    stop(closed, State);
 handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
-    terminate(io_lib:format("broker forced connection closure "
-                            "with reason '~w'", [Reason]), State),
+    Msg = io_lib:format("broker forced connection closure with reason '~w'", [Reason]),
+    terminate(Msg, State),
     %% this is what we are expected to do according to
     %% http://www.erlang.org/doc/man/sys.html
     %%
@@ -794,7 +799,7 @@ wait_for_channel_termination(N, TimerRef,
                     wait_for_channel_termination(N-1, TimerRef, State1)
             end;
         {'EXIT', Sock, _Reason} ->
-            [channel_cleanup(ChPid, State) || ChPid <- all_channels()],
+            clean_up_all_channels(State),
             exit(normal);
         cancel_wait ->
             exit(channel_termination_timeout)
@@ -963,6 +968,12 @@ channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) ->
 
 all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()].
 
+clean_up_all_channels(State) ->
+    CleanupFun = fun(ChPid) ->
+                    channel_cleanup(ChPid, State)
+                 end,
+    lists:foreach(CleanupFun, all_channels()).
+
 %%--------------------------------------------------------------------------
 
 handle_frame(Type, 0, Payload,