* src/ejabberd_sm.erl: Partially rewritten to work more

efficiently and avoid race conditions
* src/ejabberd_c2s.erl: Likewise

* src/mod_irc/mod_irc_connection.erl: Cleanup

SVN Revision: 488
This commit is contained in:
Alexey Shchepin 2006-01-23 23:13:06 +00:00
parent 1295893864
commit a4caafeb72
4 changed files with 134 additions and 116 deletions

View File

@ -1,9 +1,22 @@
<<<<<<< .mine
2006-01-23 Alexey Shchepin <alexey@sevcom.net>
* src/ejabberd_sm.erl: Partially rewritten to work more
efficiently and avoid race conditions
* src/ejabberd_c2s.erl: Likewise
2006-01-21 Alexey Shchepin <alexey@sevcom.net>
* src/mod_irc/mod_irc_connection.erl: Cleanup
=======
2006-01-20 Mickael Remond <mickael.remond@process-one.net> 2006-01-20 Mickael Remond <mickael.remond@process-one.net>
* src/ejabberd_receiver.erl: Added new debugging trace: It is now * src/ejabberd_receiver.erl: Added new debugging trace: It is now
possible to dump the XML stream received from a client (usefull for possible to dump the XML stream received from a client (usefull for
client debugging). client debugging).
>>>>>>> .r487
2006-01-19 Alexey Shchepin <alexey@sevcom.net> 2006-01-19 Alexey Shchepin <alexey@sevcom.net>
* src/aclocal.m4: Updated for zlib support * src/aclocal.m4: Updated for zlib support

View File

