mirror of
https://github.com/processone/ejabberd.git
synced 2024-10-09 15:06:54 +02:00
open up to 3 s2s outgoing connection per domain pair
SVN Revision: 928
This commit is contained in:
parent
e809c6c322
commit
9fc203ee6d
@ -3,12 +3,12 @@
|
||||
%%% Author : Alexey Shchepin <alexey@sevcom.net>
|
||||
%%% Purpose : S2S connections manager
|
||||
%%% Created : 7 Dec 2002 by Alexey Shchepin <alexey@sevcom.net>
|
||||
%%% Id : $Id$
|
||||
%%% Id : $Id: ejabberd_s2s.erl 820 2007-07-19 21:17:13Z mremond $
|
||||
%%%----------------------------------------------------------------------
|
||||
|
||||
-module(ejabberd_s2s).
|
||||
-author('alexey@sevcom.net').
|
||||
-vsn('$Revision$ ').
|
||||
-vsn('$Revision: 820 $ ').
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
@ -16,9 +16,8 @@
|
||||
-export([start_link/0,
|
||||
route/3,
|
||||
have_connection/1,
|
||||
get_key/1,
|
||||
has_key/2,
|
||||
try_register/1,
|
||||
remove_connection/1,
|
||||
remove_connection/3,
|
||||
dirty_get_connections/0,
|
||||
allow_host/2,
|
||||
@ -55,14 +54,9 @@ route(From, To, Packet) ->
|
||||
ok
|
||||
end.
|
||||
|
||||
remove_connection(FromTo) ->
|
||||
F = fun() ->
|
||||
mnesia:delete({s2s, FromTo})
|
||||
end,
|
||||
mnesia:transaction(F).
|
||||
|
||||
remove_connection(FromTo, Pid, Key) ->
|
||||
case catch mnesia:dirty_read(s2s, FromTo) of
|
||||
?ERROR_MSG("XXXXXXXXXXX ~p~n", [{FromTo, Pid, Key}]),
|
||||
case catch mnesia:dirty_match_object(s2s, {s2s, FromTo, Pid, '_'}) of
|
||||
[#s2s{pid = Pid, key = Key}] ->
|
||||
F = fun() ->
|
||||
mnesia:delete_object(#s2s{fromto = FromTo,
|
||||
@ -82,23 +76,27 @@ have_connection(FromTo) ->
|
||||
false
|
||||
end.
|
||||
|
||||
get_key(FromTo) ->
|
||||
case catch mnesia:dirty_read(s2s, FromTo) of
|
||||
[E] ->
|
||||
E#s2s.key;
|
||||
has_key(FromTo, Key) ->
|
||||
case mnesia:dirty_select(s2s,
|
||||
[{#s2s{fromto = FromTo, key = Key, _ = '_'},
|
||||
[],
|
||||
['$_']}]) of
|
||||
[] ->
|
||||
false;
|
||||
_ ->
|
||||
error
|
||||
true
|
||||
end.
|
||||
|
||||
try_register(FromTo) ->
|
||||
Key = randoms:get_string(),
|
||||
Max_S2S_Connexions_Number = 3,
|
||||
F = fun() ->
|
||||
case mnesia:read({s2s, FromTo}) of
|
||||
[] ->
|
||||
mnesia:write(#s2s{fromto = FromTo,
|
||||
pid = self(),
|
||||
key = Key}),
|
||||
{key, Key};
|
||||
L when length(L) < Max_S2S_Connexions_Number ->
|
||||
mnesia:write(#s2s{fromto = FromTo,
|
||||
pid = self(),
|
||||
key = Key}),
|
||||
{key, Key};
|
||||
_ ->
|
||||
false
|
||||
end
|
||||
@ -126,9 +124,10 @@ dirty_get_connections() ->
|
||||
%%--------------------------------------------------------------------
|
||||
init([]) ->
|
||||
update_tables(),
|
||||
mnesia:create_table(s2s, [{ram_copies, [node()]},
|
||||
mnesia:create_table(s2s, [{ram_copies, [node()]}, {type, bag},
|
||||
{attributes, record_info(fields, s2s)}]),
|
||||
mnesia:add_table_copy(s2s, node(), ram_copies),
|
||||
mnesia:add_table_index(s2s, key),
|
||||
mnesia:subscribe(system),
|
||||
ejabberd_ctl:register_commands(
|
||||
[{"incoming-s2s-number", "print number of incoming s2s connections on the node"},
|
||||
@ -224,7 +223,7 @@ do_route(From, To, Packet) ->
|
||||
Attrs),
|
||||
send_element(Pid, {xmlelement, Name, NewAttrs, Els}),
|
||||
ok;
|
||||
{aborted, Reason} ->
|
||||
{aborted, _Reason} ->
|
||||
case xml:get_tag_attr_s("type", Packet) of
|
||||
"error" -> ok;
|
||||
"result" -> ok;
|
||||
@ -240,6 +239,8 @@ find_connection(From, To) ->
|
||||
#jid{lserver = MyServer} = From,
|
||||
#jid{lserver = Server} = To,
|
||||
FromTo = {MyServer, Server},
|
||||
Max_S2S_Connexions_Number = 3,
|
||||
?ERROR_MSG("XXX Finding connection for ~p~n", [FromTo]),
|
||||
case catch mnesia:dirty_read(s2s, FromTo) of
|
||||
{'EXIT', Reason} ->
|
||||
{aborted, Reason};
|
||||
@ -250,36 +251,49 @@ find_connection(From, To) ->
|
||||
case {is_service(From, To),
|
||||
allow_host(MyServer, Server)} of
|
||||
{false, true} ->
|
||||
?DEBUG("starting new s2s connection~n", []),
|
||||
Key = randoms:get_string(),
|
||||
{ok, Pid} = ejabberd_s2s_out:start(
|
||||
MyServer, Server, {new, Key}),
|
||||
F = fun() ->
|
||||
case mnesia:read({s2s, FromTo}) of
|
||||
[El] ->
|
||||
El#s2s.pid;
|
||||
[] ->
|
||||
mnesia:write(#s2s{fromto = FromTo,
|
||||
pid = Pid,
|
||||
key = Key}),
|
||||
Pid
|
||||
end
|
||||
end,
|
||||
TRes = mnesia:transaction(F),
|
||||
case TRes of
|
||||
{atomic, Pid} ->
|
||||
ejabberd_s2s_out:start_connection(Pid);
|
||||
_ ->
|
||||
ejabberd_s2s_out:stop_connection(Pid)
|
||||
end,
|
||||
TRes;
|
||||
new_connection(MyServer, Server, From, FromTo, Max_S2S_Connexions_Number);
|
||||
_ ->
|
||||
{aborted, error}
|
||||
end;
|
||||
[El] ->
|
||||
{atomic, El#s2s.pid}
|
||||
L when is_list(L) , length(L) < Max_S2S_Connexions_Number ->
|
||||
%% We establish another connection for this pair.
|
||||
new_connection(MyServer, Server, From, FromTo, Max_S2S_Connexions_Number);
|
||||
L when is_list(L) ->
|
||||
%% We choose a connexion from the pool of opened ones.
|
||||
{atomic, choose_connection(From, L)}
|
||||
end.
|
||||
|
||||
choose_connection(From, Connections) ->
|
||||
El = lists:nth(erlang:phash(From, length(Connections)), Connections),
|
||||
%El = lists:nth(random:uniform(length(Connections)), Connections),
|
||||
?ERROR_MSG("XXX using ejabberd_s2s_out ~p~n", [El#s2s.pid]),
|
||||
El#s2s.pid.
|
||||
|
||||
new_connection(MyServer, Server, From, FromTo, Max_S2S_Connexions_Number) ->
|
||||
Key = randoms:get_string(),
|
||||
{ok, Pid} = ejabberd_s2s_out:start(
|
||||
MyServer, Server, {new, Key}),
|
||||
F = fun() ->
|
||||
case mnesia:read({s2s, FromTo}) of
|
||||
L when length(L) < Max_S2S_Connexions_Number ->
|
||||
mnesia:write(#s2s{fromto = FromTo,
|
||||
pid = Pid,
|
||||
key = Key}),
|
||||
?ERROR_MSG("XXX new s2s connection started ~p~n", [Pid]),
|
||||
Pid;
|
||||
L ->
|
||||
choose_connection(From, L)
|
||||
end
|
||||
end,
|
||||
TRes = mnesia:transaction(F),
|
||||
case TRes of
|
||||
{atomic, Pid} ->
|
||||
ejabberd_s2s_out:start_connection(Pid);
|
||||
_ ->
|
||||
ejabberd_s2s_out:stop_connection(Pid)
|
||||
end,
|
||||
TRes.
|
||||
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Function: is_service(From, To) -> true | false
|
||||
@ -321,6 +335,15 @@ ctl_process(Val, _Args) ->
|
||||
Val.
|
||||
|
||||
update_tables() ->
|
||||
case catch mnesia:table_info(s2s, type) of
|
||||
bag ->
|
||||
ok;
|
||||
{'EXIT', _} ->
|
||||
ok;
|
||||
_ ->
|
||||
% XXX TODO convert it ?
|
||||
mnesia:delete_table(s2s)
|
||||
end,
|
||||
case catch mnesia:table_info(s2s, attributes) of
|
||||
[fromto, node, key] ->
|
||||
mnesia:transform_table(s2s, ignore, [fromto, pid, key]),
|
||||
@ -349,5 +372,3 @@ allow_host(MyServer, S2SHost) ->
|
||||
_ -> true %% The default s2s policy is allow
|
||||
end
|
||||
end.
|
||||
|
||||
|
||||
|
@ -3,7 +3,7 @@
|
||||
%%% Author : Alexey Shchepin <alexey@sevcom.net>
|
||||
%%% Purpose : Serve incoming s2s connection
|
||||
%%% Created : 6 Dec 2002 by Alexey Shchepin <alexey@sevcom.net>
|
||||
%%% Id : $Id$
|
||||
%%% Id : $Id: ejabberd_s2s_in.erl 820 2007-07-19 21:17:13Z mremond $
|
||||
%%%----------------------------------------------------------------------
|
||||
|
||||
-module(ejabberd_s2s_in).
|
||||
@ -74,7 +74,7 @@
|
||||
-define(HOST_UNKNOWN_ERR,
|
||||
xml:element_to_string(?SERR_HOST_UNKNOWN)).
|
||||
|
||||
-define(INVALID_FROM_ERR,
|
||||
-define(INVALID_FROM_ERR,
|
||||
xml:element_to_string(?SERR_INVALID_FROM)).
|
||||
|
||||
-define(INVALID_XML_ERR,
|
||||
@ -101,7 +101,7 @@ socket_type() ->
|
||||
%% Returns: {ok, StateName, StateData} |
|
||||
%% {ok, StateName, StateData, Timeout} |
|
||||
%% ignore |
|
||||
%% {stop, StopReason}
|
||||
%% {stop, StopReason}
|
||||
%%----------------------------------------------------------------------
|
||||
init([{SockMod, Socket}, Opts]) ->
|
||||
?INFO_MSG("started: ~p", [Socket]),
|
||||
@ -136,7 +136,7 @@ init([{SockMod, Socket}, Opts]) ->
|
||||
%% Func: StateName/2
|
||||
%% Returns: {next_state, NextStateName, NextStateData} |
|
||||
%% {next_state, NextStateName, NextStateData, Timeout} |
|
||||
%% {stop, Reason, NewStateData}
|
||||
%% {stop, Reason, NewStateData}
|
||||
%%----------------------------------------------------------------------
|
||||
|
||||
wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) ->
|
||||
@ -312,8 +312,8 @@ stream_established({xmlstreamelement, El}, StateData) ->
|
||||
?INFO_MSG("GET KEY: ~p", [{To, From, Id, Key}]),
|
||||
LTo = jlib:nameprep(To),
|
||||
LFrom = jlib:nameprep(From),
|
||||
%% Checks if the from domain is allowed and if the to
|
||||
%% domain is handled by this server:
|
||||
%% Checks if the from domain is allowed and if the to
|
||||
%% domain is handled by this server:
|
||||
case {ejabberd_s2s:allow_host(To, From),
|
||||
lists:member(LTo, ejabberd_router:dirty_get_all_domains())} of
|
||||
{true, true} ->
|
||||
@ -338,10 +338,13 @@ stream_established({xmlstreamelement, El}, StateData) ->
|
||||
?INFO_MSG("VERIFY KEY: ~p", [{To, From, Id, Key}]),
|
||||
LTo = jlib:nameprep(To),
|
||||
LFrom = jlib:nameprep(From),
|
||||
Key1 = ejabberd_s2s:get_key({LTo, LFrom}),
|
||||
Type = if Key == Key1 -> "valid";
|
||||
true -> "invalid"
|
||||
Type = case ejabberd_s2s:has_key({LTo, LFrom}, Key) of
|
||||
true -> "valid";
|
||||
_ -> "invalid"
|
||||
end,
|
||||
%Type = if Key == Key1 -> "valid";
|
||||
% true -> "invalid"
|
||||
% end,
|
||||
send_element(StateData,
|
||||
{xmlelement,
|
||||
"db:verify",
|
||||
@ -456,7 +459,7 @@ stream_established(closed, StateData) ->
|
||||
%% {reply, Reply, NextStateName, NextStateData} |
|
||||
%% {reply, Reply, NextStateName, NextStateData, Timeout} |
|
||||
%% {stop, Reason, NewStateData} |
|
||||
%% {stop, Reason, Reply, NewStateData}
|
||||
%% {stop, Reason, Reply, NewStateData}
|
||||
%%----------------------------------------------------------------------
|
||||
%state_name(Event, From, StateData) ->
|
||||
% Reply = ok,
|
||||
@ -466,7 +469,7 @@ stream_established(closed, StateData) ->
|
||||
%% Func: handle_event/3
|
||||
%% Returns: {next_state, NextStateName, NextStateData} |
|
||||
%% {next_state, NextStateName, NextStateData, Timeout} |
|
||||
%% {stop, Reason, NewStateData}
|
||||
%% {stop, Reason, NewStateData}
|
||||
%%----------------------------------------------------------------------
|
||||
handle_event(_Event, StateName, StateData) ->
|
||||
{next_state, StateName, StateData}.
|
||||
@ -478,7 +481,7 @@ handle_event(_Event, StateName, StateData) ->
|
||||
%% {reply, Reply, NextStateName, NextStateData} |
|
||||
%% {reply, Reply, NextStateName, NextStateData, Timeout} |
|
||||
%% {stop, Reason, NewStateData} |
|
||||
%% {stop, Reason, Reply, NewStateData}
|
||||
%% {stop, Reason, Reply, NewStateData}
|
||||
%%----------------------------------------------------------------------
|
||||
handle_sync_event(_Event, _From, StateName, StateData) ->
|
||||
Reply = ok,
|
||||
@ -491,7 +494,7 @@ code_change(_OldVsn, StateName, StateData, _Extra) ->
|
||||
%% Func: handle_info/3
|
||||
%% Returns: {next_state, NextStateName, NextStateData} |
|
||||
%% {next_state, NextStateName, NextStateData, Timeout} |
|
||||
%% {stop, Reason, NewStateData}
|
||||
%% {stop, Reason, NewStateData}
|
||||
%%----------------------------------------------------------------------
|
||||
handle_info({send_text, Text}, StateName, StateData) ->
|
||||
send_text(StateData, Text),
|
||||
@ -677,5 +680,3 @@ match_labels([DL | DLabels], [PL | PLabels]) ->
|
||||
false ->
|
||||
false
|
||||
end.
|
||||
|
||||
|
||||
|
@ -107,7 +107,7 @@ stop_connection(Pid) ->
|
||||
%% Returns: {ok, StateName, StateData} |
|
||||
%% {ok, StateName, StateData, Timeout} |
|
||||
%% ignore |
|
||||
%% {stop, StopReason}
|
||||
%% {stop, StopReason}
|
||||
%%----------------------------------------------------------------------
|
||||
init([From, Server, Type]) ->
|
||||
process_flag(trap_exit, true),
|
||||
@ -146,7 +146,7 @@ init([From, Server, Type]) ->
|
||||
%% Func: StateName/2
|
||||
%% Returns: {next_state, NextStateName, NextStateData} |
|
||||
%% {next_state, NextStateName, NextStateData, Timeout} |
|
||||
%% {stop, Reason, NewStateData}
|
||||
%% {stop, Reason, NewStateData}
|
||||
%%----------------------------------------------------------------------
|
||||
open_socket(init, StateData) ->
|
||||
?INFO_MSG("open_socket: ~p", [{StateData#state.myname,
|
||||
@ -583,7 +583,7 @@ stream_established(closed, StateData) ->
|
||||
%% {reply, Reply, NextStateName, NextStateData} |
|
||||
%% {reply, Reply, NextStateName, NextStateData, Timeout} |
|
||||
%% {stop, Reason, NewStateData} |
|
||||
%% {stop, Reason, Reply, NewStateData}
|
||||
%% {stop, Reason, Reply, NewStateData}
|
||||
%%----------------------------------------------------------------------
|
||||
%state_name(Event, From, StateData) ->
|
||||
% Reply = ok,
|
||||
@ -593,7 +593,7 @@ stream_established(closed, StateData) ->
|
||||
%% Func: handle_event/3
|
||||
%% Returns: {next_state, NextStateName, NextStateData} |
|
||||
%% {next_state, NextStateName, NextStateData, Timeout} |
|
||||
%% {stop, Reason, NewStateData}
|
||||
%% {stop, Reason, NewStateData}
|
||||
%%----------------------------------------------------------------------
|
||||
handle_event(Event, StateName, StateData) ->
|
||||
{next_state, StateName, StateData}.
|
||||
@ -605,7 +605,7 @@ handle_event(Event, StateName, StateData) ->
|
||||
%% {reply, Reply, NextStateName, NextStateData} |
|
||||
%% {reply, Reply, NextStateName, NextStateData, Timeout} |
|
||||
%% {stop, Reason, NewStateData} |
|
||||
%% {stop, Reason, Reply, NewStateData}
|
||||
%% {stop, Reason, Reply, NewStateData}
|
||||
%%----------------------------------------------------------------------
|
||||
handle_sync_event(Event, From, StateName, StateData) ->
|
||||
Reply = ok,
|
||||
@ -618,7 +618,7 @@ code_change(OldVsn, StateName, StateData, Extra) ->
|
||||
%% Func: handle_info/3
|
||||
%% Returns: {next_state, NextStateName, NextStateData} |
|
||||
%% {next_state, NextStateName, NextStateData, Timeout} |
|
||||
%% {stop, Reason, NewStateData}
|
||||
%% {stop, Reason, NewStateData}
|
||||
%%----------------------------------------------------------------------
|
||||
handle_info({send_text, Text}, StateName, StateData) ->
|
||||
send_text(StateData, Text),
|
||||
@ -848,4 +848,3 @@ test_get_addr_port(Server) ->
|
||||
lists:keyreplace(HostPort, 1, Acc, {HostPort, Num + 1})
|
||||
end
|
||||
end, [], lists:seq(1, 100000)).
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user