25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-12-22 17:28:25 +01:00

Test stream management queue overload

This commit is contained in:
Evgeniy Khramtsov 2017-02-21 12:38:03 +03:00
parent e11bcf9c43
commit 6ffd5ffd0c
4 changed files with 80 additions and 14 deletions

View File

@ -180,22 +180,17 @@ c2s_handle_recv(State, _, _) ->
c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod,
lang := Lang} = State, Pkt, SendResult)
when MgmtState == pending; MgmtState == active ->
case xmpp:is_stanza(Pkt) of
true ->
case Pkt of
_ when ?is_stanza(Pkt) ->
Meta = xmpp:get_meta(Pkt),
case maps:get(mgmt_is_resent, Meta, false) of
false ->
case mgmt_queue_add(State, Pkt) of
#{mgmt_max_queue := exceeded} = State1 ->
State2 = State1#{mgmt_resend => false},
case MgmtState of
active ->
Err = xmpp:serr_policy_violation(
<<"Too many unacked stanzas">>, Lang),
send(State2, Err);
_ ->
Mod:stop(State2)
end;
Err = xmpp:serr_policy_violation(
<<"Too many unacked stanzas">>, Lang),
send(State2, Err);
State1 when SendResult == ok ->
send_rack(State1);
State1 ->
@ -204,7 +199,14 @@ c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod,
true ->
State
end;
false ->
#stream_error{} ->
case MgmtState of
active ->
State;
pending ->
Mod:stop(State#{stop_reason => {stream, {out, Pkt}}})
end;
_ ->
State
end;
c2s_handle_send(State, _Pkt, _Result) ->

View File

@ -372,6 +372,7 @@ no_db_tests() ->
s2s_required,
s2s_required_trusted]},
sm_tests:single_cases(),
sm_tests:master_slave_cases(),
muc_tests:single_cases(),
muc_tests:master_slave_cases(),
proxy65_tests:single_cases(),

View File

@ -464,7 +464,8 @@ Welcome to this XMPP server."
mod_stats: []
mod_s2s_dialback: []
mod_legacy_auth: []
mod_stream_mgmt: []
mod_stream_mgmt:
max_ack_queue: 10
mod_time: []
mod_version: []
registration_timeout: infinity

View File

@ -26,7 +26,8 @@
%% API
-compile(export_all).
-import(suite, [send/2, recv/1, close_socket/1, set_opt/3, my_jid/1,
recv_message/1, disconnect/1]).
recv_message/1, disconnect/1, send_recv/2,
put_event/2, get_event/1]).
-include("suite.hrl").
@ -109,7 +110,51 @@ resume_failed(Config) ->
%%% Master-slave tests
%%%===================================================================
master_slave_cases() ->
{sm_master_slave, [sequence], []}.
{sm_master_slave, [sequence],
[master_slave_test(queue_limit),
master_slave_test(queue_limit_detached)]}.
queue_limit_master(Config) ->
ct:comment("Waiting for 'send' command from the peer"),
send = get_event(Config),
send_recv_messages(Config),
ct:comment("Waiting for peer to disconnect"),
peer_down = get_event(Config),
disconnect(Config).
queue_limit_slave(Config) ->
ct:comment("Enable the session management without resumption"),
send(Config, #sm_enable{xmlns = ?NS_STREAM_MGMT_3}),
#sm_enabled{resume = false} = recv(Config),
put_event(Config, send),
ct:comment("Receiving all messages"),
lists:foreach(
fun(I) ->
ID = integer_to_binary(I),
Body = xmpp:mk_text(ID),
#message{id = ID, body = Body} = recv_message(Config)
end, lists:seq(1, 11)),
ct:comment("Receiving request ACK"),
#sm_r{} = recv(Config),
ct:comment("Receiving policy-violation stream error"),
#stream_error{reason = 'policy-violation'} = recv(Config),
{xmlstreamend, <<"stream:stream">>} = recv(Config),
ct:comment("Closing socket"),
close_socket(Config).
queue_limit_detached_master(Config) ->
ct:comment("Waiting for the peer to disconnect"),
peer_down = get_event(Config),
send_recv_messages(Config),
disconnect(Config).
queue_limit_detached_slave(Config) ->
#presence{} = send_recv(Config, #presence{}),
ct:comment("Enable the session management with resumption enabled"),
send(Config, #sm_enable{resume = true, xmlns = ?NS_STREAM_MGMT_3}),
#sm_enabled{resume = true} = recv(Config),
ct:comment("Closing socket"),
close_socket(Config).
%%%===================================================================
%%% Internal functions
@ -121,3 +166,20 @@ master_slave_test(T) ->
{list_to_atom("sm_" ++ atom_to_list(T)), [parallel],
[list_to_atom("sm_" ++ atom_to_list(T) ++ "_master"),
list_to_atom("sm_" ++ atom_to_list(T) ++ "_slave")]}.
send_recv_messages(Config) ->
PeerJID = ?config(peer, Config),
Msg = #message{to = PeerJID},
ct:comment("Sending messages to peer"),
lists:foreach(
fun(I) ->
ID = integer_to_binary(I),
send(Config, Msg#message{id = ID, body = xmpp:mk_text(ID)})
end, lists:seq(1, 11)),
ct:comment("Receiving bounced messages from the peer"),
lists:foreach(
fun(I) ->
ID = integer_to_binary(I),
Err = #message{id = ID, type = error} = recv_message(Config),
#stanza_error{reason = 'service-unavailable'} = xmpp:get_error(Err)
end, lists:seq(1, 11)).