Merge remote-tracking branch 'processone/pr/3666'

* processone/pr/3666:
  PubSub: Add delete_old_pubsub_items command
  PubSub: Optimize publishing on large nodes (SQL)
  PubSub: Support unlimited number of items
  PubSub: Support 'max_items=max' node configuration
This commit is contained in:
Holger Weiss 2021-08-22 15:17:02 +02:00
commit 7e9c9703dd
12 changed files with 178 additions and 25 deletions

View File

@ -124,7 +124,7 @@ defmodule Ejabberd.MixProject do
{:pkix, "~> 1.0"},
{:stringprep, ">= 1.0.26"},
{:stun, "~> 1.0"},
{:xmpp, "~> 1.5"},
{:xmpp, git: "https://github.com/processone/xmpp", ref: "e943c0285aa85e3cbd4bfb9259f6b7de32b00395", override: true},
{:yconf, "~> 1.0"}]
++ cond_deps()
end

View File

@ -59,7 +59,7 @@
{stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.27"}}},
{if_var_true, stun,
{stun, ".*", {git, "https://github.com/processone/stun", {tag, "1.0.44"}}}},
{xmpp, ".*", {git, "https://github.com/processone/xmpp", {tag, "1.5.4"}}},
{xmpp, ".*", {git, "https://github.com/processone/xmpp", "e943c0285aa85e3cbd4bfb9259f6b7de32b00395"}},
{yconf, ".*", {git, "https://github.com/processone/yconf", {tag, "1.0.12"}}}
]}.

View File

@ -122,6 +122,11 @@
{result, {default, broadcast}} |
{error, stanza_error()}.
-callback remove_extra_items(NodeIdx :: nodeIdx(),
Max_Items :: unlimited | non_neg_integer()) ->
{result, {[itemId()], [itemId()]}
}.
-callback remove_extra_items(NodeIdx :: nodeIdx(),
Max_Items :: unlimited | non_neg_integer(),
ItemIds :: [itemId()]) ->

View File

@ -67,6 +67,9 @@
-callback get_nodes(Host :: host())->
[pubsubNode()].
-callback get_all_nodes(Host :: host()) ->
[pubsubNode()].
-callback get_parentnodes(Host :: host(),
NodeId :: nodeId(),
From :: jid:jid()) ->

View File

