24
1
mirror of https://github.com/processone/ejabberd.git synced 2024-06-08 21:43:07 +02:00

Don't store messages via a single process

This commit is contained in:
Evgeniy Khramtsov 2017-05-21 23:21:13 +03:00
parent 66a4e405e0
commit d88e4d495f
6 changed files with 106 additions and 219 deletions

View File

@ -33,14 +33,13 @@
-protocol({xep, 160, '1.0'}). -protocol({xep, 160, '1.0'}).
-protocol({xep, 334, '0.2'}). -protocol({xep, 334, '0.2'}).
-behaviour(gen_server).
-behaviour(gen_mod). -behaviour(gen_mod).
-export([start/2, -export([start/2,
stop/1, stop/1,
reload/3, reload/3,
store_packet/1, store_packet/1,
store_offline_msg/5, store_offline_msg/1,
c2s_self_presence/1, c2s_self_presence/1,
get_sm_features/5, get_sm_features/5,
get_sm_identity/5, get_sm_identity/5,
@ -64,9 +63,7 @@
webadmin_user/4, webadmin_user/4,
webadmin_user_parse_query/5]). webadmin_user_parse_query/5]).
-export([init/1, handle_call/3, handle_cast/2, -export([mod_opt_type/1, depends/2]).
handle_info/2, terminate/2, code_change/3,
mod_opt_type/1, depends/2]).
-deprecated({get_queue_length,2}). -deprecated({get_queue_length,2}).
@ -86,14 +83,11 @@
%% default value for the maximum number of user messages %% default value for the maximum number of user messages
-define(MAX_USER_MESSAGES, infinity). -define(MAX_USER_MESSAGES, infinity).
-type us() :: {binary(), binary()}.
-type c2s_state() :: ejabberd_c2s:state(). -type c2s_state() :: ejabberd_c2s:state().
-callback init(binary(), gen_mod:opts()) -> any(). -callback init(binary(), gen_mod:opts()) -> any().
-callback import(#offline_msg{}) -> ok. -callback import(#offline_msg{}) -> ok.
-callback store_messages(binary(), us(), [#offline_msg{}], -callback store_message(#offline_msg{}) -> ok | {error, any()}.
non_neg_integer(), non_neg_integer()) ->
{atomic, any()}.
-callback pop_messages(binary(), binary()) -> -callback pop_messages(binary(), binary()) ->
{ok, [#offline_msg{}]} | {error, any()}. {ok, [#offline_msg{}]} | {error, any()}.
-callback remove_expired_messages(binary()) -> {atomic, any()}. -callback remove_expired_messages(binary()) -> {atomic, any()}.
@ -108,25 +102,10 @@
-callback remove_all_messages(binary(), binary()) -> {atomic, any()}. -callback remove_all_messages(binary(), binary()) -> {atomic, any()}.
-callback count_messages(binary(), binary()) -> non_neg_integer(). -callback count_messages(binary(), binary()) -> non_neg_integer().
start(Host, Opts) ->
gen_mod:start_child(?MODULE, Host, Opts).
stop(Host) ->
gen_mod:stop_child(?MODULE, Host).
reload(Host, NewOpts, OldOpts) ->
Proc = gen_mod:get_module_proc(Host, ?MODULE),
gen_server:cast(Proc, {reload, NewOpts, OldOpts}).
depends(_Host, _Opts) -> depends(_Host, _Opts) ->
[]. [].
%%==================================================================== start(Host, Opts) ->
%% gen_server callbacks
%%====================================================================
init([Host, Opts]) ->
process_flag(trap_exit, true),
Mod = gen_mod:db_mod(Host, Opts, ?MODULE), Mod = gen_mod:db_mod(Host, Opts, ?MODULE),
Mod:init(Host, Opts), Mod:init(Host, Opts),
IQDisc = gen_mod:get_opt(iqdisc, Opts, gen_iq_handler:iqdisc(Host)), IQDisc = gen_mod:get_opt(iqdisc, Opts, gen_iq_handler:iqdisc(Host)),
@ -153,64 +132,9 @@ init([Host, Opts]) ->
ejabberd_hooks:add(webadmin_user_parse_query, Host, ejabberd_hooks:add(webadmin_user_parse_query, Host,
?MODULE, webadmin_user_parse_query, 50), ?MODULE, webadmin_user_parse_query, 50),
gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE, gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE,
?MODULE, handle_offline_query, IQDisc), ?MODULE, handle_offline_query, IQDisc).
AccessMaxOfflineMsgs =
gen_mod:get_opt(access_max_user_messages, Opts,
max_user_offline_messages),
{ok,
#state{host = Host,
access_max_offline_messages = AccessMaxOfflineMsgs}}.
stop(Host) ->
handle_call(stop, _From, State) ->
{stop, normal, ok, State}.
handle_cast({reload, NewOpts, OldOpts}, #state{host = Host} = State) ->
NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE),
OldMod = gen_mod:db_mod(Host, OldOpts, ?MODULE),
if NewMod /= OldMod ->
NewMod:init(Host, NewOpts);
true ->
ok
end,
case gen_mod:is_equal_opt(iqdisc, NewOpts, OldOpts, gen_iq_handler:iqdisc(Host)) of
{false, IQDisc, _} ->
gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE,
?MODULE, handle_offline_query, IQDisc);
true ->
ok
end,
case gen_mod:is_equal_opt(access_max_user_messages, NewOpts, OldOpts,
max_user_offline_messages) of
{false, AccessMaxOfflineMsgs, _} ->
{noreply,
State#state{access_max_offline_messages = AccessMaxOfflineMsgs}};
true ->
{noreply, State}
end;
handle_cast(Msg, State) ->
?WARNING_MSG("unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(#offline_msg{us = UserServer} = Msg, State) ->
#state{host = Host,
access_max_offline_messages = AccessMaxOfflineMsgs} = State,
DBType = gen_mod:db_type(Host, ?MODULE),
Msgs = receive_all(UserServer, [Msg], DBType),
Len = length(Msgs),
MaxOfflineMsgs = get_max_user_messages(AccessMaxOfflineMsgs,
UserServer, Host),
store_offline_msg(Host, UserServer, Msgs, Len, MaxOfflineMsgs),
{noreply, State};
handle_info(_Info, State) ->
?ERROR_MSG("got unexpected info: ~p", [_Info]),
{noreply, State}.
terminate(_Reason, State) ->
Host = State#state.host,
ejabberd_hooks:delete(offline_message_hook, Host, ejabberd_hooks:delete(offline_message_hook, Host,
?MODULE, store_packet, 50), ?MODULE, store_packet, 50),
ejabberd_hooks:delete(c2s_self_presence, Host, ?MODULE, c2s_self_presence, 50), ejabberd_hooks:delete(c2s_self_presence, Host, ?MODULE, c2s_self_presence, 50),
@ -229,41 +153,48 @@ terminate(_Reason, State) ->
?MODULE, webadmin_user, 50), ?MODULE, webadmin_user, 50),
ejabberd_hooks:delete(webadmin_user_parse_query, Host, ejabberd_hooks:delete(webadmin_user_parse_query, Host,
?MODULE, webadmin_user_parse_query, 50), ?MODULE, webadmin_user_parse_query, 50),
gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE), gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE).
ok.
reload(Host, NewOpts, OldOpts) ->
code_change(_OldVsn, State, _Extra) -> {ok, State}. NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE),
OldMod = gen_mod:db_mod(Host, OldOpts, ?MODULE),
store_offline_msg(Host, US, Msgs, Len, MaxOfflineMsgs) -> if NewMod /= OldMod ->
Mod = gen_mod:db_mod(Host, ?MODULE), NewMod:init(Host, NewOpts);
case Mod:store_messages(Host, US, Msgs, Len, MaxOfflineMsgs) of true ->
{atomic, discard} -> ok
discard_warn_sender(Msgs); end,
_ -> case gen_mod:is_equal_opt(iqdisc, NewOpts, OldOpts, gen_iq_handler:iqdisc(Host)) of
{false, IQDisc, _} ->
gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE,
?MODULE, handle_offline_query, IQDisc);
true ->
ok ok
end. end.
get_max_user_messages(AccessRule, {User, Server}, Host) -> -spec store_offline_msg(#offline_msg{}) -> ok | {error, full | any()}.
case acl:match_rule( store_offline_msg(#offline_msg{us = {User, Server}} = Msg) ->
Host, AccessRule, jid:make(User, Server)) of Mod = gen_mod:db_mod(Server, ?MODULE),
case get_max_user_messages(User, Server) of
infinity ->
Mod:store_message(Msg);
Limit ->
Num = count_offline_messages(User, Server),
if Num < Limit ->
Mod:store_message(Msg);
true ->
{error, full}
end
end.
get_max_user_messages(User, Server) ->
Access = gen_mod:get_module_opt(Server, ?MODULE, access_max_user_messages,
max_user_offline_messages),
case acl:match_rule(Server, Access, jid:make(User, Server)) of
Max when is_integer(Max) -> Max; Max when is_integer(Max) -> Max;
infinity -> infinity; infinity -> infinity;
_ -> ?MAX_USER_MESSAGES _ -> ?MAX_USER_MESSAGES
end. end.
receive_all(US, Msgs, DBType) ->
receive
#offline_msg{us = US} = Msg ->
receive_all(US, [Msg | Msgs], DBType)
after 0 ->
case DBType of
mnesia -> Msgs;
sql -> lists:reverse(Msgs);
riak -> Msgs
end
end.
get_sm_features(Acc, _From, _To, <<"">>, _Lang) -> get_sm_features(Acc, _From, _To, <<"">>, _Lang) ->
Feats = case Acc of Feats = case Acc of
{result, I} -> I; {result, I} -> I;
@ -484,14 +415,19 @@ store_packet({_Action, #message{from = From, to = To} = Packet} = Acc) ->
NewPacket -> NewPacket ->
TimeStamp = p1_time_compat:timestamp(), TimeStamp = p1_time_compat:timestamp(),
Expire = find_x_expire(TimeStamp, NewPacket), Expire = find_x_expire(TimeStamp, NewPacket),
gen_mod:get_module_proc(To#jid.lserver, ?MODULE) ! OffMsg = #offline_msg{us = {LUser, LServer},
#offline_msg{us = {LUser, LServer},
timestamp = TimeStamp, timestamp = TimeStamp,
expire = Expire, expire = Expire,
from = From, from = From,
to = To, to = To,
packet = NewPacket}, packet = NewPacket},
{offlined, NewPacket} case store_offline_msg(OffMsg) of
ok ->
{offlined, NewPacket};
{error, Reason} ->
discard_warn_sender(Packet, Reason),
stop
end
end; end;
_ -> Acc _ -> Acc
end; end;
@ -635,15 +571,18 @@ remove_user(User, Server) ->
%% Helper functions: %% Helper functions:
%% Warn senders that their messages have been discarded: %% Warn senders that their messages have been discarded:
discard_warn_sender(Msgs) -> -spec discard_warn_sender(message(), full | any()) -> ok.
lists:foreach( discard_warn_sender(Packet, full) ->
fun(#offline_msg{packet = Packet}) ->
ErrText = <<"Your contact offline message queue is " ErrText = <<"Your contact offline message queue is "
"full. The message has been discarded.">>, "full. The message has been discarded.">>,
Lang = xmpp:get_lang(Packet), Lang = xmpp:get_lang(Packet),
Err = xmpp:err_resource_constraint(ErrText, Lang), Err = xmpp:err_resource_constraint(ErrText, Lang),
ejabberd_router:route_error(Packet, Err) ejabberd_router:route_error(Packet, Err);
end, Msgs). discard_warn_sender(Packet, _) ->
ErrText = <<"Database failure">>,
Lang = xmpp:get_lang(Packet),
Err = xmpp:err_internal_server_error(ErrText, Lang),
ejabberd_router:route_error(Packet, Err).
webadmin_page(_, Host, webadmin_page(_, Host,
#request{us = _US, path = [<<"user">>, U, <<"queue">>], #request{us = _US, path = [<<"user">>, U, <<"queue">>],
@ -790,11 +729,7 @@ get_queue_length(LUser, LServer) ->
count_offline_messages(LUser, LServer). count_offline_messages(LUser, LServer).
get_messages_subset(User, Host, MsgsAll) -> get_messages_subset(User, Host, MsgsAll) ->
Access = gen_mod:get_module_opt(Host, ?MODULE, access_max_user_messages, MaxOfflineMsgs = case get_max_user_messages(User, Host) of
max_user_offline_messages),
MaxOfflineMsgs = case get_max_user_messages(Access,
User, Host)
of
Number when is_integer(Number) -> Number; Number when is_integer(Number) -> Number;
_ -> 100 _ -> 100
end, end,

View File

@ -26,7 +26,7 @@
-behaviour(mod_offline). -behaviour(mod_offline).
-export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1, -export([init/2, store_message/1, pop_messages/2, remove_expired_messages/1,
remove_old_messages/2, remove_user/2, read_message_headers/2, remove_old_messages/2, remove_user/2, read_message_headers/2,
read_message/3, remove_message/3, read_all_messages/2, read_message/3, remove_message/3, read_all_messages/2,
remove_all_messages/2, count_messages/2, import/1]). remove_all_messages/2, count_messages/2, import/1]).
@ -36,8 +36,6 @@
-include("mod_offline.hrl"). -include("mod_offline.hrl").
-include("logger.hrl"). -include("logger.hrl").
-define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000).
%%%=================================================================== %%%===================================================================
%%% API %%% API
%%%=================================================================== %%%===================================================================
@ -46,26 +44,9 @@ init(_Host, _Opts) ->
[{disc_only_copies, [node()]}, {type, bag}, [{disc_only_copies, [node()]}, {type, bag},
{attributes, record_info(fields, offline_msg)}]). {attributes, record_info(fields, offline_msg)}]).
store_messages(_Host, US, Msgs, Len, MaxOfflineMsgs) -> store_message(#offline_msg{packet = Pkt} = OffMsg) ->
F = fun () ->
Count = if MaxOfflineMsgs =/= infinity ->
Len + count_mnesia_records(US);
true -> 0
end,
if Count > MaxOfflineMsgs -> discard;
true ->
if Len >= (?OFFLINE_TABLE_LOCK_THRESHOLD) ->
mnesia:write_lock_table(offline_msg);
true -> ok
end,
lists:foreach(
fun(#offline_msg{packet = Pkt} = M) ->
El = xmpp:encode(Pkt), El = xmpp:encode(Pkt),
mnesia:write(M#offline_msg{packet = El}) mnesia:dirty_write(OffMsg#offline_msg{packet = El}).
end, Msgs)
end
end,
mnesia:transaction(F).
pop_messages(LUser, LServer) -> pop_messages(LUser, LServer) ->
US = {LUser, LServer}, US = {LUser, LServer},

View File

@ -26,7 +26,7 @@
-behaviour(mod_offline). -behaviour(mod_offline).
-export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1, -export([init/2, store_message/1, pop_messages/2, remove_expired_messages/1,
remove_old_messages/2, remove_user/2, read_message_headers/2, remove_old_messages/2, remove_user/2, read_message_headers/2,
read_message/3, remove_message/3, read_all_messages/2, read_message/3, remove_message/3, read_all_messages/2,
remove_all_messages/2, count_messages/2, import/1]). remove_all_messages/2, count_messages/2, import/1]).
@ -40,31 +40,11 @@
init(_Host, _Opts) -> init(_Host, _Opts) ->
ok. ok.
store_messages(Host, {User, _}, Msgs, Len, MaxOfflineMsgs) -> store_message(#offline_msg{us = US, packet = Pkt, timestamp = TS} = M) ->
Count = if MaxOfflineMsgs =/= infinity ->
Len + count_messages(User, Host);
true -> 0
end,
if
Count > MaxOfflineMsgs ->
{atomic, discard};
true ->
try
lists:foreach(
fun(#offline_msg{us = US,
packet = Pkt,
timestamp = TS} = M) ->
El = xmpp:encode(Pkt), El = xmpp:encode(Pkt),
ok = ejabberd_riak:put( ejabberd_riak:put(M#offline_msg{packet = El},
M#offline_msg{packet = El},
offline_msg_schema(), offline_msg_schema(),
[{i, TS}, {'2i', [{<<"us">>, US}]}]) [{i, TS}, {'2i', [{<<"us">>, US}]}]).
end, Msgs),
{atomic, ok}
catch _:{badmatch, Err} ->
{atomic, Err}
end
end.
pop_messages(LUser, LServer) -> pop_messages(LUser, LServer) ->
case ejabberd_riak:get_by_index(offline_msg, offline_msg_schema(), case ejabberd_riak:get_by_index(offline_msg, offline_msg_schema(),

View File

@ -28,7 +28,7 @@
-behaviour(mod_offline). -behaviour(mod_offline).
-export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1, -export([init/2, store_message/1, pop_messages/2, remove_expired_messages/1,
remove_old_messages/2, remove_user/2, read_message_headers/2, remove_old_messages/2, remove_user/2, read_message_headers/2,
read_message/3, remove_message/3, read_all_messages/2, read_message/3, remove_message/3, read_all_messages/2,
remove_all_messages/2, count_messages/2, import/1, export/1]). remove_all_messages/2, count_messages/2, import/1, export/1]).
@ -44,30 +44,21 @@
init(_Host, _Opts) -> init(_Host, _Opts) ->
ok. ok.
store_messages(Host, {User, _Server}, Msgs, Len, MaxOfflineMsgs) -> store_message(#offline_msg{us = {LUser, LServer}} = M) ->
Count = if MaxOfflineMsgs =/= infinity ->
Len + count_messages(User, Host);
true -> 0
end,
if Count > MaxOfflineMsgs -> {atomic, discard};
true ->
Query = lists:map(
fun(M) ->
LUser = (M#offline_msg.to)#jid.luser,
From = M#offline_msg.from, From = M#offline_msg.from,
To = M#offline_msg.to, To = M#offline_msg.to,
Packet = xmpp:set_from_to( Packet = xmpp:set_from_to(M#offline_msg.packet, From, To),
M#offline_msg.packet, From, To),
NewPacket = xmpp_util:add_delay_info( NewPacket = xmpp_util:add_delay_info(
Packet, jid:make(Host), Packet, jid:make(LServer),
M#offline_msg.timestamp, M#offline_msg.timestamp,
<<"Offline Storage">>), <<"Offline Storage">>),
XML = fxml:element_to_binary( XML = fxml:element_to_binary(
xmpp:encode(NewPacket)), xmpp:encode(NewPacket)),
sql_queries:add_spool_sql(LUser, XML) case sql_queries:add_spool(LUser, LServer, XML) of
end, {updated, _} ->
Msgs), ok;
sql_queries:add_spool(Host, Query) _ ->
{error, db_failure}
end. end.
pop_messages(LUser, LServer) -> pop_messages(LUser, LServer) ->

View File

@ -185,15 +185,16 @@ convert_data(_Host, "config", _User, [Data]) ->
convert_data(Host, "offline", User, [Data]) -> convert_data(Host, "offline", User, [Data]) ->
LUser = jid:nodeprep(User), LUser = jid:nodeprep(User),
LServer = jid:nameprep(Host), LServer = jid:nameprep(Host),
Msgs = lists:flatmap( lists:foreach(
fun({_, RawXML}) -> fun({_, RawXML}) ->
case deserialize(RawXML) of case deserialize(RawXML) of
[El] -> el_to_offline_msg(LUser, LServer, El); [El] ->
_ -> [] Msg = el_to_offline_msg(LUser, LServer, El),
ok = mod_offline:store_offline_msg(Msg);
_ ->
ok
end end
end, Data), end, Data);
mod_offline:store_offline_msg(
LServer, {LUser, LServer}, Msgs, length(Msgs), infinity);
convert_data(Host, "privacy", User, [Data]) -> convert_data(Host, "privacy", User, [Data]) ->
LUser = jid:nodeprep(User), LUser = jid:nodeprep(User),
LServer = jid:nameprep(Host), LServer = jid:nameprep(Host),

View File

@ -37,7 +37,7 @@
set_password_scram_t/6, add_user/3, add_user_scram/6, set_password_scram_t/6, add_user/3, add_user_scram/6,
del_user/2, del_user_return_password/3, list_users/1, del_user/2, del_user_return_password/3, list_users/1,
list_users/2, users_number/1, users_number/2, list_users/2, users_number/1, users_number/2,
add_spool_sql/2, add_spool/2, get_and_del_spool_msg_t/2, add_spool/3, get_and_del_spool_msg_t/2,
del_spool_msg/2, get_roster/2, get_roster_jid_groups/2, del_spool_msg/2, get_roster/2, get_roster_jid_groups/2,
get_roster_groups/3, del_user_roster_t/2, get_roster_groups/3, del_user_roster_t/2,
get_roster_by_jid/3, get_rostergroup_by_jid/3, get_roster_by_jid/3, get_rostergroup_by_jid/3,
@ -273,11 +273,10 @@ users_number(LServer, [{prefix, Prefix}])
users_number(LServer, []) -> users_number(LServer, []) ->
users_number(LServer). users_number(LServer).
add_spool_sql(LUser, XML) -> add_spool(LUser, LServer, XML) ->
?SQL("insert into spool(username, xml) values (%(LUser)s, %(XML)s)"). ejabberd_sql:sql_query(
LServer,
add_spool(LServer, Queries) -> ?SQL("insert into spool(username, xml) values (%(LUser)s, %(XML)s)")).
ejabberd_sql:sql_transaction(LServer, Queries).
get_and_del_spool_msg_t(LServer, LUser) -> get_and_del_spool_msg_t(LServer, LUser) ->
F = fun () -> F = fun () ->