25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-11-24 16:23:40 +01:00

Move shaper to p1_utils repo

This commit is contained in:
Evgeniy Khramtsov 2018-07-05 09:31:55 +03:00
parent 52f2a7de4b
commit 50b645aa92
12 changed files with 39 additions and 58 deletions

View File

@ -20,7 +20,7 @@
{deps, [{lager, ".*", {git, "https://github.com/erlang-lager/lager", {deps, [{lager, ".*", {git, "https://github.com/erlang-lager/lager",
{tag, {if_version_above, "17", "3.4.2", "3.2.1"}}}}, {tag, {if_version_above, "17", "3.4.2", "3.2.1"}}}},
{p1_utils, ".*", {git, "https://github.com/processone/p1_utils", {tag, "1.0.12"}}}, {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", "b49f804"}},
{cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "1.0.14"}}}, {cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "1.0.14"}}},
{fast_tls, ".*", {git, "https://github.com/processone/fast_tls", {tag, "1.0.23"}}}, {fast_tls, ".*", {git, "https://github.com/processone/fast_tls", {tag, "1.0.23"}}},
{stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.12"}}}, {stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.12"}}},

View File

@ -88,7 +88,7 @@
sid = <<"">> :: binary(), sid = <<"">> :: binary(),
el_ibuf :: p1_queue:queue(), el_ibuf :: p1_queue:queue(),
el_obuf :: p1_queue:queue(), el_obuf :: p1_queue:queue(),
shaper_state = none :: shaper:shaper(), shaper_state = none :: ejabberd_shaper:shaper(),
c2s_pid :: pid() | undefined, c2s_pid :: pid() | undefined,
xmpp_ver = <<"">> :: binary(), xmpp_ver = <<"">> :: binary(),
inactivity_timer :: reference() | undefined, inactivity_timer :: reference() | undefined,
@ -281,7 +281,7 @@ init([#body{attrs = Attrs}, IP, SID]) ->
Opts1 = ejabberd_c2s_config:get_c2s_limits(), Opts1 = ejabberd_c2s_config:get_c2s_limits(),
Opts2 = [{xml_socket, true} | Opts1], Opts2 = [{xml_socket, true} | Opts1],
Shaper = none, Shaper = none,
ShaperState = shaper:new(Shaper), ShaperState = ejabberd_shaper:new(Shaper),
Socket = make_socket(self(), IP), Socket = make_socket(self(), IP),
XMPPVer = get_attr('xmpp:version', Attrs), XMPPVer = get_attr('xmpp:version', Attrs),
XMPPDomain = get_attr(to, Attrs), XMPPDomain = get_attr(to, Attrs),
@ -355,7 +355,7 @@ wait_for_session(#body{attrs = Attrs} = Req, From,
{'xmlns:stream', ?NS_STREAM}, {from, State#state.host} {'xmlns:stream', ?NS_STREAM}, {from, State#state.host}
| Polling]}, | Polling]},
{ShaperState, _} = {ShaperState, _} =
shaper:update(State#state.shaper_state, Req#body.size), ejabberd_shaper:update(State#state.shaper_state, Req#body.size),
State1 = State#state{wait_timeout = Wait, State1 = State#state{wait_timeout = Wait,
prev_rid = RID, prev_key = NewKey, prev_rid = RID, prev_key = NewKey,
prev_poll = PollTime, shaper_state = ShaperState, prev_poll = PollTime, shaper_state = ShaperState,
@ -393,7 +393,7 @@ active(#body{attrs = Attrs, size = Size} = Req, From,
"~p~n** State: ~p", "~p~n** State: ~p",
[Req, From, State]), [Req, From, State]),
{ShaperState, Pause} = {ShaperState, Pause} =
shaper:update(State#state.shaper_state, Size), ejabberd_shaper:update(State#state.shaper_state, Size),
State1 = State#state{shaper_state = ShaperState}, State1 = State#state{shaper_state = ShaperState},
if Pause > 0 -> if Pause > 0 ->
TRef = start_shaper_timer(Pause), TRef = start_shaper_timer(Pause),
@ -524,7 +524,7 @@ handle_event({become_controller, C2SPid}, StateName,
{next_state, StateName, State1}; {next_state, StateName, State1};
handle_event({change_shaper, Shaper}, StateName, handle_event({change_shaper, Shaper}, StateName,
State) -> State) ->
NewShaperState = shaper:new(Shaper), NewShaperState = ejabberd_shaper:new(Shaper),
{next_state, StateName, {next_state, StateName,
State#state{shaper_state = NewShaperState}}; State#state{shaper_state = NewShaperState}};
handle_event(_Event, StateName, State) -> handle_event(_Event, StateName, State) ->

View File

@ -1245,7 +1245,7 @@ transform_terms(Terms) ->
ejabberd_s2s, ejabberd_s2s,
ejabberd_listener, ejabberd_listener,
ejabberd_sql_sup, ejabberd_sql_sup,
shaper, ejabberd_shaper,
ejabberd_s2s_out, ejabberd_s2s_out,
acl, acl,
ejabberd_config], ejabberd_config],

View File

@ -1,5 +1,5 @@
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
%%% File : shaper.erl %%% File : ejabberd_shaper.erl
%%% Author : Alexey Shchepin <alexey@process-one.net> %%% Author : Alexey Shchepin <alexey@process-one.net>
%%% Purpose : Functions to control connections traffic %%% Purpose : Functions to control connections traffic
%%% Created : 9 Feb 2003 by Alexey Shchepin <alexey@process-one.net> %%% Created : 9 Feb 2003 by Alexey Shchepin <alexey@process-one.net>
@ -23,7 +23,7 @@
%%% %%%
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
-module(shaper). -module(ejabberd_shaper).
-behaviour(gen_server). -behaviour(gen_server).
-behaviour(ejabberd_config). -behaviour(ejabberd_config).
@ -39,19 +39,13 @@
-include("logger.hrl"). -include("logger.hrl").
-record(maxrate, {maxrate = 0 :: integer(),
burst_size = 0 :: integer(),
acquired_credit = 0 :: integer(),
lasttime = 0 :: integer()}).
-record(shaper, {name :: {atom(), global}, -record(shaper, {name :: {atom(), global},
maxrate :: integer(), maxrate :: integer(),
burst_size :: integer()}). burst_size :: integer()}).
-record(state, {}). -record(state, {}).
-type shaper() :: none | #maxrate{}. -type shaper() :: none | p1_shaper:state().
-export_type([shaper/0]). -export_type([shaper/0]).
-spec start_link() -> {ok, pid()} | {error, any()}. -spec start_link() -> {ok, pid()} | {error, any()}.
@ -84,7 +78,6 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
-spec load_from_config() -> ok | {error, any()}. -spec load_from_config() -> ok | {error, any()}.
load_from_config() -> load_from_config() ->
Shapers = ejabberd_config:get_option(shaper, []), Shapers = ejabberd_config:get_option(shaper, []),
case mnesia:transaction( case mnesia:transaction(
@ -105,7 +98,6 @@ load_from_config() ->
end. end.
-spec get_max_rate(atom()) -> none | non_neg_integer(). -spec get_max_rate(atom()) -> none | non_neg_integer().
get_max_rate(none) -> get_max_rate(none) ->
none; none;
get_max_rate(Name) -> get_max_rate(Name) ->
@ -122,29 +114,18 @@ new(none) ->
new(Name) -> new(Name) ->
case ets:lookup(shaper, {Name, global}) of case ets:lookup(shaper, {Name, global}) of
[#shaper{maxrate = R, burst_size = B}] -> [#shaper{maxrate = R, burst_size = B}] ->
p1_shaper:new(R, B);
#maxrate{maxrate = R, burst_size = B,
acquired_credit = B,
lasttime = p1_time_compat:system_time(micro_seconds)};
[] -> [] ->
none none
end. end.
-spec update(shaper(), integer()) -> {shaper(), integer()}. -spec update(shaper(), integer()) -> {shaper(), integer()}.
update(none, _Size) -> {none, 0}; update(none, _Size) -> {none, 0};
update(#maxrate{maxrate = MR, burst_size = BS, update(Shaper, Size) ->
acquired_credit = AC, lasttime = L} = State, Size) -> Result = p1_shaper:update(Shaper, Size),
Now = p1_time_compat:system_time(micro_seconds), ?DEBUG("Shaper update:~n~s =>~n~s",
AC2 = min(BS, AC + (MR*(Now - L) div 1000000) - Size), [p1_shaper:pp(Shaper), p1_shaper:pp(Result)]),
Result.
Pause = if AC2 >= 0 -> 0;
true -> -1000*AC2 div MR
end,
?DEBUG("MaxRate=~p, BurstSize=~p, AcquiredCredit=~p, Size=~p, NewAcquiredCredit=~p, Pause=~p",
[MR, BS, AC, Size, AC2, Pause]),
{State#maxrate{acquired_credit = AC2, lasttime = Now},
Pause}.
transform_options(Opts) -> transform_options(Opts) ->
lists:foldl(fun transform_options/2, [], Opts). lists:foldl(fun transform_options/2, [], Opts).

View File

@ -107,7 +107,7 @@ prepare_turn_opts(Opts, _UseTurn = true) ->
_ -> _ ->
[] []
end, end,
MaxRate = shaper:get_max_rate(Shaper), MaxRate = ejabberd_shaper:get_max_rate(Shaper),
Opts1 = Realm ++ [{auth_fun, AuthFun},{shaper, MaxRate} | Opts1 = Realm ++ [{auth_fun, AuthFun},{shaper, MaxRate} |
lists:keydelete(shaper, 1, Opts)], lists:keydelete(shaper, 1, Opts)],
set_certfile(Opts1). set_certfile(Opts1).

View File

@ -53,7 +53,7 @@ init([]) ->
simple_supervisor(ejabberd_s2s_out), simple_supervisor(ejabberd_s2s_out),
simple_supervisor(ejabberd_service), simple_supervisor(ejabberd_service),
worker(acl), worker(acl),
worker(shaper), worker(ejabberd_shaper),
supervisor(ejabberd_backend_sup), supervisor(ejabberd_backend_sup),
supervisor(ejabberd_rdbms), supervisor(ejabberd_rdbms),
supervisor(ejabberd_riak_sup), supervisor(ejabberd_riak_sup),

View File

@ -84,7 +84,7 @@
max_rooms_discoitems = 100 :: non_neg_integer(), max_rooms_discoitems = 100 :: non_neg_integer(),
queue_type = ram :: ram | file, queue_type = ram :: ram | file,
default_room_opts = [] :: list(), default_room_opts = [] :: list(),
room_shaper = none :: shaper:shaper()}). room_shaper = none :: ejabberd_shaper:shaper()}).
-type muc_room_opts() :: [{atom(), any()}]. -type muc_room_opts() :: [{atom(), any()}].
-callback init(binary(), gen_mod:opts()) -> any(). -callback init(binary(), gen_mod:opts()) -> any().

