%%% ====================================================================
%%% ``The contents of this file are subject to the Erlang Public License,
%%% Version 1.1, (the "License"); you may not use this file except in
%%% compliance with the License. You should have received a copy of the
%%% Erlang Public License along with this software. If not, it can be
%%% retrieved via the world wide web at http://www.erlang.org/.
%%%
%%% Software distributed under the License is distributed on an "AS IS"
%%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%%% the License for the specific language governing rights and limitations
%%% under the License.
%%%
%%% The Initial Developer of the Original Code is ProcessOne.
%%% Portions created by ProcessOne are Copyright 2006-2009, ProcessOne
%%% All Rights Reserved.''
%%% This software is copyright 2006-2009, ProcessOne.
%%%
%%%
%%% @copyright 2006-2009 ProcessOne
%%% @author Christophe Romain It is used as a default for all unknown PubSub node type. It can serve
%%% as a developer basis and reference to build its own custom pubsub node
%%% types. PubSub plugin nodes are using the {@link gen_node} behaviour. The API isn't stabilized yet. The pubsub plugin
%%% development is still a work in progress. However, the system is already
%%% useable and useful as is. Please, send us comments, feedback and
%%% improvements. Called during pubsub modules initialisation. Any pubsub plugin must
%% implement this function. It can return anything. This function is mainly used to trigger the setup task necessary for the
%% plugin. It can be used for example by the developer to create the specific
%% module database schema if it does not exists yet. Called during pubsub modules termination. Any pubsub plugin must
%% implement this function. It can return anything. Example of function return value: In {@link node_default}, the permission is decided by the place in the
%% hierarchy where the user is creating the node. The access parameter is also
%% checked in the default module. This parameter depends on the value of the
%% access_createnode ACL value in ejabberd config file. This function also check that node can be created a a children of its
%% parent node PubSub plugins can redefine the PubSub node creation rights as they
%% which. They can simply delegate this check to the {@link node_default}
%% module by implementing this function like this:
%% ```check_create_user_permission(Host, ServerHost, Node, ParentNode, Owner, Access) ->
%% node_default:check_create_user_permission(Host, ServerHost, Node, ParentNode, Owner, Access).''' purge items of deleted nodes after effective deletion. Accepts or rejects subcription requests on a PubSub node. The mechanism works as follow:
%%
%%
%%
%%
The selected behaviour depends on the return parameter: %%
In the default plugin module, the record is unchanged.
subscribe_node(NodeId, Sender, Subscriber, AccessModel, SendLast, PresenceSubscription, RosterGroup, Options) -> SubKey = jlib:jid_tolower(Subscriber), GenKey = jlib:jid_remove_resource(SubKey), Authorized = (jlib:jid_tolower(jlib:jid_remove_resource(Sender)) == GenKey), {Affiliation, Subscriptions} = select_affiliation_subscriptions(NodeId, GenKey, SubKey), Whitelisted = lists:member(Affiliation, [member, publisher, owner]), PendingSubscription = lists:any(fun({pending, _}) -> true; (_) -> false end, Subscriptions), if not Authorized -> %% JIDs do not match {error, ?ERR_EXTENDED(?ERR_BAD_REQUEST, "invalid-jid")}; Affiliation == outcast -> %% Requesting entity is blocked {error, ?ERR_FORBIDDEN}; PendingSubscription -> %% Requesting entity has pending subscription {error, ?ERR_EXTENDED(?ERR_NOT_AUTHORIZED, "pending-subscription")}; (AccessModel == presence) and (not PresenceSubscription) -> %% Entity is not authorized to create a subscription (presence subscription required) {error, ?ERR_EXTENDED(?ERR_NOT_AUTHORIZED, "presence-subscription-required")}; (AccessModel == roster) and (not RosterGroup) -> %% Entity is not authorized to create a subscription (not in roster group) {error, ?ERR_EXTENDED(?ERR_NOT_AUTHORIZED, "not-in-roster-group")}; (AccessModel == whitelist) and (not Whitelisted) -> %% Node has whitelist access model and entity lacks required affiliation {error, ?ERR_EXTENDED(?ERR_NOT_ALLOWED, "closed-node")}; (AccessModel == authorize) -> % TODO: to be done %% Node has authorize access model {error, ?ERR_FORBIDDEN}; %%MustPay -> %% % Payment is required for a subscription %% {error, ?ERR_PAYMENT_REQUIRED}; %%ForbiddenAnonymous -> %% % Requesting entity is anonymous %% {error, ?ERR_FORBIDDEN}; true -> case pubsub_subscription_odbc:subscribe_node(Subscriber, NodeId, Options) of {result, SubId} -> NewSub = case AccessModel of authorize -> pending; _ -> subscribed end, update_subscription(NodeId, SubKey, [{NewSub, SubId} | Subscriptions]), case {NewSub, SendLast} of {subscribed, never} -> {result, {default, subscribed, SubId}}; {subscribed, _} -> {result, {default, subscribed, SubId, send_last}}; {_, _} -> {result, {default, pending, SubId}} end; _ -> {error, ?ERR_INTERNAL_SERVER_ERROR} end end. %% @spec (NodeId, Sender, Subscriber, SubId) -> %% {error, Reason} | {result, []} %% NodeId = mod_pubsub:pubsubNodeId() %% Sender = mod_pubsub:jid() %% Subscriber = mod_pubsub:jid() %% SubId = mod_pubsub:subid() %% Reason = mod_pubsub:stanzaError() %% @docUnsubscribe the Subscriber from the Node.
unsubscribe_node(NodeId, Sender, Subscriber, SubId) -> SubKey = jlib:jid_tolower(Subscriber), GenKey = jlib:jid_remove_resource(SubKey), Authorized = (jlib:jid_tolower(jlib:jid_remove_resource(Sender)) == GenKey), {Affiliation, Subscriptions} = select_affiliation_subscriptions(NodeId, SubKey), SubIdExists = case SubId of [] -> false; List when is_list(List) -> true; _ -> false end, if %% Requesting entity is prohibited from unsubscribing entity not Authorized -> {error, ?ERR_FORBIDDEN}; %% Entity did not specify SubId %%SubId == "", ?? -> %% {error, ?ERR_EXTENDED(?ERR_BAD_REQUEST, "subid-required")}; %% Invalid subscription identifier %%InvalidSubId -> %% {error, ?ERR_EXTENDED(?ERR_NOT_ACCEPTABLE, "invalid-subid")}; %% Requesting entity is not a subscriber Subscriptions == [] -> {error, ?ERR_EXTENDED(?ERR_UNEXPECTED_REQUEST, "not-subscribed")}; %% Subid supplied, so use that. SubIdExists -> Sub = first_in_list(fun(S) -> case S of {_Sub, SubId} -> true; _ -> false end end, Subscriptions), case Sub of {value, S} -> delete_subscription(SubKey, NodeId, S, Affiliation, Subscriptions), {result, default}; false -> {error, ?ERR_EXTENDED(?ERR_UNEXPECTED_REQUEST, "not-subscribed")} end; %% No subid supplied, but there's only one matching %% subscription, so use that. length(Subscriptions) == 1 -> delete_subscription(SubKey, NodeId, hd(Subscriptions), Affiliation, Subscriptions), {result, default}; %% No subid and more than one possible subscription match. true -> {error, ?ERR_EXTENDED(?ERR_BAD_REQUEST, "subid-required")} end. delete_subscription(SubKey, NodeId, {Subscription, SubId}, Affiliation, Subscriptions) -> NewSubs = Subscriptions -- [{Subscription, SubId}], pubsub_subscription_odbc:unsubscribe_node(SubKey, NodeId, SubId), case {Affiliation, NewSubs} of {none, []} -> % Just a regular subscriber, and this is final item, so % delete the state. del_state(NodeId, SubKey); _ -> update_subscription(NodeId, SubKey, NewSubs) end. %% @spec (NodeId, Publisher, PublishModel, MaxItems, ItemId, Payload) -> %% {true, PubsubItem} | {result, Reply} %% NodeId = mod_pubsub:pubsubNodeId() %% Publisher = mod_pubsub:jid() %% PublishModel = atom() %% MaxItems = integer() %% ItemId = string() %% Payload = term() %% @docPublishes the item passed as parameter.
%%The mechanism works as follow: %%
The selected behaviour depends on the return parameter: %%
In the default plugin module, the record is unchanged.
publish_item(NodeId, Publisher, PublishModel, MaxItems, ItemId, Payload) -> SubKey = jlib:jid_tolower(Publisher), GenKey = jlib:jid_remove_resource(SubKey), {Affiliation, Subscriptions} = select_affiliation_subscriptions(NodeId, GenKey, SubKey), Subscribed = case PublishModel of subscribers -> is_subscribed(Subscriptions); _ -> undefined end, if not ((PublishModel == open) or ((PublishModel == publishers) and ((Affiliation == owner) or (Affiliation == publisher))) or (Subscribed == true)) -> %% Entity does not have sufficient privileges to publish to node {error, ?ERR_FORBIDDEN}; true -> %% TODO: check creation, presence, roster if MaxItems > 0 -> %% Note: this works cause set_item tries an update before %% the insert, and the update just ignore creation field. PubId = {now(), SubKey}, set_item(#pubsub_item{itemid = {ItemId, NodeId}, creation = {now(), GenKey}, modification = PubId, payload = Payload}), Items = [ItemId | itemids(NodeId, GenKey)--[ItemId]], {result, {_, OI}} = remove_extra_items(NodeId, MaxItems, Items), %% set new item list use useless {result, {default, broadcast, OI}}; true -> {result, {default, broadcast, []}} end end. %% @spec (NodeId, MaxItems, ItemIds) -> {NewItemIds,OldItemIds} %% NodeId = mod_pubsub:pubsubNodeId() %% MaxItems = integer() | unlimited %% ItemIds = [ItemId::string()] %% NewItemIds = [ItemId::string()] %% @docThis function is used to remove extra items, most notably when the %% maximum number of items has been reached.
%%This function is used internally by the core PubSub module, as no %% permission check is performed.
%%In the default plugin module, the oldest items are removed, but other %% rules can be used.
%%If another PubSub plugin wants to delegate the item removal (and if the %% plugin is using the default pubsub storage), it can implements this function like this: %% ```remove_extra_items(NodeId, MaxItems, ItemIds) -> %% node_default:remove_extra_items(NodeId, MaxItems, ItemIds).'''
remove_extra_items(_NodeId, unlimited, ItemIds) -> {result, {ItemIds, []}}; remove_extra_items(NodeId, MaxItems, ItemIds) -> NewItems = lists:sublist(ItemIds, MaxItems), OldItems = lists:nthtail(length(NewItems), ItemIds), %% Remove extra items: del_items(NodeId, OldItems), %% Return the new items list: {result, {NewItems, OldItems}}. %% @spec (NodeId, Publisher, PublishModel, ItemId) -> %% {error, Reason::stanzaError()} | %% {result, []} %% NodeId = mod_pubsub:pubsubNodeId() %% Publisher = mod_pubsub:jid() %% PublishModel = atom() %% ItemId = string() %% @docTriggers item deletion.
%%Default plugin: The user performing the deletion must be the node owner %% or a publisher.
delete_item(NodeId, Publisher, PublishModel, ItemId) -> SubKey = jlib:jid_tolower(Publisher), GenKey = jlib:jid_remove_resource(SubKey), {result, Affiliation} = get_affiliation(NodeId, GenKey), Allowed = (Affiliation == publisher) orelse (Affiliation == owner) orelse (PublishModel == open) orelse case get_item(NodeId, ItemId) of {result, #pubsub_item{creation = {_, GenKey}}} -> true; _ -> false end, if not Allowed -> %% Requesting entity does not have sufficient privileges {error, ?ERR_FORBIDDEN}; true -> case del_item(NodeId, ItemId) of {updated, 1} -> %% set new item list use useless {result, {default, broadcast}}; _ -> %% Non-existent node or item {error, ?ERR_ITEM_NOT_FOUND} end end. %% @spec (NodeId, Owner) -> %% {error, Reason::stanzaError()} | %% {result, {default, broadcast}} %% NodeId = mod_pubsub:pubsubNodeId() %% Owner = mod_pubsub:jid() purge_node(NodeId, Owner) -> SubKey = jlib:jid_tolower(Owner), GenKey = jlib:jid_remove_resource(SubKey), GenState = get_state(NodeId, GenKey), case GenState of #pubsub_state{affiliation = owner} -> {result, States} = get_states(NodeId), lists:foreach( fun(#pubsub_state{items = []}) -> ok; (#pubsub_state{items = Items}) -> del_items(NodeId, Items) end, States), {result, {default, broadcast}}; _ -> %% Entity is not owner {error, ?ERR_FORBIDDEN} end. %% @spec (Host, JID) -> [{Node,Affiliation}] %% Host = host() %% JID = mod_pubsub:jid() %% @docReturn the current affiliations for the given user
%%The default module reads affiliations in the main Mnesia %% pubsub_state table. If a plugin stores its data in the same %% table, it should return an empty list, as the affiliation will be read by %% the default PubSub module. Otherwise, it should return its own affiliation, %% that will be added to the affiliation stored in the main %% pubsub_state table.
get_entity_affiliations(Host, Owner) -> SubKey = jlib:jid_tolower(Owner), GenKey = jlib:jid_remove_resource(SubKey), H = ?PUBSUB:escape(Host), J = encode_jid(GenKey), Reply = case catch ejabberd_odbc:sql_query_t( ["select node, type, i.nodeid, affiliation " "from pubsub_state i, pubsub_node n " "where i.nodeid = n.nodeid " "and jid='", J, "' " "and host='", H, "';"]) of {selected, ["node", "type", "nodeid", "affiliation"], RItems} -> lists:map(fun({N, T, I, A}) -> Node = nodetree_tree_odbc:raw_to_node(Host, {N, "", T, I}), {Node, decode_affiliation(A)} end, RItems); _ -> [] end, {result, Reply}. get_node_affiliations(NodeId) -> Reply = case catch ejabberd_odbc:sql_query_t( ["select jid, affiliation " "from pubsub_state " "where nodeid='", NodeId, "';"]) of {selected, ["jid", "affiliation"], RItems} -> lists:map(fun({J, A}) -> {decode_jid(J), decode_affiliation(A)} end, RItems); _ -> [] end, {result, Reply}. get_affiliation(NodeId, Owner) -> SubKey = jlib:jid_tolower(Owner), GenKey = jlib:jid_remove_resource(SubKey), J = encode_jid(GenKey), Reply = case catch ejabberd_odbc:sql_query_t( ["select affiliation from pubsub_state " "where nodeid='", NodeId, "' and jid='", J, "';"]) of {selected, ["affiliation"], [{A}]} -> decode_affiliation(A); _ -> none end, {result, Reply}. set_affiliation(NodeId, Owner, Affiliation) -> SubKey = jlib:jid_tolower(Owner), GenKey = jlib:jid_remove_resource(SubKey), {_, Subscriptions} = select_affiliation_subscriptions(NodeId, GenKey), case {Affiliation, Subscriptions} of {none, none} -> del_state(NodeId, GenKey); _ -> update_affiliation(NodeId, GenKey, Affiliation) end. %% @spec (Host, Owner) -> [{Node,Subscription}] %% Host = host() %% Owner = mod_pubsub:jid() %% @docReturn the current subscriptions for the given user
%%The default module reads subscriptions in the main Mnesia %% pubsub_state table. If a plugin stores its data in the same %% table, it should return an empty list, as the affiliation will be read by %% the default PubSub module. Otherwise, it should return its own affiliation, %% that will be added to the affiliation stored in the main %% pubsub_state table.
get_entity_subscriptions(Host, Owner) -> SubKey = jlib:jid_tolower(Owner), GenKey = jlib:jid_remove_resource(SubKey), H = ?PUBSUB:escape(Host), SJ = encode_jid(SubKey), GJ = encode_jid(GenKey), Query = case SubKey of GenKey -> ["select node, type, i.nodeid, jid, subscriptions " "from pubsub_state i, pubsub_node n " "where i.nodeid = n.nodeid " "and jid like '", GJ, "%' " "and host='", H, "';"]; _ -> ["select node, type, i.nodeid, jid, subscriptions " "from pubsub_state i, pubsub_node n " "where i.nodeid = n.nodeid " "and jid in ('", SJ, "', '", GJ, "') " "and host='", H, "';"] end, Reply = case catch ejabberd_odbc:sql_query_t(Query) of {selected, ["node", "type", "nodeid", "jid", "subscriptions"], RItems} -> lists:foldl(fun({N, T, I, J, S}, Acc) -> Node = nodetree_tree_odbc:raw_to_node(Host, {N, "", T, I}), Jid = decode_jid(J), case decode_subscriptions(S) of [] -> [{Node, none, Jid}|Acc]; Subs -> lists:foldl(fun({Sub, SubId}, Acc2) -> [{Node, Sub, SubId, Jid}|Acc2]; (Sub, Acc2) -> [{Node, Sub, Jid}|Acc2] end, Acc, Subs) end end, [], RItems); _ -> [] end, {result, Reply}. %% do the same as get_entity_subscriptions but filter result only to %% nodes having send_last_published_item=on_sub_and_presence %% as this call avoid seeking node, it must return node and type as well get_entity_subscriptions_for_send_last(Host, Owner) -> SubKey = jlib:jid_tolower(Owner), GenKey = jlib:jid_remove_resource(SubKey), H = ?PUBSUB:escape(Host), SJ = encode_jid(SubKey), GJ = encode_jid(GenKey), Query = case SubKey of GenKey -> ["select node, type, i.nodeid, jid, subscriptions " "from pubsub_state i, pubsub_node n, pubsub_node_option o " "where i.nodeid = n.nodeid and n.nodeid = o.nodeid " "and name='send_last_published_item' and val='on_sub_and_presence' " "and jid like '", GJ, "%' " "and host='", H, "';"]; _ -> ["select node, type, i.nodeid, jid, subscriptions " "from pubsub_state i, pubsub_node n, pubsub_node_option o " "where i.nodeid = n.nodeid and n.nodeid = o.nodeid " "and name='send_last_published_item' and val='on_sub_and_presence' " "and jid in ('", SJ, "', '", GJ, "') " "and host='", H, "';"] end, Reply = case catch ejabberd_odbc:sql_query_t(Query) of {selected, ["node", "type", "nodeid", "jid", "subscriptions"], RItems} -> lists:foldl(fun({N, T, I, J, S}, Acc) -> Node = nodetree_tree_odbc:raw_to_node(Host, {N, "", T, I}), Jid = decode_jid(J), case decode_subscriptions(S) of [] -> [{Node, none, Jid}|Acc]; Subs -> lists:foldl(fun({Sub, SubId}, Acc2) -> [{Node, Sub, SubId, Jid}|Acc2]; (Sub, Acc2) -> [{Node, Sub, Jid}|Acc2] end, Acc, Subs) end end, [], RItems); _ -> [] end, {result, Reply}. get_node_subscriptions(NodeId) -> Reply = case catch ejabberd_odbc:sql_query_t( ["select jid, subscriptions " "from pubsub_state " "where nodeid='", NodeId, "';"]) of {selected, ["jid", "subscriptions"], RItems} -> lists:foldl(fun({J, S}, Acc) -> Jid = decode_jid(J), case decode_subscriptions(S) of [] -> [{Jid, none}|Acc]; Subs -> lists:foldl(fun({Sub, SubId}, Acc2) -> [{Jid, Sub, SubId}|Acc2]; (Sub, Acc2) -> [{Jid, Sub}|Acc2] end, Acc, Subs) end end, [], RItems); _ -> [] end, {result, Reply}. get_subscriptions(NodeId, Owner) -> SubKey = jlib:jid_tolower(Owner), J = encode_jid(SubKey), Reply = case catch ejabberd_odbc:sql_query_t( ["select subscriptions from pubsub_state " "where nodeid='", NodeId, "' and jid='", J, "';"]) of {selected, ["subscriptions"], [{S}]} -> decode_subscriptions(S); _ -> [] end, {result, Reply}. set_subscriptions(NodeId, Owner, Subscription, SubId) -> SubKey = jlib:jid_tolower(Owner), SubState = get_state_without_itemids(NodeId, SubKey), case {SubId, SubState#pubsub_state.subscriptions} of {_, []} -> case Subscription of none -> {error, ?ERR_EXTENDED(?ERR_BAD_REQUEST, "not-subscribed")}; _ -> new_subscription(NodeId, Owner, Subscription, SubState) end; {"", [{_, SID}]} -> case Subscription of none -> unsub_with_subid(NodeId, SID, SubState); _ -> replace_subscription({Subscription, SID}, SubState) end; {"", [_|_]} -> {error, ?ERR_EXTENDED(?ERR_BAD_REQUEST, "subid-required")}; _ -> case Subscription of none -> unsub_with_subid(NodeId, SubId, SubState); _ -> replace_subscription({Subscription, SubId}, SubState) end end. replace_subscription(NewSub, SubState) -> NewSubs = replace_subscription(NewSub, SubState#pubsub_state.subscriptions, []), set_state(SubState#pubsub_state{subscriptions = NewSubs}). replace_subscription(_, [], Acc) -> Acc; replace_subscription({Sub, SubId}, [{_, SubID} | T], Acc) -> replace_subscription({Sub, SubId}, T, [{Sub, SubID} | Acc]). new_subscription(NodeId, Owner, Subscription, SubState) -> case pubsub_subscription_odbc:subscribe_node(Owner, NodeId, []) of {result, SubId} -> Subscriptions = SubState#pubsub_state.subscriptions, set_state(SubState#pubsub_state{subscriptions = [{Subscription, SubId} | Subscriptions]}), {Subscription, SubId}; _ -> {error, ?ERR_INTERNAL_SERVER_ERROR} end. unsub_with_subid(NodeId, SubId, SubState) -> pubsub_subscription_odbc:unsubscribe_node(SubState#pubsub_state.stateid, NodeId, SubId), NewSubs = lists:filter(fun ({_, SID}) -> SubId =/= SID end, SubState#pubsub_state.subscriptions), case {NewSubs, SubState#pubsub_state.affiliation} of {[], none} -> del_state(NodeId, element(1, SubState#pubsub_state.stateid)); _ -> set_state(SubState#pubsub_state{subscriptions = NewSubs}) end. %% @spec (Host, Owner) -> {result, [Node]} | {error, Reason} %% Host = host() %% Owner = jid() %% Node = pubsubNode() %% @docReturns a list of Owner's nodes on Host with pending %% subscriptions.
get_pending_nodes(Host, Owner) -> GenKey = jlib:jid_remove_resource(jlib:jid_tolower(Owner)), States = mnesia:match_object(#pubsub_state{stateid = {GenKey, '_'}, affiliation = owner, _ = '_'}), NodeIDs = [ID || #pubsub_state{stateid = {_, ID}} <- States], NodeTree = case catch ets:lookup(gen_mod:get_module_proc(Host, config), nodetree) of [{nodetree, N}] -> N; _ -> nodetree_tree_odbc end, Reply = mnesia:foldl(fun(#pubsub_state{stateid = {_, NID}} = S, Acc) -> case lists:member(NID, NodeIDs) of true -> case get_nodes_helper(NodeTree, S) of {value, Node} -> [Node | Acc]; false -> Acc end; false -> Acc end end, [], pubsub_state), {result, Reply}. get_nodes_helper(NodeTree, #pubsub_state{stateid = {_, N}, subscriptions = Subs}) -> HasPending = fun ({pending, _}) -> true; (pending) -> true; (_) -> false end, case lists:any(HasPending, Subs) of true -> case NodeTree:get_node(N) of #pubsub_node{nodeid = {_, Node}} -> {value, Node}; _ -> false end; false -> false end. %% @spec (NodeId) -> [States] | [] %% NodeId = mod_pubsub:pubsubNodeId() %% @doc Returns the list of stored states for a given node. %%For the default PubSub module, states are stored in Mnesia database.
%%We can consider that the pubsub_state table have been created by the main %% mod_pubsub module.
%%PubSub plugins can store the states where they wants (for example in a %% relational database).
%%If a PubSub plugin wants to delegate the states storage to the default node, %% they can implement this function like this: %% ```get_states(NodeId) -> %% node_default:get_states(NodeId).'''
get_states(NodeId) -> case catch ejabberd_odbc:sql_query_t( ["select jid, affiliation, subscriptions " "from pubsub_state " "where nodeid='", NodeId, "';"]) of {selected, ["jid", "affiliation", "subscriptions"], RItems} -> {result, lists:map(fun({SJID, Affiliation, Subscriptions}) -> #pubsub_state{stateid = {decode_jid(SJID), NodeId}, items = itemids(NodeId, SJID), affiliation = decode_affiliation(Affiliation), subscriptions = decode_subscriptions(Subscriptions)} end, RItems)}; _ -> {result, []} end. %% @spec (NodeId, JID) -> [State] | [] %% NodeId = mod_pubsub:pubsubNodeId() %% JID = mod_pubsub:jid() %% State = mod_pubsub:pubsubItems() %% @docReturns a state (one state list), given its reference.
get_state(NodeId, JID) -> State = get_state_without_itemids(NodeId, JID), {SJID, _} = State#pubsub_state.stateid, State#pubsub_state{items = itemids(NodeId, SJID)}. get_state_without_itemids(NodeId, JID) -> J = encode_jid(JID), case catch ejabberd_odbc:sql_query_t( ["select jid, affiliation, subscriptions " "from pubsub_state " "where jid='", J, "' " "and nodeid='", NodeId, "';"]) of {selected, ["jid", "affiliation", "subscriptions"], [{SJID, Affiliation, Subscriptions}]} -> #pubsub_state{stateid = {decode_jid(SJID), NodeId}, affiliation = decode_affiliation(Affiliation), subscriptions = decode_subscriptions(Subscriptions)}; _ -> #pubsub_state{stateid={JID, NodeId}} end. %% @spec (State) -> ok | {error, Reason::stanzaError()} %% State = mod_pubsub:pubsubStates() %% @docWrite a state into database.
set_state(State) -> {_, NodeId} = State#pubsub_state.stateid, set_state(NodeId, State). set_state(NodeId, State) -> %% NOTE: in odbc version, as we do not handle item list, %% we just need to update affiliation and subscription %% cause {JID,NodeId} is the key. if it does not exists, then we insert it. %% MySQL can be optimized using INSERT ... ON DUPLICATE KEY as well {JID, _} = State#pubsub_state.stateid, J = encode_jid(JID), S = encode_subscriptions(State#pubsub_state.subscriptions), A = encode_affiliation(State#pubsub_state.affiliation), case catch ejabberd_odbc:sql_query_t( ["update pubsub_state " "set subscriptions='", S, "', affiliation='", A, "' " "where nodeid='", NodeId, "' and jid='", J, "';"]) of {updated, 1} -> ok; _ -> catch ejabberd_odbc:sql_query_t( ["insert into pubsub_state(nodeid, jid, affiliation, subscriptions) " "values('", NodeId, "', '", J, "', '", A, "', '", S, "');"]) end, {result, []}. %% @spec (NodeId, JID) -> ok | {error, Reason::stanzaError()} %% NodeId = mod_pubsub:pubsubNodeId() %% JID = mod_pubsub:jid() %% @docDelete a state from database.
del_state(NodeId, JID) -> J = encode_jid(JID), catch ejabberd_odbc:sql_query_t( ["delete from pubsub_state " "where jid='", J, "' " "and nodeid='", NodeId, "';"]), ok. %% @spec (NodeId, From) -> {[Items],RsmOut} | [] %% NodeId = mod_pubsub:pubsubNodeId() %% Items = mod_pubsub:pubsubItems() %% @doc Returns the list of stored items for a given node. %%For the default PubSub module, items are stored in Mnesia database.
%%We can consider that the pubsub_item table have been created by the main %% mod_pubsub module.
%%PubSub plugins can store the items where they wants (for example in a %% relational database), or they can even decide not to persist any items.
%%If a PubSub plugin wants to delegate the item storage to the default node, %% they can implement this function like this: %% ```get_items(NodeId, From) -> %% node_default:get_items(NodeId, From).'''
get_items(NodeId, _From) -> case catch ejabberd_odbc:sql_query_t( ["select itemid, publisher, creation, modification, payload " "from pubsub_item " "where nodeid='", NodeId, "' " "order by modification desc;"]) of {selected, ["itemid", "publisher", "creation", "modification", "payload"], RItems} -> {result, lists:map(fun(RItem) -> raw_to_item(NodeId, RItem) end, RItems)}; _ -> {result, []} end. get_items(NodeId, From, none) -> MaxItems = case catch ejabberd_odbc:sql_query_t( ["select val from pubsub_node_option " "where nodeid='", NodeId, "' " "and name='max_items';"]) of {selected, ["val"], [{Value}]} -> Tokens = element(2, erl_scan:string(Value++".")), element(2, erl_parse:parse_term(Tokens)); _ -> ?MAXITEMS end, get_items(NodeId, From, #rsm_in{max=MaxItems}); get_items(NodeId, _From, #rsm_in{max=M, direction=Direction, id=I, index=IncIndex})-> Max = ?PUBSUB:escape(i2l(M)), {Way, Order} = case Direction of aft -> {"<", "desc"}; before when I == [] -> {"is not", "asc"}; before -> {">", "asc"}; _ when IncIndex =/= undefined -> {"<", "desc"}; % using index _ -> {"is not", "desc"}% Can be better end, [AttrName, Id] = case I of undefined when IncIndex =/= undefined -> case catch ejabberd_odbc:sql_query_t( ["select modification from pubsub_item pi " "where exists ( " "select count(*) as count1 " "from pubsub_item " "where nodeid='", NodeId, "' " "and modification > pi.modification " "having count1 = ",?PUBSUB:escape(i2l(IncIndex))," );"]) of {selected, [_], [{O}]} -> ["modification", "'"++O++"'"]; _ -> ["modification", "null"] end; undefined -> ["modification", "null"]; [] -> ["modification", "null"]; I -> [A, B] = string:tokens(?PUBSUB:escape(i2l(I)), "@"), [A, "'"++B++"'"] end, Count= case catch ejabberd_odbc:sql_query_t( ["select count(*) " "from pubsub_item " "where nodeid='", NodeId, "';"]) of {selected, [_], [{C}]} -> C; _ -> "0" end, case catch ejabberd_odbc:sql_query_t( ["select itemid, publisher, creation, modification, payload " "from pubsub_item " "where nodeid='", NodeId, "' " "and ", AttrName," ", Way, " ", Id, " " "order by ", AttrName," ", Order," limit ", i2l(Max)," ;"]) of {selected, ["itemid", "publisher", "creation", "modification", "payload"], RItems} -> case length(RItems) of 0 -> {result, {[], #rsm_out{count=Count}}}; _ -> {_, _, _, F, _} = hd(RItems), Index = case catch ejabberd_odbc:sql_query_t( ["select count(*) " "from pubsub_item " "where nodeid='", NodeId, "' " "and ", AttrName," > '", F, "';"]) of %{selected, [_], [{C}, {In}]} -> [string:strip(C, both, $"), string:strip(In, both, $")]; {selected, [_], [{In}]} -> In; _ -> "0" end, %{F, _} = string:to_integer(FStr), {_, _, _, L, _} = lists:last(RItems), RsmOut = #rsm_out{count=Count, index=Index, first="modification@"++F, last="modification@"++i2l(L)}, {result, {lists:map(fun(RItem) -> raw_to_item(NodeId, RItem) end, RItems), RsmOut}} end; _ -> {result, {[], none}} end. get_items(NodeId, JID, AccessModel, PresenceSubscription, RosterGroup, SubId) -> get_items(NodeId, JID, AccessModel, PresenceSubscription, RosterGroup, SubId, none). get_items(NodeId, JID, AccessModel, PresenceSubscription, RosterGroup, _SubId, RSM) -> SubKey = jlib:jid_tolower(JID), GenKey = jlib:jid_remove_resource(SubKey), {Affiliation, Subscriptions} = select_affiliation_subscriptions(NodeId, GenKey, SubKey), Whitelisted = can_fetch_item(Affiliation, Subscriptions), if %%SubId == "", ?? -> %% Entity has multiple subscriptions to the node but does not specify a subscription ID %{error, ?ERR_EXTENDED(?ERR_BAD_REQUEST, "subid-required")}; %%InvalidSubId -> %% Entity is subscribed but specifies an invalid subscription ID %{error, ?ERR_EXTENDED(?ERR_NOT_ACCEPTABLE, "invalid-subid")}; Affiliation == outcast -> %% Requesting entity is blocked {error, ?ERR_FORBIDDEN}; (AccessModel == presence) and (not PresenceSubscription) -> %% Entity is not authorized to create a subscription (presence subscription required) {error, ?ERR_EXTENDED(?ERR_NOT_AUTHORIZED, "presence-subscription-required")}; (AccessModel == roster) and (not RosterGroup) -> %% Entity is not authorized to create a subscription (not in roster group) {error, ?ERR_EXTENDED(?ERR_NOT_AUTHORIZED, "not-in-roster-group")}; (AccessModel == whitelist) and (not Whitelisted) -> %% Node has whitelist access model and entity lacks required affiliation {error, ?ERR_EXTENDED(?ERR_NOT_ALLOWED, "closed-node")}; (AccessModel == authorize) -> % TODO: to be done %% Node has authorize access model {error, ?ERR_FORBIDDEN}; %%MustPay -> %% % Payment is required for a subscription %% {error, ?ERR_PAYMENT_REQUIRED}; true -> get_items(NodeId, JID, RSM) end. get_last_items(NodeId, _From, Count) -> case catch ejabberd_odbc:sql_query_t( ["select itemid, publisher, creation, modification, payload " "from pubsub_item " "where nodeid='", NodeId, "' " "order by modification desc limit ", i2l(Count), ";"]) of {selected, ["itemid", "publisher", "creation", "modification", "payload"], RItems} -> {result, lists:map(fun(RItem) -> raw_to_item(NodeId, RItem) end, RItems)}; _ -> {result, []} end. %% @spec (NodeId, ItemId) -> [Item] | [] %% NodeId = mod_pubsub:pubsubNodeId() %% ItemId = string() %% Item = mod_pubsub:pubsubItems() %% @docReturns an item (one item list), given its reference.
get_item(NodeId, ItemId) -> I = ?PUBSUB:escape(ItemId), case catch ejabberd_odbc:sql_query_t( ["select itemid, publisher, creation, modification, payload " "from pubsub_item " "where nodeid='", NodeId, "' " "and itemid='", I,"';"]) of {selected, ["itemid", "publisher", "creation", "modification", "payload"], [RItem]} -> {result, raw_to_item(NodeId, RItem)}; _ -> {error, ?ERR_ITEM_NOT_FOUND} end. get_item(NodeId, ItemId, JID, AccessModel, PresenceSubscription, RosterGroup, _SubId) -> SubKey = jlib:jid_tolower(JID), GenKey = jlib:jid_remove_resource(SubKey), {Affiliation, Subscriptions} = select_affiliation_subscriptions(NodeId, GenKey, SubKey), Whitelisted = can_fetch_item(Affiliation, Subscriptions), if %%SubId == "", ?? -> %% Entity has multiple subscriptions to the node but does not specify a subscription ID %{error, ?ERR_EXTENDED(?ERR_BAD_REQUEST, "subid-required")}; %%InvalidSubId -> %% Entity is subscribed but specifies an invalid subscription ID %{error, ?ERR_EXTENDED(?ERR_NOT_ACCEPTABLE, "invalid-subid")}; Affiliation == outcast -> %% Requesting entity is blocked {error, ?ERR_FORBIDDEN}; (AccessModel == presence) and (not PresenceSubscription) -> %% Entity is not authorized to create a subscription (presence subscription required) {error, ?ERR_EXTENDED(?ERR_NOT_AUTHORIZED, "presence-subscription-required")}; (AccessModel == roster) and (not RosterGroup) -> %% Entity is not authorized to create a subscription (not in roster group) {error, ?ERR_EXTENDED(?ERR_NOT_AUTHORIZED, "not-in-roster-group")}; (AccessModel == whitelist) and (not Whitelisted) -> %% Node has whitelist access model and entity lacks required affiliation {error, ?ERR_EXTENDED(?ERR_NOT_ALLOWED, "closed-node")}; (AccessModel == authorize) -> % TODO: to be done %% Node has authorize access model {error, ?ERR_FORBIDDEN}; %%MustPay -> %% % Payment is required for a subscription %% {error, ?ERR_PAYMENT_REQUIRED}; true -> get_item(NodeId, ItemId) end. %% @spec (Item) -> ok | {error, Reason::stanzaError()} %% Item = mod_pubsub:pubsubItems() %% @docWrite an item into database.
set_item(Item) -> {ItemId, NodeId} = Item#pubsub_item.itemid, I = ?PUBSUB:escape(ItemId), {C, _} = Item#pubsub_item.creation, {M, JID} = Item#pubsub_item.modification, P = encode_jid(JID), Payload = Item#pubsub_item.payload, XML = ?PUBSUB:escape(lists:flatten(lists:map(fun(X) -> xml:element_to_string(X) end, Payload))), S = fun({T1, T2, T3}) -> lists:flatten([i2l(T1, 6), ":", i2l(T2, 6), ":", i2l(T3, 6)]) end, case catch ejabberd_odbc:sql_query_t( ["update pubsub_item " "set publisher='", P, "', modification='", S(M), "', payload='", XML, "' " "where nodeid='", NodeId, "' and itemid='", I, "';"]) of {updated, 1} -> ok; _ -> catch ejabberd_odbc:sql_query_t( ["insert into pubsub_item " "(nodeid, itemid, publisher, creation, modification, payload) " "values('", NodeId, "', '", I, "', '", P, "', '", S(C), "', '", S(M), "', '", XML, "');"]) end, {result, []}. %% @spec (NodeId, ItemId) -> ok | {error, Reason::stanzaError()} %% NodeId = mod_pubsub:pubsubNodeId() %% ItemId = string() %% @docDelete an item from database.
del_item(NodeId, ItemId) -> I = ?PUBSUB:escape(ItemId), catch ejabberd_odbc:sql_query_t( ["delete from pubsub_item " "where itemid='", I, "' " "and nodeid='", NodeId, "';"]). del_items(_, []) -> ok; del_items(NodeId, [ItemId]) -> del_item(NodeId, ItemId); del_items(NodeId, ItemIds) -> I = string:join([["'", ?PUBSUB:escape(X), "'"] || X <- ItemIds], ","), catch ejabberd_odbc:sql_query_t( ["delete from pubsub_item " "where itemid in (", I, ") " "and nodeid='", NodeId, "';"]). %% @docReturn the name of the node if known: Default is to return %% node id.
get_item_name(_Host, _Node, Id) -> Id. %% @spec (Affiliation, Subscription) -> true | false %% Affiliation = owner | member | publisher | outcast | none %% Subscription = subscribed | none %% @doc Determines if the combination of Affiliation and Subscribed %% are allowed to get items from a node. can_fetch_item(owner, _) -> true; can_fetch_item(member, _) -> true; can_fetch_item(publisher, _) -> true; can_fetch_item(outcast, _) -> false; can_fetch_item(none, Subscriptions) -> is_subscribed(Subscriptions); can_fetch_item(_Affiliation, _Subscription) -> false. is_subscribed(Subscriptions) -> lists:any(fun ({subscribed, _SubId}) -> true; (_) -> false end, Subscriptions). %% Returns the first item where Pred() is true in List first_in_list(_Pred, []) -> false; first_in_list(Pred, [H | T]) -> case Pred(H) of true -> {value, H}; _ -> first_in_list(Pred, T) end. itemids(NodeId, {U, S, R}) -> itemids(NodeId, encode_jid({U, S, R})); itemids(NodeId, SJID) -> case catch ejabberd_odbc:sql_query_t( ["select itemid " "from pubsub_item " "where nodeid='", NodeId, "' " "and publisher like '", SJID, "%' " "order by modification desc;"]) of {selected, ["itemid"], RItems} -> lists:map(fun({ItemId}) -> ItemId end, RItems); _ -> [] end. select_affiliation_subscriptions(NodeId, JID) -> J = encode_jid(JID), case catch ejabberd_odbc:sql_query_t( ["select affiliation,subscriptions from pubsub_state " "where nodeid='", NodeId, "' and jid='", J, "';"]) of {selected, ["affiliation", "subscriptions"], [{A, S}]} -> {decode_affiliation(A), decode_subscriptions(S)}; _ -> {none, []} end. select_affiliation_subscriptions(NodeId, JID, JID) -> select_affiliation_subscriptions(NodeId, JID); select_affiliation_subscriptions(NodeId, GenKey, SubKey) -> {result, Affiliation} = get_affiliation(NodeId, GenKey), {result, Subscriptions} = get_subscriptions(NodeId, SubKey), {Affiliation, Subscriptions}. update_affiliation(NodeId, JID, Affiliation) -> J = encode_jid(JID), A = encode_affiliation(Affiliation), case catch ejabberd_odbc:sql_query_t( ["update pubsub_state " "set affiliation='", A, "' " "where nodeid='", NodeId, "' and jid='", J, "';"]) of {updated, 1} -> ok; _ -> catch ejabberd_odbc:sql_query_t( ["insert into pubsub_state(nodeid, jid, affiliation, subscriptions) " "values('", NodeId, "', '", J, "', '", A, "', '');"]) end. update_subscription(NodeId, JID, Subscription) -> J = encode_jid(JID), S = encode_subscriptions(Subscription), case catch ejabberd_odbc:sql_query_t( ["update pubsub_state " "set subscriptions='", S, "' " "where nodeid='", NodeId, "' and jid='", J, "';"]) of {updated, 1} -> ok; _ -> catch ejabberd_odbc:sql_query_t( ["insert into pubsub_state(nodeid, jid, affiliation, subscriptions) " "values('", NodeId, "', '", J, "', 'n', '", S, "');"]) end. decode_jid(SJID) -> jlib:jid_tolower(jlib:string_to_jid(SJID)). decode_node(N) -> ?PUBSUB:string_to_node(N). decode_affiliation("o") -> owner; decode_affiliation("p") -> publisher; decode_affiliation("m") -> member; decode_affiliation("c") -> outcast; decode_affiliation(_) -> none. decode_subscription("s") -> subscribed; decode_subscription("p") -> pending; decode_subscription("u") -> unconfigured; decode_subscription(_) -> none. decode_subscriptions(Subscriptions) -> lists:foldl(fun(Subscription, Acc) -> case string:tokens(Subscription, ":") of [S, SubId] -> [{decode_subscription(S), SubId}|Acc]; _ -> Acc end end, [], string:tokens(Subscriptions, ",")). encode_jid(JID) -> ?PUBSUB:escape(jlib:jid_to_string(JID)). encode_affiliation(owner) -> "o"; encode_affiliation(publisher) -> "p"; encode_affiliation(member) -> "m"; encode_affiliation(outcast) -> "c"; encode_affiliation(_) -> "n". encode_subscription(subscribed) -> "s"; encode_subscription(pending) -> "p"; encode_subscription(unconfigured) -> "u"; encode_subscription(_) -> "n". encode_subscriptions(Subscriptions) -> string:join(lists:map(fun({S, SubId}) -> encode_subscription(S)++":"++SubId end, Subscriptions), ","). %%% record getter/setter state_to_raw(NodeId, State) -> {JID, _} = State#pubsub_state.stateid, J = encode_jid(JID), A = encode_affiliation(State#pubsub_state.affiliation), S = encode_subscriptions(State#pubsub_state.subscriptions), ["'", NodeId, "', '", J, "', '", A, "', '", S, "'"]. raw_to_item(NodeId, {ItemId, SJID, Creation, Modification, XML}) -> JID = decode_jid(SJID), ToTime = fun(Str) -> [T1,T2,T3] = string:tokens(Str, ":"), {l2i(T1), l2i(T2), l2i(T3)} end, Payload = case xml_stream:parse_element(XML) of {error, _Reason} -> []; El -> [El] end, #pubsub_item{itemid = {ItemId, NodeId}, creation={ToTime(Creation), JID}, modification={ToTime(Modification), JID}, payload = Payload}. l2i(L) when is_list(L) -> list_to_integer(L); l2i(I) when is_integer(I) -> I. i2l(I) when is_integer(I) -> integer_to_list(I); i2l(L) when is_list(L) -> L. i2l(I, N) when is_integer(I) -> i2l(i2l(I), N); i2l(L, N) when is_list(L) -> case length(L) of N -> L; C when C > N -> L; _ -> i2l([$0|L], N) end.