%%%---------------------------------------------------------------------- %%% File : ejabberd_c2s.erl %%% Author : Alexey Shchepin %%% Purpose : Serve C2S connection %%% Created : 16 Nov 2002 by Alexey Shchepin %%% %%% %%% ejabberd, Copyright (C) 2002-2016 ProcessOne %%% %%% This program is free software; you can redistribute it and/or %%% modify it under the terms of the GNU General Public License as %%% published by the Free Software Foundation; either version 2 of the %%% License, or (at your option) any later version. %%% %%% This program is distributed in the hope that it will be useful, %%% but WITHOUT ANY WARRANTY; without even the implied warranty of %%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU %%% General Public License for more details. %%% %%% You should have received a copy of the GNU General Public License along %%% with this program; if not, write to the Free Software Foundation, Inc., %%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. %%% %%%---------------------------------------------------------------------- -module(ejabberd_c2s). -behaviour(ejabberd_config). -author('alexey@process-one.net'). -protocol({xep, 78, '2.5'}). -protocol({xep, 138, '2.0'}). -protocol({xep, 198, '1.3'}). -protocol({xep, 356, '7.1'}). -update_info({update, 0}). -define(GEN_FSM, p1_fsm). -behaviour(?GEN_FSM). %% External exports -export([start/2, stop/1, start_link/2, close/1, send_text/2, send_element/2, socket_type/0, get_presence/1, get_last_presence/1, get_aux_field/2, set_aux_field/3, del_aux_field/2, get_subscription/2, get_queued_stanzas/1, get_csi_state/1, set_csi_state/2, get_resume_timeout/1, set_resume_timeout/2, send_filtered/5, broadcast/4, get_subscribed/1, transform_listen_option/2]). -export([init/1, wait_for_stream/2, wait_for_auth/2, wait_for_feature_request/2, wait_for_bind/2, wait_for_sasl_response/2, wait_for_resume/2, session_established/2, handle_event/3, handle_sync_event/4, code_change/4, handle_info/3, terminate/3, print_state/1, opt_type/1]). -include("ejabberd.hrl"). -include("logger.hrl"). -include("xmpp.hrl"). %%-include("legacy.hrl"). -include("mod_privacy.hrl"). -define(SETS, gb_sets). -define(DICT, dict). %% pres_a contains all the presence available send (either through roster mechanism or directed). %% Directed presence unavailable remove user from pres_a. -record(state, {socket, sockmod, socket_monitor, xml_socket, streamid, sasl_state, access, shaper, zlib = false, tls = false, tls_required = false, tls_enabled = false, tls_options = [], authenticated = false, jid, user = <<"">>, server = <<"">>, resource = <<"">>, sid, pres_t = ?SETS:new(), pres_f = ?SETS:new(), pres_a = ?SETS:new(), pres_last, pres_timestamp, privacy_list = #userlist{}, conn = unknown, auth_module = unknown, ip, aux_fields = [], csi_state = active, mgmt_state, mgmt_xmlns, mgmt_queue, mgmt_max_queue, mgmt_pending_since, mgmt_timeout, mgmt_max_timeout, mgmt_ack_timeout, mgmt_ack_timer, mgmt_resend, mgmt_stanzas_in = 0, mgmt_stanzas_out = 0, mgmt_stanzas_req = 0, ask_offline = true, lang = <<"">>}). -type state_name() :: wait_for_stream | wait_for_auth | wait_for_feature_request | wait_for_bind | wait_for_sasl_response | wait_for_resume | session_established. -type state() :: #state{}. -type fsm_stop() :: {stop, normal, state()}. -type fsm_next() :: {next_state, state_name(), state(), non_neg_integer()}. -type fsm_reply() :: {reply, any(), state_name(), state(), non_neg_integer()}. -type fsm_transition() :: fsm_stop() | fsm_next(). -export_type([state/0]). %-define(DBGFSM, true). -ifdef(DBGFSM). -define(FSMOPTS, [{debug, [trace]}]). -else. -define(FSMOPTS, []). -endif. %% This is the timeout to apply between event when starting a new %% session: -define(C2S_OPEN_TIMEOUT, 60000). -define(C2S_HIBERNATE_TIMEOUT, ejabberd_config:get_option(c2s_hibernate, fun(X) when is_integer(X); X == hibernate-> X end, 90000)). -define(STREAM_HEADER, <<"">>). -define(STREAM_TRAILER, <<"">>). %% XEP-0198: -define(IS_STREAM_MGMT_PACKET(Pkt), is_record(Pkt, sm_enable) or is_record(Pkt, sm_resume) or is_record(Pkt, sm_a) or is_record(Pkt, sm_r)). %%%---------------------------------------------------------------------- %%% API %%%---------------------------------------------------------------------- start(SockData, Opts) -> ?GEN_FSM:start(ejabberd_c2s, [SockData, Opts], fsm_limit_opts(Opts) ++ ?FSMOPTS). start_link(SockData, Opts) -> (?GEN_FSM):start_link(ejabberd_c2s, [SockData, Opts], fsm_limit_opts(Opts) ++ ?FSMOPTS). socket_type() -> xml_stream. %% Return Username, Resource and presence information get_presence(FsmRef) -> (?GEN_FSM):sync_send_all_state_event(FsmRef, {get_presence}, 1000). get_last_presence(FsmRef) -> (?GEN_FSM):sync_send_all_state_event(FsmRef, {get_last_presence}, 1000). -spec get_aux_field(any(), state()) -> {ok, any()} | error. get_aux_field(Key, #state{aux_fields = Opts}) -> case lists:keyfind(Key, 1, Opts) of {_, Val} -> {ok, Val}; false -> error end. -spec set_aux_field(any(), any(), state()) -> state(). set_aux_field(Key, Val, #state{aux_fields = Opts} = State) -> Opts1 = lists:keydelete(Key, 1, Opts), State#state{aux_fields = [{Key, Val} | Opts1]}. -spec del_aux_field(any(), state()) -> state(). del_aux_field(Key, #state{aux_fields = Opts} = State) -> Opts1 = lists:keydelete(Key, 1, Opts), State#state{aux_fields = Opts1}. -spec get_subscription(jid() | ljid(), state()) -> both | from | to | none. get_subscription(From = #jid{}, StateData) -> get_subscription(jid:tolower(From), StateData); get_subscription(LFrom, StateData) -> LBFrom = setelement(3, LFrom, <<"">>), F = (?SETS):is_element(LFrom, StateData#state.pres_f) orelse (?SETS):is_element(LBFrom, StateData#state.pres_f), T = (?SETS):is_element(LFrom, StateData#state.pres_t) orelse (?SETS):is_element(LBFrom, StateData#state.pres_t), if F and T -> both; F -> from; T -> to; true -> none end. get_queued_stanzas(#state{mgmt_queue = Queue} = StateData) -> lists:map(fun({_N, Time, El}) -> add_resent_delay_info(StateData, El, Time) end, queue:to_list(Queue)). get_csi_state(#state{csi_state = CsiState}) -> CsiState. set_csi_state(#state{} = StateData, CsiState) -> StateData#state{csi_state = CsiState}; set_csi_state(FsmRef, CsiState) -> FsmRef ! {set_csi_state, CsiState}. get_resume_timeout(#state{mgmt_timeout = Timeout}) -> Timeout. set_resume_timeout(#state{} = StateData, Timeout) -> StateData#state{mgmt_timeout = Timeout}; set_resume_timeout(FsmRef, Timeout) -> FsmRef ! {set_resume_timeout, Timeout}. -spec send_filtered(pid(), binary(), jid(), jid(), stanza()) -> any(). send_filtered(FsmRef, Feature, From, To, Packet) -> FsmRef ! {send_filtered, Feature, From, To, Packet}. -spec broadcast(pid(), any(), jid(), stanza()) -> any(). broadcast(FsmRef, Type, From, Packet) -> FsmRef ! {broadcast, Type, From, Packet}. -spec stop(pid()) -> any(). stop(FsmRef) -> (?GEN_FSM):send_event(FsmRef, stop). -spec close(pid()) -> any(). %% What is the difference between stop and close??? close(FsmRef) -> (?GEN_FSM):send_event(FsmRef, closed). %%%---------------------------------------------------------------------- %%% Callback functions from gen_fsm %%%---------------------------------------------------------------------- init([{SockMod, Socket}, Opts]) -> Access = gen_mod:get_opt(access, Opts, fun acl:access_rules_validator/1, all), Shaper = gen_mod:get_opt(shaper, Opts, fun acl:shaper_rules_validator/1, none), XMLSocket = case lists:keysearch(xml_socket, 1, Opts) of {value, {_, XS}} -> XS; _ -> false end, Zlib = proplists:get_bool(zlib, Opts), StartTLS = proplists:get_bool(starttls, Opts), StartTLSRequired = proplists:get_bool(starttls_required, Opts), TLSEnabled = proplists:get_bool(tls, Opts), TLS = StartTLS orelse StartTLSRequired orelse TLSEnabled, TLSOpts1 = lists:filter(fun ({certfile, _}) -> true; ({ciphers, _}) -> true; ({dhfile, _}) -> true; (_) -> false end, Opts), TLSOpts2 = case lists:keysearch(protocol_options, 1, Opts) of {value, {_, O}} -> [_|ProtocolOptions] = lists:foldl( fun(X, Acc) -> X ++ Acc end, [], [["|" | binary_to_list(Opt)] || Opt <- O, is_binary(Opt)] ), [{protocol_options, iolist_to_binary(ProtocolOptions)} | TLSOpts1]; _ -> TLSOpts1 end, TLSOpts3 = case proplists:get_bool(tls_compression, Opts) of false -> [compression_none | TLSOpts2]; true -> TLSOpts2 end, TLSOpts = [verify_none | TLSOpts3], StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true), StreamMgmtState = if StreamMgmtEnabled -> inactive; true -> disabled end, MaxAckQueue = case proplists:get_value(max_ack_queue, Opts) of Limit when is_integer(Limit), Limit > 0 -> Limit; infinity -> infinity; _ -> 1000 end, ResumeTimeout = case proplists:get_value(resume_timeout, Opts) of RTimeo when is_integer(RTimeo), RTimeo >= 0 -> RTimeo; _ -> 300 end, MaxResumeTimeout = case proplists:get_value(max_resume_timeout, Opts) of Max when is_integer(Max), Max >= ResumeTimeout -> Max; _ -> ResumeTimeout end, AckTimeout = case proplists:get_value(ack_timeout, Opts) of ATimeo when is_integer(ATimeo), ATimeo > 0 -> ATimeo * 1000; infinity -> undefined; _ -> 60000 end, ResendOnTimeout = case proplists:get_value(resend_on_timeout, Opts) of Resend when is_boolean(Resend) -> Resend; if_offline -> if_offline; _ -> false end, IP = peerip(SockMod, Socket), Socket1 = if TLSEnabled andalso SockMod /= ejabberd_frontend_socket -> SockMod:starttls(Socket, TLSOpts); true -> Socket end, SocketMonitor = SockMod:monitor(Socket1), StateData = #state{socket = Socket1, sockmod = SockMod, socket_monitor = SocketMonitor, xml_socket = XMLSocket, zlib = Zlib, tls = TLS, tls_required = StartTLSRequired, tls_enabled = TLSEnabled, tls_options = TLSOpts, sid = ejabberd_sm:make_sid(), streamid = new_id(), access = Access, shaper = Shaper, ip = IP, mgmt_state = StreamMgmtState, mgmt_max_queue = MaxAckQueue, mgmt_timeout = ResumeTimeout, mgmt_max_timeout = MaxResumeTimeout, mgmt_ack_timeout = AckTimeout, mgmt_resend = ResendOnTimeout}, {ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT}. -spec get_subscribed(pid()) -> [ljid()]. %% Return list of all available resources of contacts, get_subscribed(FsmRef) -> (?GEN_FSM):sync_send_all_state_event(FsmRef, get_subscribed, 1000). wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) -> try xmpp:decode(#xmlel{name = Name, attrs = Attrs}) of #stream_start{xmlns = NS_CLIENT, stream_xmlns = NS_STREAM, version = Version, lang = Lang} when NS_CLIENT /= ?NS_CLIENT; NS_STREAM /= ?NS_STREAM -> send_header(StateData, ?MYNAME, Version, Lang), send_element(StateData, xmpp:serr_invalid_namespace()), {stop, normal, StateData}; #stream_start{lang = Lang, version = Version} when byte_size(Lang) > 35 -> %% As stated in BCP47, 4.4.1: %% Protocols or specifications that specify limited buffer sizes for %% language tags MUST allow for language tags of at least 35 characters. %% Do not store long language tag to avoid possible DoS/flood attacks send_header(StateData, ?MYNAME, Version, ?MYLANG), Txt = <<"Too long value of 'xml:lang' attribute">>, send_element(StateData, xmpp:serr_policy_violation(Txt, ?MYLANG)), {stop, normal, StateData}; #stream_start{to = undefined, lang = Lang, version = Version} -> Txt = <<"Missing 'to' attribute">>, send_header(StateData, ?MYNAME, Version, Lang), send_element(StateData, xmpp:serr_improper_addressing(Txt, Lang)), {stop, normal, StateData}; #stream_start{to = #jid{lserver = To}, lang = Lang, version = Version} -> Server = case StateData#state.server of <<"">> -> To; S -> S end, StreamVersion = case Version of {1,0} -> {1,0}; _ -> undefined end, IsBlacklistedIP = is_ip_blacklisted(StateData#state.ip, Lang), case lists:member(Server, ?MYHOSTS) of true when IsBlacklistedIP == false -> change_shaper(StateData, jid:make(<<"">>, Server, <<"">>)), case StreamVersion of {1,0} -> send_header(StateData, Server, {1,0}, ?MYLANG), case StateData#state.authenticated of false -> TLS = StateData#state.tls, TLSEnabled = StateData#state.tls_enabled, TLSRequired = StateData#state.tls_required, SASLState = cyrsasl:server_new( <<"jabber">>, Server, <<"">>, [], fun (U) -> ejabberd_auth:get_password_with_authmodule( U, Server) end, fun(U, AuthzId, P) -> ejabberd_auth:check_password_with_authmodule( U, AuthzId, Server, P) end, fun(U, AuthzId, P, D, DG) -> ejabberd_auth:check_password_with_authmodule( U, AuthzId, Server, P, D, DG) end), Mechs = case TLSEnabled or not TLSRequired of true -> [#sasl_mechanisms{list = cyrsasl:listmech(Server)}]; false -> [] end, SockMod = (StateData#state.sockmod):get_sockmod(StateData#state.socket), Zlib = StateData#state.zlib, CompressFeature = case Zlib andalso ((SockMod == gen_tcp) orelse (SockMod == fast_tls)) of true -> [#compression{methods = [<<"zlib">>]}]; _ -> [] end, TLSFeature = case (TLS == true) andalso (TLSEnabled == false) andalso (SockMod == gen_tcp) of true -> [#starttls{required = TLSRequired}]; false -> [] end, StreamFeatures1 = TLSFeature ++ CompressFeature ++ Mechs, StreamFeatures = ejabberd_hooks:run_fold(c2s_stream_features, Server, StreamFeatures1, [Server]), send_element(StateData, #stream_features{sub_els = StreamFeatures}), fsm_next_state(wait_for_feature_request, StateData#state{server = Server, sasl_state = SASLState, lang = Lang}); _ -> case StateData#state.resource of <<"">> -> RosterVersioningFeature = ejabberd_hooks:run_fold(roster_get_versioning_feature, Server, [], [Server]), StreamManagementFeature = case stream_mgmt_enabled(StateData) of true -> [#feature_sm{xmlns = ?NS_STREAM_MGMT_2}, #feature_sm{xmlns = ?NS_STREAM_MGMT_3}]; false -> [] end, SockMod = (StateData#state.sockmod):get_sockmod( StateData#state.socket), Zlib = StateData#state.zlib, CompressFeature = case Zlib andalso ((SockMod == gen_tcp) orelse (SockMod == fast_tls)) of true -> [#compression{methods = [<<"zlib">>]}]; _ -> [] end, StreamFeatures1 = [#bind{}, #xmpp_session{optional = true}] ++ RosterVersioningFeature ++ StreamManagementFeature ++ CompressFeature ++ ejabberd_hooks:run_fold(c2s_post_auth_features, Server, [], [Server]), StreamFeatures = ejabberd_hooks:run_fold(c2s_stream_features, Server, StreamFeatures1, [Server]), send_element(StateData, #stream_features{sub_els = StreamFeatures}), fsm_next_state(wait_for_bind, StateData#state{server = Server, lang = Lang}); _ -> send_element(StateData, #stream_features{}), fsm_next_state(session_established, StateData#state{server = Server, lang = Lang}) end end; _ -> send_header(StateData, Server, StreamVersion, ?MYLANG), if not StateData#state.tls_enabled and StateData#state.tls_required -> send_element( StateData, xmpp:serr_policy_violation( <<"Use of STARTTLS required">>, Lang)), {stop, normal, StateData}; true -> fsm_next_state(wait_for_auth, StateData#state{server = Server, lang = Lang}) end end; true -> IP = StateData#state.ip, {true, LogReason, ReasonT} = IsBlacklistedIP, ?INFO_MSG("Connection attempt from blacklisted IP ~s: ~s", [jlib:ip_to_list(IP), LogReason]), send_header(StateData, Server, StreamVersion, ?MYLANG), send_element(StateData, xmpp:serr_policy_violation(ReasonT, Lang)), {stop, normal, StateData}; _ -> send_header(StateData, ?MYNAME, StreamVersion, ?MYLANG), send_element(StateData, xmpp:serr_host_unknown()), {stop, normal, StateData} end; _ -> send_header(StateData, ?MYNAME, {1,0}, ?MYLANG), send_element(StateData, xmpp:serr_invalid_xml()), {stop, normal, StateData} catch _:{xmpp_codec, Why} -> Txt = xmpp:format_error(Why), send_header(StateData, ?MYNAME, {1,0}, ?MYLANG), send_element(StateData, xmpp:serr_invalid_xml(Txt, ?MYLANG)), {stop, normal, StateData} end; wait_for_stream(timeout, StateData) -> {stop, normal, StateData}; wait_for_stream({xmlstreamelement, _}, StateData) -> send_element(StateData, xmpp:serr_not_well_formed()), {stop, normal, StateData}; wait_for_stream({xmlstreamend, _}, StateData) -> send_element(StateData, xmpp:serr_not_well_formed()), {stop, normal, StateData}; wait_for_stream({xmlstreamerror, _}, StateData) -> send_header(StateData, ?MYNAME, {1,0}, <<"">>), send_element(StateData, xmpp:serr_not_well_formed()), {stop, normal, StateData}; wait_for_stream(closed, StateData) -> {stop, normal, StateData}; wait_for_stream(stop, StateData) -> {stop, normal, StateData}. wait_for_auth({xmlstreamelement, #xmlel{} = El}, StateData) -> decode_element(El, wait_for_auth, StateData); wait_for_auth(Pkt, StateData) when ?IS_STREAM_MGMT_PACKET(Pkt) -> fsm_next_state(wait_for_auth, dispatch_stream_mgmt(Pkt, StateData)); wait_for_auth(#iq{type = get, sub_els = [#legacy_auth{}]} = IQ, StateData) -> Auth = #legacy_auth{username = <<>>, password = <<>>, resource = <<>>}, Res = case ejabberd_auth:plain_password_required(StateData#state.server) of false -> xmpp:make_iq_result(IQ, Auth#legacy_auth{digest = <<>>}); true -> xmpp:make_iq_result(IQ, Auth) end, send_element(StateData, Res), fsm_next_state(wait_for_auth, StateData); wait_for_auth(#iq{type = set, sub_els = [#legacy_auth{resource = <<"">>}]} = IQ, StateData) -> Lang = StateData#state.lang, Txt = <<"No resource provided">>, Err = xmpp:make_error(IQ, xmpp:err_not_acceptable(Txt, Lang)), send_element(StateData, Err), fsm_next_state(wait_for_auth, StateData); wait_for_auth(#iq{type = set, sub_els = [#legacy_auth{username = U, password = P0, digest = D0, resource = R}]} = IQ, StateData) when is_binary(U), is_binary(R) -> JID = jid:make(U, StateData#state.server, R), case (JID /= error) andalso acl:access_matches(StateData#state.access, #{usr => jid:split(JID), ip => StateData#state.ip}, StateData#state.server) == allow of true -> DGen = fun (PW) -> p1_sha:sha(<<(StateData#state.streamid)/binary, PW/binary>>) end, P = if is_binary(P0) -> P0; true -> <<>> end, D = if is_binary(D0) -> D0; true -> <<>> end, case ejabberd_auth:check_password_with_authmodule( U, U, StateData#state.server, P, D, DGen) of {true, AuthModule} -> ?INFO_MSG("(~w) Accepted legacy authentication for ~s by ~p from ~s", [StateData#state.socket, jid:to_string(JID), AuthModule, ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), ejabberd_hooks:run(c2s_auth_result, StateData#state.server, [true, U, StateData#state.server, StateData#state.ip]), Conn = get_conn_type(StateData), Info = [{ip, StateData#state.ip}, {conn, Conn}, {auth_module, AuthModule}], Res = xmpp:make_iq_result(IQ), send_element(StateData, Res), ejabberd_sm:open_session(StateData#state.sid, U, StateData#state.server, R, Info), change_shaper(StateData, JID), {Fs, Ts} = ejabberd_hooks:run_fold( roster_get_subscription_lists, StateData#state.server, {[], []}, [U, StateData#state.server]), LJID = jid:tolower(jid:remove_resource(JID)), Fs1 = [LJID | Fs], Ts1 = [LJID | Ts], PrivList = ejabberd_hooks:run_fold(privacy_get_user_list, StateData#state.server, #userlist{}, [U, StateData#state.server]), NewStateData = StateData#state{ user = U, resource = R, jid = JID, conn = Conn, auth_module = AuthModule, pres_f = (?SETS):from_list(Fs1), pres_t = (?SETS):from_list(Ts1), privacy_list = PrivList}, fsm_next_state(session_established, NewStateData); _ -> ?INFO_MSG("(~w) Failed legacy authentication for ~s from ~s", [StateData#state.socket, jid:to_string(JID), ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), ejabberd_hooks:run(c2s_auth_result, StateData#state.server, [false, U, StateData#state.server, StateData#state.ip]), Lang = StateData#state.lang, Txt = <<"Legacy authentication failed">>, Err = xmpp:make_error(IQ, xmpp:err_not_authorized(Txt, Lang)), send_element(StateData, Err), fsm_next_state(wait_for_auth, StateData) end; false when JID == error -> ?INFO_MSG("(~w) Forbidden legacy authentication " "for username '~s' with resource '~s'", [StateData#state.socket, U, R]), Err = xmpp:make_error(IQ, xmpp:err_jid_malformed()), send_element(StateData, Err), fsm_next_state(wait_for_auth, StateData); false -> ?INFO_MSG("(~w) Forbidden legacy authentication for ~s from ~s", [StateData#state.socket, jid:to_string(JID), ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), ejabberd_hooks:run(c2s_auth_result, StateData#state.server, [false, U, StateData#state.server, StateData#state.ip]), Lang = StateData#state.lang, Txt = <<"Legacy authentication forbidden">>, Err = xmpp:make_error(IQ, xmpp:err_not_allowed(Txt, Lang)), send_element(StateData, Err), fsm_next_state(wait_for_auth, StateData) end; wait_for_auth(timeout, StateData) -> {stop, normal, StateData}; wait_for_auth({xmlstreamend, _Name}, StateData) -> {stop, normal, StateData}; wait_for_auth({xmlstreamerror, _}, StateData) -> send_element(StateData, xmpp:serr_not_well_formed()), {stop, normal, StateData}; wait_for_auth(closed, StateData) -> {stop, normal, StateData}; wait_for_auth(stop, StateData) -> {stop, normal, StateData}; wait_for_auth(Pkt, StateData) -> process_unauthenticated_stanza(StateData, Pkt), fsm_next_state(wait_for_auth, StateData). wait_for_feature_request({xmlstreamelement, El}, StateData) -> decode_element(El, wait_for_feature_request, StateData); wait_for_feature_request(Pkt, StateData) when ?IS_STREAM_MGMT_PACKET(Pkt) -> fsm_next_state(wait_for_feature_request, dispatch_stream_mgmt(Pkt, StateData)); wait_for_feature_request(#sasl_auth{mechanism = Mech, text = ClientIn}, #state{tls_enabled = TLSEnabled, tls_required = TLSRequired} = StateData) when TLSEnabled or not TLSRequired -> case cyrsasl:server_start(StateData#state.sasl_state, Mech, ClientIn) of {ok, Props} -> (StateData#state.sockmod):reset_stream(StateData#state.socket), U = identity(Props), AuthModule = proplists:get_value(auth_module, Props, undefined), ?INFO_MSG("(~w) Accepted authentication for ~s by ~p from ~s", [StateData#state.socket, U, AuthModule, ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), ejabberd_hooks:run(c2s_auth_result, StateData#state.server, [true, U, StateData#state.server, StateData#state.ip]), send_element(StateData, #sasl_success{}), fsm_next_state(wait_for_stream, StateData#state{streamid = new_id(), authenticated = true, auth_module = AuthModule, sasl_state = undefined, user = U}); {continue, ServerOut, NewSASLState} -> send_element(StateData, #sasl_challenge{text = ServerOut}), fsm_next_state(wait_for_sasl_response, StateData#state{sasl_state = NewSASLState}); {error, Error, Username} -> ?INFO_MSG("(~w) Failed authentication for ~s@~s from ~s", [StateData#state.socket, Username, StateData#state.server, ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), ejabberd_hooks:run(c2s_auth_result, StateData#state.server, [false, Username, StateData#state.server, StateData#state.ip]), send_element(StateData, #sasl_failure{reason = Error}), fsm_next_state(wait_for_feature_request, StateData); {error, Error} -> send_element(StateData, #sasl_failure{reason = Error}), fsm_next_state(wait_for_feature_request, StateData) end; wait_for_feature_request(#starttls{}, #state{tls = true, tls_enabled = false} = StateData) -> case (StateData#state.sockmod):get_sockmod(StateData#state.socket) of gen_tcp -> TLSOpts = case ejabberd_config:get_option( {domain_certfile, StateData#state.server}, fun iolist_to_binary/1) of undefined -> StateData#state.tls_options; CertFile -> lists:keystore(certfile, 1, StateData#state.tls_options, {certfile, CertFile}) end, Socket = StateData#state.socket, BProceed = fxml:element_to_binary(xmpp:encode(#starttls_proceed{})), TLSSocket = (StateData#state.sockmod):starttls(Socket, TLSOpts, BProceed), fsm_next_state(wait_for_stream, StateData#state{socket = TLSSocket, streamid = new_id(), tls_enabled = true}); _ -> Lang = StateData#state.lang, Txt = <<"Unsupported TLS transport">>, send_element(StateData, xmpp:serr_policy_violation(Txt, Lang)), {stop, normal, StateData} end; wait_for_feature_request(#compress{} = Comp, StateData) -> Zlib = StateData#state.zlib, SockMod = (StateData#state.sockmod):get_sockmod(StateData#state.socket), if Zlib == true, (SockMod == gen_tcp) or (SockMod == fast_tls) -> process_compression_request(Comp, wait_for_feature_request, StateData); true -> send_element(StateData, #compress_failure{reason = 'setup-failed'}), fsm_next_state(wait_for_feature_request, StateData) end; wait_for_feature_request(timeout, StateData) -> {stop, normal, StateData}; wait_for_feature_request({xmlstreamend, _Name}, StateData) -> {stop, normal, StateData}; wait_for_feature_request({xmlstreamerror, _}, StateData) -> send_element(StateData, xmpp:serr_not_well_formed()), {stop, normal, StateData}; wait_for_feature_request(closed, StateData) -> {stop, normal, StateData}; wait_for_feature_request(stop, StateData) -> {stop, normal, StateData}; wait_for_feature_request(_Pkt, #state{tls_required = TLSRequired, tls_enabled = TLSEnabled} = StateData) when TLSRequired and not TLSEnabled -> Lang = StateData#state.lang, Txt = <<"Use of STARTTLS required">>, send_element(StateData, xmpp:serr_policy_violation(Txt, Lang)), {stop, normal, StateData}; wait_for_feature_request(Pkt, StateData) -> process_unauthenticated_stanza(StateData, Pkt), fsm_next_state(wait_for_feature_request, StateData). wait_for_sasl_response({xmlstreamelement, El}, StateData) -> decode_element(El, wait_for_sasl_response, StateData); wait_for_sasl_response(Pkt, StateData) when ?IS_STREAM_MGMT_PACKET(Pkt) -> fsm_next_state(wait_for_sasl_response, dispatch_stream_mgmt(Pkt, StateData)); wait_for_sasl_response(#sasl_response{text = ClientIn}, StateData) -> case cyrsasl:server_step(StateData#state.sasl_state, ClientIn) of {ok, Props} -> catch (StateData#state.sockmod):reset_stream(StateData#state.socket), U = identity(Props), AuthModule = proplists:get_value(auth_module, Props, <<>>), ?INFO_MSG("(~w) Accepted authentication for ~s by ~p from ~s", [StateData#state.socket, U, AuthModule, ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), ejabberd_hooks:run(c2s_auth_result, StateData#state.server, [true, U, StateData#state.server, StateData#state.ip]), send_element(StateData, #sasl_success{}), fsm_next_state(wait_for_stream, StateData#state{streamid = new_id(), authenticated = true, auth_module = AuthModule, sasl_state = undefined, user = U}); {ok, Props, ServerOut} -> (StateData#state.sockmod):reset_stream(StateData#state.socket), U = identity(Props), AuthModule = proplists:get_value(auth_module, Props, undefined), ?INFO_MSG("(~w) Accepted authentication for ~s by ~p from ~s", [StateData#state.socket, U, AuthModule, ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), ejabberd_hooks:run(c2s_auth_result, StateData#state.server, [true, U, StateData#state.server, StateData#state.ip]), send_element(StateData, #sasl_success{text = ServerOut}), fsm_next_state(wait_for_stream, StateData#state{streamid = new_id(), authenticated = true, auth_module = AuthModule, sasl_state = undefined, user = U}); {continue, ServerOut, NewSASLState} -> send_element(StateData, #sasl_challenge{text = ServerOut}), fsm_next_state(wait_for_sasl_response, StateData#state{sasl_state = NewSASLState}); {error, Error, Username} -> ?INFO_MSG("(~w) Failed authentication for ~s@~s from ~s", [StateData#state.socket, Username, StateData#state.server, ejabberd_config:may_hide_data(jlib:ip_to_list(StateData#state.ip))]), ejabberd_hooks:run(c2s_auth_result, StateData#state.server, [false, Username, StateData#state.server, StateData#state.ip]), send_element(StateData, #sasl_failure{reason = Error}), fsm_next_state(wait_for_feature_request, StateData); {error, Error} -> send_element(StateData, #sasl_failure{reason = Error}), fsm_next_state(wait_for_feature_request, StateData) end; wait_for_sasl_response(timeout, StateData) -> {stop, normal, StateData}; wait_for_sasl_response({xmlstreamend, _Name}, StateData) -> {stop, normal, StateData}; wait_for_sasl_response({xmlstreamerror, _}, StateData) -> send_element(StateData, xmpp:serr_not_well_formed()), {stop, normal, StateData}; wait_for_sasl_response(closed, StateData) -> {stop, normal, StateData}; wait_for_sasl_response(stop, StateData) -> {stop, normal, StateData}; wait_for_sasl_response(Pkt, StateData) -> process_unauthenticated_stanza(StateData, Pkt), fsm_next_state(wait_for_feature_request, StateData). -spec resource_conflict_action(binary(), binary(), binary()) -> {accept_resource, binary()} | closenew. resource_conflict_action(U, S, R) -> OptionRaw = case ejabberd_sm:is_existing_resource(U, S, R) of true -> ejabberd_config:get_option( {resource_conflict, S}, fun(setresource) -> setresource; (closeold) -> closeold; (closenew) -> closenew; (acceptnew) -> acceptnew end); false -> acceptnew end, Option = case OptionRaw of setresource -> setresource; closeold -> acceptnew; %% ejabberd_sm will close old session closenew -> closenew; acceptnew -> acceptnew; _ -> acceptnew %% default ejabberd behavior end, case Option of acceptnew -> {accept_resource, R}; closenew -> closenew; setresource -> Rnew = new_uniq_id(), {accept_resource, Rnew} end. -spec decode_element(xmlel(), state_name(), state()) -> fsm_transition(). decode_element(#xmlel{} = El, StateName, StateData) -> try case xmpp:decode(El, ?NS_CLIENT, [ignore_els]) of #iq{sub_els = [_], type = T} = Pkt when T == set; T == get -> NewPkt = xmpp:decode_els( Pkt, ?NS_CLIENT, fun(SubEl) when StateName == session_established -> case xmpp:get_ns(SubEl) of ?NS_PRIVACY -> true; ?NS_BLOCKING -> true; _ -> false end; (SubEl) -> xmpp:is_known_tag(SubEl) end), ?MODULE:StateName(NewPkt, StateData); Pkt -> ?MODULE:StateName(Pkt, StateData) end catch error:{xmpp_codec, Why} -> NS = xmpp:get_ns(El), fsm_next_state( StateName, case xmpp:is_stanza(El) of true -> Lang = xmpp:get_lang(El), Txt = xmpp:format_error(Why), send_error(StateData, El, xmpp:err_bad_request(Txt, Lang)); false when NS == ?NS_STREAM_MGMT_2; NS == ?NS_STREAM_MGMT_3 -> Err = #sm_failed{reason = 'bad-request', xmlns = NS}, send_element(StateData, Err), StateData; false -> StateData end) end. wait_for_bind({xmlstreamelement, El}, StateData) -> decode_element(El, wait_for_bind, StateData); wait_for_bind(#sm_resume{} = Pkt, StateData) -> case handle_resume(StateData, Pkt) of {ok, ResumedState} -> fsm_next_state(session_established, ResumedState); error -> fsm_next_state(wait_for_bind, StateData) end; wait_for_bind(Pkt, StateData) when ?IS_STREAM_MGMT_PACKET(Pkt) -> fsm_next_state(wait_for_bind, dispatch_stream_mgmt(Pkt, StateData)); wait_for_bind(#iq{type = set, sub_els = [#bind{resource = R0}]} = IQ, StateData) -> U = StateData#state.user, R = case R0 of <<>> -> new_uniq_id(); _ -> R0 end, case resource_conflict_action(U, StateData#state.server, R) of closenew -> Err = xmpp:make_error(IQ, xmpp:err_conflict()), send_element(StateData, Err), fsm_next_state(wait_for_bind, StateData); {accept_resource, R2} -> JID = jid:make(U, StateData#state.server, R2), StateData2 = StateData#state{resource = R2, jid = JID}, case open_session(StateData2) of {ok, StateData3} -> Res = xmpp:make_iq_result(IQ, #bind{jid = JID}), try send_element(StateData3, Res) catch exit:normal -> close(self()) end, fsm_next_state_pack(session_established,StateData3); {error, Error} -> Err = xmpp:make_error(IQ, Error), send_element(StateData, Err), fsm_next_state(wait_for_bind, StateData) end end; wait_for_bind(#compress{} = Comp, StateData) -> Zlib = StateData#state.zlib, SockMod = (StateData#state.sockmod):get_sockmod(StateData#state.socket), if Zlib == true, (SockMod == gen_tcp) or (SockMod == fast_tls) -> process_compression_request(Comp, wait_for_bind, StateData); true -> send_element(StateData, #compress_failure{reason = 'setup-failed'}), fsm_next_state(wait_for_bind, StateData) end; wait_for_bind(timeout, StateData) -> {stop, normal, StateData}; wait_for_bind({xmlstreamend, _Name}, StateData) -> {stop, normal, StateData}; wait_for_bind({xmlstreamerror, _}, StateData) -> send_element(StateData, xmpp:serr_not_well_formed()), {stop, normal, StateData}; wait_for_bind(closed, StateData) -> {stop, normal, StateData}; wait_for_bind(stop, StateData) -> {stop, normal, StateData}; wait_for_bind(Pkt, StateData) -> fsm_next_state( wait_for_bind, case xmpp:is_stanza(Pkt) of true -> send_error(StateData, Pkt, xmpp:err_not_acceptable()); false -> StateData end). -spec open_session(state()) -> {ok, state()} | {error, stanza_error()}. open_session(StateData) -> U = StateData#state.user, R = StateData#state.resource, JID = StateData#state.jid, Lang = StateData#state.lang, IP = StateData#state.ip, case acl:access_matches(StateData#state.access, #{usr => jid:split(JID), ip => IP}, StateData#state.server) of allow -> ?INFO_MSG("(~w) Opened session for ~s", [StateData#state.socket, jid:to_string(JID)]), change_shaper(StateData, JID), {Fs, Ts} = ejabberd_hooks:run_fold( roster_get_subscription_lists, StateData#state.server, {[], []}, [U, StateData#state.server]), LJID = jid:tolower(jid:remove_resource(JID)), Fs1 = [LJID | Fs], Ts1 = [LJID | Ts], PrivList = ejabberd_hooks:run_fold( privacy_get_user_list, StateData#state.server, #userlist{}, [U, StateData#state.server]), Conn = get_conn_type(StateData), Info = [{ip, StateData#state.ip}, {conn, Conn}, {auth_module, StateData#state.auth_module}], ejabberd_sm:open_session( StateData#state.sid, U, StateData#state.server, R, Info), UpdatedStateData = StateData#state{ conn = Conn, pres_f = ?SETS:from_list(Fs1), pres_t = ?SETS:from_list(Ts1), privacy_list = PrivList}, {ok, UpdatedStateData}; _ -> ejabberd_hooks:run(forbidden_session_hook, StateData#state.server, [JID]), ?INFO_MSG("(~w) Forbidden session for ~s", [StateData#state.socket, jid:to_string(JID)]), Txt = <<"Denied by ACL">>, {error, xmpp:err_not_allowed(Txt, Lang)} end. session_established({xmlstreamelement, El}, StateData) -> decode_element(El, session_established, StateData); session_established(Pkt, StateData) when ?IS_STREAM_MGMT_PACKET(Pkt) -> fsm_next_state(session_established, dispatch_stream_mgmt(Pkt, StateData)); session_established(#csi{type = active}, StateData) -> NewStateData = csi_flush_queue(StateData), fsm_next_state(session_established, NewStateData#state{csi_state = active}); session_established(#csi{type = inactive}, StateData) -> fsm_next_state(session_established, StateData#state{csi_state = inactive}); %% We hibernate the process to reduce memory consumption after a %% configurable activity timeout session_established(timeout, StateData) -> Options = [], proc_lib:hibernate(?GEN_FSM, enter_loop, [?MODULE, Options, session_established, StateData]), fsm_next_state(session_established, StateData); session_established({xmlstreamend, _Name}, StateData) -> {stop, normal, StateData}; session_established({xmlstreamerror, <<"XML stanza is too big">> = E}, StateData) -> send_element(StateData, xmpp:serr_policy_violation(E, StateData#state.lang)), {stop, normal, StateData}; session_established({xmlstreamerror, _}, StateData) -> send_element(StateData, xmpp:serr_not_well_formed()), {stop, normal, StateData}; session_established(closed, #state{mgmt_state = active} = StateData) -> catch (StateData#state.sockmod):close(StateData#state.socket), fsm_next_state(wait_for_resume, StateData); session_established(closed, StateData) -> {stop, normal, StateData}; session_established(stop, StateData) -> {stop, normal, StateData}; session_established(Pkt, StateData) when ?is_stanza(Pkt) -> FromJID = StateData#state.jid, case check_from(Pkt, FromJID) of 'invalid-from' -> send_element(StateData, xmpp:serr_invalid_from()), {stop, normal, StateData}; _ -> NewStateData = update_num_stanzas_in(StateData, Pkt), session_established2(Pkt, NewStateData) end; session_established(_Pkt, StateData) -> fsm_next_state(session_established, StateData). -spec session_established2(xmpp_element(), state()) -> fsm_next(). %% Process packets sent by user (coming from user on c2s XMPP connection) session_established2(Pkt, StateData) -> User = StateData#state.user, Server = StateData#state.server, FromJID = StateData#state.jid, ToJID = case xmpp:get_to(Pkt) of undefined -> jid:make(User, Server, <<"">>); J -> J end, Lang = case xmpp:get_lang(Pkt) of <<"">> -> StateData#state.lang; L -> L end, NewPkt = xmpp:set_lang(Pkt, Lang), NewState = case NewPkt of #presence{} -> Presence0 = ejabberd_hooks:run_fold( c2s_update_presence, Server, NewPkt, [User, Server]), Presence = ejabberd_hooks:run_fold( user_send_packet, Server, Presence0, [StateData, FromJID, ToJID]), case ToJID of #jid{user = User, server = Server, resource = <<"">>} -> ?DEBUG("presence_update(~p,~n\t~p,~n\t~p)", [FromJID, Presence, StateData]), presence_update(FromJID, Presence, StateData); _ -> presence_track(FromJID, ToJID, Presence, StateData) end; #iq{type = T, sub_els = [El]} when T == set; T == get -> NS = xmpp:get_ns(El), if NS == ?NS_BLOCKING; NS == ?NS_PRIVACY -> IQ = xmpp:set_from_to(Pkt, FromJID, ToJID), process_privacy_iq(IQ, StateData); NS == ?NS_SESSION -> Res = xmpp:make_iq_result(Pkt), send_stanza(StateData, Res); true -> NewPkt0 = ejabberd_hooks:run_fold( user_send_packet, Server, NewPkt, [StateData, FromJID, ToJID]), check_privacy_route(FromJID, StateData, FromJID, ToJID, NewPkt0) end; _ -> NewPkt0 = ejabberd_hooks:run_fold( user_send_packet, Server, NewPkt, [StateData, FromJID, ToJID]), check_privacy_route(FromJID, StateData, FromJID, ToJID, NewPkt0) end, ejabberd_hooks:run(c2s_loop_debug, [{xmlstreamelement, Pkt}]), fsm_next_state(session_established, NewState). wait_for_resume({xmlstreamelement, _El} = Event, StateData) -> Result = session_established(Event, StateData), fsm_next_state(wait_for_resume, element(3, Result)); wait_for_resume(timeout, StateData) -> ?DEBUG("Timed out waiting for resumption of stream for ~s", [jid:to_string(StateData#state.jid)]), {stop, normal, StateData#state{mgmt_state = timeout}}; wait_for_resume(Event, StateData) -> ?DEBUG("Ignoring event while waiting for resumption: ~p", [Event]), fsm_next_state(wait_for_resume, StateData). handle_event(_Event, StateName, StateData) -> fsm_next_state(StateName, StateData). handle_sync_event({get_presence}, _From, StateName, StateData) -> User = StateData#state.user, PresLast = StateData#state.pres_last, Show = get_showtag(PresLast), Status = get_statustag(PresLast), Resource = StateData#state.resource, Reply = {User, Resource, Show, Status}, fsm_reply(Reply, StateName, StateData); handle_sync_event({get_last_presence}, _From, StateName, StateData) -> User = StateData#state.user, Server = StateData#state.server, PresLast = StateData#state.pres_last, Resource = StateData#state.resource, Reply = {User, Server, Resource, PresLast}, fsm_reply(Reply, StateName, StateData); handle_sync_event(get_subscribed, _From, StateName, StateData) -> Subscribed = (?SETS):to_list(StateData#state.pres_f), {reply, Subscribed, StateName, StateData}; handle_sync_event({resume_session, Time}, _From, _StateName, StateData) when element(1, StateData#state.sid) == Time -> %% The old session should be closed before the new one is opened, so we do %% this here instead of leaving it to the terminate callback ejabberd_sm:close_session(StateData#state.sid, StateData#state.user, StateData#state.server, StateData#state.resource), {stop, normal, {resume, StateData}, StateData#state{mgmt_state = resumed}}; handle_sync_event({resume_session, _Time}, _From, StateName, StateData) -> {reply, {error, <<"Previous session not found">>}, StateName, StateData}; handle_sync_event(_Event, _From, StateName, StateData) -> Reply = ok, fsm_reply(Reply, StateName, StateData). code_change(_OldVsn, StateName, StateData, _Extra) -> {ok, StateName, StateData}. handle_info({send_text, Text}, StateName, StateData) -> send_text(StateData, Text), ejabberd_hooks:run(c2s_loop_debug, [Text]), fsm_next_state(StateName, StateData); handle_info(replaced, StateName, StateData) -> Lang = StateData#state.lang, Pkt = xmpp:serr_conflict(<<"Replaced by new connection">>, Lang), handle_info({kick, replaced, Pkt}, StateName, StateData); handle_info(kick, StateName, StateData) -> Lang = StateData#state.lang, Pkt = xmpp:serr_policy_violation(<<"has been kicked">>, Lang), handle_info({kick, kicked_by_admin, Pkt}, StateName, StateData); handle_info({kick, Reason, Pkt}, _StateName, StateData) -> send_element(StateData, Pkt), {stop, normal, StateData#state{authenticated = Reason}}; handle_info({route, _From, _To, {broadcast, Data}}, StateName, StateData) -> ?DEBUG("broadcast~n~p~n", [Data]), case Data of {item, IJID, ISubscription} -> fsm_next_state(StateName, roster_change(IJID, ISubscription, StateData)); {exit, Reason} -> Lang = StateData#state.lang, send_element(StateData, xmpp:serr_conflict(Reason, Lang)), {stop, normal, StateData}; {privacy_list, PrivList, PrivListName} -> case ejabberd_hooks:run_fold(privacy_updated_list, StateData#state.server, false, [StateData#state.privacy_list, PrivList]) of false -> fsm_next_state(StateName, StateData); NewPL -> PrivPushIQ = #iq{type = set, from = jid:remove_resource(StateData#state.jid), to = StateData#state.jid, id = <<"push", (randoms:get_string())/binary>>, sub_els = [#privacy_query{ lists = [#privacy_list{ name = PrivListName}]}]}, NewState = send_stanza(StateData, PrivPushIQ), fsm_next_state(StateName, NewState#state{privacy_list = NewPL}) end; {blocking, What} -> NewState = route_blocking(What, StateData), fsm_next_state(StateName, NewState); _ -> fsm_next_state(StateName, StateData) end; %% Process Packets that are to be send to the user handle_info({route, From, To, Packet}, StateName, StateData) when ?is_stanza(Packet) -> {Pass, NewState} = case Packet of #presence{type = T} -> State = ejabberd_hooks:run_fold(c2s_presence_in, StateData#state.server, StateData, [{From, To, Packet}]), case T of probe -> LFrom = jid:tolower(From), LBFrom = jid:remove_resource(LFrom), NewStateData = case (?SETS):is_element(LFrom, State#state.pres_a) orelse (?SETS):is_element(LBFrom, State#state.pres_a) of true -> State; false -> case (?SETS):is_element(LFrom, State#state.pres_f) of true -> A = (?SETS):add_element(LFrom, State#state.pres_a), State#state{pres_a = A}; false -> case (?SETS):is_element(LBFrom, State#state.pres_f) of true -> A = (?SETS):add_element(LBFrom, State#state.pres_a), State#state{pres_a = A}; false -> State end end end, process_presence_probe(From, To, NewStateData), {false, NewStateData}; error -> NewA = ?SETS:del_element(jid:tolower(From), State#state.pres_a), {true, State#state{pres_a = NewA}}; subscribe -> SRes = is_privacy_allow(State, From, To, Packet, in), {SRes, State}; subscribed -> SRes = is_privacy_allow(State, From, To, Packet, in), {SRes, State}; unsubscribe -> SRes = is_privacy_allow(State, From, To, Packet, in), {SRes, State}; unsubscribed -> SRes = is_privacy_allow(State, From, To, Packet, in), {SRes, State}; _ -> case privacy_check_packet(State, From, To, Packet, in) of allow -> LFrom = jid:tolower(From), LBFrom = jid:remove_resource(LFrom), case (?SETS):is_element(LFrom, State#state.pres_a) orelse (?SETS):is_element(LBFrom, State#state.pres_a) of true -> {true, State}; false -> case (?SETS):is_element(LFrom, State#state.pres_f) of true -> A = (?SETS):add_element(LFrom, State#state.pres_a), {true, State#state{pres_a = A}}; false -> case (?SETS):is_element(LBFrom, State#state.pres_f) of true -> A = (?SETS):add_element( LBFrom, State#state.pres_a), {true, State#state{pres_a = A}}; false -> {true, State} end end end; deny -> {false, State} end end; #iq{type = T} -> case xmpp:has_subtag(Packet, #last{}) of true when T == get; T == set -> LFrom = jid:tolower(From), LBFrom = jid:remove_resource(LFrom), HasFromSub = ((?SETS):is_element(LFrom, StateData#state.pres_f) orelse (?SETS):is_element(LBFrom, StateData#state.pres_f)) andalso is_privacy_allow(StateData, To, From, #presence{}, out), case HasFromSub of true -> case privacy_check_packet( StateData, From, To, Packet, in) of allow -> {true, StateData}; deny -> ejabberd_router:route_error( To, From, Packet, xmpp:err_service_unavailable()), {false, StateData} end; _ -> ejabberd_router:route_error( To, From, Packet, xmpp:err_forbidden()), {false, StateData} end; _ -> case privacy_check_packet(StateData, From, To, Packet, in) of allow -> {true, StateData}; deny -> ejabberd_router:route_error( To, From, Packet, xmpp:err_service_unavailable()), {false, StateData} end end; #message{type = T} -> case privacy_check_packet(StateData, From, To, Packet, in) of allow -> {true, StateData}; deny -> case T of groupchat -> ok; headline -> ok; _ -> case xmpp:has_subtag(Packet, #muc_user{}) of true -> ok; false -> ejabberd_router:route_error( To, From, Packet, xmpp:err_service_unavailable()) end end, {false, StateData} end end, if Pass -> FixedPacket0 = xmpp:set_from_to(Packet, From, To), FixedPacket = ejabberd_hooks:run_fold( user_receive_packet, NewState#state.server, FixedPacket0, [NewState, NewState#state.jid, From, To]), SentStateData = send_packet(NewState, FixedPacket), ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), fsm_next_state(StateName, SentStateData); true -> ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), fsm_next_state(StateName, NewState) end; handle_info({'DOWN', Monitor, _Type, _Object, _Info}, _StateName, StateData) when Monitor == StateData#state.socket_monitor -> if StateData#state.mgmt_state == active; StateData#state.mgmt_state == pending -> fsm_next_state(wait_for_resume, StateData); true -> {stop, normal, StateData} end; handle_info(system_shutdown, StateName, StateData) -> case StateName of wait_for_stream -> send_header(StateData, ?MYNAME, {1,0}, <<"en">>), send_element(StateData, xmpp:serr_system_shutdown()), ok; _ -> send_element(StateData, xmpp:serr_system_shutdown()), ok end, {stop, normal, StateData}; handle_info({route_xmlstreamelement, El}, _StateName, StateData) -> {next_state, NStateName, NStateData, _Timeout} = session_established({xmlstreamelement, El}, StateData), fsm_next_state(NStateName, NStateData); handle_info({force_update_presence, LUser, LServer}, StateName, #state{jid = #jid{luser = LUser, lserver = LServer}} = StateData) -> NewStateData = case StateData#state.pres_last of #presence{} -> Presence = ejabberd_hooks:run_fold(c2s_update_presence, LServer, StateData#state.pres_last, [LUser, LServer]), StateData2 = StateData#state{pres_last = Presence}, presence_update(StateData2#state.jid, Presence, StateData2), StateData2; undefined -> StateData end, fsm_next_state(StateName, NewStateData); handle_info({send_filtered, Feature, From, To, Packet}, StateName, StateData) -> Drop = ejabberd_hooks:run_fold(c2s_filter_packet, StateData#state.server, true, [StateData#state.server, StateData, Feature, To, Packet]), NewStateData = if Drop -> ?DEBUG("Dropping packet from ~p to ~p", [jid:to_string(From), jid:to_string(To)]), StateData; true -> FinalPacket = xmpp:set_from_to(Packet, From, To), case StateData#state.jid of To -> case privacy_check_packet(StateData, From, To, FinalPacket, in) of deny -> StateData; allow -> send_stanza(StateData, FinalPacket) end; _ -> ejabberd_router:route(From, To, FinalPacket), StateData end end, fsm_next_state(StateName, NewStateData); handle_info({broadcast, Type, From, Packet}, StateName, StateData) -> Recipients = ejabberd_hooks:run_fold( c2s_broadcast_recipients, StateData#state.server, [], [StateData#state.server, StateData, Type, From, Packet]), lists:foreach( fun(USR) -> ejabberd_router:route( From, jid:make(USR), Packet) end, lists:usort(Recipients)), fsm_next_state(StateName, StateData); handle_info({set_csi_state, CsiState}, StateName, StateData) -> fsm_next_state(StateName, StateData#state{csi_state = CsiState}); handle_info({set_resume_timeout, Timeout}, StateName, StateData) -> fsm_next_state(StateName, StateData#state{mgmt_timeout = Timeout}); handle_info(dont_ask_offline, StateName, StateData) -> fsm_next_state(StateName, StateData#state{ask_offline = false}); handle_info(close, StateName, StateData) -> ?DEBUG("Timeout waiting for stream management acknowledgement of ~s", [jid:to_string(StateData#state.jid)]), close(self()), fsm_next_state(StateName, StateData#state{mgmt_ack_timer = undefined}); handle_info({_Ref, {resume, OldStateData}}, StateName, StateData) -> %% This happens if the resume_session/1 request timed out; the new session %% now receives the late response. ?DEBUG("Received old session state for ~s after failed resumption", [jid:to_string(OldStateData#state.jid)]), handle_unacked_stanzas(OldStateData#state{mgmt_resend = false}), fsm_next_state(StateName, StateData); handle_info(Info, StateName, StateData) -> ?ERROR_MSG("Unexpected info: ~p", [Info]), fsm_next_state(StateName, StateData). -spec print_state(state()) -> state(). print_state(State = #state{pres_t = T, pres_f = F, pres_a = A}) -> State#state{pres_t = {pres_t, (?SETS):size(T)}, pres_f = {pres_f, (?SETS):size(F)}, pres_a = {pres_a, (?SETS):size(A)}}. terminate(_Reason, StateName, StateData) -> case StateData#state.mgmt_state of resumed -> ?INFO_MSG("Closing former stream of resumed session for ~s", [jid:to_string(StateData#state.jid)]); _ -> if StateName == session_established; StateName == wait_for_resume -> case StateData#state.authenticated of replaced -> ?INFO_MSG("(~w) Replaced session for ~s", [StateData#state.socket, jid:to_string(StateData#state.jid)]), From = StateData#state.jid, Lang = StateData#state.lang, Status = <<"Replaced by new connection">>, Packet = #presence{ type = unavailable, status = xmpp:mk_text(Status, Lang)}, ejabberd_sm:close_session_unset_presence(StateData#state.sid, StateData#state.user, StateData#state.server, StateData#state.resource, Status), presence_broadcast(StateData, From, StateData#state.pres_a, Packet); _ -> ?INFO_MSG("(~w) Close session for ~s", [StateData#state.socket, jid:to_string(StateData#state.jid)]), EmptySet = (?SETS):new(), case StateData of #state{pres_last = undefined, pres_a = EmptySet} -> ejabberd_sm:close_session(StateData#state.sid, StateData#state.user, StateData#state.server, StateData#state.resource); _ -> From = StateData#state.jid, Packet = #presence{type = unavailable}, ejabberd_sm:close_session_unset_presence(StateData#state.sid, StateData#state.user, StateData#state.server, StateData#state.resource, <<"">>), presence_broadcast(StateData, From, StateData#state.pres_a, Packet) end, case StateData#state.mgmt_state of timeout -> Info = [{num_stanzas_in, StateData#state.mgmt_stanzas_in}], ejabberd_sm:set_offline_info(StateData#state.sid, StateData#state.user, StateData#state.server, StateData#state.resource, Info); _ -> ok end end, handle_unacked_stanzas(StateData), bounce_messages(); true -> ok end end, catch send_trailer(StateData), (StateData#state.sockmod):close(StateData#state.socket), ok. %%%---------------------------------------------------------------------- %%% Internal functions %%%---------------------------------------------------------------------- -spec change_shaper(state(), jid()) -> ok. change_shaper(StateData, JID) -> Shaper = acl:access_matches(StateData#state.shaper, #{usr => jid:split(JID), ip => StateData#state.ip}, StateData#state.server), (StateData#state.sockmod):change_shaper(StateData#state.socket, Shaper). -spec send_text(state(), iodata()) -> ok | {error, any()}. send_text(StateData, Text) when StateData#state.mgmt_state == pending -> ?DEBUG("Cannot send text while waiting for resumption: ~p", [Text]); send_text(StateData, Text) when StateData#state.xml_socket -> ?DEBUG("Send Text on stream = ~p", [Text]), (StateData#state.sockmod):send_xml(StateData#state.socket, {xmlstreamraw, Text}); send_text(StateData, Text) when StateData#state.mgmt_state == active -> ?DEBUG("Send XML on stream = ~p", [Text]), case catch (StateData#state.sockmod):send(StateData#state.socket, Text) of {'EXIT', _} -> (StateData#state.sockmod):close(StateData#state.socket), {error, closed}; _ -> ok end; send_text(StateData, Text) -> ?DEBUG("Send XML on stream = ~p", [Text]), (StateData#state.sockmod):send(StateData#state.socket, Text). -spec send_element(state(), xmlel() | xmpp_element()) -> ok | {error, any()}. send_element(StateData, El) when StateData#state.mgmt_state == pending -> ?DEBUG("Cannot send element while waiting for resumption: ~p", [El]); send_element(StateData, #xmlel{} = El) when StateData#state.xml_socket -> ?DEBUG("Send XML on stream = ~p", [fxml:element_to_binary(El)]), (StateData#state.sockmod):send_xml(StateData#state.socket, {xmlstreamelement, El}); send_element(StateData, #xmlel{} = El) -> send_text(StateData, fxml:element_to_binary(El)); send_element(StateData, Pkt) -> send_element(StateData, xmpp:encode(Pkt, ?NS_CLIENT)). -spec send_error(state(), xmlel() | stanza(), stanza_error()) -> state(). send_error(StateData, Stanza, Error) -> Type = xmpp:get_type(Stanza), if Type == error; Type == result; Type == <<"error">>; Type == <<"result">> -> StateData; true -> send_stanza(StateData, xmpp:make_error(Stanza, Error)) end. -spec send_stanza(state(), xmpp_element()) -> state(). send_stanza(StateData, Stanza) when StateData#state.csi_state == inactive -> csi_filter_stanza(StateData, Stanza); send_stanza(StateData, Stanza) when StateData#state.mgmt_state == pending -> mgmt_queue_add(StateData, Stanza); send_stanza(StateData, Stanza) when StateData#state.mgmt_state == active -> NewStateData = mgmt_queue_add(StateData, Stanza), mgmt_send_stanza(NewStateData, Stanza); send_stanza(StateData, Stanza) -> send_element(StateData, Stanza), StateData. -spec send_packet(state(), xmpp_element()) -> state(). send_packet(StateData, Packet) -> case xmpp:is_stanza(Packet) of true -> send_stanza(StateData, Packet); false -> send_element(StateData, Packet), StateData end. -spec send_header(state(), binary(), binary(), binary()) -> ok | {error, any()}. send_header(StateData, Server, Version, Lang) -> Header = #xmlel{name = Name, attrs = Attrs} = xmpp:encode(#stream_start{version = Version, lang = Lang, xmlns = ?NS_CLIENT, stream_xmlns = ?NS_STREAM, id = StateData#state.streamid, from = jid:make(Server)}), if StateData#state.xml_socket -> (StateData#state.sockmod):send_xml(StateData#state.socket, {xmlstreamstart, Name, Attrs}); true -> send_text(StateData, fxml:element_to_header(Header)) end. -spec send_trailer(state()) -> ok | {error, any()}. send_trailer(StateData) when StateData#state.mgmt_state == pending -> ?DEBUG("Cannot send stream trailer while waiting for resumption", []); send_trailer(StateData) when StateData#state.xml_socket -> (StateData#state.sockmod):send_xml(StateData#state.socket, {xmlstreamend, <<"stream:stream">>}); send_trailer(StateData) -> send_text(StateData, ?STREAM_TRAILER). -spec new_id() -> binary(). new_id() -> randoms:get_string(). -spec new_uniq_id() -> binary(). new_uniq_id() -> iolist_to_binary([randoms:get_string(), integer_to_binary(p1_time_compat:unique_integer([positive]))]). -spec get_conn_type(state()) -> c2s | c2s_tls | c2s_compressed | websocket | c2s_compressed_tls | http_bind. get_conn_type(StateData) -> case (StateData#state.sockmod):get_transport(StateData#state.socket) of tcp -> c2s; tls -> c2s_tls; tcp_zlib -> c2s_compressed; tls_zlib -> c2s_compressed_tls; http_bind -> http_bind; websocket -> websocket end. -spec process_presence_probe(jid(), jid(), state()) -> ok. process_presence_probe(From, To, StateData) -> LFrom = jid:tolower(From), LBFrom = setelement(3, LFrom, <<"">>), case StateData#state.pres_last of undefined -> ok; _ -> Cond = ((?SETS):is_element(LFrom, StateData#state.pres_f) orelse ((LFrom /= LBFrom) andalso (?SETS):is_element(LBFrom, StateData#state.pres_f))), if Cond -> %% To is the one sending the presence (the probe target) Packet = xmpp_util:add_delay_info( StateData#state.pres_last, To, StateData#state.pres_timestamp), case privacy_check_packet(StateData, To, From, Packet, out) of deny -> ok; allow -> Pid=element(2, StateData#state.sid), ejabberd_hooks:run(presence_probe_hook, StateData#state.server, [From, To, Pid]), %% Don't route a presence probe to oneself case From == To of false -> ejabberd_router:route(To, From, Packet); true -> ok end end; true -> ok end end. %% User updates his presence (non-directed presence packet) -spec presence_update(jid(), presence(), state()) -> state(). presence_update(From, Packet, StateData) -> #presence{type = Type} = Packet, case Type of unavailable -> Status = xmpp:get_text(Packet#presence.status), Info = [{ip, StateData#state.ip}, {conn, StateData#state.conn}, {auth_module, StateData#state.auth_module}], ejabberd_sm:unset_presence(StateData#state.sid, StateData#state.user, StateData#state.server, StateData#state.resource, Status, Info), presence_broadcast(StateData, From, StateData#state.pres_a, Packet), StateData#state{pres_last = undefined, pres_timestamp = undefined, pres_a = (?SETS):new()}; error -> StateData; probe -> StateData; subscribe -> StateData; subscribed -> StateData; unsubscribe -> StateData; unsubscribed -> StateData; _ -> OldPriority = case StateData#state.pres_last of undefined -> 0; OldPresence -> get_priority_from_presence(OldPresence) end, NewPriority = get_priority_from_presence(Packet), update_priority(NewPriority, Packet, StateData), FromUnavail = (StateData#state.pres_last == undefined), ?DEBUG("from unavail = ~p~n", [FromUnavail]), NewStateData = StateData#state{pres_last = Packet, pres_timestamp = p1_time_compat:timestamp()}, NewState = if FromUnavail -> ejabberd_hooks:run(user_available_hook, NewStateData#state.server, [NewStateData#state.jid]), ResentStateData = if NewPriority >= 0 -> resend_offline_messages(NewStateData), resend_subscription_requests(NewStateData); true -> NewStateData end, presence_broadcast_first(From, ResentStateData, Packet); true -> presence_broadcast_to_trusted(NewStateData, From, NewStateData#state.pres_f, NewStateData#state.pres_a, Packet), if OldPriority < 0, NewPriority >= 0 -> resend_offline_messages(NewStateData); true -> ok end, NewStateData end, NewState end. %% User sends a directed presence packet -spec presence_track(jid(), jid(), presence(), state()) -> state(). presence_track(From, To, Packet, StateData) -> #presence{type = Type} = Packet, LTo = jid:tolower(To), User = StateData#state.user, Server = StateData#state.server, Lang = StateData#state.lang, case privacy_check_packet(StateData, From, To, Packet, out) of deny -> ErrText = <<"Your active privacy list has denied " "the routing of this stanza.">>, Err = xmpp:err_not_acceptable(ErrText, Lang), send_error(StateData, xmpp:set_from_to(Packet, From, To), Err); allow when Type == subscribe; Type == subscribed; Type == unsubscribe; Type == unsubscribed -> Access = gen_mod:get_module_opt(Server, mod_roster, access, fun(A) when is_atom(A) -> A end, all), MyBareJID = jid:make(User, Server, <<"">>), case acl:match_rule(Server, Access, MyBareJID) of deny -> ErrText = <<"Denied by ACL">>, Err = xmpp:err_forbidden(ErrText, Lang), send_error(StateData, xmpp:set_from_to(Packet, From, To), Err); allow -> ejabberd_hooks:run(roster_out_subscription, Server, [User, Server, To, Type]), ejabberd_router:route(jid:remove_resource(From), To, Packet), StateData end; allow when Type == error; Type == probe -> ejabberd_router:route(From, To, Packet), StateData; allow -> ejabberd_router:route(From, To, Packet), A = case Type of available -> ?SETS:add_element(LTo, StateData#state.pres_a); unavailable -> ?SETS:del_element(LTo, StateData#state.pres_a) end, StateData#state{pres_a = A} end. -spec check_privacy_route(jid(), state(), jid(), jid(), stanza()) -> state(). check_privacy_route(From, StateData, FromRoute, To, Packet) -> case privacy_check_packet(StateData, From, To, Packet, out) of deny -> Lang = StateData#state.lang, ErrText = <<"Your active privacy list has denied " "the routing of this stanza.">>, Err = xmpp:err_not_acceptable(ErrText, Lang), send_error(StateData, xmpp:set_from_to(Packet, From, To), Err); allow -> ejabberd_router:route(FromRoute, To, Packet), StateData end. %% Check if privacy rules allow this delivery -spec privacy_check_packet(state(), jid(), jid(), stanza(), in | out) -> allow | deny. privacy_check_packet(StateData, From, To, Packet, Dir) -> ejabberd_hooks:run_fold(privacy_check_packet, StateData#state.server, allow, [StateData#state.user, StateData#state.server, StateData#state.privacy_list, {From, To, Packet}, Dir]). -spec is_privacy_allow(state(), jid(), jid(), stanza(), in | out) -> boolean(). is_privacy_allow(StateData, From, To, Packet, Dir) -> allow == privacy_check_packet(StateData, From, To, Packet, Dir). %% Send presence when disconnecting -spec presence_broadcast(state(), jid(), ?SETS:set(), presence()) -> ok. presence_broadcast(StateData, From, JIDSet, Packet) -> JIDs = ?SETS:to_list(JIDSet), JIDs2 = format_and_check_privacy(From, StateData, Packet, JIDs, out), Server = StateData#state.server, send_multiple(From, Server, JIDs2, Packet). -spec presence_broadcast_to_trusted( state(), jid(), ?SETS:set(), ?SETS:set(), presence()) -> ok. %% Send presence when updating presence presence_broadcast_to_trusted(StateData, From, Trusted, JIDSet, Packet) -> JIDs = ?SETS:to_list(?SETS:intersection(Trusted, JIDSet)), JIDs2 = format_and_check_privacy(From, StateData, Packet, JIDs, out), Server = StateData#state.server, send_multiple(From, Server, JIDs2, Packet). %% Send presence when connecting -spec presence_broadcast_first(jid(), state(), presence()) -> state(). presence_broadcast_first(From, StateData, Packet) -> JIDsProbe = ?SETS:fold( fun(JID, L) -> [JID | L] end, [], StateData#state.pres_t), PacketProbe = #presence{type = probe}, JIDs2Probe = format_and_check_privacy(From, StateData, PacketProbe, JIDsProbe, out), Server = StateData#state.server, send_multiple(From, Server, JIDs2Probe, PacketProbe), {As, JIDs} = ?SETS:fold( fun(JID, {A, JID_list}) -> {?SETS:add_element(JID, A), JID_list++[JID]} end, {StateData#state.pres_a, []}, StateData#state.pres_f), JIDs2 = format_and_check_privacy(From, StateData, Packet, JIDs, out), send_multiple(From, Server, JIDs2, Packet), StateData#state{pres_a = As}. -spec format_and_check_privacy( jid(), state(), stanza(), [ljid()], in | out) -> [jid()]. format_and_check_privacy(From, StateData, Packet, JIDs, Dir) -> FJIDs = [jid:make(JID) || JID <- JIDs], lists:filter( fun(FJID) -> case ejabberd_hooks:run_fold( privacy_check_packet, StateData#state.server, allow, [StateData#state.user, StateData#state.server, StateData#state.privacy_list, {From, FJID, Packet}, Dir]) of deny -> false; allow -> true end end, FJIDs). -spec send_multiple(jid(), binary(), [jid()], stanza()) -> ok. send_multiple(From, Server, JIDs, Packet) -> ejabberd_router_multicast:route_multicast(From, Server, JIDs, Packet). -spec roster_change(jid(), both | from | none | remove | to, state()) -> state(). roster_change(IJID, ISubscription, StateData) -> LIJID = jid:tolower(IJID), IsFrom = (ISubscription == both) or (ISubscription == from), IsTo = (ISubscription == both) or (ISubscription == to), OldIsFrom = (?SETS):is_element(LIJID, StateData#state.pres_f), FSet = if IsFrom -> (?SETS):add_element(LIJID, StateData#state.pres_f); true -> ?SETS:del_element(LIJID, StateData#state.pres_f) end, TSet = if IsTo -> (?SETS):add_element(LIJID, StateData#state.pres_t); true -> ?SETS:del_element(LIJID, StateData#state.pres_t) end, case StateData#state.pres_last of undefined -> StateData#state{pres_f = FSet, pres_t = TSet}; P -> ?DEBUG("roster changed for ~p~n", [StateData#state.user]), From = StateData#state.jid, To = jid:make(IJID), Cond1 = IsFrom andalso not OldIsFrom, Cond2 = not IsFrom andalso OldIsFrom andalso ((?SETS):is_element(LIJID, StateData#state.pres_a)), if Cond1 -> ?DEBUG("C1: ~p~n", [LIJID]), case privacy_check_packet(StateData, From, To, P, out) of deny -> ok; allow -> ejabberd_router:route(From, To, P) end, A = (?SETS):add_element(LIJID, StateData#state.pres_a), StateData#state{pres_a = A, pres_f = FSet, pres_t = TSet}; Cond2 -> ?DEBUG("C2: ~p~n", [LIJID]), PU = #presence{type = unavailable}, case privacy_check_packet(StateData, From, To, PU, out) of deny -> ok; allow -> ejabberd_router:route(From, To, PU) end, A = ?SETS:del_element(LIJID, StateData#state.pres_a), StateData#state{pres_a = A, pres_f = FSet, pres_t = TSet}; true -> StateData#state{pres_f = FSet, pres_t = TSet} end end. -spec update_priority(integer(), presence(), state()) -> ok. update_priority(Priority, Packet, StateData) -> Info = [{ip, StateData#state.ip}, {conn, StateData#state.conn}, {auth_module, StateData#state.auth_module}], ejabberd_sm:set_presence(StateData#state.sid, StateData#state.user, StateData#state.server, StateData#state.resource, Priority, Packet, Info). -spec get_priority_from_presence(presence()) -> integer(). get_priority_from_presence(#presence{priority = Prio}) -> case Prio of undefined -> 0; _ -> Prio end. -spec process_privacy_iq(iq(), state()) -> state(). process_privacy_iq(#iq{from = From, to = To, type = Type, lang = Lang} = IQ, StateData) -> Txt = <<"No module is handling this query">>, {Res, NewStateData} = case Type of get -> R = ejabberd_hooks:run_fold( privacy_iq_get, StateData#state.server, {error, xmpp:err_feature_not_implemented(Txt, Lang)}, [IQ, StateData#state.privacy_list]), {R, StateData}; set -> case ejabberd_hooks:run_fold( privacy_iq_set, StateData#state.server, {error, xmpp:err_feature_not_implemented(Txt, Lang)}, [IQ, StateData#state.privacy_list]) of {result, R, NewPrivList} -> {{result, R}, StateData#state{privacy_list = NewPrivList}}; R -> {R, StateData} end end, IQRes = case Res of {result, Result} -> xmpp:make_iq_result(IQ, Result); {error, Error} -> xmpp:make_error(IQ, Error) end, ejabberd_router:route(To, From, IQRes), NewStateData. -spec resend_offline_messages(state()) -> ok. resend_offline_messages(#state{ask_offline = true} = StateData) -> case ejabberd_hooks:run_fold(resend_offline_messages_hook, StateData#state.server, [], [StateData#state.user, StateData#state.server]) of Rs -> %%when is_list(Rs) -> lists:foreach(fun ({route, From, To, Packet}) -> Pass = case privacy_check_packet(StateData, From, To, Packet, in) of allow -> true; deny -> false end, if Pass -> ejabberd_router:route(From, To, Packet); true -> ok end end, Rs) end; resend_offline_messages(_StateData) -> ok. -spec resend_subscription_requests(state()) -> state(). resend_subscription_requests(#state{user = User, server = Server} = StateData) -> PendingSubscriptions = ejabberd_hooks:run_fold(resend_subscription_requests_hook, Server, [], [User, Server]), lists:foldl(fun (XMLPacket, AccStateData) -> send_packet(AccStateData, XMLPacket) end, StateData, PendingSubscriptions). -spec get_showtag(undefined | presence()) -> binary(). get_showtag(undefined) -> <<"unavailable">>; get_showtag(#presence{show = undefined}) -> <<"available">>; get_showtag(#presence{show = Show}) -> atom_to_binary(Show, utf8). -spec get_statustag(undefined | presence()) -> binary(). get_statustag(#presence{status = Status}) -> xmpp:get_text(Status); get_statustag(undefined) -> <<"">>. -spec process_unauthenticated_stanza(state(), iq()) -> ok | {error, any()}. process_unauthenticated_stanza(StateData, #iq{type = T, lang = L} = IQ) when T == set; T == get -> Lang = if L == undefined; L == <<"">> -> StateData#state.lang; true -> L end, NewIQ = IQ#iq{lang = Lang}, Res = ejabberd_hooks:run_fold(c2s_unauthenticated_iq, StateData#state.server, empty, [StateData#state.server, NewIQ, StateData#state.ip]), case Res of empty -> Txt = <<"Authentication required">>, Err0 = xmpp:make_error(IQ, xmpp:err_service_unavailable(Txt, Lang)), Err1 = Err0#iq{from = jid:make(<<>>, StateData#state.server, <<>>), to = undefined}, send_element(StateData, Err1); _ -> send_element(StateData, Res) end; process_unauthenticated_stanza(_StateData, _) -> %% Drop any stanza, which isn't IQ stanza ok. -spec peerip(ejabberd_socket:sockmod(), ejabberd_socket:socket()) -> {inet:ip_address(), non_neg_integer()} | undefined. peerip(SockMod, Socket) -> IP = case SockMod of gen_tcp -> inet:peername(Socket); _ -> SockMod:peername(Socket) end, case IP of {ok, IPOK} -> IPOK; _ -> undefined end. %% fsm_next_state_pack: Pack the StateData structure to improve %% sharing. -spec fsm_next_state_pack(state_name(), state()) -> fsm_transition(). fsm_next_state_pack(StateName, StateData) -> fsm_next_state_gc(StateName, pack(StateData)). -spec fsm_next_state_gc(state_name(), state()) -> fsm_transition(). %% fsm_next_state_gc: Garbage collect the process heap to make use of %% the newly packed StateData structure. fsm_next_state_gc(StateName, PackedStateData) -> erlang:garbage_collect(), fsm_next_state(StateName, PackedStateData). %% fsm_next_state: Generate the next_state FSM tuple with different %% timeout, depending on the future state -spec fsm_next_state(state_name(), state()) -> fsm_transition(). fsm_next_state(session_established, #state{mgmt_max_queue = exceeded} = StateData) -> ?WARNING_MSG("ACK queue too long, terminating session for ~s", [jid:to_string(StateData#state.jid)]), Err = xmpp:serr_policy_violation(<<"Too many unacked stanzas">>, StateData#state.lang), send_element(StateData, Err), {stop, normal, StateData#state{mgmt_resend = false}}; fsm_next_state(session_established, #state{mgmt_state = pending} = StateData) -> fsm_next_state(wait_for_resume, StateData); fsm_next_state(session_established, StateData) -> {next_state, session_established, StateData, ?C2S_HIBERNATE_TIMEOUT}; fsm_next_state(wait_for_resume, #state{mgmt_timeout = 0} = StateData) -> {stop, normal, StateData}; fsm_next_state(wait_for_resume, #state{mgmt_pending_since = undefined, sid = SID, jid = JID, ip = IP, conn = Conn, auth_module = AuthModule, server = Host} = StateData) -> case StateData of #state{mgmt_ack_timer = undefined} -> ok; #state{mgmt_ack_timer = Timer} -> erlang:cancel_timer(Timer) end, ?INFO_MSG("Waiting for resumption of stream for ~s", [jid:to_string(JID)]), Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthModule}], NewStateData = ejabberd_hooks:run_fold(c2s_session_pending, Host, StateData, [SID, JID, Info]), {next_state, wait_for_resume, NewStateData#state{mgmt_state = pending, mgmt_pending_since = os:timestamp()}, NewStateData#state.mgmt_timeout}; fsm_next_state(wait_for_resume, StateData) -> Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since), Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1), {next_state, wait_for_resume, StateData, Timeout}; fsm_next_state(StateName, StateData) -> {next_state, StateName, StateData, ?C2S_OPEN_TIMEOUT}. %% fsm_reply: Generate the reply FSM tuple with different timeout, %% depending on the future state -spec fsm_reply(_, state_name(), state()) -> fsm_reply(). fsm_reply(Reply, session_established, StateData) -> {reply, Reply, session_established, StateData, ?C2S_HIBERNATE_TIMEOUT}; fsm_reply(Reply, wait_for_resume, StateData) -> Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since), Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1), {reply, Reply, wait_for_resume, StateData, Timeout}; fsm_reply(Reply, StateName, StateData) -> {reply, Reply, StateName, StateData, ?C2S_OPEN_TIMEOUT}. %% Used by c2s blacklist plugins -spec is_ip_blacklisted(undefined | {inet:ip_address(), non_neg_integer()}, binary()) -> false | {true, binary(), binary()}. is_ip_blacklisted(undefined, _Lang) -> false; is_ip_blacklisted({IP, _Port}, Lang) -> ejabberd_hooks:run_fold(check_bl_c2s, false, [IP, Lang]). %% Check from attributes %% returns invalid-from|NewElement -spec check_from(stanza(), jid()) -> 'invalid-from' | stanza(). check_from(Pkt, FromJID) -> JID = xmpp:get_from(Pkt), case JID of undefined -> Pkt; #jid{} -> if (JID#jid.luser == FromJID#jid.luser) and (JID#jid.lserver == FromJID#jid.lserver) and (JID#jid.lresource == FromJID#jid.lresource) -> Pkt; (JID#jid.luser == FromJID#jid.luser) and (JID#jid.lserver == FromJID#jid.lserver) and (JID#jid.lresource == <<"">>) -> Pkt; true -> 'invalid-from' end end. fsm_limit_opts(Opts) -> case lists:keysearch(max_fsm_queue, 1, Opts) of {value, {_, N}} when is_integer(N) -> [{max_queue, N}]; _ -> case ejabberd_config:get_option( max_fsm_queue, fun(I) when is_integer(I), I > 0 -> I end) of undefined -> []; N -> [{max_queue, N}] end end. -spec bounce_messages() -> ok. bounce_messages() -> receive {route, From, To, El} -> ejabberd_router:route(From, To, El), bounce_messages() after 0 -> ok end. -spec process_compression_request(compress(), state_name(), state()) -> fsm_next(). process_compression_request(#compress{methods = []}, StateName, StateData) -> send_element(StateData, #compress_failure{reason = 'setup-failed'}), fsm_next_state(StateName, StateData); process_compression_request(#compress{methods = Ms}, StateName, StateData) -> case lists:member(<<"zlib">>, Ms) of true -> Socket = StateData#state.socket, BCompressed = fxml:element_to_binary(xmpp:encode(#compressed{})), ZlibSocket = (StateData#state.sockmod):compress(Socket, BCompressed), fsm_next_state(wait_for_stream, StateData#state{socket = ZlibSocket, streamid = new_id()}); false -> send_element(StateData, #compress_failure{reason = 'unsupported-method'}), fsm_next_state(StateName, StateData) end. %%%---------------------------------------------------------------------- %%% XEP-0191 %%%---------------------------------------------------------------------- -spec route_blocking( {block, [jid()]} | {unblock, [jid()]} | unblock_all, state()) -> state(). route_blocking(What, StateData) -> SubEl = case What of {block, JIDs} -> #block{items = JIDs}; {unblock, JIDs} -> #unblock{items = JIDs}; unblock_all -> #unblock{} end, PrivPushIQ = #iq{type = set, id = <<"push">>, sub_els = [SubEl], from = jid:remove_resource(StateData#state.jid), to = StateData#state.jid}, %% No need to replace active privacy list here, %% blocking pushes are always accompanied by %% Privacy List pushes send_stanza(StateData, PrivPushIQ). %%%---------------------------------------------------------------------- %%% XEP-0198 %%%---------------------------------------------------------------------- -spec stream_mgmt_enabled(state()) -> boolean(). stream_mgmt_enabled(#state{mgmt_state = disabled}) -> false; stream_mgmt_enabled(_StateData) -> true. -spec dispatch_stream_mgmt(xmpp_element(), state()) -> state(). dispatch_stream_mgmt(El, #state{mgmt_state = MgmtState} = StateData) when MgmtState == active; MgmtState == pending -> perform_stream_mgmt(El, StateData); dispatch_stream_mgmt(El, StateData) -> negotiate_stream_mgmt(El, StateData). -spec negotiate_stream_mgmt(xmpp_element(), state()) -> state(). negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) -> %% XEP-0198 says: "For client-to-server connections, the client MUST NOT %% attempt to enable stream management until after it has completed Resource %% Binding unless it is resuming a previous session". However, it also %% says: "Stream management errors SHOULD be considered recoverable", so we %% won't bail out. send_element(StateData, #sm_failed{reason = 'unexpected-request', xmlns = ?NS_STREAM_MGMT_3}), StateData; negotiate_stream_mgmt(Pkt, StateData) -> Xmlns = xmpp:get_ns(Pkt), case stream_mgmt_enabled(StateData) of true -> case Pkt of #sm_enable{} -> handle_enable(StateData#state{mgmt_xmlns = Xmlns}, Pkt); _ -> Res = if is_record(Pkt, sm_a); is_record(Pkt, sm_r); is_record(Pkt, sm_resume) -> #sm_failed{reason = 'unexpected-request', xmlns = Xmlns}; true -> #sm_failed{reason = 'bad-request', xmlns = Xmlns} end, send_element(StateData, Res), StateData end; false -> send_element(StateData, #sm_failed{reason = 'service-unavailable', xmlns = Xmlns}), StateData end. -spec perform_stream_mgmt(xmpp_element(), state()) -> state(). perform_stream_mgmt(Pkt, StateData) -> case xmpp:get_ns(Pkt) of Xmlns when Xmlns == StateData#state.mgmt_xmlns -> case Pkt of #sm_r{} -> handle_r(StateData); #sm_a{} -> handle_a(StateData, Pkt); _ -> Res = if is_record(Pkt, sm_enable); is_record(Pkt, sm_resume) -> #sm_failed{reason = 'unexpected-request', xmlns = Xmlns}; true -> #sm_failed{reason = 'bad-request', xmlns = Xmlns} end, send_element(StateData, Res), StateData end; _ -> send_element(StateData, #sm_failed{reason = 'unsupported-version', xmlns = StateData#state.mgmt_xmlns}) end. -spec handle_enable(state(), sm_enable()) -> state(). handle_enable(#state{mgmt_timeout = DefaultTimeout, mgmt_max_timeout = MaxTimeout} = StateData, #sm_enable{resume = Resume, max = Max}) -> Timeout = if Resume == false -> 0; Max /= undefined, Max > 0, Max =< MaxTimeout -> Max; true -> DefaultTimeout end, Res = if Timeout > 0 -> ?INFO_MSG("Stream management with resumption enabled for ~s", [jid:to_string(StateData#state.jid)]), #sm_enabled{xmlns = StateData#state.mgmt_xmlns, id = make_resume_id(StateData), resume = true, max = Timeout}; true -> ?INFO_MSG("Stream management without resumption enabled for ~s", [jid:to_string(StateData#state.jid)]), #sm_enabled{xmlns = StateData#state.mgmt_xmlns} end, send_element(StateData, Res), StateData#state{mgmt_state = active, mgmt_queue = queue:new(), mgmt_timeout = Timeout * 1000}. -spec handle_r(state()) -> state(). handle_r(StateData) -> Res = #sm_a{xmlns = StateData#state.mgmt_xmlns, h = StateData#state.mgmt_stanzas_in}, send_element(StateData, Res), StateData. -spec handle_a(state(), sm_a()) -> state(). handle_a(StateData, #sm_a{h = H}) -> NewStateData = check_h_attribute(StateData, H), maybe_renew_ack_request(NewStateData). -spec handle_resume(state(), sm_resume()) -> {ok, state()} | error. handle_resume(StateData, #sm_resume{h = H, previd = PrevID, xmlns = Xmlns}) -> R = case stream_mgmt_enabled(StateData) of true -> case inherit_session_state(StateData, PrevID) of {ok, InheritedState, Info} -> {ok, InheritedState, Info, H}; {error, Err, InH} -> {error, #sm_failed{reason = 'item-not-found', h = InH, xmlns = Xmlns}, Err}; {error, Err} -> {error, #sm_failed{reason = 'item-not-found', xmlns = Xmlns}, Err} end; false -> {error, #sm_failed{reason = 'service-unavailable', xmlns = Xmlns}, <<"XEP-0198 disabled">>} end, case R of {ok, ResumedState, ResumedInfo, NumHandled} -> NewState = check_h_attribute(ResumedState, NumHandled), AttrXmlns = NewState#state.mgmt_xmlns, AttrId = make_resume_id(NewState), AttrH = NewState#state.mgmt_stanzas_in, send_element(NewState, #sm_resumed{xmlns = AttrXmlns, h = AttrH, previd = AttrId}), SendFun = fun(_F, _T, El, Time) -> NewEl = add_resent_delay_info(NewState, El, Time), send_element(NewState, NewEl) end, handle_unacked_stanzas(NewState, SendFun), send_element(NewState, #sm_r{xmlns = AttrXmlns}), NewState1 = csi_flush_queue(NewState), NewState2 = ejabberd_hooks:run_fold(c2s_session_resumed, StateData#state.server, NewState1, [NewState1#state.sid, NewState1#state.jid, ResumedInfo]), ?INFO_MSG("Resumed session for ~s", [jid:to_string(NewState2#state.jid)]), {ok, NewState2}; {error, El, Msg} -> send_element(StateData, El), ?INFO_MSG("Cannot resume session for ~s@~s: ~s", [StateData#state.user, StateData#state.server, Msg]), error end. -spec check_h_attribute(state(), non_neg_integer()) -> state(). check_h_attribute(#state{mgmt_stanzas_out = NumStanzasOut} = StateData, H) when H > NumStanzasOut -> ?DEBUG("~s acknowledged ~B stanzas, but only ~B were sent", [jid:to_string(StateData#state.jid), H, NumStanzasOut]), mgmt_queue_drop(StateData#state{mgmt_stanzas_out = H}, NumStanzasOut); check_h_attribute(#state{mgmt_stanzas_out = NumStanzasOut} = StateData, H) -> ?DEBUG("~s acknowledged ~B of ~B stanzas", [jid:to_string(StateData#state.jid), H, NumStanzasOut]), mgmt_queue_drop(StateData, H). -spec update_num_stanzas_in(state(), xmpp_element()) -> state(). update_num_stanzas_in(#state{mgmt_state = MgmtState} = StateData, El) when MgmtState == active; MgmtState == pending -> NewNum = case {xmpp:is_stanza(El), StateData#state.mgmt_stanzas_in} of {true, 4294967295} -> 0; {true, Num} -> Num + 1; {false, Num} -> Num end, StateData#state{mgmt_stanzas_in = NewNum}; update_num_stanzas_in(StateData, _El) -> StateData. mgmt_send_stanza(StateData, Stanza) -> case send_element(StateData, Stanza) of ok -> maybe_request_ack(StateData); _ -> StateData#state{mgmt_state = pending} end. maybe_request_ack(#state{mgmt_ack_timer = undefined} = StateData) -> request_ack(StateData); maybe_request_ack(StateData) -> StateData. request_ack(#state{mgmt_xmlns = Xmlns, mgmt_ack_timeout = AckTimeout} = StateData) -> AckReq = #sm_r{xmlns = Xmlns}, case {send_element(StateData, AckReq), AckTimeout} of {ok, undefined} -> ok; {ok, Timeout} -> Timer = erlang:send_after(Timeout, self(), close), StateData#state{mgmt_ack_timer = Timer, mgmt_stanzas_req = StateData#state.mgmt_stanzas_out}; _ -> StateData#state{mgmt_state = pending} end. maybe_renew_ack_request(#state{mgmt_ack_timer = undefined} = StateData) -> StateData; maybe_renew_ack_request(#state{mgmt_ack_timer = Timer, mgmt_queue = Queue, mgmt_stanzas_out = NumStanzasOut, mgmt_stanzas_req = NumStanzasReq} = StateData) -> erlang:cancel_timer(Timer), case NumStanzasReq < NumStanzasOut andalso not queue:is_empty(Queue) of true -> request_ack(StateData#state{mgmt_ack_timer = undefined}); false -> StateData#state{mgmt_ack_timer = undefined} end. -spec mgmt_queue_add(state(), xmpp_element()) -> state(). mgmt_queue_add(StateData, El) -> NewNum = case StateData#state.mgmt_stanzas_out of 4294967295 -> 0; Num -> Num + 1 end, NewQueue = queue:in({NewNum, p1_time_compat:timestamp(), El}, StateData#state.mgmt_queue), NewState = StateData#state{mgmt_queue = NewQueue, mgmt_stanzas_out = NewNum}, check_queue_length(NewState). -spec mgmt_queue_drop(state(), non_neg_integer()) -> state(). mgmt_queue_drop(StateData, NumHandled) -> NewQueue = jlib:queue_drop_while(fun({N, _T, _E}) -> N =< NumHandled end, StateData#state.mgmt_queue), StateData#state{mgmt_queue = NewQueue}. -spec check_queue_length(state()) -> state(). check_queue_length(#state{mgmt_max_queue = Limit} = StateData) when Limit == infinity; Limit == exceeded -> StateData; check_queue_length(#state{mgmt_queue = Queue, mgmt_max_queue = Limit} = StateData) -> case queue:len(Queue) > Limit of true -> StateData#state{mgmt_max_queue = exceeded}; false -> StateData end. -spec handle_unacked_stanzas(state(), fun((_, _, _, _) -> _)) -> ok. handle_unacked_stanzas(#state{mgmt_state = MgmtState} = StateData, F) when MgmtState == active; MgmtState == pending; MgmtState == timeout -> Queue = StateData#state.mgmt_queue, case queue:len(Queue) of 0 -> ok; N -> ?DEBUG("~B stanza(s) were not acknowledged by ~s", [N, jid:to_string(StateData#state.jid)]), lists:foreach( fun({_, Time, Pkt}) -> From = xmpp:get_from(Pkt), To = xmpp:get_to(Pkt), case {From, To} of {#jid{}, #jid{}} -> F(From, To, Pkt, Time); {_, _} -> ?DEBUG("Dropping stanza due to invalid JID(s)", []) end end, queue:to_list(Queue)) end; handle_unacked_stanzas(_StateData, _F) -> ok. -spec handle_unacked_stanzas(state()) -> ok. handle_unacked_stanzas(#state{mgmt_state = MgmtState} = StateData) when MgmtState == active; MgmtState == pending; MgmtState == timeout -> ResendOnTimeout = case StateData#state.mgmt_resend of Resend when is_boolean(Resend) -> Resend; if_offline -> Resource = StateData#state.resource, case ejabberd_sm:get_user_resources(StateData#state.user, StateData#state.server) of [Resource] -> % Same resource opened new session true; [] -> true; _ -> false end end, Lang = StateData#state.lang, ReRoute = case ResendOnTimeout of true -> fun(From, To, El, Time) -> NewEl = add_resent_delay_info(StateData, El, Time), ejabberd_router:route(From, To, NewEl) end; false -> fun(From, To, El, _Time) -> Txt = <<"User session terminated">>, ejabberd_router:route_error( To, From, El, xmpp:err_service_unavailable(Txt, Lang)) end end, F = fun(From, _To, #presence{}, _Time) -> ?DEBUG("Dropping presence stanza from ~s", [jid:to_string(From)]); (From, To, #iq{} = El, _Time) -> Txt = <<"User session terminated">>, ejabberd_router:route_error( To, From, El, xmpp:err_service_unavailable(Txt, Lang)); (From, _To, #message{meta = #{carbon_copy := true}}, _Time) -> %% XEP-0280 says: "When a receiving server attempts to deliver a %% forked message, and that message bounces with an error for %% any reason, the receiving server MUST NOT forward that error %% back to the original sender." Resending such a stanza could %% easily lead to unexpected results as well. ?DEBUG("Dropping forwarded message stanza from ~s", [jid:to_string(From)]); (From, To, El, Time) -> case ejabberd_hooks:run_fold(message_is_archived, StateData#state.server, false, [StateData, From, StateData#state.jid, El]) of true -> ?DEBUG("Dropping archived message stanza from ~p", [jid:to_string(xmpp:get_from(El))]); false -> ReRoute(From, To, El, Time) end end, handle_unacked_stanzas(StateData, F); handle_unacked_stanzas(_StateData) -> ok. -spec inherit_session_state(state(), binary()) -> {ok, state()} | {error, binary()} | {error, binary(), non_neg_integer()}. inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) -> case jlib:base64_to_term(ResumeID) of {term, {R, Time}} -> case ejabberd_sm:get_session_pid(U, S, R) of none -> case ejabberd_sm:get_offline_info(Time, U, S, R) of none -> {error, <<"Previous session PID not found">>}; Info -> case proplists:get_value(num_stanzas_in, Info) of undefined -> {error, <<"Previous session timed out">>}; H -> {error, <<"Previous session timed out">>, H} end end; OldPID -> OldSID = {Time, OldPID}, case catch resume_session(OldSID) of {resume, OldStateData} -> NewSID = {Time, self()}, % Old time, new PID Priority = case OldStateData#state.pres_last of undefined -> 0; Presence -> get_priority_from_presence(Presence) end, Conn = get_conn_type(StateData), Info = [{ip, StateData#state.ip}, {conn, Conn}, {auth_module, StateData#state.auth_module}], ejabberd_sm:open_session(NewSID, U, S, R, Priority, Info), {ok, StateData#state{conn = Conn, sid = NewSID, jid = OldStateData#state.jid, resource = OldStateData#state.resource, pres_t = OldStateData#state.pres_t, pres_f = OldStateData#state.pres_f, pres_a = OldStateData#state.pres_a, pres_last = OldStateData#state.pres_last, pres_timestamp = OldStateData#state.pres_timestamp, privacy_list = OldStateData#state.privacy_list, aux_fields = OldStateData#state.aux_fields, mgmt_xmlns = OldStateData#state.mgmt_xmlns, mgmt_queue = OldStateData#state.mgmt_queue, mgmt_timeout = OldStateData#state.mgmt_timeout, mgmt_stanzas_in = OldStateData#state.mgmt_stanzas_in, mgmt_stanzas_out = OldStateData#state.mgmt_stanzas_out, mgmt_state = active, csi_state = active}, Info}; {error, Msg} -> {error, Msg}; _ -> {error, <<"Cannot grab session state">>} end end; _ -> {error, <<"Invalid 'previd' value">>} end. -spec resume_session({integer(), pid()}) -> any(). resume_session({Time, PID}) -> (?GEN_FSM):sync_send_all_state_event(PID, {resume_session, Time}, 15000). -spec make_resume_id(state()) -> binary(). make_resume_id(StateData) -> {Time, _} = StateData#state.sid, jlib:term_to_base64({StateData#state.resource, Time}). -spec add_resent_delay_info(state(), stanza(), erlang:timestamp()) -> stanza(). add_resent_delay_info(_State, #iq{} = El, _Time) -> El; add_resent_delay_info(#state{server = From}, El, Time) -> xmpp_util:add_delay_info(El, jid:make(From), Time, <<"Resent">>). %%%---------------------------------------------------------------------- %%% XEP-0352 %%%---------------------------------------------------------------------- -spec csi_filter_stanza(state(), stanza()) -> state(). csi_filter_stanza(#state{csi_state = CsiState, jid = JID, server = Server} = StateData, Stanza) -> {StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_filter_stanza, Server, {StateData, [Stanza]}, [Server, JID, Stanza]), StateData2 = lists:foldl(fun(CurStanza, AccState) -> send_stanza(AccState, CurStanza) end, StateData1#state{csi_state = active}, Stanzas), StateData2#state{csi_state = CsiState}. -spec csi_flush_queue(state()) -> state(). csi_flush_queue(#state{csi_state = CsiState, jid = JID, server = Server} = StateData) -> {StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_flush_queue, Server, {StateData, []}, [Server, JID]), StateData2 = lists:foldl(fun(CurStanza, AccState) -> send_stanza(AccState, CurStanza) end, StateData1#state{csi_state = active}, Stanzas), StateData2#state{csi_state = CsiState}. %%%---------------------------------------------------------------------- %%% JID Set memory footprint reduction code %%%---------------------------------------------------------------------- %% Try to reduce the heap footprint of the four presence sets %% by ensuring that we re-use strings and Jids wherever possible. -spec pack(state()) -> state(). pack(S = #state{pres_a = A, pres_f = F, pres_t = T}) -> {NewA, Pack2} = pack_jid_set(A, gb_trees:empty()), {NewF, Pack3} = pack_jid_set(F, Pack2), {NewT, _Pack4} = pack_jid_set(T, Pack3), S#state{pres_a = NewA, pres_f = NewF, pres_t = NewT}. pack_jid_set(Set, Pack) -> Jids = (?SETS):to_list(Set), {PackedJids, NewPack} = pack_jids(Jids, Pack, []), {(?SETS):from_list(PackedJids), NewPack}. pack_jids([], Pack, Acc) -> {Acc, Pack}; pack_jids([{U, S, R} = Jid | Jids], Pack, Acc) -> case gb_trees:lookup(Jid, Pack) of {value, PackedJid} -> pack_jids(Jids, Pack, [PackedJid | Acc]); none -> {NewU, Pack1} = pack_string(U, Pack), {NewS, Pack2} = pack_string(S, Pack1), {NewR, Pack3} = pack_string(R, Pack2), NewJid = {NewU, NewS, NewR}, NewPack = gb_trees:insert(NewJid, NewJid, Pack3), pack_jids(Jids, NewPack, [NewJid | Acc]) end. pack_string(String, Pack) -> case gb_trees:lookup(String, Pack) of {value, PackedString} -> {PackedString, Pack}; none -> {String, gb_trees:insert(String, String, Pack)} end. transform_listen_option(Opt, Opts) -> [Opt|Opts]. -spec identity([{atom(), binary()}]) -> binary(). identity(Props) -> case proplists:get_value(authzid, Props, <<>>) of <<>> -> proplists:get_value(username, Props, <<>>); AuthzId -> AuthzId end. opt_type(domain_certfile) -> fun iolist_to_binary/1; opt_type(max_fsm_queue) -> fun (I) when is_integer(I), I > 0 -> I end; opt_type(resource_conflict) -> fun (setresource) -> setresource; (closeold) -> closeold; (closenew) -> closenew; (acceptnew) -> acceptnew end; opt_type(_) -> [domain_certfile, max_fsm_queue, resource_conflict].