From dd57950103f960da32fc8988835af5209f47c78a Mon Sep 17 00:00:00 2001 From: Frank Diebolt Date: Tue, 29 Jan 2019 18:13:28 +0100 Subject: [PATCH 1/5] Fix ping IQ reply/timeout processing ("mod_ping" regression since 17.x that may cause resources leakage) --- src/mod_ping.erl | 41 +++++++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/src/mod_ping.erl b/src/mod_ping.erl index ffdee2f01..25d2b60ed 100644 --- a/src/mod_ping.erl +++ b/src/mod_ping.erl @@ -122,30 +122,31 @@ handle_cast({start_ping, JID}, State) -> handle_cast({stop_ping, JID}, State) -> Timers = del_timer(JID, State#state.timers), {noreply, State#state{timers = Timers}}; -handle_cast({iq_reply, timeout, JID}, State) -> - ejabberd_hooks:run(user_ping_timeout, State#state.host, - [JID]), - Timers = case State#state.timeout_action of - kill -> - #jid{user = User, server = Server, - resource = Resource} = - JID, - case ejabberd_sm:get_session_pid(User, Server, Resource) - of - Pid when is_pid(Pid) -> ejabberd_c2s:close(Pid, ping_timeout); - _ -> ok - end, - del_timer(JID, State#state.timers); - _ -> - State#state.timers - end, - {noreply, State#state{timers = Timers}}; -handle_cast({iq_reply, #iq{}, _JID}, State) -> - {noreply, State}; handle_cast(Msg, State) -> ?WARNING_MSG("unexpected cast: ~p", [Msg]), {noreply, State}. +handle_info({iq_reply, #iq{type = error}, JID}, State) -> + handle_info({iq_reply, timeout, JID}, State); +handle_info({iq_reply, #iq{}, _JID}, State) -> + {noreply, State}; +handle_info({iq_reply, timeout, JID}, State) -> + Timers = del_timer(JID, State#state.timers), + ejabberd_hooks:run(user_ping_timeout, State#state.host, + [JID]), + case State#state.timeout_action of + kill -> + #jid{user = User, server = Server, + resource = Resource} = + JID, + case ejabberd_sm:get_session_pid(User, Server, Resource) + of + Pid when is_pid(Pid) -> ejabberd_c2s:close(Pid, ping_timeout); + _ -> ok + end; + _ -> ok + end, + {noreply, State#state{timers = Timers}}; handle_info({timeout, _TRef, {ping, JID}}, State) -> Host = State#state.host, From = jid:remove_resource(JID), From dda89aeda070d03d9cb19dcdbba8b6ad033b2d45 Mon Sep 17 00:00:00 2001 From: Frank Diebolt Date: Wed, 30 Jan 2019 09:57:17 +0100 Subject: [PATCH 2/5] Apply shaping to websocket connections --- src/ejabberd_http_ws.erl | 10 ++++--- src/ejabberd_websocket.erl | 60 +++++++++++++++++++++++++++++--------- 2 files changed, 52 insertions(+), 18 deletions(-) diff --git a/src/ejabberd_http_ws.erl b/src/ejabberd_http_ws.erl index 4b54e67ec..79406c672 100644 --- a/src/ejabberd_http_ws.erl +++ b/src/ejabberd_http_ws.erl @@ -108,9 +108,8 @@ close({http_ws, FsmRef, _IP}) -> reset_stream({http_ws, _FsmRef, _IP} = Socket) -> Socket. -change_shaper({http_ws, _FsmRef, _IP}, _Shaper) -> - %% TODO??? - ok. +change_shaper({http_ws, FsmRef, _IP}, Shaper) -> + p1_fsm:send_all_state_event(FsmRef, {new_shaper, Shaper}). get_transport(_Socket) -> websocket. @@ -169,7 +168,10 @@ handle_event({activate, From}, StateName, State) -> end, Input), State#state{active = false, input = []} end, - {next_state, StateName, State1#state{c2s_pid = From}}. + {next_state, StateName, State1#state{c2s_pid = From}}; +handle_event({new_shaper, Shaper}, StateName, #state{ws = {_, WsPid}} = StateData) -> + WsPid ! {new_shaper, Shaper}, + {next_state, StateName, StateData}. handle_sync_event({send_xml, Packet}, _From, StateName, #state{ws = {_, WsPid}, rfc_compilant = R} = StateData) -> diff --git a/src/ejabberd_websocket.erl b/src/ejabberd_websocket.erl index 506ff142b..04e7f916a 100644 --- a/src/ejabberd_websocket.erl +++ b/src/ejabberd_websocket.erl @@ -141,7 +141,7 @@ connect(#ws{socket = Socket, sockmod = SockMod} = Ws, WsLoop) -> _ -> SockMod:setopts(Socket, [{packet, 0}, {active, true}]) end, - ws_loop(none, Socket, WsHandleLoopPid, SockMod). + ws_loop(none, Socket, WsHandleLoopPid, SockMod, none). handshake(#ws{headers = Headers} = State) -> {_, Key} = lists:keyfind(<<"Sec-Websocket-Key">>, 1, @@ -174,18 +174,26 @@ find_subprotocol(Headers) -> end. -ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode) -> +ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode, Shaper) -> receive {DataType, _Socket, Data} when DataType =:= tcp orelse DataType =:= raw -> - case handle_data(DataType, FrameInfo, Data, Socket, WsHandleLoopPid, SocketMode) of + case handle_data(DataType, FrameInfo, Data, Socket, WsHandleLoopPid, SocketMode, Shaper) of {error, Error} -> ?DEBUG("tls decode error ~p", [Error]), websocket_close(Socket, WsHandleLoopPid, SocketMode, 1002); % protocol error - {NewFrameInfo, ToSend} -> + {NewFrameInfo, ToSend, NewShaper} -> lists:foreach(fun(Pkt) -> SocketMode:send(Socket, Pkt) end, ToSend), - ws_loop(NewFrameInfo, Socket, WsHandleLoopPid, SocketMode) + ws_loop(NewFrameInfo, Socket, WsHandleLoopPid, SocketMode, NewShaper) end; + {new_shaper, NewShaper} -> + NewShaper = case NewShaper of + none when Shaper /= none -> + activate(Socket, SocketMode, true), none; + _ -> + NewShaper + end, + ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode, NewShaper); {tcp_closed, _Socket} -> ?DEBUG("tcp connection was closed, exit", []), websocket_close(Socket, WsHandleLoopPid, SocketMode, 0); @@ -204,11 +212,11 @@ ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode) -> {send, Data} -> SocketMode:send(Socket, encode_frame(Data, 1)), ws_loop(FrameInfo, Socket, WsHandleLoopPid, - SocketMode); + SocketMode, Shaper); {ping, Data} -> SocketMode:send(Socket, encode_frame(Data, 9)), ws_loop(FrameInfo, Socket, WsHandleLoopPid, - SocketMode); + SocketMode, Shaper); shutdown -> ?DEBUG("shutdown request received, closing websocket " "with pid ~p", @@ -218,7 +226,7 @@ ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode) -> ?WARNING_MSG("received unexpected message, ignoring: ~p", [_Ignored]), ws_loop(FrameInfo, Socket, WsHandleLoopPid, - SocketMode) + SocketMode, Shaper) end. encode_frame(Data, Opcode) -> @@ -373,17 +381,17 @@ process_frame(#frame_info{unprocessed = process_frame(FrameInfo#frame_info{unprocessed = <<>>}, <>). -handle_data(tcp, FrameInfo, Data, Socket, WsHandleLoopPid, fast_tls) -> +handle_data(tcp, FrameInfo, Data, Socket, WsHandleLoopPid, fast_tls, Shaper) -> case fast_tls:recv_data(Socket, Data) of {ok, NewData} -> - handle_data_int(FrameInfo, NewData, Socket, WsHandleLoopPid, fast_tls); + handle_data_int(FrameInfo, NewData, Socket, WsHandleLoopPid, fast_tls, Shaper); {error, Error} -> {error, Error} end; -handle_data(_, FrameInfo, Data, Socket, WsHandleLoopPid, SockMod) -> - handle_data_int(FrameInfo, Data, Socket, WsHandleLoopPid, SockMod). +handle_data(_, FrameInfo, Data, Socket, WsHandleLoopPid, SockMod, Shaper) -> + handle_data_int(FrameInfo, Data, Socket, WsHandleLoopPid, SockMod, Shaper). -handle_data_int(FrameInfo, Data, _Socket, WsHandleLoopPid, _SocketMode) -> +handle_data_int(FrameInfo, Data, Socket, WsHandleLoopPid, SocketMode, Shaper) -> {NewFrameInfo, Recv, Send} = process_frame(FrameInfo, Data), lists:foreach(fun (El) -> case El of @@ -396,7 +404,7 @@ handle_data_int(FrameInfo, Data, _Socket, WsHandleLoopPid, _SocketMode) -> end end, Recv), - {NewFrameInfo, Send}. + {NewFrameInfo, Send, handle_shaping(Data, Socket, SocketMode, Shaper)}. websocket_close(Socket, WsHandleLoopPid, SocketMode, CloseCode) when CloseCode > 0 -> @@ -406,3 +414,27 @@ websocket_close(Socket, WsHandleLoopPid, websocket_close(Socket, WsHandleLoopPid, SocketMode, _CloseCode) -> WsHandleLoopPid ! closed, SocketMode:close(Socket). + +handle_shaping(_Data, _Socket, _SocketMode, none) -> + none; +handle_shaping(Data, Socket, SocketMode, Shaper) -> + {NewShaper, Pause} = ejabberd_shaper:update(Shaper, byte_size(Data)), + if Pause > 0 -> + activate_after(Socket, self(), Pause); + true -> activate(Socket, SocketMode, once) + end, + NewShaper. + +activate(Socket, SockMod, ActiveState) -> + case SockMod of + gen_tcp -> inet:setopts(Socket, [{active, ActiveState}]); + _ -> SockMod:setopts(Socket, [{active, ActiveState}]) + end. + +activate_after(Socket, Pid, Pause) -> + if Pause > 0 -> + erlang:send_after(Pause, Pid, {tcp, Socket, <<>>}); + true -> + Pid ! {tcp, Socket, <<>>} + end, + ok. From 7df03ef56f7090c1a50bdd3ef197abb457b2dff3 Mon Sep 17 00:00:00 2001 From: Frank Diebolt Date: Wed, 30 Jan 2019 10:37:26 +0100 Subject: [PATCH 3/5] Revert "Apply shaping to websocket connections" This reverts commit dda89aeda070d03d9cb19dcdbba8b6ad033b2d45. --- src/ejabberd_http_ws.erl | 10 +++---- src/ejabberd_websocket.erl | 60 +++++++++----------------------------- 2 files changed, 18 insertions(+), 52 deletions(-) diff --git a/src/ejabberd_http_ws.erl b/src/ejabberd_http_ws.erl index 79406c672..4b54e67ec 100644 --- a/src/ejabberd_http_ws.erl +++ b/src/ejabberd_http_ws.erl @@ -108,8 +108,9 @@ close({http_ws, FsmRef, _IP}) -> reset_stream({http_ws, _FsmRef, _IP} = Socket) -> Socket. -change_shaper({http_ws, FsmRef, _IP}, Shaper) -> - p1_fsm:send_all_state_event(FsmRef, {new_shaper, Shaper}). +change_shaper({http_ws, _FsmRef, _IP}, _Shaper) -> + %% TODO??? + ok. get_transport(_Socket) -> websocket. @@ -168,10 +169,7 @@ handle_event({activate, From}, StateName, State) -> end, Input), State#state{active = false, input = []} end, - {next_state, StateName, State1#state{c2s_pid = From}}; -handle_event({new_shaper, Shaper}, StateName, #state{ws = {_, WsPid}} = StateData) -> - WsPid ! {new_shaper, Shaper}, - {next_state, StateName, StateData}. + {next_state, StateName, State1#state{c2s_pid = From}}. handle_sync_event({send_xml, Packet}, _From, StateName, #state{ws = {_, WsPid}, rfc_compilant = R} = StateData) -> diff --git a/src/ejabberd_websocket.erl b/src/ejabberd_websocket.erl index 04e7f916a..506ff142b 100644 --- a/src/ejabberd_websocket.erl +++ b/src/ejabberd_websocket.erl @@ -141,7 +141,7 @@ connect(#ws{socket = Socket, sockmod = SockMod} = Ws, WsLoop) -> _ -> SockMod:setopts(Socket, [{packet, 0}, {active, true}]) end, - ws_loop(none, Socket, WsHandleLoopPid, SockMod, none). + ws_loop(none, Socket, WsHandleLoopPid, SockMod). handshake(#ws{headers = Headers} = State) -> {_, Key} = lists:keyfind(<<"Sec-Websocket-Key">>, 1, @@ -174,26 +174,18 @@ find_subprotocol(Headers) -> end. -ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode, Shaper) -> +ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode) -> receive {DataType, _Socket, Data} when DataType =:= tcp orelse DataType =:= raw -> - case handle_data(DataType, FrameInfo, Data, Socket, WsHandleLoopPid, SocketMode, Shaper) of + case handle_data(DataType, FrameInfo, Data, Socket, WsHandleLoopPid, SocketMode) of {error, Error} -> ?DEBUG("tls decode error ~p", [Error]), websocket_close(Socket, WsHandleLoopPid, SocketMode, 1002); % protocol error - {NewFrameInfo, ToSend, NewShaper} -> + {NewFrameInfo, ToSend} -> lists:foreach(fun(Pkt) -> SocketMode:send(Socket, Pkt) end, ToSend), - ws_loop(NewFrameInfo, Socket, WsHandleLoopPid, SocketMode, NewShaper) + ws_loop(NewFrameInfo, Socket, WsHandleLoopPid, SocketMode) end; - {new_shaper, NewShaper} -> - NewShaper = case NewShaper of - none when Shaper /= none -> - activate(Socket, SocketMode, true), none; - _ -> - NewShaper - end, - ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode, NewShaper); {tcp_closed, _Socket} -> ?DEBUG("tcp connection was closed, exit", []), websocket_close(Socket, WsHandleLoopPid, SocketMode, 0); @@ -212,11 +204,11 @@ ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode, Shaper) -> {send, Data} -> SocketMode:send(Socket, encode_frame(Data, 1)), ws_loop(FrameInfo, Socket, WsHandleLoopPid, - SocketMode, Shaper); + SocketMode); {ping, Data} -> SocketMode:send(Socket, encode_frame(Data, 9)), ws_loop(FrameInfo, Socket, WsHandleLoopPid, - SocketMode, Shaper); + SocketMode); shutdown -> ?DEBUG("shutdown request received, closing websocket " "with pid ~p", @@ -226,7 +218,7 @@ ws_loop(FrameInfo, Socket, WsHandleLoopPid, SocketMode, Shaper) -> ?WARNING_MSG("received unexpected message, ignoring: ~p", [_Ignored]), ws_loop(FrameInfo, Socket, WsHandleLoopPid, - SocketMode, Shaper) + SocketMode) end. encode_frame(Data, Opcode) -> @@ -381,17 +373,17 @@ process_frame(#frame_info{unprocessed = process_frame(FrameInfo#frame_info{unprocessed = <<>>}, <>). -handle_data(tcp, FrameInfo, Data, Socket, WsHandleLoopPid, fast_tls, Shaper) -> +handle_data(tcp, FrameInfo, Data, Socket, WsHandleLoopPid, fast_tls) -> case fast_tls:recv_data(Socket, Data) of {ok, NewData} -> - handle_data_int(FrameInfo, NewData, Socket, WsHandleLoopPid, fast_tls, Shaper); + handle_data_int(FrameInfo, NewData, Socket, WsHandleLoopPid, fast_tls); {error, Error} -> {error, Error} end; -handle_data(_, FrameInfo, Data, Socket, WsHandleLoopPid, SockMod, Shaper) -> - handle_data_int(FrameInfo, Data, Socket, WsHandleLoopPid, SockMod, Shaper). +handle_data(_, FrameInfo, Data, Socket, WsHandleLoopPid, SockMod) -> + handle_data_int(FrameInfo, Data, Socket, WsHandleLoopPid, SockMod). -handle_data_int(FrameInfo, Data, Socket, WsHandleLoopPid, SocketMode, Shaper) -> +handle_data_int(FrameInfo, Data, _Socket, WsHandleLoopPid, _SocketMode) -> {NewFrameInfo, Recv, Send} = process_frame(FrameInfo, Data), lists:foreach(fun (El) -> case El of @@ -404,7 +396,7 @@ handle_data_int(FrameInfo, Data, Socket, WsHandleLoopPid, SocketMode, Shaper) -> end end, Recv), - {NewFrameInfo, Send, handle_shaping(Data, Socket, SocketMode, Shaper)}. + {NewFrameInfo, Send}. websocket_close(Socket, WsHandleLoopPid, SocketMode, CloseCode) when CloseCode > 0 -> @@ -414,27 +406,3 @@ websocket_close(Socket, WsHandleLoopPid, websocket_close(Socket, WsHandleLoopPid, SocketMode, _CloseCode) -> WsHandleLoopPid ! closed, SocketMode:close(Socket). - -handle_shaping(_Data, _Socket, _SocketMode, none) -> - none; -handle_shaping(Data, Socket, SocketMode, Shaper) -> - {NewShaper, Pause} = ejabberd_shaper:update(Shaper, byte_size(Data)), - if Pause > 0 -> - activate_after(Socket, self(), Pause); - true -> activate(Socket, SocketMode, once) - end, - NewShaper. - -activate(Socket, SockMod, ActiveState) -> - case SockMod of - gen_tcp -> inet:setopts(Socket, [{active, ActiveState}]); - _ -> SockMod:setopts(Socket, [{active, ActiveState}]) - end. - -activate_after(Socket, Pid, Pause) -> - if Pause > 0 -> - erlang:send_after(Pause, Pid, {tcp, Socket, <<>>}); - true -> - Pid ! {tcp, Socket, <<>>} - end, - ok. From 9b66894dda6e8c33220c5c616ec74d61f1a6cb08 Mon Sep 17 00:00:00 2001 From: Frank Diebolt Date: Thu, 31 Jan 2019 17:44:11 +0100 Subject: [PATCH 4/5] ! may raise exceptions on invalid proc --- src/ejabberd_iq.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ejabberd_iq.erl b/src/ejabberd_iq.erl index 8c731e0a1..fd24aec4b 100644 --- a/src/ejabberd_iq.erl +++ b/src/ejabberd_iq.erl @@ -173,4 +173,4 @@ calc_checksum(Data) -> callback(undefined, IQRes, Fun) -> Fun(IQRes); callback(Proc, IQRes, Ctx) -> - Proc ! {iq_reply, IQRes, Ctx}. + catch Proc ! {iq_reply, IQRes, Ctx}. From 4ff4711d4f3598093d9a7c6132c6c9188b3f8570 Mon Sep 17 00:00:00 2001 From: Frank Diebolt Date: Thu, 31 Jan 2019 18:14:36 +0100 Subject: [PATCH 5/5] catch badarg exceptions on invalid callback procs --- src/ejabberd_iq.erl | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/ejabberd_iq.erl b/src/ejabberd_iq.erl index fd24aec4b..aeaffccde 100644 --- a/src/ejabberd_iq.erl +++ b/src/ejabberd_iq.erl @@ -173,4 +173,8 @@ calc_checksum(Data) -> callback(undefined, IQRes, Fun) -> Fun(IQRes); callback(Proc, IQRes, Ctx) -> - catch Proc ! {iq_reply, IQRes, Ctx}. + try + Proc ! {iq_reply, IQRes, Ctx} + catch _:badarg -> + ok + end.