mirror of
https://github.com/processone/ejabberd.git
synced 2024-12-20 17:27:00 +01:00
Keep last handled stanzas number in cache rather than session table
This commit is contained in:
parent
b8883b5a61
commit
5d27c975dc
@ -20,7 +20,7 @@
|
|||||||
|
|
||||||
{deps, [{lager, ".*", {git, "https://github.com/erlang-lager/lager", "3.6.7"}},
|
{deps, [{lager, ".*", {git, "https://github.com/erlang-lager/lager", "3.6.7"}},
|
||||||
{p1_utils, ".*", {git, "https://github.com/processone/p1_utils", {tag, "1.0.13"}}},
|
{p1_utils, ".*", {git, "https://github.com/processone/p1_utils", {tag, "1.0.13"}}},
|
||||||
{cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "1.0.16"}}},
|
{cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "6493974"}}},
|
||||||
{fast_tls, ".*", {git, "https://github.com/processone/fast_tls", {tag, "1.0.26"}}},
|
{fast_tls, ".*", {git, "https://github.com/processone/fast_tls", {tag, "1.0.26"}}},
|
||||||
{stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.14"}}},
|
{stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.14"}}},
|
||||||
{fast_xml, ".*", {git, "https://github.com/processone/fast_xml", {tag, "1.1.34"}}},
|
{fast_xml, ".*", {git, "https://github.com/processone/fast_xml", {tag, "1.1.34"}}},
|
||||||
|
@ -50,8 +50,6 @@
|
|||||||
set_presence/7,
|
set_presence/7,
|
||||||
unset_presence/6,
|
unset_presence/6,
|
||||||
close_session_unset_presence/5,
|
close_session_unset_presence/5,
|
||||||
set_offline_info/5,
|
|
||||||
get_offline_info/4,
|
|
||||||
dirty_get_sessions_list/0,
|
dirty_get_sessions_list/0,
|
||||||
dirty_get_my_sessions_list/0,
|
dirty_get_my_sessions_list/0,
|
||||||
get_vh_session_list/1,
|
get_vh_session_list/1,
|
||||||
@ -78,8 +76,7 @@
|
|||||||
host_down/1,
|
host_down/1,
|
||||||
make_sid/0,
|
make_sid/0,
|
||||||
clean_cache/1,
|
clean_cache/1,
|
||||||
config_reloaded/0,
|
config_reloaded/0
|
||||||
is_online/1
|
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2,
|
-export([init/1, handle_call/3, handle_cast/2,
|
||||||
@ -211,14 +208,14 @@ get_user_resources(User, Server) ->
|
|||||||
LUser = jid:nodeprep(User),
|
LUser = jid:nodeprep(User),
|
||||||
LServer = jid:nameprep(Server),
|
LServer = jid:nameprep(Server),
|
||||||
Mod = get_sm_backend(LServer),
|
Mod = get_sm_backend(LServer),
|
||||||
Ss = online(get_sessions(Mod, LUser, LServer)),
|
Ss = get_sessions(Mod, LUser, LServer),
|
||||||
[element(3, S#session.usr) || S <- clean_session_list(Ss)].
|
[element(3, S#session.usr) || S <- clean_session_list(Ss)].
|
||||||
|
|
||||||
-spec get_user_present_resources(binary(), binary()) -> [tuple()].
|
-spec get_user_present_resources(binary(), binary()) -> [tuple()].
|
||||||
|
|
||||||
get_user_present_resources(LUser, LServer) ->
|
get_user_present_resources(LUser, LServer) ->
|
||||||
Mod = get_sm_backend(LServer),
|
Mod = get_sm_backend(LServer),
|
||||||
Ss = online(get_sessions(Mod, LUser, LServer)),
|
Ss = get_sessions(Mod, LUser, LServer),
|
||||||
[{S#session.priority, element(3, S#session.usr)}
|
[{S#session.priority, element(3, S#session.usr)}
|
||||||
|| S <- clean_session_list(Ss), is_integer(S#session.priority)].
|
|| S <- clean_session_list(Ss), is_integer(S#session.priority)].
|
||||||
|
|
||||||
@ -229,7 +226,7 @@ get_user_ip(User, Server, Resource) ->
|
|||||||
LServer = jid:nameprep(Server),
|
LServer = jid:nameprep(Server),
|
||||||
LResource = jid:resourceprep(Resource),
|
LResource = jid:resourceprep(Resource),
|
||||||
Mod = get_sm_backend(LServer),
|
Mod = get_sm_backend(LServer),
|
||||||
case online(get_sessions(Mod, LUser, LServer, LResource)) of
|
case get_sessions(Mod, LUser, LServer, LResource) of
|
||||||
[] ->
|
[] ->
|
||||||
undefined;
|
undefined;
|
||||||
Ss ->
|
Ss ->
|
||||||
@ -242,7 +239,7 @@ get_user_info(User, Server) ->
|
|||||||
LUser = jid:nodeprep(User),
|
LUser = jid:nodeprep(User),
|
||||||
LServer = jid:nameprep(Server),
|
LServer = jid:nameprep(Server),
|
||||||
Mod = get_sm_backend(LServer),
|
Mod = get_sm_backend(LServer),
|
||||||
Ss = online(get_sessions(Mod, LUser, LServer)),
|
Ss = get_sessions(Mod, LUser, LServer),
|
||||||
[{LResource, [{node, node(Pid)}, {ts, Ts}, {pid, Pid},
|
[{LResource, [{node, node(Pid)}, {ts, Ts}, {pid, Pid},
|
||||||
{priority, Priority} | Info]}
|
{priority, Priority} | Info]}
|
||||||
|| #session{usr = {_, _, LResource},
|
|| #session{usr = {_, _, LResource},
|
||||||
@ -257,7 +254,7 @@ get_user_info(User, Server, Resource) ->
|
|||||||
LServer = jid:nameprep(Server),
|
LServer = jid:nameprep(Server),
|
||||||
LResource = jid:resourceprep(Resource),
|
LResource = jid:resourceprep(Resource),
|
||||||
Mod = get_sm_backend(LServer),
|
Mod = get_sm_backend(LServer),
|
||||||
case online(get_sessions(Mod, LUser, LServer, LResource)) of
|
case get_sessions(Mod, LUser, LServer, LResource) of
|
||||||
[] ->
|
[] ->
|
||||||
offline;
|
offline;
|
||||||
Ss ->
|
Ss ->
|
||||||
@ -316,7 +313,7 @@ get_session_sid(User, Server, Resource) ->
|
|||||||
LServer = jid:nameprep(Server),
|
LServer = jid:nameprep(Server),
|
||||||
LResource = jid:resourceprep(Resource),
|
LResource = jid:resourceprep(Resource),
|
||||||
Mod = get_sm_backend(LServer),
|
Mod = get_sm_backend(LServer),
|
||||||
case online(get_sessions(Mod, LUser, LServer, LResource)) of
|
case get_sessions(Mod, LUser, LServer, LResource) of
|
||||||
[] ->
|
[] ->
|
||||||
none;
|
none;
|
||||||
Ss ->
|
Ss ->
|
||||||
@ -330,43 +327,15 @@ get_session_sids(User, Server) ->
|
|||||||
LUser = jid:nodeprep(User),
|
LUser = jid:nodeprep(User),
|
||||||
LServer = jid:nameprep(Server),
|
LServer = jid:nameprep(Server),
|
||||||
Mod = get_sm_backend(LServer),
|
Mod = get_sm_backend(LServer),
|
||||||
Sessions = online(get_sessions(Mod, LUser, LServer)),
|
Sessions = get_sessions(Mod, LUser, LServer),
|
||||||
[SID || #session{sid = SID} <- Sessions].
|
[SID || #session{sid = SID} <- Sessions].
|
||||||
|
|
||||||
-spec set_offline_info(sid(), binary(), binary(), binary(), info()) -> ok.
|
|
||||||
|
|
||||||
set_offline_info(SID, User, Server, Resource, Info) ->
|
|
||||||
LUser = jid:nodeprep(User),
|
|
||||||
LServer = jid:nameprep(Server),
|
|
||||||
LResource = jid:resourceprep(Resource),
|
|
||||||
set_session(SID, LUser, LServer, LResource, undefined, [offline | Info]).
|
|
||||||
|
|
||||||
-spec get_offline_info(erlang:timestamp(), binary(), binary(),
|
|
||||||
binary()) -> none | info().
|
|
||||||
|
|
||||||
get_offline_info(Time, User, Server, Resource) ->
|
|
||||||
LUser = jid:nodeprep(User),
|
|
||||||
LServer = jid:nameprep(Server),
|
|
||||||
LResource = jid:resourceprep(Resource),
|
|
||||||
Mod = get_sm_backend(LServer),
|
|
||||||
case get_sessions(Mod, LUser, LServer, LResource) of
|
|
||||||
[#session{sid = {Time, _}, info = Info}] ->
|
|
||||||
case proplists:get_bool(offline, Info) of
|
|
||||||
true ->
|
|
||||||
Info;
|
|
||||||
false ->
|
|
||||||
none
|
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
none
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec dirty_get_sessions_list() -> [ljid()].
|
-spec dirty_get_sessions_list() -> [ljid()].
|
||||||
|
|
||||||
dirty_get_sessions_list() ->
|
dirty_get_sessions_list() ->
|
||||||
lists:flatmap(
|
lists:flatmap(
|
||||||
fun(Mod) ->
|
fun(Mod) ->
|
||||||
[S#session.usr || S <- online(get_sessions(Mod))]
|
[S#session.usr || S <- get_sessions(Mod)]
|
||||||
end, get_sm_backends()).
|
end, get_sm_backends()).
|
||||||
|
|
||||||
-spec dirty_get_my_sessions_list() -> [#session{}].
|
-spec dirty_get_my_sessions_list() -> [#session{}].
|
||||||
@ -374,7 +343,7 @@ dirty_get_sessions_list() ->
|
|||||||
dirty_get_my_sessions_list() ->
|
dirty_get_my_sessions_list() ->
|
||||||
lists:flatmap(
|
lists:flatmap(
|
||||||
fun(Mod) ->
|
fun(Mod) ->
|
||||||
[S || S <- online(get_sessions(Mod)),
|
[S || S <- get_sessions(Mod),
|
||||||
node(element(2, S#session.sid)) == node()]
|
node(element(2, S#session.sid)) == node()]
|
||||||
end, get_sm_backends()).
|
end, get_sm_backends()).
|
||||||
|
|
||||||
@ -383,14 +352,14 @@ dirty_get_my_sessions_list() ->
|
|||||||
get_vh_session_list(Server) ->
|
get_vh_session_list(Server) ->
|
||||||
LServer = jid:nameprep(Server),
|
LServer = jid:nameprep(Server),
|
||||||
Mod = get_sm_backend(LServer),
|
Mod = get_sm_backend(LServer),
|
||||||
[S#session.usr || S <- online(get_sessions(Mod, LServer))].
|
[S#session.usr || S <- get_sessions(Mod, LServer)].
|
||||||
|
|
||||||
-spec get_all_pids() -> [pid()].
|
-spec get_all_pids() -> [pid()].
|
||||||
|
|
||||||
get_all_pids() ->
|
get_all_pids() ->
|
||||||
lists:flatmap(
|
lists:flatmap(
|
||||||
fun(Mod) ->
|
fun(Mod) ->
|
||||||
[element(2, S#session.sid) || S <- online(get_sessions(Mod))]
|
[element(2, S#session.sid) || S <- get_sessions(Mod)]
|
||||||
end, get_sm_backends()).
|
end, get_sm_backends()).
|
||||||
|
|
||||||
-spec get_vh_session_number(binary()) -> non_neg_integer().
|
-spec get_vh_session_number(binary()) -> non_neg_integer().
|
||||||
@ -398,7 +367,7 @@ get_all_pids() ->
|
|||||||
get_vh_session_number(Server) ->
|
get_vh_session_number(Server) ->
|
||||||
LServer = jid:nameprep(Server),
|
LServer = jid:nameprep(Server),
|
||||||
Mod = get_sm_backend(LServer),
|
Mod = get_sm_backend(LServer),
|
||||||
length(online(get_sessions(Mod, LServer))).
|
length(get_sessions(Mod, LServer)).
|
||||||
|
|
||||||
%% Why the hell do we have so many similar kicks?
|
%% Why the hell do we have so many similar kicks?
|
||||||
c2s_handle_info(#{lang := Lang} = State, replaced) ->
|
c2s_handle_info(#{lang := Lang} = State, replaced) ->
|
||||||
@ -579,16 +548,6 @@ delete_session(Mod, #session{usr = {LUser, LServer, _}} = Session) ->
|
|||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec online([#session{}]) -> [#session{}].
|
|
||||||
|
|
||||||
online(Sessions) ->
|
|
||||||
lists:filter(fun is_online/1, Sessions).
|
|
||||||
|
|
||||||
-spec is_online(#session{}) -> boolean().
|
|
||||||
|
|
||||||
is_online(#session{info = Info}) ->
|
|
||||||
not proplists:get_bool(offline, Info).
|
|
||||||
|
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
-spec do_route(jid(), term()) -> any().
|
-spec do_route(jid(), term()) -> any().
|
||||||
do_route(#jid{lresource = <<"">>} = To, Term) ->
|
do_route(#jid{lresource = <<"">>} = To, Term) ->
|
||||||
@ -600,7 +559,7 @@ do_route(To, Term) ->
|
|||||||
?DEBUG("broadcasting ~p to ~s", [Term, jid:encode(To)]),
|
?DEBUG("broadcasting ~p to ~s", [Term, jid:encode(To)]),
|
||||||
{U, S, R} = jid:tolower(To),
|
{U, S, R} = jid:tolower(To),
|
||||||
Mod = get_sm_backend(S),
|
Mod = get_sm_backend(S),
|
||||||
case online(get_sessions(Mod, U, S, R)) of
|
case get_sessions(Mod, U, S, R) of
|
||||||
[] ->
|
[] ->
|
||||||
?DEBUG("dropping broadcast to unavailable resourse: ~p", [Term]);
|
?DEBUG("dropping broadcast to unavailable resourse: ~p", [Term]);
|
||||||
Ss ->
|
Ss ->
|
||||||
@ -631,7 +590,7 @@ do_route(#presence{to = To, type = T} = Packet)
|
|||||||
ejabberd_c2s:route(Pid, {route, Packet1});
|
ejabberd_c2s:route(Pid, {route, Packet1});
|
||||||
(_) ->
|
(_) ->
|
||||||
ok
|
ok
|
||||||
end, online(get_sessions(Mod, LUser, LServer)));
|
end, get_sessions(Mod, LUser, LServer));
|
||||||
false ->
|
false ->
|
||||||
ok
|
ok
|
||||||
end;
|
end;
|
||||||
@ -660,7 +619,7 @@ do_route(Packet) ->
|
|||||||
To = xmpp:get_to(Packet),
|
To = xmpp:get_to(Packet),
|
||||||
{LUser, LServer, LResource} = jid:tolower(To),
|
{LUser, LServer, LResource} = jid:tolower(To),
|
||||||
Mod = get_sm_backend(LServer),
|
Mod = get_sm_backend(LServer),
|
||||||
case online(get_sessions(Mod, LUser, LServer, LResource)) of
|
case get_sessions(Mod, LUser, LServer, LResource) of
|
||||||
[] ->
|
[] ->
|
||||||
case Packet of
|
case Packet of
|
||||||
#message{type = T} when T == chat; T == normal ->
|
#message{type = T} when T == chat; T == normal ->
|
||||||
@ -708,8 +667,8 @@ route_message(#message{to = To, type = Type} = Packet) ->
|
|||||||
(P >= 0) and (Type == headline) ->
|
(P >= 0) and (Type == headline) ->
|
||||||
LResource = jid:resourceprep(R),
|
LResource = jid:resourceprep(R),
|
||||||
Mod = get_sm_backend(LServer),
|
Mod = get_sm_backend(LServer),
|
||||||
case online(get_sessions(Mod, LUser, LServer,
|
case get_sessions(Mod, LUser, LServer,
|
||||||
LResource)) of
|
LResource) of
|
||||||
[] ->
|
[] ->
|
||||||
ok; % Race condition
|
ok; % Race condition
|
||||||
Ss ->
|
Ss ->
|
||||||
@ -780,13 +739,9 @@ check_for_sessions_to_replace(User, Server, Resource) ->
|
|||||||
check_existing_resources(LUser, LServer, LResource) ->
|
check_existing_resources(LUser, LServer, LResource) ->
|
||||||
Mod = get_sm_backend(LServer),
|
Mod = get_sm_backend(LServer),
|
||||||
Ss = get_sessions(Mod, LUser, LServer, LResource),
|
Ss = get_sessions(Mod, LUser, LServer, LResource),
|
||||||
{OnlineSs, OfflineSs} = lists:partition(fun is_online/1, Ss),
|
if Ss == [] -> ok;
|
||||||
lists:foreach(fun(S) ->
|
|
||||||
delete_session(Mod, S)
|
|
||||||
end, OfflineSs),
|
|
||||||
if OnlineSs == [] -> ok;
|
|
||||||
true ->
|
true ->
|
||||||
SIDs = [SID || #session{sid = SID} <- OnlineSs],
|
SIDs = [SID || #session{sid = SID} <- Ss],
|
||||||
MaxSID = lists:max(SIDs),
|
MaxSID = lists:max(SIDs),
|
||||||
lists:foreach(fun ({_, Pid} = S) when S /= MaxSID ->
|
lists:foreach(fun ({_, Pid} = S) when S /= MaxSID ->
|
||||||
ejabberd_c2s:route(Pid, replaced);
|
ejabberd_c2s:route(Pid, replaced);
|
||||||
@ -806,22 +761,17 @@ get_resource_sessions(User, Server, Resource) ->
|
|||||||
LServer = jid:nameprep(Server),
|
LServer = jid:nameprep(Server),
|
||||||
LResource = jid:resourceprep(Resource),
|
LResource = jid:resourceprep(Resource),
|
||||||
Mod = get_sm_backend(LServer),
|
Mod = get_sm_backend(LServer),
|
||||||
[S#session.sid || S <- online(get_sessions(Mod, LUser, LServer, LResource))].
|
[S#session.sid || S <- get_sessions(Mod, LUser, LServer, LResource)].
|
||||||
|
|
||||||
-spec check_max_sessions(binary(), binary()) -> ok | replaced.
|
-spec check_max_sessions(binary(), binary()) -> ok | replaced.
|
||||||
check_max_sessions(LUser, LServer) ->
|
check_max_sessions(LUser, LServer) ->
|
||||||
Mod = get_sm_backend(LServer),
|
Mod = get_sm_backend(LServer),
|
||||||
Ss = get_sessions(Mod, LUser, LServer),
|
Ss = get_sessions(Mod, LUser, LServer),
|
||||||
{OnlineSs, OfflineSs} = lists:partition(fun is_online/1, Ss),
|
|
||||||
MaxSessions = get_max_user_sessions(LUser, LServer),
|
MaxSessions = get_max_user_sessions(LUser, LServer),
|
||||||
if length(OnlineSs) =< MaxSessions -> ok;
|
if length(Ss) =< MaxSessions -> ok;
|
||||||
true ->
|
true ->
|
||||||
#session{sid = {_, Pid}} = lists:min(OnlineSs),
|
#session{sid = {_, Pid}} = lists:min(Ss),
|
||||||
ejabberd_c2s:route(Pid, replaced)
|
ejabberd_c2s:route(Pid, replaced)
|
||||||
end,
|
|
||||||
if length(OfflineSs) =< MaxSessions -> ok;
|
|
||||||
true ->
|
|
||||||
delete_session(Mod, lists:min(OfflineSs))
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% Get the user_max_session setting
|
%% Get the user_max_session setting
|
||||||
@ -843,7 +793,7 @@ get_max_user_sessions(LUser, Host) ->
|
|||||||
|
|
||||||
force_update_presence({LUser, LServer}) ->
|
force_update_presence({LUser, LServer}) ->
|
||||||
Mod = get_sm_backend(LServer),
|
Mod = get_sm_backend(LServer),
|
||||||
Ss = online(get_sessions(Mod, LUser, LServer)),
|
Ss = get_sessions(Mod, LUser, LServer),
|
||||||
lists:foreach(fun (#session{sid = {_, Pid}}) ->
|
lists:foreach(fun (#session{sid = {_, Pid}}) ->
|
||||||
ejabberd_c2s:resend_presence(Pid)
|
ejabberd_c2s:resend_presence(Pid)
|
||||||
end,
|
end,
|
||||||
|
@ -40,6 +40,8 @@
|
|||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
-include("p1_queue.hrl").
|
-include("p1_queue.hrl").
|
||||||
|
|
||||||
|
-define(STREAM_MGMT_CACHE, stream_mgmt_cache).
|
||||||
|
|
||||||
-define(is_sm_packet(Pkt),
|
-define(is_sm_packet(Pkt),
|
||||||
is_record(Pkt, sm_enable) or
|
is_record(Pkt, sm_enable) or
|
||||||
is_record(Pkt, sm_resume) or
|
is_record(Pkt, sm_resume) or
|
||||||
@ -51,7 +53,8 @@
|
|||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% API
|
%%% API
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
start(Host, _Opts) ->
|
start(Host, Opts) ->
|
||||||
|
init_cache(Opts),
|
||||||
ejabberd_hooks:add(c2s_init, ?MODULE, c2s_stream_init, 50),
|
ejabberd_hooks:add(c2s_init, ?MODULE, c2s_stream_init, 50),
|
||||||
ejabberd_hooks:add(c2s_stream_started, Host, ?MODULE,
|
ejabberd_hooks:add(c2s_stream_started, Host, ?MODULE,
|
||||||
c2s_stream_started, 50),
|
c2s_stream_started, 50),
|
||||||
@ -284,23 +287,16 @@ c2s_terminated(#{mgmt_state := resumed, jid := JID} = State, _Reason) ->
|
|||||||
[jid:encode(JID)]),
|
[jid:encode(JID)]),
|
||||||
bounce_message_queue(),
|
bounce_message_queue(),
|
||||||
{stop, State};
|
{stop, State};
|
||||||
c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In, sid := SID,
|
c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In,
|
||||||
user := U, server := S, resource := R} = State, Reason) ->
|
sid := {Time, _}, jid := JID} = State, _Reason) ->
|
||||||
Result = case MgmtState of
|
case MgmtState of
|
||||||
timeout ->
|
timeout ->
|
||||||
Info = [{num_stanzas_in, In}],
|
store_stanzas_in(jid:tolower(JID), Time, In);
|
||||||
%% TODO: Usually, ejabberd_c2s:process_terminated/2 is
|
_ ->
|
||||||
%% called later in the hook chain. We swap the order so
|
ok
|
||||||
%% that the offline info won't be purged after we stored
|
end,
|
||||||
%% it. This should be fixed in a proper way.
|
|
||||||
State1 = ejabberd_c2s:process_terminated(State, Reason),
|
|
||||||
ejabberd_sm:set_offline_info(SID, U, S, R, Info),
|
|
||||||
{stop, State1};
|
|
||||||
_ ->
|
|
||||||
State
|
|
||||||
end,
|
|
||||||
route_unacked_stanzas(State),
|
route_unacked_stanzas(State),
|
||||||
Result;
|
State;
|
||||||
c2s_terminated(State, _Reason) ->
|
c2s_terminated(State, _Reason) ->
|
||||||
State.
|
State.
|
||||||
|
|
||||||
@ -641,16 +637,11 @@ inherit_session_state(#{user := U, server := S,
|
|||||||
{term, {R, Time}} ->
|
{term, {R, Time}} ->
|
||||||
case ejabberd_sm:get_session_pid(U, S, R) of
|
case ejabberd_sm:get_session_pid(U, S, R) of
|
||||||
none ->
|
none ->
|
||||||
case ejabberd_sm:get_offline_info(Time, U, S, R) of
|
case pop_stanzas_in({U, S, R}, Time) of
|
||||||
none ->
|
error ->
|
||||||
{error, <<"Previous session PID not found">>};
|
{error, <<"Previous session PID not found">>};
|
||||||
Info ->
|
{ok, H} ->
|
||||||
case proplists:get_value(num_stanzas_in, Info) of
|
{error, <<"Previous session timed out">>, H}
|
||||||
undefined ->
|
|
||||||
{error, <<"Previous session timed out">>};
|
|
||||||
H ->
|
|
||||||
{error, <<"Previous session timed out">>, H}
|
|
||||||
end
|
|
||||||
end;
|
end;
|
||||||
OldPID ->
|
OldPID ->
|
||||||
OldSID = {Time, OldPID},
|
OldSID = {Time, OldPID},
|
||||||
@ -750,6 +741,32 @@ need_to_enqueue(#{mgmt_force_enqueue := true} = State, #xmlel{}) ->
|
|||||||
need_to_enqueue(State, _) ->
|
need_to_enqueue(State, _) ->
|
||||||
{false, State}.
|
{false, State}.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% Cache-like storage for last handled stanzas
|
||||||
|
%%%===================================================================
|
||||||
|
init_cache(Opts) ->
|
||||||
|
ets_cache:new(?STREAM_MGMT_CACHE, cache_opts(Opts)).
|
||||||
|
|
||||||
|
cache_opts(Opts) ->
|
||||||
|
[{max_size, gen_mod:get_opt(cache_size, Opts)},
|
||||||
|
{life_time, infinity}].
|
||||||
|
|
||||||
|
-spec store_stanzas_in(ljid(), erlang:timestamp(), non_neg_integer()) -> boolean().
|
||||||
|
store_stanzas_in(LJID, Time, Num) ->
|
||||||
|
ets_cache:insert(?STREAM_MGMT_CACHE, {LJID, Time}, Num,
|
||||||
|
ejabberd_cluster:get_nodes()).
|
||||||
|
|
||||||
|
-spec pop_stanzas_in(ljid(), erlang:timestamp()) -> {ok, non_neg_integer()} | error.
|
||||||
|
pop_stanzas_in(LJID, Time) ->
|
||||||
|
case ets_cache:lookup(?STREAM_MGMT_CACHE, {LJID, Time}) of
|
||||||
|
{ok, Val} ->
|
||||||
|
ets_cache:delete(?STREAM_MGMT_CACHE, {LJID, Time},
|
||||||
|
ejabberd_cluster:get_nodes()),
|
||||||
|
{ok, Val};
|
||||||
|
error ->
|
||||||
|
error
|
||||||
|
end.
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% Configuration processing
|
%%% Configuration processing
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
@ -796,6 +813,11 @@ mod_opt_type(resend_on_timeout) ->
|
|||||||
fun(B) when is_boolean(B) -> B;
|
fun(B) when is_boolean(B) -> B;
|
||||||
(if_offline) -> if_offline
|
(if_offline) -> if_offline
|
||||||
end;
|
end;
|
||||||
|
mod_opt_type(cache_size) ->
|
||||||
|
fun(I) when is_integer(I), I>0 -> I;
|
||||||
|
(unlimited) -> infinity;
|
||||||
|
(infinity) -> infinity
|
||||||
|
end;
|
||||||
mod_opt_type(queue_type) ->
|
mod_opt_type(queue_type) ->
|
||||||
fun(ram) -> ram; (file) -> file end.
|
fun(ram) -> ram; (file) -> file end.
|
||||||
|
|
||||||
@ -804,5 +826,6 @@ mod_options(Host) ->
|
|||||||
{resume_timeout, 300},
|
{resume_timeout, 300},
|
||||||
{max_resume_timeout, undefined},
|
{max_resume_timeout, undefined},
|
||||||
{ack_timeout, 60},
|
{ack_timeout, 60},
|
||||||
|
{cache_size, ejabberd_config:cache_size(Host)},
|
||||||
{resend_on_timeout, false},
|
{resend_on_timeout, false},
|
||||||
{queue_type, ejabberd_config:default_queue_type(Host)}].
|
{queue_type, ejabberd_config:default_queue_type(Host)}].
|
||||||
|
Loading…
Reference in New Issue
Block a user