From 6c96829311f501769bff8f4463dc27eb7e9efcd9 Mon Sep 17 00:00:00 2001 From: Alexey Shchepin Date: Sat, 7 Dec 2002 20:27:26 +0000 Subject: [PATCH] *** empty log message *** SVN Revision: 13 --- src/ejabberd.erl | 1 + src/ejabberd.hrl | 3 + src/ejabberd_c2s.erl | 8 +- src/ejabberd_local.erl | 2 +- src/ejabberd_router.erl | 19 +-- src/ejabberd_s2s.erl | 252 +++++++++++++++++++++++++++++++++++++++ src/ejabberd_s2s_in.erl | 163 ++++++++++++++++++------- src/ejabberd_s2s_out.erl | 238 ++++++++++++++++++++++-------------- src/ejabberd_sm.erl | 1 - 9 files changed, 536 insertions(+), 151 deletions(-) create mode 100644 src/ejabberd_s2s.erl diff --git a/src/ejabberd.erl b/src/ejabberd.erl index c0d164c9f..e6e1717c6 100644 --- a/src/ejabberd.erl +++ b/src/ejabberd.erl @@ -25,6 +25,7 @@ init() -> ejabberd_auth:start(), ejabberd_router:start(), ejabberd_sm:start(), + ejabberd_s2s:start(), ejabberd_local:start(), ejabberd_listener:start(), loop(Port). diff --git a/src/ejabberd.hrl b/src/ejabberd.hrl index 35972c46b..cd08402f7 100644 --- a/src/ejabberd.hrl +++ b/src/ejabberd.hrl @@ -15,3 +15,6 @@ -define(DEBUG(F,A),[]). -endif. + +-define(MYNAME,"127.0.0.1"). + diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index f536fe32c..5c32bbde0 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -23,11 +23,11 @@ handle_info/3, terminate/3]). --record(state, {socket, sender, receiver, streamid, - user = "", server = "localhost", resource = ""}). - -include("ejabberd.hrl"). +-record(state, {socket, sender, receiver, streamid, + user = "", server = ?MYNAME, resource = ""}). + -define(DBGFSM, true). -ifdef(DBGFSM). @@ -84,7 +84,7 @@ state_name(Event, StateData) -> wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) -> % TODO Header = io_lib:format(?STREAM_HEADER, - [StateData#state.streamid, "localhost"]), + [StateData#state.streamid, ?MYNAME]), send_text(StateData#state.sender, Header), case lists:keysearch("xmlns:stream", 1, Attrs) of {value, {"xmlns:stream", "http://etherx.jabber.org/streams"}} -> diff --git a/src/ejabberd_local.erl b/src/ejabberd_local.erl index 4241088a5..0e0321c1a 100644 --- a/src/ejabberd_local.erl +++ b/src/ejabberd_local.erl @@ -21,7 +21,7 @@ start() -> spawn(ejabberd_local, init, []). init() -> - ejabberd_router:register_local_route("localhost"), + ejabberd_router:register_local_route(?MYNAME), loop(). loop() -> diff --git a/src/ejabberd_router.erl b/src/ejabberd_router.erl index 9ab3d1810..b6657effd 100644 --- a/src/ejabberd_router.erl +++ b/src/ejabberd_router.erl @@ -101,15 +101,16 @@ do_route(From, To, Packet) -> case mnesia:transaction(F) of {atomic, error} -> % TODO: start s2s instead of below - {xmlelement, Name, Attrs, SubTags} = Packet, - case xml:get_attr_s("type", Attrs) of - "error" -> - ok; - _ -> - Err = jlib:make_error_reply(Packet, - "502", "Service Unavailable"), - ejabberd_router ! {route, To, From, Err} - end; + ejabberd_s2s ! {route, From, To, Packet}; + %{xmlelement, Name, Attrs, SubTags} = Packet, + %case xml:get_attr_s("type", Attrs) of + % "error" -> + % ok; + % _ -> + % Err = jlib:make_error_reply(Packet, + % "502", "Service Unavailable"), + % ejabberd_router ! {route, To, From, Err} + %end; {atomic, {ok, Node, Pid}} -> case node() of Node -> diff --git a/src/ejabberd_s2s.erl b/src/ejabberd_s2s.erl new file mode 100644 index 000000000..8583c027d --- /dev/null +++ b/src/ejabberd_s2s.erl @@ -0,0 +1,252 @@ +%%%---------------------------------------------------------------------- +%%% File : ejabberd_s2s.erl +%%% Author : Alexey Shchepin +%%% Purpose : +%%% Created : 7 Dec 2002 by Alexey Shchepin +%%% Id : $Id$ +%%%---------------------------------------------------------------------- + +-module(ejabberd_s2s). +-author('alexey@sevcom.net'). +-vsn('$Revision$ '). + +-export([start/0, init/0, open_session/2, close_session/2, + have_connection/1, + get_key/1]). + +-include_lib("mnemosyne/include/mnemosyne.hrl"). +-include("ejabberd.hrl"). + +-record(s2s, {server, node, key}). +-record(mys2s, {server, pid}). + + +start() -> + spawn(ejabberd_s2s, init, []). + +init() -> + register(ejabberd_s2s, self()), + mnesia:create_table(s2s,[{ram_copies, [node()]}, + {attributes, record_info(fields, s2s)}]), + mnesia:add_table_index(session, node), + mnesia:create_table(mys2s, + [{ram_copies, [node()]}, + {local_content, true}, + {attributes, record_info(fields, mys2s)}]), + mnesia:subscribe(system), + loop(). + +loop() -> + receive + %{open_connection, User, Resource, From} -> + % replace_and_register_my_connection(User, Resource, From), + % replace_alien_connection(User, Resource), + % loop(); + {closed_conection, Server} -> + remove_connection(Server), + loop(); + %{replace, User, Resource} -> + % replace_my_connection(User, Resource), + % loop(); + {mnesia_system_event, {mnesia_down, Node}} -> + clean_table_from_bad_node(Node), + loop(); + {route, From, To, Packet} -> + do_route(From, To, Packet), + loop(); + _ -> + loop() + end. + + +open_session(User, Resource) -> + ejabberd_s2s ! {open_session, User, Resource, self()}. + +close_session(User, Resource) -> + ejabberd_s2s ! {close_session, User, Resource}. + +%replace_alien_connection(User, Resource) -> +% F = fun() -> +% [ID] = mnemosyne:eval(query [X.id || X <- table(user_resource), +% X.user = User, +% X.resource = Resource] +% end), +% Es = mnesia:read({session, ID}), +% mnesia:write(#session{id = ID, node = node()}), +% Es +% end, +% case mnesia:transaction(F) of +% {atomic, Rs} -> +% lists:foreach( +% fun(R) -> +% if R#session.node /= node() -> +% {ejabberd_s2s, R#session.node} ! +% {replace, User, Resource}; +% true -> +% ok +% end +% end, Rs); +% _ -> +% false +% end. +% +% +%replace_my_connection(User, Resource) -> +% F = fun() -> +% [ID] = mnemosyne:eval(query [X.id || X <- table(user_resource), +% X.user = User, +% X.resource = Resource] +% end), +% +% Es = mnesia:read({mysession, ID}), +% mnesia:delete({mysession, ID}), +% Es +% end, +% case mnesia:transaction(F) of +% {atomic, Rs} -> +% lists:foreach( +% fun(R) -> +% (R#mysession.info)#mysession_info.pid ! replaced +% end, Rs); +% _ -> +% false +% end. + +remove_connection(Server) -> + F = fun() -> + mnesia:delete({mys2s, Server}), + mnesia:delete({s2s, Server}) + end, + mnesia:transaction(F). + +%replace_and_register_my_connection(User, Resource, Pid) -> +% F = fun() -> +% IDs = mnemosyne:eval(query [X.id || X <- table(user_resource), +% X.user = User, +% X.resource = Resource] +% end), +% +% ID = case IDs of +% [Id] -> Id; +% [] -> +% [CurID] = +% mnemosyne:eval( +% query [X.id || +% X <- table(user_resource_id_seq)] +% end), +% mnesia:write( +% #user_resource_id_seq{id = CurID + 1}), +% mnesia:write( +% #user_resource{id = CurID, +% user = User, +% resource = Resource}), +% CurID +% end, +% Es = mnesia:read({mysession, ID}), +% mnesia:write(#mysession{id = ID, +% info = #mysession_info{pid = Pid}}), +% Es +% end, +% case mnesia:transaction(F) of +% {atomic, Rs} -> +% lists:foreach( +% fun(R) -> +% (R#mysession.info)#mysession_info.pid ! replaced +% end, Rs); +% _ -> +% false +% end. + + +clean_table_from_bad_node(Node) -> + F = fun() -> + Es = mnesia:index_read(s2s, Node, #s2s.node), + lists:foreach(fun(E) -> + mnesia:delete_object(s2s, E, write) + end, Es) + end, + mnesia:transaction(F). + +have_connection(Server) -> + F = fun() -> + [E] = mnesia:read({s2s, Server}) + end, + case mnesia:transaction(F) of + {atomic, _} -> + true; + _ -> + false + end. + +get_key(Server) -> + F = fun() -> + [E] = mnesia:read({s2s, Server}), + E + end, + case mnesia:transaction(F) of + {atomic, E} -> + E#s2s.key; + _ -> + "" + end. + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +do_route(From, To, Packet) -> + ?DEBUG("s2s manager~n\tfrom ~p~n\tto ~p~n\tpacket ~P~n", + [From, To, Packet, 8]), + {User, Server, Resource} = To, + Key = lists:flatten(io_lib:format("~p", [random:uniform(65536*65536)])), + F = fun() -> + case mnesia:read({mys2s, Server}) of + [] -> + case mnesia:read({s2s, Server}) of + [Er] -> + {remote, Er#s2s.node}; + [] -> + % TODO + mnesia:write(#s2s{server = Server, + node = node(), + key = Key}), + new + end; + [El] -> + {local, El#mys2s.pid} + end + end, + case mnesia:transaction(F) of + {atomic, {local, Pid}} -> + ?DEBUG("sending to process ~p~n", [Pid]), + % TODO + {xmlelement, Name, Attrs, Els} = Packet, + NewAttrs = jlib:replace_from_to_attrs(jlib:jid_to_string(From), + jlib:jid_to_string(To), + Attrs), + send_element(Pid, {xmlelement, Name, NewAttrs, Els}), + ok; + {atomic, {remote, Node}} -> + ?DEBUG("sending to node ~p~n", [Node]), + {ejabberd_s2s, Node} ! {route, From, To, Packet}, + ok; + {atomic, new} -> + ?DEBUG("starting new s2s connection~n", []), + Pid = ejabberd_s2s_out:start(Server, {new, Key}), + mnesia:transaction(fun() -> mnesia:write(#mys2s{server = Server, + pid = Pid}) end), + {xmlelement, Name, Attrs, Els} = Packet, + NewAttrs = jlib:replace_from_to_attrs(jlib:jid_to_string(From), + jlib:jid_to_string(To), + Attrs), + send_element(Pid, {xmlelement, Name, NewAttrs, Els}), + ok; + {atomic, not_exists} -> + ?DEBUG("packet droped~n", []), + ok; + {aborted, Reason} -> + ?DEBUG("delivery failed: ~p~n", [Reason]), + false + end. + +send_element(Pid, El) -> + Pid ! {send_element, El}. diff --git a/src/ejabberd_s2s_in.erl b/src/ejabberd_s2s_in.erl index b0300b81f..e282c23aa 100644 --- a/src/ejabberd_s2s_in.erl +++ b/src/ejabberd_s2s_in.erl @@ -16,15 +16,19 @@ -export([start/1, receiver/2, send_text/2, send_element/2]). %% gen_fsm callbacks --export([init/1, wait_for_stream/2, wait_for_key/2, session_established/2, +-export([init/1, + wait_for_stream/2, + wait_for_key/2, + wait_for_verification/2, + stream_established/2, handle_info/3, terminate/3]). --record(state, {socket, receiver, streamid, - myself = "localhost", server}). - -include("ejabberd.hrl"). +-record(state, {socket, receiver, streamid, + myself = ?MYNAME, server, queue}). + -define(DBGFSM, true). -ifdef(DBGFSM). @@ -70,7 +74,8 @@ init([Socket]) -> ReceiverPid = spawn(?MODULE, receiver, [Socket, self()]), {ok, wait_for_stream, #state{socket = Socket, receiver = ReceiverPid, - streamid = new_id()}}. + streamid = new_id(), + queue = queue:new()}}. %%---------------------------------------------------------------------- %% Func: StateName/2 @@ -100,12 +105,24 @@ wait_for_key({xmlstreamelement, El}, StateData) -> case is_key_packet(El) of {key, To, From, Id, Key} -> io:format("GET KEY: ~p~n", [{To, From, Id, Key}]), - % TODO ejabberd_s2s_out:start(From, {verify, self(), Key}), - {next_state, wait_for_key, StateData}; + {next_state, + wait_for_verification, + StateData#state{server = From}}; {verify, To, From, Id, Key} -> io:format("VERIFY KEY: ~p~n", [{To, From, Id, Key}]), - % TODO + Key1 = ejabberd_s2s:get_key(From), + Type = if Key == Key1 -> "valid"; + true -> "invalid" + end, + send_element(StateData#state.socket, + {xmlelement, + "db:verify", + [{"from", ?MYNAME}, + {"to", From}, + {"id", Id}, + {"type", Type}], + []}), {next_state, wait_for_key, StateData}; _ -> {next_state, wait_for_key, StateData} @@ -118,28 +135,85 @@ wait_for_key({xmlstreamend, Name}, StateData) -> wait_for_key(closed, StateData) -> {stop, normal, StateData}. -session_established({xmlstreamelement, El}, StateData) -> +wait_for_verification(valid, StateData) -> + send_element(StateData#state.socket, + {xmlelement, + "db:result", + [{"from", ?MYNAME}, + {"to", StateData#state.server}, + {"type", "valid"}], + []}), + send_queue(StateData#state.socket, StateData#state.queue), + {next_state, stream_established, StateData}; + +wait_for_verification(invalid, StateData) -> + send_element(StateData#state.socket, + {xmlelement, + "db:result", + [{"from", ?MYNAME}, + {"to", StateData#state.server}, + {"type", "invalid"}], + []}), + {stop, normal, StateData}; + +wait_for_verification({xmlstreamelement, El}, StateData) -> + case is_key_packet(El) of + {verify, To, From, Id, Key} -> + io:format("VERIFY KEY: ~p~n", [{To, From, Id, Key}]), + Key1 = ejabberd_s2s:get_key(From), + Type = if Key == Key1 -> "valid"; + true -> "invalid" + end, + send_element(StateData#state.socket, + {xmlelement, + "db:verify", + [{"from", ?MYNAME}, + {"to", From}, + {"id", Id}, + {"type", Type}], + []}), + {next_state, wait_for_verification, StateData}; + _ -> + {next_state, wait_for_verification, StateData} + end; + +wait_for_verification({xmlstreamend, Name}, StateData) -> + % TODO + {stop, normal, StateData}; + +wait_for_verification(closed, StateData) -> + {stop, normal, StateData}. + +stream_established({xmlstreamelement, El}, StateData) -> {xmlelement, Name, Attrs, Els} = El, % TODO - FromJID = {StateData#state.server}, + From = xml:get_attr_s("from", Attrs), + FromJID1 = jlib:string_to_jid(From), + FromJID = case FromJID1 of + {Node, Server, Resource} -> + if Server == StateData#state.server -> FromJID1; + true -> error + end; + _ -> error + end, To = xml:get_attr_s("to", Attrs), ToJID = case To of - "" -> - {"", StateData#state.server, ""}; - _ -> - jlib:string_to_jid(To) + "" -> error; + _ -> jlib:string_to_jid(To) end, - case ToJID of - error -> - % TODO - error; - _ -> - %?DEBUG("FromJID=~w, ToJID=~w, El=~w~n", [FromJID, ToJID, El]), - ejabberd_router:route(FromJID, ToJID, El) + if ((Name == "iq") or (Name == "message") or (Name == "presence")) and + (ToJID /= error) and (FromJID /= error) -> + ejabberd_router:route(FromJID, ToJID, El); + true -> + error end, - {next_state, session_established, StateData}; + {next_state, stream_established, StateData}; -session_established(closed, StateData) -> +stream_established({xmlstreamend, Name}, StateData) -> + % TODO + {stop, normal, StateData}; + +stream_established(closed, StateData) -> % TODO {stop, normal, StateData}. @@ -188,7 +262,17 @@ handle_sync_event(Event, From, StateName, StateData) -> %%---------------------------------------------------------------------- handle_info({send_text, Text}, StateName, StateData) -> send_text(StateData#state.socket, Text), - {next_state, StateName, StateData}. + {next_state, StateName, StateData}; +handle_info({send_element, El}, StateName, StateData) -> + case StateName of + stream_established -> + send_element(StateData#state.socket, El), + {next_state, StateName, StateData}; + _ -> + Q = queue:in(El, StateData#state.queue), + {next_state, StateName, StateData#state{queue = Q}} + end. + %%---------------------------------------------------------------------- %% Func: terminate/3 @@ -203,6 +287,7 @@ terminate(Reason, StateName, StateData) -> % %ejabberd_sm:close_session(StateData#state.user, % % StateData#state.resource) % end, + %ejabberd_s2s ! {closed_conection, StateData#state.server}, gen_tcp:close(StateData#state.socket), ok. @@ -231,6 +316,16 @@ send_text(Socket, Text) -> send_element(Socket, El) -> send_text(Socket, xml:element_to_string(El)). +send_queue(Socket, Q) -> + case queue:out(Q) of + {{value, El}, Q1} -> + send_element(Socket, El), + send_queue(Socket, Q1); + {empty, Q1} -> + ok + end. + + new_id() -> lists:flatten(io_lib:format("~p", [random:uniform(65536*65536)])). @@ -250,25 +345,5 @@ is_key_packet({xmlelement, Name, Attrs, Els}) when Name == "db:verify" -> is_key_packet(_) -> false. -get_auth_tags([{xmlelement, Name, Attrs, Els}| L], U, P, D, R) -> - CData = xml:get_cdata(Els), - case Name of - "username" -> - get_auth_tags(L, CData, P, D, R); - "password" -> - get_auth_tags(L, U, CData, D, R); - "digest" -> - get_auth_tags(L, U, P, CData, R); - "resource" -> - get_auth_tags(L, U, P, D, CData); - _ -> - get_auth_tags(L, U, P, D, R) - end; -get_auth_tags([_ | L], U, P, D, R) -> - get_auth_tags(L, U, P, D, R); -get_auth_tags([], U, P, D, R) -> - {U, P, D, R}. - - diff --git a/src/ejabberd_s2s_out.erl b/src/ejabberd_s2s_out.erl index b51a61d2a..414e588fc 100644 --- a/src/ejabberd_s2s_out.erl +++ b/src/ejabberd_s2s_out.erl @@ -17,18 +17,19 @@ %% gen_fsm callbacks -export([init/1, + open_socket/2, wait_for_stream/2, - wait_for_key/2, + wait_for_validation/2, wait_for_verification/2, - session_established/2, + stream_established/2, handle_info/3, terminate/3]). --record(state, {socket, receiver, streamid, - myself = "localhost", server, type, xmlpid}). - -include("ejabberd.hrl"). +-record(state, {socket, receiver, streamid, + myself = ?MYNAME, server, type, xmlpid, queue}). + -define(DBGFSM, true). -ifdef(DBGFSM). @@ -57,7 +58,8 @@ %%% API %%%---------------------------------------------------------------------- start(Host, Type) -> - gen_fsm:start(ejabberd_s2s_out, [Host, Type], ?FSMOPTS). + {ok, Pid} = gen_fsm:start(ejabberd_s2s_out, [Host, Type], ?FSMOPTS), + Pid. %%%---------------------------------------------------------------------- %%% Callback functions from gen_fsm @@ -70,17 +72,11 @@ start(Host, Type) -> %% ignore | %% {stop, StopReason} %%---------------------------------------------------------------------- -init([Host, Type]) -> - {ok, Socket} = gen_tcp:connect(Host, 5569, - [binary, {packet, 0}]), - XMLStreamPid = xml_stream:start(self()), - %ReceiverPid = spawn(?MODULE, receiver, [Socket, self()]), - send_text(Socket, ?STREAM_HEADER), - {ok, wait_for_stream, #state{socket = Socket, - xmlpid = XMLStreamPid, - streamid = new_id(), - server = Host, - type = Type}}. +init([Server, Type]) -> + gen_fsm:send_event(self(), init), + {ok, open_socket, #state{queue = queue:new(), + server = Server, + type = Type}}. %%---------------------------------------------------------------------- %% Func: StateName/2 @@ -88,22 +84,46 @@ init([Host, Type]) -> %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- -state_name(Event, StateData) -> - {next_state, state_name, StateData}. +open_socket(init, StateData) -> + case gen_tcp:connect(StateData#state.server, 5569, + [binary, {packet, 0}]) of + {ok, Socket} -> + XMLStreamPid = xml_stream:start(self()), + send_text(Socket, ?STREAM_HEADER), + {next_state, wait_for_stream, + StateData#state{socket = Socket, + xmlpid = XMLStreamPid, + streamid = new_id()}}; + {error, Reason} -> + ?DEBUG("s2s_out: connect return ~p~n", [Reason]), + Text = case Reason of + timeout -> "Server Connect Timeout"; + _ -> "Server Connect Failed" + end, + bounce_messages(Text), + {stop, normal, StateData} + end. + wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) -> % TODO case {xml:get_attr_s("xmlns", Attrs), xml:get_attr_s("xmlns:db", Attrs)} of {"jabber:server", "jabber:server:dialback"} -> case StateData#state.type of - new -> + {new, Key} -> + send_element(StateData#state.socket, + {xmlelement, + "db:result", + [{"from", ?MYNAME}, + {"to", StateData#state.server}], + [{xmlcdata, Key}]}), % TODO - {next_state, wait_for_key, StateData}; + {next_state, wait_for_validation, StateData}; {verify, Pid, Key} -> send_element(StateData#state.socket, {xmlelement, "db:verify", - [{"from", "127.0.0.1"}, + [{"from", ?MYNAME}, {"to", StateData#state.server}], [{xmlcdata, Key}]}), {next_state, wait_for_verification, StateData} @@ -117,37 +137,43 @@ wait_for_stream(closed, StateData) -> {stop, normal, StateData}. -wait_for_key({xmlstreamelement, El}, StateData) -> - case is_key_packet(El) of - {key, To, From, Id, Key} -> - io:format("GET KEY: ~p~n", [{To, From, Id, Key}]), - % TODO - {next_state, wait_for_key, StateData}; - {verify, To, From, Id, Key} -> - io:format("VERIFY KEY: ~p~n", [{To, From, Id, Key}]), - % TODO - {next_state, wait_for_key, StateData}; - _ -> - {next_state, wait_for_key, StateData} - end; -wait_for_key({xmlstreamend, Name}, StateData) -> - % TODO - {stop, normal, StateData}; - -wait_for_key(closed, StateData) -> - {stop, normal, StateData}. - -wait_for_verification({xmlstreamelement, El}, StateData) -> +wait_for_validation({xmlstreamelement, El}, StateData) -> case is_verify_res(El) of {result, To, From, Id, Type} -> case Type of "valid" -> - io:format("VALID KEY~n", []); + % TODO + send_queue(StateData#state.socket, StateData#state.queue), + {next_state, stream_established, StateData}; + _ -> + % TODO: bounce packets + {stop, normal, StateData} + end; + _ -> + {next_state, wait_for_validation, StateData} + end; + +wait_for_validation({xmlstreamend, Name}, StateData) -> + % TODO + {stop, normal, StateData}; + +wait_for_validation(closed, StateData) -> + {stop, normal, StateData}. + + +wait_for_verification({xmlstreamelement, El}, StateData) -> + case is_verify_res(El) of + {result, To, From, Id, Type} -> + {verify, Pid, Key} = StateData#state.type, + case Type of + "valid" -> + io:format("VALID KEY~n", []), + gen_fsm:send_event(Pid, valid); % TODO _ -> % TODO - ok + gen_fsm:send_event(Pid, invalid) end, {stop, normal, StateData}; _ -> @@ -161,28 +187,37 @@ wait_for_verification({xmlstreamend, Name}, StateData) -> wait_for_verification(closed, StateData) -> {stop, normal, StateData}. -session_established({xmlstreamelement, El}, StateData) -> + +stream_established({xmlstreamelement, El}, StateData) -> {xmlelement, Name, Attrs, Els} = El, % TODO - FromJID = {StateData#state.server}, + From = xml:get_attr_s("from", Attrs), + FromJID1 = jlib:string_to_jid(From), + FromJID = case FromJID1 of + {Node, Server, Resource} -> + if Server == StateData#state.server -> FromJID1; + true -> error + end; + _ -> error + end, To = xml:get_attr_s("to", Attrs), ToJID = case To of - "" -> - {"", StateData#state.server, ""}; - _ -> - jlib:string_to_jid(To) + "" -> error; + _ -> jlib:string_to_jid(To) end, - case ToJID of - error -> - % TODO - error; - _ -> - %?DEBUG("FromJID=~w, ToJID=~w, El=~w~n", [FromJID, ToJID, El]), - ejabberd_router:route(FromJID, ToJID, El) + if ((Name == "iq") or (Name == "message") or (Name == "presence")) and + (ToJID /= error) and (FromJID /= error) -> + ejabberd_router:route(FromJID, ToJID, El); + true -> + error end, - {next_state, session_established, StateData}; + {next_state, stream_established, StateData}; -session_established(closed, StateData) -> +stream_established({xmlstreamend, Name}, StateData) -> + % TODO + {stop, normal, StateData}; + +stream_established(closed, StateData) -> % TODO {stop, normal, StateData}. @@ -232,14 +267,23 @@ handle_sync_event(Event, From, StateName, StateData) -> handle_info({send_text, Text}, StateName, StateData) -> send_text(StateData#state.socket, Text), {next_state, StateName, StateData}; +handle_info({send_element, El}, StateName, StateData) -> + case StateName of + stream_established -> + send_element(StateData#state.socket, El), + {next_state, StateName, StateData}; + _ -> + Q = queue:in(El, StateData#state.queue), + {next_state, StateName, StateData#state{queue = Q}} + end; handle_info({tcp, Socket, Data}, StateName, StateData) -> xml_stream:send_text(StateData#state.xmlpid, Data), {next_state, StateName, StateData}; handle_info({tcp_closed, Socket}, StateName, StateData) -> - self() ! closed, + gen_fsm:send_event(self(), closed), {next_state, StateName, StateData}; handle_info({tcp_error, Socket, Reason}, StateName, StateData) -> - self() ! closed, + gen_fsm:send_event(self(), closed), {next_state, StateName, StateData}. %%---------------------------------------------------------------------- @@ -248,14 +292,19 @@ handle_info({tcp_error, Socket, Reason}, StateName, StateData) -> %% Returns: any %%---------------------------------------------------------------------- terminate(Reason, StateName, StateData) -> -% case StateData#state.user of -% "" -> -% ok; -% _ -> -% %ejabberd_sm:close_session(StateData#state.user, -% % StateData#state.resource) -% end, - gen_tcp:close(StateData#state.socket), + ?DEBUG("s2s_out: terminate ~p~n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!~n", [[Reason, StateName, StateData]]), + case StateData#state.type of + {new, Key} -> + ejabberd_s2s ! {closed_conection, StateData#state.server}; + _ -> + ok + end, + case StateData#state.socket of + undefined -> + ok; + Socket -> + gen_tcp:close(Socket) + end, ok. %%%---------------------------------------------------------------------- @@ -283,9 +332,36 @@ send_text(Socket, Text) -> send_element(Socket, El) -> send_text(Socket, xml:element_to_string(El)). +send_queue(Socket, Q) -> + case queue:out(Q) of + {{value, El}, Q1} -> + send_element(Socket, El), + send_queue(Socket, Q1); + {empty, Q1} -> + ok + end. + new_id() -> lists:flatten(io_lib:format("~p", [random:uniform(65536*65536)])). +bounce_messages(Reason) -> + receive + {send_element, El} -> + {xmlelement, Name, Attrs, SubTags} = El, + case xml:get_attr_s("type", Attrs) of + "error" -> + ok; + _ -> + Err = jlib:make_error_reply(El, + "502", Reason), + From = jlib:string_to_jid(xml:get_attr_s("from", Attrs)), + To = jlib:string_to_jid(xml:get_attr_s("to", Attrs)), + ejabberd_router ! {route, To, From, Err} + end, + bounce_messages(Reason) + after 0 -> + ok + end. is_key_packet({xmlelement, Name, Attrs, Els}) when Name == "db:result" -> {key, @@ -317,25 +393,3 @@ is_verify_res({xmlelement, Name, Attrs, Els}) when Name == "db:verify" -> is_verify_res(_) -> false. -get_auth_tags([{xmlelement, Name, Attrs, Els}| L], U, P, D, R) -> - CData = xml:get_cdata(Els), - case Name of - "username" -> - get_auth_tags(L, CData, P, D, R); - "password" -> - get_auth_tags(L, U, CData, D, R); - "digest" -> - get_auth_tags(L, U, P, CData, R); - "resource" -> - get_auth_tags(L, U, P, D, CData); - _ -> - get_auth_tags(L, U, P, D, R) - end; -get_auth_tags([_ | L], U, P, D, R) -> - get_auth_tags(L, U, P, D, R); -get_auth_tags([], U, P, D, R) -> - {U, P, D, R}. - - - - diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index e9f51f992..896a1b9e6 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -45,7 +45,6 @@ init() -> {local_content, true}, {attributes, record_info(fields, mysession)}]), mnesia:subscribe(system), - %ejabberd_router:register_local_route("localhost"), loop(). loop() ->