25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-11-24 16:23:40 +01:00

Merge pull request #1131 from weiss/failed-resume-h

XEP-0198: Indicate number of handled stanzas if resumption fails
This commit is contained in:
Evgeny Khramtsov 2016-05-25 11:56:47 +04:00
commit 14b53fbcb0
8 changed files with 155 additions and 47 deletions

View File

@ -3,10 +3,11 @@
-record(session, {sid, usr, us, priority, info}).
-record(session_counter, {vhost, count}).
-type sid() :: {erlang:timestamp(), pid()}.
-type sid() :: {erlang:timestamp(), pid()} | {erlang:timestamp(), undefined}.
-type ip() :: {inet:ip_address(), inet:port_number()} | undefined.
-type info() :: [{conn, atom()} | {ip, ip()} | {node, atom()}
| {oor, boolean()} | {auth_module, atom()}].
| {oor, boolean()} | {auth_module, atom()}
| {num_stanzas_in, non_neg_integer()}].
-type prio() :: undefined | integer().
-endif.

View File

@ -166,27 +166,32 @@
(Xmlns == ?NS_STREAM_MGMT_2) or
(Xmlns == ?NS_STREAM_MGMT_3)).
-define(MGMT_FAILED(Condition, Xmlns),
-define(MGMT_FAILED(Condition, Attrs),
#xmlel{name = <<"failed">>,
attrs = [{<<"xmlns">>, Xmlns}],
attrs = Attrs,
children = [#xmlel{name = Condition,
attrs = [{<<"xmlns">>, ?NS_STANZAS}],
children = []}]}).
-define(MGMT_BAD_REQUEST(Xmlns),
?MGMT_FAILED(<<"bad-request">>, Xmlns)).
-define(MGMT_ITEM_NOT_FOUND(Xmlns),
?MGMT_FAILED(<<"item-not-found">>, Xmlns)).
?MGMT_FAILED(<<"bad-request">>, [{<<"xmlns">>, Xmlns}])).
-define(MGMT_SERVICE_UNAVAILABLE(Xmlns),
?MGMT_FAILED(<<"service-unavailable">>, Xmlns)).
?MGMT_FAILED(<<"service-unavailable">>, [{<<"xmlns">>, Xmlns}])).
-define(MGMT_UNEXPECTED_REQUEST(Xmlns),
?MGMT_FAILED(<<"unexpected-request">>, Xmlns)).
?MGMT_FAILED(<<"unexpected-request">>, [{<<"xmlns">>, Xmlns}])).
-define(MGMT_UNSUPPORTED_VERSION(Xmlns),
?MGMT_FAILED(<<"unsupported-version">>, Xmlns)).
?MGMT_FAILED(<<"unsupported-version">>, [{<<"xmlns">>, Xmlns}])).
-define(MGMT_ITEM_NOT_FOUND(Xmlns),
?MGMT_FAILED(<<"item-not-found">>, [{<<"xmlns">>, Xmlns}])).
-define(MGMT_ITEM_NOT_FOUND_H(Xmlns, NumStanzasIn),
?MGMT_FAILED(<<"item-not-found">>,
[{<<"xmlns">>, Xmlns},
{<<"h">>, jlib:integer_to_binary(NumStanzasIn)}])).
%%%----------------------------------------------------------------------
%%% API
@ -1280,7 +1285,7 @@ wait_for_resume({xmlstreamelement, _El} = Event, StateData) ->
wait_for_resume(timeout, StateData) ->
?DEBUG("Timed out waiting for resumption of stream for ~s",
[jid:to_string(StateData#state.jid)]),
{stop, normal, StateData};
{stop, normal, StateData#state{mgmt_state = timeout}};
wait_for_resume(Event, StateData) ->
?DEBUG("Ignoring event while waiting for resumption: ~p", [Event]),
fsm_next_state(wait_for_resume, StateData).
@ -1791,6 +1796,18 @@ terminate(_Reason, StateName, StateData) ->
presence_broadcast(StateData, From,
StateData#state.pres_a, Packet)
end,
case StateData#state.mgmt_state of
timeout ->
Info = [{num_stanzas_in,
StateData#state.mgmt_stanzas_in}],
ejabberd_sm:set_offline_info(StateData#state.sid,
StateData#state.user,
StateData#state.server,
StateData#state.resource,
Info);
_ ->
ok
end,
handle_unacked_stanzas(StateData)
end,
bounce_messages();
@ -2726,6 +2743,8 @@ handle_resume(StateData, Attrs) ->
case inherit_session_state(StateData, PrevID) of
{ok, InheritedState} ->
{ok, InheritedState, H};
{error, Err, InH} ->
{error, ?MGMT_ITEM_NOT_FOUND_H(Xmlns, InH), Err};
{error, Err} ->
{error, ?MGMT_ITEM_NOT_FOUND(Xmlns), Err}
end;
@ -2965,7 +2984,17 @@ inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) ->
{term, {R, Time}} ->
case ejabberd_sm:get_session_pid(U, S, R) of
none ->
{error, <<"Previous session PID not found">>};
case ejabberd_sm:get_offline_info(Time, U, S, R) of
none ->
{error, <<"Previous session PID not found">>};
Info ->
case proplists:get_value(num_stanzas_in, Info) of
undefined ->
{error, <<"Previous session timed out">>};
H ->
{error, <<"Previous session timed out">>, H}
end
end;
OldPID ->
OldSID = {Time, OldPID},
case catch resume_session(OldSID) of

View File

@ -47,6 +47,8 @@
set_presence/7,
unset_presence/6,
close_session_unset_presence/5,
set_offline_info/5,
get_offline_info/4,
dirty_get_sessions_list/0,
dirty_get_my_sessions_list/0,
get_vh_session_list/1,
@ -178,14 +180,14 @@ get_user_resources(User, Server) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
Mod = get_sm_backend(LServer),
Ss = Mod:get_sessions(LUser, LServer),
Ss = online(Mod:get_sessions(LUser, LServer)),
[element(3, S#session.usr) || S <- clean_session_list(Ss)].
-spec get_user_present_resources(binary(), binary()) -> [tuple()].
get_user_present_resources(LUser, LServer) ->
Mod = get_sm_backend(LServer),
Ss = Mod:get_sessions(LUser, LServer),
Ss = online(Mod:get_sessions(LUser, LServer)),
[{S#session.priority, element(3, S#session.usr)}
|| S <- clean_session_list(Ss), is_integer(S#session.priority)].
@ -196,7 +198,7 @@ get_user_ip(User, Server, Resource) ->
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
case Mod:get_sessions(LUser, LServer, LResource) of
case online(Mod:get_sessions(LUser, LServer, LResource)) of
[] ->
undefined;
Ss ->
@ -211,7 +213,7 @@ get_user_info(User, Server, Resource) ->
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
case Mod:get_sessions(LUser, LServer, LResource) of
case online(Mod:get_sessions(LUser, LServer, LResource)) of
[] ->
offline;
Ss ->
@ -261,17 +263,42 @@ get_session_pid(User, Server, Resource) ->
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
case Mod:get_sessions(LUser, LServer, LResource) of
case online(Mod:get_sessions(LUser, LServer, LResource)) of
[#session{sid = {_, Pid}}] -> Pid;
_ -> none
end.
-spec set_offline_info(sid(), binary(), binary(), binary(), info()) -> ok.
set_offline_info({Time, _Pid}, User, Server, Resource, Info) ->
SID = {Time, undefined},
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
set_session(SID, LUser, LServer, LResource, undefined, Info).
-spec get_offline_info(erlang:timestamp(), binary(), binary(),
binary()) -> none | info().
get_offline_info(Time, User, Server, Resource) ->
SID = {Time, undefined},
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
case Mod:get_sessions(LUser, LServer, LResource) of
[#session{sid = SID, info = Info}] ->
Info;
_ ->
none
end.
-spec dirty_get_sessions_list() -> [ljid()].
dirty_get_sessions_list() ->
lists:flatmap(
fun(Mod) ->
[S#session.usr || S <- Mod:get_sessions()]
[S#session.usr || S <- online(Mod:get_sessions())]
end, get_sm_backends()).
-spec dirty_get_my_sessions_list() -> [#session{}].
@ -279,7 +306,7 @@ dirty_get_sessions_list() ->
dirty_get_my_sessions_list() ->
lists:flatmap(
fun(Mod) ->
[S || S <- Mod:get_sessions(),
[S || S <- online(Mod:get_sessions()),
node(element(2, S#session.sid)) == node()]
end, get_sm_backends()).
@ -288,14 +315,14 @@ dirty_get_my_sessions_list() ->
get_vh_session_list(Server) ->
LServer = jid:nameprep(Server),
Mod = get_sm_backend(LServer),
[S#session.usr || S <- Mod:get_sessions(LServer)].
[S#session.usr || S <- online(Mod:get_sessions(LServer))].
-spec get_all_pids() -> [pid()].
get_all_pids() ->
lists:flatmap(
fun(Mod) ->
[element(2, S#session.sid) || S <- Mod:get_sessions()]
[element(2, S#session.sid) || S <- online(Mod:get_sessions())]
end, get_sm_backends()).
-spec get_vh_session_number(binary()) -> non_neg_integer().
@ -303,7 +330,7 @@ get_all_pids() ->
get_vh_session_number(Server) ->
LServer = jid:nameprep(Server),
Mod = get_sm_backend(LServer),
length(Mod:get_sessions(LServer)).
length(online(Mod:get_sessions(LServer))).
register_iq_handler(Host, XMLNS, Module, Fun) ->
ejabberd_sm ! {register_iq_handler, Host, XMLNS, Module, Fun}.
@ -395,6 +422,15 @@ set_session(SID, User, Server, Resource, Priority, Info) ->
Mod:set_session(#session{sid = SID, usr = USR, us = US,
priority = Priority, info = Info}).
-spec online([#session{}]) -> [#session{}].
online(Sessions) ->
lists:filter(fun(#session{sid = {_, undefined}}) ->
false;
(_) ->
true
end, Sessions).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
do_route(From, To, {broadcast, _} = Packet) ->
@ -409,7 +445,7 @@ do_route(From, To, {broadcast, _} = Packet) ->
_ ->
{U, S, R} = jid:tolower(To),
Mod = get_sm_backend(S),
case Mod:get_sessions(U, S, R) of
case online(Mod:get_sessions(U, S, R)) of
[] ->
?DEBUG("packet dropped~n", []);
Ss ->
@ -511,8 +547,8 @@ do_route(From, To, #xmlel{} = Packet) ->
_ -> ok
end;
_ ->
Mod = get_sm_backend(LServer),
case Mod:get_sessions(LUser, LServer, LResource) of
Mod = get_sm_backend(LServer),
case online(Mod:get_sessions(LUser, LServer, LResource)) of
[] ->
case Name of
<<"message">> ->
@ -584,8 +620,8 @@ route_message(From, To, Packet, Type) ->
(P >= 0) and (Type == headline) ->
LResource = jid:resourceprep(R),
Mod = get_sm_backend(LServer),
case Mod:get_sessions(LUser, LServer,
LResource) of
case online(Mod:get_sessions(LUser, LServer,
LResource)) of
[] ->
ok; % Race condition
Ss ->
@ -646,7 +682,11 @@ check_existing_resources(LUser, LServer, LResource) ->
if SIDs == [] -> ok;
true ->
MaxSID = lists:max(SIDs),
lists:foreach(fun ({_, Pid} = S) when S /= MaxSID ->
lists:foreach(fun ({_, undefined} = S) ->
Mod = get_sm_backend(LServer),
Mod:delete_session(LUser, LServer, LResource,
S);
({_, Pid} = S) when S /= MaxSID ->
Pid ! replaced;
(_) -> ok
end,
@ -663,11 +703,11 @@ get_resource_sessions(User, Server, Resource) ->
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
[S#session.sid || S <- Mod:get_sessions(LUser, LServer, LResource)].
[S#session.sid || S <- online(Mod:get_sessions(LUser, LServer, LResource))].
check_max_sessions(LUser, LServer) ->
Mod = get_sm_backend(LServer),
SIDs = [S#session.sid || S <- Mod:get_sessions(LUser, LServer)],
SIDs = [S#session.sid || S <- online(Mod:get_sessions(LUser, LServer))],
MaxSessions = get_max_user_sessions(LUser, LServer),
if length(SIDs) =< MaxSessions -> ok;
true -> {_, Pid} = lists:min(SIDs), Pid ! replaced
@ -721,7 +761,7 @@ process_iq(From, To, Packet) ->
force_update_presence({LUser, LServer}) ->
Mod = get_sm_backend(LServer),
Ss = Mod:get_sessions(LUser, LServer),
Ss = online(Mod:get_sessions(LUser, LServer)),
lists:foreach(fun (#session{sid = {_, Pid}}) ->
Pid ! {force_update_presence, LUser, LServer}
end,

View File

@ -203,6 +203,8 @@ init_per_testcase(TestCase, OrigConfig) ->
auth(connect(Config));
sm_resume ->
auth(connect(Config));
sm_resume_failed ->
auth(connect(Config));
test_open_session ->
bind(auth(connect(Config)));
_ when IsMaster or IsSlave ->
@ -231,6 +233,7 @@ no_db_tests() ->
stats,
sm,
sm_resume,
sm_resume_failed,
disco]},
{test_proxy65, [parallel],
[proxy65_master, proxy65_slave]}].
@ -641,6 +644,17 @@ sm_resume(Config) ->
?recv1(#message{from = ServerJID, to = MyJID, body = [Txt]}),
?recv1(#sm_r{}),
send(Config, #sm_a{h = 1, xmlns = ?NS_STREAM_MGMT_3}),
%% Send another stanza to increment the server's 'h' for sm_resume_failed.
send(Config, #presence{to = ServerJID}),
close_socket(Config),
{save_config, set_opt(sm_previd, ID, Config)}.
sm_resume_failed(Config) ->
{sm_resume, SMConfig} = ?config(saved_config, Config),
ID = ?config(sm_previd, SMConfig),
ct:sleep(5000), % Wait for session to time out.
send(Config, #sm_resume{previd = ID, h = 1, xmlns = ?NS_STREAM_MGMT_3}),
?recv1(#sm_failed{reason = 'item-not-found', h = 4}),
disconnect(Config).
private(Config) ->

View File

@ -419,6 +419,7 @@ listen:
starttls: true
shaper: c2s_shaper
access: c2s
resume_timeout: 3
-
port: @@s2s_port@@
module: ejabberd_s2s_in

View File

@ -2174,7 +2174,7 @@ encode({sm_resumed, _, _, _} = Resumed) ->
encode_sm_resumed(Resumed, []);
encode({sm_r, _} = R) -> encode_sm_r(R, []);
encode({sm_a, _, _} = A) -> encode_sm_a(A, []);
encode({sm_failed, _, _} = Failed) ->
encode({sm_failed, _, _, _} = Failed) ->
encode_sm_failed(Failed, []);
encode({offline_item, _, _} = Item) ->
encode_offline_item(Item,
@ -2597,7 +2597,7 @@ pp(sm_resume, 3) -> [h, previd, xmlns];
pp(sm_resumed, 3) -> [h, previd, xmlns];
pp(sm_r, 1) -> [xmlns];
pp(sm_a, 2) -> [h, xmlns];
pp(sm_failed, 2) -> [reason, xmlns];
pp(sm_failed, 3) -> [reason, h, xmlns];
pp(offline_item, 2) -> [node, action];
pp(offline, 3) -> [items, purge, fetch];
pp(mix_join, 2) -> [jid, subscribe];
@ -2963,9 +2963,9 @@ decode_sm_failed(__TopXMLNS, __IgnoreEls,
{xmlel, <<"failed">>, _attrs, _els}) ->
Reason = decode_sm_failed_els(__TopXMLNS, __IgnoreEls,
_els, undefined),
Xmlns = decode_sm_failed_attrs(__TopXMLNS, _attrs,
undefined),
{sm_failed, Reason, Xmlns}.
{H, Xmlns} = decode_sm_failed_attrs(__TopXMLNS, _attrs,
undefined, undefined),
{sm_failed, Reason, H, Xmlns}.
decode_sm_failed_els(__TopXMLNS, __IgnoreEls, [],
Reason) ->
@ -3285,20 +3285,25 @@ decode_sm_failed_els(__TopXMLNS, __IgnoreEls,
Reason).
decode_sm_failed_attrs(__TopXMLNS,
[{<<"xmlns">>, _val} | _attrs], _Xmlns) ->
decode_sm_failed_attrs(__TopXMLNS, _attrs, _val);
decode_sm_failed_attrs(__TopXMLNS, [_ | _attrs],
[{<<"h">>, _val} | _attrs], _H, Xmlns) ->
decode_sm_failed_attrs(__TopXMLNS, _attrs, _val, Xmlns);
decode_sm_failed_attrs(__TopXMLNS,
[{<<"xmlns">>, _val} | _attrs], H, _Xmlns) ->
decode_sm_failed_attrs(__TopXMLNS, _attrs, H, _val);
decode_sm_failed_attrs(__TopXMLNS, [_ | _attrs], H,
Xmlns) ->
decode_sm_failed_attrs(__TopXMLNS, _attrs, Xmlns);
decode_sm_failed_attrs(__TopXMLNS, [], Xmlns) ->
decode_sm_failed_attr_xmlns(__TopXMLNS, Xmlns).
decode_sm_failed_attrs(__TopXMLNS, _attrs, H, Xmlns);
decode_sm_failed_attrs(__TopXMLNS, [], H, Xmlns) ->
{decode_sm_failed_attr_h(__TopXMLNS, H),
decode_sm_failed_attr_xmlns(__TopXMLNS, Xmlns)}.
encode_sm_failed({sm_failed, Reason, Xmlns},
encode_sm_failed({sm_failed, Reason, H, Xmlns},
_xmlns_attrs) ->
_els = lists:reverse('encode_sm_failed_$reason'(Reason,
[])),
_attrs = encode_sm_failed_attr_xmlns(Xmlns,
_xmlns_attrs),
encode_sm_failed_attr_h(H,
_xmlns_attrs)),
{xmlel, <<"failed">>, _attrs, _els}.
'encode_sm_failed_$reason'(undefined, _acc) -> _acc;
@ -3443,6 +3448,20 @@ encode_sm_failed({sm_failed, Reason, Xmlns},
<<"urn:ietf:params:xml:ns:xmpp-stanzas">>}])
| _acc].
decode_sm_failed_attr_h(__TopXMLNS, undefined) ->
undefined;
decode_sm_failed_attr_h(__TopXMLNS, _val) ->
case catch dec_int(_val, 0, infinity) of
{'EXIT', _} ->
erlang:error({xmpp_codec,
{bad_attr_value, <<"h">>, <<"failed">>, __TopXMLNS}});
_res -> _res
end.
encode_sm_failed_attr_h(undefined, _acc) -> _acc;
encode_sm_failed_attr_h(_val, _acc) ->
[{<<"h">>, enc_int(_val)} | _acc].
decode_sm_failed_attr_xmlns(__TopXMLNS, undefined) ->
undefined;
decode_sm_failed_attr_xmlns(__TopXMLNS, _val) -> _val.

View File

@ -440,6 +440,7 @@
-record(sasl_mechanisms, {list = [] :: [binary()]}).
-record(sm_failed, {reason :: atom() | #gone{} | #redirect{},
h :: non_neg_integer(),
xmlns :: binary()}).
-record(error, {type :: 'auth' | 'cancel' | 'continue' | 'modify' | 'wait',

View File

@ -2351,8 +2351,11 @@
-xml(sm_failed,
#elem{name = <<"failed">>,
xmlns = [<<"urn:xmpp:sm:2">>, <<"urn:xmpp:sm:3">>],
result = {sm_failed, '$reason', '$xmlns'},
attrs = [#attr{name = <<"xmlns">>}],
result = {sm_failed, '$reason', '$h', '$xmlns'},
attrs = [#attr{name = <<"h">>,
dec = {dec_int, [0, infinity]},
enc = {enc_int, []}},
#attr{name = <<"xmlns">>}],
refs = [#ref{name = error_bad_request,
min = 0, max = 1, label = '$reason'},
#ref{name = error_conflict,