From 0d2720d7ab96c0023cfe09234807418c9f310ca0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chmielowski?= Date: Thu, 2 May 2019 11:12:22 +0200 Subject: [PATCH] Don't issue count/message fetch queries for offline from mam when not needed --- src/mod_mam.erl | 47 ++++++++++++++++++++++++++++++--------------- src/mod_mam_sql.erl | 28 +++++++++++++++++---------- src/mod_offline.erl | 43 ++++++++++++++++++++++++++++++++++------- 3 files changed, 85 insertions(+), 33 deletions(-) diff --git a/src/mod_mam.erl b/src/mod_mam.erl index 73a00180e..ba00d74e5 100644 --- a/src/mod_mam.erl +++ b/src/mod_mam.erl @@ -42,7 +42,7 @@ get_room_config/4, set_room_option/3, offline_message/1, export/1, mod_options/1, remove_mam_for_user_with_peer/3, remove_mam_for_user/2, is_empty_for_user/2, is_empty_for_room/3, check_create_room/4, - process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2, select/6]). + process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2, select/7]). -include("xmpp.hrl"). -include("logger.hrl"). @@ -71,17 +71,22 @@ #rsm_set{} | undefined, chat | groupchat) -> {[{binary(), non_neg_integer(), xmlel()}], boolean(), count()} | {error, db_failure}. +-callback select(binary(), jid(), jid(), mam_query:result(), + #rsm_set{} | undefined, chat | groupchat, + all | only_count | only_messages) -> + {[{binary(), non_neg_integer(), xmlel()}], boolean(), count()} | + {error, db_failure}. -callback use_cache(binary()) -> boolean(). -callback cache_nodes(binary()) -> [node()]. -callback remove_from_archive(binary(), binary(), jid() | none) -> ok | {error, any()}. -callback is_empty_for_user(binary(), binary()) -> boolean(). -callback is_empty_for_room(binary(), binary(), binary()) -> boolean(). -callback select_with_mucsub(binary(), jid(), jid(), mam_query:result(), - #rsm_set{} | undefined) -> + #rsm_set{} | undefined, all | only_count | only_messages) -> {[{binary(), non_neg_integer(), xmlel()}], boolean(), count()} | {error, db_failure}. --optional_callbacks([use_cache/1, cache_nodes/1, select_with_mucsub/5]). +-optional_callbacks([use_cache/1, cache_nodes/1, select_with_mucsub/6, select/6, select/7]). %%%=================================================================== %%% API @@ -1038,9 +1043,12 @@ select_and_send(LServer, Query, RSM, #iq{from = From, to = To} = IQ, MsgType) -> xmpp:make_error(IQ, Err) end. +select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType) -> + select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, all). + select(_LServer, JidRequestor, JidArchive, Query, RSM, {groupchat, _Role, #state{config = #config{mam = false}, - history = History}} = MsgType) -> + history = History}} = MsgType, _Flags) -> Start = proplists:get_value(start, Query), End = proplists:get_value('end', Query), #lqueue{queue = Q} = History, @@ -1079,21 +1087,20 @@ select(_LServer, JidRequestor, JidArchive, Query, RSM, _ -> {Msgs, true, L} end; -select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType) -> +select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, Flags) -> case might_expose_jid(Query, MsgType) of true -> {[], true, 0}; false -> case {MsgType, gen_mod:get_module_opt(LServer, ?MODULE, user_mucsub_from_muc_archive)} of {chat, true} -> - select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM); + select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM, Flags); _ -> - Mod = gen_mod:db_mod(LServer, ?MODULE), - Mod:select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType) + db_select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, Flags) end end. -select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM) -> +select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM, Flags) -> MucHosts = mod_muc_admin:find_hosts(LServer), Mod = gen_mod:db_mod(LServer, ?MODULE), case proplists:get_value(with, Query) of @@ -1103,20 +1110,19 @@ select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM) -> select(LServer, JidRequestor, MucJid, Query, RSM, {groupchat, member, #state{config = #config{mam = true}}}); _ -> - Mod:select(LServer, JidRequestor, JidArchive, Query, RSM, chat) + db_select(LServer, JidRequestor, JidArchive, Query, RSM, chat, Flags) end; _ -> - case erlang:function_exported(Mod, select_with_mucsub, 5) of + case erlang:function_exported(Mod, select_with_mucsub, 6) of true -> - Mod:select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM); + Mod:select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM, Flags); false -> - select_with_mucsub_fallback(LServer, JidRequestor, JidArchive, Query, RSM) + select_with_mucsub_fallback(LServer, JidRequestor, JidArchive, Query, RSM, Flags) end end. -select_with_mucsub_fallback(LServer, JidRequestor, JidArchive, Query, RSM) -> - Mod = gen_mod:db_mod(LServer, ?MODULE), - case Mod:select(LServer, JidRequestor, JidArchive, Query, RSM, chat) of +select_with_mucsub_fallback(LServer, JidRequestor, JidArchive, Query, RSM, Flags) -> + case db_select(LServer, JidRequestor, JidArchive, Query, RSM, chat, Flags) of {error, _} = Err -> Err; {Entries, All, Count} -> @@ -1166,6 +1172,15 @@ select_with_mucsub_fallback(LServer, JidRequestor, JidArchive, Query, RSM) -> end end. +db_select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, Flags) -> + Mod = gen_mod:db_mod(LServer, ?MODULE), + case erlang:function_exported(Mod, select, 7) of + true -> + Mod:select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, Flags); + _ -> + Mod:select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType) + end. + wrap_as_mucsub(Messages, #jid{lserver = LServer} = Requester) -> ReqBare = jid:remove_resource(Requester), ReqServer = jid:make(<<>>, LServer, <<>>), diff --git a/src/mod_mam_sql.erl b/src/mod_mam_sql.erl index 5379a055a..be87e64da 100644 --- a/src/mod_mam_sql.erl +++ b/src/mod_mam_sql.erl @@ -30,8 +30,8 @@ %% API -export([init/2, remove_user/2, remove_room/3, delete_old_messages/3, - extended_fields/0, store/8, write_prefs/4, get_prefs/2, select/6, export/1, remove_from_archive/3, - is_empty_for_user/2, is_empty_for_room/3, select_with_mucsub/5]). + extended_fields/0, store/8, write_prefs/4, get_prefs/2, select/7, export/1, remove_from_archive/3, + is_empty_for_user/2, is_empty_for_room/3, select_with_mucsub/6]). -include_lib("stdlib/include/ms_transform.hrl"). -include("xmpp.hrl"). @@ -174,20 +174,20 @@ get_prefs(LUser, LServer) -> end. select(LServer, JidRequestor, #jid{luser = LUser} = JidArchive, - MAMQuery, RSM, MsgType) -> + MAMQuery, RSM, MsgType, Flags) -> User = case MsgType of chat -> LUser; _ -> jid:encode(JidArchive) end, {Query, CountQuery} = make_sql_query(User, LServer, MAMQuery, RSM, none), - do_select_query(LServer, JidRequestor, JidArchive, RSM, MsgType, Query, CountQuery). + do_select_query(LServer, JidRequestor, JidArchive, RSM, MsgType, Query, CountQuery, Flags). -spec select_with_mucsub(binary(), jid(), jid(), mam_query:result(), - #rsm_set{} | undefined) -> + #rsm_set{} | undefined, all | only_count | only_messages) -> {[{binary(), non_neg_integer(), xmlel()}], boolean(), integer()} | {error, db_failure}. select_with_mucsub(LServer, JidRequestor, #jid{luser = LUser} = JidArchive, - MAMQuery, RSM) -> + MAMQuery, RSM, Flags) -> Extra = case gen_mod:db_mod(LServer, mod_muc) of mod_muc_sql -> subscribers_table; @@ -204,17 +204,25 @@ select_with_mucsub(LServer, JidRequestor, #jid{luser = LUser} = JidArchive, [jid:encode(Jid) || {Jid, _} <- SubRooms] end, {Query, CountQuery} = make_sql_query(LUser, LServer, MAMQuery, RSM, Extra), - do_select_query(LServer, JidRequestor, JidArchive, RSM, chat, Query, CountQuery). + do_select_query(LServer, JidRequestor, JidArchive, RSM, chat, Query, CountQuery, Flags). -do_select_query(LServer, JidRequestor, #jid{luser = LUser} = JidArchive, RSM, MsgType, Query, CountQuery) -> +do_select_query(LServer, JidRequestor, #jid{luser = LUser} = JidArchive, RSM, + MsgType, Query, CountQuery, Flags) -> % TODO from XEP-0313 v0.2: "To conserve resources, a server MAY place a % reasonable limit on how many stanzas may be pushed to a client in one % request. If a query returns a number of stanzas greater than this limit % and the client did not specify a limit using RSM then the server should % return a policy-violation error to the client." We currently don't do this % for v0.2 requests, but we do limit #rsm_in.max for v0.3 and newer. - case {ejabberd_sql:sql_query(LServer, Query), - ejabberd_sql:sql_query(LServer, CountQuery)} of + QRes = case Flags of + all -> + {ejabberd_sql:sql_query(LServer, Query), ejabberd_sql:sql_query(LServer, CountQuery)}; + only_messages -> + {ejabberd_sql:sql_query(LServer, Query), {selected, ok, [[<<"0">>]]}}; + only_count -> + {{selected, ok, []}, ejabberd_sql:sql_query(LServer, CountQuery)} + end, + case QRes of {{selected, _, Res}, {selected, _, [[Count]]}} -> {Max, Direction, _} = get_max_direction_id(RSM), {Res1, IsComplete} = diff --git a/src/mod_offline.erl b/src/mod_offline.erl index c4ac0eb65..ff3d06aa2 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -712,6 +712,7 @@ read_messages(LUser, LServer) -> Res end. +-spec read_db_messages(binary(), binary()) -> [{binary(), message()}]. read_db_messages(LUser, LServer) -> Mod = gen_mod:db_mod(LServer, ?MODULE), CodecOpts = ejabberd_config:codec_options(LServer), @@ -733,9 +734,9 @@ read_db_messages(LUser, LServer) -> end end, Mod:read_message_headers(LUser, LServer)). --spec read_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) -> - [{integer(), message()}]. -read_mam_messages(LUser, LServer, ReadMsgs) -> +-spec parse_marker_messages(binary(), [#offline_msg{} | {any(), message()}]) -> + {integer() | none, [message()]}. +parse_marker_messages(LServer, ReadMsgs) -> {Timestamp, ExtraMsgs} = lists:foldl( fun({_Node, #message{id = <<"ActivityMarker">>, body = [], type = error} = Msg}, {T, E}) -> @@ -771,8 +772,8 @@ read_mam_messages(LUser, LServer, ReadMsgs) -> Decoded -> Pkt1 = add_delay_info(Decoded, LServer, TS), {T, [xmpp:set_from_to(Pkt1, From, To) | E]} - catch _:{xmpp_codec, _Why} -> - {T, E} + catch _:{xmpp_codec, _Why} -> + {T, E} end; ({_Node, Msg}, {T, E}) -> {T, [Msg | E]} @@ -790,6 +791,12 @@ read_mam_messages(LUser, LServer, ReadMsgs) -> _ -> Timestamp end, + {Start, ExtraMsgs}. + +-spec read_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) -> + [{integer(), message()}]. +read_mam_messages(LUser, LServer, ReadMsgs) -> + {Start, ExtraMsgs} = parse_marker_messages(LServer, ReadMsgs), AllMsgs = case Start of none -> ExtraMsgs; @@ -804,7 +811,7 @@ read_mam_messages(LUser, LServer, ReadMsgs) -> [{start, Start}], #rsm_set{max = MaxOfflineMsgs, before = <<"9999999999999999">>}, - chat), + chat, only_messages), MamMsgs2 = lists:map( fun({_, _, #forwarded{sub_els = [MM | _], delay = #delay{stamp = MMT}}}) -> add_delay_info(MM, LServer, MMT) @@ -842,6 +849,28 @@ read_mam_messages(LUser, LServer, ReadMsgs) -> end, 1, AllMsgs2), AllMsgs3. +-spec count_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) -> + integer(). +count_mam_messages(LUser, LServer, ReadMsgs) -> + {Start, ExtraMsgs} = parse_marker_messages(LServer, ReadMsgs), + case Start of + none -> + length(ExtraMsgs); + _ -> + MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of + Number when is_integer(Number) -> Number - length(ExtraMsgs); + infinity -> undefined; + _ -> 100 - length(ExtraMsgs) + end, + JID = jid:make(LUser, LServer, <<>>), + {_, _, Count} = mod_mam:select(LServer, JID, JID, + [{start, Start}], + #rsm_set{max = MaxOfflineMsgs, + before = <<"9999999999999999">>}, + chat, only_count), + Count + length(ExtraMsgs) + end. + format_user_queue(Hdrs) -> lists:map( fun({Seq, From, To, TS, El}) -> @@ -1007,7 +1036,7 @@ count_offline_messages(User, Server) -> case use_mam_for_user(User, Server) of true -> Res = read_db_messages(LUser, LServer), - length(read_mam_messages(LUser, LServer, Res)); + count_mam_messages(LUser, LServer, Res); _ -> Mod = gen_mod:db_mod(LServer, ?MODULE), Mod:count_messages(LUser, LServer)