From 571a786b9b6e18a861de4767df3ca7fe1147631f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chmielowski?= Date: Tue, 28 May 2019 14:32:09 +0200 Subject: [PATCH] Change implementation of mod_offline use_mam_for_storage Previous version was trying to determine range of messages that should be fetched from mam by storing time when last user resource disconnected. But that had couple edge cases that could cause problems, for example in case of node crash we could not store information about user disconnect and with that we didn't have data to initiate mam query. New version don't track user disconnects, but simply ensure that we have timestamp of first message that is gonna be put in storage, after some measurements cost of that check with caching on top is not that costly, and as much more robust i decided to introduce that change. --- src/mod_offline.erl | 93 ++++++++++++++++++++++----------------------- 1 file changed, 45 insertions(+), 48 deletions(-) diff --git a/src/mod_offline.erl b/src/mod_offline.erl index 63a0d6763..76682d06c 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -61,8 +61,7 @@ c2s_copy_session/2, webadmin_page/3, webadmin_user/4, - webadmin_user_parse_query/5, - user_unset_presence/4]). + webadmin_user_parse_query/5]). -export([mod_opt_type/1, mod_options/1, depends/2]). @@ -83,6 +82,8 @@ %% default value for the maximum number of user messages -define(MAX_USER_MESSAGES, infinity). +-define(EMPTY_SPOOL_CACHE, offline_empty_cache). + -type c2s_state() :: ejabberd_c2s:state(). -callback init(binary(), gen_mod:opts()) -> any(). @@ -110,6 +111,7 @@ depends(_Host, _Opts) -> start(Host, Opts) -> Mod = gen_mod:db_mod(Host, Opts, ?MODULE), Mod:init(Host, Opts), + init_cache(Opts), ejabberd_hooks:add(offline_message_hook, Host, ?MODULE, store_packet, 50), ejabberd_hooks:add(c2s_self_presence, Host, ?MODULE, c2s_self_presence, 50), @@ -132,8 +134,6 @@ start(Host, Opts) -> ?MODULE, webadmin_user, 50), ejabberd_hooks:add(webadmin_user_parse_query, Host, ?MODULE, webadmin_user_parse_query, 50), - ejabberd_hooks:add(unset_presence_hook, Host, ?MODULE, - user_unset_presence, 50), gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE, ?MODULE, handle_offline_query). @@ -156,39 +156,52 @@ stop(Host) -> ?MODULE, webadmin_user, 50), ejabberd_hooks:delete(webadmin_user_parse_query, Host, ?MODULE, webadmin_user_parse_query, 50), - ejabberd_hooks:delete(unset_presence_hook, Host, ?MODULE, - user_unset_presence, 50), gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE). reload(Host, NewOpts, OldOpts) -> NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE), OldMod = gen_mod:db_mod(Host, OldOpts, ?MODULE), + init_cache(NewOpts), if NewMod /= OldMod -> NewMod:init(Host, NewOpts); true -> ok end. +init_cache(Opts) -> + case gen_mod:get_opt(use_mam_for_storage, Opts) of + true -> + MaxSize = gen_mod:get_opt(cache_size, Opts), + LifeTime = case gen_mod:get_opt(cache_life_time, Opts) of + infinity -> infinity; + I -> timer:seconds(I) + end, + COpts = [{max_size, MaxSize}, {cache_missed, false}, {life_time, LifeTime}], + ets_cache:new(?EMPTY_SPOOL_CACHE, COpts); + false -> + ets_cache:delete(?EMPTY_SPOOL_CACHE) + end. + -spec store_offline_msg(#offline_msg{}) -> ok | {error, full | any()}. store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) -> - {UseMam, ActivityMarker} = case use_mam_for_user(User, Server) of - true -> - {true, xmpp:get_meta(Pkt, activity_marker, false)}; - _ -> - {false, false} - end, - case UseMam andalso (not ActivityMarker) andalso - xmpp:get_meta(Pkt, mam_archived, false) of + UseMam = use_mam_for_user(User, Server), + case UseMam andalso xmpp:get_meta(Pkt, mam_archived, false) of true -> - case xmpp:get_meta(Pkt, first_from_queue, false) of - true -> - store_last_activity_marker(User, Server, xmpp:get_meta(Pkt, stanza_id)); - _ -> - ok - end; - false when ActivityMarker -> Mod = gen_mod:db_mod(Server, ?MODULE), - Mod:store_message(Msg); + ets_cache:lookup(?EMPTY_SPOOL_CACHE, {User, Server}, + fun() -> + case count_messages_in_db(User, Server) of + 0 -> + case Mod:store_message(Msg) of + ok -> + {cache, ok}; + Err -> + {nocache, Err} + end; + _ -> + {cache, ok} + end + end); false -> Mod = gen_mod:db_mod(Server, ?MODULE), case get_max_user_messages(User, Server) of @@ -554,6 +567,8 @@ route_offline_messages(#{jid := #jid{luser = LUser, lserver = LServer}} = State) {ok, OffMsgs} -> case use_mam_for_user(LUser, LServer) of true -> + ets_cache:delete(?EMPTY_SPOOL_CACHE, {LUser, LServer}, + ejabberd_cluster:get_nodes()), lists:map( fun({_, #message{from = From, to = To} = Msg}) -> #offline_msg{from = From, to = To, @@ -627,31 +642,6 @@ remove_user(User, Server) -> Mod:remove_user(LUser, LServer), ok. --spec user_unset_presence(binary(), binary(), binary(), binary()) -> any(). -user_unset_presence(User, Server, _Resource, _Status) -> - case use_mam_for_user(User, Server) of - true -> - case ejabberd_sm:get_user_present_resources(User, Server) of - [] -> - TimeStamp = erlang:system_time(microsecond), - store_last_activity_marker(User, Server, TimeStamp); - _ -> - ok - end; - _ -> - ok - end. - -store_last_activity_marker(User, Server, Timestamp) -> - Jid = jid:make(User, Server, <<>>), - Pkt = xmpp:put_meta(#message{id = <<"ActivityMarker">>, type = error, from = Jid, to = Jid}, - activity_marker, true), - - Msg = #offline_msg{us = {User, Server}, from = Jid, to = Jid, - timestamp = misc:usec_to_now(Timestamp), - packet = Pkt}, - store_offline_msg(Msg). - %% Helper functions: -spec check_if_message_should_be_bounced(message()) -> boolean(). @@ -1123,12 +1113,19 @@ mod_opt_type(use_mam_for_storage) -> mod_opt_type(store_empty_body) -> fun (V) when is_boolean(V) -> V; (unless_chat_state) -> unless_chat_state + end; +mod_opt_type(O) when O == cache_life_time; O == cache_size -> + fun (I) when is_integer(I), I > 0 -> I; + (infinity) -> infinity end. + mod_options(Host) -> [{db_type, ejabberd_config:default_db(Host, ?MODULE)}, {access_max_user_messages, max_user_offline_messages}, {store_empty_body, unless_chat_state}, {use_mam_for_storage, false}, {bounce_groupchat, false}, - {store_groupchat, false}]. + {store_groupchat, false}, + {cache_size, ejabberd_config:cache_size(Host)}, + {cache_life_time, ejabberd_config:cache_life_time(Host)}].