25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-12-22 17:28:25 +01:00

Improve redis related code

This commit is contained in:
Evgeniy Khramtsov 2017-04-02 11:56:09 +03:00
parent 5087e9c2df
commit 9d9037856c
5 changed files with 265 additions and 149 deletions

View File

@ -32,7 +32,9 @@
%% API
-export([start_link/0, q/1, qp/1, config_reloaded/0, opt_type/1]).
%% Commands
-export([multi/1, get/1, set/2, del/1, sadd/2, srem/2, smembers/1, scard/1]).
-export([multi/1, get/1, set/2, del/1,
sadd/2, srem/2, smembers/1, sismember/2, scard/1,
hget/2, hset/3, hdel/2, hlen/1, hgetall/1, hkeys/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -57,12 +59,14 @@ start_link() ->
q(Command) ->
try eredis:q(?PROCNAME, Command)
catch _:Reason -> {error, Reason}
catch _:{noproc, _} -> {error, disconnected};
_:{timeout, _} -> {error, timeout}
end.
qp(Pipeline) ->
try eredis:qp(?PROCNAME, Pipeline)
catch _:Reason -> {error, Reason}
catch _:{noproc, _} -> {error, disconnected};
_:{timeout, _} -> {error, timeout}
end.
-spec multi(fun(() -> any())) -> {ok, list()} | redis_error().
@ -95,6 +99,7 @@ config_reloaded() ->
?MODULE ! disconnect
end.
-spec get(iodata()) -> {ok, undefined | binary()} | redis_error().
get(Key) ->
case erlang:get(?TR_STACK) of
undefined ->
@ -113,11 +118,12 @@ set(Key, Val) ->
{error, _} = Err -> Err
end;
Stack ->
erlang:put(?TR_STACK, [Cmd|Stack]),
queued
tr_enq(Cmd, Stack)
end.
-spec del(list()) -> {ok, non_neg_integer()} | redis_error() | queued.
del([]) ->
reply(0);
del(Keys) ->
Cmd = [<<"DEL">>|Keys],
case erlang:get(?TR_STACK) of
@ -127,11 +133,12 @@ del(Keys) ->
{error, _} = Err -> Err
end;
Stack ->
erlang:put(?TR_STACK, [Cmd|Stack]),
queued
tr_enq(Cmd, Stack)
end.
-spec sadd(iodata(), list()) -> {ok, non_neg_integer()} | redis_error() | queued.
sadd(_Set, []) ->
reply(0);
sadd(Set, Members) ->
Cmd = [<<"SADD">>, Set|Members],
case erlang:get(?TR_STACK) of
@ -141,11 +148,12 @@ sadd(Set, Members) ->
{error, _} = Err -> Err
end;
Stack ->
erlang:put(?TR_STACK, [Cmd|Stack]),
queued
tr_enq(Cmd, Stack)
end.
-spec srem(iodata(), list()) -> {ok, non_neg_integer()} | redis_error() | queued.
srem(_Set, []) ->
reply(0);
srem(Set, Members) ->
Cmd = [<<"SREM">>, Set|Members],
case erlang:get(?TR_STACK) of
@ -155,8 +163,7 @@ srem(Set, Members) ->
{error, _} = Err -> Err
end;
Stack ->
erlang:put(?TR_STACK, [Cmd|Stack]),
queued
tr_enq(Cmd, Stack)
end.
-spec smembers(iodata()) -> {ok, [binary()]} | redis_error().
@ -168,6 +175,18 @@ smembers(Set) ->
{error, transaction_unsupported}
end.
-spec sismember(iodata(), iodata()) -> boolean() | redis_error().
sismember(Set, Member) ->
case erlang:get(?TR_STACK) of
undefined ->
case q([<<"SISMEMBER">>, Set, Member]) of
{ok, Flag} -> {ok, dec_bool(Flag)};
{error, _} = Err -> Err
end;
_ ->
{error, transaction_unsupported}
end.
-spec scard(iodata()) -> {ok, non_neg_integer()} | redis_error().
scard(Set) ->
case erlang:get(?TR_STACK) of
@ -182,6 +201,76 @@ scard(Set) ->
{error, transaction_unsupported}
end.
-spec hget(iodata(), iodata()) -> {ok, undefined | binary()} | redis_error().
hget(Key, Field) ->
case erlang:get(?TR_STACK) of
undefined ->
q([<<"HGET">>, Key, Field]);
_ ->
{error, transaction_unsupported}
end.
-spec hset(iodata(), iodata(), iodata()) -> {ok, boolean()} | redis_error() | queued.
hset(Key, Field, Val) ->
Cmd = [<<"HSET">>, Key, Field, Val],
case erlang:get(?TR_STACK) of
undefined ->
case q(Cmd) of
{ok, Flag} -> {ok, dec_bool(Flag)};
{error, _} = Err -> Err
end;
Stack ->
tr_enq(Cmd, Stack)
end.
-spec hdel(iodata(), list()) -> {ok, non_neg_integer()} | redis_error() | queued.
hdel(_Key, []) ->
reply(0);
hdel(Key, Fields) ->
Cmd = [<<"HDEL">>, Key|Fields],
case erlang:get(?TR_STACK) of
undefined ->
case q(Cmd) of
{ok, N} -> {ok, binary_to_integer(N)};
{error, _} = Err -> Err
end;
Stack ->
tr_enq(Cmd, Stack)
end.
-spec hgetall(iodata()) -> {ok, [{binary(), binary()}]} | redis_error().
hgetall(Key) ->
case erlang:get(?TR_STACK) of
undefined ->
case q([<<"HGETALL">>, Key]) of
{ok, Pairs} -> {ok, decode_pairs(Pairs)};
{error, _} = Err -> Err
end;
_ ->
{error, transaction_unsupported}
end.
-spec hlen(iodata()) -> {ok, non_neg_integer()} | redis_error().
hlen(Key) ->
case erlang:get(?TR_STACK) of
undefined ->
case q([<<"HLEN">>, Key]) of
{ok, N} -> {ok, binary_to_integer(N)};
{error, _} = Err -> Err
end;
_ ->
{error, transaction_unsupported}
end.
-spec hkeys(iodata()) -> {ok, [binary()]} | redis_error().
hkeys(Key) ->
case erlang:get(?TR_STACK) of
undefined ->
q([<<"HKEYS">>, Key]);
_ ->
{error, transaction_unsupported}
end.
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
@ -325,6 +414,28 @@ get_result([{ok, _} = OK]) ->
get_result([_|T]) ->
get_result(T).
-spec tr_enq([iodata()], list()) -> queued.
tr_enq(Cmd, Stack) ->
erlang:put(?TR_STACK, [Cmd|Stack]),
queued.
decode_pairs(Pairs) ->
decode_pairs(Pairs, []).
decode_pairs([Field, Val|Pairs], Acc) ->
decode_pairs(Pairs, [{Field, Val}|Acc]);
decode_pairs([], Acc) ->
lists:reverse(Acc).
dec_bool(<<$1>>) -> true;
dec_bool(<<$0>>) -> false.
reply(Val) ->
case erlang:get(?TR_STACK) of
undefined -> {ok, Val};
_ -> queued
end.
opt_type(redis_connect_timeout) ->
fun (I) when is_integer(I), I > 0 -> I end;
opt_type(redis_db) ->

View File

@ -44,9 +44,12 @@ register_route(Domain, ServerHost, LocalHint, _, Pid) ->
DomKey = domain_key(Domain),
PidKey = term_to_binary(Pid),
T = term_to_binary({ServerHost, LocalHint}),
case ejabberd_redis:qp([["HSET", DomKey, PidKey, T],
["SADD", ?ROUTES_KEY, Domain]]) of
[{ok, _}, {ok, _}] ->
case ejabberd_redis:multi(
fun() ->
ejabberd_redis:hset(DomKey, PidKey, T),
ejabberd_redis:sadd(?ROUTES_KEY, [Domain])
end) of
{ok, _} ->
ok;
Err ->
?ERROR_MSG("failed to register route in redis: ~p", [Err]),
@ -57,13 +60,20 @@ unregister_route(Domain, _, Pid) ->
DomKey = domain_key(Domain),
PidKey = term_to_binary(Pid),
try
{ok, _} = ejabberd_redis:q(["HDEL", DomKey, PidKey]),
{ok, Num} = ejabberd_redis:q(["HLEN", DomKey]),
case binary_to_integer(Num) of
0 ->
{ok, _} = ejabberd_redis:q(["SREM", ?ROUTES_KEY, Domain]),
ok;
_ ->
{ok, Num} = ejabberd_redis:hdel(DomKey, [PidKey]),
if Num > 0 ->
{ok, Len} = ejabberd_redis:hlen(DomKey),
if Len == 0 ->
{ok, _} = ejabberd_redis:multi(
fun() ->
ejabberd_redis:del([DomKey]),
ejabberd_redis:srem(?ROUTES_KEY, [Domain])
end),
ok;
true ->
ok
end;
true ->
ok
end
catch _:{badmatch, Err} ->
@ -73,7 +83,7 @@ unregister_route(Domain, _, Pid) ->
find_routes(Domain) ->
DomKey = domain_key(Domain),
case ejabberd_redis:q(["HGETALL", DomKey]) of
case ejabberd_redis:hgetall(DomKey) of
{ok, Vals} ->
decode_routes(Domain, Vals);
Err ->
@ -83,8 +93,8 @@ find_routes(Domain) ->
host_of_route(Domain) ->
DomKey = domain_key(Domain),
case ejabberd_redis:q(["HGETALL", DomKey]) of
{ok, [_, Data|_]} ->
case ejabberd_redis:hgetall(DomKey) of
{ok, [{_Pid, Data}|_]} ->
{ServerHost, _} = binary_to_term(Data),
{ok, ServerHost};
{ok, []} ->
@ -95,9 +105,9 @@ host_of_route(Domain) ->
end.
is_my_route(Domain) ->
case ejabberd_redis:q(["SISMEMBER", ?ROUTES_KEY, Domain]) of
{ok, <<"1">>} -> true;
{ok, _} -> false;
case ejabberd_redis:sismember(?ROUTES_KEY, Domain) of
{ok, Bool} ->
Bool;
Err ->
?ERROR_MSG("failed to check route in redis: ~p", [Err]),
false
@ -107,7 +117,7 @@ is_my_host(Domain) ->
{ok, Domain} == host_of_route(Domain).
get_all_routes() ->
case ejabberd_redis:q(["SMEMBERS", ?ROUTES_KEY]) of
case ejabberd_redis:smembers(?ROUTES_KEY) of
{ok, Routes} ->
Routes;
Err ->
@ -116,18 +126,7 @@ get_all_routes() ->
end.
find_routes() ->
lists:flatmap(
fun(Domain) ->
DomKey = domain_key(Domain),
case ejabberd_redis:q(["HGETALL", DomKey]) of
{ok, Vals} ->
decode_routes(Domain, Vals);
Err ->
?ERROR_MSG("failed to fetch routes from redis: ~p",
[Err]),
[]
end
end, get_all_routes()).
lists:flatmap(fun find_routes/1, get_all_routes()).
%%%===================================================================
%%% Internal functions
@ -143,12 +142,12 @@ clean_table() ->
domain_key(Domain) ->
<<"ejabberd:route:", Domain/binary>>.
decode_routes(Domain, [Pid, Data|Vals]) ->
{ServerHost, LocalHint} = binary_to_term(Data),
[#route{domain = Domain,
pid = binary_to_term(Pid),
server_host = ServerHost,
local_hint = LocalHint}|
decode_routes(Domain, Vals)];
decode_routes(_, []) ->
[].
decode_routes(Domain, Vals) ->
lists:map(
fun({Pid, Data}) ->
{ServerHost, LocalHint} = binary_to_term(Data),
#route{domain = Domain,
pid = binary_to_term(Pid),
server_host = ServerHost,
local_hint = LocalHint}
end, Vals).

