%%%---------------------------------------------------------------------- %%% File : ejabberd_http_bind.erl %%% Author : Stefan Strigler %%% Purpose : Implements XMPP over BOSH (XEP-0205) (formerly known as %%% HTTP Binding) %%% Created : 21 Sep 2005 by Stefan Strigler %%% Modified: may 2009 by Mickael Remond, Alexey Schepin %%%---------------------------------------------------------------------- -module(ejabberd_http_bind). -behaviour(gen_fsm). %% External exports -export([start_link/3, init/1, handle_event/3, handle_sync_event/4, code_change/4, handle_info/3, terminate/3, send/2, send_xml/2, sockname/1, peername/1, setopts/2, controlling_process/2, become_controller/2, change_controller/2, custom_receiver/1, reset_stream/1, change_shaper/2, monitor/1, close/1, start/4, handle_session_start/8, handle_http_put/7, http_put/7, http_get/2, prepare_response/4, process_request/2]). -include("ejabberd.hrl"). -include("jlib.hrl"). -include("ejabberd_http.hrl"). -include("http_bind.hrl"). -include_lib("exmpp/include/exmpp.hrl"). -record(http_bind, {id, pid, to, hold, wait, process_delay, version}). -define(NULL_PEER, {{0, 0, 0, 0}, 0}). %% http binding request -record(hbr, {rid, key, out}). -record(state, {id, rid = none, key, socket, output = [], input = queue:new(), waiting_input = false, shaper_state, shaper_timer, last_receiver, last_poll, http_receiver, wait_timer, ctime = 0, timer, pause=0, unprocessed_req_list = [], % list of request that have been delayed for proper reordering: {Request, PID} req_list = [], % list of requests (cache) max_inactivity, max_pause, ip = ?NULL_PEER }). %% Internal request format: -record(http_put, {rid, attrs, payload, payload_size, hold, stream, ip}). %%-define(DBGFSM, true). -ifdef(DBGFSM). -define(FSMOPTS, [{debug, [trace]}]). -else. -define(FSMOPTS, []). -endif. -define(BOSH_VERSION_b, <<"1.8">>). -define(MAX_REQUESTS, 2). % number of simultaneous requests -define(MIN_POLLING, 2000000). % don't poll faster than that or we will % shoot you (time in microsec) -define(MAX_WAIT, 3600). % max num of secs to keep a request on hold -define(MAX_INACTIVITY, 120000). % msecs to wait before terminating % idle sessions -define(MAX_PAUSE, 120). % may num of sec a client is allowed to pause % the session %% Wait 100ms before continue processing, to allow the client provide more related stanzas. -define(PROCESS_DELAY_DEFAULT, 100). -define(PROCESS_DELAY_MIN, 0). -define(PROCESS_DELAY_MAX, 1000). %%%---------------------------------------------------------------------- %%% API %%%---------------------------------------------------------------------- %% TODO: If compile with no supervisor option, start the session without %% supervisor start(XMPPDomain, Sid, Key, IP) -> ?DEBUG("Starting session", []), case catch supervisor:start_child(ejabberd_http_bind_sup, [Sid, Key, IP]) of {ok, Pid} -> {ok, Pid}; {error, _} = Err -> case check_bind_module(XMPPDomain) of false -> {error, "Cannot start HTTP bind session"}; true -> ?ERROR_MSG("Cannot start HTTP bind session: ~p", [Err]), Err end; Exit -> ?ERROR_MSG("Cannot start HTTP bind session: ~p", [Exit]), {error, Exit} end. start_link(Sid, Key, IP) -> gen_fsm:start_link(?MODULE, [Sid, Key, IP], ?FSMOPTS). send({http_bind, FsmRef, _IP}, Packet) -> gen_fsm:sync_send_all_state_event(FsmRef, {send, Packet}). send_xml({http_bind, FsmRef, _IP}, Packet) -> gen_fsm:sync_send_all_state_event(FsmRef, {send_xml, Packet}). setopts({http_bind, FsmRef, _IP}, Opts) -> case lists:member({active, once}, Opts) of true -> gen_fsm:send_all_state_event(FsmRef, {activate, self()}); _ -> case lists:member({active, false}, Opts) of true -> gen_fsm:sync_send_all_state_event( FsmRef, deactivate_socket); _ -> ok end end. controlling_process(_Socket, _Pid) -> ok. custom_receiver({http_bind, FsmRef, _IP}) -> {receiver, ?MODULE, FsmRef}. become_controller(FsmRef, C2SPid) -> gen_fsm:send_all_state_event(FsmRef, {become_controller, C2SPid}). change_controller({http_bind, FsmRef, _IP}, C2SPid) -> become_controller(FsmRef, C2SPid). reset_stream({http_bind, _FsmRef, _IP}) -> ok. change_shaper({http_bind, FsmRef, _IP}, Shaper) -> gen_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}). monitor({http_bind, FsmRef, _IP}) -> erlang:monitor(process, FsmRef). close({http_bind, FsmRef, _IP}) -> catch gen_fsm:sync_send_all_state_event(FsmRef, {stop, close}). sockname(_Socket) -> {ok, ?NULL_PEER}. peername({http_bind, _FsmRef, IP}) -> {ok, IP}. %% Entry point for data coming from client through ejabberd HTTP server: process_request(Data, IP) -> Opts1 = ejabberd_c2s_config:get_c2s_limits(), Opts = [{xml_socket, true} | Opts1], MaxStanzaSize = case lists:keysearch(max_stanza_size, 1, Opts) of {value, {_, Size}} -> Size; _ -> infinity end, PayloadSize = iolist_size(Data), case catch parse_request(Data, PayloadSize, MaxStanzaSize) of %% No existing session: {ok, {"", Rid, Attrs, Payload}} -> case exmpp_xml:get_attribute_from_list_as_list(Attrs, "to", "") of "" -> ?DEBUG("Session not created (Improper addressing)", []), {200, ?HEADER, ""}; XmppDomain -> %% create new session Sid = make_sid(), case start(XmppDomain, Sid, "", IP) of {error, _} -> {500, ?HEADER, "Internal Server Error"}; {ok, Pid} -> handle_session_start( Pid, XmppDomain, Sid, Rid, Attrs, Payload, PayloadSize, IP) end end; %% Existing session {ok, {Sid, Rid, Attrs, Payload1}} -> StreamStart = case exmpp_xml:get_attribute_from_list_as_list(Attrs, ?NS_BOSH, restart, "") of "true" -> true; _ -> false end, Payload2 = case exmpp_xml:get_attribute_from_list_as_list(Attrs, "type", "") of "terminate" -> %% close stream Payload1 ++ [{xmlstreamend, "stream:stream"}]; _ -> Payload1 end, handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize, StreamStart, IP); {size_limit, Sid} -> case get_session(Sid) of {error, _} -> {404, ?HEADER, ""}; {ok, #http_bind{pid = FsmRef}} -> gen_fsm:sync_send_all_state_event(FsmRef, {stop, close}), {200, ?HEADER, "Request Too Large"} end; Error -> ?DEBUG("Received bad request: ~p~nParsing error: ~p", [Data, Error]), {400, ?HEADER, ""} end. handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs, Payload, PayloadSize, IP) -> ?DEBUG("got pid: ~p", [Pid]), Wait = case string:to_integer(exmpp_xml:get_attribute_from_list_as_list(Attrs, "wait", "")) of {error, _} -> ?MAX_WAIT; {CWait, _} -> if (CWait > ?MAX_WAIT) -> ?MAX_WAIT; true -> CWait end end, Hold = case string:to_integer(exmpp_xml:get_attribute_from_list_as_list(Attrs, "hold", "")) of {error, _} -> (?MAX_REQUESTS - 1); {CHold, _} -> if (CHold > (?MAX_REQUESTS - 1)) -> (?MAX_REQUESTS - 1); true -> CHold end end, Pdelay = case string:to_integer(exmpp_xml:get_attribute_from_list_as_list(Attrs, "process-delay", "")) of {error, _} -> ?PROCESS_DELAY_DEFAULT; {CPdelay, _} when (?PROCESS_DELAY_MIN =< CPdelay) and (CPdelay =< ?PROCESS_DELAY_MAX) -> CPdelay; {CPdelay, _} -> erlang:max( erlang:min(CPdelay,?PROCESS_DELAY_MAX), ?PROCESS_DELAY_MIN) end, Version = case catch list_to_float( exmpp_xml:get_attribute_from_list_as_list(Attrs, "ver", "")) of {'EXIT', _} -> 0.0; V -> V end, XmppVersion = exmpp_xml:get_attribute_from_list_as_list(Attrs, ?NS_BOSH, version, ""), ?DEBUG("Create session: ~p", [Sid]), mnesia:async_dirty( fun() -> mnesia:write( #http_bind{id = Sid, pid = Pid, to = {XmppDomain, XmppVersion}, hold = Hold, wait = Wait, process_delay = Pdelay, version = Version }) end), handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, true, IP). %%%---------------------------------------------------------------------- %%% Callback functions from gen_fsm %%%---------------------------------------------------------------------- %%---------------------------------------------------------------------- %% Func: init/1 %% Returns: {ok, StateName, StateData} | %% {ok, StateName, StateData, Timeout} | %% ignore | %% {stop, StopReason} %%---------------------------------------------------------------------- init([Sid, Key, IP]) -> ?DEBUG("started: ~p", [{Sid, Key, IP}]), %% Read c2s options from the first ejabberd_c2s configuration in %% the config file listen section %% TODO: We should have different access and shaper values for %% each connector. The default behaviour should be however to use %% the default c2s restrictions if not defined for the current %% connector. Opts1 = ejabberd_c2s_config:get_c2s_limits(), Opts = [{xml_socket, true} | Opts1], Shaper = none, ShaperState = shaper:new(Shaper), Socket = {http_bind, self(), IP}, ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket, Opts), Timer = erlang:start_timer(?MAX_INACTIVITY, self(), []), {ok, loop, #state{id = Sid, key = Key, socket = Socket, shaper_state = ShaperState, max_inactivity = ?MAX_INACTIVITY, max_pause = ?MAX_PAUSE, timer = Timer}}. %%---------------------------------------------------------------------- %% Func: handle_event/3 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- handle_event({become_controller, C2SPid}, StateName, StateData) -> erlang:monitor(process, C2SPid), case StateData#state.input of cancel -> {next_state, StateName, StateData#state{ waiting_input = C2SPid}}; Input -> lists:foreach( fun(Event) -> C2SPid ! Event end, queue:to_list(Input)), {next_state, StateName, StateData#state{ input = queue:new(), waiting_input = C2SPid}} end; handle_event({change_shaper, Shaper}, StateName, StateData) -> NewShaperState = shaper:new(Shaper), {next_state, StateName, StateData#state{shaper_state = NewShaperState}}; handle_event(_Event, StateName, StateData) -> {next_state, StateName, StateData}. %%---------------------------------------------------------------------- %% Func: handle_sync_event/4 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {reply, Reply, NextStateName, NextStateData} | %% {reply, Reply, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} | %% {stop, Reason, Reply, NewStateData} %%---------------------------------------------------------------------- handle_sync_event({send_xml, Packet}, _From, StateName, #state{http_receiver = undefined} = StateData) -> Output = [Packet | StateData#state.output], Reply = ok, {reply, Reply, StateName, StateData#state{output = Output}}; handle_sync_event({send_xml, Packet}, _From, StateName, StateData) -> Output = [Packet | StateData#state.output], cancel_timer(StateData#state.timer), Timer = set_inactivity_timer(StateData#state.pause, StateData#state.max_inactivity), HTTPReply = {ok, Output}, gen_fsm:reply(StateData#state.http_receiver, HTTPReply), cancel_timer(StateData#state.wait_timer), Rid = StateData#state.rid, ReqList = [#hbr{rid = Rid, key = StateData#state.key, out = Output } | [El || El <- StateData#state.req_list, El#hbr.rid /= Rid ] ], Reply = ok, {reply, Reply, StateName, StateData#state{output = [], http_receiver = undefined, req_list = ReqList, wait_timer = undefined, timer = Timer}}; handle_sync_event({stop,close}, _From, _StateName, StateData) -> Reply = ok, {stop, normal, Reply, StateData}; handle_sync_event({stop,stream_closed}, _From, _StateName, StateData) -> Reply = ok, {stop, normal, Reply, StateData}; handle_sync_event(deactivate_socket, _From, StateName, StateData) -> %% Input = case StateData#state.input of %% cancel -> %% queue:new(); %% Q -> %% Q %% end, {reply, ok, StateName, StateData#state{waiting_input = false}}; handle_sync_event({stop,Reason}, _From, _StateName, StateData) -> ?DEBUG("Closing bind session ~p - Reason: ~p", [StateData#state.id, Reason]), Reply = ok, {stop, normal, Reply, StateData}; %% HTTP PUT: Receive packets from the client handle_sync_event(#http_put{rid = Rid}, _From, StateName, StateData) when StateData#state.shaper_timer /= undefined -> Pause = case erlang:read_timer(StateData#state.shaper_timer) of false -> 0; P -> P end, Reply = {wait, Pause}, ?DEBUG("Shaper timer for RID ~p: ~p", [Rid, Reply]), {reply, Reply, StateName, StateData}; handle_sync_event(#http_put{payload_size = PayloadSize} = Request, _From, StateName, StateData) -> ?DEBUG("New request: ~p",[Request]), %% Updating trafic shaper {NewShaperState, NewShaperTimer} = update_shaper(StateData#state.shaper_state, PayloadSize), handle_http_put_event(Request, StateName, StateData#state{shaper_state = NewShaperState, shaper_timer = NewShaperTimer}); %% HTTP GET: send packets to the client handle_sync_event({http_get, Rid, Wait, Hold}, From, StateName, StateData) -> %% setup timer send_receiver_reply(StateData#state.http_receiver, {ok, empty}), cancel_timer(StateData#state.wait_timer), TNow = tnow(), if (Hold > 0) and (StateData#state.output == []) and ((TNow - StateData#state.ctime) < (Wait*1000*1000)) and (StateData#state.rid == Rid) and (StateData#state.input /= cancel) and (StateData#state.pause == 0) -> WaitTimer = erlang:start_timer(Wait * 1000, self(), []), %% MR: Not sure we should cancel the state timer here. cancel_timer(StateData#state.timer), {next_state, StateName, StateData#state{ http_receiver = From, wait_timer = WaitTimer, timer = undefined}}; (StateData#state.input == cancel) -> cancel_timer(StateData#state.timer), Timer = set_inactivity_timer(StateData#state.pause, StateData#state.max_inactivity), Reply = {ok, cancel}, {reply, Reply, StateName, StateData#state{ input = queue:new(), http_receiver = undefined, wait_timer = undefined, timer = Timer}}; true -> cancel_timer(StateData#state.timer), Timer = set_inactivity_timer(StateData#state.pause, StateData#state.max_inactivity), Reply = {ok, StateData#state.output}, %% save request ReqList = [#hbr{rid = Rid, key = StateData#state.key, out = StateData#state.output } | [El || El <- StateData#state.req_list, El#hbr.rid /= Rid ] ], {reply, Reply, StateName, StateData#state{ output = [], http_receiver = undefined, wait_timer = undefined, timer = Timer, req_list = ReqList}} end; handle_sync_event(peername, _From, StateName, StateData) -> Reply = {ok, StateData#state.ip}, {reply, Reply, StateName, StateData}; handle_sync_event(_Event, _From, StateName, StateData) -> Reply = ok, {reply, Reply, StateName, StateData}. code_change(_OldVsn, StateName, StateData, _Extra) -> {ok, StateName, StateData}. %%---------------------------------------------------------------------- %% Func: handle_info/3 %% Returns: {next_state, NextStateName, NextStateData} | %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- %% We reached the max_inactivity timeout: handle_info({timeout, Timer, _}, _StateName, #state{id=SID, timer = Timer} = StateData) -> ?INFO_MSG("Session timeout. Closing the HTTP bind session: ~p", [SID]), {stop, normal, StateData}; handle_info({timeout, WaitTimer, _}, StateName, #state{wait_timer = WaitTimer} = StateData) -> if StateData#state.http_receiver /= undefined -> cancel_timer(StateData#state.timer), Timer = set_inactivity_timer(StateData#state.pause, StateData#state.max_inactivity), gen_fsm:reply(StateData#state.http_receiver, {ok, empty}), Rid = StateData#state.rid, ReqList = [#hbr{rid = Rid, key = StateData#state.key, out = [] } | [El || El <- StateData#state.req_list, El#hbr.rid /= Rid ] ], {next_state, StateName, StateData#state{http_receiver = undefined, req_list = ReqList, wait_timer = undefined, timer = Timer}}; true -> {next_state, StateName, StateData} end; handle_info({timeout, ShaperTimer, _}, StateName, #state{shaper_timer = ShaperTimer} = StateData) -> {next_state, StateName, StateData#state{shaper_timer = undefined}}; handle_info({'DOWN', _MRef, process, C2SPid, _}, _StateName, #state{waiting_input = C2SPid} = StateData) -> {stop, normal, StateData}; handle_info(_, StateName, StateData) -> {next_state, StateName, StateData}. %%---------------------------------------------------------------------- %% Func: terminate/3 %% Purpose: Shutdown the fsm %% Returns: any %%---------------------------------------------------------------------- terminate(_Reason, _StateName, StateData) -> ?DEBUG("terminate: Deleting session ~s", [StateData#state.id]), mnesia:dirty_delete({http_bind, StateData#state.id}), send_receiver_reply(StateData#state.http_receiver, {ok, terminate}), case StateData#state.waiting_input of false -> ok; C2SPid -> gen_fsm:send_event(C2SPid, closed) end, ok. %%%---------------------------------------------------------------------- %%% Internal functions %%%---------------------------------------------------------------------- %% PUT / Get processing: handle_http_put_event(#http_put{rid = Rid, attrs = Attrs, hold = Hold} = Request, StateName, StateData) -> ?DEBUG("New request: ~p",[Request]), %% Check if Rid valid RidAllow = rid_allow(StateData#state.rid, Rid, Attrs, Hold, StateData#state.max_pause), %% Check if Rid is in sequence or out of sequence: case RidAllow of buffer -> ?DEBUG("Buffered request: ~p", [Request]), %% Request is out of sequence: PendingRequests = StateData#state.unprocessed_req_list, %% In case an existing RID was already buffered: Requests = lists:keydelete(Rid, 2, PendingRequests), ReqList = [#hbr{rid = Rid, key = StateData#state.key, out = [] } | [El || El <- StateData#state.req_list, El#hbr.rid > (Rid - 1 - Hold)] ], ?DEBUG("reqlist: ~p", [ReqList]), UnprocessedReqList = [Request | Requests], cancel_timer(StateData#state.timer), Timer = set_inactivity_timer(0, StateData#state.max_inactivity), {reply, buffered, StateName, StateData#state{unprocessed_req_list = UnprocessedReqList, req_list = ReqList, timer = Timer}}; _ -> %% Request is in sequence: process_http_put(Request, StateName, StateData, RidAllow) end. process_http_put(#http_put{rid = Rid, attrs = Attrs, payload = Payload, hold = Hold, stream = StreamTo, ip = IP} = Request, StateName, StateData, RidAllow) -> ?DEBUG("Actually processing request: ~p", [Request]), %% Check if key valid Key = exmpp_xml:get_attribute_from_list_as_list(Attrs, "key", ""), NewKey = exmpp_xml:get_attribute_from_list_as_list(Attrs, "newkey", ""), KeyAllow = case RidAllow of repeat -> true; false -> false; {true, _} -> case StateData#state.key of "" -> true; OldKey -> NextKey = sha:sha(Key), ?DEBUG("Key/OldKey/NextKey: ~s/~s/~s", [Key, OldKey, NextKey]), if OldKey == NextKey -> true; true -> ?DEBUG("wrong key: ~s",[Key]), false end end end, TNow = tnow(), LastPoll = if Payload == [] -> TNow; true -> 0 end, if (Payload == []) and (Hold == 0) and (TNow - StateData#state.last_poll < ?MIN_POLLING) -> Reply = {error, polling_too_frequently}, {reply, Reply, StateName, StateData}; KeyAllow -> case RidAllow of false -> Reply = {error, not_exists}, {reply, Reply, StateName, StateData}; repeat -> ?DEBUG("REPEATING ~p", [Rid]), Reply = case [El#hbr.out || El <- StateData#state.req_list, El#hbr.rid == Rid] of [] -> {error, not_exists}; [Out | _XS] -> {repeat, lists:reverse(Out)} end, {reply, Reply, StateName, StateData#state{input = cancel, last_poll = LastPoll}}; {true, Pause} -> SaveKey = if NewKey == "" -> Key; true -> NewKey end, ?DEBUG(" -- SaveKey: ~s~n", [SaveKey]), %% save request ReqList1 = [El || El <- StateData#state.req_list, El#hbr.rid > (Rid - 1 - Hold)], ReqList = case lists:keymember(Rid, #hbr.rid, ReqList1) of true -> ReqList1; false -> [#hbr{rid = Rid, key = StateData#state.key, out = [] } | ReqList1 ] end, ?DEBUG("reqlist: ~p", [ReqList]), %% setup next timer cancel_timer(StateData#state.timer), Timer = set_inactivity_timer(Pause, StateData#state.max_inactivity), case StateData#state.waiting_input of false -> Input = lists:foldl( fun queue:in/2, StateData#state.input, Payload), Reply = ok, process_buffered_request(Reply, StateName, StateData#state{input = Input, rid = Rid, key = SaveKey, ctime = TNow, timer = Timer, pause = Pause, last_poll = LastPoll, req_list = ReqList, ip = IP }); C2SPid -> case StreamTo of {To, ""} -> DecNs = [{?NS_XMPP, "stream"}, {?NS_JABBER_CLIENT, none}], StreamAttrs = [#xmlattr{name = 'to', value = list_to_binary(To)}], StreamEl = #xmlel{name = stream, ns = ?NS_XMPP, declared_ns = DecNs, attrs = StreamAttrs}, gen_fsm:send_event( C2SPid, {xmlstreamstart, StreamEl}); {To, Version} -> DecNs = [{?NS_XMPP, "stream"}, {?NS_JABBER_CLIENT, none}], StreamAttrs = [#xmlattr{name = 'to', value = list_to_binary(To)}, #xmlattr{name = 'version', value = list_to_binary(Version)}], StreamEl = #xmlel{name = stream, ns = ?NS_XMPP, declared_ns = DecNs, attrs = StreamAttrs}, gen_fsm:send_event( C2SPid, {xmlstreamstart, StreamEl}); _ -> ok end, MaxInactivity = get_max_inactivity(StreamTo, StateData#state.max_inactivity), MaxPause = get_max_inactivity(StreamTo, StateData#state.max_pause), ?DEBUG("really sending now: ~p", [Payload]), lists:foreach( fun({xmlstreamend, End}) -> gen_fsm:send_event( C2SPid, {xmlstreamend, End}); (El) -> gen_fsm:send_event( C2SPid, {xmlstreamelement, El}) end, Payload), Reply = ok, process_buffered_request(Reply, StateName, StateData#state{input = queue:new(), rid = Rid, key = SaveKey, ctime = TNow, timer = Timer, pause = Pause, last_poll = LastPoll, req_list = ReqList, max_inactivity = MaxInactivity, max_pause = MaxPause, ip = IP }) end end; true -> Reply = {error, bad_key}, {reply, Reply, StateName, StateData} end. process_buffered_request(Reply, StateName, StateData) -> Rid = StateData#state.rid, Requests = StateData#state.unprocessed_req_list, case lists:keysearch(Rid+1, 2, Requests) of {value, Request} -> ?DEBUG("Processing buffered request: ~p", [Request]), NewRequests = lists:keydelete(Rid+1, 2, Requests), handle_http_put_event( Request, StateName, StateData#state{unprocessed_req_list = NewRequests}); _ -> {reply, Reply, StateName, StateData, hibernate} end. handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) -> case http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) of {error, not_exists} -> ?DEBUG("no session associated with sid: ~p", [Sid]), {404, ?HEADER, ""}; {{error, Reason}, Sess} -> ?DEBUG("Error on HTTP put. Reason: ~p", [Reason]), handle_http_put_error(Reason, Sess); {{repeat, OutPacket}, Sess} -> ?DEBUG("http_put said 'repeat!' ...~nOutPacket: ~p", [OutPacket]), send_outpacket(Sess, OutPacket); {{wait, Pause}, _Sess} -> ?DEBUG("Trafic Shaper: Delaying request ~p", [Rid]), timer:sleep(Pause), %{200, ?HEADER, % exmpp_xml:document_to_list( % {xmlelement, "body", % [{"xmlns", ?NS_HTTP_BIND_s}, % {"type", "error"}], []})}; handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP); {buffered, _Sess} -> {200, ?HEADER, ""}; {ok, Sess} -> prepare_response(Sess, Rid, [], StreamStart) end. http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) -> ?DEBUG("Looking for session: ~p", [Sid]), case get_session(Sid) of {error, _} -> {error, not_exists}; {ok, #http_bind{pid = FsmRef, hold=Hold, to={To, StreamVersion}}=Sess}-> NewStream = case StreamStart of true -> {To, StreamVersion}; _ -> "" end, {gen_fsm:sync_send_all_state_event( FsmRef, #http_put{rid = Rid, attrs = Attrs, payload = Payload, payload_size = PayloadSize, hold = Hold, stream = NewStream, ip = IP}, 30000), Sess} end. handle_http_put_error(Reason, #http_bind{pid=FsmRef, version=Version}) when Version >= 0 -> gen_fsm:sync_send_all_state_event(FsmRef, {stop, {put_error,Reason}}), case Reason of not_exists -> {200, ?HEADER, exmpp_xml:document_to_list( #xmlel{name = 'body', ns = ?NS_HTTP_BIND_s, attrs = [ #xmlattr{name = 'type', ns = ?NS_HTTP_BIND_s, value = <<"terminate">> }, #xmlattr{name = 'type', ns = ?NS_HTTP_BIND_s, value = <<"item-not-found">> } ] })}; bad_key -> {200, ?HEADER, exmpp_xml:document_to_list( #xmlel{name = 'body', ns = ?NS_HTTP_BIND_s, attrs = [ #xmlattr{name = 'type', ns = ?NS_HTTP_BIND_s, value = <<"terminate">> }, #xmlattr{name = 'type', ns = ?NS_HTTP_BIND_s, value = <<"item-not-found">> } ] })}; polling_too_frequently -> {200, ?HEADER, exmpp_xml:document_to_list( #xmlel{name = 'body', ns = ?NS_HTTP_BIND_s, attrs = [ #xmlattr{name = 'type', ns = ?NS_HTTP_BIND_s, value = <<"terminate">> }, #xmlattr{name = 'type', ns = ?NS_HTTP_BIND_s, value = <<"policy-violation">> } ] })} end; handle_http_put_error(Reason, #http_bind{pid=FsmRef}) -> gen_fsm:sync_send_all_state_event(FsmRef,{stop, {put_error_no_version, Reason}}), case Reason of not_exists -> %% bad rid ?DEBUG("Closing HTTP bind session (Bad rid).", []), {404, ?HEADER, ""}; bad_key -> ?DEBUG("Closing HTTP bind session (Bad key).", []), {404, ?HEADER, ""}; polling_too_frequently -> ?DEBUG("Closing HTTP bind session (User polling too frequently).", []), {403, ?HEADER, ""} end. %% Control RID ordering rid_allow(none, _NewRid, _Attrs, _Hold, _MaxPause) -> %% First request - nothing saved so far {true, 0}; rid_allow(OldRid, NewRid, Attrs, Hold, MaxPause) -> ?DEBUG("Previous rid / New rid: ~p/~p", [OldRid, NewRid]), if %% We did not miss any packet, we can process it immediately: NewRid == OldRid + 1 -> case catch list_to_integer( exmpp_xml:get_attribute_from_list_as_list(Attrs, "pause", "")) of {'EXIT', _} -> {true, 0}; Pause1 when Pause1 =< MaxPause -> ?DEBUG("got pause: ~p", [Pause1]), {true, Pause1}; _ -> {true, 0} end; %% We have missed packets, we need to cached it to process it later on: (OldRid < NewRid) and (NewRid =< (OldRid + Hold + 1)) -> buffer; (NewRid =< OldRid) and (NewRid > OldRid - Hold - 1) -> repeat; true -> false end. update_shaper(ShaperState, PayloadSize) -> {NewShaperState, Pause} = shaper:update(ShaperState, PayloadSize), if Pause > 0 -> ShaperTimer = erlang:start_timer(Pause, self(), activate), %% MR: Seems timer is not needed. Activate is not handled {NewShaperState, ShaperTimer}; true -> {NewShaperState, undefined} end. prepare_response(Sess, Rid, OutputEls, StreamStart) -> receive after Sess#http_bind.process_delay -> ok end, case catch http_get(Sess, Rid) of {ok, cancel} -> %% actually it would be better if we could completely %% cancel this request, but then we would have to hack %% ejabberd_http and I'm too lazy now {200, ?HEADER, ""}; {ok, empty} -> {200, ?HEADER, ""}; {ok, terminate} -> {200, ?HEADER, ""}; {ok, ROutPacket} -> OutPacket = lists:reverse(ROutPacket), ?DEBUG("OutPacket: ~p", [OutputEls++OutPacket]), prepare_outpacket_response(Sess, Rid, OutputEls++OutPacket, StreamStart); {'EXIT', {shutdown, _}} -> {200, ?HEADER, ""}; {'EXIT', _Reason} -> {200, ?HEADER, ""} end. %% Send output payloads on establised sessions prepare_outpacket_response(Sess, _Rid, OutPacket, false) -> case catch send_outpacket(Sess, OutPacket) of {'EXIT', _Reason} -> {200, ?HEADER, ""}; SendRes -> SendRes end; %% Handle a new session along with its output payload prepare_outpacket_response(#http_bind{id=Sid, wait=Wait, hold=Hold, to=To}=Sess, Rid, OutPacket, true) -> case OutPacket of [{xmlstreamstart, _, OutAttrs} | Els] -> AuthID = exmpp_xml:get_attribute_from_list_as_list(OutAttrs, "id", ""), FromB = exmpp_xml:get_attribute_from_list_as_binary(OutAttrs, "from", ""), Version = exmpp_xml:get_attribute_from_list_as_list(OutAttrs, "version", ""), OutEls = case Els of [] -> []; [{xmlstreamelement, #xmlel{name = 'features', declared_ns = [{undefined, ?NS_XMPP_pfx}], attrs = StreamAttribs, children = StreamEls}} | StreamTail] -> %% {xmlelement, "stream:features", %% StreamAttribs, StreamEls}} %% | StreamTail] -> TypedTail = [check_default_xmlns(OEl) || {xmlstreamelement, OEl} <- StreamTail], [#xmlel{name = 'features', declared_ns = [{?NS_XMPP_s, ?NS_XMPP_pfx}], attrs = StreamAttribs, children = StreamEls}] ++ %% [{xmlelement, "stream:features", %% [{"xmlns:stream", %% ?NS_XMPP_s}] ++ %% StreamAttribs, StreamEls}] ++ TypedTail; StreamTail -> [check_default_xmlns(OEl) || {xmlstreamelement, OEl} <- StreamTail] end, case OutEls of [] -> prepare_response(Sess, Rid, OutPacket, true); [#xmlel{name = 'stream:error'}] -> {200, ?HEADER, ""}; _ -> BOSH_attribs = [#xmlattr{name = 'authid', value = list_to_binary(AuthID)}, #xmlattr{name = 'xmlns:stream', value = ?NS_XMPP_b}] ++ [#xmlattr{name = 'version', ns = ?NS_BOSH_s, value = list_to_binary(Version)}], MaxInactivity = get_max_inactivity(To, ?MAX_INACTIVITY), MaxPause = get_max_pause(To), {200, ?HEADER, exmpp_xml:document_to_list( #xmlel{name = 'body', ns = ?NS_HTTP_BIND_s, declared_ns = [{?NS_XBOSH_s, ?NS_XBOSH_pfx}], attrs = [ #xmlattr{name = 'sid', value = list_to_binary(Sid) }, #xmlattr{name = 'wait', value = list_to_binary(integer_to_list(Wait)) }, #xmlattr{name = 'requests', value = list_to_binary(integer_to_list(Hold+1)) }, #xmlattr{name = 'inactivity', value = list_to_binary(integer_to_list(trunc(MaxInactivity/1000))) }, #xmlattr{name = 'maxpause', value = list_to_binary(integer_to_list(MaxPause)) }, #xmlattr{name = 'polling', value = list_to_binary(integer_to_list(trunc(?MIN_POLLING/1000000))) }, #xmlattr{name = 'ver', value = ?BOSH_VERSION_b }, #xmlattr{name = 'from', value = FromB }, #xmlattr{name = 'secure', value = <<"true">> } ] ++ BOSH_attribs, children = OutEls})} % {xmlelement,"body", % [{"xmlns", % ?NS_HTTP_BIND_s}, % {"sid", Sid}, % {"wait", integer_to_list(Wait)}, % {"requests", integer_to_list(Hold+1)}, % {"inactivity", % integer_to_list( % trunc(MaxInactivity/1000))}, % {"maxpause", % integer_to_list(MaxPause)}, % {"polling", % integer_to_list( % trunc(?MIN_POLLING/1000000))}, % {"ver", ?BOSH_VERSION}, % {"from", From}, % {"secure", "true"} %% we're always being secure % ] ++ BOSH_attribs,OutEls})}; end; _ -> {200, ?HEADER, ""} end. http_get(#http_bind{pid = FsmRef, wait = Wait, hold = Hold}, Rid) -> gen_fsm:sync_send_all_state_event( FsmRef, {http_get, Rid, Wait, Hold}, 2 * ?MAX_WAIT * 1000). send_outpacket(#http_bind{pid = FsmRef}, OutPacket) -> case OutPacket of [] -> {200, ?HEADER, ""}; [{xmlstreamend, _}] -> gen_fsm:sync_send_all_state_event(FsmRef,{stop,stream_closed}), {200, ?HEADER, ""}; _ -> %% TODO: We parse to add a default namespace to packet, %% The spec says adding the jabber:client namespace if %% mandatory, even if some implementation do not do that %% change on packets. %% I think this should be an option to avoid modifying %% packet in most case. AllElements = lists:all(fun({xmlstreamelement, #xmlel{name = 'error', declared_ns = [{undefined, ?NS_XMPP_pfx}]}}) -> false; % {xmlelement, "stream:error", _, _}}) -> false; ({xmlstreamelement, _}) -> true; (_) -> false end, OutPacket), case AllElements of true -> TypedEls = [check_default_xmlns(OEl) || {xmlstreamelement, OEl} <- OutPacket], Body = exmpp_xml:document_to_list( #xmlel{name = 'body', ns = ?NS_HTTP_BIND_s, children = TypedEls}), % {xmlelement,"body", % [{"xmlns", % ?NS_HTTP_BIND_s}], % TypedEls}), ?DEBUG(" --- outgoing data --- ~n~s~n --- END --- ~n", [Body]), {200, ?HEADER, Body}; false -> case OutPacket of [{xmlstreamstart, _, _} | SEls] -> OutEls = case SEls of [{xmlstreamelement, #xmlel{name = 'features', declared_ns = [{undefined, "stream"}], attrs = StreamAttribs, children = StreamEls}} | % {xmlelement, % "stream:features", % StreamAttribs, StreamEls}} | StreamTail] -> TypedTail = [check_default_xmlns(OEl) || {xmlstreamelement, OEl} <- StreamTail], [#xmlel{name = 'features', declared_ns = [{?NS_XMPP_s, ?NS_XMPP_pfx}], attrs = StreamAttribs, children = StreamEls}] ++ % [{xmlelement, % "stream:features", % [{"xmlns:stream", % ?NS_XMPP_s}] ++ % StreamAttribs, StreamEls}] ++ TypedTail; StreamTail -> [check_default_xmlns(OEl) || {xmlstreamelement, OEl} <- StreamTail] end, {200, ?HEADER, exmpp_xml:document_to_list( #xmlel{name = 'body', ns = ?NS_HTTP_BIND_s, children = OutEls})}; % {xmlelement,"body", % [{"xmlns", % ?NS_HTTP_BIND_s}], % OutEls})}; _ -> SErrCond = lists:filter( fun({xmlstreamelement, #xmlel{name = 'error', declared_ns = [{undefined, ?NS_XMPP_pfx}]}}) -> % {xmlelement, "stream:error", % _, _}}) -> true; (_) -> false end, OutPacket), StreamErrCond = case SErrCond of [] -> null; [{xmlstreamelement, #xmlel{children = _Cond} = % {xmlelement, _, _, _Cond} = StreamErrorTag} | _] -> [StreamErrorTag] end, gen_fsm:sync_send_all_state_event(FsmRef, {stop, {stream_error,OutPacket}}), case StreamErrCond of null -> {200, ?HEADER, ""}; _ -> {200, ?HEADER, "" ++ elements_to_string(StreamErrCond) ++ ""} end end end end. parse_request(Data, PayloadSize, MaxStanzaSize) -> ?DEBUG("--- incoming data --- ~n~s~n --- END --- ", [Data]), %% MR: I do not think it works if put put several elements in the %% same body: case exmpp_xmlstream:parse_element(Data) of [#xmlel{name = 'body', ns = Xmlns, attrs = Attrs, children = Els}] = [Xml] -> % {xmlelement, "body", Attrs, Els} -> % Xmlns = xml:get_attr_s("xmlns",Attrs), if Xmlns /= ?NS_HTTP_BIND -> {error, bad_request}; true -> %case catch list_to_integer(xml:get_attr_s("rid", Attrs)) of case catch list_to_integer(exmpp_xml:get_attribute_as_list(Xml, "rid", "")) of {'EXIT', _} -> {error, bad_request}; Rid -> %% I guess this is to remove XMLCDATA: Is it really needed ? FixedEls = lists:filter( fun(I) -> case I of #xmlel{} -> %{xmlelement, _, _, _} -> true; _ -> false end end, Els), Sid = exmpp_xml:get_attribute_as_list(Xml, "sid", ""), if PayloadSize =< MaxStanzaSize -> {ok, {Sid, Rid, Attrs, FixedEls}}; true -> {size_limit, Sid} end end end; [#xmlel{}] -> % {xmlelement, _Name, _Attrs, _Els} -> {error, bad_request} end. send_receiver_reply(undefined, _Reply) -> ok; send_receiver_reply(Receiver, Reply) -> gen_fsm:reply(Receiver, Reply). %% Cancel timer and empty message queue. cancel_timer(undefined) -> ok; cancel_timer(Timer) -> erlang:cancel_timer(Timer), receive {timeout, Timer, _} -> ok after 0 -> ok end. %% If client asked for a pause (pause > 0), we apply the pause value %% as inactivity timer: set_inactivity_timer(Pause, _MaxInactivity) when Pause > 0 -> erlang:start_timer(Pause*1000, self(), []); %% Otherwise, we apply the max_inactivity value as inactivity timer: set_inactivity_timer(_Pause, MaxInactivity) -> erlang:start_timer(MaxInactivity, self(), []). %% TODO: Use tail recursion and list reverse ? elements_to_string([]) -> []; elements_to_string([El | Els]) -> exmpp_xml:document_to_list(El) ++ elements_to_string(Els). %% @spec (To, Default::integer()) -> integer() %% where To = [] | {Host::string(), Version::string()} get_max_inactivity({Host, _}, Default) -> case gen_mod:get_module_opt(Host, mod_http_bind, max_inactivity, undefined) of Seconds when is_integer(Seconds) -> Seconds * 1000; undefined -> Default end; get_max_inactivity(_, Default) -> Default. get_max_pause({Host, _}) -> gen_mod:get_module_opt(Host, mod_http_bind, max_pause, ?MAX_PAUSE); get_max_pause(_) -> ?MAX_PAUSE. %% Current time as integer tnow() -> {TMegSec, TSec, TMSec} = now(), (TMegSec * 1000000 + TSec) * 1000000 + TMSec. check_default_xmlns(#xmlel{name = Name, ns = Xmlns, attrs = Attrs, children = Els} = El) -> %check_default_xmlns({xmlelement, Name, Attrs, Els} = El) -> case Xmlns of % case xml:get_tag_attr_s("xmlns", El) of "" -> #xmlel{name = Name, ns = ?NS_JABBER_CLIENT_s, attrs = Attrs, children = Els}; % "" -> {xmlelement, Name, [{"xmlns", ?NS_JABBER_CLIENT_s} | Attrs], Els}; _ -> El end. %% Check that mod_http_bind has been defined in config file. %% Print a warning in log file if this is not the case. check_bind_module(XmppDomain) -> case gen_mod:is_loaded(XmppDomain, mod_http_bind) of true -> true; false -> ?ERROR_MSG("You are trying to use BOSH (HTTP Bind), but the module mod_http_bind is not started.~n" "Check your 'modules' section in your ejabberd configuration file.",[]), false end. make_sid() -> sha:sha(term_to_binary({now(), make_ref()})) ++ "-" ++ ejabberd_cluster:node_id(). get_session(SID) -> case string:tokens(SID, "-") of [_, NodeID] -> case ejabberd_cluster:get_node_by_id(NodeID) of Node when Node == node() -> case mnesia:dirty_read({http_bind, SID}) of [] -> {error, enoent}; [Session] -> {ok, Session} end; Node -> case catch rpc:call(Node, mnesia, dirty_read, [{http_bind, SID}], 5000) of [Session] -> {ok, Session}; _ -> {error, enoent} end end; _ -> {error, enoent} end.