25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-11-28 16:34:13 +01:00

Implement database backend interface for ejabberd_router

This commit is contained in:
Evgeniy Khramtsov 2017-01-11 16:25:43 +03:00
parent 2129a33077
commit 02f96d0f41
10 changed files with 326 additions and 282 deletions

View File

@ -54,8 +54,6 @@ start(normal, _Args) ->
ejabberd_ctl:init(), ejabberd_ctl:init(),
ejabberd_commands:init(), ejabberd_commands:init(),
ejabberd_admin:start(), ejabberd_admin:start(),
gen_mod:start(),
ext_mod:start(),
setup_if_elixir_conf_used(), setup_if_elixir_conf_used(),
ejabberd_config:start(), ejabberd_config:start(),
set_settings_from_config(), set_settings_from_config(),
@ -66,11 +64,13 @@ start(normal, _Args) ->
ejabberd_rdbms:start(), ejabberd_rdbms:start(),
ejabberd_riak_sup:start(), ejabberd_riak_sup:start(),
ejabberd_redis:start(), ejabberd_redis:start(),
ejabberd_router:start(),
ejabberd_router_multicast:start(),
ejabberd_local:start(),
ejabberd_sm:start(), ejabberd_sm:start(),
cyrsasl:start(), cyrsasl:start(),
% Profiling gen_mod:start(),
%ejabberd_debug:eprof_start(), ext_mod:start(),
%ejabberd_debug:fprof_start(),
maybe_add_nameservers(), maybe_add_nameservers(),
ejabberd_auth:start(), ejabberd_auth:start(),
ejabberd_oauth:start(), ejabberd_oauth:start(),

View File

@ -35,10 +35,11 @@
get_version/0, get_myhosts/0, get_mylang/0, get_version/0, get_myhosts/0, get_mylang/0,
get_ejabberd_config_path/0, is_using_elixir_config/0, get_ejabberd_config_path/0, is_using_elixir_config/0,
prepare_opt_val/4, convert_table_to_binary/5, prepare_opt_val/4, convert_table_to_binary/5,
transform_options/1, collect_options/1, default_db/2, transform_options/1, collect_options/1,
convert_to_yaml/1, convert_to_yaml/2, v_db/2, convert_to_yaml/1, convert_to_yaml/2, v_db/2,
env_binary_to_list/2, opt_type/1, may_hide_data/1, env_binary_to_list/2, opt_type/1, may_hide_data/1,
is_elixir_enabled/0, v_dbs/1, v_dbs_mods/1, is_elixir_enabled/0, v_dbs/1, v_dbs_mods/1,
default_db/1, default_db/2, default_ram_db/1, default_ram_db/2,
fsm_limit_opts/1]). fsm_limit_opts/1]).
-export([start/2]). -export([start/2]).
@ -857,8 +858,8 @@ get_option(Opt, F, Default) ->
case Opt of case Opt of
{Key, Host} when Host /= global -> {Key, Host} when Host /= global ->
get_option({Key, global}, F, Default); get_option({Key, global}, F, Default);
_ -> _ ->
Default Default
end end
end. end.
@ -907,11 +908,26 @@ v_dbs_mods(Mod) ->
(atom_to_binary(M, utf8))/binary>>, utf8) (atom_to_binary(M, utf8))/binary>>, utf8)
end, ets:match(module_db, {Mod, '$1'})). end, ets:match(module_db, {Mod, '$1'})).
-spec default_db(binary(), module()) -> atom(). -spec default_db(module()) -> atom().
default_db(Module) ->
default_db(global, Module).
-spec default_db(binary(), module()) -> atom().
default_db(Host, Module) -> default_db(Host, Module) ->
default_db(default_db, Host, Module).
-spec default_ram_db(module()) -> atom().
default_ram_db(Module) ->
default_ram_db(global, Module).
-spec default_ram_db(binary(), module()) -> atom().
default_ram_db(Host, Module) ->
default_db(default_ram_db, Host, Module).
-spec default_db(default_db | default_ram_db, binary(), module()) -> atom().
default_db(Opt, Host, Module) ->
case ejabberd_config:get_option( case ejabberd_config:get_option(
{default_db, Host}, fun(T) when is_atom(T) -> T end) of {Opt, Host}, fun(T) when is_atom(T) -> T end) of
undefined -> undefined ->
mnesia; mnesia;
DBType -> DBType ->
@ -919,8 +935,8 @@ default_db(Host, Module) ->
v_db(Module, DBType) v_db(Module, DBType)
catch error:badarg -> catch error:badarg ->
?WARNING_MSG("Module '~s' doesn't support database '~s' " ?WARNING_MSG("Module '~s' doesn't support database '~s' "
"defined in option 'default_db', using " "defined in option '~s', using "
"'mnesia' as fallback", [Module, DBType]), "'mnesia' as fallback", [Module, DBType, Opt]),
mnesia mnesia
end end
end. end.
@ -1406,8 +1422,13 @@ opt_type(language) ->
fun iolist_to_binary/1; fun iolist_to_binary/1;
opt_type(max_fsm_queue) -> opt_type(max_fsm_queue) ->
fun (I) when is_integer(I), I > 0 -> I end; fun (I) when is_integer(I), I > 0 -> I end;
opt_type(default_db) ->
fun(T) when is_atom(T) -> T end;
opt_type(default_ram_db) ->
fun(T) when is_atom(T) -> T end;
opt_type(_) -> opt_type(_) ->
[hide_sensitive_log_data, hosts, language]. [hide_sensitive_log_data, hosts, language,
default_db, default_ram_db].
-spec may_hide_data(string()) -> string(); -spec may_hide_data(string()) -> string();
(binary()) -> binary(). (binary()) -> binary().

