PubSub: fix RSM support (#1994)(#2Â014)
This commit is contained in:
parent
cf09ed2df2
commit
07a193d4dc
|
@ -1986,8 +1986,14 @@ get_items(Host, Node, From, SubId, SMaxItems, ItemIds, RSM) ->
|
|||
Owners = node_owners_call(Host, Type, Nidx, O),
|
||||
{PS, RG} = get_presence_and_roster_permissions(
|
||||
Host, From, Owners, AccessModel, AllowedGroups),
|
||||
node_call(Host, Type, get_items,
|
||||
[Nidx, From, AccessModel, PS, RG, SubId, RSM])
|
||||
case ItemIds of
|
||||
[ItemId] ->
|
||||
node_call(Host, Type, get_item,
|
||||
[Nidx, ItemId, From, AccessModel, PS, RG, undefined]);
|
||||
_ ->
|
||||
node_call(Host, Type, get_items,
|
||||
[Nidx, From, AccessModel, PS, RG, SubId, RSM])
|
||||
end
|
||||
end
|
||||
end,
|
||||
case transaction(Host, Node, Action, sync_dirty) of
|
||||
|
@ -2005,6 +2011,10 @@ get_items(Host, Node, From, SubId, SMaxItems, ItemIds, RSM) ->
|
|||
#pubsub{items = #ps_items{node = Node,
|
||||
items = itemsEls(lists:sublist(SendItems, MaxItems))},
|
||||
rsm = RsmOut}};
|
||||
{result, {_, Item}} ->
|
||||
{result,
|
||||
#pubsub{items = #ps_items{node = Node,
|
||||
items = itemsEls([Item])}}};
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
|
|
@ -647,95 +647,60 @@ del_state(Nidx, JID) ->
|
|||
" where jid=%(J)s and nodeid=%(Nidx)d")),
|
||||
ok.
|
||||
|
||||
get_items(Nidx, From, undefined) ->
|
||||
MaxItems = case ejabberd_sql:sql_query_t(
|
||||
?SQL("select @(val)s from pubsub_node_option "
|
||||
"where nodeid=%(Nidx)d and name='max_items'")) of
|
||||
{selected, [{Value}]} ->
|
||||
misc:expr_to_term(Value);
|
||||
_ ->
|
||||
?MAXITEMS
|
||||
end,
|
||||
get_items(Nidx, From, #rsm_set{max = MaxItems});
|
||||
get_items(Nidx, _From, undefined) ->
|
||||
SNidx = misc:i2l(Nidx),
|
||||
case ejabberd_sql:sql_query_t(
|
||||
[<<"select itemid, publisher, creation, modification, payload",
|
||||
" from pubsub_item where nodeid='", SNidx/binary, "'">>]) of
|
||||
{selected, _, AllItems} ->
|
||||
Count = length(AllItems),
|
||||
if Count =< ?MAXITEMS ->
|
||||
{result, {[raw_to_item(Nidx, RItem) || RItem <- AllItems], undefined}};
|
||||
true ->
|
||||
RItems = lists:sublist(AllItems, ?MAXITEMS),
|
||||
Rsm = rsm_page(Count, 0, 0, RItems),
|
||||
{result, {[raw_to_item(Nidx, RItem) || RItem <- RItems], Rsm}}
|
||||
end;
|
||||
_ ->
|
||||
{result, {[], undefined}}
|
||||
end;
|
||||
get_items(Nidx, _From, #rsm_set{max = Max, index = IncIndex,
|
||||
'after' = After, before = Before}) ->
|
||||
{Way, Order} = if After == <<>> -> {<<"is not">>, <<"desc">>};
|
||||
After /= undefined -> {<<"<">>, <<"desc">>};
|
||||
Before == <<>> -> {<<"is not">>, <<"asc">>};
|
||||
Before /= undefined -> {<<">">>, <<"asc">>};
|
||||
true -> {<<"is not">>, <<"desc">>}
|
||||
end,
|
||||
SNidx = misc:i2l(Nidx),
|
||||
I = if After /= undefined -> After;
|
||||
Before /= undefined -> Before;
|
||||
true -> undefined
|
||||
end,
|
||||
[AttrName, Id] =
|
||||
case I of
|
||||
undefined when IncIndex =/= undefined ->
|
||||
case ejabberd_sql:sql_query_t(
|
||||
[<<"select creation from pubsub_item pi "
|
||||
"where exists ( select count(*) as count1 "
|
||||
"from pubsub_item where nodeid='">>, SNidx,
|
||||
<<"' and creation > pi.creation having count1 = ">>,
|
||||
integer_to_binary(IncIndex), <<" );">>]) of
|
||||
{selected, [_], [[O]]} ->
|
||||
[<<"creation">>, <<"'", O/binary, "'">>];
|
||||
_ ->
|
||||
[<<"creation">>, <<"null">>]
|
||||
end;
|
||||
undefined ->
|
||||
[<<"creation">>, <<"null">>];
|
||||
<<>> ->
|
||||
[<<"creation">>, <<"null">>];
|
||||
I ->
|
||||
[A, B] = str:tokens(ejabberd_sql:escape(I), <<"@">>),
|
||||
[A, <<"'", B/binary, "'">>]
|
||||
end,
|
||||
Count = case ejabberd_sql:sql_query_t(
|
||||
[<<"select count(*) from pubsub_item where nodeid='">>,
|
||||
SNidx, <<"';">>]) of
|
||||
{selected, [_], [[C]]} -> binary_to_integer(C);
|
||||
Count = case catch ejabberd_sql:sql_query_t(
|
||||
?SQL("select @(count(itemid))d from pubsub_item"
|
||||
" where nodeid=%(Nidx)d")) of
|
||||
{selected, [{C}]} -> C;
|
||||
_ -> 0
|
||||
end,
|
||||
Offset = case {IncIndex, Before, After} of
|
||||
{I, undefined, undefined} when is_integer(I) -> I;
|
||||
_ -> 0
|
||||
end,
|
||||
Limit = case Max of
|
||||
undefined -> ?MAXITEMS;
|
||||
_ -> Max
|
||||
end,
|
||||
Filters = rsm_filters(misc:i2l(Nidx), Before, After),
|
||||
Query = fun(mssql, _) ->
|
||||
ejabberd_sql:sql_query_t(
|
||||
[<<"select top ">>, integer_to_binary(Max),
|
||||
<<" itemid, publisher, creation, modification, payload "
|
||||
"from pubsub_item where nodeid='">>, SNidx,
|
||||
<<"' and ">>, AttrName, <<" ">>, Way, <<" ">>, Id, <<" order by ">>,
|
||||
AttrName, <<" ">>, Order, <<";">>]);
|
||||
[<<"select top ", (integer_to_binary(Limit))/binary,
|
||||
" itemid, publisher, creation, modification, payload",
|
||||
" from pubsub_item", Filters/binary>>]);
|
||||
%OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY;
|
||||
(_, _) ->
|
||||
ejabberd_sql:sql_query_t(
|
||||
[<<"select itemid, publisher, creation, modification, payload "
|
||||
"from pubsub_item where nodeid='">>, SNidx,
|
||||
<<"' and ">>, AttrName, <<" ">>, Way, <<" ">>, Id, <<" order by ">>,
|
||||
AttrName, <<" ">>, Order, <<" limit ">>,
|
||||
integer_to_binary(Max), <<" ;">>])
|
||||
[<<"select itemid, publisher, creation, modification, payload",
|
||||
" from pubsub_item", Filters/binary,
|
||||
" limit ", (integer_to_binary(Limit))/binary,
|
||||
" offset ", (integer_to_binary(Offset))/binary>>])
|
||||
end,
|
||||
case ejabberd_sql:sql_query_t(Query) of
|
||||
{selected, _, []} ->
|
||||
{result, {[], #rsm_set{count = Count}}};
|
||||
{selected, [<<"itemid">>, <<"publisher">>, <<"creation">>,
|
||||
<<"modification">>, <<"payload">>], RItems} ->
|
||||
case RItems of
|
||||
[[_, _, _, F, _]|_] ->
|
||||
Index = case catch ejabberd_sql:sql_query_t(
|
||||
[<<"select count(*) from pubsub_item "
|
||||
"where nodeid='">>, SNidx, <<"' and ">>,
|
||||
AttrName, <<" > '">>, F, <<"';">>]) of
|
||||
{selected, [_], [[In]]} -> binary_to_integer(In);
|
||||
_ -> 0
|
||||
end,
|
||||
[_, _, _, L, _] = lists:last(RItems),
|
||||
RsmOut = #rsm_set{count = Count,
|
||||
index = Index,
|
||||
first = #rsm_first{
|
||||
index = Index,
|
||||
data = <<"creation@", F/binary>>},
|
||||
last = <<"creation@", L/binary>>},
|
||||
{result, {[raw_to_item(Nidx, RItem) || RItem <- RItems], RsmOut}};
|
||||
[] ->
|
||||
{result, {[], #rsm_set{count = Count}}}
|
||||
end;
|
||||
Rsm = rsm_page(Count, IncIndex, Offset, RItems),
|
||||
{result, {[raw_to_item(Nidx, RItem) || RItem <- RItems], Rsm}};
|
||||
_ ->
|
||||
{result, {[], undefined}}
|
||||
end.
|
||||
|
@ -773,24 +738,24 @@ get_items(Nidx, JID, AccessModel, PresenceSubscription, RosterGroup, _SubId, RSM
|
|||
get_items(Nidx, JID, RSM)
|
||||
end.
|
||||
|
||||
get_last_items(Nidx, _From, Count) ->
|
||||
Limit = misc:i2l(Count),
|
||||
get_last_items(Nidx, _From, Limit) ->
|
||||
SNidx = misc:i2l(Nidx),
|
||||
Query = fun(mssql, _) ->
|
||||
ejabberd_sql:sql_query_t(
|
||||
[<<"select top ">>, Limit,
|
||||
<<" itemid, publisher, creation, modification, payload "
|
||||
"from pubsub_item where nodeid='">>, SNidx,
|
||||
<<"' order by modification desc ;">>]);
|
||||
[<<"select top ", (integer_to_binary(Limit))/binary,
|
||||
" itemid, publisher, creation, modification, payload",
|
||||
" from pubsub_item where nodeid='", SNidx/binary,
|
||||
"' order by modification desc">>]);
|
||||
(_, _) ->
|
||||
ejabberd_sql:sql_query_t(
|
||||
[<<"select itemid, publisher, creation, modification, payload "
|
||||
"from pubsub_item where nodeid='">>, SNidx,
|
||||
<<"' order by modification desc limit ">>, Limit, <<";">>])
|
||||
[<<"select itemid, publisher, creation, modification, payload",
|
||||
" from pubsub_item where nodeid='", SNidx/binary,
|
||||
"' order by modification desc ",
|
||||
" limit ", (integer_to_binary(Limit))/binary>>])
|
||||
end,
|
||||
case catch ejabberd_sql:sql_query_t(Query) of
|
||||
{selected,
|
||||
[<<"itemid">>, <<"publisher">>, <<"creation">>, <<"modification">>, <<"payload">>], RItems} ->
|
||||
{selected, [<<"itemid">>, <<"publisher">>, <<"creation">>,
|
||||
<<"modification">>, <<"payload">>], RItems} ->
|
||||
{result, [raw_to_item(Nidx, RItem) || RItem <- RItems]};
|
||||
_ ->
|
||||
{result, []}
|
||||
|
@ -798,9 +763,9 @@ get_last_items(Nidx, _From, Count) ->
|
|||
|
||||
get_item(Nidx, ItemId) ->
|
||||
case catch ejabberd_sql:sql_query_t(
|
||||
?SQL("select @(itemid)s, @(publisher)s, @(creation)s,"
|
||||
" @(modification)s, @(payload)s from pubsub_item"
|
||||
" where nodeid=%(Nidx)d and itemid=%(ItemId)s"))
|
||||
?SQL("select @(itemid)s, @(publisher)s, @(creation)s,"
|
||||
" @(modification)s, @(payload)s from pubsub_item"
|
||||
" where nodeid=%(Nidx)d and itemid=%(ItemId)s"))
|
||||
of
|
||||
{selected, [RItem]} ->
|
||||
{result, raw_to_item(Nidx, RItem)};
|
||||
|
@ -850,11 +815,8 @@ set_item(Item) ->
|
|||
P = encode_jid(JID),
|
||||
Payload = Item#pubsub_item.payload,
|
||||
XML = str:join([fxml:element_to_binary(X) || X<-Payload], <<>>),
|
||||
S = fun ({T1, T2, T3}) ->
|
||||
str:join([misc:i2l(T1, 6), misc:i2l(T2, 6), misc:i2l(T3, 6)], <<":">>)
|
||||
end,
|
||||
SM = S(M),
|
||||
SC = S(C),
|
||||
SM = encode_now(M),
|
||||
SC = encode_now(C),
|
||||
?SQL_UPSERT_T(
|
||||
"pubsub_item",
|
||||
["!nodeid=%(Nidx)d",
|
||||
|
@ -1029,15 +991,52 @@ raw_to_item(Nidx, [ItemId, SJID, Creation, Modification, XML]) ->
|
|||
raw_to_item(Nidx, {ItemId, SJID, Creation, Modification, XML});
|
||||
raw_to_item(Nidx, {ItemId, SJID, Creation, Modification, XML}) ->
|
||||
JID = decode_jid(SJID),
|
||||
ToTime = fun (Str) ->
|
||||
[T1, T2, T3] = str:tokens(Str, <<":">>),
|
||||
{misc:l2i(T1), misc:l2i(T2), misc:l2i(T3)}
|
||||
end,
|
||||
Payload = case fxml_stream:parse_element(XML) of
|
||||
{error, _Reason} -> [];
|
||||
El -> [El]
|
||||
end,
|
||||
#pubsub_item{itemid = {ItemId, Nidx},
|
||||
creation = {ToTime(Creation), jid:remove_resource(JID)},
|
||||
modification = {ToTime(Modification), JID},
|
||||
creation = {decode_now(Creation), jid:remove_resource(JID)},
|
||||
modification = {decode_now(Modification), JID},
|
||||
payload = Payload}.
|
||||
|
||||
rsm_filters(SNidx, undefined, undefined) ->
|
||||
<<" where nodeid='", SNidx/binary, "'",
|
||||
" order by creation asc">>;
|
||||
rsm_filters(SNidx, undefined, After) ->
|
||||
<<" where nodeid='", SNidx/binary, "'",
|
||||
" and creation>'", (encode_stamp(After))/binary, "'",
|
||||
" order by creation asc">>;
|
||||
rsm_filters(SNidx, <<>>, undefined) ->
|
||||
%% 2.5 Requesting the Last Page in a Result Set
|
||||
Now = p1_time_compat:timestamp(),
|
||||
<<" where nodeid='", SNidx/binary, "'",
|
||||
" and creation<'", (encode_now(Now))/binary, "'",
|
||||
" order by creation desc">>;
|
||||
rsm_filters(SNidx, Before, undefined) ->
|
||||
<<" where nodeid='", SNidx/binary, "'",
|
||||
" and creation<'", (encode_stamp(Before))/binary, "'",
|
||||
" order by creation desc">>.
|
||||
|
||||
rsm_page(Count, Index, Offset, Items) ->
|
||||
First = decode_stamp(lists:nth(3, hd(Items))),
|
||||
Last = decode_stamp(lists:nth(3, lists:last(Items))),
|
||||
#rsm_set{count = Count, index = Index,
|
||||
first = #rsm_first{index = Offset, data = First},
|
||||
last = Last}.
|
||||
|
||||
encode_stamp(Stamp) ->
|
||||
case jlib:datetime_string_to_timestamp(Stamp) of
|
||||
{MS,S,US} -> encode_now({MS,S,US});
|
||||
_ -> Stamp
|
||||
end.
|
||||
decode_stamp(Stamp) ->
|
||||
jlib:now_to_utc_string(decode_now(Stamp)).
|
||||
|
||||
encode_now({T1, T2, T3}) ->
|
||||
<<(misc:i2l(T1, 6))/binary, ":",
|
||||
(misc:i2l(T2, 6))/binary, ":",
|
||||
(misc:i2l(T3, 6))/binary>>.
|
||||
decode_now(NowStr) ->
|
||||
[MS, S, US] = binary:split(NowStr, <<":">>, [global]),
|
||||
{binary_to_integer(MS), binary_to_integer(S), binary_to_integer(US)}.
|
||||
|
|
Loading…
Reference in New Issue