25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-11-24 16:23:40 +01:00

Merge branch 'no-more-ejabberd-receivers'

Conflicts:
	rebar.config
This commit is contained in:
Evgeniy Khramtsov 2017-12-26 19:02:54 +03:00
commit 7cdc51becd
16 changed files with 561 additions and 797 deletions

View File

@ -81,7 +81,8 @@
iconv]}}. iconv]}}.
{erl_first_files, ["src/ejabberd_sql_pt.erl", "src/ejabberd_config.erl", {erl_first_files, ["src/ejabberd_sql_pt.erl", "src/ejabberd_config.erl",
"src/gen_mod.erl", "src/mod_muc_room.erl", "src/mod_push.erl"]}. "src/gen_mod.erl", "src/mod_muc_room.erl",
"src/mod_push.erl", "src/xmpp_socket.erl"]}.
{erl_opts, [nowarn_deprecated_function, {erl_opts, [nowarn_deprecated_function,
{i, "include"}, {i, "include"},

View File

@ -33,7 +33,7 @@
-export([start/2, start/3, start_link/3]). -export([start/2, start/3, start_link/3]).
-export([send_xml/2, setopts/2, controlling_process/2, -export([send_xml/2, setopts/2, controlling_process/2,
migrate/3, custom_receiver/1, become_controller/2, migrate/3, become_controller/2,
reset_stream/1, change_shaper/2, monitor/1, close/1, reset_stream/1, change_shaper/2, monitor/1, close/1,
sockname/1, peername/1, process_request/3, send/2, sockname/1, peername/1, process_request/3, send/2,
change_controller/2]). change_controller/2]).
@ -175,9 +175,6 @@ setopts({http_bind, FsmRef, _IP}, Opts) ->
controlling_process(_Socket, _Pid) -> ok. controlling_process(_Socket, _Pid) -> ok.
custom_receiver({http_bind, FsmRef, _IP}) ->
{receiver, ?MODULE, FsmRef}.
become_controller(FsmRef, C2SPid) -> become_controller(FsmRef, C2SPid) ->
p1_fsm:send_all_state_event(FsmRef, p1_fsm:send_all_state_event(FsmRef,
{become_controller, C2SPid}). {become_controller, C2SPid}).
@ -185,11 +182,11 @@ become_controller(FsmRef, C2SPid) ->
change_controller({http_bind, FsmRef, _IP}, C2SPid) -> change_controller({http_bind, FsmRef, _IP}, C2SPid) ->
become_controller(FsmRef, C2SPid). become_controller(FsmRef, C2SPid).
reset_stream({http_bind, _FsmRef, _IP}) -> ok. reset_stream({http_bind, _FsmRef, _IP} = Socket) ->
Socket.
change_shaper({http_bind, FsmRef, _IP}, Shaper) -> change_shaper({http_bind, FsmRef, _IP}, Shaper) ->
p1_fsm:send_all_state_event(FsmRef, p1_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}).
{change_shaper, Shaper}).
monitor({http_bind, FsmRef, _IP}) -> monitor({http_bind, FsmRef, _IP}) ->
erlang:monitor(process, FsmRef). erlang:monitor(process, FsmRef).
@ -306,8 +303,8 @@ init([#body{attrs = Attrs}, IP, SID]) ->
buf_new(XMPPDomain)), buf_new(XMPPDomain)),
Opts2} Opts2}
end, end,
ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket, xmpp_socket:start(ejabberd_c2s, ?MODULE, Socket,
Opts), [{receiver, self()}|Opts]),
Inactivity = gen_mod:get_module_opt(XMPPDomain, Inactivity = gen_mod:get_module_opt(XMPPDomain,
mod_bosh, max_inactivity, mod_bosh, max_inactivity,
?DEFAULT_INACTIVITY), ?DEFAULT_INACTIVITY),

View File

