Don't issue count/message fetch queries for offline from mam when not needed

This commit is contained in:
Paweł Chmielowski 2019-05-02 11:12:22 +02:00
parent ef1a75a628
commit 0d2720d7ab
3 changed files with 85 additions and 33 deletions

View File

@ -42,7 +42,7 @@
get_room_config/4, set_room_option/3, offline_message/1, export/1, get_room_config/4, set_room_option/3, offline_message/1, export/1,
mod_options/1, remove_mam_for_user_with_peer/3, remove_mam_for_user/2, mod_options/1, remove_mam_for_user_with_peer/3, remove_mam_for_user/2,
is_empty_for_user/2, is_empty_for_room/3, check_create_room/4, is_empty_for_user/2, is_empty_for_room/3, check_create_room/4,
process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2, select/6]). process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2, select/7]).
-include("xmpp.hrl"). -include("xmpp.hrl").
-include("logger.hrl"). -include("logger.hrl").
@ -71,17 +71,22 @@
#rsm_set{} | undefined, chat | groupchat) -> #rsm_set{} | undefined, chat | groupchat) ->
{[{binary(), non_neg_integer(), xmlel()}], boolean(), count()} | {[{binary(), non_neg_integer(), xmlel()}], boolean(), count()} |
{error, db_failure}. {error, db_failure}.
-callback select(binary(), jid(), jid(), mam_query:result(),
#rsm_set{} | undefined, chat | groupchat,
all | only_count | only_messages) ->
{[{binary(), non_neg_integer(), xmlel()}], boolean(), count()} |
{error, db_failure}.
-callback use_cache(binary()) -> boolean(). -callback use_cache(binary()) -> boolean().
-callback cache_nodes(binary()) -> [node()]. -callback cache_nodes(binary()) -> [node()].
-callback remove_from_archive(binary(), binary(), jid() | none) -> ok | {error, any()}. -callback remove_from_archive(binary(), binary(), jid() | none) -> ok | {error, any()}.
-callback is_empty_for_user(binary(), binary()) -> boolean(). -callback is_empty_for_user(binary(), binary()) -> boolean().
-callback is_empty_for_room(binary(), binary(), binary()) -> boolean(). -callback is_empty_for_room(binary(), binary(), binary()) -> boolean().
-callback select_with_mucsub(binary(), jid(), jid(), mam_query:result(), -callback select_with_mucsub(binary(), jid(), jid(), mam_query:result(),
#rsm_set{} | undefined) -> #rsm_set{} | undefined, all | only_count | only_messages) ->
{[{binary(), non_neg_integer(), xmlel()}], boolean(), count()} | {[{binary(), non_neg_integer(), xmlel()}], boolean(), count()} |
{error, db_failure}. {error, db_failure}.
-optional_callbacks([use_cache/1, cache_nodes/1, select_with_mucsub/5]). -optional_callbacks([use_cache/1, cache_nodes/1, select_with_mucsub/6, select/6, select/7]).
%%%=================================================================== %%%===================================================================
%%% API %%% API
@ -1038,9 +1043,12 @@ select_and_send(LServer, Query, RSM, #iq{from = From, to = To} = IQ, MsgType) ->
xmpp:make_error(IQ, Err) xmpp:make_error(IQ, Err)
end. end.
select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType) ->
select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, all).
select(_LServer, JidRequestor, JidArchive, Query, RSM, select(_LServer, JidRequestor, JidArchive, Query, RSM,
{groupchat, _Role, #state{config = #config{mam = false}, {groupchat, _Role, #state{config = #config{mam = false},
history = History}} = MsgType) -> history = History}} = MsgType, _Flags) ->
Start = proplists:get_value(start, Query), Start = proplists:get_value(start, Query),
End = proplists:get_value('end', Query), End = proplists:get_value('end', Query),
#lqueue{queue = Q} = History, #lqueue{queue = Q} = History,
@ -1079,21 +1087,20 @@ select(_LServer, JidRequestor, JidArchive, Query, RSM,
_ -> _ ->
{Msgs, true, L} {Msgs, true, L}
end; end;
select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType) -> select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, Flags) ->
case might_expose_jid(Query, MsgType) of case might_expose_jid(Query, MsgType) of
true -> true ->
{[], true, 0}; {[], true, 0};
false -> false ->
case {MsgType, gen_mod:get_module_opt(LServer, ?MODULE, user_mucsub_from_muc_archive)} of case {MsgType, gen_mod:get_module_opt(LServer, ?MODULE, user_mucsub_from_muc_archive)} of
{chat, true} -> {chat, true} ->
select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM); select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM, Flags);
_ -> _ ->
Mod = gen_mod:db_mod(LServer, ?MODULE), db_select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, Flags)
Mod:select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType)
end end
end. end.
select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM) -> select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM, Flags) ->
MucHosts = mod_muc_admin:find_hosts(LServer), MucHosts = mod_muc_admin:find_hosts(LServer),
Mod = gen_mod:db_mod(LServer, ?MODULE), Mod = gen_mod:db_mod(LServer, ?MODULE),
case proplists:get_value(with, Query) of case proplists:get_value(with, Query) of
@ -1103,20 +1110,19 @@ select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM) ->
select(LServer, JidRequestor, MucJid, Query, RSM, select(LServer, JidRequestor, MucJid, Query, RSM,
{groupchat, member, #state{config = #config{mam = true}}}); {groupchat, member, #state{config = #config{mam = true}}});
_ -> _ ->
Mod:select(LServer, JidRequestor, JidArchive, Query, RSM, chat) db_select(LServer, JidRequestor, JidArchive, Query, RSM, chat, Flags)
end; end;
_ -> _ ->
case erlang:function_exported(Mod, select_with_mucsub, 5) of case erlang:function_exported(Mod, select_with_mucsub, 6) of
true -> true ->
Mod:select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM); Mod:select_with_mucsub(LServer, JidRequestor, JidArchive, Query, RSM, Flags);
false -> false ->
select_with_mucsub_fallback(LServer, JidRequestor, JidArchive, Query, RSM) select_with_mucsub_fallback(LServer, JidRequestor, JidArchive, Query, RSM, Flags)
end end
end. end.
select_with_mucsub_fallback(LServer, JidRequestor, JidArchive, Query, RSM) -> select_with_mucsub_fallback(LServer, JidRequestor, JidArchive, Query, RSM, Flags) ->
Mod = gen_mod:db_mod(LServer, ?MODULE), case db_select(LServer, JidRequestor, JidArchive, Query, RSM, chat, Flags) of
case Mod:select(LServer, JidRequestor, JidArchive, Query, RSM, chat) of
{error, _} = Err -> {error, _} = Err ->
Err; Err;
{Entries, All, Count} -> {Entries, All, Count} ->
@ -1166,6 +1172,15 @@ select_with_mucsub_fallback(LServer, JidRequestor, JidArchive, Query, RSM) ->
end end
end. end.
db_select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, Flags) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
case erlang:function_exported(Mod, select, 7) of
true ->
Mod:select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType, Flags);
_ ->
Mod:select(LServer, JidRequestor, JidArchive, Query, RSM, MsgType)
end.
wrap_as_mucsub(Messages, #jid{lserver = LServer} = Requester) -> wrap_as_mucsub(Messages, #jid{lserver = LServer} = Requester) ->
ReqBare = jid:remove_resource(Requester), ReqBare = jid:remove_resource(Requester),
ReqServer = jid:make(<<>>, LServer, <<>>), ReqServer = jid:make(<<>>, LServer, <<>>),

View File

@ -30,8 +30,8 @@
%% API %% API
-export([init/2, remove_user/2, remove_room/3, delete_old_messages/3, -export([init/2, remove_user/2, remove_room/3, delete_old_messages/3,
extended_fields/0, store/8, write_prefs/4, get_prefs/2, select/6, export/1, remove_from_archive/3, extended_fields/0, store/8, write_prefs/4, get_prefs/2, select/7, export/1, remove_from_archive/3,
is_empty_for_user/2, is_empty_for_room/3, select_with_mucsub/5]). is_empty_for_user/2, is_empty_for_room/3, select_with_mucsub/6]).
-include_lib("stdlib/include/ms_transform.hrl"). -include_lib("stdlib/include/ms_transform.hrl").
-include("xmpp.hrl"). -include("xmpp.hrl").
@ -174,20 +174,20 @@ get_prefs(LUser, LServer) ->
end. end.
select(LServer, JidRequestor, #jid{luser = LUser} = JidArchive, select(LServer, JidRequestor, #jid{luser = LUser} = JidArchive,
MAMQuery, RSM, MsgType) -> MAMQuery, RSM, MsgType, Flags) ->
User = case MsgType of User = case MsgType of
chat -> LUser; chat -> LUser;
_ -> jid:encode(JidArchive) _ -> jid:encode(JidArchive)
end, end,
{Query, CountQuery} = make_sql_query(User, LServer, MAMQuery, RSM, none), {Query, CountQuery} = make_sql_query(User, LServer, MAMQuery, RSM, none),
do_select_query(LServer, JidRequestor, JidArchive, RSM, MsgType, Query, CountQuery). do_select_query(LServer, JidRequestor, JidArchive, RSM, MsgType, Query, CountQuery, Flags).
-spec select_with_mucsub(binary(), jid(), jid(), mam_query:result(), -spec select_with_mucsub(binary(), jid(), jid(), mam_query:result(),
#rsm_set{} | undefined) -> #rsm_set{} | undefined, all | only_count | only_messages) ->
{[{binary(), non_neg_integer(), xmlel()}], boolean(), integer()} | {[{binary(), non_neg_integer(), xmlel()}], boolean(), integer()} |
{error, db_failure}. {error, db_failure}.
select_with_mucsub(LServer, JidRequestor, #jid{luser = LUser} = JidArchive, select_with_mucsub(LServer, JidRequestor, #jid{luser = LUser} = JidArchive,
MAMQuery, RSM) -> MAMQuery, RSM, Flags) ->
Extra = case gen_mod:db_mod(LServer, mod_muc) of Extra = case gen_mod:db_mod(LServer, mod_muc) of
mod_muc_sql -> mod_muc_sql ->
subscribers_table; subscribers_table;
@ -204,17 +204,25 @@ select_with_mucsub(LServer, JidRequestor, #jid{luser = LUser} = JidArchive,
[jid:encode(Jid) || {Jid, _} <- SubRooms] [jid:encode(Jid) || {Jid, _} <- SubRooms]
end, end,
{Query, CountQuery} = make_sql_query(LUser, LServer, MAMQuery, RSM, Extra), {Query, CountQuery} = make_sql_query(LUser, LServer, MAMQuery, RSM, Extra),
do_select_query(LServer, JidRequestor, JidArchive, RSM, chat, Query, CountQuery). do_select_query(LServer, JidRequestor, JidArchive, RSM, chat, Query, CountQuery, Flags).
do_select_query(LServer, JidRequestor, #jid{luser = LUser} = JidArchive, RSM, MsgType, Query, CountQuery) -> do_select_query(LServer, JidRequestor, #jid{luser = LUser} = JidArchive, RSM,
MsgType, Query, CountQuery, Flags) ->
% TODO from XEP-0313 v0.2: "To conserve resources, a server MAY place a % TODO from XEP-0313 v0.2: "To conserve resources, a server MAY place a
% reasonable limit on how many stanzas may be pushed to a client in one % reasonable limit on how many stanzas may be pushed to a client in one
% request. If a query returns a number of stanzas greater than this limit % request. If a query returns a number of stanzas greater than this limit
% and the client did not specify a limit using RSM then the server should % and the client did not specify a limit using RSM then the server should
% return a policy-violation error to the client." We currently don't do this % return a policy-violation error to the client." We currently don't do this
% for v0.2 requests, but we do limit #rsm_in.max for v0.3 and newer. % for v0.2 requests, but we do limit #rsm_in.max for v0.3 and newer.
case {ejabberd_sql:sql_query(LServer, Query), QRes = case Flags of
ejabberd_sql:sql_query(LServer, CountQuery)} of all ->
{ejabberd_sql:sql_query(LServer, Query), ejabberd_sql:sql_query(LServer, CountQuery)};
only_messages ->
{ejabberd_sql:sql_query(LServer, Query), {selected, ok, [[<<"0">>]]}};
only_count ->
{{selected, ok, []}, ejabberd_sql:sql_query(LServer, CountQuery)}
end,
case QRes of
{{selected, _, Res}, {selected, _, [[Count]]}} -> {{selected, _, Res}, {selected, _, [[Count]]}} ->
{Max, Direction, _} = get_max_direction_id(RSM), {Max, Direction, _} = get_max_direction_id(RSM),
{Res1, IsComplete} = {Res1, IsComplete} =

View File

@ -712,6 +712,7 @@ read_messages(LUser, LServer) ->
Res Res
end. end.
-spec read_db_messages(binary(), binary()) -> [{binary(), message()}].
read_db_messages(LUser, LServer) -> read_db_messages(LUser, LServer) ->
Mod = gen_mod:db_mod(LServer, ?MODULE), Mod = gen_mod:db_mod(LServer, ?MODULE),
CodecOpts = ejabberd_config:codec_options(LServer), CodecOpts = ejabberd_config:codec_options(LServer),
@ -733,9 +734,9 @@ read_db_messages(LUser, LServer) ->
end end
end, Mod:read_message_headers(LUser, LServer)). end, Mod:read_message_headers(LUser, LServer)).
-spec read_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) -> -spec parse_marker_messages(binary(), [#offline_msg{} | {any(), message()}]) ->
[{integer(), message()}]. {integer() | none, [message()]}.
read_mam_messages(LUser, LServer, ReadMsgs) -> parse_marker_messages(LServer, ReadMsgs) ->
{Timestamp, ExtraMsgs} = lists:foldl( {Timestamp, ExtraMsgs} = lists:foldl(
fun({_Node, #message{id = <<"ActivityMarker">>, fun({_Node, #message{id = <<"ActivityMarker">>,
body = [], type = error} = Msg}, {T, E}) -> body = [], type = error} = Msg}, {T, E}) ->
@ -771,8 +772,8 @@ read_mam_messages(LUser, LServer, ReadMsgs) ->
Decoded -> Decoded ->
Pkt1 = add_delay_info(Decoded, LServer, TS), Pkt1 = add_delay_info(Decoded, LServer, TS),
{T, [xmpp:set_from_to(Pkt1, From, To) | E]} {T, [xmpp:set_from_to(Pkt1, From, To) | E]}
catch _:{xmpp_codec, _Why} -> catch _:{xmpp_codec, _Why} ->
{T, E} {T, E}
end; end;
({_Node, Msg}, {T, E}) -> ({_Node, Msg}, {T, E}) ->
{T, [Msg | E]} {T, [Msg | E]}
@ -790,6 +791,12 @@ read_mam_messages(LUser, LServer, ReadMsgs) ->
_ -> _ ->
Timestamp Timestamp
end, end,
{Start, ExtraMsgs}.
-spec read_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) ->
[{integer(), message()}].
read_mam_messages(LUser, LServer, ReadMsgs) ->
{Start, ExtraMsgs} = parse_marker_messages(LServer, ReadMsgs),
AllMsgs = case Start of AllMsgs = case Start of
none -> none ->
ExtraMsgs; ExtraMsgs;
@ -804,7 +811,7 @@ read_mam_messages(LUser, LServer, ReadMsgs) ->
[{start, Start}], [{start, Start}],
#rsm_set{max = MaxOfflineMsgs, #rsm_set{max = MaxOfflineMsgs,
before = <<"9999999999999999">>}, before = <<"9999999999999999">>},
chat), chat, only_messages),
MamMsgs2 = lists:map( MamMsgs2 = lists:map(
fun({_, _, #forwarded{sub_els = [MM | _], delay = #delay{stamp = MMT}}}) -> fun({_, _, #forwarded{sub_els = [MM | _], delay = #delay{stamp = MMT}}}) ->
add_delay_info(MM, LServer, MMT) add_delay_info(MM, LServer, MMT)
@ -842,6 +849,28 @@ read_mam_messages(LUser, LServer, ReadMsgs) ->
end, 1, AllMsgs2), end, 1, AllMsgs2),
AllMsgs3. AllMsgs3.
-spec count_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) ->
integer().
count_mam_messages(LUser, LServer, ReadMsgs) ->
{Start, ExtraMsgs} = parse_marker_messages(LServer, ReadMsgs),
case Start of
none ->
length(ExtraMsgs);
_ ->
MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of
Number when is_integer(Number) -> Number - length(ExtraMsgs);
infinity -> undefined;
_ -> 100 - length(ExtraMsgs)
end,
JID = jid:make(LUser, LServer, <<>>),
{_, _, Count} = mod_mam:select(LServer, JID, JID,
[{start, Start}],
#rsm_set{max = MaxOfflineMsgs,
before = <<"9999999999999999">>},
chat, only_count),
Count + length(ExtraMsgs)
end.
format_user_queue(Hdrs) -> format_user_queue(Hdrs) ->
lists:map( lists:map(
fun({Seq, From, To, TS, El}) -> fun({Seq, From, To, TS, El}) ->
@ -1007,7 +1036,7 @@ count_offline_messages(User, Server) ->
case use_mam_for_user(User, Server) of case use_mam_for_user(User, Server) of
true -> true ->
Res = read_db_messages(LUser, LServer), Res = read_db_messages(LUser, LServer),
length(read_mam_messages(LUser, LServer, Res)); count_mam_messages(LUser, LServer, Res);
_ -> _ ->
Mod = gen_mod:db_mod(LServer, ?MODULE), Mod = gen_mod:db_mod(LServer, ?MODULE),
Mod:count_messages(LUser, LServer) Mod:count_messages(LUser, LServer)