From 068db1a2d98c338b408f84b5a294ae7400256e13 Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Mon, 9 May 2016 08:36:30 +0300 Subject: [PATCH] Handle Redis connection in a separate module --- src/ejabberd_app.erl | 1 + src/ejabberd_config.erl | 6 +- src/ejabberd_redis.erl | 179 ++++++++++++++++++++++++++++++++++++++ src/ejabberd_sm_redis.erl | 58 +++--------- 4 files changed, 196 insertions(+), 48 deletions(-) create mode 100644 src/ejabberd_redis.erl diff --git a/src/ejabberd_app.erl b/src/ejabberd_app.erl index b25bf231b..44d0db626 100644 --- a/src/ejabberd_app.erl +++ b/src/ejabberd_app.erl @@ -63,6 +63,7 @@ start(normal, _Args) -> Sup = ejabberd_sup:start_link(), ejabberd_rdbms:start(), ejabberd_riak_sup:start(), + ejabberd_redis:start(), ejabberd_sm:start(), cyrsasl:start(), % Profiling diff --git a/src/ejabberd_config.erl b/src/ejabberd_config.erl index 06de61b5f..16eebc0e3 100644 --- a/src/ejabberd_config.erl +++ b/src/ejabberd_config.erl @@ -30,7 +30,7 @@ add_global_option/2, add_local_option/2, get_global_option/2, get_local_option/2, get_global_option/3, get_local_option/3, - get_option/2, get_option/3, add_option/2, + get_option/2, get_option/3, add_option/2, has_option/1, get_vh_by_auth_method/1, is_file_readable/1, get_version/0, get_myhosts/0, get_mylang/0, prepare_opt_val/4, convert_table_to_binary/5, @@ -838,6 +838,10 @@ get_option(Opt, F, Default) -> end end. +-spec has_option(atom() | {atom(), global | binary()}) -> any(). +has_option(Opt) -> + get_option(Opt, fun(_) -> true end, false). + init_module_db_table(Modules) -> catch ets:new(module_db, [named_table, public, bag]), %% Dirty hack for mod_pubsub diff --git a/src/ejabberd_redis.erl b/src/ejabberd_redis.erl new file mode 100644 index 000000000..c6e3b4dd0 --- /dev/null +++ b/src/ejabberd_redis.erl @@ -0,0 +1,179 @@ +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov +%%% @copyright (C) 2016, Evgeny Khramtsov +%%% @doc +%%% +%%% @end +%%% Created : 8 May 2016 by Evgeny Khramtsov +%%%------------------------------------------------------------------- +-module(ejabberd_redis). + +-behaviour(gen_server). +-behaviour(ejabberd_config). + +%% API +-export([start/0, start_link/0, q/1, qp/1, opt_type/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). +-define(PROCNAME, 'ejabberd_redis_client'). + +-include("logger.hrl"). +-include("ejabberd.hrl"). + +-record(state, {}). + +%%%=================================================================== +%%% API +%%%=================================================================== +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +start() -> + case lists:any( + fun(Host) -> + is_redis_configured(Host) + end, ?MYHOSTS) of + true -> + Spec = {?MODULE, {?MODULE, start_link, []}, + permanent, 2000, worker, [?MODULE]}, + supervisor:start_child(ejabberd_sup, Spec); + false -> + ok + end. + +q(Command) -> + try eredis:q(?PROCNAME, Command) + catch _:Reason -> {error, Reason} + end. + +qp(Pipeline) -> + try eredis:qp(?PROCNAME, Pipeline) + catch _:Reason -> {error, Reason} + end. + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== +init([]) -> + process_flag(trap_exit, true), + connect(), + {ok, #state{}}. + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(connect, State) -> + connect(), + {noreply, State}; +handle_info({'DOWN', _MRef, _Type, _Pid, Reason}, State) -> + ?INFO_MSG("Redis connection has failed: ~p", [Reason]), + connect(), + {noreply, State}; +handle_info({'EXIT', _, _}, State) -> + {noreply, State}; +handle_info(Info, State) -> + ?INFO_MSG("unexpected info = ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +is_redis_configured(Host) -> + ServerConfigured = ejabberd_config:has_option({redis_server, Host}), + PortConfigured = ejabberd_config:has_option({redis_port, Host}), + DBConfigured = ejabberd_config:has_option({redis_db, Host}), + PassConfigured = ejabberd_config:has_option({redis_password, Host}), + ReconnTimeoutConfigured = ejabberd_config:has_option( + {redis_reconnect_timeout, Host}), + ConnTimeoutConfigured = ejabberd_config:has_option( + {redis_connect_timeout, Host}), + Modules = ejabberd_config:get_option( + {modules, Host}, + fun(L) when is_list(L) -> L end, []), + SMConfigured = ejabberd_config:get_option( + {sm_db_type, Host}, + fun(V) -> V end) == redis, + ModuleWithRedisDBConfigured = + lists:any( + fun({Module, Opts}) -> + gen_mod:db_type(Host, Opts, Module) == redis + end, Modules), + ServerConfigured or PortConfigured or DBConfigured or PassConfigured or + ReconnTimeoutConfigured or ConnTimeoutConfigured or + SMConfigured or ModuleWithRedisDBConfigured. + +iolist_to_list(IOList) -> + binary_to_list(iolist_to_binary(IOList)). + +connect() -> + Server = ejabberd_config:get_option(redis_server, + fun iolist_to_list/1, + "localhost"), + Port = ejabberd_config:get_option(redis_port, + fun(P) when is_integer(P), + P>0, P<65536 -> + P + end, 6379), + DB = ejabberd_config:get_option(redis_db, + fun(I) when is_integer(I), I >= 0 -> + I + end, 0), + Pass = ejabberd_config:get_option(redis_password, + fun iolist_to_list/1, + ""), + ReconnTimeout = timer:seconds( + ejabberd_config:get_option( + redis_reconnect_timeout, + fun(I) when is_integer(I), I>0 -> I end, + 1)), + ConnTimeout = timer:seconds( + ejabberd_config:get_option( + redis_connect_timeout, + fun(I) when is_integer(I), I>0 -> I end, + 1)), + try case eredis:start_link(Server, Port, DB, Pass, + ReconnTimeout, ConnTimeout) of + {ok, Client} -> + ?INFO_MSG("Connected to Redis at ~s:~p", [Server, Port]), + unlink(Client), + erlang:monitor(process, Client), + register(?PROCNAME, Client), + {ok, Client}; + {error, Why} -> + erlang:error(Why) + end + catch _:Reason -> + Timeout = 10, + ?ERROR_MSG("Redis connection at ~s:~p has failed: ~p; " + "reconnecting in ~p seconds", + [Server, Port, Reason, Timeout]), + erlang:send_after(timer:seconds(Timeout), self(), connect) + end. + +opt_type(redis_connect_timeout) -> + fun (I) when is_integer(I), I > 0 -> I end; +opt_type(redis_db) -> + fun (I) when is_integer(I), I >= 0 -> I end; +opt_type(redis_password) -> fun iolist_to_list/1; +opt_type(redis_port) -> + fun (P) when is_integer(P), P > 0, P < 65536 -> P end; +opt_type(redis_reconnect_timeout) -> + fun (I) when is_integer(I), I > 0 -> I end; +opt_type(redis_server) -> fun iolist_to_list/1; +opt_type(_) -> + [redis_connect_timeout, redis_db, redis_password, + redis_port, redis_reconnect_timeout, redis_server]. diff --git a/src/ejabberd_sm_redis.erl b/src/ejabberd_sm_redis.erl index bf9e0eff5..d25f777e3 100644 --- a/src/ejabberd_sm_redis.erl +++ b/src/ejabberd_sm_redis.erl @@ -21,48 +21,12 @@ -include("logger.hrl"). -include("jlib.hrl"). --define(PROCNAME, 'ejabberd_redis_client'). - %%%=================================================================== %%% API %%%=================================================================== -spec init() -> ok | {error, any()}. init() -> - Server = ejabberd_config:get_option(redis_server, - fun iolist_to_list/1, - "localhost"), - Port = ejabberd_config:get_option(redis_port, - fun(P) when is_integer(P), - P>0, P<65536 -> - P - end, 6379), - DB = ejabberd_config:get_option(redis_db, - fun(I) when is_integer(I), I >= 0 -> - I - end, 0), - Pass = ejabberd_config:get_option(redis_password, - fun iolist_to_list/1, - ""), - ReconnTimeout = timer:seconds( - ejabberd_config:get_option( - redis_reconnect_timeout, - fun(I) when is_integer(I), I>0 -> I end, - 1)), - ConnTimeout = timer:seconds( - ejabberd_config:get_option( - redis_connect_timeout, - fun(I) when is_integer(I), I>0 -> I end, - 1)), - case eredis:start_link(Server, Port, DB, Pass, - ReconnTimeout, ConnTimeout) of - {ok, Client} -> - register(?PROCNAME, Client), - clean_table(), - ok; - {error, _} = Err -> - ?ERROR_MSG("failed to start redis client: ~p", [Err]), - Err - end. + clean_table(). -spec set_session(#session{}) -> ok. set_session(Session) -> @@ -71,8 +35,8 @@ set_session(Session) -> SIDKey = sid_to_key(Session#session.sid), ServKey = server_to_key(element(2, Session#session.us)), USSIDKey = us_sid_to_key(Session#session.us, Session#session.sid), - case eredis:qp(?PROCNAME, [["HSET", USKey, SIDKey, T], - ["HSET", ServKey, USSIDKey, T]]) of + case ejabberd_redis:qp([["HSET", USKey, SIDKey, T], + ["HSET", ServKey, USSIDKey, T]]) of [{ok, _}, {ok, _}] -> ok; Err -> @@ -83,7 +47,7 @@ set_session(Session) -> {ok, #session{}} | {error, notfound}. delete_session(LUser, LServer, _LResource, SID) -> USKey = us_to_key({LUser, LServer}), - case eredis:q(?PROCNAME, ["HGETALL", USKey]) of + case ejabberd_redis:q(["HGETALL", USKey]) of {ok, Vals} -> Ss = decode_session_list(Vals), case lists:keyfind(SID, #session.sid, Ss) of @@ -93,8 +57,8 @@ delete_session(LUser, LServer, _LResource, SID) -> SIDKey = sid_to_key(SID), ServKey = server_to_key(element(2, Session#session.us)), USSIDKey = us_sid_to_key(Session#session.us, SID), - eredis:qp(?PROCNAME, [["HDEL", USKey, SIDKey], - ["HDEL", ServKey, USSIDKey]]), + ejabberd_redis:qp([["HDEL", USKey, SIDKey], + ["HDEL", ServKey, USSIDKey]]), {ok, Session} end; Err -> @@ -112,7 +76,7 @@ get_sessions() -> -spec get_sessions(binary()) -> [#session{}]. get_sessions(LServer) -> ServKey = server_to_key(LServer), - case eredis:q(?PROCNAME, ["HGETALL", ServKey]) of + case ejabberd_redis:q(["HGETALL", ServKey]) of {ok, Vals} -> decode_session_list(Vals); Err -> @@ -123,7 +87,7 @@ get_sessions(LServer) -> -spec get_sessions(binary(), binary()) -> [#session{}]. get_sessions(LUser, LServer) -> USKey = us_to_key({LUser, LServer}), - case eredis:q(?PROCNAME, ["HGETALL", USKey]) of + case ejabberd_redis:q(["HGETALL", USKey]) of {ok, Vals} when is_list(Vals) -> decode_session_list(Vals); Err -> @@ -135,7 +99,7 @@ get_sessions(LUser, LServer) -> [#session{}]. get_sessions(LUser, LServer, LResource) -> USKey = us_to_key({LUser, LServer}), - case eredis:q(?PROCNAME, ["HGETALL", USKey]) of + case ejabberd_redis:q(["HGETALL", USKey]) of {ok, Vals} when is_list(Vals) -> [S || S <- decode_session_list(Vals), element(3, S#session.usr) == LResource]; @@ -172,7 +136,7 @@ clean_table() -> lists:foreach( fun(LServer) -> ServKey = server_to_key(LServer), - case eredis:q(?PROCNAME, ["HKEYS", ServKey]) of + case ejabberd_redis:q(["HKEYS", ServKey]) of {ok, []} -> ok; {ok, Vals} -> @@ -189,7 +153,7 @@ clean_table() -> SIDKey = sid_to_key(SID), ["HDEL", USKey, SIDKey] end, Vals1), - Res = eredis:qp(?PROCNAME, [Q1|Q2]), + Res = ejabberd_redis:qp([Q1|Q2]), case lists:filter( fun({ok, _}) -> false; (_) -> true