mirror of
https://github.com/processone/ejabberd.git
synced 2024-06-18 22:15:20 +02:00
XEP-0198: Use "mgmt_" prefix for all #state fields
Prefix all ejabberd_c2s #state fields that are used for stream management with "mgmt_".
This commit is contained in:
parent
3b3f3b9131
commit
8b1f92575a
|
@ -109,15 +109,15 @@
|
|||
auth_module = unknown,
|
||||
ip,
|
||||
aux_fields = [],
|
||||
sm_state,
|
||||
sm_xmlns,
|
||||
ack_queue,
|
||||
max_ack_queue,
|
||||
pending_since,
|
||||
resume_timeout,
|
||||
resend_on_timeout,
|
||||
n_stanzas_in = 0,
|
||||
n_stanzas_out = 0,
|
||||
mgmt_state,
|
||||
mgmt_xmlns,
|
||||
mgmt_queue,
|
||||
mgmt_max_queue,
|
||||
mgmt_pending_since,
|
||||
mgmt_timeout,
|
||||
mgmt_resend,
|
||||
mgmt_stanzas_in = 0,
|
||||
mgmt_stanzas_out = 0,
|
||||
lang}).
|
||||
|
||||
%-define(DBGFSM, true).
|
||||
|
@ -174,31 +174,31 @@
|
|||
Name == <<"a">>;
|
||||
Name == <<"r">>).
|
||||
|
||||
-define(IS_SUPPORTED_SM_XMLNS(Xmlns),
|
||||
-define(IS_SUPPORTED_MGMT_XMLNS(Xmlns),
|
||||
Xmlns == ?NS_STREAM_MGMT_2;
|
||||
Xmlns == ?NS_STREAM_MGMT_3).
|
||||
|
||||
-define(SM_FAILED(Condition, Xmlns),
|
||||
-define(MGMT_FAILED(Condition, Xmlns),
|
||||
#xmlel{name = <<"failed">>,
|
||||
attrs = [{<<"xmlns">>, Xmlns}],
|
||||
children = [#xmlel{name = Condition,
|
||||
attrs = [{<<"xmlns">>, ?NS_STANZAS}],
|
||||
children = []}]}).
|
||||
|
||||
-define(SM_BAD_REQUEST(Xmlns),
|
||||
?SM_FAILED(<<"bad-request">>, Xmlns)).
|
||||
-define(MGMT_BAD_REQUEST(Xmlns),
|
||||
?MGMT_FAILED(<<"bad-request">>, Xmlns)).
|
||||
|
||||
-define(SM_ITEM_NOT_FOUND(Xmlns),
|
||||
?SM_FAILED(<<"item-not-found">>, Xmlns)).
|
||||
-define(MGMT_ITEM_NOT_FOUND(Xmlns),
|
||||
?MGMT_FAILED(<<"item-not-found">>, Xmlns)).
|
||||
|
||||
-define(SM_SERVICE_UNAVAILABLE(Xmlns),
|
||||
?SM_FAILED(<<"service-unavailable">>, Xmlns)).
|
||||
-define(MGMT_SERVICE_UNAVAILABLE(Xmlns),
|
||||
?MGMT_FAILED(<<"service-unavailable">>, Xmlns)).
|
||||
|
||||
-define(SM_UNEXPECTED_REQUEST(Xmlns),
|
||||
?SM_FAILED(<<"unexpected-request">>, Xmlns)).
|
||||
-define(MGMT_UNEXPECTED_REQUEST(Xmlns),
|
||||
?MGMT_FAILED(<<"unexpected-request">>, Xmlns)).
|
||||
|
||||
-define(SM_UNSUPPORTED_VERSION(Xmlns),
|
||||
?SM_FAILED(<<"unsupported-version">>, Xmlns)).
|
||||
-define(MGMT_UNSUPPORTED_VERSION(Xmlns),
|
||||
?MGMT_FAILED(<<"unsupported-version">>, Xmlns)).
|
||||
|
||||
%%%----------------------------------------------------------------------
|
||||
%%% API
|
||||
|
@ -328,10 +328,10 @@ init([{SockMod, Socket}, Opts]) ->
|
|||
tls_enabled = TLSEnabled, tls_options = TLSOpts,
|
||||
sid = {now(), self()}, streamid = new_id(),
|
||||
access = Access, shaper = Shaper, ip = IP,
|
||||
sm_state = StreamMgmtState,
|
||||
max_ack_queue = MaxAckQueue,
|
||||
resume_timeout = ResumeTimeout,
|
||||
resend_on_timeout = ResendOnTimeout},
|
||||
mgmt_state = StreamMgmtState,
|
||||
mgmt_max_queue = MaxAckQueue,
|
||||
mgmt_timeout = ResumeTimeout,
|
||||
mgmt_resend = ResendOnTimeout},
|
||||
{ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT}
|
||||
end.
|
||||
|
||||
|
@ -695,9 +695,11 @@ wait_for_auth({xmlstreamerror, _}, StateData) ->
|
|||
wait_for_auth(closed, StateData) ->
|
||||
{stop, normal, StateData}.
|
||||
|
||||
wait_for_feature_request({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
|
||||
wait_for_feature_request({xmlstreamelement, #xmlel{name = Name} = El},
|
||||
StateData)
|
||||
when ?IS_STREAM_MGMT_TAG(Name) ->
|
||||
fsm_next_state(wait_for_feature_request, dispatch_stream_mgmt(El, StateData));
|
||||
fsm_next_state(wait_for_feature_request,
|
||||
dispatch_stream_mgmt(El, StateData));
|
||||
wait_for_feature_request({xmlstreamelement, El},
|
||||
StateData) ->
|
||||
#xmlel{name = Name, attrs = Attrs, children = Els} = El,
|
||||
|
@ -1176,11 +1178,11 @@ session_established({xmlstreamerror, _}, StateData) ->
|
|||
send_trailer(StateData),
|
||||
{stop, normal, StateData};
|
||||
session_established(closed, StateData)
|
||||
when StateData#state.resume_timeout > 0,
|
||||
StateData#state.sm_state == active orelse
|
||||
StateData#state.sm_state == pending ->
|
||||
when StateData#state.mgmt_timeout > 0,
|
||||
StateData#state.mgmt_state == active orelse
|
||||
StateData#state.mgmt_state == pending ->
|
||||
log_pending_state(StateData),
|
||||
fsm_next_state(wait_for_resume, StateData#state{sm_state = pending});
|
||||
fsm_next_state(wait_for_resume, StateData#state{mgmt_state = pending});
|
||||
session_established(closed, StateData) ->
|
||||
{stop, normal, StateData}.
|
||||
|
||||
|
@ -1333,7 +1335,7 @@ handle_sync_event(resume_session, _From, _StateName,
|
|||
StateData#state.user,
|
||||
StateData#state.server,
|
||||
StateData#state.resource),
|
||||
{stop, normal, {ok, StateData}, StateData#state{sm_state = resumed}};
|
||||
{stop, normal, {ok, StateData}, StateData#state{mgmt_state = resumed}};
|
||||
handle_sync_event(_Event, _From, StateName,
|
||||
StateData) ->
|
||||
Reply = ok, fsm_reply(Reply, StateName, StateData).
|
||||
|
@ -1662,11 +1664,12 @@ handle_info({route, From, To,
|
|||
handle_info({'DOWN', Monitor, _Type, _Object, _Info},
|
||||
_StateName, StateData)
|
||||
when Monitor == StateData#state.socket_monitor ->
|
||||
if StateData#state.resume_timeout > 0,
|
||||
StateData#state.sm_state == active orelse
|
||||
StateData#state.sm_state == pending ->
|
||||
if StateData#state.mgmt_timeout > 0,
|
||||
StateData#state.mgmt_state == active orelse
|
||||
StateData#state.mgmt_state == pending ->
|
||||
log_pending_state(StateData),
|
||||
fsm_next_state(wait_for_resume, StateData#state{sm_state = pending});
|
||||
fsm_next_state(wait_for_resume,
|
||||
StateData#state{mgmt_state = pending});
|
||||
true ->
|
||||
{stop, normal, StateData}
|
||||
end;
|
||||
|
@ -1733,7 +1736,7 @@ print_state(State = #state{pres_t = T, pres_f = F, pres_a = A, pres_i = I}) ->
|
|||
%% Returns: any
|
||||
%%----------------------------------------------------------------------
|
||||
terminate(_Reason, StateName, StateData) ->
|
||||
case StateData#state.sm_state of
|
||||
case StateData#state.mgmt_state of
|
||||
resumed ->
|
||||
?INFO_MSG("Closing former stream of resumed session for ~s",
|
||||
[jlib:jid_to_string(StateData#state.jid)]);
|
||||
|
@ -1812,7 +1815,7 @@ change_shaper(StateData, JID) ->
|
|||
(StateData#state.sockmod):change_shaper(StateData#state.socket,
|
||||
Shaper).
|
||||
|
||||
send_text(StateData, Text) when StateData#state.sm_state == pending ->
|
||||
send_text(StateData, Text) when StateData#state.mgmt_state == pending ->
|
||||
?DEBUG("Cannot send text while waiting for resumption: ~p", [Text]);
|
||||
send_text(StateData, Text) when StateData#state.xml_socket ->
|
||||
?DEBUG("Send Text on stream = ~p", [Text]),
|
||||
|
@ -1822,7 +1825,7 @@ send_text(StateData, Text) ->
|
|||
?DEBUG("Send XML on stream = ~p", [Text]),
|
||||
(StateData#state.sockmod):send(StateData#state.socket, Text).
|
||||
|
||||
send_element(StateData, El) when StateData#state.sm_state == pending ->
|
||||
send_element(StateData, El) when StateData#state.mgmt_state == pending ->
|
||||
?DEBUG("Cannot send element while waiting for resumption: ~p", [El]);
|
||||
send_element(StateData, El) when StateData#state.xml_socket ->
|
||||
(StateData#state.sockmod):send_xml(StateData#state.socket,
|
||||
|
@ -1830,11 +1833,11 @@ send_element(StateData, El) when StateData#state.xml_socket ->
|
|||
send_element(StateData, El) ->
|
||||
send_text(StateData, xml:element_to_binary(El)).
|
||||
|
||||
send_stanza(StateData, Stanza) when StateData#state.sm_state == pending ->
|
||||
ack_queue_add(StateData, Stanza);
|
||||
send_stanza(StateData, Stanza) when StateData#state.sm_state == active ->
|
||||
send_stanza(StateData, Stanza) when StateData#state.mgmt_state == pending ->
|
||||
mgmt_queue_add(StateData, Stanza);
|
||||
send_stanza(StateData, Stanza) when StateData#state.mgmt_state == active ->
|
||||
send_stanza_and_ack_req(StateData, Stanza),
|
||||
ack_queue_add(StateData, Stanza);
|
||||
mgmt_queue_add(StateData, Stanza);
|
||||
send_stanza(StateData, Stanza) ->
|
||||
send_element(StateData, Stanza),
|
||||
StateData.
|
||||
|
@ -1874,7 +1877,7 @@ send_header(StateData, Server, Version, Lang) ->
|
|||
send_text(StateData, iolist_to_binary(Header)).
|
||||
|
||||
send_trailer(StateData)
|
||||
when StateData#state.sm_state == pending ->
|
||||
when StateData#state.mgmt_state == pending ->
|
||||
?DEBUG("Cannot send stream trailer while waiting for resumption", []);
|
||||
send_trailer(StateData)
|
||||
when StateData#state.xml_socket ->
|
||||
|
@ -2432,14 +2435,14 @@ fsm_next_state_gc(StateName, PackedStateData) ->
|
|||
fsm_next_state(session_established, StateData) ->
|
||||
{next_state, session_established, StateData,
|
||||
?C2S_HIBERNATE_TIMEOUT};
|
||||
fsm_next_state(wait_for_resume, #state{pending_since = undefined} =
|
||||
fsm_next_state(wait_for_resume, #state{mgmt_pending_since = undefined} =
|
||||
StateData) ->
|
||||
{next_state, wait_for_resume,
|
||||
StateData#state{pending_since = os:timestamp()},
|
||||
StateData#state.resume_timeout};
|
||||
StateData#state{mgmt_pending_since = os:timestamp()},
|
||||
StateData#state.mgmt_timeout};
|
||||
fsm_next_state(wait_for_resume, StateData) ->
|
||||
Diff = timer:now_diff(os:timestamp(), StateData#state.pending_since),
|
||||
Timeout = max(StateData#state.resume_timeout - Diff div 1000, 1),
|
||||
Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since),
|
||||
Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1),
|
||||
{next_state, wait_for_resume, StateData, Timeout};
|
||||
fsm_next_state(StateName, StateData) ->
|
||||
{next_state, StateName, StateData, ?C2S_OPEN_TIMEOUT}.
|
||||
|
@ -2449,14 +2452,14 @@ fsm_next_state(StateName, StateData) ->
|
|||
fsm_reply(Reply, session_established, StateData) ->
|
||||
{reply, Reply, session_established, StateData,
|
||||
?C2S_HIBERNATE_TIMEOUT};
|
||||
fsm_reply(Reply, wait_for_resume, #state{pending_since = undefined} =
|
||||
fsm_reply(Reply, wait_for_resume, #state{mgmt_pending_since = undefined} =
|
||||
StateData) ->
|
||||
{reply, Reply, wait_for_resume,
|
||||
StateData#state{pending_since = os:timestamp()},
|
||||
StateData#state.resume_timeout};
|
||||
StateData#state{mgmt_pending_since = os:timestamp()},
|
||||
StateData#state.mgmt_timeout};
|
||||
fsm_reply(Reply, wait_for_resume, StateData) ->
|
||||
Diff = timer:now_diff(os:timestamp(), StateData#state.pending_since),
|
||||
Timeout = max(StateData#state.resume_timeout - Diff div 1000, 1),
|
||||
Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since),
|
||||
Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1),
|
||||
{reply, Reply, wait_for_resume, StateData, Timeout};
|
||||
fsm_reply(Reply, StateName, StateData) ->
|
||||
{reply, Reply, StateName, StateData, ?C2S_OPEN_TIMEOUT}.
|
||||
|
@ -2556,13 +2559,14 @@ route_blocking(What, StateData) ->
|
|||
%%% XEP-0198
|
||||
%%%----------------------------------------------------------------------
|
||||
|
||||
stream_mgmt_enabled(#state{sm_state = disabled}) ->
|
||||
stream_mgmt_enabled(#state{mgmt_state = disabled}) ->
|
||||
false;
|
||||
stream_mgmt_enabled(_StateData) ->
|
||||
true.
|
||||
|
||||
dispatch_stream_mgmt(El, StateData) when StateData#state.sm_state == active;
|
||||
StateData#state.sm_state == pending ->
|
||||
dispatch_stream_mgmt(El, StateData)
|
||||
when StateData#state.mgmt_state == active;
|
||||
StateData#state.mgmt_state == pending ->
|
||||
perform_stream_mgmt(El, StateData);
|
||||
dispatch_stream_mgmt(El, StateData) ->
|
||||
negotiate_stream_mgmt(El, StateData).
|
||||
|
@ -2573,39 +2577,39 @@ negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) ->
|
|||
%% Binding unless it is resuming a previous session". However, it also
|
||||
%% says: "Stream management errors SHOULD be considered recoverable", so we
|
||||
%% won't bail out.
|
||||
send_element(StateData, ?SM_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)),
|
||||
send_element(StateData, ?MGMT_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)),
|
||||
StateData;
|
||||
negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
|
||||
case xml:get_attr_s(<<"xmlns">>, Attrs) of
|
||||
Xmlns when ?IS_SUPPORTED_SM_XMLNS(Xmlns) ->
|
||||
Xmlns when ?IS_SUPPORTED_MGMT_XMLNS(Xmlns) ->
|
||||
case stream_mgmt_enabled(StateData) of
|
||||
true ->
|
||||
case Name of
|
||||
<<"enable">> ->
|
||||
handle_enable(StateData#state{sm_xmlns = Xmlns}, Attrs);
|
||||
handle_enable(StateData#state{mgmt_xmlns = Xmlns}, Attrs);
|
||||
_ ->
|
||||
Res = if Name == <<"a">>;
|
||||
Name == <<"r">>;
|
||||
Name == <<"resume">> ->
|
||||
?SM_UNEXPECTED_REQUEST(Xmlns);
|
||||
?MGMT_UNEXPECTED_REQUEST(Xmlns);
|
||||
true ->
|
||||
?SM_BAD_REQUEST(Xmlns)
|
||||
?MGMT_BAD_REQUEST(Xmlns)
|
||||
end,
|
||||
send_element(StateData, Res),
|
||||
StateData
|
||||
end;
|
||||
false ->
|
||||
send_element(StateData, ?SM_SERVICE_UNAVAILABLE(Xmlns)),
|
||||
send_element(StateData, ?MGMT_SERVICE_UNAVAILABLE(Xmlns)),
|
||||
StateData
|
||||
end;
|
||||
_ ->
|
||||
send_element(StateData, ?SM_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3)),
|
||||
send_element(StateData, ?MGMT_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3)),
|
||||
StateData
|
||||
end.
|
||||
|
||||
perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
|
||||
case xml:get_attr_s(<<"xmlns">>, Attrs) of
|
||||
Xmlns when Xmlns == StateData#state.sm_xmlns ->
|
||||
Xmlns when Xmlns == StateData#state.mgmt_xmlns ->
|
||||
case Name of
|
||||
<<"r">> ->
|
||||
handle_r(StateData);
|
||||
|
@ -2614,20 +2618,20 @@ perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
|
|||
_ ->
|
||||
Res = if Name == <<"enable">>;
|
||||
Name == <<"resume">> ->
|
||||
?SM_UNEXPECTED_REQUEST(Xmlns);
|
||||
?MGMT_UNEXPECTED_REQUEST(Xmlns);
|
||||
true ->
|
||||
?SM_BAD_REQUEST(Xmlns)
|
||||
?MGMT_BAD_REQUEST(Xmlns)
|
||||
end,
|
||||
send_element(StateData, Res),
|
||||
StateData
|
||||
end;
|
||||
_ ->
|
||||
send_element(StateData,
|
||||
?SM_UNSUPPORTED_VERSION(StateData#state.sm_xmlns)),
|
||||
?MGMT_UNSUPPORTED_VERSION(StateData#state.mgmt_xmlns)),
|
||||
StateData
|
||||
end.
|
||||
|
||||
handle_enable(#state{resume_timeout = ConfigTimeout} = StateData, Attrs) ->
|
||||
handle_enable(#state{mgmt_timeout = ConfigTimeout} = StateData, Attrs) ->
|
||||
Timeout = case xml:get_attr_s(<<"resume">>, Attrs) of
|
||||
ResumeAttr when ResumeAttr == <<"true">>;
|
||||
ResumeAttr == <<"1">> ->
|
||||
|
@ -2641,7 +2645,7 @@ handle_enable(#state{resume_timeout = ConfigTimeout} = StateData, Attrs) ->
|
|||
_ ->
|
||||
0
|
||||
end,
|
||||
ResAttrs = [{<<"xmlns">>, StateData#state.sm_xmlns}] ++
|
||||
ResAttrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}] ++
|
||||
if Timeout > 0 ->
|
||||
?INFO_MSG("Stream management with resumption enabled for ~s",
|
||||
[jlib:jid_to_string(StateData#state.jid)]),
|
||||
|
@ -2657,25 +2661,26 @@ handle_enable(#state{resume_timeout = ConfigTimeout} = StateData, Attrs) ->
|
|||
attrs = ResAttrs,
|
||||
children = []},
|
||||
send_element(StateData, Res),
|
||||
StateData#state{sm_state = active,
|
||||
ack_queue = queue:new(),
|
||||
resume_timeout = Timeout * 1000}.
|
||||
StateData#state{mgmt_state = active,
|
||||
mgmt_queue = queue:new(),
|
||||
mgmt_timeout = Timeout * 1000}.
|
||||
|
||||
handle_r(StateData) ->
|
||||
H = jlib:integer_to_binary(StateData#state.n_stanzas_in),
|
||||
H = jlib:integer_to_binary(StateData#state.mgmt_stanzas_in),
|
||||
Res = #xmlel{name = <<"a">>,
|
||||
attrs = [{<<"xmlns">>, StateData#state.sm_xmlns},
|
||||
attrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns},
|
||||
{<<"h">>, H}],
|
||||
children = []},
|
||||
send_element(StateData, Res),
|
||||
StateData.
|
||||
|
||||
handle_a(#state{jid = JID, n_stanzas_out = NumStanzasOut} = StateData, Attrs) ->
|
||||
handle_a(#state{jid = JID, mgmt_stanzas_out = NumStanzasOut} = StateData,
|
||||
Attrs) ->
|
||||
case catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs)) of
|
||||
H when is_integer(H), H >= 0 ->
|
||||
?DEBUG("~s acknowledged ~B of ~B stanzas",
|
||||
[jlib:jid_to_string(JID), H, NumStanzasOut]),
|
||||
ack_queue_drop(StateData, H);
|
||||
mgmt_queue_drop(StateData, H);
|
||||
_ ->
|
||||
?WARNING_MSG("Ignoring invalid ACK element from ~s",
|
||||
[jlib:jid_to_string(JID)]),
|
||||
|
@ -2684,7 +2689,7 @@ handle_a(#state{jid = JID, n_stanzas_out = NumStanzasOut} = StateData, Attrs) ->
|
|||
|
||||
handle_resume(StateData, Attrs) ->
|
||||
R = case xml:get_attr_s(<<"xmlns">>, Attrs) of
|
||||
Xmlns when ?IS_SUPPORTED_SM_XMLNS(Xmlns) ->
|
||||
Xmlns when ?IS_SUPPORTED_MGMT_XMLNS(Xmlns) ->
|
||||
case stream_mgmt_enabled(StateData) of
|
||||
true ->
|
||||
case {xml:get_attr(<<"previd">>, Attrs),
|
||||
|
@ -2695,23 +2700,26 @@ handle_resume(StateData, Attrs) ->
|
|||
{ok, InheritedState} ->
|
||||
{ok, InheritedState, H};
|
||||
{error, Err} ->
|
||||
{error, ?SM_ITEM_NOT_FOUND(Xmlns), Err}
|
||||
{error, ?MGMT_ITEM_NOT_FOUND(Xmlns), Err}
|
||||
end;
|
||||
_ ->
|
||||
{error, ?SM_BAD_REQUEST(Xmlns), <<"Invalid request">>}
|
||||
{error, ?MGMT_BAD_REQUEST(Xmlns),
|
||||
<<"Invalid request">>}
|
||||
end;
|
||||
false ->
|
||||
{error, ?SM_SERVICE_UNAVAILABLE(Xmlns), <<"XEP-0198 disabled">>}
|
||||
{error, ?MGMT_SERVICE_UNAVAILABLE(Xmlns),
|
||||
<<"XEP-0198 disabled">>}
|
||||
end;
|
||||
_ ->
|
||||
{error, ?SM_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3), <<"Invalid XMLNS">>}
|
||||
{error, ?MGMT_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3),
|
||||
<<"Invalid XMLNS">>}
|
||||
end,
|
||||
case R of
|
||||
{ok, ResumedState, NumHandled} ->
|
||||
NewState = ack_queue_drop(ResumedState, NumHandled),
|
||||
AttrXmlns = NewState#state.sm_xmlns,
|
||||
NewState = mgmt_queue_drop(ResumedState, NumHandled),
|
||||
AttrXmlns = NewState#state.mgmt_xmlns,
|
||||
AttrId = make_resume_id(NewState),
|
||||
AttrH = jlib:integer_to_binary(NewState#state.n_stanzas_in),
|
||||
AttrH = jlib:integer_to_binary(NewState#state.mgmt_stanzas_in),
|
||||
send_element(NewState,
|
||||
#xmlel{name = <<"resumed">>,
|
||||
attrs = [{<<"xmlns">>, AttrXmlns},
|
||||
|
@ -2734,8 +2742,8 @@ handle_resume(StateData, Attrs) ->
|
|||
error
|
||||
end.
|
||||
|
||||
update_num_stanzas_in(#state{sm_state = active} = StateData, El) ->
|
||||
NewNum = case {is_stanza(El), StateData#state.n_stanzas_in} of
|
||||
update_num_stanzas_in(#state{mgmt_state = active} = StateData, El) ->
|
||||
NewNum = case {is_stanza(El), StateData#state.mgmt_stanzas_in} of
|
||||
{true, 4294967295} ->
|
||||
0;
|
||||
{true, Num} ->
|
||||
|
@ -2743,59 +2751,60 @@ update_num_stanzas_in(#state{sm_state = active} = StateData, El) ->
|
|||
{false, Num} ->
|
||||
Num
|
||||
end,
|
||||
StateData#state{n_stanzas_in = NewNum};
|
||||
StateData#state{mgmt_stanzas_in = NewNum};
|
||||
update_num_stanzas_in(StateData, _El) ->
|
||||
StateData.
|
||||
|
||||
send_stanza_and_ack_req(StateData, Stanza) ->
|
||||
AckReq = #xmlel{name = <<"r">>,
|
||||
attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}],
|
||||
attrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}],
|
||||
children = []},
|
||||
StanzaS = xml:element_to_binary(Stanza),
|
||||
AckReqS = xml:element_to_binary(AckReq),
|
||||
send_text(StateData, [StanzaS, AckReqS]).
|
||||
|
||||
ack_queue_add(StateData, El) ->
|
||||
NewNum = case StateData#state.n_stanzas_out of
|
||||
mgmt_queue_add(StateData, El) ->
|
||||
NewNum = case StateData#state.mgmt_stanzas_out of
|
||||
4294967295 ->
|
||||
0;
|
||||
Num ->
|
||||
Num + 1
|
||||
end,
|
||||
NewState = limit_queue_length(StateData),
|
||||
NewQueue = queue:in({NewNum, El}, NewState#state.ack_queue),
|
||||
NewState#state{ack_queue = NewQueue, n_stanzas_out = NewNum}.
|
||||
NewQueue = queue:in({NewNum, El}, NewState#state.mgmt_queue),
|
||||
NewState#state{mgmt_queue = NewQueue, mgmt_stanzas_out = NewNum}.
|
||||
|
||||
ack_queue_drop(StateData, NumHandled) ->
|
||||
mgmt_queue_drop(StateData, NumHandled) ->
|
||||
NewQueue = jlib:queue_drop_while(fun({N, _Stanza}) -> N =< NumHandled end,
|
||||
StateData#state.ack_queue),
|
||||
StateData#state{ack_queue = NewQueue}.
|
||||
StateData#state.mgmt_queue),
|
||||
StateData#state{mgmt_queue = NewQueue}.
|
||||
|
||||
limit_queue_length(#state{max_ack_queue = Limit} = StateData)
|
||||
limit_queue_length(#state{mgmt_max_queue = Limit} = StateData)
|
||||
when Limit == infinity;
|
||||
Limit == unlimited ->
|
||||
StateData;
|
||||
limit_queue_length(#state{jid = JID,
|
||||
ack_queue = Queue,
|
||||
max_ack_queue = Limit} = StateData) ->
|
||||
mgmt_queue = Queue,
|
||||
mgmt_max_queue = Limit} = StateData) ->
|
||||
case queue:len(Queue) >= Limit of
|
||||
true ->
|
||||
?WARNING_MSG("Dropping stanza from too long ACK queue for ~s",
|
||||
[jlib:jid_to_string(JID)]),
|
||||
limit_queue_length(StateData#state{ack_queue = queue:drop(Queue)});
|
||||
limit_queue_length(StateData#state{mgmt_queue = queue:drop(Queue)});
|
||||
false ->
|
||||
StateData
|
||||
end.
|
||||
|
||||
log_pending_state(StateData) when StateData#state.sm_state /= pending ->
|
||||
log_pending_state(StateData) when StateData#state.mgmt_state /= pending ->
|
||||
?INFO_MSG("Waiting for resumption of stream for ~s",
|
||||
[jlib:jid_to_string(StateData#state.jid)]);
|
||||
log_pending_state(_StateData) ->
|
||||
ok.
|
||||
|
||||
handle_unacked_stanzas(StateData, F) when StateData#state.sm_state == active;
|
||||
StateData#state.sm_state == pending ->
|
||||
Queue = StateData#state.ack_queue,
|
||||
handle_unacked_stanzas(StateData, F)
|
||||
when StateData#state.mgmt_state == active;
|
||||
StateData#state.mgmt_state == pending ->
|
||||
Queue = StateData#state.mgmt_queue,
|
||||
case queue:len(Queue) of
|
||||
0 ->
|
||||
ok;
|
||||
|
@ -2814,9 +2823,10 @@ handle_unacked_stanzas(StateData, F) when StateData#state.sm_state == active;
|
|||
handle_unacked_stanzas(_StateData, _F) ->
|
||||
ok.
|
||||
|
||||
handle_unacked_stanzas(StateData) when StateData#state.sm_state == active;
|
||||
StateData#state.sm_state == pending ->
|
||||
ReRoute = case StateData#state.resend_on_timeout of
|
||||
handle_unacked_stanzas(StateData)
|
||||
when StateData#state.mgmt_state == active;
|
||||
StateData#state.mgmt_state == pending ->
|
||||
ReRoute = case StateData#state.mgmt_resend of
|
||||
true ->
|
||||
fun ejabberd_router:route/3;
|
||||
false ->
|
||||
|
@ -2908,12 +2918,12 @@ inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) ->
|
|||
pres_invis = OldStateData#state.pres_invis,
|
||||
privacy_list = OldStateData#state.privacy_list,
|
||||
aux_fields = OldStateData#state.aux_fields,
|
||||
sm_xmlns = OldStateData#state.sm_xmlns,
|
||||
ack_queue = OldStateData#state.ack_queue,
|
||||
resume_timeout = OldStateData#state.resume_timeout,
|
||||
n_stanzas_in = OldStateData#state.n_stanzas_in,
|
||||
n_stanzas_out = OldStateData#state.n_stanzas_out,
|
||||
sm_state = active}};
|
||||
mgmt_xmlns = OldStateData#state.mgmt_xmlns,
|
||||
mgmt_queue = OldStateData#state.mgmt_queue,
|
||||
mgmt_timeout = OldStateData#state.mgmt_timeout,
|
||||
mgmt_stanzas_in = OldStateData#state.mgmt_stanzas_in,
|
||||
mgmt_stanzas_out = OldStateData#state.mgmt_stanzas_out,
|
||||
mgmt_state = active}};
|
||||
_ ->
|
||||
{error, <<"Cannot grab session state">>}
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue
Block a user