From ddfbca5830a4089d652faf3d2407554d9569007f Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Fri, 6 Jun 2014 13:52:55 +0400 Subject: [PATCH] Use a different timer for flow control --- src/mod_sip_registrar.erl | 112 ++++++++++++++++++++++---------------- 1 file changed, 64 insertions(+), 48 deletions(-) diff --git a/src/mod_sip_registrar.erl b/src/mod_sip_registrar.erl index da2c473c2..dcc761754 100644 --- a/src/mod_sip_registrar.erl +++ b/src/mod_sip_registrar.erl @@ -33,8 +33,9 @@ cseq = 0 :: non_neg_integer(), timestamp = now() :: erlang:timestamp(), contact :: {binary(), #uri{}, [{binary(), binary()}]}, - tref = make_ref() :: reference(), - mref = make_ref() :: reference(), + flow_tref :: reference(), + reg_tref = make_ref() :: reference(), + conn_mref = make_ref() :: reference(), expires = 0 :: non_neg_integer()}). -record(state, {}). @@ -153,15 +154,8 @@ find_sockets(U, S) -> [] end. -ping(#sip_socket{type = Type} = SIPSocket) -> - case mnesia:dirty_index_read(sip_session, SIPSocket, #sip_session.socket) of - [] when Type == udp -> - error; - [] -> - drop; - [_|_] -> - pong - end. +ping(SIPSocket) -> + call({ping, SIPSocket}). %%%=================================================================== %%% gen_server callbacks @@ -172,7 +166,7 @@ init([]) -> [{ram_copies, [node()]}, {type, bag}, {attributes, record_info(fields, sip_session)}]), - mnesia:add_table_index(sip_session, mref), + mnesia:add_table_index(sip_session, conn_mref), mnesia:add_table_index(sip_session, socket), mnesia:add_table_copy(sip_session, node(), ram_copies), {ok, #state{}}. @@ -183,6 +177,9 @@ handle_call({write, Sessions, Supported}, _From, State) -> handle_call({delete, US, CallID, CSeq}, _From, State) -> Res = delete_session(US, CallID, CSeq), {reply, Res, State}; +handle_call({ping, SIPSocket}, _From, State) -> + Res = process_ping(SIPSocket), + {reply, Res, State}; handle_call(_Request, _From, State) -> Reply = ok, {reply, Reply, State}. @@ -200,7 +197,7 @@ handle_info({timeout, TRef, US}, State) -> delete_expired_session(US, TRef), {noreply, State}; handle_info({'DOWN', MRef, process, _Pid, _Reason}, State) -> - case mnesia:dirty_index_read(sip_session, MRef, #sip_session.mref) of + case mnesia:dirty_index_read(sip_session, MRef, #sip_session.conn_mref) of [Session] -> mnesia:dirty_delete_object(Session); _ -> @@ -275,7 +272,8 @@ write_session([#sip_session{us = {U, S} = US}|_] = NewSessions, {error, too_many_sessions}; true -> lists:foreach( - fun(#sip_session{tref = TRef, mref = MRef} = Session) -> + fun(#sip_session{reg_tref = TRef, + conn_mref = MRef} = Session) -> erlang:cancel_timer(TRef), catch erlang:demonitor(MRef, [flush]), mnesia:dirty_delete_object(Session) @@ -311,7 +309,7 @@ delete_session(US, CallID, CSeq) -> ContactsWithExpires = lists:map( fun(#sip_session{contact = Contact, - tref = TRef}) -> + reg_tref = TRef}) -> erlang:cancel_timer(TRef), {Contact, 0} end, Sessions), @@ -327,17 +325,14 @@ delete_session(US, CallID, CSeq) -> delete_expired_session(US, TRef) -> case mnesia:dirty_read(sip_session, US) of [_|_] = Sessions -> - case lists:filter( - fun(#sip_session{tref = TRef1}) when TRef1 == TRef -> - true; - (_) -> - false - end, Sessions) of - [Session|_] -> - mnesia:dirty_delete_object(Session); - [] -> - ok - end; + lists:foreach( + fun(#sip_session{reg_tref = T1, + flow_tref = T2} = Session) + when T1 == TRef; T2 == TRef -> + mnesia:dirty_delete_object(Session); + (_) -> + ok + end, Sessions); [] -> ok end. @@ -502,30 +497,51 @@ update_table() -> ok end. -set_monitor_and_timer(#sip_session{expires = Expires} = Session, - _IsOutboundSupported = false) -> - set_timer(Session, Expires); -set_monitor_and_timer(#sip_session{socket = SIPSock, - mref = MRef, +set_monitor_and_timer(#sip_session{socket = #sip_socket{type = Type, + pid = Pid} = SIPSock, + conn_mref = MRef, expires = Expires, us = {_, LServer}, contact = {_, _, Params}} = Session, - _IsOutboundSupported = true) -> - case get_ob_params(Params) of - error -> - set_timer(Session, Expires); - {_, _} -> - FlowTimeout = get_flow_timeout(LServer, SIPSock), - Timeout = lists:min([FlowTimeout, Expires]), - NewSession = set_timer(Session, Timeout), - NewMRef = if SIPSock#sip_socket.type == udp -> - MRef; - true -> - erlang:monitor(process, SIPSock#sip_socket.pid) - end, - NewSession#sip_session{mref = NewMRef} + IsOutboundSupported) -> + RegTRef = set_timer(Session, Expires), + Session1 = Session#sip_session{reg_tref = RegTRef}, + if IsOutboundSupported -> + case get_ob_params(Params) of + error -> + Session1; + {_, _} -> + FlowTimeout = get_flow_timeout(LServer, SIPSock), + FlowTRef = set_timer(Session1, FlowTimeout), + NewMRef = if Type == udp -> MRef; + true -> erlang:monitor(process, Pid) + end, + Session1#sip_session{conn_mref = NewMRef, + flow_tref = FlowTRef} + end; + true -> + Session1 end. -set_timer(#sip_session{us = US} = Session, Timeout) -> - TRef = erlang:start_timer(Timeout * 1000, self(), US), - Session#sip_session{tref = TRef}. +set_timer(#sip_session{us = US}, Timeout) -> + erlang:start_timer(Timeout * 1000, self(), US). + +process_ping(SIPSocket) -> + ErrResponse = if SIPSocket#sip_socket.type == udp -> error; + true -> drop + end, + Sessions = mnesia:dirty_index_read( + sip_session, SIPSocket, #sip_session.socket), + lists:foldl( + fun(#sip_session{flow_tref = TRef, + us = {_, LServer}} = Session, _) + when TRef /= undefined -> + erlang:cancel_timer(TRef), + mnesia:dirty_delete_object(Session), + Timeout = get_flow_timeout(LServer, SIPSocket), + NewTRef = set_timer(Session, Timeout), + mnesia:dirty_write( + Session#sip_session{flow_tref = NewTRef}); + (_, Acc) -> + Acc + end, ErrResponse, Sessions).