PubSub: Add delete_old_pubsub_items command

Add a command for keeping only the specified number of items on each
node and removing all older items.  This might be especially useful if
nodes may be configured to have no 'max_items' limit.

Thanks to Ammonit Measurement GmbH for sponsoring this work.
This commit is contained in:
Holger Weiss 2021-08-22 12:44:50 +02:00
parent 29751a6174
commit 8d5025076f
10 changed files with 146 additions and 10 deletions

View File

@ -122,6 +122,11 @@
{result, {default, broadcast}} |
{error, stanza_error()}.
-callback remove_extra_items(NodeIdx :: nodeIdx(),
Max_Items :: unlimited | non_neg_integer()) ->
{result, {[itemId()], [itemId()]}
}.
-callback remove_extra_items(NodeIdx :: nodeIdx(),
Max_Items :: unlimited | non_neg_integer(),
ItemIds :: [itemId()]) ->

View File

@ -67,6 +67,9 @@
-callback get_nodes(Host :: host())->
[pubsubNode()].
-callback get_all_nodes(Host :: host()) ->
[pubsubNode()].
-callback get_parentnodes(Host :: host(),
NodeId :: nodeId(),
From :: jid:jid()) ->

View File

@ -45,6 +45,7 @@
-include("mod_roster.hrl").
-include("translate.hrl").
-include("ejabberd_stacktrace.hrl").
-include("ejabberd_commands.hrl").
-define(STDTREE, <<"tree">>).
-define(STDNODE, <<"flat">>).
@ -93,6 +94,9 @@
handle_call/3, handle_cast/2, handle_info/2, mod_doc/0,
terminate/2, code_change/3, depends/2, mod_opt_type/1, mod_options/1]).
%% ejabberd commands
-export([get_commands_spec/0, delete_old_items/1]).
-export([route/1]).
%%====================================================================
@ -337,6 +341,7 @@ init([ServerHost|_]) ->
false ->
ok
end,
ejabberd_commands:register_commands(?MODULE, get_commands_spec()),
NodeTree = config(ServerHost, nodetree),
Plugins = config(ServerHost, plugins),
PepMapping = config(ServerHost, pep_mapping),
@ -806,7 +811,13 @@ terminate(_Reason,
gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_COMMANDS),
terminate_plugins(Host, ServerHost, Plugins, TreePlugin),
ejabberd_router:unregister_route(Host)
end, Hosts).
end, Hosts),
case gen_mod:is_loaded_elsewhere(ServerHost, ?MODULE) of
false ->
ejabberd_commands:unregister_commands(get_commands_spec());
true ->
ok
end.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
@ -4142,6 +4153,46 @@ purge_offline(Host, LJID, Node) ->
{error, xmpp:err_internal_server_error(Txt, Lang)}
end.
-spec delete_old_items(non_neg_integer()) -> ok | error.
delete_old_items(N) ->
Results = lists:flatmap(
fun(Host) ->
case tree_action(Host, get_all_nodes, [Host]) of
Nodes when is_list(Nodes) ->
lists:map(
fun(#pubsub_node{id = Nidx, type = Type}) ->
case node_action(Host, Type,
remove_extra_items,
[Nidx , N]) of
{result, _} ->
ok;
{error, _} ->
error
end
end, Nodes);
_ ->
error
end
end, ejabberd_option:hosts()),
case lists:member(error, Results) of
true ->
error;
false ->
ok
end.
-spec get_commands_spec() -> [ejabberd_commands()].
get_commands_spec() ->
[#ejabberd_commands{name = delete_old_pubsub_items, tags = [purge],
desc = "Keep only NUMBER of PubSub items per node",
module = ?MODULE, function = delete_old_items,
args_desc = ["Number of items to keep per node"],
args = [{number, integer}],
result = {res, rescode},
result_desc = "0 if command failed, 1 when succeeded",
args_example = [1000],
result_example = ok}].
-spec mod_opt_type(atom()) -> econf:validator().
mod_opt_type(access_createnode) ->
econf:acl();

View File

