mirror of
https://github.com/processone/ejabberd.git
synced 2024-11-22 16:20:52 +01:00
8faa6afa67
Since we now use Lua scripting for cleaning up c2s sessions the minimum supported Redis version is 3.2.0 or above because we need to work correctly with Redis replication mechanism. ****** BACKWARD INCOMPATIBILITY WARNING ******* ** THIS SHOULD BE ADDED TO THE RELEASE NOTES ** *** PACKAGE MAINTAINERS SHOULD BE INFORMED *** ***********************************************
631 lines
18 KiB
Erlang
631 lines
18 KiB
Erlang
%%%-------------------------------------------------------------------
|
|
%%% File : ejabberd_redis.erl
|
|
%%% Author : Evgeny Khramtsov <ekhramtsov@process-one.net>
|
|
%%% Created : 8 May 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net>
|
|
%%%
|
|
%%%
|
|
%%% ejabberd, Copyright (C) 2002-2018 ProcessOne
|
|
%%%
|
|
%%% This program is free software; you can redistribute it and/or
|
|
%%% modify it under the terms of the GNU General Public License as
|
|
%%% published by the Free Software Foundation; either version 2 of the
|
|
%%% License, or (at your option) any later version.
|
|
%%%
|
|
%%% This program is distributed in the hope that it will be useful,
|
|
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
%%% General Public License for more details.
|
|
%%%
|
|
%%% You should have received a copy of the GNU General Public License along
|
|
%%% with this program; if not, write to the Free Software Foundation, Inc.,
|
|
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
%%%
|
|
%%%----------------------------------------------------------------------
|
|
|
|
-module(ejabberd_redis).

%% GEN_SERVER can be pre-defined at compile time; it falls back to the
%% stock gen_server behaviour otherwise.
-ifndef(GEN_SERVER).
-define(GEN_SERVER, gen_server).
-endif.
-behaviour(?GEN_SERVER).

%% This module defines its own get/1 (the Redis GET command), so the
%% auto-imported BIFs are disabled and always called as erlang:get/1 etc.
-compile({no_auto_import, [get/1, put/2]}).

