From 7d594086c3601c3281473b4ca54714b714027019 Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Wed, 12 Mar 2014 23:34:14 +0100 Subject: [PATCH 01/11] Add initial XEP-0198 support (EJAB-532) Implement partial support for XEP-0198: Stream Management. After successful negotiation of this feature, the server requests an ACK for each stanza transmitted to the client and responds to ACK requests issued by the client. On session termination, the server re-routes any unacknowledged stanzas. The length of the pending queue can be limited by setting the "max_ack_queue" option to some integer value (default: 500). XEP-0198 support can be disabled entirely by setting the "stream_management" option to false (default: true). So far, stream management is implemented only for c2s connections, and the optional stream resumption feature also described in XEP-0198 is not (yet) supported. This addition was originally based on a patch provided by Magnus Henoch and updated by Grzegorz Grasza. Their code implements an early draft of XEP-0198 for some previous version of ejabberd. It has since been rewritten almost entirely. --- doc/guide.tex | 16 +- include/ns.hrl | 2 + src/ejabberd_c2s.erl | 403 ++++++++++++++++++++++++++++++++++++------- src/jlib.erl | 17 +- 4 files changed, 374 insertions(+), 64 deletions(-) diff --git a/doc/guide.tex b/doc/guide.tex index 5d5bf2fdc..59f39e256 100644 --- a/doc/guide.tex +++ b/doc/guide.tex @@ -870,9 +870,10 @@ The available modules, their purpose and the options allowed by each one are: \titem{\texttt{ejabberd\_c2s}} Handles c2s connections.\\ Options: \texttt{access}, \texttt{certfile}, \texttt{ciphers}, - \texttt{max\_fsm\_queue}, + \texttt{max\_ack\_queue}, \texttt{max\_fsm\_queue}, \texttt{max\_stanza\_size}, \texttt{shaper}, - \texttt{starttls}, \texttt{starttls\_required}, \texttt{tls}, + \texttt{starttls}, \texttt{starttls\_required}, + \texttt{stream\_management}, \texttt{tls}, \texttt{zlib}, \texttt{tls\_compression} \titem{\texttt{ejabberd\_s2s\_in}} Handles incoming s2s connections.\\ @@ -962,6 +963,13 @@ This is a detailed description of each option allowed by the listening modules: \term{http\_poll\_timeout}. The default value is five minutes. The option can be defined in \term{ejabberd.yml}, expressing the time in seconds: \verb|{http_poll_timeout, 300}.| + \titem{max\_ack\_queue: Size} + This option specifies the maximum number of unacknowledged stanzas + queued for possible retransmission if \term{stream\_management} is + enabled. When the limit is reached, the first stanza is dropped from + the queue before adding the next one. This option can be specified + for \term{ejabberd\_c2s} listeners. The allowed values are positive + integers and \term{infinity}. Default value: \term{500}. \titem{max\_fsm\_queue: Size} This option specifies the maximum number of elements in the queue of the FSM (Finite State Machine). @@ -1022,6 +1030,10 @@ request_handlers: No unencrypted connections will be allowed. You should also set the \option{certfile} option. You can define a certificate file for a specific domain using the global option \option{domain\_certfile}. + \titem{stream\_management: true|false} + Setting this option to \term{false} disables ejabberd's support for + \ind{protocols!XEP-0198: Stream Management}. It can be specified for + \term{ejabberd\_c2s} listeners. The default value is \term{true}. \titem{timeout: Integer} \ind{options!timeout} Timeout of the connections, expressed in milliseconds. Default: 5000 diff --git a/include/ns.hrl b/include/ns.hrl index 6d041a478..2cf218c7c 100644 --- a/include/ns.hrl +++ b/include/ns.hrl @@ -144,3 +144,5 @@ -define(NS_MEDIA, <<"urn:xmpp:media-element">>). -define(NS_BOB, <<"urn:xmpp:bob">>). -define(NS_PING, <<"urn:xmpp:ping">>). +-define(NS_STREAM_MGMT_2, <<"urn:xmpp:sm:2">>). +-define(NS_STREAM_MGMT_3, <<"urn:xmpp:sm:3">>). diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index ae5fc97b8..044afd0f4 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -108,6 +108,12 @@ auth_module = unknown, ip, aux_fields = [], + sm_xmlns, + ack_queue, + max_ack_queue, + manage_stream = fun negotiate_stream_mgmt/2, + n_stanzas_in = 0, + n_stanzas_out = 0, lang}). %-define(DBGFSM, true). @@ -156,6 +162,35 @@ -define(INVALID_FROM, ?SERR_INVALID_FROM). +%% XEP-0198: + +-define(IS_STREAM_MGMT_TAG(Name), + Name == <<"enable">>; + Name == <<"a">>; + Name == <<"r">>). + +-define(IS_SUPPORTED_SM_XMLNS(Xmlns), + Xmlns == ?NS_STREAM_MGMT_2; + Xmlns == ?NS_STREAM_MGMT_3). + +-define(SM_FAILED(Condition, Xmlns), + #xmlel{name = <<"failed">>, + attrs = [{<<"xmlns">>, Xmlns}], + children = [#xmlel{name = Condition, + attrs = [{<<"xmlns">>, ?NS_STANZAS}], + children = []}]}). + +-define(SM_BAD_REQUEST(Xmlns), + ?SM_FAILED(<<"bad-request">>, Xmlns)). + +-define(SM_SERVICE_UNAVAILABLE(Xmlns), + ?SM_FAILED(<<"service-unavailable">>, Xmlns)). + +-define(SM_UNEXPECTED_REQUEST(Xmlns), + ?SM_FAILED(<<"unexpected-request">>, Xmlns)). + +-define(SM_UNSUPPORTED_VERSION(Xmlns), + ?SM_FAILED(<<"unsupported-version">>, Xmlns)). %%%---------------------------------------------------------------------- %%% API @@ -250,6 +285,16 @@ init([{SockMod, Socket}, Opts]) -> true -> TLSOpts1 end, TLSOpts = [verify_none | TLSOpts2], + MaxAckQueue = case proplists:get_value(max_ack_queue, Opts) of + Limit when is_integer(Limit), Limit > 0 -> Limit; + _ -> 500 + end, + StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true), + AckQueue = if not StreamMgmtEnabled -> + none; + true -> + undefined + end, IP = peerip(SockMod, Socket), %% Check if IP is blacklisted: case is_ip_blacklisted(IP) of @@ -271,7 +316,8 @@ init([{SockMod, Socket}, Opts]) -> tls_required = StartTLSRequired, tls_enabled = TLSEnabled, tls_options = TLSOpts, streamid = new_id(), access = Access, - shaper = Shaper, ip = IP}, + shaper = Shaper, ip = IP, + ack_queue = AckQueue, max_ack_queue = MaxAckQueue}, {ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT} end. @@ -402,6 +448,18 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) -> ejabberd_hooks:run_fold(roster_get_versioning_feature, Server, [], [Server]), + StreamManagementFeature = + case stream_mgmt_enabled(StateData) of + true -> + [#xmlel{name = <<"sm">>, + attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_2}], + children = []}, + #xmlel{name = <<"sm">>, + attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_3}], + children = []}]; + false -> + [] + end, StreamFeatures = [#xmlel{name = <<"bind">>, attrs = [{<<"xmlns">>, ?NS_BIND}], children = []}, @@ -410,6 +468,7 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) -> children = []}] ++ RosterVersioningFeature ++ + StreamManagementFeature ++ ejabberd_hooks:run_fold(c2s_stream_features, Server, [], [Server]), send_element(StateData, @@ -472,6 +531,9 @@ wait_for_stream({xmlstreamerror, _}, StateData) -> wait_for_stream(closed, StateData) -> {stop, normal, StateData}. +wait_for_auth({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_auth, (StateData#state.manage_stream)(El, StateData)); wait_for_auth({xmlstreamelement, El}, StateData) -> case is_auth_packet(El) of {auth, _ID, get, {U, _, _, _}} -> @@ -618,6 +680,9 @@ wait_for_auth({xmlstreamerror, _}, StateData) -> wait_for_auth(closed, StateData) -> {stop, normal, StateData}. +wait_for_feature_request({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_feature_request, (StateData#state.manage_stream)(El, StateData)); wait_for_feature_request({xmlstreamelement, El}, StateData) -> #xmlel{name = Name, attrs = Attrs, children = Els} = El, @@ -783,6 +848,9 @@ wait_for_feature_request({xmlstreamerror, _}, wait_for_feature_request(closed, StateData) -> {stop, normal, StateData}. +wait_for_sasl_response({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_sasl_response, (StateData#state.manage_stream)(El, StateData)); wait_for_sasl_response({xmlstreamelement, El}, StateData) -> #xmlel{name = Name, attrs = Attrs, children = Els} = El, @@ -913,6 +981,9 @@ resource_conflict_action(U, S, R) -> {accept_resource, Rnew} end. +wait_for_bind({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_bind, (StateData#state.manage_stream)(El, StateData)); wait_for_bind({xmlstreamelement, El}, StateData) -> case jlib:iq_query_info(El) of #iq{type = set, xmlns = ?NS_BIND, sub_el = SubEl} = @@ -974,61 +1045,65 @@ wait_for_bind({xmlstreamerror, _}, StateData) -> wait_for_bind(closed, StateData) -> {stop, normal, StateData}. +wait_for_session({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_session, (StateData#state.manage_stream)(El, StateData)); wait_for_session({xmlstreamelement, El}, StateData) -> + NewStateData = update_num_stanzas_in(StateData, El), case jlib:iq_query_info(El) of #iq{type = set, xmlns = ?NS_SESSION} -> - U = StateData#state.user, - R = StateData#state.resource, - JID = StateData#state.jid, - case acl:match_rule(StateData#state.server, - StateData#state.access, JID) of + U = NewStateData#state.user, + R = NewStateData#state.resource, + JID = NewStateData#state.jid, + case acl:match_rule(NewStateData#state.server, + NewStateData#state.access, JID) of allow -> ?INFO_MSG("(~w) Opened session for ~s", - [StateData#state.socket, + [NewStateData#state.socket, jlib:jid_to_string(JID)]), Res = jlib:make_result_iq_reply(El#xmlel{children = []}), - send_element(StateData, Res), - change_shaper(StateData, JID), + NewState = send_stanza(NewStateData, Res), + change_shaper(NewState, JID), {Fs, Ts} = ejabberd_hooks:run_fold( roster_get_subscription_lists, - StateData#state.server, + NewState#state.server, {[], []}, - [U, StateData#state.server]), + [U, NewState#state.server]), LJID = jlib:jid_tolower(jlib:jid_remove_resource(JID)), Fs1 = [LJID | Fs], Ts1 = [LJID | Ts], PrivList = ejabberd_hooks:run_fold( - privacy_get_user_list, StateData#state.server, + privacy_get_user_list, NewState#state.server, #userlist{}, - [U, StateData#state.server]), + [U, NewState#state.server]), SID = {now(), self()}, - Conn = get_conn_type(StateData), - Info = [{ip, StateData#state.ip}, {conn, Conn}, - {auth_module, StateData#state.auth_module}], + Conn = get_conn_type(NewState), + Info = [{ip, NewState#state.ip}, {conn, Conn}, + {auth_module, NewState#state.auth_module}], ejabberd_sm:open_session( - SID, U, StateData#state.server, R, Info), - NewStateData = - StateData#state{ + SID, U, NewState#state.server, R, Info), + UpdatedStateData = + NewState#state{ sid = SID, conn = Conn, pres_f = ?SETS:from_list(Fs1), pres_t = ?SETS:from_list(Ts1), privacy_list = PrivList}, fsm_next_state_pack(session_established, - NewStateData); + UpdatedStateData); _ -> ejabberd_hooks:run(forbidden_session_hook, - StateData#state.server, [JID]), + NewStateData#state.server, [JID]), ?INFO_MSG("(~w) Forbidden session for ~s", - [StateData#state.socket, + [NewStateData#state.socket, jlib:jid_to_string(JID)]), Err = jlib:make_error_reply(El, ?ERR_NOT_ALLOWED), - send_element(StateData, Err), - fsm_next_state(wait_for_session, StateData) + send_element(NewStateData, Err), + fsm_next_state(wait_for_session, NewStateData) end; _ -> - fsm_next_state(wait_for_session, StateData) + fsm_next_state(wait_for_session, NewStateData) end; wait_for_session(timeout, StateData) -> @@ -1042,6 +1117,9 @@ wait_for_session({xmlstreamerror, _}, StateData) -> wait_for_session(closed, StateData) -> {stop, normal, StateData}. +session_established({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(session_established, (StateData#state.manage_stream)(El, StateData)); session_established({xmlstreamelement, El}, StateData) -> FromJID = StateData#state.jid, @@ -1080,9 +1158,10 @@ session_established(closed, StateData) -> %% connection) session_established2(El, StateData) -> #xmlel{name = Name, attrs = Attrs} = El, - User = StateData#state.user, - Server = StateData#state.server, - FromJID = StateData#state.jid, + NewStateData = update_num_stanzas_in(StateData, El), + User = NewStateData#state.user, + Server = NewStateData#state.server, + FromJID = NewStateData#state.jid, To = xml:get_attr_s(<<"to">>, Attrs), ToJID = case To of <<"">> -> jlib:make_jid(User, Server, <<"">>); @@ -1091,7 +1170,7 @@ session_established2(El, StateData) -> NewEl1 = jlib:remove_attr(<<"xmlns">>, El), NewEl = case xml:get_attr_s(<<"xml:lang">>, Attrs) of <<"">> -> - case StateData#state.lang of + case NewStateData#state.lang of <<"">> -> NewEl1; Lang -> xml:replace_tag_attr(<<"xml:lang">>, Lang, NewEl1) @@ -1101,13 +1180,18 @@ session_established2(El, StateData) -> NewState = case ToJID of error -> case xml:get_attr_s(<<"type">>, Attrs) of - <<"error">> -> StateData; - <<"result">> -> StateData; + <<"error">> -> NewStateData; + <<"result">> -> NewStateData; _ -> Err = jlib:make_error_reply(NewEl, ?ERR_JID_MALFORMED), - send_element(StateData, Err), - StateData + case is_stanza(Err) of + true -> + send_stanza(NewStateData, Err); + false -> + send_element(NewStateData, Err), + NewStateData + end end; _ -> case Name of @@ -1122,12 +1206,12 @@ session_established2(El, StateData) -> #jid{user = User, server = Server, resource = <<"">>} -> ?DEBUG("presence_update(~p,~n\t~p,~n\t~p)", - [FromJID, PresenceEl, StateData]), + [FromJID, PresenceEl, NewStateData]), presence_update(FromJID, PresenceEl, - StateData); + NewStateData); _ -> presence_track(FromJID, ToJID, PresenceEl, - StateData) + NewStateData) end; <<"iq">> -> case jlib:iq_query_info(NewEl) of @@ -1135,21 +1219,21 @@ session_established2(El, StateData) -> when Xmlns == (?NS_PRIVACY); Xmlns == (?NS_BLOCKING) -> process_privacy_iq(FromJID, ToJID, IQ, - StateData); + NewStateData); _ -> ejabberd_hooks:run(user_send_packet, Server, [FromJID, ToJID, NewEl]), - check_privacy_route(FromJID, StateData, + check_privacy_route(FromJID, NewStateData, FromJID, ToJID, NewEl), - StateData + NewStateData end; <<"message">> -> ejabberd_hooks:run(user_send_packet, Server, [FromJID, ToJID, NewEl]), - check_privacy_route(FromJID, StateData, FromJID, + check_privacy_route(FromJID, NewStateData, FromJID, ToJID, NewEl), - StateData; - _ -> StateData + NewStateData; + _ -> NewStateData end end, ejabberd_hooks:run(c2s_loop_debug, @@ -1263,13 +1347,13 @@ handle_info({route, _From, _To, {broadcast, Data}}, jlib:jid_remove_resource(StateData#state.jid), StateData#state.jid, jlib:iq_to_xml(PrivPushIQ)), - send_element(StateData, PrivPushEl), + NewState = send_stanza(StateData, PrivPushEl), fsm_next_state(StateName, - StateData#state{privacy_list = NewPL}) + NewState#state{privacy_list = NewPL}) end; {blocking, What} -> - route_blocking(What, StateData), - fsm_next_state(StateName, StateData); + NewState = route_blocking(What, StateData), + fsm_next_state(StateName, NewState); _ -> fsm_next_state(StateName, StateData) end; @@ -1515,12 +1599,12 @@ handle_info({route, From, To, jlib:replace_from_to_attrs(jlib:jid_to_string(From), jlib:jid_to_string(To), NewAttrs), FixedPacket = #xmlel{name = Name, attrs = Attrs2, children = Els}, - send_element(StateData, FixedPacket), + SentStateData = send_stanza(StateData, FixedPacket), ejabberd_hooks:run(user_receive_packet, - StateData#state.server, - [StateData#state.jid, From, To, FixedPacket]), + SentStateData#state.server, + [SentStateData#state.jid, From, To, FixedPacket]), ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), - fsm_next_state(StateName, NewState); + fsm_next_state(StateName, SentStateData); true -> ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), fsm_next_state(StateName, NewState) @@ -1643,6 +1727,7 @@ terminate(_Reason, StateName, StateData) -> StateData#state.pres_i, Packet) end end, + resend_unacked_stanzas(StateData), bounce_messages(); _ -> ok @@ -1674,6 +1759,13 @@ send_element(StateData, El) when StateData#state.xml_socket -> send_element(StateData, El) -> send_text(StateData, xml:element_to_binary(El)). +send_stanza(StateData, Stanza) when StateData#state.sm_xmlns /= undefined -> + send_stanza_and_ack_req(StateData, Stanza), + ack_queue_add(StateData, Stanza); +send_stanza(StateData, Stanza) -> + send_element(StateData, Stanza), + StateData. + send_header(StateData, Server, Version, Lang) when StateData#state.xml_socket -> VersionAttr = case Version of @@ -1727,6 +1819,19 @@ is_auth_packet(El) -> _ -> false end. +is_stanza(#xmlel{name = Name, attrs = Attrs}) when Name == <<"message">>; + Name == <<"presence">>; + Name == <<"iq">> -> + case xml:get_attr(<<"xmlns">>, Attrs) of + {value, NS} when NS /= <<"jabber:client">>, + NS /= <<"jabber:server">> -> + false; + _ -> + true + end; +is_stanza(_El) -> + false. + get_auth_tags([#xmlel{name = Name, children = Els} | L], U, P, D, R) -> CData = xml:get_cdata(Els), @@ -1854,12 +1959,12 @@ presence_update(From, Packet, StateData) -> ejabberd_hooks:run(user_available_hook, NewStateData#state.server, [NewStateData#state.jid]), - if NewPriority >= 0 -> - resend_offline_messages(NewStateData), - resend_subscription_requests(NewStateData); - true -> ok - end, - presence_broadcast_first(From, NewStateData, + ResentStateData = if NewPriority >= 0 -> + resend_offline_messages(NewStateData), + resend_subscription_requests(NewStateData); + true -> NewStateData + end, + presence_broadcast_first(From, ResentStateData, Packet); true -> presence_broadcast_to_trusted(NewStateData, From, @@ -2173,10 +2278,11 @@ resend_subscription_requests(#state{user = User, PendingSubscriptions = ejabberd_hooks:run_fold(resend_subscription_requests_hook, Server, [], [User, Server]), - lists:foreach(fun (XMLPacket) -> - send_element(StateData, XMLPacket) - end, - PendingSubscriptions). + lists:foldl(fun (XMLPacket, AccStateData) -> + send_stanza(AccStateData, XMLPacket) + end, + StateData, + PendingSubscriptions). get_showtag(undefined) -> <<"unavailable">>; get_showtag(Presence) -> @@ -2352,10 +2458,185 @@ route_blocking(What, StateData) -> PrivPushEl = jlib:replace_from_to(jlib:jid_remove_resource(StateData#state.jid), StateData#state.jid, jlib:iq_to_xml(PrivPushIQ)), - send_element(StateData, PrivPushEl), %% No need to replace active privacy list here, %% blocking pushes are always accompanied by %% Privacy List pushes + send_stanza(StateData, PrivPushEl). + +%%%---------------------------------------------------------------------- +%%% XEP-0198 +%%%---------------------------------------------------------------------- + +stream_mgmt_enabled(#state{ack_queue = none}) -> + false; +stream_mgmt_enabled(_StateData) -> + true. + +negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) -> + %% XEP-0198 says: "For client-to-server connections, the client MUST NOT + %% attempt to enable stream management until after it has completed Resource + %% Binding". However, it also says: "Stream management errors SHOULD be + %% considered recoverable", so we won't bail out. + send_element(StateData, ?SM_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)), + StateData; +negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> + case xml:get_attr_s(<<"xmlns">>, Attrs) of + Xmlns when ?IS_SUPPORTED_SM_XMLNS(Xmlns) -> + case stream_mgmt_enabled(StateData) of + true -> + case Name of + <<"enable">> -> + handle_enable(StateData#state{sm_xmlns = Xmlns}); + _ -> + Res = if Name == <<"a">>; + Name == <<"r">> -> + ?SM_UNEXPECTED_REQUEST(Xmlns); + true -> + ?SM_BAD_REQUEST(Xmlns) + end, + send_element(StateData, Res), + StateData + end; + false -> + send_element(StateData, ?SM_SERVICE_UNAVAILABLE(Xmlns)), + StateData + end; + _ -> + send_element(StateData, ?SM_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3)), + StateData + end. + +perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> + case xml:get_attr_s(<<"xmlns">>, Attrs) of + Xmlns when Xmlns == StateData#state.sm_xmlns -> + case Name of + <<"r">> -> + handle_r(StateData); + <<"a">> -> + handle_a(StateData, Attrs); + _ -> + Res = if Name == <<"enable">> -> + ?SM_UNEXPECTED_REQUEST(Xmlns); + true -> + ?SM_BAD_REQUEST(Xmlns) + end, + send_element(StateData, Res), + StateData + end; + _ -> + send_element(StateData, + ?SM_UNSUPPORTED_VERSION(StateData#state.sm_xmlns)), + StateData + end. + +handle_enable(StateData) -> + ?INFO_MSG("Enabling XEP-0198 stream management for ~s", + [jlib:jid_to_string(StateData#state.jid)]), + Res = #xmlel{name = <<"enabled">>, + attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}], + children = []}, + send_element(StateData, Res), + StateData#state{ack_queue = queue:new(), + manage_stream = fun perform_stream_mgmt/2}. + +handle_r(StateData) -> + H = jlib:integer_to_binary(StateData#state.n_stanzas_in), + Res = #xmlel{name = <<"a">>, + attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}, + {<<"h">>, H}], + children = []}, + send_element(StateData, Res), + StateData. + +handle_a(#state{jid = JID, n_stanzas_out = NumStanzasOut} = StateData, Attrs) -> + case catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs)) of + H when is_integer(H), H > NumStanzasOut -> + ?WARNING_MSG("~s acknowledged ~B stanzas, but only ~B were sent", + [jlib:jid_to_string(JID), H, NumStanzasOut]), + ack_queue_drop(StateData, NumStanzasOut); + H when is_integer(H), H >= 0 -> + ?DEBUG("~s acknowledged ~B of ~B stanzas", + [jlib:jid_to_string(JID), H, NumStanzasOut]), + ack_queue_drop(StateData, H); + _ -> + ?WARNING_MSG("Ignoring invalid ACK element from ~s", + [jlib:jid_to_string(JID)]), + StateData + end. + +update_num_stanzas_in(StateData, El) when StateData#state.sm_xmlns /= undefined -> + NewNum = case {is_stanza(El), StateData#state.n_stanzas_in} of + {true, 4294967295} -> + 0; + {true, Num} -> + Num + 1; + {false, Num} -> + Num + end, + StateData#state{n_stanzas_in = NewNum}; +update_num_stanzas_in(StateData, _El) -> + StateData. + +send_stanza_and_ack_req(StateData, Stanza) -> + AckReq = #xmlel{name = <<"r">>, + attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}], + children = []}, + StanzaS = xml:element_to_binary(Stanza), + AckReqS = xml:element_to_binary(AckReq), + send_text(StateData, [StanzaS, AckReqS]). + +ack_queue_add(StateData, El) -> + NewNum = case StateData#state.n_stanzas_out of + 4294967295 -> + 0; + Num -> + Num + 1 + end, + NewState = limit_queue_length(StateData), + NewQueue = queue:in({NewNum, El}, NewState#state.ack_queue), + NewState#state{ack_queue = NewQueue, n_stanzas_out = NewNum}. + +ack_queue_drop(StateData, NumHandled) -> + NewQueue = jlib:queue_drop_while(fun({N, _Stanza}) -> N =< NumHandled end, + StateData#state.ack_queue), + StateData#state{ack_queue = NewQueue}. + +limit_queue_length(#state{max_ack_queue = Limit} = StateData) + when Limit == infinity; + Limit == unlimited -> + StateData; +limit_queue_length(#state{jid = JID, + ack_queue = Queue, + max_ack_queue = Limit} = StateData) -> + case queue:len(Queue) >= Limit of + true -> + ?WARNING_MSG("Dropping stanza from too long ACK queue for ~s", + [jlib:jid_to_string(JID)]), + limit_queue_length(StateData#state{ack_queue = queue:drop(Queue)}); + false -> + StateData + end. + +resend_unacked_stanzas(StateData) when StateData#state.sm_xmlns /= undefined -> + Queue = StateData#state.ack_queue, + case queue:len(Queue) of + 0 -> + ok; + N -> + ?INFO_MSG("Resending ~B unacknowledged stanzas to ~s", + [N, jlib:jid_to_string(StateData#state.jid)]), + lists:foreach( + fun({Num, #xmlel{attrs = Attrs} = El}) -> + From_s = xml:get_attr_s(<<"from">>, Attrs), + From = jlib:string_to_jid(From_s), + To_s = xml:get_attr_s(<<"to">>, Attrs), + To = jlib:string_to_jid(To_s), + ?DEBUG("Resending unacknowledged stanza #~B from ~s to ~s", + [Num, From_s, To_s]), + ejabberd_router:route(From, To, El) + end, queue:to_list(Queue)) + end; +resend_unacked_stanzas(_StateData) -> ok. %%%---------------------------------------------------------------------- diff --git a/src/jlib.erl b/src/jlib.erl index 46e864b0c..a362697f4 100644 --- a/src/jlib.erl +++ b/src/jlib.erl @@ -51,7 +51,7 @@ binary_to_integer/1, binary_to_integer/2, integer_to_binary/1, integer_to_binary/2, atom_to_binary/1, binary_to_atom/1, tuple_to_binary/1, - l2i/1, i2l/1, i2l/2]). + l2i/1, i2l/1, i2l/2, queue_drop_while/2]). %% TODO: Remove once XEP-0091 is Obsolete %% TODO: Remove once XEP-0091 is Obsolete @@ -894,3 +894,18 @@ i2l(L, N) when is_binary(L) -> C when C > N -> L; _ -> i2l(<<$0, L/binary>>, N) end. + +-spec queue_drop_while(fun((term()) -> boolean()), queue()) -> queue(). + +queue_drop_while(F, Q) -> + case queue:peek(Q) of + {value, Item} -> + case F(Item) of + true -> + queue_drop_while(F, queue:drop(Q)); + _ -> + Q + end; + empty -> + Q + end. From 88a200e100fec1411b8618005fd9d4ef6963b7a2 Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Sun, 16 Mar 2014 00:12:47 +0100 Subject: [PATCH 02/11] Remove some commented out code The code that had been commented out at some earlier point in time would now break XEP-0198. --- src/ejabberd_c2s.erl | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 044afd0f4..d89529d4e 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -2261,11 +2261,6 @@ resend_offline_messages(StateData) -> end, if Pass -> ejabberd_router:route(From, To, Packet); - %% send_element(StateData, FixedPacket), - %% ejabberd_hooks:run(user_receive_packet, - %% StateData#state.server, - %% [StateData#state.jid, - %% From, To, FixedPacket]); true -> ok end end, From e360c56f876e191b9ef4f7b2fd7d5fc3afad1cc1 Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Wed, 19 Mar 2014 00:51:33 +0100 Subject: [PATCH 03/11] Support XEP-0198 session resumption Implement the optional session resumption feature described in XEP-0198. A client that supports this feature may now resume the previous session (within a configurable number of seconds) if the connection was lost. During resumption, ejabberd will retransmit any stanzas that hadn't been acknowledged by the client. --- doc/guide.tex | 11 +- src/ejabberd_c2s.erl | 411 ++++++++++++++++++++++++++++++++++--------- src/ejabberd_sm.erl | 15 +- src/jlib.erl | 16 ++ 4 files changed, 359 insertions(+), 94 deletions(-) diff --git a/doc/guide.tex b/doc/guide.tex index 59f39e256..23752a27d 100644 --- a/doc/guide.tex +++ b/doc/guide.tex @@ -871,8 +871,8 @@ The available modules, their purpose and the options allowed by each one are: Handles c2s connections.\\ Options: \texttt{access}, \texttt{certfile}, \texttt{ciphers}, \texttt{max\_ack\_queue}, \texttt{max\_fsm\_queue}, - \texttt{max\_stanza\_size}, \texttt{shaper}, - \texttt{starttls}, \texttt{starttls\_required}, + \texttt{max\_stanza\_size}, \texttt{resume\_timeout}, + \texttt{shaper}, \texttt{starttls}, \texttt{starttls\_required}, \texttt{stream\_management}, \texttt{tls}, \texttt{zlib}, \texttt{tls\_compression} \titem{\texttt{ejabberd\_s2s\_in}} @@ -1007,6 +1007,13 @@ request_handlers: /"a"/"b": mod_foo /"http-bind": mod_http_bind \end{verbatim} + \titem{resume\_timeout: Seconds} + This option configures the number of seconds until a session times + out if the connection is lost. During this period of time, a client + may resume the session if \term{stream\_management} is enabled. This + option can be specified for \term{ejabberd\_c2s} listeners. Setting + it to \term{0} effectively disables session resumption. The default + value is \term{300}. \titem{service\_check\_from: true|false} \ind{options!service\_check\_from} This option can be used with \term{ejabberd\_service} only. diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index d89529d4e..fb3de4a6d 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -58,6 +58,7 @@ wait_for_bind/2, wait_for_session/2, wait_for_sasl_response/2, + wait_for_resume/2, session_established/2, handle_event/3, handle_sync_event/4, @@ -108,10 +109,13 @@ auth_module = unknown, ip, aux_fields = [], + sm_state, sm_xmlns, ack_queue, max_ack_queue, manage_stream = fun negotiate_stream_mgmt/2, + pending_since, + resume_timeout, n_stanzas_in = 0, n_stanzas_out = 0, lang}). @@ -166,6 +170,7 @@ -define(IS_STREAM_MGMT_TAG(Name), Name == <<"enable">>; + Name == <<"resume">>; Name == <<"a">>; Name == <<"r">>). @@ -183,6 +188,9 @@ -define(SM_BAD_REQUEST(Xmlns), ?SM_FAILED(<<"bad-request">>, Xmlns)). +-define(SM_ITEM_NOT_FOUND(Xmlns), + ?SM_FAILED(<<"item-not-found">>, Xmlns)). + -define(SM_SERVICE_UNAVAILABLE(Xmlns), ?SM_FAILED(<<"service-unavailable">>, Xmlns)). @@ -285,16 +293,18 @@ init([{SockMod, Socket}, Opts]) -> true -> TLSOpts1 end, TLSOpts = [verify_none | TLSOpts2], + StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true), + StreamMgmtState = if StreamMgmtEnabled -> inactive; + true -> disabled + end, MaxAckQueue = case proplists:get_value(max_ack_queue, Opts) of Limit when is_integer(Limit), Limit > 0 -> Limit; _ -> 500 end, - StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true), - AckQueue = if not StreamMgmtEnabled -> - none; - true -> - undefined - end, + ResumeTimeout = case proplists:get_value(resume_timeout, Opts) of + Timeout when is_integer(Timeout), Timeout >= 0 -> Timeout; + _ -> 300 + end, IP = peerip(SockMod, Socket), %% Check if IP is blacklisted: case is_ip_blacklisted(IP) of @@ -315,9 +325,11 @@ init([{SockMod, Socket}, Opts]) -> xml_socket = XMLSocket, zlib = Zlib, tls = TLS, tls_required = StartTLSRequired, tls_enabled = TLSEnabled, tls_options = TLSOpts, - streamid = new_id(), access = Access, - shaper = Shaper, ip = IP, - ack_queue = AckQueue, max_ack_queue = MaxAckQueue}, + sid = {now(), self()}, streamid = new_id(), + access = Access, shaper = Shaper, ip = IP, + sm_state = StreamMgmtState, + max_ack_queue = MaxAckQueue, + resume_timeout = ResumeTimeout}, {ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT} end. @@ -603,7 +615,6 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> ?INFO_MSG("(~w) Accepted legacy authentication for ~s by ~p", [StateData#state.socket, jlib:jid_to_string(JID), AuthModule]), - SID = {now(), self()}, Conn = (StateData#state.sockmod):get_conn_type( StateData#state.socket), Info = [{ip, StateData#state.ip}, {conn, Conn}, @@ -611,7 +622,9 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> Res = jlib:make_result_iq_reply( El#xmlel{children = []}), send_element(StateData, Res), - ejabberd_sm:open_session(SID, U, StateData#state.server, R, Info), + ejabberd_sm:open_session(StateData#state.sid, U, + StateData#state.server, R, + Info), change_shaper(StateData, JID), {Fs, Ts} = ejabberd_hooks:run_fold(roster_get_subscription_lists, @@ -629,7 +642,7 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> [U, StateData#state.server]), NewStateData = StateData#state{user = U, resource = R, - jid = JID, sid = SID, + jid = JID, conn = Conn, auth_module = AuthModule, pres_f = (?SETS):from_list(Fs1), @@ -981,9 +994,20 @@ resource_conflict_action(U, S, R) -> {accept_resource, Rnew} end. -wait_for_bind({xmlstreamelement, #xmlel{name = Name} = El}, StateData) +wait_for_bind({xmlstreamelement, #xmlel{name = Name, attrs = Attrs} = El}, + StateData) when ?IS_STREAM_MGMT_TAG(Name) -> - fsm_next_state(wait_for_bind, (StateData#state.manage_stream)(El, StateData)); + case Name of + <<"resume">> -> + case handle_resume(StateData, Attrs) of + {ok, ResumedState} -> + fsm_next_state(session_established, ResumedState); + error -> + fsm_next_state(wait_for_bind, StateData) + end; + _ -> + fsm_next_state(wait_for_bind, (StateData#state.manage_stream)(El, StateData)) + end; wait_for_bind({xmlstreamelement, El}, StateData) -> case jlib:iq_query_info(El) of #iq{type = set, xmlns = ?NS_BIND, sub_el = SubEl} = @@ -1077,15 +1101,13 @@ wait_for_session({xmlstreamelement, El}, StateData) -> privacy_get_user_list, NewState#state.server, #userlist{}, [U, NewState#state.server]), - SID = {now(), self()}, Conn = get_conn_type(NewState), Info = [{ip, NewState#state.ip}, {conn, Conn}, {auth_module, NewState#state.auth_module}], ejabberd_sm:open_session( - SID, U, NewState#state.server, R, Info), + NewState#state.sid, U, NewState#state.server, R, Info), UpdatedStateData = NewState#state{ - sid = SID, conn = Conn, pres_f = ?SETS:from_list(Fs1), pres_t = ?SETS:from_list(Ts1), @@ -1151,6 +1173,11 @@ session_established({xmlstreamerror, _}, StateData) -> send_element(StateData, ?INVALID_XML_ERR), send_trailer(StateData), {stop, normal, StateData}; +session_established(closed, StateData) + when StateData#state.resume_timeout > 0, + StateData#state.sm_state == active orelse + StateData#state.sm_state == pending -> + fsm_next_state(wait_for_resume, StateData#state{sm_state = pending}); session_established(closed, StateData) -> {stop, normal, StateData}. @@ -1240,6 +1267,17 @@ session_established2(El, StateData) -> [{xmlstreamelement, El}]), fsm_next_state(session_established, NewState). +wait_for_resume(timeout, StateData) -> + ?DEBUG("Timed out waiting for resumption of stream for ~s", + [jlib:jid_to_string(StateData#state.jid)]), + {stop, normal, StateData}; +wait_for_resume(closed, StateData) -> + ?DEBUG("Ignoring 'closed' event while waiting for resumption", []), + fsm_next_state(wait_for_resume, StateData); +wait_for_resume(Event, StateData) -> + ?WARNING_MSG("Ignoring event while waiting for resumption: ~p", [Event]), + fsm_next_state(wait_for_resume, StateData). + %%---------------------------------------------------------------------- %% Func: StateName/3 %% Returns: {next_state, NextStateName, NextStateData} | @@ -1284,6 +1322,15 @@ handle_sync_event(get_subscribed, _From, StateName, StateData) -> Subscribed = (?SETS):to_list(StateData#state.pres_f), {reply, Subscribed, StateName, StateData}; +handle_sync_event(resume_session, _From, _StateName, + StateData) -> + %% The old session should be closed before the new one is opened, so we do + %% this here instead of leaving it to the terminate callback + ejabberd_sm:close_session(StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource), + {stop, normal, {ok, StateData}, StateData#state{sm_state = resumed}}; handle_sync_event(_Event, _From, StateName, StateData) -> Reply = ok, fsm_reply(Reply, StateName, StateData). @@ -1612,7 +1659,13 @@ handle_info({route, From, To, handle_info({'DOWN', Monitor, _Type, _Object, _Info}, _StateName, StateData) when Monitor == StateData#state.socket_monitor -> - {stop, normal, StateData}; + if StateData#state.resume_timeout > 0, + StateData#state.sm_state == active orelse + StateData#state.sm_state == pending -> + fsm_next_state(wait_for_resume, StateData#state{sm_state = pending}); + true -> + {stop, normal, StateData} + end; handle_info(system_shutdown, StateName, StateData) -> case StateName of wait_for_stream -> @@ -1676,61 +1729,71 @@ print_state(State = #state{pres_t = T, pres_f = F, pres_a = A, pres_i = I}) -> %% Returns: any %%---------------------------------------------------------------------- terminate(_Reason, StateName, StateData) -> - case StateName of - session_established -> - case StateData#state.authenticated of - replaced -> - ?INFO_MSG("(~w) Replaced session for ~s", - [StateData#state.socket, - jlib:jid_to_string(StateData#state.jid)]), - From = StateData#state.jid, - Packet = #xmlel{name = <<"presence">>, - attrs = [{<<"type">>, <<"unavailable">>}], - children = - [#xmlel{name = <<"status">>, attrs = [], - children = - [{xmlcdata, - <<"Replaced by new connection">>}]}]}, - ejabberd_sm:close_session_unset_presence(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource, - <<"Replaced by new connection">>), - presence_broadcast(StateData, From, - StateData#state.pres_a, Packet), - presence_broadcast(StateData, From, - StateData#state.pres_i, Packet); - _ -> - ?INFO_MSG("(~w) Close session for ~s", - [StateData#state.socket, - jlib:jid_to_string(StateData#state.jid)]), - EmptySet = (?SETS):new(), - case StateData of - #state{pres_last = undefined, pres_a = EmptySet, pres_i = EmptySet, pres_invis = false} -> - ejabberd_sm:close_session(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource); - _ -> - From = StateData#state.jid, - Packet = #xmlel{name = <<"presence">>, - attrs = [{<<"type">>, <<"unavailable">>}], - children = []}, - ejabberd_sm:close_session_unset_presence(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource, - <<"">>), - presence_broadcast(StateData, From, - StateData#state.pres_a, Packet), - presence_broadcast(StateData, From, - StateData#state.pres_i, Packet) - end - end, - resend_unacked_stanzas(StateData), - bounce_messages(); + case StateData#state.sm_state of + resumed -> + ?INFO_MSG("Closing former stream of resumed session for ~s", + [jlib:jid_to_string(StateData#state.jid)]); _ -> - ok + if StateName == session_established; + StateName == wait_for_resume -> + case StateData#state.authenticated of + replaced -> + ?INFO_MSG("(~w) Replaced session for ~s", + [StateData#state.socket, + jlib:jid_to_string(StateData#state.jid)]), + From = StateData#state.jid, + Packet = #xmlel{name = <<"presence">>, + attrs = [{<<"type">>, <<"unavailable">>}], + children = + [#xmlel{name = <<"status">>, attrs = [], + children = + [{xmlcdata, + <<"Replaced by new connection">>}]}]}, + ejabberd_sm:close_session_unset_presence(StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource, + <<"Replaced by new connection">>), + presence_broadcast(StateData, From, + StateData#state.pres_a, Packet), + presence_broadcast(StateData, From, + StateData#state.pres_i, Packet), + resend_unacked_stanzas(StateData); + _ -> + ?INFO_MSG("(~w) Close session for ~s", + [StateData#state.socket, + jlib:jid_to_string(StateData#state.jid)]), + EmptySet = (?SETS):new(), + case StateData of + #state{pres_last = undefined, + pres_a = EmptySet, + pres_i = EmptySet, + pres_invis = false} -> + ejabberd_sm:close_session(StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource); + _ -> + From = StateData#state.jid, + Packet = #xmlel{name = <<"presence">>, + attrs = [{<<"type">>, <<"unavailable">>}], + children = []}, + ejabberd_sm:close_session_unset_presence(StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource, + <<"">>), + presence_broadcast(StateData, From, + StateData#state.pres_a, Packet), + presence_broadcast(StateData, From, + StateData#state.pres_i, Packet) + end, + resend_unacked_stanzas(StateData) + end, + bounce_messages(); + true -> + ok + end end, (StateData#state.sockmod):close(StateData#state.socket), ok. @@ -1745,6 +1808,8 @@ change_shaper(StateData, JID) -> (StateData#state.sockmod):change_shaper(StateData#state.socket, Shaper). +send_text(StateData, Text) when StateData#state.sm_state == pending -> + ?DEBUG("Cannot send text while waiting for resumption: ~p", [Text]); send_text(StateData, Text) when StateData#state.xml_socket -> ?DEBUG("Send Text on stream = ~p", [Text]), (StateData#state.sockmod):send_xml(StateData#state.socket, @@ -1753,13 +1818,17 @@ send_text(StateData, Text) -> ?DEBUG("Send XML on stream = ~p", [Text]), (StateData#state.sockmod):send(StateData#state.socket, Text). +send_element(StateData, El) when StateData#state.sm_state == pending -> + ?DEBUG("Cannot send element while waiting for resumption: ~p", [El]); send_element(StateData, El) when StateData#state.xml_socket -> (StateData#state.sockmod):send_xml(StateData#state.socket, {xmlstreamelement, El}); send_element(StateData, El) -> send_text(StateData, xml:element_to_binary(El)). -send_stanza(StateData, Stanza) when StateData#state.sm_xmlns /= undefined -> +send_stanza(StateData, Stanza) when StateData#state.sm_state == pending -> + ack_queue_add(StateData, Stanza); +send_stanza(StateData, Stanza) when StateData#state.sm_state == active -> send_stanza_and_ack_req(StateData, Stanza), ack_queue_add(StateData, Stanza); send_stanza(StateData, Stanza) -> @@ -2356,6 +2425,15 @@ fsm_next_state_gc(StateName, PackedStateData) -> fsm_next_state(session_established, StateData) -> {next_state, session_established, StateData, ?C2S_HIBERNATE_TIMEOUT}; +fsm_next_state(wait_for_resume, #state{pending_since = undefined} = + StateData) -> + {next_state, wait_for_resume, + StateData#state{pending_since = os:timestamp()}, + StateData#state.resume_timeout}; +fsm_next_state(wait_for_resume, StateData) -> + Diff = timer:now_diff(os:timestamp(), StateData#state.pending_since), + Timeout = max(StateData#state.resume_timeout - Diff div 1000, 1), + {next_state, wait_for_resume, StateData, Timeout}; fsm_next_state(StateName, StateData) -> {next_state, StateName, StateData, ?C2S_OPEN_TIMEOUT}. @@ -2364,6 +2442,15 @@ fsm_next_state(StateName, StateData) -> fsm_reply(Reply, session_established, StateData) -> {reply, Reply, session_established, StateData, ?C2S_HIBERNATE_TIMEOUT}; +fsm_reply(Reply, wait_for_resume, #state{pending_since = undefined} = + StateData) -> + {reply, Reply, wait_for_resume, + StateData#state{pending_since = os:timestamp()}, + StateData#state.resume_timeout}; +fsm_reply(Reply, wait_for_resume, StateData) -> + Diff = timer:now_diff(os:timestamp(), StateData#state.pending_since), + Timeout = max(StateData#state.resume_timeout - Diff div 1000, 1), + {reply, Reply, wait_for_resume, StateData, Timeout}; fsm_reply(Reply, StateName, StateData) -> {reply, Reply, StateName, StateData, ?C2S_OPEN_TIMEOUT}. @@ -2462,7 +2549,7 @@ route_blocking(What, StateData) -> %%% XEP-0198 %%%---------------------------------------------------------------------- -stream_mgmt_enabled(#state{ack_queue = none}) -> +stream_mgmt_enabled(#state{sm_state = disabled}) -> false; stream_mgmt_enabled(_StateData) -> true. @@ -2470,8 +2557,9 @@ stream_mgmt_enabled(_StateData) -> negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) -> %% XEP-0198 says: "For client-to-server connections, the client MUST NOT %% attempt to enable stream management until after it has completed Resource - %% Binding". However, it also says: "Stream management errors SHOULD be - %% considered recoverable", so we won't bail out. + %% Binding unless it is resuming a previous session". However, it also + %% says: "Stream management errors SHOULD be considered recoverable", so we + %% won't bail out. send_element(StateData, ?SM_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)), StateData; negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> @@ -2481,10 +2569,11 @@ negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> true -> case Name of <<"enable">> -> - handle_enable(StateData#state{sm_xmlns = Xmlns}); + handle_enable(StateData#state{sm_xmlns = Xmlns}, Attrs); _ -> Res = if Name == <<"a">>; - Name == <<"r">> -> + Name == <<"r">>; + Name == <<"resume">> -> ?SM_UNEXPECTED_REQUEST(Xmlns); true -> ?SM_BAD_REQUEST(Xmlns) @@ -2510,7 +2599,8 @@ perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> <<"a">> -> handle_a(StateData, Attrs); _ -> - Res = if Name == <<"enable">> -> + Res = if Name == <<"enable">>; + Name == <<"resume">> -> ?SM_UNEXPECTED_REQUEST(Xmlns); true -> ?SM_BAD_REQUEST(Xmlns) @@ -2524,15 +2614,40 @@ perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> StateData end. -handle_enable(StateData) -> - ?INFO_MSG("Enabling XEP-0198 stream management for ~s", - [jlib:jid_to_string(StateData#state.jid)]), +handle_enable(#state{resume_timeout = ConfigTimeout} = StateData, Attrs) -> + Timeout = case xml:get_attr_s(<<"resume">>, Attrs) of + ResumeAttr when ResumeAttr == <<"true">>; + ResumeAttr == <<"1">> -> + MaxAttr = xml:get_attr_s(<<"max">>, Attrs), + case catch jlib:binary_to_integer(MaxAttr) of + Max when is_integer(Max), Max > 0, Max =< ConfigTimeout -> + Max; + _ -> + ConfigTimeout + end; + _ -> + 0 + end, + ResAttrs = [{<<"xmlns">>, StateData#state.sm_xmlns}] ++ + if Timeout > 0 -> + ?INFO_MSG("Stream management with resumption enabled for ~s", + [jlib:jid_to_string(StateData#state.jid)]), + [{<<"id">>, make_resume_id(StateData)}, + {<<"resume">>, <<"true">>}, + {<<"max">>, jlib:integer_to_binary(Timeout)}]; + true -> + ?INFO_MSG("Stream management without resumption enabled for ~s", + [jlib:jid_to_string(StateData#state.jid)]), + [] + end, Res = #xmlel{name = <<"enabled">>, - attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}], + attrs = ResAttrs, children = []}, send_element(StateData, Res), - StateData#state{ack_queue = queue:new(), - manage_stream = fun perform_stream_mgmt/2}. + StateData#state{sm_state = active, + ack_queue = queue:new(), + manage_stream = fun perform_stream_mgmt/2, + resume_timeout = Timeout * 1000}. handle_r(StateData) -> H = jlib:integer_to_binary(StateData#state.n_stanzas_in), @@ -2559,7 +2674,59 @@ handle_a(#state{jid = JID, n_stanzas_out = NumStanzasOut} = StateData, Attrs) -> StateData end. -update_num_stanzas_in(StateData, El) when StateData#state.sm_xmlns /= undefined -> +handle_resume(StateData, Attrs) -> + R = case xml:get_attr_s(<<"xmlns">>, Attrs) of + Xmlns when ?IS_SUPPORTED_SM_XMLNS(Xmlns) -> + case stream_mgmt_enabled(StateData) of + true -> + case {xml:get_attr(<<"previd">>, Attrs), + catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs))} + of + {{value, PrevID}, H} when is_integer(H) -> + case inherit_session_state(StateData, PrevID) of + {ok, InheritedState} -> + {ok, InheritedState, H}; + {error, Err} -> + {error, ?SM_ITEM_NOT_FOUND(Xmlns), Err} + end; + _ -> + {error, ?SM_BAD_REQUEST(Xmlns), <<"Invalid request">>} + end; + false -> + {error, ?SM_SERVICE_UNAVAILABLE(Xmlns), <<"XEP-0198 disabled">>} + end; + _ -> + {error, ?SM_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3), <<"Invalid XMLNS">>} + end, + case R of + {ok, ResumedState, NumHandled} -> + NewState = ack_queue_drop(ResumedState, NumHandled), + AttrXmlns = NewState#state.sm_xmlns, + AttrId = make_resume_id(NewState), + AttrH = jlib:integer_to_binary(NewState#state.n_stanzas_in), + send_element(NewState, + #xmlel{name = <<"resumed">>, + attrs = [{<<"xmlns">>, AttrXmlns}, + {<<"h">>, AttrH}, + {<<"previd">>, AttrId}], + children = []}), + SendFun = fun(_F, _T, El) -> send_element(NewState, El) end, + resend_unacked_stanzas(NewState, SendFun), + send_element(NewState, + #xmlel{name = <<"r">>, + attrs = [{<<"xmlns">>, AttrXmlns}], + children = []}), + ?INFO_MSG("Resumed session for ~s", + [jlib:jid_to_string(NewState#state.jid)]), + {ok, NewState}; + {error, El, Msg} -> + send_element(StateData, El), + ?WARNING_MSG("Cannot resume session for ~s@~s: ~s", + [StateData#state.user, StateData#state.server, Msg]), + error + end. + +update_num_stanzas_in(#state{sm_state = active} = StateData, El) -> NewNum = case {is_stanza(El), StateData#state.n_stanzas_in} of {true, 4294967295} -> 0; @@ -2612,7 +2779,8 @@ limit_queue_length(#state{jid = JID, StateData end. -resend_unacked_stanzas(StateData) when StateData#state.sm_xmlns /= undefined -> +resend_unacked_stanzas(StateData, F) when StateData#state.sm_state == active; + StateData#state.sm_state == pending -> Queue = StateData#state.ack_queue, case queue:len(Queue) of 0 -> @@ -2628,12 +2796,79 @@ resend_unacked_stanzas(StateData) when StateData#state.sm_xmlns /= undefined -> To = jlib:string_to_jid(To_s), ?DEBUG("Resending unacknowledged stanza #~B from ~s to ~s", [Num, From_s, To_s]), - ejabberd_router:route(From, To, El) + F(From, To, El) end, queue:to_list(Queue)) end; +resend_unacked_stanzas(_StateData, _F) -> + ok. + +resend_unacked_stanzas(StateData) when StateData#state.sm_state == active; + StateData#state.sm_state == pending -> + resend_unacked_stanzas(StateData, fun ejabberd_router:route/3); resend_unacked_stanzas(_StateData) -> ok. +inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) -> + case jlib:base64_to_term(ResumeID) of + {term, {U, S, R, Time}} -> + case ejabberd_sm:get_session_pid(U, S, R) of + none -> + {error, <<"Previous session PID not found">>}; + OldPID -> + OldSID = {Time, OldPID}, + case catch resume_session(OldPID) of + {ok, #state{sid = OldSID} = OldStateData} -> + NewSID = {Time, self()}, % Old time, new PID + Priority = case OldStateData#state.pres_last of + undefined -> + 0; + Presence -> + get_priority_from_presence(Presence) + end, + Conn = get_conn_type(StateData), + Info = [{ip, StateData#state.ip}, {conn, Conn}, + {auth_module, StateData#state.auth_module}], + ejabberd_sm:open_session(NewSID, U, S, R, + Priority, Info), + {ok, StateData#state{sid = NewSID, + jid = OldStateData#state.jid, + resource = OldStateData#state.resource, + pres_t = OldStateData#state.pres_t, + pres_f = OldStateData#state.pres_f, + pres_a = OldStateData#state.pres_a, + pres_i = OldStateData#state.pres_i, + pres_last = OldStateData#state.pres_last, + pres_pri = OldStateData#state.pres_pri, + pres_timestamp = OldStateData#state.pres_timestamp, + pres_invis = OldStateData#state.pres_invis, + privacy_list = OldStateData#state.privacy_list, + aux_fields = OldStateData#state.aux_fields, + sm_xmlns = OldStateData#state.sm_xmlns, + ack_queue = OldStateData#state.ack_queue, + manage_stream = fun perform_stream_mgmt/2, + resume_timeout = OldStateData#state.resume_timeout, + n_stanzas_in = OldStateData#state.n_stanzas_in, + n_stanzas_out = OldStateData#state.n_stanzas_out, + sm_state = active}}; + _ -> + {error, <<"Cannot grab session state">>} + end + end; + error -> + {error, <<"Invalid 'previd' value">>} + end. + +resume_session(FsmRef) -> + (?GEN_FSM):sync_send_all_state_event(FsmRef, resume_session, 3000). + +make_resume_id(StateData) -> + {Time, _} = StateData#state.sid, + ID = {StateData#state.user, + StateData#state.server, + StateData#state.resource, + Time}, + jlib:term_to_base64(ID). + %%%---------------------------------------------------------------------- %%% JID Set memory footprint reduction code %%%---------------------------------------------------------------------- diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index c7e277a8d..6bcacbe78 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -33,7 +33,9 @@ %% API -export([start_link/0, route/3, - open_session/5, close_session/4, + open_session/5, + open_session/6, + close_session/4, check_in_subscription/6, bounce_offline_message/3, disconnect_removed_user/2, @@ -107,10 +109,10 @@ route(From, To, Packet) -> _ -> ok end. --spec open_session(sid(), binary(), binary(), binary(), info()) -> ok. +-spec open_session(sid(), binary(), binary(), binary(), prio(), info()) -> ok. -open_session(SID, User, Server, Resource, Info) -> - set_session(SID, User, Server, Resource, undefined, Info), +open_session(SID, User, Server, Resource, Priority, Info) -> + set_session(SID, User, Server, Resource, Priority, Info), mnesia:dirty_update_counter(session_counter, jlib:nameprep(Server), 1), check_for_sessions_to_replace(User, Server, Resource), @@ -118,6 +120,11 @@ open_session(SID, User, Server, Resource, Info) -> ejabberd_hooks:run(sm_register_connection_hook, JID#jid.lserver, [SID, JID, Info]). +-spec open_session(sid(), binary(), binary(), binary(), info()) -> ok. + +open_session(SID, User, Server, Resource, Info) -> + open_session(SID, User, Server, Resource, undefined, Info). + -spec close_session(sid(), binary(), binary(), binary()) -> ok. close_session(SID, User, Server, Resource) -> diff --git a/src/jlib.erl b/src/jlib.erl index a362697f4..ffabb3ffe 100644 --- a/src/jlib.erl +++ b/src/jlib.erl @@ -46,6 +46,7 @@ timestamp_to_iso/2, timestamp_to_xml/4, timestamp_to_xml/1, now_to_utc_string/1, now_to_local_string/1, datetime_string_to_timestamp/1, + term_to_base64/1, base64_to_term/1, decode_base64/1, encode_base64/1, ip_to_list/1, rsm_encode/1, rsm_encode/2, rsm_decode/1, binary_to_integer/1, binary_to_integer/2, @@ -780,6 +781,21 @@ check_list(List) -> % Base64 stuff (based on httpd_util.erl) % +-spec term_to_base64(term()) -> binary(). + +term_to_base64(Term) -> + encode_base64(term_to_binary(Term)). + +-spec base64_to_term(binary()) -> {term, term()} | error. + +base64_to_term(Base64) -> + case catch binary_to_term(decode_base64(Base64), [safe]) of + {'EXIT', _} -> + error; + Term -> + {term, Term} + end. + -spec decode_base64(binary()) -> binary(). decode_base64(S) -> From 2da6933bb77d1a512d486ce7f873f222b137249b Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Sat, 22 Mar 2014 20:25:43 +0100 Subject: [PATCH 04/11] Remove "fun" element from c2s #state Memory consumption wise, local "fun" references are quite expensive. --- src/ejabberd_c2s.erl | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index fb3de4a6d..4955f6e42 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -113,7 +113,6 @@ sm_xmlns, ack_queue, max_ack_queue, - manage_stream = fun negotiate_stream_mgmt/2, pending_since, resume_timeout, n_stanzas_in = 0, @@ -545,7 +544,7 @@ wait_for_stream(closed, StateData) -> wait_for_auth({xmlstreamelement, #xmlel{name = Name} = El}, StateData) when ?IS_STREAM_MGMT_TAG(Name) -> - fsm_next_state(wait_for_auth, (StateData#state.manage_stream)(El, StateData)); + fsm_next_state(wait_for_auth, dispatch_stream_mgmt(El, StateData)); wait_for_auth({xmlstreamelement, El}, StateData) -> case is_auth_packet(El) of {auth, _ID, get, {U, _, _, _}} -> @@ -695,7 +694,7 @@ wait_for_auth(closed, StateData) -> wait_for_feature_request({xmlstreamelement, #xmlel{name = Name} = El}, StateData) when ?IS_STREAM_MGMT_TAG(Name) -> - fsm_next_state(wait_for_feature_request, (StateData#state.manage_stream)(El, StateData)); + fsm_next_state(wait_for_feature_request, dispatch_stream_mgmt(El, StateData)); wait_for_feature_request({xmlstreamelement, El}, StateData) -> #xmlel{name = Name, attrs = Attrs, children = Els} = El, @@ -863,7 +862,7 @@ wait_for_feature_request(closed, StateData) -> wait_for_sasl_response({xmlstreamelement, #xmlel{name = Name} = El}, StateData) when ?IS_STREAM_MGMT_TAG(Name) -> - fsm_next_state(wait_for_sasl_response, (StateData#state.manage_stream)(El, StateData)); + fsm_next_state(wait_for_sasl_response, dispatch_stream_mgmt(El, StateData)); wait_for_sasl_response({xmlstreamelement, El}, StateData) -> #xmlel{name = Name, attrs = Attrs, children = Els} = El, @@ -1006,7 +1005,7 @@ wait_for_bind({xmlstreamelement, #xmlel{name = Name, attrs = Attrs} = El}, fsm_next_state(wait_for_bind, StateData) end; _ -> - fsm_next_state(wait_for_bind, (StateData#state.manage_stream)(El, StateData)) + fsm_next_state(wait_for_bind, dispatch_stream_mgmt(El, StateData)) end; wait_for_bind({xmlstreamelement, El}, StateData) -> case jlib:iq_query_info(El) of @@ -1071,7 +1070,7 @@ wait_for_bind(closed, StateData) -> wait_for_session({xmlstreamelement, #xmlel{name = Name} = El}, StateData) when ?IS_STREAM_MGMT_TAG(Name) -> - fsm_next_state(wait_for_session, (StateData#state.manage_stream)(El, StateData)); + fsm_next_state(wait_for_session, dispatch_stream_mgmt(El, StateData)); wait_for_session({xmlstreamelement, El}, StateData) -> NewStateData = update_num_stanzas_in(StateData, El), case jlib:iq_query_info(El) of @@ -1141,7 +1140,7 @@ wait_for_session(closed, StateData) -> session_established({xmlstreamelement, #xmlel{name = Name} = El}, StateData) when ?IS_STREAM_MGMT_TAG(Name) -> - fsm_next_state(session_established, (StateData#state.manage_stream)(El, StateData)); + fsm_next_state(session_established, dispatch_stream_mgmt(El, StateData)); session_established({xmlstreamelement, El}, StateData) -> FromJID = StateData#state.jid, @@ -2554,6 +2553,11 @@ stream_mgmt_enabled(#state{sm_state = disabled}) -> stream_mgmt_enabled(_StateData) -> true. +dispatch_stream_mgmt(El, #state{sm_state = active} = StateData) -> + perform_stream_mgmt(El, StateData); +dispatch_stream_mgmt(El, StateData) -> + negotiate_stream_mgmt(El, StateData). + negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) -> %% XEP-0198 says: "For client-to-server connections, the client MUST NOT %% attempt to enable stream management until after it has completed Resource @@ -2646,7 +2650,6 @@ handle_enable(#state{resume_timeout = ConfigTimeout} = StateData, Attrs) -> send_element(StateData, Res), StateData#state{sm_state = active, ack_queue = queue:new(), - manage_stream = fun perform_stream_mgmt/2, resume_timeout = Timeout * 1000}. handle_r(StateData) -> @@ -2845,7 +2848,6 @@ inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) -> aux_fields = OldStateData#state.aux_fields, sm_xmlns = OldStateData#state.sm_xmlns, ack_queue = OldStateData#state.ack_queue, - manage_stream = fun perform_stream_mgmt/2, resume_timeout = OldStateData#state.resume_timeout, n_stanzas_in = OldStateData#state.n_stanzas_in, n_stanzas_out = OldStateData#state.n_stanzas_out, From a97c716352c3baec1416f5e0ee303c996bb5900c Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Tue, 25 Mar 2014 23:23:38 +0100 Subject: [PATCH 05/11] XEP-0198: Bounce unacked stanzas by default If the new "resend_on_timeout" option is set to false (which it is by default), bounce any unacknowledged stanzas instead of re-routing them. --- doc/guide.tex | 15 +++++++++++++-- src/ejabberd_c2s.erl | 36 +++++++++++++++++++++++------------- 2 files changed, 36 insertions(+), 15 deletions(-) diff --git a/doc/guide.tex b/doc/guide.tex index 23752a27d..714a62a57 100644 --- a/doc/guide.tex +++ b/doc/guide.tex @@ -871,8 +871,9 @@ The available modules, their purpose and the options allowed by each one are: Handles c2s connections.\\ Options: \texttt{access}, \texttt{certfile}, \texttt{ciphers}, \texttt{max\_ack\_queue}, \texttt{max\_fsm\_queue}, - \texttt{max\_stanza\_size}, \texttt{resume\_timeout}, - \texttt{shaper}, \texttt{starttls}, \texttt{starttls\_required}, + \texttt{max\_stanza\_size}, \texttt{resend\_on\_timeout}, + \texttt{resume\_timeout}, \texttt{shaper}, + \texttt{starttls}, \texttt{starttls\_required}, \texttt{stream\_management}, \texttt{tls}, \texttt{zlib}, \texttt{tls\_compression} \titem{\texttt{ejabberd\_s2s\_in}} @@ -1007,6 +1008,16 @@ request_handlers: /"a"/"b": mod_foo /"http-bind": mod_http_bind \end{verbatim} + \titem{resend\_on\_timeout: true|false} + If \term{stream\_management} is enabled and this option is set to + \term{true}, any stanzas that weren't acknowledged by the client + will be resent on session timeout. This behavior might often be + desired, but could have unexpected results under certain + circumstances. For example, a message that was sent to two resources + might get resent to one of them if the other one timed out. + Therefore, the default value for this option is \term{false}, which + tells ejabberd to generate an error message instead. The option can + be specified for \term{ejabberd\_c2s} listeners. \titem{resume\_timeout: Seconds} This option configures the number of seconds until a session times out if the connection is lost. During this period of time, a client diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 4955f6e42..0051266c7 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -115,6 +115,7 @@ max_ack_queue, pending_since, resume_timeout, + resend_on_timeout, n_stanzas_in = 0, n_stanzas_out = 0, lang}). @@ -304,6 +305,7 @@ init([{SockMod, Socket}, Opts]) -> Timeout when is_integer(Timeout), Timeout >= 0 -> Timeout; _ -> 300 end, + ResendOnTimeout = proplists:get_bool(resend_on_timeout, Opts), IP = peerip(SockMod, Socket), %% Check if IP is blacklisted: case is_ip_blacklisted(IP) of @@ -328,7 +330,8 @@ init([{SockMod, Socket}, Opts]) -> access = Access, shaper = Shaper, ip = IP, sm_state = StreamMgmtState, max_ack_queue = MaxAckQueue, - resume_timeout = ResumeTimeout}, + resume_timeout = ResumeTimeout, + resend_on_timeout = ResendOnTimeout}, {ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT} end. @@ -1757,7 +1760,7 @@ terminate(_Reason, StateName, StateData) -> StateData#state.pres_a, Packet), presence_broadcast(StateData, From, StateData#state.pres_i, Packet), - resend_unacked_stanzas(StateData); + handle_unacked_stanzas(StateData); _ -> ?INFO_MSG("(~w) Close session for ~s", [StateData#state.socket, @@ -1787,7 +1790,7 @@ terminate(_Reason, StateName, StateData) -> presence_broadcast(StateData, From, StateData#state.pres_i, Packet) end, - resend_unacked_stanzas(StateData) + handle_unacked_stanzas(StateData) end, bounce_messages(); true -> @@ -2714,7 +2717,7 @@ handle_resume(StateData, Attrs) -> {<<"previd">>, AttrId}], children = []}), SendFun = fun(_F, _T, El) -> send_element(NewState, El) end, - resend_unacked_stanzas(NewState, SendFun), + handle_unacked_stanzas(NewState, SendFun), send_element(NewState, #xmlel{name = <<"r">>, attrs = [{<<"xmlns">>, AttrXmlns}], @@ -2782,33 +2785,40 @@ limit_queue_length(#state{jid = JID, StateData end. -resend_unacked_stanzas(StateData, F) when StateData#state.sm_state == active; +handle_unacked_stanzas(StateData, F) when StateData#state.sm_state == active; StateData#state.sm_state == pending -> Queue = StateData#state.ack_queue, case queue:len(Queue) of 0 -> ok; N -> - ?INFO_MSG("Resending ~B unacknowledged stanzas to ~s", + ?INFO_MSG("~B stanzas were not acknowledged by ~s", [N, jlib:jid_to_string(StateData#state.jid)]), lists:foreach( - fun({Num, #xmlel{attrs = Attrs} = El}) -> + fun({_, #xmlel{attrs = Attrs} = El}) -> From_s = xml:get_attr_s(<<"from">>, Attrs), From = jlib:string_to_jid(From_s), To_s = xml:get_attr_s(<<"to">>, Attrs), To = jlib:string_to_jid(To_s), - ?DEBUG("Resending unacknowledged stanza #~B from ~s to ~s", - [Num, From_s, To_s]), F(From, To, El) end, queue:to_list(Queue)) end; -resend_unacked_stanzas(_StateData, _F) -> +handle_unacked_stanzas(_StateData, _F) -> ok. -resend_unacked_stanzas(StateData) when StateData#state.sm_state == active; +handle_unacked_stanzas(StateData) when StateData#state.sm_state == active; StateData#state.sm_state == pending -> - resend_unacked_stanzas(StateData, fun ejabberd_router:route/3); -resend_unacked_stanzas(_StateData) -> + F = case StateData#state.resend_on_timeout of + true -> + fun ejabberd_router:route/3; + false -> + fun(From, To, El) -> + Err = jlib:make_error_reply(El, ?ERR_SERVICE_UNAVAILABLE), + ejabberd_router:route(To, From, Err) + end + end, + handle_unacked_stanzas(StateData, F); +handle_unacked_stanzas(_StateData) -> ok. inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) -> From c114eb373650d270082e81406f13db035de33385 Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Mon, 7 Apr 2014 21:21:11 +0200 Subject: [PATCH 06/11] XEP-0198: Don't bounce/resend forwarded messages On connection timeout, drop any messages that were forwarded by some encapsulating protocol, such as XEP-0280 carbon copies or XEP-0313 archive messages. Bouncing or resending them could easily lead to unexpected results. --- src/ejabberd_c2s.erl | 60 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 52 insertions(+), 8 deletions(-) diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 0051266c7..17b54e3c5 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -2808,19 +2808,63 @@ handle_unacked_stanzas(_StateData, _F) -> handle_unacked_stanzas(StateData) when StateData#state.sm_state == active; StateData#state.sm_state == pending -> - F = case StateData#state.resend_on_timeout of - true -> - fun ejabberd_router:route/3; - false -> - fun(From, To, El) -> - Err = jlib:make_error_reply(El, ?ERR_SERVICE_UNAVAILABLE), - ejabberd_router:route(To, From, Err) - end + ReRoute = case StateData#state.resend_on_timeout of + true -> + fun ejabberd_router:route/3; + false -> + fun(From, To, El) -> + Err = + jlib:make_error_reply(El, + ?ERR_SERVICE_UNAVAILABLE), + ejabberd_router:route(To, From, Err) + end + end, + F = fun(From, To, El) -> + %% We'll drop the stanza if it was by some + %% encapsulating protocol as per XEP-0297. One such protocol is + %% XEP-0280, which says: "When a receiving server attempts to + %% deliver a forked message, and that message bounces with an + %% error for any reason, the receiving server MUST NOT forward + %% that error back to the original sender." Resending such a + %% stanza could easily lead to unexpected results as well. + case is_encapsulated_forward(El) of + true -> + ?DEBUG("Dropping forwarded stanza from ~s", + [xml:get_attr_s(<<"from">>, El#xmlel.attrs)]); + false -> + ReRoute(From, To, El) + end end, handle_unacked_stanzas(StateData, F); handle_unacked_stanzas(_StateData) -> ok. +is_encapsulated_forward(#xmlel{name = <<"message">>} = El) -> + SubTag = case {xml:get_subtag(El, <<"sent">>), + xml:get_subtag(El, <<"received">>), + xml:get_subtag(El, <<"result">>)} of + {false, false, false} -> + false; + {Tag, false, false} -> + Tag; + {false, Tag, false} -> + Tag; + {_, _, Tag} -> + Tag + end, + if SubTag == false -> + false; + true -> + case xml:get_subtag(SubTag, <<"forwarded">>) of + false -> + false; + _ -> + true + end + end; +is_encapsulated_forward(_El) -> + false. + inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) -> case jlib:base64_to_term(ResumeID) of {term, {U, S, R, Time}} -> From a21d2298afcf874df85fb018b22c203411ab9462 Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Mon, 28 Apr 2014 01:01:30 +0200 Subject: [PATCH 07/11] XEP-0198: Turn some warnings into info messages Don't log warnings on events that will happen during normal operation. --- src/ejabberd_c2s.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 17b54e3c5..ba8c6eab6 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -1277,7 +1277,7 @@ wait_for_resume(closed, StateData) -> ?DEBUG("Ignoring 'closed' event while waiting for resumption", []), fsm_next_state(wait_for_resume, StateData); wait_for_resume(Event, StateData) -> - ?WARNING_MSG("Ignoring event while waiting for resumption: ~p", [Event]), + ?INFO_MSG("Ignoring event while waiting for resumption: ~p", [Event]), fsm_next_state(wait_for_resume, StateData). %%---------------------------------------------------------------------- @@ -2727,8 +2727,8 @@ handle_resume(StateData, Attrs) -> {ok, NewState}; {error, El, Msg} -> send_element(StateData, El), - ?WARNING_MSG("Cannot resume session for ~s@~s: ~s", - [StateData#state.user, StateData#state.server, Msg]), + ?INFO_MSG("Cannot resume session for ~s@~s: ~s", + [StateData#state.user, StateData#state.server, Msg]), error end. From a0917a8e9b8f1cfb5a5e18eaea0ef0c36c7b1f7e Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Sun, 4 May 2014 23:08:42 +0200 Subject: [PATCH 08/11] XEP-0198: Log message when waiting for resumption Log an informational message when a session goes into the pending state (waiting for resumption) after the connection was lost. Administrators may well be interested in this state change when looking into issues. --- src/ejabberd_c2s.erl | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index ba8c6eab6..9f7ab4b89 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -1179,6 +1179,7 @@ session_established(closed, StateData) when StateData#state.resume_timeout > 0, StateData#state.sm_state == active orelse StateData#state.sm_state == pending -> + log_pending_state(StateData), fsm_next_state(wait_for_resume, StateData#state{sm_state = pending}); session_established(closed, StateData) -> {stop, normal, StateData}. @@ -1664,6 +1665,7 @@ handle_info({'DOWN', Monitor, _Type, _Object, _Info}, if StateData#state.resume_timeout > 0, StateData#state.sm_state == active orelse StateData#state.sm_state == pending -> + log_pending_state(StateData), fsm_next_state(wait_for_resume, StateData#state{sm_state = pending}); true -> {stop, normal, StateData} @@ -2785,6 +2787,12 @@ limit_queue_length(#state{jid = JID, StateData end. +log_pending_state(StateData) when StateData#state.sm_state /= pending -> + ?INFO_MSG("Waiting for resumption of stream for ~s", + [jlib:jid_to_string(StateData#state.jid)]); +log_pending_state(_StateData) -> + ok. + handle_unacked_stanzas(StateData, F) when StateData#state.sm_state == active; StateData#state.sm_state == pending -> Queue = StateData#state.ack_queue, From 32abcbca6cd2462b6bcb6d056514fa24d2caed17 Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Mon, 5 May 2014 00:02:55 +0200 Subject: [PATCH 09/11] XEP-0198: Accept stream elements in pending state Due to timing issues, ejabberd_c2s might receive stream elements from the client while the session is waiting for stream resumption. Those elements are now accepted. --- src/ejabberd_c2s.erl | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 9f7ab4b89..314f07f8c 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -1270,15 +1270,15 @@ session_established2(El, StateData) -> [{xmlstreamelement, El}]), fsm_next_state(session_established, NewState). +wait_for_resume({xmlstreamelement, _El} = Event, StateData) -> + session_established(Event, StateData), + fsm_next_state(wait_for_resume, StateData); wait_for_resume(timeout, StateData) -> ?DEBUG("Timed out waiting for resumption of stream for ~s", [jlib:jid_to_string(StateData#state.jid)]), {stop, normal, StateData}; -wait_for_resume(closed, StateData) -> - ?DEBUG("Ignoring 'closed' event while waiting for resumption", []), - fsm_next_state(wait_for_resume, StateData); wait_for_resume(Event, StateData) -> - ?INFO_MSG("Ignoring event while waiting for resumption: ~p", [Event]), + ?DEBUG("Ignoring event while waiting for resumption: ~p", [Event]), fsm_next_state(wait_for_resume, StateData). %%---------------------------------------------------------------------- @@ -1873,6 +1873,9 @@ send_header(StateData, Server, Version, Lang) -> LangStr]), send_text(StateData, iolist_to_binary(Header)). +send_trailer(StateData) + when StateData#state.sm_state == pending -> + ?DEBUG("Cannot send stream trailer while waiting for resumption", []); send_trailer(StateData) when StateData#state.xml_socket -> (StateData#state.sockmod):send_xml(StateData#state.socket, @@ -2558,7 +2561,8 @@ stream_mgmt_enabled(#state{sm_state = disabled}) -> stream_mgmt_enabled(_StateData) -> true. -dispatch_stream_mgmt(El, #state{sm_state = active} = StateData) -> +dispatch_stream_mgmt(El, StateData) when StateData#state.sm_state == active; + StateData#state.sm_state == pending -> perform_stream_mgmt(El, StateData); dispatch_stream_mgmt(El, StateData) -> negotiate_stream_mgmt(El, StateData). From 3b3f3b9131fa09c0ee33c5729de068307d995a59 Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Mon, 5 May 2014 01:11:14 +0200 Subject: [PATCH 10/11] XEP-0198: Don't log protocol issues There are corner cases where certain clients acknowledge more stanzas than they received. Nothing really bad will happen in those cases, and server administrators can't do anything about such issues anyway. --- src/ejabberd_c2s.erl | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 314f07f8c..a1774b73a 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -2672,10 +2672,6 @@ handle_r(StateData) -> handle_a(#state{jid = JID, n_stanzas_out = NumStanzasOut} = StateData, Attrs) -> case catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs)) of - H when is_integer(H), H > NumStanzasOut -> - ?WARNING_MSG("~s acknowledged ~B stanzas, but only ~B were sent", - [jlib:jid_to_string(JID), H, NumStanzasOut]), - ack_queue_drop(StateData, NumStanzasOut); H when is_integer(H), H >= 0 -> ?DEBUG("~s acknowledged ~B of ~B stanzas", [jlib:jid_to_string(JID), H, NumStanzasOut]), From 8b1f92575a2d59c6784e48a8d439bbb3d3761033 Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Tue, 6 May 2014 07:27:10 +0200 Subject: [PATCH 11/11] XEP-0198: Use "mgmt_" prefix for all #state fields Prefix all ejabberd_c2s #state fields that are used for stream management with "mgmt_". --- src/ejabberd_c2s.erl | 236 ++++++++++++++++++++++--------------------- 1 file changed, 123 insertions(+), 113 deletions(-) diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index a1774b73a..7117e46ab 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -109,15 +109,15 @@ auth_module = unknown, ip, aux_fields = [], - sm_state, - sm_xmlns, - ack_queue, - max_ack_queue, - pending_since, - resume_timeout, - resend_on_timeout, - n_stanzas_in = 0, - n_stanzas_out = 0, + mgmt_state, + mgmt_xmlns, + mgmt_queue, + mgmt_max_queue, + mgmt_pending_since, + mgmt_timeout, + mgmt_resend, + mgmt_stanzas_in = 0, + mgmt_stanzas_out = 0, lang}). %-define(DBGFSM, true). @@ -174,31 +174,31 @@ Name == <<"a">>; Name == <<"r">>). --define(IS_SUPPORTED_SM_XMLNS(Xmlns), +-define(IS_SUPPORTED_MGMT_XMLNS(Xmlns), Xmlns == ?NS_STREAM_MGMT_2; Xmlns == ?NS_STREAM_MGMT_3). --define(SM_FAILED(Condition, Xmlns), +-define(MGMT_FAILED(Condition, Xmlns), #xmlel{name = <<"failed">>, attrs = [{<<"xmlns">>, Xmlns}], children = [#xmlel{name = Condition, attrs = [{<<"xmlns">>, ?NS_STANZAS}], children = []}]}). --define(SM_BAD_REQUEST(Xmlns), - ?SM_FAILED(<<"bad-request">>, Xmlns)). +-define(MGMT_BAD_REQUEST(Xmlns), + ?MGMT_FAILED(<<"bad-request">>, Xmlns)). --define(SM_ITEM_NOT_FOUND(Xmlns), - ?SM_FAILED(<<"item-not-found">>, Xmlns)). +-define(MGMT_ITEM_NOT_FOUND(Xmlns), + ?MGMT_FAILED(<<"item-not-found">>, Xmlns)). --define(SM_SERVICE_UNAVAILABLE(Xmlns), - ?SM_FAILED(<<"service-unavailable">>, Xmlns)). +-define(MGMT_SERVICE_UNAVAILABLE(Xmlns), + ?MGMT_FAILED(<<"service-unavailable">>, Xmlns)). --define(SM_UNEXPECTED_REQUEST(Xmlns), - ?SM_FAILED(<<"unexpected-request">>, Xmlns)). +-define(MGMT_UNEXPECTED_REQUEST(Xmlns), + ?MGMT_FAILED(<<"unexpected-request">>, Xmlns)). --define(SM_UNSUPPORTED_VERSION(Xmlns), - ?SM_FAILED(<<"unsupported-version">>, Xmlns)). +-define(MGMT_UNSUPPORTED_VERSION(Xmlns), + ?MGMT_FAILED(<<"unsupported-version">>, Xmlns)). %%%---------------------------------------------------------------------- %%% API @@ -328,10 +328,10 @@ init([{SockMod, Socket}, Opts]) -> tls_enabled = TLSEnabled, tls_options = TLSOpts, sid = {now(), self()}, streamid = new_id(), access = Access, shaper = Shaper, ip = IP, - sm_state = StreamMgmtState, - max_ack_queue = MaxAckQueue, - resume_timeout = ResumeTimeout, - resend_on_timeout = ResendOnTimeout}, + mgmt_state = StreamMgmtState, + mgmt_max_queue = MaxAckQueue, + mgmt_timeout = ResumeTimeout, + mgmt_resend = ResendOnTimeout}, {ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT} end. @@ -695,9 +695,11 @@ wait_for_auth({xmlstreamerror, _}, StateData) -> wait_for_auth(closed, StateData) -> {stop, normal, StateData}. -wait_for_feature_request({xmlstreamelement, #xmlel{name = Name} = El}, StateData) +wait_for_feature_request({xmlstreamelement, #xmlel{name = Name} = El}, + StateData) when ?IS_STREAM_MGMT_TAG(Name) -> - fsm_next_state(wait_for_feature_request, dispatch_stream_mgmt(El, StateData)); + fsm_next_state(wait_for_feature_request, + dispatch_stream_mgmt(El, StateData)); wait_for_feature_request({xmlstreamelement, El}, StateData) -> #xmlel{name = Name, attrs = Attrs, children = Els} = El, @@ -1176,11 +1178,11 @@ session_established({xmlstreamerror, _}, StateData) -> send_trailer(StateData), {stop, normal, StateData}; session_established(closed, StateData) - when StateData#state.resume_timeout > 0, - StateData#state.sm_state == active orelse - StateData#state.sm_state == pending -> + when StateData#state.mgmt_timeout > 0, + StateData#state.mgmt_state == active orelse + StateData#state.mgmt_state == pending -> log_pending_state(StateData), - fsm_next_state(wait_for_resume, StateData#state{sm_state = pending}); + fsm_next_state(wait_for_resume, StateData#state{mgmt_state = pending}); session_established(closed, StateData) -> {stop, normal, StateData}. @@ -1333,7 +1335,7 @@ handle_sync_event(resume_session, _From, _StateName, StateData#state.user, StateData#state.server, StateData#state.resource), - {stop, normal, {ok, StateData}, StateData#state{sm_state = resumed}}; + {stop, normal, {ok, StateData}, StateData#state{mgmt_state = resumed}}; handle_sync_event(_Event, _From, StateName, StateData) -> Reply = ok, fsm_reply(Reply, StateName, StateData). @@ -1662,11 +1664,12 @@ handle_info({route, From, To, handle_info({'DOWN', Monitor, _Type, _Object, _Info}, _StateName, StateData) when Monitor == StateData#state.socket_monitor -> - if StateData#state.resume_timeout > 0, - StateData#state.sm_state == active orelse - StateData#state.sm_state == pending -> + if StateData#state.mgmt_timeout > 0, + StateData#state.mgmt_state == active orelse + StateData#state.mgmt_state == pending -> log_pending_state(StateData), - fsm_next_state(wait_for_resume, StateData#state{sm_state = pending}); + fsm_next_state(wait_for_resume, + StateData#state{mgmt_state = pending}); true -> {stop, normal, StateData} end; @@ -1733,7 +1736,7 @@ print_state(State = #state{pres_t = T, pres_f = F, pres_a = A, pres_i = I}) -> %% Returns: any %%---------------------------------------------------------------------- terminate(_Reason, StateName, StateData) -> - case StateData#state.sm_state of + case StateData#state.mgmt_state of resumed -> ?INFO_MSG("Closing former stream of resumed session for ~s", [jlib:jid_to_string(StateData#state.jid)]); @@ -1812,7 +1815,7 @@ change_shaper(StateData, JID) -> (StateData#state.sockmod):change_shaper(StateData#state.socket, Shaper). -send_text(StateData, Text) when StateData#state.sm_state == pending -> +send_text(StateData, Text) when StateData#state.mgmt_state == pending -> ?DEBUG("Cannot send text while waiting for resumption: ~p", [Text]); send_text(StateData, Text) when StateData#state.xml_socket -> ?DEBUG("Send Text on stream = ~p", [Text]), @@ -1822,7 +1825,7 @@ send_text(StateData, Text) -> ?DEBUG("Send XML on stream = ~p", [Text]), (StateData#state.sockmod):send(StateData#state.socket, Text). -send_element(StateData, El) when StateData#state.sm_state == pending -> +send_element(StateData, El) when StateData#state.mgmt_state == pending -> ?DEBUG("Cannot send element while waiting for resumption: ~p", [El]); send_element(StateData, El) when StateData#state.xml_socket -> (StateData#state.sockmod):send_xml(StateData#state.socket, @@ -1830,11 +1833,11 @@ send_element(StateData, El) when StateData#state.xml_socket -> send_element(StateData, El) -> send_text(StateData, xml:element_to_binary(El)). -send_stanza(StateData, Stanza) when StateData#state.sm_state == pending -> - ack_queue_add(StateData, Stanza); -send_stanza(StateData, Stanza) when StateData#state.sm_state == active -> +send_stanza(StateData, Stanza) when StateData#state.mgmt_state == pending -> + mgmt_queue_add(StateData, Stanza); +send_stanza(StateData, Stanza) when StateData#state.mgmt_state == active -> send_stanza_and_ack_req(StateData, Stanza), - ack_queue_add(StateData, Stanza); + mgmt_queue_add(StateData, Stanza); send_stanza(StateData, Stanza) -> send_element(StateData, Stanza), StateData. @@ -1874,7 +1877,7 @@ send_header(StateData, Server, Version, Lang) -> send_text(StateData, iolist_to_binary(Header)). send_trailer(StateData) - when StateData#state.sm_state == pending -> + when StateData#state.mgmt_state == pending -> ?DEBUG("Cannot send stream trailer while waiting for resumption", []); send_trailer(StateData) when StateData#state.xml_socket -> @@ -2432,14 +2435,14 @@ fsm_next_state_gc(StateName, PackedStateData) -> fsm_next_state(session_established, StateData) -> {next_state, session_established, StateData, ?C2S_HIBERNATE_TIMEOUT}; -fsm_next_state(wait_for_resume, #state{pending_since = undefined} = +fsm_next_state(wait_for_resume, #state{mgmt_pending_since = undefined} = StateData) -> {next_state, wait_for_resume, - StateData#state{pending_since = os:timestamp()}, - StateData#state.resume_timeout}; + StateData#state{mgmt_pending_since = os:timestamp()}, + StateData#state.mgmt_timeout}; fsm_next_state(wait_for_resume, StateData) -> - Diff = timer:now_diff(os:timestamp(), StateData#state.pending_since), - Timeout = max(StateData#state.resume_timeout - Diff div 1000, 1), + Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since), + Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1), {next_state, wait_for_resume, StateData, Timeout}; fsm_next_state(StateName, StateData) -> {next_state, StateName, StateData, ?C2S_OPEN_TIMEOUT}. @@ -2449,14 +2452,14 @@ fsm_next_state(StateName, StateData) -> fsm_reply(Reply, session_established, StateData) -> {reply, Reply, session_established, StateData, ?C2S_HIBERNATE_TIMEOUT}; -fsm_reply(Reply, wait_for_resume, #state{pending_since = undefined} = +fsm_reply(Reply, wait_for_resume, #state{mgmt_pending_since = undefined} = StateData) -> {reply, Reply, wait_for_resume, - StateData#state{pending_since = os:timestamp()}, - StateData#state.resume_timeout}; + StateData#state{mgmt_pending_since = os:timestamp()}, + StateData#state.mgmt_timeout}; fsm_reply(Reply, wait_for_resume, StateData) -> - Diff = timer:now_diff(os:timestamp(), StateData#state.pending_since), - Timeout = max(StateData#state.resume_timeout - Diff div 1000, 1), + Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since), + Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1), {reply, Reply, wait_for_resume, StateData, Timeout}; fsm_reply(Reply, StateName, StateData) -> {reply, Reply, StateName, StateData, ?C2S_OPEN_TIMEOUT}. @@ -2556,13 +2559,14 @@ route_blocking(What, StateData) -> %%% XEP-0198 %%%---------------------------------------------------------------------- -stream_mgmt_enabled(#state{sm_state = disabled}) -> +stream_mgmt_enabled(#state{mgmt_state = disabled}) -> false; stream_mgmt_enabled(_StateData) -> true. -dispatch_stream_mgmt(El, StateData) when StateData#state.sm_state == active; - StateData#state.sm_state == pending -> +dispatch_stream_mgmt(El, StateData) + when StateData#state.mgmt_state == active; + StateData#state.mgmt_state == pending -> perform_stream_mgmt(El, StateData); dispatch_stream_mgmt(El, StateData) -> negotiate_stream_mgmt(El, StateData). @@ -2573,39 +2577,39 @@ negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) -> %% Binding unless it is resuming a previous session". However, it also %% says: "Stream management errors SHOULD be considered recoverable", so we %% won't bail out. - send_element(StateData, ?SM_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)), + send_element(StateData, ?MGMT_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)), StateData; negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> case xml:get_attr_s(<<"xmlns">>, Attrs) of - Xmlns when ?IS_SUPPORTED_SM_XMLNS(Xmlns) -> + Xmlns when ?IS_SUPPORTED_MGMT_XMLNS(Xmlns) -> case stream_mgmt_enabled(StateData) of true -> case Name of <<"enable">> -> - handle_enable(StateData#state{sm_xmlns = Xmlns}, Attrs); + handle_enable(StateData#state{mgmt_xmlns = Xmlns}, Attrs); _ -> Res = if Name == <<"a">>; Name == <<"r">>; Name == <<"resume">> -> - ?SM_UNEXPECTED_REQUEST(Xmlns); + ?MGMT_UNEXPECTED_REQUEST(Xmlns); true -> - ?SM_BAD_REQUEST(Xmlns) + ?MGMT_BAD_REQUEST(Xmlns) end, send_element(StateData, Res), StateData end; false -> - send_element(StateData, ?SM_SERVICE_UNAVAILABLE(Xmlns)), + send_element(StateData, ?MGMT_SERVICE_UNAVAILABLE(Xmlns)), StateData end; _ -> - send_element(StateData, ?SM_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3)), + send_element(StateData, ?MGMT_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3)), StateData end. perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> case xml:get_attr_s(<<"xmlns">>, Attrs) of - Xmlns when Xmlns == StateData#state.sm_xmlns -> + Xmlns when Xmlns == StateData#state.mgmt_xmlns -> case Name of <<"r">> -> handle_r(StateData); @@ -2614,20 +2618,20 @@ perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> _ -> Res = if Name == <<"enable">>; Name == <<"resume">> -> - ?SM_UNEXPECTED_REQUEST(Xmlns); + ?MGMT_UNEXPECTED_REQUEST(Xmlns); true -> - ?SM_BAD_REQUEST(Xmlns) + ?MGMT_BAD_REQUEST(Xmlns) end, send_element(StateData, Res), StateData end; _ -> send_element(StateData, - ?SM_UNSUPPORTED_VERSION(StateData#state.sm_xmlns)), + ?MGMT_UNSUPPORTED_VERSION(StateData#state.mgmt_xmlns)), StateData end. -handle_enable(#state{resume_timeout = ConfigTimeout} = StateData, Attrs) -> +handle_enable(#state{mgmt_timeout = ConfigTimeout} = StateData, Attrs) -> Timeout = case xml:get_attr_s(<<"resume">>, Attrs) of ResumeAttr when ResumeAttr == <<"true">>; ResumeAttr == <<"1">> -> @@ -2641,7 +2645,7 @@ handle_enable(#state{resume_timeout = ConfigTimeout} = StateData, Attrs) -> _ -> 0 end, - ResAttrs = [{<<"xmlns">>, StateData#state.sm_xmlns}] ++ + ResAttrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}] ++ if Timeout > 0 -> ?INFO_MSG("Stream management with resumption enabled for ~s", [jlib:jid_to_string(StateData#state.jid)]), @@ -2657,25 +2661,26 @@ handle_enable(#state{resume_timeout = ConfigTimeout} = StateData, Attrs) -> attrs = ResAttrs, children = []}, send_element(StateData, Res), - StateData#state{sm_state = active, - ack_queue = queue:new(), - resume_timeout = Timeout * 1000}. + StateData#state{mgmt_state = active, + mgmt_queue = queue:new(), + mgmt_timeout = Timeout * 1000}. handle_r(StateData) -> - H = jlib:integer_to_binary(StateData#state.n_stanzas_in), + H = jlib:integer_to_binary(StateData#state.mgmt_stanzas_in), Res = #xmlel{name = <<"a">>, - attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}, + attrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}, {<<"h">>, H}], children = []}, send_element(StateData, Res), StateData. -handle_a(#state{jid = JID, n_stanzas_out = NumStanzasOut} = StateData, Attrs) -> +handle_a(#state{jid = JID, mgmt_stanzas_out = NumStanzasOut} = StateData, + Attrs) -> case catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs)) of H when is_integer(H), H >= 0 -> ?DEBUG("~s acknowledged ~B of ~B stanzas", [jlib:jid_to_string(JID), H, NumStanzasOut]), - ack_queue_drop(StateData, H); + mgmt_queue_drop(StateData, H); _ -> ?WARNING_MSG("Ignoring invalid ACK element from ~s", [jlib:jid_to_string(JID)]), @@ -2684,7 +2689,7 @@ handle_a(#state{jid = JID, n_stanzas_out = NumStanzasOut} = StateData, Attrs) -> handle_resume(StateData, Attrs) -> R = case xml:get_attr_s(<<"xmlns">>, Attrs) of - Xmlns when ?IS_SUPPORTED_SM_XMLNS(Xmlns) -> + Xmlns when ?IS_SUPPORTED_MGMT_XMLNS(Xmlns) -> case stream_mgmt_enabled(StateData) of true -> case {xml:get_attr(<<"previd">>, Attrs), @@ -2695,23 +2700,26 @@ handle_resume(StateData, Attrs) -> {ok, InheritedState} -> {ok, InheritedState, H}; {error, Err} -> - {error, ?SM_ITEM_NOT_FOUND(Xmlns), Err} + {error, ?MGMT_ITEM_NOT_FOUND(Xmlns), Err} end; _ -> - {error, ?SM_BAD_REQUEST(Xmlns), <<"Invalid request">>} + {error, ?MGMT_BAD_REQUEST(Xmlns), + <<"Invalid request">>} end; false -> - {error, ?SM_SERVICE_UNAVAILABLE(Xmlns), <<"XEP-0198 disabled">>} + {error, ?MGMT_SERVICE_UNAVAILABLE(Xmlns), + <<"XEP-0198 disabled">>} end; _ -> - {error, ?SM_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3), <<"Invalid XMLNS">>} + {error, ?MGMT_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3), + <<"Invalid XMLNS">>} end, case R of {ok, ResumedState, NumHandled} -> - NewState = ack_queue_drop(ResumedState, NumHandled), - AttrXmlns = NewState#state.sm_xmlns, + NewState = mgmt_queue_drop(ResumedState, NumHandled), + AttrXmlns = NewState#state.mgmt_xmlns, AttrId = make_resume_id(NewState), - AttrH = jlib:integer_to_binary(NewState#state.n_stanzas_in), + AttrH = jlib:integer_to_binary(NewState#state.mgmt_stanzas_in), send_element(NewState, #xmlel{name = <<"resumed">>, attrs = [{<<"xmlns">>, AttrXmlns}, @@ -2734,8 +2742,8 @@ handle_resume(StateData, Attrs) -> error end. -update_num_stanzas_in(#state{sm_state = active} = StateData, El) -> - NewNum = case {is_stanza(El), StateData#state.n_stanzas_in} of +update_num_stanzas_in(#state{mgmt_state = active} = StateData, El) -> + NewNum = case {is_stanza(El), StateData#state.mgmt_stanzas_in} of {true, 4294967295} -> 0; {true, Num} -> @@ -2743,59 +2751,60 @@ update_num_stanzas_in(#state{sm_state = active} = StateData, El) -> {false, Num} -> Num end, - StateData#state{n_stanzas_in = NewNum}; + StateData#state{mgmt_stanzas_in = NewNum}; update_num_stanzas_in(StateData, _El) -> StateData. send_stanza_and_ack_req(StateData, Stanza) -> AckReq = #xmlel{name = <<"r">>, - attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}], + attrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}], children = []}, StanzaS = xml:element_to_binary(Stanza), AckReqS = xml:element_to_binary(AckReq), send_text(StateData, [StanzaS, AckReqS]). -ack_queue_add(StateData, El) -> - NewNum = case StateData#state.n_stanzas_out of +mgmt_queue_add(StateData, El) -> + NewNum = case StateData#state.mgmt_stanzas_out of 4294967295 -> 0; Num -> Num + 1 end, NewState = limit_queue_length(StateData), - NewQueue = queue:in({NewNum, El}, NewState#state.ack_queue), - NewState#state{ack_queue = NewQueue, n_stanzas_out = NewNum}. + NewQueue = queue:in({NewNum, El}, NewState#state.mgmt_queue), + NewState#state{mgmt_queue = NewQueue, mgmt_stanzas_out = NewNum}. -ack_queue_drop(StateData, NumHandled) -> +mgmt_queue_drop(StateData, NumHandled) -> NewQueue = jlib:queue_drop_while(fun({N, _Stanza}) -> N =< NumHandled end, - StateData#state.ack_queue), - StateData#state{ack_queue = NewQueue}. + StateData#state.mgmt_queue), + StateData#state{mgmt_queue = NewQueue}. -limit_queue_length(#state{max_ack_queue = Limit} = StateData) +limit_queue_length(#state{mgmt_max_queue = Limit} = StateData) when Limit == infinity; Limit == unlimited -> StateData; limit_queue_length(#state{jid = JID, - ack_queue = Queue, - max_ack_queue = Limit} = StateData) -> + mgmt_queue = Queue, + mgmt_max_queue = Limit} = StateData) -> case queue:len(Queue) >= Limit of true -> ?WARNING_MSG("Dropping stanza from too long ACK queue for ~s", [jlib:jid_to_string(JID)]), - limit_queue_length(StateData#state{ack_queue = queue:drop(Queue)}); + limit_queue_length(StateData#state{mgmt_queue = queue:drop(Queue)}); false -> StateData end. -log_pending_state(StateData) when StateData#state.sm_state /= pending -> +log_pending_state(StateData) when StateData#state.mgmt_state /= pending -> ?INFO_MSG("Waiting for resumption of stream for ~s", [jlib:jid_to_string(StateData#state.jid)]); log_pending_state(_StateData) -> ok. -handle_unacked_stanzas(StateData, F) when StateData#state.sm_state == active; - StateData#state.sm_state == pending -> - Queue = StateData#state.ack_queue, +handle_unacked_stanzas(StateData, F) + when StateData#state.mgmt_state == active; + StateData#state.mgmt_state == pending -> + Queue = StateData#state.mgmt_queue, case queue:len(Queue) of 0 -> ok; @@ -2814,9 +2823,10 @@ handle_unacked_stanzas(StateData, F) when StateData#state.sm_state == active; handle_unacked_stanzas(_StateData, _F) -> ok. -handle_unacked_stanzas(StateData) when StateData#state.sm_state == active; - StateData#state.sm_state == pending -> - ReRoute = case StateData#state.resend_on_timeout of +handle_unacked_stanzas(StateData) + when StateData#state.mgmt_state == active; + StateData#state.mgmt_state == pending -> + ReRoute = case StateData#state.mgmt_resend of true -> fun ejabberd_router:route/3; false -> @@ -2908,12 +2918,12 @@ inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) -> pres_invis = OldStateData#state.pres_invis, privacy_list = OldStateData#state.privacy_list, aux_fields = OldStateData#state.aux_fields, - sm_xmlns = OldStateData#state.sm_xmlns, - ack_queue = OldStateData#state.ack_queue, - resume_timeout = OldStateData#state.resume_timeout, - n_stanzas_in = OldStateData#state.n_stanzas_in, - n_stanzas_out = OldStateData#state.n_stanzas_out, - sm_state = active}}; + mgmt_xmlns = OldStateData#state.mgmt_xmlns, + mgmt_queue = OldStateData#state.mgmt_queue, + mgmt_timeout = OldStateData#state.mgmt_timeout, + mgmt_stanzas_in = OldStateData#state.mgmt_stanzas_in, + mgmt_stanzas_out = OldStateData#state.mgmt_stanzas_out, + mgmt_state = active}}; _ -> {error, <<"Cannot grab session state">>} end