* src/ejabberd_service.erl: Bugfix

* src/ejabberd_receiver.erl: Rewritten to use {active, once} mode
for socket
* src/ejabberd_c2s.erl: Update
* src/ejabberd_listener.erl: Likewise
* src/ejabberd_s2s_in.erl: Likewise
* src/ejabberd_s2s_out.erl: Likewise
* src/ejabberd_service.erl: Likewise
* src/shaper.erl: Likewise
* src/tls/tls.erl: Likewise
* src/web/ejabberd_http.erl: Likewise

SVN Revision: 483
This commit is contained in:
Alexey Shchepin 2006-01-13 01:55:20 +00:00
parent 8401a5ac55
commit 6bb510d99e
10 changed files with 280 additions and 100 deletions

View File

@ -1,3 +1,18 @@
2006-01-13 Alexey Shchepin <alexey@sevcom.net>
* src/ejabberd_service.erl: Bugfix
* src/ejabberd_receiver.erl: Rewritten to use {active, once} mode
for socket
* src/ejabberd_c2s.erl: Update
* src/ejabberd_listener.erl: Likewise
* src/ejabberd_s2s_in.erl: Likewise
* src/ejabberd_s2s_out.erl: Likewise
* src/ejabberd_service.erl: Likewise
* src/shaper.erl: Likewise
* src/tls/tls.erl: Likewise
* src/web/ejabberd_http.erl: Likewise
2006-01-02 Mickael Remond <mickael.remond@process-one.net>
* src/odbc/ejabberd_odbc.erl: Native MySQL support

View File