@ -22,11 +22,11 @@
-module(ejabberd_c2s). -module(ejabberd_c2s).
-behaviour(xmpp_stream_in). -behaviour(xmpp_stream_in).
-behaviour(ejabberd_config). -behaviour(ejabberd_config).
-behaviour(ejabberd_socket). -behaviour(xmpp_socket).
-protocol({rfc, 6121}). -protocol({rfc, 6121}).
%% ejabberd_socket callbacks %% xmpp_socket callbacks
-export([start/2, start_link/2, socket_type/0]). -export([start/2, start_link/2, socket_type/0]).
%% ejabberd_config callbacks %% ejabberd_config callbacks
-export([opt_type/1, listen_opt_type/1, transform_listen_option/2]). -export([opt_type/1, listen_opt_type/1, transform_listen_option/2]).
@ -62,7 +62,7 @@
-export_type([state/0]). -export_type([state/0]).
%%%=================================================================== %%%===================================================================
%%% ejabberd_socket API %%% xmpp_socket API
%%%=================================================================== %%%===================================================================
start(SockData, Opts) -> start(SockData, Opts) ->
case proplists:get_value(supervisor, Opts, true) of case proplists:get_value(supervisor, Opts, true) of
@ -203,16 +203,16 @@ copy_state(#{owner := Owner} = NewState,
open_session(#{user := U, server := S, resource := R, open_session(#{user := U, server := S, resource := R,
sid := SID, ip := IP, auth_module := AuthModule} = State) -> sid := SID, ip := IP, auth_module := AuthModule} = State) ->
JID = jid:make(U, S, R), JID = jid:make(U, S, R),
change_shaper(State), State1 = change_shaper(State),
Conn = get_conn_type(State), Conn = get_conn_type(State1),
State1 = State#{conn => Conn, resource => R, jid => JID}, State2 = State1#{conn => Conn, resource => R, jid => JID},
Prio = case maps:get(pres_last, State, undefined) of Prio = case maps:get(pres_last, State, undefined) of
undefined -> undefined; undefined -> undefined;
Pres -> get_priority_from_presence(Pres) Pres -> get_priority_from_presence(Pres)
end, end,
Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthModule}], Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthModule}],
ejabberd_sm:open_session(SID, U, S, R, Prio, Info), ejabberd_sm:open_session(SID, U, S, R, Prio, Info),
xmpp_stream_in:establish(State1). xmpp_stream_in:establish(State2).
%%%=================================================================== %%%===================================================================
%%% Hooks %%% Hooks
@ -264,12 +264,12 @@ reject_unauthenticated_packet(State, _Pkt) ->
process_closed(State, Reason) -> process_closed(State, Reason) ->
stop(State#{stop_reason => Reason}). stop(State#{stop_reason => Reason}).
process_terminated(#{sid := SID, sockmod := SockMod, socket := Socket, process_terminated(#{sid := SID, socket := Socket,
jid := JID, user := U, server := S, resource := R} = State, jid := JID, user := U, server := S, resource := R} = State,
Reason) -> Reason) ->
Status = format_reason(State, Reason), Status = format_reason(State, Reason),
?INFO_MSG("(~s) Closing c2s session for ~s: ~s", ?INFO_MSG("(~s) Closing c2s session for ~s: ~s",
[SockMod:pp(Socket), jid:encode(JID), Status]), [xmpp_socket:pp(Socket), jid:encode(JID), Status]),
State1 = case maps:is_key(pres_last, State) of State1 = case maps:is_key(pres_last, State) of
true -> true ->
Pres = #presence{type = unavailable, Pres = #presence{type = unavailable,
@ -285,10 +285,10 @@ process_terminated(#{sid := SID, sockmod := SockMod, socket := Socket,
end, end,
bounce_message_queue(), bounce_message_queue(),
State1; State1;
process_terminated(#{sockmod := SockMod, socket := Socket, process_terminated(#{socket := Socket,
stop_reason := {tls, _}} = State, Reason) -> stop_reason := {tls, _}} = State, Reason) ->
?WARNING_MSG("(~s) Failed to secure c2s connection: ~s", ?WARNING_MSG("(~s) Failed to secure c2s connection: ~s",
[SockMod:pp(Socket), format_reason(State, Reason)]), [xmpp_socket:pp(Socket), format_reason(State, Reason)]),
State; State;
process_terminated(State, _Reason) -> process_terminated(State, _Reason) ->
State. State.
@ -385,7 +385,7 @@ check_password_digest_fun(#{lserver := LServer}) ->
bind(<<"">>, State) -> bind(<<"">>, State) ->
bind(new_uniq_id(), State); bind(new_uniq_id(), State);
bind(R, #{user := U, server := S, access := Access, lang := Lang, bind(R, #{user := U, server := S, access := Access, lang := Lang,
lserver := LServer, sockmod := SockMod, socket := Socket, lserver := LServer, socket := Socket,
ip := IP} = State) -> ip := IP} = State) ->
case resource_conflict_action(U, S, R) of case resource_conflict_action(U, S, R) of
closenew -> closenew ->
@ -401,12 +401,12 @@ bind(R, #{user := U, server := S, access := Access, lang := Lang,
State2 = ejabberd_hooks:run_fold( State2 = ejabberd_hooks:run_fold(
c2s_session_opened, LServer, State1, []), c2s_session_opened, LServer, State1, []),
?INFO_MSG("(~s) Opened c2s session for ~s", ?INFO_MSG("(~s) Opened c2s session for ~s",
[SockMod:pp(Socket), jid:encode(JID)]), [xmpp_socket:pp(Socket), jid:encode(JID)]),
{ok, State2}; {ok, State2};
deny -> deny ->
ejabberd_hooks:run(forbidden_session_hook, LServer, [JID]), ejabberd_hooks:run(forbidden_session_hook, LServer, [JID]),
?INFO_MSG("(~s) Forbidden c2s session for ~s", ?INFO_MSG("(~s) Forbidden c2s session for ~s",
[SockMod:pp(Socket), jid:encode(JID)]), [xmpp_socket:pp(Socket), jid:encode(JID)]),
Txt = <<"Access denied by service policy">>, Txt = <<"Access denied by service policy">>,
{error, xmpp:err_not_allowed(Txt, Lang), State} {error, xmpp:err_not_allowed(Txt, Lang), State}
end end
@ -417,9 +417,9 @@ handle_stream_start(StreamStart, #{lserver := LServer} = State) ->
false -> false ->
send(State#{lserver => ?MYNAME}, xmpp:serr_host_unknown()); send(State#{lserver => ?MYNAME}, xmpp:serr_host_unknown());
true -> true ->
change_shaper(State), State1 = change_shaper(State),
ejabberd_hooks:run_fold( ejabberd_hooks:run_fold(
c2s_stream_started, LServer, State, [StreamStart]) c2s_stream_started, LServer, State1, [StreamStart])
end. end.
handle_stream_end(Reason, #{lserver := LServer} = State) -> handle_stream_end(Reason, #{lserver := LServer} = State) ->
@ -427,20 +427,20 @@ handle_stream_end(Reason, #{lserver := LServer} = State) ->
ejabberd_hooks:run_fold(c2s_closed, LServer, State1, [Reason]). ejabberd_hooks:run_fold(c2s_closed, LServer, State1, [Reason]).
handle_auth_success(User, Mech, AuthModule, handle_auth_success(User, Mech, AuthModule,
#{socket := Socket, sockmod := SockMod, #{socket := Socket,
ip := IP, lserver := LServer} = State) -> ip := IP, lserver := LServer} = State) ->
?INFO_MSG("(~s) Accepted c2s ~s authentication for ~s@~s by ~s backend from ~s", ?INFO_MSG("(~s) Accepted c2s ~s authentication for ~s@~s by ~s backend from ~s",
[SockMod:pp(Socket), Mech, User, LServer, [xmpp_socket:pp(Socket), Mech, User, LServer,
ejabberd_auth:backend_type(AuthModule), ejabberd_auth:backend_type(AuthModule),
ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), ejabberd_config:may_hide_data(misc:ip_to_list(IP))]),
State1 = State#{auth_module => AuthModule}, State1 = State#{auth_module => AuthModule},
ejabberd_hooks:run_fold(c2s_auth_result, LServer, State1, [true, User]). ejabberd_hooks:run_fold(c2s_auth_result, LServer, State1, [true, User]).
handle_auth_failure(User, Mech, Reason, handle_auth_failure(User, Mech, Reason,
#{socket := Socket, sockmod := SockMod, #{socket := Socket,
ip := IP, lserver := LServer} = State) -> ip := IP, lserver := LServer} = State) ->
?INFO_MSG("(~s) Failed c2s ~s authentication ~sfrom ~s: ~s", ?INFO_MSG("(~s) Failed c2s ~s authentication ~sfrom ~s: ~s",
[SockMod:pp(Socket), Mech, [xmpp_socket:pp(Socket), Mech,
if User /= <<"">> -> ["for ", User, "@", LServer, " "]; if User /= <<"">> -> ["for ", User, "@", LServer, " "];
true -> "" true -> ""
end, end,
@ -912,7 +912,7 @@ fix_from_to(Pkt, #{jid := JID}) when ?is_stanza(Pkt) ->
fix_from_to(Pkt, _State) -> fix_from_to(Pkt, _State) ->
Pkt. Pkt.
-spec change_shaper(state()) -> ok. -spec change_shaper(state()) -> state().
change_shaper(#{shaper := ShaperName, ip := IP, lserver := LServer, change_shaper(#{shaper := ShaperName, ip := IP, lserver := LServer,
user := U, server := S, resource := R} = State) -> user := U, server := S, resource := R} = State) ->
JID = jid:make(U, S, R), JID = jid:make(U, S, R),

View File

@ -34,7 +34,8 @@
handle_sync_event/4, code_change/4, handle_info/3, handle_sync_event/4, code_change/4, handle_info/3,
terminate/3, send_xml/2, setopts/2, sockname/1, terminate/3, send_xml/2, setopts/2, sockname/1,
peername/1, controlling_process/2, become_controller/2, peername/1, controlling_process/2, become_controller/2,
close/1, socket_handoff/6, opt_type/1]). monitor/1, reset_stream/1, close/1, change_shaper/2,
socket_handoff/6, opt_type/1]).
-include("ejabberd.hrl"). -include("ejabberd.hrl").
-include("logger.hrl"). -include("logger.hrl").
@ -105,12 +106,21 @@ peername({http_ws, _FsmRef, IP}) -> {ok, IP}.
controlling_process(_Socket, _Pid) -> ok. controlling_process(_Socket, _Pid) -> ok.
become_controller(FsmRef, C2SPid) -> become_controller(FsmRef, C2SPid) ->
p1_fsm:send_all_state_event(FsmRef, p1_fsm:send_all_state_event(FsmRef, {activate, C2SPid}).
{become_controller, C2SPid}).
close({http_ws, FsmRef, _IP}) -> close({http_ws, FsmRef, _IP}) ->
catch p1_fsm:sync_send_all_state_event(FsmRef, close). catch p1_fsm:sync_send_all_state_event(FsmRef, close).
monitor({http_ws, FsmRef, _IP}) ->
erlang:monitor(process, FsmRef).
reset_stream({http_ws, _FsmRef, _IP} = Socket) ->
Socket.
change_shaper({http_ws, _FsmRef, _IP}, _Shaper) ->
%% TODO???
ok.
socket_handoff(LocalPath, Request, Socket, SockMod, Buf, Opts) -> socket_handoff(LocalPath, Request, Socket, SockMod, Buf, Opts) ->
ejabberd_websocket:socket_handoff(LocalPath, Request, Socket, SockMod, ejabberd_websocket:socket_handoff(LocalPath, Request, Socket, SockMod,
Buf, Opts, ?MODULE, fun get_human_html_xmlel/0). Buf, Opts, ?MODULE, fun get_human_html_xmlel/0).
@ -136,8 +146,8 @@ init([{#ws{ip = IP, http_opts = HOpts}, _} = WS]) ->
Socket = {http_ws, self(), IP}, Socket = {http_ws, self(), IP},
?DEBUG("Client connected through websocket ~p", ?DEBUG("Client connected through websocket ~p",
[Socket]), [Socket]),
ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket, xmpp_socket:start(ejabberd_c2s, ?MODULE, Socket,
Opts), [{receiver, self()}|Opts]),
Timer = erlang:start_timer(WSTimeout, self(), []), Timer = erlang:start_timer(WSTimeout, self(), []),
{ok, loop, {ok, loop,
#state{socket = Socket, timeout = WSTimeout, #state{socket = Socket, timeout = WSTimeout,

View File

@ -294,7 +294,7 @@ accept(ListenSocket, Module, Opts, Interval) ->
{ok, Socket} -> {ok, Socket} ->
case {inet:sockname(Socket), inet:peername(Socket)} of case {inet:sockname(Socket), inet:peername(Socket)} of
{{ok, {Addr, Port}}, {ok, {PAddr, PPort}}} -> {{ok, {Addr, Port}}, {ok, {PAddr, PPort}}} ->
Receiver = case ejabberd_socket:start(Module, Receiver = case xmpp_socket:start(Module,
gen_tcp, Socket, Opts) of gen_tcp, Socket, Opts) of
{ok, RecvPid} -> RecvPid; {ok, RecvPid} -> RecvPid;
_ -> none _ -> none

View File

@ -1,357 +0,0 @@
%%%----------------------------------------------------------------------
%%% File : ejabberd_receiver.erl
%%% Author : Alexey Shchepin <alexey@process-one.net>
%%% Purpose : Socket receiver for C2S and S2S connections
%%% Created : 10 Nov 2003 by Alexey Shchepin <alexey@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2017 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%----------------------------------------------------------------------
-module(ejabberd_receiver).
-author('alexey@process-one.net').
-ifndef(GEN_SERVER).
-define(GEN_SERVER, gen_server).
-endif.
-behaviour(?GEN_SERVER).
-behaviour(ejabberd_config).
%% API
-export([start_link/4,
start/3,
start/4,
change_shaper/2,
reset_stream/1,
starttls/2,
compress/2,
become_controller/2,
close/1,
opt_type/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2,
handle_info/2, terminate/2, code_change/3]).
-include("ejabberd.hrl").
-include("logger.hrl").
-record(state,
{socket :: inet:socket() | fast_tls:tls_socket() | ezlib:zlib_socket(),
sock_mod = gen_tcp :: gen_tcp | fast_tls | ezlib,
shaper_state = none :: shaper:shaper(),
c2s_pid :: pid() | undefined,
max_stanza_size = infinity :: non_neg_integer() | infinity,
xml_stream_state :: fxml_stream:xml_stream_state() | undefined,
timeout = infinity:: timeout()}).
-spec start_link(inet:socket(), atom(), shaper:shaper(),
non_neg_integer() | infinity) -> ignore |
{error, any()} |
{ok, pid()}.
start_link(Socket, SockMod, Shaper, MaxStanzaSize) ->
?GEN_SERVER:start_link(?MODULE,
[Socket, SockMod, Shaper, MaxStanzaSize], []).
-spec start(inet:socket(), atom(), shaper:shaper()) -> undefined | pid().
start(Socket, SockMod, Shaper) ->
start(Socket, SockMod, Shaper, infinity).
-spec start(inet:socket(), atom(), shaper:shaper(),
non_neg_integer() | infinity) -> undefined | pid().
start(Socket, SockMod, Shaper, MaxStanzaSize) ->
{ok, Pid} = ?GEN_SERVER:start(ejabberd_receiver,
[Socket, SockMod, Shaper, MaxStanzaSize], []),
Pid.
-spec change_shaper(pid(), shaper:shaper()) -> ok.
change_shaper(Pid, Shaper) ->
?GEN_SERVER:cast(Pid, {change_shaper, Shaper}).
-spec reset_stream(pid()) -> ok | {error, any()}.
reset_stream(Pid) -> do_call(Pid, reset_stream).
-spec starttls(pid(), fast_tls:tls_socket()) -> ok | {error, any()}.
starttls(Pid, TLSSocket) ->
do_call(Pid, {starttls, TLSSocket}).
-spec compress(pid(), iodata() | undefined) -> {error, any()} |
{ok, ezlib:zlib_socket()}.
compress(Pid, Data) ->
do_call(Pid, {compress, Data}).
-spec become_controller(pid(), pid()) -> ok | {error, any()}.
become_controller(Pid, C2SPid) ->
do_call(Pid, {become_controller, C2SPid}).
-spec close(pid()) -> ok.
close(Pid) ->
?GEN_SERVER:cast(Pid, close).
%%====================================================================
%% gen_server callbacks
%%====================================================================
init([Socket, SockMod, Shaper, MaxStanzaSize]) ->
ShaperState = shaper:new(Shaper),
Timeout = case SockMod of
ssl -> 20;
_ -> infinity
end,
{ok,
#state{socket = Socket, sock_mod = SockMod,
shaper_state = ShaperState,
max_stanza_size = MaxStanzaSize, timeout = Timeout}}.
handle_call({starttls, TLSSocket}, _From, State) ->
State1 = reset_parser(State),
NewState = State1#state{socket = TLSSocket,
sock_mod = fast_tls},
case fast_tls:recv_data(TLSSocket, <<"">>) of
{ok, TLSData} ->
{reply, ok,
process_data(TLSData, NewState), hibernate_timeout()};
{error, _} = Err ->
{stop, normal, Err, NewState}
end;
handle_call({compress, Data}, _From,
#state{socket = Socket, sock_mod = SockMod} =
State) ->
ejabberd:start_app(ezlib),
{ok, ZlibSocket} = ezlib:enable_zlib(SockMod,
Socket),
if Data /= undefined -> do_send(State, Data);
true -> ok
end,
State1 = reset_parser(State),
NewState = State1#state{socket = ZlibSocket,
sock_mod = ezlib},
case ezlib:recv_data(ZlibSocket, <<"">>) of
{ok, ZlibData} ->
{reply, {ok, ZlibSocket},
process_data(ZlibData, NewState), hibernate_timeout()};
{error, _} = Err ->
{stop, normal, Err, NewState}
end;
handle_call(reset_stream, _From, State) ->
NewState = reset_parser(State),
Reply = ok,
{reply, Reply, NewState, hibernate_timeout()};
handle_call({become_controller, C2SPid}, _From, State) ->
XMLStreamState = fxml_stream:new(C2SPid, State#state.max_stanza_size),
NewState = State#state{c2s_pid = C2SPid,
xml_stream_state = XMLStreamState},
activate_socket(NewState),
Reply = ok,
{reply, Reply, NewState, hibernate_timeout()};
handle_call(_Request, _From, State) ->
Reply = ok, {reply, Reply, State, hibernate_timeout()}.
handle_cast({change_shaper, Shaper}, State) ->
NewShaperState = shaper:new(Shaper),
{noreply, State#state{shaper_state = NewShaperState},
hibernate_timeout()};
handle_cast(close, State) -> {stop, normal, State};
handle_cast(_Msg, State) ->
{noreply, State, hibernate_timeout()}.
handle_info({Tag, _TCPSocket, Data},
#state{socket = Socket, sock_mod = SockMod} = State)
when (Tag == tcp) or (Tag == ssl) or
(Tag == ejabberd_xml) ->
case SockMod of
fast_tls ->
case fast_tls:recv_data(Socket, Data) of
{ok, TLSData} ->
{noreply, process_data(TLSData, State),
hibernate_timeout()};
{error, Reason} ->
if is_binary(Reason) ->
?DEBUG("TLS error = ~s", [Reason]);
true ->
ok
end,
{stop, normal, State}
end;
ezlib ->
case ezlib:recv_data(Socket, Data) of
{ok, ZlibData} ->
{noreply, process_data(ZlibData, State),
hibernate_timeout()};
{error, _Reason} -> {stop, normal, State}
end;
_ ->
{noreply, process_data(Data, State), hibernate_timeout()}
end;
handle_info({Tag, _TCPSocket}, State)
when (Tag == tcp_closed) or (Tag == ssl_closed) ->
{stop, normal, State};
handle_info({Tag, _TCPSocket, Reason}, State)
when (Tag == tcp_error) or (Tag == ssl_error) ->
case Reason of
timeout -> {noreply, State, hibernate_timeout()};
_ -> {stop, normal, State}
end;
handle_info({timeout, _Ref, activate}, State) ->
activate_socket(State),
{noreply, State, hibernate_timeout()};
handle_info(timeout, State) ->
proc_lib:hibernate(?GEN_SERVER, enter_loop,
[?MODULE, [], State]),
{noreply, State, hibernate_timeout()};
handle_info(_Info, State) ->
{noreply, State, hibernate_timeout()}.
terminate(_Reason,
#state{xml_stream_state = XMLStreamState,
c2s_pid = C2SPid} =
State) ->
close_stream(XMLStreamState),
if C2SPid /= undefined ->
p1_fsm:send_event(C2SPid, closed);
true -> ok
end,
catch (State#state.sock_mod):close(State#state.socket),
ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
activate_socket(#state{socket = Socket,
sock_mod = SockMod}) ->
Res = case SockMod of
gen_tcp ->
inet:setopts(Socket, [{active, once}]);
_ ->
SockMod:setopts(Socket, [{active, once}])
end,
case Res of
{error, _Reason} -> self() ! {tcp_closed, Socket};
ok -> ok
end.
%% Data processing for connectors directly generating xmlelement in
%% Erlang data structure.
%% WARNING: Shaper does not work with Erlang data structure.
process_data([], State) ->
activate_socket(State), State;
process_data([Element | Els],
#state{c2s_pid = C2SPid} = State)
when element(1, Element) == xmlel;
element(1, Element) == xmlstreamstart;
element(1, Element) == xmlstreamelement;
element(1, Element) == xmlstreamend ->
if C2SPid == undefined -> State;
true ->
catch p1_fsm:send_event(C2SPid,
element_wrapper(Element)),
process_data(Els, State)
end;
%% Data processing for connectors receivind data as string.
process_data(Data,
#state{xml_stream_state = XMLStreamState,
shaper_state = ShaperState, c2s_pid = C2SPid} =
State) ->
?DEBUG("Received XML on stream = ~p", [(Data)]),
XMLStreamState1 = case XMLStreamState of
undefined ->
XMLStreamState;
_ ->
fxml_stream:parse(XMLStreamState, Data)
end,
{NewShaperState, Pause} = shaper:update(ShaperState, byte_size(Data)),
if
C2SPid == undefined ->
ok;
Pause > 0 ->
erlang:start_timer(Pause, self(), activate);
true ->
activate_socket(State)
end,
State#state{xml_stream_state = XMLStreamState1,
shaper_state = NewShaperState}.
%% Element coming from XML parser are wrapped inside xmlstreamelement
%% When we receive directly xmlelement tuple (from a socket module
%% speaking directly Erlang XML), we wrap it inside the same
%% xmlstreamelement coming from the XML parser.
element_wrapper(XMLElement)
when element(1, XMLElement) == xmlel ->
{xmlstreamelement, XMLElement};
element_wrapper(Element) -> Element.
close_stream(undefined) -> ok;
close_stream(XMLStreamState) ->
fxml_stream:close(XMLStreamState).
reset_parser(#state{xml_stream_state = undefined} = State) ->
State;
reset_parser(#state{c2s_pid = C2SPid,
max_stanza_size = MaxStanzaSize,
xml_stream_state = XMLStreamState}
= State) ->
NewStreamState = try fxml_stream:reset(XMLStreamState)
catch error:_ ->
close_stream(XMLStreamState),
case C2SPid of
undefined ->
undefined;
_ ->
fxml_stream:new(C2SPid, MaxStanzaSize)
end
end,
State#state{xml_stream_state = NewStreamState}.
do_send(State, Data) ->
(State#state.sock_mod):send(State#state.socket, Data).
do_call(Pid, Msg) ->
try ?GEN_SERVER:call(Pid, Msg) of
Res -> Res
catch _:{timeout, _} ->
{error, timeout};
_:_ ->
{error, einval}
end.
hibernate_timeout() ->
ejabberd_config:get_option(receiver_hibernate, timer:seconds(90)).
-spec opt_type(receiver_hibernate) -> fun((pos_integer() | hibernate) ->
pos_integer() | hibernate);
(atom()) -> [atom()].
opt_type(receiver_hibernate) ->
fun(I) when is_integer(I), I>0 -> I;
(hibernate) -> hibernate
end;
opt_type(_) ->
[receiver_hibernate].

View File

@ -21,9 +21,9 @@
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
-module(ejabberd_s2s_in). -module(ejabberd_s2s_in).
-behaviour(xmpp_stream_in). -behaviour(xmpp_stream_in).
-behaviour(ejabberd_socket). -behaviour(xmpp_socket).
%% ejabberd_socket callbacks %% xmpp_socket callbacks
-export([start/2, start_link/2, socket_type/0]). -export([start/2, start_link/2, socket_type/0]).
%% ejabberd_listener callbacks %% ejabberd_listener callbacks
-export([listen_opt_type/1]). -export([listen_opt_type/1]).
@ -180,31 +180,29 @@ handle_stream_established(State) ->
set_idle_timeout(State#{established => true}). set_idle_timeout(State#{established => true}).
handle_auth_success(RServer, Mech, _AuthModule, handle_auth_success(RServer, Mech, _AuthModule,
#{sockmod := SockMod, #{socket := Socket, ip := IP,
socket := Socket, ip := IP,
auth_domains := AuthDomains, auth_domains := AuthDomains,
server_host := ServerHost, server_host := ServerHost,
lserver := LServer} = State) -> lserver := LServer} = State) ->
?INFO_MSG("(~s) Accepted inbound s2s ~s authentication ~s -> ~s (~s)", ?INFO_MSG("(~s) Accepted inbound s2s ~s authentication ~s -> ~s (~s)",
[SockMod:pp(Socket), Mech, RServer, LServer, [xmpp_socket:pp(Socket), Mech, RServer, LServer,
ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), ejabberd_config:may_hide_data(misc:ip_to_list(IP))]),
State1 = case ejabberd_s2s:allow_host(ServerHost, RServer) of State1 = case ejabberd_s2s:allow_host(ServerHost, RServer) of
true -> true ->
AuthDomains1 = sets:add_element(RServer, AuthDomains), AuthDomains1 = sets:add_element(RServer, AuthDomains),
change_shaper(State, RServer), State0 = change_shaper(State, RServer),
State#{auth_domains => AuthDomains1}; State0#{auth_domains => AuthDomains1};
false -> false ->
State State
end, end,
ejabberd_hooks:run_fold(s2s_in_auth_result, ServerHost, State1, [true, RServer]). ejabberd_hooks:run_fold(s2s_in_auth_result, ServerHost, State1, [true, RServer]).
handle_auth_failure(RServer, Mech, Reason, handle_auth_failure(RServer, Mech, Reason,
#{sockmod := SockMod, #{socket := Socket, ip := IP,
socket := Socket, ip := IP,
server_host := ServerHost, server_host := ServerHost,
lserver := LServer} = State) -> lserver := LServer} = State) ->
?INFO_MSG("(~s) Failed inbound s2s ~s authentication ~s -> ~s (~s): ~s", ?INFO_MSG("(~s) Failed inbound s2s ~s authentication ~s -> ~s (~s): ~s",
[SockMod:pp(Socket), Mech, RServer, LServer, [xmpp_socket:pp(Socket), Mech, RServer, LServer,
ejabberd_config:may_hide_data(misc:ip_to_list(IP)), Reason]), ejabberd_config:may_hide_data(misc:ip_to_list(IP)), Reason]),
ejabberd_hooks:run_fold(s2s_in_auth_result, ejabberd_hooks:run_fold(s2s_in_auth_result,
ServerHost, State, [false, RServer]). ServerHost, State, [false, RServer]).
@ -286,11 +284,11 @@ handle_info(Info, #{server_host := LServer} = State) ->
ejabberd_hooks:run_fold(s2s_in_handle_info, LServer, State, [Info]). ejabberd_hooks:run_fold(s2s_in_handle_info, LServer, State, [Info]).
terminate(Reason, #{auth_domains := AuthDomains, terminate(Reason, #{auth_domains := AuthDomains,
sockmod := SockMod, socket := Socket} = State) -> socket := Socket} = State) ->
case maps:get(stop_reason, State, undefined) of case maps:get(stop_reason, State, undefined) of
{tls, _} = Err -> {tls, _} = Err ->
?WARNING_MSG("(~s) Failed to secure inbound s2s connection: ~s", ?WARNING_MSG("(~s) Failed to secure inbound s2s connection: ~s",
[SockMod:pp(Socket), xmpp_stream_in:format_error(Err)]); [xmpp_socket:pp(Socket), xmpp_stream_in:format_error(Err)]);
_ -> _ ->
ok ok
end, end,
@ -340,7 +338,7 @@ set_idle_timeout(#{server_host := LServer,
set_idle_timeout(State) -> set_idle_timeout(State) ->
State. State.
-spec change_shaper(state(), binary()) -> ok. -spec change_shaper(state(), binary()) -> state().
change_shaper(#{shaper := ShaperName, server_host := ServerHost} = State, change_shaper(#{shaper := ShaperName, server_host := ServerHost} = State,
RServer) -> RServer) ->
Shaper = acl:match_rule(ServerHost, ShaperName, jid:make(RServer)), Shaper = acl:match_rule(ServerHost, ShaperName, jid:make(RServer)),

