--- mod_pubsub.erl 2013-06-13 23:58:13.380824021 +0200 +++ mod_pubsub_odbc.erl 2013-06-14 00:01:11.907478941 +0200 @@ -43,7 +43,7 @@ %%% 6.2.3.1, 6.2.3.5, and 6.3. For information on subscription leases see %%% XEP-0060 section 12.18. --module(mod_pubsub). +-module(mod_pubsub_odbc). -author('christophe.romain@process-one.net'). @@ -62,11 +62,11 @@ -include("pubsub.hrl"). --define(STDTREE, <<"tree">>). +-define(STDTREE, <<"tree_odbc">>). --define(STDNODE, <<"flat">>). +-define(STDNODE, <<"flat_odbc">>). --define(PEPNODE, <<"pep">>). +-define(PEPNODE, <<"pep_odbc">>). %% exports for hooks -export([presence_probe/3, caps_update/3, @@ -101,7 +101,7 @@ -export([subscription_to_string/1, affiliation_to_string/1, string_to_subscription/1, string_to_affiliation/1, extended_error/2, extended_error/3, - rename_default_nodeplugin/0]). + escape/1]). %% API and gen_server callbacks -export([start_link/2, start/2, stop/1, init/1, @@ -111,7 +111,7 @@ %% calls for parallel sending of last items -export([send_loop/1]). --define(PROCNAME, ejabberd_mod_pubsub). +-define(PROCNAME, ejabberd_mod_pubsub_odbc). -define(LOOPNAME, ejabberd_mod_pubsub_loop). @@ -350,8 +350,6 @@ false -> ok end, ejabberd_router:register_route(Host), - update_node_database(Host, ServerHost), - update_state_database(Host, ServerHost), put(server_host, ServerHost), init_nodes(Host, ServerHost, NodeTree, Plugins), State = #state{host = Host, server_host = ServerHost, @@ -424,359 +422,14 @@ ok. init_nodes(Host, ServerHost, _NodeTree, Plugins) -> - case lists:member(<<"hometree">>, Plugins) of + case lists:member(<<"hometree_odbc">>, Plugins) of true -> - create_node(Host, ServerHost, <<"/home">>, service_jid(Host), <<"hometree">>), + create_node(Host, ServerHost, <<"/home">>, service_jid(Host), <<"hometree_odbc">>), create_node(Host, ServerHost, <<"/home/", ServerHost/binary>>, service_jid(Host), - <<"hometree">>); + <<"hometree_odbc">>); false -> ok end. -update_node_database(Host, ServerHost) -> - mnesia:del_table_index(pubsub_node, type), - mnesia:del_table_index(pubsub_node, parentid), - case catch mnesia:table_info(pubsub_node, attributes) of - [host_node, host_parent, info] -> - ?INFO_MSG("upgrade node pubsub tables", []), - F = fun () -> - {Result, LastIdx} = lists:foldl(fun ({pubsub_node, - NodeId, ParentId, - {nodeinfo, Items, - Options, - Entities}}, - {RecList, - NodeIdx}) -> - ItemsList = - lists:foldl(fun - ({item, - IID, - Publisher, - Payload}, - Acc) -> - C = - {unknown, - Publisher}, - M = - {now(), - Publisher}, - mnesia:write(#pubsub_item{itemid - = - {IID, - NodeIdx}, - creation - = - C, - modification - = - M, - payload - = - Payload}), - [{Publisher, - IID} - | Acc] - end, - [], - Items), - Owners = - dict:fold(fun - (JID, - {entity, - Aff, - Sub}, - Acc) -> - UsrItems = - lists:foldl(fun - ({P, - I}, - IAcc) -> - case - P - of - JID -> - [I - | IAcc]; - _ -> - IAcc - end - end, - [], - ItemsList), - mnesia:write({pubsub_state, - {JID, - NodeIdx}, - UsrItems, - Aff, - Sub}), - case - Aff - of - owner -> - [JID - | Acc]; - _ -> - Acc - end - end, - [], - Entities), - mnesia:delete({pubsub_node, - NodeId}), - {[#pubsub_node{nodeid - = - NodeId, - id - = - NodeIdx, - parents - = - [element(2, - ParentId)], - owners - = - Owners, - options - = - Options} - | RecList], - NodeIdx + 1} - end, - {[], 1}, - mnesia:match_object({pubsub_node, - {Host, - '_'}, - '_', - '_'})), - mnesia:write(#pubsub_index{index = node, last = LastIdx, - free = []}), - Result - end, - {atomic, NewRecords} = mnesia:transaction(F), - {atomic, ok} = mnesia:delete_table(pubsub_node), - {atomic, ok} = mnesia:create_table(pubsub_node, - [{disc_copies, [node()]}, - {attributes, - record_info(fields, - pubsub_node)}]), - FNew = fun () -> - lists:foreach(fun (Record) -> mnesia:write(Record) end, - NewRecords) - end, - case mnesia:transaction(FNew) of - {atomic, Result} -> - ?INFO_MSG("Pubsub node tables updated correctly: ~p", - [Result]); - {aborted, Reason} -> - ?ERROR_MSG("Problem updating Pubsub node tables:~n~p", - [Reason]) - end; - [nodeid, parentid, type, owners, options] -> - F = fun ({pubsub_node, NodeId, {_, Parent}, Type, - Owners, Options}) -> - #pubsub_node{nodeid = NodeId, id = 0, - parents = [Parent], type = Type, - owners = Owners, options = Options} - end, - mnesia:transform_table(pubsub_node, F, - [nodeid, id, parents, type, owners, options]), - FNew = fun () -> - LastIdx = lists:foldl(fun (#pubsub_node{nodeid = - NodeId} = - PubsubNode, - NodeIdx) -> - mnesia:write(PubsubNode#pubsub_node{id - = - NodeIdx}), - lists:foreach(fun - (#pubsub_state{stateid - = - StateId} = - State) -> - {JID, - _} = - StateId, - mnesia:delete({pubsub_state, - StateId}), - mnesia:write(State#pubsub_state{stateid - = - {JID, - NodeIdx}}) - end, - mnesia:match_object(#pubsub_state{stateid - = - {'_', - NodeId}, - _ - = - '_'})), - lists:foreach(fun - (#pubsub_item{itemid - = - ItemId} = - Item) -> - {IID, - _} = - ItemId, - {M1, - M2} = - Item#pubsub_item.modification, - {C1, - C2} = - Item#pubsub_item.creation, - mnesia:delete({pubsub_item, - ItemId}), - mnesia:write(Item#pubsub_item{itemid - = - {IID, - NodeIdx}, - modification - = - {M2, - M1}, - creation - = - {C2, - C1}}) - end, - mnesia:match_object(#pubsub_item{itemid - = - {'_', - NodeId}, - _ - = - '_'})), - NodeIdx + 1 - end, - 1, - mnesia:match_object({pubsub_node, - {Host, '_'}, - '_', '_', - '_', '_', - '_'}) - ++ - mnesia:match_object({pubsub_node, - {{'_', - ServerHost, - '_'}, - '_'}, - '_', '_', - '_', '_', - '_'})), - mnesia:write(#pubsub_index{index = node, - last = LastIdx, free = []}) - end, - case mnesia:transaction(FNew) of - {atomic, Result} -> - rename_default_nodeplugin(), - ?INFO_MSG("Pubsub node tables updated correctly: ~p", - [Result]); - {aborted, Reason} -> - ?ERROR_MSG("Problem updating Pubsub node tables:~n~p", - [Reason]) - end; - [nodeid, id, parent, type, owners, options] -> - F = fun ({pubsub_node, NodeId, Id, Parent, Type, Owners, - Options}) -> - #pubsub_node{nodeid = NodeId, id = Id, - parents = [Parent], type = Type, - owners = Owners, options = Options} - end, - mnesia:transform_table(pubsub_node, F, - [nodeid, id, parents, type, owners, options]), - rename_default_nodeplugin(); - _ -> ok - end, - mnesia:transaction(fun () -> - case catch mnesia:first(pubsub_node) of - {_, L} when is_binary(L) -> - lists:foreach(fun ({H, N}) - when is_binary(N) -> - [Node] = - mnesia:read({pubsub_node, - {H, - N}}), - Type = - Node#pubsub_node.type, - BN = element(2, - node_call(Type, - path_to_node, - [N])), - BP = case [element(2, - node_call(Type, - path_to_node, - [P])) - || P - <- Node#pubsub_node.parents] - of - [<<>>] -> []; - Parents -> - Parents - end, - mnesia:write(Node#pubsub_node{nodeid - = - {H, - BN}, - parents - = - BP}), - mnesia:delete({pubsub_node, - {H, - N}}); - (_) -> ok - end, - mnesia:all_keys(pubsub_node)); - _ -> ok - end - end). - -rename_default_nodeplugin() -> - lists:foreach(fun (Node) -> - mnesia:dirty_write(Node#pubsub_node{type = - <<"hometree">>}) - end, - mnesia:dirty_match_object(#pubsub_node{type = - <<"default">>, - _ = '_'})). - -update_state_database(_Host, _ServerHost) -> - case catch mnesia:table_info(pubsub_state, attributes) of - [stateid, items, affiliation, subscription] -> - ?INFO_MSG("upgrade state pubsub tables", []), - F = fun ({pubsub_state, {JID, NodeID}, Items, Aff, Sub}, Acc) -> - Subs = case Sub of - none -> - []; - _ -> - {result, SubID} = pubsub_subscription:subscribe_node(JID, NodeID, []), - [{Sub, SubID}] - end, - NewState = #pubsub_state{stateid = {JID, NodeID}, - items = Items, - affiliation = Aff, - subscriptions = Subs}, - [NewState | Acc] - end, - {atomic, NewRecs} = mnesia:transaction(fun mnesia:foldl/3, - [F, [], pubsub_state]), - {atomic, ok} = mnesia:delete_table(pubsub_state), - {atomic, ok} = mnesia:create_table(pubsub_state, - [{disc_copies, [node()]}, - {attributes, record_info(fields, pubsub_state)}]), - FNew = fun () -> - lists:foreach(fun mnesia:write/1, NewRecs) - end, - case mnesia:transaction(FNew) of - {atomic, Result} -> - ?INFO_MSG("Pubsub state tables updated correctly: ~p", - [Result]); - {aborted, Reason} -> - ?ERROR_MSG("Problem updating Pubsub state tables:~n~p", - [Reason]) - end; - _ -> - ok - end. - send_loop(State) -> receive {presence, JID, Pid} -> @@ -785,11 +438,13 @@ LJID = jlib:jid_tolower(JID), BJID = jlib:jid_remove_resource(LJID), lists:foreach(fun (PType) -> - {result, Subscriptions} = node_action(Host, + {result, Subscriptions} = case catch node_action(Host, PType, - get_entity_subscriptions, - [Host, - JID]), + get_entity_subscriptions_for_send_last, + [Host, JID]) of + {result, S} -> S; + _ -> [] + end, lists:foreach(fun ({Node, subscribed, _, SubJID}) -> if (SubJID == LJID) or @@ -801,24 +456,14 @@ type = Type, id = - NodeId, - options - = - Options} = + NodeId} = Node, - case - get_option(Options, - send_last_published_item) - of - on_sub_and_presence -> - send_items(H, + send_items(H, N, NodeId, Type, LJID, last); - _ -> ok - end; true -> % resource not concerned about that subscription ok @@ -1009,7 +654,8 @@ children = []}]; disco_identity(Host, Node, From) -> Action = fun (#pubsub_node{id = Idx, type = Type, - options = Options, owners = Owners}) -> + options = Options}) -> + Owners = node_owners_call(Type, Idx), case get_allowed_items_call(Host, Idx, From, Type, Options, Owners) of {result, _} -> {result, @@ -1061,7 +707,8 @@ || Feature <- features(<<"pep">>)]]; disco_features(Host, Node, From) -> Action = fun (#pubsub_node{id = Idx, type = Type, - options = Options, owners = Owners}) -> + options = Options}) -> + Owners = node_owners_call(Type, Idx), case get_allowed_items_call(Host, Idx, From, Type, Options, Owners) of {result, _} -> {result, @@ -1106,9 +753,9 @@ ). disco_items(Host, <<>>, From) -> Action = fun (#pubsub_node{nodeid = {_, NodeID}, - options = Options, type = Type, id = Idx, - owners = Owners}, + options = Options, type = Type, id = Idx}, Acc) -> + Owners = node_owners_call(Type, Idx), case get_allowed_items_call(Host, Idx, From, Type, Options, Owners) of {result, _} -> [#xmlel{name = <<"item">>, @@ -1129,13 +776,14 @@ _ -> Acc end end, - case transaction(Host, Action, sync_dirty) of + case transaction_on_nodes(Host, Action, sync_dirty) of {result, Items} -> Items; _ -> [] end; disco_items(Host, Node, From) -> Action = fun (#pubsub_node{id = Idx, type = Type, - options = Options, owners = Owners}) -> + options = Options}) -> + Owners = node_owners_call(Type, Idx), case get_allowed_items_call(Host, Idx, From, Type, Options, Owners) of @@ -1239,9 +887,6 @@ lists:foreach(fun ({#pubsub_node{options = Options, - owners - = - Owners, id = NodeId}, subscribed, _, @@ -1253,7 +898,7 @@ presence -> case lists:member(BJID, - Owners) + node_owners(Host, PType, NodeId)) of true -> node_action(Host, @@ -1503,7 +1148,8 @@ IQ -> #xmlel{attrs = QAttrs} = SubEl, Node = xml:get_attr_s(<<"node">>, QAttrs), - Res = case iq_disco_items(Host, Node, From) of + Rsm = jlib:rsm_decode(IQ), + Res = case iq_disco_items(Host, Node, From, Rsm) of {result, IQRes} -> jlib:iq_to_xml(IQ#iq{type = result, sub_el = @@ -1630,7 +1276,7 @@ % [] -> % [<<"leaf">>]; %% No sub-nodes: it's a leaf node % _ -> -% case node_call(Type, get_items, [NodeId, From]) of +% case node_call(Type, get_items, [NodeId, From, none]) of % {result, []} -> [<<"collection">>]; % {result, _} -> [<<"leaf">>, <<"collection">>]; % _ -> [] @@ -1652,7 +1298,11 @@ % [#xmlel{name = <<"feature">>, % attrs = [{<<"var">>, ?NS_PUBSUB}], % children = []} -% | lists:map(fun (T) -> +% | lists:map(fun +% (<<"rsm">>)-> +% #xmlel{name = <<"feature">>, +% attrs = [{<<"var">>, ?NS_RSM}]}; +% (T) -> % #xmlel{name = <<"feature">>, % attrs = % [{<<"var">>, @@ -1677,7 +1327,7 @@ [] -> [<<"leaf">>]; _ -> case node_call(Type, get_items, - [NodeId, From]) + [NodeId, From, none]) of {result, []} -> [<<"collection">>]; @@ -1699,7 +1349,11 @@ F = [#xmlel{name = <<"feature">>, attrs = [{<<"var">>, ?NS_PUBSUB}], children = []} - | lists:map(fun (T) -> + | lists:map(fun + (<<"rsm">>)-> + #xmlel{name = <<"feature">>, + attrs = [{<<"var">>, ?NS_RSM}]}; + (T) -> #xmlel{name = <<"feature">>, attrs = [{<<"var">>, @@ -1743,7 +1397,11 @@ #xmlel{name = <<"feature">>, attrs = [{<<"var">>, ?NS_VCARD}], children = []}] ++ - lists:map(fun (Feature) -> + lists:map(fun + (<<"rsm">>)-> + #xmlel{name = <<"feature">>, + attrs = [{<<"var">>, ?NS_RSM}]}; + (Feature) -> #xmlel{name = <<"feature">>, attrs = [{<<"var">>, <<(?NS_PUBSUB)/binary, "#", Feature/binary>>}], @@ -1756,14 +1414,15 @@ _ -> node_disco_info(Host, Node, From) end. --spec(iq_disco_items/3 :: +-spec(iq_disco_items/4 :: ( Host :: mod_pubsub:host(), NodeId :: <<>> | mod_pubsub:nodeId(), - From :: jid()) + From :: jid(), + Rsm :: any()) -> {result, [xmlel()]} ). -iq_disco_items(Host, <<>>, From) -> +iq_disco_items(Host, <<>>, From, _RSM) -> {result, lists:map(fun (#pubsub_node{nodeid = {_, SubNode}, options = Options}) -> @@ -1800,7 +1459,7 @@ % Nodes)}; % Other -> Other % end; -iq_disco_items(Host, ?NS_COMMANDS, _From) -> +iq_disco_items(Host, ?NS_COMMANDS, _From, _RSM) -> CommandItems = [#xmlel{name = <<"item">>, attrs = [{<<"jid">>, Host}, @@ -1808,22 +1467,19 @@ {<<"name">>, <<"Get Pending">>}], children = []}], {result, CommandItems}; -iq_disco_items(_Host, ?NS_PUBSUB_GET_PENDING, _From) -> +iq_disco_items(_Host, ?NS_PUBSUB_GET_PENDING, _From, _RSM) -> CommandItems = [], {result, CommandItems}; -iq_disco_items(Host, Item, From) -> +iq_disco_items(Host, Item, From, RSM) -> case str:tokens(Item, <<"!">>) of [_Node, _ItemID] -> {result, []}; [Node] -> % Node = string_to_node(SNode), Action = fun (#pubsub_node{id = Idx, type = Type, - options = Options, owners = Owners}) -> - NodeItems = case get_allowed_items_call(Host, Idx, - From, Type, - Options, - Owners) - of + options = Options}) -> + Owners = node_owners_call(Type, Idx), + {NodeItems, RsmOut} = case get_allowed_items_call(Host, Idx, From, Type, Options, Owners, RSM) of {result, R} -> R; - _ -> [] + _ -> {[], none} end, Nodes = lists:map(fun (#pubsub_node{nodeid = {_, SubNode}, @@ -1866,7 +1522,7 @@ children = []} end, NodeItems), - {result, Nodes ++ Items} + {result, Nodes ++ Items ++ jlib:rsm_encode(RsmOut)} end, case transaction(Host, Node, Action, sync_dirty) of {result, {_, Result}} -> {result, Result}; @@ -2017,7 +1673,8 @@ (_, Acc) -> Acc end, [], xml:remove_cdata(Els)), - get_items(Host, Node, From, SubId, MaxItems, ItemIDs); + RSM = jlib:rsm_decode(SubEl), + get_items(Host, Node, From, SubId, MaxItems, ItemIDs, RSM); {get, <<"subscriptions">>} -> get_subscriptions(Host, Node, From, Plugins); {get, <<"affiliations">>} -> @@ -2052,7 +1709,9 @@ ). iq_pubsub_owner(Host, ServerHost, From, IQType, SubEl, Lang) -> #xmlel{children = SubEls} = SubEl, - Action = xml:remove_cdata(SubEls), + Action = lists:filter(fun(#xmlel{name = <<"set">>, _ = '_'}) -> false; + (_) -> true + end, xml:remove_cdata(SubEls)), case Action of [#xmlel{name = Name, attrs = Attrs, children = Els}] -> Node = xml:get_attr_s(<<"node">>, Attrs), @@ -2186,7 +1845,8 @@ _ -> [] end end, - case transaction(fun () -> + case transaction(Host, + fun () -> {result, lists:flatmap(Tr, Plugins)} end, sync_dirty) @@ -2231,7 +1891,8 @@ %%% authorization handling -send_authorization_request(#pubsub_node{owners = Owners, nodeid = {Host, Node}}, +send_authorization_request(#pubsub_node{nodeid = {Host, Node}, + type = Type, id = NodeId}, Subscriber) -> Lang = <<"en">>, Stanza = #xmlel{name = <<"message">>, attrs = [], @@ -2309,7 +1970,7 @@ ejabberd_router:route(service_jid(Host), jlib:make_jid(Owner), Stanza) end, - Owners). + node_owners(Host, Type, NodeId)). find_authorization_response(Packet) -> #xmlel{children = Els} = Packet, @@ -2374,11 +2035,11 @@ <<"true">> -> true; _ -> false end, - Action = fun (#pubsub_node{type = Type, owners = Owners, + Action = fun (#pubsub_node{type = Type, id = NodeId}) -> IsApprover = lists:member(jlib:jid_tolower(jlib:jid_remove_resource(From)), - Owners), + node_owners_call(Type, NodeId)), {result, Subscriptions} = node_call(Type, get_subscriptions, [NodeId, @@ -2633,7 +2294,7 @@ children = [#xmlel{name = <<"create">>, attrs = nodeAttr(Node), children = []}]}], - case transaction(CreateNode, transaction) of + case transaction(Host, CreateNode, transaction) of {result, {NodeId, SubsByDepth, {Result, broadcast}}} -> broadcast_created_node(Host, Node, NodeId, Type, NodeOptions, SubsByDepth), ejabberd_hooks:run(pubsub_create_node, ServerHost, [ServerHost, Host, Node, NodeId, NodeOptions]), @@ -2770,7 +2431,7 @@ %% subscribe_node(Host, Node, From, JID, Configuration) -> SubOpts = case - pubsub_subscription:parse_options_xform(Configuration) + pubsub_subscription_odbc:parse_options_xform(Configuration) of {result, GoodSubOpts} -> GoodSubOpts; _ -> invalid @@ -2784,7 +2445,7 @@ end end, Action = fun (#pubsub_node{options = Options, - owners = Owners, type = Type, id = NodeId}) -> + type = Type, id = NodeId}) -> Features = features(Type), SubscribeFeature = lists:member(<<"subscribe">>, Features), OptionsFeature = lists:member(<<"subscription-options">>, Features), @@ -2793,6 +2454,7 @@ AccessModel = get_option(Options, access_model), SendLast = get_option(Options, send_last_published_item), AllowedGroups = get_option(Options, roster_groups_allowed, []), + Owners = node_owners_call(Type, NodeId), {PresenceSubscription, RosterGroup} = get_presence_and_roster_permissions(Host, Subscriber, Owners, AccessModel, AllowedGroups), @@ -2947,12 +2609,9 @@ Features = features(Type), PublishFeature = lists:member(<<"publish">>, Features), PublishModel = get_option(Options, publish_model), + MaxItems = max_items(Host, Options), DeliverPayloads = get_option(Options, deliver_payloads), PersistItems = get_option(Options, persist_items), - MaxItems = case PersistItems of - false -> 0; - true -> max_items(Host, Options) - end, PayloadCount = payload_xmlelements(Payload), PayloadSize = byte_size(term_to_binary(Payload)) - 2, PayloadMaxSize = get_option(Options, max_payload_size), @@ -3008,7 +2667,7 @@ false -> ok end, - set_cached_item(Host, NodeId, ItemId, Publisher, Payload), + set_cached_item(Host, NodeId, ItemId, Publisher, Payload), case Result of default -> {result, Reply}; _ -> {result, Result} @@ -3201,19 +2860,20 @@ %%

The permission are not checked in this function.

%% @todo We probably need to check that the user doing the query has the right %% to read the items. --spec(get_items/6 :: +-spec(get_items/7 :: ( Host :: mod_pubsub:host(), Node :: mod_pubsub:nodeId(), From :: jid(), SubId :: mod_pubsub:subId(), SMaxItems :: binary(), - ItemIDs :: [mod_pubsub:itemId()]) + ItemIDs :: [mod_pubsub:itemId()], + Rsm :: any()) -> {result, [xmlel(),...]} %%% | {error, xmlel()} ). -get_items(Host, Node, From, SubId, SMaxItems, ItemIDs) -> +get_items(Host, Node, From, SubId, SMaxItems, ItemIDs, RSM) -> MaxItems = if SMaxItems == <<"">> -> get_max_items_node(Host); true -> @@ -3225,13 +2885,13 @@ case MaxItems of {error, Error} -> {error, Error}; _ -> - Action = fun (#pubsub_node{options = Options, type = Type, id = NodeId, - owners = Owners}) -> + Action = fun (#pubsub_node{options = Options, type = Type, id = NodeId}) -> Features = features(Type), RetreiveFeature = lists:member(<<"retrieve-items">>, Features), PersistentFeature = lists:member(<<"persistent-items">>, Features), AccessModel = get_option(Options, access_model), AllowedGroups = get_option(Options, roster_groups_allowed, []), + Owners = node_owners_call(Type, NodeId), {PresenceSubscription, RosterGroup} = get_presence_and_roster_permissions(Host, From, Owners, AccessModel, AllowedGroups), @@ -3249,11 +2909,11 @@ node_call(Type, get_items, [NodeId, From, AccessModel, PresenceSubscription, RosterGroup, - SubId]) + SubId, RSM]) end end, case transaction(Host, Node, Action, sync_dirty) of - {result, {_, Items}} -> + {result, {_, {Items, RSMOut}}} -> SendItems = case ItemIDs of [] -> Items; _ -> @@ -3271,8 +2931,8 @@ children = [#xmlel{name = <<"items">>, attrs = nodeAttr(Node), children = - itemsEls(lists:sublist(SendItems, - MaxItems))}]}]}; + itemsEls(lists:sublist(SendItems, MaxItems))} + | jlib:rsm_encode(RSMOut)]}]}; Error -> Error end end. @@ -3296,13 +2956,18 @@ end. get_allowed_items_call(Host, NodeIdx, From, Type, Options, Owners) -> + case get_allowed_items_call(Host, NodeIdx, From, Type, Options, Owners, none) of + {result, {I, _}} -> {result, I}; + Error -> Error + end. +get_allowed_items_call(Host, NodeIdx, From, Type, Options, Owners, RSM) -> AccessModel = get_option(Options, access_model), AllowedGroups = get_option(Options, roster_groups_allowed, []), {PresenceSubscription, RosterGroup} = get_presence_and_roster_permissions(Host, From, Owners, AccessModel, AllowedGroups), node_call(Type, get_items, - [NodeIdx, From, AccessModel, PresenceSubscription, RosterGroup, undefined]). + [NodeIdx, From, AccessModel, PresenceSubscription, RosterGroup, undefined, RSM]). %% @spec (Host, Node, NodeId, Type, LJID, Number) -> any() %% Host = pubsubHost() @@ -3313,35 +2978,32 @@ %% Number = last | integer() %% @doc

Resend the items of a node to the user.

%% @todo use cache-last-item feature -send_items(Host, Node, NodeId, Type, {U, S, R} = LJID, last) -> - case get_cached_item(Host, NodeId) of - undefined -> - send_items(Host, Node, NodeId, Type, LJID, 1); +send_items(Host, Node, NodeId, Type, LJID, last) -> + Stanza = case get_cached_item(Host, NodeId) of + undefined -> + % special ODBC optimization, works only with node_hometree_odbc, node_flat_odbc and node_pep_odbc + case node_action(Host, Type, get_last_items, [NodeId, LJID, 1]) of + {result, [LastItem]} -> + {ModifNow, ModifUSR} = LastItem#pubsub_item.modification, + event_stanza_with_delay( + [#xmlel{name = <<"items">>, attrs = nodeAttr(Node), + children = itemsEls([LastItem])}], ModifNow, ModifUSR); + _ -> + event_stanza( + [#xmlel{name = <<"items">>, attrs = nodeAttr(Node), + children = itemsEls([])}]) + end; LastItem -> {ModifNow, ModifUSR} = LastItem#pubsub_item.modification, - Stanza = event_stanza_with_delay([#xmlel{name = + event_stanza_with_delay([#xmlel{name = <<"items">>, attrs = nodeAttr(Node), children = itemsEls([LastItem])}], - ModifNow, ModifUSR), - case is_tuple(Host) of - false -> - ejabberd_router:route(service_jid(Host), - jlib:make_jid(LJID), Stanza); - true -> - case ejabberd_sm:get_session_pid(U, S, R) of - C2SPid when is_pid(C2SPid) -> - ejabberd_c2s:broadcast(C2SPid, - {pep_message, - <<((Node))/binary, "+notify">>}, - _Sender = service_jid(Host), - Stanza); - _ -> ok - end - end - end; + ModifNow, ModifUSR) + end, + ejabberd_router:route(service_jid(Host), jlib:make_jid(LJID), Stanza); send_items(Host, Node, NodeId, Type, {U, S, R} = LJID, Number) -> ToSend = case node_action(Host, Type, get_items, @@ -3369,20 +3031,7 @@ attrs = nodeAttr(Node), children = itemsEls(ToSend)}]) end, - case is_tuple(Host) of - false -> - ejabberd_router:route(service_jid(Host), - jlib:make_jid(LJID), Stanza); - true -> - case ejabberd_sm:get_session_pid(U, S, R) of - C2SPid when is_pid(C2SPid) -> - ejabberd_c2s:broadcast(C2SPid, - {pep_message, - <<((Node))/binary, "+notify">>}, - _Sender = service_jid(Host), Stanza); - _ -> ok - end - end. + ejabberd_router:route(service_jid(Host), jlib:make_jid(LJID), Stanza). %% @spec (Host, JID, Plugins) -> {error, Reason} | {result, Response} %% Host = host() @@ -3577,9 +3226,10 @@ case Entities of error -> {error, ?ERR_BAD_REQUEST}; _ -> - Action = fun (#pubsub_node{owners = Owners, type = Type, + Action = fun (#pubsub_node{type = Type, id = NodeId} = N) -> + Owners = node_owners_call(Type, NodeId), case lists:member(Owner, Owners) of true -> OwnerJID = jlib:make_jid(Owner), @@ -3592,42 +3242,7 @@ _ -> Entities end, lists:foreach(fun ({JID, Affiliation}) -> - node_call(Type, - set_affiliation, - [NodeId, JID, - Affiliation]), - case Affiliation of - owner -> - NewOwner = - jlib:jid_tolower(jlib:jid_remove_resource(JID)), - NewOwners = - [NewOwner - | Owners], - tree_call(Host, - set_node, - [N#pubsub_node{owners - = - NewOwners}]); - none -> - OldOwner = - jlib:jid_tolower(jlib:jid_remove_resource(JID)), - case - lists:member(OldOwner, - Owners) - of - true -> - NewOwners = - Owners -- - [OldOwner], - tree_call(Host, - set_node, - [N#pubsub_node{owners - = - NewOwners}]); - _ -> ok - end; - _ -> ok - end + node_call(Type, set_affiliation, [NodeId, JID, Affiliation]) end, FilteredEntities), {result, []}; @@ -3686,11 +3301,11 @@ end. read_sub(Subscriber, Node, NodeID, SubID, Lang) -> - case pubsub_subscription:get_subscription(Subscriber, NodeID, SubID) of + case pubsub_subscription_odbc:get_subscription(Subscriber, NodeID, SubID) of {error, notfound} -> {error, extended_error(?ERR_NOT_ACCEPTABLE, <<"invalid-subid">>)}; {result, #pubsub_subscription{options = Options}} -> - {result, XdataEl} = pubsub_subscription:get_options_xform(Lang, Options), + {result, XdataEl} = pubsub_subscription_odbc:get_options_xform(Lang, Options), OptionsEl = #xmlel{name = <<"options">>, attrs = [{<<"jid">>, jlib:jid_to_string(Subscriber)}, @@ -3724,7 +3339,7 @@ end. set_options_helper(Configuration, JID, NodeID, SubID, Type) -> - SubOpts = case pubsub_subscription:parse_options_xform(Configuration) of + SubOpts = case pubsub_subscription_odbc:parse_options_xform(Configuration) of {result, GoodSubOpts} -> GoodSubOpts; _ -> invalid end, @@ -3756,7 +3371,7 @@ write_sub(_Subscriber, _NodeID, _SubID, invalid) -> {error, extended_error(?ERR_BAD_REQUEST, <<"invalid-options">>)}; write_sub(Subscriber, NodeID, SubID, Options) -> - case pubsub_subscription:set_subscription(Subscriber, NodeID, SubID, Options) of + case pubsub_subscription_odbc:set_subscription(Subscriber, NodeID, SubID, Options) of {error, notfound} -> {error, extended_error(?ERR_NOT_ACCEPTABLE, <<"invalid-subid">>)}; {result, _} -> @@ -3977,9 +3592,9 @@ ejabberd_router:route(service_jid(Host), jlib:make_jid(JID), Stanza) end, - Action = fun (#pubsub_node{owners = Owners, type = Type, + Action = fun (#pubsub_node{type = Type, id = NodeId}) -> - case lists:member(Owner, Owners) of + case lists:member(Owner, node_owners_call(Type, NodeId)) of true -> Result = lists:foldl(fun ({JID, Subscription, SubId}, @@ -4373,7 +3988,7 @@ {Depth, [{N, get_node_subs(N)} || N <- Nodes]} end, tree_call(Host, get_parentnodes_tree, [Host, Node, service_jid(Host)]))} end, - case transaction(Action, sync_dirty) of + case transaction(Host, Action, sync_dirty) of {result, CollSubs} -> CollSubs; _ -> [] end. @@ -4387,9 +4002,9 @@ get_options_for_subs(NodeID, Subs) -> lists:foldl(fun({JID, subscribed, SubID}, Acc) -> - case pubsub_subscription:read_subscription(JID, NodeID, SubID) of + case pubsub_subscription_odbc:get_subscription(JID, NodeID, SubID) of {error, notfound} -> [{JID, SubID, []} | Acc]; - #pubsub_subscription{options = Options} -> [{JID, SubID, Options} | Acc]; + {result, #pubsub_subscription{options = Options}} -> [{JID, SubID, Options} | Acc]; _ -> Acc end; (_, Acc) -> @@ -5089,6 +4704,30 @@ _ -> features() end. +%% @spec (Host, Type, NodeId) -> [ljid()] +%% NodeId = pubsubNodeId() +%% @doc

Return list of node owners.

+node_owners(Host, Type, NodeId) -> + case node_action(Host, Type, get_node_affiliations, [NodeId]) of + {result, Affiliations} -> + lists:foldl( + fun({LJID, owner}, Acc) -> [LJID|Acc]; + (_, Acc) -> Acc + end, [], Affiliations); + _ -> + [] + end. +node_owners_call(Type, NodeId) -> + case node_call(Type, get_node_affiliations, [NodeId]) of + {result, Affiliations} -> + lists:foldl( + fun({LJID, owner}, Acc) -> [LJID|Acc]; + (_, Acc) -> Acc + end, [], Affiliations); + _ -> + [] + end. + %% @doc

node tree plugin call.

tree_call({_User, Server, _Resource}, Function, Args) -> tree_call(Server, Function, Args); @@ -5108,7 +4747,13 @@ tree_action(Host, Function, Args) -> ?DEBUG("tree_action ~p ~p ~p", [Host, Function, Args]), Fun = fun () -> tree_call(Host, Function, Args) end, - catch mnesia:sync_dirty(Fun). + case catch ejabberd_odbc:sql_bloc(odbc_conn(Host), Fun) of + {atomic, Result} -> + Result; + {aborted, Reason} -> + ?ERROR_MSG("transaction return internal error: ~p~n",[{aborted, Reason}]), + {error, ?ERR_INTERNAL_SERVER_ERROR} + end. %% @doc

node plugin call.

node_call(Type, Function, Args) -> @@ -5133,13 +4778,12 @@ node_action(Host, Type, Function, Args) -> ?DEBUG("node_action ~p ~p ~p ~p", [Host, Type, Function, Args]), - transaction(fun () -> node_call(Type, Function, Args) - end, + transaction(Host, fun () -> node_call(Type, Function, Args) end, sync_dirty). %% @doc

plugin transaction handling.

transaction(Host, Node, Action, Trans) -> - transaction(fun () -> + transaction(Host, fun () -> case tree_call(Host, get_node, [Host, Node]) of N when is_record(N, pubsub_node) -> case Action(N) of @@ -5153,16 +4797,22 @@ end, Trans). -transaction(Host, Action, Trans) -> - transaction(fun () -> +transaction_on_nodes(Host, Action, Trans) -> + transaction(Host, fun () -> {result, lists:foldl(Action, [], tree_call(Host, get_nodes, [Host]))} end, Trans). -transaction(Fun, Trans) -> - case catch mnesia:Trans(Fun) of +transaction(Host, Fun, Trans) -> + transaction_retry(Host, Fun, Trans, 2). +transaction_retry(Host, Fun, Trans, Count) -> + SqlFun = case Trans of + transaction -> sql_transaction; + _ -> sql_bloc + end, + case catch ejabberd_odbc:SqlFun(odbc_conn(Host), Fun) of {result, Result} -> {result, Result}; {error, Error} -> {error, Error}; {atomic, {result, Result}} -> {result, Result}; @@ -5171,6 +4821,15 @@ ?ERROR_MSG("transaction return internal error: ~p~n", [{aborted, Reason}]), {error, ?ERR_INTERNAL_SERVER_ERROR}; + {'EXIT', {timeout, _} = Reason} -> + case Count of + 0 -> + ?ERROR_MSG("transaction return internal error: ~p~n", [{'EXIT', Reason}]), + {error, ?ERR_INTERNAL_SERVER_ERROR}; + N -> + erlang:yield(), + transaction_retry(Host, Fun, Trans, N-1) + end; {'EXIT', Reason} -> ?ERROR_MSG("transaction return internal error: ~p~n", [{'EXIT', Reason}]), @@ -5181,6 +4840,16 @@ {error, ?ERR_INTERNAL_SERVER_ERROR} end. +odbc_conn({_U, Host, _R})-> Host; +odbc_conn(<<$., Host/binary>>) -> Host; +odbc_conn(<<_, Host/binary>>) -> odbc_conn(Host). + +%% escape value for database storage +escape({_U, _H, _R}=JID)-> + ejabberd_odbc:escape(jlib:jid_to_string(JID)); +escape(Value)-> + ejabberd_odbc:escape(Value). + %%%% helpers %% Add pubsub-specific error element