From 5a6f8378000e0ab17c5c603366ace023a127f0de Mon Sep 17 00:00:00 2001 From: Christophe Romain Date: Wed, 8 Apr 2009 15:53:46 +0000 Subject: [PATCH] pubsub: prevent blocking when sending lots of items, send last items to connected resource only, and cosmetic changes SVN Revision: 2005 --- src/mod_pubsub/mod_pubsub.erl | 321 ++++++++++++++++++---------------- src/mod_pubsub/node_flat.erl | 6 +- 2 files changed, 169 insertions(+), 158 deletions(-) diff --git a/src/mod_pubsub/mod_pubsub.erl b/src/mod_pubsub/mod_pubsub.erl index 5310df12b..e946a46a6 100644 --- a/src/mod_pubsub/mod_pubsub.erl +++ b/src/mod_pubsub/mod_pubsub.erl @@ -75,6 +75,8 @@ unsubscribe_node/5, publish_item/6, delete_item/4, + send_items/4, + broadcast_stanza/6, get_configure/5, set_configure/5, get_items/3, @@ -479,27 +481,30 @@ handle_call(stop, _From, State) -> handle_cast({presence, JID}, State) -> %% A new resource is available. send last published items Host = State#state.host, + LJID = jlib:jid_tolower(JID), %% for each node From is subscribed to %% and if the node is so configured, send the last published item to From - lists:foreach(fun(Type) -> - {result, Subscriptions} = node_action(Type, get_entity_subscriptions, [Host, JID]), - lists:foreach( - fun({Node, subscribed, SubJID}) -> - case tree_action(Host, get_node, [Host, Node, JID]) of - #pubsub_node{options = Options} -> - case get_option(Options, send_last_published_item) of - on_sub_and_presence -> - send_last_item(Host, Node, SubJID); - _ -> - ok - end; - _ -> - ok - end; + spawn(fun() -> + lists:foreach(fun(Type) -> + {result, Subscriptions} = node_action(Type, get_entity_subscriptions, [Host, JID]), + lists:foreach( + fun({Node, subscribed, _SubJID}) -> + case tree_action(Host, get_node, [Host, Node, JID]) of + #pubsub_node{options = Options} -> + case get_option(Options, send_last_published_item) of + on_sub_and_presence -> + send_items(Host, Node, LJID, last); + _ -> + ok + end; + _ -> + ok + end; (_) -> - ok - end, Subscriptions) - end, State#state.plugins), + ok + end, Subscriptions) + end, State#state.plugins) + end), {noreply, State}; handle_cast({presence, User, Server, Resources, JID}, State) -> @@ -507,36 +512,38 @@ handle_cast({presence, User, Server, Resources, JID}, State) -> Owner = jlib:jid_remove_resource(jlib:jid_tolower(JID)), Host = State#state.host, ServerHost = State#state.server_host, - lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, options = Options}) -> - case get_option(Options, send_last_published_item) of - on_sub_and_presence -> - lists:foreach(fun(Resource) -> - LJID = {User, Server, Resource}, - case is_caps_notify(ServerHost, Node, LJID) of - true -> - Subscribed = case get_option(Options, access_model) of - open -> true; - presence -> true; - whitelist -> false; % subscribers are added manually - authorize -> false; % likewise - roster -> - Grps = get_option(Options, roster_groups_allowed, []), - {OU, OS, _} = Owner, - element(2, get_roster_info(OU, OS, LJID, Grps)) - end, - if Subscribed -> - send_last_item(Owner, Node, LJID); + spawn(fun() -> + lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, options = Options}) -> + case get_option(Options, send_last_published_item) of + on_sub_and_presence -> + lists:foreach(fun(Resource) -> + LJID = {User, Server, Resource}, + case is_caps_notify(ServerHost, Node, LJID) of true -> + Subscribed = case get_option(Options, access_model) of + open -> true; + presence -> true; + whitelist -> false; % subscribers are added manually + authorize -> false; % likewise + roster -> + Grps = get_option(Options, roster_groups_allowed, []), + {OU, OS, _} = Owner, + element(2, get_roster_info(OU, OS, LJID, Grps)) + end, + if Subscribed -> + send_items(Owner, Node, LJID, last); + true -> + ok + end; + false -> ok - end; - false -> - ok - end - end, Resources); - _ -> - ok - end - end, tree_action(Host, get_nodes, [Owner, JID])), + end + end, Resources); + _ -> + ok + end + end, tree_action(Host, get_nodes, [Owner, JID])) + end), {noreply, State}; handle_cast({remove_user, LUser, LServer}, State) -> @@ -836,10 +843,7 @@ iq_disco_items(Host, Item, From) -> transaction(Host, Node, Action, sync_dirty) end. -iq_local(From, To, #iq{type = Type, - sub_el = SubEl, - xmlns = XMLNS, - lang = Lang} = IQ) -> +iq_local(From, To, #iq{type = Type, sub_el = SubEl, xmlns = XMLNS, lang = Lang} = IQ) -> ServerHost = To#jid.lserver, %% Accept IQs to server only from our own users. if @@ -1091,14 +1095,14 @@ find_authorization_response(Packet) -> %% @spec (Host, JID, Node, Subscription) -> void %% Host = mod_pubsub:host() %% JID = jlib:jid() -%% Node = string() +%% SNode = string() %% Subscription = atom() %% Plugins = [Plugin::string()] %% @doc Send a message to JID with the supplied Subscription -send_authorization_approval(Host, JID, Node, Subscription) -> +send_authorization_approval(Host, JID, SNode, Subscription) -> Stanza = event_stanza( [{xmlelement, "subscription", - [{"node", Node}, + [{"node", SNode}, {"jid", jlib:jid_to_string(JID)}, {"subscription", subscription_to_string(Subscription)}], []}]), @@ -1450,7 +1454,7 @@ subscribe_node(Host, Node, From, JID) -> {error, Error} -> {error, Error}; {result, {Result, subscribed, send_last}} -> - send_last_item(Host, Node, Subscriber), + send_items(Host, Node, Subscriber, last), case Result of default -> {result, Reply(subscribed)}; _ -> {result, Result} @@ -1539,10 +1543,10 @@ publish_item(Host, ServerHost, Node, Publisher, ItemId, Payload) -> PayloadSize > PayloadMaxSize -> %% Entity attempts to publish very large payload {error, extended_error(?ERR_NOT_ACCEPTABLE, "payload-too-big")}; - PayloadCount > 1 -> + PayloadCount =/= 1 -> %% Entity attempts to publish item with multiple payload elements {error, extended_error(?ERR_BAD_REQUEST, "invalid-payload")}; - Payload == "" -> + Payload == "" -> %% TODO better use PayloadSize == 0 ? %% Publisher attempts to publish to payload node with no payload {error, extended_error(?ERR_BAD_REQUEST, "payload-required")}; (DeliverPayloads == 0) and (PersistItems == 0) and (PayloadSize > 0) -> @@ -1760,18 +1764,9 @@ get_items(Host, Node, From, SubId, SMaxItems, ItemIDs) -> end, %% Generate the XML response (Item list), limiting the %% number of items sent to MaxItems: - ItemsEls = lists:map( - fun(#pubsub_item{itemid = {ItemId, _}, - payload = Payload}) -> - ItemAttrs = case ItemId of - "" -> []; - _ -> [{"id", ItemId}] - end, - {xmlelement, "item", ItemAttrs, Payload} - end, lists:sublist(SendItems, MaxItems)), {result, [{xmlelement, "pubsub", [{"xmlns", ?NS_PUBSUB}], [{xmlelement, "items", [{"node", node_to_string(Node)}], - ItemsEls}]}]} + itemsEls(lists:sublist(SendItems, MaxItems))}]}]} end end. @@ -1781,14 +1776,6 @@ get_items(Host, Node, From) -> _ -> [] end. -%% @spec (Host, Node, LJID) -> any() -%% Host = host() -%% Node = pubsubNode() -%% LJID = {U, S, []} -%% @doc

