25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-09-29 14:37:44 +02:00

Make gen_iq_handlers more robust (EJABS-1758)

Conflicts:

	src/ejabberd_local.erl
	src/ejabberd_sm.erl
	src/gen_iq_handler.erl
This commit is contained in:
Evgeniy Khramtsov 2012-05-25 17:40:21 +10:00
parent 6b7d70adf6
commit b82789f11d
4 changed files with 194 additions and 124 deletions

View File

@ -66,30 +66,39 @@ start_link() ->
process_iq(From, To, Packet) ->
IQ = jlib:iq_query_info(Packet),
case IQ of
#iq{xmlns = XMLNS} ->
Host = To#jid.lserver,
case ets:lookup(?IQTABLE, {XMLNS, Host}) of
[{_, Module, Function}] ->
ResIQ = Module:Function(From, To, IQ),
if ResIQ /= ignore ->
ejabberd_router:route(To, From, jlib:iq_to_xml(ResIQ));
true -> ok
end;
[{_, Module, Function, Opts}] ->
gen_iq_handler:handle(Host, Module, Function, Opts,
From, To, IQ);
[] ->
Err = jlib:make_error_reply(Packet,
?ERR_FEATURE_NOT_IMPLEMENTED),
ejabberd_router:route(To, From, Err)
end;
reply ->
IQReply = jlib:iq_query_or_response_info(Packet),
process_iq_reply(From, To, IQReply);
_ ->
Err = jlib:make_error_reply(Packet, ?ERR_BAD_REQUEST),
ejabberd_router:route(To, From, Err),
ok
#iq{xmlns = XMLNS} ->
Host = To#jid.lserver,
case ets:lookup(?IQTABLE, {XMLNS, Host}) of
[{_, Module, Function}] ->
ResIQ = Module:Function(From, To, IQ),
if
ResIQ /= ignore ->
ejabberd_router:route(
To, From, jlib:iq_to_xml(ResIQ));
true ->
ok
end;
[{_, Module, Function, Opts1}|Tail] ->
Opts = if is_pid(Opts1) ->
[Opts1 |
[Pid || {_, _, _, Pid} <- Tail]];
true ->
Opts1
end,
gen_iq_handler:handle(Host, Module, Function, Opts,
From, To, IQ);
[] ->
Err = jlib:make_error_reply(
Packet, ?ERR_FEATURE_NOT_IMPLEMENTED),
ejabberd_router:route(To, From, Err)
end;
reply ->
IQReply = jlib:iq_query_or_response_info(Packet),
process_iq_reply(From, To, IQReply);
_ ->
Err = jlib:make_error_reply(Packet, ?ERR_BAD_REQUEST),
ejabberd_router:route(To, From, Err),
ok
end.
process_iq_reply(From, To, #iq{id = ID} = IQ) ->
@ -167,16 +176,13 @@ bounce_resource_packet(From, To, Packet) ->
%%====================================================================
init([]) ->
lists:foreach(fun (Host) ->
ejabberd_router:register_route(Host,
{apply, ?MODULE,
route}),
ejabberd_hooks:add(local_send_to_resource_hook, Host,
?MODULE, bounce_resource_packet,
100)
end,
?MYHOSTS),
catch ets:new(?IQTABLE, [named_table, public]),
lists:foreach(
fun(Host) ->
ejabberd_router:register_route(Host, {apply, ?MODULE, route}),
ejabberd_hooks:add(local_send_to_resource_hook, Host,
?MODULE, bounce_resource_packet, 100)
end, ?MYHOSTS),
catch ets:new(?IQTABLE, [named_table, public, bag]),
mnesia:delete_table(iq_response),
catch ets:new(iq_response,
[named_table, public, {keypos, #iq_response.id}]),
@ -201,19 +207,23 @@ handle_info({register_iq_handler, Host, XMLNS, Module,
ets:insert(?IQTABLE, {{XMLNS, Host}, Module, Function}),
catch mod_disco:register_feature(Host, XMLNS),
{noreply, State};
handle_info({register_iq_handler, Host, XMLNS, Module,
Function, Opts},
State) ->
ets:insert(?IQTABLE,
{{XMLNS, Host}, Module, Function, Opts}),
handle_info({register_iq_handler, Host, XMLNS, Module, Function, Opts}, State) ->
ets:insert(?IQTABLE, {{XMLNS, Host}, Module, Function, Opts}),
if is_pid(Opts) ->
erlang:monitor(process, Opts);
true ->
ok
end,
catch mod_disco:register_feature(Host, XMLNS),
{noreply, State};
handle_info({unregister_iq_handler, Host, XMLNS},
State) ->
case ets:lookup(?IQTABLE, {XMLNS, Host}) of
[{_, Module, Function, Opts}] ->
gen_iq_handler:stop_iq_handler(Module, Function, Opts);
_ -> ok
[{_, Module, Function, Opts1}|Tail] when is_pid(Opts1) ->
Opts = [Opts1 | [Pid || {_, _, _, Pid} <- Tail]],
gen_iq_handler:stop_iq_handler(Module, Function, Opts);
_ ->
ok
end,
ets:delete(?IQTABLE, {XMLNS, Host}),
catch mod_disco:unregister_feature(Host, XMLNS),
@ -233,12 +243,36 @@ handle_info(refresh_iq_handlers, State) ->
handle_info({timeout, _TRef, ID}, State) ->
spawn(fun () -> process_iq_timeout(ID) end),
{noreply, State};
handle_info(_Info, State) -> {noreply, State}.
handle_info({'DOWN', _MRef, _Type, Pid, _Info}, State) ->
Rs = ets:select(?IQTABLE,
[{{'_','_','_','$1'},
[{'==', '$1', Pid}],
['$_']}]),
lists:foreach(fun(R) -> ets:delete_object(?IQTABLE, R) end, Rs),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) -> ok.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> void()
%% Description: This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
do_route(From, To, Packet) ->
?DEBUG("local route~n\tfrom ~p~n\tto ~p~n\tpacket "
"~P~n",

View File

@ -422,7 +422,7 @@ init([]) ->
mnesia:add_table_index(session, usr),
mnesia:add_table_index(session, us),
mnesia:add_table_copy(session, node(), ram_copies),
ets:new(sm_iqtable, [named_table]),
ets:new(sm_iqtable, [named_table, bag]),
ejabberd_hooks:add(node_up, ?MODULE, node_up, 100),
ejabberd_hooks:add(node_down, ?MODULE, node_down, 100),
ejabberd_hooks:add(node_hash_update, ?MODULE, migrate,
@ -467,22 +467,34 @@ handle_info({register_iq_handler, Host, XMLNS, Module,
ets:insert(sm_iqtable,
{{XMLNS, Host}, Module, Function}),
{noreply, State};
handle_info({register_iq_handler, Host, XMLNS, Module,
Function, Opts},
State) ->
ets:insert(sm_iqtable,
{{XMLNS, Host}, Module, Function, Opts}),
handle_info({register_iq_handler, Host, XMLNS, Module, Function, Opts}, State) ->
ets:insert(sm_iqtable, {{XMLNS, Host}, Module, Function, Opts}),
if is_pid(Opts) ->
erlang:monitor(process, Opts);
true ->
ok
end,
{noreply, State};
handle_info({unregister_iq_handler, Host, XMLNS},
State) ->
case ets:lookup(sm_iqtable, {XMLNS, Host}) of
[{_, Module, Function, Opts}] ->
gen_iq_handler:stop_iq_handler(Module, Function, Opts);
_ -> ok
[{_, Module, Function, Opts1}|Tail] when is_pid(Opts1) ->
Opts = [Opts1 | [Pid || {_, _, _, Pid} <- Tail]],
gen_iq_handler:stop_iq_handler(Module, Function, Opts);
_ ->
ok
end,
ets:delete(sm_iqtable, {XMLNS, Host}),
{noreply, State};
handle_info(_Info, State) -> {noreply, State}.
handle_info({'DOWN', _MRef, _Type, Pid, _Info}, State) ->
Rs = ets:select(sm_iqtable,
[{{'_','_','_','$1'},
[{'==', '$1', Pid}],
['$_']}]),
lists:foreach(fun(R) -> ets:delete_object(sm_iqtable, R) end, Rs),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ejabberd_hooks:delete(node_up, ?MODULE, node_up, 100),
@ -817,28 +829,38 @@ get_max_user_sessions(LUser, Host) ->
process_iq(From, To, Packet) ->
IQ = jlib:iq_query_info(Packet),
case IQ of
#iq{xmlns = XMLNS} ->
Host = To#jid.lserver,
case ets:lookup(sm_iqtable, {XMLNS, Host}) of
[{_, Module, Function}] ->
ResIQ = Module:Function(From, To, IQ),
if ResIQ /= ignore ->
ejabberd_router:route(To, From, jlib:iq_to_xml(ResIQ));
true -> ok
end;
[{_, Module, Function, Opts}] ->
gen_iq_handler:handle(Host, Module, Function, Opts,
From, To, IQ);
[] ->
Err = jlib:make_error_reply(Packet,
?ERR_SERVICE_UNAVAILABLE),
ejabberd_router:route(To, From, Err)
end;
reply -> ok;
_ ->
Err = jlib:make_error_reply(Packet, ?ERR_BAD_REQUEST),
ejabberd_router:route(To, From, Err),
ok
#iq{xmlns = XMLNS} ->
Host = To#jid.lserver,
case ets:lookup(sm_iqtable, {XMLNS, Host}) of
[{_, Module, Function}] ->
ResIQ = Module:Function(From, To, IQ),
if
ResIQ /= ignore ->
ejabberd_router:route(To, From,
jlib:iq_to_xml(ResIQ));
true ->
ok
end;
[{_, Module, Function, Opts1}|Tail] ->
Opts = if is_pid(Opts1) ->
[Opts1 |
[Pid || {_, _, _, Pid} <- Tail]];
true ->
Opts1
end,
gen_iq_handler:handle(Host, Module, Function, Opts,
From, To, IQ);
[] ->
Err = jlib:make_error_reply(
Packet, ?ERR_SERVICE_UNAVAILABLE),
ejabberd_router:route(To, From, Err)
end;
reply ->
ok;
_ ->
Err = jlib:make_error_reply(Packet, ?ERR_BAD_REQUEST),
ejabberd_router:route(To, From, Err),
ok
end.
-spec force_update_presence({binary(), binary()}) -> any().

View File

@ -163,12 +163,11 @@ init([]) ->
[ejabberd_tmp_sup]},
IQSupervisor =
{ejabberd_iq_sup,
{ejabberd_tmp_sup, start_link,
[ejabberd_iq_sup, gen_iq_handler]},
{ejabberd_iq_sup, start_link, []},
permanent,
infinity,
supervisor,
[ejabberd_tmp_sup]},
[ejabberd_iq_sup]},
STUNSupervisor =
{ejabberd_stun_sup,
{ejabberd_tmp_sup, start_link,

View File

@ -31,7 +31,7 @@
-behaviour(gen_server).
%% API
-export([start_link/3, add_iq_handler/6,
-export([start_link/5, add_iq_handler/6,
remove_iq_handler/3, stop_iq_handler/3, handle/7,
process_iq/6, check_type/1]).
@ -50,9 +50,17 @@
-type type() :: no_queue | one_queue | {queues, pos_integer()} | parallel.
-type opts() :: no_queue | {one_queue, pid()} | {queues, [pid()]} | parallel.
start_link(Host, Module, Function) ->
gen_server:start_link(?MODULE, [Host, Module, Function],
[]).
%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link(Component, Host, NS, Module, Function) ->
{ok, Pid} = gen_server:start_link(?MODULE, [Host, Module, Function], []),
Component:register_iq_handler(Host, NS, Module, Function, Pid),
{ok, Pid}.
-spec add_iq_handler(component(), binary(), binary(),
atom(), atom(), type()) -> any().
@ -60,28 +68,17 @@ start_link(Host, Module, Function) ->
add_iq_handler(Component, Host, NS, Module, Function,
Type) ->
case Type of
no_queue ->
Component:register_iq_handler(Host, NS, Module,
Function, no_queue);
one_queue ->
{ok, Pid} = supervisor:start_child(ejabberd_iq_sup,
[Host, Module, Function]),
Component:register_iq_handler(Host, NS, Module,
Function, {one_queue, Pid});
{queues, N} ->
Pids = lists:map(fun (_) ->
{ok, Pid} =
supervisor:start_child(ejabberd_iq_sup,
[Host, Module,
Function]),
Pid
end,
lists:seq(1, N)),
Component:register_iq_handler(Host, NS, Module,
Function, {queues, Pids});
parallel ->
Component:register_iq_handler(Host, NS, Module,
Function, parallel)
no_queue ->
Component:register_iq_handler(Host, NS, Module, Function, no_queue);
one_queue ->
start_handler(Component, Host, NS, Module, Function);
{queues, N} ->
lists:foreach(
fun(_) ->
start_handler(Component, Host, NS, Module, Function)
end, lists:seq(1, N));
parallel ->
Component:register_iq_handler(Host, NS, Module, Function, parallel)
end.
-spec remove_iq_handler(component(), binary(), binary()) -> any().
@ -93,30 +90,26 @@ remove_iq_handler(Component, Host, NS) ->
stop_iq_handler(_Module, _Function, Opts) ->
case Opts of
{one_queue, Pid} -> gen_server:call(Pid, stop);
{queues, Pids} ->
lists:foreach(fun (Pid) ->
catch gen_server:call(Pid, stop)
end,
Pids);
_ -> ok
[_|_] = Pids ->
stop_handlers(Pids);
_ ->
ok
end.
-spec handle(binary(), atom(), atom(), opts(), jid(), jid(), iq()) -> any().
handle(Host, Module, Function, Opts, From, To, IQ) ->
case Opts of
no_queue ->
process_iq(Host, Module, Function, From, To, IQ);
{one_queue, Pid} -> Pid ! {process_iq, From, To, IQ};
{queues, Pids} ->
Pid = lists:nth(erlang:phash(now(), length(Pids)),
Pids),
Pid ! {process_iq, From, To, IQ};
parallel ->
spawn(?MODULE, process_iq,
[Host, Module, Function, From, To, IQ]);
_ -> todo
no_queue ->
process_iq(Host, Module, Function, From, To, IQ);
parallel ->
spawn(?MODULE, process_iq, [Host, Module, Function, From, To, IQ]);
[_|_] = Pids ->
Pid = lists:nth(erlang:phash(now(), length(Pids)), Pids),
Pid ! {process_iq, From, To, IQ};
_ ->
?ERROR_MSG("unexpected iqdisc options = ~p", [Opts]),
todo
end.
-spec process_iq(binary(), atom(), atom(), jid(), jid(), iq()) -> any().
@ -138,6 +131,28 @@ check_type(one_queue) -> one_queue;
check_type({queues, N}) when is_integer(N), N>0 -> {queues, N};
check_type(parallel) -> parallel.
start_handler(Component, Host, NS, Module, Function) ->
Spec = {{?MODULE, make_ref()},
{?MODULE, start_link, [Component, Host, NS, Module, Function]},
permanent,
brutal_kill,
worker,
[?MODULE]},
{ok, Pid} = supervisor:start_child(ejabberd_iq_sup, Spec),
Pid.
stop_handlers(Pids) ->
lists:foreach(
fun({Id, Pid, _, _}) ->
case lists:member(Pid, Pids) of
true ->
supervisor:terminate_child(ejabberd_iq_sup, Id),
supervisor:delete_child(ejabberd_iq_sup, Id);
false ->
ok
end
end, supervisor:which_children(ejabberd_iq_sup)).
%%====================================================================
%% gen_server callbacks
%%====================================================================