diff --git a/src/ejabberd_service.erl b/src/ejabberd_service.erl index 34d62f433..2305407b0 100644 --- a/src/ejabberd_service.erl +++ b/src/ejabberd_service.erl @@ -27,7 +27,9 @@ -module(ejabberd_service). -author('alexey@process-one.net'). --behaviour(gen_fsm). +-define(GEN_FSM, p1_fsm). + +-behaviour(?GEN_FSM). %% External exports -export([start/2, @@ -62,6 +64,10 @@ -define(FSMOPTS, []). -endif. +%% Only change this value if you now what your are doing: +-define(FSMLIMITS,[]). +%% -define(FSMLIMITS, [{max_queue, 2000}]). + -define(STREAM_HEADER, "" " supervisor:start_child(ejabberd_service_sup, [SockData, Opts]). start_link(SockData, Opts) -> - gen_fsm:start_link(ejabberd_service, [SockData, Opts], ?FSMOPTS). + ?GEN_FSM:start_link( + ejabberd_service, [SockData, Opts], ?FSMLIMITS ++ ?FSMOPTS). socket_type() -> xml_stream. diff --git a/src/p1_fsm.erl b/src/p1_fsm.erl index c4de9faa4..dec999266 100644 --- a/src/p1_fsm.erl +++ b/src/p1_fsm.erl @@ -21,7 +21,7 @@ %% terminate immediatetly. If the fsm trap_exit process flag has been %% set to true, the FSM terminate function will called. %% - You can pass the gen_fsm options to control resource usage. -%% {max_messages, N} will exit the process with priority_shutdown +%% {max_queue, N} will exit the process with priority_shutdown %% - You can limit the time processing a message (TODO): If the %% message processing does not return in a given period of time, the %% process will be terminated. @@ -123,7 +123,7 @@ sync_send_all_state_event/2, sync_send_all_state_event/3, reply/2, start_timer/2,send_event_after/2,cancel_timer/1, - enter_loop/4, enter_loop/5, enter_loop/6]). + enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/7]). -export([behaviour_info/1]). @@ -273,8 +273,11 @@ enter_loop(Mod, Options, StateName, StateData, ServerName, Timeout) -> Name = get_proc_name(ServerName), Parent = get_parent(), Debug = gen:debug_options(Options), - Limits= limit_options(Options), - loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, Limits). + Limits = limit_options(Options), + Queue = queue:new(), + QueueLen = 0, + loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, + Limits, Queue, QueueLen). get_proc_name(Pid) when is_pid(Pid) -> Pid; @@ -329,16 +332,19 @@ name_to_pid(Name) -> %%% --------------------------------------------------- init_it(Starter, self, Name, Mod, Args, Options) -> init_it(Starter, self(), Name, Mod, Args, Options); -init_it(Starter, Parent, Name, Mod, Args, Options) -> +init_it(Starter, Parent, Name0, Mod, Args, Options) -> + Name = name(Name0), Debug = gen:debug_options(Options), - Limits= limit_options(Options), + Limits = limit_options(Options), + Queue = queue:new(), + QueueLen = 0, case catch Mod:init(Args) of {ok, StateName, StateData} -> proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, StateName, StateData, Mod, infinity, Debug, Limits); + loop(Parent, Name, StateName, StateData, Mod, infinity, Debug, Limits, Queue, QueueLen); {ok, StateName, StateData, Timeout} -> proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, Limits); + loop(Parent, Name, StateName, StateData, Mod, Timeout, Debug, Limits, Queue, QueueLen); {stop, Reason} -> proc_lib:init_ack(Starter, {error, Reason}), exit(Reason); @@ -354,13 +360,35 @@ init_it(Starter, Parent, Name, Mod, Args, Options) -> exit(Error) end. +name({local,Name}) -> Name; +name({global,Name}) -> Name; +name(Pid) when is_pid(Pid) -> Pid. + %%----------------------------------------------------------------- %% The MAIN loop %%----------------------------------------------------------------- +loop(Parent, Name, StateName, StateData, Mod, hibernate, Debug, + Limits, Queue, QueueLen) + when QueueLen > 0 -> + case queue:out(Queue) of + {{value, Msg}, Queue1} -> + decode_msg(Msg, Parent, Name, StateName, StateData, Mod, hibernate, + Debug, Limits, Queue1, QueueLen - 1, false); + {empty, _} -> + Reason = internal_queue_error, + error_info(Reason, Name, hibernate, StateName, StateData, Debug), + exit(Reason) + end; +loop(Parent, Name, StateName, StateData, Mod, hibernate, Debug, + Limits, _Queue, _QueueLen) -> + proc_lib:hibernate(?MODULE,wake_hib, + [Parent, Name, StateName, StateData, Mod, + Debug, Limits]); %% First we test if we have reach a defined limit ... -loop(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits) -> +loop(Parent, Name, StateName, StateData, Mod, Time, Debug, + Limits, Queue, QueueLen) -> try - message_queue_len(Limits) + message_queue_len(Limits, QueueLen) %% TODO: We can add more limit checking here... catch {process_limit, Limit} -> @@ -369,54 +397,89 @@ loop(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits) -> terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug) end, process_message(Parent, Name, StateName, StateData, - Mod, Time, Debug, Limits). + Mod, Time, Debug, Limits, Queue, QueueLen). %% ... then we can process a new message: -process_message(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits) -> +process_message(Parent, Name, StateName, StateData, Mod, Time, Debug, + Limits, Queue, QueueLen) -> + {Msg, Queue1, QueueLen1} = collect_messages(Queue, QueueLen, Time), + decode_msg(Msg,Parent, Name, StateName, StateData, Mod, Time, + Debug, Limits, Queue1, QueueLen1, false). + +collect_messages(Queue, QueueLen, Time) -> + receive + Input -> + case Input of + {'EXIT', _Parent, priority_shutdown} -> + {Input, Queue, QueueLen}; + _ -> + collect_messages( + queue:in(Input, Queue), QueueLen + 1, Time) + end + after 0 -> + case queue:out(Queue) of + {{value, Msg}, Queue1} -> + {Msg, Queue1, QueueLen - 1}; + {empty, _} -> + receive + Input -> + {Input, Queue, QueueLen} + after Time -> + {{'$gen_event', timeout}, Queue, QueueLen} + end + end + end. + + +wake_hib(Parent, Name, StateName, StateData, Mod, Debug, + Limits) -> Msg = receive - {'EXIT', Parent, priority_shutdown} -> - {'EXIT', Parent, priority_shutdown} - after 0 -> - receive - Input -> - Input - after Time -> - {'$gen_event', timeout} - end + Input -> + Input end, + Queue = queue:new(), + QueueLen = 0, + decode_msg(Msg, Parent, Name, StateName, StateData, Mod, hibernate, + Debug, Limits, Queue, QueueLen, true). + +decode_msg(Msg,Parent, Name, StateName, StateData, Mod, Time, Debug, + Limits, Queue, QueueLen, Hib) -> + put('$internal_queue_len', QueueLen), case Msg of {system, From, Req} -> sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, [Name, StateName, StateData, - Mod, Time, Limits]); + Mod, Time, Limits, Queue, QueueLen], Hib); {'EXIT', Parent, Reason} -> terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug); _Msg when Debug == [] -> handle_msg(Msg, Parent, Name, StateName, StateData, - Mod, Time, Limits); + Mod, Time, Limits, Queue, QueueLen); _Msg -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, {Name, StateName}, {in, Msg}), handle_msg(Msg, Parent, Name, StateName, StateData, - Mod, Time, Debug1, Limits) + Mod, Time, Debug1, Limits, Queue, QueueLen) end. %%----------------------------------------------------------------- %% Callback functions for system messages handling. %%----------------------------------------------------------------- -%% TODO: Fix me system_continue(Parent, Debug, [Name, StateName, StateData, - Mod, Time, Limits]) -> - loop(Parent, Name, StateName, StateData, Mod, Time, Debug, Limits). + Mod, Time, Limits, Queue, QueueLen]) -> + loop(Parent, Name, StateName, StateData, Mod, Time, Debug, + Limits, Queue, QueueLen). system_terminate(Reason, _Parent, Debug, [Name, StateName, StateData, Mod, _Time, _Limits]) -> terminate(Reason, Name, [], Mod, StateName, StateData, Debug). -system_code_change([Name, StateName, StateData, Mod, Time, Limits], +system_code_change([Name, StateName, StateData, Mod, Time, + Limits, Queue, QueueLen], _Module, OldVsn, Extra) -> case catch Mod:code_change(OldVsn, StateName, StateData, Extra) of {ok, NewStateName, NewStateData} -> - {ok, [Name, NewStateName, NewStateData, Mod, Time, Limits]}; + {ok, [Name, NewStateName, NewStateData, Mod, Time, + Limits, Queue, QueueLen]}; Else -> Else end. @@ -453,24 +516,27 @@ print_event(Dev, return, {Name, StateName}) -> io:format(Dev, "*DBG* ~p switched to state ~w~n", [Name, StateName]). -handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time, Limits) -> %No debug here +handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time, + Limits, Queue, QueueLen) -> %No debug here From = from(Msg), case catch dispatch(Msg, Mod, StateName, StateData) of {next_state, NStateName, NStateData} -> loop(Parent, Name, NStateName, NStateData, - Mod, infinity, [], Limits); + Mod, infinity, [], Limits, Queue, QueueLen); {next_state, NStateName, NStateData, Time1} -> - loop(Parent, Name, NStateName, NStateData, Mod, Time1, [], Limits); - {reply, Reply, NStateName, NStateData} when From /= undefined -> + loop(Parent, Name, NStateName, NStateData, Mod, Time1, [], + Limits, Queue, QueueLen); + {reply, Reply, NStateName, NStateData} when From =/= undefined -> reply(From, Reply), loop(Parent, Name, NStateName, NStateData, - Mod, infinity, [], Limits); - {reply, Reply, NStateName, NStateData, Time1} when From /= undefined -> + Mod, infinity, [], Limits, Queue, QueueLen); + {reply, Reply, NStateName, NStateData, Time1} when From =/= undefined -> reply(From, Reply), - loop(Parent, Name, NStateName, NStateData, Mod, Time1, [], Limits); + loop(Parent, Name, NStateName, NStateData, Mod, Time1, [], + Limits, Queue, QueueLen); {stop, Reason, NStateData} -> terminate(Reason, Name, Msg, Mod, StateName, NStateData, []); - {stop, Reason, Reply, NStateData} when From /= undefined -> + {stop, Reason, Reply, NStateData} when From =/= undefined -> {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, StateName, NStateData, [])), reply(From, Reply), @@ -483,30 +549,30 @@ handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time, Limits) -> %No d end. handle_msg(Msg, Parent, Name, StateName, StateData, - Mod, _Time, Debug, Limits) -> + Mod, _Time, Debug, Limits, Queue, QueueLen) -> From = from(Msg), case catch dispatch(Msg, Mod, StateName, StateData) of {next_state, NStateName, NStateData} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, {Name, NStateName}, return), loop(Parent, Name, NStateName, NStateData, - Mod, infinity, Debug1, Limits); + Mod, infinity, Debug1, Limits, Queue, QueueLen); {next_state, NStateName, NStateData, Time1} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, {Name, NStateName}, return), loop(Parent, Name, NStateName, NStateData, - Mod, Time1, Debug1, Limits); - {reply, Reply, NStateName, NStateData} when From /= undefined -> + Mod, Time1, Debug1, Limits, Queue, QueueLen); + {reply, Reply, NStateName, NStateData} when From =/= undefined -> Debug1 = reply(Name, From, Reply, Debug, NStateName), loop(Parent, Name, NStateName, NStateData, - Mod, infinity, Debug1, Limits); - {reply, Reply, NStateName, NStateData, Time1} when From /= undefined -> + Mod, infinity, Debug1, Limits, Queue, QueueLen); + {reply, Reply, NStateName, NStateData, Time1} when From =/= undefined -> Debug1 = reply(Name, From, Reply, Debug, NStateName), loop(Parent, Name, NStateName, NStateData, - Mod, Time1, Debug1, Limits); + Mod, Time1, Debug1, Limits, Queue, QueueLen); {stop, Reason, NStateData} -> terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug); - {stop, Reason, Reply, NStateData} when From /= undefined -> + {stop, Reason, Reply, NStateData} when From =/= undefined -> {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug)), reply(Name, From, Reply, Debug, StateName), @@ -671,13 +737,13 @@ limit_options([_|Options], Limits) -> %% Throw max_queue if we have reach the max queue size %% Returns ok otherwise -message_queue_len(#limits{max_queue = undefined}) -> +message_queue_len(#limits{max_queue = undefined}, _QueueLen) -> ok; -message_queue_len(#limits{max_queue = MaxQueue}) -> +message_queue_len(#limits{max_queue = MaxQueue}, QueueLen) -> Pid = self(), case process_info(Pid, message_queue_len) of - {message_queue_len, N} when N > MaxQueue -> - throw({process_limit, {max_queue, N}}); + {message_queue_len, N} when N + QueueLen > MaxQueue -> + throw({process_limit, {max_queue, N + QueueLen}}); _ -> ok end.