25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-11-28 16:34:13 +01:00

Add rate limited delete_old_messages

This commit is contained in:
Paweł Chmielowski 2022-04-08 17:41:17 +02:00
parent b86fe14ef0
commit 0870f32c59
2 changed files with 103 additions and 3 deletions

View File

@ -60,8 +60,8 @@
restore/1, % Still used by some modules restore/1, % Still used by some modules
clear_cache/0, clear_cache/0,
gc/0, gc/0,
get_commands_spec/0 get_commands_spec/0,
]). delete_old_messages_batch/4, delete_old_messages_status/1, delete_old_messages_abort/1]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
@ -320,6 +320,36 @@ get_commands_spec() ->
args_desc = ["Number of days"], args_desc = ["Number of days"],
args_example = [31], args_example = [31],
args = [{days, integer}], result = {res, rescode}}, args = [{days, integer}], result = {res, rescode}},
#ejabberd_commands{name = delete_old_messages_batch, tags = [purge],
desc = "Delete offline messages older than DAYS",
module = ?MODULE, function = delete_old_messages_batch,
args_desc = ["Name of host where messages should be deleted",
"Days to keep messages",
"Number of messages to delete per batch",
"Desired rate of messages to delete per minute"],
args_example = [<<"localhost">>, 31, 1000, 10000],
args = [{host, binary}, {days, integer}, {batch_size, integer}, {rate, integer}],
result = {res, restuple},
result_desc = "Result tuple",
result_example = {ok, <<"Removal of 5000 messages in progress">>}},
#ejabberd_commands{name = delete_old_messages_status, tags = [purge],
desc = "Status of delete old offline messages operation",
module = ?MODULE, function = delete_old_messages_status,
args_desc = ["Name of host where messages should be deleted"],
args_example = [<<"localhost">>],
args = [{host, binary}],
result = {status, string},
result_desc = "Status test",
result_example = {"Operation in progress, delete 5000 messages"}},
#ejabberd_commands{name = abort_delete_old_messages, tags = [purge],
desc = "Abort currently running delete old offline messages operation",
module = ?MODULE, function = delete_old_messages_abort,
args_desc = ["Name of host where operation should be aborted"],
args_example = [<<"localhost">>],
args = [{host, binary}],
result = {status, string},
result_desc = "Status text",
result_example = {"Operation aborted"}},
#ejabberd_commands{name = export2sql, tags = [mnesia], #ejabberd_commands{name = export2sql, tags = [mnesia],
desc = "Export virtual host information from Mnesia tables to SQL file", desc = "Export virtual host information from Mnesia tables to SQL file",
@ -682,6 +712,51 @@ delete_old_messages(Days) ->
{atomic, _} = mod_offline:remove_old_messages(Days, Host) {atomic, _} = mod_offline:remove_old_messages(Days, Host)
end, ejabberd_option:hosts()). end, ejabberd_option:hosts()).
delete_old_messages_batch(Server, Days, BatchSize, Rate) ->
LServer = jid:nameprep(Server),
Mod = gen_mod:db_mod(LServer, mod_offline),
case ejabberd_batch:register_task({spool, LServer}, 0, Rate, {LServer, Days, BatchSize},
fun({L, Da, B} = S) ->
case Mod:remove_old_messages_batch(L, Da, B) of
{ok, Count} ->
{ok, S, Count};
{error, _} = E ->
E
end
end) of
ok ->
{ok, ""};
{error, in_progress} ->
{error, "Operation in progress"}
end.
delete_old_messages_status(Server) ->
LServer = jid:nameprep(Server),
Msg = case ejabberd_batch:task_status({spool, LServer}) of
not_started ->
"Operation not started";
{failed, Steps, Error} ->
io_lib:format("Operation failed after deleting ~p messages with error ~p",
[Steps, misc:format_val(Error)]);
{aborted, Steps} ->
io_lib:format("Operation was aborted after deleting ~p messages",
[Steps]);
{working, Steps} ->
io_lib:format("Operation in progress, deleted ~p messages",
[Steps]);
{completed, Steps} ->
io_lib:format("Operation was completed after deleting ~p messages",
[Steps])
end,
lists:flatten(Msg).
delete_old_messages_abort(Server) ->
LServer = jid:nameprep(Server),
case ejabberd_batch:abort_task({spool, LServer}) of
aborted -> "Operation aborted";
not_started -> "No task running"
end.
%%% %%%
%%% Mnesia management %%% Mnesia management
%%% %%%

View File

@ -30,7 +30,7 @@
-export([init/2, store_message/1, pop_messages/2, remove_expired_messages/1, -export([init/2, store_message/1, pop_messages/2, remove_expired_messages/1,
remove_old_messages/2, remove_user/2, read_message_headers/2, remove_old_messages/2, remove_user/2, read_message_headers/2,
read_message/3, remove_message/3, read_all_messages/2, read_message/3, remove_message/3, read_all_messages/2,
remove_all_messages/2, count_messages/2, import/1, export/1]). remove_all_messages/2, count_messages/2, import/1, export/1, remove_old_messages_batch/3]).
-include_lib("xmpp/include/xmpp.hrl"). -include_lib("xmpp/include/xmpp.hrl").
-include("mod_offline.hrl"). -include("mod_offline.hrl").
@ -112,6 +112,31 @@ remove_old_messages(Days, LServer) ->
end, end,
{atomic, ok}. {atomic, ok}.
remove_old_messages_batch(LServer, Days, Batch) ->
case ejabberd_sql:sql_query(
LServer,
fun(pgsql, _) ->
ejabberd_sql:sql_query_t(
?SQL("DELETE FROM spool"
" WHERE created_at <"
" NOW() - %(Days)d * INTERVAL '1 DAY' LIMIT %(Batch)d"));
(sqlite, _) ->
ejabberd_sql:sql_query_t(
?SQL("DELETE FROM spool"
" WHERE created_at <"
" DATETIME('now', '-%(Days)d days') LIMIT %(Batch)d"));
(_, _) ->
ejabberd_sql:sql_query_t(
?SQL("DELETE FROM spool"
" WHERE created_at < NOW() - INTERVAL %(Days)d DAY LIMIT %(Batch)d"))
end)
of
{updated, N} ->
{ok, N};
Error ->
{error, Error}
end.
remove_user(LUser, LServer) -> remove_user(LUser, LServer) ->
ejabberd_sql:sql_query( ejabberd_sql:sql_query(
LServer, LServer,