* src/ejabberd_hooks.erl: Support distributed hooks (EJAB-829)

SVN Revision: 2047
This commit is contained in:
Badlop 2009-04-28 14:57:16 +00:00
parent e45e486fd2
commit b2659e4a6f
2 changed files with 91 additions and 0 deletions

View File

@ -1,3 +1,7 @@
2009-04-28 Badlop <badlop@process-one.net>
* src/ejabberd_hooks.erl: Support distributed hooks (EJAB-829)
2009-04-27 Badlop <badlop@process-one.net>
* src/xml.erl: More verbose error reporting for

View File

@ -33,12 +33,16 @@
-export([start_link/0,
add/3,
add/4,
add_dist/5,
delete/3,
delete/4,
delete_dist/5,
run/2,
run_fold/3,
add/5,
add_dist/6,
delete/5,
delete_dist/6,
run/3,
run_fold/4]).
@ -52,6 +56,9 @@
-include("ejabberd.hrl").
%% Timeout of 5 seconds in calls to distributed hooks
-define(TIMEOUT_DISTRIBUTED_HOOK, 5000).
-record(state, {}).
%%%----------------------------------------------------------------------
@ -77,6 +84,12 @@ add(Hook, Module, Function, Seq) ->
add(Hook, Host, Module, Function, Seq) ->
gen_server:call(ejabberd_hooks, {add, Hook, Host, Module, Function, Seq}).
add_dist(Hook, Node, Module, Function, Seq) ->
gen_server:call(ejabberd_hooks, {add, Hook, global, Node, Module, Function, Seq}).
add_dist(Hook, Host, Node, Module, Function, Seq) ->
gen_server:call(ejabberd_hooks, {add, Hook, Host, Node, Module, Function, Seq}).
%% @spec (Hook::atom(), Function::function(), Seq::integer()) -> ok
%% @doc See del/4.
delete(Hook, Function, Seq) when is_function(Function) ->
@ -94,6 +107,12 @@ delete(Hook, Module, Function, Seq) ->
delete(Hook, Host, Module, Function, Seq) ->
gen_server:call(ejabberd_hooks, {delete, Hook, Host, Module, Function, Seq}).
delete_dist(Hook, Node, Module, Function, Seq) ->
delete_dist(Hook, global, Node, Module, Function, Seq).
delete_dist(Hook, Host, Node, Module, Function, Seq) ->
gen_server:call(ejabberd_hooks, {delete, Hook, Host, Node, Module, Function, Seq}).
%% @spec (Hook::atom(), Args) -> ok
%% @doc Run the calls of this hook in order, don't care about function results.
%% If a call returns stop, no more calls are performed.
@ -167,6 +186,24 @@ handle_call({add, Hook, Host, Module, Function, Seq}, _From, State) ->
ok
end,
{reply, Reply, State};
handle_call({add, Hook, Host, Node, Module, Function, Seq}, _From, State) ->
Reply = case ets:lookup(hooks, {Hook, Host}) of
[{_, Ls}] ->
El = {Seq, Node, Module, Function},
case lists:member(El, Ls) of
true ->
ok;
false ->
NewLs = lists:merge(Ls, [El]),
ets:insert(hooks, {{Hook, Host}, NewLs}),
ok
end;
[] ->
NewLs = [{Seq, Node, Module, Function}],
ets:insert(hooks, {{Hook, Host}, NewLs}),
ok
end,
{reply, Reply, State};
handle_call({delete, Hook, Host, Module, Function, Seq}, _From, State) ->
Reply = case ets:lookup(hooks, {Hook, Host}) of
[{_, Ls}] ->
@ -177,6 +214,16 @@ handle_call({delete, Hook, Host, Module, Function, Seq}, _From, State) ->
ok
end,
{reply, Reply, State};
handle_call({delete, Hook, Host, Node, Module, Function, Seq}, _From, State) ->
Reply = case ets:lookup(hooks, {Hook, Host}) of
[{_, Ls}] ->
NewLs = lists:delete({Seq, Node, Module, Function}, Ls),
ets:insert(hooks, {{Hook, Host}, NewLs}),
ok;
[] ->
ok
end,
{reply, Reply, State};
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
@ -217,6 +264,25 @@ code_change(_OldVsn, State, _Extra) ->
run1([], _Hook, _Args) ->
ok;
run1([{_Seq, Node, Module, Function} | Ls], Hook, Args) ->
case rpc:call(Node, Module, Function, Args, ?TIMEOUT_DISTRIBUTED_HOOK) of
timeout ->
?ERROR_MSG("Timeout on RPC to ~p~nrunning hook: ~p",
[Node, {Hook, Args}]),
run1(Ls, Hook, Args);
{badrpc, Reason} ->
?ERROR_MSG("Bad RPC error to ~p: ~p~nrunning hook: ~p",
[Node, Reason, {Hook, Args}]),
run1(Ls, Hook, Args);
stop ->
?INFO_MSG("~nThe process ~p in node ~p ran a hook in node ~p.~n"
"Stop.", [self(), node(), Node]), % debug code
ok;
Res ->
?INFO_MSG("~nThe process ~p in node ~p ran a hook in node ~p.~n"
"The response is:~n~s", [self(), node(), Node, Res]), % debug code
run1(Ls, Hook, Args)
end;
run1([{_Seq, Module, Function} | Ls], Hook, Args) ->
Res = if is_function(Function) ->
catch apply(Function, Args);
@ -237,6 +303,27 @@ run1([{_Seq, Module, Function} | Ls], Hook, Args) ->
run_fold1([], _Hook, Val, _Args) ->
Val;
run_fold1([{_Seq, Node, Module, Function} | Ls], Hook, Val, Args) ->
case rpc:call(Node, Module, Function, [Val | Args], ?TIMEOUT_DISTRIBUTED_HOOK) of
{badrpc, Reason} ->
?ERROR_MSG("Bad RPC error to ~p: ~p~nrunning hook: ~p",
[Node, Reason, {Hook, Args}]),
run_fold1(Ls, Hook, Val, Args);
timeout ->
?ERROR_MSG("Timeout on RPC to ~p~nrunning hook: ~p",
[Node, {Hook, Args}]),
run_fold1(Ls, Hook, Val, Args);
stop ->
stopped;
{stop, NewVal} ->
?INFO_MSG("~nThe process ~p in node ~p ran a hook in node ~p.~n"
"Stop, and the NewVal is:~n~p", [self(), node(), Node, NewVal]), % debug code
NewVal;
NewVal ->
?INFO_MSG("~nThe process ~p in node ~p ran a hook in node ~p.~n"
"The NewVal is:~n~p", [self(), node(), Node, NewVal]), % debug code
run_fold1(Ls, Hook, NewVal, Args)
end;
run_fold1([{_Seq, Module, Function} | Ls], Hook, Val, Args) ->
Res = if is_function(Function) ->
catch apply(Function, [Val | Args]);