25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-12-22 17:28:25 +01:00

Handle Redis connection in a separate module

This commit is contained in:
Evgeniy Khramtsov 2016-05-09 08:36:30 +03:00
parent 4717d64d7a
commit 068db1a2d9
4 changed files with 196 additions and 48 deletions

View File

@ -63,6 +63,7 @@ start(normal, _Args) ->
Sup = ejabberd_sup:start_link(),
ejabberd_rdbms:start(),
ejabberd_riak_sup:start(),
ejabberd_redis:start(),
ejabberd_sm:start(),
cyrsasl:start(),
% Profiling

View File

@ -30,7 +30,7 @@
add_global_option/2, add_local_option/2,
get_global_option/2, get_local_option/2,
get_global_option/3, get_local_option/3,
get_option/2, get_option/3, add_option/2,
get_option/2, get_option/3, add_option/2, has_option/1,
get_vh_by_auth_method/1, is_file_readable/1,
get_version/0, get_myhosts/0, get_mylang/0,
prepare_opt_val/4, convert_table_to_binary/5,
@ -838,6 +838,10 @@ get_option(Opt, F, Default) ->
end
end.
-spec has_option(atom() | {atom(), global | binary()}) -> any().
has_option(Opt) ->
get_option(Opt, fun(_) -> true end, false).
init_module_db_table(Modules) ->
catch ets:new(module_db, [named_table, public, bag]),
%% Dirty hack for mod_pubsub

179
src/ejabberd_redis.erl Normal file
View File

