mirror of
https://github.com/processone/ejabberd.git
synced 2024-12-24 17:29:28 +01:00
*** empty log message ***
SVN Revision: 13
This commit is contained in:
parent
e6c062fe40
commit
6c96829311
@ -25,6 +25,7 @@ init() ->
|
||||
ejabberd_auth:start(),
|
||||
ejabberd_router:start(),
|
||||
ejabberd_sm:start(),
|
||||
ejabberd_s2s:start(),
|
||||
ejabberd_local:start(),
|
||||
ejabberd_listener:start(),
|
||||
loop(Port).
|
||||
|
@ -15,3 +15,6 @@
|
||||
-define(DEBUG(F,A),[]).
|
||||
-endif.
|
||||
|
||||
|
||||
-define(MYNAME,"127.0.0.1").
|
||||
|
||||
|
@ -23,11 +23,11 @@
|
||||
handle_info/3,
|
||||
terminate/3]).
|
||||
|
||||
-record(state, {socket, sender, receiver, streamid,
|
||||
user = "", server = "localhost", resource = ""}).
|
||||
|
||||
-include("ejabberd.hrl").
|
||||
|
||||
-record(state, {socket, sender, receiver, streamid,
|
||||
user = "", server = ?MYNAME, resource = ""}).
|
||||
|
||||
-define(DBGFSM, true).
|
||||
|
||||
-ifdef(DBGFSM).
|
||||
@ -84,7 +84,7 @@ state_name(Event, StateData) ->
|
||||
wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) ->
|
||||
% TODO
|
||||
Header = io_lib:format(?STREAM_HEADER,
|
||||
[StateData#state.streamid, "localhost"]),
|
||||
[StateData#state.streamid, ?MYNAME]),
|
||||
send_text(StateData#state.sender, Header),
|
||||
case lists:keysearch("xmlns:stream", 1, Attrs) of
|
||||
{value, {"xmlns:stream", "http://etherx.jabber.org/streams"}} ->
|
||||
|
@ -21,7 +21,7 @@ start() ->
|
||||
spawn(ejabberd_local, init, []).
|
||||
|
||||
init() ->
|
||||
ejabberd_router:register_local_route("localhost"),
|
||||
ejabberd_router:register_local_route(?MYNAME),
|
||||
loop().
|
||||
|
||||
loop() ->
|
||||
|
@ -101,15 +101,16 @@ do_route(From, To, Packet) ->
|
||||
case mnesia:transaction(F) of
|
||||
{atomic, error} ->
|
||||
% TODO: start s2s instead of below
|
||||
{xmlelement, Name, Attrs, SubTags} = Packet,
|
||||
case xml:get_attr_s("type", Attrs) of
|
||||
"error" ->
|
||||
ok;
|
||||
_ ->
|
||||
Err = jlib:make_error_reply(Packet,
|
||||
"502", "Service Unavailable"),
|
||||
ejabberd_router ! {route, To, From, Err}
|
||||
end;
|
||||
ejabberd_s2s ! {route, From, To, Packet};
|
||||
%{xmlelement, Name, Attrs, SubTags} = Packet,
|
||||
%case xml:get_attr_s("type", Attrs) of
|
||||
% "error" ->
|
||||
% ok;
|
||||
% _ ->
|
||||
% Err = jlib:make_error_reply(Packet,
|
||||
% "502", "Service Unavailable"),
|
||||
% ejabberd_router ! {route, To, From, Err}
|
||||
%end;
|
||||
{atomic, {ok, Node, Pid}} ->
|
||||
case node() of
|
||||
Node ->
|
||||
|
252
src/ejabberd_s2s.erl
Normal file
252
src/ejabberd_s2s.erl
Normal file
@ -0,0 +1,252 @@
|
||||
%%%----------------------------------------------------------------------
|
||||
%%% File : ejabberd_s2s.erl
|
||||
%%% Author : Alexey Shchepin <alexey@sevcom.net>
|
||||
%%% Purpose :
|
||||
%%% Created : 7 Dec 2002 by Alexey Shchepin <alexey@sevcom.net>
|
||||
%%% Id : $Id$
|
||||
%%%----------------------------------------------------------------------
|
||||
|
||||
-module(ejabberd_s2s).
|
||||
-author('alexey@sevcom.net').
|
||||
-vsn('$Revision$ ').
|
||||
|
||||
-export([start/0, init/0, open_session/2, close_session/2,
|
||||
have_connection/1,
|
||||
get_key/1]).
|
||||
|
||||
-include_lib("mnemosyne/include/mnemosyne.hrl").
|
||||
-include("ejabberd.hrl").
|
||||
|
||||
-record(s2s, {server, node, key}).
|
||||
-record(mys2s, {server, pid}).
|
||||
|
||||
|
||||
start() ->
|
||||
spawn(ejabberd_s2s, init, []).
|
||||
|
||||
init() ->
|
||||
register(ejabberd_s2s, self()),
|
||||
mnesia:create_table(s2s,[{ram_copies, [node()]},
|
||||
{attributes, record_info(fields, s2s)}]),
|
||||
mnesia:add_table_index(session, node),
|
||||
mnesia:create_table(mys2s,
|
||||
[{ram_copies, [node()]},
|
||||
{local_content, true},
|
||||
{attributes, record_info(fields, mys2s)}]),
|
||||
mnesia:subscribe(system),
|
||||
loop().
|
||||
|
||||
loop() ->
|
||||
receive
|
||||
%{open_connection, User, Resource, From} ->
|
||||
% replace_and_register_my_connection(User, Resource, From),
|
||||
% replace_alien_connection(User, Resource),
|
||||
% loop();
|
||||
{closed_conection, Server} ->
|
||||
remove_connection(Server),
|
||||
loop();
|
||||
%{replace, User, Resource} ->
|
||||
% replace_my_connection(User, Resource),
|
||||
% loop();
|
||||
{mnesia_system_event, {mnesia_down, Node}} ->
|
||||
clean_table_from_bad_node(Node),
|
||||
loop();
|
||||
{route, From, To, Packet} ->
|
||||
do_route(From, To, Packet),
|
||||
loop();
|
||||
_ ->
|
||||
loop()
|
||||
end.
|
||||
|
||||
|
||||
open_session(User, Resource) ->
|
||||
ejabberd_s2s ! {open_session, User, Resource, self()}.
|
||||
|
||||
close_session(User, Resource) ->
|
||||
ejabberd_s2s ! {close_session, User, Resource}.
|
||||
|
||||
%replace_alien_connection(User, Resource) ->
|
||||
% F = fun() ->
|
||||
% [ID] = mnemosyne:eval(query [X.id || X <- table(user_resource),
|
||||
% X.user = User,
|
||||
% X.resource = Resource]
|
||||
% end),
|
||||
% Es = mnesia:read({session, ID}),
|
||||
% mnesia:write(#session{id = ID, node = node()}),
|
||||
% Es
|
||||
% end,
|
||||
% case mnesia:transaction(F) of
|
||||
% {atomic, Rs} ->
|
||||
% lists:foreach(
|
||||
% fun(R) ->
|
||||
% if R#session.node /= node() ->
|
||||
% {ejabberd_s2s, R#session.node} !
|
||||
% {replace, User, Resource};
|
||||
% true ->
|
||||
% ok
|
||||
% end
|
||||
% end, Rs);
|
||||
% _ ->
|
||||
% false
|
||||
% end.
|
||||
%
|
||||
%
|
||||
%replace_my_connection(User, Resource) ->
|
||||
% F = fun() ->
|
||||
% [ID] = mnemosyne:eval(query [X.id || X <- table(user_resource),
|
||||
% X.user = User,
|
||||
% X.resource = Resource]
|
||||
% end),
|
||||
%
|
||||
% Es = mnesia:read({mysession, ID}),
|
||||
% mnesia:delete({mysession, ID}),
|
||||
% Es
|
||||
% end,
|
||||
% case mnesia:transaction(F) of
|
||||
% {atomic, Rs} ->
|
||||
% lists:foreach(
|
||||
% fun(R) ->
|
||||
% (R#mysession.info)#mysession_info.pid ! replaced
|
||||
% end, Rs);
|
||||
% _ ->
|
||||
% false
|
||||
% end.
|
||||
|
||||
remove_connection(Server) ->
|
||||
F = fun() ->
|
||||
mnesia:delete({mys2s, Server}),
|
||||
mnesia:delete({s2s, Server})
|
||||
end,
|
||||
mnesia:transaction(F).
|
||||
|
||||
%replace_and_register_my_connection(User, Resource, Pid) ->
|
||||
% F = fun() ->
|
||||
% IDs = mnemosyne:eval(query [X.id || X <- table(user_resource),
|
||||
% X.user = User,
|
||||
% X.resource = Resource]
|
||||
% end),
|
||||
%
|
||||
% ID = case IDs of
|
||||
% [Id] -> Id;
|
||||
% [] ->
|
||||
% [CurID] =
|
||||
% mnemosyne:eval(
|
||||
% query [X.id ||
|
||||
% X <- table(user_resource_id_seq)]
|
||||
% end),
|
||||
% mnesia:write(
|
||||
% #user_resource_id_seq{id = CurID + 1}),
|
||||
% mnesia:write(
|
||||
% #user_resource{id = CurID,
|
||||
% user = User,
|
||||
% resource = Resource}),
|
||||
% CurID
|
||||
% end,
|
||||
% Es = mnesia:read({mysession, ID}),
|
||||
% mnesia:write(#mysession{id = ID,
|
||||
% info = #mysession_info{pid = Pid}}),
|
||||
% Es
|
||||
% end,
|
||||
% case mnesia:transaction(F) of
|
||||
% {atomic, Rs} ->
|
||||
% lists:foreach(
|
||||
% fun(R) ->
|
||||
% (R#mysession.info)#mysession_info.pid ! replaced
|
||||
% end, Rs);
|
||||
% _ ->
|
||||
% false
|
||||
% end.
|
||||
|
||||
|
||||
clean_table_from_bad_node(Node) ->
|
||||
F = fun() ->
|
||||
Es = mnesia:index_read(s2s, Node, #s2s.node),
|
||||
lists:foreach(fun(E) ->
|
||||
mnesia:delete_object(s2s, E, write)
|
||||
end, Es)
|
||||
end,
|
||||
mnesia:transaction(F).
|
||||
|
||||
have_connection(Server) ->
|
||||
F = fun() ->
|
||||
[E] = mnesia:read({s2s, Server})
|
||||
end,
|
||||
case mnesia:transaction(F) of
|
||||
{atomic, _} ->
|
||||
true;
|
||||
_ ->
|
||||
false
|
||||
end.
|
||||
|
||||
get_key(Server) ->
|
||||
F = fun() ->
|
||||
[E] = mnesia:read({s2s, Server}),
|
||||
E
|
||||
end,
|
||||
case mnesia:transaction(F) of
|
||||
{atomic, E} ->
|
||||
E#s2s.key;
|
||||
_ ->
|
||||
""
|
||||
end.
|
||||
|
||||
|
||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
|
||||
do_route(From, To, Packet) ->
|
||||
?DEBUG("s2s manager~n\tfrom ~p~n\tto ~p~n\tpacket ~P~n",
|
||||
[From, To, Packet, 8]),
|
||||
{User, Server, Resource} = To,
|
||||
Key = lists:flatten(io_lib:format("~p", [random:uniform(65536*65536)])),
|
||||
F = fun() ->
|
||||
case mnesia:read({mys2s, Server}) of
|
||||
[] ->
|
||||
case mnesia:read({s2s, Server}) of
|
||||
[Er] ->
|
||||
{remote, Er#s2s.node};
|
||||
[] ->
|
||||
% TODO
|
||||
mnesia:write(#s2s{server = Server,
|
||||
node = node(),
|
||||
key = Key}),
|
||||
new
|
||||
end;
|
||||
[El] ->
|
||||
{local, El#mys2s.pid}
|
||||
end
|
||||
end,
|
||||
case mnesia:transaction(F) of
|
||||
{atomic, {local, Pid}} ->
|
||||
?DEBUG("sending to process ~p~n", [Pid]),
|
||||
% TODO
|
||||
{xmlelement, Name, Attrs, Els} = Packet,
|
||||
NewAttrs = jlib:replace_from_to_attrs(jlib:jid_to_string(From),
|
||||
jlib:jid_to_string(To),
|
||||
Attrs),
|
||||
send_element(Pid, {xmlelement, Name, NewAttrs, Els}),
|
||||
ok;
|
||||
{atomic, {remote, Node}} ->
|
||||
?DEBUG("sending to node ~p~n", [Node]),
|
||||
{ejabberd_s2s, Node} ! {route, From, To, Packet},
|
||||
ok;
|
||||
{atomic, new} ->
|
||||
?DEBUG("starting new s2s connection~n", []),
|
||||
Pid = ejabberd_s2s_out:start(Server, {new, Key}),
|
||||
mnesia:transaction(fun() -> mnesia:write(#mys2s{server = Server,
|
||||
pid = Pid}) end),
|
||||
{xmlelement, Name, Attrs, Els} = Packet,
|
||||
NewAttrs = jlib:replace_from_to_attrs(jlib:jid_to_string(From),
|
||||
jlib:jid_to_string(To),
|
||||
Attrs),
|
||||
send_element(Pid, {xmlelement, Name, NewAttrs, Els}),
|
||||
ok;
|
||||
{atomic, not_exists} ->
|
||||
?DEBUG("packet droped~n", []),
|
||||
ok;
|
||||
{aborted, Reason} ->
|
||||
?DEBUG("delivery failed: ~p~n", [Reason]),
|
||||
false
|
||||
end.
|
||||
|
||||
send_element(Pid, El) ->
|
||||
Pid ! {send_element, El}.
|
@ -16,15 +16,19 @@
|
||||
-export([start/1, receiver/2, send_text/2, send_element/2]).
|
||||
|
||||
%% gen_fsm callbacks
|
||||
-export([init/1, wait_for_stream/2, wait_for_key/2, session_established/2,
|
||||
-export([init/1,
|
||||
wait_for_stream/2,
|
||||
wait_for_key/2,
|
||||
wait_for_verification/2,
|
||||
stream_established/2,
|
||||
handle_info/3,
|
||||
terminate/3]).
|
||||
|
||||
-record(state, {socket, receiver, streamid,
|
||||
myself = "localhost", server}).
|
||||
|
||||
-include("ejabberd.hrl").
|
||||
|
||||
-record(state, {socket, receiver, streamid,
|
||||
myself = ?MYNAME, server, queue}).
|
||||
|
||||
-define(DBGFSM, true).
|
||||
|
||||
-ifdef(DBGFSM).
|
||||
@ -70,7 +74,8 @@ init([Socket]) ->
|
||||
ReceiverPid = spawn(?MODULE, receiver, [Socket, self()]),
|
||||
{ok, wait_for_stream, #state{socket = Socket,
|
||||
receiver = ReceiverPid,
|
||||
streamid = new_id()}}.
|
||||
streamid = new_id(),
|
||||
queue = queue:new()}}.
|
||||
|
||||
%%----------------------------------------------------------------------
|
||||
%% Func: StateName/2
|
||||
@ -100,12 +105,24 @@ wait_for_key({xmlstreamelement, El}, StateData) ->
|
||||
case is_key_packet(El) of
|
||||
{key, To, From, Id, Key} ->
|
||||
io:format("GET KEY: ~p~n", [{To, From, Id, Key}]),
|
||||
% TODO
|
||||
ejabberd_s2s_out:start(From, {verify, self(), Key}),
|
||||
{next_state, wait_for_key, StateData};
|
||||
{next_state,
|
||||
wait_for_verification,
|
||||
StateData#state{server = From}};
|
||||
{verify, To, From, Id, Key} ->
|
||||
io:format("VERIFY KEY: ~p~n", [{To, From, Id, Key}]),
|
||||
% TODO
|
||||
Key1 = ejabberd_s2s:get_key(From),
|
||||
Type = if Key == Key1 -> "valid";
|
||||
true -> "invalid"
|
||||
end,
|
||||
send_element(StateData#state.socket,
|
||||
{xmlelement,
|
||||
"db:verify",
|
||||
[{"from", ?MYNAME},
|
||||
{"to", From},
|
||||
{"id", Id},
|
||||
{"type", Type}],
|
||||
[]}),
|
||||
{next_state, wait_for_key, StateData};
|
||||
_ ->
|
||||
{next_state, wait_for_key, StateData}
|
||||
@ -118,28 +135,85 @@ wait_for_key({xmlstreamend, Name}, StateData) ->
|
||||
wait_for_key(closed, StateData) ->
|
||||
{stop, normal, StateData}.
|
||||
|
||||
session_established({xmlstreamelement, El}, StateData) ->
|
||||
wait_for_verification(valid, StateData) ->
|
||||
send_element(StateData#state.socket,
|
||||
{xmlelement,
|
||||
"db:result",
|
||||
[{"from", ?MYNAME},
|
||||
{"to", StateData#state.server},
|
||||
{"type", "valid"}],
|
||||
[]}),
|
||||
send_queue(StateData#state.socket, StateData#state.queue),
|
||||
{next_state, stream_established, StateData};
|
||||
|
||||
wait_for_verification(invalid, StateData) ->
|
||||
send_element(StateData#state.socket,
|
||||
{xmlelement,
|
||||
"db:result",
|
||||
[{"from", ?MYNAME},
|
||||
{"to", StateData#state.server},
|
||||
{"type", "invalid"}],
|
||||
[]}),
|
||||
{stop, normal, StateData};
|
||||
|
||||
wait_for_verification({xmlstreamelement, El}, StateData) ->
|
||||
case is_key_packet(El) of
|
||||
{verify, To, From, Id, Key} ->
|
||||
io:format("VERIFY KEY: ~p~n", [{To, From, Id, Key}]),
|
||||
Key1 = ejabberd_s2s:get_key(From),
|
||||
Type = if Key == Key1 -> "valid";
|
||||
true -> "invalid"
|
||||
end,
|
||||
send_element(StateData#state.socket,
|
||||
{xmlelement,
|
||||
"db:verify",
|
||||
[{"from", ?MYNAME},
|
||||
{"to", From},
|
||||
{"id", Id},
|
||||
{"type", Type}],
|
||||
[]}),
|
||||
{next_state, wait_for_verification, StateData};
|
||||
_ ->
|
||||
{next_state, wait_for_verification, StateData}
|
||||
end;
|
||||
|
||||
wait_for_verification({xmlstreamend, Name}, StateData) ->
|
||||
% TODO
|
||||
{stop, normal, StateData};
|
||||
|
||||
wait_for_verification(closed, StateData) ->
|
||||
{stop, normal, StateData}.
|
||||
|
||||
stream_established({xmlstreamelement, El}, StateData) ->
|
||||
{xmlelement, Name, Attrs, Els} = El,
|
||||
% TODO
|
||||
FromJID = {StateData#state.server},
|
||||
From = xml:get_attr_s("from", Attrs),
|
||||
FromJID1 = jlib:string_to_jid(From),
|
||||
FromJID = case FromJID1 of
|
||||
{Node, Server, Resource} ->
|
||||
if Server == StateData#state.server -> FromJID1;
|
||||
true -> error
|
||||
end;
|
||||
_ -> error
|
||||
end,
|
||||
To = xml:get_attr_s("to", Attrs),
|
||||
ToJID = case To of
|
||||
"" ->
|
||||
{"", StateData#state.server, ""};
|
||||
_ ->
|
||||
jlib:string_to_jid(To)
|
||||
"" -> error;
|
||||
_ -> jlib:string_to_jid(To)
|
||||
end,
|
||||
case ToJID of
|
||||
error ->
|
||||
% TODO
|
||||
error;
|
||||
_ ->
|
||||
%?DEBUG("FromJID=~w, ToJID=~w, El=~w~n", [FromJID, ToJID, El]),
|
||||
ejabberd_router:route(FromJID, ToJID, El)
|
||||
if ((Name == "iq") or (Name == "message") or (Name == "presence")) and
|
||||
(ToJID /= error) and (FromJID /= error) ->
|
||||
ejabberd_router:route(FromJID, ToJID, El);
|
||||
true ->
|
||||
error
|
||||
end,
|
||||
{next_state, session_established, StateData};
|
||||
{next_state, stream_established, StateData};
|
||||
|
||||
session_established(closed, StateData) ->
|
||||
stream_established({xmlstreamend, Name}, StateData) ->
|
||||
% TODO
|
||||
{stop, normal, StateData};
|
||||
|
||||
stream_established(closed, StateData) ->
|
||||
% TODO
|
||||
{stop, normal, StateData}.
|
||||
|
||||
@ -188,7 +262,17 @@ handle_sync_event(Event, From, StateName, StateData) ->
|
||||
%%----------------------------------------------------------------------
|
||||
handle_info({send_text, Text}, StateName, StateData) ->
|
||||
send_text(StateData#state.socket, Text),
|
||||
{next_state, StateName, StateData}.
|
||||
{next_state, StateName, StateData};
|
||||
handle_info({send_element, El}, StateName, StateData) ->
|
||||
case StateName of
|
||||
stream_established ->
|
||||
send_element(StateData#state.socket, El),
|
||||
{next_state, StateName, StateData};
|
||||
_ ->
|
||||
Q = queue:in(El, StateData#state.queue),
|
||||
{next_state, StateName, StateData#state{queue = Q}}
|
||||
end.
|
||||
|
||||
|
||||
%%----------------------------------------------------------------------
|
||||
%% Func: terminate/3
|
||||
@ -203,6 +287,7 @@ terminate(Reason, StateName, StateData) ->
|
||||
% %ejabberd_sm:close_session(StateData#state.user,
|
||||
% % StateData#state.resource)
|
||||
% end,
|
||||
%ejabberd_s2s ! {closed_conection, StateData#state.server},
|
||||
gen_tcp:close(StateData#state.socket),
|
||||
ok.
|
||||
|
||||
@ -231,6 +316,16 @@ send_text(Socket, Text) ->
|
||||
send_element(Socket, El) ->
|
||||
send_text(Socket, xml:element_to_string(El)).
|
||||
|
||||
send_queue(Socket, Q) ->
|
||||
case queue:out(Q) of
|
||||
{{value, El}, Q1} ->
|
||||
send_element(Socket, El),
|
||||
send_queue(Socket, Q1);
|
||||
{empty, Q1} ->
|
||||
ok
|
||||
end.
|
||||
|
||||
|
||||
new_id() ->
|
||||
lists:flatten(io_lib:format("~p", [random:uniform(65536*65536)])).
|
||||
|
||||
@ -250,25 +345,5 @@ is_key_packet({xmlelement, Name, Attrs, Els}) when Name == "db:verify" ->
|
||||
is_key_packet(_) ->
|
||||
false.
|
||||
|
||||
get_auth_tags([{xmlelement, Name, Attrs, Els}| L], U, P, D, R) ->
|
||||
CData = xml:get_cdata(Els),
|
||||
case Name of
|
||||
"username" ->
|
||||
get_auth_tags(L, CData, P, D, R);
|
||||
"password" ->
|
||||
get_auth_tags(L, U, CData, D, R);
|
||||
"digest" ->
|
||||
get_auth_tags(L, U, P, CData, R);
|
||||
"resource" ->
|
||||
get_auth_tags(L, U, P, D, CData);
|
||||
_ ->
|
||||
get_auth_tags(L, U, P, D, R)
|
||||
end;
|
||||
get_auth_tags([_ | L], U, P, D, R) ->
|
||||
get_auth_tags(L, U, P, D, R);
|
||||
get_auth_tags([], U, P, D, R) ->
|
||||
{U, P, D, R}.
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -17,18 +17,19 @@
|
||||
|
||||
%% gen_fsm callbacks
|
||||
-export([init/1,
|
||||
open_socket/2,
|
||||
wait_for_stream/2,
|
||||
wait_for_key/2,
|
||||
wait_for_validation/2,
|
||||
wait_for_verification/2,
|
||||
session_established/2,
|
||||
stream_established/2,
|
||||
handle_info/3,
|
||||
terminate/3]).
|
||||
|
||||
-record(state, {socket, receiver, streamid,
|
||||
myself = "localhost", server, type, xmlpid}).
|
||||
|
||||
-include("ejabberd.hrl").
|
||||
|
||||
-record(state, {socket, receiver, streamid,
|
||||
myself = ?MYNAME, server, type, xmlpid, queue}).
|
||||
|
||||
-define(DBGFSM, true).
|
||||
|
||||
-ifdef(DBGFSM).
|
||||
@ -57,7 +58,8 @@
|
||||
%%% API
|
||||
%%%----------------------------------------------------------------------
|
||||
start(Host, Type) ->
|
||||
gen_fsm:start(ejabberd_s2s_out, [Host, Type], ?FSMOPTS).
|
||||
{ok, Pid} = gen_fsm:start(ejabberd_s2s_out, [Host, Type], ?FSMOPTS),
|
||||
Pid.
|
||||
|
||||
%%%----------------------------------------------------------------------
|
||||
%%% Callback functions from gen_fsm
|
||||
@ -70,17 +72,11 @@ start(Host, Type) ->
|
||||
%% ignore |
|
||||
%% {stop, StopReason}
|
||||
%%----------------------------------------------------------------------
|
||||
init([Host, Type]) ->
|
||||
{ok, Socket} = gen_tcp:connect(Host, 5569,
|
||||
[binary, {packet, 0}]),
|
||||
XMLStreamPid = xml_stream:start(self()),
|
||||
%ReceiverPid = spawn(?MODULE, receiver, [Socket, self()]),
|
||||
send_text(Socket, ?STREAM_HEADER),
|
||||
{ok, wait_for_stream, #state{socket = Socket,
|
||||
xmlpid = XMLStreamPid,
|
||||
streamid = new_id(),
|
||||
server = Host,
|
||||
type = Type}}.
|
||||
init([Server, Type]) ->
|
||||
gen_fsm:send_event(self(), init),
|
||||
{ok, open_socket, #state{queue = queue:new(),
|
||||
server = Server,
|
||||
type = Type}}.
|
||||
|
||||
%%----------------------------------------------------------------------
|
||||
%% Func: StateName/2
|
||||
@ -88,22 +84,46 @@ init([Host, Type]) ->
|
||||
%% {next_state, NextStateName, NextStateData, Timeout} |
|
||||
%% {stop, Reason, NewStateData}
|
||||
%%----------------------------------------------------------------------
|
||||
state_name(Event, StateData) ->
|
||||
{next_state, state_name, StateData}.
|
||||
open_socket(init, StateData) ->
|
||||
case gen_tcp:connect(StateData#state.server, 5569,
|
||||
[binary, {packet, 0}]) of
|
||||
{ok, Socket} ->
|
||||
XMLStreamPid = xml_stream:start(self()),
|
||||
send_text(Socket, ?STREAM_HEADER),
|
||||
{next_state, wait_for_stream,
|
||||
StateData#state{socket = Socket,
|
||||
xmlpid = XMLStreamPid,
|
||||
streamid = new_id()}};
|
||||
{error, Reason} ->
|
||||
?DEBUG("s2s_out: connect return ~p~n", [Reason]),
|
||||
Text = case Reason of
|
||||
timeout -> "Server Connect Timeout";
|
||||
_ -> "Server Connect Failed"
|
||||
end,
|
||||
bounce_messages(Text),
|
||||
{stop, normal, StateData}
|
||||
end.
|
||||
|
||||
|
||||
wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) ->
|
||||
% TODO
|
||||
case {xml:get_attr_s("xmlns", Attrs), xml:get_attr_s("xmlns:db", Attrs)} of
|
||||
{"jabber:server", "jabber:server:dialback"} ->
|
||||
case StateData#state.type of
|
||||
new ->
|
||||
{new, Key} ->
|
||||
send_element(StateData#state.socket,
|
||||
{xmlelement,
|
||||
"db:result",
|
||||
[{"from", ?MYNAME},
|
||||
{"to", StateData#state.server}],
|
||||
[{xmlcdata, Key}]}),
|
||||
% TODO
|
||||
{next_state, wait_for_key, StateData};
|
||||
{next_state, wait_for_validation, StateData};
|
||||
{verify, Pid, Key} ->
|
||||
send_element(StateData#state.socket,
|
||||
{xmlelement,
|
||||
"db:verify",
|
||||
[{"from", "127.0.0.1"},
|
||||
[{"from", ?MYNAME},
|
||||
{"to", StateData#state.server}],
|
||||
[{xmlcdata, Key}]}),
|
||||
{next_state, wait_for_verification, StateData}
|
||||
@ -117,37 +137,43 @@ wait_for_stream(closed, StateData) ->
|
||||
{stop, normal, StateData}.
|
||||
|
||||
|
||||
wait_for_key({xmlstreamelement, El}, StateData) ->
|
||||
case is_key_packet(El) of
|
||||
{key, To, From, Id, Key} ->
|
||||
io:format("GET KEY: ~p~n", [{To, From, Id, Key}]),
|
||||
% TODO
|
||||
{next_state, wait_for_key, StateData};
|
||||
{verify, To, From, Id, Key} ->
|
||||
io:format("VERIFY KEY: ~p~n", [{To, From, Id, Key}]),
|
||||
% TODO
|
||||
{next_state, wait_for_key, StateData};
|
||||
_ ->
|
||||
{next_state, wait_for_key, StateData}
|
||||
end;
|
||||
|
||||
wait_for_key({xmlstreamend, Name}, StateData) ->
|
||||
% TODO
|
||||
{stop, normal, StateData};
|
||||
|
||||
wait_for_key(closed, StateData) ->
|
||||
{stop, normal, StateData}.
|
||||
|
||||
wait_for_verification({xmlstreamelement, El}, StateData) ->
|
||||
wait_for_validation({xmlstreamelement, El}, StateData) ->
|
||||
case is_verify_res(El) of
|
||||
{result, To, From, Id, Type} ->
|
||||
case Type of
|
||||
"valid" ->
|
||||
io:format("VALID KEY~n", []);
|
||||
% TODO
|
||||
send_queue(StateData#state.socket, StateData#state.queue),
|
||||
{next_state, stream_established, StateData};
|
||||
_ ->
|
||||
% TODO: bounce packets
|
||||
{stop, normal, StateData}
|
||||
end;
|
||||
_ ->
|
||||
{next_state, wait_for_validation, StateData}
|
||||
end;
|
||||
|
||||
wait_for_validation({xmlstreamend, Name}, StateData) ->
|
||||
% TODO
|
||||
{stop, normal, StateData};
|
||||
|
||||
wait_for_validation(closed, StateData) ->
|
||||
{stop, normal, StateData}.
|
||||
|
||||
|
||||
wait_for_verification({xmlstreamelement, El}, StateData) ->
|
||||
case is_verify_res(El) of
|
||||
{result, To, From, Id, Type} ->
|
||||
{verify, Pid, Key} = StateData#state.type,
|
||||
case Type of
|
||||
"valid" ->
|
||||
io:format("VALID KEY~n", []),
|
||||
gen_fsm:send_event(Pid, valid);
|
||||
% TODO
|
||||
_ ->
|
||||
% TODO
|
||||
ok
|
||||
gen_fsm:send_event(Pid, invalid)
|
||||
end,
|
||||
{stop, normal, StateData};
|
||||
_ ->
|
||||
@ -161,28 +187,37 @@ wait_for_verification({xmlstreamend, Name}, StateData) ->
|
||||
wait_for_verification(closed, StateData) ->
|
||||
{stop, normal, StateData}.
|
||||
|
||||
session_established({xmlstreamelement, El}, StateData) ->
|
||||
|
||||
stream_established({xmlstreamelement, El}, StateData) ->
|
||||
{xmlelement, Name, Attrs, Els} = El,
|
||||
% TODO
|
||||
FromJID = {StateData#state.server},
|
||||
From = xml:get_attr_s("from", Attrs),
|
||||
FromJID1 = jlib:string_to_jid(From),
|
||||
FromJID = case FromJID1 of
|
||||
{Node, Server, Resource} ->
|
||||
if Server == StateData#state.server -> FromJID1;
|
||||
true -> error
|
||||
end;
|
||||
_ -> error
|
||||
end,
|
||||
To = xml:get_attr_s("to", Attrs),
|
||||
ToJID = case To of
|
||||
"" ->
|
||||
{"", StateData#state.server, ""};
|
||||
_ ->
|
||||
jlib:string_to_jid(To)
|
||||
"" -> error;
|
||||
_ -> jlib:string_to_jid(To)
|
||||
end,
|
||||
case ToJID of
|
||||
error ->
|
||||
% TODO
|
||||
error;
|
||||
_ ->
|
||||
%?DEBUG("FromJID=~w, ToJID=~w, El=~w~n", [FromJID, ToJID, El]),
|
||||
ejabberd_router:route(FromJID, ToJID, El)
|
||||
if ((Name == "iq") or (Name == "message") or (Name == "presence")) and
|
||||
(ToJID /= error) and (FromJID /= error) ->
|
||||
ejabberd_router:route(FromJID, ToJID, El);
|
||||
true ->
|
||||
error
|
||||
end,
|
||||
{next_state, session_established, StateData};
|
||||
{next_state, stream_established, StateData};
|
||||
|
||||
session_established(closed, StateData) ->
|
||||
stream_established({xmlstreamend, Name}, StateData) ->
|
||||
% TODO
|
||||
{stop, normal, StateData};
|
||||
|
||||
stream_established(closed, StateData) ->
|
||||
% TODO
|
||||
{stop, normal, StateData}.
|
||||
|
||||
@ -232,14 +267,23 @@ handle_sync_event(Event, From, StateName, StateData) ->
|
||||
handle_info({send_text, Text}, StateName, StateData) ->
|
||||
send_text(StateData#state.socket, Text),
|
||||
{next_state, StateName, StateData};
|
||||
handle_info({send_element, El}, StateName, StateData) ->
|
||||
case StateName of
|
||||
stream_established ->
|
||||
send_element(StateData#state.socket, El),
|
||||
{next_state, StateName, StateData};
|
||||
_ ->
|
||||
Q = queue:in(El, StateData#state.queue),
|
||||
{next_state, StateName, StateData#state{queue = Q}}
|
||||
end;
|
||||
handle_info({tcp, Socket, Data}, StateName, StateData) ->
|
||||
xml_stream:send_text(StateData#state.xmlpid, Data),
|
||||
{next_state, StateName, StateData};
|
||||
handle_info({tcp_closed, Socket}, StateName, StateData) ->
|
||||
self() ! closed,
|
||||
gen_fsm:send_event(self(), closed),
|
||||
{next_state, StateName, StateData};
|
||||
handle_info({tcp_error, Socket, Reason}, StateName, StateData) ->
|
||||
self() ! closed,
|
||||
gen_fsm:send_event(self(), closed),
|
||||
{next_state, StateName, StateData}.
|
||||
|
||||
%%----------------------------------------------------------------------
|
||||
@ -248,14 +292,19 @@ handle_info({tcp_error, Socket, Reason}, StateName, StateData) ->
|
||||
%% Returns: any
|
||||
%%----------------------------------------------------------------------
|
||||
terminate(Reason, StateName, StateData) ->
|
||||
% case StateData#state.user of
|
||||
% "" ->
|
||||
% ok;
|
||||
% _ ->
|
||||
% %ejabberd_sm:close_session(StateData#state.user,
|
||||
% % StateData#state.resource)
|
||||
% end,
|
||||
gen_tcp:close(StateData#state.socket),
|
||||
?DEBUG("s2s_out: terminate ~p~n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!~n", [[Reason, StateName, StateData]]),
|
||||
case StateData#state.type of
|
||||
{new, Key} ->
|
||||
ejabberd_s2s ! {closed_conection, StateData#state.server};
|
||||
_ ->
|
||||
ok
|
||||
end,
|
||||
case StateData#state.socket of
|
||||
undefined ->
|
||||
ok;
|
||||
Socket ->
|
||||
gen_tcp:close(Socket)
|
||||
end,
|
||||
ok.
|
||||
|
||||
%%%----------------------------------------------------------------------
|
||||
@ -283,9 +332,36 @@ send_text(Socket, Text) ->
|
||||
send_element(Socket, El) ->
|
||||
send_text(Socket, xml:element_to_string(El)).
|
||||
|
||||
send_queue(Socket, Q) ->
|
||||
case queue:out(Q) of
|
||||
{{value, El}, Q1} ->
|
||||
send_element(Socket, El),
|
||||
send_queue(Socket, Q1);
|
||||
{empty, Q1} ->
|
||||
ok
|
||||
end.
|
||||
|
||||
new_id() ->
|
||||
lists:flatten(io_lib:format("~p", [random:uniform(65536*65536)])).
|
||||
|
||||
bounce_messages(Reason) ->
|
||||
receive
|
||||
{send_element, El} ->
|
||||
{xmlelement, Name, Attrs, SubTags} = El,
|
||||
case xml:get_attr_s("type", Attrs) of
|
||||
"error" ->
|
||||
ok;
|
||||
_ ->
|
||||
Err = jlib:make_error_reply(El,
|
||||
"502", Reason),
|
||||
From = jlib:string_to_jid(xml:get_attr_s("from", Attrs)),
|
||||
To = jlib:string_to_jid(xml:get_attr_s("to", Attrs)),
|
||||
ejabberd_router ! {route, To, From, Err}
|
||||
end,
|
||||
bounce_messages(Reason)
|
||||
after 0 ->
|
||||
ok
|
||||
end.
|
||||
|
||||
is_key_packet({xmlelement, Name, Attrs, Els}) when Name == "db:result" ->
|
||||
{key,
|
||||
@ -317,25 +393,3 @@ is_verify_res({xmlelement, Name, Attrs, Els}) when Name == "db:verify" ->
|
||||
is_verify_res(_) ->
|
||||
false.
|
||||
|
||||
get_auth_tags([{xmlelement, Name, Attrs, Els}| L], U, P, D, R) ->
|
||||
CData = xml:get_cdata(Els),
|
||||
case Name of
|
||||
"username" ->
|
||||
get_auth_tags(L, CData, P, D, R);
|
||||
"password" ->
|
||||
get_auth_tags(L, U, CData, D, R);
|
||||
"digest" ->
|
||||
get_auth_tags(L, U, P, CData, R);
|
||||
"resource" ->
|
||||
get_auth_tags(L, U, P, D, CData);
|
||||
_ ->
|
||||
get_auth_tags(L, U, P, D, R)
|
||||
end;
|
||||
get_auth_tags([_ | L], U, P, D, R) ->
|
||||
get_auth_tags(L, U, P, D, R);
|
||||
get_auth_tags([], U, P, D, R) ->
|
||||
{U, P, D, R}.
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -45,7 +45,6 @@ init() ->
|
||||
{local_content, true},
|
||||
{attributes, record_info(fields, mysession)}]),
|
||||
mnesia:subscribe(system),
|
||||
%ejabberd_router:register_local_route("localhost"),
|
||||
loop().
|
||||
|
||||
loop() ->
|
||||
|
Loading…
Reference in New Issue
Block a user