diff --git a/src/ejabberd_redis.erl b/src/ejabberd_redis.erl index dbd55e914..ec5d73596 100644 --- a/src/ejabberd_redis.erl +++ b/src/ejabberd_redis.erl @@ -32,7 +32,9 @@ %% API -export([start_link/0, q/1, qp/1, config_reloaded/0, opt_type/1]). %% Commands --export([multi/1, get/1, set/2, del/1, sadd/2, srem/2, smembers/1, scard/1]). +-export([multi/1, get/1, set/2, del/1, + sadd/2, srem/2, smembers/1, sismember/2, scard/1, + hget/2, hset/3, hdel/2, hlen/1, hgetall/1, hkeys/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -57,12 +59,14 @@ start_link() -> q(Command) -> try eredis:q(?PROCNAME, Command) - catch _:Reason -> {error, Reason} + catch _:{noproc, _} -> {error, disconnected}; + _:{timeout, _} -> {error, timeout} end. qp(Pipeline) -> try eredis:qp(?PROCNAME, Pipeline) - catch _:Reason -> {error, Reason} + catch _:{noproc, _} -> {error, disconnected}; + _:{timeout, _} -> {error, timeout} end. -spec multi(fun(() -> any())) -> {ok, list()} | redis_error(). @@ -95,6 +99,7 @@ config_reloaded() -> ?MODULE ! disconnect end. +-spec get(iodata()) -> {ok, undefined | binary()} | redis_error(). get(Key) -> case erlang:get(?TR_STACK) of undefined -> @@ -113,11 +118,12 @@ set(Key, Val) -> {error, _} = Err -> Err end; Stack -> - erlang:put(?TR_STACK, [Cmd|Stack]), - queued + tr_enq(Cmd, Stack) end. -spec del(list()) -> {ok, non_neg_integer()} | redis_error() | queued. +del([]) -> + reply(0); del(Keys) -> Cmd = [<<"DEL">>|Keys], case erlang:get(?TR_STACK) of @@ -127,11 +133,12 @@ del(Keys) -> {error, _} = Err -> Err end; Stack -> - erlang:put(?TR_STACK, [Cmd|Stack]), - queued + tr_enq(Cmd, Stack) end. -spec sadd(iodata(), list()) -> {ok, non_neg_integer()} | redis_error() | queued. +sadd(_Set, []) -> + reply(0); sadd(Set, Members) -> Cmd = [<<"SADD">>, Set|Members], case erlang:get(?TR_STACK) of @@ -141,11 +148,12 @@ sadd(Set, Members) -> {error, _} = Err -> Err end; Stack -> - erlang:put(?TR_STACK, [Cmd|Stack]), - queued + tr_enq(Cmd, Stack) end. -spec srem(iodata(), list()) -> {ok, non_neg_integer()} | redis_error() | queued. +srem(_Set, []) -> + reply(0); srem(Set, Members) -> Cmd = [<<"SREM">>, Set|Members], case erlang:get(?TR_STACK) of @@ -155,8 +163,7 @@ srem(Set, Members) -> {error, _} = Err -> Err end; Stack -> - erlang:put(?TR_STACK, [Cmd|Stack]), - queued + tr_enq(Cmd, Stack) end. -spec smembers(iodata()) -> {ok, [binary()]} | redis_error(). @@ -168,6 +175,18 @@ smembers(Set) -> {error, transaction_unsupported} end. +-spec sismember(iodata(), iodata()) -> boolean() | redis_error(). +sismember(Set, Member) -> + case erlang:get(?TR_STACK) of + undefined -> + case q([<<"SISMEMBER">>, Set, Member]) of + {ok, Flag} -> {ok, dec_bool(Flag)}; + {error, _} = Err -> Err + end; + _ -> + {error, transaction_unsupported} + end. + -spec scard(iodata()) -> {ok, non_neg_integer()} | redis_error(). scard(Set) -> case erlang:get(?TR_STACK) of @@ -182,6 +201,76 @@ scard(Set) -> {error, transaction_unsupported} end. +-spec hget(iodata(), iodata()) -> {ok, undefined | binary()} | redis_error(). +hget(Key, Field) -> + case erlang:get(?TR_STACK) of + undefined -> + q([<<"HGET">>, Key, Field]); + _ -> + {error, transaction_unsupported} + end. + +-spec hset(iodata(), iodata(), iodata()) -> {ok, boolean()} | redis_error() | queued. +hset(Key, Field, Val) -> + Cmd = [<<"HSET">>, Key, Field, Val], + case erlang:get(?TR_STACK) of + undefined -> + case q(Cmd) of + {ok, Flag} -> {ok, dec_bool(Flag)}; + {error, _} = Err -> Err + end; + Stack -> + tr_enq(Cmd, Stack) + end. + +-spec hdel(iodata(), list()) -> {ok, non_neg_integer()} | redis_error() | queued. +hdel(_Key, []) -> + reply(0); +hdel(Key, Fields) -> + Cmd = [<<"HDEL">>, Key|Fields], + case erlang:get(?TR_STACK) of + undefined -> + case q(Cmd) of + {ok, N} -> {ok, binary_to_integer(N)}; + {error, _} = Err -> Err + end; + Stack -> + tr_enq(Cmd, Stack) + end. + +-spec hgetall(iodata()) -> {ok, [{binary(), binary()}]} | redis_error(). +hgetall(Key) -> + case erlang:get(?TR_STACK) of + undefined -> + case q([<<"HGETALL">>, Key]) of + {ok, Pairs} -> {ok, decode_pairs(Pairs)}; + {error, _} = Err -> Err + end; + _ -> + {error, transaction_unsupported} + end. + +-spec hlen(iodata()) -> {ok, non_neg_integer()} | redis_error(). +hlen(Key) -> + case erlang:get(?TR_STACK) of + undefined -> + case q([<<"HLEN">>, Key]) of + {ok, N} -> {ok, binary_to_integer(N)}; + {error, _} = Err -> Err + end; + _ -> + {error, transaction_unsupported} + end. + +-spec hkeys(iodata()) -> {ok, [binary()]} | redis_error(). +hkeys(Key) -> + case erlang:get(?TR_STACK) of + undefined -> + q([<<"HKEYS">>, Key]); + _ -> + {error, transaction_unsupported} + end. + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -325,6 +414,28 @@ get_result([{ok, _} = OK]) -> get_result([_|T]) -> get_result(T). +-spec tr_enq([iodata()], list()) -> queued. +tr_enq(Cmd, Stack) -> + erlang:put(?TR_STACK, [Cmd|Stack]), + queued. + +decode_pairs(Pairs) -> + decode_pairs(Pairs, []). + +decode_pairs([Field, Val|Pairs], Acc) -> + decode_pairs(Pairs, [{Field, Val}|Acc]); +decode_pairs([], Acc) -> + lists:reverse(Acc). + +dec_bool(<<$1>>) -> true; +dec_bool(<<$0>>) -> false. + +reply(Val) -> + case erlang:get(?TR_STACK) of + undefined -> {ok, Val}; + _ -> queued + end. + opt_type(redis_connect_timeout) -> fun (I) when is_integer(I), I > 0 -> I end; opt_type(redis_db) -> diff --git a/src/ejabberd_router_redis.erl b/src/ejabberd_router_redis.erl index be8d166b9..cf240a2a0 100644 --- a/src/ejabberd_router_redis.erl +++ b/src/ejabberd_router_redis.erl @@ -44,9 +44,12 @@ register_route(Domain, ServerHost, LocalHint, _, Pid) -> DomKey = domain_key(Domain), PidKey = term_to_binary(Pid), T = term_to_binary({ServerHost, LocalHint}), - case ejabberd_redis:qp([["HSET", DomKey, PidKey, T], - ["SADD", ?ROUTES_KEY, Domain]]) of - [{ok, _}, {ok, _}] -> + case ejabberd_redis:multi( + fun() -> + ejabberd_redis:hset(DomKey, PidKey, T), + ejabberd_redis:sadd(?ROUTES_KEY, [Domain]) + end) of + {ok, _} -> ok; Err -> ?ERROR_MSG("failed to register route in redis: ~p", [Err]), @@ -57,13 +60,20 @@ unregister_route(Domain, _, Pid) -> DomKey = domain_key(Domain), PidKey = term_to_binary(Pid), try - {ok, _} = ejabberd_redis:q(["HDEL", DomKey, PidKey]), - {ok, Num} = ejabberd_redis:q(["HLEN", DomKey]), - case binary_to_integer(Num) of - 0 -> - {ok, _} = ejabberd_redis:q(["SREM", ?ROUTES_KEY, Domain]), - ok; - _ -> + {ok, Num} = ejabberd_redis:hdel(DomKey, [PidKey]), + if Num > 0 -> + {ok, Len} = ejabberd_redis:hlen(DomKey), + if Len == 0 -> + {ok, _} = ejabberd_redis:multi( + fun() -> + ejabberd_redis:del([DomKey]), + ejabberd_redis:srem(?ROUTES_KEY, [Domain]) + end), + ok; + true -> + ok + end; + true -> ok end catch _:{badmatch, Err} -> @@ -73,7 +83,7 @@ unregister_route(Domain, _, Pid) -> find_routes(Domain) -> DomKey = domain_key(Domain), - case ejabberd_redis:q(["HGETALL", DomKey]) of + case ejabberd_redis:hgetall(DomKey) of {ok, Vals} -> decode_routes(Domain, Vals); Err -> @@ -83,8 +93,8 @@ find_routes(Domain) -> host_of_route(Domain) -> DomKey = domain_key(Domain), - case ejabberd_redis:q(["HGETALL", DomKey]) of - {ok, [_, Data|_]} -> + case ejabberd_redis:hgetall(DomKey) of + {ok, [{_Pid, Data}|_]} -> {ServerHost, _} = binary_to_term(Data), {ok, ServerHost}; {ok, []} -> @@ -95,9 +105,9 @@ host_of_route(Domain) -> end. is_my_route(Domain) -> - case ejabberd_redis:q(["SISMEMBER", ?ROUTES_KEY, Domain]) of - {ok, <<"1">>} -> true; - {ok, _} -> false; + case ejabberd_redis:sismember(?ROUTES_KEY, Domain) of + {ok, Bool} -> + Bool; Err -> ?ERROR_MSG("failed to check route in redis: ~p", [Err]), false @@ -107,7 +117,7 @@ is_my_host(Domain) -> {ok, Domain} == host_of_route(Domain). get_all_routes() -> - case ejabberd_redis:q(["SMEMBERS", ?ROUTES_KEY]) of + case ejabberd_redis:smembers(?ROUTES_KEY) of {ok, Routes} -> Routes; Err -> @@ -116,18 +126,7 @@ get_all_routes() -> end. find_routes() -> - lists:flatmap( - fun(Domain) -> - DomKey = domain_key(Domain), - case ejabberd_redis:q(["HGETALL", DomKey]) of - {ok, Vals} -> - decode_routes(Domain, Vals); - Err -> - ?ERROR_MSG("failed to fetch routes from redis: ~p", - [Err]), - [] - end - end, get_all_routes()). + lists:flatmap(fun find_routes/1, get_all_routes()). %%%=================================================================== %%% Internal functions @@ -143,12 +142,12 @@ clean_table() -> domain_key(Domain) -> <<"ejabberd:route:", Domain/binary>>. -decode_routes(Domain, [Pid, Data|Vals]) -> - {ServerHost, LocalHint} = binary_to_term(Data), - [#route{domain = Domain, - pid = binary_to_term(Pid), - server_host = ServerHost, - local_hint = LocalHint}| - decode_routes(Domain, Vals)]; -decode_routes(_, []) -> - []. +decode_routes(Domain, Vals) -> + lists:map( + fun({Pid, Data}) -> + {ServerHost, LocalHint} = binary_to_term(Data), + #route{domain = Domain, + pid = binary_to_term(Pid), + server_host = ServerHost, + local_hint = LocalHint} + end, Vals). diff --git a/src/ejabberd_sm_redis.erl b/src/ejabberd_sm_redis.erl index 689e0ccdb..8c9dc56d3 100644 --- a/src/ejabberd_sm_redis.erl +++ b/src/ejabberd_sm_redis.erl @@ -50,9 +50,12 @@ set_session(Session) -> SIDKey = sid_to_key(Session#session.sid), ServKey = server_to_key(element(2, Session#session.us)), USSIDKey = us_sid_to_key(Session#session.us, Session#session.sid), - case ejabberd_redis:qp([["HSET", USKey, SIDKey, T], - ["HSET", ServKey, USSIDKey, T]]) of - [{ok, _}, {ok, _}] -> + case ejabberd_redis:multi( + fun() -> + ejabberd_redis:hset(USKey, SIDKey, T), + ejabberd_redis:hset(ServKey, USSIDKey, T) + end) of + {ok, _} -> ok; Err -> ?ERROR_MSG("failed to set session for redis: ~p", [Err]) @@ -62,7 +65,7 @@ set_session(Session) -> {ok, #session{}} | {error, notfound}. delete_session(LUser, LServer, _LResource, SID) -> USKey = us_to_key({LUser, LServer}), - case ejabberd_redis:q(["HGETALL", USKey]) of + case ejabberd_redis:hgetall(USKey) of {ok, Vals} -> Ss = decode_session_list(Vals), case lists:keyfind(SID, #session.sid, Ss) of @@ -72,8 +75,16 @@ delete_session(LUser, LServer, _LResource, SID) -> SIDKey = sid_to_key(SID), ServKey = server_to_key(element(2, Session#session.us)), USSIDKey = us_sid_to_key(Session#session.us, SID), - ejabberd_redis:qp([["HDEL", USKey, SIDKey], - ["HDEL", ServKey, USSIDKey]]), + case ejabberd_redis:multi( + fun() -> + ejabberd_redis:hdel(USKey, [SIDKey]), + ejabberd_redis:hdel(ServKey, [USSIDKey]) + end) of + {ok, _} -> + ok; + Err -> + ?ERROR_MSG("failed to delete session from redis: ~p", [Err]) + end, {ok, Session} end; Err -> @@ -91,7 +102,7 @@ get_sessions() -> -spec get_sessions(binary()) -> [#session{}]. get_sessions(LServer) -> ServKey = server_to_key(LServer), - case ejabberd_redis:q(["HGETALL", ServKey]) of + case ejabberd_redis:hgetall(ServKey) of {ok, Vals} -> decode_session_list(Vals); Err -> @@ -102,8 +113,8 @@ get_sessions(LServer) -> -spec get_sessions(binary(), binary()) -> [#session{}]. get_sessions(LUser, LServer) -> USKey = us_to_key({LUser, LServer}), - case ejabberd_redis:q(["HGETALL", USKey]) of - {ok, Vals} when is_list(Vals) -> + case ejabberd_redis:hgetall(USKey) of + {ok, Vals} -> decode_session_list(Vals); Err -> ?ERROR_MSG("failed to get sessions from redis: ~p", [Err]), @@ -114,8 +125,8 @@ get_sessions(LUser, LServer) -> [#session{}]. get_sessions(LUser, LServer, LResource) -> USKey = us_to_key({LUser, LServer}), - case ejabberd_redis:q(["HGETALL", USKey]) of - {ok, Vals} when is_list(Vals) -> + case ejabberd_redis:hgetall(USKey) of + {ok, Vals} -> [S || S <- decode_session_list(Vals), element(3, S#session.usr) == LResource]; Err -> @@ -141,52 +152,36 @@ us_sid_to_key(US, SID) -> sid_to_key(SID) -> term_to_binary(SID). -decode_session_list([_, Val|T]) -> - [binary_to_term(Val)|decode_session_list(T)]; -decode_session_list([]) -> - []. +decode_session_list(Vals) -> + [binary_to_term(Val) || {_, Val} <- Vals]. clean_table() -> ?INFO_MSG("Cleaning Redis SM table...", []), - lists:foreach( - fun(LServer) -> - ServKey = server_to_key(LServer), - case ejabberd_redis:q(["HKEYS", ServKey]) of - {ok, []} -> - ok; - {ok, Vals} -> - Vals1 = lists:filter( - fun(USSIDKey) -> - {_, SID} = binary_to_term(USSIDKey), - node(element(2, SID)) == node() - end, Vals), - Q1 = case Vals1 of - [] -> []; - _ -> ["HDEL", ServKey | Vals1] - end, - Q2 = lists:map( - fun(USSIDKey) -> - {US, SID} = binary_to_term(USSIDKey), - USKey = us_to_key(US), - SIDKey = sid_to_key(SID), - ["HDEL", USKey, SIDKey] - end, Vals1), - Res = ejabberd_redis:qp(lists:delete([], [Q1|Q2])), - case lists:filter( - fun({ok, _}) -> false; - (_) -> true - end, Res) of - [] -> - ok; - Errs -> - ?ERROR_MSG("failed to clean redis table for " - "server ~s: ~p", [LServer, Errs]) - end; - Err -> - ?ERROR_MSG("failed to clean redis table for " - "server ~s: ~p", [LServer, Err]) - end - end, ejabberd_sm:get_vh_by_backend(?MODULE)). + try + lists:foreach( + fun(LServer) -> + ServKey = server_to_key(LServer), + {ok, Vals} = ejabberd_redis:hkeys(ServKey), + {ok, _} = + ejabberd_redis:multi( + fun() -> + lists:foreach( + fun(USSIDKey) -> + {US, SID} = binary_to_term(USSIDKey), + if node(element(2, SID)) == node() -> + USKey = us_to_key(US), + SIDKey = sid_to_key(SID), + ejabberd_redis:hdel(ServKey, [USSIDKey]), + ejabberd_redis:hdel(USKey, [SIDKey]); + true -> + ok + end + end, Vals) + end) + end, ejabberd_sm:get_vh_by_backend(?MODULE)) + catch _:{badmatch, {error, _} = Err} -> + ?ERROR_MSG("failed to clean redis c2s sessions: ~p", [Err]) + end. opt_type(redis_connect_timeout) -> fun (I) when is_integer(I), I > 0 -> I end; diff --git a/src/mod_bosh_redis.erl b/src/mod_bosh_redis.erl index 0b61223bf..ca23f72bb 100644 --- a/src/mod_bosh_redis.erl +++ b/src/mod_bosh_redis.erl @@ -25,7 +25,7 @@ init() -> open_session(SID, Pid) -> PidBin = term_to_binary(Pid), - case ejabberd_redis:q(["HSET", ?BOSH_KEY, SID, PidBin]) of + case ejabberd_redis:hset(?BOSH_KEY, SID, PidBin) of {ok, _} -> ok; Err -> @@ -34,7 +34,7 @@ open_session(SID, Pid) -> end. close_session(SID) -> - case ejabberd_redis:q(["HDEL", ?BOSH_KEY, SID]) of + case ejabberd_redis:hdel(?BOSH_KEY, [SID]) of {ok, _} -> ok; Err -> @@ -42,9 +42,15 @@ close_session(SID) -> end. find_session(SID) -> - case ejabberd_redis:q(["HGET", ?BOSH_KEY, SID]) of + case ejabberd_redis:hget(?BOSH_KEY, SID) of {ok, Pid} when is_binary(Pid) -> - {ok, binary_to_term(Pid)}; + try + {ok, binary_to_term(Pid)} + catch _:badarg -> + ?ERROR_MSG("malformed data in redis (key = '~s'): ~p", + [SID, Pid]), + error + end; {ok, _} -> error; Err -> @@ -56,21 +62,23 @@ find_session(SID) -> %%% Internal functions %%%=================================================================== clean_table() -> - ?INFO_MSG("Cleaning Redis BOSH table...", []), - case ejabberd_redis:q(["HGETALL", ?BOSH_KEY]) of + ?INFO_MSG("Cleaning Redis BOSH sessions...", []), + case ejabberd_redis:hgetall(?BOSH_KEY) of {ok, Vals} -> - clean_table(Vals); + case ejabberd_redis:multi( + fun() -> + lists:foreach( + fun({SID, Pid}) when node(Pid) == node() -> + ejabberd_redis:hdel(?BOSH_KEY, [SID]); + (_) -> + ok + end, Vals) + end) of + {ok, _} -> + ok; + Err -> + ?ERROR_MSG("failed to clean bosh sessions in redis: ~p", [Err]) + end; Err -> - ?ERROR_MSG("failed to clean bosh table in redis: ~p", [Err]) + ?ERROR_MSG("failed to clean bosh sessions in redis: ~p", [Err]) end. - -clean_table([SID, PidBin|Vals]) -> - case binary_to_term(PidBin) of - Pid when node(Pid) == node() -> - close_session(SID); - _ -> - ok - end, - clean_table(Vals); -clean_table([]) -> - ok. diff --git a/src/mod_carboncopy_redis.erl b/src/mod_carboncopy_redis.erl index 4485e3896..4562e68eb 100644 --- a/src/mod_carboncopy_redis.erl +++ b/src/mod_carboncopy_redis.erl @@ -39,9 +39,12 @@ enable(LUser, LServer, LResource, NS) -> USKey = us_key(LUser, LServer), NodeKey = node_key(), JID = jid:encode({LUser, LServer, LResource}), - case ejabberd_redis:qp([["HSET", USKey, LResource, NS], - ["SADD", NodeKey, JID]]) of - [{ok, _}, {ok, _}] -> + case ejabberd_redis:multi( + fun() -> + ejabberd_redis:hset(USKey, LResource, NS), + ejabberd_redis:sadd(NodeKey, [JID]) + end) of + {ok, _} -> ok; Err -> ?ERROR_MSG("failed to write in redis: ~p", [Err]), @@ -52,9 +55,12 @@ disable(LUser, LServer, LResource) -> USKey = us_key(LUser, LServer), NodeKey = node_key(), JID = jid:encode({LUser, LServer, LResource}), - case ejabberd_redis:qp([["HDEL", USKey, LResource], - ["SREM", NodeKey, JID]]) of - [{ok, _}, {ok, _}] -> + case ejabberd_redis:multi( + fun() -> + ejabberd_redis:hdel(USKey, [LResource]), + ejabberd_redis:srem(NodeKey, [JID]) + end) of + {ok, _} -> ok; Err -> ?ERROR_MSG("failed to delete from redis: ~p", [Err]), @@ -63,9 +69,9 @@ disable(LUser, LServer, LResource) -> list(LUser, LServer) -> USKey = us_key(LUser, LServer), - case ejabberd_redis:q(["HGETALL", USKey]) of + case ejabberd_redis:hgetall(USKey) of {ok, Vals} -> - decode_vals(Vals); + Vals; Err -> ?ERROR_MSG("failed to read from redis: ~p", [Err]), [] @@ -77,24 +83,26 @@ list(LUser, LServer) -> clean_table() -> ?INFO_MSG("Cleaning Redis 'carboncopy' table...", []), NodeKey = node_key(), - case ejabberd_redis:q(["SMEMBERS", NodeKey]) of + case ejabberd_redis:smembers(NodeKey) of {ok, JIDs} -> - lists:foreach( - fun(JID) -> - {U, S, R} = jid:split(jid:decode(JID)), - USKey = us_key(U, S), - case ejabberd_redis:q(["HDEL", USKey, R]) of - {ok, _} -> - ok; - Err -> - ?ERROR_MSG("failed to delete from redis: ~p", - [Err]) - end - end, JIDs); + case ejabberd_redis:multi( + fun() -> + lists:foreach( + fun(JID) -> + {U, S, R} = jid:split(jid:decode(JID)), + USKey = us_key(U, S), + ejabberd_redis:hdel(USKey, [R]) + end, JIDs) + end) of + {ok, _} -> + ok; + Err -> + ?ERROR_MSG("failed to delete from redis: ~p", [Err]) + end; Err -> ?ERROR_MSG("failed to read from redis: ~p", [Err]) end, - case ejabberd_redis:q(["DEL", NodeKey]) of + case ejabberd_redis:del([NodeKey]) of {ok, _} -> ok; Error -> ?ERROR_MSG("failed to delete from redis: ~p", [Error]) end. @@ -105,8 +113,3 @@ us_key(LUser, LServer) -> node_key() -> Node = erlang:atom_to_binary(node(), latin1), <<"ejabberd:carboncopy:nodes:", Node/binary>>. - -decode_vals([Resource, NS|Vals]) -> - [{Resource, NS}|decode_vals(Vals)]; -decode_vals([]) -> - [].