@ -0,0 +1,179 @@
%%%-------------------------------------------------------------------
%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
%%% @copyright (C) 2016, Evgeny Khramtsov
%%% @doc
%%%
%%% @end
%%% Created : 8 May 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net>
%%%-------------------------------------------------------------------
-module(ejabberd_redis).
-behaviour(gen_server).
-behaviour(ejabberd_config).
%% API
-export([start/0, start_link/0, q/1, qp/1, opt_type/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-define(PROCNAME, 'ejabberd_redis_client').
-include("logger.hrl").
-include("ejabberd.hrl").
-record(state, {}).
%%%===================================================================
%%% API
%%%===================================================================
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
start() ->
case lists:any(
fun(Host) ->
is_redis_configured(Host)
end, ?MYHOSTS) of
true ->
Spec = {?MODULE, {?MODULE, start_link, []},
permanent, 2000, worker, [?MODULE]},
supervisor:start_child(ejabberd_sup, Spec);
false ->
ok
end.
q(Command) ->
try eredis:q(?PROCNAME, Command)
catch _:Reason -> {error, Reason}
end.
qp(Pipeline) ->
try eredis:qp(?PROCNAME, Pipeline)
catch _:Reason -> {error, Reason}
end.
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([]) ->
process_flag(trap_exit, true),
connect(),
{ok, #state{}}.
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(connect, State) ->
connect(),
{noreply, State};
handle_info({'DOWN', _MRef, _Type, _Pid, Reason}, State) ->
?INFO_MSG("Redis connection has failed: ~p", [Reason]),
connect(),
{noreply, State};
handle_info({'EXIT', _, _}, State) ->
{noreply, State};
handle_info(Info, State) ->
?INFO_MSG("unexpected info = ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
is_redis_configured(Host) ->
ServerConfigured = ejabberd_config:has_option({redis_server, Host}),
PortConfigured = ejabberd_config:has_option({redis_port, Host}),
DBConfigured = ejabberd_config:has_option({redis_db, Host}),
PassConfigured = ejabberd_config:has_option({redis_password, Host}),
ReconnTimeoutConfigured = ejabberd_config:has_option(
{redis_reconnect_timeout, Host}),
ConnTimeoutConfigured = ejabberd_config:has_option(
{redis_connect_timeout, Host}),
Modules = ejabberd_config:get_option(
{modules, Host},
fun(L) when is_list(L) -> L end, []),
SMConfigured = ejabberd_config:get_option(
{sm_db_type, Host},
fun(V) -> V end) == redis,
ModuleWithRedisDBConfigured =
lists:any(
fun({Module, Opts}) ->
gen_mod:db_type(Host, Opts, Module) == redis
end, Modules),
ServerConfigured or PortConfigured or DBConfigured or PassConfigured or
ReconnTimeoutConfigured or ConnTimeoutConfigured or
SMConfigured or ModuleWithRedisDBConfigured.
iolist_to_list(IOList) ->
binary_to_list(iolist_to_binary(IOList)).
connect() ->
Server = ejabberd_config:get_option(redis_server,
fun iolist_to_list/1,
"localhost"),
Port = ejabberd_config:get_option(redis_port,
fun(P) when is_integer(P),
P>0, P<65536 ->
P
end, 6379),
DB = ejabberd_config:get_option(redis_db,
fun(I) when is_integer(I), I >= 0 ->
I
end, 0),
Pass = ejabberd_config:get_option(redis_password,
fun iolist_to_list/1,
""),
ReconnTimeout = timer:seconds(
ejabberd_config:get_option(
redis_reconnect_timeout,
fun(I) when is_integer(I), I>0 -> I end,
1)),
ConnTimeout = timer:seconds(
ejabberd_config:get_option(
redis_connect_timeout,
fun(I) when is_integer(I), I>0 -> I end,
1)),
try case eredis:start_link(Server, Port, DB, Pass,
ReconnTimeout, ConnTimeout) of
{ok, Client} ->
?INFO_MSG("Connected to Redis at ~s:~p", [Server, Port]),
unlink(Client),
erlang:monitor(process, Client),
register(?PROCNAME, Client),
{ok, Client};
{error, Why} ->
erlang:error(Why)
end
catch _:Reason ->
Timeout = 10,
?ERROR_MSG("Redis connection at ~s:~p has failed: ~p; "
"reconnecting in ~p seconds",
[Server, Port, Reason, Timeout]),
erlang:send_after(timer:seconds(Timeout), self(), connect)
end.
opt_type(redis_connect_timeout) ->
fun (I) when is_integer(I), I > 0 -> I end;
opt_type(redis_db) ->
fun (I) when is_integer(I), I >= 0 -> I end;
opt_type(redis_password) -> fun iolist_to_list/1;
opt_type(redis_port) ->
fun (P) when is_integer(P), P > 0, P < 65536 -> P end;
opt_type(redis_reconnect_timeout) ->
fun (I) when is_integer(I), I > 0 -> I end;
opt_type(redis_server) -> fun iolist_to_list/1;
opt_type(_) ->
[redis_connect_timeout, redis_db, redis_password,
redis_port, redis_reconnect_timeout, redis_server].

View File

@ -21,48 +21,12 @@
-include("logger.hrl").
-include("jlib.hrl").
-define(PROCNAME, 'ejabberd_redis_client').
%%%===================================================================
%%% API
%%%===================================================================
-spec init() -> ok | {error, any()}.
init() ->
Server = ejabberd_config:get_option(redis_server,
fun iolist_to_list/1,
"localhost"),
Port = ejabberd_config:get_option(redis_port,
fun(P) when is_integer(P),
P>0, P<65536 ->
P
end, 6379),
DB = ejabberd_config:get_option(redis_db,
fun(I) when is_integer(I), I >= 0 ->
I
end, 0),
Pass = ejabberd_config:get_option(redis_password,
fun iolist_to_list/1,
""),
ReconnTimeout = timer:seconds(
ejabberd_config:get_option(
redis_reconnect_timeout,
fun(I) when is_integer(I), I>0 -> I end,
1)),
ConnTimeout = timer:seconds(
ejabberd_config:get_option(
redis_connect_timeout,
fun(I) when is_integer(I), I>0 -> I end,
1)),
case eredis:start_link(Server, Port, DB, Pass,
ReconnTimeout, ConnTimeout) of
{ok, Client} ->
register(?PROCNAME, Client),
clean_table(),
ok;
{error, _} = Err ->
?ERROR_MSG("failed to start redis client: ~p", [Err]),
Err
end.
clean_table().
-spec set_session(#session{}) -> ok.
set_session(Session) ->
@ -71,8 +35,8 @@ set_session(Session) ->
SIDKey = sid_to_key(Session#session.sid),
ServKey = server_to_key(element(2, Session#session.us)),
USSIDKey = us_sid_to_key(Session#session.us, Session#session.sid),
case eredis:qp(?PROCNAME, [["HSET", USKey, SIDKey, T],
["HSET", ServKey, USSIDKey, T]]) of
case ejabberd_redis:qp([["HSET", USKey, SIDKey, T],
["HSET", ServKey, USSIDKey, T]]) of
[{ok, _}, {ok, _}] ->
ok;
Err ->
@ -83,7 +47,7 @@ set_session(Session) ->
{ok, #session{}} | {error, notfound}.
delete_session(LUser, LServer, _LResource, SID) ->
USKey = us_to_key({LUser, LServer}),
case eredis:q(?PROCNAME, ["HGETALL", USKey]) of
case ejabberd_redis:q(["HGETALL", USKey]) of
{ok, Vals} ->
Ss = decode_session_list(Vals),
case lists:keyfind(SID, #session.sid, Ss) of
@ -93,8 +57,8 @@ delete_session(LUser, LServer, _LResource, SID) ->
SIDKey = sid_to_key(SID),
ServKey = server_to_key(element(2, Session#session.us)),
USSIDKey = us_sid_to_key(Session#session.us, SID),
eredis:qp(?PROCNAME, [["HDEL", USKey, SIDKey],
["HDEL", ServKey, USSIDKey]]),
ejabberd_redis:qp([["HDEL", USKey, SIDKey],
["HDEL", ServKey, USSIDKey]]),
{ok, Session}
end;
Err ->
@ -112,7 +76,7 @@ get_sessions() ->
-spec get_sessions(binary()) -> [#session{}].
get_sessions(LServer) ->
ServKey = server_to_key(LServer),
case eredis:q(?PROCNAME, ["HGETALL", ServKey]) of
case ejabberd_redis:q(["HGETALL", ServKey]) of
{ok, Vals} ->
decode_session_list(Vals);
Err ->
@ -123,7 +87,7 @@ get_sessions(LServer) ->
-spec get_sessions(binary(), binary()) -> [#session{}].
get_sessions(LUser, LServer) ->
USKey = us_to_key({LUser, LServer}),
case eredis:q(?PROCNAME, ["HGETALL", USKey]) of
case ejabberd_redis:q(["HGETALL", USKey]) of
{ok, Vals} when is_list(Vals) ->
decode_session_list(Vals);
Err ->
@ -135,7 +99,7 @@ get_sessions(LUser, LServer) ->
[#session{}].
get_sessions(LUser, LServer, LResource) ->
USKey = us_to_key({LUser, LServer}),
case eredis:q(?PROCNAME, ["HGETALL", USKey]) of
case ejabberd_redis:q(["HGETALL", USKey]) of
{ok, Vals} when is_list(Vals) ->
[S || S <- decode_session_list(Vals),
element(3, S#session.usr) == LResource];
@ -172,7 +136,7 @@ clean_table() ->
lists:foreach(
fun(LServer) ->
ServKey = server_to_key(LServer),
case eredis:q(?PROCNAME, ["HKEYS", ServKey]) of
case ejabberd_redis:q(["HKEYS", ServKey]) of
{ok, []} ->
ok;
{ok, Vals} ->
@ -189,7 +153,7 @@ clean_table() ->
SIDKey = sid_to_key(SID),
["HDEL", USKey, SIDKey]
end, Vals1),
Res = eredis:qp(?PROCNAME, [Q1|Q2]),
Res = ejabberd_redis:qp([Q1|Q2]),
case lists:filter(
fun({ok, _}) -> false;
(_) -> true