Improve SQL timeouts handling

Also improve some formatting
This commit is contained in:
Evgeny Khramtsov 2019-07-31 10:39:53 +03:00
parent 651d69fb98
commit bbee13f970
1 changed file with 79 additions and 78 deletions

View File

@ -70,45 +70,27 @@
-include("ejabberd_stacktrace.hrl"). -include("ejabberd_stacktrace.hrl").
-record(state, -record(state,
{db_ref = self() :: pid(), {db_ref :: undefined | pid(),
db_type = odbc :: pgsql | mysql | sqlite | odbc | mssql, db_type = odbc :: pgsql | mysql | sqlite | odbc | mssql,
db_version = undefined :: undefined | non_neg_integer(), db_version :: undefined | non_neg_integer(),
host = <<"">> :: binary(), host :: binary(),
pending_requests :: p1_queue:queue()}). pending_requests :: p1_queue:queue(),
overload_reported :: undefined | integer()}).
-define(STATE_KEY, ejabberd_sql_state). -define(STATE_KEY, ejabberd_sql_state).
-define(NESTING_KEY, ejabberd_sql_nesting_level). -define(NESTING_KEY, ejabberd_sql_nesting_level).
-define(TOP_LEVEL_TXN, 0). -define(TOP_LEVEL_TXN, 0).
-define(MAX_TRANSACTION_RESTARTS, 10). -define(MAX_TRANSACTION_RESTARTS, 10).
-define(KEEPALIVE_QUERY, [<<"SELECT 1;">>]). -define(KEEPALIVE_QUERY, [<<"SELECT 1;">>]).
-define(PREPARE_KEY, ejabberd_sql_prepare). -define(PREPARE_KEY, ejabberd_sql_prepare).
%%-define(DBGFSM, true). %%-define(DBGFSM, true).
-ifdef(DBGFSM). -ifdef(DBGFSM).
-define(FSMOPTS, [{debug, [trace]}]). -define(FSMOPTS, [{debug, [trace]}]).
-else. -else.
-define(FSMOPTS, []). -define(FSMOPTS, []).
-endif. -endif.
%%%---------------------------------------------------------------------- -type state() :: #state{}.
%%% API
%%%----------------------------------------------------------------------
-spec start_link(binary(), pos_integer()) -> {ok, pid()} | {error, term()}.
start_link(Host, I) ->
Proc = binary_to_atom(get_worker_name(Host, I), utf8),
p1_fsm:start_link({local, Proc}, ?MODULE, [Host],
fsm_limit_opts() ++ ?FSMOPTS).
-type sql_query_simple() :: [sql_query() | binary()] | #sql_query{} | -type sql_query_simple() :: [sql_query() | binary()] | #sql_query{} |
fun(() -> any()) | fun((atom(), _) -> any()). fun(() -> any()) | fun((atom(), _) -> any()).
-type sql_query() :: sql_query_simple() | -type sql_query() :: sql_query_simple() |
@ -119,8 +101,16 @@ start_link(Host, I) ->
{selected, [any()]} | {selected, [any()]} |
ok. ok.
-spec sql_query(binary(), sql_query()) -> sql_query_result(). %%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
%% Start and link one SQL worker FSM for Host.
%% The process is registered locally under the atom form of the name
%% returned by get_worker_name(Host, I); Host is handed to the FSM as its
%% only init argument.
-spec start_link(binary(), pos_integer()) -> {ok, pid()} | {error, term()}.
start_link(Host, I) ->
    WorkerName = get_worker_name(Host, I),
    RegName = {local, binary_to_atom(WorkerName, utf8)},
    FsmOpts = fsm_limit_opts() ++ ?FSMOPTS,
    p1_fsm:start_link(RegName, ?MODULE, [Host], FsmOpts).
