diff --git a/src/ejabberd_local.erl b/src/ejabberd_local.erl index 6ed49d306..e63e48eb5 100644 --- a/src/ejabberd_local.erl +++ b/src/ejabberd_local.erl @@ -66,30 +66,39 @@ start_link() -> process_iq(From, To, Packet) -> IQ = jlib:iq_query_info(Packet), case IQ of - #iq{xmlns = XMLNS} -> - Host = To#jid.lserver, - case ets:lookup(?IQTABLE, {XMLNS, Host}) of - [{_, Module, Function}] -> - ResIQ = Module:Function(From, To, IQ), - if ResIQ /= ignore -> - ejabberd_router:route(To, From, jlib:iq_to_xml(ResIQ)); - true -> ok - end; - [{_, Module, Function, Opts}] -> - gen_iq_handler:handle(Host, Module, Function, Opts, - From, To, IQ); - [] -> - Err = jlib:make_error_reply(Packet, - ?ERR_FEATURE_NOT_IMPLEMENTED), - ejabberd_router:route(To, From, Err) - end; - reply -> - IQReply = jlib:iq_query_or_response_info(Packet), - process_iq_reply(From, To, IQReply); - _ -> - Err = jlib:make_error_reply(Packet, ?ERR_BAD_REQUEST), - ejabberd_router:route(To, From, Err), - ok + #iq{xmlns = XMLNS} -> + Host = To#jid.lserver, + case ets:lookup(?IQTABLE, {XMLNS, Host}) of + [{_, Module, Function}] -> + ResIQ = Module:Function(From, To, IQ), + if + ResIQ /= ignore -> + ejabberd_router:route( + To, From, jlib:iq_to_xml(ResIQ)); + true -> + ok + end; + [{_, Module, Function, Opts1}|Tail] -> + Opts = if is_pid(Opts1) -> + [Opts1 | + [Pid || {_, _, _, Pid} <- Tail]]; + true -> + Opts1 + end, + gen_iq_handler:handle(Host, Module, Function, Opts, + From, To, IQ); + [] -> + Err = jlib:make_error_reply( + Packet, ?ERR_FEATURE_NOT_IMPLEMENTED), + ejabberd_router:route(To, From, Err) + end; + reply -> + IQReply = jlib:iq_query_or_response_info(Packet), + process_iq_reply(From, To, IQReply); + _ -> + Err = jlib:make_error_reply(Packet, ?ERR_BAD_REQUEST), + ejabberd_router:route(To, From, Err), + ok end. process_iq_reply(From, To, #iq{id = ID} = IQ) -> @@ -167,16 +176,13 @@ bounce_resource_packet(From, To, Packet) -> %%==================================================================== init([]) -> - lists:foreach(fun (Host) -> - ejabberd_router:register_route(Host, - {apply, ?MODULE, - route}), - ejabberd_hooks:add(local_send_to_resource_hook, Host, - ?MODULE, bounce_resource_packet, - 100) - end, - ?MYHOSTS), - catch ets:new(?IQTABLE, [named_table, public]), + lists:foreach( + fun(Host) -> + ejabberd_router:register_route(Host, {apply, ?MODULE, route}), + ejabberd_hooks:add(local_send_to_resource_hook, Host, + ?MODULE, bounce_resource_packet, 100) + end, ?MYHOSTS), + catch ets:new(?IQTABLE, [named_table, public, bag]), mnesia:delete_table(iq_response), catch ets:new(iq_response, [named_table, public, {keypos, #iq_response.id}]), @@ -201,19 +207,23 @@ handle_info({register_iq_handler, Host, XMLNS, Module, ets:insert(?IQTABLE, {{XMLNS, Host}, Module, Function}), catch mod_disco:register_feature(Host, XMLNS), {noreply, State}; -handle_info({register_iq_handler, Host, XMLNS, Module, - Function, Opts}, - State) -> - ets:insert(?IQTABLE, - {{XMLNS, Host}, Module, Function, Opts}), +handle_info({register_iq_handler, Host, XMLNS, Module, Function, Opts}, State) -> + ets:insert(?IQTABLE, {{XMLNS, Host}, Module, Function, Opts}), + if is_pid(Opts) -> + erlang:monitor(process, Opts); + true -> + ok + end, catch mod_disco:register_feature(Host, XMLNS), {noreply, State}; handle_info({unregister_iq_handler, Host, XMLNS}, State) -> case ets:lookup(?IQTABLE, {XMLNS, Host}) of - [{_, Module, Function, Opts}] -> - gen_iq_handler:stop_iq_handler(Module, Function, Opts); - _ -> ok + [{_, Module, Function, Opts1}|Tail] when is_pid(Opts1) -> + Opts = [Opts1 | [Pid || {_, _, _, Pid} <- Tail]], + gen_iq_handler:stop_iq_handler(Module, Function, Opts); + _ -> + ok end, ets:delete(?IQTABLE, {XMLNS, Host}), catch mod_disco:unregister_feature(Host, XMLNS), @@ -233,12 +243,36 @@ handle_info(refresh_iq_handlers, State) -> handle_info({timeout, _TRef, ID}, State) -> spawn(fun () -> process_iq_timeout(ID) end), {noreply, State}; -handle_info(_Info, State) -> {noreply, State}. +handle_info({'DOWN', _MRef, _Type, Pid, _Info}, State) -> + Rs = ets:select(?IQTABLE, + [{{'_','_','_','$1'}, + [{'==', '$1', Pid}], + ['$_']}]), + lists:foreach(fun(R) -> ets:delete_object(?IQTABLE, R) end, Rs), + {noreply, State}; +handle_info(_Info, State) -> + {noreply, State}. -terminate(_Reason, _State) -> ok. +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> void() +%% Description: This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. -code_change(_OldVsn, State, _Extra) -> {ok, State}. +%%-------------------------------------------------------------------- +%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} +%% Description: Convert process state when code is changed +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- do_route(From, To, Packet) -> ?DEBUG("local route~n\tfrom ~p~n\tto ~p~n\tpacket " "~P~n", diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index 6bbcc2140..aab87550e 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -422,7 +422,7 @@ init([]) -> mnesia:add_table_index(session, usr), mnesia:add_table_index(session, us), mnesia:add_table_copy(session, node(), ram_copies), - ets:new(sm_iqtable, [named_table]), + ets:new(sm_iqtable, [named_table, bag]), ejabberd_hooks:add(node_up, ?MODULE, node_up, 100), ejabberd_hooks:add(node_down, ?MODULE, node_down, 100), ejabberd_hooks:add(node_hash_update, ?MODULE, migrate, @@ -467,22 +467,34 @@ handle_info({register_iq_handler, Host, XMLNS, Module, ets:insert(sm_iqtable, {{XMLNS, Host}, Module, Function}), {noreply, State}; -handle_info({register_iq_handler, Host, XMLNS, Module, - Function, Opts}, - State) -> - ets:insert(sm_iqtable, - {{XMLNS, Host}, Module, Function, Opts}), +handle_info({register_iq_handler, Host, XMLNS, Module, Function, Opts}, State) -> + ets:insert(sm_iqtable, {{XMLNS, Host}, Module, Function, Opts}), + if is_pid(Opts) -> + erlang:monitor(process, Opts); + true -> + ok + end, {noreply, State}; handle_info({unregister_iq_handler, Host, XMLNS}, State) -> case ets:lookup(sm_iqtable, {XMLNS, Host}) of - [{_, Module, Function, Opts}] -> - gen_iq_handler:stop_iq_handler(Module, Function, Opts); - _ -> ok + [{_, Module, Function, Opts1}|Tail] when is_pid(Opts1) -> + Opts = [Opts1 | [Pid || {_, _, _, Pid} <- Tail]], + gen_iq_handler:stop_iq_handler(Module, Function, Opts); + _ -> + ok end, ets:delete(sm_iqtable, {XMLNS, Host}), {noreply, State}; -handle_info(_Info, State) -> {noreply, State}. +handle_info({'DOWN', _MRef, _Type, Pid, _Info}, State) -> + Rs = ets:select(sm_iqtable, + [{{'_','_','_','$1'}, + [{'==', '$1', Pid}], + ['$_']}]), + lists:foreach(fun(R) -> ets:delete_object(sm_iqtable, R) end, Rs), + {noreply, State}; +handle_info(_Info, State) -> + {noreply, State}. terminate(_Reason, _State) -> ejabberd_hooks:delete(node_up, ?MODULE, node_up, 100), @@ -817,28 +829,38 @@ get_max_user_sessions(LUser, Host) -> process_iq(From, To, Packet) -> IQ = jlib:iq_query_info(Packet), case IQ of - #iq{xmlns = XMLNS} -> - Host = To#jid.lserver, - case ets:lookup(sm_iqtable, {XMLNS, Host}) of - [{_, Module, Function}] -> - ResIQ = Module:Function(From, To, IQ), - if ResIQ /= ignore -> - ejabberd_router:route(To, From, jlib:iq_to_xml(ResIQ)); - true -> ok - end; - [{_, Module, Function, Opts}] -> - gen_iq_handler:handle(Host, Module, Function, Opts, - From, To, IQ); - [] -> - Err = jlib:make_error_reply(Packet, - ?ERR_SERVICE_UNAVAILABLE), - ejabberd_router:route(To, From, Err) - end; - reply -> ok; - _ -> - Err = jlib:make_error_reply(Packet, ?ERR_BAD_REQUEST), - ejabberd_router:route(To, From, Err), - ok + #iq{xmlns = XMLNS} -> + Host = To#jid.lserver, + case ets:lookup(sm_iqtable, {XMLNS, Host}) of + [{_, Module, Function}] -> + ResIQ = Module:Function(From, To, IQ), + if + ResIQ /= ignore -> + ejabberd_router:route(To, From, + jlib:iq_to_xml(ResIQ)); + true -> + ok + end; + [{_, Module, Function, Opts1}|Tail] -> + Opts = if is_pid(Opts1) -> + [Opts1 | + [Pid || {_, _, _, Pid} <- Tail]]; + true -> + Opts1 + end, + gen_iq_handler:handle(Host, Module, Function, Opts, + From, To, IQ); + [] -> + Err = jlib:make_error_reply( + Packet, ?ERR_SERVICE_UNAVAILABLE), + ejabberd_router:route(To, From, Err) + end; + reply -> + ok; + _ -> + Err = jlib:make_error_reply(Packet, ?ERR_BAD_REQUEST), + ejabberd_router:route(To, From, Err), + ok end. -spec force_update_presence({binary(), binary()}) -> any(). diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl index a765ef98f..3ac0cf65e 100644 --- a/src/ejabberd_sup.erl +++ b/src/ejabberd_sup.erl @@ -163,12 +163,11 @@ init([]) -> [ejabberd_tmp_sup]}, IQSupervisor = {ejabberd_iq_sup, - {ejabberd_tmp_sup, start_link, - [ejabberd_iq_sup, gen_iq_handler]}, + {ejabberd_iq_sup, start_link, []}, permanent, infinity, supervisor, - [ejabberd_tmp_sup]}, + [ejabberd_iq_sup]}, STUNSupervisor = {ejabberd_stun_sup, {ejabberd_tmp_sup, start_link, diff --git a/src/gen_iq_handler.erl b/src/gen_iq_handler.erl index 3875425f3..00ffc95e9 100644 --- a/src/gen_iq_handler.erl +++ b/src/gen_iq_handler.erl @@ -31,7 +31,7 @@ -behaviour(gen_server). %% API --export([start_link/3, add_iq_handler/6, +-export([start_link/5, add_iq_handler/6, remove_iq_handler/3, stop_iq_handler/3, handle/7, process_iq/6, check_type/1]). @@ -50,9 +50,17 @@ -type type() :: no_queue | one_queue | {queues, pos_integer()} | parallel. -type opts() :: no_queue | {one_queue, pid()} | {queues, [pid()]} | parallel. -start_link(Host, Module, Function) -> - gen_server:start_link(?MODULE, [Host, Module, Function], - []). +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} +%% Description: Starts the server +%%-------------------------------------------------------------------- +start_link(Component, Host, NS, Module, Function) -> + {ok, Pid} = gen_server:start_link(?MODULE, [Host, Module, Function], []), + Component:register_iq_handler(Host, NS, Module, Function, Pid), + {ok, Pid}. -spec add_iq_handler(component(), binary(), binary(), atom(), atom(), type()) -> any(). @@ -60,28 +68,17 @@ start_link(Host, Module, Function) -> add_iq_handler(Component, Host, NS, Module, Function, Type) -> case Type of - no_queue -> - Component:register_iq_handler(Host, NS, Module, - Function, no_queue); - one_queue -> - {ok, Pid} = supervisor:start_child(ejabberd_iq_sup, - [Host, Module, Function]), - Component:register_iq_handler(Host, NS, Module, - Function, {one_queue, Pid}); - {queues, N} -> - Pids = lists:map(fun (_) -> - {ok, Pid} = - supervisor:start_child(ejabberd_iq_sup, - [Host, Module, - Function]), - Pid - end, - lists:seq(1, N)), - Component:register_iq_handler(Host, NS, Module, - Function, {queues, Pids}); - parallel -> - Component:register_iq_handler(Host, NS, Module, - Function, parallel) + no_queue -> + Component:register_iq_handler(Host, NS, Module, Function, no_queue); + one_queue -> + start_handler(Component, Host, NS, Module, Function); + {queues, N} -> + lists:foreach( + fun(_) -> + start_handler(Component, Host, NS, Module, Function) + end, lists:seq(1, N)); + parallel -> + Component:register_iq_handler(Host, NS, Module, Function, parallel) end. -spec remove_iq_handler(component(), binary(), binary()) -> any(). @@ -93,30 +90,26 @@ remove_iq_handler(Component, Host, NS) -> stop_iq_handler(_Module, _Function, Opts) -> case Opts of - {one_queue, Pid} -> gen_server:call(Pid, stop); - {queues, Pids} -> - lists:foreach(fun (Pid) -> - catch gen_server:call(Pid, stop) - end, - Pids); - _ -> ok + [_|_] = Pids -> + stop_handlers(Pids); + _ -> + ok end. -spec handle(binary(), atom(), atom(), opts(), jid(), jid(), iq()) -> any(). handle(Host, Module, Function, Opts, From, To, IQ) -> case Opts of - no_queue -> - process_iq(Host, Module, Function, From, To, IQ); - {one_queue, Pid} -> Pid ! {process_iq, From, To, IQ}; - {queues, Pids} -> - Pid = lists:nth(erlang:phash(now(), length(Pids)), - Pids), - Pid ! {process_iq, From, To, IQ}; - parallel -> - spawn(?MODULE, process_iq, - [Host, Module, Function, From, To, IQ]); - _ -> todo + no_queue -> + process_iq(Host, Module, Function, From, To, IQ); + parallel -> + spawn(?MODULE, process_iq, [Host, Module, Function, From, To, IQ]); + [_|_] = Pids -> + Pid = lists:nth(erlang:phash(now(), length(Pids)), Pids), + Pid ! {process_iq, From, To, IQ}; + _ -> + ?ERROR_MSG("unexpected iqdisc options = ~p", [Opts]), + todo end. -spec process_iq(binary(), atom(), atom(), jid(), jid(), iq()) -> any(). @@ -138,6 +131,28 @@ check_type(one_queue) -> one_queue; check_type({queues, N}) when is_integer(N), N>0 -> {queues, N}; check_type(parallel) -> parallel. +start_handler(Component, Host, NS, Module, Function) -> + Spec = {{?MODULE, make_ref()}, + {?MODULE, start_link, [Component, Host, NS, Module, Function]}, + permanent, + brutal_kill, + worker, + [?MODULE]}, + {ok, Pid} = supervisor:start_child(ejabberd_iq_sup, Spec), + Pid. + +stop_handlers(Pids) -> + lists:foreach( + fun({Id, Pid, _, _}) -> + case lists:member(Pid, Pids) of + true -> + supervisor:terminate_child(ejabberd_iq_sup, Id), + supervisor:delete_child(ejabberd_iq_sup, Id); + false -> + ok + end + end, supervisor:which_children(ejabberd_iq_sup)). + %%==================================================================== %% gen_server callbacks %%====================================================================