diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index 6f6a196e5..0655bbcf3 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -498,7 +498,7 @@ do_route(From, #jid{lresource = <<"">>} = To, #presence{} = Packet) -> end, get_user_present_resources(LUser, LServer)); do_route(From, #jid{lresource = <<"">>} = To, #message{type = T} = Packet) -> ?DEBUG("processing message to bare JID:~n~s", [xmpp:pp(Packet)]), - if T == chat; T == headline; T == normal -> + if T == chat; T == headline; T == normal; T == groupchat -> route_message(From, To, Packet, T); true -> Lang = xmpp:get_lang(Packet), @@ -516,7 +516,8 @@ do_route(From, To, Packet) -> case online(Mod:get_sessions(LUser, LServer, LResource)) of [] -> case Packet of - #message{type = T} when T == chat; T == normal -> + #message{type = T} when T == chat; T == normal; + T == headline; T == groupchat -> route_message(From, To, Packet, T); #presence{} -> ?DEBUG("dropping presence to unavalable resource:~n~s", @@ -586,20 +587,16 @@ route_message(From, To, Packet, Type) -> end, PrioRes); _ -> - case Type of - headline -> ok; - _ -> - case ejabberd_auth:is_user_exists(LUser, LServer) andalso - is_privacy_allow(From, To, Packet) of - true -> - ejabberd_hooks:run(offline_message_hook, LServer, - [From, To, Packet]); - false -> - Err = xmpp:make_error(Packet, - xmpp:err_service_unavailable()), - ejabberd_router:route(To, From, Err) - end - end + case ejabberd_auth:is_user_exists(LUser, LServer) andalso + is_privacy_allow(From, To, Packet) of + true -> + ejabberd_hooks:run(offline_message_hook, LServer, + [From, To, Packet]); + false -> + Err = xmpp:make_error(Packet, + xmpp:err_service_unavailable()), + ejabberd_router:route(To, From, Err) + end end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/src/mod_offline.erl b/src/mod_offline.erl index 240650234..6134823c1 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -102,7 +102,7 @@ -callback read_message_headers(binary(), binary()) -> any(). -callback read_message(binary(), binary(), non_neg_integer()) -> {ok, #offline_msg{}} | error. --callback remove_message(binary(), binary(), non_neg_integer()) -> ok. +-callback remove_message(binary(), binary(), non_neg_integer()) -> ok | {error, any()}. -callback read_all_messages(binary(), binary()) -> [#offline_msg{}]. -callback remove_all_messages(binary(), binary()) -> {atomic, any()}. -callback count_messages(binary(), binary()) -> non_neg_integer(). @@ -315,32 +315,52 @@ get_info(Acc, _From, _To, _Node, _Lang) -> Acc. -spec handle_offline_query(iq()) -> iq(). +handle_offline_query(#iq{from = #jid{luser = U1, lserver = S1}, + to = #jid{luser = U2, lserver = S2}, + lang = Lang, + sub_els = [#offline{}]} = IQ) + when {U1, S1} /= {U2, S2} -> + Txt = <<"Query to another users is forbidden">>, + xmpp:make_error(IQ, xmpp:err_forbidden(Txt, Lang)); handle_offline_query(#iq{from = #jid{luser = U, lserver = S} = From, to = #jid{luser = U, lserver = S} = _To, - type = Type, - sub_els = [#offline{purge = Purge, - items = Items, - fetch = Fetch}]} = IQ) -> - case Type of - get -> - if Fetch -> handle_offline_fetch(From); - true -> handle_offline_items_view(From, Items) + type = Type, lang = Lang, + sub_els = [#offline{} = Offline]} = IQ) -> + case {Type, Offline} of + {get, #offline{fetch = true, items = [], purge = false}} -> + %% TODO: report database errors + handle_offline_fetch(From), + xmpp:make_iq_result(IQ); + {get, #offline{fetch = false, items = [_|_] = Items, purge = false}} -> + case handle_offline_items_view(From, Items) of + true -> xmpp:make_iq_result(IQ); + false -> xmpp:make_error(IQ, xmpp:err_item_not_found()) end; - set -> - if Purge -> delete_all_msgs(U, S); - true -> handle_offline_items_remove(From, Items) - end - end, - xmpp:make_iq_result(IQ); + {set, #offline{fetch = false, items = [], purge = true}} -> + case delete_all_msgs(U, S) of + {atomic, ok} -> + xmpp:make_iq_result(IQ); + _Err -> + Txt = <<"Database failure">>, + xmpp:make_error(IQ, xmpp:err_internal_server_error(Txt, Lang)) + end; + {set, #offline{fetch = false, items = [_|_] = Items, purge = false}} -> + case handle_offline_items_remove(From, Items) of + true -> xmpp:make_iq_result(IQ); + false -> xmpp:make_error(IQ, xmpp:err_item_not_found()) + end; + _ -> + xmpp:make_error(IQ, xmpp:err_bad_request()) + end; handle_offline_query(#iq{lang = Lang} = IQ) -> - Txt = <<"Query to another users is forbidden">>, - xmpp:make_error(IQ, xmpp:err_forbidden(Txt, Lang)). + Txt = <<"No module is handling this query">>, + xmpp:make_error(IQ, xmpp:err_service_unavailable(Txt, Lang)). --spec handle_offline_items_view(jid(), [offline_item()]) -> ok. +-spec handle_offline_items_view(jid(), [offline_item()]) -> boolean(). handle_offline_items_view(JID, Items) -> {U, S, R} = jid:tolower(JID), - lists:foreach( - fun(#offline_item{node = Node, action = view}) -> + lists:foldl( + fun(#offline_item{node = Node, action = view}, Acc) -> case fetch_msg_by_node(JID, Node) of {ok, OfflineMsg} -> case offline_msg_to_route(S, OfflineMsg) of @@ -351,25 +371,22 @@ handle_offline_items_view(JID, Items) -> Pid ! {route, From, To, NewEl}; none -> ok - end; + end, + Acc or true; error -> - ok + Acc or false end; error -> - ok - end; - (_) -> - ok - end, Items). + Acc or false + end + end, false, Items). --spec handle_offline_items_remove(jid(), [offline_item()]) -> ok. +-spec handle_offline_items_remove(jid(), [offline_item()]) -> boolean(). handle_offline_items_remove(JID, Items) -> - lists:foreach( - fun(#offline_item{node = Node, action = remove}) -> - remove_msg_by_node(JID, Node); - (_) -> - ok - end, Items). + lists:foldl( + fun(#offline_item{node = Node, action = remove}, Acc) -> + Acc or remove_msg_by_node(JID, Node) + end, false, Items). -spec set_offline_tag(message(), binary()) -> message(). set_offline_tag(Msg, Node) -> @@ -401,23 +418,22 @@ fetch_msg_by_node(To, Seq) -> error end. --spec remove_msg_by_node(jid(), binary()) -> ok. +-spec remove_msg_by_node(jid(), binary()) -> boolean(). remove_msg_by_node(To, Seq) -> case catch binary_to_integer(Seq) of I when is_integer(I), I>= 0 -> LUser = To#jid.luser, LServer = To#jid.lserver, Mod = gen_mod:db_mod(LServer, ?MODULE), - Mod:remove_message(LUser, LServer, I); + Mod:remove_message(LUser, LServer, I), + true; _ -> - ok + false end. -spec need_to_store(binary(), message()) -> boolean(). need_to_store(_LServer, #message{type = error}) -> false; -need_to_store(_LServer, #message{type = groupchat}) -> false; -need_to_store(_LServer, #message{type = headline}) -> false; -need_to_store(LServer, Packet) -> +need_to_store(LServer, #message{type = Type} = Packet) -> case xmpp:has_subtag(Packet, #offline{}) of false -> case check_store_hint(Packet) of @@ -425,6 +441,8 @@ need_to_store(LServer, Packet) -> true; no_store -> false; + none when Type == headline; Type == groupchat -> + false; none -> case gen_mod:get_module_opt( LServer, ?MODULE, store_empty_body, diff --git a/src/mod_offline_mnesia.erl b/src/mod_offline_mnesia.erl index e8db08ddf..c9f088fa4 100644 --- a/src/mod_offline_mnesia.erl +++ b/src/mod_offline_mnesia.erl @@ -127,12 +127,16 @@ read_message(LUser, LServer, I) -> remove_message(LUser, LServer, I) -> US = {LUser, LServer}, TS = integer_to_now(I), - Msgs = mnesia:dirty_match_object( - offline_msg, #offline_msg{us = US, timestamp = TS, _ = '_'}), - lists:foreach( - fun(Msg) -> - mnesia:dirty_delete_object(Msg) - end, Msgs). + case mnesia:dirty_match_object( + offline_msg, #offline_msg{us = US, timestamp = TS, _ = '_'}) of + [] -> + {error, notfound}; + Msgs -> + lists:foreach( + fun(Msg) -> + mnesia:dirty_delete_object(Msg) + end, Msgs) + end. read_all_messages(LUser, LServer) -> US = {LUser, LServer}, diff --git a/src/xmpp_util.erl b/src/xmpp_util.erl index 102d88412..fb3bbc7ab 100644 --- a/src/xmpp_util.erl +++ b/src/xmpp_util.erl @@ -70,7 +70,7 @@ unwrap_carbon(Stanza) -> Stanza. is_standalone_chat_state(Stanza) -> case unwrap_carbon(Stanza) of #message{body = [], subject = [], sub_els = Els} -> - IgnoreNS = [?NS_CHATSTATES, ?NS_DELAY], + IgnoreNS = [?NS_CHATSTATES, ?NS_DELAY, ?NS_EVENT], Stripped = [El || El <- Els, not lists:member(xmpp:get_ns(El), IgnoreNS)], Stripped == []; diff --git a/test/ejabberd_SUITE.erl b/test/ejabberd_SUITE.erl index a13d801ea..121719cdf 100644 --- a/test/ejabberd_SUITE.erl +++ b/test/ejabberd_SUITE.erl @@ -399,14 +399,12 @@ db_tests(riak) -> privacy_tests:single_cases(), vcard, muc_tests:single_cases(), + offline_tests:master_slave_cases(), test_unregister]}, muc_tests:master_slave_cases(), privacy_tests:master_slave_cases(), roster_tests:master_slave_cases(), - {test_flex_offline, [sequence], - [flex_offline_master, flex_offline_slave]}, - {test_offline, [sequence], - [offline_master, offline_slave]}, + offline_tests:master_slave_cases(), {test_announce, [sequence], [announce_master, announce_slave]}, {test_vcard_xupdate, [parallel], @@ -425,17 +423,15 @@ db_tests(DB) when DB == mnesia; DB == redis -> vcard, pubsub_single_tests(), muc_tests:single_cases(), + offline_tests:single_cases(), test_unregister]}, muc_tests:master_slave_cases(), privacy_tests:master_slave_cases(), pubsub_multiple_tests(), roster_tests:master_slave_cases(), + offline_tests:master_slave_cases(), {test_mix, [parallel], [mix_master, mix_slave]}, - {test_flex_offline, [sequence], - [flex_offline_master, flex_offline_slave]}, - {test_offline, [sequence], - [offline_master, offline_slave]}, {test_old_mam, [parallel], [mam_old_master, mam_old_slave]}, {test_new_mam, [parallel], @@ -465,17 +461,15 @@ db_tests(_) -> vcard, pubsub_single_tests(), muc_tests:single_cases(), + offline_tests:single_cases(), test_unregister]}, muc_tests:master_slave_cases(), privacy_tests:master_slave_cases(), pubsub_multiple_tests(), roster_tests:master_slave_cases(), + offline_tests:master_slave_cases(), {test_mix, [parallel], [mix_master, mix_slave]}, - {test_flex_offline, [sequence], - [flex_offline_master, flex_offline_slave]}, - {test_offline, [sequence], - [offline_master, offline_slave]}, {test_old_mam, [parallel], [mam_old_master, mam_old_slave]}, {test_new_mam, [parallel], @@ -2294,6 +2288,35 @@ muc_config_visitor_nickchange_master(Config) -> muc_config_visitor_nickchange_slave(Config) -> muc_tests:muc_config_visitor_nickchange_slave(Config). +offline_feature_enabled(Config) -> + offline_tests:feature_enabled(Config). +offline_check_identity(Config) -> + offline_tests:check_identity(Config). +offline_send_non_existent(Config) -> + offline_tests:send_non_existent(Config). +offline_view_non_existent(Config) -> + offline_tests:view_non_existent(Config). +offline_remove_non_existent(Config) -> + offline_tests:remove_non_existent(Config). +offline_view_non_integer(Config) -> + offline_tests:view_non_integer(Config). +offline_remove_non_integer(Config) -> + offline_tests:remove_non_integer(Config). +offline_malformed_iq(Config) -> + offline_tests:malformed_iq(Config). +offline_wrong_user(Config) -> + offline_tests:wrong_user(Config). +offline_unsupported_iq(Config) -> + offline_tests:unsupported_iq(Config). +offline_flex_master(Config) -> + offline_tests:flex_master(Config). +offline_flex_slave(Config) -> + offline_tests:flex_slave(Config). +offline_send_all_master(Config) -> + offline_tests:send_all_master(Config). +offline_send_all_slave(Config) -> + offline_tests:send_all_slave(Config). + announce_master(Config) -> MyJID = my_jid(Config), ServerJID = server_jid(Config), @@ -2317,155 +2340,6 @@ announce_slave(Config) -> send(Config, #message{to = MotdDelJID}), disconnect(Config). -flex_offline_master(Config) -> - Peer = ?config(slave, Config), - LPeer = jid:remove_resource(Peer), - lists:foreach( - fun(I) -> - Body = integer_to_binary(I), - send(Config, #message{to = LPeer, - body = [#text{data = Body}], - subject = [#text{data = <<"subject">>}]}) - end, lists:seq(1, 5)), - disconnect(Config). - -flex_offline_slave(Config) -> - MyJID = my_jid(Config), - MyBareJID = jid:remove_resource(MyJID), - Peer = ?config(master, Config), - Peer_s = jid:to_string(Peer), - true = is_feature_advertised(Config, ?NS_FLEX_OFFLINE), - %% Request disco#info - #iq{type = result, - sub_els = [#disco_info{ - node = ?NS_FLEX_OFFLINE, - identities = Ids, - features = Fts, - xdata = [X]}]} = - send_recv(Config, #iq{type = get, - sub_els = [#disco_info{ - node = ?NS_FLEX_OFFLINE}]}), - %% Check if we have correct identities - true = lists:any( - fun(#identity{category = <<"automation">>, - type = <<"message-list">>}) -> true; - (_) -> false - end, Ids), - %% Check if we have needed feature - true = lists:member(?NS_FLEX_OFFLINE, Fts), - %% Check xdata, the 'number_of_messages' should be 5 - #xdata{type = result, - fields = [#xdata_field{type = hidden, - var = <<"FORM_TYPE">>}, - #xdata_field{var = <<"number_of_messages">>, - values = [<<"5">>]}]} = X, - %% Fetch headers, - #iq{type = result, - sub_els = [#disco_items{ - node = ?NS_FLEX_OFFLINE, - items = DiscoItems}]} = - send_recv(Config, #iq{type = get, - sub_els = [#disco_items{ - node = ?NS_FLEX_OFFLINE}]}), - %% Check if headers are correct - Nodes = lists:sort( - lists:map( - fun(#disco_item{jid = J, name = P, node = N}) - when (J == MyBareJID) and (P == Peer_s) -> - N - end, DiscoItems)), - %% Since headers are received we can send initial presence without a risk - %% of getting offline messages flood - #presence{from = MyJID} = send_recv(Config, #presence{}), - %% Check full fetch - #iq{type = result, sub_els = []} = - send_recv(Config, #iq{type = get, sub_els = [#offline{fetch = true}]}), - lists:foreach( - fun({I, N}) -> - Text = integer_to_binary(I), - #message{body = Body, sub_els = SubEls} = recv_message(Config), - [#text{data = Text}] = Body, - #offline{items = [#offline_item{node = N}]} = - lists:keyfind(offline, 1, SubEls), - #delay{} = lists:keyfind(delay, 1, SubEls) - end, lists:zip(lists:seq(1, 5), Nodes)), - %% Fetch 2nd and 4th message - #iq{type = result, sub_els = []} = - send_recv( - Config, - #iq{type = get, - sub_els = [#offline{ - items = [#offline_item{ - action = view, - node = lists:nth(2, Nodes)}, - #offline_item{ - action = view, - node = lists:nth(4, Nodes)}]}]}), - lists:foreach( - fun({I, N}) -> - Text = integer_to_binary(I), - #message{body = [#text{data = Text}], - sub_els = SubEls} = recv_message(Config), - #offline{items = [#offline_item{node = N}]} = - lists:keyfind(offline, 1, SubEls) - end, lists:zip([2, 4], [lists:nth(2, Nodes), lists:nth(4, Nodes)])), - %% Delete 2nd and 4th message - #iq{type = result, sub_els = []} = - send_recv( - Config, - #iq{type = set, - sub_els = [#offline{ - items = [#offline_item{ - action = remove, - node = lists:nth(2, Nodes)}, - #offline_item{ - action = remove, - node = lists:nth(4, Nodes)}]}]}), - %% Check if messages were deleted - #iq{type = result, - sub_els = [#disco_items{ - node = ?NS_FLEX_OFFLINE, - items = RemainedItems}]} = - send_recv(Config, #iq{type = get, - sub_els = [#disco_items{ - node = ?NS_FLEX_OFFLINE}]}), - RemainedNodes = [lists:nth(1, Nodes), - lists:nth(3, Nodes), - lists:nth(5, Nodes)], - RemainedNodes = lists:sort( - lists:map( - fun(#disco_item{node = N}) -> N end, - RemainedItems)), - %% Purge everything left - #iq{type = result, sub_els = []} = - send_recv(Config, #iq{type = set, sub_els = [#offline{purge = true}]}), - %% Check if there is no offline messages - #iq{type = result, - sub_els = [#disco_items{node = ?NS_FLEX_OFFLINE, items = []}]} = - send_recv(Config, #iq{type = get, - sub_els = [#disco_items{ - node = ?NS_FLEX_OFFLINE}]}), - disconnect(Config). - -offline_master(Config) -> - Peer = ?config(slave, Config), - LPeer = jid:remove_resource(Peer), - send(Config, #message{to = LPeer, - body = [#text{data = <<"body">>}], - subject = [#text{data = <<"subject">>}]}), - disconnect(Config). - -offline_slave(Config) -> - Peer = ?config(master, Config), - #presence{} = send_recv(Config, #presence{}), - #message{sub_els = SubEls, - from = Peer, - body = [#text{data = <<"body">>}], - subject = [#text{data = <<"subject">>}]} = - recv_message(Config), - true = lists:keymember(delay, 1, SubEls), - disconnect(Config). - carbons_master(Config) -> MyJID = my_jid(Config), MyBareJID = jid:remove_resource(MyJID), diff --git a/test/ejabberd_SUITE_data/ejabberd.yml b/test/ejabberd_SUITE_data/ejabberd.yml index 3a6d4947f..29243d683 100644 --- a/test/ejabberd_SUITE_data/ejabberd.yml +++ b/test/ejabberd_SUITE_data/ejabberd.yml @@ -388,8 +388,7 @@ access: local: local: allow max_user_offline_messages: - admin: 5000 - all: 100 + all: infinity max_user_sessions: all: 10 muc: @@ -459,4 +458,4 @@ s2s_use_starttls: false s2s_cafile: CAFILE shaper: fast: 50000 - normal: 1000 + normal: 10000 diff --git a/test/offline_tests.erl b/test/offline_tests.erl new file mode 100644 index 000000000..ea34544e3 --- /dev/null +++ b/test/offline_tests.erl @@ -0,0 +1,406 @@ +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov +%%% @copyright (C) 2016, Evgeny Khramtsov +%%% @doc +%%% +%%% @end +%%% Created : 7 Nov 2016 by Evgeny Khramtsov +%%%------------------------------------------------------------------- +-module(offline_tests). + +%% API +-compile(export_all). +-import(suite, [send/2, disconnect/1, my_jid/1, send_recv/2, recv_message/1, + get_features/1, recv/1, get_event/1, server_jid/1, + wait_for_master/1, wait_for_slave/1]). +-include("suite.hrl"). + +%%%=================================================================== +%%% API +%%%=================================================================== +single_cases() -> + {offline_single, [sequence], + [single_test(feature_enabled), + single_test(check_identity), + single_test(send_non_existent), + single_test(view_non_existent), + single_test(remove_non_existent), + single_test(view_non_integer), + single_test(remove_non_integer), + single_test(malformed_iq), + single_test(wrong_user), + single_test(unsupported_iq)]}. + +feature_enabled(Config) -> + Features = get_features(Config), + ct:comment("Checking if offline features are set"), + true = lists:member(?NS_FEATURE_MSGOFFLINE, Features), + true = lists:member(?NS_FLEX_OFFLINE, Features), + disconnect(Config). + +check_identity(Config) -> + #iq{type = result, + sub_els = [#disco_info{ + node = ?NS_FLEX_OFFLINE, + identities = Ids}]} = + send_recv(Config, #iq{type = get, + sub_els = [#disco_info{ + node = ?NS_FLEX_OFFLINE}]}), + true = lists:any( + fun(#identity{category = <<"automation">>, + type = <<"message-list">>}) -> true; + (_) -> false + end, Ids), + disconnect(Config). + +send_non_existent(Config) -> + Server = ?config(server, Config), + To = jid:make(<<"non-existent">>, Server), + #message{type = error} = Err = send_recv(Config, #message{to = To}), + #stanza_error{reason = 'service-unavailable'} = xmpp:get_error(Err), + disconnect(Config). + +view_non_existent(Config) -> + #stanza_error{reason = 'item-not-found'} = view(Config, [randoms:get_string()], false), + disconnect(Config). + +remove_non_existent(Config) -> + ok = remove(Config, [randoms:get_string()]), + disconnect(Config). + +view_non_integer(Config) -> + #stanza_error{reason = 'item-not-found'} = view(Config, [<<"foo">>], false), + disconnect(Config). + +remove_non_integer(Config) -> + #stanza_error{reason = 'item-not-found'} = remove(Config, [<<"foo">>]), + disconnect(Config). + +malformed_iq(Config) -> + Item = #offline_item{node = randoms:get_string()}, + Range = [{Type, SubEl} || Type <- [set, get], + SubEl <- [#offline{items = [], _ = false}, + #offline{items = [Item], _ = true}]] + ++ [{set, #offline{items = [], fetch = true, purge = false}}, + {set, #offline{items = [Item], fetch = true, purge = false}}, + {get, #offline{items = [], fetch = false, purge = true}}, + {get, #offline{items = [Item], fetch = false, purge = true}}], + lists:foreach( + fun({Type, SubEl}) -> + #iq{type = error} = Err = + send_recv(Config, #iq{type = Type, sub_els = [SubEl]}), + #stanza_error{reason = 'bad-request'} = xmpp:get_error(Err) + end, Range), + disconnect(Config). + +wrong_user(Config) -> + Server = ?config(server, Config), + To = jid:make(<<"foo">>, Server), + Item = #offline_item{node = randoms:get_string()}, + Range = [{Type, Items, Purge, Fetch} || + Type <- [set, get], + Items <- [[], [Item]], + Purge <- [false, true], + Fetch <- [false, true]], + lists:foreach( + fun({Type, Items, Purge, Fetch}) -> + #iq{type = error} = Err = + send_recv(Config, #iq{type = Type, to = To, + sub_els = [#offline{items = Items, + purge = Purge, + fetch = Fetch}]}), + #stanza_error{reason = 'forbidden'} = xmpp:get_error(Err) + end, Range), + disconnect(Config). + +unsupported_iq(Config) -> + Item = #offline_item{node = randoms:get_string()}, + lists:foreach( + fun(Type) -> + #iq{type = error} = Err = + send_recv(Config, #iq{type = Type, sub_els = [Item]}), + #stanza_error{reason = 'service-unavailable'} = xmpp:get_error(Err) + end, [set, get]), + disconnect(Config). + +%%%=================================================================== +%%% Master-slave tests +%%%=================================================================== +master_slave_cases() -> + {offline_master_slave, [sequence], + [master_slave_test(flex), + master_slave_test(send_all)]}. + +flex_master(Config) -> + send_messages(Config, 5), + disconnect(Config). + +flex_slave(Config) -> + wait_for_master(Config), + peer_down = get_event(Config), + 5 = get_number(Config), + Nodes = get_nodes(Config), + %% Since headers are received we can send initial presence without a risk + %% of getting offline messages flood + #presence{} = send_recv(Config, #presence{}), + ct:comment("Checking fetch"), + Nodes = fetch(Config, lists:seq(1, 5)), + ct:comment("Fetching 2nd and 4th message"), + [2, 4] = view(Config, [lists:nth(2, Nodes), lists:nth(4, Nodes)]), + ct:comment("Deleting 2nd and 4th message"), + ok = remove(Config, [lists:nth(2, Nodes), lists:nth(4, Nodes)]), + ct:comment("Checking if messages were deleted"), + [1, 3, 5] = view(Config, [lists:nth(1, Nodes), + lists:nth(3, Nodes), + lists:nth(5, Nodes)]), + ct:comment("Purging everything left"), + ok = purge(Config), + ct:comment("Checking if there are no offline messages"), + 0 = get_number(Config), + clean(disconnect(Config)). + +send_all_master(Config) -> + wait_for_slave(Config), + Peer = ?config(peer, Config), + BarePeer = jid:remove_resource(Peer), + {Deliver, Errors} = message_iterator(Config), + N = lists:foldl( + fun(#message{type = error} = Msg, Acc) -> + send(Config, Msg#message{to = BarePeer}), + Acc; + (Msg, Acc) -> + I = send(Config, Msg#message{to = BarePeer}), + case xmpp:get_subtag(Msg, #xevent{}) of + #xevent{offline = true, id = undefined} -> + ct:comment("Receiving event-reply for:~n~s", + [xmpp:pp(Msg)]), + #message{} = Reply = recv_message(Config), + #xevent{id = I} = xmpp:get_subtag(Reply, #xevent{}); + _ -> + ok + end, + Acc + 1 + end, 0, Deliver), + lists:foreach( + fun(Msg) -> + #message{type = error} = Err = + send_recv(Config, Msg#message{to = BarePeer}), + #stanza_error{reason = 'service-unavailable'} = xmpp:get_error(Err) + end, Errors), + ok = wait_for_complete(Config, N), + disconnect(Config). + +send_all_slave(Config) -> + ServerJID = server_jid(Config), + Peer = ?config(peer, Config), + wait_for_master(Config), + peer_down = get_event(Config), + #presence{} = send_recv(Config, #presence{}), + {Deliver, _Errors} = message_iterator(Config), + lists:foreach( + fun(#message{type = error}) -> + ok; + (#message{type = Type, body = Body, subject = Subject} = Msg) -> + ct:comment("Receiving message:~n~s", [xmpp:pp(Msg)]), + #message{from = Peer, + type = Type, + body = Body, + subject = Subject} = RecvMsg = recv_message(Config), + ct:comment("Checking if delay tag is correctly set"), + #delay{from = ServerJID} = xmpp:get_subtag(RecvMsg, #delay{}) + end, Deliver), + disconnect(Config). + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +single_test(T) -> + list_to_atom("offline_" ++ atom_to_list(T)). + +master_slave_test(T) -> + {list_to_atom("offline_" ++ atom_to_list(T)), [parallel], + [list_to_atom("offline_" ++ atom_to_list(T) ++ "_master"), + list_to_atom("offline_" ++ atom_to_list(T) ++ "_slave")]}. + +clean(Config) -> + {U, S, _} = jid:tolower(my_jid(Config)), + mod_offline:remove_user(U, S), + Config. + +send_messages(Config, Num) -> + send_messages(Config, Num, normal, []). + +send_messages(Config, Num, Type, SubEls) -> + wait_for_slave(Config), + Peer = ?config(peer, Config), + BarePeer = jid:remove_resource(Peer), + lists:foreach( + fun(I) -> + Body = integer_to_binary(I), + send(Config, + #message{to = BarePeer, + type = Type, + body = [#text{data = Body}], + subject = [#text{data = <<"subject">>}], + sub_els = SubEls}) + end, lists:seq(1, Num)), + ct:comment("Waiting for all messages to be delivered to offline spool"), + ok = wait_for_complete(Config, Num). + +recv_messages(Config, Num) -> + wait_for_master(Config), + peer_down = get_event(Config), + Peer = ?config(peer, Config), + #presence{} = send_recv(Config, #presence{}), + lists:foreach( + fun(I) -> + Text = integer_to_binary(I), + #message{sub_els = SubEls, + from = Peer, + body = [#text{data = Text}], + subject = [#text{data = <<"subject">>}]} = + recv_message(Config), + true = lists:keymember(delay, 1, SubEls) + end, lists:seq(1, Num)), + clean(disconnect(Config)). + +get_number(Config) -> + ct:comment("Getting offline message number"), + #iq{type = result, + sub_els = [#disco_info{ + node = ?NS_FLEX_OFFLINE, + xdata = [X]}]} = + send_recv(Config, #iq{type = get, + sub_els = [#disco_info{ + node = ?NS_FLEX_OFFLINE}]}), + Form = flex_offline:decode(X#xdata.fields), + proplists:get_value(number_of_messages, Form). + +get_nodes(Config) -> + MyJID = my_jid(Config), + MyBareJID = jid:remove_resource(MyJID), + Peer = ?config(peer, Config), + Peer_s = jid:to_string(Peer), + ct:comment("Getting headers"), + #iq{type = result, + sub_els = [#disco_items{ + node = ?NS_FLEX_OFFLINE, + items = DiscoItems}]} = + send_recv(Config, #iq{type = get, + sub_els = [#disco_items{ + node = ?NS_FLEX_OFFLINE}]}), + ct:comment("Checking if headers are correct"), + lists:sort( + lists:map( + fun(#disco_item{jid = J, name = P, node = N}) + when (J == MyBareJID) and (P == Peer_s) -> + N + end, DiscoItems)). + +fetch(Config, Range) -> + ID = send(Config, #iq{type = get, sub_els = [#offline{fetch = true}]}), + Nodes = lists:map( + fun(I) -> + Text = integer_to_binary(I), + #message{body = Body, sub_els = SubEls} = recv(Config), + [#text{data = Text}] = Body, + #offline{items = [#offline_item{node = Node}]} = + lists:keyfind(offline, 1, SubEls), + #delay{} = lists:keyfind(delay, 1, SubEls), + Node + end, Range), + #iq{id = ID, type = result, sub_els = []} = recv(Config), + Nodes. + +view(Config, Nodes) -> + view(Config, Nodes, true). + +view(Config, Nodes, NeedReceive) -> + Items = lists:map( + fun(Node) -> + #offline_item{action = view, node = Node} + end, Nodes), + I = send(Config, + #iq{type = get, sub_els = [#offline{items = Items}]}), + Range = if NeedReceive -> + lists:map( + fun(Node) -> + #message{body = [#text{data = Text}], + sub_els = SubEls} = recv(Config), + #offline{items = [#offline_item{node = Node}]} = + lists:keyfind(offline, 1, SubEls), + binary_to_integer(Text) + end, Nodes); + true -> + [] + end, + case recv(Config) of + #iq{id = I, type = result, sub_els = []} -> Range; + #iq{id = I, type = error} = Err -> xmpp:get_error(Err) + end. + +remove(Config, Nodes) -> + Items = lists:map( + fun(Node) -> + #offline_item{action = remove, node = Node} + end, Nodes), + case send_recv(Config, #iq{type = set, + sub_els = [#offline{items = Items}]}) of + #iq{type = result, sub_els = []} -> + ok; + #iq{type = error} = Err -> + xmpp:get_error(Err) + end. + +purge(Config) -> + case send_recv(Config, #iq{type = set, + sub_els = [#offline{purge = true}]}) of + #iq{type = result, sub_els = []} -> + ok; + #iq{type = error} = Err -> + xmpp:get_error(Err) + end. + +wait_for_complete(_Config, 0) -> + ok; +wait_for_complete(Config, N) -> + {U, S, _} = jid:tolower(?config(peer, Config)), + lists:foldl( + fun(_Time, ok) -> + ok; + (Time, Acc) -> + timer:sleep(Time), + case mod_offline:count_offline_messages(U, S) of + N -> ok; + _ -> Acc + end + end, error, [0, 100, 200, 2000, 5000, 10000]). + +message_iterator(Config) -> + ServerJID = server_jid(Config), + ChatStates = [[#chatstate{type = composing}]], + Offline = [[#offline{}]], + Hints = [[#hint{type = T}] || T <- [store, 'no-store']], + XEvent = [[#xevent{id = ID, offline = OfflineFlag}] + || ID <- [undefined, randoms:get_string()], + OfflineFlag <- [false, true]], + Delay = [[#delay{stamp = p1_time_compat:timestamp(), from = ServerJID}]], + AllEls = [Els1 ++ Els2 || Els1 <- [[]] ++ ChatStates ++ Delay ++ Hints ++ Offline, + Els2 <- [[]] ++ XEvent], + All = [#message{type = Type, body = Body, subject = Subject, sub_els = Els} + || %%Type <- [chat], + Type <- [error, chat, normal, groupchat, headline], + Body <- [[], xmpp:mk_text(<<"body">>)], + Subject <- [[], xmpp:mk_text(<<"subject">>)], + Els <- AllEls], + lists:partition( + fun(#message{type = error}) -> true; + (#message{sub_els = [#offline{}|_]}) -> false; + (#message{sub_els = [_, #xevent{id = I}]}) when I /= undefined -> false; + (#message{sub_els = [#xevent{id = I}]}) when I /= undefined -> false; + (#message{sub_els = [#hint{type = store}|_]}) -> true; + (#message{sub_els = [#hint{type = 'no-store'}|_]}) -> false; + (#message{body = [], subject = []}) -> false; + (#message{type = Type}) -> (Type == chat) or (Type == normal); + (_) -> false + end, All). diff --git a/test/suite.erl b/test/suite.erl index f88ac5a5e..52c030df1 100644 --- a/test/suite.erl +++ b/test/suite.erl @@ -199,27 +199,31 @@ init_stream(Config) -> component -> ?NS_COMPONENT; server -> ?NS_SERVER end, - #stream_start{id = ID, xmlns = XMLNS, version = Version} = recv(Config), - set_opt(stream_id, ID, NewConfig). + receive + #stream_start{id = ID, xmlns = XMLNS, version = Version} -> + set_opt(stream_id, ID, NewConfig) + end. process_stream_features(Config) -> - #stream_features{sub_els = Fs} = recv(Config), - Mechs = lists:flatmap( - fun(#sasl_mechanisms{list = Ms}) -> - Ms; - (_) -> - [] - end, Fs), - lists:foldl( - fun(#feature_register{}, Acc) -> - set_opt(register, true, Acc); - (#starttls{}, Acc) -> - set_opt(starttls, true, Acc); - (#compression{methods = Ms}, Acc) -> - set_opt(compression, Ms, Acc); - (_, Acc) -> - Acc - end, set_opt(mechs, Mechs, Config), Fs). + receive + #stream_features{sub_els = Fs} -> + Mechs = lists:flatmap( + fun(#sasl_mechanisms{list = Ms}) -> + Ms; + (_) -> + [] + end, Fs), + lists:foldl( + fun(#feature_register{}, Acc) -> + set_opt(register, true, Acc); + (#starttls{}, Acc) -> + set_opt(starttls, true, Acc); + (#compression{methods = Ms}, Acc) -> + set_opt(compression, Ms, Acc); + (_, Acc) -> + Acc + end, set_opt(mechs, Mechs, Config), Fs) + end. disconnect(Config) -> ct:comment("Disconnecting"), @@ -245,7 +249,7 @@ starttls(Config) -> starttls(Config, ShouldFail) -> send(Config, #starttls{}), - case recv(Config) of + receive #starttls_proceed{} when ShouldFail -> ct:fail(starttls_should_have_failed); #starttls_failure{} when ShouldFail -> @@ -262,7 +266,7 @@ starttls(Config, ShouldFail) -> zlib(Config) -> send(Config, #compress{methods = [<<"zlib">>]}), - #compressed{} = recv(Config), + receive #compressed{} -> ok end, ZlibSocket = ejabberd_socket:compress(?config(socket, Config)), process_stream_features(init_stream(set_opt(socket, ZlibSocket, Config))). @@ -376,7 +380,7 @@ auth_component(Config, ShouldFail) -> Password = ?config(password, Config), Digest = p1_sha:sha(<>), send(Config, #handshake{data = Digest}), - case recv(Config) of + receive #handshake{} when ShouldFail -> ct:fail(component_auth_should_have_failed); #handshake{} -> @@ -399,7 +403,7 @@ auth_SASL(Mech, Config, ShouldFail) -> wait_auth_SASL_result(set_opt(sasl, SASL, Config), ShouldFail). wait_auth_SASL_result(Config, ShouldFail) -> - case recv(Config) of + receive #sasl_success{} when ShouldFail -> ct:fail(sasl_auth_should_have_failed); #sasl_success{} -> @@ -409,24 +413,25 @@ wait_auth_SASL_result(Config, ShouldFail) -> NS = if Type == client -> ?NS_CLIENT; Type == server -> ?NS_SERVER end, - #stream_start{xmlns = NS, version = {1,0}} = recv(Config), - #stream_features{sub_els = Fs} = recv(Config), - if Type == client -> - #xmpp_session{optional = true} = - lists:keyfind(xmpp_session, 1, Fs); - true -> - ok - end, - lists:foldl( - fun(#feature_sm{}, ConfigAcc) -> - set_opt(sm, true, ConfigAcc); - (#feature_csi{}, ConfigAcc) -> - set_opt(csi, true, ConfigAcc); - (#rosterver_feature{}, ConfigAcc) -> - set_opt(rosterver, true, ConfigAcc); - (_, ConfigAcc) -> - ConfigAcc - end, Config, Fs); + receive #stream_start{xmlns = NS, version = {1,0}} -> ok end, + receive #stream_features{sub_els = Fs} -> + if Type == client -> + #xmpp_session{optional = true} = + lists:keyfind(xmpp_session, 1, Fs); + true -> + ok + end, + lists:foldl( + fun(#feature_sm{}, ConfigAcc) -> + set_opt(sm, true, ConfigAcc); + (#feature_csi{}, ConfigAcc) -> + set_opt(csi, true, ConfigAcc); + (#rosterver_feature{}, ConfigAcc) -> + set_opt(rosterver, true, ConfigAcc); + (_, ConfigAcc) -> + ConfigAcc + end, Config, Fs) + end; #sasl_challenge{text = ClientIn} -> {Response, SASL} = (?config(sasl, Config))(ClientIn), send(Config, #sasl_response{text = Response}),