25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-11-22 16:20:52 +01:00

* src/ejabberd_frontend_socket.erl: Support for frontend

connection manager
* src/ejabberd_c2s.erl: Likewise
* src/ejabberd_listener.erl: Likewise
* src/ejabberd_s2s_in.erl: Likewise
* src/ejabberd_service.erl: Likewise
* src/ejabberd_socket.erl: Likewise
* src/web/ejabberd_http_poll.erl: Likewise

SVN Revision: 657
This commit is contained in:
Alexey Shchepin 2006-10-01 01:53:37 +00:00
parent e4b3da37ef
commit c72599cacd
8 changed files with 343 additions and 56 deletions

View File

@ -1,3 +1,14 @@
2006-10-01 Alexey Shchepin <alexey@sevcom.net>
* src/ejabberd_frontend_socket.erl: Support for frontend
connection manager
* src/ejabberd_c2s.erl: Likewise
* src/ejabberd_listener.erl: Likewise
* src/ejabberd_s2s_in.erl: Likewise
* src/ejabberd_service.erl: Likewise
* src/ejabberd_socket.erl: Likewise
* src/web/ejabberd_http_poll.erl: Likewise
2006-09-27 Mickael Remond <mickael.remond@process-one.net> 2006-09-27 Mickael Remond <mickael.remond@process-one.net>
* doc/release_notes_1.1.2.txt: Minor fixes. * doc/release_notes_1.1.2.txt: Minor fixes.

View File

