25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-11-26 16:26:24 +01:00

Discard queued requests that are too old (the caller has already got a timeout).

(cherry picked from commit 0e5b930b22)
This commit is contained in:
Jerome Sautret 2010-06-11 16:35:45 +02:00
parent 2028698336
commit 1142cdad1b

View File

@ -119,14 +119,14 @@ sql_call(Host, Msg) ->
case get(?STATE_KEY) of
undefined ->
?GEN_FSM:sync_send_event(ejabberd_odbc_sup:get_random_pid(Host),
{sql_cmd, Msg}, ?TRANSACTION_TIMEOUT);
{sql_cmd, Msg, now()}, ?TRANSACTION_TIMEOUT);
_State ->
nested_op(Msg)
end.
% perform a harmless query on all opened connexions to avoid connexion close.
keep_alive(PID) ->
?GEN_FSM:sync_send_event(PID, {sql_cmd, {sql_query, ?KEEPALIVE_QUERY}},
?GEN_FSM:sync_send_event(PID, {sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, now()},
?KEEPALIVE_TIMEOUT).
%% This function is intended to be used from inside an sql_transaction:
@ -219,22 +219,22 @@ connecting(Event, State) ->
?WARNING_MSG("unexpected event in 'connecting': ~p", [Event]),
{next_state, connecting, State}.
connecting({sql_cmd, {sql_query, ?KEEPALIVE_QUERY}}, From, State) ->
connecting({sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, _Timestamp}, From, State) ->
?GEN_FSM:reply(From, {error, "SQL connection failed"}),
{next_state, connecting, State};
connecting({sql_cmd, Command} = Req, From, State) ->
?DEBUG("queueing pending request while connecting:~n\t~p", [Req]),
connecting({sql_cmd, Command, Timestamp} = Req, From, State) ->
?DEBUG("queuing pending request while connecting:~n\t~p", [Req]),
{Len, PendingRequests} = State#state.pending_requests,
NewPendingRequests =
if Len < State#state.max_pending_requests_len ->
{Len + 1, queue:in({sql_cmd, Command, From}, PendingRequests)};
{Len + 1, queue:in({sql_cmd, Command, From, Timestamp}, PendingRequests)};
true ->
lists:foreach(
fun({sql_cmd, _, To}) ->
fun({sql_cmd, _, To, _Timestamp}) ->
?GEN_FSM:reply(
To, {error, "SQL connection failed"})
end, queue:to_list(PendingRequests)),
{1, queue:from_list([{sql_cmd, Command, From}])}
{1, queue:from_list([{sql_cmd, Command, From, Timestamp}])}
end,
{next_state, connecting,
State#state{pending_requests = NewPendingRequests}};
@ -243,19 +243,15 @@ connecting(Request, {Who, _Ref}, State) ->
[Request, Who]),
{reply, {error, badarg}, connecting, State}.
session_established({sql_cmd, Command}, From, State) ->
put(?NESTING_KEY, ?TOP_LEVEL_TXN),
put(?STATE_KEY, State),
abort_on_driver_error(outer_op(Command), From);
session_established({sql_cmd, Command, Timestamp}, From, State) ->
run_sql_cmd(Command, From, State, Timestamp);
session_established(Request, {Who, _Ref}, State) ->
?WARNING_MSG("unexpected call ~p from ~p in 'session_established'",
[Request, Who]),
{reply, {error, badarg}, session_established, State}.
session_established({sql_cmd, Command, From}, State) ->
put(?NESTING_KEY, ?TOP_LEVEL_TXN),
put(?STATE_KEY, State),
abort_on_driver_error(outer_op(Command), From);
session_established({sql_cmd, Command, From, Timestamp}, State) ->
run_sql_cmd(Command, From, State, Timestamp);
session_established(Event, State) ->
?WARNING_MSG("unexpected event in 'session_established': ~p", [Event]),
{next_state, session_established, State}.
@ -301,6 +297,19 @@ print_state(State) ->
%%% Internal functions
%%%----------------------------------------------------------------------
run_sql_cmd(Command, From, State, Timestamp) ->
case timer:now_diff(now(), Timestamp) div 1000 of
Age when Age < ?TRANSACTION_TIMEOUT ->
put(?NESTING_KEY, ?TOP_LEVEL_TXN),
put(?STATE_KEY, State),
abort_on_driver_error(outer_op(Command), From);
Age ->
?ERROR_MSG("Database was not available or too slow,"
" discarding ~p milliseconds old request~n~p~n",
[Age, Command]),
{next_state, session_established, State}
end.
%% Only called by handle_call, only handles top level operations.
%% @spec outer_op(Op) -> {error, Reason} | {aborted, Reason} | {atomic, Result}
outer_op({sql_query, Query}) ->