mirror of
https://github.com/processone/ejabberd.git
synced 2024-11-26 16:26:24 +01:00
dc3899dfd9
The original size of 65535 confused clients with tight shapers (thanks to Evgeniy Khramtsov) SVN Revision: 2097
291 lines
8.8 KiB
Erlang
291 lines
8.8 KiB
Erlang
%%%----------------------------------------------------------------------
|
|
%%% File : mod_proxy65_stream.erl
|
|
%%% Author : Evgeniy Khramtsov <xram@jabber.ru>
|
|
%%% Purpose : Bytestream process.
|
|
%%% Created : 12 Oct 2006 by Evgeniy Khramtsov <xram@jabber.ru>
|
|
%%%
|
|
%%% ejabberd, Copyright (C) 2002-2009 ProcessOne
|
|
%%%
|
|
%%% This program is free software; you can redistribute it and/or
|
|
%%% modify it under the terms of the GNU General Public License as
|
|
%%% published by the Free Software Foundation; either version 2 of the
|
|
%%% License, or (at your option) any later version.
|
|
%%%
|
|
%%% This program is distributed in the hope that it will be useful,
|
|
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
%%% General Public License for more details.
|
|
%%%
|
|
%%% You should have received a copy of the GNU General Public License
|
|
%%% along with this program; if not, write to the Free Software
|
|
%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
|
|
%%% 02111-1307 USA
|
|
%%%
|
|
%%%----------------------------------------------------------------------
|
|
|
|
-module(mod_proxy65_stream).
-author('xram@jabber.ru').

-behaviour(gen_fsm).

%% gen_fsm callbacks.
-export([init/1,
         handle_event/3,
         handle_sync_event/4,
         handle_info/3,
         terminate/3,
         code_change/4]).

%% gen_fsm states. Each of these receives the raw data forwarded by
%% handle_info/3 as its event.
-export([wait_for_init/2,
         wait_for_auth/2,
         wait_for_request/2,
         wait_for_activation/2,
         stream_established/2]).

%% API. relay/3 is exported because it is started through
%% spawn_link(?MODULE, relay, ...).
-export([start/2,
         start_link/3,
         stop/1,
         activate/2,
         relay/3,
         socket_type/0]).
|
|
|
|
-include("mod_proxy65.hrl").
|
|
-include("ejabberd.hrl").
|
|
|
|
%% Inactivity timeout in milliseconds (one minute). When it expires the
%% process receives a `stop' message.
-define(WAIT_TIMEOUT, (60 * 1000)).

%% Per-connection state of the bytestream FSM.
-record(state, {socket,     %% TCP socket of this side of the stream
                timer,      %% reference of the pending inactivity timer
                sha1,       %% SHA1 key the stream is registered under
                host,       %% virtual host
                auth_type,  %% authentication type: anonymous or plain
                shaper      %% shaper name
               }).
|
|
|
|
%% Unused callbacks
|
|
%% gen_fsm callback for events sent to all states. Nothing in this module
%% relies on such events, so the event is dropped and the FSM stays in its
%% current state with unchanged data.
handle_event(_Ignored, CurrentState, Data) ->
    {next_state, CurrentState, Data}.
|
|
%% gen_fsm callback for hot code upgrades. No state conversion is done:
%% the state name and state data are handed back untouched.
code_change(_FromVsn, CurrentState, Data, _Extra) ->
    {ok, CurrentState, Data}.
|
|
%%-------------------------------
|
|
|
|
%% Starts a stream process for an accepted TCP socket.
%%
%% Opts1 must contain exactly one list-valued element, which is taken as
%% the virtual host; every other element is passed on as an option. The
%% new process is started as a child of the per-host supervisor looked up
%% through gen_mod:get_module_proc/2. Returns whatever
%% supervisor:start_child/2 returns; crashes with badmatch if the number
%% of list-valued elements in Opts1 is not one.
start({gen_tcp, Socket}, Opts1) ->
    [Host] = [O || O <- Opts1, is_list(O)],
    Opts = [O || O <- Opts1, not is_list(O)],
    SupName = gen_mod:get_module_proc(Host, ejabberd_mod_proxy65_sup),
    supervisor:start_child(SupName, [Socket, Host, Opts]).
|
|
|
|
%% Starts and links an unregistered gen_fsm running this module. The three
%% arguments are delivered to init/1 as a single list.
start_link(Socket, Host, Opts) ->
    InitArgs = [Socket, Host, Opts],
    gen_fsm:start_link(?MODULE, InitArgs, []).
|
|
|
|
%% gen_fsm init callback.
%%
%% Enables exit trapping, reads the module options (auth_type defaults to
%% anonymous, shaper to none, recbuf and sndbuf to 8192), arms the
%% inactivity timer that delivers `stop' to this process after
%% ?WAIT_TIMEOUT, and switches the socket to active mode with the
%% configured buffer sizes. The FSM starts in wait_for_init.
init([Socket, Host, Opts]) ->
    process_flag(trap_exit, true),
    AuthType = gen_mod:get_opt(auth_type, Opts, anonymous),
    Shaper = gen_mod:get_opt(shaper, Opts, none),
    SockOpts = [{active, true},
                {recbuf, gen_mod:get_opt(recbuf, Opts, 8192)},
                {sndbuf, gen_mod:get_opt(sndbuf, Opts, 8192)}],
    Timer = erlang:send_after(?WAIT_TIMEOUT, self(), stop),
    inet:setopts(Socket, SockOpts),
    InitialState = #state{socket = Socket,
                          timer = Timer,
                          host = Host,
                          auth_type = AuthType,
                          shaper = Shaper},
    {ok, wait_for_init, InitialState}.
