mirror of
https://github.com/processone/ejabberd.git
synced 2024-12-28 17:38:54 +01:00
989 lines
37 KiB
Erlang
989 lines
37 KiB
Erlang
%%%----------------------------------------------------------------------
|
|
%%% File : node_flat.erl
|
|
%%% Author : Christophe Romain <christophe.romain@process-one.net>
|
|
%%% Purpose : Standard PubSub node plugin
|
|
%%% Created : 1 Dec 2007 by Christophe Romain <christophe.romain@process-one.net>
|
|
%%%
|
|
%%%
|
|
%%% ejabberd, Copyright (C) 2002-2023 ProcessOne
|
|
%%%
|
|
%%% This program is free software; you can redistribute it and/or
|
|
%%% modify it under the terms of the GNU General Public License as
|
|
%%% published by the Free Software Foundation; either version 2 of the
|
|
%%% License, or (at your option) any later version.
|
|
%%%
|
|
%%% This program is distributed in the hope that it will be useful,
|
|
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
%%% General Public License for more details.
|
|
%%%
|
|
%%% You should have received a copy of the GNU General Public License along
|
|
%%% with this program; if not, write to the Free Software Foundation, Inc.,
|
|
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
%%%
|
|
%%%----------------------------------------------------------------------
|
|
|
|
%%% @doc The module <strong>{@module}</strong> is the default PubSub plugin.
|
|
%%% <p>It is used as a default for all unknown PubSub node type. It can serve
|
|
%%% as a developer basis and reference to build its own custom pubsub node
|
|
%%% types.</p>
|
|
%%% <p>PubSub plugin nodes are using the {@link gen_node} behaviour.</p>
|
|
|
|
-module(node_flat).
|
|
-behaviour(gen_pubsub_node).
|
|
-author('christophe.romain@process-one.net').
|
|
|
|
-include("pubsub.hrl").
|
|
-include_lib("xmpp/include/xmpp.hrl").
|
|
|
|
-export([init/3, terminate/2, options/0, features/0,
|
|
create_node_permission/6, create_node/2, delete_node/1,
|
|
purge_node/2, subscribe_node/8, unsubscribe_node/4,
|
|
publish_item/7, delete_item/4,
|
|
remove_extra_items/2, remove_extra_items/3, remove_expired_items/2,
|
|
get_entity_affiliations/2, get_node_affiliations/1,
|
|
get_affiliation/2, set_affiliation/3,
|
|
get_entity_subscriptions/2, get_node_subscriptions/1,
|
|
get_subscriptions/2, set_subscriptions/4,
|
|
get_pending_nodes/2, get_states/1, get_state/2,
|
|
set_state/1, get_items/7, get_items/3, get_item/7,
|
|
get_last_items/3, get_only_item/2,
|
|
get_item/2, set_item/1, get_item_name/3, node_to_path/1,
|
|
path_to_node/1, can_fetch_item/2, is_subscribed/1, transform/1]).
|
|
|
|
init(_Host, _ServerHost, _Opts) ->
|
|
%pubsub_subscription:init(Host, ServerHost, Opts),
|
|
ejabberd_mnesia:create(?MODULE, pubsub_state,
|
|
[{disc_copies, [node()]}, {index, [nodeidx]},
|
|
{type, ordered_set},
|
|
{attributes, record_info(fields, pubsub_state)}]),
|
|
ejabberd_mnesia:create(?MODULE, pubsub_item,
|
|
[{disc_only_copies, [node()]}, {index, [nodeidx]},
|
|
{attributes, record_info(fields, pubsub_item)}]),
|
|
ejabberd_mnesia:create(?MODULE, pubsub_orphan,
|
|
[{disc_copies, [node()]},
|
|
{attributes, record_info(fields, pubsub_orphan)}]),
|
|
ItemsFields = record_info(fields, pubsub_item),
|
|
case mnesia:table_info(pubsub_item, attributes) of
|
|
ItemsFields -> ok;
|
|
_ -> mnesia:transform_table(pubsub_item, ignore, ItemsFields)
|
|
end,
|
|
ok.
|
|
|
|
terminate(_Host, _ServerHost) ->
|
|
ok.
|
|
|
|
options() ->
|
|
[{deliver_payloads, true},
|
|
{notify_config, false},
|
|
{notify_delete, false},
|
|
{notify_retract, true},
|
|
{purge_offline, false},
|
|
{persist_items, true},
|
|
{max_items, ?MAXITEMS},
|
|
{subscribe, true},
|
|
{access_model, open},
|
|
{roster_groups_allowed, []},
|
|
{publish_model, publishers},
|
|
{notification_type, headline},
|
|
{max_payload_size, ?MAX_PAYLOAD_SIZE},
|
|
{send_last_published_item, on_sub_and_presence},
|
|
{deliver_notifications, true},
|
|
{presence_based_delivery, false},
|
|
{itemreply, none}].
|
|
|
|
features() ->
|
|
[<<"create-nodes">>,
|
|
<<"auto-create">>,
|
|
<<"access-authorize">>,
|
|
<<"delete-nodes">>,
|
|
<<"delete-items">>,
|
|
<<"get-pending">>,
|
|
<<"instant-nodes">>,
|
|
<<"manage-subscriptions">>,
|
|
<<"modify-affiliations">>,
|
|
<<"outcast-affiliation">>,
|
|
<<"persistent-items">>,
|
|
<<"multi-items">>,
|
|
<<"publish">>,
|
|
<<"publish-only-affiliation">>,
|
|
<<"publish-options">>,
|
|
<<"purge-nodes">>,
|
|
<<"retract-items">>,
|
|
<<"retrieve-affiliations">>,
|
|
<<"retrieve-items">>,
|
|
<<"retrieve-subscriptions">>,
|
|
<<"subscribe">>,
|
|
%%<<"subscription-options">>,
|
|
<<"subscription-notifications">>].
|
|
|
|
%% @doc Checks if the current user has the permission to create the requested node
|
|
%% <p>In flat node, any unused node name is allowed. The access parameter is also
|
|
%% checked. This parameter depends on the value of the
|
|
%% <tt>access_createnode</tt> ACL value in ejabberd config file.</p>
|
|
create_node_permission(Host, ServerHost, _Node, _ParentNode, Owner, Access) ->
|
|
LOwner = jid:tolower(Owner),
|
|
Allowed = case LOwner of
|
|
{<<"">>, Host, <<"">>} ->
|
|
true; % pubsub service always allowed
|
|
_ ->
|
|
acl:match_rule(ServerHost, Access, LOwner) =:= allow
|
|
end,
|
|
{result, Allowed}.
|
|
|
|
create_node(Nidx, Owner) ->
|
|
OwnerKey = jid:tolower(jid:remove_resource(Owner)),
|
|
set_state(#pubsub_state{stateid = {OwnerKey, Nidx},
|
|
nodeidx = Nidx, affiliation = owner}),
|
|
{result, {default, broadcast}}.
|
|
|
|
delete_node(Nodes) ->
|
|
Tr = fun (#pubsub_state{stateid = {J, _}, subscriptions = Ss}) ->
|
|
lists:map(fun (S) -> {J, S} end, Ss)
|
|
end,
|
|
Reply = lists:map(fun (#pubsub_node{id = Nidx} = PubsubNode) ->
|
|
{result, States} = get_states(Nidx),
|
|
lists:foreach(fun (State) ->
|
|
del_items(Nidx, State#pubsub_state.items),
|
|
del_state(State#pubsub_state{items = []})
|
|
end, States),
|
|
del_orphan_items(Nidx),
|
|
{PubsubNode, lists:flatmap(Tr, States)}
|
|
end, Nodes),
|
|
{result, {default, broadcast, Reply}}.
|
|
|
|
%% @doc <p>Accepts or rejects subcription requests on a PubSub node.</p>
|
|
%% <p>The mechanism works as follow:
|
|
%% <ul>
|
|
%% <li>The main PubSub module prepares the subscription and passes the
|
|
%% result of the preparation as a record.</li>
|
|
%% <li>This function gets the prepared record and several other parameters and
|
|
%% can decide to:<ul>
|
|
%% <li>reject the subscription;</li>
|
|
%% <li>allow it as is, letting the main module perform the database
|
|
%% persistence;</li>
|
|
%% <li>allow it, modifying the record. The main module will store the
|
|
%% modified record;</li>
|
|
%% <li>allow it, but perform the needed persistence operations.</li></ul>
|
|
%% </li></ul></p>
|
|
%% <p>The selected behaviour depends on the return parameter:
|
|
%% <ul>
|
|
%% <li><tt>{error, Reason}</tt>: an IQ error result will be returned. No
|
|
%% subscription will actually be performed.</li>
|
|
%% <li><tt>true</tt>: Subscribe operation is allowed, based on the
|
|
%% unmodified record passed in parameter <tt>SubscribeResult</tt>. If this
|
|
%% parameter contains an error, no subscription will be performed.</li>
|
|
%% <li><tt>{true, PubsubState}</tt>: Subscribe operation is allowed, but
|
|
%% the {@link mod_pubsub:pubsubState()} record returned replaces the value
|
|
%% passed in parameter <tt>SubscribeResult</tt>.</li>
|
|
%% <li><tt>{true, done}</tt>: Subscribe operation is allowed, but the
|
|
%% {@link mod_pubsub:pubsubState()} will be considered as already stored and
|
|
%% no further persistence operation will be performed. This case is used,
|
|
%% when the plugin module is doing the persistence by itself or when it want
|
|
%% to completely disable persistence.</li></ul>
|
|
%% </p>
|
|
%% <p>In the default plugin module, the record is unchanged.</p>
|
|
subscribe_node(Nidx, Sender, Subscriber, AccessModel,
|
|
SendLast, PresenceSubscription, RosterGroup, _Options) ->
|
|
SubKey = jid:tolower(Subscriber),
|
|
GenKey = jid:remove_resource(SubKey),
|
|
Authorized = jid:tolower(jid:remove_resource(Sender)) == GenKey,
|
|
GenState = get_state(Nidx, GenKey),
|
|
SubState = case SubKey of
|
|
GenKey -> GenState;
|
|
_ -> get_state(Nidx, SubKey)
|
|
end,
|
|
Affiliation = GenState#pubsub_state.affiliation,
|
|
Subscriptions = SubState#pubsub_state.subscriptions,
|
|
Whitelisted = lists:member(Affiliation, [member, publisher, owner]),
|
|
PendingSubscription = lists:any(fun
|
|
({pending, _}) -> true;
|
|
(_) -> false
|
|
end,
|
|
Subscriptions),
|
|
Owner = Affiliation == owner,
|
|
if not Authorized ->
|
|
{error,
|
|
mod_pubsub:extended_error((xmpp:err_bad_request()), mod_pubsub:err_invalid_jid())};
|
|
(Affiliation == outcast) or (Affiliation == publish_only) ->
|
|
{error, xmpp:err_forbidden()};
|
|
PendingSubscription ->
|
|
{error,
|
|
mod_pubsub:extended_error((xmpp:err_not_authorized()), mod_pubsub:err_pending_subscription())};
|
|
(AccessModel == presence) and (not PresenceSubscription) and (not Owner) ->
|
|
{error,
|
|
mod_pubsub:extended_error((xmpp:err_not_authorized()), mod_pubsub:err_presence_subscription_required())};
|
|
(AccessModel == roster) and (not RosterGroup) and (not Owner) ->
|
|
{error,
|
|
mod_pubsub:extended_error((xmpp:err_not_authorized()), mod_pubsub:err_not_in_roster_group())};
|
|
(AccessModel == whitelist) and (not Whitelisted) and (not Owner) ->
|
|
{error,
|
|
mod_pubsub:extended_error((xmpp:err_not_allowed()), mod_pubsub:err_closed_node())};
|
|
%%MustPay ->
|
|
%% % Payment is required for a subscription
|
|
%% {error, ?ERR_PAYMENT_REQUIRED};
|
|
%%ForbiddenAnonymous ->
|
|
%% % Requesting entity is anonymous
|
|
%% {error, xmpp:err_forbidden()};
|
|
true ->
|
|
%%SubId = pubsub_subscription:add_subscription(Subscriber, Nidx, Options),
|
|
{NewSub, SubId} = case Subscriptions of
|
|
[{subscribed, Id}|_] ->
|
|
{subscribed, Id};
|
|
[] ->
|
|
Id = pubsub_subscription:make_subid(),
|
|
Sub = case AccessModel of
|
|
authorize -> pending;
|
|
_ -> subscribed
|
|
end,
|
|
set_state(SubState#pubsub_state{subscriptions =
|
|
[{Sub, Id} | Subscriptions]}),
|
|
{Sub, Id}
|
|
end,
|
|
case {NewSub, SendLast} of
|
|
{subscribed, never} ->
|
|
{result, {default, subscribed, SubId}};
|
|
{subscribed, _} ->
|
|
{result, {default, subscribed, SubId, send_last}};
|
|
{_, _} ->
|
|
{result, {default, pending, SubId}}
|
|
end
|
|
end.
|
|
|
|
%% @doc <p>Unsubscribe the <tt>Subscriber</tt> from the <tt>Node</tt>.</p>
|
|
unsubscribe_node(Nidx, Sender, Subscriber, SubId) ->
|
|
SubKey = jid:tolower(Subscriber),
|
|
GenKey = jid:remove_resource(SubKey),
|
|
Authorized = jid:tolower(jid:remove_resource(Sender)) == GenKey,
|
|
GenState = get_state(Nidx, GenKey),
|
|
SubState = case SubKey of
|
|
GenKey -> GenState;
|
|
_ -> get_state(Nidx, SubKey)
|
|
end,
|
|
Subscriptions = lists:filter(fun
|
|
({_Sub, _SubId}) -> true;
|
|
(_SubId) -> false
|
|
end,
|
|
SubState#pubsub_state.subscriptions),
|
|
SubIdExists = case SubId of
|
|
<<>> -> false;
|
|
Binary when is_binary(Binary) -> true;
|
|
_ -> false
|
|
end,
|
|
if
|
|
%% Requesting entity is prohibited from unsubscribing entity
|
|
not Authorized ->
|
|
{error, xmpp:err_forbidden()};
|
|
%% Entity did not specify SubId
|
|
%%SubId == "", ?? ->
|
|
%% {error, mod_pubsub:extended_error(xmpp:err_bad_request(), "subid-required")};
|
|
%% Invalid subscription identifier
|
|
%%InvalidSubId ->
|
|
%% {error, mod_pubsub:extended_error(?ERR_NOT_ACCEPTABLE, "invalid-subid")};
|
|
%% Requesting entity is not a subscriber
|
|
Subscriptions == [] ->
|
|
{error,
|
|
mod_pubsub:extended_error(xmpp:err_unexpected_request(), mod_pubsub:err_not_subscribed())};
|
|
%% Subid supplied, so use that.
|
|
SubIdExists ->
|
|
Sub = first_in_list(fun
|
|
({_, S}) when S == SubId -> true;
|
|
(_) -> false
|
|
end,
|
|
SubState#pubsub_state.subscriptions),
|
|
case Sub of
|
|
{value, S} ->
|
|
delete_subscriptions(SubState, [S]),
|
|
{result, default};
|
|
false ->
|
|
{error,
|
|
mod_pubsub:extended_error(xmpp:err_unexpected_request(), mod_pubsub:err_not_subscribed())}
|
|
end;
|
|
%% Asking to remove all subscriptions to the given node
|
|
SubId == all ->
|
|
delete_subscriptions(SubState, Subscriptions),
|
|
{result, default};
|
|
%% No subid supplied, but there's only one matching subscription
|
|
length(Subscriptions) == 1 ->
|
|
delete_subscriptions(SubState, Subscriptions),
|
|
{result, default};
|
|
%% No subid and more than one possible subscription match.
|
|
true ->
|
|
{error,
|
|
mod_pubsub:extended_error((xmpp:err_bad_request()), mod_pubsub:err_subid_required())}
|
|
end.
|
|
|
|
delete_subscriptions(SubState, Subscriptions) ->
|
|
NewSubs = lists:foldl(fun ({Subscription, SubId}, Acc) ->
|
|
%%pubsub_subscription:delete_subscription(SubKey, Nidx, SubId),
|
|
Acc -- [{Subscription, SubId}]
|
|
end, SubState#pubsub_state.subscriptions, Subscriptions),
|
|
case {SubState#pubsub_state.affiliation, NewSubs} of
|
|
{none, []} -> del_state(SubState);
|
|
_ -> set_state(SubState#pubsub_state{subscriptions = NewSubs})
|
|
end.
|
|
|
|
%% @doc <p>Publishes the item passed as parameter.</p>
|
|
%% <p>The mechanism works as follow:
|
|
%% <ul>
|
|
%% <li>The main PubSub module prepares the item to publish and passes the
|
|
%% result of the preparation as a {@link mod_pubsub:pubsubItem()} record.</li>
|
|
%% <li>This function gets the prepared record and several other parameters and can decide to:<ul>
|
|
%% <li>reject the publication;</li>
|
|
%% <li>allow the publication as is, letting the main module perform the database persistence;</li>
|
|
%% <li>allow the publication, modifying the record. The main module will store the modified record;</li>
|
|
%% <li>allow it, but perform the needed persistence operations.</li></ul>
|
|
%% </li></ul></p>
|
|
%% <p>The selected behaviour depends on the return parameter:
|
|
%% <ul>
|
|
%% <li><tt>{error, Reason}</tt>: an iq error result will be return. No
|
|
%% publication is actually performed.</li>
|
|
%% <li><tt>true</tt>: Publication operation is allowed, based on the
|
|
%% unmodified record passed in parameter <tt>Item</tt>. If the <tt>Item</tt>
|
|
%% parameter contains an error, no subscription will actually be
|
|
%% performed.</li>
|
|
%% <li><tt>{true, Item}</tt>: Publication operation is allowed, but the
|
|
%% {@link mod_pubsub:pubsubItem()} record returned replaces the value passed
|
|
%% in parameter <tt>Item</tt>. The persistence will be performed by the main
|
|
%% module.</li>
|
|
%% <li><tt>{true, done}</tt>: Publication operation is allowed, but the
|
|
%% {@link mod_pubsub:pubsubItem()} will be considered as already stored and
|
|
%% no further persistence operation will be performed. This case is used,
|
|
%% when the plugin module is doing the persistence by itself or when it want
|
|
%% to completely disable persistence.</li></ul>
|
|
%% </p>
|
|
%% <p>In the default plugin module, the record is unchanged.</p>
|
|
publish_item(Nidx, Publisher, PublishModel, MaxItems, ItemId, Payload,
|
|
_PubOpts) ->
|
|
SubKey = jid:tolower(Publisher),
|
|
GenKey = jid:remove_resource(SubKey),
|
|
GenState = get_state(Nidx, GenKey),
|
|
SubState = case SubKey of
|
|
GenKey -> GenState;
|
|
_ -> get_state(Nidx, SubKey)
|
|
end,
|
|
Affiliation = GenState#pubsub_state.affiliation,
|
|
Subscribed = case PublishModel of
|
|
subscribers -> is_subscribed(GenState#pubsub_state.subscriptions) orelse
|
|
is_subscribed(SubState#pubsub_state.subscriptions);
|
|
_ -> undefined
|
|
end,
|
|
if not ((PublishModel == open) or
|
|
(PublishModel == publishers) and
|
|
((Affiliation == owner)
|
|
or (Affiliation == publisher)
|
|
or (Affiliation == publish_only))
|
|
or (Subscribed == true)) ->
|
|
{error, xmpp:err_forbidden()};
|
|
true ->
|
|
if MaxItems > 0;
|
|
MaxItems == unlimited ->
|
|
Now = erlang:timestamp(),
|
|
case get_item(Nidx, ItemId) of
|
|
{result, #pubsub_item{creation = {_, GenKey}} = OldItem} ->
|
|
set_item(OldItem#pubsub_item{
|
|
modification = {Now, SubKey},
|
|
payload = Payload}),
|
|
{result, {default, broadcast, []}};
|
|
{result, _} ->
|
|
{error, xmpp:err_forbidden()};
|
|
_ ->
|
|
Items = [ItemId | GenState#pubsub_state.items],
|
|
{result, {NI, OI}} = remove_extra_items(Nidx, MaxItems, Items),
|
|
set_state(GenState#pubsub_state{items = NI}),
|
|
set_item(#pubsub_item{
|
|
itemid = {ItemId, Nidx},
|
|
nodeidx = Nidx,
|
|
creation = {Now, GenKey},
|
|
modification = {Now, SubKey},
|
|
payload = Payload}),
|
|
{result, {default, broadcast, OI}}
|
|
end;
|
|
true ->
|
|
{result, {default, broadcast, []}}
|
|
end
|
|
end.
|
|
|
|
remove_extra_items(Nidx, MaxItems) ->
|
|
{result, States} = get_states(Nidx),
|
|
Records = States ++ mnesia:read({pubsub_orphan, Nidx}),
|
|
ItemIds = lists:flatmap(fun(#pubsub_state{items = Is}) ->
|
|
Is;
|
|
(#pubsub_orphan{items = Is}) ->
|
|
Is
|
|
end, Records),
|
|
remove_extra_items(Nidx, MaxItems, ItemIds).
|
|
|
|
%% @doc <p>This function is used to remove extra items, most notably when the
|
|
%% maximum number of items has been reached.</p>
|
|
%% <p>This function is used internally by the core PubSub module, as no
|
|
%% permission check is performed.</p>
|
|
%% <p>In the default plugin module, the oldest items are removed, but other
|
|
%% rules can be used.</p>
|
|
%% <p>If another PubSub plugin wants to delegate the item removal (and if the
|
|
%% plugin is using the default pubsub storage), it can implements this function like this:
|
|
%% ```remove_extra_items(Nidx, MaxItems, ItemIds) ->
|
|
%% node_default:remove_extra_items(Nidx, MaxItems, ItemIds).'''</p>
|
|
remove_extra_items(_Nidx, unlimited, ItemIds) ->
|
|
{result, {ItemIds, []}};
|
|
remove_extra_items(Nidx, MaxItems, ItemIds) ->
|
|
NewItems = lists:sublist(ItemIds, MaxItems),
|
|
OldItems = lists:nthtail(length(NewItems), ItemIds),
|
|
del_items(Nidx, OldItems),
|
|
{result, {NewItems, OldItems}}.
|
|
|
|
remove_expired_items(_Nidx, infinity) ->
|
|
{result, []};
|
|
remove_expired_items(Nidx, Seconds) ->
|
|
Items = mnesia:index_read(pubsub_item, Nidx, #pubsub_item.nodeidx),
|
|
ExpT = misc:usec_to_now(
|
|
erlang:system_time(microsecond) - (Seconds * 1000000)),
|
|
ExpItems = lists:filtermap(
|
|
fun(#pubsub_item{itemid = {ItemId, _},
|
|
modification = {ModT, _}}) when ModT < ExpT ->
|
|
{true, ItemId};
|
|
(#pubsub_item{}) ->
|
|
false
|
|
end, Items),
|
|
del_items(Nidx, ExpItems),
|
|
{result, ExpItems}.
|
|
|
|
%% @doc <p>Triggers item deletion.</p>
|
|
%% <p>Default plugin: The user performing the deletion must be the node owner
|
|
%% or a publisher, or PublishModel being open.</p>
|
|
delete_item(Nidx, Publisher, PublishModel, ItemId) ->
|
|
SubKey = jid:tolower(Publisher),
|
|
GenKey = jid:remove_resource(SubKey),
|
|
GenState = get_state(Nidx, GenKey),
|
|
#pubsub_state{affiliation = Affiliation, items = Items} = GenState,
|
|
Allowed = Affiliation == publisher orelse
|
|
Affiliation == owner orelse
|
|
(PublishModel == open andalso
|
|
case get_item(Nidx, ItemId) of
|
|
{result, #pubsub_item{creation = {_, GenKey}}} -> true;
|
|
_ -> false
|
|
end),
|
|
if not Allowed ->
|
|
{error, xmpp:err_forbidden()};
|
|
true ->
|
|
case lists:member(ItemId, Items) of
|
|
true ->
|
|
del_item(Nidx, ItemId),
|
|
set_state(GenState#pubsub_state{items = lists:delete(ItemId, Items)}),
|
|
{result, {default, broadcast}};
|
|
false ->
|
|
case Affiliation of
|
|
owner ->
|
|
{result, States} = get_states(Nidx),
|
|
Records = States ++ mnesia:read({pubsub_orphan, Nidx}),
|
|
lists:foldl(fun
|
|
(#pubsub_state{items = RI} = S, Res) ->
|
|
case lists:member(ItemId, RI) of
|
|
true ->
|
|
NI = lists:delete(ItemId, RI),
|
|
del_item(Nidx, ItemId),
|
|
mnesia:write(S#pubsub_state{items = NI}),
|
|
{result, {default, broadcast}};
|
|
false ->
|
|
Res
|
|
end;
|
|
(#pubsub_orphan{items = RI} = S, Res) ->
|
|
case lists:member(ItemId, RI) of
|
|
true ->
|
|
NI = lists:delete(ItemId, RI),
|
|
del_item(Nidx, ItemId),
|
|
mnesia:write(S#pubsub_orphan{items = NI}),
|
|
{result, {default, broadcast}};
|
|
false ->
|
|
Res
|
|
end
|
|
end,
|
|
{error, xmpp:err_item_not_found()}, Records);
|
|
_ ->
|
|
{error, xmpp:err_forbidden()}
|
|
end
|
|
end
|
|
end.
|
|
|
|
purge_node(Nidx, Owner) ->
|
|
SubKey = jid:tolower(Owner),
|
|
GenKey = jid:remove_resource(SubKey),
|
|
GenState = get_state(Nidx, GenKey),
|
|
case GenState of
|
|
#pubsub_state{affiliation = owner} ->
|
|
{result, States} = get_states(Nidx),
|
|
lists:foreach(fun
|
|
(#pubsub_state{items = []}) ->
|
|
ok;
|
|
(#pubsub_state{items = Items} = S) ->
|
|
del_items(Nidx, Items),
|
|
set_state(S#pubsub_state{items = []})
|
|
end,
|
|
States),
|
|
del_orphan_items(Nidx),
|
|
{result, {default, broadcast}};
|
|
_ ->
|
|
{error, xmpp:err_forbidden()}
|
|
end.
|
|
|
|
%% @doc <p>Return the current affiliations for the given user</p>
|
|
%% <p>The default module reads affiliations in the main Mnesia
|
|
%% <tt>pubsub_state</tt> table. If a plugin stores its data in the same
|
|
%% table, it should return an empty list, as the affiliation will be read by
|
|
%% the default PubSub module. Otherwise, it should return its own affiliation,
|
|
%% that will be added to the affiliation stored in the main
|
|
%% <tt>pubsub_state</tt> table.</p>
|
|
get_entity_affiliations(Host, Owner) ->
|
|
SubKey = jid:tolower(Owner),
|
|
GenKey = jid:remove_resource(SubKey),
|
|
States = mnesia:match_object(#pubsub_state{stateid = {GenKey, '_'}, _ = '_'}),
|
|
NodeTree = mod_pubsub:tree(Host),
|
|
Reply = lists:foldl(fun (#pubsub_state{stateid = {_, N}, affiliation = A}, Acc) ->
|
|
case NodeTree:get_node(N) of
|
|
#pubsub_node{nodeid = {Host, _}} = Node -> [{Node, A} | Acc];
|
|
_ -> Acc
|
|
end
|
|
end,
|
|
[], States),
|
|
{result, Reply}.
|
|
|
|
get_node_affiliations(Nidx) ->
|
|
{result, States} = get_states(Nidx),
|
|
Tr = fun (#pubsub_state{stateid = {J, _}, affiliation = A}) -> {J, A} end,
|
|
{result, lists:map(Tr, States)}.
|
|
|
|
get_affiliation(Nidx, Owner) ->
|
|
SubKey = jid:tolower(Owner),
|
|
GenKey = jid:remove_resource(SubKey),
|
|
#pubsub_state{affiliation = Affiliation} = get_state(Nidx, GenKey),
|
|
{result, Affiliation}.
|
|
|
|
set_affiliation(Nidx, Owner, Affiliation) ->
|
|
SubKey = jid:tolower(Owner),
|
|
GenKey = jid:remove_resource(SubKey),
|
|
GenState = get_state(Nidx, GenKey),
|
|
case {Affiliation, GenState#pubsub_state.subscriptions} of
|
|
{none, []} -> {result, del_state(GenState)};
|
|
_ -> {result, set_state(GenState#pubsub_state{affiliation = Affiliation})}
|
|
end.
|
|
|
|
%% @doc <p>Return the current subscriptions for the given user</p>
|
|
%% <p>The default module reads subscriptions in the main Mnesia
|
|
%% <tt>pubsub_state</tt> table. If a plugin stores its data in the same
|
|
%% table, it should return an empty list, as the affiliation will be read by
|
|
%% the default PubSub module. Otherwise, it should return its own affiliation,
|
|
%% that will be added to the affiliation stored in the main
|
|
%% <tt>pubsub_state</tt> table.</p>
|
|
get_entity_subscriptions(Host, Owner) ->
|
|
{U, D, _} = SubKey = jid:tolower(Owner),
|
|
GenKey = jid:remove_resource(SubKey),
|
|
States = case SubKey of
|
|
GenKey ->
|
|
mnesia:match_object(#pubsub_state{stateid = {{U, D, '_'}, '_'}, _ = '_'});
|
|
_ ->
|
|
mnesia:match_object(#pubsub_state{stateid = {GenKey, '_'}, _ = '_'})
|
|
++
|
|
mnesia:match_object(#pubsub_state{stateid = {SubKey, '_'}, _ = '_'})
|
|
end,
|
|
NodeTree = mod_pubsub:tree(Host),
|
|
Reply = lists:foldl(fun (#pubsub_state{stateid = {J, N}, subscriptions = Ss}, Acc) ->
|
|
case NodeTree:get_node(N) of
|
|
#pubsub_node{nodeid = {Host, _}} = Node ->
|
|
lists:foldl(fun ({Sub, SubId}, Acc2) ->
|
|
[{Node, Sub, SubId, J} | Acc2]
|
|
end,
|
|
Acc, Ss);
|
|
_ ->
|
|
Acc
|
|
end
|
|
end,
|
|
[], States),
|
|
{result, Reply}.
|
|
|
|
get_node_subscriptions(Nidx) ->
|
|
{result, States} = get_states(Nidx),
|
|
Tr = fun (#pubsub_state{stateid = {J, _}, subscriptions = Subscriptions}) ->
|
|
lists:foldl(fun ({S, SubId}, Acc) ->
|
|
[{J, S, SubId} | Acc]
|
|
end,
|
|
[], Subscriptions)
|
|
end,
|
|
{result, lists:flatmap(Tr, States)}.
|
|
|
|
get_subscriptions(Nidx, Owner) ->
|
|
SubKey = jid:tolower(Owner),
|
|
SubState = get_state(Nidx, SubKey),
|
|
{result, SubState#pubsub_state.subscriptions}.
|
|
|
|
set_subscriptions(Nidx, Owner, Subscription, SubId) ->
|
|
SubKey = jid:tolower(Owner),
|
|
SubState = get_state(Nidx, SubKey),
|
|
case {SubId, SubState#pubsub_state.subscriptions} of
|
|
{_, []} ->
|
|
case Subscription of
|
|
none ->
|
|
{error,
|
|
mod_pubsub:extended_error((xmpp:err_bad_request()), mod_pubsub:err_not_subscribed())};
|
|
_ ->
|
|
new_subscription(Nidx, Owner, Subscription, SubState)
|
|
end;
|
|
{<<>>, [{_, SID}]} ->
|
|
case Subscription of
|
|
none -> unsub_with_subid(SubState, SID);
|
|
_ -> replace_subscription({Subscription, SID}, SubState)
|
|
end;
|
|
{<<>>, [_ | _]} ->
|
|
{error,
|
|
mod_pubsub:extended_error((xmpp:err_bad_request()), mod_pubsub:err_subid_required())};
|
|
_ ->
|
|
case Subscription of
|
|
none -> unsub_with_subid(SubState, SubId);
|
|
_ -> replace_subscription({Subscription, SubId}, SubState)
|
|
end
|
|
end.
|
|
|
|
replace_subscription(NewSub, SubState) ->
|
|
NewSubs = replace_subscription(NewSub, SubState#pubsub_state.subscriptions, []),
|
|
{result, set_state(SubState#pubsub_state{subscriptions = NewSubs})}.
|
|
|
|
replace_subscription(_, [], Acc) -> Acc;
|
|
replace_subscription({Sub, SubId}, [{_, SubId} | T], Acc) ->
|
|
replace_subscription({Sub, SubId}, T, [{Sub, SubId} | Acc]).
|
|
|
|
new_subscription(_Nidx, _Owner, Sub, SubState) ->
|
|
%%SubId = pubsub_subscription:add_subscription(Owner, Nidx, []),
|
|
SubId = pubsub_subscription:make_subid(),
|
|
Subs = SubState#pubsub_state.subscriptions,
|
|
set_state(SubState#pubsub_state{subscriptions = [{Sub, SubId} | Subs]}),
|
|
{result, {Sub, SubId}}.
|
|
|
|
unsub_with_subid(SubState, SubId) ->
|
|
%%pubsub_subscription:delete_subscription(SubState#pubsub_state.stateid, Nidx, SubId),
|
|
NewSubs = [{S, Sid}
|
|
|| {S, Sid} <- SubState#pubsub_state.subscriptions,
|
|
SubId =/= Sid],
|
|
case {NewSubs, SubState#pubsub_state.affiliation} of
|
|
{[], none} -> {result, del_state(SubState)};
|
|
_ -> {result, set_state(SubState#pubsub_state{subscriptions = NewSubs})}
|
|
end.
|
|
|
|
%% @doc <p>Returns a list of Owner's nodes on Host with pending
|
|
%% subscriptions.</p>
|
|
get_pending_nodes(Host, Owner) ->
|
|
GenKey = jid:remove_resource(jid:tolower(Owner)),
|
|
States = mnesia:match_object(#pubsub_state{stateid = {GenKey, '_'},
|
|
affiliation = owner,
|
|
_ = '_'}),
|
|
NodeIdxs = [Nidx || #pubsub_state{stateid = {_, Nidx}} <- States],
|
|
NodeTree = mod_pubsub:tree(Host),
|
|
Reply = mnesia:foldl(fun (#pubsub_state{stateid = {_, Nidx}} = S, Acc) ->
|
|
case lists:member(Nidx, NodeIdxs) of
|
|
true ->
|
|
case get_nodes_helper(NodeTree, S) of
|
|
{value, Node} -> [Node | Acc];
|
|
false -> Acc
|
|
end;
|
|
false ->
|
|
Acc
|
|
end
|
|
end,
|
|
[], pubsub_state),
|
|
{result, Reply}.
|
|
|
|
get_nodes_helper(NodeTree, #pubsub_state{stateid = {_, N}, subscriptions = Subs}) ->
|
|
HasPending = fun
|
|
({pending, _}) -> true;
|
|
(pending) -> true;
|
|
(_) -> false
|
|
end,
|
|
case lists:any(HasPending, Subs) of
|
|
true ->
|
|
case NodeTree:get_node(N) of
|
|
#pubsub_node{nodeid = {_, Node}} -> {value, Node};
|
|
_ -> false
|
|
end;
|
|
false ->
|
|
false
|
|
end.
|
|
|
|
%% @doc Returns the list of stored states for a given node.
|
|
%% <p>For the default PubSub module, states are stored in Mnesia database.</p>
|
|
%% <p>We can consider that the pubsub_state table have been created by the main
|
|
%% mod_pubsub module.</p>
|
|
%% <p>PubSub plugins can store the states where they wants (for example in a
|
|
%% relational database).</p>
|
|
%% <p>If a PubSub plugin wants to delegate the states storage to the default node,
|
|
%% they can implement this function like this:
|
|
%% ```get_states(Nidx) ->
|
|
%% node_default:get_states(Nidx).'''</p>
|
|
get_states(Nidx) ->
|
|
States = case catch mnesia:index_read(pubsub_state, Nidx, #pubsub_state.nodeidx) of
|
|
List when is_list(List) -> List;
|
|
_ -> []
|
|
end,
|
|
{result, States}.
|
|
|
|
%% @doc <p>Returns a state (one state list), given its reference.</p>
|
|
get_state(Nidx, Key) ->
|
|
StateId = {Key, Nidx},
|
|
case catch mnesia:read({pubsub_state, StateId}) of
|
|
[State] when is_record(State, pubsub_state) -> State;
|
|
_ -> #pubsub_state{stateid = StateId, nodeidx = Nidx}
|
|
end.
|
|
|
|
%% @doc <p>Write a state into database.</p>
|
|
set_state(State) when is_record(State, pubsub_state) ->
|
|
mnesia:write(State).
|
|
%set_state(_) -> {error, ?ERR_INTERNAL_SERVER_ERROR}.
|
|
|
|
%% @doc <p>Delete a state from database.</p>
|
|
del_state(#pubsub_state{stateid = {Key, Nidx}, items = Items}) ->
|
|
case Items of
|
|
[] ->
|
|
ok;
|
|
_ ->
|
|
Orphan = #pubsub_orphan{nodeid = Nidx, items =
|
|
case mnesia:read({pubsub_orphan, Nidx}) of
|
|
[#pubsub_orphan{items = ItemIds}] ->
|
|
lists:usort(ItemIds++Items);
|
|
_ ->
|
|
Items
|
|
end},
|
|
mnesia:write(Orphan)
|
|
end,
|
|
mnesia:delete({pubsub_state, {Key, Nidx}}).
|
|
|
|
%% @doc Returns the list of stored items for a given node.
|
|
%% <p>For the default PubSub module, items are stored in Mnesia database.</p>
|
|
%% <p>We can consider that the pubsub_item table have been created by the main
|
|
%% mod_pubsub module.</p>
|
|
%% <p>PubSub plugins can store the items where they wants (for example in a
|
|
%% relational database), or they can even decide not to persist any items.</p>
|
|
get_items(Nidx, _From, undefined) ->
|
|
RItems = lists:keysort(#pubsub_item.creation,
|
|
mnesia:index_read(pubsub_item, Nidx, #pubsub_item.nodeidx)),
|
|
{result, {RItems, undefined}};
|
|
|
|
get_items(Nidx, _From, #rsm_set{max = Max, index = IncIndex,
|
|
'after' = After, before = Before}) ->
|
|
case lists:keysort(#pubsub_item.creation,
|
|
mnesia:index_read(pubsub_item, Nidx, #pubsub_item.nodeidx)) of
|
|
[] ->
|
|
{result, {[], #rsm_set{count = 0}}};
|
|
RItems ->
|
|
Count = length(RItems),
|
|
Limit = case Max of
|
|
undefined -> ?MAXITEMS;
|
|
_ -> Max
|
|
end,
|
|
{Offset, ItemsPage} =
|
|
case {IncIndex, Before, After} of
|
|
{undefined, undefined, undefined} ->
|
|
{0, lists:sublist(RItems, Limit)};
|
|
{I, undefined, undefined} ->
|
|
SubList = lists:nthtail(I, RItems),
|
|
{I, lists:sublist(SubList, Limit)};
|
|
{_, <<>>, undefined} ->
|
|
%% 2.5 Requesting the Last Page in a Result Set
|
|
SubList = lists:reverse(RItems),
|
|
{Count-Limit, lists:reverse(lists:sublist(SubList, Limit))};
|
|
{_, Stamp, undefined} ->
|
|
BeforeNow = encode_stamp(Stamp),
|
|
{NewIndex, SubList} = extract_sublist(before_now, BeforeNow,
|
|
0, lists:reverse(RItems)),
|
|
{Count-NewIndex-Limit, lists:reverse(lists:sublist(SubList, Limit))};
|
|
{_, undefined, Stamp} ->
|
|
AfterNow = encode_stamp(Stamp),
|
|
{NewIndex, SubList} = extract_sublist(after_now, AfterNow,
|
|
0, RItems),
|
|
{NewIndex, lists:sublist(SubList, Limit)}
|
|
end,
|
|
Rsm = rsm_page(Count, IncIndex, Offset, ItemsPage),
|
|
{result, {ItemsPage, Rsm}}
|
|
end.
|
|
|
|
get_items(Nidx, JID, AccessModel, PresenceSubscription, RosterGroup, _SubId, RSM) ->
|
|
SubKey = jid:tolower(JID),
|
|
GenKey = jid:remove_resource(SubKey),
|
|
GenState = get_state(Nidx, GenKey),
|
|
SubState = get_state(Nidx, SubKey),
|
|
Affiliation = GenState#pubsub_state.affiliation,
|
|
BareSubscriptions = GenState#pubsub_state.subscriptions,
|
|
FullSubscriptions = SubState#pubsub_state.subscriptions,
|
|
Whitelisted = can_fetch_item(Affiliation, BareSubscriptions) orelse
|
|
can_fetch_item(Affiliation, FullSubscriptions),
|
|
if %%SubId == "", ?? ->
|
|
%% Entity has multiple subscriptions to the node but does not specify a subscription ID
|
|
%{error, mod_pubsub:extended_error(xmpp:err_bad_request(), "subid-required")};
|
|
%%InvalidSubId ->
|
|
%% Entity is subscribed but specifies an invalid subscription ID
|
|
%{error, mod_pubsub:extended_error(?ERR_NOT_ACCEPTABLE, "invalid-subid")};
|
|
(Affiliation == outcast) or (Affiliation == publish_only) ->
|
|
{error, xmpp:err_forbidden()};
|
|
(AccessModel == presence) and not PresenceSubscription ->
|
|
{error,
|
|
mod_pubsub:extended_error((xmpp:err_not_authorized()), mod_pubsub:err_presence_subscription_required())};
|
|
(AccessModel == roster) and not RosterGroup ->
|
|
{error,
|
|
mod_pubsub:extended_error((xmpp:err_not_authorized()), mod_pubsub:err_not_in_roster_group())};
|
|
(AccessModel == whitelist) and not Whitelisted ->
|
|
{error,
|
|
mod_pubsub:extended_error((xmpp:err_not_allowed()), mod_pubsub:err_closed_node())};
|
|
(AccessModel == authorize) and not Whitelisted ->
|
|
{error, xmpp:err_forbidden()};
|
|
%%MustPay ->
|
|
%% % Payment is required for a subscription
|
|
%% {error, ?ERR_PAYMENT_REQUIRED};
|
|
true ->
|
|
get_items(Nidx, JID, RSM)
|
|
end.
|
|
|
|
extract_sublist(A, Now, Index, [#pubsub_item{creation = {Creation, _}} | RItems])
|
|
when ((A == before_now) and (Creation >= Now))
|
|
or ((A == after_now) and (Creation =< Now)) ->
|
|
extract_sublist(A, Now, Index+1, RItems);
|
|
extract_sublist(_, _, Index, RItems) ->
|
|
{Index, RItems}.
|
|
|
|
get_only_item(Nidx, From) ->
|
|
get_last_items(Nidx, From, 1).
|
|
|
|
get_last_items(Nidx, _From, Count) when Count > 0 ->
|
|
Items = mnesia:index_read(pubsub_item, Nidx, #pubsub_item.nodeidx),
|
|
LastItems = lists:reverse(lists:keysort(#pubsub_item.modification, Items)),
|
|
{result, lists:sublist(LastItems, Count)};
|
|
get_last_items(_Nidx, _From, _Count) ->
|
|
{result, []}.
|
|
|
|
%% @doc <p>Returns an item (one item list), given its reference.</p>
|
|
|
|
get_item(Nidx, ItemId) ->
|
|
case mnesia:read({pubsub_item, {ItemId, Nidx}}) of
|
|
[Item] when is_record(Item, pubsub_item) -> {result, Item};
|
|
_ -> {error, xmpp:err_item_not_found()}
|
|
end.
|
|
|
|
get_item(Nidx, ItemId, JID, AccessModel, PresenceSubscription, RosterGroup, _SubId) ->
|
|
SubKey = jid:tolower(JID),
|
|
GenKey = jid:remove_resource(SubKey),
|
|
GenState = get_state(Nidx, GenKey),
|
|
Affiliation = GenState#pubsub_state.affiliation,
|
|
Subscriptions = GenState#pubsub_state.subscriptions,
|
|
Whitelisted = can_fetch_item(Affiliation, Subscriptions),
|
|
if %%SubId == "", ?? ->
|
|
%% Entity has multiple subscriptions to the node but does not specify a subscription ID
|
|
%{error, mod_pubsub:extended_error(xmpp:err_bad_request(), "subid-required")};
|
|
%%InvalidSubId ->
|
|
%% Entity is subscribed but specifies an invalid subscription ID
|
|
%{error, mod_pubsub:extended_error(?ERR_NOT_ACCEPTABLE, "invalid-subid")};
|
|
(Affiliation == outcast) or (Affiliation == publish_only) ->
|
|
{error, xmpp:err_forbidden()};
|
|
(AccessModel == presence) and not PresenceSubscription ->
|
|
{error,
|
|
mod_pubsub:extended_error((xmpp:err_not_authorized()), mod_pubsub:err_presence_subscription_required())};
|
|
(AccessModel == roster) and not RosterGroup ->
|
|
{error,
|
|
mod_pubsub:extended_error((xmpp:err_not_authorized()), mod_pubsub:err_not_in_roster_group())};
|
|
(AccessModel == whitelist) and not Whitelisted ->
|
|
{error,
|
|
mod_pubsub:extended_error((xmpp:err_not_allowed()), mod_pubsub:err_closed_node())};
|
|
(AccessModel == authorize) and not Whitelisted ->
|
|
{error, xmpp:err_forbidden()};
|
|
%%MustPay ->
|
|
%% % Payment is required for a subscription
|
|
%% {error, ?ERR_PAYMENT_REQUIRED};
|
|
true ->
|
|
get_item(Nidx, ItemId)
|
|
end.
|
|
|
|
%% @doc <p>Write an item into database.</p>
|
|
set_item(Item) when is_record(Item, pubsub_item) ->
|
|
mnesia:write(Item).
|
|
%set_item(_) -> {error, ?ERR_INTERNAL_SERVER_ERROR}.
|
|
|
|
%% @doc <p>Delete an item from database.</p>
|
|
del_item(Nidx, ItemId) ->
|
|
mnesia:delete({pubsub_item, {ItemId, Nidx}}).
|
|
|
|
del_items(Nidx, ItemIds) ->
|
|
lists:foreach(fun (ItemId) -> del_item(Nidx, ItemId)
|
|
end,
|
|
ItemIds).
|
|
|
|
del_orphan_items(Nidx) ->
|
|
case mnesia:read({pubsub_orphan, Nidx}) of
|
|
[#pubsub_orphan{items = ItemIds}] ->
|
|
del_items(Nidx, ItemIds),
|
|
mnesia:delete({pubsub_orphan, Nidx});
|
|
_ ->
|
|
ok
|
|
end.
|
|
|
|
get_item_name(_Host, _Node, Id) ->
|
|
{result, Id}.
|
|
|
|
%% @doc <p>Return the path of the node. In flat it's just node id.</p>
|
|
node_to_path(Node) ->
|
|
{result, [Node]}.
|
|
|
|
path_to_node(Path) ->
|
|
{result,
|
|
case Path of
|
|
%% default slot
|
|
[Node] -> iolist_to_binary(Node);
|
|
%% handle old possible entries, used when migrating database content to new format
|
|
[Node | _] when is_binary(Node) ->
|
|
iolist_to_binary(str:join([<<"">> | Path], <<"/">>));
|
|
%% default case (used by PEP for example)
|
|
_ -> iolist_to_binary(Path)
|
|
end}.
|
|
|
|
can_fetch_item(owner, _) -> true;
|
|
can_fetch_item(member, _) -> true;
|
|
can_fetch_item(publisher, _) -> true;
|
|
can_fetch_item(publish_only, _) -> false;
|
|
can_fetch_item(outcast, _) -> false;
|
|
can_fetch_item(none, Subscriptions) -> is_subscribed(Subscriptions).
|
|
%can_fetch_item(_Affiliation, _Subscription) -> false.
|
|
|
|
is_subscribed(Subscriptions) ->
|
|
lists:any(fun
|
|
({subscribed, _SubId}) -> true;
|
|
(_) -> false
|
|
end,
|
|
Subscriptions).
|
|
|
|
first_in_list(_Pred, []) ->
|
|
false;
|
|
first_in_list(Pred, [H | T]) ->
|
|
case Pred(H) of
|
|
true -> {value, H};
|
|
_ -> first_in_list(Pred, T)
|
|
end.
|
|
|
|
rsm_page(Count, _, _, []) ->
|
|
#rsm_set{count = Count};
|
|
rsm_page(Count, Index, Offset, Items) ->
|
|
FirstItem = hd(Items),
|
|
LastItem = lists:last(Items),
|
|
First = decode_stamp(element(1, FirstItem#pubsub_item.creation)),
|
|
Last = decode_stamp(element(1, LastItem#pubsub_item.creation)),
|
|
#rsm_set{count = Count, index = Index,
|
|
first = #rsm_first{index = Offset, data = First},
|
|
last = Last}.
|
|
|
|
encode_stamp(Stamp) ->
|
|
try xmpp_util:decode_timestamp(Stamp)
|
|
catch _:{bad_timestamp, _} ->
|
|
Stamp % We should return a proper error to the client instead.
|
|
end.
|
|
decode_stamp(Stamp) ->
|
|
xmpp_util:encode_timestamp(Stamp).
|
|
|
|
transform({pubsub_state, {Id, Nidx}, Is, A, Ss}) ->
|
|
{pubsub_state, {Id, Nidx}, Nidx, Is, A, Ss};
|
|
transform({pubsub_item, {Id, Nidx}, C, M, P}) ->
|
|
{pubsub_item, {Id, Nidx}, Nidx, C, M, P};
|
|
transform(Rec) ->
|
|
Rec.
|