Revert "Apply shaping to websocket connections"

This reverts commit dda89aeda0.
This commit is contained in:
Frank Diebolt 2019-01-30 10:37:26 +01:00
parent dda89aeda0
commit 7df03ef56f
2 changed files with 18 additions and 52 deletions

View File

@ -108,8 +108,9 @@ close({http_ws, FsmRef, _IP}) ->
reset_stream({http_ws, _FsmRef, _IP} = Socket) -> reset_stream({http_ws, _FsmRef, _IP} = Socket) ->
Socket. Socket.
change_shaper({http_ws, FsmRef, _IP}, Shaper) -> change_shaper({http_ws, _FsmRef, _IP}, _Shaper) ->
p1_fsm:send_all_state_event(FsmRef, {new_shaper, Shaper}). %% TODO???
ok.
get_transport(_Socket) -> get_transport(_Socket) ->
websocket. websocket.
@ -168,10 +169,7 @@ handle_event({activate, From}, StateName, State) ->
end, Input), end, Input),
State#state{active = false, input = []} State#state{active = false, input = []}
end, end,
{next_state, StateName, State1#state{c2s_pid = From}}; {next_state, StateName, State1#state{c2s_pid = From}}.
handle_event({new_shaper, Shaper}, StateName, #state{ws = {_, WsPid}} = StateData) ->
WsPid ! {new_shaper, Shaper},
{next_state, StateName, StateData}.
handle_sync_event({send_xml, Packet}, _From, StateName, handle_sync_event({send_xml, Packet}, _From, StateName,
#state{ws = {_, WsPid}, rfc_compilant = R} = StateData) -> #state{ws = {_, WsPid}, rfc_compilant = R} = StateData) ->

View File

@ -141,7 +141,7 @@ connect(#ws{socket = Socket, sockmod = SockMod} = Ws, WsLoop) ->
_ -> _ ->
SockMod:setopts(Socket, [{packet, 0}, {active, true}]) SockMod:setopts(Socket, [{packet, 0}, {active, true}])
end, end,
ws_loop(none, Socket, WsHandleLoopPid, SockMod, none). ws_loop(none, Socket, WsHandleLoopPid, SockMod).
handshake(#ws{headers = Headers} = State) -> handshake(#ws{headers = Headers} = State) ->
{_, Key} = lists:keyfind(<<"Sec-Websocket-Key">>, 1, {_, Key} = lists:keyfind(<<"Sec-Websocket-Key">>, 1,
@ -174,26 +174,18 @@ find_subprotocol(Headers) ->
end. end.
ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode, Shaper) -> ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode) ->
receive receive
{DataType, _Socket, Data} when DataType =:= tcp orelse DataType =:= raw -> {DataType, _Socket, Data} when DataType =:= tcp orelse DataType =:= raw ->
case handle_data(DataType, FrameInfo, Data, Socket, WsHandleLoopPid, SocketMode, Shaper) of case handle_data(DataType, FrameInfo, Data, Socket, WsHandleLoopPid, SocketMode) of
{error, Error} -> {error, Error} ->
?DEBUG("tls decode error ~p", [Error]), ?DEBUG("tls decode error ~p", [Error]),
websocket_close(Socket, WsHandleLoopPid, SocketMode, 1002); % protocol error websocket_close(Socket, WsHandleLoopPid, SocketMode, 1002); % protocol error
{NewFrameInfo, ToSend, NewShaper} -> {NewFrameInfo, ToSend} ->
lists:foreach(fun(Pkt) -> SocketMode:send(Socket, Pkt) lists:foreach(fun(Pkt) -> SocketMode:send(Socket, Pkt)
end, ToSend), end, ToSend),
ws_loop(NewFrameInfo, Socket, WsHandleLoopPid, SocketMode, NewShaper) ws_loop(NewFrameInfo, Socket, WsHandleLoopPid, SocketMode)
end; end;
{new_shaper, NewShaper} ->
NewShaper = case NewShaper of
none when Shaper /= none ->
activate(Socket, SocketMode, true), none;
_ ->
NewShaper
end,
ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode, NewShaper);
{tcp_closed, _Socket} -> {tcp_closed, _Socket} ->
?DEBUG("tcp connection was closed, exit", []), ?DEBUG("tcp connection was closed, exit", []),
websocket_close(Socket, WsHandleLoopPid, SocketMode, 0); websocket_close(Socket, WsHandleLoopPid, SocketMode, 0);
@ -212,11 +204,11 @@ ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode, Shaper) ->
{send, Data} -> {send, Data} ->
SocketMode:send(Socket, encode_frame(Data, 1)), SocketMode:send(Socket, encode_frame(Data, 1)),
ws_loop(FrameInfo, Socket, WsHandleLoopPid, ws_loop(FrameInfo, Socket, WsHandleLoopPid,
SocketMode, Shaper); SocketMode);
{ping, Data} -> {ping, Data} ->
SocketMode:send(Socket, encode_frame(Data, 9)), SocketMode:send(Socket, encode_frame(Data, 9)),
ws_loop(FrameInfo, Socket, WsHandleLoopPid, ws_loop(FrameInfo, Socket, WsHandleLoopPid,
SocketMode, Shaper); SocketMode);
shutdown -> shutdown ->
?DEBUG("shutdown request received, closing websocket " ?DEBUG("shutdown request received, closing websocket "
"with pid ~p", "with pid ~p",
@ -226,7 +218,7 @@ ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode, Shaper) ->
?WARNING_MSG("received unexpected message, ignoring: ~p", ?WARNING_MSG("received unexpected message, ignoring: ~p",
[_Ignored]), [_Ignored]),
ws_loop(FrameInfo, Socket, WsHandleLoopPid, ws_loop(FrameInfo, Socket, WsHandleLoopPid,
SocketMode, Shaper) SocketMode)
end. end.
encode_frame(Data, Opcode) -> encode_frame(Data, Opcode) ->
@ -381,17 +373,17 @@ process_frame(#frame_info{unprocessed =
process_frame(FrameInfo#frame_info{unprocessed = <<>>}, process_frame(FrameInfo#frame_info{unprocessed = <<>>},
<<UnprocessedPre/binary, Data/binary>>). <<UnprocessedPre/binary, Data/binary>>).
handle_data(tcp, FrameInfo, Data, Socket, WsHandleLoopPid, fast_tls, Shaper) -> handle_data(tcp, FrameInfo, Data, Socket, WsHandleLoopPid, fast_tls) ->
case fast_tls:recv_data(Socket, Data) of case fast_tls:recv_data(Socket, Data) of
{ok, NewData} -> {ok, NewData} ->
handle_data_int(FrameInfo, NewData, Socket, WsHandleLoopPid, fast_tls, Shaper); handle_data_int(FrameInfo, NewData, Socket, WsHandleLoopPid, fast_tls);
{error, Error} -> {error, Error} ->
{error, Error} {error, Error}
end; end;
handle_data(_, FrameInfo, Data, Socket, WsHandleLoopPid, SockMod, Shaper) -> handle_data(_, FrameInfo, Data, Socket, WsHandleLoopPid, SockMod) ->
handle_data_int(FrameInfo, Data, Socket, WsHandleLoopPid, SockMod, Shaper). handle_data_int(FrameInfo, Data, Socket, WsHandleLoopPid, SockMod).
handle_data_int(FrameInfo, Data, Socket, WsHandleLoopPid, SocketMode, Shaper) -> handle_data_int(FrameInfo, Data, _Socket, WsHandleLoopPid, _SocketMode) ->
{NewFrameInfo, Recv, Send} = process_frame(FrameInfo, Data), {NewFrameInfo, Recv, Send} = process_frame(FrameInfo, Data),
lists:foreach(fun (El) -> lists:foreach(fun (El) ->
case El of case El of
@ -404,7 +396,7 @@ handle_data_int(FrameInfo, Data, Socket, WsHandleLoopPid, SocketMode, Shaper) ->
end end
end, end,
Recv), Recv),
{NewFrameInfo, Send, handle_shaping(Data, Socket, SocketMode, Shaper)}. {NewFrameInfo, Send}.
websocket_close(Socket, WsHandleLoopPid, websocket_close(Socket, WsHandleLoopPid,
SocketMode, CloseCode) when CloseCode > 0 -> SocketMode, CloseCode) when CloseCode > 0 ->
@ -414,27 +406,3 @@ websocket_close(Socket, WsHandleLoopPid,
websocket_close(Socket, WsHandleLoopPid, SocketMode, _CloseCode) -> websocket_close(Socket, WsHandleLoopPid, SocketMode, _CloseCode) ->
WsHandleLoopPid ! closed, WsHandleLoopPid ! closed,
SocketMode:close(Socket). SocketMode:close(Socket).
handle_shaping(_Data, _Socket, _SocketMode, none) ->
none;
handle_shaping(Data, Socket, SocketMode, Shaper) ->
{NewShaper, Pause} = ejabberd_shaper:update(Shaper, byte_size(Data)),
if Pause > 0 ->
activate_after(Socket, self(), Pause);
true -> activate(Socket, SocketMode, once)
end,
NewShaper.
activate(Socket, SockMod, ActiveState) ->
case SockMod of
gen_tcp -> inet:setopts(Socket, [{active, ActiveState}]);
_ -> SockMod:setopts(Socket, [{active, ActiveState}])
end.
activate_after(Socket, Pid, Pause) ->
if Pause > 0 ->
erlang:send_after(Pause, Pid, {tcp, Socket, <<>>});
true ->
Pid ! {tcp, Socket, <<>>}
end,
ok.