diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index bd7a125ac..5a9f6fddc 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -71,6 +71,7 @@ -record(state, {socket, sockmod, socket_monitor, + xml_socket, streamid, sasl_state, access, @@ -124,6 +125,24 @@ -define(DEFAULT_NS, ?NS_JABBER_CLIENT). -define(PREFIXED_NS, [{?NS_XMPP, ?NS_XMPP_pfx}]). +-define(STREAM_HEADER, + "" + "" + ). + +-define(STREAM_TRAILER, ""). + +-define(INVALID_NS_ERR, exmpp_stream:error('invalid-namespace')). +-define(INVALID_XML_ERR, exmpp_stream:error('xml-not-well-formed')). +-define(HOST_UNKNOWN_ERR, exmpp_stream:error('host-unknown')). +-define(SERRT_CONFLICT, exmpp_stream:error('conflict')). +-define(POLICY_VIOLATION_ERR(Lang, Text), + exmpp_stream:error('policy-violation', {Lang, Text})). + +-define(INVALID_FROM, exmpp_stream:error('invalid-from')). + -define(STANZA_ERROR(NS, Condition), exmpp_xml:xmlel_to_xmlelement(exmpp_stanza:error(NS, Condition), [?NS_JABBER_CLIENT], [{?NS_XMPP, "stream"}])). @@ -174,6 +193,11 @@ init([{SockMod, Socket}, Opts]) -> {value, {_, S}} -> S; _ -> none end, + XMLSocket = + case lists:keysearch(xml_socket, 1, Opts) of + {value, {_, XS}} -> XS; + _ -> false + end, Zlib = lists:member(zlib, Opts), StartTLS = lists:member(starttls, Opts), StartTLSRequired = lists:member(starttls_required, Opts), @@ -203,6 +227,7 @@ init([{SockMod, Socket}, Opts]) -> {ok, wait_for_stream, #state{socket = Socket1, sockmod = SockMod, socket_monitor = SocketMonitor, + xml_socket = XMLSocket, zlib = Zlib, tls = TLS, tls_required = StartTLSRequired, @@ -248,7 +273,7 @@ wait_for_stream({xmlstreamstart, #xmlel{ns = NS} = Opening}, StateData) -> exmpp_jid:make(ServerB)), case exmpp_stream:get_version(Opening) of {1, 0} -> - send_element(StateData, Header), + send_header(StateData, Server, "1.0", DefaultLang), case StateData#state.authenticated of false -> SASLState = @@ -333,6 +358,7 @@ wait_for_stream({xmlstreamstart, #xmlel{ns = NS} = Opening}, StateData) -> end end; _ -> + send_header(StateData, Server, "", DefaultLang), if (not StateData#state.tls_enabled) and StateData#state.tls_required -> @@ -340,6 +366,7 @@ wait_for_stream({xmlstreamstart, #xmlel{ns = NS} = Opening}, StateData) -> exmpp_xml:append_child(Header, exmpp_stream:error('policy-violation', {"en", "Use of STARTTLS required"}))), + send_trailer(StateData), {stop, normal, StateData}; true -> send_element(StateData, Header), @@ -350,16 +377,15 @@ wait_for_stream({xmlstreamstart, #xmlel{ns = NS} = Opening}, StateData) -> end end; _ -> - Header2 = exmpp_stream:set_initiating_entity(Header, - ?MYNAME), - send_element(StateData, exmpp_xml:append_child(Header2, - exmpp_stream:error('host-unknown'))), + send_header(StateData, ?MYNAME, "", DefaultLang), + send_element(StateData, ?HOST_UNKNOWN_ERR), + send_trailer(StateData), {stop, normal, StateData} end; _ -> - Header2 = exmpp_stream:set_initiating_entity(Header, ?MYNAME), - send_element(StateData, exmpp_xml:append_child(Header2, - exmpp_stream:error('invalid-namespace'))), + send_header(StateData, ?MYNAME, "", DefaultLang), + send_element(StateData, ?INVALID_NS_ERR), + send_trailer(StateData), {stop, normal, StateData} end; @@ -367,21 +393,19 @@ wait_for_stream(timeout, StateData) -> {stop, normal, StateData}; wait_for_stream({xmlstreamelement, _}, StateData) -> - send_element(StateData, exmpp_stream:error('xml-not-well-formed')), - send_element(StateData, exmpp_stream:closing()), + send_element(StateData, ?INVALID_XML_ERR), + send_trailer(StateData), {stop, normal, StateData}; wait_for_stream({xmlstreamend, _}, StateData) -> - send_element(StateData, exmpp_stream:error('xml-not-well-formed')), - send_element(StateData, exmpp_stream:closing()), + send_element(StateData, ?INVALID_XML_ERR), + send_trailer(StateData), {stop, normal, StateData}; wait_for_stream({xmlstreamerror, _}, StateData) -> - Header = exmpp_stream:opening_reply(?MYNAME, 'jabber:client', "1.0", - "none"), - Header1 = exmpp_xml:append_child(Header, - exmpp_stream:error('xml-not-well-formed')), - send_element(StateData, Header1), + send_header(StateData, ?MYNAME, "1.0", ""), + send_element(StateData, ?INVALID_XML_ERR), + send_trailer(StateData), {stop, normal, StateData}; wait_for_stream(closed, StateData) -> @@ -495,12 +519,12 @@ wait_for_auth(timeout, StateData) -> {stop, normal, StateData}; wait_for_auth({xmlstreamend, _Name}, StateData) -> - send_element(StateData, exmpp_stream:closing()), + send_trailer(StateData), {stop, normal, StateData}; wait_for_auth({xmlstreamerror, _}, StateData) -> - send_element(StateData, exmpp_stream:error('xml-not-well-formed')), - send_element(StateData, exmpp_stream:closing()), + send_element(StateData, ?INVALID_XML_ERR), + send_trailer(StateData), {stop, normal, StateData}; wait_for_auth(closed, StateData) -> @@ -604,9 +628,10 @@ wait_for_feature_request({xmlstreamelement, #xmlel{ns = NS, name = Name} = El}, _ -> if (SockMod == gen_tcp) and TLSRequired -> + Lang = StateData#state.lang, send_element(StateData, exmpp_stream:error( - 'policy-violation', {"en", "Use of STARTTLS required"})), - send_element(StateData, exmpp_stream:closing()), + 'policy-violation', {Lang, "Use of STARTTLS required"})), + send_trailer(StateData), {stop, normal, StateData}; true -> process_unauthenticated_stanza(StateData, El), @@ -619,11 +644,12 @@ wait_for_feature_request(timeout, StateData) -> wait_for_feature_request({xmlstreamend, _Name}, StateData) -> send_element(StateData, exmpp_stream:closing()), + send_trailer(StateData), {stop, normal, StateData}; wait_for_feature_request({xmlstreamerror, _}, StateData) -> - send_element(StateData, exmpp_stream:error('xml-not-well-formed')), - send_element(StateData, exmpp_stream:closing()), + send_element(StateData, ?INVALID_XML_ERR), + send_trailer(StateData), {stop, normal, StateData}; wait_for_feature_request(closed, StateData) -> @@ -678,12 +704,12 @@ wait_for_sasl_response(timeout, StateData) -> {stop, normal, StateData}; wait_for_sasl_response({xmlstreamend, _Name}, StateData) -> - send_element(StateData, exmpp_stream:closing()), + send_trailer(StateData), {stop, normal, StateData}; wait_for_sasl_response({xmlstreamerror, _}, StateData) -> - send_element(StateData, exmpp_stream:error('xml-not-well-formed')), - send_element(StateData, exmpp_stream:closing()), + send_element(StateData, ?INVALID_XML_ERR), + send_trailer(StateData), {stop, normal, StateData}; wait_for_sasl_response(closed, StateData) -> @@ -717,12 +743,12 @@ wait_for_bind(timeout, StateData) -> {stop, normal, StateData}; wait_for_bind({xmlstreamend, _Name}, StateData) -> - send_element(StateData, exmpp_stream:closing()), + send_trailer(StateData), {stop, normal, StateData}; wait_for_bind({xmlstreamerror, _}, StateData) -> - send_element(StateData, exmpp_stream:error('xml-not-well-formed')), - send_element(StateData, exmpp_stream:closing()), + send_element(StateData, ?INVALID_XML_ERR), + send_trailer(StateData), {stop, normal, StateData}; wait_for_bind(closed, StateData) -> @@ -791,12 +817,12 @@ wait_for_session(timeout, StateData) -> {stop, normal, StateData}; wait_for_session({xmlstreamend, _Name}, StateData) -> - send_element(StateData, exmpp_stream:closing()), + send_trailer(StateData), {stop, normal, StateData}; wait_for_session({xmlstreamerror, _}, StateData) -> - send_element(StateData, exmpp_stream:error('xml-not-well-formed')), - send_element(StateData, exmpp_stream:closing()), + send_element(StateData, ?INVALID_XML_ERR), + send_trailer(StateData), {stop, normal, StateData}; wait_for_session(closed, StateData) -> @@ -804,10 +830,11 @@ wait_for_session(closed, StateData) -> session_established({xmlstreamelement, El}, StateData) -> + %% Check 'from' attribute in stanza RFC 3920 Section 9.1.2 case check_from(El, StateData#state.jid) of 'invalid-from' -> - send_element(StateData, exmpp_stream:error('invalid-from')), - send_element(StateData, exmpp_stream:closing()), + send_element(StateData, ?INVALID_FROM), + send_trailer(StateData), {stop, normal, StateData}; _ -> session_established2(El, StateData) @@ -823,12 +850,17 @@ session_established(timeout, StateData) -> fsm_next_state(session_established, StateData); session_established({xmlstreamend, _Name}, StateData) -> - send_element(StateData, exmpp_stream:closing()), + send_trailer(StateData), + {stop, normal, StateData}; + +session_established({xmlstreamerror, "XML stanza is too big" = E}, StateData) -> + send_element(StateData, ?POLICY_VIOLATION_ERR(StateData#state.lang, E)), + send_trailer(StateData), {stop, normal, StateData}; session_established({xmlstreamerror, _}, StateData) -> - send_element(StateData, exmpp_stream:error('xml-not-well-formed')), - send_element(StateData, exmpp_stream:closing()), + send_element(StateData, ?INVALID_XML_ERR), + send_trailer(StateData), {stop, normal, StateData}; session_established(closed, StateData) -> @@ -1002,8 +1034,10 @@ handle_info({send_text, Text}, StateName, StateData) -> ejabberd_hooks:run(c2s_loop_debug, [Text]), fsm_next_state(StateName, StateData); handle_info(replaced, _StateName, StateData) -> - send_element(StateData, exmpp_stream:error('conflict')), - send_element(StateData, exmpp_stream:closing()), + _Lang = StateData#state.lang, + send_element(StateData, + ?SERRT_CONFLICT), %% (Lang, "Replaced by new connection")), + send_trailer(StateData), {stop, normal, StateData#state{authenticated = replaced}}; %% Process Packets that are to be send to the user handle_info({route, From, To, Packet}, StateName, StateData) -> @@ -1194,9 +1228,9 @@ handle_info({route, From, To, Packet}, StateName, StateData) -> if Pass == exit -> %% When Pass==exit, NewState contains a string instead of a #state{} - Lang = StateData#state.lang, - catch send_element(StateData, exmpp_stream:error('undefined-condition', {Lang, NewState})), - catch send_element(StateData, exmpp_stream:closing()), + _Lang = StateData#state.lang, + send_element(StateData, ?SERRT_CONFLICT), %% (Lang, NewState)), + send_trailer(StateData), {stop, normal, StateData}; Pass -> Attrs2 = exmpp_stanza:set_sender_in_attrs(NewAttrs, From), @@ -1292,9 +1326,60 @@ send_text(StateData, Text) -> send_element(StateData, #xmlel{ns = ?NS_XMPP, name = 'stream'} = El) -> send_text(StateData, exmpp_stream:to_iolist(El)); +send_element(StateData, El) when StateData#state.xml_socket -> + (StateData#state.sockmod):send_xml(StateData#state.socket, + {xmlstreamelement, El}); send_element(StateData, El) -> send_text(StateData, exmpp_stanza:to_iolist(El)). +send_header(StateData, Server, Version, Lang) + when StateData#state.xml_socket -> + VersionAttr = + case Version of + "" -> []; + _ -> [{"version", Version}] + end, + LangAttr = + case Lang of + "" -> []; + _ -> [{"xml:lang", Lang}] + end, + Header = + {xmlstreamstart, + "stream:stream", + VersionAttr ++ + LangAttr ++ + [{"xmlns", "jabber:client"}, + {"xmlns:stream", "http://etherx.jabber.org/streams"}, + {"id", StateData#state.streamid}, + {"from", Server}]}, + (StateData#state.sockmod):send_xml( + StateData#state.socket, Header); +send_header(StateData, Server, Version, Lang) -> + VersionStr = + case Version of + "" -> ""; + _ -> [" version='", Version, "'"] + end, + LangStr = + case Lang of + "" -> ""; + _ -> [" xml:lang='", Lang, "'"] + end, + Header = io_lib:format(?STREAM_HEADER, + [StateData#state.streamid, + Server, + VersionStr, + LangStr]), + send_text(StateData, Header). + +send_trailer(StateData) when StateData#state.xml_socket -> + (StateData#state.sockmod):send_xml( + StateData#state.socket, + {xmlstreamend, "stream:stream"}); +send_trailer(StateData) -> + send_element(StateData, exmpp_stream:closing()). + new_id() -> randoms:get_string(). diff --git a/src/ejabberd_receiver.erl b/src/ejabberd_receiver.erl index 8d13eb497..6cfc3ae36 100644 --- a/src/ejabberd_receiver.erl +++ b/src/ejabberd_receiver.erl @@ -198,7 +198,7 @@ handle_cast(_Msg, State) -> handle_info({Tag, _TCPSocket, Data}, #state{socket = Socket, sock_mod = SockMod} = State) - when (Tag == tcp) or (Tag == ssl) -> + when (Tag == tcp) or (Tag == ssl) or (Tag == ejabberd_xml) -> case SockMod of tls -> case tls:recv_data(Socket, Data) of @@ -288,6 +288,25 @@ activate_socket(#state{socket = Socket, ok end. +%% Data processing for connectors directly generating xmlelement in +%% Erlang data structure. +%% WARNING: Shaper does not work with Erlang data structure. +process_data([], State) -> + activate_socket(State), + State; +process_data([Element|Els], #state{c2s_pid = C2SPid} = State) + when element(1, Element) == xmlelement; + element(1, Element) == xmlstreamstart; + element(1, Element) == xmlstreamelement; + element(1, Element) == xmlstreamend -> + if + C2SPid == undefined -> + State; + true -> + catch gen_fsm:send_event(C2SPid, element_wrapper(Element)), + process_data(Els, State) + end; +%% Data processing for connectors receivind data as string. process_data(Data, #state{xml_stream_state = XMLStreamState, shaper_state = ShaperState, @@ -310,6 +329,16 @@ process_data(Data, {State#state{xml_stream_state = XMLStreamState1, shaper_state = NewShaperState}, HibTimeout}. +%% Element coming from XML parser are wrapped inside xmlstreamelement +%% When we receive directly xmlelement tuple (from a socket module +%% speaking directly Erlang XML), we wrap it inside the same +%% xmlstreamelement coming from the XML parser. +element_wrapper(XMLElement) + when element(1, XMLElement) == xmlelement -> + {xmlstreamelement, XMLElement}; +element_wrapper(Element) -> + Element. + close_stream(undefined) -> ok; close_stream(XMLStreamState) -> diff --git a/src/ejabberd_socket.erl b/src/ejabberd_socket.erl index 566ee1e43..d629a77d9 100644 --- a/src/ejabberd_socket.erl +++ b/src/ejabberd_socket.erl @@ -37,6 +37,7 @@ compress/2, reset_stream/1, send/2, + send_xml/2, change_shaper/2, monitor/1, get_sockmod/1, @@ -62,10 +63,18 @@ start(Module, SockMod, Socket, Opts) -> {value, {_, Size}} -> Size; _ -> infinity end, - Receiver = ejabberd_receiver:start(Socket, SockMod, none, MaxStanzaSize), + {ReceiverMod, Receiver, RecRef} = + case catch SockMod:custom_receiver(Socket) of + {receiver, RecMod, RecPid} -> + {RecMod, RecPid, RecMod}; + _ -> + RecPid = ejabberd_receiver:start( + Socket, SockMod, none, MaxStanzaSize), + {ejabberd_receiver, RecPid, RecPid} + end, SocketData = #socket_state{sockmod = SockMod, socket = Socket, - receiver = Receiver}, + receiver = RecRef}, case Module:start({?MODULE, SocketData}, Opts) of {ok, Pid} -> case SockMod:controlling_process(Socket, Receiver) of @@ -74,7 +83,7 @@ start(Module, SockMod, Socket, Opts) -> {error, _Reason} -> SockMod:close(Socket) end, - ejabberd_receiver:become_controller(Receiver, Pid); + ReceiverMod:become_controller(Receiver, Pid); {error, _Reason} -> SockMod:close(Socket) end; @@ -143,18 +152,33 @@ compress(SocketData, Data) -> send(SocketData, Data), SocketData#socket_state{socket = ZlibSocket, sockmod = ejabberd_zlib}. -reset_stream(SocketData) -> - ejabberd_receiver:reset_stream(SocketData#socket_state.receiver). +reset_stream(SocketData) when is_pid(SocketData#socket_state.receiver) -> + ejabberd_receiver:reset_stream(SocketData#socket_state.receiver); +reset_stream(SocketData) when is_atom(SocketData#socket_state.receiver) -> + (SocketData#socket_state.receiver):reset_stream( + SocketData#socket_state.socket). send(SocketData, Data) -> catch (SocketData#socket_state.sockmod):send( SocketData#socket_state.socket, Data). -change_shaper(SocketData, Shaper) -> - ejabberd_receiver:change_shaper(SocketData#socket_state.receiver, Shaper). +send_xml(SocketData, Data) -> + catch (SocketData#socket_state.sockmod):send_xml( + SocketData#socket_state.socket, Data). -monitor(SocketData) -> - erlang:monitor(process, SocketData#socket_state.receiver). +change_shaper(SocketData, Shaper) + when is_pid(SocketData#socket_state.receiver) -> + ejabberd_receiver:change_shaper(SocketData#socket_state.receiver, Shaper); +change_shaper(SocketData, Shaper) + when is_atom(SocketData#socket_state.receiver) -> + (SocketData#socket_state.receiver):change_shaper( + SocketData#socket_state.socket, Shaper). + +monitor(SocketData) when is_pid(SocketData#socket_state.receiver) -> + erlang:monitor(process, SocketData#socket_state.receiver); +monitor(SocketData) when is_atom(SocketData#socket_state.receiver) -> + (SocketData#socket_state.receiver):monitor( + SocketData#socket_state.socket). get_sockmod(SocketData) -> SocketData#socket_state.sockmod. diff --git a/src/web/ejabberd_http_bind.erl b/src/web/ejabberd_http_bind.erl index e37ed4307..885ac9c68 100644 --- a/src/web/ejabberd_http_bind.erl +++ b/src/web/ejabberd_http_bind.erl @@ -4,29 +4,11 @@ %%% Purpose : Implements XMPP over BOSH (XEP-0205) (formerly known as %%% HTTP Binding) %%% Created : 21 Sep 2005 by Stefan Strigler -%%% -%%% -%%% ejabberd, Copyright (C) 2002-2009 ProcessOne -%%% -%%% This program is free software; you can redistribute it and/or -%%% modify it under the terms of the GNU General Public License as -%%% published by the Free Software Foundation; either version 2 of the -%%% License, or (at your option) any later version. -%%% -%%% This program is distributed in the hope that it will be useful, -%%% but WITHOUT ANY WARRANTY; without even the implied warranty of -%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -%%% General Public License for more details. -%%% -%%% You should have received a copy of the GNU General Public License -%%% along with this program; if not, write to the Free Software -%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA -%%% 02111-1307 USA -%%% +%%% Modified: may 2009 by Mickael Remond, Alexey Schepin +%%% Id : $Id: ejabberd_http_bind.erl 953 2009-05-07 10:40:40Z alexey $ %%%---------------------------------------------------------------------- -module(ejabberd_http_bind). --author('steve@zeank.in-berlin.de'). -behaviour(gen_fsm). @@ -39,15 +21,19 @@ handle_info/3, terminate/3, send/2, - setopts/2, + send_xml/2, sockname/1, peername/1, + setopts/2, controlling_process/2, + become_controller/2, + custom_receiver/1, + reset_stream/1, + change_shaper/2, + monitor/1, close/1, process_request/2]). --define(ejabberd_debug, true). - -include("ejabberd.hrl"). -include("jlib.hrl"). -include("ejabberd_http.hrl"). @@ -62,7 +48,6 @@ %% http binding request -record(hbr, {rid, key, - in, out}). -record(state, {id, @@ -70,8 +55,10 @@ key, socket, output = "", - input = "", + input = queue:new(), waiting_input = false, + shaper_state, + shaper_timer, last_receiver, last_poll, http_receiver, @@ -79,22 +66,30 @@ ctime = 0, timer, pause=0, - unprocessed_req_list = [], % list of request that have been delayed for proper reordering + unprocessed_req_list = [], % list of request that have been delayed for proper reordering: {Request, PID} req_list = [], % list of requests (cache) max_inactivity, + max_pause, ip = ?NULL_PEER }). +%% Internal request format: +-record(http_put, {rid, + attrs, + payload, + payload_size, + hold, + stream, + ip}). %%-define(DBGFSM, true). - -ifdef(DBGFSM). -define(FSMOPTS, [{debug, [trace]}]). -else. -define(FSMOPTS, []). -endif. --define(BOSH_VERSION, "1.6"). +-define(BOSH_VERSION, "1.8"). -define(NS_CLIENT, "jabber:client"). -define(NS_BOSH, "urn:xmpp:xbosh"). -define(NS_HTTP_BIND, "http://jabber.org/protocol/httpbind"). @@ -131,6 +126,9 @@ start_link(Sid, Key, IP) -> send({http_bind, FsmRef, _IP}, Packet) -> gen_fsm:sync_send_all_state_event(FsmRef, {send, Packet}). +send_xml({http_bind, FsmRef, _IP}, Packet) -> + gen_fsm:sync_send_all_state_event(FsmRef, {send_xml, Packet}). + setopts({http_bind, FsmRef, _IP}, Opts) -> case lists:member({active, once}, Opts) of true -> @@ -142,6 +140,21 @@ setopts({http_bind, FsmRef, _IP}, Opts) -> controlling_process(_Socket, _Pid) -> ok. +custom_receiver({http_bind, FsmRef, _IP}) -> + {receiver, ?MODULE, FsmRef}. + +become_controller(FsmRef, C2SPid) -> + gen_fsm:send_all_state_event(FsmRef, {become_controller, C2SPid}). + +reset_stream({http_bind, _FsmRef, _IP}) -> + ok. + +change_shaper({http_bind, FsmRef, _IP}, Shaper) -> + gen_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}). + +monitor({http_bind, FsmRef, _IP}) -> + erlang:monitor(process, FsmRef). + close({http_bind, FsmRef, _IP}) -> catch gen_fsm:sync_send_all_state_event(FsmRef, {stop, close}). @@ -151,8 +164,19 @@ sockname(_Socket) -> peername({http_bind, _FsmRef, IP}) -> {ok, IP}. + +%% Entry point for data coming from client through ejabberd HTTP server: process_request(Data, IP) -> - case catch parse_request(Data) of + Opts1 = ejabberd_c2s_config:get_c2s_limits(), + Opts = [{xml_socket, true} | Opts1], + MaxStanzaSize = + case lists:keysearch(max_stanza_size, 1, Opts) of + {value, {_, Size}} -> Size; + _ -> infinity + end, + PayloadSize = iolist_size(Data), + case catch parse_request(Data, PayloadSize, MaxStanzaSize) of + %% No existing session: {ok, {"", Rid, Attrs, Payload}} -> case xml:get_attr_s("to",Attrs) of "" -> @@ -169,11 +193,13 @@ process_request(Data, IP) -> "condition='internal-server-error' " "xmlns='" ++ ?NS_HTTP_BIND ++ "'>BOSH module not started - handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, Payload, IP) + handle_session_start( + Pid, XmppDomain, Sid, Rid, Attrs, + Payload, PayloadSize, IP) end end; + %% Existing session {ok, {Sid, Rid, Attrs, Payload1}} -> - %% old session StreamStart = case xml:get_attr_s("xmpp:restart",Attrs) of "true" -> @@ -184,17 +210,21 @@ process_request(Data, IP) -> Payload2 = case xml:get_attr_s("type",Attrs) of "terminate" -> %% close stream - Payload1 ++ ""; + Payload1 ++ [{xmlstreamend, "stream:stream"}]; _ -> Payload1 end, - handle_http_put(Sid, Rid, Attrs, Payload2, StreamStart, IP); + handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize, + StreamStart, IP); + {error, size_limit} -> + {413, ?HEADER, "Request Too Large"}; _ -> - ?ERROR_MSG("Received bad request: ~p", [Data]), + ?DEBUG("Received bad request: ~p", [Data]), {400, ?HEADER, ""} end. -handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, Payload, IP) -> +handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, + Payload, PayloadSize, IP) -> ?DEBUG("got pid: ~p", [Pid]), Wait = case string:to_integer(xml:get_attr_s("wait",Attrs)) of {error, _} -> @@ -238,7 +268,7 @@ handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, Payload, IP) -> version = Version }) end), - handle_http_put(Sid, Rid, Attrs, Payload, true, IP). + handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, true, IP). %%%---------------------------------------------------------------------- %%% Callback functions from gen_fsm @@ -260,58 +290,46 @@ init([Sid, Key, IP]) -> %% each connector. The default behaviour should be however to use %% the default c2s restrictions if not defined for the current %% connector. - Opts = ejabberd_c2s_config:get_c2s_limits(), + Opts1 = ejabberd_c2s_config:get_c2s_limits(), + Opts = [{xml_socket, true} | Opts1], + Shaper = none, + ShaperState = shaper:new(Shaper), Socket = {http_bind, self(), IP}, ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket, Opts), Timer = erlang:start_timer(?MAX_INACTIVITY, self(), []), {ok, loop, #state{id = Sid, key = Key, socket = Socket, + shaper_state = ShaperState, max_inactivity = ?MAX_INACTIVITY, + max_pause = ?MAX_PAUSE, timer = Timer}}. -%%---------------------------------------------------------------------- -%% Func: StateName/2 -%% Returns: {next_state, NextStateName, NextStateData} | -%% {next_state, NextStateName, NextStateData, Timeout} | -%% {stop, Reason, NewStateData} -%%---------------------------------------------------------------------- - - -%%---------------------------------------------------------------------- -%% Func: StateName/3 -%% Returns: {next_state, NextStateName, NextStateData} | -%% {next_state, NextStateName, NextStateData, Timeout} | -%% {reply, Reply, NextStateName, NextStateData} | -%% {reply, Reply, NextStateName, NextStateData, Timeout} | -%% {stop, Reason, NewStateData} | -%% {stop, Reason, Reply, NewStateData} -%%---------------------------------------------------------------------- -%state_name(Event, From, StateData) -> -% Reply = ok, -% {reply, Reply, state_name, StateData}. - %%---------------------------------------------------------------------- %% Func: handle_event/3 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- -handle_event({activate, From}, StateName, StateData) -> +handle_event({become_controller, C2SPid}, StateName, StateData) -> case StateData#state.input of - "" -> + cancel -> {next_state, StateName, StateData#state{ - waiting_input = {From, ok}}}; + waiting_input = C2SPid}}; Input -> - Receiver = From, - Receiver ! {tcp, StateData#state.socket, list_to_binary(Input)}, + lists:foreach( + fun(Event) -> + C2SPid ! Event + end, queue:to_list(Input)), {next_state, StateName, StateData#state{ - input = "", - waiting_input = false, - last_receiver = Receiver}} + input = queue:new(), + waiting_input = C2SPid}} end; +handle_event({change_shaper, Shaper}, StateName, StateData) -> + NewShaperState = shaper:new(Shaper), + {next_state, StateName, StateData#state{shaper_state = NewShaperState}}; handle_event(_Event, StateName, StateData) -> {next_state, StateName, StateData}. @@ -324,37 +342,34 @@ handle_event(_Event, StateName, StateData) -> %% {stop, Reason, NewStateData} | %% {stop, Reason, Reply, NewStateData} %%---------------------------------------------------------------------- -handle_sync_event({send, Packet}, _From, StateName, StateData) -> - Output = [StateData#state.output | Packet], - if - StateData#state.http_receiver /= undefined -> - cancel_timer(StateData#state.timer), - Timer = if - StateData#state.pause > 0 -> - erlang:start_timer( - StateData#state.pause*1000, self(), []); - true -> - erlang:start_timer( - StateData#state.max_inactivity, self(), []) - end, - HTTPReply = case Output of - [[]| OutPacket] -> - {ok, OutPacket}; - _ -> - {ok, Output} - end, - gen_fsm:reply(StateData#state.http_receiver, HTTPReply), - cancel_timer(StateData#state.wait_timer), - Reply = ok, - {reply, Reply, StateName, - StateData#state{output = [], - http_receiver = undefined, - wait_timer = undefined, - timer = Timer}}; - true -> - Reply = ok, - {reply, Reply, StateName, StateData#state{output = Output}} - end; +handle_sync_event({send_xml, Packet}, _From, StateName, + #state{http_receiver = undefined} = StateData) -> + Output = [Packet | StateData#state.output], + Reply = ok, + {reply, Reply, StateName, StateData#state{output = Output}}; +handle_sync_event({send_xml, Packet}, _From, StateName, StateData) -> + Output = [Packet | StateData#state.output], + cancel_timer(StateData#state.timer), + Timer = set_inactivity_timer(StateData#state.pause, + StateData#state.max_inactivity), + HTTPReply = {ok, Output}, + gen_fsm:reply(StateData#state.http_receiver, HTTPReply), + cancel_timer(StateData#state.wait_timer), + Rid = StateData#state.rid, + ReqList = [#hbr{rid = Rid, + key = StateData#state.key, + out = Output + } | + [El || El <- StateData#state.req_list, + El#hbr.rid /= Rid ] + ], + Reply = ok, + {reply, Reply, StateName, + StateData#state{output = [], + http_receiver = undefined, + req_list = ReqList, + wait_timer = undefined, + timer = Timer}}; handle_sync_event({stop,close}, _From, _StateName, StateData) -> Reply = ok, @@ -363,130 +378,82 @@ handle_sync_event({stop,stream_closed}, _From, _StateName, StateData) -> Reply = ok, {stop, normal, Reply, StateData}; handle_sync_event({stop,Reason}, _From, _StateName, StateData) -> - ?DEBUG("Closing bind session ~p - Reason: ~p", [StateData#state.id, Reason]), + ?ERROR_MSG("Closing bind session ~p - Reason: ~p", [StateData#state.id, Reason]), Reply = ok, {stop, normal, Reply, StateData}; - %% HTTP PUT: Receive packets from the client -handle_sync_event({http_put, Rid, Attrs, _Payload, Hold, _StreamTo, _IP}=Request, - _From, StateName, StateData) -> - %% Check if Rid valid - RidAllow = - case StateData#state.rid of - none -> - %% First request - nothing saved so far - {true, 0}; - OldRid -> - ?DEBUG("state.rid/cur rid: ~p/~p", [OldRid, Rid]), - if - %% We did not miss any packet, we can process it immediately: - Rid == OldRid + 1 -> - case catch list_to_integer( - xml:get_attr_s("pause", Attrs)) of - {'EXIT', _} -> - {true, 0}; - Pause1 when Pause1 =< ?MAX_PAUSE -> - ?DEBUG("got pause: ~p", [Pause1]), - {true, Pause1}; - _ -> - {true, 0} - end; - %% We have missed packets, we need to cached it to process it later on: - (OldRid < Rid) and - (Rid =< (OldRid + Hold + 1)) -> - buffer; - (Rid =< OldRid) and - (Rid > OldRid - Hold - 1) -> - repeat; - true -> - false - end +handle_sync_event(#http_put{rid = Rid}, + _From, StateName, StateData) + when StateData#state.shaper_timer /= undefined -> + Pause = + case erlang:read_timer(StateData#state.shaper_timer) of + false -> + 0; + P -> P end, + Reply = {wait, Pause}, + ?DEBUG("Shaper timer for RID ~p: ~p", [Rid, Reply]), + {reply, Reply, StateName, StateData}; - %% Check if Rid is in sequence or out of sequence: - case RidAllow of - buffer -> - ?DEBUG("Buffered request: ~p", [Request]), - %% Request is out of sequence: - PendingRequests = StateData#state.unprocessed_req_list, - %% In case an existing RID was already buffered: - Requests = lists:keydelete(Rid, 2, PendingRequests), - {reply, ok, StateName, StateData#state{unprocessed_req_list=[Request|Requests]}}; - _ -> - %% Request is in sequence: - process_http_put(Request, StateName, StateData, RidAllow) - end; +handle_sync_event(#http_put{rid = _Rid, attrs = _Attrs, + payload_size = PayloadSize, + hold = _Hold} = Request, + _From, StateName, StateData) -> + ?DEBUG("New request: ~p",[Request]), + %% Updating trafic shaper + {NewShaperState, NewShaperTimer} = + update_shaper(StateData#state.shaper_state, PayloadSize), + + handle_http_put_event(Request, StateName, + StateData#state{shaper_state = NewShaperState, + shaper_timer = NewShaperTimer}); %% HTTP GET: send packets to the client handle_sync_event({http_get, Rid, Wait, Hold}, From, StateName, StateData) -> %% setup timer - if - StateData#state.http_receiver /= undefined -> - gen_fsm:reply(StateData#state.http_receiver, {ok, empty}); - true -> - ok - end, + send_receiver_reply(StateData#state.http_receiver, {ok, empty}), cancel_timer(StateData#state.wait_timer), - {TMegSec, TSec, TMSec} = now(), - TNow = (TMegSec * 1000000 + TSec) * 1000000 + TMSec, + TNow = tnow(), if (Hold > 0) and - (StateData#state.output == "") and + (StateData#state.output == []) and ((TNow - StateData#state.ctime) < (Wait*1000*1000)) and (StateData#state.rid == Rid) and - (StateData#state.input /= "cancel") and + (StateData#state.input /= cancel) and (StateData#state.pause == 0) -> WaitTimer = erlang:start_timer(Wait * 1000, self(), []), + %% MR: Not sure we should cancel the state timer here. cancel_timer(StateData#state.timer), {next_state, StateName, StateData#state{ http_receiver = From, wait_timer = WaitTimer, timer = undefined}}; - (StateData#state.input == "cancel") -> + (StateData#state.input == cancel) -> cancel_timer(StateData#state.timer), - Timer = if - StateData#state.pause > 0 -> - erlang:start_timer( - StateData#state.pause*1000, self(), []); - true -> - erlang:start_timer( - StateData#state.max_inactivity, self(), []) - end, + Timer = set_inactivity_timer(StateData#state.pause, + StateData#state.max_inactivity), Reply = {ok, cancel}, {reply, Reply, StateName, StateData#state{ - input = "", + input = queue:new(), http_receiver = undefined, wait_timer = undefined, timer = Timer}}; true -> cancel_timer(StateData#state.timer), - Timer = if - StateData#state.pause > 0 -> - erlang:start_timer( - StateData#state.pause*1000, self(), []); - true -> - erlang:start_timer( - StateData#state.max_inactivity, self(), []) - end, - case StateData#state.output of - [[]| OutPacket] -> - Reply = {ok, OutPacket}; - _ -> - Reply = {ok, StateData#state.output} - end, + Timer = set_inactivity_timer(StateData#state.pause, + StateData#state.max_inactivity), + Reply = {ok, StateData#state.output}, %% save request - ReqList = [#hbr{rid=Rid, - key=StateData#state.key, - in=StateData#state.input, - out=StateData#state.output + ReqList = [#hbr{rid = Rid, + key = StateData#state.key, + out = StateData#state.output } | [El || El <- StateData#state.req_list, El#hbr.rid /= Rid ] ], {reply, Reply, StateName, StateData#state{ - input = "", - output = "", + output = [], http_receiver = undefined, wait_timer = undefined, timer = Timer, @@ -510,6 +477,7 @@ code_change(_OldVsn, StateName, StateData, _Extra) -> %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- +%% We reached the max_inactivity timeout: handle_info({timeout, Timer, _}, _StateName, #state{id=SID, timer = Timer} = StateData) -> ?WARNING_MSG("Session timeout. Closing the HTTP bind session: ~p", [SID]), @@ -520,23 +488,30 @@ handle_info({timeout, WaitTimer, _}, StateName, if StateData#state.http_receiver /= undefined -> cancel_timer(StateData#state.timer), - Timer = if - StateData#state.pause > 0 -> - erlang:start_timer( - StateData#state.pause*1000, self(), []); - true -> - erlang:start_timer( - StateData#state.max_inactivity, self(), []) - end, + Timer = set_inactivity_timer(StateData#state.pause, + StateData#state.max_inactivity), gen_fsm:reply(StateData#state.http_receiver, {ok, empty}), + Rid = StateData#state.rid, + ReqList = [#hbr{rid = Rid, + key = StateData#state.key, + out = [] + } | + [El || El <- StateData#state.req_list, + El#hbr.rid /= Rid ] + ], {next_state, StateName, StateData#state{http_receiver = undefined, + req_list = ReqList, wait_timer = undefined, timer = Timer}}; true -> {next_state, StateName, StateData} end; +handle_info({timeout, ShaperTimer, _}, StateName, + #state{shaper_timer = ShaperTimer} = StateData) -> + {next_state, StateName, StateData#state{shaper_timer = undefined}}; + handle_info(_, StateName, StateData) -> {next_state, StateName, StateData}. @@ -551,19 +526,12 @@ terminate(_Reason, _StateName, StateData) -> fun() -> mnesia:delete({http_bind, StateData#state.id}) end), - if - StateData#state.http_receiver /= undefined -> - gen_fsm:reply(StateData#state.http_receiver, {ok, terminate}); - true -> - ok - end, + send_receiver_reply(StateData#state.http_receiver, {ok, terminate}), case StateData#state.waiting_input of false -> - case StateData#state.last_receiver of - undefined -> ok; - Receiver -> Receiver ! {tcp_closed, StateData#state.socket} - end; - {Receiver, _Tag} -> Receiver ! {tcp_closed, StateData#state.socket} + ok; + C2SPid -> + gen_fsm:send_event(C2SPid, closed) end, ok. @@ -572,8 +540,47 @@ terminate(_Reason, _StateName, StateData) -> %%%---------------------------------------------------------------------- %% PUT / Get processing: -process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP}, +handle_http_put_event(#http_put{rid = Rid, attrs = Attrs, + hold = Hold} = Request, + StateName, StateData) -> + ?DEBUG("New request: ~p",[Request]), + %% Check if Rid valid + RidAllow = rid_allow(StateData#state.rid, Rid, Attrs, Hold, + StateData#state.max_pause), + + %% Check if Rid is in sequence or out of sequence: + case RidAllow of + buffer -> + ?DEBUG("Buffered request: ~p", [Request]), + %% Request is out of sequence: + PendingRequests = StateData#state.unprocessed_req_list, + %% In case an existing RID was already buffered: + Requests = lists:keydelete(Rid, 2, PendingRequests), + ReqList = [#hbr{rid = Rid, + key = StateData#state.key, + out = [] + } | + [El || El <- StateData#state.req_list, + El#hbr.rid > (Rid - 1 - Hold)] + ], + ?DEBUG("reqlist: ~p", [ReqList]), + UnprocessedReqList = [Request | Requests], + cancel_timer(StateData#state.timer), + Timer = set_inactivity_timer(0, StateData#state.max_inactivity), + {reply, buffered, StateName, + StateData#state{unprocessed_req_list = UnprocessedReqList, + req_list = ReqList, + timer = Timer}}; + _ -> + %% Request is in sequence: + process_http_put(Request, StateName, StateData, RidAllow) + end. + +process_http_put(#http_put{rid = Rid, attrs = Attrs, payload = Payload, + hold = Hold, stream = StreamTo, + ip = IP} = Request, StateName, StateData, RidAllow) -> + ?DEBUG("Actually processing request: ~p", [Request]), %% Check if key valid Key = xml:get_attr_s("key", Attrs), NewKey = xml:get_attr_s("newkey", Attrs), @@ -599,16 +606,15 @@ process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP}, end end end, - {TMegSec, TSec, TMSec} = now(), - TNow = (TMegSec * 1000000 + TSec) * 1000000 + TMSec, + TNow = tnow(), LastPoll = if - Payload == "" -> + Payload == [] -> TNow; true -> 0 end, if - (Payload == "") and + (Payload == []) and (Hold == 0) and (TNow - StateData#state.last_poll < ?MIN_POLLING) -> Reply = {error, polling_too_frequently}, @@ -621,17 +627,15 @@ process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP}, repeat -> ?DEBUG("REPEATING ~p", [Rid]), Reply = case [El#hbr.out || - El <- StateData#state.req_list, + El <- StateData#state.req_list, El#hbr.rid == Rid] of [] -> {error, not_exists}; - [ [[] | Out] | _XS] -> - {repeat, Out}; [Out | _XS] -> - {repeat, Out} - end, - {reply, Reply, StateName, - StateData#state{input = "cancel", last_poll = LastPoll}}; + {repeat, lists:reverse(Out)} + end, + {reply, Reply, StateName, StateData#state{input = cancel, + last_poll = LastPoll}}; {true, Pause} -> SaveKey = if NewKey == "" -> @@ -642,30 +646,33 @@ process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP}, ?DEBUG(" -- SaveKey: ~s~n", [SaveKey]), %% save request - ReqList = [#hbr{rid=Rid, - key=StateData#state.key, - in=StateData#state.input, - out=StateData#state.output - } | - [El || El <- StateData#state.req_list, - El#hbr.rid < Rid, - El#hbr.rid > (Rid - 1 - Hold)] - ], + ReqList1 = + [El || El <- StateData#state.req_list, + El#hbr.rid > (Rid - 1 - Hold)], + ReqList = + case lists:keymember(Rid, #hbr.rid, ReqList1) of + true -> + ReqList1; + false -> + [#hbr{rid = Rid, + key = StateData#state.key, + out = [] + } | + ReqList1 + ] + end, ?DEBUG("reqlist: ~p", [ReqList]), %% setup next timer cancel_timer(StateData#state.timer), - Timer = if - Pause > 0 -> - erlang:start_timer( - Pause*1000, self(), []); - true -> - erlang:start_timer( - StateData#state.max_inactivity, self(), []) - end, + Timer = set_inactivity_timer(Pause, + StateData#state.max_inactivity), case StateData#state.waiting_input of false -> - Input = Payload ++ [StateData#state.input], + Input = + lists:foldl( + fun queue:in/2, + StateData#state.input, Payload), Reply = ok, process_buffered_request(Reply, StateName, StateData#state{input = Input, @@ -678,32 +685,42 @@ process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP}, req_list = ReqList, ip = IP }); - {Receiver, _Tag} -> - SendPacket = - case StreamTo of - {To, ""} -> - [""] - ++ Payload; - {To, Version} -> - [""] - ++ Payload; - _ -> - Payload - end, + C2SPid -> + case StreamTo of + {To, ""} -> + gen_fsm:send_event( + C2SPid, + {xmlstreamstart, "stream:stream", + [{"to", To}, + {"xmlns", ?NS_CLIENT}, + {"xmlns:stream", ?NS_STREAM}]}); + {To, Version} -> + gen_fsm:send_event( + C2SPid, + {xmlstreamstart, "stream:stream", + [{"to", To}, + {"xmlns", ?NS_CLIENT}, + {"version", Version}, + {"xmlns:stream", ?NS_STREAM}]}); + _ -> + ok + end, + MaxInactivity = get_max_inactivity(StreamTo, StateData#state.max_inactivity), - ?DEBUG("really sending now: ~s", [SendPacket]), - Receiver ! {tcp, StateData#state.socket, - list_to_binary(SendPacket)}, + MaxPause = get_max_inactivity(StreamTo, StateData#state.max_pause), + + ?DEBUG("really sending now: ~p", [Payload]), + lists:foreach( + fun({xmlstreamend, End}) -> + gen_fsm:send_event( + C2SPid, {xmlstreamend, End}); + (El) -> + gen_fsm:send_event( + C2SPid, {xmlstreamelement, El}) + end, Payload), Reply = ok, process_buffered_request(Reply, StateName, - StateData#state{waiting_input = false, - last_receiver = Receiver, - input = "", + StateData#state{input = queue:new(), rid = Rid, key = SaveKey, ctime = TNow, @@ -712,6 +729,7 @@ process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP}, last_poll = LastPoll, req_list = ReqList, max_inactivity = MaxInactivity, + max_pause = MaxPause, ip = IP }) end @@ -727,29 +745,42 @@ process_buffered_request(Reply, StateName, StateData) -> case lists:keysearch(Rid+1, 2, Requests) of {value, Request} -> ?DEBUG("Processing buffered request: ~p", [Request]), - NewRequests = Requests -- [Request], - handle_sync_event(Request, undefined, StateName, - StateData#state{unprocessed_req_list=NewRequests}); + NewRequests = lists:keydelete(Rid+1, 2, Requests), + handle_http_put_event( + Request, StateName, + StateData#state{unprocessed_req_list = NewRequests}); _ -> {reply, Reply, StateName, StateData} end. -handle_http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) -> - case http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) of +handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) -> + case http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) of {error, not_exists} -> - ?DEBUG("no session associated with sid: ~p", [Sid]), + ?ERROR_MSG("no session associated with sid: ~p", [Sid]), {404, ?HEADER, ""}; {{error, Reason}, Sess} -> - ?DEBUG("Error on HTTP put. Reason: ~p", [Reason]), + ?ERROR_MSG("Error on HTTP put. Reason: ~p", [Reason]), handle_http_put_error(Reason, Sess); {{repeat, OutPacket}, Sess} -> ?DEBUG("http_put said 'repeat!' ...~nOutPacket: ~p", [OutPacket]), send_outpacket(Sess, OutPacket); + {{wait, Pause}, _Sess} -> + ?DEBUG("Trafic Shaper: Delaying request ~p", [Rid]), + timer:sleep(Pause), + %{200, ?HEADER, + % xml:element_to_string( + % {xmlelement, "body", + % [{"xmlns", ?NS_HTTP_BIND}, + % {"type", "error"}], []})}; + handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, + StreamStart, IP); + {buffered, _Sess} -> + {200, ?HEADER, ""}; {ok, Sess} -> prepare_response(Sess, Rid, Attrs, StreamStart) end. -http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) -> +http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) -> ?DEBUG("Looking for session: ~p", [Sid]), case mnesia:dirty_read({http_bind, Sid}) of [] -> @@ -763,7 +794,9 @@ http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) -> "" end, {gen_fsm:sync_send_all_state_event( - FsmRef, {http_put, Rid, Attrs, Payload, Hold, NewStream, IP}), Sess} + FsmRef, #http_put{rid = Rid, attrs = Attrs, payload = Payload, + payload_size = PayloadSize, hold = Hold, + stream = NewStream, ip = IP}, 30000), Sess} end. handle_http_put_error(Reason, #http_bind{pid=FsmRef, version=Version}) @@ -806,6 +839,46 @@ handle_http_put_error(Reason, #http_bind{pid=FsmRef}) -> {403, ?HEADER, ""} end. +%% Control RID ordering +rid_allow(none, _NewRid, _Attrs, _Hold, _MaxPause) -> + %% First request - nothing saved so far + {true, 0}; +rid_allow(OldRid, NewRid, Attrs, Hold, MaxPause) -> + ?DEBUG("Previous rid / New rid: ~p/~p", [OldRid, NewRid]), + if + %% We did not miss any packet, we can process it immediately: + NewRid == OldRid + 1 -> + case catch list_to_integer( + xml:get_attr_s("pause", Attrs)) of + {'EXIT', _} -> + {true, 0}; + Pause1 when Pause1 =< MaxPause -> + ?DEBUG("got pause: ~p", [Pause1]), + {true, Pause1}; + _ -> + {true, 0} + end; + %% We have missed packets, we need to cached it to process it later on: + (OldRid < NewRid) and + (NewRid =< (OldRid + Hold + 1)) -> + buffer; + (NewRid =< OldRid) and + (NewRid > OldRid - Hold - 1) -> + repeat; + true -> + false + end. + +update_shaper(ShaperState, PayloadSize) -> + {NewShaperState, Pause} = shaper:update(ShaperState, PayloadSize), + if + Pause > 0 -> + ShaperTimer = erlang:start_timer(Pause, self(), activate), %% MR: Seems timer is not needed. Activate is not handled + {NewShaperState, ShaperTimer}; + true -> + {NewShaperState, undefined} + end. + prepare_response(#http_bind{id=Sid, wait=Wait, hold=Hold, to=To}=Sess, Rid, _, StreamStart) -> receive after 100 -> ok end, %% TODO: Why is this needed. Argh. Bad programming practice. @@ -819,62 +892,52 @@ prepare_response(#http_bind{id=Sid, wait=Wait, hold=Hold, to=To}=Sess, {200, ?HEADER, ""}; {ok, terminate} -> {200, ?HEADER, ""}; - {ok, OutPacket} -> - ?DEBUG("OutPacket: ~s", [OutPacket]), + {ok, ROutPacket} -> + OutPacket = lists:reverse(ROutPacket), + ?DEBUG("OutPacket: ~p", [OutPacket]), case StreamStart of false -> send_outpacket(Sess, OutPacket); true -> - OutEls = - case xml_stream:parse_element( - OutPacket++"") of - El when element(1, El) == xmlelement -> - ?DEBUG("~p", [El]), - {xmlelement, _, OutAttrs, Els} = El, - AuthID = xml:get_attr_s("id", OutAttrs), - From = xml:get_attr_s("from", OutAttrs), - Version = xml:get_attr_s("version", OutAttrs), - StreamError = false, - case Els of - [] -> - []; - [{xmlelement, "stream:features", - StreamAttribs, StreamEls} - | StreamTail] -> - [{xmlelement, "stream:features", - [{"xmlns:stream", - ?NS_STREAM} - ] - ++ StreamAttribs, - StreamEls - }] ++ StreamTail; - Xml -> - Xml - end; - {error, _} -> - AuthID = "", - From = "", - Version = "", - StreamError = true, - [] - end, - if - StreamError == true -> - {200, ?HEADER, ""}; - true -> - BOSH_attribs = - [{"authid", AuthID}, - {"xmlns:xmpp", ?NS_BOSH}, - {"xmlns:stream", ?NS_STREAM}] ++ - case OutEls of - [] -> - []; - _ -> - [{"xmpp:version", Version}] - end, - MaxInactivity = get_max_inactivity(To, ?MAX_INACTIVITY), + case OutPacket of + [{xmlstreamstart, _, OutAttrs} | Els] -> + AuthID = xml:get_attr_s("id", OutAttrs), + From = xml:get_attr_s("from", OutAttrs), + Version = xml:get_attr_s("version", OutAttrs), + OutEls = + case Els of + [] -> + []; + [{xmlstreamelement, + {xmlelement, "stream:features", + StreamAttribs, StreamEls}} + | StreamTail] -> + TypedTail = + [check_default_xmlns(OEl) || + {xmlstreamelement, OEl} <- + StreamTail], + [{xmlelement, "stream:features", + [{"xmlns:stream", + ?NS_STREAM}] ++ + StreamAttribs, StreamEls}] ++ + TypedTail; + StreamTail -> + [check_default_xmlns(OEl) || + {xmlstreamelement, OEl} <- + StreamTail] + end, + BOSH_attribs = + [{"authid", AuthID}, + {"xmlns:xmpp", ?NS_BOSH}, + {"xmlns:stream", ?NS_STREAM}] ++ + case OutEls of + [] -> + []; + _ -> + [{"xmpp:version", Version}] + end, + MaxInactivity = get_max_inactivity(To, ?MAX_INACTIVITY), + MaxPause = get_max_pause(To), {200, ?HEADER, xml:element_to_string( {xmlelement,"body", @@ -885,16 +948,20 @@ prepare_response(#http_bind{id=Sid, wait=Wait, hold=Hold, to=To}=Sess, {"requests", integer_to_list(Hold+1)}, {"inactivity", integer_to_list( - trunc(MaxInactivity/1000))}, - {"maxpause", - integer_to_list(?MAX_PAUSE)}, + trunc(MaxInactivity/1000))}, + {"maxpause", + integer_to_list(MaxPause)}, {"polling", - integer_to_list( - trunc(?MIN_POLLING/1000000))}, - {"ver", ?BOSH_VERSION}, - {"from", From}, - {"secure", "true"} %% we're always being secure - ] ++ BOSH_attribs,OutEls})} + integer_to_list( + trunc(?MIN_POLLING/1000000))}, + {"ver", ?BOSH_VERSION}, + {"from", From}, + {"secure", "true"} %% we're always being secure + ] ++ BOSH_attribs,OutEls})}; + {error, _} -> + {200, ?HEADER, ""} end end; {'EXIT', {shutdown, _}} -> @@ -909,77 +976,85 @@ http_get(#http_bind{pid = FsmRef, wait = Wait, hold = Hold}, Rid) -> send_outpacket(#http_bind{pid = FsmRef}, OutPacket) -> case OutPacket of - "" -> + [] -> {200, ?HEADER, ""}; - "" -> + [{xmlstreamend, _}] -> gen_fsm:sync_send_all_state_event(FsmRef,{stop,stream_closed}), {200, ?HEADER, ""}; _ -> - case xml_stream:parse_element("" - ++ OutPacket - ++ "") - of - El when element(1, El) == xmlelement -> - {xmlelement, _, _, OEls} = El, + %% TODO: We parse to add a default namespace to packet, + %% The spec says adding the jabber:client namespace if + %% mandatory, even if some implementation do not do that + %% change on packets. + %% I think this should be an option to avoid modifying + %% packet in most case. + AllElements = + lists:all(fun({xmlstreamelement, + {xmlelement, "stream:error", _, _}}) -> false; + ({xmlstreamelement, _}) -> true; + (_) -> false + end, OutPacket), + case AllElements of + true -> TypedEls = [check_default_xmlns(OEl) || - OEl <- OEls], + {xmlstreamelement, OEl} <- OutPacket], + Body = xml:element_to_string( + {xmlelement,"body", + [{"xmlns", + ?NS_HTTP_BIND}], + TypedEls}), ?DEBUG(" --- outgoing data --- ~n~s~n --- END --- ~n", - [xml:element_to_string( - {xmlelement,"body", - [{"xmlns", - ?NS_HTTP_BIND}], - TypedEls})] - ), - {200, ?HEADER, - xml:element_to_string( - {xmlelement,"body", - [{"xmlns", - ?NS_HTTP_BIND}], - TypedEls})}; - {error, _E} -> - OutEls = case xml_stream:parse_element( - OutPacket++"") of - SEl when element(1, SEl) == xmlelement -> - {xmlelement, _, _OutAttrs, SEls} = SEl, - StreamError = false, - case SEls of - [] -> - []; - [{xmlelement, - "stream:features", - StreamAttribs, StreamEls} | - StreamTail] -> - TypedTail = - [check_default_xmlns(OEl) || - OEl <- StreamTail], - [{xmlelement, - "stream:features", - [{"xmlns:stream", - ?NS_STREAM}] ++ - StreamAttribs, StreamEls}] ++ - TypedTail; - Xml -> - Xml - end; - {error, _} -> - StreamError = true, - [] - end, - if - StreamError -> + [Body]), + {200, ?HEADER, Body}; + false -> + case OutPacket of + [{xmlstreamstart, _, _} | SEls] -> + OutEls = + case SEls of + [{xmlstreamelement, + {xmlelement, + "stream:features", + StreamAttribs, StreamEls}} | + StreamTail] -> + TypedTail = + [check_default_xmlns(OEl) || + {xmlstreamelement, OEl} <- + StreamTail], + [{xmlelement, + "stream:features", + [{"xmlns:stream", + ?NS_STREAM}] ++ + StreamAttribs, StreamEls}] ++ + TypedTail; + StreamTail -> + [check_default_xmlns(OEl) || + {xmlstreamelement, OEl} <- + StreamTail] + end, + {200, ?HEADER, + xml:element_to_string( + {xmlelement,"body", + [{"xmlns", + ?NS_HTTP_BIND}], + OutEls})}; + _ -> + SErrCond = + lists:filter( + fun({xmlstreamelement, + {xmlelement, "stream:error", + _, _}}) -> + true; + (_) -> false + end, OutPacket), StreamErrCond = - case xml_stream:parse_element( - "" ++ OutPacket) of - El when element(1, El) == xmlelement -> - case xml:get_subtag(El, "stream:error") of - false -> - null; - {xmlelement, _, _, _Cond} = StreamErrorTag -> - [StreamErrorTag] - end; - {error, _E} -> - null - end, + case SErrCond of + [] -> + null; + [{xmlstreamelement, + {xmlelement, _, _, _Cond} = + StreamErrorTag} | _] -> + [StreamErrorTag] + end, gen_fsm:sync_send_all_state_event(FsmRef, {stop, {stream_error,OutPacket}}), case StreamErrCond of @@ -996,27 +1071,22 @@ send_outpacket(#http_bind{pid = FsmRef}, OutPacket) -> "xmlns:stream='"++?NS_STREAM++"'>" ++ elements_to_string(StreamErrCond) ++ ""} - end; - true -> - {200, ?HEADER, - xml:element_to_string( - {xmlelement,"body", - [{"xmlns", - ?NS_HTTP_BIND}], - OutEls})} - end + end + end end end. -parse_request(Data) -> +parse_request(_Data, PayloadSize, MaxStanzaSize) + when PayloadSize > MaxStanzaSize -> + {error, size_limit}; +parse_request(Data, _PayloadSize, _MaxStanzaSize) -> ?DEBUG("--- incoming data --- ~n~s~n --- END --- ", [Data]), + %% MR: I do not think it works if put put several elements in the + %% same body: case xml_stream:parse_element(Data) of - El when element(1, El) == xmlelement -> - {xmlelement, Name, Attrs, Els} = El, + {xmlelement, "body", Attrs, Els} -> Xmlns = xml:get_attr_s("xmlns",Attrs), if - Name /= "body" -> - {error, bad_request}; Xmlns /= ?NS_HTTP_BIND -> {error, bad_request}; true -> @@ -1024,7 +1094,7 @@ parse_request(Data) -> {'EXIT', _} -> {error, bad_request}; Rid -> - %% I guess this is to remove XMLCDATA: + %% I guess this is to remove XMLCDATA: Is it really needed ? FixedEls = lists:filter( fun(I) -> @@ -1035,28 +1105,22 @@ parse_request(Data) -> false end end, Els), - %% MR: I commented this code, because it is not used. -%% lists:map( -%% fun(E) -> -%% EXmlns = xml:get_tag_attr_s("xmlns",E), -%% if -%% EXmlns == ?NS_CLIENT -> -%% remove_tag_attr("xmlns",E); -%% true -> -%% ok -%% end -%% end, FixedEls), - Payload = [xml:element_to_string(E) || E <- FixedEls], Sid = xml:get_attr_s("sid",Attrs), - %% MR: I do not think we need to extract - %% Sid. We should have it somewhere else: - {ok, {Sid, Rid, Attrs, Payload}} + {ok, {Sid, Rid, Attrs, FixedEls}} end end; + {xmlelement, _Name, _Attrs, _Els} -> + {error, bad_request}; {error, _Reason} -> {error, bad_request} end. +send_receiver_reply(undefined, _Reply) -> + ok; +send_receiver_reply(Receiver, Reply) -> + gen_fsm:reply(Receiver, Reply). + + %% Cancel timer and empty message queue. cancel_timer(undefined) -> ok; @@ -1069,6 +1133,15 @@ cancel_timer(Timer) -> ok end. +%% If client asked for a pause (pause > 0), we apply the pause value +%% as inactivity timer: +set_inactivity_timer(Pause, _MaxInactivity) when Pause > 0 -> + erlang:start_timer(Pause*1000, self(), []); +%% Otherwise, we apply the max_inactivity value as inactivity timer: +set_inactivity_timer(_Pause, MaxInactivity) -> + erlang:start_timer(MaxInactivity, self(), []). + + %% TODO: Use tail recursion and list reverse ? elements_to_string([]) -> []; @@ -1087,11 +1160,15 @@ get_max_inactivity({Host, _}, Default) -> get_max_inactivity(_, Default) -> Default. -%% remove_tag_attr(Attr, {xmlelement, Name, Attrs, Els}) -> -%% Attrs1 = lists:keydelete(Attr, 1, Attrs), -%% {xmlelement, Name, Attrs1, Els}; -%% remove_tag_attr(Attr, El) -> -%% El. +get_max_pause({Host, _}) -> + gen_mod:get_module_opt(Host, mod_http_bind, max_pause, ?MAX_PAUSE); +get_max_pause(_) -> + ?MAX_PAUSE. + +%% Current time as integer +tnow() -> + {TMegSec, TSec, TMSec} = now(), + (TMegSec * 1000000 + TSec) * 1000000 + TMSec. check_default_xmlns({xmlelement, Name, Attrs, Els} = El) -> case xml:get_tag_attr_s("xmlns", El) of @@ -1104,6 +1181,6 @@ check_default_xmlns({xmlelement, Name, Attrs, Els} = El) -> check_bind_module(XmppDomain) -> case gen_mod:is_loaded(XmppDomain, mod_http_bind) of true -> ok; - false -> ?ERROR_MSG("You are trying to use HTTP Bind (BOSH), but the module mod_http_bind is not started.~n" + false -> ?ERROR_MSG("You are trying to use BOSH (HTTP Bind), but the module mod_http_bind is not started.~n" "Check your 'modules' section in your ejabberd configuration file.",[]) end.