25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-11-24 16:23:40 +01:00

Refactor ejabberd listener API

This commit is contained in:
Evgeny Khramtsov 2018-09-17 11:21:02 +03:00
parent 78dae4036e
commit de385591d0
13 changed files with 218 additions and 232 deletions

View File

@ -25,7 +25,7 @@
{fast_tls, ".*", {git, "https://github.com/processone/fast_tls", "f36ea5b74526c2c1c9c38f8d473168d95804f59d"}}, {fast_tls, ".*", {git, "https://github.com/processone/fast_tls", "f36ea5b74526c2c1c9c38f8d473168d95804f59d"}},
{stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.12"}}}, {stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.12"}}},
{fast_xml, ".*", {git, "https://github.com/processone/fast_xml", {tag, "1.1.32"}}}, {fast_xml, ".*", {git, "https://github.com/processone/fast_xml", {tag, "1.1.32"}}},
{xmpp, ".*", {git, "https://github.com/processone/xmpp", "8d85c4b"}}, {xmpp, ".*", {git, "https://github.com/processone/xmpp", "2ce0626"}},
{fast_yaml, ".*", {git, "https://github.com/processone/fast_yaml", {tag, "1.0.15"}}}, {fast_yaml, ".*", {git, "https://github.com/processone/fast_yaml", {tag, "1.0.15"}}},
{jiffy, ".*", {git, "https://github.com/davisp/jiffy", {tag, "0.14.8"}}}, {jiffy, ".*", {git, "https://github.com/davisp/jiffy", {tag, "0.14.8"}}},
{p1_oauth2, ".*", {git, "https://github.com/processone/p1_oauth2", {tag, "0.6.3"}}}, {p1_oauth2, ".*", {git, "https://github.com/processone/p1_oauth2", {tag, "0.6.3"}}},

View File

