25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-10-05 14:51:05 +02:00

- get rid of rpc:call to avoid group leader inheritance

- do not log migration errors
- remove stopping node from cluster hashing explicitly
This commit is contained in:
Evgeniy Khramtsov 2010-10-20 17:26:01 +10:00
parent babff870a8
commit c96a1805e8
3 changed files with 53 additions and 25 deletions

View File

@ -80,6 +80,7 @@ start(_, _) ->
%% This function is called when an application is about to be stopped, %% This function is called when an application is about to be stopped,
%% before shutting down the processes of the application. %% before shutting down the processes of the application.
prep_stop(State) -> prep_stop(State) ->
ejabberd_cluster:shutdown(),
stop_modules(), stop_modules(),
ejabberd_admin:stop(), ejabberd_admin:stop(),
broadcast_c2s_shutdown(), broadcast_c2s_shutdown(),

View File

@ -10,7 +10,7 @@
-behaviour(gen_server). -behaviour(gen_server).
%% API %% API
-export([start_link/0, get_node/1, get_node_new/1, announce/0, -export([start_link/0, get_node/1, get_node_new/1, announce/0, shutdown/0,
node_id/0, get_node_by_id/1, get_nodes/0, rehash_timeout/0]). node_id/0, get_node_by_id/1, get_nodes/0, rehash_timeout/0]).
%% gen_server callbacks %% gen_server callbacks
@ -68,6 +68,14 @@ get_node_by_id(NodeID) ->
node() node()
end. end.
shutdown() ->
lists:foreach(
fun(Node) when Node /= node() ->
{ejabberd_cluster, Node} ! {node_down, node()};
(_) ->
ok
end, get_nodes()).
%%==================================================================== %%====================================================================
%% gen_server callbacks %% gen_server callbacks
%%==================================================================== %%====================================================================
@ -118,6 +126,10 @@ handle_info({node_ready, Node}, State) ->
?INFO_MSG("adding node ~p to hash", [Node]), ?INFO_MSG("adding node ~p to hash", [Node]),
append_node(?HASHTBL, Node), append_node(?HASHTBL, Node),
{noreply, State}; {noreply, State};
handle_info({node_down, Node}, State) ->
delete_node(?HASHTBL, Node),
delete_node(?HASHTBL_NEW, Node),
{noreply, State};
handle_info({nodedown, Node, _}, State) -> handle_info({nodedown, Node, _}, State) ->
?INFO_MSG("node ~p goes down", [Node]), ?INFO_MSG("node ~p goes down", [Node]),
delete_node(?HASHTBL, Node), delete_node(?HASHTBL, Node),

View File

@ -525,8 +525,8 @@ relay_messages(MRef, TRef, Clone, Queue) ->
relay_messages(MRef, TRef, Clone) -> relay_messages(MRef, TRef, Clone) ->
receive receive
{'DOWN', MRef, process, Clone, Reason} -> {'DOWN', MRef, process, Clone, _Reason} ->
Reason; normal;
{'EXIT', _Parent, _Reason} -> {'EXIT', _Parent, _Reason} ->
{migrated, Clone}; {migrated, Clone};
{timeout, TRef, timeout} -> {timeout, TRef, timeout} ->
@ -563,15 +563,7 @@ handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time,
Time1 Time1
end, end,
Now = now(), Now = now(),
Reason = case catch rpc:call(Node, M, F, A, RPCTimeout) of Reason = case catch rpc_call(Node, M, F, A, RPCTimeout) of
{badrpc, timeout} ->
normal;
{badrpc, _} = Err ->
{migration_error, Err};
{'EXIT', _} = Err ->
{migration_error, Err};
{error, _} = Err ->
{migration_error, Err};
{ok, Clone} -> {ok, Clone} ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
MRef = erlang:monitor(process, Clone), MRef = erlang:monitor(process, Clone),
@ -579,8 +571,8 @@ handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time,
TimeLeft = lists:max([Time1 - NowDiff, 0]), TimeLeft = lists:max([Time1 - NowDiff, 0]),
TRef = erlang:start_timer(TimeLeft, self(), timeout), TRef = erlang:start_timer(TimeLeft, self(), timeout),
relay_messages(MRef, TRef, Clone, Queue); relay_messages(MRef, TRef, Clone, Queue);
Reply -> _ ->
{migration_error, {bad_reply, Reply}} normal
end, end,
terminate(Reason, Name, Msg, Mod, StateName, NStateData, []); terminate(Reason, Name, Msg, Mod, StateName, NStateData, []);
{stop, Reason, NStateData} -> {stop, Reason, NStateData} ->
@ -628,15 +620,7 @@ handle_msg(Msg, Parent, Name, StateName, StateData,
Time1 Time1
end, end,
Now = now(), Now = now(),
Reason = case catch rpc:call(Node, M, F, A, RPCTimeout) of Reason = case catch rpc_call(Node, M, F, A, RPCTimeout) of
{badrpc, timeout} ->
normal;
{badrpc, R} ->
{migration_error, R};
{'EXIT', R} ->
{migration_error, R};
{error, R} ->
{migration_error, R};
{ok, Clone} -> {ok, Clone} ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
MRef = erlang:monitor(process, Clone), MRef = erlang:monitor(process, Clone),
@ -644,8 +628,8 @@ handle_msg(Msg, Parent, Name, StateName, StateData,
TimeLeft = lists:max([Time1 - NowDiff, 0]), TimeLeft = lists:max([Time1 - NowDiff, 0]),
TRef = erlang:start_timer(TimeLeft, self(), timeout), TRef = erlang:start_timer(TimeLeft, self(), timeout),
relay_messages(MRef, TRef, Clone, Queue); relay_messages(MRef, TRef, Clone, Queue);
Reply -> _ ->
{migration_error, {bad_reply, Reply}} normal
end, end,
terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug); terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug);
{stop, Reason, NStateData} -> {stop, Reason, NStateData} ->
@ -832,3 +816,34 @@ message_queue_len(#limits{max_queue = MaxQueue}, QueueLen) ->
_ -> _ ->
ok ok
end. end.
rpc_call(Node, Mod, Fun, Args, Timeout) ->
Ref = make_ref(),
Caller = self(),
F = fun() ->
group_leader(whereis(user), self()),
case catch apply(Mod, Fun, Args) of
{'EXIT', _} = Err ->
Caller ! {Ref, {badrpc, Err}};
Result ->
Caller ! {Ref, Result}
end
end,
Pid = spawn(Node, F),
MRef = erlang:monitor(process, Pid),
receive
{Ref, Result} ->
erlang:demonitor(MRef, [flush]),
Result;
{'DOWN', MRef, _, _, Reason} ->
{badrpc, Reason}
after Timeout ->
erlang:demonitor(MRef, [flush]),
catch exit(Pid, kill),
receive
{Ref, Result} ->
Result
after 0 ->
{badrpc, timeout}
end
end.