25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-11-28 16:34:13 +01:00

Make resumed sessions try to deliver possibly queued messages to new session

Between receiving resume request and being closed by new session, it's
possible (even if not very likely) that new messages would arrive to
process that is resumed. In that case try to reroute messages that were
received after we sent resume reply to new process.
This commit is contained in:
Paweł Chmielowski 2020-04-07 14:51:49 +02:00
parent 16585713f8
commit 9bb3aee0e2

View File

@ -192,7 +192,7 @@ c2s_handle_recv(State, _, _) ->
c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod, c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod,
lang := Lang} = State, Pkt, SendResult) lang := Lang} = State, Pkt, SendResult)
when MgmtState == pending; MgmtState == active -> when MgmtState == pending; MgmtState == active; MgmtState == resumed ->
IsStanza = xmpp:is_stanza(Pkt), IsStanza = xmpp:is_stanza(Pkt),
case Pkt of case Pkt of
_ when IsStanza -> _ when IsStanza ->
@ -214,6 +214,8 @@ c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod,
end; end;
#stream_error{} -> #stream_error{} ->
case MgmtState of case MgmtState of
resumed ->
State;
active -> active ->
State; State;
pending -> pending ->
@ -230,7 +232,7 @@ c2s_handle_call(#{sid := {Time, _}, mod := Mod, mgmt_queue := Queue} = State,
{resume_session, Time}, From) -> {resume_session, Time}, From) ->
State1 = State#{mgmt_queue => p1_queue:file_to_ram(Queue)}, State1 = State#{mgmt_queue => p1_queue:file_to_ram(Queue)},
Mod:reply(From, {resume, State1}), Mod:reply(From, {resume, State1}),
{stop, State#{mgmt_state => resumed}}; {stop, State#{mgmt_state => resumed, mgmt_queue => p1_queue:clear(Queue)}};
c2s_handle_call(#{mod := Mod} = State, {resume_session, _}, From) -> c2s_handle_call(#{mod := Mod} = State, {resume_session, _}, From) ->
Mod:reply(From, {error, session_not_found}), Mod:reply(From, {error, session_not_found}),
{stop, State}; {stop, State};
@ -282,6 +284,7 @@ c2s_terminated(#{mgmt_state := resumed, sid := SID, jid := JID} = State, _Reason
[jid:encode(JID)]), [jid:encode(JID)]),
{U, S, R} = jid:tolower(JID), {U, S, R} = jid:tolower(JID),
ejabberd_sm:close_session(SID, U, S, R), ejabberd_sm:close_session(SID, U, S, R),
route_late_queue_after_resume(State),
ejabberd_c2s:bounce_message_queue(SID, JID), ejabberd_c2s:bounce_message_queue(SID, JID),
{stop, State}; {stop, State};
c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In, c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In,
@ -544,6 +547,18 @@ check_queue_length(#{mgmt_queue := Queue, mgmt_max_queue := Limit} = State) ->
State State
end. end.
-spec route_late_queue_after_resume(state()) -> ok.
route_late_queue_after_resume(#{mgmt_queue := Queue, jid := JID})
when ?qlen(Queue) > 0 ->
?DEBUG("Re-routing ~B late queued packets to ~ts",
[p1_queue:len(Queue), jid:encode(JID)]),
p1_queue:foreach(
fun({_, _Time, Pkt}) ->
ejabberd_router:route(Pkt)
end, Queue);
route_late_queue_after_resume(_State) ->
ok.
-spec resend_unacked_stanzas(state()) -> state(). -spec resend_unacked_stanzas(state()) -> state().
resend_unacked_stanzas(#{mgmt_state := MgmtState, resend_unacked_stanzas(#{mgmt_state := MgmtState,
mgmt_queue := Queue, mgmt_queue := Queue,