diff --git a/src/ejabberd_redis.erl b/src/ejabberd_redis.erl index 8eeb19607..740b552f4 100644 --- a/src/ejabberd_redis.erl +++ b/src/ejabberd_redis.erl @@ -55,7 +55,11 @@ num :: pos_integer(), pending_q :: p1_queue:queue()}). --type redis_error() :: {error, binary() | atom()}. +-type redis_error() :: {error, binary() | timeout | disconnected}. +-type redis_reply() :: binary() | [binary()]. +-type redis_command() :: [binary()]. +-type redis_pipeline() :: [redis_command()]. +-type state() :: #state{}. %%%=================================================================== %%% API @@ -73,13 +77,15 @@ get_connection(I) -> iolist_to_binary( [atom_to_list(?MODULE), "_connection_", integer_to_list(I)])). +-spec q(redis_command()) -> {ok, redis_reply()} | redis_error(). q(Command) -> call(get_worker(), {q, Command}, ?MAX_RETRIES). +-spec qp(redis_pipeline()) -> {ok, [redis_reply()]} | redis_error(). qp(Pipeline) -> call(get_worker(), {qp, Pipeline}, ?MAX_RETRIES). --spec multi(fun(() -> any())) -> {ok, list()} | redis_error(). +-spec multi(fun(() -> any())) -> {ok, [redis_reply()]} | redis_error(). multi(F) -> case erlang:get(?TR_STACK) of undefined -> @@ -98,7 +104,7 @@ multi(F) -> erlang:raise(E, R, erlang:get_stacktrace()) end; _ -> - {error, nested_transaction} + erlang:error(nested_transaction) end. -spec get(iodata()) -> {ok, undefined | binary()} | redis_error(). @@ -107,7 +113,7 @@ get(Key) -> undefined -> q([<<"GET">>, Key]); _ -> - {error, transaction_unsupported} + erlang:error(transaction_unsupported) end. -spec set(iodata(), iodata()) -> ok | redis_error() | queued. @@ -174,7 +180,7 @@ smembers(Set) -> undefined -> q([<<"SMEMBERS">>, Set]); _ -> - {error, transaction_unsupported} + erlang:error(transaction_unsupported) end. -spec sismember(iodata(), iodata()) -> boolean() | redis_error(). @@ -186,7 +192,7 @@ sismember(Set, Member) -> {error, _} = Err -> Err end; _ -> - {error, transaction_unsupported} + erlang:error(transaction_unsupported) end. -spec scard(iodata()) -> {ok, non_neg_integer()} | redis_error(). @@ -200,7 +206,7 @@ scard(Set) -> Err end; _ -> - {error, transaction_unsupported} + erlang:error(transaction_unsupported) end. -spec hget(iodata(), iodata()) -> {ok, undefined | binary()} | redis_error(). @@ -209,7 +215,7 @@ hget(Key, Field) -> undefined -> q([<<"HGET">>, Key, Field]); _ -> - {error, transaction_unsupported} + erlang:error(transaction_unsupported) end. -spec hset(iodata(), iodata(), iodata()) -> {ok, boolean()} | redis_error() | queued. @@ -249,7 +255,7 @@ hgetall(Key) -> {error, _} = Err -> Err end; _ -> - {error, transaction_unsupported} + erlang:error(transaction_unsupported) end. -spec hlen(iodata()) -> {ok, non_neg_integer()} | redis_error(). @@ -261,7 +267,7 @@ hlen(Key) -> {error, _} = Err -> Err end; _ -> - {error, transaction_unsupported} + erlang:error(transaction_unsupported) end. -spec hkeys(iodata()) -> {ok, [binary()]} | redis_error(). @@ -270,7 +276,7 @@ hkeys(Key) -> undefined -> q([<<"HKEYS">>, Key]); _ -> - {error, transaction_unsupported} + erlang:error(transaction_unsupported) end. %%%=================================================================== @@ -340,6 +346,7 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== +-spec connect(state()) -> {ok, pid()} | {error, any()}. connect(#state{num = Num}) -> Server = ejabberd_config:get_option(redis_server, fun iolist_to_list/1, @@ -381,9 +388,12 @@ connect(#state{num = Num}) -> {error, Reason} end. --spec call({atom(), atom()}, {q | qp, list()}, integer()) -> - {error, disconnected | timeout | binary()} | {ok, iodata()}. +-spec call({atom(), atom()}, {q, redis_command()}, integer()) -> + {ok, redis_reply()} | redis_error(); + ({atom(), atom()}, {qp, redis_pipeline()}, integer()) -> + {ok, [redis_reply()]} | redis_error(). call({Conn, Parent}, {F, Cmd}, Retries) -> + ?DEBUG("redis query: ~p", [Cmd]), Res = try eredis:F(Conn, Cmd, ?CALL_TIMEOUT) of {error, Reason} when is_atom(Reason) -> try exit(whereis(Conn), kill) catch _:_ -> ok end, @@ -405,11 +415,14 @@ call({Conn, Parent}, {F, Cmd}, Retries) -> Res end. +-spec get_worker() -> {atom(), atom()}. get_worker() -> Time = p1_time_compat:system_time(), I = erlang:phash2(Time, ejabberd_redis_sup:get_pool_size()) + 1, {get_connection(I), get_proc(I)}. +-spec get_result([{error, atom() | binary()} | {ok, iodata()}]) -> + {ok, [redis_reply()]} | {error, binary()}. get_result([{error, _} = Err|_]) -> Err; get_result([{ok, _} = OK]) -> @@ -422,9 +435,11 @@ tr_enq(Cmd, Stack) -> erlang:put(?TR_STACK, [Cmd|Stack]), queued. +-spec decode_pairs([binary()]) -> [{binary(), binary()}]. decode_pairs(Pairs) -> decode_pairs(Pairs, []). +-spec decode_pairs([binary()], [{binary(), binary()}]) -> [{binary(), binary()}]. decode_pairs([Field, Val|Pairs], Acc) -> decode_pairs(Pairs, [{Field, Val}|Acc]); decode_pairs([], Acc) -> @@ -433,15 +448,18 @@ decode_pairs([], Acc) -> dec_bool(<<$1>>) -> true; dec_bool(<<$0>>) -> false. +-spec reply(T) -> {ok, T} | queued. reply(Val) -> case erlang:get(?TR_STACK) of undefined -> {ok, Val}; _ -> queued end. +-spec iolist_to_list(iodata()) -> string(). iolist_to_list(IOList) -> binary_to_list(iolist_to_binary(IOList)). +-spec max_fsm_queue() -> pos_integer(). max_fsm_queue() -> proplists:get_value(max_queue, fsm_limit_opts(), ?DEFAULT_MAX_QUEUE). @@ -458,6 +476,7 @@ get_queue_type() -> Type end. +-spec flush_queue(p1_queue:queue()) -> p1_queue:queue(). flush_queue(Q) -> CurrTime = p1_time_compat:monotonic_time(milli_seconds), p1_queue:dropwhile( @@ -470,6 +489,7 @@ flush_queue(Q) -> true end, Q). +-spec clean_queue(p1_queue:queue(), integer()) -> p1_queue:queue(). clean_queue(Q, CurrTime) -> Q1 = p1_queue:dropwhile( fun({_From, Time}) ->