Implement database backend interface for mod_proxy65

This commit is contained in:
Evgeniy Khramtsov 2017-01-16 15:28:11 +03:00
parent 0078a3b904
commit 557820707b
6 changed files with 219 additions and 189 deletions

View File

@ -28,6 +28,7 @@
%% API
-export([get_nodes/0, call/4, multicall/3, multicall/4]).
-export([join/1, leave/1]).
-export([node_id/0, get_node_by_id/1]).
-include("ejabberd.hrl").
-include("logger.hrl").
@ -102,3 +103,31 @@ leave([Master|_], Node) ->
erlang:halt(0)
end),
ok.
-spec node_id() -> binary().
node_id() ->
integer_to_binary(erlang:phash2(node())).
-spec get_node_by_id(binary()) -> node().
get_node_by_id(Hash) ->
try binary_to_integer(Hash) of
I -> match_node_id(I)
catch _:_ ->
node()
end.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec match_node_id(integer()) -> node().
match_node_id(I) ->
match_node_id(I, get_nodes()).
-spec match_node_id(integer(), [node()]) -> node().
match_node_id(I, [Node|Nodes]) ->
case erlang:phash2(Node) of
I -> Node;
_ -> match_node_id(I, Nodes)
end;
match_node_id(_I, []) ->
node().

View File

@ -43,6 +43,12 @@
-define(PROCNAME, ejabberd_mod_proxy65).
-callback init() -> any().
-callback register_stream(binary(), pid()) -> ok | {error, any()}.
-callback unregister_stream(binary()) -> ok | {error, any()}.
-callback activate_stream(binary(), binary(), pos_integer() | infinity, node()) ->
ok | {error, limit | conflict | notfound | term()}.
start(Host, Opts) ->
case mod_proxy65_service:add_listener(Host, Opts) of
{error, _} = Err -> erlang:error(Err);
@ -50,7 +56,12 @@ start(Host, Opts) ->
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]},
transient, infinity, supervisor, [?MODULE]},
supervisor:start_child(ejabberd_sup, ChildSpec)
case supervisor:start_child(ejabberd_sup, ChildSpec) of
{error, _} = Err -> erlang:error(Err);
_ ->
Mod = gen_mod:ram_db_mod(global, ?MODULE),
Mod:init()
end
end.
stop(Host) ->
@ -77,12 +88,9 @@ init([Host, Opts]) ->
ejabberd_mod_proxy65_sup),
mod_proxy65_stream]},
transient, infinity, supervisor, [ejabberd_tmp_sup]},
StreamManager = {mod_proxy65_sm,
{mod_proxy65_sm, start_link, [Host, Opts]}, transient,
5000, worker, [mod_proxy65_sm]},
{ok,
{{one_for_one, 10, 1},
[StreamManager, StreamSupervisor, Service]}}.
[StreamSupervisor, Service]}}.
depends(_Host, _Opts) ->
[].
@ -112,7 +120,9 @@ mod_opt_type(max_connections) ->
fun (I) when is_integer(I), I > 0 -> I;
(infinity) -> infinity
end;
mod_opt_type(ram_db_type) ->
fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
mod_opt_type(_) ->
[auth_type, recbuf, shaper, sndbuf,
access, host, hostname, ip, name, port,
max_connections].
max_connections, ram_db_type].

145
src/mod_proxy65_mnesia.erl Normal file
View File

