25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-12-22 17:28:25 +01:00

Implement batch operations in mnesia backend

This commit is contained in:
Paweł Chmielowski 2022-05-02 15:05:55 +02:00
parent ce18c93419
commit 6f11210edd
4 changed files with 122 additions and 16 deletions

View File

@ -715,13 +715,26 @@ delete_old_messages(Days) ->
delete_old_messages_batch(Server, Days, BatchSize, Rate) -> delete_old_messages_batch(Server, Days, BatchSize, Rate) ->
LServer = jid:nameprep(Server), LServer = jid:nameprep(Server),
Mod = gen_mod:db_mod(LServer, mod_offline), Mod = gen_mod:db_mod(LServer, mod_offline),
case ejabberd_batch:register_task({spool, LServer}, 0, Rate, {LServer, Days, BatchSize}, case ejabberd_batch:register_task({spool, LServer}, 0, Rate, {LServer, Days, BatchSize, none},
fun({L, Da, B} = S) -> fun({L, Da, B, IS} = S) ->
case {erlang:function_exported(Mod, remove_old_messages_batch, 3),
erlang:function_exported(Mod, remove_old_messages_batch, 4)} of
{true, _} ->
case Mod:remove_old_messages_batch(L, Da, B) of case Mod:remove_old_messages_batch(L, Da, B) of
{ok, Count} -> {ok, Count} ->
{ok, S, Count}; {ok, S, Count};
{error, _} = E -> {error, _} = E ->
E E
end;
{_, true} ->
case Mod:remove_old_messages_batch(L, Da, B, IS) of
{ok, IS2, Count} ->
{ok, {L, Da, B, IS2}, Count};
{error, _} = E ->
E
end;
_ ->
{error, not_implemented_for_backend}
end end
end) of end) of
ok -> ok ->

View File

@ -578,13 +578,27 @@ delete_old_messages_batch(Server, Type, Days, BatchSize, Rate) when Type == <<"c
TypeA = misc:binary_to_atom(Type), TypeA = misc:binary_to_atom(Type),
LServer = jid:nameprep(Server), LServer = jid:nameprep(Server),
Mod = gen_mod:db_mod(LServer, ?MODULE), Mod = gen_mod:db_mod(LServer, ?MODULE),
case ejabberd_batch:register_task({mam, LServer}, 0, Rate, {LServer, TypeA, TimeStamp, BatchSize},
fun({L, T, St, B} = S) -> case ejabberd_batch:register_task({mam, LServer}, 0, Rate, {LServer, TypeA, TimeStamp, BatchSize, none},
fun({L, T, St, B, IS} = S) ->
case {erlang:function_exported(Mod, remove_old_messages_batch, 4),
erlang:function_exported(Mod, remove_old_messages_batch, 5)} of
{true, _} ->
case Mod:delete_old_messages_batch(L, St, T, B) of case Mod:delete_old_messages_batch(L, St, T, B) of
{ok, Count} -> {ok, Count} ->
{ok, S, Count}; {ok, S, Count};
{error, _} = E -> {error, _} = E ->
E E
end;
{_, true} ->
case Mod:remove_old_messages_batch(L, St, T, B, IS) of
{ok, IS2, Count} ->
{ok, {L, St, T, B, IS2}, Count};
{error, _} = E ->
E
end;
_ ->
{error, not_implemented_for_backend}
end end
end) of end) of
ok -> ok ->

View File

