diff --git a/rebar.config b/rebar.config index dc8fe4f9c..0a8a6efd6 100644 --- a/rebar.config +++ b/rebar.config @@ -81,7 +81,8 @@ iconv]}}. {erl_first_files, ["src/ejabberd_sql_pt.erl", "src/ejabberd_config.erl", - "src/gen_mod.erl", "src/mod_muc_room.erl", "src/mod_push.erl"]}. + "src/gen_mod.erl", "src/mod_muc_room.erl", + "src/mod_push.erl", "src/xmpp_socket.erl"]}. {erl_opts, [nowarn_deprecated_function, {i, "include"}, diff --git a/src/ejabberd_bosh.erl b/src/ejabberd_bosh.erl index 1df6681ff..710e24ae4 100644 --- a/src/ejabberd_bosh.erl +++ b/src/ejabberd_bosh.erl @@ -33,7 +33,7 @@ -export([start/2, start/3, start_link/3]). -export([send_xml/2, setopts/2, controlling_process/2, - migrate/3, custom_receiver/1, become_controller/2, + migrate/3, become_controller/2, reset_stream/1, change_shaper/2, monitor/1, close/1, sockname/1, peername/1, process_request/3, send/2, change_controller/2]). @@ -175,9 +175,6 @@ setopts({http_bind, FsmRef, _IP}, Opts) -> controlling_process(_Socket, _Pid) -> ok. -custom_receiver({http_bind, FsmRef, _IP}) -> - {receiver, ?MODULE, FsmRef}. - become_controller(FsmRef, C2SPid) -> p1_fsm:send_all_state_event(FsmRef, {become_controller, C2SPid}). @@ -185,11 +182,11 @@ become_controller(FsmRef, C2SPid) -> change_controller({http_bind, FsmRef, _IP}, C2SPid) -> become_controller(FsmRef, C2SPid). -reset_stream({http_bind, _FsmRef, _IP}) -> ok. +reset_stream({http_bind, _FsmRef, _IP} = Socket) -> + Socket. change_shaper({http_bind, FsmRef, _IP}, Shaper) -> - p1_fsm:send_all_state_event(FsmRef, - {change_shaper, Shaper}). + p1_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}). monitor({http_bind, FsmRef, _IP}) -> erlang:monitor(process, FsmRef). @@ -306,8 +303,8 @@ init([#body{attrs = Attrs}, IP, SID]) -> buf_new(XMPPDomain)), Opts2} end, - ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket, - Opts), + xmpp_socket:start(ejabberd_c2s, ?MODULE, Socket, + [{receiver, self()}|Opts]), Inactivity = gen_mod:get_module_opt(XMPPDomain, mod_bosh, max_inactivity, ?DEFAULT_INACTIVITY), diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 6794ef163..35353bb84 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -22,11 +22,11 @@ -module(ejabberd_c2s). -behaviour(xmpp_stream_in). -behaviour(ejabberd_config). --behaviour(ejabberd_socket). +-behaviour(xmpp_socket). -protocol({rfc, 6121}). -%% ejabberd_socket callbacks +%% xmpp_socket callbacks -export([start/2, start_link/2, socket_type/0]). %% ejabberd_config callbacks -export([opt_type/1, listen_opt_type/1, transform_listen_option/2]). @@ -62,7 +62,7 @@ -export_type([state/0]). %%%=================================================================== -%%% ejabberd_socket API +%%% xmpp_socket API %%%=================================================================== start(SockData, Opts) -> case proplists:get_value(supervisor, Opts, true) of @@ -203,16 +203,16 @@ copy_state(#{owner := Owner} = NewState, open_session(#{user := U, server := S, resource := R, sid := SID, ip := IP, auth_module := AuthModule} = State) -> JID = jid:make(U, S, R), - change_shaper(State), - Conn = get_conn_type(State), - State1 = State#{conn => Conn, resource => R, jid => JID}, + State1 = change_shaper(State), + Conn = get_conn_type(State1), + State2 = State1#{conn => Conn, resource => R, jid => JID}, Prio = case maps:get(pres_last, State, undefined) of undefined -> undefined; Pres -> get_priority_from_presence(Pres) end, Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthModule}], ejabberd_sm:open_session(SID, U, S, R, Prio, Info), - xmpp_stream_in:establish(State1). + xmpp_stream_in:establish(State2). %%%=================================================================== %%% Hooks @@ -264,12 +264,12 @@ reject_unauthenticated_packet(State, _Pkt) -> process_closed(State, Reason) -> stop(State#{stop_reason => Reason}). -process_terminated(#{sid := SID, sockmod := SockMod, socket := Socket, +process_terminated(#{sid := SID, socket := Socket, jid := JID, user := U, server := S, resource := R} = State, Reason) -> Status = format_reason(State, Reason), ?INFO_MSG("(~s) Closing c2s session for ~s: ~s", - [SockMod:pp(Socket), jid:encode(JID), Status]), + [xmpp_socket:pp(Socket), jid:encode(JID), Status]), State1 = case maps:is_key(pres_last, State) of true -> Pres = #presence{type = unavailable, @@ -285,10 +285,10 @@ process_terminated(#{sid := SID, sockmod := SockMod, socket := Socket, end, bounce_message_queue(), State1; -process_terminated(#{sockmod := SockMod, socket := Socket, +process_terminated(#{socket := Socket, stop_reason := {tls, _}} = State, Reason) -> ?WARNING_MSG("(~s) Failed to secure c2s connection: ~s", - [SockMod:pp(Socket), format_reason(State, Reason)]), + [xmpp_socket:pp(Socket), format_reason(State, Reason)]), State; process_terminated(State, _Reason) -> State. @@ -385,7 +385,7 @@ check_password_digest_fun(#{lserver := LServer}) -> bind(<<"">>, State) -> bind(new_uniq_id(), State); bind(R, #{user := U, server := S, access := Access, lang := Lang, - lserver := LServer, sockmod := SockMod, socket := Socket, + lserver := LServer, socket := Socket, ip := IP} = State) -> case resource_conflict_action(U, S, R) of closenew -> @@ -401,12 +401,12 @@ bind(R, #{user := U, server := S, access := Access, lang := Lang, State2 = ejabberd_hooks:run_fold( c2s_session_opened, LServer, State1, []), ?INFO_MSG("(~s) Opened c2s session for ~s", - [SockMod:pp(Socket), jid:encode(JID)]), + [xmpp_socket:pp(Socket), jid:encode(JID)]), {ok, State2}; deny -> ejabberd_hooks:run(forbidden_session_hook, LServer, [JID]), ?INFO_MSG("(~s) Forbidden c2s session for ~s", - [SockMod:pp(Socket), jid:encode(JID)]), + [xmpp_socket:pp(Socket), jid:encode(JID)]), Txt = <<"Access denied by service policy">>, {error, xmpp:err_not_allowed(Txt, Lang), State} end @@ -417,9 +417,9 @@ handle_stream_start(StreamStart, #{lserver := LServer} = State) -> false -> send(State#{lserver => ?MYNAME}, xmpp:serr_host_unknown()); true -> - change_shaper(State), + State1 = change_shaper(State), ejabberd_hooks:run_fold( - c2s_stream_started, LServer, State, [StreamStart]) + c2s_stream_started, LServer, State1, [StreamStart]) end. handle_stream_end(Reason, #{lserver := LServer} = State) -> @@ -427,20 +427,20 @@ handle_stream_end(Reason, #{lserver := LServer} = State) -> ejabberd_hooks:run_fold(c2s_closed, LServer, State1, [Reason]). handle_auth_success(User, Mech, AuthModule, - #{socket := Socket, sockmod := SockMod, + #{socket := Socket, ip := IP, lserver := LServer} = State) -> ?INFO_MSG("(~s) Accepted c2s ~s authentication for ~s@~s by ~s backend from ~s", - [SockMod:pp(Socket), Mech, User, LServer, + [xmpp_socket:pp(Socket), Mech, User, LServer, ejabberd_auth:backend_type(AuthModule), ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), State1 = State#{auth_module => AuthModule}, ejabberd_hooks:run_fold(c2s_auth_result, LServer, State1, [true, User]). handle_auth_failure(User, Mech, Reason, - #{socket := Socket, sockmod := SockMod, + #{socket := Socket, ip := IP, lserver := LServer} = State) -> ?INFO_MSG("(~s) Failed c2s ~s authentication ~sfrom ~s: ~s", - [SockMod:pp(Socket), Mech, + [xmpp_socket:pp(Socket), Mech, if User /= <<"">> -> ["for ", User, "@", LServer, " "]; true -> "" end, @@ -912,7 +912,7 @@ fix_from_to(Pkt, #{jid := JID}) when ?is_stanza(Pkt) -> fix_from_to(Pkt, _State) -> Pkt. --spec change_shaper(state()) -> ok. +-spec change_shaper(state()) -> state(). change_shaper(#{shaper := ShaperName, ip := IP, lserver := LServer, user := U, server := S, resource := R} = State) -> JID = jid:make(U, S, R), diff --git a/src/ejabberd_http_ws.erl b/src/ejabberd_http_ws.erl index f9f7b07e9..4a5d0b1b6 100644 --- a/src/ejabberd_http_ws.erl +++ b/src/ejabberd_http_ws.erl @@ -34,7 +34,8 @@ handle_sync_event/4, code_change/4, handle_info/3, terminate/3, send_xml/2, setopts/2, sockname/1, peername/1, controlling_process/2, become_controller/2, - close/1, socket_handoff/6, opt_type/1]). + monitor/1, reset_stream/1, close/1, change_shaper/2, + socket_handoff/6, opt_type/1]). -include("ejabberd.hrl"). -include("logger.hrl"). @@ -105,12 +106,21 @@ peername({http_ws, _FsmRef, IP}) -> {ok, IP}. controlling_process(_Socket, _Pid) -> ok. become_controller(FsmRef, C2SPid) -> - p1_fsm:send_all_state_event(FsmRef, - {become_controller, C2SPid}). + p1_fsm:send_all_state_event(FsmRef, {activate, C2SPid}). close({http_ws, FsmRef, _IP}) -> catch p1_fsm:sync_send_all_state_event(FsmRef, close). +monitor({http_ws, FsmRef, _IP}) -> + erlang:monitor(process, FsmRef). + +reset_stream({http_ws, _FsmRef, _IP} = Socket) -> + Socket. + +change_shaper({http_ws, _FsmRef, _IP}, _Shaper) -> + %% TODO??? + ok. + socket_handoff(LocalPath, Request, Socket, SockMod, Buf, Opts) -> ejabberd_websocket:socket_handoff(LocalPath, Request, Socket, SockMod, Buf, Opts, ?MODULE, fun get_human_html_xmlel/0). @@ -136,8 +146,8 @@ init([{#ws{ip = IP, http_opts = HOpts}, _} = WS]) -> Socket = {http_ws, self(), IP}, ?DEBUG("Client connected through websocket ~p", [Socket]), - ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket, - Opts), + xmpp_socket:start(ejabberd_c2s, ?MODULE, Socket, + [{receiver, self()}|Opts]), Timer = erlang:start_timer(WSTimeout, self(), []), {ok, loop, #state{socket = Socket, timeout = WSTimeout, diff --git a/src/ejabberd_listener.erl b/src/ejabberd_listener.erl index 248e3faf0..dd4842f58 100644 --- a/src/ejabberd_listener.erl +++ b/src/ejabberd_listener.erl @@ -294,7 +294,7 @@ accept(ListenSocket, Module, Opts, Interval) -> {ok, Socket} -> case {inet:sockname(Socket), inet:peername(Socket)} of {{ok, {Addr, Port}}, {ok, {PAddr, PPort}}} -> - Receiver = case ejabberd_socket:start(Module, + Receiver = case xmpp_socket:start(Module, gen_tcp, Socket, Opts) of {ok, RecvPid} -> RecvPid; _ -> none diff --git a/src/ejabberd_receiver.erl b/src/ejabberd_receiver.erl deleted file mode 100644 index 52077ac3c..000000000 --- a/src/ejabberd_receiver.erl +++ /dev/null @@ -1,357 +0,0 @@ -%%%---------------------------------------------------------------------- -%%% File : ejabberd_receiver.erl -%%% Author : Alexey Shchepin -%%% Purpose : Socket receiver for C2S and S2S connections -%%% Created : 10 Nov 2003 by Alexey Shchepin -%%% -%%% -%%% ejabberd, Copyright (C) 2002-2017 ProcessOne -%%% -%%% This program is free software; you can redistribute it and/or -%%% modify it under the terms of the GNU General Public License as -%%% published by the Free Software Foundation; either version 2 of the -%%% License, or (at your option) any later version. -%%% -%%% This program is distributed in the hope that it will be useful, -%%% but WITHOUT ANY WARRANTY; without even the implied warranty of -%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -%%% General Public License for more details. -%%% -%%% You should have received a copy of the GNU General Public License along -%%% with this program; if not, write to the Free Software Foundation, Inc., -%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -%%% -%%%---------------------------------------------------------------------- - --module(ejabberd_receiver). - --author('alexey@process-one.net'). - --ifndef(GEN_SERVER). --define(GEN_SERVER, gen_server). --endif. --behaviour(?GEN_SERVER). --behaviour(ejabberd_config). - -%% API --export([start_link/4, - start/3, - start/4, - change_shaper/2, - reset_stream/1, - starttls/2, - compress/2, - become_controller/2, - close/1, - opt_type/1]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, - handle_info/2, terminate/2, code_change/3]). - --include("ejabberd.hrl"). --include("logger.hrl"). - --record(state, - {socket :: inet:socket() | fast_tls:tls_socket() | ezlib:zlib_socket(), - sock_mod = gen_tcp :: gen_tcp | fast_tls | ezlib, - shaper_state = none :: shaper:shaper(), - c2s_pid :: pid() | undefined, - max_stanza_size = infinity :: non_neg_integer() | infinity, - xml_stream_state :: fxml_stream:xml_stream_state() | undefined, - timeout = infinity:: timeout()}). - --spec start_link(inet:socket(), atom(), shaper:shaper(), - non_neg_integer() | infinity) -> ignore | - {error, any()} | - {ok, pid()}. - -start_link(Socket, SockMod, Shaper, MaxStanzaSize) -> - ?GEN_SERVER:start_link(?MODULE, - [Socket, SockMod, Shaper, MaxStanzaSize], []). - --spec start(inet:socket(), atom(), shaper:shaper()) -> undefined | pid(). - -start(Socket, SockMod, Shaper) -> - start(Socket, SockMod, Shaper, infinity). - --spec start(inet:socket(), atom(), shaper:shaper(), - non_neg_integer() | infinity) -> undefined | pid(). - -start(Socket, SockMod, Shaper, MaxStanzaSize) -> - {ok, Pid} = ?GEN_SERVER:start(ejabberd_receiver, - [Socket, SockMod, Shaper, MaxStanzaSize], []), - Pid. - --spec change_shaper(pid(), shaper:shaper()) -> ok. - -change_shaper(Pid, Shaper) -> - ?GEN_SERVER:cast(Pid, {change_shaper, Shaper}). - --spec reset_stream(pid()) -> ok | {error, any()}. - -reset_stream(Pid) -> do_call(Pid, reset_stream). - --spec starttls(pid(), fast_tls:tls_socket()) -> ok | {error, any()}. - -starttls(Pid, TLSSocket) -> - do_call(Pid, {starttls, TLSSocket}). - --spec compress(pid(), iodata() | undefined) -> {error, any()} | - {ok, ezlib:zlib_socket()}. - -compress(Pid, Data) -> - do_call(Pid, {compress, Data}). - --spec become_controller(pid(), pid()) -> ok | {error, any()}. - -become_controller(Pid, C2SPid) -> - do_call(Pid, {become_controller, C2SPid}). - --spec close(pid()) -> ok. - -close(Pid) -> - ?GEN_SERVER:cast(Pid, close). - - -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -init([Socket, SockMod, Shaper, MaxStanzaSize]) -> - ShaperState = shaper:new(Shaper), - Timeout = case SockMod of - ssl -> 20; - _ -> infinity - end, - {ok, - #state{socket = Socket, sock_mod = SockMod, - shaper_state = ShaperState, - max_stanza_size = MaxStanzaSize, timeout = Timeout}}. - -handle_call({starttls, TLSSocket}, _From, State) -> - State1 = reset_parser(State), - NewState = State1#state{socket = TLSSocket, - sock_mod = fast_tls}, - case fast_tls:recv_data(TLSSocket, <<"">>) of - {ok, TLSData} -> - {reply, ok, - process_data(TLSData, NewState), hibernate_timeout()}; - {error, _} = Err -> - {stop, normal, Err, NewState} - end; -handle_call({compress, Data}, _From, - #state{socket = Socket, sock_mod = SockMod} = - State) -> - ejabberd:start_app(ezlib), - {ok, ZlibSocket} = ezlib:enable_zlib(SockMod, - Socket), - if Data /= undefined -> do_send(State, Data); - true -> ok - end, - State1 = reset_parser(State), - NewState = State1#state{socket = ZlibSocket, - sock_mod = ezlib}, - case ezlib:recv_data(ZlibSocket, <<"">>) of - {ok, ZlibData} -> - {reply, {ok, ZlibSocket}, - process_data(ZlibData, NewState), hibernate_timeout()}; - {error, _} = Err -> - {stop, normal, Err, NewState} - end; -handle_call(reset_stream, _From, State) -> - NewState = reset_parser(State), - Reply = ok, - {reply, Reply, NewState, hibernate_timeout()}; -handle_call({become_controller, C2SPid}, _From, State) -> - XMLStreamState = fxml_stream:new(C2SPid, State#state.max_stanza_size), - NewState = State#state{c2s_pid = C2SPid, - xml_stream_state = XMLStreamState}, - activate_socket(NewState), - Reply = ok, - {reply, Reply, NewState, hibernate_timeout()}; -handle_call(_Request, _From, State) -> - Reply = ok, {reply, Reply, State, hibernate_timeout()}. - -handle_cast({change_shaper, Shaper}, State) -> - NewShaperState = shaper:new(Shaper), - {noreply, State#state{shaper_state = NewShaperState}, - hibernate_timeout()}; -handle_cast(close, State) -> {stop, normal, State}; -handle_cast(_Msg, State) -> - {noreply, State, hibernate_timeout()}. - -handle_info({Tag, _TCPSocket, Data}, - #state{socket = Socket, sock_mod = SockMod} = State) - when (Tag == tcp) or (Tag == ssl) or - (Tag == ejabberd_xml) -> - case SockMod of - fast_tls -> - case fast_tls:recv_data(Socket, Data) of - {ok, TLSData} -> - {noreply, process_data(TLSData, State), - hibernate_timeout()}; - {error, Reason} -> - if is_binary(Reason) -> - ?DEBUG("TLS error = ~s", [Reason]); - true -> - ok - end, - {stop, normal, State} - end; - ezlib -> - case ezlib:recv_data(Socket, Data) of - {ok, ZlibData} -> - {noreply, process_data(ZlibData, State), - hibernate_timeout()}; - {error, _Reason} -> {stop, normal, State} - end; - _ -> - {noreply, process_data(Data, State), hibernate_timeout()} - end; -handle_info({Tag, _TCPSocket}, State) - when (Tag == tcp_closed) or (Tag == ssl_closed) -> - {stop, normal, State}; -handle_info({Tag, _TCPSocket, Reason}, State) - when (Tag == tcp_error) or (Tag == ssl_error) -> - case Reason of - timeout -> {noreply, State, hibernate_timeout()}; - _ -> {stop, normal, State} - end; -handle_info({timeout, _Ref, activate}, State) -> - activate_socket(State), - {noreply, State, hibernate_timeout()}; -handle_info(timeout, State) -> - proc_lib:hibernate(?GEN_SERVER, enter_loop, - [?MODULE, [], State]), - {noreply, State, hibernate_timeout()}; -handle_info(_Info, State) -> - {noreply, State, hibernate_timeout()}. - -terminate(_Reason, - #state{xml_stream_state = XMLStreamState, - c2s_pid = C2SPid} = - State) -> - close_stream(XMLStreamState), - if C2SPid /= undefined -> - p1_fsm:send_event(C2SPid, closed); - true -> ok - end, - catch (State#state.sock_mod):close(State#state.socket), - ok. - -code_change(_OldVsn, State, _Extra) -> {ok, State}. - -%%-------------------------------------------------------------------- -%%% Internal functions -%%-------------------------------------------------------------------- - -activate_socket(#state{socket = Socket, - sock_mod = SockMod}) -> - Res = case SockMod of - gen_tcp -> - inet:setopts(Socket, [{active, once}]); - _ -> - SockMod:setopts(Socket, [{active, once}]) - end, - case Res of - {error, _Reason} -> self() ! {tcp_closed, Socket}; - ok -> ok - end. - -%% Data processing for connectors directly generating xmlelement in -%% Erlang data structure. -%% WARNING: Shaper does not work with Erlang data structure. -process_data([], State) -> - activate_socket(State), State; -process_data([Element | Els], - #state{c2s_pid = C2SPid} = State) - when element(1, Element) == xmlel; - element(1, Element) == xmlstreamstart; - element(1, Element) == xmlstreamelement; - element(1, Element) == xmlstreamend -> - if C2SPid == undefined -> State; - true -> - catch p1_fsm:send_event(C2SPid, - element_wrapper(Element)), - process_data(Els, State) - end; -%% Data processing for connectors receivind data as string. -process_data(Data, - #state{xml_stream_state = XMLStreamState, - shaper_state = ShaperState, c2s_pid = C2SPid} = - State) -> - ?DEBUG("Received XML on stream = ~p", [(Data)]), - XMLStreamState1 = case XMLStreamState of - undefined -> - XMLStreamState; - _ -> - fxml_stream:parse(XMLStreamState, Data) - end, - {NewShaperState, Pause} = shaper:update(ShaperState, byte_size(Data)), - if - C2SPid == undefined -> - ok; - Pause > 0 -> - erlang:start_timer(Pause, self(), activate); - true -> - activate_socket(State) - end, - State#state{xml_stream_state = XMLStreamState1, - shaper_state = NewShaperState}. - -%% Element coming from XML parser are wrapped inside xmlstreamelement -%% When we receive directly xmlelement tuple (from a socket module -%% speaking directly Erlang XML), we wrap it inside the same -%% xmlstreamelement coming from the XML parser. -element_wrapper(XMLElement) - when element(1, XMLElement) == xmlel -> - {xmlstreamelement, XMLElement}; -element_wrapper(Element) -> Element. - -close_stream(undefined) -> ok; -close_stream(XMLStreamState) -> - fxml_stream:close(XMLStreamState). - -reset_parser(#state{xml_stream_state = undefined} = State) -> - State; -reset_parser(#state{c2s_pid = C2SPid, - max_stanza_size = MaxStanzaSize, - xml_stream_state = XMLStreamState} - = State) -> - NewStreamState = try fxml_stream:reset(XMLStreamState) - catch error:_ -> - close_stream(XMLStreamState), - case C2SPid of - undefined -> - undefined; - _ -> - fxml_stream:new(C2SPid, MaxStanzaSize) - end - end, - State#state{xml_stream_state = NewStreamState}. - -do_send(State, Data) -> - (State#state.sock_mod):send(State#state.socket, Data). - -do_call(Pid, Msg) -> - try ?GEN_SERVER:call(Pid, Msg) of - Res -> Res - catch _:{timeout, _} -> - {error, timeout}; - _:_ -> - {error, einval} - end. - -hibernate_timeout() -> - ejabberd_config:get_option(receiver_hibernate, timer:seconds(90)). - --spec opt_type(receiver_hibernate) -> fun((pos_integer() | hibernate) -> - pos_integer() | hibernate); - (atom()) -> [atom()]. -opt_type(receiver_hibernate) -> - fun(I) when is_integer(I), I>0 -> I; - (hibernate) -> hibernate - end; -opt_type(_) -> - [receiver_hibernate]. diff --git a/src/ejabberd_s2s_in.erl b/src/ejabberd_s2s_in.erl index ae81f739e..ecd632cb7 100644 --- a/src/ejabberd_s2s_in.erl +++ b/src/ejabberd_s2s_in.erl @@ -21,9 +21,9 @@ %%%------------------------------------------------------------------- -module(ejabberd_s2s_in). -behaviour(xmpp_stream_in). --behaviour(ejabberd_socket). +-behaviour(xmpp_socket). -%% ejabberd_socket callbacks +%% xmpp_socket callbacks -export([start/2, start_link/2, socket_type/0]). %% ejabberd_listener callbacks -export([listen_opt_type/1]). @@ -180,31 +180,29 @@ handle_stream_established(State) -> set_idle_timeout(State#{established => true}). handle_auth_success(RServer, Mech, _AuthModule, - #{sockmod := SockMod, - socket := Socket, ip := IP, + #{socket := Socket, ip := IP, auth_domains := AuthDomains, server_host := ServerHost, lserver := LServer} = State) -> ?INFO_MSG("(~s) Accepted inbound s2s ~s authentication ~s -> ~s (~s)", - [SockMod:pp(Socket), Mech, RServer, LServer, + [xmpp_socket:pp(Socket), Mech, RServer, LServer, ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), State1 = case ejabberd_s2s:allow_host(ServerHost, RServer) of true -> AuthDomains1 = sets:add_element(RServer, AuthDomains), - change_shaper(State, RServer), - State#{auth_domains => AuthDomains1}; + State0 = change_shaper(State, RServer), + State0#{auth_domains => AuthDomains1}; false -> State end, ejabberd_hooks:run_fold(s2s_in_auth_result, ServerHost, State1, [true, RServer]). handle_auth_failure(RServer, Mech, Reason, - #{sockmod := SockMod, - socket := Socket, ip := IP, + #{socket := Socket, ip := IP, server_host := ServerHost, lserver := LServer} = State) -> ?INFO_MSG("(~s) Failed inbound s2s ~s authentication ~s -> ~s (~s): ~s", - [SockMod:pp(Socket), Mech, RServer, LServer, + [xmpp_socket:pp(Socket), Mech, RServer, LServer, ejabberd_config:may_hide_data(misc:ip_to_list(IP)), Reason]), ejabberd_hooks:run_fold(s2s_in_auth_result, ServerHost, State, [false, RServer]). @@ -286,11 +284,11 @@ handle_info(Info, #{server_host := LServer} = State) -> ejabberd_hooks:run_fold(s2s_in_handle_info, LServer, State, [Info]). terminate(Reason, #{auth_domains := AuthDomains, - sockmod := SockMod, socket := Socket} = State) -> + socket := Socket} = State) -> case maps:get(stop_reason, State, undefined) of {tls, _} = Err -> ?WARNING_MSG("(~s) Failed to secure inbound s2s connection: ~s", - [SockMod:pp(Socket), xmpp_stream_in:format_error(Err)]); + [xmpp_socket:pp(Socket), xmpp_stream_in:format_error(Err)]); _ -> ok end, @@ -340,7 +338,7 @@ set_idle_timeout(#{server_host := LServer, set_idle_timeout(State) -> State. --spec change_shaper(state(), binary()) -> ok. +-spec change_shaper(state(), binary()) -> state(). change_shaper(#{shaper := ShaperName, server_host := ServerHost} = State, RServer) -> Shaper = acl:match_rule(ServerHost, ShaperName, jid:make(RServer)), diff --git a/src/ejabberd_s2s_out.erl b/src/ejabberd_s2s_out.erl index fea5d8162..0ece804a1 100644 --- a/src/ejabberd_s2s_out.erl +++ b/src/ejabberd_s2s_out.erl @@ -61,12 +61,12 @@ start(From, To, Opts) -> Res -> Res end; _ -> - xmpp_stream_out:start(?MODULE, [ejabberd_socket, From, To, Opts], + xmpp_stream_out:start(?MODULE, [xmpp_socket, From, To, Opts], ejabberd_config:fsm_limit_opts([])) end. start_link(From, To, Opts) -> - xmpp_stream_out:start_link(?MODULE, [ejabberd_socket, From, To, Opts], + xmpp_stream_out:start_link(?MODULE, [xmpp_socket, From, To, Opts], ejabberd_config:fsm_limit_opts([])). -spec connect(pid()) -> ok. @@ -210,24 +210,22 @@ dns_retries(#{server := LServer}) -> dns_timeout(#{server := LServer}) -> ejabberd_config:get_option({s2s_dns_timeout, LServer}, timer:seconds(10)). -handle_auth_success(Mech, #{sockmod := SockMod, - socket := Socket, ip := IP, +handle_auth_success(Mech, #{socket := Socket, ip := IP, remote_server := RServer, server_host := ServerHost, server := LServer} = State) -> ?INFO_MSG("(~s) Accepted outbound s2s ~s authentication ~s -> ~s (~s)", - [SockMod:pp(Socket), Mech, LServer, RServer, + [xmpp_socket:pp(Socket), Mech, LServer, RServer, ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), ejabberd_hooks:run_fold(s2s_out_auth_result, ServerHost, State, [true]). handle_auth_failure(Mech, Reason, - #{sockmod := SockMod, - socket := Socket, ip := IP, + #{socket := Socket, ip := IP, remote_server := RServer, server_host := ServerHost, server := LServer} = State) -> ?INFO_MSG("(~s) Failed outbound s2s ~s authentication ~s -> ~s (~s): ~s", - [SockMod:pp(Socket), Mech, LServer, RServer, + [xmpp_socket:pp(Socket), Mech, LServer, RServer, ejabberd_config:may_hide_data(misc:ip_to_list(IP)), xmpp_stream_out:format_error(Reason)]), ejabberd_hooks:run_fold(s2s_out_auth_result, ServerHost, State, [{false, Reason}]). diff --git a/src/ejabberd_service.erl b/src/ejabberd_service.erl index 7b5f945d0..3fe176a3b 100644 --- a/src/ejabberd_service.erl +++ b/src/ejabberd_service.erl @@ -21,11 +21,11 @@ %%%------------------------------------------------------------------- -module(ejabberd_service). -behaviour(xmpp_stream_in). --behaviour(ejabberd_socket). +-behaviour(xmpp_socket). -protocol({xep, 114, '1.6'}). -%% ejabberd_socket callbacks +%% xmpp_socket callbacks -export([start/2, start_link/2, socket_type/0, close/1, close/2]). %% ejabberd_listener callbacks -export([listen_opt_type/1, transform_listen_option/2]). @@ -100,8 +100,8 @@ init([State, Opts]) -> false -> [compression_none | TLSOpts1]; true -> TLSOpts1 end, - xmpp_stream_in:change_shaper(State, Shaper), - State1 = State#{access => Access, + State1 = xmpp_stream_in:change_shaper(State, Shaper), + State2 = State1#{access => Access, xmlns => ?NS_COMPONENT, lang => ?MYLANG, server => ?MYNAME, @@ -109,7 +109,7 @@ init([State, Opts]) -> stream_version => undefined, tls_options => TLSOpts, check_from => CheckFrom}, - ejabberd_hooks:run_fold(component_init, {ok, State1}, [Opts]). + ejabberd_hooks:run_fold(component_init, {ok, State2}, [Opts]). handle_stream_start(_StreamStart, #{remote_server := RemoteServer, @@ -135,8 +135,7 @@ handle_stream_start(_StreamStart, end. get_password_fun(#{remote_server := RemoteServer, - socket := Socket, sockmod := SockMod, - ip := IP, + socket := Socket, ip := IP, host_opts := HostOpts}) -> fun(_) -> case dict:find(RemoteServer, HostOpts) of @@ -145,7 +144,7 @@ get_password_fun(#{remote_server := RemoteServer, error -> ?INFO_MSG("(~s) Domain ~s is unconfigured for " "external component from ~s", - [SockMod:pp(Socket), RemoteServer, + [xmpp_socket:pp(Socket), RemoteServer, ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), {false, undefined} end @@ -153,11 +152,10 @@ get_password_fun(#{remote_server := RemoteServer, handle_auth_success(_, Mech, _, #{remote_server := RemoteServer, host_opts := HostOpts, - socket := Socket, sockmod := SockMod, - ip := IP} = State) -> + socket := Socket, ip := IP} = State) -> ?INFO_MSG("(~s) Accepted external component ~s authentication " "for ~s from ~s", - [SockMod:pp(Socket), Mech, RemoteServer, + [xmpp_socket:pp(Socket), Mech, RemoteServer, ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), lists:foreach( fun (H) -> @@ -168,11 +166,10 @@ handle_auth_success(_, Mech, _, handle_auth_failure(_, Mech, Reason, #{remote_server := RemoteServer, - sockmod := SockMod, socket := Socket, ip := IP} = State) -> ?INFO_MSG("(~s) Failed external component ~s authentication " "for ~s from ~s: ~s", - [SockMod:pp(Socket), Mech, RemoteServer, + [xmpp_socket:pp(Socket), Mech, RemoteServer, ejabberd_config:may_hide_data(misc:ip_to_list(IP)), Reason]), State. diff --git a/src/ejabberd_socket.erl b/src/ejabberd_socket.erl deleted file mode 100644 index 9953a76ae..000000000 --- a/src/ejabberd_socket.erl +++ /dev/null @@ -1,293 +0,0 @@ -%%%---------------------------------------------------------------------- -%%% File : ejabberd_socket.erl -%%% Author : Alexey Shchepin -%%% Purpose : Socket with zlib and TLS support library -%%% Created : 23 Aug 2006 by Alexey Shchepin -%%% -%%% -%%% ejabberd, Copyright (C) 2002-2017 ProcessOne -%%% -%%% This program is free software; you can redistribute it and/or -%%% modify it under the terms of the GNU General Public License as -%%% published by the Free Software Foundation; either version 2 of the -%%% License, or (at your option) any later version. -%%% -%%% This program is distributed in the hope that it will be useful, -%%% but WITHOUT ANY WARRANTY; without even the implied warranty of -%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -%%% General Public License for more details. -%%% -%%% You should have received a copy of the GNU General Public License along -%%% with this program; if not, write to the Free Software Foundation, Inc., -%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -%%% -%%%---------------------------------------------------------------------- - --module(ejabberd_socket). - --author('alexey@process-one.net'). - -%% API --export([start/4, - connect/3, - connect/4, - connect/5, - starttls/2, - compress/1, - compress/2, - reset_stream/1, - send_element/2, - send_header/2, - send_trailer/1, - send/2, - send_xml/2, - change_shaper/2, - monitor/1, - get_sockmod/1, - get_transport/1, - get_peer_certificate/2, - get_verify_result/1, - close/1, - pp/1, - sockname/1, peername/1]). - --include("ejabberd.hrl"). --include("xmpp.hrl"). --include("logger.hrl"). - --type sockmod() :: ejabberd_bosh | - ejabberd_http_ws | - gen_tcp | fast_tls | ezlib. --type receiver() :: pid () | atom(). --type socket() :: pid() | inet:socket() | - fast_tls:tls_socket() | - ezlib:zlib_socket() | - ejabberd_bosh:bosh_socket() | - ejabberd_http_ws:ws_socket(). - --record(socket_state, {sockmod = gen_tcp :: sockmod(), - socket = self() :: socket(), - receiver = self() :: receiver()}). - --type socket_state() :: #socket_state{}. - --export_type([socket/0, socket_state/0, sockmod/0]). - --callback start({module(), socket_state()}, - [proplists:property()]) -> {ok, pid()} | {error, term()} | ignore. --callback start_link({module(), socket_state()}, - [proplists:property()]) -> {ok, pid()} | {error, term()} | ignore. --callback socket_type() -> xml_stream | independent | raw. - --define(is_http_socket(S), - (S#socket_state.sockmod == ejabberd_bosh orelse - S#socket_state.sockmod == ejabberd_http_ws)). - -%%==================================================================== -%% API -%%==================================================================== --spec start(atom(), sockmod(), socket(), [proplists:property()]) - -> {ok, pid() | independent} | {error, inet:posix() | any()} | ignore. -start(Module, SockMod, Socket, Opts) -> - case Module:socket_type() of - independent -> {ok, independent}; - xml_stream -> - MaxStanzaSize = proplists:get_value(max_stanza_size, Opts, infinity), - {ReceiverMod, Receiver, RecRef} = - try SockMod:custom_receiver(Socket) of - {receiver, RecMod, RecPid} -> - {RecMod, RecPid, RecMod} - catch _:_ -> - RecPid = ejabberd_receiver:start( - Socket, SockMod, none, MaxStanzaSize), - {ejabberd_receiver, RecPid, RecPid} - end, - SocketData = #socket_state{sockmod = SockMod, - socket = Socket, receiver = RecRef}, - case Module:start({?MODULE, SocketData}, Opts) of - {ok, Pid} -> - case SockMod:controlling_process(Socket, Receiver) of - ok -> - ReceiverMod:become_controller(Receiver, Pid), - {ok, Receiver}; - Err -> - SockMod:close(Socket), - Err - end; - Err -> - SockMod:close(Socket), - case ReceiverMod of - ejabberd_receiver -> ReceiverMod:close(Receiver); - _ -> ok - end, - Err - end; - raw -> - case Module:start({SockMod, Socket}, Opts) of - {ok, Pid} -> - case SockMod:controlling_process(Socket, Pid) of - ok -> - {ok, Pid}; - {error, _} = Err -> - SockMod:close(Socket), - Err - end; - Err -> - SockMod:close(Socket), - Err - end - end. - -connect(Addr, Port, Opts) -> - connect(Addr, Port, Opts, infinity, self()). - -connect(Addr, Port, Opts, Timeout) -> - connect(Addr, Port, Opts, Timeout, self()). - -connect(Addr, Port, Opts, Timeout, Owner) -> - case gen_tcp:connect(Addr, Port, Opts, Timeout) of - {ok, Socket} -> - Receiver = ejabberd_receiver:start(Socket, gen_tcp, - none), - SocketData = #socket_state{sockmod = gen_tcp, - socket = Socket, receiver = Receiver}, - case gen_tcp:controlling_process(Socket, Receiver) of - ok -> - ejabberd_receiver:become_controller(Receiver, Owner), - {ok, SocketData}; - {error, _Reason} = Error -> gen_tcp:close(Socket), Error - end; - {error, _Reason} = Error -> Error - end. - -starttls(#socket_state{socket = Socket, - receiver = Receiver} = SocketData, TLSOpts) -> - case fast_tls:tcp_to_tls(Socket, TLSOpts) of - {ok, TLSSocket} -> - case ejabberd_receiver:starttls(Receiver, TLSSocket) of - ok -> - {ok, SocketData#socket_state{socket = TLSSocket, - sockmod = fast_tls}}; - {error, _} = Err -> - Err - end; - {error, _} = Err -> - Err - end. - -compress(SocketData) -> compress(SocketData, undefined). - -compress(SocketData, Data) -> - case ejabberd_receiver:compress(SocketData#socket_state.receiver, Data) of - {ok, ZlibSocket} -> - {ok, SocketData#socket_state{socket = ZlibSocket, sockmod = ezlib}}; - Err -> - ?ERROR_MSG("compress failed: ~p", [Err]), - Err - end. - -reset_stream(SocketData) - when is_pid(SocketData#socket_state.receiver) -> - ejabberd_receiver:reset_stream(SocketData#socket_state.receiver); -reset_stream(SocketData) - when is_atom(SocketData#socket_state.receiver) -> - (SocketData#socket_state.receiver):reset_stream(SocketData#socket_state.socket). - --spec send_element(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}. -send_element(SocketData, El) when ?is_http_socket(SocketData) -> - send_xml(SocketData, {xmlstreamelement, El}); -send_element(SocketData, El) -> - send(SocketData, fxml:element_to_binary(El)). - --spec send_header(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}. -send_header(SocketData, El) when ?is_http_socket(SocketData) -> - send_xml(SocketData, {xmlstreamstart, El#xmlel.name, El#xmlel.attrs}); -send_header(SocketData, El) -> - send(SocketData, fxml:element_to_header(El)). - --spec send_trailer(socket_state()) -> ok | {error, inet:posix()}. -send_trailer(SocketData) when ?is_http_socket(SocketData) -> - send_xml(SocketData, {xmlstreamend, <<"stream:stream">>}); -send_trailer(SocketData) -> - send(SocketData, <<"">>). - --spec send(socket_state(), iodata()) -> ok | {error, closed | inet:posix()}. -send(#socket_state{sockmod = SockMod, socket = Socket} = SocketData, Data) -> - ?DEBUG("(~s) Send XML on stream = ~p", [pp(SocketData), Data]), - try SockMod:send(Socket, Data) of - {error, einval} -> {error, closed}; - Result -> Result - catch _:badarg -> - %% Some modules throw badarg exceptions on closed sockets - %% TODO: their code should be improved - {error, closed} - end. - --spec send_xml(socket_state(), - {xmlstreamelement, fxml:xmlel()} | - {xmlstreamstart, binary(), [{binary(), binary()}]} | - {xmlstreamend, binary()} | - {xmlstreamraw, iodata()}) -> term(). -send_xml(SocketData, El) -> - (SocketData#socket_state.sockmod):send_xml(SocketData#socket_state.socket, El). - -change_shaper(SocketData, Shaper) - when is_pid(SocketData#socket_state.receiver) -> - ejabberd_receiver:change_shaper(SocketData#socket_state.receiver, - Shaper); -change_shaper(SocketData, Shaper) - when is_atom(SocketData#socket_state.receiver) -> - (SocketData#socket_state.receiver):change_shaper(SocketData#socket_state.socket, - Shaper). - -monitor(SocketData) - when is_pid(SocketData#socket_state.receiver) -> - erlang:monitor(process, - SocketData#socket_state.receiver); -monitor(SocketData) - when is_atom(SocketData#socket_state.receiver) -> - (SocketData#socket_state.receiver):monitor(SocketData#socket_state.socket). - -get_sockmod(SocketData) -> - SocketData#socket_state.sockmod. - -get_transport(#socket_state{sockmod = SockMod, - socket = Socket}) -> - case SockMod of - gen_tcp -> tcp; - fast_tls -> tls; - ezlib -> - case ezlib:get_sockmod(Socket) of - gen_tcp -> tcp_zlib; - fast_tls -> tls_zlib - end; - ejabberd_bosh -> http_bind; - ejabberd_http_ws -> websocket - end. - -get_peer_certificate(SocketData, Type) -> - fast_tls:get_peer_certificate(SocketData#socket_state.socket, Type). - -get_verify_result(SocketData) -> - fast_tls:get_verify_result(SocketData#socket_state.socket). - -close(SocketData) -> - ejabberd_receiver:close(SocketData#socket_state.receiver). - -sockname(#socket_state{sockmod = SockMod, - socket = Socket}) -> - case SockMod of - gen_tcp -> inet:sockname(Socket); - _ -> SockMod:sockname(Socket) - end. - -peername(#socket_state{sockmod = SockMod, - socket = Socket}) -> - case SockMod of - gen_tcp -> inet:peername(Socket); - _ -> SockMod:peername(Socket) - end. - -pp(#socket_state{receiver = Receiver} = State) -> - Transport = get_transport(State), - io_lib:format("~s|~w", [Transport, Receiver]). diff --git a/src/mod_s2s_dialback.erl b/src/mod_s2s_dialback.erl index 1bf04af35..917bb71e1 100644 --- a/src/mod_s2s_dialback.erl +++ b/src/mod_s2s_dialback.erl @@ -139,14 +139,13 @@ s2s_out_auth_result(#{db_verify := _} = State, _) -> %% in section 2.1.2, step 2 {stop, send_verify_request(State)}; s2s_out_auth_result(#{db_enabled := true, - sockmod := SockMod, socket := Socket, ip := IP, server := LServer, remote_server := RServer} = State, {false, _}) -> %% SASL authentication has failed, retrying with dialback %% Sending dialback request, section 2.1.1, step 1 ?INFO_MSG("(~s) Retrying with s2s dialback authentication: ~s -> ~s (~s)", - [SockMod:pp(Socket), LServer, RServer, + [xmpp_socket:pp(Socket), LServer, RServer, ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), State1 = maps:remove(stop_reason, State#{on_route => queue}), {stop, send_db_request(State1)}; @@ -159,7 +158,6 @@ s2s_out_downgraded(#{db_verify := _} = State, _) -> %% section 2.1.2, step 2 {stop, send_verify_request(State)}; s2s_out_downgraded(#{db_enabled := true, - sockmod := SockMod, socket := Socket, ip := IP, server := LServer, remote_server := RServer} = State, _) -> @@ -167,7 +165,7 @@ s2s_out_downgraded(#{db_enabled := true, %% section 2.1.1, step 1 ?INFO_MSG("(~s) Trying s2s dialback authentication with " "non-RFC compliant server: ~s -> ~s (~s)", - [SockMod:pp(Socket), LServer, RServer, + [xmpp_socket:pp(Socket), LServer, RServer, ejabberd_config:may_hide_data(misc:ip_to_list(IP))]), {stop, send_db_request(State)}; s2s_out_downgraded(State, _) -> diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl index 658bd504e..c533a2bab 100644 --- a/src/mod_stream_mgmt.erl +++ b/src/mod_stream_mgmt.erl @@ -389,7 +389,7 @@ handle_a(State, #sm_a{h = H}) -> resend_rack(State1). -spec handle_resume(state(), sm_resume()) -> {ok, state()} | {error, state()}. -handle_resume(#{user := User, lserver := LServer, sockmod := SockMod, +handle_resume(#{user := User, lserver := LServer, lang := Lang, socket := Socket} = State, #sm_resume{h = H, previd = PrevID, xmlns = Xmlns}) -> R = case inherit_session_state(State, PrevID) of @@ -416,7 +416,7 @@ handle_resume(#{user := User, lserver := LServer, sockmod := SockMod, State4 = send(State3, #sm_r{xmlns = AttrXmlns}), State5 = ejabberd_hooks:run_fold(c2s_session_resumed, LServer, State4, []), ?INFO_MSG("(~s) Resumed session for ~s", - [SockMod:pp(Socket), jid:encode(JID)]), + [xmpp_socket:pp(Socket), jid:encode(JID)]), {ok, State5}; {error, El, Msg} -> ?INFO_MSG("Cannot resume session for ~s@~s: ~s", diff --git a/src/xmpp_socket.erl b/src/xmpp_socket.erl new file mode 100644 index 000000000..1ee021318 --- /dev/null +++ b/src/xmpp_socket.erl @@ -0,0 +1,386 @@ +%%%---------------------------------------------------------------------- +%%% File : xmpp_socket.erl +%%% Author : Alexey Shchepin +%%% Purpose : Socket with zlib and TLS support library +%%% Created : 23 Aug 2006 by Alexey Shchepin +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2017 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- + +-module(xmpp_socket). + +-author('alexey@process-one.net'). + +%% API +-export([start/4, + connect/3, + connect/4, + connect/5, + starttls/2, + compress/1, + compress/2, + reset_stream/1, + send_element/2, + send_header/2, + send_trailer/1, + send/2, + send_xml/2, + recv/2, + activate/1, + change_shaper/2, + monitor/1, + get_sockmod/1, + get_transport/1, + get_peer_certificate/2, + get_verify_result/1, + close/1, + pp/1, + sockname/1, peername/1]). + +-include("ejabberd.hrl"). +-include("xmpp.hrl"). +-include("logger.hrl"). + +-type sockmod() :: ejabberd_bosh | + ejabberd_http_ws | + gen_tcp | fast_tls | ezlib. +-type receiver() :: atom(). +-type socket() :: pid() | inet:socket() | + fast_tls:tls_socket() | + ezlib:zlib_socket() | + ejabberd_bosh:bosh_socket() | + ejabberd_http_ws:ws_socket(). + +-record(socket_state, {sockmod = gen_tcp :: sockmod(), + socket :: socket(), + max_stanza_size = infinity :: timeout(), + xml_stream :: undefined | fxml_stream:xml_stream_state(), + shaper = none :: none | shaper:shaper(), + receiver :: receiver()}). + +-type socket_state() :: #socket_state{}. + +-export_type([socket/0, socket_state/0, sockmod/0]). + +-callback start({module(), socket_state()}, + [proplists:property()]) -> {ok, pid()} | {error, term()} | ignore. +-callback start_link({module(), socket_state()}, + [proplists:property()]) -> {ok, pid()} | {error, term()} | ignore. +-callback socket_type() -> xml_stream | independent | raw. + +-define(is_http_socket(S), + (S#socket_state.sockmod == ejabberd_bosh orelse + S#socket_state.sockmod == ejabberd_http_ws)). + +%%==================================================================== +%% API +%%==================================================================== +-spec start(atom(), sockmod(), socket(), [proplists:property()]) + -> {ok, pid() | independent} | {error, inet:posix() | any()} | ignore. +start(Module, SockMod, Socket, Opts) -> + try + case Module:socket_type() of + independent -> + {ok, independent}; + xml_stream -> + MaxStanzaSize = proplists:get_value(max_stanza_size, Opts, infinity), + Receiver = proplists:get_value(receiver, Opts), + SocketData = #socket_state{sockmod = SockMod, + socket = Socket, + receiver = Receiver, + max_stanza_size = MaxStanzaSize}, + {ok, Pid} = Module:start({?MODULE, SocketData}, Opts), + Receiver1 = if is_pid(Receiver) -> Receiver; + true -> Pid + end, + ok = controlling_process(SocketData, Receiver1), + ok = become_controller(SocketData, Pid), + {ok, Receiver1}; + raw -> + {ok, Pid} = Module:start({SockMod, Socket}, Opts), + ok = SockMod:controlling_process(Socket, Pid), + {ok, Pid} + end + catch _:{badmatch, {error, _} = Err} -> + SockMod:close(Socket), + Err + end. + +connect(Addr, Port, Opts) -> + connect(Addr, Port, Opts, infinity, self()). + +connect(Addr, Port, Opts, Timeout) -> + connect(Addr, Port, Opts, Timeout, self()). + +connect(Addr, Port, Opts, Timeout, Owner) -> + case gen_tcp:connect(Addr, Port, Opts, Timeout) of + {ok, Socket} -> + SocketData = #socket_state{sockmod = gen_tcp, socket = Socket}, + case controlling_process(SocketData, Owner) of + ok -> + activate_after(Socket, Owner, 0), + {ok, SocketData}; + {error, _Reason} = Error -> + gen_tcp:close(Socket), + Error + end; + {error, _Reason} = Error -> + Error + end. + +starttls(#socket_state{socket = Socket, + receiver = undefined} = SocketData, TLSOpts) -> + case fast_tls:tcp_to_tls(Socket, TLSOpts) of + {ok, TLSSocket} -> + SocketData1 = SocketData#socket_state{socket = TLSSocket, + sockmod = fast_tls}, + SocketData2 = reset_stream(SocketData1), + case fast_tls:recv_data(TLSSocket, <<>>) of + {ok, TLSData} -> + parse(SocketData2, TLSData); + {error, _} = Err -> + Err + end; + {error, _} = Err -> + Err + end. + +compress(SocketData) -> compress(SocketData, undefined). + +compress(#socket_state{receiver = undefined, + sockmod = SockMod, + socket = Socket} = SocketData, Data) -> + ejabberd:start_app(ezlib), + {ok, ZlibSocket} = ezlib:enable_zlib(SockMod, Socket), + case Data of + undefined -> ok; + _ -> send(SocketData, Data) + end, + SocketData1 = SocketData#socket_state{socket = ZlibSocket, + sockmod = ezlib}, + SocketData2 = reset_stream(SocketData1), + case ezlib:recv_data(ZlibSocket, <<"">>) of + {ok, ZlibData} -> + parse(SocketData2, ZlibData); + {error, _} = Err -> + Err + end. + +reset_stream(#socket_state{xml_stream = XMLStream, + max_stanza_size = MaxStanzaSize} = SocketData) + when XMLStream /= undefined -> + XMLStream1 = try fxml_stream:reset(XMLStream) + catch error:_ -> + close_stream(XMLStream), + fxml_stream:new(self(), MaxStanzaSize) + end, + SocketData#socket_state{xml_stream = XMLStream1}; +reset_stream(#socket_state{sockmod = SockMod, socket = Socket} = SocketData) -> + Socket1 = SockMod:reset_stream(Socket), + SocketData#socket_state{socket = Socket1}. + +-spec send_element(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}. +send_element(SocketData, El) when ?is_http_socket(SocketData) -> + send_xml(SocketData, {xmlstreamelement, El}); +send_element(SocketData, El) -> + send(SocketData, fxml:element_to_binary(El)). + +-spec send_header(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}. +send_header(SocketData, El) when ?is_http_socket(SocketData) -> + send_xml(SocketData, {xmlstreamstart, El#xmlel.name, El#xmlel.attrs}); +send_header(SocketData, El) -> + send(SocketData, fxml:element_to_header(El)). + +-spec send_trailer(socket_state()) -> ok | {error, inet:posix()}. +send_trailer(SocketData) when ?is_http_socket(SocketData) -> + send_xml(SocketData, {xmlstreamend, <<"stream:stream">>}); +send_trailer(SocketData) -> + send(SocketData, <<"">>). + +-spec send(socket_state(), iodata()) -> ok | {error, closed | inet:posix()}. +send(#socket_state{sockmod = SockMod, socket = Socket} = SocketData, Data) -> + ?DEBUG("(~s) Send XML on stream = ~p", [pp(SocketData), Data]), + try SockMod:send(Socket, Data) of + {error, einval} -> {error, closed}; + Result -> Result + catch _:badarg -> + %% Some modules throw badarg exceptions on closed sockets + %% TODO: their code should be improved + {error, closed} + end. + +-spec send_xml(socket_state(), + {xmlstreamelement, fxml:xmlel()} | + {xmlstreamstart, binary(), [{binary(), binary()}]} | + {xmlstreamend, binary()} | + {xmlstreamraw, iodata()}) -> term(). +send_xml(SocketData, El) -> + (SocketData#socket_state.sockmod):send_xml(SocketData#socket_state.socket, El). + +recv(#socket_state{xml_stream = undefined} = SocketData, Data) -> + XMLStream = fxml_stream:new(self(), SocketData#socket_state.max_stanza_size), + recv(SocketData#socket_state{xml_stream = XMLStream}, Data); +recv(#socket_state{sockmod = SockMod, socket = Socket} = SocketData, Data) -> + case SockMod of + fast_tls -> + case fast_tls:recv_data(Socket, Data) of + {ok, TLSData} -> + parse(SocketData, TLSData); + {error, _} = Err -> + Err + end; + ezlib -> + case ezlib:recv_data(Socket, Data) of + {ok, ZlibData} -> + parse(SocketData, ZlibData); + {error, _} = Err -> + Err + end; + _ -> + parse(SocketData, Data) + end. + +change_shaper(#socket_state{receiver = undefined} = SocketData, Shaper) -> + ShaperState = shaper:new(Shaper), + SocketData#socket_state{shaper = ShaperState}; +change_shaper(#socket_state{sockmod = SockMod, + socket = Socket} = SocketData, Shaper) -> + SockMod:change_shaper(Socket, Shaper), + SocketData. + +monitor(#socket_state{receiver = undefined}) -> + make_ref(); +monitor(#socket_state{sockmod = SockMod, socket = Socket}) -> + SockMod:monitor(Socket). + +controlling_process(#socket_state{sockmod = SockMod, + socket = Socket}, Pid) -> + SockMod:controlling_process(Socket, Pid). + +become_controller(#socket_state{receiver = Receiver, + sockmod = SockMod, + socket = Socket}, Pid) -> + if is_pid(Receiver) -> + SockMod:become_controller(Receiver, Pid); + true -> + activate_after(Socket, Pid, 0) + end. + +get_sockmod(SocketData) -> + SocketData#socket_state.sockmod. + +get_transport(#socket_state{sockmod = SockMod, + socket = Socket}) -> + case SockMod of + gen_tcp -> tcp; + fast_tls -> tls; + ezlib -> + case ezlib:get_sockmod(Socket) of + gen_tcp -> tcp_zlib; + fast_tls -> tls_zlib + end; + ejabberd_bosh -> http_bind; + ejabberd_http_ws -> websocket + end. + +get_peer_certificate(SocketData, Type) -> + fast_tls:get_peer_certificate(SocketData#socket_state.socket, Type). + +get_verify_result(SocketData) -> + fast_tls:get_verify_result(SocketData#socket_state.socket). + +close(#socket_state{sockmod = SockMod, socket = Socket}) -> + SockMod:close(Socket). + +sockname(#socket_state{sockmod = SockMod, + socket = Socket}) -> + case SockMod of + gen_tcp -> inet:sockname(Socket); + _ -> SockMod:sockname(Socket) + end. + +peername(#socket_state{sockmod = SockMod, + socket = Socket}) -> + case SockMod of + gen_tcp -> inet:peername(Socket); + _ -> SockMod:peername(Socket) + end. + +activate(#socket_state{sockmod = SockMod, socket = Socket}) -> + case SockMod of + gen_tcp -> inet:setopts(Socket, [{active, once}]); + _ -> SockMod:setopts(Socket, [{active, once}]) + end. + +activate_after(Socket, Pid, Pause) -> + if Pause > 0 -> + erlang:send_after(Pause, Pid, {tcp, Socket, <<>>}); + true -> + Pid ! {tcp, Socket, <<>>} + end, + ok. + +pp(#socket_state{receiver = Receiver} = State) -> + Transport = get_transport(State), + Receiver1 = case Receiver of + undefined -> self(); + _ -> Receiver + end, + io_lib:format("~s|~w", [Transport, Receiver1]). + +parse(SocketData, Data) when Data == <<>>; Data == [] -> + case activate(SocketData) of + ok -> + {ok, SocketData}; + {error, _} = Err -> + Err + end; +parse(SocketData, [El | Els]) when is_record(El, xmlel) -> + self() ! {'$gen_event', {xmlstreamelement, El}}, + parse(SocketData, Els); +parse(SocketData, [El | Els]) when + element(1, El) == xmlstreamstart; + element(1, El) == xmlstreamelement; + element(1, El) == xmlstreamend; + element(1, El) == xmlstreamerror -> + self() ! {'$gen_event', El}, + parse(SocketData, Els); +parse(#socket_state{xml_stream = XMLStream, + socket = Socket, + shaper = ShaperState} = SocketData, Data) + when is_binary(Data) -> + XMLStream1 = fxml_stream:parse(XMLStream, Data), + {ShaperState1, Pause} = shaper:update(ShaperState, byte_size(Data)), + Ret = if Pause > 0 -> + activate_after(Socket, self(), Pause); + true -> + activate(SocketData) + end, + case Ret of + ok -> + {ok, SocketData#socket_state{xml_stream = XMLStream1, + shaper = ShaperState1}}; + {error, _} = Err -> + Err + end. + +close_stream(undefined) -> + ok; +close_stream(XMLStream) -> + fxml_stream:close(XMLStream). diff --git a/src/xmpp_stream_in.erl b/src/xmpp_stream_in.erl index c28bad8e1..f41bed6c8 100644 --- a/src/xmpp_stream_in.erl +++ b/src/xmpp_stream_in.erl @@ -177,16 +177,17 @@ set_timeout(#{owner := Owner} = State, Timeout) when Owner == self() -> set_timeout(_, _) -> erlang:error(badarg). -get_transport(#{sockmod := SockMod, socket := Socket, owner := Owner}) +get_transport(#{socket := Socket, owner := Owner}) when Owner == self() -> - SockMod:get_transport(Socket); + xmpp_socket:get_transport(Socket); get_transport(_) -> erlang:error(badarg). --spec change_shaper(state(), shaper:shaper()) -> ok. -change_shaper(#{sockmod := SockMod, socket := Socket, owner := Owner}, Shaper) +-spec change_shaper(state(), shaper:shaper()) -> state(). +change_shaper(#{socket := Socket, owner := Owner} = State, Shaper) when Owner == self() -> - SockMod:change_shaper(Socket, Shaper); + Socket1 = xmpp_socket:change_shaper(Socket, Shaper), + State#{socket => Socket1}; change_shaper(_, _) -> erlang:error(badarg). @@ -209,16 +210,15 @@ format_error(Err) -> %%%=================================================================== %%% gen_server callbacks %%%=================================================================== -init([Module, {SockMod, Socket}, Opts]) -> +init([Module, {_SockMod, Socket}, Opts]) -> Encrypted = proplists:get_bool(tls, Opts), - SocketMonitor = SockMod:monitor(Socket), - case SockMod:peername(Socket) of + SocketMonitor = xmpp_socket:monitor(Socket), + case xmpp_socket:peername(Socket) of {ok, IP} -> Time = p1_time_compat:monotonic_time(milli_seconds), State = #{owner => self(), mod => Module, socket => Socket, - sockmod => SockMod, socket_monitor => SocketMonitor, stream_timeout => {timer:seconds(30), Time}, stream_direction => in, @@ -247,7 +247,7 @@ init([Module, {SockMod, Socket}, Opts]) -> TLSOpts = try Module:tls_options(State1) catch _:undef -> [] end, - case SockMod:starttls(Socket, TLSOpts) of + case xmpp_socket:starttls(Socket, TLSOpts) of {ok, TLSSocket} -> State2 = State1#{socket => TLSSocket}, {_, State3, Timeout} = noreply(State2), @@ -333,8 +333,7 @@ handle_info({'$gen_event', {xmlstreamerror, Reason}}, #{lang := Lang}= State) -> send_pkt(State1, Err) end); handle_info({'$gen_event', El}, #{stream_state := wait_for_stream} = State) -> - %% TODO: find and fix this in fast_xml - error_logger:error_msg("unexpected event from receiver: ~p; " + error_logger:error_msg("unexpected event from XML driver: ~p; " "xmlstreamstart was expected", [El]), State1 = send_header(State), noreply( @@ -379,6 +378,21 @@ handle_info(timeout, #{mod := Mod} = State) -> handle_info({'DOWN', MRef, _Type, _Object, _Info}, #{socket_monitor := MRef} = State) -> noreply(process_stream_end({socket, closed}, State)); +handle_info({tcp, _, Data}, #{socket := Socket} = State) -> + noreply( + case xmpp_socket:recv(Socket, Data) of + {ok, NewSocket} -> + State#{socket => NewSocket}; + {error, Reason} when is_atom(Reason) -> + process_stream_end({socket, Reason}, State); + {error, Reason} -> + %% TODO: make fast_tls return atoms + process_stream_end({tls, Reason}, State) + end); +handle_info({tcp_closed, _}, State) -> + handle_info({'$gen_event', closed}, State); +handle_info({tcp_error, _, Reason}, State) -> + noreply(process_stream_end({socket, Reason}, State)); handle_info(Info, #{mod := Mod} = State) -> noreply(try Mod:handle_info(Info, State) catch _:undef -> State @@ -698,14 +712,14 @@ process_compress(#compress{}, when Compressed or not Authenticated -> send_pkt(State, #compress_failure{reason = 'setup-failed'}); process_compress(#compress{methods = HisMethods}, - #{socket := Socket, sockmod := SockMod, mod := Mod} = State) -> + #{socket := Socket, mod := Mod} = State) -> MyMethods = try Mod:compress_methods(State) catch _:undef -> [] end, CommonMethods = lists_intersection(MyMethods, HisMethods), case lists:member(<<"zlib">>, CommonMethods) of true -> - case SockMod:compress(Socket) of + case xmpp_socket:compress(Socket) of {ok, ZlibSocket} -> State1 = send_pkt(State, #compressed{}), case is_disconnected(State1) of @@ -730,13 +744,13 @@ process_compress(#compress{methods = HisMethods}, process_starttls(#{stream_encrypted := true} = State) -> process_starttls_failure(already_encrypted, State); process_starttls(#{socket := Socket, - sockmod := SockMod, mod := Mod} = State) -> + mod := Mod} = State) -> case is_starttls_available(State) of true -> TLSOpts = try Mod:tls_options(State) catch _:undef -> [] end, - case SockMod:starttls(Socket, TLSOpts) of + case xmpp_socket:starttls(Socket, TLSOpts) of {ok, TLSSocket} -> State1 = send_pkt(State, #starttls_proceed{}), case is_disconnected(State1) of @@ -814,12 +828,13 @@ process_sasl_result({error, Reason, User}, State) -> -spec process_sasl_success([cyrsasl:sasl_property()], binary(), state()) -> state(). process_sasl_success(Props, ServerOut, - #{socket := Socket, sockmod := SockMod, + #{socket := Socket, mod := Mod, sasl_mech := Mech} = State) -> User = identity(Props), AuthModule = proplists:get_value(auth_module, Props), - SockMod:reset_stream(Socket), - State1 = send_pkt(State, #sasl_success{text = ServerOut}), + Socket1 = xmpp_socket:reset_stream(Socket), + State0 = State#{socket => Socket1}, + State1 = send_pkt(State0, #sasl_success{text = ServerOut}), case is_disconnected(State1) of true -> State1; false -> @@ -1090,17 +1105,17 @@ send_trailer(State) -> close_socket(State). -spec socket_send(state(), xmpp_element() | xmlel() | trailer) -> ok | {error, inet:posix()}. -socket_send(#{socket := Sock, sockmod := SockMod, +socket_send(#{socket := Sock, stream_state := StateName, xmlns := NS, stream_header_sent := true}, Pkt) -> case Pkt of trailer -> - SockMod:send_trailer(Sock); + xmpp_socket:send_trailer(Sock); #stream_start{} when StateName /= disconnected -> - SockMod:send_header(Sock, xmpp:encode(Pkt)); + xmpp_socket:send_header(Sock, xmpp:encode(Pkt)); _ when StateName /= disconnected -> - SockMod:send_element(Sock, xmpp:encode(Pkt, NS)); + xmpp_socket:send_element(Sock, xmpp:encode(Pkt, NS)); _ -> {error, closed} end; @@ -1108,8 +1123,8 @@ socket_send(_, _) -> {error, closed}. -spec close_socket(state()) -> state(). -close_socket(#{sockmod := SockMod, socket := Socket} = State) -> - SockMod:close(Socket), +close_socket(#{socket := Socket} = State) -> + xmpp_socket:close(Socket), State#{stream_timeout => infinity, stream_state => disconnected}. diff --git a/src/xmpp_stream_out.erl b/src/xmpp_stream_out.erl index 8f4fa5c84..b5851b0b8 100644 --- a/src/xmpp_stream_out.erl +++ b/src/xmpp_stream_out.erl @@ -191,16 +191,17 @@ set_timeout(#{owner := Owner} = State, Timeout) when Owner == self() -> set_timeout(_, _) -> erlang:error(badarg). -get_transport(#{sockmod := SockMod, socket := Socket, owner := Owner}) +get_transport(#{socket := Socket, owner := Owner}) when Owner == self() -> - SockMod:get_transport(Socket); + xmpp_socket:get_transport(Socket); get_transport(_) -> erlang:error(badarg). --spec change_shaper(state(), shaper:shaper()) -> ok. -change_shaper(#{sockmod := SockMod, socket := Socket, owner := Owner}, Shaper) +-spec change_shaper(state(), shaper:shaper()) -> state(). +change_shaper(#{socket := Socket, owner := Owner} = State, Shaper) when Owner == self() -> - SockMod:change_shaper(Socket, Shaper); + Socket1 = xmpp_socket:change_shaper(Socket, Shaper), + State#{socket => Socket1}; change_shaper(_, _) -> erlang:error(badarg). @@ -233,11 +234,10 @@ format_error(Err) -> %%% gen_server callbacks %%%=================================================================== -spec init(list()) -> {ok, state(), timeout()} | {stop, term()} | ignore. -init([Mod, SockMod, From, To, Opts]) -> +init([Mod, _SockMod, From, To, Opts]) -> Time = p1_time_compat:monotonic_time(milli_seconds), State = #{owner => self(), mod => Mod, - sockmod => SockMod, server => From, user => <<"">>, resource => <<"">>, @@ -272,7 +272,6 @@ handle_call(Call, From, #{mod := Mod} = State) -> -spec handle_cast(term(), state()) -> noreply(). handle_cast(connect, #{remote_server := RemoteServer, - sockmod := SockMod, stream_state := connecting} = State) -> noreply( case idna_to_ascii(RemoteServer) of @@ -283,7 +282,7 @@ handle_cast(connect, #{remote_server := RemoteServer, {ok, AddrPorts} -> case connect(AddrPorts, State) of {ok, Socket, {Addr, Port, Encrypted}} -> - SocketMonitor = SockMod:monitor(Socket), + SocketMonitor = xmpp_socket:monitor(Socket), State1 = State#{ip => {Addr, Port}, socket => Socket, stream_encrypted => Encrypted, @@ -388,6 +387,21 @@ handle_info(timeout, #{mod := Mod} = State) -> handle_info({'DOWN', MRef, _Type, _Object, _Info}, #{socket_monitor := MRef} = State) -> noreply(process_stream_end({socket, closed}, State)); +handle_info({tcp, _, Data}, #{socket := Socket} = State) -> + noreply( + case xmpp_socket:recv(Socket, Data) of + {ok, NewSocket} -> + State#{socket => NewSocket}; + {error, Reason} when is_atom(Reason) -> + process_stream_end({socket, Reason}, State); + {error, Reason} -> + %% TODO: make fast_tls return atoms + process_stream_end({tls, Reason}, State) + end); +handle_info({tcp_closed, _}, State) -> + handle_info({'$gen_event', closed}, State); +handle_info({tcp_error, _, Reason}, State) -> + noreply(process_stream_end({socket, Reason}, State)); handle_info(Info, #{mod := Mod} = State) -> noreply(try Mod:handle_info(Info, State) catch _:undef -> State @@ -638,13 +652,13 @@ process_cert_verification(State) -> -spec process_sasl_success(state()) -> state(). process_sasl_success(#{mod := Mod, - sockmod := SockMod, socket := Socket} = State) -> - SockMod:reset_stream(Socket), - State1 = State#{stream_id => new_id(), - stream_restarted => true, - stream_state => wait_for_stream, - stream_authenticated => true}, + Socket1 = xmpp_socket:reset_stream(Socket), + State0 = State#{socket => Socket1}, + State1 = State0#{stream_id => new_id(), + stream_restarted => true, + stream_state => wait_for_stream, + stream_authenticated => true}, State2 = send_header(State1), case is_disconnected(State2) of true -> State2; @@ -745,15 +759,15 @@ send_error(State, Pkt, Err) -> end. -spec socket_send(state(), xmpp_element() | xmlel() | trailer) -> ok | {error, inet:posix()}. -socket_send(#{sockmod := SockMod, socket := Socket, xmlns := NS, +socket_send(#{socket := Socket, xmlns := NS, stream_state := StateName}, Pkt) -> case Pkt of trailer -> - SockMod:send_trailer(Socket); + xmpp_socket:send_trailer(Socket); #stream_start{} when StateName /= disconnected -> - SockMod:send_header(Socket, xmpp:encode(Pkt)); + xmpp_socket:send_header(Socket, xmpp:encode(Pkt)); _ when StateName /= disconnected -> - SockMod:send_element(Socket, xmpp:encode(Pkt, NS)); + xmpp_socket:send_element(Socket, xmpp:encode(Pkt, NS)); _ -> {error, closed} end; @@ -768,8 +782,8 @@ send_trailer(State) -> -spec close_socket(state()) -> state(). close_socket(State) -> case State of - #{sockmod := SockMod, socket := Socket} -> - SockMod:close(Socket); + #{socket := Socket} -> + xmpp_socket:close(Socket); _ -> ok end, @@ -777,8 +791,8 @@ close_socket(State) -> stream_state => disconnected}. -spec starttls(term(), state()) -> {ok, term()} | {error, tls_error_reason()}. -starttls(Socket, #{sockmod := SockMod, mod := Mod, - xmlns := NS, remote_server := RemoteServer} = State) -> +starttls(Socket, #{mod := Mod, xmlns := NS, + remote_server := RemoteServer} = State) -> TLSOpts = try Mod:tls_options(State) catch _:undef -> [] end, @@ -787,7 +801,7 @@ starttls(Socket, #{sockmod := SockMod, mod := Mod, ?NS_SERVER -> <<"xmpp-server">>; ?NS_CLIENT -> <<"xmpp-client">> end, - SockMod:starttls(Socket, [connect, {sni, SNI}, {alpn, [ALPN]}|TLSOpts]). + xmpp_socket:starttls(Socket, [connect, {sni, SNI}, {alpn, [ALPN]}|TLSOpts]). -spec select_lang(binary(), binary()) -> binary(). select_lang(Lang, <<"">>) -> Lang; @@ -1020,9 +1034,9 @@ host_entry_to_addr_ports(#hostent{h_addr_list = AddrList}, Port, TLS) -> -spec connect([ip_port()], state()) -> {ok, term(), ip_port()} | {error, {socket, socket_error_reason()}} | {error, {tls, tls_error_reason()}}. -connect(AddrPorts, #{sockmod := SockMod} = State) -> +connect(AddrPorts, State) -> Timeout = get_connect_timeout(State), - case connect(AddrPorts, SockMod, Timeout, {error, nxdomain}) of + case connect(AddrPorts, Timeout, {error, nxdomain}) of {ok, Socket, {Addr, Port, TLS = true}} -> case starttls(Socket, State) of {ok, TLSSocket} -> {ok, TLSSocket, {Addr, Port, TLS}}; @@ -1034,24 +1048,24 @@ connect(AddrPorts, #{sockmod := SockMod} = State) -> {error, {socket, Why}} end. --spec connect([ip_port()], module(), timeout(), network_error()) -> +-spec connect([ip_port()], timeout(), network_error()) -> {ok, term(), ip_port()} | network_error(). -connect([{Addr, Port, TLS}|AddrPorts], SockMod, Timeout, _) -> +connect([{Addr, Port, TLS}|AddrPorts], Timeout, _) -> Type = get_addr_type(Addr), - try SockMod:connect(Addr, Port, - [binary, {packet, 0}, - {send_timeout, ?TCP_SEND_TIMEOUT}, - {send_timeout_close, true}, - {active, false}, Type], - Timeout) of + try xmpp_socket:connect(Addr, Port, + [binary, {packet, 0}, + {send_timeout, ?TCP_SEND_TIMEOUT}, + {send_timeout_close, true}, + {active, false}, Type], + Timeout) of {ok, Socket} -> {ok, Socket, {Addr, Port, TLS}}; Err -> - connect(AddrPorts, SockMod, Timeout, Err) + connect(AddrPorts, Timeout, Err) catch _:badarg -> - connect(AddrPorts, SockMod, Timeout, {error, einval}) + connect(AddrPorts, Timeout, {error, einval}) end; -connect([], _SockMod, _Timeout, Err) -> +connect([], _Timeout, Err) -> Err. -spec get_addr_type(inet:ip_address()) -> inet:address_family(). diff --git a/src/xmpp_stream_pkix.erl b/src/xmpp_stream_pkix.erl index 8361999f4..9ca9fdffc 100644 --- a/src/xmpp_stream_pkix.erl +++ b/src/xmpp_stream_pkix.erl @@ -40,10 +40,10 @@ authenticate(State) -> -spec authenticate(xmpp_stream_in:state() | xmpp_stream_out:state(), binary()) -> {ok, binary()} | {error, atom(), binary()}. -authenticate(#{xmlns := ?NS_SERVER, sockmod := SockMod, +authenticate(#{xmlns := ?NS_SERVER, socket := Socket} = State, Authzid) -> Peer = maps:get(remote_server, State, Authzid), - case verify_cert(SockMod, Socket) of + case verify_cert(Socket) of {ok, Cert} -> case ejabberd_idna:domain_utf8_to_ascii(Peer) of false -> @@ -61,7 +61,7 @@ authenticate(#{xmlns := ?NS_SERVER, sockmod := SockMod, {error, Reason} -> {error, Reason, Peer} end; -authenticate(#{xmlns := ?NS_CLIENT, sockmod := SockMod, +authenticate(#{xmlns := ?NS_CLIENT, socket := Socket, lserver := LServer}, Authzid) -> JID = try jid:decode(Authzid) catch _:{bad_jid, <<>>} -> jid:make(LServer); @@ -69,7 +69,7 @@ authenticate(#{xmlns := ?NS_CLIENT, sockmod := SockMod, end, case JID of #jid{user = User} -> - case verify_cert(SockMod, Socket) of + case verify_cert(Socket) of {ok, Cert} -> JIDs = get_xmpp_addrs(Cert), get_username(JID, JIDs, LServer); @@ -104,11 +104,11 @@ get_cert_domains(Cert) -> %%%=================================================================== %%% Internal functions %%%=================================================================== --spec verify_cert(module(), ejabberd_socket:socket()) -> {ok, cert()} | {error, atom()}. -verify_cert(SockMod, Socket) -> - case SockMod:get_peer_certificate(Socket, otp) of +-spec verify_cert(xmpp_socket:socket()) -> {ok, cert()} | {error, atom()}. +verify_cert(Socket) -> + case xmpp_socket:get_peer_certificate(Socket, otp) of {ok, Cert} -> - case SockMod:get_verify_result(Socket) of + case xmpp_socket:get_verify_result(Socket) of 0 -> {ok, Cert}; VerifyRes ->