%%%----------------------------------------------------------------------
%%% File    : mod_push.erl
%%% Author  : Holger Weiss
%%% Purpose : Push Notifications (XEP-0357)
%%% Created : 15 Jul 2017 by Holger Weiss
%%%
%%%
%%% ejabberd, Copyright (C) 2017 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%----------------------------------------------------------------------

-module(mod_push).
-author('holger@zedat.fu-berlin.de').
-protocol({xep, 357, '0.2'}).

-behavior(gen_mod).

%% gen_mod callbacks.
-export([start/2,
         stop/1,
         reload/3,
         mod_opt_type/1,
         depends/2]).

%% ejabberd_hooks callbacks.
-export([disco_sm_features/5,
         c2s_session_pending/1,
         c2s_copy_session/2,
         c2s_handle_cast/2,
         c2s_stanza/3,
         mam_message/6,
         offline_message/1,
         remove_user/2]).

%% gen_iq_handler callback.
-export([process_iq/1]).

%% ejabberd command.
-export([get_commands_spec/0,
         delete_old_sessions/1]).

-include("ejabberd.hrl").
-include("ejabberd_commands.hrl").
-include("logger.hrl").
-include("xmpp.hrl").

%% Name of the ets_cache table holding push session lookups.
-define(PUSH_CACHE, push_cache).

-type c2s_state() :: ejabberd_c2s:state().
-type timestamp() :: erlang:timestamp().
%% A stored push registration: the timestamp identifying the session, the
%% JID notifications are sent to, the node published to, and the data form
%% passed along as publish options.
-type push_session() :: {timestamp(), ljid(), binary(), xdata()}.

%% Behaviour implemented by the storage backend modules.
-callback init(binary(), gen_mod:opts())
          -> any().
-callback store_session(binary(), binary(), timestamp(), jid(), binary(),
                        xdata())
          -> {ok, push_session()} | error.
-callback lookup_session(binary(), binary(), jid(), binary())
          -> {ok, push_session()} | error.
-callback lookup_session(binary(), binary(), timestamp())
          -> {ok, push_session()} | error.
-callback lookup_sessions(binary(), binary(), jid())
          -> {ok, [push_session()]} | error.
-callback lookup_sessions(binary(), binary())
          -> {ok, [push_session()]} | error.
-callback lookup_sessions(binary())
          -> {ok, [push_session()]} | error.
-callback delete_session(binary(), binary(), timestamp())
          -> ok | error.
-callback delete_old_sessions(binary() | global, erlang:timestamp())
          -> any().
-callback use_cache(binary())
          -> boolean().
-callback cache_nodes(binary())
          -> [node()].

%% Backends may leave these two out; the module then falls back to its own
%% defaults (see use_cache/2 and cache_nodes/2).
-optional_callbacks([use_cache/1, cache_nodes/1]).

%%--------------------------------------------------------------------
%% gen_mod callbacks.
%%--------------------------------------------------------------------

%% Bring the module up on Host: initialise the storage backend and the
%% cache, then attach the IQ handler, the hooks and the admin command.
-spec start(binary(), gen_mod:opts()) -> ok.
start(Host, Opts) ->
    DefaultDisc = gen_iq_handler:iqdisc(Host),
    IQDisc = gen_mod:get_opt(iqdisc, Opts, DefaultDisc),
    Backend = gen_mod:db_mod(Host, Opts, ?MODULE),
    Backend:init(Host, Opts),
    init_cache(Backend, Host, Opts),
    register_iq_handlers(Host, IQDisc),
    register_hooks(Host),
    Commands = get_commands_spec(),
    ejabberd_commands:register_commands(Commands).

%% Detach hooks, the IQ handler and the admin command from Host.
-spec stop(binary()) -> ok.
stop(Host) ->
    unregister_hooks(Host),
    unregister_iq_handlers(Host),
    Commands = get_commands_spec(),
    ejabberd_commands:unregister_commands(Commands).

