24
1
mirror of https://github.com/processone/ejabberd.git synced 2024-06-12 21:52:07 +02:00

Prevent overload of incomming s2s connections

Three changes were introduced:

1) ejabberd_s2s_in now uses p1_fsm instead of gen_fsm.  And uses
   the {max_queue, N} option to kill the process if its input
   queue grows too much.
2) If a ejabberd_s2s_in process is overload and killed, the server
   that originated that connection is not allowed to connect back
   to us for X seconds (set to 60seconds on the source)
3) The list of blocked (both statically and dynamically by the above
   method) host is now also checked for hosts authenticating by
   starttls+sasl. Previusly it was only used during dialback.
This commit is contained in:
Pablo Polvorin 2011-12-02 15:30:20 -03:00
parent 87df27109a
commit cf973f27bb
2 changed files with 64 additions and 14 deletions

View File

@ -41,7 +41,11 @@
dirty_get_connections/0, dirty_get_connections/0,
allow_host/2, allow_host/2,
incoming_s2s_number/0, incoming_s2s_number/0,
outgoing_s2s_number/0 outgoing_s2s_number/0,
clean_temporarily_blocked_table/0,
list_temporarily_blocked_hosts/0,
external_host_overloaded/1,
is_temporarly_blocked/1
]). ]).
%% gen_server callbacks %% gen_server callbacks
@ -57,9 +61,14 @@
-define(DEFAULT_MAX_S2S_CONNECTIONS_NUMBER, 1). -define(DEFAULT_MAX_S2S_CONNECTIONS_NUMBER, 1).
-define(DEFAULT_MAX_S2S_CONNECTIONS_NUMBER_PER_NODE, 1). -define(DEFAULT_MAX_S2S_CONNECTIONS_NUMBER_PER_NODE, 1).
-define(S2S_OVERLOAD_BLOCK_PERIOD, 60).
%% once a server is temporarly blocked, it stay blocked for 60 seconds
-record(s2s, {fromto, pid, key}). -record(s2s, {fromto, pid, key}).
-record(state, {}). -record(state, {}).
-record(temporarily_blocked, {host, timestamp}).
%%==================================================================== %%====================================================================
%% API %% API
%%==================================================================== %%====================================================================
@ -79,6 +88,31 @@ route(From, To, Packet) ->
ok ok
end. end.
clean_temporarily_blocked_table() ->
mnesia:clear_table(temporarily_blocked).
list_temporarily_blocked_hosts() ->
ets:tab2list(temporarily_blocked).
external_host_overloaded(Host) ->
?INFO_MSG("Disabling connections from ~s for ~p seconds", [Host, ?S2S_OVERLOAD_BLOCK_PERIOD]),
mnesia:transaction( fun() ->
mnesia:write(#temporarily_blocked{host = Host, timestamp = now()})
end).
is_temporarly_blocked(Host) ->
case mnesia:dirty_read(temporarily_blocked, Host) of
[] -> false;
[#temporarily_blocked{timestamp = T}=Entry] ->
case timer:now_diff(now(), T) of
N when N > ?S2S_OVERLOAD_BLOCK_PERIOD * 1000 * 1000 ->
mnesia:dirty_delete_object(Entry),
false;
_ ->
true
end
end.
remove_connection(FromTo, Pid, Key) -> remove_connection(FromTo, Pid, Key) ->
case catch mnesia:dirty_match_object(s2s, #s2s{fromto = FromTo, case catch mnesia:dirty_match_object(s2s, #s2s{fromto = FromTo,
pid = Pid, pid = Pid,
@ -169,6 +203,7 @@ init([]) ->
mnesia:add_table_copy(s2s, node(), ram_copies), mnesia:add_table_copy(s2s, node(), ram_copies),
mnesia:subscribe(system), mnesia:subscribe(system),
ejabberd_commands:register_commands(commands()), ejabberd_commands:register_commands(commands()),
mnesia:create_table(temporarily_blocked, [{ram_copies, [node()]}, {attributes, record_info(fields, temporarily_blocked)}]),
{ok, #state{}}. {ok, #state{}}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -486,6 +521,9 @@ update_tables() ->
%% Check if host is in blacklist or white list %% Check if host is in blacklist or white list
allow_host(MyServer, S2SHost) -> allow_host(MyServer, S2SHost) ->
allow_host2(MyServer, S2SHost) andalso (not is_temporarly_blocked(S2SHost)).
allow_host2(MyServer, S2SHost) ->
Hosts = ?MYHOSTS, Hosts = ?MYHOSTS,
case lists:dropwhile( case lists:dropwhile(
fun(ParentDomain) -> fun(ParentDomain) ->

View File

@ -27,7 +27,7 @@
-module(ejabberd_s2s_in). -module(ejabberd_s2s_in).
-author('alexey@process-one.net'). -author('alexey@process-one.net').
-behaviour(gen_fsm). -behaviour(p1_fsm).
%% External exports %% External exports
-export([start/2, -export([start/2,
@ -92,10 +92,12 @@
-define(FSMOPTS, []). -define(FSMOPTS, []).
-endif. -endif.
-define(FSMLIMITS, [{max_queue, 2000}]). %% if queue grows more than this, we shutdown this connection.
%% Module start with or without supervisor: %% Module start with or without supervisor:
-ifdef(NO_TRANSIENT_SUPERVISORS). -ifdef(NO_TRANSIENT_SUPERVISORS).
-define(SUPERVISOR_START, gen_fsm:start(ejabberd_s2s_in, [SockData, Opts], -define(SUPERVISOR_START, p1_fsm:start(ejabberd_s2s_in, [SockData, Opts],
?FSMOPTS)). ?FSMOPTS ++ ?FSMLIMITS)).
-else. -else.
-define(SUPERVISOR_START, supervisor:start_child(ejabberd_s2s_in_sup, -define(SUPERVISOR_START, supervisor:start_child(ejabberd_s2s_in_sup,
[SockData, Opts])). [SockData, Opts])).
@ -131,7 +133,7 @@ start(SockData, Opts) ->
?SUPERVISOR_START. ?SUPERVISOR_START.
start_link(SockData, Opts) -> start_link(SockData, Opts) ->
gen_fsm:start_link(ejabberd_s2s_in, [SockData, Opts], ?FSMOPTS). p1_fsm:start_link(ejabberd_s2s_in, [SockData, Opts], ?FSMOPTS ++ ?FSMLIMITS).
socket_type() -> socket_type() ->
xml_stream. xml_stream.
@ -347,8 +349,9 @@ wait_for_feature_request({xmlstreamelement, El}, StateData) ->
error -> error ->
false false
end, end,
AllowRemoteHost = ejabberd_s2s:allow_host("", AuthDomain),
if if
AuthRes -> AuthRes andalso AllowRemoteHost ->
(StateData#state.sockmod):reset_stream( (StateData#state.sockmod):reset_stream(
StateData#state.socket), StateData#state.socket),
send_element(StateData, send_element(StateData,
@ -590,14 +593,7 @@ handle_sync_event(get_state_infos, _From, StateName, StateData) ->
catch catch
_:_ -> {unknown,unknown} _:_ -> {unknown,unknown}
end, end,
Domains = case StateData#state.authenticated of Domains = get_external_hosts(StateData),
true ->
[StateData#state.auth_domain];
false ->
Connections = StateData#state.connections,
[D || {{D, _}, established} <-
dict:to_list(Connections)]
end,
Infos = [ Infos = [
{direction, in}, {direction, in},
{statename, StateName}, {statename, StateName},
@ -656,9 +652,25 @@ handle_info(_, StateName, StateData) ->
%%---------------------------------------------------------------------- %%----------------------------------------------------------------------
terminate(Reason, _StateName, StateData) -> terminate(Reason, _StateName, StateData) ->
?DEBUG("terminated: ~p", [Reason]), ?DEBUG("terminated: ~p", [Reason]),
case Reason of
{process_limit, _} ->
[ejabberd_s2s:external_host_overloaded(Host) || Host <- get_external_hosts(StateData)];
_ ->
ok
end,
(StateData#state.sockmod):close(StateData#state.socket), (StateData#state.sockmod):close(StateData#state.socket),
ok. ok.
get_external_hosts(StateData) ->
case StateData#state.authenticated of
true ->
[StateData#state.auth_domain];
false ->
Connections = StateData#state.connections,
[D || {{D, _}, established} <- dict:to_list(Connections)]
end.
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
%%% Internal functions %%% Internal functions
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------