|
|
|
|
%% gen_fsm terminate callback.
%%
%% Unregisters the stream key from mod_proxy65_sm; any failure of that
%% call is swallowed by `catch' (the key is still undefined if the process
%% stops before a connect request was accepted). A log line is written
%% only when the stream had reached the stream_established state.
terminate(_Reason, StateName, #state{sha1 = SHA1}) ->
    catch mod_proxy65_sm:unregister_stream(SHA1),
    case StateName of
        stream_established ->
            ?INFO_MSG("Bytestream terminated", []);
        _ ->
            ok
    end.
|
|
|
|
%%%------------------------------
|
|
%%% API.
|
|
%%%------------------------------
|
|
%% Reports the kind of socket this module works with: always `raw'.
%% NOTE(review): presumably queried by the listener that accepts the
%% connections -- confirm against the caller.
socket_type() ->
    raw.
|
|
|
|
%% Asks a stream process to shut down by sending it the `stop' message,
%% which handle_info/3 turns into a normal stop. The call is asynchronous
%% and returns the atom `stop' (the message that was sent).
stop(StreamPid) ->
    erlang:send(StreamPid, stop).
|
|
|
|
%% Connects two stream processes to each other.
%%
%% Each argument is a {Pid, JID} pair. Both processes are asked for their
%% socket with the get_socket sync event. If both answers are ports, each
%% process is sent an `activate' message carrying the other side's pid and
%% socket plus both JIDs, the activation is logged, and `ok' is returned.
%% Anything else -- a non-port answer, or an exit/error raised by the
%% calls and captured by `catch' -- yields `error'.
activate({P1, J1}, {P2, J2}) ->
    Sockets = (catch {gen_fsm:sync_send_all_state_event(P1, get_socket),
                      gen_fsm:sync_send_all_state_event(P2, get_socket)}),
    case Sockets of
        {S1, S2} when is_port(S1) andalso is_port(S2) ->
            P1 ! {activate, P2, S2, J1, J2},
            P2 ! {activate, P1, S1, J1, J2},
            JID1 = exmpp_jid:jid_to_list(J1),
            JID2 = exmpp_jid:jid_to_list(J2),
            ?INFO_MSG("(~w:~w) Activated bytestream for ~s -> ~s", [P1, P2, JID1, JID2]),
            ok;
        _ ->
            error
    end.
