mirror of
https://github.com/processone/ejabberd.git
synced 2024-11-24 16:23:40 +01:00
1053 lines
34 KiB
Erlang
1053 lines
34 KiB
Erlang
%%%-------------------------------------------------------------------
|
|
%%% File : ejabberd_bosh.erl
|
|
%%% Author : Evgeniy Khramtsov <ekhramtsov@process-one.net>
|
|
%%% Purpose : Manage BOSH sockets
|
|
%%% Created : 20 Jul 2011 by Evgeniy Khramtsov <ekhramtsov@process-one.net>
|
|
%%%
|
|
%%%
|
|
%%% ejabberd, Copyright (C) 2002-2024 ProcessOne
|
|
%%%
|
|
%%% This program is free software; you can redistribute it and/or
|
|
%%% modify it under the terms of the GNU General Public License as
|
|
%%% published by the Free Software Foundation; either version 2 of the
|
|
%%% License, or (at your option) any later version.
|
|
%%%
|
|
%%% This program is distributed in the hope that it will be useful,
|
|
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
%%% General Public License for more details.
|
|
%%%
|
|
%%% You should have received a copy of the GNU General Public License along
|
|
%%% with this program; if not, write to the Free Software Foundation, Inc.,
|
|
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
%%%
|
|
%%%-------------------------------------------------------------------
|
|
-module(ejabberd_bosh).
|
|
-behaviour(xmpp_socket).
|
|
-behaviour(p1_fsm).
|
|
-protocol({xep, 124, '1.11'}).
|
|
-protocol({xep, 206, '1.4'}).
|
|
|
|
%% API
|
|
-export([start/2, start/3, start_link/3]).
|
|
|
|
-export([send_xml/2, setopts/2, controlling_process/2,
|
|
reset_stream/1, change_shaper/2, close/1,
|
|
sockname/1, peername/1, process_request/3, send/2,
|
|
get_transport/1, get_owner/1]).
|
|
|
|
%% gen_fsm callbacks
|
|
-export([init/1, wait_for_session/2, wait_for_session/3,
|
|
active/2, active/3, handle_event/3, print_state/1,
|
|
handle_sync_event/4, handle_info/3, terminate/3,
|
|
code_change/4]).
|
|
|
|
-include("logger.hrl").
|
|
-include_lib("xmpp/include/xmpp.hrl").
|
|
-include("ejabberd_http.hrl").
|
|
-include("bosh.hrl").
|
|
|
|
%%-define(DBGFSM, true).
|
|
-ifdef(DBGFSM).
|
|
|
|
-define(FSMOPTS, [{debug, [trace]}]).
|
|
|
|
-else.
|
|
|
|
-define(FSMOPTS, []).
|
|
|
|
-endif.
|
|
|
|
-define(BOSH_VERSION, <<"1.11">>).
|
|
|
|
-define(NS_BOSH, <<"urn:xmpp:xbosh">>).
|
|
|
|
-define(NS_HTTP_BIND,
|
|
<<"http://jabber.org/protocol/httpbind">>).
|
|
|
|
-define(DEFAULT_WAIT, 300).
|
|
|
|
-define(DEFAULT_HOLD, 1).
|
|
|
|
-define(DEFAULT_POLLING, 2).
|
|
|
|
-define(MAX_SHAPED_REQUESTS_QUEUE_LEN, 1000).
|
|
|
|
-define(SEND_TIMEOUT, 15000).
|
|
|
|
-type bosh_socket() :: {http_bind, pid(),
|
|
{inet:ip_address(),
|
|
inet:port_number()}}.
|
|
|
|
-export_type([bosh_socket/0]).
|
|
|
|
-record(state,
|
|
{host = <<"">> :: binary(),
|
|
sid = <<"">> :: binary(),
|
|
el_ibuf :: p1_queue:queue(),
|
|
el_obuf :: p1_queue:queue(),
|
|
shaper_state = none :: ejabberd_shaper:shaper(),
|
|
c2s_pid :: pid() | undefined,
|
|
xmpp_ver = <<"">> :: binary(),
|
|
inactivity_timer :: reference() | undefined,
|
|
wait_timer :: reference() | undefined,
|
|
wait_timeout = ?DEFAULT_WAIT :: pos_integer(),
|
|
inactivity_timeout :: pos_integer(),
|
|
prev_rid = 0 :: non_neg_integer(),
|
|
prev_key = <<"">> :: binary(),
|
|
prev_poll :: erlang:timestamp() | undefined,
|
|
max_concat = unlimited :: unlimited | non_neg_integer(),
|
|
responses = gb_trees:empty() :: gb_trees:tree(),
|
|
receivers = gb_trees:empty() :: gb_trees:tree(),
|
|
shaped_receivers :: p1_queue:queue(),
|
|
ip :: inet:ip_address(),
|
|
max_requests = 1 :: non_neg_integer()}).
|
|
|
|
-record(body,
|
|
{http_reason = <<"">> :: binary(),
|
|
attrs = [] :: [{any(), any()}],
|
|
els = [] :: [fxml_stream:xml_stream_el()],
|
|
size = 0 :: non_neg_integer()}).
|
|
|
|
start(#body{attrs = Attrs} = Body, IP, SID) ->
|
|
XMPPDomain = get_attr(to, Attrs),
|
|
SupervisorProc = gen_mod:get_module_proc(XMPPDomain, mod_bosh),
|
|
case catch supervisor:start_child(SupervisorProc,
|
|
[Body, IP, SID])
|
|
of
|
|
{ok, Pid} -> {ok, Pid};
|
|
{'EXIT', {noproc, _}} ->
|
|
check_bosh_module(XMPPDomain),
|
|
{error, module_not_loaded};
|
|
Err ->
|
|
?ERROR_MSG("Failed to start BOSH session: ~p", [Err]),
|
|
{error, Err}
|
|
end.
|
|
|
|
start(StateName, State) ->
|
|
p1_fsm:start_link(?MODULE, [StateName, State],
|
|
?FSMOPTS).
|
|
|
|
start_link(Body, IP, SID) ->
|
|
p1_fsm:start_link(?MODULE, [Body, IP, SID],
|
|
?FSMOPTS).
|
|
|
|
send({http_bind, FsmRef, IP}, Packet) ->
|
|
send_xml({http_bind, FsmRef, IP}, Packet).
|
|
|
|
send_xml({http_bind, FsmRef, _IP}, Packet) ->
|
|
case catch p1_fsm:sync_send_all_state_event(FsmRef,
|
|
{send_xml, Packet},
|
|
?SEND_TIMEOUT)
|
|
of
|
|
{'EXIT', {timeout, _}} -> {error, timeout};
|
|
{'EXIT', _} -> {error, einval};
|
|
Res -> Res
|
|
end.
|
|
|
|
setopts({http_bind, FsmRef, _IP}, Opts) ->
|
|
case lists:member({active, once}, Opts) of
|
|
true ->
|
|
p1_fsm:send_all_state_event(FsmRef,
|
|
{activate, self()});
|
|
_ ->
|
|
case lists:member({active, false}, Opts) of
|
|
true ->
|
|
case catch p1_fsm:sync_send_all_state_event(FsmRef,
|
|
deactivate_socket)
|
|
of
|
|
{'EXIT', _} -> {error, einval};
|
|
Res -> Res
|
|
end;
|
|
_ -> ok
|
|
end
|
|
end.
|
|
|
|
controlling_process(_Socket, _Pid) -> ok.
|
|
|
|
reset_stream({http_bind, _FsmRef, _IP} = Socket) ->
|
|
Socket.
|
|
|
|
change_shaper({http_bind, FsmRef, _IP}, Shaper) ->
|
|
p1_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}).
|
|
|
|
close({http_bind, FsmRef, _IP}) ->
|
|
catch p1_fsm:sync_send_all_state_event(FsmRef,
|
|
close).
|
|
|
|
sockname(_Socket) -> {ok, {{0, 0, 0, 0}, 0}}.
|
|
|
|
peername({http_bind, _FsmRef, IP}) -> {ok, IP}.
|
|
|
|
get_transport(_Socket) ->
|
|
http_bind.
|
|
|
|
get_owner({http_bind, FsmRef, _IP}) ->
|
|
FsmRef.
|
|
|
|
process_request(Data, IP, Type) ->
|
|
Opts1 = ejabberd_c2s_config:get_c2s_limits(),
|
|
Opts = case Type of
|
|
xml ->
|
|
[{xml_socket, true} | Opts1];
|
|
json ->
|
|
Opts1
|
|
end,
|
|
MaxStanzaSize = case lists:keysearch(max_stanza_size, 1,
|
|
Opts)
|
|
of
|
|
{value, {_, Size}} -> Size;
|
|
_ -> infinity
|
|
end,
|
|
PayloadSize = iolist_size(Data),
|
|
if PayloadSize > MaxStanzaSize ->
|
|
http_error(403, <<"Request Too Large">>, Type);
|
|
true ->
|
|
case decode_body(Data, PayloadSize, Type) of
|
|
{ok, #body{attrs = Attrs} = Body} ->
|
|
SID = get_attr(sid, Attrs),
|
|
To = get_attr(to, Attrs),
|
|
if SID == <<"">>, To == <<"">> ->
|
|
bosh_response_with_msg(#body{http_reason =
|
|
<<"Missing 'to' attribute">>,
|
|
attrs =
|
|
[{type, <<"terminate">>},
|
|
{condition,
|
|
<<"improper-addressing">>}]},
|
|
Type, Body);
|
|
SID == <<"">> ->
|
|
case start(Body, IP, make_sid()) of
|
|
{ok, Pid} -> process_request(Pid, Body, IP, Type);
|
|
_Err ->
|
|
bosh_response_with_msg(#body{http_reason =
|
|
<<"Failed to start BOSH session">>,
|
|
attrs =
|
|
[{type, <<"terminate">>},
|
|
{condition,
|
|
<<"internal-server-error">>}]},
|
|
Type, Body)
|
|
end;
|
|
true ->
|
|
case mod_bosh:find_session(SID) of
|
|
{ok, Pid} -> process_request(Pid, Body, IP, Type);
|
|
error ->
|
|
bosh_response_with_msg(#body{http_reason =
|
|
<<"Session ID mismatch">>,
|
|
attrs =
|
|
[{type, <<"terminate">>},
|
|
{condition,
|
|
<<"item-not-found">>}]},
|
|
Type, Body)
|
|
end
|
|
end;
|
|
{error, Reason} -> http_error(400, Reason, Type)
|
|
end
|
|
end.
|
|
|
|
process_request(Pid, Req, _IP, Type) ->
|
|
case catch p1_fsm:sync_send_event(Pid, Req,
|
|
infinity)
|
|
of
|
|
#body{} = Resp -> bosh_response(Resp, Type);
|
|
{'EXIT', {Reason, _}}
|
|
when Reason == noproc; Reason == normal ->
|
|
bosh_response(#body{http_reason =
|
|
<<"BOSH session not found">>,
|
|
attrs =
|
|
[{type, <<"terminate">>},
|
|
{condition, <<"item-not-found">>}]},
|
|
Type);
|
|
{'EXIT', _} ->
|
|
bosh_response(#body{http_reason =
|
|
<<"Unexpected error">>,
|
|
attrs =
|
|
[{type, <<"terminate">>},
|
|
{condition, <<"internal-server-error">>}]},
|
|
Type)
|
|
end.
|
|
|
|
init([#body{attrs = Attrs}, IP, SID]) ->
|
|
Opts1 = ejabberd_c2s_config:get_c2s_limits(),
|
|
Opts2 = [{xml_socket, true} | Opts1],
|
|
Shaper = none,
|
|
ShaperState = ejabberd_shaper:new(Shaper),
|
|
Socket = make_socket(self(), IP),
|
|
XMPPVer = get_attr('xmpp:version', Attrs),
|
|
XMPPDomain = get_attr(to, Attrs),
|
|
{InBuf, Opts} = case mod_bosh_opt:prebind(XMPPDomain) of
|
|
true ->
|
|
JID = make_random_jid(XMPPDomain),
|
|
{buf_new(XMPPDomain), [{jid, JID} | Opts2]};
|
|
false ->
|
|
{buf_in([make_xmlstreamstart(XMPPDomain, XMPPVer)],
|
|
buf_new(XMPPDomain)),
|
|
Opts2}
|
|
end,
|
|
case ejabberd_c2s:start(?MODULE, Socket, [{receiver, self()}|Opts]) of
|
|
{ok, C2SPid} ->
|
|
ejabberd_c2s:accept(C2SPid),
|
|
Inactivity = mod_bosh_opt:max_inactivity(XMPPDomain) div 1000,
|
|
MaxConcat = mod_bosh_opt:max_concat(XMPPDomain),
|
|
ShapedReceivers = buf_new(XMPPDomain, ?MAX_SHAPED_REQUESTS_QUEUE_LEN),
|
|
State = #state{host = XMPPDomain, sid = SID, ip = IP,
|
|
xmpp_ver = XMPPVer, el_ibuf = InBuf,
|
|
max_concat = MaxConcat, el_obuf = buf_new(XMPPDomain),
|
|
inactivity_timeout = Inactivity,
|
|
shaped_receivers = ShapedReceivers,
|
|
shaper_state = ShaperState},
|
|
NewState = restart_inactivity_timer(State),
|
|
case mod_bosh:open_session(SID, self()) of
|
|
ok ->
|
|
{ok, wait_for_session, NewState};
|
|
{error, Reason} ->
|
|
{stop, Reason}
|
|
end;
|
|
{error, Reason} ->
|
|
{stop, Reason};
|
|
ignore ->
|
|
ignore
|
|
end.
|
|
|
|
wait_for_session(_Event, State) ->
|
|
?ERROR_MSG("Unexpected event in 'wait_for_session': ~p",
|
|
[_Event]),
|
|
{next_state, wait_for_session, State}.
|
|
|
|
wait_for_session(#body{attrs = Attrs} = Req, From,
|
|
State) ->
|
|
RID = get_attr(rid, Attrs),
|
|
?DEBUG("Got request:~n** RequestID: ~p~n** Request: "
|
|
"~p~n** From: ~p~n** State: ~p",
|
|
[RID, Req, From, State]),
|
|
Wait = min(get_attr(wait, Attrs, undefined),
|
|
?DEFAULT_WAIT),
|
|
Hold = min(get_attr(hold, Attrs, undefined),
|
|
?DEFAULT_HOLD),
|
|
NewKey = get_attr(newkey, Attrs),
|
|
Type = get_attr(type, Attrs),
|
|
Requests = Hold + 1,
|
|
PollTime = if
|
|
Wait == 0, Hold == 0 -> erlang:timestamp();
|
|
true -> undefined
|
|
end,
|
|
MaxPause = mod_bosh_opt:max_pause(State#state.host) div 1000,
|
|
Resp = #body{attrs =
|
|
[{sid, State#state.sid}, {wait, Wait},
|
|
{ver, ?BOSH_VERSION}, {polling, ?DEFAULT_POLLING},
|
|
{inactivity, State#state.inactivity_timeout},
|
|
{hold, Hold}, {'xmpp:restartlogic', true},
|
|
{requests, Requests}, {secure, true},
|
|
{maxpause, MaxPause}, {'xmlns:xmpp', ?NS_BOSH},
|
|
{'xmlns:stream', ?NS_STREAM}, {from, State#state.host}]},
|
|
{ShaperState, _} =
|
|
ejabberd_shaper:update(State#state.shaper_state, Req#body.size),
|
|
State1 = State#state{wait_timeout = Wait,
|
|
prev_rid = RID, prev_key = NewKey,
|
|
prev_poll = PollTime, shaper_state = ShaperState,
|
|
max_requests = Requests},
|
|
Els = maybe_add_xmlstreamend(Req#body.els, Type),
|
|
State2 = route_els(State1, Els),
|
|
{State3, RespEls} = get_response_els(State2),
|
|
State4 = stop_inactivity_timer(State3),
|
|
case RespEls of
|
|
[{xmlstreamstart, _, _} = El1] ->
|
|
OutBuf = buf_in([El1], State4#state.el_obuf),
|
|
State5 = restart_wait_timer(State4),
|
|
Receivers = gb_trees:insert(RID, {From, Resp},
|
|
State5#state.receivers),
|
|
{next_state, active,
|
|
State5#state{receivers = Receivers, el_obuf = OutBuf}};
|
|
[] ->
|
|
State5 = restart_wait_timer(State4),
|
|
Receivers = gb_trees:insert(RID, {From, Resp},
|
|
State5#state.receivers),
|
|
{next_state, active,
|
|
State5#state{receivers = Receivers}};
|
|
_ ->
|
|
reply_next_state(State4, Resp#body{els = RespEls}, RID,
|
|
From)
|
|
end;
|
|
wait_for_session(_Event, _From, State) ->
|
|
?ERROR_MSG("Unexpected sync event in 'wait_for_session': ~p",
|
|
[_Event]),
|
|
{reply, {error, badarg}, wait_for_session, State}.
|
|
|
|
active({#body{} = Body, From}, State) ->
|
|
active1(Body, From, State);
|
|
active(_Event, State) ->
|
|
?ERROR_MSG("Unexpected event in 'active': ~p",
|
|
[_Event]),
|
|
{next_state, active, State}.
|
|
|
|
active(#body{attrs = Attrs, size = Size} = Req, From,
|
|
State) ->
|
|
?DEBUG("Got request:~n** Request: ~p~n** From: "
|
|
"~p~n** State: ~p",
|
|
[Req, From, State]),
|
|
{ShaperState, Pause} =
|
|
ejabberd_shaper:update(State#state.shaper_state, Size),
|
|
State1 = State#state{shaper_state = ShaperState},
|
|
if Pause > 0 ->
|
|
TRef = start_shaper_timer(Pause),
|
|
try p1_queue:in({TRef, From, Req},
|
|
State1#state.shaped_receivers) of
|
|
Q ->
|
|
State2 = stop_inactivity_timer(State1),
|
|
{next_state, active,
|
|
State2#state{shaped_receivers = Q}}
|
|
catch error:full ->
|
|
misc:cancel_timer(TRef),
|
|
RID = get_attr(rid, Attrs),
|
|
reply_stop(State1,
|
|
#body{http_reason = <<"Too many requests">>,
|
|
attrs =
|
|
[{<<"type">>, <<"terminate">>},
|
|
{<<"condition">>,
|
|
<<"policy-violation">>}]},
|
|
From, RID)
|
|
end;
|
|
true -> active1(Req, From, State1)
|
|
end;
|
|
active(_Event, _From, State) ->
|
|
?ERROR_MSG("Unexpected sync event in 'active': ~p",
|
|
[_Event]),
|
|
{reply, {error, badarg}, active, State}.
|
|
|
|
active1(#body{attrs = Attrs} = Req, From, State) ->
|
|
RID = get_attr(rid, Attrs),
|
|
Key = get_attr(key, Attrs),
|
|
IsValidKey = is_valid_key(State#state.prev_key, Key),
|
|
IsOveractivity = is_overactivity(State#state.prev_poll),
|
|
Type = get_attr(type, Attrs),
|
|
if RID >
|
|
State#state.prev_rid + State#state.max_requests ->
|
|
reply_stop(State,
|
|
#body{http_reason = <<"Request ID is out of range">>,
|
|
attrs =
|
|
[{<<"type">>, <<"terminate">>},
|
|
{<<"condition">>, <<"item-not-found">>}]},
|
|
From, RID);
|
|
RID > State#state.prev_rid + 1 ->
|
|
State1 = restart_inactivity_timer(State),
|
|
Receivers = gb_trees:insert(RID, {From, Req},
|
|
State1#state.receivers),
|
|
{next_state, active,
|
|
State1#state{receivers = Receivers}};
|
|
RID =< State#state.prev_rid ->
|
|
%% TODO: do we need to check 'key' here? It seems so...
|
|
case gb_trees:lookup(RID, State#state.responses) of
|
|
{value, PrevBody} ->
|
|
{next_state, active,
|
|
do_reply(State, From, PrevBody, RID)};
|
|
none ->
|
|
State1 = drop_holding_receiver(State, RID),
|
|
State2 = stop_inactivity_timer(State1),
|
|
State3 = restart_wait_timer(State2),
|
|
Receivers = gb_trees:insert(RID, {From, Req},
|
|
State3#state.receivers),
|
|
{next_state, active, State3#state{receivers = Receivers}}
|
|
end;
|
|
not IsValidKey ->
|
|
reply_stop(State,
|
|
#body{http_reason = <<"Session key mismatch">>,
|
|
attrs =
|
|
[{<<"type">>, <<"terminate">>},
|
|
{<<"condition">>, <<"item-not-found">>}]},
|
|
From, RID);
|
|
IsOveractivity ->
|
|
reply_stop(State,
|
|
#body{http_reason = <<"Too many requests">>,
|
|
attrs =
|
|
[{<<"type">>, <<"terminate">>},
|
|
{<<"condition">>, <<"policy-violation">>}]},
|
|
From, RID);
|
|
true ->
|
|
State1 = stop_inactivity_timer(State),
|
|
State2 = stop_wait_timer(State1),
|
|
Els = case get_attr('xmpp:restart', Attrs, false) of
|
|
true ->
|
|
XMPPDomain = get_attr(to, Attrs, State#state.host),
|
|
XMPPVer = get_attr('xmpp:version', Attrs,
|
|
State#state.xmpp_ver),
|
|
[make_xmlstreamstart(XMPPDomain, XMPPVer)];
|
|
false -> Req#body.els
|
|
end,
|
|
State3 = route_els(State2,
|
|
maybe_add_xmlstreamend(Els, Type)),
|
|
{State4, RespEls} = get_response_els(State3),
|
|
NewKey = get_attr(newkey, Attrs, Key),
|
|
Pause = get_attr(pause, Attrs, undefined),
|
|
NewPoll = case State#state.prev_poll of
|
|
undefined -> undefined;
|
|
_ -> erlang:timestamp()
|
|
end,
|
|
State5 = State4#state{prev_poll = NewPoll,
|
|
prev_key = NewKey},
|
|
if Type == <<"terminate">> ->
|
|
reply_stop(State5,
|
|
#body{http_reason = <<"Session close">>,
|
|
attrs = [{<<"type">>, <<"terminate">>}],
|
|
els = RespEls},
|
|
From, RID);
|
|
Pause /= undefined ->
|
|
State6 = drop_holding_receiver(State5),
|
|
State7 = restart_inactivity_timer(State6, Pause),
|
|
InBuf = buf_in(RespEls, State7#state.el_ibuf),
|
|
{next_state, active,
|
|
State7#state{prev_rid = RID, el_ibuf = InBuf}};
|
|
RespEls == [] ->
|
|
State6 = drop_holding_receiver(State5),
|
|
State7 = stop_inactivity_timer(State6),
|
|
State8 = restart_wait_timer(State7),
|
|
Receivers = gb_trees:insert(RID, {From, #body{}},
|
|
State8#state.receivers),
|
|
{next_state, active,
|
|
State8#state{prev_rid = RID, receivers = Receivers}};
|
|
true ->
|
|
State6 = drop_holding_receiver(State5),
|
|
reply_next_state(State6#state{prev_rid = RID},
|
|
#body{els = RespEls}, RID, From)
|
|
end
|
|
end.
|
|
|
|
handle_event({activate, C2SPid}, StateName,
|
|
State) ->
|
|
State1 = route_els(State#state{c2s_pid = C2SPid}),
|
|
{next_state, StateName, State1};
|
|
handle_event({change_shaper, Shaper}, StateName,
|
|
State) ->
|
|
{next_state, StateName, State#state{shaper_state = Shaper}};
|
|
handle_event(_Event, StateName, State) ->
|
|
?ERROR_MSG("Unexpected event in '~ts': ~p",
|
|
[StateName, _Event]),
|
|
{next_state, StateName, State}.
|
|
|
|
handle_sync_event({send_xml,
|
|
{xmlstreamstart, _, _} = El},
|
|
_From, StateName, State)
|
|
when State#state.xmpp_ver >= <<"1.0">> ->
|
|
OutBuf = buf_in([El], State#state.el_obuf),
|
|
{reply, ok, StateName, State#state{el_obuf = OutBuf}};
|
|
handle_sync_event({send_xml, El}, _From, StateName,
|
|
State) ->
|
|
OutBuf = buf_in([El], State#state.el_obuf),
|
|
State1 = State#state{el_obuf = OutBuf},
|
|
case gb_trees:lookup(State1#state.prev_rid,
|
|
State1#state.receivers)
|
|
of
|
|
{value, {From, Body}} ->
|
|
{State2, Els} = get_response_els(State1),
|
|
{reply, ok, StateName,
|
|
reply(State2, Body#body{els = Els},
|
|
State2#state.prev_rid, From)};
|
|
none ->
|
|
State2 = case p1_queue:out(State1#state.shaped_receivers)
|
|
of
|
|
{{value, {TRef, From, Body}}, Q} ->
|
|
misc:cancel_timer(TRef),
|
|
p1_fsm:send_event(self(), {Body, From}),
|
|
State1#state{shaped_receivers = Q};
|
|
_ -> State1
|
|
end,
|
|
{reply, ok, StateName, State2}
|
|
end;
|
|
handle_sync_event(close, _From, _StateName, State) ->
|
|
{stop, normal, State};
|
|
handle_sync_event(deactivate_socket, _From, StateName,
|
|
StateData) ->
|
|
{reply, ok, StateName,
|
|
StateData#state{c2s_pid = undefined}};
|
|
handle_sync_event(_Event, _From, StateName, State) ->
|
|
?ERROR_MSG("Unexpected sync event in '~ts': ~p",
|
|
[StateName, _Event]),
|
|
{reply, {error, badarg}, StateName, State}.
|
|
|
|
handle_info({timeout, TRef, wait_timeout}, StateName,
|
|
#state{wait_timer = TRef} = State) ->
|
|
State2 = State#state{wait_timer = undefined},
|
|
{next_state, StateName, drop_holding_receiver(State2)};
|
|
handle_info({timeout, TRef, inactive}, _StateName,
|
|
#state{inactivity_timer = TRef} = State) ->
|
|
{stop, normal, State};
|
|
handle_info({timeout, TRef, shaper_timeout}, StateName,
|
|
State) ->
|
|
case p1_queue:out(State#state.shaped_receivers) of
|
|
{{value, {TRef, From, Req}}, Q} ->
|
|
p1_fsm:send_event(self(), {Req, From}),
|
|
{next_state, StateName,
|
|
State#state{shaped_receivers = Q}};
|
|
{{value, _}, _} ->
|
|
?ERROR_MSG("shaper_timeout mismatch:~n** TRef: ~p~n** "
|
|
"State: ~p",
|
|
[TRef, State]),
|
|
{stop, normal, State};
|
|
_ -> {next_state, StateName, State}
|
|
end;
|
|
handle_info(_Info, StateName, State) ->
|
|
?ERROR_MSG("Unexpected info:~n** Msg: ~p~n** StateName: ~p",
|
|
[_Info, StateName]),
|
|
{next_state, StateName, State}.
|
|
|
|
terminate(_Reason, _StateName, State) ->
|
|
mod_bosh:close_session(State#state.sid),
|
|
case State#state.c2s_pid of
|
|
C2SPid when is_pid(C2SPid) ->
|
|
p1_fsm:send_event(C2SPid, closed);
|
|
_ -> ok
|
|
end,
|
|
bounce_receivers(State, closed),
|
|
bounce_els_from_obuf(State).
|
|
|
|
code_change(_OldVsn, StateName, State, _Extra) ->
|
|
{ok, StateName, State}.
|
|
|
|
print_state(State) -> State.
|
|
|
|
route_els(#state{el_ibuf = Buf, c2s_pid = C2SPid} = State) ->
|
|
NewBuf = p1_queue:dropwhile(
|
|
fun(El) ->
|
|
p1_fsm:send_event(C2SPid, El),
|
|
true
|
|
end, Buf),
|
|
State#state{el_ibuf = NewBuf}.
|
|
|
|
route_els(State, Els) ->
|
|
case State#state.c2s_pid of
|
|
C2SPid when is_pid(C2SPid) ->
|
|
lists:foreach(fun (El) ->
|
|
p1_fsm:send_event(C2SPid, El)
|
|
end,
|
|
Els),
|
|
State;
|
|
_ ->
|
|
InBuf = buf_in(Els, State#state.el_ibuf),
|
|
State#state{el_ibuf = InBuf}
|
|
end.
|
|
|
|
get_response_els(#state{el_obuf = OutBuf,
|
|
max_concat = MaxConcat} =
|
|
State) ->
|
|
{Els, NewOutBuf} = buf_out(OutBuf, MaxConcat),
|
|
{State#state{el_obuf = NewOutBuf}, Els}.
|
|
|
|
reply(State, Body, RID, From) ->
|
|
State1 = restart_inactivity_timer(State),
|
|
Receivers = gb_trees:delete_any(RID,
|
|
State1#state.receivers),
|
|
State2 = do_reply(State1, From, Body, RID),
|
|
case catch gb_trees:take_smallest(Receivers) of
|
|
{NextRID, {From1, Req}, Receivers1}
|
|
when NextRID == RID + 1 ->
|
|
p1_fsm:send_event(self(), {Req, From1}),
|
|
State2#state{receivers = Receivers1};
|
|
_ -> State2#state{receivers = Receivers}
|
|
end.
|
|
|
|
reply_next_state(State, Body, RID, From) ->
|
|
State1 = restart_inactivity_timer(State),
|
|
Receivers = gb_trees:delete_any(RID,
|
|
State1#state.receivers),
|
|
State2 = do_reply(State1, From, Body, RID),
|
|
case catch gb_trees:take_smallest(Receivers) of
|
|
{NextRID, {From1, Req}, Receivers1}
|
|
when NextRID == RID + 1 ->
|
|
active(Req, From1,
|
|
State2#state{receivers = Receivers1});
|
|
_ ->
|
|
{next_state, active,
|
|
State2#state{receivers = Receivers}}
|
|
end.
|
|
|
|
reply_stop(State, Body, From, RID) ->
|
|
{stop, normal, do_reply(State, From, Body, RID)}.
|
|
|
|
drop_holding_receiver(State) ->
|
|
drop_holding_receiver(State, State#state.prev_rid).
|
|
drop_holding_receiver(State, RID) ->
|
|
case gb_trees:lookup(RID, State#state.receivers) of
|
|
{value, {From, Body}} ->
|
|
State1 = restart_inactivity_timer(State),
|
|
Receivers = gb_trees:delete_any(RID,
|
|
State1#state.receivers),
|
|
State2 = State1#state{receivers = Receivers},
|
|
do_reply(State2, From, Body, RID);
|
|
none ->
|
|
restart_inactivity_timer(State)
|
|
end.
|
|
|
|
do_reply(State, From, Body, RID) ->
|
|
?DEBUG("Send reply:~n** RequestID: ~p~n** Reply: "
|
|
"~p~n** To: ~p~n** State: ~p",
|
|
[RID, Body, From, State]),
|
|
p1_fsm:reply(From, Body),
|
|
Responses = gb_trees:delete_any(RID,
|
|
State#state.responses),
|
|
Responses1 = case gb_trees:size(Responses) of
|
|
N when N < State#state.max_requests; N == 0 ->
|
|
Responses;
|
|
_ -> element(3, gb_trees:take_smallest(Responses))
|
|
end,
|
|
Responses2 = gb_trees:insert(RID, Body, Responses1),
|
|
State#state{responses = Responses2}.
|
|
|
|
bounce_receivers(State, _Reason) ->
|
|
Receivers = gb_trees:to_list(State#state.receivers),
|
|
ShapedReceivers = lists:map(fun ({_, From,
|
|
#body{attrs = Attrs} = Body}) ->
|
|
RID = get_attr(rid, Attrs),
|
|
{RID, {From, Body}}
|
|
end,
|
|
p1_queue:to_list(State#state.shaped_receivers)),
|
|
lists:foldl(fun ({RID, {From, _Body}}, AccState) ->
|
|
NewBody = #body{http_reason =
|
|
<<"Session closed">>,
|
|
attrs =
|
|
[{type, <<"terminate">>},
|
|
{condition,
|
|
<<"other-request">>}]},
|
|
do_reply(AccState, From, NewBody, RID)
|
|
end,
|
|
State, Receivers ++ ShapedReceivers).
|
|
|
|
bounce_els_from_obuf(State) ->
|
|
Opts = ejabberd_config:codec_options(),
|
|
p1_queue:foreach(
|
|
fun({xmlstreamelement, El}) ->
|
|
try xmpp:decode(El, ?NS_CLIENT, Opts) of
|
|
Pkt when ?is_stanza(Pkt) ->
|
|
case {xmpp:get_from(Pkt), xmpp:get_to(Pkt)} of
|
|
{#jid{}, #jid{}} ->
|
|
ejabberd_router:route(Pkt);
|
|
_ ->
|
|
ok
|
|
end;
|
|
_ ->
|
|
ok
|
|
catch _:{xmpp_codec, _} ->
|
|
ok
|
|
end;
|
|
(_) ->
|
|
ok
|
|
end, State#state.el_obuf).
|
|
|
|
is_valid_key(<<"">>, <<"">>) -> true;
|
|
is_valid_key(PrevKey, Key) ->
|
|
str:sha(Key) == PrevKey.
|
|
|
|
is_overactivity(undefined) -> false;
|
|
is_overactivity(PrevPoll) ->
|
|
PollPeriod = timer:now_diff(erlang:timestamp(), PrevPoll) div
|
|
1000000,
|
|
if PollPeriod < (?DEFAULT_POLLING) -> true;
|
|
true -> false
|
|
end.
|
|
|
|
make_xmlstreamstart(XMPPDomain, Version) ->
|
|
VersionEl = case Version of
|
|
<<"">> -> [];
|
|
_ -> [{<<"version">>, Version}]
|
|
end,
|
|
{xmlstreamstart, <<"stream:stream">>,
|
|
[{<<"to">>, XMPPDomain}, {<<"xmlns">>, ?NS_CLIENT},
|
|
{<<"xmlns:xmpp">>, ?NS_BOSH},
|
|
{<<"xmlns:stream">>, ?NS_STREAM}
|
|
| VersionEl]}.
|
|
|
|
maybe_add_xmlstreamend(Els, <<"terminate">>) ->
|
|
Els ++ [{xmlstreamend, <<"stream:stream">>}];
|
|
maybe_add_xmlstreamend(Els, _) -> Els.
|
|
|
|
encode_body(#body{attrs = Attrs, els = Els}, Type) ->
|
|
Attrs1 = lists:map(fun ({K, V}) when is_atom(K) ->
|
|
AmK = iolist_to_binary(atom_to_list(K)),
|
|
case V of
|
|
true -> {AmK, <<"true">>};
|
|
false -> {AmK, <<"false">>};
|
|
I when is_integer(I), I >= 0 ->
|
|
{AmK, integer_to_binary(I)};
|
|
_ -> {AmK, V}
|
|
end;
|
|
({K, V}) -> {K, V}
|
|
end,
|
|
Attrs),
|
|
Attrs2 = [{<<"xmlns">>, ?NS_HTTP_BIND} | Attrs1],
|
|
{Attrs3, XMLs} = lists:foldr(fun ({xmlstreamraw, XML},
|
|
{AttrsAcc, XMLBuf}) ->
|
|
{AttrsAcc, [XML | XMLBuf]};
|
|
({xmlstreamelement,
|
|
#xmlel{name = <<"stream:error">>} = El},
|
|
{AttrsAcc, XMLBuf}) ->
|
|
{[{<<"type">>, <<"terminate">>},
|
|
{<<"condition">>,
|
|
<<"remote-stream-error">>},
|
|
{<<"xmlns:stream">>, ?NS_STREAM}
|
|
| AttrsAcc],
|
|
[encode_element(El, Type) | XMLBuf]};
|
|
({xmlstreamelement,
|
|
#xmlel{name = <<"stream:features">>} =
|
|
El},
|
|
{AttrsAcc, XMLBuf}) ->
|
|
{lists:keystore(<<"xmlns:stream">>, 1,
|
|
AttrsAcc,
|
|
{<<"xmlns:stream">>,
|
|
?NS_STREAM}),
|
|
[encode_element(El, Type) | XMLBuf]};
|
|
({xmlstreamelement,
|
|
#xmlel{name = Name, attrs = EAttrs} = El},
|
|
{AttrsAcc, XMLBuf})
|
|
when Name == <<"message">>;
|
|
Name == <<"presence">>;
|
|
Name == <<"iq">> ->
|
|
NewAttrs = lists:keystore(
|
|
<<"xmlns">>, 1, EAttrs,
|
|
{<<"xmlns">>, ?NS_CLIENT}),
|
|
NewEl = El#xmlel{attrs = NewAttrs},
|
|
{AttrsAcc,
|
|
[encode_element(NewEl, Type) | XMLBuf]};
|
|
({xmlstreamelement, El},
|
|
{AttrsAcc, XMLBuf}) ->
|
|
{AttrsAcc,
|
|
[encode_element(El, Type) | XMLBuf]};
|
|
({xmlstreamend, _}, {AttrsAcc, XMLBuf}) ->
|
|
{[{<<"type">>, <<"terminate">>},
|
|
{<<"condition">>,
|
|
<<"remote-stream-error">>}
|
|
| AttrsAcc],
|
|
XMLBuf};
|
|
({xmlstreamstart, <<"stream:stream">>,
|
|
SAttrs},
|
|
{AttrsAcc, XMLBuf}) ->
|
|
StreamID = fxml:get_attr_s(<<"id">>,
|
|
SAttrs),
|
|
NewAttrs = case
|
|
fxml:get_attr_s(<<"version">>,
|
|
SAttrs)
|
|
of
|
|
<<"">> ->
|
|
[{<<"authid">>,
|
|
StreamID}
|
|
| AttrsAcc];
|
|
V ->
|
|
lists:keystore(<<"xmlns:xmpp">>,
|
|
1,
|
|
[{<<"xmpp:version">>,
|
|
V},
|
|
{<<"authid">>,
|
|
StreamID}
|
|
| AttrsAcc],
|
|
{<<"xmlns:xmpp">>,
|
|
?NS_BOSH})
|
|
end,
|
|
{NewAttrs, XMLBuf};
|
|
({xmlstreamerror, _},
|
|
{AttrsAcc, XMLBuf}) ->
|
|
{[{<<"type">>, <<"terminate">>},
|
|
{<<"condition">>,
|
|
<<"remote-stream-error">>}
|
|
| AttrsAcc],
|
|
XMLBuf};
|
|
(_, Acc) -> Acc
|
|
end,
|
|
{Attrs2, []}, Els),
|
|
case XMLs of
|
|
[] when Type == xml ->
|
|
[<<"<body">>, attrs_to_list(Attrs3), <<"/>">>];
|
|
_ when Type == xml ->
|
|
[<<"<body">>, attrs_to_list(Attrs3), $>, XMLs,
|
|
<<"</body>">>]
|
|
end.
|
|
|
|
encode_element(El, xml) ->
|
|
fxml:element_to_binary(El);
|
|
encode_element(El, json) ->
|
|
El.
|
|
|
|
decode_body(Data, Size, Type) ->
|
|
case decode(Data, Type) of
|
|
#xmlel{name = <<"body">>, attrs = Attrs,
|
|
children = Els} ->
|
|
case attrs_to_body_attrs(Attrs) of
|
|
{error, _} = Err -> Err;
|
|
BodyAttrs ->
|
|
case get_attr(rid, BodyAttrs) of
|
|
<<"">> -> {error, <<"Missing \"rid\" attribute">>};
|
|
_ ->
|
|
Els1 = lists:flatmap(fun (#xmlel{} = El) ->
|
|
[{xmlstreamelement, El}];
|
|
(_) -> []
|
|
end,
|
|
Els),
|
|
{ok, #body{attrs = BodyAttrs, size = Size, els = Els1}}
|
|
end
|
|
end;
|
|
#xmlel{} -> {error, <<"Unexpected payload">>};
|
|
_ when Type == xml ->
|
|
{error, <<"XML is not well-formed">>};
|
|
_ when Type == json ->
|
|
{error, <<"JSON is not well-formed">>}
|
|
end.
|
|
|
|
decode(Data, xml) ->
|
|
fxml_stream:parse_element(Data);
|
|
decode(Data, json) ->
|
|
Data.
|
|
|
|
attrs_to_body_attrs(Attrs) ->
|
|
lists:foldl(fun (_, {error, Reason}) -> {error, Reason};
|
|
({Attr, Val}, Acc) ->
|
|
try case Attr of
|
|
<<"ver">> -> [{ver, Val} | Acc];
|
|
<<"xmpp:version">> ->
|
|
[{'xmpp:version', Val} | Acc];
|
|
<<"type">> -> [{type, Val} | Acc];
|
|
<<"key">> -> [{key, Val} | Acc];
|
|
<<"newkey">> -> [{newkey, Val} | Acc];
|
|
<<"xmlns">> -> Val = (?NS_HTTP_BIND), Acc;
|
|
<<"secure">> -> [{secure, to_bool(Val)} | Acc];
|
|
<<"xmpp:restart">> ->
|
|
[{'xmpp:restart', to_bool(Val)} | Acc];
|
|
<<"to">> ->
|
|
[{to, jid:nameprep(Val)} | Acc];
|
|
<<"wait">> -> [{wait, to_int(Val, 0)} | Acc];
|
|
<<"ack">> -> [{ack, to_int(Val, 0)} | Acc];
|
|
<<"sid">> -> [{sid, Val} | Acc];
|
|
<<"hold">> -> [{hold, to_int(Val, 0)} | Acc];
|
|
<<"rid">> -> [{rid, to_int(Val, 0)} | Acc];
|
|
<<"pause">> -> [{pause, to_int(Val, 0)} | Acc];
|
|
_ -> [{Attr, Val} | Acc]
|
|
end
|
|
catch
|
|
_:_ ->
|
|
{error,
|
|
<<"Invalid \"", Attr/binary, "\" attribute">>}
|
|
end
|
|
end,
|
|
[], Attrs).
|
|
|
|
to_int(S, Min) ->
|
|
case binary_to_integer(S) of
|
|
I when I >= Min -> I;
|
|
_ -> erlang:error(badarg)
|
|
end.
|
|
|
|
to_bool(<<"true">>) -> true;
|
|
to_bool(<<"1">>) -> true;
|
|
to_bool(<<"false">>) -> false;
|
|
to_bool(<<"0">>) -> false.
|
|
|
|
attrs_to_list(Attrs) -> [attr_to_list(A) || A <- Attrs].
|
|
|
|
attr_to_list({Name, Value}) ->
|
|
[$\s, Name, $=, $', fxml:crypt(Value), $'].
|
|
|
|
bosh_response(Body, Type) ->
|
|
CType = case Type of
|
|
xml -> ?CT_XML;
|
|
json -> ?CT_JSON
|
|
end,
|
|
{200, Body#body.http_reason, ?HEADER(CType),
|
|
encode_body(Body, Type)}.
|
|
|
|
bosh_response_with_msg(Body, Type, RcvBody) ->
|
|
?DEBUG("Send error reply:~p~n** Receiced body: ~p",
|
|
[Body, RcvBody]),
|
|
bosh_response(Body, Type).
|
|
|
|
http_error(Status, Reason, Type) ->
|
|
CType = case Type of
|
|
xml -> ?CT_XML;
|
|
json -> ?CT_JSON
|
|
end,
|
|
{Status, Reason, ?HEADER(CType), <<"">>}.
|
|
|
|
make_sid() -> str:sha(p1_rand:get_string()).
|
|
|
|
-compile({no_auto_import, [{min, 2}]}).
|
|
|
|
min(undefined, B) -> B;
|
|
min(A, B) -> erlang:min(A, B).
|
|
|
|
check_bosh_module(XmppDomain) ->
|
|
case gen_mod:is_loaded(XmppDomain, mod_bosh) of
|
|
true -> ok;
|
|
false ->
|
|
?ERROR_MSG("You are trying to use BOSH (HTTP Bind) "
|
|
"in host ~p, but the module mod_bosh "
|
|
"is not started in that host. Configure "
|
|
"your BOSH client to connect to the correct "
|
|
"host, or add your desired host to the "
|
|
"configuration, or check your 'modules' "
|
|
"section in your ejabberd configuration "
|
|
"file.",
|
|
[XmppDomain])
|
|
end.
|
|
|
|
get_attr(Attr, Attrs) -> get_attr(Attr, Attrs, <<"">>).
|
|
|
|
get_attr(Attr, Attrs, Default) ->
|
|
case lists:keysearch(Attr, 1, Attrs) of
|
|
{value, {_, Val}} -> Val;
|
|
_ -> Default
|
|
end.
|
|
|
|
buf_new(Host) ->
|
|
buf_new(Host, unlimited).
|
|
|
|
buf_new(Host, Limit) ->
|
|
QueueType = mod_bosh_opt:queue_type(Host),
|
|
p1_queue:new(QueueType, Limit).
|
|
|
|
buf_in(Xs, Buf) ->
|
|
lists:foldl(fun p1_queue:in/2, Buf, Xs).
|
|
|
|
buf_out(Buf, Num) when is_integer(Num), Num > 0 ->
|
|
buf_out(Buf, Num, []);
|
|
buf_out(Buf, _) -> {p1_queue:to_list(Buf), p1_queue:clear(Buf)}.
|
|
|
|
buf_out(Buf, 0, Els) -> {lists:reverse(Els), Buf};
|
|
buf_out(Buf, I, Els) ->
|
|
case p1_queue:out(Buf) of
|
|
{{value, El}, NewBuf} ->
|
|
buf_out(NewBuf, I - 1, [El | Els]);
|
|
{empty, _} -> buf_out(Buf, 0, Els)
|
|
end.
|
|
|
|
restart_timer(TRef, Timeout, Msg) ->
|
|
misc:cancel_timer(TRef),
|
|
erlang:start_timer(timer:seconds(Timeout), self(), Msg).
|
|
|
|
restart_inactivity_timer(#state{inactivity_timeout =
|
|
Timeout} =
|
|
State) ->
|
|
restart_inactivity_timer(State, Timeout).
|
|
|
|
restart_inactivity_timer(#state{inactivity_timer =
|
|
TRef} =
|
|
State,
|
|
Timeout) ->
|
|
NewTRef = restart_timer(TRef, Timeout, inactive),
|
|
State#state{inactivity_timer = NewTRef}.
|
|
|
|
stop_inactivity_timer(#state{inactivity_timer = TRef} =
|
|
State) ->
|
|
misc:cancel_timer(TRef),
|
|
State#state{inactivity_timer = undefined}.
|
|
|
|
restart_wait_timer(#state{wait_timer = TRef,
|
|
wait_timeout = Timeout} =
|
|
State) ->
|
|
NewTRef = restart_timer(TRef, Timeout, wait_timeout),
|
|
State#state{wait_timer = NewTRef}.
|
|
|
|
stop_wait_timer(#state{wait_timer = TRef} = State) ->
|
|
misc:cancel_timer(TRef), State#state{wait_timer = undefined}.
|
|
|
|
start_shaper_timer(Timeout) ->
|
|
erlang:start_timer(Timeout, self(), shaper_timeout).
|
|
|
|
make_random_jid(Host) ->
|
|
User = p1_rand:get_string(),
|
|
jid:make(User, Host, p1_rand:get_string()).
|
|
|
|
make_socket(Pid, IP) -> {http_bind, Pid, IP}.
|