25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-12-24 17:29:28 +01:00

Process ejabberd_sm messages using several dispatchers (EJABS-1653)

This commit is contained in:
Evgeniy Khramtsov 2011-04-07 22:52:50 +10:00
parent a97a60a888
commit 38693a670b

View File

@ -384,7 +384,7 @@ init([]) ->
ejabberd_sm, disconnect_removed_user, 100) ejabberd_sm, disconnect_removed_user, 100)
end, ?MYHOSTS), end, ?MYHOSTS),
ejabberd_commands:register_commands(commands()), ejabberd_commands:register_commands(commands()),
start_dispatchers(),
{ok, #state{}}. {ok, #state{}}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -415,13 +415,19 @@ handle_cast(_Msg, State) ->
%% {stop, Reason, State} %% {stop, Reason, State}
%% Description: Handling all non call/cast messages %% Description: Handling all non call/cast messages
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
handle_info({route, From, To, Packet}, State) -> handle_info({route, From, To, Packet} = Msg, State) ->
case catch do_route(From, To, Packet) of case get_proc_num() of
{'EXIT', Reason} -> N when N > 1 ->
?ERROR_MSG("~p~nwhen processing: ~p", #jid{luser = U, lserver = S} = To,
[Reason, {From, To, Packet}]); get_proc_by_hash({U, S}) ! Msg;
_ -> _ ->
ok case catch do_route(From, To, Packet) of
{'EXIT', Reason} ->
?ERROR_MSG("~p~nwhen processing: ~p",
[Reason, {From, To, Packet}]);
_ ->
ok
end
end, end,
{noreply, State}; {noreply, State};
handle_info({register_iq_handler, Host, XMLNS, Module, Function}, State) -> handle_info({register_iq_handler, Host, XMLNS, Module, Function}, State) ->
@ -453,6 +459,7 @@ terminate(_Reason, _State) ->
ejabberd_hooks:delete(node_up, ?MODULE, node_up, 100), ejabberd_hooks:delete(node_up, ?MODULE, node_up, 100),
ejabberd_hooks:delete(node_hash_update, ?MODULE, migrate, 100), ejabberd_hooks:delete(node_hash_update, ?MODULE, migrate, 100),
ejabberd_commands:unregister_commands(commands()), ejabberd_commands:unregister_commands(commands()),
stop_dispatchers(),
ok. ok.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -875,6 +882,55 @@ user_resources(User, Server) ->
Resources = get_user_resources(User, Server), Resources = get_user_resources(User, Server),
lists:sort(Resources). lists:sort(Resources).
get_proc_num() ->
erlang:system_info(logical_processors).
get_proc_by_hash(Term) ->
N = erlang:phash2(Term, get_proc_num()) + 1,
get_proc(N).
get_proc(N) ->
list_to_atom(atom_to_list(?MODULE) ++ "_" ++ integer_to_list(N)).
start_dispatchers() ->
case get_proc_num() of
N when N > 1 ->
lists:foreach(
fun(I) ->
Pid = spawn(fun dispatch/0),
erlang:register(get_proc(I), Pid)
end, lists:seq(1, N));
_ ->
ok
end.
stop_dispatchers() ->
case get_proc_num() of
N when N > 1 ->
lists:foreach(
fun(I) ->
get_proc(I) ! stop
end, lists:seq(1, N));
_ ->
ok
end.
dispatch() ->
receive
{route, From, To, Packet} ->
case catch do_route(From, To, Packet) of
{'EXIT', Reason} ->
?ERROR_MSG("~p~nwhen processing: ~p",
[Reason, {From, To, Packet}]);
_ ->
ok
end,
dispatch();
stop ->
stopped;
_ ->
dispatch()
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Update Mnesia tables %%% Update Mnesia tables