* src/mod_proxy65/: XEP-0065 proxy (thanks to Evgeniy Khramtsov)

* src/Makefile.win32: Likewise
* src/Makefile.in: Likewise
* src/configure.ac: Likewise
* src/jlib.hrl: Likewise
* src/ejabberd.hrl: Added the ejabberd URL

SVN Revision: 666
This commit is contained in:
Alexey Shchepin 2006-10-28 02:04:55 +00:00
parent 73e2972886
commit 399d170a78
15 changed files with 2600 additions and 2343 deletions

View File

@ -1,3 +1,12 @@
2006-10-28 Alexey Shchepin <alexey@sevcom.net>
* src/mod_proxy65/: XEP-0065 proxy (thanks to Evgeniy Khramtsov)
* src/Makefile.win32: Likewise
* src/Makefile.in: Likewise
* src/configure.ac: Likewise
* src/jlib.hrl: Likewise
* src/ejabberd.hrl: Added the ejabberd URL
2006-10-27 Mickael Remond <mickael.remond@process-one.net>
* src/guide.tex: Fixed typos.

View File

@ -27,7 +27,7 @@ endif
prefix = @prefix@
SUBDIRS = @mod_irc@ @mod_pubsub@ @mod_muc@ @eldap@ @web@ stringprep @tls@ @odbc@ @ejabberd_zlib@
SUBDIRS = @mod_irc@ @mod_pubsub@ @mod_muc@ @mod_proxy65@ @eldap@ @web@ stringprep @tls@ @odbc@ @ejabberd_zlib@
ERLSHLIBS = expat_erl.so
SOURCES = $(wildcard *.erl)
BEAMS = $(SOURCES:.erl=.beam)

View File

@ -59,6 +59,9 @@ release : build release_clean
copy mod_muc\*.erl $(SRC_DIR)\mod_muc
mkdir $(SRC_DIR)\mod_pubsub
copy mod_pubsub\*.erl $(SRC_DIR)\mod_pubsub
mkdir $(SRC_DIR)\mod_proxy65
copy mod_proxy65\*.erl $(SRC_DIR)\mod_proxy65
copy mod_proxy65\*.hrl $(SRC_DIR)\mod_proxy65
mkdir $(SRC_DIR)\stringprep
copy stringprep\*.erl $(SRC_DIR)\stringprep
copy stringprep\*.c $(SRC_DIR)\stringprep
@ -94,6 +97,8 @@ all-recursive :
nmake -nologo -f Makefile.win32
cd ..\mod_pubsub
nmake -nologo -f Makefile.win32
cd ..\mod_proxy65
nmake -nologo -f Makefile.win32
cd ..\stringprep
nmake -nologo -f Makefile.win32
cd ..\tls
@ -133,6 +138,8 @@ clean-recursive :
nmake -nologo -f Makefile.win32 clean
cd ..\mod_pubsub
nmake -nologo -f Makefile.win32 clean
cd ..\mod_proxy65
nmake -nologo -f Makefile.win32 clean
cd ..\stringprep
nmake -nologo -f Makefile.win32 clean
cd ..\tls

4051
src/configure vendored

File diff suppressed because it is too large Load Diff

View File

