diff --git a/src/mod_mam.erl b/src/mod_mam.erl index 5e20184fa..73a00180e 100644 --- a/src/mod_mam.erl +++ b/src/mod_mam.erl @@ -42,7 +42,7 @@ get_room_config/4, set_room_option/3, offline_message/1, export/1, mod_options/1, remove_mam_for_user_with_peer/3, remove_mam_for_user/2, is_empty_for_user/2, is_empty_for_room/3, check_create_room/4, - process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2]). + process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2, select/6]). -include("xmpp.hrl"). -include("logger.hrl"). @@ -112,7 +112,7 @@ start(Host, Opts) -> ejabberd_hooks:add(user_send_packet, Host, ?MODULE, user_send_packet_strip_tag, 500), ejabberd_hooks:add(offline_message_hook, Host, ?MODULE, - offline_message, 50), + offline_message, 49), ejabberd_hooks:add(muc_filter_message, Host, ?MODULE, muc_filter_message, 50), ejabberd_hooks:add(muc_process_iq, Host, ?MODULE, @@ -188,7 +188,7 @@ stop(Host) -> ejabberd_hooks:delete(user_send_packet, Host, ?MODULE, user_send_packet_strip_tag, 500), ejabberd_hooks:delete(offline_message_hook, Host, ?MODULE, - offline_message, 50), + offline_message, 49), ejabberd_hooks:delete(muc_filter_message, Host, ?MODULE, muc_filter_message, 50), ejabberd_hooks:delete(muc_process_iq, Host, ?MODULE, diff --git a/src/mod_offline.erl b/src/mod_offline.erl index 4a1a4cca2..6a9114a92 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -61,7 +61,8 @@ c2s_copy_session/2, webadmin_page/3, webadmin_user/4, - webadmin_user_parse_query/5]). + webadmin_user_parse_query/5, + user_unset_presence/4]). -export([mod_opt_type/1, mod_options/1, depends/2]). @@ -131,6 +132,8 @@ start(Host, Opts) -> ?MODULE, webadmin_user, 50), ejabberd_hooks:add(webadmin_user_parse_query, Host, ?MODULE, webadmin_user_parse_query, 50), + ejabberd_hooks:add(unset_presence_hook, Host, ?MODULE, + user_unset_presence, 50), gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE, ?MODULE, handle_offline_query). @@ -153,6 +156,8 @@ stop(Host) -> ?MODULE, webadmin_user, 50), ejabberd_hooks:delete(webadmin_user_parse_query, Host, ?MODULE, webadmin_user_parse_query, 50), + ejabberd_hooks:delete(unset_presence_hook, Host, ?MODULE, + user_unset_presence, 50), gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE). reload(Host, NewOpts, OldOpts) -> @@ -165,17 +170,28 @@ reload(Host, NewOpts, OldOpts) -> end. -spec store_offline_msg(#offline_msg{}) -> ok | {error, full | any()}. -store_offline_msg(#offline_msg{us = {User, Server}} = Msg) -> - Mod = gen_mod:db_mod(Server, ?MODULE), - case get_max_user_messages(User, Server) of - infinity -> - Mod:store_message(Msg); - Limit -> - Num = count_offline_messages(User, Server), - if Num < Limit -> +store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) -> + case (not xmpp:get_meta(Pkt, activity_marker, false)) andalso + use_mam_for_user(User, Server) of + true -> + case xmpp:get_meta(Pkt, first_from_queue, false) of + true -> + store_last_activity_marker(User, Server, xmpp:get_meta(Pkt, stanza_id)); + _ -> + ok + end; + _ -> + Mod = gen_mod:db_mod(Server, ?MODULE), + case get_max_user_messages(User, Server) of + infinity -> Mod:store_message(Msg); - true -> - {error, full} + Limit -> + Num = count_offline_messages(User, Server), + if Num < Limit -> + Mod:store_message(Msg); + true -> + {error, full} + end end end. @@ -298,34 +314,44 @@ handle_offline_query(#iq{lang = Lang} = IQ) -> -spec handle_offline_items_view(jid(), [offline_item()]) -> boolean(). handle_offline_items_view(JID, Items) -> {U, S, R} = jid:tolower(JID), - lists:foldl( - fun(#offline_item{node = Node, action = view}, Acc) -> - case fetch_msg_by_node(JID, Node) of - {ok, OfflineMsg} -> - case offline_msg_to_route(S, OfflineMsg) of - {route, El} -> - NewEl = set_offline_tag(El, Node), - case ejabberd_sm:get_session_pid(U, S, R) of - Pid when is_pid(Pid) -> - Pid ! {route, NewEl}; - none -> - ok - end, - Acc or true; - error -> - Acc or false - end; - error -> - Acc or false - end - end, false, Items). + case use_mam_for_user(U, S) of + true -> + false; + _ -> + lists:foldl( + fun(#offline_item{node = Node, action = view}, Acc) -> + case fetch_msg_by_node(JID, Node) of + {ok, OfflineMsg} -> + case offline_msg_to_route(S, OfflineMsg) of + {route, El} -> + NewEl = set_offline_tag(El, Node), + case ejabberd_sm:get_session_pid(U, S, R) of + Pid when is_pid(Pid) -> + Pid ! {route, NewEl}; + none -> + ok + end, + Acc or true; + error -> + Acc or false + end; + error -> + Acc or false + end + end, false, Items) end. -spec handle_offline_items_remove(jid(), [offline_item()]) -> boolean(). handle_offline_items_remove(JID, Items) -> - lists:foldl( - fun(#offline_item{node = Node, action = remove}, Acc) -> - Acc or remove_msg_by_node(JID, Node) - end, false, Items). + {U, S, _R} = jid:tolower(JID), + case use_mam_for_user(U, S) of + true -> + false; + _ -> + lists:foldl( + fun(#offline_item{node = Node, action = remove}, Acc) -> + Acc or remove_msg_by_node(JID, Node) + end, false, Items) + end. -spec set_offline_tag(message(), binary()) -> message(). set_offline_tag(Msg, Node) -> @@ -334,11 +360,11 @@ set_offline_tag(Msg, Node) -> -spec handle_offline_fetch(jid()) -> ok. handle_offline_fetch(#jid{luser = U, lserver = S} = JID) -> ejabberd_sm:route(JID, {resend_offline, false}), - lists:foreach( - fun({Node, El}) -> - El1 = set_offline_tag(El, Node), - ejabberd_router:route(El1) - end, read_messages(U, S)). + lists:foreach( + fun({Node, El}) -> + El1 = set_offline_tag(El, Node), + ejabberd_router:route(El1) + end, read_messages(U, S)). -spec fetch_msg_by_node(jid(), binary()) -> error | {ok, #offline_msg{}}. fetch_msg_by_node(To, Seq) -> @@ -508,15 +534,26 @@ c2s_self_presence(Acc) -> -spec route_offline_messages(c2s_state()) -> ok. route_offline_messages(#{jid := #jid{luser = LUser, lserver = LServer}} = State) -> Mod = gen_mod:db_mod(LServer, ?MODULE), - case Mod:pop_messages(LUser, LServer) of - {ok, OffMsgs} -> - lists:foreach( - fun(OffMsg) -> - route_offline_message(State, OffMsg) - end, OffMsgs); - _ -> - ok - end. + Msgs = case Mod:pop_messages(LUser, LServer) of + {ok, OffMsgs} -> + case use_mam_for_user(LUser, LServer) of + true -> + lists:map( + fun({_, #message{from = From, to = To} = Msg}) -> + #offline_msg{from = From, to = To, + us = {LUser, LServer}, + packet = Msg} + end, read_mam_messages(LUser, LServer, OffMsgs)); + _ -> + OffMsgs + end; + _ -> + [] + end, + lists:foreach( + fun(OffMsg) -> + route_offline_message(State, OffMsg) + end, Msgs). -spec route_offline_message(c2s_state(), #offline_msg{}) -> ok. route_offline_message(#{lserver := LServer} = State, @@ -574,6 +611,31 @@ remove_user(User, Server) -> Mod:remove_user(LUser, LServer), ok. +-spec user_unset_presence(binary(), binary(), binary(), binary()) -> any(). +user_unset_presence(User, Server, _Resource, _Status) -> + case use_mam_for_user(User, Server) of + true -> + case ejabberd_sm:get_user_present_resources(User, Server) of + [] -> + TimeStamp = erlang:system_time(microsecond), + store_last_activity_marker(User, Server, TimeStamp); + _ -> + ok + end; + _ -> + ok + end. + +store_last_activity_marker(User, Server, Timestamp) -> + Jid = jid:make(User, Server, <<>>), + Pkt = xmpp:put_meta(#message{id = <<"ActivityMarker">>, type = error}, + activity_marker, true), + + Msg = #offline_msg{us = {User, Server}, from = Jid, to = Jid, + timestamp = misc:usec_to_now(Timestamp), + packet = Pkt}, + store_offline_msg(Msg). + %% Helper functions: -spec check_if_message_should_be_bounced(message()) -> boolean(). @@ -641,25 +703,123 @@ offline_msg_to_route(LServer, #offline_msg{from = From, to = To} = R) -> -spec read_messages(binary(), binary()) -> [{binary(), message()}]. read_messages(LUser, LServer) -> + Res = read_db_messages(LUser, LServer), + case use_mam_for_user(LUser, LServer) of + true -> + read_mam_messages(LUser, LServer, Res); + _ -> + Res + end. + +read_db_messages(LUser, LServer) -> Mod = gen_mod:db_mod(LServer, ?MODULE), CodecOpts = ejabberd_config:codec_options(LServer), lists:flatmap( - fun({Seq, From, To, TS, El}) -> - Node = integer_to_binary(Seq), - try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of - Pkt -> - Node = integer_to_binary(Seq), - Pkt1 = add_delay_info(Pkt, LServer, TS), - Pkt2 = xmpp:set_from_to(Pkt1, From, To), - [{Node, Pkt2}] - catch _:{xmpp_codec, Why} -> - ?ERROR_MSG("failed to decode packet ~p " - "of user ~s: ~s", - [El, jid:encode(To), - xmpp:format_error(Why)]), - [] - end - end, Mod:read_message_headers(LUser, LServer)). + fun({Seq, From, To, TS, El}) -> + Node = integer_to_binary(Seq), + try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of + Pkt -> + Node = integer_to_binary(Seq), + Pkt1 = add_delay_info(Pkt, LServer, TS), + Pkt2 = xmpp:set_from_to(Pkt1, From, To), + [{Node, Pkt2}] + catch _:{xmpp_codec, Why} -> + ?ERROR_MSG("failed to decode packet ~p " + "of user ~s: ~s", + [El, jid:encode(To), + xmpp:format_error(Why)]), + [] + end + end, Mod:read_message_headers(LUser, LServer)). + +read_mam_messages(LUser, LServer, ReadMsgs) -> + {Timestamp, ExtraMsgs} = lists:foldl( + fun({_Node, #message{id = <<"ActivityMarker">>, + body = [], type = error} = Msg}, {T, E}) -> + case xmpp:get_subtag(Msg, #delay{}) of + #delay{stamp = Time} -> + if T == none orelse T > Time -> + {Time, E}; + true -> + {T, E} + end + end; + (#offline_msg{from = From, to = To, timestamp = TS, packet = Pkt}, + {T, E}) -> + try xmpp:decode(Pkt) of + #message{id = <<"ActivityMarker">>, + body = [], type = error} = Msg -> + TS2 = case TS of + undefined -> + case xmpp:get_subtag(Msg, #delay{}) of + #delay{stamp = TS0} -> + TS0; + _ -> + erlang:timestamp() + end + end, + if T == none orelse T > TS2 -> + {TS2, E}; + true -> + {T, E} + end; + Decoded -> + Pkt1 = add_delay_info(Decoded, LServer, TS), + {T, [xmpp:set_from_to(Pkt1, From, To) | E]} + catch _:{xmpp_codec, _Why} -> + {T, E} + end; + ({_Node, Msg}, {T, E}) -> + {T, [Msg | E]} + end, {none, []}, ReadMsgs), + Start = case {Timestamp, ExtraMsgs} of + {none, [First|_]} -> + case xmpp:get_subtag(First, #delay{}) of + #delay{stamp = {Mega, Sec, Micro}} -> + {Mega, Sec, Micro+1}; + _ -> + none + end; + {none, _} -> + none; + _ -> + Timestamp + end, + AllMsgs = case Start of + none -> + ExtraMsgs; + _ -> + MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of + Number when is_integer(Number) -> Number; + _ -> 100 + end, + JID = jid:make(LUser, LServer, <<>>), + {MamMsgs, _, _} = mod_mam:select(LServer, JID, JID, + [{start, Start}], + #rsm_set{max = MaxOfflineMsgs, + before = <<"9999999999999999">>}, + chat), + MamMsgs2 = lists:map( + fun({_, _, #forwarded{sub_els = [MM | _], delay = #delay{stamp = MMT}}}) -> + add_delay_info(MM, LServer, MMT) + end, MamMsgs), + + ExtraMsgs ++ MamMsgs2 + end, + AllMsgs2 = lists:sort( + fun(A, B) -> + case {xmpp:get_subtag(A, #delay{}), xmpp:get_subtag(B, #delay{})} of + {#delay{stamp = TA}, #delay{stamp = TB}} -> + TA < TB; + _ -> + true + end + end, AllMsgs), + {AllMsgs3, _} = lists:mapfoldl( + fun(Msg, Counter) -> + {{Counter, Msg}, Counter + 1} + end, 1, AllMsgs2), + AllMsgs3. format_user_queue(Hdrs) -> lists:map( @@ -873,6 +1033,9 @@ import(LServer, {sql, _}, DBType, <<"spool">>, Mod = gen_mod:db_mod(DBType, ?MODULE), Mod:import(OffMsg). +use_mam_for_user(_User, Server) -> + gen_mod:get_module_opt(Server, ?MODULE, use_mam_for_storage). + mod_opt_type(access_max_user_messages) -> fun acl:shaper_rules_validator/1; mod_opt_type(db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end; @@ -880,6 +1043,8 @@ mod_opt_type(store_groupchat) -> fun(V) when is_boolean(V) -> V end; mod_opt_type(bounce_groupchat) -> fun(V) when is_boolean(V) -> V end; +mod_opt_type(use_mam_for_storage) -> + fun(V) when is_boolean(V) -> V end; mod_opt_type(store_empty_body) -> fun (V) when is_boolean(V) -> V; (unless_chat_state) -> unless_chat_state @@ -889,5 +1054,6 @@ mod_options(Host) -> [{db_type, ejabberd_config:default_db(Host, ?MODULE)}, {access_max_user_messages, max_user_offline_messages}, {store_empty_body, unless_chat_state}, + {use_mam_for_storage, false}, {bounce_groupchat, false}, {store_groupchat, false}]. diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl index 1a4308c58..34ce4e53d 100644 --- a/src/mod_stream_mgmt.erl +++ b/src/mod_stream_mgmt.erl @@ -591,22 +591,25 @@ route_unacked_stanzas(#{mgmt_state := MgmtState, end, ?DEBUG("Re-routing ~B unacknowledged stanza(s) to ~s", [p1_queue:len(Queue), jid:encode(JID)]), - p1_queue:foreach( - fun({_, _Time, #presence{from = From}}) -> - ?DEBUG("Dropping presence stanza from ~s", [jid:encode(From)]); - ({_, _Time, #iq{} = El}) -> + p1_queue:foldl( + fun({_, _Time, #presence{from = From}}, Acc) -> + ?DEBUG("Dropping presence stanza from ~s", [jid:encode(From)]), + Acc; + ({_, _Time, #iq{} = El}, Acc) -> Txt = <<"User session terminated">>, ejabberd_router:route_error( - El, xmpp:err_service_unavailable(Txt, Lang)); - ({_, _Time, #message{from = From, meta = #{carbon_copy := true}}}) -> + El, xmpp:err_service_unavailable(Txt, Lang)), + Acc; + ({_, _Time, #message{from = From, meta = #{carbon_copy := true}}}, Acc) -> %% XEP-0280 says: "When a receiving server attempts to deliver a %% forked message, and that message bounces with an error for %% any reason, the receiving server MUST NOT forward that error %% back to the original sender." Resending such a stanza could %% easily lead to unexpected results as well. ?DEBUG("Dropping forwarded message stanza from ~s", - [jid:encode(From)]); - ({_, Time, #message{} = Msg}) -> + [jid:encode(From)]), + Acc; + ({_, Time, #message{} = Msg}, Acc) -> case ejabberd_hooks:run_fold(message_is_archived, LServer, false, [State, Msg]) of @@ -615,17 +618,26 @@ route_unacked_stanzas(#{mgmt_state := MgmtState, [jid:encode(xmpp:get_from(Msg))]); false when ResendOnTimeout -> NewEl = add_resent_delay_info(State, Msg, Time), - ejabberd_router:route(NewEl); + NewEl2 = case Acc of + first_resend -> + xmpp:put_meta(NewEl, first_from_queue, true); + _ -> + NewEl + end, + ejabberd_router:route(NewEl2), + false; false -> Txt = <<"User session terminated">>, ejabberd_router:route_error( - Msg, xmpp:err_service_unavailable(Txt, Lang)) + Msg, xmpp:err_service_unavailable(Txt, Lang)), + Acc end; - ({_, _Time, El}) -> + ({_, _Time, El}, Acc) -> %% Raw element of type 'error' resulting from a validation error %% We cannot pass it to the router, it will generate an error - ?DEBUG("Do not route raw element from ack queue: ~p", [El]) - end, Queue); + ?DEBUG("Do not route raw element from ack queue: ~p", [El]), + Acc + end, first_resend, Queue); route_unacked_stanzas(_State) -> ok.