Add initial XEP-0198 support (EJAB-532)

Implement partial support for XEP-0198: Stream Management.  After
successful negotiation of this feature, the server requests an ACK for
each stanza transmitted to the client and responds to ACK requests
issued by the client.  On session termination, the server re-routes any
unacknowledged stanzas.  The length of the pending queue can be limited
by setting the "max_ack_queue" option to some integer value (default:
500).  XEP-0198 support can be disabled entirely by setting the
"stream_management" option to false (default: true).

So far, stream management is implemented only for c2s connections, and
the optional stream resumption feature also described in XEP-0198 is not
(yet) supported.

This addition was originally based on a patch provided by Magnus Henoch
and updated by Grzegorz Grasza.  Their code implements an early draft of
XEP-0198 for some previous version of ejabberd.  It has since been
rewritten almost entirely.
This commit is contained in:
Holger Weiss 2014-03-12 23:34:14 +01:00 committed by Holger Wei
parent 0f0e99ccd3
commit 7d594086c3
4 changed files with 374 additions and 64 deletions

View File

@ -870,9 +870,10 @@ The available modules, their purpose and the options allowed by each one are:
\titem{\texttt{ejabberd\_c2s}}
Handles c2s connections.\\
Options: \texttt{access}, \texttt{certfile}, \texttt{ciphers},
\texttt{max\_fsm\_queue},
\texttt{max\_ack\_queue}, \texttt{max\_fsm\_queue},
\texttt{max\_stanza\_size}, \texttt{shaper},
\texttt{starttls}, \texttt{starttls\_required}, \texttt{tls},
\texttt{starttls}, \texttt{starttls\_required},
\texttt{stream\_management}, \texttt{tls},
\texttt{zlib}, \texttt{tls\_compression}
\titem{\texttt{ejabberd\_s2s\_in}}
Handles incoming s2s connections.\\
@ -962,6 +963,13 @@ This is a detailed description of each option allowed by the listening modules:
\term{http\_poll\_timeout}. The default value is five minutes.
The option can be defined in \term{ejabberd.yml}, expressing the time
in seconds: \verb|{http_poll_timeout, 300}.|
\titem{max\_ack\_queue: Size}
This option specifies the maximum number of unacknowledged stanzas
queued for possible retransmission if \term{stream\_management} is
enabled. When the limit is reached, the first stanza is dropped from
the queue before adding the next one. This option can be specified
for \term{ejabberd\_c2s} listeners. The allowed values are positive
integers and \term{infinity}. Default value: \term{500}.
\titem{max\_fsm\_queue: Size}
This option specifies the maximum number of elements in the queue of the FSM
(Finite State Machine).
@ -1022,6 +1030,10 @@ request_handlers:
No unencrypted connections will be allowed.
You should also set the \option{certfile} option.
You can define a certificate file for a specific domain using the global option \option{domain\_certfile}.
\titem{stream\_management: true|false}
Setting this option to \term{false} disables ejabberd's support for
\ind{protocols!XEP-0198: Stream Management}. It can be specified for
\term{ejabberd\_c2s} listeners. The default value is \term{true}.
\titem{timeout: Integer} \ind{options!timeout}
Timeout of the connections, expressed in milliseconds.
Default: 5000

View File

@ -144,3 +144,5 @@
-define(NS_MEDIA, <<"urn:xmpp:media-element">>).
-define(NS_BOB, <<"urn:xmpp:bob">>).
-define(NS_PING, <<"urn:xmpp:ping">>).
-define(NS_STREAM_MGMT_2, <<"urn:xmpp:sm:2">>).
-define(NS_STREAM_MGMT_3, <<"urn:xmpp:sm:3">>).

View File

