From 1bd560f3f25d0a644bac3d06904ca97e20a6f7d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chmielowski?= Date: Wed, 1 Apr 2020 14:35:49 +0200 Subject: [PATCH] Fix potential message loss in terminating c2s sessions Calling the sync version of xmpp_stream_in/out:stop could lead to messages never being processed by the c2s process if they were queued in p1_server. This could be reproduced by having messages in offline storage, starting sessions, enabling stream_mgmt, sending initial presence, and then immediately terminating them: the messages that mod_offline would send to the c2s process would not be bounced back by stream_mgmt. --- rebar.config | 2 +- src/ejabberd_c2s.erl | 12 ++++++------ src/ejabberd_s2s.erl | 10 +++++----- src/ejabberd_s2s_in.erl | 10 ++++++---- src/ejabberd_s2s_out.erl | 16 +++++++++------- src/ejabberd_service.erl | 11 +++++------ src/ejabberd_sm.erl | 2 +- src/mod_s2s_dialback.erl | 3 ++- src/mod_stream_mgmt.erl | 15 +++++++++------ 9 files changed, 44 insertions(+), 37 deletions(-) diff --git a/rebar.config b/rebar.config index 148622b04..9ab3b6058 100644 --- a/rebar.config +++ b/rebar.config @@ -25,7 +25,7 @@ {stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.19"}}}, {fast_xml, ".*", {git, "https://github.com/processone/fast_xml", {tag, "1.1.39"}}}, {idna, ".*", {git, "https://github.com/benoitc/erlang-idna", {tag, "6.0.0"}}}, - {xmpp, ".*", {git, "https://github.com/processone/xmpp", {tag, "1.4.5"}}}, + {xmpp, ".*", {git, "https://github.com/processone/xmpp", "c23e66ebac8fdec4aa08c8926091b0dcf6dacf22"}}, {fast_yaml, ".*", {git, "https://github.com/processone/fast_yaml", {tag, "1.0.24"}}}, {yconf, ".*", {git, "https://github.com/processone/yconf", {tag, "1.0.4"}}}, {jiffy, ".*", {git, "https://github.com/davisp/jiffy", {tag, "1.0.1"}}}, diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 8f069bcbe..f533fbed3 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -43,7 +43,7 @@ process_closed/2, process_terminated/2, 
process_info/2]). %% API -export([get_presence/1, set_presence/2, resend_presence/1, resend_presence/2, - open_session/1, call/3, cast/2, send/2, close/1, close/2, stop/1, + open_session/1, call/3, cast/2, send/2, close/1, close/2, stop_async/1, reply/2, copy_state/2, set_timeout/2, route/2, format_reason/2, host_up/1, host_down/1, send_ws_ping/1, bounce_message_queue/2]). @@ -110,10 +110,9 @@ close(Ref) -> close(Ref, Reason) -> xmpp_stream_in:close(Ref, Reason). --spec stop(pid()) -> ok; - (state()) -> no_return(). -stop(Ref) -> - xmpp_stream_in:stop(Ref). +-spec stop_async(pid()) -> ok. +stop_async(Pid) -> + xmpp_stream_in:stop_async(Pid). -spec send(pid(), xmpp_element()) -> ok; (state(), xmpp_element()) -> state(). @@ -285,7 +284,8 @@ process_auth_result(#{sasl_mech := Mech, State. process_closed(State, Reason) -> - stop(State#{stop_reason => Reason}). + stop_async(self()), + State#{stop_reason => Reason}. process_terminated(#{sid := SID, socket := Socket, jid := JID, user := U, server := S, resource := R} = State, diff --git a/src/ejabberd_s2s.erl b/src/ejabberd_s2s.erl index afe941f9a..3d2302f95 100644 --- a/src/ejabberd_s2s.erl +++ b/src/ejabberd_s2s.erl @@ -319,7 +319,7 @@ host_down(Host) -> case ejabberd_router:host_of_route(From) of Host -> ejabberd_s2s_out:send(Pid, Err), - ejabberd_s2s_out:stop(Pid); + ejabberd_s2s_out:stop_async(Pid); _ -> ok end; @@ -473,14 +473,14 @@ new_connection(MyServer, Server, From, FromTo, if Pid1 == Pid -> ejabberd_s2s_out:connect(Pid); true -> - ejabberd_s2s_out:stop(Pid) + ejabberd_s2s_out:stop_async(Pid) end, [Pid1]; {aborted, Reason} -> ?ERROR_MSG("Failed to register s2s connection ~ts -> ~ts: " "Mnesia failure: ~p", [MyServer, Server, Reason]), - ejabberd_s2s_out:stop(Pid), + ejabberd_s2s_out:stop_async(Pid), [] end. 
@@ -553,13 +553,13 @@ stop_s2s_connections(Err) -> lists:foreach( fun({_Id, Pid, _Type, _Module}) -> ejabberd_s2s_in:send(Pid, Err), - ejabberd_s2s_in:stop(Pid), + ejabberd_s2s_in:stop_async(Pid), supervisor:terminate_child(ejabberd_s2s_in_sup, Pid) end, supervisor:which_children(ejabberd_s2s_in_sup)), lists:foreach( fun({_Id, Pid, _Type, _Module}) -> ejabberd_s2s_out:send(Pid, Err), - ejabberd_s2s_out:stop(Pid), + ejabberd_s2s_out:stop_async(Pid), supervisor:terminate_child(ejabberd_s2s_out_sup, Pid) end, supervisor:which_children(ejabberd_s2s_out_sup)), _ = mnesia:clear_table(s2s), diff --git a/src/ejabberd_s2s_in.erl b/src/ejabberd_s2s_in.erl index 2c838f76e..187869cfb 100644 --- a/src/ejabberd_s2s_in.erl +++ b/src/ejabberd_s2s_in.erl @@ -38,7 +38,7 @@ -export([handle_unexpected_info/2, handle_unexpected_cast/2, reject_unauthenticated_packet/2, process_closed/2]). %% API --export([stop/1, close/1, close/2, send/2, update_state/2, establish/1, +-export([stop_async/1, close/1, close/2, send/2, update_state/2, establish/1, host_up/1, host_down/1]). -include("xmpp.hrl"). @@ -64,8 +64,9 @@ close(Ref) -> close(Ref, Reason) -> xmpp_stream_in:close(Ref, Reason). -stop(Ref) -> - xmpp_stream_in:stop(Ref). +-spec stop_async(pid()) -> ok. +stop_async(Pid) -> + xmpp_stream_in:stop_async(Pid). accept(Ref) -> xmpp_stream_in:accept(Ref). @@ -130,7 +131,8 @@ process_closed(#{server := LServer} = State, Reason) -> end, ?INFO_MSG("Closing inbound s2s connection ~ts -> ~ts: ~ts", [RServer, LServer, xmpp_stream_out:format_error(Reason)]), - stop(State). + stop_async(self()), + State. %%%=================================================================== %%% xmpp_stream_in callbacks diff --git a/src/ejabberd_s2s_out.erl b/src/ejabberd_s2s_out.erl index 7bbaf870c..0b44eb132 100644 --- a/src/ejabberd_s2s_out.erl +++ b/src/ejabberd_s2s_out.erl @@ -36,7 +36,7 @@ -export([process_auth_result/2, process_closed/2, handle_unexpected_info/2, handle_unexpected_cast/2, process_downgraded/2]). 
%% API --export([start/3, start_link/3, connect/1, close/1, close/2, stop/1, send/2, +-export([start/3, start_link/3, connect/1, close/1, close/2, stop_async/1, send/2, route/2, establish/1, update_state/2, host_up/1, host_down/1]). -include("xmpp.hrl"). @@ -79,10 +79,9 @@ close(Ref) -> close(Ref, Reason) -> xmpp_stream_out:close(Ref, Reason). --spec stop(pid()) -> ok; - (state()) -> no_return(). -stop(Ref) -> - xmpp_stream_out:stop(Ref). +-spec stop_async(pid()) -> ok. +stop_async(Pid) -> + xmpp_stream_out:stop_async(Pid). -spec send(pid(), xmpp_element()) -> ok; (state(), xmpp_element()) -> state(). @@ -150,7 +149,8 @@ process_closed(#{server := LServer, remote_server := RServer, Reason) -> ?INFO_MSG("Closing outbound s2s connection ~ts -> ~ts: ~ts", [LServer, RServer, format_error(Reason)]), - stop(State); + stop_async(self()), + State; process_closed(#{server := LServer, remote_server := RServer} = State, Reason) -> Delay = get_delay(), @@ -248,7 +248,9 @@ handle_send(El, Pkt, #{server_host := ServerHost} = State) -> handle_timeout(#{on_route := Action, lang := Lang} = State) -> case Action of - bounce -> stop(State); + bounce -> + stop_async(self()), + State; _ -> Txt = ?T("Idle connection"), send(State, xmpp:serr_connection_timeout(Txt, Lang)) diff --git a/src/ejabberd_service.erl b/src/ejabberd_service.erl index 92350956d..a5270b54d 100644 --- a/src/ejabberd_service.erl +++ b/src/ejabberd_service.erl @@ -33,7 +33,7 @@ -export([handle_stream_start/2, handle_auth_success/4, handle_auth_failure/4, handle_authenticated_packet/2, get_password_fun/1, tls_options/1]). %% API --export([send/2, close/1, close/2, stop/1]). +-export([send/2, close/1, close/2, stop_async/1]). -include("xmpp.hrl"). -include("logger.hrl"). 
@@ -59,7 +59,7 @@ stop() -> lists:foreach( fun({_Id, Pid, _Type, _Module}) -> send(Pid, Err), - stop(Pid), + stop_async(Pid), supervisor:terminate_child(ejabberd_service_sup, Pid) end, supervisor:which_children(ejabberd_service_sup)), _ = supervisor:terminate_child(ejabberd_sup, ejabberd_service_sup), @@ -83,10 +83,9 @@ close(Ref) -> close(Ref, Reason) -> xmpp_stream_in:close(Ref, Reason). --spec stop(pid()) -> ok; - (state()) -> no_return(). -stop(Ref) -> - xmpp_stream_in:stop(Ref). +-spec stop_async(pid()) -> ok. +stop_async(Pid) -> + xmpp_stream_in:stop_async(Pid). %%%=================================================================== %%% xmpp_stream_in callbacks diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index 6202614db..79839b4aa 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -538,7 +538,7 @@ host_down(Host) -> lists:foreach( fun(#session{sid = {_, Pid}}) when node(Pid) == node() -> ejabberd_c2s:send(Pid, Err), - ejabberd_c2s:stop(Pid); + ejabberd_c2s:stop_async(Pid); (_) -> ok end, get_sessions(Mod, Host)), diff --git a/src/mod_s2s_dialback.erl b/src/mod_s2s_dialback.erl index bf241bf52..91e2554ad 100644 --- a/src/mod_s2s_dialback.erl +++ b/src/mod_s2s_dialback.erl @@ -265,7 +265,8 @@ s2s_out_packet(#{server := LServer, ejabberd_s2s_in:update_state( Pid, fun(S) -> send_db_result(S, Response) end), %% At this point the connection is no longer needed and we can terminate it - ejabberd_s2s_out:stop(State); + ejabberd_s2s_out:stop_async(self()), + State; s2s_out_packet(#{server := LServer, remote_server := RServer} = State, #db_result{to = LServer, from = RServer, type = Type} = Result) when Type /= undefined -> diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl index 4c9ff3c8b..fb1a609b1 100644 --- a/src/mod_stream_mgmt.erl +++ b/src/mod_stream_mgmt.erl @@ -217,7 +217,8 @@ c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod, active -> State; pending -> - Mod:stop(State#{stop_reason => {stream, {out, Pkt}}}) + 
Mod:stop_async(self()), + {stop, State#{stop_reason => {stream, {out, Pkt}}}} end; _ -> State @@ -250,8 +251,9 @@ c2s_handle_info(#{mgmt_state := pending, lang := Lang, [jid:encode(JID)]), Txt = ?T("Timed out waiting for stream resumption"), Err = xmpp:serr_connection_timeout(Txt, Lang), - Mod:stop(State#{mgmt_state => timeout, - stop_reason => {stream, {out, Err}}}); + Mod:stop_async(self()), + {stop, State#{mgmt_state => timeout, + stop_reason => {stream, {out, Err}}}}; c2s_handle_info(State, {_Ref, {resume, #{jid := JID} = OldState}}) -> %% This happens if the resume_session/1 request timed out; the new session %% now receives the late response. @@ -444,7 +446,8 @@ handle_resume(#{user := User, lserver := LServer, -spec transition_to_pending(state(), _) -> state(). transition_to_pending(#{mgmt_state := active, mod := Mod, mgmt_timeout := 0} = State, _Reason) -> - Mod:stop(State); + Mod:stop_async(self()), + State; transition_to_pending(#{mgmt_state := active, jid := JID, socket := Socket, lserver := LServer, mgmt_timeout := Timeout} = State, Reason) -> @@ -660,7 +663,7 @@ inherit_session_state(#{user := U, server := S, mgmt_stanzas_out => NumStanzasOut, mgmt_state => active}, State3 = ejabberd_c2s:open_session(State2), - ejabberd_c2s:stop(OldPID), + ejabberd_c2s:stop_async(OldPID), {ok, State3}; {error, Msg} -> {error, Msg} @@ -674,7 +677,7 @@ inherit_session_state(#{user := U, server := S, {error, session_was_killed}; exit:{timeout, _} -> ejabberd_sm:close_session(OldSID, U, S, R), - ejabberd_c2s:stop(OldPID), + ejabberd_c2s:stop_async(OldPID), {error, session_copy_timed_out} end end;