mirror of
https://github.com/processone/ejabberd.git
synced 2024-11-06 15:37:52 +01:00
02eeebd41a
If the client is able to go to oor, mod_ack must made it do so. If the client is not able to to go oor mode, mod_ack must stop the c2s session, and wait for it to really terminate before continuing (to avoid race conditions, see EJABS-1677). Conflicts: src/ejabberd_c2s.erl src/mod_ack.erl src/mod_ping.erl
413 lines
12 KiB
Erlang
413 lines
12 KiB
Erlang
%%%-------------------------------------------------------------------
|
|
%%% File : mod_ack.erl
|
|
%%% Author : Mickael Remond <mremond@process-one.net>
|
|
%%% Description : Implements reliable message delivery
|
|
%%% Note: this module depends on mod_caps
|
|
%%% Created : 12 Mar 2010 by Mickael Remond <mremond@process-one.net>
|
|
%%%
|
|
%%%
|
|
%%% ejabberd, Copyright (C) 2002-2012 ProcessOne
|
|
%%%
|
|
%%% This program is free software; you can redistribute it and/or
|
|
%%% modify it under the terms of the GNU General Public License as
|
|
%%% published by the Free Software Foundation; either version 2 of the
|
|
%%% License, or (at your option) any later version.
|
|
%%%
|
|
%%% This program is distributed in the hope that it will be useful,
|
|
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
%%% General Public License for more details.
|
|
%%%
|
|
%%% You should have received a copy of the GNU General Public License
|
|
%%% along with this program; if not, write to the Free Software
|
|
%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
|
|
%%% 02111-1307 USA
|
|
%%%
|
|
%%%-------------------------------------------------------------------
|
|
-module(mod_ack).
|
|
|
|
-behaviour(gen_server).
|
|
|
|
-behaviour(gen_mod).
|
|
|
|
%% API
|
|
-export([start/2, stop/1, start_link/2]).
|
|
|
|
-export([user_send_packet/3, offline_message/3,
|
|
delayed_message/3, remove_connection/3,
|
|
feature_inspect_packet/4]).
|
|
|
|
%% gen_server callbacks
|
|
-export([init/1, handle_info/2, handle_call/3,
|
|
handle_cast/2, terminate/2, code_change/3]).
|
|
|
|
-include("jlib.hrl").
|
|
|
|
-include("ejabberd.hrl").
|
|
|
|
-define(PROCNAME, ejabberd_mod_ack).
|
|
|
|
-define(ACK_TIMEOUT, 60).
|
|
|
|
-define(DICT, dict).
|
|
|
|
-ifndef(NS_RECEIPTS).
|
|
|
|
-define(NS_RECEIPTS, <<"urn:xmpp:receipts">>).
|
|
|
|
-endif.
|
|
|
|
-ifndef(NS_PING).
|
|
|
|
-define(NS_PING, <<"urn:xmpp:ping">>).
|
|
|
|
-endif.
|
|
|
|
-ifndef(NS_P1_PUSHED).
|
|
|
|
-define(NS_P1_PUSHED, <<"p1:pushed">>).
|
|
|
|
-endif.
|
|
|
|
-record(state, {host = <<"">> :: binary(),
|
|
timers = (?DICT):new() :: dict(),
|
|
timeout = ?ACK_TIMEOUT :: non_neg_integer()}).
|
|
|
|
start_link(Host, Opts) ->
|
|
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
|
|
gen_server:start_link({local, Proc}, ?MODULE,
|
|
[Host, Opts], []).
|
|
|
|
start(Host, Opts) ->
|
|
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
|
|
ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]},
|
|
transient, 1000, worker, [?MODULE]},
|
|
supervisor:start_child(ejabberd_sup, ChildSpec).
|
|
|
|
stop(Host) ->
|
|
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
|
|
gen_server:call(Proc, stop),
|
|
supervisor:terminate_child(ejabberd_sup, Proc),
|
|
supervisor:delete_child(ejabberd_sup, Proc).
|
|
|
|
user_send_packet(From, To,
|
|
#xmlel{name = <<"message">>} = Packet) ->
|
|
case has_receipt_request(Packet) of
|
|
{true, _} ->
|
|
process_ack_request(<<"on-sender-server">>, From, To,
|
|
Packet);
|
|
false ->
|
|
case has_receipt_response(Packet) of
|
|
{true, ID} ->
|
|
Server = From#jid.lserver,
|
|
del_timer(Server, {message, ID}, From);
|
|
false -> do_nothing
|
|
end
|
|
end;
|
|
user_send_packet(From, _To,
|
|
#xmlel{name = <<"iq">>, attrs = Attrs}) ->
|
|
case xml:get_attr_s(<<"id">>, Attrs) of
|
|
<<"">> -> ok;
|
|
ID ->
|
|
Server = From#jid.lserver,
|
|
del_timer(Server, {iq, ID}, From)
|
|
end;
|
|
user_send_packet(_From, _To, _Packet) -> do_nothing.
|
|
|
|
offline_message(From, To, Packet) ->
|
|
process_ack_request(<<"offline">>, From, To, Packet),
|
|
ok.
|
|
|
|
delayed_message(From, To, Packet) ->
|
|
process_ack_request(<<"delayed">>, From, To, Packet),
|
|
ok.
|
|
|
|
feature_inspect_packet(JID, Server,
|
|
#xmlel{name = <<"presence">>} = Pres,
|
|
#xmlel{name = <<"message">>, attrs = Attrs} = El) ->
|
|
HasReceipts = has_receipt_request(El),
|
|
ReceiptsSupported = are_receipts_supported(Server, Pres),
|
|
?DEBUG("feature_inspect_packet:~n** JID: ~p~n** "
|
|
"Has receipts: ~p~n** Receipts supported: "
|
|
"~p~n** Pres: ~p~n** El: ~p",
|
|
[JID, HasReceipts, ReceiptsSupported, Pres, El]),
|
|
Type = xml:get_attr_s(<<"type">>, Attrs),
|
|
case HasReceipts of
|
|
_ when Type == <<"error">> -> ok;
|
|
{true, ID} ->
|
|
case {jlib:string_to_jid(xml:get_attr_s(<<"from">>,
|
|
Attrs)),
|
|
jlib:string_to_jid(xml:get_attr_s(<<"to">>, Attrs))}
|
|
of
|
|
{#jid{} = From, #jid{} = To} ->
|
|
Pkt = {From, To, El},
|
|
case ReceiptsSupported of
|
|
true -> add_timer(Server, {message, ID}, JID, Pkt);
|
|
false -> ping(From, To, Server, JID, El);
|
|
unknown ->
|
|
process_ack_request(<<"unreliable">>, From, To, El)
|
|
end;
|
|
_ ->
|
|
?WARNING_MSG("message doesn't have 'from' or 'to' "
|
|
"attribute:~n** El: ~p",
|
|
[El])
|
|
end;
|
|
_ -> ok
|
|
end;
|
|
feature_inspect_packet(_User, _Server, _Pres, _El) ->
|
|
ok.
|
|
|
|
remove_connection({_, C2SPid}, #jid{lserver = Host},
|
|
_Info) ->
|
|
gen_server:cast(gen_mod:get_module_proc(Host,
|
|
?PROCNAME),
|
|
{del, C2SPid}).
|
|
|
|
init([Host, Opts]) ->
|
|
Timeout = timer:seconds(gen_mod:get_opt(timeout, Opts,
|
|
fun(I) when is_integer(I), I>0 ->
|
|
I
|
|
end,
|
|
?ACK_TIMEOUT)),
|
|
ejabberd_hooks:add(user_send_packet, Host, ?MODULE,
|
|
user_send_packet, 20),
|
|
ejabberd_hooks:add(offline_message_hook, Host, ?MODULE,
|
|
offline_message, 20),
|
|
ejabberd_hooks:add(delayed_message_hook, Host, ?MODULE,
|
|
delayed_message, 20),
|
|
ejabberd_hooks:add(feature_inspect_packet, Host,
|
|
?MODULE, feature_inspect_packet, 150),
|
|
ejabberd_hooks:add(sm_remove_connection_hook, Host,
|
|
?MODULE, remove_connection, 20),
|
|
ejabberd_hooks:add(sm_remove_migrated_connection_hook,
|
|
Host, ?MODULE, remove_connection, 20),
|
|
{ok, #state{host = Host, timeout = Timeout}}.
|
|
|
|
handle_call(stop, _From, State) ->
|
|
{stop, normal, ok, State};
|
|
handle_call(_Req, _From, State) ->
|
|
{reply, {error, badarg}, State}.
|
|
|
|
handle_cast({add, ID, Pid, Packet}, State) ->
|
|
TRef = erlang:start_timer(State#state.timeout, self(),
|
|
{ID, Pid}),
|
|
Timers = insert(Pid, ID, {TRef, Packet},
|
|
State#state.timers),
|
|
{noreply, State#state{timers = Timers}};
|
|
handle_cast({del, ID, Pid}, State) ->
|
|
case lookup(Pid, ID, State#state.timers) of
|
|
{ok, {TRef, {From, To, #xmlel{attrs = Attrs}}}} ->
|
|
cancel_timer(TRef),
|
|
Timers = delete(Pid, ID, State#state.timers),
|
|
case ID of
|
|
{iq, _} ->
|
|
MsgID = xml:get_attr_s(<<"id">>, Attrs),
|
|
Message = #xmlel{name = <<"message">>,
|
|
attrs = [{<<"id">>, MsgID}],
|
|
children =
|
|
[#xmlel{name = <<"received">>,
|
|
attrs =
|
|
[{<<"xmlns">>, ?NS_RECEIPTS},
|
|
{<<"id">>, MsgID}],
|
|
children = []}]},
|
|
ejabberd_router:route(To, From, Message);
|
|
_ -> ok
|
|
end,
|
|
{noreply, State#state{timers = Timers}};
|
|
error -> {noreply, State}
|
|
end;
|
|
handle_cast({del, Pid}, State) ->
|
|
lists:foreach(fun ({_, _, {TRef, {From, To, El}}}) ->
|
|
cancel_timer(TRef),
|
|
El1 = xml:remove_subtags(El, <<"x">>,
|
|
{<<"xmlns">>,
|
|
?NS_P1_PUSHED}),
|
|
El2 = xml:append_subtags(El1,
|
|
[#xmlel{name = <<"x">>,
|
|
attrs =
|
|
[{<<"xmlns">>,
|
|
?NS_P1_PUSHED}],
|
|
children = []}]),
|
|
?DEBUG("Resending message:~n** From: ~p~n** "
|
|
"To: ~p~n** El: ~p",
|
|
[From, To, El2]),
|
|
ejabberd_router:route(From, To, El2)
|
|
end,
|
|
to_list(Pid, State#state.timers)),
|
|
Timers = delete(Pid, State#state.timers),
|
|
{noreply, State#state{timers = Timers}};
|
|
handle_cast(_Msg, State) -> {noreply, State}.
|
|
|
|
handle_info({timeout, _TRef, {ID, Pid}}, State) ->
|
|
case lookup(Pid, ID, State#state.timers) of
|
|
{ok, _} ->
|
|
catch ejabberd_c2s:stop_or_detach(Pid),
|
|
handle_cast({del, Pid}, State);
|
|
error ->
|
|
?WARNING_MSG("ack ID not found :~p", [ID]),
|
|
{noreply, State}
|
|
end;
|
|
handle_info(_Info, State) -> {noreply, State}.
|
|
|
|
terminate(_Reason, State) ->
|
|
Host = State#state.host,
|
|
ejabberd_hooks:delete(user_send_packet, Host, ?MODULE,
|
|
user_send_packet, 20),
|
|
ejabberd_hooks:delete(offline_message_hook, Host,
|
|
?MODULE, offline_message, 20),
|
|
ejabberd_hooks:delete(delayed_message_hook, Host,
|
|
?MODULE, delayed_message, 20),
|
|
ejabberd_hooks:delete(feature_inspect_packet, Host,
|
|
?MODULE, feature_inspect_packet, 150),
|
|
ejabberd_hooks:delete(sm_remove_connection_hook, Host,
|
|
?MODULE, remove_connection, 20),
|
|
ejabberd_hooks:delete(sm_remove_migrated_connection_hook,
|
|
Host, ?MODULE, remove_connection, 20),
|
|
ok.
|
|
|
|
code_change(_OldVsn, State, _Extra) -> {ok, State}.
|
|
|
|
process_ack_request(AckTagName,
|
|
#jid{lserver = LServer} = From, To,
|
|
#xmlel{name = <<"message">>} = Packet) ->
|
|
case has_receipt_request(Packet) of
|
|
{true, ID} ->
|
|
BareTo = jlib:jid_remove_resource(To),
|
|
Message = #xmlel{name = <<"message">>,
|
|
attrs = [{<<"id">>, ID}],
|
|
children =
|
|
[#xmlel{name = AckTagName,
|
|
attrs =
|
|
[{<<"xmlns">>, ?NS_RECEIPTS},
|
|
{<<"server">>, LServer},
|
|
{<<"id">>, ID}],
|
|
children = []}]},
|
|
ejabberd_router:route(BareTo, From, Message);
|
|
false -> do_nothing
|
|
end.
|
|
|
|
has_receipt_request(Packet) ->
|
|
has_receipt(Packet, <<"request">>).
|
|
|
|
has_receipt_response(Packet) ->
|
|
has_receipt(Packet, <<"received">>).
|
|
|
|
has_receipt(#xmlel{name = <<"message">>,
|
|
attrs = MsgAttrs} =
|
|
Packet,
|
|
Type) ->
|
|
case xml:get_attr_s(<<"id">>, MsgAttrs) of
|
|
<<"">> ->
|
|
case Type of
|
|
<<"request">> ->
|
|
false; %% Message must have an ID to ask a request for ack.
|
|
<<"received">> ->
|
|
case xml:get_subtag(Packet, <<"received">>) of
|
|
false -> false;
|
|
#xmlel{attrs = Attrs} ->
|
|
case xml:get_attr_s(<<"xmlns">>, Attrs) of
|
|
?NS_RECEIPTS ->
|
|
case xml:get_attr_s(<<"id">>, Attrs) of
|
|
<<"">> -> false;
|
|
SubTagID -> {true, SubTagID}
|
|
end;
|
|
_ -> false
|
|
end
|
|
end
|
|
end;
|
|
ID ->
|
|
case xml:get_subtag(Packet, Type) of
|
|
false -> false;
|
|
#xmlel{attrs = Attrs} ->
|
|
case xml:get_attr_s(<<"xmlns">>, Attrs) of
|
|
?NS_RECEIPTS ->
|
|
case xml:get_attr_s(<<"id">>, Attrs) of
|
|
<<"">> -> {true, ID};
|
|
SubTagID -> {true, SubTagID}
|
|
end;
|
|
_ -> false
|
|
end
|
|
end
|
|
end.
|
|
|
|
are_receipts_supported(Server,
|
|
#xmlel{name = <<"presence">>,
|
|
children = Els}) ->
|
|
case mod_caps:read_caps(Els) of
|
|
nothing -> unknown;
|
|
Caps ->
|
|
lists:member(?NS_RECEIPTS, mod_caps:get_features(Server, Caps))
|
|
end.
|
|
|
|
ping(From, To, Server, JID, El) ->
|
|
ID = randoms:get_string(),
|
|
add_timer(Server, {iq, ID}, JID, {From, To, El}),
|
|
ejabberd_router:route(jlib:make_jid(<<"">>, Server,
|
|
<<"">>),
|
|
JID,
|
|
#xmlel{name = <<"iq">>,
|
|
attrs =
|
|
[{<<"type">>, <<"get">>}, {<<"id">>, ID}],
|
|
children =
|
|
[#xmlel{name = <<"query">>,
|
|
attrs = [{<<"xmlns">>, ?NS_PING}],
|
|
children = []}]}).
|
|
|
|
add_timer(Host, ID, JID, Packet) ->
|
|
{U, S, R} = jlib:jid_tolower(JID),
|
|
C2SPid = ejabberd_sm:get_session_pid(U, S, R),
|
|
gen_server:cast(gen_mod:get_module_proc(Host,
|
|
?PROCNAME),
|
|
{add, ID, C2SPid, Packet}).
|
|
|
|
del_timer(Host, ID, JID) ->
|
|
{U, S, R} = jlib:jid_tolower(JID),
|
|
C2SPid = ejabberd_sm:get_session_pid(U, S, R),
|
|
gen_server:cast(gen_mod:get_module_proc(Host,
|
|
?PROCNAME),
|
|
{del, ID, C2SPid}).
|
|
|
|
cancel_timer(TRef) ->
|
|
case erlang:cancel_timer(TRef) of
|
|
false ->
|
|
receive {timeout, TRef, _} -> ok after 0 -> ok end;
|
|
_ -> ok
|
|
end.
|
|
|
|
lookup(Pid, Key, Queue) ->
|
|
case (?DICT):find(Pid, Queue) of
|
|
{ok, Treap} ->
|
|
case treap:lookup(Key, Treap) of
|
|
{ok, _, Val} -> {ok, Val};
|
|
error -> error
|
|
end;
|
|
error -> error
|
|
end.
|
|
|
|
insert(Pid, Key, Val, Queue) ->
|
|
Treap = case (?DICT):find(Pid, Queue) of
|
|
{ok, Treap1} -> Treap1;
|
|
error -> nil
|
|
end,
|
|
(?DICT):store(Pid, treap:insert(Key, now(), Val, Treap),
|
|
Queue).
|
|
|
|
delete(Pid, Key, Queue) ->
|
|
case (?DICT):find(Pid, Queue) of
|
|
{ok, Treap} ->
|
|
NewTreap = treap:delete(Key, Treap),
|
|
case treap:is_empty(NewTreap) of
|
|
true -> (?DICT):erase(Pid, Queue);
|
|
false -> (?DICT):store(Pid, NewTreap, Queue)
|
|
end;
|
|
error -> Queue
|
|
end.
|
|
|
|
delete(Pid, Queue) -> (?DICT):erase(Pid, Queue).
|
|
|
|
to_list(Pid, Queue) ->
|
|
case (?DICT):find(Pid, Queue) of
|
|
{ok, Treap} -> treap:to_list(Treap);
|
|
error -> []
|
|
end.
|