%% API
-export([start_link/1, get_proc/1, get_connection/1, q/1, qp/1, format_error/1]).
%% Commands
-export([multi/1, get/1, set/2, del/1, info/1,
         sadd/2, srem/2, smembers/1, sismember/2, scard/1,
         hget/2, hset/3, hdel/2, hlen/1, hgetall/1, hkeys/1,
         subscribe/1, publish/2, script_load/1, evalsha/3]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-define(SERVER, ?MODULE).
-define(PROCNAME, 'ejabberd_redis_client').
%% Process dictionary key holding the commands queued by multi/1.
-define(TR_STACK, redis_transaction_stack).
%% Fallback limit for the queue of callers waiting for a connection.
-define(DEFAULT_MAX_QUEUE, 10000).
%% Number of reconnect-and-retry attempts made by q/1 and qp/1.
-define(MAX_RETRIES, 1).
-define(CALL_TIMEOUT, 60*1000). %% 60 seconds
|
|
|
|
-include("logger.hrl").
|
|
|
|
%% State of a single pool worker:
%%   connection    - pid of the Redis client process, or undefined while
%%                   disconnected
%%   num           - index of this worker within the pool
%%   subscriptions - channel => list of subscriber pids
%%   pending_q     - callers waiting for the connection to come back,
%%                   stored as {From, EnqueueTime}
-record(state, {connection :: pid() | undefined,
                num :: pos_integer(),
                subscriptions = #{} :: map(),
                pending_q :: p1_queue:queue()}).

-type error_reason() :: binary() | timeout | disconnected | overloaded.
-type redis_error() :: {error, error_reason()}.
-type redis_reply() :: binary() | [binary()].
-type redis_command() :: [binary()].
-type redis_pipeline() :: [redis_command()].
%% Section names accepted by info/1.
-type redis_info() :: server | clients | memory | persistence |
                      stats | replication | cpu | commandstats |
                      cluster | keyspace | default | all.
-type state() :: #state{}.

-export_type([error_reason/0]).
|
|
|
|
%%%===================================================================
|
|
%%% API
|
|
%%%===================================================================
|
|
%% @doc Starts pool worker number I, registered locally under the name
%% produced by get_proc/1.
start_link(I) ->
    Name = get_proc(I),
    ?GEN_SERVER:start_link({local, Name}, ?MODULE, [I], []).
|
|
|
|
%% @doc Returns the registered name of pool worker I, i.e. the module name
%% followed by "_" and the worker number.
get_proc(I) ->
    Name = [atom_to_list(?MODULE), $_, integer_to_list(I)],
    misc:binary_to_atom(iolist_to_binary(Name)).
|
|
|
|
%% @doc Returns the registered name of the Redis client process owned by
%% pool worker I, i.e. the module name followed by "_connection_" and the
%% worker number.
get_connection(I) ->
    Name = [atom_to_list(?MODULE), "_connection_", integer_to_list(I)],
    misc:binary_to_atom(iolist_to_binary(Name)).
|
|
|
|
%% @doc Sends a single command over one of the pool connections picked by
%% get_rnd_id/0, retrying up to ?MAX_RETRIES times after a disconnect.
-spec q(redis_command()) -> {ok, redis_reply()} | redis_error().
q(Command) ->
    Worker = get_rnd_id(),
    call(Worker, {q, Command}, ?MAX_RETRIES).
|
|
|
|
%% @doc Sends a pipeline of commands over one of the pool connections picked
%% by get_rnd_id/0, retrying up to ?MAX_RETRIES times after a disconnect.
%%
%% On success the result is a list with one {ok, Reply} | {error, Reason}
%% entry per pipelined command (this is the shape multi/1 hands to
%% get_result/1); a single {error, Reason} tuple is returned when the whole
%% request failed.
-spec qp(redis_pipeline()) -> [{ok, redis_reply()} | redis_error()] | redis_error().
qp(Pipeline) ->
    call(get_rnd_id(), {qp, Pipeline}, ?MAX_RETRIES).
|
|
|
|
%% @doc Runs F inside a Redis MULTI/EXEC transaction.
%%
%% While F executes, the write commands of this module (set/2, del/1,
%% sadd/2, ...) do not talk to Redis; they push their command onto a stack
%% kept in the process dictionary and return `queued'. Once F returns, the
%% collected commands are wrapped in MULTI ... EXEC and sent as a single
%% pipeline. The result is the first error found in the pipeline replies,
%% or otherwise the reply of the final (EXEC) command.
%%
%% Raises `nested_transaction' when called from within another multi/1.
%% Any exception raised by F is propagated unchanged after the transaction
%% stack has been cleared.
-spec multi(fun(() -> any())) -> {ok, [redis_reply()]} | redis_error().
multi(F) ->
    case erlang:get(?TR_STACK) of
        undefined ->
            erlang:put(?TR_STACK, []),
            %% try ... after guarantees the stack is erased on both the
            %% success and the failure path, and re-raises any exception
            %% with its original class, reason and stacktrace without
            %% relying on the deprecated erlang:get_stacktrace/0.
            Stack = try
                        _ = F(),
                        erlang:get(?TR_STACK)
                    after
                        erlang:erase(?TR_STACK)
                    end,
            %% Stack holds the commands newest-first, hence EXEC is pushed
            %% on top before reversing.
            Command = [["MULTI"] | lists:reverse([["EXEC"] | Stack])],
            case qp(Command) of
                {error, _} = Err -> Err;
                Result -> get_result(Result)
            end;
        _ ->
            erlang:error(nested_transaction)
    end.
|
|
|
|
%% @doc Renders an error reason as text: atoms are converted with
%% misc:atom_to_binary/1, anything else is returned untouched.
-spec format_error(atom() | binary()) -> binary().
format_error(Reason) ->
    case is_atom(Reason) of
        true -> format_error(misc:atom_to_binary(Reason));
        false -> Reason
    end.
|
|
|
|
%%%===================================================================
|
|
%%% Redis commands API
|
|
%%%===================================================================
|
|
%% @doc Redis GET. Not available inside multi/1: raises
%% `transaction_unsupported' there.
-spec get(iodata()) -> {ok, undefined | binary()} | redis_error().
get(Key) ->
    undefined =:= erlang:get(?TR_STACK)
        orelse erlang:error(transaction_unsupported),
    q([<<"GET">>, Key]).
|
|
|
|
%% @doc Redis SET. Inside multi/1 the command is only queued and `queued'
%% is returned; otherwise it is executed immediately.
-spec set(iodata(), iodata()) -> ok | redis_error() | queued.
set(Key, Val) ->
    Request = [<<"SET">>, Key, Val],
    case erlang:get(?TR_STACK) of
        undefined ->
            case q(Request) of
                {ok, <<"OK">>} -> ok;
                {error, _} = Failure -> Failure
            end;
        Pending ->
            tr_enq(Request, Pending)
    end.
|
|
|
|
%% @doc Redis DEL for a list of keys. An empty list is answered locally
%% without contacting Redis. Inside multi/1 the command is queued.
-spec del(list()) -> {ok, non_neg_integer()} | redis_error() | queued.
del([]) ->
    reply(0);
del(Keys) ->
    Request = [<<"DEL">> | Keys],
    case erlang:get(?TR_STACK) of
        undefined ->
            case q(Request) of
                {ok, Count} -> {ok, binary_to_integer(Count)};
                {error, _} = Failure -> Failure
            end;
        Pending ->
            tr_enq(Request, Pending)
    end.
|
|
|
|
%% @doc Redis SADD. An empty member list is answered locally without
%% contacting Redis. Inside multi/1 the command is queued.
-spec sadd(iodata(), list()) -> {ok, non_neg_integer()} | redis_error() | queued.
sadd(_Set, []) ->
    reply(0);
sadd(Set, Members) ->
    Request = [<<"SADD">>, Set | Members],
    case erlang:get(?TR_STACK) of
        undefined ->
            case q(Request) of
                {ok, Count} -> {ok, binary_to_integer(Count)};
                {error, _} = Failure -> Failure
            end;
        Pending ->
            tr_enq(Request, Pending)
    end.
|
|
|
|
%% @doc Redis SREM. An empty member list is answered locally without
%% contacting Redis. Inside multi/1 the command is queued.
-spec srem(iodata(), list()) -> {ok, non_neg_integer()} | redis_error() | queued.
srem(_Set, []) ->
    reply(0);
srem(Set, Members) ->
    Request = [<<"SREM">>, Set | Members],
    case erlang:get(?TR_STACK) of
        undefined ->
            case q(Request) of
                {ok, Count} -> {ok, binary_to_integer(Count)};
                {error, _} = Failure -> Failure
            end;
        Pending ->
            tr_enq(Request, Pending)
    end.
|
|
|
|
%% @doc Redis SMEMBERS. Not available inside multi/1: raises
%% `transaction_unsupported' there.
-spec smembers(iodata()) -> {ok, [binary()]} | redis_error().
smembers(Set) ->
    undefined =:= erlang:get(?TR_STACK)
        orelse erlang:error(transaction_unsupported),
    q([<<"SMEMBERS">>, Set]).
|
|
|
|
%% @doc Redis SISMEMBER. Returns {ok, true} when Member belongs to Set and
%% {ok, false} otherwise. Not available inside multi/1: raises
%% `transaction_unsupported' there.
%%
%% The spec wraps the boolean in an ok-tuple to match what the code returns.
-spec sismember(iodata(), iodata()) -> {ok, boolean()} | redis_error().
sismember(Set, Member) ->
    case erlang:get(?TR_STACK) of
        undefined ->
            case q([<<"SISMEMBER">>, Set, Member]) of
                {ok, Flag} -> {ok, dec_bool(Flag)};
                {error, _} = Err -> Err
            end;
        _ ->
            erlang:error(transaction_unsupported)
    end.
|
|
|
|
%% @doc Redis SCARD, with the reply converted to an integer. Not available
%% inside multi/1: raises `transaction_unsupported' there.
-spec scard(iodata()) -> {ok, non_neg_integer()} | redis_error().
scard(Set) ->
    undefined =:= erlang:get(?TR_STACK)
        orelse erlang:error(transaction_unsupported),
    case q([<<"SCARD">>, Set]) of
        {ok, Count} -> {ok, binary_to_integer(Count)};
        {error, _} = Failure -> Failure
    end.
|
|
|
|
%% @doc Redis HGET. Not available inside multi/1: raises
%% `transaction_unsupported' there.
-spec hget(iodata(), iodata()) -> {ok, undefined | binary()} | redis_error().
hget(Key, Field) ->
    undefined =:= erlang:get(?TR_STACK)
        orelse erlang:error(transaction_unsupported),
    q([<<"HGET">>, Key, Field]).
|
|
|
|
%% @doc Redis HSET, with the "1"/"0" reply decoded to a boolean. Inside
%% multi/1 the command is only queued and `queued' is returned.
-spec hset(iodata(), iodata(), iodata()) -> {ok, boolean()} | redis_error() | queued.
hset(Key, Field, Val) ->
    Request = [<<"HSET">>, Key, Field, Val],
    case erlang:get(?TR_STACK) of
        undefined ->
            case q(Request) of
                {ok, Created} -> {ok, dec_bool(Created)};
                {error, _} = Failure -> Failure
            end;
        Pending ->
            tr_enq(Request, Pending)
    end.
|
|
|
|
%% @doc Redis HDEL. An empty field list is answered locally without
%% contacting Redis. Inside multi/1 the command is queued.
-spec hdel(iodata(), list()) -> {ok, non_neg_integer()} | redis_error() | queued.
hdel(_Key, []) ->
    reply(0);
hdel(Key, Fields) ->
    Request = [<<"HDEL">>, Key | Fields],
    case erlang:get(?TR_STACK) of
        undefined ->
            case q(Request) of
                {ok, Count} -> {ok, binary_to_integer(Count)};
                {error, _} = Failure -> Failure
            end;
        Pending ->
            tr_enq(Request, Pending)
    end.
|
|
|
|
%% @doc Redis HGETALL, with the flat reply list folded into
%% {Field, Value} pairs. Not available inside multi/1: raises
%% `transaction_unsupported' there.
-spec hgetall(iodata()) -> {ok, [{binary(), binary()}]} | redis_error().
hgetall(Key) ->
    undefined =:= erlang:get(?TR_STACK)
        orelse erlang:error(transaction_unsupported),
    case q([<<"HGETALL">>, Key]) of
        {ok, Flat} -> {ok, decode_pairs(Flat)};
        {error, _} = Failure -> Failure
    end.
|
|
|
|
%% @doc Redis HLEN, with the reply converted to an integer. Not available
%% inside multi/1: raises `transaction_unsupported' there.
-spec hlen(iodata()) -> {ok, non_neg_integer()} | redis_error().
hlen(Key) ->
    undefined =:= erlang:get(?TR_STACK)
        orelse erlang:error(transaction_unsupported),
    case q([<<"HLEN">>, Key]) of
        {ok, Count} -> {ok, binary_to_integer(Count)};
        {error, _} = Failure -> Failure
    end.
|
|
|
|
%% @doc Redis HKEYS. Not available inside multi/1: raises
%% `transaction_unsupported' there.
-spec hkeys(iodata()) -> {ok, [binary()]} | redis_error().
hkeys(Key) ->
    undefined =:= erlang:get(?TR_STACK)
        orelse erlang:error(transaction_unsupported),
    q([<<"HKEYS">>, Key]).
|
|
|
|
%% @doc Subscribes the calling process to Channels through pool worker 1.
%% Published data is later delivered to the caller as
%% {redis_message, Channel, Data}. A failed call to the worker is mapped
%% to {error, timeout} or {error, disconnected}.
-spec subscribe([binary()]) -> ok | redis_error().
subscribe(Channels) ->
    Request = {subscribe, self(), Channels},
    try
        ?GEN_SERVER:call(get_proc(1), Request, ?CALL_TIMEOUT)
    catch
        exit:{timeout, {?GEN_SERVER, call, _}} ->
            {error, timeout};
        exit:{_, {?GEN_SERVER, call, _}} ->
            {error, disconnected}
    end.
|
|
|
|
%% @doc Redis PUBLISH, with the reply converted to an integer. Inside
%% multi/1 the command is only queued and `queued' is returned.
-spec publish(iodata(), iodata()) -> {ok, non_neg_integer()} | redis_error() | queued.
publish(Channel, Data) ->
    Request = [<<"PUBLISH">>, Channel, Data],
    case erlang:get(?TR_STACK) of
        undefined ->
            case q(Request) of
                {ok, Receivers} -> {ok, binary_to_integer(Receivers)};
                {error, _} = Failure -> Failure
            end;
        Pending ->
            tr_enq(Request, Pending)
    end.
|
|
|
|
%% @doc Redis SCRIPT LOAD. Not available inside multi/1: raises
%% `transaction_unsupported' there.
-spec script_load(iodata()) -> {ok, binary()} | redis_error().
script_load(Data) ->
    undefined =:= erlang:get(?TR_STACK)
        orelse erlang:error(transaction_unsupported),
    q([<<"SCRIPT">>, <<"LOAD">>, Data]).
|
|
|
|
%% @doc Redis EVALSHA. The number of keys is derived from Keys and sent
%% ahead of the keys and arguments. Not available inside multi/1: raises
%% `transaction_unsupported' there.
-spec evalsha(binary(), [iodata()], [iodata()]) -> {ok, binary()} | redis_error().
evalsha(SHA, Keys, Args) ->
    undefined =:= erlang:get(?TR_STACK)
        orelse erlang:error(transaction_unsupported),
    NumKeys = length(Keys),
    q([<<"EVALSHA">>, SHA, NumKeys | Keys ++ Args]).
|
|
|
|
%% @doc Redis INFO for the given section. The reply is split into lines on
%% CRLF and each line on its first ":"; lines that do not yield exactly a
%% key and a value are skipped. Not available inside multi/1: raises
%% `transaction_unsupported' there.
-spec info(redis_info()) -> {ok, [{atom(), binary()}]} | redis_error().
info(Type) ->
    case erlang:get(?TR_STACK) of
        undefined ->
            case q([<<"INFO">>, misc:atom_to_binary(Type)]) of
                {ok, Info} ->
                    {ok, [{misc:binary_to_atom(Name), Value}
                          || Line <- binary:split(Info, <<"\r\n">>, [global]),
                             [Name, Value] <- [binary:split(Line, <<":">>)]]};
                {error, _} = Failure ->
                    Failure
            end;
        _ ->
            erlang:error(transaction_unsupported)
    end.
|
|
|
|
%%%===================================================================
|
|
%%% gen_server callbacks
|
|
%%%===================================================================
|
|
%% @private
%% Sets up the worker state and schedules the first connection attempt by
%% sending `connect' to itself, so init/1 returns without waiting for Redis.
init([I]) ->
    %% Exits of the linked Redis client arrive as 'EXIT' messages.
    process_flag(trap_exit, true),
    Type = get_queue_type(),
    MaxLen = max_fsm_queue(),
    self() ! connect,
    Pending = p1_queue:new(Type, MaxLen),
    {ok, #state{num = I, pending_q = Pending}}.
|
|
|
|
%% @private
%% Synchronous requests handled by a pool worker:
%%
%%   connect                       - sent by call/3 after a failed query;
%%                                   replies `ok' once a connection exists.
%%   {subscribe, Caller, Channels} - registers Caller for the channels and
%%                                   forwards the subscription to Redis.
%%
%% Any other request is logged and left unanswered, so the caller runs into
%% its own call timeout.
handle_call(connect, From, #state{connection = undefined,
                                  pending_q = Q} = State) ->
    %% No connection yet: park the caller until handle_info(connect, ...)
    %% succeeds and flush_queue/1 answers it.
    CurrTime = p1_time_compat:monotonic_time(milli_seconds),
    Q2 = try p1_queue:in({From, CurrTime}, Q)
         catch error:full ->
                 %% Make room by dropping expired waiters (or everybody if
                 %% that is not enough) and enqueue again.
                 Q1 = clean_queue(Q, CurrTime),
                 p1_queue:in({From, CurrTime}, Q1)
         end,
    {noreply, State#state{pending_q = Q2}};
handle_call(connect, From, #state{connection = Pid} = State) ->
    case is_process_alive(Pid) of
        true ->
            {reply, ok, State};
        false ->
            %% The client died but its 'EXIT' has not been processed yet:
            %% trigger a reconnect and treat the caller as a waiter.
            self() ! connect,
            handle_call(connect, From, State#state{connection = undefined})
    end;
handle_call({subscribe, Caller, Channels}, _From,
            #state{connection = Pid, subscriptions = Subs} = State) ->
    %% Put Caller at the head of each channel's subscriber list, removing
    %% one earlier entry of it, if present, to avoid duplicates.
    Subs1 = lists:foldl(
              fun(Channel, Acc) ->
                      Callers = maps:get(Channel, Acc, []) -- [Caller],
                      maps:put(Channel, [Caller|Callers], Acc)
              end, Subs, Channels),
    eredis_subscribe(Pid, Channels),
    {reply, ok, State#state{subscriptions = Subs1}};
handle_call(Request, _From, State) ->
    ?WARNING_MSG("unexpected call: ~p", [Request]),
    {noreply, State}.
|
|
|
|
%% @private
%% Casts are not part of this server's protocol; they are ignored.
handle_cast(_Ignored, State) ->
    {noreply, State}.
|
|
|
|
%% @private
%% Asynchronous messages handled by a pool worker:
%%
%%   connect                        - (re)connection attempt
%%   {'EXIT', Pid, _}               - a linked process terminated
%%   {subscribed, Channel, Pid}     - subscription confirmation from the
%%                                    subscriber client
%%   {message, Channel, Data, Pid}  - published data from the subscriber
%%                                    client
%%
%% Everything else is logged and dropped.
handle_info(connect, #state{connection = undefined} = State) ->
    case connect(State) of
        {ok, Connection} ->
            %% Release the callers parked by handle_call(connect, ...) and
            %% restore the channel subscriptions on the new connection.
            Remaining = flush_queue(State#state.pending_q),
            re_subscribe(Connection, State#state.subscriptions),
            {noreply, State#state{connection = Connection,
                                  pending_q = Remaining}};
        {error, _} ->
            %% connect/1 has already scheduled the next attempt.
            {noreply, State}
    end;
handle_info(connect, State) ->
    %% Already connected
    {noreply, State};
handle_info({'EXIT', Pid, _}, #state{connection = Current} = State) ->
    if
        Pid =:= Current ->
            %% Our Redis client is gone: reconnect.
            self() ! connect,
            {noreply, State#state{connection = undefined}};
        true ->
            {noreply, State}
    end;
handle_info({subscribed, Channel, Pid}, #state{connection = Current} = State) ->
    if
        Pid =:= Current ->
            case maps:is_key(Channel, State#state.subscriptions) of
                true ->
                    eredis_sub:ack_message(Pid);
                false ->
                    ?WARNING_MSG("got subscription ack for unknown channel ~s",
                                 [Channel])
            end;
        true ->
            %% Message from a connection we no longer own.
            ok
    end,
    {noreply, State};
handle_info({message, Channel, Data, Pid}, #state{connection = Current} = State) ->
    if
        Pid =:= Current ->
            %% Fan the payload out to every process subscribed to Channel,
            %% then acknowledge the message to the subscriber client.
            Subscribers = maps:get(Channel, State#state.subscriptions, []),
            lists:foreach(
              fun(Subscriber) ->
                      erlang:send(Subscriber, {redis_message, Channel, Data})
              end, Subscribers),
            eredis_sub:ack_message(Pid);
        true ->
            ok
    end,
    {noreply, State};
handle_info(Info, State) ->
    ?WARNING_MSG("unexpected info = ~p", [Info]),
    {noreply, State}.
|
|
|
|
%% @private
%% No cleanup is performed on shutdown.
terminate(_Why, _LastState) ->
    ok.
|
|
|
|
%% @private
%% The state is carried over unchanged across code upgrades.
code_change(_FromVsn, State, _Opts) ->
    {ok, State}.
|
|
|
|
%%%===================================================================
|
|
%%% Internal functions
|
|
%%%===================================================================
|
|
%% Opens the Redis connection for this worker using the redis_* options
%% from the ejabberd configuration, and registers the client process under
%% the name given by get_connection/1.
%%
%% On any failure (including an exception raised while connecting or
%% registering) the error is logged, a new `connect' message is scheduled
%% after a random delay of at most min(10, pool size) seconds, and
%% {error, Reason} is returned.
-spec connect(state()) -> {ok, pid()} | {error, any()}.
connect(#state{num = Num}) ->
    Server = ejabberd_config:get_option(redis_server, "localhost"),
    Port = ejabberd_config:get_option(redis_port, 6379),
    DB = ejabberd_config:get_option(redis_db, 0),
    Pass = ejabberd_config:get_option(redis_password, ""),
    ConnTimeoutSecs = ejabberd_config:get_option(redis_connect_timeout, 1),
    ConnTimeout = timer:seconds(ConnTimeoutSecs),
    try
        case do_connect(Num, Server, Port, Pass, DB, ConnTimeout) of
            {ok, Client} ->
                ?DEBUG("Connection #~p established to Redis at ~s:~p",
                       [Num, Server, Port]),
                register(get_connection(Num), Client),
                {ok, Client};
            {error, Why} ->
                %% Funnel the error tuple into the catch clause below so
                %% both failure modes share the retry logic.
                erlang:error(Why)
        end
    catch
        _:Reason ->
            MaxDelay = min(10, ejabberd_redis_sup:get_pool_size()),
            Timeout = p1_rand:uniform(MaxDelay),
            ?ERROR_MSG("Redis connection #~p at ~s:~p has failed: ~p; "
                       "reconnecting in ~p seconds",
                       [Num, Server, Port, Reason, Timeout]),
            erlang:send_after(timer:seconds(Timeout), self(), connect),
            {error, Reason}
    end.
|
|
|
|
%% Starts the Redis client process for connection number N of the pool.
do_connect(1, Server, Port, Pass, _DB, _ConnTimeout) ->
    %% First connection in the pool is always a subscriber
    case eredis_sub:start_link(Server, Port, Pass, no_reconnect, infinity, drop) of
        {ok, Pid} = Started ->
            %% Make this worker the receiver of the subscriber's messages.
            eredis_sub:controlling_process(Pid),
            Started;
        Other ->
            Other
    end;
do_connect(_, Server, Port, Pass, DB, ConnTimeout) ->
    %% Every other connection is a regular command client.
    eredis:start_link(Server, Port, DB, Pass, no_reconnect, ConnTimeout).
|
|
|
|
%% Executes a single command ({q, Cmd}) or a pipeline ({qp, Cmds}) on the
%% client of pool connection I.
%%
%% An atom error from the client is treated as a broken connection: the
%% client process is killed and the result becomes {error, disconnected}.
%% While Retries > 0, a disconnected result makes the function ask worker I
%% to reconnect (blocking up to ?CALL_TIMEOUT) and then re-issue the
%% request. All final errors except an {error, _} reply from the worker
%% itself are logged via log_error/2.
%%
%% For {qp, _} a successful result is a list of per-command
%% {ok, Reply} | {error, Reason} entries, not an ok-tuple.
-spec call(pos_integer(), {q, redis_command()}, integer()) ->
                  {ok, redis_reply()} | redis_error();
          (pos_integer(), {qp, redis_pipeline()}, integer()) ->
                  [{ok, redis_reply()} | redis_error()] | redis_error().
call(I, {F, Cmd}, Retries) ->
    ?DEBUG("redis query: ~p", [Cmd]),
    Conn = get_connection(I),
    Res = try eredis:F(Conn, Cmd, ?CALL_TIMEOUT) of
              {error, Reason} when is_atom(Reason) ->
                  %% Best effort: the client may already be gone, in which
                  %% case whereis/1 yields undefined and exit/2 raises.
                  try exit(whereis(Conn), kill) catch _:_ -> ok end,
                  {error, disconnected};
              Other ->
                  Other
          catch exit:{timeout, _} -> {error, timeout};
                exit:{_, {gen_server, call, _}} -> {error, disconnected}
          end,
    case Res of
        {error, disconnected} when Retries > 0 ->
            try ?GEN_SERVER:call(get_proc(I), connect, ?CALL_TIMEOUT) of
                ok -> call(I, {F, Cmd}, Retries-1);
                {error, _} = Err -> Err
            catch exit:{Why, {?GEN_SERVER, call, _}} ->
                    Reason1 = case Why of
                                  timeout -> timeout;
                                  _ -> disconnected
                              end,
                    log_error(Cmd, Reason1),
                    {error, Reason1}
            end;
        {error, Reason1} ->
            log_error(Cmd, Reason1),
            Res;
        _ ->
            Res
    end.
|
|
|
|
%% Logs a failed request together with the textual form of its error.
-spec log_error(redis_command() | redis_pipeline(), atom() | binary()) -> ok.
log_error(Cmd, Reason) ->
    Response = format_error(Reason),
    ?ERROR_MSG("Redis request has failed:~n"
               "** request = ~p~n"
               "** response = ~s",
               [Cmd, Response]).
|
|
|
|
%% Picks the pool connection used for the next query. The result is offset
%% by 2, which keeps connection 1 (the subscriber connection, see
%% do_connect/6) out of the rotation.
%% NOTE(review): presumably p1_rand:round_robin(N) yields 0..N-1 -- confirm.
-spec get_rnd_id() -> pos_integer().
get_rnd_id() ->
    PoolSize = ejabberd_redis_sup:get_pool_size(),
    p1_rand:round_robin(PoolSize - 1) + 2.
|
|
|
|
%% Reduces the per-command results of a pipeline to a single result: the
%% first error encountered, or else the last element when it is an
%% ok-tuple.
-spec get_result([{error, atom() | binary()} | {ok, iodata()}]) ->
                        {ok, [redis_reply()]} | {error, binary()}.
get_result([First | Rest]) ->
    case {First, Rest} of
        {{error, _}, _} -> First;
        {{ok, _}, []} -> First;
        _ -> get_result(Rest)
    end.
|
|
|
|
%% Pushes Cmd on top of the current transaction stack and stores the new
%% stack in the process dictionary.
-spec tr_enq([iodata()], list()) -> queued.
tr_enq(Cmd, Stack) ->
    Extended = [Cmd | Stack],
    erlang:put(?TR_STACK, Extended),
    queued.
|
|
|
|
%% Turns a flat [Field1, Val1, Field2, Val2, ...] list into a list of
%% {Field, Val} tuples, preserving order.
-spec decode_pairs([binary()]) -> [{binary(), binary()}].
decode_pairs(Flat) ->
    decode_pairs(Flat, []).

%% Accumulator-based worker for decode_pairs/1; pairs are collected in
%% reverse and flipped once the input is exhausted.
-spec decode_pairs([binary()], [{binary(), binary()}]) -> [{binary(), binary()}].
decode_pairs([], Collected) ->
    lists:reverse(Collected);
decode_pairs([Name, Value | Rest], Collected) ->
    decode_pairs(Rest, [{Name, Value} | Collected]).
|
|
|
|
%% Decodes the "1"/"0" flag used by the Redis replies handled in this
%% module into a boolean.
dec_bool(<<"1">>) ->
    true;
dec_bool(<<"0">>) ->
    false.
|
|
|
|
%% Builds the return value for a command answered locally without
%% contacting Redis: `queued' inside a transaction, {ok, Val} otherwise.
-spec reply(T) -> {ok, T} | queued.
reply(Val) ->
    InTransaction = erlang:get(?TR_STACK) =/= undefined,
    if
        InTransaction -> queued;
        true -> {ok, Val}
    end.
|
|
|
|
%% Maximum length of the pending-callers queue: the `max_queue' entry of
%% the FSM limit options, or ?DEFAULT_MAX_QUEUE when it is not set.
-spec max_fsm_queue() -> pos_integer().
max_fsm_queue() ->
    Opts = fsm_limit_opts(),
    proplists:get_value(max_queue, Opts, ?DEFAULT_MAX_QUEUE).
|
|
|
|
%% FSM limit options as reported by ejabberd_config for an empty option
%% list.
fsm_limit_opts() ->
    NoOpts = [],
    ejabberd_config:fsm_limit_opts(NoOpts).
|
|
|
|
%% Queue type for the pending-callers queue: the `redis_queue_type' option,
%% falling back to the global default queue type.
get_queue_type() ->
    Fallback = ejabberd_config:default_queue_type(global),
    ejabberd_config:get_option(redis_queue_type, Fallback).
|
|
|
|
%% Drains the queue of callers waiting for a connection. Callers that have
%% waited less than ?CALL_TIMEOUT are answered with `ok'; older entries are
%% dropped silently.
-spec flush_queue(p1_queue:queue()) -> p1_queue:queue().
flush_queue(Q) ->
    Now = p1_time_compat:monotonic_time(milli_seconds),
    ReplyIfFresh =
        fun({From, EnqueuedAt}) ->
                case (Now - EnqueuedAt) < ?CALL_TIMEOUT of
                    true -> ?GEN_SERVER:reply(From, ok);
                    false -> ok
                end,
                %% Always true so that dropwhile consumes every entry.
                true
        end,
    p1_queue:dropwhile(ReplyIfFresh, Q).
|
|
|
|
%% Makes room in a full queue of waiting callers. Entries at the head that
%% have waited ?CALL_TIMEOUT or longer are dropped first; if the queue is
%% still at its limit afterwards, every remaining caller is answered with
%% {error, overloaded} and removed.
-spec clean_queue(p1_queue:queue(), integer()) -> p1_queue:queue().
clean_queue(Q, CurrTime) ->
    IsExpired = fun({_From, EnqueuedAt}) ->
                        (CurrTime - EnqueuedAt) >= ?CALL_TIMEOUT
                end,
    Trimmed = p1_queue:dropwhile(IsExpired, Q),
    Len = p1_queue:len(Trimmed),
    Limit = p1_queue:get_limit(Trimmed),
    case Len >= Limit of
        true ->
            ?ERROR_MSG("Redis request queue is overloaded", []),
            RejectAll = fun({From, _EnqueuedAt}) ->
                                ?GEN_SERVER:reply(From, {error, overloaded}),
                                true
                        end,
            p1_queue:dropwhile(RejectAll, Trimmed);
        false ->
            Trimmed
    end.
|
|
|
|
%% Re-issues the subscription for every known channel on connection Pid;
%% does nothing when there are no subscriptions.
re_subscribe(Pid, Subs) ->
    Channels = maps:keys(Subs),
    if
        Channels =:= [] -> ok;
        true -> eredis_subscribe(Pid, Channels)
    end.
|
|
|
|
%% Logs the SUBSCRIBE request at debug level and forwards it to the
%% subscriber client Pid.
eredis_subscribe(Pid, Channels) ->
    Query = [<<"SUBSCRIBE">> | Channels],
    ?DEBUG("redis query: ~p", [Query]),
    eredis_sub:subscribe(Pid, Channels).
|