PubSub: add RSM support for mnesia backend

This commit is contained in:
Christophe Romain 2017-09-27 20:39:54 +02:00
parent 3c8308bb8d
commit 216a0c97b9
1 changed files with 67 additions and 3 deletions

View File

@ -728,9 +728,53 @@ del_state(#pubsub_state{stateid = {Key, Nidx}, items = Items}) ->
%% mod_pubsub module.</p>
%% <p>PubSub plugins can store the items where they wants (for example in a
%% relational database), or they can even decide not to persist any items.</p>
get_items(Nidx, _From, _RSM) ->
Items = mnesia:index_read(pubsub_item, Nidx, #pubsub_item.nodeidx),
{result, {lists:keysort(#pubsub_item.creation, Items), undefined}}.
get_items(Nidx, _From, undefined) ->
RItems = lists:keysort(#pubsub_item.creation,
mnesia:index_read(pubsub_item, Nidx, #pubsub_item.nodeidx)),
Count = length(RItems),
if Count =< ?MAXITEMS ->
{result, {RItems, undefined}};
true ->
ItemsPage = lists:sublist(RItems, ?MAXITEMS),
Rsm = rsm_page(Count, 0, 0, ItemsPage),
{result, {ItemsPage, Rsm}}
end;
get_items(Nidx, _From, #rsm_set{max = Max, index = IncIndex,
'after' = After, before = Before}) ->
RItems = lists:keysort(#pubsub_item.creation,
mnesia:index_read(pubsub_item, Nidx, #pubsub_item.nodeidx)),
Count = length(RItems),
Limit = case Max of
undefined -> ?MAXITEMS;
_ -> Max
end,
{Offset, ItemsPage} =
case {IncIndex, Before, After} of
{I, undefined, undefined} ->
SubList = lists:nthtail(I, RItems),
{I, lists:sublist(SubList, Limit)};
{_, <<>>, undefined} ->
%% 2.5 Requesting the Last Page in a Result Set
SubList = lists:reverse(RItems),
{0, lists:sublist(SubList, Limit)};
{_, Stamp, undefined} ->
BeforeNow = encode_stamp(Stamp),
SubList = lists:dropwhile(
fun(#pubsub_item{creation = {Now, _}}) ->
Now >= BeforeNow
end, lists:reverse(RItems)),
{0, lists:sublist(SubList, Limit)};
{_, undefined, Stamp} ->
AfterNow = encode_stamp(Stamp),
SubList = lists:dropwhile(
fun(#pubsub_item{creation = {Now, _}}) ->
Now =< AfterNow
end, RItems),
{0, lists:sublist(SubList, Limit)}
end,
Rsm = rsm_page(Count, IncIndex, Offset, ItemsPage),
{result, {ItemsPage, Rsm}}.
get_items(Nidx, JID, AccessModel, PresenceSubscription, RosterGroup, _SubId, RSM) ->
SubKey = jid:tolower(JID),
@ -880,6 +924,26 @@ first_in_list(Pred, [H | T]) ->
_ -> first_in_list(Pred, T)
end.
rsm_page(Count, Index, Offset, Items) ->
FirstItem = hd(Items),
LastItem = lists:last(Items),
First = decode_stamp(element(1, FirstItem#pubsub_item.creation)),
Last = decode_stamp(element(1, LastItem#pubsub_item.creation)),
#rsm_set{count = Count, index = Index,
first = #rsm_first{index = Offset, data = First},
last = Last}.
encode_stamp(Stamp) ->
case catch xmpp_util:decode_timestamp(Stamp) of
{MS,S,US} -> {MS,S,US};
_ -> Stamp
end.
decode_stamp(Stamp) ->
case catch xmpp_util:encode_timestamp(Stamp) of
TimeStamp when is_binary(TimeStamp) -> TimeStamp;
_ -> Stamp
end.
transform({pubsub_state, {Id, Nidx}, Is, A, Ss}) ->
{pubsub_state, {Id, Nidx}, Nidx, Is, A, Ss};
transform({pubsub_item, {Id, Nidx}, C, M, P}) ->