@ -0,0 +1,145 @@
%%%-------------------------------------------------------------------
%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
%%% @copyright (C) 2017, Evgeny Khramtsov
%%% @doc
%%%
%%% @end
%%% Created : 16 Jan 2017 by Evgeny Khramtsov <ekhramtsov@process-one.net>
%%%-------------------------------------------------------------------
-module(mod_proxy65_mnesia).
-behaviour(gen_server).
-behaviour(mod_proxy65).
%% API
-export([init/0, register_stream/2, unregister_stream/1, activate_stream/4]).
-export([start_link/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-include("logger.hrl").
-record(bytestream,
{sha1 = <<"">> :: binary() | '$1',
target :: pid() | '_',
initiator :: pid() | '_',
active = false :: boolean() | '_',
jid_i :: undefined | binary() | '_'}).
-record(state, {}).
%%%===================================================================
%%% API
%%%===================================================================
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init() ->
Spec = {?MODULE, {?MODULE, start_link, []}, transient,
5000, worker, [?MODULE]},
supervisor:start_child(ejabberd_sup, Spec).
register_stream(SHA1, StreamPid) ->
F = fun () ->
case mnesia:read(bytestream, SHA1, write) of
[] ->
mnesia:write(#bytestream{sha1 = SHA1,
target = StreamPid});
[#bytestream{target = Pid, initiator = undefined} =
ByteStream] when is_pid(Pid), Pid /= StreamPid ->
mnesia:write(ByteStream#bytestream{
initiator = StreamPid})
end
end,
case mnesia:transaction(F) of
{atomic, ok} ->
ok;
{aborted, Reason} ->
?ERROR_MSG("Mnesia transaction failed: ~p", [Reason]),
{error, Reason}
end.
unregister_stream(SHA1) ->
F = fun () -> mnesia:delete({bytestream, SHA1}) end,
case mnesia:transaction(F) of
{atomic, ok} ->
ok;
{aborted, Reason} ->
?ERROR_MSG("Mnesia transaction failed: ~p", [Reason]),
{error, Reason}
end.
activate_stream(SHA1, Initiator, MaxConnections, _Node) ->
case gen_server:call(?MODULE,
{activate_stream, SHA1, Initiator, MaxConnections}) of
{atomic, {ok, IPid, TPid}} ->
{ok, IPid, TPid};
{atomic, {limit, IPid, TPid}} ->
{error, {limit, IPid, TPid}};
{atomic, conflict} ->
{error, conflict};
{atomic, notfound} ->
{error, notfound};
Err ->
{error, Err}
end.
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([]) ->
ejabberd_mnesia:create(?MODULE, bytestream,
[{ram_copies, [node()]},
{attributes, record_info(fields, bytestream)}]),
mnesia:add_table_copy(bytestream, node(), ram_copies),
{ok, #state{}}.
handle_call({activate_stream, SHA1, Initiator, MaxConnections}, _From, State) ->
F = fun () ->
case mnesia:read(bytestream, SHA1, write) of
[#bytestream{target = TPid, initiator = IPid} =
ByteStream] when is_pid(TPid), is_pid(IPid) ->
ActiveFlag = ByteStream#bytestream.active,
if ActiveFlag == false ->
ConnsPerJID = mnesia:select(
bytestream,
[{#bytestream{sha1 = '$1',
jid_i = Initiator,
_ = '_'},
[], ['$1']}]),
if length(ConnsPerJID) < MaxConnections ->
mnesia:write(
ByteStream#bytestream{active = true,
jid_i = Initiator}),
{ok, IPid, TPid};
true ->
{limit, IPid, TPid}
end;
true ->
conflict
end;
_ ->
notfound
end
end,
Reply = mnesia:transaction(F),
{reply, Reply, State};
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -175,31 +175,39 @@ process_bytestreams(#iq{type = set, lang = Lang, from = InitiatorJID, to = To,
all),
case acl:match_rule(ServerHost, ACL, InitiatorJID) of
allow ->
Node = ejabberd_cluster:get_node_by_id(To#jid.lresource),
Target = jid:to_string(jid:tolower(TargetJID)),
Initiator = jid:to_string(jid:tolower(InitiatorJID)),
SHA1 = p1_sha:sha(<<SID/binary, Initiator/binary, Target/binary>>),
case mod_proxy65_sm:activate_stream(SHA1, InitiatorJID,
TargetJID, ServerHost) of
ok ->
Mod = gen_mod:ram_db_mod(global, mod_proxy65),
MaxConnections = max_connections(ServerHost),
case Mod:activate_stream(SHA1, Initiator, MaxConnections, Node) of
{ok, InitiatorPid, TargetPid} ->
mod_proxy65_stream:activate(
{InitiatorPid, InitiatorJID}, {TargetPid, TargetJID}),
xmpp:make_iq_result(IQ);
false ->
{error, notfound} ->
Txt = <<"Failed to activate bytestream">>,
xmpp:make_error(IQ, xmpp:err_item_not_found(Txt, Lang));
limit ->
{error, {limit, InitiatorPid, TargetPid}} ->
mod_proxy65_stream:stop(InitiatorPid),
mod_proxy65_stream:stop(TargetPid),
Txt = <<"Too many active bytestreams">>,
xmpp:make_error(IQ, xmpp:err_resource_constraint(Txt, Lang));
conflict ->
{error, conflict} ->
Txt = <<"Bytestream already activated">>,
xmpp:make_error(IQ, xmpp:err_conflict(Txt, Lang));
Err ->
{error, Err} ->
?ERROR_MSG("failed to activate bytestream from ~s to ~s: ~p",
[Initiator, Target, Err]),
xmpp:make_error(IQ, xmpp:err_internal_server_error())
Txt = <<"Database failure">>,
xmpp:make_error(IQ, xmpp:err_internal_server_error(Txt, Lang))
end;
deny ->
Txt = <<"Denied by ACL">>,
xmpp:make_error(IQ, xmpp:err_forbidden(Txt, Lang))
end.
%%%-------------------------
%%% Auxiliary functions.
%%%-------------------------
@ -219,7 +227,8 @@ get_streamhost(Host, ServerHost) ->
HostName = gen_mod:get_module_opt(ServerHost, mod_proxy65, hostname,
fun iolist_to_binary/1,
jlib:ip_to_list(IP)),
#streamhost{jid = jid:make(Host),
Resource = ejabberd_cluster:node_id(),
#streamhost{jid = jid:make(<<"">>, Host, Resource),
host = HostName,
port = Port}.
@ -246,3 +255,9 @@ get_my_ip() ->
{ok, Addr} -> Addr;
{error, _} -> {127, 0, 0, 1}
end.
max_connections(ServerHost) ->
gen_mod:get_module_opt(ServerHost, mod_proxy65, max_connections,
fun(I) when is_integer(I), I>0 -> I;
(infinity) -> infinity
end, infinity).

