24
1
mirror of https://github.com/processone/ejabberd.git synced 2024-06-16 22:05:29 +02:00
xmpp.chapril.org-ejabberd/src/web/ejabberd_http_bind.erl
Paweł Chmielowski 6c94d040fa Repeated http-bind request should abort only requests with same rid
Before this change, when request with repeat rid was received any waiting
request was aborted (but only after next request was delivered). With this
change, only request with identical rid are aborted and this is done
immediately
2012-05-03 11:44:34 +02:00

1255 lines
41 KiB
Erlang

%%%----------------------------------------------------------------------
%%% File : ejabberd_http_bind.erl
%%% Author : Stefan Strigler <steve@zeank.in-berlin.de>
%%% Purpose : Implements XMPP over BOSH (XEP-0206) (formerly known as
%%% HTTP Binding)
%%% Created : 21 Sep 2005 by Stefan Strigler <steve@zeank.in-berlin.de>
%%% Modified: may 2009 by Mickael Remond, Alexey Schepin
%%% Id : $Id: ejabberd_http_bind.erl 953 2009-05-07 10:40:40Z alexey $
%%%----------------------------------------------------------------------
-module(ejabberd_http_bind).
-behaviour(gen_fsm).
%% External exports
-export([start_link/3,
init/1,
handle_event/3,
handle_sync_event/4,
code_change/4,
handle_info/3,
terminate/3,
send/2,
send_xml/2,
sockname/1,
peername/1,
setopts/2,
controlling_process/2,
become_controller/2,
custom_receiver/1,
reset_stream/1,
change_shaper/2,
monitor/1,
close/1,
start/4,
handle_session_start/8,
handle_http_put/7,
http_put/7,
http_get/2,
prepare_response/4,
process_request/2]).
-include("ejabberd.hrl").
-include("jlib.hrl").
-include("ejabberd_http.hrl").
-include("http_bind.hrl").
-record(http_bind, {id, pid, to, hold, wait, process_delay, version}).
-define(NULL_PEER, {{0, 0, 0, 0}, 0}).
%% http binding request
-record(hbr, {rid,
key,
out}).
-record(state, {id,
rid = none,
key,
socket,
output = "",
input = queue:new(),
waiting_input = false,
shaper_state,
shaper_timer,
last_receiver,
last_poll,
http_receiver,
out_of_order_receiver = false,
wait_timer,
ctime = 0,
timer,
pause=0,
unprocessed_req_list = [], % list of request that have been delayed for proper reordering: {Request, PID}
req_list = [], % list of requests (cache)
max_inactivity,
max_pause,
ip = ?NULL_PEER
}).
%% Internal request format:
-record(http_put, {rid,
attrs,
payload,
payload_size,
hold,
stream,
ip}).
%%-define(DBGFSM, true).
-ifdef(DBGFSM).
-define(FSMOPTS, [{debug, [trace]}]).
-else.
-define(FSMOPTS, []).
-endif.
-define(BOSH_VERSION, "1.8").
-define(NS_CLIENT, "jabber:client").
-define(NS_BOSH, "urn:xmpp:xbosh").
-define(NS_HTTP_BIND, "http://jabber.org/protocol/httpbind").
-define(MAX_REQUESTS, 2). % number of simultaneous requests
-define(MIN_POLLING, 2000000). % don't poll faster than that or we will
% shoot you (time in microsec)
-define(MAX_WAIT, 3600). % max num of secs to keep a request on hold
-define(MAX_INACTIVITY, 30000). % msecs to wait before terminating
% idle sessions
-define(MAX_PAUSE, 120). % may num of sec a client is allowed to pause
% the session
%% Wait 100ms before continue processing, to allow the client provide more related stanzas.
-define(PROCESS_DELAY_DEFAULT, 100).
-define(PROCESS_DELAY_MIN, 0).
-define(PROCESS_DELAY_MAX, 1000).
%% Line copied from mod_http_bind.erl
-define(PROCNAME_MHB, ejabberd_mod_http_bind).
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
%% TODO: If compile with no supervisor option, start the session without
%% supervisor
start(XMPPDomain, Sid, Key, IP) ->
?DEBUG("Starting session", []),
SupervisorProc = gen_mod:get_module_proc(XMPPDomain, ?PROCNAME_MHB),
case catch supervisor:start_child(SupervisorProc, [Sid, Key, IP]) of
{ok, Pid} -> {ok, Pid};
_ -> check_bind_module(XMPPDomain),
{error, "Cannot start HTTP bind session"}
end.
start_link(Sid, Key, IP) ->
gen_fsm:start_link(?MODULE, [Sid, Key, IP], ?FSMOPTS).
send({http_bind, FsmRef, _IP}, Packet) ->
gen_fsm:sync_send_all_state_event(FsmRef, {send, Packet}).
send_xml({http_bind, FsmRef, _IP}, Packet) ->
gen_fsm:sync_send_all_state_event(FsmRef, {send_xml, Packet}).
setopts({http_bind, FsmRef, _IP}, Opts) ->
case lists:member({active, once}, Opts) of
true ->
gen_fsm:send_all_state_event(FsmRef, {activate, self()});
_ ->
ok
end.
controlling_process(_Socket, _Pid) ->
ok.
custom_receiver({http_bind, FsmRef, _IP}) ->
{receiver, ?MODULE, FsmRef}.
become_controller(FsmRef, C2SPid) ->
gen_fsm:send_all_state_event(FsmRef, {become_controller, C2SPid}).
reset_stream({http_bind, _FsmRef, _IP}) ->
ok.
change_shaper({http_bind, FsmRef, _IP}, Shaper) ->
gen_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}).
monitor({http_bind, FsmRef, _IP}) ->
erlang:monitor(process, FsmRef).
close({http_bind, FsmRef, _IP}) ->
catch gen_fsm:sync_send_all_state_event(FsmRef, {stop, close}).
sockname(_Socket) ->
{ok, ?NULL_PEER}.
peername({http_bind, _FsmRef, IP}) ->
{ok, IP}.
%% Entry point for data coming from client through ejabberd HTTP server:
process_request(Data, IP) ->
Opts1 = ejabberd_c2s_config:get_c2s_limits(),
Opts = [{xml_socket, true} | Opts1],
MaxStanzaSize =
case lists:keysearch(max_stanza_size, 1, Opts) of
{value, {_, Size}} -> Size;
_ -> infinity
end,
PayloadSize = iolist_size(Data),
case catch parse_request(Data, PayloadSize, MaxStanzaSize) of
%% No existing session:
{ok, {"", Rid, Attrs, Payload}} ->
case xml:get_attr_s("to",Attrs) of
"" ->
?DEBUG("Session not created (Improper addressing)", []),
{200, ?HEADER, "<body type='terminate' "
"condition='improper-addressing' "
"xmlns='" ++ ?NS_HTTP_BIND ++ "'/>"};
XmppDomain ->
%% create new session
Sid = sha:sha(term_to_binary({now(), make_ref()})),
case start(XmppDomain, Sid, "", IP) of
{error, _} ->
{200, ?HEADER, "<body type='terminate' "
"condition='internal-server-error' "
"xmlns='" ++ ?NS_HTTP_BIND ++ "'>BOSH module not started</body>"};
{ok, Pid} ->
handle_session_start(
Pid, XmppDomain, Sid, Rid, Attrs,
Payload, PayloadSize, IP)
end
end;
%% Existing session
{ok, {Sid, Rid, Attrs, Payload1}} ->
StreamStart =
case xml:get_attr_s("xmpp:restart",Attrs) of
"true" ->
true;
_ ->
false
end,
Payload2 = case xml:get_attr_s("type",Attrs) of
"terminate" ->
%% close stream
Payload1 ++ [{xmlstreamend, "stream:stream"}];
_ ->
Payload1
end,
handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize,
StreamStart, IP);
{size_limit, Sid} ->
case mnesia:dirty_read({http_bind, Sid}) of
[] ->
{404, ?HEADER, ""};
[#http_bind{pid = FsmRef}] ->
gen_fsm:sync_send_all_state_event(FsmRef, {stop, close}),
{200, ?HEADER, "<body type='terminate' "
"condition='undefined-condition' "
"xmlns='" ++ ?NS_HTTP_BIND ++ "'>Request Too Large</body>"}
end;
_ ->
?DEBUG("Received bad request: ~p", [Data]),
{400, ?HEADER, ""}
end.
handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs,
Payload, PayloadSize, IP) ->
?DEBUG("got pid: ~p", [Pid]),
Wait = case string:to_integer(xml:get_attr_s("wait",Attrs)) of
{error, _} ->
?MAX_WAIT;
{CWait, _} ->
if
(CWait > ?MAX_WAIT) ->
?MAX_WAIT;
true ->
CWait
end
end,
Hold = case string:to_integer(xml:get_attr_s("hold",Attrs)) of
{error, _} ->
(?MAX_REQUESTS - 1);
{CHold, _} ->
if
(CHold > (?MAX_REQUESTS - 1)) ->
(?MAX_REQUESTS - 1);
true ->
CHold
end
end,
Pdelay = case string:to_integer(xml:get_attr_s("process-delay",Attrs)) of
{error, _} ->
?PROCESS_DELAY_DEFAULT;
{CPdelay, _} when
(?PROCESS_DELAY_MIN =< CPdelay) and
(CPdelay =< ?PROCESS_DELAY_MAX) ->
CPdelay;
{CPdelay, _} ->
lists:max([lists:min([CPdelay, ?PROCESS_DELAY_MAX]), ?PROCESS_DELAY_MIN])
end,
Version =
case catch list_to_float(
xml:get_attr_s("ver", Attrs)) of
{'EXIT', _} -> 0.0;
V -> V
end,
XmppVersion = xml:get_attr_s("xmpp:version", Attrs),
?DEBUG("Create session: ~p", [Sid]),
mnesia:dirty_write(
#http_bind{id = Sid,
pid = Pid,
to = {XmppDomain,
XmppVersion},
hold = Hold,
wait = Wait,
process_delay = Pdelay,
version = Version
}),
handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, true, IP).
%%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm
%%%----------------------------------------------------------------------
%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, StateName, StateData} |
%% {ok, StateName, StateData, Timeout} |
%% ignore |
%% {stop, StopReason}
%%----------------------------------------------------------------------
init([Sid, Key, IP]) ->
?DEBUG("started: ~p", [{Sid, Key, IP}]),
%% Read c2s options from the first ejabberd_c2s configuration in
%% the config file listen section
%% TODO: We should have different access and shaper values for
%% each connector. The default behaviour should be however to use
%% the default c2s restrictions if not defined for the current
%% connector.
Opts1 = ejabberd_c2s_config:get_c2s_limits(),
Opts = [{xml_socket, true} | Opts1],
Shaper = none,
ShaperState = shaper:new(Shaper),
Socket = {http_bind, self(), IP},
ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket, Opts),
Timer = erlang:start_timer(?MAX_INACTIVITY, self(), []),
{ok, loop, #state{id = Sid,
key = Key,
socket = Socket,
shaper_state = ShaperState,
max_inactivity = ?MAX_INACTIVITY,
max_pause = ?MAX_PAUSE,
timer = Timer}}.
%%----------------------------------------------------------------------
%% Func: handle_event/3
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
handle_event({become_controller, C2SPid}, StateName, StateData) ->
case StateData#state.input of
cancel ->
{next_state, StateName, StateData#state{
waiting_input = C2SPid}};
Input ->
lists:foreach(
fun(Event) ->
C2SPid ! Event
end, queue:to_list(Input)),
{next_state, StateName, StateData#state{
input = queue:new(),
waiting_input = C2SPid}}
end;
handle_event({change_shaper, Shaper}, StateName, StateData) ->
NewShaperState = shaper:new(Shaper),
{next_state, StateName, StateData#state{shaper_state = NewShaperState}};
handle_event(_Event, StateName, StateData) ->
{next_state, StateName, StateData}.
%%----------------------------------------------------------------------
%% Func: handle_sync_event/4
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {reply, Reply, NextStateName, NextStateData} |
%% {reply, Reply, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData} |
%% {stop, Reason, Reply, NewStateData}
%%----------------------------------------------------------------------
handle_sync_event({send_xml, Packet}, _From, StateName,
#state{http_receiver = undefined} = StateData) ->
Output = [Packet | StateData#state.output],
Reply = ok,
{reply, Reply, StateName, StateData#state{output = Output}};
handle_sync_event({send_xml, Packet}, _From, StateName,
#state{out_of_order_receiver = true} = StateData) ->
Output = [Packet | StateData#state.output],
Reply = ok,
{reply, Reply, StateName, StateData#state{output = Output}};
handle_sync_event({send_xml, Packet}, _From, StateName, StateData) ->
Output = [Packet | StateData#state.output],
cancel_timer(StateData#state.timer),
Timer = set_inactivity_timer(StateData#state.pause,
StateData#state.max_inactivity),
HTTPReply = {ok, Output},
gen_fsm:reply(StateData#state.http_receiver, HTTPReply),
cancel_timer(StateData#state.wait_timer),
Rid = StateData#state.rid,
ReqList = [#hbr{rid = Rid,
key = StateData#state.key,
out = Output
} |
[El || El <- StateData#state.req_list,
El#hbr.rid /= Rid ]
],
Reply = ok,
{reply, Reply, StateName,
StateData#state{output = [],
http_receiver = undefined,
req_list = ReqList,
wait_timer = undefined,
timer = Timer}};
handle_sync_event({stop,close}, _From, _StateName, StateData) ->
Reply = ok,
{stop, normal, Reply, StateData};
handle_sync_event({stop,stream_closed}, _From, _StateName, StateData) ->
Reply = ok,
{stop, normal, Reply, StateData};
handle_sync_event({stop,Reason}, _From, _StateName, StateData) ->
?DEBUG("Closing bind session ~p - Reason: ~p", [StateData#state.id, Reason]),
Reply = ok,
{stop, normal, Reply, StateData};
%% HTTP PUT: Receive packets from the client
handle_sync_event(#http_put{rid = Rid},
_From, StateName, StateData)
when StateData#state.shaper_timer /= undefined ->
Pause =
case erlang:read_timer(StateData#state.shaper_timer) of
false ->
0;
P -> P
end,
Reply = {wait, Pause},
?DEBUG("Shaper timer for RID ~p: ~p", [Rid, Reply]),
{reply, Reply, StateName, StateData};
handle_sync_event(#http_put{payload_size = PayloadSize} = Request,
_From, StateName, StateData) ->
?DEBUG("New request: ~p",[Request]),
%% Updating trafic shaper
{NewShaperState, NewShaperTimer} =
update_shaper(StateData#state.shaper_state, PayloadSize),
handle_http_put_event(Request, StateName,
StateData#state{shaper_state = NewShaperState,
shaper_timer = NewShaperTimer});
%% HTTP GET: send packets to the client
handle_sync_event({http_get, Rid, Wait, Hold}, From, StateName, StateData) ->
%% setup timer
TNow = tnow(),
if
(Hold > 0) and
((StateData#state.output == []) or (StateData#state.rid < Rid)) and
((TNow - StateData#state.ctime) < (Wait*1000*1000)) and
(StateData#state.rid =< Rid) and
(StateData#state.pause == 0) ->
send_receiver_reply(StateData#state.http_receiver, {ok, empty}),
cancel_timer(StateData#state.wait_timer),
WaitTimer = erlang:start_timer(Wait * 1000, self(), []),
%% MR: Not sure we should cancel the state timer here.
cancel_timer(StateData#state.timer),
{next_state, StateName, StateData#state{
http_receiver = From,
out_of_order_receiver = StateData#state.rid < Rid,
wait_timer = WaitTimer,
timer = undefined}};
true ->
cancel_timer(StateData#state.timer),
Reply = {ok, StateData#state.output},
%% save request
ReqList = [#hbr{rid = Rid,
key = StateData#state.key,
out = StateData#state.output
} |
[El || El <- StateData#state.req_list,
El#hbr.rid /= Rid ]
],
if
(StateData#state.http_receiver /= undefined) and
StateData#state.out_of_order_receiver ->
{reply, Reply, StateName, StateData#state{
output = [],
timer = undefined,
req_list = ReqList,
out_of_order_receiver = false}};
true ->
send_receiver_reply(StateData#state.http_receiver, {ok, empty}),
cancel_timer(StateData#state.wait_timer),
Timer = set_inactivity_timer(StateData#state.pause,
StateData#state.max_inactivity),
{reply, Reply, StateName,
StateData#state{output = [],
http_receiver = undefined,
wait_timer = undefined,
timer = Timer,
req_list = ReqList}}
end
end;
handle_sync_event(peername, _From, StateName, StateData) ->
Reply = {ok, StateData#state.ip},
{reply, Reply, StateName, StateData};
handle_sync_event(_Event, _From, StateName, StateData) ->
Reply = ok,
{reply, Reply, StateName, StateData}.
code_change(_OldVsn, StateName, StateData, _Extra) ->
{ok, StateName, StateData}.
%%----------------------------------------------------------------------
%% Func: handle_info/3
%% Returns: {next_state, NextStateName, NextStateData} |
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
%% We reached the max_inactivity timeout:
handle_info({timeout, Timer, _}, _StateName,
#state{id=SID, timer = Timer} = StateData) ->
?INFO_MSG("Session timeout. Closing the HTTP bind session: ~p", [SID]),
{stop, normal, StateData};
handle_info({timeout, WaitTimer, _}, StateName,
#state{wait_timer = WaitTimer} = StateData) ->
if
StateData#state.http_receiver /= undefined ->
cancel_timer(StateData#state.timer),
Timer = set_inactivity_timer(StateData#state.pause,
StateData#state.max_inactivity),
gen_fsm:reply(StateData#state.http_receiver, {ok, empty}),
Rid = StateData#state.rid,
ReqList = [#hbr{rid = Rid,
key = StateData#state.key,
out = []
} |
[El || El <- StateData#state.req_list,
El#hbr.rid /= Rid ]
],
{next_state, StateName,
StateData#state{http_receiver = undefined,
req_list = ReqList,
wait_timer = undefined,
timer = Timer}};
true ->
{next_state, StateName, StateData}
end;
handle_info({timeout, ShaperTimer, _}, StateName,
#state{shaper_timer = ShaperTimer} = StateData) ->
{next_state, StateName, StateData#state{shaper_timer = undefined}};
handle_info(_, StateName, StateData) ->
{next_state, StateName, StateData}.
%%----------------------------------------------------------------------
%% Func: terminate/3
%% Purpose: Shutdown the fsm
%% Returns: any
%%----------------------------------------------------------------------
terminate(_Reason, _StateName, StateData) ->
?DEBUG("terminate: Deleting session ~s", [StateData#state.id]),
mnesia:dirty_delete({http_bind, StateData#state.id}),
send_receiver_reply(StateData#state.http_receiver, {ok, terminate}),
case StateData#state.waiting_input of
false ->
ok;
C2SPid ->
gen_fsm:send_event(C2SPid, closed)
end,
ok.
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------
%% PUT / Get processing:
handle_http_put_event(#http_put{rid = Rid, attrs = Attrs,
hold = Hold} = Request,
StateName, StateData) ->
?DEBUG("New request: ~p",[Request]),
%% Check if Rid valid
RidAllow = rid_allow(StateData#state.rid, Rid, Attrs, Hold,
StateData#state.max_pause),
%% Check if Rid is in sequence or out of sequence:
case RidAllow of
buffer ->
?DEBUG("Buffered request: ~p", [Request]),
%% Request is out of sequence:
PendingRequests = StateData#state.unprocessed_req_list,
%% In case an existing RID was already buffered:
Requests = lists:keydelete(Rid, 2, PendingRequests),
ReqList = [#hbr{rid = Rid,
key = StateData#state.key,
out = []
} |
[El || El <- StateData#state.req_list,
El#hbr.rid > (Rid - 1 - Hold)]
],
?DEBUG("reqlist: ~p", [ReqList]),
UnprocessedReqList = [Request | Requests],
cancel_timer(StateData#state.timer),
Timer = set_inactivity_timer(0, StateData#state.max_inactivity),
{reply, ok, StateName,
StateData#state{unprocessed_req_list = UnprocessedReqList,
req_list = ReqList,
timer = Timer}, hibernate};
_ ->
%% Request is in sequence:
process_http_put(Request, StateName, StateData, RidAllow)
end.
process_http_put(#http_put{rid = Rid, attrs = Attrs, payload = Payload,
hold = Hold, stream = StreamTo,
ip = IP} = Request,
StateName, StateData, RidAllow) ->
?DEBUG("Actually processing request: ~p", [Request]),
%% Check if key valid
Key = xml:get_attr_s("key", Attrs),
NewKey = xml:get_attr_s("newkey", Attrs),
KeyAllow =
case RidAllow of
repeat ->
true;
false ->
false;
{true, _} ->
case StateData#state.key of
"" ->
true;
OldKey ->
NextKey = sha:sha(Key),
?DEBUG("Key/OldKey/NextKey: ~s/~s/~s", [Key, OldKey, NextKey]),
if
OldKey == NextKey ->
true;
true ->
?DEBUG("wrong key: ~s",[Key]),
false
end
end
end,
TNow = tnow(),
LastPoll = if
Payload == [] ->
TNow;
true ->
0
end,
if
(Payload == []) and
(Hold == 0) and
(TNow - StateData#state.last_poll < ?MIN_POLLING) ->
Reply = {error, polling_too_frequently},
{reply, Reply, StateName, StateData};
KeyAllow ->
case RidAllow of
false ->
Reply = {error, not_exists},
{reply, Reply, StateName, StateData};
repeat ->
?DEBUG("REPEATING ~p", [Rid]),
case [El#hbr.out ||
El <- StateData#state.req_list,
El#hbr.rid == Rid] of
[] ->
{error, not_exists};
[Out | _XS] ->
if (Rid == StateData#state.rid) and
(StateData#state.http_receiver /= undefined) ->
{reply, ok, StateName, StateData};
true ->
Reply = {repeat, lists:reverse(Out)},
{reply, Reply, StateName, StateData#state{last_poll = LastPoll}}
end
end;
{true, Pause} ->
SaveKey = if
NewKey == "" ->
Key;
true ->
NewKey
end,
?DEBUG(" -- SaveKey: ~s~n", [SaveKey]),
%% save request
ReqList1 =
[El || El <- StateData#state.req_list,
El#hbr.rid > (Rid - 1 - Hold)],
ReqList =
case lists:keymember(Rid, #hbr.rid, ReqList1) of
true ->
ReqList1;
false ->
[#hbr{rid = Rid,
key = StateData#state.key,
out = []
} |
ReqList1
]
end,
?DEBUG("reqlist: ~p", [ReqList]),
%% setup next timer
cancel_timer(StateData#state.timer),
Timer = set_inactivity_timer(Pause,
StateData#state.max_inactivity),
case StateData#state.waiting_input of
false ->
Input =
lists:foldl(
fun queue:in/2,
StateData#state.input, Payload),
Reply = ok,
process_buffered_request(Reply, StateName,
StateData#state{input = Input,
rid = Rid,
key = SaveKey,
ctime = TNow,
timer = Timer,
pause = Pause,
last_poll = LastPoll,
req_list = ReqList,
ip = IP
});
C2SPid ->
case StreamTo of
{To, ""} ->
gen_fsm:send_event(
C2SPid,
{xmlstreamstart, "stream:stream",
[{"to", To},
{"xmlns", ?NS_CLIENT},
{"xmlns:stream", ?NS_STREAM}]});
{To, Version} ->
gen_fsm:send_event(
C2SPid,
{xmlstreamstart, "stream:stream",
[{"to", To},
{"xmlns", ?NS_CLIENT},
{"version", Version},
{"xmlns:stream", ?NS_STREAM}]});
_ ->
ok
end,
MaxInactivity = get_max_inactivity(StreamTo, StateData#state.max_inactivity),
MaxPause = get_max_inactivity(StreamTo, StateData#state.max_pause),
?DEBUG("really sending now: ~p", [Payload]),
lists:foreach(
fun({xmlstreamend, End}) ->
gen_fsm:send_event(
C2SPid, {xmlstreamend, End});
(El) ->
gen_fsm:send_event(
C2SPid, {xmlstreamelement, El})
end, Payload),
Reply = ok,
process_buffered_request(Reply, StateName,
StateData#state{input = queue:new(),
rid = Rid,
key = SaveKey,
ctime = TNow,
timer = Timer,
pause = Pause,
last_poll = LastPoll,
req_list = ReqList,
max_inactivity = MaxInactivity,
max_pause = MaxPause,
ip = IP
})
end
end;
true ->
Reply = {error, bad_key},
{reply, Reply, StateName, StateData}
end.
process_buffered_request(Reply, StateName, StateData) ->
Rid = StateData#state.rid,
Requests = StateData#state.unprocessed_req_list,
case lists:keysearch(Rid+1, 2, Requests) of
{value, Request} ->
?DEBUG("Processing buffered request: ~p", [Request]),
NewRequests = lists:keydelete(Rid+1, 2, Requests),
handle_http_put_event(
Request, StateName,
StateData#state{unprocessed_req_list = NewRequests});
_ ->
{reply, Reply, StateName, StateData, hibernate}
end.
handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) ->
case http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) of
{error, not_exists} ->
?DEBUG("no session associated with sid: ~p", [Sid]),
{404, ?HEADER, ""};
{{error, Reason}, Sess} ->
?DEBUG("Error on HTTP put. Reason: ~p", [Reason]),
handle_http_put_error(Reason, Sess);
{{repeat, OutPacket}, Sess} ->
?DEBUG("http_put said 'repeat!' ...~nOutPacket: ~p", [OutPacket]),
send_outpacket(Sess, OutPacket);
{{wait, Pause}, _Sess} ->
?DEBUG("Trafic Shaper: Delaying request ~p", [Rid]),
timer:sleep(Pause),
%{200, ?HEADER,
% xml:element_to_binary(
% {xmlelement, "body",
% [{"xmlns", ?NS_HTTP_BIND},
% {"type", "error"}], []})};
handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize,
StreamStart, IP);
{ok, Sess} ->
prepare_response(Sess, Rid, [], StreamStart)
end.
http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) ->
?DEBUG("Looking for session: ~p", [Sid]),
case mnesia:dirty_read({http_bind, Sid}) of
[] ->
{error, not_exists};
[#http_bind{pid = FsmRef, hold=Hold, to={To, StreamVersion}}=Sess] ->
NewStream =
case StreamStart of
true ->
{To, StreamVersion};
_ ->
""
end,
{gen_fsm:sync_send_all_state_event(
FsmRef, #http_put{rid = Rid, attrs = Attrs, payload = Payload,
payload_size = PayloadSize, hold = Hold,
stream = NewStream, ip = IP}, 30000), Sess}
end.
handle_http_put_error(Reason, #http_bind{pid=FsmRef, version=Version})
when Version >= 0 ->
gen_fsm:sync_send_all_state_event(FsmRef, {stop, {put_error,Reason}}),
case Reason of
not_exists ->
{200, ?HEADER,
xml:element_to_binary(
{xmlelement, "body",
[{"xmlns", ?NS_HTTP_BIND},
{"type", "terminate"},
{"condition", "item-not-found"}], []})};
bad_key ->
{200, ?HEADER,
xml:element_to_binary(
{xmlelement, "body",
[{"xmlns", ?NS_HTTP_BIND},
{"type", "terminate"},
{"condition", "item-not-found"}], []})};
polling_too_frequently ->
{200, ?HEADER,
xml:element_to_binary(
{xmlelement, "body",
[{"xmlns", ?NS_HTTP_BIND},
{"type", "terminate"},
{"condition", "policy-violation"}], []})}
end;
handle_http_put_error(Reason, #http_bind{pid=FsmRef}) ->
gen_fsm:sync_send_all_state_event(FsmRef,{stop, {put_error_no_version, Reason}}),
case Reason of
not_exists -> %% bad rid
?DEBUG("Closing HTTP bind session (Bad rid).", []),
{404, ?HEADER, ""};
bad_key ->
?DEBUG("Closing HTTP bind session (Bad key).", []),
{404, ?HEADER, ""};
polling_too_frequently ->
?DEBUG("Closing HTTP bind session (User polling too frequently).", []),
{403, ?HEADER, ""}
end.
%% Control RID ordering
rid_allow(none, _NewRid, _Attrs, _Hold, _MaxPause) ->
%% First request - nothing saved so far
{true, 0};
rid_allow(OldRid, NewRid, Attrs, Hold, MaxPause) ->
?DEBUG("Previous rid / New rid: ~p/~p", [OldRid, NewRid]),
if
%% We did not miss any packet, we can process it immediately:
NewRid == OldRid + 1 ->
case catch list_to_integer(
xml:get_attr_s("pause", Attrs)) of
{'EXIT', _} ->
{true, 0};
Pause1 when Pause1 =< MaxPause ->
?DEBUG("got pause: ~p", [Pause1]),
{true, Pause1};
_ ->
{true, 0}
end;
%% We have missed packets, we need to cached it to process it later on:
(OldRid < NewRid) and
(NewRid =< (OldRid + Hold + 1)) ->
buffer;
(NewRid =< OldRid) and
(NewRid > OldRid - Hold - 1) ->
repeat;
true ->
false
end.
update_shaper(ShaperState, PayloadSize) ->
{NewShaperState, Pause} = shaper:update(ShaperState, PayloadSize),
if
Pause > 0 ->
ShaperTimer = erlang:start_timer(Pause, self(), activate), %% MR: Seems timer is not needed. Activate is not handled
{NewShaperState, ShaperTimer};
true ->
{NewShaperState, undefined}
end.
prepare_response(Sess, Rid, OutputEls, StreamStart) ->
receive after Sess#http_bind.process_delay -> ok end,
case catch http_get(Sess, Rid) of
{ok, cancel} ->
%% actually it would be better if we could completely
%% cancel this request, but then we would have to hack
%% ejabberd_http and I'm too lazy now
{200, ?HEADER, "<body type='error' xmlns='"++?NS_HTTP_BIND++"'/>"};
{ok, empty} ->
{200, ?HEADER, "<body xmlns='"++?NS_HTTP_BIND++"'/>"};
{ok, terminate} ->
{200, ?HEADER, "<body type='terminate' xmlns='"++?NS_HTTP_BIND++"'/>"};
{ok, ROutPacket} ->
OutPacket = lists:reverse(ROutPacket),
?DEBUG("OutPacket: ~p", [OutputEls++OutPacket]),
prepare_outpacket_response(Sess, Rid, OutputEls++OutPacket, StreamStart);
{'EXIT', {shutdown, _}} ->
{200, ?HEADER, "<body type='terminate' condition='system-shutdown' xmlns='"++?NS_HTTP_BIND++"'/>"};
{'EXIT', _Reason} ->
{200, ?HEADER, "<body type='terminate' xmlns='"++?NS_HTTP_BIND++"'/>"}
end.
%% Send output payloads on establised sessions
prepare_outpacket_response(Sess, _Rid, OutPacket, false) ->
case catch send_outpacket(Sess, OutPacket) of
{'EXIT', _Reason} ->
?DEBUG("Error in sending packet ~p ", [_Reason]),
{200, ?HEADER,
"<body type='terminate' xmlns='"++
?NS_HTTP_BIND++"'/>"};
SendRes ->
SendRes
end;
%% Handle a new session along with its output payload
prepare_outpacket_response(#http_bind{id=Sid, wait=Wait,
hold=Hold, to=To}=_Sess,
_Rid, OutPacket, true) ->
case OutPacket of
[{xmlstreamstart, _, OutAttrs} | Els] ->
AuthID = xml:get_attr_s("id", OutAttrs),
From = xml:get_attr_s("from", OutAttrs),
Version = xml:get_attr_s("version", OutAttrs),
OutEls =
case Els of
[] ->
[];
[{xmlstreamelement,
{xmlelement, "stream:features",
StreamAttribs, StreamEls}}
| StreamTail] ->
TypedTail =
[check_default_xmlns(OEl) ||
{xmlstreamelement, OEl} <-
StreamTail],
[{xmlelement, "stream:features",
[{"xmlns:stream",
?NS_STREAM}] ++
StreamAttribs, StreamEls}] ++
TypedTail;
StreamTail ->
[check_default_xmlns(OEl) ||
{xmlstreamelement, OEl} <-
StreamTail]
end,
case OutEls of
[{xmlelement,
"stream:error",_,_}] ->
{200, ?HEADER, "<body type='terminate' "
"condition='host-unknown' "
"xmlns='"++?NS_HTTP_BIND++"'/>"};
_ ->
BOSH_attribs =
[{"authid", AuthID},
{"xmlns:xmpp", ?NS_BOSH},
{"xmlns:stream", ?NS_STREAM}] ++
case OutEls of
[] ->
[];
_ ->
[{"xmpp:version", Version}]
end,
MaxInactivity = get_max_inactivity(To, ?MAX_INACTIVITY),
MaxPause = get_max_pause(To),
{200, ?HEADER,
xml:element_to_binary(
{xmlelement,"body",
[{"xmlns",
?NS_HTTP_BIND},
{"sid", Sid},
{"wait", integer_to_list(Wait)},
{"requests", integer_to_list(Hold+1)},
{"inactivity",
integer_to_list(
trunc(MaxInactivity/1000))},
{"maxpause",
integer_to_list(MaxPause)},
{"polling",
integer_to_list(
trunc(?MIN_POLLING/1000000))},
{"ver", ?BOSH_VERSION},
{"from", From},
{"secure", "true"} %% we're always being secure
] ++ BOSH_attribs,OutEls})}
end;
_ ->
{200, ?HEADER, "<body type='terminate' "
"condition='internal-server-error' "
"xmlns='"++?NS_HTTP_BIND++"'/>"}
end.
http_get(#http_bind{pid = FsmRef, wait = Wait, hold = Hold}, Rid) ->
gen_fsm:sync_send_all_state_event(
FsmRef, {http_get, Rid, Wait, Hold}, 2 * ?MAX_WAIT * 1000).
send_outpacket(#http_bind{pid = FsmRef}, OutPacket) ->
case OutPacket of
[] ->
{200, ?HEADER, "<body xmlns='"++?NS_HTTP_BIND++"'/>"};
[{xmlstreamend, _}] ->
gen_fsm:sync_send_all_state_event(FsmRef,{stop,stream_closed}),
{200, ?HEADER, "<body xmlns='"++?NS_HTTP_BIND++"'/>"};
_ ->
%% TODO: We parse to add a default namespace to packet,
%% The spec says adding the jabber:client namespace if
%% mandatory, even if some implementation do not do that
%% change on packets.
%% I think this should be an option to avoid modifying
%% packet in most case.
AllElements =
lists:all(fun({xmlstreamelement,
{xmlelement, "stream:error", _, _}}) -> false;
({xmlstreamelement, _}) -> true;
({xmlstreamraw, _}) -> true;
(_) -> false
end, OutPacket),
case AllElements of
true ->
TypedEls = lists:foldl(fun({xmlstreamelement, El}, Acc) ->
Acc ++
[xml:element_to_string(
check_default_xmlns(El)
)];
({xmlstreamraw, R}, Acc) ->
Acc ++ [R]
end,
[],
OutPacket),
Body = "<body xmlns='"++?NS_HTTP_BIND++"'>"
++ TypedEls ++
"</body>",
?DEBUG(" --- outgoing data --- ~n~s~n --- END --- ~n",
[Body]),
{200, ?HEADER, Body};
false ->
case OutPacket of
[{xmlstreamstart, _, _} | SEls] ->
OutEls =
case SEls of
[{xmlstreamelement,
{xmlelement,
"stream:features",
StreamAttribs, StreamEls}} |
StreamTail] ->
TypedTail =
[check_default_xmlns(OEl) ||
{xmlstreamelement, OEl} <-
StreamTail],
[{xmlelement,
"stream:features",
[{"xmlns:stream",
?NS_STREAM}] ++
StreamAttribs, StreamEls}] ++
TypedTail;
StreamTail ->
[check_default_xmlns(OEl) ||
{xmlstreamelement, OEl} <-
StreamTail]
end,
{200, ?HEADER,
xml:element_to_binary(
{xmlelement,"body",
[{"xmlns",
?NS_HTTP_BIND}],
OutEls})};
_ ->
SErrCond =
lists:filter(
fun({xmlstreamelement,
{xmlelement, "stream:error",
_, _}}) ->
true;
(_) -> false
end, OutPacket),
StreamErrCond =
case SErrCond of
[] ->
null;
[{xmlstreamelement,
{xmlelement, _, _, _Cond} =
StreamErrorTag} | _] ->
[StreamErrorTag]
end,
gen_fsm:sync_send_all_state_event(FsmRef,
{stop, {stream_error,OutPacket}}),
case StreamErrCond of
null ->
{200, ?HEADER,
"<body type='terminate' "
"condition='internal-server-error' "
"xmlns='"++?NS_HTTP_BIND++"'/>"};
_ ->
{200, ?HEADER,
"<body type='terminate' "
"condition='remote-stream-error' "
"xmlns='"++?NS_HTTP_BIND++"' " ++
"xmlns:stream='"++?NS_STREAM++"'>" ++
elements_to_string(StreamErrCond) ++
"</body>"}
end
end
end
end.
parse_request(Data, PayloadSize, MaxStanzaSize) ->
?DEBUG("--- incoming data --- ~n~s~n --- END --- ", [Data]),
%% MR: I do not think it works if put put several elements in the
%% same body:
case xml_stream:parse_element(Data) of
{xmlelement, "body", Attrs, Els} ->
Xmlns = xml:get_attr_s("xmlns",Attrs),
if
Xmlns /= ?NS_HTTP_BIND ->
{error, bad_request};
true ->
case catch list_to_integer(xml:get_attr_s("rid", Attrs)) of
{'EXIT', _} ->
{error, bad_request};
Rid ->
%% I guess this is to remove XMLCDATA: Is it really needed ?
FixedEls =
lists:filter(
fun(I) ->
case I of
{xmlelement, _, _, _} ->
true;
_ ->
false
end
end, Els),
Sid = xml:get_attr_s("sid",Attrs),
if
PayloadSize =< MaxStanzaSize ->
{ok, {Sid, Rid, Attrs, FixedEls}};
true ->
{size_limit, Sid}
end
end
end;
{xmlelement, _Name, _Attrs, _Els} ->
{error, bad_request};
{error, _Reason} ->
{error, bad_request}
end.
send_receiver_reply(undefined, _Reply) ->
ok;
send_receiver_reply(Receiver, Reply) ->
gen_fsm:reply(Receiver, Reply).
%% Cancel timer and empty message queue.
cancel_timer(undefined) ->
ok;
cancel_timer(Timer) ->
erlang:cancel_timer(Timer),
receive
{timeout, Timer, _} ->
ok
after 0 ->
ok
end.
%% If client asked for a pause (pause > 0), we apply the pause value
%% as inactivity timer:
set_inactivity_timer(Pause, _MaxInactivity) when Pause > 0 ->
erlang:start_timer(Pause*1000, self(), []);
%% Otherwise, we apply the max_inactivity value as inactivity timer:
set_inactivity_timer(_Pause, MaxInactivity) ->
erlang:start_timer(MaxInactivity, self(), []).
%% TODO: Use tail recursion and list reverse ?
elements_to_string([]) ->
[];
elements_to_string([El | Els]) ->
[xml:element_to_binary(El)|elements_to_string(Els)].
%% @spec (To, Default::integer()) -> integer()
%% where To = [] | {Host::string(), Version::string()}
get_max_inactivity({Host, _}, Default) ->
case gen_mod:get_module_opt(Host, mod_http_bind, max_inactivity, undefined) of
Seconds when is_integer(Seconds) ->
Seconds * 1000;
undefined ->
Default
end;
get_max_inactivity(_, Default) ->
Default.
get_max_pause({Host, _}) ->
gen_mod:get_module_opt(Host, mod_http_bind, max_pause, ?MAX_PAUSE);
get_max_pause(_) ->
?MAX_PAUSE.
%% Current time as integer
tnow() ->
{TMegSec, TSec, TMSec} = now(),
(TMegSec * 1000000 + TSec) * 1000000 + TMSec.
check_default_xmlns({xmlelement, Name, Attrs, Els} = El) ->
case xml:get_tag_attr_s("xmlns", El) of
"" -> {xmlelement, Name, [{"xmlns", ?NS_CLIENT} | Attrs], Els};
_ -> El
end;
check_default_xmlns(El) ->
El.
%% Check that mod_http_bind has been defined in config file.
%% Print a warning in log file if this is not the case.
check_bind_module(XmppDomain) ->
case gen_mod:is_loaded(XmppDomain, mod_http_bind) of
true -> ok;
false -> ?ERROR_MSG("You are trying to use BOSH (HTTP Bind) in host ~p,"
" but the module mod_http_bind is not started in"
" that host. Configure your BOSH client to connect"
" to the correct host, or add your desired host to"
" the configuration, or check your 'modules'"
" section in your ejabberd configuration file.",
[XmppDomain])
end.