From 0e5b930b22e57da971cd1e190ec47482d7f9f684 Mon Sep 17 00:00:00 2001 From: Jerome Sautret Date: Fri, 11 Jun 2010 16:35:45 +0200 Subject: [PATCH] Discard queued requests that are too old (the caller has already got a timeout). --- src/odbc/ejabberd_odbc.erl | 45 +++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/src/odbc/ejabberd_odbc.erl b/src/odbc/ejabberd_odbc.erl index 390cc44a6..d7cdd0371 100644 --- a/src/odbc/ejabberd_odbc.erl +++ b/src/odbc/ejabberd_odbc.erl @@ -47,7 +47,7 @@ handle_sync_event/4, handle_info/3, terminate/3, - print_state/1, + print_state/1, code_change/4]). %% gen_fsm states @@ -119,14 +119,14 @@ sql_call(Host, Msg) -> case get(?STATE_KEY) of undefined -> ?GEN_FSM:sync_send_event(ejabberd_odbc_sup:get_random_pid(Host), - {sql_cmd, Msg}, ?TRANSACTION_TIMEOUT); + {sql_cmd, Msg, now()}, ?TRANSACTION_TIMEOUT); _State -> nested_op(Msg) end. % perform a harmless query on all opened connexions to avoid connexion close. keep_alive(PID) -> - ?GEN_FSM:sync_send_event(PID, {sql_cmd, {sql_query, ?KEEPALIVE_QUERY}}, + ?GEN_FSM:sync_send_event(PID, {sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, now()}, ?KEEPALIVE_TIMEOUT). %% This function is intended to be used from inside an sql_transaction: @@ -218,22 +218,22 @@ connecting(Event, State) -> ?WARNING_MSG("unexpected event in 'connecting': ~p", [Event]), {next_state, connecting, State}. -connecting({sql_cmd, {sql_query, ?KEEPALIVE_QUERY}}, From, State) -> +connecting({sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, _Timestamp}, From, State) -> ?GEN_FSM:reply(From, {error, "SQL connection failed"}), {next_state, connecting, State}; -connecting({sql_cmd, Command} = Req, From, State) -> - ?DEBUG("queueing pending request while connecting:~n\t~p", [Req]), +connecting({sql_cmd, Command, Timestamp} = Req, From, State) -> + ?DEBUG("queuing pending request while connecting:~n\t~p", [Req]), {Len, PendingRequests} = State#state.pending_requests, NewPendingRequests = if Len < State#state.max_pending_requests_len -> - {Len + 1, queue:in({sql_cmd, Command, From}, PendingRequests)}; + {Len + 1, queue:in({sql_cmd, Command, From, Timestamp}, PendingRequests)}; true -> lists:foreach( - fun({sql_cmd, _, To}) -> + fun({sql_cmd, _, To, _Timestamp}) -> ?GEN_FSM:reply( To, {error, "SQL connection failed"}) end, queue:to_list(PendingRequests)), - {1, queue:from_list([{sql_cmd, Command, From}])} + {1, queue:from_list([{sql_cmd, Command, From, Timestamp}])} end, {next_state, connecting, State#state{pending_requests = NewPendingRequests}}; @@ -242,19 +242,15 @@ connecting(Request, {Who, _Ref}, State) -> [Request, Who]), {reply, {error, badarg}, connecting, State}. -session_established({sql_cmd, Command}, From, State) -> - put(?NESTING_KEY, ?TOP_LEVEL_TXN), - put(?STATE_KEY, State), - abort_on_driver_error(outer_op(Command), From); +session_established({sql_cmd, Command, Timestamp}, From, State) -> + run_sql_cmd(Command, From, State, Timestamp); session_established(Request, {Who, _Ref}, State) -> ?WARNING_MSG("unexpected call ~p from ~p in 'session_established'", [Request, Who]), {reply, {error, badarg}, session_established, State}. -session_established({sql_cmd, Command, From}, State) -> - put(?NESTING_KEY, ?TOP_LEVEL_TXN), - put(?STATE_KEY, State), - abort_on_driver_error(outer_op(Command), From); +session_established({sql_cmd, Command, From, Timestamp}, State) -> + run_sql_cmd(Command, From, State, Timestamp); session_established(Event, State) -> ?WARNING_MSG("unexpected event in 'session_established': ~p", [Event]), {next_state, session_established, State}. @@ -299,6 +295,19 @@ print_state(State) -> %%% Internal functions %%%---------------------------------------------------------------------- +run_sql_cmd(Command, From, State, Timestamp) -> + case timer:now_diff(now(), Timestamp) div 1000 of + Age when Age < ?TRANSACTION_TIMEOUT -> + put(?NESTING_KEY, ?TOP_LEVEL_TXN), + put(?STATE_KEY, State), + abort_on_driver_error(outer_op(Command), From); + Age -> + ?ERROR_MSG("Database was not available or too slow," + " discarding ~p milliseconds old request~n~p~n", + [Age, Command]), + {next_state, session_established, State} + end. + %% Only called by handle_call, only handles top level operations. %% @spec outer_op(Op) -> {error, Reason} | {aborted, Reason} | {atomic, Result} outer_op({sql_query, Query}) -> @@ -547,7 +556,7 @@ max_fsm_queue() -> _ -> undefined end. - + fsm_limit_opts() -> case max_fsm_queue() of N when is_integer(N) ->