mirror of
https://github.com/processone/ejabberd.git
synced 2024-12-22 17:28:25 +01:00
Add rate limited delete_old_mam_messages command
This commit is contained in:
parent
6dbd1118a2
commit
b86fe14ef0
205
src/ejabberd_batch.erl
Normal file
205
src/ejabberd_batch.erl
Normal file
@ -0,0 +1,205 @@
|
||||
%%%----------------------------------------------------------------------
|
||||
%%% File : ejabberd_batch.erl
|
||||
%%% Author : Paweł Chmielowski <pawel@process-one.net>
|
||||
%%% Purpose : Batch tasks manager
|
||||
%%% Created : 8 mar 2022 by Paweł Chmielowski <pawel@process-one.net>
|
||||
%%%
|
||||
%%%
|
||||
%%% ejabberd, Copyright (C) 2002-2022 ProcessOne
|
||||
%%%
|
||||
%%% This program is free software; you can redistribute it and/or
|
||||
%%% modify it under the terms of the GNU General Public License as
|
||||
%%% published by the Free Software Foundation; either version 2 of the
|
||||
%%% License, or (at your option) any later version.
|
||||
%%%
|
||||
%%% This program is distributed in the hope that it will be useful,
|
||||
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
%%% General Public License for more details.
|
||||
%%%
|
||||
%%% You should have received a copy of the GNU General Public License along
|
||||
%%% with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
%%%
|
||||
%%%----------------------------------------------------------------------
|
||||
|
||||
-module(ejabberd_batch).
|
||||
-author("pawel@process-one.net").
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
-include("logger.hrl").
|
||||
|
||||
%% API
|
||||
-export([start_link/0]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
||||
code_change/3]).
|
||||
-export([register_task/5, task_status/1, abort_task/1]).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
|
||||
-record(state, {tasks = #{}}).
|
||||
-record(task, {state = not_started, pid, steps, done_steps}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_link() ->
|
||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
|
||||
register_task(Type, Steps, Rate, JobState, JobFun) ->
|
||||
gen_server:call(?MODULE, {register_task, Type, Steps, Rate, JobState, JobFun}).
|
||||
|
||||
task_status(Type) ->
|
||||
gen_server:call(?MODULE, {task_status, Type}).
|
||||
|
||||
abort_task(Type) ->
|
||||
gen_server:call(?MODULE, {abort_task, Type}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Initializes the server
|
||||
-spec(init(Args :: term()) ->
|
||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init([]) ->
|
||||
{ok, #state{}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||
State :: #state{}) ->
|
||||
{reply, Reply :: term(), NewState :: #state{}} |
|
||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_call({register_task, Type, Steps, Rate, JobState, JobFun}, _From, #state{tasks = Tasks} = State) ->
|
||||
case maps:get(Type, Tasks, #task{}) of
|
||||
#task{state = S} when S == completed; S == not_started; S == aborted; S == failed ->
|
||||
Pid = spawn(fun() -> work_loop(Type, JobState, JobFun, Rate, erlang:monotonic_time(second), 0) end),
|
||||
Tasks2 = maps:put(Type, #task{state = working, pid = Pid, steps = Steps, done_steps = 0}, Tasks),
|
||||
{reply, ok, #state{tasks = Tasks2}};
|
||||
#task{state = working} ->
|
||||
{reply, {error, in_progress}, State}
|
||||
end;
|
||||
handle_call({task_status, Type}, _From, #state{tasks = Tasks} = State) ->
|
||||
case maps:get(Type, Tasks, none) of
|
||||
none ->
|
||||
{reply, not_started, State};
|
||||
#task{state = not_started} ->
|
||||
{reply, not_started, State};
|
||||
#task{state = failed, done_steps = Steps, pid = Error} ->
|
||||
{reply, {failed, Steps, Error}, State};
|
||||
#task{state = aborted, done_steps = Steps} ->
|
||||
{reply, {aborted, Steps}, State};
|
||||
#task{state = working, done_steps = Steps} ->
|
||||
{reply, {working, Steps}, State};
|
||||
#task{state = completed, done_steps = Steps} ->
|
||||
{reply, {completed, Steps}, State}
|
||||
end;
|
||||
handle_call({abort_task, Type}, _From, #state{tasks = Tasks} = State) ->
|
||||
case maps:get(Type, Tasks, none) of
|
||||
#task{state = working, pid = Pid} = T ->
|
||||
Pid ! abort,
|
||||
Tasks2 = maps:put(Type, T#task{state = aborted, pid = none}, Tasks),
|
||||
{reply, aborted, State#state{tasks = Tasks2}};
|
||||
_ ->
|
||||
{reply, not_started, State}
|
||||
end;
|
||||
handle_call(_Request, _From, State = #state{}) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling cast messages
|
||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_cast({task_finished, Type, Pid}, #state{tasks = Tasks} = State) ->
|
||||
case maps:get(Type, Tasks, none) of
|
||||
#task{state = working, pid = Pid2} = T when Pid == Pid2 ->
|
||||
Tasks2 = maps:put(Type, T#task{state = completed, pid = none}, Tasks),
|
||||
{noreply, State#state{tasks = Tasks2}};
|
||||
_ ->
|
||||
{noreply, State}
|
||||
end;
|
||||
handle_cast({task_progress, Type, Pid, Count}, #state{tasks = Tasks} = State) ->
|
||||
case maps:get(Type, Tasks, none) of
|
||||
#task{state = working, pid = Pid2, done_steps = Steps} = T when Pid == Pid2 ->
|
||||
Tasks2 = maps:put(Type, T#task{done_steps = Steps + Count}, Tasks),
|
||||
{noreply, State#state{tasks = Tasks2}};
|
||||
_ ->
|
||||
{noreply, State}
|
||||
end;
|
||||
handle_cast({task_error, Type, Pid, Error}, #state{tasks = Tasks} = State) ->
|
||||
case maps:get(Type, Tasks, none) of
|
||||
#task{state = working, pid = Pid2} = T when Pid == Pid2 ->
|
||||
Tasks2 = maps:put(Type, T#task{state = failed, pid = Error}, Tasks),
|
||||
{noreply, State#state{tasks = Tasks2}};
|
||||
_ ->
|
||||
{noreply, State}
|
||||
end;
|
||||
handle_cast(_Request, State = #state{}) ->
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling all non call/cast messages
|
||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info(_Info, State = #state{}) ->
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_server when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||
%% with Reason. The return value is ignored.
|
||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||
State :: #state{}) -> term()).
|
||||
terminate(_Reason, _State = #state{}) ->
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||
Extra :: term()) ->
|
||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
|
||||
work_loop(Task, JobState, JobFun, Rate, StartDate, CurrentProgress) ->
|
||||
try JobFun(JobState) of
|
||||
{ok, _NewState, 0} ->
|
||||
gen_server:cast(?MODULE, {task_finished, Task, self()});
|
||||
{ok, NewState, Count} ->
|
||||
gen_server:cast(?MODULE, {task_progress, Task, self(), Count}),
|
||||
NewProgress = CurrentProgress + Count,
|
||||
TimeSpent = erlang:monotonic_time(second) - StartDate,
|
||||
SleepTime = max(0, NewProgress/Rate*60 - TimeSpent),
|
||||
receive
|
||||
abort -> ok
|
||||
after floor(SleepTime*1000) ->
|
||||
work_loop(Task, NewState, JobFun, Rate, StartDate, NewProgress)
|
||||
end;
|
||||
{error, Error} ->
|
||||
gen_server:cast(?MODULE, {task_error, Task, self(), Error})
|
||||
catch _:_ ->
|
||||
gen_server:cast(?MODULE, {task_error, Task, self(), internal_error})
|
||||
end.
|
@ -66,7 +66,8 @@ init([]) ->
|
||||
supervisor(ejabberd_gen_mod_sup, gen_mod),
|
||||
worker(ejabberd_acme),
|
||||
worker(ejabberd_auth),
|
||||
worker(ejabberd_oauth)]}}.
|
||||
worker(ejabberd_oauth),
|
||||
worker(ejabberd_batch)]}}.
|
||||
|
||||
-spec stop_child(atom()) -> ok.
|
||||
stop_child(Name) ->
|
||||
|
@ -43,7 +43,8 @@
|
||||
get_room_config/4, set_room_option/3, offline_message/1, export/1,
|
||||
mod_options/1, remove_mam_for_user_with_peer/3, remove_mam_for_user/2,
|
||||
is_empty_for_user/2, is_empty_for_room/3, check_create_room/4,
|
||||
process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2, select/7]).
|
||||
process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2, select/7,
|
||||
delete_old_messages_batch/5, delete_old_messages_status/1, delete_old_messages_abort/1]).
|
||||
|
||||
-include_lib("xmpp/include/xmpp.hrl").
|
||||
-include("logger.hrl").
|
||||
@ -568,6 +569,56 @@ message_is_archived(false, #{lserver := LServer}, Pkt) ->
|
||||
false
|
||||
end.
|
||||
|
||||
delete_old_messages_batch(Server, Type, Days, BatchSize, Rate) when Type == <<"chat">>;
|
||||
Type == <<"groupchat">>;
|
||||
Type == <<"all">> ->
|
||||
CurrentTime = make_id(),
|
||||
Diff = Days * 24 * 60 * 60 * 1000000,
|
||||
TimeStamp = misc:usec_to_now(CurrentTime - Diff),
|
||||
TypeA = misc:binary_to_atom(Type),
|
||||
LServer = jid:nameprep(Server),
|
||||
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||
case ejabberd_batch:register_task({mam, LServer}, 0, Rate, {LServer, TypeA, TimeStamp, BatchSize},
|
||||
fun({L, T, St, B} = S) ->
|
||||
case Mod:delete_old_messages_batch(L, St, T, B) of
|
||||
{ok, Count} ->
|
||||
{ok, S, Count};
|
||||
{error, _} = E ->
|
||||
E
|
||||
end
|
||||
end) of
|
||||
ok ->
|
||||
{ok, ""};
|
||||
{error, in_progress} ->
|
||||
{error, "Operation in progress"}
|
||||
end.
|
||||
delete_old_messages_status(Server) ->
|
||||
LServer = jid:nameprep(Server),
|
||||
Msg = case ejabberd_batch:task_status({mam, LServer}) of
|
||||
not_started ->
|
||||
"Operation not started";
|
||||
{failed, Steps, Error} ->
|
||||
io_lib:format("Operation failed after deleting ~p messages with error ~p",
|
||||
[Steps, misc:format_val(Error)]);
|
||||
{aborted, Steps} ->
|
||||
io_lib:format("Operation was aborted after deleting ~p messages",
|
||||
[Steps]);
|
||||
{working, Steps} ->
|
||||
io_lib:format("Operation in progress, deleted ~p messages",
|
||||
[Steps]);
|
||||
{completed, Steps} ->
|
||||
io_lib:format("Operation was completed after deleting ~p messages",
|
||||
[Steps])
|
||||
end,
|
||||
lists:flatten(Msg).
|
||||
|
||||
delete_old_messages_abort(Server) ->
|
||||
LServer = jid:nameprep(Server),
|
||||
case ejabberd_batch:abort_task({mam, LServer}) of
|
||||
aborted -> "Operation aborted";
|
||||
not_started -> "No task running"
|
||||
end.
|
||||
|
||||
delete_old_messages(TypeBin, Days) when TypeBin == <<"chat">>;
|
||||
TypeBin == <<"groupchat">>;
|
||||
TypeBin == <<"all">> ->
|
||||
@ -1379,6 +1430,39 @@ get_commands_spec() ->
|
||||
args_example = [<<"all">>, 31],
|
||||
args = [{type, binary}, {days, integer}],
|
||||
result = {res, rescode}},
|
||||
#ejabberd_commands{name = delete_old_mam_messages_batch, tags = [purge],
|
||||
desc = "Delete MAM messages older than DAYS",
|
||||
longdesc = "Valid message TYPEs: "
|
||||
"\"chat\", \"groupchat\", \"all\".",
|
||||
module = ?MODULE, function = delete_old_messages_batch,
|
||||
args_desc = ["Name of host where messages should be deleted",
|
||||
"Type of messages to delete (chat, groupchat, all)",
|
||||
"Days to keep messages",
|
||||
"Number of messages to delete per batch",
|
||||
"Desired rate of messages to delete per minute"],
|
||||
args_example = [<<"localhost">>, <<"all">>, 31, 1000, 10000],
|
||||
args = [{host, binary}, {type, binary}, {days, integer}, {batch_size, integer}, {rate, integer}],
|
||||
result = {res, restuple},
|
||||
result_desc = "Result tuple",
|
||||
result_example = {ok, <<"Removal of 5000 messages in progress">>}},
|
||||
#ejabberd_commands{name = delete_old_mam_messages_status, tags = [purge],
|
||||
desc = "Status of delete old MAM messages operation",
|
||||
module = ?MODULE, function = delete_old_messages_status,
|
||||
args_desc = ["Name of host where messages should be deleted"],
|
||||
args_example = [<<"localhost">>],
|
||||
args = [{host, binary}],
|
||||
result = {status, string},
|
||||
result_desc = "Status test",
|
||||
result_example = {"Operation in progress, delete 5000 messages"}},
|
||||
#ejabberd_commands{name = abort_delete_old_mam_messages, tags = [purge],
|
||||
desc = "Abort currently running delete old MAM messages operation",
|
||||
module = ?MODULE, function = delete_old_messages_abort,
|
||||
args_desc = ["Name of host where operation should be aborted"],
|
||||
args_example = [<<"localhost">>],
|
||||
args = [{host, binary}],
|
||||
result = {status, string},
|
||||
result_desc = "Status text",
|
||||
result_example = {"Operation aborted"}},
|
||||
#ejabberd_commands{name = remove_mam_for_user, tags = [mam],
|
||||
desc = "Remove mam archive for user",
|
||||
module = ?MODULE, function = remove_mam_for_user,
|
||||
|
@ -30,7 +30,8 @@
|
||||
%% API
|
||||
-export([init/2, remove_user/2, remove_room/3, delete_old_messages/3,
|
||||
extended_fields/0, store/8, write_prefs/4, get_prefs/2, select/7, export/1, remove_from_archive/3,
|
||||
is_empty_for_user/2, is_empty_for_room/3, select_with_mucsub/6]).
|
||||
is_empty_for_user/2, is_empty_for_room/3, select_with_mucsub/6,
|
||||
delete_old_messages_batch/4, count_messages_to_delete/3]).
|
||||
|
||||
-include_lib("stdlib/include/ms_transform.hrl").
|
||||
-include_lib("xmpp/include/xmpp.hrl").
|
||||
@ -71,6 +72,56 @@ remove_from_archive(LUser, LServer, WithJid) ->
|
||||
_ -> ok
|
||||
end.
|
||||
|
||||
count_messages_to_delete(ServerHost, TimeStamp, Type) ->
|
||||
TS = misc:now_to_usec(TimeStamp),
|
||||
Res =
|
||||
case Type of
|
||||
all ->
|
||||
ejabberd_sql:sql_query(
|
||||
ServerHost,
|
||||
?SQL("select count(*) from archive"
|
||||
" where timestamp < %(TS)d and %(ServerHost)H"));
|
||||
_ ->
|
||||
SType = misc:atom_to_binary(Type),
|
||||
ejabberd_sql:sql_query(
|
||||
ServerHost,
|
||||
?SQL("select @(count(*))d from archive"
|
||||
" where timestamp < %(TS)d"
|
||||
" and kind=%(SType)s"
|
||||
" and %(ServerHost)H"))
|
||||
end,
|
||||
case Res of
|
||||
{selected, [Count]} ->
|
||||
{ok, Count};
|
||||
_ ->
|
||||
error
|
||||
end.
|
||||
|
||||
delete_old_messages_batch(ServerHost, TimeStamp, Type, Batch) ->
|
||||
TS = misc:now_to_usec(TimeStamp),
|
||||
Res =
|
||||
case Type of
|
||||
all ->
|
||||
ejabberd_sql:sql_query(
|
||||
ServerHost,
|
||||
?SQL("delete from archive"
|
||||
" where timestamp < %(TS)d and %(ServerHost)H limit %(Batch)d"));
|
||||
_ ->
|
||||
SType = misc:atom_to_binary(Type),
|
||||
ejabberd_sql:sql_query(
|
||||
ServerHost,
|
||||
?SQL("delete from archive"
|
||||
" where timestamp < %(TS)d"
|
||||
" and kind=%(SType)s"
|
||||
" and %(ServerHost)H limit %(Batch)d"))
|
||||
end,
|
||||
case Res of
|
||||
{updated, Count} ->
|
||||
{ok, Count};
|
||||
{error, _} = Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
delete_old_messages(ServerHost, TimeStamp, Type) ->
|
||||
TS = misc:now_to_usec(TimeStamp),
|
||||
case Type of
|
||||
|
Loading…
Reference in New Issue
Block a user