From 7a3092a859e0605959b7ef241d8224c456e6518f Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Fri, 10 Nov 2017 18:02:22 +0300 Subject: [PATCH] Use new API for IQ routing Functions ejabberd_local:route_iq/2,3 are now depecated: ejabberd_router:route_iq/2,3,4 should be used instead. --- src/ejabberd_iq.erl | 149 ++++++++++++++++++++++++++++++++++++++++ src/ejabberd_local.erl | 138 +++++-------------------------------- src/ejabberd_router.erl | 49 +++++++++---- src/ejabberd_sup.erl | 3 + src/mod_caps.erl | 51 +++++++------- src/mod_delegation.erl | 87 ++++++++++++----------- src/mod_muc_room.erl | 101 ++++++++++----------------- src/mod_ping.erl | 15 ++-- src/mod_push.erl | 9 ++- 9 files changed, 331 insertions(+), 271 deletions(-) create mode 100644 src/ejabberd_iq.erl diff --git a/src/ejabberd_iq.erl b/src/ejabberd_iq.erl new file mode 100644 index 000000000..36d822ba6 --- /dev/null +++ b/src/ejabberd_iq.erl @@ -0,0 +1,149 @@ +%%%------------------------------------------------------------------- +%%% @author xram +%%% @copyright (C) 2017, xram +%%% @doc +%%% +%%% @end +%%% Created : 10 Nov 2017 by xram +%%%------------------------------------------------------------------- +-module(ejabberd_iq). + +-behaviour(gen_server). + +%% API +-export([start_link/0, route/4, dispatch/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-include("xmpp.hrl"). +-include("logger.hrl"). + +-record(state, {expire = infinity :: timeout()}). + +%%%=================================================================== +%%% API +%%%=================================================================== +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +route(#iq{type = T} = IQ, Proc, Ctx, Timeout) when T == set; T == get -> + Expire = current_time() + Timeout, + Rnd = randoms:get_string(), + ID = encode_id(Expire, Rnd), + ets:insert(?MODULE, {{Expire, Rnd}, Proc, Ctx}), + gen_server:cast(?MODULE, {restart_timer, Expire}), + ejabberd_router:route(IQ#iq{id = ID}). + +-spec dispatch(iq()) -> boolean(). +dispatch(#iq{type = T, id = ID} = IQ) when T == error; T == result -> + case decode_id(ID) of + {ok, Expire, Rnd, Node} -> + ejabberd_cluster:send({?MODULE, Node}, {route, IQ, {Expire, Rnd}}); + error -> + false + end; +dispatch(_) -> + false. + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== +init([]) -> + ets:new(?MODULE, [named_table, ordered_set, public]), + {ok, #state{}}. + +handle_call(Request, From, State) -> + {stop, {unexpected_call, Request, From}, State}. + +handle_cast({restart_timer, Expire}, State) -> + State1 = State#state{expire = min(Expire, State#state.expire)}, + noreply(State1); +handle_cast(Msg, State) -> + ?WARNING_MSG("unexpected cast: ~p", [Msg]), + noreply(State). + +handle_info({route, IQ, Key}, State) -> + case ets:lookup(?MODULE, Key) of + [{_, Proc, Ctx}] -> + callback(Proc, IQ, Ctx), + ets:delete(?MODULE, Key); + [] -> + ok + end, + noreply(State); +handle_info(timeout, State) -> + Expire = clean(ets:first(?MODULE)), + noreply(State#state{expire = Expire}); +handle_info(Info, State) -> + ?WARNING_MSG("unexpected info: ~p", [Info]), + noreply(State). + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +current_time() -> + p1_time_compat:system_time(milli_seconds). + +clean({Expire, _} = Key) -> + case current_time() of + Time when Time >= Expire -> + case ets:lookup(?MODULE, Key) of + [{_, Proc, Ctx}] -> + callback(Proc, timeout, Ctx), + ets:delete(?MODULE, Key); + [] -> + ok + end, + clean(ets:next(?MODULE, Key)); + _ -> + Expire + end; +clean('$end_of_table') -> + infinity. + +noreply(#state{expire = Expire} = State) -> + case Expire of + infinity -> + {noreply, State}; + _ -> + Timeout = max(0, Expire - current_time()), + {noreply, State, Timeout} + end. + +encode_id(Expire, Rnd) -> + ExpireBin = integer_to_binary(Expire), + Node = atom_to_binary(node(), utf8), + CheckSum = calc_checksum(<>), + <<"rr-", ExpireBin/binary, $-, Rnd/binary, $-, CheckSum/binary, $-, Node/binary>>. + +decode_id(<<"rr-", ID/binary>>) -> + try + [ExpireBin, Tail] = binary:split(ID, <<"-">>), + [Rnd, Rest] = binary:split(Tail, <<"-">>), + [CheckSum, NodeBin] = binary:split(Rest, <<"-">>), + CheckSum = calc_checksum(<>), + Node = erlang:binary_to_existing_atom(NodeBin, utf8), + Expire = binary_to_integer(ExpireBin), + {ok, Expire, Rnd, Node} + catch _:{badmatch, _} -> + error + end; +decode_id(_) -> + error. + +calc_checksum(Data) -> + Key = ejabberd_config:get_option(shared_key), + base64:encode(crypto:hash(sha, <>)). + +callback(undefined, IQRes, Fun) -> + Fun(IQRes); +callback(Proc, IQRes, Ctx) -> + Proc ! {iq_reply, IQRes, Ctx}. diff --git a/src/ejabberd_local.erl b/src/ejabberd_local.erl index c1b21d508..cc1d6a2eb 100644 --- a/src/ejabberd_local.erl +++ b/src/ejabberd_local.erl @@ -32,17 +32,21 @@ %% API -export([start/0, start_link/0]). --export([route/1, route_iq/2, route_iq/3, process_iq/1, - process_iq_reply/1, get_features/1, - register_iq_handler/5, register_iq_response_handler/4, - register_iq_response_handler/5, unregister_iq_handler/2, - unregister_iq_response_handler/2, bounce_resource_packet/1, +-export([route/1, process_iq/1, + get_features/1, + register_iq_handler/5, + unregister_iq_handler/2, + bounce_resource_packet/1, host_up/1, host_down/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% deprecated functions: use ejabberd_router:route_iq/3,4 +-export([route_iq/2, route_iq/3]). +-deprecated([{route_iq, 2}, {route_iq, 3}]). + -include("ejabberd.hrl"). -include("logger.hrl"). -include_lib("stdlib/include/ms_transform.hrl"). @@ -50,18 +54,8 @@ -record(state, {}). --record(iq_response, {id = <<"">> :: binary(), - module :: atom(), - function :: atom() | fun(), - timer = make_ref() :: reference()}). - -define(IQTABLE, local_iqtable). -%% This value is used in SIP and Megaco for a transaction lifetime. --define(IQ_TIMEOUT, 32000). - --type ping_timeout() :: non_neg_integer() | undefined. - %%==================================================================== %% API %%==================================================================== @@ -99,17 +93,8 @@ process_iq(#iq{type = T, lang = Lang, sub_els = SubEls} = Packet) end, Err = xmpp:err_bad_request(Txt, Lang), ejabberd_router:route_error(Packet, Err); -process_iq(#iq{type = T} = Packet) when T == result; T == error -> - process_iq_reply(Packet). - --spec process_iq_reply(iq()) -> any(). -process_iq_reply(#iq{id = ID} = IQ) -> - case get_iq_callback(ID) of - {ok, undefined, Function} -> Function(IQ), ok; - {ok, Module, Function} -> - Module:Function(IQ), ok; - _ -> nothing - end. +process_iq(#iq{type = T}) when T == result; T == error -> + ok. -spec route(stanza()) -> any(). route(Packet) -> @@ -119,43 +104,13 @@ route(Packet) -> [xmpp:pp(Packet), {E, {R, erlang:get_stacktrace()}}]) end. --spec route_iq(iq(), function()) -> any(). -route_iq(IQ, F) -> - route_iq(IQ, F, undefined). +-spec route_iq(iq(), function()) -> ok. +route_iq(IQ, Fun) -> + route_iq(IQ, Fun, undefined). --spec route_iq(iq(), function(), ping_timeout()) -> any(). -route_iq(#iq{from = From, type = Type} = IQ, F, Timeout) - when is_function(F) -> - Packet = if Type == set; Type == get -> - ID = randoms:get_string(), - Host = From#jid.lserver, - register_iq_response_handler(Host, ID, undefined, F, Timeout), - IQ#iq{id = ID}; - true -> - IQ - end, - ejabberd_router:route(Packet). - --spec register_iq_response_handler(binary(), binary(), module(), - atom() | function()) -> any(). -register_iq_response_handler(Host, ID, Module, - Function) -> - register_iq_response_handler(Host, ID, Module, Function, - undefined). - --spec register_iq_response_handler(binary(), binary(), module(), - atom() | function(), ping_timeout()) -> any(). -register_iq_response_handler(_Host, ID, Module, - Function, Timeout0) -> - Timeout = case Timeout0 of - undefined -> ?IQ_TIMEOUT; - N when is_integer(N), N > 0 -> N - end, - TRef = erlang:start_timer(Timeout, ?MODULE, ID), - mnesia:dirty_write(#iq_response{id = ID, - module = Module, - function = Function, - timer = TRef}). +-spec route_iq(iq(), function(), undefined | non_neg_integer()) -> ok. +route_iq(IQ, Fun, Timeout) -> + ejabberd_router:route_iq(IQ, Fun, undefined, Timeout). -spec register_iq_handler(binary(), binary(), module(), function(), gen_iq_handler:opts()) -> ok. @@ -163,10 +118,6 @@ register_iq_handler(Host, XMLNS, Module, Fun, Opts) -> gen_server:cast(?MODULE, {register_iq_handler, Host, XMLNS, Module, Fun, Opts}). --spec unregister_iq_response_handler(binary(), binary()) -> ok. -unregister_iq_response_handler(_Host, ID) -> - catch get_iq_callback(ID), ok. - -spec unregister_iq_handler(binary(), binary()) -> ok. unregister_iq_handler(Host, XMLNS) -> gen_server:cast(?MODULE, {unregister_iq_handler, Host, XMLNS}). @@ -204,9 +155,6 @@ init([]) -> catch ets:new(?IQTABLE, [named_table, public, ordered_set, {read_concurrency, true}]), update_table(), - ejabberd_mnesia:create(?MODULE, iq_response, - [{ram_copies, [node()]}, - {attributes, record_info(fields, iq_response)}]), {ok, #state{}}. handle_call(_Request, _From, State) -> @@ -232,9 +180,6 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info({route, Packet}, State) -> route(Packet), {noreply, State}; -handle_info({timeout, _TRef, ID}, State) -> - process_iq_timeout(ID), - {noreply, State}; handle_info(Info, State) -> ?WARNING_MSG("unexpected info: ~p", [Info]), {noreply, State}. @@ -269,15 +214,8 @@ do_route(Packet) -> -spec update_table() -> ok. update_table() -> - case catch mnesia:table_info(iq_response, attributes) of - [id, module, function] -> - mnesia:delete_table(iq_response), - ok; - [id, module, function, timer] -> - ok; - {'EXIT', _} -> - ok - end. + catch mnesia:delete_table(iq_response), + ok. host_up(Host) -> Owner = case whereis(?MODULE) of @@ -296,41 +234,3 @@ host_down(Host) -> ejabberd_router:unregister_route(Host, Owner), ejabberd_hooks:delete(local_send_to_resource_hook, Host, ?MODULE, bounce_resource_packet, 100). - --spec get_iq_callback(binary()) -> {ok, module(), atom() | function()} | error. -get_iq_callback(ID) -> - case mnesia:dirty_read(iq_response, ID) of - [#iq_response{module = Module, timer = TRef, - function = Function}] -> - cancel_timer(TRef), - mnesia:dirty_delete(iq_response, ID), - {ok, Module, Function}; - _ -> - error - end. - --spec process_iq_timeout(binary()) -> any(). -process_iq_timeout(ID) -> - spawn(fun process_iq_timeout/0) ! ID. - --spec process_iq_timeout() -> any(). -process_iq_timeout() -> - receive - ID -> - case get_iq_callback(ID) of - {ok, undefined, Function} -> - Function(timeout); - _ -> - ok - end - after 5000 -> - ok - end. - --spec cancel_timer(reference()) -> ok. -cancel_timer(TRef) -> - case erlang:cancel_timer(TRef) of - false -> - receive {timeout, TRef, _} -> ok after 0 -> ok end; - _ -> ok - end. diff --git a/src/ejabberd_router.erl b/src/ejabberd_router.erl index 69413c6de..4fc0fa290 100644 --- a/src/ejabberd_router.erl +++ b/src/ejabberd_router.erl @@ -37,6 +37,9 @@ %% API -export([route/1, route_error/2, + route_iq/2, + route_iq/3, + route_iq/4, register_route/2, register_route/3, register_route/4, @@ -62,6 +65,9 @@ -export([route/3, route_error/4]). -deprecated([{route, 3}, {route_error, 4}]). +%% This value is used in SIP and Megaco for a transaction lifetime. +-define(IQ_TIMEOUT, 32000). + -include("ejabberd.hrl"). -include("logger.hrl"). -include("ejabberd_router.hrl"). @@ -136,6 +142,20 @@ route_error(From, To, Packet, #stanza_error{} = Err) -> route(From, To, xmpp:make_error(Packet, Err)) end. +-spec route_iq(iq(), term()) -> ok. +route_iq(IQ, State) -> + route_iq(IQ, State, undefined, ?IQ_TIMEOUT). + +-spec route_iq(iq(), term(), pid() | atom()) -> ok. +route_iq(IQ, State, Proc) -> + route_iq(IQ, State, Proc, ?IQ_TIMEOUT). + +-spec route_iq(iq(), term(), pid() | atom(), undefined | non_neg_integer()) -> ok. +route_iq(IQ, State, Proc, undefined) -> + route_iq(IQ, State, Proc, ?IQ_TIMEOUT); +route_iq(IQ, State, Proc, Timeout) -> + ejabberd_iq:route(IQ, Proc, State, Timeout). + -spec register_route(binary(), binary()) -> ok. register_route(Domain, ServerHost) -> register_route(Domain, ServerHost, undefined). @@ -339,18 +359,23 @@ do_route(OrigPacket) -> drop -> ok; Packet -> - To = xmpp:get_to(Packet), - LDstDomain = To#jid.lserver, - case find_routes(LDstDomain) of - [] -> - ejabberd_s2s:route(Packet); - [Route] -> - do_route(Packet, Route); - Routes -> - From = xmpp:get_from(Packet), - balancing_route(From, To, Packet, Routes) - end, - ok + case ejabberd_iq:dispatch(Packet) of + true -> + ok; + false -> + To = xmpp:get_to(Packet), + LDstDomain = To#jid.lserver, + case find_routes(LDstDomain) of + [] -> + ejabberd_s2s:route(Packet); + [Route] -> + do_route(Packet, Route); + Routes -> + From = xmpp:get_from(Packet), + balancing_route(From, To, Packet, Routes) + end, + ok + end end. -spec do_route(stanza(), #route{}) -> any(). diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl index 35527ebd7..463e7ea29 100644 --- a/src/ejabberd_sup.erl +++ b/src/ejabberd_sup.erl @@ -156,6 +156,8 @@ init([]) -> permanent, 5000, worker, [cyrsasl]}, PKIX = {ejabberd_pkix, {ejabberd_pkix, start_link, []}, permanent, 5000, worker, [ejabberd_pkix]}, + IQ = {ejabberd_iq, {ejabberd_iq, start_link, []}, + permanent, 5000, worker, [ejabberd_iq]}, {ok, {{one_for_one, 10, 1}, [Hooks, Cluster, @@ -180,6 +182,7 @@ init([]) -> SQLSupervisor, RiakSupervisor, RedisSupervisor, + IQ, Router, RouterMulticast, Local, diff --git a/src/mod_caps.erl b/src/mod_caps.erl index c34c81631..edc93bbf1 100644 --- a/src/mod_caps.erl +++ b/src/mod_caps.erl @@ -118,11 +118,11 @@ user_send_packet({#presence{type = available, from = #jid{luser = U, lserver = LServer} = From, to = #jid{luser = U, lserver = LServer, lresource = <<"">>}} = Pkt, - State}) -> + #{jid := To} = State}) -> case read_caps(Pkt) of nothing -> ok; #caps{version = Version, exts = Exts} = Caps -> - feature_request(LServer, From, Caps, [Version | Exts]) + feature_request(LServer, From, To, Caps, [Version | Exts]) end, {Pkt, State}; user_send_packet(Acc) -> @@ -130,13 +130,13 @@ user_send_packet(Acc) -> -spec user_receive_packet({stanza(), ejabberd_c2s:state()}) -> {stanza(), ejabberd_c2s:state()}. user_receive_packet({#presence{from = From, type = available} = Pkt, - #{lserver := LServer} = State}) -> + #{lserver := LServer, jid := To} = State}) -> IsRemote = not ejabberd_router:is_my_host(From#jid.lserver), if IsRemote -> case read_caps(Pkt) of nothing -> ok; #caps{version = Version, exts = Exts} = Caps -> - feature_request(LServer, From, Caps, [Version | Exts]) + feature_request(LServer, From, To, Caps, [Version | Exts]) end; true -> ok end, @@ -298,7 +298,12 @@ handle_call(_Req, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info(_Info, State) -> {noreply, State}. +handle_info({iq_reply, IQReply, {Host, From, To, Caps, SubNodes}}, State) -> + feature_response(IQReply, Host, From, To, Caps, SubNodes), + {noreply, State}; +handle_info(Info, State) -> + ?WARNING_MSG("unexpected info: ~p", [Info]), + {noreply, State}. terminate(_Reason, State) -> Host = State#state.host, @@ -322,39 +327,37 @@ terminate(_Reason, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. --spec feature_request(binary(), jid(), caps(), [binary()]) -> any(). -feature_request(Host, From, Caps, +-spec feature_request(binary(), jid(), jid(), caps(), [binary()]) -> any(). +feature_request(Host, From, To, Caps, [SubNode | Tail] = SubNodes) -> Node = Caps#caps.node, NodePair = {Node, SubNode}, case ets_cache:lookup(caps_features_cache, NodePair, caps_read_fun(Host, NodePair)) of {ok, Fs} when is_list(Fs) -> - feature_request(Host, From, Caps, Tail); + feature_request(Host, From, To, Caps, Tail); _ -> - LFrom = jid:tolower(From), - case ets_cache:insert_new(caps_requests_cache, {LFrom, NodePair}, ok) of + LTo = jid:tolower(To), + case ets_cache:insert_new(caps_requests_cache, {LTo, NodePair}, ok) of true -> IQ = #iq{type = get, - from = jid:make(Host), - to = From, + from = From, + to = To, sub_els = [#disco_info{node = <>}]}, - F = fun (IQReply) -> - feature_response(IQReply, Host, From, Caps, - SubNodes) - end, - ejabberd_local:route_iq(IQ, F); + ejabberd_router:route_iq( + IQ, {Host, From, To, Caps, SubNodes}, + gen_mod:get_module_proc(Host, ?MODULE)); false -> ok end, - feature_request(Host, From, Caps, Tail) + feature_request(Host, From, To, Caps, Tail) end; -feature_request(_Host, _From, _Caps, []) -> ok. +feature_request(_Host, _From, _To, _Caps, []) -> ok. --spec feature_response(iq(), binary(), ljid(), caps(), [binary()]) -> any(). +-spec feature_response(iq(), binary(), jid(), jid(), caps(), [binary()]) -> any(). feature_response(#iq{type = result, sub_els = [El]}, - Host, From, Caps, [SubNode | SubNodes]) -> + Host, From, To, Caps, [SubNode | SubNodes]) -> NodePair = {Caps#caps.node, SubNode}, try DiscoInfo = xmpp:decode(El), @@ -374,10 +377,10 @@ feature_response(#iq{type = result, sub_els = [El]}, catch _:{xmpp_codec, _Why} -> ok end, - feature_request(Host, From, Caps, SubNodes); -feature_response(_IQResult, Host, From, Caps, + feature_request(Host, From, To, Caps, SubNodes); +feature_response(_IQResult, Host, From, To, Caps, [_SubNode | SubNodes]) -> - feature_request(Host, From, Caps, SubNodes). + feature_request(Host, From, To, Caps, SubNodes). -spec caps_read_fun(binary(), {binary(), binary()}) -> fun(() -> {ok, [binary()] | non_neg_integer()} | error). diff --git a/src/mod_delegation.erl b/src/mod_delegation.erl index 865f8ebf4..27e00768d 100644 --- a/src/mod_delegation.erl +++ b/src/mod_delegation.erl @@ -47,6 +47,7 @@ -type disco_acc() :: {error, stanza_error()} | {result, [binary()]} | empty. -record(state, {server_host = <<"">> :: binary(), delegations = dict:new() :: ?TDICT}). +-type state() :: #state{}. %%%=================================================================== %%% API @@ -161,27 +162,6 @@ handle_cast({component_connected, Host}, State) -> end end, NSAttrsAccessList), {noreply, State}; -handle_cast({disco_info, Type, Host, NS, Info}, State) -> - From = jid:make(State#state.server_host), - To = jid:make(Host), - case dict:find({NS, Type}, State#state.delegations) of - error -> - Msg = #message{from = From, to = To, - sub_els = [#delegation{delegated = [#delegated{ns = NS}]}]}, - Delegations = dict:store({NS, Type}, {Host, Info}, State#state.delegations), - gen_iq_handler:add_iq_handler(Type, State#state.server_host, NS, - ?MODULE, Type, gen_iq_handler:iqdisc(Host)), - ejabberd_router:route(Msg), - ?INFO_MSG("Namespace '~s' is delegated to external component '~s'", - [NS, Host]), - {noreply, State#state{delegations = Delegations}}; - {ok, {AnotherHost, _}} -> - ?WARNING_MSG("Failed to delegate namespace '~s' to " - "external component '~s' because it's already " - "delegated to '~s'", - [NS, Host, AnotherHost]), - {noreply, State} - end; handle_cast({component_disconnected, Host}, State) -> ServerHost = State#state.server_host, Delegations = @@ -199,7 +179,24 @@ handle_cast({component_disconnected, Host}, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info(_Info, State) -> +handle_info({iq_reply, ResIQ, {disco_info, Type, Host, NS}}, State) -> + {noreply, + case ResIQ of + #iq{type = result, sub_els = [SubEl]} -> + try xmpp:decode(SubEl) of + #disco_info{} = Info -> + process_disco_info(State, Type, Host, NS, Info) + catch _:{xmpp_codec, _} -> + State + end; + _ -> + State + end}; +handle_info({iq_reply, ResIQ, #iq{} = IQ}, State) -> + process_iq_result(IQ, ResIQ), + {noreply, State}; +handle_info(Info, State) -> + ?WARNING_MSG("unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, State) -> @@ -246,12 +243,12 @@ process_iq(#iq{to = To, lang = Lang, sub_els = [SubEl]} = IQ, Type) -> forwarded = #forwarded{xml_els = [xmpp:encode(IQ)]}}, NewFrom = jid:make(LServer), NewTo = jid:make(Host), - ejabberd_local:route_iq( + ejabberd_router:route_iq( #iq{type = set, from = NewFrom, to = NewTo, sub_els = [Delegation]}, - fun(Result) -> process_iq_result(IQ, Result) end), + IQ, gen_mod:get_module_proc(LServer, ?MODULE)), ignore; error -> Txt = <<"Failed to map delegated namespace to external component">>, @@ -284,29 +281,41 @@ process_iq_result(#iq{lang = Lang} = IQ, timeout) -> Err = xmpp:err_internal_server_error(Txt, Lang), ejabberd_router:route_error(IQ, Err). +-spec process_disco_info(state(), ejabberd_local | ejabberd_sm, + binary(), binary(), disco_info()) -> state(). +process_disco_info(State, Type, Host, NS, Info) -> + From = jid:make(State#state.server_host), + To = jid:make(Host), + case dict:find({NS, Type}, State#state.delegations) of + error -> + Msg = #message{from = From, to = To, + sub_els = [#delegation{delegated = [#delegated{ns = NS}]}]}, + Delegations = dict:store({NS, Type}, {Host, Info}, State#state.delegations), + gen_iq_handler:add_iq_handler(Type, State#state.server_host, NS, + ?MODULE, Type, gen_iq_handler:iqdisc(Host)), + ejabberd_router:route(Msg), + ?INFO_MSG("Namespace '~s' is delegated to external component '~s'", + [NS, Host]), + State#state{delegations = Delegations}; + {ok, {AnotherHost, _}} -> + ?WARNING_MSG("Failed to delegate namespace '~s' to " + "external component '~s' because it's already " + "delegated to '~s'", + [NS, Host, AnotherHost]), + State + end. + -spec send_disco_queries(binary(), binary(), binary()) -> ok. send_disco_queries(LServer, Host, NS) -> From = jid:make(LServer), To = jid:make(Host), lists:foreach( fun({Type, Node}) -> - ejabberd_local:route_iq( + ejabberd_router:route_iq( #iq{type = get, from = From, to = To, sub_els = [#disco_info{node = Node}]}, - fun(#iq{type = result, sub_els = [SubEl]}) -> - try xmpp:decode(SubEl) of - #disco_info{} = Info-> - Proc = gen_mod:get_module_proc(LServer, ?MODULE), - gen_server:cast( - Proc, {disco_info, Type, Host, NS, Info}); - _ -> - ok - catch _:{xmpp_codec, _} -> - ok - end; - (_) -> - ok - end) + {disco_info, Type, Host, NS}, + gen_mod:get_module_proc(LServer, ?MODULE)) end, [{ejabberd_local, <<(?NS_DELEGATION)/binary, "::", NS/binary>>}, {ejabberd_sm, <<(?NS_DELEGATION)/binary, ":bare:", NS/binary>>}]). diff --git a/src/mod_muc_room.erl b/src/mod_muc_room.erl index be90d72eb..96eb1da84 100644 --- a/src/mod_muc_room.erl +++ b/src/mod_muc_room.erl @@ -433,27 +433,31 @@ normal_state({route, ToNick, {next_state, normal_state, StateData} end; normal_state({route, ToNick, - #iq{from = From, id = StanzaId, lang = Lang} = Packet}, + #iq{from = From, type = Type, lang = Lang} = Packet}, StateData) -> case {(StateData#state.config)#config.allow_query_users, - is_user_online_iq(StanzaId, From, StateData)} of - {true, {true, NewId, FromFull}} -> + (?DICT):find(jid:tolower(From), StateData#state.users)} of + {true, {ok, #user{nick = FromNick}}} -> case find_jid_by_nick(ToNick, StateData) of false -> ErrText = <<"Recipient is not in the conference room">>, Err = xmpp:err_item_not_found(ErrText, Lang), ejabberd_router:route_error(Packet, Err); - ToJID -> - {ok, #user{nick = FromNick}} = - (?DICT):find(jid:tolower(FromFull), StateData#state.users), - {ToJID2, Packet2} = handle_iq_vcard(ToJID, NewId, Packet), - ejabberd_router:route( - xmpp:set_from_to( - Packet2, - jid:replace_resource(StateData#state.jid, FromNick), - ToJID2)) + To -> + FromJID = jid:replace_resource(StateData#state.jid, FromNick), + if Type == get; Type == set -> + ToJID = case is_vcard_request(Packet) of + true -> jid:remove_resource(To); + false -> To + end, + ejabberd_router:route_iq( + xmpp:set_from_to(Packet, FromJID, ToJID), Packet, self()); + true -> + ejabberd_router:route( + xmpp:set_from_to(Packet, FromJID, To)) + end end; - {_, {false, _, _}} -> + {true, error} -> ErrText = <<"Only occupants are allowed to send queries " "to the conference">>, Err = xmpp:err_not_acceptable(ErrText, Lang), @@ -660,6 +664,18 @@ handle_info({captcha_failed, From}, normal_state, {next_state, normal_state, NewState}; handle_info(shutdown, _StateName, StateData) -> {stop, shutdown, StateData}; +handle_info({iq_reply, #iq{type = Type, sub_els = Els}, + #iq{from = From, to = To} = IQ}, StateName, StateData) -> + ejabberd_router:route( + xmpp:set_from_to( + IQ#iq{type = Type, sub_els = Els}, + To, From)), + {next_state, StateName, StateData}; +handle_info({iq_reply, timeout, IQ}, StateName, StateData) -> + Txt = <<"iq response timed out">>, + Err = xmpp:err_recipient_unavailable(Txt, IQ#iq.lang), + ejabberd_router:route_error(IQ, Err), + {next_state, StateName, StateData}; handle_info(_Info, StateName, StateData) -> {next_state, StateName, StateData}. @@ -920,6 +936,12 @@ process_voice_approval(From, Pkt, VoiceApproval, StateData) -> StateData end. +-spec is_vcard_request(iq()) -> boolean(). +is_vcard_request(#iq{type = T, sub_els = [El]}) -> + (T == get orelse T == set) andalso xmpp:get_ns(El) == ?NS_VCARD; +is_vcard_request(_) -> + false. + %% @doc Check if this non participant can send message to room. %% %% XEP-0045 v1.23: @@ -1129,59 +1151,6 @@ is_occupant_or_admin(JID, StateData) -> _ -> false end. -%%% -%%% Handle IQ queries of vCard -%%% --spec is_user_online_iq(binary(), jid(), state()) -> - {boolean(), binary(), jid()}. -is_user_online_iq(StanzaId, JID, StateData) - when JID#jid.lresource /= <<"">> -> - {is_user_online(JID, StateData), StanzaId, JID}; -is_user_online_iq(StanzaId, JID, StateData) - when JID#jid.lresource == <<"">> -> - try stanzaid_unpack(StanzaId) of - {OriginalId, Resource} -> - JIDWithResource = jid:replace_resource(JID, Resource), - {is_user_online(JIDWithResource, StateData), OriginalId, - JIDWithResource} - catch - _:_ -> {is_user_online(JID, StateData), StanzaId, JID} - end. - --spec handle_iq_vcard(jid(), binary(), iq()) -> {jid(), iq()}. -handle_iq_vcard(ToJID, NewId, #iq{type = Type, sub_els = SubEls} = IQ) -> - ToBareJID = jid:remove_resource(ToJID), - case SubEls of - [SubEl] when Type == get, ToBareJID /= ToJID -> - case xmpp:get_ns(SubEl) of - ?NS_VCARD -> - {ToBareJID, change_stanzaid(ToJID, IQ)}; - _ -> - {ToJID, xmpp:set_id(IQ, NewId)} - end; - _ -> - {ToJID, xmpp:set_id(IQ, NewId)} - end. - --spec stanzaid_pack(binary(), binary()) -> binary(). -stanzaid_pack(OriginalId, Resource) -> - <<"berd", - (base64:encode(<<"ejab\000", - OriginalId/binary, "\000", - Resource/binary>>))/binary>>. - --spec stanzaid_unpack(binary()) -> {binary(), binary()}. -stanzaid_unpack(<<"berd", StanzaIdBase64/binary>>) -> - StanzaId = base64:decode(StanzaIdBase64), - [<<"ejab">>, OriginalId, Resource] = - str:tokens(StanzaId, <<"\000">>), - {OriginalId, Resource}. - --spec change_stanzaid(jid(), iq()) -> iq(). -change_stanzaid(ToJID, #iq{id = PreviousId} = Packet) -> - NewId = stanzaid_pack(PreviousId, ToJID#jid.lresource), - xmpp:set_id(Packet, NewId). - %% Decide the fate of the message and its sender %% Returns: continue_delivery | forget_message | {expulse_sender, Reason} -spec decide_fate_message(message(), jid(), state()) -> diff --git a/src/mod_ping.erl b/src/mod_ping.erl index 1c9639bf7..023571812 100644 --- a/src/mod_ping.erl +++ b/src/mod_ping.erl @@ -132,7 +132,7 @@ handle_cast({start_ping, JID}, State) -> handle_cast({stop_ping, JID}, State) -> Timers = del_timer(JID, State#state.timers), {noreply, State#state{timers = Timers}}; -handle_cast({iq_pong, JID, timeout}, State) -> +handle_cast({iq_reply, timeout, JID}, State) -> Timers = del_timer(JID, State#state.timers), ejabberd_hooks:run(user_ping_timeout, State#state.host, [JID]), @@ -149,20 +149,19 @@ handle_cast({iq_pong, JID, timeout}, State) -> _ -> ok end, {noreply, State#state{timers = Timers}}; -handle_cast({iq_pong, _JID, _}, State) -> +handle_cast({iq_reply, #iq{}, _JID}, State) -> {noreply, State}; handle_cast(Msg, State) -> ?WARNING_MSG("unexpected cast: ~p", [Msg]), {noreply, State}. handle_info({timeout, _TRef, {ping, JID}}, State) -> - From = jid:make(State#state.host), + Host = State#state.host, + From = jid:remove_resource(JID), IQ = #iq{from = From, to = JID, type = get, sub_els = [#ping{}]}, - Pid = self(), - F = fun (Response) -> - gen_server:cast(Pid, {iq_pong, JID, Response}) - end, - ejabberd_local:route_iq(IQ, F, State#state.ping_ack_timeout), + ejabberd_router:route_iq(IQ, JID, + gen_mod:get_module_proc(Host, ?MODULE), + State#state.ping_ack_timeout), Timers = add_timer(JID, State#state.ping_interval, State#state.timers), {noreply, State#state{timers = Timers}}; diff --git a/src/mod_push.erl b/src/mod_push.erl index 0af5df9e3..1eaec6ad5 100644 --- a/src/mod_push.erl +++ b/src/mod_push.erl @@ -46,6 +46,9 @@ %% API (used by mod_push_keepalive). -export([notify/1, notify/3, notify/5]). +%% For IQ callbacks +-export([delete_session/3]). + -include("ejabberd.hrl"). -include("ejabberd_commands.hrl"). -include("logger.hrl"). @@ -426,7 +429,8 @@ notify(LUser, LServer, Clients) -> HandleResponse = fun(#iq{type = result}) -> ok; (#iq{type = error}) -> - delete_session(LUser, LServer, TS); + spawn(?MODULE, delete_session, + [LUser, LServer, TS]); (timeout) -> ok % Hmm. end, @@ -445,8 +449,7 @@ notify(LServer, PushLJID, Node, XData, HandleResponse) -> to = jid:make(PushLJID), id = randoms:get_string(), sub_els = [PubSub]}, - ejabberd_local:route_iq(IQ, HandleResponse), - ok. + ejabberd_router:route_iq(IQ, HandleResponse). %%-------------------------------------------------------------------- %% Internal functions.