25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-11-26 16:26:24 +01:00

Updated to trunk r2532, fix subscription managment (by owner)

SVN Revision: 2536
This commit is contained in:
Pablo Polvorin 2009-08-25 19:54:44 +00:00
parent fcf66c2e75
commit a1a6469ed0
3 changed files with 207 additions and 141 deletions

View File

@ -35,7 +35,7 @@
%%% Functions concerning configuration should be rewritten. %%% Functions concerning configuration should be rewritten.
%%% %%%
%%% Support for subscription-options and multi-subscribe features was %%% Support for subscription-options and multi-subscribe features was
%%% added by Brian Cully <bjc@kublai.com>. Subscriptions and options are %%% added by Brian Cully (bjc AT kublai.com). Subscriptions and options are
%%% stored in the pubsub_subscription table, with a link to them provided %%% stored in the pubsub_subscription table, with a link to them provided
%%% by the subscriptions field of pubsub_state. For information on %%% by the subscriptions field of pubsub_state. For information on
%%% subscription-options and mulit-subscribe see XEP-0060 sections 6.1.6, %%% subscription-options and mulit-subscribe see XEP-0060 sections 6.1.6,
@ -272,9 +272,9 @@ terminate_plugins(Host, ServerHost, Plugins, TreePlugin) ->
TreePlugin:terminate(Host, ServerHost), TreePlugin:terminate(Host, ServerHost),
ok. ok.
init_nodes(_Host, _ServerHost) -> init_nodes(Host, ServerHost) ->
%create_node(Host, ServerHost, ["home"], service_jid(Host), "hometree"), create_node(Host, ServerHost, ["home"], service_jid(Host), "hometree"),
%create_node(Host, ServerHost, ["home", ServerHost], service_jid(Host), "hometree"), create_node(Host, ServerHost, ["home", ServerHost], service_jid(Host), "hometree"),
ok. ok.
update_node_database(Host, ServerHost) -> update_node_database(Host, ServerHost) ->
@ -284,7 +284,7 @@ update_node_database(Host, ServerHost) ->
[host_node, host_parent, info] -> [host_node, host_parent, info] ->
?INFO_MSG("upgrade node pubsub tables",[]), ?INFO_MSG("upgrade node pubsub tables",[]),
F = fun() -> F = fun() ->
lists:foldl( {Result, LastIdx} = lists:foldl(
fun({pubsub_node, NodeId, ParentId, {nodeinfo, Items, Options, Entities}}, {RecList, NodeIdx}) -> fun({pubsub_node, NodeId, ParentId, {nodeinfo, Items, Options, Entities}}, {RecList, NodeIdx}) ->
ItemsList = ItemsList =
lists:foldl( lists:foldl(
@ -328,7 +328,9 @@ update_node_database(Host, ServerHost) ->
RecList], NodeIdx + 1} RecList], NodeIdx + 1}
end, {[], 1}, end, {[], 1},
mnesia:match_object( mnesia:match_object(
{pubsub_node, {Host, '_'}, '_', '_'})) {pubsub_node, {Host, '_'}, '_', '_'})),
mnesia:write(#pubsub_index{index = node, last = LastIdx, free = []}),
Result
end, end,
{atomic, NewRecords} = mnesia:transaction(F), {atomic, NewRecords} = mnesia:transaction(F),
{atomic, ok} = mnesia:delete_table(pubsub_node), {atomic, ok} = mnesia:delete_table(pubsub_node),
@ -341,11 +343,9 @@ update_node_database(Host, ServerHost) ->
end, end,
case mnesia:transaction(FNew) of case mnesia:transaction(FNew) of
{atomic, Result} -> {atomic, Result} ->
?INFO_MSG("Pubsub node tables updated correctly: ~p", ?INFO_MSG("Pubsub node tables updated correctly: ~p", [Result]);
[Result]);
{aborted, Reason} -> {aborted, Reason} ->
?ERROR_MSG("Problem updating Pubsub node tables:~n~p", ?ERROR_MSG("Problem updating Pubsub node tables:~n~p", [Reason])
[Reason])
end; end;
[nodeid, parentid, type, owners, options] -> [nodeid, parentid, type, owners, options] ->
F = fun({pubsub_node, NodeId, {_, Parent}, Type, Owners, Options}) -> F = fun({pubsub_node, NodeId, {_, Parent}, Type, Owners, Options}) ->
@ -359,7 +359,7 @@ update_node_database(Host, ServerHost) ->
end, end,
mnesia:transform_table(pubsub_node, F, [nodeid, id, parents, type, owners, options]), mnesia:transform_table(pubsub_node, F, [nodeid, id, parents, type, owners, options]),
FNew = fun() -> FNew = fun() ->
lists:foldl(fun(#pubsub_node{nodeid = NodeId} = PubsubNode, NodeIdx) -> LastIdx = lists:foldl(fun(#pubsub_node{nodeid = NodeId} = PubsubNode, NodeIdx) ->
mnesia:write(PubsubNode#pubsub_node{id = NodeIdx}), mnesia:write(PubsubNode#pubsub_node{id = NodeIdx}),
lists:foreach(fun(#pubsub_state{stateid = StateId} = State) -> lists:foreach(fun(#pubsub_state{stateid = StateId} = State) ->
{JID, _} = StateId, {JID, _} = StateId,
@ -379,15 +379,15 @@ update_node_database(Host, ServerHost) ->
end, 1, mnesia:match_object( end, 1, mnesia:match_object(
{pubsub_node, {Host, '_'}, '_', '_', '_', '_', '_'}) {pubsub_node, {Host, '_'}, '_', '_', '_', '_', '_'})
++ mnesia:match_object( ++ mnesia:match_object(
{pubsub_node, {{'_', ServerHost, '_'}, '_'}, '_', '_', '_', '_', '_'})) {pubsub_node, {{'_', ServerHost, '_'}, '_'}, '_', '_', '_', '_', '_'})),
mnesia:write(#pubsub_index{index = node, last = LastIdx, free = []})
end, end,
case mnesia:transaction(FNew) of case mnesia:transaction(FNew) of
{atomic, Result} -> {atomic, Result} ->
?INFO_MSG("Pubsub node tables updated correctly: ~p", rename_default_nodeplugin(),
[Result]); ?INFO_MSG("Pubsub node tables updated correctly: ~p", [Result]);
{aborted, Reason} -> {aborted, Reason} ->
?ERROR_MSG("Problem updating Pubsub node tables:~n~p", ?ERROR_MSG("Problem updating Pubsub node tables:~n~p", [Reason])
[Reason])
end; end;
[nodeid, id, parent, type, owners, options] -> [nodeid, id, parent, type, owners, options] ->
F = fun({pubsub_node, NodeId, Id, Parent, Type, Owners, Options}) -> F = fun({pubsub_node, NodeId, Id, Parent, Type, Owners, Options}) ->
@ -399,11 +399,17 @@ update_node_database(Host, ServerHost) ->
owners = Owners, owners = Owners,
options = Options} options = Options}
end, end,
mnesia:transform_table(pubsub_node, F, [nodeid, id, parents, type, owners, options]); mnesia:transform_table(pubsub_node, F, [nodeid, id, parents, type, owners, options]),
rename_default_nodeplugin();
_ -> _ ->
ok ok
end. end.
rename_default_nodeplugin() ->
lists:foreach(fun(Node) ->
mnesia:dirty_write(Node#pubsub_node{type = "hometree"})
end, mnesia:dirty_match_object(#pubsub_node{type = "default", _ = '_'})).
update_state_database(_Host, _ServerHost) -> update_state_database(_Host, _ServerHost) ->
case catch mnesia:table_info(pubsub_state, attributes) of case catch mnesia:table_info(pubsub_state, attributes) of
[stateid, items, affiliation, subscription] -> [stateid, items, affiliation, subscription] ->
@ -467,7 +473,7 @@ send_loop(State) ->
lists:foreach(fun(PType) -> lists:foreach(fun(PType) ->
{result, Subscriptions} = node_action(Host, PType, get_entity_subscriptions, [Host, JID]), {result, Subscriptions} = node_action(Host, PType, get_entity_subscriptions, [Host, JID]),
lists:foreach( lists:foreach(
fun({Node, subscribed, SubJID}) -> fun({Node, subscribed, _, SubJID}) ->
if (SubJID == LJID) or (SubJID == BJID) -> if (SubJID == LJID) or (SubJID == BJID) ->
#pubsub_node{options = Options, type = Type, id = NodeId} = Node, #pubsub_node{options = Options, type = Type, id = NodeId} = Node,
case get_option(Options, send_last_published_item) of case get_option(Options, send_last_published_item) of
@ -637,13 +643,12 @@ disco_sm_features(Acc, From, To, Node, _Lang) ->
end. end.
disco_sm_items(Acc, From, To, <<>>, _Lang) -> disco_sm_items(Acc, From, To, <<>>, _Lang) ->
%% TODO, use iq_disco_items(Host, [], From)
Host = exmpp_jid:prep_domain_as_list(To), Host = exmpp_jid:prep_domain_as_list(To),
LJID = jlib:short_prepd_bare_jid(To),
case tree_action(Host, get_subnodes, [Host, [], From]) of case tree_action(Host, get_subnodes, [Host, [], From]) of
[] -> [] ->
Acc; Acc;
Nodes -> Nodes ->
SBJID = jlib:short_prepd_bare_jid(To),
Items = case Acc of Items = case Acc of
{result, I} -> I; {result, I} -> I;
_ -> [] _ -> []
@ -651,8 +656,7 @@ disco_sm_items(Acc, From, To, <<>>, _Lang) ->
NodeItems = lists:map( NodeItems = lists:map(
fun(#pubsub_node{nodeid = {_, Node}}) -> fun(#pubsub_node{nodeid = {_, Node}}) ->
#xmlel{ns = ?NS_DISCO_ITEMS, name = 'item', attrs = #xmlel{ns = ?NS_DISCO_ITEMS, name = 'item', attrs =
[?XMLATTR('jid', exmpp_jid:to_binary(LJID)), [?XMLATTR('jid', exmpp_jid:to_binary(SBJID)) | nodeAttr(Node)]}
?XMLATTR('node', node_to_string(Node))]}
end, Nodes), end, Nodes),
{result, NodeItems ++ Items} {result, NodeItems ++ Items}
end; end;
@ -675,7 +679,7 @@ disco_sm_items(Acc, From, To, NodeB, _Lang) ->
fun(#pubsub_item{itemid = {Id, _}}) -> fun(#pubsub_item{itemid = {Id, _}}) ->
%% "jid" is required by XEP-0030, and %% "jid" is required by XEP-0030, and
%% "node" is forbidden by XEP-0060. %% "node" is forbidden by XEP-0060.
{result, Name} = node_action(Host, Node, get_item_name, [NodeId, Id]), {result, Name} = node_call(Type, get_item_name, [Host, Node, Id]),
#xmlel{ns = ?NS_DISCO_ITEMS, name = 'item', attrs = #xmlel{ns = ?NS_DISCO_ITEMS, name = 'item', attrs =
[?XMLATTR('jid', exmpp_jid:to_binary(LJID)), [?XMLATTR('jid', exmpp_jid:to_binary(LJID)),
?XMLATTR('name', Name)]} ?XMLATTR('name', Name)]}
@ -695,16 +699,16 @@ disco_sm_items(Acc, From, To, NodeB, _Lang) ->
%% %%
presence_probe(JID, JID, Pid) -> presence_probe(JID, JID, Pid) ->
{U, S, R} = jlib:short_prepd_jid(JID), {User, Server, Resource} = jlib:short_prepd_jid(JID),
Host = exmpp_jid:prep_domain_as_list(JID), Host = exmpp_jid:prep_domain_as_list(JID),
Proc = gen_mod:get_module_proc(Host, ?PROCNAME), Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
gen_server:cast(Proc, {presence, JID, Pid}), gen_server:cast(Proc, {presence, JID, Pid}),
gen_server:cast(Proc, {presence, U, S, [R], JID}); gen_server:cast(Proc, {presence, User, Server, [Resource], JID});
presence_probe(Peer, JID, _Pid) -> presence_probe(Peer, JID, _Pid) ->
{U, S, R} = jlib:short_prepd_jid(Peer), {User, Server, Resource} = jlib:short_prepd_jid(Peer),
Host = exmpp_jid:prep_domain_as_list(JID), Host = exmpp_jid:prep_domain_as_list(JID),
Proc = gen_mod:get_module_proc(Host, ?PROCNAME), Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
gen_server:cast(Proc, {presence, U, S, [R], JID}). gen_server:cast(Proc, {presence, User, Server, [Resource], JID}).
%% ------- %% -------
%% subscription hooks handling functions %% subscription hooks handling functions
@ -782,7 +786,7 @@ handle_cast({remove_user, LUser, LServer}, State) ->
lists:foreach(fun(PType) -> lists:foreach(fun(PType) ->
{result, Subscriptions} = node_action(Host, PType, get_entity_subscriptions, [Host, Owner]), {result, Subscriptions} = node_action(Host, PType, get_entity_subscriptions, [Host, Owner]),
lists:foreach(fun lists:foreach(fun
({#pubsub_node{nodeid = {H, N}}, subscribed, JID}) -> ({#pubsub_node{nodeid = {H, N}}, subscribed, _, JID}) ->
unsubscribe_node(H, N, Owner, JID, all); unsubscribe_node(H, N, Owner, JID, all);
(_) -> (_) ->
ok ok
@ -803,7 +807,7 @@ handle_cast({unsubscribe, Subscriber, Owner}, State) ->
lists:foreach(fun(PType) -> lists:foreach(fun(PType) ->
{result, Subscriptions} = node_action(Host, PType, get_entity_subscriptions, [Host, Subscriber]), {result, Subscriptions} = node_action(Host, PType, get_entity_subscriptions, [Host, Subscriber]),
lists:foreach(fun lists:foreach(fun
({Node, subscribed, JID}) -> ({Node, subscribed, _, JID}) ->
#pubsub_node{options = Options, owners = Owners, type = Type, id = NodeId} = Node, #pubsub_node{options = Options, owners = Owners, type = Type, id = NodeId} = Node,
case get_option(Options, access_model) of case get_option(Options, access_model) of
presence -> presence ->
@ -909,8 +913,7 @@ do_route(ServerHost, Access, Plugins, Host, From, To, Packet) ->
#iq{type = get, ns = ?NS_DISCO_INFO, #iq{type = get, ns = ?NS_DISCO_INFO,
payload = SubEl, lang = Lang} -> payload = SubEl, lang = Lang} ->
QAttrs = SubEl#xmlel.attrs, QAttrs = SubEl#xmlel.attrs,
Node = exmpp_xml:get_attribute_from_list_as_list(QAttrs, Node = exmpp_xml:get_attribute_from_list_as_list(QAttrs, 'node', ""),
'node', ""),
ServerHostB = list_to_binary(ServerHost), ServerHostB = list_to_binary(ServerHost),
Info = ejabberd_hooks:run_fold( Info = ejabberd_hooks:run_fold(
disco_info, ServerHostB, [], disco_info, ServerHostB, [],
@ -1524,15 +1527,20 @@ find_authorization_response(Packet) ->
%% Host = mod_pubsub:host() %% Host = mod_pubsub:host()
%% JID = jlib:jid() %% JID = jlib:jid()
%% SNode = string() %% SNode = string()
%% Subscription = atom() %% Subscription = atom() | {atom(), mod_pubsub:subid)}
%% Plugins = [Plugin::string()] %% Plugins = [Plugin::string()]
%% @doc Send a message to JID with the supplied Subscription %% @doc Send a message to JID with the supplied Subscription
send_authorization_approval(Host, JID, SNode, Subscription) -> send_authorization_approval(Host, JID, SNode, Subscription) ->
SubAttrs = case Subscription of
{S, SID} -> [?XMLATTR('subscription', subscription_to_string(S)),
?XMLATTR('subid', SID)];
S -> [?XMLATTR('subscription', subscription_to_string(S))]
end,
Stanza = event_stanza( Stanza = event_stanza(
[#xmlel{ns = ?NS_PUBSUB_EVENT, name = 'subscription', attrs = [#xmlel{ns = ?NS_PUBSUB_EVENT, name = 'subscription', attrs =
[?XMLATTR('node', SNode), [?XMLATTR('node', SNode),
?XMLATTR('jid', exmpp_jid:to_binary(JID)), ?XMLATTR('jid', exmpp_jid:to_binary(JID))] ++ SubAttrs
?XMLATTR('subscription', subscription_to_string(Subscription))]}]), }]),
ejabberd_router ! {route, service_jid(Host), JID, Stanza}. ejabberd_router ! {route, service_jid(Host), JID, Stanza}.
handle_authorization_response(Host, From, To, Packet, XFields) -> handle_authorization_response(Host, From, To, Packet, XFields) ->
@ -1551,14 +1559,14 @@ handle_authorization_response(Host, From, To, Packet, XFields) ->
"true" -> true; "true" -> true;
_ -> false _ -> false
end, end,
Action = fun(#pubsub_node{type = Type, id = NodeId, owners = Owners}) -> Action = fun(#pubsub_node{type = Type, owners = Owners, id = NodeId}) ->
IsApprover = lists:member(jlib:short_prepd_bare_jid(From), Owners), IsApprover = lists:member(jlib:short_prepd_bare_jid(From), Owners),
{result, Subscriptions} = node_call(Type, get_subscriptions, [NodeId, Subscriber]), {result, Subscriptions} = node_call(Type, get_subscriptions, [NodeId, Subscriber]),
if if
not IsApprover -> not IsApprover ->
{error, 'forbidden'}; {error, 'forbidden'};
true -> true ->
update_auth(Host, SNode, Type, NodeId, update_auth(Host, SNode, Type, NodeId,
Subscriber, Allow, Subscriber, Allow,
Subscriptions) Subscriptions)
end end
@ -1760,8 +1768,8 @@ create_node(Host, ServerHost, Node, Owner, GivenType, Access, Configuration) ->
{result, Result}; {result, Result};
Error -> Error ->
%% in case we change transaction to sync_dirty... %% in case we change transaction to sync_dirty...
%% node_call(Type, delete_node, [NodeId]), %% node_call(Type, delete_node, [Host, Node]),
%% tree_call(Host, delete_node, [NodeId]), %% tree_call(Host, delete_node, [Host, Node]),
Error Error
end; end;
Error -> Error ->
@ -1786,10 +1794,14 @@ delete_node(_Host, [], _Owner) ->
{error, 'not-allowed'}; {error, 'not-allowed'};
delete_node(Host, Node, Owner) -> delete_node(Host, Node, Owner) ->
Action = fun(#pubsub_node{type = Type, id = NodeId}) -> Action = fun(#pubsub_node{type = Type, id = NodeId}) ->
SubsByDepth = get_collection_subscriptions(Host, Node),
case node_call(Type, get_affiliation, [NodeId, Owner]) of case node_call(Type, get_affiliation, [NodeId, Owner]) of
{result, owner} -> {result, owner} ->
Removed = tree_call(Host, delete_node, [NodeId]), Removed = tree_call(Host, delete_node, [Host, Node]),
node_call(Type, delete_node, [Removed]); case node_call(Type, delete_node, [Removed]) of
{result, Res} -> {result, {SubsByDepth, Res}};
Error -> Error
end;
_ -> _ ->
%% Entity is not an owner %% Entity is not an owner
{error, 'forbidden'} {error, 'forbidden'}
@ -1797,27 +1809,27 @@ delete_node(Host, Node, Owner) ->
end, end,
Reply = [], Reply = [],
case transaction(Host, Node, Action, transaction) of case transaction(Host, Node, Action, transaction) of
{result, {_, {Result, broadcast, Removed}}} -> {result, {_, {SubsByDepth, {Result, broadcast, Removed}}}} ->
lists:foreach(fun({RNode, RSubscriptions}) -> lists:foreach(fun({RNode, _RSubscriptions}) ->
{RH, RN} = RNode#pubsub_node.nodeid, {RH, RN} = RNode#pubsub_node.nodeid,
NodeId = RNode#pubsub_node.id, NodeId = RNode#pubsub_node.id,
Type = RNode#pubsub_node.type, Type = RNode#pubsub_node.type,
Options = RNode#pubsub_node.options, Options = RNode#pubsub_node.options,
broadcast_removed_node(RH, RN, NodeId, Type, Options, RSubscriptions), broadcast_removed_node(RH, RN, NodeId, Type, Options, SubsByDepth),
unset_cached_item(RH, NodeId) unset_cached_item(RH, NodeId)
end, Removed), end, Removed),
case Result of case Result of
default -> {result, Reply}; default -> {result, Reply};
_ -> {result, Result} _ -> {result, Result}
end; end;
{result, {_, {Result, _Removed}}} -> {result, {_, {_, {Result, _Removed}}}} ->
case Result of case Result of
default -> {result, Reply}; default -> {result, Reply};
_ -> {result, Result} _ -> {result, Result}
end; end;
{result, {_, default}} -> {result, {_, {_, default}}} ->
{result, Reply}; {result, Reply};
{result, {_, Result}} -> {result, {_, {_, Result}}} ->
{result, Result}; {result, Result};
Error -> Error ->
Error Error
@ -1858,7 +1870,7 @@ subscribe_node(Host, Node, From, JID, Configuration) ->
Features = features(Type), Features = features(Type),
SubscribeFeature = lists:member("subscribe", Features), SubscribeFeature = lists:member("subscribe", Features),
OptionsFeature = lists:member("subscription-options", Features), OptionsFeature = lists:member("subscription-options", Features),
HasOptions = not (SubOpts == []), HasOptions = not (SubOpts == []),
SubscribeConfig = get_option(Options, subscribe), SubscribeConfig = get_option(Options, subscribe),
AccessModel = get_option(Options, access_model), AccessModel = get_option(Options, access_model),
SendLast = get_option(Options, send_last_published_item), SendLast = get_option(Options, send_last_published_item),
@ -1898,35 +1910,37 @@ subscribe_node(Host, Node, From, JID, Configuration) ->
end, end,
Reply = fun(Subscription) -> Reply = fun(Subscription) ->
%% TODO, this is subscription-notification, should depends on node features %% TODO, this is subscription-notification, should depends on node features
SubAttrs = case Subscription of
{subscribed, SubId} ->
[{"subscription", subscription_to_string(subscribed)},
{"subid", SubId}];
Other ->
[{"subscription", subscription_to_string(Other)}]
end,
Fields = Fields =
[?XMLATTR('node', node_to_string(Node)), [ ?XMLATTR('jid', exmpp_jid:to_binary(Subscriber)) | SubAttrs],
?XMLATTR('jid', exmpp_jid:to_binary(Subscriber)),
?XMLATTR('subscription', subscription_to_string(Subscription))],
#xmlel{ns = ?NS_PUBSUB, name = 'pubsub', children = #xmlel{ns = ?NS_PUBSUB, name = 'pubsub', children =
[#xmlel{ns = ?NS_PUBSUB, name = 'subscription', attrs = [#xmlel{ns = ?NS_PUBSUB, name = 'subscription', attrs = Fields}]}
case Subscription of
subscribed -> [?XMLATTR('subid', SubId)|Fields];
_ -> Fields
end}]}
end, end,
case transaction(Host, Node, Action, sync_dirty) of case transaction(Host, Node, Action, sync_dirty) of
{result, {TNode, {Result, subscribed, send_last}}} -> {result, {TNode, {Result, subscribed, SubId, send_last}}} ->
NodeId = TNode#pubsub_node.id, NodeId = TNode#pubsub_node.id,
Type = TNode#pubsub_node.type, Type = TNode#pubsub_node.type,
send_items(Host, Node, NodeId, Type, Subscriber, last), send_items(Host, Node, NodeId, Type, Subscriber, last),
case Result of case Result of
default -> {result, Reply(subscribed)}; default -> {result, Reply({subscribed, SubId})};
_ -> {result, Result}
end;
{result, {TNode, {Result, Subscription}}} ->
case Subscription of
pending -> send_authorization_request(TNode, Subscriber);
_ -> ok
end,
case Result of
default -> {result, Reply(Subscription)};
_ -> {result, Result} _ -> {result, Result}
end; end;
{result, {_TNode, {default, subscribed, SubId}}} ->
{result, Reply({subscribed, SubId})};
{result, {_TNode, {Result, subscribed, _SubId}}} ->
{result, Result};
{result, {TNode, {default, pending, _SubId}}} ->
send_authorization_request(TNode, Subscriber),
{result, Reply(pending)};
{result, {TNode, {Result, pending}}} ->
send_authorization_request(TNode, Subscriber),
{result, Result};
{result, {_, Result}} -> {result, {_, Result}} ->
%% this case should never occure anyway %% this case should never occure anyway
{result, Result}; {result, Result};
@ -1992,7 +2006,7 @@ publish_item(Host, ServerHost, Node, Publisher, ItemId, Payload) ->
Features = features(Type), Features = features(Type),
PublishFeature = lists:member("publish", Features), PublishFeature = lists:member("publish", Features),
PublishModel = get_option(Options, publish_model), PublishModel = get_option(Options, publish_model),
MaxItems = max_items(Options), MaxItems = max_items(Host, Options),
DeliverPayloads = get_option(Options, deliver_payloads), DeliverPayloads = get_option(Options, deliver_payloads),
PersistItems = get_option(Options, persist_items), PersistItems = get_option(Options, persist_items),
PayloadCount = payload_xmlelements(Payload), PayloadCount = payload_xmlelements(Payload),
@ -2276,7 +2290,7 @@ get_item(Host, Node, ItemId) ->
end. end.
%% @spec (Host, Node, NodeId, Type, LJID, Number) -> any() %% @spec (Host, Node, NodeId, Type, LJID, Number) -> any()
%% Host = host() %% Host = pubsubHost()
%% Node = pubsubNode() %% Node = pubsubNode()
%% NodeId = pubsubNodeId() %% NodeId = pubsubNodeId()
%% Type = pubsubNodeType() %% Type = pubsubNodeType()
@ -2651,6 +2665,7 @@ get_subscriptions(Host, Node, JID) ->
{result, {_, Subscriptions}} -> {result, {_, Subscriptions}} ->
Entities = lists:flatmap( Entities = lists:flatmap(
fun({_, none}) -> []; fun({_, none}) -> [];
({_, pending, _}) -> [];
({{AU, AS, AR}, Subscription}) -> ({{AU, AS, AR}, Subscription}) ->
[#xmlel{ns = ?NS_PUBSUB_OWNER, name = 'subscription', attrs = [#xmlel{ns = ?NS_PUBSUB_OWNER, name = 'subscription', attrs =
[?XMLATTR('jid', exmpp_jid:to_binary(AU, AS, AR)), [?XMLATTR('jid', exmpp_jid:to_binary(AU, AS, AR)),
@ -2681,19 +2696,20 @@ set_subscriptions(Host, Node, From, EntitiesEls) ->
#xmlel{name = 'subscription', attrs = Attrs} -> #xmlel{name = 'subscription', attrs = Attrs} ->
JID = try JID = try
exmpp_jid:parse( exmpp_jid:parse(
exmpp_xml:get_attribute_from_list_as_list(Attrs, 'jid', "")) exmpp_xml:get_attribute_from_list(Attrs, 'jid', ""))
catch catch
_:_ -> _:_ ->
error error
end, end,
Subscription = string_to_subscription( Subscription = string_to_subscription(
exmpp_xml:get_attribute_from_list_as_list(Attrs, 'subscription', false)), exmpp_xml:get_attribute_from_list_as_list(Attrs, 'subscription', false)),
SubId = exmpp_xml:get_attribute_from_list_as_list(Attrs, "subid", false),
if if
(JID == error) or (JID == error) or
(Subscription == false) -> (Subscription == false) ->
error; error;
true -> true ->
[{jlib:short_prepd_jid(JID), Subscription} | Acc] [{JID, Subscription, SubId} | Acc]
end end
end end
end end
@ -2702,18 +2718,38 @@ set_subscriptions(Host, Node, From, EntitiesEls) ->
error -> error ->
{error, 'bad-request'}; {error, 'bad-request'};
_ -> _ ->
Action = fun(#pubsub_node{type = Type, id = NodeId, owners = Owners}) -> Notify = fun(JID, Sub, _SubId) ->
case lists:member(Owner, Owners) of Stanza = #xmlel{ns = ?NS_JABBER_CLIENT,
true -> name = 'message',
lists:foreach( children =
fun({JID, Subscription}) -> [#xmlel{ns = ?NS_PUBSUB,
node_call(Type, set_subscription, [NodeId, JID, Subscription]) name = 'pubsub',
end, Entities), children =
{result, []}; [#xmlel{ns = ?NS_PUBSUB,
_ -> name = 'subscription',
{error, 'forbidden'} attrs = [?XMLATTR('jid', exmpp_jid:to_binary(JID)),
end ?XMLATTR('subsription', subscription_to_string(Sub)) | nodeAttr(Node)]}]}]},
end,
ejabberd_router ! {route, service_jid(Host), JID, Stanza}
end,
Action = fun(#pubsub_node{owners = Owners, type = Type, id = NodeId}) ->
case lists:member(Owner, Owners) of
true ->
Result = lists:foldl(fun({JID, Subscription, SubId}, Acc) ->
case node_call(Type, set_subscriptions, [NodeId, JID, Subscription, SubId]) of
{error, Err} -> [{error, Err} | Acc];
_ -> Notify(JID, Subscription, SubId), Acc
end
end, [], Entities),
case Result of
[] -> {result, []};
_ -> {error, 'not-acceptable'}
end;
_ ->
{error, 'forbidden'}
end
end,
case transaction(Host, Node, Action, sync_dirty) of case transaction(Host, Node, Action, sync_dirty) of
{result, {_, Result}} -> {result, Result}; {result, {_, Result}} -> {result, Result};
Other -> Other Other -> Other
@ -2799,7 +2835,7 @@ service_jid(Host) ->
_ -> exmpp_jid:make(Host) _ -> exmpp_jid:make(Host)
end. end.
%% @spec (LJID, PresenceDelivery) -> boolean() %% @spec (LJID, NotifyType, Depth, NodeOptions, SubOptions) -> boolean()
%% LJID = jid() %% LJID = jid()
%% NotifyType = items | nodes %% NotifyType = items | nodes
%% Depth = integer() %% Depth = integer()
@ -2889,13 +2925,13 @@ broadcast_publish_item(Host, Node, NodeId, Type, Options, Removed, ItemId, _From
{result, false} {result, false}
end. end.
broadcast_retract_items(Host, Node, NodeId, Type, Options, ItemIds) -> broadcast_retract_items(Host, Node, NodeId, Type, NodeOptions, ItemIds) ->
broadcast_retract_items(Host, Node, NodeId, Type, Options, ItemIds, false). broadcast_retract_items(Host, Node, NodeId, Type, NodeOptions, ItemIds, false).
broadcast_retract_items(_Host, _Node, _NodeId, _Type, _Options, [], _ForceNotify) -> broadcast_retract_items(_Host, _Node, _NodeId, _Type, _NodeOptions, [], _ForceNotify) ->
{result, false}; {result, false};
broadcast_retract_items(Host, Node, NodeId, Type, Options, ItemIds, ForceNotify) -> broadcast_retract_items(Host, Node, NodeId, Type, NodeOptions, ItemIds, ForceNotify) ->
%broadcast(Host, Node, NodeId, Options, notify_retract, ForceNotify, 'retract', RetractEls) %broadcast(Host, Node, NodeId, NodeOptions, notify_retract, ForceNotify, 'retract', RetractEls)
case (get_option(Options, notify_retract) or ForceNotify) of case (get_option(NodeOptions, notify_retract) or ForceNotify) of
true -> true ->
case get_collection_subscriptions(Host, Node) of case get_collection_subscriptions(Host, Node) of
[] -> [] ->
@ -2905,7 +2941,7 @@ broadcast_retract_items(Host, Node, NodeId, Type, Options, ItemIds, ForceNotify)
Stanza = event_stanza( Stanza = event_stanza(
[#xmlel{ns = ?NS_PUBSUB_EVENT, name = 'items', attrs = nodeAttr(Node), children = [#xmlel{ns = ?NS_PUBSUB_EVENT, name = 'items', attrs = nodeAttr(Node), children =
[#xmlel{ns = ?NS_PUBSUB_EVENT, name = 'retract', attrs = itemAttr(ItemId)} || ItemId <- ItemIds]}]), [#xmlel{ns = ?NS_PUBSUB_EVENT, name = 'retract', attrs = itemAttr(ItemId)} || ItemId <- ItemIds]}]),
broadcast_stanza(Host, Node, NodeId, Type, Options, SubsByDepth, items, Stanza), broadcast_stanza(Host, Node, NodeId, Type, NodeOptions, SubsByDepth, items, Stanza),
{result, true}; {result, true};
_ -> _ ->
{result, false} {result, false}
@ -2914,9 +2950,9 @@ broadcast_retract_items(Host, Node, NodeId, Type, Options, ItemIds, ForceNotify)
{result, false} {result, false}
end. end.
broadcast_purge_node(Host, Node, NodeId, Type, Options) -> broadcast_purge_node(Host, Node, NodeId, Type, NodeOptions) ->
%broadcast(Host, Node, NodeId, Options, notify_retract, false, 'purge', []) %broadcast(Host, Node, NodeId, NodeOptions, notify_retract, false, 'purge', [])
case get_option(Options, notify_retract) of case get_option(NodeOptions, notify_retract) of
true -> true ->
case get_collection_subscriptions(Host, Node) of case get_collection_subscriptions(Host, Node) of
[] -> [] ->
@ -2924,7 +2960,7 @@ broadcast_purge_node(Host, Node, NodeId, Type, Options) ->
SubsByDepth when is_list(SubsByDepth) -> SubsByDepth when is_list(SubsByDepth) ->
Stanza = event_stanza( Stanza = event_stanza(
[#xmlel{ns = ?NS_PUBSUB_EVENT, name = 'purge', attrs = nodeAttr(Node)}]), [#xmlel{ns = ?NS_PUBSUB_EVENT, name = 'purge', attrs = nodeAttr(Node)}]),
broadcast_stanza(Host, Node, NodeId, Type, Options, SubsByDepth, nodes, Stanza), broadcast_stanza(Host, Node, NodeId, Type, NodeOptions, SubsByDepth, nodes, Stanza),
{result, true}; {result, true};
_ -> _ ->
{result, false} {result, false}
@ -2933,43 +2969,43 @@ broadcast_purge_node(Host, Node, NodeId, Type, Options) ->
{result, false} {result, false}
end. end.
broadcast_removed_node(Host, Node, NodeId, Type, Options, Subs) -> broadcast_removed_node(Host, Node, NodeId, Type, NodeOptions, SubsByDepth) ->
%broadcast(Host, Node, NodeId, Options, notify_delete, false, 'delete', []) %broadcast(Host, Node, NodeId, NodeOptions, notify_delete, false, 'delete', [])
case get_option(Options, notify_delete) of case get_option(NodeOptions, notify_delete) of
true -> true ->
case Subs of case SubsByDepth of
[] -> [] ->
{result, false}; {result, false};
_ -> _ ->
Stanza = event_stanza( Stanza = event_stanza(
[#xmlel{ns = ?NS_PUBSUB_EVENT, name = 'delete', attrs = nodeAttr(Node)}]), [#xmlel{ns = ?NS_PUBSUB_EVENT, name = 'delete', attrs = nodeAttr(Node)}]),
broadcast_stanza(Host, Node, NodeId, Type, Options, Subs, nodes, Stanza), broadcast_stanza(Host, Node, NodeId, Type, NodeOptions, SubsByDepth, nodes, Stanza),
{result, true} {result, true}
end; end;
_ -> _ ->
{result, false} {result, false}
end. end.
broadcast_config_notification(Host, Node, NodeId, Type, Options, Lang) -> broadcast_config_notification(Host, Node, NodeId, Type, NodeOptions, Lang) ->
%broadcast(Host, Node, NodeId, Options, notify_config, false, 'items', ConfigEls) %broadcast(Host, Node, NodeId, NodeOptions, notify_config, false, 'items', ConfigEls)
case get_option(Options, notify_config) of case get_option(NodeOptions, notify_config) of
true -> true ->
case get_collection_subscriptions(Host, Node) of case get_collection_subscriptions(Host, Node) of
[] -> [] ->
{result, false}; {result, false};
SubsByDepth when is_list(SubsByDepth) -> SubsByDepth when is_list(SubsByDepth) ->
Content = case get_option(Options, deliver_payloads) of Content = case get_option(NodeOptions, deliver_payloads) of
true -> true ->
[#xmlel{ns = ?NS_DATA_FORMS, name = 'x', attrs = [?XMLATTR('type', <<"form">>)], children = [#xmlel{ns = ?NS_DATA_FORMS, name = 'x', attrs = [?XMLATTR('type', <<"form">>)], children =
get_configure_xfields(Type, Options, Lang, [])}]; get_configure_xfields(Type, NodeOptions, Lang, [])}];
false -> false ->
[] []
end, end,
Stanza = event_stanza( Stanza = event_stanza(
[#xmlel{ns = ?NS_PUBSUB_EVENT, name = 'items', attrs = nodeAttr(Node), children = [#xmlel{ns = ?NS_PUBSUB_EVENT, name = 'items', attrs = nodeAttr(Node), children =
[#xmlel{ns = ?NS_PUBSUB_EVENT, name = 'item', attrs = [?XMLATTR('id', <<"configuration">>)], children = [#xmlel{ns = ?NS_PUBSUB_EVENT, name = 'item', attrs = [?XMLATTR('id', <<"configuration">>)], children =
Content}]}]), Content}]}]),
broadcast_stanza(Host, Node, NodeId, Type, Options, SubsByDepth, nodes, Stanza), broadcast_stanza(Host, Node, NodeId, Type, NodeOptions, SubsByDepth, nodes, Stanza),
{result, true}; {result, true};
_ -> _ ->
{result, false} {result, false}
@ -2995,22 +3031,24 @@ get_node_subs(#pubsub_node{type = Type,
get_options_for_subs(_Host, Node, NodeID, Subs) -> get_options_for_subs(_Host, Node, NodeID, Subs) ->
lists:foldl(fun({JID, subscribed, SubID}, Acc) -> lists:foldl(fun({JID, subscribed, SubID}, Acc) ->
{result, #pubsub_subscription{options = Options}} = pubsub_subscription:get_subscription(JID, NodeID, SubID), case pubsub_subscription:get_subscription(JID, NodeID, SubID) of
[{JID, Node, Options} | Acc]; {result, #pubsub_subscription{options = Options}} -> [{JID, Node, Options} | Acc];
_ -> Acc
end;
(_, Acc) -> (_, Acc) ->
Acc Acc
end, [], Subs). end, [], Subs).
% TODO: merge broadcast code that way % TODO: merge broadcast code that way
%broadcast(Host, Node, NodeId, Type, Options, Feature, Force, ElName, SubEls) -> %broadcast(Host, Node, NodeId, Type, NodeOptions, Feature, Force, ElName, SubEls) ->
% case (get_option(Options, Feature) or Force) of % case (get_option(NodeOptions, Feature) or Force) of
% true -> % true ->
% case node_action(Host, Type, get_node_subscriptions, [NodeId]) of % case node_action(Host, Type, get_node_subscriptions, [NodeId]) of
% {result, []} -> % {result, []} ->
% {result, false}; % {result, false};
% {result, Subs} -> % {result, Subs} ->
% Stanza = event_stanza([{xmlelement, ElName, [{"node", node_to_string(Node)}], SubEls}]), % Stanza = event_stanza([{xmlelement, ElName, [{"node", node_to_string(Node)}], SubEls}]),
% broadcast_stanza(Host, Node, Type, Options, Subs, Stanza), % broadcast_stanza(Host, Node, Type, NodeOptions, SubOpts, Stanza),
% {result, true}; % {result, true};
% _ -> % _ ->
% {result, false} % {result, false}
@ -3198,7 +3236,8 @@ node_options(Type) ->
Result Result
end. end.
%% @spec (Options) -> MaxItems %% @spec (Host, Options) -> MaxItems
%% Host = host()
%% Options = [Option] %% Options = [Option]
%% Option = {Key::atom(), Value::term()} %% Option = {Key::atom(), Value::term()}
%% MaxItems = integer() | unlimited %% MaxItems = integer() | unlimited
@ -3208,7 +3247,7 @@ node_options(Type) ->
%% @todo In practice, the current data structure means that we cannot manage %% @todo In practice, the current data structure means that we cannot manage
%% millions of items on a given node. This should be addressed in a new %% millions of items on a given node. This should be addressed in a new
%% version. %% version.
max_items(Options) -> max_items(Host, Options) ->
case get_option(Options, persist_items) of case get_option(Options, persist_items) of
true -> true ->
case get_option(Options, max_items) of case get_option(Options, max_items) of
@ -3218,8 +3257,13 @@ max_items(Options) ->
end; end;
false -> false ->
case get_option(Options, send_last_published_item) of case get_option(Options, send_last_published_item) of
never -> 0; never ->
_ -> 1 0;
_ ->
case is_last_item_cache_enabled(Host) of
true -> 0;
false -> 1
end
end end
end. end.
@ -3306,8 +3350,10 @@ set_configure(Host, Node, From, Els, Lang) ->
end, end,
case set_xoption(XData, OldOpts) of case set_xoption(XData, OldOpts) of
NewOpts when is_list(NewOpts) -> NewOpts when is_list(NewOpts) ->
tree_call(Host, set_node, [N#pubsub_node{options = NewOpts}]), case tree_call(Host, set_node, [N#pubsub_node{options = NewOpts}]) of
{result, ok}; ok -> {result, ok};
Err -> Err
end;
Err -> Err ->
Err Err
end end
@ -3412,35 +3458,45 @@ set_xoption([{"pubsub#type", Value} | Opts], NewOpts) ->
?SET_STRING_XOPT(type, Value); ?SET_STRING_XOPT(type, Value);
set_xoption([{"pubsub#body_xslt", Value} | Opts], NewOpts) -> set_xoption([{"pubsub#body_xslt", Value} | Opts], NewOpts) ->
?SET_STRING_XOPT(body_xslt, Value); ?SET_STRING_XOPT(body_xslt, Value);
set_xoption([{"pubsub#collection", Value} | Opts], NewOpts) ->
NewValue = [string_to_node(V) || V <- Value],
?SET_LIST_XOPT(collection, NewValue);
set_xoption([{"pubsub#node", [Value]} | Opts], NewOpts) ->
NewValue = string_to_node(Value),
?SET_LIST_XOPT(node, NewValue);
set_xoption([_ | Opts], NewOpts) -> set_xoption([_ | Opts], NewOpts) ->
% skip unknown field % skip unknown field
set_xoption(Opts, NewOpts). set_xoption(Opts, NewOpts).
%%%% last item cache handling %%%% last item cache handling
is_last_item_cache_enabled({_, ServerHost, _}) ->
is_last_item_cache_enabled(ServerHost);
is_last_item_cache_enabled(Host) ->
case ets:lookup(gen_mod:get_module_proc(Host, config), last_item_cache) of
[{last_item_cache, true}] -> true;
_ -> false
end.
set_cached_item({_, ServerHost, _}, NodeId, ItemId, Payload) -> set_cached_item({_, ServerHost, _}, NodeId, ItemId, Payload) ->
set_cached_item(ServerHost, NodeId, ItemId, Payload); set_cached_item(ServerHost, NodeId, ItemId, Payload);
set_cached_item(Host, NodeId, ItemId, Payload) -> set_cached_item(Host, NodeId, ItemId, Payload) ->
case ets:lookup(gen_mod:get_module_proc(Host, config), last_item_cache) of case is_last_item_cache_enabled(Host) of
[{last_item_cache, true}] -> true -> ets:insert(gen_mod:get_module_proc(Host, last_items), {NodeId, {ItemId, Payload}});
ets:insert(gen_mod:get_module_proc(Host, last_items), {NodeId, {ItemId, Payload}}); _ -> ok
_ ->
ok
end. end.
unset_cached_item({_, ServerHost, _}, NodeId) -> unset_cached_item({_, ServerHost, _}, NodeId) ->
unset_cached_item(ServerHost, NodeId); unset_cached_item(ServerHost, NodeId);
unset_cached_item(Host, NodeId) -> unset_cached_item(Host, NodeId) ->
case ets:lookup(gen_mod:get_module_proc(Host, config), last_item_cache) of case is_last_item_cache_enabled(Host) of
[{last_item_cache, true}] -> true -> ets:delete(gen_mod:get_module_proc(Host, last_items), NodeId);
ets:delete(gen_mod:get_module_proc(Host, last_items), NodeId); _ -> ok
_ ->
ok
end. end.
get_cached_item({_, ServerHost, _}, NodeId) -> get_cached_item({_, ServerHost, _}, NodeId) ->
get_cached_item(ServerHost, NodeId); get_cached_item(ServerHost, NodeId);
get_cached_item(Host, NodeId) -> get_cached_item(Host, NodeId) ->
case ets:lookup(gen_mod:get_module_proc(Host, config), last_item_cache) of case is_last_item_cache_enabled(Host) of
[{last_item_cache, true}] -> true ->
case ets:lookup(gen_mod:get_module_proc(Host, last_items), NodeId) of case ets:lookup(gen_mod:get_module_proc(Host, last_items), NodeId) of
[{NodeId, {ItemId, Payload}}] -> [{NodeId, {ItemId, Payload}}] ->
#pubsub_item{itemid = {ItemId, NodeId}, payload = Payload}; #pubsub_item{itemid = {ItemId, NodeId}, payload = Payload};
@ -3482,7 +3538,7 @@ select_type(ServerHost, Host, Node) ->
features() -> features() ->
[ [
%TODO "access-authorize", % OPTIONAL % see plugin "access-authorize", % OPTIONAL
"access-open", % OPTIONAL this relates to access_model option in node_hometree "access-open", % OPTIONAL this relates to access_model option in node_hometree
"access-presence", % OPTIONAL this relates to access_model option in node_pep "access-presence", % OPTIONAL this relates to access_model option in node_pep
%TODO "access-roster", % OPTIONAL %TODO "access-roster", % OPTIONAL
@ -3496,7 +3552,7 @@ features() ->
% see plugin "delete-items", % RECOMMENDED % see plugin "delete-items", % RECOMMENDED
% see plugin "delete-nodes", % RECOMMENDED % see plugin "delete-nodes", % RECOMMENDED
% see plugin "filtered-notifications", % RECOMMENDED % see plugin "filtered-notifications", % RECOMMENDED
%TODO "get-pending", % OPTIONAL % see plugin "get-pending", % OPTIONAL
% see plugin "instant-nodes", % RECOMMENDED % see plugin "instant-nodes", % RECOMMENDED
"item-ids", % RECOMMENDED "item-ids", % RECOMMENDED
"last-published", % RECOMMENDED "last-published", % RECOMMENDED
@ -3506,8 +3562,8 @@ features() ->
"member-affiliation", % RECOMMENDED "member-affiliation", % RECOMMENDED
%TODO "meta-data", % RECOMMENDED %TODO "meta-data", % RECOMMENDED
% see plugin "modify-affiliations", % OPTIONAL % see plugin "modify-affiliations", % OPTIONAL
%TODO "multi-collection", % OPTIONAL % see plugin "multi-collection", % OPTIONAL
%TODO "multi-subscribe", % OPTIONAL % see plugin "multi-subscribe", % OPTIONAL
% see plugin "outcast-affiliation", % RECOMMENDED % see plugin "outcast-affiliation", % RECOMMENDED
% see plugin "persistent-items", % RECOMMENDED % see plugin "persistent-items", % RECOMMENDED
"presence-notifications", % OPTIONAL "presence-notifications", % OPTIONAL
@ -3521,8 +3577,9 @@ features() ->
"retrieve-default" % RECOMMENDED "retrieve-default" % RECOMMENDED
% see plugin "retrieve-items", % RECOMMENDED % see plugin "retrieve-items", % RECOMMENDED
% see plugin "retrieve-subscriptions", % RECOMMENDED % see plugin "retrieve-subscriptions", % RECOMMENDED
%TODO "shim", % OPTIONAL
% see plugin "subscribe", % REQUIRED % see plugin "subscribe", % REQUIRED
%TODO "subscription-options", % OPTIONAL % see plugin "subscription-options", % OPTIONAL
% see plugin "subscription-notifications" % OPTIONAL % see plugin "subscription-notifications" % OPTIONAL
]. ].
features(Type) -> features(Type) ->

View File

@ -536,7 +536,7 @@ remove_extra_items(NodeId, MaxItems, ItemIds) ->
%% ItemId = string() %% ItemId = string()
%% @doc <p>Triggers item deletion.</p> %% @doc <p>Triggers item deletion.</p>
%% <p>Default plugin: The user performing the deletion must be the node owner %% <p>Default plugin: The user performing the deletion must be the node owner
%% or a publisher.</p> %% or a publisher, or PublishModel being open.</p>
delete_item(NodeId, Publisher, PublishModel, ItemId) -> delete_item(NodeId, Publisher, PublishModel, ItemId) ->
GenKey = jlib:short_prepd_bare_jid(Publisher), GenKey = jlib:short_prepd_bare_jid(Publisher),
GenState = get_state(NodeId, GenKey), GenState = get_state(NodeId, GenKey),
@ -696,7 +696,10 @@ set_subscriptions(NodeId, Owner, Subscription, SubId) ->
SubState = get_state(NodeId, SubKey), SubState = get_state(NodeId, SubKey),
case {SubId, SubState#pubsub_state.subscriptions} of case {SubId, SubState#pubsub_state.subscriptions} of
{_, []} -> {_, []} ->
{error, 'item-not-found'}; case Subscription of
none -> ok;
_ -> new_subscription(NodeId, Owner, Subscription, SubState)
end;
{"", [{_, SID}]} -> {"", [{_, SID}]} ->
case Subscription of case Subscription of
none -> unsub_with_subid(NodeId, SID, SubState); none -> unsub_with_subid(NodeId, SID, SubState);
@ -721,8 +724,14 @@ replace_subscription(_, [], Acc) ->
replace_subscription({Sub, SubId}, [{_, SubID} | T], Acc) -> replace_subscription({Sub, SubId}, [{_, SubID} | T], Acc) ->
replace_subscription({Sub, SubId}, T, [{Sub, SubID} | Acc]). replace_subscription({Sub, SubId}, T, [{Sub, SubID} | Acc]).
new_subscription(NodeId, Owner, Subscription, SubState) ->
SubId = pubsub_subscription:add_subscription(Owner, NodeId, []),
Subscriptions = SubState#pubsub_state.subscriptions,
set_state(SubState#pubsub_state{subscriptions = [{Subscription, SubId} | Subscriptions]}),
{Subscription, SubId}.
unsub_with_subid(NodeId, SubId, SubState) -> unsub_with_subid(NodeId, SubId, SubState) ->
pubsub_subscription:unsubscribe_node(SubState#pubsub_state.stateid, pubsub_subscription:delete_subscription(SubState#pubsub_state.stateid,
NodeId, SubId), NodeId, SubId),
NewSubs = lists:filter(fun ({_, SID}) -> SubId =/= SID end, NewSubs = lists:filter(fun ({_, SID}) -> SubId =/= SID end,
SubState#pubsub_state.subscriptions), SubState#pubsub_state.subscriptions),

View File

@ -107,7 +107,7 @@
-record(pubsub_state, {stateid, -record(pubsub_state, {stateid,
items = [], items = [],
affiliation = none, affiliation = none,
subscriptions = none subscriptions = []
}). }).
%%% @type pubsubItem() = #pubsub_item{ %%% @type pubsubItem() = #pubsub_item{