%% Apply changed options: initialise the backend if it was switched and
%% re-register the IQ handler if the iqdisc option changed.
-spec reload(binary(), gen_mod:opts(), gen_mod:opts()) -> ok.
reload(Host, NewOpts, OldOpts) ->
    NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE),
    case gen_mod:db_mod(Host, OldOpts, ?MODULE) of
        NewMod ->
            ok;
        _OldMod ->
            NewMod:init(Host, NewOpts)
    end,
    DefaultDisc = gen_iq_handler:iqdisc(Host),
    case gen_mod:is_equal_opt(iqdisc, NewOpts, OldOpts, DefaultDisc) of
        true ->
            ok;
        {false, IQDisc, _} ->
            register_iq_handlers(Host, IQDisc)
    end.

-spec depends(binary(), gen_mod:opts()) -> [{module(), hard | soft}].
%% This module declares no dependencies on other modules.
depends(_Host, _Opts) ->
    [].

%% Return the validator fun for a known option, or the list of known
%% option names for anything else.
-spec mod_opt_type(atom()) -> fun((term()) -> term()) | [atom()].
mod_opt_type(db_type) ->
    fun(Type) -> ejabberd_config:v_db(?MODULE, Type) end;
mod_opt_type(Opt) when Opt =:= cache_life_time orelse Opt =:= cache_size ->
    fun(infinity) -> infinity;
       (N) when is_integer(N), N > 0 -> N
    end;
mod_opt_type(Opt) when Opt =:= use_cache orelse Opt =:= cache_missed ->
    fun(Flag) when is_boolean(Flag) -> Flag end;
mod_opt_type(iqdisc) ->
    fun gen_iq_handler:check_type/1;
mod_opt_type(_) ->
    [db_type, cache_life_time, cache_size, use_cache, cache_missed, iqdisc].

%%--------------------------------------------------------------------
%% ejabberd command callback.
%%--------------------------------------------------------------------

%% Describe the 'delete_old_push_sessions' admin command.
-spec get_commands_spec() -> [ejabberd_commands()].
get_commands_spec() ->
    Command = #ejabberd_commands{
                 name = delete_old_push_sessions,
                 tags = [purge],
                 desc = "Remove push sessions older than DAYS",
                 module = ?MODULE,
                 function = delete_old_sessions,
                 args = [{days, integer}],
                 result = {res, rescode}},
    [Command].

%% Delete push sessions older than Days from every configured backend,
%% then clear the push cache on all cluster nodes. Returns ok when every
%% backend returned ok, otherwise the first non-ok backend result.
-spec delete_old_sessions(non_neg_integer()) -> ok | any().
delete_old_sessions(Days) ->
    Now = p1_time_compat:system_time(micro_seconds),
    MaxAge = Days * 24 * 60 * 60 * 1000000,
    TimeStamp = misc:usec_to_now(Now - MaxAge),
    %% SQL backends are purged once per host, all other backend types once
    %% with the host given as 'global'; usort/1 removes the duplicates.
    Targets = lists:usort(
                [case gen_mod:db_type(Host, ?MODULE) of
                     sql -> {sql, Host};
                     Other -> {Other, global}
                 end || Host <- ?MYHOSTS]),
    Results = [begin
                   Mod = gen_mod:db_mod(DBType, ?MODULE),
                   Mod:delete_old_sessions(Host, TimeStamp)
               end || {DBType, Host} <- Targets],
    ets_cache:clear(?PUSH_CACHE, ejabberd_cluster:get_nodes()),
    case [Res || Res <- Results, Res /= ok] of
        [] ->
            ?INFO_MSG("Deleted push sessions older than ~B days", [Days]),
            ok;
        [NotOk | _] ->
            ?ERROR_MSG("Error while deleting old push sessions: ~p", [NotOk]),
            NotOk
    end.

%%--------------------------------------------------------------------
%% Register/unregister hooks.
%%--------------------------------------------------------------------

%% The hooks this module attaches to, as {HookName, CallbackFunction}
%% pairs, in registration order. All of them use sequence number 50.
-spec hook_handlers() -> [{atom(), atom()}].
hook_handlers() ->
    [{disco_sm_features, disco_sm_features},
     {c2s_session_pending, c2s_session_pending},
     {c2s_copy_session, c2s_copy_session},
     {c2s_handle_cast, c2s_handle_cast},
     {c2s_handle_send, c2s_stanza},
     {store_mam_message, mam_message},
     {offline_message_hook, offline_message},
     {remove_user, remove_user}].

%% Attach every handler from hook_handlers/0 to Host.
-spec register_hooks(binary()) -> ok.
register_hooks(Host) ->
    lists:foreach(
      fun({Hook, Callback}) ->
              ejabberd_hooks:add(Hook, Host, ?MODULE, Callback, 50)
      end, hook_handlers()).

