24
1
mirror of https://github.com/processone/ejabberd.git synced 2024-06-02 21:17:12 +02:00

Distribute routing of MUC messages accross all CPU cores

Also relay as less stanzas as possible through mod_muc workers
This commit is contained in:
Evgeny Khramtsov 2019-07-05 10:35:31 +03:00
parent 05461d1686
commit b3caade0a0
3 changed files with 399 additions and 259 deletions

View File

@ -22,20 +22,19 @@
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. %%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%% %%%
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
-module(mod_muc). -module(mod_muc).
-author('alexey@process-one.net'). -author('alexey@process-one.net').
-protocol({xep, 45, '1.25'}). -protocol({xep, 45, '1.25'}).
-ifndef(GEN_SERVER).
-behaviour(gen_server). -define(GEN_SERVER, gen_server).
-endif.
-behaviour(?GEN_SERVER).
-behaviour(gen_mod). -behaviour(gen_mod).
%% API %% API
-export([start/2, -export([start/2,
stop/1, stop/1,
start_link/3,
reload/3, reload/3,
room_destroyed/4, room_destroyed/4,
store_room/4, store_room/4,
@ -66,7 +65,9 @@
count_online_rooms_by_user/3, count_online_rooms_by_user/3,
get_online_rooms_by_user/3, get_online_rooms_by_user/3,
can_use_nick/4, can_use_nick/4,
get_subscribed_rooms/2]). get_subscribed_rooms/2,
procname/2,
route/1]).
-export([init/1, handle_call/3, handle_cast/2, -export([init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2, code_change/3, handle_info/2, terminate/2, code_change/3,
@ -76,18 +77,15 @@
-include("xmpp.hrl"). -include("xmpp.hrl").
-include("mod_muc.hrl"). -include("mod_muc.hrl").
-include("translate.hrl"). -include("translate.hrl").
-include("ejabberd_stacktrace.hrl").
-record(state, -record(state, {hosts :: [binary()],
{hosts = [] :: [binary()], server_host :: binary(),
server_host = <<"">> :: binary(), worker :: pos_integer()}).
access = {none, none, none, none} :: {atom(), atom(), atom(), atom(), atom()},
history_size = 20 :: non_neg_integer(),
max_rooms_discoitems = 100 :: non_neg_integer(),
queue_type = ram :: ram | file,
default_room_opts = [] :: list(),
room_shaper = none :: ejabberd_shaper:shaper()}).
-type access() :: {acl:acl(), acl:acl(), acl:acl(), acl:acl(), acl:acl()}.
-type muc_room_opts() :: [{atom(), any()}]. -type muc_room_opts() :: [{atom(), any()}].
-export_type([access/0]).
-callback init(binary(), gen_mod:opts()) -> any(). -callback init(binary(), gen_mod:opts()) -> any().
-callback import(binary(), binary(), [binary()]) -> ok. -callback import(binary(), binary(), [binary()]) -> ok.
-callback store_room(binary(), binary(), binary(), list(), list()|undefined) -> {atomic, any()}. -callback store_room(binary(), binary(), binary(), list(), list()|undefined) -> {atomic, any()}.
@ -116,23 +114,159 @@
%% API %% API
%%==================================================================== %%====================================================================
start(Host, Opts) -> start(Host, Opts) ->
gen_mod:start_child(?MODULE, Host, Opts). case mod_muc_sup:start(Host, Opts) of
{ok, _} ->
MyHosts = gen_mod:get_opt_hosts(Opts),
Mod = gen_mod:db_mod(Opts, ?MODULE),
RMod = gen_mod:ram_db_mod(Opts, ?MODULE),
Mod:init(Host, gen_mod:set_opt(hosts, MyHosts, Opts)),
RMod:init(Host, gen_mod:set_opt(hosts, MyHosts, Opts)),
load_permanent_rooms(MyHosts, Host, Opts);
Err ->
Err
end.
stop(Host) -> stop(Host) ->
Proc = mod_muc_sup:procname(Host),
Rooms = shutdown_rooms(Host), Rooms = shutdown_rooms(Host),
gen_mod:stop_child(?MODULE, Host), supervisor:terminate_child(ejabberd_gen_mod_sup, Proc),
supervisor:delete_child(ejabberd_gen_mod_sup, Proc),
{wait, Rooms}. {wait, Rooms}.
reload(Host, NewOpts, OldOpts) -> -spec reload(binary(), gen_mod:opts(), gen_mod:opts()) -> ok.
Proc = gen_mod:get_module_proc(Host, ?MODULE), reload(ServerHost, NewOpts, OldOpts) ->
gen_server:cast(Proc, {reload, Host, NewOpts, OldOpts}). NewMod = gen_mod:db_mod(NewOpts, ?MODULE),
NewRMod = gen_mod:ram_db_mod(NewOpts, ?MODULE),
OldMod = gen_mod:db_mod(OldOpts, ?MODULE),
OldRMod = gen_mod:ram_db_mod(OldOpts, ?MODULE),
NewHosts = gen_mod:get_opt_hosts(NewOpts),
OldHosts = gen_mod:get_opt_hosts(OldOpts),
AddHosts = NewHosts -- OldHosts,
DelHosts = OldHosts -- NewHosts,
if NewMod /= OldMod ->
NewMod:init(ServerHost, gen_mod:set_opt(hosts, NewHosts, NewOpts));
true ->
ok
end,
if NewRMod /= OldRMod ->
NewRMod:init(ServerHost, gen_mod:set_opt(hosts, NewHosts, NewOpts));
true ->
ok
end,
lists:foreach(
fun(I) ->
?GEN_SERVER:cast(procname(ServerHost, I),
{reload, AddHosts, DelHosts, NewHosts})
end, lists:seq(1, erlang:system_info(logical_processors))),
load_permanent_rooms(AddHosts, ServerHost, NewOpts),
shutdown_rooms(ServerHost, DelHosts, OldRMod),
lists:foreach(
fun(Host) ->
lists:foreach(
fun({_, _, Pid}) when node(Pid) == node() ->
Pid ! config_reloaded;
(_) ->
ok
end, get_online_rooms(ServerHost, Host))
end, misc:intersection(NewHosts, OldHosts)).
depends(_Host, _Opts) -> depends(_Host, _Opts) ->
[{mod_mam, soft}]. [{mod_mam, soft}].
start_link(Host, Opts, I) ->
Proc = procname(Host, I),
?GEN_SERVER:start_link({local, Proc}, ?MODULE, [Host, Opts, I],
ejabberd_config:fsm_limit_opts([])).
-spec procname(binary(), pos_integer() | {binary(), binary()}) -> atom().
procname(Host, I) when is_integer(I) ->
binary_to_atom(
<<(atom_to_binary(?MODULE, latin1))/binary, "_", Host/binary,
"_", (integer_to_binary(I))/binary>>, utf8);
procname(Host, RoomHost) ->
Cores = erlang:system_info(logical_processors),
I = erlang:phash2(RoomHost, Cores) + 1,
procname(Host, I).
-spec route(stanza()) -> ok.
route(Pkt) ->
To = xmpp:get_to(Pkt),
ServerHost = ejabberd_router:host_of_route(To#jid.lserver),
route(Pkt, ServerHost).
-spec route(stanza(), binary()) -> ok.
route(Pkt, ServerHost) ->
From = xmpp:get_from(Pkt),
To = xmpp:get_to(Pkt),
Host = To#jid.lserver,
Access = mod_muc_opt:access(ServerHost),
case acl:match_rule(ServerHost, Access, From) of
allow ->
route(Pkt, Host, ServerHost);
deny ->
Lang = xmpp:get_lang(Pkt),
ErrText = ?T("Access denied by service policy"),
Err = xmpp:err_forbidden(ErrText, Lang),
ejabberd_router:route_error(Pkt, Err)
end.
-spec route(stanza(), binary(), binary()) -> ok.
route(#iq{to = #jid{luser = <<"">>, lresource = <<"">>}} = IQ, _, _) ->
ejabberd_router:process_iq(IQ);
route(#message{lang = Lang, body = Body, type = Type, from = From,
to = #jid{luser = <<"">>, lresource = <<"">>}} = Pkt,
Host, ServerHost) ->
if Type == error ->
ok;
true ->
AccessAdmin = mod_muc_opt:access_admin(ServerHost),
case acl:match_rule(ServerHost, AccessAdmin, From) of
allow ->
Msg = xmpp:get_text(Body),
broadcast_service_message(ServerHost, Host, Msg);
deny ->
ErrText = ?T("Only service administrators are allowed "
"to send service messages"),
Err = xmpp:err_forbidden(ErrText, Lang),
ejabberd_router:route_error(Pkt, Err)
end
end;
route(Pkt, Host, ServerHost) ->
{Room, _, _} = jid:tolower(xmpp:get_to(Pkt)),
case Room of
<<"">> ->
Txt = ?T("No module is handling this query"),
Err = xmpp:err_service_unavailable(Txt, xmpp:get_lang(Pkt)),
ejabberd_router:route_error(Pkt, Err);
_ ->
RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE),
case RMod:find_online_room(ServerHost, Room, Host) of
error ->
Proc = procname(ServerHost, {Room, Host}),
case whereis(Proc) of
Pid when Pid == self() ->
route_to_room(Pkt, ServerHost);
Pid when is_pid(Pid) ->
?DEBUG("Routing to MUC worker ~p:~n~s", [Proc, xmpp:pp(Pkt)]),
?GEN_SERVER:cast(Pid, {route_to_room, Pkt});
undefined ->
?DEBUG("MUC worker ~p is dead", [Proc]),
Err = xmpp:err_internal_server_error(),
ejabberd_router:route_error(Pkt, Err)
end;
{ok, Pid} ->
mod_muc_room:route(Pid, Pkt)
end
end.
-spec shutdown_rooms(binary()) -> [pid()].
shutdown_rooms(ServerHost) -> shutdown_rooms(ServerHost) ->
RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE), RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE),
Hosts = gen_mod:get_module_opt_hosts(ServerHost, mod_muc), Hosts = gen_mod:get_module_opt_hosts(ServerHost, mod_muc),
shutdown_rooms(ServerHost, Hosts, RMod).
-spec shutdown_rooms(binary(), [binary()], module()) -> [pid()].
shutdown_rooms(ServerHost, Hosts, RMod) ->
Rooms = [RMod:get_online_rooms(ServerHost, Host, undefined) Rooms = [RMod:get_online_rooms(ServerHost, Host, undefined)
|| Host <- Hosts], || Host <- Hosts],
lists:flatmap( lists:flatmap(
@ -149,18 +283,18 @@ shutdown_rooms(ServerHost) ->
%% C) mod_muc:stop was called, and each room is being terminated %% C) mod_muc:stop was called, and each room is being terminated
%% In this case, the mod_muc process died before the room processes %% In this case, the mod_muc process died before the room processes
%% So the message sending must be catched %% So the message sending must be catched
-spec room_destroyed(binary(), binary(), pid(), binary()) -> ok.
room_destroyed(Host, Room, Pid, ServerHost) -> room_destroyed(Host, Room, Pid, ServerHost) ->
catch gen_mod:get_module_proc(ServerHost, ?MODULE) ! Proc = procname(ServerHost, {Room, Host}),
{room_destroyed, {Room, Host}, Pid}, ?GEN_SERVER:cast(Proc, {room_destroyed, {Room, Host}, Pid}).
ok.
%% @doc Create a room. %% @doc Create a room.
%% If Opts = default, the default room options are used. %% If Opts = default, the default room options are used.
%% Else use the passed options as defined in mod_muc_room. %% Else use the passed options as defined in mod_muc_room.
create_room(Host, Name, From, Nick, Opts) -> create_room(Host, Name, From, Nick, Opts) ->
ServerHost = ejabberd_router:host_of_route(Host), ServerHost = ejabberd_router:host_of_route(Host),
Proc = gen_mod:get_module_proc(ServerHost, ?MODULE), Proc = procname(ServerHost, {Name, Host}),
gen_server:call(Proc, {create, Name, Host, From, Nick, Opts}). ?GEN_SERVER:call(Proc, {create, Name, Host, From, Nick, Opts}).
store_room(ServerHost, Host, Name, Opts) -> store_room(ServerHost, Host, Name, Opts) ->
store_room(ServerHost, Host, Name, Opts, undefined). store_room(ServerHost, Host, Name, Opts, undefined).
@ -232,228 +366,158 @@ get_online_rooms_by_user(ServerHost, LUser, LServer) ->
%%==================================================================== %%====================================================================
%% gen_server callbacks %% gen_server callbacks
%%==================================================================== %%====================================================================
init([Host, Opts, Worker]) ->
init([Host, Opts]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
#state{access = Access, hosts = MyHosts, MyHosts = gen_mod:get_opt_hosts(Opts),
history_size = HistorySize, queue_type = QueueType, register_routes(Host, MyHosts, Worker),
room_shaper = RoomShaper} = State = init_state(Host, Opts), register_iq_handlers(MyHosts, Worker),
Mod = gen_mod:db_mod(Opts, ?MODULE), {ok, #state{server_host = Host, hosts = MyHosts, worker = Worker}}.
RMod = gen_mod:ram_db_mod(Opts, ?MODULE),
Mod:init(Host, gen_mod:set_opt(hosts, MyHosts, Opts)),
RMod:init(Host, gen_mod:set_opt(hosts, MyHosts, Opts)),
lists:foreach(
fun(MyHost) ->
register_iq_handlers(MyHost),
ejabberd_router:register_route(MyHost, Host),
load_permanent_rooms(MyHost, Host, Access, HistorySize,
RoomShaper, QueueType)
end, MyHosts),
{ok, State}.
handle_call(stop, _From, State) -> handle_call(stop, _From, State) ->
{stop, normal, ok, State}; {stop, normal, ok, State};
handle_call({create, Room, Host, From, Nick, Opts}, _From, handle_call({create, Room, Host, From, Nick, Opts}, _From,
#state{server_host = ServerHost, #state{server_host = ServerHost} = State) ->
access = Access, default_room_opts = DefOpts,
history_size = HistorySize, queue_type = QueueType,
room_shaper = RoomShaper} = State) ->
?DEBUG("MUC: create new room '~s'~n", [Room]), ?DEBUG("MUC: create new room '~s'~n", [Room]),
NewOpts = case Opts of NewOpts = case Opts of
default -> DefOpts; default -> mod_muc_opt:default_room_options(ServerHost);
_ -> Opts _ -> Opts
end, end,
{ok, Pid} = mod_muc_room:start(
Host, ServerHost, Access,
Room, HistorySize,
RoomShaper, From,
Nick, NewOpts, QueueType),
RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE), RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE),
RMod:register_online_room(ServerHost, Room, Host, Pid), case start_room(RMod, Host, ServerHost, Room, NewOpts, From, Nick) of
ejabberd_hooks:run(create_room, ServerHost, [ServerHost, Room, Host]), {ok, _} ->
{reply, ok, State}. ejabberd_hooks:run(create_room, ServerHost, [ServerHost, Room, Host]),
{reply, ok, State};
Err ->
{reply, Err, State}
end.
handle_cast({reload, ServerHost, NewOpts, OldOpts}, #state{hosts = OldHosts}) -> handle_cast({route_to_room, Packet}, #state{server_host = ServerHost} = State) ->
NewMod = gen_mod:db_mod(NewOpts, ?MODULE), try route_to_room(Packet, ServerHost)
NewRMod = gen_mod:ram_db_mod(NewOpts, ?MODULE), catch ?EX_RULE(E, R, St) ->
OldMod = gen_mod:db_mod(OldOpts, ?MODULE), StackTrace = ?EX_STACK(St),
OldRMod = gen_mod:ram_db_mod(OldOpts, ?MODULE), ?ERROR_MSG("Failed to route packet:~n~s~nReason = ~p",
#state{hosts = NewHosts} = NewState = init_state(ServerHost, NewOpts), [xmpp:pp(Packet), {E, {R, StackTrace}}])
if NewMod /= OldMod ->
NewMod:init(ServerHost, gen_mod:set_opt(hosts, NewHosts, NewOpts));
true ->
ok
end,
if NewRMod /= OldRMod ->
NewRMod:init(ServerHost, gen_mod:set_opt(hosts, NewHosts, NewOpts));
true ->
ok
end,
lists:foreach(
fun(NewHost) ->
ejabberd_router:register_route(NewHost, ServerHost),
register_iq_handlers(NewHost)
end, NewHosts -- OldHosts),
lists:foreach(
fun(OldHost) ->
ejabberd_router:unregister_route(OldHost),
unregister_iq_handlers(OldHost)
end, OldHosts -- NewHosts),
lists:foreach(
fun(Host) ->
lists:foreach(
fun({_, _, Pid}) when node(Pid) == node() ->
Pid ! config_reloaded;
(_) ->
ok
end, get_online_rooms(ServerHost, Host))
end, misc:intersection(NewHosts, OldHosts)),
{noreply, NewState};
handle_cast(Msg, State) ->
?WARNING_MSG("Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({route, Packet},
#state{server_host = ServerHost,
access = Access, default_room_opts = DefRoomOpts,
history_size = HistorySize, queue_type = QueueType,
max_rooms_discoitems = MaxRoomsDiscoItems,
room_shaper = RoomShaper} = State) ->
From = xmpp:get_from(Packet),
To = xmpp:get_to(Packet),
Host = To#jid.lserver,
case catch do_route(Host, ServerHost, Access, HistorySize, RoomShaper,
From, To, Packet, DefRoomOpts, MaxRoomsDiscoItems,
QueueType) of
{'EXIT', Reason} ->
?ERROR_MSG("~p", [Reason]);
_ ->
ok
end, end,
{noreply, State}; {noreply, State};
handle_info({room_destroyed, {Room, Host}, Pid}, State) -> handle_cast({room_destroyed, {Room, Host}, Pid}, State) ->
ServerHost = State#state.server_host, ServerHost = State#state.server_host,
RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE), RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE),
RMod:unregister_online_room(ServerHost, Room, Host, Pid), RMod:unregister_online_room(ServerHost, Room, Host, Pid),
{noreply, State}; {noreply, State};
handle_cast({reload, AddHosts, DelHosts, NewHosts},
#state{server_host = ServerHost, worker = Worker} = State) ->
register_routes(ServerHost, AddHosts, Worker),
register_iq_handlers(AddHosts, Worker),
unregister_routes(DelHosts, Worker),
unregister_iq_handlers(DelHosts, Worker),
{noreply, State#state{hosts = NewHosts}};
handle_cast(Msg, State) ->
?WARNING_MSG("Unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({route, Packet}, State) ->
%% We can only receive the packet here from other nodes
%% where mod_muc is not loaded. Such configuration
%% is *highly* discouraged
try route(Packet, State#state.server_host)
catch ?EX_RULE(E, R, St) ->
StackTrace = ?EX_STACK(St),
?ERROR_MSG("Failed to route packet:~n~s~nReason = ~p",
[xmpp:pp(Packet), {E, {R, StackTrace}}])
end,
{noreply, State};
handle_info({room_destroyed, {Room, Host}, Pid}, State) ->
%% For backward compat
handle_cast({room_destroyed, {Room, Host}, Pid}, State);
handle_info(Info, State) -> handle_info(Info, State) ->
?ERROR_MSG("Unexpected info: ~p", [Info]), ?ERROR_MSG("Unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, #state{hosts = MyHosts}) -> terminate(_Reason, #state{hosts = Hosts, worker = Worker}) ->
lists:foreach( unregister_routes(Hosts, Worker),
fun(MyHost) -> unregister_iq_handlers(Hosts, Worker).
ejabberd_router:unregister_route(MyHost),
unregister_iq_handlers(MyHost)
end, MyHosts).
code_change(_OldVsn, State, _Extra) -> {ok, State}. code_change(_OldVsn, State, _Extra) -> {ok, State}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%%% Internal functions %%% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init_state(Host, Opts) -> -spec register_iq_handlers([binary()], pos_integer()) -> ok.
MyHosts = gen_mod:get_opt_hosts(Opts), register_iq_handlers(Hosts, 1) ->
Access = mod_muc_opt:access(Opts), %% Only register handlers on first worker
AccessCreate = mod_muc_opt:access_create(Opts), lists:foreach(
AccessAdmin = mod_muc_opt:access_admin(Opts), fun(Host) ->
AccessPersistent = mod_muc_opt:access_persistent(Opts), gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_REGISTER,
AccessMam = mod_muc_opt:access_mam(Opts), ?MODULE, process_register),
HistorySize = mod_muc_opt:history_size(Opts), gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_VCARD,
MaxRoomsDiscoItems = mod_muc_opt:max_rooms_discoitems(Opts), ?MODULE, process_vcard),
DefRoomOpts = mod_muc_opt:default_room_options(Opts), gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_MUCSUB,
QueueType = mod_muc_opt:queue_type(Opts), ?MODULE, process_mucsub),
RoomShaper = mod_muc_opt:room_shaper(Opts), gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_MUC_UNIQUE,
#state{hosts = MyHosts, ?MODULE, process_muc_unique),
server_host = Host, gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO,
access = {Access, AccessCreate, AccessAdmin, AccessPersistent, AccessMam}, ?MODULE, process_disco_info),
default_room_opts = DefRoomOpts, gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS,
queue_type = QueueType, ?MODULE, process_disco_items)
history_size = HistorySize, end, Hosts);
max_rooms_discoitems = MaxRoomsDiscoItems, register_iq_handlers(_, _) ->
room_shaper = RoomShaper}. ok.
register_iq_handlers(Host) -> -spec unregister_iq_handlers([binary()], pos_integer()) -> ok.
gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_REGISTER, unregister_iq_handlers(Hosts, 1) ->
?MODULE, process_register), %% Only unregister handlers on first worker
gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_VCARD, lists:foreach(
?MODULE, process_vcard), fun(Host) ->
gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_MUCSUB, gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_REGISTER),
?MODULE, process_mucsub), gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_VCARD),
gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_MUC_UNIQUE, gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MUCSUB),
?MODULE, process_muc_unique), gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MUC_UNIQUE),
gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO, gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO),
?MODULE, process_disco_info), gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS)
gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS, end, Hosts);
?MODULE, process_disco_items). unregister_iq_handlers(_, _) ->
ok.
unregister_iq_handlers(Host) -> -spec register_routes(binary(), [binary()], pos_integer()) -> ok.
gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_REGISTER), register_routes(ServerHost, Hosts, 1) ->
gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_VCARD), %% Only register routes on first worker
gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MUCSUB), lists:foreach(
gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MUC_UNIQUE), fun(Host) ->
gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO), ejabberd_router:register_route(
gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS). Host, ServerHost, {apply, ?MODULE, route})
end, Hosts);
register_routes(_, _, _) ->
ok.
do_route(Host, ServerHost, Access, HistorySize, RoomShaper, -spec unregister_routes([binary()], pos_integer()) -> ok.
From, To, Packet, DefRoomOpts, _MaxRoomsDiscoItems, QueueType) -> unregister_routes(Hosts, 1) ->
{AccessRoute, _AccessCreate, _AccessAdmin, _AccessPersistent, _AccessMam} = Access, %% Only unregister routes on first worker
case acl:match_rule(ServerHost, AccessRoute, From) of lists:foreach(
allow -> fun(Host) ->
do_route1(Host, ServerHost, Access, HistorySize, RoomShaper, ejabberd_router:unregister_route(Host)
From, To, Packet, DefRoomOpts, QueueType); end, Hosts);
deny -> unregister_routes(_, _) ->
Lang = xmpp:get_lang(Packet), ok.
ErrText = ?T("Access denied by service policy"),
Err = xmpp:err_forbidden(ErrText, Lang),
ejabberd_router:route_error(Packet, Err)
end.
do_route1(_Host, _ServerHost, _Access, _HistorySize, _RoomShaper, -spec route_to_room(stanza(), binary()) -> ok.
_From, #jid{luser = <<"">>, lresource = <<"">>} = _To, route_to_room(Packet, ServerHost) ->
#iq{} = IQ, _DefRoomOpts, _QueueType) -> From = xmpp:get_from(Packet),
ejabberd_router:process_iq(IQ); To = xmpp:get_to(Packet),
do_route1(Host, ServerHost, Access, _HistorySize, _RoomShaper, {Room, Host, Nick} = jid:tolower(To),
From, #jid{luser = <<"">>, lresource = <<"">>} = _To,
#message{lang = Lang, body = Body, type = Type} = Packet, _, _) ->
{_AccessRoute, _AccessCreate, AccessAdmin, _AccessPersistent, _AccessMam} = Access,
if Type == error ->
ok;
true ->
case acl:match_rule(ServerHost, AccessAdmin, From) of
allow ->
Msg = xmpp:get_text(Body),
broadcast_service_message(ServerHost, Host, Msg);
deny ->
ErrText = ?T("Only service administrators are allowed "
"to send service messages"),
Err = xmpp:err_forbidden(ErrText, Lang),
ejabberd_router:route_error(Packet, Err)
end
end;
do_route1(_Host, _ServerHost, _Access, _HistorySize, _RoomShaper,
_From, #jid{luser = <<"">>} = _To, Packet, _DefRoomOpts, _) ->
Err = xmpp:err_service_unavailable(),
ejabberd_router:route_error(Packet, Err);
do_route1(Host, ServerHost, Access, HistorySize, RoomShaper,
From, To, Packet, DefRoomOpts, QueueType) ->
{Room, _, Nick} = jid:tolower(To),
RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE), RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE),
case RMod:find_online_room(ServerHost, Room, Host) of case RMod:find_online_room(ServerHost, Room, Host) of
error -> error ->
case is_create_request(Packet) of case is_create_request(Packet) of
true -> true ->
case check_create_room( case check_create_room(ServerHost, Host, Room, From) of
ServerHost, Host, Room, From, Access) of
true -> true ->
{ok, Pid} = start_new_room( case start_new_room(Host, ServerHost, Room, From, Nick) of
Host, ServerHost, Access, {ok, Pid} ->
Room, HistorySize, mod_muc_room:route(Pid, Packet);
RoomShaper, From, Nick, DefRoomOpts, _Err ->
QueueType), Err = xmpp:err_internal_server_error(),
RMod:register_online_room(ServerHost, Room, Host, Pid), ejabberd_router:route_error(Packet, Err)
mod_muc_room:route(Pid, Packet), end;
ok;
false -> false ->
Lang = xmpp:get_lang(Packet), Lang = xmpp:get_lang(Packet),
ErrText = ?T("Room creation is denied by service policy"), ErrText = ?T("Room creation is denied by service policy"),
@ -467,9 +531,7 @@ do_route1(Host, ServerHost, Access, HistorySize, RoomShaper,
ejabberd_router:route_error(Packet, Err) ejabberd_router:route_error(Packet, Err)
end; end;
{ok, Pid} -> {ok, Pid} ->
?DEBUG("MUC: send to process ~p~n", [Pid]), mod_muc_room:route(Pid, Packet)
mod_muc_room:route(Pid, Packet),
ok
end. end.
-spec process_vcard(iq()) -> iq(). -spec process_vcard(iq()) -> iq().
@ -612,11 +674,9 @@ is_create_request(#iq{type = T} = IQ) when T == get; T == set ->
is_create_request(_) -> is_create_request(_) ->
false. false.
-spec check_create_room(binary(), binary(), binary(), jid(), tuple()) -spec check_create_room(binary(), binary(), binary(), jid()) -> boolean().
-> boolean(). check_create_room(ServerHost, Host, Room, From) ->
check_create_room(ServerHost, Host, Room, From, Access) -> AccessCreate = mod_muc_opt:access_create(ServerHost),
{_AccessRoute, AccessCreate, AccessAdmin,
_AccessPersistent, _AccessMam} = Access,
case acl:match_rule(ServerHost, AccessCreate, From) of case acl:match_rule(ServerHost, AccessCreate, From) of
allow -> allow ->
case mod_muc_opt:max_room_id(ServerHost) of case mod_muc_opt:max_room_id(ServerHost) of
@ -624,8 +684,8 @@ check_create_room(ServerHost, Host, Room, From, Access) ->
Regexp = mod_muc_opt:regexp_room_id(ServerHost), Regexp = mod_muc_opt:regexp_room_id(ServerHost),
case re:run(Room, Regexp, [unicode, {capture, none}]) of case re:run(Room, Regexp, [unicode, {capture, none}]) of
match -> match ->
case acl:match_rule( AccessAdmin = mod_muc_opt:access_admin(ServerHost),
ServerHost, AccessAdmin, From) of case acl:match_rule(ServerHost, AccessAdmin, From) of
allow -> allow ->
true; true;
_ -> _ ->
@ -643,37 +703,56 @@ check_create_room(ServerHost, Host, Room, From, Access) ->
false false
end. end.
-spec get_access(binary() | gen_mod:opts()) -> access().
get_access(ServerHost) ->
Access = mod_muc_opt:access(ServerHost),
AccessCreate = mod_muc_opt:access_create(ServerHost),
AccessAdmin = mod_muc_opt:access_admin(ServerHost),
AccessPersistent = mod_muc_opt:access_persistent(ServerHost),
AccessMam = mod_muc_opt:access_mam(ServerHost),
{Access, AccessCreate, AccessAdmin, AccessPersistent, AccessMam}.
-spec get_rooms(binary(), binary()) -> [#muc_room{}].
get_rooms(ServerHost, Host) -> get_rooms(ServerHost, Host) ->
LServer = jid:nameprep(ServerHost), Mod = gen_mod:db_mod(ServerHost, ?MODULE),
Mod = gen_mod:db_mod(LServer, ?MODULE), Mod:get_rooms(ServerHost, Host).
Mod:get_rooms(LServer, Host).
load_permanent_rooms(Host, ServerHost, Access, -spec load_permanent_rooms([binary()], binary(), gen_mod:opts()) -> ok.
HistorySize, RoomShaper, QueueType) -> load_permanent_rooms(Hosts, ServerHost, Opts) ->
case mod_muc_opt:preload_rooms(Opts) of
true ->
Access = get_access(Opts),
HistorySize = mod_muc_opt:history_size(Opts),
QueueType = mod_muc_opt:queue_type(Opts),
RoomShaper = mod_muc_opt:room_shaper(Opts),
RMod = gen_mod:ram_db_mod(Opts, ?MODULE),
lists:foreach(
fun(Host) ->
?DEBUG("Loading rooms at ~s", [Host]),
lists:foreach(
fun(R) ->
{Room, _} = R#muc_room.name_host,
case proplists:get_bool(persistent, R#muc_room.opts) of
true ->
case RMod:find_online_room(ServerHost, Room, Host) of
error ->
start_room(RMod, Host, ServerHost, Access,
Room, HistorySize, RoomShaper,
R#muc_room.opts, QueueType);
{ok, _} ->
ok
end;
_ ->
forget_room(ServerHost, Host, Room)
end
end, get_rooms(ServerHost, Host))
end, Hosts);
false ->
ok
end.
start_new_room(Host, ServerHost, Room, From, Nick) ->
RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE), RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE),
lists:foreach(
fun(R) ->
{Room, Host} = R#muc_room.name_host,
case proplists:get_bool(persistent, R#muc_room.opts) of
true ->
case RMod:find_online_room(ServerHost, Room, Host) of
error ->
{ok, Pid} = mod_muc_room:start(Host,
ServerHost, Access, Room,
HistorySize, RoomShaper,
R#muc_room.opts, QueueType),
RMod:register_online_room(ServerHost, Room, Host, Pid);
{ok, _} ->
ok
end;
_ ->
forget_room(ServerHost, Host, Room)
end
end, get_rooms(ServerHost, Host)).
start_new_room(Host, ServerHost, Access, Room,
HistorySize, RoomShaper, From,
Nick, DefRoomOpts, QueueType) ->
Opts = case restore_room(ServerHost, Host, Room) of Opts = case restore_room(ServerHost, Host, Room) of
error -> error ->
error; error;
@ -687,14 +766,52 @@ start_new_room(Host, ServerHost, Access, Room,
end, end,
case Opts of case Opts of
error -> error ->
?DEBUG("MUC: open new room '~s'~n", [Room]), ?DEBUG("Open new room: ~s", [Room]),
mod_muc_room:start(Host, ServerHost, Access, Room, DefRoomOpts = mod_muc_opt:default_room_options(ServerHost),
HistorySize, RoomShaper, start_room(RMod, Host, ServerHost, Room, DefRoomOpts, From, Nick);
From, Nick, DefRoomOpts, QueueType);
_ -> _ ->
?DEBUG("MUC: restore room '~s'~n", [Room]), ?DEBUG("Restore room: ~s", [Room]),
mod_muc_room:start(Host, ServerHost, Access, Room, start_room(RMod, Host, ServerHost, Room, Opts)
HistorySize, RoomShaper, Opts, QueueType) end.
start_room(Mod, Host, ServerHost, Room, DefOpts) ->
Access = get_access(ServerHost),
HistorySize = mod_muc_opt:history_size(ServerHost),
QueueType = mod_muc_opt:queue_type(ServerHost),
RoomShaper = mod_muc_opt:room_shaper(ServerHost),
start_room(Mod, Host, ServerHost, Access, Room, HistorySize,
RoomShaper, DefOpts, QueueType).
start_room(Mod, Host, ServerHost, Room, DefOpts, Creator, Nick) ->
Access = get_access(ServerHost),
HistorySize = mod_muc_opt:history_size(ServerHost),
QueueType = mod_muc_opt:queue_type(ServerHost),
RoomShaper = mod_muc_opt:room_shaper(ServerHost),
start_room(Mod, Host, ServerHost, Access, Room,
HistorySize, RoomShaper,
Creator, Nick, DefOpts, QueueType).
start_room(Mod, Host, ServerHost, Access, Room,
HistorySize, RoomShaper, DefOpts, QueueType) ->
case mod_muc_room:start(Host, ServerHost, Access, Room,
HistorySize, RoomShaper, DefOpts, QueueType) of
{ok, Pid} ->
Mod:register_online_room(ServerHost, Room, Host, Pid),
{ok, Pid};
Err ->
Err
end.
start_room(Mod, Host, ServerHost, Access, Room, HistorySize,
RoomShaper, Creator, Nick, DefOpts, QueueType) ->
case mod_muc_room:start(Host, ServerHost, Access, Room,
HistorySize, RoomShaper,
Creator, Nick, DefOpts, QueueType) of
{ok, Pid} ->
Mod:register_online_room(ServerHost, Room, Host, Pid),
{ok, Pid};
Err ->
Err
end. end.
-spec iq_disco_items(binary(), binary(), jid(), binary(), integer(), binary(), -spec iq_disco_items(binary(), binary(), jid(), binary(), integer(), binary(),
@ -964,7 +1081,7 @@ mod_opt_type(max_room_id) ->
mod_opt_type(max_rooms_discoitems) -> mod_opt_type(max_rooms_discoitems) ->
econf:non_neg_int(); econf:non_neg_int();
mod_opt_type(regexp_room_id) -> mod_opt_type(regexp_room_id) ->
econf:binary(); econf:re();
mod_opt_type(max_room_name) -> mod_opt_type(max_room_name) ->
econf:pos_int(infinity); econf:pos_int(infinity);
mod_opt_type(max_user_conferences) -> mod_opt_type(max_user_conferences) ->
@ -979,6 +1096,8 @@ mod_opt_type(min_message_interval) ->
econf:number(0); econf:number(0);
mod_opt_type(min_presence_interval) -> mod_opt_type(min_presence_interval) ->
econf:number(0); econf:number(0);
mod_opt_type(preload_rooms) ->
econf:bool();
mod_opt_type(room_shaper) -> mod_opt_type(room_shaper) ->
econf:atom(); econf:atom();
mod_opt_type(user_message_shaper) -> mod_opt_type(user_message_shaper) ->
@ -1053,6 +1172,7 @@ mod_options(Host) ->
{room_shaper, none}, {room_shaper, none},
{user_message_shaper, none}, {user_message_shaper, none},
{user_presence_shaper, none}, {user_presence_shaper, none},
{preload_rooms, true},
{default_room_options, {default_room_options,
[{allow_change_subj,true}, [{allow_change_subj,true},
{allow_private_messages,true}, {allow_private_messages,true},

View File

@ -25,6 +25,7 @@
-export([min_message_interval/1]). -export([min_message_interval/1]).
-export([min_presence_interval/1]). -export([min_presence_interval/1]).
-export([name/1]). -export([name/1]).
-export([preload_rooms/1]).
-export([queue_type/1]). -export([queue_type/1]).
-export([ram_db_type/1]). -export([ram_db_type/1]).
-export([regexp_room_id/1]). -export([regexp_room_id/1]).
@ -164,6 +165,12 @@ name(Opts) when is_map(Opts) ->
name(Host) -> name(Host) ->
gen_mod:get_module_opt(Host, mod_muc, name). gen_mod:get_module_opt(Host, mod_muc, name).
-spec preload_rooms(gen_mod:opts() | global | binary()) -> boolean().
preload_rooms(Opts) when is_map(Opts) ->
gen_mod:get_opt(preload_rooms, Opts);
preload_rooms(Host) ->
gen_mod:get_module_opt(Host, mod_muc, preload_rooms).
-spec queue_type(gen_mod:opts() | global | binary()) -> 'file' | 'ram'. -spec queue_type(gen_mod:opts() | global | binary()) -> 'file' | 'ram'.
queue_type(Opts) when is_map(Opts) -> queue_type(Opts) when is_map(Opts) ->
gen_mod:get_opt(queue_type, Opts); gen_mod:get_opt(queue_type, Opts);
@ -176,7 +183,7 @@ ram_db_type(Opts) when is_map(Opts) ->
ram_db_type(Host) -> ram_db_type(Host) ->
gen_mod:get_module_opt(Host, mod_muc, ram_db_type). gen_mod:get_module_opt(Host, mod_muc, ram_db_type).
-spec regexp_room_id(gen_mod:opts() | global | binary()) -> binary(). -spec regexp_room_id(gen_mod:opts() | global | binary()) -> <<>> | re:mp().
regexp_room_id(Opts) when is_map(Opts) -> regexp_room_id(Opts) when is_map(Opts) ->
gen_mod:get_opt(regexp_room_id, Opts); gen_mod:get_opt(regexp_room_id, Opts);
regexp_room_id(Host) -> regexp_room_id(Host) ->

View File

@ -93,23 +93,35 @@
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
%%% API %%% API
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
-spec start(binary(), binary(), mod_muc:access(), binary(), non_neg_integer(),
atom(), jid(), binary(), [{atom(), term()}], ram | file) ->
{ok, pid()} | {error, any()}.
start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, start(Host, ServerHost, Access, Room, HistorySize, RoomShaper,
Creator, Nick, DefRoomOpts, QueueType) -> Creator, Nick, DefRoomOpts, QueueType) ->
p1_fsm:start(?MODULE, [Host, ServerHost, Access, Room, HistorySize, p1_fsm:start(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
RoomShaper, Creator, Nick, DefRoomOpts, QueueType], RoomShaper, Creator, Nick, DefRoomOpts, QueueType],
?FSMOPTS). ?FSMOPTS).
-spec start(binary(), binary(), mod_muc:access(), binary(), non_neg_integer(),
atom(), [{atom(), term()}], ram | file) ->
{ok, pid()} | {error, any()}.
start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType) -> start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType) ->
p1_fsm:start(?MODULE, [Host, ServerHost, Access, Room, HistorySize, p1_fsm:start(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
RoomShaper, Opts, QueueType], RoomShaper, Opts, QueueType],
?FSMOPTS). ?FSMOPTS).
-spec start_link(binary(), binary(), mod_muc:access(), binary(), non_neg_integer(),
atom(), jid(), binary(), [{atom(), term()}], ram | file) ->
{ok, pid()} | {error, any()}.
start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper,
Creator, Nick, DefRoomOpts, QueueType) -> Creator, Nick, DefRoomOpts, QueueType) ->
p1_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize, p1_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
RoomShaper, Creator, Nick, DefRoomOpts, QueueType], RoomShaper, Creator, Nick, DefRoomOpts, QueueType],
?FSMOPTS). ?FSMOPTS).
-spec start_link(binary(), binary(), mod_muc:access(), binary(), non_neg_integer(),
atom(), [{atom(), term()}], ram | file) ->
{ok, pid()} | {error, any()}.
start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType) -> start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType) ->
p1_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize, p1_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
RoomShaper, Opts, QueueType], RoomShaper, Opts, QueueType],
@ -756,6 +768,7 @@ terminate(Reason, _StateName,
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
-spec route(pid(), stanza()) -> ok. -spec route(pid(), stanza()) -> ok.
route(Pid, Packet) -> route(Pid, Packet) ->
?DEBUG("Routing to MUC room ~p:~n~s", [Pid, xmpp:pp(Packet)]),
#jid{lresource = Nick} = xmpp:get_to(Packet), #jid{lresource = Nick} = xmpp:get_to(Packet),
p1_fsm:send_event(Pid, {route, Nick, Packet}). p1_fsm:send_event(Pid, {route, Nick, Packet}).