From b2659e4a6fa5ca231681fcafcb5b714f7efedd4f Mon Sep 17 00:00:00 2001 From: Badlop Date: Tue, 28 Apr 2009 14:57:16 +0000 Subject: [PATCH] * src/ejabberd_hooks.erl: Support distributed hooks (EJAB-829) SVN Revision: 2047 --- ChangeLog | 4 ++ src/ejabberd_hooks.erl | 87 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) diff --git a/ChangeLog b/ChangeLog index bd175f4ad..7b0ba705d 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,7 @@ +2009-04-28 Badlop + + * src/ejabberd_hooks.erl: Support distributed hooks (EJAB-829) + 2009-04-27 Badlop * src/xml.erl: More verbose error reporting for diff --git a/src/ejabberd_hooks.erl b/src/ejabberd_hooks.erl index 9851a241c..bb71f14e1 100644 --- a/src/ejabberd_hooks.erl +++ b/src/ejabberd_hooks.erl @@ -33,12 +33,16 @@ -export([start_link/0, add/3, add/4, + add_dist/5, delete/3, delete/4, + delete_dist/5, run/2, run_fold/3, add/5, + add_dist/6, delete/5, + delete_dist/6, run/3, run_fold/4]). @@ -52,6 +56,9 @@ -include("ejabberd.hrl"). +%% Timeout of 5 seconds in calls to distributed hooks +-define(TIMEOUT_DISTRIBUTED_HOOK, 5000). + -record(state, {}). %%%---------------------------------------------------------------------- @@ -77,6 +84,12 @@ add(Hook, Module, Function, Seq) -> add(Hook, Host, Module, Function, Seq) -> gen_server:call(ejabberd_hooks, {add, Hook, Host, Module, Function, Seq}). +add_dist(Hook, Node, Module, Function, Seq) -> + gen_server:call(ejabberd_hooks, {add, Hook, global, Node, Module, Function, Seq}). + +add_dist(Hook, Host, Node, Module, Function, Seq) -> + gen_server:call(ejabberd_hooks, {add, Hook, Host, Node, Module, Function, Seq}). + %% @spec (Hook::atom(), Function::function(), Seq::integer()) -> ok %% @doc See del/4. delete(Hook, Function, Seq) when is_function(Function) -> @@ -94,6 +107,12 @@ delete(Hook, Module, Function, Seq) -> delete(Hook, Host, Module, Function, Seq) -> gen_server:call(ejabberd_hooks, {delete, Hook, Host, Module, Function, Seq}). +delete_dist(Hook, Node, Module, Function, Seq) -> + delete_dist(Hook, global, Node, Module, Function, Seq). + +delete_dist(Hook, Host, Node, Module, Function, Seq) -> + gen_server:call(ejabberd_hooks, {delete, Hook, Host, Node, Module, Function, Seq}). + %% @spec (Hook::atom(), Args) -> ok %% @doc Run the calls of this hook in order, don't care about function results. %% If a call returns stop, no more calls are performed. @@ -167,6 +186,24 @@ handle_call({add, Hook, Host, Module, Function, Seq}, _From, State) -> ok end, {reply, Reply, State}; +handle_call({add, Hook, Host, Node, Module, Function, Seq}, _From, State) -> + Reply = case ets:lookup(hooks, {Hook, Host}) of + [{_, Ls}] -> + El = {Seq, Node, Module, Function}, + case lists:member(El, Ls) of + true -> + ok; + false -> + NewLs = lists:merge(Ls, [El]), + ets:insert(hooks, {{Hook, Host}, NewLs}), + ok + end; + [] -> + NewLs = [{Seq, Node, Module, Function}], + ets:insert(hooks, {{Hook, Host}, NewLs}), + ok + end, + {reply, Reply, State}; handle_call({delete, Hook, Host, Module, Function, Seq}, _From, State) -> Reply = case ets:lookup(hooks, {Hook, Host}) of [{_, Ls}] -> @@ -177,6 +214,16 @@ handle_call({delete, Hook, Host, Module, Function, Seq}, _From, State) -> ok end, {reply, Reply, State}; +handle_call({delete, Hook, Host, Node, Module, Function, Seq}, _From, State) -> + Reply = case ets:lookup(hooks, {Hook, Host}) of + [{_, Ls}] -> + NewLs = lists:delete({Seq, Node, Module, Function}, Ls), + ets:insert(hooks, {{Hook, Host}, NewLs}), + ok; + [] -> + ok + end, + {reply, Reply, State}; handle_call(_Request, _From, State) -> Reply = ok, {reply, Reply, State}. @@ -217,6 +264,25 @@ code_change(_OldVsn, State, _Extra) -> run1([], _Hook, _Args) -> ok; +run1([{_Seq, Node, Module, Function} | Ls], Hook, Args) -> + case rpc:call(Node, Module, Function, Args, ?TIMEOUT_DISTRIBUTED_HOOK) of + timeout -> + ?ERROR_MSG("Timeout on RPC to ~p~nrunning hook: ~p", + [Node, {Hook, Args}]), + run1(Ls, Hook, Args); + {badrpc, Reason} -> + ?ERROR_MSG("Bad RPC error to ~p: ~p~nrunning hook: ~p", + [Node, Reason, {Hook, Args}]), + run1(Ls, Hook, Args); + stop -> + ?INFO_MSG("~nThe process ~p in node ~p ran a hook in node ~p.~n" + "Stop.", [self(), node(), Node]), % debug code + ok; + Res -> + ?INFO_MSG("~nThe process ~p in node ~p ran a hook in node ~p.~n" + "The response is:~n~s", [self(), node(), Node, Res]), % debug code + run1(Ls, Hook, Args) + end; run1([{_Seq, Module, Function} | Ls], Hook, Args) -> Res = if is_function(Function) -> catch apply(Function, Args); @@ -237,6 +303,27 @@ run1([{_Seq, Module, Function} | Ls], Hook, Args) -> run_fold1([], _Hook, Val, _Args) -> Val; +run_fold1([{_Seq, Node, Module, Function} | Ls], Hook, Val, Args) -> + case rpc:call(Node, Module, Function, [Val | Args], ?TIMEOUT_DISTRIBUTED_HOOK) of + {badrpc, Reason} -> + ?ERROR_MSG("Bad RPC error to ~p: ~p~nrunning hook: ~p", + [Node, Reason, {Hook, Args}]), + run_fold1(Ls, Hook, Val, Args); + timeout -> + ?ERROR_MSG("Timeout on RPC to ~p~nrunning hook: ~p", + [Node, {Hook, Args}]), + run_fold1(Ls, Hook, Val, Args); + stop -> + stopped; + {stop, NewVal} -> + ?INFO_MSG("~nThe process ~p in node ~p ran a hook in node ~p.~n" + "Stop, and the NewVal is:~n~p", [self(), node(), Node, NewVal]), % debug code + NewVal; + NewVal -> + ?INFO_MSG("~nThe process ~p in node ~p ran a hook in node ~p.~n" + "The NewVal is:~n~p", [self(), node(), Node, NewVal]), % debug code + run_fold1(Ls, Hook, NewVal, Args) + end; run_fold1([{_Seq, Module, Function} | Ls], Hook, Val, Args) -> Res = if is_function(Function) -> catch apply(Function, [Val | Args]);