@ -41,6 +41,7 @@
-define(SETS, gb_sets). -define(SETS, gb_sets).
-record(state, {socket, -record(state, {socket,
sockmod,
streamid, streamid,
sasl_state, sasl_state,
access, access,
@ -117,7 +118,7 @@ get_presence(FsmRef) ->
%% ignore | %% ignore |
%% {stop, StopReason} %% {stop, StopReason}
%%---------------------------------------------------------------------- %%----------------------------------------------------------------------
init([Socket, Opts]) -> init([{SockMod, Socket}, Opts]) ->
Access = case lists:keysearch(access, 1, Opts) of Access = case lists:keysearch(access, 1, Opts) of
{value, {_, A}} -> A; {value, {_, A}} -> A;
_ -> all _ -> all
@ -137,11 +138,12 @@ init([Socket, Opts]) ->
Socket1 = Socket1 =
if if
TLSEnabled -> TLSEnabled ->
ejabberd_socket:starttls(Socket, TLSOpts); SockMod:starttls(Socket, TLSOpts);
true -> true ->
Socket Socket
end, end,
{ok, wait_for_stream, #state{socket = Socket1, {ok, wait_for_stream, #state{socket = Socket1,
sockmod = SockMod,
zlib = Zlib, zlib = Zlib,
tls = TLS, tls = TLS,
tls_required = StartTLSRequired, tls_required = StartTLSRequired,
@ -198,7 +200,8 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) ->
{xmlelement, "mechanism", [], {xmlelement, "mechanism", [],
[{xmlcdata, S}]} [{xmlcdata, S}]}
end, cyrsasl:listmech(Server)), end, cyrsasl:listmech(Server)),
SockMod = ejabberd_socket:get_sockmod( SockMod =
(StateData#state.sockmod):get_sockmod(
StateData#state.socket), StateData#state.socket),
Zlib = StateData#state.zlib, Zlib = StateData#state.zlib,
CompressFeature = CompressFeature =
@ -453,7 +456,7 @@ wait_for_feature_request({xmlstreamelement, El}, StateData) ->
TLS = StateData#state.tls, TLS = StateData#state.tls,
TLSEnabled = StateData#state.tls_enabled, TLSEnabled = StateData#state.tls_enabled,
TLSRequired = StateData#state.tls_required, TLSRequired = StateData#state.tls_required,
SockMod = ejabberd_socket:get_sockmod(StateData#state.socket), SockMod = (StateData#state.sockmod):get_sockmod(StateData#state.socket),
case {xml:get_attr_s("xmlns", Attrs), Name} of case {xml:get_attr_s("xmlns", Attrs), Name} of
{?NS_SASL, "auth"} when not ((SockMod == gen_tcp) and TLSRequired) -> {?NS_SASL, "auth"} when not ((SockMod == gen_tcp) and TLSRequired) ->
Mech = xml:get_attr_s("mechanism", Attrs), Mech = xml:get_attr_s("mechanism", Attrs),
@ -462,7 +465,8 @@ wait_for_feature_request({xmlstreamelement, El}, StateData) ->
Mech, Mech,
ClientIn) of ClientIn) of
{ok, Props} -> {ok, Props} ->
ejabberd_socket:reset_stream(StateData#state.socket), (StateData#state.sockmod):reset_stream(
StateData#state.socket),
send_element(StateData, send_element(StateData,
{xmlelement, "success", {xmlelement, "success",
[{"xmlns", ?NS_SASL}], []}), [{"xmlns", ?NS_SASL}], []}),
@ -502,9 +506,10 @@ wait_for_feature_request({xmlstreamelement, El}, StateData) ->
certfile, 1, StateData#state.tls_options)] certfile, 1, StateData#state.tls_options)]
end, end,
Socket = StateData#state.socket, Socket = StateData#state.socket,
TLSSocket = ejabberd_socket:starttls(Socket, TLSOpts), TLSSocket = (StateData#state.sockmod):starttls(
send_element(StateData, Socket, TLSOpts,
{xmlelement, "proceed", [{"xmlns", ?NS_TLS}], []}), xml:element_to_string(
{xmlelement, "proceed", [{"xmlns", ?NS_TLS}], []})),
{next_state, wait_for_stream, {next_state, wait_for_stream,
StateData#state{socket = TLSSocket, StateData#state{socket = TLSSocket,
streamid = new_id(), streamid = new_id(),
@ -523,10 +528,11 @@ wait_for_feature_request({xmlstreamelement, El}, StateData) ->
case xml:get_tag_cdata(Method) of case xml:get_tag_cdata(Method) of
"zlib" -> "zlib" ->
Socket = StateData#state.socket, Socket = StateData#state.socket,
ZlibSocket = ejabberd_socket:compress(Socket), ZlibSocket = (StateData#state.sockmod):compress(
send_element(StateData, Socket,
xml:element_to_string(
{xmlelement, "compressed", {xmlelement, "compressed",
[{"xmlns", ?NS_COMPRESS}], []}), [{"xmlns", ?NS_COMPRESS}], []})),
{next_state, wait_for_stream, {next_state, wait_for_stream,
StateData#state{socket = ZlibSocket, StateData#state{socket = ZlibSocket,
streamid = new_id() streamid = new_id()
@ -575,7 +581,8 @@ wait_for_sasl_response({xmlstreamelement, El}, StateData) ->
case cyrsasl:server_step(StateData#state.sasl_state, case cyrsasl:server_step(StateData#state.sasl_state,
ClientIn) of ClientIn) of
{ok, Props} -> {ok, Props} ->
ejabberd_socket:reset_stream(StateData#state.socket), (StateData#state.sockmod):reset_stream(
StateData#state.socket),
send_element(StateData, send_element(StateData,
{xmlelement, "success", {xmlelement, "success",
[{"xmlns", ?NS_SASL}], []}), [{"xmlns", ?NS_SASL}], []}),
@ -1167,7 +1174,7 @@ terminate(_Reason, StateName, StateData) ->
_ -> _ ->
ok ok
end, end,
ejabberd_socket:close(StateData#state.socket), (StateData#state.sockmod):close(StateData#state.socket),
ok. ok.
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
@ -1177,11 +1184,11 @@ terminate(_Reason, StateName, StateData) ->
change_shaper(StateData, JID) -> change_shaper(StateData, JID) ->
Shaper = acl:match_rule(StateData#state.server, Shaper = acl:match_rule(StateData#state.server,
StateData#state.shaper, JID), StateData#state.shaper, JID),
ejabberd_socket:change_shaper(StateData#state.socket, Shaper). (StateData#state.sockmod):change_shaper(StateData#state.socket, Shaper).
send_text(StateData, Text) -> send_text(StateData, Text) ->
?DEBUG("Send XML on stream = ~p", [lists:flatten(Text)]), ?DEBUG("Send XML on stream = ~p", [lists:flatten(Text)]),
ejabberd_socket:send(StateData#state.socket, Text). (StateData#state.sockmod):send(StateData#state.socket, Text).
send_element(StateData, El) -> send_element(StateData, El) ->
send_text(StateData, xml:element_to_string(El)). send_text(StateData, xml:element_to_string(El)).

View File

@ -0,0 +1,249 @@
%%%-------------------------------------------------------------------
%%% File : ejabberd_frontend_socket.erl
%%% Author : Alexey Shchepin <alexey@process-one.net>
%%% Purpose : Frontend socket with zlib and TLS support library
%%% Created : 23 Aug 2006 by Alexey Shchepin <alex@alex.sevcom.net>
%%% Id : $Id$
%%%----------------------------------------------------------------------
-module(ejabberd_frontend_socket).
-author('alexey@process-one.net').
-behaviour(gen_server).
%% API
-export([start/4,
start_link/4,
%connect/3,
starttls/2,
starttls/3,
compress/1,
compress/2,
reset_stream/1,
send/2,
change_shaper/2,
get_sockmod/1,
get_peer_certificate/1,
get_verify_result/1,
close/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {sockmod, socket, receiver}).
%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link(Module, SockMod, Socket, Opts) ->
gen_server:start_link(?MODULE,
[Module, SockMod, Socket, Opts], []).
start(Module, SockMod, Socket, Opts) ->
case Module:socket_type() of
xml_stream ->
MaxStanzaSize =
case lists:keysearch(max_stanza_size, 1, Opts) of
{value, {_, Size}} -> Size;
_ -> infinity
end,
Receiver = ejabberd_receiver:start(Socket, SockMod, none, MaxStanzaSize),
case SockMod:controlling_process(Socket, Receiver) of
ok ->
ok;
{error, _Reason} ->
SockMod:close(Socket)
end,
gen_server:start(?MODULE,
[Module, SockMod, Socket, Opts, Receiver], []);
raw ->
%{ok, Pid} = Module:start({SockMod, Socket}, Opts),
%case SockMod:controlling_process(Socket, Pid) of
% ok ->
% ok;
% {error, _Reason} ->
% SockMod:close(Socket)
%end
todo
end.
starttls(FsmRef, TLSOpts) ->
gen_server:call(FsmRef, {starttls, TLSOpts}),
FsmRef.
starttls(FsmRef, TLSOpts, Data) ->
gen_server:call(FsmRef, {starttls, TLSOpts, Data}),
FsmRef.
compress(FsmRef) ->
gen_server:call(FsmRef, compress),
FsmRef.
compress(FsmRef, Data) ->
gen_server:call(FsmRef, {compress, Data}),
FsmRef.
reset_stream(FsmRef) ->
gen_server:call(FsmRef, reset_stream).
send(FsmRef, Data) ->
gen_server:call(FsmRef, {send, Data}).
change_shaper(FsmRef, Shaper) ->
gen_server:call(FsmRef, {change_shaper, Shaper}).
get_sockmod(FsmRef) ->
gen_server:call(FsmRef, get_sockmod).
get_peer_certificate(FsmRef) ->
gen_server:call(FsmRef, get_peer_certificate).
get_verify_result(FsmRef) ->
gen_server:call(FsmRef, get_verify_result).
close(FsmRef) ->
gen_server:call(FsmRef, close).
%%====================================================================
%% gen_server callbacks
%%====================================================================
%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
init([Module, SockMod, Socket, Opts, Receiver]) ->
Nodes = mnesia:table_info(schema, disc_copies),
Node = lists:nth(erlang:phash(now(), length(Nodes)), Nodes),
{ok, Pid} =
rpc:call(Node, Module, start, [{?MODULE, self()}, Opts]),
ejabberd_receiver:become_controller(Receiver, Pid),
{ok, #state{sockmod = SockMod,
socket = Socket,
receiver = Receiver}}.
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
handle_call({starttls, TLSOpts}, _From, State) ->
{ok, TLSSocket} = tls:tcp_to_tls(State#state.socket, TLSOpts),
ejabberd_receiver:starttls(State#state.receiver, TLSSocket),
Reply = ok,
{reply, Reply, State#state{socket = TLSSocket, sockmod = tls}};
handle_call({starttls, TLSOpts, Data}, _From, State) ->
{ok, TLSSocket} = tls:tcp_to_tls(State#state.socket, TLSOpts),
ejabberd_receiver:starttls(State#state.receiver, TLSSocket),
catch (State#state.sockmod):send(
State#state.socket, Data),
Reply = ok,
{reply, Reply, State#state{socket = TLSSocket, sockmod = tls}};
handle_call(compress, _From, State) ->
{ok, ZlibSocket} = ejabberd_zlib:enable_zlib(
State#state.sockmod,
State#state.socket),
ejabberd_receiver:compress(State#state.receiver, ZlibSocket),
Reply = ok,
{reply, Reply, State#state{socket = ZlibSocket, sockmod = ejabberd_zlib}};
handle_call({compress, Data}, _From, State) ->
{ok, ZlibSocket} = ejabberd_zlib:enable_zlib(
State#state.sockmod,
State#state.socket),
ejabberd_receiver:compress(State#state.receiver, ZlibSocket),
catch (State#state.sockmod):send(
State#state.socket, Data),
Reply = ok,
{reply, Reply, State#state{socket = ZlibSocket, sockmod = ejabberd_zlib}};
handle_call(reset_stream, _From, State) ->
ejabberd_receiver:reset_stream(State#state.receiver),
Reply = ok,
{reply, Reply, State};
handle_call({send, Data}, _From, State) ->
catch (State#state.sockmod):send(
State#state.socket, Data),
Reply = ok,
{reply, Reply, State};
handle_call({change_shaper, Shaper}, _From, State) ->
ejabberd_receiver:change_shaper(State#state.receiver, Shaper),
Reply = ok,
{reply, Reply, State};
handle_call(get_sockmod, _From, State) ->
Reply = State#state.sockmod,
{reply, Reply, State};
handle_call(get_peer_certificate, _From, State) ->
Reply = tls:get_peer_certificate(State#state.socket),
{reply, Reply, State};
handle_call(get_verify_result, _From, State) ->
Reply = tls:get_verify_result(State#state.socket),
{reply, Reply, State};
handle_call(close, _From, State) ->
ejabberd_receiver:close(State#state.receiver),
Reply = ok,
{stop, normal, Reply, State};
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
handle_cast(_Msg, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> void()
%% Description: This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------

View File

@ -93,9 +93,8 @@ accept(ListenSocket, Module, Opts) ->
ok ok
end, end,
case Module of case Module of
ejabberd_http -> {frontend, Mod} ->
{ok, Pid} = Module:start({gen_tcp, Socket}, Opts), ejabberd_frontend_socket:start(Mod, gen_tcp, Socket, Opts);
catch gen_tcp:controlling_process(Socket, Pid);
_ -> _ ->
ejabberd_socket:start(Module, gen_tcp, Socket, Opts) ejabberd_socket:start(Module, gen_tcp, Socket, Opts)
end, end,

View File

@ -37,6 +37,7 @@
-define(DICT, dict). -define(DICT, dict).
-record(state, {socket, -record(state, {socket,
sockmod,
streamid, streamid,
shaper, shaper,
tls = false, tls = false,
@ -99,7 +100,7 @@ socket_type() ->
%% ignore | %% ignore |
%% {stop, StopReason} %% {stop, StopReason}
%%---------------------------------------------------------------------- %%----------------------------------------------------------------------
init([Socket, Opts]) -> init([{SockMod, Socket}, Opts]) ->
?INFO_MSG("started: ~p", [Socket]), ?INFO_MSG("started: ~p", [Socket]),
Shaper = case lists:keysearch(shaper, 1, Opts) of Shaper = case lists:keysearch(shaper, 1, Opts) of
{value, {_, S}} -> S; {value, {_, S}} -> S;
@ -120,6 +121,7 @@ init([Socket, Opts]) ->
Timer = erlang:start_timer(?S2STIMEOUT, self(), []), Timer = erlang:start_timer(?S2STIMEOUT, self(), []),
{ok, wait_for_stream, {ok, wait_for_stream,
#state{socket = Socket, #state{socket = Socket,
sockmod = SockMod,
streamid = new_id(), streamid = new_id(),
shaper = Shaper, shaper = Shaper,
tls = StartTLS, tls = StartTLS,
@ -144,10 +146,10 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) ->
SASL = SASL =
if if
StateData#state.tls_enabled -> StateData#state.tls_enabled ->
case ejabberd_socket:get_peer_certificate( case (StateData#state.sockmod):get_peer_certificate(
StateData#state.socket) of StateData#state.socket) of
{ok, _Cert} -> {ok, _Cert} ->
case ejabberd_socket:get_verify_result( case (StateData#state.sockmod):get_verify_result(
StateData#state.socket) of StateData#state.socket) of
0 -> 0 ->
[{xmlelement, "mechanisms", [{xmlelement, "mechanisms",
@ -204,7 +206,7 @@ wait_for_feature_request({xmlstreamelement, El}, StateData) ->
{xmlelement, Name, Attrs, Els} = El, {xmlelement, Name, Attrs, Els} = El,
TLS = StateData#state.tls, TLS = StateData#state.tls,
TLSEnabled = StateData#state.tls_enabled, TLSEnabled = StateData#state.tls_enabled,
SockMod = ejabberd_socket:get_sockmod(StateData#state.socket), SockMod = (StateData#state.sockmod):get_sockmod(StateData#state.socket),
case {xml:get_attr_s("xmlns", Attrs), Name} of case {xml:get_attr_s("xmlns", Attrs), Name} of
{?NS_TLS, "starttls"} when TLS == true, {?NS_TLS, "starttls"} when TLS == true,
TLSEnabled == false, TLSEnabled == false,
@ -212,9 +214,10 @@ wait_for_feature_request({xmlstreamelement, El}, StateData) ->
?INFO_MSG("starttls", []), ?INFO_MSG("starttls", []),
Socket = StateData#state.socket, Socket = StateData#state.socket,
TLSOpts = StateData#state.tls_options, TLSOpts = StateData#state.tls_options,
TLSSocket = ejabberd_socket:starttls(Socket, TLSOpts), TLSSocket = (StateData#state.sockmod):starttls(
send_element(StateData, Socket, TLSOpts,
{xmlelement, "proceed", [{"xmlns", ?NS_TLS}], []}), xml:element_to_string(
{xmlelement, "proceed", [{"xmlns", ?NS_TLS}], []})),
{next_state, wait_for_stream, {next_state, wait_for_stream,
StateData#state{socket = TLSSocket, StateData#state{socket = TLSSocket,
streamid = new_id(), streamid = new_id(),
@ -227,10 +230,10 @@ wait_for_feature_request({xmlstreamelement, El}, StateData) ->
Auth = jlib:decode_base64(xml:get_cdata(Els)), Auth = jlib:decode_base64(xml:get_cdata(Els)),
AuthDomain = jlib:nameprep(Auth), AuthDomain = jlib:nameprep(Auth),
AuthRes = AuthRes =
case ejabberd_socket:get_peer_certificate( case (StateData#state.sockmod):get_peer_certificate(
StateData#state.socket) of StateData#state.socket) of
{ok, Cert} -> {ok, Cert} ->
case ejabberd_socket:get_verify_result( case (StateData#state.sockmod):get_verify_result(
StateData#state.socket) of StateData#state.socket) of
0 -> 0 ->
case AuthDomain of case AuthDomain of
@ -256,7 +259,7 @@ wait_for_feature_request({xmlstreamelement, El}, StateData) ->
end, end,
if if
AuthRes -> AuthRes ->
ejabberd_socket:reset_stream( (StateData#state.sockmod):reset_stream(
StateData#state.socket), StateData#state.socket),
send_element(StateData, send_element(StateData,
{xmlelement, "success", {xmlelement, "success",
@ -500,7 +503,7 @@ handle_info(_, StateName, StateData) ->
%%---------------------------------------------------------------------- %%----------------------------------------------------------------------
terminate(Reason, _StateName, StateData) -> terminate(Reason, _StateName, StateData) ->
?INFO_MSG("terminated: ~p", [Reason]), ?INFO_MSG("terminated: ~p", [Reason]),
ejabberd_socket:close(StateData#state.socket), (StateData#state.sockmod):close(StateData#state.socket),
ok. ok.
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
@ -508,7 +511,7 @@ terminate(Reason, _StateName, StateData) ->
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
send_text(StateData, Text) -> send_text(StateData, Text) ->
ejabberd_socket:send(StateData#state.socket, Text). (StateData#state.sockmod):send(StateData#state.socket, Text).
send_element(StateData, El) -> send_element(StateData, El) ->
send_text(StateData, xml:element_to_string(El)). send_text(StateData, xml:element_to_string(El)).
@ -516,7 +519,7 @@ send_element(StateData, El) ->
change_shaper(StateData, Host, JID) -> change_shaper(StateData, Host, JID) ->
Shaper = acl:match_rule(Host, StateData#state.shaper, JID), Shaper = acl:match_rule(Host, StateData#state.shaper, JID),
ejabberd_socket:change_shaper(StateData#state.socket, Shaper). (StateData#state.sockmod):change_shaper(StateData#state.socket, Shaper).
new_id() -> new_id() ->

View File

@ -33,7 +33,7 @@
-include("ejabberd.hrl"). -include("ejabberd.hrl").
-include("jlib.hrl"). -include("jlib.hrl").
-record(state, {socket, streamid, -record(state, {socket, sockmod, streamid,
hosts, password, access}). hosts, password, access}).
%-define(DBGFSM, true). %-define(DBGFSM, true).
@ -93,7 +93,7 @@ socket_type() ->
%% ignore | %% ignore |
%% {stop, StopReason} %% {stop, StopReason}
%%---------------------------------------------------------------------- %%----------------------------------------------------------------------
init([Socket, Opts]) -> init([{SockMod, Socket}, Opts]) ->
Access = case lists:keysearch(access, 1, Opts) of Access = case lists:keysearch(access, 1, Opts) of
{value, {_, A}} -> A; {value, {_, A}} -> A;
_ -> all _ -> all
@ -124,6 +124,7 @@ init([Socket, Opts]) ->
end end
end, end,
{ok, wait_for_stream, #state{socket = Socket, {ok, wait_for_stream, #state{socket = Socket,
sockmod = SockMod,
streamid = new_id(), streamid = new_id(),
hosts = Hosts, hosts = Hosts,
password = Password, password = Password,
@ -319,7 +320,7 @@ terminate(Reason, StateName, StateData) ->
_ -> _ ->
ok ok
end, end,
ejabberd_socket:close(StateData#state.socket), (StateData#state.sockmod):close(StateData#state.socket),
ok. ok.
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
@ -327,7 +328,7 @@ terminate(Reason, StateName, StateData) ->
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
send_text(StateData, Text) -> send_text(StateData, Text) ->
ejabberd_socket:send(StateData#state.socket, Text). (StateData#state.sockmod):send(StateData#state.socket, Text).
send_element(StateData, El) -> send_element(StateData, El) ->
send_text(StateData, xml:element_to_string(El)). send_text(StateData, xml:element_to_string(El)).

View File

@ -13,7 +13,9 @@
-export([start/4, -export([start/4,
connect/3, connect/3,
starttls/2, starttls/2,
starttls/3,
compress/1, compress/1,
compress/2,
reset_stream/1, reset_stream/1,
send/2, send/2,
change_shaper/2, change_shaper/2,
@ -43,7 +45,7 @@ start(Module, SockMod, Socket, Opts) ->
SocketData = #socket_state{sockmod = SockMod, SocketData = #socket_state{sockmod = SockMod,
socket = Socket, socket = Socket,
receiver = Receiver}, receiver = Receiver},
{ok, Pid} = Module:start(SocketData, Opts), {ok, Pid} = Module:start({?MODULE, SocketData}, Opts),
case SockMod:controlling_process(Socket, Receiver) of case SockMod:controlling_process(Socket, Receiver) of
ok -> ok ->
ok; ok;
@ -58,8 +60,7 @@ start(Module, SockMod, Socket, Opts) ->
ok; ok;
{error, _Reason} -> {error, _Reason} ->
SockMod:close(Socket) SockMod:close(Socket)
end, end
ejabberd_receiver:become_controller(Pid)
end. end.
connect(Addr, Port, Opts) -> connect(Addr, Port, Opts) ->
@ -87,6 +88,12 @@ starttls(SocketData, TLSOpts) ->
ejabberd_receiver:starttls(SocketData#socket_state.receiver, TLSSocket), ejabberd_receiver:starttls(SocketData#socket_state.receiver, TLSSocket),
SocketData#socket_state{socket = TLSSocket, sockmod = tls}. SocketData#socket_state{socket = TLSSocket, sockmod = tls}.
starttls(SocketData, TLSOpts, Data) ->
{ok, TLSSocket} = tls:tcp_to_tls(SocketData#socket_state.socket, TLSOpts),
ejabberd_receiver:starttls(SocketData#socket_state.receiver, TLSSocket),
send(SocketData, Data),
SocketData#socket_state{socket = TLSSocket, sockmod = tls}.
compress(SocketData) -> compress(SocketData) ->
{ok, ZlibSocket} = ejabberd_zlib:enable_zlib( {ok, ZlibSocket} = ejabberd_zlib:enable_zlib(
SocketData#socket_state.sockmod, SocketData#socket_state.sockmod,
@ -94,6 +101,14 @@ compress(SocketData) ->
ejabberd_receiver:compress(SocketData#socket_state.receiver, ZlibSocket), ejabberd_receiver:compress(SocketData#socket_state.receiver, ZlibSocket),
SocketData#socket_state{socket = ZlibSocket, sockmod = ejabberd_zlib}. SocketData#socket_state{socket = ZlibSocket, sockmod = ejabberd_zlib}.
compress(SocketData, Data) ->
{ok, ZlibSocket} = ejabberd_zlib:enable_zlib(
SocketData#socket_state.sockmod,
SocketData#socket_state.socket),
ejabberd_receiver:compress(SocketData#socket_state.receiver, ZlibSocket),
send(SocketData, Data),
SocketData#socket_state{socket = ZlibSocket, sockmod = ejabberd_zlib}.
reset_stream(SocketData) -> reset_stream(SocketData) ->
ejabberd_receiver:reset_stream(SocketData#socket_state.receiver). ejabberd_receiver:reset_stream(SocketData#socket_state.receiver).

View File

@ -71,7 +71,7 @@ send({http_poll, FsmRef}, Packet) ->
setopts({http_poll, FsmRef}, Opts) -> setopts({http_poll, FsmRef}, Opts) ->
case lists:member({active, once}, Opts) of case lists:member({active, once}, Opts) of
true -> true ->
gen_fsm:sync_send_all_state_event(FsmRef, activate); gen_fsm:send_all_state_event(FsmRef, {activate, self()});
_ -> _ ->
ok ok
end. end.
@ -148,8 +148,9 @@ process_request(_Request) ->
init([ID, Key]) -> init([ID, Key]) ->
?INFO_MSG("started: ~p", [{ID, Key}]), ?INFO_MSG("started: ~p", [{ID, Key}]),
Opts = [], % TODO Opts = [], % TODO
{ok, C2SPid} = ejabberd_c2s:start({?MODULE, {http_poll, self()}}, Opts), ejabberd_socket:start(ejabberd_c2s, ?MODULE, {http_poll, self()}, Opts),
ejabberd_c2s:become_controller(C2SPid), %{ok, C2SPid} = ejabberd_c2s:start({?MODULE, {http_poll, self()}}, Opts),
%ejabberd_c2s:become_controller(C2SPid),
Timer = erlang:start_timer(?HTTP_POLL_TIMEOUT, self(), []), Timer = erlang:start_timer(?HTTP_POLL_TIMEOUT, self(), []),
{ok, loop, #state{id = ID, {ok, loop, #state{id = ID,
key = Key, key = Key,
@ -182,6 +183,20 @@ init([ID, Key]) ->
%% {next_state, NextStateName, NextStateData, Timeout} | %% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData} %% {stop, Reason, NewStateData}
%%---------------------------------------------------------------------- %%----------------------------------------------------------------------
handle_event({activate, From}, StateName, StateData) ->
case StateData#state.input of
"" ->
{next_state, StateName,
StateData#state{waiting_input = {From, ok}}};
Input ->
{Receiver, _Tag} = From,
Receiver ! {tcp, {http_poll, self()}, list_to_binary(Input)},
{next_state, StateName, StateData#state{input = "",
waiting_input = false,
last_receiver = From
}}
end;
handle_event(Event, StateName, StateData) -> handle_event(Event, StateName, StateData) ->
{next_state, StateName, StateData}. {next_state, StateName, StateData}.
@ -199,19 +214,6 @@ handle_sync_event({send, Packet}, From, StateName, StateData) ->
Reply = ok, Reply = ok,
{reply, Reply, StateName, StateData#state{output = Output}}; {reply, Reply, StateName, StateData#state{output = Output}};
handle_sync_event(activate, From, StateName, StateData) ->
case StateData#state.input of
"" ->
{reply, ok, StateName, StateData#state{waiting_input = From}};
Input ->
{Receiver, _Tag} = From,
Receiver ! {tcp, {http_poll, self()}, list_to_binary(Input)},
{reply, ok, StateName, StateData#state{input = "",
waiting_input = false,
last_receiver = From
}}
end;
handle_sync_event(stop, From, StateName, StateData) -> handle_sync_event(stop, From, StateName, StateData) ->
Reply = ok, Reply = ok,
{stop, normal, Reply, StateData}; {stop, normal, Reply, StateData};