25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-11-22 16:20:52 +01:00

Implement packets reordering to avoid race conditions (EJAB-724).(thanks to Michael Remond)

SVN Revision: 2306
This commit is contained in:
Badlop 2009-06-16 18:26:44 +00:00
parent 1d1f72fdc2
commit 6e52ca3f4e

View File

@ -4,7 +4,7 @@
%%% Purpose : Implements XMPP over BOSH (XEP-0205) (formerly known as %%% Purpose : Implements XMPP over BOSH (XEP-0205) (formerly known as
%%% HTTP Binding) %%% HTTP Binding)
%%% Created : 21 Sep 2005 by Stefan Strigler <steve@zeank.in-berlin.de> %%% Created : 21 Sep 2005 by Stefan Strigler <steve@zeank.in-berlin.de>
%%% Id : $Id: ejabberd_http_bind.erl 694 2008-07-15 13:27:03Z alexey $ %%% Id : $Id: ejabberd_http_bind.erl 720 2008-09-17 15:52:58Z mremond $
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
-module(ejabberd_http_bind). -module(ejabberd_http_bind).
@ -58,7 +58,8 @@
ctime = 0, ctime = 0,
timer, timer,
pause=0, pause=0,
req_list = [], % list of requests unprocessed_req_list = [], % list of request that have been delayed for proper reordering
req_list = [], % list of requests (cache)
ip = ?NULL_PEER ip = ?NULL_PEER
}). }).
@ -333,21 +334,20 @@ handle_sync_event(stop, _From, _StateName, StateData) ->
Reply = ok, Reply = ok,
{stop, normal, Reply, StateData}; {stop, normal, Reply, StateData};
handle_sync_event({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP}, %% HTTP PUT: Receive packets from the client
handle_sync_event({http_put, Rid, Attrs, _Payload, Hold, _StreamTo, _IP}=Request,
_From, StateName, StateData) -> _From, StateName, StateData) ->
Key = xml:get_attr_s("key", Attrs), %% Check if Rid valid
NewKey = xml:get_attr_s("newkey", Attrs), RidAllow =
%% check if Rid valid case StateData#state.rid of
RidAllow = case StateData#state.rid of
none -> none ->
%% first request - nothing saved so far %% First request - nothing saved so far
{true, 0}; {true, 0};
OldRid -> OldRid ->
?DEBUG("state.rid/cur rid: ~p/~p", ?DEBUG("state.rid/cur rid: ~p/~p", [OldRid, Rid]),
[OldRid, Rid]),
if if
(OldRid < Rid) and %% We did not miss any packet, we can process it immediately:
(Rid =< (OldRid + Hold + 1)) -> Rid == OldRid + 1 ->
case catch list_to_integer( case catch list_to_integer(
xml:get_attr_s("pause", Attrs)) of xml:get_attr_s("pause", Attrs)) of
{'EXIT', _} -> {'EXIT', _} ->
@ -358,6 +358,10 @@ handle_sync_event({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP},
_ -> _ ->
{true, 0} {true, 0}
end; end;
%% We have missed packets, we need to cached it to process it later on:
(OldRid < Rid) and
(Rid =< (OldRid + Hold + 1)) ->
buffer;
(Rid =< OldRid) and (Rid =< OldRid) and
(Rid > OldRid - Hold - 1) -> (Rid > OldRid - Hold - 1) ->
repeat; repeat;
@ -365,155 +369,22 @@ handle_sync_event({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP},
false false
end end
end, end,
%% check if key valid
KeyAllow = case RidAllow of %% Check if Rid is in sequence or out of sequence:
repeat ->
true;
false ->
false;
{true, _} ->
case StateData#state.key of
"" ->
true;
OldKey ->
NextKey = jlib:tolower(
hex(binary_to_list(
crypto:sha(Key)))),
?DEBUG("Key/OldKey/NextKey: ~s/~s/~s",
[Key, OldKey, NextKey]),
if
OldKey == NextKey ->
true;
true ->
?DEBUG("wrong key: ~s",[Key]),
false
end
end
end,
{TMegSec, TSec, TMSec} = now(),
TNow = (TMegSec * 1000000 + TSec) * 1000000 + TMSec,
LastPoll = if
Payload == "" ->
TNow;
true ->
0
end,
if
(Payload == "") and
(Hold == 0) and
(TNow - StateData#state.last_poll < ?MIN_POLLING) ->
Reply = {error, polling_too_frequently},
{reply, Reply, StateName, StateData};
KeyAllow ->
case RidAllow of case RidAllow of
false -> buffer ->
Reply = {error, not_exists}, ?DEBUG("Buffered request: ~p", [Request]),
{reply, Reply, StateName, StateData}; %% Request is out of sequence:
repeat -> PendingRequests = StateData#state.unprocessed_req_list,
?DEBUG("REPEATING ~p", [Rid]), %% In case an existing RID was already buffered:
[Out | _XS] = [El#hbr.out || Requests = lists:keydelete(Rid, 2, PendingRequests),
El <- StateData#state.req_list, {reply, ok, StateName, StateData#state{unprocessed_req_list=[Request|Requests]}};
El#hbr.rid == Rid],
case Out of
[[] | OutPacket] ->
Reply = {repeat, OutPacket};
_ -> _ ->
Reply = {repeat, Out} %% Request is in sequence:
end, process_http_put(Request, StateName, StateData, RidAllow)
{reply, Reply, StateName,
StateData#state{input = "cancel", last_poll = LastPoll}};
{true, Pause} ->
SaveKey = if
NewKey == "" ->
Key;
true ->
NewKey
end,
?DEBUG(" -- SaveKey: ~s~n", [SaveKey]),
%% save request
ReqList = [#hbr{rid=Rid,
key=StateData#state.key,
in=StateData#state.input,
out=StateData#state.output
} |
[El || El <- StateData#state.req_list,
El#hbr.rid < Rid,
El#hbr.rid > (Rid - 1 - Hold)]
],
%% ?DEBUG("reqlist: ~p", [ReqList]),
%% setup next timer
if
StateData#state.timer /= undefined ->
cancel_timer(StateData#state.timer);
true ->
ok
end,
Timer = if
Pause > 0 ->
erlang:start_timer(
Pause*1000, self(), []);
true ->
erlang:start_timer(
?MAX_INACTIVITY, self(), [])
end,
case StateData#state.waiting_input of
false ->
Input = Payload ++ [StateData#state.input],
Reply = ok,
{reply, Reply, StateName,
StateData#state{input = Input,
rid = Rid,
key = SaveKey,
ctime = TNow,
timer = Timer,
pause = Pause,
last_poll = LastPoll,
req_list = ReqList,
ip = IP
}};
{Receiver, _Tag} ->
SendPacket =
case StreamTo of
{To, ""} ->
["<stream:stream to='", To, "' "
"xmlns='"++?NS_CLIENT++"' "
"xmlns:stream='"++?NS_STREAM++"'>"]
++ Payload;
{To, Version} ->
["<stream:stream to='", To, "' "
"xmlns='"++?NS_CLIENT++"' "
"version='", Version, "' "
"xmlns:stream='"++?NS_STREAM++"'>"]
++ Payload;
_ ->
Payload
end,
?DEBUG("really sending now: ~s", [SendPacket]),
Receiver ! {tcp, StateData#state.socket,
list_to_binary(SendPacket)},
Reply = ok,
{reply, Reply, StateName,
StateData#state{waiting_input = false,
last_receiver = Receiver,
input = "",
rid = Rid,
key = SaveKey,
ctime = TNow,
timer = Timer,
pause = Pause,
last_poll = LastPoll,
req_list = ReqList,
ip = IP
}}
end
end;
true ->
Reply = {error, bad_key},
{reply, Reply, StateName, StateData}
end; end;
%% HTTP GET: send packets to the client
handle_sync_event({http_get, Rid, Wait, Hold}, From, StateName, StateData) -> handle_sync_event({http_get, Rid, Wait, Hold}, From, StateName, StateData) ->
%% setup timer %% setup timer
if if
@ -695,6 +566,173 @@ terminate(_Reason, _StateName, StateData) ->
%%% Internal functions %%% Internal functions
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
%% PUT / Get processing:
process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP},
StateName, StateData, RidAllow) ->
%% Check if key valid
Key = xml:get_attr_s("key", Attrs),
NewKey = xml:get_attr_s("newkey", Attrs),
KeyAllow =
case RidAllow of
repeat ->
true;
false ->
false;
{true, _} ->
case StateData#state.key of
"" ->
true;
OldKey ->
NextKey = jlib:tolower(
hex(binary_to_list(
crypto:sha(Key)))),
?DEBUG("Key/OldKey/NextKey: ~s/~s/~s", [Key, OldKey, NextKey]),
if
OldKey == NextKey ->
true;
true ->
?DEBUG("wrong key: ~s",[Key]),
false
end
end
end,
{TMegSec, TSec, TMSec} = now(),
TNow = (TMegSec * 1000000 + TSec) * 1000000 + TMSec,
LastPoll = if
Payload == "" ->
TNow;
true ->
0
end,
if
(Payload == "") and
(Hold == 0) and
(TNow - StateData#state.last_poll < ?MIN_POLLING) ->
Reply = {error, polling_too_frequently},
{reply, Reply, StateName, StateData};
KeyAllow ->
case RidAllow of
false ->
Reply = {error, not_exists},
{reply, Reply, StateName, StateData};
repeat ->
?DEBUG("REPEATING ~p", [Rid]),
[Out | _XS] = [El#hbr.out ||
El <- StateData#state.req_list,
El#hbr.rid == Rid],
case Out of
[[] | OutPacket] ->
Reply = {repeat, OutPacket};
_ ->
Reply = {repeat, Out}
end,
{reply, Reply, StateName,
StateData#state{input = "cancel", last_poll = LastPoll}};
{true, Pause} ->
SaveKey = if
NewKey == "" ->
Key;
true ->
NewKey
end,
?DEBUG(" -- SaveKey: ~s~n", [SaveKey]),
%% save request
ReqList = [#hbr{rid=Rid,
key=StateData#state.key,
in=StateData#state.input,
out=StateData#state.output
} |
[El || El <- StateData#state.req_list,
El#hbr.rid < Rid,
El#hbr.rid > (Rid - 1 - Hold)]
],
%% ?DEBUG("reqlist: ~p", [ReqList]),
%% setup next timer
if
StateData#state.timer /= undefined ->
cancel_timer(StateData#state.timer);
true ->
ok
end,
Timer = if
Pause > 0 ->
erlang:start_timer(
Pause*1000, self(), []);
true ->
erlang:start_timer(
?MAX_INACTIVITY, self(), [])
end,
case StateData#state.waiting_input of
false ->
Input = Payload ++ [StateData#state.input],
Reply = ok,
process_buffered_request(Reply, StateName,
StateData#state{input = Input,
rid = Rid,
key = SaveKey,
ctime = TNow,
timer = Timer,
pause = Pause,
last_poll = LastPoll,
req_list = ReqList,
ip = IP
});
{Receiver, _Tag} ->
SendPacket =
case StreamTo of
{To, ""} ->
["<stream:stream to='", To, "' "
"xmlns='"++?NS_CLIENT++"' "
"xmlns:stream='"++?NS_STREAM++"'>"]
++ Payload;
{To, Version} ->
["<stream:stream to='", To, "' "
"xmlns='"++?NS_CLIENT++"' "
"version='", Version, "' "
"xmlns:stream='"++?NS_STREAM++"'>"]
++ Payload;
_ ->
Payload
end,
?DEBUG("really sending now: ~s", [SendPacket]),
Receiver ! {tcp, StateData#state.socket,
list_to_binary(SendPacket)},
Reply = ok,
process_buffered_request(Reply, StateName,
StateData#state{waiting_input = false,
last_receiver = Receiver,
input = "",
rid = Rid,
key = SaveKey,
ctime = TNow,
timer = Timer,
pause = Pause,
last_poll = LastPoll,
req_list = ReqList,
ip = IP
})
end
end;
true ->
Reply = {error, bad_key},
{reply, Reply, StateName, StateData}
end.
process_buffered_request(Reply, StateName, StateData) ->
Rid = StateData#state.rid,
Requests = StateData#state.unprocessed_req_list,
case lists:keysearch(Rid+1, 2, Requests) of
{value, Request} ->
?DEBUG("Processing buffered request: ~p", [Request]),
NewRequests = Requests -- [Request],
handle_sync_event(Request, undefined, StateName,
StateData#state{unprocessed_req_list=NewRequests});
_ ->
{reply, Reply, StateName, StateData}
end.
handle_http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) -> handle_http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) ->
case http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) of case http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) of
{error, not_exists} -> {error, not_exists} ->