25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-12-26 17:38:45 +01:00

Use a different timer for flow control

This commit is contained in:
Evgeniy Khramtsov 2014-06-06 13:52:55 +04:00
parent 9e72529544
commit ddfbca5830

View File

@ -33,8 +33,9 @@
cseq = 0 :: non_neg_integer(), cseq = 0 :: non_neg_integer(),
timestamp = now() :: erlang:timestamp(), timestamp = now() :: erlang:timestamp(),
contact :: {binary(), #uri{}, [{binary(), binary()}]}, contact :: {binary(), #uri{}, [{binary(), binary()}]},
tref = make_ref() :: reference(), flow_tref :: reference(),
mref = make_ref() :: reference(), reg_tref = make_ref() :: reference(),
conn_mref = make_ref() :: reference(),
expires = 0 :: non_neg_integer()}). expires = 0 :: non_neg_integer()}).
-record(state, {}). -record(state, {}).
@ -153,15 +154,8 @@ find_sockets(U, S) ->
[] []
end. end.
ping(#sip_socket{type = Type} = SIPSocket) -> ping(SIPSocket) ->
case mnesia:dirty_index_read(sip_session, SIPSocket, #sip_session.socket) of call({ping, SIPSocket}).
[] when Type == udp ->
error;
[] ->
drop;
[_|_] ->
pong
end.
%%%=================================================================== %%%===================================================================
%%% gen_server callbacks %%% gen_server callbacks
@ -172,7 +166,7 @@ init([]) ->
[{ram_copies, [node()]}, [{ram_copies, [node()]},
{type, bag}, {type, bag},
{attributes, record_info(fields, sip_session)}]), {attributes, record_info(fields, sip_session)}]),
mnesia:add_table_index(sip_session, mref), mnesia:add_table_index(sip_session, conn_mref),
mnesia:add_table_index(sip_session, socket), mnesia:add_table_index(sip_session, socket),
mnesia:add_table_copy(sip_session, node(), ram_copies), mnesia:add_table_copy(sip_session, node(), ram_copies),
{ok, #state{}}. {ok, #state{}}.
@ -183,6 +177,9 @@ handle_call({write, Sessions, Supported}, _From, State) ->
handle_call({delete, US, CallID, CSeq}, _From, State) -> handle_call({delete, US, CallID, CSeq}, _From, State) ->
Res = delete_session(US, CallID, CSeq), Res = delete_session(US, CallID, CSeq),
{reply, Res, State}; {reply, Res, State};
handle_call({ping, SIPSocket}, _From, State) ->
Res = process_ping(SIPSocket),
{reply, Res, State};
handle_call(_Request, _From, State) -> handle_call(_Request, _From, State) ->
Reply = ok, Reply = ok,
{reply, Reply, State}. {reply, Reply, State}.
@ -200,7 +197,7 @@ handle_info({timeout, TRef, US}, State) ->
delete_expired_session(US, TRef), delete_expired_session(US, TRef),
{noreply, State}; {noreply, State};
handle_info({'DOWN', MRef, process, _Pid, _Reason}, State) -> handle_info({'DOWN', MRef, process, _Pid, _Reason}, State) ->
case mnesia:dirty_index_read(sip_session, MRef, #sip_session.mref) of case mnesia:dirty_index_read(sip_session, MRef, #sip_session.conn_mref) of
[Session] -> [Session] ->
mnesia:dirty_delete_object(Session); mnesia:dirty_delete_object(Session);
_ -> _ ->
@ -275,7 +272,8 @@ write_session([#sip_session{us = {U, S} = US}|_] = NewSessions,
{error, too_many_sessions}; {error, too_many_sessions};
true -> true ->
lists:foreach( lists:foreach(
fun(#sip_session{tref = TRef, mref = MRef} = Session) -> fun(#sip_session{reg_tref = TRef,
conn_mref = MRef} = Session) ->
erlang:cancel_timer(TRef), erlang:cancel_timer(TRef),
catch erlang:demonitor(MRef, [flush]), catch erlang:demonitor(MRef, [flush]),
mnesia:dirty_delete_object(Session) mnesia:dirty_delete_object(Session)
@ -311,7 +309,7 @@ delete_session(US, CallID, CSeq) ->
ContactsWithExpires = ContactsWithExpires =
lists:map( lists:map(
fun(#sip_session{contact = Contact, fun(#sip_session{contact = Contact,
tref = TRef}) -> reg_tref = TRef}) ->
erlang:cancel_timer(TRef), erlang:cancel_timer(TRef),
{Contact, 0} {Contact, 0}
end, Sessions), end, Sessions),
@ -327,17 +325,14 @@ delete_session(US, CallID, CSeq) ->
delete_expired_session(US, TRef) -> delete_expired_session(US, TRef) ->
case mnesia:dirty_read(sip_session, US) of case mnesia:dirty_read(sip_session, US) of
[_|_] = Sessions -> [_|_] = Sessions ->
case lists:filter( lists:foreach(
fun(#sip_session{tref = TRef1}) when TRef1 == TRef -> fun(#sip_session{reg_tref = T1,
true; flow_tref = T2} = Session)
(_) -> when T1 == TRef; T2 == TRef ->
false mnesia:dirty_delete_object(Session);
end, Sessions) of (_) ->
[Session|_] -> ok
mnesia:dirty_delete_object(Session); end, Sessions);
[] ->
ok
end;
[] -> [] ->
ok ok
end. end.
@ -502,30 +497,51 @@ update_table() ->
ok ok
end. end.
set_monitor_and_timer(#sip_session{expires = Expires} = Session, set_monitor_and_timer(#sip_session{socket = #sip_socket{type = Type,
_IsOutboundSupported = false) -> pid = Pid} = SIPSock,
set_timer(Session, Expires); conn_mref = MRef,
set_monitor_and_timer(#sip_session{socket = SIPSock,
mref = MRef,
expires = Expires, expires = Expires,
us = {_, LServer}, us = {_, LServer},
contact = {_, _, Params}} = Session, contact = {_, _, Params}} = Session,
_IsOutboundSupported = true) -> IsOutboundSupported) ->
case get_ob_params(Params) of RegTRef = set_timer(Session, Expires),
error -> Session1 = Session#sip_session{reg_tref = RegTRef},
set_timer(Session, Expires); if IsOutboundSupported ->
{_, _} -> case get_ob_params(Params) of
FlowTimeout = get_flow_timeout(LServer, SIPSock), error ->
Timeout = lists:min([FlowTimeout, Expires]), Session1;
NewSession = set_timer(Session, Timeout), {_, _} ->
NewMRef = if SIPSock#sip_socket.type == udp -> FlowTimeout = get_flow_timeout(LServer, SIPSock),
MRef; FlowTRef = set_timer(Session1, FlowTimeout),
true -> NewMRef = if Type == udp -> MRef;
erlang:monitor(process, SIPSock#sip_socket.pid) true -> erlang:monitor(process, Pid)
end, end,
NewSession#sip_session{mref = NewMRef} Session1#sip_session{conn_mref = NewMRef,
flow_tref = FlowTRef}
end;
true ->
Session1
end. end.
set_timer(#sip_session{us = US} = Session, Timeout) -> set_timer(#sip_session{us = US}, Timeout) ->
TRef = erlang:start_timer(Timeout * 1000, self(), US), erlang:start_timer(Timeout * 1000, self(), US).
Session#sip_session{tref = TRef}.
process_ping(SIPSocket) ->
ErrResponse = if SIPSocket#sip_socket.type == udp -> error;
true -> drop
end,
Sessions = mnesia:dirty_index_read(
sip_session, SIPSocket, #sip_session.socket),
lists:foldl(
fun(#sip_session{flow_tref = TRef,
us = {_, LServer}} = Session, _)
when TRef /= undefined ->
erlang:cancel_timer(TRef),
mnesia:dirty_delete_object(Session),
Timeout = get_flow_timeout(LServer, SIPSocket),
NewTRef = set_timer(Session, Timeout),
mnesia:dirty_write(
Session#sip_session{flow_tref = NewTRef});
(_, Acc) ->
Acc
end, ErrResponse, Sessions).