Monitor routes

Clean route table from the process that died unexpectedly.
This usually may happen when the corresponding process
gets killed by OOM killer during overload.
This commit is contained in:
Evgeny Khramtsov 2019-07-03 10:39:03 +03:00
parent 0fc190e2ef
commit 6e2502ea7d
1 changed files with 47 additions and 5 deletions

View File

@ -65,6 +65,7 @@
%% This value is used in SIP and Megaco for a transaction lifetime.
-define(IQ_TIMEOUT, 32000).
-define(CALL_TIMEOUT, timer:minutes(10)).
-include("logger.hrl").
-include("ejabberd_router.hrl").
@ -78,7 +79,7 @@
-callback find_routes(binary()) -> {ok, [#route{}]} | {error, any()}.
-callback get_all_routes() -> {ok, [binary()]} | {error, any()}.
-record(state, {}).
-record(state, {route_monitors = #{} :: #{{binary(), pid()} => reference()}}).
%%====================================================================
%% API
@ -176,6 +177,7 @@ register_route(Domain, ServerHost, LocalHint, Pid) ->
get_component_number(LDomain), Pid) of
ok ->
?DEBUG("Route registered: ~s", [LDomain]),
monitor_route(LDomain, Pid),
ejabberd_hooks:run(route_registered, [LDomain]),
delete_cache(Mod, LDomain);
{error, Err} ->
@ -205,6 +207,7 @@ unregister_route(Domain, Pid) ->
LDomain, get_component_number(LDomain), Pid) of
ok ->
?DEBUG("Route unregistered: ~s", [LDomain]),
demonitor_route(LDomain, Pid),
ejabberd_hooks:run(route_unregistered, [LDomain]),
delete_cache(Mod, LDomain);
{error, Err} ->
@ -324,16 +327,47 @@ init([]) ->
clean_cache(),
{ok, #state{}}.
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_call({monitor, Domain, Pid}, _From, State) ->
MRefs = State#state.route_monitors,
MRefs1 = case maps:is_key({Domain, Pid}, MRefs) of
true -> MRefs;
false ->
MRef = erlang:monitor(process, Pid),
MRefs#{{Domain, Pid} => MRef}
end,
{reply, ok, State#state{route_monitors = MRefs1}};
handle_call({demonitor, Domain, Pid}, _From, State) ->
MRefs = State#state.route_monitors,
MRefs1 = case maps:find({Domain, Pid}, MRefs) of
{ok, MRef} ->
erlang:demonitor(MRef, [flush]),
maps:remove({Domain, Pid}, MRefs);
error ->
MRefs
end,
{reply, ok, State#state{route_monitors = MRefs1}};
handle_call(Request, From, State) ->
?WARNING_MSG("Unexpected call from ~p: ~p", [From, Request]),
{noreply, State}.
handle_cast(_Msg, State) ->
handle_cast(Msg, State) ->
?WARNING_MSG("Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({route, Packet}, State) ->
route(Packet),
{noreply, State};
handle_info({'DOWN', MRef, _, Pid, Info}, State) ->
MRefs = maps:filter(
fun({Domain, P}, M) when P == Pid, M == MRef ->
?DEBUG("Process ~p with route registered to ~s "
"has terminated unexpectedly with reason: ~p",
[P, Domain, Info]),
false;
(_, _) ->
true
end, State#state.route_monitors),
{noreply, State#state{route_monitors = MRefs}};
handle_info(Info, State) ->
?ERROR_MSG("Unexpected info: ~p", [Info]),
{noreply, State}.
@ -427,6 +461,14 @@ get_domain_balancing(From, To, LDomain) ->
end
end.
-spec monitor_route(binary(), pid()) -> ok.
monitor_route(Domain, Pid) ->
?GEN_SERVER:call(?MODULE, {monitor, Domain, Pid}, ?CALL_TIMEOUT).
-spec demonitor_route(binary(), pid()) -> ok.
demonitor_route(Domain, Pid) ->
?GEN_SERVER:call(?MODULE, {demonitor, Domain, Pid}, ?CALL_TIMEOUT).
-spec get_backend() -> module().
get_backend() ->
DBType = ejabberd_option:router_db_type(),