24
1
mirror of https://github.com/processone/ejabberd.git synced 2024-06-10 21:47:01 +02:00

SIP Outbound (RFC 5626) support

This commit is contained in:
Evgeniy Khramtsov 2014-06-06 09:32:07 +04:00
parent 11aa51373a
commit 9e72529544
2 changed files with 254 additions and 103 deletions

View File

@ -68,8 +68,8 @@ message_in(#sip{type = request, method = M} = Req, SIPSock)
Action -> Action ->
request(Req, SIPSock, undefined, Action) request(Req, SIPSock, undefined, Action)
end; end;
message_in(ping, _SIPSock) -> message_in(ping, SIPSock) ->
pong; mod_sip_registrar:ping(SIPSock);
message_in(_, _) -> message_in(_, _) ->
ok. ok.
@ -162,8 +162,9 @@ action(#sip{method = <<"REGISTER">>, type = request, hdrs = Hdrs,
uri = #uri{user = <<"">>} = URI} = Req, SIPSock) -> uri = #uri{user = <<"">>} = URI} = Req, SIPSock) ->
case at_my_host(URI) of case at_my_host(URI) of
true -> true ->
case esip:get_hdrs('require', Hdrs) of Require = esip:get_hdrs('require', Hdrs) -- supported(),
[_|_] = Require -> case Require of
[_|_] ->
{unsupported, Require}; {unsupported, Require};
_ -> _ ->
{_, ToURI, _} = esip:get_hdr('to', Hdrs), {_, ToURI, _} = esip:get_hdr('to', Hdrs),
@ -189,8 +190,9 @@ action(#sip{method = Method, hdrs = Hdrs, type = request} = Req, SIPSock) ->
0 -> 0 ->
loop; loop;
_ -> _ ->
case esip:get_hdrs('proxy-require', Hdrs) of Require = esip:get_hdrs('proxy-require', Hdrs) -- supported(),
[_|_] = Require -> case Require of
[_|_] ->
{unsupported, Require}; {unsupported, Require};
_ -> _ ->
{_, ToURI, _} = esip:get_hdr('to', Hdrs), {_, ToURI, _} = esip:get_hdr('to', Hdrs),
@ -253,9 +255,13 @@ check_auth(#sip{method = Method, hdrs = Hdrs, body = Body}, AuthHdr, _SIPSock) -
allow() -> allow() ->
[<<"OPTIONS">>, <<"REGISTER">>]. [<<"OPTIONS">>, <<"REGISTER">>].
supported() ->
[<<"path">>, <<"outbound">>].
process(#sip{method = <<"OPTIONS">>} = Req, _) -> process(#sip{method = <<"OPTIONS">>} = Req, _) ->
make_response(Req, #sip{type = response, status = 200, make_response(Req, #sip{type = response, status = 200,
hdrs = [{'allow', allow()}]}); hdrs = [{'allow', allow()},
{'supported', supported()}]});
process(#sip{method = <<"REGISTER">>} = Req, _) -> process(#sip{method = <<"REGISTER">>} = Req, _) ->
make_response(Req, #sip{type = response, status = 400}); make_response(Req, #sip{type = response, status = 400});
process(Req, _) -> process(Req, _) ->

View File

@ -12,7 +12,7 @@
-behaviour(?GEN_SERVER). -behaviour(?GEN_SERVER).
%% API %% API
-export([start_link/0, request/2, find_sockets/2]). -export([start_link/0, request/2, find_sockets/2, ping/1]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -24,17 +24,18 @@
-define(CALL_TIMEOUT, timer:seconds(30)). -define(CALL_TIMEOUT, timer:seconds(30)).
-define(DEFAULT_EXPIRES, 3600). -define(DEFAULT_EXPIRES, 3600).
-define(FLOW_TIMEOUT_DATAGRAM, 29).
-record(binding, {socket = #sip_socket{}, -define(FLOW_TIMEOUT_STREAM, 180).
call_id = <<"">> :: binary(),
cseq = 0 :: non_neg_integer(),
timestamp = now() :: erlang:timestamp(),
contact :: {binary(), #uri{}, [{binary(), binary()}]},
tref = make_ref() :: reference(),
expires = 0 :: non_neg_integer()}).
-record(sip_session, {us = {<<"">>, <<"">>} :: {binary(), binary()}, -record(sip_session, {us = {<<"">>, <<"">>} :: {binary(), binary()},
bindings = [] :: [#binding{}]}). socket = #sip_socket{} :: #sip_socket{},
call_id = <<"">> :: binary(),
cseq = 0 :: non_neg_integer(),
timestamp = now() :: erlang:timestamp(),
contact :: {binary(), #uri{}, [{binary(), binary()}]},
tref = make_ref() :: reference(),
mref = make_ref() :: reference(),
expires = 0 :: non_neg_integer()}).
-record(state, {}). -record(state, {}).
@ -53,6 +54,8 @@ request(#sip{hdrs = Hdrs} = Req, SIPSock) ->
CallID = esip:get_hdr('call-id', Hdrs), CallID = esip:get_hdr('call-id', Hdrs),
CSeq = esip:get_hdr('cseq', Hdrs), CSeq = esip:get_hdr('cseq', Hdrs),
Expires = esip:get_hdr('expires', Hdrs, ?DEFAULT_EXPIRES), Expires = esip:get_hdr('expires', Hdrs, ?DEFAULT_EXPIRES),
Supported = esip:get_hdrs('supported', Hdrs),
IsOutboundSupported = lists:member(<<"outbound">>, Supported),
case esip:get_hdrs('contact', Hdrs) of case esip:get_hdrs('contact', Hdrs) of
[<<"*">>] when Expires == 0 -> [<<"*">>] when Expires == 0 ->
case unregister_session(US, CallID, CSeq) of case unregister_session(US, CallID, CSeq) of
@ -74,6 +77,7 @@ request(#sip{hdrs = Hdrs} = Req, SIPSock) ->
end; end;
[{_, _URI, _Params}|_] = Contacts -> [{_, _URI, _Params}|_] = Contacts ->
ContactsWithExpires = make_contacts_with_expires(Contacts, Expires), ContactsWithExpires = make_contacts_with_expires(Contacts, Expires),
ContactsHaveManyRegID = contacts_have_many_reg_id(Contacts),
Expires1 = lists:max([E || {_, E} <- ContactsWithExpires]), Expires1 = lists:max([E || {_, E} <- ContactsWithExpires]),
MinExpires = min_expires(), MinExpires = min_expires(),
if Expires1 > 0, Expires1 < MinExpires -> if Expires1 > 0, Expires1 < MinExpires ->
@ -81,19 +85,31 @@ request(#sip{hdrs = Hdrs} = Req, SIPSock) ->
Req, #sip{type = response, Req, #sip{type = response,
status = 423, status = 423,
hdrs = [{'min-expires', MinExpires}]}); hdrs = [{'min-expires', MinExpires}]});
ContactsHaveManyRegID ->
mod_sip:make_response(
Req, #sip{type = response, status = 400,
reason = <<"Multiple 'reg-id' parameter">>});
true -> true ->
case register_session(US, SIPSock, CallID, CSeq, case register_session(US, SIPSock, CallID, CSeq,
IsOutboundSupported,
ContactsWithExpires) of ContactsWithExpires) of
{ok, Res} -> {ok, Res} ->
?INFO_MSG("~s SIP session for user ~s@~s from ~s", ?INFO_MSG("~s SIP session for user ~s@~s from ~s",
[Res, LUser, LServer, [Res, LUser, LServer,
inet_parse:ntoa(PeerIP)]), inet_parse:ntoa(PeerIP)]),
Cs = prepare_contacts_to_send(ContactsWithExpires), Cs = prepare_contacts_to_send(ContactsWithExpires),
Require = case need_ob_hdrs(
Contacts, IsOutboundSupported) of
true -> [{'require', [<<"outbound">>]},
{'flow-timer',
get_flow_timeout(LServer, SIPSock)}];
false -> []
end,
mod_sip:make_response( mod_sip:make_response(
Req, Req,
#sip{type = response, #sip{type = response,
status = 200, status = 200,
hdrs = [{'contact', Cs}]}); hdrs = [{'contact', Cs}|Require]});
{error, Why} -> {error, Why} ->
{Status, Reason} = make_status(Why), {Status, Reason} = make_status(Why),
mod_sip:make_response( mod_sip:make_response(
@ -104,12 +120,12 @@ request(#sip{hdrs = Hdrs} = Req, SIPSock) ->
end; end;
[] -> [] ->
case mnesia:dirty_read(sip_session, US) of case mnesia:dirty_read(sip_session, US) of
[#sip_session{bindings = Bindings}] -> [_|_] = Sessions ->
ContactsWithExpires = ContactsWithExpires =
lists:map( lists:map(
fun(#binding{contact = Contact, expires = Es}) -> fun(#sip_session{contact = Contact, expires = Es}) ->
{Contact, Es} {Contact, Es}
end, Bindings), end, Sessions),
Cs = prepare_contacts_to_send(ContactsWithExpires), Cs = prepare_contacts_to_send(ContactsWithExpires),
mod_sip:make_response( mod_sip:make_response(
Req, #sip{type = response, status = 200, Req, #sip{type = response, status = 200,
@ -127,28 +143,42 @@ request(#sip{hdrs = Hdrs} = Req, SIPSock) ->
find_sockets(U, S) -> find_sockets(U, S) ->
case mnesia:dirty_read(sip_session, {U, S}) of case mnesia:dirty_read(sip_session, {U, S}) of
[#sip_session{bindings = Bindings}] -> [_|_] = Sessions ->
lists:map( lists:map(
fun(#binding{contact = {_, URI, _}, fun(#sip_session{contact = {_, URI, _},
socket = Socket}) -> socket = Socket}) ->
{Socket, URI} {Socket, URI}
end, Bindings); end, Sessions);
[] -> [] ->
[] []
end. end.
ping(#sip_socket{type = Type} = SIPSocket) ->
case mnesia:dirty_index_read(sip_session, SIPSocket, #sip_session.socket) of
[] when Type == udp ->
error;
[] ->
drop;
[_|_] ->
pong
end.
%%%=================================================================== %%%===================================================================
%%% gen_server callbacks %%% gen_server callbacks
%%%=================================================================== %%%===================================================================
init([]) -> init([]) ->
update_table(),
mnesia:create_table(sip_session, mnesia:create_table(sip_session,
[{ram_copies, [node()]}, [{ram_copies, [node()]},
{type, bag},
{attributes, record_info(fields, sip_session)}]), {attributes, record_info(fields, sip_session)}]),
mnesia:add_table_index(sip_session, mref),
mnesia:add_table_index(sip_session, socket),
mnesia:add_table_copy(sip_session, node(), ram_copies), mnesia:add_table_copy(sip_session, node(), ram_copies),
{ok, #state{}}. {ok, #state{}}.
handle_call({write, Session}, _From, State) -> handle_call({write, Sessions, Supported}, _From, State) ->
Res = write_session(Session), Res = write_session(Sessions, Supported),
{reply, Res, State}; {reply, Res, State};
handle_call({delete, US, CallID, CSeq}, _From, State) -> handle_call({delete, US, CallID, CSeq}, _From, State) ->
Res = delete_session(US, CallID, CSeq), Res = delete_session(US, CallID, CSeq),
@ -160,8 +190,8 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) -> handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({write, Session}, State) -> handle_info({write, Sessions, Supported}, State) ->
write_session(Session), write_session(Sessions, Supported),
{noreply, State}; {noreply, State};
handle_info({delete, US, CallID, CSeq}, State) -> handle_info({delete, US, CallID, CSeq}, State) ->
delete_session(US, CallID, CSeq), delete_session(US, CallID, CSeq),
@ -169,6 +199,14 @@ handle_info({delete, US, CallID, CSeq}, State) ->
handle_info({timeout, TRef, US}, State) -> handle_info({timeout, TRef, US}, State) ->
delete_expired_session(US, TRef), delete_expired_session(US, TRef),
{noreply, State}; {noreply, State};
handle_info({'DOWN', MRef, process, _Pid, _Reason}, State) ->
case mnesia:dirty_index_read(sip_session, MRef, #sip_session.mref) of
[Session] ->
mnesia:dirty_delete_object(Session);
_ ->
ok
end,
{noreply, State};
handle_info(_Info, State) -> handle_info(_Info, State) ->
?ERROR_MSG("got unexpected info: ~p", [_Info]), ?ERROR_MSG("got unexpected info: ~p", [_Info]),
{noreply, State}. {noreply, State}.
@ -182,107 +220,101 @@ code_change(_OldVsn, State, _Extra) ->
%%%=================================================================== %%%===================================================================
%%% Internal functions %%% Internal functions
%%%=================================================================== %%%===================================================================
register_session(US, SIPSocket, CallID, CSeq, ContactsWithExpires) -> register_session(US, SIPSocket, CallID, CSeq, IsOutboundSupported,
Bindings = lists:map( ContactsWithExpires) ->
Sessions = lists:map(
fun({Contact, Expires}) -> fun({Contact, Expires}) ->
#binding{socket = SIPSocket, #sip_session{us = US,
call_id = CallID, socket = SIPSocket,
cseq = CSeq, call_id = CallID,
timestamp = now(), cseq = CSeq,
contact = Contact, timestamp = now(),
expires = Expires} contact = Contact,
expires = Expires}
end, ContactsWithExpires), end, ContactsWithExpires),
Session = #sip_session{us = US, bindings = Bindings}, Msg = {write, Sessions, IsOutboundSupported},
call({write, Session}). call(Msg).
unregister_session(US, CallID, CSeq) -> unregister_session(US, CallID, CSeq) ->
Msg = {delete, US, CallID, CSeq}, Msg = {delete, US, CallID, CSeq},
call(Msg). call(Msg).
write_session(#sip_session{us = {U, S} = US, bindings = NewBindings}) -> write_session([#sip_session{us = {U, S} = US}|_] = NewSessions,
PrevBindings = case mnesia:dirty_read(sip_session, US) of IsOutboundSupported) ->
[#sip_session{bindings = PrevBindings1}] -> PrevSessions = mnesia:dirty_read(sip_session, US),
PrevBindings1;
[] ->
[]
end,
Res = lists:foldl( Res = lists:foldl(
fun(_, {error, _} = Err) -> fun(_, {error, _} = Err) ->
Err; Err;
(#binding{call_id = CallID, (#sip_session{call_id = CallID,
expires = Expires, expires = Expires,
cseq = CSeq} = Binding, {Add, Keep, Del}) -> cseq = CSeq} = Session, {Add, Del}) ->
case find_binding(Binding, PrevBindings) of case find_session(Session, PrevSessions,
{ok, #binding{call_id = CallID, cseq = PrevCSeq}} IsOutboundSupported) of
{ok, normal, #sip_session{call_id = CallID,
cseq = PrevCSeq}}
when PrevCSeq > CSeq -> when PrevCSeq > CSeq ->
{error, cseq_out_of_order}; {error, cseq_out_of_order};
{ok, PrevBinding} when Expires == 0 -> {ok, _Type, PrevSession} when Expires == 0 ->
{Add, Keep -- [PrevBinding], [PrevBinding|Del]}; {Add, [PrevSession|Del]};
{ok, PrevBinding} -> {ok, _Type, PrevSession} ->
{[Binding|Add], Keep -- [PrevBinding], Del}; {[Session|Add], [PrevSession|Del]};
{error, notfound} when Expires == 0 -> {error, notfound} when Expires == 0 ->
{error, notfound}; {error, notfound};
{error, notfound} -> {error, notfound} ->
{[Binding|Add], Keep, Del} {[Session|Add], Del}
end end
end, {[], PrevBindings, []}, NewBindings), end, {[], []}, NewSessions),
MaxSessions = ejabberd_sm:get_max_user_sessions(U, S), MaxSessions = ejabberd_sm:get_max_user_sessions(U, S),
case Res of case Res of
{error, Why} -> {error, Why} ->
{error, Why}; {error, Why};
{AddBindings, KeepBindings, DelBindings} -> {AddSessions, DelSessions} ->
MaxSessions = ejabberd_sm:get_max_user_sessions(U, S), MaxSessions = ejabberd_sm:get_max_user_sessions(U, S),
AllBindings = AddBindings ++ KeepBindings, AllSessions = AddSessions ++ PrevSessions -- DelSessions,
if length(AllBindings) > MaxSessions -> if length(AllSessions) > MaxSessions ->
{error, too_many_sessions}; {error, too_many_sessions};
true -> true ->
lists:foreach( lists:foreach(
fun(#binding{tref = TRef}) -> fun(#sip_session{tref = TRef, mref = MRef} = Session) ->
erlang:cancel_timer(TRef) erlang:cancel_timer(TRef),
end, DelBindings), catch erlang:demonitor(MRef, [flush]),
AddBindings1 = lists:map( mnesia:dirty_delete_object(Session)
fun(#binding{tref = TRef, end, DelSessions),
expires = Expires} = Binding) -> lists:foreach(
erlang:cancel_timer(TRef), fun(Session) ->
NewTRef = erlang:start_timer( NewSession = set_monitor_and_timer(
Expires * 1000, self(), US), Session, IsOutboundSupported),
Binding#binding{tref = NewTRef} mnesia:dirty_write(NewSession)
end, AddBindings), end, AddSessions),
AllBindings1 = AddBindings1 ++ KeepBindings, case {AllSessions, AddSessions} of
case AllBindings1 of {[], _} ->
[] -> {ok, unregister};
mnesia:dirty_delete(sip_session, US), {_, []} ->
{ok, unregister}; {ok, unregister};
_ -> _ ->
mnesia:dirty_write( {ok, register}
#sip_session{us = US, bindings = AllBindings1}),
if length(DelBindings) == length(NewBindings) ->
{ok, unregister};
true ->
{ok, register}
end
end end
end end
end. end.
delete_session(US, CallID, CSeq) -> delete_session(US, CallID, CSeq) ->
case mnesia:dirty_read(sip_session, US) of case mnesia:dirty_read(sip_session, US) of
[#sip_session{bindings = Bindings}] -> [_|_] = Sessions ->
case lists:all( case lists:all(
fun(B) when B#binding.call_id == CallID, fun(S) when S#sip_session.call_id == CallID,
B#binding.cseq > CSeq -> S#sip_session.cseq > CSeq ->
false; false;
(_) -> (_) ->
true true
end, Bindings) of end, Sessions) of
true -> true ->
ContactsWithExpires = ContactsWithExpires =
lists:map( lists:map(
fun(#binding{contact = Contact, fun(#sip_session{contact = Contact,
tref = TRef}) -> tref = TRef}) ->
erlang:cancel_timer(TRef), erlang:cancel_timer(TRef),
{Contact, 0} {Contact, 0}
end, Bindings), end, Sessions),
mnesia:dirty_delete(sip_session, US), mnesia:dirty_delete(sip_session, US),
{ok, ContactsWithExpires}; {ok, ContactsWithExpires};
false -> false ->
@ -294,19 +326,17 @@ delete_session(US, CallID, CSeq) ->
delete_expired_session(US, TRef) -> delete_expired_session(US, TRef) ->
case mnesia:dirty_read(sip_session, US) of case mnesia:dirty_read(sip_session, US) of
[#sip_session{bindings = Bindings}] -> [_|_] = Sessions ->
case lists:filter( case lists:filter(
fun(#binding{tref = TRef1}) when TRef1 == TRef -> fun(#sip_session{tref = TRef1}) when TRef1 == TRef ->
false; true;
(_) -> (_) ->
true false
end, Bindings) of end, Sessions) of
[Session|_] ->
mnesia:dirty_delete_object(Session);
[] -> [] ->
mnesia:dirty_delete(sip_session, US); ok
NewBindings ->
mnesia:dirty_write(sip_session,
#sip_session{us = US,
bindings = NewBindings})
end; end;
[] -> [] ->
ok ok
@ -355,15 +385,55 @@ prepare_contacts_to_send(ContactsWithExpires) ->
{Name, URI, Params1} {Name, URI, Params1}
end, ContactsWithExpires). end, ContactsWithExpires).
find_binding(#binding{contact = {_, URI1, _}} = OrigBinding, contacts_have_many_reg_id(Contacts) ->
[#binding{contact = {_, URI2, _}} = Binding|Bindings]) -> Sum = lists:foldl(
fun({_Name, _URI, Params}, Acc) ->
case get_ob_params(Params) of
error ->
Acc;
{_, _} ->
Acc + 1
end
end, 0, Contacts),
if Sum > 1 ->
true;
true ->
false
end.
find_session(#sip_session{contact = {_, URI, Params}}, Sessions,
IsOutboundSupported) ->
if IsOutboundSupported ->
case get_ob_params(Params) of
{InstanceID, RegID} ->
find_session_by_ob({InstanceID, RegID}, Sessions);
error ->
find_session_by_uri(URI, Sessions)
end;
true ->
find_session_by_uri(URI, Sessions)
end.
find_session_by_ob({InstanceID, RegID},
[#sip_session{contact = {_, _, Params}} = Session|Sessions]) ->
case get_ob_params(Params) of
{InstanceID, RegID} ->
{ok, flow, Session};
_ ->
find_session_by_ob({InstanceID, RegID}, Sessions)
end;
find_session_by_ob(_, []) ->
{error, notfound}.
find_session_by_uri(URI1,
[#sip_session{contact = {_, URI2, _}} = Session|Sessions]) ->
case cmp_uri(URI1, URI2) of case cmp_uri(URI1, URI2) of
true -> true ->
{ok, Binding}; {ok, normal, Session};
false -> false ->
find_binding(OrigBinding, Bindings) find_session_by_uri(URI1, Sessions)
end; end;
find_binding(_, []) -> find_session_by_uri(_, []) ->
{error, notfound}. {error, notfound}.
%% TODO: this is *totally* wrong. %% TODO: this is *totally* wrong.
@ -384,3 +454,78 @@ make_status(too_many_sessions) ->
{503, <<"Too Many Registered Sessions">>}; {503, <<"Too Many Registered Sessions">>};
make_status(_) -> make_status(_) ->
{500, esip:reason(500)}. {500, esip:reason(500)}.
get_ob_params(Params) ->
case esip:get_param(<<"+sip.instance">>, Params) of
<<>> ->
error;
InstanceID ->
case to_integer(esip:get_param(<<"reg-id">>, Params),
0, (1 bsl 32)-1) of
{ok, RegID} ->
{InstanceID, RegID};
error ->
error
end
end.
need_ob_hdrs(_Contacts, _IsOutboundSupported = false) ->
false;
need_ob_hdrs(Contacts, _IsOutboundSupported = true) ->
lists:any(
fun({_Name, _URI, Params}) ->
case get_ob_params(Params) of
error -> false;
{_, _} -> true
end
end, Contacts).
get_flow_timeout(LServer, #sip_socket{type = Type}) ->
{Option, Default} =
case Type of
udp -> {flow_timeout_datagram, ?FLOW_TIMEOUT_DATAGRAM};
_ -> {flow_timeout_stream, ?FLOW_TIMEOUT_STREAM}
end,
gen_mod:get_module_opt(
LServer, mod_sip, Option,
fun(I) when is_integer(I), I>0 -> I end,
Default).
update_table() ->
Fields = record_info(fields, sip_session),
case catch mnesia:table_info(sip_session, attributes) of
Fields ->
ok;
[_|_] ->
mnesia:delete_table(sip_session);
{'EXIT', _} ->
ok
end.
set_monitor_and_timer(#sip_session{expires = Expires} = Session,
_IsOutboundSupported = false) ->
set_timer(Session, Expires);
set_monitor_and_timer(#sip_session{socket = SIPSock,
mref = MRef,
expires = Expires,
us = {_, LServer},
contact = {_, _, Params}} = Session,
_IsOutboundSupported = true) ->
case get_ob_params(Params) of
error ->
set_timer(Session, Expires);
{_, _} ->
FlowTimeout = get_flow_timeout(LServer, SIPSock),
Timeout = lists:min([FlowTimeout, Expires]),
NewSession = set_timer(Session, Timeout),
NewMRef = if SIPSock#sip_socket.type == udp ->
MRef;
true ->
erlang:monitor(process, SIPSock#sip_socket.pid)
end,
NewSession#sip_session{mref = NewMRef}
end.
set_timer(#sip_session{us = US} = Session, Timeout) ->
TRef = erlang:start_timer(Timeout * 1000, self(), US),
Session#sip_session{tref = TRef}.