diff --git a/src/ejabberd_redis.erl b/src/ejabberd_redis.erl index ec5d73596..8eeb19607 100644 --- a/src/ejabberd_redis.erl +++ b/src/ejabberd_redis.erl @@ -23,14 +23,15 @@ %%%---------------------------------------------------------------------- -module(ejabberd_redis). - --behaviour(gen_server). --behaviour(ejabberd_config). +-ifndef(GEN_SERVER). +-define(GEN_SERVER, gen_server). +-endif. +-behaviour(?GEN_SERVER). -compile({no_auto_import, [get/1, put/2]}). %% API --export([start_link/0, q/1, qp/1, config_reloaded/0, opt_type/1]). +-export([start_link/1, get_proc/1, q/1, qp/1]). %% Commands -export([multi/1, get/1, set/2, del/1, sadd/2, srem/2, smembers/1, sismember/2, scard/1, @@ -43,31 +44,40 @@ -define(SERVER, ?MODULE). -define(PROCNAME, 'ejabberd_redis_client'). -define(TR_STACK, redis_transaction_stack). +-define(DEFAULT_MAX_QUEUE, 5000). +-define(MAX_RETRIES, 1). +-define(CALL_TIMEOUT, 60*1000). %% 60 seconds -include("logger.hrl"). -include("ejabberd.hrl"). --record(state, {connection :: {pid(), reference()} | undefined}). +-record(state, {connection :: pid() | undefined, + num :: pos_integer(), + pending_q :: p1_queue:queue()}). -type redis_error() :: {error, binary() | atom()}. %%%=================================================================== %%% API %%%=================================================================== -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +start_link(I) -> + ?GEN_SERVER:start_link({local, get_proc(I)}, ?MODULE, [I], []). + +get_proc(I) -> + aux:binary_to_atom( + iolist_to_binary( + [atom_to_list(?MODULE), $_, integer_to_list(I)])). + +get_connection(I) -> + aux:binary_to_atom( + iolist_to_binary( + [atom_to_list(?MODULE), "_connection_", integer_to_list(I)])). q(Command) -> - try eredis:q(?PROCNAME, Command) - catch _:{noproc, _} -> {error, disconnected}; - _:{timeout, _} -> {error, timeout} - end. + call(get_worker(), {q, Command}, ?MAX_RETRIES). qp(Pipeline) -> - try eredis:qp(?PROCNAME, Pipeline) - catch _:{noproc, _} -> {error, disconnected}; - _:{timeout, _} -> {error, timeout} - end. + call(get_worker(), {qp, Pipeline}, ?MAX_RETRIES). -spec multi(fun(() -> any())) -> {ok, list()} | redis_error(). multi(F) -> @@ -91,14 +101,6 @@ multi(F) -> {error, nested_transaction} end. -config_reloaded() -> - case is_redis_configured() of - true -> - ?MODULE ! connect; - false -> - ?MODULE ! disconnect - end. - -spec get(iodata()) -> {ok, undefined | binary()} | redis_error(). get(Key) -> case erlang:get(?TR_STACK) of @@ -274,56 +276,63 @@ hkeys(Key) -> %%%=================================================================== %%% gen_server callbacks %%%=================================================================== -init([]) -> - ejabberd_hooks:add(config_reloaded, ?MODULE, config_reloaded, 20), +init([I]) -> process_flag(trap_exit, true), - {_, State} = handle_info(connect, #state{}), - {ok, State}. + QueueType = get_queue_type(), + Limit = max_fsm_queue(), + self() ! connect, + {ok, #state{num = I, pending_q = p1_queue:new(QueueType, Limit)}}. -handle_call(_Request, _From, State) -> - Reply = ok, - {reply, Reply, State}. +handle_call(connect, From, #state{connection = undefined, + pending_q = Q} = State) -> + CurrTime = p1_time_compat:monotonic_time(milli_seconds), + Q2 = try p1_queue:in({From, CurrTime}, Q) + catch error:full -> + Q1 = clean_queue(Q, CurrTime), + p1_queue:in({From, CurrTime}, Q1) + end, + {noreply, State#state{pending_q = Q2}}; +handle_call(connect, From, #state{connection = Pid} = State) -> + case is_process_alive(Pid) of + true -> + {reply, ok, State}; + false -> + self() ! connect, + handle_call(connect, From, State#state{connection = undefined}) + end; +handle_call(Request, _From, State) -> + ?WARNING_MSG("unexepected call: ~p", [Request]), + {noreply, State}. handle_cast(_Msg, State) -> {noreply, State}. handle_info(connect, #state{connection = undefined} = State) -> - NewState = case is_redis_configured() of - true -> - case connect() of - {ok, Connection} -> - State#state{connection = Connection}; - {error, _} -> - State - end; - false -> + NewState = case connect(State) of + {ok, Connection} -> + Q1 = flush_queue(State#state.pending_q), + State#state{connection = Connection, pending_q = Q1}; + {error, _} -> State end, {noreply, NewState}; handle_info(connect, State) -> %% Already connected {noreply, State}; -handle_info(disconnect, #state{connection = {Pid, MRef}} = State) -> - ?INFO_MSG("Disconnecting from Redis server", []), - erlang:demonitor(MRef, [flush]), - eredis:stop(Pid), - {noreply, State#state{connection = undefined}}; -handle_info(disconnect, State) -> - %% Not connected - {noreply, State}; -handle_info({'DOWN', MRef, _Type, Pid, Reason}, - #state{connection = {Pid, MRef}} = State) -> - ?INFO_MSG("Redis connection has failed: ~p", [Reason]), - connect(), - {noreply, State#state{connection = undefined}}; -handle_info({'EXIT', _, _}, State) -> - {noreply, State}; +handle_info({'EXIT', Pid, _}, State) -> + case State#state.connection of + Pid -> + self() ! connect, + {noreply, State#state{connection = undefined}}; + _ -> + {noreply, State} + end; handle_info(Info, State) -> - ?INFO_MSG("unexpected info = ~p", [Info]), + ?WARNING_MSG("unexpected info = ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> - ejabberd_hooks:delete(config_reloaded, ?MODULE, config_reloaded, 20). + ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -331,37 +340,7 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== -is_redis_configured() -> - lists:any(fun is_redis_configured/1, ?MYHOSTS). - -is_redis_configured(Host) -> - ServerConfigured = ejabberd_config:has_option({redis_server, Host}), - PortConfigured = ejabberd_config:has_option({redis_port, Host}), - DBConfigured = ejabberd_config:has_option({redis_db, Host}), - PassConfigured = ejabberd_config:has_option({redis_password, Host}), - ReconnTimeoutConfigured = ejabberd_config:has_option( - {redis_reconnect_timeout, Host}), - ConnTimeoutConfigured = ejabberd_config:has_option( - {redis_connect_timeout, Host}), - Modules = ejabberd_config:get_option( - {modules, Host}, - fun(L) when is_list(L) -> L end, []), - SMConfigured = ejabberd_config:get_option( - {sm_db_type, Host}, - fun(V) -> V end) == redis, - ModuleWithRedisDBConfigured = - lists:any( - fun({Module, Opts}) -> - gen_mod:db_type(Host, Opts, Module) == redis - end, Modules), - ServerConfigured or PortConfigured or DBConfigured or PassConfigured or - ReconnTimeoutConfigured or ConnTimeoutConfigured or - SMConfigured or ModuleWithRedisDBConfigured. - -iolist_to_list(IOList) -> - binary_to_list(iolist_to_binary(IOList)). - -connect() -> +connect(#state{num = Num}) -> Server = ejabberd_config:get_option(redis_server, fun iolist_to_list/1, "localhost"), @@ -377,36 +356,60 @@ connect() -> Pass = ejabberd_config:get_option(redis_password, fun iolist_to_list/1, ""), - ReconnTimeout = timer:seconds( - ejabberd_config:get_option( - redis_reconnect_timeout, - fun(I) when is_integer(I), I>0 -> I end, - 1)), ConnTimeout = timer:seconds( ejabberd_config:get_option( redis_connect_timeout, fun(I) when is_integer(I), I>0 -> I end, 1)), try case eredis:start_link(Server, Port, DB, Pass, - ReconnTimeout, ConnTimeout) of + no_reconnect, ConnTimeout) of {ok, Client} -> - ?INFO_MSG("Connected to Redis at ~s:~p", [Server, Port]), - unlink(Client), - MRef = erlang:monitor(process, Client), - register(?PROCNAME, Client), - {ok, {Client, MRef}}; + ?DEBUG("Connection #~p established to Redis at ~s:~p", + [Num, Server, Port]), + register(get_connection(Num), Client), + {ok, Client}; {error, Why} -> erlang:error(Why) end catch _:Reason -> - Timeout = 10, - ?ERROR_MSG("Redis connection at ~s:~p has failed: ~p; " + Timeout = randoms:uniform( + min(10, ejabberd_redis_sup:get_pool_size())), + ?ERROR_MSG("Redis connection #~p at ~s:~p has failed: ~p; " "reconnecting in ~p seconds", - [Server, Port, Reason, Timeout]), + [Num, Server, Port, Reason, Timeout]), erlang:send_after(timer:seconds(Timeout), self(), connect), {error, Reason} end. +-spec call({atom(), atom()}, {q | qp, list()}, integer()) -> + {error, disconnected | timeout | binary()} | {ok, iodata()}. +call({Conn, Parent}, {F, Cmd}, Retries) -> + Res = try eredis:F(Conn, Cmd, ?CALL_TIMEOUT) of + {error, Reason} when is_atom(Reason) -> + try exit(whereis(Conn), kill) catch _:_ -> ok end, + {error, disconnected}; + Other -> + Other + catch exit:{timeout, _} -> {error, timeout}; + exit:{_, {gen_server, call, _}} -> {error, disconnected} + end, + case Res of + {error, disconnected} when Retries > 0 -> + try ?GEN_SERVER:call(Parent, connect, ?CALL_TIMEOUT) of + ok -> call({Conn, Parent}, {F, Cmd}, Retries-1); + {error, _} = Err -> Err + catch exit:{timeout, _} -> {error, timeout}; + exit:{_, {?GEN_SERVER, call, _}} -> {error, disconnected} + end; + _ -> + Res + end. + +get_worker() -> + Time = p1_time_compat:system_time(), + I = erlang:phash2(Time, ejabberd_redis_sup:get_pool_size()) + 1, + {get_connection(I), get_proc(I)}. + get_result([{error, _} = Err|_]) -> Err; get_result([{ok, _} = OK]) -> @@ -436,16 +439,51 @@ reply(Val) -> _ -> queued end. -opt_type(redis_connect_timeout) -> - fun (I) when is_integer(I), I > 0 -> I end; -opt_type(redis_db) -> - fun (I) when is_integer(I), I >= 0 -> I end; -opt_type(redis_password) -> fun iolist_to_list/1; -opt_type(redis_port) -> - fun (P) when is_integer(P), P > 0, P < 65536 -> P end; -opt_type(redis_reconnect_timeout) -> - fun (I) when is_integer(I), I > 0 -> I end; -opt_type(redis_server) -> fun iolist_to_list/1; -opt_type(_) -> - [redis_connect_timeout, redis_db, redis_password, - redis_port, redis_reconnect_timeout, redis_server]. +iolist_to_list(IOList) -> + binary_to_list(iolist_to_binary(IOList)). + +max_fsm_queue() -> + proplists:get_value(max_queue, fsm_limit_opts(), ?DEFAULT_MAX_QUEUE). + +fsm_limit_opts() -> + ejabberd_config:fsm_limit_opts([]). + +get_queue_type() -> + case ejabberd_config:get_option( + redis_queue_type, + ejabberd_redis_sup:opt_type(redis_queue_type)) of + undefined -> + ejabberd_config:default_queue_type(global); + Type -> + Type + end. + +flush_queue(Q) -> + CurrTime = p1_time_compat:monotonic_time(milli_seconds), + p1_queue:dropwhile( + fun({From, Time}) -> + if (CurrTime - Time) >= ?CALL_TIMEOUT -> + ok; + true -> + ?GEN_SERVER:reply(From, ok) + end, + true + end, Q). + +clean_queue(Q, CurrTime) -> + Q1 = p1_queue:dropwhile( + fun({_From, Time}) -> + (CurrTime - Time) >= ?CALL_TIMEOUT + end, Q), + Len = p1_queue:len(Q1), + Limit = p1_queue:get_limit(Q1), + if Len >= Limit -> + ?ERROR_MSG("Redis request queue is overloaded", []), + p1_queue:dropwhile( + fun({From, _Time}) -> + ?GEN_SERVER:reply(From, {error, disconnected}), + true + end, Q1); + true -> + Q1 + end. diff --git a/src/ejabberd_redis_sup.erl b/src/ejabberd_redis_sup.erl new file mode 100644 index 000000000..23330f87c --- /dev/null +++ b/src/ejabberd_redis_sup.erl @@ -0,0 +1,159 @@ +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov +%%% Created : 6 Apr 2017 by Evgeny Khramtsov +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2017 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%------------------------------------------------------------------- +-module(ejabberd_redis_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0, get_pool_size/0, + host_up/1, config_reloaded/0, opt_type/1]). + +%% Supervisor callbacks +-export([init/1]). + +-include("ejabberd.hrl"). +-include("logger.hrl"). + +-define(DEFAULT_POOL_SIZE, 10). + +%%%=================================================================== +%%% API functions +%%%=================================================================== +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +host_up(Host) -> + case is_redis_configured(Host) of + true -> + ejabberd:start_app(eredis), + lists:foreach( + fun(Spec) -> + supervisor:start_child(?MODULE, Spec) + end, get_specs()); + false -> + ok + end. + +config_reloaded() -> + case is_redis_configured() of + true -> + ejabberd:start_app(eredis), + lists:foreach( + fun(Spec) -> + supervisor:start_child(?MODULE, Spec) + end, get_specs()), + PoolSize = get_pool_size(), + lists:foreach( + fun({Id, _, _, _}) when Id > PoolSize -> + supervisor:terminate_child(?MODULE, Id), + supervisor:delete_child(?MODULE, Id); + (_) -> + ok + end, supervisor:which_children(?MODULE)); + false -> + lists:foreach( + fun({Id, _, _, _}) -> + supervisor:terminate_child(?MODULE, Id), + supervisor:delete_child(?MODULE, Id) + end, supervisor:which_children(?MODULE)) + end. + +%%%=================================================================== +%%% Supervisor callbacks +%%%=================================================================== +init([]) -> + ejabberd_hooks:add(config_reloaded, ?MODULE, config_reloaded, 20), + ejabberd_hooks:add(host_up, ?MODULE, host_up, 20), + Specs = case is_redis_configured() of + true -> + ejabberd:start_app(eredis), + get_specs(); + false -> + [] + end, + {ok, {{one_for_one, 500, 1}, Specs}}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +is_redis_configured() -> + lists:any(fun is_redis_configured/1, ?MYHOSTS). + +is_redis_configured(Host) -> + ServerConfigured = ejabberd_config:has_option({redis_server, Host}), + PortConfigured = ejabberd_config:has_option({redis_port, Host}), + DBConfigured = ejabberd_config:has_option({redis_db, Host}), + PassConfigured = ejabberd_config:has_option({redis_password, Host}), + PoolSize = ejabberd_config:has_option({redis_pool_size, Host}), + ConnTimeoutConfigured = ejabberd_config:has_option( + {redis_connect_timeout, Host}), + Modules = ejabberd_config:get_option( + {modules, Host}, + fun(L) when is_list(L) -> L end, []), + SMConfigured = ejabberd_config:get_option( + {sm_db_type, Host}, + fun(V) -> V end) == redis, + RouterConfigured = ejabberd_config:get_option( + {router_db_type, Host}, + fun(V) -> V end) == redis, + ModuleWithRedisDBConfigured = + lists:any( + fun({Module, Opts}) -> + gen_mod:db_type(Host, Opts, Module) == redis + end, Modules), + ServerConfigured or PortConfigured or DBConfigured or PassConfigured or + PoolSize or ConnTimeoutConfigured or + SMConfigured or RouterConfigured or ModuleWithRedisDBConfigured. + +get_specs() -> + lists:map( + fun(I) -> + {I, {ejabberd_redis, start_link, [I]}, + transient, 2000, worker, [?MODULE]} + end, lists:seq(1, get_pool_size())). + +get_pool_size() -> + ejabberd_config:get_option( + redis_pool_size, + fun(N) when is_integer(N), N >= 1 -> N end, + ?DEFAULT_POOL_SIZE). + +iolist_to_list(IOList) -> + binary_to_list(iolist_to_binary(IOList)). + +opt_type(redis_connect_timeout) -> + fun (I) when is_integer(I), I > 0 -> I end; +opt_type(redis_db) -> + fun (I) when is_integer(I), I >= 0 -> I end; +opt_type(redis_password) -> fun iolist_to_list/1; +opt_type(redis_port) -> + fun (P) when is_integer(P), P > 0, P < 65536 -> P end; +opt_type(redis_server) -> fun iolist_to_list/1; +opt_type(redis_pool_size) -> + fun (I) when is_integer(I), I > 0 -> I end; +opt_type(redis_queue_type) -> + fun(ram) -> ram; (file) -> file end; +opt_type(_) -> + [redis_connect_timeout, redis_db, redis_password, + redis_port, redis_pool_size, redis_server, + redis_pool_size, redis_queue_type]. diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl index 0a33a5c76..26e1f9e2e 100644 --- a/src/ejabberd_sup.erl +++ b/src/ejabberd_sup.erl @@ -115,8 +115,9 @@ init([]) -> RiakSupervisor = {ejabberd_riak_sup, {ejabberd_riak_sup, start_link, []}, permanent, infinity, supervisor, [ejabberd_riak_sup]}, - Redis = {ejabberd_redis, {ejabberd_redis, start_link, []}, - permanent, 5000, worker, [ejabberd_redis]}, + RedisSupervisor = {ejabberd_redis_sup, + {ejabberd_redis_sup, start_link, []}, + permanent, infinity, supervisor, [ejabberd_redis_sup]}, Router = {ejabberd_router, {ejabberd_router, start_link, []}, permanent, 5000, worker, [ejabberd_router]}, RouterMulticast = {ejabberd_router_multicast, @@ -168,7 +169,7 @@ init([]) -> BackendSupervisor, SQLSupervisor, RiakSupervisor, - Redis, + RedisSupervisor, Router, RouterMulticast, Local,