Improve overloaded S2S queue processing
This commit is contained in:
parent
02064ae12a
commit
4b1bdb563e
|
@ -19,7 +19,7 @@
|
||||||
%%%----------------------------------------------------------------------
|
%%%----------------------------------------------------------------------
|
||||||
|
|
||||||
{deps, [{lager, ".*", {git, "https://github.com/basho/lager", {tag, "3.2.1"}}},
|
{deps, [{lager, ".*", {git, "https://github.com/basho/lager", {tag, "3.2.1"}}},
|
||||||
{p1_utils, ".*", {git, "https://github.com/processone/p1_utils", "13b03e1c8c7a5777de728f759809142f997f8af3"}},
|
{p1_utils, ".*", {git, "https://github.com/processone/p1_utils", "f677e61"}},
|
||||||
{cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "1.0.6"}}},
|
{cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "1.0.6"}}},
|
||||||
{fast_tls, ".*", {git, "https://github.com/processone/fast_tls", "afdd07811e0e6eff444c035ffeb2aa9efb4dbe6d"}},
|
{fast_tls, ".*", {git, "https://github.com/processone/fast_tls", "afdd07811e0e6eff444c035ffeb2aa9efb4dbe6d"}},
|
||||||
{stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.7"}}},
|
{stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.7"}}},
|
||||||
|
|
|
@ -145,14 +145,14 @@ process_closed(#{server := LServer, remote_server := RServer,
|
||||||
on_route := send} = State,
|
on_route := send} = State,
|
||||||
Reason) ->
|
Reason) ->
|
||||||
?INFO_MSG("Closing outbound s2s connection ~s -> ~s: ~s",
|
?INFO_MSG("Closing outbound s2s connection ~s -> ~s: ~s",
|
||||||
[LServer, RServer, xmpp_stream_out:format_error(Reason)]),
|
[LServer, RServer, format_error(Reason)]),
|
||||||
stop(State);
|
stop(State);
|
||||||
process_closed(#{server := LServer, remote_server := RServer} = State,
|
process_closed(#{server := LServer, remote_server := RServer} = State,
|
||||||
Reason) ->
|
Reason) ->
|
||||||
Delay = get_delay(),
|
Delay = get_delay(),
|
||||||
?INFO_MSG("Failed to establish outbound s2s connection ~s -> ~s: ~s; "
|
?INFO_MSG("Failed to establish outbound s2s connection ~s -> ~s: ~s; "
|
||||||
"bouncing for ~p seconds",
|
"bouncing for ~p seconds",
|
||||||
[LServer, RServer, xmpp_stream_out:format_error(Reason), Delay]),
|
[LServer, RServer, format_error(Reason), Delay]),
|
||||||
State1 = State#{on_route => bounce},
|
State1 = State#{on_route => bounce},
|
||||||
State2 = bounce_queue(State1),
|
State2 = bounce_queue(State1),
|
||||||
xmpp_stream_out:set_timeout(State2, timer:seconds(Delay)).
|
xmpp_stream_out:set_timeout(State2, timer:seconds(Delay)).
|
||||||
|
@ -309,11 +309,9 @@ handle_info({route, Pkt}, #{queue := Q, on_route := Action} = State) ->
|
||||||
queue ->
|
queue ->
|
||||||
try State#{queue => p1_queue:in(Pkt, Q)}
|
try State#{queue => p1_queue:in(Pkt, Q)}
|
||||||
catch error:full ->
|
catch error:full ->
|
||||||
#{server := LServer, remote_server := RServer} = State,
|
Q1 = p1_queue:set_limit(Q, unlimited),
|
||||||
?INFO_MSG("Failed to establish outbound s2s connection "
|
Q2 = p1_queue:in(Pkt, Q1),
|
||||||
"~s -> ~s: message queue is overloaded",
|
handle_stream_end(queue_full, State#{queue => Q2})
|
||||||
[LServer, RServer]),
|
|
||||||
stop(State#{stop_reason => queue_full})
|
|
||||||
end;
|
end;
|
||||||
bounce -> bounce_packet(Pkt, State);
|
bounce -> bounce_packet(Pkt, State);
|
||||||
send -> set_idle_timeout(send(State, Pkt))
|
send -> set_idle_timeout(send(State, Pkt))
|
||||||
|
@ -371,12 +369,12 @@ bounce_packet(_, State) ->
|
||||||
|
|
||||||
-spec mk_bounce_error(binary(), state()) -> stanza_error().
|
-spec mk_bounce_error(binary(), state()) -> stanza_error().
|
||||||
mk_bounce_error(Lang, #{stop_reason := Why}) ->
|
mk_bounce_error(Lang, #{stop_reason := Why}) ->
|
||||||
Reason = xmpp_stream_out:format_error(Why),
|
Reason = format_error(Why),
|
||||||
case Why of
|
case Why of
|
||||||
internal_failure ->
|
internal_failure ->
|
||||||
xmpp:err_internal_server_error();
|
xmpp:err_internal_server_error(Reason, Lang);
|
||||||
queue_full ->
|
queue_full ->
|
||||||
xmpp:err_resource_constraint();
|
xmpp:err_resource_constraint(Reason, Lang);
|
||||||
{dns, _} ->
|
{dns, _} ->
|
||||||
xmpp:err_remote_server_not_found(Reason, Lang);
|
xmpp:err_remote_server_not_found(Reason, Lang);
|
||||||
_ ->
|
_ ->
|
||||||
|
@ -401,6 +399,7 @@ set_idle_timeout(#{on_route := send, server := LServer} = State) ->
|
||||||
set_idle_timeout(State) ->
|
set_idle_timeout(State) ->
|
||||||
State.
|
State.
|
||||||
|
|
||||||
|
-spec queue_fold(fun((xmpp_element(), state()) -> state()), state()) -> state().
|
||||||
queue_fold(F, #{queue := Q} = State) ->
|
queue_fold(F, #{queue := Q} = State) ->
|
||||||
case p1_queue:out(Q) of
|
case p1_queue:out(Q) of
|
||||||
{{value, Pkt}, Q1} ->
|
{{value, Pkt}, Q1} ->
|
||||||
|
@ -410,6 +409,13 @@ queue_fold(F, #{queue := Q} = State) ->
|
||||||
State#{queue => Q1}
|
State#{queue => Q1}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
format_error(internal_failure) ->
|
||||||
|
<<"Internal server error">>;
|
||||||
|
format_error(queue_full) ->
|
||||||
|
<<"Stream queue is overloaded">>;
|
||||||
|
format_error(Reason) ->
|
||||||
|
xmpp_stream_out:format_error(Reason).
|
||||||
|
|
||||||
transform_options(Opts) ->
|
transform_options(Opts) ->
|
||||||
lists:foldl(fun transform_options/2, [], Opts).
|
lists:foldl(fun transform_options/2, [], Opts).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue