mod_mam: Don't store from 'sm_receive_packet' hook

Let mod_mam use the 'sm_receive_packet' hook to generate stanza IDs for
incoming messages, but not to actually store them.  The latter would
require additional changes to make sure modules such as mod_privacy and
mod_block_strangers continue to affect MAM.
This commit is contained in:
Holger Weiss 2017-11-14 22:02:48 +01:00
parent 054413d8f4
commit 8376370ae1
3 changed files with 83 additions and 55 deletions

View File

@ -33,12 +33,12 @@
%% API %% API
-export([start/2, stop/1, reload/3, depends/2]). -export([start/2, stop/1, reload/3, depends/2]).
-export([user_send_packet/1, user_send_packet_strip_tag/1, sm_receive_packet/1, -export([sm_receive_packet/1, user_receive_packet/1, user_send_packet/1,
process_iq_v0_2/1, process_iq_v0_3/1, disco_sm_features/5, user_send_packet_strip_tag/1, process_iq_v0_2/1, process_iq_v0_3/1,
remove_user/2, remove_room/3, mod_opt_type/1, muc_process_iq/2, disco_sm_features/5, remove_user/2, remove_room/3, mod_opt_type/1,
muc_filter_message/3, message_is_archived/3, delete_old_messages/2, muc_process_iq/2, muc_filter_message/3, message_is_archived/3,
get_commands_spec/0, msg_to_el/4, get_room_config/4, set_room_option/3, delete_old_messages/2, get_commands_spec/0, msg_to_el/4,
offline_message/1, export/1]). get_room_config/4, set_room_option/3, offline_message/1, export/1]).
-include("xmpp.hrl"). -include("xmpp.hrl").
-include("logger.hrl"). -include("logger.hrl").
@ -59,7 +59,7 @@
all | chat | groupchat) -> any(). all | chat | groupchat) -> any().
-callback extended_fields() -> [mam_query:property() | #xdata_field{}]. -callback extended_fields() -> [mam_query:property() | #xdata_field{}].
-callback store(xmlel(), binary(), {binary(), binary()}, chat | groupchat, -callback store(xmlel(), binary(), {binary(), binary()}, chat | groupchat,
jid(), binary(), recv | send) -> {ok, binary()} | any(). jid(), binary(), recv | send, integer()) -> ok | any().
-callback write_prefs(binary(), binary(), #archive_prefs{}, binary()) -> ok | any(). -callback write_prefs(binary(), binary(), #archive_prefs{}, binary()) -> ok | any().
-callback get_prefs(binary(), binary()) -> {ok, #archive_prefs{}} | error. -callback get_prefs(binary(), binary()) -> {ok, #archive_prefs{}} | error.
-callback select(binary(), jid(), jid(), mam_query:result(), -callback select(binary(), jid(), jid(), mam_query:result(),
@ -80,6 +80,8 @@ start(Host, Opts) ->
register_iq_handlers(Host, IQDisc), register_iq_handlers(Host, IQDisc),
ejabberd_hooks:add(sm_receive_packet, Host, ?MODULE, ejabberd_hooks:add(sm_receive_packet, Host, ?MODULE,
sm_receive_packet, 50), sm_receive_packet, 50),
ejabberd_hooks:add(user_receive_packet, Host, ?MODULE,
user_receive_packet, 88),
ejabberd_hooks:add(user_send_packet, Host, ?MODULE, ejabberd_hooks:add(user_send_packet, Host, ?MODULE,
user_send_packet, 88), user_send_packet, 88),
ejabberd_hooks:add(user_send_packet, Host, ?MODULE, ejabberd_hooks:add(user_send_packet, Host, ?MODULE,
@ -143,6 +145,8 @@ stop(Host) ->
unregister_iq_handlers(Host), unregister_iq_handlers(Host),
ejabberd_hooks:delete(sm_receive_packet, Host, ?MODULE, ejabberd_hooks:delete(sm_receive_packet, Host, ?MODULE,
sm_receive_packet, 50), sm_receive_packet, 50),
ejabberd_hooks:delete(user_receive_packet, Host, ?MODULE,
user_receive_packet, 88),
ejabberd_hooks:delete(user_send_packet, Host, ?MODULE, ejabberd_hooks:delete(user_send_packet, Host, ?MODULE,
user_send_packet, 88), user_send_packet, 88),
ejabberd_hooks:delete(user_send_packet, Host, ?MODULE, ejabberd_hooks:delete(user_send_packet, Host, ?MODULE,
@ -267,23 +271,28 @@ set_room_option(Acc, _Property, _Lang) ->
Acc. Acc.
-spec sm_receive_packet(stanza()) -> stanza(). -spec sm_receive_packet(stanza()) -> stanza().
sm_receive_packet(#message{from = Peer, to = JID} = Pkt) -> sm_receive_packet(#message{to = #jid{lserver = LServer}} = Pkt) ->
init_stanza_id(Pkt, LServer);
sm_receive_packet(Acc) ->
Acc.
-spec user_receive_packet({stanza(), c2s_state()}) -> {stanza(), c2s_state()}.
user_receive_packet({#message{from = Peer} = Pkt, #{jid := JID} = C2SState}) ->
LUser = JID#jid.luser, LUser = JID#jid.luser,
LServer = JID#jid.lserver, LServer = JID#jid.lserver,
Pkt1 = strip_my_archived_tag(Pkt, LServer), Pkt1 = case should_archive(Pkt, LServer) of
case should_archive(Pkt1, LServer) of true ->
true -> case store_msg(Pkt, LUser, LServer, Peer, recv) of
case store_msg(Pkt1, LUser, LServer, Peer, recv) of ok ->
{ok, ID} -> mark_stored_msg(Pkt, JID);
xmpp:put_meta(set_stanza_id(Pkt1, JID, ID), _ ->
mam_archived, true); Pkt
_ -> end;
Pkt1 _ ->
end; Pkt
_ -> end,
Pkt1 {Pkt1, C2SState};
end; user_receive_packet(Acc) ->
sm_receive_packet(Acc) ->
Acc. Acc.
-spec user_send_packet({stanza(), c2s_state()}) -spec user_send_packet({stanza(), c2s_state()})
@ -291,14 +300,13 @@ sm_receive_packet(Acc) ->
user_send_packet({#message{to = Peer} = Pkt, #{jid := JID} = C2SState}) -> user_send_packet({#message{to = Peer} = Pkt, #{jid := JID} = C2SState}) ->
LUser = JID#jid.luser, LUser = JID#jid.luser,
LServer = JID#jid.lserver, LServer = JID#jid.lserver,
Pkt1 = strip_my_archived_tag(Pkt, LServer), Pkt1 = init_stanza_id(Pkt, LServer),
Pkt2 = case should_archive(Pkt1, LServer) of Pkt2 = case should_archive(Pkt1, LServer) of
true -> true ->
case store_msg(xmpp:set_from_to(Pkt1, JID, Peer), case store_msg(xmpp:set_from_to(Pkt1, JID, Peer),
LUser, LServer, Peer, send) of LUser, LServer, Peer, send) of
{ok, ID} -> ok ->
xmpp:put_meta(set_stanza_id(Pkt1, JID, ID), mark_stored_msg(Pkt1, JID);
mam_archived, true);
_ -> _ ->
Pkt1 Pkt1
end; end;
@ -318,10 +326,20 @@ user_send_packet_strip_tag(Acc) ->
Acc. Acc.
-spec offline_message({any(), message()}) -> {any(), message()}. -spec offline_message({any(), message()}) -> {any(), message()}.
offline_message({_Action, #message{meta = #{mam_archived := true}} = Pkt}) -> offline_message({_Action, #message{from = Peer, to = To} = Pkt} = Acc) ->
{archived, Pkt}; LUser = To#jid.luser,
offline_message(Acc) -> LServer = To#jid.lserver,
Acc. case should_archive(Pkt, LServer) of
true ->
case store_msg(Pkt, LUser, LServer, Peer, recv) of
ok ->
{archived, mark_stored_msg(Pkt, To)};
_ ->
Acc
end;
false ->
Acc
end.
-spec muc_filter_message(message(), mod_muc_room:state(), -spec muc_filter_message(message(), mod_muc_room:state(),
binary()) -> message(). binary()) -> message().
@ -329,13 +347,12 @@ muc_filter_message(#message{from = From} = Pkt,
#state{config = Config, jid = RoomJID} = MUCState, #state{config = Config, jid = RoomJID} = MUCState,
FromNick) -> FromNick) ->
LServer = RoomJID#jid.lserver, LServer = RoomJID#jid.lserver,
NewPkt = strip_my_archived_tag(Pkt, LServer), NewPkt = init_stanza_id(Pkt, LServer),
if Config#config.mam -> if Config#config.mam ->
StorePkt = strip_x_jid_tags(NewPkt), StorePkt = strip_x_jid_tags(NewPkt),
case store_muc(MUCState, StorePkt, RoomJID, From, FromNick) of case store_muc(MUCState, StorePkt, RoomJID, From, FromNick) of
{ok, ID} -> ok ->
xmpp:put_meta(set_stanza_id(NewPkt, RoomJID, ID), mark_stored_msg(NewPkt, RoomJID);
mam_archived, true);
_ -> _ ->
NewPkt NewPkt
end; end;
@ -345,6 +362,16 @@ muc_filter_message(#message{from = From} = Pkt,
muc_filter_message(Acc, _MUCState, _FromNick) -> muc_filter_message(Acc, _MUCState, _FromNick) ->
Acc. Acc.
-spec get_stanza_id(stanza()) -> integer().
get_stanza_id(#message{meta = #{stanza_id := ID}}) ->
ID.
-spec init_stanza_id(stanza(), binary()) -> stanza().
init_stanza_id(Pkt, LServer) ->
ID = p1_time_compat:system_time(micro_seconds),
Pkt1 = strip_my_archived_tag(Pkt, LServer),
xmpp:put_meta(Pkt1, stanza_id, ID).
set_stanza_id(Pkt, JID, ID) -> set_stanza_id(Pkt, JID, ID) ->
BareJID = jid:remove_resource(JID), BareJID = jid:remove_resource(JID),
Archived = #mam_archived{by = BareJID, id = ID}, Archived = #mam_archived{by = BareJID, id = ID},
@ -352,6 +379,11 @@ set_stanza_id(Pkt, JID, ID) ->
NewEls = [Archived, StanzaID|xmpp:get_els(Pkt)], NewEls = [Archived, StanzaID|xmpp:get_els(Pkt)],
xmpp:set_els(Pkt, NewEls). xmpp:set_els(Pkt, NewEls).
-spec mark_stored_msg(message(), jid()) -> message().
mark_stored_msg(#message{meta = #{stanza_id := ID}} = Pkt, JID) ->
Pkt1 = set_stanza_id(Pkt, JID, integer_to_binary(ID)),
xmpp:put_meta(Pkt1, mam_archived, true).
% Query archive v0.2 % Query archive v0.2
process_iq_v0_2(#iq{from = #jid{lserver = LServer}, process_iq_v0_2(#iq{from = #jid{lserver = LServer},
to = #jid{lserver = LServer}, to = #jid{lserver = LServer},
@ -715,9 +747,10 @@ may_enter_room(From,
may_enter_room(From, MUCState) -> may_enter_room(From, MUCState) ->
mod_muc_room:is_occupant_or_admin(From, MUCState). mod_muc_room:is_occupant_or_admin(From, MUCState).
-spec store_msg(message(), -spec store_msg(message(), binary(), binary(), jid(), send | recv)
binary(), binary(), jid(), send | recv) -> -> ok | pass | any().
{ok, binary()} | pass. store_msg(#message{meta = #{sm_copy := true}}, _LUser, _LServer, _Peer, _Dir) ->
ok; % Already stored.
store_msg(Pkt, LUser, LServer, Peer, Dir) -> store_msg(Pkt, LUser, LServer, Peer, Dir) ->
Prefs = get_prefs(LUser, LServer), Prefs = get_prefs(LUser, LServer),
case should_archive_peer(LUser, LServer, Prefs, Peer) of case should_archive_peer(LUser, LServer, Prefs, Peer) of
@ -729,8 +762,9 @@ store_msg(Pkt, LUser, LServer, Peer, Dir) ->
pass; pass;
NewPkt -> NewPkt ->
Mod = gen_mod:db_mod(LServer, ?MODULE), Mod = gen_mod:db_mod(LServer, ?MODULE),
ID = get_stanza_id(NewPkt),
El = xmpp:encode(NewPkt), El = xmpp:encode(NewPkt),
Mod:store(El, LServer, US, chat, Peer, <<"">>, Dir) Mod:store(El, LServer, US, chat, Peer, <<"">>, Dir, ID)
end; end;
false -> false ->
pass pass
@ -747,8 +781,9 @@ store_muc(MUCState, Pkt, RoomJID, Peer, Nick) ->
pass; pass;
NewPkt -> NewPkt ->
Mod = gen_mod:db_mod(LServer, ?MODULE), Mod = gen_mod:db_mod(LServer, ?MODULE),
ID = get_stanza_id(NewPkt),
El = xmpp:encode(NewPkt), El = xmpp:encode(NewPkt),
Mod:store(El, LServer, {U, S}, groupchat, Peer, Nick, recv) Mod:store(El, LServer, {U, S}, groupchat, Peer, Nick, recv, ID)
end; end;
false -> false ->
pass pass

View File

@ -28,7 +28,7 @@
%% API %% API
-export([init/2, remove_user/2, remove_room/3, delete_old_messages/3, -export([init/2, remove_user/2, remove_room/3, delete_old_messages/3,
extended_fields/0, store/7, write_prefs/4, get_prefs/2, select/6]). extended_fields/0, store/8, write_prefs/4, get_prefs/2, select/6]).
-include_lib("stdlib/include/ms_transform.hrl"). -include_lib("stdlib/include/ms_transform.hrl").
-include("xmpp.hrl"). -include("xmpp.hrl").
@ -103,7 +103,7 @@ delete_old_user_messages(User, TimeStamp, Type) ->
extended_fields() -> extended_fields() ->
[]. [].
store(Pkt, _, {LUser, LServer}, Type, Peer, Nick, _Dir) -> store(Pkt, _, {LUser, LServer}, Type, Peer, Nick, _Dir, TS) ->
case {mnesia:table_info(archive_msg, disc_only_copies), case {mnesia:table_info(archive_msg, disc_only_copies),
mnesia:table_info(archive_msg, memory)} of mnesia:table_info(archive_msg, memory)} of
{[_|_], TableSize} when TableSize > ?TABLE_SIZE_LIMIT -> {[_|_], TableSize} when TableSize > ?TABLE_SIZE_LIMIT ->
@ -112,13 +112,11 @@ store(Pkt, _, {LUser, LServer}, Type, Peer, Nick, _Dir) ->
{error, overflow}; {error, overflow};
_ -> _ ->
LPeer = {PUser, PServer, _} = jid:tolower(Peer), LPeer = {PUser, PServer, _} = jid:tolower(Peer),
TS = p1_time_compat:timestamp(),
ID = integer_to_binary(now_to_usec(TS)),
F = fun() -> F = fun() ->
mnesia:write( mnesia:write(
#archive_msg{us = {LUser, LServer}, #archive_msg{us = {LUser, LServer},
id = ID, id = integer_to_binary(TS),
timestamp = TS, timestamp = misc:usec_to_now(TS),
peer = LPeer, peer = LPeer,
bare_peer = {PUser, PServer, <<>>}, bare_peer = {PUser, PServer, <<>>},
type = Type, type = Type,
@ -127,7 +125,7 @@ store(Pkt, _, {LUser, LServer}, Type, Peer, Nick, _Dir) ->
end, end,
case mnesia:transaction(F) of case mnesia:transaction(F) of
{atomic, ok} -> {atomic, ok} ->
{ok, ID}; ok;
{aborted, Err} -> {aborted, Err} ->
?ERROR_MSG("Cannot add message to MAM archive of ~s@~s: ~s", ?ERROR_MSG("Cannot add message to MAM archive of ~s@~s: ~s",
[LUser, LServer, Err]), [LUser, LServer, Err]),
@ -178,9 +176,6 @@ select(_LServer, JidRequestor,
%%%=================================================================== %%%===================================================================
%%% Internal functions %%% Internal functions
%%%=================================================================== %%%===================================================================
now_to_usec({MSec, Sec, USec}) ->
(MSec*1000000 + Sec)*1000000 + USec.
make_matchspec(LUser, LServer, Start, undefined, With) -> make_matchspec(LUser, LServer, Start, undefined, With) ->
%% List is always greater than a tuple %% List is always greater than a tuple
make_matchspec(LUser, LServer, Start, [], With); make_matchspec(LUser, LServer, Start, [], With);

View File

@ -30,7 +30,7 @@
%% API %% API
-export([init/2, remove_user/2, remove_room/3, delete_old_messages/3, -export([init/2, remove_user/2, remove_room/3, delete_old_messages/3,
extended_fields/0, store/7, write_prefs/4, get_prefs/2, select/6, export/1]). extended_fields/0, store/8, write_prefs/4, get_prefs/2, select/6, export/1]).
-include_lib("stdlib/include/ms_transform.hrl"). -include_lib("stdlib/include/ms_transform.hrl").
-include("xmpp.hrl"). -include("xmpp.hrl").
@ -84,9 +84,7 @@ delete_old_messages(ServerHost, TimeStamp, Type) ->
extended_fields() -> extended_fields() ->
[{withtext, <<"">>}]. [{withtext, <<"">>}].
store(Pkt, LServer, {LUser, LHost}, Type, Peer, Nick, _Dir) -> store(Pkt, LServer, {LUser, LHost}, Type, Peer, Nick, _Dir, TS) ->
TSinteger = p1_time_compat:system_time(micro_seconds),
ID = integer_to_binary(TSinteger),
SUser = case Type of SUser = case Type of
chat -> LUser; chat -> LUser;
groupchat -> jid:encode({LUser, LHost, <<>>}) groupchat -> jid:encode({LUser, LHost, <<>>})
@ -105,7 +103,7 @@ store(Pkt, LServer, {LUser, LHost}, Type, Peer, Nick, _Dir) ->
"archive", "archive",
["username=%(SUser)s", ["username=%(SUser)s",
"server_host=%(LServer)s", "server_host=%(LServer)s",
"timestamp=%(TSinteger)d", "timestamp=%(TS)d",
"peer=%(LPeer)s", "peer=%(LPeer)s",
"bare_peer=%(BarePeer)s", "bare_peer=%(BarePeer)s",
"xml=%(XML)s", "xml=%(XML)s",
@ -113,7 +111,7 @@ store(Pkt, LServer, {LUser, LHost}, Type, Peer, Nick, _Dir) ->
"kind=%(SType)s", "kind=%(SType)s",
"nick=%(Nick)s"])) of "nick=%(Nick)s"])) of
{updated, _} -> {updated, _} ->
{ok, ID}; ok;
Err -> Err ->
Err Err
end. end.