View File

@ -50,9 +50,12 @@ set_session(Session) ->
SIDKey = sid_to_key(Session#session.sid),
ServKey = server_to_key(element(2, Session#session.us)),
USSIDKey = us_sid_to_key(Session#session.us, Session#session.sid),
case ejabberd_redis:qp([["HSET", USKey, SIDKey, T],
["HSET", ServKey, USSIDKey, T]]) of
[{ok, _}, {ok, _}] ->
case ejabberd_redis:multi(
fun() ->
ejabberd_redis:hset(USKey, SIDKey, T),
ejabberd_redis:hset(ServKey, USSIDKey, T)
end) of
{ok, _} ->
ok;
Err ->
?ERROR_MSG("failed to set session for redis: ~p", [Err])
@ -62,7 +65,7 @@ set_session(Session) ->
{ok, #session{}} | {error, notfound}.
delete_session(LUser, LServer, _LResource, SID) ->
USKey = us_to_key({LUser, LServer}),
case ejabberd_redis:q(["HGETALL", USKey]) of
case ejabberd_redis:hgetall(USKey) of
{ok, Vals} ->
Ss = decode_session_list(Vals),
case lists:keyfind(SID, #session.sid, Ss) of
@ -72,8 +75,16 @@ delete_session(LUser, LServer, _LResource, SID) ->
SIDKey = sid_to_key(SID),
ServKey = server_to_key(element(2, Session#session.us)),
USSIDKey = us_sid_to_key(Session#session.us, SID),
ejabberd_redis:qp([["HDEL", USKey, SIDKey],
["HDEL", ServKey, USSIDKey]]),
case ejabberd_redis:multi(
fun() ->
ejabberd_redis:hdel(USKey, [SIDKey]),
ejabberd_redis:hdel(ServKey, [USSIDKey])
end) of
{ok, _} ->
ok;
Err ->
?ERROR_MSG("failed to delete session from redis: ~p", [Err])
end,
{ok, Session}
end;
Err ->
@ -91,7 +102,7 @@ get_sessions() ->
-spec get_sessions(binary()) -> [#session{}].
get_sessions(LServer) ->
ServKey = server_to_key(LServer),
case ejabberd_redis:q(["HGETALL", ServKey]) of
case ejabberd_redis:hgetall(ServKey) of
{ok, Vals} ->
decode_session_list(Vals);
Err ->
@ -102,8 +113,8 @@ get_sessions(LServer) ->
-spec get_sessions(binary(), binary()) -> [#session{}].
get_sessions(LUser, LServer) ->
USKey = us_to_key({LUser, LServer}),
case ejabberd_redis:q(["HGETALL", USKey]) of
{ok, Vals} when is_list(Vals) ->
case ejabberd_redis:hgetall(USKey) of
{ok, Vals} ->
decode_session_list(Vals);
Err ->
?ERROR_MSG("failed to get sessions from redis: ~p", [Err]),
@ -114,8 +125,8 @@ get_sessions(LUser, LServer) ->
[#session{}].
get_sessions(LUser, LServer, LResource) ->
USKey = us_to_key({LUser, LServer}),
case ejabberd_redis:q(["HGETALL", USKey]) of
{ok, Vals} when is_list(Vals) ->
case ejabberd_redis:hgetall(USKey) of
{ok, Vals} ->
[S || S <- decode_session_list(Vals),
element(3, S#session.usr) == LResource];
Err ->
@ -141,52 +152,36 @@ us_sid_to_key(US, SID) ->
sid_to_key(SID) ->
term_to_binary(SID).
decode_session_list([_, Val|T]) ->
[binary_to_term(Val)|decode_session_list(T)];
decode_session_list([]) ->
[].
decode_session_list(Vals) ->
[binary_to_term(Val) || {_, Val} <- Vals].
clean_table() ->
?INFO_MSG("Cleaning Redis SM table...", []),
lists:foreach(
fun(LServer) ->
ServKey = server_to_key(LServer),
case ejabberd_redis:q(["HKEYS", ServKey]) of
{ok, []} ->
ok;
{ok, Vals} ->
Vals1 = lists:filter(
fun(USSIDKey) ->
{_, SID} = binary_to_term(USSIDKey),
node(element(2, SID)) == node()
end, Vals),
Q1 = case Vals1 of
[] -> [];
_ -> ["HDEL", ServKey | Vals1]
end,
Q2 = lists:map(
fun(USSIDKey) ->
{US, SID} = binary_to_term(USSIDKey),
USKey = us_to_key(US),
SIDKey = sid_to_key(SID),
["HDEL", USKey, SIDKey]
end, Vals1),
Res = ejabberd_redis:qp(lists:delete([], [Q1|Q2])),
case lists:filter(
fun({ok, _}) -> false;
(_) -> true
end, Res) of
[] ->
ok;
Errs ->
?ERROR_MSG("failed to clean redis table for "
"server ~s: ~p", [LServer, Errs])
end;
Err ->
?ERROR_MSG("failed to clean redis table for "
"server ~s: ~p", [LServer, Err])
end
end, ejabberd_sm:get_vh_by_backend(?MODULE)).
try
lists:foreach(
fun(LServer) ->
ServKey = server_to_key(LServer),
{ok, Vals} = ejabberd_redis:hkeys(ServKey),
{ok, _} =
ejabberd_redis:multi(
fun() ->
lists:foreach(
fun(USSIDKey) ->
{US, SID} = binary_to_term(USSIDKey),
if node(element(2, SID)) == node() ->
USKey = us_to_key(US),
SIDKey = sid_to_key(SID),
ejabberd_redis:hdel(ServKey, [USSIDKey]),
ejabberd_redis:hdel(USKey, [SIDKey]);
true ->
ok
end
end, Vals)
end)
end, ejabberd_sm:get_vh_by_backend(?MODULE))
catch _:{badmatch, {error, _} = Err} ->
?ERROR_MSG("failed to clean redis c2s sessions: ~p", [Err])
end.
opt_type(redis_connect_timeout) ->
fun (I) when is_integer(I), I > 0 -> I end;

View File

@ -25,7 +25,7 @@ init() ->
open_session(SID, Pid) ->
PidBin = term_to_binary(Pid),
case ejabberd_redis:q(["HSET", ?BOSH_KEY, SID, PidBin]) of
case ejabberd_redis:hset(?BOSH_KEY, SID, PidBin) of
{ok, _} ->
ok;
Err ->
@ -34,7 +34,7 @@ open_session(SID, Pid) ->
end.
close_session(SID) ->
case ejabberd_redis:q(["HDEL", ?BOSH_KEY, SID]) of
case ejabberd_redis:hdel(?BOSH_KEY, [SID]) of
{ok, _} ->
ok;
Err ->
@ -42,9 +42,15 @@ close_session(SID) ->
end.
find_session(SID) ->
case ejabberd_redis:q(["HGET", ?BOSH_KEY, SID]) of
case ejabberd_redis:hget(?BOSH_KEY, SID) of
{ok, Pid} when is_binary(Pid) ->
{ok, binary_to_term(Pid)};
try
{ok, binary_to_term(Pid)}
catch _:badarg ->
?ERROR_MSG("malformed data in redis (key = '~s'): ~p",
[SID, Pid]),
error
end;
{ok, _} ->
error;
Err ->
@ -56,21 +62,23 @@ find_session(SID) ->
%%% Internal functions
%%%===================================================================
clean_table() ->
?INFO_MSG("Cleaning Redis BOSH table...", []),
case ejabberd_redis:q(["HGETALL", ?BOSH_KEY]) of
?INFO_MSG("Cleaning Redis BOSH sessions...", []),
case ejabberd_redis:hgetall(?BOSH_KEY) of
{ok, Vals} ->
clean_table(Vals);
case ejabberd_redis:multi(
fun() ->
lists:foreach(
fun({SID, Pid}) when node(Pid) == node() ->
ejabberd_redis:hdel(?BOSH_KEY, [SID]);
(_) ->
ok
end, Vals)
end) of
{ok, _} ->
ok;
Err ->
?ERROR_MSG("failed to clean bosh sessions in redis: ~p", [Err])
end;
Err ->
?ERROR_MSG("failed to clean bosh table in redis: ~p", [Err])
?ERROR_MSG("failed to clean bosh sessions in redis: ~p", [Err])
end.
clean_table([SID, PidBin|Vals]) ->
case binary_to_term(PidBin) of
Pid when node(Pid) == node() ->
close_session(SID);
_ ->
ok
end,
clean_table(Vals);
clean_table([]) ->
ok.

View File

@ -39,9 +39,12 @@ enable(LUser, LServer, LResource, NS) ->
USKey = us_key(LUser, LServer),
NodeKey = node_key(),
JID = jid:encode({LUser, LServer, LResource}),
case ejabberd_redis:qp([["HSET", USKey, LResource, NS],
["SADD", NodeKey, JID]]) of
[{ok, _}, {ok, _}] ->
case ejabberd_redis:multi(
fun() ->
ejabberd_redis:hset(USKey, LResource, NS),
ejabberd_redis:sadd(NodeKey, [JID])
end) of
{ok, _} ->
ok;
Err ->
?ERROR_MSG("failed to write in redis: ~p", [Err]),
@ -52,9 +55,12 @@ disable(LUser, LServer, LResource) ->
USKey = us_key(LUser, LServer),
NodeKey = node_key(),
JID = jid:encode({LUser, LServer, LResource}),
case ejabberd_redis:qp([["HDEL", USKey, LResource],
["SREM", NodeKey, JID]]) of
[{ok, _}, {ok, _}] ->
case ejabberd_redis:multi(
fun() ->
ejabberd_redis:hdel(USKey, [LResource]),
ejabberd_redis:srem(NodeKey, [JID])
end) of
{ok, _} ->
ok;
Err ->
?ERROR_MSG("failed to delete from redis: ~p", [Err]),
@ -63,9 +69,9 @@ disable(LUser, LServer, LResource) ->
list(LUser, LServer) ->
USKey = us_key(LUser, LServer),
case ejabberd_redis:q(["HGETALL", USKey]) of
case ejabberd_redis:hgetall(USKey) of
{ok, Vals} ->
decode_vals(Vals);
Vals;
Err ->
?ERROR_MSG("failed to read from redis: ~p", [Err]),
[]
@ -77,24 +83,26 @@ list(LUser, LServer) ->
clean_table() ->
?INFO_MSG("Cleaning Redis 'carboncopy' table...", []),
NodeKey = node_key(),
case ejabberd_redis:q(["SMEMBERS", NodeKey]) of
case ejabberd_redis:smembers(NodeKey) of
{ok, JIDs} ->
lists:foreach(
fun(JID) ->
{U, S, R} = jid:split(jid:decode(JID)),
USKey = us_key(U, S),
case ejabberd_redis:q(["HDEL", USKey, R]) of
{ok, _} ->
ok;
Err ->
?ERROR_MSG("failed to delete from redis: ~p",
[Err])
end
end, JIDs);
case ejabberd_redis:multi(
fun() ->
lists:foreach(
fun(JID) ->
{U, S, R} = jid:split(jid:decode(JID)),
USKey = us_key(U, S),
ejabberd_redis:hdel(USKey, [R])
end, JIDs)
end) of
{ok, _} ->
ok;
Err ->
?ERROR_MSG("failed to delete from redis: ~p", [Err])
end;
Err ->
?ERROR_MSG("failed to read from redis: ~p", [Err])
end,
case ejabberd_redis:q(["DEL", NodeKey]) of
case ejabberd_redis:del([NodeKey]) of
{ok, _} -> ok;
Error -> ?ERROR_MSG("failed to delete from redis: ~p", [Error])
end.
@ -105,8 +113,3 @@ us_key(LUser, LServer) ->
node_key() ->
Node = erlang:atom_to_binary(node(), latin1),
<<"ejabberd:carboncopy:nodes:", Node/binary>>.
decode_vals([Resource, NS|Vals]) ->
[{Resource, NS}|decode_vals(Vals)];
decode_vals([]) ->
[].