diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl index fb1a609b1..0c73eb7f7 100644 --- a/src/mod_stream_mgmt.erl +++ b/src/mod_stream_mgmt.erl @@ -192,7 +192,7 @@ c2s_handle_recv(State, _, _) -> c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod, lang := Lang} = State, Pkt, SendResult) - when MgmtState == pending; MgmtState == active -> + when MgmtState == pending; MgmtState == active; MgmtState == resumed -> IsStanza = xmpp:is_stanza(Pkt), case Pkt of _ when IsStanza -> @@ -214,6 +214,8 @@ c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod, end; #stream_error{} -> case MgmtState of + resumed -> + State; active -> State; pending -> @@ -230,7 +232,7 @@ c2s_handle_call(#{sid := {Time, _}, mod := Mod, mgmt_queue := Queue} = State, {resume_session, Time}, From) -> State1 = State#{mgmt_queue => p1_queue:file_to_ram(Queue)}, Mod:reply(From, {resume, State1}), - {stop, State#{mgmt_state => resumed}}; + {stop, State#{mgmt_state => resumed, mgmt_queue => p1_queue:clear(Queue)}}; c2s_handle_call(#{mod := Mod} = State, {resume_session, _}, From) -> Mod:reply(From, {error, session_not_found}), {stop, State}; @@ -282,6 +284,7 @@ c2s_terminated(#{mgmt_state := resumed, sid := SID, jid := JID} = State, _Reason [jid:encode(JID)]), {U, S, R} = jid:tolower(JID), ejabberd_sm:close_session(SID, U, S, R), + route_late_queue_after_resume(State), ejabberd_c2s:bounce_message_queue(SID, JID), {stop, State}; c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In, @@ -544,6 +547,18 @@ check_queue_length(#{mgmt_queue := Queue, mgmt_max_queue := Limit} = State) -> State end. +-spec route_late_queue_after_resume(state()) -> ok. +route_late_queue_after_resume(#{mgmt_queue := Queue, jid := JID}) + when ?qlen(Queue) > 0 -> + ?DEBUG("Re-routing ~B late queued packets to ~ts", + [p1_queue:len(Queue), jid:encode(JID)]), + p1_queue:foreach( + fun({_, _Time, Pkt}) -> + ejabberd_router:route(Pkt) + end, Queue); +route_late_queue_after_resume(_State) -> + ok. + -spec resend_unacked_stanzas(state()) -> state(). resend_unacked_stanzas(#{mgmt_state := MgmtState, mgmt_queue := Queue,