Fix potential message loss in terminating c2s sessions

Calling the synchronous version of xmpp_stream_in/out:stop could lead to
messages never being processed by the c2s process if they were still queued
in p1_server.

This could be reproduced by having messages in offline storage, starting a
session, enabling stream_mgmt, sending initial presence, and then immediately
sending </stream:stream>: the messages that mod_offline would send to the
process would not be bounced back by stream_mgmt.
This commit is contained in:
Paweł Chmielowski 2020-04-01 14:35:49 +02:00
parent 222bb1d55d
commit 1bd560f3f2
9 changed files with 44 additions and 37 deletions

View File

@ -25,7 +25,7 @@
{stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.19"}}},
{fast_xml, ".*", {git, "https://github.com/processone/fast_xml", {tag, "1.1.39"}}},
{idna, ".*", {git, "https://github.com/benoitc/erlang-idna", {tag, "6.0.0"}}},
{xmpp, ".*", {git, "https://github.com/processone/xmpp", {tag, "1.4.5"}}},
{xmpp, ".*", {git, "https://github.com/processone/xmpp", "c23e66ebac8fdec4aa08c8926091b0dcf6dacf22"}},
{fast_yaml, ".*", {git, "https://github.com/processone/fast_yaml", {tag, "1.0.24"}}},
{yconf, ".*", {git, "https://github.com/processone/yconf", {tag, "1.0.4"}}},
{jiffy, ".*", {git, "https://github.com/davisp/jiffy", {tag, "1.0.1"}}},

View File

@ -43,7 +43,7 @@
process_closed/2, process_terminated/2, process_info/2]).
%% API
-export([get_presence/1, set_presence/2, resend_presence/1, resend_presence/2,
open_session/1, call/3, cast/2, send/2, close/1, close/2, stop/1,
open_session/1, call/3, cast/2, send/2, close/1, close/2, stop_async/1,
reply/2, copy_state/2, set_timeout/2, route/2, format_reason/2,
host_up/1, host_down/1, send_ws_ping/1, bounce_message_queue/2]).
@ -110,10 +110,9 @@ close(Ref) ->
close(Ref, Reason) ->
xmpp_stream_in:close(Ref, Reason).
-spec stop(pid()) -> ok;
(state()) -> no_return().
stop(Ref) ->
xmpp_stream_in:stop(Ref).
-spec stop_async(pid()) -> ok.
stop_async(Pid) ->
xmpp_stream_in:stop_async(Pid).
-spec send(pid(), xmpp_element()) -> ok;
(state(), xmpp_element()) -> state().
@ -285,7 +284,8 @@ process_auth_result(#{sasl_mech := Mech,
State.
process_closed(State, Reason) ->
stop(State#{stop_reason => Reason}).
stop_async(self()),
State#{stop_reason => Reason}.
process_terminated(#{sid := SID, socket := Socket,
jid := JID, user := U, server := S, resource := R} = State,

View File

@ -319,7 +319,7 @@ host_down(Host) ->
case ejabberd_router:host_of_route(From) of
Host ->
ejabberd_s2s_out:send(Pid, Err),
ejabberd_s2s_out:stop(Pid);
ejabberd_s2s_out:stop_async(Pid);
_ ->
ok
end;
@ -473,14 +473,14 @@ new_connection(MyServer, Server, From, FromTo,
if Pid1 == Pid ->
ejabberd_s2s_out:connect(Pid);
true ->
ejabberd_s2s_out:stop(Pid)
ejabberd_s2s_out:stop_async(Pid)
end,
[Pid1];
{aborted, Reason} ->
?ERROR_MSG("Failed to register s2s connection ~ts -> ~ts: "
"Mnesia failure: ~p",
[MyServer, Server, Reason]),
ejabberd_s2s_out:stop(Pid),
ejabberd_s2s_out:stop_async(Pid),
[]
end.
@ -553,13 +553,13 @@ stop_s2s_connections(Err) ->
lists:foreach(
fun({_Id, Pid, _Type, _Module}) ->
ejabberd_s2s_in:send(Pid, Err),
ejabberd_s2s_in:stop(Pid),
ejabberd_s2s_in:stop_async(Pid),
supervisor:terminate_child(ejabberd_s2s_in_sup, Pid)
end, supervisor:which_children(ejabberd_s2s_in_sup)),
lists:foreach(
fun({_Id, Pid, _Type, _Module}) ->
ejabberd_s2s_out:send(Pid, Err),
ejabberd_s2s_out:stop(Pid),
ejabberd_s2s_out:stop_async(Pid),
supervisor:terminate_child(ejabberd_s2s_out_sup, Pid)
end, supervisor:which_children(ejabberd_s2s_out_sup)),
_ = mnesia:clear_table(s2s),

View File

@ -38,7 +38,7 @@
-export([handle_unexpected_info/2, handle_unexpected_cast/2,
reject_unauthenticated_packet/2, process_closed/2]).
%% API
-export([stop/1, close/1, close/2, send/2, update_state/2, establish/1,
-export([stop_async/1, close/1, close/2, send/2, update_state/2, establish/1,
host_up/1, host_down/1]).
-include("xmpp.hrl").
@ -64,8 +64,9 @@ close(Ref) ->
close(Ref, Reason) ->
xmpp_stream_in:close(Ref, Reason).
stop(Ref) ->
xmpp_stream_in:stop(Ref).
-spec stop_async(pid()) -> ok.
stop_async(Pid) ->
xmpp_stream_in:stop_async(Pid).
accept(Ref) ->
xmpp_stream_in:accept(Ref).
@ -130,7 +131,8 @@ process_closed(#{server := LServer} = State, Reason) ->
end,
?INFO_MSG("Closing inbound s2s connection ~ts -> ~ts: ~ts",
[RServer, LServer, xmpp_stream_out:format_error(Reason)]),
stop(State).
stop_async(self()),
State.
%%%===================================================================
%%% xmpp_stream_in callbacks

View File

@ -36,7 +36,7 @@
-export([process_auth_result/2, process_closed/2, handle_unexpected_info/2,
handle_unexpected_cast/2, process_downgraded/2]).
%% API
-export([start/3, start_link/3, connect/1, close/1, close/2, stop/1, send/2,
-export([start/3, start_link/3, connect/1, close/1, close/2, stop_async/1, send/2,
route/2, establish/1, update_state/2, host_up/1, host_down/1]).
-include("xmpp.hrl").
@ -79,10 +79,9 @@ close(Ref) ->
close(Ref, Reason) ->
xmpp_stream_out:close(Ref, Reason).
-spec stop(pid()) -> ok;
(state()) -> no_return().
stop(Ref) ->
xmpp_stream_out:stop(Ref).
-spec stop_async(pid()) -> ok.
stop_async(Pid) ->
xmpp_stream_out:stop_async(Pid).
-spec send(pid(), xmpp_element()) -> ok;
(state(), xmpp_element()) -> state().
@ -150,7 +149,8 @@ process_closed(#{server := LServer, remote_server := RServer,
Reason) ->
?INFO_MSG("Closing outbound s2s connection ~ts -> ~ts: ~ts",
[LServer, RServer, format_error(Reason)]),
stop(State);
stop_async(self()),
State;
process_closed(#{server := LServer, remote_server := RServer} = State,
Reason) ->
Delay = get_delay(),
@ -248,7 +248,9 @@ handle_send(El, Pkt, #{server_host := ServerHost} = State) ->
handle_timeout(#{on_route := Action, lang := Lang} = State) ->
case Action of
bounce -> stop(State);
bounce ->
stop_async(self()),
State;
_ ->
Txt = ?T("Idle connection"),
send(State, xmpp:serr_connection_timeout(Txt, Lang))

View File

@ -33,7 +33,7 @@
-export([handle_stream_start/2, handle_auth_success/4, handle_auth_failure/4,
handle_authenticated_packet/2, get_password_fun/1, tls_options/1]).
%% API
-export([send/2, close/1, close/2, stop/1]).
-export([send/2, close/1, close/2, stop_async/1]).
-include("xmpp.hrl").
-include("logger.hrl").
@ -59,7 +59,7 @@ stop() ->
lists:foreach(
fun({_Id, Pid, _Type, _Module}) ->
send(Pid, Err),
stop(Pid),
stop_async(Pid),
supervisor:terminate_child(ejabberd_service_sup, Pid)
end, supervisor:which_children(ejabberd_service_sup)),
_ = supervisor:terminate_child(ejabberd_sup, ejabberd_service_sup),
@ -83,10 +83,9 @@ close(Ref) ->
close(Ref, Reason) ->
xmpp_stream_in:close(Ref, Reason).
-spec stop(pid()) -> ok;
(state()) -> no_return().
stop(Ref) ->
xmpp_stream_in:stop(Ref).
-spec stop_async(pid()) -> ok.
stop_async(Pid) ->
xmpp_stream_in:stop_async(Pid).
%%%===================================================================
%%% xmpp_stream_in callbacks

View File

@ -538,7 +538,7 @@ host_down(Host) ->
lists:foreach(
fun(#session{sid = {_, Pid}}) when node(Pid) == node() ->
ejabberd_c2s:send(Pid, Err),
ejabberd_c2s:stop(Pid);
ejabberd_c2s:stop_async(Pid);
(_) ->
ok
end, get_sessions(Mod, Host)),

View File

@ -265,7 +265,8 @@ s2s_out_packet(#{server := LServer,
ejabberd_s2s_in:update_state(
Pid, fun(S) -> send_db_result(S, Response) end),
%% At this point the connection is no longer needed and we can terminate it
ejabberd_s2s_out:stop(State);
ejabberd_s2s_out:stop_async(self()),
State;
s2s_out_packet(#{server := LServer, remote_server := RServer} = State,
#db_result{to = LServer, from = RServer,
type = Type} = Result) when Type /= undefined ->

View File

@ -217,7 +217,8 @@ c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod,
active ->
State;
pending ->
Mod:stop(State#{stop_reason => {stream, {out, Pkt}}})
Mod:stop_async(self()),
{stop, State#{stop_reason => {stream, {out, Pkt}}}}
end;
_ ->
State
@ -250,8 +251,9 @@ c2s_handle_info(#{mgmt_state := pending, lang := Lang,
[jid:encode(JID)]),
Txt = ?T("Timed out waiting for stream resumption"),
Err = xmpp:serr_connection_timeout(Txt, Lang),
Mod:stop(State#{mgmt_state => timeout,
stop_reason => {stream, {out, Err}}});
Mod:stop_async(self()),
{stop, State#{mgmt_state => timeout,
stop_reason => {stream, {out, Err}}}};
c2s_handle_info(State, {_Ref, {resume, #{jid := JID} = OldState}}) ->
%% This happens if the resume_session/1 request timed out; the new session
%% now receives the late response.
@ -444,7 +446,8 @@ handle_resume(#{user := User, lserver := LServer,
-spec transition_to_pending(state(), _) -> state().
transition_to_pending(#{mgmt_state := active, mod := Mod,
mgmt_timeout := 0} = State, _Reason) ->
Mod:stop(State);
Mod:stop_async(self()),
State;
transition_to_pending(#{mgmt_state := active, jid := JID, socket := Socket,
lserver := LServer, mgmt_timeout := Timeout} = State,
Reason) ->
@ -660,7 +663,7 @@ inherit_session_state(#{user := U, server := S,
mgmt_stanzas_out => NumStanzasOut,
mgmt_state => active},
State3 = ejabberd_c2s:open_session(State2),
ejabberd_c2s:stop(OldPID),
ejabberd_c2s:stop_async(OldPID),
{ok, State3};
{error, Msg} ->
{error, Msg}
@ -674,7 +677,7 @@ inherit_session_state(#{user := U, server := S,
{error, session_was_killed};
exit:{timeout, _} ->
ejabberd_sm:close_session(OldSID, U, S, R),
ejabberd_c2s:stop(OldPID),
ejabberd_c2s:stop_async(OldPID),
{error, session_copy_timed_out}
end
end;