25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-11-26 16:26:24 +01:00

Use queue to reduced quadratic time effort on selective receive (thanks to Alexey Shchepin)

Merged from trunk@2644

SVN Revision: 2646
This commit is contained in:
Badlop 2009-10-08 11:33:18 +00:00
parent 65926460b2
commit a2bb22a0e7
2 changed files with 125 additions and 51 deletions

View File

@ -27,7 +27,9 @@
-module(ejabberd_service).
-author('alexey@process-one.net').
-behaviour(gen_fsm).
-define(GEN_FSM, p1_fsm).
-behaviour(?GEN_FSM).
%% External exports
-export([start/2,
@ -68,6 +70,11 @@
-define(DEFAULT_NS, ?NS_COMPONENT_ACCEPT).
-define(PREFIXED_NS, [{?NS_XMPP, ?NS_XMPP_pfx}]).
%% Only change this value if you now what your are doing:
-define(FSMLIMITS,[]).
%% -define(FSMLIMITS, [{max_queue, 2000}]).
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
@ -75,7 +82,8 @@ start(SockData, Opts) ->
supervisor:start_child(ejabberd_service_sup, [SockData, Opts]).
start_link(SockData, Opts) ->
gen_fsm:start_link(ejabberd_service, [SockData, Opts], ?FSMOPTS).
?GEN_FSM:start_link(
ejabberd_service, [SockData, Opts], ?FSMLIMITS ++ ?FSMOPTS).
socket_type() ->
xml_stream.

View File

@ -21,7 +21,7 @@
%% terminate immediatetly. If the fsm trap_exit process flag has been
%% set to true, the FSM terminate function will called.
%% - You can pass the gen_fsm options to control resource usage.
%% {max_messages, N} will exit the process with priority_shutdown
%% {max_queue, N} will exit the process with priority_shutdown
%% - You can limit the time processing a message (TODO): If the
%% message processing does not return in a given period of time, the
%% process will be terminated.
@ -123,7 +123,7 @@
sync_send_all_state_event/2, sync_send_all_state_event/3,
reply/2,
start_timer/2,send_event_after/2,cancel_timer/1,
enter_loop/4, enter_loop/5, enter_loop/6]).
enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7]).
-export([behaviour_info/1]).
@ -274,7 +274,10 @@ enter_loop(Mod, Options, StateName, StateData, ServerName, Timeout) ->
Parent = get_parent(),
Debug = gen:debug_options(Options),
Limits = limit_options(Options),
loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, Limits).
Queue = queue:new(),
QueueLen = 0,
loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug,
Limits, Queue, QueueLen).
get_proc_name(Pid) when is_pid(Pid) ->
Pid;
@ -329,16 +332,19 @@ name_to_pid(Name) ->
%%% ---------------------------------------------------
init_it(Starter, self, Name, Mod, Args, Options) ->
init_it(Starter, self(), Name, Mod, Args, Options);
init_it(Starter, Parent, Name, Mod, Args, Options) ->
init_it(Starter, Parent, Name0, Mod, Args, Options) ->
Name = name(Name0),
Debug = gen:debug_options(Options),
Limits = limit_options(Options),
Queue = queue:new(),
QueueLen = 0,
case catch Mod:init(Args) of
{ok, StateName, StateData} ->
proc_lib:init_ack(Starter, {ok, self()}),
loop(Parent, Name, StateName, StateData, Mod, infinity, Debug, Limits);
loop(Parent, Name, StateName, StateData, Mod, infinity, Debug, Limits, Queue, QueueLen);
{ok, StateName, StateData, Timeout} ->
proc_lib:init_ack(Starter, {ok, self()}),
loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, Limits);
loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, Limits, Queue, QueueLen);
{stop, Reason} ->
proc_lib:init_ack(Starter, {error, Reason}),
exit(Reason);
@ -354,13 +360,35 @@ init_it(Starter, Parent, Name, Mod, Args, Options) ->
exit(Error)
end.
name({local,Name}) -> Name;
name({global,Name}) -> Name;
name(Pid) when is_pid(Pid) -> Pid.
%%-----------------------------------------------------------------
%% The MAIN loop
%%-----------------------------------------------------------------
loop(Parent, Name, StateName, StateData, Mod, hibernate, Debug,
Limits, Queue, QueueLen)
when QueueLen > 0 ->
case queue:out(Queue) of
{{value, Msg}, Queue1} ->
decode_msg(Msg, Parent, Name, StateName, StateData, Mod, hibernate,
Debug, Limits, Queue1, QueueLen - 1, false);
{empty, _} ->
Reason = internal_queue_error,
error_info(Reason, Name, hibernate, StateName, StateData, Debug),
exit(Reason)
end;
loop(Parent, Name, StateName, StateData, Mod, hibernate, Debug,
Limits, _Queue, _QueueLen) ->
proc_lib:hibernate(?MODULE,wake_hib,
[Parent, Name, StateName, StateData, Mod,
Debug, Limits]);
%% First we test if we have reach a defined limit ...
loop(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits) ->
loop(Parent, Name, StateName, StateData, Mod, Time, Debug,
Limits, Queue, QueueLen) ->
try
message_queue_len(Limits)
message_queue_len(Limits, QueueLen)
%% TODO: We can add more limit checking here...
catch
{process_limit, Limit} ->
@ -369,54 +397,89 @@ loop(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits) ->
terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug)
end,
process_message(Parent, Name, StateName, StateData,
Mod, Time, Debug, Limits).
Mod, Time, Debug, Limits, Queue, QueueLen).
%% ... then we can process a new message:
process_message(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits) ->
Msg = receive
{'EXIT', Parent, priority_shutdown} ->
{'EXIT', Parent, priority_shutdown}
after 0 ->
process_message(Parent, Name, StateName, StateData, Mod, Time, Debug,
Limits, Queue, QueueLen) ->
{Msg, Queue1, QueueLen1} = collect_messages(Queue, QueueLen, Time),
decode_msg(Msg,Parent, Name, StateName, StateData, Mod, Time,
Debug, Limits, Queue1, QueueLen1, false).
collect_messages(Queue, QueueLen, Time) ->
receive
Input ->
Input
after Time ->
{'$gen_event', timeout}
case Input of
{'EXIT', _Parent, priority_shutdown} ->
{Input, Queue, QueueLen};
_ ->
collect_messages(
queue:in(Input, Queue), QueueLen + 1, Time)
end
after 0 ->
case queue:out(Queue) of
{{value, Msg}, Queue1} ->
{Msg, Queue1, QueueLen - 1};
{empty, _} ->
receive
Input ->
{Input, Queue, QueueLen}
after Time ->
{{'$gen_event', timeout}, Queue, QueueLen}
end
end
end.
wake_hib(Parent, Name, StateName, StateData, Mod, Debug,
Limits) ->
Msg = receive
Input ->
Input
end,
Queue = queue:new(),
QueueLen = 0,
decode_msg(Msg, Parent, Name, StateName, StateData, Mod, hibernate,
Debug, Limits, Queue, QueueLen, true).
decode_msg(Msg,Parent, Name, StateName, StateData, Mod, Time, Debug,
Limits, Queue, QueueLen, Hib) ->
put('$internal_queue_len', QueueLen),
case Msg of
{system, From, Req} ->
sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
[Name, StateName, StateData,
Mod, Time, Limits]);
Mod, Time, Limits, Queue, QueueLen], Hib);
{'EXIT', Parent, Reason} ->
terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug);
_Msg when Debug == [] ->
handle_msg(Msg, Parent, Name, StateName, StateData,
Mod, Time, Limits);
Mod, Time, Limits, Queue, QueueLen);
_Msg ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
{Name, StateName}, {in, Msg}),
handle_msg(Msg, Parent, Name, StateName, StateData,
Mod, Time, Debug1, Limits)
Mod, Time, Debug1, Limits, Queue, QueueLen)
end.
%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------
%% TODO: Fix me
system_continue(Parent, Debug, [Name, StateName, StateData,
Mod, Time, Limits]) ->
loop(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits).
Mod, Time, Limits, Queue, QueueLen]) ->
loop(Parent, Name, StateName, StateData, Mod, Time, Debug,
Limits, Queue, QueueLen).
system_terminate(Reason, _Parent, Debug,
[Name, StateName, StateData, Mod, _Time, _Limits]) ->
terminate(Reason, Name, [], Mod, StateName, StateData, Debug).
system_code_change([Name, StateName, StateData, Mod, Time, Limits],
system_code_change([Name, StateName, StateData, Mod, Time,
Limits, Queue, QueueLen],
_Module, OldVsn, Extra) ->
case catch Mod:code_change(OldVsn, StateName, StateData, Extra) of
{ok, NewStateName, NewStateData} ->
{ok, [Name, NewStateName, NewStateData, Mod, Time, Limits]};
{ok, [Name, NewStateName, NewStateData, Mod, Time,
Limits, Queue, QueueLen]};
Else -> Else
end.
@ -453,24 +516,27 @@ print_event(Dev, return, {Name, StateName}) ->
io:format(Dev, "*DBG* ~p switched to state ~w~n",
[Name, StateName]).
handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time, Limits) -> %No debug here
handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time,
Limits, Queue, QueueLen) -> %No debug here
From = from(Msg),
case catch dispatch(Msg, Mod, StateName, StateData) of
{next_state, NStateName, NStateData} ->
loop(Parent, Name, NStateName, NStateData,
Mod, infinity, [], Limits);
Mod, infinity, [], Limits, Queue, QueueLen);
{next_state, NStateName, NStateData, Time1} ->
loop(Parent, Name, NStateName, NStateData, Mod, Time1, [], Limits);
{reply, Reply, NStateName, NStateData} when From /= undefined ->
loop(Parent, Name, NStateName, NStateData, Mod, Time1, [],
Limits, Queue, QueueLen);
{reply, Reply, NStateName, NStateData} when From =/= undefined ->
reply(From, Reply),
loop(Parent, Name, NStateName, NStateData,
Mod, infinity, [], Limits);
{reply, Reply, NStateName, NStateData, Time1} when From /= undefined ->
Mod, infinity, [], Limits, Queue, QueueLen);
{reply, Reply, NStateName, NStateData, Time1} when From =/= undefined ->
reply(From, Reply),
loop(Parent, Name, NStateName, NStateData, Mod, Time1, [], Limits);
loop(Parent, Name, NStateName, NStateData, Mod, Time1, [],
Limits, Queue, QueueLen);
{stop, Reason, NStateData} ->
terminate(Reason, Name, Msg, Mod, StateName, NStateData, []);
{stop, Reason, Reply, NStateData} when From /= undefined ->
{stop, Reason, Reply, NStateData} when From =/= undefined ->
{'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod,
StateName, NStateData, [])),
reply(From, Reply),
@ -483,30 +549,30 @@ handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time, Limits) -> %No d
end.
handle_msg(Msg, Parent, Name, StateName, StateData,
Mod, _Time, Debug, Limits) ->
Mod, _Time, Debug, Limits, Queue, QueueLen) ->
From = from(Msg),
case catch dispatch(Msg, Mod, StateName, StateData) of
{next_state, NStateName, NStateData} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
{Name, NStateName}, return),
loop(Parent, Name, NStateName, NStateData,
Mod, infinity, Debug1, Limits);
Mod, infinity, Debug1, Limits, Queue, QueueLen);
{next_state, NStateName, NStateData, Time1} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
{Name, NStateName}, return),
loop(Parent, Name, NStateName, NStateData,
Mod, Time1, Debug1, Limits);
{reply, Reply, NStateName, NStateData} when From /= undefined ->
Mod, Time1, Debug1, Limits, Queue, QueueLen);
{reply, Reply, NStateName, NStateData} when From =/= undefined ->
Debug1 = reply(Name, From, Reply, Debug, NStateName),
loop(Parent, Name, NStateName, NStateData,
Mod, infinity, Debug1, Limits);
{reply, Reply, NStateName, NStateData, Time1} when From /= undefined ->
Mod, infinity, Debug1, Limits, Queue, QueueLen);
{reply, Reply, NStateName, NStateData, Time1} when From =/= undefined ->
Debug1 = reply(Name, From, Reply, Debug, NStateName),
loop(Parent, Name, NStateName, NStateData,
Mod, Time1, Debug1, Limits);
Mod, Time1, Debug1, Limits, Queue, QueueLen);
{stop, Reason, NStateData} ->
terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug);
{stop, Reason, Reply, NStateData} when From /= undefined ->
{stop, Reason, Reply, NStateData} when From =/= undefined ->
{'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod,
StateName, NStateData, Debug)),
reply(Name, From, Reply, Debug, StateName),
@ -671,13 +737,13 @@ limit_options([_|Options], Limits) ->
%% Throw max_queue if we have reach the max queue size
%% Returns ok otherwise
message_queue_len(#limits{max_queue = undefined}) ->
message_queue_len(#limits{max_queue = undefined}, _QueueLen) ->
ok;
message_queue_len(#limits{max_queue = MaxQueue}) ->
message_queue_len(#limits{max_queue = MaxQueue}, QueueLen) ->
Pid = self(),
case process_info(Pid, message_queue_len) of
{message_queue_len, N} when N > MaxQueue ->
throw({process_limit, {max_queue, N}});
{message_queue_len, N} when N + QueueLen > MaxQueue ->
throw({process_limit, {max_queue, N + QueueLen}});
_ ->
ok
end.