PubSub: Add delete_expired_pubsub_items command

Support XEP-0060's pubsub#item_expire feature by adding a command for
deleting expired PubSub items.

Thanks to Ammonit Measurement GmbH for sponsoring this work.
This commit is contained in:
Holger Weiss 2021-10-30 13:19:30 +02:00
parent 5d48329a3f
commit 29dcc9b94c
6 changed files with 113 additions and 5 deletions

View File

@ -133,6 +133,10 @@
{result, {[itemId()], [itemId()]}
}.
-callback remove_expired_items(NodeIdx :: nodeIdx(),
Seconds :: infinity | non_neg_integer()) ->
{result, [itemId()]}.
-callback get_node_affiliations(NodeIdx :: nodeIdx()) ->
{result, [{ljid(), affiliation()}]}.

View File

@ -95,7 +95,7 @@
terminate/2, code_change/3, depends/2, mod_opt_type/1, mod_options/1]).
%% ejabberd commands
-export([get_commands_spec/0, delete_old_items/1]).
-export([get_commands_spec/0, delete_old_items/1, delete_expired_items/0]).
-export([route/1]).
@ -3431,6 +3431,14 @@ max_items(Host, Options) ->
end
end.
-spec item_expire(host(), [{atom(), any()}]) -> non_neg_integer() | infinity.
item_expire(Host, Options) ->
case get_option(Options, item_expire) of
I when is_integer(I), I < 0 -> 0;
I when is_integer(I) -> I;
_ -> get_max_item_expire_node(Host)
end.
-spec get_configure_xfields(_, pubsub_node_config:result(),
binary(), [binary()]) -> [xdata_field()].
get_configure_xfields(_Type, Options, Lang, Groups) ->
@ -3575,6 +3583,10 @@ check_opt_range(Opt, Opts, Max) ->
get_max_items_node(Host) ->
config(Host, max_items_node, undefined).
-spec get_max_item_expire_node(host()) -> infinity | non_neg_integer().
get_max_item_expire_node(Host) ->
config(Host, max_item_expire_node, infinity).
-spec get_max_subscriptions_node(host()) -> undefined | non_neg_integer().
get_max_subscriptions_node(Host) ->
config(Host, max_subscriptions_node, undefined).
@ -4181,6 +4193,44 @@ delete_old_items(N) ->
ok
end.
-spec delete_expired_items() -> ok | error.
delete_expired_items() ->
Results = lists:flatmap(
fun(Host) ->
case tree_action(Host, get_all_nodes, [Host]) of
Nodes when is_list(Nodes) ->
lists:map(
fun(#pubsub_node{id = Nidx, type = Type,
options = Options}) ->
case item_expire(Host, Options) of
infinity ->
ok;
Seconds ->
case node_action(
Host, Type,
remove_expired_items,
[Nidx, Seconds]) of
{result, []} ->
ok;
{result, [_|_]} ->
unset_cached_item(
Host, Nidx);
{error, _} ->
error
end
end
end, Nodes);
_ ->
error
end
end, ejabberd_option:hosts()),
case lists:member(error, Results) of
true ->
error;
false ->
ok
end.
-spec get_commands_spec() -> [ejabberd_commands()].
get_commands_spec() ->
[#ejabberd_commands{name = delete_old_pubsub_items, tags = [purge],
@ -4191,6 +4241,13 @@ get_commands_spec() ->
result = {res, rescode},
result_desc = "0 if command failed, 1 when succeeded",
args_example = [1000],
result_example = ok},
#ejabberd_commands{name = delete_expired_pubsub_items, tags = [purge],
desc = "Delete expired PubSub items",
module = ?MODULE, function = delete_expired_items,
args = [],
result = {res, rescode},
result_desc = "0 if command failed, 1 when succeeded",
result_example = ok}].
-spec mod_opt_type(atom()) -> econf:validator().
@ -4204,6 +4261,8 @@ mod_opt_type(last_item_cache) ->
econf:bool();
mod_opt_type(max_items_node) ->
econf:non_neg_int(unlimited);
mod_opt_type(max_item_expire_node) ->
econf:timeout(second, infinity);
mod_opt_type(max_nodes_discoitems) ->
econf:non_neg_int(infinity);
mod_opt_type(max_subscriptions_node) ->
@ -4251,6 +4310,7 @@ mod_options(Host) ->
{ignore_pep_from_offline, true},
{last_item_cache, false},
{max_items_node, ?MAXITEMS},
{max_item_expire_node, infinity},
{max_nodes_discoitems, 100},
{nodetree, ?STDTREE},
{pep_mapping, []},
@ -4329,6 +4389,11 @@ mod_doc() ->
" so many nodes, caching last items speeds up pubsub "
"and allows to raise user connection rate. The cost "
"is memory usage, as every item is stored in memory.")}},
{max_item_expire_node,
#{value => "timeout() | infinity",
desc =>
?T("Specify the maximum item epiry time. Default value "
"is: 'infinity'.")}},
{max_items_node,
#{value => "non_neg_integer() | infinity",
desc =>

View File

@ -40,7 +40,7 @@
create_node_permission/6, create_node/2, delete_node/1,
purge_node/2, subscribe_node/8, unsubscribe_node/4,
publish_item/7, delete_item/4,
remove_extra_items/2, remove_extra_items/3,
remove_extra_items/2, remove_extra_items/3, remove_expired_items/2,
get_entity_affiliations/2, get_node_affiliations/1,
get_affiliation/2, set_affiliation/3,
get_entity_subscriptions/2, get_node_subscriptions/1,
@ -432,6 +432,22 @@ remove_extra_items(Nidx, MaxItems, ItemIds) ->
del_items(Nidx, OldItems),
{result, {NewItems, OldItems}}.
remove_expired_items(_Nidx, infinity) ->
{result, []};
remove_expired_items(Nidx, Seconds) ->
Items = mnesia:index_read(pubsub_item, Nidx, #pubsub_item.nodeidx),
ExpT = misc:usec_to_now(
erlang:system_time(microsecond) - (Seconds * 1000000)),
ExpItems = lists:filtermap(
fun(#pubsub_item{itemid = {ItemId, _},
modification = {ModT, _}}) when ModT < ExpT ->
{true, ItemId};
(#pubsub_item{}) ->
false
end, Items),
del_items(Nidx, ExpItems),
{result, ExpItems}.
%% @doc <p>Triggers item deletion.</p>
%% <p>Default plugin: The user performing the deletion must be the node owner
%% or a publisher, or PublishModel being open.</p>

View File

@ -43,7 +43,7 @@
create_node_permission/6, create_node/2, delete_node/1, purge_node/2,
subscribe_node/8, unsubscribe_node/4,
publish_item/7, delete_item/4,
remove_extra_items/2, remove_extra_items/3,
remove_extra_items/2, remove_extra_items/3, remove_expired_items/2,
get_entity_affiliations/2, get_node_affiliations/1,
get_affiliation/2, set_affiliation/3,
get_entity_subscriptions/2, get_node_subscriptions/1,
@ -285,6 +285,23 @@ remove_extra_items(Nidx, MaxItems, ItemIds) ->
del_items(Nidx, OldItems),
{result, {NewItems, OldItems}}.
remove_expired_items(_Nidx, infinity) ->
{result, []};
remove_expired_items(Nidx, Seconds) ->
ExpT = encode_now(
misc:usec_to_now(
erlang:system_time(microsecond) - (Seconds * 1000000))),
case ejabberd_sql:sql_query_t(
?SQL("select @(itemid)s from pubsub_item where nodeid=%(Nidx)d "
"and creation < %(ExpT)s")) of
{selected, RItems} ->
ItemIds = [ItemId || {ItemId} <- RItems],
del_items(Nidx, ItemIds),
{result, ItemIds};
_ ->
{result, []}
end.
delete_item(Nidx, Publisher, PublishModel, ItemId) ->
SubKey = jid:tolower(Publisher),
GenKey = jid:remove_resource(SubKey),

View File

@ -36,7 +36,7 @@
create_node_permission/6, create_node/2, delete_node/1,
purge_node/2, subscribe_node/8, unsubscribe_node/4,
publish_item/7, delete_item/4,
remove_extra_items/2, remove_extra_items/3,
remove_extra_items/2, remove_extra_items/3, remove_expired_items/2,
get_entity_affiliations/2, get_node_affiliations/1,
get_affiliation/2, set_affiliation/3,
get_entity_subscriptions/2, get_node_subscriptions/1,
@ -142,6 +142,9 @@ remove_extra_items(Nidx, MaxItems) ->
remove_extra_items(Nidx, MaxItems, ItemIds) ->
node_flat:remove_extra_items(Nidx, MaxItems, ItemIds).
remove_expired_items(Nidx, ItemIds) ->
node_flat:remove_expired_items(Nidx, ItemIds).
delete_item(Nidx, Publisher, PublishModel, ItemId) ->
node_flat:delete_item(Nidx, Publisher, PublishModel, ItemId).

View File

@ -38,7 +38,7 @@
create_node_permission/6, create_node/2, delete_node/1,
purge_node/2, subscribe_node/8, unsubscribe_node/4,
publish_item/7, delete_item/4,
remove_extra_items/2, remove_extra_items/3,
remove_extra_items/2, remove_extra_items/3, remove_expired_items/2,
get_entity_affiliations/2, get_node_affiliations/1,
get_affiliation/2, set_affiliation/3,
get_entity_subscriptions/2, get_node_subscriptions/1,
@ -99,6 +99,9 @@ remove_extra_items(Nidx, MaxItems) ->
remove_extra_items(Nidx, MaxItems, ItemIds) ->
node_flat_sql:remove_extra_items(Nidx, MaxItems, ItemIds).
remove_expired_items(Nidx, ItemIds) ->
node_flat_sql:remove_expired_items(Nidx, ItemIds).
delete_item(Nidx, Publisher, PublishModel, ItemId) ->
node_flat_sql:delete_item(Nidx, Publisher, PublishModel, ItemId).