View File

@ -61,12 +61,12 @@ start(From, To, Opts) ->
Res -> Res Res -> Res
end; end;
_ -> _ ->
xmpp_stream_out:start(?MODULE, [ejabberd_socket, From, To, Opts], xmpp_stream_out:start(?MODULE, [xmpp_socket, From, To, Opts],
ejabberd_config:fsm_limit_opts([])) ejabberd_config:fsm_limit_opts([]))
end. end.
start_link(From, To, Opts) -> start_link(From, To, Opts) ->
xmpp_stream_out:start_link(?MODULE, [ejabberd_socket, From, To, Opts], xmpp_stream_out:start_link(?MODULE, [xmpp_socket, From, To, Opts],
ejabberd_config:fsm_limit_opts([])). ejabberd_config:fsm_limit_opts([])).
-spec connect(pid()) -> ok. -spec connect(pid()) -> ok.
@ -210,24 +210,22 @@ dns_retries(#{server := LServer}) ->
dns_timeout(#{server := LServer}) -> dns_timeout(#{server := LServer}) ->
ejabberd_config:get_option({s2s_dns_timeout, LServer}, timer:seconds(10)). ejabberd_config:get_option({s2s_dns_timeout, LServer}, timer:seconds(10)).
handle_auth_success(Mech, #{sockmod := SockMod, handle_auth_success(Mech, #{socket := Socket, ip := IP,
socket := Socket, ip := IP,
remote_server := RServer, remote_server := RServer,
server_host := ServerHost, server_host := ServerHost,
server := LServer} = State) -> server := LServer} = State) ->
?INFO_MSG("(~s) Accepted outbound s2s ~s authentication ~s -> ~s (~s)", ?INFO_MSG("(~s) Accepted outbound s2s ~s authentication ~s -> ~s (~s)",
[SockMod:pp(Socket), Mech, LServer, RServer, [xmpp_socket:pp(Socket), Mech, LServer, RServer,
ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), ejabberd_config:may_hide_data(misc:ip_to_list(IP))]),
ejabberd_hooks:run_fold(s2s_out_auth_result, ServerHost, State, [true]). ejabberd_hooks:run_fold(s2s_out_auth_result, ServerHost, State, [true]).
handle_auth_failure(Mech, Reason, handle_auth_failure(Mech, Reason,
#{sockmod := SockMod, #{socket := Socket, ip := IP,
socket := Socket, ip := IP,
remote_server := RServer, remote_server := RServer,
server_host := ServerHost, server_host := ServerHost,
server := LServer} = State) -> server := LServer} = State) ->
?INFO_MSG("(~s) Failed outbound s2s ~s authentication ~s -> ~s (~s): ~s", ?INFO_MSG("(~s) Failed outbound s2s ~s authentication ~s -> ~s (~s): ~s",
[SockMod:pp(Socket), Mech, LServer, RServer, [xmpp_socket:pp(Socket), Mech, LServer, RServer,
ejabberd_config:may_hide_data(misc:ip_to_list(IP)), ejabberd_config:may_hide_data(misc:ip_to_list(IP)),
xmpp_stream_out:format_error(Reason)]), xmpp_stream_out:format_error(Reason)]),
ejabberd_hooks:run_fold(s2s_out_auth_result, ServerHost, State, [{false, Reason}]). ejabberd_hooks:run_fold(s2s_out_auth_result, ServerHost, State, [{false, Reason}]).

