EJAB-940: Implements reliable ODBC transaction nesting.

SVN Revision: 2397
This commit is contained in:
Geoff Cant 2009-07-28 13:46:28 +00:00
parent 1262b0e353
commit bf10d1e956
1 changed files with 139 additions and 111 deletions

View File

@ -52,6 +52,8 @@
-record(state, {db_ref, db_type}). -record(state, {db_ref, db_type}).
-define(STATE_KEY, ejabberd_odbc_state). -define(STATE_KEY, ejabberd_odbc_state).
-define(NESTING_KEY, ejabberd_odbc_nesting_level).
-define(TOP_LEVEL_TXN, 0).
-define(MAX_TRANSACTION_RESTARTS, 10). -define(MAX_TRANSACTION_RESTARTS, 10).
-define(PGSQL_PORT, 5432). -define(PGSQL_PORT, 5432).
-define(MYSQL_PORT, 3306). -define(MYSQL_PORT, 3306).
@ -70,8 +72,7 @@ start_link(Host, StartInterval) ->
gen_server:start_link(ejabberd_odbc, [Host, StartInterval], []). gen_server:start_link(ejabberd_odbc, [Host, StartInterval], []).
sql_query(Host, Query) -> sql_query(Host, Query) ->
Msg = {sql_query, Query}, sql_call(Host, {sql_query, Query}).
sql_call(Host, Msg).
%% SQL transaction based on a list of queries %% SQL transaction based on a list of queries
%% This function automatically %% This function automatically
@ -84,34 +85,27 @@ sql_transaction(Host, Queries) when is_list(Queries) ->
end, end,
sql_transaction(Host, F); sql_transaction(Host, F);
%% SQL transaction, based on a erlang anonymous function (F = fun) %% SQL transaction, based on a erlang anonymous function (F = fun)
sql_transaction(Host, F) -> sql_transaction(Host, F) when is_function(F) ->
Msg = {sql_transaction, F}, sql_call(Host, {sql_transaction, F}).
sql_call(Host, Msg).
%% SQL bloc, based on a erlang anonymous function (F = fun) %% SQL bloc, based on a erlang anonymous function (F = fun)
sql_bloc(Host, F) -> sql_bloc(Host, F) ->
Msg = {sql_bloc, F}, sql_call(Host, {sql_bloc, F}).
sql_call(Host, Msg).
sql_call(Host, Msg) -> sql_call(Host, Msg) ->
case get(?STATE_KEY) of case get(?STATE_KEY) of
undefined -> undefined ->
gen_server:call(ejabberd_odbc_sup:get_random_pid(Host), gen_server:call(ejabberd_odbc_sup:get_random_pid(Host),
Msg, ?TRANSACTION_TIMEOUT); {sql_cmd, Msg}, ?TRANSACTION_TIMEOUT);
State -> _State ->
%% Query, Transaction or Bloc nested inside transaction nested_op(Msg)
nested_op(Msg, State)
end. end.
%% This function is intended to be used from inside an sql_transaction: %% This function is intended to be used from inside an sql_transaction:
sql_query_t(Query) -> sql_query_t(Query) ->
State = get(?STATE_KEY), QRes = sql_query_internal(Query),
QRes = sql_query_internal(State, Query),
case QRes of case QRes of
{error, "No SQL-driver information available."} ->
% workaround for odbc bug
{updated, 0};
{error, Reason} -> {error, Reason} ->
throw({aborted, Reason}); throw({aborted, Reason});
Rs when is_list(Rs) -> Rs when is_list(Rs) ->
@ -184,8 +178,13 @@ init([Host, StartInterval]) ->
%% {stop, Reason, Reply, State} | (terminate/2 is called) %% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called) %% {stop, Reason, State} (terminate/2 is called)
%%---------------------------------------------------------------------- %%----------------------------------------------------------------------
handle_call(Command, _From, State) -> handle_call({sql_cmd, Command}, _From, State) ->
dispatch_sql_command(Command, State). put(?NESTING_KEY, ?TOP_LEVEL_TXN),
put(?STATE_KEY, State),
abort_on_driver_error(outer_op(Command));
handle_call(Request, {Who, _Ref}, State) ->
?WARNING_MSG("Unexpected call ~p from ~p.", [Request, Who]),
{reply, ok, State}.
%%---------------------------------------------------------------------- %%----------------------------------------------------------------------
%% Func: handle_cast/2 %% Func: handle_cast/2
@ -233,110 +232,139 @@ terminate(_Reason, State) ->
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
%%% Internal functions %%% Internal functions
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
dispatch_sql_command({sql_query, Query}, State) ->
abort_on_driver_error(sql_query_internal(State, Query), State);
dispatch_sql_command({sql_transaction, F}, State) ->
abort_on_driver_error(
execute_transaction(State, F, ?MAX_TRANSACTION_RESTARTS, ""), State);
dispatch_sql_command({sql_bloc, F}, State) ->
abort_on_driver_error(execute_bloc(State, F), State);
dispatch_sql_command(Request, State) ->
?WARNING_MSG("Unexpected call ~p.", [Request]),
{reply, ok, State}.
sql_query_internal(State, Query) -> %% Only called by handle_call, only handles top level operations.
Nested = case get(?STATE_KEY) of %% @spec outer_op(Op) -> {error, Reason} | {aborted, Reason} | {atomic, Result}
undefined -> put(?STATE_KEY, State), false; outer_op({sql_query, Query}) ->
_State -> true sql_query_internal(Query);
end, outer_op({sql_transaction, F}) ->
Result = case State#state.db_type of outer_transaction(F, ?MAX_TRANSACTION_RESTARTS, "");
odbc -> outer_op({sql_bloc, F}) ->
odbc:sql_query(State#state.db_ref, Query); execute_bloc(F).
pgsql ->
pgsql_to_odbc(pgsql:squery(State#state.db_ref, Query)); %% Called via sql_query/transaction/bloc from client code when inside a
mysql -> %% nested operation
?DEBUG("MySQL, Send query~n~p~n", [Query]), nested_op({sql_query, Query}) ->
R = mysql_to_odbc(mysql_conn:fetch(State#state.db_ref, Query, self())), %% XXX - use sql_query_t here insted? Most likely would break
?DEBUG("MySQL, Received result~n~p~n", [R]), %% callers who expect {error, _} tuples (sql_query_t turns
R %% these into throws)
end, sql_query_internal(Query);
case Nested of nested_op({sql_transaction, F}) ->
true -> Result; NestingLevel = get(?NESTING_KEY),
false -> erase(?STATE_KEY), Result if NestingLevel =:= ?TOP_LEVEL_TXN ->
%% First transaction inside a (series of) sql_blocs
outer_transaction(F, ?MAX_TRANSACTION_RESTARTS, "");
true ->
%% Transaction inside a transaction
inner_transaction(F)
end;
nested_op({sql_bloc, F}) ->
execute_bloc(F).
%% Never retry nested transactions - only outer transactions
inner_transaction(F) ->
PreviousNestingLevel = get(?NESTING_KEY),
case get(?NESTING_KEY) of
?TOP_LEVEL_TXN ->
{backtrace, T} = process_info(self(), backtrace),
?ERROR_MSG("inner transaction called at outer txn level. Trace: ~s", [T]),
erlang:exit(implementation_faulty);
_N -> ok
end,
put(?NESTING_KEY, PreviousNestingLevel + 1),
Result = (catch F()),
put(?NESTING_KEY, PreviousNestingLevel),
case Result of
{aborted, Reason} ->
{aborted, Reason};
{'EXIT', Reason} ->
{'EXIT', Reason};
{atomic, Res} ->
{atomic, Res};
Res ->
{atomic, Res}
end. end.
execute_transaction(State, _F, 0, Reason) -> outer_transaction(F, NRestarts, _Reason) ->
?ERROR_MSG("SQL transaction restarts exceeded~n" PreviousNestingLevel = get(?NESTING_KEY),
"** Restarts: ~p~n" case get(?NESTING_KEY) of
"** Last abort reason: ~p~n" ?TOP_LEVEL_TXN ->
"** Stacktrace: ~p~n" ok;
"** When State == ~p", _N ->
[?MAX_TRANSACTION_RESTARTS, Reason, {backtrace, T} = process_info(self(), backtrace),
erlang:get_stacktrace(), State]), ?ERROR_MSG("outer transaction called at inner txn level. Trace: ~s", [T]),
sql_query_internal(State, "rollback;"), erlang:exit(implementation_faulty)
{aborted, restarts_exceeded}; end,
execute_transaction(State, F, NRestarts, _Reason) -> sql_query_internal("begin;"),
Nested = case get(?STATE_KEY) of put(?NESTING_KEY, PreviousNestingLevel + 1),
undefined -> Result = (catch F()),
put(?STATE_KEY, State), put(?NESTING_KEY, PreviousNestingLevel),
sql_query_internal(State, "begin;"), case Result of
false; {aborted, Reason} when NRestarts > 0 ->
_State -> %% Retry outer transaction upto NRestarts times.
true sql_query_internal("rollback;"),
end, outer_transaction(F, NRestarts - 1, Reason);
Result = case catch F() of {aborted, Reason} when NRestarts =:= 0 ->
{aborted, Reason} -> %% Too many retries of outer transaction.
execute_transaction(State, F, NRestarts - 1, Reason); ?ERROR_MSG("SQL transaction restarts exceeded~n"
{'EXIT', Reason} -> "** Restarts: ~p~n"
sql_query_internal(State, "rollback;"), "** Last abort reason: ~p~n"
{aborted, Reason}; "** Stacktrace: ~p~n"
Res -> "** When State == ~p",
{atomic, Res} [?MAX_TRANSACTION_RESTARTS, Reason,
end, erlang:get_stacktrace(), get(?STATE_KEY)]),
case Nested of sql_query_internal("rollback;"),
true -> Result; {aborted, Reason};
false -> sql_query_internal(State, "commit;"), erase(?STATE_KEY), Result {'EXIT', Reason} ->
%% Abort sql transaction on EXIT from outer txn only.
sql_query_internal("rollback;"),
{aborted, Reason};
Res ->
%% Commit successful outer txn
sql_query_internal("commit;"),
{atomic, Res}
end. end.
execute_bloc(State, F) -> execute_bloc(F) ->
Nested = case get(?STATE_KEY) of %% We don't alter ?NESTING_KEY here as only SQL transactions alter txn nesting
undefined -> put(?STATE_KEY, State), false; case catch F() of
_State -> true {aborted, Reason} ->
end, {aborted, Reason};
Result = case catch F() of {'EXIT', Reason} ->
{aborted, Reason} -> {aborted, Reason};
{aborted, Reason}; Res ->
{'EXIT', Reason} -> {atomic, Res}
{aborted, Reason};
Res ->
{atomic, Res}
end,
case Nested of
true -> Result;
false -> erase(?STATE_KEY), Result
end. end.
nested_op(Op, State) -> sql_query_internal(Query) ->
case dispatch_sql_command(Op, State) of State = get(?STATE_KEY),
{reply, Res, NewState} -> Res = case State#state.db_type of
put(?STATE_KEY, NewState), odbc ->
Res; odbc:sql_query(State#state.db_ref, Query);
{stop, _Reason, Reply, NewState} -> pgsql ->
put(?STATE_KEY, NewState), pgsql_to_odbc(pgsql:squery(State#state.db_ref, Query));
throw({aborted, Reply}); mysql ->
{noreply, NewState} -> ?DEBUG("MySQL, Send query~n~p~n", [Query]),
put(?STATE_KEY, NewState), R = mysql_to_odbc(mysql_conn:fetch(State#state.db_ref, Query, self())),
exit({bad_op_in_nested_txn, Op}) ?INFO_MSG("MySQL, Received result~n~p~n", [R]),
R
end,
case Res of
{error, "No SQL-driver information available."} ->
% workaround for odbc bug
{updated, 0};
_Else -> Res
end. end.
abort_on_driver_error({error, "query timed out"} = Reply, State) -> %% Generate the OTP callback return tuple depending on the driver result.
abort_on_driver_error({error, "query timed out"} = Reply) ->
%% mysql driver error %% mysql driver error
{stop, timeout, Reply, State}; {stop, timeout, Reply, get(?STATE_KEY)};
abort_on_driver_error({error, "Failed sending data on socket"++_} = Reply, State) -> abort_on_driver_error({error, "Failed sending data on socket"++_} = Reply) ->
%% mysql driver error %% mysql driver error
{stop, closed, Reply, State}; {stop, closed, Reply, get(?STATE_KEY)};
abort_on_driver_error(Reply, State) -> abort_on_driver_error(Reply) ->
{reply, Reply, State}. {reply, Reply, get(?STATE_KEY)}.
%% == pure ODBC code %% == pure ODBC code