From d6f1d3df5b5a75f618bcc6eeb6425bc47dfd84d2 Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Thu, 20 Jul 2017 20:22:50 +0200 Subject: [PATCH] Support XEP-0357: Push Notifications Closes #1379. --- ejabberd.yml.example | 1 + src/ejabberd_c2s.erl | 6 +- src/ejabberd_sm.erl | 20 +- src/misc.erl | 16 +- src/mod_mam.erl | 19 +- src/mod_push.erl | 590 ++++++++++++++++++++++++++ src/mod_push_mnesia.erl | 204 +++++++++ test/ejabberd_SUITE.erl | 4 +- test/ejabberd_SUITE_data/ejabberd.yml | 6 + test/push_tests.erl | 232 ++++++++++ 10 files changed, 1079 insertions(+), 19 deletions(-) create mode 100644 src/mod_push.erl create mode 100644 src/mod_push_mnesia.erl create mode 100644 test/push_tests.erl diff --git a/ejabberd.yml.example b/ejabberd.yml.example index 3f0e8d1c6..693a87f57 100644 --- a/ejabberd.yml.example +++ b/ejabberd.yml.example @@ -725,6 +725,7 @@ modules: - "flat" - "hometree" - "pep" # pep requires mod_caps + mod_push: {} ## mod_register: ## ## Protect In-Band account registrations with CAPTCHA. diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 159cb4054..4b265d29d 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -47,7 +47,7 @@ process_terminated/2, process_info/2]). %% API -export([get_presence/1, set_presence/2, resend_presence/1, resend_presence/2, - open_session/1, call/3, send/2, close/1, close/2, stop/1, + open_session/1, call/3, cast/2, send/2, close/1, close/2, stop/1, reply/2, copy_state/2, set_timeout/2, route/2, host_up/1, host_down/1]). @@ -90,6 +90,10 @@ socket_type() -> call(Ref, Msg, Timeout) -> xmpp_stream_in:call(Ref, Msg, Timeout). +-spec cast(pid(), term()) -> ok. +cast(Ref, Msg) -> + xmpp_stream_in:cast(Ref, Msg). + reply(Ref, Reply) -> xmpp_stream_in:reply(Ref, Reply). diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index 344febb5d..302cfded4 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -66,6 +66,8 @@ user_resources/2, kick_user/2, get_session_pid/3, + get_session_sid/3, + get_session_sids/2, get_user_info/2, get_user_info/3, get_user_ip/3, @@ -292,15 +294,31 @@ close_session_unset_presence(SID, User, Server, -spec get_session_pid(binary(), binary(), binary()) -> none | pid(). get_session_pid(User, Server, Resource) -> + case get_session_sid(User, Server, Resource) of + {_, PID} -> PID; + none -> none + end. + +-spec get_session_sid(binary(), binary(), binary()) -> none | sid(). + +get_session_sid(User, Server, Resource) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), LResource = jid:resourceprep(Resource), Mod = get_sm_backend(LServer), case online(get_sessions(Mod, LUser, LServer, LResource)) of - [#session{sid = {_, Pid}}] -> Pid; + [#session{sid = SID}] -> SID; _ -> none end. +-spec get_session_sids(binary(), binary()) -> [sid()]. + +get_session_sids(User, Server) -> + LUser = jid:nodeprep(User), + LServer = jid:nameprep(Server), + Mod = get_sm_backend(LServer), + online(get_sessions(Mod, LUser, LServer)). + -spec set_offline_info(sid(), binary(), binary(), binary(), info()) -> ok. set_offline_info(SID, User, Server, Resource, Info) -> diff --git a/src/misc.erl b/src/misc.erl index 604a458af..2112cd90c 100644 --- a/src/misc.erl +++ b/src/misc.erl @@ -32,8 +32,8 @@ hex_to_bin/1, hex_to_base64/1, expand_keyword/3, atom_to_binary/1, binary_to_atom/1, tuple_to_binary/1, l2i/1, i2l/1, i2l/2, expr_to_term/1, term_to_expr/1, - encode_pid/1, decode_pid/2, compile_exprs/2, join_atoms/2, - try_read_file/1]). + now_to_usec/1, usec_to_now/1, encode_pid/1, decode_pid/2, + compile_exprs/2, join_atoms/2, try_read_file/1]). %% Deprecated functions -export([decode_base64/1, encode_base64/1]). @@ -127,6 +127,18 @@ expr_to_term(Expr) -> term_to_expr(Term) -> list_to_binary(io_lib:print(Term)). +-spec now_to_usec(erlang:timestamp()) -> non_neg_integer(). +now_to_usec({MSec, Sec, USec}) -> + (MSec*1000000 + Sec)*1000000 + USec. + +-spec usec_to_now(non_neg_integer()) -> erlang:timestamp(). +usec_to_now(Int) -> + Secs = Int div 1000000, + USec = Int rem 1000000, + MSec = Secs div 1000000, + Sec = Secs rem 1000000, + {MSec, Sec, USec}. + l2i(I) when is_integer(I) -> I; l2i(L) when is_binary(L) -> binary_to_integer(L). diff --git a/src/mod_mam.erl b/src/mod_mam.erl index eb2082fe2..30dd7dd57 100644 --- a/src/mod_mam.erl +++ b/src/mod_mam.erl @@ -434,8 +434,9 @@ message_is_archived(false, #{jid := JID}, Pkt) -> delete_old_messages(TypeBin, Days) when TypeBin == <<"chat">>; TypeBin == <<"groupchat">>; TypeBin == <<"all">> -> + CurrentTime = p1_time_compat:system_time(micro_seconds), Diff = Days * 24 * 60 * 60 * 1000000, - TimeStamp = usec_to_now(p1_time_compat:system_time(micro_seconds) - Diff), + TimeStamp = misc:usec_to_now(CurrentTime - Diff), Type = misc:binary_to_atom(TypeBin), DBTypes = lists:usort( lists:map( @@ -830,7 +831,7 @@ select(_LServer, JidRequestor, JidArchive, Query, RSM, Msgs = lists:flatmap( fun({Nick, Pkt, _HaveSubject, Now, _Size}) -> - TS = now_to_usec(Now), + TS = misc:now_to_usec(Now), case match_interval(Now, Start, End) and match_rsm(Now, RSM) of true -> @@ -979,24 +980,14 @@ match_interval(Now, Start, End) -> (Now >= Start) and (Now =< End). match_rsm(Now, #rsm_set{'after' = ID}) when is_binary(ID), ID /= <<"">> -> - Now1 = (catch usec_to_now(binary_to_integer(ID))), + Now1 = (catch misc:usec_to_now(binary_to_integer(ID))), Now > Now1; match_rsm(Now, #rsm_set{before = ID}) when is_binary(ID), ID /= <<"">> -> - Now1 = (catch usec_to_now(binary_to_integer(ID))), + Now1 = (catch misc:usec_to_now(binary_to_integer(ID))), Now < Now1; match_rsm(_Now, _) -> true. -now_to_usec({MSec, Sec, USec}) -> - (MSec*1000000 + Sec)*1000000 + USec. - -usec_to_now(Int) -> - Secs = Int div 1000000, - USec = Int rem 1000000, - MSec = Secs div 1000000, - Sec = Secs rem 1000000, - {MSec, Sec, USec}. - get_jids(undefined) -> []; get_jids(Js) -> diff --git a/src/mod_push.erl b/src/mod_push.erl new file mode 100644 index 000000000..f82226440 --- /dev/null +++ b/src/mod_push.erl @@ -0,0 +1,590 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_push.erl +%%% Author : Holger Weiss +%%% Purpose : Push Notifications (XEP-0357) +%%% Created : 15 Jul 2017 by Holger Weiss +%%% +%%% +%%% ejabberd, Copyright (C) 2017 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- + +-module(mod_push). +-author('holger@zedat.fu-berlin.de'). +-protocol({xep, 357, '0.2'}). + +-behavior(gen_mod). + +%% gen_mod callbacks. +-export([start/2, stop/1, reload/3, mod_opt_type/1, depends/2]). + +%% ejabberd_hooks callbacks. +-export([disco_sm_features/5, c2s_session_pending/1, c2s_copy_session/2, + c2s_handle_cast/2, c2s_stanza/3, mam_message/6, offline_message/1, + remove_user/2]). + +%% gen_iq_handler callback. +-export([process_iq/1]). + +%% ejabberd command. +-export([get_commands_spec/0, delete_old_sessions/1]). + +-include("ejabberd.hrl"). +-include("ejabberd_commands.hrl"). +-include("logger.hrl"). +-include("xmpp.hrl"). + +-define(PUSH_CACHE, push_cache). + +-type c2s_state() :: ejabberd_c2s:state(). +-type timestamp() :: erlang:timestamp(). +-type push_session() :: {timestamp(), ljid(), binary(), xdata()}. + +-callback init(binary(), gen_mod:opts()) + -> any(). +-callback store_session(binary(), binary(), timestamp(), jid(), binary(), + xdata()) + -> {ok, push_session()} | error. +-callback lookup_session(binary(), binary(), jid(), binary()) + -> {ok, push_session()} | error. +-callback lookup_session(binary(), binary(), timestamp()) + -> {ok, push_session()} | error. +-callback lookup_sessions(binary(), binary(), jid()) + -> {ok, [push_session()]} | error. +-callback lookup_sessions(binary(), binary()) + -> {ok, [push_session()]} | error. +-callback lookup_sessions(binary()) + -> {ok, [push_session()]} | error. +-callback delete_session(binary(), binary(), timestamp()) + -> ok | error. +-callback delete_old_sessions(binary() | global, erlang:timestamp()) + -> any(). +-callback use_cache(binary()) + -> boolean(). +-callback cache_nodes(binary()) + -> [node()]. + +-optional_callbacks([use_cache/1, cache_nodes/1]). + +%%-------------------------------------------------------------------- +%% gen_mod callbacks. +%%-------------------------------------------------------------------- +-spec start(binary(), gen_mod:opts()) -> ok. +start(Host, Opts) -> + IQDisc = gen_mod:get_opt(iqdisc, Opts, gen_iq_handler:iqdisc(Host)), + Mod = gen_mod:db_mod(Host, Opts, ?MODULE), + Mod:init(Host, Opts), + init_cache(Mod, Host, Opts), + register_iq_handlers(Host, IQDisc), + register_hooks(Host), + ejabberd_commands:register_commands(get_commands_spec()). + +-spec stop(binary()) -> ok. +stop(Host) -> + unregister_hooks(Host), + unregister_iq_handlers(Host), + ejabberd_commands:unregister_commands(get_commands_spec()). + +-spec reload(binary(), gen_mod:opts(), gen_mod:opts()) -> ok. +reload(Host, NewOpts, OldOpts) -> + NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE), + OldMod = gen_mod:db_mod(Host, OldOpts, ?MODULE), + if NewMod /= OldMod -> + NewMod:init(Host, NewOpts); + true -> + ok + end, + case gen_mod:is_equal_opt(iqdisc, NewOpts, OldOpts, + gen_iq_handler:iqdisc(Host)) of + {false, IQDisc, _} -> + register_iq_handlers(Host, IQDisc); + true -> + ok + end. + +-spec depends(binary(), gen_mod:opts()) -> [{module(), hard | soft}]. +depends(_Host, _Opts) -> + []. + +-spec mod_opt_type(atom()) -> fun((term()) -> term()) | [atom()]. +mod_opt_type(db_type) -> + fun(T) -> ejabberd_config:v_db(?MODULE, T) end; +mod_opt_type(O) when O == cache_life_time; O == cache_size -> + fun(I) when is_integer(I), I > 0 -> I; + (infinity) -> infinity + end; +mod_opt_type(O) when O == use_cache; O == cache_missed -> + fun (B) when is_boolean(B) -> B end; +mod_opt_type(iqdisc) -> + fun gen_iq_handler:check_type/1; +mod_opt_type(_) -> + [db_type, cache_life_time, cache_size, use_cache, cache_missed, iqdisc]. + +%%-------------------------------------------------------------------- +%% ejabberd command callback. +%%-------------------------------------------------------------------- +-spec get_commands_spec() -> [ejabberd_commands()]. +get_commands_spec() -> + [#ejabberd_commands{name = delete_old_push_sessions, tags = [purge], + desc = "Remove push sessions older than DAYS", + module = ?MODULE, function = delete_old_sessions, + args = [{days, integer}], + result = {res, rescode}}]. + +-spec delete_old_sessions(non_neg_integer()) -> ok | any(). +delete_old_sessions(Days) -> + CurrentTime = p1_time_compat:system_time(micro_seconds), + Diff = Days * 24 * 60 * 60 * 1000000, + TimeStamp = misc:usec_to_now(CurrentTime - Diff), + DBTypes = lists:usort( + lists:map( + fun(Host) -> + case gen_mod:db_type(Host, ?MODULE) of + sql -> {sql, Host}; + Other -> {Other, global} + end + end, ?MYHOSTS)), + Results = lists:map( + fun({DBType, Host}) -> + Mod = gen_mod:db_mod(DBType, ?MODULE), + Mod:delete_old_sessions(Host, TimeStamp) + end, DBTypes), + ets_cache:clear(?PUSH_CACHE, ejabberd_cluster:get_nodes()), + case lists:filter(fun(Res) -> Res /= ok end, Results) of + [] -> + ?INFO_MSG("Deleted push sessions older than ~B days", [Days]), + ok; + [NotOk | _] -> + ?ERROR_MSG("Error while deleting old push sessions: ~p", [NotOk]), + NotOk + end. + +%%-------------------------------------------------------------------- +%% Register/unregister hooks. +%%-------------------------------------------------------------------- +-spec register_hooks(binary()) -> ok. +register_hooks(Host) -> + ejabberd_hooks:add(disco_sm_features, Host, ?MODULE, + disco_sm_features, 50), + ejabberd_hooks:add(c2s_session_pending, Host, ?MODULE, + c2s_session_pending, 50), + ejabberd_hooks:add(c2s_copy_session, Host, ?MODULE, + c2s_copy_session, 50), + ejabberd_hooks:add(c2s_handle_cast, Host, ?MODULE, + c2s_handle_cast, 50), + ejabberd_hooks:add(c2s_handle_send, Host, ?MODULE, + c2s_stanza, 50), + ejabberd_hooks:add(store_mam_message, Host, ?MODULE, + mam_message, 50), + ejabberd_hooks:add(offline_message_hook, Host, ?MODULE, + offline_message, 50), + ejabberd_hooks:add(remove_user, Host, ?MODULE, + remove_user, 50). + +-spec unregister_hooks(binary()) -> ok. +unregister_hooks(Host) -> + ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, + disco_sm_features, 50), + ejabberd_hooks:delete(c2s_session_pending, Host, ?MODULE, + c2s_session_pending, 50), + ejabberd_hooks:delete(c2s_copy_session, Host, ?MODULE, + c2s_copy_session, 50), + ejabberd_hooks:delete(c2s_handle_cast, Host, ?MODULE, + c2s_handle_cast, 50), + ejabberd_hooks:delete(c2s_handle_send, Host, ?MODULE, + c2s_stanza, 50), + ejabberd_hooks:delete(store_mam_message, Host, ?MODULE, + mam_message, 50), + ejabberd_hooks:delete(offline_message_hook, Host, ?MODULE, + offline_message, 50), + ejabberd_hooks:delete(remove_user, Host, ?MODULE, + remove_user, 50). + +%%-------------------------------------------------------------------- +%% Service discovery. +%%-------------------------------------------------------------------- +-spec disco_sm_features(empty | {result, [binary()]} | {error, stanza_error()}, + jid(), jid(), binary(), binary()) + -> {result, [binary()]} | {error, stanza_error()}. +disco_sm_features(empty, From, To, Node, Lang) -> + disco_sm_features({result, []}, From, To, Node, Lang); +disco_sm_features({result, OtherFeatures}, + #jid{luser = U, lserver = S}, + #jid{luser = U, lserver = S}, <<"">>, _Lang) -> + {result, [?NS_PUSH_0 | OtherFeatures]}; +disco_sm_features(Acc, _From, _To, _Node, _Lang) -> + Acc. + +%%-------------------------------------------------------------------- +%% IQ handlers. +%%-------------------------------------------------------------------- +-spec register_iq_handlers(binary(), gen_iq_handler:type()) -> ok. +register_iq_handlers(Host, IQDisc) -> + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_PUSH_0, + ?MODULE, process_iq, IQDisc). + +-spec unregister_iq_handlers(binary()) -> ok. +unregister_iq_handlers(Host) -> + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_PUSH_0). + +-spec process_iq(iq()) -> iq(). +process_iq(#iq{type = get, lang = Lang} = IQ) -> + Txt = <<"Value 'get' of 'type' attribute is not allowed">>, + xmpp:make_error(IQ, xmpp:err_not_allowed(Txt, Lang)); +process_iq(#iq{lang = Lang, sub_els = [#push_enable{node = <<>>}]} = IQ) -> + Txt = <<"Enabling push without 'node' attribute is not supported">>, + xmpp:make_error(IQ, xmpp:err_feature_not_implemented(Txt, Lang)); +process_iq(#iq{from = #jid{lserver = LServer} = JID, + to = #jid{lserver = LServer}, + sub_els = [#push_enable{jid = PushJID, + node = Node, + xdata = XData}]} = IQ) -> + case enable(JID, PushJID, Node, XData) of + ok -> + xmpp:make_iq_result(IQ); + error -> + xmpp:make_error(IQ, xmpp:err_internal_server_error()) + end; +process_iq(#iq{from = #jid{lserver = LServer} = JID, + to = #jid{lserver = LServer}, + sub_els = [#push_disable{jid = PushJID, + node = Node}]} = IQ) -> + case disable(JID, PushJID, Node) of + ok -> + xmpp:make_iq_result(IQ); + error -> + xmpp:make_error(IQ, xmpp:err_item_not_found()) + end; +process_iq(IQ) -> + xmpp:make_error(IQ, xmpp:err_not_allowed()). + +-spec enable(jid(), jid(), binary(), xdata()) -> ok | error. +enable(#jid{luser = LUser, lserver = LServer, lresource = LResource} = JID, + PushJID, Node, XData) -> + case ejabberd_sm:get_session_sid(LUser, LServer, LResource) of + {TS, PID} -> + case store_session(LUser, LServer, TS, PushJID, Node, XData) of + {ok, _} -> + ?INFO_MSG("Enabling push notifications for ~s", + [jid:encode(JID)]), + ejabberd_c2s:cast(PID, push_enable); + error -> + ?ERROR_MSG("Cannot enable push for ~s: database error", + [jid:encode(JID)]), + error + end; + none -> + ?WARNING_MSG("Cannot enable push for ~s: session not found", + [jid:encode(JID)]), + error + end. + +-spec disable(jid(), jid(), binary() | undefined) -> ok | error. +disable(#jid{luser = LUser, lserver = LServer, lresource = LResource} = JID, + PushJID, Node) -> + case ejabberd_sm:get_session_sid(LUser, LServer, LResource) of + {_TS, PID} -> + ?INFO_MSG("Disabling push notifications for ~s", + [jid:encode(JID)]), + ejabberd_c2s:cast(PID, push_disable); + none -> + ?WARNING_MSG("Session not found while disabling push for ~s", + [jid:encode(JID)]) + end, + if Node /= undefined -> + delete_session(LUser, LServer, PushJID, Node); + true -> + delete_sessions(LUser, LServer, PushJID) + end. + +%%-------------------------------------------------------------------- +%% Hook callbacks. +%%-------------------------------------------------------------------- +-spec c2s_stanza(c2s_state(), xmpp_element() | xmlel(), term()) -> c2s_state(). +c2s_stanza(#{push_enabled := true, mgmt_state := pending} = State, + _Pkt, _SendResult) -> + notify(State), + State; +c2s_stanza(State, _Pkt, _SendResult) -> + State. + +-spec mam_message(message() | drop, binary(), binary(), jid(), + chat | groupchat, recv | send) -> message(). +mam_message(#message{meta = #{push_notified := true}} = Pkt, + _LUser, _LServer, _Peer, _Type, _Dir) -> + Pkt; +mam_message(#message{} = Pkt, LUser, LServer, _Peer, chat, _Dir) -> + case lookup_sessions(LUser, LServer) of + {ok, [_|_] = Clients} -> + case drop_online_sessions(LUser, LServer, Clients) of + [_|_] = Clients1 -> + ?DEBUG("Notifying ~s@~s of MAM message", [LUser, LServer]), + notify(LUser, LServer, Clients1); + [] -> + ok + end; + _ -> + ok + end, + xmpp:put_meta(Pkt, push_notified, true); +mam_message(Pkt, _LUser, _LServer, _Peer, _Type, _Dir) -> + Pkt. + +-spec offline_message({any(), message()}) -> {any(), message()}. +offline_message({_Action, #message{meta = #{push_notified := true}}} = Acc) -> + Acc; +offline_message({Action, #message{to = #jid{luser = LUser, + lserver = LServer}} = Pkt}) -> + case lookup_sessions(LUser, LServer) of + {ok, [_|_] = Clients} -> + ?DEBUG("Notifying ~s@~s of offline message", [LUser, LServer]), + notify(LUser, LServer, Clients); + _ -> + ok + end, + {Action, xmpp:put_meta(Pkt, push_notified, true)}. + +-spec c2s_session_pending(c2s_state()) -> c2s_state(). +c2s_session_pending(#{push_enabled := true, mgmt_queue := Queue} = State) -> + case p1_queue:len(Queue) of + Len when Len > 0 -> + ?DEBUG("Notifying client of unacknowledged messages", []), + notify(State), + State; + 0 -> + State + end; +c2s_session_pending(State) -> + State. + +-spec c2s_copy_session(c2s_state(), c2s_state()) -> c2s_state(). +c2s_copy_session(State, #{push_enabled := true}) -> + State#{push_enabled => true}; +c2s_copy_session(State, _) -> + State. + +-spec c2s_handle_cast(c2s_state(), any()) -> c2s_state() | {stop, c2s_state()}. +c2s_handle_cast(State, push_enable) -> + {stop, State#{push_enabled => true}}; +c2s_handle_cast(State, push_disable) -> + {stop, maps:remove(push_enabled, State)}; +c2s_handle_cast(State, _Msg) -> + State. + +-spec remove_user(binary(), binary()) -> ok | error. +remove_user(LUser, LServer) -> + ?INFO_MSG("Removing any push sessions of ~s@~s", [LUser, LServer]), + Mod = gen_mod:db_mod(LServer, ?MODULE), + LookupFun = fun() -> Mod:lookup_sessions(LUser, LServer) end, + delete_sessions(LUser, LServer, LookupFun, Mod). + +%%-------------------------------------------------------------------- +%% Internal functions. +%%-------------------------------------------------------------------- +-spec notify(c2s_state()) -> ok. +notify(#{jid := #jid{luser = LUser, lserver = LServer}, sid := {TS, _}}) -> + case lookup_session(LUser, LServer, TS) of + {ok, Client} -> + notify(LUser, LServer, [Client]); + error -> + ok + end. + +-spec notify(binary(), binary(), [push_session()]) -> ok. +notify(LUser, LServer, Clients) -> + lists:foreach( + fun({TS, PushLJID, Node, XData}) -> + HandleResponse = fun(#iq{type = result}) -> + ok; + (#iq{type = error}) -> + delete_session(LUser, LServer, TS); + (timeout) -> + ok % Hmm. + end, + notify(LServer, PushLJID, Node, XData, HandleResponse) + end, Clients). + +-spec notify(binary(), ljid(), binary(), xdata(), + fun((iq() | timeout) -> any())) -> ok. +notify(LServer, PushLJID, Node, XData, HandleResponse) -> + From = jid:make(LServer), + Item = #ps_item{xml_els = [xmpp:encode(#push_notification{})]}, + PubSub = #pubsub{publish = #ps_publish{node = Node, items = [Item]}, + publish_options = XData}, + IQ = #iq{type = set, + from = From, + to = jid:make(PushLJID), + id = randoms:get_string(), + sub_els = [PubSub]}, + ejabberd_local:route_iq(IQ, HandleResponse), + ok. + +-spec store_session(binary(), binary(), timestamp(), jid(), binary(), xdata()) + -> {ok, push_session()} | error. +store_session(LUser, LServer, TS, PushJID, Node, XData) -> + Mod = gen_mod:db_mod(LServer, ?MODULE), + delete_session(LUser, LServer, PushJID, Node), + case use_cache(Mod, LServer) of + true -> + ets_cache:delete(?PUSH_CACHE, {LUser, LServer}, + cache_nodes(Mod, LServer)), + ets_cache:update( + ?PUSH_CACHE, + {LUser, LServer, TS}, {ok, {TS, PushJID, Node, XData}}, + fun() -> + Mod:store_session(LUser, LServer, TS, PushJID, Node, + XData) + end, cache_nodes(Mod, LServer)); + false -> + Mod:store_session(LUser, LServer, TS, PushJID, Node, XData) + end. + +-spec lookup_session(binary(), binary(), timestamp()) + -> {ok, push_session()} | error. +lookup_session(LUser, LServer, TS) -> + Mod = gen_mod:db_mod(LServer, ?MODULE), + case use_cache(Mod, LServer) of + true -> + ets_cache:lookup( + ?PUSH_CACHE, {LUser, LServer, TS}, + fun() -> Mod:lookup_session(LUser, LServer, TS) end); + false -> + Mod:lookup_session(LUser, LServer, TS) + end. + +-spec lookup_sessions(binary(), binary()) -> {ok, [push_session()]} | error. +lookup_sessions(LUser, LServer) -> + Mod = gen_mod:db_mod(LServer, ?MODULE), + case use_cache(Mod, LServer) of + true -> + ets_cache:lookup( + ?PUSH_CACHE, {LUser, LServer}, + fun() -> Mod:lookup_sessions(LUser, LServer) end); + false -> + Mod:lookup_sessions(LUser, LServer) + end. + +-spec delete_session(binary(), binary(), timestamp()) -> ok | error. +delete_session(LUser, LServer, TS) -> + Mod = gen_mod:db_mod(LServer, ?MODULE), + ok = Mod:delete_session(LUser, LServer, TS), + case use_cache(Mod, LServer) of + true -> + ets_cache:delete(?PUSH_CACHE, {LUser, LServer}, + cache_nodes(Mod, LServer)), + ets_cache:delete(?PUSH_CACHE, {LUser, LServer, TS}, + cache_nodes(Mod, LServer)); + false -> + ok + end. + +-spec delete_session(binary(), binary(), jid(), binary()) -> ok | error. +delete_session(LUser, LServer, PushJID, Node) -> + Mod = gen_mod:db_mod(LServer, ?MODULE), + case Mod:lookup_session(LUser, LServer, PushJID, Node) of + {ok, {TS, _, _, _}} -> + delete_session(LUser, LServer, TS); + error -> + error + end. + +-spec delete_sessions(binary(), binary(), jid()) -> ok | error. +delete_sessions(LUser, LServer, PushJID) -> + Mod = gen_mod:db_mod(LServer, ?MODULE), + LookupFun = fun() -> Mod:lookup_sessions(LUser, LServer, PushJID) end, + delete_sessions(LUser, LServer, LookupFun, Mod). + +-spec delete_sessions(binary(), binary(), fun(() -> ok | error), module()) + -> ok | error. +delete_sessions(LUser, LServer, LookupFun, Mod) -> + case LookupFun() of + {ok, Clients} -> + case use_cache(Mod, LServer) of + true -> + ets_cache:delete(?PUSH_CACHE, {LUser, LServer}, + cache_nodes(Mod, LServer)); + false -> + ok + end, + lists:foreach( + fun({TS, _, _, _}) -> + ok = Mod:delete_session(LUser, LServer, TS), + case use_cache(Mod, LServer) of + true -> + ets_cache:delete(?PUSH_CACHE, + {LUser, LServer, TS}, + cache_nodes(Mod, LServer)); + false -> + ok + end + end, Clients); + error -> + error + end. + +-spec drop_online_sessions(binary(), binary(), [push_session()]) + -> [push_session()]. +drop_online_sessions(LUser, LServer, Clients) -> + SessIDs = ejabberd_sm:get_session_sids(LUser, LServer), + [Client || {TS, _, _, _} = Client <- Clients, + not lists:keyfind(TS, 1, SessIDs)]. + +%%-------------------------------------------------------------------- +%% Caching. +%%-------------------------------------------------------------------- +-spec init_cache(module(), binary(), gen_mod:opts()) -> ok. +init_cache(Mod, Host, Opts) -> + case use_cache(Mod, Host) of + true -> + CacheOpts = cache_opts(Host, Opts), + ets_cache:new(?PUSH_CACHE, CacheOpts); + false -> + ets_cache:delete(?PUSH_CACHE) + end. + +-spec cache_opts(binary(), gen_mod:opts()) -> [proplists:property()]. +cache_opts(Host, Opts) -> + MaxSize = gen_mod:get_opt( + cache_size, Opts, + ejabberd_config:cache_size(Host)), + CacheMissed = gen_mod:get_opt( + cache_missed, Opts, + ejabberd_config:cache_missed(Host)), + LifeTime = case gen_mod:get_opt( + cache_life_time, Opts, + ejabberd_config:cache_life_time(Host)) of + infinity -> infinity; + I -> timer:seconds(I) + end, + [{max_size, MaxSize}, {cache_missed, CacheMissed}, {life_time, LifeTime}]. + +-spec use_cache(module(), binary()) -> boolean(). +use_cache(Mod, Host) -> + case erlang:function_exported(Mod, use_cache, 1) of + true -> Mod:use_cache(Host); + false -> + gen_mod:get_module_opt( + Host, ?MODULE, use_cache, + ejabberd_config:use_cache(Host)) + end. + +-spec cache_nodes(module(), binary()) -> [node()]. +cache_nodes(Mod, Host) -> + case erlang:function_exported(Mod, cache_nodes, 1) of + true -> Mod:cache_nodes(Host); + false -> ejabberd_cluster:get_nodes() + end. diff --git a/src/mod_push_mnesia.erl b/src/mod_push_mnesia.erl new file mode 100644 index 000000000..04ea8d60a --- /dev/null +++ b/src/mod_push_mnesia.erl @@ -0,0 +1,204 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_push_mnesia.erl +%%% Author : Holger Weiss +%%% Purpose : Mnesia backend for Push Notifications (XEP-0357) +%%% Created : 15 Jul 2017 by Holger Weiss +%%% +%%% +%%% ejabberd, Copyright (C) 2017 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- + +-module(mod_push_mnesia). +-author('holger@zedat.fu-berlin.de'). + +-behavior(mod_push). + +%% API +-export([init/2, store_session/6, lookup_session/4, lookup_session/3, + lookup_sessions/3, lookup_sessions/2, lookup_sessions/1, + delete_session/3, delete_old_sessions/2]). + +-include_lib("stdlib/include/ms_transform.hrl"). +-include("logger.hrl"). +-include("xmpp.hrl"). + +-record(push_session, + {us = {<<"">>, <<"">>} :: {binary(), binary()}, + timestamp = p1_time_compat:timestamp() :: erlang:timestamp(), + service = {<<"">>, <<"">>, <<"">>} :: ljid(), + node = <<"">> :: binary(), + xdata = #xdata{} :: xdata()}). + +%%%------------------------------------------------------------------- +%%% API +%%%------------------------------------------------------------------- +init(_Host, _Opts) -> + ejabberd_mnesia:create(?MODULE, push_session, + [{disc_only_copies, [node()]}, + {type, bag}, + {attributes, record_info(fields, push_session)}]). + +store_session(LUser, LServer, TS, PushJID, Node, XData) -> + US = {LUser, LServer}, + PushLJID = jid:tolower(PushJID), + MaxSessions = ejabberd_sm:get_max_user_sessions(LUser, LServer), + F = fun() -> + if is_integer(MaxSessions) -> + enforce_max_sessions(US, MaxSessions - 1); + MaxSessions == infinity -> + ok + end, + mnesia:write(#push_session{us = US, + timestamp = TS, + service = PushLJID, + node = Node, + xdata = XData}) + end, + case mnesia:transaction(F) of + {atomic, ok} -> + {ok, {TS, PushLJID, Node, XData}}; + {aborted, E} -> + ?ERROR_MSG("Cannot store push session for ~s@~s: ~p", + [LUser, LServer, E]), + error + end. + +lookup_session(LUser, LServer, PushJID, Node) -> + PushLJID = jid:tolower(PushJID), + MatchSpec = ets:fun2ms( + fun(#push_session{us = {U, S}, service = P, node = N} = Rec) + when U == LUser, + S == LServer, + P == PushLJID, + N == Node -> + Rec + end), + case mnesia:dirty_select(push_session, MatchSpec) of + [#push_session{timestamp = TS, xdata = XData}] -> + {ok, {TS, PushLJID, Node, XData}}; + _ -> + ?DEBUG("No push session found for ~s@~s (~p, ~s)", + [LUser, LServer, PushJID, Node]), + error + end. + +lookup_session(LUser, LServer, TS) -> + MatchSpec = ets:fun2ms( + fun(#push_session{us = {U, S}, timestamp = T} = Rec) + when U == LUser, + S == LServer, + T == TS -> + Rec + end), + case mnesia:dirty_select(push_session, MatchSpec) of + [#push_session{service = PushLJID, node = Node, xdata = XData}] -> + {ok, {TS, PushLJID, Node, XData}}; + _ -> + ?DEBUG("No push session found for ~s@~s (~p)", + [LUser, LServer, TS]), + error + end. + +lookup_sessions(LUser, LServer, PushJID) -> + PushLJID = jid:tolower(PushJID), + MatchSpec = ets:fun2ms( + fun(#push_session{us = {U, S}, service = P, node = N} = Rec) + when U == LUser, + S == LServer, + P == PushLJID -> + Rec + end), + {ok, mnesia:dirty_select(push_session, MatchSpec)}. + +lookup_sessions(LUser, LServer) -> + Records = mnesia:dirty_read(push_session, {LUser, LServer}), + Clients = [{TS, PushLJID, Node, XData} + || #push_session{timestamp = TS, + service = PushLJID, + node = Node, + xdata = XData} <- Records], + {ok, Clients}. + +lookup_sessions(LServer) -> + MatchSpec = ets:fun2ms( + fun(#push_session{us = {_U, S}, + timestamp = TS, + service = PushLJID, + node = Node, + xdata = XData}) + when S == LServer -> + {TS, PushLJID, Node, XData} + end), + {ok, mnesia:dirty_select(push_session, MatchSpec)}. + +delete_session(LUser, LServer, TS) -> + MatchSpec = ets:fun2ms( + fun(#push_session{us = {U, S}, timestamp = T} = Rec) + when U == LUser, + S == LServer, + T == TS -> + Rec + end), + F = fun() -> + Recs = mnesia:select(push_session, MatchSpec), + lists:foreach(fun mnesia:delete_object/1, Recs) + end, + case mnesia:transaction(F) of + {atomic, ok} -> + ok; + {aborted, E} -> + ?ERROR_MSG("Cannot delete push seesion of ~s@~s: ~p", + [LUser, LServer, E]), + error + end. + +delete_old_sessions(_LServer, Time) -> + DelIfOld = fun(#push_session{timestamp = T} = Rec, ok) when T < Time -> + mnesia:delete_object(Rec); + (_Rec, ok) -> + ok + end, + F = fun() -> + mnesia:foldl(DelIfOld, ok, push_session) + end, + case mnesia:transaction(F) of + {atomic, ok} -> + ok; + {aborted, E} -> + ?ERROR_MSG("Cannot delete old push sessions: ~p", [E]), + error + end. + +%%-------------------------------------------------------------------- +%% Internal functions. +%%-------------------------------------------------------------------- +-spec enforce_max_sessions({binary(), binary()}, non_neg_integer()) -> ok. +enforce_max_sessions({U, S} = US, Max) -> + Recs = mnesia:wread({push_session, US}), + NumRecs = length(Recs), + if NumRecs > Max -> + NumOldRecs = NumRecs - Max, + Recs1 = lists:keysort(#push_session.timestamp, Recs), + Recs2 = lists:reverse(Recs1), + OldRecs = lists:sublist(Recs2, Max + 1, NumOldRecs), + ?INFO_MSG("Disabling ~B old push session(s) of ~s@~s", + [NumOldRecs, U, S]), + lists:foreach(fun(Rec) -> mnesia:delete_object(Rec) end, OldRecs); + true -> + ok + end. diff --git a/test/ejabberd_SUITE.erl b/test/ejabberd_SUITE.erl index 17465617b..539d8dc33 100644 --- a/test/ejabberd_SUITE.erl +++ b/test/ejabberd_SUITE.erl @@ -431,6 +431,7 @@ db_tests(DB) when DB == mnesia; DB == redis -> mam_tests:single_cases(), carbons_tests:single_cases(), csi_tests:single_cases(), + push_tests:single_cases(), test_unregister]}, muc_tests:master_slave_cases(), privacy_tests:master_slave_cases(), @@ -441,7 +442,8 @@ db_tests(DB) when DB == mnesia; DB == redis -> vcard_tests:master_slave_cases(), announce_tests:master_slave_cases(), carbons_tests:master_slave_cases(), - csi_tests:master_slave_cases()]; + csi_tests:master_slave_cases(), + push_tests:master_slave_cases()]; db_tests(_) -> [{single_user, [sequence], [test_register, diff --git a/test/ejabberd_SUITE_data/ejabberd.yml b/test/ejabberd_SUITE_data/ejabberd.yml index 89618c0c0..5ee287400 100644 --- a/test/ejabberd_SUITE_data/ejabberd.yml +++ b/test/ejabberd_SUITE_data/ejabberd.yml @@ -231,7 +231,10 @@ Welcome to this XMPP server." mod_disco: [] mod_ping: [] mod_proxy65: [] + mod_push: [] mod_s2s_dialback: [] + mod_stream_mgmt: + resume_timeout: 3 mod_legacy_auth: [] mod_register: welcome_message: @@ -290,7 +293,10 @@ Welcome to this XMPP server." mod_disco: [] mod_ping: [] mod_proxy65: [] + mod_push: [] mod_s2s_dialback: [] + mod_stream_mgmt: + resume_timeout: 3 mod_legacy_auth: [] mod_register: welcome_message: diff --git a/test/push_tests.erl b/test/push_tests.erl new file mode 100644 index 000000000..535671ee1 --- /dev/null +++ b/test/push_tests.erl @@ -0,0 +1,232 @@ +%%%------------------------------------------------------------------- +%%% Author : Holger Weiss +%%% Created : 15 Jul 2017 by Holger Weiss +%%% +%%% +%%% ejabberd, Copyright (C) 2017 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- + +-module(push_tests). + +%% API +-compile(export_all). +-import(suite, [close_socket/1, connect/1, disconnect/1, get_event/1, + get_features/2, make_iq_result/1, my_jid/1, put_event/2, recv/1, + recv_iq/1, recv_message/1, self_presence/2, send/2, send_recv/2, + server_jid/1]). + +-include("suite.hrl"). + +-define(PUSH_NODE, <<"d3v1c3">>). +-define(PUSH_XDATA_FIELDS, + [#xdata_field{var = <<"FORM_TYPE">>, + values = [?NS_PUBSUB_PUBLISH_OPTIONS]}, + #xdata_field{var = <<"secret">>, + values = [<<"c0nf1d3nt14l">>]}]). + +%%%=================================================================== +%%% API +%%%=================================================================== +%%%=================================================================== +%%% Single user tests +%%%=================================================================== +single_cases() -> + {push_single, [sequence], + [single_test(feature_enabled), + single_test(unsupported_iq)]}. + +feature_enabled(Config) -> + BareMyJID = jid:remove_resource(my_jid(Config)), + Features = get_features(Config, BareMyJID), + true = lists:member(?NS_PUSH_0, Features), + disconnect(Config). + +unsupported_iq(Config) -> + PushJID = my_jid(Config), + lists:foreach( + fun(SubEl) -> + #iq{type = error} = + send_recv(Config, #iq{type = get, sub_els = [SubEl]}) + end, [#push_enable{jid = PushJID}, #push_disable{jid = PushJID}]), + disconnect(Config). + +%%%=================================================================== +%%% Master-slave tests +%%%=================================================================== +master_slave_cases() -> + {push_master_slave, [sequence], + [master_slave_test(sm), + master_slave_test(offline), + master_slave_test(mam)]}. + +sm_master(Config) -> + ct:comment("Waiting for the slave to close the socket"), + peer_down = get_event(Config), + ct:comment("Sending message to the slave"), + send_test_message(Config), + ct:comment("Handling push notification"), + handle_notification(Config), + ct:comment("Receiving bounced message from the slave"), + #message{type = error} = recv_message(Config), + ct:comment("Closing the connection"), + disconnect(Config). + +sm_slave(Config) -> + ct:comment("Enabling push notifications"), + ok = enable_push(Config), + ct:comment("Enabling stream management"), + ok = enable_sm(Config), + ct:comment("Closing the socket"), + close_socket(Config). + +offline_master(Config) -> + ct:comment("Waiting for the slave to be ready"), + ready = get_event(Config), + ct:comment("Sending message to the slave"), + send_test_message(Config), % No push notification, slave is online. + ct:comment("Waiting for the slave to disconnect"), + peer_down = get_event(Config), + ct:comment("Sending message to offline storage"), + send_test_message(Config), + ct:comment("Handling push notification for offline message"), + handle_notification(Config), + ct:comment("Closing the connection"), + disconnect(Config). + +offline_slave(Config) -> + ct:comment("Re-enabling push notifications"), + ok = enable_push(Config), + ct:comment("Letting the master know that we're ready"), + put_event(Config, ready), + ct:comment("Receiving message from the master"), + recv_test_message(Config), + ct:comment("Closing the connection"), + disconnect(Config). + +mam_master(Config) -> + ct:comment("Waiting for the slave to be ready"), + ready = get_event(Config), + ct:comment("Sending message to the slave"), + send_test_message(Config), + ct:comment("Handling push notification for MAM message"), + handle_notification(Config), + ct:comment("Closing the connection"), + disconnect(Config). + +mam_slave(Config) -> + self_presence(Config, available), + ct:comment("Receiving message from offline storage"), + recv_test_message(Config), + ct:comment("Re-enabling push notifications"), + ok = enable_push(Config), + ct:comment("Enabling MAM"), + ok = enable_mam(Config), + ct:comment("Letting the master know that we're ready"), + put_event(Config, ready), + ct:comment("Receiving message from the master"), + recv_test_message(Config), + ct:comment("Waiting for the master to disconnect"), + peer_down = get_event(Config), + ct:comment("Disabling push notifications"), + ok = disable_push(Config), + ct:comment("Closing the connection and cleaning up"), + clean(disconnect(Config)). + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +single_test(T) -> + list_to_atom("push_" ++ atom_to_list(T)). + +master_slave_test(T) -> + {list_to_atom("push_" ++ atom_to_list(T)), [parallel], + [list_to_atom("push_" ++ atom_to_list(T) ++ "_master"), + list_to_atom("push_" ++ atom_to_list(T) ++ "_slave")]}. + +enable_sm(Config) -> + send(Config, #sm_enable{xmlns = ?NS_STREAM_MGMT_3, resume = true}), + case recv(Config) of + #sm_enabled{resume = true} -> + ok; + #sm_failed{reason = Reason} -> + Reason + end. + +enable_mam(Config) -> + case send_recv( + Config, #iq{type = set, sub_els = [#mam_prefs{xmlns = ?NS_MAM_1, + default = always}]}) of + #iq{type = result} -> + ok; + #iq{type = error} = Err -> + xmpp:get_error(Err) + end. + +enable_push(Config) -> + %% Usually, the push JID would be a server JID (such as push.example.com). + %% We specify the peer's full user JID instead, so the push notifications + %% will be sent to the peer. + PushJID = ?config(peer, Config), + XData = #xdata{type = submit, fields = ?PUSH_XDATA_FIELDS}, + case send_recv( + Config, #iq{type = set, + sub_els = [#push_enable{jid = PushJID, + node = ?PUSH_NODE, + xdata = XData}]}) of + #iq{type = result, sub_els = []} -> + ok; + #iq{type = error} = Err -> + xmpp:get_error(Err) + end. + +disable_push(Config) -> + PushJID = ?config(peer, Config), + case send_recv( + Config, #iq{type = set, + sub_els = [#push_disable{jid = PushJID, + node = ?PUSH_NODE}]}) of + #iq{type = result, sub_els = []} -> + ok; + #iq{type = error} = Err -> + xmpp:get_error(Err) + end. + +send_test_message(Config) -> + Peer = ?config(peer, Config), + Msg = #message{to = Peer, body = [#text{data = <<"test">>}]}, + send(Config, Msg). + +recv_test_message(Config) -> + Peer = ?config(peer, Config), + #message{from = Peer, + body = [#text{data = <<"test">>}]} = recv_message(Config). + +handle_notification(Config) -> + From = server_jid(Config), + Item = #ps_item{xml_els = [xmpp:encode(#push_notification{})]}, + Publish = #ps_publish{node = ?PUSH_NODE, items = [Item]}, + XData = #xdata{type = submit, fields = ?PUSH_XDATA_FIELDS}, + PubSub = #pubsub{publish = Publish, publish_options = XData}, + IQ = #iq{type = set, from = From, sub_els = [PubSub]} = recv_iq(Config), + send(Config, make_iq_result(IQ)). + +clean(Config) -> + {U, S, _} = jid:tolower(my_jid(Config)), + mod_push:remove_user(U, S), + mod_mam:remove_user(U, S), + Config.