Optimize muc subscriptions handling
This commit is contained in:
parent
e24e05c6af
commit
0452ffc1df
12
sql/lite.sql
12
sql/lite.sql
|
@ -290,6 +290,18 @@ CREATE TABLE muc_online_users (
|
|||
CREATE UNIQUE INDEX i_muc_online_users ON muc_online_users (username, server, resource, name, host);
|
||||
CREATE INDEX i_muc_online_users_us ON muc_online_users (username, server);
|
||||
|
||||
CREATE TABLE muc_room_subscribers (
|
||||
room text NOT NULL,
|
||||
host text NOT NULL,
|
||||
jid text NOT NULL,
|
||||
nick text NOT NULL,
|
||||
nodes text NOT NULL,
|
||||
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
CREATE INDEX i_muc_room_subscribers_host_jid USING BTREE ON muc_room_subscribers(host, jid);
|
||||
CREATE UNIQUE INDEX i_muc_room_subscribers_host_room_jid USING BTREE ON muc_room_subscribers(host, room, jid);
|
||||
|
||||
CREATE TABLE irc_custom (
|
||||
jid text NOT NULL,
|
||||
host text NOT NULL,
|
||||
|
|
|
@ -149,6 +149,18 @@ WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW
|
|||
CREATE UNIQUE CLUSTERED INDEX [muc_online_users_us] ON [muc_online_users] (username, server);
|
||||
WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON);
|
||||
|
||||
CREATE TABLE [dbo].[muc_room_subscribers] (
|
||||
[room] [varchar] (191) NOT NULL,
|
||||
[host] [varchar] (191) NOT NULL,
|
||||
[jid] [varchar] (191) NOT NULL,
|
||||
[nick] [text] NOT NULL,
|
||||
[nodes] [text] NOT NULL,
|
||||
[created_at] [datetime] NOT NULL DEFAULT GETDATE()
|
||||
);
|
||||
|
||||
CREATE UNIQUE CLUSTERED INDEX [muc_room_subscribers_host_room_jid] ON [muc_room_subscribers] (host, room, jid);
|
||||
CREATE INDEX [muc_room_subscribers_host_jid] ON [muc_room_subscribers] (host, jid);
|
||||
|
||||
CREATE TABLE [dbo].[privacy_default_list] (
|
||||
[username] [varchar] (250) NOT NULL,
|
||||
[name] [varchar] (250) NOT NULL,
|
||||
|
|
|
@ -306,6 +306,18 @@ CREATE TABLE muc_online_users (
|
|||
CREATE UNIQUE INDEX i_muc_online_users USING BTREE ON muc_online_users(username(75), server(75), resource(75), name(75), host(75));
|
||||
CREATE INDEX i_muc_online_users_us USING BTREE ON muc_online_users(username(75), server(75));
|
||||
|
||||
CREATE TABLE muc_room_subscribers (
|
||||
room varchar(191) NOT NULL,
|
||||
host varchar(191) NOT NULL,
|
||||
jid varchar(191) NOT NULL,
|
||||
nick text NOT NULL,
|
||||
nodes text NOT NULL,
|
||||
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
UNIQUE KEY i_muc_room_subscribers_host_room_jid (host, room, jid)
|
||||
) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
|
||||
|
||||
CREATE INDEX i_muc_room_subscribers_host_jid USING BTREE ON muc_room_subscribers(host, jid);
|
||||
|
||||
CREATE TABLE irc_custom (
|
||||
jid text NOT NULL,
|
||||
host text NOT NULL,
|
||||
|
|
12
sql/pg.sql
12
sql/pg.sql
|
@ -308,6 +308,18 @@ CREATE TABLE muc_online_users (
|
|||
CREATE UNIQUE INDEX i_muc_online_users ON muc_online_users USING btree (username, server, resource, name, host);
|
||||
CREATE INDEX i_muc_online_users_us ON muc_online_users USING btree (username, server);
|
||||
|
||||
CREATE TABLE muc_room_subscribers (
|
||||
room text NOT NULL,
|
||||
host text NOT NULL,
|
||||
jid text NOT NULL,
|
||||
nick text NOT NULL,
|
||||
nodes text NOT NULL,
|
||||
created_at TIMESTAMP NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
CREATE INDEX i_muc_room_subscribers_host_jid ON muc_room_subscribers USING btree (host, jid);
|
||||
CREATE UNIQUE INDEX i_muc_room_subscribers_host_room_jid ON muc_room_subscribers USING btree (host, room, jid);
|
||||
|
||||
CREATE TABLE irc_custom (
|
||||
jid text NOT NULL,
|
||||
host text NOT NULL,
|
||||
|
|
|
@ -39,6 +39,7 @@
|
|||
reload/3,
|
||||
room_destroyed/4,
|
||||
store_room/4,
|
||||
store_room/5,
|
||||
restore_room/3,
|
||||
forget_room/3,
|
||||
create_room/5,
|
||||
|
@ -88,7 +89,7 @@
|
|||
-type muc_room_opts() :: [{atom(), any()}].
|
||||
-callback init(binary(), gen_mod:opts()) -> any().
|
||||
-callback import(binary(), binary(), [binary()]) -> ok.
|
||||
-callback store_room(binary(), binary(), binary(), list()) -> {atomic, any()}.
|
||||
-callback store_room(binary(), binary(), binary(), list(), list()|undefined) -> {atomic, any()}.
|
||||
-callback restore_room(binary(), binary(), binary()) -> muc_room_opts() | error.
|
||||
-callback forget_room(binary(), binary(), binary()) -> {atomic, any()}.
|
||||
-callback can_use_nick(binary(), binary(), jid(), binary()) -> boolean().
|
||||
|
@ -105,6 +106,8 @@
|
|||
-callback unregister_online_user(binary(), ljid(), binary(), binary()) -> any().
|
||||
-callback count_online_rooms_by_user(binary(), binary(), binary()) -> non_neg_integer().
|
||||
-callback get_online_rooms_by_user(binary(), binary(), binary()) -> [{binary(), binary()}].
|
||||
-callback get_subscribed_rooms(binary(), binary(), jid()) ->
|
||||
{ok, [{ljid(), binary(), [binary()]}]} | {error, any()}.
|
||||
|
||||
%%====================================================================
|
||||
%% API
|
||||
|
@ -157,9 +160,12 @@ create_room(Host, Name, From, Nick, Opts) ->
|
|||
gen_server:call(Proc, {create, Name, Host, From, Nick, Opts}).
|
||||
|
||||
store_room(ServerHost, Host, Name, Opts) ->
|
||||
store_room(ServerHost, Host, Name, Opts, undefined).
|
||||
|
||||
store_room(ServerHost, Host, Name, Opts, ChangesHints) ->
|
||||
LServer = jid:nameprep(ServerHost),
|
||||
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||
Mod:store_room(LServer, Host, Name, Opts).
|
||||
Mod:store_room(LServer, Host, Name, Opts, ChangesHints).
|
||||
|
||||
restore_room(ServerHost, Host, Name) ->
|
||||
LServer = jid:nameprep(ServerHost),
|
||||
|
@ -696,8 +702,12 @@ get_room_disco_item({Name, Host, Pid}, Query) ->
|
|||
end.
|
||||
|
||||
get_subscribed_rooms(ServerHost, Host, From) ->
|
||||
Rooms = get_online_rooms(ServerHost, Host),
|
||||
LServer = jid:nameprep(ServerHost),
|
||||
Mod = gen_mod:db_mod(LServer, ?MODULE),
|
||||
BareFrom = jid:remove_resource(From),
|
||||
case Mod:get_subscribed_rooms(LServer, Host, BareFrom) of
|
||||
not_implmented ->
|
||||
Rooms = get_online_rooms(ServerHost, Host),
|
||||
lists:flatmap(
|
||||
fun({Name, _, Pid}) ->
|
||||
case p1_fsm:sync_send_all_state_event(Pid, {is_subscribed, BareFrom}) of
|
||||
|
@ -706,7 +716,10 @@ get_subscribed_rooms(ServerHost, Host, From) ->
|
|||
end;
|
||||
(_) ->
|
||||
[]
|
||||
end, Rooms).
|
||||
end, Rooms);
|
||||
V ->
|
||||
V
|
||||
end.
|
||||
|
||||
get_nick(ServerHost, Host, From) ->
|
||||
LServer = jid:nameprep(ServerHost),
|
||||
|
|
|
@ -28,12 +28,13 @@
|
|||
-behaviour(mod_muc_room).
|
||||
|
||||
%% API
|
||||
-export([init/2, import/3, store_room/4, restore_room/3, forget_room/3,
|
||||
-export([init/2, import/3, store_room/5, restore_room/3, forget_room/3,
|
||||
can_use_nick/4, get_rooms/2, get_nick/3, set_nick/4]).
|
||||
-export([register_online_room/4, unregister_online_room/4, find_online_room/3,
|
||||
get_online_rooms/3, count_online_rooms/2, rsm_supported/0,
|
||||
register_online_user/4, unregister_online_user/4,
|
||||
count_online_rooms_by_user/3, get_online_rooms_by_user/3]).
|
||||
count_online_rooms_by_user/3, get_online_rooms_by_user/3,
|
||||
get_subscribed_rooms/3]).
|
||||
-export([set_affiliation/6, set_affiliations/4, get_affiliation/5,
|
||||
get_affiliations/3, search_affiliation/4]).
|
||||
%% gen_server callbacks
|
||||
|
@ -63,7 +64,7 @@ start_link(Host, Opts) ->
|
|||
Name = gen_mod:get_module_proc(Host, ?MODULE),
|
||||
gen_server:start_link({local, Name}, ?MODULE, [Host, Opts], []).
|
||||
|
||||
store_room(_LServer, Host, Name, Opts) ->
|
||||
store_room(_LServer, Host, Name, Opts, _) ->
|
||||
F = fun () ->
|
||||
mnesia:write(#muc_room{name_host = {Name, Host},
|
||||
opts = Opts})
|
||||
|
@ -397,3 +398,6 @@ transform(#muc_registered{us_host = {{U, S}, H}, nick = Nick} = R) ->
|
|||
R#muc_registered{us_host = {{iolist_to_binary(U), iolist_to_binary(S)},
|
||||
iolist_to_binary(H)},
|
||||
nick = iolist_to_binary(Nick)}.
|
||||
|
||||
get_subscribed_rooms(_, _, _) ->
|
||||
not_implemented.
|
||||
|
|
|
@ -28,12 +28,13 @@
|
|||
-behaviour(mod_muc_room).
|
||||
|
||||
%% API
|
||||
-export([init/2, import/3, store_room/4, restore_room/3, forget_room/3,
|
||||
-export([init/2, import/3, store_room/5, restore_room/3, forget_room/3,
|
||||
can_use_nick/4, get_rooms/2, get_nick/3, set_nick/4]).
|
||||
-export([register_online_room/4, unregister_online_room/4, find_online_room/3,
|
||||
get_online_rooms/3, count_online_rooms/2, rsm_supported/0,
|
||||
register_online_user/4, unregister_online_user/4,
|
||||
count_online_rooms_by_user/3, get_online_rooms_by_user/3]).
|
||||
count_online_rooms_by_user/3, get_online_rooms_by_user/3,
|
||||
get_subscribed_rooms/3]).
|
||||
-export([set_affiliation/6, set_affiliations/4, get_affiliation/5,
|
||||
get_affiliations/3, search_affiliation/4]).
|
||||
|
||||
|
@ -46,7 +47,7 @@
|
|||
init(_Host, _Opts) ->
|
||||
ok.
|
||||
|
||||
store_room(_LServer, Host, Name, Opts) ->
|
||||
store_room(_LServer, Host, Name, Opts, _) ->
|
||||
{atomic, ejabberd_riak:put(#muc_room{name_host = {Name, Host},
|
||||
opts = Opts},
|
||||
muc_room_schema())}.
|
||||
|
@ -183,6 +184,9 @@ import(_LServer, <<"muc_registered">>,
|
|||
ejabberd_riak:put(R, muc_registered_schema(),
|
||||
[{'2i', [{<<"nick_host">>, {Nick, RoomHost}}]}]).
|
||||
|
||||
get_subscribed_rooms(_, _, _) ->
|
||||
not_implemented.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
|
|
|
@ -1624,7 +1624,7 @@ set_subscriber(JID, Nick, Nodes, StateData) ->
|
|||
Nicks = ?DICT:store(Nick, [LBareJID], StateData#state.subscriber_nicks),
|
||||
NewStateData = StateData#state{subscribers = Subscribers,
|
||||
subscriber_nicks = Nicks},
|
||||
store_room(NewStateData),
|
||||
store_room(NewStateData, [{add_subscription, BareJID, Nick, Nodes}]),
|
||||
case not ?DICT:is_key(LBareJID, StateData#state.subscribers) of
|
||||
true ->
|
||||
send_subscriptions_change_notifications(BareJID, Nick, subscribe, NewStateData);
|
||||
|
@ -3800,7 +3800,7 @@ process_iq_mucsub(From, #iq{type = set, sub_els = [#muc_unsubscribe{}]},
|
|||
Subscribers = ?DICT:erase(LBareJID, StateData#state.subscribers),
|
||||
NewStateData = StateData#state{subscribers = Subscribers,
|
||||
subscriber_nicks = Nicks},
|
||||
store_room(NewStateData),
|
||||
store_room(NewStateData, [{del_subscription, LBareJID}]),
|
||||
send_subscriptions_change_notifications(LBareJID, Nick, unsubscribe, StateData),
|
||||
NewStateData2 = case close_room_if_temporary_and_empty(NewStateData) of
|
||||
{stop, normal, _} -> stop;
|
||||
|
@ -4068,10 +4068,13 @@ element_size(El) ->
|
|||
|
||||
-spec store_room(state()) -> ok.
|
||||
store_room(StateData) ->
|
||||
store_room(StateData, []).
|
||||
store_room(StateData, ChangesHints) ->
|
||||
if (StateData#state.config)#config.persistent ->
|
||||
mod_muc:store_room(StateData#state.server_host,
|
||||
StateData#state.host, StateData#state.room,
|
||||
make_opts(StateData));
|
||||
make_opts(StateData),
|
||||
ChangesHints);
|
||||
true ->
|
||||
ok
|
||||
end.
|
||||
|
|
|
@ -30,13 +30,14 @@
|
|||
-behaviour(mod_muc_room).
|
||||
|
||||
%% API
|
||||
-export([init/2, store_room/4, restore_room/3, forget_room/3,
|
||||
-export([init/2, store_room/5, restore_room/3, forget_room/3,
|
||||
can_use_nick/4, get_rooms/2, get_nick/3, set_nick/4,
|
||||
import/3, export/1]).
|
||||
-export([register_online_room/4, unregister_online_room/4, find_online_room/3,
|
||||
get_online_rooms/3, count_online_rooms/2, rsm_supported/0,
|
||||
register_online_user/4, unregister_online_user/4,
|
||||
count_online_rooms_by_user/3, get_online_rooms_by_user/3]).
|
||||
count_online_rooms_by_user/3, get_online_rooms_by_user/3,
|
||||
get_subscribed_rooms/3]).
|
||||
-export([set_affiliation/6, set_affiliations/4, get_affiliation/5,
|
||||
get_affiliations/3, search_affiliation/4]).
|
||||
|
||||
|
@ -56,24 +57,77 @@ init(Host, Opts) ->
|
|||
ok
|
||||
end.
|
||||
|
||||
store_room(LServer, Host, Name, Opts) ->
|
||||
SOpts = misc:term_to_expr(Opts),
|
||||
store_room(LServer, Host, Name, Opts, ChangesHints) ->
|
||||
{Subs, Opts2} = case lists:keytake(subscribers, 1, Opts) of
|
||||
{value, {subscribers, S}, OptN} -> {S, OptN};
|
||||
_ -> {[], Opts}
|
||||
end,
|
||||
SOpts = misc:term_to_expr(Opts2),
|
||||
F = fun () ->
|
||||
?SQL_UPSERT_T(
|
||||
"muc_room",
|
||||
["!name=%(Name)s",
|
||||
"!host=%(Host)s",
|
||||
"opts=%(SOpts)s"])
|
||||
"opts=%(SOpts)s"]),
|
||||
case ChangesHints of
|
||||
Changes when is_list(Changes) ->
|
||||
[change_room(Host, Name, Change) || Change <- Changes];
|
||||
_ ->
|
||||
ejabberd_sql:sql_query_t(?SQL("delete from muc_room_subscribers where "
|
||||
"room=%(Name)s and host=%(Host)s")),
|
||||
[change_room(Host, Name, {add_subscription, JID, Nick, Nodes})
|
||||
|| {JID, Nick, Nodes} <- Subs]
|
||||
end
|
||||
end,
|
||||
ejabberd_sql:sql_transaction(LServer, F).
|
||||
|
||||
change_room(Host, Room, {add_subscription, JID, Nick, Nodes}) ->
|
||||
SJID = jid:to_string(JID),
|
||||
SNodes = jlib:term_to_expr(Nodes),
|
||||
?SQL_UPSERT_T(
|
||||
"muc_room_subscribers",
|
||||
["!jid=%(SJID)s",
|
||||
"!host=%(Host)s",
|
||||
"!room=%(Room)s",
|
||||
"nick=%(Nick)s",
|
||||
"nodes=%(SNodes)s"]);
|
||||
change_room(Host, Room, {del_subscription, JID}) ->
|
||||
SJID = jid:to_string(JID),
|
||||
ejabberd_sql:sql_query_t(?SQL("delete from muc_room_subscribers where "
|
||||
"room=%(Room)s and host=%(Host)s and jid=%(SJID)s"));
|
||||
change_room(Host, Room, Change) ->
|
||||
?ERROR_MSG("Unsupported change on room ~s@~s: ~p", [Room, Host, Change]).
|
||||
|
||||
restore_room(LServer, Host, Name) ->
|
||||
case catch ejabberd_sql:sql_query(
|
||||
LServer,
|
||||
?SQL("select @(opts)s from muc_room where name=%(Name)s"
|
||||
" and host=%(Host)s")) of
|
||||
{selected, [{Opts}]} ->
|
||||
mod_muc:opts_to_binary(ejabberd_sql:decode_term(Opts));
|
||||
OptsD = ejabberd_sql:decode_term(Opts),
|
||||
case catch ejabberd_sql:sql_query(
|
||||
LServer,
|
||||
?SQL("select @(jid)s, @(nick)s, @(nodes)s from muc_room_subscribers where name=%(Name)s"
|
||||
" and host=%(Host)s")) of
|
||||
{selected, []} ->
|
||||
OptsR = mod_muc:opts_to_binary(OptsD),
|
||||
case lists:keymember(subscribers, 1, OptsD) of
|
||||
true ->
|
||||
store_room(LServer, Host, Name, OptsR, undefined);
|
||||
_ ->
|
||||
ok
|
||||
end,
|
||||
OptsR;
|
||||
{selected, Subs} ->
|
||||
SubData = lists:map(
|
||||
fun({Jid, Nick, Nodes}) ->
|
||||
{jid:from_string(Jid), Nick, ejabberd_sql:decode_term(Nodes)}
|
||||
end, Subs),
|
||||
Opts2 = lists:keystore(subscribers, 1, OptsD, {subscribers, SubData}),
|
||||
mod_muc:opts_to_binary(Opts2);
|
||||
_ ->
|
||||
error
|
||||
end;
|
||||
_ ->
|
||||
error
|
||||
end.
|
||||
|
@ -82,6 +136,9 @@ forget_room(LServer, Host, Name) ->
|
|||
F = fun () ->
|
||||
ejabberd_sql:sql_query_t(
|
||||
?SQL("delete from muc_room where name=%(Name)s"
|
||||
" and host=%(Host)s")),
|
||||
ejabberd_sql:sql_query_t(
|
||||
?SQL("delete from muc_room_subscribers where room=%(Name)s"
|
||||
" and host=%(Host)s"))
|
||||
end,
|
||||
ejabberd_sql:sql_transaction(LServer, F).
|
||||
|
@ -103,12 +160,35 @@ get_rooms(LServer, Host) ->
|
|||
?SQL("select @(name)s, @(opts)s from muc_room"
|
||||
" where host=%(Host)s")) of
|
||||
{selected, RoomOpts} ->
|
||||
case catch ejabberd_sql:sql_query(
|
||||
LServer,
|
||||
?SQL("select @(room)s, @(jid)s, @(nick)s, @(nodes)s from muc_room_subscribers"
|
||||
" where host=%(Host)s")) of
|
||||
{selected, Subs} ->
|
||||
SubsD = lists:foldl(
|
||||
fun({Room, Jid, Nick, Nodes}, Dict) ->
|
||||
dict:append(Room, {jid:from_string(Jid),
|
||||
Nick, ejabberd_sql:decode_term(Nodes)}, Dict)
|
||||
end, dict:new(), Subs),
|
||||
lists:map(
|
||||
fun({Room, Opts}) ->
|
||||
OptsD = ejabberd_sql:decode_term(Opts),
|
||||
OptsD2 = case {dict:find(Room, SubsD), lists:keymember(subscribers, 1, OptsD)} of
|
||||
{_, true} ->
|
||||
store_room(LServer, Host, Room, mod_muc:opts_to_binary(OptsD), undefined),
|
||||
OptsD;
|
||||
{{ok, SubsI}, false} ->
|
||||
lists:keystore(subscribers, 1, OptsD, {subscribers, SubsI});
|
||||
_ ->
|
||||
OptsD
|
||||
end,
|
||||
#muc_room{name_host = {Room, Host},
|
||||
opts = mod_muc:opts_to_binary(
|
||||
ejabberd_sql:decode_term(Opts))}
|
||||
opts = mod_muc:opts_to_binary(OptsD2)}
|
||||
end, RoomOpts);
|
||||
Err ->
|
||||
?ERROR_MSG("failed to get rooms subscribers: ~p", [Err]),
|
||||
[]
|
||||
end;
|
||||
Err ->
|
||||
?ERROR_MSG("failed to get rooms: ~p", [Err]),
|
||||
[]
|
||||
|
@ -325,6 +405,19 @@ export(_Server) ->
|
|||
import(_, _, _) ->
|
||||
ok.
|
||||
|
||||
get_subscribed_rooms(LServer, Host, Jid) ->
|
||||
JidS = jid:to_string(Jid),
|
||||
case catch ejabberd_sql:sql_query(
|
||||
LServer,
|
||||
?SQL("select @(room)s from muc_room_subscribers where jid=%(JidS)s"
|
||||
" and host=%(Host)s")) of
|
||||
{selected, Subs} ->
|
||||
[jid:make(Room, Host, <<>>) || Room<-Subs];
|
||||
Error ->
|
||||
?ERROR_MSG("Error when fetching subscribed rooms ~p", [Error]),
|
||||
[]
|
||||
end.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
|
|
Loading…
Reference in New Issue