24
1
mirror of https://github.com/processone/ejabberd.git synced 2024-07-14 23:44:18 +02:00

Add shaper support

This commit is contained in:
Evgeniy Khramtsov 2011-08-05 15:53:57 +10:00
parent f8fd9969e1
commit 1c72c45404

View File

@ -64,6 +64,8 @@
-define(DEFAULT_POLLING, 2). %% secs -define(DEFAULT_POLLING, 2). %% secs
-define(DEFAULT_INACTIVITY, 30). %% secs -define(DEFAULT_INACTIVITY, 30). %% secs
-define(MAX_SHAPED_REQUESTS_QUEUE_LEN, 1000).
-record(state, {host, -record(state, {host,
socket, socket,
el_ibuf, el_ibuf,
@ -80,6 +82,7 @@
prev_poll, prev_poll,
responses = gb_trees:empty(), responses = gb_trees:empty(),
receivers = gb_trees:empty(), receivers = gb_trees:empty(),
shaped_receivers = queue:new(),
max_requests, max_requests,
ip}). ip}).
@ -88,7 +91,8 @@
%% a connection gets terminated: %% a connection gets terminated:
%% 'condition' attribute is not enough %% 'condition' attribute is not enough
attrs = [], attrs = [],
els = []}). els = [],
size = 0}).
%%%=================================================================== %%%===================================================================
%%% API %%% API
@ -165,7 +169,7 @@ process_request(Data, IP) ->
if PayloadSize > MaxStanzaSize -> if PayloadSize > MaxStanzaSize ->
http_error(403, "Request Too Large"); http_error(403, "Request Too Large");
true -> true ->
case decode_body(Data) of case decode_body(Data, PayloadSize) of
{ok, #body{attrs = Attrs} = Body} -> {ok, #body{attrs = Attrs} = Body} ->
SID = get_attr('sid', Attrs), SID = get_attr('sid', Attrs),
To = get_attr('to', Attrs), To = get_attr('to', Attrs),
@ -297,10 +301,12 @@ wait_for_session(#body{attrs = Attrs} = Req, From, State) ->
{'xmlns:xmpp', ?NS_BOSH}, {'xmlns:xmpp', ?NS_BOSH},
{'xmlns:stream', ?NS_STREAM}, {'xmlns:stream', ?NS_STREAM},
{from, State#state.host}|Polling]}, {from, State#state.host}|Polling]},
{ShaperState, _} = shaper:update(State#state.shaper_state, Req#body.size),
State1 = State#state{wait_timeout = Wait, State1 = State#state{wait_timeout = Wait,
prev_rid = RID, prev_rid = RID,
prev_key = NewKey, prev_key = NewKey,
prev_poll = PollTime, prev_poll = PollTime,
shaper_state = ShaperState,
max_requests = Requests}, max_requests = Requests},
Els = maybe_add_xmlstreamend(Req#body.els, Type), Els = maybe_add_xmlstreamend(Req#body.els, Type),
State2 = route_els(State1, Els), State2 = route_els(State1, Els),
@ -320,19 +326,43 @@ wait_for_session(_Event, _From, State) ->
{reply, {error, badarg}, wait_for_session, State}. {reply, {error, badarg}, wait_for_session, State}.
active({#body{} = Body, From}, State) -> active({#body{} = Body, From}, State) ->
active(Body, From, State); active1(Body, From, State);
active(_Event, State) -> active(_Event, State) ->
?ERROR_MSG("unexpected event in 'active': ~p", [_Event]), ?ERROR_MSG("unexpected event in 'active': ~p", [_Event]),
{next_state, active, State}. {next_state, active, State}.
active(#body{attrs = Attrs} = Req, From, State) -> active(#body{attrs = Attrs, size = Size} = Req, From, State) ->
RID = get_attr('rid', Attrs),
?DEBUG("got request:~n" ?DEBUG("got request:~n"
"** RequestID: ~p~n"
"** Request: ~p~n" "** Request: ~p~n"
"** From: ~p~n" "** From: ~p~n"
"** State: ~p", "** State: ~p",
[RID, Req, From, State]), [Req, From, State]),
{ShaperState, Pause} = shaper:update(State#state.shaper_state, Size),
State1 = State#state{shaper_state = ShaperState},
if Pause > 0 ->
QLen = queue:len(State1#state.shaped_receivers),
if QLen < ?MAX_SHAPED_REQUESTS_QUEUE_LEN ->
TRef = start_shaper_timer(Pause),
Q = queue:in({TRef, From, Req}, State1#state.shaped_receivers),
State2 = stop_inactivity_timer(State1),
{next_state, active, State2#state{shaped_receivers = Q}};
true ->
RID = get_attr('rid', Attrs),
reply_stop(State1,
#body{http_reason = "Too many requests",
attrs = [{"type", "terminate"},
{"condition", "policy-violation"}]},
From, RID)
end;
true ->
active1(Req, From, State1)
end;
active(_Event, _From, State) ->
?ERROR_MSG("unexpected sync event in 'active': ~p", [_Event]),
{reply, {error, badarg}, active, State}.
active1(#body{attrs = Attrs} = Req, From, State) ->
RID = get_attr('rid', Attrs),
Key = get_attr('key', Attrs), Key = get_attr('key', Attrs),
IsValidKey = is_valid_key(State#state.prev_key, Key), IsValidKey = is_valid_key(State#state.prev_key, Key),
IsOveractivity = is_overactivity(State#state.prev_poll), IsOveractivity = is_overactivity(State#state.prev_poll),
@ -420,10 +450,7 @@ active(#body{attrs = Attrs} = Req, From, State) ->
reply_next_state(State6#state{prev_rid = RID}, reply_next_state(State6#state{prev_rid = RID},
#body{els = RespEls}, RID, From) #body{els = RespEls}, RID, From)
end end
end; end.
active(_Event, _From, State) ->
?ERROR_MSG("unexpected sync event in 'active': ~p", [_Event]),
{reply, {error, badarg}, active, State}.
handle_event({become_controller, C2SPid}, StateName, State) -> handle_event({become_controller, C2SPid}, StateName, State) ->
State1 = route_els(State#state{c2s_pid = C2SPid}), State1 = route_els(State#state{c2s_pid = C2SPid}),
@ -448,8 +475,16 @@ handle_sync_event({send_xml, El}, _From, StateName, State) ->
{reply, ok, StateName, reply(State1, Body#body{els = OutBuf}, {reply, ok, StateName, reply(State1, Body#body{els = OutBuf},
State1#state.prev_rid, From)}; State1#state.prev_rid, From)};
none -> none ->
OutBuf = buf_in([El], State#state.el_obuf), State1 = case queue:out(State#state.shaped_receivers) of
{reply, ok, StateName, State#state{el_obuf = OutBuf}} {{value, {TRef, From, Body}}, Q} ->
cancel_timer(TRef),
?GEN_FSM:send_event(self(), {Body, From}),
State#state{shaped_receivers = Q};
_ ->
State
end,
OutBuf = buf_in([El], State1#state.el_obuf),
{reply, ok, StateName, State1#state{el_obuf = OutBuf}}
end; end;
handle_sync_event(peername, _From, StateName, State) -> handle_sync_event(peername, _From, StateName, State) ->
{reply, {ok, State#state.ip}, StateName, State}; {reply, {ok, State#state.ip}, StateName, State};
@ -465,6 +500,20 @@ handle_info({timeout, TRef, wait_timeout}, StateName,
handle_info({timeout, TRef, inactive}, _StateName, handle_info({timeout, TRef, inactive}, _StateName,
#state{inactivity_timer = TRef} = State) -> #state{inactivity_timer = TRef} = State) ->
{stop, normal, State}; {stop, normal, State};
handle_info({timeout, TRef, shaper_timeout}, StateName, State) ->
case queue:out(State#state.shaped_receivers) of
{{value, {TRef, From, Req}}, Q} ->
?GEN_FSM:send_event(self(), {Req, From}),
{next_state, StateName, State#state{shaped_receivers = Q}};
{{value, _}, _} ->
?ERROR_MSG("shaper_timeout mismatch:~n"
"** TRef: ~p~n"
"** State: ~p",
[TRef, State]),
{stop, normal, State};
_ ->
{next_state, StateName, State}
end;
handle_info(_Info, StateName, State) -> handle_info(_Info, StateName, State) ->
?ERROR_MSG("unexpected info:~n" ?ERROR_MSG("unexpected info:~n"
"** Msg: ~p~n" "** Msg: ~p~n"
@ -567,6 +616,12 @@ do_reply(State, From, Body, RID) ->
State#state{responses = Responses2}. State#state{responses = Responses2}.
bounce_receivers(State) -> bounce_receivers(State) ->
Receivers = gb_trees:to_list(State#state.receivers),
ShapedReceivers = lists:map(
fun({_, From, #body{attrs = Attrs} = Body}) ->
RID = get_attr('rid', Attrs),
{RID, {From, Body}}
end, queue:to_list(State#state.shaped_receivers)),
lists:foreach( lists:foreach(
fun({RID, {From, _Body}}) -> fun({RID, {From, _Body}}) ->
do_reply(State, From, do_reply(State, From,
@ -574,7 +629,7 @@ bounce_receivers(State) ->
attrs = [{type, "terminate"}, attrs = [{type, "terminate"},
{condition, "other-request"}]}, {condition, "other-request"}]},
RID) RID)
end, gb_trees:to_list(State#state.receivers)). end, Receivers ++ ShapedReceivers).
bounce_els_from_obuf(State) -> bounce_els_from_obuf(State) ->
lists:foreach( lists:foreach(
@ -686,7 +741,7 @@ encode_body(#body{attrs = Attrs, els = Els}) ->
["<body", attrs_to_list(Attrs3), $>, XMLs, "</body>"] ["<body", attrs_to_list(Attrs3), $>, XMLs, "</body>"]
end. end.
decode_body(BodyXML) -> decode_body(BodyXML, Size) ->
case xml_stream:parse_element(BodyXML) of case xml_stream:parse_element(BodyXML) of
{xmlelement, "body", Attrs, Els} -> {xmlelement, "body", Attrs, Els} ->
case attrs_to_body_attrs(Attrs) of case attrs_to_body_attrs(Attrs) of
@ -703,7 +758,9 @@ decode_body(BodyXML) ->
(_) -> (_) ->
[] []
end, Els), end, Els),
{ok, #body{attrs = BodyAttrs, els = Els1}} {ok, #body{attrs = BodyAttrs,
size = Size,
els = Els1}}
end end
end; end;
{xmlelement, _, _, _} -> {xmlelement, _, _, _} ->
@ -852,6 +909,9 @@ stop_wait_timer(#state{wait_timer = TRef} = State) ->
cancel_timer(TRef), cancel_timer(TRef),
State#state{wait_timer = undefined}. State#state{wait_timer = undefined}.
start_shaper_timer(Timeout) ->
erlang:start_timer(Timeout, self(), shaper_timeout).
make_random_jid(Host) -> make_random_jid(Host) ->
%% Copied from cyrsasl_anonymous.erl %% Copied from cyrsasl_anonymous.erl
User = lists:concat([randoms:get_string() | tuple_to_list(now())]), User = lists:concat([randoms:get_string() | tuple_to_list(now())]),