diff --git a/rebar.config b/rebar.config index e7e9120ae..b03a6bba7 100644 --- a/rebar.config +++ b/rebar.config @@ -20,7 +20,7 @@ {deps, [{lager, ".*", {git, "https://github.com/erlang-lager/lager", "3.6.7"}}, {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", {tag, "1.0.13"}}}, - {cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "1.0.16"}}}, + {cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "6493974"}}}, {fast_tls, ".*", {git, "https://github.com/processone/fast_tls", {tag, "1.0.26"}}}, {stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.14"}}}, {fast_xml, ".*", {git, "https://github.com/processone/fast_xml", {tag, "1.1.34"}}}, diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index d9e211656..de871a11c 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -50,8 +50,6 @@ set_presence/7, unset_presence/6, close_session_unset_presence/5, - set_offline_info/5, - get_offline_info/4, dirty_get_sessions_list/0, dirty_get_my_sessions_list/0, get_vh_session_list/1, @@ -78,8 +76,7 @@ host_down/1, make_sid/0, clean_cache/1, - config_reloaded/0, - is_online/1 + config_reloaded/0 ]). -export([init/1, handle_call/3, handle_cast/2, @@ -211,14 +208,14 @@ get_user_resources(User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), Mod = get_sm_backend(LServer), - Ss = online(get_sessions(Mod, LUser, LServer)), + Ss = get_sessions(Mod, LUser, LServer), [element(3, S#session.usr) || S <- clean_session_list(Ss)]. -spec get_user_present_resources(binary(), binary()) -> [tuple()]. get_user_present_resources(LUser, LServer) -> Mod = get_sm_backend(LServer), - Ss = online(get_sessions(Mod, LUser, LServer)), + Ss = get_sessions(Mod, LUser, LServer), [{S#session.priority, element(3, S#session.usr)} || S <- clean_session_list(Ss), is_integer(S#session.priority)]. @@ -229,7 +226,7 @@ get_user_ip(User, Server, Resource) -> LServer = jid:nameprep(Server), LResource = jid:resourceprep(Resource), Mod = get_sm_backend(LServer), - case online(get_sessions(Mod, LUser, LServer, LResource)) of + case get_sessions(Mod, LUser, LServer, LResource) of [] -> undefined; Ss -> @@ -242,7 +239,7 @@ get_user_info(User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), Mod = get_sm_backend(LServer), - Ss = online(get_sessions(Mod, LUser, LServer)), + Ss = get_sessions(Mod, LUser, LServer), [{LResource, [{node, node(Pid)}, {ts, Ts}, {pid, Pid}, {priority, Priority} | Info]} || #session{usr = {_, _, LResource}, @@ -257,7 +254,7 @@ get_user_info(User, Server, Resource) -> LServer = jid:nameprep(Server), LResource = jid:resourceprep(Resource), Mod = get_sm_backend(LServer), - case online(get_sessions(Mod, LUser, LServer, LResource)) of + case get_sessions(Mod, LUser, LServer, LResource) of [] -> offline; Ss -> @@ -316,7 +313,7 @@ get_session_sid(User, Server, Resource) -> LServer = jid:nameprep(Server), LResource = jid:resourceprep(Resource), Mod = get_sm_backend(LServer), - case online(get_sessions(Mod, LUser, LServer, LResource)) of + case get_sessions(Mod, LUser, LServer, LResource) of [] -> none; Ss -> @@ -330,43 +327,15 @@ get_session_sids(User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), Mod = get_sm_backend(LServer), - Sessions = online(get_sessions(Mod, LUser, LServer)), + Sessions = get_sessions(Mod, LUser, LServer), [SID || #session{sid = SID} <- Sessions]. --spec set_offline_info(sid(), binary(), binary(), binary(), info()) -> ok. - -set_offline_info(SID, User, Server, Resource, Info) -> - LUser = jid:nodeprep(User), - LServer = jid:nameprep(Server), - LResource = jid:resourceprep(Resource), - set_session(SID, LUser, LServer, LResource, undefined, [offline | Info]). - --spec get_offline_info(erlang:timestamp(), binary(), binary(), - binary()) -> none | info(). - -get_offline_info(Time, User, Server, Resource) -> - LUser = jid:nodeprep(User), - LServer = jid:nameprep(Server), - LResource = jid:resourceprep(Resource), - Mod = get_sm_backend(LServer), - case get_sessions(Mod, LUser, LServer, LResource) of - [#session{sid = {Time, _}, info = Info}] -> - case proplists:get_bool(offline, Info) of - true -> - Info; - false -> - none - end; - _ -> - none - end. - -spec dirty_get_sessions_list() -> [ljid()]. dirty_get_sessions_list() -> lists:flatmap( fun(Mod) -> - [S#session.usr || S <- online(get_sessions(Mod))] + [S#session.usr || S <- get_sessions(Mod)] end, get_sm_backends()). -spec dirty_get_my_sessions_list() -> [#session{}]. @@ -374,7 +343,7 @@ dirty_get_sessions_list() -> dirty_get_my_sessions_list() -> lists:flatmap( fun(Mod) -> - [S || S <- online(get_sessions(Mod)), + [S || S <- get_sessions(Mod), node(element(2, S#session.sid)) == node()] end, get_sm_backends()). @@ -383,14 +352,14 @@ dirty_get_my_sessions_list() -> get_vh_session_list(Server) -> LServer = jid:nameprep(Server), Mod = get_sm_backend(LServer), - [S#session.usr || S <- online(get_sessions(Mod, LServer))]. + [S#session.usr || S <- get_sessions(Mod, LServer)]. -spec get_all_pids() -> [pid()]. get_all_pids() -> lists:flatmap( fun(Mod) -> - [element(2, S#session.sid) || S <- online(get_sessions(Mod))] + [element(2, S#session.sid) || S <- get_sessions(Mod)] end, get_sm_backends()). -spec get_vh_session_number(binary()) -> non_neg_integer(). @@ -398,7 +367,7 @@ get_all_pids() -> get_vh_session_number(Server) -> LServer = jid:nameprep(Server), Mod = get_sm_backend(LServer), - length(online(get_sessions(Mod, LServer))). + length(get_sessions(Mod, LServer)). %% Why the hell do we have so many similar kicks? c2s_handle_info(#{lang := Lang} = State, replaced) -> @@ -579,16 +548,6 @@ delete_session(Mod, #session{usr = {LUser, LServer, _}} = Session) -> ok end. --spec online([#session{}]) -> [#session{}]. - -online(Sessions) -> - lists:filter(fun is_online/1, Sessions). - --spec is_online(#session{}) -> boolean(). - -is_online(#session{info = Info}) -> - not proplists:get_bool(offline, Info). - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -spec do_route(jid(), term()) -> any(). do_route(#jid{lresource = <<"">>} = To, Term) -> @@ -600,7 +559,7 @@ do_route(To, Term) -> ?DEBUG("broadcasting ~p to ~s", [Term, jid:encode(To)]), {U, S, R} = jid:tolower(To), Mod = get_sm_backend(S), - case online(get_sessions(Mod, U, S, R)) of + case get_sessions(Mod, U, S, R) of [] -> ?DEBUG("dropping broadcast to unavailable resourse: ~p", [Term]); Ss -> @@ -631,7 +590,7 @@ do_route(#presence{to = To, type = T} = Packet) ejabberd_c2s:route(Pid, {route, Packet1}); (_) -> ok - end, online(get_sessions(Mod, LUser, LServer))); + end, get_sessions(Mod, LUser, LServer)); false -> ok end; @@ -660,7 +619,7 @@ do_route(Packet) -> To = xmpp:get_to(Packet), {LUser, LServer, LResource} = jid:tolower(To), Mod = get_sm_backend(LServer), - case online(get_sessions(Mod, LUser, LServer, LResource)) of + case get_sessions(Mod, LUser, LServer, LResource) of [] -> case Packet of #message{type = T} when T == chat; T == normal -> @@ -708,8 +667,8 @@ route_message(#message{to = To, type = Type} = Packet) -> (P >= 0) and (Type == headline) -> LResource = jid:resourceprep(R), Mod = get_sm_backend(LServer), - case online(get_sessions(Mod, LUser, LServer, - LResource)) of + case get_sessions(Mod, LUser, LServer, + LResource) of [] -> ok; % Race condition Ss -> @@ -780,13 +739,9 @@ check_for_sessions_to_replace(User, Server, Resource) -> check_existing_resources(LUser, LServer, LResource) -> Mod = get_sm_backend(LServer), Ss = get_sessions(Mod, LUser, LServer, LResource), - {OnlineSs, OfflineSs} = lists:partition(fun is_online/1, Ss), - lists:foreach(fun(S) -> - delete_session(Mod, S) - end, OfflineSs), - if OnlineSs == [] -> ok; + if Ss == [] -> ok; true -> - SIDs = [SID || #session{sid = SID} <- OnlineSs], + SIDs = [SID || #session{sid = SID} <- Ss], MaxSID = lists:max(SIDs), lists:foreach(fun ({_, Pid} = S) when S /= MaxSID -> ejabberd_c2s:route(Pid, replaced); @@ -806,22 +761,17 @@ get_resource_sessions(User, Server, Resource) -> LServer = jid:nameprep(Server), LResource = jid:resourceprep(Resource), Mod = get_sm_backend(LServer), - [S#session.sid || S <- online(get_sessions(Mod, LUser, LServer, LResource))]. + [S#session.sid || S <- get_sessions(Mod, LUser, LServer, LResource)]. -spec check_max_sessions(binary(), binary()) -> ok | replaced. check_max_sessions(LUser, LServer) -> Mod = get_sm_backend(LServer), Ss = get_sessions(Mod, LUser, LServer), - {OnlineSs, OfflineSs} = lists:partition(fun is_online/1, Ss), MaxSessions = get_max_user_sessions(LUser, LServer), - if length(OnlineSs) =< MaxSessions -> ok; + if length(Ss) =< MaxSessions -> ok; true -> - #session{sid = {_, Pid}} = lists:min(OnlineSs), + #session{sid = {_, Pid}} = lists:min(Ss), ejabberd_c2s:route(Pid, replaced) - end, - if length(OfflineSs) =< MaxSessions -> ok; - true -> - delete_session(Mod, lists:min(OfflineSs)) end. %% Get the user_max_session setting @@ -843,7 +793,7 @@ get_max_user_sessions(LUser, Host) -> force_update_presence({LUser, LServer}) -> Mod = get_sm_backend(LServer), - Ss = online(get_sessions(Mod, LUser, LServer)), + Ss = get_sessions(Mod, LUser, LServer), lists:foreach(fun (#session{sid = {_, Pid}}) -> ejabberd_c2s:resend_presence(Pid) end, diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl index 1927afa95..546c45a69 100644 --- a/src/mod_stream_mgmt.erl +++ b/src/mod_stream_mgmt.erl @@ -40,6 +40,8 @@ -include("logger.hrl"). -include("p1_queue.hrl"). +-define(STREAM_MGMT_CACHE, stream_mgmt_cache). + -define(is_sm_packet(Pkt), is_record(Pkt, sm_enable) or is_record(Pkt, sm_resume) or @@ -51,7 +53,8 @@ %%%=================================================================== %%% API %%%=================================================================== -start(Host, _Opts) -> +start(Host, Opts) -> + init_cache(Opts), ejabberd_hooks:add(c2s_init, ?MODULE, c2s_stream_init, 50), ejabberd_hooks:add(c2s_stream_started, Host, ?MODULE, c2s_stream_started, 50), @@ -284,23 +287,16 @@ c2s_terminated(#{mgmt_state := resumed, jid := JID} = State, _Reason) -> [jid:encode(JID)]), bounce_message_queue(), {stop, State}; -c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In, sid := SID, - user := U, server := S, resource := R} = State, Reason) -> - Result = case MgmtState of - timeout -> - Info = [{num_stanzas_in, In}], - %% TODO: Usually, ejabberd_c2s:process_terminated/2 is - %% called later in the hook chain. We swap the order so - %% that the offline info won't be purged after we stored - %% it. This should be fixed in a proper way. - State1 = ejabberd_c2s:process_terminated(State, Reason), - ejabberd_sm:set_offline_info(SID, U, S, R, Info), - {stop, State1}; - _ -> - State - end, +c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In, + sid := {Time, _}, jid := JID} = State, _Reason) -> + case MgmtState of + timeout -> + store_stanzas_in(jid:tolower(JID), Time, In); + _ -> + ok + end, route_unacked_stanzas(State), - Result; + State; c2s_terminated(State, _Reason) -> State. @@ -641,16 +637,11 @@ inherit_session_state(#{user := U, server := S, {term, {R, Time}} -> case ejabberd_sm:get_session_pid(U, S, R) of none -> - case ejabberd_sm:get_offline_info(Time, U, S, R) of - none -> + case pop_stanzas_in({U, S, R}, Time) of + error -> {error, <<"Previous session PID not found">>}; - Info -> - case proplists:get_value(num_stanzas_in, Info) of - undefined -> - {error, <<"Previous session timed out">>}; - H -> - {error, <<"Previous session timed out">>, H} - end + {ok, H} -> + {error, <<"Previous session timed out">>, H} end; OldPID -> OldSID = {Time, OldPID}, @@ -750,6 +741,32 @@ need_to_enqueue(#{mgmt_force_enqueue := true} = State, #xmlel{}) -> need_to_enqueue(State, _) -> {false, State}. +%%%=================================================================== +%%% Cache-like storage for last handled stanzas +%%%=================================================================== +init_cache(Opts) -> + ets_cache:new(?STREAM_MGMT_CACHE, cache_opts(Opts)). + +cache_opts(Opts) -> + [{max_size, gen_mod:get_opt(cache_size, Opts)}, + {life_time, infinity}]. + +-spec store_stanzas_in(ljid(), erlang:timestamp(), non_neg_integer()) -> boolean(). +store_stanzas_in(LJID, Time, Num) -> + ets_cache:insert(?STREAM_MGMT_CACHE, {LJID, Time}, Num, + ejabberd_cluster:get_nodes()). + +-spec pop_stanzas_in(ljid(), erlang:timestamp()) -> {ok, non_neg_integer()} | error. +pop_stanzas_in(LJID, Time) -> + case ets_cache:lookup(?STREAM_MGMT_CACHE, {LJID, Time}) of + {ok, Val} -> + ets_cache:delete(?STREAM_MGMT_CACHE, {LJID, Time}, + ejabberd_cluster:get_nodes()), + {ok, Val}; + error -> + error + end. + %%%=================================================================== %%% Configuration processing %%%=================================================================== @@ -796,6 +813,11 @@ mod_opt_type(resend_on_timeout) -> fun(B) when is_boolean(B) -> B; (if_offline) -> if_offline end; +mod_opt_type(cache_size) -> + fun(I) when is_integer(I), I>0 -> I; + (unlimited) -> infinity; + (infinity) -> infinity + end; mod_opt_type(queue_type) -> fun(ram) -> ram; (file) -> file end. @@ -804,5 +826,6 @@ mod_options(Host) -> {resume_timeout, 300}, {max_resume_timeout, undefined}, {ack_timeout, 60}, + {cache_size, ejabberd_config:cache_size(Host)}, {resend_on_timeout, false}, {queue_type, ejabberd_config:default_queue_type(Host)}].