Make sure queue bouncing doesn't yield into infinite recursion

This commit is contained in:
Evgeny Khramtsov 2019-06-26 11:56:25 +03:00
parent ffe1c722e0
commit 4e5daf4d72
4 changed files with 49 additions and 28 deletions

View File

@ -47,7 +47,7 @@
-export([get_presence/1, set_presence/2, resend_presence/1, resend_presence/2,
open_session/1, call/3, cast/2, send/2, close/1, close/2, stop/1,
reply/2, copy_state/2, set_timeout/2, route/2,
host_up/1, host_down/1, send_ws_ping/1]).
host_up/1, host_down/1, send_ws_ping/1, bounce_message_queue/2]).
-include("xmpp.hrl").
-include("logger.hrl").
@ -299,7 +299,7 @@ process_terminated(#{sid := SID, socket := Socket,
ejabberd_sm:close_session(SID, U, S, R),
State
end,
bounce_message_queue(),
bounce_message_queue(SID, JID),
State1;
process_terminated(#{socket := Socket,
stop_reason := {tls, _}} = State, Reason) ->
@ -882,13 +882,23 @@ resource_conflict_action(U, S, R) ->
{accept_resource, Rnew}
end.
-spec bounce_message_queue() -> ok.
bounce_message_queue() ->
receive {route, Pkt} ->
ejabberd_router:route(Pkt),
bounce_message_queue()
after 0 ->
ok
-spec bounce_message_queue(ejabberd_sm:sid(), jid:jid()) -> ok.
bounce_message_queue(SID, JID) ->
{U, S, R} = jid:tolower(JID),
SIDs = ejabberd_sm:get_session_sids(U, S, R),
case lists:member(SID, SIDs) of
true ->
?WARNING_MSG("The session for ~s@~s/~s is supposed to "
"be unregistered, but session identifier ~p "
"still presents in the 'session' table",
[U, S, R]);
false ->
receive {route, Pkt} ->
ejabberd_router:route(Pkt),
bounce_message_queue(SID, JID)
after 0 ->
ok
end
end.
-spec new_uniq_id() -> binary().

View File

@ -310,7 +310,7 @@ terminate(Reason, #{server := LServer,
_ -> State#{stop_reason => internal_failure}
end,
State2 = bounce_queue(State1),
bounce_message_queue(State2).
bounce_message_queue({LServer, RServer}, State2).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -332,13 +332,22 @@ bounce_queue(State) ->
bounce_packet(Pkt, AccState)
end, State).
-spec bounce_message_queue(state()) -> state().
bounce_message_queue(State) ->
receive {route, Pkt} ->
State1 = bounce_packet(Pkt, State),
bounce_message_queue(State1)
after 0 ->
State
-spec bounce_message_queue({binary(), binary()}, state()) -> state().
bounce_message_queue({LServer, RServer} = FromTo, State) ->
Pids = ejabberd_s2s:get_connections_pids(FromTo),
case lists:member(self(), Pids) of
true ->
?WARNING_MSG("Outgoing s2s connection ~s -> ~s is supposed "
"to be unregistered, but pid ~p still presents "
"in 's2s' table", [LServer, RServer, self()]),
State;
false ->
receive {route, Pkt} ->
State1 = bounce_packet(Pkt, State),
bounce_message_queue(FromTo, State1)
after 0 ->
State
end
end.
-spec bounce_packet(xmpp_element(), state()) -> state().

View File

@ -63,6 +63,7 @@
get_session_pid/3,
get_session_sid/3,
get_session_sids/2,
get_session_sids/3,
get_user_info/2,
get_user_info/3,
set_user_info/5,
@ -400,6 +401,16 @@ get_session_sids(User, Server) ->
Sessions = get_sessions(Mod, LUser, LServer),
[SID || #session{sid = SID} <- Sessions].
-spec get_session_sids(binary(), binary(), binary()) -> [sid()].
get_session_sids(User, Server, Resource) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
Sessions = get_sessions(Mod, LUser, LServer, LResource),
[SID || #session{sid = SID} <- Sessions].
-spec dirty_get_sessions_list() -> [ljid()].
dirty_get_sessions_list() ->

View File

@ -266,10 +266,10 @@ c2s_closed(#{mgmt_state := active} = State, _Reason) ->
c2s_closed(State, _Reason) ->
State.
c2s_terminated(#{mgmt_state := resumed, jid := JID} = State, _Reason) ->
c2s_terminated(#{mgmt_state := resumed, sid := SID, jid := JID} = State, _Reason) ->
?DEBUG("Closing former stream of resumed session for ~s",
[jid:encode(JID)]),
bounce_message_queue(),
ejabberd_c2s:bounce_message_queue(SID, JID),
{stop, State};
c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In,
sid := {Time, _}, jid := JID} = State, _Reason) ->
@ -705,15 +705,6 @@ cancel_ack_timer(#{mgmt_ack_timer := TRef} = State) ->
cancel_ack_timer(State) ->
State.
-spec bounce_message_queue() -> ok.
bounce_message_queue() ->
receive {route, Pkt} ->
ejabberd_router:route(Pkt),
bounce_message_queue()
after 0 ->
ok
end.
-spec need_to_enqueue(state(), xmlel() | stanza()) -> {boolean(), state()}.
need_to_enqueue(State, Pkt) when ?is_stanza(Pkt) ->
{not xmpp:get_meta(Pkt, mgmt_is_resent, false), State};