Use ets insead of asking supervisor in ejabberd_odbc_sup:get_pids/1 (Thanks to Alexey Shchepin)
This commit is contained in:
parent
fa22b23435
commit
4c2e7e38a1
|
@ -178,6 +178,7 @@ init([Host, StartInterval]) ->
|
||||||
end,
|
end,
|
||||||
[DBType | _] = db_opts(Host),
|
[DBType | _] = db_opts(Host),
|
||||||
?GEN_FSM:send_event(self(), connect),
|
?GEN_FSM:send_event(self(), connect),
|
||||||
|
ejabberd_odbc_sup:add_pid(Host, self()),
|
||||||
{ok, connecting, #state{db_type = DBType,
|
{ok, connecting, #state{db_type = DBType,
|
||||||
host = Host,
|
host = Host,
|
||||||
max_pending_requests_len = max_fsm_queue(),
|
max_pending_requests_len = max_fsm_queue(),
|
||||||
|
@ -274,6 +275,7 @@ handle_info(Info, StateName, State) ->
|
||||||
{next_state, StateName, State}.
|
{next_state, StateName, State}.
|
||||||
|
|
||||||
terminate(_Reason, _StateName, State) ->
|
terminate(_Reason, _StateName, State) ->
|
||||||
|
ejabberd_odbc_sup:remove_pid(State#state.host, self()),
|
||||||
case State#state.db_type of
|
case State#state.db_type of
|
||||||
mysql ->
|
mysql ->
|
||||||
%% old versions of mysql driver don't have the stop function
|
%% old versions of mysql driver don't have the stop function
|
||||||
|
|
|
@ -30,6 +30,8 @@
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/1,
|
-export([start_link/1,
|
||||||
init/1,
|
init/1,
|
||||||
|
add_pid/2,
|
||||||
|
remove_pid/2,
|
||||||
get_pids/1,
|
get_pids/1,
|
||||||
get_random_pid/1
|
get_random_pid/1
|
||||||
]).
|
]).
|
||||||
|
@ -44,7 +46,19 @@
|
||||||
-define(CONNECT_TIMEOUT, 500). % milliseconds
|
-define(CONNECT_TIMEOUT, 500). % milliseconds
|
||||||
|
|
||||||
|
|
||||||
|
-record(sql_pool, {host, pid}).
|
||||||
|
|
||||||
start_link(Host) ->
|
start_link(Host) ->
|
||||||
|
mnesia:create_table(sql_pool,
|
||||||
|
[{ram_copies, [node()]},
|
||||||
|
{type, bag},
|
||||||
|
{local_content, true},
|
||||||
|
{attributes, record_info(fields, sql_pool)}]),
|
||||||
|
mnesia:add_table_copy(local_config, node(), ram_copies),
|
||||||
|
F = fun() ->
|
||||||
|
mnesia:delete({sql_pool, Host})
|
||||||
|
end,
|
||||||
|
mnesia:ets(F),
|
||||||
supervisor:start_link({local, gen_mod:get_module_proc(Host, ?MODULE)},
|
supervisor:start_link({local, gen_mod:get_module_proc(Host, ?MODULE)},
|
||||||
?MODULE, [Host]).
|
?MODULE, [Host]).
|
||||||
|
|
||||||
|
@ -86,16 +100,25 @@ init([Host]) ->
|
||||||
end, lists:seq(1, PoolSize))}}.
|
end, lists:seq(1, PoolSize))}}.
|
||||||
|
|
||||||
get_pids(Host) ->
|
get_pids(Host) ->
|
||||||
Proc = gen_mod:get_module_proc(Host, ?MODULE),
|
Rs = mnesia:dirty_read(sql_pool, Host),
|
||||||
|
[R#sql_pool.pid || R <- Rs].
|
||||||
% throw an exception if supervisor is not ready (i.e. if it cannot
|
|
||||||
% start its children, if the database is down for example)
|
|
||||||
sys:get_status(Proc, ?CONNECT_TIMEOUT),
|
|
||||||
|
|
||||||
[Child ||
|
|
||||||
{_Id, Child, _Type, _Modules} <- supervisor:which_children(Proc),
|
|
||||||
Child /= undefined].
|
|
||||||
|
|
||||||
get_random_pid(Host) ->
|
get_random_pid(Host) ->
|
||||||
Pids = get_pids(Host),
|
Pids = get_pids(Host),
|
||||||
lists:nth(erlang:phash(now(), length(Pids)), Pids).
|
lists:nth(erlang:phash(now(), length(Pids)), Pids).
|
||||||
|
|
||||||
|
add_pid(Host, Pid) ->
|
||||||
|
F = fun() ->
|
||||||
|
mnesia:write(
|
||||||
|
#sql_pool{host = Host,
|
||||||
|
pid = Pid})
|
||||||
|
end,
|
||||||
|
mnesia:ets(F).
|
||||||
|
|
||||||
|
remove_pid(Host, Pid) ->
|
||||||
|
F = fun() ->
|
||||||
|
mnesia:delete_object(
|
||||||
|
#sql_pool{host = Host,
|
||||||
|
pid = Pid})
|
||||||
|
end,
|
||||||
|
mnesia:ets(F).
|
||||||
|
|
|
@ -517,6 +517,25 @@ print_event(Dev, return, {Name, StateName}) ->
|
||||||
io:format(Dev, "*DBG* ~p switched to state ~w~n",
|
io:format(Dev, "*DBG* ~p switched to state ~w~n",
|
||||||
[Name, StateName]).
|
[Name, StateName]).
|
||||||
|
|
||||||
|
relay_messages(MRef, TRef, Clone, Queue) ->
|
||||||
|
lists:foreach(
|
||||||
|
fun(Msg) -> Clone ! Msg end,
|
||||||
|
queue:to_list(Queue)),
|
||||||
|
relay_messages(MRef, TRef, Clone).
|
||||||
|
|
||||||
|
relay_messages(MRef, TRef, Clone) ->
|
||||||
|
receive
|
||||||
|
{'DOWN', MRef, process, Clone, Reason} ->
|
||||||
|
Reason;
|
||||||
|
{'EXIT', _Parent, _Reason} ->
|
||||||
|
{migrated, Clone};
|
||||||
|
{timeout, TRef, timeout} ->
|
||||||
|
{migrated, Clone};
|
||||||
|
Msg ->
|
||||||
|
Clone ! Msg,
|
||||||
|
relay_messages(MRef, TRef, Clone)
|
||||||
|
end.
|
||||||
|
|
||||||
handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time,
|
handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time,
|
||||||
Limits, Queue, QueueLen) -> %No debug here
|
Limits, Queue, QueueLen) -> %No debug here
|
||||||
From = from(Msg),
|
From = from(Msg),
|
||||||
|
@ -535,6 +554,23 @@ handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time,
|
||||||
reply(From, Reply),
|
reply(From, Reply),
|
||||||
loop(Parent, Name, NStateName, NStateData, Mod, Time1, [],
|
loop(Parent, Name, NStateName, NStateData, Mod, Time1, [],
|
||||||
Limits, Queue, QueueLen);
|
Limits, Queue, QueueLen);
|
||||||
|
{migrate, NStateData, {Node, M, F, A}, Time1} ->
|
||||||
|
Reason = case catch rpc:call(Node, M, F, A, 5000) of
|
||||||
|
{badrpc, _} = Err ->
|
||||||
|
{migration_error, Err};
|
||||||
|
{'EXIT', _} = Err ->
|
||||||
|
{migration_error, Err};
|
||||||
|
{error, _} = Err ->
|
||||||
|
{migration_error, Err};
|
||||||
|
{ok, Clone} ->
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
MRef = erlang:monitor(process, Clone),
|
||||||
|
TRef = erlang:start_timer(Time1, self(), timeout),
|
||||||
|
relay_messages(MRef, TRef, Clone, Queue);
|
||||||
|
Reply ->
|
||||||
|
{migration_error, {bad_reply, Reply}}
|
||||||
|
end,
|
||||||
|
terminate(Reason, Name, Msg, Mod, StateName, NStateData, []);
|
||||||
{stop, Reason, NStateData} ->
|
{stop, Reason, NStateData} ->
|
||||||
terminate(Reason, Name, Msg, Mod, StateName, NStateData, []);
|
terminate(Reason, Name, Msg, Mod, StateName, NStateData, []);
|
||||||
{stop, Reason, Reply, NStateData} when From =/= undefined ->
|
{stop, Reason, Reply, NStateData} when From =/= undefined ->
|
||||||
|
@ -571,6 +607,23 @@ handle_msg(Msg, Parent, Name, StateName, StateData,
|
||||||
Debug1 = reply(Name, From, Reply, Debug, NStateName),
|
Debug1 = reply(Name, From, Reply, Debug, NStateName),
|
||||||
loop(Parent, Name, NStateName, NStateData,
|
loop(Parent, Name, NStateName, NStateData,
|
||||||
Mod, Time1, Debug1, Limits, Queue, QueueLen);
|
Mod, Time1, Debug1, Limits, Queue, QueueLen);
|
||||||
|
{migrate, NStateData, {Node, M, F, A}, Time1} ->
|
||||||
|
Reason = case catch rpc:call(Node, M, F, A, Time1) of
|
||||||
|
{badrpc, R} ->
|
||||||
|
{migration_error, R};
|
||||||
|
{'EXIT', R} ->
|
||||||
|
{migration_error, R};
|
||||||
|
{error, R} ->
|
||||||
|
{migration_error, R};
|
||||||
|
{ok, Clone} ->
|
||||||
|
process_flag(trap_exit, true),
|
||||||
|
MRef = erlang:monitor(process, Clone),
|
||||||
|
TRef = erlang:start_timer(Time1, self(), timeout),
|
||||||
|
relay_messages(MRef, TRef, Clone, Queue);
|
||||||
|
Reply ->
|
||||||
|
{migration_error, {bad_reply, Reply}}
|
||||||
|
end,
|
||||||
|
terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug);
|
||||||
{stop, Reason, NStateData} ->
|
{stop, Reason, NStateData} ->
|
||||||
terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug);
|
terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug);
|
||||||
{stop, Reason, Reply, NStateData} when From =/= undefined ->
|
{stop, Reason, Reply, NStateData} when From =/= undefined ->
|
||||||
|
@ -633,12 +686,10 @@ terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug) ->
|
||||||
%% Priority shutdown should be considered as
|
%% Priority shutdown should be considered as
|
||||||
%% shutdown by SASL
|
%% shutdown by SASL
|
||||||
exit(shutdown);
|
exit(shutdown);
|
||||||
{process_limit, Limit} ->
|
{process_limit, _Limit} ->
|
||||||
%% Priority shutdown should be considered as
|
exit(Reason);
|
||||||
%% shutdown by SASL
|
{migrated, _Clone} ->
|
||||||
error_logger:error_msg("FSM limit reached (~p): ~p~n",
|
exit(normal);
|
||||||
[self(), Limit]),
|
|
||||||
exit(shutdown);
|
|
||||||
_ ->
|
_ ->
|
||||||
error_info(Mod, Reason, Name, Msg, StateName, StateData, Debug),
|
error_info(Mod, Reason, Name, Msg, StateName, StateData, Debug),
|
||||||
exit(Reason)
|
exit(Reason)
|
||||||
|
@ -705,7 +756,12 @@ get_msg(Msg) -> Msg.
|
||||||
format_status(Opt, StatusData) ->
|
format_status(Opt, StatusData) ->
|
||||||
[PDict, SysState, Parent, Debug, [Name, StateName, StateData, Mod, _Time]] =
|
[PDict, SysState, Parent, Debug, [Name, StateName, StateData, Mod, _Time]] =
|
||||||
StatusData,
|
StatusData,
|
||||||
Header = lists:concat(["Status for state machine ", Name]),
|
NameTag = if is_pid(Name) ->
|
||||||
|
pid_to_list(Name);
|
||||||
|
is_atom(Name) ->
|
||||||
|
Name
|
||||||
|
end,
|
||||||
|
Header = lists:concat(["Status for state machine ", NameTag]),
|
||||||
Log = sys:get_debug(log, Debug, []),
|
Log = sys:get_debug(log, Debug, []),
|
||||||
Specfic =
|
Specfic =
|
||||||
case erlang:function_exported(Mod, format_status, 2) of
|
case erlang:function_exported(Mod, format_status, 2) of
|
||||||
|
|
Loading…
Reference in New Issue