PubSub: fix RSM support (#1994)(#2Â014)

This commit is contained in:
Christophe Romain 2017-09-27 10:51:37 +02:00
parent cf09ed2df2
commit 07a193d4dc
2 changed files with 113 additions and 104 deletions

View File

@ -1986,8 +1986,14 @@ get_items(Host, Node, From, SubId, SMaxItems, ItemIds, RSM) ->
Owners = node_owners_call(Host, Type, Nidx, O),
{PS, RG} = get_presence_and_roster_permissions(
Host, From, Owners, AccessModel, AllowedGroups),
node_call(Host, Type, get_items,
[Nidx, From, AccessModel, PS, RG, SubId, RSM])
case ItemIds of
[ItemId] ->
node_call(Host, Type, get_item,
[Nidx, ItemId, From, AccessModel, PS, RG, undefined]);
_ ->
node_call(Host, Type, get_items,
[Nidx, From, AccessModel, PS, RG, SubId, RSM])
end
end
end,
case transaction(Host, Node, Action, sync_dirty) of
@ -2005,6 +2011,10 @@ get_items(Host, Node, From, SubId, SMaxItems, ItemIds, RSM) ->
#pubsub{items = #ps_items{node = Node,
items = itemsEls(lists:sublist(SendItems, MaxItems))},
rsm = RsmOut}};
{result, {_, Item}} ->
{result,
#pubsub{items = #ps_items{node = Node,
items = itemsEls([Item])}}};
Error ->
Error
end.

View File

