mirror of
https://github.com/processone/ejabberd.git
synced 2024-11-22 16:20:52 +01:00
Merge pull request #1287 from weiss/ack-timeout
New stream management option: ack_timeout
This commit is contained in:
commit
6c943aa293
@ -113,9 +113,12 @@
|
|||||||
mgmt_pending_since,
|
mgmt_pending_since,
|
||||||
mgmt_timeout,
|
mgmt_timeout,
|
||||||
mgmt_max_timeout,
|
mgmt_max_timeout,
|
||||||
|
mgmt_ack_timeout,
|
||||||
|
mgmt_ack_timer,
|
||||||
mgmt_resend,
|
mgmt_resend,
|
||||||
mgmt_stanzas_in = 0,
|
mgmt_stanzas_in = 0,
|
||||||
mgmt_stanzas_out = 0,
|
mgmt_stanzas_out = 0,
|
||||||
|
mgmt_stanzas_req = 0,
|
||||||
ask_offline = true,
|
ask_offline = true,
|
||||||
lang = <<"">>}).
|
lang = <<"">>}).
|
||||||
|
|
||||||
@ -308,13 +311,18 @@ init([{SockMod, Socket}, Opts]) ->
|
|||||||
_ -> 1000
|
_ -> 1000
|
||||||
end,
|
end,
|
||||||
ResumeTimeout = case proplists:get_value(resume_timeout, Opts) of
|
ResumeTimeout = case proplists:get_value(resume_timeout, Opts) of
|
||||||
Timeout when is_integer(Timeout), Timeout >= 0 -> Timeout;
|
RTimeo when is_integer(RTimeo), RTimeo >= 0 -> RTimeo;
|
||||||
_ -> 300
|
_ -> 300
|
||||||
end,
|
end,
|
||||||
MaxResumeTimeout = case proplists:get_value(max_resume_timeout, Opts) of
|
MaxResumeTimeout = case proplists:get_value(max_resume_timeout, Opts) of
|
||||||
Max when is_integer(Max), Max >= ResumeTimeout -> Max;
|
Max when is_integer(Max), Max >= ResumeTimeout -> Max;
|
||||||
_ -> ResumeTimeout
|
_ -> ResumeTimeout
|
||||||
end,
|
end,
|
||||||
|
AckTimeout = case proplists:get_value(ack_timeout, Opts) of
|
||||||
|
ATimeo when is_integer(ATimeo), ATimeo > 0 -> ATimeo * 1000;
|
||||||
|
infinity -> undefined;
|
||||||
|
_ -> 60000
|
||||||
|
end,
|
||||||
ResendOnTimeout = case proplists:get_value(resend_on_timeout, Opts) of
|
ResendOnTimeout = case proplists:get_value(resend_on_timeout, Opts) of
|
||||||
Resend when is_boolean(Resend) -> Resend;
|
Resend when is_boolean(Resend) -> Resend;
|
||||||
if_offline -> if_offline;
|
if_offline -> if_offline;
|
||||||
@ -338,6 +346,7 @@ init([{SockMod, Socket}, Opts]) ->
|
|||||||
mgmt_max_queue = MaxAckQueue,
|
mgmt_max_queue = MaxAckQueue,
|
||||||
mgmt_timeout = ResumeTimeout,
|
mgmt_timeout = ResumeTimeout,
|
||||||
mgmt_max_timeout = MaxResumeTimeout,
|
mgmt_max_timeout = MaxResumeTimeout,
|
||||||
|
mgmt_ack_timeout = AckTimeout,
|
||||||
mgmt_resend = ResendOnTimeout},
|
mgmt_resend = ResendOnTimeout},
|
||||||
{ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT}.
|
{ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT}.
|
||||||
|
|
||||||
@ -1759,6 +1768,11 @@ handle_info({broadcast, Type, From, Packet}, StateName, StateData) ->
|
|||||||
fsm_next_state(StateName, StateData);
|
fsm_next_state(StateName, StateData);
|
||||||
handle_info(dont_ask_offline, StateName, StateData) ->
|
handle_info(dont_ask_offline, StateName, StateData) ->
|
||||||
fsm_next_state(StateName, StateData#state{ask_offline = false});
|
fsm_next_state(StateName, StateData#state{ask_offline = false});
|
||||||
|
handle_info(close, StateName, StateData) ->
|
||||||
|
?DEBUG("Timeout waiting for stream management acknowledgement of ~s",
|
||||||
|
[jid:to_string(StateData#state.jid)]),
|
||||||
|
close(self()),
|
||||||
|
fsm_next_state(StateName, StateData);
|
||||||
handle_info({_Ref, {resume, OldStateData}}, StateName, StateData) ->
|
handle_info({_Ref, {resume, OldStateData}}, StateName, StateData) ->
|
||||||
%% This happens if the resume_session/1 request timed out; the new session
|
%% This happens if the resume_session/1 request timed out; the new session
|
||||||
%% now receives the late response.
|
%% now receives the late response.
|
||||||
@ -1894,8 +1908,8 @@ send_stanza(StateData, Stanza) when StateData#state.csi_state == inactive ->
|
|||||||
send_stanza(StateData, Stanza) when StateData#state.mgmt_state == pending ->
|
send_stanza(StateData, Stanza) when StateData#state.mgmt_state == pending ->
|
||||||
mgmt_queue_add(StateData, Stanza);
|
mgmt_queue_add(StateData, Stanza);
|
||||||
send_stanza(StateData, Stanza) when StateData#state.mgmt_state == active ->
|
send_stanza(StateData, Stanza) when StateData#state.mgmt_state == active ->
|
||||||
NewStateData = send_stanza_and_ack_req(StateData, Stanza),
|
NewStateData = mgmt_queue_add(StateData, Stanza),
|
||||||
mgmt_queue_add(NewStateData, Stanza);
|
mgmt_send_stanza(NewStateData, Stanza);
|
||||||
send_stanza(StateData, Stanza) ->
|
send_stanza(StateData, Stanza) ->
|
||||||
send_element(StateData, Stanza),
|
send_element(StateData, Stanza),
|
||||||
StateData.
|
StateData.
|
||||||
@ -2757,7 +2771,8 @@ handle_r(StateData) ->
|
|||||||
handle_a(StateData, Attrs) ->
|
handle_a(StateData, Attrs) ->
|
||||||
case catch jlib:binary_to_integer(fxml:get_attr_s(<<"h">>, Attrs)) of
|
case catch jlib:binary_to_integer(fxml:get_attr_s(<<"h">>, Attrs)) of
|
||||||
H when is_integer(H), H >= 0 ->
|
H when is_integer(H), H >= 0 ->
|
||||||
check_h_attribute(StateData, H);
|
NewStateData = check_h_attribute(StateData, H),
|
||||||
|
maybe_renew_ack_request(NewStateData);
|
||||||
_ ->
|
_ ->
|
||||||
?DEBUG("Ignoring invalid ACK element from ~s",
|
?DEBUG("Ignoring invalid ACK element from ~s",
|
||||||
[jid:to_string(StateData#state.jid)]),
|
[jid:to_string(StateData#state.jid)]),
|
||||||
@ -2850,18 +2865,47 @@ update_num_stanzas_in(#state{mgmt_state = MgmtState} = StateData, El)
|
|||||||
update_num_stanzas_in(StateData, _El) ->
|
update_num_stanzas_in(StateData, _El) ->
|
||||||
StateData.
|
StateData.
|
||||||
|
|
||||||
send_stanza_and_ack_req(StateData, Stanza) ->
|
mgmt_send_stanza(StateData, Stanza) ->
|
||||||
AckReq = #xmlel{name = <<"r">>,
|
case send_element(StateData, Stanza) of
|
||||||
attrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}],
|
ok ->
|
||||||
children = []},
|
maybe_request_ack(StateData);
|
||||||
case send_element(StateData, Stanza) == ok andalso
|
_ ->
|
||||||
send_element(StateData, AckReq) == ok of
|
|
||||||
true ->
|
|
||||||
StateData;
|
|
||||||
false ->
|
|
||||||
StateData#state{mgmt_state = pending}
|
StateData#state{mgmt_state = pending}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
maybe_request_ack(#state{mgmt_ack_timer = undefined} = StateData) ->
|
||||||
|
request_ack(StateData);
|
||||||
|
maybe_request_ack(StateData) ->
|
||||||
|
StateData.
|
||||||
|
|
||||||
|
request_ack(#state{mgmt_xmlns = Xmlns,
|
||||||
|
mgmt_ack_timeout = AckTimeout} = StateData) ->
|
||||||
|
AckReq = #xmlel{name = <<"r">>, attrs = [{<<"xmlns">>, Xmlns}]},
|
||||||
|
case {send_element(StateData, AckReq), AckTimeout} of
|
||||||
|
{ok, undefined} ->
|
||||||
|
ok;
|
||||||
|
{ok, Timeout} ->
|
||||||
|
Timer = erlang:send_after(Timeout, self(), close),
|
||||||
|
StateData#state{mgmt_ack_timer = Timer,
|
||||||
|
mgmt_stanzas_req = StateData#state.mgmt_stanzas_out};
|
||||||
|
_ ->
|
||||||
|
StateData#state{mgmt_state = pending}
|
||||||
|
end.
|
||||||
|
|
||||||
|
maybe_renew_ack_request(#state{mgmt_ack_timer = undefined} = StateData) ->
|
||||||
|
StateData;
|
||||||
|
maybe_renew_ack_request(#state{mgmt_ack_timer = Timer,
|
||||||
|
mgmt_queue = Queue,
|
||||||
|
mgmt_stanzas_out = NumStanzasOut,
|
||||||
|
mgmt_stanzas_req = NumStanzasReq} = StateData) ->
|
||||||
|
erlang:cancel_timer(Timer),
|
||||||
|
case NumStanzasReq < NumStanzasOut andalso not queue:is_empty(Queue) of
|
||||||
|
true ->
|
||||||
|
request_ack(StateData#state{mgmt_ack_timer = undefined});
|
||||||
|
false ->
|
||||||
|
StateData#state{mgmt_ack_timer = undefined}
|
||||||
|
end.
|
||||||
|
|
||||||
mgmt_queue_add(StateData, El) ->
|
mgmt_queue_add(StateData, El) ->
|
||||||
NewNum = case StateData#state.mgmt_stanzas_out of
|
NewNum = case StateData#state.mgmt_stanzas_out of
|
||||||
4294967295 ->
|
4294967295 ->
|
||||||
|
@ -340,6 +340,7 @@ init([Sid, Key, IP, HOpts]) ->
|
|||||||
Opts1 = ejabberd_c2s_config:get_c2s_limits(),
|
Opts1 = ejabberd_c2s_config:get_c2s_limits(),
|
||||||
SOpts = lists:filtermap(fun({stream_management, _}) -> true;
|
SOpts = lists:filtermap(fun({stream_management, _}) -> true;
|
||||||
({max_ack_queue, _}) -> true;
|
({max_ack_queue, _}) -> true;
|
||||||
|
({ack_timeout, _}) -> true;
|
||||||
({resume_timeout, _}) -> true;
|
({resume_timeout, _}) -> true;
|
||||||
({max_resume_timeout, _}) -> true;
|
({max_resume_timeout, _}) -> true;
|
||||||
({resend_on_timeout, _}) -> true;
|
({resend_on_timeout, _}) -> true;
|
||||||
|
@ -114,6 +114,7 @@ socket_handoff(LocalPath, Request, Socket, SockMod, Buf, Opts) ->
|
|||||||
init([{#ws{ip = IP, http_opts = HOpts}, _} = WS]) ->
|
init([{#ws{ip = IP, http_opts = HOpts}, _} = WS]) ->
|
||||||
SOpts = lists:filtermap(fun({stream_management, _}) -> true;
|
SOpts = lists:filtermap(fun({stream_management, _}) -> true;
|
||||||
({max_ack_queue, _}) -> true;
|
({max_ack_queue, _}) -> true;
|
||||||
|
({ack_timeout, _}) -> true;
|
||||||
({resume_timeout, _}) -> true;
|
({resume_timeout, _}) -> true;
|
||||||
({max_resume_timeout, _}) -> true;
|
({max_resume_timeout, _}) -> true;
|
||||||
({resend_on_timeout, _}) -> true;
|
({resend_on_timeout, _}) -> true;
|
||||||
|
Loading…
Reference in New Issue
Block a user