From 2d43c07c624d6c85f0d7c8b3274c9d30e458d95e Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Tue, 26 Dec 2017 18:55:57 +0300 Subject: [PATCH] Get rid of ejabberd receiver ejabberd receivers were meant to serve connections from frontends to backends. However, this approach was not popular and frontend related code was removed in previous releases. Now, ejabberd receiver's code was also removed, making the code shorter and cleaner. Also, in stress tests ejabberd now handles load more robustly, without c2s processes overload (even with disabled shapers). ejabberd_socket.erl is renamed to xmpp_socket.erl: it's supposed to be finally moved into stand-alone xmpp library. --- rebar.config | 3 +- src/ejabberd_bosh.erl | 15 +- src/ejabberd_c2s.erl | 42 ++--- src/ejabberd_http_ws.erl | 20 +- src/ejabberd_listener.erl | 2 +- src/ejabberd_receiver.erl | 357 ----------------------------------- src/ejabberd_s2s_in.erl | 24 ++- src/ejabberd_s2s_out.erl | 14 +- src/ejabberd_service.erl | 23 +-- src/ejabberd_socket.erl | 293 ----------------------------- src/mod_s2s_dialback.erl | 6 +- src/mod_stream_mgmt.erl | 4 +- src/xmpp_socket.erl | 386 ++++++++++++++++++++++++++++++++++++++ src/xmpp_stream_in.erl | 65 ++++--- src/xmpp_stream_out.erl | 88 +++++---- src/xmpp_stream_pkix.erl | 16 +- 16 files changed, 561 insertions(+), 797 deletions(-) delete mode 100644 src/ejabberd_receiver.erl delete mode 100644 src/ejabberd_socket.erl create mode 100644 src/xmpp_socket.erl diff --git a/rebar.config b/rebar.config index 857133829..af6d36ba4 100644 --- a/rebar.config +++ b/rebar.config @@ -81,7 +81,8 @@ iconv]}}. {erl_first_files, ["src/ejabberd_sql_pt.erl", "src/ejabberd_config.erl", - "src/gen_mod.erl", "src/mod_muc_room.erl", "src/mod_push.erl"]}. + "src/gen_mod.erl", "src/mod_muc_room.erl", + "src/mod_push.erl", "src/xmpp_socket.erl"]}. {erl_opts, [nowarn_deprecated_function, {i, "include"}, diff --git a/src/ejabberd_bosh.erl b/src/ejabberd_bosh.erl index 1df6681ff..710e24ae4 100644 --- a/src/ejabberd_bosh.erl +++ b/src/ejabberd_bosh.erl @@ -33,7 +33,7 @@ -export([start/2, start/3, start_link/3]). -export([send_xml/2, setopts/2, controlling_process/2, - migrate/3, custom_receiver/1, become_controller/2, + migrate/3, become_controller/2, reset_stream/1, change_shaper/2, monitor/1, close/1, sockname/1, peername/1, process_request/3, send/2, change_controller/2]). @@ -175,9 +175,6 @@ setopts({http_bind, FsmRef, _IP}, Opts) -> controlling_process(_Socket, _Pid) -> ok. -custom_receiver({http_bind, FsmRef, _IP}) -> - {receiver, ?MODULE, FsmRef}. - become_controller(FsmRef, C2SPid) -> p1_fsm:send_all_state_event(FsmRef, {become_controller, C2SPid}). @@ -185,11 +182,11 @@ become_controller(FsmRef, C2SPid) -> change_controller({http_bind, FsmRef, _IP}, C2SPid) -> become_controller(FsmRef, C2SPid). -reset_stream({http_bind, _FsmRef, _IP}) -> ok. +reset_stream({http_bind, _FsmRef, _IP} = Socket) -> + Socket. change_shaper({http_bind, FsmRef, _IP}, Shaper) -> - p1_fsm:send_all_state_event(FsmRef, - {change_shaper, Shaper}). + p1_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}). monitor({http_bind, FsmRef, _IP}) -> erlang:monitor(process, FsmRef). @@ -306,8 +303,8 @@ init([#body{attrs = Attrs}, IP, SID]) -> buf_new(XMPPDomain)), Opts2} end, - ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket, - Opts), + xmpp_socket:start(ejabberd_c2s, ?MODULE, Socket, + [{receiver, self()}|Opts]), Inactivity = gen_mod:get_module_opt(XMPPDomain, mod_bosh, max_inactivity, ?DEFAULT_INACTIVITY), diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 6794ef163..35353bb84 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -22,11 +22,11 @@ -module(ejabberd_c2s). -behaviour(xmpp_stream_in). -behaviour(ejabberd_config). --behaviour(ejabberd_socket). +-behaviour(xmpp_socket). -protocol({rfc, 6121}). -%% ejabberd_socket callbacks +%% xmpp_socket callbacks -export([start/2, start_link/2, socket_type/0]). %% ejabberd_config callbacks -export([opt_type/1, listen_opt_type/1, transform_listen_option/2]). @@ -62,7 +62,7 @@ -export_type([state/0]). %%%=================================================================== -%%% ejabberd_socket API +%%% xmpp_socket API %%%=================================================================== start(SockData, Opts) -> case proplists:get_value(supervisor, Opts, true) of @@ -203,16 +203,16 @@ copy_state(#{owner := Owner} = NewState, open_session(#{user := U, server := S, resource := R, sid := SID, ip := IP, auth_module := AuthModule} = State) -> JID = jid:make(U, S, R), - change_shaper(State), - Conn = get_conn_type(State), - State1 = State#{conn => Conn, resource => R, jid => JID}, + State1 = change_shaper(State), + Conn = get_conn_type(State1), + State2 = State1#{conn => Conn, resource => R, jid => JID}, Prio = case maps:get(pres_last, State, undefined) of undefined -> undefined; Pres -> get_priority_from_presence(Pres) end, Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthModule}], ejabberd_sm:open_session(SID, U, S, R, Prio, Info), - xmpp_stream_in:establish(State1). + xmpp_stream_in:establish(State2). %%%=================================================================== %%% Hooks @@ -264,12 +264,12 @@ reject_unauthenticated_packet(State, _Pkt) -> process_closed(State, Reason) -> stop(State#{stop_reason => Reason}). -process_terminated(#{sid := SID, sockmod := SockMod, socket := Socket, +process_terminated(#{sid := SID, socket := Socket, jid := JID, user := U, server := S, resource := R} = State, Reason) -> Status = format_reason(State, Reason), ?INFO_MSG("(~s) Closing c2s session for ~s: ~s", - [SockMod:pp(Socket), jid:encode(JID), Status]), + [xmpp_socket:pp(Socket), jid:encode(JID), Status]), State1 = case maps:is_key(pres_last, State) of true -> Pres = #presence{type = unavailable, @@ -285,10 +285,10 @@ process_terminated(#{sid := SID, sockmod := SockMod, socket := Socket, end, bounce_message_queue(), State1; -process_terminated(#{sockmod := SockMod, socket := Socket, +process_terminated(#{socket := Socket, stop_reason := {tls, _}} = State, Reason) -> ?WARNING_MSG("(~s) Failed to secure c2s connection: ~s", - [SockMod:pp(Socket), format_reason(State, Reason)]), + [xmpp_socket:pp(Socket), format_reason(State, Reason)]), State; process_terminated(State, _Reason) -> State. @@ -385,7 +385,7 @@ check_password_digest_fun(#{lserver := LServer}) -> bind(<<"">>, State) -> bind(new_uniq_id(), State); bind(R, #{user := U, server := S, access := Access, lang := Lang, - lserver := LServer, sockmod := SockMod, socket := Socket, + lserver := LServer, socket := Socket, ip := IP} = State) -> case resource_conflict_action(U, S, R) of closenew -> @@ -401,12 +401,12 @@ bind(R, #{user := U, server := S, access := Access, lang := Lang, State2 = ejabberd_hooks:run_fold( c2s_session_opened, LServer, State1, []), ?INFO_MSG("(~s) Opened c2s session for ~s", - [SockMod:pp(Socket), jid:encode(JID)]), + [xmpp_socket:pp(Socket), jid:encode(JID)]), {ok, State2}; deny -> ejabberd_hooks:run(forbidden_session_hook, LServer, [JID]), ?INFO_MSG("(~s) Forbidden c2s session for ~s", - [SockMod:pp(Socket), jid:encode(JID)]), + [xmpp_socket:pp(Socket), jid:encode(JID)]), Txt = <<"Access denied by service policy">>, {error, xmpp:err_not_allowed(Txt, Lang), State} end @@ -417,9 +417,9 @@ handle_stream_start(StreamStart, #{lserver := LServer} = State) -> false -> send(State#{lserver => ?MYNAME}, xmpp:serr_host_unknown()); true -> - change_shaper(State), + State1 = change_shaper(State), ejabberd_hooks:run_fold( - c2s_stream_started, LServer, State, [StreamStart]) + c2s_stream_started, LServer, State1, [StreamStart]) end. handle_stream_end(Reason, #{lserver := LServer} = State) -> @@ -427,20 +427,20 @@ handle_stream_end(Reason, #{lserver := LServer} = State) -> ejabberd_hooks:run_fold(c2s_closed, LServer, State1, [Reason]). handle_auth_success(User, Mech, AuthModule, - #{socket := Socket, sockmod := SockMod, + #{socket := Socket, ip := IP, lserver := LServer} = State) -> ?INFO_MSG("(~s) Accepted c2s ~s authentication for ~s@~s by ~s backend from ~s", - [SockMod:pp(Socket), Mech, User, LServer, + [xmpp_socket:pp(Socket), Mech, User, LServer, ejabberd_auth:backend_type(AuthModule), ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), State1 = State#{auth_module => AuthModule}, ejabberd_hooks:run_fold(c2s_auth_result, LServer, State1, [true, User]). handle_auth_failure(User, Mech, Reason, - #{socket := Socket, sockmod := SockMod, + #{socket := Socket, ip := IP, lserver := LServer} = State) -> ?INFO_MSG("(~s) Failed c2s ~s authentication ~sfrom ~s: ~s", - [SockMod:pp(Socket), Mech, + [xmpp_socket:pp(Socket), Mech, if User /= <<"">> -> ["for ", User, "@", LServer, " "]; true -> "" end, @@ -912,7 +912,7 @@ fix_from_to(Pkt, #{jid := JID}) when ?is_stanza(Pkt) -> fix_from_to(Pkt, _State) -> Pkt. --spec change_shaper(state()) -> ok. +-spec change_shaper(state()) -> state(). change_shaper(#{shaper := ShaperName, ip := IP, lserver := LServer, user := U, server := S, resource := R} = State) -> JID = jid:make(U, S, R), diff --git a/src/ejabberd_http_ws.erl b/src/ejabberd_http_ws.erl index f9f7b07e9..4a5d0b1b6 100644 --- a/src/ejabberd_http_ws.erl +++ b/src/ejabberd_http_ws.erl @@ -34,7 +34,8 @@ handle_sync_event/4, code_change/4, handle_info/3, terminate/3, send_xml/2, setopts/2, sockname/1, peername/1, controlling_process/2, become_controller/2, - close/1, socket_handoff/6, opt_type/1]). + monitor/1, reset_stream/1, close/1, change_shaper/2, + socket_handoff/6, opt_type/1]). -include("ejabberd.hrl"). -include("logger.hrl"). @@ -105,12 +106,21 @@ peername({http_ws, _FsmRef, IP}) -> {ok, IP}. controlling_process(_Socket, _Pid) -> ok. become_controller(FsmRef, C2SPid) -> - p1_fsm:send_all_state_event(FsmRef, - {become_controller, C2SPid}). + p1_fsm:send_all_state_event(FsmRef, {activate, C2SPid}). close({http_ws, FsmRef, _IP}) -> catch p1_fsm:sync_send_all_state_event(FsmRef, close). +monitor({http_ws, FsmRef, _IP}) -> + erlang:monitor(process, FsmRef). + +reset_stream({http_ws, _FsmRef, _IP} = Socket) -> + Socket. + +change_shaper({http_ws, _FsmRef, _IP}, _Shaper) -> + %% TODO??? + ok. + socket_handoff(LocalPath, Request, Socket, SockMod, Buf, Opts) -> ejabberd_websocket:socket_handoff(LocalPath, Request, Socket, SockMod, Buf, Opts, ?MODULE, fun get_human_html_xmlel/0). @@ -136,8 +146,8 @@ init([{#ws{ip = IP, http_opts = HOpts}, _} = WS]) -> Socket = {http_ws, self(), IP}, ?DEBUG("Client connected through websocket ~p", [Socket]), - ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket, - Opts), + xmpp_socket:start(ejabberd_c2s, ?MODULE, Socket, + [{receiver, self()}|Opts]), Timer = erlang:start_timer(WSTimeout, self(), []), {ok, loop, #state{socket = Socket, timeout = WSTimeout, diff --git a/src/ejabberd_listener.erl b/src/ejabberd_listener.erl index 248e3faf0..dd4842f58 100644 --- a/src/ejabberd_listener.erl +++ b/src/ejabberd_listener.erl @@ -294,7 +294,7 @@ accept(ListenSocket, Module, Opts, Interval) -> {ok, Socket} -> case {inet:sockname(Socket), inet:peername(Socket)} of {{ok, {Addr, Port}}, {ok, {PAddr, PPort}}} -> - Receiver = case ejabberd_socket:start(Module, + Receiver = case xmpp_socket:start(Module, gen_tcp, Socket, Opts) of {ok, RecvPid} -> RecvPid; _ -> none diff --git a/src/ejabberd_receiver.erl b/src/ejabberd_receiver.erl deleted file mode 100644 index 52077ac3c..000000000 --- a/src/ejabberd_receiver.erl +++ /dev/null @@ -1,357 +0,0 @@ -%%%---------------------------------------------------------------------- -%%% File : ejabberd_receiver.erl -%%% Author : Alexey Shchepin -%%% Purpose : Socket receiver for C2S and S2S connections -%%% Created : 10 Nov 2003 by Alexey Shchepin -%%% -%%% -%%% ejabberd, Copyright (C) 2002-2017 ProcessOne -%%% -%%% This program is free software; you can redistribute it and/or -%%% modify it under the terms of the GNU General Public License as -%%% published by the Free Software Foundation; either version 2 of the -%%% License, or (at your option) any later version. -%%% -%%% This program is distributed in the hope that it will be useful, -%%% but WITHOUT ANY WARRANTY; without even the implied warranty of -%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -%%% General Public License for more details. -%%% -%%% You should have received a copy of the GNU General Public License along -%%% with this program; if not, write to the Free Software Foundation, Inc., -%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -%%% -%%%---------------------------------------------------------------------- - --module(ejabberd_receiver). - --author('alexey@process-one.net'). - --ifndef(GEN_SERVER). --define(GEN_SERVER, gen_server). --endif. --behaviour(?GEN_SERVER). --behaviour(ejabberd_config). - -%% API --export([start_link/4, - start/3, - start/4, - change_shaper/2, - reset_stream/1, - starttls/2, - compress/2, - become_controller/2, - close/1, - opt_type/1]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, - handle_info/2, terminate/2, code_change/3]). - --include("ejabberd.hrl"). --include("logger.hrl"). - --record(state, - {socket :: inet:socket() | fast_tls:tls_socket() | ezlib:zlib_socket(), - sock_mod = gen_tcp :: gen_tcp | fast_tls | ezlib, - shaper_state = none :: shaper:shaper(), - c2s_pid :: pid() | undefined, - max_stanza_size = infinity :: non_neg_integer() | infinity, - xml_stream_state :: fxml_stream:xml_stream_state() | undefined, - timeout = infinity:: timeout()}). - --spec start_link(inet:socket(), atom(), shaper:shaper(), - non_neg_integer() | infinity) -> ignore | - {error, any()} | - {ok, pid()}. - -start_link(Socket, SockMod, Shaper, MaxStanzaSize) -> - ?GEN_SERVER:start_link(?MODULE, - [Socket, SockMod, Shaper, MaxStanzaSize], []). - --spec start(inet:socket(), atom(), shaper:shaper()) -> undefined | pid(). - -start(Socket, SockMod, Shaper) -> - start(Socket, SockMod, Shaper, infinity). - --spec start(inet:socket(), atom(), shaper:shaper(), - non_neg_integer() | infinity) -> undefined | pid(). - -start(Socket, SockMod, Shaper, MaxStanzaSize) -> - {ok, Pid} = ?GEN_SERVER:start(ejabberd_receiver, - [Socket, SockMod, Shaper, MaxStanzaSize], []), - Pid. - --spec change_shaper(pid(), shaper:shaper()) -> ok. - -change_shaper(Pid, Shaper) -> - ?GEN_SERVER:cast(Pid, {change_shaper, Shaper}). - --spec reset_stream(pid()) -> ok | {error, any()}. - -reset_stream(Pid) -> do_call(Pid, reset_stream). - --spec starttls(pid(), fast_tls:tls_socket()) -> ok | {error, any()}. - -starttls(Pid, TLSSocket) -> - do_call(Pid, {starttls, TLSSocket}). - --spec compress(pid(), iodata() | undefined) -> {error, any()} | - {ok, ezlib:zlib_socket()}. - -compress(Pid, Data) -> - do_call(Pid, {compress, Data}). - --spec become_controller(pid(), pid()) -> ok | {error, any()}. - -become_controller(Pid, C2SPid) -> - do_call(Pid, {become_controller, C2SPid}). - --spec close(pid()) -> ok. - -close(Pid) -> - ?GEN_SERVER:cast(Pid, close). - - -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -init([Socket, SockMod, Shaper, MaxStanzaSize]) -> - ShaperState = shaper:new(Shaper), - Timeout = case SockMod of - ssl -> 20; - _ -> infinity - end, - {ok, - #state{socket = Socket, sock_mod = SockMod, - shaper_state = ShaperState, - max_stanza_size = MaxStanzaSize, timeout = Timeout}}. - -handle_call({starttls, TLSSocket}, _From, State) -> - State1 = reset_parser(State), - NewState = State1#state{socket = TLSSocket, - sock_mod = fast_tls}, - case fast_tls:recv_data(TLSSocket, <<"">>) of - {ok, TLSData} -> - {reply, ok, - process_data(TLSData, NewState), hibernate_timeout()}; - {error, _} = Err -> - {stop, normal, Err, NewState} - end; -handle_call({compress, Data}, _From, - #state{socket = Socket, sock_mod = SockMod} = - State) -> - ejabberd:start_app(ezlib), - {ok, ZlibSocket} = ezlib:enable_zlib(SockMod, - Socket), - if Data /= undefined -> do_send(State, Data); - true -> ok - end, - State1 = reset_parser(State), - NewState = State1#state{socket = ZlibSocket, - sock_mod = ezlib}, - case ezlib:recv_data(ZlibSocket, <<"">>) of - {ok, ZlibData} -> - {reply, {ok, ZlibSocket}, - process_data(ZlibData, NewState), hibernate_timeout()}; - {error, _} = Err -> - {stop, normal, Err, NewState} - end; -handle_call(reset_stream, _From, State) -> - NewState = reset_parser(State), - Reply = ok, - {reply, Reply, NewState, hibernate_timeout()}; -handle_call({become_controller, C2SPid}, _From, State) -> - XMLStreamState = fxml_stream:new(C2SPid, State#state.max_stanza_size), - NewState = State#state{c2s_pid = C2SPid, - xml_stream_state = XMLStreamState}, - activate_socket(NewState), - Reply = ok, - {reply, Reply, NewState, hibernate_timeout()}; -handle_call(_Request, _From, State) -> - Reply = ok, {reply, Reply, State, hibernate_timeout()}. - -handle_cast({change_shaper, Shaper}, State) -> - NewShaperState = shaper:new(Shaper), - {noreply, State#state{shaper_state = NewShaperState}, - hibernate_timeout()}; -handle_cast(close, State) -> {stop, normal, State}; -handle_cast(_Msg, State) -> - {noreply, State, hibernate_timeout()}. - -handle_info({Tag, _TCPSocket, Data}, - #state{socket = Socket, sock_mod = SockMod} = State) - when (Tag == tcp) or (Tag == ssl) or - (Tag == ejabberd_xml) -> - case SockMod of - fast_tls -> - case fast_tls:recv_data(Socket, Data) of - {ok, TLSData} -> - {noreply, process_data(TLSData, State), - hibernate_timeout()}; - {error, Reason} -> - if is_binary(Reason) -> - ?DEBUG("TLS error = ~s", [Reason]); - true -> - ok - end, - {stop, normal, State} - end; - ezlib -> - case ezlib:recv_data(Socket, Data) of - {ok, ZlibData} -> - {noreply, process_data(ZlibData, State), - hibernate_timeout()}; - {error, _Reason} -> {stop, normal, State} - end; - _ -> - {noreply, process_data(Data, State), hibernate_timeout()} - end; -handle_info({Tag, _TCPSocket}, State) - when (Tag == tcp_closed) or (Tag == ssl_closed) -> - {stop, normal, State}; -handle_info({Tag, _TCPSocket, Reason}, State) - when (Tag == tcp_error) or (Tag == ssl_error) -> - case Reason of - timeout -> {noreply, State, hibernate_timeout()}; - _ -> {stop, normal, State} - end; -handle_info({timeout, _Ref, activate}, State) -> - activate_socket(State), - {noreply, State, hibernate_timeout()}; -handle_info(timeout, State) -> - proc_lib:hibernate(?GEN_SERVER, enter_loop, - [?MODULE, [], State]), - {noreply, State, hibernate_timeout()}; -handle_info(_Info, State) -> - {noreply, State, hibernate_timeout()}. - -terminate(_Reason, - #state{xml_stream_state = XMLStreamState, - c2s_pid = C2SPid} = - State) -> - close_stream(XMLStreamState), - if C2SPid /= undefined -> - p1_fsm:send_event(C2SPid, closed); - true -> ok - end, - catch (State#state.sock_mod):close(State#state.socket), - ok. - -code_change(_OldVsn, State, _Extra) -> {ok, State}. - -%%-------------------------------------------------------------------- -%%% Internal functions -%%-------------------------------------------------------------------- - -activate_socket(#state{socket = Socket, - sock_mod = SockMod}) -> - Res = case SockMod of - gen_tcp -> - inet:setopts(Socket, [{active, once}]); - _ -> - SockMod:setopts(Socket, [{active, once}]) - end, - case Res of - {error, _Reason} -> self() ! {tcp_closed, Socket}; - ok -> ok - end. - -%% Data processing for connectors directly generating xmlelement in -%% Erlang data structure. -%% WARNING: Shaper does not work with Erlang data structure. -process_data([], State) -> - activate_socket(State), State; -process_data([Element | Els], - #state{c2s_pid = C2SPid} = State) - when element(1, Element) == xmlel; - element(1, Element) == xmlstreamstart; - element(1, Element) == xmlstreamelement; - element(1, Element) == xmlstreamend -> - if C2SPid == undefined -> State; - true -> - catch p1_fsm:send_event(C2SPid, - element_wrapper(Element)), - process_data(Els, State) - end; -%% Data processing for connectors receivind data as string. -process_data(Data, - #state{xml_stream_state = XMLStreamState, - shaper_state = ShaperState, c2s_pid = C2SPid} = - State) -> - ?DEBUG("Received XML on stream = ~p", [(Data)]), - XMLStreamState1 = case XMLStreamState of - undefined -> - XMLStreamState; - _ -> - fxml_stream:parse(XMLStreamState, Data) - end, - {NewShaperState, Pause} = shaper:update(ShaperState, byte_size(Data)), - if - C2SPid == undefined -> - ok; - Pause > 0 -> - erlang:start_timer(Pause, self(), activate); - true -> - activate_socket(State) - end, - State#state{xml_stream_state = XMLStreamState1, - shaper_state = NewShaperState}. - -%% Element coming from XML parser are wrapped inside xmlstreamelement -%% When we receive directly xmlelement tuple (from a socket module -%% speaking directly Erlang XML), we wrap it inside the same -%% xmlstreamelement coming from the XML parser. -element_wrapper(XMLElement) - when element(1, XMLElement) == xmlel -> - {xmlstreamelement, XMLElement}; -element_wrapper(Element) -> Element. - -close_stream(undefined) -> ok; -close_stream(XMLStreamState) -> - fxml_stream:close(XMLStreamState). - -reset_parser(#state{xml_stream_state = undefined} = State) -> - State; -reset_parser(#state{c2s_pid = C2SPid, - max_stanza_size = MaxStanzaSize, - xml_stream_state = XMLStreamState} - = State) -> - NewStreamState = try fxml_stream:reset(XMLStreamState) - catch error:_ -> - close_stream(XMLStreamState), - case C2SPid of - undefined -> - undefined; - _ -> - fxml_stream:new(C2SPid, MaxStanzaSize) - end - end, - State#state{xml_stream_state = NewStreamState}. - -do_send(State, Data) -> - (State#state.sock_mod):send(State#state.socket, Data). - -do_call(Pid, Msg) -> - try ?GEN_SERVER:call(Pid, Msg) of - Res -> Res - catch _:{timeout, _} -> - {error, timeout}; - _:_ -> - {error, einval} - end. - -hibernate_timeout() -> - ejabberd_config:get_option(receiver_hibernate, timer:seconds(90)). - --spec opt_type(receiver_hibernate) -> fun((pos_integer() | hibernate) -> - pos_integer() | hibernate); - (atom()) -> [atom()]. -opt_type(receiver_hibernate) -> - fun(I) when is_integer(I), I>0 -> I; - (hibernate) -> hibernate - end; -opt_type(_) -> - [receiver_hibernate]. diff --git a/src/ejabberd_s2s_in.erl b/src/ejabberd_s2s_in.erl index ae81f739e..ecd632cb7 100644 --- a/src/ejabberd_s2s_in.erl +++ b/src/ejabberd_s2s_in.erl @@ -21,9 +21,9 @@ %%%------------------------------------------------------------------- -module(ejabberd_s2s_in). -behaviour(xmpp_stream_in). --behaviour(ejabberd_socket). +-behaviour(xmpp_socket). -%% ejabberd_socket callbacks +%% xmpp_socket callbacks -export([start/2, start_link/2, socket_type/0]). %% ejabberd_listener callbacks -export([listen_opt_type/1]). @@ -180,31 +180,29 @@ handle_stream_established(State) -> set_idle_timeout(State#{established => true}). handle_auth_success(RServer, Mech, _AuthModule, - #{sockmod := SockMod, - socket := Socket, ip := IP, + #{socket := Socket, ip := IP, auth_domains := AuthDomains, server_host := ServerHost, lserver := LServer} = State) -> ?INFO_MSG("(~s) Accepted inbound s2s ~s authentication ~s -> ~s (~s)", - [SockMod:pp(Socket), Mech, RServer, LServer, + [xmpp_socket:pp(Socket), Mech, RServer, LServer, ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), State1 = case ejabberd_s2s:allow_host(ServerHost, RServer) of true -> AuthDomains1 = sets:add_element(RServer, AuthDomains), - change_shaper(State, RServer), - State#{auth_domains => AuthDomains1}; + State0 = change_shaper(State, RServer), + State0#{auth_domains => AuthDomains1}; false -> State end, ejabberd_hooks:run_fold(s2s_in_auth_result, ServerHost, State1, [true, RServer]). handle_auth_failure(RServer, Mech, Reason, - #{sockmod := SockMod, - socket := Socket, ip := IP, + #{socket := Socket, ip := IP, server_host := ServerHost, lserver := LServer} = State) -> ?INFO_MSG("(~s) Failed inbound s2s ~s authentication ~s -> ~s (~s): ~s", - [SockMod:pp(Socket), Mech, RServer, LServer, + [xmpp_socket:pp(Socket), Mech, RServer, LServer, ejabberd_config:may_hide_data(misc:ip_to_list(IP)), Reason]), ejabberd_hooks:run_fold(s2s_in_auth_result, ServerHost, State, [false, RServer]). @@ -286,11 +284,11 @@ handle_info(Info, #{server_host := LServer} = State) -> ejabberd_hooks:run_fold(s2s_in_handle_info, LServer, State, [Info]). terminate(Reason, #{auth_domains := AuthDomains, - sockmod := SockMod, socket := Socket} = State) -> + socket := Socket} = State) -> case maps:get(stop_reason, State, undefined) of {tls, _} = Err -> ?WARNING_MSG("(~s) Failed to secure inbound s2s connection: ~s", - [SockMod:pp(Socket), xmpp_stream_in:format_error(Err)]); + [xmpp_socket:pp(Socket), xmpp_stream_in:format_error(Err)]); _ -> ok end, @@ -340,7 +338,7 @@ set_idle_timeout(#{server_host := LServer, set_idle_timeout(State) -> State. --spec change_shaper(state(), binary()) -> ok. +-spec change_shaper(state(), binary()) -> state(). change_shaper(#{shaper := ShaperName, server_host := ServerHost} = State, RServer) -> Shaper = acl:match_rule(ServerHost, ShaperName, jid:make(RServer)), diff --git a/src/ejabberd_s2s_out.erl b/src/ejabberd_s2s_out.erl index fea5d8162..0ece804a1 100644 --- a/src/ejabberd_s2s_out.erl +++ b/src/ejabberd_s2s_out.erl @@ -61,12 +61,12 @@ start(From, To, Opts) -> Res -> Res end; _ -> - xmpp_stream_out:start(?MODULE, [ejabberd_socket, From, To, Opts], + xmpp_stream_out:start(?MODULE, [xmpp_socket, From, To, Opts], ejabberd_config:fsm_limit_opts([])) end. start_link(From, To, Opts) -> - xmpp_stream_out:start_link(?MODULE, [ejabberd_socket, From, To, Opts], + xmpp_stream_out:start_link(?MODULE, [xmpp_socket, From, To, Opts], ejabberd_config:fsm_limit_opts([])). -spec connect(pid()) -> ok. @@ -210,24 +210,22 @@ dns_retries(#{server := LServer}) -> dns_timeout(#{server := LServer}) -> ejabberd_config:get_option({s2s_dns_timeout, LServer}, timer:seconds(10)). -handle_auth_success(Mech, #{sockmod := SockMod, - socket := Socket, ip := IP, +handle_auth_success(Mech, #{socket := Socket, ip := IP, remote_server := RServer, server_host := ServerHost, server := LServer} = State) -> ?INFO_MSG("(~s) Accepted outbound s2s ~s authentication ~s -> ~s (~s)", - [SockMod:pp(Socket), Mech, LServer, RServer, + [xmpp_socket:pp(Socket), Mech, LServer, RServer, ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), ejabberd_hooks:run_fold(s2s_out_auth_result, ServerHost, State, [true]). handle_auth_failure(Mech, Reason, - #{sockmod := SockMod, - socket := Socket, ip := IP, + #{socket := Socket, ip := IP, remote_server := RServer, server_host := ServerHost, server := LServer} = State) -> ?INFO_MSG("(~s) Failed outbound s2s ~s authentication ~s -> ~s (~s): ~s", - [SockMod:pp(Socket), Mech, LServer, RServer, + [xmpp_socket:pp(Socket), Mech, LServer, RServer, ejabberd_config:may_hide_data(misc:ip_to_list(IP)), xmpp_stream_out:format_error(Reason)]), ejabberd_hooks:run_fold(s2s_out_auth_result, ServerHost, State, [{false, Reason}]). diff --git a/src/ejabberd_service.erl b/src/ejabberd_service.erl index 7b5f945d0..3fe176a3b 100644 --- a/src/ejabberd_service.erl +++ b/src/ejabberd_service.erl @@ -21,11 +21,11 @@ %%%------------------------------------------------------------------- -module(ejabberd_service). -behaviour(xmpp_stream_in). --behaviour(ejabberd_socket). +-behaviour(xmpp_socket). -protocol({xep, 114, '1.6'}). -%% ejabberd_socket callbacks +%% xmpp_socket callbacks -export([start/2, start_link/2, socket_type/0, close/1, close/2]). %% ejabberd_listener callbacks -export([listen_opt_type/1, transform_listen_option/2]). @@ -100,8 +100,8 @@ init([State, Opts]) -> false -> [compression_none | TLSOpts1]; true -> TLSOpts1 end, - xmpp_stream_in:change_shaper(State, Shaper), - State1 = State#{access => Access, + State1 = xmpp_stream_in:change_shaper(State, Shaper), + State2 = State1#{access => Access, xmlns => ?NS_COMPONENT, lang => ?MYLANG, server => ?MYNAME, @@ -109,7 +109,7 @@ init([State, Opts]) -> stream_version => undefined, tls_options => TLSOpts, check_from => CheckFrom}, - ejabberd_hooks:run_fold(component_init, {ok, State1}, [Opts]). + ejabberd_hooks:run_fold(component_init, {ok, State2}, [Opts]). handle_stream_start(_StreamStart, #{remote_server := RemoteServer, @@ -135,8 +135,7 @@ handle_stream_start(_StreamStart, end. get_password_fun(#{remote_server := RemoteServer, - socket := Socket, sockmod := SockMod, - ip := IP, + socket := Socket, ip := IP, host_opts := HostOpts}) -> fun(_) -> case dict:find(RemoteServer, HostOpts) of @@ -145,7 +144,7 @@ get_password_fun(#{remote_server := RemoteServer, error -> ?INFO_MSG("(~s) Domain ~s is unconfigured for " "external component from ~s", - [SockMod:pp(Socket), RemoteServer, + [xmpp_socket:pp(Socket), RemoteServer, ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), {false, undefined} end @@ -153,11 +152,10 @@ get_password_fun(#{remote_server := RemoteServer, handle_auth_success(_, Mech, _, #{remote_server := RemoteServer, host_opts := HostOpts, - socket := Socket, sockmod := SockMod, - ip := IP} = State) -> + socket := Socket, ip := IP} = State) -> ?INFO_MSG("(~s) Accepted external component ~s authentication " "for ~s from ~s", - [SockMod:pp(Socket), Mech, RemoteServer, + [xmpp_socket:pp(Socket), Mech, RemoteServer, ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), lists:foreach( fun (H) -> @@ -168,11 +166,10 @@ handle_auth_success(_, Mech, _, handle_auth_failure(_, Mech, Reason, #{remote_server := RemoteServer, - sockmod := SockMod, socket := Socket, ip := IP} = State) -> ?INFO_MSG("(~s) Failed external component ~s authentication " "for ~s from ~s: ~s", - [SockMod:pp(Socket), Mech, RemoteServer, + [xmpp_socket:pp(Socket), Mech, RemoteServer, ejabberd_config:may_hide_data(misc:ip_to_list(IP)), Reason]), State. diff --git a/src/ejabberd_socket.erl b/src/ejabberd_socket.erl deleted file mode 100644 index 9953a76ae..000000000 --- a/src/ejabberd_socket.erl +++ /dev/null @@ -1,293 +0,0 @@ -%%%---------------------------------------------------------------------- -%%% File : ejabberd_socket.erl -%%% Author : Alexey Shchepin -%%% Purpose : Socket with zlib and TLS support library -%%% Created : 23 Aug 2006 by Alexey Shchepin -%%% -%%% -%%% ejabberd, Copyright (C) 2002-2017 ProcessOne -%%% -%%% This program is free software; you can redistribute it and/or -%%% modify it under the terms of the GNU General Public License as -%%% published by the Free Software Foundation; either version 2 of the -%%% License, or (at your option) any later version. -%%% -%%% This program is distributed in the hope that it will be useful, -%%% but WITHOUT ANY WARRANTY; without even the implied warranty of -%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -%%% General Public License for more details. -%%% -%%% You should have received a copy of the GNU General Public License along -%%% with this program; if not, write to the Free Software Foundation, Inc., -%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -%%% -%%%---------------------------------------------------------------------- - --module(ejabberd_socket). - --author('alexey@process-one.net'). - -%% API --export([start/4, - connect/3, - connect/4, - connect/5, - starttls/2, - compress/1, - compress/2, - reset_stream/1, - send_element/2, - send_header/2, - send_trailer/1, - send/2, - send_xml/2, - change_shaper/2, - monitor/1, - get_sockmod/1, - get_transport/1, - get_peer_certificate/2, - get_verify_result/1, - close/1, - pp/1, - sockname/1, peername/1]). - --include("ejabberd.hrl"). --include("xmpp.hrl"). --include("logger.hrl"). - --type sockmod() :: ejabberd_bosh | - ejabberd_http_ws | - gen_tcp | fast_tls | ezlib. --type receiver() :: pid () | atom(). --type socket() :: pid() | inet:socket() | - fast_tls:tls_socket() | - ezlib:zlib_socket() | - ejabberd_bosh:bosh_socket() | - ejabberd_http_ws:ws_socket(). - --record(socket_state, {sockmod = gen_tcp :: sockmod(), - socket = self() :: socket(), - receiver = self() :: receiver()}). - --type socket_state() :: #socket_state{}. - --export_type([socket/0, socket_state/0, sockmod/0]). - --callback start({module(), socket_state()}, - [proplists:property()]) -> {ok, pid()} | {error, term()} | ignore. --callback start_link({module(), socket_state()}, - [proplists:property()]) -> {ok, pid()} | {error, term()} | ignore. --callback socket_type() -> xml_stream | independent | raw. - --define(is_http_socket(S), - (S#socket_state.sockmod == ejabberd_bosh orelse - S#socket_state.sockmod == ejabberd_http_ws)). - -%%==================================================================== -%% API -%%==================================================================== --spec start(atom(), sockmod(), socket(), [proplists:property()]) - -> {ok, pid() | independent} | {error, inet:posix() | any()} | ignore. -start(Module, SockMod, Socket, Opts) -> - case Module:socket_type() of - independent -> {ok, independent}; - xml_stream -> - MaxStanzaSize = proplists:get_value(max_stanza_size, Opts, infinity), - {ReceiverMod, Receiver, RecRef} = - try SockMod:custom_receiver(Socket) of - {receiver, RecMod, RecPid} -> - {RecMod, RecPid, RecMod} - catch _:_ -> - RecPid = ejabberd_receiver:start( - Socket, SockMod, none, MaxStanzaSize), - {ejabberd_receiver, RecPid, RecPid} - end, - SocketData = #socket_state{sockmod = SockMod, - socket = Socket, receiver = RecRef}, - case Module:start({?MODULE, SocketData}, Opts) of - {ok, Pid} -> - case SockMod:controlling_process(Socket, Receiver) of - ok -> - ReceiverMod:become_controller(Receiver, Pid), - {ok, Receiver}; - Err -> - SockMod:close(Socket), - Err - end; - Err -> - SockMod:close(Socket), - case ReceiverMod of - ejabberd_receiver -> ReceiverMod:close(Receiver); - _ -> ok - end, - Err - end; - raw -> - case Module:start({SockMod, Socket}, Opts) of - {ok, Pid} -> - case SockMod:controlling_process(Socket, Pid) of - ok -> - {ok, Pid}; - {error, _} = Err -> - SockMod:close(Socket), - Err - end; - Err -> - SockMod:close(Socket), - Err - end - end. - -connect(Addr, Port, Opts) -> - connect(Addr, Port, Opts, infinity, self()). - -connect(Addr, Port, Opts, Timeout) -> - connect(Addr, Port, Opts, Timeout, self()). - -connect(Addr, Port, Opts, Timeout, Owner) -> - case gen_tcp:connect(Addr, Port, Opts, Timeout) of - {ok, Socket} -> - Receiver = ejabberd_receiver:start(Socket, gen_tcp, - none), - SocketData = #socket_state{sockmod = gen_tcp, - socket = Socket, receiver = Receiver}, - case gen_tcp:controlling_process(Socket, Receiver) of - ok -> - ejabberd_receiver:become_controller(Receiver, Owner), - {ok, SocketData}; - {error, _Reason} = Error -> gen_tcp:close(Socket), Error - end; - {error, _Reason} = Error -> Error - end. - -starttls(#socket_state{socket = Socket, - receiver = Receiver} = SocketData, TLSOpts) -> - case fast_tls:tcp_to_tls(Socket, TLSOpts) of - {ok, TLSSocket} -> - case ejabberd_receiver:starttls(Receiver, TLSSocket) of - ok -> - {ok, SocketData#socket_state{socket = TLSSocket, - sockmod = fast_tls}}; - {error, _} = Err -> - Err - end; - {error, _} = Err -> - Err - end. - -compress(SocketData) -> compress(SocketData, undefined). - -compress(SocketData, Data) -> - case ejabberd_receiver:compress(SocketData#socket_state.receiver, Data) of - {ok, ZlibSocket} -> - {ok, SocketData#socket_state{socket = ZlibSocket, sockmod = ezlib}}; - Err -> - ?ERROR_MSG("compress failed: ~p", [Err]), - Err - end. - -reset_stream(SocketData) - when is_pid(SocketData#socket_state.receiver) -> - ejabberd_receiver:reset_stream(SocketData#socket_state.receiver); -reset_stream(SocketData) - when is_atom(SocketData#socket_state.receiver) -> - (SocketData#socket_state.receiver):reset_stream(SocketData#socket_state.socket). - --spec send_element(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}. -send_element(SocketData, El) when ?is_http_socket(SocketData) -> - send_xml(SocketData, {xmlstreamelement, El}); -send_element(SocketData, El) -> - send(SocketData, fxml:element_to_binary(El)). - --spec send_header(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}. -send_header(SocketData, El) when ?is_http_socket(SocketData) -> - send_xml(SocketData, {xmlstreamstart, El#xmlel.name, El#xmlel.attrs}); -send_header(SocketData, El) -> - send(SocketData, fxml:element_to_header(El)). - --spec send_trailer(socket_state()) -> ok | {error, inet:posix()}. -send_trailer(SocketData) when ?is_http_socket(SocketData) -> - send_xml(SocketData, {xmlstreamend, <<"stream:stream">>}); -send_trailer(SocketData) -> - send(SocketData, <<"">>). - --spec send(socket_state(), iodata()) -> ok | {error, closed | inet:posix()}. -send(#socket_state{sockmod = SockMod, socket = Socket} = SocketData, Data) -> - ?DEBUG("(~s) Send XML on stream = ~p", [pp(SocketData), Data]), - try SockMod:send(Socket, Data) of - {error, einval} -> {error, closed}; - Result -> Result - catch _:badarg -> - %% Some modules throw badarg exceptions on closed sockets - %% TODO: their code should be improved - {error, closed} - end. - --spec send_xml(socket_state(), - {xmlstreamelement, fxml:xmlel()} | - {xmlstreamstart, binary(), [{binary(), binary()}]} | - {xmlstreamend, binary()} | - {xmlstreamraw, iodata()}) -> term(). -send_xml(SocketData, El) -> - (SocketData#socket_state.sockmod):send_xml(SocketData#socket_state.socket, El). - -change_shaper(SocketData, Shaper) - when is_pid(SocketData#socket_state.receiver) -> - ejabberd_receiver:change_shaper(SocketData#socket_state.receiver, - Shaper); -change_shaper(SocketData, Shaper) - when is_atom(SocketData#socket_state.receiver) -> - (SocketData#socket_state.receiver):change_shaper(SocketData#socket_state.socket, - Shaper). - -monitor(SocketData) - when is_pid(SocketData#socket_state.receiver) -> - erlang:monitor(process, - SocketData#socket_state.receiver); -monitor(SocketData) - when is_atom(SocketData#socket_state.receiver) -> - (SocketData#socket_state.receiver):monitor(SocketData#socket_state.socket). - -get_sockmod(SocketData) -> - SocketData#socket_state.sockmod. - -get_transport(#socket_state{sockmod = SockMod, - socket = Socket}) -> - case SockMod of - gen_tcp -> tcp; - fast_tls -> tls; - ezlib -> - case ezlib:get_sockmod(Socket) of - gen_tcp -> tcp_zlib; - fast_tls -> tls_zlib - end; - ejabberd_bosh -> http_bind; - ejabberd_http_ws -> websocket - end. - -get_peer_certificate(SocketData, Type) -> - fast_tls:get_peer_certificate(SocketData#socket_state.socket, Type). - -get_verify_result(SocketData) -> - fast_tls:get_verify_result(SocketData#socket_state.socket). - -close(SocketData) -> - ejabberd_receiver:close(SocketData#socket_state.receiver). - -sockname(#socket_state{sockmod = SockMod, - socket = Socket}) -> - case SockMod of - gen_tcp -> inet:sockname(Socket); - _ -> SockMod:sockname(Socket) - end. - -peername(#socket_state{sockmod = SockMod, - socket = Socket}) -> - case SockMod of - gen_tcp -> inet:peername(Socket); - _ -> SockMod:peername(Socket) - end. - -pp(#socket_state{receiver = Receiver} = State) -> - Transport = get_transport(State), - io_lib:format("~s|~w", [Transport, Receiver]). diff --git a/src/mod_s2s_dialback.erl b/src/mod_s2s_dialback.erl index 1bf04af35..917bb71e1 100644 --- a/src/mod_s2s_dialback.erl +++ b/src/mod_s2s_dialback.erl @@ -139,14 +139,13 @@ s2s_out_auth_result(#{db_verify := _} = State, _) -> %% in section 2.1.2, step 2 {stop, send_verify_request(State)}; s2s_out_auth_result(#{db_enabled := true, - sockmod := SockMod, socket := Socket, ip := IP, server := LServer, remote_server := RServer} = State, {false, _}) -> %% SASL authentication has failed, retrying with dialback %% Sending dialback request, section 2.1.1, step 1 ?INFO_MSG("(~s) Retrying with s2s dialback authentication: ~s -> ~s (~s)", - [SockMod:pp(Socket), LServer, RServer, + [xmpp_socket:pp(Socket), LServer, RServer, ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), State1 = maps:remove(stop_reason, State#{on_route => queue}), {stop, send_db_request(State1)}; @@ -159,7 +158,6 @@ s2s_out_downgraded(#{db_verify := _} = State, _) -> %% section 2.1.2, step 2 {stop, send_verify_request(State)}; s2s_out_downgraded(#{db_enabled := true, - sockmod := SockMod, socket := Socket, ip := IP, server := LServer, remote_server := RServer} = State, _) -> @@ -167,7 +165,7 @@ s2s_out_downgraded(#{db_enabled := true, %% section 2.1.1, step 1 ?INFO_MSG("(~s) Trying s2s dialback authentication with " "non-RFC compliant server: ~s -> ~s (~s)", - [SockMod:pp(Socket), LServer, RServer, + [xmpp_socket:pp(Socket), LServer, RServer, ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), {stop, send_db_request(State)}; s2s_out_downgraded(State, _) -> diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl index 658bd504e..c533a2bab 100644 --- a/src/mod_stream_mgmt.erl +++ b/src/mod_stream_mgmt.erl @@ -389,7 +389,7 @@ handle_a(State, #sm_a{h = H}) -> resend_rack(State1). -spec handle_resume(state(), sm_resume()) -> {ok, state()} | {error, state()}. -handle_resume(#{user := User, lserver := LServer, sockmod := SockMod, +handle_resume(#{user := User, lserver := LServer, lang := Lang, socket := Socket} = State, #sm_resume{h = H, previd = PrevID, xmlns = Xmlns}) -> R = case inherit_session_state(State, PrevID) of @@ -416,7 +416,7 @@ handle_resume(#{user := User, lserver := LServer, sockmod := SockMod, State4 = send(State3, #sm_r{xmlns = AttrXmlns}), State5 = ejabberd_hooks:run_fold(c2s_session_resumed, LServer, State4, []), ?INFO_MSG("(~s) Resumed session for ~s", - [SockMod:pp(Socket), jid:encode(JID)]), + [xmpp_socket:pp(Socket), jid:encode(JID)]), {ok, State5}; {error, El, Msg} -> ?INFO_MSG("Cannot resume session for ~s@~s: ~s", diff --git a/src/xmpp_socket.erl b/src/xmpp_socket.erl new file mode 100644 index 000000000..1ee021318 --- /dev/null +++ b/src/xmpp_socket.erl @@ -0,0 +1,386 @@ +%%%---------------------------------------------------------------------- +%%% File : xmpp_socket.erl +%%% Author : Alexey Shchepin +%%% Purpose : Socket with zlib and TLS support library +%%% Created : 23 Aug 2006 by Alexey Shchepin +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2017 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- + +-module(xmpp_socket). + +-author('alexey@process-one.net'). + +%% API +-export([start/4, + connect/3, + connect/4, + connect/5, + starttls/2, + compress/1, + compress/2, + reset_stream/1, + send_element/2, + send_header/2, + send_trailer/1, + send/2, + send_xml/2, + recv/2, + activate/1, + change_shaper/2, + monitor/1, + get_sockmod/1, + get_transport/1, + get_peer_certificate/2, + get_verify_result/1, + close/1, + pp/1, + sockname/1, peername/1]). + +-include("ejabberd.hrl"). +-include("xmpp.hrl"). +-include("logger.hrl"). + +-type sockmod() :: ejabberd_bosh | + ejabberd_http_ws | + gen_tcp | fast_tls | ezlib. +-type receiver() :: atom(). +-type socket() :: pid() | inet:socket() | + fast_tls:tls_socket() | + ezlib:zlib_socket() | + ejabberd_bosh:bosh_socket() | + ejabberd_http_ws:ws_socket(). + +-record(socket_state, {sockmod = gen_tcp :: sockmod(), + socket :: socket(), + max_stanza_size = infinity :: timeout(), + xml_stream :: undefined | fxml_stream:xml_stream_state(), + shaper = none :: none | shaper:shaper(), + receiver :: receiver()}). + +-type socket_state() :: #socket_state{}. + +-export_type([socket/0, socket_state/0, sockmod/0]). + +-callback start({module(), socket_state()}, + [proplists:property()]) -> {ok, pid()} | {error, term()} | ignore. +-callback start_link({module(), socket_state()}, + [proplists:property()]) -> {ok, pid()} | {error, term()} | ignore. +-callback socket_type() -> xml_stream | independent | raw. + +-define(is_http_socket(S), + (S#socket_state.sockmod == ejabberd_bosh orelse + S#socket_state.sockmod == ejabberd_http_ws)). + +%%==================================================================== +%% API +%%==================================================================== +-spec start(atom(), sockmod(), socket(), [proplists:property()]) + -> {ok, pid() | independent} | {error, inet:posix() | any()} | ignore. +start(Module, SockMod, Socket, Opts) -> + try + case Module:socket_type() of + independent -> + {ok, independent}; + xml_stream -> + MaxStanzaSize = proplists:get_value(max_stanza_size, Opts, infinity), + Receiver = proplists:get_value(receiver, Opts), + SocketData = #socket_state{sockmod = SockMod, + socket = Socket, + receiver = Receiver, + max_stanza_size = MaxStanzaSize}, + {ok, Pid} = Module:start({?MODULE, SocketData}, Opts), + Receiver1 = if is_pid(Receiver) -> Receiver; + true -> Pid + end, + ok = controlling_process(SocketData, Receiver1), + ok = become_controller(SocketData, Pid), + {ok, Receiver1}; + raw -> + {ok, Pid} = Module:start({SockMod, Socket}, Opts), + ok = SockMod:controlling_process(Socket, Pid), + {ok, Pid} + end + catch _:{badmatch, {error, _} = Err} -> + SockMod:close(Socket), + Err + end. + +connect(Addr, Port, Opts) -> + connect(Addr, Port, Opts, infinity, self()). + +connect(Addr, Port, Opts, Timeout) -> + connect(Addr, Port, Opts, Timeout, self()). + +connect(Addr, Port, Opts, Timeout, Owner) -> + case gen_tcp:connect(Addr, Port, Opts, Timeout) of + {ok, Socket} -> + SocketData = #socket_state{sockmod = gen_tcp, socket = Socket}, + case controlling_process(SocketData, Owner) of + ok -> + activate_after(Socket, Owner, 0), + {ok, SocketData}; + {error, _Reason} = Error -> + gen_tcp:close(Socket), + Error + end; + {error, _Reason} = Error -> + Error + end. + +starttls(#socket_state{socket = Socket, + receiver = undefined} = SocketData, TLSOpts) -> + case fast_tls:tcp_to_tls(Socket, TLSOpts) of + {ok, TLSSocket} -> + SocketData1 = SocketData#socket_state{socket = TLSSocket, + sockmod = fast_tls}, + SocketData2 = reset_stream(SocketData1), + case fast_tls:recv_data(TLSSocket, <<>>) of + {ok, TLSData} -> + parse(SocketData2, TLSData); + {error, _} = Err -> + Err + end; + {error, _} = Err -> + Err + end. + +compress(SocketData) -> compress(SocketData, undefined). + +compress(#socket_state{receiver = undefined, + sockmod = SockMod, + socket = Socket} = SocketData, Data) -> + ejabberd:start_app(ezlib), + {ok, ZlibSocket} = ezlib:enable_zlib(SockMod, Socket), + case Data of + undefined -> ok; + _ -> send(SocketData, Data) + end, + SocketData1 = SocketData#socket_state{socket = ZlibSocket, + sockmod = ezlib}, + SocketData2 = reset_stream(SocketData1), + case ezlib:recv_data(ZlibSocket, <<"">>) of + {ok, ZlibData} -> + parse(SocketData2, ZlibData); + {error, _} = Err -> + Err + end. + +reset_stream(#socket_state{xml_stream = XMLStream, + max_stanza_size = MaxStanzaSize} = SocketData) + when XMLStream /= undefined -> + XMLStream1 = try fxml_stream:reset(XMLStream) + catch error:_ -> + close_stream(XMLStream), + fxml_stream:new(self(), MaxStanzaSize) + end, + SocketData#socket_state{xml_stream = XMLStream1}; +reset_stream(#socket_state{sockmod = SockMod, socket = Socket} = SocketData) -> + Socket1 = SockMod:reset_stream(Socket), + SocketData#socket_state{socket = Socket1}. + +-spec send_element(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}. +send_element(SocketData, El) when ?is_http_socket(SocketData) -> + send_xml(SocketData, {xmlstreamelement, El}); +send_element(SocketData, El) -> + send(SocketData, fxml:element_to_binary(El)). + +-spec send_header(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}. +send_header(SocketData, El) when ?is_http_socket(SocketData) -> + send_xml(SocketData, {xmlstreamstart, El#xmlel.name, El#xmlel.attrs}); +send_header(SocketData, El) -> + send(SocketData, fxml:element_to_header(El)). + +-spec send_trailer(socket_state()) -> ok | {error, inet:posix()}. +send_trailer(SocketData) when ?is_http_socket(SocketData) -> + send_xml(SocketData, {xmlstreamend, <<"stream:stream">>}); +send_trailer(SocketData) -> + send(SocketData, <<"">>). + +-spec send(socket_state(), iodata()) -> ok | {error, closed | inet:posix()}. +send(#socket_state{sockmod = SockMod, socket = Socket} = SocketData, Data) -> + ?DEBUG("(~s) Send XML on stream = ~p", [pp(SocketData), Data]), + try SockMod:send(Socket, Data) of + {error, einval} -> {error, closed}; + Result -> Result + catch _:badarg -> + %% Some modules throw badarg exceptions on closed sockets + %% TODO: their code should be improved + {error, closed} + end. + +-spec send_xml(socket_state(), + {xmlstreamelement, fxml:xmlel()} | + {xmlstreamstart, binary(), [{binary(), binary()}]} | + {xmlstreamend, binary()} | + {xmlstreamraw, iodata()}) -> term(). +send_xml(SocketData, El) -> + (SocketData#socket_state.sockmod):send_xml(SocketData#socket_state.socket, El). + +recv(#socket_state{xml_stream = undefined} = SocketData, Data) -> + XMLStream = fxml_stream:new(self(), SocketData#socket_state.max_stanza_size), + recv(SocketData#socket_state{xml_stream = XMLStream}, Data); +recv(#socket_state{sockmod = SockMod, socket = Socket} = SocketData, Data) -> + case SockMod of + fast_tls -> + case fast_tls:recv_data(Socket, Data) of + {ok, TLSData} -> + parse(SocketData, TLSData); + {error, _} = Err -> + Err + end; + ezlib -> + case ezlib:recv_data(Socket, Data) of + {ok, ZlibData} -> + parse(SocketData, ZlibData); + {error, _} = Err -> + Err + end; + _ -> + parse(SocketData, Data) + end. + +change_shaper(#socket_state{receiver = undefined} = SocketData, Shaper) -> + ShaperState = shaper:new(Shaper), + SocketData#socket_state{shaper = ShaperState}; +change_shaper(#socket_state{sockmod = SockMod, + socket = Socket} = SocketData, Shaper) -> + SockMod:change_shaper(Socket, Shaper), + SocketData. + +monitor(#socket_state{receiver = undefined}) -> + make_ref(); +monitor(#socket_state{sockmod = SockMod, socket = Socket}) -> + SockMod:monitor(Socket). + +controlling_process(#socket_state{sockmod = SockMod, + socket = Socket}, Pid) -> + SockMod:controlling_process(Socket, Pid). + +become_controller(#socket_state{receiver = Receiver, + sockmod = SockMod, + socket = Socket}, Pid) -> + if is_pid(Receiver) -> + SockMod:become_controller(Receiver, Pid); + true -> + activate_after(Socket, Pid, 0) + end. + +get_sockmod(SocketData) -> + SocketData#socket_state.sockmod. + +get_transport(#socket_state{sockmod = SockMod, + socket = Socket}) -> + case SockMod of + gen_tcp -> tcp; + fast_tls -> tls; + ezlib -> + case ezlib:get_sockmod(Socket) of + gen_tcp -> tcp_zlib; + fast_tls -> tls_zlib + end; + ejabberd_bosh -> http_bind; + ejabberd_http_ws -> websocket + end. + +get_peer_certificate(SocketData, Type) -> + fast_tls:get_peer_certificate(SocketData#socket_state.socket, Type). + +get_verify_result(SocketData) -> + fast_tls:get_verify_result(SocketData#socket_state.socket). + +close(#socket_state{sockmod = SockMod, socket = Socket}) -> + SockMod:close(Socket). + +sockname(#socket_state{sockmod = SockMod, + socket = Socket}) -> + case SockMod of + gen_tcp -> inet:sockname(Socket); + _ -> SockMod:sockname(Socket) + end. + +peername(#socket_state{sockmod = SockMod, + socket = Socket}) -> + case SockMod of + gen_tcp -> inet:peername(Socket); + _ -> SockMod:peername(Socket) + end. + +activate(#socket_state{sockmod = SockMod, socket = Socket}) -> + case SockMod of + gen_tcp -> inet:setopts(Socket, [{active, once}]); + _ -> SockMod:setopts(Socket, [{active, once}]) + end. + +activate_after(Socket, Pid, Pause) -> + if Pause > 0 -> + erlang:send_after(Pause, Pid, {tcp, Socket, <<>>}); + true -> + Pid ! {tcp, Socket, <<>>} + end, + ok. + +pp(#socket_state{receiver = Receiver} = State) -> + Transport = get_transport(State), + Receiver1 = case Receiver of + undefined -> self(); + _ -> Receiver + end, + io_lib:format("~s|~w", [Transport, Receiver1]). + +parse(SocketData, Data) when Data == <<>>; Data == [] -> + case activate(SocketData) of + ok -> + {ok, SocketData}; + {error, _} = Err -> + Err + end; +parse(SocketData, [El | Els]) when is_record(El, xmlel) -> + self() ! {'$gen_event', {xmlstreamelement, El}}, + parse(SocketData, Els); +parse(SocketData, [El | Els]) when + element(1, El) == xmlstreamstart; + element(1, El) == xmlstreamelement; + element(1, El) == xmlstreamend; + element(1, El) == xmlstreamerror -> + self() ! {'$gen_event', El}, + parse(SocketData, Els); +parse(#socket_state{xml_stream = XMLStream, + socket = Socket, + shaper = ShaperState} = SocketData, Data) + when is_binary(Data) -> + XMLStream1 = fxml_stream:parse(XMLStream, Data), + {ShaperState1, Pause} = shaper:update(ShaperState, byte_size(Data)), + Ret = if Pause > 0 -> + activate_after(Socket, self(), Pause); + true -> + activate(SocketData) + end, + case Ret of + ok -> + {ok, SocketData#socket_state{xml_stream = XMLStream1, + shaper = ShaperState1}}; + {error, _} = Err -> + Err + end. + +close_stream(undefined) -> + ok; +close_stream(XMLStream) -> + fxml_stream:close(XMLStream). diff --git a/src/xmpp_stream_in.erl b/src/xmpp_stream_in.erl index c28bad8e1..f41bed6c8 100644 --- a/src/xmpp_stream_in.erl +++ b/src/xmpp_stream_in.erl @@ -177,16 +177,17 @@ set_timeout(#{owner := Owner} = State, Timeout) when Owner == self() -> set_timeout(_, _) -> erlang:error(badarg). -get_transport(#{sockmod := SockMod, socket := Socket, owner := Owner}) +get_transport(#{socket := Socket, owner := Owner}) when Owner == self() -> - SockMod:get_transport(Socket); + xmpp_socket:get_transport(Socket); get_transport(_) -> erlang:error(badarg). --spec change_shaper(state(), shaper:shaper()) -> ok. -change_shaper(#{sockmod := SockMod, socket := Socket, owner := Owner}, Shaper) +-spec change_shaper(state(), shaper:shaper()) -> state(). +change_shaper(#{socket := Socket, owner := Owner} = State, Shaper) when Owner == self() -> - SockMod:change_shaper(Socket, Shaper); + Socket1 = xmpp_socket:change_shaper(Socket, Shaper), + State#{socket => Socket1}; change_shaper(_, _) -> erlang:error(badarg). @@ -209,16 +210,15 @@ format_error(Err) -> %%%=================================================================== %%% gen_server callbacks %%%=================================================================== -init([Module, {SockMod, Socket}, Opts]) -> +init([Module, {_SockMod, Socket}, Opts]) -> Encrypted = proplists:get_bool(tls, Opts), - SocketMonitor = SockMod:monitor(Socket), - case SockMod:peername(Socket) of + SocketMonitor = xmpp_socket:monitor(Socket), + case xmpp_socket:peername(Socket) of {ok, IP} -> Time = p1_time_compat:monotonic_time(milli_seconds), State = #{owner => self(), mod => Module, socket => Socket, - sockmod => SockMod, socket_monitor => SocketMonitor, stream_timeout => {timer:seconds(30), Time}, stream_direction => in, @@ -247,7 +247,7 @@ init([Module, {SockMod, Socket}, Opts]) -> TLSOpts = try Module:tls_options(State1) catch _:undef -> [] end, - case SockMod:starttls(Socket, TLSOpts) of + case xmpp_socket:starttls(Socket, TLSOpts) of {ok, TLSSocket} -> State2 = State1#{socket => TLSSocket}, {_, State3, Timeout} = noreply(State2), @@ -333,8 +333,7 @@ handle_info({'$gen_event', {xmlstreamerror, Reason}}, #{lang := Lang}= State) -> send_pkt(State1, Err) end); handle_info({'$gen_event', El}, #{stream_state := wait_for_stream} = State) -> - %% TODO: find and fix this in fast_xml - error_logger:error_msg("unexpected event from receiver: ~p; " + error_logger:error_msg("unexpected event from XML driver: ~p; " "xmlstreamstart was expected", [El]), State1 = send_header(State), noreply( @@ -379,6 +378,21 @@ handle_info(timeout, #{mod := Mod} = State) -> handle_info({'DOWN', MRef, _Type, _Object, _Info}, #{socket_monitor := MRef} = State) -> noreply(process_stream_end({socket, closed}, State)); +handle_info({tcp, _, Data}, #{socket := Socket} = State) -> + noreply( + case xmpp_socket:recv(Socket, Data) of + {ok, NewSocket} -> + State#{socket => NewSocket}; + {error, Reason} when is_atom(Reason) -> + process_stream_end({socket, Reason}, State); + {error, Reason} -> + %% TODO: make fast_tls return atoms + process_stream_end({tls, Reason}, State) + end); +handle_info({tcp_closed, _}, State) -> + handle_info({'$gen_event', closed}, State); +handle_info({tcp_error, _, Reason}, State) -> + noreply(process_stream_end({socket, Reason}, State)); handle_info(Info, #{mod := Mod} = State) -> noreply(try Mod:handle_info(Info, State) catch _:undef -> State @@ -698,14 +712,14 @@ process_compress(#compress{}, when Compressed or not Authenticated -> send_pkt(State, #compress_failure{reason = 'setup-failed'}); process_compress(#compress{methods = HisMethods}, - #{socket := Socket, sockmod := SockMod, mod := Mod} = State) -> + #{socket := Socket, mod := Mod} = State) -> MyMethods = try Mod:compress_methods(State) catch _:undef -> [] end, CommonMethods = lists_intersection(MyMethods, HisMethods), case lists:member(<<"zlib">>, CommonMethods) of true -> - case SockMod:compress(Socket) of + case xmpp_socket:compress(Socket) of {ok, ZlibSocket} -> State1 = send_pkt(State, #compressed{}), case is_disconnected(State1) of @@ -730,13 +744,13 @@ process_compress(#compress{methods = HisMethods}, process_starttls(#{stream_encrypted := true} = State) -> process_starttls_failure(already_encrypted, State); process_starttls(#{socket := Socket, - sockmod := SockMod, mod := Mod} = State) -> + mod := Mod} = State) -> case is_starttls_available(State) of true -> TLSOpts = try Mod:tls_options(State) catch _:undef -> [] end, - case SockMod:starttls(Socket, TLSOpts) of + case xmpp_socket:starttls(Socket, TLSOpts) of {ok, TLSSocket} -> State1 = send_pkt(State, #starttls_proceed{}), case is_disconnected(State1) of @@ -814,12 +828,13 @@ process_sasl_result({error, Reason, User}, State) -> -spec process_sasl_success([cyrsasl:sasl_property()], binary(), state()) -> state(). process_sasl_success(Props, ServerOut, - #{socket := Socket, sockmod := SockMod, + #{socket := Socket, mod := Mod, sasl_mech := Mech} = State) -> User = identity(Props), AuthModule = proplists:get_value(auth_module, Props), - SockMod:reset_stream(Socket), - State1 = send_pkt(State, #sasl_success{text = ServerOut}), + Socket1 = xmpp_socket:reset_stream(Socket), + State0 = State#{socket => Socket1}, + State1 = send_pkt(State0, #sasl_success{text = ServerOut}), case is_disconnected(State1) of true -> State1; false -> @@ -1090,17 +1105,17 @@ send_trailer(State) -> close_socket(State). -spec socket_send(state(), xmpp_element() | xmlel() | trailer) -> ok | {error, inet:posix()}. -socket_send(#{socket := Sock, sockmod := SockMod, +socket_send(#{socket := Sock, stream_state := StateName, xmlns := NS, stream_header_sent := true}, Pkt) -> case Pkt of trailer -> - SockMod:send_trailer(Sock); + xmpp_socket:send_trailer(Sock); #stream_start{} when StateName /= disconnected -> - SockMod:send_header(Sock, xmpp:encode(Pkt)); + xmpp_socket:send_header(Sock, xmpp:encode(Pkt)); _ when StateName /= disconnected -> - SockMod:send_element(Sock, xmpp:encode(Pkt, NS)); + xmpp_socket:send_element(Sock, xmpp:encode(Pkt, NS)); _ -> {error, closed} end; @@ -1108,8 +1123,8 @@ socket_send(_, _) -> {error, closed}. -spec close_socket(state()) -> state(). -close_socket(#{sockmod := SockMod, socket := Socket} = State) -> - SockMod:close(Socket), +close_socket(#{socket := Socket} = State) -> + xmpp_socket:close(Socket), State#{stream_timeout => infinity, stream_state => disconnected}. diff --git a/src/xmpp_stream_out.erl b/src/xmpp_stream_out.erl index 8f4fa5c84..b5851b0b8 100644 --- a/src/xmpp_stream_out.erl +++ b/src/xmpp_stream_out.erl @@ -191,16 +191,17 @@ set_timeout(#{owner := Owner} = State, Timeout) when Owner == self() -> set_timeout(_, _) -> erlang:error(badarg). -get_transport(#{sockmod := SockMod, socket := Socket, owner := Owner}) +get_transport(#{socket := Socket, owner := Owner}) when Owner == self() -> - SockMod:get_transport(Socket); + xmpp_socket:get_transport(Socket); get_transport(_) -> erlang:error(badarg). --spec change_shaper(state(), shaper:shaper()) -> ok. -change_shaper(#{sockmod := SockMod, socket := Socket, owner := Owner}, Shaper) +-spec change_shaper(state(), shaper:shaper()) -> state(). +change_shaper(#{socket := Socket, owner := Owner} = State, Shaper) when Owner == self() -> - SockMod:change_shaper(Socket, Shaper); + Socket1 = xmpp_socket:change_shaper(Socket, Shaper), + State#{socket => Socket1}; change_shaper(_, _) -> erlang:error(badarg). @@ -233,11 +234,10 @@ format_error(Err) -> %%% gen_server callbacks %%%=================================================================== -spec init(list()) -> {ok, state(), timeout()} | {stop, term()} | ignore. -init([Mod, SockMod, From, To, Opts]) -> +init([Mod, _SockMod, From, To, Opts]) -> Time = p1_time_compat:monotonic_time(milli_seconds), State = #{owner => self(), mod => Mod, - sockmod => SockMod, server => From, user => <<"">>, resource => <<"">>, @@ -272,7 +272,6 @@ handle_call(Call, From, #{mod := Mod} = State) -> -spec handle_cast(term(), state()) -> noreply(). handle_cast(connect, #{remote_server := RemoteServer, - sockmod := SockMod, stream_state := connecting} = State) -> noreply( case idna_to_ascii(RemoteServer) of @@ -283,7 +282,7 @@ handle_cast(connect, #{remote_server := RemoteServer, {ok, AddrPorts} -> case connect(AddrPorts, State) of {ok, Socket, {Addr, Port, Encrypted}} -> - SocketMonitor = SockMod:monitor(Socket), + SocketMonitor = xmpp_socket:monitor(Socket), State1 = State#{ip => {Addr, Port}, socket => Socket, stream_encrypted => Encrypted, @@ -388,6 +387,21 @@ handle_info(timeout, #{mod := Mod} = State) -> handle_info({'DOWN', MRef, _Type, _Object, _Info}, #{socket_monitor := MRef} = State) -> noreply(process_stream_end({socket, closed}, State)); +handle_info({tcp, _, Data}, #{socket := Socket} = State) -> + noreply( + case xmpp_socket:recv(Socket, Data) of + {ok, NewSocket} -> + State#{socket => NewSocket}; + {error, Reason} when is_atom(Reason) -> + process_stream_end({socket, Reason}, State); + {error, Reason} -> + %% TODO: make fast_tls return atoms + process_stream_end({tls, Reason}, State) + end); +handle_info({tcp_closed, _}, State) -> + handle_info({'$gen_event', closed}, State); +handle_info({tcp_error, _, Reason}, State) -> + noreply(process_stream_end({socket, Reason}, State)); handle_info(Info, #{mod := Mod} = State) -> noreply(try Mod:handle_info(Info, State) catch _:undef -> State @@ -638,13 +652,13 @@ process_cert_verification(State) -> -spec process_sasl_success(state()) -> state(). process_sasl_success(#{mod := Mod, - sockmod := SockMod, socket := Socket} = State) -> - SockMod:reset_stream(Socket), - State1 = State#{stream_id => new_id(), - stream_restarted => true, - stream_state => wait_for_stream, - stream_authenticated => true}, + Socket1 = xmpp_socket:reset_stream(Socket), + State0 = State#{socket => Socket1}, + State1 = State0#{stream_id => new_id(), + stream_restarted => true, + stream_state => wait_for_stream, + stream_authenticated => true}, State2 = send_header(State1), case is_disconnected(State2) of true -> State2; @@ -745,15 +759,15 @@ send_error(State, Pkt, Err) -> end. -spec socket_send(state(), xmpp_element() | xmlel() | trailer) -> ok | {error, inet:posix()}. -socket_send(#{sockmod := SockMod, socket := Socket, xmlns := NS, +socket_send(#{socket := Socket, xmlns := NS, stream_state := StateName}, Pkt) -> case Pkt of trailer -> - SockMod:send_trailer(Socket); + xmpp_socket:send_trailer(Socket); #stream_start{} when StateName /= disconnected -> - SockMod:send_header(Socket, xmpp:encode(Pkt)); + xmpp_socket:send_header(Socket, xmpp:encode(Pkt)); _ when StateName /= disconnected -> - SockMod:send_element(Socket, xmpp:encode(Pkt, NS)); + xmpp_socket:send_element(Socket, xmpp:encode(Pkt, NS)); _ -> {error, closed} end; @@ -768,8 +782,8 @@ send_trailer(State) -> -spec close_socket(state()) -> state(). close_socket(State) -> case State of - #{sockmod := SockMod, socket := Socket} -> - SockMod:close(Socket); + #{socket := Socket} -> + xmpp_socket:close(Socket); _ -> ok end, @@ -777,8 +791,8 @@ close_socket(State) -> stream_state => disconnected}. -spec starttls(term(), state()) -> {ok, term()} | {error, tls_error_reason()}. -starttls(Socket, #{sockmod := SockMod, mod := Mod, - xmlns := NS, remote_server := RemoteServer} = State) -> +starttls(Socket, #{mod := Mod, xmlns := NS, + remote_server := RemoteServer} = State) -> TLSOpts = try Mod:tls_options(State) catch _:undef -> [] end, @@ -787,7 +801,7 @@ starttls(Socket, #{sockmod := SockMod, mod := Mod, ?NS_SERVER -> <<"xmpp-server">>; ?NS_CLIENT -> <<"xmpp-client">> end, - SockMod:starttls(Socket, [connect, {sni, SNI}, {alpn, [ALPN]}|TLSOpts]). + xmpp_socket:starttls(Socket, [connect, {sni, SNI}, {alpn, [ALPN]}|TLSOpts]). -spec select_lang(binary(), binary()) -> binary(). select_lang(Lang, <<"">>) -> Lang; @@ -1020,9 +1034,9 @@ host_entry_to_addr_ports(#hostent{h_addr_list = AddrList}, Port, TLS) -> -spec connect([ip_port()], state()) -> {ok, term(), ip_port()} | {error, {socket, socket_error_reason()}} | {error, {tls, tls_error_reason()}}. -connect(AddrPorts, #{sockmod := SockMod} = State) -> +connect(AddrPorts, State) -> Timeout = get_connect_timeout(State), - case connect(AddrPorts, SockMod, Timeout, {error, nxdomain}) of + case connect(AddrPorts, Timeout, {error, nxdomain}) of {ok, Socket, {Addr, Port, TLS = true}} -> case starttls(Socket, State) of {ok, TLSSocket} -> {ok, TLSSocket, {Addr, Port, TLS}}; @@ -1034,24 +1048,24 @@ connect(AddrPorts, #{sockmod := SockMod} = State) -> {error, {socket, Why}} end. --spec connect([ip_port()], module(), timeout(), network_error()) -> +-spec connect([ip_port()], timeout(), network_error()) -> {ok, term(), ip_port()} | network_error(). -connect([{Addr, Port, TLS}|AddrPorts], SockMod, Timeout, _) -> +connect([{Addr, Port, TLS}|AddrPorts], Timeout, _) -> Type = get_addr_type(Addr), - try SockMod:connect(Addr, Port, - [binary, {packet, 0}, - {send_timeout, ?TCP_SEND_TIMEOUT}, - {send_timeout_close, true}, - {active, false}, Type], - Timeout) of + try xmpp_socket:connect(Addr, Port, + [binary, {packet, 0}, + {send_timeout, ?TCP_SEND_TIMEOUT}, + {send_timeout_close, true}, + {active, false}, Type], + Timeout) of {ok, Socket} -> {ok, Socket, {Addr, Port, TLS}}; Err -> - connect(AddrPorts, SockMod, Timeout, Err) + connect(AddrPorts, Timeout, Err) catch _:badarg -> - connect(AddrPorts, SockMod, Timeout, {error, einval}) + connect(AddrPorts, Timeout, {error, einval}) end; -connect([], _SockMod, _Timeout, Err) -> +connect([], _Timeout, Err) -> Err. -spec get_addr_type(inet:ip_address()) -> inet:address_family(). diff --git a/src/xmpp_stream_pkix.erl b/src/xmpp_stream_pkix.erl index 8361999f4..9ca9fdffc 100644 --- a/src/xmpp_stream_pkix.erl +++ b/src/xmpp_stream_pkix.erl @@ -40,10 +40,10 @@ authenticate(State) -> -spec authenticate(xmpp_stream_in:state() | xmpp_stream_out:state(), binary()) -> {ok, binary()} | {error, atom(), binary()}. -authenticate(#{xmlns := ?NS_SERVER, sockmod := SockMod, +authenticate(#{xmlns := ?NS_SERVER, socket := Socket} = State, Authzid) -> Peer = maps:get(remote_server, State, Authzid), - case verify_cert(SockMod, Socket) of + case verify_cert(Socket) of {ok, Cert} -> case ejabberd_idna:domain_utf8_to_ascii(Peer) of false -> @@ -61,7 +61,7 @@ authenticate(#{xmlns := ?NS_SERVER, sockmod := SockMod, {error, Reason} -> {error, Reason, Peer} end; -authenticate(#{xmlns := ?NS_CLIENT, sockmod := SockMod, +authenticate(#{xmlns := ?NS_CLIENT, socket := Socket, lserver := LServer}, Authzid) -> JID = try jid:decode(Authzid) catch _:{bad_jid, <<>>} -> jid:make(LServer); @@ -69,7 +69,7 @@ authenticate(#{xmlns := ?NS_CLIENT, sockmod := SockMod, end, case JID of #jid{user = User} -> - case verify_cert(SockMod, Socket) of + case verify_cert(Socket) of {ok, Cert} -> JIDs = get_xmpp_addrs(Cert), get_username(JID, JIDs, LServer); @@ -104,11 +104,11 @@ get_cert_domains(Cert) -> %%%=================================================================== %%% Internal functions %%%=================================================================== --spec verify_cert(module(), ejabberd_socket:socket()) -> {ok, cert()} | {error, atom()}. -verify_cert(SockMod, Socket) -> - case SockMod:get_peer_certificate(Socket, otp) of +-spec verify_cert(xmpp_socket:socket()) -> {ok, cert()} | {error, atom()}. +verify_cert(Socket) -> + case xmpp_socket:get_peer_certificate(Socket, otp) of {ok, Cert} -> - case SockMod:get_verify_result(Socket) of + case xmpp_socket:get_verify_result(Socket) of 0 -> {ok, Cert}; VerifyRes ->