@ -17,6 +17,7 @@
start_link/2,
send_text/2,
send_element/2,
become_controller/1,
get_presence/1]).
%% gen_fsm callbacks
@ -97,6 +98,9 @@ start(SockData, Opts) ->
start_link(SockData, Opts) ->
gen_fsm:start_link(ejabberd_c2s, [SockData, Opts], ?FSMOPTS).
become_controller(Pid) ->
gen_fsm:send_all_state_event(Pid, become_controller).
%% Return Username, Resource and presence information
get_presence(FsmRef) ->
gen_fsm:sync_send_all_state_event(FsmRef, {get_presence}, 1000).
@ -791,6 +795,12 @@ session_established(closed, StateData) ->
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
handle_event(become_controller, StateName, StateData) ->
ok = (StateData#state.sockmod):controlling_process(
StateData#state.socket,
StateData#state.receiver),
ejabberd_receiver:become_controller(StateData#state.receiver),
{next_state, StateName, StateData};
handle_event(_Event, StateName, StateData) ->
{next_state, StateName, StateData}.
@ -975,7 +985,10 @@ handle_info({route, From, To, Packet}, StateName, StateData) ->
{next_state, StateName, NewState};
true ->
{next_state, StateName, NewState}
end.
end;
handle_info(Info, StateName, StateData) ->
?ERROR_MSG("Unexpected info: ~p", [Info]),
{next_state, StateName, StateData}.
%%----------------------------------------------------------------------
%% Func: terminate/3
@ -1035,7 +1048,7 @@ terminate(_Reason, StateName, StateData) ->
_ ->
ok
end,
(StateData#state.sockmod):close(StateData#state.socket),
ejabberd_receiver:close(StateData#state.receiver),
ok.
%%%----------------------------------------------------------------------

View File

@ -99,6 +99,7 @@ accept(ListenSocket, Module, Opts) ->
{error, _Reason} ->
gen_tcp:close(Socket)
end,
Module:become_controller(Pid),
accept(ListenSocket, Module, Opts);
{error, Reason} ->
?INFO_MSG("(~w) Failed TCP accept: ~w",
@ -146,6 +147,7 @@ accept_ssl(ListenSocket, Module, Opts) ->
end,
{ok, Pid} = Module:start({ssl, Socket}, Opts),
catch ssl:controlling_process(Socket, Pid),
Module:become_controller(Pid),
accept_ssl(ListenSocket, Module, Opts);
{error, timeout} ->
accept_ssl(ListenSocket, Module, Opts);

View File

@ -10,20 +10,68 @@
-author('alexey@sevcom.net').
-vsn('$Revision$ ').
-behaviour(gen_server).
%% API
-export([start/3,
receiver/4,
change_shaper/2,
reset_stream/1,
starttls/2]).
starttls/2,
become_controller/1,
close/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-include("ejabberd.hrl").
-record(state, {socket,
sock_mod,
shaper_state,
c2s_pid,
xml_stream_state,
timeout}).
%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
start(Socket, SockMod, Shaper) ->
proc_lib:spawn(?MODULE, receiver, [Socket, SockMod, Shaper, self()]).
{ok, Pid} = gen_server:start(
?MODULE, [Socket, SockMod, Shaper, self()], []),
Pid.
change_shaper(Pid, Shaper) ->
gen_server:cast(Pid, {change_shaper, Shaper}).
receiver(Socket, SockMod, Shaper, C2SPid) ->
reset_stream(Pid) ->
gen_server:call(Pid, reset_stream).
starttls(Pid, TLSSocket) ->
gen_server:call(Pid, {starttls, TLSSocket}).
become_controller(Pid) ->
gen_server:call(Pid, become_controller).
close(Pid) ->
gen_server:cast(Pid, close).
%%====================================================================
%% gen_server callbacks
%%====================================================================
%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
init([Socket, SockMod, Shaper, C2SPid]) ->
XMLStreamState = xml_stream:new(C2SPid),
ShaperState = shaper:new(Shaper),
Timeout = case SockMod of
@ -32,77 +80,149 @@ receiver(Socket, SockMod, Shaper, C2SPid) ->
_ ->
infinity
end,
receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout).
{ok, #state{socket = Socket,
sock_mod = SockMod,
shaper_state = ShaperState,
c2s_pid = C2SPid,
xml_stream_state = XMLStreamState,
timeout = Timeout}}.
receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout) ->
Res = (catch SockMod:recv(Socket, 0, Timeout)),
receive
{starttls, TLSSocket} ->
xml_stream:close(XMLStreamState),
XMLStreamState1 = xml_stream:new(C2SPid),
TLSRes = case Res of
{ok, Data} ->
tls:recv_data(TLSSocket, Data);
_ ->
tls:recv_data(TLSSocket, "")
end,
receiver1(TLSSocket, tls,
ShaperState, C2SPid, XMLStreamState1, Timeout,
TLSRes);
{change_timeout, NewTimeout} -> % Dirty hack
receiver1(Socket, SockMod,
ShaperState, C2SPid, XMLStreamState, NewTimeout,
Res)
after 0 ->
receiver1(Socket, SockMod,
ShaperState, C2SPid, XMLStreamState, Timeout,
Res)
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
handle_call({starttls, TLSSocket}, _From,
#state{xml_stream_state = XMLStreamState,
c2s_pid = C2SPid} = State) ->
xml_stream:close(XMLStreamState),
NewXMLStreamState = xml_stream:new(C2SPid),
NewState = State#state{socket = TLSSocket,
sock_mod = tls,
xml_stream_state = NewXMLStreamState},
case tls:recv_data(TLSSocket, "") of
{ok, TLSData} ->
{reply, ok, process_data(TLSData, NewState)};
{error, _Reason} ->
{stop, normal, ok, NewState}
end;
handle_call(reset_stream, _From,
#state{xml_stream_state = XMLStreamState,
c2s_pid = C2SPid} = State) ->
xml_stream:close(XMLStreamState),
NewXMLStreamState = xml_stream:new(C2SPid),
Reply = ok,
{reply, Reply, State#state{xml_stream_state = NewXMLStreamState}};
handle_call(become_controller, _From, State) ->
activate_socket(State),
Reply = ok,
{reply, Reply, State};
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
handle_cast({change_shaper, Shaper}, State) ->
NewShaperState = shaper:new(Shaper),
{noreply, State#state{shaper_state = NewShaperState}};
handle_cast(close, State) ->
{stop, normal, State};
handle_cast(_Msg, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
handle_info({Tag, _TCPSocket, Data},
#state{socket = Socket,
sock_mod = SockMod} = State)
when (Tag == tcp) or (Tag == ssl) ->
case SockMod of
tls ->
case tls:recv_data(Socket, Data) of
{ok, TLSData} ->
{noreply, process_data(TLSData, State)};
{error, _Reason} ->
{stop, normal, State}
end;
_ ->
{noreply, process_data(Data, State)}
end;
handle_info({Tag, _TCPSocket}, State)
when (Tag == tcp_closed) or (Tag == ssl_closed) ->
{stop, normal, State};
handle_info({Tag, _TCPSocket, Reason}, State)
when (Tag == tcp_error) or (Tag == ssl_error) ->
case Reason of
timeout ->
{noreply, State};
_ ->
{stop, normal, State}
end;
handle_info({timeout, _Ref, activate}, State) ->
activate_socket(State),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> void()
%% Description: This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, #state{xml_stream_state = XMLStreamState,
c2s_pid = C2SPid} = State) ->
xml_stream:close(XMLStreamState),
gen_fsm:send_event(C2SPid, closed),
catch (State#state.sock_mod):close(State#state.socket),
ok.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
activate_socket(#state{socket = Socket,
sock_mod = SockMod}) ->
case SockMod of
gen_tcp ->
inet:setopts(Socket, [{active, once}]);
_ ->
SockMod:setopts(Socket, [{active, once}])
end.
receiver1(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout, Res) ->
case Res of
{ok, Text} ->
ShaperSt1 = receive
{change_shaper, Shaper} ->
shaper:new(Shaper)
after 0 ->
ShaperState
end,
NewShaperState = shaper:update(ShaperSt1, size(Text)),
XMLStreamState1 = receive
reset_stream ->
xml_stream:close(XMLStreamState),
xml_stream:new(C2SPid)
after 0 ->
XMLStreamState
end,
XMLStreamState2 = xml_stream:parse(XMLStreamState1, Text),
receiver(Socket, SockMod, NewShaperState, C2SPid, XMLStreamState2,
Timeout);
{error, timeout} ->
receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState,
Timeout);
{error, Reason} ->
xml_stream:close(XMLStreamState),
gen_fsm:send_event(C2SPid, closed),
ok;
{'EXIT', Reason} ->
?ERROR_MSG("(~w) abnormal ~w:recv termination:~n\t~p~n",
[Socket, SockMod, Reason]),
xml_stream:close(XMLStreamState),
gen_fsm:send_event(C2SPid, closed),
ok
end.
change_shaper(Pid, Shaper) ->
Pid ! {change_shaper, Shaper}.
reset_stream(Pid) ->
Pid ! reset_stream.
starttls(Pid, TLSSocket) ->
Pid ! {starttls, TLSSocket}.
process_data(Data,
#state{xml_stream_state = XMLStreamState,
shaper_state = ShaperState} = State) ->
XMLStreamState1 = xml_stream:parse(XMLStreamState, Data),
{NewShaperState, Pause} = shaper:update(ShaperState, size(Data)),
if
Pause > 0 ->
erlang:start_timer(Pause, self(), activate);
true ->
activate_socket(State)
end,
State#state{xml_stream_state = XMLStreamState1,
shaper_state = NewShaperState}.

