Rewrite ejabberd system monitor

Previous version was inefficient: it had a lot of false positives
along with a lot of false negatives, making its usage pointless.
The new verion is based on memsup(3erl) application: the OOM watchdog is
only started when total OS memory consumption is more than 80%.
A watchdog periodically inspects all running processes and collects
statistics about overloaded ones (those queueing a lot of messages).
If the OOM killer is enabled (`oom_killer: true`), all overloaded
processes would be killed. By default, OOM killer is enabled.
When memory consumption is back to normal, the OOM watchdog is stopped.
This commit is contained in:
Evgeniy Khramtsov 2017-12-17 18:52:37 +03:00
parent 4352cb9fed
commit 515f8b22c0
3 changed files with 246 additions and 294 deletions

View File

@ -50,6 +50,7 @@ start(normal, _Args) ->
ejabberd_mnesia:start(), ejabberd_mnesia:start(),
file_queue_init(), file_queue_init(),
maybe_add_nameservers(), maybe_add_nameservers(),
ejabberd_system_monitor:start(),
case ejabberd_sup:start_link() of case ejabberd_sup:start_link() of
{ok, SupPid} -> {ok, SupPid} ->
register_elixir_config_hooks(), register_elixir_config_hooks(),

View File

@ -47,13 +47,6 @@ init([]) ->
5000, 5000,
worker, worker,
[ejabberd_cluster]}, [ejabberd_cluster]},
SystemMonitor =
{ejabberd_system_monitor,
{ejabberd_system_monitor, start_link, []},
permanent,
brutal_kill,
worker,
[ejabberd_system_monitor]},
S2S = S2S =
{ejabberd_s2s, {ejabberd_s2s,
{ejabberd_s2s, start_link, []}, {ejabberd_s2s, start_link, []},
@ -172,7 +165,6 @@ init([]) ->
PKIX, PKIX,
ACME, ACME,
Listener, Listener,
SystemMonitor,
S2S, S2S,
Captcha, Captcha,
S2SInSupervisor, S2SInSupervisor,

View File

@ -24,320 +24,279 @@
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
-module(ejabberd_system_monitor). -module(ejabberd_system_monitor).
-behaviour(gen_event).
-behaviour(ejabberd_config). -behaviour(ejabberd_config).
-author('alexey@process-one.net'). -author('alexey@process-one.net').
-author('ekhramtsov@process-one.net').
-behaviour(gen_server).
%% API %% API
-export([start_link/0, process_command/1, register_hook/1, -export([start/0, opt_type/1]).
unregister_hook/1, process_remote_command/1]).
-export([init/1, handle_call/3, handle_cast/2, %% gen_event callbacks
handle_info/2, terminate/2, code_change/3, opt_type/1]). -export([init/1, handle_event/2, handle_call/2,
handle_info/2, terminate/2, code_change/3]).
-include("ejabberd.hrl"). %% We don't use ejabberd logger because lager can be overloaded
-include("logger.hrl"). %% too and alarm_handler may get stuck.
%%-include("logger.hrl").
-include("xmpp.hrl"). -define(CHECK_INTERVAL, timer:seconds(30)).
-record(state, {tref :: reference(),
mref :: reference()}).
-record(proc_stat, {qlen :: non_neg_integer(),
memory :: non_neg_integer(),
initial_call :: mfa(),
current_function :: mfa(),
ancestors :: [pid() | atom()],
application :: pid() | atom(),
name :: pid() | atom()}).
-type state() :: #state{}.
-type proc_stat() :: #proc_stat{}.
-record(state, {}). %%%===================================================================
%%% API
%%%===================================================================
-spec start() -> ok.
start() ->
gen_event:add_handler(alarm_handler, ?MODULE, []),
application:load(os_mon),
application:set_env(os_mon, start_cpu_sup, false),
application:set_env(os_mon, start_disksup, false),
application:set_env(os_mon, start_os_sup, false),
application:set_env(os_mon, start_memsup, true),
ejabberd:start_app(os_mon).
%%==================================================================== excluded_apps() ->
%% API %% FIXME: lager gets overloaded very often, but
%%==================================================================== %% it fails to get recovered after brutal kill,
%%-------------------------------------------------------------------- %% so it's better to make it tolerate crashes
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} %% rather than ignoring the overload
%% Description: Starts the server [os_mon, mnesia, sasl, stdlib, kernel, lager].
%%--------------------------------------------------------------------
start_link() ->
LH = ejabberd_config:get_option(watchdog_large_heap, 1000000),
Opts = [{large_heap, LH}],
gen_server:start_link({local, ?MODULE}, ?MODULE, Opts,
[]).
-spec process_command(stanza()) -> ok. %%%===================================================================
process_command(#message{from = From, to = To, body = Body}) -> %%% gen_event callbacks
case To of %%%===================================================================
#jid{luser = <<"">>, lresource = <<"watchdog">>} -> init([]) ->
LFrom = jid:tolower(jid:remove_resource(From)),
case lists:member(LFrom, get_admin_jids()) of
true ->
BodyText = xmpp:get_text(Body),
spawn(fun () ->
process_flag(priority, high),
process_command1(From, To, BodyText)
end),
ok;
false -> ok
end;
_ ->
ok
end;
process_command(_) ->
ok.
register_hook(Host) ->
ejabberd_hooks:add(local_send_to_resource_hook, Host,
?MODULE, process_command, 50).
unregister_hook(Host) ->
ejabberd_hooks:delete(local_send_to_resource_hook, Host,
?MODULE, process_command, 50).
%%====================================================================
%% gen_server callbacks
%%====================================================================
%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
init(Opts) ->
LH = proplists:get_value(large_heap, Opts),
process_flag(priority, high),
erlang:system_monitor(self(), [{large_heap, LH}]),
ejabberd_hooks:add(host_up, ?MODULE, register_hook, 50),
ejabberd_hooks:add(host_down, ?MODULE, unregister_hook, 60),
lists:foreach(fun register_hook/1, ?MYHOSTS),
{ok, #state{}}. {ok, #state{}}.
%%-------------------------------------------------------------------- handle_event({set_alarm, {system_memory_high_watermark, _}}, State) ->
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | error_logger:warning_msg(
%% {reply, Reply, State, Timeout} | "More than 80% of OS memory is allocated, "
%% {noreply, State} | "starting OOM watchdog", []),
%% {noreply, State, Timeout} | {ok, handle_overload(State)};
%% {stop, Reason, Reply, State} | handle_event({clear_alarm, system_memory_high_watermark}, State) ->
%% {stop, Reason, State} cancel_timer(State#state.tref),
%% Description: Handling call messages error_logger:info_msg(
%%-------------------------------------------------------------------- "Memory consumption is back to normal, "
handle_call({get, large_heap}, _From, State) -> "stopping OOM watchdog", []),
{reply, get_large_heap(), State}; {ok, State#state{tref = undefined}};
handle_call({set, large_heap, NewValue}, _From, handle_event(Event, State) ->
State) -> error_logger:warning_msg("unexpected event: ~p", [Event]),
MonSettings = erlang:system_monitor(self(), {ok, State}.
[{large_heap, NewValue}]),
OldLH = get_large_heap(MonSettings),
NewLH = get_large_heap(),
{reply, {lh_changed, OldLH, NewLH}, State};
handle_call(_Request, _From, State) ->
Reply = ok, {reply, Reply, State}.
get_large_heap() -> handle_call(_Request, State) ->
MonSettings = erlang:system_monitor(), {ok, {error, badarg}, State}.
get_large_heap(MonSettings).
get_large_heap(MonSettings) -> handle_info({timeout, _TRef, handle_overload}, State) ->
{_MonitorPid, Options} = MonSettings, {ok, handle_overload(State)};
proplists:get_value(large_heap, Options). handle_info(Info, State) ->
error_logger:warning_msg("unexpected info: ~p", [Info]),
{ok, State}.
%%-------------------------------------------------------------------- terminate(_Reason, _State) ->
%% Function: handle_cast(Msg, State) -> {noreply, State} | ok.
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
handle_cast(_Msg, State) -> {noreply, State}.
%%-------------------------------------------------------------------- code_change(_OldVsn, State, _Extra) ->
%% Function: handle_info(Info, State) -> {noreply, State} | {ok, State}.
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
handle_info({monitor, Pid, large_heap, Info}, State) ->
spawn(fun () ->
process_flag(priority, high),
process_large_heap(Pid, Info)
end),
{noreply, State};
handle_info(_Info, State) -> {noreply, State}.
%%-------------------------------------------------------------------- %%%===================================================================
%% Function: terminate(Reason, State) -> void()
%% Description: This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, _State) -> ok.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) -> {ok, State}.
%%--------------------------------------------------------------------
%%% Internal functions %%% Internal functions
%%-------------------------------------------------------------------- %%%===================================================================
-spec handle_overload(state()) -> state().
handle_overload(State) ->
AllProcs = processes(),
AppPids = get_app_pids(),
{TotalMsgs, ProcsNum, Apps, Stats} = overloaded_procs(AppPids, AllProcs),
if TotalMsgs >= 10000 ->
SortedStats = lists:reverse(lists:keysort(#proc_stat.qlen, Stats)),
error_logger:warning_msg(
"The system is overloaded with ~b messages "
"queued by ~b process(es) (~b%) "
"from the following applications: ~s; "
"the top processes are:~n~s",
[TotalMsgs, ProcsNum,
round(ProcsNum*100/length(AllProcs)),
format_apps(Apps),
format_top_procs(SortedStats)]),
kill(SortedStats, round(TotalMsgs/ProcsNum));
true ->
ok
end,
lists:foreach(fun erlang:garbage_collect/1, AllProcs),
restart_timer(State).
process_large_heap(Pid, Info) -> -spec get_app_pids() -> map().
Host = (?MYNAME), get_app_pids() ->
JIDs = get_admin_jids(), try application:info() of
DetailedInfo = detailed_info(Pid), Info ->
Body = str:format("(~w) The process ~w is consuming too " case lists:keyfind(running, 1, Info) of
"much memory:~n~p~n~s", {_, Apps} ->
[node(), Pid, Info, DetailedInfo]), lists:foldl(
From = jid:make(<<"">>, Host, <<"watchdog">>), fun({Name, Pid}, M) when is_pid(Pid) ->
Hint = [#hint{type = 'no-permanent-store'}], maps:put(Pid, Name, M);
lists:foreach( (_, M) ->
fun(JID) -> M
send_message(From, jid:make(JID), Body, Hint) end, #{}, Apps);
end, JIDs). false ->
#{}
send_message(From, To, Body) -> end
send_message(From, To, Body, []). catch _:_ ->
#{}
send_message(From, To, Body, ExtraEls) ->
ejabberd_router:route(#message{type = chat,
from = From,
to = To,
body = xmpp:mk_text(Body),
sub_els = ExtraEls}).
get_admin_jids() ->
ejabberd_config:get_option(watchdog_admins, []).
detailed_info(Pid) ->
case process_info(Pid, dictionary) of
{dictionary, Dict} ->
case lists:keysearch('$ancestors', 1, Dict) of
{value, {'$ancestors', [Sup | _]}} ->
case Sup of
ejabberd_c2s_sup -> c2s_info(Pid);
ejabberd_s2s_out_sup -> s2s_out_info(Pid);
ejabberd_service_sup -> service_info(Pid);
_ -> detailed_info1(Pid)
end;
_ -> detailed_info1(Pid)
end;
_ -> detailed_info1(Pid)
end. end.
detailed_info1(Pid) -> -spec overloaded_procs(map(), [pid()])
io_lib:format("~p", -> {non_neg_integer(), non_neg_integer(), dict:dict(), [proc_stat()]}.
[[process_info(Pid, current_function), overloaded_procs(AppPids, AllProcs) ->
process_info(Pid, initial_call), lists:foldl(
process_info(Pid, message_queue_len), fun(Pid, {TotalMsgs, ProcsNum, Apps, Stats}) ->
process_info(Pid, links), process_info(Pid, dictionary), case proc_stat(Pid, AppPids) of
process_info(Pid, heap_size), #proc_stat{qlen = QLen, application = App} = Stat
process_info(Pid, stack_size)]]). when QLen > 0 ->
{TotalMsgs + QLen, ProcsNum + 1,
dict:update_counter(App, QLen, Apps),
[Stat|Stats]};
_ ->
{TotalMsgs, ProcsNum, Apps, Stats}
end
end, {0, 0, dict:new(), []}, AllProcs).
c2s_info(Pid) -> -spec proc_stat(pid(), map()) -> proc_stat() | undefined.
[<<"Process type: c2s">>, check_send_queue(Pid), proc_stat(Pid, AppPids) ->
<<"\n">>, case process_info(Pid, [message_queue_len,
io_lib:format("Command to kill this process: kill ~s ~w", memory,
[iolist_to_binary(atom_to_list(node())), Pid])]. initial_call,
current_function,
s2s_out_info(Pid) -> dictionary,
FromTo = mnesia:dirty_select(s2s, group_leader,
[{{s2s, '$1', Pid, '_'}, [], ['$1']}]), registered_name]) of
[<<"Process type: s2s_out">>, [{_, MsgLen}, {_, Mem}, {_, InitCall},
case FromTo of {_, CurrFun}, {_, Dict}, {_, GL}, {_, Name}] ->
[{From, To}] -> IntLen = proplists:get_value('$internal_queue_len', Dict, 0),
<<"\n", TrueInitCall = proplists:get_value('$initial_call', Dict, InitCall),
(io_lib:format("S2S connection: from ~s to ~s", Ancestors = proplists:get_value('$ancestors', Dict, []),
[From, To]))/binary>>; Len = IntLen + MsgLen,
_ -> <<"">> App = maps:get(GL, AppPids, kernel),
end, RegName = case Name of
check_send_queue(Pid), <<"\n">>, [] -> Pid;
io_lib:format("Command to kill this process: kill ~s ~w", _ -> Name
[iolist_to_binary(atom_to_list(node())), Pid])]. end,
#proc_stat{qlen = Len,
service_info(Pid) -> memory = Mem,
Routes = mnesia:dirty_select(route, initial_call = TrueInitCall,
[{{route, '$1', Pid, '_'}, [], ['$1']}]), current_function = CurrFun,
[<<"Process type: s2s_out">>, ancestors = Ancestors,
case Routes of application = App,
[Route] -> <<"\nServiced domain: ", Route/binary>>; name = RegName};
_ -> <<"">> _ ->
end, undefined
check_send_queue(Pid), <<"\n">>,
io_lib:format("Command to kill this process: kill ~s ~w",
[iolist_to_binary(atom_to_list(node())), Pid])].
check_send_queue(Pid) ->
case {process_info(Pid, current_function),
process_info(Pid, message_queue_len)}
of
{{current_function, MFA}, {message_queue_len, MLen}} ->
if MLen > 100 ->
case MFA of
{prim_inet, send, 2} ->
<<"\nPossible reason: the process is blocked "
"trying to send data over its TCP connection.">>;
{M, F, A} ->
[<<"\nPossible reason: the process can't "
"process messages faster than they arrive. ">>,
io_lib:format("Current function is ~w:~w/~w",
[M, F, A])]
end;
true -> <<"">>
end;
_ -> <<"">>
end. end.
process_command1(From, To, Body) -> -spec restart_timer(#state{}) -> #state{}.
process_command2(str:tokens(Body, <<" ">>), From, To). restart_timer(State) ->
cancel_timer(State#state.tref),
TRef = erlang:start_timer(?CHECK_INTERVAL, self(), handle_overload),
State#state{tref = TRef}.
process_command2([<<"kill">>, SNode, SPid], From, To) -> -spec cancel_timer(reference()) -> ok.
Node = misc:binary_to_atom(SNode), cancel_timer(undefined) ->
remote_command(Node, [kill, SPid], From, To); ok;
process_command2([<<"showlh">>, SNode], From, To) -> cancel_timer(TRef) ->
Node = misc:binary_to_atom(SNode), case erlang:cancel_timer(TRef) of
remote_command(Node, [showlh], From, To); false ->
process_command2([<<"setlh">>, SNode, NewValueString], receive {timeout, TRef, _} -> ok
From, To) -> after 0 -> ok
Node = misc:binary_to_atom(SNode), end;
NewValue = binary_to_integer(NewValueString), _ ->
remote_command(Node, [setlh, NewValue], From, To); ok
process_command2([<<"help">>], From, To) -> end.
send_message(To, From, help());
process_command2(_, From, To) ->
send_message(To, From, help()).
help() -> -spec format_apps(dict:dict()) -> io:data().
<<"Commands:\n kill <node> <pid>\n showlh " format_apps(Apps) ->
"<node>\n setlh <node> <integer>">>. AppList = lists:reverse(lists:keysort(2, dict:to_list(Apps))),
string:join(
[io_lib:format("~p (~b msgs)", [App, Msgs]) || {App, Msgs} <- AppList],
", ").
remote_command(Node, Args, From, To) -> -spec format_top_procs([proc_stat()]) -> io:data().
Message = case ejabberd_cluster:call(Node, ?MODULE, format_top_procs(Stats) ->
process_remote_command, [Args]) Stats1 = lists:sublist(Stats, 5),
of string:join(lists:map(fun format_proc/1, Stats1), io_lib:nl()).
{badrpc, Reason} ->
io_lib:format("Command failed:~n~p", [Reason]);
Result -> Result
end,
send_message(To, From, iolist_to_binary(Message)).
process_remote_command([kill, SPid]) -> -spec format_proc(proc_stat()) -> io:data().
exit(list_to_pid(SPid), kill), <<"ok">>; format_proc(#proc_stat{qlen = Len, memory = Mem, initial_call = InitCall,
process_remote_command([showlh]) -> current_function = CurrFun, ancestors = Ancs,
Res = gen_server:call(ejabberd_system_monitor, application = App, name = Name}) ->
{get, large_heap}), io_lib:format(
io_lib:format("Current large heap: ~p", [Res]); "** ~w: msgs = ~b, memory = ~b, initial_call = ~s, "
process_remote_command([setlh, NewValue]) -> "current_function = ~s, ancestors = ~w, application = ~w",
{lh_changed, OldLH, NewLH} = [Name, Len, Mem, format_mfa(InitCall), format_mfa(CurrFun), Ancs, App]).
gen_server:call(ejabberd_system_monitor,
{set, large_heap, NewValue}),
io_lib:format("Result of set large heap: ~p --> ~p",
[OldLH, NewLH]);
process_remote_command(_) -> throw(unknown_command).
-spec opt_type(watchdog_admins) -> fun(([binary()]) -> ljid()); -spec format_mfa(mfa()) -> io:data().
(watchdog_large_heap) -> fun((pos_integer()) -> pos_integer()); format_mfa({M, F, A}) when is_atom(M), is_atom(F), is_integer(A) ->
io_lib:format("~s:~s/~b", [M, F, A]);
format_mfa(WTF) ->
io_lib:format("~w", [WTF]).
-spec kill([proc_stat()], non_neg_integer()) -> ok.
kill(Stats, Threshold) ->
case ejabberd_config:get_option(oom_killer, true) of
true ->
do_kill(Stats, Threshold);
false ->
ok
end.
-spec do_kill([proc_stat()], non_neg_integer()) -> ok.
do_kill(Stats, Threshold) ->
Killed = lists:filtermap(
fun(#proc_stat{qlen = Len, name = Name, application = App})
when Len >= Threshold ->
case lists:member(App, excluded_apps()) of
true ->
error_logger:warning_msg(
"Unable to kill process ~p from whitelisted "
"application ~p", [Name, App]),
false;
false ->
case kill_proc(Name) of
false -> false;
Pid -> {true, Pid}
end
end;
(_) ->
false
end, Stats),
TotalKilled = length(Killed),
if TotalKilled > 0 ->
error_logger:error_msg(
"Killed ~b process(es) consuming more than ~b message(s) each",
[TotalKilled, Threshold]);
true ->
ok
end.
-spec kill_proc(pid() | atom()) -> false | pid().
kill_proc(undefined) ->
false;
kill_proc(Name) when is_atom(Name) ->
kill_proc(whereis(Name));
kill_proc(Pid) ->
exit(Pid, kill),
Pid.
-spec opt_type(oom_killer) -> fun((boolean()) -> boolean());
(atom()) -> [atom()]. (atom()) -> [atom()].
opt_type(watchdog_admins) -> opt_type(oom_killer) ->
fun (JIDs) -> fun(B) when is_boolean(B) -> B end;
[jid:tolower(jid:decode(iolist_to_binary(S))) opt_type(_) -> [oom_killer].
|| S <- JIDs]
end;
opt_type(watchdog_large_heap) ->
fun (I) when is_integer(I), I > 0 -> I end;
opt_type(_) -> [watchdog_admins, watchdog_large_heap].