From 07a193d4dc8e4419f47fe21a45ef53fced8f9656 Mon Sep 17 00:00:00 2001 From: Christophe Romain Date: Wed, 27 Sep 2017 10:51:37 +0200 Subject: [PATCH] =?UTF-8?q?PubSub:=20fix=20RSM=20support=20(#1994)(#2?= =?UTF-8?q?=C3=82014)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/mod_pubsub.erl | 14 ++- src/node_flat_sql.erl | 203 +++++++++++++++++++++--------------------- 2 files changed, 113 insertions(+), 104 deletions(-) diff --git a/src/mod_pubsub.erl b/src/mod_pubsub.erl index 95ab5ec4b..669422d15 100644 --- a/src/mod_pubsub.erl +++ b/src/mod_pubsub.erl @@ -1986,8 +1986,14 @@ get_items(Host, Node, From, SubId, SMaxItems, ItemIds, RSM) -> Owners = node_owners_call(Host, Type, Nidx, O), {PS, RG} = get_presence_and_roster_permissions( Host, From, Owners, AccessModel, AllowedGroups), - node_call(Host, Type, get_items, - [Nidx, From, AccessModel, PS, RG, SubId, RSM]) + case ItemIds of + [ItemId] -> + node_call(Host, Type, get_item, + [Nidx, ItemId, From, AccessModel, PS, RG, undefined]); + _ -> + node_call(Host, Type, get_items, + [Nidx, From, AccessModel, PS, RG, SubId, RSM]) + end end end, case transaction(Host, Node, Action, sync_dirty) of @@ -2005,6 +2011,10 @@ get_items(Host, Node, From, SubId, SMaxItems, ItemIds, RSM) -> #pubsub{items = #ps_items{node = Node, items = itemsEls(lists:sublist(SendItems, MaxItems))}, rsm = RsmOut}}; + {result, {_, Item}} -> + {result, + #pubsub{items = #ps_items{node = Node, + items = itemsEls([Item])}}}; Error -> Error end. diff --git a/src/node_flat_sql.erl b/src/node_flat_sql.erl index 72281a970..110e31092 100644 --- a/src/node_flat_sql.erl +++ b/src/node_flat_sql.erl @@ -647,95 +647,60 @@ del_state(Nidx, JID) -> " where jid=%(J)s and nodeid=%(Nidx)d")), ok. -get_items(Nidx, From, undefined) -> - MaxItems = case ejabberd_sql:sql_query_t( - ?SQL("select @(val)s from pubsub_node_option " - "where nodeid=%(Nidx)d and name='max_items'")) of - {selected, [{Value}]} -> - misc:expr_to_term(Value); - _ -> - ?MAXITEMS - end, - get_items(Nidx, From, #rsm_set{max = MaxItems}); +get_items(Nidx, _From, undefined) -> + SNidx = misc:i2l(Nidx), + case ejabberd_sql:sql_query_t( + [<<"select itemid, publisher, creation, modification, payload", + " from pubsub_item where nodeid='", SNidx/binary, "'">>]) of + {selected, _, AllItems} -> + Count = length(AllItems), + if Count =< ?MAXITEMS -> + {result, {[raw_to_item(Nidx, RItem) || RItem <- AllItems], undefined}}; + true -> + RItems = lists:sublist(AllItems, ?MAXITEMS), + Rsm = rsm_page(Count, 0, 0, RItems), + {result, {[raw_to_item(Nidx, RItem) || RItem <- RItems], Rsm}} + end; + _ -> + {result, {[], undefined}} + end; get_items(Nidx, _From, #rsm_set{max = Max, index = IncIndex, 'after' = After, before = Before}) -> - {Way, Order} = if After == <<>> -> {<<"is not">>, <<"desc">>}; - After /= undefined -> {<<"<">>, <<"desc">>}; - Before == <<>> -> {<<"is not">>, <<"asc">>}; - Before /= undefined -> {<<">">>, <<"asc">>}; - true -> {<<"is not">>, <<"desc">>} - end, - SNidx = misc:i2l(Nidx), - I = if After /= undefined -> After; - Before /= undefined -> Before; - true -> undefined - end, - [AttrName, Id] = - case I of - undefined when IncIndex =/= undefined -> - case ejabberd_sql:sql_query_t( - [<<"select creation from pubsub_item pi " - "where exists ( select count(*) as count1 " - "from pubsub_item where nodeid='">>, SNidx, - <<"' and creation > pi.creation having count1 = ">>, - integer_to_binary(IncIndex), <<" );">>]) of - {selected, [_], [[O]]} -> - [<<"creation">>, <<"'", O/binary, "'">>]; - _ -> - [<<"creation">>, <<"null">>] - end; - undefined -> - [<<"creation">>, <<"null">>]; - <<>> -> - [<<"creation">>, <<"null">>]; - I -> - [A, B] = str:tokens(ejabberd_sql:escape(I), <<"@">>), - [A, <<"'", B/binary, "'">>] - end, - Count = case ejabberd_sql:sql_query_t( - [<<"select count(*) from pubsub_item where nodeid='">>, - SNidx, <<"';">>]) of - {selected, [_], [[C]]} -> binary_to_integer(C); + Count = case catch ejabberd_sql:sql_query_t( + ?SQL("select @(count(itemid))d from pubsub_item" + " where nodeid=%(Nidx)d")) of + {selected, [{C}]} -> C; _ -> 0 end, + Offset = case {IncIndex, Before, After} of + {I, undefined, undefined} when is_integer(I) -> I; + _ -> 0 + end, + Limit = case Max of + undefined -> ?MAXITEMS; + _ -> Max + end, + Filters = rsm_filters(misc:i2l(Nidx), Before, After), Query = fun(mssql, _) -> ejabberd_sql:sql_query_t( - [<<"select top ">>, integer_to_binary(Max), - <<" itemid, publisher, creation, modification, payload " - "from pubsub_item where nodeid='">>, SNidx, - <<"' and ">>, AttrName, <<" ">>, Way, <<" ">>, Id, <<" order by ">>, - AttrName, <<" ">>, Order, <<";">>]); + [<<"select top ", (integer_to_binary(Limit))/binary, + " itemid, publisher, creation, modification, payload", + " from pubsub_item", Filters/binary>>]); + %OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY; (_, _) -> ejabberd_sql:sql_query_t( - [<<"select itemid, publisher, creation, modification, payload " - "from pubsub_item where nodeid='">>, SNidx, - <<"' and ">>, AttrName, <<" ">>, Way, <<" ">>, Id, <<" order by ">>, - AttrName, <<" ">>, Order, <<" limit ">>, - integer_to_binary(Max), <<" ;">>]) + [<<"select itemid, publisher, creation, modification, payload", + " from pubsub_item", Filters/binary, + " limit ", (integer_to_binary(Limit))/binary, + " offset ", (integer_to_binary(Offset))/binary>>]) end, case ejabberd_sql:sql_query_t(Query) of + {selected, _, []} -> + {result, {[], #rsm_set{count = Count}}}; {selected, [<<"itemid">>, <<"publisher">>, <<"creation">>, <<"modification">>, <<"payload">>], RItems} -> - case RItems of - [[_, _, _, F, _]|_] -> - Index = case catch ejabberd_sql:sql_query_t( - [<<"select count(*) from pubsub_item " - "where nodeid='">>, SNidx, <<"' and ">>, - AttrName, <<" > '">>, F, <<"';">>]) of - {selected, [_], [[In]]} -> binary_to_integer(In); - _ -> 0 - end, - [_, _, _, L, _] = lists:last(RItems), - RsmOut = #rsm_set{count = Count, - index = Index, - first = #rsm_first{ - index = Index, - data = <<"creation@", F/binary>>}, - last = <<"creation@", L/binary>>}, - {result, {[raw_to_item(Nidx, RItem) || RItem <- RItems], RsmOut}}; - [] -> - {result, {[], #rsm_set{count = Count}}} - end; + Rsm = rsm_page(Count, IncIndex, Offset, RItems), + {result, {[raw_to_item(Nidx, RItem) || RItem <- RItems], Rsm}}; _ -> {result, {[], undefined}} end. @@ -773,24 +738,24 @@ get_items(Nidx, JID, AccessModel, PresenceSubscription, RosterGroup, _SubId, RSM get_items(Nidx, JID, RSM) end. -get_last_items(Nidx, _From, Count) -> - Limit = misc:i2l(Count), +get_last_items(Nidx, _From, Limit) -> SNidx = misc:i2l(Nidx), Query = fun(mssql, _) -> ejabberd_sql:sql_query_t( - [<<"select top ">>, Limit, - <<" itemid, publisher, creation, modification, payload " - "from pubsub_item where nodeid='">>, SNidx, - <<"' order by modification desc ;">>]); + [<<"select top ", (integer_to_binary(Limit))/binary, + " itemid, publisher, creation, modification, payload", + " from pubsub_item where nodeid='", SNidx/binary, + "' order by modification desc">>]); (_, _) -> ejabberd_sql:sql_query_t( - [<<"select itemid, publisher, creation, modification, payload " - "from pubsub_item where nodeid='">>, SNidx, - <<"' order by modification desc limit ">>, Limit, <<";">>]) + [<<"select itemid, publisher, creation, modification, payload", + " from pubsub_item where nodeid='", SNidx/binary, + "' order by modification desc ", + " limit ", (integer_to_binary(Limit))/binary>>]) end, case catch ejabberd_sql:sql_query_t(Query) of - {selected, - [<<"itemid">>, <<"publisher">>, <<"creation">>, <<"modification">>, <<"payload">>], RItems} -> + {selected, [<<"itemid">>, <<"publisher">>, <<"creation">>, + <<"modification">>, <<"payload">>], RItems} -> {result, [raw_to_item(Nidx, RItem) || RItem <- RItems]}; _ -> {result, []} @@ -798,9 +763,9 @@ get_last_items(Nidx, _From, Count) -> get_item(Nidx, ItemId) -> case catch ejabberd_sql:sql_query_t( - ?SQL("select @(itemid)s, @(publisher)s, @(creation)s," - " @(modification)s, @(payload)s from pubsub_item" - " where nodeid=%(Nidx)d and itemid=%(ItemId)s")) + ?SQL("select @(itemid)s, @(publisher)s, @(creation)s," + " @(modification)s, @(payload)s from pubsub_item" + " where nodeid=%(Nidx)d and itemid=%(ItemId)s")) of {selected, [RItem]} -> {result, raw_to_item(Nidx, RItem)}; @@ -850,11 +815,8 @@ set_item(Item) -> P = encode_jid(JID), Payload = Item#pubsub_item.payload, XML = str:join([fxml:element_to_binary(X) || X<-Payload], <<>>), - S = fun ({T1, T2, T3}) -> - str:join([misc:i2l(T1, 6), misc:i2l(T2, 6), misc:i2l(T3, 6)], <<":">>) - end, - SM = S(M), - SC = S(C), + SM = encode_now(M), + SC = encode_now(C), ?SQL_UPSERT_T( "pubsub_item", ["!nodeid=%(Nidx)d", @@ -1029,15 +991,52 @@ raw_to_item(Nidx, [ItemId, SJID, Creation, Modification, XML]) -> raw_to_item(Nidx, {ItemId, SJID, Creation, Modification, XML}); raw_to_item(Nidx, {ItemId, SJID, Creation, Modification, XML}) -> JID = decode_jid(SJID), - ToTime = fun (Str) -> - [T1, T2, T3] = str:tokens(Str, <<":">>), - {misc:l2i(T1), misc:l2i(T2), misc:l2i(T3)} - end, Payload = case fxml_stream:parse_element(XML) of {error, _Reason} -> []; El -> [El] end, #pubsub_item{itemid = {ItemId, Nidx}, - creation = {ToTime(Creation), jid:remove_resource(JID)}, - modification = {ToTime(Modification), JID}, + creation = {decode_now(Creation), jid:remove_resource(JID)}, + modification = {decode_now(Modification), JID}, payload = Payload}. + +rsm_filters(SNidx, undefined, undefined) -> + <<" where nodeid='", SNidx/binary, "'", + " order by creation asc">>; +rsm_filters(SNidx, undefined, After) -> + <<" where nodeid='", SNidx/binary, "'", + " and creation>'", (encode_stamp(After))/binary, "'", + " order by creation asc">>; +rsm_filters(SNidx, <<>>, undefined) -> + %% 2.5 Requesting the Last Page in a Result Set + Now = p1_time_compat:timestamp(), + <<" where nodeid='", SNidx/binary, "'", + " and creation<'", (encode_now(Now))/binary, "'", + " order by creation desc">>; +rsm_filters(SNidx, Before, undefined) -> + <<" where nodeid='", SNidx/binary, "'", + " and creation<'", (encode_stamp(Before))/binary, "'", + " order by creation desc">>. + +rsm_page(Count, Index, Offset, Items) -> + First = decode_stamp(lists:nth(3, hd(Items))), + Last = decode_stamp(lists:nth(3, lists:last(Items))), + #rsm_set{count = Count, index = Index, + first = #rsm_first{index = Offset, data = First}, + last = Last}. + +encode_stamp(Stamp) -> + case jlib:datetime_string_to_timestamp(Stamp) of + {MS,S,US} -> encode_now({MS,S,US}); + _ -> Stamp + end. +decode_stamp(Stamp) -> + jlib:now_to_utc_string(decode_now(Stamp)). + +encode_now({T1, T2, T3}) -> + <<(misc:i2l(T1, 6))/binary, ":", + (misc:i2l(T2, 6))/binary, ":", + (misc:i2l(T3, 6))/binary>>. +decode_now(NowStr) -> + [MS, S, US] = binary:split(NowStr, <<":">>, [global]), + {binary_to_integer(MS), binary_to_integer(S), binary_to_integer(US)}.