From 515f8b22c0b81675207626aad11eeba0cee2c48c Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Sun, 17 Dec 2017 18:52:37 +0300 Subject: [PATCH] Rewrite ejabberd system monitor Previous version was inefficient: it had a lot of false positives along with a lot of false negatives, making its usage pointless. The new verion is based on memsup(3erl) application: the OOM watchdog is only started when total OS memory consumption is more than 80%. A watchdog periodically inspects all running processes and collects statistics about overloaded ones (those queueing a lot of messages). If the OOM killer is enabled (`oom_killer: true`), all overloaded processes would be killed. By default, OOM killer is enabled. When memory consumption is back to normal, the OOM watchdog is stopped. --- src/ejabberd_app.erl | 1 + src/ejabberd_sup.erl | 8 - src/ejabberd_system_monitor.erl | 531 +++++++++++++++----------------- 3 files changed, 246 insertions(+), 294 deletions(-) diff --git a/src/ejabberd_app.erl b/src/ejabberd_app.erl index 3743a8f04..4e9819d64 100644 --- a/src/ejabberd_app.erl +++ b/src/ejabberd_app.erl @@ -50,6 +50,7 @@ start(normal, _Args) -> ejabberd_mnesia:start(), file_queue_init(), maybe_add_nameservers(), + ejabberd_system_monitor:start(), case ejabberd_sup:start_link() of {ok, SupPid} -> register_elixir_config_hooks(), diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl index dbbc4d5e4..3f9809a05 100644 --- a/src/ejabberd_sup.erl +++ b/src/ejabberd_sup.erl @@ -47,13 +47,6 @@ init([]) -> 5000, worker, [ejabberd_cluster]}, - SystemMonitor = - {ejabberd_system_monitor, - {ejabberd_system_monitor, start_link, []}, - permanent, - brutal_kill, - worker, - [ejabberd_system_monitor]}, S2S = {ejabberd_s2s, {ejabberd_s2s, start_link, []}, @@ -172,7 +165,6 @@ init([]) -> PKIX, ACME, Listener, - SystemMonitor, S2S, Captcha, S2SInSupervisor, diff --git a/src/ejabberd_system_monitor.erl b/src/ejabberd_system_monitor.erl index 773104f9e..b0df101ba 100644 --- a/src/ejabberd_system_monitor.erl +++ b/src/ejabberd_system_monitor.erl @@ -24,320 +24,279 @@ %%%------------------------------------------------------------------- -module(ejabberd_system_monitor). - +-behaviour(gen_event). -behaviour(ejabberd_config). -author('alexey@process-one.net'). - --behaviour(gen_server). +-author('ekhramtsov@process-one.net'). %% API --export([start_link/0, process_command/1, register_hook/1, - unregister_hook/1, process_remote_command/1]). +-export([start/0, opt_type/1]). --export([init/1, handle_call/3, handle_cast/2, - handle_info/2, terminate/2, code_change/3, opt_type/1]). +%% gen_event callbacks +-export([init/1, handle_event/2, handle_call/2, + handle_info/2, terminate/2, code_change/3]). --include("ejabberd.hrl"). --include("logger.hrl"). +%% We don't use ejabberd logger because lager can be overloaded +%% too and alarm_handler may get stuck. +%%-include("logger.hrl"). --include("xmpp.hrl"). +-define(CHECK_INTERVAL, timer:seconds(30)). +-record(state, {tref :: reference(), + mref :: reference()}). +-record(proc_stat, {qlen :: non_neg_integer(), + memory :: non_neg_integer(), + initial_call :: mfa(), + current_function :: mfa(), + ancestors :: [pid() | atom()], + application :: pid() | atom(), + name :: pid() | atom()}). +-type state() :: #state{}. +-type proc_stat() :: #proc_stat{}. --record(state, {}). +%%%=================================================================== +%%% API +%%%=================================================================== +-spec start() -> ok. +start() -> + gen_event:add_handler(alarm_handler, ?MODULE, []), + application:load(os_mon), + application:set_env(os_mon, start_cpu_sup, false), + application:set_env(os_mon, start_disksup, false), + application:set_env(os_mon, start_os_sup, false), + application:set_env(os_mon, start_memsup, true), + ejabberd:start_app(os_mon). -%%==================================================================== -%% API -%%==================================================================== -%%-------------------------------------------------------------------- -%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} -%% Description: Starts the server -%%-------------------------------------------------------------------- -start_link() -> - LH = ejabberd_config:get_option(watchdog_large_heap, 1000000), - Opts = [{large_heap, LH}], - gen_server:start_link({local, ?MODULE}, ?MODULE, Opts, - []). +excluded_apps() -> + %% FIXME: lager gets overloaded very often, but + %% it fails to get recovered after brutal kill, + %% so it's better to make it tolerate crashes + %% rather than ignoring the overload + [os_mon, mnesia, sasl, stdlib, kernel, lager]. --spec process_command(stanza()) -> ok. -process_command(#message{from = From, to = To, body = Body}) -> - case To of - #jid{luser = <<"">>, lresource = <<"watchdog">>} -> - LFrom = jid:tolower(jid:remove_resource(From)), - case lists:member(LFrom, get_admin_jids()) of - true -> - BodyText = xmpp:get_text(Body), - spawn(fun () -> - process_flag(priority, high), - process_command1(From, To, BodyText) - end), - ok; - false -> ok - end; - _ -> - ok - end; -process_command(_) -> - ok. - -register_hook(Host) -> - ejabberd_hooks:add(local_send_to_resource_hook, Host, - ?MODULE, process_command, 50). - -unregister_hook(Host) -> - ejabberd_hooks:delete(local_send_to_resource_hook, Host, - ?MODULE, process_command, 50). - -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -%%-------------------------------------------------------------------- -%% Function: init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% Description: Initiates the server -%%-------------------------------------------------------------------- -init(Opts) -> - LH = proplists:get_value(large_heap, Opts), - process_flag(priority, high), - erlang:system_monitor(self(), [{large_heap, LH}]), - ejabberd_hooks:add(host_up, ?MODULE, register_hook, 50), - ejabberd_hooks:add(host_down, ?MODULE, unregister_hook, 60), - lists:foreach(fun register_hook/1, ?MYHOSTS), +%%%=================================================================== +%%% gen_event callbacks +%%%=================================================================== +init([]) -> {ok, #state{}}. -%%-------------------------------------------------------------------- -%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | -%% {stop, Reason, State} -%% Description: Handling call messages -%%-------------------------------------------------------------------- -handle_call({get, large_heap}, _From, State) -> - {reply, get_large_heap(), State}; -handle_call({set, large_heap, NewValue}, _From, - State) -> - MonSettings = erlang:system_monitor(self(), - [{large_heap, NewValue}]), - OldLH = get_large_heap(MonSettings), - NewLH = get_large_heap(), - {reply, {lh_changed, OldLH, NewLH}, State}; -handle_call(_Request, _From, State) -> - Reply = ok, {reply, Reply, State}. +handle_event({set_alarm, {system_memory_high_watermark, _}}, State) -> + error_logger:warning_msg( + "More than 80% of OS memory is allocated, " + "starting OOM watchdog", []), + {ok, handle_overload(State)}; +handle_event({clear_alarm, system_memory_high_watermark}, State) -> + cancel_timer(State#state.tref), + error_logger:info_msg( + "Memory consumption is back to normal, " + "stopping OOM watchdog", []), + {ok, State#state{tref = undefined}}; +handle_event(Event, State) -> + error_logger:warning_msg("unexpected event: ~p", [Event]), + {ok, State}. -get_large_heap() -> - MonSettings = erlang:system_monitor(), - get_large_heap(MonSettings). +handle_call(_Request, State) -> + {ok, {error, badarg}, State}. -get_large_heap(MonSettings) -> - {_MonitorPid, Options} = MonSettings, - proplists:get_value(large_heap, Options). +handle_info({timeout, _TRef, handle_overload}, State) -> + {ok, handle_overload(State)}; +handle_info(Info, State) -> + error_logger:warning_msg("unexpected info: ~p", [Info]), + {ok, State}. -%%-------------------------------------------------------------------- -%% Function: handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling cast messages -%%-------------------------------------------------------------------- -handle_cast(_Msg, State) -> {noreply, State}. +terminate(_Reason, _State) -> + ok. -%%-------------------------------------------------------------------- -%% Function: handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling all non call/cast messages -%%-------------------------------------------------------------------- -handle_info({monitor, Pid, large_heap, Info}, State) -> - spawn(fun () -> - process_flag(priority, high), - process_large_heap(Pid, Info) - end), - {noreply, State}; -handle_info(_Info, State) -> {noreply, State}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. -%%-------------------------------------------------------------------- -%% Function: terminate(Reason, State) -> void() -%% Description: This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any necessary -%% cleaning up. When it returns, the gen_server terminates with Reason. -%% The return value is ignored. -%%-------------------------------------------------------------------- -terminate(_Reason, _State) -> ok. - -%%-------------------------------------------------------------------- -%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} -%% Description: Convert process state when code is changed -%%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> {ok, State}. - -%%-------------------------------------------------------------------- +%%%=================================================================== %%% Internal functions -%%-------------------------------------------------------------------- +%%%=================================================================== +-spec handle_overload(state()) -> state(). +handle_overload(State) -> + AllProcs = processes(), + AppPids = get_app_pids(), + {TotalMsgs, ProcsNum, Apps, Stats} = overloaded_procs(AppPids, AllProcs), + if TotalMsgs >= 10000 -> + SortedStats = lists:reverse(lists:keysort(#proc_stat.qlen, Stats)), + error_logger:warning_msg( + "The system is overloaded with ~b messages " + "queued by ~b process(es) (~b%) " + "from the following applications: ~s; " + "the top processes are:~n~s", + [TotalMsgs, ProcsNum, + round(ProcsNum*100/length(AllProcs)), + format_apps(Apps), + format_top_procs(SortedStats)]), + kill(SortedStats, round(TotalMsgs/ProcsNum)); + true -> + ok + end, + lists:foreach(fun erlang:garbage_collect/1, AllProcs), + restart_timer(State). -process_large_heap(Pid, Info) -> - Host = (?MYNAME), - JIDs = get_admin_jids(), - DetailedInfo = detailed_info(Pid), - Body = str:format("(~w) The process ~w is consuming too " - "much memory:~n~p~n~s", - [node(), Pid, Info, DetailedInfo]), - From = jid:make(<<"">>, Host, <<"watchdog">>), - Hint = [#hint{type = 'no-permanent-store'}], - lists:foreach( - fun(JID) -> - send_message(From, jid:make(JID), Body, Hint) - end, JIDs). - -send_message(From, To, Body) -> - send_message(From, To, Body, []). - -send_message(From, To, Body, ExtraEls) -> - ejabberd_router:route(#message{type = chat, - from = From, - to = To, - body = xmpp:mk_text(Body), - sub_els = ExtraEls}). - -get_admin_jids() -> - ejabberd_config:get_option(watchdog_admins, []). - -detailed_info(Pid) -> - case process_info(Pid, dictionary) of - {dictionary, Dict} -> - case lists:keysearch('$ancestors', 1, Dict) of - {value, {'$ancestors', [Sup | _]}} -> - case Sup of - ejabberd_c2s_sup -> c2s_info(Pid); - ejabberd_s2s_out_sup -> s2s_out_info(Pid); - ejabberd_service_sup -> service_info(Pid); - _ -> detailed_info1(Pid) - end; - _ -> detailed_info1(Pid) - end; - _ -> detailed_info1(Pid) +-spec get_app_pids() -> map(). +get_app_pids() -> + try application:info() of + Info -> + case lists:keyfind(running, 1, Info) of + {_, Apps} -> + lists:foldl( + fun({Name, Pid}, M) when is_pid(Pid) -> + maps:put(Pid, Name, M); + (_, M) -> + M + end, #{}, Apps); + false -> + #{} + end + catch _:_ -> + #{} end. -detailed_info1(Pid) -> - io_lib:format("~p", - [[process_info(Pid, current_function), - process_info(Pid, initial_call), - process_info(Pid, message_queue_len), - process_info(Pid, links), process_info(Pid, dictionary), - process_info(Pid, heap_size), - process_info(Pid, stack_size)]]). +-spec overloaded_procs(map(), [pid()]) + -> {non_neg_integer(), non_neg_integer(), dict:dict(), [proc_stat()]}. +overloaded_procs(AppPids, AllProcs) -> + lists:foldl( + fun(Pid, {TotalMsgs, ProcsNum, Apps, Stats}) -> + case proc_stat(Pid, AppPids) of + #proc_stat{qlen = QLen, application = App} = Stat + when QLen > 0 -> + {TotalMsgs + QLen, ProcsNum + 1, + dict:update_counter(App, QLen, Apps), + [Stat|Stats]}; + _ -> + {TotalMsgs, ProcsNum, Apps, Stats} + end + end, {0, 0, dict:new(), []}, AllProcs). -c2s_info(Pid) -> - [<<"Process type: c2s">>, check_send_queue(Pid), - <<"\n">>, - io_lib:format("Command to kill this process: kill ~s ~w", - [iolist_to_binary(atom_to_list(node())), Pid])]. - -s2s_out_info(Pid) -> - FromTo = mnesia:dirty_select(s2s, - [{{s2s, '$1', Pid, '_'}, [], ['$1']}]), - [<<"Process type: s2s_out">>, - case FromTo of - [{From, To}] -> - <<"\n", - (io_lib:format("S2S connection: from ~s to ~s", - [From, To]))/binary>>; - _ -> <<"">> - end, - check_send_queue(Pid), <<"\n">>, - io_lib:format("Command to kill this process: kill ~s ~w", - [iolist_to_binary(atom_to_list(node())), Pid])]. - -service_info(Pid) -> - Routes = mnesia:dirty_select(route, - [{{route, '$1', Pid, '_'}, [], ['$1']}]), - [<<"Process type: s2s_out">>, - case Routes of - [Route] -> <<"\nServiced domain: ", Route/binary>>; - _ -> <<"">> - end, - check_send_queue(Pid), <<"\n">>, - io_lib:format("Command to kill this process: kill ~s ~w", - [iolist_to_binary(atom_to_list(node())), Pid])]. - -check_send_queue(Pid) -> - case {process_info(Pid, current_function), - process_info(Pid, message_queue_len)} - of - {{current_function, MFA}, {message_queue_len, MLen}} -> - if MLen > 100 -> - case MFA of - {prim_inet, send, 2} -> - <<"\nPossible reason: the process is blocked " - "trying to send data over its TCP connection.">>; - {M, F, A} -> - [<<"\nPossible reason: the process can't " - "process messages faster than they arrive. ">>, - io_lib:format("Current function is ~w:~w/~w", - [M, F, A])] - end; - true -> <<"">> - end; - _ -> <<"">> +-spec proc_stat(pid(), map()) -> proc_stat() | undefined. +proc_stat(Pid, AppPids) -> + case process_info(Pid, [message_queue_len, + memory, + initial_call, + current_function, + dictionary, + group_leader, + registered_name]) of + [{_, MsgLen}, {_, Mem}, {_, InitCall}, + {_, CurrFun}, {_, Dict}, {_, GL}, {_, Name}] -> + IntLen = proplists:get_value('$internal_queue_len', Dict, 0), + TrueInitCall = proplists:get_value('$initial_call', Dict, InitCall), + Ancestors = proplists:get_value('$ancestors', Dict, []), + Len = IntLen + MsgLen, + App = maps:get(GL, AppPids, kernel), + RegName = case Name of + [] -> Pid; + _ -> Name + end, + #proc_stat{qlen = Len, + memory = Mem, + initial_call = TrueInitCall, + current_function = CurrFun, + ancestors = Ancestors, + application = App, + name = RegName}; + _ -> + undefined end. -process_command1(From, To, Body) -> - process_command2(str:tokens(Body, <<" ">>), From, To). +-spec restart_timer(#state{}) -> #state{}. +restart_timer(State) -> + cancel_timer(State#state.tref), + TRef = erlang:start_timer(?CHECK_INTERVAL, self(), handle_overload), + State#state{tref = TRef}. -process_command2([<<"kill">>, SNode, SPid], From, To) -> - Node = misc:binary_to_atom(SNode), - remote_command(Node, [kill, SPid], From, To); -process_command2([<<"showlh">>, SNode], From, To) -> - Node = misc:binary_to_atom(SNode), - remote_command(Node, [showlh], From, To); -process_command2([<<"setlh">>, SNode, NewValueString], - From, To) -> - Node = misc:binary_to_atom(SNode), - NewValue = binary_to_integer(NewValueString), - remote_command(Node, [setlh, NewValue], From, To); -process_command2([<<"help">>], From, To) -> - send_message(To, From, help()); -process_command2(_, From, To) -> - send_message(To, From, help()). +-spec cancel_timer(reference()) -> ok. +cancel_timer(undefined) -> + ok; +cancel_timer(TRef) -> + case erlang:cancel_timer(TRef) of + false -> + receive {timeout, TRef, _} -> ok + after 0 -> ok + end; + _ -> + ok + end. -help() -> - <<"Commands:\n kill \n showlh " - "\n setlh ">>. +-spec format_apps(dict:dict()) -> io:data(). +format_apps(Apps) -> + AppList = lists:reverse(lists:keysort(2, dict:to_list(Apps))), + string:join( + [io_lib:format("~p (~b msgs)", [App, Msgs]) || {App, Msgs} <- AppList], + ", "). -remote_command(Node, Args, From, To) -> - Message = case ejabberd_cluster:call(Node, ?MODULE, - process_remote_command, [Args]) - of - {badrpc, Reason} -> - io_lib:format("Command failed:~n~p", [Reason]); - Result -> Result - end, - send_message(To, From, iolist_to_binary(Message)). +-spec format_top_procs([proc_stat()]) -> io:data(). +format_top_procs(Stats) -> + Stats1 = lists:sublist(Stats, 5), + string:join(lists:map(fun format_proc/1, Stats1), io_lib:nl()). -process_remote_command([kill, SPid]) -> - exit(list_to_pid(SPid), kill), <<"ok">>; -process_remote_command([showlh]) -> - Res = gen_server:call(ejabberd_system_monitor, - {get, large_heap}), - io_lib:format("Current large heap: ~p", [Res]); -process_remote_command([setlh, NewValue]) -> - {lh_changed, OldLH, NewLH} = - gen_server:call(ejabberd_system_monitor, - {set, large_heap, NewValue}), - io_lib:format("Result of set large heap: ~p --> ~p", - [OldLH, NewLH]); -process_remote_command(_) -> throw(unknown_command). +-spec format_proc(proc_stat()) -> io:data(). +format_proc(#proc_stat{qlen = Len, memory = Mem, initial_call = InitCall, + current_function = CurrFun, ancestors = Ancs, + application = App, name = Name}) -> + io_lib:format( + "** ~w: msgs = ~b, memory = ~b, initial_call = ~s, " + "current_function = ~s, ancestors = ~w, application = ~w", + [Name, Len, Mem, format_mfa(InitCall), format_mfa(CurrFun), Ancs, App]). --spec opt_type(watchdog_admins) -> fun(([binary()]) -> ljid()); - (watchdog_large_heap) -> fun((pos_integer()) -> pos_integer()); +-spec format_mfa(mfa()) -> io:data(). +format_mfa({M, F, A}) when is_atom(M), is_atom(F), is_integer(A) -> + io_lib:format("~s:~s/~b", [M, F, A]); +format_mfa(WTF) -> + io_lib:format("~w", [WTF]). + +-spec kill([proc_stat()], non_neg_integer()) -> ok. +kill(Stats, Threshold) -> + case ejabberd_config:get_option(oom_killer, true) of + true -> + do_kill(Stats, Threshold); + false -> + ok + end. + +-spec do_kill([proc_stat()], non_neg_integer()) -> ok. +do_kill(Stats, Threshold) -> + Killed = lists:filtermap( + fun(#proc_stat{qlen = Len, name = Name, application = App}) + when Len >= Threshold -> + case lists:member(App, excluded_apps()) of + true -> + error_logger:warning_msg( + "Unable to kill process ~p from whitelisted " + "application ~p", [Name, App]), + false; + false -> + case kill_proc(Name) of + false -> false; + Pid -> {true, Pid} + end + end; + (_) -> + false + end, Stats), + TotalKilled = length(Killed), + if TotalKilled > 0 -> + error_logger:error_msg( + "Killed ~b process(es) consuming more than ~b message(s) each", + [TotalKilled, Threshold]); + true -> + ok + end. + +-spec kill_proc(pid() | atom()) -> false | pid(). +kill_proc(undefined) -> + false; +kill_proc(Name) when is_atom(Name) -> + kill_proc(whereis(Name)); +kill_proc(Pid) -> + exit(Pid, kill), + Pid. + +-spec opt_type(oom_killer) -> fun((boolean()) -> boolean()); (atom()) -> [atom()]. -opt_type(watchdog_admins) -> - fun (JIDs) -> - [jid:tolower(jid:decode(iolist_to_binary(S))) - || S <- JIDs] - end; -opt_type(watchdog_large_heap) -> - fun (I) when is_integer(I), I > 0 -> I end; -opt_type(_) -> [watchdog_admins, watchdog_large_heap]. +opt_type(oom_killer) -> + fun(B) when is_boolean(B) -> B end; +opt_type(_) -> [oom_killer].