From e360c56f876e191b9ef4f7b2fd7d5fc3afad1cc1 Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Wed, 19 Mar 2014 00:51:33 +0100 Subject: [PATCH] Support XEP-0198 session resumption Implement the optional session resumption feature described in XEP-0198. A client that supports this feature may now resume the previous session (within a configurable number of seconds) if the connection was lost. During resumption, ejabberd will retransmit any stanzas that hadn't been acknowledged by the client. --- doc/guide.tex | 11 +- src/ejabberd_c2s.erl | 411 ++++++++++++++++++++++++++++++++++--------- src/ejabberd_sm.erl | 15 +- src/jlib.erl | 16 ++ 4 files changed, 359 insertions(+), 94 deletions(-) diff --git a/doc/guide.tex b/doc/guide.tex index 59f39e256..23752a27d 100644 --- a/doc/guide.tex +++ b/doc/guide.tex @@ -871,8 +871,8 @@ The available modules, their purpose and the options allowed by each one are: Handles c2s connections.\\ Options: \texttt{access}, \texttt{certfile}, \texttt{ciphers}, \texttt{max\_ack\_queue}, \texttt{max\_fsm\_queue}, - \texttt{max\_stanza\_size}, \texttt{shaper}, - \texttt{starttls}, \texttt{starttls\_required}, + \texttt{max\_stanza\_size}, \texttt{resume\_timeout}, + \texttt{shaper}, \texttt{starttls}, \texttt{starttls\_required}, \texttt{stream\_management}, \texttt{tls}, \texttt{zlib}, \texttt{tls\_compression} \titem{\texttt{ejabberd\_s2s\_in}} @@ -1007,6 +1007,13 @@ request_handlers: /"a"/"b": mod_foo /"http-bind": mod_http_bind \end{verbatim} + \titem{resume\_timeout: Seconds} + This option configures the number of seconds until a session times + out if the connection is lost. During this period of time, a client + may resume the session if \term{stream\_management} is enabled. This + option can be specified for \term{ejabberd\_c2s} listeners. Setting + it to \term{0} effectively disables session resumption. The default + value is \term{300}. \titem{service\_check\_from: true|false} \ind{options!service\_check\_from} This option can be used with \term{ejabberd\_service} only. diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index d89529d4e..fb3de4a6d 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -58,6 +58,7 @@ wait_for_bind/2, wait_for_session/2, wait_for_sasl_response/2, + wait_for_resume/2, session_established/2, handle_event/3, handle_sync_event/4, @@ -108,10 +109,13 @@ auth_module = unknown, ip, aux_fields = [], + sm_state, sm_xmlns, ack_queue, max_ack_queue, manage_stream = fun negotiate_stream_mgmt/2, + pending_since, + resume_timeout, n_stanzas_in = 0, n_stanzas_out = 0, lang}). @@ -166,6 +170,7 @@ -define(IS_STREAM_MGMT_TAG(Name), Name == <<"enable">>; + Name == <<"resume">>; Name == <<"a">>; Name == <<"r">>). @@ -183,6 +188,9 @@ -define(SM_BAD_REQUEST(Xmlns), ?SM_FAILED(<<"bad-request">>, Xmlns)). +-define(SM_ITEM_NOT_FOUND(Xmlns), + ?SM_FAILED(<<"item-not-found">>, Xmlns)). + -define(SM_SERVICE_UNAVAILABLE(Xmlns), ?SM_FAILED(<<"service-unavailable">>, Xmlns)). @@ -285,16 +293,18 @@ init([{SockMod, Socket}, Opts]) -> true -> TLSOpts1 end, TLSOpts = [verify_none | TLSOpts2], + StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true), + StreamMgmtState = if StreamMgmtEnabled -> inactive; + true -> disabled + end, MaxAckQueue = case proplists:get_value(max_ack_queue, Opts) of Limit when is_integer(Limit), Limit > 0 -> Limit; _ -> 500 end, - StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true), - AckQueue = if not StreamMgmtEnabled -> - none; - true -> - undefined - end, + ResumeTimeout = case proplists:get_value(resume_timeout, Opts) of + Timeout when is_integer(Timeout), Timeout >= 0 -> Timeout; + _ -> 300 + end, IP = peerip(SockMod, Socket), %% Check if IP is blacklisted: case is_ip_blacklisted(IP) of @@ -315,9 +325,11 @@ init([{SockMod, Socket}, Opts]) -> xml_socket = XMLSocket, zlib = Zlib, tls = TLS, tls_required = StartTLSRequired, tls_enabled = TLSEnabled, tls_options = TLSOpts, - streamid = new_id(), access = Access, - shaper = Shaper, ip = IP, - ack_queue = AckQueue, max_ack_queue = MaxAckQueue}, + sid = {now(), self()}, streamid = new_id(), + access = Access, shaper = Shaper, ip = IP, + sm_state = StreamMgmtState, + max_ack_queue = MaxAckQueue, + resume_timeout = ResumeTimeout}, {ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT} end. @@ -603,7 +615,6 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> ?INFO_MSG("(~w) Accepted legacy authentication for ~s by ~p", [StateData#state.socket, jlib:jid_to_string(JID), AuthModule]), - SID = {now(), self()}, Conn = (StateData#state.sockmod):get_conn_type( StateData#state.socket), Info = [{ip, StateData#state.ip}, {conn, Conn}, @@ -611,7 +622,9 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> Res = jlib:make_result_iq_reply( El#xmlel{children = []}), send_element(StateData, Res), - ejabberd_sm:open_session(SID, U, StateData#state.server, R, Info), + ejabberd_sm:open_session(StateData#state.sid, U, + StateData#state.server, R, + Info), change_shaper(StateData, JID), {Fs, Ts} = ejabberd_hooks:run_fold(roster_get_subscription_lists, @@ -629,7 +642,7 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> [U, StateData#state.server]), NewStateData = StateData#state{user = U, resource = R, - jid = JID, sid = SID, + jid = JID, conn = Conn, auth_module = AuthModule, pres_f = (?SETS):from_list(Fs1), @@ -981,9 +994,20 @@ resource_conflict_action(U, S, R) -> {accept_resource, Rnew} end. -wait_for_bind({xmlstreamelement, #xmlel{name = Name} = El}, StateData) +wait_for_bind({xmlstreamelement, #xmlel{name = Name, attrs = Attrs} = El}, + StateData) when ?IS_STREAM_MGMT_TAG(Name) -> - fsm_next_state(wait_for_bind, (StateData#state.manage_stream)(El, StateData)); + case Name of + <<"resume">> -> + case handle_resume(StateData, Attrs) of + {ok, ResumedState} -> + fsm_next_state(session_established, ResumedState); + error -> + fsm_next_state(wait_for_bind, StateData) + end; + _ -> + fsm_next_state(wait_for_bind, (StateData#state.manage_stream)(El, StateData)) + end; wait_for_bind({xmlstreamelement, El}, StateData) -> case jlib:iq_query_info(El) of #iq{type = set, xmlns = ?NS_BIND, sub_el = SubEl} = @@ -1077,15 +1101,13 @@ wait_for_session({xmlstreamelement, El}, StateData) -> privacy_get_user_list, NewState#state.server, #userlist{}, [U, NewState#state.server]), - SID = {now(), self()}, Conn = get_conn_type(NewState), Info = [{ip, NewState#state.ip}, {conn, Conn}, {auth_module, NewState#state.auth_module}], ejabberd_sm:open_session( - SID, U, NewState#state.server, R, Info), + NewState#state.sid, U, NewState#state.server, R, Info), UpdatedStateData = NewState#state{ - sid = SID, conn = Conn, pres_f = ?SETS:from_list(Fs1), pres_t = ?SETS:from_list(Ts1), @@ -1151,6 +1173,11 @@ session_established({xmlstreamerror, _}, StateData) -> send_element(StateData, ?INVALID_XML_ERR), send_trailer(StateData), {stop, normal, StateData}; +session_established(closed, StateData) + when StateData#state.resume_timeout > 0, + StateData#state.sm_state == active orelse + StateData#state.sm_state == pending -> + fsm_next_state(wait_for_resume, StateData#state{sm_state = pending}); session_established(closed, StateData) -> {stop, normal, StateData}. @@ -1240,6 +1267,17 @@ session_established2(El, StateData) -> [{xmlstreamelement, El}]), fsm_next_state(session_established, NewState). +wait_for_resume(timeout, StateData) -> + ?DEBUG("Timed out waiting for resumption of stream for ~s", + [jlib:jid_to_string(StateData#state.jid)]), + {stop, normal, StateData}; +wait_for_resume(closed, StateData) -> + ?DEBUG("Ignoring 'closed' event while waiting for resumption", []), + fsm_next_state(wait_for_resume, StateData); +wait_for_resume(Event, StateData) -> + ?WARNING_MSG("Ignoring event while waiting for resumption: ~p", [Event]), + fsm_next_state(wait_for_resume, StateData). + %%---------------------------------------------------------------------- %% Func: StateName/3 %% Returns: {next_state, NextStateName, NextStateData} | @@ -1284,6 +1322,15 @@ handle_sync_event(get_subscribed, _From, StateName, StateData) -> Subscribed = (?SETS):to_list(StateData#state.pres_f), {reply, Subscribed, StateName, StateData}; +handle_sync_event(resume_session, _From, _StateName, + StateData) -> + %% The old session should be closed before the new one is opened, so we do + %% this here instead of leaving it to the terminate callback + ejabberd_sm:close_session(StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource), + {stop, normal, {ok, StateData}, StateData#state{sm_state = resumed}}; handle_sync_event(_Event, _From, StateName, StateData) -> Reply = ok, fsm_reply(Reply, StateName, StateData). @@ -1612,7 +1659,13 @@ handle_info({route, From, To, handle_info({'DOWN', Monitor, _Type, _Object, _Info}, _StateName, StateData) when Monitor == StateData#state.socket_monitor -> - {stop, normal, StateData}; + if StateData#state.resume_timeout > 0, + StateData#state.sm_state == active orelse + StateData#state.sm_state == pending -> + fsm_next_state(wait_for_resume, StateData#state{sm_state = pending}); + true -> + {stop, normal, StateData} + end; handle_info(system_shutdown, StateName, StateData) -> case StateName of wait_for_stream -> @@ -1676,61 +1729,71 @@ print_state(State = #state{pres_t = T, pres_f = F, pres_a = A, pres_i = I}) -> %% Returns: any %%---------------------------------------------------------------------- terminate(_Reason, StateName, StateData) -> - case StateName of - session_established -> - case StateData#state.authenticated of - replaced -> - ?INFO_MSG("(~w) Replaced session for ~s", - [StateData#state.socket, - jlib:jid_to_string(StateData#state.jid)]), - From = StateData#state.jid, - Packet = #xmlel{name = <<"presence">>, - attrs = [{<<"type">>, <<"unavailable">>}], - children = - [#xmlel{name = <<"status">>, attrs = [], - children = - [{xmlcdata, - <<"Replaced by new connection">>}]}]}, - ejabberd_sm:close_session_unset_presence(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource, - <<"Replaced by new connection">>), - presence_broadcast(StateData, From, - StateData#state.pres_a, Packet), - presence_broadcast(StateData, From, - StateData#state.pres_i, Packet); - _ -> - ?INFO_MSG("(~w) Close session for ~s", - [StateData#state.socket, - jlib:jid_to_string(StateData#state.jid)]), - EmptySet = (?SETS):new(), - case StateData of - #state{pres_last = undefined, pres_a = EmptySet, pres_i = EmptySet, pres_invis = false} -> - ejabberd_sm:close_session(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource); - _ -> - From = StateData#state.jid, - Packet = #xmlel{name = <<"presence">>, - attrs = [{<<"type">>, <<"unavailable">>}], - children = []}, - ejabberd_sm:close_session_unset_presence(StateData#state.sid, - StateData#state.user, - StateData#state.server, - StateData#state.resource, - <<"">>), - presence_broadcast(StateData, From, - StateData#state.pres_a, Packet), - presence_broadcast(StateData, From, - StateData#state.pres_i, Packet) - end - end, - resend_unacked_stanzas(StateData), - bounce_messages(); + case StateData#state.sm_state of + resumed -> + ?INFO_MSG("Closing former stream of resumed session for ~s", + [jlib:jid_to_string(StateData#state.jid)]); _ -> - ok + if StateName == session_established; + StateName == wait_for_resume -> + case StateData#state.authenticated of + replaced -> + ?INFO_MSG("(~w) Replaced session for ~s", + [StateData#state.socket, + jlib:jid_to_string(StateData#state.jid)]), + From = StateData#state.jid, + Packet = #xmlel{name = <<"presence">>, + attrs = [{<<"type">>, <<"unavailable">>}], + children = + [#xmlel{name = <<"status">>, attrs = [], + children = + [{xmlcdata, + <<"Replaced by new connection">>}]}]}, + ejabberd_sm:close_session_unset_presence(StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource, + <<"Replaced by new connection">>), + presence_broadcast(StateData, From, + StateData#state.pres_a, Packet), + presence_broadcast(StateData, From, + StateData#state.pres_i, Packet), + resend_unacked_stanzas(StateData); + _ -> + ?INFO_MSG("(~w) Close session for ~s", + [StateData#state.socket, + jlib:jid_to_string(StateData#state.jid)]), + EmptySet = (?SETS):new(), + case StateData of + #state{pres_last = undefined, + pres_a = EmptySet, + pres_i = EmptySet, + pres_invis = false} -> + ejabberd_sm:close_session(StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource); + _ -> + From = StateData#state.jid, + Packet = #xmlel{name = <<"presence">>, + attrs = [{<<"type">>, <<"unavailable">>}], + children = []}, + ejabberd_sm:close_session_unset_presence(StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource, + <<"">>), + presence_broadcast(StateData, From, + StateData#state.pres_a, Packet), + presence_broadcast(StateData, From, + StateData#state.pres_i, Packet) + end, + resend_unacked_stanzas(StateData) + end, + bounce_messages(); + true -> + ok + end end, (StateData#state.sockmod):close(StateData#state.socket), ok. @@ -1745,6 +1808,8 @@ change_shaper(StateData, JID) -> (StateData#state.sockmod):change_shaper(StateData#state.socket, Shaper). +send_text(StateData, Text) when StateData#state.sm_state == pending -> + ?DEBUG("Cannot send text while waiting for resumption: ~p", [Text]); send_text(StateData, Text) when StateData#state.xml_socket -> ?DEBUG("Send Text on stream = ~p", [Text]), (StateData#state.sockmod):send_xml(StateData#state.socket, @@ -1753,13 +1818,17 @@ send_text(StateData, Text) -> ?DEBUG("Send XML on stream = ~p", [Text]), (StateData#state.sockmod):send(StateData#state.socket, Text). +send_element(StateData, El) when StateData#state.sm_state == pending -> + ?DEBUG("Cannot send element while waiting for resumption: ~p", [El]); send_element(StateData, El) when StateData#state.xml_socket -> (StateData#state.sockmod):send_xml(StateData#state.socket, {xmlstreamelement, El}); send_element(StateData, El) -> send_text(StateData, xml:element_to_binary(El)). -send_stanza(StateData, Stanza) when StateData#state.sm_xmlns /= undefined -> +send_stanza(StateData, Stanza) when StateData#state.sm_state == pending -> + ack_queue_add(StateData, Stanza); +send_stanza(StateData, Stanza) when StateData#state.sm_state == active -> send_stanza_and_ack_req(StateData, Stanza), ack_queue_add(StateData, Stanza); send_stanza(StateData, Stanza) -> @@ -2356,6 +2425,15 @@ fsm_next_state_gc(StateName, PackedStateData) -> fsm_next_state(session_established, StateData) -> {next_state, session_established, StateData, ?C2S_HIBERNATE_TIMEOUT}; +fsm_next_state(wait_for_resume, #state{pending_since = undefined} = + StateData) -> + {next_state, wait_for_resume, + StateData#state{pending_since = os:timestamp()}, + StateData#state.resume_timeout}; +fsm_next_state(wait_for_resume, StateData) -> + Diff = timer:now_diff(os:timestamp(), StateData#state.pending_since), + Timeout = max(StateData#state.resume_timeout - Diff div 1000, 1), + {next_state, wait_for_resume, StateData, Timeout}; fsm_next_state(StateName, StateData) -> {next_state, StateName, StateData, ?C2S_OPEN_TIMEOUT}. @@ -2364,6 +2442,15 @@ fsm_next_state(StateName, StateData) -> fsm_reply(Reply, session_established, StateData) -> {reply, Reply, session_established, StateData, ?C2S_HIBERNATE_TIMEOUT}; +fsm_reply(Reply, wait_for_resume, #state{pending_since = undefined} = + StateData) -> + {reply, Reply, wait_for_resume, + StateData#state{pending_since = os:timestamp()}, + StateData#state.resume_timeout}; +fsm_reply(Reply, wait_for_resume, StateData) -> + Diff = timer:now_diff(os:timestamp(), StateData#state.pending_since), + Timeout = max(StateData#state.resume_timeout - Diff div 1000, 1), + {reply, Reply, wait_for_resume, StateData, Timeout}; fsm_reply(Reply, StateName, StateData) -> {reply, Reply, StateName, StateData, ?C2S_OPEN_TIMEOUT}. @@ -2462,7 +2549,7 @@ route_blocking(What, StateData) -> %%% XEP-0198 %%%---------------------------------------------------------------------- -stream_mgmt_enabled(#state{ack_queue = none}) -> +stream_mgmt_enabled(#state{sm_state = disabled}) -> false; stream_mgmt_enabled(_StateData) -> true. @@ -2470,8 +2557,9 @@ stream_mgmt_enabled(_StateData) -> negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) -> %% XEP-0198 says: "For client-to-server connections, the client MUST NOT %% attempt to enable stream management until after it has completed Resource - %% Binding". However, it also says: "Stream management errors SHOULD be - %% considered recoverable", so we won't bail out. + %% Binding unless it is resuming a previous session". However, it also + %% says: "Stream management errors SHOULD be considered recoverable", so we + %% won't bail out. send_element(StateData, ?SM_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)), StateData; negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> @@ -2481,10 +2569,11 @@ negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> true -> case Name of <<"enable">> -> - handle_enable(StateData#state{sm_xmlns = Xmlns}); + handle_enable(StateData#state{sm_xmlns = Xmlns}, Attrs); _ -> Res = if Name == <<"a">>; - Name == <<"r">> -> + Name == <<"r">>; + Name == <<"resume">> -> ?SM_UNEXPECTED_REQUEST(Xmlns); true -> ?SM_BAD_REQUEST(Xmlns) @@ -2510,7 +2599,8 @@ perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> <<"a">> -> handle_a(StateData, Attrs); _ -> - Res = if Name == <<"enable">> -> + Res = if Name == <<"enable">>; + Name == <<"resume">> -> ?SM_UNEXPECTED_REQUEST(Xmlns); true -> ?SM_BAD_REQUEST(Xmlns) @@ -2524,15 +2614,40 @@ perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> StateData end. -handle_enable(StateData) -> - ?INFO_MSG("Enabling XEP-0198 stream management for ~s", - [jlib:jid_to_string(StateData#state.jid)]), +handle_enable(#state{resume_timeout = ConfigTimeout} = StateData, Attrs) -> + Timeout = case xml:get_attr_s(<<"resume">>, Attrs) of + ResumeAttr when ResumeAttr == <<"true">>; + ResumeAttr == <<"1">> -> + MaxAttr = xml:get_attr_s(<<"max">>, Attrs), + case catch jlib:binary_to_integer(MaxAttr) of + Max when is_integer(Max), Max > 0, Max =< ConfigTimeout -> + Max; + _ -> + ConfigTimeout + end; + _ -> + 0 + end, + ResAttrs = [{<<"xmlns">>, StateData#state.sm_xmlns}] ++ + if Timeout > 0 -> + ?INFO_MSG("Stream management with resumption enabled for ~s", + [jlib:jid_to_string(StateData#state.jid)]), + [{<<"id">>, make_resume_id(StateData)}, + {<<"resume">>, <<"true">>}, + {<<"max">>, jlib:integer_to_binary(Timeout)}]; + true -> + ?INFO_MSG("Stream management without resumption enabled for ~s", + [jlib:jid_to_string(StateData#state.jid)]), + [] + end, Res = #xmlel{name = <<"enabled">>, - attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}], + attrs = ResAttrs, children = []}, send_element(StateData, Res), - StateData#state{ack_queue = queue:new(), - manage_stream = fun perform_stream_mgmt/2}. + StateData#state{sm_state = active, + ack_queue = queue:new(), + manage_stream = fun perform_stream_mgmt/2, + resume_timeout = Timeout * 1000}. handle_r(StateData) -> H = jlib:integer_to_binary(StateData#state.n_stanzas_in), @@ -2559,7 +2674,59 @@ handle_a(#state{jid = JID, n_stanzas_out = NumStanzasOut} = StateData, Attrs) -> StateData end. -update_num_stanzas_in(StateData, El) when StateData#state.sm_xmlns /= undefined -> +handle_resume(StateData, Attrs) -> + R = case xml:get_attr_s(<<"xmlns">>, Attrs) of + Xmlns when ?IS_SUPPORTED_SM_XMLNS(Xmlns) -> + case stream_mgmt_enabled(StateData) of + true -> + case {xml:get_attr(<<"previd">>, Attrs), + catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs))} + of + {{value, PrevID}, H} when is_integer(H) -> + case inherit_session_state(StateData, PrevID) of + {ok, InheritedState} -> + {ok, InheritedState, H}; + {error, Err} -> + {error, ?SM_ITEM_NOT_FOUND(Xmlns), Err} + end; + _ -> + {error, ?SM_BAD_REQUEST(Xmlns), <<"Invalid request">>} + end; + false -> + {error, ?SM_SERVICE_UNAVAILABLE(Xmlns), <<"XEP-0198 disabled">>} + end; + _ -> + {error, ?SM_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3), <<"Invalid XMLNS">>} + end, + case R of + {ok, ResumedState, NumHandled} -> + NewState = ack_queue_drop(ResumedState, NumHandled), + AttrXmlns = NewState#state.sm_xmlns, + AttrId = make_resume_id(NewState), + AttrH = jlib:integer_to_binary(NewState#state.n_stanzas_in), + send_element(NewState, + #xmlel{name = <<"resumed">>, + attrs = [{<<"xmlns">>, AttrXmlns}, + {<<"h">>, AttrH}, + {<<"previd">>, AttrId}], + children = []}), + SendFun = fun(_F, _T, El) -> send_element(NewState, El) end, + resend_unacked_stanzas(NewState, SendFun), + send_element(NewState, + #xmlel{name = <<"r">>, + attrs = [{<<"xmlns">>, AttrXmlns}], + children = []}), + ?INFO_MSG("Resumed session for ~s", + [jlib:jid_to_string(NewState#state.jid)]), + {ok, NewState}; + {error, El, Msg} -> + send_element(StateData, El), + ?WARNING_MSG("Cannot resume session for ~s@~s: ~s", + [StateData#state.user, StateData#state.server, Msg]), + error + end. + +update_num_stanzas_in(#state{sm_state = active} = StateData, El) -> NewNum = case {is_stanza(El), StateData#state.n_stanzas_in} of {true, 4294967295} -> 0; @@ -2612,7 +2779,8 @@ limit_queue_length(#state{jid = JID, StateData end. -resend_unacked_stanzas(StateData) when StateData#state.sm_xmlns /= undefined -> +resend_unacked_stanzas(StateData, F) when StateData#state.sm_state == active; + StateData#state.sm_state == pending -> Queue = StateData#state.ack_queue, case queue:len(Queue) of 0 -> @@ -2628,12 +2796,79 @@ resend_unacked_stanzas(StateData) when StateData#state.sm_xmlns /= undefined -> To = jlib:string_to_jid(To_s), ?DEBUG("Resending unacknowledged stanza #~B from ~s to ~s", [Num, From_s, To_s]), - ejabberd_router:route(From, To, El) + F(From, To, El) end, queue:to_list(Queue)) end; +resend_unacked_stanzas(_StateData, _F) -> + ok. + +resend_unacked_stanzas(StateData) when StateData#state.sm_state == active; + StateData#state.sm_state == pending -> + resend_unacked_stanzas(StateData, fun ejabberd_router:route/3); resend_unacked_stanzas(_StateData) -> ok. +inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) -> + case jlib:base64_to_term(ResumeID) of + {term, {U, S, R, Time}} -> + case ejabberd_sm:get_session_pid(U, S, R) of + none -> + {error, <<"Previous session PID not found">>}; + OldPID -> + OldSID = {Time, OldPID}, + case catch resume_session(OldPID) of + {ok, #state{sid = OldSID} = OldStateData} -> + NewSID = {Time, self()}, % Old time, new PID + Priority = case OldStateData#state.pres_last of + undefined -> + 0; + Presence -> + get_priority_from_presence(Presence) + end, + Conn = get_conn_type(StateData), + Info = [{ip, StateData#state.ip}, {conn, Conn}, + {auth_module, StateData#state.auth_module}], + ejabberd_sm:open_session(NewSID, U, S, R, + Priority, Info), + {ok, StateData#state{sid = NewSID, + jid = OldStateData#state.jid, + resource = OldStateData#state.resource, + pres_t = OldStateData#state.pres_t, + pres_f = OldStateData#state.pres_f, + pres_a = OldStateData#state.pres_a, + pres_i = OldStateData#state.pres_i, + pres_last = OldStateData#state.pres_last, + pres_pri = OldStateData#state.pres_pri, + pres_timestamp = OldStateData#state.pres_timestamp, + pres_invis = OldStateData#state.pres_invis, + privacy_list = OldStateData#state.privacy_list, + aux_fields = OldStateData#state.aux_fields, + sm_xmlns = OldStateData#state.sm_xmlns, + ack_queue = OldStateData#state.ack_queue, + manage_stream = fun perform_stream_mgmt/2, + resume_timeout = OldStateData#state.resume_timeout, + n_stanzas_in = OldStateData#state.n_stanzas_in, + n_stanzas_out = OldStateData#state.n_stanzas_out, + sm_state = active}}; + _ -> + {error, <<"Cannot grab session state">>} + end + end; + error -> + {error, <<"Invalid 'previd' value">>} + end. + +resume_session(FsmRef) -> + (?GEN_FSM):sync_send_all_state_event(FsmRef, resume_session, 3000). + +make_resume_id(StateData) -> + {Time, _} = StateData#state.sid, + ID = {StateData#state.user, + StateData#state.server, + StateData#state.resource, + Time}, + jlib:term_to_base64(ID). + %%%---------------------------------------------------------------------- %%% JID Set memory footprint reduction code %%%---------------------------------------------------------------------- diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index c7e277a8d..6bcacbe78 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -33,7 +33,9 @@ %% API -export([start_link/0, route/3, - open_session/5, close_session/4, + open_session/5, + open_session/6, + close_session/4, check_in_subscription/6, bounce_offline_message/3, disconnect_removed_user/2, @@ -107,10 +109,10 @@ route(From, To, Packet) -> _ -> ok end. --spec open_session(sid(), binary(), binary(), binary(), info()) -> ok. +-spec open_session(sid(), binary(), binary(), binary(), prio(), info()) -> ok. -open_session(SID, User, Server, Resource, Info) -> - set_session(SID, User, Server, Resource, undefined, Info), +open_session(SID, User, Server, Resource, Priority, Info) -> + set_session(SID, User, Server, Resource, Priority, Info), mnesia:dirty_update_counter(session_counter, jlib:nameprep(Server), 1), check_for_sessions_to_replace(User, Server, Resource), @@ -118,6 +120,11 @@ open_session(SID, User, Server, Resource, Info) -> ejabberd_hooks:run(sm_register_connection_hook, JID#jid.lserver, [SID, JID, Info]). +-spec open_session(sid(), binary(), binary(), binary(), info()) -> ok. + +open_session(SID, User, Server, Resource, Info) -> + open_session(SID, User, Server, Resource, undefined, Info). + -spec close_session(sid(), binary(), binary(), binary()) -> ok. close_session(SID, User, Server, Resource) -> diff --git a/src/jlib.erl b/src/jlib.erl index a362697f4..ffabb3ffe 100644 --- a/src/jlib.erl +++ b/src/jlib.erl @@ -46,6 +46,7 @@ timestamp_to_iso/2, timestamp_to_xml/4, timestamp_to_xml/1, now_to_utc_string/1, now_to_local_string/1, datetime_string_to_timestamp/1, + term_to_base64/1, base64_to_term/1, decode_base64/1, encode_base64/1, ip_to_list/1, rsm_encode/1, rsm_encode/2, rsm_decode/1, binary_to_integer/1, binary_to_integer/2, @@ -780,6 +781,21 @@ check_list(List) -> % Base64 stuff (based on httpd_util.erl) % +-spec term_to_base64(term()) -> binary(). + +term_to_base64(Term) -> + encode_base64(term_to_binary(Term)). + +-spec base64_to_term(binary()) -> {term, term()} | error. + +base64_to_term(Base64) -> + case catch binary_to_term(decode_base64(Base64), [safe]) of + {'EXIT', _} -> + error; + Term -> + {term, Term} + end. + -spec decode_base64(binary()) -> binary(). decode_base64(S) ->