pubsub: prevent blocking when sending lots of items, send last items to connected resource only, and cosmetic changes

SVN Revision: 2005
This commit is contained in:
Christophe Romain 2009-04-08 15:53:46 +00:00
parent f80c848692
commit 5a6f837800
2 changed files with 169 additions and 158 deletions

View File

@ -75,6 +75,8 @@
unsubscribe_node/5,
publish_item/6,
delete_item/4,
send_items/4,
broadcast_stanza/6,
get_configure/5,
set_configure/5,
get_items/3,
@ -479,27 +481,30 @@ handle_call(stop, _From, State) ->
handle_cast({presence, JID}, State) ->
%% A new resource is available. send last published items
Host = State#state.host,
LJID = jlib:jid_tolower(JID),
%% for each node From is subscribed to
%% and if the node is so configured, send the last published item to From
lists:foreach(fun(Type) ->
{result, Subscriptions} = node_action(Type, get_entity_subscriptions, [Host, JID]),
lists:foreach(
fun({Node, subscribed, SubJID}) ->
case tree_action(Host, get_node, [Host, Node, JID]) of
#pubsub_node{options = Options} ->
case get_option(Options, send_last_published_item) of
on_sub_and_presence ->
send_last_item(Host, Node, SubJID);
_ ->
ok
end;
_ ->
ok
end;
spawn(fun() ->
lists:foreach(fun(Type) ->
{result, Subscriptions} = node_action(Type, get_entity_subscriptions, [Host, JID]),
lists:foreach(
fun({Node, subscribed, _SubJID}) ->
case tree_action(Host, get_node, [Host, Node, JID]) of
#pubsub_node{options = Options} ->
case get_option(Options, send_last_published_item) of
on_sub_and_presence ->
send_items(Host, Node, LJID, last);
_ ->
ok
end;
_ ->
ok
end;
(_) ->
ok
end, Subscriptions)
end, State#state.plugins),
ok
end, Subscriptions)
end, State#state.plugins)
end),
{noreply, State};
handle_cast({presence, User, Server, Resources, JID}, State) ->
@ -507,36 +512,38 @@ handle_cast({presence, User, Server, Resources, JID}, State) ->
Owner = jlib:jid_remove_resource(jlib:jid_tolower(JID)),
Host = State#state.host,
ServerHost = State#state.server_host,
lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, options = Options}) ->
case get_option(Options, send_last_published_item) of
on_sub_and_presence ->
lists:foreach(fun(Resource) ->
LJID = {User, Server, Resource},
case is_caps_notify(ServerHost, Node, LJID) of
true ->
Subscribed = case get_option(Options, access_model) of
open -> true;
presence -> true;
whitelist -> false; % subscribers are added manually
authorize -> false; % likewise
roster ->
Grps = get_option(Options, roster_groups_allowed, []),
{OU, OS, _} = Owner,
element(2, get_roster_info(OU, OS, LJID, Grps))
end,
if Subscribed ->
send_last_item(Owner, Node, LJID);
spawn(fun() ->
lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, options = Options}) ->
case get_option(Options, send_last_published_item) of
on_sub_and_presence ->
lists:foreach(fun(Resource) ->
LJID = {User, Server, Resource},
case is_caps_notify(ServerHost, Node, LJID) of
true ->
Subscribed = case get_option(Options, access_model) of
open -> true;
presence -> true;
whitelist -> false; % subscribers are added manually
authorize -> false; % likewise
roster ->
Grps = get_option(Options, roster_groups_allowed, []),
{OU, OS, _} = Owner,
element(2, get_roster_info(OU, OS, LJID, Grps))
end,
if Subscribed ->
send_items(Owner, Node, LJID, last);
true ->
ok
end;
false ->
ok
end;
false ->
ok
end
end, Resources);
_ ->
ok
end
end, tree_action(Host, get_nodes, [Owner, JID])),
end
end, Resources);
_ ->
ok
end
end, tree_action(Host, get_nodes, [Owner, JID]))
end),
{noreply, State};
handle_cast({remove_user, LUser, LServer}, State) ->
@ -836,10 +843,7 @@ iq_disco_items(Host, Item, From) ->
transaction(Host, Node, Action, sync_dirty)
end.
iq_local(From, To, #iq{type = Type,
sub_el = SubEl,
xmlns = XMLNS,
lang = Lang} = IQ) ->
iq_local(From, To, #iq{type = Type, sub_el = SubEl, xmlns = XMLNS, lang = Lang} = IQ) ->
ServerHost = To#jid.lserver,
%% Accept IQs to server only from our own users.
if
@ -1091,14 +1095,14 @@ find_authorization_response(Packet) ->
%% @spec (Host, JID, Node, Subscription) -> void
%% Host = mod_pubsub:host()
%% JID = jlib:jid()
%% Node = string()
%% SNode = string()
%% Subscription = atom()
%% Plugins = [Plugin::string()]
%% @doc Send a message to JID with the supplied Subscription
send_authorization_approval(Host, JID, Node, Subscription) ->
send_authorization_approval(Host, JID, SNode, Subscription) ->
Stanza = event_stanza(
[{xmlelement, "subscription",
[{"node", Node},
[{"node", SNode},
{"jid", jlib:jid_to_string(JID)},
{"subscription", subscription_to_string(Subscription)}],
[]}]),
@ -1450,7 +1454,7 @@ subscribe_node(Host, Node, From, JID) ->
{error, Error} ->
{error, Error};
{result, {Result, subscribed, send_last}} ->
send_last_item(Host, Node, Subscriber),
send_items(Host, Node, Subscriber, last),
case Result of
default -> {result, Reply(subscribed)};
_ -> {result, Result}
@ -1539,10 +1543,10 @@ publish_item(Host, ServerHost, Node, Publisher, ItemId, Payload) ->
PayloadSize > PayloadMaxSize ->
%% Entity attempts to publish very large payload
{error, extended_error(?ERR_NOT_ACCEPTABLE, "payload-too-big")};
PayloadCount > 1 ->
PayloadCount =/= 1 ->
%% Entity attempts to publish item with multiple payload elements
{error, extended_error(?ERR_BAD_REQUEST, "invalid-payload")};
Payload == "" ->
Payload == "" -> %% TODO better use PayloadSize == 0 ?
%% Publisher attempts to publish to payload node with no payload
{error, extended_error(?ERR_BAD_REQUEST, "payload-required")};
(DeliverPayloads == 0) and (PersistItems == 0) and (PayloadSize > 0) ->
@ -1760,18 +1764,9 @@ get_items(Host, Node, From, SubId, SMaxItems, ItemIDs) ->
end,
%% Generate the XML response (Item list), limiting the
%% number of items sent to MaxItems:
ItemsEls = lists:map(
fun(#pubsub_item{itemid = {ItemId, _},
payload = Payload}) ->
ItemAttrs = case ItemId of
"" -> [];
_ -> [{"id", ItemId}]
end,
{xmlelement, "item", ItemAttrs, Payload}
end, lists:sublist(SendItems, MaxItems)),
{result, [{xmlelement, "pubsub", [{"xmlns", ?NS_PUBSUB}],
[{xmlelement, "items", [{"node", node_to_string(Node)}],
ItemsEls}]}]}
itemsEls(lists:sublist(SendItems, MaxItems))}]}]}
end
end.
@ -1781,14 +1776,6 @@ get_items(Host, Node, From) ->
_ -> []
end.
%% @spec (Host, Node, LJID) -> any()
%% Host = host()
%% Node = pubsubNode()
%% LJID = {U, S, []}
%% @doc <p>Resend the last item of a node to the user.</p>
send_last_item(Host, Node, LJID) ->
send_items(Host, Node, LJID, last).
%% @spec (Host, Node, LJID, Number) -> any()
%% Host = host()
%% Node = pubsubNode()
@ -1821,17 +1808,9 @@ send_items(Host, Node, LJID, Number) ->
Items
end
end,
ItemsEls = lists:map(
fun(#pubsub_item{itemid = {ItemId, _}, payload = Payload}) ->
ItemAttrs = case ItemId of
"" -> [];
_ -> [{"id", ItemId}]
end,
{xmlelement, "item", ItemAttrs, Payload}
end, ToSend),
Stanza = event_stanza(
[{xmlelement, "items", [{"node", node_to_string(Node)}],
ItemsEls}]),
itemsEls(ToSend)}]),
ejabberd_router ! {route, service_jid(Host), jlib:make_jid(LJID), Stanza}.
%% @spec (Host, JID, Plugins) -> {error, Reason} | {result, Response}
@ -2210,6 +2189,7 @@ event_stanza(Els) ->
%%%%%% broadcast functions
broadcast_publish_item(Host, Node, ItemId, _From, Payload) ->
%broadcast(Host, Node, none, true, "items", ItemEls)
Action =
fun(#pubsub_node{options = Options, type = Type}) ->
case node_call(Type, get_states, [Host, Node]) of
@ -2220,15 +2200,10 @@ broadcast_publish_item(Host, Node, ItemId, _From, Payload) ->
true -> Payload;
false -> []
end,
ItemAttrs = case ItemId of
"" -> [];
_ -> [{"id", ItemId}]
end,
Stanza = event_stanza(
[{xmlelement, "items", [{"node", node_to_string(Node)}],
[{xmlelement, "item", ItemAttrs, Content}]}]),
broadcast_stanza(Host, Options, States, Stanza),
broadcast_by_caps(Host, Node, Type, Stanza),
[{xmlelement, "item", itemAttr(ItemId), Content}]}]),
broadcast_stanza(Host, Node, Type, Options, States, Stanza),
{result, true};
_ ->
{result, false}
@ -2239,6 +2214,7 @@ broadcast_publish_item(Host, Node, ItemId, _From, Payload) ->
broadcast_retract_items(Host, Node, ItemIds) ->
broadcast_retract_items(Host, Node, ItemIds, false).
broadcast_retract_items(Host, Node, ItemIds, ForceNotify) ->
%broadcast(Host, Node, notify_retract, ForceNotify, "retract", RetractEls)
Action =
fun(#pubsub_node{options = Options, type = Type}) ->
case (get_option(Options, notify_retract) or ForceNotify) of
@ -2247,19 +2223,11 @@ broadcast_retract_items(Host, Node, ItemIds, ForceNotify) ->
{result, []} ->
{result, false};
{result, States} ->
RetractEls = lists:map(
fun(ItemId) ->
ItemAttrs = case ItemId of
"" -> [];
_ -> [{"id", ItemId}]
end,
{xmlelement, "retract", ItemAttrs, []}
end, ItemIds),
RetractEls = [{xmlelement, "retract", itemAttr(ItemId), []} || ItemId <- ItemIds],
Stanza = event_stanza(
[{xmlelement, "items", [{"node", node_to_string(Node)}],
RetractEls}]),
broadcast_stanza(Host, Options, States, Stanza),
broadcast_by_caps(Host, Node, Type, Stanza),
broadcast_stanza(Host, Node, Type, Options, States, Stanza),
{result, true};
_ ->
{result, false}
@ -2271,6 +2239,7 @@ broadcast_retract_items(Host, Node, ItemIds, ForceNotify) ->
transaction(Host, Node, Action, sync_dirty).
broadcast_purge_node(Host, Node) ->
%broadcast(Host, Node, notify_retract, false, "purge", [])
Action =
fun(#pubsub_node{options = Options, type = Type}) ->
case get_option(Options, notify_retract) of
@ -2280,9 +2249,9 @@ broadcast_purge_node(Host, Node) ->
{result, false};
{result, States} ->
Stanza = event_stanza(
[{xmlelement, "purge", [{"node", node_to_string(Node)}], []}]),
broadcast_stanza(Host, Options, States, Stanza),
broadcast_by_caps(Host, Node, Type, Stanza),
[{xmlelement, "purge", [{"node", node_to_string(Node)}],
[]}]),
broadcast_stanza(Host, Node, Type, Options, States, Stanza),
{result, true};
_ ->
{result, false}
@ -2294,6 +2263,7 @@ broadcast_purge_node(Host, Node) ->
transaction(Host, Node, Action, sync_dirty).
broadcast_removed_node(Host, Node) ->
%broadcast(Host, Node, notify_delete, false, "delete", [])
Action =
fun(#pubsub_node{options = Options, type = Type}) ->
case get_option(Options, notify_delete) of
@ -2303,9 +2273,9 @@ broadcast_removed_node(Host, Node) ->
{result, false};
{result, States} ->
Stanza = event_stanza(
[{xmlelement, "delete", [{"node", node_to_string(Node)}], []}]),
broadcast_stanza(Host, Options, States, Stanza),
broadcast_by_caps(Host, Node, Type, Stanza),
[{xmlelement, "delete", [{"node", node_to_string(Node)}],
[]}]),
broadcast_stanza(Host, Node, Type, Options, States, Stanza),
{result, true};
_ ->
{result, false}
@ -2317,6 +2287,7 @@ broadcast_removed_node(Host, Node) ->
transaction(Host, Node, Action, sync_dirty).
broadcast_config_notification(Host, Node, Lang) ->
%broadcast(Host, Node, notify_config, false, "items", ConfigEls)
Action =
fun(#pubsub_node{options = Options, owners = Owners, type = Type}) ->
case get_option(Options, notify_config) of
@ -2334,10 +2305,8 @@ broadcast_config_notification(Host, Node, Lang) ->
end,
Stanza = event_stanza(
[{xmlelement, "items", [{"node", node_to_string(Node)}],
[{xmlelement, "item", [{"id", "configuration"}],
Content}]}]),
broadcast_stanza(Host, Options, States, Stanza),
broadcast_by_caps(Host, Node, Type, Stanza),
[{xmlelement, "item", itemAttr("configuration"), Content}]}]),
broadcast_stanza(Host, Node, Type, Options, States, Stanza),
{result, true};
_ ->
{result, false}
@ -2348,63 +2317,92 @@ broadcast_config_notification(Host, Node, Lang) ->
end,
transaction(Host, Node, Action, sync_dirty).
broadcast_stanza(Host, NodeOpts, States, Stanza) ->
PresenceDelivery = get_option(NodeOpts, presence_based_delivery),
BroadcastAll = get_option(NodeOpts, broadcast_all_resources), %% XXX this is not standard
% TODO: merge broadcast code that way
%broadcast(Host, Node, Feature, Force, ElName, SubEls) ->
% Action =
% fun(#pubsub_node{options = Options, type = Type}) ->
% case (get_option(Options, Feature) or Force) of
% true ->
% case node_call(Type, get_states, [Host, Node]) of
% {result, []} ->
% {result, false};
% {result, States} ->
% Stanza = event_stanza([{xmlelement, ElName, [{"node", node_to_string(Node)}], SubEls}]),
% broadcast_stanza(Host, Node, Type, Options, States, Stanza),
% {result, true};
% _ ->
% {result, false}
% end;
% _ ->
% {result, false}
% end
% end,
% transaction(Host, Node, Action, sync_dirty).
broadcast_stanza(Host, Node, _Type, Options, States, Stanza) ->
AccessModel = get_option(Options, access_model),
PresenceDelivery = get_option(Options, presence_based_delivery),
BroadcastAll = get_option(Options, broadcast_all_resources), %% XXX this is not standard, but usefull
From = service_jid(Host),
%% Handles explicit subscriptions
lists:foreach(fun(#pubsub_state{stateid = {LJID, _}, subscription = Subs}) ->
case is_to_deliver(LJID, Subs, PresenceDelivery) of
true ->
JIDs = case BroadcastAll of
true -> ejabberd_sm:get_user_resources(element(1, LJID), element(2, LJID));
false -> [LJID]
To = case BroadcastAll of
true -> jlib:jid_remove_resource(LJID);
false -> LJID
end,
lists:foreach(fun(JID) ->
ejabberd_router ! {route, From, jlib:make_jid(JID), Stanza}
end, JIDs);
ejabberd_router ! {route, From, jlib:make_jid(To), Stanza};
false ->
ok
end
end, States).
%% broadcast Stanza to all contacts of the user that are advertising
%% interest in this kind of Node.
broadcast_by_caps({LUser, LServer, LResource}, Node, _Type, Stanza) ->
SenderResource = case LResource of
[] -> hd(user_resources(LUser, LServer));
_ -> LResource
end,
case ejabberd_sm:get_session_pid(LUser, LServer, SenderResource) of
C2SPid when is_pid(C2SPid) ->
%% set the from address on the notification to the bare JID of the account owner
%% Also, add "replyto" if entity has presence subscription to the account owner
%% See XEP-0163 1.1 section 4.3.1
Sender = jlib:make_jid(LUser, LServer, ""),
%%ReplyTo = jlib:make_jid(LUser, LServer, SenderResource), % This has to be used
case catch ejabberd_c2s:get_subscribed(C2SPid) of
Contacts when is_list(Contacts) ->
lists:foreach(fun({U, S, _}) ->
JIDs = lists:foldl(fun(R, Acc) ->
LJID = {U, S, R},
case is_caps_notify(LServer, Node, LJID) of
true -> [LJID | Acc];
false -> Acc
end
end, [], user_resources(U, S)),
lists:foreach(fun(JID) ->
ejabberd_router ! {route, Sender, jlib:make_jid(JID), Stanza}
end, JIDs)
end, Contacts);
end, States),
%% Handles implicit presence subscriptions
case Host of
{LUser, LServer, LResource} ->
SenderResource = case LResource of
[] ->
case user_resources(LUser, LServer) of
[Resource|_] -> Resource;
_ -> ""
end;
_ ->
ok
LResource
end,
ok;
case ejabberd_sm:get_session_pid(LUser, LServer, SenderResource) of
C2SPid when is_pid(C2SPid) ->
%% set the from address on the notification to the bare JID of the account owner
%% Also, add "replyto" if entity has presence subscription to the account owner
%% See XEP-0163 1.1 section 4.3.1
Sender = jlib:make_jid(LUser, LServer, ""),
%%ReplyTo = jlib:make_jid(LUser, LServer, SenderResource), % This has to be used
case catch ejabberd_c2s:get_subscribed(C2SPid) of
Contacts when is_list(Contacts) ->
lists:foreach(fun({U, S, _}) ->
spawn(fun() ->
JIDs = lists:foldl(fun(R, Acc) ->
LJID = {U, S, R},
case is_caps_notify(LServer, Node, LJID) of
true -> [LJID | Acc];
false -> Acc
end
end, [], user_resources(U, S)),
lists:foreach(fun(JID) ->
ejabberd_router ! {route, Sender, jlib:make_jid(JID), Stanza}
end, JIDs)
end)
end, Contacts);
_ ->
ok
end,
ok;
_ ->
?DEBUG("~p@~p has no session; can't deliver ~p to contacts", [LUser, LServer, Stanza]),
ok
end;
_ ->
?DEBUG("~p@~p has no session; can't deliver ~p to contacts", [LUser, LServer, Stanza]),
ok
end;
broadcast_by_caps(_, _, _, _) ->
ok.
end.
%% If we don't know the resource, just pick first if any
%% If no resource available, check if caps anyway (remote online)
@ -2861,3 +2859,16 @@ uniqid() ->
{T1, T2, T3} = now(),
lists:flatten(io_lib:fwrite("~.16B~.16B~.16B", [T1, T2, T3])).
% node attributes %%% TODO to be used
nodeAttr(Node) ->
[{"node", node_to_string(Node)}].
% item attributes
itemAttr([]) -> [];
itemAttr(ItemId) -> [{"id", ItemId}].
% build item elements from item list
itemsEls(Items) ->
lists:map(fun(#pubsub_item{itemid = {ItemId, _}, payload = Payload}) ->
{xmlelement, "item", itemAttr(ItemId), Payload}
end, Items).

View File

@ -16,7 +16,7 @@
%%% This software is copyright 2006-2009, ProcessOne.
%%%
%%% @copyright 2006-2009 ProcessOne
%%% @author Christophe romain <christophe.romain@process-one.net>
%%% @author Christophe Romain <christophe.romain@process-one.net>
%%% [http://www.process-one.net/]
%%% @version {@vsn}, {@date} {@time}
%%% @end
@ -81,7 +81,7 @@ options() ->
{roster_groups_allowed, []},
{publish_model, publishers},
{max_payload_size, ?MAX_PAYLOAD_SIZE},
{send_last_published_item, never},
{send_last_published_item, on_sub_and_presence},
{deliver_notifications, true},
{presence_based_delivery, false}].
@ -166,7 +166,7 @@ get_items(Host, Node, JID, AccessModel, PresenceSubscription, RosterGroup, SubId
get_item(Host, Node, ItemId) ->
node_default:get_item(Host, Node, ItemId).
get_item(Host, Node, ItemId, JID, AccessModel, PresenceSubscription, RosterGroup, SubId) ->
node_default:get_item(Host, Node, ItemId, JID, AccessModel, PresenceSubscription, RosterGroup, SubId).