mirror of
https://github.com/processone/ejabberd.git
synced 2024-11-24 16:23:40 +01:00
Merge pull request #1881 from weiss/push
Support XEP-0357: Push Notifications
This commit is contained in:
commit
766b7c65a6
@ -725,6 +725,8 @@ modules:
|
||||
- "flat"
|
||||
- "hometree"
|
||||
- "pep" # pep requires mod_caps
|
||||
mod_push: {}
|
||||
mod_push_keepalive: {}
|
||||
## mod_register:
|
||||
##
|
||||
## Protect In-Band account registrations with CAPTCHA.
|
||||
|
@ -47,7 +47,7 @@
|
||||
process_terminated/2, process_info/2]).
|
||||
%% API
|
||||
-export([get_presence/1, set_presence/2, resend_presence/1, resend_presence/2,
|
||||
open_session/1, call/3, send/2, close/1, close/2, stop/1,
|
||||
open_session/1, call/3, cast/2, send/2, close/1, close/2, stop/1,
|
||||
reply/2, copy_state/2, set_timeout/2, route/2,
|
||||
host_up/1, host_down/1]).
|
||||
|
||||
@ -90,6 +90,10 @@ socket_type() ->
|
||||
call(Ref, Msg, Timeout) ->
|
||||
xmpp_stream_in:call(Ref, Msg, Timeout).
|
||||
|
||||
-spec cast(pid(), term()) -> ok.
|
||||
cast(Ref, Msg) ->
|
||||
xmpp_stream_in:cast(Ref, Msg).
|
||||
|
||||
reply(Ref, Reply) ->
|
||||
xmpp_stream_in:reply(Ref, Reply).
|
||||
|
||||
|
@ -66,6 +66,8 @@
|
||||
user_resources/2,
|
||||
kick_user/2,
|
||||
get_session_pid/3,
|
||||
get_session_sid/3,
|
||||
get_session_sids/2,
|
||||
get_user_info/2,
|
||||
get_user_info/3,
|
||||
get_user_ip/3,
|
||||
@ -292,15 +294,31 @@ close_session_unset_presence(SID, User, Server,
|
||||
-spec get_session_pid(binary(), binary(), binary()) -> none | pid().
|
||||
|
||||
get_session_pid(User, Server, Resource) ->
|
||||
case get_session_sid(User, Server, Resource) of
|
||||
{_, PID} -> PID;
|
||||
none -> none
|
||||
end.
|
||||
|
||||
-spec get_session_sid(binary(), binary(), binary()) -> none | sid().
|
||||
|
||||
get_session_sid(User, Server, Resource) ->
|
||||
LUser = jid:nodeprep(User),
|
||||
LServer = jid:nameprep(Server),
|
||||
LResource = jid:resourceprep(Resource),
|
||||
Mod = get_sm_backend(LServer),
|
||||
case online(get_sessions(Mod, LUser, LServer, LResource)) of
|
||||
[#session{sid = {_, Pid}}] -> Pid;
|
||||
[#session{sid = SID}] -> SID;
|
||||
_ -> none
|
||||
end.
|
||||
|
||||
-spec get_session_sids(binary(), binary()) -> [sid()].
|
||||
|
||||
get_session_sids(User, Server) ->
|
||||
LUser = jid:nodeprep(User),
|
||||
LServer = jid:nameprep(Server),
|
||||
Mod = get_sm_backend(LServer),
|
||||
online(get_sessions(Mod, LUser, LServer)).
|
||||
|
||||
-spec set_offline_info(sid(), binary(), binary(), binary(), info()) -> ok.
|
||||
|
||||
set_offline_info(SID, User, Server, Resource, Info) ->
|
||||
|
16
src/misc.erl
16
src/misc.erl
@ -32,8 +32,8 @@
|
||||
hex_to_bin/1, hex_to_base64/1, expand_keyword/3,
|
||||
atom_to_binary/1, binary_to_atom/1, tuple_to_binary/1,
|
||||
l2i/1, i2l/1, i2l/2, expr_to_term/1, term_to_expr/1,
|
||||
encode_pid/1, decode_pid/2, compile_exprs/2, join_atoms/2,
|
||||
try_read_file/1]).
|
||||
now_to_usec/1, usec_to_now/1, encode_pid/1, decode_pid/2,
|
||||
compile_exprs/2, join_atoms/2, try_read_file/1]).
|
||||
|
||||
%% Deprecated functions
|
||||
-export([decode_base64/1, encode_base64/1]).
|
||||
@ -127,6 +127,18 @@ expr_to_term(Expr) ->
|
||||
term_to_expr(Term) ->
|
||||
list_to_binary(io_lib:print(Term)).
|
||||
|
||||
-spec now_to_usec(erlang:timestamp()) -> non_neg_integer().
|
||||
now_to_usec({MSec, Sec, USec}) ->
|
||||
(MSec*1000000 + Sec)*1000000 + USec.
|
||||
|
||||
-spec usec_to_now(non_neg_integer()) -> erlang:timestamp().
|
||||
usec_to_now(Int) ->
|
||||
Secs = Int div 1000000,
|
||||
USec = Int rem 1000000,
|
||||
MSec = Secs div 1000000,
|
||||
Sec = Secs rem 1000000,
|
||||
{MSec, Sec, USec}.
|
||||
|
||||
l2i(I) when is_integer(I) -> I;
|
||||
l2i(L) when is_binary(L) -> binary_to_integer(L).
|
||||
|
||||
|
@ -434,8 +434,9 @@ message_is_archived(false, #{jid := JID}, Pkt) ->
|
||||
delete_old_messages(TypeBin, Days) when TypeBin == <<"chat">>;
|
||||
TypeBin == <<"groupchat">>;
|
||||
TypeBin == <<"all">> ->
|
||||
CurrentTime = p1_time_compat:system_time(micro_seconds),
|
||||
Diff = Days * 24 * 60 * 60 * 1000000,
|
||||
TimeStamp = usec_to_now(p1_time_compat:system_time(micro_seconds) - Diff),
|
||||
TimeStamp = misc:usec_to_now(CurrentTime - Diff),
|
||||
Type = misc:binary_to_atom(TypeBin),
|
||||
DBTypes = lists:usort(
|
||||
lists:map(
|
||||
@ -830,7 +831,7 @@ select(_LServer, JidRequestor, JidArchive, Query, RSM,
|
||||
Msgs =
|
||||
lists:flatmap(
|
||||
fun({Nick, Pkt, _HaveSubject, Now, _Size}) ->
|
||||
TS = now_to_usec(Now),
|
||||
TS = misc:now_to_usec(Now),
|
||||
case match_interval(Now, Start, End) and
|
||||
match_rsm(Now, RSM) of
|
||||
true ->
|
||||
@ -979,24 +980,14 @@ match_interval(Now, Start, End) ->
|
||||
(Now >= Start) and (Now =< End).
|
||||
|
||||
match_rsm(Now, #rsm_set{'after' = ID}) when is_binary(ID), ID /= <<"">> ->
|
||||
Now1 = (catch usec_to_now(binary_to_integer(ID))),
|
||||
Now1 = (catch misc:usec_to_now(binary_to_integer(ID))),
|
||||
Now > Now1;
|
||||
match_rsm(Now, #rsm_set{before = ID}) when is_binary(ID), ID /= <<"">> ->
|
||||
Now1 = (catch usec_to_now(binary_to_integer(ID))),
|
||||
Now1 = (catch misc:usec_to_now(binary_to_integer(ID))),
|
||||
Now < Now1;
|
||||
match_rsm(_Now, _) ->
|
||||
true.
|
||||
|
||||
now_to_usec({MSec, Sec, USec}) ->
|
||||
(MSec*1000000 + Sec)*1000000 + USec.
|
||||
|
||||
usec_to_now(Int) ->
|
||||
Secs = Int div 1000000,
|
||||
USec = Int rem 1000000,
|
||||
MSec = Secs div 1000000,
|
||||
Sec = Secs rem 1000000,
|
||||
{MSec, Sec, USec}.
|
||||
|
||||
get_jids(undefined) ->
|
||||
[];
|
||||
get_jids(Js) ->
|
||||
|
596
src/mod_push.erl
Normal file
596
src/mod_push.erl
Normal file
@ -0,0 +1,596 @@
|
||||
%%%----------------------------------------------------------------------
|
||||
%%% File : mod_push.erl
|
||||
%%% Author : Holger Weiss <holger@zedat.fu-berlin.de>
|
||||
%%% Purpose : Push Notifications (XEP-0357)
|
||||
%%% Created : 15 Jul 2017 by Holger Weiss <holger@zedat.fu-berlin.de>
|
||||
%%%
|
||||
%%%
|
||||
%%% ejabberd, Copyright (C) 2017 ProcessOne
|
||||
%%%
|
||||
%%% This program is free software; you can redistribute it and/or
|
||||
%%% modify it under the terms of the GNU General Public License as
|
||||
%%% published by the Free Software Foundation; either version 2 of the
|
||||
%%% License, or (at your option) any later version.
|
||||
%%%
|
||||
%%% This program is distributed in the hope that it will be useful,
|
||||
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
%%% General Public License for more details.
|
||||
%%%
|
||||
%%% You should have received a copy of the GNU General Public License along
|
||||
%%% with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
%%%
|
||||
%%%----------------------------------------------------------------------
|
||||
|
||||
-module(mod_push).
|
||||
-author('holger@zedat.fu-berlin.de').
|
||||
-protocol({xep, 357, '0.2'}).
|
||||
|
||||
-behavior(gen_mod).
|
||||
|
||||
%% gen_mod callbacks.
|
||||
-export([start/2, stop/1, reload/3, mod_opt_type/1, depends/2]).
|
||||
|
||||
%% ejabberd_hooks callbacks.
|
||||
-export([disco_sm_features/5, c2s_session_pending/1, c2s_copy_session/2,
|
||||
c2s_handle_cast/2, c2s_stanza/3, mam_message/6, offline_message/1,
|
||||
remove_user/2]).
|
||||
|
||||
%% gen_iq_handler callback.
|
||||
-export([process_iq/1]).
|
||||
|
||||
%% ejabberd command.
|
||||
-export([get_commands_spec/0, delete_old_sessions/1]).
|
||||
|
||||
%% API (used by mod_push_keepalive).
|
||||
-export([notify/1, notify/3, notify/5]).
|
||||
|
||||
-include("ejabberd.hrl").
|
||||
-include("ejabberd_commands.hrl").
|
||||
-include("logger.hrl").
|
||||
-include("xmpp.hrl").
|
||||
|
||||
-define(PUSH_CACHE, push_cache).
|
||||
|
||||
-type c2s_state() :: ejabberd_c2s:state().
|
||||
-type timestamp() :: erlang:timestamp().
|
||||
-type push_session() :: {timestamp(), ljid(), binary(), xdata()}.
|
||||
|
||||
-callback init(binary(), gen_mod:opts())
|
||||
-> any().
|
||||
-callback store_session(binary(), binary(), timestamp(), jid(), binary(),
|
||||
xdata())
|
||||
-> {ok, push_session()} | error.
|
||||
-callback lookup_session(binary(), binary(), jid(), binary())
|
||||
-> {ok, push_session()} | error.
|
||||
-callback lookup_session(binary(), binary(), timestamp())
|
||||
-> {ok, push_session()} | error.
|
||||
-callback lookup_sessions(binary(), binary(), jid())
|
||||
-> {ok, [push_session()]} | error.
|
||||
-callback lookup_sessions(binary(), binary())
|
||||
-> {ok, [push_session()]} | error.
|
||||
-callback lookup_sessions(binary())
|
||||
-> {ok, [push_session()]} | error.
|
||||
-callback delete_session(binary(), binary(), timestamp())
|
||||
-> ok | error.
|
||||
-callback delete_old_sessions(binary() | global, erlang:timestamp())
|
||||
-> any().
|
||||
-callback use_cache(binary())
|
||||
-> boolean().
|
||||
-callback cache_nodes(binary())
|
||||
-> [node()].
|
||||
|
||||
-optional_callbacks([use_cache/1, cache_nodes/1]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_mod callbacks.
|
||||
%%--------------------------------------------------------------------
|
||||
-spec start(binary(), gen_mod:opts()) -> ok.
|
||||
start(Host, Opts) ->
|
||||
IQDisc = gen_mod:get_opt(iqdisc, Opts, gen_iq_handler:iqdisc(Host)),
|
||||
Mod = gen_mod:db_mod(Host, Opts, ?MODULE),
|
||||
Mod:init(Host, Opts),
|
||||
init_cache(Mod, Host, Opts),
|
||||
register_iq_handlers(Host, IQDisc),
|
||||
register_hooks(Host),
|
||||
ejabberd_commands:register_commands(get_commands_spec()).
|
||||
|
||||
-spec stop(binary()) -> ok.
|
||||
stop(Host) ->
|
||||
unregister_hooks(Host),
|
||||
unregister_iq_handlers(Host),
|
||||
ejabberd_commands:unregister_commands(get_commands_spec()).
|
||||
|
||||
-spec reload(binary(), gen_mod:opts(), gen_mod:opts()) -> ok.
|
||||
reload(Host, NewOpts, OldOpts) ->
|
||||
NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE),
|
||||
OldMod = gen_mod:db_mod(Host, OldOpts, ?MODULE),
|
||||
if NewMod /= OldMod ->
|
||||
NewMod:init(Host, NewOpts);
|
||||
true ->
|
||||
ok
|
||||
end,
|
||||
case gen_mod:is_equal_opt(iqdisc, NewOpts, OldOpts,
|
||||
gen_iq_handler:iqdisc(Host)) of
|
||||
{false, IQDisc, _} ->
|
||||
register_iq_handlers(Host, IQDisc);
|
||||
true ->
|
||||
ok
|
||||
end.
|
||||
|
||||
-spec depends(binary(), gen_mod:opts()) -> [{module(), hard | soft}].
|
||||
depends(_Host, _Opts) ->
|
||||
[].
|
||||
|
||||
-spec mod_opt_type(atom()) -> fun((term()) -> term()) | [atom()].
|
||||
mod_opt_type(db_type) ->
|
||||
fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
|
||||
mod_opt_type(O) when O == cache_life_time; O == cache_size ->
|
||||
fun(I) when is_integer(I), I > 0 -> I;
|
||||
(infinity) -> infinity
|
||||
end;
|
||||
mod_opt_type(O) when O == use_cache; O == cache_missed ->
|
||||
fun (B) when is_boolean(B) -> B end;
|
||||
mod_opt_type(iqdisc) ->
|
||||
fun gen_iq_handler:check_type/1;
|
||||
mod_opt_type(_) ->
|
||||
[db_type, cache_life_time, cache_size, use_cache, cache_missed, iqdisc].
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% ejabberd command callback.
|
||||
%%--------------------------------------------------------------------
|
||||
-spec get_commands_spec() -> [ejabberd_commands()].
|
||||
get_commands_spec() ->
|
||||
[#ejabberd_commands{name = delete_old_push_sessions, tags = [purge],
|
||||
desc = "Remove push sessions older than DAYS",
|
||||
module = ?MODULE, function = delete_old_sessions,
|
||||
args = [{days, integer}],
|
||||
result = {res, rescode}}].
|
||||
|
||||
-spec delete_old_sessions(non_neg_integer()) -> ok | any().
|
||||
delete_old_sessions(Days) ->
|
||||
CurrentTime = p1_time_compat:system_time(micro_seconds),
|
||||
Diff = Days * 24 * 60 * 60 * 1000000,
|
||||
TimeStamp = misc:usec_to_now(CurrentTime - Diff),
|
||||
DBTypes = lists:usort(
|
||||
lists:map(
|
||||
fun(Host) ->
|
||||
case gen_mod:db_type(Host, ?MODULE) of
|
||||
sql -> {sql, Host};
|
||||
Other -> {Other, global}
|
||||
end
|
||||
end, ?MYHOSTS)),
|
||||
Results = lists:map(
|
||||
fun({DBType, Host}) ->
|
||||
Mod = gen_mod:db_mod(DBType, ?MODULE),
|
||||
Mod:delete_old_sessions(Host, TimeStamp)
|
||||
end, DBTypes),
|
||||
ets_cache:clear(?PUSH_CACHE, ejabberd_cluster:get_nodes()),
|
||||
case lists:filter(fun(Res) -> Res /= ok end, Results) of
|
||||
[] ->
|
||||
?INFO_MSG("Deleted push sessions older than ~B days", [Days]),
|
||||
ok;
|
||||
[NotOk | _] ->
|
||||
?ERROR_MSG("Error while deleting old push sessions: ~p", [NotOk]),
|
||||
NotOk
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Register/unregister hooks.
|
||||
%%--------------------------------------------------------------------
|
||||
-spec register_hooks(binary()) -> ok.
|
||||
register_hooks(Host) ->
|
||||
ejabberd_hooks:add(disco_sm_features, Host, ?MODULE,
|
||||
disco_sm_features, 50),
|
||||
ejabberd_hooks:add(c2s_session_pending, Host, ?MODULE,
|
||||
c2s_session_pending, 50),
|
||||
ejabberd_hooks:add(c2s_copy_session, Host, ?MODULE,
|
||||
c2s_copy_session, 50),
|
||||
ejabberd_hooks:add(c2s_handle_cast, Host, ?MODULE,
|
||||
c2s_handle_cast, 50),
|
||||
ejabberd_hooks:add(c2s_handle_send, Host, ?MODULE,
|
||||
c2s_stanza, 50),
|
||||
ejabberd_hooks:add(store_mam_message, Host, ?MODULE,
|
||||
mam_message, 50),
|
||||
ejabberd_hooks:add(offline_message_hook, Host, ?MODULE,
|
||||
offline_message, 50),
|
||||
ejabberd_hooks:add(remove_user, Host, ?MODULE,
|
||||
remove_user, 50).
|
||||
|
||||
-spec unregister_hooks(binary()) -> ok.
|
||||
unregister_hooks(Host) ->
|
||||
ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE,
|
||||
disco_sm_features, 50),
|
||||
ejabberd_hooks:delete(c2s_session_pending, Host, ?MODULE,
|
||||
c2s_session_pending, 50),
|
||||
ejabberd_hooks:delete(c2s_copy_session, Host, ?MODULE,
|
||||
c2s_copy_session, 50),
|
||||
ejabberd_hooks:delete(c2s_handle_cast, Host, ?MODULE,
|
||||
c2s_handle_cast, 50),
|
||||
ejabberd_hooks:delete(c2s_handle_send, Host, ?MODULE,
|
||||
c2s_stanza, 50),
|
||||
ejabberd_hooks:delete(store_mam_message, Host, ?MODULE,
|
||||
mam_message, 50),
|
||||
ejabberd_hooks:delete(offline_message_hook, Host, ?MODULE,
|
||||
offline_message, 50),
|
||||
ejabberd_hooks:delete(remove_user, Host, ?MODULE,
|
||||
remove_user, 50).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Service discovery.
|
||||
%%--------------------------------------------------------------------
|
||||
-spec disco_sm_features(empty | {result, [binary()]} | {error, stanza_error()},
|
||||
jid(), jid(), binary(), binary())
|
||||
-> {result, [binary()]} | {error, stanza_error()}.
|
||||
disco_sm_features(empty, From, To, Node, Lang) ->
|
||||
disco_sm_features({result, []}, From, To, Node, Lang);
|
||||
disco_sm_features({result, OtherFeatures},
|
||||
#jid{luser = U, lserver = S},
|
||||
#jid{luser = U, lserver = S}, <<"">>, _Lang) ->
|
||||
{result, [?NS_PUSH_0 | OtherFeatures]};
|
||||
disco_sm_features(Acc, _From, _To, _Node, _Lang) ->
|
||||
Acc.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% IQ handlers.
|
||||
%%--------------------------------------------------------------------
|
||||
-spec register_iq_handlers(binary(), gen_iq_handler:type()) -> ok.
|
||||
register_iq_handlers(Host, IQDisc) ->
|
||||
gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_PUSH_0,
|
||||
?MODULE, process_iq, IQDisc).
|
||||
|
||||
-spec unregister_iq_handlers(binary()) -> ok.
|
||||
unregister_iq_handlers(Host) ->
|
||||
gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_PUSH_0).
|
||||
|
||||
-spec process_iq(iq()) -> iq().
|
||||
process_iq(#iq{type = get, lang = Lang} = IQ) ->
|
||||
Txt = <<"Value 'get' of 'type' attribute is not allowed">>,
|
||||
xmpp:make_error(IQ, xmpp:err_not_allowed(Txt, Lang));
|
||||
process_iq(#iq{lang = Lang, sub_els = [#push_enable{node = <<>>}]} = IQ) ->
|
||||
Txt = <<"Enabling push without 'node' attribute is not supported">>,
|
||||
xmpp:make_error(IQ, xmpp:err_feature_not_implemented(Txt, Lang));
|
||||
process_iq(#iq{from = #jid{lserver = LServer} = JID,
|
||||
to = #jid{lserver = LServer},
|
||||
sub_els = [#push_enable{jid = PushJID,
|
||||
node = Node,
|
||||
xdata = XData}]} = IQ) ->
|
||||
case enable(JID, PushJID, Node, XData) of
|
||||
ok ->
|
||||
xmpp:make_iq_result(IQ);
|
||||
error ->
|
||||
xmpp:make_error(IQ, xmpp:err_internal_server_error())
|
||||
end;
|
||||
process_iq(#iq{from = #jid{lserver = LServer} = JID,
|
||||
to = #jid{lserver = LServer},
|
||||
sub_els = [#push_disable{jid = PushJID,
|
||||
node = Node}]} = IQ) ->
|
||||
case disable(JID, PushJID, Node) of
|
||||
ok ->
|
||||
xmpp:make_iq_result(IQ);
|
||||
error ->
|
||||
xmpp:make_error(IQ, xmpp:err_item_not_found())
|
||||
end;
|
||||
process_iq(IQ) ->
|
||||
xmpp:make_error(IQ, xmpp:err_not_allowed()).
|
||||
|
||||
-spec enable(jid(), jid(), binary(), xdata()) -> ok | error.
|
||||
enable(#jid{luser = LUser, lserver = LServer, lresource = LResource} = JID,
|
||||
PushJID, Node, XData) ->
|
||||
case ejabberd_sm:get_session_sid(LUser, LServer, LResource) of
|
||||
{TS, PID} ->
|
||||
case store_session(LUser, LServer, TS, PushJID, Node, XData) of
|
||||
{ok, _} ->
|
||||
?INFO_MSG("Enabling push notifications for ~s",
|
||||
[jid:encode(JID)]),
|
||||
ejabberd_c2s:cast(PID, push_enable);
|
||||
error ->
|
||||
?ERROR_MSG("Cannot enable push for ~s: database error",
|
||||
[jid:encode(JID)]),
|
||||
error
|
||||
end;
|
||||
none ->
|
||||
?WARNING_MSG("Cannot enable push for ~s: session not found",
|
||||
[jid:encode(JID)]),
|
||||
error
|
||||
end.
|
||||
|
||||
-spec disable(jid(), jid(), binary() | undefined) -> ok | error.
|
||||
disable(#jid{luser = LUser, lserver = LServer, lresource = LResource} = JID,
|
||||
PushJID, Node) ->
|
||||
case ejabberd_sm:get_session_sid(LUser, LServer, LResource) of
|
||||
{_TS, PID} ->
|
||||
?INFO_MSG("Disabling push notifications for ~s",
|
||||
[jid:encode(JID)]),
|
||||
ejabberd_c2s:cast(PID, push_disable);
|
||||
none ->
|
||||
?WARNING_MSG("Session not found while disabling push for ~s",
|
||||
[jid:encode(JID)])
|
||||
end,
|
||||
if Node /= undefined ->
|
||||
delete_session(LUser, LServer, PushJID, Node);
|
||||
true ->
|
||||
delete_sessions(LUser, LServer, PushJID)
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Hook callbacks.
|
||||
%%--------------------------------------------------------------------
|
||||
-spec c2s_stanza(c2s_state(), xmpp_element() | xmlel(), term()) -> c2s_state().
|
||||
c2s_stanza(#{push_enabled := true, mgmt_state := pending} = State,
|
||||
_Pkt, _SendResult) ->
|
||||
notify(State),
|
||||
State;
|
||||
c2s_stanza(State, _Pkt, _SendResult) ->
|
||||
State.
|
||||
|
||||
-spec mam_message(message() | drop, binary(), binary(), jid(),
|
||||
chat | groupchat, recv | send) -> message().
|
||||
mam_message(#message{meta = #{push_notified := true}} = Pkt,
|
||||
_LUser, _LServer, _Peer, _Type, _Dir) ->
|
||||
Pkt;
|
||||
mam_message(#message{} = Pkt, LUser, LServer, _Peer, chat, _Dir) ->
|
||||
case lookup_sessions(LUser, LServer) of
|
||||
{ok, [_|_] = Clients} ->
|
||||
case drop_online_sessions(LUser, LServer, Clients) of
|
||||
[_|_] = Clients1 ->
|
||||
?DEBUG("Notifying ~s@~s of MAM message", [LUser, LServer]),
|
||||
notify(LUser, LServer, Clients1);
|
||||
[] ->
|
||||
ok
|
||||
end;
|
||||
_ ->
|
||||
ok
|
||||
end,
|
||||
xmpp:put_meta(Pkt, push_notified, true);
|
||||
mam_message(Pkt, _LUser, _LServer, _Peer, _Type, _Dir) ->
|
||||
Pkt.
|
||||
|
||||
-spec offline_message({any(), message()}) -> {any(), message()}.
|
||||
offline_message({_Action, #message{meta = #{push_notified := true}}} = Acc) ->
|
||||
Acc;
|
||||
offline_message({Action, #message{to = #jid{luser = LUser,
|
||||
lserver = LServer}} = Pkt}) ->
|
||||
case lookup_sessions(LUser, LServer) of
|
||||
{ok, [_|_] = Clients} ->
|
||||
?DEBUG("Notifying ~s@~s of offline message", [LUser, LServer]),
|
||||
notify(LUser, LServer, Clients);
|
||||
_ ->
|
||||
ok
|
||||
end,
|
||||
{Action, xmpp:put_meta(Pkt, push_notified, true)}.
|
||||
|
||||
-spec c2s_session_pending(c2s_state()) -> c2s_state().
|
||||
c2s_session_pending(#{push_enabled := true, mgmt_queue := Queue} = State) ->
|
||||
case p1_queue:len(Queue) of
|
||||
Len when Len > 0 ->
|
||||
?DEBUG("Notifying client of unacknowledged messages", []),
|
||||
notify(State),
|
||||
State;
|
||||
0 ->
|
||||
State
|
||||
end;
|
||||
c2s_session_pending(State) ->
|
||||
State.
|
||||
|
||||
-spec c2s_copy_session(c2s_state(), c2s_state()) -> c2s_state().
|
||||
c2s_copy_session(State, #{push_enabled := true}) ->
|
||||
State#{push_enabled => true};
|
||||
c2s_copy_session(State, _) ->
|
||||
State.
|
||||
|
||||
-spec c2s_handle_cast(c2s_state(), any()) -> c2s_state() | {stop, c2s_state()}.
|
||||
c2s_handle_cast(State, push_enable) ->
|
||||
{stop, State#{push_enabled => true}};
|
||||
c2s_handle_cast(State, push_disable) ->
|
||||
{stop, maps:remove(push_enabled, State)};
|
||||
c2s_handle_cast(State, _Msg) ->
|
||||
State.
|
||||
|
||||
-spec remove_user(binary(), binary()) -> ok | error.
|
||||
remove_user(LUser, LServer) ->
|
||||
?INFO_MSG("Removing any push sessions of ~s@~s", [LUser, LServer]),
|
||||
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||
LookupFun = fun() -> Mod:lookup_sessions(LUser, LServer) end,
|
||||
delete_sessions(LUser, LServer, LookupFun, Mod).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Generate push notifications.
|
||||
%%--------------------------------------------------------------------
|
||||
-spec notify(c2s_state()) -> ok.
|
||||
notify(#{jid := #jid{luser = LUser, lserver = LServer}, sid := {TS, _}}) ->
|
||||
case lookup_session(LUser, LServer, TS) of
|
||||
{ok, Client} ->
|
||||
notify(LUser, LServer, [Client]);
|
||||
error ->
|
||||
ok
|
||||
end.
|
||||
|
||||
-spec notify(binary(), binary(), [push_session()]) -> ok.
|
||||
notify(LUser, LServer, Clients) ->
|
||||
lists:foreach(
|
||||
fun({TS, PushLJID, Node, XData}) ->
|
||||
HandleResponse = fun(#iq{type = result}) ->
|
||||
ok;
|
||||
(#iq{type = error}) ->
|
||||
delete_session(LUser, LServer, TS);
|
||||
(timeout) ->
|
||||
ok % Hmm.
|
||||
end,
|
||||
notify(LServer, PushLJID, Node, XData, HandleResponse)
|
||||
end, Clients).
|
||||
|
||||
-spec notify(binary(), ljid(), binary(), xdata(),
|
||||
fun((iq() | timeout) -> any())) -> ok.
|
||||
notify(LServer, PushLJID, Node, XData, HandleResponse) ->
|
||||
From = jid:make(LServer),
|
||||
Item = #ps_item{xml_els = [xmpp:encode(#push_notification{})]},
|
||||
PubSub = #pubsub{publish = #ps_publish{node = Node, items = [Item]},
|
||||
publish_options = XData},
|
||||
IQ = #iq{type = set,
|
||||
from = From,
|
||||
to = jid:make(PushLJID),
|
||||
id = randoms:get_string(),
|
||||
sub_els = [PubSub]},
|
||||
ejabberd_local:route_iq(IQ, HandleResponse),
|
||||
ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions.
|
||||
%%--------------------------------------------------------------------
|
||||
-spec store_session(binary(), binary(), timestamp(), jid(), binary(), xdata())
|
||||
-> {ok, push_session()} | error.
|
||||
store_session(LUser, LServer, TS, PushJID, Node, XData) ->
|
||||
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||
delete_session(LUser, LServer, PushJID, Node),
|
||||
case use_cache(Mod, LServer) of
|
||||
true ->
|
||||
ets_cache:delete(?PUSH_CACHE, {LUser, LServer},
|
||||
cache_nodes(Mod, LServer)),
|
||||
ets_cache:update(
|
||||
?PUSH_CACHE,
|
||||
{LUser, LServer, TS}, {ok, {TS, PushJID, Node, XData}},
|
||||
fun() ->
|
||||
Mod:store_session(LUser, LServer, TS, PushJID, Node,
|
||||
XData)
|
||||
end, cache_nodes(Mod, LServer));
|
||||
false ->
|
||||
Mod:store_session(LUser, LServer, TS, PushJID, Node, XData)
|
||||
end.
|
||||
|
||||
-spec lookup_session(binary(), binary(), timestamp())
|
||||
-> {ok, push_session()} | error.
|
||||
lookup_session(LUser, LServer, TS) ->
|
||||
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||
case use_cache(Mod, LServer) of
|
||||
true ->
|
||||
ets_cache:lookup(
|
||||
?PUSH_CACHE, {LUser, LServer, TS},
|
||||
fun() -> Mod:lookup_session(LUser, LServer, TS) end);
|
||||
false ->
|
||||
Mod:lookup_session(LUser, LServer, TS)
|
||||
end.
|
||||
|
||||
-spec lookup_sessions(binary(), binary()) -> {ok, [push_session()]} | error.
|
||||
lookup_sessions(LUser, LServer) ->
|
||||
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||
case use_cache(Mod, LServer) of
|
||||
true ->
|
||||
ets_cache:lookup(
|
||||
?PUSH_CACHE, {LUser, LServer},
|
||||
fun() -> Mod:lookup_sessions(LUser, LServer) end);
|
||||
false ->
|
||||
Mod:lookup_sessions(LUser, LServer)
|
||||
end.
|
||||
|
||||
-spec delete_session(binary(), binary(), timestamp()) -> ok | error.
|
||||
delete_session(LUser, LServer, TS) ->
|
||||
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||
ok = Mod:delete_session(LUser, LServer, TS),
|
||||
case use_cache(Mod, LServer) of
|
||||
true ->
|
||||
ets_cache:delete(?PUSH_CACHE, {LUser, LServer},
|
||||
cache_nodes(Mod, LServer)),
|
||||
ets_cache:delete(?PUSH_CACHE, {LUser, LServer, TS},
|
||||
cache_nodes(Mod, LServer));
|
||||
false ->
|
||||
ok
|
||||
end.
|
||||
|
||||
-spec delete_session(binary(), binary(), jid(), binary()) -> ok | error.
|
||||
delete_session(LUser, LServer, PushJID, Node) ->
|
||||
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||
case Mod:lookup_session(LUser, LServer, PushJID, Node) of
|
||||
{ok, {TS, _, _, _}} ->
|
||||
delete_session(LUser, LServer, TS);
|
||||
error ->
|
||||
error
|
||||
end.
|
||||
|
||||
-spec delete_sessions(binary(), binary(), jid()) -> ok | error.
|
||||
delete_sessions(LUser, LServer, PushJID) ->
|
||||
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||
LookupFun = fun() -> Mod:lookup_sessions(LUser, LServer, PushJID) end,
|
||||
delete_sessions(LUser, LServer, LookupFun, Mod).
|
||||
|
||||
-spec delete_sessions(binary(), binary(), fun(() -> ok | error), module())
|
||||
-> ok | error.
|
||||
delete_sessions(LUser, LServer, LookupFun, Mod) ->
|
||||
case LookupFun() of
|
||||
{ok, Clients} ->
|
||||
case use_cache(Mod, LServer) of
|
||||
true ->
|
||||
ets_cache:delete(?PUSH_CACHE, {LUser, LServer},
|
||||
cache_nodes(Mod, LServer));
|
||||
false ->
|
||||
ok
|
||||
end,
|
||||
lists:foreach(
|
||||
fun({TS, _, _, _}) ->
|
||||
ok = Mod:delete_session(LUser, LServer, TS),
|
||||
case use_cache(Mod, LServer) of
|
||||
true ->
|
||||
ets_cache:delete(?PUSH_CACHE,
|
||||
{LUser, LServer, TS},
|
||||
cache_nodes(Mod, LServer));
|
||||
false ->
|
||||
ok
|
||||
end
|
||||
end, Clients);
|
||||
error ->
|
||||
error
|
||||
end.
|
||||
|
||||
-spec drop_online_sessions(binary(), binary(), [push_session()])
|
||||
-> [push_session()].
|
||||
drop_online_sessions(LUser, LServer, Clients) ->
|
||||
SessIDs = ejabberd_sm:get_session_sids(LUser, LServer),
|
||||
[Client || {TS, _, _, _} = Client <- Clients,
|
||||
not lists:keyfind(TS, 1, SessIDs)].
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Caching.
|
||||
%%--------------------------------------------------------------------
|
||||
-spec init_cache(module(), binary(), gen_mod:opts()) -> ok.
|
||||
init_cache(Mod, Host, Opts) ->
|
||||
case use_cache(Mod, Host) of
|
||||
true ->
|
||||
CacheOpts = cache_opts(Host, Opts),
|
||||
ets_cache:new(?PUSH_CACHE, CacheOpts);
|
||||
false ->
|
||||
ets_cache:delete(?PUSH_CACHE)
|
||||
end.
|
||||
|
||||
-spec cache_opts(binary(), gen_mod:opts()) -> [proplists:property()].
|
||||
cache_opts(Host, Opts) ->
|
||||
MaxSize = gen_mod:get_opt(
|
||||
cache_size, Opts,
|
||||
ejabberd_config:cache_size(Host)),
|
||||
CacheMissed = gen_mod:get_opt(
|
||||
cache_missed, Opts,
|
||||
ejabberd_config:cache_missed(Host)),
|
||||
LifeTime = case gen_mod:get_opt(
|
||||
cache_life_time, Opts,
|
||||
ejabberd_config:cache_life_time(Host)) of
|
||||
infinity -> infinity;
|
||||
I -> timer:seconds(I)
|
||||
end,
|
||||
[{max_size, MaxSize}, {cache_missed, CacheMissed}, {life_time, LifeTime}].
|
||||
|
||||
-spec use_cache(module(), binary()) -> boolean().
|
||||
use_cache(Mod, Host) ->
|
||||
case erlang:function_exported(Mod, use_cache, 1) of
|
||||
true -> Mod:use_cache(Host);
|
||||
false ->
|
||||
gen_mod:get_module_opt(
|
||||
Host, ?MODULE, use_cache,
|
||||
ejabberd_config:use_cache(Host))
|
||||
end.
|
||||
|
||||
-spec cache_nodes(module(), binary()) -> [node()].
|
||||
cache_nodes(Mod, Host) ->
|
||||
case erlang:function_exported(Mod, cache_nodes, 1) of
|
||||
true -> Mod:cache_nodes(Host);
|
||||
false -> ejabberd_cluster:get_nodes()
|
||||
end.
|
236
src/mod_push_keepalive.erl
Normal file
236
src/mod_push_keepalive.erl
Normal file
@ -0,0 +1,236 @@
|
||||
%%%----------------------------------------------------------------------
|
||||
%%% File : mod_push_keepalive.erl
|
||||
%%% Author : Holger Weiss <holger@zedat.fu-berlin.de>
|
||||
%%% Purpose : Keep pending XEP-0198 sessions alive with XEP-0357
|
||||
%%% Created : 15 Jul 2017 by Holger Weiss <holger@zedat.fu-berlin.de>
|
||||
%%%
|
||||
%%%
|
||||
%%% ejabberd, Copyright (C) 2017 ProcessOne
|
||||
%%%
|
||||
%%% This program is free software; you can redistribute it and/or
|
||||
%%% modify it under the terms of the GNU General Public License as
|
||||
%%% published by the Free Software Foundation; either version 2 of the
|
||||
%%% License, or (at your option) any later version.
|
||||
%%%
|
||||
%%% This program is distributed in the hope that it will be useful,
|
||||
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
%%% General Public License for more details.
|
||||
%%%
|
||||
%%% You should have received a copy of the GNU General Public License along
|
||||
%%% with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
%%%
|
||||
%%%----------------------------------------------------------------------
|
||||
|
||||
-module(mod_push_keepalive).
|
||||
-author('holger@zedat.fu-berlin.de').
|
||||
|
||||
-behavior(gen_mod).
|
||||
|
||||
%% gen_mod callbacks.
|
||||
-export([start/2, stop/1, reload/3, mod_opt_type/1, depends/2]).
|
||||
|
||||
%% ejabberd_hooks callbacks.
|
||||
-export([c2s_session_pending/1, c2s_session_resumed/1, c2s_copy_session/2,
|
||||
c2s_handle_cast/2, c2s_handle_info/2, c2s_stanza/3]).
|
||||
|
||||
-include("logger.hrl").
|
||||
-include("xmpp.hrl").
|
||||
|
||||
-define(PUSH_BEFORE_TIMEOUT_SECS, 120).
|
||||
|
||||
-type c2s_state() :: ejabberd_c2s:state().
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_mod callbacks.
|
||||
%%--------------------------------------------------------------------
|
||||
-spec start(binary(), gen_mod:opts()) -> ok.
|
||||
start(Host, Opts) ->
|
||||
case gen_mod:get_opt(wake_on_start, Opts, false) of
|
||||
true ->
|
||||
wake_all(Host);
|
||||
false ->
|
||||
ok
|
||||
end,
|
||||
register_hooks(Host).
|
||||
|
||||
-spec stop(binary()) -> ok.
|
||||
stop(Host) ->
|
||||
unregister_hooks(Host).
|
||||
|
||||
-spec reload(binary(), gen_mod:opts(), gen_mod:opts()) -> ok.
|
||||
reload(Host, NewOpts, OldOpts) ->
|
||||
case gen_mod:is_equal_opt(wake_on_start, NewOpts, OldOpts, false) of
|
||||
{false, true, _} ->
|
||||
wake_all(Host);
|
||||
_ ->
|
||||
ok
|
||||
end,
|
||||
ok.
|
||||
|
||||
-spec depends(binary(), gen_mod:opts()) -> [{module(), hard | soft}].
|
||||
depends(_Host, _Opts) ->
|
||||
[{mod_push, hard},
|
||||
{mod_client_state, soft},
|
||||
{mod_stream_mgmt, soft}].
|
||||
|
||||
-spec mod_opt_type(atom()) -> fun((term()) -> term()) | [atom()].
|
||||
mod_opt_type(resume_timeout) ->
|
||||
fun(I) when is_integer(I), I >= 0 -> I;
|
||||
(undefined) -> undefined
|
||||
end;
|
||||
mod_opt_type(wake_on_start) ->
|
||||
fun (B) when is_boolean(B) -> B end;
|
||||
mod_opt_type(wake_on_timeout) ->
|
||||
fun (B) when is_boolean(B) -> B end;
|
||||
mod_opt_type(O) when O == cache_life_time; O == cache_size ->
|
||||
fun(I) when is_integer(I), I > 0 -> I;
|
||||
(infinity) -> infinity
|
||||
end;
|
||||
mod_opt_type(O) when O == use_cache; O == cache_missed ->
|
||||
fun (B) when is_boolean(B) -> B end;
|
||||
mod_opt_type(_) ->
|
||||
[resume_timeout, wake_on_start, wake_on_timeout, db_type, cache_life_time,
|
||||
cache_size, use_cache, cache_missed, iqdisc].
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Register/unregister hooks.
|
||||
%%--------------------------------------------------------------------
|
||||
-spec register_hooks(binary()) -> ok.
|
||||
register_hooks(Host) ->
|
||||
ejabberd_hooks:add(c2s_session_pending, Host, ?MODULE,
|
||||
c2s_session_pending, 50),
|
||||
ejabberd_hooks:add(c2s_session_resumed, Host, ?MODULE,
|
||||
c2s_session_resumed, 50),
|
||||
ejabberd_hooks:add(c2s_copy_session, Host, ?MODULE,
|
||||
c2s_copy_session, 50),
|
||||
ejabberd_hooks:add(c2s_handle_cast, Host, ?MODULE,
|
||||
c2s_handle_cast, 40),
|
||||
ejabberd_hooks:add(c2s_handle_info, Host, ?MODULE,
|
||||
c2s_handle_info, 50),
|
||||
ejabberd_hooks:add(c2s_handle_send, Host, ?MODULE,
|
||||
c2s_stanza, 50).
|
||||
|
||||
-spec unregister_hooks(binary()) -> ok.
|
||||
unregister_hooks(Host) ->
|
||||
ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE,
|
||||
disco_sm_features, 50),
|
||||
ejabberd_hooks:delete(c2s_session_pending, Host, ?MODULE,
|
||||
c2s_session_pending, 50),
|
||||
ejabberd_hooks:delete(c2s_session_resumed, Host, ?MODULE,
|
||||
c2s_session_resumed, 50),
|
||||
ejabberd_hooks:delete(c2s_copy_session, Host, ?MODULE,
|
||||
c2s_copy_session, 50),
|
||||
ejabberd_hooks:delete(c2s_handle_cast, Host, ?MODULE,
|
||||
c2s_handle_cast, 40),
|
||||
ejabberd_hooks:delete(c2s_handle_info, Host, ?MODULE,
|
||||
c2s_handle_info, 50),
|
||||
ejabberd_hooks:delete(c2s_handle_send, Host, ?MODULE,
|
||||
c2s_stanza, 50).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Hook callbacks.
|
||||
%%--------------------------------------------------------------------
|
||||
-spec c2s_stanza(c2s_state(), xmpp_element() | xmlel(), term()) -> c2s_state().
|
||||
c2s_stanza(#{push_enabled := true, mgmt_state := pending} = State,
|
||||
_Pkt, _SendResult) ->
|
||||
maybe_restore_resume_timeout(State);
|
||||
c2s_stanza(State, _Pkt, _SendResult) ->
|
||||
State.
|
||||
|
||||
-spec c2s_session_pending(c2s_state()) -> c2s_state().
|
||||
c2s_session_pending(#{push_enabled := true, mgmt_queue := Queue} = State) ->
|
||||
case p1_queue:len(Queue) of
|
||||
0 ->
|
||||
State1 = maybe_adjust_resume_timeout(State),
|
||||
maybe_start_wakeup_timer(State1);
|
||||
_ ->
|
||||
State
|
||||
end;
|
||||
c2s_session_pending(State) ->
|
||||
State.
|
||||
|
||||
-spec c2s_session_resumed(c2s_state()) -> c2s_state().
|
||||
c2s_session_resumed(#{push_enabled := true} = State) ->
|
||||
maybe_restore_resume_timeout(State);
|
||||
c2s_session_resumed(State) ->
|
||||
State.
|
||||
|
||||
-spec c2s_copy_session(c2s_state(), c2s_state()) -> c2s_state().
|
||||
c2s_copy_session(State, #{push_enabled := true,
|
||||
push_resume_timeout := ResumeTimeout,
|
||||
push_wake_on_timeout := WakeOnTimeout}) ->
|
||||
State#{push_resume_timeout => ResumeTimeout,
|
||||
push_wake_on_timeout => WakeOnTimeout};
|
||||
c2s_copy_session(State, _) ->
|
||||
State.
|
||||
|
||||
-spec c2s_handle_cast(c2s_state(), any()) -> c2s_state().
|
||||
c2s_handle_cast(#{lserver := LServer} = State, push_enable) ->
|
||||
ResumeTimeout = gen_mod:get_module_opt(LServer, ?MODULE,
|
||||
resume_timeout, 86400),
|
||||
WakeOnTimeout = gen_mod:get_module_opt(LServer, ?MODULE,
|
||||
wake_on_timeout, true),
|
||||
State#{push_resume_timeout => ResumeTimeout,
|
||||
push_wake_on_timeout => WakeOnTimeout};
|
||||
c2s_handle_cast(State, push_disable) ->
|
||||
State1 = maps:remove(push_resume_timeout, State),
|
||||
maps:remove(push_wake_on_timeout, State1);
|
||||
c2s_handle_cast(State, _Msg) ->
|
||||
State.
|
||||
|
||||
-spec c2s_handle_info(c2s_state(), any()) -> c2s_state() | {stop, c2s_state()}.
|
||||
c2s_handle_info(#{push_enabled := true, mgmt_state := pending,
|
||||
jid := JID} = State, {timeout, _, push_keepalive}) ->
|
||||
?INFO_MSG("Waking ~s before session times out", [jid:encode(JID)]),
|
||||
mod_push:notify(State),
|
||||
{stop, State};
|
||||
c2s_handle_info(State, _) ->
|
||||
State.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions.
|
||||
%%--------------------------------------------------------------------
|
||||
-spec maybe_adjust_resume_timeout(c2s_state()) -> c2s_state().
|
||||
maybe_adjust_resume_timeout(#{push_resume_timeout := undefined} = State) ->
|
||||
State;
|
||||
maybe_adjust_resume_timeout(#{push_resume_timeout := Timeout} = State) ->
|
||||
OrigTimeout = mod_stream_mgmt:get_resume_timeout(State),
|
||||
?DEBUG("Adjusting resume timeout to ~B seconds", [Timeout]),
|
||||
State1 = mod_stream_mgmt:set_resume_timeout(State, Timeout),
|
||||
State1#{push_resume_timeout_orig => OrigTimeout}.
|
||||
|
||||
-spec maybe_restore_resume_timeout(c2s_state()) -> c2s_state().
|
||||
maybe_restore_resume_timeout(#{push_resume_timeout_orig := Timeout} = State) ->
|
||||
?DEBUG("Restoring resume timeout to ~B seconds", [Timeout]),
|
||||
State1 = mod_stream_mgmt:set_resume_timeout(State, Timeout),
|
||||
maps:remove(push_resume_timeout_orig, State1);
|
||||
maybe_restore_resume_timeout(State) ->
|
||||
State.
|
||||
|
||||
-spec maybe_start_wakeup_timer(c2s_state()) -> c2s_state().
|
||||
maybe_start_wakeup_timer(#{push_wake_on_timeout := true,
|
||||
push_resume_timeout := ResumeTimeout} = State)
|
||||
when is_integer(ResumeTimeout), ResumeTimeout > ?PUSH_BEFORE_TIMEOUT_SECS ->
|
||||
WakeTimeout = ResumeTimeout - ?PUSH_BEFORE_TIMEOUT_SECS,
|
||||
?DEBUG("Scheduling wake-up timer to fire in ~B seconds", [WakeTimeout]),
|
||||
erlang:start_timer(timer:seconds(WakeTimeout), self(), push_keepalive),
|
||||
State;
|
||||
maybe_start_wakeup_timer(State) ->
|
||||
State.
|
||||
|
||||
-spec wake_all(binary()) -> ok | error.
|
||||
wake_all(LServer) ->
|
||||
?INFO_MSG("Waking all push clients on ~s", [LServer]),
|
||||
Mod = gen_mod:db_mod(LServer, mod_push),
|
||||
case Mod:lookup_sessions(LServer) of
|
||||
{ok, Sessions} ->
|
||||
IgnoreResponse = fun(_) -> ok end,
|
||||
lists:foreach(fun({_, PushLJID, Node, XData}) ->
|
||||
mod_push:notify(LServer, PushLJID, Node,
|
||||
XData, IgnoreResponse)
|
||||
end, Sessions);
|
||||
error ->
|
||||
error
|
||||
end.
|
204
src/mod_push_mnesia.erl
Normal file
204
src/mod_push_mnesia.erl
Normal file
@ -0,0 +1,204 @@
|
||||
%%%----------------------------------------------------------------------
|
||||
%%% File : mod_push_mnesia.erl
|
||||
%%% Author : Holger Weiss <holger@zedat.fu-berlin.de>
|
||||
%%% Purpose : Mnesia backend for Push Notifications (XEP-0357)
|
||||
%%% Created : 15 Jul 2017 by Holger Weiss <holger@zedat.fu-berlin.de>
|
||||
%%%
|
||||
%%%
|
||||
%%% ejabberd, Copyright (C) 2017 ProcessOne
|
||||
%%%
|
||||
%%% This program is free software; you can redistribute it and/or
|
||||
%%% modify it under the terms of the GNU General Public License as
|
||||
%%% published by the Free Software Foundation; either version 2 of the
|
||||
%%% License, or (at your option) any later version.
|
||||
%%%
|
||||
%%% This program is distributed in the hope that it will be useful,
|
||||
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
%%% General Public License for more details.
|
||||
%%%
|
||||
%%% You should have received a copy of the GNU General Public License along
|
||||
%%% with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
%%%
|
||||
%%%----------------------------------------------------------------------
|
||||
|
||||
-module(mod_push_mnesia).
|
||||
-author('holger@zedat.fu-berlin.de').
|
||||
|
||||
-behavior(mod_push).
|
||||
|
||||
%% API
|
||||
-export([init/2, store_session/6, lookup_session/4, lookup_session/3,
|
||||
lookup_sessions/3, lookup_sessions/2, lookup_sessions/1,
|
||||
delete_session/3, delete_old_sessions/2]).
|
||||
|
||||
-include_lib("stdlib/include/ms_transform.hrl").
|
||||
-include("logger.hrl").
|
||||
-include("xmpp.hrl").
|
||||
|
||||
-record(push_session,
|
||||
{us = {<<"">>, <<"">>} :: {binary(), binary()},
|
||||
timestamp = p1_time_compat:timestamp() :: erlang:timestamp(),
|
||||
service = {<<"">>, <<"">>, <<"">>} :: ljid(),
|
||||
node = <<"">> :: binary(),
|
||||
xdata = #xdata{} :: xdata()}).
|
||||
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% API
|
||||
%%%-------------------------------------------------------------------
|
||||
init(_Host, _Opts) ->
|
||||
ejabberd_mnesia:create(?MODULE, push_session,
|
||||
[{disc_only_copies, [node()]},
|
||||
{type, bag},
|
||||
{attributes, record_info(fields, push_session)}]).
|
||||
|
||||
store_session(LUser, LServer, TS, PushJID, Node, XData) ->
|
||||
US = {LUser, LServer},
|
||||
PushLJID = jid:tolower(PushJID),
|
||||
MaxSessions = ejabberd_sm:get_max_user_sessions(LUser, LServer),
|
||||
F = fun() ->
|
||||
if is_integer(MaxSessions) ->
|
||||
enforce_max_sessions(US, MaxSessions - 1);
|
||||
MaxSessions == infinity ->
|
||||
ok
|
||||
end,
|
||||
mnesia:write(#push_session{us = US,
|
||||
timestamp = TS,
|
||||
service = PushLJID,
|
||||
node = Node,
|
||||
xdata = XData})
|
||||
end,
|
||||
case mnesia:transaction(F) of
|
||||
{atomic, ok} ->
|
||||
{ok, {TS, PushLJID, Node, XData}};
|
||||
{aborted, E} ->
|
||||
?ERROR_MSG("Cannot store push session for ~s@~s: ~p",
|
||||
[LUser, LServer, E]),
|
||||
error
|
||||
end.
|
||||
|
||||
lookup_session(LUser, LServer, PushJID, Node) ->
|
||||
PushLJID = jid:tolower(PushJID),
|
||||
MatchSpec = ets:fun2ms(
|
||||
fun(#push_session{us = {U, S}, service = P, node = N} = Rec)
|
||||
when U == LUser,
|
||||
S == LServer,
|
||||
P == PushLJID,
|
||||
N == Node ->
|
||||
Rec
|
||||
end),
|
||||
case mnesia:dirty_select(push_session, MatchSpec) of
|
||||
[#push_session{timestamp = TS, xdata = XData}] ->
|
||||
{ok, {TS, PushLJID, Node, XData}};
|
||||
_ ->
|
||||
?DEBUG("No push session found for ~s@~s (~p, ~s)",
|
||||
[LUser, LServer, PushJID, Node]),
|
||||
error
|
||||
end.
|
||||
|
||||
lookup_session(LUser, LServer, TS) ->
|
||||
MatchSpec = ets:fun2ms(
|
||||
fun(#push_session{us = {U, S}, timestamp = T} = Rec)
|
||||
when U == LUser,
|
||||
S == LServer,
|
||||
T == TS ->
|
||||
Rec
|
||||
end),
|
||||
case mnesia:dirty_select(push_session, MatchSpec) of
|
||||
[#push_session{service = PushLJID, node = Node, xdata = XData}] ->
|
||||
{ok, {TS, PushLJID, Node, XData}};
|
||||
_ ->
|
||||
?DEBUG("No push session found for ~s@~s (~p)",
|
||||
[LUser, LServer, TS]),
|
||||
error
|
||||
end.
|
||||
|
||||
lookup_sessions(LUser, LServer, PushJID) ->
|
||||
PushLJID = jid:tolower(PushJID),
|
||||
MatchSpec = ets:fun2ms(
|
||||
fun(#push_session{us = {U, S}, service = P, node = N} = Rec)
|
||||
when U == LUser,
|
||||
S == LServer,
|
||||
P == PushLJID ->
|
||||
Rec
|
||||
end),
|
||||
{ok, mnesia:dirty_select(push_session, MatchSpec)}.
|
||||
|
||||
lookup_sessions(LUser, LServer) ->
|
||||
Records = mnesia:dirty_read(push_session, {LUser, LServer}),
|
||||
Clients = [{TS, PushLJID, Node, XData}
|
||||
|| #push_session{timestamp = TS,
|
||||
service = PushLJID,
|
||||
node = Node,
|
||||
xdata = XData} <- Records],
|
||||
{ok, Clients}.
|
||||
|
||||
lookup_sessions(LServer) ->
|
||||
MatchSpec = ets:fun2ms(
|
||||
fun(#push_session{us = {_U, S},
|
||||
timestamp = TS,
|
||||
service = PushLJID,
|
||||
node = Node,
|
||||
xdata = XData})
|
||||
when S == LServer ->
|
||||
{TS, PushLJID, Node, XData}
|
||||
end),
|
||||
{ok, mnesia:dirty_select(push_session, MatchSpec)}.
|
||||
|
||||
delete_session(LUser, LServer, TS) ->
|
||||
MatchSpec = ets:fun2ms(
|
||||
fun(#push_session{us = {U, S}, timestamp = T} = Rec)
|
||||
when U == LUser,
|
||||
S == LServer,
|
||||
T == TS ->
|
||||
Rec
|
||||
end),
|
||||
F = fun() ->
|
||||
Recs = mnesia:select(push_session, MatchSpec),
|
||||
lists:foreach(fun mnesia:delete_object/1, Recs)
|
||||
end,
|
||||
case mnesia:transaction(F) of
|
||||
{atomic, ok} ->
|
||||
ok;
|
||||
{aborted, E} ->
|
||||
?ERROR_MSG("Cannot delete push seesion of ~s@~s: ~p",
|
||||
[LUser, LServer, E]),
|
||||
error
|
||||
end.
|
||||
|
||||
delete_old_sessions(_LServer, Time) ->
|
||||
DelIfOld = fun(#push_session{timestamp = T} = Rec, ok) when T < Time ->
|
||||
mnesia:delete_object(Rec);
|
||||
(_Rec, ok) ->
|
||||
ok
|
||||
end,
|
||||
F = fun() ->
|
||||
mnesia:foldl(DelIfOld, ok, push_session)
|
||||
end,
|
||||
case mnesia:transaction(F) of
|
||||
{atomic, ok} ->
|
||||
ok;
|
||||
{aborted, E} ->
|
||||
?ERROR_MSG("Cannot delete old push sessions: ~p", [E]),
|
||||
error
|
||||
end.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Internal functions.
|
||||
%%--------------------------------------------------------------------
|
||||
-spec enforce_max_sessions({binary(), binary()}, non_neg_integer()) -> ok.
|
||||
enforce_max_sessions({U, S} = US, Max) ->
|
||||
Recs = mnesia:wread({push_session, US}),
|
||||
NumRecs = length(Recs),
|
||||
if NumRecs > Max ->
|
||||
NumOldRecs = NumRecs - Max,
|
||||
Recs1 = lists:keysort(#push_session.timestamp, Recs),
|
||||
Recs2 = lists:reverse(Recs1),
|
||||
OldRecs = lists:sublist(Recs2, Max + 1, NumOldRecs),
|
||||
?INFO_MSG("Disabling ~B old push session(s) of ~s@~s",
|
||||
[NumOldRecs, U, S]),
|
||||
lists:foreach(fun(Rec) -> mnesia:delete_object(Rec) end, OldRecs);
|
||||
true ->
|
||||
ok
|
||||
end.
|
@ -33,6 +33,8 @@
|
||||
c2s_unbinded_packet/2, c2s_closed/2, c2s_terminated/2,
|
||||
c2s_handle_send/3, c2s_handle_info/2, c2s_handle_call/3,
|
||||
c2s_handle_recv/3]).
|
||||
%% adjust pending session timeout
|
||||
-export([get_resume_timeout/1, set_resume_timeout/2]).
|
||||
|
||||
-include("xmpp.hrl").
|
||||
-include("logger.hrl").
|
||||
@ -235,8 +237,9 @@ c2s_handle_info(#{mgmt_ack_timer := TRef, jid := JID, mod := Mod} = State,
|
||||
[jid:encode(JID)]),
|
||||
State1 = Mod:close(State),
|
||||
{stop, transition_to_pending(State1)};
|
||||
c2s_handle_info(#{mgmt_state := pending, jid := JID, mod := Mod} = State,
|
||||
{timeout, _, pending_timeout}) ->
|
||||
c2s_handle_info(#{mgmt_state := pending,
|
||||
mgmt_pending_timer := TRef, jid := JID, mod := Mod} = State,
|
||||
{timeout, TRef, pending_timeout}) ->
|
||||
?DEBUG("Timed out waiting for resumption of stream for ~s",
|
||||
[jid:encode(JID)]),
|
||||
Mod:stop(State#{mgmt_state => timeout});
|
||||
@ -282,6 +285,20 @@ c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In, sid := SID,
|
||||
c2s_terminated(State, _Reason) ->
|
||||
State.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Adjust pending session timeout
|
||||
%%%===================================================================
|
||||
-spec get_resume_timeout(state()) -> non_neg_integer().
|
||||
get_resume_timeout(#{mgmt_timeout := Timeout}) ->
|
||||
Timeout.
|
||||
|
||||
-spec set_resume_timeout(state(), non_neg_integer()) -> state().
|
||||
set_resume_timeout(#{mgmt_timeout := Timeout} = State, Timeout) ->
|
||||
State;
|
||||
set_resume_timeout(State, Timeout) ->
|
||||
State1 = restart_pending_timer(State, Timeout),
|
||||
State1#{mgmt_timeout => Timeout}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
@ -408,8 +425,8 @@ transition_to_pending(#{mgmt_state := active, jid := JID,
|
||||
lserver := LServer, mgmt_timeout := Timeout} = State) ->
|
||||
State1 = cancel_ack_timer(State),
|
||||
?INFO_MSG("Waiting for resumption of stream for ~s", [jid:encode(JID)]),
|
||||
erlang:start_timer(timer:seconds(Timeout), self(), pending_timeout),
|
||||
State2 = State1#{mgmt_state => pending},
|
||||
TRef = erlang:start_timer(timer:seconds(Timeout), self(), pending_timeout),
|
||||
State2 = State1#{mgmt_state => pending, mgmt_pending_timer => TRef},
|
||||
ejabberd_hooks:run_fold(c2s_session_pending, LServer, State2, []);
|
||||
transition_to_pending(State) ->
|
||||
State.
|
||||
@ -648,8 +665,24 @@ add_resent_delay_info(_State, El, _Time) ->
|
||||
send(#{mod := Mod} = State, Pkt) ->
|
||||
Mod:send(State, Pkt).
|
||||
|
||||
-spec restart_pending_timer(state(), non_neg_integer()) -> state().
|
||||
restart_pending_timer(#{mgmt_pending_timer := TRef} = State, NewTimeout) ->
|
||||
cancel_timer(TRef),
|
||||
NewTRef = erlang:start_timer(timer:seconds(NewTimeout), self(),
|
||||
pending_timeout),
|
||||
State#{mgmt_pending_timer => NewTRef};
|
||||
restart_pending_timer(State, _NewTimeout) ->
|
||||
State.
|
||||
|
||||
-spec cancel_ack_timer(state()) -> state().
|
||||
cancel_ack_timer(#{mgmt_ack_timer := TRef} = State) ->
|
||||
cancel_timer(TRef),
|
||||
maps:remove(mgmt_ack_timer, State);
|
||||
cancel_ack_timer(State) ->
|
||||
State.
|
||||
|
||||
-spec cancel_timer(reference()) -> ok.
|
||||
cancel_timer(TRef) ->
|
||||
case erlang:cancel_timer(TRef) of
|
||||
false ->
|
||||
receive {timeout, TRef, _} -> ok
|
||||
@ -657,10 +690,7 @@ cancel_ack_timer(#{mgmt_ack_timer := TRef} = State) ->
|
||||
end;
|
||||
_ ->
|
||||
ok
|
||||
end,
|
||||
maps:remove(mgmt_ack_timer, State);
|
||||
cancel_ack_timer(State) ->
|
||||
State.
|
||||
end.
|
||||
|
||||
-spec bounce_message_queue() -> ok.
|
||||
bounce_message_queue() ->
|
||||
|
@ -431,6 +431,7 @@ db_tests(DB) when DB == mnesia; DB == redis ->
|
||||
mam_tests:single_cases(),
|
||||
carbons_tests:single_cases(),
|
||||
csi_tests:single_cases(),
|
||||
push_tests:single_cases(),
|
||||
test_unregister]},
|
||||
muc_tests:master_slave_cases(),
|
||||
privacy_tests:master_slave_cases(),
|
||||
@ -441,7 +442,8 @@ db_tests(DB) when DB == mnesia; DB == redis ->
|
||||
vcard_tests:master_slave_cases(),
|
||||
announce_tests:master_slave_cases(),
|
||||
carbons_tests:master_slave_cases(),
|
||||
csi_tests:master_slave_cases()];
|
||||
csi_tests:master_slave_cases(),
|
||||
push_tests:master_slave_cases()];
|
||||
db_tests(_) ->
|
||||
[{single_user, [sequence],
|
||||
[test_register,
|
||||
|
@ -231,7 +231,11 @@ Welcome to this XMPP server."
|
||||
mod_disco: []
|
||||
mod_ping: []
|
||||
mod_proxy65: []
|
||||
mod_push: []
|
||||
mod_push_keepalive: []
|
||||
mod_s2s_dialback: []
|
||||
mod_stream_mgmt:
|
||||
resume_timeout: 3
|
||||
mod_legacy_auth: []
|
||||
mod_register:
|
||||
welcome_message:
|
||||
@ -290,7 +294,11 @@ Welcome to this XMPP server."
|
||||
mod_disco: []
|
||||
mod_ping: []
|
||||
mod_proxy65: []
|
||||
mod_push: []
|
||||
mod_push_keepalive: []
|
||||
mod_s2s_dialback: []
|
||||
mod_stream_mgmt:
|
||||
resume_timeout: 3
|
||||
mod_legacy_auth: []
|
||||
mod_register:
|
||||
welcome_message:
|
||||
|
234
test/push_tests.erl
Normal file
234
test/push_tests.erl
Normal file
@ -0,0 +1,234 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% Author : Holger Weiss <holger@zedat.fu-berlin.de>
|
||||
%%% Created : 15 Jul 2017 by Holger Weiss <holger@zedat.fu-berlin.de>
|
||||
%%%
|
||||
%%%
|
||||
%%% ejabberd, Copyright (C) 2017 ProcessOne
|
||||
%%%
|
||||
%%% This program is free software; you can redistribute it and/or
|
||||
%%% modify it under the terms of the GNU General Public License as
|
||||
%%% published by the Free Software Foundation; either version 2 of the
|
||||
%%% License, or (at your option) any later version.
|
||||
%%%
|
||||
%%% This program is distributed in the hope that it will be useful,
|
||||
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
%%% General Public License for more details.
|
||||
%%%
|
||||
%%% You should have received a copy of the GNU General Public License along
|
||||
%%% with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
%%%
|
||||
%%%----------------------------------------------------------------------
|
||||
|
||||
-module(push_tests).
|
||||
|
||||
%% API
|
||||
-compile(export_all).
|
||||
-import(suite, [close_socket/1, connect/1, disconnect/1, get_event/1,
|
||||
get_features/2, make_iq_result/1, my_jid/1, put_event/2, recv/1,
|
||||
recv_iq/1, recv_message/1, self_presence/2, send/2, send_recv/2,
|
||||
server_jid/1]).
|
||||
|
||||
-include("suite.hrl").
|
||||
|
||||
-define(PUSH_NODE, <<"d3v1c3">>).
|
||||
-define(PUSH_XDATA_FIELDS,
|
||||
[#xdata_field{var = <<"FORM_TYPE">>,
|
||||
values = [?NS_PUBSUB_PUBLISH_OPTIONS]},
|
||||
#xdata_field{var = <<"secret">>,
|
||||
values = [<<"c0nf1d3nt14l">>]}]).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
%%%===================================================================
|
||||
%%% Single user tests
|
||||
%%%===================================================================
|
||||
single_cases() ->
|
||||
{push_single, [sequence],
|
||||
[single_test(feature_enabled),
|
||||
single_test(unsupported_iq)]}.
|
||||
|
||||
feature_enabled(Config) ->
|
||||
BareMyJID = jid:remove_resource(my_jid(Config)),
|
||||
Features = get_features(Config, BareMyJID),
|
||||
true = lists:member(?NS_PUSH_0, Features),
|
||||
disconnect(Config).
|
||||
|
||||
unsupported_iq(Config) ->
|
||||
PushJID = my_jid(Config),
|
||||
lists:foreach(
|
||||
fun(SubEl) ->
|
||||
#iq{type = error} =
|
||||
send_recv(Config, #iq{type = get, sub_els = [SubEl]})
|
||||
end, [#push_enable{jid = PushJID}, #push_disable{jid = PushJID}]),
|
||||
disconnect(Config).
|
||||
|
||||
%%%===================================================================
|
||||
%%% Master-slave tests
|
||||
%%%===================================================================
|
||||
master_slave_cases() ->
|
||||
{push_master_slave, [sequence],
|
||||
[master_slave_test(sm),
|
||||
master_slave_test(offline),
|
||||
master_slave_test(mam)]}.
|
||||
|
||||
sm_master(Config) ->
|
||||
ct:comment("Waiting for the slave to close the socket"),
|
||||
peer_down = get_event(Config),
|
||||
ct:comment("Waiting a bit in order to test the keepalive feature"),
|
||||
ct:sleep(5000), % Without mod_push_keepalive, the session would time out.
|
||||
ct:comment("Sending message to the slave"),
|
||||
send_test_message(Config),
|
||||
ct:comment("Handling push notification"),
|
||||
handle_notification(Config),
|
||||
ct:comment("Receiving bounced message from the slave"),
|
||||
#message{type = error} = recv_message(Config),
|
||||
ct:comment("Closing the connection"),
|
||||
disconnect(Config).
|
||||
|
||||
sm_slave(Config) ->
|
||||
ct:comment("Enabling push notifications"),
|
||||
ok = enable_push(Config),
|
||||
ct:comment("Enabling stream management"),
|
||||
ok = enable_sm(Config),
|
||||
ct:comment("Closing the socket"),
|
||||
close_socket(Config).
|
||||
|
||||
offline_master(Config) ->
|
||||
ct:comment("Waiting for the slave to be ready"),
|
||||
ready = get_event(Config),
|
||||
ct:comment("Sending message to the slave"),
|
||||
send_test_message(Config), % No push notification, slave is online.
|
||||
ct:comment("Waiting for the slave to disconnect"),
|
||||
peer_down = get_event(Config),
|
||||
ct:comment("Sending message to offline storage"),
|
||||
send_test_message(Config),
|
||||
ct:comment("Handling push notification for offline message"),
|
||||
handle_notification(Config),
|
||||
ct:comment("Closing the connection"),
|
||||
disconnect(Config).
|
||||
|
||||
offline_slave(Config) ->
|
||||
ct:comment("Re-enabling push notifications"),
|
||||
ok = enable_push(Config),
|
||||
ct:comment("Letting the master know that we're ready"),
|
||||
put_event(Config, ready),
|
||||
ct:comment("Receiving message from the master"),
|
||||
recv_test_message(Config),
|
||||
ct:comment("Closing the connection"),
|
||||
disconnect(Config).
|
||||
|
||||
mam_master(Config) ->
|
||||
ct:comment("Waiting for the slave to be ready"),
|
||||
ready = get_event(Config),
|
||||
ct:comment("Sending message to the slave"),
|
||||
send_test_message(Config),
|
||||
ct:comment("Handling push notification for MAM message"),
|
||||
handle_notification(Config),
|
||||
ct:comment("Closing the connection"),
|
||||
disconnect(Config).
|
||||
|
||||
mam_slave(Config) ->
|
||||
self_presence(Config, available),
|
||||
ct:comment("Receiving message from offline storage"),
|
||||
recv_test_message(Config),
|
||||
ct:comment("Re-enabling push notifications"),
|
||||
ok = enable_push(Config),
|
||||
ct:comment("Enabling MAM"),
|
||||
ok = enable_mam(Config),
|
||||
ct:comment("Letting the master know that we're ready"),
|
||||
put_event(Config, ready),
|
||||
ct:comment("Receiving message from the master"),
|
||||
recv_test_message(Config),
|
||||
ct:comment("Waiting for the master to disconnect"),
|
||||
peer_down = get_event(Config),
|
||||
ct:comment("Disabling push notifications"),
|
||||
ok = disable_push(Config),
|
||||
ct:comment("Closing the connection and cleaning up"),
|
||||
clean(disconnect(Config)).
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
single_test(T) ->
|
||||
list_to_atom("push_" ++ atom_to_list(T)).
|
||||
|
||||
master_slave_test(T) ->
|
||||
{list_to_atom("push_" ++ atom_to_list(T)), [parallel],
|
||||
[list_to_atom("push_" ++ atom_to_list(T) ++ "_master"),
|
||||
list_to_atom("push_" ++ atom_to_list(T) ++ "_slave")]}.
|
||||
|
||||
enable_sm(Config) ->
|
||||
send(Config, #sm_enable{xmlns = ?NS_STREAM_MGMT_3, resume = true}),
|
||||
case recv(Config) of
|
||||
#sm_enabled{resume = true} ->
|
||||
ok;
|
||||
#sm_failed{reason = Reason} ->
|
||||
Reason
|
||||
end.
|
||||
|
||||
enable_mam(Config) ->
|
||||
case send_recv(
|
||||
Config, #iq{type = set, sub_els = [#mam_prefs{xmlns = ?NS_MAM_1,
|
||||
default = always}]}) of
|
||||
#iq{type = result} ->
|
||||
ok;
|
||||
#iq{type = error} = Err ->
|
||||
xmpp:get_error(Err)
|
||||
end.
|
||||
|
||||
enable_push(Config) ->
|
||||
%% Usually, the push JID would be a server JID (such as push.example.com).
|
||||
%% We specify the peer's full user JID instead, so the push notifications
|
||||
%% will be sent to the peer.
|
||||
PushJID = ?config(peer, Config),
|
||||
XData = #xdata{type = submit, fields = ?PUSH_XDATA_FIELDS},
|
||||
case send_recv(
|
||||
Config, #iq{type = set,
|
||||
sub_els = [#push_enable{jid = PushJID,
|
||||
node = ?PUSH_NODE,
|
||||
xdata = XData}]}) of
|
||||
#iq{type = result, sub_els = []} ->
|
||||
ok;
|
||||
#iq{type = error} = Err ->
|
||||
xmpp:get_error(Err)
|
||||
end.
|
||||
|
||||
disable_push(Config) ->
|
||||
PushJID = ?config(peer, Config),
|
||||
case send_recv(
|
||||
Config, #iq{type = set,
|
||||
sub_els = [#push_disable{jid = PushJID,
|
||||
node = ?PUSH_NODE}]}) of
|
||||
#iq{type = result, sub_els = []} ->
|
||||
ok;
|
||||
#iq{type = error} = Err ->
|
||||
xmpp:get_error(Err)
|
||||
end.
|
||||
|
||||
send_test_message(Config) ->
|
||||
Peer = ?config(peer, Config),
|
||||
Msg = #message{to = Peer, body = [#text{data = <<"test">>}]},
|
||||
send(Config, Msg).
|
||||
|
||||
recv_test_message(Config) ->
|
||||
Peer = ?config(peer, Config),
|
||||
#message{from = Peer,
|
||||
body = [#text{data = <<"test">>}]} = recv_message(Config).
|
||||
|
||||
handle_notification(Config) ->
|
||||
From = server_jid(Config),
|
||||
Item = #ps_item{xml_els = [xmpp:encode(#push_notification{})]},
|
||||
Publish = #ps_publish{node = ?PUSH_NODE, items = [Item]},
|
||||
XData = #xdata{type = submit, fields = ?PUSH_XDATA_FIELDS},
|
||||
PubSub = #pubsub{publish = Publish, publish_options = XData},
|
||||
IQ = #iq{type = set, from = From, sub_els = [PubSub]} = recv_iq(Config),
|
||||
send(Config, make_iq_result(IQ)).
|
||||
|
||||
clean(Config) ->
|
||||
{U, S, _} = jid:tolower(my_jid(Config)),
|
||||
mod_push:remove_user(U, S),
|
||||
mod_mam:remove_user(U, S),
|
||||
Config.
|
Loading…
Reference in New Issue
Block a user