diff --git a/src/web/ejabberd_bosh.erl b/src/web/ejabberd_bosh.erl index 68cba4646..71fe62635 100644 --- a/src/web/ejabberd_bosh.erl +++ b/src/web/ejabberd_bosh.erl @@ -64,6 +64,8 @@ -define(DEFAULT_POLLING, 2). %% secs -define(DEFAULT_INACTIVITY, 30). %% secs +-define(MAX_SHAPED_REQUESTS_QUEUE_LEN, 1000). + -record(state, {host, socket, el_ibuf, @@ -80,6 +82,7 @@ prev_poll, responses = gb_trees:empty(), receivers = gb_trees:empty(), + shaped_receivers = queue:new(), max_requests, ip}). @@ -88,7 +91,8 @@ %% a connection gets terminated: %% 'condition' attribute is not enough attrs = [], - els = []}). + els = [], + size = 0}). %%%=================================================================== %%% API @@ -165,7 +169,7 @@ process_request(Data, IP) -> if PayloadSize > MaxStanzaSize -> http_error(403, "Request Too Large"); true -> - case decode_body(Data) of + case decode_body(Data, PayloadSize) of {ok, #body{attrs = Attrs} = Body} -> SID = get_attr('sid', Attrs), To = get_attr('to', Attrs), @@ -297,10 +301,12 @@ wait_for_session(#body{attrs = Attrs} = Req, From, State) -> {'xmlns:xmpp', ?NS_BOSH}, {'xmlns:stream', ?NS_STREAM}, {from, State#state.host}|Polling]}, + {ShaperState, _} = shaper:update(State#state.shaper_state, Req#body.size), State1 = State#state{wait_timeout = Wait, prev_rid = RID, prev_key = NewKey, prev_poll = PollTime, + shaper_state = ShaperState, max_requests = Requests}, Els = maybe_add_xmlstreamend(Req#body.els, Type), State2 = route_els(State1, Els), @@ -320,19 +326,43 @@ wait_for_session(_Event, _From, State) -> {reply, {error, badarg}, wait_for_session, State}. active({#body{} = Body, From}, State) -> - active(Body, From, State); + active1(Body, From, State); active(_Event, State) -> ?ERROR_MSG("unexpected event in 'active': ~p", [_Event]), {next_state, active, State}. -active(#body{attrs = Attrs} = Req, From, State) -> - RID = get_attr('rid', Attrs), +active(#body{attrs = Attrs, size = Size} = Req, From, State) -> ?DEBUG("got request:~n" - "** RequestID: ~p~n" "** Request: ~p~n" "** From: ~p~n" "** State: ~p", - [RID, Req, From, State]), + [Req, From, State]), + {ShaperState, Pause} = shaper:update(State#state.shaper_state, Size), + State1 = State#state{shaper_state = ShaperState}, + if Pause > 0 -> + QLen = queue:len(State1#state.shaped_receivers), + if QLen < ?MAX_SHAPED_REQUESTS_QUEUE_LEN -> + TRef = start_shaper_timer(Pause), + Q = queue:in({TRef, From, Req}, State1#state.shaped_receivers), + State2 = stop_inactivity_timer(State1), + {next_state, active, State2#state{shaped_receivers = Q}}; + true -> + RID = get_attr('rid', Attrs), + reply_stop(State1, + #body{http_reason = "Too many requests", + attrs = [{"type", "terminate"}, + {"condition", "policy-violation"}]}, + From, RID) + end; + true -> + active1(Req, From, State1) + end; +active(_Event, _From, State) -> + ?ERROR_MSG("unexpected sync event in 'active': ~p", [_Event]), + {reply, {error, badarg}, active, State}. + +active1(#body{attrs = Attrs} = Req, From, State) -> + RID = get_attr('rid', Attrs), Key = get_attr('key', Attrs), IsValidKey = is_valid_key(State#state.prev_key, Key), IsOveractivity = is_overactivity(State#state.prev_poll), @@ -420,10 +450,7 @@ active(#body{attrs = Attrs} = Req, From, State) -> reply_next_state(State6#state{prev_rid = RID}, #body{els = RespEls}, RID, From) end - end; -active(_Event, _From, State) -> - ?ERROR_MSG("unexpected sync event in 'active': ~p", [_Event]), - {reply, {error, badarg}, active, State}. + end. handle_event({become_controller, C2SPid}, StateName, State) -> State1 = route_els(State#state{c2s_pid = C2SPid}), @@ -448,8 +475,16 @@ handle_sync_event({send_xml, El}, _From, StateName, State) -> {reply, ok, StateName, reply(State1, Body#body{els = OutBuf}, State1#state.prev_rid, From)}; none -> - OutBuf = buf_in([El], State#state.el_obuf), - {reply, ok, StateName, State#state{el_obuf = OutBuf}} + State1 = case queue:out(State#state.shaped_receivers) of + {{value, {TRef, From, Body}}, Q} -> + cancel_timer(TRef), + ?GEN_FSM:send_event(self(), {Body, From}), + State#state{shaped_receivers = Q}; + _ -> + State + end, + OutBuf = buf_in([El], State1#state.el_obuf), + {reply, ok, StateName, State1#state{el_obuf = OutBuf}} end; handle_sync_event(peername, _From, StateName, State) -> {reply, {ok, State#state.ip}, StateName, State}; @@ -465,6 +500,20 @@ handle_info({timeout, TRef, wait_timeout}, StateName, handle_info({timeout, TRef, inactive}, _StateName, #state{inactivity_timer = TRef} = State) -> {stop, normal, State}; +handle_info({timeout, TRef, shaper_timeout}, StateName, State) -> + case queue:out(State#state.shaped_receivers) of + {{value, {TRef, From, Req}}, Q} -> + ?GEN_FSM:send_event(self(), {Req, From}), + {next_state, StateName, State#state{shaped_receivers = Q}}; + {{value, _}, _} -> + ?ERROR_MSG("shaper_timeout mismatch:~n" + "** TRef: ~p~n" + "** State: ~p", + [TRef, State]), + {stop, normal, State}; + _ -> + {next_state, StateName, State} + end; handle_info(_Info, StateName, State) -> ?ERROR_MSG("unexpected info:~n" "** Msg: ~p~n" @@ -567,6 +616,12 @@ do_reply(State, From, Body, RID) -> State#state{responses = Responses2}. bounce_receivers(State) -> + Receivers = gb_trees:to_list(State#state.receivers), + ShapedReceivers = lists:map( + fun({_, From, #body{attrs = Attrs} = Body}) -> + RID = get_attr('rid', Attrs), + {RID, {From, Body}} + end, queue:to_list(State#state.shaped_receivers)), lists:foreach( fun({RID, {From, _Body}}) -> do_reply(State, From, @@ -574,7 +629,7 @@ bounce_receivers(State) -> attrs = [{type, "terminate"}, {condition, "other-request"}]}, RID) - end, gb_trees:to_list(State#state.receivers)). + end, Receivers ++ ShapedReceivers). bounce_els_from_obuf(State) -> lists:foreach( @@ -686,7 +741,7 @@ encode_body(#body{attrs = Attrs, els = Els}) -> [", XMLs, ""] end. -decode_body(BodyXML) -> +decode_body(BodyXML, Size) -> case xml_stream:parse_element(BodyXML) of {xmlelement, "body", Attrs, Els} -> case attrs_to_body_attrs(Attrs) of @@ -703,7 +758,9 @@ decode_body(BodyXML) -> (_) -> [] end, Els), - {ok, #body{attrs = BodyAttrs, els = Els1}} + {ok, #body{attrs = BodyAttrs, + size = Size, + els = Els1}} end end; {xmlelement, _, _, _} -> @@ -852,6 +909,9 @@ stop_wait_timer(#state{wait_timer = TRef} = State) -> cancel_timer(TRef), State#state{wait_timer = undefined}. +start_shaper_timer(Timeout) -> + erlang:start_timer(Timeout, self(), shaper_timeout). + make_random_jid(Host) -> %% Copied from cyrsasl_anonymous.erl User = lists:concat([randoms:get_string() | tuple_to_list(now())]),