View File

@ -1,171 +0,0 @@
%%%----------------------------------------------------------------------
%%% File : mod_proxy65_sm.erl
%%% Author : Evgeniy Khramtsov <xram@jabber.ru>
%%% Purpose : Bytestreams manager.
%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2016 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%----------------------------------------------------------------------
-module(mod_proxy65_sm).
-author('xram@jabber.ru').
-behaviour(gen_server).
%% gen_server callbacks.
-export([init/1, handle_info/2, handle_call/3,
handle_cast/2, terminate/2, code_change/3]).
-export([start_link/2, register_stream/1,
unregister_stream/1, activate_stream/4]).
-record(state, {max_connections = infinity :: non_neg_integer() | infinity}).
-record(bytestream,
{sha1 = <<"">> :: binary() | '$1',
target :: pid() | '_',
initiator :: pid() | '_',
active = false :: boolean() | '_',
jid_i = {<<"">>, <<"">>, <<"">>} :: jid:ljid() | '_'}).
-define(PROCNAME, ejabberd_mod_proxy65_sm).
%% Unused callbacks.
handle_cast(_Request, State) -> {noreply, State}.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
handle_info(_Info, State) -> {noreply, State}.
%%----------------
start_link(Host, Opts) ->
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
gen_server:start_link({local, Proc}, ?MODULE, [Opts],
[]).
init([Opts]) ->
ejabberd_mnesia:create(?MODULE, bytestream,
[{ram_copies, [node()]},
{attributes, record_info(fields, bytestream)}]),
mnesia:add_table_copy(bytestream, node(), ram_copies),
MaxConnections = gen_mod:get_opt(max_connections, Opts,
fun(I) when is_integer(I), I>0 ->
I;
(infinity) ->
infinity
end, infinity),
{ok, #state{max_connections = MaxConnections}}.
terminate(_Reason, _State) -> ok.
handle_call({activate, SHA1, IJid}, _From, State) ->
MaxConns = State#state.max_connections,
F = fun () ->
case mnesia:read(bytestream, SHA1, write) of
[#bytestream{target = TPid, initiator = IPid} =
ByteStream]
when is_pid(TPid), is_pid(IPid) ->
ActiveFlag = ByteStream#bytestream.active,
if ActiveFlag == false ->
ConnsPerJID = mnesia:select(bytestream,
[{#bytestream{sha1 =
'$1',
jid_i =
IJid,
_ = '_'},
[], ['$1']}]),
if length(ConnsPerJID) < MaxConns ->
mnesia:write(ByteStream#bytestream{active =
true,
jid_i =
IJid}),
{ok, IPid, TPid};
true -> {limit, IPid, TPid}
end;
true -> conflict
end;
_ -> false
end
end,
Reply = mnesia:transaction(F),
{reply, Reply, State};
handle_call(_Request, _From, State) ->
{reply, ok, State}.
%%%----------------------
%%% API.
%%%----------------------
%%%---------------------------------------------------
%%% register_stream(SHA1) -> {atomic, ok} |
%%% {atomic, error} |
%%% transaction abort
%%% SHA1 = string()
%%%---------------------------------------------------
register_stream(SHA1) when is_binary(SHA1) ->
StreamPid = self(),
F = fun () ->
case mnesia:read(bytestream, SHA1, write) of
[] ->
mnesia:write(#bytestream{sha1 = SHA1,
target = StreamPid});
[#bytestream{target = Pid, initiator = undefined} =
ByteStream]
when is_pid(Pid), Pid /= StreamPid ->
mnesia:write(ByteStream#bytestream{initiator =
StreamPid});
_ -> error
end
end,
mnesia:transaction(F).
%%%----------------------------------------------------
%%% unregister_stream(SHA1) -> ok | transaction abort
%%% SHA1 = string()
%%%----------------------------------------------------
unregister_stream(SHA1) when is_binary(SHA1) ->
F = fun () -> mnesia:delete({bytestream, SHA1}) end,
mnesia:transaction(F).
%%%--------------------------------------------------------
%%% activate_stream(SHA1, IJid, TJid, Host) -> ok |
%%% false |
%%% limit |
%%% conflict |
%%% error
%%% SHA1 = string()
%%% IJid = TJid = jid()
%%% Host = string()
%%%--------------------------------------------------------
activate_stream(SHA1, IJid, TJid, Host)
when is_binary(SHA1) ->
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
case catch gen_server:call(Proc, {activate, SHA1, IJid})
of
{atomic, {ok, IPid, TPid}} ->
mod_proxy65_stream:activate({IPid, IJid}, {TPid, TJid});
{atomic, {limit, IPid, TPid}} ->
mod_proxy65_stream:stop(IPid),
mod_proxy65_stream:stop(TPid),
limit;
{atomic, conflict} -> conflict;
{atomic, false} -> false;
_ -> error
end.

View File

@ -99,7 +99,8 @@ init([Socket, Host, Opts]) ->
socket = Socket, shaper = Shaper, timer = TRef}}.
terminate(_Reason, StateName, #state{sha1 = SHA1}) ->
catch mod_proxy65_sm:unregister_stream(SHA1),
Mod = gen_mod:ram_db_mod(global, mod_proxy65),
Mod:unregister_stream(SHA1),
if StateName == stream_established ->
?INFO_MSG("Bytestream terminated", []);
true -> ok
@ -168,8 +169,9 @@ wait_for_request(Packet,
Request = mod_proxy65_lib:unpack_request(Packet),
case Request of
#s5_request{sha1 = SHA1, cmd = connect} ->
case catch mod_proxy65_sm:register_stream(SHA1) of
{atomic, ok} ->
Mod = gen_mod:ram_db_mod(global, mod_proxy65),
case Mod:register_stream(SHA1, self()) of
ok ->
inet:setopts(Socket, [{active, false}]),
gen_tcp:send(Socket,
mod_proxy65_lib:make_reply(Request)),