From b3caade0a015e99657b5eedfef80a32710a4e31b Mon Sep 17 00:00:00 2001 From: Evgeny Khramtsov Date: Fri, 5 Jul 2019 10:35:31 +0300 Subject: [PATCH] Distribute routing of MUC messages accross all CPU cores Also relay as less stanzas as possible through mod_muc workers --- src/mod_muc.erl | 636 +++++++++++++++++++++++++------------------ src/mod_muc_opt.erl | 9 +- src/mod_muc_room.erl | 13 + 3 files changed, 399 insertions(+), 259 deletions(-) diff --git a/src/mod_muc.erl b/src/mod_muc.erl index ef9a904b8..c69681261 100644 --- a/src/mod_muc.erl +++ b/src/mod_muc.erl @@ -22,20 +22,19 @@ %%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. %%% %%%---------------------------------------------------------------------- - -module(mod_muc). - -author('alexey@process-one.net'). - -protocol({xep, 45, '1.25'}). - --behaviour(gen_server). - +-ifndef(GEN_SERVER). +-define(GEN_SERVER, gen_server). +-endif. +-behaviour(?GEN_SERVER). -behaviour(gen_mod). %% API -export([start/2, stop/1, + start_link/3, reload/3, room_destroyed/4, store_room/4, @@ -66,7 +65,9 @@ count_online_rooms_by_user/3, get_online_rooms_by_user/3, can_use_nick/4, - get_subscribed_rooms/2]). + get_subscribed_rooms/2, + procname/2, + route/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, @@ -76,18 +77,15 @@ -include("xmpp.hrl"). -include("mod_muc.hrl"). -include("translate.hrl"). +-include("ejabberd_stacktrace.hrl"). --record(state, - {hosts = [] :: [binary()], - server_host = <<"">> :: binary(), - access = {none, none, none, none} :: {atom(), atom(), atom(), atom(), atom()}, - history_size = 20 :: non_neg_integer(), - max_rooms_discoitems = 100 :: non_neg_integer(), - queue_type = ram :: ram | file, - default_room_opts = [] :: list(), - room_shaper = none :: ejabberd_shaper:shaper()}). +-record(state, {hosts :: [binary()], + server_host :: binary(), + worker :: pos_integer()}). +-type access() :: {acl:acl(), acl:acl(), acl:acl(), acl:acl(), acl:acl()}. -type muc_room_opts() :: [{atom(), any()}]. +-export_type([access/0]). -callback init(binary(), gen_mod:opts()) -> any(). -callback import(binary(), binary(), [binary()]) -> ok. -callback store_room(binary(), binary(), binary(), list(), list()|undefined) -> {atomic, any()}. @@ -116,23 +114,159 @@ %% API %%==================================================================== start(Host, Opts) -> - gen_mod:start_child(?MODULE, Host, Opts). + case mod_muc_sup:start(Host, Opts) of + {ok, _} -> + MyHosts = gen_mod:get_opt_hosts(Opts), + Mod = gen_mod:db_mod(Opts, ?MODULE), + RMod = gen_mod:ram_db_mod(Opts, ?MODULE), + Mod:init(Host, gen_mod:set_opt(hosts, MyHosts, Opts)), + RMod:init(Host, gen_mod:set_opt(hosts, MyHosts, Opts)), + load_permanent_rooms(MyHosts, Host, Opts); + Err -> + Err + end. stop(Host) -> + Proc = mod_muc_sup:procname(Host), Rooms = shutdown_rooms(Host), - gen_mod:stop_child(?MODULE, Host), + supervisor:terminate_child(ejabberd_gen_mod_sup, Proc), + supervisor:delete_child(ejabberd_gen_mod_sup, Proc), {wait, Rooms}. -reload(Host, NewOpts, OldOpts) -> - Proc = gen_mod:get_module_proc(Host, ?MODULE), - gen_server:cast(Proc, {reload, Host, NewOpts, OldOpts}). +-spec reload(binary(), gen_mod:opts(), gen_mod:opts()) -> ok. +reload(ServerHost, NewOpts, OldOpts) -> + NewMod = gen_mod:db_mod(NewOpts, ?MODULE), + NewRMod = gen_mod:ram_db_mod(NewOpts, ?MODULE), + OldMod = gen_mod:db_mod(OldOpts, ?MODULE), + OldRMod = gen_mod:ram_db_mod(OldOpts, ?MODULE), + NewHosts = gen_mod:get_opt_hosts(NewOpts), + OldHosts = gen_mod:get_opt_hosts(OldOpts), + AddHosts = NewHosts -- OldHosts, + DelHosts = OldHosts -- NewHosts, + if NewMod /= OldMod -> + NewMod:init(ServerHost, gen_mod:set_opt(hosts, NewHosts, NewOpts)); + true -> + ok + end, + if NewRMod /= OldRMod -> + NewRMod:init(ServerHost, gen_mod:set_opt(hosts, NewHosts, NewOpts)); + true -> + ok + end, + lists:foreach( + fun(I) -> + ?GEN_SERVER:cast(procname(ServerHost, I), + {reload, AddHosts, DelHosts, NewHosts}) + end, lists:seq(1, erlang:system_info(logical_processors))), + load_permanent_rooms(AddHosts, ServerHost, NewOpts), + shutdown_rooms(ServerHost, DelHosts, OldRMod), + lists:foreach( + fun(Host) -> + lists:foreach( + fun({_, _, Pid}) when node(Pid) == node() -> + Pid ! config_reloaded; + (_) -> + ok + end, get_online_rooms(ServerHost, Host)) + end, misc:intersection(NewHosts, OldHosts)). depends(_Host, _Opts) -> [{mod_mam, soft}]. +start_link(Host, Opts, I) -> + Proc = procname(Host, I), + ?GEN_SERVER:start_link({local, Proc}, ?MODULE, [Host, Opts, I], + ejabberd_config:fsm_limit_opts([])). + +-spec procname(binary(), pos_integer() | {binary(), binary()}) -> atom(). +procname(Host, I) when is_integer(I) -> + binary_to_atom( + <<(atom_to_binary(?MODULE, latin1))/binary, "_", Host/binary, + "_", (integer_to_binary(I))/binary>>, utf8); +procname(Host, RoomHost) -> + Cores = erlang:system_info(logical_processors), + I = erlang:phash2(RoomHost, Cores) + 1, + procname(Host, I). + +-spec route(stanza()) -> ok. +route(Pkt) -> + To = xmpp:get_to(Pkt), + ServerHost = ejabberd_router:host_of_route(To#jid.lserver), + route(Pkt, ServerHost). + +-spec route(stanza(), binary()) -> ok. +route(Pkt, ServerHost) -> + From = xmpp:get_from(Pkt), + To = xmpp:get_to(Pkt), + Host = To#jid.lserver, + Access = mod_muc_opt:access(ServerHost), + case acl:match_rule(ServerHost, Access, From) of + allow -> + route(Pkt, Host, ServerHost); + deny -> + Lang = xmpp:get_lang(Pkt), + ErrText = ?T("Access denied by service policy"), + Err = xmpp:err_forbidden(ErrText, Lang), + ejabberd_router:route_error(Pkt, Err) + end. + +-spec route(stanza(), binary(), binary()) -> ok. +route(#iq{to = #jid{luser = <<"">>, lresource = <<"">>}} = IQ, _, _) -> + ejabberd_router:process_iq(IQ); +route(#message{lang = Lang, body = Body, type = Type, from = From, + to = #jid{luser = <<"">>, lresource = <<"">>}} = Pkt, + Host, ServerHost) -> + if Type == error -> + ok; + true -> + AccessAdmin = mod_muc_opt:access_admin(ServerHost), + case acl:match_rule(ServerHost, AccessAdmin, From) of + allow -> + Msg = xmpp:get_text(Body), + broadcast_service_message(ServerHost, Host, Msg); + deny -> + ErrText = ?T("Only service administrators are allowed " + "to send service messages"), + Err = xmpp:err_forbidden(ErrText, Lang), + ejabberd_router:route_error(Pkt, Err) + end + end; +route(Pkt, Host, ServerHost) -> + {Room, _, _} = jid:tolower(xmpp:get_to(Pkt)), + case Room of + <<"">> -> + Txt = ?T("No module is handling this query"), + Err = xmpp:err_service_unavailable(Txt, xmpp:get_lang(Pkt)), + ejabberd_router:route_error(Pkt, Err); + _ -> + RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE), + case RMod:find_online_room(ServerHost, Room, Host) of + error -> + Proc = procname(ServerHost, {Room, Host}), + case whereis(Proc) of + Pid when Pid == self() -> + route_to_room(Pkt, ServerHost); + Pid when is_pid(Pid) -> + ?DEBUG("Routing to MUC worker ~p:~n~s", [Proc, xmpp:pp(Pkt)]), + ?GEN_SERVER:cast(Pid, {route_to_room, Pkt}); + undefined -> + ?DEBUG("MUC worker ~p is dead", [Proc]), + Err = xmpp:err_internal_server_error(), + ejabberd_router:route_error(Pkt, Err) + end; + {ok, Pid} -> + mod_muc_room:route(Pid, Pkt) + end + end. + +-spec shutdown_rooms(binary()) -> [pid()]. shutdown_rooms(ServerHost) -> RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE), Hosts = gen_mod:get_module_opt_hosts(ServerHost, mod_muc), + shutdown_rooms(ServerHost, Hosts, RMod). + +-spec shutdown_rooms(binary(), [binary()], module()) -> [pid()]. +shutdown_rooms(ServerHost, Hosts, RMod) -> Rooms = [RMod:get_online_rooms(ServerHost, Host, undefined) || Host <- Hosts], lists:flatmap( @@ -149,18 +283,18 @@ shutdown_rooms(ServerHost) -> %% C) mod_muc:stop was called, and each room is being terminated %% In this case, the mod_muc process died before the room processes %% So the message sending must be catched +-spec room_destroyed(binary(), binary(), pid(), binary()) -> ok. room_destroyed(Host, Room, Pid, ServerHost) -> - catch gen_mod:get_module_proc(ServerHost, ?MODULE) ! - {room_destroyed, {Room, Host}, Pid}, - ok. + Proc = procname(ServerHost, {Room, Host}), + ?GEN_SERVER:cast(Proc, {room_destroyed, {Room, Host}, Pid}). %% @doc Create a room. %% If Opts = default, the default room options are used. %% Else use the passed options as defined in mod_muc_room. create_room(Host, Name, From, Nick, Opts) -> ServerHost = ejabberd_router:host_of_route(Host), - Proc = gen_mod:get_module_proc(ServerHost, ?MODULE), - gen_server:call(Proc, {create, Name, Host, From, Nick, Opts}). + Proc = procname(ServerHost, {Name, Host}), + ?GEN_SERVER:call(Proc, {create, Name, Host, From, Nick, Opts}). store_room(ServerHost, Host, Name, Opts) -> store_room(ServerHost, Host, Name, Opts, undefined). @@ -232,228 +366,158 @@ get_online_rooms_by_user(ServerHost, LUser, LServer) -> %%==================================================================== %% gen_server callbacks %%==================================================================== - -init([Host, Opts]) -> +init([Host, Opts, Worker]) -> process_flag(trap_exit, true), - #state{access = Access, hosts = MyHosts, - history_size = HistorySize, queue_type = QueueType, - room_shaper = RoomShaper} = State = init_state(Host, Opts), - Mod = gen_mod:db_mod(Opts, ?MODULE), - RMod = gen_mod:ram_db_mod(Opts, ?MODULE), - Mod:init(Host, gen_mod:set_opt(hosts, MyHosts, Opts)), - RMod:init(Host, gen_mod:set_opt(hosts, MyHosts, Opts)), - lists:foreach( - fun(MyHost) -> - register_iq_handlers(MyHost), - ejabberd_router:register_route(MyHost, Host), - load_permanent_rooms(MyHost, Host, Access, HistorySize, - RoomShaper, QueueType) - end, MyHosts), - {ok, State}. + MyHosts = gen_mod:get_opt_hosts(Opts), + register_routes(Host, MyHosts, Worker), + register_iq_handlers(MyHosts, Worker), + {ok, #state{server_host = Host, hosts = MyHosts, worker = Worker}}. handle_call(stop, _From, State) -> {stop, normal, ok, State}; handle_call({create, Room, Host, From, Nick, Opts}, _From, - #state{server_host = ServerHost, - access = Access, default_room_opts = DefOpts, - history_size = HistorySize, queue_type = QueueType, - room_shaper = RoomShaper} = State) -> + #state{server_host = ServerHost} = State) -> ?DEBUG("MUC: create new room '~s'~n", [Room]), NewOpts = case Opts of - default -> DefOpts; - _ -> Opts + default -> mod_muc_opt:default_room_options(ServerHost); + _ -> Opts end, - {ok, Pid} = mod_muc_room:start( - Host, ServerHost, Access, - Room, HistorySize, - RoomShaper, From, - Nick, NewOpts, QueueType), RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE), - RMod:register_online_room(ServerHost, Room, Host, Pid), - ejabberd_hooks:run(create_room, ServerHost, [ServerHost, Room, Host]), - {reply, ok, State}. + case start_room(RMod, Host, ServerHost, Room, NewOpts, From, Nick) of + {ok, _} -> + ejabberd_hooks:run(create_room, ServerHost, [ServerHost, Room, Host]), + {reply, ok, State}; + Err -> + {reply, Err, State} + end. -handle_cast({reload, ServerHost, NewOpts, OldOpts}, #state{hosts = OldHosts}) -> - NewMod = gen_mod:db_mod(NewOpts, ?MODULE), - NewRMod = gen_mod:ram_db_mod(NewOpts, ?MODULE), - OldMod = gen_mod:db_mod(OldOpts, ?MODULE), - OldRMod = gen_mod:ram_db_mod(OldOpts, ?MODULE), - #state{hosts = NewHosts} = NewState = init_state(ServerHost, NewOpts), - if NewMod /= OldMod -> - NewMod:init(ServerHost, gen_mod:set_opt(hosts, NewHosts, NewOpts)); - true -> - ok - end, - if NewRMod /= OldRMod -> - NewRMod:init(ServerHost, gen_mod:set_opt(hosts, NewHosts, NewOpts)); - true -> - ok - end, - lists:foreach( - fun(NewHost) -> - ejabberd_router:register_route(NewHost, ServerHost), - register_iq_handlers(NewHost) - end, NewHosts -- OldHosts), - lists:foreach( - fun(OldHost) -> - ejabberd_router:unregister_route(OldHost), - unregister_iq_handlers(OldHost) - end, OldHosts -- NewHosts), - lists:foreach( - fun(Host) -> - lists:foreach( - fun({_, _, Pid}) when node(Pid) == node() -> - Pid ! config_reloaded; - (_) -> - ok - end, get_online_rooms(ServerHost, Host)) - end, misc:intersection(NewHosts, OldHosts)), - {noreply, NewState}; -handle_cast(Msg, State) -> - ?WARNING_MSG("Unexpected cast: ~p", [Msg]), - {noreply, State}. - -handle_info({route, Packet}, - #state{server_host = ServerHost, - access = Access, default_room_opts = DefRoomOpts, - history_size = HistorySize, queue_type = QueueType, - max_rooms_discoitems = MaxRoomsDiscoItems, - room_shaper = RoomShaper} = State) -> - From = xmpp:get_from(Packet), - To = xmpp:get_to(Packet), - Host = To#jid.lserver, - case catch do_route(Host, ServerHost, Access, HistorySize, RoomShaper, - From, To, Packet, DefRoomOpts, MaxRoomsDiscoItems, - QueueType) of - {'EXIT', Reason} -> - ?ERROR_MSG("~p", [Reason]); - _ -> - ok +handle_cast({route_to_room, Packet}, #state{server_host = ServerHost} = State) -> + try route_to_room(Packet, ServerHost) + catch ?EX_RULE(E, R, St) -> + StackTrace = ?EX_STACK(St), + ?ERROR_MSG("Failed to route packet:~n~s~nReason = ~p", + [xmpp:pp(Packet), {E, {R, StackTrace}}]) end, {noreply, State}; -handle_info({room_destroyed, {Room, Host}, Pid}, State) -> +handle_cast({room_destroyed, {Room, Host}, Pid}, State) -> ServerHost = State#state.server_host, RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE), RMod:unregister_online_room(ServerHost, Room, Host, Pid), {noreply, State}; +handle_cast({reload, AddHosts, DelHosts, NewHosts}, + #state{server_host = ServerHost, worker = Worker} = State) -> + register_routes(ServerHost, AddHosts, Worker), + register_iq_handlers(AddHosts, Worker), + unregister_routes(DelHosts, Worker), + unregister_iq_handlers(DelHosts, Worker), + {noreply, State#state{hosts = NewHosts}}; +handle_cast(Msg, State) -> + ?WARNING_MSG("Unexpected cast: ~p", [Msg]), + {noreply, State}. + +handle_info({route, Packet}, State) -> + %% We can only receive the packet here from other nodes + %% where mod_muc is not loaded. Such configuration + %% is *highly* discouraged + try route(Packet, State#state.server_host) + catch ?EX_RULE(E, R, St) -> + StackTrace = ?EX_STACK(St), + ?ERROR_MSG("Failed to route packet:~n~s~nReason = ~p", + [xmpp:pp(Packet), {E, {R, StackTrace}}]) + end, + {noreply, State}; +handle_info({room_destroyed, {Room, Host}, Pid}, State) -> + %% For backward compat + handle_cast({room_destroyed, {Room, Host}, Pid}, State); handle_info(Info, State) -> ?ERROR_MSG("Unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{hosts = MyHosts}) -> - lists:foreach( - fun(MyHost) -> - ejabberd_router:unregister_route(MyHost), - unregister_iq_handlers(MyHost) - end, MyHosts). +terminate(_Reason, #state{hosts = Hosts, worker = Worker}) -> + unregister_routes(Hosts, Worker), + unregister_iq_handlers(Hosts, Worker). code_change(_OldVsn, State, _Extra) -> {ok, State}. %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- -init_state(Host, Opts) -> - MyHosts = gen_mod:get_opt_hosts(Opts), - Access = mod_muc_opt:access(Opts), - AccessCreate = mod_muc_opt:access_create(Opts), - AccessAdmin = mod_muc_opt:access_admin(Opts), - AccessPersistent = mod_muc_opt:access_persistent(Opts), - AccessMam = mod_muc_opt:access_mam(Opts), - HistorySize = mod_muc_opt:history_size(Opts), - MaxRoomsDiscoItems = mod_muc_opt:max_rooms_discoitems(Opts), - DefRoomOpts = mod_muc_opt:default_room_options(Opts), - QueueType = mod_muc_opt:queue_type(Opts), - RoomShaper = mod_muc_opt:room_shaper(Opts), - #state{hosts = MyHosts, - server_host = Host, - access = {Access, AccessCreate, AccessAdmin, AccessPersistent, AccessMam}, - default_room_opts = DefRoomOpts, - queue_type = QueueType, - history_size = HistorySize, - max_rooms_discoitems = MaxRoomsDiscoItems, - room_shaper = RoomShaper}. +-spec register_iq_handlers([binary()], pos_integer()) -> ok. +register_iq_handlers(Hosts, 1) -> + %% Only register handlers on first worker + lists:foreach( + fun(Host) -> + gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_REGISTER, + ?MODULE, process_register), + gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_VCARD, + ?MODULE, process_vcard), + gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_MUCSUB, + ?MODULE, process_mucsub), + gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_MUC_UNIQUE, + ?MODULE, process_muc_unique), + gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO, + ?MODULE, process_disco_info), + gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS, + ?MODULE, process_disco_items) + end, Hosts); +register_iq_handlers(_, _) -> + ok. -register_iq_handlers(Host) -> - gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_REGISTER, - ?MODULE, process_register), - gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_VCARD, - ?MODULE, process_vcard), - gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_MUCSUB, - ?MODULE, process_mucsub), - gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_MUC_UNIQUE, - ?MODULE, process_muc_unique), - gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO, - ?MODULE, process_disco_info), - gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS, - ?MODULE, process_disco_items). +-spec unregister_iq_handlers([binary()], pos_integer()) -> ok. +unregister_iq_handlers(Hosts, 1) -> + %% Only unregister handlers on first worker + lists:foreach( + fun(Host) -> + gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_REGISTER), + gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_VCARD), + gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MUCSUB), + gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MUC_UNIQUE), + gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO), + gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS) + end, Hosts); +unregister_iq_handlers(_, _) -> + ok. -unregister_iq_handlers(Host) -> - gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_REGISTER), - gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_VCARD), - gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MUCSUB), - gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MUC_UNIQUE), - gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO), - gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_ITEMS). +-spec register_routes(binary(), [binary()], pos_integer()) -> ok. +register_routes(ServerHost, Hosts, 1) -> + %% Only register routes on first worker + lists:foreach( + fun(Host) -> + ejabberd_router:register_route( + Host, ServerHost, {apply, ?MODULE, route}) + end, Hosts); +register_routes(_, _, _) -> + ok. -do_route(Host, ServerHost, Access, HistorySize, RoomShaper, - From, To, Packet, DefRoomOpts, _MaxRoomsDiscoItems, QueueType) -> - {AccessRoute, _AccessCreate, _AccessAdmin, _AccessPersistent, _AccessMam} = Access, - case acl:match_rule(ServerHost, AccessRoute, From) of - allow -> - do_route1(Host, ServerHost, Access, HistorySize, RoomShaper, - From, To, Packet, DefRoomOpts, QueueType); - deny -> - Lang = xmpp:get_lang(Packet), - ErrText = ?T("Access denied by service policy"), - Err = xmpp:err_forbidden(ErrText, Lang), - ejabberd_router:route_error(Packet, Err) - end. +-spec unregister_routes([binary()], pos_integer()) -> ok. +unregister_routes(Hosts, 1) -> + %% Only unregister routes on first worker + lists:foreach( + fun(Host) -> + ejabberd_router:unregister_route(Host) + end, Hosts); +unregister_routes(_, _) -> + ok. -do_route1(_Host, _ServerHost, _Access, _HistorySize, _RoomShaper, - _From, #jid{luser = <<"">>, lresource = <<"">>} = _To, - #iq{} = IQ, _DefRoomOpts, _QueueType) -> - ejabberd_router:process_iq(IQ); -do_route1(Host, ServerHost, Access, _HistorySize, _RoomShaper, - From, #jid{luser = <<"">>, lresource = <<"">>} = _To, - #message{lang = Lang, body = Body, type = Type} = Packet, _, _) -> - {_AccessRoute, _AccessCreate, AccessAdmin, _AccessPersistent, _AccessMam} = Access, - if Type == error -> - ok; - true -> - case acl:match_rule(ServerHost, AccessAdmin, From) of - allow -> - Msg = xmpp:get_text(Body), - broadcast_service_message(ServerHost, Host, Msg); - deny -> - ErrText = ?T("Only service administrators are allowed " - "to send service messages"), - Err = xmpp:err_forbidden(ErrText, Lang), - ejabberd_router:route_error(Packet, Err) - end - end; -do_route1(_Host, _ServerHost, _Access, _HistorySize, _RoomShaper, - _From, #jid{luser = <<"">>} = _To, Packet, _DefRoomOpts, _) -> - Err = xmpp:err_service_unavailable(), - ejabberd_router:route_error(Packet, Err); -do_route1(Host, ServerHost, Access, HistorySize, RoomShaper, - From, To, Packet, DefRoomOpts, QueueType) -> - {Room, _, Nick} = jid:tolower(To), +-spec route_to_room(stanza(), binary()) -> ok. +route_to_room(Packet, ServerHost) -> + From = xmpp:get_from(Packet), + To = xmpp:get_to(Packet), + {Room, Host, Nick} = jid:tolower(To), RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE), case RMod:find_online_room(ServerHost, Room, Host) of error -> case is_create_request(Packet) of true -> - case check_create_room( - ServerHost, Host, Room, From, Access) of + case check_create_room(ServerHost, Host, Room, From) of true -> - {ok, Pid} = start_new_room( - Host, ServerHost, Access, - Room, HistorySize, - RoomShaper, From, Nick, DefRoomOpts, - QueueType), - RMod:register_online_room(ServerHost, Room, Host, Pid), - mod_muc_room:route(Pid, Packet), - ok; + case start_new_room(Host, ServerHost, Room, From, Nick) of + {ok, Pid} -> + mod_muc_room:route(Pid, Packet); + _Err -> + Err = xmpp:err_internal_server_error(), + ejabberd_router:route_error(Packet, Err) + end; false -> Lang = xmpp:get_lang(Packet), ErrText = ?T("Room creation is denied by service policy"), @@ -467,9 +531,7 @@ do_route1(Host, ServerHost, Access, HistorySize, RoomShaper, ejabberd_router:route_error(Packet, Err) end; {ok, Pid} -> - ?DEBUG("MUC: send to process ~p~n", [Pid]), - mod_muc_room:route(Pid, Packet), - ok + mod_muc_room:route(Pid, Packet) end. -spec process_vcard(iq()) -> iq(). @@ -612,11 +674,9 @@ is_create_request(#iq{type = T} = IQ) when T == get; T == set -> is_create_request(_) -> false. --spec check_create_room(binary(), binary(), binary(), jid(), tuple()) - -> boolean(). -check_create_room(ServerHost, Host, Room, From, Access) -> - {_AccessRoute, AccessCreate, AccessAdmin, - _AccessPersistent, _AccessMam} = Access, +-spec check_create_room(binary(), binary(), binary(), jid()) -> boolean(). +check_create_room(ServerHost, Host, Room, From) -> + AccessCreate = mod_muc_opt:access_create(ServerHost), case acl:match_rule(ServerHost, AccessCreate, From) of allow -> case mod_muc_opt:max_room_id(ServerHost) of @@ -624,8 +684,8 @@ check_create_room(ServerHost, Host, Room, From, Access) -> Regexp = mod_muc_opt:regexp_room_id(ServerHost), case re:run(Room, Regexp, [unicode, {capture, none}]) of match -> - case acl:match_rule( - ServerHost, AccessAdmin, From) of + AccessAdmin = mod_muc_opt:access_admin(ServerHost), + case acl:match_rule(ServerHost, AccessAdmin, From) of allow -> true; _ -> @@ -643,37 +703,56 @@ check_create_room(ServerHost, Host, Room, From, Access) -> false end. +-spec get_access(binary() | gen_mod:opts()) -> access(). +get_access(ServerHost) -> + Access = mod_muc_opt:access(ServerHost), + AccessCreate = mod_muc_opt:access_create(ServerHost), + AccessAdmin = mod_muc_opt:access_admin(ServerHost), + AccessPersistent = mod_muc_opt:access_persistent(ServerHost), + AccessMam = mod_muc_opt:access_mam(ServerHost), + {Access, AccessCreate, AccessAdmin, AccessPersistent, AccessMam}. + +-spec get_rooms(binary(), binary()) -> [#muc_room{}]. get_rooms(ServerHost, Host) -> - LServer = jid:nameprep(ServerHost), - Mod = gen_mod:db_mod(LServer, ?MODULE), - Mod:get_rooms(LServer, Host). + Mod = gen_mod:db_mod(ServerHost, ?MODULE), + Mod:get_rooms(ServerHost, Host). -load_permanent_rooms(Host, ServerHost, Access, - HistorySize, RoomShaper, QueueType) -> +-spec load_permanent_rooms([binary()], binary(), gen_mod:opts()) -> ok. +load_permanent_rooms(Hosts, ServerHost, Opts) -> + case mod_muc_opt:preload_rooms(Opts) of + true -> + Access = get_access(Opts), + HistorySize = mod_muc_opt:history_size(Opts), + QueueType = mod_muc_opt:queue_type(Opts), + RoomShaper = mod_muc_opt:room_shaper(Opts), + RMod = gen_mod:ram_db_mod(Opts, ?MODULE), + lists:foreach( + fun(Host) -> + ?DEBUG("Loading rooms at ~s", [Host]), + lists:foreach( + fun(R) -> + {Room, _} = R#muc_room.name_host, + case proplists:get_bool(persistent, R#muc_room.opts) of + true -> + case RMod:find_online_room(ServerHost, Room, Host) of + error -> + start_room(RMod, Host, ServerHost, Access, + Room, HistorySize, RoomShaper, + R#muc_room.opts, QueueType); + {ok, _} -> + ok + end; + _ -> + forget_room(ServerHost, Host, Room) + end + end, get_rooms(ServerHost, Host)) + end, Hosts); + false -> + ok + end. + +start_new_room(Host, ServerHost, Room, From, Nick) -> RMod = gen_mod:ram_db_mod(ServerHost, ?MODULE), - lists:foreach( - fun(R) -> - {Room, Host} = R#muc_room.name_host, - case proplists:get_bool(persistent, R#muc_room.opts) of - true -> - case RMod:find_online_room(ServerHost, Room, Host) of - error -> - {ok, Pid} = mod_muc_room:start(Host, - ServerHost, Access, Room, - HistorySize, RoomShaper, - R#muc_room.opts, QueueType), - RMod:register_online_room(ServerHost, Room, Host, Pid); - {ok, _} -> - ok - end; - _ -> - forget_room(ServerHost, Host, Room) - end - end, get_rooms(ServerHost, Host)). - -start_new_room(Host, ServerHost, Access, Room, - HistorySize, RoomShaper, From, - Nick, DefRoomOpts, QueueType) -> Opts = case restore_room(ServerHost, Host, Room) of error -> error; @@ -687,14 +766,52 @@ start_new_room(Host, ServerHost, Access, Room, end, case Opts of error -> - ?DEBUG("MUC: open new room '~s'~n", [Room]), - mod_muc_room:start(Host, ServerHost, Access, Room, - HistorySize, RoomShaper, - From, Nick, DefRoomOpts, QueueType); + ?DEBUG("Open new room: ~s", [Room]), + DefRoomOpts = mod_muc_opt:default_room_options(ServerHost), + start_room(RMod, Host, ServerHost, Room, DefRoomOpts, From, Nick); _ -> - ?DEBUG("MUC: restore room '~s'~n", [Room]), - mod_muc_room:start(Host, ServerHost, Access, Room, - HistorySize, RoomShaper, Opts, QueueType) + ?DEBUG("Restore room: ~s", [Room]), + start_room(RMod, Host, ServerHost, Room, Opts) + end. + +start_room(Mod, Host, ServerHost, Room, DefOpts) -> + Access = get_access(ServerHost), + HistorySize = mod_muc_opt:history_size(ServerHost), + QueueType = mod_muc_opt:queue_type(ServerHost), + RoomShaper = mod_muc_opt:room_shaper(ServerHost), + start_room(Mod, Host, ServerHost, Access, Room, HistorySize, + RoomShaper, DefOpts, QueueType). + +start_room(Mod, Host, ServerHost, Room, DefOpts, Creator, Nick) -> + Access = get_access(ServerHost), + HistorySize = mod_muc_opt:history_size(ServerHost), + QueueType = mod_muc_opt:queue_type(ServerHost), + RoomShaper = mod_muc_opt:room_shaper(ServerHost), + start_room(Mod, Host, ServerHost, Access, Room, + HistorySize, RoomShaper, + Creator, Nick, DefOpts, QueueType). + +start_room(Mod, Host, ServerHost, Access, Room, + HistorySize, RoomShaper, DefOpts, QueueType) -> + case mod_muc_room:start(Host, ServerHost, Access, Room, + HistorySize, RoomShaper, DefOpts, QueueType) of + {ok, Pid} -> + Mod:register_online_room(ServerHost, Room, Host, Pid), + {ok, Pid}; + Err -> + Err + end. + +start_room(Mod, Host, ServerHost, Access, Room, HistorySize, + RoomShaper, Creator, Nick, DefOpts, QueueType) -> + case mod_muc_room:start(Host, ServerHost, Access, Room, + HistorySize, RoomShaper, + Creator, Nick, DefOpts, QueueType) of + {ok, Pid} -> + Mod:register_online_room(ServerHost, Room, Host, Pid), + {ok, Pid}; + Err -> + Err end. -spec iq_disco_items(binary(), binary(), jid(), binary(), integer(), binary(), @@ -964,7 +1081,7 @@ mod_opt_type(max_room_id) -> mod_opt_type(max_rooms_discoitems) -> econf:non_neg_int(); mod_opt_type(regexp_room_id) -> - econf:binary(); + econf:re(); mod_opt_type(max_room_name) -> econf:pos_int(infinity); mod_opt_type(max_user_conferences) -> @@ -979,6 +1096,8 @@ mod_opt_type(min_message_interval) -> econf:number(0); mod_opt_type(min_presence_interval) -> econf:number(0); +mod_opt_type(preload_rooms) -> + econf:bool(); mod_opt_type(room_shaper) -> econf:atom(); mod_opt_type(user_message_shaper) -> @@ -1053,6 +1172,7 @@ mod_options(Host) -> {room_shaper, none}, {user_message_shaper, none}, {user_presence_shaper, none}, + {preload_rooms, true}, {default_room_options, [{allow_change_subj,true}, {allow_private_messages,true}, diff --git a/src/mod_muc_opt.erl b/src/mod_muc_opt.erl index 67c42e98f..df6d5e784 100644 --- a/src/mod_muc_opt.erl +++ b/src/mod_muc_opt.erl @@ -25,6 +25,7 @@ -export([min_message_interval/1]). -export([min_presence_interval/1]). -export([name/1]). +-export([preload_rooms/1]). -export([queue_type/1]). -export([ram_db_type/1]). -export([regexp_room_id/1]). @@ -164,6 +165,12 @@ name(Opts) when is_map(Opts) -> name(Host) -> gen_mod:get_module_opt(Host, mod_muc, name). +-spec preload_rooms(gen_mod:opts() | global | binary()) -> boolean(). +preload_rooms(Opts) when is_map(Opts) -> + gen_mod:get_opt(preload_rooms, Opts); +preload_rooms(Host) -> + gen_mod:get_module_opt(Host, mod_muc, preload_rooms). + -spec queue_type(gen_mod:opts() | global | binary()) -> 'file' | 'ram'. queue_type(Opts) when is_map(Opts) -> gen_mod:get_opt(queue_type, Opts); @@ -176,7 +183,7 @@ ram_db_type(Opts) when is_map(Opts) -> ram_db_type(Host) -> gen_mod:get_module_opt(Host, mod_muc, ram_db_type). --spec regexp_room_id(gen_mod:opts() | global | binary()) -> binary(). +-spec regexp_room_id(gen_mod:opts() | global | binary()) -> <<>> | re:mp(). regexp_room_id(Opts) when is_map(Opts) -> gen_mod:get_opt(regexp_room_id, Opts); regexp_room_id(Host) -> diff --git a/src/mod_muc_room.erl b/src/mod_muc_room.erl index bbbd3a2ec..b0d9da276 100644 --- a/src/mod_muc_room.erl +++ b/src/mod_muc_room.erl @@ -93,23 +93,35 @@ %%%---------------------------------------------------------------------- %%% API %%%---------------------------------------------------------------------- +-spec start(binary(), binary(), mod_muc:access(), binary(), non_neg_integer(), + atom(), jid(), binary(), [{atom(), term()}], ram | file) -> + {ok, pid()} | {error, any()}. start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Creator, Nick, DefRoomOpts, QueueType) -> p1_fsm:start(?MODULE, [Host, ServerHost, Access, Room, HistorySize, RoomShaper, Creator, Nick, DefRoomOpts, QueueType], ?FSMOPTS). +-spec start(binary(), binary(), mod_muc:access(), binary(), non_neg_integer(), + atom(), [{atom(), term()}], ram | file) -> + {ok, pid()} | {error, any()}. start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType) -> p1_fsm:start(?MODULE, [Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType], ?FSMOPTS). +-spec start_link(binary(), binary(), mod_muc:access(), binary(), non_neg_integer(), + atom(), jid(), binary(), [{atom(), term()}], ram | file) -> + {ok, pid()} | {error, any()}. start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Creator, Nick, DefRoomOpts, QueueType) -> p1_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize, RoomShaper, Creator, Nick, DefRoomOpts, QueueType], ?FSMOPTS). +-spec start_link(binary(), binary(), mod_muc:access(), binary(), non_neg_integer(), + atom(), [{atom(), term()}], ram | file) -> + {ok, pid()} | {error, any()}. start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType) -> p1_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType], @@ -756,6 +768,7 @@ terminate(Reason, _StateName, %%%---------------------------------------------------------------------- -spec route(pid(), stanza()) -> ok. route(Pid, Packet) -> + ?DEBUG("Routing to MUC room ~p:~n~s", [Pid, xmpp:pp(Packet)]), #jid{lresource = Nick} = xmpp:get_to(Packet), p1_fsm:send_event(Pid, {route, Nick, Packet}).