From 621f0e2b7cbb6c431e2c26ad6561c6c0eb74c065 Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Wed, 7 Sep 2016 23:16:54 +0200 Subject: [PATCH] New stream management option: ack_timeout Close the connection if a stream management client fails to respond to an acknowledgement request within 60 seconds. This number of seconds can be changed with the new "ack_timeout" option, and the mechanism can be disabled by specifying 'infinity'. As a side effect of this change, a new acknowledgement is no longer requested before the response to the previous request is received. --- src/ejabberd_c2s.erl | 70 +++++++++++++++++++++++++++++++------- src/ejabberd_http_bind.erl | 1 + src/ejabberd_http_ws.erl | 1 + 3 files changed, 59 insertions(+), 13 deletions(-) diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index cf7602441..09df739b4 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -113,9 +113,12 @@ mgmt_pending_since, mgmt_timeout, mgmt_max_timeout, + mgmt_ack_timeout, + mgmt_ack_timer, mgmt_resend, mgmt_stanzas_in = 0, mgmt_stanzas_out = 0, + mgmt_stanzas_req = 0, ask_offline = true, lang = <<"">>}). @@ -308,13 +311,18 @@ init([{SockMod, Socket}, Opts]) -> _ -> 1000 end, ResumeTimeout = case proplists:get_value(resume_timeout, Opts) of - Timeout when is_integer(Timeout), Timeout >= 0 -> Timeout; + RTimeo when is_integer(RTimeo), RTimeo >= 0 -> RTimeo; _ -> 300 end, MaxResumeTimeout = case proplists:get_value(max_resume_timeout, Opts) of Max when is_integer(Max), Max >= ResumeTimeout -> Max; _ -> ResumeTimeout end, + AckTimeout = case proplists:get_value(ack_timeout, Opts) of + ATimeo when is_integer(ATimeo), ATimeo > 0 -> ATimeo * 1000; + infinity -> undefined; + _ -> 60000 + end, ResendOnTimeout = case proplists:get_value(resend_on_timeout, Opts) of Resend when is_boolean(Resend) -> Resend; if_offline -> if_offline; @@ -338,6 +346,7 @@ init([{SockMod, Socket}, Opts]) -> mgmt_max_queue = MaxAckQueue, mgmt_timeout = ResumeTimeout, mgmt_max_timeout = MaxResumeTimeout, + mgmt_ack_timeout = AckTimeout, mgmt_resend = ResendOnTimeout}, {ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT}. @@ -1759,6 +1768,11 @@ handle_info({broadcast, Type, From, Packet}, StateName, StateData) -> fsm_next_state(StateName, StateData); handle_info(dont_ask_offline, StateName, StateData) -> fsm_next_state(StateName, StateData#state{ask_offline = false}); +handle_info(close, StateName, StateData) -> + ?DEBUG("Timeout waiting for stream management acknowledgement of ~s", + [jid:to_string(StateData#state.jid)]), + close(self()), + fsm_next_state(StateName, StateData); handle_info({_Ref, {resume, OldStateData}}, StateName, StateData) -> %% This happens if the resume_session/1 request timed out; the new session %% now receives the late response. @@ -1894,8 +1908,8 @@ send_stanza(StateData, Stanza) when StateData#state.csi_state == inactive -> send_stanza(StateData, Stanza) when StateData#state.mgmt_state == pending -> mgmt_queue_add(StateData, Stanza); send_stanza(StateData, Stanza) when StateData#state.mgmt_state == active -> - NewStateData = send_stanza_and_ack_req(StateData, Stanza), - mgmt_queue_add(NewStateData, Stanza); + NewStateData = mgmt_queue_add(StateData, Stanza), + mgmt_send_stanza(NewStateData, Stanza); send_stanza(StateData, Stanza) -> send_element(StateData, Stanza), StateData. @@ -2757,7 +2771,8 @@ handle_r(StateData) -> handle_a(StateData, Attrs) -> case catch jlib:binary_to_integer(fxml:get_attr_s(<<"h">>, Attrs)) of H when is_integer(H), H >= 0 -> - check_h_attribute(StateData, H); + NewStateData = check_h_attribute(StateData, H), + maybe_renew_ack_request(NewStateData); _ -> ?DEBUG("Ignoring invalid ACK element from ~s", [jid:to_string(StateData#state.jid)]), @@ -2850,18 +2865,47 @@ update_num_stanzas_in(#state{mgmt_state = MgmtState} = StateData, El) update_num_stanzas_in(StateData, _El) -> StateData. -send_stanza_and_ack_req(StateData, Stanza) -> - AckReq = #xmlel{name = <<"r">>, - attrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}], - children = []}, - case send_element(StateData, Stanza) == ok andalso - send_element(StateData, AckReq) == ok of - true -> - StateData; - false -> +mgmt_send_stanza(StateData, Stanza) -> + case send_element(StateData, Stanza) of + ok -> + maybe_request_ack(StateData); + _ -> StateData#state{mgmt_state = pending} end. +maybe_request_ack(#state{mgmt_ack_timer = undefined} = StateData) -> + request_ack(StateData); +maybe_request_ack(StateData) -> + StateData. + +request_ack(#state{mgmt_xmlns = Xmlns, + mgmt_ack_timeout = AckTimeout} = StateData) -> + AckReq = #xmlel{name = <<"r">>, attrs = [{<<"xmlns">>, Xmlns}]}, + case {send_element(StateData, AckReq), AckTimeout} of + {ok, undefined} -> + ok; + {ok, Timeout} -> + Timer = erlang:send_after(Timeout, self(), close), + StateData#state{mgmt_ack_timer = Timer, + mgmt_stanzas_req = StateData#state.mgmt_stanzas_out}; + _ -> + StateData#state{mgmt_state = pending} + end. + +maybe_renew_ack_request(#state{mgmt_ack_timer = undefined} = StateData) -> + StateData; +maybe_renew_ack_request(#state{mgmt_ack_timer = Timer, + mgmt_queue = Queue, + mgmt_stanzas_out = NumStanzasOut, + mgmt_stanzas_req = NumStanzasReq} = StateData) -> + erlang:cancel_timer(Timer), + case NumStanzasReq < NumStanzasOut andalso not queue:is_empty(Queue) of + true -> + request_ack(StateData#state{mgmt_ack_timer = undefined}); + false -> + StateData#state{mgmt_ack_timer = undefined} + end. + mgmt_queue_add(StateData, El) -> NewNum = case StateData#state.mgmt_stanzas_out of 4294967295 -> diff --git a/src/ejabberd_http_bind.erl b/src/ejabberd_http_bind.erl index 758c1cee5..628119e6f 100644 --- a/src/ejabberd_http_bind.erl +++ b/src/ejabberd_http_bind.erl @@ -340,6 +340,7 @@ init([Sid, Key, IP, HOpts]) -> Opts1 = ejabberd_c2s_config:get_c2s_limits(), SOpts = lists:filtermap(fun({stream_management, _}) -> true; ({max_ack_queue, _}) -> true; + ({ack_timeout, _}) -> true; ({resume_timeout, _}) -> true; ({max_resume_timeout, _}) -> true; ({resend_on_timeout, _}) -> true; diff --git a/src/ejabberd_http_ws.erl b/src/ejabberd_http_ws.erl index 24554a8cc..e76e8689a 100644 --- a/src/ejabberd_http_ws.erl +++ b/src/ejabberd_http_ws.erl @@ -114,6 +114,7 @@ socket_handoff(LocalPath, Request, Socket, SockMod, Buf, Opts) -> init([{#ws{ip = IP, http_opts = HOpts}, _} = WS]) -> SOpts = lists:filtermap(fun({stream_management, _}) -> true; ({max_ack_queue, _}) -> true; + ({ack_timeout, _}) -> true; ({resume_timeout, _}) -> true; ({max_resume_timeout, _}) -> true; ({resend_on_timeout, _}) -> true;