From a4caafeb72738b1a5e33a8c019f6792c0003548a Mon Sep 17 00:00:00 2001 From: Alexey Shchepin Date: Mon, 23 Jan 2006 23:13:06 +0000 Subject: [PATCH] * src/ejabberd_sm.erl: Partially rewritten to work more efficiently and avoid race conditions * src/ejabberd_c2s.erl: Likewise * src/mod_irc/mod_irc_connection.erl: Cleanup SVN Revision: 488 --- ChangeLog | 13 +++ src/ejabberd_c2s.erl | 48 ++++---- src/ejabberd_sm.erl | 170 +++++++++++++++-------------- src/mod_irc/mod_irc_connection.erl | 19 +--- 4 files changed, 134 insertions(+), 116 deletions(-) diff --git a/ChangeLog b/ChangeLog index 2e657bcec..c485c6796 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,9 +1,22 @@ +<<<<<<< .mine +2006-01-23 Alexey Shchepin + + * src/ejabberd_sm.erl: Partially rewritten to work more + efficiently and avoid race conditions + * src/ejabberd_c2s.erl: Likewise + +2006-01-21 Alexey Shchepin + + * src/mod_irc/mod_irc_connection.erl: Cleanup + +======= 2006-01-20 Mickael Remond * src/ejabberd_receiver.erl: Added new debugging trace: It is now possible to dump the XML stream received from a client (usefull for client debugging). +>>>>>>> .r487 2006-01-19 Alexey Shchepin * src/aclocal.m4: Updated for zlib support diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 4a21772cb..636cd96a2 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -54,6 +54,7 @@ authenticated = false, jid, user = "", server = ?MYNAME, resource = "", + sid, pres_t = ?SETS:new(), pres_f = ?SETS:new(), pres_a = ?SETS:new(), @@ -372,8 +373,9 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> "(~w) Accepted legacy authentication for ~s", [StateData#state.socket, jlib:jid_to_string(JID)]), + SID = {now(), self()}, ejabberd_sm:open_session( - U, StateData#state.server, R), + SID, U, StateData#state.server, R), Res1 = jlib:make_result_iq_reply(El), Res = setelement(4, Res1, []), send_element(StateData, Res), @@ -397,6 +399,7 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> StateData#state{user = U, resource = R, jid = JID, + sid = SID, pres_f = ?SETS:from_list(Fs1), pres_t = ?SETS:from_list(Ts1), privacy_list = PrivList}}; @@ -664,8 +667,9 @@ wait_for_session({xmlstreamelement, El}, StateData) -> ?INFO_MSG("(~w) Opened session for ~s", [StateData#state.socket, jlib:jid_to_string(JID)]), + SID = {now(), self()}, ejabberd_sm:open_session( - U, StateData#state.server, R), + SID, U, StateData#state.server, R), Res = jlib:make_result_iq_reply(El), send_element(StateData, Res), change_shaper(StateData, JID), @@ -684,7 +688,8 @@ wait_for_session({xmlstreamelement, El}, StateData) -> PL -> PL end, {next_state, session_established, - StateData#state{pres_f = ?SETS:from_list(Fs1), + StateData#state{sid = SID, + pres_f = ?SETS:from_list(Fs1), pres_t = ?SETS:from_list(Ts1), privacy_list = PrivList}}; _ -> @@ -1037,10 +1042,12 @@ terminate(_Reason, StateName, StateData) -> [{"type", "unavailable"}], [{xmlelement, "status", [], [{xmlcdata, "Replaced by new connection"}]}]}, - ejabberd_sm:unset_presence(StateData#state.user, - StateData#state.server, - StateData#state.resource, - "Replaced by new connection"), + ejabberd_sm:close_session_unset_presence( + StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource, + "Replaced by new connection"), presence_broadcast( StateData, From, StateData#state.pres_a, Packet), presence_broadcast( @@ -1049,25 +1056,24 @@ terminate(_Reason, StateName, StateData) -> ?INFO_MSG("(~w) Close session for ~s", [StateData#state.socket, jlib:jid_to_string(StateData#state.jid)]), - ejabberd_sm:close_session(StateData#state.user, - StateData#state.server, - StateData#state.resource), - Tmp = ?SETS:new(), + EmptySet = ?SETS:new(), case StateData of #state{pres_last = undefined, - pres_a = Tmp, - pres_i = Tmp, + pres_a = EmptySet, + pres_i = EmptySet, pres_invis = false} -> - ok; + ejabberd_sm:close_session(StateData#state.sid); _ -> From = StateData#state.jid, Packet = {xmlelement, "presence", [{"type", "unavailable"}], []}, - ejabberd_sm:unset_presence(StateData#state.user, - StateData#state.server, - StateData#state.resource, - ""), + ejabberd_sm:close_session_unset_presence( + StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource, + ""), presence_broadcast( StateData, From, StateData#state.pres_a, Packet), presence_broadcast( @@ -1189,7 +1195,8 @@ presence_update(From, Packet, StateData) -> StatusTag -> xml:get_tag_cdata(StatusTag) end, - ejabberd_sm:unset_presence(StateData#state.user, + ejabberd_sm:unset_presence(StateData#state.sid, + StateData#state.user, StateData#state.server, StateData#state.resource, Status), @@ -1493,7 +1500,8 @@ update_priority(El, StateData) -> 0 end end, - ejabberd_sm:set_presence(StateData#state.user, + ejabberd_sm:set_presence(StateData#state.sid, + StateData#state.user, StateData#state.server, StateData#state.resource, Pri). diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index 79876dd35..34d2fec47 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -12,12 +12,13 @@ -export([start_link/0, init/0, route/3, - open_session/3, close_session/3, + open_session/4, close_session/1, bounce_offline_message/3, disconnect_removed_user/2, get_user_resources/2, - set_presence/4, - unset_presence/4, + set_presence/5, + unset_presence/5, + close_session_unset_presence/5, dirty_get_sessions_list/0, dirty_get_my_sessions_list/0, get_vh_session_list/1, @@ -29,8 +30,7 @@ -include("ejabberd.hrl"). -include("jlib.hrl"). --record(session, {usr, us, pid}). --record(presence, {usr, us, priority}). +-record(session, {sid, usr, us, priority}). start_link() -> Pid = proc_lib:spawn_link(ejabberd_sm, init, []), @@ -39,14 +39,12 @@ start_link() -> init() -> update_tables(), - mnesia:create_table(session, [{ram_copies, [node()]}, - {attributes, record_info(fields, session)}]), + mnesia:create_table(session, + [{ram_copies, [node()]}, + {attributes, record_info(fields, session)}]), + mnesia:add_table_index(session, usr), mnesia:add_table_index(session, us), mnesia:add_table_copy(session, node(), ram_copies), - mnesia:create_table(presence, - [{ram_copies, [node()]}, - {attributes, record_info(fields, presence)}]), - mnesia:add_table_index(presence, us), mnesia:subscribe(system), ets:new(sm_iqtable, [named_table]), lists:foreach( @@ -101,59 +99,57 @@ route(From, To, Packet) -> ok end. -open_session(User, Server, Resource) -> - register_connection(User, Server, Resource, self()). +open_session(SID, User, Server, Resource) -> + set_session(SID, User, Server, Resource, undefined). -close_session(User, Server, Resource) -> - remove_connection(User, Server, Resource). - - -register_connection(User, Server, Resource, Pid) -> +set_session(SID, User, Server, Resource, Priority) -> LUser = jlib:nodeprep(User), LServer = jlib:nameprep(Server), LResource = jlib:resourceprep(Resource), US = {LUser, LServer}, USR = {LUser, LServer, LResource}, F = fun() -> - Ss = mnesia:wread({session, USR}), - mnesia:write(#session{usr = USR, us = US, pid = Pid}), - Ss - end, - case mnesia:transaction(F) of - {atomic, Ss} -> + mnesia:write(#session{sid = SID, + usr = USR, + us = US, + priority = Priority}) + end, + mnesia:sync_dirty(F), + SIDs = mnesia:dirty_select( + session, + [{#session{sid = '$1', usr = USR, _ = '_'}, [], ['$1']}]), + if + SIDs == [] -> + ok; + true -> + MaxSID = lists:max(SIDs), lists:foreach( - fun(R) -> - R#session.pid ! replaced - end, Ss); - _ -> - false + fun({_, Pid} = S) when S /= MaxSID -> + Pid ! replaced; + (_) -> + ok + end, SIDs) end. - -remove_connection(User, Server, Resource) -> - LUser = jlib:nodeprep(User), - LResource = jlib:resourceprep(Resource), - LServer = jlib:nameprep(Server), - USR = {LUser, LServer, LResource}, +close_session(SID) -> F = fun() -> - mnesia:delete({session, USR}) + mnesia:delete({session, SID}) end, - mnesia:transaction(F). + mnesia:sync_dirty(F). clean_table_from_bad_node(Node) -> F = fun() -> Es = mnesia:select( session, - [{#session{pid = '$1', _ = '_'}, + [{#session{sid = {'_', '$1'}, _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}]), lists:foreach(fun(E) -> - mnesia:delete_object(E), - mnesia:delete({presence, E#session.usr}) + mnesia:delete_object(E) end, Es) end, - mnesia:transaction(F). + mnesia:sync_dirty(F). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -257,7 +253,7 @@ do_route(From, To, Packet) -> end; _ -> USR = {LUser, LServer, LResource}, - case mnesia:dirty_read({session, USR}) of + case mnesia:dirty_index_read(session, USR, #session.usr) of [] -> case Name of "message" -> @@ -275,8 +271,9 @@ do_route(From, To, Packet) -> _ -> ?DEBUG("packet droped~n", []) end; - [Sess] -> - Pid = Sess#session.pid, + Ss -> + Session = lists:max(Ss), + Pid = element(2, Session#session.sid), ?DEBUG("sending to process ~p~n", [Pid]), Pid ! {route, From, To, Packet} end @@ -290,11 +287,12 @@ route_message(From, To, Packet) -> Priority >= 0 -> LResource = jlib:resourceprep(R), USR = {LUser, LServer, LResource}, - case mnesia:dirty_read({session, USR}) of + case mnesia:dirty_index_read(session, USR, #session.usr) of [] -> ok; % Race condition - [Sess] -> - Pid = Sess#session.pid, + Ss -> + Session = lists:max(Ss), + Pid = element(2, Session#session.sid), ?DEBUG("sending to process ~p~n", [Pid]), Pid ! {route, From, To, Packet} end; @@ -337,53 +335,67 @@ get_user_resources(User, Server) -> case catch mnesia:dirty_index_read(session, US, #session.us) of {'EXIT', _Reason} -> []; - Rs -> - lists:map(fun(R) -> - element(3, R#session.usr) - end, Rs) + Ss -> + [element(3, S#session.usr) || S <- clean_session_list(Ss)] + end. + +clean_session_list(Ss) -> + clean_session_list(lists:keysort(#session.usr, Ss), []). + +clean_session_list([], Res) -> + Res; +clean_session_list([S], Res) -> + [S | Res]; +clean_session_list([S1, S2 | Rest], Res) -> + if + S1#session.usr == S2#session.usr -> + if + S1#session.sid > S2#session.sid -> + clean_session_list([S1 | Rest], Res); + true -> + clean_session_list([S2 | Rest], Res) + end; + true -> + clean_session_list([S2 | Rest], [S1 | Res]) end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -set_presence(User, Server, Resource, Priority) -> - LUser = jlib:nodeprep(User), - LServer = jlib:nameprep(Server), - USR = {User, Server, Resource}, - US = {LUser, LServer}, - F = fun() -> - mnesia:write(#presence{usr = USR, us = US, - priority = Priority}) - end, - mnesia:transaction(F). +set_presence(SID, User, Server, Resource, Priority) -> + set_session(SID, User, Server, Resource, Priority). -unset_presence(User, Server, Resource, Status) -> - USR = {User, Server, Resource}, - F = fun() -> - mnesia:delete({presence, USR}) - end, - mnesia:transaction(F), +unset_presence(SID, User, Server, Resource, Status) -> + set_session(SID, User, Server, Resource, undefined), + ejabberd_hooks:run(unset_presence_hook, jlib:nameprep(Server), + [User, Server, Resource, Status]). + +close_session_unset_presence(SID, User, Server, Resource, Status) -> + close_session(SID), ejabberd_hooks:run(unset_presence_hook, jlib:nameprep(Server), [User, Server, Resource, Status]). get_user_present_resources(LUser, LServer) -> US = {LUser, LServer}, - case catch mnesia:dirty_index_read(presence, US, #presence.us) of + case catch mnesia:dirty_index_read(session, US, #session.us) of {'EXIT', _Reason} -> []; - Rs -> - lists:map(fun(R) -> - {R#presence.priority, element(3, R#presence.usr)} - end, Rs) + Ss -> + [{S#session.priority, element(3, S#session.usr)} || + S <- clean_session_list(Ss), is_integer(S#session.priority)] end. dirty_get_sessions_list() -> - mnesia:dirty_all_keys(session). + mnesia:dirty_select( + session, + [{#session{usr = '$1', _ = '_'}, + [], + ['$1']}]). dirty_get_my_sessions_list() -> mnesia:dirty_select( session, - [{#session{pid = '$1', _ = '_'}, + [{#session{sid = {'_', '$1'}, _ = '_'}, [{'==', {node, '$1'}, node()}], ['$_']}]). @@ -447,16 +459,16 @@ update_tables() -> [ur, user, pid] -> mnesia:delete_table(session); [usr, us, pid] -> + mnesia:delete_table(session); + [sid, usr, us, priority] -> ok; {'EXIT', _} -> ok end, - case catch mnesia:table_info(presence, attributes) of - [ur, user, priority] -> + case lists:member(presence, mnesia:system_info(tables)) of + true -> mnesia:delete_table(presence); - [usr, us, priority] -> - ok; - {'EXIT', _} -> + false -> ok end, case lists:member(local_session, mnesia:system_info(tables)) of diff --git a/src/mod_irc/mod_irc_connection.erl b/src/mod_irc/mod_irc_connection.erl index aae1decdc..8343a6f92 100644 --- a/src/mod_irc/mod_irc_connection.erl +++ b/src/mod_irc/mod_irc_connection.erl @@ -13,7 +13,7 @@ -behaviour(gen_fsm). %% External exports --export([start/5, receiver/2, route_chan/4, route_nick/3]). +-export([start/5, route_chan/4, route_nick/3]). %% gen_fsm callbacks -export([init/1, @@ -31,7 +31,7 @@ -define(SETS, gb_sets). --record(state, {socket, encoding, receiver, queue, +-record(state, {socket, encoding, queue, user, host, server, nick, channels = dict:new(), inbuf = "", outbuf = ""}). @@ -523,21 +523,6 @@ terminate(Reason, StateName, StateData) -> %%% Internal functions %%%---------------------------------------------------------------------- -receiver(Socket, C2SPid) -> - XMLStreamPid = xml_stream:start(C2SPid), - receiver(Socket, C2SPid, XMLStreamPid). - -receiver(Socket, C2SPid, XMLStreamPid) -> - case gen_tcp:recv(Socket, 0) of - {ok, Text} -> - xml_stream:send_text(XMLStreamPid, Text), - receiver(Socket, C2SPid, XMLStreamPid); - {error, Reason} -> - exit(XMLStreamPid, closed), - gen_fsm:send_event(C2SPid, closed), - ok - end. - send_text(#state{socket = Socket, encoding = Encoding}, Text) -> CText = iconv:convert("utf-8", Encoding, lists:flatten(Text)), %?DEBUG("IRC OUTu: ~s~nIRC OUTk: ~s~n", [Text, CText]),