25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-12-20 17:27:00 +01:00

use queue to reduced quadratic time effort on selective receive (thanks to Alexey Shchepin)

SVN Revision: 2644
This commit is contained in:
Evgeniy Khramtsov 2009-10-07 13:41:36 +00:00
parent 0a77892391
commit 72535ee14d
2 changed files with 124 additions and 51 deletions

View File

@ -27,7 +27,9 @@
-module(ejabberd_service). -module(ejabberd_service).
-author('alexey@process-one.net'). -author('alexey@process-one.net').
-behaviour(gen_fsm). -define(GEN_FSM, p1_fsm).
-behaviour(?GEN_FSM).
%% External exports %% External exports
-export([start/2, -export([start/2,
@ -62,6 +64,10 @@
-define(FSMOPTS, []). -define(FSMOPTS, []).
-endif. -endif.
%% Only change this value if you now what your are doing:
-define(FSMLIMITS,[]).
%% -define(FSMLIMITS, [{max_queue, 2000}]).
-define(STREAM_HEADER, -define(STREAM_HEADER,
"<?xml version='1.0'?>" "<?xml version='1.0'?>"
"<stream:stream " "<stream:stream "
@ -100,7 +106,8 @@ start(SockData, Opts) ->
supervisor:start_child(ejabberd_service_sup, [SockData, Opts]). supervisor:start_child(ejabberd_service_sup, [SockData, Opts]).
start_link(SockData, Opts) -> start_link(SockData, Opts) ->
gen_fsm:start_link(ejabberd_service, [SockData, Opts], ?FSMOPTS). ?GEN_FSM:start_link(
ejabberd_service, [SockData, Opts], ?FSMLIMITS ++ ?FSMOPTS).
socket_type() -> socket_type() ->
xml_stream. xml_stream.

View File

@ -21,7 +21,7 @@
%% terminate immediatetly. If the fsm trap_exit process flag has been %% terminate immediatetly. If the fsm trap_exit process flag has been
%% set to true, the FSM terminate function will called. %% set to true, the FSM terminate function will called.
%% - You can pass the gen_fsm options to control resource usage. %% - You can pass the gen_fsm options to control resource usage.
%% {max_messages, N} will exit the process with priority_shutdown %% {max_queue, N} will exit the process with priority_shutdown
%% - You can limit the time processing a message (TODO): If the %% - You can limit the time processing a message (TODO): If the
%% message processing does not return in a given period of time, the %% message processing does not return in a given period of time, the
%% process will be terminated. %% process will be terminated.
@ -123,7 +123,7 @@
sync_send_all_state_event/2, sync_send_all_state_event/3, sync_send_all_state_event/2, sync_send_all_state_event/3,
reply/2, reply/2,
start_timer/2,send_event_after/2,cancel_timer/1, start_timer/2,send_event_after/2,cancel_timer/1,
enter_loop/4, enter_loop/5, enter_loop/6]). enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7]).
-export([behaviour_info/1]). -export([behaviour_info/1]).
@ -273,8 +273,11 @@ enter_loop(Mod, Options, StateName, StateData, ServerName, Timeout) ->
Name = get_proc_name(ServerName), Name = get_proc_name(ServerName),
Parent = get_parent(), Parent = get_parent(),
Debug = gen:debug_options(Options), Debug = gen:debug_options(Options),
Limits= limit_options(Options), Limits = limit_options(Options),
loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, Limits). Queue = queue:new(),
QueueLen = 0,
loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug,
Limits, Queue, QueueLen).
get_proc_name(Pid) when is_pid(Pid) -> get_proc_name(Pid) when is_pid(Pid) ->
Pid; Pid;
@ -329,16 +332,19 @@ name_to_pid(Name) ->
%%% --------------------------------------------------- %%% ---------------------------------------------------
init_it(Starter, self, Name, Mod, Args, Options) -> init_it(Starter, self, Name, Mod, Args, Options) ->
init_it(Starter, self(), Name, Mod, Args, Options); init_it(Starter, self(), Name, Mod, Args, Options);
init_it(Starter, Parent, Name, Mod, Args, Options) -> init_it(Starter, Parent, Name0, Mod, Args, Options) ->
Name = name(Name0),
Debug = gen:debug_options(Options), Debug = gen:debug_options(Options),
Limits= limit_options(Options), Limits = limit_options(Options),
Queue = queue:new(),
QueueLen = 0,
case catch Mod:init(Args) of case catch Mod:init(Args) of
{ok, StateName, StateData} -> {ok, StateName, StateData} ->
proc_lib:init_ack(Starter, {ok, self()}), proc_lib:init_ack(Starter, {ok, self()}),
loop(Parent, Name, StateName, StateData, Mod, infinity, Debug, Limits); loop(Parent, Name, StateName, StateData, Mod, infinity, Debug, Limits, Queue, QueueLen);
{ok, StateName, StateData, Timeout} -> {ok, StateName, StateData, Timeout} ->
proc_lib:init_ack(Starter, {ok, self()}), proc_lib:init_ack(Starter, {ok, self()}),
loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, Limits); loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, Limits, Queue, QueueLen);
{stop, Reason} -> {stop, Reason} ->
proc_lib:init_ack(Starter, {error, Reason}), proc_lib:init_ack(Starter, {error, Reason}),
exit(Reason); exit(Reason);
@ -354,13 +360,35 @@ init_it(Starter, Parent, Name, Mod, Args, Options) ->
exit(Error) exit(Error)
end. end.
name({local,Name}) -> Name;
name({global,Name}) -> Name;
name(Pid) when is_pid(Pid) -> Pid.
%%----------------------------------------------------------------- %%-----------------------------------------------------------------
%% The MAIN loop %% The MAIN loop
%%----------------------------------------------------------------- %%-----------------------------------------------------------------
loop(Parent, Name, StateName, StateData, Mod, hibernate, Debug,
Limits, Queue, QueueLen)
when QueueLen > 0 ->
case queue:out(Queue) of
{{value, Msg}, Queue1} ->
decode_msg(Msg, Parent, Name, StateName, StateData, Mod, hibernate,
Debug, Limits, Queue1, QueueLen - 1, false);
{empty, _} ->
Reason = internal_queue_error,
error_info(Reason, Name, hibernate, StateName, StateData, Debug),
exit(Reason)
end;
loop(Parent, Name, StateName, StateData, Mod, hibernate, Debug,
Limits, _Queue, _QueueLen) ->
proc_lib:hibernate(?MODULE,wake_hib,
[Parent, Name, StateName, StateData, Mod,
Debug, Limits]);
%% First we test if we have reach a defined limit ... %% First we test if we have reach a defined limit ...
loop(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits) -> loop(Parent, Name, StateName, StateData, Mod, Time, Debug,
Limits, Queue, QueueLen) ->
try try
message_queue_len(Limits) message_queue_len(Limits, QueueLen)
%% TODO: We can add more limit checking here... %% TODO: We can add more limit checking here...
catch catch
{process_limit, Limit} -> {process_limit, Limit} ->
@ -369,54 +397,89 @@ loop(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits) ->
terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug) terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug)
end, end,
process_message(Parent, Name, StateName, StateData, process_message(Parent, Name, StateName, StateData,
Mod, Time, Debug, Limits). Mod, Time, Debug, Limits, Queue, QueueLen).
%% ... then we can process a new message: %% ... then we can process a new message:
process_message(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits) -> process_message(Parent, Name, StateName, StateData, Mod, Time, Debug,
Limits, Queue, QueueLen) ->
{Msg, Queue1, QueueLen1} = collect_messages(Queue, QueueLen, Time),
decode_msg(Msg,Parent, Name, StateName, StateData, Mod, Time,
Debug, Limits, Queue1, QueueLen1, false).
collect_messages(Queue, QueueLen, Time) ->
receive
Input ->
case Input of
{'EXIT', _Parent, priority_shutdown} ->
{Input, Queue, QueueLen};
_ ->
collect_messages(
queue:in(Input, Queue), QueueLen + 1, Time)
end
after 0 ->
case queue:out(Queue) of
{{value, Msg}, Queue1} ->
{Msg, Queue1, QueueLen - 1};
{empty, _} ->
receive
Input ->
{Input, Queue, QueueLen}
after Time ->
{{'$gen_event', timeout}, Queue, QueueLen}
end
end
end.
wake_hib(Parent, Name, StateName, StateData, Mod, Debug,
Limits) ->
Msg = receive Msg = receive
{'EXIT', Parent, priority_shutdown} -> Input ->
{'EXIT', Parent, priority_shutdown} Input
after 0 ->
receive
Input ->
Input
after Time ->
{'$gen_event', timeout}
end
end, end,
Queue = queue:new(),
QueueLen = 0,
decode_msg(Msg, Parent, Name, StateName, StateData, Mod, hibernate,
Debug, Limits, Queue, QueueLen, true).
decode_msg(Msg,Parent, Name, StateName, StateData, Mod, Time, Debug,
Limits, Queue, QueueLen, Hib) ->
put('$internal_queue_len', QueueLen),
case Msg of case Msg of
{system, From, Req} -> {system, From, Req} ->
sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
[Name, StateName, StateData, [Name, StateName, StateData,
Mod, Time, Limits]); Mod, Time, Limits, Queue, QueueLen], Hib);
{'EXIT', Parent, Reason} -> {'EXIT', Parent, Reason} ->
terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug); terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug);
_Msg when Debug == [] -> _Msg when Debug == [] ->
handle_msg(Msg, Parent, Name, StateName, StateData, handle_msg(Msg, Parent, Name, StateName, StateData,
Mod, Time, Limits); Mod, Time, Limits, Queue, QueueLen);
_Msg -> _Msg ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
{Name, StateName}, {in, Msg}), {Name, StateName}, {in, Msg}),
handle_msg(Msg, Parent, Name, StateName, StateData, handle_msg(Msg, Parent, Name, StateName, StateData,
Mod, Time, Debug1, Limits) Mod, Time, Debug1, Limits, Queue, QueueLen)
end. end.
%%----------------------------------------------------------------- %%-----------------------------------------------------------------
%% Callback functions for system messages handling. %% Callback functions for system messages handling.
%%----------------------------------------------------------------- %%-----------------------------------------------------------------
%% TODO: Fix me
system_continue(Parent, Debug, [Name, StateName, StateData, system_continue(Parent, Debug, [Name, StateName, StateData,
Mod, Time, Limits]) -> Mod, Time, Limits, Queue, QueueLen]) ->
loop(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits). loop(Parent, Name, StateName, StateData, Mod, Time, Debug,
Limits, Queue, QueueLen).
system_terminate(Reason, _Parent, Debug, system_terminate(Reason, _Parent, Debug,
[Name, StateName, StateData, Mod, _Time, _Limits]) -> [Name, StateName, StateData, Mod, _Time, _Limits]) ->
terminate(Reason, Name, [], Mod, StateName, StateData, Debug). terminate(Reason, Name, [], Mod, StateName, StateData, Debug).
system_code_change([Name, StateName, StateData, Mod, Time, Limits], system_code_change([Name, StateName, StateData, Mod, Time,
Limits, Queue, QueueLen],
_Module, OldVsn, Extra) -> _Module, OldVsn, Extra) ->
case catch Mod:code_change(OldVsn, StateName, StateData, Extra) of case catch Mod:code_change(OldVsn, StateName, StateData, Extra) of
{ok, NewStateName, NewStateData} -> {ok, NewStateName, NewStateData} ->
{ok, [Name, NewStateName, NewStateData, Mod, Time, Limits]}; {ok, [Name, NewStateName, NewStateData, Mod, Time,
Limits, Queue, QueueLen]};
Else -> Else Else -> Else
end. end.
@ -453,24 +516,27 @@ print_event(Dev, return, {Name, StateName}) ->
io:format(Dev, "*DBG* ~p switched to state ~w~n", io:format(Dev, "*DBG* ~p switched to state ~w~n",
[Name, StateName]). [Name, StateName]).
handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time, Limits) -> %No debug here handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time,
Limits, Queue, QueueLen) -> %No debug here
From = from(Msg), From = from(Msg),
case catch dispatch(Msg, Mod, StateName, StateData) of case catch dispatch(Msg, Mod, StateName, StateData) of
{next_state, NStateName, NStateData} -> {next_state, NStateName, NStateData} ->
loop(Parent, Name, NStateName, NStateData, loop(Parent, Name, NStateName, NStateData,
Mod, infinity, [], Limits); Mod, infinity, [], Limits, Queue, QueueLen);
{next_state, NStateName, NStateData, Time1} -> {next_state, NStateName, NStateData, Time1} ->
loop(Parent, Name, NStateName, NStateData, Mod, Time1, [], Limits); loop(Parent, Name, NStateName, NStateData, Mod, Time1, [],
{reply, Reply, NStateName, NStateData} when From /= undefined -> Limits, Queue, QueueLen);
{reply, Reply, NStateName, NStateData} when From =/= undefined ->
reply(From, Reply), reply(From, Reply),
loop(Parent, Name, NStateName, NStateData, loop(Parent, Name, NStateName, NStateData,
Mod, infinity, [], Limits); Mod, infinity, [], Limits, Queue, QueueLen);
{reply, Reply, NStateName, NStateData, Time1} when From /= undefined -> {reply, Reply, NStateName, NStateData, Time1} when From =/= undefined ->
reply(From, Reply), reply(From, Reply),
loop(Parent, Name, NStateName, NStateData, Mod, Time1, [], Limits); loop(Parent, Name, NStateName, NStateData, Mod, Time1, [],
Limits, Queue, QueueLen);
{stop, Reason, NStateData} -> {stop, Reason, NStateData} ->
terminate(Reason, Name, Msg, Mod, StateName, NStateData, []); terminate(Reason, Name, Msg, Mod, StateName, NStateData, []);
{stop, Reason, Reply, NStateData} when From /= undefined -> {stop, Reason, Reply, NStateData} when From =/= undefined ->
{'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod,
StateName, NStateData, [])), StateName, NStateData, [])),
reply(From, Reply), reply(From, Reply),
@ -483,30 +549,30 @@ handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time, Limits) -> %No d
end. end.
handle_msg(Msg, Parent, Name, StateName, StateData, handle_msg(Msg, Parent, Name, StateName, StateData,
Mod, _Time, Debug, Limits) -> Mod, _Time, Debug, Limits, Queue, QueueLen) ->
From = from(Msg), From = from(Msg),
case catch dispatch(Msg, Mod, StateName, StateData) of case catch dispatch(Msg, Mod, StateName, StateData) of
{next_state, NStateName, NStateData} -> {next_state, NStateName, NStateData} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
{Name, NStateName}, return), {Name, NStateName}, return),
loop(Parent, Name, NStateName, NStateData, loop(Parent, Name, NStateName, NStateData,
Mod, infinity, Debug1, Limits); Mod, infinity, Debug1, Limits, Queue, QueueLen);
{next_state, NStateName, NStateData, Time1} -> {next_state, NStateName, NStateData, Time1} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
{Name, NStateName}, return), {Name, NStateName}, return),
loop(Parent, Name, NStateName, NStateData, loop(Parent, Name, NStateName, NStateData,
Mod, Time1, Debug1, Limits); Mod, Time1, Debug1, Limits, Queue, QueueLen);
{reply, Reply, NStateName, NStateData} when From /= undefined -> {reply, Reply, NStateName, NStateData} when From =/= undefined ->
Debug1 = reply(Name, From, Reply, Debug, NStateName), Debug1 = reply(Name, From, Reply, Debug, NStateName),
loop(Parent, Name, NStateName, NStateData, loop(Parent, Name, NStateName, NStateData,
Mod, infinity, Debug1, Limits); Mod, infinity, Debug1, Limits, Queue, QueueLen);
{reply, Reply, NStateName, NStateData, Time1} when From /= undefined -> {reply, Reply, NStateName, NStateData, Time1} when From =/= undefined ->
Debug1 = reply(Name, From, Reply, Debug, NStateName), Debug1 = reply(Name, From, Reply, Debug, NStateName),
loop(Parent, Name, NStateName, NStateData, loop(Parent, Name, NStateName, NStateData,
Mod, Time1, Debug1, Limits); Mod, Time1, Debug1, Limits, Queue, QueueLen);
{stop, Reason, NStateData} -> {stop, Reason, NStateData} ->
terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug); terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug);
{stop, Reason, Reply, NStateData} when From /= undefined -> {stop, Reason, Reply, NStateData} when From =/= undefined ->
{'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod,
StateName, NStateData, Debug)), StateName, NStateData, Debug)),
reply(Name, From, Reply, Debug, StateName), reply(Name, From, Reply, Debug, StateName),
@ -671,13 +737,13 @@ limit_options([_|Options], Limits) ->
%% Throw max_queue if we have reach the max queue size %% Throw max_queue if we have reach the max queue size
%% Returns ok otherwise %% Returns ok otherwise
message_queue_len(#limits{max_queue = undefined}) -> message_queue_len(#limits{max_queue = undefined}, _QueueLen) ->
ok; ok;
message_queue_len(#limits{max_queue = MaxQueue}) -> message_queue_len(#limits{max_queue = MaxQueue}, QueueLen) ->
Pid = self(), Pid = self(),
case process_info(Pid, message_queue_len) of case process_info(Pid, message_queue_len) of
{message_queue_len, N} when N > MaxQueue -> {message_queue_len, N} when N + QueueLen > MaxQueue ->
throw({process_limit, {max_queue, N}}); throw({process_limit, {max_queue, N + QueueLen}});
_ -> _ ->
ok ok
end. end.