Support XEP-0198 session resumption

Implement the optional session resumption feature described in XEP-0198.
A client that supports this feature may now resume the previous session
(within a configurable number of seconds) if the connection was lost.
During resumption, ejabberd will retransmit any stanzas that hadn't been
acknowledged by the client.
This commit is contained in:
Holger Weiss 2014-03-19 00:51:33 +01:00 committed by Holger Wei
parent 88a200e100
commit e360c56f87
4 changed files with 359 additions and 94 deletions

View File

@ -871,8 +871,8 @@ The available modules, their purpose and the options allowed by each one are:
Handles c2s connections.\\
Options: \texttt{access}, \texttt{certfile}, \texttt{ciphers},
\texttt{max\_ack\_queue}, \texttt{max\_fsm\_queue},
\texttt{max\_stanza\_size}, \texttt{shaper},
\texttt{starttls}, \texttt{starttls\_required},
\texttt{max\_stanza\_size}, \texttt{resume\_timeout},
\texttt{shaper}, \texttt{starttls}, \texttt{starttls\_required},
\texttt{stream\_management}, \texttt{tls},
\texttt{zlib}, \texttt{tls\_compression}
\titem{\texttt{ejabberd\_s2s\_in}}
@ -1007,6 +1007,13 @@ request_handlers:
/"a"/"b": mod_foo
/"http-bind": mod_http_bind
\end{verbatim}
\titem{resume\_timeout: Seconds}
This option configures the number of seconds until a session times
out if the connection is lost. During this period of time, a client
may resume the session if \term{stream\_management} is enabled. This
option can be specified for \term{ejabberd\_c2s} listeners. Setting
it to \term{0} effectively disables session resumption. The default
value is \term{300}.
\titem{service\_check\_from: true|false}
\ind{options!service\_check\_from}
This option can be used with \term{ejabberd\_service} only.

View File

