From 52571e8624b0a48797ca24ec05e47a5c9477a12a Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Fri, 15 Apr 2016 15:11:31 +0300 Subject: [PATCH] Clean mod_mam.erl from DB specific code --- src/mod_mam.erl | 541 ++++++----------------------------------- src/mod_mam_mnesia.erl | 178 ++++++++++++++ src/mod_mam_sql.erl | 309 +++++++++++++++++++++++ src/mod_muc.erl | 10 +- 4 files changed, 561 insertions(+), 477 deletions(-) create mode 100644 src/mod_mam_mnesia.erl create mode 100644 src/mod_mam_sql.erl diff --git a/src/mod_mam.erl b/src/mod_mam.erl index 862adee99..098ee8967 100644 --- a/src/mod_mam.erl +++ b/src/mod_mam.erl @@ -35,41 +35,37 @@ -export([user_send_packet/4, user_receive_packet/5, process_iq_v0_2/3, process_iq_v0_3/3, disco_sm_features/5, - remove_user/2, remove_user/3, mod_opt_type/1, muc_process_iq/4, + remove_user/2, remove_room/3, mod_opt_type/1, muc_process_iq/4, muc_filter_message/5, message_is_archived/5, delete_old_messages/2, - get_commands_spec/0]). + get_commands_spec/0, msg_to_el/4]). --include_lib("stdlib/include/ms_transform.hrl"). -include("jlib.hrl"). -include("logger.hrl"). -include("mod_muc_room.hrl"). -include("ejabberd_commands.hrl"). +-include("mod_mam.hrl"). -define(DEF_PAGE_SIZE, 50). -define(MAX_PAGE_SIZE, 250). --define(BIN_GREATER_THAN(A, B), - ((A > B andalso byte_size(A) == byte_size(B)) - orelse byte_size(A) > byte_size(B))). --define(BIN_LESS_THAN(A, B), - ((A < B andalso byte_size(A) == byte_size(B)) - orelse byte_size(A) < byte_size(B))). - --record(archive_msg, - {us = {<<"">>, <<"">>} :: {binary(), binary()} | '$2', - id = <<>> :: binary() | '_', - timestamp = p1_time_compat:timestamp() :: erlang:timestamp() | '_' | '$1', - peer = {<<"">>, <<"">>, <<"">>} :: ljid() | '_' | '$3' | undefined, - bare_peer = {<<"">>, <<"">>, <<"">>} :: ljid() | '_' | '$3', - packet = #xmlel{} :: xmlel() | '_', - nick = <<"">> :: binary(), - type = chat :: chat | groupchat}). - --record(archive_prefs, - {us = {<<"">>, <<"">>} :: {binary(), binary()}, - default = never :: never | always | roster, - always = [] :: [ljid()], - never = [] :: [ljid()]}). +-callback init(binary(), gen_mod:opts()) -> any(). +-callback remove_user(binary(), binary()) -> any(). +-callback remove_room(binary(), binary(), binary()) -> any(). +-callback delete_old_messages(binary() | global, + erlang:timestamp(), + all | chat | groupchat) -> any(). +-callback extended_fields() -> [xmlel()]. +-callback store(xmlel(), binary(), {binary(), binary()}, chat | groupchat, + jid(), binary(), recv | send) -> {ok, binary()} | any(). +-callback write_prefs(binary(), binary(), #archive_prefs{}, binary()) -> ok | any(). +-callback get_prefs(binary(), binary()) -> {ok, #archive_prefs{}} | error. +-callback select(binary(), jid(), jid(), + none | erlang:timestamp(), + none | erlang:timestamp(), + none | ljid() | {text, binary()}, + none | #rsm_in{}, + chat | groupchat) -> + {[{binary(), non_neg_integer(), xmlel()}], boolean(), non_neg_integer()}. %%%=================================================================== %%% API @@ -77,9 +73,9 @@ start(Host, Opts) -> IQDisc = gen_mod:get_opt(iqdisc, Opts, fun gen_iq_handler:check_type/1, one_queue), - DBType = gen_mod:db_type(Host, Opts), - init_db(DBType, Host), - init_cache(DBType, Opts), + Mod = gen_mod:db_mod(Host, Opts, ?MODULE), + Mod:init(Host, Opts), + init_cache(Opts), gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_MAM_TMP, ?MODULE, process_iq_v0_2, IQDisc), gen_iq_handler:add_iq_handler(ejabberd_sm, Host, @@ -120,18 +116,7 @@ start(Host, Opts) -> ejabberd_commands:register_commands(get_commands_spec()), ok. -init_db(mnesia, _Host) -> - mnesia:create_table(archive_msg, - [{disc_only_copies, [node()]}, - {type, bag}, - {attributes, record_info(fields, archive_msg)}]), - mnesia:create_table(archive_prefs, - [{disc_only_copies, [node()]}, - {attributes, record_info(fields, archive_prefs)}]); -init_db(_, _) -> - ok. - -init_cache(_DBType, Opts) -> +init_cache(Opts) -> MaxSize = gen_mod:get_opt(cache_size, Opts, fun(I) when is_integer(I), I>0 -> I end, 1000), @@ -179,24 +164,14 @@ stop(Host) -> remove_user(User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), - remove_user(LUser, LServer, - gen_mod:db_type(LServer, ?MODULE)). + Mod = gen_mod:db_mod(LServer, ?MODULE), + Mod:remove_user(LUser, LServer). -remove_user(LUser, LServer, mnesia) -> - US = {LUser, LServer}, - F = fun () -> - mnesia:delete({archive_msg, US}), - mnesia:delete({archive_prefs, US}) - end, - mnesia:transaction(F); -remove_user(LUser, LServer, odbc) -> - SUser = ejabberd_odbc:escape(LUser), - ejabberd_odbc:sql_query( - LServer, - [<<"delete from archive where username='">>, SUser, <<"';">>]), - ejabberd_odbc:sql_query( - LServer, - [<<"delete from archive_prefs where username='">>, SUser, <<"';">>]). +remove_room(LServer, Name, Host) -> + LName = jid:nodeprep(Name), + LHost = jid:nameprep(Host), + Mod = gen_mod:db_mod(LServer, ?MODULE), + Mod:remove_room(LServer, LName, LHost). user_receive_packet(Pkt, C2SState, JID, Peer, To) -> LUser = JID#jid.luser, @@ -343,10 +318,10 @@ message_is_archived(false, C2SState, Peer, if_enabled -> get_prefs(LUser, LServer); on_request -> - DBType = gen_mod:db_type(LServer, ?MODULE), + Mod = gen_mod:db_mod(LServer, ?MODULE), cache_tab:lookup(archive_prefs, {LUser, LServer}, fun() -> - get_prefs(LUser, LServer, DBType) + Mod:get_prefs(LUser, LServer) end); never -> error @@ -365,21 +340,19 @@ delete_old_messages(TypeBin, Days) when TypeBin == <<"chat">>; Diff = Days * 24 * 60 * 60 * 1000000, TimeStamp = usec_to_now(p1_time_compat:system_time(micro_seconds) - Diff), Type = jlib:binary_to_atom(TypeBin), - {Results, _} = - lists:foldl(fun(Host, {Results, MnesiaDone}) -> - case {gen_mod:db_type(Host, ?MODULE), MnesiaDone} of - {mnesia, true} -> - {Results, true}; - {mnesia, false} -> - Res = delete_old_messages(TimeStamp, Type, - global, mnesia), - {[Res|Results], true}; - {DBType, _} -> - Res = delete_old_messages(TimeStamp, Type, - Host, DBType), - {[Res|Results], MnesiaDone} - end - end, {[], false}, ?MYHOSTS), + DBTypes = lists:usort( + lists:map( + fun(Host) -> + case gen_mod:db_type(Host, ?MODULE) of + odbc -> {odbc, Host}; + Other -> {Other, global} + end + end, ?MYHOSTS)), + Results = lists:map( + fun({DBType, ServerHost}) -> + Mod = gen_mod:db_mod(DBType, ?MODULE), + Mod:delete_old_messages(ServerHost, TimeStamp, Type) + end, DBTypes), case lists:filter(fun(Res) -> Res /= ok end, Results) of [] -> ok; [NotOk|_] -> NotOk @@ -387,21 +360,6 @@ delete_old_messages(TypeBin, Days) when TypeBin == <<"chat">>; delete_old_messages(_TypeBin, _Days) -> unsupported_type. -delete_old_messages(TimeStamp, Type, global, mnesia) -> - MS = ets:fun2ms(fun(#archive_msg{timestamp = MsgTS, - type = MsgType} = Msg) - when MsgTS < TimeStamp, - MsgType == Type orelse Type == all -> - Msg - end), - OldMsgs = mnesia:dirty_select(archive_msg, MS), - lists:foreach(fun(Rec) -> - ok = mnesia:dirty_delete_object(Rec) - end, OldMsgs); -delete_old_messages(_TimeStamp, _Type, _Host, _DBType) -> - %% TODO - not_implemented. - %%%=================================================================== %%% Internal functions %%%=================================================================== @@ -427,15 +385,9 @@ process_iq(LServer, #iq{sub_el = #xmlel{attrs = Attrs}} = IQ) -> #xmlel{name = <<"field">>, attrs = [{<<"type">>, <<"text-single">>}, {<<"var">>, <<"end">>}]}], - Fields = case gen_mod:db_type(LServer, ?MODULE) of - odbc -> - WithText = #xmlel{name = <<"field">>, - attrs = [{<<"type">>, <<"text-single">>}, - {<<"var">>, <<"withtext">>}]}, - [WithText|CommonFields]; - _ -> - CommonFields - end, + Mod = gen_mod:db_mod(LServer, ?MODULE), + ExtendedFields = Mod:extended_fields(), + Fields = ExtendedFields ++ CommonFields, Form = #xmlel{name = <<"x">>, attrs = [{<<"xmlns">>, ?NS_XDATA}, {<<"type">>, <<"form">>}], children = Fields}, @@ -715,8 +667,8 @@ store_msg(C2SState, Pkt, LUser, LServer, Peer, Dir) -> case should_archive_peer(C2SState, Prefs, Peer) of true -> US = {LUser, LServer}, - store(Pkt, LServer, US, chat, Peer, <<"">>, Dir, - gen_mod:db_type(LServer, ?MODULE)); + Mod = gen_mod:db_mod(LServer, ?MODULE), + Mod:store(Pkt, LServer, US, chat, Peer, <<"">>, Dir); false -> pass end. @@ -726,101 +678,26 @@ store_muc(MUCState, Pkt, RoomJID, Peer, Nick) -> true -> LServer = MUCState#state.server_host, {U, S, _} = jid:tolower(RoomJID), - store(Pkt, LServer, {U, S}, groupchat, Peer, Nick, recv, - gen_mod:db_type(LServer, ?MODULE)); + Mod = gen_mod:db_mod(LServer, ?MODULE), + Mod:store(Pkt, LServer, {U, S}, groupchat, Peer, Nick, recv); false -> pass end. -store(Pkt, _, {LUser, LServer}, Type, Peer, Nick, _Dir, mnesia) -> - LPeer = {PUser, PServer, _} = jid:tolower(Peer), - TS = p1_time_compat:timestamp(), - ID = jlib:integer_to_binary(now_to_usec(TS)), - case mnesia:dirty_write( - #archive_msg{us = {LUser, LServer}, - id = ID, - timestamp = TS, - peer = LPeer, - bare_peer = {PUser, PServer, <<>>}, - type = Type, - nick = Nick, - packet = Pkt}) of - ok -> - {ok, ID}; - Err -> - Err - end; -store(Pkt, LServer, {LUser, LHost}, Type, Peer, Nick, _Dir, odbc) -> - TSinteger = p1_time_compat:system_time(micro_seconds), - ID = TS = jlib:integer_to_binary(TSinteger), - SUser = case Type of - chat -> LUser; - groupchat -> jid:to_string({LUser, LHost, <<>>}) - end, - BarePeer = jid:to_string( - jid:tolower( - jid:remove_resource(Peer))), - LPeer = jid:to_string( - jid:tolower(Peer)), - XML = fxml:element_to_binary(Pkt), - Body = fxml:get_subtag_cdata(Pkt, <<"body">>), - case ejabberd_odbc:sql_query( - LServer, - [<<"insert into archive (username, timestamp, " - "peer, bare_peer, xml, txt, kind, nick) values (">>, - <<"'">>, ejabberd_odbc:escape(SUser), <<"', ">>, - <<"'">>, TS, <<"', ">>, - <<"'">>, ejabberd_odbc:escape(LPeer), <<"', ">>, - <<"'">>, ejabberd_odbc:escape(BarePeer), <<"', ">>, - <<"'">>, ejabberd_odbc:escape(XML), <<"', ">>, - <<"'">>, ejabberd_odbc:escape(Body), <<"', ">>, - <<"'">>, jlib:atom_to_binary(Type), <<"', ">>, - <<"'">>, ejabberd_odbc:escape(Nick), <<"');">>]) of - {updated, _} -> - {ok, ID}; - Err -> - Err - end. - write_prefs(LUser, LServer, Host, Default, Always, Never) -> - DBType = case gen_mod:db_type(Host, ?MODULE) of - odbc -> {odbc, Host}; - DB -> DB - end, Prefs = #archive_prefs{us = {LUser, LServer}, default = Default, always = Always, never = Never}, + Mod = gen_mod:db_mod(Host, ?MODULE), cache_tab:dirty_insert( archive_prefs, {LUser, LServer}, Prefs, - fun() -> write_prefs(LUser, LServer, Prefs, DBType) end). - -write_prefs(_LUser, _LServer, Prefs, mnesia) -> - mnesia:dirty_write(Prefs); -write_prefs(LUser, _LServer, #archive_prefs{default = Default, - never = Never, - always = Always}, - {odbc, Host}) -> - SUser = ejabberd_odbc:escape(LUser), - SDefault = erlang:atom_to_binary(Default, utf8), - SAlways = ejabberd_odbc:encode_term(Always), - SNever = ejabberd_odbc:encode_term(Never), - case update(Host, <<"archive_prefs">>, - [<<"username">>, <<"def">>, <<"always">>, <<"never">>], - [SUser, SDefault, SAlways, SNever], - [<<"username='">>, SUser, <<"'">>]) of - {updated, _} -> - ok; - Err -> - Err - end. + fun() -> Mod:write_prefs(LUser, LServer, Prefs, Host) end). get_prefs(LUser, LServer) -> - DBType = gen_mod:db_type(LServer, ?MODULE), + Mod = gen_mod:db_mod(LServer, ?MODULE), Res = cache_tab:lookup(archive_prefs, {LUser, LServer}, - fun() -> get_prefs(LUser, LServer, - DBType) - end), + fun() -> Mod:get_prefs(LUser, LServer) end), case Res of {ok, Prefs} -> Prefs; @@ -842,31 +719,6 @@ get_prefs(LUser, LServer) -> end end. -get_prefs(LUser, LServer, mnesia) -> - case mnesia:dirty_read(archive_prefs, {LUser, LServer}) of - [Prefs] -> - {ok, Prefs}; - _ -> - error - end; -get_prefs(LUser, LServer, odbc) -> - case ejabberd_odbc:sql_query( - LServer, - [<<"select def, always, never from archive_prefs ">>, - <<"where username='">>, - ejabberd_odbc:escape(LUser), <<"';">>]) of - {selected, _, [[SDefault, SAlways, SNever]]} -> - Default = erlang:binary_to_existing_atom(SDefault, utf8), - Always = ejabberd_odbc:decode_term(SAlways), - Never = ejabberd_odbc:decode_term(SNever), - {ok, #archive_prefs{us = {LUser, LServer}, - default = Default, - always = Always, - never = Never}}; - _ -> - error - end. - prefs_el(Default, Always, Never, NS) -> Default1 = jlib:atom_to_binary(Default), JFun = fun(L) -> @@ -890,11 +742,10 @@ maybe_activate_mam(LUser, LServer) -> false), case ActivateOpt of true -> + Mod = gen_mod:db_mod(LServer, ?MODULE), Res = cache_tab:lookup(archive_prefs, {LUser, LServer}, fun() -> - get_prefs(LUser, LServer, - gen_mod:db_type(LServer, - ?MODULE)) + Mod:get_prefs(LUser, LServer) end), case Res of {ok, _Prefs} -> @@ -912,31 +763,22 @@ maybe_activate_mam(LUser, LServer) -> end. select_and_send(LServer, From, To, Start, End, With, RSM, IQ, MsgType) -> - DBType = case gen_mod:db_type(LServer, ?MODULE) of - odbc -> {odbc, LServer}; - DB -> DB - end, - select_and_send(LServer, From, To, Start, End, With, RSM, IQ, - MsgType, DBType). - -select_and_send(LServer, From, To, Start, End, With, RSM, IQ, MsgType, DBType) -> {Msgs, IsComplete, Count} = select_and_start(LServer, From, To, Start, End, - With, RSM, MsgType, DBType), + With, RSM, MsgType), SortedMsgs = lists:keysort(2, Msgs), send(From, To, SortedMsgs, RSM, Count, IsComplete, IQ). -select_and_start(LServer, From, To, Start, End, With, RSM, MsgType, DBType) -> +select_and_start(LServer, From, To, Start, End, With, RSM, MsgType) -> case MsgType of chat -> - select(LServer, From, From, Start, End, With, RSM, MsgType, DBType); + select(LServer, From, From, Start, End, With, RSM, MsgType); {groupchat, _Role, _MUCState} -> - select(LServer, From, To, Start, End, With, RSM, MsgType, DBType) + select(LServer, From, To, Start, End, With, RSM, MsgType) end. select(_LServer, JidRequestor, JidArchive, Start, End, _With, RSM, {groupchat, _Role, #state{config = #config{mam = false}, - history = History}} = MsgType, - _DBType) -> + history = History}} = MsgType) -> #lqueue{len = L, queue = Q} = History, {Msgs0, _} = lists:mapfoldl( @@ -970,81 +812,9 @@ select(_LServer, JidRequestor, JidArchive, Start, End, _With, RSM, _ -> {Msgs, true, L} end; -select(_LServer, JidRequestor, - #jid{luser = LUser, lserver = LServer} = JidArchive, - Start, End, With, RSM, MsgType, mnesia) -> - MS = make_matchspec(LUser, LServer, Start, End, With), - Msgs = mnesia:dirty_select(archive_msg, MS), - SortedMsgs = lists:keysort(#archive_msg.timestamp, Msgs), - {FilteredMsgs, IsComplete} = filter_by_rsm(SortedMsgs, RSM), - Count = length(Msgs), - {lists:map( - fun(Msg) -> - {Msg#archive_msg.id, - jlib:binary_to_integer(Msg#archive_msg.id), - msg_to_el(Msg, MsgType, JidRequestor, JidArchive)} - end, FilteredMsgs), IsComplete, Count}; -select(LServer, JidRequestor, #jid{luser = LUser} = JidArchive, - Start, End, With, RSM, MsgType, {odbc, Host}) -> - User = case MsgType of - chat -> LUser; - {groupchat, _Role, _MUCState} -> jid:to_string(JidArchive) - end, - {Query, CountQuery} = make_sql_query(User, LServer, - Start, End, With, RSM), - % TODO from XEP-0313 v0.2: "To conserve resources, a server MAY place a - % reasonable limit on how many stanzas may be pushed to a client in one - % request. If a query returns a number of stanzas greater than this limit - % and the client did not specify a limit using RSM then the server should - % return a policy-violation error to the client." We currently don't do this - % for v0.2 requests, but we do limit #rsm_in.max for v0.3 and newer. - case {ejabberd_odbc:sql_query(Host, Query), - ejabberd_odbc:sql_query(Host, CountQuery)} of - {{selected, _, Res}, {selected, _, [[Count]]}} -> - {Max, Direction} = case RSM of - #rsm_in{max = M, direction = D} -> {M, D}; - _ -> {undefined, undefined} - end, - {Res1, IsComplete} = - if Max >= 0 andalso Max /= undefined andalso length(Res) > Max -> - if Direction == before -> - {lists:nthtail(1, Res), false}; - true -> - {lists:sublist(Res, Max), false} - end; - true -> - {Res, true} - end, - {lists:flatmap( - fun([TS, XML, PeerBin, Kind, Nick]) -> - try - #xmlel{} = El = fxml_stream:parse_element(XML), - Now = usec_to_now(jlib:binary_to_integer(TS)), - PeerJid = jid:tolower(jid:from_string(PeerBin)), - T = case Kind of - <<"">> -> chat; - null -> chat; - _ -> jlib:binary_to_atom(Kind) - end, - [{TS, jlib:binary_to_integer(TS), - msg_to_el(#archive_msg{timestamp = Now, - packet = El, - type = T, - nick = Nick, - peer = PeerJid}, - MsgType, JidRequestor, JidArchive)}] - catch _:Err -> - ?ERROR_MSG("failed to parse data from SQL: ~p. " - "The data was: " - "timestamp = ~s, xml = ~s, " - "peer = ~s, kind = ~s, nick = ~s", - [Err, TS, XML, PeerBin, Kind, Nick]), - [] - end - end, Res1), IsComplete, jlib:binary_to_integer(Count)}; - _ -> - {[], false, 0} - end. +select(LServer, From, From, Start, End, With, RSM, MsgType) -> + Mod = gen_mod:db_mod(LServer, ?MODULE), + Mod:select(LServer, From, From, Start, End, With, RSM, MsgType). msg_to_el(#archive_msg{timestamp = TS, packet = Pkt1, nick = Nick, peer = Peer}, MsgType, JidRequestor, #jid{lserver = LServer} = JidArchive) -> @@ -1160,7 +930,6 @@ send(From, To, Msgs, RSM, Count, IsComplete, #iq{sub_el = SubEl} = IQ) -> ignore end. - make_rsm_out([], _, Count, Attrs, NS) -> Tag = if NS == ?NS_MAM_TMP -> <<"query">>; true -> <<"fin">> @@ -1177,32 +946,6 @@ make_rsm_out([{FirstID, _, _}|_] = Msgs, _, Count, Attrs, NS) -> #rsm_out{first = FirstID, count = Count, last = LastID})}]. -filter_by_rsm(Msgs, none) -> - {Msgs, true}; -filter_by_rsm(_Msgs, #rsm_in{max = Max}) when Max < 0 -> - {[], true}; -filter_by_rsm(Msgs, #rsm_in{max = Max, direction = Direction, id = ID}) -> - NewMsgs = case Direction of - aft when ID /= <<"">> -> - lists:filter( - fun(#archive_msg{id = I}) -> - ?BIN_GREATER_THAN(I, ID) - end, Msgs); - before when ID /= <<"">> -> - lists:foldl( - fun(#archive_msg{id = I} = Msg, Acc) - when ?BIN_LESS_THAN(I, ID) -> - [Msg|Acc]; - (_, Acc) -> - Acc - end, [], Msgs); - before when ID == <<"">> -> - lists:reverse(Msgs); - _ -> - Msgs - end, - filter_by_max(NewMsgs, Max). - filter_by_max(Msgs, undefined) -> {Msgs, true}; filter_by_max(Msgs, Len) when is_integer(Len), Len >= 0 -> @@ -1231,126 +974,6 @@ match_rsm(Now, #rsm_in{id = ID, direction = before}) when ID /= <<"">> -> match_rsm(_Now, _) -> true. -make_matchspec(LUser, LServer, Start, End, {_, _, <<>>} = With) -> - ets:fun2ms( - fun(#archive_msg{timestamp = TS, - us = US, - bare_peer = BPeer} = Msg) - when Start =< TS, End >= TS, - US == {LUser, LServer}, - BPeer == With -> - Msg - end); -make_matchspec(LUser, LServer, Start, End, {_, _, _} = With) -> - ets:fun2ms( - fun(#archive_msg{timestamp = TS, - us = US, - peer = Peer} = Msg) - when Start =< TS, End >= TS, - US == {LUser, LServer}, - Peer == With -> - Msg - end); -make_matchspec(LUser, LServer, Start, End, none) -> - ets:fun2ms( - fun(#archive_msg{timestamp = TS, - us = US, - peer = Peer} = Msg) - when Start =< TS, End >= TS, - US == {LUser, LServer} -> - Msg - end). - -make_sql_query(User, LServer, Start, End, With, RSM) -> - {Max, Direction, ID} = case RSM of - #rsm_in{} -> - {RSM#rsm_in.max, - RSM#rsm_in.direction, - RSM#rsm_in.id}; - none -> - {none, none, <<>>} - end, - ODBCType = ejabberd_config:get_option( - {odbc_type, LServer}, - ejabberd_odbc:opt_type(odbc_type)), - LimitClause = if is_integer(Max), Max >= 0, ODBCType /= mssql -> - [<<" limit ">>, jlib:integer_to_binary(Max+1)]; - true -> - [] - end, - TopClause = if is_integer(Max), Max >= 0, ODBCType == mssql -> - [<<" TOP ">>, jlib:integer_to_binary(Max+1)]; - true -> - [] - end, - WithClause = case With of - {text, <<>>} -> - []; - {text, Txt} -> - [<<" and match (txt) against ('">>, - ejabberd_odbc:escape(Txt), <<"')">>]; - {_, _, <<>>} -> - [<<" and bare_peer='">>, - ejabberd_odbc:escape(jid:to_string(With)), - <<"'">>]; - {_, _, _} -> - [<<" and peer='">>, - ejabberd_odbc:escape(jid:to_string(With)), - <<"'">>]; - none -> - [] - end, - PageClause = case catch jlib:binary_to_integer(ID) of - I when is_integer(I), I >= 0 -> - case Direction of - before -> - [<<" AND timestamp < ">>, ID]; - aft -> - [<<" AND timestamp > ">>, ID]; - _ -> - [] - end; - _ -> - [] - end, - StartClause = case Start of - {_, _, _} -> - [<<" and timestamp >= ">>, - jlib:integer_to_binary(now_to_usec(Start))]; - _ -> - [] - end, - EndClause = case End of - {_, _, _} -> - [<<" and timestamp <= ">>, - jlib:integer_to_binary(now_to_usec(End))]; - _ -> - [] - end, - SUser = ejabberd_odbc:escape(User), - - Query = [<<"SELECT ">>, TopClause, <<" timestamp, xml, peer, kind, nick" - " FROM archive WHERE username='">>, - SUser, <<"'">>, WithClause, StartClause, EndClause, - PageClause], - - QueryPage = - case Direction of - before -> - % ID can be empty because of - % XEP-0059: Result Set Management - % 2.5 Requesting the Last Page in a Result Set - [<<"SELECT timestamp, xml, peer, kind, nick FROM (">>, Query, - <<" ORDER BY timestamp DESC ">>, - LimitClause, <<") AS t ORDER BY timestamp ASC;">>]; - _ -> - [Query, <<" ORDER BY timestamp ASC ">>, - LimitClause, <<";">>] - end, - {QueryPage, - [<<"SELECT COUNT(*) FROM archive WHERE username='">>, - SUser, <<"'">>, WithClause, StartClause, EndClause, <<";">>]}. - now_to_usec({MSec, Sec, USec}) -> (MSec*1000000 + Sec)*1000000 + USec. @@ -1376,28 +999,6 @@ get_jids(Els) -> [] end, Els). -update(LServer, Table, Fields, Vals, Where) -> - UPairs = lists:zipwith(fun (A, B) -> - <> - end, - Fields, Vals), - case ejabberd_odbc:sql_query(LServer, - [<<"update ">>, Table, <<" set ">>, - join(UPairs, <<", ">>), <<" where ">>, Where, - <<";">>]) - of - {updated, 1} -> {updated, 1}; - _ -> - ejabberd_odbc:sql_query(LServer, - [<<"insert into ">>, Table, <<"(">>, - join(Fields, <<", ">>), <<") values ('">>, - join(Vals, <<"', '">>), <<"');">>]) - end. - -%% Almost a copy of string:join/2. -join([], _Sep) -> []; -join([H | T], Sep) -> [H, [[Sep, X] || X <- T]]. - get_commands_spec() -> [#ejabberd_commands{name = delete_old_mam_messages, tags = [purge], desc = "Delete MAM messages older than DAYS", @@ -1416,7 +1017,11 @@ mod_opt_type(cache_life_time) -> fun (I) when is_integer(I), I > 0 -> I end; mod_opt_type(cache_size) -> fun (I) when is_integer(I), I > 0 -> I end; -mod_opt_type(db_type) -> fun gen_mod:v_db/1; +mod_opt_type(db_type) -> + fun(odbc) -> odbc; + (internal) -> mnesia; + (mnesia) -> mnesia + end; mod_opt_type(default) -> fun (always) -> always; (never) -> never; diff --git a/src/mod_mam_mnesia.erl b/src/mod_mam_mnesia.erl new file mode 100644 index 000000000..007ef5eba --- /dev/null +++ b/src/mod_mam_mnesia.erl @@ -0,0 +1,178 @@ +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov +%%% @copyright (C) 2016, Evgeny Khramtsov +%%% @doc +%%% +%%% @end +%%% Created : 15 Apr 2016 by Evgeny Khramtsov +%%%------------------------------------------------------------------- +-module(mod_mam_mnesia). + +-behaviour(mod_mam). + +%% API +-export([init/2, remove_user/2, remove_room/3, delete_old_messages/3, + extended_fields/0, store/7, write_prefs/4, get_prefs/2, select/8]). + +-include_lib("stdlib/include/ms_transform.hrl"). +-include("jlib.hrl"). +-include("mod_mam.hrl"). + +-define(BIN_GREATER_THAN(A, B), + ((A > B andalso byte_size(A) == byte_size(B)) + orelse byte_size(A) > byte_size(B))). +-define(BIN_LESS_THAN(A, B), + ((A < B andalso byte_size(A) == byte_size(B)) + orelse byte_size(A) < byte_size(B))). + +%%%=================================================================== +%%% API +%%%=================================================================== +init(_Host, _Opts) -> + mnesia:create_table(archive_msg, + [{disc_only_copies, [node()]}, + {type, bag}, + {attributes, record_info(fields, archive_msg)}]), + mnesia:create_table(archive_prefs, + [{disc_only_copies, [node()]}, + {attributes, record_info(fields, archive_prefs)}]). + +remove_user(LUser, LServer) -> + US = {LUser, LServer}, + F = fun () -> + mnesia:delete({archive_msg, US}), + mnesia:delete({archive_prefs, US}) + end, + mnesia:transaction(F). + +remove_room(_LServer, LName, LHost) -> + remove_user(LName, LHost). + +delete_old_messages(global, TimeStamp, Type) -> + MS = ets:fun2ms(fun(#archive_msg{timestamp = MsgTS, + type = MsgType} = Msg) + when MsgTS < TimeStamp, + MsgType == Type orelse Type == all -> + Msg + end), + OldMsgs = mnesia:dirty_select(archive_msg, MS), + lists:foreach(fun(Rec) -> + ok = mnesia:dirty_delete_object(Rec) + end, OldMsgs). + +extended_fields() -> + []. + +store(Pkt, _, {LUser, LServer}, Type, Peer, Nick, _Dir) -> + LPeer = {PUser, PServer, _} = jid:tolower(Peer), + TS = p1_time_compat:timestamp(), + ID = jlib:integer_to_binary(now_to_usec(TS)), + case mnesia:dirty_write( + #archive_msg{us = {LUser, LServer}, + id = ID, + timestamp = TS, + peer = LPeer, + bare_peer = {PUser, PServer, <<>>}, + type = Type, + nick = Nick, + packet = Pkt}) of + ok -> + {ok, ID}; + Err -> + Err + end. + +write_prefs(_LUser, _LServer, Prefs, _ServerHost) -> + mnesia:dirty_write(Prefs). + +get_prefs(LUser, LServer) -> + case mnesia:dirty_read(archive_prefs, {LUser, LServer}) of + [Prefs] -> + {ok, Prefs}; + _ -> + error + end. + +select(_LServer, JidRequestor, + #jid{luser = LUser, lserver = LServer} = JidArchive, + Start, End, With, RSM, MsgType) -> + MS = make_matchspec(LUser, LServer, Start, End, With), + Msgs = mnesia:dirty_select(archive_msg, MS), + SortedMsgs = lists:keysort(#archive_msg.timestamp, Msgs), + {FilteredMsgs, IsComplete} = filter_by_rsm(SortedMsgs, RSM), + Count = length(Msgs), + {lists:map( + fun(Msg) -> + {Msg#archive_msg.id, + jlib:binary_to_integer(Msg#archive_msg.id), + mod_mam:msg_to_el(Msg, MsgType, JidRequestor, JidArchive)} + end, FilteredMsgs), IsComplete, Count}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +now_to_usec({MSec, Sec, USec}) -> + (MSec*1000000 + Sec)*1000000 + USec. + +make_matchspec(LUser, LServer, Start, End, {_, _, <<>>} = With) -> + ets:fun2ms( + fun(#archive_msg{timestamp = TS, + us = US, + bare_peer = BPeer} = Msg) + when Start =< TS, End >= TS, + US == {LUser, LServer}, + BPeer == With -> + Msg + end); +make_matchspec(LUser, LServer, Start, End, {_, _, _} = With) -> + ets:fun2ms( + fun(#archive_msg{timestamp = TS, + us = US, + peer = Peer} = Msg) + when Start =< TS, End >= TS, + US == {LUser, LServer}, + Peer == With -> + Msg + end); +make_matchspec(LUser, LServer, Start, End, none) -> + ets:fun2ms( + fun(#archive_msg{timestamp = TS, + us = US, + peer = Peer} = Msg) + when Start =< TS, End >= TS, + US == {LUser, LServer} -> + Msg + end). + +filter_by_rsm(Msgs, none) -> + {Msgs, true}; +filter_by_rsm(_Msgs, #rsm_in{max = Max}) when Max < 0 -> + {[], true}; +filter_by_rsm(Msgs, #rsm_in{max = Max, direction = Direction, id = ID}) -> + NewMsgs = case Direction of + aft when ID /= <<"">> -> + lists:filter( + fun(#archive_msg{id = I}) -> + ?BIN_GREATER_THAN(I, ID) + end, Msgs); + before when ID /= <<"">> -> + lists:foldl( + fun(#archive_msg{id = I} = Msg, Acc) + when ?BIN_LESS_THAN(I, ID) -> + [Msg|Acc]; + (_, Acc) -> + Acc + end, [], Msgs); + before when ID == <<"">> -> + lists:reverse(Msgs); + _ -> + Msgs + end, + filter_by_max(NewMsgs, Max). + +filter_by_max(Msgs, undefined) -> + {Msgs, true}; +filter_by_max(Msgs, Len) when is_integer(Len), Len >= 0 -> + {lists:sublist(Msgs, Len), length(Msgs) =< Len}; +filter_by_max(_Msgs, _Junk) -> + {[], true}. diff --git a/src/mod_mam_sql.erl b/src/mod_mam_sql.erl new file mode 100644 index 000000000..1f24de317 --- /dev/null +++ b/src/mod_mam_sql.erl @@ -0,0 +1,309 @@ +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov +%%% @copyright (C) 2016, Evgeny Khramtsov +%%% @doc +%%% +%%% @end +%%% Created : 15 Apr 2016 by Evgeny Khramtsov +%%%------------------------------------------------------------------- +-module(mod_mam_sql). + +-behaviour(mod_mam). + +%% API +-export([init/2, remove_user/2, remove_room/3, delete_old_messages/3, + extended_fields/0, store/7, write_prefs/4, get_prefs/2, select/8]). + +-include_lib("stdlib/include/ms_transform.hrl"). +-include("jlib.hrl"). +-include("mod_mam.hrl"). +-include("logger.hrl"). + +%%%=================================================================== +%%% API +%%%=================================================================== +init(_Host, _Opts) -> + ok. + +remove_user(LUser, LServer) -> + SUser = ejabberd_odbc:escape(LUser), + ejabberd_odbc:sql_query( + LServer, + [<<"delete from archive where username='">>, SUser, <<"';">>]), + ejabberd_odbc:sql_query( + LServer, + [<<"delete from archive_prefs where username='">>, SUser, <<"';">>]). + +remove_room(LServer, LName, LHost) -> + LUser = jid:to_string({LName, LHost, <<>>}), + remove_user(LUser, LServer). + +delete_old_messages(ServerHost, TimeStamp, Type) -> + TypeClause = if Type == all -> <<"">>; + true -> [<<" and kind='">>, jlib:atom_to_binary(Type), <<"'">>] + end, + TS = integer_to_binary(now_to_usec(TimeStamp)), + ejabberd_odbc:sql_query( + ServerHost, [<<"delete from archive where timestamp<">>, + TS, TypeClause, <<";">>]), + ok. + +extended_fields() -> + [#xmlel{name = <<"field">>, + attrs = [{<<"type">>, <<"text-single">>}, + {<<"var">>, <<"withtext">>}]}]. + +store(Pkt, LServer, {LUser, LHost}, Type, Peer, Nick, _Dir) -> + TSinteger = p1_time_compat:system_time(micro_seconds), + ID = TS = jlib:integer_to_binary(TSinteger), + SUser = case Type of + chat -> LUser; + groupchat -> jid:to_string({LUser, LHost, <<>>}) + end, + BarePeer = jid:to_string( + jid:tolower( + jid:remove_resource(Peer))), + LPeer = jid:to_string( + jid:tolower(Peer)), + XML = fxml:element_to_binary(Pkt), + Body = fxml:get_subtag_cdata(Pkt, <<"body">>), + case ejabberd_odbc:sql_query( + LServer, + [<<"insert into archive (username, timestamp, " + "peer, bare_peer, xml, txt, kind, nick) values (">>, + <<"'">>, ejabberd_odbc:escape(SUser), <<"', ">>, + <<"'">>, TS, <<"', ">>, + <<"'">>, ejabberd_odbc:escape(LPeer), <<"', ">>, + <<"'">>, ejabberd_odbc:escape(BarePeer), <<"', ">>, + <<"'">>, ejabberd_odbc:escape(XML), <<"', ">>, + <<"'">>, ejabberd_odbc:escape(Body), <<"', ">>, + <<"'">>, jlib:atom_to_binary(Type), <<"', ">>, + <<"'">>, ejabberd_odbc:escape(Nick), <<"');">>]) of + {updated, _} -> + {ok, ID}; + Err -> + Err + end. + +write_prefs(LUser, _LServer, #archive_prefs{default = Default, + never = Never, + always = Always}, + ServerHost) -> + SUser = ejabberd_odbc:escape(LUser), + SDefault = erlang:atom_to_binary(Default, utf8), + SAlways = ejabberd_odbc:encode_term(Always), + SNever = ejabberd_odbc:encode_term(Never), + case update(ServerHost, <<"archive_prefs">>, + [<<"username">>, <<"def">>, <<"always">>, <<"never">>], + [SUser, SDefault, SAlways, SNever], + [<<"username='">>, SUser, <<"'">>]) of + {updated, _} -> + ok; + Err -> + Err + end. + +get_prefs(LUser, LServer) -> + case ejabberd_odbc:sql_query( + LServer, + [<<"select def, always, never from archive_prefs ">>, + <<"where username='">>, + ejabberd_odbc:escape(LUser), <<"';">>]) of + {selected, _, [[SDefault, SAlways, SNever]]} -> + Default = erlang:binary_to_existing_atom(SDefault, utf8), + Always = ejabberd_odbc:decode_term(SAlways), + Never = ejabberd_odbc:decode_term(SNever), + {ok, #archive_prefs{us = {LUser, LServer}, + default = Default, + always = Always, + never = Never}}; + _ -> + error + end. + +select(LServer, JidRequestor, #jid{luser = LUser} = JidArchive, + Start, End, With, RSM, MsgType) -> + User = case MsgType of + chat -> LUser; + {groupchat, _Role, _MUCState} -> jid:to_string(JidArchive) + end, + {Query, CountQuery} = make_sql_query(User, LServer, + Start, End, With, RSM), + % TODO from XEP-0313 v0.2: "To conserve resources, a server MAY place a + % reasonable limit on how many stanzas may be pushed to a client in one + % request. If a query returns a number of stanzas greater than this limit + % and the client did not specify a limit using RSM then the server should + % return a policy-violation error to the client." We currently don't do this + % for v0.2 requests, but we do limit #rsm_in.max for v0.3 and newer. + case {ejabberd_odbc:sql_query(LServer, Query), + ejabberd_odbc:sql_query(LServer, CountQuery)} of + {{selected, _, Res}, {selected, _, [[Count]]}} -> + {Max, Direction} = case RSM of + #rsm_in{max = M, direction = D} -> {M, D}; + _ -> {undefined, undefined} + end, + {Res1, IsComplete} = + if Max >= 0 andalso Max /= undefined andalso length(Res) > Max -> + if Direction == before -> + {lists:nthtail(1, Res), false}; + true -> + {lists:sublist(Res, Max), false} + end; + true -> + {Res, true} + end, + {lists:flatmap( + fun([TS, XML, PeerBin, Kind, Nick]) -> + try + #xmlel{} = El = fxml_stream:parse_element(XML), + Now = usec_to_now(jlib:binary_to_integer(TS)), + PeerJid = jid:tolower(jid:from_string(PeerBin)), + T = case Kind of + <<"">> -> chat; + null -> chat; + _ -> jlib:binary_to_atom(Kind) + end, + [{TS, jlib:binary_to_integer(TS), + mod_mam:msg_to_el(#archive_msg{timestamp = Now, + packet = El, + type = T, + nick = Nick, + peer = PeerJid}, + MsgType, JidRequestor, JidArchive)}] + catch _:Err -> + ?ERROR_MSG("failed to parse data from SQL: ~p. " + "The data was: " + "timestamp = ~s, xml = ~s, " + "peer = ~s, kind = ~s, nick = ~s", + [Err, TS, XML, PeerBin, Kind, Nick]), + [] + end + end, Res1), IsComplete, jlib:binary_to_integer(Count)}; + _ -> + {[], false, 0} + end. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +now_to_usec({MSec, Sec, USec}) -> + (MSec*1000000 + Sec)*1000000 + USec. + +usec_to_now(Int) -> + Secs = Int div 1000000, + USec = Int rem 1000000, + MSec = Secs div 1000000, + Sec = Secs rem 1000000, + {MSec, Sec, USec}. + +make_sql_query(User, LServer, Start, End, With, RSM) -> + {Max, Direction, ID} = case RSM of + #rsm_in{} -> + {RSM#rsm_in.max, + RSM#rsm_in.direction, + RSM#rsm_in.id}; + none -> + {none, none, <<>>} + end, + ODBCType = ejabberd_config:get_option( + {odbc_type, LServer}, + ejabberd_odbc:opt_type(odbc_type)), + LimitClause = if is_integer(Max), Max >= 0, ODBCType /= mssql -> + [<<" limit ">>, jlib:integer_to_binary(Max+1)]; + true -> + [] + end, + TopClause = if is_integer(Max), Max >= 0, ODBCType == mssql -> + [<<" TOP ">>, jlib:integer_to_binary(Max+1)]; + true -> + [] + end, + WithClause = case With of + {text, <<>>} -> + []; + {text, Txt} -> + [<<" and match (txt) against ('">>, + ejabberd_odbc:escape(Txt), <<"')">>]; + {_, _, <<>>} -> + [<<" and bare_peer='">>, + ejabberd_odbc:escape(jid:to_string(With)), + <<"'">>]; + {_, _, _} -> + [<<" and peer='">>, + ejabberd_odbc:escape(jid:to_string(With)), + <<"'">>]; + none -> + [] + end, + PageClause = case catch jlib:binary_to_integer(ID) of + I when is_integer(I), I >= 0 -> + case Direction of + before -> + [<<" AND timestamp < ">>, ID]; + aft -> + [<<" AND timestamp > ">>, ID]; + _ -> + [] + end; + _ -> + [] + end, + StartClause = case Start of + {_, _, _} -> + [<<" and timestamp >= ">>, + jlib:integer_to_binary(now_to_usec(Start))]; + _ -> + [] + end, + EndClause = case End of + {_, _, _} -> + [<<" and timestamp <= ">>, + jlib:integer_to_binary(now_to_usec(End))]; + _ -> + [] + end, + SUser = ejabberd_odbc:escape(User), + + Query = [<<"SELECT ">>, TopClause, <<" timestamp, xml, peer, kind, nick" + " FROM archive WHERE username='">>, + SUser, <<"'">>, WithClause, StartClause, EndClause, + PageClause], + + QueryPage = + case Direction of + before -> + % ID can be empty because of + % XEP-0059: Result Set Management + % 2.5 Requesting the Last Page in a Result Set + [<<"SELECT timestamp, xml, peer, kind, nick FROM (">>, Query, + <<" ORDER BY timestamp DESC ">>, + LimitClause, <<") AS t ORDER BY timestamp ASC;">>]; + _ -> + [Query, <<" ORDER BY timestamp ASC ">>, + LimitClause, <<";">>] + end, + {QueryPage, + [<<"SELECT COUNT(*) FROM archive WHERE username='">>, + SUser, <<"'">>, WithClause, StartClause, EndClause, <<";">>]}. + +update(LServer, Table, Fields, Vals, Where) -> + UPairs = lists:zipwith(fun (A, B) -> + <> + end, + Fields, Vals), + case ejabberd_odbc:sql_query(LServer, + [<<"update ">>, Table, <<" set ">>, + join(UPairs, <<", ">>), <<" where ">>, Where, + <<";">>]) + of + {updated, 1} -> {updated, 1}; + _ -> + ejabberd_odbc:sql_query(LServer, + [<<"insert into ">>, Table, <<"(">>, + join(Fields, <<", ">>), <<") values ('">>, + join(Vals, <<"', '">>), <<"');">>]) + end. + +%% Almost a copy of string:join/2. +join([], _Sep) -> []; +join([H | T], Sep) -> [H, [[Sep, X] || X <- T]]. diff --git a/src/mod_muc.erl b/src/mod_muc.erl index 95631e25c..6aa186318 100644 --- a/src/mod_muc.erl +++ b/src/mod_muc.erl @@ -154,15 +154,7 @@ forget_room(ServerHost, Host, Name) -> remove_room_mam(LServer, Host, Name) -> case gen_mod:is_loaded(LServer, mod_mam) of true -> - U = jid:nodeprep(Name), - S = jid:nameprep(Host), - DBType = gen_mod:db_type(LServer, mod_mam), - if DBType == odbc -> - mod_mam:remove_user(jid:to_string({U, S, <<>>}), - LServer, DBType); - true -> - mod_mam:remove_user(U, S, DBType) - end; + mod_mam:remove_room(LServer, Name, Host); false -> ok end.