diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl index 3ec3eb72a..981b357b7 100644 --- a/src/mod_stream_mgmt.erl +++ b/src/mod_stream_mgmt.erl @@ -180,22 +180,17 @@ c2s_handle_recv(State, _, _) -> c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod, lang := Lang} = State, Pkt, SendResult) when MgmtState == pending; MgmtState == active -> - case xmpp:is_stanza(Pkt) of - true -> + case Pkt of + _ when ?is_stanza(Pkt) -> Meta = xmpp:get_meta(Pkt), case maps:get(mgmt_is_resent, Meta, false) of false -> case mgmt_queue_add(State, Pkt) of #{mgmt_max_queue := exceeded} = State1 -> State2 = State1#{mgmt_resend => false}, - case MgmtState of - active -> - Err = xmpp:serr_policy_violation( - <<"Too many unacked stanzas">>, Lang), - send(State2, Err); - _ -> - Mod:stop(State2) - end; + Err = xmpp:serr_policy_violation( + <<"Too many unacked stanzas">>, Lang), + send(State2, Err); State1 when SendResult == ok -> send_rack(State1); State1 -> @@ -204,7 +199,14 @@ c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod, true -> State end; - false -> + #stream_error{} -> + case MgmtState of + active -> + State; + pending -> + Mod:stop(State#{stop_reason => {stream, {out, Pkt}}}) + end; + _ -> State end; c2s_handle_send(State, _Pkt, _Result) -> diff --git a/test/ejabberd_SUITE.erl b/test/ejabberd_SUITE.erl index 089d9d6a6..175a6e69a 100644 --- a/test/ejabberd_SUITE.erl +++ b/test/ejabberd_SUITE.erl @@ -372,6 +372,7 @@ no_db_tests() -> s2s_required, s2s_required_trusted]}, sm_tests:single_cases(), + sm_tests:master_slave_cases(), muc_tests:single_cases(), muc_tests:master_slave_cases(), proxy65_tests:single_cases(), diff --git a/test/ejabberd_SUITE_data/ejabberd.yml b/test/ejabberd_SUITE_data/ejabberd.yml index 1eede5331..e766d0cc3 100644 --- a/test/ejabberd_SUITE_data/ejabberd.yml +++ b/test/ejabberd_SUITE_data/ejabberd.yml @@ -464,7 +464,8 @@ Welcome to this XMPP server." mod_stats: [] mod_s2s_dialback: [] mod_legacy_auth: [] - mod_stream_mgmt: [] + mod_stream_mgmt: + max_ack_queue: 10 mod_time: [] mod_version: [] registration_timeout: infinity diff --git a/test/sm_tests.erl b/test/sm_tests.erl index 37cecf5b7..134a2f951 100644 --- a/test/sm_tests.erl +++ b/test/sm_tests.erl @@ -26,7 +26,8 @@ %% API -compile(export_all). -import(suite, [send/2, recv/1, close_socket/1, set_opt/3, my_jid/1, - recv_message/1, disconnect/1]). + recv_message/1, disconnect/1, send_recv/2, + put_event/2, get_event/1]). -include("suite.hrl"). @@ -109,7 +110,51 @@ resume_failed(Config) -> %%% Master-slave tests %%%=================================================================== master_slave_cases() -> - {sm_master_slave, [sequence], []}. + {sm_master_slave, [sequence], + [master_slave_test(queue_limit), + master_slave_test(queue_limit_detached)]}. + +queue_limit_master(Config) -> + ct:comment("Waiting for 'send' command from the peer"), + send = get_event(Config), + send_recv_messages(Config), + ct:comment("Waiting for peer to disconnect"), + peer_down = get_event(Config), + disconnect(Config). + +queue_limit_slave(Config) -> + ct:comment("Enable the session management without resumption"), + send(Config, #sm_enable{xmlns = ?NS_STREAM_MGMT_3}), + #sm_enabled{resume = false} = recv(Config), + put_event(Config, send), + ct:comment("Receiving all messages"), + lists:foreach( + fun(I) -> + ID = integer_to_binary(I), + Body = xmpp:mk_text(ID), + #message{id = ID, body = Body} = recv_message(Config) + end, lists:seq(1, 11)), + ct:comment("Receiving request ACK"), + #sm_r{} = recv(Config), + ct:comment("Receiving policy-violation stream error"), + #stream_error{reason = 'policy-violation'} = recv(Config), + {xmlstreamend, <<"stream:stream">>} = recv(Config), + ct:comment("Closing socket"), + close_socket(Config). + +queue_limit_detached_master(Config) -> + ct:comment("Waiting for the peer to disconnect"), + peer_down = get_event(Config), + send_recv_messages(Config), + disconnect(Config). + +queue_limit_detached_slave(Config) -> + #presence{} = send_recv(Config, #presence{}), + ct:comment("Enable the session management with resumption enabled"), + send(Config, #sm_enable{resume = true, xmlns = ?NS_STREAM_MGMT_3}), + #sm_enabled{resume = true} = recv(Config), + ct:comment("Closing socket"), + close_socket(Config). %%%=================================================================== %%% Internal functions @@ -121,3 +166,20 @@ master_slave_test(T) -> {list_to_atom("sm_" ++ atom_to_list(T)), [parallel], [list_to_atom("sm_" ++ atom_to_list(T) ++ "_master"), list_to_atom("sm_" ++ atom_to_list(T) ++ "_slave")]}. + +send_recv_messages(Config) -> + PeerJID = ?config(peer, Config), + Msg = #message{to = PeerJID}, + ct:comment("Sending messages to peer"), + lists:foreach( + fun(I) -> + ID = integer_to_binary(I), + send(Config, Msg#message{id = ID, body = xmpp:mk_text(ID)}) + end, lists:seq(1, 11)), + ct:comment("Receiving bounced messages from the peer"), + lists:foreach( + fun(I) -> + ID = integer_to_binary(I), + Err = #message{id = ID, type = error} = recv_message(Config), + #stanza_error{reason = 'service-unavailable'} = xmpp:get_error(Err) + end, lists:seq(1, 11)).