diff --git a/ChangeLog b/ChangeLog index 349072242..37539a77e 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,10 +1,16 @@ +2007-09-25 Alexey Shchepin + + * src/ejabberd_s2s.erl: Max number of connections and max number + of connections per node now can be specified via + max_s2s_connections and max_s2s_connections_per_node acl rules + 2007-09-14 Mickael Remond * src/ejabberd_s2s_out.erl: Changed to actual p1_fsm behaviour. It was working correctly as the API of p1_fsm and gen_fsm is strictly the same. - * src/ejabberd_s2_in.erl: Added debug hook for s2s loop (EJAB-358). + * src/ejabberd_s2s_in.erl: Added debug hook for s2s loop (EJAB-358). * src/ejabberd_c2s.erl: Added debug hook for c2s loop (EJAB-358). diff --git a/src/ejabberd_s2s.erl b/src/ejabberd_s2s.erl index 8ebda1cf6..96e8e14de 100644 --- a/src/ejabberd_s2s.erl +++ b/src/ejabberd_s2s.erl @@ -3,36 +3,37 @@ %%% Author : Alexey Shchepin %%% Purpose : S2S connections manager %%% Created : 7 Dec 2002 by Alexey Shchepin -%%% Id : $Id: ejabberd_s2s.erl 820 2007-07-19 21:17:13Z mremond $ +%%% Id : $Id$ %%%---------------------------------------------------------------------- -module(ejabberd_s2s). -author('alexey@sevcom.net'). --vsn('$Revision: 820 $ '). +-vsn('$Revision$ '). -behaviour(gen_server). %% API -export([start_link/0, - route/3, - have_connection/1, - has_key/2, - try_register/1, - remove_connection/3, - dirty_get_connections/0, - allow_host/2, - ctl_process/2 - ]). + route/3, + have_connection/1, + has_key/2, + try_register/1, + remove_connection/3, + dirty_get_connections/0, + allow_host/2, + ctl_process/2 + ]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3]). -include("ejabberd.hrl"). -include("jlib.hrl"). -include("ejabberd_ctl.hrl"). --define(DEFAULT_MAX_S2S_CONNEXIONS_NUMBER, 3). +-define(DEFAULT_MAX_S2S_CONNECTIONS_NUMBER, 1). +-define(DEFAULT_MAX_S2S_CONNECTIONS_NUMBER_PER_NODE, 1). -record(s2s, {fromto, pid, key}). -record(state, {}). @@ -49,64 +50,72 @@ start_link() -> route(From, To, Packet) -> case catch do_route(From, To, Packet) of - {'EXIT', Reason} -> - ?ERROR_MSG("~p~nwhen processing: ~p", - [Reason, {From, To, Packet}]); - _ -> - ok + {'EXIT', Reason} -> + ?ERROR_MSG("~p~nwhen processing: ~p", + [Reason, {From, To, Packet}]); + _ -> + ok end. remove_connection(FromTo, Pid, Key) -> - case catch mnesia:dirty_match_object(s2s, {s2s, FromTo, Pid, '_'}) of - [#s2s{pid = Pid, key = Key}] -> - F = fun() -> - mnesia:delete_object(#s2s{fromto = FromTo, - pid = Pid, - key = Key}) - end, - mnesia:transaction(F); - _ -> - ok + case catch mnesia:dirty_match_object(s2s, #s2s{fromto = FromTo, + pid = Pid, + _ = '_'}) of + [#s2s{pid = Pid, key = Key}] -> + F = fun() -> + mnesia:delete_object(#s2s{fromto = FromTo, + pid = Pid, + key = Key}) + end, + mnesia:transaction(F); + _ -> + ok end. have_connection(FromTo) -> case catch mnesia:dirty_read(s2s, FromTo) of - [_] -> - true; - _ -> - false + [_] -> + true; + _ -> + false end. has_key(FromTo, Key) -> case mnesia:dirty_select(s2s, - [{#s2s{fromto = FromTo, key = Key, _ = '_'}, - [], - ['$_']}]) of - [] -> - false; - _ -> - true + [{#s2s{fromto = FromTo, key = Key, _ = '_'}, + [], + ['$_']}]) of + [] -> + false; + _ -> + true end. try_register(FromTo) -> Key = randoms:get_string(), - Max_S2S_Connections_Number = max_s2s_connexions_number(element(1, FromTo)), + MaxS2SConnectionsNumber = max_s2s_connections_number(FromTo), + MaxS2SConnectionsNumberPerNode = + max_s2s_connections_number_per_node(FromTo), F = fun() -> - case mnesia:read({s2s, FromTo}) of - L when length(L) < Max_S2S_Connections_Number -> - mnesia:write(#s2s{fromto = FromTo, - pid = self(), - key = Key}), - {key, Key}; - _ -> - false - end - end, + L = mnesia:read({s2s, FromTo}), + NeededConnections = needed_connections_number( + L, MaxS2SConnectionsNumber, + MaxS2SConnectionsNumberPerNode), + if + NeededConnections > 0 -> + mnesia:write(#s2s{fromto = FromTo, + pid = self(), + key = Key}), + {key, Key}; + true -> + false + end + end, case mnesia:transaction(F) of - {atomic, Res} -> - Res; - _ -> - false + {atomic, Res} -> + Res; + _ -> + false end. dirty_get_connections() -> @@ -126,9 +135,8 @@ dirty_get_connections() -> init([]) -> update_tables(), mnesia:create_table(s2s, [{ram_copies, [node()]}, {type, bag}, - {attributes, record_info(fields, s2s)}]), + {attributes, record_info(fields, s2s)}]), mnesia:add_table_copy(s2s, node(), ram_copies), - mnesia:add_table_index(s2s, key), mnesia:subscribe(system), ejabberd_ctl:register_commands( [{"incoming-s2s-number", "print number of incoming s2s connections on the node"}, @@ -169,11 +177,11 @@ handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> {noreply, State}; handle_info({route, From, To, Packet}, State) -> case catch do_route(From, To, Packet) of - {'EXIT', Reason} -> - ?ERROR_MSG("~p~nwhen processing: ~p", - [Reason, {From, To, Packet}]); - _ -> - ok + {'EXIT', Reason} -> + ?ERROR_MSG("~p~nwhen processing: ~p", + [Reason, {From, To, Packet}]); + _ -> + ok end, {noreply, State}; handle_info(_Info, State) -> @@ -201,125 +209,162 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- clean_table_from_bad_node(Node) -> F = fun() -> - Es = mnesia:select( - s2s, - [{#s2s{pid = '$1', _ = '_'}, - [{'==', {node, '$1'}, Node}], - ['$_']}]), - lists:foreach(fun(E) -> - mnesia:delete_object(E) - end, Es) - end, + Es = mnesia:select( + s2s, + [{#s2s{pid = '$1', _ = '_'}, + [{'==', {node, '$1'}, Node}], + ['$_']}]), + lists:foreach(fun(E) -> + mnesia:delete_object(E) + end, Es) + end, mnesia:transaction(F). do_route(From, To, Packet) -> ?DEBUG("s2s manager~n\tfrom ~p~n\tto ~p~n\tpacket ~P~n", - [From, To, Packet, 8]), + [From, To, Packet, 8]), case find_connection(From, To) of - {atomic, Pid} when pid(Pid) -> - ?DEBUG("sending to process ~p~n", [Pid]), - {xmlelement, Name, Attrs, Els} = Packet, - NewAttrs = jlib:replace_from_to_attrs(jlib:jid_to_string(From), - jlib:jid_to_string(To), - Attrs), - send_element(Pid, {xmlelement, Name, NewAttrs, Els}), - ok; - {aborted, _Reason} -> - case xml:get_tag_attr_s("type", Packet) of - "error" -> ok; - "result" -> ok; - _ -> - Err = jlib:make_error_reply( - Packet, ?ERR_SERVICE_UNAVAILABLE), - ejabberd_router:route(To, From, Err) - end, - false + {atomic, Pid} when pid(Pid) -> + ?DEBUG("sending to process ~p~n", [Pid]), + {xmlelement, Name, Attrs, Els} = Packet, + NewAttrs = jlib:replace_from_to_attrs(jlib:jid_to_string(From), + jlib:jid_to_string(To), + Attrs), + send_element(Pid, {xmlelement, Name, NewAttrs, Els}), + ok; + {aborted, _Reason} -> + case xml:get_tag_attr_s("type", Packet) of + "error" -> ok; + "result" -> ok; + _ -> + Err = jlib:make_error_reply( + Packet, ?ERR_SERVICE_UNAVAILABLE), + ejabberd_router:route(To, From, Err) + end, + false end. find_connection(From, To) -> #jid{lserver = MyServer} = From, #jid{lserver = Server} = To, FromTo = {MyServer, Server}, - Max_S2S_Connections_Number = max_s2s_connexions_number(MyServer), - ?INFO_MSG("Finding connection for ~p~n", [FromTo]), + MaxS2SConnectionsNumber = max_s2s_connections_number(FromTo), + MaxS2SConnectionsNumberPerNode = + max_s2s_connections_number_per_node(FromTo), + ?DEBUG("Finding connection for ~p~n", [FromTo]), case catch mnesia:dirty_read(s2s, FromTo) of - {'EXIT', Reason} -> - {aborted, Reason}; - [] -> - %% We try to establish all the connections if the host is not a - %% service and if the s2s host is not blacklisted or - %% is in whitelist: - case not is_service(From, To) andalso allow_host(MyServer, Server) of - true -> - open_several_connections(Max_S2S_Connections_Number, MyServer, - Server, From, FromTo, Max_S2S_Connections_Number); - false -> - {aborted, error} - end; - L when is_list(L) , length(L) < Max_S2S_Connections_Number -> - %% We establish the missing connections for this pair. - open_several_connections(Max_S2S_Connections_Number-length(L), MyServer, - Server, From, FromTo, Max_S2S_Connections_Number); - L when is_list(L) -> - %% We choose a connexion from the pool of opened ones. - {atomic, choose_connection(From, L)} + {'EXIT', Reason} -> + {aborted, Reason}; + [] -> + %% We try to establish all the connections if the host is not a + %% service and if the s2s host is not blacklisted or + %% is in whitelist: + case not is_service(From, To) andalso allow_host(MyServer, Server) of + true -> + NeededConnections = needed_connections_number( + [], MaxS2SConnectionsNumber, + MaxS2SConnectionsNumberPerNode), + open_several_connections( + NeededConnections, MyServer, + Server, From, FromTo, + MaxS2SConnectionsNumber, MaxS2SConnectionsNumberPerNode); + false -> + {aborted, error} + end; + L when is_list(L) -> + NeededConnections = needed_connections_number( + L, MaxS2SConnectionsNumber, + MaxS2SConnectionsNumberPerNode), + if + NeededConnections > 0 -> + %% We establish the missing connections for this pair. + open_several_connections( + NeededConnections, MyServer, + Server, From, FromTo, + MaxS2SConnectionsNumber, MaxS2SConnectionsNumberPerNode); + true -> + %% We choose a connexion from the pool of opened ones. + {atomic, choose_connection(From, L)} + end end. choose_connection(From, Connections) -> - %El = lists:nth(random:uniform(length(Connections)), Connections), - % use sticky connections based on the full JID of the sender - Pid = case lists:nth(erlang:phash(From, length(Connections)), Connections) of - El when is_record(El, s2s) -> - El#s2s.pid; - P when is_pid(P) -> - P - end, - ?INFO_MSG("Using ejabberd_s2s_out ~p~n", [Pid]), + choose_pid(From, [C#s2s.pid || C <- Connections]). + +choose_pid(From, Pids) -> + Pids1 = case [P || P <- Pids, node(P) == node()] of + [] -> Pids; + Ps -> Ps + end, + %% use sticky connections based on the full JID of the sender + Pid = lists:nth(erlang:phash(From, length(Pids1)), Pids1), + ?DEBUG("Using ejabberd_s2s_out ~p~n", [Pid]), Pid. -open_several_connections(N, MyServer, Server, From, FromTo, Max_S2S_Connections_Number) -> - Connections_Result = [new_connection(MyServer, Server, From, FromTo, Max_S2S_Connections_Number) - || _N <- lists:seq(1, N)], - case [PID || {atomic, PID} <- Connections_Result] of +open_several_connections(N, MyServer, Server, From, FromTo, + MaxS2SConnectionsNumber, + MaxS2SConnectionsNumberPerNode) -> + ConnectionsResult = + [new_connection(MyServer, Server, From, FromTo, + MaxS2SConnectionsNumber, MaxS2SConnectionsNumberPerNode) + || _N <- lists:seq(1, N)], + case [PID || {atomic, PID} <- ConnectionsResult] of [] -> - hd(Connections_Result); + hd(ConnectionsResult); PIDs -> - {atomic, choose_connection(From, PIDs)} + {atomic, choose_pid(From, PIDs)} end. -new_connection(MyServer, Server, From, FromTo, Max_S2S_Connections_Number) -> +new_connection(MyServer, Server, From, FromTo, + MaxS2SConnectionsNumber, MaxS2SConnectionsNumberPerNode) -> Key = randoms:get_string(), {ok, Pid} = ejabberd_s2s_out:start( - MyServer, Server, {new, Key}), + MyServer, Server, {new, Key}), F = fun() -> - case mnesia:read({s2s, FromTo}) of - L when length(L) < Max_S2S_Connections_Number -> - mnesia:write(#s2s{fromto = FromTo, - pid = Pid, - key = Key}), - ?INFO_MSG("New s2s connection started ~p~n", [Pid]), - Pid; - L -> - choose_connection(From, L) - end - end, + L = mnesia:read({s2s, FromTo}), + NeededConnections = needed_connections_number( + L, MaxS2SConnectionsNumber, + MaxS2SConnectionsNumberPerNode), + if + NeededConnections > 0 -> + mnesia:write(#s2s{fromto = FromTo, + pid = Pid, + key = Key}), + ?INFO_MSG("New s2s connection started ~p", [Pid]), + Pid; + true -> + choose_connection(From, L) + end + end, TRes = mnesia:transaction(F), case TRes of - {atomic, Pid} -> - ejabberd_s2s_out:start_connection(Pid); - _ -> - ejabberd_s2s_out:stop_connection(Pid) + {atomic, Pid} -> + ejabberd_s2s_out:start_connection(Pid); + _ -> + ejabberd_s2s_out:stop_connection(Pid) end, TRes. -max_s2s_connexions_number(Host) -> - case ejabberd_config:get_local_option({max_s2s_connexions_number, Host}) of - N when is_integer(N) -> - N; - _ -> - ?DEFAULT_MAX_S2S_CONNEXIONS_NUMBER +max_s2s_connections_number({From, To}) -> + case acl:match_rule( + From, max_s2s_connections, jlib:make_jid("", To, "")) of + Max when is_integer(Max) -> Max; + _ -> ?DEFAULT_MAX_S2S_CONNECTIONS_NUMBER end. +max_s2s_connections_number_per_node({From, To}) -> + case acl:match_rule( + From, max_s2s_connections_per_node, jlib:make_jid("", To, "")) of + Max when is_integer(Max) -> Max; + _ -> ?DEFAULT_MAX_S2S_CONNECTIONS_NUMBER_PER_NODE + end. + +needed_connections_number(Ls, MaxS2SConnectionsNumber, + MaxS2SConnectionsNumberPerNode) -> + LocalLs = [L || L <- Ls, node(L#s2s.pid) == node()], + lists:min([MaxS2SConnectionsNumber - length(Ls), + MaxS2SConnectionsNumberPerNode - length(LocalLs)]). + %%-------------------------------------------------------------------- %% Function: is_service(From, To) -> true | false %% Description: Return true if the destination must be considered as a @@ -328,12 +373,12 @@ max_s2s_connexions_number(Host) -> is_service(From, To) -> LFromDomain = From#jid.lserver, case ejabberd_config:get_local_option({route_subdomains, LFromDomain}) of - s2s -> % bypass RFC 3920 10.3 - false; - _ -> - LDstDomain = To#jid.lserver, - P = fun(Domain) -> is_subdomain(LDstDomain, Domain) end, - lists:any(P, ?MYHOSTS) + s2s -> % bypass RFC 3920 10.3 + false; + _ -> + LDstDomain = To#jid.lserver, + P = fun(Domain) -> is_subdomain(LDstDomain, Domain) end, + lists:any(P, ?MYHOSTS) end. %%-------------------------------------------------------------------- @@ -361,39 +406,40 @@ ctl_process(Val, _Args) -> update_tables() -> case catch mnesia:table_info(s2s, type) of - bag -> - ok; - {'EXIT', _} -> - ok; - _ -> - % XXX TODO convert it ? - mnesia:delete_table(s2s) + bag -> + ok; + {'EXIT', _} -> + ok; + _ -> + % XXX TODO convert it ? + mnesia:delete_table(s2s) end, case catch mnesia:table_info(s2s, attributes) of - [fromto, node, key] -> - mnesia:transform_table(s2s, ignore, [fromto, pid, key]), - mnesia:clear_table(s2s); - [fromto, pid, key] -> - ok; - {'EXIT', _} -> - ok + [fromto, node, key] -> + mnesia:transform_table(s2s, ignore, [fromto, pid, key]), + mnesia:clear_table(s2s); + [fromto, pid, key] -> + ok; + {'EXIT', _} -> + ok end, case lists:member(local_s2s, mnesia:system_info(tables)) of - true -> - mnesia:delete_table(local_s2s); - false -> - ok + true -> + mnesia:delete_table(local_s2s); + false -> + ok end. %% Check if host is in blacklist or white list allow_host(MyServer, S2SHost) -> case ejabberd_config:get_local_option({{s2s_host, S2SHost},MyServer}) of - deny -> false; - allow -> true; - _ -> - case ejabberd_config:get_local_option({s2s_default_policy, MyServer}) of - deny -> false; - allow -> true; - _ -> true %% The default s2s policy is allow - end + deny -> false; + allow -> true; + _ -> + case ejabberd_config:get_local_option({s2s_default_policy, MyServer}) of + deny -> false; + allow -> true; + _ -> true %% The default s2s policy is allow + end end. +