PubSub: fix RSM support (#1994)(#2Â014)
This commit is contained in:
parent
cf09ed2df2
commit
07a193d4dc
|
@ -1986,8 +1986,14 @@ get_items(Host, Node, From, SubId, SMaxItems, ItemIds, RSM) ->
|
||||||
Owners = node_owners_call(Host, Type, Nidx, O),
|
Owners = node_owners_call(Host, Type, Nidx, O),
|
||||||
{PS, RG} = get_presence_and_roster_permissions(
|
{PS, RG} = get_presence_and_roster_permissions(
|
||||||
Host, From, Owners, AccessModel, AllowedGroups),
|
Host, From, Owners, AccessModel, AllowedGroups),
|
||||||
node_call(Host, Type, get_items,
|
case ItemIds of
|
||||||
[Nidx, From, AccessModel, PS, RG, SubId, RSM])
|
[ItemId] ->
|
||||||
|
node_call(Host, Type, get_item,
|
||||||
|
[Nidx, ItemId, From, AccessModel, PS, RG, undefined]);
|
||||||
|
_ ->
|
||||||
|
node_call(Host, Type, get_items,
|
||||||
|
[Nidx, From, AccessModel, PS, RG, SubId, RSM])
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
case transaction(Host, Node, Action, sync_dirty) of
|
case transaction(Host, Node, Action, sync_dirty) of
|
||||||
|
@ -2005,6 +2011,10 @@ get_items(Host, Node, From, SubId, SMaxItems, ItemIds, RSM) ->
|
||||||
#pubsub{items = #ps_items{node = Node,
|
#pubsub{items = #ps_items{node = Node,
|
||||||
items = itemsEls(lists:sublist(SendItems, MaxItems))},
|
items = itemsEls(lists:sublist(SendItems, MaxItems))},
|
||||||
rsm = RsmOut}};
|
rsm = RsmOut}};
|
||||||
|
{result, {_, Item}} ->
|
||||||
|
{result,
|
||||||
|
#pubsub{items = #ps_items{node = Node,
|
||||||
|
items = itemsEls([Item])}}};
|
||||||
Error ->
|
Error ->
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -647,95 +647,60 @@ del_state(Nidx, JID) ->
|
||||||
" where jid=%(J)s and nodeid=%(Nidx)d")),
|
" where jid=%(J)s and nodeid=%(Nidx)d")),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
get_items(Nidx, From, undefined) ->
|
get_items(Nidx, _From, undefined) ->
|
||||||
MaxItems = case ejabberd_sql:sql_query_t(
|
SNidx = misc:i2l(Nidx),
|
||||||
?SQL("select @(val)s from pubsub_node_option "
|
case ejabberd_sql:sql_query_t(
|
||||||
"where nodeid=%(Nidx)d and name='max_items'")) of
|
[<<"select itemid, publisher, creation, modification, payload",
|
||||||
{selected, [{Value}]} ->
|
" from pubsub_item where nodeid='", SNidx/binary, "'">>]) of
|
||||||
misc:expr_to_term(Value);
|
{selected, _, AllItems} ->
|
||||||
_ ->
|
Count = length(AllItems),
|
||||||
?MAXITEMS
|
if Count =< ?MAXITEMS ->
|
||||||
end,
|
{result, {[raw_to_item(Nidx, RItem) || RItem <- AllItems], undefined}};
|
||||||
get_items(Nidx, From, #rsm_set{max = MaxItems});
|
true ->
|
||||||
|
RItems = lists:sublist(AllItems, ?MAXITEMS),
|
||||||
|
Rsm = rsm_page(Count, 0, 0, RItems),
|
||||||
|
{result, {[raw_to_item(Nidx, RItem) || RItem <- RItems], Rsm}}
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
{result, {[], undefined}}
|
||||||
|
end;
|
||||||
get_items(Nidx, _From, #rsm_set{max = Max, index = IncIndex,
|
get_items(Nidx, _From, #rsm_set{max = Max, index = IncIndex,
|
||||||
'after' = After, before = Before}) ->
|
'after' = After, before = Before}) ->
|
||||||
{Way, Order} = if After == <<>> -> {<<"is not">>, <<"desc">>};
|
Count = case catch ejabberd_sql:sql_query_t(
|
||||||
After /= undefined -> {<<"<">>, <<"desc">>};
|
?SQL("select @(count(itemid))d from pubsub_item"
|
||||||
Before == <<>> -> {<<"is not">>, <<"asc">>};
|
" where nodeid=%(Nidx)d")) of
|
||||||
Before /= undefined -> {<<">">>, <<"asc">>};
|
{selected, [{C}]} -> C;
|
||||||
true -> {<<"is not">>, <<"desc">>}
|
|
||||||
end,
|
|
||||||
SNidx = misc:i2l(Nidx),
|
|
||||||
I = if After /= undefined -> After;
|
|
||||||
Before /= undefined -> Before;
|
|
||||||
true -> undefined
|
|
||||||
end,
|
|
||||||
[AttrName, Id] =
|
|
||||||
case I of
|
|
||||||
undefined when IncIndex =/= undefined ->
|
|
||||||
case ejabberd_sql:sql_query_t(
|
|
||||||
[<<"select creation from pubsub_item pi "
|
|
||||||
"where exists ( select count(*) as count1 "
|
|
||||||
"from pubsub_item where nodeid='">>, SNidx,
|
|
||||||
<<"' and creation > pi.creation having count1 = ">>,
|
|
||||||
integer_to_binary(IncIndex), <<" );">>]) of
|
|
||||||
{selected, [_], [[O]]} ->
|
|
||||||
[<<"creation">>, <<"'", O/binary, "'">>];
|
|
||||||
_ ->
|
|
||||||
[<<"creation">>, <<"null">>]
|
|
||||||
end;
|
|
||||||
undefined ->
|
|
||||||
[<<"creation">>, <<"null">>];
|
|
||||||
<<>> ->
|
|
||||||
[<<"creation">>, <<"null">>];
|
|
||||||
I ->
|
|
||||||
[A, B] = str:tokens(ejabberd_sql:escape(I), <<"@">>),
|
|
||||||
[A, <<"'", B/binary, "'">>]
|
|
||||||
end,
|
|
||||||
Count = case ejabberd_sql:sql_query_t(
|
|
||||||
[<<"select count(*) from pubsub_item where nodeid='">>,
|
|
||||||
SNidx, <<"';">>]) of
|
|
||||||
{selected, [_], [[C]]} -> binary_to_integer(C);
|
|
||||||
_ -> 0
|
_ -> 0
|
||||||
end,
|
end,
|
||||||
|
Offset = case {IncIndex, Before, After} of
|
||||||
|
{I, undefined, undefined} when is_integer(I) -> I;
|
||||||
|
_ -> 0
|
||||||
|
end,
|
||||||
|
Limit = case Max of
|
||||||
|
undefined -> ?MAXITEMS;
|
||||||
|
_ -> Max
|
||||||
|
end,
|
||||||
|
Filters = rsm_filters(misc:i2l(Nidx), Before, After),
|
||||||
Query = fun(mssql, _) ->
|
Query = fun(mssql, _) ->
|
||||||
ejabberd_sql:sql_query_t(
|
ejabberd_sql:sql_query_t(
|
||||||
[<<"select top ">>, integer_to_binary(Max),
|
[<<"select top ", (integer_to_binary(Limit))/binary,
|
||||||
<<" itemid, publisher, creation, modification, payload "
|
" itemid, publisher, creation, modification, payload",
|
||||||
"from pubsub_item where nodeid='">>, SNidx,
|
" from pubsub_item", Filters/binary>>]);
|
||||||
<<"' and ">>, AttrName, <<" ">>, Way, <<" ">>, Id, <<" order by ">>,
|
%OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY;
|
||||||
AttrName, <<" ">>, Order, <<";">>]);
|
|
||||||
(_, _) ->
|
(_, _) ->
|
||||||
ejabberd_sql:sql_query_t(
|
ejabberd_sql:sql_query_t(
|
||||||
[<<"select itemid, publisher, creation, modification, payload "
|
[<<"select itemid, publisher, creation, modification, payload",
|
||||||
"from pubsub_item where nodeid='">>, SNidx,
|
" from pubsub_item", Filters/binary,
|
||||||
<<"' and ">>, AttrName, <<" ">>, Way, <<" ">>, Id, <<" order by ">>,
|
" limit ", (integer_to_binary(Limit))/binary,
|
||||||
AttrName, <<" ">>, Order, <<" limit ">>,
|
" offset ", (integer_to_binary(Offset))/binary>>])
|
||||||
integer_to_binary(Max), <<" ;">>])
|
|
||||||
end,
|
end,
|
||||||
case ejabberd_sql:sql_query_t(Query) of
|
case ejabberd_sql:sql_query_t(Query) of
|
||||||
|
{selected, _, []} ->
|
||||||
|
{result, {[], #rsm_set{count = Count}}};
|
||||||
{selected, [<<"itemid">>, <<"publisher">>, <<"creation">>,
|
{selected, [<<"itemid">>, <<"publisher">>, <<"creation">>,
|
||||||
<<"modification">>, <<"payload">>], RItems} ->
|
<<"modification">>, <<"payload">>], RItems} ->
|
||||||
case RItems of
|
Rsm = rsm_page(Count, IncIndex, Offset, RItems),
|
||||||
[[_, _, _, F, _]|_] ->
|
{result, {[raw_to_item(Nidx, RItem) || RItem <- RItems], Rsm}};
|
||||||
Index = case catch ejabberd_sql:sql_query_t(
|
|
||||||
[<<"select count(*) from pubsub_item "
|
|
||||||
"where nodeid='">>, SNidx, <<"' and ">>,
|
|
||||||
AttrName, <<" > '">>, F, <<"';">>]) of
|
|
||||||
{selected, [_], [[In]]} -> binary_to_integer(In);
|
|
||||||
_ -> 0
|
|
||||||
end,
|
|
||||||
[_, _, _, L, _] = lists:last(RItems),
|
|
||||||
RsmOut = #rsm_set{count = Count,
|
|
||||||
index = Index,
|
|
||||||
first = #rsm_first{
|
|
||||||
index = Index,
|
|
||||||
data = <<"creation@", F/binary>>},
|
|
||||||
last = <<"creation@", L/binary>>},
|
|
||||||
{result, {[raw_to_item(Nidx, RItem) || RItem <- RItems], RsmOut}};
|
|
||||||
[] ->
|
|
||||||
{result, {[], #rsm_set{count = Count}}}
|
|
||||||
end;
|
|
||||||
_ ->
|
_ ->
|
||||||
{result, {[], undefined}}
|
{result, {[], undefined}}
|
||||||
end.
|
end.
|
||||||
|
@ -773,24 +738,24 @@ get_items(Nidx, JID, AccessModel, PresenceSubscription, RosterGroup, _SubId, RSM
|
||||||
get_items(Nidx, JID, RSM)
|
get_items(Nidx, JID, RSM)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get_last_items(Nidx, _From, Count) ->
|
get_last_items(Nidx, _From, Limit) ->
|
||||||
Limit = misc:i2l(Count),
|
|
||||||
SNidx = misc:i2l(Nidx),
|
SNidx = misc:i2l(Nidx),
|
||||||
Query = fun(mssql, _) ->
|
Query = fun(mssql, _) ->
|
||||||
ejabberd_sql:sql_query_t(
|
ejabberd_sql:sql_query_t(
|
||||||
[<<"select top ">>, Limit,
|
[<<"select top ", (integer_to_binary(Limit))/binary,
|
||||||
<<" itemid, publisher, creation, modification, payload "
|
" itemid, publisher, creation, modification, payload",
|
||||||
"from pubsub_item where nodeid='">>, SNidx,
|
" from pubsub_item where nodeid='", SNidx/binary,
|
||||||
<<"' order by modification desc ;">>]);
|
"' order by modification desc">>]);
|
||||||
(_, _) ->
|
(_, _) ->
|
||||||
ejabberd_sql:sql_query_t(
|
ejabberd_sql:sql_query_t(
|
||||||
[<<"select itemid, publisher, creation, modification, payload "
|
[<<"select itemid, publisher, creation, modification, payload",
|
||||||
"from pubsub_item where nodeid='">>, SNidx,
|
" from pubsub_item where nodeid='", SNidx/binary,
|
||||||
<<"' order by modification desc limit ">>, Limit, <<";">>])
|
"' order by modification desc ",
|
||||||
|
" limit ", (integer_to_binary(Limit))/binary>>])
|
||||||
end,
|
end,
|
||||||
case catch ejabberd_sql:sql_query_t(Query) of
|
case catch ejabberd_sql:sql_query_t(Query) of
|
||||||
{selected,
|
{selected, [<<"itemid">>, <<"publisher">>, <<"creation">>,
|
||||||
[<<"itemid">>, <<"publisher">>, <<"creation">>, <<"modification">>, <<"payload">>], RItems} ->
|
<<"modification">>, <<"payload">>], RItems} ->
|
||||||
{result, [raw_to_item(Nidx, RItem) || RItem <- RItems]};
|
{result, [raw_to_item(Nidx, RItem) || RItem <- RItems]};
|
||||||
_ ->
|
_ ->
|
||||||
{result, []}
|
{result, []}
|
||||||
|
@ -798,9 +763,9 @@ get_last_items(Nidx, _From, Count) ->
|
||||||
|
|
||||||
get_item(Nidx, ItemId) ->
|
get_item(Nidx, ItemId) ->
|
||||||
case catch ejabberd_sql:sql_query_t(
|
case catch ejabberd_sql:sql_query_t(
|
||||||
?SQL("select @(itemid)s, @(publisher)s, @(creation)s,"
|
?SQL("select @(itemid)s, @(publisher)s, @(creation)s,"
|
||||||
" @(modification)s, @(payload)s from pubsub_item"
|
" @(modification)s, @(payload)s from pubsub_item"
|
||||||
" where nodeid=%(Nidx)d and itemid=%(ItemId)s"))
|
" where nodeid=%(Nidx)d and itemid=%(ItemId)s"))
|
||||||
of
|
of
|
||||||
{selected, [RItem]} ->
|
{selected, [RItem]} ->
|
||||||
{result, raw_to_item(Nidx, RItem)};
|
{result, raw_to_item(Nidx, RItem)};
|
||||||
|
@ -850,11 +815,8 @@ set_item(Item) ->
|
||||||
P = encode_jid(JID),
|
P = encode_jid(JID),
|
||||||
Payload = Item#pubsub_item.payload,
|
Payload = Item#pubsub_item.payload,
|
||||||
XML = str:join([fxml:element_to_binary(X) || X<-Payload], <<>>),
|
XML = str:join([fxml:element_to_binary(X) || X<-Payload], <<>>),
|
||||||
S = fun ({T1, T2, T3}) ->
|
SM = encode_now(M),
|
||||||
str:join([misc:i2l(T1, 6), misc:i2l(T2, 6), misc:i2l(T3, 6)], <<":">>)
|
SC = encode_now(C),
|
||||||
end,
|
|
||||||
SM = S(M),
|
|
||||||
SC = S(C),
|
|
||||||
?SQL_UPSERT_T(
|
?SQL_UPSERT_T(
|
||||||
"pubsub_item",
|
"pubsub_item",
|
||||||
["!nodeid=%(Nidx)d",
|
["!nodeid=%(Nidx)d",
|
||||||
|
@ -1029,15 +991,52 @@ raw_to_item(Nidx, [ItemId, SJID, Creation, Modification, XML]) ->
|
||||||
raw_to_item(Nidx, {ItemId, SJID, Creation, Modification, XML});
|
raw_to_item(Nidx, {ItemId, SJID, Creation, Modification, XML});
|
||||||
raw_to_item(Nidx, {ItemId, SJID, Creation, Modification, XML}) ->
|
raw_to_item(Nidx, {ItemId, SJID, Creation, Modification, XML}) ->
|
||||||
JID = decode_jid(SJID),
|
JID = decode_jid(SJID),
|
||||||
ToTime = fun (Str) ->
|
|
||||||
[T1, T2, T3] = str:tokens(Str, <<":">>),
|
|
||||||
{misc:l2i(T1), misc:l2i(T2), misc:l2i(T3)}
|
|
||||||
end,
|
|
||||||
Payload = case fxml_stream:parse_element(XML) of
|
Payload = case fxml_stream:parse_element(XML) of
|
||||||
{error, _Reason} -> [];
|
{error, _Reason} -> [];
|
||||||
El -> [El]
|
El -> [El]
|
||||||
end,
|
end,
|
||||||
#pubsub_item{itemid = {ItemId, Nidx},
|
#pubsub_item{itemid = {ItemId, Nidx},
|
||||||
creation = {ToTime(Creation), jid:remove_resource(JID)},
|
creation = {decode_now(Creation), jid:remove_resource(JID)},
|
||||||
modification = {ToTime(Modification), JID},
|
modification = {decode_now(Modification), JID},
|
||||||
payload = Payload}.
|
payload = Payload}.
|
||||||
|
|
||||||
|
rsm_filters(SNidx, undefined, undefined) ->
|
||||||
|
<<" where nodeid='", SNidx/binary, "'",
|
||||||
|
" order by creation asc">>;
|
||||||
|
rsm_filters(SNidx, undefined, After) ->
|
||||||
|
<<" where nodeid='", SNidx/binary, "'",
|
||||||
|
" and creation>'", (encode_stamp(After))/binary, "'",
|
||||||
|
" order by creation asc">>;
|
||||||
|
rsm_filters(SNidx, <<>>, undefined) ->
|
||||||
|
%% 2.5 Requesting the Last Page in a Result Set
|
||||||
|
Now = p1_time_compat:timestamp(),
|
||||||
|
<<" where nodeid='", SNidx/binary, "'",
|
||||||
|
" and creation<'", (encode_now(Now))/binary, "'",
|
||||||
|
" order by creation desc">>;
|
||||||
|
rsm_filters(SNidx, Before, undefined) ->
|
||||||
|
<<" where nodeid='", SNidx/binary, "'",
|
||||||
|
" and creation<'", (encode_stamp(Before))/binary, "'",
|
||||||
|
" order by creation desc">>.
|
||||||
|
|
||||||
|
rsm_page(Count, Index, Offset, Items) ->
|
||||||
|
First = decode_stamp(lists:nth(3, hd(Items))),
|
||||||
|
Last = decode_stamp(lists:nth(3, lists:last(Items))),
|
||||||
|
#rsm_set{count = Count, index = Index,
|
||||||
|
first = #rsm_first{index = Offset, data = First},
|
||||||
|
last = Last}.
|
||||||
|
|
||||||
|
encode_stamp(Stamp) ->
|
||||||
|
case jlib:datetime_string_to_timestamp(Stamp) of
|
||||||
|
{MS,S,US} -> encode_now({MS,S,US});
|
||||||
|
_ -> Stamp
|
||||||
|
end.
|
||||||
|
decode_stamp(Stamp) ->
|
||||||
|
jlib:now_to_utc_string(decode_now(Stamp)).
|
||||||
|
|
||||||
|
encode_now({T1, T2, T3}) ->
|
||||||
|
<<(misc:i2l(T1, 6))/binary, ":",
|
||||||
|
(misc:i2l(T2, 6))/binary, ":",
|
||||||
|
(misc:i2l(T3, 6))/binary>>.
|
||||||
|
decode_now(NowStr) ->
|
||||||
|
[MS, S, US] = binary:split(NowStr, <<":">>, [global]),
|
||||||
|
{binary_to_integer(MS), binary_to_integer(S), binary_to_integer(US)}.
|
||||||
|
|
Loading…
Reference in New Issue