@ -23,20 +23,18 @@
%%% %%%
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
-module(ejabberd_bosh). -module(ejabberd_bosh).
-behaviour(xmpp_socket).
-behaviour(p1_fsm).
-protocol({xep, 124, '1.11'}). -protocol({xep, 124, '1.11'}).
-protocol({xep, 206, '1.4'}). -protocol({xep, 206, '1.4'}).
-behaviour(p1_fsm).
%% API %% API
-export([start/2, start/3, start_link/3]). -export([start/2, start/3, start_link/3]).
-export([send_xml/2, setopts/2, controlling_process/2, -export([send_xml/2, setopts/2, controlling_process/2,
migrate/3, become_controller/2, reset_stream/1, change_shaper/2, close/1,
reset_stream/1, change_shaper/2, monitor/1, close/1,
sockname/1, peername/1, process_request/3, send/2, sockname/1, peername/1, process_request/3, send/2,
change_controller/2]). get_transport/1, get_owner/1]).
%% gen_fsm callbacks %% gen_fsm callbacks
-export([init/1, wait_for_session/2, wait_for_session/3, -export([init/1, wait_for_session/2, wait_for_session/3,
@ -167,22 +165,12 @@ setopts({http_bind, FsmRef, _IP}, Opts) ->
controlling_process(_Socket, _Pid) -> ok. controlling_process(_Socket, _Pid) -> ok.
become_controller(FsmRef, C2SPid) ->
p1_fsm:send_all_state_event(FsmRef,
{become_controller, C2SPid}).
change_controller({http_bind, FsmRef, _IP}, C2SPid) ->
become_controller(FsmRef, C2SPid).
reset_stream({http_bind, _FsmRef, _IP} = Socket) -> reset_stream({http_bind, _FsmRef, _IP} = Socket) ->
Socket. Socket.
change_shaper({http_bind, FsmRef, _IP}, Shaper) -> change_shaper({http_bind, FsmRef, _IP}, Shaper) ->
p1_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}). p1_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}).
monitor({http_bind, FsmRef, _IP}) ->
erlang:monitor(process, FsmRef).
close({http_bind, FsmRef, _IP}) -> close({http_bind, FsmRef, _IP}) ->
catch p1_fsm:sync_send_all_state_event(FsmRef, catch p1_fsm:sync_send_all_state_event(FsmRef,
close). close).
@ -191,10 +179,11 @@ sockname(_Socket) -> {ok, {{0, 0, 0, 0}, 0}}.
peername({http_bind, _FsmRef, IP}) -> {ok, IP}. peername({http_bind, _FsmRef, IP}) -> {ok, IP}.
migrate(FsmRef, Node, After) when node(FsmRef) == node() -> get_transport(_Socket) ->
catch erlang:send_after(After, FsmRef, {migrate, Node}); http_bind.
migrate(_FsmRef, _Node, _After) ->
ok. get_owner({http_bind, FsmRef, _IP}) ->
FsmRef.
process_request(Data, IP, Type) -> process_request(Data, IP, Type) ->
Opts1 = ejabberd_c2s_config:get_c2s_limits(), Opts1 = ejabberd_c2s_config:get_c2s_limits(),
@ -295,30 +284,26 @@ init([#body{attrs = Attrs}, IP, SID]) ->
buf_new(XMPPDomain)), buf_new(XMPPDomain)),
Opts2} Opts2}
end, end,
xmpp_socket:start(ejabberd_c2s, ?MODULE, Socket, case ejabberd_c2s:start({?MODULE, Socket}, [{receiver, self()}|Opts]) of
[{receiver, self()}|Opts]), {ok, C2SPid} ->
Inactivity = gen_mod:get_module_opt(XMPPDomain, ejabberd_c2s:accept(C2SPid),
mod_bosh, max_inactivity), Inactivity = gen_mod:get_module_opt(XMPPDomain,
MaxConcat = gen_mod:get_module_opt(XMPPDomain, mod_bosh, max_concat), mod_bosh, max_inactivity),
ShapedReceivers = buf_new(XMPPDomain, ?MAX_SHAPED_REQUESTS_QUEUE_LEN), MaxConcat = gen_mod:get_module_opt(XMPPDomain, mod_bosh, max_concat),
State = #state{host = XMPPDomain, sid = SID, ip = IP, ShapedReceivers = buf_new(XMPPDomain, ?MAX_SHAPED_REQUESTS_QUEUE_LEN),
xmpp_ver = XMPPVer, el_ibuf = InBuf, State = #state{host = XMPPDomain, sid = SID, ip = IP,
max_concat = MaxConcat, el_obuf = buf_new(XMPPDomain), xmpp_ver = XMPPVer, el_ibuf = InBuf,
inactivity_timeout = Inactivity, max_concat = MaxConcat, el_obuf = buf_new(XMPPDomain),
shaped_receivers = ShapedReceivers, inactivity_timeout = Inactivity,
shaper_state = ShaperState}, shaped_receivers = ShapedReceivers,
NewState = restart_inactivity_timer(State), shaper_state = ShaperState},
mod_bosh:open_session(SID, self()), NewState = restart_inactivity_timer(State),
{ok, wait_for_session, NewState}; mod_bosh:open_session(SID, self()),
init([StateName, State]) -> {ok, wait_for_session, NewState};
mod_bosh:open_session(State#state.sid, self()), {error, Reason} ->
case State#state.c2s_pid of {stop, Reason};
C2SPid when is_pid(C2SPid) -> ignore ->
NewSocket = make_socket(self(), State#state.ip), ignore
C2SPid ! {change_socket, NewSocket},
NewState = restart_inactivity_timer(State),
{ok, StateName, NewState};
_ -> {stop, normal}
end. end.
wait_for_session(_Event, State) -> wait_for_session(_Event, State) ->
@ -525,7 +510,7 @@ active1(#body{attrs = Attrs} = Req, From, State) ->
end end
end. end.
handle_event({become_controller, C2SPid}, StateName, handle_event({activate, C2SPid}, StateName,
State) -> State) ->
State1 = route_els(State#state{c2s_pid = C2SPid}), State1 = route_els(State#state{c2s_pid = C2SPid}),
{next_state, StateName, State1}; {next_state, StateName, State1};
@ -598,24 +583,11 @@ handle_info({timeout, TRef, shaper_timeout}, StateName,
{stop, normal, State}; {stop, normal, State};
_ -> {next_state, StateName, State} _ -> {next_state, StateName, State}
end; end;
handle_info({migrate, Node}, StateName, State) ->
if Node /= node() ->
NewState = bounce_receivers(State, migrated),
{migrate, NewState,
{Node, ?MODULE, start, [StateName, NewState]}, 0};
true -> {next_state, StateName, State}
end;
handle_info(_Info, StateName, State) -> handle_info(_Info, StateName, State) ->
?ERROR_MSG("unexpected info:~n** Msg: ~p~n** StateName: ~p", ?ERROR_MSG("unexpected info:~n** Msg: ~p~n** StateName: ~p",
[_Info, StateName]), [_Info, StateName]),
{next_state, StateName, State}. {next_state, StateName, State}.
terminate({migrated, ClonePid}, _StateName, State) ->
?INFO_MSG("Migrating session \"~s\" (c2s_pid = "
"~p) to ~p on node ~p",
[State#state.sid, State#state.c2s_pid, ClonePid,
node(ClonePid)]),
mod_bosh:close_session(State#state.sid);
terminate(_Reason, _StateName, State) -> terminate(_Reason, _StateName, State) ->
mod_bosh:close_session(State#state.sid), mod_bosh:close_session(State#state.sid),
case State#state.c2s_pid of case State#state.c2s_pid of
@ -718,7 +690,7 @@ do_reply(State, From, Body, RID) ->
Responses2 = gb_trees:insert(RID, Body, Responses1), Responses2 = gb_trees:insert(RID, Body, Responses1),
State#state{responses = Responses2}. State#state{responses = Responses2}.
bounce_receivers(State, Reason) -> bounce_receivers(State, _Reason) ->
Receivers = gb_trees:to_list(State#state.receivers), Receivers = gb_trees:to_list(State#state.receivers),
ShapedReceivers = lists:map(fun ({_, From, ShapedReceivers = lists:map(fun ({_, From,
#body{attrs = Attrs} = Body}) -> #body{attrs = Attrs} = Body}) ->
@ -726,18 +698,13 @@ bounce_receivers(State, Reason) ->
{RID, {From, Body}} {RID, {From, Body}}
end, end,
p1_queue:to_list(State#state.shaped_receivers)), p1_queue:to_list(State#state.shaped_receivers)),
lists:foldl(fun ({RID, {From, Body}}, AccState) -> lists:foldl(fun ({RID, {From, _Body}}, AccState) ->
NewBody = if Reason == closed -> NewBody = #body{http_reason =
#body{http_reason = <<"Session closed">>,
<<"Session closed">>, attrs =
attrs = [{type, <<"terminate">>},
[{type, <<"terminate">>}, {condition,
{condition, <<"other-request">>}]},
<<"other-request">>}]};
Reason == migrated ->
Body#body{http_reason =
<<"Session migrated">>}
end,
do_reply(AccState, From, NewBody, RID) do_reply(AccState, From, NewBody, RID)
end, end,
State, Receivers ++ ShapedReceivers). State, Receivers ++ ShapedReceivers).

View File

@ -22,14 +22,14 @@
-module(ejabberd_c2s). -module(ejabberd_c2s).
-behaviour(xmpp_stream_in). -behaviour(xmpp_stream_in).
-behaviour(ejabberd_config). -behaviour(ejabberd_config).
-behaviour(xmpp_socket). -behaviour(ejabberd_listener).
-protocol({rfc, 6121}). -protocol({rfc, 6121}).
%% xmpp_socket callbacks %% ejabberd_listener callbacks
-export([start/2, start_link/2, socket_type/0]). -export([start/2, start_link/2, accept/1, listen_opt_type/1]).
%% ejabberd_config callbacks %% ejabberd_config callbacks
-export([opt_type/1, listen_opt_type/1, transform_listen_option/2]). -export([opt_type/1, transform_listen_option/2]).
%% xmpp_stream_in callbacks %% xmpp_stream_in callbacks
-export([init/1, handle_call/3, handle_cast/2, -export([init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2, code_change/3]). handle_info/2, terminate/2, code_change/3]).
@ -61,26 +61,18 @@
-export_type([state/0]). -export_type([state/0]).
%%%=================================================================== %%%===================================================================
%%% xmpp_socket API %%% ejabberd_listener API
%%%=================================================================== %%%===================================================================
start(SockData, Opts) -> start(SockData, Opts) ->
case proplists:get_value(supervisor, Opts, true) of xmpp_stream_in:start(?MODULE, [SockData, Opts],
true -> ejabberd_config:fsm_limit_opts(Opts)).
case supervisor:start_child(ejabberd_c2s_sup, [SockData, Opts]) of
{ok, undefined} -> ignore;
Res -> Res
end;
_ ->
xmpp_stream_in:start(?MODULE, [SockData, Opts],
ejabberd_config:fsm_limit_opts(Opts))
end.
start_link(SockData, Opts) -> start_link(SockData, Opts) ->
xmpp_stream_in:start_link(?MODULE, [SockData, Opts], xmpp_stream_in:start_link(?MODULE, [SockData, Opts],
ejabberd_config:fsm_limit_opts(Opts)). ejabberd_config:fsm_limit_opts(Opts)).
socket_type() -> accept(Ref) ->
xml_stream. xmpp_stream_in:accept(Ref).
%%%=================================================================== %%%===================================================================
%%% Common API %%% Common API

View File

@ -24,14 +24,14 @@
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
-module(ejabberd_http). -module(ejabberd_http).
-behaviour(ejabberd_listener).
-behaviour(ejabberd_config). -behaviour(ejabberd_config).
-author('alexey@process-one.net'). -author('alexey@process-one.net').
%% External exports %% External exports
-export([start/2, start_link/2, become_controller/1, -export([start/2, start_link/2,
socket_type/0, receive_headers/1, recv_file/2, accept/1, receive_headers/1, recv_file/2,
transform_listen_option/2, listen_opt_type/1]). transform_listen_option/2, listen_opt_type/1]).
-export([init/2, opt_type/1]). -export([init/2, opt_type/1]).
@ -164,12 +164,9 @@ init({SockMod, Socket}, Opts) ->
{error, _} -> State {error, _} -> State
end. end.
become_controller(_Pid) -> accept(_Pid) ->
ok. ok.
socket_type() ->
raw.
send_text(_State, none) -> send_text(_State, none) ->
ok; ok;
send_text(State, Text) -> send_text(State, Text) ->

View File

@ -23,19 +23,17 @@
%%% %%%
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
-module(ejabberd_http_ws). -module(ejabberd_http_ws).
-behaviour(ejabberd_config).
-author('ecestari@process-one.net'). -author('ecestari@process-one.net').
-behaviour(ejabberd_config).
-behaviour(xmpp_socket).
-behaviour(p1_fsm). -behaviour(p1_fsm).
-export([start/1, start_link/1, init/1, handle_event/3, -export([start/1, start_link/1, init/1, handle_event/3,
handle_sync_event/4, code_change/4, handle_info/3, handle_sync_event/4, code_change/4, handle_info/3,
terminate/3, send_xml/2, setopts/2, sockname/1, terminate/3, send_xml/2, setopts/2, sockname/1,
peername/1, controlling_process/2, become_controller/2, peername/1, controlling_process/2, get_owner/1,
monitor/1, reset_stream/1, close/1, change_shaper/2, reset_stream/1, close/1, change_shaper/2,
socket_handoff/3, opt_type/1]). socket_handoff/3, get_transport/1, opt_type/1]).
-include("logger.hrl"). -include("logger.hrl").
@ -54,8 +52,8 @@
timeout = ?WEBSOCKET_TIMEOUT :: non_neg_integer(), timeout = ?WEBSOCKET_TIMEOUT :: non_neg_integer(),
timer = make_ref() :: reference(), timer = make_ref() :: reference(),
input = [] :: list(), input = [] :: list(),
waiting_input = false :: false | pid(), active = false :: boolean(),
last_receiver = self() :: pid(), c2s_pid :: pid(),
ws :: {#ws{}, pid()}, ws :: {#ws{}, pid()},
rfc_compilant = undefined :: boolean() | undefined}). rfc_compilant = undefined :: boolean() | undefined}).
@ -104,15 +102,9 @@ peername({http_ws, _FsmRef, IP}) -> {ok, IP}.
controlling_process(_Socket, _Pid) -> ok. controlling_process(_Socket, _Pid) -> ok.
become_controller(FsmRef, C2SPid) ->
p1_fsm:send_all_state_event(FsmRef, {activate, C2SPid}).
close({http_ws, FsmRef, _IP}) -> close({http_ws, FsmRef, _IP}) ->
catch p1_fsm:sync_send_all_state_event(FsmRef, close). catch p1_fsm:sync_send_all_state_event(FsmRef, close).
monitor({http_ws, FsmRef, _IP}) ->
erlang:monitor(process, FsmRef).
reset_stream({http_ws, _FsmRef, _IP} = Socket) -> reset_stream({http_ws, _FsmRef, _IP} = Socket) ->
Socket. Socket.
@ -120,6 +112,12 @@ change_shaper({http_ws, _FsmRef, _IP}, _Shaper) ->
%% TODO??? %% TODO???
ok. ok.
get_transport(_Socket) ->
websocket.
get_owner({http_ws, FsmRef, _IP}) ->
FsmRef.
socket_handoff(LocalPath, Request, Opts) -> socket_handoff(LocalPath, Request, Opts) ->
ejabberd_websocket:socket_handoff(LocalPath, Request, Opts, ?MODULE, fun get_human_html_xmlel/0). ejabberd_websocket:socket_handoff(LocalPath, Request, Opts, ?MODULE, fun get_human_html_xmlel/0).
@ -145,31 +143,34 @@ init([{#ws{ip = IP, http_opts = HOpts}, _} = WS]) ->
Socket = {http_ws, self(), IP}, Socket = {http_ws, self(), IP},
?DEBUG("Client connected through websocket ~p", ?DEBUG("Client connected through websocket ~p",
[Socket]), [Socket]),
xmpp_socket:start(ejabberd_c2s, ?MODULE, Socket, case ejabberd_c2s:start({?MODULE, Socket}, [{receiver, self()}|Opts]) of
[{receiver, self()}|Opts]), {ok, C2SPid} ->
Timer = erlang:start_timer(WSTimeout, self(), []), ejabberd_c2s:accept(C2SPid),
{ok, loop, Timer = erlang:start_timer(WSTimeout, self(), []),
#state{socket = Socket, timeout = WSTimeout, {ok, loop,
timer = Timer, ws = WS, #state{socket = Socket, timeout = WSTimeout,
ping_interval = PingInterval}}. timer = Timer, ws = WS, c2s_pid = C2SPid,
ping_interval = PingInterval}};
handle_event({activate, From}, StateName, StateData) -> {error, Reason} ->
case StateData#state.input of {stop, Reason};
[] -> ignore ->
{next_state, StateName, ignore
StateData#state{waiting_input = From}};
Input ->
Receiver = From,
lists:foreach(fun(I) when is_binary(I)->
Receiver ! {tcp, StateData#state.socket, I};
(I2) ->
Receiver ! {tcp, StateData#state.socket, [I2]}
end, Input),
{next_state, StateName,
StateData#state{input = [], waiting_input = false,
last_receiver = Receiver}}
end. end.
handle_event({activate, From}, StateName, State) ->
State1 = case State#state.input of
[] -> State#state{active = true};
Input ->
lists:foreach(
fun(I) when is_binary(I)->
From ! {tcp, State#state.socket, I};
(I2) ->
From ! {tcp, State#state.socket, [I2]}
end, Input),
State#state{active = false, input = []}
end,
{next_state, StateName, State1#state{c2s_pid = From}}.
handle_sync_event({send_xml, Packet}, _From, StateName, handle_sync_event({send_xml, Packet}, _From, StateName,
#state{ws = {_, WsPid}, rfc_compilant = R} = StateData) -> #state{ws = {_, WsPid}, rfc_compilant = R} = StateData) ->
Packet2 = case {case R of undefined -> true; V -> V end, Packet} of Packet2 = case {case R of undefined -> true; V -> V end, Packet} of
@ -233,14 +234,13 @@ handle_info(closed, _StateName, StateData) ->
{stop, normal, StateData}; {stop, normal, StateData};
handle_info({received, Packet}, StateName, StateDataI) -> handle_info({received, Packet}, StateName, StateDataI) ->
{StateData, Parsed} = parse(StateDataI, Packet), {StateData, Parsed} = parse(StateDataI, Packet),
SD = case StateData#state.waiting_input of SD = case StateData#state.active of
false -> false ->
Input = StateData#state.input ++ if is_binary(Parsed) -> [Parsed]; true -> Parsed end, Input = StateData#state.input ++ if is_binary(Parsed) -> [Parsed]; true -> Parsed end,
StateData#state{input = Input}; StateData#state{input = Input};
Receiver -> true ->
Receiver ! {tcp, StateData#state.socket, Parsed}, StateData#state.c2s_pid ! {tcp, StateData#state.socket, Parsed},
setup_timers(StateData#state{waiting_input = false, setup_timers(StateData#state{active = false})
last_receiver = Receiver})
end, end,
{next_state, StateName, SD}; {next_state, StateName, SD};
handle_info(PingPong, StateName, StateData) when PingPong == ping orelse handle_info(PingPong, StateName, StateData) when PingPong == ping orelse
@ -273,13 +273,7 @@ code_change(_OldVsn, StateName, StateData, _Extra) ->
{ok, StateName, StateData}. {ok, StateName, StateData}.
terminate(_Reason, _StateName, StateData) -> terminate(_Reason, _StateName, StateData) ->
case StateData#state.waiting_input of StateData#state.c2s_pid ! {tcp_closed, StateData#state.socket}.
false -> ok;
Receiver ->
?DEBUG("C2S Pid : ~p", [Receiver]),
Receiver ! {tcp_closed, StateData#state.socket}
end,
ok.
setup_timers(StateData) -> setup_timers(StateData) ->
misc:cancel_timer(StateData#state.timer), misc:cancel_timer(StateData#state.timer),

View File

@ -36,6 +36,13 @@
-include("logger.hrl"). -include("logger.hrl").
-callback start({gen_tcp, inet:socket()}, [proplists:property()]) ->
{ok, pid()} | {error, any()} | ignore.
-callback start_link({gen_tcp, inet:socket()}, [proplists:property()]) ->
{ok, pid()} | {error, any()} | ignore.
-callback accept(pid()) -> any().
-callback listen_opt_type(atom()) -> fun((atom()) -> term()) | [atom()].
%% We do not block on send anymore. %% We do not block on send anymore.
-define(TCP_SEND_TIMEOUT, 15000). -define(TCP_SEND_TIMEOUT, 15000).
@ -81,11 +88,7 @@ report_duplicated_portips(L) ->
start(Port, Module, Opts) -> start(Port, Module, Opts) ->
NewOpts = validate_module_options(Module, Opts), NewOpts = validate_module_options(Module, Opts),
%% Check if the module is an ejabberd listener or an independent listener start_dependent(Port, Module, NewOpts).
case Module:socket_type() of
independent -> Module:start_listener(Port, NewOpts);
_ -> start_dependent(Port, Module, NewOpts)
end.
%% @spec(Port, Module, Opts) -> {ok, Pid} | {error, ErrorMessage} %% @spec(Port, Module, Opts) -> {ok, Pid} | {error, ErrorMessage}
start_dependent(Port, Module, Opts) -> start_dependent(Port, Module, Opts) ->
@ -109,7 +112,6 @@ init_udp(PortIP, Module, Opts, SockOpts, Port) ->
%% Inform my parent that this port was opened successfully %% Inform my parent that this port was opened successfully
proc_lib:init_ack({ok, self()}), proc_lib:init_ack({ok, self()}),
application:ensure_started(ejabberd), application:ensure_started(ejabberd),
start_module_sup(Port, Module),
?INFO_MSG("Start accepting UDP connections at ~s for ~p", ?INFO_MSG("Start accepting UDP connections at ~s for ~p",
[format_portip(PortIP), Module]), [format_portip(PortIP), Module]),
case erlang:function_exported(Module, udp_init, 2) of case erlang:function_exported(Module, udp_init, 2) of
@ -136,21 +138,21 @@ init_tcp(PortIP, Module, Opts, SockOpts, Port) ->
{ok, ListenSocket} -> {ok, ListenSocket} ->
proc_lib:init_ack({ok, self()}), proc_lib:init_ack({ok, self()}),
application:ensure_started(ejabberd), application:ensure_started(ejabberd),
start_module_sup(Port, Module), Sup = start_module_sup(Module, Opts),
?INFO_MSG("Start accepting TCP connections at ~s for ~p", ?INFO_MSG("Start accepting TCP connections at ~s for ~p",
[format_portip(PortIP), Module]), [format_portip(PortIP), Module]),
case erlang:function_exported(Module, tcp_init, 2) of case erlang:function_exported(Module, tcp_init, 2) of
false -> false ->
accept(ListenSocket, Module, Opts); accept(ListenSocket, Module, Opts, Sup);
true -> true ->
case catch Module:tcp_init(ListenSocket, Opts) of case catch Module:tcp_init(ListenSocket, Opts) of
{'EXIT', _} = Err -> {'EXIT', _} = Err ->
?ERROR_MSG("failed to process callback function " ?ERROR_MSG("failed to process callback function "
"~p:~s(~p, ~p): ~p", "~p:~s(~p, ~p): ~p",
[Module, tcp_init, ListenSocket, Opts, Err]), [Module, tcp_init, ListenSocket, Opts, Err]),
accept(ListenSocket, Module, Opts); accept(ListenSocket, Module, Opts, Sup);
NewOpts -> NewOpts ->
accept(ListenSocket, Module, NewOpts) accept(ListenSocket, Module, NewOpts, Sup)
end end
end; end;
{error, _} = Err -> {error, _} = Err ->
@ -240,20 +242,22 @@ get_ip_tuple(no_ip_option, inet6) ->
get_ip_tuple(IPOpt, _IPVOpt) -> get_ip_tuple(IPOpt, _IPVOpt) ->
IPOpt. IPOpt.
accept(ListenSocket, Module, Opts) -> accept(ListenSocket, Module, Opts, Sup) ->
Interval = proplists:get_value(accept_interval, Opts, 0), Interval = proplists:get_value(accept_interval, Opts, 0),
accept(ListenSocket, Module, Opts, Interval). accept(ListenSocket, Module, Opts, Sup, Interval).
accept(ListenSocket, Module, Opts, Interval) -> accept(ListenSocket, Module, Opts, Sup, Interval) ->
NewInterval = check_rate_limit(Interval), NewInterval = check_rate_limit(Interval),
case gen_tcp:accept(ListenSocket) of case gen_tcp:accept(ListenSocket) of
{ok, Socket} -> {ok, Socket} ->
case {inet:sockname(Socket), inet:peername(Socket)} of case {inet:sockname(Socket), inet:peername(Socket)} of
{{ok, {Addr, Port}}, {ok, {PAddr, PPort}}} -> {{ok, {Addr, Port}}, {ok, {PAddr, PPort}}} ->
Receiver = case xmpp_socket:start(Module, Receiver = case start_connection(Module, Socket, Opts, Sup) of
gen_tcp, Socket, Opts) of {ok, RecvPid} ->
{ok, RecvPid} -> RecvPid; RecvPid;
_ -> none _ ->
gen_tcp:close(Socket),
none
end, end,
?INFO_MSG("(~p) Accepted connection ~s:~p -> ~s:~p", ?INFO_MSG("(~p) Accepted connection ~s:~p -> ~s:~p",
[Receiver, [Receiver,
@ -262,11 +266,11 @@ accept(ListenSocket, Module, Opts, Interval) ->
_ -> _ ->
gen_tcp:close(Socket) gen_tcp:close(Socket)
end, end,
accept(ListenSocket, Module, Opts, NewInterval); accept(ListenSocket, Module, Opts, Sup, NewInterval);
{error, Reason} -> {error, Reason} ->
?ERROR_MSG("(~w) Failed TCP accept: ~s", ?ERROR_MSG("(~w) Failed TCP accept: ~s",
[ListenSocket, inet:format_error(Reason)]), [ListenSocket, inet:format_error(Reason)]),
accept(ListenSocket, Module, Opts, NewInterval) accept(ListenSocket, Module, Opts, Sup, NewInterval)
end. end.
udp_recv(Socket, Module, Opts) -> udp_recv(Socket, Module, Opts) ->
@ -287,6 +291,25 @@ udp_recv(Socket, Module, Opts) ->
throw({error, Reason}) throw({error, Reason})
end. end.
start_connection(Module, Socket, Opts, Sup) ->
Res = case Sup of
undefined -> Module:start({gen_tcp, Socket}, Opts);
_ -> supervisor:start_child(Sup, [{gen_tcp, Socket}, Opts])
end,
case Res of
{ok, Pid} ->
case gen_tcp:controlling_process(Socket, Pid) of
ok ->
Module:accept(Pid),
{ok, Pid};
Err ->
exit(Pid, kill),
Err
end;
Err ->
Err
end.
%% @spec (Port, Module, Opts) -> {ok, Pid} | {error, Error} %% @spec (Port, Module, Opts) -> {ok, Pid} | {error, Error}
start_listener(Port, Module, Opts) -> start_listener(Port, Module, Opts) ->
case start_listener2(Port, Module, Opts) of case start_listener2(Port, Module, Opts) of
@ -309,16 +332,21 @@ start_listener2(Port, Module, Opts) ->
%% So, it's normal (and harmless) that in most cases this call returns: {error, {already_started, pid()}} %% So, it's normal (and harmless) that in most cases this call returns: {error, {already_started, pid()}}
start_listener_sup(Port, Module, Opts). start_listener_sup(Port, Module, Opts).
start_module_sup(_Port, Module) -> -spec start_module_sup(module(), [proplists:property()]) -> atom().
Proc1 = gen_mod:get_module_proc(<<"sup">>, Module), start_module_sup(Module, Opts) ->
ChildSpec1 = case proplists:get_value(supervisor, Opts, true) of
{Proc1, true ->
{ejabberd_tmp_sup, start_link, [Proc1, Module]}, Proc = list_to_atom(atom_to_list(Module) ++ "_sup"),
permanent, ChildSpec = {Proc, {ejabberd_tmp_sup, start_link, [Proc, Module]},
infinity, permanent,
supervisor, infinity,
[ejabberd_tmp_sup]}, supervisor,
supervisor:start_child(ejabberd_sup, ChildSpec1). [ejabberd_tmp_sup]},
supervisor:start_child(ejabberd_sup, ChildSpec),
Proc;
false ->
undefined
end.
start_listener_sup(Port, Module, Opts) -> start_listener_sup(Port, Module, Opts) ->
ChildSpec = {Port, ChildSpec = {Port,

View File

@ -21,12 +21,10 @@
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
-module(ejabberd_s2s_in). -module(ejabberd_s2s_in).
-behaviour(xmpp_stream_in). -behaviour(xmpp_stream_in).
-behaviour(xmpp_socket). -behaviour(ejabberd_listener).
%% xmpp_socket callbacks
-export([start/2, start_link/2, socket_type/0]).
%% ejabberd_listener callbacks %% ejabberd_listener callbacks
-export([listen_opt_type/1]). -export([start/2, start_link/2, accept/1, listen_opt_type/1]).
%% xmpp_stream_in callbacks %% xmpp_stream_in callbacks
-export([init/1, handle_call/3, handle_cast/2, -export([init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2, code_change/3]). handle_info/2, terminate/2, code_change/3]).
@ -53,16 +51,8 @@
%%% API %%% API
%%%=================================================================== %%%===================================================================
start(SockData, Opts) -> start(SockData, Opts) ->
case proplists:get_value(supervisor, Opts, true) of xmpp_stream_in:start(?MODULE, [SockData, Opts],
true -> ejabberd_config:fsm_limit_opts(Opts)).
case supervisor:start_child(ejabberd_s2s_in_sup, [SockData, Opts]) of
{ok, undefined} -> ignore;
Res -> Res
end;
_ ->
xmpp_stream_in:start(?MODULE, [SockData, Opts],
ejabberd_config:fsm_limit_opts(Opts))
end.
start_link(SockData, Opts) -> start_link(SockData, Opts) ->
xmpp_stream_in:start_link(?MODULE, [SockData, Opts], xmpp_stream_in:start_link(?MODULE, [SockData, Opts],
@ -77,8 +67,8 @@ close(Ref, Reason) ->
stop(Ref) -> stop(Ref) ->
xmpp_stream_in:stop(Ref). xmpp_stream_in:stop(Ref).
socket_type() -> accept(Ref) ->
xml_stream. xmpp_stream_in:accept(Ref).
-spec send(pid(), xmpp_element()) -> ok; -spec send(pid(), xmpp_element()) -> ok;
(state(), xmpp_element()) -> state(). (state(), xmpp_element()) -> state().

View File

@ -21,20 +21,19 @@
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
-module(ejabberd_service). -module(ejabberd_service).
-behaviour(xmpp_stream_in). -behaviour(xmpp_stream_in).
-behaviour(xmpp_socket). -behaviour(ejabberd_listener).
-protocol({xep, 114, '1.6'}). -protocol({xep, 114, '1.6'}).
%% xmpp_socket callbacks
-export([start/2, start_link/2, socket_type/0, close/1, close/2]).
%% ejabberd_listener callbacks %% ejabberd_listener callbacks
-export([start/2, start_link/2, accept/1]).
-export([listen_opt_type/1, transform_listen_option/2]). -export([listen_opt_type/1, transform_listen_option/2]).
%% xmpp_stream_in callbacks %% xmpp_stream_in callbacks
-export([init/1, handle_info/2, terminate/2, code_change/3]). -export([init/1, handle_info/2, terminate/2, code_change/3]).
-export([handle_stream_start/2, handle_auth_success/4, handle_auth_failure/4, -export([handle_stream_start/2, handle_auth_success/4, handle_auth_failure/4,
handle_authenticated_packet/2, get_password_fun/1, tls_options/1]). handle_authenticated_packet/2, get_password_fun/1, tls_options/1]).
%% API %% API
-export([send/2]). -export([send/2, close/1, close/2]).
-include("xmpp.hrl"). -include("xmpp.hrl").
-include("logger.hrl"). -include("logger.hrl").
@ -53,8 +52,8 @@ start_link(SockData, Opts) ->
xmpp_stream_in:start_link(?MODULE, [SockData, Opts], xmpp_stream_in:start_link(?MODULE, [SockData, Opts],
ejabberd_config:fsm_limit_opts(Opts)). ejabberd_config:fsm_limit_opts(Opts)).
socket_type() -> accept(Ref) ->
xml_stream. xmpp_stream_in:accept(Ref).
-spec send(pid(), xmpp_element()) -> ok; -spec send(pid(), xmpp_element()) -> ok;
(state(), xmpp_element()) -> state(). (state(), xmpp_element()) -> state().

View File

@ -24,25 +24,29 @@
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
-module(ejabberd_sip). -module(ejabberd_sip).
-behaviour(ejabberd_listener).
-ifndef(SIP). -ifndef(SIP).
-include("logger.hrl"). -include("logger.hrl").
-export([socket_type/0, start/2, listen_opt_type/1]). -export([accept/1, start/2, start_link/2, listen_opt_type/1]).
log_error() -> log_error() ->
?CRITICAL_MSG("ejabberd is not compiled with SIP support", []). ?CRITICAL_MSG("ejabberd is not compiled with SIP support", []).
socket_type() -> accept(_) ->
log_error(), log_error(),
raw. ok.
listen_opt_type(_) -> listen_opt_type(_) ->
log_error(), log_error(),
[]. [].
start(_, _) -> start(_, _) ->
log_error(), log_error(),
{error, sip_not_compiled}. {error, sip_not_compiled}.
start_link(_, _) ->
log_error(),
{error, sip_not_compiled}.
-else. -else.
%% API %% API
-export([tcp_init/2, udp_init/2, udp_recv/5, start/2, -export([tcp_init/2, udp_init/2, udp_recv/5, start/2,
socket_type/0, listen_opt_type/1]). start_link/2, accept/1, listen_opt_type/1]).
%%%=================================================================== %%%===================================================================
@ -62,8 +66,11 @@ udp_recv(Sock, Addr, Port, Data, Opts) ->
start(Opaque, Opts) -> start(Opaque, Opts) ->
esip_socket:start(Opaque, Opts). esip_socket:start(Opaque, Opts).
socket_type() -> start_link({gen_tcp, Sock}, Opts) ->
raw. esip_socket:start_link(Sock, Opts).
accept(_) ->
ok.
set_certfile(Opts) -> set_certfile(Opts) ->
case lists:keymember(certfile, 1, Opts) of case lists:keymember(certfile, 1, Opts) of

View File

@ -24,27 +24,30 @@
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
-module(ejabberd_stun). -module(ejabberd_stun).
-behaviour(ejabberd_listener).
-protocol({rfc, 5766}). -protocol({rfc, 5766}).
-protocol({xep, 176, '1.0'}). -protocol({xep, 176, '1.0'}).
-ifndef(STUN). -ifndef(STUN).
-include("logger.hrl"). -include("logger.hrl").
-export([socket_type/0, start/2, listen_opt_type/1]). -export([accept/1, start/2, start_link/2, listen_opt_type/1]).
log_error() -> log_error() ->
?CRITICAL_MSG("ejabberd is not compiled with STUN/TURN support", []). ?CRITICAL_MSG("ejabberd is not compiled with STUN/TURN support", []).
socket_type() -> accept(_) ->
log_error(), log_error(),
raw. ok.
listen_opt_type(_) -> listen_opt_type(_) ->
log_error(), log_error(),
[]. [].
start(_, _) -> start(_, _) ->
log_error(), log_error(),
{error, sip_not_compiled}. {error, sip_not_compiled}.
start_link(_, _) ->
log_error(),
{error, sip_not_compiled}.
-else. -else.
-export([tcp_init/2, udp_init/2, udp_recv/5, start/2, -export([tcp_init/2, udp_init/2, udp_recv/5, start/2,
socket_type/0, listen_opt_type/1]). start_link/2, accept/1, listen_opt_type/1]).
-include("logger.hrl"). -include("logger.hrl").
@ -65,8 +68,11 @@ udp_recv(Socket, Addr, Port, Packet, Opts) ->
start(Opaque, Opts) -> start(Opaque, Opts) ->
stun:start(Opaque, Opts). stun:start(Opaque, Opts).
socket_type() -> start_link({gen_tcp, Sock}, Opts) ->
raw. stun:start_link(Sock, Opts).
accept(_Pid) ->
ok.
%%%=================================================================== %%%===================================================================
%%% Internal functions %%% Internal functions

View File

@ -31,10 +31,11 @@
%%% TODO: commands strings should be strings without ~n %%% TODO: commands strings should be strings without ~n
-module(ejabberd_xmlrpc). -module(ejabberd_xmlrpc).
-behaviour(ejabberd_listener).
-author('badlop@process-one.net'). -author('badlop@process-one.net').
-export([start/2, handler/2, process/2, socket_type/0, -export([start/2, start_link/2, handler/2, process/2, accept/1,
transform_listen_option/2, listen_opt_type/1]). transform_listen_option/2, listen_opt_type/1]).
-include("logger.hrl"). -include("logger.hrl").
@ -190,7 +191,11 @@
start({gen_tcp = _SockMod, Socket}, Opts) -> start({gen_tcp = _SockMod, Socket}, Opts) ->
ejabberd_http:start({gen_tcp, Socket}, [{xmlrpc, true}|Opts]). ejabberd_http:start({gen_tcp, Socket}, [{xmlrpc, true}|Opts]).
socket_type() -> raw. start_link({gen_tcp = _SockMod, Socket}, Opts) ->
ejabberd_http:start_link({gen_tcp, Socket}, [{xmlrpc, true}|Opts]).
accept(Pid) ->
ejabberd_http:accept(Pid).
%% ----------------------------- %% -----------------------------
%% HTTP interface %% HTTP interface

View File

@ -99,15 +99,7 @@ init([Host, Opts]) ->
Service = {mod_proxy65_service, Service = {mod_proxy65_service,
{mod_proxy65_service, start_link, [Host, Opts]}, {mod_proxy65_service, start_link, [Host, Opts]},
transient, 5000, worker, [mod_proxy65_service]}, transient, 5000, worker, [mod_proxy65_service]},
StreamSupervisor = {ejabberd_mod_proxy65_sup, {ok, {{one_for_one, 10, 1}, [Service]}}.
{ejabberd_tmp_sup, start_link,
[gen_mod:get_module_proc(Host,
ejabberd_mod_proxy65_sup),
mod_proxy65_stream]},
transient, infinity, supervisor, [ejabberd_tmp_sup]},
{ok,
{{one_for_one, 10, 1},
[StreamSupervisor, Service]}}.
depends(_Host, _Opts) -> depends(_Host, _Opts) ->
[]. [].

View File

@ -27,18 +27,19 @@
-author('xram@jabber.ru'). -author('xram@jabber.ru').
-behaviour(p1_fsm). -behaviour(p1_fsm).
-behaviour(ejabberd_listener).
%% gen_fsm callbacks. %% gen_fsm callbacks.
-export([init/1, handle_event/3, handle_sync_event/4, -export([init/1, handle_event/3, handle_sync_event/4,
code_change/4, handle_info/3, terminate/3]). code_change/4, handle_info/3, terminate/3]).
%% gen_fsm states. %% gen_fsm states.
-export([wait_for_init/2, wait_for_auth/2, -export([accepting/2, wait_for_init/2, wait_for_auth/2,
wait_for_request/2, wait_for_activation/2, wait_for_request/2, wait_for_activation/2,
stream_established/2]). stream_established/2]).
-export([start/2, stop/1, start_link/3, activate/2, -export([start/2, stop/1, start_link/2, start_link/3, activate/2,
relay/3, socket_type/0, listen_opt_type/1, relay/3, accept/1, listen_opt_type/1,
listen_options/0]). listen_options/0]).
-include("mod_proxy65.hrl"). -include("mod_proxy65.hrl").
@ -69,10 +70,14 @@ start({gen_tcp, Socket}, Opts1) ->
fun({server_host, _}) -> true; fun({server_host, _}) -> true;
(_) -> false (_) -> false
end, Opts1), end, Opts1),
Supervisor = gen_mod:get_module_proc(Host, p1_fsm:start(?MODULE, [Socket, Host, Opts], []).
ejabberd_mod_proxy65_sup),
supervisor:start_child(Supervisor, start_link({gen_tcp, Socket}, Opts1) ->
[Socket, Host, Opts]). {[{server_host, Host}], Opts} = lists:partition(
fun({server_host, _}) -> true;
(_) -> false
end, Opts1),
start_link(Socket, Host, Opts).
start_link(Socket, Host, Opts) -> start_link(Socket, Host, Opts) ->
p1_fsm:start_link(?MODULE, [Socket, Host, Opts], []). p1_fsm:start_link(?MODULE, [Socket, Host, Opts], []).
@ -84,9 +89,8 @@ init([Socket, Host, Opts]) ->
RecvBuf = gen_mod:get_opt(recbuf, Opts), RecvBuf = gen_mod:get_opt(recbuf, Opts),
SendBuf = gen_mod:get_opt(sndbuf, Opts), SendBuf = gen_mod:get_opt(sndbuf, Opts),
TRef = erlang:send_after(?WAIT_TIMEOUT, self(), stop), TRef = erlang:send_after(?WAIT_TIMEOUT, self(), stop),
inet:setopts(Socket, inet:setopts(Socket, [{recbuf, RecvBuf}, {sndbuf, SendBuf}]),
[{active, true}, {recbuf, RecvBuf}, {sndbuf, SendBuf}]), {ok, accepting,
{ok, wait_for_init,
#state{host = Host, auth_type = AuthType, #state{host = Host, auth_type = AuthType,
socket = Socket, shaper = Shaper, timer = TRef}}. socket = Socket, shaper = Shaper, timer = TRef}}.
@ -101,7 +105,8 @@ terminate(_Reason, StateName, #state{sha1 = SHA1}) ->
%%%------------------------------ %%%------------------------------
%%% API. %%% API.
%%%------------------------------ %%%------------------------------
socket_type() -> raw. accept(StreamPid) ->
p1_fsm:send_event(StreamPid, accept).
stop(StreamPid) -> StreamPid ! stop. stop(StreamPid) -> StreamPid ! stop.
@ -125,6 +130,10 @@ activate({P1, J1}, {P2, J2}) ->
%%%----------------------- %%%-----------------------
%%% States %%% States
%%%----------------------- %%%-----------------------
accepting(accept, State) ->
inet:setopts(State#state.socket, [{active, true}]),
{next_state, wait_for_init, State}.
wait_for_init(Packet, wait_for_init(Packet,
#state{socket = Socket, auth_type = AuthType} = #state{socket = Socket, auth_type = AuthType} =
StateData) -> StateData) ->