From a0fafc383a3088bb2a277aa127f90f16735a1569 Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Wed, 11 Mar 2015 14:46:57 +0300 Subject: [PATCH] Add Redis backend for SM --- configure.ac | 15 ++- rebar.config.script | 2 + src/ejabberd_sm.erl | 12 ++- src/ejabberd_sm_mnesia.erl | 25 ++--- src/ejabberd_sm_odbc.erl | 40 +++---- src/ejabberd_sm_redis.erl | 208 +++++++++++++++++++++++++++++++++++++ vars.config.in | 1 + 7 files changed, 256 insertions(+), 47 deletions(-) create mode 100644 src/ejabberd_sm_redis.erl diff --git a/configure.ac b/configure.ac index edd65747a..6bef15864 100644 --- a/configure.ac +++ b/configure.ac @@ -106,10 +106,10 @@ AC_ARG_ENABLE(mssql, esac],[db_type=generic]) AC_ARG_ENABLE(all, -[AC_HELP_STRING([--enable-all], [same as --enable-nif --enable-odbc --enable-mysql --enable-pgsql --enable-pam --enable-zlib --enable-riak --enable-json --enable-elixir --enable-iconv --enable-debug --enable-lager --enable-tools (useful for Dialyzer checks, default: no)])], +[AC_HELP_STRING([--enable-all], [same as --enable-nif --enable-odbc --enable-mysql --enable-pgsql --enable-pam --enable-zlib --enable-riak --enable-redis --enable-json --enable-elixir --enable-iconv --enable-debug --enable-lager --enable-tools (useful for Dialyzer checks, default: no)])], [case "${enableval}" in - yes) nif=true odbc=true mysql=true pgsql=true pam=true zlib=true riak=true json=true elixir=true iconv=true debug=true lager=true tools=true ;; - no) nif=false odbc=false mysql=false pgsql=false pam=false zlib=false riak=false json=false elixir=false iconv=false debug=false lager=false tools=false ;; + yes) nif=true odbc=true mysql=true pgsql=true pam=true zlib=true riak=true redis=true json=true elixir=true iconv=true debug=true lager=true tools=true ;; + no) nif=false odbc=false mysql=false pgsql=false pam=false zlib=false riak=false redis=false json=false elixir=false iconv=false debug=false lager=false tools=false ;; *) AC_MSG_ERROR(bad value ${enableval} for --enable-all) ;; esac],[]) @@ -177,6 +177,14 @@ AC_ARG_ENABLE(riak, *) AC_MSG_ERROR(bad value ${enableval} for --enable-riak) ;; esac],[if test "x$riak" = "x"; then riak=false; fi]) +AC_ARG_ENABLE(redis, +[AC_HELP_STRING([--enable-redis], [enable Redis support (default: no)])], +[case "${enableval}" in + yes) redis=true ;; + no) redis=false ;; + *) AC_MSG_ERROR(bad value ${enableval} for --enable-redis) ;; +esac],[if test "x$redis" = "x"; then redis=false; fi]) + AC_ARG_ENABLE(json, [AC_HELP_STRING([--enable-json], [enable JSON support for mod_bosh (default: no)])], [case "${enableval}" in @@ -249,6 +257,7 @@ AC_SUBST(pgsql) AC_SUBST(pam) AC_SUBST(zlib) AC_SUBST(riak) +AC_SUBST(redis) AC_SUBST(json) AC_SUBST(elixir) AC_SUBST(iconv) diff --git a/rebar.config.script b/rebar.config.script index f7846c0fa..69964852b 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -106,6 +106,8 @@ CfgDeps = lists:flatmap( [{p1_logger, ".*", {git, "git://github.com/processone/p1_logger"}}]; ({tools, true}) -> [{meck, "0.*", {git, "https://github.com/eproxus/meck"}}]; + ({redis, true}) -> + [{eredis, ".*", {git, "git://github.com/wooga/eredis"}}]; (_) -> [] end, Cfg), diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index 67a82d024..abe15d9ff 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -78,9 +78,9 @@ -include("ejabberd_sm.hrl"). -callback init() -> ok | {error, any()}. --callback get_session(binary(), sid()) -> {ok, #session{}} | {error, notfound}. -callback set_session(#session{}) -> ok. --callback delete_session(binary(), sid()) -> ok. +-callback delete_session(binary(), binary(), binary(), sid()) -> + {ok, #session{}} | {error, notfound}. -callback get_sessions() -> [#session{}]. -callback get_sessions(binary()) -> [#session{}]. -callback get_sessions(binary(), binary()) -> [#session{}]. @@ -137,12 +137,13 @@ open_session(SID, User, Server, Resource, Info) -> close_session(SID, User, Server, Resource) -> Mod = get_sm_backend(), + LUser = jlib:nodeprep(User), LServer = jlib:nameprep(Server), - Info = case Mod:get_session(LServer, SID) of + LResource = jlib:resourceprep(Resource), + Info = case Mod:delete_session(LUser, LServer, LResource, SID) of {ok, #session{info = I}} -> I; {error, notfound} -> [] end, - Mod:delete_session(LServer, SID), JID = jlib:make_jid(User, Server, Resource), ejabberd_hooks:run(sm_remove_connection_hook, JID#jid.lserver, [SID, JID, Info]). @@ -731,7 +732,8 @@ get_sm_backend() -> DBType = ejabberd_config:get_option(sm_db_type, fun(mnesia) -> mnesia; (internal) -> mnesia; - (odbc) -> odbc + (odbc) -> odbc; + (redis) -> redis end, mnesia), list_to_atom("ejabberd_sm_" ++ atom_to_list(DBType)). diff --git a/src/ejabberd_sm_mnesia.erl b/src/ejabberd_sm_mnesia.erl index 59a6c64f6..7acc1022d 100644 --- a/src/ejabberd_sm_mnesia.erl +++ b/src/ejabberd_sm_mnesia.erl @@ -13,9 +13,8 @@ %% API -export([init/0, - get_session/2, set_session/1, - delete_session/2, + delete_session/4, get_sessions/0, get_sessions/1, get_sessions/2, @@ -44,22 +43,20 @@ init() -> Err end. --spec get_session(binary(), sid()) -> {ok, #session{}} | {error, notfound}. -get_session(_LServer, SID) -> - case mnesia:dirty_read(session, SID) of - [] -> - {error, notfound}; - [Session] -> - {ok, Session} - end. - -spec set_session(#session{}) -> ok. set_session(Session) -> mnesia:dirty_write(Session). --spec delete_session(binary(), sid()) -> ok. -delete_session(_LServer, SID) -> - mnesia:dirty_delete(session, SID). +-spec delete_session(binary(), binary(), binary(), sid()) -> + {ok, #session{}} | {error, notfound}. +delete_session(_LUser, _LServer, _LResource, SID) -> + case mnesia:dirty_read(session, SID) of + [Session] -> + mnesia:dirty_delete(session, SID), + {ok, Session}; + [] -> + {error, notfound} + end. -spec get_sessions() -> [#session{}]. get_sessions() -> diff --git a/src/ejabberd_sm_odbc.erl b/src/ejabberd_sm_odbc.erl index 55bbc74fb..946f58ffa 100644 --- a/src/ejabberd_sm_odbc.erl +++ b/src/ejabberd_sm_odbc.erl @@ -12,9 +12,8 @@ %% API -export([init/0, - get_session/2, set_session/1, - delete_session/2, + delete_session/4, get_sessions/0, get_sessions/1, get_sessions/2, @@ -31,6 +30,7 @@ -spec init() -> ok | {error, any()}. init() -> Node = ejabberd_odbc:escape(jlib:atom_to_binary(node())), + ?INFO_MSG("Cleaning SQL SM table...", []), lists:foldl( fun(Host, ok) -> case ejabberd_odbc:sql_query( @@ -45,23 +45,6 @@ init() -> Err end, ok, ?MYHOSTS). --spec get_session(binary(), sid()) -> {ok, #session{}} | {error, notfound}. -get_session(LServer, {Now, Pid} = SID) -> - Host = ejabberd_odbc:escape(LServer), - PidS = list_to_binary(erlang:pid_to_list(Pid)), - TS = now_to_timestamp(Now), - case ejabberd_odbc:sql_query( - Host, [<<"select username, resource, priority, info from sm ">>, - <<"where usec='">>, TS, <<"' and pid='">>, PidS, <<"'">>]) of - {selected, _, [[User, Resource, Priority, Info]|_]} -> - {ok, #session{sid = SID, us = {User, Resource}, - usr = {User, Resource, LServer}, - priority = dec_priority(Priority), - info = ejabberd_odbc:decode_term(Info)}}; - {selected, _, []} -> - {error, notfound} - end. - set_session(#session{sid = {Now, Pid}, usr = {U, LServer, R}, priority = Priority, info = Info}) -> Username = ejabberd_odbc:escape(U), @@ -84,16 +67,23 @@ set_session(#session{sid = {Now, Pid}, usr = {U, LServer, R}, ?ERROR_MSG("failed to update 'sm' table: ~p", [Err]) end. -delete_session(LServer, {Now, Pid}) -> +delete_session(_LUser, LServer, _LResource, {Now, Pid}) -> TS = now_to_timestamp(Now), PidS = list_to_binary(erlang:pid_to_list(Pid)), case ejabberd_odbc:sql_query( - LServer, [<<"delete from sm where usec='">>, - TS, <<"' and pid='">>, PidS, <<"'">>]) of - {updated, _} -> - ok; + LServer, + [<<"select usec, pid, username, resource, priority, info ">>, + <<"from sm where usec='">>, TS, <<"' and pid='">>,PidS, <<"'">>]) of + {selected, _, [Row]} -> + ejabberd_odbc:sql_query( + LServer, [<<"delete from sm where usec='">>, + TS, <<"' and pid='">>, PidS, <<"'">>]), + {ok, row_to_session(LServer, Row)}; + {selected, _, []} -> + {error, notfound}; Err -> - ?ERROR_MSG("failed to delete from 'sm' table: ~p", [Err]) + ?ERROR_MSG("failed to delete from 'sm' table: ~p", [Err]), + {error, notfound} end. get_sessions() -> diff --git a/src/ejabberd_sm_redis.erl b/src/ejabberd_sm_redis.erl new file mode 100644 index 000000000..7abab1847 --- /dev/null +++ b/src/ejabberd_sm_redis.erl @@ -0,0 +1,208 @@ +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov +%%% @copyright (C) 2015, Evgeny Khramtsov +%%% @doc +%%% +%%% @end +%%% Created : 11 Mar 2015 by Evgeny Khramtsov +%%%------------------------------------------------------------------- +-module(ejabberd_sm_redis). + +-behaviour(ejabberd_sm). + +%% API +-export([init/0, + set_session/1, + delete_session/4, + get_sessions/0, + get_sessions/1, + get_sessions/2, + get_sessions/3]). + +-include("ejabberd.hrl"). +-include("ejabberd_sm.hrl"). +-include("logger.hrl"). +-include("jlib.hrl"). + +-define(PROCNAME, 'ejabberd_redis_client'). + +%%%=================================================================== +%%% API +%%%=================================================================== +-spec init() -> ok | {error, any()}. +init() -> + Server = ejabberd_config:get_option(redis_server, + fun iolist_to_list/1, + "localhost"), + Port = ejabberd_config:get_option(redis_port, + fun(P) when is_integer(P), + P>0, P<65536 -> + P + end, 6379), + DB = ejabberd_config:get_option(redis_db, + fun(I) when is_integer(I), I >= 0 -> + I + end, 0), + Pass = ejabberd_config:get_option(redis_password, + fun iolist_to_list/1, + ""), + ReconnTimeout = timer:seconds( + ejabberd_config:get_option( + redis_reconnect_timeout, + fun(I) when is_integer(I), I>0 -> I end, + 1)), + ConnTimeout = timer:seconds( + ejabberd_config:get_option( + redis_connect_timeout, + fun(I) when is_integer(I), I>0 -> I end, + 1)), + case eredis:start_link(Server, Port, DB, Pass, + ReconnTimeout, ConnTimeout) of + {ok, Client} -> + register(?PROCNAME, Client), + clean_table(), + ok; + {error, _} = Err -> + ?ERROR_MSG("failed to start redis client: ~p", [Err]), + Err + end. + +-spec set_session(#session{}) -> ok. +set_session(Session) -> + T = term_to_binary(Session), + USKey = us_to_key(Session#session.us), + SIDKey = sid_to_key(Session#session.sid), + ServKey = server_to_key(element(2, Session#session.us)), + USSIDKey = us_sid_to_key(Session#session.us, Session#session.sid), + case eredis:qp(?PROCNAME, [["HSET", USKey, SIDKey, T], + ["HSET", ServKey, USSIDKey, T]]) of + [{ok, _}, {ok, _}] -> + ok; + Err -> + ?ERROR_MSG("failed to set session for redis: ~p", [Err]) + end. + +-spec delete_session(binary(), binary(), binary(), sid()) -> ok. +delete_session(LUser, LServer, _LResource, SID) -> + USKey = us_to_key({LUser, LServer}), + case eredis:q(?PROCNAME, ["HGETALL", USKey]) of + {ok, Vals} -> + Ss = decode_session_list(Vals), + case lists:keyfind(SID, #session.sid, Ss) of + false -> + {error, notfound}; + Session -> + SIDKey = sid_to_key(SID), + ServKey = server_to_key(element(2, Session#session.us)), + USSIDKey = us_sid_to_key(Session#session.us, SID), + eredis:qp(?PROCNAME, [["HDEL", USKey, SIDKey], + ["HDEL", ServKey, USSIDKey]]), + {ok, Session} + end; + Err -> + ?ERROR_MSG("failed to delete session from redis: ~p", [Err]), + {error, notfound} + end. + +-spec get_sessions() -> [#session{}]. +get_sessions() -> + lists:flatmap( + fun(LServer) -> + get_sessions(LServer) + end, ?MYHOSTS). + +-spec get_sessions(binary()) -> [#session{}]. +get_sessions(LServer) -> + ServKey = server_to_key(LServer), + case eredis:q(?PROCNAME, ["HGETALL", ServKey]) of + {ok, Vals} -> + decode_session_list(Vals); + Err -> + ?ERROR_MSG("failed to get sessions from redis: ~p", [Err]), + [] + end. + +-spec get_sessions(binary(), binary()) -> [#session{}]. +get_sessions(LUser, LServer) -> + USKey = us_to_key({LUser, LServer}), + case eredis:q(?PROCNAME, ["HGETALL", USKey]) of + {ok, Vals} when is_list(Vals) -> + decode_session_list(Vals); + Err -> + ?ERROR_MSG("failed to get sessions from redis: ~p", [Err]), + [] + end. + +-spec get_sessions(binary(), binary(), binary()) -> [#session{}]. +get_sessions(LUser, LServer, LResource) -> + USKey = us_to_key({LUser, LServer}), + case eredis:q(?PROCNAME, ["HGETALL", USKey]) of + {ok, Vals} when is_list(Vals) -> + [S || S <- decode_session_list(Vals), + element(3, S#session.usr) == LResource]; + Err -> + ?ERROR_MSG("failed to get sessions from redis: ~p", [Err]), + [] + end. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +iolist_to_list(IOList) -> + binary_to_list(iolist_to_binary(IOList)). + +us_to_key({LUser, LServer}) -> + <<"ejabberd:sm:", LUser/binary, "@", LServer/binary>>. + +server_to_key(LServer) -> + <<"ejabberd:sm:", LServer/binary>>. + +us_sid_to_key(US, SID) -> + term_to_binary({US, SID}). + +sid_to_key(SID) -> + term_to_binary(SID). + +decode_session_list([_, Val|T]) -> + [binary_to_term(Val)|decode_session_list(T)]; +decode_session_list([]) -> + []. + +clean_table() -> + ?INFO_MSG("Cleaning Redis SM table...", []), + lists:foreach( + fun(LServer) -> + ServKey = server_to_key(LServer), + case eredis:q(?PROCNAME, ["HKEYS", ServKey]) of + {ok, []} -> + ok; + {ok, Vals} -> + Vals1 = lists:filter( + fun(USSIDKey) -> + {_, SID} = binary_to_term(USSIDKey), + node(element(2, SID)) == node() + end, Vals), + Q1 = ["HDEL", ServKey | Vals1], + Q2 = lists:map( + fun(USSIDKey) -> + {US, SID} = binary_to_term(USSIDKey), + USKey = us_to_key(US), + SIDKey = sid_to_key(SID), + ["HDEL", USKey, SIDKey] + end, Vals1), + Res = eredis:qp(?PROCNAME, [Q1|Q2]), + case lists:filter( + fun({ok, _}) -> false; + (_) -> true + end, Res) of + [] -> + ok; + Errs -> + ?ERROR_MSG("failed to clean redis table for " + "server ~s: ~p", [LServer, Errs]) + end; + Err -> + ?ERROR_MSG("failed to clean redis table for " + "server ~s: ~p", [LServer, Err]) + end + end, ?MYHOSTS). diff --git a/vars.config.in b/vars.config.in index 2efa449ae..cb82775aa 100644 --- a/vars.config.in +++ b/vars.config.in @@ -26,6 +26,7 @@ {pam, @pam@}. {zlib, @zlib@}. {riak, @riak@}. +{redis, @redis@}. {json, @json@}. {elixir, @elixir@}. {lager, @lager@}.