diff --git a/src/mod_pubsub.erl b/src/mod_pubsub.erl index 194d12444..a67ae5bfc 100644 --- a/src/mod_pubsub.erl +++ b/src/mod_pubsub.erl @@ -52,7 +52,7 @@ %% exports for hooks -export([presence_probe/3, caps_add/3, caps_update/3, in_subscription/6, out_subscription/4, - on_user_offline/3, remove_user/2, + on_user_online/1, on_user_offline/2, remove_user/2, disco_local_identity/5, disco_local_features/5, disco_local_items/5, disco_sm_identity/5, disco_sm_features/5, disco_sm_items/5, @@ -89,11 +89,7 @@ %% API and gen_server callbacks -export([start/2, stop/1, init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, depends/2, export/1]). - --export([send_loop/1, mod_opt_type/1]). - --define(LOOPNAME, ejabberd_mod_pubsub_loop). + terminate/2, code_change/3, depends/2, export/1, mod_opt_type/1]). %%==================================================================== %% API @@ -300,7 +296,9 @@ init([ServerHost, Opts]) -> ?MODULE, process_commands, IQDisc), Plugins end, Hosts), - ejabberd_hooks:add(sm_remove_connection_hook, ServerHost, + ejabberd_hooks:add(c2s_session_opened, ServerHost, + ?MODULE, on_user_online, 75), + ejabberd_hooks:add(c2s_terminated, ServerHost, ?MODULE, on_user_offline, 75), ejabberd_hooks:add(disco_local_identity, ServerHost, ?MODULE, disco_local_identity, 75), @@ -337,34 +335,16 @@ init([ServerHost, Opts]) -> false -> ok end, - {_, State} = init_send_loop(ServerHost, Hosts), - {ok, State}. - -init_send_loop(ServerHost, Hosts) -> NodeTree = config(ServerHost, nodetree), Plugins = config(ServerHost, plugins), - LastItemCache = config(ServerHost, last_item_cache), - MaxItemsNode = config(ServerHost, max_items_node), PepMapping = config(ServerHost, pep_mapping), - PepOffline = config(ServerHost, ignore_pep_from_offline), - Access = config(ServerHost, access), DBType = gen_mod:db_type(ServerHost, ?MODULE), - State = #state{hosts = Hosts, server_host = ServerHost, - access = Access, pep_mapping = PepMapping, - ignore_pep_from_offline = PepOffline, - last_item_cache = LastItemCache, - max_items_node = MaxItemsNode, nodetree = NodeTree, - plugins = Plugins, db_type = DBType}, - Proc = gen_mod:get_module_proc(ServerHost, ?LOOPNAME), - Pid = case whereis(Proc) of - undefined -> - SendLoop = spawn(?MODULE, send_loop, [State]), - register(Proc, SendLoop), - SendLoop; - Loop -> - Loop - end, - {Pid, State}. + {ok, #state{hosts = Hosts, server_host = ServerHost, + access = Access, pep_mapping = PepMapping, + ignore_pep_from_offline = PepOffline, + last_item_cache = LastItemCache, + max_items_node = MaxItemsNode, nodetree = NodeTree, + plugins = Plugins, db_type = DBType}}. depends(ServerHost, Opts) -> Host = gen_mod:get_opt_host(ServerHost, Opts, <<"pubsub.@HOST@">>), @@ -414,94 +394,6 @@ terminate_plugins(Host, ServerHost, Plugins, TreePlugin) -> TreePlugin:terminate(Host, ServerHost), ok. -get_subscribed(User, Server) -> - Items = ejabberd_hooks:run_fold(roster_get, Server, [], [{User, Server}]), - lists:filtermap( - fun(#roster{jid = LJID, subscription = Sub}) - when Sub == both orelse Sub == from -> - {true, LJID}; - (_) -> - false - end, Items). - -send_loop(State) -> - receive - {presence, JID, _Pid} -> - ServerHost = State#state.server_host, - Host = host(State#state.server_host), - DBType = State#state.db_type, - LJID = jid:tolower(JID), - BJID = jid:remove_resource(LJID), - lists:foreach( - fun(PType) -> - Subs = get_subscriptions_for_send_last(Host, PType, DBType, JID, LJID, BJID), - lists:foreach( - fun({NodeRec, _, _, SubJID}) -> - {_, Node} = NodeRec#pubsub_node.nodeid, - Nidx = NodeRec#pubsub_node.id, - Options = NodeRec#pubsub_node.options, - [send_items(Host, Node, Nidx, PType, Options, SubJID, last) - || NodeRec#pubsub_node.type == PType] - end, - lists:usort(Subs)) - end, - State#state.plugins), - if not State#state.ignore_pep_from_offline -> - {User, Server, Resource} = LJID, - Contacts = get_subscribed(User, Server), - lists:foreach( - fun({U, S, R}) when S == ServerHost -> - case user_resources(U, S) of - [] -> %% offline - PeerJID = jid:make(U, S, R), - self() ! {presence, User, Server, [Resource], PeerJID}; - _ -> %% online - %% this is already handled by presence probe - ok - end; - (_) -> - %% we can not do anything in any cases - ok - end, Contacts); - true -> - ok - end, - send_loop(State); - {presence, User, Server, Resources, JID} -> - spawn(fun() -> - Host = host(State#state.server_host), - Owner = jid:remove_resource(jid:tolower(JID)), - lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, type = Type, id = Nidx, options = Options}) -> - case match_option(Options, send_last_published_item, on_sub_and_presence) of - true -> - lists:foreach(fun(Resource) -> - LJID = {User, Server, Resource}, - Subscribed = case get_option(Options, access_model) of - open -> true; - presence -> true; - whitelist -> false; % subscribers are added manually - authorize -> false; % likewise - roster -> - Grps = get_option(Options, roster_groups_allowed, []), - {OU, OS, _} = Owner, - element(2, get_roster_info(OU, OS, LJID, Grps)) - end, - if Subscribed -> send_items(Owner, Node, Nidx, Type, Options, LJID, last); - true -> ok - end - end, - Resources); - _ -> - ok - end - end, - tree_action(Host, get_nodes, [Owner, JID])) - end), - send_loop(State); - stop -> - ok - end. - %% ------- %% disco hooks handling functions %% @@ -660,12 +552,12 @@ disco_items(Host, Node, From) -> end. %% ------- -%% presence hooks handling functions +%% presence and session hooks handling functions %% -spec caps_add(jid(), jid(), [binary()]) -> ok. -caps_add(#jid{luser = U, lserver = S, lresource = R}, #jid{lserver = Host} = JID, _Features) - when Host =/= S -> +caps_add(#jid{lserver = S1} = From, #jid{lserver = S2} = To, _Features) + when S1 =/= S2 -> %% When a remote contact goes online while the local user is offline, the %% remote contact won't receive last items from the local user even if %% ignore_pep_from_offline is set to false. To work around this issue a bit, @@ -675,30 +567,36 @@ caps_add(#jid{luser = U, lserver = S, lresource = R}, #jid{lserver = Host} = JID %% contact becomes available; the former is also executed when the local %% user goes online (because that triggers the contact to send a presence %% packet with CAPS). - presence(Host, {presence, U, S, [R], JID}); + send_last_pep(To, From); caps_add(_From, _To, _Feature) -> ok. -spec caps_update(jid(), jid(), [binary()]) -> ok. -caps_update(#jid{luser = U, lserver = S, lresource = R}, #jid{lserver = Host} = JID, _Features) -> - presence(Host, {presence, U, S, [R], JID}). +caps_update(From, To, _Features) -> + send_last_pep(To, From). -spec presence_probe(jid(), jid(), pid()) -> ok. presence_probe(#jid{luser = U, lserver = S}, #jid{luser = U, lserver = S}, _Pid) -> %% ignore presence_probe from my other ressources - %% to not get duplicated last items ok; -presence_probe(#jid{lserver = S} = From, #jid{lserver = S} = To, Pid) -> - presence(S, {presence, From, Pid}), - presence(S, {presence, From#jid.luser, S, [From#jid.lresource], To}); +presence_probe(#jid{lserver = S} = From, #jid{lserver = S} = To, _Pid) -> + send_last_pep(To, From); presence_probe(_From, _To, _Pid) -> - %% ignore presence_probe from remote contacts, - %% those are handled via caps_add + %% ignore presence_probe from remote contacts, those are handled via caps_add ok. -presence(ServerHost, Presence) -> - gen_mod:get_module_proc(ServerHost, ?LOOPNAME) ! Presence, - ok. +-spec on_user_online(ejabberd_c2s:state()) -> ejabberd_c2s:state(). +on_user_online(C2SState) -> + JID = maps:get(jid, C2SState), + send_last_items(JID), + C2SState. + +-spec on_user_offline(ejabberd_c2s:state(), atom()) -> ejabberd_c2s:state(). +on_user_offline(#{jid := JID} = C2SState, _Reason) -> + purge_offline(jid:tolower(JID)), + C2SState; +on_user_offline(C2SState, _Reason) -> + C2SState. %% ------- %% subscription hooks handling functions @@ -707,14 +605,8 @@ presence(ServerHost, Presence) -> -spec out_subscription( binary(), binary(), jid(), subscribed | unsubscribed | subscribe | unsubscribe) -> boolean(). -out_subscription(User, Server, JID, subscribed) -> - Owner = jid:make(User, Server), - {PUser, PServer, PResource} = jid:tolower(JID), - PResources = case PResource of - <<>> -> user_resources(PUser, PServer); - _ -> [PResource] - end, - presence(Server, {presence, PUser, PServer, PResources, Owner}), +out_subscription(User, Server, To, subscribed) -> + send_last_pep(jid:make(User, Server), To), true; out_subscription(_, _, _, _) -> true. @@ -883,7 +775,9 @@ terminate(_Reason, false -> ok end, - ejabberd_hooks:delete(sm_remove_connection_hook, ServerHost, + ejabberd_hooks:delete(c2s_session_opened, ServerHost, + ?MODULE, on_user_online, 75), + ejabberd_hooks:delete(c2s_terminated, ServerHost, ?MODULE, on_user_offline, 75), ejabberd_hooks:delete(disco_local_identity, ServerHost, ?MODULE, disco_local_identity, 75), @@ -901,12 +795,6 @@ terminate(_Reason, ?MODULE, remove_user, 50), ejabberd_hooks:delete(c2s_handle_info, ServerHost, ?MODULE, c2s_handle_info, 50), - case whereis(gen_mod:get_module_proc(ServerHost, ?LOOPNAME)) of - undefined -> - ?ERROR_MSG("~s process is dead, pubsub was broken", [?LOOPNAME]); - Pid -> - Pid ! stop - end, lists:foreach( fun(Host) -> gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO), @@ -1797,7 +1685,7 @@ subscribe_node(Host, Node, From, JID, Configuration) -> Nidx = TNode#pubsub_node.id, Type = TNode#pubsub_node.type, Options = TNode#pubsub_node.options, - send_items(Host, Node, Nidx, Type, Options, Subscriber, last), + send_items(Host, Node, Nidx, Type, Options, Subscriber, 1), ServerHost = serverhost(Host), ejabberd_hooks:run(pubsub_subscribe_node, ServerHost, [ServerHost, Host, Node, Subscriber, SubId]), @@ -2069,8 +1957,6 @@ purge_node(Host, Node, Owner) -> %% @doc
Return the items of a given node.
%%The number of items to return is limited by MaxItems.
%%The permission are not checked in this function.
-%% @todo We probably need to check that the user doing the query has the right -%% to read the items. -spec get_items(host(), binary(), jid(), binary(), binary(), [binary()], undefined | rsm_set()) -> {result, pubsub()} | {error, stanza_error()}. @@ -2153,43 +2039,23 @@ get_allowed_items_call(Host, Nidx, From, Type, Options, Owners, RSM) -> {PS, RG} = get_presence_and_roster_permissions(Host, From, Owners, AccessModel, AllowedGroups), node_call(Host, Type, get_items, [Nidx, From, AccessModel, PS, RG, undefined, RSM]). -get_last_items(Host, Type, Nidx, LJID, Count) -> +get_last_items(Host, Type, Nidx, LJID, 1) -> case get_cached_item(Host, Nidx) of undefined -> - case node_action(Host, Type, get_last_items, [Nidx, LJID, Count]) of + case node_action(Host, Type, get_last_items, [Nidx, LJID, 1]) of {result, Items} -> Items; - _ -> [] + _ -> [] end; LastItem -> [LastItem] - end. - -%% @docResend the items of a node to the user.
-%% @todo use cache-last-item feature -send_items(Host, Node, Nidx, Type, Options, LJID, last) -> - case get_last_items(Host, Type, Nidx, LJID, 1) of - [LastItem] -> - Stanza = items_event_stanza(Node, Options, [LastItem]), - dispatch_items(Host, LJID, Node, Stanza); - _ -> - ok end; -send_items(Host, Node, Nidx, Type, Options, LJID, Number) when Number > 0 -> - Stanza = items_event_stanza(Node, Options, get_last_items(Host, Type, Nidx, Number, LJID)), - dispatch_items(Host, LJID, Node, Stanza); -send_items(Host, Node, _Nidx, _Type, Options, LJID, _) -> - Stanza = items_event_stanza(Node, Options, []), - dispatch_items(Host, LJID, Node, Stanza). - -dispatch_items({FromU, FromS, FromR}, To, Node, Stanza) -> - SenderResource = user_resource(FromU, FromS, FromR), - ejabberd_sm:route(jid:make(FromU, FromS, SenderResource), - {send_filtered, {pep_message, <<((Node))/binary, "+notify">>}, - jid:make(FromU, FromS), jid:make(To), - Stanza}); -dispatch_items(From, To, _Node, Stanza) -> - ejabberd_router:route( - xmpp:set_from_to(Stanza, service_jid(From), jid:make(To))). +get_last_items(Host, Type, Nidx, LJID, Count) when Count > 1 -> + case node_action(Host, Type, get_last_items, [Nidx, LJID, Count]) of + {result, Items} -> Items; + _ -> [] + end; +get_last_items(_Host, _Type, _Nidx, _LJID, _Count) -> + []. %% @docReturn the list of affiliations as an XMPP response.
-spec get_affiliations(host(), binary(), jid(), [binary()]) -> @@ -2536,14 +2402,15 @@ get_subscriptions_for_send_last(Host, PType, sql, JID, LJID, BJID) -> {result, Subs} = node_action(Host, PType, get_entity_subscriptions_for_send_last, [Host, JID]), - [{Node, Sub, SubId, SubJID} + [{Node, SubId, SubJID} || {Node, Sub, SubId, SubJID} <- Subs, Sub =:= subscribed, (SubJID == LJID) or (SubJID == BJID)]; + % sql version already filter result by on_sub_and_presence get_subscriptions_for_send_last(Host, PType, _, JID, LJID, BJID) -> {result, Subs} = node_action(Host, PType, get_entity_subscriptions, [Host, JID]), - [{Node, Sub, SubId, SubJID} + [{Node, SubId, SubJID} || {Node, Sub, SubId, SubJID} <- Subs, Sub =:= subscribed, (SubJID == LJID) or (SubJID == BJID), match_option(Node, send_last_published_item, on_sub_and_presence)]. @@ -2932,8 +2799,9 @@ get_options_for_subs(Host, Nidx, Subs, true) -> broadcast_stanza(Host, _Node, _Nidx, _Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM) -> NotificationType = get_option(NodeOptions, notification_type, headline), BroadcastAll = get_option(NodeOptions, broadcast_all_resources), %% XXX this is not standard, but usefull - From = service_jid(Host), - Stanza = add_message_type(BaseStanza, NotificationType), + Stanza = add_message_type( + xmpp:set_from(BaseStanza, service_jid(Host)), + NotificationType), %% Handles explicit subscriptions SubIDsByJID = subscribed_nodes_by_jid(NotifyType, SubsByDepth), lists:foreach(fun ({LJID, _NodeName, SubIDs}) -> @@ -2956,7 +2824,7 @@ broadcast_stanza(Host, _Node, _Nidx, _Type, NodeOptions, SubsByDepth, NotifyType end, lists:foreach(fun(To) -> ejabberd_router:route( - xmpp:set_from_to(StanzaToSend, From, jid:make(To))) + xmpp:set_to(StanzaToSend, jid:make(To))) end, LJIDs) end, SubIDsByJID). @@ -2965,55 +2833,142 @@ broadcast_stanza({LUser, LServer, LResource}, Publisher, Node, Nidx, Type, NodeO %% Handles implicit presence subscriptions SenderResource = user_resource(LUser, LServer, LResource), NotificationType = get_option(NodeOptions, notification_type, headline), - Stanza = add_message_type(BaseStanza, NotificationType), + Stanza = add_message_type( + xmpp:set_from(BaseStanza, jid:make(LUser, LServer)), + NotificationType), %% set the from address on the notification to the bare JID of the account owner %% Also, add "replyto" if entity has presence subscription to the account owner %% See XEP-0163 1.1 section 4.3.1 ejabberd_sm:route(jid:make(LUser, LServer, SenderResource), {pep_message, <<((Node))/binary, "+notify">>, - jid:make(LUser, LServer), add_extended_headers( Stanza, extended_headers([Publisher]))}); broadcast_stanza(Host, _Publisher, Node, Nidx, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM) -> broadcast_stanza(Host, Node, Nidx, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM). -spec c2s_handle_info(ejabberd_c2s:state(), term()) -> ejabberd_c2s:state(). -c2s_handle_info(#{server := Server} = C2SState, - {pep_message, Feature, From, Packet}) -> - LServer = jid:nameprep(Server), - lists:foreach( - fun({USR, Caps}) -> - Features = mod_caps:get_features(LServer, Caps), - case lists:member(Feature, Features) of - true -> - To = jid:make(USR), - NewPacket = xmpp:set_from_to(Packet, From, To), - ejabberd_router:route(NewPacket); - false -> - ok - end - end, mod_caps:list_features(C2SState)), +c2s_handle_info(#{lserver := LServer} = C2SState, + {pep_message, Feature, Packet}) -> + [maybe_send_pep_stanza(LServer, USR, Caps, Feature, Packet) + || {USR, Caps} <- mod_caps:list_features(C2SState)], {stop, C2SState}; -c2s_handle_info(#{server := Server} = C2SState, - {send_filtered, {pep_message, Feature}, From, To, Packet}) -> - LServer = jid:nameprep(Server), - case mod_caps:get_user_caps(To, C2SState) of - {ok, Caps} -> - Features = mod_caps:get_features(LServer, Caps), - case lists:member(Feature, Features) of - true -> - NewPacket = xmpp:set_from_to(Packet, From, To), - ejabberd_router:route(NewPacket); - false -> - ok - end; - error -> - ok +c2s_handle_info(#{lserver := LServer} = C2SState, + {pep_message, Feature, Packet, USR}) -> + case mod_caps:get_user_caps(USR, C2SState) of + {ok, Caps} -> maybe_send_pep_stanza(LServer, USR, Caps, Feature, Packet); + error -> ok end, {stop, C2SState}; c2s_handle_info(C2SState, _) -> C2SState. +send_items(Host, Node, Nidx, Type, Options, LJID, Number) -> + send_items(Host, Node, Nidx, Type, Options, Host, LJID, LJID, Number). +send_items(Host, Node, Nidx, Type, Options, Publisher, SubLJID, ToLJID, Number) -> + case get_last_items(Host, Type, Nidx, SubLJID, Number) of + [] -> + ok; + Items -> + Stanza = items_event_stanza(Node, Options, Items), + send_stanza(Publisher, ToLJID, Node, Stanza) + end. + +send_stanza({LUser, LServer, _} = Publisher, USR, Node, BaseStanza) -> + Stanza = xmpp:set_from(BaseStanza, jid:make(LUser, LServer)), + USRs = case USR of + {PUser, PServer, <<>>} -> + [{PUser, PServer, PRessource} + || PRessource <- user_resources(PUser, PServer)]; + _ -> + [USR] + end, + [ejabberd_sm:route(jid:make(Publisher), + {pep_message, <<((Node))/binary, "+notify">>, + add_extended_headers( + Stanza, extended_headers([Publisher])), + To}) || To <- USRs]; +send_stanza(Host, USR, _Node, Stanza) -> + ejabberd_router:route( + xmpp:set_from_to(Stanza, service_jid(Host), jid:make(USR))). + +maybe_send_pep_stanza(LServer, USR, Caps, Feature, Packet) -> + Features = mod_caps:get_features(LServer, Caps), + case lists:member(Feature, Features) of + true -> + ejabberd_router:route(xmpp:set_to(Packet, jid:make(USR))); + false -> + ok + end. + +send_last_items(JID) -> + ServerHost = JID#jid.lserver, + Host = host(ServerHost), + DBType = config(ServerHost, db_type), + LJID = jid:tolower(JID), + BJID = jid:remove_resource(LJID), + lists:foreach( + fun(PType) -> + Subs = get_subscriptions_for_send_last(Host, PType, DBType, JID, LJID, BJID), + lists:foreach( + fun({#pubsub_node{nodeid = {_, Node}, type = Type, id = Nidx, + options = Options}, _, SubJID}) + when Type == PType-> + send_items(Host, Node, Nidx, PType, Options, Host, SubJID, LJID, 1); + (_) -> + ok + end, + lists:usort(Subs)) + end, config(ServerHost, plugins)). +% pep_from_offline hack can not work anymore, as sender c2s does not +% exists when sender is offline, so we can't get match receiver caps +% does it make sens to send PEP from an offline contact anyway ? +% case config(ServerHost, ignore_pep_from_offline) of +% false -> +% Roster = ejabberd_hooks:run_fold(roster_get, ServerHost, [], +% [{JID#jid.luser, ServerHost}]), +% lists:foreach( +% fun(#roster{jid = {U, S, R}, subscription = Sub}) +% when Sub == both orelse Sub == from, +% S == ServerHost -> +% case user_resources(U, S) of +% [] -> send_last_pep(jid:make(U, S, R), JID); +% _ -> ok %% this is already handled by presence probe +% end; +% (_) -> +% ok %% we can not do anything in any cases +% end, Roster); +% true -> +% ok +% end. +send_last_pep(From, To) -> + ServerHost = From#jid.lserver, + Host = host(ServerHost), + Publisher = jid:tolower(From), + Owner = jid:remove_resource(Publisher), + lists:foreach( + fun(#pubsub_node{nodeid = {_, Node}, type = Type, id = Nidx, options = Options}) -> + case match_option(Options, send_last_published_item, on_sub_and_presence) of + true -> + LJID = jid:tolower(To), + Subscribed = case get_option(Options, access_model) of + open -> true; + presence -> true; + whitelist -> false; % subscribers are added manually + authorize -> false; % likewise + roster -> + Grps = get_option(Options, roster_groups_allowed, []), + {OU, OS, _} = Owner, + element(2, get_roster_info(OU, OS, LJID, Grps)) + end, + if Subscribed -> send_items(Owner, Node, Nidx, Type, Options, Publisher, LJID, LJID, 1); + true -> ok + end; + _ -> + ok + end + end, + tree_action(Host, get_nodes, [Owner, From])). + subscribed_nodes_by_jid(NotifyType, SubsByDepth) -> NodesToDeliver = fun (Depth, Node, Subs, Acc) -> NodeName = case Node#pubsub_node.nodeid of @@ -3186,9 +3141,6 @@ node_owners_call(_Host, _Type, _Nidx, Owners) -> %% @docReturn the maximum number of items for a given node.
%%Unlimited means that there is no limit in the number of items that can %% be stored.
-%% @todo In practice, the current data structure means that we cannot manage -%% millions of items on a given node. This should be addressed in a new -%% version. -spec max_items(host(), [{atom(), any()}]) -> non_neg_integer(). max_items(Host, Options) -> case get_option(Options, persist_items) of @@ -3792,14 +3744,6 @@ subid_shim(SubIds) -> extended_headers(Jids) -> [#address{type = replyto, jid = Jid} || Jid <- Jids]. --spec on_user_offline(ejabberd_sm:sid(), jid(), ejabberd_sm:info()) -> ok. -on_user_offline(_, JID, _) -> - {User, Server, Resource} = jid:tolower(JID), - case user_resources(User, Server) of - [] -> purge_offline({User, Server, Resource}); - _ -> ok - end. - -spec purge_offline(ljid()) -> ok. purge_offline(LJID) -> Host = host(element(2, LJID)), @@ -3840,7 +3784,7 @@ purge_offline(LJID) -> end end, lists:usort(lists:flatten(Affs))); {Error, _} -> - ?DEBUG("on_user_offline ~p", [Error]) + ?ERROR_MSG("can not purge offline: ~p", [Error]) end. -spec purge_offline(host(), ljid(), binary()) -> ok | {error, stanza_error()}. @@ -3852,13 +3796,13 @@ purge_offline(Host, LJID, Node) -> {result, {[], _}} -> ok; {result, {Items, _}} -> - {User, Server, _} = LJID, + {User, Server, Resource} = LJID, PublishModel = get_option(Options, publish_model), ForceNotify = get_option(Options, notify_retract), {_, NodeId} = Node#pubsub_node.nodeid, lists:foreach(fun - (#pubsub_item{itemid = {ItemId, _}, modification = {_, {U, S, _}}}) - when (U == User) and (S == Server) -> + (#pubsub_item{itemid = {ItemId, _}, modification = {_, {U, S, R}}}) + when (U == User) and (S == Server) and (R == Resource) -> case node_action(Host, Type, delete_item, [Nidx, {U, S, <<>>}, PublishModel, ItemId]) of {result, {_, broadcast}} -> broadcast_retract_items(Host, NodeId, Nidx, Type, Options, [ItemId], ForceNotify),