Add Redis pool support

Fixes #1624
This commit is contained in:
Evgeniy Khramtsov 2017-04-06 17:56:37 +03:00
parent 00c613b351
commit 6876a37e61
3 changed files with 315 additions and 117 deletions

View File

@ -23,14 +23,15 @@
%%%----------------------------------------------------------------------
-module(ejabberd_redis).
-behaviour(gen_server).
-behaviour(ejabberd_config).
-ifndef(GEN_SERVER).
-define(GEN_SERVER, gen_server).
-endif.
-behaviour(?GEN_SERVER).
-compile({no_auto_import, [get/1, put/2]}).
%% API
-export([start_link/0, q/1, qp/1, config_reloaded/0, opt_type/1]).
-export([start_link/1, get_proc/1, q/1, qp/1]).
%% Commands
-export([multi/1, get/1, set/2, del/1,
sadd/2, srem/2, smembers/1, sismember/2, scard/1,
@ -43,31 +44,40 @@
-define(SERVER, ?MODULE).
-define(PROCNAME, 'ejabberd_redis_client').
-define(TR_STACK, redis_transaction_stack).
-define(DEFAULT_MAX_QUEUE, 5000).
-define(MAX_RETRIES, 1).
-define(CALL_TIMEOUT, 60*1000). %% 60 seconds
-include("logger.hrl").
-include("ejabberd.hrl").
-record(state, {connection :: {pid(), reference()} | undefined}).
-record(state, {connection :: pid() | undefined,
num :: pos_integer(),
pending_q :: p1_queue:queue()}).
-type redis_error() :: {error, binary() | atom()}.
%%%===================================================================
%%% API
%%%===================================================================
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
start_link(I) ->
?GEN_SERVER:start_link({local, get_proc(I)}, ?MODULE, [I], []).
get_proc(I) ->
aux:binary_to_atom(
iolist_to_binary(
[atom_to_list(?MODULE), $_, integer_to_list(I)])).
get_connection(I) ->
aux:binary_to_atom(
iolist_to_binary(
[atom_to_list(?MODULE), "_connection_", integer_to_list(I)])).
q(Command) ->
try eredis:q(?PROCNAME, Command)
catch _:{noproc, _} -> {error, disconnected};
_:{timeout, _} -> {error, timeout}
end.
call(get_worker(), {q, Command}, ?MAX_RETRIES).
qp(Pipeline) ->
try eredis:qp(?PROCNAME, Pipeline)
catch _:{noproc, _} -> {error, disconnected};
_:{timeout, _} -> {error, timeout}
end.
call(get_worker(), {qp, Pipeline}, ?MAX_RETRIES).
-spec multi(fun(() -> any())) -> {ok, list()} | redis_error().
multi(F) ->
@ -91,14 +101,6 @@ multi(F) ->
{error, nested_transaction}
end.
config_reloaded() ->
case is_redis_configured() of
true ->
?MODULE ! connect;
false ->
?MODULE ! disconnect
end.
-spec get(iodata()) -> {ok, undefined | binary()} | redis_error().
get(Key) ->
case erlang:get(?TR_STACK) of
@ -274,56 +276,63 @@ hkeys(Key) ->
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([]) ->
ejabberd_hooks:add(config_reloaded, ?MODULE, config_reloaded, 20),
init([I]) ->
process_flag(trap_exit, true),
{_, State} = handle_info(connect, #state{}),
{ok, State}.
QueueType = get_queue_type(),
Limit = max_fsm_queue(),
self() ! connect,
{ok, #state{num = I, pending_q = p1_queue:new(QueueType, Limit)}}.
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_call(connect, From, #state{connection = undefined,
pending_q = Q} = State) ->
CurrTime = p1_time_compat:monotonic_time(milli_seconds),
Q2 = try p1_queue:in({From, CurrTime}, Q)
catch error:full ->
Q1 = clean_queue(Q, CurrTime),
p1_queue:in({From, CurrTime}, Q1)
end,
{noreply, State#state{pending_q = Q2}};
handle_call(connect, From, #state{connection = Pid} = State) ->
case is_process_alive(Pid) of
true ->
{reply, ok, State};
false ->
self() ! connect,
handle_call(connect, From, State#state{connection = undefined})
end;
handle_call(Request, _From, State) ->
?WARNING_MSG("unexepected call: ~p", [Request]),
{noreply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(connect, #state{connection = undefined} = State) ->
NewState = case is_redis_configured() of
true ->
case connect() of
{ok, Connection} ->
State#state{connection = Connection};
{error, _} ->
State
end;
false ->
NewState = case connect(State) of
{ok, Connection} ->
Q1 = flush_queue(State#state.pending_q),
State#state{connection = Connection, pending_q = Q1};
{error, _} ->
State
end,
{noreply, NewState};
handle_info(connect, State) ->
%% Already connected
{noreply, State};
handle_info(disconnect, #state{connection = {Pid, MRef}} = State) ->
?INFO_MSG("Disconnecting from Redis server", []),
erlang:demonitor(MRef, [flush]),
eredis:stop(Pid),
{noreply, State#state{connection = undefined}};
handle_info(disconnect, State) ->
%% Not connected
{noreply, State};
handle_info({'DOWN', MRef, _Type, Pid, Reason},
#state{connection = {Pid, MRef}} = State) ->
?INFO_MSG("Redis connection has failed: ~p", [Reason]),
connect(),
{noreply, State#state{connection = undefined}};
handle_info({'EXIT', _, _}, State) ->
{noreply, State};
handle_info({'EXIT', Pid, _}, State) ->
case State#state.connection of
Pid ->
self() ! connect,
{noreply, State#state{connection = undefined}};
_ ->
{noreply, State}
end;
handle_info(Info, State) ->
?INFO_MSG("unexpected info = ~p", [Info]),
?WARNING_MSG("unexpected info = ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ejabberd_hooks:delete(config_reloaded, ?MODULE, config_reloaded, 20).
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -331,37 +340,7 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
is_redis_configured() ->
lists:any(fun is_redis_configured/1, ?MYHOSTS).
is_redis_configured(Host) ->
ServerConfigured = ejabberd_config:has_option({redis_server, Host}),
PortConfigured = ejabberd_config:has_option({redis_port, Host}),
DBConfigured = ejabberd_config:has_option({redis_db, Host}),
PassConfigured = ejabberd_config:has_option({redis_password, Host}),
ReconnTimeoutConfigured = ejabberd_config:has_option(
{redis_reconnect_timeout, Host}),
ConnTimeoutConfigured = ejabberd_config:has_option(
{redis_connect_timeout, Host}),
Modules = ejabberd_config:get_option(
{modules, Host},
fun(L) when is_list(L) -> L end, []),
SMConfigured = ejabberd_config:get_option(
{sm_db_type, Host},
fun(V) -> V end) == redis,
ModuleWithRedisDBConfigured =
lists:any(
fun({Module, Opts}) ->
gen_mod:db_type(Host, Opts, Module) == redis
end, Modules),
ServerConfigured or PortConfigured or DBConfigured or PassConfigured or
ReconnTimeoutConfigured or ConnTimeoutConfigured or
SMConfigured or ModuleWithRedisDBConfigured.
iolist_to_list(IOList) ->
binary_to_list(iolist_to_binary(IOList)).
connect() ->
connect(#state{num = Num}) ->
Server = ejabberd_config:get_option(redis_server,
fun iolist_to_list/1,
"localhost"),
@ -377,36 +356,60 @@ connect() ->
Pass = ejabberd_config:get_option(redis_password,
fun iolist_to_list/1,
""),
ReconnTimeout = timer:seconds(
ejabberd_config:get_option(
redis_reconnect_timeout,
fun(I) when is_integer(I), I>0 -> I end,
1)),
ConnTimeout = timer:seconds(
ejabberd_config:get_option(
redis_connect_timeout,
fun(I) when is_integer(I), I>0 -> I end,
1)),
try case eredis:start_link(Server, Port, DB, Pass,
ReconnTimeout, ConnTimeout) of
no_reconnect, ConnTimeout) of
{ok, Client} ->
?INFO_MSG("Connected to Redis at ~s:~p", [Server, Port]),
unlink(Client),
MRef = erlang:monitor(process, Client),
register(?PROCNAME, Client),
{ok, {Client, MRef}};
?DEBUG("Connection #~p established to Redis at ~s:~p",
[Num, Server, Port]),
register(get_connection(Num), Client),
{ok, Client};
{error, Why} ->
erlang:error(Why)
end
catch _:Reason ->
Timeout = 10,
?ERROR_MSG("Redis connection at ~s:~p has failed: ~p; "
Timeout = randoms:uniform(
min(10, ejabberd_redis_sup:get_pool_size())),
?ERROR_MSG("Redis connection #~p at ~s:~p has failed: ~p; "
"reconnecting in ~p seconds",
[Server, Port, Reason, Timeout]),
[Num, Server, Port, Reason, Timeout]),
erlang:send_after(timer:seconds(Timeout), self(), connect),
{error, Reason}
end.
-spec call({atom(), atom()}, {q | qp, list()}, integer()) ->
{error, disconnected | timeout | binary()} | {ok, iodata()}.
call({Conn, Parent}, {F, Cmd}, Retries) ->
Res = try eredis:F(Conn, Cmd, ?CALL_TIMEOUT) of
{error, Reason} when is_atom(Reason) ->
try exit(whereis(Conn), kill) catch _:_ -> ok end,
{error, disconnected};
Other ->
Other
catch exit:{timeout, _} -> {error, timeout};
exit:{_, {gen_server, call, _}} -> {error, disconnected}
end,
case Res of
{error, disconnected} when Retries > 0 ->
try ?GEN_SERVER:call(Parent, connect, ?CALL_TIMEOUT) of
ok -> call({Conn, Parent}, {F, Cmd}, Retries-1);
{error, _} = Err -> Err
catch exit:{timeout, _} -> {error, timeout};
exit:{_, {?GEN_SERVER, call, _}} -> {error, disconnected}
end;
_ ->
Res
end.
get_worker() ->
Time = p1_time_compat:system_time(),
I = erlang:phash2(Time, ejabberd_redis_sup:get_pool_size()) + 1,
{get_connection(I), get_proc(I)}.
get_result([{error, _} = Err|_]) ->
Err;
get_result([{ok, _} = OK]) ->
@ -436,16 +439,51 @@ reply(Val) ->
_ -> queued
end.
opt_type(redis_connect_timeout) ->
fun (I) when is_integer(I), I > 0 -> I end;
opt_type(redis_db) ->
fun (I) when is_integer(I), I >= 0 -> I end;
opt_type(redis_password) -> fun iolist_to_list/1;
opt_type(redis_port) ->
fun (P) when is_integer(P), P > 0, P < 65536 -> P end;
opt_type(redis_reconnect_timeout) ->
fun (I) when is_integer(I), I > 0 -> I end;
opt_type(redis_server) -> fun iolist_to_list/1;
opt_type(_) ->
[redis_connect_timeout, redis_db, redis_password,
redis_port, redis_reconnect_timeout, redis_server].
iolist_to_list(IOList) ->
binary_to_list(iolist_to_binary(IOList)).
max_fsm_queue() ->
proplists:get_value(max_queue, fsm_limit_opts(), ?DEFAULT_MAX_QUEUE).
fsm_limit_opts() ->
ejabberd_config:fsm_limit_opts([]).
get_queue_type() ->
case ejabberd_config:get_option(
redis_queue_type,
ejabberd_redis_sup:opt_type(redis_queue_type)) of
undefined ->
ejabberd_config:default_queue_type(global);
Type ->
Type
end.
flush_queue(Q) ->
CurrTime = p1_time_compat:monotonic_time(milli_seconds),
p1_queue:dropwhile(
fun({From, Time}) ->
if (CurrTime - Time) >= ?CALL_TIMEOUT ->
ok;
true ->
?GEN_SERVER:reply(From, ok)
end,
true
end, Q).
clean_queue(Q, CurrTime) ->
Q1 = p1_queue:dropwhile(
fun({_From, Time}) ->
(CurrTime - Time) >= ?CALL_TIMEOUT
end, Q),
Len = p1_queue:len(Q1),
Limit = p1_queue:get_limit(Q1),
if Len >= Limit ->
?ERROR_MSG("Redis request queue is overloaded", []),
p1_queue:dropwhile(
fun({From, _Time}) ->
?GEN_SERVER:reply(From, {error, disconnected}),
true
end, Q1);
true ->
Q1
end.

159
src/ejabberd_redis_sup.erl Normal file
View File

@ -0,0 +1,159 @@
%%%-------------------------------------------------------------------
%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
%%% Created : 6 Apr 2017 by Evgeny Khramtsov <ekhramtsov@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2017 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%-------------------------------------------------------------------
-module(ejabberd_redis_sup).
-behaviour(supervisor).
%% API
-export([start_link/0, get_pool_size/0,
host_up/1, config_reloaded/0, opt_type/1]).
%% Supervisor callbacks
-export([init/1]).
-include("ejabberd.hrl").
-include("logger.hrl").
-define(DEFAULT_POOL_SIZE, 10).
%%%===================================================================
%%% API functions
%%%===================================================================
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
host_up(Host) ->
case is_redis_configured(Host) of
true ->
ejabberd:start_app(eredis),
lists:foreach(
fun(Spec) ->
supervisor:start_child(?MODULE, Spec)
end, get_specs());
false ->
ok
end.
config_reloaded() ->
case is_redis_configured() of
true ->
ejabberd:start_app(eredis),
lists:foreach(
fun(Spec) ->
supervisor:start_child(?MODULE, Spec)
end, get_specs()),
PoolSize = get_pool_size(),
lists:foreach(
fun({Id, _, _, _}) when Id > PoolSize ->
supervisor:terminate_child(?MODULE, Id),
supervisor:delete_child(?MODULE, Id);
(_) ->
ok
end, supervisor:which_children(?MODULE));
false ->
lists:foreach(
fun({Id, _, _, _}) ->
supervisor:terminate_child(?MODULE, Id),
supervisor:delete_child(?MODULE, Id)
end, supervisor:which_children(?MODULE))
end.
%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================
init([]) ->
ejabberd_hooks:add(config_reloaded, ?MODULE, config_reloaded, 20),
ejabberd_hooks:add(host_up, ?MODULE, host_up, 20),
Specs = case is_redis_configured() of
true ->
ejabberd:start_app(eredis),
get_specs();
false ->
[]
end,
{ok, {{one_for_one, 500, 1}, Specs}}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
is_redis_configured() ->
lists:any(fun is_redis_configured/1, ?MYHOSTS).
is_redis_configured(Host) ->
ServerConfigured = ejabberd_config:has_option({redis_server, Host}),
PortConfigured = ejabberd_config:has_option({redis_port, Host}),
DBConfigured = ejabberd_config:has_option({redis_db, Host}),
PassConfigured = ejabberd_config:has_option({redis_password, Host}),
PoolSize = ejabberd_config:has_option({redis_pool_size, Host}),
ConnTimeoutConfigured = ejabberd_config:has_option(
{redis_connect_timeout, Host}),
Modules = ejabberd_config:get_option(
{modules, Host},
fun(L) when is_list(L) -> L end, []),
SMConfigured = ejabberd_config:get_option(
{sm_db_type, Host},
fun(V) -> V end) == redis,
RouterConfigured = ejabberd_config:get_option(
{router_db_type, Host},
fun(V) -> V end) == redis,
ModuleWithRedisDBConfigured =
lists:any(
fun({Module, Opts}) ->
gen_mod:db_type(Host, Opts, Module) == redis
end, Modules),
ServerConfigured or PortConfigured or DBConfigured or PassConfigured or
PoolSize or ConnTimeoutConfigured or
SMConfigured or RouterConfigured or ModuleWithRedisDBConfigured.
get_specs() ->
lists:map(
fun(I) ->
{I, {ejabberd_redis, start_link, [I]},
transient, 2000, worker, [?MODULE]}
end, lists:seq(1, get_pool_size())).
get_pool_size() ->
ejabberd_config:get_option(
redis_pool_size,
fun(N) when is_integer(N), N >= 1 -> N end,
?DEFAULT_POOL_SIZE).
iolist_to_list(IOList) ->
binary_to_list(iolist_to_binary(IOList)).
opt_type(redis_connect_timeout) ->
fun (I) when is_integer(I), I > 0 -> I end;
opt_type(redis_db) ->
fun (I) when is_integer(I), I >= 0 -> I end;
opt_type(redis_password) -> fun iolist_to_list/1;
opt_type(redis_port) ->
fun (P) when is_integer(P), P > 0, P < 65536 -> P end;
opt_type(redis_server) -> fun iolist_to_list/1;
opt_type(redis_pool_size) ->
fun (I) when is_integer(I), I > 0 -> I end;
opt_type(redis_queue_type) ->
fun(ram) -> ram; (file) -> file end;
opt_type(_) ->
[redis_connect_timeout, redis_db, redis_password,
redis_port, redis_pool_size, redis_server,
redis_pool_size, redis_queue_type].

View File

@ -115,8 +115,9 @@ init([]) ->
RiakSupervisor = {ejabberd_riak_sup,
{ejabberd_riak_sup, start_link, []},
permanent, infinity, supervisor, [ejabberd_riak_sup]},
Redis = {ejabberd_redis, {ejabberd_redis, start_link, []},
permanent, 5000, worker, [ejabberd_redis]},
RedisSupervisor = {ejabberd_redis_sup,
{ejabberd_redis_sup, start_link, []},
permanent, infinity, supervisor, [ejabberd_redis_sup]},
Router = {ejabberd_router, {ejabberd_router, start_link, []},
permanent, 5000, worker, [ejabberd_router]},
RouterMulticast = {ejabberd_router_multicast,
@ -168,7 +169,7 @@ init([]) ->
BackendSupervisor,
SQLSupervisor,
RiakSupervisor,
Redis,
RedisSupervisor,
Router,
RouterMulticast,
Local,