diff --git a/doc/guide.tex b/doc/guide.tex index 5d5bf2fdc..59f39e256 100644 --- a/doc/guide.tex +++ b/doc/guide.tex @@ -870,9 +870,10 @@ The available modules, their purpose and the options allowed by each one are: \titem{\texttt{ejabberd\_c2s}} Handles c2s connections.\\ Options: \texttt{access}, \texttt{certfile}, \texttt{ciphers}, - \texttt{max\_fsm\_queue}, + \texttt{max\_ack\_queue}, \texttt{max\_fsm\_queue}, \texttt{max\_stanza\_size}, \texttt{shaper}, - \texttt{starttls}, \texttt{starttls\_required}, \texttt{tls}, + \texttt{starttls}, \texttt{starttls\_required}, + \texttt{stream\_management}, \texttt{tls}, \texttt{zlib}, \texttt{tls\_compression} \titem{\texttt{ejabberd\_s2s\_in}} Handles incoming s2s connections.\\ @@ -962,6 +963,13 @@ This is a detailed description of each option allowed by the listening modules: \term{http\_poll\_timeout}. The default value is five minutes. The option can be defined in \term{ejabberd.yml}, expressing the time in seconds: \verb|{http_poll_timeout, 300}.| + \titem{max\_ack\_queue: Size} + This option specifies the maximum number of unacknowledged stanzas + queued for possible retransmission if \term{stream\_management} is + enabled. When the limit is reached, the first stanza is dropped from + the queue before adding the next one. This option can be specified + for \term{ejabberd\_c2s} listeners. The allowed values are positive + integers and \term{infinity}. Default value: \term{500}. \titem{max\_fsm\_queue: Size} This option specifies the maximum number of elements in the queue of the FSM (Finite State Machine). @@ -1022,6 +1030,10 @@ request_handlers: No unencrypted connections will be allowed. You should also set the \option{certfile} option. You can define a certificate file for a specific domain using the global option \option{domain\_certfile}. + \titem{stream\_management: true|false} + Setting this option to \term{false} disables ejabberd's support for + \ind{protocols!XEP-0198: Stream Management}. It can be specified for + \term{ejabberd\_c2s} listeners. The default value is \term{true}. \titem{timeout: Integer} \ind{options!timeout} Timeout of the connections, expressed in milliseconds. Default: 5000 diff --git a/include/ns.hrl b/include/ns.hrl index 6d041a478..2cf218c7c 100644 --- a/include/ns.hrl +++ b/include/ns.hrl @@ -144,3 +144,5 @@ -define(NS_MEDIA, <<"urn:xmpp:media-element">>). -define(NS_BOB, <<"urn:xmpp:bob">>). -define(NS_PING, <<"urn:xmpp:ping">>). +-define(NS_STREAM_MGMT_2, <<"urn:xmpp:sm:2">>). +-define(NS_STREAM_MGMT_3, <<"urn:xmpp:sm:3">>). diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index ae5fc97b8..044afd0f4 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -108,6 +108,12 @@ auth_module = unknown, ip, aux_fields = [], + sm_xmlns, + ack_queue, + max_ack_queue, + manage_stream = fun negotiate_stream_mgmt/2, + n_stanzas_in = 0, + n_stanzas_out = 0, lang}). %-define(DBGFSM, true). @@ -156,6 +162,35 @@ -define(INVALID_FROM, ?SERR_INVALID_FROM). +%% XEP-0198: + +-define(IS_STREAM_MGMT_TAG(Name), + Name == <<"enable">>; + Name == <<"a">>; + Name == <<"r">>). + +-define(IS_SUPPORTED_SM_XMLNS(Xmlns), + Xmlns == ?NS_STREAM_MGMT_2; + Xmlns == ?NS_STREAM_MGMT_3). + +-define(SM_FAILED(Condition, Xmlns), + #xmlel{name = <<"failed">>, + attrs = [{<<"xmlns">>, Xmlns}], + children = [#xmlel{name = Condition, + attrs = [{<<"xmlns">>, ?NS_STANZAS}], + children = []}]}). + +-define(SM_BAD_REQUEST(Xmlns), + ?SM_FAILED(<<"bad-request">>, Xmlns)). + +-define(SM_SERVICE_UNAVAILABLE(Xmlns), + ?SM_FAILED(<<"service-unavailable">>, Xmlns)). + +-define(SM_UNEXPECTED_REQUEST(Xmlns), + ?SM_FAILED(<<"unexpected-request">>, Xmlns)). + +-define(SM_UNSUPPORTED_VERSION(Xmlns), + ?SM_FAILED(<<"unsupported-version">>, Xmlns)). %%%---------------------------------------------------------------------- %%% API @@ -250,6 +285,16 @@ init([{SockMod, Socket}, Opts]) -> true -> TLSOpts1 end, TLSOpts = [verify_none | TLSOpts2], + MaxAckQueue = case proplists:get_value(max_ack_queue, Opts) of + Limit when is_integer(Limit), Limit > 0 -> Limit; + _ -> 500 + end, + StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true), + AckQueue = if not StreamMgmtEnabled -> + none; + true -> + undefined + end, IP = peerip(SockMod, Socket), %% Check if IP is blacklisted: case is_ip_blacklisted(IP) of @@ -271,7 +316,8 @@ init([{SockMod, Socket}, Opts]) -> tls_required = StartTLSRequired, tls_enabled = TLSEnabled, tls_options = TLSOpts, streamid = new_id(), access = Access, - shaper = Shaper, ip = IP}, + shaper = Shaper, ip = IP, + ack_queue = AckQueue, max_ack_queue = MaxAckQueue}, {ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT} end. @@ -402,6 +448,18 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) -> ejabberd_hooks:run_fold(roster_get_versioning_feature, Server, [], [Server]), + StreamManagementFeature = + case stream_mgmt_enabled(StateData) of + true -> + [#xmlel{name = <<"sm">>, + attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_2}], + children = []}, + #xmlel{name = <<"sm">>, + attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_3}], + children = []}]; + false -> + [] + end, StreamFeatures = [#xmlel{name = <<"bind">>, attrs = [{<<"xmlns">>, ?NS_BIND}], children = []}, @@ -410,6 +468,7 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) -> children = []}] ++ RosterVersioningFeature ++ + StreamManagementFeature ++ ejabberd_hooks:run_fold(c2s_stream_features, Server, [], [Server]), send_element(StateData, @@ -472,6 +531,9 @@ wait_for_stream({xmlstreamerror, _}, StateData) -> wait_for_stream(closed, StateData) -> {stop, normal, StateData}. +wait_for_auth({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_auth, (StateData#state.manage_stream)(El, StateData)); wait_for_auth({xmlstreamelement, El}, StateData) -> case is_auth_packet(El) of {auth, _ID, get, {U, _, _, _}} -> @@ -618,6 +680,9 @@ wait_for_auth({xmlstreamerror, _}, StateData) -> wait_for_auth(closed, StateData) -> {stop, normal, StateData}. +wait_for_feature_request({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_feature_request, (StateData#state.manage_stream)(El, StateData)); wait_for_feature_request({xmlstreamelement, El}, StateData) -> #xmlel{name = Name, attrs = Attrs, children = Els} = El, @@ -783,6 +848,9 @@ wait_for_feature_request({xmlstreamerror, _}, wait_for_feature_request(closed, StateData) -> {stop, normal, StateData}. +wait_for_sasl_response({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_sasl_response, (StateData#state.manage_stream)(El, StateData)); wait_for_sasl_response({xmlstreamelement, El}, StateData) -> #xmlel{name = Name, attrs = Attrs, children = Els} = El, @@ -913,6 +981,9 @@ resource_conflict_action(U, S, R) -> {accept_resource, Rnew} end. +wait_for_bind({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_bind, (StateData#state.manage_stream)(El, StateData)); wait_for_bind({xmlstreamelement, El}, StateData) -> case jlib:iq_query_info(El) of #iq{type = set, xmlns = ?NS_BIND, sub_el = SubEl} = @@ -974,61 +1045,65 @@ wait_for_bind({xmlstreamerror, _}, StateData) -> wait_for_bind(closed, StateData) -> {stop, normal, StateData}. +wait_for_session({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_session, (StateData#state.manage_stream)(El, StateData)); wait_for_session({xmlstreamelement, El}, StateData) -> + NewStateData = update_num_stanzas_in(StateData, El), case jlib:iq_query_info(El) of #iq{type = set, xmlns = ?NS_SESSION} -> - U = StateData#state.user, - R = StateData#state.resource, - JID = StateData#state.jid, - case acl:match_rule(StateData#state.server, - StateData#state.access, JID) of + U = NewStateData#state.user, + R = NewStateData#state.resource, + JID = NewStateData#state.jid, + case acl:match_rule(NewStateData#state.server, + NewStateData#state.access, JID) of allow -> ?INFO_MSG("(~w) Opened session for ~s", - [StateData#state.socket, + [NewStateData#state.socket, jlib:jid_to_string(JID)]), Res = jlib:make_result_iq_reply(El#xmlel{children = []}), - send_element(StateData, Res), - change_shaper(StateData, JID), + NewState = send_stanza(NewStateData, Res), + change_shaper(NewState, JID), {Fs, Ts} = ejabberd_hooks:run_fold( roster_get_subscription_lists, - StateData#state.server, + NewState#state.server, {[], []}, - [U, StateData#state.server]), + [U, NewState#state.server]), LJID = jlib:jid_tolower(jlib:jid_remove_resource(JID)), Fs1 = [LJID | Fs], Ts1 = [LJID | Ts], PrivList = ejabberd_hooks:run_fold( - privacy_get_user_list, StateData#state.server, + privacy_get_user_list, NewState#state.server, #userlist{}, - [U, StateData#state.server]), + [U, NewState#state.server]), SID = {now(), self()}, - Conn = get_conn_type(StateData), - Info = [{ip, StateData#state.ip}, {conn, Conn}, - {auth_module, StateData#state.auth_module}], + Conn = get_conn_type(NewState), + Info = [{ip, NewState#state.ip}, {conn, Conn}, + {auth_module, NewState#state.auth_module}], ejabberd_sm:open_session( - SID, U, StateData#state.server, R, Info), - NewStateData = - StateData#state{ + SID, U, NewState#state.server, R, Info), + UpdatedStateData = + NewState#state{ sid = SID, conn = Conn, pres_f = ?SETS:from_list(Fs1), pres_t = ?SETS:from_list(Ts1), privacy_list = PrivList}, fsm_next_state_pack(session_established, - NewStateData); + UpdatedStateData); _ -> ejabberd_hooks:run(forbidden_session_hook, - StateData#state.server, [JID]), + NewStateData#state.server, [JID]), ?INFO_MSG("(~w) Forbidden session for ~s", - [StateData#state.socket, + [NewStateData#state.socket, jlib:jid_to_string(JID)]), Err = jlib:make_error_reply(El, ?ERR_NOT_ALLOWED), - send_element(StateData, Err), - fsm_next_state(wait_for_session, StateData) + send_element(NewStateData, Err), + fsm_next_state(wait_for_session, NewStateData) end; _ -> - fsm_next_state(wait_for_session, StateData) + fsm_next_state(wait_for_session, NewStateData) end; wait_for_session(timeout, StateData) -> @@ -1042,6 +1117,9 @@ wait_for_session({xmlstreamerror, _}, StateData) -> wait_for_session(closed, StateData) -> {stop, normal, StateData}. +session_established({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(session_established, (StateData#state.manage_stream)(El, StateData)); session_established({xmlstreamelement, El}, StateData) -> FromJID = StateData#state.jid, @@ -1080,9 +1158,10 @@ session_established(closed, StateData) -> %% connection) session_established2(El, StateData) -> #xmlel{name = Name, attrs = Attrs} = El, - User = StateData#state.user, - Server = StateData#state.server, - FromJID = StateData#state.jid, + NewStateData = update_num_stanzas_in(StateData, El), + User = NewStateData#state.user, + Server = NewStateData#state.server, + FromJID = NewStateData#state.jid, To = xml:get_attr_s(<<"to">>, Attrs), ToJID = case To of <<"">> -> jlib:make_jid(User, Server, <<"">>); @@ -1091,7 +1170,7 @@ session_established2(El, StateData) -> NewEl1 = jlib:remove_attr(<<"xmlns">>, El), NewEl = case xml:get_attr_s(<<"xml:lang">>, Attrs) of <<"">> -> - case StateData#state.lang of + case NewStateData#state.lang of <<"">> -> NewEl1; Lang -> xml:replace_tag_attr(<<"xml:lang">>, Lang, NewEl1) @@ -1101,13 +1180,18 @@ session_established2(El, StateData) -> NewState = case ToJID of error -> case xml:get_attr_s(<<"type">>, Attrs) of - <<"error">> -> StateData; - <<"result">> -> StateData; + <<"error">> -> NewStateData; + <<"result">> -> NewStateData; _ -> Err = jlib:make_error_reply(NewEl, ?ERR_JID_MALFORMED), - send_element(StateData, Err), - StateData + case is_stanza(Err) of + true -> + send_stanza(NewStateData, Err); + false -> + send_element(NewStateData, Err), + NewStateData + end end; _ -> case Name of @@ -1122,12 +1206,12 @@ session_established2(El, StateData) -> #jid{user = User, server = Server, resource = <<"">>} -> ?DEBUG("presence_update(~p,~n\t~p,~n\t~p)", - [FromJID, PresenceEl, StateData]), + [FromJID, PresenceEl, NewStateData]), presence_update(FromJID, PresenceEl, - StateData); + NewStateData); _ -> presence_track(FromJID, ToJID, PresenceEl, - StateData) + NewStateData) end; <<"iq">> -> case jlib:iq_query_info(NewEl) of @@ -1135,21 +1219,21 @@ session_established2(El, StateData) -> when Xmlns == (?NS_PRIVACY); Xmlns == (?NS_BLOCKING) -> process_privacy_iq(FromJID, ToJID, IQ, - StateData); + NewStateData); _ -> ejabberd_hooks:run(user_send_packet, Server, [FromJID, ToJID, NewEl]), - check_privacy_route(FromJID, StateData, + check_privacy_route(FromJID, NewStateData, FromJID, ToJID, NewEl), - StateData + NewStateData end; <<"message">> -> ejabberd_hooks:run(user_send_packet, Server, [FromJID, ToJID, NewEl]), - check_privacy_route(FromJID, StateData, FromJID, + check_privacy_route(FromJID, NewStateData, FromJID, ToJID, NewEl), - StateData; - _ -> StateData + NewStateData; + _ -> NewStateData end end, ejabberd_hooks:run(c2s_loop_debug, @@ -1263,13 +1347,13 @@ handle_info({route, _From, _To, {broadcast, Data}}, jlib:jid_remove_resource(StateData#state.jid), StateData#state.jid, jlib:iq_to_xml(PrivPushIQ)), - send_element(StateData, PrivPushEl), + NewState = send_stanza(StateData, PrivPushEl), fsm_next_state(StateName, - StateData#state{privacy_list = NewPL}) + NewState#state{privacy_list = NewPL}) end; {blocking, What} -> - route_blocking(What, StateData), - fsm_next_state(StateName, StateData); + NewState = route_blocking(What, StateData), + fsm_next_state(StateName, NewState); _ -> fsm_next_state(StateName, StateData) end; @@ -1515,12 +1599,12 @@ handle_info({route, From, To, jlib:replace_from_to_attrs(jlib:jid_to_string(From), jlib:jid_to_string(To), NewAttrs), FixedPacket = #xmlel{name = Name, attrs = Attrs2, children = Els}, - send_element(StateData, FixedPacket), + SentStateData = send_stanza(StateData, FixedPacket), ejabberd_hooks:run(user_receive_packet, - StateData#state.server, - [StateData#state.jid, From, To, FixedPacket]), + SentStateData#state.server, + [SentStateData#state.jid, From, To, FixedPacket]), ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), - fsm_next_state(StateName, NewState); + fsm_next_state(StateName, SentStateData); true -> ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), fsm_next_state(StateName, NewState) @@ -1643,6 +1727,7 @@ terminate(_Reason, StateName, StateData) -> StateData#state.pres_i, Packet) end end, + resend_unacked_stanzas(StateData), bounce_messages(); _ -> ok @@ -1674,6 +1759,13 @@ send_element(StateData, El) when StateData#state.xml_socket -> send_element(StateData, El) -> send_text(StateData, xml:element_to_binary(El)). +send_stanza(StateData, Stanza) when StateData#state.sm_xmlns /= undefined -> + send_stanza_and_ack_req(StateData, Stanza), + ack_queue_add(StateData, Stanza); +send_stanza(StateData, Stanza) -> + send_element(StateData, Stanza), + StateData. + send_header(StateData, Server, Version, Lang) when StateData#state.xml_socket -> VersionAttr = case Version of @@ -1727,6 +1819,19 @@ is_auth_packet(El) -> _ -> false end. +is_stanza(#xmlel{name = Name, attrs = Attrs}) when Name == <<"message">>; + Name == <<"presence">>; + Name == <<"iq">> -> + case xml:get_attr(<<"xmlns">>, Attrs) of + {value, NS} when NS /= <<"jabber:client">>, + NS /= <<"jabber:server">> -> + false; + _ -> + true + end; +is_stanza(_El) -> + false. + get_auth_tags([#xmlel{name = Name, children = Els} | L], U, P, D, R) -> CData = xml:get_cdata(Els), @@ -1854,12 +1959,12 @@ presence_update(From, Packet, StateData) -> ejabberd_hooks:run(user_available_hook, NewStateData#state.server, [NewStateData#state.jid]), - if NewPriority >= 0 -> - resend_offline_messages(NewStateData), - resend_subscription_requests(NewStateData); - true -> ok - end, - presence_broadcast_first(From, NewStateData, + ResentStateData = if NewPriority >= 0 -> + resend_offline_messages(NewStateData), + resend_subscription_requests(NewStateData); + true -> NewStateData + end, + presence_broadcast_first(From, ResentStateData, Packet); true -> presence_broadcast_to_trusted(NewStateData, From, @@ -2173,10 +2278,11 @@ resend_subscription_requests(#state{user = User, PendingSubscriptions = ejabberd_hooks:run_fold(resend_subscription_requests_hook, Server, [], [User, Server]), - lists:foreach(fun (XMLPacket) -> - send_element(StateData, XMLPacket) - end, - PendingSubscriptions). + lists:foldl(fun (XMLPacket, AccStateData) -> + send_stanza(AccStateData, XMLPacket) + end, + StateData, + PendingSubscriptions). get_showtag(undefined) -> <<"unavailable">>; get_showtag(Presence) -> @@ -2352,10 +2458,185 @@ route_blocking(What, StateData) -> PrivPushEl = jlib:replace_from_to(jlib:jid_remove_resource(StateData#state.jid), StateData#state.jid, jlib:iq_to_xml(PrivPushIQ)), - send_element(StateData, PrivPushEl), %% No need to replace active privacy list here, %% blocking pushes are always accompanied by %% Privacy List pushes + send_stanza(StateData, PrivPushEl). + +%%%---------------------------------------------------------------------- +%%% XEP-0198 +%%%---------------------------------------------------------------------- + +stream_mgmt_enabled(#state{ack_queue = none}) -> + false; +stream_mgmt_enabled(_StateData) -> + true. + +negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) -> + %% XEP-0198 says: "For client-to-server connections, the client MUST NOT + %% attempt to enable stream management until after it has completed Resource + %% Binding". However, it also says: "Stream management errors SHOULD be + %% considered recoverable", so we won't bail out. + send_element(StateData, ?SM_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)), + StateData; +negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> + case xml:get_attr_s(<<"xmlns">>, Attrs) of + Xmlns when ?IS_SUPPORTED_SM_XMLNS(Xmlns) -> + case stream_mgmt_enabled(StateData) of + true -> + case Name of + <<"enable">> -> + handle_enable(StateData#state{sm_xmlns = Xmlns}); + _ -> + Res = if Name == <<"a">>; + Name == <<"r">> -> + ?SM_UNEXPECTED_REQUEST(Xmlns); + true -> + ?SM_BAD_REQUEST(Xmlns) + end, + send_element(StateData, Res), + StateData + end; + false -> + send_element(StateData, ?SM_SERVICE_UNAVAILABLE(Xmlns)), + StateData + end; + _ -> + send_element(StateData, ?SM_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3)), + StateData + end. + +perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> + case xml:get_attr_s(<<"xmlns">>, Attrs) of + Xmlns when Xmlns == StateData#state.sm_xmlns -> + case Name of + <<"r">> -> + handle_r(StateData); + <<"a">> -> + handle_a(StateData, Attrs); + _ -> + Res = if Name == <<"enable">> -> + ?SM_UNEXPECTED_REQUEST(Xmlns); + true -> + ?SM_BAD_REQUEST(Xmlns) + end, + send_element(StateData, Res), + StateData + end; + _ -> + send_element(StateData, + ?SM_UNSUPPORTED_VERSION(StateData#state.sm_xmlns)), + StateData + end. + +handle_enable(StateData) -> + ?INFO_MSG("Enabling XEP-0198 stream management for ~s", + [jlib:jid_to_string(StateData#state.jid)]), + Res = #xmlel{name = <<"enabled">>, + attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}], + children = []}, + send_element(StateData, Res), + StateData#state{ack_queue = queue:new(), + manage_stream = fun perform_stream_mgmt/2}. + +handle_r(StateData) -> + H = jlib:integer_to_binary(StateData#state.n_stanzas_in), + Res = #xmlel{name = <<"a">>, + attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}, + {<<"h">>, H}], + children = []}, + send_element(StateData, Res), + StateData. + +handle_a(#state{jid = JID, n_stanzas_out = NumStanzasOut} = StateData, Attrs) -> + case catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs)) of + H when is_integer(H), H > NumStanzasOut -> + ?WARNING_MSG("~s acknowledged ~B stanzas, but only ~B were sent", + [jlib:jid_to_string(JID), H, NumStanzasOut]), + ack_queue_drop(StateData, NumStanzasOut); + H when is_integer(H), H >= 0 -> + ?DEBUG("~s acknowledged ~B of ~B stanzas", + [jlib:jid_to_string(JID), H, NumStanzasOut]), + ack_queue_drop(StateData, H); + _ -> + ?WARNING_MSG("Ignoring invalid ACK element from ~s", + [jlib:jid_to_string(JID)]), + StateData + end. + +update_num_stanzas_in(StateData, El) when StateData#state.sm_xmlns /= undefined -> + NewNum = case {is_stanza(El), StateData#state.n_stanzas_in} of + {true, 4294967295} -> + 0; + {true, Num} -> + Num + 1; + {false, Num} -> + Num + end, + StateData#state{n_stanzas_in = NewNum}; +update_num_stanzas_in(StateData, _El) -> + StateData. + +send_stanza_and_ack_req(StateData, Stanza) -> + AckReq = #xmlel{name = <<"r">>, + attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}], + children = []}, + StanzaS = xml:element_to_binary(Stanza), + AckReqS = xml:element_to_binary(AckReq), + send_text(StateData, [StanzaS, AckReqS]). + +ack_queue_add(StateData, El) -> + NewNum = case StateData#state.n_stanzas_out of + 4294967295 -> + 0; + Num -> + Num + 1 + end, + NewState = limit_queue_length(StateData), + NewQueue = queue:in({NewNum, El}, NewState#state.ack_queue), + NewState#state{ack_queue = NewQueue, n_stanzas_out = NewNum}. + +ack_queue_drop(StateData, NumHandled) -> + NewQueue = jlib:queue_drop_while(fun({N, _Stanza}) -> N =< NumHandled end, + StateData#state.ack_queue), + StateData#state{ack_queue = NewQueue}. + +limit_queue_length(#state{max_ack_queue = Limit} = StateData) + when Limit == infinity; + Limit == unlimited -> + StateData; +limit_queue_length(#state{jid = JID, + ack_queue = Queue, + max_ack_queue = Limit} = StateData) -> + case queue:len(Queue) >= Limit of + true -> + ?WARNING_MSG("Dropping stanza from too long ACK queue for ~s", + [jlib:jid_to_string(JID)]), + limit_queue_length(StateData#state{ack_queue = queue:drop(Queue)}); + false -> + StateData + end. + +resend_unacked_stanzas(StateData) when StateData#state.sm_xmlns /= undefined -> + Queue = StateData#state.ack_queue, + case queue:len(Queue) of + 0 -> + ok; + N -> + ?INFO_MSG("Resending ~B unacknowledged stanzas to ~s", + [N, jlib:jid_to_string(StateData#state.jid)]), + lists:foreach( + fun({Num, #xmlel{attrs = Attrs} = El}) -> + From_s = xml:get_attr_s(<<"from">>, Attrs), + From = jlib:string_to_jid(From_s), + To_s = xml:get_attr_s(<<"to">>, Attrs), + To = jlib:string_to_jid(To_s), + ?DEBUG("Resending unacknowledged stanza #~B from ~s to ~s", + [Num, From_s, To_s]), + ejabberd_router:route(From, To, El) + end, queue:to_list(Queue)) + end; +resend_unacked_stanzas(_StateData) -> ok. %%%---------------------------------------------------------------------- diff --git a/src/jlib.erl b/src/jlib.erl index 46e864b0c..a362697f4 100644 --- a/src/jlib.erl +++ b/src/jlib.erl @@ -51,7 +51,7 @@ binary_to_integer/1, binary_to_integer/2, integer_to_binary/1, integer_to_binary/2, atom_to_binary/1, binary_to_atom/1, tuple_to_binary/1, - l2i/1, i2l/1, i2l/2]). + l2i/1, i2l/1, i2l/2, queue_drop_while/2]). %% TODO: Remove once XEP-0091 is Obsolete %% TODO: Remove once XEP-0091 is Obsolete @@ -894,3 +894,18 @@ i2l(L, N) when is_binary(L) -> C when C > N -> L; _ -> i2l(<<$0, L/binary>>, N) end. + +-spec queue_drop_while(fun((term()) -> boolean()), queue()) -> queue(). + +queue_drop_while(F, Q) -> + case queue:peek(Q) of + {value, Item} -> + case F(Item) of + true -> + queue_drop_while(F, queue:drop(Q)); + _ -> + Q + end; + empty -> + Q + end.