@ -45,6 +45,7 @@
-include("mod_roster.hrl").
-include("translate.hrl").
-include("ejabberd_stacktrace.hrl").
-include("ejabberd_commands.hrl").
-define(STDTREE, <<"tree">>).
-define(STDNODE, <<"flat">>).
@ -93,6 +94,9 @@
handle_call/3, handle_cast/2, handle_info/2, mod_doc/0,
terminate/2, code_change/3, depends/2, mod_opt_type/1, mod_options/1]).
%% ejabberd commands
-export([get_commands_spec/0, delete_old_items/1]).
-export([route/1]).
%%====================================================================
@ -210,7 +214,7 @@
pep_mapping :: [{binary(), binary()}],
ignore_pep_from_offline :: boolean(),
last_item_cache :: boolean(),
max_items_node :: non_neg_integer(),
max_items_node :: non_neg_integer()|unlimited,
max_subscriptions_node :: non_neg_integer()|undefined,
default_node_config :: [{atom(), binary()|boolean()|integer()|atom()}],
nodetree :: binary(),
@ -337,6 +341,7 @@ init([ServerHost|_]) ->
false ->
ok
end,
ejabberd_commands:register_commands(?MODULE, get_commands_spec()),
NodeTree = config(ServerHost, nodetree),
Plugins = config(ServerHost, plugins),
PepMapping = config(ServerHost, pep_mapping),
@ -806,7 +811,13 @@ terminate(_Reason,
gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_COMMANDS),
terminate_plugins(Host, ServerHost, Plugins, TreePlugin),
ejabberd_router:unregister_route(Host)
end, Hosts).
end, Hosts),
case gen_mod:is_loaded_elsewhere(ServerHost, ?MODULE) of
false ->
ejabberd_commands:unregister_commands(get_commands_spec());
true ->
ok
end.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
@ -3399,7 +3410,7 @@ node_config(_, _, []) ->
%% @doc <p>Return the maximum number of items for a given node.</p>
%% <p>Unlimited means that there is no limit in the number of items that can
%% be stored.</p>
-spec max_items(host(), [{atom(), any()}]) -> non_neg_integer().
-spec max_items(host(), [{atom(), any()}]) -> non_neg_integer() | unlimited.
max_items(Host, Options) ->
case get_option(Options, persist_items) of
true ->
@ -3549,14 +3560,18 @@ decode_get_pending(#xdata{fields = Fs}, Lang) ->
end.
-spec check_opt_range(atom(), [proplists:property()],
non_neg_integer() | undefined) -> boolean().
non_neg_integer() | unlimited | undefined) -> boolean().
check_opt_range(_Opt, _Opts, undefined) ->
true;
check_opt_range(_Opt, _Opts, unlimited) ->
true;
check_opt_range(Opt, Opts, Max) ->
Val = proplists:get_value(Opt, Opts, Max),
Val =< Max.
case proplists:get_value(Opt, Opts, Max) of
max -> true;
Val -> Val =< Max
end.
-spec get_max_items_node(host()) -> undefined | non_neg_integer().
-spec get_max_items_node(host()) -> undefined | unlimited | non_neg_integer().
get_max_items_node(Host) ->
config(Host, max_items_node, undefined).
@ -3708,6 +3723,7 @@ features() ->
<<"access-whitelist">>, % OPTIONAL
<<"collections">>, % RECOMMENDED
<<"config-node">>, % RECOMMENDED
<<"config-node-max">>,
<<"create-and-configure">>, % RECOMMENDED
<<"item-ids">>, % RECOMMENDED
<<"last-published">>, % RECOMMENDED
@ -4137,6 +4153,46 @@ purge_offline(Host, LJID, Node) ->
{error, xmpp:err_internal_server_error(Txt, Lang)}
end.
-spec delete_old_items(non_neg_integer()) -> ok | error.
delete_old_items(N) ->
Results = lists:flatmap(
fun(Host) ->
case tree_action(Host, get_all_nodes, [Host]) of
Nodes when is_list(Nodes) ->
lists:map(
fun(#pubsub_node{id = Nidx, type = Type}) ->
case node_action(Host, Type,
remove_extra_items,
[Nidx , N]) of
{result, _} ->
ok;
{error, _} ->
error
end
end, Nodes);
_ ->
error
end
end, ejabberd_option:hosts()),
case lists:member(error, Results) of
true ->
error;
false ->
ok
end.
-spec get_commands_spec() -> [ejabberd_commands()].
get_commands_spec() ->
[#ejabberd_commands{name = delete_old_pubsub_items, tags = [purge],
desc = "Keep only NUMBER of PubSub items per node",
module = ?MODULE, function = delete_old_items,
args_desc = ["Number of items to keep per node"],
args = [{number, integer}],
result = {res, rescode},
result_desc = "0 if command failed, 1 when succeeded",
args_example = [1000],
result_example = ok}].
-spec mod_opt_type(atom()) -> econf:validator().
mod_opt_type(access_createnode) ->
econf:acl();
@ -4147,7 +4203,7 @@ mod_opt_type(ignore_pep_from_offline) ->
mod_opt_type(last_item_cache) ->
econf:bool();
mod_opt_type(max_items_node) ->
econf:non_neg_int();
econf:non_neg_int(unlimited);
mod_opt_type(max_nodes_discoitems) ->
econf:non_neg_int(infinity);
mod_opt_type(max_subscriptions_node) ->
@ -4274,7 +4330,7 @@ mod_doc() ->
"and allows to raise user connection rate. The cost "
"is memory usage, as every item is stored in memory.")}},
{max_items_node,
#{value => "MaxItems",
#{value => "non_neg_integer() | infinity",
desc =>
?T("Define the maximum number of items that can be "
"stored in a node. Default value is: '10'.")}},

