From 9fc203ee6d93d2d630f5e427678df5fa2dcd7e8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micka=C3=ABl=20R=C3=A9mond?= Date: Fri, 14 Sep 2007 14:15:44 +0000 Subject: [PATCH] open up to 3 s2s outgoing connection per domain pair SVN Revision: 928 --- src/ejabberd_s2s.erl | 121 +++++++++++++++++++++++---------------- src/ejabberd_s2s_in.erl | 31 +++++----- src/ejabberd_s2s_out.erl | 13 ++--- 3 files changed, 93 insertions(+), 72 deletions(-) diff --git a/src/ejabberd_s2s.erl b/src/ejabberd_s2s.erl index c9e40efbc..e20870272 100644 --- a/src/ejabberd_s2s.erl +++ b/src/ejabberd_s2s.erl @@ -3,12 +3,12 @@ %%% Author : Alexey Shchepin %%% Purpose : S2S connections manager %%% Created : 7 Dec 2002 by Alexey Shchepin -%%% Id : $Id$ +%%% Id : $Id: ejabberd_s2s.erl 820 2007-07-19 21:17:13Z mremond $ %%%---------------------------------------------------------------------- -module(ejabberd_s2s). -author('alexey@sevcom.net'). --vsn('$Revision$ '). +-vsn('$Revision: 820 $ '). -behaviour(gen_server). @@ -16,9 +16,8 @@ -export([start_link/0, route/3, have_connection/1, - get_key/1, + has_key/2, try_register/1, - remove_connection/1, remove_connection/3, dirty_get_connections/0, allow_host/2, @@ -55,14 +54,9 @@ route(From, To, Packet) -> ok end. -remove_connection(FromTo) -> - F = fun() -> - mnesia:delete({s2s, FromTo}) - end, - mnesia:transaction(F). - remove_connection(FromTo, Pid, Key) -> - case catch mnesia:dirty_read(s2s, FromTo) of + ?ERROR_MSG("XXXXXXXXXXX ~p~n", [{FromTo, Pid, Key}]), + case catch mnesia:dirty_match_object(s2s, {s2s, FromTo, Pid, '_'}) of [#s2s{pid = Pid, key = Key}] -> F = fun() -> mnesia:delete_object(#s2s{fromto = FromTo, @@ -82,23 +76,27 @@ have_connection(FromTo) -> false end. -get_key(FromTo) -> - case catch mnesia:dirty_read(s2s, FromTo) of - [E] -> - E#s2s.key; +has_key(FromTo, Key) -> + case mnesia:dirty_select(s2s, + [{#s2s{fromto = FromTo, key = Key, _ = '_'}, + [], + ['$_']}]) of + [] -> + false; _ -> - error + true end. try_register(FromTo) -> Key = randoms:get_string(), + Max_S2S_Connexions_Number = 3, F = fun() -> case mnesia:read({s2s, FromTo}) of - [] -> - mnesia:write(#s2s{fromto = FromTo, - pid = self(), - key = Key}), - {key, Key}; + L when length(L) < Max_S2S_Connexions_Number -> + mnesia:write(#s2s{fromto = FromTo, + pid = self(), + key = Key}), + {key, Key}; _ -> false end @@ -126,9 +124,10 @@ dirty_get_connections() -> %%-------------------------------------------------------------------- init([]) -> update_tables(), - mnesia:create_table(s2s, [{ram_copies, [node()]}, + mnesia:create_table(s2s, [{ram_copies, [node()]}, {type, bag}, {attributes, record_info(fields, s2s)}]), mnesia:add_table_copy(s2s, node(), ram_copies), + mnesia:add_table_index(s2s, key), mnesia:subscribe(system), ejabberd_ctl:register_commands( [{"incoming-s2s-number", "print number of incoming s2s connections on the node"}, @@ -224,7 +223,7 @@ do_route(From, To, Packet) -> Attrs), send_element(Pid, {xmlelement, Name, NewAttrs, Els}), ok; - {aborted, Reason} -> + {aborted, _Reason} -> case xml:get_tag_attr_s("type", Packet) of "error" -> ok; "result" -> ok; @@ -240,6 +239,8 @@ find_connection(From, To) -> #jid{lserver = MyServer} = From, #jid{lserver = Server} = To, FromTo = {MyServer, Server}, + Max_S2S_Connexions_Number = 3, + ?ERROR_MSG("XXX Finding connection for ~p~n", [FromTo]), case catch mnesia:dirty_read(s2s, FromTo) of {'EXIT', Reason} -> {aborted, Reason}; @@ -250,36 +251,49 @@ find_connection(From, To) -> case {is_service(From, To), allow_host(MyServer, Server)} of {false, true} -> - ?DEBUG("starting new s2s connection~n", []), - Key = randoms:get_string(), - {ok, Pid} = ejabberd_s2s_out:start( - MyServer, Server, {new, Key}), - F = fun() -> - case mnesia:read({s2s, FromTo}) of - [El] -> - El#s2s.pid; - [] -> - mnesia:write(#s2s{fromto = FromTo, - pid = Pid, - key = Key}), - Pid - end - end, - TRes = mnesia:transaction(F), - case TRes of - {atomic, Pid} -> - ejabberd_s2s_out:start_connection(Pid); - _ -> - ejabberd_s2s_out:stop_connection(Pid) - end, - TRes; + new_connection(MyServer, Server, From, FromTo, Max_S2S_Connexions_Number); _ -> {aborted, error} end; - [El] -> - {atomic, El#s2s.pid} + L when is_list(L) , length(L) < Max_S2S_Connexions_Number -> + %% We establish another connection for this pair. + new_connection(MyServer, Server, From, FromTo, Max_S2S_Connexions_Number); + L when is_list(L) -> + %% We choose a connexion from the pool of opened ones. + {atomic, choose_connection(From, L)} end. +choose_connection(From, Connections) -> + El = lists:nth(erlang:phash(From, length(Connections)), Connections), + %El = lists:nth(random:uniform(length(Connections)), Connections), + ?ERROR_MSG("XXX using ejabberd_s2s_out ~p~n", [El#s2s.pid]), + El#s2s.pid. + +new_connection(MyServer, Server, From, FromTo, Max_S2S_Connexions_Number) -> + Key = randoms:get_string(), + {ok, Pid} = ejabberd_s2s_out:start( + MyServer, Server, {new, Key}), + F = fun() -> + case mnesia:read({s2s, FromTo}) of + L when length(L) < Max_S2S_Connexions_Number -> + mnesia:write(#s2s{fromto = FromTo, + pid = Pid, + key = Key}), + ?ERROR_MSG("XXX new s2s connection started ~p~n", [Pid]), + Pid; + L -> + choose_connection(From, L) + end + end, + TRes = mnesia:transaction(F), + case TRes of + {atomic, Pid} -> + ejabberd_s2s_out:start_connection(Pid); + _ -> + ejabberd_s2s_out:stop_connection(Pid) + end, + TRes. + %%-------------------------------------------------------------------- %% Function: is_service(From, To) -> true | false @@ -321,6 +335,15 @@ ctl_process(Val, _Args) -> Val. update_tables() -> + case catch mnesia:table_info(s2s, type) of + bag -> + ok; + {'EXIT', _} -> + ok; + _ -> + % XXX TODO convert it ? + mnesia:delete_table(s2s) + end, case catch mnesia:table_info(s2s, attributes) of [fromto, node, key] -> mnesia:transform_table(s2s, ignore, [fromto, pid, key]), @@ -349,5 +372,3 @@ allow_host(MyServer, S2SHost) -> _ -> true %% The default s2s policy is allow end end. - - diff --git a/src/ejabberd_s2s_in.erl b/src/ejabberd_s2s_in.erl index 969be69d9..81cdf665f 100644 --- a/src/ejabberd_s2s_in.erl +++ b/src/ejabberd_s2s_in.erl @@ -3,7 +3,7 @@ %%% Author : Alexey Shchepin %%% Purpose : Serve incoming s2s connection %%% Created : 6 Dec 2002 by Alexey Shchepin -%%% Id : $Id$ +%%% Id : $Id: ejabberd_s2s_in.erl 820 2007-07-19 21:17:13Z mremond $ %%%---------------------------------------------------------------------- -module(ejabberd_s2s_in). @@ -74,7 +74,7 @@ -define(HOST_UNKNOWN_ERR, xml:element_to_string(?SERR_HOST_UNKNOWN)). --define(INVALID_FROM_ERR, +-define(INVALID_FROM_ERR, xml:element_to_string(?SERR_INVALID_FROM)). -define(INVALID_XML_ERR, @@ -101,7 +101,7 @@ socket_type() -> %% Returns: {ok, StateName, StateData} | %% {ok, StateName, StateData, Timeout} | %% ignore | -%% {stop, StopReason} +%% {stop, StopReason} %%---------------------------------------------------------------------- init([{SockMod, Socket}, Opts]) -> ?INFO_MSG("started: ~p", [Socket]), @@ -136,7 +136,7 @@ init([{SockMod, Socket}, Opts]) -> %% Func: StateName/2 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | -%% {stop, Reason, NewStateData} +%% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) -> @@ -312,8 +312,8 @@ stream_established({xmlstreamelement, El}, StateData) -> ?INFO_MSG("GET KEY: ~p", [{To, From, Id, Key}]), LTo = jlib:nameprep(To), LFrom = jlib:nameprep(From), - %% Checks if the from domain is allowed and if the to - %% domain is handled by this server: + %% Checks if the from domain is allowed and if the to + %% domain is handled by this server: case {ejabberd_s2s:allow_host(To, From), lists:member(LTo, ejabberd_router:dirty_get_all_domains())} of {true, true} -> @@ -338,10 +338,13 @@ stream_established({xmlstreamelement, El}, StateData) -> ?INFO_MSG("VERIFY KEY: ~p", [{To, From, Id, Key}]), LTo = jlib:nameprep(To), LFrom = jlib:nameprep(From), - Key1 = ejabberd_s2s:get_key({LTo, LFrom}), - Type = if Key == Key1 -> "valid"; - true -> "invalid" + Type = case ejabberd_s2s:has_key({LTo, LFrom}, Key) of + true -> "valid"; + _ -> "invalid" end, + %Type = if Key == Key1 -> "valid"; + % true -> "invalid" + % end, send_element(StateData, {xmlelement, "db:verify", @@ -456,7 +459,7 @@ stream_established(closed, StateData) -> %% {reply, Reply, NextStateName, NextStateData} | %% {reply, Reply, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} | -%% {stop, Reason, Reply, NewStateData} +%% {stop, Reason, Reply, NewStateData} %%---------------------------------------------------------------------- %state_name(Event, From, StateData) -> % Reply = ok, @@ -466,7 +469,7 @@ stream_established(closed, StateData) -> %% Func: handle_event/3 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | -%% {stop, Reason, NewStateData} +%% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- handle_event(_Event, StateName, StateData) -> {next_state, StateName, StateData}. @@ -478,7 +481,7 @@ handle_event(_Event, StateName, StateData) -> %% {reply, Reply, NextStateName, NextStateData} | %% {reply, Reply, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} | -%% {stop, Reason, Reply, NewStateData} +%% {stop, Reason, Reply, NewStateData} %%---------------------------------------------------------------------- handle_sync_event(_Event, _From, StateName, StateData) -> Reply = ok, @@ -491,7 +494,7 @@ code_change(_OldVsn, StateName, StateData, _Extra) -> %% Func: handle_info/3 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | -%% {stop, Reason, NewStateData} +%% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- handle_info({send_text, Text}, StateName, StateData) -> send_text(StateData, Text), @@ -677,5 +680,3 @@ match_labels([DL | DLabels], [PL | PLabels]) -> false -> false end. - - diff --git a/src/ejabberd_s2s_out.erl b/src/ejabberd_s2s_out.erl index 363f03120..8bf756a16 100644 --- a/src/ejabberd_s2s_out.erl +++ b/src/ejabberd_s2s_out.erl @@ -107,7 +107,7 @@ stop_connection(Pid) -> %% Returns: {ok, StateName, StateData} | %% {ok, StateName, StateData, Timeout} | %% ignore | -%% {stop, StopReason} +%% {stop, StopReason} %%---------------------------------------------------------------------- init([From, Server, Type]) -> process_flag(trap_exit, true), @@ -146,7 +146,7 @@ init([From, Server, Type]) -> %% Func: StateName/2 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | -%% {stop, Reason, NewStateData} +%% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- open_socket(init, StateData) -> ?INFO_MSG("open_socket: ~p", [{StateData#state.myname, @@ -583,7 +583,7 @@ stream_established(closed, StateData) -> %% {reply, Reply, NextStateName, NextStateData} | %% {reply, Reply, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} | -%% {stop, Reason, Reply, NewStateData} +%% {stop, Reason, Reply, NewStateData} %%---------------------------------------------------------------------- %state_name(Event, From, StateData) -> % Reply = ok, @@ -593,7 +593,7 @@ stream_established(closed, StateData) -> %% Func: handle_event/3 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | -%% {stop, Reason, NewStateData} +%% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- handle_event(Event, StateName, StateData) -> {next_state, StateName, StateData}. @@ -605,7 +605,7 @@ handle_event(Event, StateName, StateData) -> %% {reply, Reply, NextStateName, NextStateData} | %% {reply, Reply, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} | -%% {stop, Reason, Reply, NewStateData} +%% {stop, Reason, Reply, NewStateData} %%---------------------------------------------------------------------- handle_sync_event(Event, From, StateName, StateData) -> Reply = ok, @@ -618,7 +618,7 @@ code_change(OldVsn, StateName, StateData, Extra) -> %% Func: handle_info/3 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | -%% {stop, Reason, NewStateData} +%% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- handle_info({send_text, Text}, StateName, StateData) -> send_text(StateData, Text), @@ -848,4 +848,3 @@ test_get_addr_port(Server) -> lists:keyreplace(HostPort, 1, Acc, {HostPort, Num + 1}) end end, [], lists:seq(1, 100000)). -