View File

@ -21,11 +21,11 @@
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
-module(ejabberd_service). -module(ejabberd_service).
-behaviour(xmpp_stream_in). -behaviour(xmpp_stream_in).
-behaviour(ejabberd_socket). -behaviour(xmpp_socket).
-protocol({xep, 114, '1.6'}). -protocol({xep, 114, '1.6'}).
%% ejabberd_socket callbacks %% xmpp_socket callbacks
-export([start/2, start_link/2, socket_type/0, close/1, close/2]). -export([start/2, start_link/2, socket_type/0, close/1, close/2]).
%% ejabberd_listener callbacks %% ejabberd_listener callbacks
-export([listen_opt_type/1, transform_listen_option/2]). -export([listen_opt_type/1, transform_listen_option/2]).
@ -100,8 +100,8 @@ init([State, Opts]) ->
false -> [compression_none | TLSOpts1]; false -> [compression_none | TLSOpts1];
true -> TLSOpts1 true -> TLSOpts1
end, end,
xmpp_stream_in:change_shaper(State, Shaper), State1 = xmpp_stream_in:change_shaper(State, Shaper),
State1 = State#{access => Access, State2 = State1#{access => Access,
xmlns => ?NS_COMPONENT, xmlns => ?NS_COMPONENT,
lang => ?MYLANG, lang => ?MYLANG,
server => ?MYNAME, server => ?MYNAME,
@ -109,7 +109,7 @@ init([State, Opts]) ->
stream_version => undefined, stream_version => undefined,
tls_options => TLSOpts, tls_options => TLSOpts,
check_from => CheckFrom}, check_from => CheckFrom},
ejabberd_hooks:run_fold(component_init, {ok, State1}, [Opts]). ejabberd_hooks:run_fold(component_init, {ok, State2}, [Opts]).
handle_stream_start(_StreamStart, handle_stream_start(_StreamStart,
#{remote_server := RemoteServer, #{remote_server := RemoteServer,
@ -135,8 +135,7 @@ handle_stream_start(_StreamStart,
end. end.
get_password_fun(#{remote_server := RemoteServer, get_password_fun(#{remote_server := RemoteServer,
socket := Socket, sockmod := SockMod, socket := Socket, ip := IP,
ip := IP,
host_opts := HostOpts}) -> host_opts := HostOpts}) ->
fun(_) -> fun(_) ->
case dict:find(RemoteServer, HostOpts) of case dict:find(RemoteServer, HostOpts) of
@ -145,7 +144,7 @@ get_password_fun(#{remote_server := RemoteServer,
error -> error ->
?INFO_MSG("(~s) Domain ~s is unconfigured for " ?INFO_MSG("(~s) Domain ~s is unconfigured for "
"external component from ~s", "external component from ~s",
[SockMod:pp(Socket), RemoteServer, [xmpp_socket:pp(Socket), RemoteServer,
ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), ejabberd_config:may_hide_data(misc:ip_to_list(IP))]),
{false, undefined} {false, undefined}
end end
@ -153,11 +152,10 @@ get_password_fun(#{remote_server := RemoteServer,
handle_auth_success(_, Mech, _, handle_auth_success(_, Mech, _,
#{remote_server := RemoteServer, host_opts := HostOpts, #{remote_server := RemoteServer, host_opts := HostOpts,
socket := Socket, sockmod := SockMod, socket := Socket, ip := IP} = State) ->
ip := IP} = State) ->
?INFO_MSG("(~s) Accepted external component ~s authentication " ?INFO_MSG("(~s) Accepted external component ~s authentication "
"for ~s from ~s", "for ~s from ~s",
[SockMod:pp(Socket), Mech, RemoteServer, [xmpp_socket:pp(Socket), Mech, RemoteServer,
ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), ejabberd_config:may_hide_data(misc:ip_to_list(IP))]),
lists:foreach( lists:foreach(
fun (H) -> fun (H) ->
@ -168,11 +166,10 @@ handle_auth_success(_, Mech, _,
handle_auth_failure(_, Mech, Reason, handle_auth_failure(_, Mech, Reason,
#{remote_server := RemoteServer, #{remote_server := RemoteServer,
sockmod := SockMod,
socket := Socket, ip := IP} = State) -> socket := Socket, ip := IP} = State) ->
?INFO_MSG("(~s) Failed external component ~s authentication " ?INFO_MSG("(~s) Failed external component ~s authentication "
"for ~s from ~s: ~s", "for ~s from ~s: ~s",
[SockMod:pp(Socket), Mech, RemoteServer, [xmpp_socket:pp(Socket), Mech, RemoteServer,
ejabberd_config:may_hide_data(misc:ip_to_list(IP)), ejabberd_config:may_hide_data(misc:ip_to_list(IP)),
Reason]), Reason]),
State. State.

View File

@ -1,293 +0,0 @@
%%%----------------------------------------------------------------------
%%% File : ejabberd_socket.erl
%%% Author : Alexey Shchepin <alexey@process-one.net>
%%% Purpose : Socket with zlib and TLS support library
%%% Created : 23 Aug 2006 by Alexey Shchepin <alexey@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2017 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%----------------------------------------------------------------------
-module(ejabberd_socket).
-author('alexey@process-one.net').
%% API
-export([start/4,
connect/3,
connect/4,
connect/5,
starttls/2,
compress/1,
compress/2,
reset_stream/1,
send_element/2,
send_header/2,
send_trailer/1,
send/2,
send_xml/2,
change_shaper/2,
monitor/1,
get_sockmod/1,
get_transport/1,
get_peer_certificate/2,
get_verify_result/1,
close/1,
pp/1,
sockname/1, peername/1]).
-include("ejabberd.hrl").
-include("xmpp.hrl").
-include("logger.hrl").
-type sockmod() :: ejabberd_bosh |
ejabberd_http_ws |
gen_tcp | fast_tls | ezlib.
-type receiver() :: pid () | atom().
-type socket() :: pid() | inet:socket() |
fast_tls:tls_socket() |
ezlib:zlib_socket() |
ejabberd_bosh:bosh_socket() |
ejabberd_http_ws:ws_socket().
-record(socket_state, {sockmod = gen_tcp :: sockmod(),
socket = self() :: socket(),
receiver = self() :: receiver()}).
-type socket_state() :: #socket_state{}.
-export_type([socket/0, socket_state/0, sockmod/0]).
-callback start({module(), socket_state()},
[proplists:property()]) -> {ok, pid()} | {error, term()} | ignore.
-callback start_link({module(), socket_state()},
[proplists:property()]) -> {ok, pid()} | {error, term()} | ignore.
-callback socket_type() -> xml_stream | independent | raw.
-define(is_http_socket(S),
(S#socket_state.sockmod == ejabberd_bosh orelse
S#socket_state.sockmod == ejabberd_http_ws)).
%%====================================================================
%% API
%%====================================================================
-spec start(atom(), sockmod(), socket(), [proplists:property()])
-> {ok, pid() | independent} | {error, inet:posix() | any()} | ignore.
start(Module, SockMod, Socket, Opts) ->
case Module:socket_type() of
independent -> {ok, independent};
xml_stream ->
MaxStanzaSize = proplists:get_value(max_stanza_size, Opts, infinity),
{ReceiverMod, Receiver, RecRef} =
try SockMod:custom_receiver(Socket) of
{receiver, RecMod, RecPid} ->
{RecMod, RecPid, RecMod}
catch _:_ ->
RecPid = ejabberd_receiver:start(
Socket, SockMod, none, MaxStanzaSize),
{ejabberd_receiver, RecPid, RecPid}
end,
SocketData = #socket_state{sockmod = SockMod,
socket = Socket, receiver = RecRef},
case Module:start({?MODULE, SocketData}, Opts) of
{ok, Pid} ->
case SockMod:controlling_process(Socket, Receiver) of
ok ->
ReceiverMod:become_controller(Receiver, Pid),
{ok, Receiver};
Err ->
SockMod:close(Socket),
Err
end;
Err ->
SockMod:close(Socket),
case ReceiverMod of
ejabberd_receiver -> ReceiverMod:close(Receiver);
_ -> ok
end,
Err
end;
raw ->
case Module:start({SockMod, Socket}, Opts) of
{ok, Pid} ->
case SockMod:controlling_process(Socket, Pid) of
ok ->
{ok, Pid};
{error, _} = Err ->
SockMod:close(Socket),
Err
end;
Err ->
SockMod:close(Socket),
Err
end
end.
connect(Addr, Port, Opts) ->
connect(Addr, Port, Opts, infinity, self()).
connect(Addr, Port, Opts, Timeout) ->
connect(Addr, Port, Opts, Timeout, self()).
connect(Addr, Port, Opts, Timeout, Owner) ->
case gen_tcp:connect(Addr, Port, Opts, Timeout) of
{ok, Socket} ->
Receiver = ejabberd_receiver:start(Socket, gen_tcp,
none),
SocketData = #socket_state{sockmod = gen_tcp,
socket = Socket, receiver = Receiver},
case gen_tcp:controlling_process(Socket, Receiver) of
ok ->
ejabberd_receiver:become_controller(Receiver, Owner),
{ok, SocketData};
{error, _Reason} = Error -> gen_tcp:close(Socket), Error
end;
{error, _Reason} = Error -> Error
end.
starttls(#socket_state{socket = Socket,
receiver = Receiver} = SocketData, TLSOpts) ->
case fast_tls:tcp_to_tls(Socket, TLSOpts) of
{ok, TLSSocket} ->
case ejabberd_receiver:starttls(Receiver, TLSSocket) of
ok ->
{ok, SocketData#socket_state{socket = TLSSocket,
sockmod = fast_tls}};
{error, _} = Err ->
Err
end;
{error, _} = Err ->
Err
end.
compress(SocketData) -> compress(SocketData, undefined).
compress(SocketData, Data) ->
case ejabberd_receiver:compress(SocketData#socket_state.receiver, Data) of
{ok, ZlibSocket} ->
{ok, SocketData#socket_state{socket = ZlibSocket, sockmod = ezlib}};
Err ->
?ERROR_MSG("compress failed: ~p", [Err]),
Err
end.
reset_stream(SocketData)
when is_pid(SocketData#socket_state.receiver) ->
ejabberd_receiver:reset_stream(SocketData#socket_state.receiver);
reset_stream(SocketData)
when is_atom(SocketData#socket_state.receiver) ->
(SocketData#socket_state.receiver):reset_stream(SocketData#socket_state.socket).
-spec send_element(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}.
send_element(SocketData, El) when ?is_http_socket(SocketData) ->
send_xml(SocketData, {xmlstreamelement, El});
send_element(SocketData, El) ->
send(SocketData, fxml:element_to_binary(El)).
-spec send_header(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}.
send_header(SocketData, El) when ?is_http_socket(SocketData) ->
send_xml(SocketData, {xmlstreamstart, El#xmlel.name, El#xmlel.attrs});
send_header(SocketData, El) ->
send(SocketData, fxml:element_to_header(El)).
-spec send_trailer(socket_state()) -> ok | {error, inet:posix()}.
send_trailer(SocketData) when ?is_http_socket(SocketData) ->
send_xml(SocketData, {xmlstreamend, <<"stream:stream">>});
send_trailer(SocketData) ->
send(SocketData, <<"</stream:stream>">>).
-spec send(socket_state(), iodata()) -> ok | {error, closed | inet:posix()}.
send(#socket_state{sockmod = SockMod, socket = Socket} = SocketData, Data) ->
?DEBUG("(~s) Send XML on stream = ~p", [pp(SocketData), Data]),
try SockMod:send(Socket, Data) of
{error, einval} -> {error, closed};
Result -> Result
catch _:badarg ->
%% Some modules throw badarg exceptions on closed sockets
%% TODO: their code should be improved
{error, closed}
end.
-spec send_xml(socket_state(),
{xmlstreamelement, fxml:xmlel()} |
{xmlstreamstart, binary(), [{binary(), binary()}]} |
{xmlstreamend, binary()} |
{xmlstreamraw, iodata()}) -> term().
send_xml(SocketData, El) ->
(SocketData#socket_state.sockmod):send_xml(SocketData#socket_state.socket, El).
change_shaper(SocketData, Shaper)
when is_pid(SocketData#socket_state.receiver) ->
ejabberd_receiver:change_shaper(SocketData#socket_state.receiver,
Shaper);
change_shaper(SocketData, Shaper)
when is_atom(SocketData#socket_state.receiver) ->
(SocketData#socket_state.receiver):change_shaper(SocketData#socket_state.socket,
Shaper).
monitor(SocketData)
when is_pid(SocketData#socket_state.receiver) ->
erlang:monitor(process,
SocketData#socket_state.receiver);
monitor(SocketData)
when is_atom(SocketData#socket_state.receiver) ->
(SocketData#socket_state.receiver):monitor(SocketData#socket_state.socket).
get_sockmod(SocketData) ->
SocketData#socket_state.sockmod.
get_transport(#socket_state{sockmod = SockMod,
socket = Socket}) ->
case SockMod of
gen_tcp -> tcp;
fast_tls -> tls;
ezlib ->
case ezlib:get_sockmod(Socket) of
gen_tcp -> tcp_zlib;
fast_tls -> tls_zlib
end;
ejabberd_bosh -> http_bind;
ejabberd_http_ws -> websocket
end.
get_peer_certificate(SocketData, Type) ->
fast_tls:get_peer_certificate(SocketData#socket_state.socket, Type).
get_verify_result(SocketData) ->
fast_tls:get_verify_result(SocketData#socket_state.socket).
close(SocketData) ->
ejabberd_receiver:close(SocketData#socket_state.receiver).
sockname(#socket_state{sockmod = SockMod,
socket = Socket}) ->
case SockMod of
gen_tcp -> inet:sockname(Socket);
_ -> SockMod:sockname(Socket)
end.
peername(#socket_state{sockmod = SockMod,
socket = Socket}) ->
case SockMod of
gen_tcp -> inet:peername(Socket);
_ -> SockMod:peername(Socket)
end.
pp(#socket_state{receiver = Receiver} = State) ->
Transport = get_transport(State),
io_lib:format("~s|~w", [Transport, Receiver]).

View File

@ -139,14 +139,13 @@ s2s_out_auth_result(#{db_verify := _} = State, _) ->
%% in section 2.1.2, step 2 %% in section 2.1.2, step 2
{stop, send_verify_request(State)}; {stop, send_verify_request(State)};
s2s_out_auth_result(#{db_enabled := true, s2s_out_auth_result(#{db_enabled := true,
sockmod := SockMod,
socket := Socket, ip := IP, socket := Socket, ip := IP,
server := LServer, server := LServer,
remote_server := RServer} = State, {false, _}) -> remote_server := RServer} = State, {false, _}) ->
%% SASL authentication has failed, retrying with dialback %% SASL authentication has failed, retrying with dialback
%% Sending dialback request, section 2.1.1, step 1 %% Sending dialback request, section 2.1.1, step 1
?INFO_MSG("(~s) Retrying with s2s dialback authentication: ~s -> ~s (~s)", ?INFO_MSG("(~s) Retrying with s2s dialback authentication: ~s -> ~s (~s)",
[SockMod:pp(Socket), LServer, RServer, [xmpp_socket:pp(Socket), LServer, RServer,
ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), ejabberd_config:may_hide_data(misc:ip_to_list(IP))]),
State1 = maps:remove(stop_reason, State#{on_route => queue}), State1 = maps:remove(stop_reason, State#{on_route => queue}),
{stop, send_db_request(State1)}; {stop, send_db_request(State1)};
@ -159,7 +158,6 @@ s2s_out_downgraded(#{db_verify := _} = State, _) ->
%% section 2.1.2, step 2 %% section 2.1.2, step 2
{stop, send_verify_request(State)}; {stop, send_verify_request(State)};
s2s_out_downgraded(#{db_enabled := true, s2s_out_downgraded(#{db_enabled := true,
sockmod := SockMod,
socket := Socket, ip := IP, socket := Socket, ip := IP,
server := LServer, server := LServer,
remote_server := RServer} = State, _) -> remote_server := RServer} = State, _) ->
@ -167,7 +165,7 @@ s2s_out_downgraded(#{db_enabled := true,
%% section 2.1.1, step 1 %% section 2.1.1, step 1
?INFO_MSG("(~s) Trying s2s dialback authentication with " ?INFO_MSG("(~s) Trying s2s dialback authentication with "
"non-RFC compliant server: ~s -> ~s (~s)", "non-RFC compliant server: ~s -> ~s (~s)",
[SockMod:pp(Socket), LServer, RServer, [xmpp_socket:pp(Socket), LServer, RServer,
ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), ejabberd_config:may_hide_data(misc:ip_to_list(IP))]),
{stop, send_db_request(State)}; {stop, send_db_request(State)};
s2s_out_downgraded(State, _) -> s2s_out_downgraded(State, _) ->

View File

@ -389,7 +389,7 @@ handle_a(State, #sm_a{h = H}) ->
resend_rack(State1). resend_rack(State1).
-spec handle_resume(state(), sm_resume()) -> {ok, state()} | {error, state()}. -spec handle_resume(state(), sm_resume()) -> {ok, state()} | {error, state()}.
handle_resume(#{user := User, lserver := LServer, sockmod := SockMod, handle_resume(#{user := User, lserver := LServer,
lang := Lang, socket := Socket} = State, lang := Lang, socket := Socket} = State,
#sm_resume{h = H, previd = PrevID, xmlns = Xmlns}) -> #sm_resume{h = H, previd = PrevID, xmlns = Xmlns}) ->
R = case inherit_session_state(State, PrevID) of R = case inherit_session_state(State, PrevID) of
@ -416,7 +416,7 @@ handle_resume(#{user := User, lserver := LServer, sockmod := SockMod,
State4 = send(State3, #sm_r{xmlns = AttrXmlns}), State4 = send(State3, #sm_r{xmlns = AttrXmlns}),
State5 = ejabberd_hooks:run_fold(c2s_session_resumed, LServer, State4, []), State5 = ejabberd_hooks:run_fold(c2s_session_resumed, LServer, State4, []),
?INFO_MSG("(~s) Resumed session for ~s", ?INFO_MSG("(~s) Resumed session for ~s",
[SockMod:pp(Socket), jid:encode(JID)]), [xmpp_socket:pp(Socket), jid:encode(JID)]),
{ok, State5}; {ok, State5};
{error, El, Msg} -> {error, El, Msg} ->
?INFO_MSG("Cannot resume session for ~s@~s: ~s", ?INFO_MSG("Cannot resume session for ~s@~s: ~s",

386
src/xmpp_socket.erl Normal file
View File

@ -0,0 +1,386 @@
%%%----------------------------------------------------------------------
%%% File : xmpp_socket.erl
%%% Author : Alexey Shchepin <alexey@process-one.net>
%%% Purpose : Socket with zlib and TLS support library
%%% Created : 23 Aug 2006 by Alexey Shchepin <alexey@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2017 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%----------------------------------------------------------------------
-module(xmpp_socket).
-author('alexey@process-one.net').
%% API
-export([start/4,
connect/3,
connect/4,
connect/5,
starttls/2,
compress/1,
compress/2,
reset_stream/1,
send_element/2,
send_header/2,
send_trailer/1,
send/2,
send_xml/2,
recv/2,
activate/1,
change_shaper/2,
monitor/1,
get_sockmod/1,
get_transport/1,
get_peer_certificate/2,
get_verify_result/1,
close/1,
pp/1,
sockname/1, peername/1]).
-include("ejabberd.hrl").
-include("xmpp.hrl").
-include("logger.hrl").
-type sockmod() :: ejabberd_bosh |
ejabberd_http_ws |
gen_tcp | fast_tls | ezlib.
-type receiver() :: atom().
-type socket() :: pid() | inet:socket() |
fast_tls:tls_socket() |
ezlib:zlib_socket() |
ejabberd_bosh:bosh_socket() |
ejabberd_http_ws:ws_socket().
-record(socket_state, {sockmod = gen_tcp :: sockmod(),
socket :: socket(),
max_stanza_size = infinity :: timeout(),
xml_stream :: undefined | fxml_stream:xml_stream_state(),
shaper = none :: none | shaper:shaper(),
receiver :: receiver()}).
-type socket_state() :: #socket_state{}.
-export_type([socket/0, socket_state/0, sockmod/0]).
-callback start({module(), socket_state()},
[proplists:property()]) -> {ok, pid()} | {error, term()} | ignore.
-callback start_link({module(), socket_state()},
[proplists:property()]) -> {ok, pid()} | {error, term()} | ignore.
-callback socket_type() -> xml_stream | independent | raw.
-define(is_http_socket(S),
(S#socket_state.sockmod == ejabberd_bosh orelse
S#socket_state.sockmod == ejabberd_http_ws)).
%%====================================================================
%% API
%%====================================================================
-spec start(atom(), sockmod(), socket(), [proplists:property()])
-> {ok, pid() | independent} | {error, inet:posix() | any()} | ignore.
start(Module, SockMod, Socket, Opts) ->
try
case Module:socket_type() of
independent ->
{ok, independent};
xml_stream ->
MaxStanzaSize = proplists:get_value(max_stanza_size, Opts, infinity),
Receiver = proplists:get_value(receiver, Opts),
SocketData = #socket_state{sockmod = SockMod,
socket = Socket,
receiver = Receiver,
max_stanza_size = MaxStanzaSize},
{ok, Pid} = Module:start({?MODULE, SocketData}, Opts),
Receiver1 = if is_pid(Receiver) -> Receiver;
true -> Pid
end,
ok = controlling_process(SocketData, Receiver1),
ok = become_controller(SocketData, Pid),
{ok, Receiver1};
raw ->
{ok, Pid} = Module:start({SockMod, Socket}, Opts),
ok = SockMod:controlling_process(Socket, Pid),
{ok, Pid}
end
catch _:{badmatch, {error, _} = Err} ->
SockMod:close(Socket),
Err
end.
connect(Addr, Port, Opts) ->
connect(Addr, Port, Opts, infinity, self()).
connect(Addr, Port, Opts, Timeout) ->
connect(Addr, Port, Opts, Timeout, self()).
connect(Addr, Port, Opts, Timeout, Owner) ->
case gen_tcp:connect(Addr, Port, Opts, Timeout) of
{ok, Socket} ->
SocketData = #socket_state{sockmod = gen_tcp, socket = Socket},
case controlling_process(SocketData, Owner) of
ok ->
activate_after(Socket, Owner, 0),
{ok, SocketData};
{error, _Reason} = Error ->
gen_tcp:close(Socket),
Error
end;
{error, _Reason} = Error ->
Error
end.
starttls(#socket_state{socket = Socket,
receiver = undefined} = SocketData, TLSOpts) ->
case fast_tls:tcp_to_tls(Socket, TLSOpts) of
{ok, TLSSocket} ->
SocketData1 = SocketData#socket_state{socket = TLSSocket,
sockmod = fast_tls},
SocketData2 = reset_stream(SocketData1),
case fast_tls:recv_data(TLSSocket, <<>>) of
{ok, TLSData} ->
parse(SocketData2, TLSData);
{error, _} = Err ->
Err
end;
{error, _} = Err ->
Err
end.
compress(SocketData) -> compress(SocketData, undefined).
compress(#socket_state{receiver = undefined,
sockmod = SockMod,
socket = Socket} = SocketData, Data) ->
ejabberd:start_app(ezlib),
{ok, ZlibSocket} = ezlib:enable_zlib(SockMod, Socket),
case Data of
undefined -> ok;
_ -> send(SocketData, Data)
end,
SocketData1 = SocketData#socket_state{socket = ZlibSocket,
sockmod = ezlib},
SocketData2 = reset_stream(SocketData1),
case ezlib:recv_data(ZlibSocket, <<"">>) of
{ok, ZlibData} ->
parse(SocketData2, ZlibData);
{error, _} = Err ->
Err
end.
reset_stream(#socket_state{xml_stream = XMLStream,
max_stanza_size = MaxStanzaSize} = SocketData)
when XMLStream /= undefined ->
XMLStream1 = try fxml_stream:reset(XMLStream)
catch error:_ ->
close_stream(XMLStream),
fxml_stream:new(self(), MaxStanzaSize)
end,
SocketData#socket_state{xml_stream = XMLStream1};
reset_stream(#socket_state{sockmod = SockMod, socket = Socket} = SocketData) ->
Socket1 = SockMod:reset_stream(Socket),
SocketData#socket_state{socket = Socket1}.
-spec send_element(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}.
send_element(SocketData, El) when ?is_http_socket(SocketData) ->
send_xml(SocketData, {xmlstreamelement, El});
send_element(SocketData, El) ->
send(SocketData, fxml:element_to_binary(El)).
-spec send_header(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}.
send_header(SocketData, El) when ?is_http_socket(SocketData) ->
send_xml(SocketData, {xmlstreamstart, El#xmlel.name, El#xmlel.attrs});
send_header(SocketData, El) ->
send(SocketData, fxml:element_to_header(El)).
-spec send_trailer(socket_state()) -> ok | {error, inet:posix()}.
send_trailer(SocketData) when ?is_http_socket(SocketData) ->
send_xml(SocketData, {xmlstreamend, <<"stream:stream">>});
send_trailer(SocketData) ->
send(SocketData, <<"</stream:stream>">>).
-spec send(socket_state(), iodata()) -> ok | {error, closed | inet:posix()}.
send(#socket_state{sockmod = SockMod, socket = Socket} = SocketData, Data) ->
?DEBUG("(~s) Send XML on stream = ~p", [pp(SocketData), Data]),
try SockMod:send(Socket, Data) of
{error, einval} -> {error, closed};
Result -> Result
catch _:badarg ->
%% Some modules throw badarg exceptions on closed sockets
%% TODO: their code should be improved
{error, closed}
end.
-spec send_xml(socket_state(),
{xmlstreamelement, fxml:xmlel()} |
{xmlstreamstart, binary(), [{binary(), binary()}]} |
{xmlstreamend, binary()} |
{xmlstreamraw, iodata()}) -> term().
send_xml(SocketData, El) ->
(SocketData#socket_state.sockmod):send_xml(SocketData#socket_state.socket, El).
recv(#socket_state{xml_stream = undefined} = SocketData, Data) ->
XMLStream = fxml_stream:new(self(), SocketData#socket_state.max_stanza_size),
recv(SocketData#socket_state{xml_stream = XMLStream}, Data);
recv(#socket_state{sockmod = SockMod, socket = Socket} = SocketData, Data) ->
case SockMod of
fast_tls ->
case fast_tls:recv_data(Socket, Data) of
{ok, TLSData} ->
parse(SocketData, TLSData);
{error, _} = Err ->
Err
end;
ezlib ->
case ezlib:recv_data(Socket, Data) of
{ok, ZlibData} ->
parse(SocketData, ZlibData);
{error, _} = Err ->
Err
end;
_ ->
parse(SocketData, Data)
end.
change_shaper(#socket_state{receiver = undefined} = SocketData, Shaper) ->
ShaperState = shaper:new(Shaper),
SocketData#socket_state{shaper = ShaperState};
change_shaper(#socket_state{sockmod = SockMod,
socket = Socket} = SocketData, Shaper) ->
SockMod:change_shaper(Socket, Shaper),
SocketData.
monitor(#socket_state{receiver = undefined}) ->
make_ref();
monitor(#socket_state{sockmod = SockMod, socket = Socket}) ->
SockMod:monitor(Socket).
controlling_process(#socket_state{sockmod = SockMod,
socket = Socket}, Pid) ->
SockMod:controlling_process(Socket, Pid).
become_controller(#socket_state{receiver = Receiver,
sockmod = SockMod,
socket = Socket}, Pid) ->
if is_pid(Receiver) ->
SockMod:become_controller(Receiver, Pid);
true ->
activate_after(Socket, Pid, 0)
end.
get_sockmod(SocketData) ->
SocketData#socket_state.sockmod.
get_transport(#socket_state{sockmod = SockMod,
socket = Socket}) ->
case SockMod of
gen_tcp -> tcp;
fast_tls -> tls;
ezlib ->
case ezlib:get_sockmod(Socket) of
gen_tcp -> tcp_zlib;
fast_tls -> tls_zlib
end;
ejabberd_bosh -> http_bind;
ejabberd_http_ws -> websocket
end.
get_peer_certificate(SocketData, Type) ->
fast_tls:get_peer_certificate(SocketData#socket_state.socket, Type).
get_verify_result(SocketData) ->
fast_tls:get_verify_result(SocketData#socket_state.socket).
close(#socket_state{sockmod = SockMod, socket = Socket}) ->
SockMod:close(Socket).
sockname(#socket_state{sockmod = SockMod,
socket = Socket}) ->
case SockMod of
gen_tcp -> inet:sockname(Socket);
_ -> SockMod:sockname(Socket)
end.
peername(#socket_state{sockmod = SockMod,
socket = Socket}) ->
case SockMod of
gen_tcp -> inet:peername(Socket);
_ -> SockMod:peername(Socket)
end.
activate(#socket_state{sockmod = SockMod, socket = Socket}) ->
case SockMod of
gen_tcp -> inet:setopts(Socket, [{active, once}]);
_ -> SockMod:setopts(Socket, [{active, once}])
end.
activate_after(Socket, Pid, Pause) ->
if Pause > 0 ->
erlang:send_after(Pause, Pid, {tcp, Socket, <<>>});
true ->
Pid ! {tcp, Socket, <<>>}
end,
ok.
pp(#socket_state{receiver = Receiver} = State) ->
Transport = get_transport(State),
Receiver1 = case Receiver of
undefined -> self();
_ -> Receiver
end,
io_lib:format("~s|~w", [Transport, Receiver1]).
parse(SocketData, Data) when Data == <<>>; Data == [] ->
case activate(SocketData) of
ok ->
{ok, SocketData};
{error, _} = Err ->
Err
end;
parse(SocketData, [El | Els]) when is_record(El, xmlel) ->
self() ! {'$gen_event', {xmlstreamelement, El}},
parse(SocketData, Els);
parse(SocketData, [El | Els]) when
element(1, El) == xmlstreamstart;
element(1, El) == xmlstreamelement;
element(1, El) == xmlstreamend;
element(1, El) == xmlstreamerror ->
self() ! {'$gen_event', El},
parse(SocketData, Els);
parse(#socket_state{xml_stream = XMLStream,
socket = Socket,
shaper = ShaperState} = SocketData, Data)
when is_binary(Data) ->
XMLStream1 = fxml_stream:parse(XMLStream, Data),
{ShaperState1, Pause} = shaper:update(ShaperState, byte_size(Data)),
Ret = if Pause > 0 ->
activate_after(Socket, self(), Pause);
true ->
activate(SocketData)
end,
case Ret of
ok ->
{ok, SocketData#socket_state{xml_stream = XMLStream1,
shaper = ShaperState1}};
{error, _} = Err ->
Err
end.
close_stream(undefined) ->
ok;
close_stream(XMLStream) ->
fxml_stream:close(XMLStream).

View File

@ -177,16 +177,17 @@ set_timeout(#{owner := Owner} = State, Timeout) when Owner == self() ->
set_timeout(_, _) -> set_timeout(_, _) ->
erlang:error(badarg). erlang:error(badarg).
get_transport(#{sockmod := SockMod, socket := Socket, owner := Owner}) get_transport(#{socket := Socket, owner := Owner})
when Owner == self() -> when Owner == self() ->
SockMod:get_transport(Socket); xmpp_socket:get_transport(Socket);
get_transport(_) -> get_transport(_) ->
erlang:error(badarg). erlang:error(badarg).
-spec change_shaper(state(), shaper:shaper()) -> ok. -spec change_shaper(state(), shaper:shaper()) -> state().
change_shaper(#{sockmod := SockMod, socket := Socket, owner := Owner}, Shaper) change_shaper(#{socket := Socket, owner := Owner} = State, Shaper)
when Owner == self() -> when Owner == self() ->
SockMod:change_shaper(Socket, Shaper); Socket1 = xmpp_socket:change_shaper(Socket, Shaper),
State#{socket => Socket1};
change_shaper(_, _) -> change_shaper(_, _) ->
erlang:error(badarg). erlang:error(badarg).
@ -209,16 +210,15 @@ format_error(Err) ->
%%%=================================================================== %%%===================================================================
%%% gen_server callbacks %%% gen_server callbacks
%%%=================================================================== %%%===================================================================
init([Module, {SockMod, Socket}, Opts]) -> init([Module, {_SockMod, Socket}, Opts]) ->
Encrypted = proplists:get_bool(tls, Opts), Encrypted = proplists:get_bool(tls, Opts),
SocketMonitor = SockMod:monitor(Socket), SocketMonitor = xmpp_socket:monitor(Socket),
case SockMod:peername(Socket) of case xmpp_socket:peername(Socket) of
{ok, IP} -> {ok, IP} ->
Time = p1_time_compat:monotonic_time(milli_seconds), Time = p1_time_compat:monotonic_time(milli_seconds),
State = #{owner => self(), State = #{owner => self(),
mod => Module, mod => Module,
socket => Socket, socket => Socket,
sockmod => SockMod,
socket_monitor => SocketMonitor, socket_monitor => SocketMonitor,
stream_timeout => {timer:seconds(30), Time}, stream_timeout => {timer:seconds(30), Time},
stream_direction => in, stream_direction => in,
@ -247,7 +247,7 @@ init([Module, {SockMod, Socket}, Opts]) ->
TLSOpts = try Module:tls_options(State1) TLSOpts = try Module:tls_options(State1)
catch _:undef -> [] catch _:undef -> []
end, end,
case SockMod:starttls(Socket, TLSOpts) of case xmpp_socket:starttls(Socket, TLSOpts) of
{ok, TLSSocket} -> {ok, TLSSocket} ->
State2 = State1#{socket => TLSSocket}, State2 = State1#{socket => TLSSocket},
{_, State3, Timeout} = noreply(State2), {_, State3, Timeout} = noreply(State2),
@ -333,8 +333,7 @@ handle_info({'$gen_event', {xmlstreamerror, Reason}}, #{lang := Lang}= State) ->
send_pkt(State1, Err) send_pkt(State1, Err)
end); end);
handle_info({'$gen_event', El}, #{stream_state := wait_for_stream} = State) -> handle_info({'$gen_event', El}, #{stream_state := wait_for_stream} = State) ->
%% TODO: find and fix this in fast_xml error_logger:error_msg("unexpected event from XML driver: ~p; "
error_logger:error_msg("unexpected event from receiver: ~p; "
"xmlstreamstart was expected", [El]), "xmlstreamstart was expected", [El]),
State1 = send_header(State), State1 = send_header(State),
noreply( noreply(
@ -379,6 +378,21 @@ handle_info(timeout, #{mod := Mod} = State) ->
handle_info({'DOWN', MRef, _Type, _Object, _Info}, handle_info({'DOWN', MRef, _Type, _Object, _Info},
#{socket_monitor := MRef} = State) -> #{socket_monitor := MRef} = State) ->
noreply(process_stream_end({socket, closed}, State)); noreply(process_stream_end({socket, closed}, State));
handle_info({tcp, _, Data}, #{socket := Socket} = State) ->
noreply(
case xmpp_socket:recv(Socket, Data) of
{ok, NewSocket} ->
State#{socket => NewSocket};
{error, Reason} when is_atom(Reason) ->
process_stream_end({socket, Reason}, State);
{error, Reason} ->
%% TODO: make fast_tls return atoms
process_stream_end({tls, Reason}, State)
end);
handle_info({tcp_closed, _}, State) ->
handle_info({'$gen_event', closed}, State);
handle_info({tcp_error, _, Reason}, State) ->
noreply(process_stream_end({socket, Reason}, State));
handle_info(Info, #{mod := Mod} = State) -> handle_info(Info, #{mod := Mod} = State) ->
noreply(try Mod:handle_info(Info, State) noreply(try Mod:handle_info(Info, State)
catch _:undef -> State catch _:undef -> State
@ -698,14 +712,14 @@ process_compress(#compress{},
when Compressed or not Authenticated -> when Compressed or not Authenticated ->
send_pkt(State, #compress_failure{reason = 'setup-failed'}); send_pkt(State, #compress_failure{reason = 'setup-failed'});
process_compress(#compress{methods = HisMethods}, process_compress(#compress{methods = HisMethods},
#{socket := Socket, sockmod := SockMod, mod := Mod} = State) -> #{socket := Socket, mod := Mod} = State) ->
MyMethods = try Mod:compress_methods(State) MyMethods = try Mod:compress_methods(State)
catch _:undef -> [] catch _:undef -> []
end, end,
CommonMethods = lists_intersection(MyMethods, HisMethods), CommonMethods = lists_intersection(MyMethods, HisMethods),
case lists:member(<<"zlib">>, CommonMethods) of case lists:member(<<"zlib">>, CommonMethods) of
true -> true ->
case SockMod:compress(Socket) of case xmpp_socket:compress(Socket) of
{ok, ZlibSocket} -> {ok, ZlibSocket} ->
State1 = send_pkt(State, #compressed{}), State1 = send_pkt(State, #compressed{}),
case is_disconnected(State1) of case is_disconnected(State1) of
@ -730,13 +744,13 @@ process_compress(#compress{methods = HisMethods},
process_starttls(#{stream_encrypted := true} = State) -> process_starttls(#{stream_encrypted := true} = State) ->
process_starttls_failure(already_encrypted, State); process_starttls_failure(already_encrypted, State);
process_starttls(#{socket := Socket, process_starttls(#{socket := Socket,
sockmod := SockMod, mod := Mod} = State) -> mod := Mod} = State) ->
case is_starttls_available(State) of case is_starttls_available(State) of
true -> true ->
TLSOpts = try Mod:tls_options(State) TLSOpts = try Mod:tls_options(State)
catch _:undef -> [] catch _:undef -> []
end, end,
case SockMod:starttls(Socket, TLSOpts) of case xmpp_socket:starttls(Socket, TLSOpts) of
{ok, TLSSocket} -> {ok, TLSSocket} ->
State1 = send_pkt(State, #starttls_proceed{}), State1 = send_pkt(State, #starttls_proceed{}),
case is_disconnected(State1) of case is_disconnected(State1) of
@ -814,12 +828,13 @@ process_sasl_result({error, Reason, User}, State) ->
-spec process_sasl_success([cyrsasl:sasl_property()], binary(), state()) -> state(). -spec process_sasl_success([cyrsasl:sasl_property()], binary(), state()) -> state().
process_sasl_success(Props, ServerOut, process_sasl_success(Props, ServerOut,
#{socket := Socket, sockmod := SockMod, #{socket := Socket,
mod := Mod, sasl_mech := Mech} = State) -> mod := Mod, sasl_mech := Mech} = State) ->
User = identity(Props), User = identity(Props),
AuthModule = proplists:get_value(auth_module, Props), AuthModule = proplists:get_value(auth_module, Props),
SockMod:reset_stream(Socket), Socket1 = xmpp_socket:reset_stream(Socket),
State1 = send_pkt(State, #sasl_success{text = ServerOut}), State0 = State#{socket => Socket1},
State1 = send_pkt(State0, #sasl_success{text = ServerOut}),
case is_disconnected(State1) of case is_disconnected(State1) of
true -> State1; true -> State1;
false -> false ->
@ -1090,17 +1105,17 @@ send_trailer(State) ->
close_socket(State). close_socket(State).
-spec socket_send(state(), xmpp_element() | xmlel() | trailer) -> ok | {error, inet:posix()}. -spec socket_send(state(), xmpp_element() | xmlel() | trailer) -> ok | {error, inet:posix()}.
socket_send(#{socket := Sock, sockmod := SockMod, socket_send(#{socket := Sock,
stream_state := StateName, stream_state := StateName,
xmlns := NS, xmlns := NS,
stream_header_sent := true}, Pkt) -> stream_header_sent := true}, Pkt) ->
case Pkt of case Pkt of
trailer -> trailer ->
SockMod:send_trailer(Sock); xmpp_socket:send_trailer(Sock);
#stream_start{} when StateName /= disconnected -> #stream_start{} when StateName /= disconnected ->
SockMod:send_header(Sock, xmpp:encode(Pkt)); xmpp_socket:send_header(Sock, xmpp:encode(Pkt));
_ when StateName /= disconnected -> _ when StateName /= disconnected ->
SockMod:send_element(Sock, xmpp:encode(Pkt, NS)); xmpp_socket:send_element(Sock, xmpp:encode(Pkt, NS));
_ -> _ ->
{error, closed} {error, closed}
end; end;
@ -1108,8 +1123,8 @@ socket_send(_, _) ->
{error, closed}. {error, closed}.
-spec close_socket(state()) -> state(). -spec close_socket(state()) -> state().
close_socket(#{sockmod := SockMod, socket := Socket} = State) -> close_socket(#{socket := Socket} = State) ->
SockMod:close(Socket), xmpp_socket:close(Socket),
State#{stream_timeout => infinity, State#{stream_timeout => infinity,
stream_state => disconnected}. stream_state => disconnected}.

View File

@ -191,16 +191,17 @@ set_timeout(#{owner := Owner} = State, Timeout) when Owner == self() ->
set_timeout(_, _) -> set_timeout(_, _) ->
erlang:error(badarg). erlang:error(badarg).
get_transport(#{sockmod := SockMod, socket := Socket, owner := Owner}) get_transport(#{socket := Socket, owner := Owner})
when Owner == self() -> when Owner == self() ->
SockMod:get_transport(Socket); xmpp_socket:get_transport(Socket);
get_transport(_) -> get_transport(_) ->
erlang:error(badarg). erlang:error(badarg).
-spec change_shaper(state(), shaper:shaper()) -> ok. -spec change_shaper(state(), shaper:shaper()) -> state().
change_shaper(#{sockmod := SockMod, socket := Socket, owner := Owner}, Shaper) change_shaper(#{socket := Socket, owner := Owner} = State, Shaper)
when Owner == self() -> when Owner == self() ->
SockMod:change_shaper(Socket, Shaper); Socket1 = xmpp_socket:change_shaper(Socket, Shaper),
State#{socket => Socket1};
change_shaper(_, _) -> change_shaper(_, _) ->
erlang:error(badarg). erlang:error(badarg).
@ -233,11 +234,10 @@ format_error(Err) ->
%%% gen_server callbacks %%% gen_server callbacks
%%%=================================================================== %%%===================================================================
-spec init(list()) -> {ok, state(), timeout()} | {stop, term()} | ignore. -spec init(list()) -> {ok, state(), timeout()} | {stop, term()} | ignore.
init([Mod, SockMod, From, To, Opts]) -> init([Mod, _SockMod, From, To, Opts]) ->
Time = p1_time_compat:monotonic_time(milli_seconds), Time = p1_time_compat:monotonic_time(milli_seconds),
State = #{owner => self(), State = #{owner => self(),
mod => Mod, mod => Mod,
sockmod => SockMod,
server => From, server => From,
user => <<"">>, user => <<"">>,
resource => <<"">>, resource => <<"">>,
@ -272,7 +272,6 @@ handle_call(Call, From, #{mod := Mod} = State) ->
-spec handle_cast(term(), state()) -> noreply(). -spec handle_cast(term(), state()) -> noreply().
handle_cast(connect, #{remote_server := RemoteServer, handle_cast(connect, #{remote_server := RemoteServer,
sockmod := SockMod,
stream_state := connecting} = State) -> stream_state := connecting} = State) ->
noreply( noreply(
case idna_to_ascii(RemoteServer) of case idna_to_ascii(RemoteServer) of
@ -283,7 +282,7 @@ handle_cast(connect, #{remote_server := RemoteServer,
{ok, AddrPorts} -> {ok, AddrPorts} ->
case connect(AddrPorts, State) of case connect(AddrPorts, State) of
{ok, Socket, {Addr, Port, Encrypted}} -> {ok, Socket, {Addr, Port, Encrypted}} ->
SocketMonitor = SockMod:monitor(Socket), SocketMonitor = xmpp_socket:monitor(Socket),
State1 = State#{ip => {Addr, Port}, State1 = State#{ip => {Addr, Port},
socket => Socket, socket => Socket,
stream_encrypted => Encrypted, stream_encrypted => Encrypted,
@ -388,6 +387,21 @@ handle_info(timeout, #{mod := Mod} = State) ->
handle_info({'DOWN', MRef, _Type, _Object, _Info}, handle_info({'DOWN', MRef, _Type, _Object, _Info},
#{socket_monitor := MRef} = State) -> #{socket_monitor := MRef} = State) ->
noreply(process_stream_end({socket, closed}, State)); noreply(process_stream_end({socket, closed}, State));
handle_info({tcp, _, Data}, #{socket := Socket} = State) ->
noreply(
case xmpp_socket:recv(Socket, Data) of
{ok, NewSocket} ->
State#{socket => NewSocket};
{error, Reason} when is_atom(Reason) ->
process_stream_end({socket, Reason}, State);
{error, Reason} ->
%% TODO: make fast_tls return atoms
process_stream_end({tls, Reason}, State)
end);
handle_info({tcp_closed, _}, State) ->
handle_info({'$gen_event', closed}, State);
handle_info({tcp_error, _, Reason}, State) ->
noreply(process_stream_end({socket, Reason}, State));
handle_info(Info, #{mod := Mod} = State) -> handle_info(Info, #{mod := Mod} = State) ->
noreply(try Mod:handle_info(Info, State) noreply(try Mod:handle_info(Info, State)
catch _:undef -> State catch _:undef -> State
@ -638,13 +652,13 @@ process_cert_verification(State) ->
-spec process_sasl_success(state()) -> state(). -spec process_sasl_success(state()) -> state().
process_sasl_success(#{mod := Mod, process_sasl_success(#{mod := Mod,
sockmod := SockMod,
socket := Socket} = State) -> socket := Socket} = State) ->
SockMod:reset_stream(Socket), Socket1 = xmpp_socket:reset_stream(Socket),
State1 = State#{stream_id => new_id(), State0 = State#{socket => Socket1},
stream_restarted => true, State1 = State0#{stream_id => new_id(),
stream_state => wait_for_stream, stream_restarted => true,
stream_authenticated => true}, stream_state => wait_for_stream,
stream_authenticated => true},
State2 = send_header(State1), State2 = send_header(State1),
case is_disconnected(State2) of case is_disconnected(State2) of
true -> State2; true -> State2;
@ -745,15 +759,15 @@ send_error(State, Pkt, Err) ->
end. end.
-spec socket_send(state(), xmpp_element() | xmlel() | trailer) -> ok | {error, inet:posix()}. -spec socket_send(state(), xmpp_element() | xmlel() | trailer) -> ok | {error, inet:posix()}.
socket_send(#{sockmod := SockMod, socket := Socket, xmlns := NS, socket_send(#{socket := Socket, xmlns := NS,
stream_state := StateName}, Pkt) -> stream_state := StateName}, Pkt) ->
case Pkt of case Pkt of
trailer -> trailer ->
SockMod:send_trailer(Socket); xmpp_socket:send_trailer(Socket);
#stream_start{} when StateName /= disconnected -> #stream_start{} when StateName /= disconnected ->
SockMod:send_header(Socket, xmpp:encode(Pkt)); xmpp_socket:send_header(Socket, xmpp:encode(Pkt));
_ when StateName /= disconnected -> _ when StateName /= disconnected ->
SockMod:send_element(Socket, xmpp:encode(Pkt, NS)); xmpp_socket:send_element(Socket, xmpp:encode(Pkt, NS));
_ -> _ ->
{error, closed} {error, closed}
end; end;
@ -768,8 +782,8 @@ send_trailer(State) ->
-spec close_socket(state()) -> state(). -spec close_socket(state()) -> state().
close_socket(State) -> close_socket(State) ->
case State of case State of
#{sockmod := SockMod, socket := Socket} -> #{socket := Socket} ->
SockMod:close(Socket); xmpp_socket:close(Socket);
_ -> _ ->
ok ok
end, end,
@ -777,8 +791,8 @@ close_socket(State) ->
stream_state => disconnected}. stream_state => disconnected}.
-spec starttls(term(), state()) -> {ok, term()} | {error, tls_error_reason()}. -spec starttls(term(), state()) -> {ok, term()} | {error, tls_error_reason()}.
starttls(Socket, #{sockmod := SockMod, mod := Mod, starttls(Socket, #{mod := Mod, xmlns := NS,
xmlns := NS, remote_server := RemoteServer} = State) -> remote_server := RemoteServer} = State) ->
TLSOpts = try Mod:tls_options(State) TLSOpts = try Mod:tls_options(State)
catch _:undef -> [] catch _:undef -> []
end, end,
@ -787,7 +801,7 @@ starttls(Socket, #{sockmod := SockMod, mod := Mod,
?NS_SERVER -> <<"xmpp-server">>; ?NS_SERVER -> <<"xmpp-server">>;
?NS_CLIENT -> <<"xmpp-client">> ?NS_CLIENT -> <<"xmpp-client">>
end, end,
SockMod:starttls(Socket, [connect, {sni, SNI}, {alpn, [ALPN]}|TLSOpts]). xmpp_socket:starttls(Socket, [connect, {sni, SNI}, {alpn, [ALPN]}|TLSOpts]).
-spec select_lang(binary(), binary()) -> binary(). -spec select_lang(binary(), binary()) -> binary().
select_lang(Lang, <<"">>) -> Lang; select_lang(Lang, <<"">>) -> Lang;
@ -1020,9 +1034,9 @@ host_entry_to_addr_ports(#hostent{h_addr_list = AddrList}, Port, TLS) ->
-spec connect([ip_port()], state()) -> {ok, term(), ip_port()} | -spec connect([ip_port()], state()) -> {ok, term(), ip_port()} |
{error, {socket, socket_error_reason()}} | {error, {socket, socket_error_reason()}} |
{error, {tls, tls_error_reason()}}. {error, {tls, tls_error_reason()}}.
connect(AddrPorts, #{sockmod := SockMod} = State) -> connect(AddrPorts, State) ->
Timeout = get_connect_timeout(State), Timeout = get_connect_timeout(State),
case connect(AddrPorts, SockMod, Timeout, {error, nxdomain}) of case connect(AddrPorts, Timeout, {error, nxdomain}) of
{ok, Socket, {Addr, Port, TLS = true}} -> {ok, Socket, {Addr, Port, TLS = true}} ->
case starttls(Socket, State) of case starttls(Socket, State) of
{ok, TLSSocket} -> {ok, TLSSocket, {Addr, Port, TLS}}; {ok, TLSSocket} -> {ok, TLSSocket, {Addr, Port, TLS}};
@ -1034,24 +1048,24 @@ connect(AddrPorts, #{sockmod := SockMod} = State) ->
{error, {socket, Why}} {error, {socket, Why}}
end. end.
-spec connect([ip_port()], module(), timeout(), network_error()) -> -spec connect([ip_port()], timeout(), network_error()) ->
{ok, term(), ip_port()} | network_error(). {ok, term(), ip_port()} | network_error().
connect([{Addr, Port, TLS}|AddrPorts], SockMod, Timeout, _) -> connect([{Addr, Port, TLS}|AddrPorts], Timeout, _) ->
Type = get_addr_type(Addr), Type = get_addr_type(Addr),
try SockMod:connect(Addr, Port, try xmpp_socket:connect(Addr, Port,
[binary, {packet, 0}, [binary, {packet, 0},
{send_timeout, ?TCP_SEND_TIMEOUT}, {send_timeout, ?TCP_SEND_TIMEOUT},
{send_timeout_close, true}, {send_timeout_close, true},
{active, false}, Type], {active, false}, Type],
Timeout) of Timeout) of
{ok, Socket} -> {ok, Socket} ->
{ok, Socket, {Addr, Port, TLS}}; {ok, Socket, {Addr, Port, TLS}};
Err -> Err ->
connect(AddrPorts, SockMod, Timeout, Err) connect(AddrPorts, Timeout, Err)
catch _:badarg -> catch _:badarg ->
connect(AddrPorts, SockMod, Timeout, {error, einval}) connect(AddrPorts, Timeout, {error, einval})
end; end;
connect([], _SockMod, _Timeout, Err) -> connect([], _Timeout, Err) ->
Err. Err.
-spec get_addr_type(inet:ip_address()) -> inet:address_family(). -spec get_addr_type(inet:ip_address()) -> inet:address_family().

View File

@ -40,10 +40,10 @@ authenticate(State) ->
-spec authenticate(xmpp_stream_in:state() | xmpp_stream_out:state(), binary()) -spec authenticate(xmpp_stream_in:state() | xmpp_stream_out:state(), binary())
-> {ok, binary()} | {error, atom(), binary()}. -> {ok, binary()} | {error, atom(), binary()}.
authenticate(#{xmlns := ?NS_SERVER, sockmod := SockMod, authenticate(#{xmlns := ?NS_SERVER,
socket := Socket} = State, Authzid) -> socket := Socket} = State, Authzid) ->
Peer = maps:get(remote_server, State, Authzid), Peer = maps:get(remote_server, State, Authzid),
case verify_cert(SockMod, Socket) of case verify_cert(Socket) of
{ok, Cert} -> {ok, Cert} ->
case ejabberd_idna:domain_utf8_to_ascii(Peer) of case ejabberd_idna:domain_utf8_to_ascii(Peer) of
false -> false ->
@ -61,7 +61,7 @@ authenticate(#{xmlns := ?NS_SERVER, sockmod := SockMod,
{error, Reason} -> {error, Reason} ->
{error, Reason, Peer} {error, Reason, Peer}
end; end;
authenticate(#{xmlns := ?NS_CLIENT, sockmod := SockMod, authenticate(#{xmlns := ?NS_CLIENT,
socket := Socket, lserver := LServer}, Authzid) -> socket := Socket, lserver := LServer}, Authzid) ->
JID = try jid:decode(Authzid) JID = try jid:decode(Authzid)
catch _:{bad_jid, <<>>} -> jid:make(LServer); catch _:{bad_jid, <<>>} -> jid:make(LServer);
@ -69,7 +69,7 @@ authenticate(#{xmlns := ?NS_CLIENT, sockmod := SockMod,
end, end,
case JID of case JID of
#jid{user = User} -> #jid{user = User} ->
case verify_cert(SockMod, Socket) of case verify_cert(Socket) of
{ok, Cert} -> {ok, Cert} ->
JIDs = get_xmpp_addrs(Cert), JIDs = get_xmpp_addrs(Cert),
get_username(JID, JIDs, LServer); get_username(JID, JIDs, LServer);
@ -104,11 +104,11 @@ get_cert_domains(Cert) ->
%%%=================================================================== %%%===================================================================
%%% Internal functions %%% Internal functions
%%%=================================================================== %%%===================================================================
-spec verify_cert(module(), ejabberd_socket:socket()) -> {ok, cert()} | {error, atom()}. -spec verify_cert(xmpp_socket:socket()) -> {ok, cert()} | {error, atom()}.
verify_cert(SockMod, Socket) -> verify_cert(Socket) ->
case SockMod:get_peer_certificate(Socket, otp) of case xmpp_socket:get_peer_certificate(Socket, otp) of
{ok, Cert} -> {ok, Cert} ->
case SockMod:get_verify_result(Socket) of case xmpp_socket:get_verify_result(Socket) of
0 -> 0 ->
{ok, Cert}; {ok, Cert};
VerifyRes -> VerifyRes ->