@ -39,7 +39,8 @@
-export([init/3, terminate/2, options/0, features/0,
create_node_permission/6, create_node/2, delete_node/1,
purge_node/2, subscribe_node/8, unsubscribe_node/4,
publish_item/7, delete_item/4, remove_extra_items/3,
publish_item/7, delete_item/4,
remove_extra_items/2, remove_extra_items/3,
get_entity_affiliations/2, get_node_affiliations/1,
get_affiliation/2, set_affiliation/3,
get_entity_subscriptions/2, get_node_subscriptions/1,
@ -403,6 +404,16 @@ publish_item(Nidx, Publisher, PublishModel, MaxItems, ItemId, Payload,
end
end.
remove_extra_items(Nidx, MaxItems) ->
{result, States} = get_states(Nidx),
Records = States ++ mnesia:read({pubsub_orphan, Nidx}),
ItemIds = lists:flatmap(fun(#pubsub_state{items = Is}) ->
Is;
(#pubsub_orphan{items = Is}) ->
Is
end, Records),
remove_extra_items(Nidx, MaxItems, ItemIds).
%% @doc <p>This function is used to remove extra items, most notably when the
%% maximum number of items has been reached.</p>
%% <p>This function is used internally by the core PubSub module, as no

View File

@ -40,9 +40,10 @@
-include("translate.hrl").
-export([init/3, terminate/2, options/0, features/0,
create_node_permission/6, create_node/2, delete_node/1,
purge_node/2, subscribe_node/8, unsubscribe_node/4,
publish_item/7, delete_item/4, remove_extra_items/3,
create_node_permission/6, create_node/2, delete_node/1, purge_node/2,
subscribe_node/8, unsubscribe_node/4,
publish_item/7, delete_item/4,
remove_extra_items/2, remove_extra_items/3,
get_entity_affiliations/2, get_node_affiliations/1,
get_affiliation/2, set_affiliation/3,
get_entity_subscriptions/2, get_node_subscriptions/1,
@ -273,6 +274,9 @@ publish_item(Nidx, Publisher, PublishModel, MaxItems, ItemId, Payload,
end
end.
remove_extra_items(Nidx, MaxItems) ->
remove_extra_items(Nidx, MaxItems, itemids(Nidx)).
remove_extra_items(_Nidx, unlimited, ItemIds) ->
{result, {ItemIds, []}};
remove_extra_items(Nidx, MaxItems, ItemIds) ->
@ -863,6 +867,18 @@ first_in_list(Pred, [H | T]) ->
_ -> first_in_list(Pred, T)
end.
itemids(Nidx) ->
case catch
ejabberd_sql:sql_query_t(
?SQL("select @(itemid)s from pubsub_item where "
"nodeid=%(Nidx)d order by modification desc"))
of
{selected, RItems} ->
[ItemId || {ItemId} <- RItems];
_ ->
[]
end.
itemids(Nidx, {_U, _S, _R} = JID) ->
SJID = encode_jid(JID),
SJIDLike = <<(encode_jid_like(JID))/binary, "/%">>,

View File

@ -35,7 +35,8 @@
-export([init/3, terminate/2, options/0, features/0,
create_node_permission/6, create_node/2, delete_node/1,
purge_node/2, subscribe_node/8, unsubscribe_node/4,
publish_item/7, delete_item/4, remove_extra_items/3,
publish_item/7, delete_item/4,
remove_extra_items/2, remove_extra_items/3,
get_entity_affiliations/2, get_node_affiliations/1,
get_affiliation/2, set_affiliation/3,
get_entity_subscriptions/2, get_node_subscriptions/1,
@ -135,6 +136,9 @@ publish_item(Nidx, Publisher, Model, MaxItems, ItemId, Payload, PubOpts) ->
node_flat:publish_item(Nidx, Publisher, Model, MaxItems, ItemId,
Payload, PubOpts).
remove_extra_items(Nidx, MaxItems) ->
node_flat:remove_extra_items(Nidx, MaxItems).
remove_extra_items(Nidx, MaxItems, ItemIds) ->
node_flat:remove_extra_items(Nidx, MaxItems, ItemIds).

View File

@ -37,7 +37,8 @@
-export([init/3, terminate/2, options/0, features/0,
create_node_permission/6, create_node/2, delete_node/1,
purge_node/2, subscribe_node/8, unsubscribe_node/4,
publish_item/7, delete_item/4, remove_extra_items/3,
publish_item/7, delete_item/4,
remove_extra_items/2, remove_extra_items/3,
get_entity_affiliations/2, get_node_affiliations/1,
get_affiliation/2, set_affiliation/3,
get_entity_subscriptions/2, get_node_subscriptions/1,
@ -92,6 +93,9 @@ publish_item(Nidx, Publisher, Model, MaxItems, ItemId, Payload, PubOpts) ->
node_flat_sql:publish_item(Nidx, Publisher, Model, MaxItems, ItemId,
Payload, PubOpts).
remove_extra_items(Nidx, MaxItems) ->
node_flat_sql:remove_extra_items(Nidx, MaxItems).
remove_extra_items(Nidx, MaxItems, ItemIds) ->
node_flat_sql:remove_extra_items(Nidx, MaxItems, ItemIds).

View File

@ -46,7 +46,8 @@
-export([init/3, terminate/2, options/0, set_node/1,
get_node/3, get_node/2, get_node/1, get_nodes/2,
get_nodes/1, get_parentnodes/3, get_parentnodes_tree/3,
get_nodes/1, get_all_nodes/1,
get_parentnodes/3, get_parentnodes_tree/3,
get_subnodes/3, get_subnodes_tree/3, create_node/6,
delete_node/2]).
@ -98,6 +99,14 @@ get_nodes(Host, Limit) ->
{Nodes, _} -> Nodes
end.
get_all_nodes({_U, _S, _R} = Owner) ->
Host = jid:tolower(jid:remove_resource(Owner)),
mnesia:match_object(#pubsub_node{nodeid = {Host, '_'}, _ = '_'});
get_all_nodes(Host) ->
mnesia:match_object(#pubsub_node{nodeid = {Host, '_'}, _ = '_'})
++ mnesia:match_object(#pubsub_node{nodeid = {{'_', Host, '_'}, '_'},
_ = '_'}).
get_parentnodes(Host, Node, _From) ->
case catch mnesia:read({pubsub_node, {Host, Node}}) of
[Record] when is_record(Record, pubsub_node) ->

View File

@ -45,7 +45,8 @@
-export([init/3, terminate/2, options/0, set_node/1,
get_node/3, get_node/2, get_node/1, get_nodes/2,
get_nodes/1, get_parentnodes/3, get_parentnodes_tree/3,
get_nodes/1, get_all_nodes/1,
get_parentnodes/3, get_parentnodes_tree/3,
get_subnodes/3, get_subnodes_tree/3, create_node/6,
delete_node/2]).
@ -165,6 +166,34 @@ get_nodes(Host, Limit) ->
[]
end.
get_all_nodes({_U, _S, _R} = JID) ->
SubKey = jid:tolower(JID),
GenKey = jid:remove_resource(SubKey),
EncKey = node_flat_sql:encode_jid(GenKey),
Pattern = <<(node_flat_sql:encode_jid_like(GenKey))/binary, "/%">>,
case ejabberd_sql:sql_query_t(
?SQL("select @(node)s, @(parent)s, @(plugin)s, @(nodeid)d "
"from pubsub_node where host=%(EncKey)s "
"or host like %(Pattern)s %ESCAPE")) of
{selected, RItems} ->
[raw_to_node(GenKey, Item) || Item <- RItems];
_ ->
[]
end;
get_all_nodes(Host) ->
Pattern1 = <<"%@", Host/binary>>,
Pattern2 = <<"%@", Host/binary, "/%">>,
case ejabberd_sql:sql_query_t(
?SQL("select @(node)s, @(parent)s, @(plugin)s, @(nodeid)d "
"from pubsub_node where host=%(Host)s "
"or host like %(Pattern1)s "
"or host like %(Pattern2)s %ESCAPE")) of
{selected, RItems} ->
[raw_to_node(Host, Item) || Item <- RItems];
_ ->
[]
end.
get_parentnodes(Host, Node, _From) ->
case get_node(Host, Node) of
Record when is_record(Record, pubsub_node) ->

View File

@ -38,7 +38,8 @@
-export([init/3, terminate/2, options/0, set_node/1,
get_node/3, get_node/2, get_node/1, get_nodes/2,
get_nodes/1, get_parentnodes/3, get_parentnodes_tree/3,
get_nodes/1, get_all_nodes/1,
get_parentnodes/3, get_parentnodes_tree/3,
get_subnodes/3, get_subnodes_tree/3, create_node/6,
delete_node/2]).
@ -71,6 +72,9 @@ get_nodes(Host) ->
get_nodes(_Host, _Limit) ->
[].
get_all_nodes(_Host) ->
[].
get_parentnodes(_Host, _Node, _From) ->
[].