mirror of
https://github.com/processone/ejabberd.git
synced 2024-12-22 17:28:25 +01:00
174 lines
5.7 KiB
Erlang
174 lines
5.7 KiB
Erlang
%%%----------------------------------------------------------------------
|
|
%%% File : mod_proxy65_sm.erl
|
|
%%% Author : Evgeniy Khramtsov <xram@jabber.ru>
|
|
%%% Purpose : Bytestreams manager.
|
|
%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
|
|
%%%
|
|
%%%
|
|
%%% ejabberd, Copyright (C) 2002-2016 ProcessOne
|
|
%%%
|
|
%%% This program is free software; you can redistribute it and/or
|
|
%%% modify it under the terms of the GNU General Public License as
|
|
%%% published by the Free Software Foundation; either version 2 of the
|
|
%%% License, or (at your option) any later version.
|
|
%%%
|
|
%%% This program is distributed in the hope that it will be useful,
|
|
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
%%% General Public License for more details.
|
|
%%%
|
|
%%% You should have received a copy of the GNU General Public License along
|
|
%%% with this program; if not, write to the Free Software Foundation, Inc.,
|
|
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
%%%
|
|
%%%----------------------------------------------------------------------
|
|
|
|
-module(mod_proxy65_sm).
|
|
|
|
-author('xram@jabber.ru').
|
|
|
|
-behaviour(gen_server).
|
|
|
|
%% gen_server callbacks.
|
|
-export([init/1, handle_info/2, handle_call/3,
|
|
handle_cast/2, terminate/2, code_change/3]).
|
|
|
|
-export([start_link/2, register_stream/1,
|
|
unregister_stream/1, activate_stream/4]).
|
|
|
|
-record(state, {max_connections = infinity :: non_neg_integer() | infinity}).
|
|
|
|
-include("jlib.hrl").
|
|
|
|
-record(bytestream,
|
|
{sha1 = <<"">> :: binary() | '$1',
|
|
target :: pid() | '_',
|
|
initiator :: pid() | '_',
|
|
active = false :: boolean() | '_',
|
|
jid_i = {<<"">>, <<"">>, <<"">>} :: ljid() | '_'}).
|
|
|
|
-define(PROCNAME, ejabberd_mod_proxy65_sm).
|
|
|
|
%% Unused callbacks.
|
|
handle_cast(_Request, State) -> {noreply, State}.
|
|
|
|
code_change(_OldVsn, State, _Extra) -> {ok, State}.
|
|
|
|
handle_info(_Info, State) -> {noreply, State}.
|
|
|
|
%%----------------
|
|
|
|
start_link(Host, Opts) ->
|
|
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
|
|
gen_server:start_link({local, Proc}, ?MODULE, [Opts],
|
|
[]).
|
|
|
|
init([Opts]) ->
|
|
mnesia:create_table(bytestream,
|
|
[{ram_copies, [node()]},
|
|
{attributes, record_info(fields, bytestream)}]),
|
|
mnesia:add_table_copy(bytestream, node(), ram_copies),
|
|
MaxConnections = gen_mod:get_opt(max_connections, Opts,
|
|
fun(I) when is_integer(I), I>0 ->
|
|
I;
|
|
(infinity) ->
|
|
infinity
|
|
end, infinity),
|
|
{ok, #state{max_connections = MaxConnections}}.
|
|
|
|
terminate(_Reason, _State) -> ok.
|
|
|
|
handle_call({activate, SHA1, IJid}, _From, State) ->
|
|
MaxConns = State#state.max_connections,
|
|
F = fun () ->
|
|
case mnesia:read(bytestream, SHA1, write) of
|
|
[#bytestream{target = TPid, initiator = IPid} =
|
|
ByteStream]
|
|
when is_pid(TPid), is_pid(IPid) ->
|
|
ActiveFlag = ByteStream#bytestream.active,
|
|
if ActiveFlag == false ->
|
|
ConnsPerJID = mnesia:select(bytestream,
|
|
[{#bytestream{sha1 =
|
|
'$1',
|
|
jid_i =
|
|
IJid,
|
|
_ = '_'},
|
|
[], ['$1']}]),
|
|
if length(ConnsPerJID) < MaxConns ->
|
|
mnesia:write(ByteStream#bytestream{active =
|
|
true,
|
|
jid_i =
|
|
IJid}),
|
|
{ok, IPid, TPid};
|
|
true -> {limit, IPid, TPid}
|
|
end;
|
|
true -> conflict
|
|
end;
|
|
_ -> false
|
|
end
|
|
end,
|
|
Reply = mnesia:transaction(F),
|
|
{reply, Reply, State};
|
|
handle_call(_Request, _From, State) ->
|
|
{reply, ok, State}.
|
|
|
|
%%%----------------------
|
|
%%% API.
|
|
%%%----------------------
|
|
%%%---------------------------------------------------
|
|
%%% register_stream(SHA1) -> {atomic, ok} |
|
|
%%% {atomic, error} |
|
|
%%% transaction abort
|
|
%%% SHA1 = string()
|
|
%%%---------------------------------------------------
|
|
register_stream(SHA1) when is_binary(SHA1) ->
|
|
StreamPid = self(),
|
|
F = fun () ->
|
|
case mnesia:read(bytestream, SHA1, write) of
|
|
[] ->
|
|
mnesia:write(#bytestream{sha1 = SHA1,
|
|
target = StreamPid});
|
|
[#bytestream{target = Pid, initiator = undefined} =
|
|
ByteStream]
|
|
when is_pid(Pid), Pid /= StreamPid ->
|
|
mnesia:write(ByteStream#bytestream{initiator =
|
|
StreamPid});
|
|
_ -> error
|
|
end
|
|
end,
|
|
mnesia:transaction(F).
|
|
|
|
%%%----------------------------------------------------
|
|
%%% unregister_stream(SHA1) -> ok | transaction abort
|
|
%%% SHA1 = string()
|
|
%%%----------------------------------------------------
|
|
unregister_stream(SHA1) when is_binary(SHA1) ->
|
|
F = fun () -> mnesia:delete({bytestream, SHA1}) end,
|
|
mnesia:transaction(F).
|
|
|
|
%%%--------------------------------------------------------
|
|
%%% activate_stream(SHA1, IJid, TJid, Host) -> ok |
|
|
%%% false |
|
|
%%% limit |
|
|
%%% conflict |
|
|
%%% error
|
|
%%% SHA1 = string()
|
|
%%% IJid = TJid = jid()
|
|
%%% Host = string()
|
|
%%%--------------------------------------------------------
|
|
activate_stream(SHA1, IJid, TJid, Host)
|
|
when is_binary(SHA1) ->
|
|
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
|
|
case catch gen_server:call(Proc, {activate, SHA1, IJid})
|
|
of
|
|
{atomic, {ok, IPid, TPid}} ->
|
|
mod_proxy65_stream:activate({IPid, IJid}, {TPid, TJid});
|
|
{atomic, {limit, IPid, TPid}} ->
|
|
mod_proxy65_stream:stop(IPid),
|
|
mod_proxy65_stream:stop(TPid),
|
|
limit;
|
|
{atomic, conflict} -> conflict;
|
|
{atomic, false} -> false;
|
|
_ -> error
|
|
end.
|