diff --git a/src/mod_mam.erl b/src/mod_mam.erl index aed43c21e..24803f9e1 100644 --- a/src/mod_mam.erl +++ b/src/mod_mam.erl @@ -33,12 +33,12 @@ %% API -export([start/2, stop/1, reload/3, depends/2]). --export([user_send_packet/1, user_send_packet_strip_tag/1, sm_receive_packet/1, - process_iq_v0_2/1, process_iq_v0_3/1, disco_sm_features/5, - remove_user/2, remove_room/3, mod_opt_type/1, muc_process_iq/2, - muc_filter_message/3, message_is_archived/3, delete_old_messages/2, - get_commands_spec/0, msg_to_el/4, get_room_config/4, set_room_option/3, - offline_message/1, export/1]). +-export([sm_receive_packet/1, user_receive_packet/1, user_send_packet/1, + user_send_packet_strip_tag/1, process_iq_v0_2/1, process_iq_v0_3/1, + disco_sm_features/5, remove_user/2, remove_room/3, mod_opt_type/1, + muc_process_iq/2, muc_filter_message/3, message_is_archived/3, + delete_old_messages/2, get_commands_spec/0, msg_to_el/4, + get_room_config/4, set_room_option/3, offline_message/1, export/1]). -include("xmpp.hrl"). -include("logger.hrl"). @@ -59,7 +59,7 @@ all | chat | groupchat) -> any(). -callback extended_fields() -> [mam_query:property() | #xdata_field{}]. -callback store(xmlel(), binary(), {binary(), binary()}, chat | groupchat, - jid(), binary(), recv | send) -> {ok, binary()} | any(). + jid(), binary(), recv | send, integer()) -> ok | any(). -callback write_prefs(binary(), binary(), #archive_prefs{}, binary()) -> ok | any(). -callback get_prefs(binary(), binary()) -> {ok, #archive_prefs{}} | error. -callback select(binary(), jid(), jid(), mam_query:result(), @@ -80,6 +80,8 @@ start(Host, Opts) -> register_iq_handlers(Host, IQDisc), ejabberd_hooks:add(sm_receive_packet, Host, ?MODULE, sm_receive_packet, 50), + ejabberd_hooks:add(user_receive_packet, Host, ?MODULE, + user_receive_packet, 88), ejabberd_hooks:add(user_send_packet, Host, ?MODULE, user_send_packet, 88), ejabberd_hooks:add(user_send_packet, Host, ?MODULE, @@ -143,6 +145,8 @@ stop(Host) -> unregister_iq_handlers(Host), ejabberd_hooks:delete(sm_receive_packet, Host, ?MODULE, sm_receive_packet, 50), + ejabberd_hooks:delete(user_receive_packet, Host, ?MODULE, + user_receive_packet, 88), ejabberd_hooks:delete(user_send_packet, Host, ?MODULE, user_send_packet, 88), ejabberd_hooks:delete(user_send_packet, Host, ?MODULE, @@ -267,23 +271,28 @@ set_room_option(Acc, _Property, _Lang) -> Acc. -spec sm_receive_packet(stanza()) -> stanza(). -sm_receive_packet(#message{from = Peer, to = JID} = Pkt) -> +sm_receive_packet(#message{to = #jid{lserver = LServer}} = Pkt) -> + init_stanza_id(Pkt, LServer); +sm_receive_packet(Acc) -> + Acc. + +-spec user_receive_packet({stanza(), c2s_state()}) -> {stanza(), c2s_state()}. +user_receive_packet({#message{from = Peer} = Pkt, #{jid := JID} = C2SState}) -> LUser = JID#jid.luser, LServer = JID#jid.lserver, - Pkt1 = strip_my_archived_tag(Pkt, LServer), - case should_archive(Pkt1, LServer) of - true -> - case store_msg(Pkt1, LUser, LServer, Peer, recv) of - {ok, ID} -> - xmpp:put_meta(set_stanza_id(Pkt1, JID, ID), - mam_archived, true); - _ -> - Pkt1 - end; - _ -> - Pkt1 - end; -sm_receive_packet(Acc) -> + Pkt1 = case should_archive(Pkt, LServer) of + true -> + case store_msg(Pkt, LUser, LServer, Peer, recv) of + ok -> + mark_stored_msg(Pkt, JID); + _ -> + Pkt + end; + _ -> + Pkt + end, + {Pkt1, C2SState}; +user_receive_packet(Acc) -> Acc. -spec user_send_packet({stanza(), c2s_state()}) @@ -291,14 +300,13 @@ sm_receive_packet(Acc) -> user_send_packet({#message{to = Peer} = Pkt, #{jid := JID} = C2SState}) -> LUser = JID#jid.luser, LServer = JID#jid.lserver, - Pkt1 = strip_my_archived_tag(Pkt, LServer), + Pkt1 = init_stanza_id(Pkt, LServer), Pkt2 = case should_archive(Pkt1, LServer) of true -> case store_msg(xmpp:set_from_to(Pkt1, JID, Peer), LUser, LServer, Peer, send) of - {ok, ID} -> - xmpp:put_meta(set_stanza_id(Pkt1, JID, ID), - mam_archived, true); + ok -> + mark_stored_msg(Pkt1, JID); _ -> Pkt1 end; @@ -318,10 +326,20 @@ user_send_packet_strip_tag(Acc) -> Acc. -spec offline_message({any(), message()}) -> {any(), message()}. -offline_message({_Action, #message{meta = #{mam_archived := true}} = Pkt}) -> - {archived, Pkt}; -offline_message(Acc) -> - Acc. +offline_message({_Action, #message{from = Peer, to = To} = Pkt} = Acc) -> + LUser = To#jid.luser, + LServer = To#jid.lserver, + case should_archive(Pkt, LServer) of + true -> + case store_msg(Pkt, LUser, LServer, Peer, recv) of + ok -> + {archived, mark_stored_msg(Pkt, To)}; + _ -> + Acc + end; + false -> + Acc + end. -spec muc_filter_message(message(), mod_muc_room:state(), binary()) -> message(). @@ -329,13 +347,12 @@ muc_filter_message(#message{from = From} = Pkt, #state{config = Config, jid = RoomJID} = MUCState, FromNick) -> LServer = RoomJID#jid.lserver, - NewPkt = strip_my_archived_tag(Pkt, LServer), + NewPkt = init_stanza_id(Pkt, LServer), if Config#config.mam -> StorePkt = strip_x_jid_tags(NewPkt), case store_muc(MUCState, StorePkt, RoomJID, From, FromNick) of - {ok, ID} -> - xmpp:put_meta(set_stanza_id(NewPkt, RoomJID, ID), - mam_archived, true); + ok -> + mark_stored_msg(NewPkt, RoomJID); _ -> NewPkt end; @@ -345,6 +362,16 @@ muc_filter_message(#message{from = From} = Pkt, muc_filter_message(Acc, _MUCState, _FromNick) -> Acc. +-spec get_stanza_id(stanza()) -> integer(). +get_stanza_id(#message{meta = #{stanza_id := ID}}) -> + ID. + +-spec init_stanza_id(stanza(), binary()) -> stanza(). +init_stanza_id(Pkt, LServer) -> + ID = p1_time_compat:system_time(micro_seconds), + Pkt1 = strip_my_archived_tag(Pkt, LServer), + xmpp:put_meta(Pkt1, stanza_id, ID). + set_stanza_id(Pkt, JID, ID) -> BareJID = jid:remove_resource(JID), Archived = #mam_archived{by = BareJID, id = ID}, @@ -352,6 +379,11 @@ set_stanza_id(Pkt, JID, ID) -> NewEls = [Archived, StanzaID|xmpp:get_els(Pkt)], xmpp:set_els(Pkt, NewEls). +-spec mark_stored_msg(message(), jid()) -> message(). +mark_stored_msg(#message{meta = #{stanza_id := ID}} = Pkt, JID) -> + Pkt1 = set_stanza_id(Pkt, JID, integer_to_binary(ID)), + xmpp:put_meta(Pkt1, mam_archived, true). + % Query archive v0.2 process_iq_v0_2(#iq{from = #jid{lserver = LServer}, to = #jid{lserver = LServer}, @@ -715,9 +747,10 @@ may_enter_room(From, may_enter_room(From, MUCState) -> mod_muc_room:is_occupant_or_admin(From, MUCState). --spec store_msg(message(), - binary(), binary(), jid(), send | recv) -> - {ok, binary()} | pass. +-spec store_msg(message(), binary(), binary(), jid(), send | recv) + -> ok | pass | any(). +store_msg(#message{meta = #{sm_copy := true}}, _LUser, _LServer, _Peer, _Dir) -> + ok; % Already stored. store_msg(Pkt, LUser, LServer, Peer, Dir) -> Prefs = get_prefs(LUser, LServer), case should_archive_peer(LUser, LServer, Prefs, Peer) of @@ -729,8 +762,9 @@ store_msg(Pkt, LUser, LServer, Peer, Dir) -> pass; NewPkt -> Mod = gen_mod:db_mod(LServer, ?MODULE), + ID = get_stanza_id(NewPkt), El = xmpp:encode(NewPkt), - Mod:store(El, LServer, US, chat, Peer, <<"">>, Dir) + Mod:store(El, LServer, US, chat, Peer, <<"">>, Dir, ID) end; false -> pass @@ -747,8 +781,9 @@ store_muc(MUCState, Pkt, RoomJID, Peer, Nick) -> pass; NewPkt -> Mod = gen_mod:db_mod(LServer, ?MODULE), + ID = get_stanza_id(NewPkt), El = xmpp:encode(NewPkt), - Mod:store(El, LServer, {U, S}, groupchat, Peer, Nick, recv) + Mod:store(El, LServer, {U, S}, groupchat, Peer, Nick, recv, ID) end; false -> pass diff --git a/src/mod_mam_mnesia.erl b/src/mod_mam_mnesia.erl index b672b4107..71f1f701b 100644 --- a/src/mod_mam_mnesia.erl +++ b/src/mod_mam_mnesia.erl @@ -28,7 +28,7 @@ %% API -export([init/2, remove_user/2, remove_room/3, delete_old_messages/3, - extended_fields/0, store/7, write_prefs/4, get_prefs/2, select/6]). + extended_fields/0, store/8, write_prefs/4, get_prefs/2, select/6]). -include_lib("stdlib/include/ms_transform.hrl"). -include("xmpp.hrl"). @@ -103,7 +103,7 @@ delete_old_user_messages(User, TimeStamp, Type) -> extended_fields() -> []. -store(Pkt, _, {LUser, LServer}, Type, Peer, Nick, _Dir) -> +store(Pkt, _, {LUser, LServer}, Type, Peer, Nick, _Dir, TS) -> case {mnesia:table_info(archive_msg, disc_only_copies), mnesia:table_info(archive_msg, memory)} of {[_|_], TableSize} when TableSize > ?TABLE_SIZE_LIMIT -> @@ -112,13 +112,11 @@ store(Pkt, _, {LUser, LServer}, Type, Peer, Nick, _Dir) -> {error, overflow}; _ -> LPeer = {PUser, PServer, _} = jid:tolower(Peer), - TS = p1_time_compat:timestamp(), - ID = integer_to_binary(now_to_usec(TS)), F = fun() -> mnesia:write( #archive_msg{us = {LUser, LServer}, - id = ID, - timestamp = TS, + id = integer_to_binary(TS), + timestamp = misc:usec_to_now(TS), peer = LPeer, bare_peer = {PUser, PServer, <<>>}, type = Type, @@ -127,7 +125,7 @@ store(Pkt, _, {LUser, LServer}, Type, Peer, Nick, _Dir) -> end, case mnesia:transaction(F) of {atomic, ok} -> - {ok, ID}; + ok; {aborted, Err} -> ?ERROR_MSG("Cannot add message to MAM archive of ~s@~s: ~s", [LUser, LServer, Err]), @@ -178,9 +176,6 @@ select(_LServer, JidRequestor, %%%=================================================================== %%% Internal functions %%%=================================================================== -now_to_usec({MSec, Sec, USec}) -> - (MSec*1000000 + Sec)*1000000 + USec. - make_matchspec(LUser, LServer, Start, undefined, With) -> %% List is always greater than a tuple make_matchspec(LUser, LServer, Start, [], With); diff --git a/src/mod_mam_sql.erl b/src/mod_mam_sql.erl index 53ccd94a4..40aa98367 100644 --- a/src/mod_mam_sql.erl +++ b/src/mod_mam_sql.erl @@ -30,7 +30,7 @@ %% API -export([init/2, remove_user/2, remove_room/3, delete_old_messages/3, - extended_fields/0, store/7, write_prefs/4, get_prefs/2, select/6, export/1]). + extended_fields/0, store/8, write_prefs/4, get_prefs/2, select/6, export/1]). -include_lib("stdlib/include/ms_transform.hrl"). -include("xmpp.hrl"). @@ -84,9 +84,7 @@ delete_old_messages(ServerHost, TimeStamp, Type) -> extended_fields() -> [{withtext, <<"">>}]. -store(Pkt, LServer, {LUser, LHost}, Type, Peer, Nick, _Dir) -> - TSinteger = p1_time_compat:system_time(micro_seconds), - ID = integer_to_binary(TSinteger), +store(Pkt, LServer, {LUser, LHost}, Type, Peer, Nick, _Dir, TS) -> SUser = case Type of chat -> LUser; groupchat -> jid:encode({LUser, LHost, <<>>}) @@ -105,7 +103,7 @@ store(Pkt, LServer, {LUser, LHost}, Type, Peer, Nick, _Dir) -> "archive", ["username=%(SUser)s", "server_host=%(LServer)s", - "timestamp=%(TSinteger)d", + "timestamp=%(TS)d", "peer=%(LPeer)s", "bare_peer=%(BarePeer)s", "xml=%(XML)s", @@ -113,7 +111,7 @@ store(Pkt, LServer, {LUser, LHost}, Type, Peer, Nick, _Dir) -> "kind=%(SType)s", "nick=%(Nick)s"])) of {updated, _} -> - {ok, ID}; + ok; Err -> Err end.