From 4aebd2fd8ecb64a0065b1ab1c2e736dacabc6543 Mon Sep 17 00:00:00 2001 From: Evgeny Khramtsov Date: Sun, 30 Jun 2019 21:14:37 +0300 Subject: [PATCH] Cache number of offline messages --- rebar.config | 2 +- src/mod_offline.erl | 155 ++++++++++++++++++++++++++++--------- src/mod_offline_mnesia.erl | 8 +- src/mod_offline_opt.erl | 7 ++ src/mod_offline_riak.erl | 4 +- src/mod_offline_sql.erl | 7 +- 6 files changed, 137 insertions(+), 46 deletions(-) diff --git a/rebar.config b/rebar.config index 4f477ebfa..e8c88a18a 100644 --- a/rebar.config +++ b/rebar.config @@ -20,7 +20,7 @@ {deps, [{lager, ".*", {git, "https://github.com/erlang-lager/lager", "3.6.10"}}, {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", "2887223"}}, - {cache_tab, ".*", {git, "https://github.com/processone/cache_tab", "8c4487c"}}, + {cache_tab, ".*", {git, "https://github.com/processone/cache_tab", "a425873340"}}, {fast_tls, ".*", {git, "https://github.com/processone/fast_tls", {tag, "1.1.1"}}}, {stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.16"}}}, {fast_xml, ".*", {git, "https://github.com/processone/fast_xml", "7fd02f3a2f"}}, diff --git a/src/mod_offline.erl b/src/mod_offline.erl index 01c4e04bc..27035e840 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -85,6 +85,7 @@ -define(MAX_USER_MESSAGES, infinity). -define(EMPTY_SPOOL_CACHE, offline_empty_cache). +-define(SPOOL_COUNTER_CACHE, offline_msg_counter_cache). -type c2s_state() :: ejabberd_c2s:state(). @@ -103,9 +104,12 @@ -callback remove_message(binary(), binary(), non_neg_integer()) -> ok | {error, any()}. -callback read_all_messages(binary(), binary()) -> [#offline_msg{}]. -callback remove_all_messages(binary(), binary()) -> {atomic, any()}. --callback count_messages(binary(), binary()) -> non_neg_integer(). +-callback count_messages(binary(), binary()) -> {ets_cache:tag(), non_neg_integer()}. +-callback use_cache(binary()) -> boolean(). +-callback cache_nodes(binary()) -> [node()]. --optional_callbacks([remove_expired_messages/1, remove_old_messages/2]). +-optional_callbacks([remove_expired_messages/1, remove_old_messages/2, + use_cache/1, cache_nodes/1]). depends(_Host, _Opts) -> []. @@ -113,7 +117,7 @@ depends(_Host, _Opts) -> start(Host, Opts) -> Mod = gen_mod:db_mod(Opts, ?MODULE), Mod:init(Host, Opts), - init_cache(Opts), + init_cache(Mod, Host, Opts), ejabberd_hooks:add(offline_message_hook, Host, ?MODULE, store_packet, 50), ejabberd_hooks:add(c2s_self_presence, Host, ?MODULE, c2s_self_presence, 50), @@ -163,33 +167,64 @@ stop(Host) -> reload(Host, NewOpts, OldOpts) -> NewMod = gen_mod:db_mod(NewOpts, ?MODULE), OldMod = gen_mod:db_mod(OldOpts, ?MODULE), - init_cache(NewOpts), + init_cache(NewMod, Host, NewOpts), if NewMod /= OldMod -> NewMod:init(Host, NewOpts); true -> ok end. -init_cache(Opts) -> +init_cache(Mod, Host, Opts) -> + CacheOpts = [{max_size, mod_offline_opt:cache_size(Opts)}, + {life_time, mod_offline_opt:cache_life_time(Opts)}, + {cache_missed, false}], case mod_offline_opt:use_mam_for_storage(Opts) of true -> - MaxSize = mod_offline_opt:cache_size(Opts), - LifeTime = mod_offline_opt:cache_life_time(Opts), - COpts = [{max_size, MaxSize}, {cache_missed, false}, {life_time, LifeTime}], - ets_cache:new(?EMPTY_SPOOL_CACHE, COpts); + ets_cache:new(?EMPTY_SPOOL_CACHE, CacheOpts); false -> ets_cache:delete(?EMPTY_SPOOL_CACHE) + end, + case use_cache(Mod, Host) of + true -> + ets_cache:new(?SPOOL_COUNTER_CACHE, CacheOpts); + false -> + ets_cache:delete(?SPOOL_COUNTER_CACHE) + end. + +-spec use_cache(module(), binary()) -> boolean(). +use_cache(Mod, Host) -> + case erlang:function_exported(Mod, use_cache, 1) of + true -> Mod:use_cache(Host); + false -> mod_offline_opt:use_cache(Host) + end. + +-spec cache_nodes(module(), binary()) -> [node()]. +cache_nodes(Mod, Host) -> + case erlang:function_exported(Mod, cache_nodes, 1) of + true -> Mod:cache_nodes(Host); + false -> ejabberd_cluster:get_nodes() + end. + +-spec flush_cache(module(), binary(), binary()) -> ok. +flush_cache(Mod, User, Server) -> + case use_cache(Mod, Server) of + true -> + ets_cache:delete(?SPOOL_COUNTER_CACHE, + {User, Server}, + cache_nodes(Mod, Server)); + false -> + ok end. -spec store_offline_msg(#offline_msg{}) -> ok | {error, full | any()}. store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) -> UseMam = use_mam_for_user(User, Server), + Mod = gen_mod:db_mod(Server, ?MODULE), case UseMam andalso xmpp:get_meta(Pkt, mam_archived, false) of true -> - Mod = gen_mod:db_mod(Server, ?MODULE), ets_cache:lookup(?EMPTY_SPOOL_CACHE, {User, Server}, fun() -> - case count_messages_in_db(User, Server) of + case count_messages_in_db(Mod, User, Server) of 0 -> case Mod:store_message(Msg) of ok -> @@ -202,15 +237,14 @@ store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) -> end end); false -> - Mod = gen_mod:db_mod(Server, ?MODULE), case get_max_user_messages(User, Server) of infinity -> - Mod:store_message(Msg); + store_message_in_db(Mod, Msg); Limit -> - Num = count_messages_in_db(User, Server), + Num = count_messages_in_db(Mod, User, Server), if Num < Limit -> - Mod:store_message(Msg); - true -> + store_message_in_db(Mod, Msg); + true -> {error, full} end end @@ -407,6 +441,7 @@ remove_msg_by_node(To, Seq) -> LServer = To#jid.lserver, Mod = gen_mod:db_mod(LServer, ?MODULE), Mod:remove_message(LUser, LServer, I), + flush_cache(Mod, LUser, LServer), true; _ -> false @@ -573,6 +608,7 @@ route_offline_messages(#{jid := #jid{luser = LUser, lserver = LServer}} = State) packet = Msg} end, read_mam_messages(LUser, LServer, OffMsgs)); _ -> + flush_cache(Mod, LUser, LServer), OffMsgs end; _ -> @@ -619,16 +655,24 @@ remove_expired_messages(Server) -> LServer = jid:nameprep(Server), Mod = gen_mod:db_mod(LServer, ?MODULE), case erlang:function_exported(Mod, remove_expired_messages, 1) of - true -> Mod:remove_expired_messages(LServer); - false -> erlang:error(not_implemented) + true -> + Ret = Mod:remove_expired_messages(LServer), + ets_cache:clear(?SPOOL_COUNTER_CACHE), + Ret; + false -> + erlang:error(not_implemented) end. remove_old_messages(Days, Server) -> LServer = jid:nameprep(Server), Mod = gen_mod:db_mod(LServer, ?MODULE), case erlang:function_exported(Mod, remove_old_messages, 2) of - true -> Mod:remove_old_messages(Days, LServer); - false -> erlang:error(not_implemented) + true -> + Ret = Mod:remove_old_messages(Days, LServer), + ets_cache:clear(?SPOOL_COUNTER_CACHE), + Ret; + false -> + erlang:error(not_implemented) end. -spec remove_user(binary(), binary()) -> ok. @@ -637,7 +681,7 @@ remove_user(User, Server) -> LServer = jid:nameprep(Server), Mod = gen_mod:db_mod(LServer, ?MODULE), Mod:remove_user(LUser, LServer), - ok. + flush_cache(Mod, LUser, LServer). %% Helper functions: @@ -944,23 +988,29 @@ user_queue_parse_query(LUser, LServer, Query) -> Mod = gen_mod:db_mod(LServer, ?MODULE), case lists:keysearch(<<"delete">>, 1, Query) of {value, _} -> - user_queue_parse_query(LUser, LServer, Query, Mod); + case user_queue_parse_query(LUser, LServer, Query, Mod, false) of + true -> + flush_cache(Mod, LUser, LServer); + false -> + ok + end; _ -> ok end. -user_queue_parse_query(LUser, LServer, Query, Mod) -> +user_queue_parse_query(LUser, LServer, Query, Mod, Acc) -> case lists:keytake(<<"selected">>, 1, Query) of {value, {_, Seq}, Query2} -> - case catch binary_to_integer(Seq) of - I when is_integer(I), I>=0 -> - Mod:remove_message(LUser, LServer, I); - _ -> - ok - end, - user_queue_parse_query(LUser, LServer, Query2, Mod); + NewAcc = case catch binary_to_integer(Seq) of + I when is_integer(I), I>=0 -> + Mod:remove_message(LUser, LServer, I), + true; + _ -> + Acc + end, + user_queue_parse_query(LUser, LServer, Query2, Mod, NewAcc); false -> - ok + Acc end. us_to_list({User, Server}) -> @@ -1007,7 +1057,9 @@ delete_all_msgs(User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), Mod = gen_mod:db_mod(LServer, ?MODULE), - Mod:remove_all_messages(LUser, LServer). + Ret = Mod:remove_all_messages(LUser, LServer), + flush_cache(Mod, LUser, LServer), + Ret. webadmin_user_parse_query(_, <<"removealloffline">>, User, Server, _Query) -> @@ -1035,13 +1087,39 @@ count_offline_messages(User, Server) -> Res = read_db_messages(LUser, LServer), count_mam_messages(LUser, LServer, Res); _ -> - count_messages_in_db(LUser, LServer) + Mod = gen_mod:db_mod(LServer, ?MODULE), + count_messages_in_db(Mod, LUser, LServer) end. --spec count_messages_in_db(binary(), binary()) -> non_neg_integer(). -count_messages_in_db(LUser, LServer) -> - Mod = gen_mod:db_mod(LServer, ?MODULE), - Mod:count_messages(LUser, LServer). +-spec count_messages_in_db(module(), binary(), binary()) -> non_neg_integer(). +count_messages_in_db(Mod, LUser, LServer) -> + case use_cache(Mod, LServer) of + true -> + ets_cache:lookup( + ?SPOOL_COUNTER_CACHE, {LUser, LServer}, + fun() -> + Mod:count_messages(LUser, LServer) + end); + false -> + ets_cache:untag(Mod:count_messages(LUser, LServer)) + end. + +-spec store_message_in_db(module(), #offline_msg{}) -> ok | {error, any()}. +store_message_in_db(Mod, #offline_msg{us = {User, Server}} = Msg) -> + case Mod:store_message(Msg) of + ok -> + case use_cache(Mod, Server) of + true -> + ets_cache:incr( + ?SPOOL_COUNTER_CACHE, + {User, Server}, 1, + cache_nodes(Mod, Server)); + false -> + ok + end; + Err -> + Err + end. -spec add_delay_info(message(), binary(), undefined | erlang:timestamp()) -> message(). @@ -1107,6 +1185,8 @@ mod_opt_type(store_empty_body) -> econf:bool()); mod_opt_type(db_type) -> econf:db_type(?MODULE); +mod_opt_type(use_cache) -> + econf:bool(); mod_opt_type(cache_size) -> econf:pos_int(infinity); mod_opt_type(cache_life_time) -> @@ -1119,5 +1199,6 @@ mod_options(Host) -> {use_mam_for_storage, false}, {bounce_groupchat, false}, {store_groupchat, false}, + {use_cache, ejabberd_option:use_cache(Host)}, {cache_size, ejabberd_option:cache_size(Host)}, {cache_life_time, ejabberd_option:cache_life_time(Host)}]. diff --git a/src/mod_offline_mnesia.erl b/src/mod_offline_mnesia.erl index d82735dee..7fec22a6c 100644 --- a/src/mod_offline_mnesia.erl +++ b/src/mod_offline_mnesia.erl @@ -156,10 +156,10 @@ count_messages(LUser, LServer) -> F = fun () -> count_mnesia_records(US) end, - case catch mnesia:async_dirty(F) of - I when is_integer(I) -> I; - _ -> 0 - end. + {cache, case mnesia:async_dirty(F) of + I when is_integer(I) -> I; + _ -> 0 + end}. import(#offline_msg{} = Msg) -> mnesia:dirty_write(Msg). diff --git a/src/mod_offline_opt.erl b/src/mod_offline_opt.erl index bb5eac6d9..e9ab7c71b 100644 --- a/src/mod_offline_opt.erl +++ b/src/mod_offline_opt.erl @@ -10,6 +10,7 @@ -export([db_type/1]). -export([store_empty_body/1]). -export([store_groupchat/1]). +-export([use_cache/1]). -export([use_mam_for_storage/1]). -spec access_max_user_messages(gen_mod:opts() | global | binary()) -> atom() | [ejabberd_shaper:shaper_rule()]. @@ -54,6 +55,12 @@ store_groupchat(Opts) when is_map(Opts) -> store_groupchat(Host) -> gen_mod:get_module_opt(Host, mod_offline, store_groupchat). +-spec use_cache(gen_mod:opts() | global | binary()) -> boolean(). +use_cache(Opts) when is_map(Opts) -> + gen_mod:get_opt(use_cache, Opts); +use_cache(Host) -> + gen_mod:get_module_opt(Host, mod_offline, use_cache). + -spec use_mam_for_storage(gen_mod:opts() | global | binary()) -> boolean(). use_mam_for_storage(Opts) when is_map(Opts) -> gen_mod:get_opt(use_mam_for_storage, Opts); diff --git a/src/mod_offline_riak.erl b/src/mod_offline_riak.erl index db86767ce..2c1ddea7c 100644 --- a/src/mod_offline_riak.erl +++ b/src/mod_offline_riak.erl @@ -124,9 +124,9 @@ count_messages(LUser, LServer) -> case ejabberd_riak:count_by_index( offline_msg, <<"us">>, {LUser, LServer}) of {ok, Res} -> - Res; + {cache, Res}; _ -> - 0 + {nocache, 0} end. import(#offline_msg{us = US, timestamp = TS} = M) -> diff --git a/src/mod_offline_sql.erl b/src/mod_offline_sql.erl index 822ca7f43..640cc071e 100644 --- a/src/mod_offline_sql.erl +++ b/src/mod_offline_sql.erl @@ -185,8 +185,11 @@ count_messages(LUser, LServer) -> ?SQL("select @(count(*))d from spool " "where username=%(LUser)s and %(LServer)H")) of {selected, [{Res}]} -> - Res; - _ -> 0 + {cache, Res}; + {selected, []} -> + {cache, 0}; + _ -> + {nocache, 0} end. export(_Server) ->