@ -29,7 +29,7 @@
%% API %% API
-export([init/2, remove_user/2, remove_room/3, delete_old_messages/3, -export([init/2, remove_user/2, remove_room/3, delete_old_messages/3,
extended_fields/0, store/8, write_prefs/4, get_prefs/2, select/6, remove_from_archive/3, extended_fields/0, store/8, write_prefs/4, get_prefs/2, select/6, remove_from_archive/3,
is_empty_for_user/2, is_empty_for_room/3]). is_empty_for_user/2, is_empty_for_room/3, remove_old_messages_batch/5]).
-include_lib("stdlib/include/ms_transform.hrl"). -include_lib("stdlib/include/ms_transform.hrl").
-include_lib("xmpp/include/xmpp.hrl"). -include_lib("xmpp/include/xmpp.hrl").
@ -131,6 +131,43 @@ delete_old_user_messages(User, TimeStamp, Type) ->
Err Err
end. end.
delete_batch('$end_of_table', _LServer, _TS, _Type, Num) ->
{Num, '$end_of_table'};
delete_batch(LastUS, _LServer, _TS, _Type, 0) ->
{0, LastUS};
delete_batch(none, LServer, TS, Type, Num) ->
delete_batch(mnesia:first(archive_msg), LServer, TS, Type, Num);
delete_batch({_, LServer2} = LastUS, LServer, TS, Type, Num) when LServer /= LServer2 ->
delete_batch(mnesia:next(archive_msg, LastUS), LServer, TS, Type, Num);
delete_batch(LastUS, LServer, TS, Type, Num) ->
Left =
lists:foldl(
fun(_, 0) ->
0;
(#archive_msg{timestamp = TS2, type = Type2} = O, Num2) when TS2 < TS, (Type == all orelse Type == Type2) ->
mnesia:delete_object(O),
Num2 - 1;
(_, Num2) ->
Num2
end, Num, mnesia:wread({archive_msg, LastUS})),
case Left of
0 -> {0, LastUS};
_ -> delete_batch(mnesia:next(archive_msg, LastUS), LServer, TS, Type, Left)
end.
remove_old_messages_batch(LServer, TimeStamp, Type, Batch, LastUS) ->
R = mnesia:transaction(
fun() ->
{Num, NextUS} = delete_batch(LastUS, LServer, TimeStamp, Type, Batch),
{Batch - Num, NextUS}
end),
case R of
{atomic, {Num, State}} ->
{ok, State, Num};
{aborted, Err} ->
{error, Err}
end.
extended_fields() -> extended_fields() ->
[]. [].

View File

@ -29,7 +29,8 @@
-export([init/2, store_message/1, pop_messages/2, remove_expired_messages/1, -export([init/2, store_message/1, pop_messages/2, remove_expired_messages/1,
remove_old_messages/2, remove_user/2, read_message_headers/2, remove_old_messages/2, remove_user/2, read_message_headers/2,
read_message/3, remove_message/3, read_all_messages/2, read_message/3, remove_message/3, read_all_messages/2,
remove_all_messages/2, count_messages/2, import/1]). remove_all_messages/2, count_messages/2, import/1,
remove_old_messages_batch/4]).
-export([need_transform/1, transform/1]). -export([need_transform/1, transform/1]).
-include_lib("xmpp/include/xmpp.hrl"). -include_lib("xmpp/include/xmpp.hrl").
@ -97,6 +98,47 @@ remove_old_messages(Days, _LServer) ->
end, end,
mnesia:transaction(F). mnesia:transaction(F).
delete_batch('$end_of_table', _LServer, _TS, Num) ->
{Num, '$end_of_table'};
delete_batch(LastUS, _LServer, _TS, 0) ->
{0, LastUS};
delete_batch(none, LServer, TS, Num) ->
delete_batch(mnesia:first(offline_msg), LServer, TS, Num);
delete_batch({_, LServer2} = LastUS, LServer, TS, Num) when LServer /= LServer2 ->
delete_batch(mnesia:next(offline_msg, LastUS), LServer, TS, Num);
delete_batch(LastUS, LServer, TS, Num) ->
Left =
lists:foldl(
fun(_, 0) ->
0;
(#offline_msg{timestamp = TS2} = O, Num2) when TS2 < TS ->
mnesia:delete_object(O),
Num2 - 1;
(_, Num2) ->
Num2
end, Num, mnesia:wread({offline_msg, LastUS})),
case Left of
0 -> {0, LastUS};
_ -> delete_batch(mnesia:next(offline_msg, LastUS), LServer, TS, Left)
end.
remove_old_messages_batch(LServer, Days, Batch, LastUS) ->
S = erlang:system_time(second) - 60 * 60 * 24 * Days,
MegaSecs1 = S div 1000000,
Secs1 = S rem 1000000,
TimeStamp = {MegaSecs1, Secs1, 0},
R = mnesia:transaction(
fun() ->
{Num, NextUS} = delete_batch(LastUS, LServer, TimeStamp, Batch),
{Batch - Num, NextUS}
end),
case R of
{atomic, {Num, State}} ->
{ok, State, Num};
{aborted, Err} ->
{error, Err}
end.
remove_user(LUser, LServer) -> remove_user(LUser, LServer) ->
US = {LUser, LServer}, US = {LUser, LServer},
F = fun () -> mnesia:delete({offline_msg, US}) end, F = fun () -> mnesia:delete({offline_msg, US}) end,