diff --git a/src/ejabberd_admin.erl b/src/ejabberd_admin.erl index cbad89a9c..ccd72a61f 100644 --- a/src/ejabberd_admin.erl +++ b/src/ejabberd_admin.erl @@ -715,13 +715,26 @@ delete_old_messages(Days) -> delete_old_messages_batch(Server, Days, BatchSize, Rate) -> LServer = jid:nameprep(Server), Mod = gen_mod:db_mod(LServer, mod_offline), - case ejabberd_batch:register_task({spool, LServer}, 0, Rate, {LServer, Days, BatchSize}, - fun({L, Da, B} = S) -> - case Mod:remove_old_messages_batch(L, Da, B) of - {ok, Count} -> - {ok, S, Count}; - {error, _} = E -> - E + case ejabberd_batch:register_task({spool, LServer}, 0, Rate, {LServer, Days, BatchSize, none}, + fun({L, Da, B, IS} = S) -> + case {erlang:function_exported(Mod, remove_old_messages_batch, 3), + erlang:function_exported(Mod, remove_old_messages_batch, 4)} of + {true, _} -> + case Mod:remove_old_messages_batch(L, Da, B) of + {ok, Count} -> + {ok, S, Count}; + {error, _} = E -> + E + end; + {_, true} -> + case Mod:remove_old_messages_batch(L, Da, B, IS) of + {ok, IS2, Count} -> + {ok, {L, Da, B, IS2}, Count}; + {error, _} = E -> + E + end; + _ -> + {error, not_implemented_for_backend} end end) of ok -> diff --git a/src/mod_mam.erl b/src/mod_mam.erl index fd292736b..0288e2d52 100644 --- a/src/mod_mam.erl +++ b/src/mod_mam.erl @@ -578,13 +578,27 @@ delete_old_messages_batch(Server, Type, Days, BatchSize, Rate) when Type == <<"c TypeA = misc:binary_to_atom(Type), LServer = jid:nameprep(Server), Mod = gen_mod:db_mod(LServer, ?MODULE), - case ejabberd_batch:register_task({mam, LServer}, 0, Rate, {LServer, TypeA, TimeStamp, BatchSize}, - fun({L, T, St, B} = S) -> - case Mod:delete_old_messages_batch(L, St, T, B) of - {ok, Count} -> - {ok, S, Count}; - {error, _} = E -> - E + + case ejabberd_batch:register_task({mam, LServer}, 0, Rate, {LServer, TypeA, TimeStamp, BatchSize, none}, + fun({L, T, St, B, IS} = S) -> + case {erlang:function_exported(Mod, remove_old_messages_batch, 4), + erlang:function_exported(Mod, remove_old_messages_batch, 5)} of + {true, _} -> + case Mod:delete_old_messages_batch(L, St, T, B) of + {ok, Count} -> + {ok, S, Count}; + {error, _} = E -> + E + end; + {_, true} -> + case Mod:remove_old_messages_batch(L, St, T, B, IS) of + {ok, IS2, Count} -> + {ok, {L, St, T, B, IS2}, Count}; + {error, _} = E -> + E + end; + _ -> + {error, not_implemented_for_backend} end end) of ok -> diff --git a/src/mod_mam_mnesia.erl b/src/mod_mam_mnesia.erl index dc5898fca..0fd459c67 100644 --- a/src/mod_mam_mnesia.erl +++ b/src/mod_mam_mnesia.erl @@ -29,7 +29,7 @@ %% API -export([init/2, remove_user/2, remove_room/3, delete_old_messages/3, extended_fields/0, store/8, write_prefs/4, get_prefs/2, select/6, remove_from_archive/3, - is_empty_for_user/2, is_empty_for_room/3]). + is_empty_for_user/2, is_empty_for_room/3, remove_old_messages_batch/5]). -include_lib("stdlib/include/ms_transform.hrl"). -include_lib("xmpp/include/xmpp.hrl"). @@ -131,6 +131,43 @@ delete_old_user_messages(User, TimeStamp, Type) -> Err end. +delete_batch('$end_of_table', _LServer, _TS, _Type, Num) -> + {Num, '$end_of_table'}; +delete_batch(LastUS, _LServer, _TS, _Type, 0) -> + {0, LastUS}; +delete_batch(none, LServer, TS, Type, Num) -> + delete_batch(mnesia:first(archive_msg), LServer, TS, Type, Num); +delete_batch({_, LServer2} = LastUS, LServer, TS, Type, Num) when LServer /= LServer2 -> + delete_batch(mnesia:next(archive_msg, LastUS), LServer, TS, Type, Num); +delete_batch(LastUS, LServer, TS, Type, Num) -> + Left = + lists:foldl( + fun(_, 0) -> + 0; + (#archive_msg{timestamp = TS2, type = Type2} = O, Num2) when TS2 < TS, (Type == all orelse Type == Type2) -> + mnesia:delete_object(O), + Num2 - 1; + (_, Num2) -> + Num2 + end, Num, mnesia:wread({archive_msg, LastUS})), + case Left of + 0 -> {0, LastUS}; + _ -> delete_batch(mnesia:next(archive_msg, LastUS), LServer, TS, Type, Left) + end. + +remove_old_messages_batch(LServer, TimeStamp, Type, Batch, LastUS) -> + R = mnesia:transaction( + fun() -> + {Num, NextUS} = delete_batch(LastUS, LServer, TimeStamp, Type, Batch), + {Batch - Num, NextUS} + end), + case R of + {atomic, {Num, State}} -> + {ok, State, Num}; + {aborted, Err} -> + {error, Err} + end. + extended_fields() -> []. diff --git a/src/mod_offline_mnesia.erl b/src/mod_offline_mnesia.erl index 34893cd97..28a105dcf 100644 --- a/src/mod_offline_mnesia.erl +++ b/src/mod_offline_mnesia.erl @@ -29,7 +29,8 @@ -export([init/2, store_message/1, pop_messages/2, remove_expired_messages/1, remove_old_messages/2, remove_user/2, read_message_headers/2, read_message/3, remove_message/3, read_all_messages/2, - remove_all_messages/2, count_messages/2, import/1]). + remove_all_messages/2, count_messages/2, import/1, + remove_old_messages_batch/4]). -export([need_transform/1, transform/1]). -include_lib("xmpp/include/xmpp.hrl"). @@ -97,6 +98,47 @@ remove_old_messages(Days, _LServer) -> end, mnesia:transaction(F). +delete_batch('$end_of_table', _LServer, _TS, Num) -> + {Num, '$end_of_table'}; +delete_batch(LastUS, _LServer, _TS, 0) -> + {0, LastUS}; +delete_batch(none, LServer, TS, Num) -> + delete_batch(mnesia:first(offline_msg), LServer, TS, Num); +delete_batch({_, LServer2} = LastUS, LServer, TS, Num) when LServer /= LServer2 -> + delete_batch(mnesia:next(offline_msg, LastUS), LServer, TS, Num); +delete_batch(LastUS, LServer, TS, Num) -> + Left = + lists:foldl( + fun(_, 0) -> + 0; + (#offline_msg{timestamp = TS2} = O, Num2) when TS2 < TS -> + mnesia:delete_object(O), + Num2 - 1; + (_, Num2) -> + Num2 + end, Num, mnesia:wread({offline_msg, LastUS})), + case Left of + 0 -> {0, LastUS}; + _ -> delete_batch(mnesia:next(offline_msg, LastUS), LServer, TS, Left) + end. + +remove_old_messages_batch(LServer, Days, Batch, LastUS) -> + S = erlang:system_time(second) - 60 * 60 * 24 * Days, + MegaSecs1 = S div 1000000, + Secs1 = S rem 1000000, + TimeStamp = {MegaSecs1, Secs1, 0}, + R = mnesia:transaction( + fun() -> + {Num, NextUS} = delete_batch(LastUS, LServer, TimeStamp, Batch), + {Batch - Num, NextUS} + end), + case R of + {atomic, {Num, State}} -> + {ok, State, Num}; + {aborted, Err} -> + {error, Err} + end. + remove_user(LUser, LServer) -> US = {LUser, LServer}, F = fun () -> mnesia:delete({offline_msg, US}) end,