xmpp.chapril.org-ejabberd/src/mod_proxy65/mod_proxy65_sm.erl

182 lines
5.3 KiB
Erlang

%%%----------------------------------------------------------------------
%%% File : mod_proxy65_sm.erl
%%% Author : Evgeniy Khramtsov <xram@jabber.ru>
%%% Purpose : Bytestreams manager.
%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2009 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License
%%% along with this program; if not, write to the Free Software
%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
%%% 02111-1307 USA
%%%
%%%----------------------------------------------------------------------
-module(mod_proxy65_sm).
-author('xram@jabber.ru').
-behaviour(gen_server).
%% gen_server callbacks.
-export([init/1,
handle_info/2,
handle_call/3,
handle_cast/2,
terminate/2,
code_change/3
]).
%% API.
-export([
start_link/2,
register_stream/1,
unregister_stream/1,
activate_stream/4
]).
-record(state, {max_connections}).
-record(bytestream, {
sha1, %% SHA1 key
target, %% Target Pid
initiator, %% Initiator Pid
active = false, %% Activity flag
jid_i %% Initiator's JID
}).
-define(PROCNAME, ejabberd_mod_proxy65_sm).
%% Unused callbacks.
handle_cast(_Request, State) ->
{noreply, State}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
handle_info(_Info, State) ->
{noreply, State}.
%%----------------
start_link(Host, Opts) ->
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
gen_server:start_link({local, Proc}, ?MODULE, [Opts], []).
init([Opts]) ->
mnesia:create_table(bytestream, [{ram_copies, [node()]},
{attributes, record_info(fields, bytestream)}]),
mnesia:add_table_copy(bytestream, node(), ram_copies),
MaxConnections = gen_mod:get_opt(max_connections, Opts, infinity),
{ok, #state{max_connections=MaxConnections}}.
terminate(_Reason, _State) ->
ok.
handle_call({activate, SHA1, IJid}, _From, State) ->
MaxConns = State#state.max_connections,
F = fun() ->
case mnesia:read(bytestream, SHA1, write) of
[#bytestream{target = TPid, initiator = IPid} = ByteStream]
when is_pid(TPid), is_pid(IPid) ->
ActiveFlag = ByteStream#bytestream.active,
if
ActiveFlag == false ->
ConnsPerJID =
mnesia:select(bytestream,
[{#bytestream{sha1 = '$1',
jid_i = IJid,
_='_'},
[],
['$1']}]),
if
length(ConnsPerJID) < MaxConns ->
mnesia:write(
ByteStream#bytestream{active = true,
jid_i = IJid}),
{ok, IPid, TPid};
true ->
{limit, IPid, TPid}
end;
true ->
conflict
end;
_ ->
false
end
end,
Reply = mnesia:transaction(F),
{reply, Reply, State};
handle_call(_Request, _From, State) ->
{reply, ok, State}.
%%%----------------------
%%% API.
%%%----------------------
%%%---------------------------------------------------
%%% register_stream(SHA1) -> {atomic, ok} |
%%% {atomic, error} |
%%% transaction abort
%%% SHA1 = string()
%%%---------------------------------------------------
register_stream(SHA1) when is_list(SHA1) ->
StreamPid = self(),
F = fun() ->
case mnesia:read(bytestream, SHA1, write) of
[] ->
mnesia:write(#bytestream{sha1 = SHA1,
target = StreamPid});
[#bytestream{target = Pid,
initiator = undefined} = ByteStream]
when is_pid(Pid), Pid /= StreamPid ->
mnesia:write(
ByteStream#bytestream{initiator = StreamPid});
_ ->
error
end
end,
mnesia:transaction(F).
%%%----------------------------------------------------
%%% unregister_stream(SHA1) -> ok | transaction abort
%%% SHA1 = string()
%%%----------------------------------------------------
unregister_stream(SHA1) when is_list(SHA1) ->
F = fun() -> mnesia:delete({bytestream, SHA1}) end,
mnesia:transaction(F).
%%%--------------------------------------------------------
%%% activate_stream(SHA1, IJid, TJid, Host) -> ok |
%%% false |
%%% limit |
%%% conflict |
%%% error
%%% SHA1 = string()
%%% IJid = TJid = jid()
%%% Host = string()
%%%--------------------------------------------------------
activate_stream(SHA1, IJid, TJid, Host) when is_list(SHA1) ->
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
case catch gen_server:call(Proc, {activate, SHA1, IJid}) of
{atomic, {ok, IPid, TPid}} ->
mod_proxy65_stream:activate({IPid, IJid}, {TPid, TJid});
{atomic, {limit, IPid, TPid}} ->
mod_proxy65_stream:stop(IPid),
mod_proxy65_stream:stop(TPid),
limit;
{atomic, conflict} ->
conflict;
{atomic, false} ->
false;
_ ->
error
end.