Improve type specs for ejabberd_s2s

Also minor code cleanup
This commit is contained in:
Evgeny Khramtsov 2019-07-09 16:42:24 +03:00
parent 43da45cf67
commit f19b41fd19
1 changed files with 101 additions and 101 deletions

View File

@ -54,51 +54,38 @@
-include("logger.hrl"). -include("logger.hrl").
-include("xmpp.hrl"). -include("xmpp.hrl").
-include("ejabberd_commands.hrl"). -include("ejabberd_commands.hrl").
-include_lib("public_key/include/public_key.hrl"). -include_lib("stdlib/include/ms_transform.hrl").
-include("ejabberd_stacktrace.hrl"). -include("ejabberd_stacktrace.hrl").
-include("translate.hrl"). -include("translate.hrl").
-define(PKIXEXPLICIT, 'OTP-PUB-KEY').
-define(PKIXIMPLICIT, 'OTP-PUB-KEY').
-include("XmppAddr.hrl").
-define(DEFAULT_MAX_S2S_CONNECTIONS_NUMBER, 1). -define(DEFAULT_MAX_S2S_CONNECTIONS_NUMBER, 1).
-define(DEFAULT_MAX_S2S_CONNECTIONS_NUMBER_PER_NODE, 1). -define(DEFAULT_MAX_S2S_CONNECTIONS_NUMBER_PER_NODE, 1).
-define(S2S_OVERLOAD_BLOCK_PERIOD, 60). -define(S2S_OVERLOAD_BLOCK_PERIOD, 60).
%% once a server is temporarly blocked, it stay blocked for 60 seconds %% once a server is temporarly blocked, it stay blocked for 60 seconds
-record(s2s, {fromto = {<<"">>, <<"">>} :: {binary(), binary()} | '_', -record(s2s, {fromto :: {binary(), binary()},
pid = self() :: pid() | '_' | '$1'}). pid :: pid()}).
-record(state, {}). -record(state, {}).
-record(temporarily_blocked, {host = <<"">> :: binary(), -record(temporarily_blocked, {host :: binary(),
timestamp :: integer()}). timestamp :: integer()}).
-type temporarily_blocked() :: #temporarily_blocked{}. -type temporarily_blocked() :: #temporarily_blocked{}.
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
[]).
clean_temporarily_blocked_table() -> clean_temporarily_blocked_table() ->
mnesia:clear_table(temporarily_blocked). mnesia:clear_table(temporarily_blocked).
-spec list_temporarily_blocked_hosts() -> [temporarily_blocked()]. -spec list_temporarily_blocked_hosts() -> [temporarily_blocked()].
list_temporarily_blocked_hosts() -> list_temporarily_blocked_hosts() ->
ets:tab2list(temporarily_blocked). ets:tab2list(temporarily_blocked).
-spec external_host_overloaded(binary()) -> {aborted, any()} | {atomic, ok}. -spec external_host_overloaded(binary()) -> {aborted, any()} | {atomic, ok}.
external_host_overloaded(Host) -> external_host_overloaded(Host) ->
?INFO_MSG("Disabling connections from ~s for ~p " ?INFO_MSG("Disabling s2s connections to ~s for ~p seconds",
"seconds",
[Host, ?S2S_OVERLOAD_BLOCK_PERIOD]), [Host, ?S2S_OVERLOAD_BLOCK_PERIOD]),
mnesia:transaction(fun () -> mnesia:transaction(fun () ->
Time = erlang:monotonic_time(), Time = erlang:monotonic_time(),
@ -107,21 +94,20 @@ external_host_overloaded(Host) ->
end). end).
-spec is_temporarly_blocked(binary()) -> boolean(). -spec is_temporarly_blocked(binary()) -> boolean().
is_temporarly_blocked(Host) -> is_temporarly_blocked(Host) ->
case mnesia:dirty_read(temporarily_blocked, Host) of case mnesia:dirty_read(temporarily_blocked, Host) of
[] -> false; [] -> false;
[#temporarily_blocked{timestamp = T} = Entry] -> [#temporarily_blocked{timestamp = T} = Entry] ->
Diff = erlang:monotonic_time() - T, Diff = erlang:monotonic_time() - T,
case erlang:convert_time_unit(Diff, native, microsecond) of case erlang:convert_time_unit(Diff, native, microsecond) of
N when N > (?S2S_OVERLOAD_BLOCK_PERIOD) * 1000 * 1000 -> N when N > (?S2S_OVERLOAD_BLOCK_PERIOD) * 1000 * 1000 ->
mnesia:dirty_delete_object(Entry), false; mnesia:dirty_delete_object(Entry), false;
_ -> true _ -> true
end end
end. end.
-spec remove_connection({binary(), binary()}, pid()) -> ok. -spec remove_connection({binary(), binary()}, pid()) -> ok.
remove_connection(FromTo, Pid) -> remove_connection({From, To} = FromTo, Pid) ->
case mnesia:dirty_match_object(s2s, #s2s{fromto = FromTo, pid = Pid}) of case mnesia:dirty_match_object(s2s, #s2s{fromto = FromTo, pid = Pid}) of
[#s2s{pid = Pid}] -> [#s2s{pid = Pid}] ->
F = fun() -> F = fun() ->
@ -130,25 +116,24 @@ remove_connection(FromTo, Pid) ->
case mnesia:transaction(F) of case mnesia:transaction(F) of
{atomic, _} -> ok; {atomic, _} -> ok;
{aborted, Reason} -> {aborted, Reason} ->
?ERROR_MSG("Failed to unregister s2s connection: " ?ERROR_MSG("Failed to unregister s2s connection ~s -> ~s: "
"Mnesia failure: ~p", [Reason]) "Mnesia failure: ~p",
[From, To, Reason])
end; end;
_ -> _ ->
ok ok
end. end.
-spec have_connection({binary(), binary()}) -> boolean(). -spec have_connection({binary(), binary()}) -> boolean().
have_connection(FromTo) -> have_connection(FromTo) ->
case catch mnesia:dirty_read(s2s, FromTo) of case catch mnesia:dirty_read(s2s, FromTo) of
[_] -> [_] ->
true; true;
_ -> _ ->
false false
end. end.
-spec get_connections_pids({binary(), binary()}) -> [pid()]. -spec get_connections_pids({binary(), binary()}) -> [pid()].
get_connections_pids(FromTo) -> get_connections_pids(FromTo) ->
case catch mnesia:dirty_read(s2s, FromTo) of case catch mnesia:dirty_read(s2s, FromTo) of
L when is_list(L) -> L when is_list(L) ->
@ -158,8 +143,7 @@ get_connections_pids(FromTo) ->
end. end.
-spec try_register({binary(), binary()}) -> boolean(). -spec try_register({binary(), binary()}) -> boolean().
try_register({From, To} = FromTo) ->
try_register(FromTo) ->
MaxS2SConnectionsNumber = max_s2s_connections_number(FromTo), MaxS2SConnectionsNumber = max_s2s_connections_number(FromTo),
MaxS2SConnectionsNumberPerNode = MaxS2SConnectionsNumberPerNode =
max_s2s_connections_number_per_node(FromTo), max_s2s_connections_number_per_node(FromTo),
@ -169,18 +153,21 @@ try_register(FromTo) ->
MaxS2SConnectionsNumber, MaxS2SConnectionsNumber,
MaxS2SConnectionsNumberPerNode), MaxS2SConnectionsNumberPerNode),
if NeededConnections > 0 -> if NeededConnections > 0 ->
mnesia:write(#s2s{fromto = FromTo, pid = self()}), mnesia:write(#s2s{fromto = FromTo, pid = self()}),
true; true;
true -> false true -> false
end end
end, end,
case mnesia:transaction(F) of case mnesia:transaction(F) of
{atomic, Res} -> Res; {atomic, Res} -> Res;
_ -> false {aborted, Reason} ->
?ERROR_MSG("Failed to register s2s connection ~s -> ~s: "
"Mnesia failure: ~p",
[From, To, Reason]),
false
end. end.
-spec dirty_get_connections() -> [{binary(), binary()}]. -spec dirty_get_connections() -> [{binary(), binary()}].
dirty_get_connections() -> dirty_get_connections() ->
mnesia:dirty_all_keys(s2s). mnesia:dirty_all_keys(s2s).
@ -276,10 +263,12 @@ init([]) ->
{stop, Reason} {stop, Reason}
end. end.
handle_call(_Request, _From, State) -> handle_call(Request, From, State) ->
{reply, ok, State}. ?WARNING_MSG("Unexpected call from ~p: ~p", [From, Request]),
{noreply, State}.
handle_cast(_Msg, State) -> handle_cast(Msg, State) ->
?WARNING_MSG("Unexpected cast: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
@ -294,14 +283,15 @@ handle_info({route, Packet}, State) ->
misc:format_exception(2, Class, Reason, StackTrace)]) misc:format_exception(2, Class, Reason, StackTrace)])
end, end,
{noreply, State}; {noreply, State};
handle_info(_Info, State) -> {noreply, State}. handle_info(Info, State) ->
?WARNING_MSG("Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) -> terminate(_Reason, _State) ->
ejabberd_commands:unregister_commands(get_commands_spec()), ejabberd_commands:unregister_commands(get_commands_spec()),
lists:foreach(fun host_down/1, ejabberd_option:hosts()), lists:foreach(fun host_down/1, ejabberd_option:hosts()),
ejabberd_hooks:delete(host_up, ?MODULE, host_up, 50), ejabberd_hooks:delete(host_up, ?MODULE, host_up, 50),
ejabberd_hooks:delete(host_down, ?MODULE, host_down, 60), ejabberd_hooks:delete(host_down, ?MODULE, host_down, 60).
ok.
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
@ -309,10 +299,12 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%%% Internal functions %%% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec host_up(binary()) -> ok.
host_up(Host) -> host_up(Host) ->
ejabberd_s2s_in:host_up(Host), ejabberd_s2s_in:host_up(Host),
ejabberd_s2s_out:host_up(Host). ejabberd_s2s_out:host_up(Host).
-spec host_down(binary()) -> ok.
host_down(Host) -> host_down(Host) ->
lists:foreach( lists:foreach(
fun(#s2s{fromto = {From, _}, pid = Pid}) when node(Pid) == node() -> fun(#s2s{fromto = {From, _}, pid = Pid}) when node(Pid) == node() ->
@ -334,12 +326,11 @@ clean_table_from_bad_node(Node) ->
F = fun() -> F = fun() ->
Es = mnesia:select( Es = mnesia:select(
s2s, s2s,
[{#s2s{pid = '$1', _ = '_'}, ets:fun2ms(
[{'==', {node, '$1'}, Node}], fun(#s2s{pid = Pid} = E) when node(Pid) == Node ->
['$_']}]), E
lists:foreach(fun(E) -> end)),
mnesia:delete_object(E) lists:foreach(fun mnesia:delete_object/1, Es)
end, Es)
end, end,
mnesia:async_dirty(F). mnesia:async_dirty(F).
@ -350,12 +341,12 @@ route(Packet) ->
To = xmpp:get_to(Packet), To = xmpp:get_to(Packet),
case start_connection(From, To) of case start_connection(From, To) of
{ok, Pid} when is_pid(Pid) -> {ok, Pid} when is_pid(Pid) ->
?DEBUG("Sending to process ~p~n", [Pid]), ?DEBUG("Sending to process ~p~n", [Pid]),
#jid{lserver = MyServer} = From, #jid{lserver = MyServer} = From,
ejabberd_hooks:run(s2s_send_packet, MyServer, [Packet]), ejabberd_hooks:run(s2s_send_packet, MyServer, [Packet]),
ejabberd_s2s_out:route(Pid, Packet); ejabberd_s2s_out:route(Pid, Packet);
{error, Reason} -> {error, Reason} ->
Lang = xmpp:get_lang(Packet), Lang = xmpp:get_lang(Packet),
Err = case Reason of Err = case Reason of
forbidden -> forbidden ->
xmpp:err_forbidden(?T("Access denied by service policy"), Lang); xmpp:err_forbidden(?T("Access denied by service policy"), Lang);
@ -366,12 +357,12 @@ route(Packet) ->
end. end.
-spec start_connection(jid(), jid()) -spec start_connection(jid(), jid())
-> {ok, pid()} | {error, forbidden | internal_server_error}. -> {ok, pid()} | {error, forbidden | internal_server_error}.
start_connection(From, To) -> start_connection(From, To) ->
start_connection(From, To, []). start_connection(From, To, []).
-spec start_connection(jid(), jid(), [proplists:property()]) -spec start_connection(jid(), jid(), [proplists:property()])
-> {ok, pid()} | {error, forbidden | internal_server_error}. -> {ok, pid()} | {error, forbidden | internal_server_error}.
start_connection(From, To, Opts) -> start_connection(From, To, Opts) ->
#jid{lserver = MyServer} = From, #jid{lserver = MyServer} = From,
#jid{lserver = Server} = To, #jid{lserver = Server} = To,
@ -382,11 +373,11 @@ start_connection(From, To, Opts) ->
max_s2s_connections_number_per_node(FromTo), max_s2s_connections_number_per_node(FromTo),
?DEBUG("Finding connection for ~p~n", [FromTo]), ?DEBUG("Finding connection for ~p~n", [FromTo]),
case mnesia:dirty_read(s2s, FromTo) of case mnesia:dirty_read(s2s, FromTo) of
[] -> [] ->
%% We try to establish all the connections if the host is not a %% We try to establish all the connections if the host is not a
%% service and if the s2s host is not blacklisted or %% service and if the s2s host is not blacklisted or
%% is in whitelist: %% is in whitelist:
LServer = ejabberd_router:host_of_route(MyServer), LServer = ejabberd_router:host_of_route(MyServer),
case allow_host(LServer, Server) of case allow_host(LServer, Server) of
true -> true ->
NeededConnections = needed_connections_number( NeededConnections = needed_connections_number(
@ -400,20 +391,20 @@ start_connection(From, To, Opts) ->
false -> false ->
{error, forbidden} {error, forbidden}
end; end;
L when is_list(L) -> L when is_list(L) ->
NeededConnections = needed_connections_number(L, NeededConnections = needed_connections_number(L,
MaxS2SConnectionsNumber, MaxS2SConnectionsNumber,
MaxS2SConnectionsNumberPerNode), MaxS2SConnectionsNumberPerNode),
if NeededConnections > 0 -> if NeededConnections > 0 ->
%% We establish the missing connections for this pair. %% We establish the missing connections for this pair.
open_several_connections(NeededConnections, MyServer, open_several_connections(NeededConnections, MyServer,
Server, From, FromTo, Server, From, FromTo,
MaxS2SConnectionsNumber, MaxS2SConnectionsNumber,
MaxS2SConnectionsNumberPerNode, Opts); MaxS2SConnectionsNumberPerNode, Opts);
true -> true ->
%% We choose a connexion from the pool of opened ones. %% We choose a connexion from the pool of opened ones.
{ok, choose_connection(From, L)} {ok, choose_connection(From, L)}
end end
end. end.
-spec choose_connection(jid(), [#s2s{}]) -> pid(). -spec choose_connection(jid(), [#s2s{}]) -> pid().
@ -423,8 +414,8 @@ choose_connection(From, Connections) ->
-spec choose_pid(jid(), [pid()]) -> pid(). -spec choose_pid(jid(), [pid()]) -> pid().
choose_pid(From, Pids) -> choose_pid(From, Pids) ->
Pids1 = case [P || P <- Pids, node(P) == node()] of Pids1 = case [P || P <- Pids, node(P) == node()] of
[] -> Pids; [] -> Pids;
Ps -> Ps Ps -> Ps
end, end,
Pid = Pid =
lists:nth(erlang:phash(jid:remove_resource(From), lists:nth(erlang:phash(jid:remove_resource(From),
@ -433,13 +424,17 @@ choose_pid(From, Pids) ->
?DEBUG("Using ejabberd_s2s_out ~p~n", [Pid]), ?DEBUG("Using ejabberd_s2s_out ~p~n", [Pid]),
Pid. Pid.
-spec open_several_connections(pos_integer(), binary(), binary(),
jid(), {binary(), binary()},
integer(), integer(), [proplists:property()]) ->
{ok, pid()} | {error, internal_server_error}.
open_several_connections(N, MyServer, Server, From, open_several_connections(N, MyServer, Server, From,
FromTo, MaxS2SConnectionsNumber, FromTo, MaxS2SConnectionsNumber,
MaxS2SConnectionsNumberPerNode, Opts) -> MaxS2SConnectionsNumberPerNode, Opts) ->
case lists:flatmap( case lists:flatmap(
fun(_) -> fun(_) ->
new_connection(MyServer, Server, new_connection(MyServer, Server,
From, FromTo, MaxS2SConnectionsNumber, From, FromTo, MaxS2SConnectionsNumber,
MaxS2SConnectionsNumberPerNode, Opts) MaxS2SConnectionsNumberPerNode, Opts)
end, lists:seq(1, N)) of end, lists:seq(1, N)) of
[] -> [] ->
@ -448,6 +443,8 @@ open_several_connections(N, MyServer, Server, From,
{ok, choose_pid(From, PIDs)} {ok, choose_pid(From, PIDs)}
end. end.
-spec new_connection(binary(), binary(), jid(), {binary(), binary()},
integer(), integer(), [proplists:property()]) -> [pid()].
new_connection(MyServer, Server, From, FromTo, new_connection(MyServer, Server, From, FromTo,
MaxS2SConnectionsNumber, MaxS2SConnectionsNumberPerNode, Opts) -> MaxS2SConnectionsNumber, MaxS2SConnectionsNumberPerNode, Opts) ->
{ok, Pid} = ejabberd_s2s_out:start(MyServer, Server, Opts), {ok, Pid} = ejabberd_s2s_out:start(MyServer, Server, Opts),
@ -457,22 +454,23 @@ new_connection(MyServer, Server, From, FromTo,
MaxS2SConnectionsNumber, MaxS2SConnectionsNumber,
MaxS2SConnectionsNumberPerNode), MaxS2SConnectionsNumberPerNode),
if NeededConnections > 0 -> if NeededConnections > 0 ->
mnesia:write(#s2s{fromto = FromTo, pid = Pid}), mnesia:write(#s2s{fromto = FromTo, pid = Pid}),
Pid; Pid;
true -> choose_connection(From, L) true -> choose_connection(From, L)
end end
end, end,
TRes = mnesia:transaction(F), TRes = mnesia:transaction(F),
case TRes of case TRes of
{atomic, Pid1} -> {atomic, Pid1} ->
if Pid1 == Pid -> if Pid1 == Pid ->
ejabberd_s2s_out:connect(Pid); ejabberd_s2s_out:connect(Pid);
true -> true ->
ejabberd_s2s_out:stop(Pid) ejabberd_s2s_out:stop(Pid)
end, end,
[Pid1]; [Pid1];
{aborted, Reason} -> {aborted, Reason} ->
?ERROR_MSG("Failed to register connection ~s -> ~s: ~p", ?ERROR_MSG("Failed to register s2s connection ~s -> ~s: "
"Mnesia failure: ~p",
[MyServer, Server, Reason]), [MyServer, Server, Reason]),
ejabberd_s2s_out:stop(Pid), ejabberd_s2s_out:stop(Pid),
[] []
@ -529,11 +527,13 @@ incoming_s2s_number() ->
outgoing_s2s_number() -> outgoing_s2s_number() ->
supervisor_count(ejabberd_s2s_out_sup). supervisor_count(ejabberd_s2s_out_sup).
-spec supervisor_count(atom()) -> non_neg_integer().
supervisor_count(Supervisor) -> supervisor_count(Supervisor) ->
case catch supervisor:which_children(Supervisor) of try supervisor:count_children(Supervisor) of
{'EXIT', _} -> 0; Props ->
Result -> proplists:get_value(workers, Props, 0)
length(Result) catch _:_ ->
0
end. end.
-spec stop_s2s_connections() -> ok. -spec stop_s2s_connections() -> ok.
@ -557,10 +557,12 @@ update_tables() ->
ok. ok.
%% Check if host is in blacklist or white list %% Check if host is in blacklist or white list
-spec allow_host(binary(), binary()) -> boolean().
allow_host(MyServer, S2SHost) -> allow_host(MyServer, S2SHost) ->
allow_host1(MyServer, S2SHost) andalso allow_host1(MyServer, S2SHost) andalso
not is_temporarly_blocked(S2SHost). not is_temporarly_blocked(S2SHost).
-spec allow_host1(binary(), binary()) -> boolean().
allow_host1(MyHost, S2SHost) -> allow_host1(MyHost, S2SHost) ->
Rule = ejabberd_option:s2s_access(MyHost), Rule = ejabberd_option:s2s_access(MyHost),
JID = jid:make(S2SHost), JID = jid:make(S2SHost),
@ -570,8 +572,7 @@ allow_host1(MyHost, S2SHost) ->
case ejabberd_hooks:run_fold(s2s_allow_host, MyHost, case ejabberd_hooks:run_fold(s2s_allow_host, MyHost,
allow, [MyHost, S2SHost]) of allow, [MyHost, S2SHost]) of
deny -> false; deny -> false;
allow -> true; allow -> true
_ -> true
end end
end. end.
@ -581,8 +582,8 @@ allow_host1(MyHost, S2SHost) ->
%% Info = [{InfoName::atom(), InfoValue::any()}] %% Info = [{InfoName::atom(), InfoValue::any()}]
get_info_s2s_connections(Type) -> get_info_s2s_connections(Type) ->
ChildType = case Type of ChildType = case Type of
in -> ejabberd_s2s_in_sup; in -> ejabberd_s2s_in_sup;
out -> ejabberd_s2s_out_sup out -> ejabberd_s2s_out_sup
end, end,
Connections = supervisor:which_children(ChildType), Connections = supervisor:which_children(ChildType),
get_s2s_info(Connections, Type). get_s2s_info(Connections, Type).
@ -597,13 +598,12 @@ complete_s2s_info([Connection | T], Type, Result) ->
complete_s2s_info(T, Type, [State | Result]). complete_s2s_info(T, Type, [State | Result]).
-spec get_s2s_state(pid()) -> [{status, open | closed | error} | {s2s_pid, pid()}]. -spec get_s2s_state(pid()) -> [{status, open | closed | error} | {s2s_pid, pid()}].
get_s2s_state(S2sPid) -> get_s2s_state(S2sPid) ->
Infos = case p1_fsm:sync_send_all_state_event(S2sPid, Infos = case p1_fsm:sync_send_all_state_event(S2sPid,
get_state_infos) get_state_infos)
of of
{state_infos, Is} -> [{status, open} | Is]; {state_infos, Is} -> [{status, open} | Is];
{noproc, _} -> [{status, closed}]; %% Connection closed {noproc, _} -> [{status, closed}]; %% Connection closed
{badrpc, _} -> [{status, error}] {badrpc, _} -> [{status, error}]
end, end,
[{s2s_pid, S2sPid} | Infos]. [{s2s_pid, S2sPid} | Infos].