@ -108,6 +108,12 @@
auth_module = unknown,
ip,
aux_fields = [],
sm_xmlns,
ack_queue,
max_ack_queue,
manage_stream = fun negotiate_stream_mgmt/2,
n_stanzas_in = 0,
n_stanzas_out = 0,
lang}).
%-define(DBGFSM, true).
@ -156,6 +162,35 @@
-define(INVALID_FROM, ?SERR_INVALID_FROM).
%% XEP-0198:
-define(IS_STREAM_MGMT_TAG(Name),
Name == <<"enable">>;
Name == <<"a">>;
Name == <<"r">>).
-define(IS_SUPPORTED_SM_XMLNS(Xmlns),
Xmlns == ?NS_STREAM_MGMT_2;
Xmlns == ?NS_STREAM_MGMT_3).
-define(SM_FAILED(Condition, Xmlns),
#xmlel{name = <<"failed">>,
attrs = [{<<"xmlns">>, Xmlns}],
children = [#xmlel{name = Condition,
attrs = [{<<"xmlns">>, ?NS_STANZAS}],
children = []}]}).
-define(SM_BAD_REQUEST(Xmlns),
?SM_FAILED(<<"bad-request">>, Xmlns)).
-define(SM_SERVICE_UNAVAILABLE(Xmlns),
?SM_FAILED(<<"service-unavailable">>, Xmlns)).
-define(SM_UNEXPECTED_REQUEST(Xmlns),
?SM_FAILED(<<"unexpected-request">>, Xmlns)).
-define(SM_UNSUPPORTED_VERSION(Xmlns),
?SM_FAILED(<<"unsupported-version">>, Xmlns)).
%%%----------------------------------------------------------------------
%%% API
@ -250,6 +285,16 @@ init([{SockMod, Socket}, Opts]) ->
true -> TLSOpts1
end,
TLSOpts = [verify_none | TLSOpts2],
MaxAckQueue = case proplists:get_value(max_ack_queue, Opts) of
Limit when is_integer(Limit), Limit > 0 -> Limit;
_ -> 500
end,
StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true),
AckQueue = if not StreamMgmtEnabled ->
none;
true ->
undefined
end,
IP = peerip(SockMod, Socket),
%% Check if IP is blacklisted:
case is_ip_blacklisted(IP) of
@ -271,7 +316,8 @@ init([{SockMod, Socket}, Opts]) ->
tls_required = StartTLSRequired,
tls_enabled = TLSEnabled, tls_options = TLSOpts,
streamid = new_id(), access = Access,
shaper = Shaper, ip = IP},
shaper = Shaper, ip = IP,
ack_queue = AckQueue, max_ack_queue = MaxAckQueue},
{ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT}
end.
@ -402,6 +448,18 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) ->
ejabberd_hooks:run_fold(roster_get_versioning_feature,
Server, [],
[Server]),
StreamManagementFeature =
case stream_mgmt_enabled(StateData) of
true ->
[#xmlel{name = <<"sm">>,
attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_2}],
children = []},
#xmlel{name = <<"sm">>,
attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_3}],
children = []}];
false ->
[]
end,
StreamFeatures = [#xmlel{name = <<"bind">>,
attrs = [{<<"xmlns">>, ?NS_BIND}],
children = []},
@ -410,6 +468,7 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) ->
children = []}]
++
RosterVersioningFeature ++
StreamManagementFeature ++
ejabberd_hooks:run_fold(c2s_stream_features,
Server, [], [Server]),
send_element(StateData,
@ -472,6 +531,9 @@ wait_for_stream({xmlstreamerror, _}, StateData) ->
wait_for_stream(closed, StateData) ->
{stop, normal, StateData}.
wait_for_auth({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
when ?IS_STREAM_MGMT_TAG(Name) ->
fsm_next_state(wait_for_auth, (StateData#state.manage_stream)(El, StateData));
wait_for_auth({xmlstreamelement, El}, StateData) ->
case is_auth_packet(El) of
{auth, _ID, get, {U, _, _, _}} ->
@ -618,6 +680,9 @@ wait_for_auth({xmlstreamerror, _}, StateData) ->
wait_for_auth(closed, StateData) ->
{stop, normal, StateData}.
wait_for_feature_request({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
when ?IS_STREAM_MGMT_TAG(Name) ->
fsm_next_state(wait_for_feature_request, (StateData#state.manage_stream)(El, StateData));
wait_for_feature_request({xmlstreamelement, El},
StateData) ->
#xmlel{name = Name, attrs = Attrs, children = Els} = El,
@ -783,6 +848,9 @@ wait_for_feature_request({xmlstreamerror, _},
wait_for_feature_request(closed, StateData) ->
{stop, normal, StateData}.
wait_for_sasl_response({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
when ?IS_STREAM_MGMT_TAG(Name) ->
fsm_next_state(wait_for_sasl_response, (StateData#state.manage_stream)(El, StateData));
wait_for_sasl_response({xmlstreamelement, El},
StateData) ->
#xmlel{name = Name, attrs = Attrs, children = Els} = El,
@ -913,6 +981,9 @@ resource_conflict_action(U, S, R) ->
{accept_resource, Rnew}
end.
wait_for_bind({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
when ?IS_STREAM_MGMT_TAG(Name) ->
fsm_next_state(wait_for_bind, (StateData#state.manage_stream)(El, StateData));
wait_for_bind({xmlstreamelement, El}, StateData) ->
case jlib:iq_query_info(El) of
#iq{type = set, xmlns = ?NS_BIND, sub_el = SubEl} =
@ -974,61 +1045,65 @@ wait_for_bind({xmlstreamerror, _}, StateData) ->
wait_for_bind(closed, StateData) ->
{stop, normal, StateData}.
wait_for_session({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
when ?IS_STREAM_MGMT_TAG(Name) ->
fsm_next_state(wait_for_session, (StateData#state.manage_stream)(El, StateData));
wait_for_session({xmlstreamelement, El}, StateData) ->
NewStateData = update_num_stanzas_in(StateData, El),
case jlib:iq_query_info(El) of
#iq{type = set, xmlns = ?NS_SESSION} ->
U = StateData#state.user,
R = StateData#state.resource,
JID = StateData#state.jid,
case acl:match_rule(StateData#state.server,
StateData#state.access, JID) of
U = NewStateData#state.user,
R = NewStateData#state.resource,
JID = NewStateData#state.jid,
case acl:match_rule(NewStateData#state.server,
NewStateData#state.access, JID) of
allow ->
?INFO_MSG("(~w) Opened session for ~s",
[StateData#state.socket,
[NewStateData#state.socket,
jlib:jid_to_string(JID)]),
Res = jlib:make_result_iq_reply(El#xmlel{children = []}),
send_element(StateData, Res),
change_shaper(StateData, JID),
NewState = send_stanza(NewStateData, Res),
change_shaper(NewState, JID),
{Fs, Ts} = ejabberd_hooks:run_fold(
roster_get_subscription_lists,
StateData#state.server,
NewState#state.server,
{[], []},
[U, StateData#state.server]),
[U, NewState#state.server]),
LJID = jlib:jid_tolower(jlib:jid_remove_resource(JID)),
Fs1 = [LJID | Fs],
Ts1 = [LJID | Ts],
PrivList =
ejabberd_hooks:run_fold(
privacy_get_user_list, StateData#state.server,
privacy_get_user_list, NewState#state.server,
#userlist{},
[U, StateData#state.server]),
[U, NewState#state.server]),
SID = {now(), self()},
Conn = get_conn_type(StateData),
Info = [{ip, StateData#state.ip}, {conn, Conn},
{auth_module, StateData#state.auth_module}],
Conn = get_conn_type(NewState),
Info = [{ip, NewState#state.ip}, {conn, Conn},
{auth_module, NewState#state.auth_module}],
ejabberd_sm:open_session(
SID, U, StateData#state.server, R, Info),
NewStateData =
StateData#state{
SID, U, NewState#state.server, R, Info),
UpdatedStateData =
NewState#state{
sid = SID,
conn = Conn,
pres_f = ?SETS:from_list(Fs1),
pres_t = ?SETS:from_list(Ts1),
privacy_list = PrivList},
fsm_next_state_pack(session_established,
NewStateData);
UpdatedStateData);
_ ->
ejabberd_hooks:run(forbidden_session_hook,
StateData#state.server, [JID]),
NewStateData#state.server, [JID]),
?INFO_MSG("(~w) Forbidden session for ~s",
[StateData#state.socket,
[NewStateData#state.socket,
jlib:jid_to_string(JID)]),
Err = jlib:make_error_reply(El, ?ERR_NOT_ALLOWED),
send_element(StateData, Err),
fsm_next_state(wait_for_session, StateData)
send_element(NewStateData, Err),
fsm_next_state(wait_for_session, NewStateData)
end;
_ ->
fsm_next_state(wait_for_session, StateData)
fsm_next_state(wait_for_session, NewStateData)
end;
wait_for_session(timeout, StateData) ->
@ -1042,6 +1117,9 @@ wait_for_session({xmlstreamerror, _}, StateData) ->
wait_for_session(closed, StateData) ->
{stop, normal, StateData}.
session_established({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
when ?IS_STREAM_MGMT_TAG(Name) ->
fsm_next_state(session_established, (StateData#state.manage_stream)(El, StateData));
session_established({xmlstreamelement, El},
StateData) ->
FromJID = StateData#state.jid,
@ -1080,9 +1158,10 @@ session_established(closed, StateData) ->
%% connection)
session_established2(El, StateData) ->
#xmlel{name = Name, attrs = Attrs} = El,
User = StateData#state.user,
Server = StateData#state.server,
FromJID = StateData#state.jid,
NewStateData = update_num_stanzas_in(StateData, El),
User = NewStateData#state.user,
Server = NewStateData#state.server,
FromJID = NewStateData#state.jid,
To = xml:get_attr_s(<<"to">>, Attrs),
ToJID = case To of
<<"">> -> jlib:make_jid(User, Server, <<"">>);
@ -1091,7 +1170,7 @@ session_established2(El, StateData) ->
NewEl1 = jlib:remove_attr(<<"xmlns">>, El),
NewEl = case xml:get_attr_s(<<"xml:lang">>, Attrs) of
<<"">> ->
case StateData#state.lang of
case NewStateData#state.lang of
<<"">> -> NewEl1;
Lang ->
xml:replace_tag_attr(<<"xml:lang">>, Lang, NewEl1)
@ -1101,13 +1180,18 @@ session_established2(El, StateData) ->
NewState = case ToJID of
error ->
case xml:get_attr_s(<<"type">>, Attrs) of
<<"error">> -> StateData;
<<"result">> -> StateData;
<<"error">> -> NewStateData;
<<"result">> -> NewStateData;
_ ->
Err = jlib:make_error_reply(NewEl,
?ERR_JID_MALFORMED),
send_element(StateData, Err),
StateData
case is_stanza(Err) of
true ->
send_stanza(NewStateData, Err);
false ->
send_element(NewStateData, Err),
NewStateData
end
end;
_ ->
case Name of
@ -1122,12 +1206,12 @@ session_established2(El, StateData) ->
#jid{user = User, server = Server,
resource = <<"">>} ->
?DEBUG("presence_update(~p,~n\t~p,~n\t~p)",
[FromJID, PresenceEl, StateData]),
[FromJID, PresenceEl, NewStateData]),
presence_update(FromJID, PresenceEl,
StateData);
NewStateData);
_ ->
presence_track(FromJID, ToJID, PresenceEl,
StateData)
NewStateData)
end;
<<"iq">> ->
case jlib:iq_query_info(NewEl) of
@ -1135,21 +1219,21 @@ session_established2(El, StateData) ->
when Xmlns == (?NS_PRIVACY);
Xmlns == (?NS_BLOCKING) ->
process_privacy_iq(FromJID, ToJID, IQ,
StateData);
NewStateData);
_ ->
ejabberd_hooks:run(user_send_packet, Server,
[FromJID, ToJID, NewEl]),
check_privacy_route(FromJID, StateData,
check_privacy_route(FromJID, NewStateData,
FromJID, ToJID, NewEl),
StateData
NewStateData
end;
<<"message">> ->
ejabberd_hooks:run(user_send_packet, Server,
[FromJID, ToJID, NewEl]),
check_privacy_route(FromJID, StateData, FromJID,
check_privacy_route(FromJID, NewStateData, FromJID,
ToJID, NewEl),
StateData;
_ -> StateData
NewStateData;
_ -> NewStateData
end
end,
ejabberd_hooks:run(c2s_loop_debug,
@ -1263,13 +1347,13 @@ handle_info({route, _From, _To, {broadcast, Data}},
jlib:jid_remove_resource(StateData#state.jid),
StateData#state.jid,
jlib:iq_to_xml(PrivPushIQ)),
send_element(StateData, PrivPushEl),
NewState = send_stanza(StateData, PrivPushEl),
fsm_next_state(StateName,
StateData#state{privacy_list = NewPL})
NewState#state{privacy_list = NewPL})
end;
{blocking, What} ->
route_blocking(What, StateData),
fsm_next_state(StateName, StateData);
NewState = route_blocking(What, StateData),
fsm_next_state(StateName, NewState);
_ ->
fsm_next_state(StateName, StateData)
end;
@ -1515,12 +1599,12 @@ handle_info({route, From, To,
jlib:replace_from_to_attrs(jlib:jid_to_string(From),
jlib:jid_to_string(To), NewAttrs),
FixedPacket = #xmlel{name = Name, attrs = Attrs2, children = Els},
send_element(StateData, FixedPacket),
SentStateData = send_stanza(StateData, FixedPacket),
ejabberd_hooks:run(user_receive_packet,
StateData#state.server,
[StateData#state.jid, From, To, FixedPacket]),
SentStateData#state.server,
[SentStateData#state.jid, From, To, FixedPacket]),
ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
fsm_next_state(StateName, NewState);
fsm_next_state(StateName, SentStateData);
true ->
ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
fsm_next_state(StateName, NewState)
@ -1643,6 +1727,7 @@ terminate(_Reason, StateName, StateData) ->
StateData#state.pres_i, Packet)
end
end,
resend_unacked_stanzas(StateData),
bounce_messages();
_ ->
ok
@ -1674,6 +1759,13 @@ send_element(StateData, El) when StateData#state.xml_socket ->
send_element(StateData, El) ->
send_text(StateData, xml:element_to_binary(El)).
send_stanza(StateData, Stanza) when StateData#state.sm_xmlns /= undefined ->
send_stanza_and_ack_req(StateData, Stanza),
ack_queue_add(StateData, Stanza);
send_stanza(StateData, Stanza) ->
send_element(StateData, Stanza),
StateData.
send_header(StateData, Server, Version, Lang)
when StateData#state.xml_socket ->
VersionAttr = case Version of
@ -1727,6 +1819,19 @@ is_auth_packet(El) ->
_ -> false
end.
is_stanza(#xmlel{name = Name, attrs = Attrs}) when Name == <<"message">>;
Name == <<"presence">>;
Name == <<"iq">> ->
case xml:get_attr(<<"xmlns">>, Attrs) of
{value, NS} when NS /= <<"jabber:client">>,
NS /= <<"jabber:server">> ->
false;
_ ->
true
end;
is_stanza(_El) ->
false.
get_auth_tags([#xmlel{name = Name, children = Els} | L],
U, P, D, R) ->
CData = xml:get_cdata(Els),
@ -1854,12 +1959,12 @@ presence_update(From, Packet, StateData) ->
ejabberd_hooks:run(user_available_hook,
NewStateData#state.server,
[NewStateData#state.jid]),
if NewPriority >= 0 ->
resend_offline_messages(NewStateData),
resend_subscription_requests(NewStateData);
true -> ok
end,
presence_broadcast_first(From, NewStateData,
ResentStateData = if NewPriority >= 0 ->
resend_offline_messages(NewStateData),
resend_subscription_requests(NewStateData);
true -> NewStateData
end,
presence_broadcast_first(From, ResentStateData,
Packet);
true ->
presence_broadcast_to_trusted(NewStateData, From,
@ -2173,10 +2278,11 @@ resend_subscription_requests(#state{user = User,
PendingSubscriptions =
ejabberd_hooks:run_fold(resend_subscription_requests_hook,
Server, [], [User, Server]),
lists:foreach(fun (XMLPacket) ->
send_element(StateData, XMLPacket)
end,
PendingSubscriptions).
lists:foldl(fun (XMLPacket, AccStateData) ->
send_stanza(AccStateData, XMLPacket)
end,
StateData,
PendingSubscriptions).
get_showtag(undefined) -> <<"unavailable">>;
get_showtag(Presence) ->
@ -2352,10 +2458,185 @@ route_blocking(What, StateData) ->
PrivPushEl =
jlib:replace_from_to(jlib:jid_remove_resource(StateData#state.jid),
StateData#state.jid, jlib:iq_to_xml(PrivPushIQ)),
send_element(StateData, PrivPushEl),
%% No need to replace active privacy list here,
%% blocking pushes are always accompanied by
%% Privacy List pushes
send_stanza(StateData, PrivPushEl).
%%%----------------------------------------------------------------------
%%% XEP-0198
%%%----------------------------------------------------------------------
stream_mgmt_enabled(#state{ack_queue = none}) ->
false;
stream_mgmt_enabled(_StateData) ->
true.
negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) ->
%% XEP-0198 says: "For client-to-server connections, the client MUST NOT
%% attempt to enable stream management until after it has completed Resource
%% Binding". However, it also says: "Stream management errors SHOULD be
%% considered recoverable", so we won't bail out.
send_element(StateData, ?SM_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)),
StateData;
negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
case xml:get_attr_s(<<"xmlns">>, Attrs) of
Xmlns when ?IS_SUPPORTED_SM_XMLNS(Xmlns) ->
case stream_mgmt_enabled(StateData) of
true ->
case Name of
<<"enable">> ->
handle_enable(StateData#state{sm_xmlns = Xmlns});
_ ->
Res = if Name == <<"a">>;
Name == <<"r">> ->
?SM_UNEXPECTED_REQUEST(Xmlns);
true ->
?SM_BAD_REQUEST(Xmlns)
end,
send_element(StateData, Res),
StateData
end;
false ->
send_element(StateData, ?SM_SERVICE_UNAVAILABLE(Xmlns)),
StateData
end;
_ ->
send_element(StateData, ?SM_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3)),
StateData
end.
perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
case xml:get_attr_s(<<"xmlns">>, Attrs) of
Xmlns when Xmlns == StateData#state.sm_xmlns ->
case Name of
<<"r">> ->
handle_r(StateData);
<<"a">> ->
handle_a(StateData, Attrs);
_ ->
Res = if Name == <<"enable">> ->
?SM_UNEXPECTED_REQUEST(Xmlns);
true ->
?SM_BAD_REQUEST(Xmlns)
end,
send_element(StateData, Res),
StateData
end;
_ ->
send_element(StateData,
?SM_UNSUPPORTED_VERSION(StateData#state.sm_xmlns)),
StateData
end.
handle_enable(StateData) ->
?INFO_MSG("Enabling XEP-0198 stream management for ~s",
[jlib:jid_to_string(StateData#state.jid)]),
Res = #xmlel{name = <<"enabled">>,
attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}],
children = []},
send_element(StateData, Res),
StateData#state{ack_queue = queue:new(),
manage_stream = fun perform_stream_mgmt/2}.
handle_r(StateData) ->
H = jlib:integer_to_binary(StateData#state.n_stanzas_in),
Res = #xmlel{name = <<"a">>,
attrs = [{<<"xmlns">>, StateData#state.sm_xmlns},
{<<"h">>, H}],
children = []},
send_element(StateData, Res),
StateData.
handle_a(#state{jid = JID, n_stanzas_out = NumStanzasOut} = StateData, Attrs) ->
case catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs)) of
H when is_integer(H), H > NumStanzasOut ->
?WARNING_MSG("~s acknowledged ~B stanzas, but only ~B were sent",
[jlib:jid_to_string(JID), H, NumStanzasOut]),
ack_queue_drop(StateData, NumStanzasOut);
H when is_integer(H), H >= 0 ->
?DEBUG("~s acknowledged ~B of ~B stanzas",
[jlib:jid_to_string(JID), H, NumStanzasOut]),
ack_queue_drop(StateData, H);
_ ->
?WARNING_MSG("Ignoring invalid ACK element from ~s",
[jlib:jid_to_string(JID)]),
StateData
end.
update_num_stanzas_in(StateData, El) when StateData#state.sm_xmlns /= undefined ->
NewNum = case {is_stanza(El), StateData#state.n_stanzas_in} of
{true, 4294967295} ->
0;
{true, Num} ->
Num + 1;
{false, Num} ->
Num
end,
StateData#state{n_stanzas_in = NewNum};
update_num_stanzas_in(StateData, _El) ->
StateData.
send_stanza_and_ack_req(StateData, Stanza) ->
AckReq = #xmlel{name = <<"r">>,
attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}],
children = []},
StanzaS = xml:element_to_binary(Stanza),
AckReqS = xml:element_to_binary(AckReq),
send_text(StateData, [StanzaS, AckReqS]).
ack_queue_add(StateData, El) ->
NewNum = case StateData#state.n_stanzas_out of
4294967295 ->
0;
Num ->
Num + 1
end,
NewState = limit_queue_length(StateData),
NewQueue = queue:in({NewNum, El}, NewState#state.ack_queue),
NewState#state{ack_queue = NewQueue, n_stanzas_out = NewNum}.
ack_queue_drop(StateData, NumHandled) ->
NewQueue = jlib:queue_drop_while(fun({N, _Stanza}) -> N =< NumHandled end,
StateData#state.ack_queue),
StateData#state{ack_queue = NewQueue}.
limit_queue_length(#state{max_ack_queue = Limit} = StateData)
when Limit == infinity;
Limit == unlimited ->
StateData;
limit_queue_length(#state{jid = JID,
ack_queue = Queue,
max_ack_queue = Limit} = StateData) ->
case queue:len(Queue) >= Limit of
true ->
?WARNING_MSG("Dropping stanza from too long ACK queue for ~s",
[jlib:jid_to_string(JID)]),
limit_queue_length(StateData#state{ack_queue = queue:drop(Queue)});
false ->
StateData
end.
resend_unacked_stanzas(StateData) when StateData#state.sm_xmlns /= undefined ->
Queue = StateData#state.ack_queue,
case queue:len(Queue) of
0 ->
ok;
N ->
?INFO_MSG("Resending ~B unacknowledged stanzas to ~s",
[N, jlib:jid_to_string(StateData#state.jid)]),
lists:foreach(
fun({Num, #xmlel{attrs = Attrs} = El}) ->
From_s = xml:get_attr_s(<<"from">>, Attrs),
From = jlib:string_to_jid(From_s),
To_s = xml:get_attr_s(<<"to">>, Attrs),
To = jlib:string_to_jid(To_s),
?DEBUG("Resending unacknowledged stanza #~B from ~s to ~s",
[Num, From_s, To_s]),
ejabberd_router:route(From, To, El)
end, queue:to_list(Queue))
end;
resend_unacked_stanzas(_StateData) ->
ok.
%%%----------------------------------------------------------------------

View File

@ -51,7 +51,7 @@
binary_to_integer/1, binary_to_integer/2,
integer_to_binary/1, integer_to_binary/2,
atom_to_binary/1, binary_to_atom/1, tuple_to_binary/1,
l2i/1, i2l/1, i2l/2]).
l2i/1, i2l/1, i2l/2, queue_drop_while/2]).
%% TODO: Remove once XEP-0091 is Obsolete
%% TODO: Remove once XEP-0091 is Obsolete
@ -894,3 +894,18 @@ i2l(L, N) when is_binary(L) ->
C when C > N -> L;
_ -> i2l(<<$0, L/binary>>, N)
end.
-spec queue_drop_while(fun((term()) -> boolean()), queue()) -> queue().
queue_drop_while(F, Q) ->
case queue:peek(Q) of
{value, Item} ->
case F(Item) of
true ->
queue_drop_while(F, queue:drop(Q));
_ ->
Q
end;
empty ->
Q
end.