From: Luke Bakken Date: Tue, 13 Mar 2018 09:00:50 -0700 Subject: [PATCH] Add special case in handle_other for normal TCP port exit Handle noport at epmd monitor startup Handle EXIT from TCP port more gracefully Ensure that Parent pid is matched diff --git a/src/rabbit_epmd_monitor.erl b/src/rabbit_epmd_monitor.erl index 9d8044e6e..1a14a640d 100644 --- a/src/rabbit_epmd_monitor.erl +++ b/src/rabbit_epmd_monitor.erl @@ -48,16 +48,26 @@ %% epmd" as a shutdown or uninstall step. %% ---------------------------------------------------------------------------- -start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). init([]) -> {Me, Host} = rabbit_nodes:parts(node()), Mod = net_kernel:epmd_module(), - {port, Port, _Version} = Mod:port_please(Me, Host), - {ok, ensure_timer(#state{mod = Mod, - me = Me, - host = Host, - port = Port})}. + init_handle_port_please(Mod:port_please(Me, Host), Mod, Me, Host). + +init_handle_port_please(noport, Mod, Me, Host) -> + State = #state{mod = Mod, + me = Me, + host = Host, + port = undefined}, + {ok, ensure_timer(State)}; +init_handle_port_please({port, Port, _Version}, Mod, Me, Host) -> + State = #state{mod = Mod, + me = Me, + host = Host, + port = Port}, + {ok, ensure_timer(State)}. handle_call(_Request, _From, State) -> {noreply, State}. @@ -65,9 +75,9 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info(check, State) -> - check_epmd(State), - {noreply, ensure_timer(State#state{timer = undefined})}; +handle_info(check, State0) -> + {ok, State1} = check_epmd(State0), + {noreply, ensure_timer(State1#state{timer = undefined})}; handle_info(_Info, State) -> {noreply, State}. @@ -83,15 +93,18 @@ code_change(_OldVsn, State, _Extra) -> ensure_timer(State) -> rabbit_misc:ensure_timer(State, #state.timer, ?CHECK_FREQUENCY, check). -check_epmd(#state{mod = Mod, - me = Me, - host = Host, - port = Port}) -> - case Mod:port_please(Me, Host) of - noport -> rabbit_log:warning( - "epmd does not know us, re-registering ~s at port ~b~n", - [Me, Port]), - rabbit_nodes:ensure_epmd(), - Mod:register_node(Me, Port); - _ -> ok - end. +check_epmd(State = #state{mod = Mod, + me = Me, + host = Host, + port = Port}) -> + Port1 = case Mod:port_please(Me, Host) of + noport -> + rabbit_log:warning("epmd does not know us, re-registering ~s at port ~b~n", + [Me, Port]), + Port; + {port, NewPort, _Version} -> + NewPort + end, + rabbit_nodes:ensure_epmd(), + Mod:register_node(Me, Port1), + {ok, State#state{port = Port1}}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 24de35e7e..a6cca9438 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -563,9 +563,14 @@ handle_other({channel_closing, ChPid}, State) -> ok = rabbit_channel:ready_for_close(ChPid), {_, State1} = channel_cleanup(ChPid, State), maybe_close(control_throttle(State1)); +handle_other({'EXIT', Parent, normal}, State = #v1{parent = Parent}) -> + %% rabbitmq/rabbitmq-server#544 + %% The connection port process has exited due to the TCP socket being closed. + %% Handle this case in the same manner as receiving {error, closed} + stop(closed, State); handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) -> - terminate(io_lib:format("broker forced connection closure " - "with reason '~w'", [Reason]), State), + Msg = io_lib:format("broker forced connection closure with reason '~w'", [Reason]), + terminate(Msg, State), %% this is what we are expected to do according to %% http://www.erlang.org/doc/man/sys.html %% @@ -794,7 +799,7 @@ wait_for_channel_termination(N, TimerRef, wait_for_channel_termination(N-1, TimerRef, State1) end; {'EXIT', Sock, _Reason} -> - [channel_cleanup(ChPid, State) || ChPid <- all_channels()], + clean_up_all_channels(State), exit(normal); cancel_wait -> exit(channel_termination_timeout) @@ -963,6 +968,12 @@ channel_cleanup(ChPid, State = #v1{channel_count = ChannelCount}) -> all_channels() -> [ChPid || {{ch_pid, ChPid}, _ChannelMRef} <- get()]. +clean_up_all_channels(State) -> + CleanupFun = fun(ChPid) -> + channel_cleanup(ChPid, State) + end, + lists:foreach(CleanupFun, all_channels()). + %%-------------------------------------------------------------------------- handle_frame(Type, 0, Payload,