View File

@ -30,7 +30,7 @@
-behaviour(gen_server). -behaviour(gen_server).
%% API %% API
-export([start_link/0]). -export([start/0, start_link/0]).
-export([route/3, route_iq/4, route_iq/5, process_iq/3, -export([route/3, route_iq/4, route_iq/5, process_iq/3,
process_iq_reply/3, register_iq_handler/4, process_iq_reply/3, register_iq_handler/4,
@ -68,6 +68,11 @@
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} %% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server %% Description: Starts the server
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
start() ->
ChildSpec = {?MODULE, {?MODULE, start_link, []},
transient, 1000, worker, [?MODULE]},
supervisor:start_child(ejabberd_sup, ChildSpec).
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], gen_server:start_link({local, ?MODULE}, ?MODULE, [],
[]). []).

View File

@ -34,7 +34,6 @@
%% API %% API
-export([route/3, -export([route/3,
route_error/4, route_error/4,
register_route/1,
register_route/2, register_route/2,
register_route/3, register_route/3,
register_routes/1, register_routes/1,
@ -42,43 +41,49 @@
process_iq/3, process_iq/3,
unregister_route/1, unregister_route/1,
unregister_routes/1, unregister_routes/1,
dirty_get_all_routes/0, get_all_routes/0,
dirty_get_all_domains/0,
is_my_route/1, is_my_route/1,
is_my_host/1 is_my_host/1,
]). get_backend/0]).
-export([start_link/0]). -export([start/0, start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, -export([init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2, code_change/3, opt_type/1]). handle_info/2, terminate/2, code_change/3, opt_type/1]).
-include("ejabberd.hrl"). -include("ejabberd.hrl").
-include("logger.hrl"). -include("logger.hrl").
-include("ejabberd_router.hrl").
-include("xmpp.hrl"). -include("xmpp.hrl").
-type local_hint() :: undefined | integer() | {apply, atom(), atom()}. -callback init() -> any().
-callback register_route(binary(), binary(), local_hint(),
-record(route, {domain, server_host, pid, local_hint}). undefined | pos_integer()) -> ok | {error, term()}.
-callback unregister_route(binary(), undefined | pos_integer()) -> ok | {error, term()}.
-callback find_routes(binary()) -> [#route{}].
-callback host_of_route(binary()) -> {ok, binary()} | error.
-callback is_my_route(binary()) -> boolean().
-callback is_my_host(binary()) -> boolean().
-callback get_all_routes() -> [binary()].
-callback handle_event(term()) -> any().
-record(state, {}). -record(state, {}).
%%==================================================================== %%====================================================================
%% API %% API
%%==================================================================== %%====================================================================
%%-------------------------------------------------------------------- start() ->
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} ChildSpec = {?MODULE, {?MODULE, start_link, []},
%% Description: Starts the server transient, 1000, worker, [?MODULE]},
%%-------------------------------------------------------------------- supervisor:start_child(ejabberd_sup, ChildSpec).
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec route(jid(), jid(), xmlel() | stanza()) -> ok. -spec route(jid(), jid(), xmlel() | stanza()) -> ok.
route(#jid{} = From, #jid{} = To, #xmlel{} = El) -> route(#jid{} = From, #jid{} = To, #xmlel{} = El) ->
try xmpp:decode(El, ?NS_CLIENT, [ignore_els]) of try xmpp:decode(El, ?NS_CLIENT, [ignore_els]) of
Pkt -> route(From, To, xmpp:set_from_to(Pkt, From, To)) Pkt -> route(From, To, Pkt)
catch _:{xmpp_codec, Why} -> catch _:{xmpp_codec, Why} ->
?ERROR_MSG("failed to decode xml element ~p when " ?ERROR_MSG("failed to decode xml element ~p when "
"routing from ~s to ~s: ~s", "routing from ~s to ~s: ~s",
@ -98,7 +103,6 @@ route(#jid{} = From, #jid{} = To, Packet) ->
%% RFC3920 9.3.1 %% RFC3920 9.3.1
-spec route_error(jid(), jid(), xmlel(), xmlel()) -> ok; -spec route_error(jid(), jid(), xmlel(), xmlel()) -> ok;
(jid(), jid(), stanza(), stanza_error()) -> ok. (jid(), jid(), stanza(), stanza_error()) -> ok.
route_error(From, To, #xmlel{} = ErrPacket, #xmlel{} = OrigPacket) -> route_error(From, To, #xmlel{} = ErrPacket, #xmlel{} = OrigPacket) ->
#xmlel{attrs = Attrs} = OrigPacket, #xmlel{attrs = Attrs} = OrigPacket,
case <<"error">> == fxml:get_attr_s(<<"type">>, Attrs) of case <<"error">> == fxml:get_attr_s(<<"type">>, Attrs) of
@ -113,152 +117,72 @@ route_error(From, To, Packet, #stanza_error{} = Err) ->
ejabberd_router:route(From, To, xmpp:make_error(Packet, Err)) ejabberd_router:route(From, To, xmpp:make_error(Packet, Err))
end. end.
-spec register_route(binary()) -> term().
register_route(Domain) ->
?WARNING_MSG("~s:register_route/1 is deprected, "
"use ~s:register_route/2 instead",
[?MODULE, ?MODULE]),
register_route(Domain, ?MYNAME).
-spec register_route(binary(), binary()) -> ok. -spec register_route(binary(), binary()) -> ok.
register_route(Domain, ServerHost) -> register_route(Domain, ServerHost) ->
register_route(Domain, ServerHost, undefined). register_route(Domain, ServerHost, undefined).
-spec register_route(binary(), binary(), local_hint()) -> ok. -spec register_route(binary(), binary(), local_hint()) -> ok.
register_route(Domain, ServerHost, LocalHint) -> register_route(Domain, ServerHost, LocalHint) ->
case {jid:nameprep(Domain), jid:nameprep(ServerHost)} of case {jid:nameprep(Domain), jid:nameprep(ServerHost)} of
{error, _} -> erlang:error({invalid_domain, Domain}); {error, _} ->
{_, error} -> erlang:error({invalid_domain, ServerHost}); erlang:error({invalid_domain, Domain});
{LDomain, LServerHost} -> {_, error} ->
Pid = self(), erlang:error({invalid_domain, ServerHost});
case get_component_number(LDomain) of {LDomain, LServerHost} ->
undefined -> Mod = get_backend(),
F = fun () -> case Mod:register_route(LDomain, LServerHost, LocalHint,
mnesia:write(#route{domain = LDomain, pid = Pid, get_component_number(LDomain)) of
server_host = LServerHost, ok ->
local_hint = LocalHint}) ?DEBUG("Route registered: ~s", [LDomain]);
end, {error, Err} ->
mnesia:transaction(F); ?ERROR_MSG("Failed to register route ~s: ~p",
N -> [LDomain, Err])
F = fun () -> end
case mnesia:wread({route, LDomain}) of
[] ->
mnesia:write(#route{domain = LDomain,
server_host = LServerHost,
pid = Pid,
local_hint = 1}),
lists:foreach(
fun (I) ->
mnesia:write(
#route{domain = LDomain,
pid = undefined,
server_host = LServerHost,
local_hint = I})
end,
lists:seq(2, N));
Rs ->
lists:any(
fun (#route{pid = undefined,
local_hint = I} = R) ->
mnesia:write(
#route{domain = LDomain,
pid = Pid,
server_host = LServerHost,
local_hint = I}),
mnesia:delete_object(R),
true;
(_) -> false
end,
Rs)
end
end,
mnesia:transaction(F)
end,
if LocalHint == undefined ->
?DEBUG("Route registered: ~s", [LDomain]);
true ->
ok
end
end. end.
-spec register_routes([{binary(), binary()}]) -> ok. -spec register_routes([{binary(), binary()}]) -> ok.
register_routes(Domains) -> register_routes(Domains) ->
lists:foreach(fun ({Domain, ServerHost}) -> register_route(Domain, ServerHost) lists:foreach(fun ({Domain, ServerHost}) -> register_route(Domain, ServerHost)
end, end,
Domains). Domains).
-spec unregister_route(binary()) -> ok. -spec unregister_route(binary()) -> ok.
unregister_route(Domain) -> unregister_route(Domain) ->
case jid:nameprep(Domain) of case jid:nameprep(Domain) of
error -> erlang:error({invalid_domain, Domain}); error ->
LDomain -> erlang:error({invalid_domain, Domain});
Pid = self(), LDomain ->
case get_component_number(LDomain) of Mod = get_backend(),
undefined -> case Mod:unregister_route(LDomain, get_component_number(LDomain)) of
F = fun () -> ok ->
case mnesia:match_object(#route{domain = LDomain, ?DEBUG("Route unregistered: ~s", [LDomain]);
pid = Pid, _ = '_'}) {error, Err} ->
of ?ERROR_MSG("Failed to unregister route ~s: ~p",
[R] -> mnesia:delete_object(R); [LDomain, Err])
_ -> ok end
end
end,
mnesia:transaction(F);
_ ->
F = fun () ->
case mnesia:match_object(#route{domain = LDomain,
pid = Pid, _ = '_'})
of
[R] ->
I = R#route.local_hint,
ServerHost = R#route.server_host,
mnesia:write(#route{domain = LDomain,
server_host = ServerHost,
pid = undefined,
local_hint = I}),
mnesia:delete_object(R);
_ -> ok
end
end,
mnesia:transaction(F)
end,
?DEBUG("Route unregistered: ~s", [LDomain])
end. end.
-spec unregister_routes([binary()]) -> ok. -spec unregister_routes([binary()]) -> ok.
unregister_routes(Domains) -> unregister_routes(Domains) ->
lists:foreach(fun (Domain) -> unregister_route(Domain) lists:foreach(fun (Domain) -> unregister_route(Domain)
end, end,
Domains). Domains).
-spec dirty_get_all_routes() -> [binary()]. -spec get_all_routes() -> [binary()].
get_all_routes() ->
dirty_get_all_routes() -> Mod = get_backend(),
lists:usort(mnesia:dirty_all_keys(route)) -- (?MYHOSTS). Mod:get_all_routes().
-spec dirty_get_all_domains() -> [binary()].
dirty_get_all_domains() ->
lists:usort(mnesia:dirty_all_keys(route)).
-spec host_of_route(binary()) -> binary(). -spec host_of_route(binary()) -> binary().
host_of_route(Domain) -> host_of_route(Domain) ->
case jid:nameprep(Domain) of case jid:nameprep(Domain) of
error -> error ->
erlang:error({invalid_domain, Domain}); erlang:error({invalid_domain, Domain});
LDomain -> LDomain ->
case mnesia:dirty_read(route, LDomain) of Mod = get_backend(),
[#route{server_host = ServerHost}|_] -> case Mod:host_of_route(LDomain) of
ServerHost; {ok, ServerHost} -> ServerHost;
[] -> error -> erlang:error({unregistered_route, Domain})
erlang:error({unregistered_route, Domain})
end end
end. end.
@ -268,7 +192,8 @@ is_my_route(Domain) ->
error -> error ->
erlang:error({invalid_domain, Domain}); erlang:error({invalid_domain, Domain});
LDomain -> LDomain ->
mnesia:dirty_read(route, LDomain) /= [] Mod = get_backend(),
Mod:is_my_route(LDomain)
end. end.
-spec is_my_host(binary()) -> boolean(). -spec is_my_host(binary()) -> boolean().
@ -277,12 +202,8 @@ is_my_host(Domain) ->
error -> error ->
erlang:error({invalid_domain, Domain}); erlang:error({invalid_domain, Domain});
LDomain -> LDomain ->
case mnesia:dirty_read(route, LDomain) of Mod = get_backend(),
[#route{server_host = Host}|_] -> Mod:is_my_host(LDomain)
Host == LDomain;
[] ->
false
end
end. end.
-spec process_iq(jid(), jid(), iq() | xmlel()) -> any(). -spec process_iq(jid(), jid(), iq() | xmlel()) -> any().
@ -294,7 +215,7 @@ process_iq(From, To, #iq{} = IQ) ->
end; end;
process_iq(From, To, #xmlel{} = El) -> process_iq(From, To, #xmlel{} = El) ->
try xmpp:decode(El, ?NS_CLIENT, [ignore_els]) of try xmpp:decode(El, ?NS_CLIENT, [ignore_els]) of
IQ -> process_iq(From, To, xmpp:set_from_to(IQ, From, To)) #iq{} = IQ -> process_iq(From, To, xmpp:set_from_to(IQ, From, To))
catch _:{xmpp_codec, Why} -> catch _:{xmpp_codec, Why} ->
Type = xmpp:get_type(El), Type = xmpp:get_type(El),
if Type == <<"get">>; Type == <<"set">> -> if Type == <<"get">>; Type == <<"set">> ->
@ -310,54 +231,18 @@ process_iq(From, To, #xmlel{} = El) ->
%%==================================================================== %%====================================================================
%% gen_server callbacks %% gen_server callbacks
%%==================================================================== %%====================================================================
%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
init([]) -> init([]) ->
update_tables(), Mod = get_backend(),
ejabberd_mnesia:create(?MODULE, route, Mod:init(),
[{ram_copies, [node()]},
{type, bag},
{attributes, record_info(fields, route)}]),
mnesia:add_table_copy(route, node(), ram_copies),
mnesia:subscribe({table, route, simple}),
lists:foreach(fun (Pid) -> erlang:monitor(process, Pid)
end,
mnesia:dirty_select(route,
[{{route, '_', '$1', '_'}, [], ['$1']}])),
{ok, #state{}}. {ok, #state{}}.
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
Reply = ok, {reply, Reply, State}. Reply = ok,
{reply, Reply, State}.
%%-------------------------------------------------------------------- handle_cast(_Msg, State) ->
%% Function: handle_cast(Msg, State) -> {noreply, State} | {noreply, State}.
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
handle_cast(_Msg, State) -> {noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
handle_info({route, From, To, Packet}, State) -> handle_info({route, From, To, Packet}, State) ->
case catch do_route(From, To, Packet) of case catch do_route(From, To, Packet) of
{'EXIT', Reason} -> {'EXIT', Reason} ->
@ -366,52 +251,14 @@ handle_info({route, From, To, Packet}, State) ->
_ -> ok _ -> ok
end, end,
{noreply, State}; {noreply, State};
handle_info({mnesia_table_event, handle_info(Event, State) ->
{write, #route{pid = Pid}, _ActivityId}}, Mod = get_backend(),
State) -> Mod:handle_event(Event),
erlang:monitor(process, Pid), {noreply, State};
handle_info({'DOWN', _Ref, _Type, Pid, _Info}, State) ->
F = fun () ->
Es = mnesia:select(route,
[{#route{pid = Pid, _ = '_'}, [], ['$_']}]),
lists:foreach(fun (E) ->
if is_integer(E#route.local_hint) ->
LDomain = E#route.domain,
I = E#route.local_hint,
ServerHost = E#route.server_host,
mnesia:write(#route{domain =
LDomain,
server_host =
ServerHost,
pid =
undefined,
local_hint =
I}),
mnesia:delete_object(E);
true -> mnesia:delete_object(E)
end
end,
Es)
end,
mnesia:transaction(F),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}. {noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> void()
%% Description: This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, _State) -> terminate(_Reason, _State) ->
ok. ok.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
@ -425,7 +272,8 @@ do_route(OrigFrom, OrigTo, OrigPacket) ->
{OrigFrom, OrigTo, OrigPacket}, []) of {OrigFrom, OrigTo, OrigPacket}, []) of
{From, To, Packet} -> {From, To, Packet} ->
LDstDomain = To#jid.lserver, LDstDomain = To#jid.lserver,
case mnesia:dirty_read(route, LDstDomain) of Mod = get_backend(),
case Mod:find_routes(LDstDomain) of
[] -> [] ->
ejabberd_s2s:route(From, To, Packet); ejabberd_s2s:route(From, To, Packet);
[Route] -> [Route] ->
@ -488,19 +336,17 @@ get_domain_balancing(From, To, LDomain) ->
bare_destination -> jid:remove_resource(jid:tolower(To)) bare_destination -> jid:remove_resource(jid:tolower(To))
end. end.
-spec update_tables() -> ok. -spec get_backend() -> module().
update_tables() -> get_backend() ->
try DBType = case ejabberd_config:get_option(
mnesia:transform_table(route, ignore, record_info(fields, route)) router_db_type,
catch exit:{aborted, {no_exists, _}} -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end) of
ok undefined ->
end, ejabberd_config:default_ram_db(?MODULE);
case lists:member(local_route, T ->
mnesia:system_info(tables)) T
of end,
true -> mnesia:delete_table(local_route); list_to_atom("ejabberd_router_" ++ atom_to_list(DBType)).
false -> ok
end.
opt_type(domain_balancing) -> opt_type(domain_balancing) ->
fun (random) -> random; fun (random) -> random;
@ -511,4 +357,7 @@ opt_type(domain_balancing) ->
end; end;
opt_type(domain_balancing_component_number) -> opt_type(domain_balancing_component_number) ->
fun (N) when is_integer(N), N > 1 -> N end; fun (N) when is_integer(N), N > 1 -> N end;
opt_type(_) -> [domain_balancing, domain_balancing_component_number]. opt_type(router_db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
opt_type(_) ->
[domain_balancing, domain_balancing_component_number,
router_db_type].

View File

@ -0,0 +1,185 @@
%%%-------------------------------------------------------------------
%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
%%% @copyright (C) 2017, Evgeny Khramtsov
%%% @doc
%%%
%%% @end
%%% Created : 11 Jan 2017 by Evgeny Khramtsov <ekhramtsov@process-one.net>
%%%-------------------------------------------------------------------
-module(ejabberd_router_mnesia).
-behaviour(ejabberd_router).
%% API
-export([init/0, register_route/4, unregister_route/2, find_routes/1,
host_of_route/1, is_my_route/1, is_my_host/1, get_all_routes/0,
handle_event/1]).
-include("ejabberd.hrl").
-include("ejabberd_router.hrl").
-include("logger.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
%%%===================================================================
%%% API
%%%===================================================================
init() ->
update_tables(),
ejabberd_mnesia:create(?MODULE, route,
[{ram_copies, [node()]},
{type, bag},
{attributes, record_info(fields, route)}]),
mnesia:add_table_copy(route, node(), ram_copies),
mnesia:subscribe({table, route, simple}),
lists:foreach(
fun (Pid) -> erlang:monitor(process, Pid) end,
mnesia:dirty_select(route,
[{{route, '_', '$1', '_'}, [], ['$1']}])).
register_route(Domain, ServerHost, LocalHint, undefined) ->
F = fun () ->
mnesia:write(#route{domain = Domain,
pid = self(),
server_host = ServerHost,
local_hint = LocalHint})
end,
transaction(F);
register_route(Domain, ServerHost, _LocalHint, N) ->
Pid = self(),
F = fun () ->
case mnesia:wread({route, Domain}) of
[] ->
mnesia:write(#route{domain = Domain,
server_host = ServerHost,
pid = Pid,
local_hint = 1}),
lists:foreach(
fun (I) ->
mnesia:write(
#route{domain = Domain,
pid = undefined,
server_host = ServerHost,
local_hint = I})
end,
lists:seq(2, N));
Rs ->
lists:any(
fun (#route{pid = undefined,
local_hint = I} = R) ->
mnesia:write(
#route{domain = Domain,
pid = Pid,
server_host = ServerHost,
local_hint = I}),
mnesia:delete_object(R),
true;
(_) -> false
end,
Rs)
end
end,
transaction(F).
unregister_route(Domain, undefined) ->
F = fun () ->
case mnesia:match_object(
#route{domain = Domain, pid = self(), _ = '_'}) of
[R] -> mnesia:delete_object(R);
_ -> ok
end
end,
transaction(F);
unregister_route(Domain, _) ->
F = fun () ->
case mnesia:match_object(
#route{domain = Domain, pid = self(), _ = '_'}) of
[R] ->
I = R#route.local_hint,
ServerHost = R#route.server_host,
mnesia:write(#route{domain = Domain,
server_host = ServerHost,
pid = undefined,
local_hint = I}),
mnesia:delete_object(R);
_ -> ok
end
end,
transaction(F).
find_routes(Domain) ->
mnesia:dirty_read(route, Domain).
host_of_route(Domain) ->
case mnesia:dirty_read(route, Domain) of
[#route{server_host = ServerHost}|_] ->
{ok, ServerHost};
[] ->
error
end.
is_my_route(Domain) ->
mnesia:dirty_read(route, Domain) /= [].
is_my_host(Domain) ->
case mnesia:dirty_read(route, Domain) of
[#route{server_host = Host}|_] ->
Host == Domain;
[] ->
false
end.
get_all_routes() ->
mnesia:dirty_select(
route,
ets:fun2ms(
fun(#route{domain = Domain, server_host = ServerHost})
when Domain /= ServerHost -> Domain
end)).
handle_event({mnesia_table_event,
{write, #route{pid = Pid}, _ActivityId}}) ->
erlang:monitor(process, Pid);
handle_event({'DOWN', _Ref, _Type, Pid, _Info}) ->
F = fun () ->
Es = mnesia:select(route,
[{#route{pid = Pid, _ = '_'}, [], ['$_']}]),
lists:foreach(
fun(E) ->
if is_integer(E#route.local_hint) ->
LDomain = E#route.domain,
I = E#route.local_hint,
ServerHost = E#route.server_host,
mnesia:write(#route{domain = LDomain,
server_host = ServerHost,
pid = undefined,
local_hint = I}),
mnesia:delete_object(E);
true ->
mnesia:delete_object(E)
end
end, Es)
end,
transaction(F).
%%%===================================================================
%%% Internal functions
%%%===================================================================
transaction(F) ->
case mnesia:transaction(F) of
{atomic, _} ->
ok;
{aborted, Reason} ->
?ERROR_MSG("Mnesia transaction failed: ~p", [Reason]),
{error, Reason}
end.
-spec update_tables() -> ok.
update_tables() ->
try
mnesia:transform_table(route, ignore, record_info(fields, route))
catch exit:{aborted, {no_exists, _}} ->
ok
end,
case lists:member(local_route, mnesia:system_info(tables)) of
true -> mnesia:delete_table(local_route);
false -> ok
end.

View File

@ -35,7 +35,7 @@
unregister_route/1 unregister_route/1
]). ]).
-export([start_link/0]). -export([start/0, start_link/0]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -56,6 +56,11 @@
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} %% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server %% Description: Starts the server
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
start() ->
ChildSpec = {?MODULE, {?MODULE, start_link, []},
transient, 1000, worker, [?MODULE]},
supervisor:start_child(ejabberd_sup, ChildSpec).
start_link() -> start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

View File

@ -766,10 +766,14 @@ force_update_presence({LUser, LServer}) ->
-spec get_sm_backend(binary()) -> module(). -spec get_sm_backend(binary()) -> module().
get_sm_backend(Host) -> get_sm_backend(Host) ->
DBType = ejabberd_config:get_option( DBType = case ejabberd_config:get_option(
{sm_db_type, Host}, {sm_db_type, Host},
fun(T) -> ejabberd_config:v_db(?MODULE, T) end, fun(T) -> ejabberd_config:v_db(?MODULE, T) end) of
mnesia), undefined ->
ejabberd_config:default_ram_db(Host, ?MODULE);
T ->
T
end,
list_to_atom("ejabberd_sm_" ++ atom_to_list(DBType)). list_to_atom("ejabberd_sm_" ++ atom_to_list(DBType)).
-spec get_sm_backends() -> [module()]. -spec get_sm_backends() -> [module()].

View File

@ -55,20 +55,6 @@ init([]) ->
brutal_kill, brutal_kill,
worker, worker,
[ejabberd_system_monitor]}, [ejabberd_system_monitor]},
Router =
{ejabberd_router,
{ejabberd_router, start_link, []},
permanent,
brutal_kill,
worker,
[ejabberd_router]},
Router_multicast =
{ejabberd_router_multicast,
{ejabberd_router_multicast, start_link, []},
permanent,
brutal_kill,
worker,
[ejabberd_router_multicast]},
S2S = S2S =
{ejabberd_s2s, {ejabberd_s2s,
{ejabberd_s2s, start_link, []}, {ejabberd_s2s, start_link, []},
@ -76,13 +62,6 @@ init([]) ->
brutal_kill, brutal_kill,
worker, worker,
[ejabberd_s2s]}, [ejabberd_s2s]},
Local =
{ejabberd_local,
{ejabberd_local, start_link, []},
permanent,
brutal_kill,
worker,
[ejabberd_local]},
Captcha = Captcha =
{ejabberd_captcha, {ejabberd_captcha,
{ejabberd_captcha, start_link, []}, {ejabberd_captcha, start_link, []},
@ -141,10 +120,7 @@ init([]) ->
[Hooks, [Hooks,
NodeGroups, NodeGroups,
SystemMonitor, SystemMonitor,
Router,
Router_multicast,
S2S, S2S,
Local,
Captcha, Captcha,
S2SInSupervisor, S2SInSupervisor,
S2SOutSupervisor, S2SOutSupervisor,

View File

@ -470,6 +470,5 @@ get_module_proc(Host, Base) ->
is_loaded(Host, Module) -> is_loaded(Host, Module) ->
ets:member(ejabberd_modules, {Module, Host}). ets:member(ejabberd_modules, {Module, Host}).
opt_type(default_db) -> fun(T) when is_atom(T) -> T end;
opt_type(modules) -> fun (L) when is_list(L) -> L end; opt_type(modules) -> fun (L) when is_list(L) -> L end;
opt_type(_) -> [default_db, modules]. opt_type(_) -> [modules].

View File

@ -235,7 +235,7 @@ get_vh_services(Host) ->
[VH | _] -> VH == Host [VH | _] -> VH == Host
end end
end, end,
ejabberd_router:dirty_get_all_routes()). ejabberd_router:get_all_routes()).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%