Make count_offline_messages cache work when offline uses mam for storage

This also replace existing cache for checking if spool is empty with this
cache.
This commit is contained in:
Paweł Chmielowski 2019-07-01 13:36:05 +02:00
parent c5fde9d5af
commit 3e8f3573a3
5 changed files with 110 additions and 60 deletions

View File

@ -98,7 +98,7 @@
-callback remove_old_messages(non_neg_integer(), binary()) -> {atomic, any()}.
-callback remove_user(binary(), binary()) -> any().
-callback read_message_headers(binary(), binary()) ->
[{non_neg_integer(), jid(), jid(), undefined | erlang:timestamp(), xmlel()}].
[{non_neg_integer(), jid(), jid(), undefined | erlang:timestamp(), xmlel()}] | error.
-callback read_message(binary(), binary(), non_neg_integer()) ->
{ok, #offline_msg{}} | error.
-callback remove_message(binary(), binary(), non_neg_integer()) -> ok | {error, any()}.
@ -222,26 +222,26 @@ store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) ->
Mod = gen_mod:db_mod(Server, ?MODULE),
case UseMam andalso xmpp:get_meta(Pkt, mam_archived, false) of
true ->
ets_cache:lookup(?EMPTY_SPOOL_CACHE, {User, Server},
fun() ->
case count_messages_in_db(Mod, User, Server) of
0 ->
case Mod:store_message(Msg) of
ok ->
{cache, ok};
Err ->
{nocache, Err}
end;
_ ->
{cache, ok}
case count_offline_messages(User, Server) of
0 ->
store_message_in_db(Mod, Msg);
_ ->
case use_cache(Mod, Server) of
true ->
ets_cache:incr(
?SPOOL_COUNTER_CACHE,
{User, Server}, 1,
cache_nodes(Mod, Server));
false ->
ok
end
end);
end;
false ->
case get_max_user_messages(User, Server) of
infinity ->
store_message_in_db(Mod, Msg);
Limit ->
Num = count_messages_in_db(Mod, User, Server),
Num = count_offline_messages(User, Server),
if Num < Limit ->
store_message_in_db(Mod, Msg);
true ->
@ -288,7 +288,12 @@ get_sm_items(_Acc, #jid{luser = U, lserver = S} = JID,
?NS_FLEX_OFFLINE, _Lang) ->
ejabberd_sm:route(JID, {resend_offline, false}),
Mod = gen_mod:db_mod(S, ?MODULE),
Hdrs = Mod:read_message_headers(U, S),
Hdrs = case Mod:read_message_headers(U, S) of
L when is_list(L) ->
L;
_ ->
[]
end,
BareJID = jid:remove_resource(JID),
{result, lists:map(
fun({Seq, From, _To, _TS, _El}) ->
@ -516,9 +521,31 @@ store_packet({_Action, #message{from = From, to = To} = Packet} = Acc) ->
stop
end
end;
_ -> Acc
_ ->
maybe_update_cache(To, Packet),
Acc
end;
false -> Acc
false ->
maybe_update_cache(To, Packet),
Acc
end.
-spec maybe_update_cache(jid(), message()) -> ok.
maybe_update_cache(#jid{lserver = Server, luser = User}, Packet) ->
case xmpp:get_meta(Packet, mam_archived, false) of
true ->
Mod = gen_mod:db_mod(Server, ?MODULE),
case use_mam_for_user(User, Server) andalso use_cache(Mod, Server) of
true ->
ets_cache:incr(
?SPOOL_COUNTER_CACHE,
{User, Server}, 1,
cache_nodes(Mod, Server));
_ ->
ok
end;
_ ->
ok
end.
-spec check_store_hint(message()) -> store | no_store | none.
@ -750,7 +777,12 @@ offline_msg_to_route(LServer, #offline_msg{from = From, to = To} = R) ->
-spec read_messages(binary(), binary()) -> [{binary(), message()}].
read_messages(LUser, LServer) ->
Res = read_db_messages(LUser, LServer),
Res = case read_db_messages(LUser, LServer) of
error ->
[];
L when is_list(L) ->
L
end,
case use_mam_for_user(LUser, LServer) of
true ->
read_mam_messages(LUser, LServer, Res);
@ -758,27 +790,32 @@ read_messages(LUser, LServer) ->
Res
end.
-spec read_db_messages(binary(), binary()) -> [{binary(), message()}].
-spec read_db_messages(binary(), binary()) -> [{binary(), message()}] | error.
read_db_messages(LUser, LServer) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
CodecOpts = ejabberd_config:codec_options(),
lists:flatmap(
fun({Seq, From, To, TS, El}) ->
Node = integer_to_binary(Seq),
try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of
Pkt ->
case Mod:read_message_headers(LUser, LServer) of
error ->
error;
L ->
lists:flatmap(
fun({Seq, From, To, TS, El}) ->
Node = integer_to_binary(Seq),
Pkt1 = add_delay_info(Pkt, LServer, TS),
Pkt2 = xmpp:set_from_to(Pkt1, From, To),
[{Node, Pkt2}]
catch _:{xmpp_codec, Why} ->
?ERROR_MSG("Failed to decode packet ~p "
"of user ~s: ~s",
[El, jid:encode(To),
xmpp:format_error(Why)]),
[]
end
end, Mod:read_message_headers(LUser, LServer)).
try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of
Pkt ->
Node = integer_to_binary(Seq),
Pkt1 = add_delay_info(Pkt, LServer, TS),
Pkt2 = xmpp:set_from_to(Pkt1, From, To),
[{Node, Pkt2}]
catch _:{xmpp_codec, Why} ->
?ERROR_MSG("Failed to decode packet ~p "
"of user ~s: ~s",
[El, jid:encode(To),
xmpp:format_error(Why)]),
[]
end
end, L)
end.
-spec parse_marker_messages(binary(), [#offline_msg{} | {any(), message()}]) ->
{integer() | none, [message()]}.
@ -896,13 +933,15 @@ read_mam_messages(LUser, LServer, ReadMsgs) ->
end, 1, AllMsgs2),
AllMsgs3.
-spec count_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) ->
integer().
-spec count_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}] | error) ->
{cache, integer()} | {nocache, integer()}.
count_mam_messages(_LUser, _LServer, error) ->
{nocache, 0};
count_mam_messages(LUser, LServer, ReadMsgs) ->
{Start, ExtraMsgs} = parse_marker_messages(LServer, ReadMsgs),
case Start of
none ->
length(ExtraMsgs);
{cache, length(ExtraMsgs)};
_ ->
MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of
Number when is_integer(Number) -> Number - length(ExtraMsgs);
@ -914,7 +953,7 @@ count_mam_messages(LUser, LServer, ReadMsgs) ->
#rsm_set{max = MaxOfflineMsgs,
before = <<"9999999999999999">>},
chat, only_count),
Count + length(ExtraMsgs)
{cache, Count + length(ExtraMsgs)}
end.
format_user_queue(Hdrs) ->
@ -957,7 +996,10 @@ user_queue(User, Server, Query, Lang) ->
US = {LUser, LServer},
Mod = gen_mod:db_mod(LServer, ?MODULE),
user_queue_parse_query(LUser, LServer, Query),
HdrsAll = Mod:read_message_headers(LUser, LServer),
HdrsAll = case Mod:read_message_headers(LUser, LServer) of
error -> [];
L -> L
end,
Hdrs = get_messages_subset(User, Server, HdrsAll),
FMsgs = format_user_queue(Hdrs),
[?XC(<<"h1">>,
@ -1082,26 +1124,32 @@ webadmin_user_parse_query(Acc, _Action, _User, _Server,
count_offline_messages(User, Server) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
Mod = gen_mod:db_mod(LServer, ?MODULE),
case use_mam_for_user(User, Server) of
true ->
Res = read_db_messages(LUser, LServer),
count_mam_messages(LUser, LServer, Res);
case use_cache(Mod, LServer) of
true ->
ets_cache:lookup(
?SPOOL_COUNTER_CACHE, {LUser, LServer},
fun() ->
Res = read_db_messages(LUser, LServer),
count_mam_messages(LUser, LServer, Res)
end);
false ->
Res = read_db_messages(LUser, LServer),
ets_cache:untag(count_mam_messages(LUser, LServer, Res))
end;
_ ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
count_messages_in_db(Mod, LUser, LServer)
end.
-spec count_messages_in_db(module(), binary(), binary()) -> non_neg_integer().
count_messages_in_db(Mod, LUser, LServer) ->
case use_cache(Mod, LServer) of
true ->
ets_cache:lookup(
?SPOOL_COUNTER_CACHE, {LUser, LServer},
fun() ->
Mod:count_messages(LUser, LServer)
end);
false ->
ets_cache:untag(Mod:count_messages(LUser, LServer))
case use_cache(Mod, LServer) of
true ->
ets_cache:lookup(
?SPOOL_COUNTER_CACHE, {LUser, LServer},
fun() ->
Mod:count_messages(LUser, LServer)
end);
false ->
ets_cache:untag(Mod:count_messages(LUser, LServer))
end
end.
-spec store_message_in_db(module(), #offline_msg{}) -> ok | {error, any()}.

View File

@ -88,7 +88,7 @@ read_message_headers(LUser, LServer) ->
end, Rs),
lists:keysort(1, Hdrs);
_Err ->
[]
error
end.
read_message(_LUser, _LServer, I) ->

View File

@ -131,7 +131,7 @@ read_message_headers(LUser, LServer) ->
end
end, Rows);
_Err ->
[]
error
end.
read_message(LUser, LServer, Seq) ->

View File

@ -22,6 +22,7 @@ define_macro:
db_type: sql
ram_db_type: sql
mod_offline:
use_cache: true
db_type: sql
mod_privacy:
db_type: sql

View File

@ -22,6 +22,7 @@ define_macro:
db_type: sql
ram_db_type: sql
mod_offline:
use_cache: true
db_type: sql
mod_privacy:
db_type: sql