-spec sql_query(binary(), sql_query()) -> sql_query_result().
sql_query(Host, Query) -> sql_query(Host, Query) ->
sql_call(Host, {sql_query, Query}). sql_call(Host, {sql_query, Query}).
@ -129,7 +119,6 @@ sql_query(Host, Query) ->
-spec sql_transaction(binary(), [sql_query()] | fun(() -> any())) -> -spec sql_transaction(binary(), [sql_query()] | fun(() -> any())) ->
{atomic, any()} | {atomic, any()} |
{aborted, any()}. {aborted, any()}.
sql_transaction(Host, Queries) sql_transaction(Host, Queries)
when is_list(Queries) -> when is_list(Queries) ->
F = fun () -> F = fun () ->
@ -149,26 +138,27 @@ sql_transaction(Host, F) when is_function(F) ->
sql_bloc(Host, F) -> sql_call(Host, {sql_bloc, F}). sql_bloc(Host, F) -> sql_call(Host, {sql_bloc, F}).
sql_call(Host, Msg) -> sql_call(Host, Msg) ->
Timeout = query_timeout(Host),
case get(?STATE_KEY) of case get(?STATE_KEY) of
undefined -> undefined ->
Proc = get_worker(Host), Proc = get_worker(Host),
sync_send_event(Proc, {sql_cmd, Msg, sync_send_event(Proc, {sql_cmd, Msg, current_time() + Timeout},
erlang:monotonic_time(millisecond)}, Timeout);
query_timeout(Host));
_State -> _State ->
nested_op(Msg) nested_op(Msg)
end. end.
keep_alive(Host, Proc) -> keep_alive(Host, Proc) ->
case sync_send_event(Proc, Timeout = query_timeout(Host),
{sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, case sync_send_event(
erlang:monotonic_time(millisecond)}, Proc,
query_timeout(Host)) of {sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, current_time() + Timeout},
Timeout) of
{selected,_,[[<<"1">>]]} -> {selected,_,[[<<"1">>]]} ->
ok; ok;
_Err -> _Err ->
?ERROR_MSG("Keep alive query failed, closing connection: ~p", [_Err]), ?ERROR_MSG("Keep alive query failed, closing connection: ~p", [_Err]),
sync_send_event(Proc, force_timeout, query_timeout(Host)) sync_send_event(Proc, force_timeout, Timeout)
end. end.
sync_send_event(Proc, Msg, Timeout) -> sync_send_event(Proc, Msg, Timeout) ->
@ -178,15 +168,14 @@ sync_send_event(Proc, Msg, Timeout) ->
end. end.
-spec sql_query_t(sql_query()) -> sql_query_result(). -spec sql_query_t(sql_query()) -> sql_query_result().
%% This function is intended to be used from inside an sql_transaction: %% This function is intended to be used from inside an sql_transaction:
sql_query_t(Query) -> sql_query_t(Query) ->
QRes = sql_query_internal(Query), QRes = sql_query_internal(Query),
case QRes of case QRes of
{error, Reason} -> throw({aborted, Reason}); {error, Reason} -> restart(Reason);
Rs when is_list(Rs) -> Rs when is_list(Rs) ->
case lists:keysearch(error, 1, Rs) of case lists:keysearch(error, 1, Rs) of
{value, {error, Reason}} -> throw({aborted, Reason}); {value, {error, Reason}} -> restart(Reason);
_ -> QRes _ -> QRes
end; end;
_ -> QRes _ -> QRes
@ -372,11 +361,9 @@ connecting(Event, State) ->
[Event]), [Event]),
{next_state, connecting, State}. {next_state, connecting, State}.
connecting({sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, connecting({sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, Timestamp},
_Timestamp},
From, State) -> From, State) ->
p1_fsm:reply(From, reply(From, {error, <<"SQL connection failed">>}, Timestamp),
{error, <<"SQL connection failed">>}),
{next_state, connecting, State}; {next_state, connecting, State};
connecting({sql_cmd, Command, Timestamp} = Req, From, connecting({sql_cmd, Command, Timestamp} = Req, From,
State) -> State) ->
@ -386,10 +373,11 @@ connecting({sql_cmd, Command, Timestamp} = Req, From,
try p1_queue:in({sql_cmd, Command, From, Timestamp}, try p1_queue:in({sql_cmd, Command, From, Timestamp},
State#state.pending_requests) State#state.pending_requests)
catch error:full -> catch error:full ->
Err = <<"SQL request queue is overfilled">>,
?ERROR_MSG("~s, bouncing all pending requests", [Err]),
Q = p1_queue:dropwhile( Q = p1_queue:dropwhile(
fun({sql_cmd, _, To, _Timestamp}) -> fun({sql_cmd, _, To, TS}) ->
p1_fsm:reply( reply(To, {error, Err}, TS),
To, {error, <<"SQL connection failed">>}),
true true
end, State#state.pending_requests), end, State#state.pending_requests),
p1_queue:in({sql_cmd, Command, From, Timestamp}, Q) p1_queue:in({sql_cmd, Command, From, Timestamp}, Q)
@ -399,16 +387,15 @@ connecting({sql_cmd, Command, Timestamp} = Req, From,
connecting(Request, {Who, _Ref}, State) -> connecting(Request, {Who, _Ref}, State) ->
?WARNING_MSG("Unexpected call ~p from ~p in 'connecting'", ?WARNING_MSG("Unexpected call ~p from ~p in 'connecting'",
[Request, Who]), [Request, Who]),
{reply, {error, badarg}, connecting, State}. {next_state, connecting, State}.
session_established({sql_cmd, Command, Timestamp}, From, session_established({sql_cmd, Command, Timestamp}, From,
State) -> State) ->
run_sql_cmd(Command, From, State, Timestamp); run_sql_cmd(Command, From, State, Timestamp);
session_established(Request, {Who, _Ref}, State) -> session_established(Request, {Who, _Ref}, State) ->
?WARNING_MSG("Unexpected call ~p from ~p in 'session_establ" ?WARNING_MSG("Unexpected call ~p from ~p in 'session_established'",
"ished'",
[Request, Who]), [Request, Who]),
{reply, {error, badarg}, session_established, State}. {next_state, session_established, State}.
session_established({sql_cmd, Command, From, Timestamp}, session_established({sql_cmd, Command, From, Timestamp},
State) -> State) ->
@ -465,17 +452,14 @@ handle_reconnect(Reason, #state{host = Host} = State) ->
{next_state, connecting, State}. {next_state, connecting, State}.
run_sql_cmd(Command, From, State, Timestamp) -> run_sql_cmd(Command, From, State, Timestamp) ->
QueryTimeout = query_timeout(State#state.host), case current_time() >= Timestamp of
case erlang:monotonic_time(millisecond) - Timestamp of true ->
Age when Age < QueryTimeout -> State1 = report_overload(State),
put(?NESTING_KEY, ?TOP_LEVEL_TXN), {next_state, session_established, State1};
put(?STATE_KEY, State), false ->
abort_on_driver_error(outer_op(Command), From); put(?NESTING_KEY, ?TOP_LEVEL_TXN),
Age -> put(?STATE_KEY, State),
?ERROR_MSG("Database was not available or too slow, " abort_on_driver_error(outer_op(Command), From, Timestamp)
"discarding ~p milliseconds old request~n~p~n",
[Age, Command]),
{next_state, session_established, State}
end. end.
%% Only called by handle_call, only handles top level operations. %% Only called by handle_call, only handles top level operations.
@ -620,6 +604,8 @@ sql_query_internal(#sql_query{} = Query) ->
{error, <<"killed">>}; {error, <<"killed">>};
exit:{normal, _} -> exit:{normal, _} ->
{error, <<"terminated unexpectedly">>}; {error, <<"terminated unexpectedly">>};
exit:{shutdown, _} ->
{error, <<"shutdown">>};
?EX_RULE(Class, Reason, Stack) -> ?EX_RULE(Class, Reason, Stack) ->
StackTrace = ?EX_STACK(Stack), StackTrace = ?EX_STACK(Stack),
?ERROR_MSG("Internal error while processing SQL query:~n** ~s", ?ERROR_MSG("Internal error while processing SQL query:~n** ~s",
@ -779,30 +765,42 @@ sql_query_to_iolist(SQLQuery) ->
generic_sql_query_format(SQLQuery). generic_sql_query_format(SQLQuery).
%% Generate the OTP callback return tuple depending on the driver result. %% Generate the OTP callback return tuple depending on the driver result.
abort_on_driver_error({error, abort_on_driver_error({error, <<"query timed out">>} = Reply, From, Timestamp) ->
<<"query timed out">>} = Reply, reply(From, Reply, Timestamp),
From) ->
p1_fsm:reply(From, Reply),
{stop, timeout, get(?STATE_KEY)}; {stop, timeout, get(?STATE_KEY)};
abort_on_driver_error({error, abort_on_driver_error({error, <<"Failed sending data on socket", _/binary>>} = Reply,
<<"Failed sending data on socket", _/binary>>} = Reply, From, Timestamp) ->
From) -> reply(From, Reply, Timestamp),
p1_fsm:reply(From, Reply),
{stop, closed, get(?STATE_KEY)}; {stop, closed, get(?STATE_KEY)};
abort_on_driver_error({error, abort_on_driver_error({error, <<"SQL connection failed">>} = Reply, From, Timestamp) ->
<<"SQL connection failed">>} = Reply, reply(From, Reply, Timestamp),
From) ->
p1_fsm:reply(From, Reply),
{stop, timeout, get(?STATE_KEY)}; {stop, timeout, get(?STATE_KEY)};
abort_on_driver_error({error, abort_on_driver_error({error, <<"Communication link failure">>} = Reply, From, Timestamp) ->
<<"Communication link failure">>} = Reply, reply(From, Reply, Timestamp),
From) ->
p1_fsm:reply(From, Reply),
{stop, closed, get(?STATE_KEY)}; {stop, closed, get(?STATE_KEY)};
abort_on_driver_error(Reply, From) -> abort_on_driver_error(Reply, From, Timestamp) ->
p1_fsm:reply(From, Reply), reply(From, Reply, Timestamp),
{next_state, session_established, get(?STATE_KEY)}. {next_state, session_established, get(?STATE_KEY)}.
%% Log a "pool is overloaded" error, throttled to one message per 30 seconds.
%%
%% The monotonic time (milliseconds, see current_time/0) of the last report
%% is kept in #state.overload_reported; `undefined' means nothing has been
%% reported yet. Returns the state with that field refreshed when a message
%% was logged, and the state unchanged otherwise.
-spec report_overload(state()) -> state().
report_overload(#state{overload_reported = PrevTime} = State) ->
    CurrTime = current_time(),
    case PrevTime =:= undefined
         orelse (CurrTime - PrevTime) > timer:seconds(30) of
        true ->
            ?ERROR_MSG("SQL connection pool is overloaded, "
                       "discarding stale requests", []),
            %% Record the same reading that was compared above instead of
            %% sampling the clock again after logging, so the throttle
            %% window is measured from the moment of the decision.
            State#state{overload_reported = CurrTime};
        false ->
            State
    end.
%% Deliver Reply to the caller From unless the request has already expired.
%%
%% Timestamp is compared against current_time/0: once the clock has reached
%% it, nothing is sent and `ok' is returned; otherwise the result of
%% p1_fsm:reply/2 is returned.
-spec reply({pid(), term()}, term(), integer()) -> term().
reply(From, Reply, Timestamp) ->
    Expired = current_time() >= Timestamp,
    if
        Expired -> ok;
        true -> p1_fsm:reply(From, Reply)
    end.
%% == pure ODBC code %% == pure ODBC code
%% part of init/1 %% part of init/1
@ -1143,6 +1141,9 @@ fsm_limit_opts() ->
query_timeout(LServer) -> query_timeout(LServer) ->
ejabberd_option:sql_query_timeout(LServer). ejabberd_option:sql_query_timeout(LServer).
%% Current reading of the Erlang monotonic clock, in milliseconds.
%% Only differences between two readings are meaningful; the value is not
%% a wall-clock timestamp.
-spec current_time() -> integer().
current_time() ->
    erlang:monotonic_time(millisecond).
%% ***IMPORTANT*** This error format requires extended_errors turned on. %% ***IMPORTANT*** This error format requires extended_errors turned on.
extended_error({"08S01", _, Reason}) -> extended_error({"08S01", _, Reason}) ->
% TCP Provider: The specified network name is no longer available % TCP Provider: The specified network name is no longer available