%% Detach every handler from hook_handlers/0 from Host.
-spec unregister_hooks(binary()) -> ok.
unregister_hooks(Host) ->
    lists:foreach(
      fun({Hook, Callback}) ->
              ejabberd_hooks:delete(Hook, Host, ?MODULE, Callback, 50)
      end, hook_handlers()).

%%--------------------------------------------------------------------
%% Service discovery.
%%--------------------------------------------------------------------
-spec disco_sm_features(empty | {result, [binary()]} | {error, stanza_error()},
                        jid(), jid(), binary(), binary())
      -> {result, [binary()]} | {error, stanza_error()}.
%% Advertise the push namespace when a user queries their own account
%% (same user and server in From and To) with an empty node. Any other
%% query passes the accumulator through unchanged.
disco_sm_features(empty, From, To, Node, Lang) ->
    %% Treat an empty accumulator as an empty feature list and retry.
    disco_sm_features({result, []}, From, To, Node, Lang);
disco_sm_features({result, Features},
                  #jid{luser = LUser, lserver = LServer},
                  #jid{luser = LUser, lserver = LServer},
                  <<>>, _Lang) ->
    {result, [?NS_PUSH_0 | Features]};
disco_sm_features(Acc, _From, _To, _Node, _Lang) ->
    Acc.

%%--------------------------------------------------------------------
%% IQ handlers.
%%--------------------------------------------------------------------

%% Route IQs in the push namespace addressed to Host's users to process_iq/1.
-spec register_iq_handlers(binary(), gen_iq_handler:type()) -> ok.
register_iq_handlers(Host, IQDisc) ->
    gen_iq_handler:add_iq_handler(
      ejabberd_sm, Host, ?NS_PUSH_0, ?MODULE, process_iq, IQDisc).

%% Remove the handler installed by register_iq_handlers/2.
-spec unregister_iq_handlers(binary()) -> ok.
unregister_iq_handlers(Host) ->
    gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_PUSH_0).

%% Handle enable/disable requests. 'get' requests and enable requests with
%% an empty node are rejected. Enable/disable are only accepted when sender
%% and recipient share a server; everything else yields not-allowed.
-spec process_iq(iq()) -> iq().
process_iq(#iq{type = get, lang = Lang} = IQ) ->
    Txt = <<"Value 'get' of 'type' attribute is not allowed">>,
    Err = xmpp:err_not_allowed(Txt, Lang),
    xmpp:make_error(IQ, Err);
process_iq(#iq{lang = Lang, sub_els = [#push_enable{node = <<>>}]} = IQ) ->
    Txt = <<"Enabling push without 'node' attribute is not supported">>,
    Err = xmpp:err_feature_not_implemented(Txt, Lang),
    xmpp:make_error(IQ, Err);
process_iq(#iq{from = #jid{lserver = LServer} = From,
               to = #jid{lserver = LServer},
               sub_els = [#push_enable{jid = PushJID,
                                       node = Node,
                                       xdata = XData}]} = IQ) ->
    case enable(From, PushJID, Node, XData) of
        error ->
            xmpp:make_error(IQ, xmpp:err_internal_server_error());
        ok ->
            xmpp:make_iq_result(IQ)
    end;
process_iq(#iq{from = #jid{lserver = LServer} = From,
               to = #jid{lserver = LServer},
               sub_els = [#push_disable{jid = PushJID,
                                        node = Node}]} = IQ) ->
    case disable(From, PushJID, Node) of
        error ->
            xmpp:make_error(IQ, xmpp:err_item_not_found());
        ok ->
            xmpp:make_iq_result(IQ)
    end;
process_iq(IQ) ->
    xmpp:make_error(IQ, xmpp:err_not_allowed()).

