diff --git a/rebar.config b/rebar.config index 7088cfefe..ca41b6ee6 100644 --- a/rebar.config +++ b/rebar.config @@ -19,7 +19,7 @@ %%%---------------------------------------------------------------------- {deps, [{lager, ".*", {git, "https://github.com/basho/lager", {tag, "3.2.1"}}}, - {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", "13b03e1c8c7a5777de728f759809142f997f8af3"}}, + {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", "f677e61"}}, {cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "1.0.6"}}}, {fast_tls, ".*", {git, "https://github.com/processone/fast_tls", "afdd07811e0e6eff444c035ffeb2aa9efb4dbe6d"}}, {stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.7"}}}, diff --git a/src/ejabberd_s2s_out.erl b/src/ejabberd_s2s_out.erl index 8c9f9d631..60c19b082 100644 --- a/src/ejabberd_s2s_out.erl +++ b/src/ejabberd_s2s_out.erl @@ -145,14 +145,14 @@ process_closed(#{server := LServer, remote_server := RServer, on_route := send} = State, Reason) -> ?INFO_MSG("Closing outbound s2s connection ~s -> ~s: ~s", - [LServer, RServer, xmpp_stream_out:format_error(Reason)]), + [LServer, RServer, format_error(Reason)]), stop(State); process_closed(#{server := LServer, remote_server := RServer} = State, Reason) -> Delay = get_delay(), ?INFO_MSG("Failed to establish outbound s2s connection ~s -> ~s: ~s; " "bouncing for ~p seconds", - [LServer, RServer, xmpp_stream_out:format_error(Reason), Delay]), + [LServer, RServer, format_error(Reason), Delay]), State1 = State#{on_route => bounce}, State2 = bounce_queue(State1), xmpp_stream_out:set_timeout(State2, timer:seconds(Delay)). @@ -309,11 +309,9 @@ handle_info({route, Pkt}, #{queue := Q, on_route := Action} = State) -> queue -> try State#{queue => p1_queue:in(Pkt, Q)} catch error:full -> - #{server := LServer, remote_server := RServer} = State, - ?INFO_MSG("Failed to establish outbound s2s connection " - "~s -> ~s: message queue is overloaded", - [LServer, RServer]), - stop(State#{stop_reason => queue_full}) + Q1 = p1_queue:set_limit(Q, unlimited), + Q2 = p1_queue:in(Pkt, Q1), + handle_stream_end(queue_full, State#{queue => Q2}) end; bounce -> bounce_packet(Pkt, State); send -> set_idle_timeout(send(State, Pkt)) @@ -371,12 +369,12 @@ bounce_packet(_, State) -> -spec mk_bounce_error(binary(), state()) -> stanza_error(). mk_bounce_error(Lang, #{stop_reason := Why}) -> - Reason = xmpp_stream_out:format_error(Why), + Reason = format_error(Why), case Why of internal_failure -> - xmpp:err_internal_server_error(); + xmpp:err_internal_server_error(Reason, Lang); queue_full -> - xmpp:err_resource_constraint(); + xmpp:err_resource_constraint(Reason, Lang); {dns, _} -> xmpp:err_remote_server_not_found(Reason, Lang); _ -> @@ -401,6 +399,7 @@ set_idle_timeout(#{on_route := send, server := LServer} = State) -> set_idle_timeout(State) -> State. +-spec queue_fold(fun((xmpp_element(), state()) -> state()), state()) -> state(). queue_fold(F, #{queue := Q} = State) -> case p1_queue:out(Q) of {{value, Pkt}, Q1} -> @@ -410,6 +409,13 @@ queue_fold(F, #{queue := Q} = State) -> State#{queue => Q1} end. +format_error(internal_failure) -> + <<"Internal server error">>; +format_error(queue_full) -> + <<"Stream queue is overloaded">>; +format_error(Reason) -> + xmpp_stream_out:format_error(Reason). + transform_options(Opts) -> lists:foldl(fun transform_options/2, [], Opts).