mirror of
https://github.com/processone/ejabberd.git
synced 2024-11-24 16:23:40 +01:00
Apply shaping to websocket connections
This commit is contained in:
parent
35576b4608
commit
cd88d342b9
@ -104,9 +104,8 @@ close({http_ws, FsmRef, _IP}) ->
|
|||||||
reset_stream({http_ws, _FsmRef, _IP} = Socket) ->
|
reset_stream({http_ws, _FsmRef, _IP} = Socket) ->
|
||||||
Socket.
|
Socket.
|
||||||
|
|
||||||
change_shaper({http_ws, _FsmRef, _IP}, _Shaper) ->
|
change_shaper({http_ws, FsmRef, _IP}, Shaper) ->
|
||||||
%% TODO???
|
p1_fsm:send_all_state_event(FsmRef, {new_shaper, Shaper}).
|
||||||
ok.
|
|
||||||
|
|
||||||
get_transport(_Socket) ->
|
get_transport(_Socket) ->
|
||||||
websocket.
|
websocket.
|
||||||
@ -161,7 +160,10 @@ handle_event({activate, From}, StateName, State) ->
|
|||||||
end, Input),
|
end, Input),
|
||||||
State#state{active = false, input = []}
|
State#state{active = false, input = []}
|
||||||
end,
|
end,
|
||||||
{next_state, StateName, State1#state{c2s_pid = From}}.
|
{next_state, StateName, State1#state{c2s_pid = From}};
|
||||||
|
handle_event({new_shaper, Shaper}, StateName, #state{ws = {_, WsPid}} = StateData) ->
|
||||||
|
WsPid ! {new_shaper, Shaper},
|
||||||
|
{next_state, StateName, StateData}.
|
||||||
|
|
||||||
handle_sync_event({send_xml, Packet}, _From, StateName,
|
handle_sync_event({send_xml, Packet}, _From, StateName,
|
||||||
#state{ws = {_, WsPid}, rfc_compilant = R} = StateData) ->
|
#state{ws = {_, WsPid}, rfc_compilant = R} = StateData) ->
|
||||||
|
@ -154,7 +154,7 @@ connect(#ws{socket = Socket, sockmod = SockMod} = Ws, WsLoop) ->
|
|||||||
_ ->
|
_ ->
|
||||||
SockMod:setopts(Socket, [{packet, 0}, {active, true}])
|
SockMod:setopts(Socket, [{packet, 0}, {active, true}])
|
||||||
end,
|
end,
|
||||||
ws_loop(none, Socket, WsHandleLoopPid, SockMod).
|
ws_loop(none, Socket, WsHandleLoopPid, SockMod, none).
|
||||||
|
|
||||||
handshake(#ws{headers = Headers} = State) ->
|
handshake(#ws{headers = Headers} = State) ->
|
||||||
{_, Key} = lists:keyfind(<<"Sec-Websocket-Key">>, 1,
|
{_, Key} = lists:keyfind(<<"Sec-Websocket-Key">>, 1,
|
||||||
@ -187,18 +187,26 @@ find_subprotocol(Headers) ->
|
|||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode) ->
|
ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode, Shaper) ->
|
||||||
receive
|
receive
|
||||||
{DataType, _Socket, Data} when DataType =:= tcp orelse DataType =:= raw ->
|
{DataType, _Socket, Data} when DataType =:= tcp orelse DataType =:= raw ->
|
||||||
case handle_data(DataType, FrameInfo, Data, Socket, WsHandleLoopPid, SocketMode) of
|
case handle_data(DataType, FrameInfo, Data, Socket, WsHandleLoopPid, SocketMode, Shaper) of
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
?DEBUG("TLS decode error ~p", [Error]),
|
?DEBUG("TLS decode error ~p", [Error]),
|
||||||
websocket_close(Socket, WsHandleLoopPid, SocketMode, 1002); % protocol error
|
websocket_close(Socket, WsHandleLoopPid, SocketMode, 1002); % protocol error
|
||||||
{NewFrameInfo, ToSend} ->
|
{NewFrameInfo, ToSend, NewShaper} ->
|
||||||
lists:foreach(fun(Pkt) -> SocketMode:send(Socket, Pkt)
|
lists:foreach(fun(Pkt) -> SocketMode:send(Socket, Pkt)
|
||||||
end, ToSend),
|
end, ToSend),
|
||||||
ws_loop(NewFrameInfo, Socket, WsHandleLoopPid, SocketMode)
|
ws_loop(NewFrameInfo, Socket, WsHandleLoopPid, SocketMode, NewShaper)
|
||||||
end;
|
end;
|
||||||
|
{new_shaper, NewShaper} ->
|
||||||
|
NewShaper = case NewShaper of
|
||||||
|
none when Shaper /= none ->
|
||||||
|
activate(Socket, SocketMode, true), none;
|
||||||
|
_ ->
|
||||||
|
NewShaper
|
||||||
|
end,
|
||||||
|
ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode, NewShaper);
|
||||||
{tcp_closed, _Socket} ->
|
{tcp_closed, _Socket} ->
|
||||||
?DEBUG("TCP connection was closed, exit", []),
|
?DEBUG("TCP connection was closed, exit", []),
|
||||||
websocket_close(Socket, WsHandleLoopPid, SocketMode, 0);
|
websocket_close(Socket, WsHandleLoopPid, SocketMode, 0);
|
||||||
@ -220,15 +228,15 @@ ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode) ->
|
|||||||
{text, Data} ->
|
{text, Data} ->
|
||||||
SocketMode:send(Socket, encode_frame(Data, 1)),
|
SocketMode:send(Socket, encode_frame(Data, 1)),
|
||||||
ws_loop(FrameInfo, Socket, WsHandleLoopPid,
|
ws_loop(FrameInfo, Socket, WsHandleLoopPid,
|
||||||
SocketMode);
|
SocketMode, Shaper);
|
||||||
{data, Data} ->
|
{data, Data} ->
|
||||||
SocketMode:send(Socket, encode_frame(Data, 2)),
|
SocketMode:send(Socket, encode_frame(Data, 2)),
|
||||||
ws_loop(FrameInfo, Socket, WsHandleLoopPid,
|
ws_loop(FrameInfo, Socket, WsHandleLoopPid,
|
||||||
SocketMode);
|
SocketMode, Shaper);
|
||||||
{ping, Data} ->
|
{ping, Data} ->
|
||||||
SocketMode:send(Socket, encode_frame(Data, 9)),
|
SocketMode:send(Socket, encode_frame(Data, 9)),
|
||||||
ws_loop(FrameInfo, Socket, WsHandleLoopPid,
|
ws_loop(FrameInfo, Socket, WsHandleLoopPid,
|
||||||
SocketMode);
|
SocketMode, Shaper);
|
||||||
shutdown ->
|
shutdown ->
|
||||||
?DEBUG("Shutdown request received, closing websocket "
|
?DEBUG("Shutdown request received, closing websocket "
|
||||||
"with pid ~p",
|
"with pid ~p",
|
||||||
@ -238,7 +246,7 @@ ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode) ->
|
|||||||
?WARNING_MSG("Received unexpected message, ignoring: ~p",
|
?WARNING_MSG("Received unexpected message, ignoring: ~p",
|
||||||
[_Ignored]),
|
[_Ignored]),
|
||||||
ws_loop(FrameInfo, Socket, WsHandleLoopPid,
|
ws_loop(FrameInfo, Socket, WsHandleLoopPid,
|
||||||
SocketMode)
|
SocketMode, Shaper)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
encode_frame(Data, Opcode) ->
|
encode_frame(Data, Opcode) ->
|
||||||
@ -393,17 +401,17 @@ process_frame(#frame_info{unprocessed =
|
|||||||
process_frame(FrameInfo#frame_info{unprocessed = <<>>},
|
process_frame(FrameInfo#frame_info{unprocessed = <<>>},
|
||||||
<<UnprocessedPre/binary, Data/binary>>).
|
<<UnprocessedPre/binary, Data/binary>>).
|
||||||
|
|
||||||
handle_data(tcp, FrameInfo, Data, Socket, WsHandleLoopPid, fast_tls) ->
|
handle_data(tcp, FrameInfo, Data, Socket, WsHandleLoopPid, fast_tls, Shaper) ->
|
||||||
case fast_tls:recv_data(Socket, Data) of
|
case fast_tls:recv_data(Socket, Data) of
|
||||||
{ok, NewData} ->
|
{ok, NewData} ->
|
||||||
handle_data_int(FrameInfo, NewData, Socket, WsHandleLoopPid, fast_tls);
|
handle_data_int(FrameInfo, NewData, Socket, WsHandleLoopPid, fast_tls, Shaper);
|
||||||
{error, Error} ->
|
{error, Error} ->
|
||||||
{error, Error}
|
{error, Error}
|
||||||
end;
|
end;
|
||||||
handle_data(_, FrameInfo, Data, Socket, WsHandleLoopPid, SockMod) ->
|
handle_data(_, FrameInfo, Data, Socket, WsHandleLoopPid, SockMod, Shaper) ->
|
||||||
handle_data_int(FrameInfo, Data, Socket, WsHandleLoopPid, SockMod).
|
handle_data_int(FrameInfo, Data, Socket, WsHandleLoopPid, SockMod, Shaper).
|
||||||
|
|
||||||
handle_data_int(FrameInfo, Data, _Socket, WsHandleLoopPid, _SocketMode) ->
|
handle_data_int(FrameInfo, Data, Socket, WsHandleLoopPid, SocketMode, Shaper) ->
|
||||||
{NewFrameInfo, Recv, Send} = process_frame(FrameInfo, Data),
|
{NewFrameInfo, Recv, Send} = process_frame(FrameInfo, Data),
|
||||||
lists:foreach(fun (El) ->
|
lists:foreach(fun (El) ->
|
||||||
case El of
|
case El of
|
||||||
@ -416,7 +424,7 @@ handle_data_int(FrameInfo, Data, _Socket, WsHandleLoopPid, _SocketMode) ->
|
|||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
Recv),
|
Recv),
|
||||||
{NewFrameInfo, Send}.
|
{NewFrameInfo, Send, handle_shaping(Data, Socket, SocketMode, Shaper)}.
|
||||||
|
|
||||||
websocket_close(Socket, WsHandleLoopPid,
|
websocket_close(Socket, WsHandleLoopPid,
|
||||||
SocketMode, CloseCode) when CloseCode > 0 ->
|
SocketMode, CloseCode) when CloseCode > 0 ->
|
||||||
@ -429,3 +437,27 @@ websocket_close(Socket, WsHandleLoopPid, SocketMode, _CloseCode) ->
|
|||||||
|
|
||||||
get_origin() ->
|
get_origin() ->
|
||||||
ejabberd_option:websocket_origin().
|
ejabberd_option:websocket_origin().
|
||||||
|
|
||||||
|
handle_shaping(_Data, _Socket, _SocketMode, none) ->
|
||||||
|
none;
|
||||||
|
handle_shaping(Data, Socket, SocketMode, Shaper) ->
|
||||||
|
{NewShaper, Pause} = ejabberd_shaper:update(Shaper, byte_size(Data)),
|
||||||
|
if Pause > 0 ->
|
||||||
|
activate_after(Socket, self(), Pause);
|
||||||
|
true -> activate(Socket, SocketMode, once)
|
||||||
|
end,
|
||||||
|
NewShaper.
|
||||||
|
|
||||||
|
activate(Socket, SockMod, ActiveState) ->
|
||||||
|
case SockMod of
|
||||||
|
gen_tcp -> inet:setopts(Socket, [{active, ActiveState}]);
|
||||||
|
_ -> SockMod:setopts(Socket, [{active, ActiveState}])
|
||||||
|
end.
|
||||||
|
|
||||||
|
activate_after(Socket, Pid, Pause) ->
|
||||||
|
if Pause > 0 ->
|
||||||
|
erlang:send_after(Pause, Pid, {tcp, Socket, <<>>});
|
||||||
|
true ->
|
||||||
|
Pid ! {tcp, Socket, <<>>}
|
||||||
|
end,
|
||||||
|
ok.
|
||||||
|
Loading…
Reference in New Issue
Block a user