Add odbc connection robustness (#2428)

This commit is contained in:
Christophe Romain 2018-09-25 16:59:49 +02:00
parent 0d743da595
commit c109d3eff0
1 changed files with 59 additions and 24 deletions

View File

@ -136,7 +136,7 @@ start_link(Host, StartInterval) ->
-spec sql_query(binary(), sql_query()) -> sql_query_result(). -spec sql_query(binary(), sql_query()) -> sql_query_result().
sql_query(Host, Query) -> sql_query(Host, Query) ->
check_error(sql_call(Host, {sql_query, Query}), Query). sql_call(Host, {sql_query, Query}).
%% SQL transaction based on a list of queries %% SQL transaction based on a list of queries
%% This function automatically %% This function automatically
@ -172,10 +172,16 @@ sql_call(Host, Msg) ->
end. end.
keep_alive(Host, PID) -> keep_alive(Host, PID) ->
sync_send_event(PID, case sync_send_event(PID,
{sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, {sql_cmd, {sql_query, ?KEEPALIVE_QUERY},
p1_time_compat:monotonic_time(milli_seconds)}, p1_time_compat:monotonic_time(milli_seconds)},
query_timeout(Host)). query_timeout(Host)) of
{selected,[<<"1">>],[[<<"1">>]]} ->
ok;
_Err ->
?ERROR_MSG("keep alive query failed, closing connection: ~p", [_Err]),
sync_send_event(PID, force_timeout, query_timeout(Host))
end.
sync_send_event(Pid, Msg, Timeout) -> sync_send_event(Pid, Msg, Timeout) ->
try p1_fsm:sync_send_event(Pid, Msg, Timeout) try p1_fsm:sync_send_event(Pid, Msg, Timeout)
@ -389,6 +395,8 @@ session_established(Request, {Who, _Ref}, State) ->
session_established({sql_cmd, Command, From, Timestamp}, session_established({sql_cmd, Command, From, Timestamp},
State) -> State) ->
run_sql_cmd(Command, From, State, Timestamp); run_sql_cmd(Command, From, State, Timestamp);
session_established(force_timeout, State) ->
{stop, timeout, State};
session_established(Event, State) -> session_established(Event, State) ->
?WARNING_MSG("unexpected event in 'session_established': ~p", ?WARNING_MSG("unexpected event in 'session_established': ~p",
[Event]), [Event]),
@ -592,11 +600,7 @@ sql_query_internal(#sql_query{} = Query) ->
[{Class, Reason, ST}]), [{Class, Reason, ST}]),
{error, <<"internal error">>} {error, <<"internal error">>}
end, end,
case Res of check_error(Res, Query);
{error, <<"No SQL-driver information available.">>} ->
{updated, 0};
_Else -> Res
end;
sql_query_internal(F) when is_function(F) -> sql_query_internal(F) when is_function(F) ->
case catch execute_fun(F) of case catch execute_fun(F) of
{'EXIT', Reason} -> {error, Reason}; {'EXIT', Reason} -> {error, Reason};
@ -626,11 +630,7 @@ sql_query_internal(Query) ->
Host = State#state.host, Host = State#state.host,
sqlite_to_odbc(Host, sqlite3:sql_exec(sqlite_db(Host), Query)) sqlite_to_odbc(Host, sqlite3:sql_exec(sqlite_db(Host), Query))
end, end,
case Res of check_error(Res, Query).
{error, <<"No SQL-driver information available.">>} ->
{updated, 0};
_Else -> Res
end.
select_sql_query(Queries, State) -> select_sql_query(Queries, State) ->
select_sql_query( select_sql_query(
@ -749,14 +749,23 @@ sql_query_to_iolist(SQLQuery) ->
generic_sql_query_format(SQLQuery). generic_sql_query_format(SQLQuery).
%% Generate the OTP callback return tuple depending on the driver result. %% Generate the OTP callback return tuple depending on the driver result.
abort_on_driver_error({error, <<"query timed out">>} = abort_on_driver_error({error,
Reply, <<"query timed out">>} = Reply,
From) -> From) ->
p1_fsm:reply(From, Reply), p1_fsm:reply(From, Reply),
{stop, timeout, get(?STATE_KEY)}; {stop, timeout, get(?STATE_KEY)};
abort_on_driver_error({error, abort_on_driver_error({error,
<<"Failed sending data on socket", _/binary>>} = <<"Failed sending data on socket", _/binary>>} = Reply,
Reply, From) ->
p1_fsm:reply(From, Reply),
{stop, closed, get(?STATE_KEY)};
abort_on_driver_error({error,
<<"SQL connection failed">>} = Reply,
From) ->
p1_fsm:reply(From, Reply),
{stop, timeout, get(?STATE_KEY)};
abort_on_driver_error({error,
<<"Communication link failure">>} = Reply,
From) -> From) ->
p1_fsm:reply(From, Reply), p1_fsm:reply(From, Reply),
{stop, closed, get(?STATE_KEY)}; {stop, closed, get(?STATE_KEY)};
@ -772,6 +781,7 @@ odbc_connect(SQLServer, Timeout) ->
ejabberd:start_app(odbc), ejabberd:start_app(odbc),
odbc:connect(binary_to_list(SQLServer), odbc:connect(binary_to_list(SQLServer),
[{scrollable_cursors, off}, [{scrollable_cursors, off},
{extended_errors, on},
{tuple_row, off}, {tuple_row, off},
{timeout, Timeout}, {timeout, Timeout},
{binary_strings, on}]). {binary_strings, on}]).
@ -1098,20 +1108,45 @@ query_timeout(LServer) ->
timer:seconds( timer:seconds(
ejabberd_config:get_option({sql_query_timeout, LServer}, 60)). ejabberd_config:get_option({sql_query_timeout, LServer}, 60)).
%% ***IMPORTANT*** This error format requires extended_errors turned on.
extended_error({"08S01", _, Reason}) ->
% TCP Provider: The specified network name is no longer available
?DEBUG("ODBC Link Failure: ~s", [Reason]),
<<"Communication link failure">>;
extended_error({"08001", _, Reason}) ->
% Login timeout expired
?DEBUG("ODBC Connect Timeout: ~s", [Reason]),
<<"SQL connection failed">>;
extended_error({"IMC01", _, Reason}) ->
% The connection is broken and recovery is not possible
?DEBUG("ODBC Link Failure: ~s", [Reason]),
<<"Communication link failure">>;
extended_error({"IMC06", _, Reason}) ->
% The connection is broken and recovery is not possible
?DEBUG("ODBC Link Failure: ~s", [Reason]),
<<"Communication link failure">>;
extended_error({Code, _, Reason}) ->
?DEBUG("ODBC Error ~s: ~s", [Code, Reason]),
iolist_to_binary(Reason);
extended_error(Error) ->
Error.
check_error({error, Why} = Err, _Query) when Why == killed -> check_error({error, Why} = Err, _Query) when Why == killed ->
Err; Err;
check_error({error, Why} = Err, #sql_query{} = Query) -> check_error({error, Why}, #sql_query{} = Query) ->
Err = extended_error(Why),
?ERROR_MSG("SQL query '~s' at ~p failed: ~p", ?ERROR_MSG("SQL query '~s' at ~p failed: ~p",
[Query#sql_query.hash, Query#sql_query.loc, Why]), [Query#sql_query.hash, Query#sql_query.loc, Err]),
Err; {error, Err};
check_error({error, Why} = Err, Query) -> check_error({error, Why}, Query) ->
Err = extended_error(Why),
case catch iolist_to_binary(Query) of case catch iolist_to_binary(Query) of
SQuery when is_binary(SQuery) -> SQuery when is_binary(SQuery) ->
?ERROR_MSG("SQL query '~s' failed: ~p", [SQuery, Why]); ?ERROR_MSG("SQL query '~s' failed: ~p", [SQuery, Err]);
_ -> _ ->
?ERROR_MSG("SQL query ~p failed: ~p", [Query, Why]) ?ERROR_MSG("SQL query ~p failed: ~p", [Query, Err])
end, end,
Err; {error, Err};
check_error(Result, _Query) -> check_error(Result, _Query) ->
Result. Result.