diff --git a/doc/guide.tex b/doc/guide.tex index 2e54193e8..92beb1f04 100644 --- a/doc/guide.tex +++ b/doc/guide.tex @@ -873,9 +873,11 @@ The available modules, their purpose and the options allowed by each one are: \titem{\texttt{ejabberd\_c2s}} Handles c2s connections.\\ Options: \texttt{access}, \texttt{certfile}, \texttt{ciphers}, \texttt{protocol\_options} - \texttt{max\_fsm\_queue}, - \texttt{max\_stanza\_size}, \texttt{shaper}, - \texttt{starttls}, \texttt{starttls\_required}, \texttt{tls}, + \texttt{max\_ack\_queue}, \texttt{max\_fsm\_queue}, + \texttt{max\_stanza\_size}, \texttt{resend\_on\_timeout}, + \texttt{resume\_timeout}, \texttt{shaper}, + \texttt{starttls}, \texttt{starttls\_required}, + \texttt{stream\_management}, \texttt{tls}, \texttt{zlib}, \texttt{tls\_compression} \titem{\texttt{ejabberd\_s2s\_in}} Handles incoming s2s connections.\\ @@ -973,6 +975,13 @@ This is a detailed description of each option allowed by the listening modules: \term{http\_poll\_timeout}. The default value is five minutes. The option can be defined in \term{ejabberd.yml}, expressing the time in seconds: \verb|{http_poll_timeout, 300}.| + \titem{max\_ack\_queue: Size} + This option specifies the maximum number of unacknowledged stanzas + queued for possible retransmission if \term{stream\_management} is + enabled. When the limit is reached, the first stanza is dropped from + the queue before adding the next one. This option can be specified + for \term{ejabberd\_c2s} listeners. The allowed values are positive + integers and \term{infinity}. Default value: \term{500}. \titem{max\_fsm\_queue: Size} This option specifies the maximum number of elements in the queue of the FSM (Finite State Machine). @@ -1010,6 +1019,23 @@ request_handlers: /"a"/"b": mod_foo /"http-bind": mod_http_bind \end{verbatim} + \titem{resend\_on\_timeout: true|false} + If \term{stream\_management} is enabled and this option is set to + \term{true}, any stanzas that weren't acknowledged by the client + will be resent on session timeout. This behavior might often be + desired, but could have unexpected results under certain + circumstances. For example, a message that was sent to two resources + might get resent to one of them if the other one timed out. + Therefore, the default value for this option is \term{false}, which + tells ejabberd to generate an error message instead. The option can + be specified for \term{ejabberd\_c2s} listeners. + \titem{resume\_timeout: Seconds} + This option configures the number of seconds until a session times + out if the connection is lost. During this period of time, a client + may resume the session if \term{stream\_management} is enabled. This + option can be specified for \term{ejabberd\_c2s} listeners. Setting + it to \term{0} effectively disables session resumption. The default + value is \term{300}. \titem{service\_check\_from: true|false} \ind{options!service\_check\_from} This option can be used with \term{ejabberd\_service} only. @@ -1033,6 +1059,10 @@ request_handlers: No unencrypted connections will be allowed. You should also set the \option{certfile} option. You can define a certificate file for a specific domain using the global option \option{domain\_certfile}. + \titem{stream\_management: true|false} + Setting this option to \term{false} disables ejabberd's support for + \ind{protocols!XEP-0198: Stream Management}. It can be specified for + \term{ejabberd\_c2s} listeners. The default value is \term{true}. \titem{timeout: Integer} \ind{options!timeout} Timeout of the connections, expressed in milliseconds. Default: 5000 diff --git a/include/ns.hrl b/include/ns.hrl index c5eb2e4ba..aa6150cfd 100644 --- a/include/ns.hrl +++ b/include/ns.hrl @@ -143,3 +143,5 @@ -define(NS_MEDIA, <<"urn:xmpp:media-element">>). -define(NS_BOB, <<"urn:xmpp:bob">>). -define(NS_PING, <<"urn:xmpp:ping">>). +-define(NS_STREAM_MGMT_2, <<"urn:xmpp:sm:2">>). +-define(NS_STREAM_MGMT_3, <<"urn:xmpp:sm:3">>). diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index f2e16e15b..8874c48ad 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -57,6 +57,7 @@ wait_for_bind/2, wait_for_session/2, wait_for_sasl_response/2, + wait_for_resume/2, session_established/2, handle_event/3, handle_sync_event/4, @@ -107,6 +108,15 @@ auth_module = unknown, ip, aux_fields = [], + mgmt_state, + mgmt_xmlns, + mgmt_queue, + mgmt_max_queue, + mgmt_pending_since, + mgmt_timeout, + mgmt_resend, + mgmt_stanzas_in = 0, + mgmt_stanzas_out = 0, lang = <<"">>}). %-define(DBGFSM, true). @@ -155,6 +165,39 @@ -define(INVALID_FROM, ?SERR_INVALID_FROM). +%% XEP-0198: + +-define(IS_STREAM_MGMT_TAG(Name), + Name == <<"enable">>; + Name == <<"resume">>; + Name == <<"a">>; + Name == <<"r">>). + +-define(IS_SUPPORTED_MGMT_XMLNS(Xmlns), + Xmlns == ?NS_STREAM_MGMT_2; + Xmlns == ?NS_STREAM_MGMT_3). + +-define(MGMT_FAILED(Condition, Xmlns), + #xmlel{name = <<"failed">>, + attrs = [{<<"xmlns">>, Xmlns}], + children = [#xmlel{name = Condition, + attrs = [{<<"xmlns">>, ?NS_STANZAS}], + children = []}]}). + +-define(MGMT_BAD_REQUEST(Xmlns), + ?MGMT_FAILED(<<"bad-request">>, Xmlns)). + +-define(MGMT_ITEM_NOT_FOUND(Xmlns), + ?MGMT_FAILED(<<"item-not-found">>, Xmlns)). + +-define(MGMT_SERVICE_UNAVAILABLE(Xmlns), + ?MGMT_FAILED(<<"service-unavailable">>, Xmlns)). + +-define(MGMT_UNEXPECTED_REQUEST(Xmlns), + ?MGMT_FAILED(<<"unexpected-request">>, Xmlns)). + +-define(MGMT_UNSUPPORTED_VERSION(Xmlns), + ?MGMT_FAILED(<<"unsupported-version">>, Xmlns)). %%%---------------------------------------------------------------------- %%% API @@ -258,6 +301,19 @@ init([{SockMod, Socket}, Opts]) -> true -> TLSOpts2 end, TLSOpts = [verify_none | TLSOpts3], + StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true), + StreamMgmtState = if StreamMgmtEnabled -> inactive; + true -> disabled + end, + MaxAckQueue = case proplists:get_value(max_ack_queue, Opts) of + Limit when is_integer(Limit), Limit > 0 -> Limit; + _ -> 500 + end, + ResumeTimeout = case proplists:get_value(resume_timeout, Opts) of + Timeout when is_integer(Timeout), Timeout >= 0 -> Timeout; + _ -> 300 + end, + ResendOnTimeout = proplists:get_bool(resend_on_timeout, Opts), IP = peerip(SockMod, Socket), %% Check if IP is blacklisted: case is_ip_blacklisted(IP) of @@ -278,8 +334,12 @@ init([{SockMod, Socket}, Opts]) -> xml_socket = XMLSocket, zlib = Zlib, tls = TLS, tls_required = StartTLSRequired, tls_enabled = TLSEnabled, tls_options = TLSOpts, - streamid = new_id(), access = Access, - shaper = Shaper, ip = IP}, + sid = {now(), self()}, streamid = new_id(), + access = Access, shaper = Shaper, ip = IP, + mgmt_state = StreamMgmtState, + mgmt_max_queue = MaxAckQueue, + mgmt_timeout = ResumeTimeout, + mgmt_resend = ResendOnTimeout}, {ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT} end. @@ -410,6 +470,18 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) -> ejabberd_hooks:run_fold(roster_get_versioning_feature, Server, [], [Server]), + StreamManagementFeature = + case stream_mgmt_enabled(StateData) of + true -> + [#xmlel{name = <<"sm">>, + attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_2}], + children = []}, + #xmlel{name = <<"sm">>, + attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_3}], + children = []}]; + false -> + [] + end, StreamFeatures = [#xmlel{name = <<"bind">>, attrs = [{<<"xmlns">>, ?NS_BIND}], children = []}, @@ -418,6 +490,7 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) -> children = []}] ++ RosterVersioningFeature ++ + StreamManagementFeature ++ ejabberd_hooks:run_fold(c2s_stream_features, Server, [], [Server]), send_element(StateData, @@ -480,6 +553,9 @@ wait_for_stream({xmlstreamerror, _}, StateData) -> wait_for_stream(closed, StateData) -> {stop, normal, StateData}. +wait_for_auth({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_auth, dispatch_stream_mgmt(El, StateData)); wait_for_auth({xmlstreamelement, El}, StateData) -> case is_auth_packet(El) of {auth, _ID, get, {U, _, _, _}} -> @@ -549,14 +625,15 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> ?INFO_MSG("(~w) Accepted legacy authentication for ~s by ~p", [StateData#state.socket, jlib:jid_to_string(JID), AuthModule]), - SID = {now(), self()}, Conn = get_conn_type(StateData), Info = [{ip, StateData#state.ip}, {conn, Conn}, {auth_module, AuthModule}], Res = jlib:make_result_iq_reply( El#xmlel{children = []}), send_element(StateData, Res), - ejabberd_sm:open_session(SID, U, StateData#state.server, R, Info), + ejabberd_sm:open_session(StateData#state.sid, U, + StateData#state.server, R, + Info), change_shaper(StateData, JID), {Fs, Ts} = ejabberd_hooks:run_fold(roster_get_subscription_lists, @@ -574,7 +651,7 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> [U, StateData#state.server]), NewStateData = StateData#state{user = U, resource = R, - jid = JID, sid = SID, + jid = JID, conn = Conn, auth_module = AuthModule, pres_f = (?SETS):from_list(Fs1), @@ -625,6 +702,11 @@ wait_for_auth({xmlstreamerror, _}, StateData) -> wait_for_auth(closed, StateData) -> {stop, normal, StateData}. +wait_for_feature_request({xmlstreamelement, #xmlel{name = Name} = El}, + StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_feature_request, + dispatch_stream_mgmt(El, StateData)); wait_for_feature_request({xmlstreamelement, El}, StateData) -> #xmlel{name = Name, attrs = Attrs, children = Els} = El, @@ -790,6 +872,9 @@ wait_for_feature_request({xmlstreamerror, _}, wait_for_feature_request(closed, StateData) -> {stop, normal, StateData}. +wait_for_sasl_response({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_sasl_response, dispatch_stream_mgmt(El, StateData)); wait_for_sasl_response({xmlstreamelement, El}, StateData) -> #xmlel{name = Name, attrs = Attrs, children = Els} = El, @@ -920,6 +1005,20 @@ resource_conflict_action(U, S, R) -> {accept_resource, Rnew} end. +wait_for_bind({xmlstreamelement, #xmlel{name = Name, attrs = Attrs} = El}, + StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + case Name of + <<"resume">> -> + case handle_resume(StateData, Attrs) of + {ok, ResumedState} -> + fsm_next_state(session_established, ResumedState); + error -> + fsm_next_state(wait_for_bind, StateData) + end; + _ -> + fsm_next_state(wait_for_bind, dispatch_stream_mgmt(El, StateData)) + end; wait_for_bind({xmlstreamelement, El}, StateData) -> case jlib:iq_query_info(El) of #iq{type = set, xmlns = ?NS_BIND, sub_el = SubEl} = @@ -981,61 +1080,63 @@ wait_for_bind({xmlstreamerror, _}, StateData) -> wait_for_bind(closed, StateData) -> {stop, normal, StateData}. +wait_for_session({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_session, dispatch_stream_mgmt(El, StateData)); wait_for_session({xmlstreamelement, El}, StateData) -> + NewStateData = update_num_stanzas_in(StateData, El), case jlib:iq_query_info(El) of #iq{type = set, xmlns = ?NS_SESSION} -> - U = StateData#state.user, - R = StateData#state.resource, - JID = StateData#state.jid, - case acl:match_rule(StateData#state.server, - StateData#state.access, JID) of + U = NewStateData#state.user, + R = NewStateData#state.resource, + JID = NewStateData#state.jid, + case acl:match_rule(NewStateData#state.server, + NewStateData#state.access, JID) of allow -> ?INFO_MSG("(~w) Opened session for ~s", - [StateData#state.socket, + [NewStateData#state.socket, jlib:jid_to_string(JID)]), Res = jlib:make_result_iq_reply(El#xmlel{children = []}), - send_element(StateData, Res), - change_shaper(StateData, JID), + NewState = send_stanza(NewStateData, Res), + change_shaper(NewState, JID), {Fs, Ts} = ejabberd_hooks:run_fold( roster_get_subscription_lists, - StateData#state.server, + NewState#state.server, {[], []}, - [U, StateData#state.server]), + [U, NewState#state.server]), LJID = jlib:jid_tolower(jlib:jid_remove_resource(JID)), Fs1 = [LJID | Fs], Ts1 = [LJID | Ts], PrivList = ejabberd_hooks:run_fold( - privacy_get_user_list, StateData#state.server, + privacy_get_user_list, NewState#state.server, #userlist{}, - [U, StateData#state.server]), - SID = {now(), self()}, - Conn = get_conn_type(StateData), - Info = [{ip, StateData#state.ip}, {conn, Conn}, - {auth_module, StateData#state.auth_module}], + [U, NewState#state.server]), + Conn = get_conn_type(NewState), + Info = [{ip, NewState#state.ip}, {conn, Conn}, + {auth_module, NewState#state.auth_module}], ejabberd_sm:open_session( - SID, U, StateData#state.server, R, Info), - NewStateData = - StateData#state{ - sid = SID, + NewState#state.sid, U, NewState#state.server, R, Info), + UpdatedStateData = + NewState#state{ conn = Conn, pres_f = ?SETS:from_list(Fs1), pres_t = ?SETS:from_list(Ts1), privacy_list = PrivList}, fsm_next_state_pack(session_established, - NewStateData); + UpdatedStateData); _ -> ejabberd_hooks:run(forbidden_session_hook, - StateData#state.server, [JID]), + NewStateData#state.server, [JID]), ?INFO_MSG("(~w) Forbidden session for ~s", - [StateData#state.socket, + [NewStateData#state.socket, jlib:jid_to_string(JID)]), Err = jlib:make_error_reply(El, ?ERR_NOT_ALLOWED), - send_element(StateData, Err), - fsm_next_state(wait_for_session, StateData) + send_element(NewStateData, Err), + fsm_next_state(wait_for_session, NewStateData) end; _ -> - fsm_next_state(wait_for_session, StateData) + fsm_next_state(wait_for_session, NewStateData) end; wait_for_session(timeout, StateData) -> @@ -1049,6 +1150,9 @@ wait_for_session({xmlstreamerror, _}, StateData) -> wait_for_session(closed, StateData) -> {stop, normal, StateData}. +session_established({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(session_established, dispatch_stream_mgmt(El, StateData)); session_established({xmlstreamelement, El}, StateData) -> FromJID = StateData#state.jid, @@ -1080,6 +1184,12 @@ session_established({xmlstreamerror, _}, StateData) -> send_element(StateData, ?INVALID_XML_ERR), send_trailer(StateData), {stop, normal, StateData}; +session_established(closed, StateData) + when StateData#state.mgmt_timeout > 0, + StateData#state.mgmt_state == active orelse + StateData#state.mgmt_state == pending -> + log_pending_state(StateData), + fsm_next_state(wait_for_resume, StateData#state{mgmt_state = pending}); session_established(closed, StateData) -> {stop, normal, StateData}. @@ -1087,9 +1197,10 @@ session_established(closed, StateData) -> %% connection) session_established2(El, StateData) -> #xmlel{name = Name, attrs = Attrs} = El, - User = StateData#state.user, - Server = StateData#state.server, - FromJID = StateData#state.jid, + NewStateData = update_num_stanzas_in(StateData, El), + User = NewStateData#state.user, + Server = NewStateData#state.server, + FromJID = NewStateData#state.jid, To = xml:get_attr_s(<<"to">>, Attrs), ToJID = case To of <<"">> -> jlib:make_jid(User, Server, <<"">>); @@ -1098,7 +1209,7 @@ session_established2(El, StateData) -> NewEl1 = jlib:remove_attr(<<"xmlns">>, El), NewEl = case xml:get_attr_s(<<"xml:lang">>, Attrs) of <<"">> -> - case StateData#state.lang of + case NewStateData#state.lang of <<"">> -> NewEl1; Lang -> xml:replace_tag_attr(<<"xml:lang">>, Lang, NewEl1) @@ -1108,13 +1219,18 @@ session_established2(El, StateData) -> NewState = case ToJID of error -> case xml:get_attr_s(<<"type">>, Attrs) of - <<"error">> -> StateData; - <<"result">> -> StateData; + <<"error">> -> NewStateData; + <<"result">> -> NewStateData; _ -> Err = jlib:make_error_reply(NewEl, ?ERR_JID_MALFORMED), - send_element(StateData, Err), - StateData + case is_stanza(Err) of + true -> + send_stanza(NewStateData, Err); + false -> + send_element(NewStateData, Err), + NewStateData + end end; _ -> case Name of @@ -1129,12 +1245,12 @@ session_established2(El, StateData) -> #jid{user = User, server = Server, resource = <<"">>} -> ?DEBUG("presence_update(~p,~n\t~p,~n\t~p)", - [FromJID, PresenceEl, StateData]), + [FromJID, PresenceEl, NewStateData]), presence_update(FromJID, PresenceEl, - StateData); + NewStateData); _ -> presence_track(FromJID, ToJID, PresenceEl, - StateData) + NewStateData) end; <<"iq">> -> case jlib:iq_query_info(NewEl) of @@ -1142,27 +1258,38 @@ session_established2(El, StateData) -> when Xmlns == (?NS_PRIVACY); Xmlns == (?NS_BLOCKING) -> process_privacy_iq(FromJID, ToJID, IQ, - StateData); + NewStateData); _ -> ejabberd_hooks:run(user_send_packet, Server, [FromJID, ToJID, NewEl]), - check_privacy_route(FromJID, StateData, + check_privacy_route(FromJID, NewStateData, FromJID, ToJID, NewEl), - StateData + NewStateData end; <<"message">> -> ejabberd_hooks:run(user_send_packet, Server, [FromJID, ToJID, NewEl]), - check_privacy_route(FromJID, StateData, FromJID, + check_privacy_route(FromJID, NewStateData, FromJID, ToJID, NewEl), - StateData; - _ -> StateData + NewStateData; + _ -> NewStateData end end, ejabberd_hooks:run(c2s_loop_debug, [{xmlstreamelement, El}]), fsm_next_state(session_established, NewState). +wait_for_resume({xmlstreamelement, _El} = Event, StateData) -> + session_established(Event, StateData), + fsm_next_state(wait_for_resume, StateData); +wait_for_resume(timeout, StateData) -> + ?DEBUG("Timed out waiting for resumption of stream for ~s", + [jlib:jid_to_string(StateData#state.jid)]), + {stop, normal, StateData}; +wait_for_resume(Event, StateData) -> + ?DEBUG("Ignoring event while waiting for resumption: ~p", [Event]), + fsm_next_state(wait_for_resume, StateData). + %%---------------------------------------------------------------------- %% Func: StateName/3 %% Returns: {next_state, NextStateName, NextStateData} | @@ -1207,6 +1334,15 @@ handle_sync_event(get_subscribed, _From, StateName, StateData) -> Subscribed = (?SETS):to_list(StateData#state.pres_f), {reply, Subscribed, StateName, StateData}; +handle_sync_event(resume_session, _From, _StateName, + StateData) -> + %% The old session should be closed before the new one is opened, so we do + %% this here instead of leaving it to the terminate callback + ejabberd_sm:close_session(StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource), + {stop, normal, {ok, StateData}, StateData#state{mgmt_state = resumed}}; handle_sync_event(_Event, _From, StateName, StateData) -> Reply = ok, fsm_reply(Reply, StateName, StateData). @@ -1275,13 +1411,13 @@ handle_info({route, _From, _To, {broadcast, Data}}, jlib:jid_remove_resource(StateData#state.jid), StateData#state.jid, jlib:iq_to_xml(PrivPushIQ)), - send_element(StateData, PrivPushEl), + NewState = send_stanza(StateData, PrivPushEl), fsm_next_state(StateName, - StateData#state{privacy_list = NewPL}) + NewState#state{privacy_list = NewPL}) end; {blocking, What} -> - route_blocking(What, StateData), - fsm_next_state(StateName, StateData); + NewState = route_blocking(What, StateData), + fsm_next_state(StateName, NewState); _ -> fsm_next_state(StateName, StateData) end; @@ -1527,12 +1663,12 @@ handle_info({route, From, To, jlib:replace_from_to_attrs(jlib:jid_to_string(From), jlib:jid_to_string(To), NewAttrs), FixedPacket = #xmlel{name = Name, attrs = Attrs2, children = Els}, - send_element(StateData, FixedPacket), + SentStateData = send_stanza(StateData, FixedPacket), ejabberd_hooks:run(user_receive_packet, - StateData#state.server, - [StateData#state.jid, From, To, FixedPacket]), + SentStateData#state.server, + [SentStateData#state.jid, From, To, FixedPacket]), ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), - fsm_next_state(StateName, NewState); + fsm_next_state(StateName, SentStateData); true -> ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), fsm_next_state(StateName, NewState) @@ -1540,7 +1676,15 @@ handle_info({route, From, To, handle_info({'DOWN', Monitor, _Type, _Object, _Info}, _StateName, StateData) when Monitor == StateData#state.socket_monitor -> - {stop, normal, StateData}; + if StateData#state.mgmt_timeout > 0, + StateData#state.mgmt_state == active orelse + StateData#state.mgmt_state == pending -> + log_pending_state(StateData), + fsm_next_state(wait_for_resume, + StateData#state{mgmt_state = pending}); + true -> + {stop, normal, StateData} + end; handle_info(system_shutdown, StateName, StateData) -> case StateName of wait_for_stream -> @@ -1604,60 +1748,71 @@ print_state(State = #state{pres_t = T, pres_f = F, pres_a = A, pres_i = I}) -> %% Returns: any %%---------------------------------------------------------------------- terminate(_Reason, StateName, StateData) -> - case StateName of - session_established -> - case StateData#state.authenticated of - replaced -> - ?INFO_MSG("(~w) Replaced session for ~s", - [StateData#state.socket, - jlib:jid_to_string(StateData#state.jid)]), - From = StateData#state.jid, - Packet = #xmlel{name = <<"presence">>, - attrs = [{<<"type">>, <<"unavailable">>}], - children = - [#xmlel{name = <<"status">>, attrs = [], - children = - [{xmlcdata, - <<"Replaced by new connection">>}]}]}, - ejabberd_sm:close_session_unset_presence(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource, - <<"Replaced by new connection">>), - presence_broadcast(StateData, From, - StateData#state.pres_a, Packet), - presence_broadcast(StateData, From, - StateData#state.pres_i, Packet); - _ -> - ?INFO_MSG("(~w) Close session for ~s", - [StateData#state.socket, - jlib:jid_to_string(StateData#state.jid)]), - EmptySet = (?SETS):new(), - case StateData of - #state{pres_last = undefined, pres_a = EmptySet, pres_i = EmptySet, pres_invis = false} -> - ejabberd_sm:close_session(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource); - _ -> - From = StateData#state.jid, - Packet = #xmlel{name = <<"presence">>, - attrs = [{<<"type">>, <<"unavailable">>}], - children = []}, - ejabberd_sm:close_session_unset_presence(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource, - <<"">>), - presence_broadcast(StateData, From, - StateData#state.pres_a, Packet), - presence_broadcast(StateData, From, - StateData#state.pres_i, Packet) - end - end, - bounce_messages(); + case StateData#state.mgmt_state of + resumed -> + ?INFO_MSG("Closing former stream of resumed session for ~s", + [jlib:jid_to_string(StateData#state.jid)]); _ -> - ok + if StateName == session_established; + StateName == wait_for_resume -> + case StateData#state.authenticated of + replaced -> + ?INFO_MSG("(~w) Replaced session for ~s", + [StateData#state.socket, + jlib:jid_to_string(StateData#state.jid)]), + From = StateData#state.jid, + Packet = #xmlel{name = <<"presence">>, + attrs = [{<<"type">>, <<"unavailable">>}], + children = + [#xmlel{name = <<"status">>, attrs = [], + children = + [{xmlcdata, + <<"Replaced by new connection">>}]}]}, + ejabberd_sm:close_session_unset_presence(StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource, + <<"Replaced by new connection">>), + presence_broadcast(StateData, From, + StateData#state.pres_a, Packet), + presence_broadcast(StateData, From, + StateData#state.pres_i, Packet), + handle_unacked_stanzas(StateData); + _ -> + ?INFO_MSG("(~w) Close session for ~s", + [StateData#state.socket, + jlib:jid_to_string(StateData#state.jid)]), + EmptySet = (?SETS):new(), + case StateData of + #state{pres_last = undefined, + pres_a = EmptySet, + pres_i = EmptySet, + pres_invis = false} -> + ejabberd_sm:close_session(StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource); + _ -> + From = StateData#state.jid, + Packet = #xmlel{name = <<"presence">>, + attrs = [{<<"type">>, <<"unavailable">>}], + children = []}, + ejabberd_sm:close_session_unset_presence(StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource, + <<"">>), + presence_broadcast(StateData, From, + StateData#state.pres_a, Packet), + presence_broadcast(StateData, From, + StateData#state.pres_i, Packet) + end, + handle_unacked_stanzas(StateData) + end, + bounce_messages(); + true -> + ok + end end, (StateData#state.sockmod):close(StateData#state.socket), ok. @@ -1672,6 +1827,8 @@ change_shaper(StateData, JID) -> (StateData#state.sockmod):change_shaper(StateData#state.socket, Shaper). +send_text(StateData, Text) when StateData#state.mgmt_state == pending -> + ?DEBUG("Cannot send text while waiting for resumption: ~p", [Text]); send_text(StateData, Text) when StateData#state.xml_socket -> ?DEBUG("Send Text on stream = ~p", [Text]), (StateData#state.sockmod):send_xml(StateData#state.socket, @@ -1680,12 +1837,23 @@ send_text(StateData, Text) -> ?DEBUG("Send XML on stream = ~p", [Text]), (StateData#state.sockmod):send(StateData#state.socket, Text). +send_element(StateData, El) when StateData#state.mgmt_state == pending -> + ?DEBUG("Cannot send element while waiting for resumption: ~p", [El]); send_element(StateData, El) when StateData#state.xml_socket -> (StateData#state.sockmod):send_xml(StateData#state.socket, {xmlstreamelement, El}); send_element(StateData, El) -> send_text(StateData, xml:element_to_binary(El)). +send_stanza(StateData, Stanza) when StateData#state.mgmt_state == pending -> + mgmt_queue_add(StateData, Stanza); +send_stanza(StateData, Stanza) when StateData#state.mgmt_state == active -> + send_stanza_and_ack_req(StateData, Stanza), + mgmt_queue_add(StateData, Stanza); +send_stanza(StateData, Stanza) -> + send_element(StateData, Stanza), + StateData. + send_header(StateData, Server, Version, Lang) when StateData#state.xml_socket -> VersionAttr = case Version of @@ -1720,6 +1888,9 @@ send_header(StateData, Server, Version, Lang) -> LangStr]), send_text(StateData, iolist_to_binary(Header)). +send_trailer(StateData) + when StateData#state.mgmt_state == pending -> + ?DEBUG("Cannot send stream trailer while waiting for resumption", []); send_trailer(StateData) when StateData#state.xml_socket -> (StateData#state.sockmod):send_xml(StateData#state.socket, @@ -1739,6 +1910,19 @@ is_auth_packet(El) -> _ -> false end. +is_stanza(#xmlel{name = Name, attrs = Attrs}) when Name == <<"message">>; + Name == <<"presence">>; + Name == <<"iq">> -> + case xml:get_attr(<<"xmlns">>, Attrs) of + {value, NS} when NS /= <<"jabber:client">>, + NS /= <<"jabber:server">> -> + false; + _ -> + true + end; +is_stanza(_El) -> + false. + get_auth_tags([#xmlel{name = Name, children = Els} | L], U, P, D, R) -> CData = xml:get_cdata(Els), @@ -1866,12 +2050,12 @@ presence_update(From, Packet, StateData) -> ejabberd_hooks:run(user_available_hook, NewStateData#state.server, [NewStateData#state.jid]), - if NewPriority >= 0 -> - resend_offline_messages(NewStateData), - resend_subscription_requests(NewStateData); - true -> ok - end, - presence_broadcast_first(From, NewStateData, + ResentStateData = if NewPriority >= 0 -> + resend_offline_messages(NewStateData), + resend_subscription_requests(NewStateData); + true -> NewStateData + end, + presence_broadcast_first(From, ResentStateData, Packet); true -> presence_broadcast_to_trusted(NewStateData, From, @@ -2168,11 +2352,6 @@ resend_offline_messages(StateData) -> end, if Pass -> ejabberd_router:route(From, To, Packet); - %% send_element(StateData, FixedPacket), - %% ejabberd_hooks:run(user_receive_packet, - %% StateData#state.server, - %% [StateData#state.jid, - %% From, To, FixedPacket]); true -> ok end end, @@ -2185,10 +2364,11 @@ resend_subscription_requests(#state{user = User, PendingSubscriptions = ejabberd_hooks:run_fold(resend_subscription_requests_hook, Server, [], [User, Server]), - lists:foreach(fun (XMLPacket) -> - send_element(StateData, XMLPacket) - end, - PendingSubscriptions). + lists:foldl(fun (XMLPacket, AccStateData) -> + send_stanza(AccStateData, XMLPacket) + end, + StateData, + PendingSubscriptions). get_showtag(undefined) -> <<"unavailable">>; get_showtag(Presence) -> @@ -2267,6 +2447,15 @@ fsm_next_state_gc(StateName, PackedStateData) -> fsm_next_state(session_established, StateData) -> {next_state, session_established, StateData, ?C2S_HIBERNATE_TIMEOUT}; +fsm_next_state(wait_for_resume, #state{mgmt_pending_since = undefined} = + StateData) -> + {next_state, wait_for_resume, + StateData#state{mgmt_pending_since = os:timestamp()}, + StateData#state.mgmt_timeout}; +fsm_next_state(wait_for_resume, StateData) -> + Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since), + Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1), + {next_state, wait_for_resume, StateData, Timeout}; fsm_next_state(StateName, StateData) -> {next_state, StateName, StateData, ?C2S_OPEN_TIMEOUT}. @@ -2275,6 +2464,15 @@ fsm_next_state(StateName, StateData) -> fsm_reply(Reply, session_established, StateData) -> {reply, Reply, session_established, StateData, ?C2S_HIBERNATE_TIMEOUT}; +fsm_reply(Reply, wait_for_resume, #state{mgmt_pending_since = undefined} = + StateData) -> + {reply, Reply, wait_for_resume, + StateData#state{mgmt_pending_since = os:timestamp()}, + StateData#state.mgmt_timeout}; +fsm_reply(Reply, wait_for_resume, StateData) -> + Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since), + Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1), + {reply, Reply, wait_for_resume, StateData, Timeout}; fsm_reply(Reply, StateName, StateData) -> {reply, Reply, StateName, StateData, ?C2S_OPEN_TIMEOUT}. @@ -2364,12 +2562,399 @@ route_blocking(What, StateData) -> PrivPushEl = jlib:replace_from_to(jlib:jid_remove_resource(StateData#state.jid), StateData#state.jid, jlib:iq_to_xml(PrivPushIQ)), - send_element(StateData, PrivPushEl), %% No need to replace active privacy list here, %% blocking pushes are always accompanied by %% Privacy List pushes + send_stanza(StateData, PrivPushEl). + +%%%---------------------------------------------------------------------- +%%% XEP-0198 +%%%---------------------------------------------------------------------- + +stream_mgmt_enabled(#state{mgmt_state = disabled}) -> + false; +stream_mgmt_enabled(_StateData) -> + true. + +dispatch_stream_mgmt(El, StateData) + when StateData#state.mgmt_state == active; + StateData#state.mgmt_state == pending -> + perform_stream_mgmt(El, StateData); +dispatch_stream_mgmt(El, StateData) -> + negotiate_stream_mgmt(El, StateData). + +negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) -> + %% XEP-0198 says: "For client-to-server connections, the client MUST NOT + %% attempt to enable stream management until after it has completed Resource + %% Binding unless it is resuming a previous session". However, it also + %% says: "Stream management errors SHOULD be considered recoverable", so we + %% won't bail out. + send_element(StateData, ?MGMT_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)), + StateData; +negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> + case xml:get_attr_s(<<"xmlns">>, Attrs) of + Xmlns when ?IS_SUPPORTED_MGMT_XMLNS(Xmlns) -> + case stream_mgmt_enabled(StateData) of + true -> + case Name of + <<"enable">> -> + handle_enable(StateData#state{mgmt_xmlns = Xmlns}, Attrs); + _ -> + Res = if Name == <<"a">>; + Name == <<"r">>; + Name == <<"resume">> -> + ?MGMT_UNEXPECTED_REQUEST(Xmlns); + true -> + ?MGMT_BAD_REQUEST(Xmlns) + end, + send_element(StateData, Res), + StateData + end; + false -> + send_element(StateData, ?MGMT_SERVICE_UNAVAILABLE(Xmlns)), + StateData + end; + _ -> + send_element(StateData, ?MGMT_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3)), + StateData + end. + +perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> + case xml:get_attr_s(<<"xmlns">>, Attrs) of + Xmlns when Xmlns == StateData#state.mgmt_xmlns -> + case Name of + <<"r">> -> + handle_r(StateData); + <<"a">> -> + handle_a(StateData, Attrs); + _ -> + Res = if Name == <<"enable">>; + Name == <<"resume">> -> + ?MGMT_UNEXPECTED_REQUEST(Xmlns); + true -> + ?MGMT_BAD_REQUEST(Xmlns) + end, + send_element(StateData, Res), + StateData + end; + _ -> + send_element(StateData, + ?MGMT_UNSUPPORTED_VERSION(StateData#state.mgmt_xmlns)), + StateData + end. + +handle_enable(#state{mgmt_timeout = ConfigTimeout} = StateData, Attrs) -> + Timeout = case xml:get_attr_s(<<"resume">>, Attrs) of + ResumeAttr when ResumeAttr == <<"true">>; + ResumeAttr == <<"1">> -> + MaxAttr = xml:get_attr_s(<<"max">>, Attrs), + case catch jlib:binary_to_integer(MaxAttr) of + Max when is_integer(Max), Max > 0, Max =< ConfigTimeout -> + Max; + _ -> + ConfigTimeout + end; + _ -> + 0 + end, + ResAttrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}] ++ + if Timeout > 0 -> + ?INFO_MSG("Stream management with resumption enabled for ~s", + [jlib:jid_to_string(StateData#state.jid)]), + [{<<"id">>, make_resume_id(StateData)}, + {<<"resume">>, <<"true">>}, + {<<"max">>, jlib:integer_to_binary(Timeout)}]; + true -> + ?INFO_MSG("Stream management without resumption enabled for ~s", + [jlib:jid_to_string(StateData#state.jid)]), + [] + end, + Res = #xmlel{name = <<"enabled">>, + attrs = ResAttrs, + children = []}, + send_element(StateData, Res), + StateData#state{mgmt_state = active, + mgmt_queue = queue:new(), + mgmt_timeout = Timeout * 1000}. + +handle_r(StateData) -> + H = jlib:integer_to_binary(StateData#state.mgmt_stanzas_in), + Res = #xmlel{name = <<"a">>, + attrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}, + {<<"h">>, H}], + children = []}, + send_element(StateData, Res), + StateData. + +handle_a(#state{jid = JID, mgmt_stanzas_out = NumStanzasOut} = StateData, + Attrs) -> + case catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs)) of + H when is_integer(H), H >= 0 -> + ?DEBUG("~s acknowledged ~B of ~B stanzas", + [jlib:jid_to_string(JID), H, NumStanzasOut]), + mgmt_queue_drop(StateData, H); + _ -> + ?WARNING_MSG("Ignoring invalid ACK element from ~s", + [jlib:jid_to_string(JID)]), + StateData + end. + +handle_resume(StateData, Attrs) -> + R = case xml:get_attr_s(<<"xmlns">>, Attrs) of + Xmlns when ?IS_SUPPORTED_MGMT_XMLNS(Xmlns) -> + case stream_mgmt_enabled(StateData) of + true -> + case {xml:get_attr(<<"previd">>, Attrs), + catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs))} + of + {{value, PrevID}, H} when is_integer(H) -> + case inherit_session_state(StateData, PrevID) of + {ok, InheritedState} -> + {ok, InheritedState, H}; + {error, Err} -> + {error, ?MGMT_ITEM_NOT_FOUND(Xmlns), Err} + end; + _ -> + {error, ?MGMT_BAD_REQUEST(Xmlns), + <<"Invalid request">>} + end; + false -> + {error, ?MGMT_SERVICE_UNAVAILABLE(Xmlns), + <<"XEP-0198 disabled">>} + end; + _ -> + {error, ?MGMT_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3), + <<"Invalid XMLNS">>} + end, + case R of + {ok, ResumedState, NumHandled} -> + NewState = mgmt_queue_drop(ResumedState, NumHandled), + AttrXmlns = NewState#state.mgmt_xmlns, + AttrId = make_resume_id(NewState), + AttrH = jlib:integer_to_binary(NewState#state.mgmt_stanzas_in), + send_element(NewState, + #xmlel{name = <<"resumed">>, + attrs = [{<<"xmlns">>, AttrXmlns}, + {<<"h">>, AttrH}, + {<<"previd">>, AttrId}], + children = []}), + SendFun = fun(_F, _T, El) -> send_element(NewState, El) end, + handle_unacked_stanzas(NewState, SendFun), + send_element(NewState, + #xmlel{name = <<"r">>, + attrs = [{<<"xmlns">>, AttrXmlns}], + children = []}), + ?INFO_MSG("Resumed session for ~s", + [jlib:jid_to_string(NewState#state.jid)]), + {ok, NewState}; + {error, El, Msg} -> + send_element(StateData, El), + ?INFO_MSG("Cannot resume session for ~s@~s: ~s", + [StateData#state.user, StateData#state.server, Msg]), + error + end. + +update_num_stanzas_in(#state{mgmt_state = active} = StateData, El) -> + NewNum = case {is_stanza(El), StateData#state.mgmt_stanzas_in} of + {true, 4294967295} -> + 0; + {true, Num} -> + Num + 1; + {false, Num} -> + Num + end, + StateData#state{mgmt_stanzas_in = NewNum}; +update_num_stanzas_in(StateData, _El) -> + StateData. + +send_stanza_and_ack_req(StateData, Stanza) -> + AckReq = #xmlel{name = <<"r">>, + attrs = [{<<"xmlns">>, StateData#state.mgmt_xmlns}], + children = []}, + StanzaS = xml:element_to_binary(Stanza), + AckReqS = xml:element_to_binary(AckReq), + send_text(StateData, [StanzaS, AckReqS]). + +mgmt_queue_add(StateData, El) -> + NewNum = case StateData#state.mgmt_stanzas_out of + 4294967295 -> + 0; + Num -> + Num + 1 + end, + NewState = limit_queue_length(StateData), + NewQueue = queue:in({NewNum, El}, NewState#state.mgmt_queue), + NewState#state{mgmt_queue = NewQueue, mgmt_stanzas_out = NewNum}. + +mgmt_queue_drop(StateData, NumHandled) -> + NewQueue = jlib:queue_drop_while(fun({N, _Stanza}) -> N =< NumHandled end, + StateData#state.mgmt_queue), + StateData#state{mgmt_queue = NewQueue}. + +limit_queue_length(#state{mgmt_max_queue = Limit} = StateData) + when Limit == infinity; + Limit == unlimited -> + StateData; +limit_queue_length(#state{jid = JID, + mgmt_queue = Queue, + mgmt_max_queue = Limit} = StateData) -> + case queue:len(Queue) >= Limit of + true -> + ?WARNING_MSG("Dropping stanza from too long ACK queue for ~s", + [jlib:jid_to_string(JID)]), + limit_queue_length(StateData#state{mgmt_queue = queue:drop(Queue)}); + false -> + StateData + end. + +log_pending_state(StateData) when StateData#state.mgmt_state /= pending -> + ?INFO_MSG("Waiting for resumption of stream for ~s", + [jlib:jid_to_string(StateData#state.jid)]); +log_pending_state(_StateData) -> ok. +handle_unacked_stanzas(StateData, F) + when StateData#state.mgmt_state == active; + StateData#state.mgmt_state == pending -> + Queue = StateData#state.mgmt_queue, + case queue:len(Queue) of + 0 -> + ok; + N -> + ?INFO_MSG("~B stanzas were not acknowledged by ~s", + [N, jlib:jid_to_string(StateData#state.jid)]), + lists:foreach( + fun({_, #xmlel{attrs = Attrs} = El}) -> + From_s = xml:get_attr_s(<<"from">>, Attrs), + From = jlib:string_to_jid(From_s), + To_s = xml:get_attr_s(<<"to">>, Attrs), + To = jlib:string_to_jid(To_s), + F(From, To, El) + end, queue:to_list(Queue)) + end; +handle_unacked_stanzas(_StateData, _F) -> + ok. + +handle_unacked_stanzas(StateData) + when StateData#state.mgmt_state == active; + StateData#state.mgmt_state == pending -> + ReRoute = case StateData#state.mgmt_resend of + true -> + fun ejabberd_router:route/3; + false -> + fun(From, To, El) -> + Err = + jlib:make_error_reply(El, + ?ERR_SERVICE_UNAVAILABLE), + ejabberd_router:route(To, From, Err) + end + end, + F = fun(From, To, El) -> + %% We'll drop the stanza if it was by some + %% encapsulating protocol as per XEP-0297. One such protocol is + %% XEP-0280, which says: "When a receiving server attempts to + %% deliver a forked message, and that message bounces with an + %% error for any reason, the receiving server MUST NOT forward + %% that error back to the original sender." Resending such a + %% stanza could easily lead to unexpected results as well. + case is_encapsulated_forward(El) of + true -> + ?DEBUG("Dropping forwarded stanza from ~s", + [xml:get_attr_s(<<"from">>, El#xmlel.attrs)]); + false -> + ReRoute(From, To, El) + end + end, + handle_unacked_stanzas(StateData, F); +handle_unacked_stanzas(_StateData) -> + ok. + +is_encapsulated_forward(#xmlel{name = <<"message">>} = El) -> + SubTag = case {xml:get_subtag(El, <<"sent">>), + xml:get_subtag(El, <<"received">>), + xml:get_subtag(El, <<"result">>)} of + {false, false, false} -> + false; + {Tag, false, false} -> + Tag; + {false, Tag, false} -> + Tag; + {_, _, Tag} -> + Tag + end, + if SubTag == false -> + false; + true -> + case xml:get_subtag(SubTag, <<"forwarded">>) of + false -> + false; + _ -> + true + end + end; +is_encapsulated_forward(_El) -> + false. + +inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) -> + case jlib:base64_to_term(ResumeID) of + {term, {U, S, R, Time}} -> + case ejabberd_sm:get_session_pid(U, S, R) of + none -> + {error, <<"Previous session PID not found">>}; + OldPID -> + OldSID = {Time, OldPID}, + case catch resume_session(OldPID) of + {ok, #state{sid = OldSID} = OldStateData} -> + NewSID = {Time, self()}, % Old time, new PID + Priority = case OldStateData#state.pres_last of + undefined -> + 0; + Presence -> + get_priority_from_presence(Presence) + end, + Conn = get_conn_type(StateData), + Info = [{ip, StateData#state.ip}, {conn, Conn}, + {auth_module, StateData#state.auth_module}], + ejabberd_sm:open_session(NewSID, U, S, R, + Priority, Info), + {ok, StateData#state{sid = NewSID, + jid = OldStateData#state.jid, + resource = OldStateData#state.resource, + pres_t = OldStateData#state.pres_t, + pres_f = OldStateData#state.pres_f, + pres_a = OldStateData#state.pres_a, + pres_i = OldStateData#state.pres_i, + pres_last = OldStateData#state.pres_last, + pres_pri = OldStateData#state.pres_pri, + pres_timestamp = OldStateData#state.pres_timestamp, + pres_invis = OldStateData#state.pres_invis, + privacy_list = OldStateData#state.privacy_list, + aux_fields = OldStateData#state.aux_fields, + mgmt_xmlns = OldStateData#state.mgmt_xmlns, + mgmt_queue = OldStateData#state.mgmt_queue, + mgmt_timeout = OldStateData#state.mgmt_timeout, + mgmt_stanzas_in = OldStateData#state.mgmt_stanzas_in, + mgmt_stanzas_out = OldStateData#state.mgmt_stanzas_out, + mgmt_state = active}}; + _ -> + {error, <<"Cannot grab session state">>} + end + end; + error -> + {error, <<"Invalid 'previd' value">>} + end. + +resume_session(FsmRef) -> + (?GEN_FSM):sync_send_all_state_event(FsmRef, resume_session, 3000). + +make_resume_id(StateData) -> + {Time, _} = StateData#state.sid, + ID = {StateData#state.user, + StateData#state.server, + StateData#state.resource, + Time}, + jlib:term_to_base64(ID). + %%%---------------------------------------------------------------------- %%% JID Set memory footprint reduction code %%%---------------------------------------------------------------------- diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index 2a06fd2f8..094918cd9 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -32,7 +32,9 @@ %% API -export([start_link/0, route/3, - open_session/5, close_session/4, + open_session/5, + open_session/6, + close_session/4, check_in_subscription/6, bounce_offline_message/3, disconnect_removed_user/2, @@ -109,10 +111,10 @@ route(From, To, Packet) -> _ -> ok end. --spec open_session(sid(), binary(), binary(), binary(), info()) -> ok. +-spec open_session(sid(), binary(), binary(), binary(), prio(), info()) -> ok. -open_session(SID, User, Server, Resource, Info) -> - set_session(SID, User, Server, Resource, undefined, Info), +open_session(SID, User, Server, Resource, Priority, Info) -> + set_session(SID, User, Server, Resource, Priority, Info), mnesia:dirty_update_counter(session_counter, jlib:nameprep(Server), 1), check_for_sessions_to_replace(User, Server, Resource), @@ -120,6 +122,11 @@ open_session(SID, User, Server, Resource, Info) -> ejabberd_hooks:run(sm_register_connection_hook, JID#jid.lserver, [SID, JID, Info]). +-spec open_session(sid(), binary(), binary(), binary(), info()) -> ok. + +open_session(SID, User, Server, Resource, Info) -> + open_session(SID, User, Server, Resource, undefined, Info). + -spec close_session(sid(), binary(), binary(), binary()) -> ok. close_session(SID, User, Server, Resource) -> diff --git a/src/jlib.erl b/src/jlib.erl index 0ff210652..7735d7dbc 100644 --- a/src/jlib.erl +++ b/src/jlib.erl @@ -45,12 +45,13 @@ timestamp_to_iso/2, timestamp_to_xml/4, timestamp_to_xml/1, now_to_utc_string/1, now_to_local_string/1, datetime_string_to_timestamp/1, + term_to_base64/1, base64_to_term/1, decode_base64/1, encode_base64/1, ip_to_list/1, rsm_encode/1, rsm_encode/2, rsm_decode/1, binary_to_integer/1, binary_to_integer/2, integer_to_binary/1, integer_to_binary/2, atom_to_binary/1, binary_to_atom/1, tuple_to_binary/1, - l2i/1, i2l/1, i2l/2]). + l2i/1, i2l/1, i2l/2, queue_drop_while/2]). %% TODO: Remove once XEP-0091 is Obsolete %% TODO: Remove once XEP-0091 is Obsolete @@ -779,6 +780,21 @@ check_list(List) -> % Base64 stuff (based on httpd_util.erl) % +-spec term_to_base64(term()) -> binary(). + +term_to_base64(Term) -> + encode_base64(term_to_binary(Term)). + +-spec base64_to_term(binary()) -> {term, term()} | error. + +base64_to_term(Base64) -> + case catch binary_to_term(decode_base64(Base64), [safe]) of + {'EXIT', _} -> + error; + Term -> + {term, Term} + end. + -spec decode_base64(binary()) -> binary(). decode_base64(S) -> @@ -893,3 +909,18 @@ i2l(L, N) when is_binary(L) -> C when C > N -> L; _ -> i2l(<<$0, L/binary>>, N) end. + +-spec queue_drop_while(fun((term()) -> boolean()), queue()) -> queue(). + +queue_drop_while(F, Q) -> + case queue:peek(Q) of + {value, Item} -> + case F(Item) of + true -> + queue_drop_while(F, queue:drop(Q)); + _ -> + Q + end; + empty -> + Q + end.