Clean mod_mam.erl from DB specific code
This commit is contained in:
parent
901d2e0aed
commit
52571e8624
541
src/mod_mam.erl
541
src/mod_mam.erl
|
@ -35,41 +35,37 @@
|
||||||
|
|
||||||
-export([user_send_packet/4, user_receive_packet/5,
|
-export([user_send_packet/4, user_receive_packet/5,
|
||||||
process_iq_v0_2/3, process_iq_v0_3/3, disco_sm_features/5,
|
process_iq_v0_2/3, process_iq_v0_3/3, disco_sm_features/5,
|
||||||
remove_user/2, remove_user/3, mod_opt_type/1, muc_process_iq/4,
|
remove_user/2, remove_room/3, mod_opt_type/1, muc_process_iq/4,
|
||||||
muc_filter_message/5, message_is_archived/5, delete_old_messages/2,
|
muc_filter_message/5, message_is_archived/5, delete_old_messages/2,
|
||||||
get_commands_spec/0]).
|
get_commands_spec/0, msg_to_el/4]).
|
||||||
|
|
||||||
-include_lib("stdlib/include/ms_transform.hrl").
|
|
||||||
-include("jlib.hrl").
|
-include("jlib.hrl").
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
-include("mod_muc_room.hrl").
|
-include("mod_muc_room.hrl").
|
||||||
-include("ejabberd_commands.hrl").
|
-include("ejabberd_commands.hrl").
|
||||||
|
-include("mod_mam.hrl").
|
||||||
|
|
||||||
-define(DEF_PAGE_SIZE, 50).
|
-define(DEF_PAGE_SIZE, 50).
|
||||||
-define(MAX_PAGE_SIZE, 250).
|
-define(MAX_PAGE_SIZE, 250).
|
||||||
|
|
||||||
-define(BIN_GREATER_THAN(A, B),
|
-callback init(binary(), gen_mod:opts()) -> any().
|
||||||
((A > B andalso byte_size(A) == byte_size(B))
|
-callback remove_user(binary(), binary()) -> any().
|
||||||
orelse byte_size(A) > byte_size(B))).
|
-callback remove_room(binary(), binary(), binary()) -> any().
|
||||||
-define(BIN_LESS_THAN(A, B),
|
-callback delete_old_messages(binary() | global,
|
||||||
((A < B andalso byte_size(A) == byte_size(B))
|
erlang:timestamp(),
|
||||||
orelse byte_size(A) < byte_size(B))).
|
all | chat | groupchat) -> any().
|
||||||
|
-callback extended_fields() -> [xmlel()].
|
||||||
-record(archive_msg,
|
-callback store(xmlel(), binary(), {binary(), binary()}, chat | groupchat,
|
||||||
{us = {<<"">>, <<"">>} :: {binary(), binary()} | '$2',
|
jid(), binary(), recv | send) -> {ok, binary()} | any().
|
||||||
id = <<>> :: binary() | '_',
|
-callback write_prefs(binary(), binary(), #archive_prefs{}, binary()) -> ok | any().
|
||||||
timestamp = p1_time_compat:timestamp() :: erlang:timestamp() | '_' | '$1',
|
-callback get_prefs(binary(), binary()) -> {ok, #archive_prefs{}} | error.
|
||||||
peer = {<<"">>, <<"">>, <<"">>} :: ljid() | '_' | '$3' | undefined,
|
-callback select(binary(), jid(), jid(),
|
||||||
bare_peer = {<<"">>, <<"">>, <<"">>} :: ljid() | '_' | '$3',
|
none | erlang:timestamp(),
|
||||||
packet = #xmlel{} :: xmlel() | '_',
|
none | erlang:timestamp(),
|
||||||
nick = <<"">> :: binary(),
|
none | ljid() | {text, binary()},
|
||||||
type = chat :: chat | groupchat}).
|
none | #rsm_in{},
|
||||||
|
chat | groupchat) ->
|
||||||
-record(archive_prefs,
|
{[{binary(), non_neg_integer(), xmlel()}], boolean(), non_neg_integer()}.
|
||||||
{us = {<<"">>, <<"">>} :: {binary(), binary()},
|
|
||||||
default = never :: never | always | roster,
|
|
||||||
always = [] :: [ljid()],
|
|
||||||
never = [] :: [ljid()]}).
|
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% API
|
%%% API
|
||||||
|
@ -77,9 +73,9 @@
|
||||||
start(Host, Opts) ->
|
start(Host, Opts) ->
|
||||||
IQDisc = gen_mod:get_opt(iqdisc, Opts, fun gen_iq_handler:check_type/1,
|
IQDisc = gen_mod:get_opt(iqdisc, Opts, fun gen_iq_handler:check_type/1,
|
||||||
one_queue),
|
one_queue),
|
||||||
DBType = gen_mod:db_type(Host, Opts),
|
Mod = gen_mod:db_mod(Host, Opts, ?MODULE),
|
||||||
init_db(DBType, Host),
|
Mod:init(Host, Opts),
|
||||||
init_cache(DBType, Opts),
|
init_cache(Opts),
|
||||||
gen_iq_handler:add_iq_handler(ejabberd_local, Host,
|
gen_iq_handler:add_iq_handler(ejabberd_local, Host,
|
||||||
?NS_MAM_TMP, ?MODULE, process_iq_v0_2, IQDisc),
|
?NS_MAM_TMP, ?MODULE, process_iq_v0_2, IQDisc),
|
||||||
gen_iq_handler:add_iq_handler(ejabberd_sm, Host,
|
gen_iq_handler:add_iq_handler(ejabberd_sm, Host,
|
||||||
|
@ -120,18 +116,7 @@ start(Host, Opts) ->
|
||||||
ejabberd_commands:register_commands(get_commands_spec()),
|
ejabberd_commands:register_commands(get_commands_spec()),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
init_db(mnesia, _Host) ->
|
init_cache(Opts) ->
|
||||||
mnesia:create_table(archive_msg,
|
|
||||||
[{disc_only_copies, [node()]},
|
|
||||||
{type, bag},
|
|
||||||
{attributes, record_info(fields, archive_msg)}]),
|
|
||||||
mnesia:create_table(archive_prefs,
|
|
||||||
[{disc_only_copies, [node()]},
|
|
||||||
{attributes, record_info(fields, archive_prefs)}]);
|
|
||||||
init_db(_, _) ->
|
|
||||||
ok.
|
|
||||||
|
|
||||||
init_cache(_DBType, Opts) ->
|
|
||||||
MaxSize = gen_mod:get_opt(cache_size, Opts,
|
MaxSize = gen_mod:get_opt(cache_size, Opts,
|
||||||
fun(I) when is_integer(I), I>0 -> I end,
|
fun(I) when is_integer(I), I>0 -> I end,
|
||||||
1000),
|
1000),
|
||||||
|
@ -179,24 +164,14 @@ stop(Host) ->
|
||||||
remove_user(User, Server) ->
|
remove_user(User, Server) ->
|
||||||
LUser = jid:nodeprep(User),
|
LUser = jid:nodeprep(User),
|
||||||
LServer = jid:nameprep(Server),
|
LServer = jid:nameprep(Server),
|
||||||
remove_user(LUser, LServer,
|
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||||
gen_mod:db_type(LServer, ?MODULE)).
|
Mod:remove_user(LUser, LServer).
|
||||||
|
|
||||||
remove_user(LUser, LServer, mnesia) ->
|
remove_room(LServer, Name, Host) ->
|
||||||
US = {LUser, LServer},
|
LName = jid:nodeprep(Name),
|
||||||
F = fun () ->
|
LHost = jid:nameprep(Host),
|
||||||
mnesia:delete({archive_msg, US}),
|
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||||
mnesia:delete({archive_prefs, US})
|
Mod:remove_room(LServer, LName, LHost).
|
||||||
end,
|
|
||||||
mnesia:transaction(F);
|
|
||||||
remove_user(LUser, LServer, odbc) ->
|
|
||||||
SUser = ejabberd_odbc:escape(LUser),
|
|
||||||
ejabberd_odbc:sql_query(
|
|
||||||
LServer,
|
|
||||||
[<<"delete from archive where username='">>, SUser, <<"';">>]),
|
|
||||||
ejabberd_odbc:sql_query(
|
|
||||||
LServer,
|
|
||||||
[<<"delete from archive_prefs where username='">>, SUser, <<"';">>]).
|
|
||||||
|
|
||||||
user_receive_packet(Pkt, C2SState, JID, Peer, To) ->
|
user_receive_packet(Pkt, C2SState, JID, Peer, To) ->
|
||||||
LUser = JID#jid.luser,
|
LUser = JID#jid.luser,
|
||||||
|
@ -343,10 +318,10 @@ message_is_archived(false, C2SState, Peer,
|
||||||
if_enabled ->
|
if_enabled ->
|
||||||
get_prefs(LUser, LServer);
|
get_prefs(LUser, LServer);
|
||||||
on_request ->
|
on_request ->
|
||||||
DBType = gen_mod:db_type(LServer, ?MODULE),
|
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||||
cache_tab:lookup(archive_prefs, {LUser, LServer},
|
cache_tab:lookup(archive_prefs, {LUser, LServer},
|
||||||
fun() ->
|
fun() ->
|
||||||
get_prefs(LUser, LServer, DBType)
|
Mod:get_prefs(LUser, LServer)
|
||||||
end);
|
end);
|
||||||
never ->
|
never ->
|
||||||
error
|
error
|
||||||
|
@ -365,21 +340,19 @@ delete_old_messages(TypeBin, Days) when TypeBin == <<"chat">>;
|
||||||
Diff = Days * 24 * 60 * 60 * 1000000,
|
Diff = Days * 24 * 60 * 60 * 1000000,
|
||||||
TimeStamp = usec_to_now(p1_time_compat:system_time(micro_seconds) - Diff),
|
TimeStamp = usec_to_now(p1_time_compat:system_time(micro_seconds) - Diff),
|
||||||
Type = jlib:binary_to_atom(TypeBin),
|
Type = jlib:binary_to_atom(TypeBin),
|
||||||
{Results, _} =
|
DBTypes = lists:usort(
|
||||||
lists:foldl(fun(Host, {Results, MnesiaDone}) ->
|
lists:map(
|
||||||
case {gen_mod:db_type(Host, ?MODULE), MnesiaDone} of
|
fun(Host) ->
|
||||||
{mnesia, true} ->
|
case gen_mod:db_type(Host, ?MODULE) of
|
||||||
{Results, true};
|
odbc -> {odbc, Host};
|
||||||
{mnesia, false} ->
|
Other -> {Other, global}
|
||||||
Res = delete_old_messages(TimeStamp, Type,
|
end
|
||||||
global, mnesia),
|
end, ?MYHOSTS)),
|
||||||
{[Res|Results], true};
|
Results = lists:map(
|
||||||
{DBType, _} ->
|
fun({DBType, ServerHost}) ->
|
||||||
Res = delete_old_messages(TimeStamp, Type,
|
Mod = gen_mod:db_mod(DBType, ?MODULE),
|
||||||
Host, DBType),
|
Mod:delete_old_messages(ServerHost, TimeStamp, Type)
|
||||||
{[Res|Results], MnesiaDone}
|
end, DBTypes),
|
||||||
end
|
|
||||||
end, {[], false}, ?MYHOSTS),
|
|
||||||
case lists:filter(fun(Res) -> Res /= ok end, Results) of
|
case lists:filter(fun(Res) -> Res /= ok end, Results) of
|
||||||
[] -> ok;
|
[] -> ok;
|
||||||
[NotOk|_] -> NotOk
|
[NotOk|_] -> NotOk
|
||||||
|
@ -387,21 +360,6 @@ delete_old_messages(TypeBin, Days) when TypeBin == <<"chat">>;
|
||||||
delete_old_messages(_TypeBin, _Days) ->
|
delete_old_messages(_TypeBin, _Days) ->
|
||||||
unsupported_type.
|
unsupported_type.
|
||||||
|
|
||||||
delete_old_messages(TimeStamp, Type, global, mnesia) ->
|
|
||||||
MS = ets:fun2ms(fun(#archive_msg{timestamp = MsgTS,
|
|
||||||
type = MsgType} = Msg)
|
|
||||||
when MsgTS < TimeStamp,
|
|
||||||
MsgType == Type orelse Type == all ->
|
|
||||||
Msg
|
|
||||||
end),
|
|
||||||
OldMsgs = mnesia:dirty_select(archive_msg, MS),
|
|
||||||
lists:foreach(fun(Rec) ->
|
|
||||||
ok = mnesia:dirty_delete_object(Rec)
|
|
||||||
end, OldMsgs);
|
|
||||||
delete_old_messages(_TimeStamp, _Type, _Host, _DBType) ->
|
|
||||||
%% TODO
|
|
||||||
not_implemented.
|
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
@ -427,15 +385,9 @@ process_iq(LServer, #iq{sub_el = #xmlel{attrs = Attrs}} = IQ) ->
|
||||||
#xmlel{name = <<"field">>,
|
#xmlel{name = <<"field">>,
|
||||||
attrs = [{<<"type">>, <<"text-single">>},
|
attrs = [{<<"type">>, <<"text-single">>},
|
||||||
{<<"var">>, <<"end">>}]}],
|
{<<"var">>, <<"end">>}]}],
|
||||||
Fields = case gen_mod:db_type(LServer, ?MODULE) of
|
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||||
odbc ->
|
ExtendedFields = Mod:extended_fields(),
|
||||||
WithText = #xmlel{name = <<"field">>,
|
Fields = ExtendedFields ++ CommonFields,
|
||||||
attrs = [{<<"type">>, <<"text-single">>},
|
|
||||||
{<<"var">>, <<"withtext">>}]},
|
|
||||||
[WithText|CommonFields];
|
|
||||||
_ ->
|
|
||||||
CommonFields
|
|
||||||
end,
|
|
||||||
Form = #xmlel{name = <<"x">>,
|
Form = #xmlel{name = <<"x">>,
|
||||||
attrs = [{<<"xmlns">>, ?NS_XDATA}, {<<"type">>, <<"form">>}],
|
attrs = [{<<"xmlns">>, ?NS_XDATA}, {<<"type">>, <<"form">>}],
|
||||||
children = Fields},
|
children = Fields},
|
||||||
|
@ -715,8 +667,8 @@ store_msg(C2SState, Pkt, LUser, LServer, Peer, Dir) ->
|
||||||
case should_archive_peer(C2SState, Prefs, Peer) of
|
case should_archive_peer(C2SState, Prefs, Peer) of
|
||||||
true ->
|
true ->
|
||||||
US = {LUser, LServer},
|
US = {LUser, LServer},
|
||||||
store(Pkt, LServer, US, chat, Peer, <<"">>, Dir,
|
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||||
gen_mod:db_type(LServer, ?MODULE));
|
Mod:store(Pkt, LServer, US, chat, Peer, <<"">>, Dir);
|
||||||
false ->
|
false ->
|
||||||
pass
|
pass
|
||||||
end.
|
end.
|
||||||
|
@ -726,101 +678,26 @@ store_muc(MUCState, Pkt, RoomJID, Peer, Nick) ->
|
||||||
true ->
|
true ->
|
||||||
LServer = MUCState#state.server_host,
|
LServer = MUCState#state.server_host,
|
||||||
{U, S, _} = jid:tolower(RoomJID),
|
{U, S, _} = jid:tolower(RoomJID),
|
||||||
store(Pkt, LServer, {U, S}, groupchat, Peer, Nick, recv,
|
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||||
gen_mod:db_type(LServer, ?MODULE));
|
Mod:store(Pkt, LServer, {U, S}, groupchat, Peer, Nick, recv);
|
||||||
false ->
|
false ->
|
||||||
pass
|
pass
|
||||||
end.
|
end.
|
||||||
|
|
||||||
store(Pkt, _, {LUser, LServer}, Type, Peer, Nick, _Dir, mnesia) ->
|
|
||||||
LPeer = {PUser, PServer, _} = jid:tolower(Peer),
|
|
||||||
TS = p1_time_compat:timestamp(),
|
|
||||||
ID = jlib:integer_to_binary(now_to_usec(TS)),
|
|
||||||
case mnesia:dirty_write(
|
|
||||||
#archive_msg{us = {LUser, LServer},
|
|
||||||
id = ID,
|
|
||||||
timestamp = TS,
|
|
||||||
peer = LPeer,
|
|
||||||
bare_peer = {PUser, PServer, <<>>},
|
|
||||||
type = Type,
|
|
||||||
nick = Nick,
|
|
||||||
packet = Pkt}) of
|
|
||||||
ok ->
|
|
||||||
{ok, ID};
|
|
||||||
Err ->
|
|
||||||
Err
|
|
||||||
end;
|
|
||||||
store(Pkt, LServer, {LUser, LHost}, Type, Peer, Nick, _Dir, odbc) ->
|
|
||||||
TSinteger = p1_time_compat:system_time(micro_seconds),
|
|
||||||
ID = TS = jlib:integer_to_binary(TSinteger),
|
|
||||||
SUser = case Type of
|
|
||||||
chat -> LUser;
|
|
||||||
groupchat -> jid:to_string({LUser, LHost, <<>>})
|
|
||||||
end,
|
|
||||||
BarePeer = jid:to_string(
|
|
||||||
jid:tolower(
|
|
||||||
jid:remove_resource(Peer))),
|
|
||||||
LPeer = jid:to_string(
|
|
||||||
jid:tolower(Peer)),
|
|
||||||
XML = fxml:element_to_binary(Pkt),
|
|
||||||
Body = fxml:get_subtag_cdata(Pkt, <<"body">>),
|
|
||||||
case ejabberd_odbc:sql_query(
|
|
||||||
LServer,
|
|
||||||
[<<"insert into archive (username, timestamp, "
|
|
||||||
"peer, bare_peer, xml, txt, kind, nick) values (">>,
|
|
||||||
<<"'">>, ejabberd_odbc:escape(SUser), <<"', ">>,
|
|
||||||
<<"'">>, TS, <<"', ">>,
|
|
||||||
<<"'">>, ejabberd_odbc:escape(LPeer), <<"', ">>,
|
|
||||||
<<"'">>, ejabberd_odbc:escape(BarePeer), <<"', ">>,
|
|
||||||
<<"'">>, ejabberd_odbc:escape(XML), <<"', ">>,
|
|
||||||
<<"'">>, ejabberd_odbc:escape(Body), <<"', ">>,
|
|
||||||
<<"'">>, jlib:atom_to_binary(Type), <<"', ">>,
|
|
||||||
<<"'">>, ejabberd_odbc:escape(Nick), <<"');">>]) of
|
|
||||||
{updated, _} ->
|
|
||||||
{ok, ID};
|
|
||||||
Err ->
|
|
||||||
Err
|
|
||||||
end.
|
|
||||||
|
|
||||||
write_prefs(LUser, LServer, Host, Default, Always, Never) ->
|
write_prefs(LUser, LServer, Host, Default, Always, Never) ->
|
||||||
DBType = case gen_mod:db_type(Host, ?MODULE) of
|
|
||||||
odbc -> {odbc, Host};
|
|
||||||
DB -> DB
|
|
||||||
end,
|
|
||||||
Prefs = #archive_prefs{us = {LUser, LServer},
|
Prefs = #archive_prefs{us = {LUser, LServer},
|
||||||
default = Default,
|
default = Default,
|
||||||
always = Always,
|
always = Always,
|
||||||
never = Never},
|
never = Never},
|
||||||
|
Mod = gen_mod:db_mod(Host, ?MODULE),
|
||||||
cache_tab:dirty_insert(
|
cache_tab:dirty_insert(
|
||||||
archive_prefs, {LUser, LServer}, Prefs,
|
archive_prefs, {LUser, LServer}, Prefs,
|
||||||
fun() -> write_prefs(LUser, LServer, Prefs, DBType) end).
|
fun() -> Mod:write_prefs(LUser, LServer, Prefs, Host) end).
|
||||||
|
|
||||||
write_prefs(_LUser, _LServer, Prefs, mnesia) ->
|
|
||||||
mnesia:dirty_write(Prefs);
|
|
||||||
write_prefs(LUser, _LServer, #archive_prefs{default = Default,
|
|
||||||
never = Never,
|
|
||||||
always = Always},
|
|
||||||
{odbc, Host}) ->
|
|
||||||
SUser = ejabberd_odbc:escape(LUser),
|
|
||||||
SDefault = erlang:atom_to_binary(Default, utf8),
|
|
||||||
SAlways = ejabberd_odbc:encode_term(Always),
|
|
||||||
SNever = ejabberd_odbc:encode_term(Never),
|
|
||||||
case update(Host, <<"archive_prefs">>,
|
|
||||||
[<<"username">>, <<"def">>, <<"always">>, <<"never">>],
|
|
||||||
[SUser, SDefault, SAlways, SNever],
|
|
||||||
[<<"username='">>, SUser, <<"'">>]) of
|
|
||||||
{updated, _} ->
|
|
||||||
ok;
|
|
||||||
Err ->
|
|
||||||
Err
|
|
||||||
end.
|
|
||||||
|
|
||||||
get_prefs(LUser, LServer) ->
|
get_prefs(LUser, LServer) ->
|
||||||
DBType = gen_mod:db_type(LServer, ?MODULE),
|
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||||
Res = cache_tab:lookup(archive_prefs, {LUser, LServer},
|
Res = cache_tab:lookup(archive_prefs, {LUser, LServer},
|
||||||
fun() -> get_prefs(LUser, LServer,
|
fun() -> Mod:get_prefs(LUser, LServer) end),
|
||||||
DBType)
|
|
||||||
end),
|
|
||||||
case Res of
|
case Res of
|
||||||
{ok, Prefs} ->
|
{ok, Prefs} ->
|
||||||
Prefs;
|
Prefs;
|
||||||
|
@ -842,31 +719,6 @@ get_prefs(LUser, LServer) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
get_prefs(LUser, LServer, mnesia) ->
|
|
||||||
case mnesia:dirty_read(archive_prefs, {LUser, LServer}) of
|
|
||||||
[Prefs] ->
|
|
||||||
{ok, Prefs};
|
|
||||||
_ ->
|
|
||||||
error
|
|
||||||
end;
|
|
||||||
get_prefs(LUser, LServer, odbc) ->
|
|
||||||
case ejabberd_odbc:sql_query(
|
|
||||||
LServer,
|
|
||||||
[<<"select def, always, never from archive_prefs ">>,
|
|
||||||
<<"where username='">>,
|
|
||||||
ejabberd_odbc:escape(LUser), <<"';">>]) of
|
|
||||||
{selected, _, [[SDefault, SAlways, SNever]]} ->
|
|
||||||
Default = erlang:binary_to_existing_atom(SDefault, utf8),
|
|
||||||
Always = ejabberd_odbc:decode_term(SAlways),
|
|
||||||
Never = ejabberd_odbc:decode_term(SNever),
|
|
||||||
{ok, #archive_prefs{us = {LUser, LServer},
|
|
||||||
default = Default,
|
|
||||||
always = Always,
|
|
||||||
never = Never}};
|
|
||||||
_ ->
|
|
||||||
error
|
|
||||||
end.
|
|
||||||
|
|
||||||
prefs_el(Default, Always, Never, NS) ->
|
prefs_el(Default, Always, Never, NS) ->
|
||||||
Default1 = jlib:atom_to_binary(Default),
|
Default1 = jlib:atom_to_binary(Default),
|
||||||
JFun = fun(L) ->
|
JFun = fun(L) ->
|
||||||
|
@ -890,11 +742,10 @@ maybe_activate_mam(LUser, LServer) ->
|
||||||
false),
|
false),
|
||||||
case ActivateOpt of
|
case ActivateOpt of
|
||||||
true ->
|
true ->
|
||||||
|
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||||
Res = cache_tab:lookup(archive_prefs, {LUser, LServer},
|
Res = cache_tab:lookup(archive_prefs, {LUser, LServer},
|
||||||
fun() ->
|
fun() ->
|
||||||
get_prefs(LUser, LServer,
|
Mod:get_prefs(LUser, LServer)
|
||||||
gen_mod:db_type(LServer,
|
|
||||||
?MODULE))
|
|
||||||
end),
|
end),
|
||||||
case Res of
|
case Res of
|
||||||
{ok, _Prefs} ->
|
{ok, _Prefs} ->
|
||||||
|
@ -912,31 +763,22 @@ maybe_activate_mam(LUser, LServer) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
select_and_send(LServer, From, To, Start, End, With, RSM, IQ, MsgType) ->
|
select_and_send(LServer, From, To, Start, End, With, RSM, IQ, MsgType) ->
|
||||||
DBType = case gen_mod:db_type(LServer, ?MODULE) of
|
|
||||||
odbc -> {odbc, LServer};
|
|
||||||
DB -> DB
|
|
||||||
end,
|
|
||||||
select_and_send(LServer, From, To, Start, End, With, RSM, IQ,
|
|
||||||
MsgType, DBType).
|
|
||||||
|
|
||||||
select_and_send(LServer, From, To, Start, End, With, RSM, IQ, MsgType, DBType) ->
|
|
||||||
{Msgs, IsComplete, Count} = select_and_start(LServer, From, To, Start, End,
|
{Msgs, IsComplete, Count} = select_and_start(LServer, From, To, Start, End,
|
||||||
With, RSM, MsgType, DBType),
|
With, RSM, MsgType),
|
||||||
SortedMsgs = lists:keysort(2, Msgs),
|
SortedMsgs = lists:keysort(2, Msgs),
|
||||||
send(From, To, SortedMsgs, RSM, Count, IsComplete, IQ).
|
send(From, To, SortedMsgs, RSM, Count, IsComplete, IQ).
|
||||||
|
|
||||||
select_and_start(LServer, From, To, Start, End, With, RSM, MsgType, DBType) ->
|
select_and_start(LServer, From, To, Start, End, With, RSM, MsgType) ->
|
||||||
case MsgType of
|
case MsgType of
|
||||||
chat ->
|
chat ->
|
||||||
select(LServer, From, From, Start, End, With, RSM, MsgType, DBType);
|
select(LServer, From, From, Start, End, With, RSM, MsgType);
|
||||||
{groupchat, _Role, _MUCState} ->
|
{groupchat, _Role, _MUCState} ->
|
||||||
select(LServer, From, To, Start, End, With, RSM, MsgType, DBType)
|
select(LServer, From, To, Start, End, With, RSM, MsgType)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
select(_LServer, JidRequestor, JidArchive, Start, End, _With, RSM,
|
select(_LServer, JidRequestor, JidArchive, Start, End, _With, RSM,
|
||||||
{groupchat, _Role, #state{config = #config{mam = false},
|
{groupchat, _Role, #state{config = #config{mam = false},
|
||||||
history = History}} = MsgType,
|
history = History}} = MsgType) ->
|
||||||
_DBType) ->
|
|
||||||
#lqueue{len = L, queue = Q} = History,
|
#lqueue{len = L, queue = Q} = History,
|
||||||
{Msgs0, _} =
|
{Msgs0, _} =
|
||||||
lists:mapfoldl(
|
lists:mapfoldl(
|
||||||
|
@ -970,81 +812,9 @@ select(_LServer, JidRequestor, JidArchive, Start, End, _With, RSM,
|
||||||
_ ->
|
_ ->
|
||||||
{Msgs, true, L}
|
{Msgs, true, L}
|
||||||
end;
|
end;
|
||||||
select(_LServer, JidRequestor,
|
select(LServer, From, From, Start, End, With, RSM, MsgType) ->
|
||||||
#jid{luser = LUser, lserver = LServer} = JidArchive,
|
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||||
Start, End, With, RSM, MsgType, mnesia) ->
|
Mod:select(LServer, From, From, Start, End, With, RSM, MsgType).
|
||||||
MS = make_matchspec(LUser, LServer, Start, End, With),
|
|
||||||
Msgs = mnesia:dirty_select(archive_msg, MS),
|
|
||||||
SortedMsgs = lists:keysort(#archive_msg.timestamp, Msgs),
|
|
||||||
{FilteredMsgs, IsComplete} = filter_by_rsm(SortedMsgs, RSM),
|
|
||||||
Count = length(Msgs),
|
|
||||||
{lists:map(
|
|
||||||
fun(Msg) ->
|
|
||||||
{Msg#archive_msg.id,
|
|
||||||
jlib:binary_to_integer(Msg#archive_msg.id),
|
|
||||||
msg_to_el(Msg, MsgType, JidRequestor, JidArchive)}
|
|
||||||
end, FilteredMsgs), IsComplete, Count};
|
|
||||||
select(LServer, JidRequestor, #jid{luser = LUser} = JidArchive,
|
|
||||||
Start, End, With, RSM, MsgType, {odbc, Host}) ->
|
|
||||||
User = case MsgType of
|
|
||||||
chat -> LUser;
|
|
||||||
{groupchat, _Role, _MUCState} -> jid:to_string(JidArchive)
|
|
||||||
end,
|
|
||||||
{Query, CountQuery} = make_sql_query(User, LServer,
|
|
||||||
Start, End, With, RSM),
|
|
||||||
% TODO from XEP-0313 v0.2: "To conserve resources, a server MAY place a
|
|
||||||
% reasonable limit on how many stanzas may be pushed to a client in one
|
|
||||||
% request. If a query returns a number of stanzas greater than this limit
|
|
||||||
% and the client did not specify a limit using RSM then the server should
|
|
||||||
% return a policy-violation error to the client." We currently don't do this
|
|
||||||
% for v0.2 requests, but we do limit #rsm_in.max for v0.3 and newer.
|
|
||||||
case {ejabberd_odbc:sql_query(Host, Query),
|
|
||||||
ejabberd_odbc:sql_query(Host, CountQuery)} of
|
|
||||||
{{selected, _, Res}, {selected, _, [[Count]]}} ->
|
|
||||||
{Max, Direction} = case RSM of
|
|
||||||
#rsm_in{max = M, direction = D} -> {M, D};
|
|
||||||
_ -> {undefined, undefined}
|
|
||||||
end,
|
|
||||||
{Res1, IsComplete} =
|
|
||||||
if Max >= 0 andalso Max /= undefined andalso length(Res) > Max ->
|
|
||||||
if Direction == before ->
|
|
||||||
{lists:nthtail(1, Res), false};
|
|
||||||
true ->
|
|
||||||
{lists:sublist(Res, Max), false}
|
|
||||||
end;
|
|
||||||
true ->
|
|
||||||
{Res, true}
|
|
||||||
end,
|
|
||||||
{lists:flatmap(
|
|
||||||
fun([TS, XML, PeerBin, Kind, Nick]) ->
|
|
||||||
try
|
|
||||||
#xmlel{} = El = fxml_stream:parse_element(XML),
|
|
||||||
Now = usec_to_now(jlib:binary_to_integer(TS)),
|
|
||||||
PeerJid = jid:tolower(jid:from_string(PeerBin)),
|
|
||||||
T = case Kind of
|
|
||||||
<<"">> -> chat;
|
|
||||||
null -> chat;
|
|
||||||
_ -> jlib:binary_to_atom(Kind)
|
|
||||||
end,
|
|
||||||
[{TS, jlib:binary_to_integer(TS),
|
|
||||||
msg_to_el(#archive_msg{timestamp = Now,
|
|
||||||
packet = El,
|
|
||||||
type = T,
|
|
||||||
nick = Nick,
|
|
||||||
peer = PeerJid},
|
|
||||||
MsgType, JidRequestor, JidArchive)}]
|
|
||||||
catch _:Err ->
|
|
||||||
?ERROR_MSG("failed to parse data from SQL: ~p. "
|
|
||||||
"The data was: "
|
|
||||||
"timestamp = ~s, xml = ~s, "
|
|
||||||
"peer = ~s, kind = ~s, nick = ~s",
|
|
||||||
[Err, TS, XML, PeerBin, Kind, Nick]),
|
|
||||||
[]
|
|
||||||
end
|
|
||||||
end, Res1), IsComplete, jlib:binary_to_integer(Count)};
|
|
||||||
_ ->
|
|
||||||
{[], false, 0}
|
|
||||||
end.
|
|
||||||
|
|
||||||
msg_to_el(#archive_msg{timestamp = TS, packet = Pkt1, nick = Nick, peer = Peer},
|
msg_to_el(#archive_msg{timestamp = TS, packet = Pkt1, nick = Nick, peer = Peer},
|
||||||
MsgType, JidRequestor, #jid{lserver = LServer} = JidArchive) ->
|
MsgType, JidRequestor, #jid{lserver = LServer} = JidArchive) ->
|
||||||
|
@ -1160,7 +930,6 @@ send(From, To, Msgs, RSM, Count, IsComplete, #iq{sub_el = SubEl} = IQ) ->
|
||||||
ignore
|
ignore
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
make_rsm_out([], _, Count, Attrs, NS) ->
|
make_rsm_out([], _, Count, Attrs, NS) ->
|
||||||
Tag = if NS == ?NS_MAM_TMP -> <<"query">>;
|
Tag = if NS == ?NS_MAM_TMP -> <<"query">>;
|
||||||
true -> <<"fin">>
|
true -> <<"fin">>
|
||||||
|
@ -1177,32 +946,6 @@ make_rsm_out([{FirstID, _, _}|_] = Msgs, _, Count, Attrs, NS) ->
|
||||||
#rsm_out{first = FirstID, count = Count,
|
#rsm_out{first = FirstID, count = Count,
|
||||||
last = LastID})}].
|
last = LastID})}].
|
||||||
|
|
||||||
filter_by_rsm(Msgs, none) ->
|
|
||||||
{Msgs, true};
|
|
||||||
filter_by_rsm(_Msgs, #rsm_in{max = Max}) when Max < 0 ->
|
|
||||||
{[], true};
|
|
||||||
filter_by_rsm(Msgs, #rsm_in{max = Max, direction = Direction, id = ID}) ->
|
|
||||||
NewMsgs = case Direction of
|
|
||||||
aft when ID /= <<"">> ->
|
|
||||||
lists:filter(
|
|
||||||
fun(#archive_msg{id = I}) ->
|
|
||||||
?BIN_GREATER_THAN(I, ID)
|
|
||||||
end, Msgs);
|
|
||||||
before when ID /= <<"">> ->
|
|
||||||
lists:foldl(
|
|
||||||
fun(#archive_msg{id = I} = Msg, Acc)
|
|
||||||
when ?BIN_LESS_THAN(I, ID) ->
|
|
||||||
[Msg|Acc];
|
|
||||||
(_, Acc) ->
|
|
||||||
Acc
|
|
||||||
end, [], Msgs);
|
|
||||||
before when ID == <<"">> ->
|
|
||||||
lists:reverse(Msgs);
|
|
||||||
_ ->
|
|
||||||
Msgs
|
|
||||||
end,
|
|
||||||
filter_by_max(NewMsgs, Max).
|
|
||||||
|
|
||||||
filter_by_max(Msgs, undefined) ->
|
filter_by_max(Msgs, undefined) ->
|
||||||
{Msgs, true};
|
{Msgs, true};
|
||||||
filter_by_max(Msgs, Len) when is_integer(Len), Len >= 0 ->
|
filter_by_max(Msgs, Len) when is_integer(Len), Len >= 0 ->
|
||||||
|
@ -1231,126 +974,6 @@ match_rsm(Now, #rsm_in{id = ID, direction = before}) when ID /= <<"">> ->
|
||||||
match_rsm(_Now, _) ->
|
match_rsm(_Now, _) ->
|
||||||
true.
|
true.
|
||||||
|
|
||||||
make_matchspec(LUser, LServer, Start, End, {_, _, <<>>} = With) ->
|
|
||||||
ets:fun2ms(
|
|
||||||
fun(#archive_msg{timestamp = TS,
|
|
||||||
us = US,
|
|
||||||
bare_peer = BPeer} = Msg)
|
|
||||||
when Start =< TS, End >= TS,
|
|
||||||
US == {LUser, LServer},
|
|
||||||
BPeer == With ->
|
|
||||||
Msg
|
|
||||||
end);
|
|
||||||
make_matchspec(LUser, LServer, Start, End, {_, _, _} = With) ->
|
|
||||||
ets:fun2ms(
|
|
||||||
fun(#archive_msg{timestamp = TS,
|
|
||||||
us = US,
|
|
||||||
peer = Peer} = Msg)
|
|
||||||
when Start =< TS, End >= TS,
|
|
||||||
US == {LUser, LServer},
|
|
||||||
Peer == With ->
|
|
||||||
Msg
|
|
||||||
end);
|
|
||||||
make_matchspec(LUser, LServer, Start, End, none) ->
|
|
||||||
ets:fun2ms(
|
|
||||||
fun(#archive_msg{timestamp = TS,
|
|
||||||
us = US,
|
|
||||||
peer = Peer} = Msg)
|
|
||||||
when Start =< TS, End >= TS,
|
|
||||||
US == {LUser, LServer} ->
|
|
||||||
Msg
|
|
||||||
end).
|
|
||||||
|
|
||||||
make_sql_query(User, LServer, Start, End, With, RSM) ->
|
|
||||||
{Max, Direction, ID} = case RSM of
|
|
||||||
#rsm_in{} ->
|
|
||||||
{RSM#rsm_in.max,
|
|
||||||
RSM#rsm_in.direction,
|
|
||||||
RSM#rsm_in.id};
|
|
||||||
none ->
|
|
||||||
{none, none, <<>>}
|
|
||||||
end,
|
|
||||||
ODBCType = ejabberd_config:get_option(
|
|
||||||
{odbc_type, LServer},
|
|
||||||
ejabberd_odbc:opt_type(odbc_type)),
|
|
||||||
LimitClause = if is_integer(Max), Max >= 0, ODBCType /= mssql ->
|
|
||||||
[<<" limit ">>, jlib:integer_to_binary(Max+1)];
|
|
||||||
true ->
|
|
||||||
[]
|
|
||||||
end,
|
|
||||||
TopClause = if is_integer(Max), Max >= 0, ODBCType == mssql ->
|
|
||||||
[<<" TOP ">>, jlib:integer_to_binary(Max+1)];
|
|
||||||
true ->
|
|
||||||
[]
|
|
||||||
end,
|
|
||||||
WithClause = case With of
|
|
||||||
{text, <<>>} ->
|
|
||||||
[];
|
|
||||||
{text, Txt} ->
|
|
||||||
[<<" and match (txt) against ('">>,
|
|
||||||
ejabberd_odbc:escape(Txt), <<"')">>];
|
|
||||||
{_, _, <<>>} ->
|
|
||||||
[<<" and bare_peer='">>,
|
|
||||||
ejabberd_odbc:escape(jid:to_string(With)),
|
|
||||||
<<"'">>];
|
|
||||||
{_, _, _} ->
|
|
||||||
[<<" and peer='">>,
|
|
||||||
ejabberd_odbc:escape(jid:to_string(With)),
|
|
||||||
<<"'">>];
|
|
||||||
none ->
|
|
||||||
[]
|
|
||||||
end,
|
|
||||||
PageClause = case catch jlib:binary_to_integer(ID) of
|
|
||||||
I when is_integer(I), I >= 0 ->
|
|
||||||
case Direction of
|
|
||||||
before ->
|
|
||||||
[<<" AND timestamp < ">>, ID];
|
|
||||||
aft ->
|
|
||||||
[<<" AND timestamp > ">>, ID];
|
|
||||||
_ ->
|
|
||||||
[]
|
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
[]
|
|
||||||
end,
|
|
||||||
StartClause = case Start of
|
|
||||||
{_, _, _} ->
|
|
||||||
[<<" and timestamp >= ">>,
|
|
||||||
jlib:integer_to_binary(now_to_usec(Start))];
|
|
||||||
_ ->
|
|
||||||
[]
|
|
||||||
end,
|
|
||||||
EndClause = case End of
|
|
||||||
{_, _, _} ->
|
|
||||||
[<<" and timestamp <= ">>,
|
|
||||||
jlib:integer_to_binary(now_to_usec(End))];
|
|
||||||
_ ->
|
|
||||||
[]
|
|
||||||
end,
|
|
||||||
SUser = ejabberd_odbc:escape(User),
|
|
||||||
|
|
||||||
Query = [<<"SELECT ">>, TopClause, <<" timestamp, xml, peer, kind, nick"
|
|
||||||
" FROM archive WHERE username='">>,
|
|
||||||
SUser, <<"'">>, WithClause, StartClause, EndClause,
|
|
||||||
PageClause],
|
|
||||||
|
|
||||||
QueryPage =
|
|
||||||
case Direction of
|
|
||||||
before ->
|
|
||||||
% ID can be empty because of
|
|
||||||
% XEP-0059: Result Set Management
|
|
||||||
% 2.5 Requesting the Last Page in a Result Set
|
|
||||||
[<<"SELECT timestamp, xml, peer, kind, nick FROM (">>, Query,
|
|
||||||
<<" ORDER BY timestamp DESC ">>,
|
|
||||||
LimitClause, <<") AS t ORDER BY timestamp ASC;">>];
|
|
||||||
_ ->
|
|
||||||
[Query, <<" ORDER BY timestamp ASC ">>,
|
|
||||||
LimitClause, <<";">>]
|
|
||||||
end,
|
|
||||||
{QueryPage,
|
|
||||||
[<<"SELECT COUNT(*) FROM archive WHERE username='">>,
|
|
||||||
SUser, <<"'">>, WithClause, StartClause, EndClause, <<";">>]}.
|
|
||||||
|
|
||||||
now_to_usec({MSec, Sec, USec}) ->
|
now_to_usec({MSec, Sec, USec}) ->
|
||||||
(MSec*1000000 + Sec)*1000000 + USec.
|
(MSec*1000000 + Sec)*1000000 + USec.
|
||||||
|
|
||||||
|
@ -1376,28 +999,6 @@ get_jids(Els) ->
|
||||||
[]
|
[]
|
||||||
end, Els).
|
end, Els).
|
||||||
|
|
||||||
update(LServer, Table, Fields, Vals, Where) ->
|
|
||||||
UPairs = lists:zipwith(fun (A, B) ->
|
|
||||||
<<A/binary, "='", B/binary, "'">>
|
|
||||||
end,
|
|
||||||
Fields, Vals),
|
|
||||||
case ejabberd_odbc:sql_query(LServer,
|
|
||||||
[<<"update ">>, Table, <<" set ">>,
|
|
||||||
join(UPairs, <<", ">>), <<" where ">>, Where,
|
|
||||||
<<";">>])
|
|
||||||
of
|
|
||||||
{updated, 1} -> {updated, 1};
|
|
||||||
_ ->
|
|
||||||
ejabberd_odbc:sql_query(LServer,
|
|
||||||
[<<"insert into ">>, Table, <<"(">>,
|
|
||||||
join(Fields, <<", ">>), <<") values ('">>,
|
|
||||||
join(Vals, <<"', '">>), <<"');">>])
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% Almost a copy of string:join/2.
|
|
||||||
join([], _Sep) -> [];
|
|
||||||
join([H | T], Sep) -> [H, [[Sep, X] || X <- T]].
|
|
||||||
|
|
||||||
get_commands_spec() ->
|
get_commands_spec() ->
|
||||||
[#ejabberd_commands{name = delete_old_mam_messages, tags = [purge],
|
[#ejabberd_commands{name = delete_old_mam_messages, tags = [purge],
|
||||||
desc = "Delete MAM messages older than DAYS",
|
desc = "Delete MAM messages older than DAYS",
|
||||||
|
@ -1416,7 +1017,11 @@ mod_opt_type(cache_life_time) ->
|
||||||
fun (I) when is_integer(I), I > 0 -> I end;
|
fun (I) when is_integer(I), I > 0 -> I end;
|
||||||
mod_opt_type(cache_size) ->
|
mod_opt_type(cache_size) ->
|
||||||
fun (I) when is_integer(I), I > 0 -> I end;
|
fun (I) when is_integer(I), I > 0 -> I end;
|
||||||
mod_opt_type(db_type) -> fun gen_mod:v_db/1;
|
mod_opt_type(db_type) ->
|
||||||
|
fun(odbc) -> odbc;
|
||||||
|
(internal) -> mnesia;
|
||||||
|
(mnesia) -> mnesia
|
||||||
|
end;
|
||||||
mod_opt_type(default) ->
|
mod_opt_type(default) ->
|
||||||
fun (always) -> always;
|
fun (always) -> always;
|
||||||
(never) -> never;
|
(never) -> never;
|
||||||
|
|
|
@ -0,0 +1,178 @@
|
||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
|
||||||
|
%%% @copyright (C) 2016, Evgeny Khramtsov
|
||||||
|
%%% @doc
|
||||||
|
%%%
|
||||||
|
%%% @end
|
||||||
|
%%% Created : 15 Apr 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net>
|
||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
-module(mod_mam_mnesia).
|
||||||
|
|
||||||
|
-behaviour(mod_mam).
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([init/2, remove_user/2, remove_room/3, delete_old_messages/3,
|
||||||
|
extended_fields/0, store/7, write_prefs/4, get_prefs/2, select/8]).
|
||||||
|
|
||||||
|
-include_lib("stdlib/include/ms_transform.hrl").
|
||||||
|
-include("jlib.hrl").
|
||||||
|
-include("mod_mam.hrl").
|
||||||
|
|
||||||
|
-define(BIN_GREATER_THAN(A, B),
|
||||||
|
((A > B andalso byte_size(A) == byte_size(B))
|
||||||
|
orelse byte_size(A) > byte_size(B))).
|
||||||
|
-define(BIN_LESS_THAN(A, B),
|
||||||
|
((A < B andalso byte_size(A) == byte_size(B))
|
||||||
|
orelse byte_size(A) < byte_size(B))).
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% API
|
||||||
|
%%%===================================================================
|
||||||
|
init(_Host, _Opts) ->
|
||||||
|
mnesia:create_table(archive_msg,
|
||||||
|
[{disc_only_copies, [node()]},
|
||||||
|
{type, bag},
|
||||||
|
{attributes, record_info(fields, archive_msg)}]),
|
||||||
|
mnesia:create_table(archive_prefs,
|
||||||
|
[{disc_only_copies, [node()]},
|
||||||
|
{attributes, record_info(fields, archive_prefs)}]).
|
||||||
|
|
||||||
|
remove_user(LUser, LServer) ->
|
||||||
|
US = {LUser, LServer},
|
||||||
|
F = fun () ->
|
||||||
|
mnesia:delete({archive_msg, US}),
|
||||||
|
mnesia:delete({archive_prefs, US})
|
||||||
|
end,
|
||||||
|
mnesia:transaction(F).
|
||||||
|
|
||||||
|
remove_room(_LServer, LName, LHost) ->
|
||||||
|
remove_user(LName, LHost).
|
||||||
|
|
||||||
|
delete_old_messages(global, TimeStamp, Type) ->
|
||||||
|
MS = ets:fun2ms(fun(#archive_msg{timestamp = MsgTS,
|
||||||
|
type = MsgType} = Msg)
|
||||||
|
when MsgTS < TimeStamp,
|
||||||
|
MsgType == Type orelse Type == all ->
|
||||||
|
Msg
|
||||||
|
end),
|
||||||
|
OldMsgs = mnesia:dirty_select(archive_msg, MS),
|
||||||
|
lists:foreach(fun(Rec) ->
|
||||||
|
ok = mnesia:dirty_delete_object(Rec)
|
||||||
|
end, OldMsgs).
|
||||||
|
|
||||||
|
extended_fields() ->
|
||||||
|
[].
|
||||||
|
|
||||||
|
store(Pkt, _, {LUser, LServer}, Type, Peer, Nick, _Dir) ->
|
||||||
|
LPeer = {PUser, PServer, _} = jid:tolower(Peer),
|
||||||
|
TS = p1_time_compat:timestamp(),
|
||||||
|
ID = jlib:integer_to_binary(now_to_usec(TS)),
|
||||||
|
case mnesia:dirty_write(
|
||||||
|
#archive_msg{us = {LUser, LServer},
|
||||||
|
id = ID,
|
||||||
|
timestamp = TS,
|
||||||
|
peer = LPeer,
|
||||||
|
bare_peer = {PUser, PServer, <<>>},
|
||||||
|
type = Type,
|
||||||
|
nick = Nick,
|
||||||
|
packet = Pkt}) of
|
||||||
|
ok ->
|
||||||
|
{ok, ID};
|
||||||
|
Err ->
|
||||||
|
Err
|
||||||
|
end.
|
||||||
|
|
||||||
|
write_prefs(_LUser, _LServer, Prefs, _ServerHost) ->
|
||||||
|
mnesia:dirty_write(Prefs).
|
||||||
|
|
||||||
|
get_prefs(LUser, LServer) ->
|
||||||
|
case mnesia:dirty_read(archive_prefs, {LUser, LServer}) of
|
||||||
|
[Prefs] ->
|
||||||
|
{ok, Prefs};
|
||||||
|
_ ->
|
||||||
|
error
|
||||||
|
end.
|
||||||
|
|
||||||
|
select(_LServer, JidRequestor,
|
||||||
|
#jid{luser = LUser, lserver = LServer} = JidArchive,
|
||||||
|
Start, End, With, RSM, MsgType) ->
|
||||||
|
MS = make_matchspec(LUser, LServer, Start, End, With),
|
||||||
|
Msgs = mnesia:dirty_select(archive_msg, MS),
|
||||||
|
SortedMsgs = lists:keysort(#archive_msg.timestamp, Msgs),
|
||||||
|
{FilteredMsgs, IsComplete} = filter_by_rsm(SortedMsgs, RSM),
|
||||||
|
Count = length(Msgs),
|
||||||
|
{lists:map(
|
||||||
|
fun(Msg) ->
|
||||||
|
{Msg#archive_msg.id,
|
||||||
|
jlib:binary_to_integer(Msg#archive_msg.id),
|
||||||
|
mod_mam:msg_to_el(Msg, MsgType, JidRequestor, JidArchive)}
|
||||||
|
end, FilteredMsgs), IsComplete, Count}.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% Internal functions
|
||||||
|
%%%===================================================================
|
||||||
|
now_to_usec({MSec, Sec, USec}) ->
|
||||||
|
(MSec*1000000 + Sec)*1000000 + USec.
|
||||||
|
|
||||||
|
make_matchspec(LUser, LServer, Start, End, {_, _, <<>>} = With) ->
|
||||||
|
ets:fun2ms(
|
||||||
|
fun(#archive_msg{timestamp = TS,
|
||||||
|
us = US,
|
||||||
|
bare_peer = BPeer} = Msg)
|
||||||
|
when Start =< TS, End >= TS,
|
||||||
|
US == {LUser, LServer},
|
||||||
|
BPeer == With ->
|
||||||
|
Msg
|
||||||
|
end);
|
||||||
|
make_matchspec(LUser, LServer, Start, End, {_, _, _} = With) ->
|
||||||
|
ets:fun2ms(
|
||||||
|
fun(#archive_msg{timestamp = TS,
|
||||||
|
us = US,
|
||||||
|
peer = Peer} = Msg)
|
||||||
|
when Start =< TS, End >= TS,
|
||||||
|
US == {LUser, LServer},
|
||||||
|
Peer == With ->
|
||||||
|
Msg
|
||||||
|
end);
|
||||||
|
make_matchspec(LUser, LServer, Start, End, none) ->
|
||||||
|
ets:fun2ms(
|
||||||
|
fun(#archive_msg{timestamp = TS,
|
||||||
|
us = US,
|
||||||
|
peer = Peer} = Msg)
|
||||||
|
when Start =< TS, End >= TS,
|
||||||
|
US == {LUser, LServer} ->
|
||||||
|
Msg
|
||||||
|
end).
|
||||||
|
|
||||||
|
filter_by_rsm(Msgs, none) ->
|
||||||
|
{Msgs, true};
|
||||||
|
filter_by_rsm(_Msgs, #rsm_in{max = Max}) when Max < 0 ->
|
||||||
|
{[], true};
|
||||||
|
filter_by_rsm(Msgs, #rsm_in{max = Max, direction = Direction, id = ID}) ->
|
||||||
|
NewMsgs = case Direction of
|
||||||
|
aft when ID /= <<"">> ->
|
||||||
|
lists:filter(
|
||||||
|
fun(#archive_msg{id = I}) ->
|
||||||
|
?BIN_GREATER_THAN(I, ID)
|
||||||
|
end, Msgs);
|
||||||
|
before when ID /= <<"">> ->
|
||||||
|
lists:foldl(
|
||||||
|
fun(#archive_msg{id = I} = Msg, Acc)
|
||||||
|
when ?BIN_LESS_THAN(I, ID) ->
|
||||||
|
[Msg|Acc];
|
||||||
|
(_, Acc) ->
|
||||||
|
Acc
|
||||||
|
end, [], Msgs);
|
||||||
|
before when ID == <<"">> ->
|
||||||
|
lists:reverse(Msgs);
|
||||||
|
_ ->
|
||||||
|
Msgs
|
||||||
|
end,
|
||||||
|
filter_by_max(NewMsgs, Max).
|
||||||
|
|
||||||
|
filter_by_max(Msgs, undefined) ->
|
||||||
|
{Msgs, true};
|
||||||
|
filter_by_max(Msgs, Len) when is_integer(Len), Len >= 0 ->
|
||||||
|
{lists:sublist(Msgs, Len), length(Msgs) =< Len};
|
||||||
|
filter_by_max(_Msgs, _Junk) ->
|
||||||
|
{[], true}.
|
|
@ -0,0 +1,309 @@
|
||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
|
||||||
|
%%% @copyright (C) 2016, Evgeny Khramtsov
|
||||||
|
%%% @doc
|
||||||
|
%%%
|
||||||
|
%%% @end
|
||||||
|
%%% Created : 15 Apr 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net>
|
||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
-module(mod_mam_sql).
|
||||||
|
|
||||||
|
-behaviour(mod_mam).
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([init/2, remove_user/2, remove_room/3, delete_old_messages/3,
|
||||||
|
extended_fields/0, store/7, write_prefs/4, get_prefs/2, select/8]).
|
||||||
|
|
||||||
|
-include_lib("stdlib/include/ms_transform.hrl").
|
||||||
|
-include("jlib.hrl").
|
||||||
|
-include("mod_mam.hrl").
|
||||||
|
-include("logger.hrl").
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% API
|
||||||
|
%%%===================================================================
|
||||||
|
init(_Host, _Opts) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
remove_user(LUser, LServer) ->
|
||||||
|
SUser = ejabberd_odbc:escape(LUser),
|
||||||
|
ejabberd_odbc:sql_query(
|
||||||
|
LServer,
|
||||||
|
[<<"delete from archive where username='">>, SUser, <<"';">>]),
|
||||||
|
ejabberd_odbc:sql_query(
|
||||||
|
LServer,
|
||||||
|
[<<"delete from archive_prefs where username='">>, SUser, <<"';">>]).
|
||||||
|
|
||||||
|
remove_room(LServer, LName, LHost) ->
|
||||||
|
LUser = jid:to_string({LName, LHost, <<>>}),
|
||||||
|
remove_user(LUser, LServer).
|
||||||
|
|
||||||
|
delete_old_messages(ServerHost, TimeStamp, Type) ->
|
||||||
|
TypeClause = if Type == all -> <<"">>;
|
||||||
|
true -> [<<" and kind='">>, jlib:atom_to_binary(Type), <<"'">>]
|
||||||
|
end,
|
||||||
|
TS = integer_to_binary(now_to_usec(TimeStamp)),
|
||||||
|
ejabberd_odbc:sql_query(
|
||||||
|
ServerHost, [<<"delete from archive where timestamp<">>,
|
||||||
|
TS, TypeClause, <<";">>]),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
extended_fields() ->
|
||||||
|
[#xmlel{name = <<"field">>,
|
||||||
|
attrs = [{<<"type">>, <<"text-single">>},
|
||||||
|
{<<"var">>, <<"withtext">>}]}].
|
||||||
|
|
||||||
|
store(Pkt, LServer, {LUser, LHost}, Type, Peer, Nick, _Dir) ->
|
||||||
|
TSinteger = p1_time_compat:system_time(micro_seconds),
|
||||||
|
ID = TS = jlib:integer_to_binary(TSinteger),
|
||||||
|
SUser = case Type of
|
||||||
|
chat -> LUser;
|
||||||
|
groupchat -> jid:to_string({LUser, LHost, <<>>})
|
||||||
|
end,
|
||||||
|
BarePeer = jid:to_string(
|
||||||
|
jid:tolower(
|
||||||
|
jid:remove_resource(Peer))),
|
||||||
|
LPeer = jid:to_string(
|
||||||
|
jid:tolower(Peer)),
|
||||||
|
XML = fxml:element_to_binary(Pkt),
|
||||||
|
Body = fxml:get_subtag_cdata(Pkt, <<"body">>),
|
||||||
|
case ejabberd_odbc:sql_query(
|
||||||
|
LServer,
|
||||||
|
[<<"insert into archive (username, timestamp, "
|
||||||
|
"peer, bare_peer, xml, txt, kind, nick) values (">>,
|
||||||
|
<<"'">>, ejabberd_odbc:escape(SUser), <<"', ">>,
|
||||||
|
<<"'">>, TS, <<"', ">>,
|
||||||
|
<<"'">>, ejabberd_odbc:escape(LPeer), <<"', ">>,
|
||||||
|
<<"'">>, ejabberd_odbc:escape(BarePeer), <<"', ">>,
|
||||||
|
<<"'">>, ejabberd_odbc:escape(XML), <<"', ">>,
|
||||||
|
<<"'">>, ejabberd_odbc:escape(Body), <<"', ">>,
|
||||||
|
<<"'">>, jlib:atom_to_binary(Type), <<"', ">>,
|
||||||
|
<<"'">>, ejabberd_odbc:escape(Nick), <<"');">>]) of
|
||||||
|
{updated, _} ->
|
||||||
|
{ok, ID};
|
||||||
|
Err ->
|
||||||
|
Err
|
||||||
|
end.
|
||||||
|
|
||||||
|
write_prefs(LUser, _LServer, #archive_prefs{default = Default,
|
||||||
|
never = Never,
|
||||||
|
always = Always},
|
||||||
|
ServerHost) ->
|
||||||
|
SUser = ejabberd_odbc:escape(LUser),
|
||||||
|
SDefault = erlang:atom_to_binary(Default, utf8),
|
||||||
|
SAlways = ejabberd_odbc:encode_term(Always),
|
||||||
|
SNever = ejabberd_odbc:encode_term(Never),
|
||||||
|
case update(ServerHost, <<"archive_prefs">>,
|
||||||
|
[<<"username">>, <<"def">>, <<"always">>, <<"never">>],
|
||||||
|
[SUser, SDefault, SAlways, SNever],
|
||||||
|
[<<"username='">>, SUser, <<"'">>]) of
|
||||||
|
{updated, _} ->
|
||||||
|
ok;
|
||||||
|
Err ->
|
||||||
|
Err
|
||||||
|
end.
|
||||||
|
|
||||||
|
get_prefs(LUser, LServer) ->
|
||||||
|
case ejabberd_odbc:sql_query(
|
||||||
|
LServer,
|
||||||
|
[<<"select def, always, never from archive_prefs ">>,
|
||||||
|
<<"where username='">>,
|
||||||
|
ejabberd_odbc:escape(LUser), <<"';">>]) of
|
||||||
|
{selected, _, [[SDefault, SAlways, SNever]]} ->
|
||||||
|
Default = erlang:binary_to_existing_atom(SDefault, utf8),
|
||||||
|
Always = ejabberd_odbc:decode_term(SAlways),
|
||||||
|
Never = ejabberd_odbc:decode_term(SNever),
|
||||||
|
{ok, #archive_prefs{us = {LUser, LServer},
|
||||||
|
default = Default,
|
||||||
|
always = Always,
|
||||||
|
never = Never}};
|
||||||
|
_ ->
|
||||||
|
error
|
||||||
|
end.
|
||||||
|
|
||||||
|
select(LServer, JidRequestor, #jid{luser = LUser} = JidArchive,
|
||||||
|
Start, End, With, RSM, MsgType) ->
|
||||||
|
User = case MsgType of
|
||||||
|
chat -> LUser;
|
||||||
|
{groupchat, _Role, _MUCState} -> jid:to_string(JidArchive)
|
||||||
|
end,
|
||||||
|
{Query, CountQuery} = make_sql_query(User, LServer,
|
||||||
|
Start, End, With, RSM),
|
||||||
|
% TODO from XEP-0313 v0.2: "To conserve resources, a server MAY place a
|
||||||
|
% reasonable limit on how many stanzas may be pushed to a client in one
|
||||||
|
% request. If a query returns a number of stanzas greater than this limit
|
||||||
|
% and the client did not specify a limit using RSM then the server should
|
||||||
|
% return a policy-violation error to the client." We currently don't do this
|
||||||
|
% for v0.2 requests, but we do limit #rsm_in.max for v0.3 and newer.
|
||||||
|
case {ejabberd_odbc:sql_query(LServer, Query),
|
||||||
|
ejabberd_odbc:sql_query(LServer, CountQuery)} of
|
||||||
|
{{selected, _, Res}, {selected, _, [[Count]]}} ->
|
||||||
|
{Max, Direction} = case RSM of
|
||||||
|
#rsm_in{max = M, direction = D} -> {M, D};
|
||||||
|
_ -> {undefined, undefined}
|
||||||
|
end,
|
||||||
|
{Res1, IsComplete} =
|
||||||
|
if Max >= 0 andalso Max /= undefined andalso length(Res) > Max ->
|
||||||
|
if Direction == before ->
|
||||||
|
{lists:nthtail(1, Res), false};
|
||||||
|
true ->
|
||||||
|
{lists:sublist(Res, Max), false}
|
||||||
|
end;
|
||||||
|
true ->
|
||||||
|
{Res, true}
|
||||||
|
end,
|
||||||
|
{lists:flatmap(
|
||||||
|
fun([TS, XML, PeerBin, Kind, Nick]) ->
|
||||||
|
try
|
||||||
|
#xmlel{} = El = fxml_stream:parse_element(XML),
|
||||||
|
Now = usec_to_now(jlib:binary_to_integer(TS)),
|
||||||
|
PeerJid = jid:tolower(jid:from_string(PeerBin)),
|
||||||
|
T = case Kind of
|
||||||
|
<<"">> -> chat;
|
||||||
|
null -> chat;
|
||||||
|
_ -> jlib:binary_to_atom(Kind)
|
||||||
|
end,
|
||||||
|
[{TS, jlib:binary_to_integer(TS),
|
||||||
|
mod_mam:msg_to_el(#archive_msg{timestamp = Now,
|
||||||
|
packet = El,
|
||||||
|
type = T,
|
||||||
|
nick = Nick,
|
||||||
|
peer = PeerJid},
|
||||||
|
MsgType, JidRequestor, JidArchive)}]
|
||||||
|
catch _:Err ->
|
||||||
|
?ERROR_MSG("failed to parse data from SQL: ~p. "
|
||||||
|
"The data was: "
|
||||||
|
"timestamp = ~s, xml = ~s, "
|
||||||
|
"peer = ~s, kind = ~s, nick = ~s",
|
||||||
|
[Err, TS, XML, PeerBin, Kind, Nick]),
|
||||||
|
[]
|
||||||
|
end
|
||||||
|
end, Res1), IsComplete, jlib:binary_to_integer(Count)};
|
||||||
|
_ ->
|
||||||
|
{[], false, 0}
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% Internal functions
|
||||||
|
%%%===================================================================
|
||||||
|
now_to_usec({MSec, Sec, USec}) ->
|
||||||
|
(MSec*1000000 + Sec)*1000000 + USec.
|
||||||
|
|
||||||
|
usec_to_now(Int) ->
|
||||||
|
Secs = Int div 1000000,
|
||||||
|
USec = Int rem 1000000,
|
||||||
|
MSec = Secs div 1000000,
|
||||||
|
Sec = Secs rem 1000000,
|
||||||
|
{MSec, Sec, USec}.
|
||||||
|
|
||||||
|
make_sql_query(User, LServer, Start, End, With, RSM) ->
|
||||||
|
{Max, Direction, ID} = case RSM of
|
||||||
|
#rsm_in{} ->
|
||||||
|
{RSM#rsm_in.max,
|
||||||
|
RSM#rsm_in.direction,
|
||||||
|
RSM#rsm_in.id};
|
||||||
|
none ->
|
||||||
|
{none, none, <<>>}
|
||||||
|
end,
|
||||||
|
ODBCType = ejabberd_config:get_option(
|
||||||
|
{odbc_type, LServer},
|
||||||
|
ejabberd_odbc:opt_type(odbc_type)),
|
||||||
|
LimitClause = if is_integer(Max), Max >= 0, ODBCType /= mssql ->
|
||||||
|
[<<" limit ">>, jlib:integer_to_binary(Max+1)];
|
||||||
|
true ->
|
||||||
|
[]
|
||||||
|
end,
|
||||||
|
TopClause = if is_integer(Max), Max >= 0, ODBCType == mssql ->
|
||||||
|
[<<" TOP ">>, jlib:integer_to_binary(Max+1)];
|
||||||
|
true ->
|
||||||
|
[]
|
||||||
|
end,
|
||||||
|
WithClause = case With of
|
||||||
|
{text, <<>>} ->
|
||||||
|
[];
|
||||||
|
{text, Txt} ->
|
||||||
|
[<<" and match (txt) against ('">>,
|
||||||
|
ejabberd_odbc:escape(Txt), <<"')">>];
|
||||||
|
{_, _, <<>>} ->
|
||||||
|
[<<" and bare_peer='">>,
|
||||||
|
ejabberd_odbc:escape(jid:to_string(With)),
|
||||||
|
<<"'">>];
|
||||||
|
{_, _, _} ->
|
||||||
|
[<<" and peer='">>,
|
||||||
|
ejabberd_odbc:escape(jid:to_string(With)),
|
||||||
|
<<"'">>];
|
||||||
|
none ->
|
||||||
|
[]
|
||||||
|
end,
|
||||||
|
PageClause = case catch jlib:binary_to_integer(ID) of
|
||||||
|
I when is_integer(I), I >= 0 ->
|
||||||
|
case Direction of
|
||||||
|
before ->
|
||||||
|
[<<" AND timestamp < ">>, ID];
|
||||||
|
aft ->
|
||||||
|
[<<" AND timestamp > ">>, ID];
|
||||||
|
_ ->
|
||||||
|
[]
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
[]
|
||||||
|
end,
|
||||||
|
StartClause = case Start of
|
||||||
|
{_, _, _} ->
|
||||||
|
[<<" and timestamp >= ">>,
|
||||||
|
jlib:integer_to_binary(now_to_usec(Start))];
|
||||||
|
_ ->
|
||||||
|
[]
|
||||||
|
end,
|
||||||
|
EndClause = case End of
|
||||||
|
{_, _, _} ->
|
||||||
|
[<<" and timestamp <= ">>,
|
||||||
|
jlib:integer_to_binary(now_to_usec(End))];
|
||||||
|
_ ->
|
||||||
|
[]
|
||||||
|
end,
|
||||||
|
SUser = ejabberd_odbc:escape(User),
|
||||||
|
|
||||||
|
Query = [<<"SELECT ">>, TopClause, <<" timestamp, xml, peer, kind, nick"
|
||||||
|
" FROM archive WHERE username='">>,
|
||||||
|
SUser, <<"'">>, WithClause, StartClause, EndClause,
|
||||||
|
PageClause],
|
||||||
|
|
||||||
|
QueryPage =
|
||||||
|
case Direction of
|
||||||
|
before ->
|
||||||
|
% ID can be empty because of
|
||||||
|
% XEP-0059: Result Set Management
|
||||||
|
% 2.5 Requesting the Last Page in a Result Set
|
||||||
|
[<<"SELECT timestamp, xml, peer, kind, nick FROM (">>, Query,
|
||||||
|
<<" ORDER BY timestamp DESC ">>,
|
||||||
|
LimitClause, <<") AS t ORDER BY timestamp ASC;">>];
|
||||||
|
_ ->
|
||||||
|
[Query, <<" ORDER BY timestamp ASC ">>,
|
||||||
|
LimitClause, <<";">>]
|
||||||
|
end,
|
||||||
|
{QueryPage,
|
||||||
|
[<<"SELECT COUNT(*) FROM archive WHERE username='">>,
|
||||||
|
SUser, <<"'">>, WithClause, StartClause, EndClause, <<";">>]}.
|
||||||
|
|
||||||
|
update(LServer, Table, Fields, Vals, Where) ->
|
||||||
|
UPairs = lists:zipwith(fun (A, B) ->
|
||||||
|
<<A/binary, "='", B/binary, "'">>
|
||||||
|
end,
|
||||||
|
Fields, Vals),
|
||||||
|
case ejabberd_odbc:sql_query(LServer,
|
||||||
|
[<<"update ">>, Table, <<" set ">>,
|
||||||
|
join(UPairs, <<", ">>), <<" where ">>, Where,
|
||||||
|
<<";">>])
|
||||||
|
of
|
||||||
|
{updated, 1} -> {updated, 1};
|
||||||
|
_ ->
|
||||||
|
ejabberd_odbc:sql_query(LServer,
|
||||||
|
[<<"insert into ">>, Table, <<"(">>,
|
||||||
|
join(Fields, <<", ">>), <<") values ('">>,
|
||||||
|
join(Vals, <<"', '">>), <<"');">>])
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% Almost a copy of string:join/2.
|
||||||
|
join([], _Sep) -> [];
|
||||||
|
join([H | T], Sep) -> [H, [[Sep, X] || X <- T]].
|
|
@ -154,15 +154,7 @@ forget_room(ServerHost, Host, Name) ->
|
||||||
remove_room_mam(LServer, Host, Name) ->
|
remove_room_mam(LServer, Host, Name) ->
|
||||||
case gen_mod:is_loaded(LServer, mod_mam) of
|
case gen_mod:is_loaded(LServer, mod_mam) of
|
||||||
true ->
|
true ->
|
||||||
U = jid:nodeprep(Name),
|
mod_mam:remove_room(LServer, Name, Host);
|
||||||
S = jid:nameprep(Host),
|
|
||||||
DBType = gen_mod:db_type(LServer, mod_mam),
|
|
||||||
if DBType == odbc ->
|
|
||||||
mod_mam:remove_user(jid:to_string({U, S, <<>>}),
|
|
||||||
LServer, DBType);
|
|
||||||
true ->
|
|
||||||
mod_mam:remove_user(U, S, DBType)
|
|
||||||
end;
|
|
||||||
false ->
|
false ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
Loading…
Reference in New Issue