From b86fe14ef098212a1ed33f333f739888a7b9c038 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Chmielowski?= Date: Fri, 8 Apr 2022 16:49:24 +0200 Subject: [PATCH] Add rate limited delete_old_mam_messages command --- src/ejabberd_batch.erl | 205 +++++++++++++++++++++++++++++++++++++++++ src/ejabberd_sup.erl | 3 +- src/mod_mam.erl | 86 ++++++++++++++++- src/mod_mam_sql.erl | 53 ++++++++++- 4 files changed, 344 insertions(+), 3 deletions(-) create mode 100644 src/ejabberd_batch.erl diff --git a/src/ejabberd_batch.erl b/src/ejabberd_batch.erl new file mode 100644 index 000000000..05750164c --- /dev/null +++ b/src/ejabberd_batch.erl @@ -0,0 +1,205 @@ +%%%---------------------------------------------------------------------- +%%% File : ejabberd_batch.erl +%%% Author : Paweł Chmielowski +%%% Purpose : Batch tasks manager +%%% Created : 8 mar 2022 by Paweł Chmielowski +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2022 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- + +-module(ejabberd_batch). +-author("pawel@process-one.net"). + +-behaviour(gen_server). + +-include("logger.hrl"). + +%% API +-export([start_link/0]). 
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+         code_change/3]).
+-export([register_task/5, task_status/1, abort_task/1]).
+
+-define(SERVER, ?MODULE).
+
+%% Server state: a map from task type to its #task{} record.
+-record(state, {tasks = #{}}).
+%% One tracked task. While the task is working `pid' is the worker pid; the
+%% server replaces it with `none' on completion or abort, and with the error
+%% term when the worker reports a failure.
+-record(task, {state = not_started, pid, steps, done_steps}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+
+%% @doc Starts the batch manager and registers it locally under ?SERVER.
+-spec start_link() ->
+    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}.
+start_link() ->
+    ServerName = {local, ?SERVER},
+    gen_server:start_link(ServerName, ?MODULE, [], []).
+
+%% @doc Asks the manager to start a task identified by Type.
+%% JobFun is applied to JobState by the worker; Rate is the desired number
+%% of processed items per minute. Replies `ok', or `{error, in_progress}'
+%% when a task of that type is still working.
+register_task(Type, Steps, Rate, JobState, JobFun) ->
+    Request = {register_task, Type, Steps, Rate, JobState, JobFun},
+    gen_server:call(?SERVER, Request).
+
+%% @doc Returns the current status of the task identified by Type:
+%% `not_started', `{working | aborted | completed, DoneSteps}' or
+%% `{failed, DoneSteps, Error}'.
+task_status(Type) ->
+    Request = {task_status, Type},
+    gen_server:call(?SERVER, Request).
+
+%% @doc Requests that the working task identified by Type stops.
+%% Replies `aborted', or `not_started' when no such task is working.
+abort_task(Type) ->
+    Request = {abort_task, Type},
+    gen_server:call(?SERVER, Request).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%% @private
+%% @doc Starts with an empty task table.
+-spec init(Args :: term()) ->
+    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term()} | ignore.
+init([]) ->
+    InitialState = #state{},
+    {ok, InitialState}.
+
+%% @private
+%% @doc Serves the synchronous API requests (register, status, abort).
+-spec handle_call(Request :: term(), From :: {pid(), Tag :: term()},
+                  State :: #state{}) ->
+    {reply, Reply :: term(), NewState :: #state{}} |
+    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
+    {stop, Reason :: term(), NewState :: #state{}}.
+handle_call({register_task, Type, Steps, Rate, JobState, JobFun}, _From, #state{tasks = Tasks} = State) ->
+    %% Start a worker for Type unless one is already working. A type that was
+    %% never registered is treated like a default (not_started) task.
+    case maps:get(Type, Tasks, #task{}) of
+        #task{state = S} when S == completed; S == not_started; S == aborted; S == failed ->
+            %% The worker is monitored so that an exit it could not report
+            %% through a cast is picked up in handle_info/2 instead of
+            %% leaving the task `working' forever.
+            {Pid, _MRef} = spawn_monitor(
+                             fun() ->
+                                     work_loop(Type, JobState, JobFun, Rate,
+                                               erlang:monotonic_time(second), 0)
+                             end),
+            Tasks2 = maps:put(Type, #task{state = working, pid = Pid, steps = Steps, done_steps = 0}, Tasks),
+            {reply, ok, State#state{tasks = Tasks2}};
+        #task{state = working} ->
+            {reply, {error, in_progress}, State}
+    end;
+handle_call({task_status, Type}, _From, #state{tasks = Tasks} = State) ->
+    %% Translate the stored record into the public status term.
+    case maps:get(Type, Tasks, none) of
+        none ->
+            {reply, not_started, State};
+        #task{state = not_started} ->
+            {reply, not_started, State};
+        %% For failed tasks the `pid' field carries the error term.
+        #task{state = failed, done_steps = Steps, pid = Error} ->
+            {reply, {failed, Steps, Error}, State};
+        #task{state = aborted, done_steps = Steps} ->
+            {reply, {aborted, Steps}, State};
+        #task{state = working, done_steps = Steps} ->
+            {reply, {working, Steps}, State};
+        #task{state = completed, done_steps = Steps} ->
+            {reply, {completed, Steps}, State}
+    end;
+handle_call({abort_task, Type}, _From, #state{tasks = Tasks} = State) ->
+    case maps:get(Type, Tasks, none) of
+        #task{state = working, pid = Pid} = T ->
+            %% The worker only sees this message between two job steps, so a
+            %% step that is already running is finished first; its progress
+            %% report is then ignored because the stored pid no longer matches.
+            Pid ! abort,
+            Tasks2 = maps:put(Type, T#task{state = aborted, pid = none}, Tasks),
+            {reply, aborted, State#state{tasks = Tasks2}};
+        _ ->
+            {reply, not_started, State}
+    end;
+handle_call(_Request, _From, State = #state{}) ->
+    {reply, ok, State}.
+
+%% @private
+%% @doc Handles the reports sent by workers. Every report carries the
+%% worker pid and is ignored unless it comes from the worker currently
+%% recorded for that task.
+-spec(handle_cast(Request :: term(), State :: #state{}) ->
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), NewState :: #state{}}).
+handle_cast({task_finished, Type, Pid}, #state{tasks = Tasks} = State) ->
+    case maps:get(Type, Tasks, none) of
+        #task{state = working, pid = Pid2} = T when Pid == Pid2 ->
+            Tasks2 = maps:put(Type, T#task{state = completed, pid = none}, Tasks),
+            {noreply, State#state{tasks = Tasks2}};
+        _ ->
+            {noreply, State}
+    end;
+handle_cast({task_progress, Type, Pid, Count}, #state{tasks = Tasks} = State) ->
+    case maps:get(Type, Tasks, none) of
+        #task{state = working, pid = Pid2, done_steps = Steps} = T when Pid == Pid2 ->
+            Tasks2 = maps:put(Type, T#task{done_steps = Steps + Count}, Tasks),
+            {noreply, State#state{tasks = Tasks2}};
+        _ ->
+            {noreply, State}
+    end;
+handle_cast({task_error, Type, Pid, Error}, #state{tasks = Tasks} = State) ->
+    case maps:get(Type, Tasks, none) of
+        #task{state = working, pid = Pid2} = T when Pid == Pid2 ->
+            %% The error term is stored in `pid'; task_status reads it back.
+            Tasks2 = maps:put(Type, T#task{state = failed, pid = Error}, Tasks),
+            {noreply, State#state{tasks = Tasks2}};
+        _ ->
+            {noreply, State}
+    end;
+handle_cast(_Request, State = #state{}) ->
+    {noreply, State}.
+
+%% @private
+%% @doc Handles worker monitor notifications; other messages are dropped.
+-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), NewState :: #state{}}).
+handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{tasks = Tasks} = State) ->
+    %% A worker that finished, failed or was aborted has already had its
+    %% record updated (its casts are delivered before this message and the
+    %% stored pid was replaced), so a task that is still `working' for Pid
+    %% means the worker exited without reporting.
+    Tasks2 = maps:map(
+               fun(_Type, #task{state = working, pid = P} = T) when P == Pid ->
+                       T#task{state = failed, pid = {worker_exited, Reason}};
+                  (_Type, T) ->
+                       T
+               end, Tasks),
+    {noreply, State#state{tasks = Tasks2}};
+handle_info(_Info, State = #state{}) ->
+    {noreply, State}.
+
+%% @private
+%% @doc Called by gen_server when the server is about to terminate.
+%% Nothing is cleaned up here; the return value is ignored.
+-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
+    State :: #state{}) -> term()).
+terminate(_Reason, _State = #state{}) ->
+    ok.
+
+%% @private
+%% @doc Convert process state when code is changed (state is kept as is).
+-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
+    Extra :: term()) ->
+    {ok, NewState :: #state{}} | {error, Reason :: term()}).
+code_change(_OldVsn, State = #state{}, _Extra) ->
+    {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+%% Worker loop. Applies JobFun to the job state until it reports that
+%% nothing is left, throttled to about Rate processed items per minute.
+%%   Task            - task type, echoed in every report to the server
+%%   JobState        - opaque state threaded through JobFun
+%%   JobFun          - returns {ok, NewState, Count} or {error, Error};
+%%                     Count == 0 means the job is finished
+%%   Rate            - target number of items per minute, must be > 0
+%%   StartDate       - monotonic time (seconds) at which the task started
+%%   CurrentProgress - number of items processed so far
+%% Outcomes are reported to the server with casts; an `abort' message
+%% received while waiting for the next step ends the loop silently.
+work_loop(Task, _JobState, _JobFun, Rate, _StartDate, _CurrentProgress)
+  when not is_number(Rate); Rate =< 0 ->
+    %% A zero rate would crash the throttle computation below and a negative
+    %% one would disable throttling, so report it as a task error instead.
+    gen_server:cast(?MODULE, {task_error, Task, self(), {invalid_rate, Rate}});
+work_loop(Task, JobState, JobFun, Rate, StartDate, CurrentProgress) ->
+    try JobFun(JobState) of
+        {ok, _NewState, 0} ->
+            gen_server:cast(?MODULE, {task_finished, Task, self()});
+        {ok, NewState, Count} ->
+            gen_server:cast(?MODULE, {task_progress, Task, self(), Count}),
+            NewProgress = CurrentProgress + Count,
+            TimeSpent = erlang:monotonic_time(second) - StartDate,
+            %% Seconds the work done so far should have taken at the target
+            %% rate, minus the time actually spent: sleep for the difference.
+            SleepTime = max(0, NewProgress/Rate*60 - TimeSpent),
+            receive
+                abort -> ok
+            after floor(SleepTime*1000) ->
+                    work_loop(Task, NewState, JobFun, Rate, StartDate, NewProgress)
+            end;
+        {error, Error} ->
+            gen_server:cast(?MODULE, {task_error, Task, self(), Error})
+    catch Class:Reason ->
+            %% Keep reporting the generic error to the server, but do not
+            %% lose the actual exception.
+            ?ERROR_MSG("Batch task ~p crashed: ~p:~p", [Task, Class, Reason]),
+            gen_server:cast(?MODULE, {task_error, Task, self(), internal_error})
+    end.
diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl
index a97c7784b..e15e658c4 100644
--- a/src/ejabberd_sup.erl
+++ b/src/ejabberd_sup.erl
@@ -66,7 +66,8 @@ init([]) ->
            supervisor(ejabberd_gen_mod_sup, gen_mod),
            worker(ejabberd_acme),
            worker(ejabberd_auth),
-           worker(ejabberd_oauth)]}}.
+           worker(ejabberd_oauth),
+           worker(ejabberd_batch)]}}.
 
 -spec stop_child(atom()) -> ok.
stop_child(Name) -> diff --git a/src/mod_mam.erl b/src/mod_mam.erl index 59940ec81..ffbd1d4eb 100644 --- a/src/mod_mam.erl +++ b/src/mod_mam.erl @@ -43,7 +43,8 @@ get_room_config/4, set_room_option/3, offline_message/1, export/1, mod_options/1, remove_mam_for_user_with_peer/3, remove_mam_for_user/2, is_empty_for_user/2, is_empty_for_room/3, check_create_room/4, - process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2, select/7]). + process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2, select/7, + delete_old_messages_batch/5, delete_old_messages_status/1, delete_old_messages_abort/1]). -include_lib("xmpp/include/xmpp.hrl"). -include("logger.hrl"). @@ -568,6 +569,56 @@ message_is_archived(false, #{lserver := LServer}, Pkt) -> false end. +delete_old_messages_batch(Server, Type, Days, BatchSize, Rate) when Type == <<"chat">>; + Type == <<"groupchat">>; + Type == <<"all">> -> + CurrentTime = make_id(), + Diff = Days * 24 * 60 * 60 * 1000000, + TimeStamp = misc:usec_to_now(CurrentTime - Diff), + TypeA = misc:binary_to_atom(Type), + LServer = jid:nameprep(Server), + Mod = gen_mod:db_mod(LServer, ?MODULE), + case ejabberd_batch:register_task({mam, LServer}, 0, Rate, {LServer, TypeA, TimeStamp, BatchSize}, + fun({L, T, St, B} = S) -> + case Mod:delete_old_messages_batch(L, St, T, B) of + {ok, Count} -> + {ok, S, Count}; + {error, _} = E -> + E + end + end) of + ok -> + {ok, ""}; + {error, in_progress} -> + {error, "Operation in progress"} + end. 
+%% Returns a flat string describing the state of the batch deletion task
+%% registered for host Server.
+delete_old_messages_status(Server) ->
+    LServer = jid:nameprep(Server),
+    Msg = case ejabberd_batch:task_status({mam, LServer}) of
+              not_started ->
+                  "Operation not started";
+              {failed, Steps, Error} ->
+                  io_lib:format("Operation failed after deleting ~p messages with error ~p",
+                                [Steps, misc:format_val(Error)]);
+              {aborted, Steps} ->
+                  io_lib:format("Operation was aborted after deleting ~p messages",
+                                [Steps]);
+              {working, Steps} ->
+                  io_lib:format("Operation in progress, deleted ~p messages",
+                                [Steps]);
+              {completed, Steps} ->
+                  io_lib:format("Operation was completed after deleting ~p messages",
+                                [Steps])
+          end,
+    %% The command result is declared as a string, so flatten the iolist.
+    lists:flatten(Msg).
+
+%% Asks the batch manager to stop the deletion task of host Server and
+%% returns a string telling whether there was one to stop.
+delete_old_messages_abort(Server) ->
+    LServer = jid:nameprep(Server),
+    case ejabberd_batch:abort_task({mam, LServer}) of
+        aborted -> "Operation aborted";
+        not_started -> "No task running"
+    end.
+
 delete_old_messages(TypeBin, Days) when TypeBin == <<"chat">>;
                                         TypeBin == <<"groupchat">>;
                                         TypeBin == <<"all">> ->
@@ -1379,6 +1430,39 @@ get_commands_spec() ->
                         args_example = [<<"all">>, 31],
                         args = [{type, binary}, {days, integer}],
                         result = {res, rescode}},
+     #ejabberd_commands{name = delete_old_mam_messages_batch, tags = [purge],
+                        desc = "Delete MAM messages older than DAYS",
+                        longdesc = "Valid message TYPEs: "
+                                   "\"chat\", \"groupchat\", \"all\".",
+                        module = ?MODULE, function = delete_old_messages_batch,
+                        args_desc = ["Name of host where messages should be deleted",
+                                     "Type of messages to delete (chat, groupchat, all)",
+                                     "Days to keep messages",
+                                     "Number of messages to delete per batch",
+                                     "Desired rate of messages to delete per minute"],
+                        args_example = [<<"localhost">>, <<"all">>, 31, 1000, 10000],
+                        args = [{host, binary}, {type, binary}, {days, integer}, {batch_size, integer}, {rate, integer}],
+                        result = {res, restuple},
+                        result_desc = "Result tuple",
+                        result_example = {ok, <<"Removal of 5000 messages in progress">>}},
+     #ejabberd_commands{name = delete_old_mam_messages_status, tags = [purge],
+                        desc = "Status of delete old MAM messages operation",
+                        module = ?MODULE, function = delete_old_messages_status,
+                        args_desc = ["Name of host where messages should be deleted"],
+                        args_example = [<<"localhost">>],
+                        args = [{host, binary}],
+                        result = {status, string},
+                        result_desc = "Status text",
+                        result_example = "Operation in progress, deleted 5000 messages"},
+     #ejabberd_commands{name = abort_delete_old_mam_messages, tags = [purge],
+                        desc = "Abort currently running delete old MAM messages operation",
+                        module = ?MODULE, function = delete_old_messages_abort,
+                        args_desc = ["Name of host where operation should be aborted"],
+                        args_example = [<<"localhost">>],
+                        args = [{host, binary}],
+                        result = {status, string},
+                        result_desc = "Status text",
+                        result_example = "Operation aborted"},
      #ejabberd_commands{name = remove_mam_for_user, tags = [mam],
                         desc = "Remove mam archive for user",
                         module = ?MODULE, function = remove_mam_for_user,
diff --git a/src/mod_mam_sql.erl b/src/mod_mam_sql.erl
index 8e803587e..9eb9716fb 100644
--- a/src/mod_mam_sql.erl
+++ b/src/mod_mam_sql.erl
@@ -30,7 +30,8 @@
 %% API
 -export([init/2, remove_user/2, remove_room/3, delete_old_messages/3,
          extended_fields/0, store/8, write_prefs/4, get_prefs/2, select/7, export/1, remove_from_archive/3,
-         is_empty_for_user/2, is_empty_for_room/3, select_with_mucsub/6]).
+         is_empty_for_user/2, is_empty_for_room/3, select_with_mucsub/6,
+         delete_old_messages_batch/4, count_messages_to_delete/3]).
 
 -include_lib("stdlib/include/ms_transform.hrl").
 -include_lib("xmpp/include/xmpp.hrl").
@@ -71,6 +72,56 @@ remove_from_archive(LUser, LServer, WithJid) ->
         _ -> ok
     end.
 
+%% Counts the archived messages of ServerHost that are older than
+%% TimeStamp, optionally restricted to one kind (Type is `all' or the
+%% kind as an atom). Returns {ok, Count}, or `error' when the query fails.
+count_messages_to_delete(ServerHost, TimeStamp, Type) ->
+    TS = misc:now_to_usec(TimeStamp),
+    Res =
+        case Type of
+            all ->
+                ejabberd_sql:sql_query(
+                  ServerHost,
+                  ?SQL("select @(count(*))d from archive"
+                       " where timestamp < %(TS)d and %(ServerHost)H"));
+            _ ->
+                SType = misc:atom_to_binary(Type),
+                ejabberd_sql:sql_query(
+                  ServerHost,
+                  ?SQL("select @(count(*))d from archive"
+                       " where timestamp < %(TS)d"
+                       " and kind=%(SType)s"
+                       " and %(ServerHost)H"))
+        end,
+    case Res of
+        %% Rows produced by ?SQL are tuples, one element per @() field.
+        {selected, [{Count}]} ->
+            {ok, Count};
+        _ ->
+            error
+    end.
+
+%% Deletes at most Batch archived messages of ServerHost that are older
+%% than TimeStamp, optionally restricted to one kind (Type is `all' or the
+%% kind as an atom). Returns {ok, DeletedCount} or the {error, _} tuple of
+%% the SQL layer.
+%% NOTE(review): "delete ... limit N" is MySQL syntax; PostgreSQL has no
+%% LIMIT clause on DELETE. Confirm which SQL backends this has to support.
+delete_old_messages_batch(ServerHost, TimeStamp, Type, Batch) ->
+    TS = misc:now_to_usec(TimeStamp),
+    Res =
+        case Type of
+            all ->
+                ejabberd_sql:sql_query(
+                  ServerHost,
+                  ?SQL("delete from archive"
+                       " where timestamp < %(TS)d and %(ServerHost)H limit %(Batch)d"));
+            _ ->
+                SType = misc:atom_to_binary(Type),
+                ejabberd_sql:sql_query(
+                  ServerHost,
+                  ?SQL("delete from archive"
+                       " where timestamp < %(TS)d"
+                       " and kind=%(SType)s"
+                       " and %(ServerHost)H limit %(Batch)d"))
+        end,
+    case Res of
+        {updated, Count} ->
+            {ok, Count};
+        {error, _} = Error ->
+            Error
+    end.
+
 delete_old_messages(ServerHost, TimeStamp, Type) ->
     TS = misc:now_to_usec(TimeStamp),
     case Type of