diff --git a/src/ejabberd_app.erl b/src/ejabberd_app.erl index b52450d24..64edf508c 100644 --- a/src/ejabberd_app.erl +++ b/src/ejabberd_app.erl @@ -25,12 +25,11 @@ -module(ejabberd_app). --behaviour(ejabberd_config). -author('alexey@process-one.net'). -behaviour(application). --export([start/2, prep_stop/1, stop/1, opt_type/1]). +-export([start/2, prep_stop/1, stop/1]). -include("ejabberd.hrl"). -include("logger.hrl"). @@ -49,13 +48,12 @@ start(normal, _Args) -> setup_if_elixir_conf_used(), ejabberd_config:start(), ejabberd_mnesia:start(), - set_settings_from_config(), file_queue_init(), maybe_add_nameservers(), - connect_nodes(), case ejabberd_sup:start_link() of {ok, SupPid} -> register_elixir_config_hooks(), + ejabberd_cluster:wait_for_sync(infinity), {T2, _} = statistics(wall_clock), ?INFO_MSG("ejabberd ~s is started in the node ~p in ~.2fs", [?VERSION, node(), (T2-T1)/1000]), @@ -88,12 +86,6 @@ stop(_State) -> %%% Internal functions %%% -connect_nodes() -> - Nodes = ejabberd_config:get_option(cluster_nodes, []), - lists:foreach(fun(Node) -> - net_kernel:connect_node(Node) - end, Nodes). - %% If ejabberd is running on some Windows machine, get nameservers and add to Erlang maybe_add_nameservers() -> case os:type() of @@ -136,10 +128,6 @@ delete_pid_file() -> file:delete(PidFilename) end. -set_settings_from_config() -> - Ticktime = ejabberd_config:get_option(net_ticktime, 60), - net_kernel:set_net_ticktime(Ticktime). - file_queue_init() -> QueueDir = case ejabberd_config:queue_dir() of undefined -> @@ -160,15 +148,6 @@ start_apps() -> ejabberd:start_app(xmpp), ejabberd:start_app(cache_tab). --spec opt_type(net_ticktime) -> fun((pos_integer()) -> pos_integer()); - (cluster_nodes) -> fun(([node()]) -> [node()]); - (atom()) -> atom(). -opt_type(net_ticktime) -> - fun (P) when is_integer(P), P > 0 -> P end; -opt_type(cluster_nodes) -> - fun (Ns) -> true = lists:all(fun is_atom/1, Ns), Ns end; -opt_type(_) -> [cluster_nodes, net_ticktime]. - setup_if_elixir_conf_used() -> case ejabberd_config:is_using_elixir_config() of true -> 'Elixir.Ejabberd.Config.Store':start_link(); diff --git a/src/ejabberd_cluster.erl b/src/ejabberd_cluster.erl index aeae294b0..c04216ebc 100644 --- a/src/ejabberd_cluster.erl +++ b/src/ejabberd_cluster.erl @@ -1,152 +1,192 @@ -%%%---------------------------------------------------------------------- -%%% File : ejabberd_cluster.erl -%%% Author : Christophe Romain -%%% Purpose : Ejabberd clustering management -%%% Created : 7 Oct 2015 by Christophe Romain +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov +%%% @copyright (C) 2017, Evgeny Khramtsov +%%% @doc %%% -%%% -%%% ejabberd, Copyright (C) 2002-2017 ProcessOne -%%% -%%% This program is free software; you can redistribute it and/or -%%% modify it under the terms of the GNU General Public License as -%%% published by the Free Software Foundation; either version 2 of the -%%% License, or (at your option) any later version. -%%% -%%% This program is distributed in the hope that it will be useful, -%%% but WITHOUT ANY WARRANTY; without even the implied warranty of -%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -%%% General Public License for more details. -%%% -%%% You should have received a copy of the GNU General Public License along -%%% with this program; if not, write to the Free Software Foundation, Inc., -%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -%%% -%%%---------------------------------------------------------------------- - +%%% @end +%%% Created : 5 Jul 2017 by Evgeny Khramtsov +%%%------------------------------------------------------------------- -module(ejabberd_cluster). +-behaviour(ejabberd_config). +-behaviour(gen_server). %% API --export([get_nodes/0, call/4, multicall/3, multicall/4, - eval_everywhere/3, eval_everywhere/4]). --export([join/1, leave/1, get_known_nodes/0]). --export([node_id/0, get_node_by_id/1]). +-export([start_link/0, call/4, multicall/3, multicall/4, eval_everywhere/3, + eval_everywhere/4]). +%% Backend dependent API +-export([get_nodes/0, get_known_nodes/0, join/1, leave/1, subscribe/0, + subscribe/1, node_id/0, get_node_by_id/1, send/2, wait_for_sync/1]). +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). +-export([opt_type/1]). --include("ejabberd.hrl"). -include("logger.hrl"). --spec get_nodes() -> [node()]. +-type dst() :: pid() | atom() | {atom(), node()}. -get_nodes() -> - mnesia:system_info(running_db_nodes). +-callback init() -> ok | {error, any()}. +-callback get_nodes() -> [node()]. +-callback get_known_nodes() -> [node()]. +-callback join(node()) -> ok | {error, any()}. +-callback leave(node()) -> ok | {error, any()}. +-callback node_id() -> binary(). +-callback get_node_by_id(binary()) -> node(). +-callback send({atom(), node()}, term()) -> boolean(). +-callback wait_for_sync(timeout()) -> ok | {error, any()}. +-callback subscribe(dst()) -> ok. --spec get_known_nodes() -> [node()]. +-record(state, {}). -get_known_nodes() -> - lists:usort(mnesia:system_info(db_nodes) - ++ mnesia:system_info(extra_db_nodes)). +%%%=================================================================== +%%% API +%%%=================================================================== +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -spec call(node(), module(), atom(), [any()]) -> any(). - call(Node, Module, Function, Args) -> - rpc:call(Node, Module, Function, Args, 5000). + rpc:call(Node, Module, Function, Args, rpc_timeout()). -spec multicall(module(), atom(), [any()]) -> {list(), [node()]}. - multicall(Module, Function, Args) -> multicall(get_nodes(), Module, Function, Args). -spec multicall([node()], module(), atom(), list()) -> {list(), [node()]}. - multicall(Nodes, Module, Function, Args) -> - rpc:multicall(Nodes, Module, Function, Args, 5000). + rpc:multicall(Nodes, Module, Function, Args, rpc_timeout()). -spec eval_everywhere(module(), atom(), [any()]) -> ok. - eval_everywhere(Module, Function, Args) -> eval_everywhere(get_nodes(), Module, Function, Args), ok. -spec eval_everywhere([node()], module(), atom(), [any()]) -> ok. - eval_everywhere(Nodes, Module, Function, Args) -> rpc:eval_everywhere(Nodes, Module, Function, Args), ok. --spec join(node()) -> ok | {error, any()}. +%%%=================================================================== +%%% Backend dependent API +%%%=================================================================== +-spec get_nodes() -> [node()]. +get_nodes() -> + Mod = get_mod(), + Mod:get_nodes(). +-spec get_known_nodes() -> [node()]. +get_known_nodes() -> + Mod = get_mod(), + Mod:get_known_nodes(). + +-spec join(node()) -> ok | {error, any()}. join(Node) -> - case {node(), net_adm:ping(Node)} of - {Node, _} -> - {error, {not_master, Node}}; - {_, pong} -> - application:stop(ejabberd), - application:stop(mnesia), - mnesia:delete_schema([node()]), - application:start(mnesia), - mnesia:change_config(extra_db_nodes, [Node]), - mnesia:change_table_copy_type(schema, node(), disc_copies), - spawn(fun() -> - lists:foreach(fun(Table) -> - Type = call(Node, mnesia, table_info, [Table, storage_type]), - mnesia:add_table_copy(Table, node(), Type) - end, mnesia:system_info(tables)--[schema]) - end), - application:start(ejabberd); - _ -> - {error, {no_ping, Node}} - end. + Mod = get_mod(), + Mod:join(Node). -spec leave(node()) -> ok | {error, any()}. - leave(Node) -> - case {node(), net_adm:ping(Node)} of - {Node, _} -> - Cluster = get_nodes()--[Node], - leave(Cluster, Node); - {_, pong} -> - rpc:call(Node, ?MODULE, leave, [Node], 10000); - {_, pang} -> - case mnesia:del_table_copy(schema, Node) of - {atomic, ok} -> ok; - {aborted, Reason} -> {error, Reason} - end - end. -leave([], Node) -> - {error, {no_cluster, Node}}; -leave([Master|_], Node) -> - application:stop(ejabberd), - application:stop(mnesia), - call(Master, mnesia, del_table_copy, [schema, Node]), - spawn(fun() -> - mnesia:delete_schema([node()]), - erlang:halt(0) - end), - ok. + Mod = get_mod(), + Mod:leave(Node). -spec node_id() -> binary(). node_id() -> - integer_to_binary(erlang:phash2(node())). + Mod = get_mod(), + Mod:node_id(). -spec get_node_by_id(binary()) -> node(). -get_node_by_id(Hash) -> - try binary_to_integer(Hash) of - I -> match_node_id(I) - catch _:_ -> - node() +get_node_by_id(ID) -> + Mod = get_mod(), + Mod:get_node_by_id(ID). + +-spec send(dst(), term()) -> boolean(). +send(Dst, Msg) -> + IsLocal = case Dst of + {_, Node} -> Node == node(); + Pid when is_pid(Pid) -> node(Pid) == node(); + Name when is_atom(Name) -> true; + _ -> false + end, + if IsLocal -> + erlang:send(Dst, Msg), + true; + true -> + Mod = get_mod(), + Mod:send(Dst, Msg) end. +-spec wait_for_sync(timeout()) -> ok | {error, any()}. +wait_for_sync(Timeout) -> + Mod = get_mod(), + Mod:wait_for_sync(Timeout). + +-spec subscribe() -> ok. +subscribe() -> + subscribe(self()). + +-spec subscribe(dst()) -> ok. +subscribe(Proc) -> + Mod = get_mod(), + Mod:subscribe(Proc). + +%%%=================================================================== +%%% gen_server API +%%%=================================================================== +init([]) -> + Ticktime = ejabberd_config:get_option(net_ticktime, 60), + Nodes = ejabberd_config:get_option(cluster_nodes, []), + net_kernel:set_net_ticktime(Ticktime), + lists:foreach(fun(Node) -> + net_kernel:connect_node(Node) + end, Nodes), + Mod = get_mod(), + case Mod:init() of + ok -> + Mod:subscribe(?MODULE), + {ok, #state{}}; + {error, Reason} -> + {stop, Reason} + end. + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({node_up, Node}, State) -> + ?INFO_MSG("Node ~s has joined", [Node]), + {noreply, State}; +handle_info({node_down, Node}, State) -> + ?INFO_MSG("Node ~s has left", [Node]), + {noreply, State}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + %%%=================================================================== %%% Internal functions %%%=================================================================== --spec match_node_id(integer()) -> node(). -match_node_id(I) -> - match_node_id(I, get_nodes()). +get_mod() -> + Backend = ejabberd_config:get_option(cluster_backend, mnesia), + list_to_atom("ejabberd_cluster_" ++ atom_to_list(Backend)). --spec match_node_id(integer(), [node()]) -> node(). -match_node_id(I, [Node|Nodes]) -> - case erlang:phash2(Node) of - I -> Node; - _ -> match_node_id(I, Nodes) - end; -match_node_id(_I, []) -> - node(). +rpc_timeout() -> + timer:seconds(ejabberd_config:get_option(rpc_timeout, 5)). + +opt_type(net_ticktime) -> + fun (P) when is_integer(P), P > 0 -> P end; +opt_type(cluster_nodes) -> + fun (Ns) -> true = lists:all(fun is_atom/1, Ns), Ns end; +opt_type(rpc_timeout) -> + fun (T) when is_integer(T), T > 0 -> T end; +opt_type(cluster_backend) -> + fun (T) -> ejabberd_config:v_db(?MODULE, T) end; +opt_type(_) -> + [rpc_timeout, cluster_backend, cluster_nodes, net_ticktime]. diff --git a/src/ejabberd_cluster_mnesia.erl b/src/ejabberd_cluster_mnesia.erl new file mode 100644 index 000000000..100bdaff1 --- /dev/null +++ b/src/ejabberd_cluster_mnesia.erl @@ -0,0 +1,144 @@ +%%%---------------------------------------------------------------------- +%%% File : ejabberd_cluster.erl +%%% Author : Christophe Romain +%%% Purpose : Ejabberd clustering management +%%% Created : 7 Oct 2015 by Christophe Romain +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2017 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- + +-module(ejabberd_cluster_mnesia). +-behaviour(ejabberd_cluster). + +%% API +-export([init/0, get_nodes/0, join/1, leave/1, + get_known_nodes/0, node_id/0, get_node_by_id/1, + send/2, wait_for_sync/1, subscribe/1]). + +-include("ejabberd.hrl"). +-include("logger.hrl"). + +-spec init() -> ok. +init() -> + ok. + +-spec get_nodes() -> [node()]. + +get_nodes() -> + mnesia:system_info(running_db_nodes). + +-spec get_known_nodes() -> [node()]. + +get_known_nodes() -> + lists:usort(mnesia:system_info(db_nodes) + ++ mnesia:system_info(extra_db_nodes)). + +-spec join(node()) -> ok | {error, any()}. + +join(Node) -> + case {node(), net_adm:ping(Node)} of + {Node, _} -> + {error, {not_master, Node}}; + {_, pong} -> + application:stop(ejabberd), + application:stop(mnesia), + mnesia:delete_schema([node()]), + application:start(mnesia), + mnesia:change_config(extra_db_nodes, [Node]), + mnesia:change_table_copy_type(schema, node(), disc_copies), + spawn(fun() -> + lists:foreach(fun(Table) -> + Type = ejabberd_cluster:call( + Node, mnesia, table_info, [Table, storage_type]), + mnesia:add_table_copy(Table, node(), Type) + end, mnesia:system_info(tables)--[schema]) + end), + application:start(ejabberd); + _ -> + {error, {no_ping, Node}} + end. + +-spec leave(node()) -> ok | {error, any()}. + +leave(Node) -> + case {node(), net_adm:ping(Node)} of + {Node, _} -> + Cluster = get_nodes()--[Node], + leave(Cluster, Node); + {_, pong} -> + rpc:call(Node, ?MODULE, leave, [Node], 10000); + {_, pang} -> + case mnesia:del_table_copy(schema, Node) of + {atomic, ok} -> ok; + {aborted, Reason} -> {error, Reason} + end + end. +leave([], Node) -> + {error, {no_cluster, Node}}; +leave([Master|_], Node) -> + application:stop(ejabberd), + application:stop(mnesia), + ejabberd_cluster:call(Master, mnesia, del_table_copy, [schema, Node]), + spawn(fun() -> + mnesia:delete_schema([node()]), + erlang:halt(0) + end), + ok. + +-spec node_id() -> binary(). +node_id() -> + integer_to_binary(erlang:phash2(node())). + +-spec get_node_by_id(binary()) -> node(). +get_node_by_id(Hash) -> + try binary_to_integer(Hash) of + I -> match_node_id(I) + catch _:_ -> + node() + end. + +-spec send({atom(), node()}, term()) -> boolean(). +send(Dst, Msg) -> + erlang:send(Dst, Msg). + +-spec wait_for_sync(timeout()) -> ok. +wait_for_sync(Timeout) -> + ?INFO_MSG("Waiting for Mnesia synchronization to complete", []), + mnesia:wait_for_tables(mnesia:system_info(local_tables), Timeout), + ok. + +-spec subscribe(_) -> ok. +subscribe(_) -> + ok. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +-spec match_node_id(integer()) -> node(). +match_node_id(I) -> + match_node_id(I, get_nodes()). + +-spec match_node_id(integer(), [node()]) -> node(). +match_node_id(I, [Node|Nodes]) -> + case erlang:phash2(Node) of + I -> Node; + _ -> match_node_id(I, Nodes) + end; +match_node_id(_I, []) -> + node(). diff --git a/src/ejabberd_listener.erl b/src/ejabberd_listener.erl index 3ea23d61b..e9b4306e5 100644 --- a/src/ejabberd_listener.erl +++ b/src/ejabberd_listener.erl @@ -109,6 +109,7 @@ init_udp(PortIP, Module, Opts, SockOpts, Port, IPS) -> {ok, Socket} -> %% Inform my parent that this port was opened succesfully proc_lib:init_ack({ok, self()}), + application:ensure_started(ejabberd), start_module_sup(Port, Module), ?INFO_MSG("Start accepting UDP connections at ~s for ~p", [format_portip(PortIP), Module]), @@ -134,6 +135,7 @@ init_tcp(PortIP, Module, Opts, SockOpts, Port, IPS) -> ListenSocket = listen_tcp(PortIP, Module, SockOpts, Port, IPS), %% Inform my parent that this port was opened succesfully proc_lib:init_ack({ok, self()}), + application:ensure_started(ejabberd), start_module_sup(Port, Module), ?INFO_MSG("Start accepting TCP connections at ~s for ~p", [format_portip(PortIP), Module]), diff --git a/src/ejabberd_mnesia.erl b/src/ejabberd_mnesia.erl index 34691545a..16e385011 100644 --- a/src/ejabberd_mnesia.erl +++ b/src/ejabberd_mnesia.erl @@ -68,8 +68,6 @@ init([]) -> _ -> ok end, ejabberd:start_app(mnesia, permanent), - ?DEBUG("Waiting for Mnesia tables synchronization...", []), - mnesia:wait_for_tables(mnesia:system_info(local_tables), infinity), Schema = read_schema_file(), {ok, #state{schema = Schema}}; false -> diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl index 224ed16c1..35527ebd7 100644 --- a/src/ejabberd_sup.erl +++ b/src/ejabberd_sup.erl @@ -41,6 +41,12 @@ init([]) -> brutal_kill, worker, [ejabberd_hooks]}, + Cluster = {ejabberd_cluster, + {ejabberd_cluster, start_link, []}, + permanent, + 5000, + worker, + [ejabberd_cluster]}, SystemMonitor = {ejabberd_system_monitor, {ejabberd_system_monitor, start_link, []}, @@ -152,6 +158,7 @@ init([]) -> permanent, 5000, worker, [ejabberd_pkix]}, {ok, {{one_for_one, 10, 1}, [Hooks, + Cluster, CyrSASL, Translation, AccessPerms,