View File

@ -39,7 +39,8 @@
-export([init/3, terminate/2, options/0, features/0,
create_node_permission/6, create_node/2, delete_node/1,
purge_node/2, subscribe_node/8, unsubscribe_node/4,
publish_item/7, delete_item/4, remove_extra_items/3,
publish_item/7, delete_item/4,
remove_extra_items/2, remove_extra_items/3,
get_entity_affiliations/2, get_node_affiliations/1,
get_affiliation/2, set_affiliation/3,
get_entity_subscriptions/2, get_node_subscriptions/1,
@ -375,7 +376,8 @@ publish_item(Nidx, Publisher, PublishModel, MaxItems, ItemId, Payload,
or (Subscribed == true)) ->
{error, xmpp:err_forbidden()};
true ->
if MaxItems > 0 ->
if MaxItems > 0;
MaxItems == unlimited ->
Now = erlang:timestamp(),
case get_item(Nidx, ItemId) of
{result, #pubsub_item{creation = {_, GenKey}} = OldItem} ->
@ -402,6 +404,16 @@ publish_item(Nidx, Publisher, PublishModel, MaxItems, ItemId, Payload,
end
end.
remove_extra_items(Nidx, MaxItems) ->
{result, States} = get_states(Nidx),
Records = States ++ mnesia:read({pubsub_orphan, Nidx}),
ItemIds = lists:flatmap(fun(#pubsub_state{items = Is}) ->
Is;
(#pubsub_orphan{items = Is}) ->
Is
end, Records),
remove_extra_items(Nidx, MaxItems, ItemIds).
%% @doc <p>This function is used to remove extra items, most notably when the
%% maximum number of items has been reached.</p>
%% <p>This function is used internally by the core PubSub module, as no

View File

@ -40,9 +40,10 @@
-include("translate.hrl").
-export([init/3, terminate/2, options/0, features/0,
create_node_permission/6, create_node/2, delete_node/1,
purge_node/2, subscribe_node/8, unsubscribe_node/4,
publish_item/7, delete_item/4, remove_extra_items/3,
create_node_permission/6, create_node/2, delete_node/1, purge_node/2,
subscribe_node/8, unsubscribe_node/4,
publish_item/7, delete_item/4,
remove_extra_items/2, remove_extra_items/3,
get_entity_affiliations/2, get_node_affiliations/1,
get_affiliation/2, set_affiliation/3,
get_entity_subscriptions/2, get_node_subscriptions/1,
@ -247,7 +248,8 @@ publish_item(Nidx, Publisher, PublishModel, MaxItems, ItemId, Payload,
or (Subscribed == true)) ->
{error, xmpp:err_forbidden()};
true ->
if MaxItems > 0 ->
if MaxItems > 0;
MaxItems == unlimited ->
Now = erlang:timestamp(),
case get_item(Nidx, ItemId) of
{result, #pubsub_item{creation = {_, GenKey}} = OldItem} ->
@ -258,20 +260,23 @@ publish_item(Nidx, Publisher, PublishModel, MaxItems, ItemId, Payload,
{result, _} ->
{error, xmpp:err_forbidden()};
_ ->
Items = [ItemId | itemids(Nidx, GenKey)],
{result, {_NI, OI}} = remove_extra_items(Nidx, MaxItems, Items),
OldIds = maybe_remove_extra_items(Nidx, MaxItems,
GenKey, ItemId),
set_item(#pubsub_item{
itemid = {ItemId, Nidx},
creation = {Now, GenKey},
modification = {Now, SubKey},
payload = Payload}),
{result, {default, broadcast, OI}}
{result, {default, broadcast, OldIds}}
end;
true ->
{result, {default, broadcast, []}}
end
end.
remove_extra_items(Nidx, MaxItems) ->
remove_extra_items(Nidx, MaxItems, itemids(Nidx)).
remove_extra_items(_Nidx, unlimited, ItemIds) ->
{result, {ItemIds, []}};
remove_extra_items(Nidx, MaxItems, ItemIds) ->
@ -862,6 +867,18 @@ first_in_list(Pred, [H | T]) ->
_ -> first_in_list(Pred, T)
end.
itemids(Nidx) ->
case catch
ejabberd_sql:sql_query_t(
?SQL("select @(itemid)s from pubsub_item where "
"nodeid=%(Nidx)d order by modification desc"))
of
{selected, RItems} ->
[ItemId || {ItemId} <- RItems];
_ ->
[]
end.
itemids(Nidx, {_U, _S, _R} = JID) ->
SJID = encode_jid(JID),
SJIDLike = <<(encode_jid_like(JID))/binary, "/%">>,
@ -933,6 +950,16 @@ update_subscription(Nidx, JID, Subscription) ->
"-affiliation='n'"
]).
-spec maybe_remove_extra_items(mod_pubsub:nodeIdx(),
non_neg_integer() | unlimited, ljid(),
mod_pubsub:itemId()) -> [mod_pubsub:itemId()].
maybe_remove_extra_items(_Nidx, unlimited, _GenKey, _ItemId) ->
[];
maybe_remove_extra_items(Nidx, MaxItems, GenKey, ItemId) ->
ItemIds = [ItemId | itemids(Nidx, GenKey)],
{result, {_NewIds, OldIds}} = remove_extra_items(Nidx, MaxItems, ItemIds),
OldIds.
-spec decode_jid(SJID :: binary()) -> ljid().
decode_jid(SJID) ->
jid:tolower(jid:decode(SJID)).

View File

@ -35,7 +35,8 @@
-export([init/3, terminate/2, options/0, features/0,
create_node_permission/6, create_node/2, delete_node/1,
purge_node/2, subscribe_node/8, unsubscribe_node/4,
publish_item/7, delete_item/4, remove_extra_items/3,
publish_item/7, delete_item/4,
remove_extra_items/2, remove_extra_items/3,
get_entity_affiliations/2, get_node_affiliations/1,
get_affiliation/2, set_affiliation/3,
get_entity_subscriptions/2, get_node_subscriptions/1,
@ -135,6 +136,9 @@ publish_item(Nidx, Publisher, Model, MaxItems, ItemId, Payload, PubOpts) ->
node_flat:publish_item(Nidx, Publisher, Model, MaxItems, ItemId,
Payload, PubOpts).
remove_extra_items(Nidx, MaxItems) ->
node_flat:remove_extra_items(Nidx, MaxItems).
remove_extra_items(Nidx, MaxItems, ItemIds) ->
node_flat:remove_extra_items(Nidx, MaxItems, ItemIds).

View File

@ -37,7 +37,8 @@
-export([init/3, terminate/2, options/0, features/0,
create_node_permission/6, create_node/2, delete_node/1,
purge_node/2, subscribe_node/8, unsubscribe_node/4,
publish_item/7, delete_item/4, remove_extra_items/3,
publish_item/7, delete_item/4,
remove_extra_items/2, remove_extra_items/3,
get_entity_affiliations/2, get_node_affiliations/1,
get_affiliation/2, set_affiliation/3,
get_entity_subscriptions/2, get_node_subscriptions/1,
@ -92,6 +93,9 @@ publish_item(Nidx, Publisher, Model, MaxItems, ItemId, Payload, PubOpts) ->
node_flat_sql:publish_item(Nidx, Publisher, Model, MaxItems, ItemId,
Payload, PubOpts).
remove_extra_items(Nidx, MaxItems) ->
node_flat_sql:remove_extra_items(Nidx, MaxItems).
remove_extra_items(Nidx, MaxItems, ItemIds) ->
node_flat_sql:remove_extra_items(Nidx, MaxItems, ItemIds).

View File

@ -46,7 +46,8 @@
-export([init/3, terminate/2, options/0, set_node/1,
get_node/3, get_node/2, get_node/1, get_nodes/2,
get_nodes/1, get_parentnodes/3, get_parentnodes_tree/3,
get_nodes/1, get_all_nodes/1,
get_parentnodes/3, get_parentnodes_tree/3,
get_subnodes/3, get_subnodes_tree/3, create_node/6,
delete_node/2]).
@ -98,6 +99,14 @@ get_nodes(Host, Limit) ->
{Nodes, _} -> Nodes
end.
get_all_nodes({_U, _S, _R} = Owner) ->
Host = jid:tolower(jid:remove_resource(Owner)),
mnesia:match_object(#pubsub_node{nodeid = {Host, '_'}, _ = '_'});
get_all_nodes(Host) ->
mnesia:match_object(#pubsub_node{nodeid = {Host, '_'}, _ = '_'})
++ mnesia:match_object(#pubsub_node{nodeid = {{'_', Host, '_'}, '_'},
_ = '_'}).
get_parentnodes(Host, Node, _From) ->
case catch mnesia:read({pubsub_node, {Host, Node}}) of
[Record] when is_record(Record, pubsub_node) ->

View File

@ -45,7 +45,8 @@
-export([init/3, terminate/2, options/0, set_node/1,
get_node/3, get_node/2, get_node/1, get_nodes/2,
get_nodes/1, get_parentnodes/3, get_parentnodes_tree/3,
get_nodes/1, get_all_nodes/1,
get_parentnodes/3, get_parentnodes_tree/3,
get_subnodes/3, get_subnodes_tree/3, create_node/6,
delete_node/2]).
@ -165,6 +166,34 @@ get_nodes(Host, Limit) ->
[]
end.
get_all_nodes({_U, _S, _R} = JID) ->
SubKey = jid:tolower(JID),
GenKey = jid:remove_resource(SubKey),
EncKey = node_flat_sql:encode_jid(GenKey),
Pattern = <<(node_flat_sql:encode_jid_like(GenKey))/binary, "/%">>,
case ejabberd_sql:sql_query_t(
?SQL("select @(node)s, @(parent)s, @(plugin)s, @(nodeid)d "
"from pubsub_node where host=%(EncKey)s "
"or host like %(Pattern)s %ESCAPE")) of
{selected, RItems} ->
[raw_to_node(GenKey, Item) || Item <- RItems];
_ ->
[]
end;
get_all_nodes(Host) ->
Pattern1 = <<"%@", Host/binary>>,
Pattern2 = <<"%@", Host/binary, "/%">>,
case ejabberd_sql:sql_query_t(
?SQL("select @(node)s, @(parent)s, @(plugin)s, @(nodeid)d "
"from pubsub_node where host=%(Host)s "
"or host like %(Pattern1)s "
"or host like %(Pattern2)s %ESCAPE")) of
{selected, RItems} ->
[raw_to_node(Host, Item) || Item <- RItems];
_ ->
[]
end.
get_parentnodes(Host, Node, _From) ->
case get_node(Host, Node) of
Record when is_record(Record, pubsub_node) ->

View File

@ -38,7 +38,8 @@
-export([init/3, terminate/2, options/0, set_node/1,
get_node/3, get_node/2, get_node/1, get_nodes/2,
get_nodes/1, get_parentnodes/3, get_parentnodes_tree/3,
get_nodes/1, get_all_nodes/1,
get_parentnodes/3, get_parentnodes_tree/3,
get_subnodes/3, get_subnodes_tree/3, create_node/6,
delete_node/2]).
@ -71,6 +72,9 @@ get_nodes(Host) ->
get_nodes(_Host, _Limit) ->
[].
get_all_nodes(_Host) ->
[].
get_parentnodes(_Host, _Node, _From) ->
[].