xmpp.chapril.org-ejabberd/src/mod_proxy65_sm.erl

175 lines
5.7 KiB
Erlang

%%%----------------------------------------------------------------------
%%% File : mod_proxy65_sm.erl
%%% Author : Evgeniy Khramtsov <xram@jabber.ru>
%%% Purpose : Bytestreams manager.
%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2013 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License
%%% along with this program; if not, write to the Free Software
%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
%%% 02111-1307 USA
%%%
%%%----------------------------------------------------------------------
-module(mod_proxy65_sm).
-author('xram@jabber.ru').
-behaviour(gen_server).
%% gen_server callbacks.
-export([init/1, handle_info/2, handle_call/3,
handle_cast/2, terminate/2, code_change/3]).
%% API.
-export([start_link/2, register_stream/1,
unregister_stream/1, activate_stream/4]).
-record(state, {max_connections = infinity :: non_neg_integer() | infinity}).
-include("jlib.hrl").
-record(bytestream,
{sha1 = <<"">> :: binary() | '$1',
target :: pid() | '_',
initiator :: pid() | '_',
active = false :: boolean() | '_',
jid_i = {<<"">>, <<"">>, <<"">>} :: ljid() | '_'}).
-define(PROCNAME, ejabberd_mod_proxy65_sm).
%% Unused callbacks.
handle_cast(_Request, State) -> {noreply, State}.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
handle_info(_Info, State) -> {noreply, State}.
%%----------------
start_link(Host, Opts) ->
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
gen_server:start_link({local, Proc}, ?MODULE, [Opts],
[]).
init([Opts]) ->
mnesia:create_table(bytestream, [{ram_copies, [node()]},
{attributes, record_info(fields, bytestream)}]),
mnesia:add_table_copy(bytestream, node(), ram_copies),
MaxConnections = gen_mod:get_opt(max_connections, Opts,
fun(I) when is_integer(I), I>0 ->
I;
(infinity) ->
infinity
end, infinity),
{ok, #state{max_connections = MaxConnections}}.
terminate(_Reason, _State) -> ok.
handle_call({activate, SHA1, IJid}, _From, State) ->
MaxConns = State#state.max_connections,
F = fun () ->
case mnesia:read(bytestream, SHA1, write) of
[#bytestream{target = TPid, initiator = IPid} =
ByteStream]
when is_pid(TPid), is_pid(IPid) ->
ActiveFlag = ByteStream#bytestream.active,
if ActiveFlag == false ->
ConnsPerJID = mnesia:select(bytestream,
[{#bytestream{sha1 =
'$1',
jid_i =
IJid,
_ = '_'},
[], ['$1']}]),
if length(ConnsPerJID) < MaxConns ->
mnesia:write(ByteStream#bytestream{active =
true,
jid_i =
IJid}),
{ok, IPid, TPid};
true -> {limit, IPid, TPid}
end;
true -> conflict
end;
_ -> false
end
end,
Reply = mnesia:transaction(F),
{reply, Reply, State};
handle_call(_Request, _From, State) ->
{reply, ok, State}.
%%%----------------------
%%% API.
%%%----------------------
%%%---------------------------------------------------
%%% register_stream(SHA1) -> {atomic, ok} |
%%% {atomic, error} |
%%% transaction abort
%%% SHA1 = string()
%%%---------------------------------------------------
register_stream(SHA1) when is_binary(SHA1) ->
StreamPid = self(),
F = fun () ->
case mnesia:read(bytestream, SHA1, write) of
[] ->
mnesia:write(#bytestream{sha1 = SHA1,
target = StreamPid});
[#bytestream{target = Pid, initiator = undefined} =
ByteStream]
when is_pid(Pid), Pid /= StreamPid ->
mnesia:write(ByteStream#bytestream{initiator =
StreamPid});
_ -> error
end
end,
mnesia:transaction(F).
%%%----------------------------------------------------
%%% unregister_stream(SHA1) -> ok | transaction abort
%%% SHA1 = string()
%%%----------------------------------------------------
unregister_stream(SHA1) when is_binary(SHA1) ->
F = fun () -> mnesia:delete({bytestream, SHA1}) end,
mnesia:transaction(F).
%%%--------------------------------------------------------
%%% activate_stream(SHA1, IJid, TJid, Host) -> ok |
%%% false |
%%% limit |
%%% conflict |
%%% error
%%% SHA1 = string()
%%% IJid = TJid = jid()
%%% Host = string()
%%%--------------------------------------------------------
activate_stream(SHA1, IJid, TJid, Host)
when is_binary(SHA1) ->
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
case catch gen_server:call(Proc, {activate, SHA1, IJid})
of
{atomic, {ok, IPid, TPid}} ->
mod_proxy65_stream:activate({IPid, IJid}, {TPid, TJid});
{atomic, {limit, IPid, TPid}} ->
mod_proxy65_stream:stop(IPid),
mod_proxy65_stream:stop(TPid),
limit;
{atomic, conflict} -> conflict;
{atomic, false} -> false;
_ -> error
end.