diff --git a/src/mod_offline.erl b/src/mod_offline.erl index 27035e840..3b41ac97d 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -98,7 +98,7 @@ -callback remove_old_messages(non_neg_integer(), binary()) -> {atomic, any()}. -callback remove_user(binary(), binary()) -> any(). -callback read_message_headers(binary(), binary()) -> - [{non_neg_integer(), jid(), jid(), undefined | erlang:timestamp(), xmlel()}]. + [{non_neg_integer(), jid(), jid(), undefined | erlang:timestamp(), xmlel()}] | error. -callback read_message(binary(), binary(), non_neg_integer()) -> {ok, #offline_msg{}} | error. -callback remove_message(binary(), binary(), non_neg_integer()) -> ok | {error, any()}. @@ -222,26 +222,26 @@ store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) -> Mod = gen_mod:db_mod(Server, ?MODULE), case UseMam andalso xmpp:get_meta(Pkt, mam_archived, false) of true -> - ets_cache:lookup(?EMPTY_SPOOL_CACHE, {User, Server}, - fun() -> - case count_messages_in_db(Mod, User, Server) of - 0 -> - case Mod:store_message(Msg) of - ok -> - {cache, ok}; - Err -> - {nocache, Err} - end; - _ -> - {cache, ok} + case count_offline_messages(User, Server) of + 0 -> + store_message_in_db(Mod, Msg); + _ -> + case use_cache(Mod, Server) of + true -> + ets_cache:incr( + ?SPOOL_COUNTER_CACHE, + {User, Server}, 1, + cache_nodes(Mod, Server)); + false -> + ok end - end); + end; false -> case get_max_user_messages(User, Server) of infinity -> store_message_in_db(Mod, Msg); Limit -> - Num = count_messages_in_db(Mod, User, Server), + Num = count_offline_messages(User, Server), if Num < Limit -> store_message_in_db(Mod, Msg); true -> @@ -288,7 +288,12 @@ get_sm_items(_Acc, #jid{luser = U, lserver = S} = JID, ?NS_FLEX_OFFLINE, _Lang) -> ejabberd_sm:route(JID, {resend_offline, false}), Mod = gen_mod:db_mod(S, ?MODULE), - Hdrs = Mod:read_message_headers(U, S), + Hdrs = case Mod:read_message_headers(U, S) of + L when is_list(L) -> + L; + _ -> + [] + end, BareJID = jid:remove_resource(JID), {result, lists:map( fun({Seq, From, _To, _TS, _El}) -> @@ -516,9 +521,31 @@ store_packet({_Action, #message{from = From, to = To} = Packet} = Acc) -> stop end end; - _ -> Acc + _ -> + maybe_update_cache(To, Packet), + Acc end; - false -> Acc + false -> + maybe_update_cache(To, Packet), + Acc + end. + +-spec maybe_update_cache(jid(), message()) -> ok. +maybe_update_cache(#jid{lserver = Server, luser = User}, Packet) -> + case xmpp:get_meta(Packet, mam_archived, false) of + true -> + Mod = gen_mod:db_mod(Server, ?MODULE), + case use_mam_for_user(User, Server) andalso use_cache(Mod, Server) of + true -> + ets_cache:incr( + ?SPOOL_COUNTER_CACHE, + {User, Server}, 1, + cache_nodes(Mod, Server)); + _ -> + ok + end; + _ -> + ok end. -spec check_store_hint(message()) -> store | no_store | none. @@ -750,7 +777,12 @@ offline_msg_to_route(LServer, #offline_msg{from = From, to = To} = R) -> -spec read_messages(binary(), binary()) -> [{binary(), message()}]. read_messages(LUser, LServer) -> - Res = read_db_messages(LUser, LServer), + Res = case read_db_messages(LUser, LServer) of + error -> + []; + L when is_list(L) -> + L + end, case use_mam_for_user(LUser, LServer) of true -> read_mam_messages(LUser, LServer, Res); @@ -758,27 +790,32 @@ read_messages(LUser, LServer) -> Res end. --spec read_db_messages(binary(), binary()) -> [{binary(), message()}]. +-spec read_db_messages(binary(), binary()) -> [{binary(), message()}] | error. read_db_messages(LUser, LServer) -> Mod = gen_mod:db_mod(LServer, ?MODULE), CodecOpts = ejabberd_config:codec_options(), - lists:flatmap( - fun({Seq, From, To, TS, El}) -> - Node = integer_to_binary(Seq), - try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of - Pkt -> + case Mod:read_message_headers(LUser, LServer) of + error -> + error; + L -> + lists:flatmap( + fun({Seq, From, To, TS, El}) -> Node = integer_to_binary(Seq), - Pkt1 = add_delay_info(Pkt, LServer, TS), - Pkt2 = xmpp:set_from_to(Pkt1, From, To), - [{Node, Pkt2}] - catch _:{xmpp_codec, Why} -> - ?ERROR_MSG("Failed to decode packet ~p " - "of user ~s: ~s", - [El, jid:encode(To), - xmpp:format_error(Why)]), - [] - end - end, Mod:read_message_headers(LUser, LServer)). + try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of + Pkt -> + Node = integer_to_binary(Seq), + Pkt1 = add_delay_info(Pkt, LServer, TS), + Pkt2 = xmpp:set_from_to(Pkt1, From, To), + [{Node, Pkt2}] + catch _:{xmpp_codec, Why} -> + ?ERROR_MSG("Failed to decode packet ~p " + "of user ~s: ~s", + [El, jid:encode(To), + xmpp:format_error(Why)]), + [] + end + end, L) + end. -spec parse_marker_messages(binary(), [#offline_msg{} | {any(), message()}]) -> {integer() | none, [message()]}. @@ -896,13 +933,15 @@ read_mam_messages(LUser, LServer, ReadMsgs) -> end, 1, AllMsgs2), AllMsgs3. --spec count_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) -> - integer(). +-spec count_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}] | error) -> + {cache, integer()} | {nocache, integer()}. +count_mam_messages(_LUser, _LServer, error) -> + {nocache, 0}; count_mam_messages(LUser, LServer, ReadMsgs) -> {Start, ExtraMsgs} = parse_marker_messages(LServer, ReadMsgs), case Start of none -> - length(ExtraMsgs); + {cache, length(ExtraMsgs)}; _ -> MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of Number when is_integer(Number) -> Number - length(ExtraMsgs); @@ -914,7 +953,7 @@ count_mam_messages(LUser, LServer, ReadMsgs) -> #rsm_set{max = MaxOfflineMsgs, before = <<"9999999999999999">>}, chat, only_count), - Count + length(ExtraMsgs) + {cache, Count + length(ExtraMsgs)} end. format_user_queue(Hdrs) -> @@ -957,7 +996,10 @@ user_queue(User, Server, Query, Lang) -> US = {LUser, LServer}, Mod = gen_mod:db_mod(LServer, ?MODULE), user_queue_parse_query(LUser, LServer, Query), - HdrsAll = Mod:read_message_headers(LUser, LServer), + HdrsAll = case Mod:read_message_headers(LUser, LServer) of + error -> []; + L -> L + end, Hdrs = get_messages_subset(User, Server, HdrsAll), FMsgs = format_user_queue(Hdrs), [?XC(<<"h1">>, @@ -1082,26 +1124,32 @@ webadmin_user_parse_query(Acc, _Action, _User, _Server, count_offline_messages(User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), + Mod = gen_mod:db_mod(LServer, ?MODULE), case use_mam_for_user(User, Server) of true -> - Res = read_db_messages(LUser, LServer), - count_mam_messages(LUser, LServer, Res); + case use_cache(Mod, LServer) of + true -> + ets_cache:lookup( + ?SPOOL_COUNTER_CACHE, {LUser, LServer}, + fun() -> + Res = read_db_messages(LUser, LServer), + count_mam_messages(LUser, LServer, Res) + end); + false -> + Res = read_db_messages(LUser, LServer), + ets_cache:untag(count_mam_messages(LUser, LServer, Res)) + end; _ -> - Mod = gen_mod:db_mod(LServer, ?MODULE), - count_messages_in_db(Mod, LUser, LServer) - end. - --spec count_messages_in_db(module(), binary(), binary()) -> non_neg_integer(). -count_messages_in_db(Mod, LUser, LServer) -> - case use_cache(Mod, LServer) of - true -> - ets_cache:lookup( - ?SPOOL_COUNTER_CACHE, {LUser, LServer}, - fun() -> - Mod:count_messages(LUser, LServer) - end); - false -> - ets_cache:untag(Mod:count_messages(LUser, LServer)) + case use_cache(Mod, LServer) of + true -> + ets_cache:lookup( + ?SPOOL_COUNTER_CACHE, {LUser, LServer}, + fun() -> + Mod:count_messages(LUser, LServer) + end); + false -> + ets_cache:untag(Mod:count_messages(LUser, LServer)) + end end. -spec store_message_in_db(module(), #offline_msg{}) -> ok | {error, any()}. diff --git a/src/mod_offline_riak.erl b/src/mod_offline_riak.erl index 2c1ddea7c..3e126c12c 100644 --- a/src/mod_offline_riak.erl +++ b/src/mod_offline_riak.erl @@ -88,7 +88,7 @@ read_message_headers(LUser, LServer) -> end, Rs), lists:keysort(1, Hdrs); _Err -> - [] + error end. read_message(_LUser, _LServer, I) -> diff --git a/src/mod_offline_sql.erl b/src/mod_offline_sql.erl index 640cc071e..2846b28b0 100644 --- a/src/mod_offline_sql.erl +++ b/src/mod_offline_sql.erl @@ -131,7 +131,7 @@ read_message_headers(LUser, LServer) -> end end, Rows); _Err -> - [] + error end. read_message(LUser, LServer, Seq) -> diff --git a/test/ejabberd_SUITE_data/ejabberd.mysql.yml b/test/ejabberd_SUITE_data/ejabberd.mysql.yml index 3de3c3f60..0b0550e18 100644 --- a/test/ejabberd_SUITE_data/ejabberd.mysql.yml +++ b/test/ejabberd_SUITE_data/ejabberd.mysql.yml @@ -22,6 +22,7 @@ define_macro: db_type: sql ram_db_type: sql mod_offline: + use_cache: true db_type: sql mod_privacy: db_type: sql diff --git a/test/ejabberd_SUITE_data/ejabberd.pgsql.yml b/test/ejabberd_SUITE_data/ejabberd.pgsql.yml index c64ccdd96..637fc61bc 100644 --- a/test/ejabberd_SUITE_data/ejabberd.pgsql.yml +++ b/test/ejabberd_SUITE_data/ejabberd.pgsql.yml @@ -22,6 +22,7 @@ define_macro: db_type: sql ram_db_type: sql mod_offline: + use_cache: true db_type: sql mod_privacy: db_type: sql