mirror of
https://github.com/processone/ejabberd.git
synced 2024-09-15 13:51:17 +02:00
182 lines
5.3 KiB
Erlang
182 lines
5.3 KiB
Erlang
%%%----------------------------------------------------------------------
|
|
%%% File : mod_proxy65_sm.erl
|
|
%%% Author : Evgeniy Khramtsov <xram@jabber.ru>
|
|
%%% Purpose : Bytestreams manager.
|
|
%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
|
|
%%%
|
|
%%%
|
|
%%% ejabberd, Copyright (C) 2002-2011 ProcessOne
|
|
%%%
|
|
%%% This program is free software; you can redistribute it and/or
|
|
%%% modify it under the terms of the GNU General Public License as
|
|
%%% published by the Free Software Foundation; either version 2 of the
|
|
%%% License, or (at your option) any later version.
|
|
%%%
|
|
%%% This program is distributed in the hope that it will be useful,
|
|
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
%%% General Public License for more details.
|
|
%%%
|
|
%%% You should have received a copy of the GNU General Public License
|
|
%%% along with this program; if not, write to the Free Software
|
|
%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
|
|
%%% 02111-1307 USA
|
|
%%%
|
|
%%%----------------------------------------------------------------------
|
|
|
|
-module(mod_proxy65_sm).
|
|
-author('xram@jabber.ru').
|
|
|
|
-behaviour(gen_server).
|
|
|
|
%% gen_server callbacks.
|
|
-export([init/1,
|
|
handle_info/2,
|
|
handle_call/3,
|
|
handle_cast/2,
|
|
terminate/2,
|
|
code_change/3
|
|
]).
|
|
|
|
%% API.
|
|
-export([
|
|
start_link/2,
|
|
register_stream/1,
|
|
unregister_stream/1,
|
|
activate_stream/4
|
|
]).
|
|
|
|
%% gen_server state: holds the limit that init/1 reads from the
%% max_connections option (the atom infinity when the option is absent).
-record(state, {max_connections}).

%% One row of the bytestream mnesia table, keyed by sha1.
-record(bytestream, {
          %% SHA1 key identifying the stream
          sha1,
          %% Pid of the target side of the stream
          target,
          %% Pid of the initiator side of the stream
          initiator,
          %% Activity flag; a freshly created record is inactive
          active = false,
          %% JID of the initiator
          jid_i
         }).

%% Base name passed to gen_mod:get_module_proc/2 to derive the
%% registered name of this server for a given host.
-define(PROCNAME, ejabberd_mod_proxy65_sm).
|
|
|
|
%% Unused callbacks.
|
|
%% gen_server callback: this server defines no casts, so every cast is
%% dropped and the state is returned unchanged.
handle_cast(_Msg, ServerState) -> {noreply, ServerState}.
|
|
%% gen_server callback: no state conversion is performed on code
%% upgrade; the current state is kept as is.
code_change(_FromVsn, ServerState, _Extra) -> {ok, ServerState}.
|
|
%% gen_server callback: any out-of-band message is discarded and the
%% state is returned unchanged.
handle_info(_Msg, ServerState) -> {noreply, ServerState}.
|
|
%%----------------
|
|
|
|
%% Starts the stream manager for Host, linked to the caller and locally
%% registered under the name gen_mod:get_module_proc/2 derives from
%% Host and ?PROCNAME. Opts is handed to init/1 wrapped in a list.
%% Returns whatever gen_server:start_link/4 returns.
start_link(Host, Opts) ->
    ServerName = {local, gen_mod:get_module_proc(Host, ?PROCNAME)},
    gen_server:start_link(ServerName, ?MODULE, [Opts], []).
|
|
|
|
%% gen_server callback: prepares the bytestream table and the state.
%%
%% The table is created as a RAM-only copy on this node and a RAM copy
%% is then requested for this node as well. The results of both mnesia
%% calls are ignored, so a failure of either does not prevent the
%% server from starting.
%% The max_connections option is read from Opts; it defaults to the
%% atom infinity.
init([Opts]) ->
    TableOpts = [{ram_copies, [node()]},
                 {attributes, record_info(fields, bytestream)}],
    mnesia:create_table(bytestream, TableOpts),
    mnesia:add_table_copy(bytestream, node(), ram_copies),
    {ok, #state{max_connections =
                    gen_mod:get_opt(max_connections, Opts, infinity)}}.
