diff --git a/priv/lua/redis_sm.lua b/priv/lua/redis_sm.lua new file mode 100644 index 000000000..2624fef16 --- /dev/null +++ b/priv/lua/redis_sm.lua @@ -0,0 +1,16 @@ +redis.replicate_commands() +local cursor = redis.call('GET', KEYS[3]) or 0 +local scan_result = redis.call('HSCAN', KEYS[1], cursor, 'COUNT', ARGV[1]) +local newcursor = scan_result[1] +local cursor = redis.call('SET', KEYS[3], newcursor) +redis.call('EXPIRE', KEYS[3], 30) +for key,value in ipairs(scan_result[2]) do + local uskey, sidkey = string.match(value, '(.*)||(.*)') + if uskey and sidkey then + redis.call('HDEL', uskey, sidkey) + redis.call('HDEL', KEYS[1], value) + else + redis.call('HDEL', KEYS[2], value) + end +end +return newcursor diff --git a/src/ejabberd_redis.erl b/src/ejabberd_redis.erl index 3bfc9fa52..596b75117 100644 --- a/src/ejabberd_redis.erl +++ b/src/ejabberd_redis.erl @@ -36,7 +36,7 @@ -export([multi/1, get/1, set/2, del/1, sadd/2, srem/2, smembers/1, sismember/2, scard/1, hget/2, hset/3, hdel/2, hlen/1, hgetall/1, hkeys/1, - subscribe/1, publish/2]). + subscribe/1, publish/2, script_load/1, evalsha/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -316,6 +316,24 @@ publish(Channel, Data) -> tr_enq(Cmd, Stack) end. +-spec script_load(iodata()) -> {ok, binary()} | redis_error(). +script_load(Data) -> + case erlang:get(?TR_STACK) of + undefined -> + q([<<"SCRIPT">>, <<"LOAD">>, Data]); + _ -> + erlang:error(transaction_unsupported) + end. + +-spec evalsha(binary(), [iodata()], [iodata()]) -> {ok, binary()} | redis_error(). +evalsha(SHA, Keys, Args) -> + case erlang:get(?TR_STACK) of + undefined -> + q([<<"EVALSHA">>, SHA, length(Keys)|Keys ++ Args]); + _ -> + erlang:error(transaction_unsupported) + end. + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== diff --git a/src/ejabberd_sm_redis.erl b/src/ejabberd_sm_redis.erl index 7bb7d2e74..ec1fc1d1b 100644 --- a/src/ejabberd_sm_redis.erl +++ b/src/ejabberd_sm_redis.erl @@ -27,7 +27,6 @@ -define(GEN_SERVER, p1_server). -endif. -behaviour(?GEN_SERVER). --define(DELETION_CURSOR_TIMEOUT_SEC, "30"). -behaviour(ejabberd_sm). -export([init/0, set_session/1, delete_session/1, @@ -75,7 +74,9 @@ set_session(Session) -> fun() -> ejabberd_redis:hset(USKey, SIDKey, T), ejabberd_redis:hset(ServKey, USSIDKey, T), - ejabberd_redis:hset(NodeHostKey , <>, USSIDKey), + ejabberd_redis:hset(NodeHostKey, + <>, + USSIDKey), ejabberd_redis:publish( ?SM_KEY, term_to_binary({delete, Session#session.us})) end) of @@ -96,7 +97,8 @@ delete_session(#session{sid = SID} = Session) -> fun() -> ejabberd_redis:hdel(USKey, [SIDKey]), ejabberd_redis:hdel(ServKey, [USSIDKey]), - ejabberd_redis:hdel(NodeHostKey, [<>]), + ejabberd_redis:hdel(NodeHostKey, + [<>]), ejabberd_redis:publish( ?SM_KEY, term_to_binary({delete, Session#session.us})) @@ -159,7 +161,7 @@ handle_info({redis_message, ?SM_KEY, Data}, State) -> end, {noreply, State}; handle_info(Info, State) -> - ?ERROR_MSG("unexpected info: ~p ", [Info]), + ?ERROR_MSG("unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> @@ -172,12 +174,10 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%=================================================================== us_to_key({LUser, LServer}) -> - SMPrefixKey = ?SM_KEY, - <>. + <<(?SM_KEY)/binary, ":", LUser/binary, "@", LServer/binary>>. server_to_key(LServer) -> - SMPrefixKey = ?SM_KEY, - <>. + <<(?SM_KEY)/binary, ":", LServer/binary>>. us_sid_to_key(US, SID) -> term_to_binary({US, SID}). @@ -185,18 +185,16 @@ us_sid_to_key(US, SID) -> sid_to_key(SID) -> term_to_binary(SID). -node_session_deletion_cursor(Node, Host) when is_binary(Host) and is_binary(Node) -> +node_session_deletion_cursor(Node, Host) -> NodeName = node_host_to_key(Node, Host), <>. -node_host_to_key(Node, Host) when is_atom(Node) and is_binary(Host) -> +node_host_to_key(Node, Host) when is_atom(Node) -> NodeBin = atom_to_binary(node(), utf8), node_host_to_key(NodeBin, Host); -node_host_to_key(NodeBin, Host) when is_binary(NodeBin) and is_binary(Host) -> +node_host_to_key(NodeBin, Host) -> HostKey = server_to_key(Host), - <>; -node_host_to_key(_NodeBin, _Host) -> - ?ERROR_MSG("Invalid node type ", []). + <>. decode_session_list(Vals) -> [binary_to_term(Val) || {_, Val} <- Vals]. @@ -206,67 +204,43 @@ clean_table() -> clean_table(Node) when is_atom(Node) -> clean_table(atom_to_binary(Node, utf8)); - -clean_table(Node) when is_binary(Node) -> +clean_table(Node) -> ?DEBUG("Cleaning Redis SM table... ", []), try - lists:foreach( - fun(Host) -> clean_node_sessions(Node, Host) end, - ejabberd_sm:get_vh_by_backend(?MODULE) - ), - ok - catch E:R -> - ?ERROR_MSG("failed to clean redis c2s sessions due to ~p: ~p", [E, R]), - {error, R} - end; - -clean_table(_) -> - ?ERROR_MSG("Wrong node data type in clean table call ", []). + lists:foreach( + fun(Host) -> + ok = clean_node_sessions(Node, Host) + end, ejabberd_sm:get_vh_by_backend(?MODULE)) + catch _:{badmatch, {error, _} = Err} -> + ?ERROR_MSG("Failed to clean Redis SM table", []), + Err + end. clean_node_sessions(Node, Host) -> case load_script() of - {ok , SHA} -> + {ok, SHA} -> clean_node_sessions(Node, Host, SHA); - Error -> - ?ERROR_MSG("Failure in generating the SHA ~p", [Error]) + Err -> + Err end. clean_node_sessions(Node, Host, SHA) -> - ?INFO_MSG("Cleaning node sessions for node ~p with host ~p ", [Node, Host]), - case ejabberd_redis:q(["EVALSHA", SHA, - 3, - node_host_to_key(Node, Host), - server_to_key(Host), - node_session_deletion_cursor(Node, Host), - 1000 - ]) of - {ok, <<"0">>} -> - ?DEBUG("Cleaned node sessions for node ~p with host ~p ", [Node, Host]); - {ok, Cursor} -> - ?DEBUG("Cleaning redis sessions with cursor ~p ", [Cursor]), - clean_node_sessions(Node, Host, SHA); - Error -> - ?INFO_MSG("Error in redis clean up: ~p", [Error]), - throw(Error) + Keys = [node_host_to_key(Node, Host), + server_to_key(Host), + node_session_deletion_cursor(Node, Host)], + case ejabberd_redis:evalsha(SHA, Keys, [1000]) of + {ok, <<"0">>} -> + ok; + {ok, _Cursor} -> + clean_node_sessions(Node, Host, SHA); + {error, _} = Err -> + Err end. load_script() -> - ejabberd_redis:q(["SCRIPT", "LOAD", - ["redis.replicate_commands() ", - "local cursor = redis.call('GET', KEYS[3]) or 0 ", - "local scan_result = redis.call('HSCAN', KEYS[1], cursor, 'COUNT', ARGV[1]) ", - "local newcursor = scan_result[1] ", - "local cursor = redis.call('SET', KEYS[3], newcursor) ", - "redis.call('EXPIRE', KEYS[3], ", ?DELETION_CURSOR_TIMEOUT_SEC , ") ", - "for key,value in ipairs(scan_result[2]) do ", - "local uskey, sidkey = string.match(value, '(.*)||(.*)') ", - "if uskey and sidkey then ", - "redis.call('HDEL', uskey, sidkey) ", - "redis.call('HDEL', KEYS[1], value) ", - "else ", - "redis.call('HDEL', KEYS[2], value) ", - "end ", - "end ", - " return newcursor " - ] - ]). + case misc:read_lua("redis_sm.lua") of + {ok, Data} -> + ejabberd_redis:script_load(Data); + {error, _} = Err -> + Err + end. diff --git a/src/misc.erl b/src/misc.erl index a470e2b63..abd0691ee 100644 --- a/src/misc.erl +++ b/src/misc.erl @@ -34,9 +34,9 @@ l2i/1, i2l/1, i2l/2, expr_to_term/1, term_to_expr/1, now_to_usec/1, usec_to_now/1, encode_pid/1, decode_pid/2, compile_exprs/2, join_atoms/2, try_read_file/1, get_descr/2, - css_dir/0, img_dir/0, js_dir/0, msgs_dir/0, sql_dir/0, - read_css/1, read_img/1, read_js/1, try_url/1, intersection/2, - format_val/1]). + css_dir/0, img_dir/0, js_dir/0, msgs_dir/0, sql_dir/0, lua_dir/0, + read_css/1, read_img/1, read_js/1, read_lua/1, try_url/1, + intersection/2, format_val/1]). %% Deprecated functions -export([decode_base64/1, encode_base64/1]). @@ -263,6 +263,10 @@ msgs_dir() -> sql_dir() -> get_dir("sql"). +-spec lua_dir() -> file:filename(). +lua_dir() -> + get_dir("lua"). + -spec read_css(file:filename()) -> {ok, binary()} | {error, file:posix()}. read_css(File) -> read_file(filename:join(css_dir(), File)). @@ -275,6 +279,10 @@ read_img(File) -> read_js(File) -> read_file(filename:join(js_dir(), File)). +-spec read_lua(file:filename()) -> {ok, binary()} | {error, file:posix()}. +read_lua(File) -> + read_file(filename:join(lua_dir(), File)). + -spec get_descr(binary(), binary()) -> binary(). get_descr(Lang, Text) -> Desc = translate:translate(Lang, Text),