diff --git a/src/ejabberd_bosh.erl b/src/ejabberd_bosh.erl index 06aaacda9..93ac4bf1b 100644 --- a/src/ejabberd_bosh.erl +++ b/src/ejabberd_bosh.erl @@ -31,16 +31,33 @@ %% API -export([start/2, start/3, start_link/3]). --export([send_xml/2, setopts/2, controlling_process/2, - reset_stream/1, change_shaper/2, close/1, - sockname/1, peername/1, process_request/3, send/2, - get_transport/1, get_owner/1]). +-export([ + send_xml/2, + setopts/2, + controlling_process/2, + reset_stream/1, + change_shaper/2, + close/1, + sockname/1, + peername/1, + process_request/4, + send/2, + get_transport/1, + get_owner/1 +]). %% gen_fsm callbacks --export([init/1, wait_for_session/2, wait_for_session/3, - active/2, active/3, handle_event/3, print_state/1, - handle_sync_event/4, handle_info/3, terminate/3, - code_change/4]). +-export([ + init/1, + wait_for_session/2, wait_for_session/3, + active/2, active/3, + handle_event/3, + print_state/1, + handle_sync_event/4, + handle_info/3, + terminate/3, + code_change/4 +]). -include("logger.hrl"). -include_lib("xmpp/include/xmpp.hrl"). @@ -63,7 +80,8 @@ -define(NS_BOSH, <<"urn:xmpp:xbosh">>). -define(NS_HTTP_BIND, - <<"http://jabber.org/protocol/httpbind">>). + <<"http://jabber.org/protocol/httpbind">> +). -define(DEFAULT_WAIT, 300). @@ -75,92 +93,111 @@ -define(SEND_TIMEOUT, 15000). --type bosh_socket() :: {http_bind, pid(), - {inet:ip_address(), - inet:port_number()}}. +-type bosh_socket() :: {http_bind, pid(), {inet:ip_address(), inet:port_number()}}. -export_type([bosh_socket/0]). --record(state, - {host = <<"">> :: binary(), - sid = <<"">> :: binary(), - el_ibuf :: p1_queue:queue(), - el_obuf :: p1_queue:queue(), - shaper_state = none :: ejabberd_shaper:shaper(), - c2s_pid :: pid() | undefined, - xmpp_ver = <<"">> :: binary(), - inactivity_timer :: reference() | undefined, - wait_timer :: reference() | undefined, - wait_timeout = ?DEFAULT_WAIT :: pos_integer(), - inactivity_timeout :: pos_integer(), - prev_rid = 0 :: non_neg_integer(), - prev_key = <<"">> :: binary(), - prev_poll :: erlang:timestamp() | undefined, - max_concat = unlimited :: unlimited | non_neg_integer(), - responses = gb_trees:empty() :: gb_trees:tree(), - receivers = gb_trees:empty() :: gb_trees:tree(), - shaped_receivers :: p1_queue:queue(), - ip :: inet:ip_address(), - max_requests = 1 :: non_neg_integer()}). +-record(state, { + host = <<"">> :: binary(), + sid = <<"">> :: binary(), + el_ibuf :: p1_queue:queue(), + el_obuf :: p1_queue:queue(), + shaper_state = none :: ejabberd_shaper:shaper(), + c2s_pid :: pid() | undefined, + xmpp_ver = <<"">> :: binary(), + inactivity_timer :: reference() | undefined, + wait_timer :: reference() | undefined, + wait_timeout = ?DEFAULT_WAIT :: pos_integer(), + inactivity_timeout :: pos_integer(), + prev_rid = 0 :: non_neg_integer(), + prev_key = <<"">> :: binary(), + prev_poll :: erlang:timestamp() | undefined, + max_concat = unlimited :: unlimited | non_neg_integer(), + responses = gb_trees:empty() :: gb_trees:tree(), + receivers = gb_trees:empty() :: gb_trees:tree(), + shaped_receivers :: p1_queue:queue(), + ip :: inet:ip_address(), + max_requests = 1 :: non_neg_integer() +}). --record(body, - {http_reason = <<"">> :: binary(), - attrs = [] :: [{any(), any()}], - els = [] :: [fxml_stream:xml_stream_el()], - size = 0 :: non_neg_integer()}). +-record(body, { + http_reason = <<"">> :: binary(), + attrs = [] :: [{any(), any()}], + els = [] :: [fxml_stream:xml_stream_el()], + size = 0 :: non_neg_integer() +}). start(#body{attrs = Attrs} = Body, IP, SID) -> XMPPDomain = get_attr(to, Attrs), SupervisorProc = gen_mod:get_module_proc(XMPPDomain, mod_bosh), - case catch supervisor:start_child(SupervisorProc, - [Body, IP, SID]) - of - {ok, Pid} -> {ok, Pid}; - {'EXIT', {noproc, _}} -> - check_bosh_module(XMPPDomain), - {error, module_not_loaded}; - Err -> - ?ERROR_MSG("Failed to start BOSH session: ~p", [Err]), - {error, Err} + case + catch supervisor:start_child( + SupervisorProc, + [Body, IP, SID] + ) + of + {ok, Pid} -> + {ok, Pid}; + {'EXIT', {noproc, _}} -> + check_bosh_module(XMPPDomain), + {error, module_not_loaded}; + Err -> + ?ERROR_MSG("Failed to start BOSH session: ~p", [Err]), + {error, Err} end. start(StateName, State) -> - p1_fsm:start_link(?MODULE, [StateName, State], - ?FSMOPTS). + p1_fsm:start_link( + ?MODULE, + [StateName, State], + ?FSMOPTS + ). start_link(Body, IP, SID) -> - p1_fsm:start_link(?MODULE, [Body, IP, SID], - ?FSMOPTS). + p1_fsm:start_link( + ?MODULE, + [Body, IP, SID], + ?FSMOPTS + ). send({http_bind, FsmRef, IP}, Packet) -> send_xml({http_bind, FsmRef, IP}, Packet). send_xml({http_bind, FsmRef, _IP}, Packet) -> - case catch p1_fsm:sync_send_all_state_event(FsmRef, - {send_xml, Packet}, - ?SEND_TIMEOUT) - of - {'EXIT', {timeout, _}} -> {error, timeout}; - {'EXIT', _} -> {error, einval}; - Res -> Res + case + catch p1_fsm:sync_send_all_state_event( + FsmRef, + {send_xml, Packet}, + ?SEND_TIMEOUT + ) + of + {'EXIT', {timeout, _}} -> {error, timeout}; + {'EXIT', _} -> {error, einval}; + Res -> Res end. setopts({http_bind, FsmRef, _IP}, Opts) -> case lists:member({active, once}, Opts) of - true -> - p1_fsm:send_all_state_event(FsmRef, - {activate, self()}); - _ -> - case lists:member({active, false}, Opts) of - true -> - case catch p1_fsm:sync_send_all_state_event(FsmRef, - deactivate_socket) - of - {'EXIT', _} -> {error, einval}; - Res -> Res - end; - _ -> ok - end + true -> + p1_fsm:send_all_state_event( + FsmRef, + {activate, self()} + ); + _ -> + case lists:member({active, false}, Opts) of + true -> + case + catch p1_fsm:sync_send_all_state_event( + FsmRef, + deactivate_socket + ) + of + {'EXIT', _} -> {error, einval}; + Res -> Res + end; + _ -> + ok + end end. controlling_process(_Socket, _Pid) -> ok. @@ -172,8 +209,10 @@ change_shaper({http_bind, FsmRef, _IP}, Shaper) -> p1_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}). close({http_bind, FsmRef, _IP}) -> - catch p1_fsm:sync_send_all_state_event(FsmRef, - close). + catch p1_fsm:sync_send_all_state_event( + FsmRef, + close + ). sockname(_Socket) -> {ok, {{0, 0, 0, 0}, 0}}. @@ -185,85 +224,133 @@ get_transport(_Socket) -> get_owner({http_bind, FsmRef, _IP}) -> FsmRef. -process_request(Data, IP, Type) -> +process_request(Data, IP, Type, Headers) -> Opts1 = ejabberd_c2s_config:get_c2s_limits(), - Opts = case Type of - xml -> - [{xml_socket, true} | Opts1]; - json -> - Opts1 - end, - MaxStanzaSize = case lists:keysearch(max_stanza_size, 1, - Opts) - of - {value, {_, Size}} -> Size; - _ -> infinity - end, + Opts = + case Type of + xml -> + [{xml_socket, true} | Opts1]; + json -> + Opts1 + end, + MaxStanzaSize = + case + lists:keysearch( + max_stanza_size, + 1, + Opts + ) + of + {value, {_, Size}} -> Size; + _ -> infinity + end, PayloadSize = iolist_size(Data), - if PayloadSize > MaxStanzaSize -> - http_error(403, <<"Request Too Large">>, Type); - true -> - case decode_body(Data, PayloadSize, Type) of - {ok, #body{attrs = Attrs} = Body} -> - SID = get_attr(sid, Attrs), - To = get_attr(to, Attrs), - if SID == <<"">>, To == <<"">> -> - bosh_response_with_msg(#body{http_reason = - <<"Missing 'to' attribute">>, - attrs = - [{type, <<"terminate">>}, - {condition, - <<"improper-addressing">>}]}, - Type, Body); - SID == <<"">> -> - case start(Body, IP, make_sid()) of - {ok, Pid} -> process_request(Pid, Body, IP, Type); - _Err -> - bosh_response_with_msg(#body{http_reason = - <<"Failed to start BOSH session">>, - attrs = - [{type, <<"terminate">>}, - {condition, - <<"internal-server-error">>}]}, - Type, Body) - end; - true -> - case mod_bosh:find_session(SID) of - {ok, Pid} -> process_request(Pid, Body, IP, Type); - error -> - bosh_response_with_msg(#body{http_reason = - <<"Session ID mismatch">>, - attrs = - [{type, <<"terminate">>}, - {condition, - <<"item-not-found">>}]}, - Type, Body) - end - end; - {error, Reason} -> http_error(400, Reason, Type) - end + if + PayloadSize > MaxStanzaSize -> + http_error(403, <<"Request Too Large">>, Type); + true -> + XUser = proplists:get_value(<<"X-User">>, Headers), + case decode_body(Data, PayloadSize, Type, XUser) of + {ok, #body{attrs = Attrs} = Body} -> + SID = get_attr(sid, Attrs), + To = get_attr(to, Attrs), + if + SID == <<"">>, To == <<"">> -> + bosh_response_with_msg( + #body{ + http_reason = + <<"Missing 'to' attribute">>, + attrs = + [ + {type, <<"terminate">>}, + {condition, <<"improper-addressing">>} + ] + }, + Type, + Body + ); + SID == <<"">> -> + case start(Body, IP, make_sid()) of + {ok, Pid} -> + process_request(Pid, Body, IP, Type, Headers); + _Err -> + bosh_response_with_msg( + #body{ + http_reason = + <<"Failed to start BOSH session">>, + attrs = + [ + {type, <<"terminate">>}, + {condition, <<"internal-server-error">>} + ] + }, + Type, + Body + ) + end; + true -> + case mod_bosh:find_session(SID) of + {ok, Pid} -> + process_request(Pid, Body, IP, Type, Headers); + error -> + bosh_response_with_msg( + #body{ + http_reason = + <<"Session ID mismatch">>, + attrs = + [ + {type, <<"terminate">>}, + {condition, <<"item-not-found">>} + ] + }, + Type, + Body + ) + end + end; + {error, Reason} -> + http_error(400, Reason, Type) + end end. -process_request(Pid, Req, _IP, Type) -> - case catch p1_fsm:sync_send_event(Pid, Req, - infinity) - of - #body{} = Resp -> bosh_response(Resp, Type); - {'EXIT', {Reason, _}} - when Reason == noproc; Reason == normal -> - bosh_response(#body{http_reason = - <<"BOSH session not found">>, - attrs = - [{type, <<"terminate">>}, - {condition, <<"item-not-found">>}]}, - Type); - {'EXIT', _} -> - bosh_response(#body{http_reason = - <<"Unexpected error">>, - attrs = - [{type, <<"terminate">>}, - {condition, <<"internal-server-error">>}]}, - Type) +process_request(Pid, Req, _IP, Type, _Headers) -> + case + catch p1_fsm:sync_send_event( + Pid, + Req, + infinity + ) + of + #body{} = Resp -> + bosh_response(Resp, Type); + {'EXIT', {Reason, _}} when + Reason == noproc; Reason == normal + -> + bosh_response( + #body{ + http_reason = + <<"BOSH session not found">>, + attrs = + [ + {type, <<"terminate">>}, + {condition, <<"item-not-found">>} + ] + }, + Type + ); + {'EXIT', _} -> + bosh_response( + #body{ + http_reason = + <<"Unexpected error">>, + attrs = + [ + {type, <<"terminate">>}, + {condition, <<"internal-server-error">>} + ] + }, + Type + ) end. init([#body{attrs = Attrs}, IP, SID]) -> @@ -274,143 +361,211 @@ init([#body{attrs = Attrs}, IP, SID]) -> Socket = make_socket(self(), IP), XMPPVer = get_attr('xmpp:version', Attrs), XMPPDomain = get_attr(to, Attrs), - {InBuf, Opts} = case mod_bosh_opt:prebind(XMPPDomain) of - true -> - JID = make_random_jid(XMPPDomain), - {buf_new(XMPPDomain), [{jid, JID} | Opts2]}; - false -> - {buf_in([make_xmlstreamstart(XMPPDomain, XMPPVer)], - buf_new(XMPPDomain)), - Opts2} - end, - case ejabberd_c2s:start(?MODULE, Socket, [{receiver, self()}|Opts]) of - {ok, C2SPid} -> - ejabberd_c2s:accept(C2SPid), - Inactivity = mod_bosh_opt:max_inactivity(XMPPDomain) div 1000, - MaxConcat = mod_bosh_opt:max_concat(XMPPDomain), - ShapedReceivers = buf_new(XMPPDomain, ?MAX_SHAPED_REQUESTS_QUEUE_LEN), - State = #state{host = XMPPDomain, sid = SID, ip = IP, - xmpp_ver = XMPPVer, el_ibuf = InBuf, - max_concat = MaxConcat, el_obuf = buf_new(XMPPDomain), - inactivity_timeout = Inactivity, - shaped_receivers = ShapedReceivers, - shaper_state = ShaperState}, - NewState = restart_inactivity_timer(State), - case mod_bosh:open_session(SID, self()) of - ok -> - {ok, wait_for_session, NewState}; - {error, Reason} -> - {stop, Reason} - end; - {error, Reason} -> - {stop, Reason}; - ignore -> - ignore + {InBuf, Opts} = + case mod_bosh_opt:prebind(XMPPDomain) of + true -> + JID = make_random_jid(XMPPDomain), + {buf_new(XMPPDomain), [{jid, JID} | Opts2]}; + false -> + { + buf_in( + [make_xmlstreamstart(XMPPDomain, XMPPVer)], + buf_new(XMPPDomain) + ), + Opts2 + } + end, + case ejabberd_c2s:start(?MODULE, Socket, [{receiver, self()} | Opts]) of + {ok, C2SPid} -> + ejabberd_c2s:accept(C2SPid), + Inactivity = mod_bosh_opt:max_inactivity(XMPPDomain) div 1000, + MaxConcat = mod_bosh_opt:max_concat(XMPPDomain), + ShapedReceivers = buf_new(XMPPDomain, ?MAX_SHAPED_REQUESTS_QUEUE_LEN), + State = #state{ + host = XMPPDomain, + sid = SID, + ip = IP, + xmpp_ver = XMPPVer, + el_ibuf = InBuf, + max_concat = MaxConcat, + el_obuf = buf_new(XMPPDomain), + inactivity_timeout = Inactivity, + shaped_receivers = ShapedReceivers, + shaper_state = ShaperState + }, + NewState = restart_inactivity_timer(State), + case mod_bosh:open_session(SID, self()) of + ok -> + {ok, wait_for_session, NewState}; + {error, Reason} -> + {stop, Reason} + end; + {error, Reason} -> + {stop, Reason}; + ignore -> + ignore end. wait_for_session(_Event, State) -> - ?ERROR_MSG("Unexpected event in 'wait_for_session': ~p", - [_Event]), + ?ERROR_MSG( + "Unexpected event in 'wait_for_session': ~p", + [_Event] + ), {next_state, wait_for_session, State}. -wait_for_session(#body{attrs = Attrs} = Req, From, - State) -> +wait_for_session( + #body{attrs = Attrs} = Req, + From, + State +) -> RID = get_attr(rid, Attrs), - ?DEBUG("Got request:~n** RequestID: ~p~n** Request: " - "~p~n** From: ~p~n** State: ~p", - [RID, Req, From, State]), - Wait = min(get_attr(wait, Attrs, undefined), - ?DEFAULT_WAIT), - Hold = min(get_attr(hold, Attrs, undefined), - ?DEFAULT_HOLD), + ?DEBUG( + "Got request:~n** RequestID: ~p~n** Request: " + "~p~n** From: ~p~n** State: ~p", + [RID, Req, From, State] + ), + Wait = min( + get_attr(wait, Attrs, undefined), + ?DEFAULT_WAIT + ), + Hold = min( + get_attr(hold, Attrs, undefined), + ?DEFAULT_HOLD + ), NewKey = get_attr(newkey, Attrs), Type = get_attr(type, Attrs), Requests = Hold + 1, - PollTime = if - Wait == 0, Hold == 0 -> erlang:timestamp(); - true -> undefined - end, + PollTime = + if + Wait == 0, Hold == 0 -> erlang:timestamp(); + true -> undefined + end, MaxPause = mod_bosh_opt:max_pause(State#state.host) div 1000, - Resp = #body{attrs = - [{sid, State#state.sid}, {wait, Wait}, - {ver, ?BOSH_VERSION}, {polling, ?DEFAULT_POLLING}, - {inactivity, State#state.inactivity_timeout}, - {hold, Hold}, {'xmpp:restartlogic', true}, - {requests, Requests}, {secure, true}, - {maxpause, MaxPause}, {'xmlns:xmpp', ?NS_BOSH}, - {'xmlns:stream', ?NS_STREAM}, {from, State#state.host}]}, + Resp = #body{ + attrs = + [ + {sid, State#state.sid}, + {wait, Wait}, + {ver, ?BOSH_VERSION}, + {polling, ?DEFAULT_POLLING}, + {inactivity, State#state.inactivity_timeout}, + {hold, Hold}, + {'xmpp:restartlogic', true}, + {requests, Requests}, + {secure, true}, + {maxpause, MaxPause}, + {'xmlns:xmpp', ?NS_BOSH}, + {'xmlns:stream', ?NS_STREAM}, + {from, State#state.host} + ] + }, {ShaperState, _} = - ejabberd_shaper:update(State#state.shaper_state, Req#body.size), - State1 = State#state{wait_timeout = Wait, - prev_rid = RID, prev_key = NewKey, - prev_poll = PollTime, shaper_state = ShaperState, - max_requests = Requests}, + ejabberd_shaper:update(State#state.shaper_state, Req#body.size), + State1 = State#state{ + wait_timeout = Wait, + prev_rid = RID, + prev_key = NewKey, + prev_poll = PollTime, + shaper_state = ShaperState, + max_requests = Requests + }, Els = maybe_add_xmlstreamend(Req#body.els, Type), State2 = route_els(State1, Els), {State3, RespEls} = get_response_els(State2), State4 = stop_inactivity_timer(State3), case RespEls of - [{xmlstreamstart, _, _} = El1] -> - OutBuf = buf_in([El1], State4#state.el_obuf), - State5 = restart_wait_timer(State4), - Receivers = gb_trees:insert(RID, {From, Resp}, - State5#state.receivers), - {next_state, active, - State5#state{receivers = Receivers, el_obuf = OutBuf}}; - [] -> - State5 = restart_wait_timer(State4), - Receivers = gb_trees:insert(RID, {From, Resp}, - State5#state.receivers), - {next_state, active, - State5#state{receivers = Receivers}}; - _ -> - reply_next_state(State4, Resp#body{els = RespEls}, RID, - From) + [{xmlstreamstart, _, _} = El1] -> + OutBuf = buf_in([El1], State4#state.el_obuf), + State5 = restart_wait_timer(State4), + Receivers = gb_trees:insert( + RID, + {From, Resp}, + State5#state.receivers + ), + {next_state, active, State5#state{receivers = Receivers, el_obuf = OutBuf}}; + [] -> + State5 = restart_wait_timer(State4), + Receivers = gb_trees:insert( + RID, + {From, Resp}, + State5#state.receivers + ), + {next_state, active, State5#state{receivers = Receivers}}; + _ -> + reply_next_state( + State4, + Resp#body{els = RespEls}, + RID, + From + ) end; wait_for_session(_Event, _From, State) -> - ?ERROR_MSG("Unexpected sync event in 'wait_for_session': ~p", - [_Event]), + ?ERROR_MSG( + "Unexpected sync event in 'wait_for_session': ~p", + [_Event] + ), {reply, {error, badarg}, wait_for_session, State}. active({#body{} = Body, From}, State) -> active1(Body, From, State); active(_Event, State) -> - ?ERROR_MSG("Unexpected event in 'active': ~p", - [_Event]), + ?ERROR_MSG( + "Unexpected event in 'active': ~p", + [_Event] + ), {next_state, active, State}. -active(#body{attrs = Attrs, size = Size} = Req, From, - State) -> - ?DEBUG("Got request:~n** Request: ~p~n** From: " - "~p~n** State: ~p", - [Req, From, State]), +active( + #body{attrs = Attrs, size = Size} = Req, + From, + State +) -> + ?DEBUG( + "Got request:~n** Request: ~p~n** From: " + "~p~n** State: ~p", + [Req, From, State] + ), {ShaperState, Pause} = - ejabberd_shaper:update(State#state.shaper_state, Size), + ejabberd_shaper:update(State#state.shaper_state, Size), State1 = State#state{shaper_state = ShaperState}, - if Pause > 0 -> - TRef = start_shaper_timer(Pause), - try p1_queue:in({TRef, From, Req}, - State1#state.shaped_receivers) of - Q -> - State2 = stop_inactivity_timer(State1), - {next_state, active, - State2#state{shaped_receivers = Q}} - catch error:full -> - misc:cancel_timer(TRef), - RID = get_attr(rid, Attrs), - reply_stop(State1, - #body{http_reason = <<"Too many requests">>, - attrs = - [{<<"type">>, <<"terminate">>}, - {<<"condition">>, - <<"policy-violation">>}]}, - From, RID) - end; - true -> active1(Req, From, State1) + if + Pause > 0 -> + TRef = start_shaper_timer(Pause), + try + p1_queue:in( + {TRef, From, Req}, + State1#state.shaped_receivers + ) + of + Q -> + State2 = stop_inactivity_timer(State1), + {next_state, active, State2#state{shaped_receivers = Q}} + catch + error:full -> + misc:cancel_timer(TRef), + RID = get_attr(rid, Attrs), + reply_stop( + State1, + #body{ + http_reason = <<"Too many requests">>, + attrs = + [ + {<<"type">>, <<"terminate">>}, + {<<"condition">>, <<"policy-violation">>} + ] + }, + From, + RID + ) + end; + true -> + active1(Req, From, State1) end; active(_Event, _From, State) -> - ?ERROR_MSG("Unexpected sync event in 'active': ~p", - [_Event]), + ?ERROR_MSG( + "Unexpected sync event in 'active': ~p", + [_Event] + ), {reply, {error, badarg}, active, State}. active1(#body{attrs = Attrs} = Req, From, State) -> @@ -419,181 +574,271 @@ active1(#body{attrs = Attrs} = Req, From, State) -> IsValidKey = is_valid_key(State#state.prev_key, Key), IsOveractivity = is_overactivity(State#state.prev_poll), Type = get_attr(type, Attrs), - if RID > - State#state.prev_rid + State#state.max_requests -> - reply_stop(State, - #body{http_reason = <<"Request ID is out of range">>, - attrs = - [{<<"type">>, <<"terminate">>}, - {<<"condition">>, <<"item-not-found">>}]}, - From, RID); - RID > State#state.prev_rid + 1 -> - State1 = restart_inactivity_timer(State), - Receivers = gb_trees:insert(RID, {From, Req}, - State1#state.receivers), - {next_state, active, - State1#state{receivers = Receivers}}; - RID =< State#state.prev_rid -> + if + RID > + State#state.prev_rid + State#state.max_requests -> + reply_stop( + State, + #body{ + http_reason = <<"Request ID is out of range">>, + attrs = + [ + {<<"type">>, <<"terminate">>}, + {<<"condition">>, <<"item-not-found">>} + ] + }, + From, + RID + ); + RID > State#state.prev_rid + 1 -> + State1 = restart_inactivity_timer(State), + Receivers = gb_trees:insert( + RID, + {From, Req}, + State1#state.receivers + ), + {next_state, active, State1#state{receivers = Receivers}}; + RID =< State#state.prev_rid -> %% TODO: do we need to check 'key' here? It seems so... case gb_trees:lookup(RID, State#state.responses) of {value, PrevBody} -> - {next_state, active, - do_reply(State, From, PrevBody, RID)}; + {next_state, active, do_reply(State, From, PrevBody, RID)}; none -> State1 = drop_holding_receiver(State, RID), State2 = stop_inactivity_timer(State1), State3 = restart_wait_timer(State2), - Receivers = gb_trees:insert(RID, {From, Req}, - State3#state.receivers), + Receivers = gb_trees:insert( + RID, + {From, Req}, + State3#state.receivers + ), {next_state, active, State3#state{receivers = Receivers}} end; - not IsValidKey -> - reply_stop(State, - #body{http_reason = <<"Session key mismatch">>, - attrs = - [{<<"type">>, <<"terminate">>}, - {<<"condition">>, <<"item-not-found">>}]}, - From, RID); - IsOveractivity -> - reply_stop(State, - #body{http_reason = <<"Too many requests">>, - attrs = - [{<<"type">>, <<"terminate">>}, - {<<"condition">>, <<"policy-violation">>}]}, - From, RID); - true -> - State1 = stop_inactivity_timer(State), - State2 = stop_wait_timer(State1), - Els = case get_attr('xmpp:restart', Attrs, false) of - true -> - XMPPDomain = get_attr(to, Attrs, State#state.host), - XMPPVer = get_attr('xmpp:version', Attrs, - State#state.xmpp_ver), - [make_xmlstreamstart(XMPPDomain, XMPPVer)]; - false -> Req#body.els - end, - State3 = route_els(State2, - maybe_add_xmlstreamend(Els, Type)), - {State4, RespEls} = get_response_els(State3), - NewKey = get_attr(newkey, Attrs, Key), - Pause = get_attr(pause, Attrs, undefined), - NewPoll = case State#state.prev_poll of - undefined -> undefined; - _ -> erlang:timestamp() - end, - State5 = State4#state{prev_poll = NewPoll, - prev_key = NewKey}, - if Type == <<"terminate">> -> - reply_stop(State5, - #body{http_reason = <<"Session close">>, - attrs = [{<<"type">>, <<"terminate">>}], - els = RespEls}, - From, RID); - Pause /= undefined -> - State6 = drop_holding_receiver(State5), - State7 = restart_inactivity_timer(State6, Pause), - InBuf = buf_in(RespEls, State7#state.el_ibuf), - {next_state, active, - State7#state{prev_rid = RID, el_ibuf = InBuf}}; - RespEls == [] -> - State6 = drop_holding_receiver(State5), - State7 = stop_inactivity_timer(State6), - State8 = restart_wait_timer(State7), - Receivers = gb_trees:insert(RID, {From, #body{}}, - State8#state.receivers), - {next_state, active, - State8#state{prev_rid = RID, receivers = Receivers}}; - true -> - State6 = drop_holding_receiver(State5), - reply_next_state(State6#state{prev_rid = RID}, - #body{els = RespEls}, RID, From) - end + not IsValidKey -> + reply_stop( + State, + #body{ + http_reason = <<"Session key mismatch">>, + attrs = + [ + {<<"type">>, <<"terminate">>}, + {<<"condition">>, <<"item-not-found">>} + ] + }, + From, + RID + ); + IsOveractivity -> + reply_stop( + State, + #body{ + http_reason = <<"Too many requests">>, + attrs = + [ + {<<"type">>, <<"terminate">>}, + {<<"condition">>, <<"policy-violation">>} + ] + }, + From, + RID + ); + true -> + State1 = stop_inactivity_timer(State), + State2 = stop_wait_timer(State1), + Els = + case get_attr('xmpp:restart', Attrs, false) of + true -> + XMPPDomain = get_attr(to, Attrs, State#state.host), + XMPPVer = get_attr( + 'xmpp:version', + Attrs, + State#state.xmpp_ver + ), + [make_xmlstreamstart(XMPPDomain, XMPPVer)]; + false -> + Req#body.els + end, + State3 = route_els( + State2, + maybe_add_xmlstreamend(Els, Type) + ), + {State4, RespEls} = get_response_els(State3), + NewKey = get_attr(newkey, Attrs, Key), + Pause = get_attr(pause, Attrs, undefined), + NewPoll = + case State#state.prev_poll of + undefined -> undefined; + _ -> erlang:timestamp() + end, + State5 = State4#state{ + prev_poll = NewPoll, + prev_key = NewKey + }, + if + Type == <<"terminate">> -> + reply_stop( + State5, + #body{ + http_reason = <<"Session close">>, + attrs = [{<<"type">>, <<"terminate">>}], + els = RespEls + }, + From, + RID + ); + Pause /= undefined -> + State6 = drop_holding_receiver(State5), + State7 = restart_inactivity_timer(State6, Pause), + InBuf = buf_in(RespEls, State7#state.el_ibuf), + {next_state, active, State7#state{prev_rid = RID, el_ibuf = InBuf}}; + RespEls == [] -> + State6 = drop_holding_receiver(State5), + State7 = stop_inactivity_timer(State6), + State8 = restart_wait_timer(State7), + Receivers = gb_trees:insert( + RID, + {From, #body{}}, + State8#state.receivers + ), + {next_state, active, State8#state{prev_rid = RID, receivers = Receivers}}; + true -> + State6 = drop_holding_receiver(State5), + reply_next_state( + State6#state{prev_rid = RID}, + #body{els = RespEls}, + RID, + From + ) + end end. -handle_event({activate, C2SPid}, StateName, - State) -> +handle_event( + {activate, C2SPid}, + StateName, + State +) -> State1 = route_els(State#state{c2s_pid = C2SPid}), {next_state, StateName, State1}; -handle_event({change_shaper, Shaper}, StateName, - State) -> +handle_event( + {change_shaper, Shaper}, + StateName, + State +) -> {next_state, StateName, State#state{shaper_state = Shaper}}; handle_event(_Event, StateName, State) -> - ?ERROR_MSG("Unexpected event in '~ts': ~p", - [StateName, _Event]), + ?ERROR_MSG( + "Unexpected event in '~ts': ~p", + [StateName, _Event] + ), {next_state, StateName, State}. -handle_sync_event({send_xml, - {xmlstreamstart, _, _} = El}, - _From, StateName, State) - when State#state.xmpp_ver >= <<"1.0">> -> +handle_sync_event( + {send_xml, {xmlstreamstart, _, _} = El}, + _From, + StateName, + State +) when + State#state.xmpp_ver >= <<"1.0">> +-> OutBuf = buf_in([El], State#state.el_obuf), {reply, ok, StateName, State#state{el_obuf = OutBuf}}; -handle_sync_event({send_xml, El}, _From, StateName, - State) -> +handle_sync_event( + {send_xml, El}, + _From, + StateName, + State +) -> OutBuf = buf_in([El], State#state.el_obuf), State1 = State#state{el_obuf = OutBuf}, - case gb_trees:lookup(State1#state.prev_rid, - State1#state.receivers) - of - {value, {From, Body}} -> - {State2, Els} = get_response_els(State1), - {reply, ok, StateName, - reply(State2, Body#body{els = Els}, - State2#state.prev_rid, From)}; - none -> - State2 = case p1_queue:out(State1#state.shaped_receivers) - of - {{value, {TRef, From, Body}}, Q} -> - misc:cancel_timer(TRef), - p1_fsm:send_event(self(), {Body, From}), - State1#state{shaped_receivers = Q}; - _ -> State1 - end, - {reply, ok, StateName, State2} + case + gb_trees:lookup( + State1#state.prev_rid, + State1#state.receivers + ) + of + {value, {From, Body}} -> + {State2, Els} = get_response_els(State1), + {reply, ok, StateName, + reply( + State2, + Body#body{els = Els}, + State2#state.prev_rid, + From + )}; + none -> + State2 = + case p1_queue:out(State1#state.shaped_receivers) of + {{value, {TRef, From, Body}}, Q} -> + misc:cancel_timer(TRef), + p1_fsm:send_event(self(), {Body, From}), + State1#state{shaped_receivers = Q}; + _ -> + State1 + end, + {reply, ok, StateName, State2} end; handle_sync_event(close, _From, _StateName, State) -> {stop, normal, State}; -handle_sync_event(deactivate_socket, _From, StateName, - StateData) -> - {reply, ok, StateName, - StateData#state{c2s_pid = undefined}}; +handle_sync_event( + deactivate_socket, + _From, + StateName, + StateData +) -> + {reply, ok, StateName, StateData#state{c2s_pid = undefined}}; handle_sync_event(_Event, _From, StateName, State) -> - ?ERROR_MSG("Unexpected sync event in '~ts': ~p", - [StateName, _Event]), + ?ERROR_MSG( + "Unexpected sync event in '~ts': ~p", + [StateName, _Event] + ), {reply, {error, badarg}, StateName, State}. -handle_info({timeout, TRef, wait_timeout}, StateName, - #state{wait_timer = TRef} = State) -> +handle_info( + {timeout, TRef, wait_timeout}, + StateName, + #state{wait_timer = TRef} = State +) -> State2 = State#state{wait_timer = undefined}, {next_state, StateName, drop_holding_receiver(State2)}; -handle_info({timeout, TRef, inactive}, _StateName, - #state{inactivity_timer = TRef} = State) -> +handle_info( + {timeout, TRef, inactive}, + _StateName, + #state{inactivity_timer = TRef} = State +) -> {stop, normal, State}; -handle_info({timeout, TRef, shaper_timeout}, StateName, - State) -> +handle_info( + {timeout, TRef, shaper_timeout}, + StateName, + State +) -> case p1_queue:out(State#state.shaped_receivers) of - {{value, {TRef, From, Req}}, Q} -> - p1_fsm:send_event(self(), {Req, From}), - {next_state, StateName, - State#state{shaped_receivers = Q}}; - {{value, _}, _} -> - ?ERROR_MSG("shaper_timeout mismatch:~n** TRef: ~p~n** " - "State: ~p", - [TRef, State]), - {stop, normal, State}; - _ -> {next_state, StateName, State} + {{value, {TRef, From, Req}}, Q} -> + p1_fsm:send_event(self(), {Req, From}), + {next_state, StateName, State#state{shaped_receivers = Q}}; + {{value, _}, _} -> + ?ERROR_MSG( + "shaper_timeout mismatch:~n** TRef: ~p~n** " + "State: ~p", + [TRef, State] + ), + {stop, normal, State}; + _ -> + {next_state, StateName, State} end; handle_info(_Info, StateName, State) -> - ?ERROR_MSG("Unexpected info:~n** Msg: ~p~n** StateName: ~p", - [_Info, StateName]), + ?ERROR_MSG( + "Unexpected info:~n** Msg: ~p~n** StateName: ~p", + [_Info, StateName] + ), {next_state, StateName, State}. terminate(_Reason, _StateName, State) -> mod_bosh:close_session(State#state.sid), case State#state.c2s_pid of - C2SPid when is_pid(C2SPid) -> - p1_fsm:send_event(C2SPid, closed); - _ -> ok + C2SPid when is_pid(C2SPid) -> + p1_fsm:send_event(C2SPid, closed); + _ -> + ok end, bounce_receivers(State, closed), bounce_els_from_obuf(State). @@ -605,57 +850,74 @@ print_state(State) -> State. route_els(#state{el_ibuf = Buf, c2s_pid = C2SPid} = State) -> NewBuf = p1_queue:dropwhile( - fun(El) -> - p1_fsm:send_event(C2SPid, El), - true - end, Buf), + fun(El) -> + p1_fsm:send_event(C2SPid, El), + true + end, + Buf + ), State#state{el_ibuf = NewBuf}. route_els(State, Els) -> case State#state.c2s_pid of - C2SPid when is_pid(C2SPid) -> - lists:foreach(fun (El) -> - p1_fsm:send_event(C2SPid, El) - end, - Els), - State; - _ -> - InBuf = buf_in(Els, State#state.el_ibuf), - State#state{el_ibuf = InBuf} + C2SPid when is_pid(C2SPid) -> + lists:foreach( + fun(El) -> + p1_fsm:send_event(C2SPid, El) + end, + Els + ), + State; + _ -> + InBuf = buf_in(Els, State#state.el_ibuf), + State#state{el_ibuf = InBuf} end. -get_response_els(#state{el_obuf = OutBuf, - max_concat = MaxConcat} = - State) -> +get_response_els( + #state{ + el_obuf = OutBuf, + max_concat = MaxConcat + } = + State +) -> {Els, NewOutBuf} = buf_out(OutBuf, MaxConcat), {State#state{el_obuf = NewOutBuf}, Els}. reply(State, Body, RID, From) -> State1 = restart_inactivity_timer(State), - Receivers = gb_trees:delete_any(RID, - State1#state.receivers), + Receivers = gb_trees:delete_any( + RID, + State1#state.receivers + ), State2 = do_reply(State1, From, Body, RID), case catch gb_trees:take_smallest(Receivers) of - {NextRID, {From1, Req}, Receivers1} - when NextRID == RID + 1 -> - p1_fsm:send_event(self(), {Req, From1}), - State2#state{receivers = Receivers1}; - _ -> State2#state{receivers = Receivers} + {NextRID, {From1, Req}, Receivers1} when + NextRID == RID + 1 + -> + p1_fsm:send_event(self(), {Req, From1}), + State2#state{receivers = Receivers1}; + _ -> + State2#state{receivers = Receivers} end. reply_next_state(State, Body, RID, From) -> State1 = restart_inactivity_timer(State), - Receivers = gb_trees:delete_any(RID, - State1#state.receivers), + Receivers = gb_trees:delete_any( + RID, + State1#state.receivers + ), State2 = do_reply(State1, From, Body, RID), case catch gb_trees:take_smallest(Receivers) of - {NextRID, {From1, Req}, Receivers1} - when NextRID == RID + 1 -> - active(Req, From1, - State2#state{receivers = Receivers1}); - _ -> - {next_state, active, - State2#state{receivers = Receivers}} + {NextRID, {From1, Req}, Receivers1} when + NextRID == RID + 1 + -> + active( + Req, + From1, + State2#state{receivers = Receivers1} + ); + _ -> + {next_state, active, State2#state{receivers = Receivers}} end. reply_stop(State, Body, From, RID) -> @@ -665,196 +927,271 @@ drop_holding_receiver(State) -> drop_holding_receiver(State, State#state.prev_rid). drop_holding_receiver(State, RID) -> case gb_trees:lookup(RID, State#state.receivers) of - {value, {From, Body}} -> - State1 = restart_inactivity_timer(State), - Receivers = gb_trees:delete_any(RID, - State1#state.receivers), - State2 = State1#state{receivers = Receivers}, - do_reply(State2, From, Body, RID); - none -> - restart_inactivity_timer(State) + {value, {From, Body}} -> + State1 = restart_inactivity_timer(State), + Receivers = gb_trees:delete_any( + RID, + State1#state.receivers + ), + State2 = State1#state{receivers = Receivers}, + do_reply(State2, From, Body, RID); + none -> + restart_inactivity_timer(State) end. do_reply(State, From, Body, RID) -> - ?DEBUG("Send reply:~n** RequestID: ~p~n** Reply: " - "~p~n** To: ~p~n** State: ~p", - [RID, Body, From, State]), + ?DEBUG( + "Send reply:~n** RequestID: ~p~n** Reply: " + "~p~n** To: ~p~n** State: ~p", + [RID, Body, From, State] + ), p1_fsm:reply(From, Body), - Responses = gb_trees:delete_any(RID, - State#state.responses), - Responses1 = case gb_trees:size(Responses) of - N when N < State#state.max_requests; N == 0 -> - Responses; - _ -> element(3, gb_trees:take_smallest(Responses)) - end, + Responses = gb_trees:delete_any( + RID, + State#state.responses + ), + Responses1 = + case gb_trees:size(Responses) of + N when N < State#state.max_requests; N == 0 -> + Responses; + _ -> + element(3, gb_trees:take_smallest(Responses)) + end, Responses2 = gb_trees:insert(RID, Body, Responses1), State#state{responses = Responses2}. bounce_receivers(State, _Reason) -> Receivers = gb_trees:to_list(State#state.receivers), - ShapedReceivers = lists:map(fun ({_, From, - #body{attrs = Attrs} = Body}) -> - RID = get_attr(rid, Attrs), - {RID, {From, Body}} - end, - p1_queue:to_list(State#state.shaped_receivers)), - lists:foldl(fun ({RID, {From, _Body}}, AccState) -> - NewBody = #body{http_reason = - <<"Session closed">>, - attrs = - [{type, <<"terminate">>}, - {condition, - <<"other-request">>}]}, - do_reply(AccState, From, NewBody, RID) - end, - State, Receivers ++ ShapedReceivers). + ShapedReceivers = lists:map( + fun({_, From, #body{attrs = Attrs} = Body}) -> + RID = get_attr(rid, Attrs), + {RID, {From, Body}} + end, + p1_queue:to_list(State#state.shaped_receivers) + ), + lists:foldl( + fun({RID, {From, _Body}}, AccState) -> + NewBody = #body{ + http_reason = + <<"Session closed">>, + attrs = + [ + {type, <<"terminate">>}, + {condition, <<"other-request">>} + ] + }, + do_reply(AccState, From, NewBody, RID) + end, + State, + Receivers ++ ShapedReceivers + ). bounce_els_from_obuf(State) -> Opts = ejabberd_config:codec_options(), p1_queue:foreach( - fun({xmlstreamelement, El}) -> - try xmpp:decode(El, ?NS_CLIENT, Opts) of - Pkt when ?is_stanza(Pkt) -> - case {xmpp:get_from(Pkt), xmpp:get_to(Pkt)} of - {#jid{}, #jid{}} -> - ejabberd_router:route(Pkt); - _ -> - ok - end; - _ -> - ok - catch _:{xmpp_codec, _} -> - ok - end; - (_) -> - ok - end, State#state.el_obuf). + fun + ({xmlstreamelement, El}) -> + try xmpp:decode(El, ?NS_CLIENT, Opts) of + Pkt when ?is_stanza(Pkt) -> + case {xmpp:get_from(Pkt), xmpp:get_to(Pkt)} of + {#jid{}, #jid{}} -> + ejabberd_router:route(Pkt); + _ -> + ok + end; + _ -> + ok + catch + _:{xmpp_codec, _} -> + ok + end; + (_) -> + ok + end, + State#state.el_obuf + ). is_valid_key(<<"">>, <<"">>) -> true; -is_valid_key(PrevKey, Key) -> - str:sha(Key) == PrevKey. +is_valid_key(PrevKey, Key) -> str:sha(Key) == PrevKey. -is_overactivity(undefined) -> false; +is_overactivity(undefined) -> + false; is_overactivity(PrevPoll) -> - PollPeriod = timer:now_diff(erlang:timestamp(), PrevPoll) div - 1000000, - if PollPeriod < (?DEFAULT_POLLING) -> true; - true -> false + PollPeriod = + timer:now_diff(erlang:timestamp(), PrevPoll) div + 1000000, + if + PollPeriod < (?DEFAULT_POLLING) -> true; + true -> false end. make_xmlstreamstart(XMPPDomain, Version) -> - VersionEl = case Version of - <<"">> -> []; - _ -> [{<<"version">>, Version}] - end, - {xmlstreamstart, <<"stream:stream">>, - [{<<"to">>, XMPPDomain}, {<<"xmlns">>, ?NS_CLIENT}, - {<<"xmlns:xmpp">>, ?NS_BOSH}, - {<<"xmlns:stream">>, ?NS_STREAM} - | VersionEl]}. + VersionEl = + case Version of + <<"">> -> []; + _ -> [{<<"version">>, Version}] + end, + {xmlstreamstart, <<"stream:stream">>, [ + {<<"to">>, XMPPDomain}, + {<<"xmlns">>, ?NS_CLIENT}, + {<<"xmlns:xmpp">>, ?NS_BOSH}, + {<<"xmlns:stream">>, ?NS_STREAM} + | VersionEl + ]}. maybe_add_xmlstreamend(Els, <<"terminate">>) -> Els ++ [{xmlstreamend, <<"stream:stream">>}]; -maybe_add_xmlstreamend(Els, _) -> Els. +maybe_add_xmlstreamend(Els, _) -> + Els. encode_body(#body{attrs = Attrs, els = Els}, Type) -> - Attrs1 = lists:map(fun ({K, V}) when is_atom(K) -> - AmK = iolist_to_binary(atom_to_list(K)), - case V of - true -> {AmK, <<"true">>}; - false -> {AmK, <<"false">>}; - I when is_integer(I), I >= 0 -> - {AmK, integer_to_binary(I)}; - _ -> {AmK, V} - end; - ({K, V}) -> {K, V} - end, - Attrs), + Attrs1 = lists:map( + fun + ({K, V}) when is_atom(K) -> + AmK = iolist_to_binary(atom_to_list(K)), + case V of + true -> + {AmK, <<"true">>}; + false -> + {AmK, <<"false">>}; + I when is_integer(I), I >= 0 -> + {AmK, integer_to_binary(I)}; + _ -> + {AmK, V} + end; + ({K, V}) -> + {K, V} + end, + Attrs + ), Attrs2 = [{<<"xmlns">>, ?NS_HTTP_BIND} | Attrs1], - {Attrs3, XMLs} = lists:foldr(fun ({xmlstreamraw, XML}, - {AttrsAcc, XMLBuf}) -> - {AttrsAcc, [XML | XMLBuf]}; - ({xmlstreamelement, - #xmlel{name = <<"stream:error">>} = El}, - {AttrsAcc, XMLBuf}) -> - {[{<<"type">>, <<"terminate">>}, - {<<"condition">>, - <<"remote-stream-error">>}, - {<<"xmlns:stream">>, ?NS_STREAM} - | AttrsAcc], - [encode_element(El, Type) | XMLBuf]}; - ({xmlstreamelement, - #xmlel{name = <<"stream:features">>} = - El}, - {AttrsAcc, XMLBuf}) -> - {lists:keystore(<<"xmlns:stream">>, 1, - AttrsAcc, - {<<"xmlns:stream">>, - ?NS_STREAM}), - [encode_element(El, Type) | XMLBuf]}; - ({xmlstreamelement, - #xmlel{name = Name, attrs = EAttrs} = El}, - {AttrsAcc, XMLBuf}) - when Name == <<"message">>; - Name == <<"presence">>; - Name == <<"iq">> -> - NewAttrs = lists:keystore( - <<"xmlns">>, 1, EAttrs, - {<<"xmlns">>, ?NS_CLIENT}), - NewEl = El#xmlel{attrs = NewAttrs}, - {AttrsAcc, - [encode_element(NewEl, Type) | XMLBuf]}; - ({xmlstreamelement, El}, - {AttrsAcc, XMLBuf}) -> - {AttrsAcc, - [encode_element(El, Type) | XMLBuf]}; - ({xmlstreamend, _}, {AttrsAcc, XMLBuf}) -> - {[{<<"type">>, <<"terminate">>}, - {<<"condition">>, - <<"remote-stream-error">>} - | AttrsAcc], - XMLBuf}; - ({xmlstreamstart, <<"stream:stream">>, - SAttrs}, - {AttrsAcc, XMLBuf}) -> - StreamID = fxml:get_attr_s(<<"id">>, - SAttrs), - NewAttrs = case - fxml:get_attr_s(<<"version">>, - SAttrs) - of - <<"">> -> - [{<<"authid">>, - StreamID} - | AttrsAcc]; - V -> - lists:keystore(<<"xmlns:xmpp">>, - 1, - [{<<"xmpp:version">>, - V}, - {<<"authid">>, - StreamID} - | AttrsAcc], - {<<"xmlns:xmpp">>, - ?NS_BOSH}) - end, - {NewAttrs, XMLBuf}; - ({xmlstreamerror, _}, - {AttrsAcc, XMLBuf}) -> - {[{<<"type">>, <<"terminate">>}, - {<<"condition">>, - <<"remote-stream-error">>} - | AttrsAcc], - XMLBuf}; - (_, Acc) -> Acc - end, - {Attrs2, []}, Els), + {Attrs3, XMLs} = lists:foldr( + fun + ( + {xmlstreamraw, XML}, + {AttrsAcc, XMLBuf} + ) -> + {AttrsAcc, [XML | XMLBuf]}; + ( + {xmlstreamelement, #xmlel{name = <<"stream:error">>} = El}, + {AttrsAcc, XMLBuf} + ) -> + { + [ + {<<"type">>, <<"terminate">>}, + {<<"condition">>, <<"remote-stream-error">>}, + {<<"xmlns:stream">>, ?NS_STREAM} + | AttrsAcc + ], + [encode_element(El, Type) | XMLBuf] + }; + ( + {xmlstreamelement, + #xmlel{name = <<"stream:features">>} = + El}, + {AttrsAcc, XMLBuf} + ) -> + { + lists:keystore( + <<"xmlns:stream">>, + 1, + AttrsAcc, + {<<"xmlns:stream">>, ?NS_STREAM} + ), + [encode_element(El, Type) | XMLBuf] + }; + ( + {xmlstreamelement, #xmlel{name = Name, attrs = EAttrs} = El}, + {AttrsAcc, XMLBuf} + ) when + Name == <<"message">>; + Name == <<"presence">>; + Name == <<"iq">> + -> + NewAttrs = lists:keystore( + <<"xmlns">>, + 1, + EAttrs, + {<<"xmlns">>, ?NS_CLIENT} + ), + NewEl = El#xmlel{attrs = NewAttrs}, + {AttrsAcc, [encode_element(NewEl, Type) | XMLBuf]}; + ( + {xmlstreamelement, El}, + {AttrsAcc, XMLBuf} + ) -> + {AttrsAcc, [encode_element(El, Type) | XMLBuf]}; + ({xmlstreamend, _}, {AttrsAcc, XMLBuf}) -> + { + [ + {<<"type">>, <<"terminate">>}, + {<<"condition">>, <<"remote-stream-error">>} + | AttrsAcc + ], + XMLBuf + }; + ( + {xmlstreamstart, <<"stream:stream">>, SAttrs}, + {AttrsAcc, XMLBuf} + ) -> + StreamID = fxml:get_attr_s( + <<"id">>, + SAttrs + ), + NewAttrs = + case + fxml:get_attr_s( + <<"version">>, + SAttrs + ) + of + <<"">> -> + [ + {<<"authid">>, StreamID} + | AttrsAcc + ]; + V -> + lists:keystore( + <<"xmlns:xmpp">>, + 1, + [ + {<<"xmpp:version">>, V}, + {<<"authid">>, StreamID} + | AttrsAcc + ], + {<<"xmlns:xmpp">>, ?NS_BOSH} + ) + end, + {NewAttrs, XMLBuf}; + ( + {xmlstreamerror, _}, + {AttrsAcc, XMLBuf} + ) -> + { + [ + {<<"type">>, <<"terminate">>}, + {<<"condition">>, <<"remote-stream-error">>} + | AttrsAcc + ], + XMLBuf + }; + (_, Acc) -> + Acc + end, + {Attrs2, []}, + Els + ), case XMLs of - [] when Type == xml -> + [] when Type == xml -> [<<">, attrs_to_list(Attrs3), <<"/>">>]; - _ when Type == xml -> - [<<">, attrs_to_list(Attrs3), $>, XMLs, - <<"">>] + _ when Type == xml -> + [ + <<">, + attrs_to_list(Attrs3), + $>, + XMLs, + <<"">> + ] end. encode_element(El, xml) -> @@ -862,72 +1199,128 @@ encode_element(El, xml) -> encode_element(El, json) -> El. -decode_body(Data, Size, Type) -> +decode_body(Data, Size, Type, XUser) -> case decode(Data, Type) of - #xmlel{name = <<"body">>, attrs = Attrs, - children = Els} -> - case attrs_to_body_attrs(Attrs) of - {error, _} = Err -> Err; - BodyAttrs -> - case get_attr(rid, BodyAttrs) of - <<"">> -> {error, <<"Missing \"rid\" attribute">>}; - _ -> - Els1 = lists:flatmap(fun (#xmlel{} = El) -> - [{xmlstreamelement, El}]; - (_) -> [] - end, - Els), - {ok, #body{attrs = BodyAttrs, size = Size, els = Els1}} - end - end; - #xmlel{} -> {error, <<"Unexpected payload">>}; - _ when Type == xml -> + #xmlel{ + name = <<"body">>, + attrs = Attrs, + children = Els + } -> + case attrs_to_body_attrs(Attrs) of + {error, _} = Err -> + Err; + BodyAttrs -> + case get_attr(rid, BodyAttrs) of + <<"">> -> + {error, <<"Missing \"rid\" attribute">>}; + _ -> + Els1 = lists:flatmap( + fun(#xmlel{} = El) -> + ?DEBUG("decoe El ~p", [El]), + case {xmpp:get_name(El)} of + {<<"auth">>} -> + ?DEBUG("decode_body El cdata ~p", [ + parse(base64:decode(fxml:get_tag_cdata(El))) + ]), + NewCData = base64:encode( + update_cdata( + parse(base64:decode(fxml:get_tag_cdata(El))), + XUser + ) + ), + % recreate a new xmlel + El2 = #xmlel{ + name = xmpp:get_name(El), + attrs = El#xmlel.attrs + }, + [ + {xmlstreamelement, + fxml:append_subtags(El2, [{xmlcdata, NewCData}])} + ]; + (_) -> + [{xmlstreamelement, El}] + end + end, + Els + ), + ?DEBUG("decoe Els1 ~p", [Els1]), + {ok, #body{attrs = BodyAttrs, size = Size, els = Els1}} + end + end; + #xmlel{} -> + {error, <<"Unexpected payload">>}; + _ when Type == xml -> {error, <<"XML is not well-formed">>}; - _ when Type == json -> + _ when Type == json -> {error, <<"JSON is not well-formed">>} end. +update_cdata([AuthzId, Username, _Password], XUser) -> + erlang:iolist_to_binary([AuthzId, <<0>>, Username, <<0>>, XUser]). + +parse(S) -> + binary:split(S, <<0>>, [global]). + decode(Data, xml) -> fxml_stream:parse_element(Data); decode(Data, json) -> Data. attrs_to_body_attrs(Attrs) -> - lists:foldl(fun (_, {error, Reason}) -> {error, Reason}; - ({Attr, Val}, Acc) -> - try case Attr of - <<"ver">> -> [{ver, Val} | Acc]; - <<"xmpp:version">> -> - [{'xmpp:version', Val} | Acc]; - <<"type">> -> [{type, Val} | Acc]; - <<"key">> -> [{key, Val} | Acc]; - <<"newkey">> -> [{newkey, Val} | Acc]; - <<"xmlns">> -> Val = (?NS_HTTP_BIND), Acc; - <<"secure">> -> [{secure, to_bool(Val)} | Acc]; - <<"xmpp:restart">> -> - [{'xmpp:restart', to_bool(Val)} | Acc]; - <<"to">> -> - [{to, jid:nameprep(Val)} | Acc]; - <<"wait">> -> [{wait, to_int(Val, 0)} | Acc]; - <<"ack">> -> [{ack, to_int(Val, 0)} | Acc]; - <<"sid">> -> [{sid, Val} | Acc]; - <<"hold">> -> [{hold, to_int(Val, 0)} | Acc]; - <<"rid">> -> [{rid, to_int(Val, 0)} | Acc]; - <<"pause">> -> [{pause, to_int(Val, 0)} | Acc]; - _ -> [{Attr, Val} | Acc] - end - catch - _:_ -> - {error, - <<"Invalid \"", Attr/binary, "\" attribute">>} - end - end, - [], Attrs). + lists:foldl( + fun + (_, {error, Reason}) -> + {error, Reason}; + ({Attr, Val}, Acc) -> + try + case Attr of + <<"ver">> -> + [{ver, Val} | Acc]; + <<"xmpp:version">> -> + [{'xmpp:version', Val} | Acc]; + <<"type">> -> + [{type, Val} | Acc]; + <<"key">> -> + [{key, Val} | Acc]; + <<"newkey">> -> + [{newkey, Val} | Acc]; + <<"xmlns">> -> + Val = (?NS_HTTP_BIND), + Acc; + <<"secure">> -> + [{secure, to_bool(Val)} | Acc]; + <<"xmpp:restart">> -> + [{'xmpp:restart', to_bool(Val)} | Acc]; + <<"to">> -> + [{to, jid:nameprep(Val)} | Acc]; + <<"wait">> -> + [{wait, to_int(Val, 0)} | Acc]; + <<"ack">> -> + [{ack, to_int(Val, 0)} | Acc]; + <<"sid">> -> + [{sid, Val} | Acc]; + <<"hold">> -> + [{hold, to_int(Val, 0)} | Acc]; + <<"rid">> -> + [{rid, to_int(Val, 0)} | Acc]; + <<"pause">> -> + [{pause, to_int(Val, 0)} | Acc]; + _ -> + [{Attr, Val} | Acc] + end + catch + _:_ -> + {error, <<"Invalid \"", Attr/binary, "\" attribute">>} + end + end, + [], + Attrs + ). to_int(S, Min) -> case binary_to_integer(S) of - I when I >= Min -> I; - _ -> erlang:error(badarg) + I when I >= Min -> I; + _ -> erlang:error(badarg) end. to_bool(<<"true">>) -> true; @@ -941,23 +1334,26 @@ attr_to_list({Name, Value}) -> [$\s, Name, $=, $', fxml:crypt(Value), $']. bosh_response(Body, Type) -> - CType = case Type of - xml -> ?CT_XML; - json -> ?CT_JSON - end, - {200, Body#body.http_reason, ?HEADER(CType), - encode_body(Body, Type)}. + CType = + case Type of + xml -> ?CT_XML; + json -> ?CT_JSON + end, + {200, Body#body.http_reason, ?HEADER(CType), encode_body(Body, Type)}. bosh_response_with_msg(Body, Type, RcvBody) -> - ?DEBUG("Send error reply:~p~n** Receiced body: ~p", - [Body, RcvBody]), + ?DEBUG( + "Send error reply:~p~n** Receiced body: ~p", + [Body, RcvBody] + ), bosh_response(Body, Type). http_error(Status, Reason, Type) -> - CType = case Type of - xml -> ?CT_XML; - json -> ?CT_JSON - end, + CType = + case Type of + xml -> ?CT_XML; + json -> ?CT_JSON + end, {Status, Reason, ?HEADER(CType), <<"">>}. make_sid() -> str:sha(p1_rand:get_string()). @@ -969,25 +1365,28 @@ min(A, B) -> erlang:min(A, B). check_bosh_module(XmppDomain) -> case gen_mod:is_loaded(XmppDomain, mod_bosh) of - true -> ok; - false -> - ?ERROR_MSG("You are trying to use BOSH (HTTP Bind) " - "in host ~p, but the module mod_bosh " - "is not started in that host. Configure " - "your BOSH client to connect to the correct " - "host, or add your desired host to the " - "configuration, or check your 'modules' " - "section in your ejabberd configuration " - "file.", - [XmppDomain]) + true -> + ok; + false -> + ?ERROR_MSG( + "You are trying to use BOSH (HTTP Bind) " + "in host ~p, but the module mod_bosh " + "is not started in that host. Configure " + "your BOSH client to connect to the correct " + "host, or add your desired host to the " + "configuration, or check your 'modules' " + "section in your ejabberd configuration " + "file.", + [XmppDomain] + ) end. get_attr(Attr, Attrs) -> get_attr(Attr, Attrs, <<"">>). get_attr(Attr, Attrs, Default) -> case lists:keysearch(Attr, 1, Attrs) of - {value, {_, Val}} -> Val; - _ -> Default + {value, {_, Val}} -> Val; + _ -> Default end. buf_new(Host) -> @@ -1002,45 +1401,63 @@ buf_in(Xs, Buf) -> buf_out(Buf, Num) when is_integer(Num), Num > 0 -> buf_out(Buf, Num, []); -buf_out(Buf, _) -> {p1_queue:to_list(Buf), p1_queue:clear(Buf)}. +buf_out(Buf, _) -> + {p1_queue:to_list(Buf), p1_queue:clear(Buf)}. -buf_out(Buf, 0, Els) -> {lists:reverse(Els), Buf}; +buf_out(Buf, 0, Els) -> + {lists:reverse(Els), Buf}; buf_out(Buf, I, Els) -> case p1_queue:out(Buf) of - {{value, El}, NewBuf} -> - buf_out(NewBuf, I - 1, [El | Els]); - {empty, _} -> buf_out(Buf, 0, Els) + {{value, El}, NewBuf} -> + buf_out(NewBuf, I - 1, [El | Els]); + {empty, _} -> + buf_out(Buf, 0, Els) end. restart_timer(TRef, Timeout, Msg) -> misc:cancel_timer(TRef), erlang:start_timer(timer:seconds(Timeout), self(), Msg). -restart_inactivity_timer(#state{inactivity_timeout = - Timeout} = - State) -> +restart_inactivity_timer( + #state{ + inactivity_timeout = + Timeout + } = + State +) -> restart_inactivity_timer(State, Timeout). -restart_inactivity_timer(#state{inactivity_timer = - TRef} = - State, - Timeout) -> +restart_inactivity_timer( + #state{ + inactivity_timer = + TRef + } = + State, + Timeout +) -> NewTRef = restart_timer(TRef, Timeout, inactive), State#state{inactivity_timer = NewTRef}. -stop_inactivity_timer(#state{inactivity_timer = TRef} = - State) -> +stop_inactivity_timer( + #state{inactivity_timer = TRef} = + State +) -> misc:cancel_timer(TRef), State#state{inactivity_timer = undefined}. -restart_wait_timer(#state{wait_timer = TRef, - wait_timeout = Timeout} = - State) -> +restart_wait_timer( + #state{ + wait_timer = TRef, + wait_timeout = Timeout + } = + State +) -> NewTRef = restart_timer(TRef, Timeout, wait_timeout), State#state{wait_timer = NewTRef}. stop_wait_timer(#state{wait_timer = TRef} = State) -> - misc:cancel_timer(TRef), State#state{wait_timer = undefined}. + misc:cancel_timer(TRef), + State#state{wait_timer = undefined}. start_shaper_timer(Timeout) -> erlang:start_timer(Timeout, self(), shaper_timeout).