|
|
|
|
%% gen_server callback: nothing to clean up on shutdown.
terminate(_Why, _ServerState) -> ok.
|
|
|
|
%% gen_server callback for synchronous requests.
%%
%% {activate, SHA1, IJid}
%%   Runs one mnesia transaction that reads the bytestream stored under
%%   SHA1 with a write lock and replies with the raw transaction result:
%%     {atomic, {ok, IPid, TPid}}    - the stream was inactive and fewer
%%                                     than max_connections streams are
%%                                     recorded for IJid; the record is
%%                                     now active and tagged with IJid
%%     {atomic, {limit, IPid, TPid}} - the per-initiator limit is reached;
%%                                     the record is left untouched
%%     {atomic, conflict}            - the stream is already active
%%     {atomic, false}               - no such record, or its target and
%%                                     initiator are not both pids yet
%%     {aborted, Reason}             - the transaction did not commit
%%
%% Any other request is answered with ok.
handle_call({activate, SHA1, IJid}, _From, State) ->
    Limit = State#state.max_connections,
    Activate =
        fun() ->
                case mnesia:read(bytestream, SHA1, write) of
                    [#bytestream{target = TPid,
                                 initiator = IPid,
                                 active = false} = Stream]
                      when is_pid(TPid), is_pid(IPid) ->
                        %% Keys of all streams already attributed to
                        %% this initiator JID.
                        OwnStreams =
                            mnesia:select(
                              bytestream,
                              [{#bytestream{sha1 = '$1',
                                            jid_i = IJid,
                                            _ = '_'},
                                [],
                                ['$1']}]),
                        %% Limit may be a non-integer such as the atom
                        %% infinity; in Erlang term order every number
                        %% sorts below every atom, so the comparison is
                        %% then always true.
                        case length(OwnStreams) < Limit of
                            true ->
                                mnesia:write(
                                  Stream#bytestream{active = true,
                                                    jid_i = IJid}),
                                {ok, IPid, TPid};
                            false ->
                                {limit, IPid, TPid}
                        end;
                    [#bytestream{target = TPid, initiator = IPid}]
                      when is_pid(TPid), is_pid(IPid) ->
                        %% Both ends are registered but the stream is
                        %% not inactive any more.
                        conflict;
                    _ ->
                        false
                end
        end,
    {reply, mnesia:transaction(Activate), State};
handle_call(_Request, _From, State) ->
    {reply, ok, State}.
|
|
|
|
%%%----------------------
|
|
%%% API.
|
|
%%%----------------------
|
|
%%%---------------------------------------------------
%%% register_stream(SHA1) -> {atomic, ok} |
%%%                          {atomic, error} |
%%%                          {aborted, Reason}
%%% SHA1 = string()
%%%
%%% Registers the calling process for the stream SHA1.
%%% The first caller for a key becomes the target; a
%%% second, different caller becomes the initiator as
%%% long as none is set yet. Every other situation
%%% (same process registering twice, initiator already
%%% present, ...) yields {atomic, error} and leaves the
%%% table unchanged.
%%%---------------------------------------------------
register_stream(SHA1) when is_list(SHA1) ->
    Caller = self(),
    Register =
        fun() ->
                case mnesia:read(bytestream, SHA1, write) of
                    [] ->
                        mnesia:write(#bytestream{sha1 = SHA1,
                                                 target = Caller});
                    [#bytestream{target = Target,
                                 initiator = undefined} = Stream]
                      when is_pid(Target), Target =/= Caller ->
                        mnesia:write(Stream#bytestream{initiator = Caller});
                    _ ->
                        error
                end
        end,
    mnesia:transaction(Register).
|
|
|
|
%%%----------------------------------------------------
%%% unregister_stream(SHA1) -> {atomic, ok} |
%%%                            {aborted, Reason}
%%% SHA1 = string()
%%%
%%% Deletes the bytestream record stored under SHA1 in
%%% a mnesia transaction and returns the transaction
%%% result as is.
%%%----------------------------------------------------
unregister_stream(SHA1) when is_list(SHA1) ->
    mnesia:transaction(fun() -> mnesia:delete(bytestream, SHA1, write) end).
|
|
|
|
%%%--------------------------------------------------------
%%% activate_stream(SHA1, IJid, TJid, Host) -> ok |
%%%                                            false |
%%%                                            limit |
%%%                                            conflict |
%%%                                            error
%%% SHA1 = string()
%%% IJid = TJid = jid()
%%% Host = string()
%%%
%%% Asks the stream manager of Host to activate the stream
%%% SHA1 on behalf of the initiator IJid.
%%%   - on success the result of
%%%     mod_proxy65_stream:activate/2 is returned
%%%     (presumably ok, as listed above - confirm against
%%%     that module)
%%%   - limit:    the initiator reached its connection limit;
%%%               both stream processes are stopped
%%%   - conflict: the stream is already active
%%%   - false:    the stream is unknown or not fully
%%%               registered
%%%   - error:    the transaction was aborted, the reply was
%%%               unexpected, or the call itself failed
%%%               (e.g. manager not running or call timeout)
%%%--------------------------------------------------------
activate_stream(SHA1, IJid, TJid, Host) when is_list(SHA1) ->
    Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
    %% try ... of protects only the gen_server:call itself; failures
    %% raised while handling the reply below still propagate to the
    %% caller, exactly as before.
    try gen_server:call(Proc, {activate, SHA1, IJid}) of
        {atomic, {ok, IPid, TPid}} ->
            mod_proxy65_stream:activate({IPid, IJid}, {TPid, TJid});
        {atomic, {limit, IPid, TPid}} ->
            mod_proxy65_stream:stop(IPid),
            mod_proxy65_stream:stop(TPid),
            limit;
        {atomic, conflict} ->
            conflict;
        {atomic, false} ->
            false;
        _ ->
            error
    catch
        %% Deliberately broad: any failure of the call is reported to
        %% the caller as the documented 'error' result.
        _:_ ->
            error
    end.
|