diff --git a/ChangeLog b/ChangeLog index b884095a3..a2d0e2489 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,12 @@ +2004-12-03 Alexey Shchepin + + * src/ejabberd_listener.erl: Enable keepalive option + + * src/xml_stream.erl: Added API for managing xml streams without + creating process + * src/ejabberd_receiver.erl: Use this API, now 2 processes are + created per C2S connection + 2004-12-01 Alexey Shchepin * src/expat_erl.c: Now uses port control instead of port output diff --git a/src/ejabberd_listener.erl b/src/ejabberd_listener.erl index 2ea422c30..6e84b65e2 100644 --- a/src/ejabberd_listener.erl +++ b/src/ejabberd_listener.erl @@ -70,7 +70,8 @@ init(Port, Module, Opts) -> {packet, 0}, {active, false}, {reuseaddr, true}, - {nodelay, true} | + {nodelay, true}, + {keepalive, true} | SockOpts]), case Res of {ok, ListenSocket} -> diff --git a/src/ejabberd_receiver.erl b/src/ejabberd_receiver.erl index 8217bb7e8..1f1897fb1 100644 --- a/src/ejabberd_receiver.erl +++ b/src/ejabberd_receiver.erl @@ -24,7 +24,7 @@ start(Socket, SockMod, Shaper) -> receiver(Socket, SockMod, Shaper, C2SPid) -> - XMLStreamPid = xml_stream:start(self(), C2SPid), + XMLStreamState = xml_stream:new(C2SPid), ShaperState = shaper:new(Shaper), Timeout = case SockMod of ssl -> @@ -32,32 +32,32 @@ receiver(Socket, SockMod, Shaper, C2SPid) -> _ -> infinity end, - receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid, Timeout). + receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout). -receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid, Timeout) -> +receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout) -> Res = (catch SockMod:recv(Socket, 0, Timeout)), case Res of {ok, Data} -> receive {starttls, TLSSocket} -> - exit(XMLStreamPid, closed), - XMLStreamPid1 = xml_stream:start(self(), C2SPid), + xml_stream:close(XMLStreamState), + XMLStreamState1 = xml_stream:new(C2SPid), TLSRes = tls:recv_data(TLSSocket, Data), receiver1(TLSSocket, tls, - ShaperState, C2SPid, XMLStreamPid1, Timeout, + ShaperState, C2SPid, XMLStreamState1, Timeout, TLSRes) after 0 -> receiver1(Socket, SockMod, - ShaperState, C2SPid, XMLStreamPid, Timeout, + ShaperState, C2SPid, XMLStreamState, Timeout, Res) end; _ -> receiver1(Socket, SockMod, - ShaperState, C2SPid, XMLStreamPid, Timeout, Res) + ShaperState, C2SPid, XMLStreamState, Timeout, Res) end. -receiver1(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid, Timeout, Res) -> +receiver1(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout, Res) -> case Res of {ok, Text} -> ShaperSt1 = receive @@ -67,27 +67,27 @@ receiver1(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid, Timeout, Res) -> ShaperState end, NewShaperState = shaper:update(ShaperSt1, size(Text)), - XMLStreamPid1 = receive - reset_stream -> - exit(XMLStreamPid, closed), - xml_stream:start(self(), C2SPid) - after 0 -> - XMLStreamPid - end, - xml_stream:send_text(XMLStreamPid1, Text), - receiver(Socket, SockMod, NewShaperState, C2SPid, XMLStreamPid1, + XMLStreamState1 = receive + reset_stream -> + xml_stream:close(XMLStreamState), + xml_stream:new(C2SPid) + after 0 -> + XMLStreamState + end, + XMLStreamState2 = xml_stream:parse(XMLStreamState1, Text), + receiver(Socket, SockMod, NewShaperState, C2SPid, XMLStreamState2, Timeout); {error, timeout} -> - receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid, + receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout); {error, Reason} -> - exit(XMLStreamPid, closed), + xml_stream:close(XMLStreamState), gen_fsm:send_event(C2SPid, closed), ok; {'EXIT', Reason} -> ?ERROR_MSG("(~w) abnormal ~w:recv termination:~n\t~p~n", [Socket, SockMod, Reason]), - exit(XMLStreamPid, closed), + xml_stream:close(XMLStreamState), gen_fsm:send_event(C2SPid, closed), ok end. diff --git a/src/xml_stream.erl b/src/xml_stream.erl index 8a892bf30..d5fbd9372 100644 --- a/src/xml_stream.erl +++ b/src/xml_stream.erl @@ -1,7 +1,7 @@ %%%---------------------------------------------------------------------- %%% File : xml_stream.erl %%% Author : Alexey Shchepin -%%% Purpose : +%%% Purpose : Parse XML streams %%% Created : 17 Nov 2002 by Alexey Shchepin %%% Id : $Id$ %%%---------------------------------------------------------------------- @@ -10,7 +10,12 @@ -author('alexey@sevcom.net'). -vsn('$Revision$ '). --export([start/1, start/2, init/1, init/2, send_text/2]). +-export([start/1, start/2, + init/1, init/2, + send_text/2, + new/1, + parse/2, + close/1]). -define(XML_START, 0). -define(XML_END, 1). @@ -19,6 +24,8 @@ -define(PARSE_COMMAND, 0). +-record(xml_stream_state, {callback_pid, port, stack}). + start(CallbackPid) -> spawn(?MODULE, init, [CallbackPid]). @@ -96,3 +103,23 @@ process_data(CallbackPid, Stack, Data) -> send_text(Pid, Text) -> Pid ! {self(), {send, Text}}. + +new(CallbackPid) -> + Port = open_port({spawn, expat_erl}, [binary]), + #xml_stream_state{callback_pid = CallbackPid, + port = Port, + stack = []}. + + +parse(#xml_stream_state{callback_pid = CallbackPid, + port = Port, + stack = Stack} = State, Str) -> + Res = port_control(Port, ?PARSE_COMMAND, Str), + NewStack = lists:foldl( + fun(Data, St) -> + process_data(CallbackPid, St, Data) + end, Stack, binary_to_term(Res)), + State#xml_stream_state{stack = NewStack}. + +close(#xml_stream_state{port = Port}) -> + port_close(Port).