@ -647,95 +647,60 @@ del_state(Nidx, JID) ->
" where jid=%(J)s and nodeid=%(Nidx)d")),
ok.
get_items(Nidx, From, undefined) ->
MaxItems = case ejabberd_sql:sql_query_t(
?SQL("select @(val)s from pubsub_node_option "
"where nodeid=%(Nidx)d and name='max_items'")) of
{selected, [{Value}]} ->
misc:expr_to_term(Value);
_ ->
?MAXITEMS
end,
get_items(Nidx, From, #rsm_set{max = MaxItems});
get_items(Nidx, _From, undefined) ->
SNidx = misc:i2l(Nidx),
case ejabberd_sql:sql_query_t(
[<<"select itemid, publisher, creation, modification, payload",
" from pubsub_item where nodeid='", SNidx/binary, "'">>]) of
{selected, _, AllItems} ->
Count = length(AllItems),
if Count =< ?MAXITEMS ->
{result, {[raw_to_item(Nidx, RItem) || RItem <- AllItems], undefined}};
true ->
RItems = lists:sublist(AllItems, ?MAXITEMS),
Rsm = rsm_page(Count, 0, 0, RItems),
{result, {[raw_to_item(Nidx, RItem) || RItem <- RItems], Rsm}}
end;
_ ->
{result, {[], undefined}}
end;
get_items(Nidx, _From, #rsm_set{max = Max, index = IncIndex,
'after' = After, before = Before}) ->
{Way, Order} = if After == <<>> -> {<<"is not">>, <<"desc">>};
After /= undefined -> {<<"<">>, <<"desc">>};
Before == <<>> -> {<<"is not">>, <<"asc">>};
Before /= undefined -> {<<">">>, <<"asc">>};
true -> {<<"is not">>, <<"desc">>}
end,
SNidx = misc:i2l(Nidx),
I = if After /= undefined -> After;
Before /= undefined -> Before;
true -> undefined
end,
[AttrName, Id] =
case I of
undefined when IncIndex =/= undefined ->
case ejabberd_sql:sql_query_t(
[<<"select creation from pubsub_item pi "
"where exists ( select count(*) as count1 "
"from pubsub_item where nodeid='">>, SNidx,
<<"' and creation > pi.creation having count1 = ">>,
integer_to_binary(IncIndex), <<" );">>]) of
{selected, [_], [[O]]} ->
[<<"creation">>, <<"'", O/binary, "'">>];
_ ->
[<<"creation">>, <<"null">>]
end;
undefined ->
[<<"creation">>, <<"null">>];
<<>> ->
[<<"creation">>, <<"null">>];
I ->
[A, B] = str:tokens(ejabberd_sql:escape(I), <<"@">>),
[A, <<"'", B/binary, "'">>]
end,
Count = case ejabberd_sql:sql_query_t(
[<<"select count(*) from pubsub_item where nodeid='">>,
SNidx, <<"';">>]) of
{selected, [_], [[C]]} -> binary_to_integer(C);
Count = case catch ejabberd_sql:sql_query_t(
?SQL("select @(count(itemid))d from pubsub_item"
" where nodeid=%(Nidx)d")) of
{selected, [{C}]} -> C;
_ -> 0
end,
Offset = case {IncIndex, Before, After} of
{I, undefined, undefined} when is_integer(I) -> I;
_ -> 0
end,
Limit = case Max of
undefined -> ?MAXITEMS;
_ -> Max
end,
Filters = rsm_filters(misc:i2l(Nidx), Before, After),
Query = fun(mssql, _) ->
ejabberd_sql:sql_query_t(
[<<"select top ">>, integer_to_binary(Max),
<<" itemid, publisher, creation, modification, payload "
"from pubsub_item where nodeid='">>, SNidx,
<<"' and ">>, AttrName, <<" ">>, Way, <<" ">>, Id, <<" order by ">>,
AttrName, <<" ">>, Order, <<";">>]);
[<<"select top ", (integer_to_binary(Limit))/binary,
" itemid, publisher, creation, modification, payload",
" from pubsub_item", Filters/binary>>]);
%OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY;
(_, _) ->
ejabberd_sql:sql_query_t(
[<<"select itemid, publisher, creation, modification, payload "
"from pubsub_item where nodeid='">>, SNidx,
<<"' and ">>, AttrName, <<" ">>, Way, <<" ">>, Id, <<" order by ">>,
AttrName, <<" ">>, Order, <<" limit ">>,
integer_to_binary(Max), <<" ;">>])
[<<"select itemid, publisher, creation, modification, payload",
" from pubsub_item", Filters/binary,
" limit ", (integer_to_binary(Limit))/binary,
" offset ", (integer_to_binary(Offset))/binary>>])
end,
case ejabberd_sql:sql_query_t(Query) of
{selected, _, []} ->
{result, {[], #rsm_set{count = Count}}};
{selected, [<<"itemid">>, <<"publisher">>, <<"creation">>,
<<"modification">>, <<"payload">>], RItems} ->
case RItems of
[[_, _, _, F, _]|_] ->
Index = case catch ejabberd_sql:sql_query_t(
[<<"select count(*) from pubsub_item "
"where nodeid='">>, SNidx, <<"' and ">>,
AttrName, <<" > '">>, F, <<"';">>]) of
{selected, [_], [[In]]} -> binary_to_integer(In);
_ -> 0
end,
[_, _, _, L, _] = lists:last(RItems),
RsmOut = #rsm_set{count = Count,
index = Index,
first = #rsm_first{
index = Index,
data = <<"creation@", F/binary>>},
last = <<"creation@", L/binary>>},
{result, {[raw_to_item(Nidx, RItem) || RItem <- RItems], RsmOut}};
[] ->
{result, {[], #rsm_set{count = Count}}}
end;
Rsm = rsm_page(Count, IncIndex, Offset, RItems),
{result, {[raw_to_item(Nidx, RItem) || RItem <- RItems], Rsm}};
_ ->
{result, {[], undefined}}
end.
@ -773,24 +738,24 @@ get_items(Nidx, JID, AccessModel, PresenceSubscription, RosterGroup, _SubId, RSM
get_items(Nidx, JID, RSM)
end.
get_last_items(Nidx, _From, Count) ->
Limit = misc:i2l(Count),
get_last_items(Nidx, _From, Limit) ->
SNidx = misc:i2l(Nidx),
Query = fun(mssql, _) ->
ejabberd_sql:sql_query_t(
[<<"select top ">>, Limit,
<<" itemid, publisher, creation, modification, payload "
"from pubsub_item where nodeid='">>, SNidx,
<<"' order by modification desc ;">>]);
[<<"select top ", (integer_to_binary(Limit))/binary,
" itemid, publisher, creation, modification, payload",
" from pubsub_item where nodeid='", SNidx/binary,
"' order by modification desc">>]);
(_, _) ->
ejabberd_sql:sql_query_t(
[<<"select itemid, publisher, creation, modification, payload "
"from pubsub_item where nodeid='">>, SNidx,
<<"' order by modification desc limit ">>, Limit, <<";">>])
[<<"select itemid, publisher, creation, modification, payload",
" from pubsub_item where nodeid='", SNidx/binary,
"' order by modification desc ",
" limit ", (integer_to_binary(Limit))/binary>>])
end,
case catch ejabberd_sql:sql_query_t(Query) of
{selected,
[<<"itemid">>, <<"publisher">>, <<"creation">>, <<"modification">>, <<"payload">>], RItems} ->
{selected, [<<"itemid">>, <<"publisher">>, <<"creation">>,
<<"modification">>, <<"payload">>], RItems} ->
{result, [raw_to_item(Nidx, RItem) || RItem <- RItems]};
_ ->
{result, []}
@ -798,9 +763,9 @@ get_last_items(Nidx, _From, Count) ->
get_item(Nidx, ItemId) ->
case catch ejabberd_sql:sql_query_t(
?SQL("select @(itemid)s, @(publisher)s, @(creation)s,"
" @(modification)s, @(payload)s from pubsub_item"
" where nodeid=%(Nidx)d and itemid=%(ItemId)s"))
?SQL("select @(itemid)s, @(publisher)s, @(creation)s,"
" @(modification)s, @(payload)s from pubsub_item"
" where nodeid=%(Nidx)d and itemid=%(ItemId)s"))
of
{selected, [RItem]} ->
{result, raw_to_item(Nidx, RItem)};
@ -850,11 +815,8 @@ set_item(Item) ->
P = encode_jid(JID),
Payload = Item#pubsub_item.payload,
XML = str:join([fxml:element_to_binary(X) || X<-Payload], <<>>),
S = fun ({T1, T2, T3}) ->
str:join([misc:i2l(T1, 6), misc:i2l(T2, 6), misc:i2l(T3, 6)], <<":">>)
end,
SM = S(M),
SC = S(C),
SM = encode_now(M),
SC = encode_now(C),
?SQL_UPSERT_T(
"pubsub_item",
["!nodeid=%(Nidx)d",
@ -1029,15 +991,52 @@ raw_to_item(Nidx, [ItemId, SJID, Creation, Modification, XML]) ->
raw_to_item(Nidx, {ItemId, SJID, Creation, Modification, XML});
raw_to_item(Nidx, {ItemId, SJID, Creation, Modification, XML}) ->
JID = decode_jid(SJID),
ToTime = fun (Str) ->
[T1, T2, T3] = str:tokens(Str, <<":">>),
{misc:l2i(T1), misc:l2i(T2), misc:l2i(T3)}
end,
Payload = case fxml_stream:parse_element(XML) of
{error, _Reason} -> [];
El -> [El]
end,
#pubsub_item{itemid = {ItemId, Nidx},
creation = {ToTime(Creation), jid:remove_resource(JID)},
modification = {ToTime(Modification), JID},
creation = {decode_now(Creation), jid:remove_resource(JID)},
modification = {decode_now(Modification), JID},
payload = Payload}.
rsm_filters(SNidx, undefined, undefined) ->
<<" where nodeid='", SNidx/binary, "'",
" order by creation asc">>;
rsm_filters(SNidx, undefined, After) ->
<<" where nodeid='", SNidx/binary, "'",
" and creation>'", (encode_stamp(After))/binary, "'",
" order by creation asc">>;
rsm_filters(SNidx, <<>>, undefined) ->
%% 2.5 Requesting the Last Page in a Result Set
Now = p1_time_compat:timestamp(),
<<" where nodeid='", SNidx/binary, "'",
" and creation<'", (encode_now(Now))/binary, "'",
" order by creation desc">>;
rsm_filters(SNidx, Before, undefined) ->
<<" where nodeid='", SNidx/binary, "'",
" and creation<'", (encode_stamp(Before))/binary, "'",
" order by creation desc">>.
rsm_page(Count, Index, Offset, Items) ->
First = decode_stamp(lists:nth(3, hd(Items))),
Last = decode_stamp(lists:nth(3, lists:last(Items))),
#rsm_set{count = Count, index = Index,
first = #rsm_first{index = Offset, data = First},
last = Last}.
encode_stamp(Stamp) ->
case jlib:datetime_string_to_timestamp(Stamp) of
{MS,S,US} -> encode_now({MS,S,US});
_ -> Stamp
end.
decode_stamp(Stamp) ->
jlib:now_to_utc_string(decode_now(Stamp)).
encode_now({T1, T2, T3}) ->
<<(misc:i2l(T1, 6))/binary, ":",
(misc:i2l(T2, 6))/binary, ":",
(misc:i2l(T3, 6))/binary>>.
decode_now(NowStr) ->
[MS, S, US] = binary:split(NowStr, <<":">>, [global]),
{binary_to_integer(MS), binary_to_integer(S), binary_to_integer(US)}.