diff --git a/src/ejabberd.cfg b/src/ejabberd.cfg index c91c71139..3705030ac 100644 --- a/src/ejabberd.cfg +++ b/src/ejabberd.cfg @@ -1,6 +1,6 @@ % $Id$ -override_acls. +%override_acls. {acl, admin, {user, "aleksey"}}. {acl, admin, {user, "ermine"}}. @@ -14,10 +14,13 @@ override_acls. {acl, jabberorg, {server, "jabber.org"}}. {acl, aleksey, {user, "aleksey", "jabber.ru"}}. -{acl, test, {user_glob, "test.*"}}. +%{acl, test, {user_regexp, "^test"}}. %{acl, test2, {user_glob, "test*"}}. +{shaper, normal, {maxrate, 1000}}. + + {access, disco_admin, [{allow, admin}, {deny, all}]}. @@ -26,9 +29,15 @@ override_acls. {access, c2s, [{deny, blocked}, {allow, all}]}. + +{access, c2s_shaper, [{none, admin}, + {normal, all}]}. + + {host, "e.localhost"}. -{listen, [{5522, ejabberd_c2s, start, [{access, c2s}]}, +{listen, [{5522, ejabberd_c2s, start, [{access, c2s}, + {shaper, c2s_shaper}]}, %{5523, ejabberd_c2s, start, % [{access, c2s}, {ssl, [{certfile, "./ssl.pem"}]}]}, {5269, ejabberd_s2s_in, start, []}, diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 13e7cc169..a6b4e691a 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -13,7 +13,7 @@ -behaviour(gen_fsm). %% External exports --export([start/2, receiver/3, sender/2, send_text/2, send_element/2]). +-export([start/2, receiver/4, send_text/2, send_element/2]). %% gen_fsm callbacks -export([init/1, wait_for_stream/2, wait_for_auth/2, session_established/2, @@ -28,8 +28,9 @@ -define(SETS, gb_sets). --record(state, {socket, sender, receiver, streamid, +-record(state, {socket, receiver, streamid, access, + shaper, user = "", server = ?MYNAME, resource = "", pres_t = ?SETS:new(), pres_f = ?SETS:new(), @@ -76,19 +77,20 @@ start(SockData, Opts) -> %% {stop, StopReason} %%---------------------------------------------------------------------- init([{SockMod, Socket}, Opts]) -> - SenderPid = spawn(?MODULE, sender, [Socket, SockMod]), - ReceiverPid = spawn(?MODULE, receiver, [Socket, SockMod, self()]), + ReceiverPid = spawn(?MODULE, receiver, [Socket, SockMod, none, self()]), Access = case lists:keysearch(access, 1, Opts) of - {value, {_, A}} -> - A; - _ -> - all + {value, {_, A}} -> A; + _ -> all + end, + Shaper = case lists:keysearch(shaper, 1, Opts) of + {value, {_, S}} -> S; + _ -> none end, {ok, wait_for_stream, #state{socket = Socket, receiver = ReceiverPid, - sender = SenderPid, streamid = new_id(), - access = Access}}. + access = Access, + shaper = Shaper}}. %%---------------------------------------------------------------------- %% Func: StateName/2 @@ -101,13 +103,13 @@ wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) -> % TODO Header = io_lib:format(?STREAM_HEADER, [StateData#state.streamid, ?MYNAME]), - send_text(StateData#state.sender, Header), + send_text(StateData#state.socket, Header), case lists:keysearch("xmlns:stream", 1, Attrs) of {value, {"xmlns:stream", "http://etherx.jabber.org/streams"}} -> % TODO {next_state, wait_for_auth, StateData}; _ -> - send_text(StateData#state.sender, ?INVALID_NS_ERR ?STREAM_TRAILER), + send_text(StateData#state.socket, ?INVALID_NS_ERR ?STREAM_TRAILER), {stop, normal, StateData} end; @@ -119,18 +121,20 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> case is_auth_packet(El) of {auth, ID, {U, P, D, ""}} -> Err = jlib:make_error_reply(El, "406", "Not Acceptable"), - send_element(StateData#state.sender, Err), + send_element(StateData#state.socket, Err), {next_state, wait_for_auth, StateData}; {auth, ID, {U, P, D, R}} -> io:format("AUTH: ~p~n", [{U, P, D, R}]), - case acl:match_rule(StateData#state.access, {U, ?MYNAME, R}) of + JID = {U, ?MYNAME, R}, + case acl:match_rule(StateData#state.access, JID) of allow -> case ejabberd_auth:check_password( U, P, StateData#state.streamid, D) of true -> ejabberd_sm:open_session(U, R), Res = jlib:make_result_iq_reply(El), - send_element(StateData#state.sender, Res), + send_element(StateData#state.socket, Res), + change_shaper(StateData, JID), {Fs, Ts} = mod_roster:get_subscription_lists(U), {next_state, session_established, StateData#state{user = U, @@ -140,12 +144,12 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> _ -> Err = jlib:make_error_reply( El, "401", "Unauthorized"), - send_element(StateData#state.sender, Err), + send_element(StateData#state.socket, Err), {next_state, wait_for_auth, StateData} end; _ -> Err = jlib:make_error_reply(El, "405", "Not Allowed"), - send_element(StateData#state.sender, Err), + send_element(StateData#state.socket, Err), {next_state, wait_for_auth, StateData} end; _ -> @@ -158,7 +162,7 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> {"", "", ""}, jlib:iq_to_xml(ResIQ)), Res = jlib:remove_attr("to", Res1), - send_element(StateData#state.sender, Res), + send_element(StateData#state.socket, Res), {next_state, wait_for_auth, StateData}; _ -> {next_state, wait_for_auth, StateData} @@ -262,7 +266,7 @@ code_change(OldVsn, StateName, StateData, Extra) -> %% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- handle_info({send_text, Text}, StateName, StateData) -> - send_text(StateData#state.sender, Text), + send_text(StateData#state.socket, Text), {next_state, StateName, StateData}; handle_info(replaced, StateName, StateData) -> % TODO @@ -333,7 +337,7 @@ handle_info({route, From, To, Packet}, StateName, StateData) -> jlib:jid_to_string(To), NewAttrs), Text = xml:element_to_string({xmlelement, Name, Attrs2, Els}), - send_text(StateData#state.sender, Text), + send_text(StateData#state.socket, Text), {next_state, StateName, NewState}; true -> {next_state, StateName, NewState} @@ -360,43 +364,46 @@ terminate(Reason, StateName, StateData) -> presence_broadcast(From, StateData#state.pres_a, Packet), presence_broadcast(From, StateData#state.pres_i, Packet) end, - StateData#state.sender ! close, ok. %%%---------------------------------------------------------------------- %%% Internal functions %%%---------------------------------------------------------------------- -receiver(Socket, SockMod, C2SPid) -> +receiver(Socket, SockMod, Shaper, C2SPid) -> XMLStreamPid = xml_stream:start(C2SPid), - receiver(Socket, SockMod, C2SPid, XMLStreamPid). + ShaperState = shaper:new(Shaper), + receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid). -receiver(Socket, SockMod, C2SPid, XMLStreamPid) -> +receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamPid) -> case SockMod:recv(Socket, 0) of {ok, Text} -> + ShaperSt1 = receive + {change_shaper, Shaper} -> + io:format("RECV: ChShaper to ~p~n", [Shaper]), + shaper:new(Shaper) + after 0 -> + ShaperState + end, + NewShaperState = shaper:update(ShaperSt1, size(Text)), xml_stream:send_text(XMLStreamPid, Text), - receiver(Socket, SockMod, C2SPid, XMLStreamPid); + receiver(Socket, SockMod, NewShaperState, C2SPid, XMLStreamPid); {error, Reason} -> exit(XMLStreamPid, closed), gen_fsm:send_event(C2SPid, closed), ok end. -sender(Socket, SockMod) -> - receive - {send_text, Text} -> - SockMod:send(Socket,Text), - sender(Socket, SockMod); - close -> - SockMod:close(Socket), - ok - end. +change_shaper(StateData, JID) -> + Shaper = acl:match_rule(StateData#state.shaper, JID), + StateData#state.receiver ! {change_shaper, Shaper}. -send_text(Pid, Text) -> - Pid ! {send_text, Text}. +send_text(Socket, Text) -> + gen_tcp:send(Socket,Text). + +send_element(Socket, El) -> + send_text(Socket, xml:element_to_string(El)). -send_element(Pid, El) -> - send_text(Pid, xml:element_to_string(El)). new_id() -> randoms:get_string(). diff --git a/src/ejabberd_config.erl b/src/ejabberd_config.erl index 16dc95602..9afa704dc 100644 --- a/src/ejabberd_config.erl +++ b/src/ejabberd_config.erl @@ -62,6 +62,10 @@ process_term(Term, State) -> State#state{opts = [#config{key = {access, RuleName}, value = Rules} | State#state.opts]}; + {shaper, Name, Data} -> + State#state{opts = [#config{key = {shaper, Name}, + value = Data} | + State#state.opts]}; {Opt, Val} -> add_option(Opt, Val, State) end. diff --git a/src/shaper.erl b/src/shaper.erl new file mode 100644 index 000000000..811315f0f --- /dev/null +++ b/src/shaper.erl @@ -0,0 +1,58 @@ +%%%---------------------------------------------------------------------- +%%% File : shaper.erl +%%% Author : Alexey Shchepin +%%% Purpose : Functions to control connections traffic +%%% Created : 9 Feb 2003 by Alexey Shchepin +%%% Id : $Id$ +%%%---------------------------------------------------------------------- + +-module(shaper). +-author('alexey@sevcom.net'). +-vsn('$Revision$ '). + +-export([new/1, new1/1, update/2]). + +-record(maxrate, {maxrate, lastrate, lasttime}). + + +new(Name) -> + Data = case ejabberd_config:get_global_option({shaper, Name}) of + undefined -> + none; + D -> + D + end, + new1(Data). + + +new1(none) -> + none; +new1({maxrate, MaxRate}) -> + #maxrate{maxrate = MaxRate, + lastrate = 0, + lasttime = now_to_usec(now())}. + + +update(none, Size) -> + none; +update(#maxrate{} = State, Size) -> + MinInterv = 1000 * Size / + (2 * State#maxrate.maxrate - State#maxrate.lastrate), + Interv = (now_to_usec(now()) - State#maxrate.lasttime) / 1000, + %io:format("State: ~p, Size=~p~nM=~p, I=~p~n", + % [State, Size, MinInterv, Interv]), + if + MinInterv > Interv -> + timer:sleep(1 + trunc(MinInterv - Interv)); + true -> + ok + end, + Now = now_to_usec(now()), + State#maxrate{ + lastrate = (State#maxrate.lastrate + + 1000000 * Size / (Now - State#maxrate.lasttime))/2, + lasttime = Now}. + + +now_to_usec({MSec, Sec, USec}) -> + (MSec*1000000 + Sec)*1000000 + USec.