View File

@ -122,7 +122,7 @@ start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueT
init([Host, ServerHost, Access, Room, HistorySize, init([Host, ServerHost, Access, Room, HistorySize,
RoomShaper, Creator, _Nick, DefRoomOpts, QueueType]) -> RoomShaper, Creator, _Nick, DefRoomOpts, QueueType]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
Shaper = shaper:new(RoomShaper), Shaper = ejabberd_shaper:new(RoomShaper),
RoomQueue = room_queue_new(ServerHost, Shaper, QueueType), RoomQueue = room_queue_new(ServerHost, Shaper, QueueType),
State = set_affiliation(Creator, owner, State = set_affiliation(Creator, owner,
#state{host = Host, server_host = ServerHost, #state{host = Host, server_host = ServerHost,
@ -141,7 +141,7 @@ init([Host, ServerHost, Access, Room, HistorySize,
{ok, normal_state, State1}; {ok, normal_state, State1};
init([Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType]) -> init([Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
Shaper = shaper:new(RoomShaper), Shaper = ejabberd_shaper:new(RoomShaper),
RoomQueue = room_queue_new(ServerHost, Shaper, QueueType), RoomQueue = room_queue_new(ServerHost, Shaper, QueueType),
State = set_opts(Opts, #state{host = Host, State = set_opts(Opts, #state{host = Host,
server_host = ServerHost, server_host = ServerHost,
@ -169,7 +169,7 @@ normal_state({route, <<"">>,
* 1000000), * 1000000),
Size = element_size(Packet), Size = element_size(Packet),
{MessageShaper, MessageShaperInterval} = {MessageShaper, MessageShaperInterval} =
shaper:update(Activity#activity.message_shaper, Size), ejabberd_shaper:update(Activity#activity.message_shaper, Size),
if Activity#activity.message /= undefined -> if Activity#activity.message /= undefined ->
ErrText = <<"Traffic rate limit is exceeded">>, ErrText = <<"Traffic rate limit is exceeded">>,
Err = xmpp:err_resource_constraint(ErrText, Lang), Err = xmpp:err_resource_constraint(ErrText, Lang),
@ -178,7 +178,7 @@ normal_state({route, <<"">>,
Now >= Activity#activity.message_time + MinMessageInterval, Now >= Activity#activity.message_time + MinMessageInterval,
MessageShaperInterval == 0 -> MessageShaperInterval == 0 ->
{RoomShaper, RoomShaperInterval} = {RoomShaper, RoomShaperInterval} =
shaper:update(StateData#state.room_shaper, Size), ejabberd_shaper:update(StateData#state.room_shaper, Size),
RoomQueueEmpty = case StateData#state.room_queue of RoomQueueEmpty = case StateData#state.room_queue of
undefined -> true; undefined -> true;
RQ -> p1_queue:is_empty(RQ) RQ -> p1_queue:is_empty(RQ)
@ -1503,7 +1503,7 @@ get_max_users_admin_threshold(StateData) ->
gen_mod:get_module_opt(StateData#state.server_host, gen_mod:get_module_opt(StateData#state.server_host,
mod_muc, max_users_admin_threshold). mod_muc, max_users_admin_threshold).
-spec room_queue_new(binary(), shaper:shaper(), _) -> p1_queue:queue(). -spec room_queue_new(binary(), ejabberd_shaper:shaper(), _) -> p1_queue:queue().
room_queue_new(ServerHost, Shaper, QueueType) -> room_queue_new(ServerHost, Shaper, QueueType) ->
HaveRoomShaper = Shaper /= none, HaveRoomShaper = Shaper /= none,
HaveMessageShaper = gen_mod:get_module_opt( HaveMessageShaper = gen_mod:get_module_opt(
@ -1533,10 +1533,10 @@ get_user_activity(JID, StateData) ->
{ok, _P, A} -> A; {ok, _P, A} -> A;
error -> error ->
MessageShaper = MessageShaper =
shaper:new(gen_mod:get_module_opt(StateData#state.server_host, ejabberd_shaper:new(gen_mod:get_module_opt(StateData#state.server_host,
mod_muc, user_message_shaper)), mod_muc, user_message_shaper)),
PresenceShaper = PresenceShaper =
shaper:new(gen_mod:get_module_opt(StateData#state.server_host, ejabberd_shaper:new(gen_mod:get_module_opt(StateData#state.server_host,
mod_muc, user_presence_shaper)), mod_muc, user_presence_shaper)),
#activity{message_shaper = MessageShaper, #activity{message_shaper = MessageShaper,
presence_shaper = PresenceShaper} presence_shaper = PresenceShaper}
@ -1575,10 +1575,10 @@ store_user_activity(JID, UserActivity, StateData) ->
of of
true -> true ->
{_, MessageShaperInterval} = {_, MessageShaperInterval} =
shaper:update(UserActivity#activity.message_shaper, ejabberd_shaper:update(UserActivity#activity.message_shaper,
100000), 100000),
{_, PresenceShaperInterval} = {_, PresenceShaperInterval} =
shaper:update(UserActivity#activity.presence_shaper, ejabberd_shaper:update(UserActivity#activity.presence_shaper,
100000), 100000),
Delay = lists:max([MessageShaperInterval, Delay = lists:max([MessageShaperInterval,
PresenceShaperInterval, PresenceShaperInterval,
@ -1620,7 +1620,7 @@ prepare_room_queue(StateData) ->
Packet = Activity#activity.message, Packet = Activity#activity.message,
Size = element_size(Packet), Size = element_size(Packet),
{RoomShaper, RoomShaperInterval} = {RoomShaper, RoomShaperInterval} =
shaper:update(StateData#state.room_shaper, Size), ejabberd_shaper:update(StateData#state.room_shaper, Size),
erlang:send_after(RoomShaperInterval, self(), erlang:send_after(RoomShaperInterval, self(),
process_room_queue), process_room_queue),
StateData#state{room_shaper = RoomShaper}; StateData#state{room_shaper = RoomShaper};
@ -1629,7 +1629,7 @@ prepare_room_queue(StateData) ->
{_Nick, Packet} = Activity#activity.presence, {_Nick, Packet} = Activity#activity.presence,
Size = element_size(Packet), Size = element_size(Packet),
{RoomShaper, RoomShaperInterval} = {RoomShaper, RoomShaperInterval} =
shaper:update(StateData#state.room_shaper, Size), ejabberd_shaper:update(StateData#state.room_shaper, Size),
erlang:send_after(RoomShaperInterval, self(), erlang:send_after(RoomShaperInterval, self(),
process_room_queue), process_room_queue),
StateData#state{room_shaper = RoomShaper}; StateData#state{room_shaper = RoomShaper};

View File

@ -53,7 +53,7 @@
sha1 = <<"">> :: binary(), sha1 = <<"">> :: binary(),
host = <<"">> :: binary(), host = <<"">> :: binary(),
auth_type = anonymous :: plain | anonymous, auth_type = anonymous :: plain | anonymous,
shaper = none :: shaper:shaper()}). shaper = none :: ejabberd_shaper:shaper()}).
%% Unused callbacks %% Unused callbacks
handle_event(_Event, StateName, StateData) -> handle_event(_Event, StateName, StateData) ->
@ -248,7 +248,7 @@ relay(MySocket, PeerSocket, Shaper) ->
{ok, Data} -> {ok, Data} ->
case gen_tcp:send(PeerSocket, Data) of case gen_tcp:send(PeerSocket, Data) of
ok -> ok ->
{NewShaper, Pause} = shaper:update(Shaper, byte_size(Data)), {NewShaper, Pause} = ejabberd_shaper:update(Shaper, byte_size(Data)),
if Pause > 0 -> timer:sleep(Pause); if Pause > 0 -> timer:sleep(Pause);
true -> pass true -> pass
end, end,
@ -278,11 +278,11 @@ select_auth_method(anonymous, AuthMethods) ->
find_maxrate(Shaper, JID1, JID2, Host) -> find_maxrate(Shaper, JID1, JID2, Host) ->
MaxRate1 = case acl:match_rule(Host, Shaper, JID1) of MaxRate1 = case acl:match_rule(Host, Shaper, JID1) of
deny -> none; deny -> none;
R1 -> shaper:new(R1) R1 -> ejabberd_shaper:new(R1)
end, end,
MaxRate2 = case acl:match_rule(Host, Shaper, JID2) of MaxRate2 = case acl:match_rule(Host, Shaper, JID2) of
deny -> none; deny -> none;
R2 -> shaper:new(R2) R2 -> ejabberd_shaper:new(R2)
end, end,
if MaxRate1 == none; MaxRate2 == none -> none; if MaxRate1 == none; MaxRate2 == none -> none;
true -> lists:max([MaxRate1, MaxRate2]) true -> lists:max([MaxRate1, MaxRate2])

View File

@ -70,7 +70,7 @@
socket :: socket(), socket :: socket(),
max_stanza_size = infinity :: timeout(), max_stanza_size = infinity :: timeout(),
xml_stream :: undefined | fxml_stream:xml_stream_state(), xml_stream :: undefined | fxml_stream:xml_stream_state(),
shaper = none :: none | shaper:shaper(), shaper = none :: none | ejabberd_shaper:shaper(),
receiver :: receiver()}). receiver :: receiver()}).
-type socket_state() :: #socket_state{}. -type socket_state() :: #socket_state{}.
@ -263,7 +263,7 @@ recv(#socket_state{sockmod = SockMod, socket = Socket} = SocketData, Data) ->
end. end.
change_shaper(#socket_state{receiver = undefined} = SocketData, Shaper) -> change_shaper(#socket_state{receiver = undefined} = SocketData, Shaper) ->
ShaperState = shaper:new(Shaper), ShaperState = ejabberd_shaper:new(Shaper),
SocketData#socket_state{shaper = ShaperState}; SocketData#socket_state{shaper = ShaperState};
change_shaper(#socket_state{sockmod = SockMod, change_shaper(#socket_state{sockmod = SockMod,
socket = Socket} = SocketData, Shaper) -> socket = Socket} = SocketData, Shaper) ->
@ -373,7 +373,7 @@ parse(#socket_state{xml_stream = XMLStream,
when is_binary(Data) -> when is_binary(Data) ->
?DEBUG("(~s) Received XML on stream = ~p", [pp(SocketData), Data]), ?DEBUG("(~s) Received XML on stream = ~p", [pp(SocketData), Data]),
XMLStream1 = fxml_stream:parse(XMLStream, Data), XMLStream1 = fxml_stream:parse(XMLStream, Data),
{ShaperState1, Pause} = shaper:update(ShaperState, byte_size(Data)), {ShaperState1, Pause} = ejabberd_shaper:update(ShaperState, byte_size(Data)),
Ret = if Pause > 0 -> Ret = if Pause > 0 ->
activate_after(Socket, self(), Pause); activate_after(Socket, self(), Pause);
true -> true ->

View File

@ -183,7 +183,7 @@ get_transport(#{socket := Socket, owner := Owner})
get_transport(_) -> get_transport(_) ->
erlang:error(badarg). erlang:error(badarg).
-spec change_shaper(state(), shaper:shaper()) -> state(). -spec change_shaper(state(), ejabberd_shaper:shaper()) -> state().
change_shaper(#{socket := Socket, owner := Owner} = State, Shaper) change_shaper(#{socket := Socket, owner := Owner} = State, Shaper)
when Owner == self() -> when Owner == self() ->
Socket1 = xmpp_socket:change_shaper(Socket, Shaper), Socket1 = xmpp_socket:change_shaper(Socket, Shaper),

View File

@ -216,7 +216,7 @@ get_transport(#{socket := Socket, owner := Owner})
get_transport(_) -> get_transport(_) ->
erlang:error(badarg). erlang:error(badarg).
-spec change_shaper(state(), shaper:shaper()) -> state(). -spec change_shaper(state(), ejabberd_shaper:shaper()) -> state().
change_shaper(#{socket := Socket, owner := Owner} = State, Shaper) change_shaper(#{socket := Socket, owner := Owner} = State, Shaper)
when Owner == self() -> when Owner == self() ->
Socket1 = xmpp_socket:change_shaper(Socket, Shaper), Socket1 = xmpp_socket:change_shaper(Socket, Shaper),