Resend the last item of a node to the user.

-send_last_item(Host, Node, LJID) -> - send_items(Host, Node, LJID, last). - %% @spec (Host, Node, LJID, Number) -> any() %% Host = host() %% Node = pubsubNode() @@ -1821,17 +1808,9 @@ send_items(Host, Node, LJID, Number) -> Items end end, - ItemsEls = lists:map( - fun(#pubsub_item{itemid = {ItemId, _}, payload = Payload}) -> - ItemAttrs = case ItemId of - "" -> []; - _ -> [{"id", ItemId}] - end, - {xmlelement, "item", ItemAttrs, Payload} - end, ToSend), Stanza = event_stanza( [{xmlelement, "items", [{"node", node_to_string(Node)}], - ItemsEls}]), + itemsEls(ToSend)}]), ejabberd_router ! {route, service_jid(Host), jlib:make_jid(LJID), Stanza}. %% @spec (Host, JID, Plugins) -> {error, Reason} | {result, Response} @@ -2210,6 +2189,7 @@ event_stanza(Els) -> %%%%%% broadcast functions broadcast_publish_item(Host, Node, ItemId, _From, Payload) -> + %broadcast(Host, Node, none, true, "items", ItemEls) Action = fun(#pubsub_node{options = Options, type = Type}) -> case node_call(Type, get_states, [Host, Node]) of @@ -2220,15 +2200,10 @@ broadcast_publish_item(Host, Node, ItemId, _From, Payload) -> true -> Payload; false -> [] end, - ItemAttrs = case ItemId of - "" -> []; - _ -> [{"id", ItemId}] - end, Stanza = event_stanza( [{xmlelement, "items", [{"node", node_to_string(Node)}], - [{xmlelement, "item", ItemAttrs, Content}]}]), - broadcast_stanza(Host, Options, States, Stanza), - broadcast_by_caps(Host, Node, Type, Stanza), + [{xmlelement, "item", itemAttr(ItemId), Content}]}]), + broadcast_stanza(Host, Node, Type, Options, States, Stanza), {result, true}; _ -> {result, false} @@ -2239,6 +2214,7 @@ broadcast_publish_item(Host, Node, ItemId, _From, Payload) -> broadcast_retract_items(Host, Node, ItemIds) -> broadcast_retract_items(Host, Node, ItemIds, false). broadcast_retract_items(Host, Node, ItemIds, ForceNotify) -> + %broadcast(Host, Node, notify_retract, ForceNotify, "retract", RetractEls) Action = fun(#pubsub_node{options = Options, type = Type}) -> case (get_option(Options, notify_retract) or ForceNotify) of @@ -2247,19 +2223,11 @@ broadcast_retract_items(Host, Node, ItemIds, ForceNotify) -> {result, []} -> {result, false}; {result, States} -> - RetractEls = lists:map( - fun(ItemId) -> - ItemAttrs = case ItemId of - "" -> []; - _ -> [{"id", ItemId}] - end, - {xmlelement, "retract", ItemAttrs, []} - end, ItemIds), + RetractEls = [{xmlelement, "retract", itemAttr(ItemId), []} || ItemId <- ItemIds], Stanza = event_stanza( [{xmlelement, "items", [{"node", node_to_string(Node)}], RetractEls}]), - broadcast_stanza(Host, Options, States, Stanza), - broadcast_by_caps(Host, Node, Type, Stanza), + broadcast_stanza(Host, Node, Type, Options, States, Stanza), {result, true}; _ -> {result, false} @@ -2271,6 +2239,7 @@ broadcast_retract_items(Host, Node, ItemIds, ForceNotify) -> transaction(Host, Node, Action, sync_dirty). broadcast_purge_node(Host, Node) -> + %broadcast(Host, Node, notify_retract, false, "purge", []) Action = fun(#pubsub_node{options = Options, type = Type}) -> case get_option(Options, notify_retract) of @@ -2280,9 +2249,9 @@ broadcast_purge_node(Host, Node) -> {result, false}; {result, States} -> Stanza = event_stanza( - [{xmlelement, "purge", [{"node", node_to_string(Node)}], []}]), - broadcast_stanza(Host, Options, States, Stanza), - broadcast_by_caps(Host, Node, Type, Stanza), + [{xmlelement, "purge", [{"node", node_to_string(Node)}], + []}]), + broadcast_stanza(Host, Node, Type, Options, States, Stanza), {result, true}; _ -> {result, false} @@ -2294,6 +2263,7 @@ broadcast_purge_node(Host, Node) -> transaction(Host, Node, Action, sync_dirty). broadcast_removed_node(Host, Node) -> + %broadcast(Host, Node, notify_delete, false, "delete", []) Action = fun(#pubsub_node{options = Options, type = Type}) -> case get_option(Options, notify_delete) of @@ -2303,9 +2273,9 @@ broadcast_removed_node(Host, Node) -> {result, false}; {result, States} -> Stanza = event_stanza( - [{xmlelement, "delete", [{"node", node_to_string(Node)}], []}]), - broadcast_stanza(Host, Options, States, Stanza), - broadcast_by_caps(Host, Node, Type, Stanza), + [{xmlelement, "delete", [{"node", node_to_string(Node)}], + []}]), + broadcast_stanza(Host, Node, Type, Options, States, Stanza), {result, true}; _ -> {result, false} @@ -2317,6 +2287,7 @@ broadcast_removed_node(Host, Node) -> transaction(Host, Node, Action, sync_dirty). broadcast_config_notification(Host, Node, Lang) -> + %broadcast(Host, Node, notify_config, false, "items", ConfigEls) Action = fun(#pubsub_node{options = Options, owners = Owners, type = Type}) -> case get_option(Options, notify_config) of @@ -2334,10 +2305,8 @@ broadcast_config_notification(Host, Node, Lang) -> end, Stanza = event_stanza( [{xmlelement, "items", [{"node", node_to_string(Node)}], - [{xmlelement, "item", [{"id", "configuration"}], - Content}]}]), - broadcast_stanza(Host, Options, States, Stanza), - broadcast_by_caps(Host, Node, Type, Stanza), + [{xmlelement, "item", itemAttr("configuration"), Content}]}]), + broadcast_stanza(Host, Node, Type, Options, States, Stanza), {result, true}; _ -> {result, false} @@ -2348,63 +2317,92 @@ broadcast_config_notification(Host, Node, Lang) -> end, transaction(Host, Node, Action, sync_dirty). -broadcast_stanza(Host, NodeOpts, States, Stanza) -> - PresenceDelivery = get_option(NodeOpts, presence_based_delivery), - BroadcastAll = get_option(NodeOpts, broadcast_all_resources), %% XXX this is not standard +% TODO: merge broadcast code that way +%broadcast(Host, Node, Feature, Force, ElName, SubEls) -> +% Action = +% fun(#pubsub_node{options = Options, type = Type}) -> +% case (get_option(Options, Feature) or Force) of +% true -> +% case node_call(Type, get_states, [Host, Node]) of +% {result, []} -> +% {result, false}; +% {result, States} -> +% Stanza = event_stanza([{xmlelement, ElName, [{"node", node_to_string(Node)}], SubEls}]), +% broadcast_stanza(Host, Node, Type, Options, States, Stanza), +% {result, true}; +% _ -> +% {result, false} +% end; +% _ -> +% {result, false} +% end +% end, +% transaction(Host, Node, Action, sync_dirty). + +broadcast_stanza(Host, Node, _Type, Options, States, Stanza) -> + AccessModel = get_option(Options, access_model), + PresenceDelivery = get_option(Options, presence_based_delivery), + BroadcastAll = get_option(Options, broadcast_all_resources), %% XXX this is not standard, but usefull From = service_jid(Host), + %% Handles explicit subscriptions lists:foreach(fun(#pubsub_state{stateid = {LJID, _}, subscription = Subs}) -> case is_to_deliver(LJID, Subs, PresenceDelivery) of true -> - JIDs = case BroadcastAll of - true -> ejabberd_sm:get_user_resources(element(1, LJID), element(2, LJID)); - false -> [LJID] + To = case BroadcastAll of + true -> jlib:jid_remove_resource(LJID); + false -> LJID end, - lists:foreach(fun(JID) -> - ejabberd_router ! {route, From, jlib:make_jid(JID), Stanza} - end, JIDs); + ejabberd_router ! {route, From, jlib:make_jid(To), Stanza}; false -> ok end - end, States). - -%% broadcast Stanza to all contacts of the user that are advertising -%% interest in this kind of Node. -broadcast_by_caps({LUser, LServer, LResource}, Node, _Type, Stanza) -> - SenderResource = case LResource of - [] -> hd(user_resources(LUser, LServer)); - _ -> LResource - end, - case ejabberd_sm:get_session_pid(LUser, LServer, SenderResource) of - C2SPid when is_pid(C2SPid) -> - %% set the from address on the notification to the bare JID of the account owner - %% Also, add "replyto" if entity has presence subscription to the account owner - %% See XEP-0163 1.1 section 4.3.1 - Sender = jlib:make_jid(LUser, LServer, ""), - %%ReplyTo = jlib:make_jid(LUser, LServer, SenderResource), % This has to be used - case catch ejabberd_c2s:get_subscribed(C2SPid) of - Contacts when is_list(Contacts) -> - lists:foreach(fun({U, S, _}) -> - JIDs = lists:foldl(fun(R, Acc) -> - LJID = {U, S, R}, - case is_caps_notify(LServer, Node, LJID) of - true -> [LJID | Acc]; - false -> Acc - end - end, [], user_resources(U, S)), - lists:foreach(fun(JID) -> - ejabberd_router ! {route, Sender, jlib:make_jid(JID), Stanza} - end, JIDs) - end, Contacts); + end, States), + %% Handles implicit presence subscriptions + case Host of + {LUser, LServer, LResource} -> + SenderResource = case LResource of + [] -> + case user_resources(LUser, LServer) of + [Resource|_] -> Resource; + _ -> "" + end; _ -> - ok + LResource end, - ok; + case ejabberd_sm:get_session_pid(LUser, LServer, SenderResource) of + C2SPid when is_pid(C2SPid) -> + %% set the from address on the notification to the bare JID of the account owner + %% Also, add "replyto" if entity has presence subscription to the account owner + %% See XEP-0163 1.1 section 4.3.1 + Sender = jlib:make_jid(LUser, LServer, ""), + %%ReplyTo = jlib:make_jid(LUser, LServer, SenderResource), % This has to be used + case catch ejabberd_c2s:get_subscribed(C2SPid) of + Contacts when is_list(Contacts) -> + lists:foreach(fun({U, S, _}) -> + spawn(fun() -> + JIDs = lists:foldl(fun(R, Acc) -> + LJID = {U, S, R}, + case is_caps_notify(LServer, Node, LJID) of + true -> [LJID | Acc]; + false -> Acc + end + end, [], user_resources(U, S)), + lists:foreach(fun(JID) -> + ejabberd_router ! {route, Sender, jlib:make_jid(JID), Stanza} + end, JIDs) + end) + end, Contacts); + _ -> + ok + end, + ok; + _ -> + ?DEBUG("~p@~p has no session; can't deliver ~p to contacts", [LUser, LServer, Stanza]), + ok + end; _ -> - ?DEBUG("~p@~p has no session; can't deliver ~p to contacts", [LUser, LServer, Stanza]), ok - end; -broadcast_by_caps(_, _, _, _) -> - ok. + end. %% If we don't know the resource, just pick first if any %% If no resource available, check if caps anyway (remote online) @@ -2861,3 +2859,16 @@ uniqid() -> {T1, T2, T3} = now(), lists:flatten(io_lib:fwrite("~.16B~.16B~.16B", [T1, T2, T3])). +% node attributes %%% TODO to be used +nodeAttr(Node) -> + [{"node", node_to_string(Node)}]. + +% item attributes +itemAttr([]) -> []; +itemAttr(ItemId) -> [{"id", ItemId}]. + +% build item elements from item list +itemsEls(Items) -> + lists:map(fun(#pubsub_item{itemid = {ItemId, _}, payload = Payload}) -> + {xmlelement, "item", itemAttr(ItemId), Payload} + end, Items). diff --git a/src/mod_pubsub/node_flat.erl b/src/mod_pubsub/node_flat.erl index 776ede574..a5abb26c1 100644 --- a/src/mod_pubsub/node_flat.erl +++ b/src/mod_pubsub/node_flat.erl @@ -16,7 +16,7 @@ %%% This software is copyright 2006-2009, ProcessOne. %%% %%% @copyright 2006-2009 ProcessOne -%%% @author Christophe romain +%%% @author Christophe Romain %%% [http://www.process-one.net/] %%% @version {@vsn}, {@date} {@time} %%% @end @@ -81,7 +81,7 @@ options() -> {roster_groups_allowed, []}, {publish_model, publishers}, {max_payload_size, ?MAX_PAYLOAD_SIZE}, - {send_last_published_item, never}, + {send_last_published_item, on_sub_and_presence}, {deliver_notifications, true}, {presence_based_delivery, false}]. @@ -166,7 +166,7 @@ get_items(Host, Node, JID, AccessModel, PresenceSubscription, RosterGroup, SubId get_item(Host, Node, ItemId) -> node_default:get_item(Host, Node, ItemId). - + get_item(Host, Node, ItemId, JID, AccessModel, PresenceSubscription, RosterGroup, SubId) -> node_default:get_item(Host, Node, ItemId, JID, AccessModel, PresenceSubscription, RosterGroup, SubId).