|
|
|
|
%%%-----------------------
|
|
%%% States
|
|
%%%-----------------------
|
|
%% State: waiting for the client's initial SOCKS5 message.
%%
%% A packet that cannot be unpacked stops the process. Otherwise an
%% authentication method is chosen from the ones the client listed and
%% the choice is sent back. The next state depends on the chosen method:
%% anonymous goes straight to wait_for_request, plain goes to
%% wait_for_auth, and "no acceptable methods" stops the process (after
%% the reply has been sent).
wait_for_init(Packet, #state{socket = Socket, auth_type = AuthType} = StateData) ->
    case mod_proxy65_lib:unpack_init_message(Packet) of
        error ->
            {stop, normal, StateData};
        {ok, AuthMethods} ->
            Chosen = select_auth_method(AuthType, AuthMethods),
            Reply = mod_proxy65_lib:make_init_reply(Chosen),
            gen_tcp:send(Socket, Reply),
            case Chosen of
                ?AUTH_NO_METHODS ->
                    {stop, normal, StateData};
                ?AUTH_PLAIN ->
                    {next_state, wait_for_auth, StateData};
                ?AUTH_ANONYMOUS ->
                    {next_state, wait_for_request, StateData}
            end
    end.
|
|
|
|
%% State: waiting for the username/password authentication request.
%%
%% The credentials are checked against the virtual host with
%% ejabberd_auth:check_password/3 and the outcome is reported to the
%% client. A successful check moves on to wait_for_request; a failed
%% check, or a packet that does not unpack to a {User, Pass} pair, stops
%% the process.
wait_for_auth(Packet, #state{socket = Socket, host = Host} = StateData) ->
    case mod_proxy65_lib:unpack_auth_request(Packet) of
        {User, Pass} ->
            Authenticated = ejabberd_auth:check_password(User, Host, Pass),
            Reply = mod_proxy65_lib:make_auth_reply(Authenticated),
            gen_tcp:send(Socket, Reply),
            case Authenticated of
                false ->
                    {stop, normal, StateData};
                true ->
                    {next_state, wait_for_request, StateData}
            end;
        _ ->
            {stop, normal, StateData}
    end.
|
|
|
|
%% State: waiting for the SOCKS5 request.
%%
%% connect: the SHA1 key from the request is registered with
%%   mod_proxy65_sm. On {atomic, ok} the socket is switched to passive
%%   mode, a success reply is sent and the FSM waits for activation with
%%   the key stored in its state. Any other registration result
%%   (including a crash captured by `catch') produces an error reply and
%%   stops the process.
%% udp: answered with a "command not supported" error, then stop.
%% anything else: stop without replying.
wait_for_request(Packet, #state{socket = Socket} = StateData) ->
    case mod_proxy65_lib:unpack_request(Packet) of
        #s5_request{sha1 = SHA1, cmd = connect} = Request ->
            case catch mod_proxy65_sm:register_stream(SHA1) of
                {atomic, ok} ->
                    inet:setopts(Socket, [{active, false}]),
                    gen_tcp:send(Socket, mod_proxy65_lib:make_reply(Request)),
                    {next_state, wait_for_activation, StateData#state{sha1 = SHA1}};
                _ ->
                    gen_tcp:send(Socket, mod_proxy65_lib:make_error_reply(Request)),
                    {stop, normal, StateData}
            end;
        #s5_request{cmd = udp} = Request ->
            Unsupported = mod_proxy65_lib:make_error_reply(Request, ?ERR_COMMAND_NOT_SUPPORTED),
            gen_tcp:send(Socket, Unsupported),
            {stop, normal, StateData};
        _ ->
            {stop, normal, StateData}
    end.
|
|
|
|
%% State: request accepted, waiting for the `activate' message. Events
%% arriving in this state are ignored and the state is kept.
wait_for_activation(_Event, State) ->
    {next_state, wait_for_activation, State}.
|
|
|
|
%% State: the relay is running. Events arriving in this state are ignored
%% and the state is kept.
stream_established(_Event, State) ->
    {next_state, stream_established, State}.
|
|
|
|
%%%-----------------------
|
|
%%% Callbacks processing
|
|
%%%-----------------------
|
|
|
|
%% gen_fsm callback for plain messages.

%% Data from the socket (any state except wait_for_activation): restart
%% the inactivity timer and forward the data to the current state
%% function as an FSM event.
handle_info({tcp, _Sock, Bytes}, StateName, StateData)
  when StateName =/= wait_for_activation ->
    erlang:cancel_timer(StateData#state.timer),
    NewTimer = erlang:send_after(?WAIT_TIMEOUT, self(), stop),
    gen_fsm:send_event(self(), Bytes),
    {next_state, StateName, StateData#state{timer = NewTimer}};

%% Activation request sent by activate/2: monitor the peer process, cancel
%% the inactivity timer, work out the shaper to apply and start a linked
%% relay process that copies data from our socket to the peer's socket.
handle_info({activate, PeerPid, PeerSocket, IJid, TJid},
            wait_for_activation, StateData) ->
    erlang:monitor(process, PeerPid),
    erlang:cancel_timer(StateData#state.timer),
    MaxRate = find_maxrate(StateData#state.shaper, IJid, TJid,
                           StateData#state.host),
    spawn_link(?MODULE, relay, [StateData#state.socket, PeerSocket, MaxRate]),
    {next_state, stream_established, StateData};

%% The socket was closed or reported an error.
handle_info({tcp_closed, _Sock}, _StateName, StateData) ->
    {stop, normal, StateData};
handle_info({tcp_error, _Sock, _Why}, _StateName, StateData) ->
    {stop, normal, StateData};

%% Explicit stop request (stop/1) or expired inactivity timer.
handle_info(stop, _StateName, StateData) ->
    {stop, normal, StateData};

%% A linked process exited (exits are trapped) or the monitored peer
%% process went down.
handle_info({'EXIT', _, _}, _StateName, StateData) ->
    {stop, normal, StateData};
handle_info({'DOWN', _, _, _, _}, _StateName, StateData) ->
    {stop, normal, StateData};

%% Everything else is ignored.
handle_info(_Other, StateName, StateData) ->
    {next_state, StateName, StateData}.
|
|
|
|
%% gen_fsm callback for synchronous all-state events.
%%
%% get_socket is answered with the process's TCP socket, but only while
%% the FSM is in wait_for_activation. Every other event, and get_socket in
%% any other state, is answered with `error'. The state never changes.
handle_sync_event(get_socket, _Caller, wait_for_activation, StateData) ->
    {reply, StateData#state.socket, wait_for_activation, StateData};

handle_sync_event(_Other, _Caller, StateName, StateData) ->
    {reply, error, StateName, StateData}.
|
|
|
|
%%%-------------------------------------------------
|
|
%%% Relay Process.
|
|
%%%-------------------------------------------------
|
|
%% One-way relay loop, run in its own process.
%%
%% Repeatedly reads whatever is available from MySocket (gen_tcp:recv/2
%% with length 0) and writes it to PeerSocket. After each successful
%% write the shaper state is updated with the number of bytes moved and,
%% if the shaper asks for a pause, the process sleeps for that many
%% milliseconds before reading again.
%%
%% MySocket   - socket to read from
%% PeerSocket - socket to write to
%% Shaper     - shaper state as accepted by shaper:update/2
%%
%% Returns `stopped' as soon as reading fails or writing fails. Stopping
%% on a failed write matters: without it the loop would keep draining
%% MySocket and throwing the data away until that socket failed as well.
relay(MySocket, PeerSocket, Shaper) ->
    case gen_tcp:recv(MySocket, 0) of
        {ok, Data} ->
            case gen_tcp:send(PeerSocket, Data) of
                ok ->
                    {NewShaper, Pause} = shaper:update(Shaper, byte_size(Data)),
                    if
                        Pause > 0 -> timer:sleep(Pause);
                        true -> pass
                    end,
                    relay(MySocket, PeerSocket, NewShaper);
                {error, _Reason} ->
                    %% The peer side is gone; nothing more can be delivered.
                    stopped
            end;
        _ ->
            stopped
    end.
|
|
|
|
%%%------------------------
|
|
%%% Auxiliary functions
|
|
%%%------------------------
|
|
%% Chooses the authentication method to answer with.
%%
%% The first argument is the configured authentication type (plain or
%% anonymous); the second is the list of methods the client offered. The
%% method matching the configured type is returned if the client offered
%% it, otherwise ?AUTH_NO_METHODS.
select_auth_method(plain, AuthMethods) ->
    offered_or_none(?AUTH_PLAIN, AuthMethods);

select_auth_method(anonymous, AuthMethods) ->
    offered_or_none(?AUTH_ANONYMOUS, AuthMethods).

%% Returns Wanted when it is a member of Offered, ?AUTH_NO_METHODS otherwise.
offered_or_none(Wanted, Offered) ->
    case lists:member(Wanted, Offered) of
        true -> Wanted;
        false -> ?AUTH_NO_METHODS
    end.
|
|
|
|
%% Picks the shaper to apply to a stream between two JIDs.
%%
%% The shaper rule is matched against each JID on the given host and a
%% shaper is created from each result. If either of the two is `none',
%% the stream is not shaped at all (`none' is returned); otherwise the
%% greater of the two, as ordered by lists:max/1, is used.
%% NOTE(review): the intent is "use the shaper with the maximum rate";
%% whether term ordering of the shaper values achieves that depends on
%% their representation in the shaper module -- confirm.
find_maxrate(Shaper, JID1, JID2, Host) ->
    Rate1 = shaper:new(acl:match_rule(Host, Shaper, JID1)),
    Rate2 = shaper:new(acl:match_rule(Host, Shaper, JID2)),
    case {Rate1, Rate2} of
        {none, _} ->
            none;
        {_, none} ->
            none;
        _ ->
            lists:max([Rate1, Rate2])
    end.
|