From d88e4d495ffc2ae950f77e440aa7c5d06c864309 Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Sun, 21 May 2017 23:21:13 +0300 Subject: [PATCH] Don't store messages via a single process --- src/mod_offline.erl | 195 +++++++++++++------------------------ src/mod_offline_mnesia.erl | 27 +---- src/mod_offline_riak.erl | 32 ++---- src/mod_offline_sql.erl | 41 +++----- src/prosody2ejabberd.erl | 19 ++-- src/sql_queries.erl | 11 +-- 6 files changed, 106 insertions(+), 219 deletions(-) diff --git a/src/mod_offline.erl b/src/mod_offline.erl index c06bb8976..2c2c6185a 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -33,14 +33,13 @@ -protocol({xep, 160, '1.0'}). -protocol({xep, 334, '0.2'}). --behaviour(gen_server). -behaviour(gen_mod). -export([start/2, stop/1, reload/3, store_packet/1, - store_offline_msg/5, + store_offline_msg/1, c2s_self_presence/1, get_sm_features/5, get_sm_identity/5, @@ -64,9 +63,7 @@ webadmin_user/4, webadmin_user_parse_query/5]). --export([init/1, handle_call/3, handle_cast/2, - handle_info/2, terminate/2, code_change/3, - mod_opt_type/1, depends/2]). +-export([mod_opt_type/1, depends/2]). -deprecated({get_queue_length,2}). @@ -86,14 +83,11 @@ %% default value for the maximum number of user messages -define(MAX_USER_MESSAGES, infinity). --type us() :: {binary(), binary()}. -type c2s_state() :: ejabberd_c2s:state(). -callback init(binary(), gen_mod:opts()) -> any(). -callback import(#offline_msg{}) -> ok. --callback store_messages(binary(), us(), [#offline_msg{}], - non_neg_integer(), non_neg_integer()) -> - {atomic, any()}. +-callback store_message(#offline_msg{}) -> ok | {error, any()}. -callback pop_messages(binary(), binary()) -> {ok, [#offline_msg{}]} | {error, any()}. -callback remove_expired_messages(binary()) -> {atomic, any()}. @@ -108,25 +102,10 @@ -callback remove_all_messages(binary(), binary()) -> {atomic, any()}. -callback count_messages(binary(), binary()) -> non_neg_integer(). -start(Host, Opts) -> - gen_mod:start_child(?MODULE, Host, Opts). - -stop(Host) -> - gen_mod:stop_child(?MODULE, Host). - -reload(Host, NewOpts, OldOpts) -> - Proc = gen_mod:get_module_proc(Host, ?MODULE), - gen_server:cast(Proc, {reload, NewOpts, OldOpts}). - depends(_Host, _Opts) -> []. -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -init([Host, Opts]) -> - process_flag(trap_exit, true), +start(Host, Opts) -> Mod = gen_mod:db_mod(Host, Opts, ?MODULE), Mod:init(Host, Opts), IQDisc = gen_mod:get_opt(iqdisc, Opts, gen_iq_handler:iqdisc(Host)), @@ -153,64 +132,9 @@ init([Host, Opts]) -> ejabberd_hooks:add(webadmin_user_parse_query, Host, ?MODULE, webadmin_user_parse_query, 50), gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE, - ?MODULE, handle_offline_query, IQDisc), - AccessMaxOfflineMsgs = - gen_mod:get_opt(access_max_user_messages, Opts, - max_user_offline_messages), - {ok, - #state{host = Host, - access_max_offline_messages = AccessMaxOfflineMsgs}}. + ?MODULE, handle_offline_query, IQDisc). - -handle_call(stop, _From, State) -> - {stop, normal, ok, State}. - -handle_cast({reload, NewOpts, OldOpts}, #state{host = Host} = State) -> - NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE), - OldMod = gen_mod:db_mod(Host, OldOpts, ?MODULE), - if NewMod /= OldMod -> - NewMod:init(Host, NewOpts); - true -> - ok - end, - case gen_mod:is_equal_opt(iqdisc, NewOpts, OldOpts, gen_iq_handler:iqdisc(Host)) of - {false, IQDisc, _} -> - gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE, - ?MODULE, handle_offline_query, IQDisc); - true -> - ok - end, - case gen_mod:is_equal_opt(access_max_user_messages, NewOpts, OldOpts, - max_user_offline_messages) of - {false, AccessMaxOfflineMsgs, _} -> - {noreply, - State#state{access_max_offline_messages = AccessMaxOfflineMsgs}}; - true -> - {noreply, State} - end; -handle_cast(Msg, State) -> - ?WARNING_MSG("unexpected cast: ~p", [Msg]), - {noreply, State}. - - -handle_info(#offline_msg{us = UserServer} = Msg, State) -> - #state{host = Host, - access_max_offline_messages = AccessMaxOfflineMsgs} = State, - DBType = gen_mod:db_type(Host, ?MODULE), - Msgs = receive_all(UserServer, [Msg], DBType), - Len = length(Msgs), - MaxOfflineMsgs = get_max_user_messages(AccessMaxOfflineMsgs, - UserServer, Host), - store_offline_msg(Host, UserServer, Msgs, Len, MaxOfflineMsgs), - {noreply, State}; - -handle_info(_Info, State) -> - ?ERROR_MSG("got unexpected info: ~p", [_Info]), - {noreply, State}. - - -terminate(_Reason, State) -> - Host = State#state.host, +stop(Host) -> ejabberd_hooks:delete(offline_message_hook, Host, ?MODULE, store_packet, 50), ejabberd_hooks:delete(c2s_self_presence, Host, ?MODULE, c2s_self_presence, 50), @@ -229,41 +153,48 @@ terminate(_Reason, State) -> ?MODULE, webadmin_user, 50), ejabberd_hooks:delete(webadmin_user_parse_query, Host, ?MODULE, webadmin_user_parse_query, 50), - gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE), - ok. + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE). - -code_change(_OldVsn, State, _Extra) -> {ok, State}. - -store_offline_msg(Host, US, Msgs, Len, MaxOfflineMsgs) -> - Mod = gen_mod:db_mod(Host, ?MODULE), - case Mod:store_messages(Host, US, Msgs, Len, MaxOfflineMsgs) of - {atomic, discard} -> - discard_warn_sender(Msgs); - _ -> +reload(Host, NewOpts, OldOpts) -> + NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE), + OldMod = gen_mod:db_mod(Host, OldOpts, ?MODULE), + if NewMod /= OldMod -> + NewMod:init(Host, NewOpts); + true -> + ok + end, + case gen_mod:is_equal_opt(iqdisc, NewOpts, OldOpts, gen_iq_handler:iqdisc(Host)) of + {false, IQDisc, _} -> + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE, + ?MODULE, handle_offline_query, IQDisc); + true -> ok end. -get_max_user_messages(AccessRule, {User, Server}, Host) -> - case acl:match_rule( - Host, AccessRule, jid:make(User, Server)) of +-spec store_offline_msg(#offline_msg{}) -> ok | {error, full | any()}. +store_offline_msg(#offline_msg{us = {User, Server}} = Msg) -> + Mod = gen_mod:db_mod(Server, ?MODULE), + case get_max_user_messages(User, Server) of + infinity -> + Mod:store_message(Msg); + Limit -> + Num = count_offline_messages(User, Server), + if Num < Limit -> + Mod:store_message(Msg); + true -> + {error, full} + end + end. + +get_max_user_messages(User, Server) -> + Access = gen_mod:get_module_opt(Server, ?MODULE, access_max_user_messages, + max_user_offline_messages), + case acl:match_rule(Server, Access, jid:make(User, Server)) of Max when is_integer(Max) -> Max; infinity -> infinity; _ -> ?MAX_USER_MESSAGES end. -receive_all(US, Msgs, DBType) -> - receive - #offline_msg{us = US} = Msg -> - receive_all(US, [Msg | Msgs], DBType) - after 0 -> - case DBType of - mnesia -> Msgs; - sql -> lists:reverse(Msgs); - riak -> Msgs - end - end. - get_sm_features(Acc, _From, _To, <<"">>, _Lang) -> Feats = case Acc of {result, I} -> I; @@ -484,14 +415,19 @@ store_packet({_Action, #message{from = From, to = To} = Packet} = Acc) -> NewPacket -> TimeStamp = p1_time_compat:timestamp(), Expire = find_x_expire(TimeStamp, NewPacket), - gen_mod:get_module_proc(To#jid.lserver, ?MODULE) ! - #offline_msg{us = {LUser, LServer}, - timestamp = TimeStamp, - expire = Expire, - from = From, - to = To, - packet = NewPacket}, - {offlined, NewPacket} + OffMsg = #offline_msg{us = {LUser, LServer}, + timestamp = TimeStamp, + expire = Expire, + from = From, + to = To, + packet = NewPacket}, + case store_offline_msg(OffMsg) of + ok -> + {offlined, NewPacket}; + {error, Reason} -> + discard_warn_sender(Packet, Reason), + stop + end end; _ -> Acc end; @@ -635,15 +571,18 @@ remove_user(User, Server) -> %% Helper functions: %% Warn senders that their messages have been discarded: -discard_warn_sender(Msgs) -> - lists:foreach( - fun(#offline_msg{packet = Packet}) -> - ErrText = <<"Your contact offline message queue is " - "full. The message has been discarded.">>, - Lang = xmpp:get_lang(Packet), - Err = xmpp:err_resource_constraint(ErrText, Lang), - ejabberd_router:route_error(Packet, Err) - end, Msgs). +-spec discard_warn_sender(message(), full | any()) -> ok. +discard_warn_sender(Packet, full) -> + ErrText = <<"Your contact offline message queue is " + "full. The message has been discarded.">>, + Lang = xmpp:get_lang(Packet), + Err = xmpp:err_resource_constraint(ErrText, Lang), + ejabberd_router:route_error(Packet, Err); +discard_warn_sender(Packet, _) -> + ErrText = <<"Database failure">>, + Lang = xmpp:get_lang(Packet), + Err = xmpp:err_internal_server_error(ErrText, Lang), + ejabberd_router:route_error(Packet, Err). webadmin_page(_, Host, #request{us = _US, path = [<<"user">>, U, <<"queue">>], @@ -790,11 +729,7 @@ get_queue_length(LUser, LServer) -> count_offline_messages(LUser, LServer). get_messages_subset(User, Host, MsgsAll) -> - Access = gen_mod:get_module_opt(Host, ?MODULE, access_max_user_messages, - max_user_offline_messages), - MaxOfflineMsgs = case get_max_user_messages(Access, - User, Host) - of + MaxOfflineMsgs = case get_max_user_messages(User, Host) of Number when is_integer(Number) -> Number; _ -> 100 end, diff --git a/src/mod_offline_mnesia.erl b/src/mod_offline_mnesia.erl index d0d0de418..a725ab003 100644 --- a/src/mod_offline_mnesia.erl +++ b/src/mod_offline_mnesia.erl @@ -26,7 +26,7 @@ -behaviour(mod_offline). --export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1, +-export([init/2, store_message/1, pop_messages/2, remove_expired_messages/1, remove_old_messages/2, remove_user/2, read_message_headers/2, read_message/3, remove_message/3, read_all_messages/2, remove_all_messages/2, count_messages/2, import/1]). @@ -36,8 +36,6 @@ -include("mod_offline.hrl"). -include("logger.hrl"). --define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000). - %%%=================================================================== %%% API %%%=================================================================== @@ -46,26 +44,9 @@ init(_Host, _Opts) -> [{disc_only_copies, [node()]}, {type, bag}, {attributes, record_info(fields, offline_msg)}]). -store_messages(_Host, US, Msgs, Len, MaxOfflineMsgs) -> - F = fun () -> - Count = if MaxOfflineMsgs =/= infinity -> - Len + count_mnesia_records(US); - true -> 0 - end, - if Count > MaxOfflineMsgs -> discard; - true -> - if Len >= (?OFFLINE_TABLE_LOCK_THRESHOLD) -> - mnesia:write_lock_table(offline_msg); - true -> ok - end, - lists:foreach( - fun(#offline_msg{packet = Pkt} = M) -> - El = xmpp:encode(Pkt), - mnesia:write(M#offline_msg{packet = El}) - end, Msgs) - end - end, - mnesia:transaction(F). +store_message(#offline_msg{packet = Pkt} = OffMsg) -> + El = xmpp:encode(Pkt), + mnesia:dirty_write(OffMsg#offline_msg{packet = El}). pop_messages(LUser, LServer) -> US = {LUser, LServer}, diff --git a/src/mod_offline_riak.erl b/src/mod_offline_riak.erl index ffc1450aa..5d0fd1af8 100644 --- a/src/mod_offline_riak.erl +++ b/src/mod_offline_riak.erl @@ -26,7 +26,7 @@ -behaviour(mod_offline). --export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1, +-export([init/2, store_message/1, pop_messages/2, remove_expired_messages/1, remove_old_messages/2, remove_user/2, read_message_headers/2, read_message/3, remove_message/3, read_all_messages/2, remove_all_messages/2, count_messages/2, import/1]). @@ -40,31 +40,11 @@ init(_Host, _Opts) -> ok. -store_messages(Host, {User, _}, Msgs, Len, MaxOfflineMsgs) -> - Count = if MaxOfflineMsgs =/= infinity -> - Len + count_messages(User, Host); - true -> 0 - end, - if - Count > MaxOfflineMsgs -> - {atomic, discard}; - true -> - try - lists:foreach( - fun(#offline_msg{us = US, - packet = Pkt, - timestamp = TS} = M) -> - El = xmpp:encode(Pkt), - ok = ejabberd_riak:put( - M#offline_msg{packet = El}, - offline_msg_schema(), - [{i, TS}, {'2i', [{<<"us">>, US}]}]) - end, Msgs), - {atomic, ok} - catch _:{badmatch, Err} -> - {atomic, Err} - end - end. +store_message(#offline_msg{us = US, packet = Pkt, timestamp = TS} = M) -> + El = xmpp:encode(Pkt), + ejabberd_riak:put(M#offline_msg{packet = El}, + offline_msg_schema(), + [{i, TS}, {'2i', [{<<"us">>, US}]}]). pop_messages(LUser, LServer) -> case ejabberd_riak:get_by_index(offline_msg, offline_msg_schema(), diff --git a/src/mod_offline_sql.erl b/src/mod_offline_sql.erl index a8c587679..48b32be81 100644 --- a/src/mod_offline_sql.erl +++ b/src/mod_offline_sql.erl @@ -28,7 +28,7 @@ -behaviour(mod_offline). --export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1, +-export([init/2, store_message/1, pop_messages/2, remove_expired_messages/1, remove_old_messages/2, remove_user/2, read_message_headers/2, read_message/3, remove_message/3, read_all_messages/2, remove_all_messages/2, count_messages/2, import/1, export/1]). @@ -44,30 +44,21 @@ init(_Host, _Opts) -> ok. -store_messages(Host, {User, _Server}, Msgs, Len, MaxOfflineMsgs) -> - Count = if MaxOfflineMsgs =/= infinity -> - Len + count_messages(User, Host); - true -> 0 - end, - if Count > MaxOfflineMsgs -> {atomic, discard}; - true -> - Query = lists:map( - fun(M) -> - LUser = (M#offline_msg.to)#jid.luser, - From = M#offline_msg.from, - To = M#offline_msg.to, - Packet = xmpp:set_from_to( - M#offline_msg.packet, From, To), - NewPacket = xmpp_util:add_delay_info( - Packet, jid:make(Host), - M#offline_msg.timestamp, - <<"Offline Storage">>), - XML = fxml:element_to_binary( - xmpp:encode(NewPacket)), - sql_queries:add_spool_sql(LUser, XML) - end, - Msgs), - sql_queries:add_spool(Host, Query) +store_message(#offline_msg{us = {LUser, LServer}} = M) -> + From = M#offline_msg.from, + To = M#offline_msg.to, + Packet = xmpp:set_from_to(M#offline_msg.packet, From, To), + NewPacket = xmpp_util:add_delay_info( + Packet, jid:make(LServer), + M#offline_msg.timestamp, + <<"Offline Storage">>), + XML = fxml:element_to_binary( + xmpp:encode(NewPacket)), + case sql_queries:add_spool(LUser, LServer, XML) of + {updated, _} -> + ok; + _ -> + {error, db_failure} end. pop_messages(LUser, LServer) -> diff --git a/src/prosody2ejabberd.erl b/src/prosody2ejabberd.erl index 072da0908..312a177be 100644 --- a/src/prosody2ejabberd.erl +++ b/src/prosody2ejabberd.erl @@ -185,15 +185,16 @@ convert_data(_Host, "config", _User, [Data]) -> convert_data(Host, "offline", User, [Data]) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Host), - Msgs = lists:flatmap( - fun({_, RawXML}) -> - case deserialize(RawXML) of - [El] -> el_to_offline_msg(LUser, LServer, El); - _ -> [] - end - end, Data), - mod_offline:store_offline_msg( - LServer, {LUser, LServer}, Msgs, length(Msgs), infinity); + lists:foreach( + fun({_, RawXML}) -> + case deserialize(RawXML) of + [El] -> + Msg = el_to_offline_msg(LUser, LServer, El), + ok = mod_offline:store_offline_msg(Msg); + _ -> + ok + end + end, Data); convert_data(Host, "privacy", User, [Data]) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Host), diff --git a/src/sql_queries.erl b/src/sql_queries.erl index 2f2e55863..0cf595bdf 100644 --- a/src/sql_queries.erl +++ b/src/sql_queries.erl @@ -37,7 +37,7 @@ set_password_scram_t/6, add_user/3, add_user_scram/6, del_user/2, del_user_return_password/3, list_users/1, list_users/2, users_number/1, users_number/2, - add_spool_sql/2, add_spool/2, get_and_del_spool_msg_t/2, + add_spool/3, get_and_del_spool_msg_t/2, del_spool_msg/2, get_roster/2, get_roster_jid_groups/2, get_roster_groups/3, del_user_roster_t/2, get_roster_by_jid/3, get_rostergroup_by_jid/3, @@ -273,11 +273,10 @@ users_number(LServer, [{prefix, Prefix}]) users_number(LServer, []) -> users_number(LServer). -add_spool_sql(LUser, XML) -> - ?SQL("insert into spool(username, xml) values (%(LUser)s, %(XML)s)"). - -add_spool(LServer, Queries) -> - ejabberd_sql:sql_transaction(LServer, Queries). +add_spool(LUser, LServer, XML) -> + ejabberd_sql:sql_query( + LServer, + ?SQL("insert into spool(username, xml) values (%(LUser)s, %(XML)s)")). get_and_del_spool_msg_t(LServer, LUser) -> F = fun () ->