-spec enable(jid(), jid(), binary(), xdata()) -> ok | error.
%% Store a push session for the requesting client session and tell its
%% c2s process that push is enabled. Returns 'error' when the session is
%% not found or the session cannot be stored.
enable(#jid{luser = LUser, lserver = LServer, lresource = LResource} = JID,
       PushJID, Node, XData) ->
    case ejabberd_sm:get_session_sid(LUser, LServer, LResource) of
        {TS, PID} ->
            case store_session(LUser, LServer, TS, PushJID, Node, XData) of
                {ok, _} ->
                    ?INFO_MSG("Enabling push notifications for ~s",
                              [jid:encode(JID)]),
                    ejabberd_c2s:cast(PID, push_enable);
                error ->
                    ?ERROR_MSG("Cannot enable push for ~s: database error",
                               [jid:encode(JID)]),
                    error
            end;
        none ->
            ?WARNING_MSG("Cannot enable push for ~s: session not found",
                         [jid:encode(JID)]),
            error
    end.

%% Disable push for the requesting client and delete the matching stored
%% session(s).
%%
%% When a node is given, only the session for that PushJID/node pair is
%% deleted. When no node is given, every session registered for PushJID is
%% deleted. A missing node is recognised both as 'undefined' and as the
%% empty binary: process_iq/1 in this module treats `<<>>` as "no node
%% attribute", so testing for 'undefined' alone would never select the
%% delete-all branch for such requests.
%%
%% A missing c2s session is only logged; the stored sessions are deleted
%% regardless. Returns 'ok', or 'error' if the lookup for the session(s) to
%% delete fails.
-spec disable(jid(), jid(), binary() | undefined) -> ok | error.
disable(#jid{luser = LUser, lserver = LServer, lresource = LResource} = JID,
        PushJID, Node) ->
    case ejabberd_sm:get_session_sid(LUser, LServer, LResource) of
        {_TS, PID} ->
            ?INFO_MSG("Disabling push notifications for ~s",
                      [jid:encode(JID)]),
            ejabberd_c2s:cast(PID, push_disable);
        none ->
            ?WARNING_MSG("Session not found while disabling push for ~s",
                         [jid:encode(JID)])
    end,
    if Node =:= undefined; Node =:= <<>> ->
            delete_sessions(LUser, LServer, PushJID);
       true ->
            delete_session(LUser, LServer, PushJID, Node)
    end.

%%--------------------------------------------------------------------
%% Hook callbacks.
%%--------------------------------------------------------------------

%% Trigger a push notification when a stanza is sent on a push-enabled
%% session whose stream management state is 'pending'. The state is
%% returned unchanged in every case.
-spec c2s_stanza(c2s_state(), xmpp_element() | xmlel(), term()) -> c2s_state().
c2s_stanza(#{push_enabled := true, mgmt_state := pending} = State,
           _Pkt, _SendResult) ->
    notify(State),
    State;
c2s_stanza(State, _Pkt, _SendResult) ->
    State.

%% The catch-all clause hands its first argument back untouched, so 'drop'
%% is a possible result as well as a possible input.
-spec mam_message(message() | drop, binary(), binary(), jid(),
                  chat | groupchat, recv | send) -> message() | drop.
%% Notify the user's push clients that are not currently online about an
%% archived chat message, then mark the message as notified so later hooks
%% do not notify again. Already-marked messages, non-chat messages and
%% non-message inputs pass through unchanged.
mam_message(#message{meta = #{push_notified := true}} = Pkt,
            _LUser, _LServer, _Peer, _Type, _Dir) ->
    Pkt;
