From 9e7252954459a9b705f1a4b1da1c78ab6ae8d213 Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Fri, 6 Jun 2014 09:32:07 +0400 Subject: [PATCH] SIP Outbound (RFC 5626) support --- src/mod_sip.erl | 20 ++- src/mod_sip_registrar.erl | 337 +++++++++++++++++++++++++++----------- 2 files changed, 254 insertions(+), 103 deletions(-) diff --git a/src/mod_sip.erl b/src/mod_sip.erl index 8f7dba9cb..fd36fb5ac 100644 --- a/src/mod_sip.erl +++ b/src/mod_sip.erl @@ -68,8 +68,8 @@ message_in(#sip{type = request, method = M} = Req, SIPSock) Action -> request(Req, SIPSock, undefined, Action) end; -message_in(ping, _SIPSock) -> - pong; +message_in(ping, SIPSock) -> + mod_sip_registrar:ping(SIPSock); message_in(_, _) -> ok. @@ -162,8 +162,9 @@ action(#sip{method = <<"REGISTER">>, type = request, hdrs = Hdrs, uri = #uri{user = <<"">>} = URI} = Req, SIPSock) -> case at_my_host(URI) of true -> - case esip:get_hdrs('require', Hdrs) of - [_|_] = Require -> + Require = esip:get_hdrs('require', Hdrs) -- supported(), + case Require of + [_|_] -> {unsupported, Require}; _ -> {_, ToURI, _} = esip:get_hdr('to', Hdrs), @@ -189,8 +190,9 @@ action(#sip{method = Method, hdrs = Hdrs, type = request} = Req, SIPSock) -> 0 -> loop; _ -> - case esip:get_hdrs('proxy-require', Hdrs) of - [_|_] = Require -> + Require = esip:get_hdrs('proxy-require', Hdrs) -- supported(), + case Require of + [_|_] -> {unsupported, Require}; _ -> {_, ToURI, _} = esip:get_hdr('to', Hdrs), @@ -253,9 +255,13 @@ check_auth(#sip{method = Method, hdrs = Hdrs, body = Body}, AuthHdr, _SIPSock) - allow() -> [<<"OPTIONS">>, <<"REGISTER">>]. +supported() -> + [<<"path">>, <<"outbound">>]. + process(#sip{method = <<"OPTIONS">>} = Req, _) -> make_response(Req, #sip{type = response, status = 200, - hdrs = [{'allow', allow()}]}); + hdrs = [{'allow', allow()}, + {'supported', supported()}]}); process(#sip{method = <<"REGISTER">>} = Req, _) -> make_response(Req, #sip{type = response, status = 400}); process(Req, _) -> diff --git a/src/mod_sip_registrar.erl b/src/mod_sip_registrar.erl index 2102af851..da2c473c2 100644 --- a/src/mod_sip_registrar.erl +++ b/src/mod_sip_registrar.erl @@ -12,7 +12,7 @@ -behaviour(?GEN_SERVER). %% API --export([start_link/0, request/2, find_sockets/2]). +-export([start_link/0, request/2, find_sockets/2, ping/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -24,17 +24,18 @@ -define(CALL_TIMEOUT, timer:seconds(30)). -define(DEFAULT_EXPIRES, 3600). - --record(binding, {socket = #sip_socket{}, - call_id = <<"">> :: binary(), - cseq = 0 :: non_neg_integer(), - timestamp = now() :: erlang:timestamp(), - contact :: {binary(), #uri{}, [{binary(), binary()}]}, - tref = make_ref() :: reference(), - expires = 0 :: non_neg_integer()}). +-define(FLOW_TIMEOUT_DATAGRAM, 29). +-define(FLOW_TIMEOUT_STREAM, 180). -record(sip_session, {us = {<<"">>, <<"">>} :: {binary(), binary()}, - bindings = [] :: [#binding{}]}). + socket = #sip_socket{} :: #sip_socket{}, + call_id = <<"">> :: binary(), + cseq = 0 :: non_neg_integer(), + timestamp = now() :: erlang:timestamp(), + contact :: {binary(), #uri{}, [{binary(), binary()}]}, + tref = make_ref() :: reference(), + mref = make_ref() :: reference(), + expires = 0 :: non_neg_integer()}). -record(state, {}). @@ -53,6 +54,8 @@ request(#sip{hdrs = Hdrs} = Req, SIPSock) -> CallID = esip:get_hdr('call-id', Hdrs), CSeq = esip:get_hdr('cseq', Hdrs), Expires = esip:get_hdr('expires', Hdrs, ?DEFAULT_EXPIRES), + Supported = esip:get_hdrs('supported', Hdrs), + IsOutboundSupported = lists:member(<<"outbound">>, Supported), case esip:get_hdrs('contact', Hdrs) of [<<"*">>] when Expires == 0 -> case unregister_session(US, CallID, CSeq) of @@ -74,6 +77,7 @@ request(#sip{hdrs = Hdrs} = Req, SIPSock) -> end; [{_, _URI, _Params}|_] = Contacts -> ContactsWithExpires = make_contacts_with_expires(Contacts, Expires), + ContactsHaveManyRegID = contacts_have_many_reg_id(Contacts), Expires1 = lists:max([E || {_, E} <- ContactsWithExpires]), MinExpires = min_expires(), if Expires1 > 0, Expires1 < MinExpires -> @@ -81,19 +85,31 @@ request(#sip{hdrs = Hdrs} = Req, SIPSock) -> Req, #sip{type = response, status = 423, hdrs = [{'min-expires', MinExpires}]}); + ContactsHaveManyRegID -> + mod_sip:make_response( + Req, #sip{type = response, status = 400, + reason = <<"Multiple 'reg-id' parameter">>}); true -> case register_session(US, SIPSock, CallID, CSeq, + IsOutboundSupported, ContactsWithExpires) of {ok, Res} -> ?INFO_MSG("~s SIP session for user ~s@~s from ~s", [Res, LUser, LServer, inet_parse:ntoa(PeerIP)]), Cs = prepare_contacts_to_send(ContactsWithExpires), + Require = case need_ob_hdrs( + Contacts, IsOutboundSupported) of + true -> [{'require', [<<"outbound">>]}, + {'flow-timer', + get_flow_timeout(LServer, SIPSock)}]; + false -> [] + end, mod_sip:make_response( Req, #sip{type = response, status = 200, - hdrs = [{'contact', Cs}]}); + hdrs = [{'contact', Cs}|Require]}); {error, Why} -> {Status, Reason} = make_status(Why), mod_sip:make_response( @@ -104,12 +120,12 @@ request(#sip{hdrs = Hdrs} = Req, SIPSock) -> end; [] -> case mnesia:dirty_read(sip_session, US) of - [#sip_session{bindings = Bindings}] -> + [_|_] = Sessions -> ContactsWithExpires = lists:map( - fun(#binding{contact = Contact, expires = Es}) -> + fun(#sip_session{contact = Contact, expires = Es}) -> {Contact, Es} - end, Bindings), + end, Sessions), Cs = prepare_contacts_to_send(ContactsWithExpires), mod_sip:make_response( Req, #sip{type = response, status = 200, @@ -127,28 +143,42 @@ request(#sip{hdrs = Hdrs} = Req, SIPSock) -> find_sockets(U, S) -> case mnesia:dirty_read(sip_session, {U, S}) of - [#sip_session{bindings = Bindings}] -> + [_|_] = Sessions -> lists:map( - fun(#binding{contact = {_, URI, _}, + fun(#sip_session{contact = {_, URI, _}, socket = Socket}) -> {Socket, URI} - end, Bindings); + end, Sessions); [] -> [] end. +ping(#sip_socket{type = Type} = SIPSocket) -> + case mnesia:dirty_index_read(sip_session, SIPSocket, #sip_session.socket) of + [] when Type == udp -> + error; + [] -> + drop; + [_|_] -> + pong + end. + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== init([]) -> + update_table(), mnesia:create_table(sip_session, [{ram_copies, [node()]}, + {type, bag}, {attributes, record_info(fields, sip_session)}]), + mnesia:add_table_index(sip_session, mref), + mnesia:add_table_index(sip_session, socket), mnesia:add_table_copy(sip_session, node(), ram_copies), {ok, #state{}}. -handle_call({write, Session}, _From, State) -> - Res = write_session(Session), +handle_call({write, Sessions, Supported}, _From, State) -> + Res = write_session(Sessions, Supported), {reply, Res, State}; handle_call({delete, US, CallID, CSeq}, _From, State) -> Res = delete_session(US, CallID, CSeq), @@ -160,8 +190,8 @@ handle_call(_Request, _From, State) -> handle_cast(_Msg, State) -> {noreply, State}. -handle_info({write, Session}, State) -> - write_session(Session), +handle_info({write, Sessions, Supported}, State) -> + write_session(Sessions, Supported), {noreply, State}; handle_info({delete, US, CallID, CSeq}, State) -> delete_session(US, CallID, CSeq), @@ -169,6 +199,14 @@ handle_info({delete, US, CallID, CSeq}, State) -> handle_info({timeout, TRef, US}, State) -> delete_expired_session(US, TRef), {noreply, State}; +handle_info({'DOWN', MRef, process, _Pid, _Reason}, State) -> + case mnesia:dirty_index_read(sip_session, MRef, #sip_session.mref) of + [Session] -> + mnesia:dirty_delete_object(Session); + _ -> + ok + end, + {noreply, State}; handle_info(_Info, State) -> ?ERROR_MSG("got unexpected info: ~p", [_Info]), {noreply, State}. @@ -182,107 +220,101 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== -register_session(US, SIPSocket, CallID, CSeq, ContactsWithExpires) -> - Bindings = lists:map( +register_session(US, SIPSocket, CallID, CSeq, IsOutboundSupported, + ContactsWithExpires) -> + Sessions = lists:map( fun({Contact, Expires}) -> - #binding{socket = SIPSocket, - call_id = CallID, - cseq = CSeq, - timestamp = now(), - contact = Contact, - expires = Expires} + #sip_session{us = US, + socket = SIPSocket, + call_id = CallID, + cseq = CSeq, + timestamp = now(), + contact = Contact, + expires = Expires} end, ContactsWithExpires), - Session = #sip_session{us = US, bindings = Bindings}, - call({write, Session}). + Msg = {write, Sessions, IsOutboundSupported}, + call(Msg). unregister_session(US, CallID, CSeq) -> Msg = {delete, US, CallID, CSeq}, call(Msg). -write_session(#sip_session{us = {U, S} = US, bindings = NewBindings}) -> - PrevBindings = case mnesia:dirty_read(sip_session, US) of - [#sip_session{bindings = PrevBindings1}] -> - PrevBindings1; - [] -> - [] - end, +write_session([#sip_session{us = {U, S} = US}|_] = NewSessions, + IsOutboundSupported) -> + PrevSessions = mnesia:dirty_read(sip_session, US), Res = lists:foldl( fun(_, {error, _} = Err) -> Err; - (#binding{call_id = CallID, - expires = Expires, - cseq = CSeq} = Binding, {Add, Keep, Del}) -> - case find_binding(Binding, PrevBindings) of - {ok, #binding{call_id = CallID, cseq = PrevCSeq}} + (#sip_session{call_id = CallID, + expires = Expires, + cseq = CSeq} = Session, {Add, Del}) -> + case find_session(Session, PrevSessions, + IsOutboundSupported) of + {ok, normal, #sip_session{call_id = CallID, + cseq = PrevCSeq}} when PrevCSeq > CSeq -> {error, cseq_out_of_order}; - {ok, PrevBinding} when Expires == 0 -> - {Add, Keep -- [PrevBinding], [PrevBinding|Del]}; - {ok, PrevBinding} -> - {[Binding|Add], Keep -- [PrevBinding], Del}; + {ok, _Type, PrevSession} when Expires == 0 -> + {Add, [PrevSession|Del]}; + {ok, _Type, PrevSession} -> + {[Session|Add], [PrevSession|Del]}; {error, notfound} when Expires == 0 -> {error, notfound}; {error, notfound} -> - {[Binding|Add], Keep, Del} + {[Session|Add], Del} end - end, {[], PrevBindings, []}, NewBindings), + end, {[], []}, NewSessions), MaxSessions = ejabberd_sm:get_max_user_sessions(U, S), case Res of {error, Why} -> {error, Why}; - {AddBindings, KeepBindings, DelBindings} -> + {AddSessions, DelSessions} -> MaxSessions = ejabberd_sm:get_max_user_sessions(U, S), - AllBindings = AddBindings ++ KeepBindings, - if length(AllBindings) > MaxSessions -> + AllSessions = AddSessions ++ PrevSessions -- DelSessions, + if length(AllSessions) > MaxSessions -> {error, too_many_sessions}; true -> lists:foreach( - fun(#binding{tref = TRef}) -> - erlang:cancel_timer(TRef) - end, DelBindings), - AddBindings1 = lists:map( - fun(#binding{tref = TRef, - expires = Expires} = Binding) -> - erlang:cancel_timer(TRef), - NewTRef = erlang:start_timer( - Expires * 1000, self(), US), - Binding#binding{tref = NewTRef} - end, AddBindings), - AllBindings1 = AddBindings1 ++ KeepBindings, - case AllBindings1 of - [] -> - mnesia:dirty_delete(sip_session, US), + fun(#sip_session{tref = TRef, mref = MRef} = Session) -> + erlang:cancel_timer(TRef), + catch erlang:demonitor(MRef, [flush]), + mnesia:dirty_delete_object(Session) + end, DelSessions), + lists:foreach( + fun(Session) -> + NewSession = set_monitor_and_timer( + Session, IsOutboundSupported), + mnesia:dirty_write(NewSession) + end, AddSessions), + case {AllSessions, AddSessions} of + {[], _} -> + {ok, unregister}; + {_, []} -> {ok, unregister}; _ -> - mnesia:dirty_write( - #sip_session{us = US, bindings = AllBindings1}), - if length(DelBindings) == length(NewBindings) -> - {ok, unregister}; - true -> - {ok, register} - end + {ok, register} end end end. delete_session(US, CallID, CSeq) -> case mnesia:dirty_read(sip_session, US) of - [#sip_session{bindings = Bindings}] -> + [_|_] = Sessions -> case lists:all( - fun(B) when B#binding.call_id == CallID, - B#binding.cseq > CSeq -> + fun(S) when S#sip_session.call_id == CallID, + S#sip_session.cseq > CSeq -> false; (_) -> true - end, Bindings) of + end, Sessions) of true -> ContactsWithExpires = lists:map( - fun(#binding{contact = Contact, - tref = TRef}) -> + fun(#sip_session{contact = Contact, + tref = TRef}) -> erlang:cancel_timer(TRef), {Contact, 0} - end, Bindings), + end, Sessions), mnesia:dirty_delete(sip_session, US), {ok, ContactsWithExpires}; false -> @@ -294,19 +326,17 @@ delete_session(US, CallID, CSeq) -> delete_expired_session(US, TRef) -> case mnesia:dirty_read(sip_session, US) of - [#sip_session{bindings = Bindings}] -> + [_|_] = Sessions -> case lists:filter( - fun(#binding{tref = TRef1}) when TRef1 == TRef -> - false; + fun(#sip_session{tref = TRef1}) when TRef1 == TRef -> + true; (_) -> - true - end, Bindings) of + false + end, Sessions) of + [Session|_] -> + mnesia:dirty_delete_object(Session); [] -> - mnesia:dirty_delete(sip_session, US); - NewBindings -> - mnesia:dirty_write(sip_session, - #sip_session{us = US, - bindings = NewBindings}) + ok end; [] -> ok @@ -355,15 +385,55 @@ prepare_contacts_to_send(ContactsWithExpires) -> {Name, URI, Params1} end, ContactsWithExpires). -find_binding(#binding{contact = {_, URI1, _}} = OrigBinding, - [#binding{contact = {_, URI2, _}} = Binding|Bindings]) -> +contacts_have_many_reg_id(Contacts) -> + Sum = lists:foldl( + fun({_Name, _URI, Params}, Acc) -> + case get_ob_params(Params) of + error -> + Acc; + {_, _} -> + Acc + 1 + end + end, 0, Contacts), + if Sum > 1 -> + true; + true -> + false + end. + +find_session(#sip_session{contact = {_, URI, Params}}, Sessions, + IsOutboundSupported) -> + if IsOutboundSupported -> + case get_ob_params(Params) of + {InstanceID, RegID} -> + find_session_by_ob({InstanceID, RegID}, Sessions); + error -> + find_session_by_uri(URI, Sessions) + end; + true -> + find_session_by_uri(URI, Sessions) + end. + +find_session_by_ob({InstanceID, RegID}, + [#sip_session{contact = {_, _, Params}} = Session|Sessions]) -> + case get_ob_params(Params) of + {InstanceID, RegID} -> + {ok, flow, Session}; + _ -> + find_session_by_ob({InstanceID, RegID}, Sessions) + end; +find_session_by_ob(_, []) -> + {error, notfound}. + +find_session_by_uri(URI1, + [#sip_session{contact = {_, URI2, _}} = Session|Sessions]) -> case cmp_uri(URI1, URI2) of true -> - {ok, Binding}; + {ok, normal, Session}; false -> - find_binding(OrigBinding, Bindings) + find_session_by_uri(URI1, Sessions) end; -find_binding(_, []) -> +find_session_by_uri(_, []) -> {error, notfound}. %% TODO: this is *totally* wrong. @@ -384,3 +454,78 @@ make_status(too_many_sessions) -> {503, <<"Too Many Registered Sessions">>}; make_status(_) -> {500, esip:reason(500)}. + +get_ob_params(Params) -> + case esip:get_param(<<"+sip.instance">>, Params) of + <<>> -> + error; + InstanceID -> + case to_integer(esip:get_param(<<"reg-id">>, Params), + 0, (1 bsl 32)-1) of + {ok, RegID} -> + {InstanceID, RegID}; + error -> + error + end + end. + +need_ob_hdrs(_Contacts, _IsOutboundSupported = false) -> + false; +need_ob_hdrs(Contacts, _IsOutboundSupported = true) -> + lists:any( + fun({_Name, _URI, Params}) -> + case get_ob_params(Params) of + error -> false; + {_, _} -> true + end + end, Contacts). + +get_flow_timeout(LServer, #sip_socket{type = Type}) -> + {Option, Default} = + case Type of + udp -> {flow_timeout_datagram, ?FLOW_TIMEOUT_DATAGRAM}; + _ -> {flow_timeout_stream, ?FLOW_TIMEOUT_STREAM} + end, + gen_mod:get_module_opt( + LServer, mod_sip, Option, + fun(I) when is_integer(I), I>0 -> I end, + Default). + +update_table() -> + Fields = record_info(fields, sip_session), + case catch mnesia:table_info(sip_session, attributes) of + Fields -> + ok; + [_|_] -> + mnesia:delete_table(sip_session); + {'EXIT', _} -> + ok + end. + +set_monitor_and_timer(#sip_session{expires = Expires} = Session, + _IsOutboundSupported = false) -> + set_timer(Session, Expires); +set_monitor_and_timer(#sip_session{socket = SIPSock, + mref = MRef, + expires = Expires, + us = {_, LServer}, + contact = {_, _, Params}} = Session, + _IsOutboundSupported = true) -> + case get_ob_params(Params) of + error -> + set_timer(Session, Expires); + {_, _} -> + FlowTimeout = get_flow_timeout(LServer, SIPSock), + Timeout = lists:min([FlowTimeout, Expires]), + NewSession = set_timer(Session, Timeout), + NewMRef = if SIPSock#sip_socket.type == udp -> + MRef; + true -> + erlang:monitor(process, SIPSock#sip_socket.pid) + end, + NewSession#sip_session{mref = NewMRef} + end. + +set_timer(#sip_session{us = US} = Session, Timeout) -> + TRef = erlang:start_timer(Timeout * 1000, self(), US), + Session#sip_session{tref = TRef}.