From 7d594086c3601c3281473b4ca54714b714027019 Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Wed, 12 Mar 2014 23:34:14 +0100 Subject: [PATCH] Add initial XEP-0198 support (EJAB-532) Implement partial support for XEP-0198: Stream Management. After successful negotiation of this feature, the server requests an ACK for each stanza transmitted to the client and responds to ACK requests issued by the client. On session termination, the server re-routes any unacknowledged stanzas. The length of the pending queue can be limited by setting the "max_ack_queue" option to some integer value (default: 500). XEP-0198 support can be disabled entirely by setting the "stream_management" option to false (default: true). So far, stream management is implemented only for c2s connections, and the optional stream resumption feature also described in XEP-0198 is not (yet) supported. This addition was originally based on a patch provided by Magnus Henoch and updated by Grzegorz Grasza. Their code implements an early draft of XEP-0198 for some previous version of ejabberd. It has since been rewritten almost entirely. --- doc/guide.tex | 16 +- include/ns.hrl | 2 + src/ejabberd_c2s.erl | 403 ++++++++++++++++++++++++++++++++++++------- src/jlib.erl | 17 +- 4 files changed, 374 insertions(+), 64 deletions(-) diff --git a/doc/guide.tex b/doc/guide.tex index 5d5bf2fdc..59f39e256 100644 --- a/doc/guide.tex +++ b/doc/guide.tex @@ -870,9 +870,10 @@ The available modules, their purpose and the options allowed by each one are: \titem{\texttt{ejabberd\_c2s}} Handles c2s connections.\\ Options: \texttt{access}, \texttt{certfile}, \texttt{ciphers}, - \texttt{max\_fsm\_queue}, + \texttt{max\_ack\_queue}, \texttt{max\_fsm\_queue}, \texttt{max\_stanza\_size}, \texttt{shaper}, - \texttt{starttls}, \texttt{starttls\_required}, \texttt{tls}, + \texttt{starttls}, \texttt{starttls\_required}, + \texttt{stream\_management}, \texttt{tls}, \texttt{zlib}, \texttt{tls\_compression} \titem{\texttt{ejabberd\_s2s\_in}} Handles incoming s2s connections.\\ @@ -962,6 +963,13 @@ This is a detailed description of each option allowed by the listening modules: \term{http\_poll\_timeout}. The default value is five minutes. The option can be defined in \term{ejabberd.yml}, expressing the time in seconds: \verb|{http_poll_timeout, 300}.| + \titem{max\_ack\_queue: Size} + This option specifies the maximum number of unacknowledged stanzas + queued for possible retransmission if \term{stream\_management} is + enabled. When the limit is reached, the first stanza is dropped from + the queue before adding the next one. This option can be specified + for \term{ejabberd\_c2s} listeners. The allowed values are positive + integers and \term{infinity}. Default value: \term{500}. \titem{max\_fsm\_queue: Size} This option specifies the maximum number of elements in the queue of the FSM (Finite State Machine). @@ -1022,6 +1030,10 @@ request_handlers: No unencrypted connections will be allowed. You should also set the \option{certfile} option. You can define a certificate file for a specific domain using the global option \option{domain\_certfile}. + \titem{stream\_management: true|false} + Setting this option to \term{false} disables ejabberd's support for + \ind{protocols!XEP-0198: Stream Management}. It can be specified for + \term{ejabberd\_c2s} listeners. The default value is \term{true}. \titem{timeout: Integer} \ind{options!timeout} Timeout of the connections, expressed in milliseconds. Default: 5000 diff --git a/include/ns.hrl b/include/ns.hrl index 6d041a478..2cf218c7c 100644 --- a/include/ns.hrl +++ b/include/ns.hrl @@ -144,3 +144,5 @@ -define(NS_MEDIA, <<"urn:xmpp:media-element">>). -define(NS_BOB, <<"urn:xmpp:bob">>). -define(NS_PING, <<"urn:xmpp:ping">>). +-define(NS_STREAM_MGMT_2, <<"urn:xmpp:sm:2">>). +-define(NS_STREAM_MGMT_3, <<"urn:xmpp:sm:3">>). diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index ae5fc97b8..044afd0f4 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -108,6 +108,12 @@ auth_module = unknown, ip, aux_fields = [], + sm_xmlns, + ack_queue, + max_ack_queue, + manage_stream = fun negotiate_stream_mgmt/2, + n_stanzas_in = 0, + n_stanzas_out = 0, lang}). %-define(DBGFSM, true). @@ -156,6 +162,35 @@ -define(INVALID_FROM, ?SERR_INVALID_FROM). +%% XEP-0198: + +-define(IS_STREAM_MGMT_TAG(Name), + Name == <<"enable">>; + Name == <<"a">>; + Name == <<"r">>). + +-define(IS_SUPPORTED_SM_XMLNS(Xmlns), + Xmlns == ?NS_STREAM_MGMT_2; + Xmlns == ?NS_STREAM_MGMT_3). + +-define(SM_FAILED(Condition, Xmlns), + #xmlel{name = <<"failed">>, + attrs = [{<<"xmlns">>, Xmlns}], + children = [#xmlel{name = Condition, + attrs = [{<<"xmlns">>, ?NS_STANZAS}], + children = []}]}). + +-define(SM_BAD_REQUEST(Xmlns), + ?SM_FAILED(<<"bad-request">>, Xmlns)). + +-define(SM_SERVICE_UNAVAILABLE(Xmlns), + ?SM_FAILED(<<"service-unavailable">>, Xmlns)). + +-define(SM_UNEXPECTED_REQUEST(Xmlns), + ?SM_FAILED(<<"unexpected-request">>, Xmlns)). + +-define(SM_UNSUPPORTED_VERSION(Xmlns), + ?SM_FAILED(<<"unsupported-version">>, Xmlns)). %%%---------------------------------------------------------------------- %%% API @@ -250,6 +285,16 @@ init([{SockMod, Socket}, Opts]) -> true -> TLSOpts1 end, TLSOpts = [verify_none | TLSOpts2], + MaxAckQueue = case proplists:get_value(max_ack_queue, Opts) of + Limit when is_integer(Limit), Limit > 0 -> Limit; + _ -> 500 + end, + StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true), + AckQueue = if not StreamMgmtEnabled -> + none; + true -> + undefined + end, IP = peerip(SockMod, Socket), %% Check if IP is blacklisted: case is_ip_blacklisted(IP) of @@ -271,7 +316,8 @@ init([{SockMod, Socket}, Opts]) -> tls_required = StartTLSRequired, tls_enabled = TLSEnabled, tls_options = TLSOpts, streamid = new_id(), access = Access, - shaper = Shaper, ip = IP}, + shaper = Shaper, ip = IP, + ack_queue = AckQueue, max_ack_queue = MaxAckQueue}, {ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT} end. @@ -402,6 +448,18 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) -> ejabberd_hooks:run_fold(roster_get_versioning_feature, Server, [], [Server]), + StreamManagementFeature = + case stream_mgmt_enabled(StateData) of + true -> + [#xmlel{name = <<"sm">>, + attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_2}], + children = []}, + #xmlel{name = <<"sm">>, + attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_3}], + children = []}]; + false -> + [] + end, StreamFeatures = [#xmlel{name = <<"bind">>, attrs = [{<<"xmlns">>, ?NS_BIND}], children = []}, @@ -410,6 +468,7 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) -> children = []}] ++ RosterVersioningFeature ++ + StreamManagementFeature ++ ejabberd_hooks:run_fold(c2s_stream_features, Server, [], [Server]), send_element(StateData, @@ -472,6 +531,9 @@ wait_for_stream({xmlstreamerror, _}, StateData) -> wait_for_stream(closed, StateData) -> {stop, normal, StateData}. +wait_for_auth({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_auth, (StateData#state.manage_stream)(El, StateData)); wait_for_auth({xmlstreamelement, El}, StateData) -> case is_auth_packet(El) of {auth, _ID, get, {U, _, _, _}} -> @@ -618,6 +680,9 @@ wait_for_auth({xmlstreamerror, _}, StateData) -> wait_for_auth(closed, StateData) -> {stop, normal, StateData}. +wait_for_feature_request({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_feature_request, (StateData#state.manage_stream)(El, StateData)); wait_for_feature_request({xmlstreamelement, El}, StateData) -> #xmlel{name = Name, attrs = Attrs, children = Els} = El, @@ -783,6 +848,9 @@ wait_for_feature_request({xmlstreamerror, _}, wait_for_feature_request(closed, StateData) -> {stop, normal, StateData}. +wait_for_sasl_response({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_sasl_response, (StateData#state.manage_stream)(El, StateData)); wait_for_sasl_response({xmlstreamelement, El}, StateData) -> #xmlel{name = Name, attrs = Attrs, children = Els} = El, @@ -913,6 +981,9 @@ resource_conflict_action(U, S, R) -> {accept_resource, Rnew} end. +wait_for_bind({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_bind, (StateData#state.manage_stream)(El, StateData)); wait_for_bind({xmlstreamelement, El}, StateData) -> case jlib:iq_query_info(El) of #iq{type = set, xmlns = ?NS_BIND, sub_el = SubEl} = @@ -974,61 +1045,65 @@ wait_for_bind({xmlstreamerror, _}, StateData) -> wait_for_bind(closed, StateData) -> {stop, normal, StateData}. +wait_for_session({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_session, (StateData#state.manage_stream)(El, StateData)); wait_for_session({xmlstreamelement, El}, StateData) -> + NewStateData = update_num_stanzas_in(StateData, El), case jlib:iq_query_info(El) of #iq{type = set, xmlns = ?NS_SESSION} -> - U = StateData#state.user, - R = StateData#state.resource, - JID = StateData#state.jid, - case acl:match_rule(StateData#state.server, - StateData#state.access, JID) of + U = NewStateData#state.user, + R = NewStateData#state.resource, + JID = NewStateData#state.jid, + case acl:match_rule(NewStateData#state.server, + NewStateData#state.access, JID) of allow -> ?INFO_MSG("(~w) Opened session for ~s", - [StateData#state.socket, + [NewStateData#state.socket, jlib:jid_to_string(JID)]), Res = jlib:make_result_iq_reply(El#xmlel{children = []}), - send_element(StateData, Res), - change_shaper(StateData, JID), + NewState = send_stanza(NewStateData, Res), + change_shaper(NewState, JID), {Fs, Ts} = ejabberd_hooks:run_fold( roster_get_subscription_lists, - StateData#state.server, + NewState#state.server, {[], []}, - [U, StateData#state.server]), + [U, NewState#state.server]), LJID = jlib:jid_tolower(jlib:jid_remove_resource(JID)), Fs1 = [LJID | Fs], Ts1 = [LJID | Ts], PrivList = ejabberd_hooks:run_fold( - privacy_get_user_list, StateData#state.server, + privacy_get_user_list, NewState#state.server, #userlist{}, - [U, StateData#state.server]), + [U, NewState#state.server]), SID = {now(), self()}, - Conn = get_conn_type(StateData), - Info = [{ip, StateData#state.ip}, {conn, Conn}, - {auth_module, StateData#state.auth_module}], + Conn = get_conn_type(NewState), + Info = [{ip, NewState#state.ip}, {conn, Conn}, + {auth_module, NewState#state.auth_module}], ejabberd_sm:open_session( - SID, U, StateData#state.server, R, Info), - NewStateData = - StateData#state{ + SID, U, NewState#state.server, R, Info), + UpdatedStateData = + NewState#state{ sid = SID, conn = Conn, pres_f = ?SETS:from_list(Fs1), pres_t = ?SETS:from_list(Ts1), privacy_list = PrivList}, fsm_next_state_pack(session_established, - NewStateData); + UpdatedStateData); _ -> ejabberd_hooks:run(forbidden_session_hook, - StateData#state.server, [JID]), + NewStateData#state.server, [JID]), ?INFO_MSG("(~w) Forbidden session for ~s", - [StateData#state.socket, + [NewStateData#state.socket, jlib:jid_to_string(JID)]), Err = jlib:make_error_reply(El, ?ERR_NOT_ALLOWED), - send_element(StateData, Err), - fsm_next_state(wait_for_session, StateData) + send_element(NewStateData, Err), + fsm_next_state(wait_for_session, NewStateData) end; _ -> - fsm_next_state(wait_for_session, StateData) + fsm_next_state(wait_for_session, NewStateData) end; wait_for_session(timeout, StateData) -> @@ -1042,6 +1117,9 @@ wait_for_session({xmlstreamerror, _}, StateData) -> wait_for_session(closed, StateData) -> {stop, normal, StateData}. +session_established({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(session_established, (StateData#state.manage_stream)(El, StateData)); session_established({xmlstreamelement, El}, StateData) -> FromJID = StateData#state.jid, @@ -1080,9 +1158,10 @@ session_established(closed, StateData) -> %% connection) session_established2(El, StateData) -> #xmlel{name = Name, attrs = Attrs} = El, - User = StateData#state.user, - Server = StateData#state.server, - FromJID = StateData#state.jid, + NewStateData = update_num_stanzas_in(StateData, El), + User = NewStateData#state.user, + Server = NewStateData#state.server, + FromJID = NewStateData#state.jid, To = xml:get_attr_s(<<"to">>, Attrs), ToJID = case To of <<"">> -> jlib:make_jid(User, Server, <<"">>); @@ -1091,7 +1170,7 @@ session_established2(El, StateData) -> NewEl1 = jlib:remove_attr(<<"xmlns">>, El), NewEl = case xml:get_attr_s(<<"xml:lang">>, Attrs) of <<"">> -> - case StateData#state.lang of + case NewStateData#state.lang of <<"">> -> NewEl1; Lang -> xml:replace_tag_attr(<<"xml:lang">>, Lang, NewEl1) @@ -1101,13 +1180,18 @@ session_established2(El, StateData) -> NewState = case ToJID of error -> case xml:get_attr_s(<<"type">>, Attrs) of - <<"error">> -> StateData; - <<"result">> -> StateData; + <<"error">> -> NewStateData; + <<"result">> -> NewStateData; _ -> Err = jlib:make_error_reply(NewEl, ?ERR_JID_MALFORMED), - send_element(StateData, Err), - StateData + case is_stanza(Err) of + true -> + send_stanza(NewStateData, Err); + false -> + send_element(NewStateData, Err), + NewStateData + end end; _ -> case Name of @@ -1122,12 +1206,12 @@ session_established2(El, StateData) -> #jid{user = User, server = Server, resource = <<"">>} -> ?DEBUG("presence_update(~p,~n\t~p,~n\t~p)", - [FromJID, PresenceEl, StateData]), + [FromJID, PresenceEl, NewStateData]), presence_update(FromJID, PresenceEl, - StateData); + NewStateData); _ -> presence_track(FromJID, ToJID, PresenceEl, - StateData) + NewStateData) end; <<"iq">> -> case jlib:iq_query_info(NewEl) of @@ -1135,21 +1219,21 @@ session_established2(El, StateData) -> when Xmlns == (?NS_PRIVACY); Xmlns == (?NS_BLOCKING) -> process_privacy_iq(FromJID, ToJID, IQ, - StateData); + NewStateData); _ -> ejabberd_hooks:run(user_send_packet, Server, [FromJID, ToJID, NewEl]), - check_privacy_route(FromJID, StateData, + check_privacy_route(FromJID, NewStateData, FromJID, ToJID, NewEl), - StateData + NewStateData end; <<"message">> -> ejabberd_hooks:run(user_send_packet, Server, [FromJID, ToJID, NewEl]), - check_privacy_route(FromJID, StateData, FromJID, + check_privacy_route(FromJID, NewStateData, FromJID, ToJID, NewEl), - StateData; - _ -> StateData + NewStateData; + _ -> NewStateData end end, ejabberd_hooks:run(c2s_loop_debug, @@ -1263,13 +1347,13 @@ handle_info({route, _From, _To, {broadcast, Data}}, jlib:jid_remove_resource(StateData#state.jid), StateData#state.jid, jlib:iq_to_xml(PrivPushIQ)), - send_element(StateData, PrivPushEl), + NewState = send_stanza(StateData, PrivPushEl), fsm_next_state(StateName, - StateData#state{privacy_list = NewPL}) + NewState#state{privacy_list = NewPL}) end; {blocking, What} -> - route_blocking(What, StateData), - fsm_next_state(StateName, StateData); + NewState = route_blocking(What, StateData), + fsm_next_state(StateName, NewState); _ -> fsm_next_state(StateName, StateData) end; @@ -1515,12 +1599,12 @@ handle_info({route, From, To, jlib:replace_from_to_attrs(jlib:jid_to_string(From), jlib:jid_to_string(To), NewAttrs), FixedPacket = #xmlel{name = Name, attrs = Attrs2, children = Els}, - send_element(StateData, FixedPacket), + SentStateData = send_stanza(StateData, FixedPacket), ejabberd_hooks:run(user_receive_packet, - StateData#state.server, - [StateData#state.jid, From, To, FixedPacket]), + SentStateData#state.server, + [SentStateData#state.jid, From, To, FixedPacket]), ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), - fsm_next_state(StateName, NewState); + fsm_next_state(StateName, SentStateData); true -> ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), fsm_next_state(StateName, NewState) @@ -1643,6 +1727,7 @@ terminate(_Reason, StateName, StateData) -> StateData#state.pres_i, Packet) end end, + resend_unacked_stanzas(StateData), bounce_messages(); _ -> ok @@ -1674,6 +1759,13 @@ send_element(StateData, El) when StateData#state.xml_socket -> send_element(StateData, El) -> send_text(StateData, xml:element_to_binary(El)). +send_stanza(StateData, Stanza) when StateData#state.sm_xmlns /= undefined -> + send_stanza_and_ack_req(StateData, Stanza), + ack_queue_add(StateData, Stanza); +send_stanza(StateData, Stanza) -> + send_element(StateData, Stanza), + StateData. + send_header(StateData, Server, Version, Lang) when StateData#state.xml_socket -> VersionAttr = case Version of @@ -1727,6 +1819,19 @@ is_auth_packet(El) -> _ -> false end. +is_stanza(#xmlel{name = Name, attrs = Attrs}) when Name == <<"message">>; + Name == <<"presence">>; + Name == <<"iq">> -> + case xml:get_attr(<<"xmlns">>, Attrs) of + {value, NS} when NS /= <<"jabber:client">>, + NS /= <<"jabber:server">> -> + false; + _ -> + true + end; +is_stanza(_El) -> + false. + get_auth_tags([#xmlel{name = Name, children = Els} | L], U, P, D, R) -> CData = xml:get_cdata(Els), @@ -1854,12 +1959,12 @@ presence_update(From, Packet, StateData) -> ejabberd_hooks:run(user_available_hook, NewStateData#state.server, [NewStateData#state.jid]), - if NewPriority >= 0 -> - resend_offline_messages(NewStateData), - resend_subscription_requests(NewStateData); - true -> ok - end, - presence_broadcast_first(From, NewStateData, + ResentStateData = if NewPriority >= 0 -> + resend_offline_messages(NewStateData), + resend_subscription_requests(NewStateData); + true -> NewStateData + end, + presence_broadcast_first(From, ResentStateData, Packet); true -> presence_broadcast_to_trusted(NewStateData, From, @@ -2173,10 +2278,11 @@ resend_subscription_requests(#state{user = User, PendingSubscriptions = ejabberd_hooks:run_fold(resend_subscription_requests_hook, Server, [], [User, Server]), - lists:foreach(fun (XMLPacket) -> - send_element(StateData, XMLPacket) - end, - PendingSubscriptions). + lists:foldl(fun (XMLPacket, AccStateData) -> + send_stanza(AccStateData, XMLPacket) + end, + StateData, + PendingSubscriptions). get_showtag(undefined) -> <<"unavailable">>; get_showtag(Presence) -> @@ -2352,10 +2458,185 @@ route_blocking(What, StateData) -> PrivPushEl = jlib:replace_from_to(jlib:jid_remove_resource(StateData#state.jid), StateData#state.jid, jlib:iq_to_xml(PrivPushIQ)), - send_element(StateData, PrivPushEl), %% No need to replace active privacy list here, %% blocking pushes are always accompanied by %% Privacy List pushes + send_stanza(StateData, PrivPushEl). + +%%%---------------------------------------------------------------------- +%%% XEP-0198 +%%%---------------------------------------------------------------------- + +stream_mgmt_enabled(#state{ack_queue = none}) -> + false; +stream_mgmt_enabled(_StateData) -> + true. + +negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) -> + %% XEP-0198 says: "For client-to-server connections, the client MUST NOT + %% attempt to enable stream management until after it has completed Resource + %% Binding". However, it also says: "Stream management errors SHOULD be + %% considered recoverable", so we won't bail out. + send_element(StateData, ?SM_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)), + StateData; +negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> + case xml:get_attr_s(<<"xmlns">>, Attrs) of + Xmlns when ?IS_SUPPORTED_SM_XMLNS(Xmlns) -> + case stream_mgmt_enabled(StateData) of + true -> + case Name of + <<"enable">> -> + handle_enable(StateData#state{sm_xmlns = Xmlns}); + _ -> + Res = if Name == <<"a">>; + Name == <<"r">> -> + ?SM_UNEXPECTED_REQUEST(Xmlns); + true -> + ?SM_BAD_REQUEST(Xmlns) + end, + send_element(StateData, Res), + StateData + end; + false -> + send_element(StateData, ?SM_SERVICE_UNAVAILABLE(Xmlns)), + StateData + end; + _ -> + send_element(StateData, ?SM_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3)), + StateData + end. + +perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> + case xml:get_attr_s(<<"xmlns">>, Attrs) of + Xmlns when Xmlns == StateData#state.sm_xmlns -> + case Name of + <<"r">> -> + handle_r(StateData); + <<"a">> -> + handle_a(StateData, Attrs); + _ -> + Res = if Name == <<"enable">> -> + ?SM_UNEXPECTED_REQUEST(Xmlns); + true -> + ?SM_BAD_REQUEST(Xmlns) + end, + send_element(StateData, Res), + StateData + end; + _ -> + send_element(StateData, + ?SM_UNSUPPORTED_VERSION(StateData#state.sm_xmlns)), + StateData + end. + +handle_enable(StateData) -> + ?INFO_MSG("Enabling XEP-0198 stream management for ~s", + [jlib:jid_to_string(StateData#state.jid)]), + Res = #xmlel{name = <<"enabled">>, + attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}], + children = []}, + send_element(StateData, Res), + StateData#state{ack_queue = queue:new(), + manage_stream = fun perform_stream_mgmt/2}. + +handle_r(StateData) -> + H = jlib:integer_to_binary(StateData#state.n_stanzas_in), + Res = #xmlel{name = <<"a">>, + attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}, + {<<"h">>, H}], + children = []}, + send_element(StateData, Res), + StateData. + +handle_a(#state{jid = JID, n_stanzas_out = NumStanzasOut} = StateData, Attrs) -> + case catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs)) of + H when is_integer(H), H > NumStanzasOut -> + ?WARNING_MSG("~s acknowledged ~B stanzas, but only ~B were sent", + [jlib:jid_to_string(JID), H, NumStanzasOut]), + ack_queue_drop(StateData, NumStanzasOut); + H when is_integer(H), H >= 0 -> + ?DEBUG("~s acknowledged ~B of ~B stanzas", + [jlib:jid_to_string(JID), H, NumStanzasOut]), + ack_queue_drop(StateData, H); + _ -> + ?WARNING_MSG("Ignoring invalid ACK element from ~s", + [jlib:jid_to_string(JID)]), + StateData + end. + +update_num_stanzas_in(StateData, El) when StateData#state.sm_xmlns /= undefined -> + NewNum = case {is_stanza(El), StateData#state.n_stanzas_in} of + {true, 4294967295} -> + 0; + {true, Num} -> + Num + 1; + {false, Num} -> + Num + end, + StateData#state{n_stanzas_in = NewNum}; +update_num_stanzas_in(StateData, _El) -> + StateData. + +send_stanza_and_ack_req(StateData, Stanza) -> + AckReq = #xmlel{name = <<"r">>, + attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}], + children = []}, + StanzaS = xml:element_to_binary(Stanza), + AckReqS = xml:element_to_binary(AckReq), + send_text(StateData, [StanzaS, AckReqS]). + +ack_queue_add(StateData, El) -> + NewNum = case StateData#state.n_stanzas_out of + 4294967295 -> + 0; + Num -> + Num + 1 + end, + NewState = limit_queue_length(StateData), + NewQueue = queue:in({NewNum, El}, NewState#state.ack_queue), + NewState#state{ack_queue = NewQueue, n_stanzas_out = NewNum}. + +ack_queue_drop(StateData, NumHandled) -> + NewQueue = jlib:queue_drop_while(fun({N, _Stanza}) -> N =< NumHandled end, + StateData#state.ack_queue), + StateData#state{ack_queue = NewQueue}. + +limit_queue_length(#state{max_ack_queue = Limit} = StateData) + when Limit == infinity; + Limit == unlimited -> + StateData; +limit_queue_length(#state{jid = JID, + ack_queue = Queue, + max_ack_queue = Limit} = StateData) -> + case queue:len(Queue) >= Limit of + true -> + ?WARNING_MSG("Dropping stanza from too long ACK queue for ~s", + [jlib:jid_to_string(JID)]), + limit_queue_length(StateData#state{ack_queue = queue:drop(Queue)}); + false -> + StateData + end. + +resend_unacked_stanzas(StateData) when StateData#state.sm_xmlns /= undefined -> + Queue = StateData#state.ack_queue, + case queue:len(Queue) of + 0 -> + ok; + N -> + ?INFO_MSG("Resending ~B unacknowledged stanzas to ~s", + [N, jlib:jid_to_string(StateData#state.jid)]), + lists:foreach( + fun({Num, #xmlel{attrs = Attrs} = El}) -> + From_s = xml:get_attr_s(<<"from">>, Attrs), + From = jlib:string_to_jid(From_s), + To_s = xml:get_attr_s(<<"to">>, Attrs), + To = jlib:string_to_jid(To_s), + ?DEBUG("Resending unacknowledged stanza #~B from ~s to ~s", + [Num, From_s, To_s]), + ejabberd_router:route(From, To, El) + end, queue:to_list(Queue)) + end; +resend_unacked_stanzas(_StateData) -> ok. %%%---------------------------------------------------------------------- diff --git a/src/jlib.erl b/src/jlib.erl index 46e864b0c..a362697f4 100644 --- a/src/jlib.erl +++ b/src/jlib.erl @@ -51,7 +51,7 @@ binary_to_integer/1, binary_to_integer/2, integer_to_binary/1, integer_to_binary/2, atom_to_binary/1, binary_to_atom/1, tuple_to_binary/1, - l2i/1, i2l/1, i2l/2]). + l2i/1, i2l/1, i2l/2, queue_drop_while/2]). %% TODO: Remove once XEP-0091 is Obsolete %% TODO: Remove once XEP-0091 is Obsolete @@ -894,3 +894,18 @@ i2l(L, N) when is_binary(L) -> C when C > N -> L; _ -> i2l(<<$0, L/binary>>, N) end. + +-spec queue_drop_while(fun((term()) -> boolean()), queue()) -> queue(). + +queue_drop_while(F, Q) -> + case queue:peek(Q) of + {value, Item} -> + case F(Item) of + true -> + queue_drop_while(F, queue:drop(Q)); + _ -> + Q + end; + empty -> + Q + end.