mam_message(#message{} = Pkt, LUser, LServer, _Peer, chat, _Dir) ->
    notify_unless_online(LUser, LServer, lookup_sessions(LUser, LServer)),
    xmpp:put_meta(Pkt, push_notified, true);
mam_message(Pkt, _LUser, _LServer, _Peer, _Type, _Dir) ->
    Pkt.

%% Given a session lookup result, notify those sessions that remain after
%% dropping the online ones. Does nothing for an empty or failed lookup.
-spec notify_unless_online(binary(), binary(),
                           {ok, [push_session()]} | error) -> ok.
notify_unless_online(LUser, LServer, {ok, [_|_] = Sessions}) ->
    case drop_online_sessions(LUser, LServer, Sessions) of
        [] ->
            ok;
        [_|_] = Offline ->
            ?DEBUG("Notifying ~s@~s of MAM message", [LUser, LServer]),
            notify(LUser, LServer, Offline)
    end;
notify_unless_online(_LUser, _LServer, _LookupResult) ->
    ok.

%% Notify all push clients of the recipient about a message going to
%% offline handling and mark the message as notified. Already-marked
%% messages are returned untouched.
-spec offline_message({any(), message()}) -> {any(), message()}.
offline_message({_Action, #message{meta = #{push_notified := true}}} = Acc) ->
    Acc;
offline_message({Action, #message{to = #jid{luser = LUser,
                                            lserver = LServer}} = Pkt}) ->
    case lookup_sessions(LUser, LServer) of
        {ok, [_|_] = Sessions} ->
            ?DEBUG("Notifying ~s@~s of offline message", [LUser, LServer]),
            notify(LUser, LServer, Sessions);
        _ ->
            ok
    end,
    Marked = xmpp:put_meta(Pkt, push_notified, true),
    {Action, Marked}.

%% When a push-enabled session becomes pending with a non-empty stream
%% management queue, notify the client. The state is never modified.
-spec c2s_session_pending(c2s_state()) -> c2s_state().
c2s_session_pending(#{push_enabled := true, mgmt_queue := Queue} = State) ->
    case p1_queue:len(Queue) of
        0 ->
            State;
        Len when Len > 0 ->
            ?DEBUG("Notifying client of unacknowledged messages", []),
            notify(State),
            State
    end;
c2s_session_pending(State) ->
    State.

%% Carry the push_enabled flag over from the old session state to the new
%% one when a session is copied.
-spec c2s_copy_session(c2s_state(), c2s_state()) -> c2s_state().
c2s_copy_session(State, #{push_enabled := true}) ->
    maps:put(push_enabled, true, State);
c2s_copy_session(State, _OldState) ->
    State.

%% Set or clear the push_enabled flag in response to the casts sent by
%% enable/4 and disable/3. Other casts are left for other handlers.
-spec c2s_handle_cast(c2s_state(), any()) -> c2s_state() | {stop, c2s_state()}.
c2s_handle_cast(State, push_enable) ->
    {stop, maps:put(push_enabled, true, State)};
c2s_handle_cast(State, push_disable) ->
    {stop, maps:remove(push_enabled, State)};
c2s_handle_cast(State, _Msg) ->
    State.

-spec remove_user(binary(), binary()) -> ok | error.
%% Delete every push session stored for the removed user.
remove_user(LUser, LServer) ->
    ?INFO_MSG("Removing any push sessions of ~s@~s", [LUser, LServer]),
    Mod = gen_mod:db_mod(LServer, ?MODULE),
    delete_sessions(LUser, LServer,
                    fun() -> Mod:lookup_sessions(LUser, LServer) end,
                    Mod).

%%--------------------------------------------------------------------
%% Internal functions.
%%--------------------------------------------------------------------

%% Notify the push service registered for the session described by the
%% given c2s state. Does nothing if no push session is stored for it.
-spec notify(c2s_state()) -> ok.
notify(#{jid := #jid{luser = LUser, lserver = LServer}, sid := {TS, _}}) ->
    case lookup_session(LUser, LServer, TS) of
        error ->
            ok;
        {ok, Session} ->
            notify(LUser, LServer, [Session])
    end.

%% Send a notification for each of the given push sessions.
-spec notify(binary(), binary(), [push_session()]) -> ok.
notify(LUser, LServer, Clients) ->
    lists:foreach(
      fun({TS, PushLJID, Node, XData}) ->
              OnResponse = response_handler(LUser, LServer, TS),
              notify(LServer, PushLJID, Node, XData, OnResponse)
      end, Clients).

%% Build the callback that handles the push service's reply: an error
%% reply deletes the stored session, a result or a timeout is ignored.
-spec response_handler(binary(), binary(), timestamp())
      -> fun((iq() | timeout) -> any()).
response_handler(LUser, LServer, TS) ->
    fun(#iq{type = result}) ->
            ok;
       (#iq{type = error}) ->
            delete_session(LUser, LServer, TS);
       (timeout) ->
            ok % Hmm.
    end.

%% Publish an empty push notification item to Node at the push service,
%% sent from the local server's JID with XData as publish options.
%% HandleResponse is invoked with the reply (or 'timeout').
-spec notify(binary(), ljid(), binary(), xdata(),
             fun((iq() | timeout) -> any())) -> ok.
notify(LServer, PushLJID, Node, XData, HandleResponse) ->
    From = jid:make(LServer),
    Payload = xmpp:encode(#push_notification{}),
    Publish = #ps_publish{node = Node,
                          items = [#ps_item{xml_els = [Payload]}]},
    PubSub = #pubsub{publish = Publish, publish_options = XData},
    To = jid:make(PushLJID),
    Id = randoms:get_string(),
    IQ = #iq{type = set, from = From, to = To, id = Id, sub_els = [PubSub]},
    ejabberd_local:route_iq(IQ, HandleResponse),
    ok.

-spec store_session(binary(), binary(), timestamp(), jid(), binary(), xdata())
      -> {ok, push_session()} | error.
%% Store a push session, replacing any existing session for the same
%% PushJID/node pair. With caching enabled, the user's cached session list
%% is dropped and the per-session cache entry is updated.
store_session(LUser, LServer, TS, PushJID, Node, XData) ->
    Mod = gen_mod:db_mod(LServer, ?MODULE),
    delete_session(LUser, LServer, PushJID, Node),
    Store = fun() ->
                    Mod:store_session(LUser, LServer, TS, PushJID, Node, XData)
            end,
    case use_cache(Mod, LServer) of
        false ->
            Store();
        true ->
            ets_cache:delete(?PUSH_CACHE, {LUser, LServer},
                             cache_nodes(Mod, LServer)),
            Cached = {ok, {TS, PushJID, Node, XData}},
            ets_cache:update(?PUSH_CACHE, {LUser, LServer, TS}, Cached,
                             Store, cache_nodes(Mod, LServer))
    end.

%% Look up the push session with timestamp TS, via the cache if enabled.
-spec lookup_session(binary(), binary(), timestamp())
      -> {ok, push_session()} | error.
lookup_session(LUser, LServer, TS) ->
    Mod = gen_mod:db_mod(LServer, ?MODULE),
    Fetch = fun() -> Mod:lookup_session(LUser, LServer, TS) end,
    case use_cache(Mod, LServer) of
        false ->
            Fetch();
        true ->
            ets_cache:lookup(?PUSH_CACHE, {LUser, LServer, TS}, Fetch)
    end.

%% Look up all push sessions of a user, via the cache if enabled.
-spec lookup_sessions(binary(), binary()) -> {ok, [push_session()]} | error.
lookup_sessions(LUser, LServer) ->
    Mod = gen_mod:db_mod(LServer, ?MODULE),
    Fetch = fun() -> Mod:lookup_sessions(LUser, LServer) end,
    case use_cache(Mod, LServer) of
        false ->
            Fetch();
        true ->
            ets_cache:lookup(?PUSH_CACHE, {LUser, LServer}, Fetch)
    end.

%% Delete the push session with timestamp TS from the backend and, with
%% caching enabled, drop both cache entries that may refer to it. The
%% backend deletion is asserted to return ok.
-spec delete_session(binary(), binary(), timestamp()) -> ok | error.
delete_session(LUser, LServer, TS) ->
    Mod = gen_mod:db_mod(LServer, ?MODULE),
    ok = Mod:delete_session(LUser, LServer, TS),
    case use_cache(Mod, LServer) of
        false ->
            ok;
        true ->
            ets_cache:delete(?PUSH_CACHE, {LUser, LServer},
                             cache_nodes(Mod, LServer)),
            ets_cache:delete(?PUSH_CACHE, {LUser, LServer, TS},
                             cache_nodes(Mod, LServer))
    end.

%% Delete the push session registered for the given PushJID/node pair.
%% Returns 'error' if the backend lookup for it returns 'error'.
-spec delete_session(binary(), binary(), jid(), binary()) -> ok | error.
delete_session(LUser, LServer, PushJID, Node) ->
    Mod = gen_mod:db_mod(LServer, ?MODULE),
    case Mod:lookup_session(LUser, LServer, PushJID, Node) of
        error ->
            error;
        {ok, {TS, _, _, _}} ->
            delete_session(LUser, LServer, TS)
    end.

-spec delete_sessions(binary(), binary(), jid()) -> ok | error.
%% Delete every push session the user registered for PushJID.
delete_sessions(LUser, LServer, PushJID) ->
    Mod = gen_mod:db_mod(LServer, ?MODULE),
    LookupFun = fun() -> Mod:lookup_sessions(LUser, LServer, PushJID) end,
    delete_sessions(LUser, LServer, LookupFun, Mod).

%% Delete every session returned by LookupFun from the backend and, with
%% caching enabled, from the cache. Returns 'error' if the lookup fails.
%% Each backend deletion is asserted to return ok.
%%
%% LookupFun returns a session lookup result, as the `{ok, Clients}` match
%% below requires; the spec is written accordingly.
-spec delete_sessions(binary(), binary(),
                      fun(() -> {ok, [push_session()]} | error),
                      module()) -> ok | error.
delete_sessions(LUser, LServer, LookupFun, Mod) ->
    case LookupFun() of
        {ok, Clients} ->
            %% Whether caching is in use does not depend on the individual
            %% session, so ask once rather than once per session.
            UseCache = use_cache(Mod, LServer),
            case UseCache of
                true ->
                    ets_cache:delete(?PUSH_CACHE, {LUser, LServer},
                                     cache_nodes(Mod, LServer));
                false ->
                    ok
            end,
            lists:foreach(
              fun({TS, _, _, _}) ->
                      ok = Mod:delete_session(LUser, LServer, TS),
                      case UseCache of
                          true ->
                              ets_cache:delete(?PUSH_CACHE,
                                               {LUser, LServer, TS},
                                               cache_nodes(Mod, LServer));
                          false ->
                              ok
                      end
              end, Clients);
        error ->
            error
    end.

%% Return only those push sessions whose timestamp does not belong to one
%% of the user's current sessions.
%%
%% lists:keymember/3 is used because it returns a boolean. lists:keyfind/3
%% returns the matching tuple on a hit, and negating a tuple with `not`
%% raises badarg, which would crash the caller whenever one of the
%% sessions is online.
-spec drop_online_sessions(binary(), binary(), [push_session()])
      -> [push_session()].
drop_online_sessions(LUser, LServer, Clients) ->
    SessIDs = ejabberd_sm:get_session_sids(LUser, LServer),
    [Client || {TS, _, _, _} = Client <- Clients,
               not lists:keymember(TS, 1, SessIDs)].

%%--------------------------------------------------------------------
%% Caching.
%%--------------------------------------------------------------------

%% Create the push cache when caching is enabled for Host, otherwise
%% delete it.
-spec init_cache(module(), binary(), gen_mod:opts()) -> ok.
init_cache(Mod, Host, Opts) ->
    case use_cache(Mod, Host) of
        true ->
            CacheOpts = cache_opts(Host, Opts),
            ets_cache:new(?PUSH_CACHE, CacheOpts);
        false ->
            ets_cache:delete(?PUSH_CACHE)
    end.

%% Build the cache options from the module options, falling back to the
%% host-wide cache settings. A finite life time is given in seconds and is
%% converted to milliseconds.
-spec cache_opts(binary(), gen_mod:opts()) -> [proplists:property()].
cache_opts(Host, Opts) ->
    MaxSize = gen_mod:get_opt(
                cache_size, Opts,
                ejabberd_config:cache_size(Host)),
    CacheMissed = gen_mod:get_opt(
                    cache_missed, Opts,
                    ejabberd_config:cache_missed(Host)),
    LifeTime = case gen_mod:get_opt(
                      cache_life_time, Opts,
                      ejabberd_config:cache_life_time(Host)) of
                   infinity -> infinity;
                   I -> timer:seconds(I)
               end,
    [{max_size, MaxSize}, {cache_missed, CacheMissed}, {life_time, LifeTime}].

-spec use_cache(module(), binary()) -> boolean().
%% Decide whether caching is used for Host. A backend that exports
%% use_cache/1 decides for itself; otherwise the module's 'use_cache'
%% option applies, defaulting to the host-wide setting.
use_cache(Mod, Host) ->
    case erlang:function_exported(Mod, use_cache, 1) of
        false ->
            Default = ejabberd_config:use_cache(Host),
            gen_mod:get_module_opt(Host, ?MODULE, use_cache, Default);
        true ->
            Mod:use_cache(Host)
    end.

%% Return the nodes whose caches must be kept in sync. A backend that
%% exports cache_nodes/1 chooses them; otherwise all cluster nodes are used.
-spec cache_nodes(module(), binary()) -> [node()].
cache_nodes(Mod, Host) ->
    case erlang:function_exported(Mod, cache_nodes, 1) of
        false ->
            ejabberd_cluster:get_nodes();
        true ->
            Mod:cache_nodes(Host)
    end.