mirror of
https://github.com/processone/ejabberd.git
synced 2024-11-24 16:23:40 +01:00
Implement packets reordering to avoid race conditions (EJAB-724).(thanks to Michael Remond)
SVN Revision: 2243
This commit is contained in:
parent
b8478c50b9
commit
3e9b5d4ed4
@ -4,7 +4,7 @@
|
|||||||
%%% Purpose : Implements XMPP over BOSH (XEP-0205) (formerly known as
|
%%% Purpose : Implements XMPP over BOSH (XEP-0205) (formerly known as
|
||||||
%%% HTTP Binding)
|
%%% HTTP Binding)
|
||||||
%%% Created : 21 Sep 2005 by Stefan Strigler <steve@zeank.in-berlin.de>
|
%%% Created : 21 Sep 2005 by Stefan Strigler <steve@zeank.in-berlin.de>
|
||||||
%%% Id : $Id: ejabberd_http_bind.erl 694 2008-07-15 13:27:03Z alexey $
|
%%% Id : $Id: ejabberd_http_bind.erl 720 2008-09-17 15:52:58Z mremond $
|
||||||
%%%----------------------------------------------------------------------
|
%%%----------------------------------------------------------------------
|
||||||
|
|
||||||
-module(ejabberd_http_bind).
|
-module(ejabberd_http_bind).
|
||||||
@ -58,7 +58,8 @@
|
|||||||
ctime = 0,
|
ctime = 0,
|
||||||
timer,
|
timer,
|
||||||
pause=0,
|
pause=0,
|
||||||
req_list = [], % list of requests
|
unprocessed_req_list = [], % list of request that have been delayed for proper reordering
|
||||||
|
req_list = [], % list of requests (cache)
|
||||||
ip = ?NULL_PEER
|
ip = ?NULL_PEER
|
||||||
}).
|
}).
|
||||||
|
|
||||||
@ -333,21 +334,20 @@ handle_sync_event(stop, _From, _StateName, StateData) ->
|
|||||||
Reply = ok,
|
Reply = ok,
|
||||||
{stop, normal, Reply, StateData};
|
{stop, normal, Reply, StateData};
|
||||||
|
|
||||||
handle_sync_event({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP},
|
%% HTTP PUT: Receive packets from the client
|
||||||
|
handle_sync_event({http_put, Rid, Attrs, _Payload, Hold, _StreamTo, _IP}=Request,
|
||||||
_From, StateName, StateData) ->
|
_From, StateName, StateData) ->
|
||||||
Key = xml:get_attr_s("key", Attrs),
|
%% Check if Rid valid
|
||||||
NewKey = xml:get_attr_s("newkey", Attrs),
|
RidAllow =
|
||||||
%% check if Rid valid
|
case StateData#state.rid of
|
||||||
RidAllow = case StateData#state.rid of
|
|
||||||
none ->
|
none ->
|
||||||
%% first request - nothing saved so far
|
%% First request - nothing saved so far
|
||||||
{true, 0};
|
{true, 0};
|
||||||
OldRid ->
|
OldRid ->
|
||||||
?DEBUG("state.rid/cur rid: ~p/~p",
|
?DEBUG("state.rid/cur rid: ~p/~p", [OldRid, Rid]),
|
||||||
[OldRid, Rid]),
|
|
||||||
if
|
if
|
||||||
(OldRid < Rid) and
|
%% We did not miss any packet, we can process it immediately:
|
||||||
(Rid =< (OldRid + Hold + 1)) ->
|
Rid == OldRid + 1 ->
|
||||||
case catch list_to_integer(
|
case catch list_to_integer(
|
||||||
xml:get_attr_s("pause", Attrs)) of
|
xml:get_attr_s("pause", Attrs)) of
|
||||||
{'EXIT', _} ->
|
{'EXIT', _} ->
|
||||||
@ -358,6 +358,10 @@ handle_sync_event({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP},
|
|||||||
_ ->
|
_ ->
|
||||||
{true, 0}
|
{true, 0}
|
||||||
end;
|
end;
|
||||||
|
%% We have missed packets, we need to cached it to process it later on:
|
||||||
|
(OldRid < Rid) and
|
||||||
|
(Rid =< (OldRid + Hold + 1)) ->
|
||||||
|
buffer;
|
||||||
(Rid =< OldRid) and
|
(Rid =< OldRid) and
|
||||||
(Rid > OldRid - Hold - 1) ->
|
(Rid > OldRid - Hold - 1) ->
|
||||||
repeat;
|
repeat;
|
||||||
@ -365,155 +369,22 @@ handle_sync_event({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP},
|
|||||||
false
|
false
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
%% check if key valid
|
|
||||||
KeyAllow = case RidAllow of
|
%% Check if Rid is in sequence or out of sequence:
|
||||||
repeat ->
|
|
||||||
true;
|
|
||||||
false ->
|
|
||||||
false;
|
|
||||||
{true, _} ->
|
|
||||||
case StateData#state.key of
|
|
||||||
"" ->
|
|
||||||
true;
|
|
||||||
OldKey ->
|
|
||||||
NextKey = jlib:tolower(
|
|
||||||
hex(binary_to_list(
|
|
||||||
crypto:sha(Key)))),
|
|
||||||
?DEBUG("Key/OldKey/NextKey: ~s/~s/~s",
|
|
||||||
[Key, OldKey, NextKey]),
|
|
||||||
if
|
|
||||||
OldKey == NextKey ->
|
|
||||||
true;
|
|
||||||
true ->
|
|
||||||
?DEBUG("wrong key: ~s",[Key]),
|
|
||||||
false
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
{TMegSec, TSec, TMSec} = now(),
|
|
||||||
TNow = (TMegSec * 1000000 + TSec) * 1000000 + TMSec,
|
|
||||||
LastPoll = if
|
|
||||||
Payload == "" ->
|
|
||||||
TNow;
|
|
||||||
true ->
|
|
||||||
0
|
|
||||||
end,
|
|
||||||
if
|
|
||||||
(Payload == "") and
|
|
||||||
(Hold == 0) and
|
|
||||||
(TNow - StateData#state.last_poll < ?MIN_POLLING) ->
|
|
||||||
Reply = {error, polling_too_frequently},
|
|
||||||
{reply, Reply, StateName, StateData};
|
|
||||||
KeyAllow ->
|
|
||||||
case RidAllow of
|
case RidAllow of
|
||||||
false ->
|
buffer ->
|
||||||
Reply = {error, not_exists},
|
?DEBUG("Buffered request: ~p", [Request]),
|
||||||
{reply, Reply, StateName, StateData};
|
%% Request is out of sequence:
|
||||||
repeat ->
|
PendingRequests = StateData#state.unprocessed_req_list,
|
||||||
?DEBUG("REPEATING ~p", [Rid]),
|
%% In case an existing RID was already buffered:
|
||||||
[Out | _XS] = [El#hbr.out ||
|
Requests = lists:keydelete(Rid, 2, PendingRequests),
|
||||||
El <- StateData#state.req_list,
|
{reply, ok, StateName, StateData#state{unprocessed_req_list=[Request|Requests]}};
|
||||||
El#hbr.rid == Rid],
|
|
||||||
case Out of
|
|
||||||
[[] | OutPacket] ->
|
|
||||||
Reply = {repeat, OutPacket};
|
|
||||||
_ ->
|
_ ->
|
||||||
Reply = {repeat, Out}
|
%% Request is in sequence:
|
||||||
end,
|
process_http_put(Request, StateName, StateData, RidAllow)
|
||||||
{reply, Reply, StateName,
|
|
||||||
StateData#state{input = "cancel", last_poll = LastPoll}};
|
|
||||||
{true, Pause} ->
|
|
||||||
SaveKey = if
|
|
||||||
NewKey == "" ->
|
|
||||||
Key;
|
|
||||||
true ->
|
|
||||||
NewKey
|
|
||||||
end,
|
|
||||||
?DEBUG(" -- SaveKey: ~s~n", [SaveKey]),
|
|
||||||
|
|
||||||
%% save request
|
|
||||||
ReqList = [#hbr{rid=Rid,
|
|
||||||
key=StateData#state.key,
|
|
||||||
in=StateData#state.input,
|
|
||||||
out=StateData#state.output
|
|
||||||
} |
|
|
||||||
[El || El <- StateData#state.req_list,
|
|
||||||
El#hbr.rid < Rid,
|
|
||||||
El#hbr.rid > (Rid - 1 - Hold)]
|
|
||||||
],
|
|
||||||
%% ?DEBUG("reqlist: ~p", [ReqList]),
|
|
||||||
|
|
||||||
%% setup next timer
|
|
||||||
if
|
|
||||||
StateData#state.timer /= undefined ->
|
|
||||||
cancel_timer(StateData#state.timer);
|
|
||||||
true ->
|
|
||||||
ok
|
|
||||||
end,
|
|
||||||
Timer = if
|
|
||||||
Pause > 0 ->
|
|
||||||
erlang:start_timer(
|
|
||||||
Pause*1000, self(), []);
|
|
||||||
true ->
|
|
||||||
erlang:start_timer(
|
|
||||||
?MAX_INACTIVITY, self(), [])
|
|
||||||
end,
|
|
||||||
case StateData#state.waiting_input of
|
|
||||||
false ->
|
|
||||||
Input = Payload ++ [StateData#state.input],
|
|
||||||
Reply = ok,
|
|
||||||
{reply, Reply, StateName,
|
|
||||||
StateData#state{input = Input,
|
|
||||||
rid = Rid,
|
|
||||||
key = SaveKey,
|
|
||||||
ctime = TNow,
|
|
||||||
timer = Timer,
|
|
||||||
pause = Pause,
|
|
||||||
last_poll = LastPoll,
|
|
||||||
req_list = ReqList,
|
|
||||||
ip = IP
|
|
||||||
}};
|
|
||||||
{Receiver, _Tag} ->
|
|
||||||
SendPacket =
|
|
||||||
case StreamTo of
|
|
||||||
{To, ""} ->
|
|
||||||
["<stream:stream to='", To, "' "
|
|
||||||
"xmlns='"++?NS_CLIENT++"' "
|
|
||||||
"xmlns:stream='"++?NS_STREAM++"'>"]
|
|
||||||
++ Payload;
|
|
||||||
{To, Version} ->
|
|
||||||
["<stream:stream to='", To, "' "
|
|
||||||
"xmlns='"++?NS_CLIENT++"' "
|
|
||||||
"version='", Version, "' "
|
|
||||||
"xmlns:stream='"++?NS_STREAM++"'>"]
|
|
||||||
++ Payload;
|
|
||||||
_ ->
|
|
||||||
Payload
|
|
||||||
end,
|
|
||||||
?DEBUG("really sending now: ~s", [SendPacket]),
|
|
||||||
Receiver ! {tcp, StateData#state.socket,
|
|
||||||
list_to_binary(SendPacket)},
|
|
||||||
Reply = ok,
|
|
||||||
{reply, Reply, StateName,
|
|
||||||
StateData#state{waiting_input = false,
|
|
||||||
last_receiver = Receiver,
|
|
||||||
input = "",
|
|
||||||
rid = Rid,
|
|
||||||
key = SaveKey,
|
|
||||||
ctime = TNow,
|
|
||||||
timer = Timer,
|
|
||||||
pause = Pause,
|
|
||||||
last_poll = LastPoll,
|
|
||||||
req_list = ReqList,
|
|
||||||
ip = IP
|
|
||||||
}}
|
|
||||||
end
|
|
||||||
end;
|
|
||||||
true ->
|
|
||||||
Reply = {error, bad_key},
|
|
||||||
{reply, Reply, StateName, StateData}
|
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
%% HTTP GET: send packets to the client
|
||||||
handle_sync_event({http_get, Rid, Wait, Hold}, From, StateName, StateData) ->
|
handle_sync_event({http_get, Rid, Wait, Hold}, From, StateName, StateData) ->
|
||||||
%% setup timer
|
%% setup timer
|
||||||
if
|
if
|
||||||
@ -695,6 +566,173 @@ terminate(_Reason, _StateName, StateData) ->
|
|||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%----------------------------------------------------------------------
|
%%%----------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% PUT / Get processing:
|
||||||
|
process_http_put({http_put, Rid, Attrs, Payload, Hold, StreamTo, IP},
|
||||||
|
StateName, StateData, RidAllow) ->
|
||||||
|
%% Check if key valid
|
||||||
|
Key = xml:get_attr_s("key", Attrs),
|
||||||
|
NewKey = xml:get_attr_s("newkey", Attrs),
|
||||||
|
KeyAllow =
|
||||||
|
case RidAllow of
|
||||||
|
repeat ->
|
||||||
|
true;
|
||||||
|
false ->
|
||||||
|
false;
|
||||||
|
{true, _} ->
|
||||||
|
case StateData#state.key of
|
||||||
|
"" ->
|
||||||
|
true;
|
||||||
|
OldKey ->
|
||||||
|
NextKey = jlib:tolower(
|
||||||
|
hex(binary_to_list(
|
||||||
|
crypto:sha(Key)))),
|
||||||
|
?DEBUG("Key/OldKey/NextKey: ~s/~s/~s", [Key, OldKey, NextKey]),
|
||||||
|
if
|
||||||
|
OldKey == NextKey ->
|
||||||
|
true;
|
||||||
|
true ->
|
||||||
|
?DEBUG("wrong key: ~s",[Key]),
|
||||||
|
false
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
{TMegSec, TSec, TMSec} = now(),
|
||||||
|
TNow = (TMegSec * 1000000 + TSec) * 1000000 + TMSec,
|
||||||
|
LastPoll = if
|
||||||
|
Payload == "" ->
|
||||||
|
TNow;
|
||||||
|
true ->
|
||||||
|
0
|
||||||
|
end,
|
||||||
|
if
|
||||||
|
(Payload == "") and
|
||||||
|
(Hold == 0) and
|
||||||
|
(TNow - StateData#state.last_poll < ?MIN_POLLING) ->
|
||||||
|
Reply = {error, polling_too_frequently},
|
||||||
|
{reply, Reply, StateName, StateData};
|
||||||
|
KeyAllow ->
|
||||||
|
case RidAllow of
|
||||||
|
false ->
|
||||||
|
Reply = {error, not_exists},
|
||||||
|
{reply, Reply, StateName, StateData};
|
||||||
|
repeat ->
|
||||||
|
?DEBUG("REPEATING ~p", [Rid]),
|
||||||
|
[Out | _XS] = [El#hbr.out ||
|
||||||
|
El <- StateData#state.req_list,
|
||||||
|
El#hbr.rid == Rid],
|
||||||
|
case Out of
|
||||||
|
[[] | OutPacket] ->
|
||||||
|
Reply = {repeat, OutPacket};
|
||||||
|
_ ->
|
||||||
|
Reply = {repeat, Out}
|
||||||
|
end,
|
||||||
|
{reply, Reply, StateName,
|
||||||
|
StateData#state{input = "cancel", last_poll = LastPoll}};
|
||||||
|
{true, Pause} ->
|
||||||
|
SaveKey = if
|
||||||
|
NewKey == "" ->
|
||||||
|
Key;
|
||||||
|
true ->
|
||||||
|
NewKey
|
||||||
|
end,
|
||||||
|
?DEBUG(" -- SaveKey: ~s~n", [SaveKey]),
|
||||||
|
|
||||||
|
%% save request
|
||||||
|
ReqList = [#hbr{rid=Rid,
|
||||||
|
key=StateData#state.key,
|
||||||
|
in=StateData#state.input,
|
||||||
|
out=StateData#state.output
|
||||||
|
} |
|
||||||
|
[El || El <- StateData#state.req_list,
|
||||||
|
El#hbr.rid < Rid,
|
||||||
|
El#hbr.rid > (Rid - 1 - Hold)]
|
||||||
|
],
|
||||||
|
%% ?DEBUG("reqlist: ~p", [ReqList]),
|
||||||
|
|
||||||
|
%% setup next timer
|
||||||
|
if
|
||||||
|
StateData#state.timer /= undefined ->
|
||||||
|
cancel_timer(StateData#state.timer);
|
||||||
|
true ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
Timer = if
|
||||||
|
Pause > 0 ->
|
||||||
|
erlang:start_timer(
|
||||||
|
Pause*1000, self(), []);
|
||||||
|
true ->
|
||||||
|
erlang:start_timer(
|
||||||
|
?MAX_INACTIVITY, self(), [])
|
||||||
|
end,
|
||||||
|
case StateData#state.waiting_input of
|
||||||
|
false ->
|
||||||
|
Input = Payload ++ [StateData#state.input],
|
||||||
|
Reply = ok,
|
||||||
|
process_buffered_request(Reply, StateName,
|
||||||
|
StateData#state{input = Input,
|
||||||
|
rid = Rid,
|
||||||
|
key = SaveKey,
|
||||||
|
ctime = TNow,
|
||||||
|
timer = Timer,
|
||||||
|
pause = Pause,
|
||||||
|
last_poll = LastPoll,
|
||||||
|
req_list = ReqList,
|
||||||
|
ip = IP
|
||||||
|
});
|
||||||
|
{Receiver, _Tag} ->
|
||||||
|
SendPacket =
|
||||||
|
case StreamTo of
|
||||||
|
{To, ""} ->
|
||||||
|
["<stream:stream to='", To, "' "
|
||||||
|
"xmlns='"++?NS_CLIENT++"' "
|
||||||
|
"xmlns:stream='"++?NS_STREAM++"'>"]
|
||||||
|
++ Payload;
|
||||||
|
{To, Version} ->
|
||||||
|
["<stream:stream to='", To, "' "
|
||||||
|
"xmlns='"++?NS_CLIENT++"' "
|
||||||
|
"version='", Version, "' "
|
||||||
|
"xmlns:stream='"++?NS_STREAM++"'>"]
|
||||||
|
++ Payload;
|
||||||
|
_ ->
|
||||||
|
Payload
|
||||||
|
end,
|
||||||
|
?DEBUG("really sending now: ~s", [SendPacket]),
|
||||||
|
Receiver ! {tcp, StateData#state.socket,
|
||||||
|
list_to_binary(SendPacket)},
|
||||||
|
Reply = ok,
|
||||||
|
process_buffered_request(Reply, StateName,
|
||||||
|
StateData#state{waiting_input = false,
|
||||||
|
last_receiver = Receiver,
|
||||||
|
input = "",
|
||||||
|
rid = Rid,
|
||||||
|
key = SaveKey,
|
||||||
|
ctime = TNow,
|
||||||
|
timer = Timer,
|
||||||
|
pause = Pause,
|
||||||
|
last_poll = LastPoll,
|
||||||
|
req_list = ReqList,
|
||||||
|
ip = IP
|
||||||
|
})
|
||||||
|
end
|
||||||
|
end;
|
||||||
|
true ->
|
||||||
|
Reply = {error, bad_key},
|
||||||
|
{reply, Reply, StateName, StateData}
|
||||||
|
end.
|
||||||
|
|
||||||
|
process_buffered_request(Reply, StateName, StateData) ->
|
||||||
|
Rid = StateData#state.rid,
|
||||||
|
Requests = StateData#state.unprocessed_req_list,
|
||||||
|
case lists:keysearch(Rid+1, 2, Requests) of
|
||||||
|
{value, Request} ->
|
||||||
|
?DEBUG("Processing buffered request: ~p", [Request]),
|
||||||
|
NewRequests = Requests -- [Request],
|
||||||
|
handle_sync_event(Request, undefined, StateName,
|
||||||
|
StateData#state{unprocessed_req_list=NewRequests});
|
||||||
|
_ ->
|
||||||
|
{reply, Reply, StateName, StateData}
|
||||||
|
end.
|
||||||
|
|
||||||
handle_http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) ->
|
handle_http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) ->
|
||||||
case http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) of
|
case http_put(Sid, Rid, Attrs, Payload, StreamStart, IP) of
|
||||||
{error, not_exists} ->
|
{error, not_exists} ->
|
||||||
|
Loading…
Reference in New Issue
Block a user