View File

@ -1,7 +1,7 @@
%%%----------------------------------------------------------------------
%%% File : ejabberd_s2s_in.erl
%%% Author : Alexey Shchepin <alexey@sevcom.net>
%%% Purpose :
%%% Purpose : Serve incoming s2s connection
%%% Created : 6 Dec 2002 by Alexey Shchepin <alexey@sevcom.net>
%%% Id : $Id$
%%%----------------------------------------------------------------------
@ -14,7 +14,9 @@
%% External exports
-export([start/2,
start_link/2,match_domain/2]).
start_link/2,
become_controller/1,
match_domain/2]).
%% gen_fsm callbacks
-export([init/1,
@ -29,7 +31,6 @@
-include("ejabberd.hrl").
-include("jlib.hrl").
%-include_lib("ssl/pkix/SSL-PKIX.hrl").
-include_lib("ssl/pkix/PKIX1Explicit88.hrl").
-include_lib("ssl/pkix/PKIX1Implicit88.hrl").
-include("XmppAddr.hrl").
@ -87,6 +88,9 @@ start(SockData, Opts) ->
start_link(SockData, Opts) ->
gen_fsm:start_link(ejabberd_s2s_in, [SockData, Opts], ?FSMOPTS).
become_controller(Pid) ->
gen_fsm:send_all_state_event(Pid, become_controller).
%%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm
%%%----------------------------------------------------------------------
@ -455,6 +459,12 @@ stream_established(closed, StateData) ->
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
handle_event(become_controller, StateName, StateData) ->
ok = (StateData#state.sockmod):controlling_process(
StateData#state.socket,
StateData#state.receiver),
ejabberd_receiver:become_controller(StateData#state.receiver),
{next_state, StateName, StateData};
handle_event(_Event, StateName, StateData) ->
{next_state, StateName, StateData}.
@ -499,7 +509,7 @@ handle_info(_, StateName, StateData) ->
%%----------------------------------------------------------------------
terminate(Reason, _StateName, StateData) ->
?INFO_MSG("terminated: ~p", [Reason]),
(StateData#state.sockmod):close(StateData#state.socket),
ejabberd_receiver:close(StateData#state.receiver),
ok.
%%%----------------------------------------------------------------------

View File

@ -164,6 +164,8 @@ open_socket(init, StateData) ->
case Res of
{ok, Socket} ->
ReceiverPid = ejabberd_receiver:start(Socket, gen_tcp, none),
ok = gen_tcp:controlling_process(Socket, ReceiverPid),
ejabberd_receiver:become_controller(ReceiverPid),
Version = if
StateData#state.use_v10 ->
" version='1.0'";
@ -342,7 +344,6 @@ wait_for_features({xmlstreamelement, El}, StateData) ->
StateData#state{try_auth = false}};
StartTLS and StateData#state.tls and
(not StateData#state.tls_enabled) ->
StateData#state.receiver ! {change_timeout, 100},
send_element(StateData,
{xmlelement, "starttls",
[{"xmlns", ?NS_TLS}], []}),
@ -462,7 +463,6 @@ wait_for_starttls_proceed({xmlstreamelement, El}, StateData) ->
{ok, TLSSocket} = tls:tcp_to_tls(Socket, TLSOpts),
ejabberd_receiver:starttls(
StateData#state.receiver, TLSSocket),
StateData#state.receiver ! {change_timeout, infinity},
NewStateData = StateData#state{sockmod = tls,
socket = TLSSocket,
streamid = new_id(),
@ -630,8 +630,7 @@ handle_info(_, StateName, StateData) ->
%%----------------------------------------------------------------------
terminate(Reason, StateName, StateData) ->
?INFO_MSG("terminated: ~p", [Reason]),
Error = ?ERR_REMOTE_SERVER_NOT_FOUND,
bounce_queue(StateData#state.queue, Error),
bounce_queue(StateData#state.queue, ?ERR_REMOTE_SERVER_NOT_FOUND),
case StateData#state.new of
false ->
ok;
@ -642,8 +641,8 @@ terminate(Reason, StateName, StateData) ->
case StateData#state.socket of
undefined ->
ok;
Socket ->
(StateData#state.sockmod):close(Socket)
_Socket ->
ejabberd_receiver:close(StateData#state.receiver)
end,
ok.

View File

@ -13,7 +13,12 @@
-behaviour(gen_fsm).
%% External exports
-export([start/2, start_link/2, receiver/3, send_text/2, send_element/2]).
-export([start/2,
start_link/2,
receiver/3,
send_text/2,
send_element/2,
become_controller/1]).
%% gen_fsm callbacks
-export([init/1,
@ -75,6 +80,9 @@ start(SockData, Opts) ->
start_link(SockData, Opts) ->
gen_fsm:start_link(ejabberd_service, [SockData, Opts], ?FSMOPTS).
become_controller(_Pid) ->
ok.
%%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm
%%%----------------------------------------------------------------------
@ -175,7 +183,7 @@ wait_for_handshake({xmlstreamelement, El}, StateData) ->
{stop, normal, StateData}
end;
_ ->
{next_state, wait_for_key, StateData}
{next_state, wait_for_handshake, StateData}
end;
wait_for_handshake({xmlstreamend, _Name}, StateData) ->

View File

@ -33,25 +33,26 @@ new1({maxrate, MaxRate}) ->
lasttime = now_to_usec(now())}.
update(none, Size) ->
none;
update(none, _Size) ->
{none, 0};
update(#maxrate{} = State, Size) ->
MinInterv = 1000 * Size /
(2 * State#maxrate.maxrate - State#maxrate.lastrate),
Interv = (now_to_usec(now()) - State#maxrate.lasttime) / 1000,
%io:format("State: ~p, Size=~p~nM=~p, I=~p~n",
% [State, Size, MinInterv, Interv]),
if
MinInterv > Interv ->
timer:sleep(1 + trunc(MinInterv - Interv));
true ->
ok
end,
Now = now_to_usec(now()),
State#maxrate{
lastrate = (State#maxrate.lastrate +
1000000 * Size / (Now - State#maxrate.lasttime))/2,
lasttime = Now}.
Pause = if
MinInterv > Interv ->
1 + trunc(MinInterv - Interv);
true ->
0
end,
NextNow = now_to_usec(now()) + Pause * 1000,
{State#maxrate{
lastrate = (State#maxrate.lastrate +
1000000 * Size / (NextNow - State#maxrate.lasttime))/2,
lasttime = NextNow},
Pause}.
now_to_usec({MSec, Sec, USec}) ->

View File

@ -16,6 +16,8 @@
tcp_to_tls/2, tls_to_tcp/1,
send/2,
recv/2, recv/3, recv_data/2,
setopts/2,
controlling_process/2,
close/1,
get_peer_certificate/1,
get_verify_result/1,
@ -175,6 +177,12 @@ send(#tlssock{tcpsock = TCPSocket, tlsport = Port}, Packet) ->
end.
setopts(#tlssock{tcpsock = TCPSocket}, Opts) ->
inet:setopts(TCPSocket, Opts).
controlling_process(#tlssock{tcpsock = TCPSocket}, Pid) ->
gen_tcp:controlling_process(TCPSocket, Pid).
close(#tlssock{tcpsock = TCPSocket, tlsport = Port}) ->
gen_tcp:close(TCPSocket),
port_close(Port).

View File

@ -13,6 +13,7 @@
%% External exports
-export([start/2,
start_link/2,
become_controller/1,
receive_headers/1,
url_encode/1]).
@ -81,6 +82,9 @@ start_link({SockMod, Socket}, Opts) ->
use_web_admin = UseWebAdmin}])}.
become_controller(_Pid) ->
ok.
send_text(State, Text) ->
(State#state.sockmod):send(State#state.socket, Text).