From c96a1805e820031d58f303831db4af842aa6d504 Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Wed, 20 Oct 2010 17:26:01 +1000 Subject: [PATCH] - get rid of rpc:call to avoid group leader inheritance - do not log migration errors - remove stopping node from cluster hashing explicitly --- src/ejabberd_app.erl | 1 + src/ejabberd_cluster.erl | 14 ++++++++- src/p1_fsm.erl | 63 +++++++++++++++++++++++++--------------- 3 files changed, 53 insertions(+), 25 deletions(-) diff --git a/src/ejabberd_app.erl b/src/ejabberd_app.erl index 8726805dd..ba7b2d319 100644 --- a/src/ejabberd_app.erl +++ b/src/ejabberd_app.erl @@ -80,6 +80,7 @@ start(_, _) -> %% This function is called when an application is about to be stopped, %% before shutting down the processes of the application. prep_stop(State) -> + ejabberd_cluster:shutdown(), stop_modules(), ejabberd_admin:stop(), broadcast_c2s_shutdown(), diff --git a/src/ejabberd_cluster.erl b/src/ejabberd_cluster.erl index 190e35202..c189eb450 100644 --- a/src/ejabberd_cluster.erl +++ b/src/ejabberd_cluster.erl @@ -10,7 +10,7 @@ -behaviour(gen_server). %% API --export([start_link/0, get_node/1, get_node_new/1, announce/0, +-export([start_link/0, get_node/1, get_node_new/1, announce/0, shutdown/0, node_id/0, get_node_by_id/1, get_nodes/0, rehash_timeout/0]). %% gen_server callbacks @@ -68,6 +68,14 @@ get_node_by_id(NodeID) -> node() end. +shutdown() -> + lists:foreach( + fun(Node) when Node /= node() -> + {ejabberd_cluster, Node} ! {node_down, node()}; + (_) -> + ok + end, get_nodes()). + %%==================================================================== %% gen_server callbacks %%==================================================================== @@ -118,6 +126,10 @@ handle_info({node_ready, Node}, State) -> ?INFO_MSG("adding node ~p to hash", [Node]), append_node(?HASHTBL, Node), {noreply, State}; +handle_info({node_down, Node}, State) -> + delete_node(?HASHTBL, Node), + delete_node(?HASHTBL_NEW, Node), + {noreply, State}; handle_info({nodedown, Node, _}, State) -> ?INFO_MSG("node ~p goes down", [Node]), delete_node(?HASHTBL, Node), diff --git a/src/p1_fsm.erl b/src/p1_fsm.erl index bc47e9c67..adb4e00cf 100644 --- a/src/p1_fsm.erl +++ b/src/p1_fsm.erl @@ -525,8 +525,8 @@ relay_messages(MRef, TRef, Clone, Queue) -> relay_messages(MRef, TRef, Clone) -> receive - {'DOWN', MRef, process, Clone, Reason} -> - Reason; + {'DOWN', MRef, process, Clone, _Reason} -> + normal; {'EXIT', _Parent, _Reason} -> {migrated, Clone}; {timeout, TRef, timeout} -> @@ -563,15 +563,7 @@ handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time, Time1 end, Now = now(), - Reason = case catch rpc:call(Node, M, F, A, RPCTimeout) of - {badrpc, timeout} -> - normal; - {badrpc, _} = Err -> - {migration_error, Err}; - {'EXIT', _} = Err -> - {migration_error, Err}; - {error, _} = Err -> - {migration_error, Err}; + Reason = case catch rpc_call(Node, M, F, A, RPCTimeout) of {ok, Clone} -> process_flag(trap_exit, true), MRef = erlang:monitor(process, Clone), @@ -579,8 +571,8 @@ handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time, TimeLeft = lists:max([Time1 - NowDiff, 0]), TRef = erlang:start_timer(TimeLeft, self(), timeout), relay_messages(MRef, TRef, Clone, Queue); - Reply -> - {migration_error, {bad_reply, Reply}} + _ -> + normal end, terminate(Reason, Name, Msg, Mod, StateName, NStateData, []); {stop, Reason, NStateData} -> @@ -628,15 +620,7 @@ handle_msg(Msg, Parent, Name, StateName, StateData, Time1 end, Now = now(), - Reason = case catch rpc:call(Node, M, F, A, RPCTimeout) of - {badrpc, timeout} -> - normal; - {badrpc, R} -> - {migration_error, R}; - {'EXIT', R} -> - {migration_error, R}; - {error, R} -> - {migration_error, R}; + Reason = case catch rpc_call(Node, M, F, A, RPCTimeout) of {ok, Clone} -> process_flag(trap_exit, true), MRef = erlang:monitor(process, Clone), @@ -644,8 +628,8 @@ handle_msg(Msg, Parent, Name, StateName, StateData, TimeLeft = lists:max([Time1 - NowDiff, 0]), TRef = erlang:start_timer(TimeLeft, self(), timeout), relay_messages(MRef, TRef, Clone, Queue); - Reply -> - {migration_error, {bad_reply, Reply}} + _ -> + normal end, terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug); {stop, Reason, NStateData} -> @@ -832,3 +816,34 @@ message_queue_len(#limits{max_queue = MaxQueue}, QueueLen) -> _ -> ok end. + +rpc_call(Node, Mod, Fun, Args, Timeout) -> + Ref = make_ref(), + Caller = self(), + F = fun() -> + group_leader(whereis(user), self()), + case catch apply(Mod, Fun, Args) of + {'EXIT', _} = Err -> + Caller ! {Ref, {badrpc, Err}}; + Result -> + Caller ! {Ref, Result} + end + end, + Pid = spawn(Node, F), + MRef = erlang:monitor(process, Pid), + receive + {Ref, Result} -> + erlang:demonitor(MRef, [flush]), + Result; + {'DOWN', MRef, _, _, Reason} -> + {badrpc, Reason} + after Timeout -> + erlang:demonitor(MRef, [flush]), + catch exit(Pid, kill), + receive + {Ref, Result} -> + Result + after 0 -> + {badrpc, timeout} + end + end.