@ -54,6 +54,7 @@
authenticated = false, authenticated = false,
jid, jid,
user = "", server = ?MYNAME, resource = "", user = "", server = ?MYNAME, resource = "",
sid,
pres_t = ?SETS:new(), pres_t = ?SETS:new(),
pres_f = ?SETS:new(), pres_f = ?SETS:new(),
pres_a = ?SETS:new(), pres_a = ?SETS:new(),
@ -372,8 +373,9 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
"(~w) Accepted legacy authentication for ~s", "(~w) Accepted legacy authentication for ~s",
[StateData#state.socket, [StateData#state.socket,
jlib:jid_to_string(JID)]), jlib:jid_to_string(JID)]),
SID = {now(), self()},
ejabberd_sm:open_session( ejabberd_sm:open_session(
U, StateData#state.server, R), SID, U, StateData#state.server, R),
Res1 = jlib:make_result_iq_reply(El), Res1 = jlib:make_result_iq_reply(El),
Res = setelement(4, Res1, []), Res = setelement(4, Res1, []),
send_element(StateData, Res), send_element(StateData, Res),
@ -397,6 +399,7 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
StateData#state{user = U, StateData#state{user = U,
resource = R, resource = R,
jid = JID, jid = JID,
sid = SID,
pres_f = ?SETS:from_list(Fs1), pres_f = ?SETS:from_list(Fs1),
pres_t = ?SETS:from_list(Ts1), pres_t = ?SETS:from_list(Ts1),
privacy_list = PrivList}}; privacy_list = PrivList}};
@ -664,8 +667,9 @@ wait_for_session({xmlstreamelement, El}, StateData) ->
?INFO_MSG("(~w) Opened session for ~s", ?INFO_MSG("(~w) Opened session for ~s",
[StateData#state.socket, [StateData#state.socket,
jlib:jid_to_string(JID)]), jlib:jid_to_string(JID)]),
SID = {now(), self()},
ejabberd_sm:open_session( ejabberd_sm:open_session(
U, StateData#state.server, R), SID, U, StateData#state.server, R),
Res = jlib:make_result_iq_reply(El), Res = jlib:make_result_iq_reply(El),
send_element(StateData, Res), send_element(StateData, Res),
change_shaper(StateData, JID), change_shaper(StateData, JID),
@ -684,7 +688,8 @@ wait_for_session({xmlstreamelement, El}, StateData) ->
PL -> PL PL -> PL
end, end,
{next_state, session_established, {next_state, session_established,
StateData#state{pres_f = ?SETS:from_list(Fs1), StateData#state{sid = SID,
pres_f = ?SETS:from_list(Fs1),
pres_t = ?SETS:from_list(Ts1), pres_t = ?SETS:from_list(Ts1),
privacy_list = PrivList}}; privacy_list = PrivList}};
_ -> _ ->
@ -1037,10 +1042,12 @@ terminate(_Reason, StateName, StateData) ->
[{"type", "unavailable"}], [{"type", "unavailable"}],
[{xmlelement, "status", [], [{xmlelement, "status", [],
[{xmlcdata, "Replaced by new connection"}]}]}, [{xmlcdata, "Replaced by new connection"}]}]},
ejabberd_sm:unset_presence(StateData#state.user, ejabberd_sm:close_session_unset_presence(
StateData#state.server, StateData#state.sid,
StateData#state.resource, StateData#state.user,
"Replaced by new connection"), StateData#state.server,
StateData#state.resource,
"Replaced by new connection"),
presence_broadcast( presence_broadcast(
StateData, From, StateData#state.pres_a, Packet), StateData, From, StateData#state.pres_a, Packet),
presence_broadcast( presence_broadcast(
@ -1049,25 +1056,24 @@ terminate(_Reason, StateName, StateData) ->
?INFO_MSG("(~w) Close session for ~s", ?INFO_MSG("(~w) Close session for ~s",
[StateData#state.socket, [StateData#state.socket,
jlib:jid_to_string(StateData#state.jid)]), jlib:jid_to_string(StateData#state.jid)]),
ejabberd_sm:close_session(StateData#state.user,
StateData#state.server,
StateData#state.resource),
Tmp = ?SETS:new(), EmptySet = ?SETS:new(),
case StateData of case StateData of
#state{pres_last = undefined, #state{pres_last = undefined,
pres_a = Tmp, pres_a = EmptySet,
pres_i = Tmp, pres_i = EmptySet,
pres_invis = false} -> pres_invis = false} ->
ok; ejabberd_sm:close_session(StateData#state.sid);
_ -> _ ->
From = StateData#state.jid, From = StateData#state.jid,
Packet = {xmlelement, "presence", Packet = {xmlelement, "presence",
[{"type", "unavailable"}], []}, [{"type", "unavailable"}], []},
ejabberd_sm:unset_presence(StateData#state.user, ejabberd_sm:close_session_unset_presence(
StateData#state.server, StateData#state.sid,
StateData#state.resource, StateData#state.user,
""), StateData#state.server,
StateData#state.resource,
""),
presence_broadcast( presence_broadcast(
StateData, From, StateData#state.pres_a, Packet), StateData, From, StateData#state.pres_a, Packet),
presence_broadcast( presence_broadcast(
@ -1189,7 +1195,8 @@ presence_update(From, Packet, StateData) ->
StatusTag -> StatusTag ->
xml:get_tag_cdata(StatusTag) xml:get_tag_cdata(StatusTag)
end, end,
ejabberd_sm:unset_presence(StateData#state.user, ejabberd_sm:unset_presence(StateData#state.sid,
StateData#state.user,
StateData#state.server, StateData#state.server,
StateData#state.resource, StateData#state.resource,
Status), Status),
@ -1493,7 +1500,8 @@ update_priority(El, StateData) ->
0 0
end end
end, end,
ejabberd_sm:set_presence(StateData#state.user, ejabberd_sm:set_presence(StateData#state.sid,
StateData#state.user,
StateData#state.server, StateData#state.server,
StateData#state.resource, StateData#state.resource,
Pri). Pri).

View File

@ -12,12 +12,13 @@
-export([start_link/0, init/0, -export([start_link/0, init/0,
route/3, route/3,
open_session/3, close_session/3, open_session/4, close_session/1,
bounce_offline_message/3, bounce_offline_message/3,
disconnect_removed_user/2, disconnect_removed_user/2,
get_user_resources/2, get_user_resources/2,
set_presence/4, set_presence/5,
unset_presence/4, unset_presence/5,
close_session_unset_presence/5,
dirty_get_sessions_list/0, dirty_get_sessions_list/0,
dirty_get_my_sessions_list/0, dirty_get_my_sessions_list/0,
get_vh_session_list/1, get_vh_session_list/1,
@ -29,8 +30,7 @@
-include("ejabberd.hrl"). -include("ejabberd.hrl").
-include("jlib.hrl"). -include("jlib.hrl").
-record(session, {usr, us, pid}). -record(session, {sid, usr, us, priority}).
-record(presence, {usr, us, priority}).
start_link() -> start_link() ->
Pid = proc_lib:spawn_link(ejabberd_sm, init, []), Pid = proc_lib:spawn_link(ejabberd_sm, init, []),
@ -39,14 +39,12 @@ start_link() ->
init() -> init() ->
update_tables(), update_tables(),
mnesia:create_table(session, [{ram_copies, [node()]}, mnesia:create_table(session,
{attributes, record_info(fields, session)}]), [{ram_copies, [node()]},
{attributes, record_info(fields, session)}]),
mnesia:add_table_index(session, usr),
mnesia:add_table_index(session, us), mnesia:add_table_index(session, us),
mnesia:add_table_copy(session, node(), ram_copies), mnesia:add_table_copy(session, node(), ram_copies),
mnesia:create_table(presence,
[{ram_copies, [node()]},
{attributes, record_info(fields, presence)}]),
mnesia:add_table_index(presence, us),
mnesia:subscribe(system), mnesia:subscribe(system),
ets:new(sm_iqtable, [named_table]), ets:new(sm_iqtable, [named_table]),
lists:foreach( lists:foreach(
@ -101,59 +99,57 @@ route(From, To, Packet) ->
ok ok
end. end.
open_session(User, Server, Resource) -> open_session(SID, User, Server, Resource) ->
register_connection(User, Server, Resource, self()). set_session(SID, User, Server, Resource, undefined).
close_session(User, Server, Resource) -> set_session(SID, User, Server, Resource, Priority) ->
remove_connection(User, Server, Resource).
register_connection(User, Server, Resource, Pid) ->
LUser = jlib:nodeprep(User), LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server), LServer = jlib:nameprep(Server),
LResource = jlib:resourceprep(Resource), LResource = jlib:resourceprep(Resource),
US = {LUser, LServer}, US = {LUser, LServer},
USR = {LUser, LServer, LResource}, USR = {LUser, LServer, LResource},
F = fun() -> F = fun() ->
Ss = mnesia:wread({session, USR}), mnesia:write(#session{sid = SID,
mnesia:write(#session{usr = USR, us = US, pid = Pid}), usr = USR,
Ss us = US,
end, priority = Priority})
case mnesia:transaction(F) of end,
{atomic, Ss} -> mnesia:sync_dirty(F),
SIDs = mnesia:dirty_select(
session,
[{#session{sid = '$1', usr = USR, _ = '_'}, [], ['$1']}]),
if
SIDs == [] ->
ok;
true ->
MaxSID = lists:max(SIDs),
lists:foreach( lists:foreach(
fun(R) -> fun({_, Pid} = S) when S /= MaxSID ->
R#session.pid ! replaced Pid ! replaced;
end, Ss); (_) ->
_ -> ok
false end, SIDs)
end. end.
close_session(SID) ->
remove_connection(User, Server, Resource) ->
LUser = jlib:nodeprep(User),
LResource = jlib:resourceprep(Resource),
LServer = jlib:nameprep(Server),
USR = {LUser, LServer, LResource},
F = fun() -> F = fun() ->
mnesia:delete({session, USR}) mnesia:delete({session, SID})
end, end,
mnesia:transaction(F). mnesia:sync_dirty(F).
clean_table_from_bad_node(Node) -> clean_table_from_bad_node(Node) ->
F = fun() -> F = fun() ->
Es = mnesia:select( Es = mnesia:select(
session, session,
[{#session{pid = '$1', _ = '_'}, [{#session{sid = {'_', '$1'}, _ = '_'},
[{'==', {node, '$1'}, Node}], [{'==', {node, '$1'}, Node}],
['$_']}]), ['$_']}]),
lists:foreach(fun(E) -> lists:foreach(fun(E) ->
mnesia:delete_object(E), mnesia:delete_object(E)
mnesia:delete({presence, E#session.usr})
end, Es) end, Es)
end, end,
mnesia:transaction(F). mnesia:sync_dirty(F).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -257,7 +253,7 @@ do_route(From, To, Packet) ->
end; end;
_ -> _ ->
USR = {LUser, LServer, LResource}, USR = {LUser, LServer, LResource},
case mnesia:dirty_read({session, USR}) of case mnesia:dirty_index_read(session, USR, #session.usr) of
[] -> [] ->
case Name of case Name of
"message" -> "message" ->
@ -275,8 +271,9 @@ do_route(From, To, Packet) ->
_ -> _ ->
?DEBUG("packet droped~n", []) ?DEBUG("packet droped~n", [])
end; end;
[Sess] -> Ss ->
Pid = Sess#session.pid, Session = lists:max(Ss),
Pid = element(2, Session#session.sid),
?DEBUG("sending to process ~p~n", [Pid]), ?DEBUG("sending to process ~p~n", [Pid]),
Pid ! {route, From, To, Packet} Pid ! {route, From, To, Packet}
end end
@ -290,11 +287,12 @@ route_message(From, To, Packet) ->
Priority >= 0 -> Priority >= 0 ->
LResource = jlib:resourceprep(R), LResource = jlib:resourceprep(R),
USR = {LUser, LServer, LResource}, USR = {LUser, LServer, LResource},
case mnesia:dirty_read({session, USR}) of case mnesia:dirty_index_read(session, USR, #session.usr) of
[] -> [] ->
ok; % Race condition ok; % Race condition
[Sess] -> Ss ->
Pid = Sess#session.pid, Session = lists:max(Ss),
Pid = element(2, Session#session.sid),
?DEBUG("sending to process ~p~n", [Pid]), ?DEBUG("sending to process ~p~n", [Pid]),
Pid ! {route, From, To, Packet} Pid ! {route, From, To, Packet}
end; end;
@ -337,53 +335,67 @@ get_user_resources(User, Server) ->
case catch mnesia:dirty_index_read(session, US, #session.us) of case catch mnesia:dirty_index_read(session, US, #session.us) of
{'EXIT', _Reason} -> {'EXIT', _Reason} ->
[]; [];
Rs -> Ss ->
lists:map(fun(R) -> [element(3, S#session.usr) || S <- clean_session_list(Ss)]
element(3, R#session.usr) end.
end, Rs)
clean_session_list(Ss) ->
clean_session_list(lists:keysort(#session.usr, Ss), []).
clean_session_list([], Res) ->
Res;
clean_session_list([S], Res) ->
[S | Res];
clean_session_list([S1, S2 | Rest], Res) ->
if
S1#session.usr == S2#session.usr ->
if
S1#session.sid > S2#session.sid ->
clean_session_list([S1 | Rest], Res);
true ->
clean_session_list([S2 | Rest], Res)
end;
true ->
clean_session_list([S2 | Rest], [S1 | Res])
end. end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
set_presence(User, Server, Resource, Priority) -> set_presence(SID, User, Server, Resource, Priority) ->
LUser = jlib:nodeprep(User), set_session(SID, User, Server, Resource, Priority).
LServer = jlib:nameprep(Server),
USR = {User, Server, Resource},
US = {LUser, LServer},
F = fun() ->
mnesia:write(#presence{usr = USR, us = US,
priority = Priority})
end,
mnesia:transaction(F).
unset_presence(User, Server, Resource, Status) -> unset_presence(SID, User, Server, Resource, Status) ->
USR = {User, Server, Resource}, set_session(SID, User, Server, Resource, undefined),
F = fun() -> ejabberd_hooks:run(unset_presence_hook, jlib:nameprep(Server),
mnesia:delete({presence, USR}) [User, Server, Resource, Status]).
end,
mnesia:transaction(F), close_session_unset_presence(SID, User, Server, Resource, Status) ->
close_session(SID),
ejabberd_hooks:run(unset_presence_hook, jlib:nameprep(Server), ejabberd_hooks:run(unset_presence_hook, jlib:nameprep(Server),
[User, Server, Resource, Status]). [User, Server, Resource, Status]).
get_user_present_resources(LUser, LServer) -> get_user_present_resources(LUser, LServer) ->
US = {LUser, LServer}, US = {LUser, LServer},
case catch mnesia:dirty_index_read(presence, US, #presence.us) of case catch mnesia:dirty_index_read(session, US, #session.us) of
{'EXIT', _Reason} -> {'EXIT', _Reason} ->
[]; [];
Rs -> Ss ->
lists:map(fun(R) -> [{S#session.priority, element(3, S#session.usr)} ||
{R#presence.priority, element(3, R#presence.usr)} S <- clean_session_list(Ss), is_integer(S#session.priority)]
end, Rs)
end. end.
dirty_get_sessions_list() -> dirty_get_sessions_list() ->
mnesia:dirty_all_keys(session). mnesia:dirty_select(
session,
[{#session{usr = '$1', _ = '_'},
[],
['$1']}]).
dirty_get_my_sessions_list() -> dirty_get_my_sessions_list() ->
mnesia:dirty_select( mnesia:dirty_select(
session, session,
[{#session{pid = '$1', _ = '_'}, [{#session{sid = {'_', '$1'}, _ = '_'},
[{'==', {node, '$1'}, node()}], [{'==', {node, '$1'}, node()}],
['$_']}]). ['$_']}]).
@ -447,16 +459,16 @@ update_tables() ->
[ur, user, pid] -> [ur, user, pid] ->
mnesia:delete_table(session); mnesia:delete_table(session);
[usr, us, pid] -> [usr, us, pid] ->
mnesia:delete_table(session);
[sid, usr, us, priority] ->
ok; ok;
{'EXIT', _} -> {'EXIT', _} ->
ok ok
end, end,
case catch mnesia:table_info(presence, attributes) of case lists:member(presence, mnesia:system_info(tables)) of
[ur, user, priority] -> true ->
mnesia:delete_table(presence); mnesia:delete_table(presence);
[usr, us, priority] -> false ->
ok;
{'EXIT', _} ->
ok ok
end, end,
case lists:member(local_session, mnesia:system_info(tables)) of case lists:member(local_session, mnesia:system_info(tables)) of

View File

@ -13,7 +13,7 @@
-behaviour(gen_fsm). -behaviour(gen_fsm).
%% External exports %% External exports
-export([start/5, receiver/2, route_chan/4, route_nick/3]). -export([start/5, route_chan/4, route_nick/3]).
%% gen_fsm callbacks %% gen_fsm callbacks
-export([init/1, -export([init/1,
@ -31,7 +31,7 @@
-define(SETS, gb_sets). -define(SETS, gb_sets).
-record(state, {socket, encoding, receiver, queue, -record(state, {socket, encoding, queue,
user, host, server, nick, user, host, server, nick,
channels = dict:new(), channels = dict:new(),
inbuf = "", outbuf = ""}). inbuf = "", outbuf = ""}).
@ -523,21 +523,6 @@ terminate(Reason, StateName, StateData) ->
%%% Internal functions %%% Internal functions
%%%---------------------------------------------------------------------- %%%----------------------------------------------------------------------
receiver(Socket, C2SPid) ->
XMLStreamPid = xml_stream:start(C2SPid),
receiver(Socket, C2SPid, XMLStreamPid).
receiver(Socket, C2SPid, XMLStreamPid) ->
case gen_tcp:recv(Socket, 0) of
{ok, Text} ->
xml_stream:send_text(XMLStreamPid, Text),
receiver(Socket, C2SPid, XMLStreamPid);
{error, Reason} ->
exit(XMLStreamPid, closed),
gen_fsm:send_event(C2SPid, closed),
ok
end.
send_text(#state{socket = Socket, encoding = Encoding}, Text) -> send_text(#state{socket = Socket, encoding = Encoding}, Text) ->
CText = iconv:convert("utf-8", Encoding, lists:flatten(Text)), CText = iconv:convert("utf-8", Encoding, lists:flatten(Text)),
%?DEBUG("IRC OUTu: ~s~nIRC OUTk: ~s~n", [Text, CText]), %?DEBUG("IRC OUTu: ~s~nIRC OUTk: ~s~n", [Text, CText]),