%%%---------------------------------------------------------------------- %%% File : mod_proxy65_stream.erl %%% Author : Evgeniy Khramtsov %%% Purpose : Bytestream process. %%% Created : 12 Oct 2006 by Evgeniy Khramtsov %%% %%% ejabberd, Copyright (C) 2002-2007 Process-one %%% %%% This program is free software; you can redistribute it and/or %%% modify it under the terms of the GNU General Public License as %%% published by the Free Software Foundation; either version 2 of the %%% License, or (at your option) any later version. %%% %%% This program is distributed in the hope that it will be useful, %%% but WITHOUT ANY WARRANTY; without even the implied warranty of %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU %%% General Public License for more details. %%% %%% You should have received a copy of the GNU General Public License %%% along with this program; if not, write to the Free Software %%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA %%% 02111-1307 USA %%% %%%---------------------------------------------------------------------- -module(mod_proxy65_stream). -author('xram@jabber.ru'). -behaviour(gen_fsm). %% gen_fsm callbacks. -export([ init/1, handle_event/3, handle_sync_event/4, code_change/4, handle_info/3, terminate/3 ]). %% gen_fsm states. -export([ wait_for_init/2, wait_for_auth/2, wait_for_request/2, wait_for_activation/2, stream_established/2 ]). %% API. -export([ start/2, stop/1, start_link/3, activate/2, relay/3, socket_type/0 ]). -include("mod_proxy65.hrl"). -include("../ejabberd.hrl"). -define(WAIT_TIMEOUT, 60000). %% 1 minute (is it enough?) -record(state, { socket, %% TCP socket timer, %% timer reference sha1, %% SHA1 key host, %% virtual host auth_type, %% authentication type: anonymous or plain shaper %% Shaper name }). %% Unused callbacks handle_event(_Event, StateName, StateData) -> {next_state, StateName, StateData}. code_change(_OldVsn, StateName, StateData, _Extra) -> {ok, StateName, StateData}. %%------------------------------- start({gen_tcp, Socket}, [Host | Opts]) -> Supervisor = gen_mod:get_module_proc(Host, ejabberd_mod_proxy65_sup), supervisor:start_child(Supervisor, [Socket, Host, Opts]). start_link(Socket, Host, Opts) -> gen_fsm:start_link(?MODULE, [Socket, Host, Opts], []). init([Socket, Host, Opts]) -> process_flag(trap_exit, true), AuthType = gen_mod:get_opt(auth_type, Opts, anonymous), Shaper = gen_mod:get_opt(shaper, Opts, none), RecvBuf = gen_mod:get_opt(recbuf, Opts, 65535), SendBuf = gen_mod:get_opt(sndbuf, Opts, 65535), TRef = erlang:send_after(?WAIT_TIMEOUT, self(), stop), inet:setopts(Socket, [{active, true}, {recbuf, RecvBuf}, {sndbuf, SendBuf}]), {ok, wait_for_init, #state{host = Host, auth_type = AuthType, socket = Socket, shaper = Shaper, timer = TRef}}. terminate(_Reason, StateName, #state{sha1=SHA1}) -> catch mod_proxy65_sm:unregister_stream(SHA1), if StateName == stream_established -> ?INFO_MSG("Bytestream terminated", []); true -> ok end. %%%------------------------------ %%% API. %%%------------------------------ socket_type() -> raw. stop(StreamPid) -> StreamPid ! stop. activate({P1, J1}, {P2, J2}) -> case catch {gen_fsm:sync_send_all_state_event(P1, get_socket), gen_fsm:sync_send_all_state_event(P2, get_socket)} of {S1, S2} when is_port(S1), is_port(S2) -> P1 ! {activate, P2, S2, J1, J2}, P2 ! {activate, P1, S1, J1, J2}, JID1 = jlib:jid_to_string(J1), JID2 = jlib:jid_to_string(J2), ?INFO_MSG("(~w:~w) Activated bytestream for ~s -> ~s", [P1, P2, JID1, JID2]), ok; _ -> error end. %%%----------------------- %%% States %%%----------------------- wait_for_init(Packet, #state{socket=Socket, auth_type=AuthType} = StateData) -> case mod_proxy65_lib:unpack_init_message(Packet) of {ok, AuthMethods} -> Method = select_auth_method(AuthType, AuthMethods), gen_tcp:send(Socket, mod_proxy65_lib:make_init_reply(Method)), case Method of ?AUTH_ANONYMOUS -> {next_state, wait_for_request, StateData}; ?AUTH_PLAIN -> {next_state, wait_for_auth, StateData}; ?AUTH_NO_METHODS -> {stop, normal, StateData} end; error -> {stop, normal, StateData} end. wait_for_auth(Packet, #state{socket=Socket, host=Host} = StateData) -> case mod_proxy65_lib:unpack_auth_request(Packet) of {User, Pass} -> Result = ejabberd_auth:check_password(User, Host, Pass), gen_tcp:send(Socket, mod_proxy65_lib:make_auth_reply(Result)), case Result of true -> {next_state, wait_for_request, StateData}; false -> {stop, normal, StateData} end; _ -> {stop, normal, StateData} end. wait_for_request(Packet, #state{socket=Socket} = StateData) -> Request = mod_proxy65_lib:unpack_request(Packet), case Request of #s5_request{sha1=SHA1, cmd=connect} -> case catch mod_proxy65_sm:register_stream(SHA1) of {atomic, ok} -> inet:setopts(Socket, [{active, false}]), gen_tcp:send(Socket, mod_proxy65_lib:make_reply()), {next_state, wait_for_activation, StateData#state{sha1=SHA1}}; _ -> Err = mod_proxy65_lib:make_error_reply(Request), gen_tcp:send(Socket, Err), {stop, normal, StateData} end; #s5_request{cmd=udp} -> Err = mod_proxy65_lib:make_error_reply(Request, ?ERR_COMMAND_NOT_SUPPORTED), gen_tcp:send(Socket, Err), {stop, normal, StateData}; _ -> {stop, normal, StateData} end. wait_for_activation(_Data, StateData) -> {next_state, wait_for_activation, StateData}. stream_established(_Data, StateData) -> {next_state, stream_established, StateData}. %%%----------------------- %%% Callbacks processing %%%----------------------- %% SOCKS5 packets. handle_info({tcp, _S, Data}, StateName, StateData) when StateName /= wait_for_activation -> erlang:cancel_timer(StateData#state.timer), TRef = erlang:send_after(?WAIT_TIMEOUT, self(), stop), gen_fsm:send_event(self(), Data), {next_state, StateName, StateData#state{timer=TRef}}; %% Activation message. handle_info({activate, PeerPid, PeerSocket, IJid, TJid}, wait_for_activation, StateData) -> erlang:monitor(process, PeerPid), erlang:cancel_timer(StateData#state.timer), MySocket = StateData#state.socket, Shaper = StateData#state.shaper, Host = StateData#state.host, MaxRate = find_maxrate(Shaper, IJid, TJid, Host), spawn_link(?MODULE, relay, [MySocket, PeerSocket, MaxRate]), {next_state, stream_established, StateData}; %% Socket closed handle_info({tcp_closed, _Socket}, _StateName, StateData) -> {stop, normal, StateData}; handle_info({tcp_error, _Socket, _Reason}, _StateName, StateData) -> {stop, normal, StateData}; %% Got stop message. handle_info(stop, _StateName, StateData) -> {stop, normal, StateData}; %% Either linked process or peer process died. handle_info({'EXIT',_,_}, _StateName, StateData) -> {stop, normal, StateData}; handle_info({'DOWN',_,_,_,_}, _StateName, StateData) -> {stop, normal, StateData}; %% Packets of no interest handle_info(_Info, StateName, StateData) -> {next_state, StateName, StateData}. %% Socket request. handle_sync_event(get_socket, _From, wait_for_activation, StateData) -> Socket = StateData#state.socket, {reply, Socket, wait_for_activation, StateData}; handle_sync_event(_Event, _From, StateName, StateData) -> {reply, error, StateName, StateData}. %%%------------------------------------------------- %%% Relay Process. %%%------------------------------------------------- relay(MySocket, PeerSocket, Shaper) -> case gen_tcp:recv(MySocket, 0) of {ok, Data} -> gen_tcp:send(PeerSocket, Data), {NewShaper, Pause} = shaper:update(Shaper, size(Data)), if Pause > 0 -> timer:sleep(Pause); true -> pass end, relay(MySocket, PeerSocket, NewShaper); _ -> stopped end. %%%------------------------ %%% Auxiliary functions %%%------------------------ select_auth_method(plain, AuthMethods) -> case lists:member(?AUTH_PLAIN, AuthMethods) of true -> ?AUTH_PLAIN; false -> ?AUTH_NO_METHODS end; select_auth_method(anonymous, AuthMethods) -> case lists:member(?AUTH_ANONYMOUS, AuthMethods) of true -> ?AUTH_ANONYMOUS; false -> ?AUTH_NO_METHODS end. %% Obviously, we must use shaper with maximum rate. find_maxrate(Shaper, JID1, JID2, Host) -> MaxRate1 = shaper:new(acl:match_rule(Host, Shaper, JID1)), MaxRate2 = shaper:new(acl:match_rule(Host, Shaper, JID2)), if MaxRate1 == none; MaxRate2 == none -> none; true -> lists:max([MaxRate1, MaxRate2]) end.