diff --git a/src/ejabberd_cluster.erl b/src/ejabberd_cluster.erl index 1e3f02a9e..5826d6d31 100644 --- a/src/ejabberd_cluster.erl +++ b/src/ejabberd_cluster.erl @@ -28,6 +28,7 @@ %% API -export([get_nodes/0, call/4, multicall/3, multicall/4]). -export([join/1, leave/1]). +-export([node_id/0, get_node_by_id/1]). -include("ejabberd.hrl"). -include("logger.hrl"). @@ -102,3 +103,31 @@ leave([Master|_], Node) -> erlang:halt(0) end), ok. + +-spec node_id() -> binary(). +node_id() -> + integer_to_binary(erlang:phash2(node())). + +-spec get_node_by_id(binary()) -> node(). +get_node_by_id(Hash) -> + try binary_to_integer(Hash) of + I -> match_node_id(I) + catch _:_ -> + node() + end. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +-spec match_node_id(integer()) -> node(). +match_node_id(I) -> + match_node_id(I, get_nodes()). + +-spec match_node_id(integer(), [node()]) -> node(). +match_node_id(I, [Node|Nodes]) -> + case erlang:phash2(Node) of + I -> Node; + _ -> match_node_id(I, Nodes) + end; +match_node_id(_I, []) -> + node(). diff --git a/src/mod_proxy65.erl b/src/mod_proxy65.erl index 2d0d9ae0a..0c403e23b 100644 --- a/src/mod_proxy65.erl +++ b/src/mod_proxy65.erl @@ -43,6 +43,12 @@ -define(PROCNAME, ejabberd_mod_proxy65). +-callback init() -> any(). +-callback register_stream(binary(), pid()) -> ok | {error, any()}. +-callback unregister_stream(binary()) -> ok | {error, any()}. +-callback activate_stream(binary(), binary(), pos_integer() | infinity, node()) -> + ok | {error, limit | conflict | notfound | term()}. + start(Host, Opts) -> case mod_proxy65_service:add_listener(Host, Opts) of {error, _} = Err -> erlang:error(Err); @@ -50,7 +56,12 @@ start(Host, Opts) -> Proc = gen_mod:get_module_proc(Host, ?PROCNAME), ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]}, transient, infinity, supervisor, [?MODULE]}, - supervisor:start_child(ejabberd_sup, ChildSpec) + case supervisor:start_child(ejabberd_sup, ChildSpec) of + {error, _} = Err -> erlang:error(Err); + _ -> + Mod = gen_mod:ram_db_mod(global, ?MODULE), + Mod:init() + end end. stop(Host) -> @@ -77,12 +88,9 @@ init([Host, Opts]) -> ejabberd_mod_proxy65_sup), mod_proxy65_stream]}, transient, infinity, supervisor, [ejabberd_tmp_sup]}, - StreamManager = {mod_proxy65_sm, - {mod_proxy65_sm, start_link, [Host, Opts]}, transient, - 5000, worker, [mod_proxy65_sm]}, {ok, {{one_for_one, 10, 1}, - [StreamManager, StreamSupervisor, Service]}}. + [StreamSupervisor, Service]}}. depends(_Host, _Opts) -> []. @@ -112,7 +120,9 @@ mod_opt_type(max_connections) -> fun (I) when is_integer(I), I > 0 -> I; (infinity) -> infinity end; +mod_opt_type(ram_db_type) -> + fun(T) -> ejabberd_config:v_db(?MODULE, T) end; mod_opt_type(_) -> [auth_type, recbuf, shaper, sndbuf, access, host, hostname, ip, name, port, - max_connections]. + max_connections, ram_db_type]. diff --git a/src/mod_proxy65_mnesia.erl b/src/mod_proxy65_mnesia.erl new file mode 100644 index 000000000..e50b29c98 --- /dev/null +++ b/src/mod_proxy65_mnesia.erl @@ -0,0 +1,145 @@ +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov +%%% @copyright (C) 2017, Evgeny Khramtsov +%%% @doc +%%% +%%% @end +%%% Created : 16 Jan 2017 by Evgeny Khramtsov +%%%------------------------------------------------------------------- +-module(mod_proxy65_mnesia). +-behaviour(gen_server). +-behaviour(mod_proxy65). + +%% API +-export([init/0, register_stream/2, unregister_stream/1, activate_stream/4]). +-export([start_link/0]). +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-include("logger.hrl"). + +-record(bytestream, + {sha1 = <<"">> :: binary() | '$1', + target :: pid() | '_', + initiator :: pid() | '_', + active = false :: boolean() | '_', + jid_i :: undefined | binary() | '_'}). + +-record(state, {}). + +%%%=================================================================== +%%% API +%%%=================================================================== +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init() -> + Spec = {?MODULE, {?MODULE, start_link, []}, transient, + 5000, worker, [?MODULE]}, + supervisor:start_child(ejabberd_sup, Spec). + +register_stream(SHA1, StreamPid) -> + F = fun () -> + case mnesia:read(bytestream, SHA1, write) of + [] -> + mnesia:write(#bytestream{sha1 = SHA1, + target = StreamPid}); + [#bytestream{target = Pid, initiator = undefined} = + ByteStream] when is_pid(Pid), Pid /= StreamPid -> + mnesia:write(ByteStream#bytestream{ + initiator = StreamPid}) + end + end, + case mnesia:transaction(F) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + ?ERROR_MSG("Mnesia transaction failed: ~p", [Reason]), + {error, Reason} + end. + +unregister_stream(SHA1) -> + F = fun () -> mnesia:delete({bytestream, SHA1}) end, + case mnesia:transaction(F) of + {atomic, ok} -> + ok; + {aborted, Reason} -> + ?ERROR_MSG("Mnesia transaction failed: ~p", [Reason]), + {error, Reason} + end. + +activate_stream(SHA1, Initiator, MaxConnections, _Node) -> + case gen_server:call(?MODULE, + {activate_stream, SHA1, Initiator, MaxConnections}) of + {atomic, {ok, IPid, TPid}} -> + {ok, IPid, TPid}; + {atomic, {limit, IPid, TPid}} -> + {error, {limit, IPid, TPid}}; + {atomic, conflict} -> + {error, conflict}; + {atomic, notfound} -> + {error, notfound}; + Err -> + {error, Err} + end. + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== +init([]) -> + ejabberd_mnesia:create(?MODULE, bytestream, + [{ram_copies, [node()]}, + {attributes, record_info(fields, bytestream)}]), + mnesia:add_table_copy(bytestream, node(), ram_copies), + {ok, #state{}}. + +handle_call({activate_stream, SHA1, Initiator, MaxConnections}, _From, State) -> + F = fun () -> + case mnesia:read(bytestream, SHA1, write) of + [#bytestream{target = TPid, initiator = IPid} = + ByteStream] when is_pid(TPid), is_pid(IPid) -> + ActiveFlag = ByteStream#bytestream.active, + if ActiveFlag == false -> + ConnsPerJID = mnesia:select( + bytestream, + [{#bytestream{sha1 = '$1', + jid_i = Initiator, + _ = '_'}, + [], ['$1']}]), + if length(ConnsPerJID) < MaxConnections -> + mnesia:write( + ByteStream#bytestream{active = true, + jid_i = Initiator}), + {ok, IPid, TPid}; + true -> + {limit, IPid, TPid} + end; + true -> + conflict + end; + _ -> + notfound + end + end, + Reply = mnesia:transaction(F), + {reply, Reply, State}; +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/src/mod_proxy65_service.erl b/src/mod_proxy65_service.erl index 0f69086e0..db844cb81 100644 --- a/src/mod_proxy65_service.erl +++ b/src/mod_proxy65_service.erl @@ -175,31 +175,39 @@ process_bytestreams(#iq{type = set, lang = Lang, from = InitiatorJID, to = To, all), case acl:match_rule(ServerHost, ACL, InitiatorJID) of allow -> + Node = ejabberd_cluster:get_node_by_id(To#jid.lresource), Target = jid:to_string(jid:tolower(TargetJID)), Initiator = jid:to_string(jid:tolower(InitiatorJID)), SHA1 = p1_sha:sha(<>), - case mod_proxy65_sm:activate_stream(SHA1, InitiatorJID, - TargetJID, ServerHost) of - ok -> + Mod = gen_mod:ram_db_mod(global, mod_proxy65), + MaxConnections = max_connections(ServerHost), + case Mod:activate_stream(SHA1, Initiator, MaxConnections, Node) of + {ok, InitiatorPid, TargetPid} -> + mod_proxy65_stream:activate( + {InitiatorPid, InitiatorJID}, {TargetPid, TargetJID}), xmpp:make_iq_result(IQ); - false -> + {error, notfound} -> Txt = <<"Failed to activate bytestream">>, xmpp:make_error(IQ, xmpp:err_item_not_found(Txt, Lang)); - limit -> + {error, {limit, InitiatorPid, TargetPid}} -> + mod_proxy65_stream:stop(InitiatorPid), + mod_proxy65_stream:stop(TargetPid), Txt = <<"Too many active bytestreams">>, xmpp:make_error(IQ, xmpp:err_resource_constraint(Txt, Lang)); - conflict -> + {error, conflict} -> Txt = <<"Bytestream already activated">>, xmpp:make_error(IQ, xmpp:err_conflict(Txt, Lang)); - Err -> + {error, Err} -> ?ERROR_MSG("failed to activate bytestream from ~s to ~s: ~p", [Initiator, Target, Err]), - xmpp:make_error(IQ, xmpp:err_internal_server_error()) + Txt = <<"Database failure">>, + xmpp:make_error(IQ, xmpp:err_internal_server_error(Txt, Lang)) end; deny -> Txt = <<"Denied by ACL">>, xmpp:make_error(IQ, xmpp:err_forbidden(Txt, Lang)) end. + %%%------------------------- %%% Auxiliary functions. %%%------------------------- @@ -219,7 +227,8 @@ get_streamhost(Host, ServerHost) -> HostName = gen_mod:get_module_opt(ServerHost, mod_proxy65, hostname, fun iolist_to_binary/1, jlib:ip_to_list(IP)), - #streamhost{jid = jid:make(Host), + Resource = ejabberd_cluster:node_id(), + #streamhost{jid = jid:make(<<"">>, Host, Resource), host = HostName, port = Port}. @@ -246,3 +255,9 @@ get_my_ip() -> {ok, Addr} -> Addr; {error, _} -> {127, 0, 0, 1} end. + +max_connections(ServerHost) -> + gen_mod:get_module_opt(ServerHost, mod_proxy65, max_connections, + fun(I) when is_integer(I), I>0 -> I; + (infinity) -> infinity + end, infinity). diff --git a/src/mod_proxy65_sm.erl b/src/mod_proxy65_sm.erl deleted file mode 100644 index b1d33b5d9..000000000 --- a/src/mod_proxy65_sm.erl +++ /dev/null @@ -1,171 +0,0 @@ -%%%---------------------------------------------------------------------- -%%% File : mod_proxy65_sm.erl -%%% Author : Evgeniy Khramtsov -%%% Purpose : Bytestreams manager. -%%% Created : 12 Oct 2006 by Evgeniy Khramtsov -%%% -%%% -%%% ejabberd, Copyright (C) 2002-2016 ProcessOne -%%% -%%% This program is free software; you can redistribute it and/or -%%% modify it under the terms of the GNU General Public License as -%%% published by the Free Software Foundation; either version 2 of the -%%% License, or (at your option) any later version. -%%% -%%% This program is distributed in the hope that it will be useful, -%%% but WITHOUT ANY WARRANTY; without even the implied warranty of -%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -%%% General Public License for more details. -%%% -%%% You should have received a copy of the GNU General Public License along -%%% with this program; if not, write to the Free Software Foundation, Inc., -%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -%%% -%%%---------------------------------------------------------------------- - --module(mod_proxy65_sm). - --author('xram@jabber.ru'). - --behaviour(gen_server). - -%% gen_server callbacks. --export([init/1, handle_info/2, handle_call/3, - handle_cast/2, terminate/2, code_change/3]). - --export([start_link/2, register_stream/1, - unregister_stream/1, activate_stream/4]). - --record(state, {max_connections = infinity :: non_neg_integer() | infinity}). - --record(bytestream, - {sha1 = <<"">> :: binary() | '$1', - target :: pid() | '_', - initiator :: pid() | '_', - active = false :: boolean() | '_', - jid_i = {<<"">>, <<"">>, <<"">>} :: jid:ljid() | '_'}). - --define(PROCNAME, ejabberd_mod_proxy65_sm). - -%% Unused callbacks. -handle_cast(_Request, State) -> {noreply, State}. - -code_change(_OldVsn, State, _Extra) -> {ok, State}. - -handle_info(_Info, State) -> {noreply, State}. - -%%---------------- - -start_link(Host, Opts) -> - Proc = gen_mod:get_module_proc(Host, ?PROCNAME), - gen_server:start_link({local, Proc}, ?MODULE, [Opts], - []). - -init([Opts]) -> - ejabberd_mnesia:create(?MODULE, bytestream, - [{ram_copies, [node()]}, - {attributes, record_info(fields, bytestream)}]), - mnesia:add_table_copy(bytestream, node(), ram_copies), - MaxConnections = gen_mod:get_opt(max_connections, Opts, - fun(I) when is_integer(I), I>0 -> - I; - (infinity) -> - infinity - end, infinity), - {ok, #state{max_connections = MaxConnections}}. - -terminate(_Reason, _State) -> ok. - -handle_call({activate, SHA1, IJid}, _From, State) -> - MaxConns = State#state.max_connections, - F = fun () -> - case mnesia:read(bytestream, SHA1, write) of - [#bytestream{target = TPid, initiator = IPid} = - ByteStream] - when is_pid(TPid), is_pid(IPid) -> - ActiveFlag = ByteStream#bytestream.active, - if ActiveFlag == false -> - ConnsPerJID = mnesia:select(bytestream, - [{#bytestream{sha1 = - '$1', - jid_i = - IJid, - _ = '_'}, - [], ['$1']}]), - if length(ConnsPerJID) < MaxConns -> - mnesia:write(ByteStream#bytestream{active = - true, - jid_i = - IJid}), - {ok, IPid, TPid}; - true -> {limit, IPid, TPid} - end; - true -> conflict - end; - _ -> false - end - end, - Reply = mnesia:transaction(F), - {reply, Reply, State}; -handle_call(_Request, _From, State) -> - {reply, ok, State}. - -%%%---------------------- -%%% API. -%%%---------------------- -%%%--------------------------------------------------- -%%% register_stream(SHA1) -> {atomic, ok} | -%%% {atomic, error} | -%%% transaction abort -%%% SHA1 = string() -%%%--------------------------------------------------- -register_stream(SHA1) when is_binary(SHA1) -> - StreamPid = self(), - F = fun () -> - case mnesia:read(bytestream, SHA1, write) of - [] -> - mnesia:write(#bytestream{sha1 = SHA1, - target = StreamPid}); - [#bytestream{target = Pid, initiator = undefined} = - ByteStream] - when is_pid(Pid), Pid /= StreamPid -> - mnesia:write(ByteStream#bytestream{initiator = - StreamPid}); - _ -> error - end - end, - mnesia:transaction(F). - -%%%---------------------------------------------------- -%%% unregister_stream(SHA1) -> ok | transaction abort -%%% SHA1 = string() -%%%---------------------------------------------------- -unregister_stream(SHA1) when is_binary(SHA1) -> - F = fun () -> mnesia:delete({bytestream, SHA1}) end, - mnesia:transaction(F). - -%%%-------------------------------------------------------- -%%% activate_stream(SHA1, IJid, TJid, Host) -> ok | -%%% false | -%%% limit | -%%% conflict | -%%% error -%%% SHA1 = string() -%%% IJid = TJid = jid() -%%% Host = string() -%%%-------------------------------------------------------- -activate_stream(SHA1, IJid, TJid, Host) - when is_binary(SHA1) -> - Proc = gen_mod:get_module_proc(Host, ?PROCNAME), - case catch gen_server:call(Proc, {activate, SHA1, IJid}) - of - {atomic, {ok, IPid, TPid}} -> - mod_proxy65_stream:activate({IPid, IJid}, {TPid, TJid}); - {atomic, {limit, IPid, TPid}} -> - mod_proxy65_stream:stop(IPid), - mod_proxy65_stream:stop(TPid), - limit; - {atomic, conflict} -> conflict; - {atomic, false} -> false; - _ -> error - end. diff --git a/src/mod_proxy65_stream.erl b/src/mod_proxy65_stream.erl index e6362d48c..484f41327 100644 --- a/src/mod_proxy65_stream.erl +++ b/src/mod_proxy65_stream.erl @@ -99,7 +99,8 @@ init([Socket, Host, Opts]) -> socket = Socket, shaper = Shaper, timer = TRef}}. terminate(_Reason, StateName, #state{sha1 = SHA1}) -> - catch mod_proxy65_sm:unregister_stream(SHA1), + Mod = gen_mod:ram_db_mod(global, mod_proxy65), + Mod:unregister_stream(SHA1), if StateName == stream_established -> ?INFO_MSG("Bytestream terminated", []); true -> ok @@ -168,8 +169,9 @@ wait_for_request(Packet, Request = mod_proxy65_lib:unpack_request(Packet), case Request of #s5_request{sha1 = SHA1, cmd = connect} -> - case catch mod_proxy65_sm:register_stream(SHA1) of - {atomic, ok} -> + Mod = gen_mod:ram_db_mod(global, mod_proxy65), + case Mod:register_stream(SHA1, self()) of + ok -> inet:setopts(Socket, [{active, false}]), gen_tcp:send(Socket, mod_proxy65_lib:make_reply(Request)),