Change implementation of mod_offline use_mam_for_storage

Previous version was trying to determine range of messages that should
be fetched from mam by storing time when last user resource disconnected.

But that had couple edge cases that could cause problems, for example in
case of node crash we could not store information about user disconnect
and with that we didn't have data to initiate mam query.

New version don't track user disconnects, but simply ensure that we have
timestamp of first message that is gonna be put in storage, after some
measurements cost of that check with caching on top is not that costly,
and as much more robust i decided to introduce that change.
This commit is contained in:
Paweł Chmielowski 2019-05-28 14:32:09 +02:00
parent 4eaba13189
commit 571a786b9b
1 changed files with 45 additions and 48 deletions

View File

@ -61,8 +61,7 @@
c2s_copy_session/2,
webadmin_page/3,
webadmin_user/4,
webadmin_user_parse_query/5,
user_unset_presence/4]).
webadmin_user_parse_query/5]).
-export([mod_opt_type/1, mod_options/1, depends/2]).
@ -83,6 +82,8 @@
%% default value for the maximum number of user messages
-define(MAX_USER_MESSAGES, infinity).
-define(EMPTY_SPOOL_CACHE, offline_empty_cache).
-type c2s_state() :: ejabberd_c2s:state().
-callback init(binary(), gen_mod:opts()) -> any().
@ -110,6 +111,7 @@ depends(_Host, _Opts) ->
start(Host, Opts) ->
Mod = gen_mod:db_mod(Host, Opts, ?MODULE),
Mod:init(Host, Opts),
init_cache(Opts),
ejabberd_hooks:add(offline_message_hook, Host, ?MODULE,
store_packet, 50),
ejabberd_hooks:add(c2s_self_presence, Host, ?MODULE, c2s_self_presence, 50),
@ -132,8 +134,6 @@ start(Host, Opts) ->
?MODULE, webadmin_user, 50),
ejabberd_hooks:add(webadmin_user_parse_query, Host,
?MODULE, webadmin_user_parse_query, 50),
ejabberd_hooks:add(unset_presence_hook, Host, ?MODULE,
user_unset_presence, 50),
gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE,
?MODULE, handle_offline_query).
@ -156,39 +156,52 @@ stop(Host) ->
?MODULE, webadmin_user, 50),
ejabberd_hooks:delete(webadmin_user_parse_query, Host,
?MODULE, webadmin_user_parse_query, 50),
ejabberd_hooks:delete(unset_presence_hook, Host, ?MODULE,
user_unset_presence, 50),
gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE).
reload(Host, NewOpts, OldOpts) ->
NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE),
OldMod = gen_mod:db_mod(Host, OldOpts, ?MODULE),
init_cache(NewOpts),
if NewMod /= OldMod ->
NewMod:init(Host, NewOpts);
true ->
ok
end.
init_cache(Opts) ->
case gen_mod:get_opt(use_mam_for_storage, Opts) of
true ->
MaxSize = gen_mod:get_opt(cache_size, Opts),
LifeTime = case gen_mod:get_opt(cache_life_time, Opts) of
infinity -> infinity;
I -> timer:seconds(I)
end,
COpts = [{max_size, MaxSize}, {cache_missed, false}, {life_time, LifeTime}],
ets_cache:new(?EMPTY_SPOOL_CACHE, COpts);
false ->
ets_cache:delete(?EMPTY_SPOOL_CACHE)
end.
-spec store_offline_msg(#offline_msg{}) -> ok | {error, full | any()}.
store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) ->
{UseMam, ActivityMarker} = case use_mam_for_user(User, Server) of
true ->
{true, xmpp:get_meta(Pkt, activity_marker, false)};
_ ->
{false, false}
end,
case UseMam andalso (not ActivityMarker) andalso
xmpp:get_meta(Pkt, mam_archived, false) of
UseMam = use_mam_for_user(User, Server),
case UseMam andalso xmpp:get_meta(Pkt, mam_archived, false) of
true ->
case xmpp:get_meta(Pkt, first_from_queue, false) of
true ->
store_last_activity_marker(User, Server, xmpp:get_meta(Pkt, stanza_id));
_ ->
ok
end;
false when ActivityMarker ->
Mod = gen_mod:db_mod(Server, ?MODULE),
Mod:store_message(Msg);
ets_cache:lookup(?EMPTY_SPOOL_CACHE, {User, Server},
fun() ->
case count_messages_in_db(User, Server) of
0 ->
case Mod:store_message(Msg) of
ok ->
{cache, ok};
Err ->
{nocache, Err}
end;
_ ->
{cache, ok}
end
end);
false ->
Mod = gen_mod:db_mod(Server, ?MODULE),
case get_max_user_messages(User, Server) of
@ -554,6 +567,8 @@ route_offline_messages(#{jid := #jid{luser = LUser, lserver = LServer}} = State)
{ok, OffMsgs} ->
case use_mam_for_user(LUser, LServer) of
true ->
ets_cache:delete(?EMPTY_SPOOL_CACHE, {LUser, LServer},
ejabberd_cluster:get_nodes()),
lists:map(
fun({_, #message{from = From, to = To} = Msg}) ->
#offline_msg{from = From, to = To,
@ -627,31 +642,6 @@ remove_user(User, Server) ->
Mod:remove_user(LUser, LServer),
ok.
-spec user_unset_presence(binary(), binary(), binary(), binary()) -> any().
user_unset_presence(User, Server, _Resource, _Status) ->
case use_mam_for_user(User, Server) of
true ->
case ejabberd_sm:get_user_present_resources(User, Server) of
[] ->
TimeStamp = erlang:system_time(microsecond),
store_last_activity_marker(User, Server, TimeStamp);
_ ->
ok
end;
_ ->
ok
end.
store_last_activity_marker(User, Server, Timestamp) ->
Jid = jid:make(User, Server, <<>>),
Pkt = xmpp:put_meta(#message{id = <<"ActivityMarker">>, type = error, from = Jid, to = Jid},
activity_marker, true),
Msg = #offline_msg{us = {User, Server}, from = Jid, to = Jid,
timestamp = misc:usec_to_now(Timestamp),
packet = Pkt},
store_offline_msg(Msg).
%% Helper functions:
-spec check_if_message_should_be_bounced(message()) -> boolean().
@ -1123,12 +1113,19 @@ mod_opt_type(use_mam_for_storage) ->
mod_opt_type(store_empty_body) ->
fun (V) when is_boolean(V) -> V;
(unless_chat_state) -> unless_chat_state
end;
mod_opt_type(O) when O == cache_life_time; O == cache_size ->
fun (I) when is_integer(I), I > 0 -> I;
(infinity) -> infinity
end.
mod_options(Host) ->
[{db_type, ejabberd_config:default_db(Host, ?MODULE)},
{access_max_user_messages, max_user_offline_messages},
{store_empty_body, unless_chat_state},
{use_mam_for_storage, false},
{bounce_groupchat, false},
{store_groupchat, false}].
{store_groupchat, false},
{cache_size, ejabberd_config:cache_size(Host)},
{cache_life_time, ejabberd_config:cache_life_time(Host)}].