* src/ejabberd_listener.erl: Enable keepalive option

* src/xml_stream.erl: Added API for managing xml streams without
creating process
* src/ejabberd_receiver.erl: Use this API, now 2 processes are
created per C2S connection

SVN Revision: 288
This commit is contained in:
Alexey Shchepin 2004-12-03 22:54:02 +00:00
parent fb977729a9
commit cc19cb1785
4 changed files with 61 additions and 24 deletions

View File

@ -1,3 +1,12 @@
2004-12-03 Alexey Shchepin <alexey@sevcom.net>
* src/ejabberd_listener.erl: Enable keepalive option
* src/xml_stream.erl: Added API for managing xml streams without
creating process
* src/ejabberd_receiver.erl: Use this API, now 2 processes are
created per C2S connection
2004-12-01 Alexey Shchepin <alexey@sevcom.net>
* src/expat_erl.c: Now uses port control instead of port output

View File

@ -70,7 +70,8 @@ init(Port, Module, Opts) ->
{packet, 0},
{active, false},
{reuseaddr, true},
{nodelay, true} |
{nodelay, true},
{keepalive, true} |
SockOpts]),
case Res of
{ok, ListenSocket} ->

View File

@ -24,7 +24,7 @@ start(Socket, SockMod, Shaper) ->
receiver(Socket, SockMod, Shaper, C2SPid) ->
XMLStreamPid = xml_stream:start(self(), C2SPid),
XMLStreamState = xml_stream:new(C2SPid),
ShaperState = shaper:new(Shaper),
Timeout = case SockMod of
ssl ->
@ -32,32 +32,32 @@ receiver(Socket, SockMod, Shaper, C2SPid) ->
_ ->
infinity
end,
receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid, Timeout).
receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout).
receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid, Timeout) ->
receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout) ->
Res = (catch SockMod:recv(Socket, 0, Timeout)),
case Res of
{ok, Data} ->
receive
{starttls, TLSSocket} ->
exit(XMLStreamPid, closed),
XMLStreamPid1 = xml_stream:start(self(), C2SPid),
xml_stream:close(XMLStreamState),
XMLStreamState1 = xml_stream:new(C2SPid),
TLSRes = tls:recv_data(TLSSocket, Data),
receiver1(TLSSocket, tls,
ShaperState, C2SPid, XMLStreamPid1, Timeout,
ShaperState, C2SPid, XMLStreamState1, Timeout,
TLSRes)
after 0 ->
receiver1(Socket, SockMod,
ShaperState, C2SPid, XMLStreamPid, Timeout,
ShaperState, C2SPid, XMLStreamState, Timeout,
Res)
end;
_ ->
receiver1(Socket, SockMod,
ShaperState, C2SPid, XMLStreamPid, Timeout, Res)
ShaperState, C2SPid, XMLStreamState, Timeout, Res)
end.
receiver1(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid, Timeout, Res) ->
receiver1(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout, Res) ->
case Res of
{ok, Text} ->
ShaperSt1 = receive
@ -67,27 +67,27 @@ receiver1(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid, Timeout, Res) ->
ShaperState
end,
NewShaperState = shaper:update(ShaperSt1, size(Text)),
XMLStreamPid1 = receive
reset_stream ->
exit(XMLStreamPid, closed),
xml_stream:start(self(), C2SPid)
after 0 ->
XMLStreamPid
end,
xml_stream:send_text(XMLStreamPid1, Text),
receiver(Socket, SockMod, NewShaperState, C2SPid, XMLStreamPid1,
XMLStreamState1 = receive
reset_stream ->
xml_stream:close(XMLStreamState),
xml_stream:new(C2SPid)
after 0 ->
XMLStreamState
end,
XMLStreamState2 = xml_stream:parse(XMLStreamState1, Text),
receiver(Socket, SockMod, NewShaperState, C2SPid, XMLStreamState2,
Timeout);
{error, timeout} ->
receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid,
receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState,
Timeout);
{error, Reason} ->
exit(XMLStreamPid, closed),
xml_stream:close(XMLStreamState),
gen_fsm:send_event(C2SPid, closed),
ok;
{'EXIT', Reason} ->
?ERROR_MSG("(~w) abnormal ~w:recv termination:~n\t~p~n",
[Socket, SockMod, Reason]),
exit(XMLStreamPid, closed),
xml_stream:close(XMLStreamState),
gen_fsm:send_event(C2SPid, closed),
ok
end.

View File

@ -1,7 +1,7 @@
%%%----------------------------------------------------------------------
%%% File : xml_stream.erl
%%% Author : Alexey Shchepin <alexey@sevcom.net>
%%% Purpose :
%%% Purpose : Parse XML streams
%%% Created : 17 Nov 2002 by Alexey Shchepin <alexey@sevcom.net>
%%% Id : $Id$
%%%----------------------------------------------------------------------
@ -10,7 +10,12 @@
-author('alexey@sevcom.net').
-vsn('$Revision$ ').
-export([start/1, start/2, init/1, init/2, send_text/2]).
-export([start/1, start/2,
init/1, init/2,
send_text/2,
new/1,
parse/2,
close/1]).
-define(XML_START, 0).
-define(XML_END, 1).
@ -19,6 +24,8 @@
-define(PARSE_COMMAND, 0).
-record(xml_stream_state, {callback_pid, port, stack}).
start(CallbackPid) ->
spawn(?MODULE, init, [CallbackPid]).
@ -96,3 +103,23 @@ process_data(CallbackPid, Stack, Data) ->
send_text(Pid, Text) ->
Pid ! {self(), {send, Text}}.
new(CallbackPid) ->
Port = open_port({spawn, expat_erl}, [binary]),
#xml_stream_state{callback_pid = CallbackPid,
port = Port,
stack = []}.
parse(#xml_stream_state{callback_pid = CallbackPid,
port = Port,
stack = Stack} = State, Str) ->
Res = port_control(Port, ?PARSE_COMMAND, Str),
NewStack = lists:foldl(
fun(Data, St) ->
process_data(CallbackPid, St, Data)
end, Stack, binary_to_term(Res)),
State#xml_stream_state{stack = NewStack}.
close(#xml_stream_state{port = Port}) ->
port_close(Port).