@ -30,6 +30,7 @@ AC_HEADER_STDC
AC_MOD_ENABLE(mod_pubsub, yes)
AC_MOD_ENABLE(mod_irc, yes)
AC_MOD_ENABLE(mod_muc, yes)
AC_MOD_ENABLE(mod_proxy65, yes)
AC_MOD_ENABLE(eldap, yes)
AC_MOD_ENABLE(web, yes)
AC_MOD_ENABLE(tls, yes)
@ -58,6 +59,7 @@ AC_CONFIG_FILES([Makefile
$make_mod_irc
$make_mod_muc
$make_mod_pubsub
$make_mod_proxy65
$make_eldap
$make_web
stringprep/Makefile

View File

@ -39,3 +39,4 @@
-define(PRIVACY_SUPPORT, true).
-define(EJABBERD_URI, "http://ejabberd.jabber.ru").

View File

@ -34,6 +34,7 @@
-define(NS_PUBSUB_OWNER, "http://jabber.org/protocol/pubsub#owner").
-define(NS_PUBSUB_NMI, "http://jabber.org/protocol/pubsub#node-meta-info").
-define(NS_COMMANDS, "http://jabber.org/protocol/commands").
-define(NS_BYTESTREAMS, "http://jabber.org/protocol/bytestreams").
-define(NS_EJABBERD_CONFIG, "ejabberd:config").

View File

@ -0,0 +1,33 @@
# $Id$
CC = @CC@
CFLAGS = @CFLAGS@ @ERLANG_CFLAGS@
CPPFLAGS = @CPPFLAGS@
LDFLAGS = @LDFLAGS@
LIBS = @LIBS@ @ERLANG_LIBS@
SUBDIRS =
OUTDIR = ..
EFLAGS = -I .. -pz ..
OBJS = \
$(OUTDIR)/mod_proxy65.beam \
$(OUTDIR)/mod_proxy65_service.beam \
$(OUTDIR)/mod_proxy65_sm.beam \
$(OUTDIR)/mod_proxy65_stream.beam \
$(OUTDIR)/mod_proxy65_lib.beam
all: $(OBJS)
$(OUTDIR)/%.beam: %.erl
@ERLC@ -W $(EFLAGS) -o $(OUTDIR) $<
clean:
rm -f $(OBJS)
distclean: clean
rm -f Makefile
TAGS:
etags *.erl

View File

@ -0,0 +1,32 @@
include ..\Makefile.inc
OUTDIR = ..
EFLAGS = -I .. -pz ..
OBJS = \
$(OUTDIR)\mod_proxy65.beam \
$(OUTDIR)\mod_proxy65_service.beam \
$(OUTDIR)\mod_proxy65_sm.beam \
$(OUTDIR)\mod_proxy65_stream.beam \
$(OUTDIR)\mod_proxy65_lib.beam
ALL : $(OBJS)
CLEAN :
-@erase $(OBJS)
$(OUTDIR)\mod_proxy65.beam : mod_proxy65.erl
erlc -W $(EFLAGS) -o $(OUTDIR) mod_proxy65.erl
$(OUTDIR)\mod_proxy65_service.beam : mod_proxy65_service.erl
erlc -W $(EFLAGS) -o $(OUTDIR) mod_proxy65_service.erl
$(OUTDIR)\mod_proxy65_sm.beam : mod_proxy65_sm.erl
erlc -W $(EFLAGS) -o $(OUTDIR) mod_mod_proxy65_sm.erl
$(OUTDIR)\mod_proxy65_stream.beam : mod_proxy65_stream.erl
erlc -W $(EFLAGS) -o $(OUTDIR) mod_proxy65_stream.erl
$(OUTDIR)\mod_proxy65_lib.beam : mod_proxy65_lib.erl
erlc -W $(EFLAGS) -o $(OUTDIR) mod_proxy65_lib.erl

View File

@ -0,0 +1,57 @@
%%%----------------------------------------------------------------------
%%% File : mod_proxy65.erl
%%% Author : Evgeniy Khramtsov <xram@jabber.ru>
%%% Purpose : Main supervisor.
%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
%%% Id : $Id$
%%%----------------------------------------------------------------------
-module(mod_proxy65).
-author('xram@jabber.ru').
-behaviour(gen_mod).
-behaviour(supervisor).
%% gen_mod callbacks.
-export([start/2, stop/1]).
%% supervisor callbacks.
-export([init/1]).
%% API.
-export([start_link/2]).
-define(PROCNAME, ejabberd_mod_proxy65).
start(Host, Opts) ->
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
ChildSpec = {
Proc, {?MODULE, start_link, [Host, Opts]},
transient, infinity, supervisor, [?MODULE]
},
supervisor:start_child(ejabberd_sup, ChildSpec).
stop(Host) ->
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
supervisor:terminate_child(ejabberd_sup, Proc),
supervisor:delete_child(ejabberd_sup, Proc).
start_link(Host, Opts) ->
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
supervisor:start_link({local, Proc}, ?MODULE, [Host, Opts]).
init([Host, Opts]) ->
Service =
{mod_proxy65_service, {mod_proxy65_service, start_link, [Host, Opts]},
transient, 5000, worker, [mod_proxy65_service]},
StreamSupervisor =
{ejabberd_mod_proxy65_sup,
{ejabberd_tmp_sup, start_link,
[gen_mod:get_module_proc(Host, ejabberd_mod_proxy65_sup),
mod_proxy65_stream]},
transient, infinity, supervisor, [ejabberd_tmp_sup]},
StreamManager =
{mod_proxy65_sm, {mod_proxy65_sm, start_link, [Host, Opts]},
transient, 5000, worker, [mod_proxy65_sm]},
{ok, {{one_for_one, 10, 1},
[StreamManager, StreamSupervisor, Service]}}.

View File

@ -0,0 +1,46 @@
%%%----------------------------------------------------------------------
%%% File : mod_proxy65.hrl
%%% Author : Evgeniy Khramtsov <xram@jabber.ru>
%%% Purpose : RFC 1928 constants.
%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
%%% Id : $Id$
%%%----------------------------------------------------------------------
%% Version
-define(VERSION_5, 5).
%% Authentication methods
-define(AUTH_ANONYMOUS, 0).
-define(AUTH_GSSAPI, 1).
-define(AUTH_PLAIN, 2).
-define(AUTH_NO_METHODS, 16#FF).
%% Address Type
-define(ATYP_IPV4, 1).
-define(ATYP_DOMAINNAME, 3).
-define(ATYP_IPV6, 4).
%% Commands
-define(CMD_CONNECT, 1).
-define(CMD_BIND, 2).
-define(CMD_UDP, 3).
%% RFC 1928 replies
-define(SUCCESS, 0).
-define(ERR_GENERAL_FAILURE, 0).
-define(ERR_NOT_ALLOWED, 2).
-define(ERR_NETWORK_UNREACHABLE, 3).
-define(ERR_HOST_UNREACHABLE, 4).
-define(ERR_CONNECTION_REFUSED, 5).
-define(ERR_TTL_EXPIRED, 6).
-define(ERR_COMMAND_NOT_SUPPORTED, 7).
-define(ERR_ADDRESS_TYPE_NOT_SUPPORTED, 8).
%% RFC 1928 defined timeout.
-define(SOCKS5_REPLY_TIMEOUT, 10000).
-record(s5_request, {
rsv = 0,
cmd,
sha1
}).

View File

@ -0,0 +1,69 @@
%%%----------------------------------------------------------------------
%%% File : mod_proxy65_lib.erl
%%% Author : Evgeniy Khramtsov <xram@jabber.ru>
%%% Purpose : SOCKS5 parsing library.
%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
%%% Id : $Id$
%%%----------------------------------------------------------------------
-module(mod_proxy65_lib).
-author('xram@jabber.ru').
-include("mod_proxy65.hrl").
-export([
unpack_init_message/1,
unpack_auth_request/1,
unpack_request/1,
make_init_reply/1,
make_auth_reply/1,
make_reply/0,
make_error_reply/1,
make_error_reply/2
]).
unpack_init_message(<<?VERSION_5, N, AuthMethodList:N/binary>>)
when N > 0, N < 256 ->
{ok, binary_to_list(AuthMethodList)};
unpack_init_message(_) ->
error.
unpack_auth_request(<<1, ULen, User:ULen/binary,
PLen, Pass:PLen/binary>>) when ULen < 256, PLen < 256 ->
{binary_to_list(User), binary_to_list(Pass)};
unpack_auth_request(_) ->
error.
unpack_request(<<?VERSION_5, CMD, RSV,
?ATYP_DOMAINNAME, 40,
SHA1:40/binary, 0, 0>>) when CMD == ?CMD_CONNECT;
CMD == ?CMD_UDP ->
Command = if
CMD == ?CMD_CONNECT -> connect;
CMD == ?CMD_UDP -> udp
end,
#s5_request{cmd = Command, rsv = RSV, sha1 = binary_to_list(SHA1)};
unpack_request(_) ->
error.
make_init_reply(Method) ->
[?VERSION_5, Method].
make_auth_reply(true) -> [1, ?SUCCESS];
make_auth_reply(false) -> [1, ?ERR_NOT_ALLOWED].
%% WARNING: According to SOCKS5 RFC, this reply is _incorrect_, but
%% Psi writes junk to the beginning of the file on correct reply.
%% I'm not sure, but there may be an issue with other clients.
%% Needs more testing.
make_reply() ->
[?VERSION_5, ?SUCCESS, 0, 0, 0, 0].
make_error_reply(Request) ->
make_error_reply(Request, ?ERR_NOT_ALLOWED).
make_error_reply(#s5_request{rsv = RSV, sha1 = SHA1}, Reason) ->
[?VERSION_5, Reason, RSV, ?ATYP_DOMAINNAME, length(SHA1), SHA1, 0,0].

View File

@ -0,0 +1,198 @@
%%%----------------------------------------------------------------------
%%% File : mod_proxy65_service.erl
%%% Author : Evgeniy Khramtsov <xram@jabber.ru>
%%% Purpose : SOCKS5 Bytestreams XMPP service.
%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
%%% Id : $Id$
%%%----------------------------------------------------------------------
-module(mod_proxy65_service).
-author('xram@jabber.ru').
-behaviour(gen_server).
%% gen_server callbacks.
-export([init/1,
handle_info/2,
handle_call/3,
handle_cast/2,
terminate/2,
code_change/3
]).
%% API.
-export([start_link/2]).
-include("../ejabberd.hrl").
-include("../jlib.hrl").
-define(PROCNAME, ejabberd_mod_proxy65_service).
-record(state, {
myhost,
serverhost,
name,
stream_addr,
port,
acl
}).
%% Unused callbacks.
handle_cast(_Request, State) ->
{noreply, State}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
handle_call(_Request, _From, State) ->
{reply, ok, State}.
%%----------------
start_link(Host, Opts) ->
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
gen_server:start_link({local, Proc}, ?MODULE, [Host, Opts], []).
init([Host, Opts]) ->
{IP, State} = parse_options(Host, Opts),
NewOpts = [Host, {ip, IP} | Opts],
ejabberd_listener:add_listener(State#state.port, mod_proxy65_stream, NewOpts),
ejabberd_router:register_route(State#state.myhost),
{ok, State}.
terminate(_Reason, #state{myhost=MyHost, port=Port}) ->
catch ejabberd_listener:delete_listener(Port),
ejabberd_router:unregister_route(MyHost),
ok.
handle_info({route, From, To, {xmlelement, "iq", _, _} = Packet}, State) ->
IQ = jlib:iq_query_info(Packet),
case catch process_iq(From, IQ, State) of
Result when is_record(Result, iq) ->
ejabberd_router:route(To, From, jlib:iq_to_xml(Result));
{'EXIT', Reason} ->
?ERROR_MSG("Error when processing IQ stanza: ~p", [Reason]),
Err = jlib:make_error_reply(Packet, ?ERR_INTERNAL_SERVER_ERROR),
ejabberd_router:route(To, From, Err);
_ ->
ok
end,
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
%%%------------------------
%%% IQ Processing
%%%------------------------
%% disco#info request
process_iq(_, #iq{type = get, xmlns = ?NS_DISCO_INFO} = IQ, #state{name=Name}) ->
IQ#iq{type = result, sub_el =
[{xmlelement, "query", [{"xmlns", ?NS_DISCO_INFO}], iq_disco_info(Name)}]};
%% disco#items request
process_iq(_, #iq{type = get, xmlns = ?NS_DISCO_ITEMS} = IQ, _) ->
IQ#iq{type = result, sub_el =
[{xmlelement, "query", [{"xmlns", ?NS_DISCO_ITEMS}], []}]};
%% vCard request
process_iq(_, #iq{type = get, xmlns = ?NS_VCARD, lang = Lang} = IQ, _) ->
IQ#iq{type = result, sub_el =
[{xmlelement, "vCard", [{"xmlns", ?NS_VCARD}], iq_vcard(Lang)}]};
%% bytestreams info request
process_iq(JID, #iq{type = get, sub_el = SubEl, xmlns = ?NS_BYTESTREAMS} = IQ,
#state{acl = ACL, stream_addr = StreamAddr, serverhost = ServerHost}) ->
case acl:match_rule(ServerHost, ACL, JID) of
allow ->
StreamHostEl = [{xmlelement, "streamhost", StreamAddr, []}],
IQ#iq{type = result, sub_el =
[{xmlelement, "query", [{"xmlns", ?NS_BYTESTREAMS}], StreamHostEl}]};
deny ->
IQ#iq{type = error, sub_el = [SubEl, ?ERR_FORBIDDEN]}
end;
%% bytestream activation request
process_iq(InitiatorJID, #iq{type = set, sub_el = SubEl, xmlns = ?NS_BYTESTREAMS} = IQ,
#state{acl = ACL, serverhost = ServerHost}) ->
case acl:match_rule(ServerHost, ACL, InitiatorJID) of
allow ->
ActivateEl = xml:get_path_s(SubEl, [{elem, "activate"}]),
SID = xml:get_tag_attr_s("sid", SubEl),
case catch jlib:string_to_jid(xml:get_tag_cdata(ActivateEl)) of
TargetJID when is_record(TargetJID, jid), SID /= "",
length(SID) =< 128, TargetJID /= InitiatorJID ->
Target = jlib:jid_to_string(jlib:jid_tolower(TargetJID)),
Initiator = jlib:jid_to_string(jlib:jid_tolower(InitiatorJID)),
SHA1 = sha:sha(SID ++ Initiator ++ Target),
case mod_proxy65_sm:activate_stream(SHA1, InitiatorJID, TargetJID, ServerHost) of
ok ->
IQ#iq{type = result, sub_el = []};
false ->
IQ#iq{type = error, sub_el = [SubEl, ?ERR_ITEM_NOT_FOUND]};
limit ->
IQ#iq{type = error, sub_el = [SubEl, ?ERR_RESOURCE_CONSTRAINT]};
conflict ->
IQ#iq{type = error, sub_el = [SubEl, ?ERR_CONFLICT]};
_ ->
IQ#iq{type = error, sub_el = [SubEl, ?ERR_INTERNAL_SERVER_ERROR]}
end;
_ ->
IQ#iq{type = error, sub_el = [SubEl, ?ERR_BAD_REQUEST]}
end;
deny ->
IQ#iq{type = error, sub_el = [SubEl, ?ERR_FORBIDDEN]}
end;
%% Unknown "set" or "get" request
process_iq(_, #iq{type=Type, sub_el=SubEl} = IQ, _) when Type==get; Type==set ->
IQ#iq{type = error, sub_el = [SubEl, ?ERR_SERVICE_UNAVAILABLE]};
%% IQ "result" or "error".
process_iq(_, _, _) ->
ok.
%%%-------------------------
%%% Auxiliary functions.
%%%-------------------------
-define(FEATURE(Feat), {xmlelement,"feature",[{"var", Feat}],[]}).
iq_disco_info(Name) ->
[{xmlelement, "identity",
[{"category", "proxy"},
{"type", "bytestreams"},
{"name", Name}], []},
?FEATURE(?NS_DISCO_INFO),
?FEATURE(?NS_DISCO_ITEMS),
?FEATURE(?NS_VCARD),
?FEATURE(?NS_BYTESTREAMS)].
iq_vcard(Lang) ->
[{xmlelement, "FN", [],
[{xmlcdata, "ejabberd/mod_proxy65"}]},
{xmlelement, "URL", [],
[{xmlcdata, ?EJABBERD_URI}]},
{xmlelement, "DESC", [],
[{xmlcdata, translate:translate(Lang, "ejabberd SOCKS5 Bytestreams module\n"
"Copyright (c) 2003-2006 Alexey Shchepin")}]}].
parse_options(ServerHost, Opts) ->
MyHost = gen_mod:get_opt(host, Opts, "proxy." ++ ServerHost),
Port = gen_mod:get_opt(port, Opts, 7777),
ACL = gen_mod:get_opt(access, Opts, all),
Name = gen_mod:get_opt(name, Opts, "SOCKS5 Bytestreams"),
IP = case gen_mod:get_opt(ip, Opts, none) of
none ->
case inet:getaddr(MyHost, inet) of
{ok, Addr} -> Addr;
{error, _} -> {127,0,0,1}
end;
Addr ->
Addr
end,
[_ | StrIP] = lists:append([[$. | integer_to_list(X)] || X <- inet:ip_to_bytes(IP)]),
StreamAddr = [{"jid", MyHost}, {"host", StrIP}, {"port", integer_to_list(Port)}],
{IP, #state{myhost = MyHost,
serverhost = ServerHost,
name = Name,
port = Port,
stream_addr = StreamAddr,
acl = ACL}}.

View File

@ -0,0 +1,162 @@
%%%----------------------------------------------------------------------
%%% File : mod_proxy65_sm.erl
%%% Author : Evgeniy Khramtsov <xram@jabber.ru>
%%% Purpose : Bytestreams manager.
%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
%%% Id : $Id$
%%%----------------------------------------------------------------------
-module(mod_proxy65_sm).
-author('xram@jabber.ru').
-behaviour(gen_server).
%% gen_server callbacks.
-export([init/1,
handle_info/2,
handle_call/3,
handle_cast/2,
terminate/2,
code_change/3
]).
%% API.
-export([
start_link/2,
register_stream/1,
unregister_stream/1,
activate_stream/4
]).
-record(state, {max_connections}).
-record(bytestream, {
sha1, %% SHA1 key
target, %% Target Pid
initiator, %% Initiator Pid
active = false, %% Activity flag
jid_i %% Initiator's JID
}).
-define(PROCNAME, ejabberd_mod_proxy65_sm).
%% Unused callbacks.
handle_cast(_Request, State) ->
{noreply, State}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
handle_info(_Info, State) ->
{noreply, State}.
%%----------------
start_link(Host, Opts) ->
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
gen_server:start_link({local, Proc}, ?MODULE, [Opts], []).
init([Opts]) ->
mnesia:create_table(bytestream, [{ram_copies, [node()]},
{attributes, record_info(fields, bytestream)}]),
MaxConnections = gen_mod:get_opt(max_connections, Opts, infinity),
{ok, #state{max_connections=MaxConnections}}.
terminate(_Reason, _State) ->
ok.
handle_call({activate, SHA1, IJid}, _From, State) ->
MaxConns = State#state.max_connections,
F = fun() ->
case mnesia:read(bytestream, SHA1, write) of
[#bytestream{target = TPid, initiator = IPid} = ByteStream]
when is_pid(TPid), is_pid(IPid) ->
ActiveFlag = ByteStream#bytestream.active,
if
ActiveFlag == false ->
ConnsPerJID =
mnesia:select(bytestream,
[{#bytestream{sha1 = '$1',
jid_i = IJid,
_='_'},
[],
['$1']}]),
if
length(ConnsPerJID) < MaxConns ->
mnesia:write(
ByteStream#bytestream{active = true,
jid_i = IJid}),
{ok, IPid, TPid};
true ->
{limit, IPid, TPid}
end;
true ->
conflict
end;
_ ->
false
end
end,
Reply = mnesia:transaction(F),
{reply, Reply, State};
handle_call(_Request, _From, State) ->
{reply, ok, State}.
%%%----------------------
%%% API.
%%%----------------------
%%%---------------------------------------------------
%%% register_stream(SHA1) -> {atomic, ok} |
%%% {atomic, error} |
%%% transaction abort
%%% SHA1 = string()
%%%---------------------------------------------------
register_stream(SHA1) when is_list(SHA1) ->
StreamPid = self(),
F = fun() ->
case mnesia:read(bytestream, SHA1, write) of
[] ->
mnesia:write(#bytestream{sha1 = SHA1,
target = StreamPid});
[#bytestream{target = Pid,
initiator = undefined} = ByteStream]
when is_pid(Pid), Pid /= StreamPid ->
mnesia:write(
ByteStream#bytestream{initiator = StreamPid});
_ ->
error
end
end,
mnesia:transaction(F).
%%%----------------------------------------------------
%%% unregister_stream(SHA1) -> ok | transaction abort
%%% SHA1 = string()
%%%----------------------------------------------------
unregister_stream(SHA1) when is_list(SHA1) ->
F = fun() -> mnesia:delete({bytestream, SHA1}) end,
mnesia:transaction(F).
%%%--------------------------------------------------------
%%% activate_stream(SHA1, IJid, TJid, Host) -> ok |
%%% false |
%%% limit |
%%% conflict |
%%% error
%%% SHA1 = string()
%%% IJid = TJid = jid()
%%% Host = string()
%%%--------------------------------------------------------
activate_stream(SHA1, IJid, TJid, Host) when is_list(SHA1) ->
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
case catch gen_server:call(Proc, {activate, SHA1, IJid}) of
{atomic, {ok, IPid, TPid}} ->
mod_proxy65_stream:activate({IPid, IJid}, {TPid, TJid});
{atomic, {limit, IPid, TPid}} ->
mod_proxy65_stream:stop(IPid),
mod_proxy65_stream:stop(TPid),
limit;
{atomic, conflict} ->
conflict;
{atomic, false} ->
false;
_ ->
error
end.

View File

@ -0,0 +1,273 @@
%%%----------------------------------------------------------------------
%%% File : mod_proxy65_stream.erl
%%% Author : Evgeniy Khramtsov <xram@jabber.ru>
%%% Purpose : Bytestream process.
%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
%%% Id : $Id$
%%%----------------------------------------------------------------------
-module(mod_proxy65_stream).
-author('xram@jabber.ru').
-behaviour(gen_fsm).
%% gen_fsm callbacks.
-export([
init/1,
handle_event/3,
handle_sync_event/4,
code_change/4,
handle_info/3,
terminate/3
]).
%% gen_fsm states.
-export([
wait_for_init/2,
wait_for_auth/2,
wait_for_request/2,
wait_for_activation/2,
stream_established/2
]).
%% API.
-export([
start/2,
stop/1,
start_link/3,
activate/2,
relay/3,
socket_type/0
]).
-include("mod_proxy65.hrl").
-include("../ejabberd.hrl").
-define(WAIT_TIMEOUT, 60000). %% 1 minute (is it enough?)
-record(state, {
socket, %% TCP socket
timer, %% timer reference
sha1, %% SHA1 key
host, %% virtual host
auth_type, %% authentication type: anonymous or plain
shaper, %% Shaper name
active = false %% Activity flag
}).
%% Unused callbacks
handle_event(_Event, StateName, StateData) ->
{next_state, StateName, StateData}.
code_change(_OldVsn, StateName, StateData, _Extra) ->
{ok, StateName, StateData}.
%%-------------------------------
start({gen_tcp, Socket}, [Host | Opts]) ->
Supervisor = gen_mod:get_module_proc(Host, ejabberd_mod_proxy65_sup),
supervisor:start_child(Supervisor, [Socket, Host, Opts]).
start_link(Socket, Host, Opts) ->
gen_fsm:start_link(?MODULE, [Socket, Host, Opts], []).
init([Socket, Host, Opts]) ->
process_flag(trap_exit, true),
AuthType = gen_mod:get_opt(auth_type, Opts, anonymous),
Shaper = gen_mod:get_opt(shaper, Opts, none),
RecvBuf = gen_mod:get_opt(recbuf, Opts, 65535),
SendBuf = gen_mod:get_opt(sndbuf, Opts, 65535),
TRef = erlang:send_after(?WAIT_TIMEOUT, self(), stop),
inet:setopts(Socket, [{active, true}, {recbuf, RecvBuf}, {sndbuf, SendBuf}]),
{ok, wait_for_init, #state{host = Host,
auth_type = AuthType,
socket = Socket,
shaper = Shaper,
timer = TRef}}.
terminate(_Reason, _StateName, #state{sha1=SHA1,active=Flag}) ->
catch mod_proxy65_sm:unregister_stream(SHA1),
if Flag==true ->
?INFO_MSG("Bytestream terminated", []);
true ->
ok
end.
%%%------------------------------
%%% API.
%%%------------------------------
socket_type() ->
raw.
stop(StreamPid) ->
StreamPid ! stop.
activate({P1, J1}, {P2, J2}) ->
case catch {gen_fsm:sync_send_all_state_event(P1, get_socket),
gen_fsm:sync_send_all_state_event(P2, get_socket)} of
{S1, S2} when is_port(S1), is_port(S2) ->
P1 ! {activate, P2, S2, J1, J2},
P2 ! {activate, P1, S1, J1, J2},
JID1 = jlib:jid_to_string(J1),
JID2 = jlib:jid_to_string(J2),
?INFO_MSG("(~w:~w) Activated bytestream for ~s -> ~s", [P1, P2, JID1, JID2]),
ok;
_ ->
error
end.
%%%-----------------------
%%% States
%%%-----------------------
wait_for_init(Packet, #state{socket=Socket, auth_type=AuthType} = StateData) ->
case mod_proxy65_lib:unpack_init_message(Packet) of
{ok, AuthMethods} ->
Method = select_auth_method(AuthType, AuthMethods),
gen_tcp:send(Socket, mod_proxy65_lib:make_init_reply(Method)),
case Method of
?AUTH_ANONYMOUS ->
{next_state, wait_for_request, StateData};
?AUTH_PLAIN ->
{next_state, wait_for_auth, StateData};
?AUTH_NO_METHODS ->
{stop, normal, StateData}
end;
error ->
{stop, normal, StateData}
end.
wait_for_auth(Packet, #state{socket=Socket, host=Host} = StateData) ->
case mod_proxy65_lib:unpack_auth_request(Packet) of
{User, Pass} ->
Result = ejabberd_auth:check_password(User, Host, Pass),
gen_tcp:send(Socket, mod_proxy65_lib:make_auth_reply(Result)),
case Result of
true ->
{next_state, wait_for_request, StateData};
false ->
{stop, normal, StateData}
end;
_ ->
{stop, normal, StateData}
end.
wait_for_request(Packet, #state{socket=Socket} = StateData) ->
Request = mod_proxy65_lib:unpack_request(Packet),
case Request of
#s5_request{sha1=SHA1, cmd=connect} ->
case catch mod_proxy65_sm:register_stream(SHA1) of
{atomic, ok} ->
inet:setopts(Socket, [{active, false}]),
gen_tcp:send(Socket, mod_proxy65_lib:make_reply()),
{next_state, wait_for_activation, StateData#state{sha1=SHA1}};
_ ->
Err = mod_proxy65_lib:make_error_reply(Request),
gen_tcp:send(Socket, Err),
{stop, normal, StateData}
end;
#s5_request{cmd=udp} ->
Err = mod_proxy65_lib:make_error_reply(Request, ?ERR_COMMAND_NOT_SUPPORTED),
gen_tcp:send(Socket, Err),
{stop, normal, StateData};
_ ->
{stop, normal, StateData}
end.
wait_for_activation(_Data, StateData) ->
{next_state, wait_for_activation, StateData}.
stream_established(_Data, StateData) ->
{next_state, stream_established, StateData}.
%%%-----------------------
%%% Callbacks processing
%%%-----------------------
%% SOCKS5 packets.
handle_info({tcp, _S, Data}, StateName, StateData)
when StateName /= wait_for_activation ->
erlang:cancel_timer(StateData#state.timer),
TRef = erlang:send_after(?WAIT_TIMEOUT, self(), stop),
gen_fsm:send_event(self(), Data),
{next_state, StateName, StateData#state{timer=TRef}};
%% Activation message.
handle_info({activate, PeerPid, PeerSocket, IJid, TJid},
wait_for_activation, StateData) ->
erlang:monitor(process, PeerPid),
erlang:cancel_timer(StateData#state.timer),
MySocket = StateData#state.socket,
Shaper = StateData#state.shaper,
Host = StateData#state.host,
MaxRate = find_maxrate(Shaper, IJid, TJid, Host),
spawn_link(?MODULE, relay, [MySocket, PeerSocket, MaxRate]),
{next_state, stream_established, StateData#state{active=true}};
%% Socket closed
handle_info({tcp_closed, _Socket}, _StateName, StateData) ->
{stop, normal, StateData};
handle_info({tcp_error, _Socket, _Reason}, _StateName, StateData) ->
{stop, normal, StateData};
%% Got stop message.
handle_info(stop, _StateName, StateData) ->
{stop, normal, StateData};
%% Either linked process or peer process died.
handle_info({'EXIT',_,_}, _StateName, StateData) ->
{stop, normal, StateData};
handle_info({'DOWN',_,_,_,_}, _StateName, StateData) ->
{stop, normal, StateData};
%% Packets of no interest
handle_info(_Info, StateName, StateData) ->
{next_state, StateName, StateData}.
%% Socket request.
handle_sync_event(get_socket, _From, wait_for_activation, StateData) ->
Socket = StateData#state.socket,
{reply, Socket, wait_for_activation, StateData};
handle_sync_event(_Event, _From, StateName, StateData) ->
{reply, error, StateName, StateData}.
%%%-------------------------------------------------
%%% Relay Process.
%%%-------------------------------------------------
relay(MySocket, PeerSocket, Shaper) ->
case gen_tcp:recv(MySocket, 0) of
{ok, Data} ->
gen_tcp:send(PeerSocket, Data),
{NewShaper, Pause} = shaper:update(Shaper, size(Data)),
if
Pause > 0 -> timer:sleep(Pause);
true -> pass
end,
relay(MySocket, PeerSocket, NewShaper);
_ ->
stopped
end.
%%%------------------------
%%% Auxiliary functions
%%%------------------------
select_auth_method(plain, AuthMethods) ->
case lists:member(?AUTH_PLAIN, AuthMethods) of
true -> ?AUTH_PLAIN;
false -> ?AUTH_NO_METHODS
end;
select_auth_method(anonymous, AuthMethods) ->
case lists:member(?AUTH_ANONYMOUS, AuthMethods) of
true -> ?AUTH_ANONYMOUS;
false -> ?AUTH_NO_METHODS
end.
%% Obviously, we must use shaper with maximum rate.
find_maxrate(Shaper, JID1, JID2, Host) ->
MaxRate1 = shaper:new(acl:match_rule(Host, Shaper, JID1)),
MaxRate2 = shaper:new(acl:match_rule(Host, Shaper, JID2)),
if
MaxRate1 == none; MaxRate2 == none ->
none;
true ->
lists:max([MaxRate1, MaxRate2])
end.