@ -58,6 +58,7 @@
wait_for_bind/2,
wait_for_session/2,
wait_for_sasl_response/2,
wait_for_resume/2,
session_established/2,
handle_event/3,
handle_sync_event/4,
@ -108,10 +109,13 @@
auth_module = unknown,
ip,
aux_fields = [],
sm_state,
sm_xmlns,
ack_queue,
max_ack_queue,
manage_stream = fun negotiate_stream_mgmt/2,
pending_since,
resume_timeout,
n_stanzas_in = 0,
n_stanzas_out = 0,
lang}).
@ -166,6 +170,7 @@
-define(IS_STREAM_MGMT_TAG(Name),
Name == <<"enable">>;
Name == <<"resume">>;
Name == <<"a">>;
Name == <<"r">>).
@ -183,6 +188,9 @@
-define(SM_BAD_REQUEST(Xmlns),
?SM_FAILED(<<"bad-request">>, Xmlns)).
-define(SM_ITEM_NOT_FOUND(Xmlns),
?SM_FAILED(<<"item-not-found">>, Xmlns)).
-define(SM_SERVICE_UNAVAILABLE(Xmlns),
?SM_FAILED(<<"service-unavailable">>, Xmlns)).
@ -285,16 +293,18 @@ init([{SockMod, Socket}, Opts]) ->
true -> TLSOpts1
end,
TLSOpts = [verify_none | TLSOpts2],
StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true),
StreamMgmtState = if StreamMgmtEnabled -> inactive;
true -> disabled
end,
MaxAckQueue = case proplists:get_value(max_ack_queue, Opts) of
Limit when is_integer(Limit), Limit > 0 -> Limit;
_ -> 500
end,
StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true),
AckQueue = if not StreamMgmtEnabled ->
none;
true ->
undefined
end,
ResumeTimeout = case proplists:get_value(resume_timeout, Opts) of
Timeout when is_integer(Timeout), Timeout >= 0 -> Timeout;
_ -> 300
end,
IP = peerip(SockMod, Socket),
%% Check if IP is blacklisted:
case is_ip_blacklisted(IP) of
@ -315,9 +325,11 @@ init([{SockMod, Socket}, Opts]) ->
xml_socket = XMLSocket, zlib = Zlib, tls = TLS,
tls_required = StartTLSRequired,
tls_enabled = TLSEnabled, tls_options = TLSOpts,
streamid = new_id(), access = Access,
shaper = Shaper, ip = IP,
ack_queue = AckQueue, max_ack_queue = MaxAckQueue},
sid = {now(), self()}, streamid = new_id(),
access = Access, shaper = Shaper, ip = IP,
sm_state = StreamMgmtState,
max_ack_queue = MaxAckQueue,
resume_timeout = ResumeTimeout},
{ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT}
end.
@ -603,7 +615,6 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
?INFO_MSG("(~w) Accepted legacy authentication for ~s by ~p",
[StateData#state.socket,
jlib:jid_to_string(JID), AuthModule]),
SID = {now(), self()},
Conn = (StateData#state.sockmod):get_conn_type(
StateData#state.socket),
Info = [{ip, StateData#state.ip}, {conn, Conn},
@ -611,7 +622,9 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
Res = jlib:make_result_iq_reply(
El#xmlel{children = []}),
send_element(StateData, Res),
ejabberd_sm:open_session(SID, U, StateData#state.server, R, Info),
ejabberd_sm:open_session(StateData#state.sid, U,
StateData#state.server, R,
Info),
change_shaper(StateData, JID),
{Fs, Ts} =
ejabberd_hooks:run_fold(roster_get_subscription_lists,
@ -629,7 +642,7 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
[U, StateData#state.server]),
NewStateData = StateData#state{user = U,
resource = R,
jid = JID, sid = SID,
jid = JID,
conn = Conn,
auth_module = AuthModule,
pres_f = (?SETS):from_list(Fs1),
@ -981,9 +994,20 @@ resource_conflict_action(U, S, R) ->
{accept_resource, Rnew}
end.
wait_for_bind({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
wait_for_bind({xmlstreamelement, #xmlel{name = Name, attrs = Attrs} = El},
StateData)
when ?IS_STREAM_MGMT_TAG(Name) ->
fsm_next_state(wait_for_bind, (StateData#state.manage_stream)(El, StateData));
case Name of
<<"resume">> ->
case handle_resume(StateData, Attrs) of
{ok, ResumedState} ->
fsm_next_state(session_established, ResumedState);
error ->
fsm_next_state(wait_for_bind, StateData)
end;
_ ->
fsm_next_state(wait_for_bind, (StateData#state.manage_stream)(El, StateData))
end;
wait_for_bind({xmlstreamelement, El}, StateData) ->
case jlib:iq_query_info(El) of
#iq{type = set, xmlns = ?NS_BIND, sub_el = SubEl} =
@ -1077,15 +1101,13 @@ wait_for_session({xmlstreamelement, El}, StateData) ->
privacy_get_user_list, NewState#state.server,
#userlist{},
[U, NewState#state.server]),
SID = {now(), self()},
Conn = get_conn_type(NewState),
Info = [{ip, NewState#state.ip}, {conn, Conn},
{auth_module, NewState#state.auth_module}],
ejabberd_sm:open_session(
SID, U, NewState#state.server, R, Info),
NewState#state.sid, U, NewState#state.server, R, Info),
UpdatedStateData =
NewState#state{
sid = SID,
conn = Conn,
pres_f = ?SETS:from_list(Fs1),
pres_t = ?SETS:from_list(Ts1),
@ -1151,6 +1173,11 @@ session_established({xmlstreamerror, _}, StateData) ->
send_element(StateData, ?INVALID_XML_ERR),
send_trailer(StateData),
{stop, normal, StateData};
session_established(closed, StateData)
when StateData#state.resume_timeout > 0,
StateData#state.sm_state == active orelse
StateData#state.sm_state == pending ->
fsm_next_state(wait_for_resume, StateData#state{sm_state = pending});
session_established(closed, StateData) ->
{stop, normal, StateData}.
@ -1240,6 +1267,17 @@ session_established2(El, StateData) ->
[{xmlstreamelement, El}]),
fsm_next_state(session_established, NewState).
wait_for_resume(timeout, StateData) ->
?DEBUG("Timed out waiting for resumption of stream for ~s",
[jlib:jid_to_string(StateData#state.jid)]),
{stop, normal, StateData};
wait_for_resume(closed, StateData) ->
?DEBUG("Ignoring 'closed' event while waiting for resumption", []),
fsm_next_state(wait_for_resume, StateData);
wait_for_resume(Event, StateData) ->
?WARNING_MSG("Ignoring event while waiting for resumption: ~p", [Event]),
fsm_next_state(wait_for_resume, StateData).
%%----------------------------------------------------------------------
%% Func: StateName/3
%% Returns: {next_state, NextStateName, NextStateData} |
@ -1284,6 +1322,15 @@ handle_sync_event(get_subscribed, _From, StateName,
StateData) ->
Subscribed = (?SETS):to_list(StateData#state.pres_f),
{reply, Subscribed, StateName, StateData};
handle_sync_event(resume_session, _From, _StateName,
StateData) ->
%% The old session should be closed before the new one is opened, so we do
%% this here instead of leaving it to the terminate callback
ejabberd_sm:close_session(StateData#state.sid,
StateData#state.user,
StateData#state.server,
StateData#state.resource),
{stop, normal, {ok, StateData}, StateData#state{sm_state = resumed}};
handle_sync_event(_Event, _From, StateName,
StateData) ->
Reply = ok, fsm_reply(Reply, StateName, StateData).
@ -1612,7 +1659,13 @@ handle_info({route, From, To,
handle_info({'DOWN', Monitor, _Type, _Object, _Info},
_StateName, StateData)
when Monitor == StateData#state.socket_monitor ->
{stop, normal, StateData};
if StateData#state.resume_timeout > 0,
StateData#state.sm_state == active orelse
StateData#state.sm_state == pending ->
fsm_next_state(wait_for_resume, StateData#state{sm_state = pending});
true ->
{stop, normal, StateData}
end;
handle_info(system_shutdown, StateName, StateData) ->
case StateName of
wait_for_stream ->
@ -1676,61 +1729,71 @@ print_state(State = #state{pres_t = T, pres_f = F, pres_a = A, pres_i = I}) ->
%% Returns: any
%%----------------------------------------------------------------------
terminate(_Reason, StateName, StateData) ->
case StateName of
session_established ->
case StateData#state.authenticated of
replaced ->
?INFO_MSG("(~w) Replaced session for ~s",
[StateData#state.socket,
jlib:jid_to_string(StateData#state.jid)]),
From = StateData#state.jid,
Packet = #xmlel{name = <<"presence">>,
attrs = [{<<"type">>, <<"unavailable">>}],
children =
[#xmlel{name = <<"status">>, attrs = [],
children =
[{xmlcdata,
<<"Replaced by new connection">>}]}]},
ejabberd_sm:close_session_unset_presence(StateData#state.sid,
StateData#state.user,
StateData#state.server,
StateData#state.resource,
<<"Replaced by new connection">>),
presence_broadcast(StateData, From,
StateData#state.pres_a, Packet),
presence_broadcast(StateData, From,
StateData#state.pres_i, Packet);
_ ->
?INFO_MSG("(~w) Close session for ~s",
[StateData#state.socket,
jlib:jid_to_string(StateData#state.jid)]),
EmptySet = (?SETS):new(),
case StateData of
#state{pres_last = undefined, pres_a = EmptySet, pres_i = EmptySet, pres_invis = false} ->
ejabberd_sm:close_session(StateData#state.sid,
StateData#state.user,
StateData#state.server,
StateData#state.resource);
_ ->
From = StateData#state.jid,
Packet = #xmlel{name = <<"presence">>,
attrs = [{<<"type">>, <<"unavailable">>}],
children = []},
ejabberd_sm:close_session_unset_presence(StateData#state.sid,
StateData#state.user,
StateData#state.server,
StateData#state.resource,
<<"">>),
presence_broadcast(StateData, From,
StateData#state.pres_a, Packet),
presence_broadcast(StateData, From,
StateData#state.pres_i, Packet)
end
end,
resend_unacked_stanzas(StateData),
bounce_messages();
case StateData#state.sm_state of
resumed ->
?INFO_MSG("Closing former stream of resumed session for ~s",
[jlib:jid_to_string(StateData#state.jid)]);
_ ->
ok
if StateName == session_established;
StateName == wait_for_resume ->
case StateData#state.authenticated of
replaced ->
?INFO_MSG("(~w) Replaced session for ~s",
[StateData#state.socket,
jlib:jid_to_string(StateData#state.jid)]),
From = StateData#state.jid,
Packet = #xmlel{name = <<"presence">>,
attrs = [{<<"type">>, <<"unavailable">>}],
children =
[#xmlel{name = <<"status">>, attrs = [],
children =
[{xmlcdata,
<<"Replaced by new connection">>}]}]},
ejabberd_sm:close_session_unset_presence(StateData#state.sid,
StateData#state.user,
StateData#state.server,
StateData#state.resource,
<<"Replaced by new connection">>),
presence_broadcast(StateData, From,
StateData#state.pres_a, Packet),
presence_broadcast(StateData, From,
StateData#state.pres_i, Packet),
resend_unacked_stanzas(StateData);
_ ->
?INFO_MSG("(~w) Close session for ~s",
[StateData#state.socket,
jlib:jid_to_string(StateData#state.jid)]),
EmptySet = (?SETS):new(),
case StateData of
#state{pres_last = undefined,
pres_a = EmptySet,
pres_i = EmptySet,
pres_invis = false} ->
ejabberd_sm:close_session(StateData#state.sid,
StateData#state.user,
StateData#state.server,
StateData#state.resource);
_ ->
From = StateData#state.jid,
Packet = #xmlel{name = <<"presence">>,
attrs = [{<<"type">>, <<"unavailable">>}],
children = []},
ejabberd_sm:close_session_unset_presence(StateData#state.sid,
StateData#state.user,
StateData#state.server,
StateData#state.resource,
<<"">>),
presence_broadcast(StateData, From,
StateData#state.pres_a, Packet),
presence_broadcast(StateData, From,
StateData#state.pres_i, Packet)
end,
resend_unacked_stanzas(StateData)
end,
bounce_messages();
true ->
ok
end
end,
(StateData#state.sockmod):close(StateData#state.socket),
ok.
@ -1745,6 +1808,8 @@ change_shaper(StateData, JID) ->
(StateData#state.sockmod):change_shaper(StateData#state.socket,
Shaper).
send_text(StateData, Text) when StateData#state.sm_state == pending ->
?DEBUG("Cannot send text while waiting for resumption: ~p", [Text]);
send_text(StateData, Text) when StateData#state.xml_socket ->
?DEBUG("Send Text on stream = ~p", [Text]),
(StateData#state.sockmod):send_xml(StateData#state.socket,
@ -1753,13 +1818,17 @@ send_text(StateData, Text) ->
?DEBUG("Send XML on stream = ~p", [Text]),
(StateData#state.sockmod):send(StateData#state.socket, Text).
send_element(StateData, El) when StateData#state.sm_state == pending ->
?DEBUG("Cannot send element while waiting for resumption: ~p", [El]);
send_element(StateData, El) when StateData#state.xml_socket ->
(StateData#state.sockmod):send_xml(StateData#state.socket,
{xmlstreamelement, El});
send_element(StateData, El) ->
send_text(StateData, xml:element_to_binary(El)).
send_stanza(StateData, Stanza) when StateData#state.sm_xmlns /= undefined ->
send_stanza(StateData, Stanza) when StateData#state.sm_state == pending ->
ack_queue_add(StateData, Stanza);
send_stanza(StateData, Stanza) when StateData#state.sm_state == active ->
send_stanza_and_ack_req(StateData, Stanza),
ack_queue_add(StateData, Stanza);
send_stanza(StateData, Stanza) ->
@ -2356,6 +2425,15 @@ fsm_next_state_gc(StateName, PackedStateData) ->
fsm_next_state(session_established, StateData) ->
{next_state, session_established, StateData,
?C2S_HIBERNATE_TIMEOUT};
fsm_next_state(wait_for_resume, #state{pending_since = undefined} =
StateData) ->
{next_state, wait_for_resume,
StateData#state{pending_since = os:timestamp()},
StateData#state.resume_timeout};
fsm_next_state(wait_for_resume, StateData) ->
Diff = timer:now_diff(os:timestamp(), StateData#state.pending_since),
Timeout = max(StateData#state.resume_timeout - Diff div 1000, 1),
{next_state, wait_for_resume, StateData, Timeout};
fsm_next_state(StateName, StateData) ->
{next_state, StateName, StateData, ?C2S_OPEN_TIMEOUT}.
@ -2364,6 +2442,15 @@ fsm_next_state(StateName, StateData) ->
fsm_reply(Reply, session_established, StateData) ->
{reply, Reply, session_established, StateData,
?C2S_HIBERNATE_TIMEOUT};
fsm_reply(Reply, wait_for_resume, #state{pending_since = undefined} =
StateData) ->
{reply, Reply, wait_for_resume,
StateData#state{pending_since = os:timestamp()},
StateData#state.resume_timeout};
fsm_reply(Reply, wait_for_resume, StateData) ->
Diff = timer:now_diff(os:timestamp(), StateData#state.pending_since),
Timeout = max(StateData#state.resume_timeout - Diff div 1000, 1),
{reply, Reply, wait_for_resume, StateData, Timeout};
fsm_reply(Reply, StateName, StateData) ->
{reply, Reply, StateName, StateData, ?C2S_OPEN_TIMEOUT}.
@ -2462,7 +2549,7 @@ route_blocking(What, StateData) ->
%%% XEP-0198
%%%----------------------------------------------------------------------
stream_mgmt_enabled(#state{ack_queue = none}) ->
stream_mgmt_enabled(#state{sm_state = disabled}) ->
false;
stream_mgmt_enabled(_StateData) ->
true.
@ -2470,8 +2557,9 @@ stream_mgmt_enabled(_StateData) ->
negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) ->
%% XEP-0198 says: "For client-to-server connections, the client MUST NOT
%% attempt to enable stream management until after it has completed Resource
%% Binding". However, it also says: "Stream management errors SHOULD be
%% considered recoverable", so we won't bail out.
%% Binding unless it is resuming a previous session". However, it also
%% says: "Stream management errors SHOULD be considered recoverable", so we
%% won't bail out.
send_element(StateData, ?SM_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)),
StateData;
negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
@ -2481,10 +2569,11 @@ negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
true ->
case Name of
<<"enable">> ->
handle_enable(StateData#state{sm_xmlns = Xmlns});
handle_enable(StateData#state{sm_xmlns = Xmlns}, Attrs);
_ ->
Res = if Name == <<"a">>;
Name == <<"r">> ->
Name == <<"r">>;
Name == <<"resume">> ->
?SM_UNEXPECTED_REQUEST(Xmlns);
true ->
?SM_BAD_REQUEST(Xmlns)
@ -2510,7 +2599,8 @@ perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
<<"a">> ->
handle_a(StateData, Attrs);
_ ->
Res = if Name == <<"enable">> ->
Res = if Name == <<"enable">>;
Name == <<"resume">> ->
?SM_UNEXPECTED_REQUEST(Xmlns);
true ->
?SM_BAD_REQUEST(Xmlns)
@ -2524,15 +2614,40 @@ perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
StateData
end.
handle_enable(StateData) ->
?INFO_MSG("Enabling XEP-0198 stream management for ~s",
[jlib:jid_to_string(StateData#state.jid)]),
handle_enable(#state{resume_timeout = ConfigTimeout} = StateData, Attrs) ->
Timeout = case xml:get_attr_s(<<"resume">>, Attrs) of
ResumeAttr when ResumeAttr == <<"true">>;
ResumeAttr == <<"1">> ->
MaxAttr = xml:get_attr_s(<<"max">>, Attrs),
case catch jlib:binary_to_integer(MaxAttr) of
Max when is_integer(Max), Max > 0, Max =< ConfigTimeout ->
Max;
_ ->
ConfigTimeout
end;
_ ->
0
end,
ResAttrs = [{<<"xmlns">>, StateData#state.sm_xmlns}] ++
if Timeout > 0 ->
?INFO_MSG("Stream management with resumption enabled for ~s",
[jlib:jid_to_string(StateData#state.jid)]),
[{<<"id">>, make_resume_id(StateData)},
{<<"resume">>, <<"true">>},
{<<"max">>, jlib:integer_to_binary(Timeout)}];
true ->
?INFO_MSG("Stream management without resumption enabled for ~s",
[jlib:jid_to_string(StateData#state.jid)]),
[]
end,
Res = #xmlel{name = <<"enabled">>,
attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}],
attrs = ResAttrs,
children = []},
send_element(StateData, Res),
StateData#state{ack_queue = queue:new(),
manage_stream = fun perform_stream_mgmt/2}.
StateData#state{sm_state = active,
ack_queue = queue:new(),
manage_stream = fun perform_stream_mgmt/2,
resume_timeout = Timeout * 1000}.
handle_r(StateData) ->
H = jlib:integer_to_binary(StateData#state.n_stanzas_in),
@ -2559,7 +2674,59 @@ handle_a(#state{jid = JID, n_stanzas_out = NumStanzasOut} = StateData, Attrs) ->
StateData
end.
update_num_stanzas_in(StateData, El) when StateData#state.sm_xmlns /= undefined ->
handle_resume(StateData, Attrs) ->
R = case xml:get_attr_s(<<"xmlns">>, Attrs) of
Xmlns when ?IS_SUPPORTED_SM_XMLNS(Xmlns) ->
case stream_mgmt_enabled(StateData) of
true ->
case {xml:get_attr(<<"previd">>, Attrs),
catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs))}
of
{{value, PrevID}, H} when is_integer(H) ->
case inherit_session_state(StateData, PrevID) of
{ok, InheritedState} ->
{ok, InheritedState, H};
{error, Err} ->
{error, ?SM_ITEM_NOT_FOUND(Xmlns), Err}
end;
_ ->
{error, ?SM_BAD_REQUEST(Xmlns), <<"Invalid request">>}
end;
false ->
{error, ?SM_SERVICE_UNAVAILABLE(Xmlns), <<"XEP-0198 disabled">>}
end;
_ ->
{error, ?SM_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3), <<"Invalid XMLNS">>}
end,
case R of
{ok, ResumedState, NumHandled} ->
NewState = ack_queue_drop(ResumedState, NumHandled),
AttrXmlns = NewState#state.sm_xmlns,
AttrId = make_resume_id(NewState),
AttrH = jlib:integer_to_binary(NewState#state.n_stanzas_in),
send_element(NewState,
#xmlel{name = <<"resumed">>,
attrs = [{<<"xmlns">>, AttrXmlns},
{<<"h">>, AttrH},
{<<"previd">>, AttrId}],
children = []}),
SendFun = fun(_F, _T, El) -> send_element(NewState, El) end,
resend_unacked_stanzas(NewState, SendFun),
send_element(NewState,
#xmlel{name = <<"r">>,
attrs = [{<<"xmlns">>, AttrXmlns}],
children = []}),
?INFO_MSG("Resumed session for ~s",
[jlib:jid_to_string(NewState#state.jid)]),
{ok, NewState};
{error, El, Msg} ->
send_element(StateData, El),
?WARNING_MSG("Cannot resume session for ~s@~s: ~s",
[StateData#state.user, StateData#state.server, Msg]),
error
end.
update_num_stanzas_in(#state{sm_state = active} = StateData, El) ->
NewNum = case {is_stanza(El), StateData#state.n_stanzas_in} of
{true, 4294967295} ->
0;
@ -2612,7 +2779,8 @@ limit_queue_length(#state{jid = JID,
StateData
end.
resend_unacked_stanzas(StateData) when StateData#state.sm_xmlns /= undefined ->
resend_unacked_stanzas(StateData, F) when StateData#state.sm_state == active;
StateData#state.sm_state == pending ->
Queue = StateData#state.ack_queue,
case queue:len(Queue) of
0 ->
@ -2628,12 +2796,79 @@ resend_unacked_stanzas(StateData) when StateData#state.sm_xmlns /= undefined ->
To = jlib:string_to_jid(To_s),
?DEBUG("Resending unacknowledged stanza #~B from ~s to ~s",
[Num, From_s, To_s]),
ejabberd_router:route(From, To, El)
F(From, To, El)
end, queue:to_list(Queue))
end;
resend_unacked_stanzas(_StateData, _F) ->
ok.
resend_unacked_stanzas(StateData) when StateData#state.sm_state == active;
StateData#state.sm_state == pending ->
resend_unacked_stanzas(StateData, fun ejabberd_router:route/3);
resend_unacked_stanzas(_StateData) ->
ok.
inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) ->
case jlib:base64_to_term(ResumeID) of
{term, {U, S, R, Time}} ->
case ejabberd_sm:get_session_pid(U, S, R) of
none ->
{error, <<"Previous session PID not found">>};
OldPID ->
OldSID = {Time, OldPID},
case catch resume_session(OldPID) of
{ok, #state{sid = OldSID} = OldStateData} ->
NewSID = {Time, self()}, % Old time, new PID
Priority = case OldStateData#state.pres_last of
undefined ->
0;
Presence ->
get_priority_from_presence(Presence)
end,
Conn = get_conn_type(StateData),
Info = [{ip, StateData#state.ip}, {conn, Conn},
{auth_module, StateData#state.auth_module}],
ejabberd_sm:open_session(NewSID, U, S, R,
Priority, Info),
{ok, StateData#state{sid = NewSID,
jid = OldStateData#state.jid,
resource = OldStateData#state.resource,
pres_t = OldStateData#state.pres_t,
pres_f = OldStateData#state.pres_f,
pres_a = OldStateData#state.pres_a,
pres_i = OldStateData#state.pres_i,
pres_last = OldStateData#state.pres_last,
pres_pri = OldStateData#state.pres_pri,
pres_timestamp = OldStateData#state.pres_timestamp,
pres_invis = OldStateData#state.pres_invis,
privacy_list = OldStateData#state.privacy_list,
aux_fields = OldStateData#state.aux_fields,
sm_xmlns = OldStateData#state.sm_xmlns,
ack_queue = OldStateData#state.ack_queue,
manage_stream = fun perform_stream_mgmt/2,
resume_timeout = OldStateData#state.resume_timeout,
n_stanzas_in = OldStateData#state.n_stanzas_in,
n_stanzas_out = OldStateData#state.n_stanzas_out,
sm_state = active}};
_ ->
{error, <<"Cannot grab session state">>}
end
end;
error ->
{error, <<"Invalid 'previd' value">>}
end.
resume_session(FsmRef) ->
(?GEN_FSM):sync_send_all_state_event(FsmRef, resume_session, 3000).
make_resume_id(StateData) ->
{Time, _} = StateData#state.sid,
ID = {StateData#state.user,
StateData#state.server,
StateData#state.resource,
Time},
jlib:term_to_base64(ID).
%%%----------------------------------------------------------------------
%%% JID Set memory footprint reduction code
%%%----------------------------------------------------------------------

View File

@ -33,7 +33,9 @@
%% API
-export([start_link/0,
route/3,
open_session/5, close_session/4,
open_session/5,
open_session/6,
close_session/4,
check_in_subscription/6,
bounce_offline_message/3,
disconnect_removed_user/2,
@ -107,10 +109,10 @@ route(From, To, Packet) ->
_ -> ok
end.
-spec open_session(sid(), binary(), binary(), binary(), info()) -> ok.
-spec open_session(sid(), binary(), binary(), binary(), prio(), info()) -> ok.
open_session(SID, User, Server, Resource, Info) ->
set_session(SID, User, Server, Resource, undefined, Info),
open_session(SID, User, Server, Resource, Priority, Info) ->
set_session(SID, User, Server, Resource, Priority, Info),
mnesia:dirty_update_counter(session_counter,
jlib:nameprep(Server), 1),
check_for_sessions_to_replace(User, Server, Resource),
@ -118,6 +120,11 @@ open_session(SID, User, Server, Resource, Info) ->
ejabberd_hooks:run(sm_register_connection_hook,
JID#jid.lserver, [SID, JID, Info]).
-spec open_session(sid(), binary(), binary(), binary(), info()) -> ok.
open_session(SID, User, Server, Resource, Info) ->
open_session(SID, User, Server, Resource, undefined, Info).
-spec close_session(sid(), binary(), binary(), binary()) -> ok.
close_session(SID, User, Server, Resource) ->

View File

@ -46,6 +46,7 @@
timestamp_to_iso/2, timestamp_to_xml/4,
timestamp_to_xml/1, now_to_utc_string/1,
now_to_local_string/1, datetime_string_to_timestamp/1,
term_to_base64/1, base64_to_term/1,
decode_base64/1, encode_base64/1, ip_to_list/1,
rsm_encode/1, rsm_encode/2, rsm_decode/1,
binary_to_integer/1, binary_to_integer/2,
@ -780,6 +781,21 @@ check_list(List) ->
% Base64 stuff (based on httpd_util.erl)
%
-spec term_to_base64(term()) -> binary().
term_to_base64(Term) ->
encode_base64(term_to_binary(Term)).
-spec base64_to_term(binary()) -> {term, term()} | error.
base64_to_term(Base64) ->
case catch binary_to_term(decode_base64(Base64), [safe]) of
{'EXIT', _} ->
error;
Term ->
{term, Term}
end.
-spec decode_base64(binary()) -> binary().
decode_base64(S) ->