From 4c2e7e38a1170bacad0ecbe0244d048d11fd355c Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Fri, 2 Jul 2010 20:31:42 +1000 Subject: [PATCH] Use ets insead of asking supervisor in ejabberd_odbc_sup:get_pids/1 (Thanks to Alexey Shchepin) --- src/odbc/ejabberd_odbc.erl | 2 + src/odbc/ejabberd_odbc_sup.erl | 41 +++++++++++++++----- src/p1_fsm.erl | 70 ++++++++++++++++++++++++++++++---- 3 files changed, 97 insertions(+), 16 deletions(-) diff --git a/src/odbc/ejabberd_odbc.erl b/src/odbc/ejabberd_odbc.erl index d7cdd0371..1b07fd6d9 100644 --- a/src/odbc/ejabberd_odbc.erl +++ b/src/odbc/ejabberd_odbc.erl @@ -178,6 +178,7 @@ init([Host, StartInterval]) -> end, [DBType | _] = db_opts(Host), ?GEN_FSM:send_event(self(), connect), + ejabberd_odbc_sup:add_pid(Host, self()), {ok, connecting, #state{db_type = DBType, host = Host, max_pending_requests_len = max_fsm_queue(), @@ -274,6 +275,7 @@ handle_info(Info, StateName, State) -> {next_state, StateName, State}. terminate(_Reason, _StateName, State) -> + ejabberd_odbc_sup:remove_pid(State#state.host, self()), case State#state.db_type of mysql -> %% old versions of mysql driver don't have the stop function diff --git a/src/odbc/ejabberd_odbc_sup.erl b/src/odbc/ejabberd_odbc_sup.erl index d828449ec..45ede1835 100644 --- a/src/odbc/ejabberd_odbc_sup.erl +++ b/src/odbc/ejabberd_odbc_sup.erl @@ -30,6 +30,8 @@ %% API -export([start_link/1, init/1, + add_pid/2, + remove_pid/2, get_pids/1, get_random_pid/1 ]). @@ -44,7 +46,19 @@ -define(CONNECT_TIMEOUT, 500). % milliseconds +-record(sql_pool, {host, pid}). + start_link(Host) -> + mnesia:create_table(sql_pool, + [{ram_copies, [node()]}, + {type, bag}, + {local_content, true}, + {attributes, record_info(fields, sql_pool)}]), + mnesia:add_table_copy(local_config, node(), ram_copies), + F = fun() -> + mnesia:delete({sql_pool, Host}) + end, + mnesia:ets(F), supervisor:start_link({local, gen_mod:get_module_proc(Host, ?MODULE)}, ?MODULE, [Host]). @@ -86,16 +100,25 @@ init([Host]) -> end, lists:seq(1, PoolSize))}}. get_pids(Host) -> - Proc = gen_mod:get_module_proc(Host, ?MODULE), - - % throw an exception if supervisor is not ready (i.e. if it cannot - % start its children, if the database is down for example) - sys:get_status(Proc, ?CONNECT_TIMEOUT), - - [Child || - {_Id, Child, _Type, _Modules} <- supervisor:which_children(Proc), - Child /= undefined]. + Rs = mnesia:dirty_read(sql_pool, Host), + [R#sql_pool.pid || R <- Rs]. get_random_pid(Host) -> Pids = get_pids(Host), lists:nth(erlang:phash(now(), length(Pids)), Pids). + +add_pid(Host, Pid) -> + F = fun() -> + mnesia:write( + #sql_pool{host = Host, + pid = Pid}) + end, + mnesia:ets(F). + +remove_pid(Host, Pid) -> + F = fun() -> + mnesia:delete_object( + #sql_pool{host = Host, + pid = Pid}) + end, + mnesia:ets(F). diff --git a/src/p1_fsm.erl b/src/p1_fsm.erl index 03ff7f8ce..9ca924112 100644 --- a/src/p1_fsm.erl +++ b/src/p1_fsm.erl @@ -517,6 +517,25 @@ print_event(Dev, return, {Name, StateName}) -> io:format(Dev, "*DBG* ~p switched to state ~w~n", [Name, StateName]). +relay_messages(MRef, TRef, Clone, Queue) -> + lists:foreach( + fun(Msg) -> Clone ! Msg end, + queue:to_list(Queue)), + relay_messages(MRef, TRef, Clone). + +relay_messages(MRef, TRef, Clone) -> + receive + {'DOWN', MRef, process, Clone, Reason} -> + Reason; + {'EXIT', _Parent, _Reason} -> + {migrated, Clone}; + {timeout, TRef, timeout} -> + {migrated, Clone}; + Msg -> + Clone ! Msg, + relay_messages(MRef, TRef, Clone) + end. + handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time, Limits, Queue, QueueLen) -> %No debug here From = from(Msg), @@ -535,6 +554,23 @@ handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time, reply(From, Reply), loop(Parent, Name, NStateName, NStateData, Mod, Time1, [], Limits, Queue, QueueLen); + {migrate, NStateData, {Node, M, F, A}, Time1} -> + Reason = case catch rpc:call(Node, M, F, A, 5000) of + {badrpc, _} = Err -> + {migration_error, Err}; + {'EXIT', _} = Err -> + {migration_error, Err}; + {error, _} = Err -> + {migration_error, Err}; + {ok, Clone} -> + process_flag(trap_exit, true), + MRef = erlang:monitor(process, Clone), + TRef = erlang:start_timer(Time1, self(), timeout), + relay_messages(MRef, TRef, Clone, Queue); + Reply -> + {migration_error, {bad_reply, Reply}} + end, + terminate(Reason, Name, Msg, Mod, StateName, NStateData, []); {stop, Reason, NStateData} -> terminate(Reason, Name, Msg, Mod, StateName, NStateData, []); {stop, Reason, Reply, NStateData} when From =/= undefined -> @@ -571,6 +607,23 @@ handle_msg(Msg, Parent, Name, StateName, StateData, Debug1 = reply(Name, From, Reply, Debug, NStateName), loop(Parent, Name, NStateName, NStateData, Mod, Time1, Debug1, Limits, Queue, QueueLen); + {migrate, NStateData, {Node, M, F, A}, Time1} -> + Reason = case catch rpc:call(Node, M, F, A, Time1) of + {badrpc, R} -> + {migration_error, R}; + {'EXIT', R} -> + {migration_error, R}; + {error, R} -> + {migration_error, R}; + {ok, Clone} -> + process_flag(trap_exit, true), + MRef = erlang:monitor(process, Clone), + TRef = erlang:start_timer(Time1, self(), timeout), + relay_messages(MRef, TRef, Clone, Queue); + Reply -> + {migration_error, {bad_reply, Reply}} + end, + terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug); {stop, Reason, NStateData} -> terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug); {stop, Reason, Reply, NStateData} when From =/= undefined -> @@ -633,12 +686,10 @@ terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug) -> %% Priority shutdown should be considered as %% shutdown by SASL exit(shutdown); - {process_limit, Limit} -> - %% Priority shutdown should be considered as - %% shutdown by SASL - error_logger:error_msg("FSM limit reached (~p): ~p~n", - [self(), Limit]), - exit(shutdown); + {process_limit, _Limit} -> + exit(Reason); + {migrated, _Clone} -> + exit(normal); _ -> error_info(Mod, Reason, Name, Msg, StateName, StateData, Debug), exit(Reason) @@ -705,7 +756,12 @@ get_msg(Msg) -> Msg. format_status(Opt, StatusData) -> [PDict, SysState, Parent, Debug, [Name, StateName, StateData, Mod, _Time]] = StatusData, - Header = lists:concat(["Status for state machine ", Name]), + NameTag = if is_pid(Name) -> + pid_to_list(Name); + is_atom(Name) -> + Name + end, + Header = lists:concat(["Status for state machine ", NameTag]), Log = sys:get_debug(log, Debug, []), Specfic = case erlang:function_exported(Mod, format_status, 2) of