From 4a2f62062ed2b729b464b37f2ee0e9e17328a296 Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Mon, 15 Nov 2010 01:09:46 +0900 Subject: [PATCH] New migration procedure --- src/ejabberd_c2s.erl | 20 ++++----- src/ejabberd_cluster.erl | 69 ++++++++++++++++++++++-------- src/ejabberd_s2s.erl | 4 +- src/ejabberd_sm.erl | 81 ++++++++++++++++++++++++++++++------ src/mod_muc/mod_muc.erl | 81 ++++++++++++++++++++++++++++++------ src/mod_muc/mod_muc_room.erl | 12 ++++-- 6 files changed, 208 insertions(+), 59 deletions(-) diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index a44f66a7d..4ff959217 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -194,7 +194,7 @@ stop(FsmRef) -> ?GEN_FSM:send_event(FsmRef, closed). migrate(FsmRef, Node, After) -> - ?GEN_FSM:send_all_state_event(FsmRef, {migrate, Node, After}). + erlang:send_after(After, FsmRef, {migrate, Node}). %%%---------------------------------------------------------------------- %%% Callback functions from gen_fsm @@ -282,6 +282,7 @@ init([StateName, StateData, _FSMLimitOpts]) -> El -> get_priority_from_presence(El) end, + ejabberd_sm:drop_session(StateData#state.sid), ejabberd_sm:open_session( SID, StateData#state.user, @@ -289,6 +290,7 @@ init([StateName, StateData, _FSMLimitOpts]) -> StateData#state.resource, Priority, Info), + %%ejabberd_sm:drop_session(StateData#state.sid), NewStateData = StateData#state{sid = SID, socket_monitor = MRef}, StateData2 = change_reception(NewStateData, true), StateData3 = start_keepalive_timer(StateData2), @@ -1232,9 +1234,6 @@ session_established2(El, StateData) -> %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- -handle_event({migrate, Node, After}, StateName, StateData) when Node /= node() -> - fsm_migrate(StateName, StateData, Node, After * 2); - handle_event({add_rosteritem, IJID, ISubscription}, StateName, StateData) -> NewStateData = roster_change(IJID, ISubscription, StateData), fsm_next_state(StateName, NewStateData); @@ -1624,6 +1623,12 @@ handle_info({force_update_presence, LUser}, StateName, StateData end, {next_state, StateName, NewStateData}; +handle_info({migrate, Node}, StateName, StateData) -> + if Node /= node() -> + fsm_migrate(StateName, StateData, Node, 0); + true -> + fsm_next_state(StateName, StateData) + end; handle_info({broadcast, Type, From, Packet}, StateName, StateData) -> Recipients = ejabberd_hooks:run_fold( c2s_broadcast_recipients, StateData#state.server, @@ -2535,13 +2540,6 @@ maybe_migrate(StateName, StateData) -> StateData2 = change_reception(PackedStateData, true), StateData3 = start_keepalive_timer(StateData2), erlang:garbage_collect(), - case ejabberd_cluster:get_node_new({U, S}) of - Node -> - ok; - NewNode -> - After = ejabberd_cluster:rehash_timeout(), - migrate(self(), NewNode, After) - end, fsm_next_state(StateName, StateData3); Node -> fsm_migrate(StateName, PackedStateData, Node, 0) diff --git a/src/ejabberd_cluster.erl b/src/ejabberd_cluster.erl index c189eb450..bdae7039f 100644 --- a/src/ejabberd_cluster.erl +++ b/src/ejabberd_cluster.erl @@ -22,7 +22,11 @@ -define(HASHTBL, nodes_hash). -define(HASHTBL_NEW, nodes_hash_new). -define(POINTS, 64). --define(REHASH_TIMEOUT, 30000). +-define(REHASH_TIMEOUT, timer:seconds(30)). +-define(MIGRATE_TIMEOUT, timer:minutes(2)). +%%-define(REHASH_TIMEOUT, timer:seconds(10)). +%%-define(MIGRATE_TIMEOUT, timer:seconds(5)). +-define(LOCK, {migrate, node()}). -record(state, {}). @@ -80,13 +84,16 @@ shutdown() -> %% gen_server callbacks %%==================================================================== init([]) -> + {A, B, C} = now(), + random:seed(A, B, C), net_kernel:monitor_nodes(true, [{node_type, visible}]), ets:new(?HASHTBL, [named_table, public, ordered_set]), ets:new(?HASHTBL_NEW, [named_table, public, ordered_set]), register_node(), - AllNodes = mnesia:system_info(running_db_nodes), + pg2:create(?MODULE), + AllNodes = cluster_group(), OtherNodes = case AllNodes of - [_] -> + [_MyNode] -> AllNodes; _ -> AllNodes -- [node()] @@ -96,35 +103,55 @@ init([]) -> {ok, #state{}}. handle_call(announce, _From, State) -> - case mnesia:system_info(running_db_nodes) of + case global:set_lock(?LOCK, cluster_group(), 0) of + false -> + ?INFO_MSG("Another node is recently attached to " + "the cluster and is being rebalanced. " + "Waiting for the rebalancing to be completed " + "before starting this node. " + "This may take serveral minutes. " + "Please, be patient.", []), + global:set_lock(?LOCK, cluster_group(), infinity); + true -> + ok + end, + case cluster_group() of [_MyNode] -> - ok; + join_cluster_group(), + global:del_lock(?LOCK); Nodes -> OtherNodes = Nodes -- [node()], - lists:foreach( - fun(Node) -> - {?MODULE, Node} ! {node_ready, node()} - end, OtherNodes), ?INFO_MSG("waiting for migration from nodes: ~w", [OtherNodes]), - timer:sleep(?REHASH_TIMEOUT), - append_node(?HASHTBL, node()) + {_Res, BadNodes} = gen_server:multi_call( + OtherNodes, ?MODULE, + {node_ready, node()}, ?REHASH_TIMEOUT), + append_node(?HASHTBL, node()), + join_cluster_group(), + gen_server:abcast(OtherNodes -- BadNodes, + ?MODULE, {node_ready, node()}), + erlang:send_after(?MIGRATE_TIMEOUT, self(), del_lock) end, {reply, ok, State}; +handle_call({node_ready, Node}, _From, State) -> + ?INFO_MSG("node ~p is ready, preparing migration", [Node]), + append_node(?HASHTBL_NEW, Node), + ejabberd_hooks:run(node_up, [Node]), + {reply, ok, State}; handle_call(_Request, _From, State) -> Reply = ok, {reply, Reply, State}. +handle_cast({node_ready, Node}, State) -> + ?INFO_MSG("adding node ~p to hash and starting migration", [Node]), + append_node(?HASHTBL, Node), + ejabberd_hooks:run(node_hash_update, [?MIGRATE_TIMEOUT]), + {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. -handle_info({node_ready, Node}, State) -> - ?INFO_MSG("node ~p is ready, starting migration", [Node]), - append_node(?HASHTBL_NEW, Node), - ejabberd_hooks:run(node_hash_update, [?REHASH_TIMEOUT]), - timer:sleep(?REHASH_TIMEOUT), - ?INFO_MSG("adding node ~p to hash", [Node]), - append_node(?HASHTBL, Node), +handle_info(del_lock, State) -> + global:del_lock(?LOCK), {noreply, State}; handle_info({node_down, Node}, State) -> delete_node(?HASHTBL, Node), @@ -187,3 +214,9 @@ get_node_by_hash(Tab, Hash) -> register_node() -> global:register_name(list_to_atom(node_id()), self()). + +cluster_group() -> + [node() | [node(P) || P <- pg2:get_members(?MODULE)]]. + +join_cluster_group() -> + pg2:join(?MODULE, whereis(?MODULE)). diff --git a/src/ejabberd_s2s.erl b/src/ejabberd_s2s.erl index 13c2d6d07..99403f75c 100644 --- a/src/ejabberd_s2s.erl +++ b/src/ejabberd_s2s.erl @@ -203,9 +203,9 @@ migrate(After) -> ['$$']}]), lists:foreach( fun([FromTo, Pid]) -> - case ejabberd_cluster:get_node_new(FromTo) of + case ejabberd_cluster:get_node(FromTo) of Node when Node /= node() -> - ejabberd_s2s_out:stop_connection(Pid, After * 2); + ejabberd_s2s_out:stop_connection(Pid, After); _ -> ok end diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index cca7db502..dd347edc9 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -37,6 +37,7 @@ open_session/6, close_session/4, close_migrated_session/4, + drop_session/1, check_in_subscription/6, bounce_offline_message/3, disconnect_removed_user/2, @@ -60,6 +61,7 @@ get_session_pid/3, get_user_info/3, get_user_ip/3, + node_up/1, migrate/1 ]). @@ -108,27 +110,37 @@ open_session(SID, User, Server, Resource, Priority, Info) -> [SID, JID, Info]). close_session(SID, User, Server, Resource) -> - Info = do_close_session(SID, User, Server, Resource), + Info = do_close_session(SID), + US = {jlib:nodeprep(User), jlib:nameprep(Server)}, + case ejabberd_cluster:get_node_new(US) of + Node when Node /= node() -> + rpc:cast(Node, ?MODULE, drop_session, [SID]); + _ -> + ok + end, JID = jlib:make_jid(User, Server, Resource), ejabberd_hooks:run(sm_remove_connection_hook, JID#jid.lserver, [SID, JID, Info]). close_migrated_session(SID, User, Server, Resource) -> - Info = do_close_session(SID, User, Server, Resource), + Info = do_close_session(SID), JID = jlib:make_jid(User, Server, Resource), ejabberd_hooks:run(sm_remove_migrated_connection_hook, JID#jid.lserver, [SID, JID, Info]). -do_close_session(SID, User, Server, Resource) -> +do_close_session(SID) -> Info = case mnesia:dirty_read({session, SID}) of - [] -> []; - [#session{info=I}] -> I - end, + [] -> []; + [#session{info=I}] -> I + end, + drop_session(SID), + Info. + +drop_session(SID) -> F = fun() -> mnesia:delete({session, SID}) end, - mnesia:sync_dirty(F), - Info. + mnesia:sync_dirty(F). check_in_subscription(Acc, User, Server, _JID, _Type, _Reason) -> case ejabberd_auth:is_user_exists(User, Server) of @@ -312,14 +324,33 @@ migrate(After) -> ['$$']}]), lists:foreach( fun([US, Pid]) -> - case ejabberd_cluster:get_node_new(US) of + case ejabberd_cluster:get_node(US) of Node when Node /= node() -> - ejabberd_c2s:migrate(Pid, Node, After); + ejabberd_c2s:migrate(Pid, Node, random:uniform(After)); _ -> ok end end, Ss). +node_up(_Node) -> + copy_sessions(mnesia:dirty_first(session)). + +copy_sessions('$end_of_table') -> + ok; +copy_sessions(Key) -> + case mnesia:dirty_read(session, Key) of + [#session{us = US} = Session] -> + case ejabberd_cluster:get_node_new(US) of + Node when node() /= Node -> + rpc:cast(Node, mnesia, dirty_write, [Session]); + _ -> + ok + end; + _ -> + ok + end, + copy_sessions(mnesia:dirty_next(session, Key)). + %%==================================================================== %% gen_server callbacks %%==================================================================== @@ -341,6 +372,7 @@ init([]) -> mnesia:add_table_index(session, us), mnesia:add_table_copy(session, node(), ram_copies), ets:new(sm_iqtable, [named_table]), + ejabberd_hooks:add(node_up, ?MODULE, node_up, 100), ejabberd_hooks:add(node_hash_update, ?MODULE, migrate, 100), lists:foreach( fun(Host) -> @@ -418,6 +450,7 @@ handle_info(_Info, State) -> %% The return value is ignored. %%-------------------------------------------------------------------- terminate(_Reason, _State) -> + ejabberd_hooks:delete(node_up, ?MODULE, node_up, 100), ejabberd_hooks:delete(node_hash_update, ?MODULE, migrate, 100), ejabberd_commands:unregister_commands(commands()), ok. @@ -433,7 +466,7 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%-------------------------------------------------------------------- -set_session(SID, User, Server, Resource, Priority, Info) -> +set_session({_, Pid} = SID, User, Server, Resource, Priority, Info) -> LUser = jlib:nodeprep(User), LServer = jlib:nameprep(Server), LResource = jlib:resourceprep(Resource), @@ -446,7 +479,31 @@ set_session(SID, User, Server, Resource, Priority, Info) -> priority = Priority, info = Info}) end, - mnesia:sync_dirty(F). + mnesia:sync_dirty(F), + case ejabberd_cluster:get_node_new(US) of + Node when node() /= Node -> + %% New node has just been added. But we may miss session records + %% copy procedure, so we copy the session record manually just + %% to make sure + rpc:cast(Node, mnesia, dirty_write, + [#session{sid = SID, + usr = USR, + us = US, + priority = Priority, + info = Info}]), + case ejabberd_cluster:get_node(US) of + Node when node() /= Node -> + %% Migration to new node has completed, and seems like + %% we missed it, so we migrate the session pid manually. + %% It is not a problem if we have already got migration + %% notification: dups are just ignored by the c2s pid. + ejabberd_c2s:migrate(Pid, Node, 0); + _ -> + ok + end; + _ -> + ok + end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/src/mod_muc/mod_muc.erl b/src/mod_muc/mod_muc.erl index ff6482f73..ff329e5b6 100644 --- a/src/mod_muc/mod_muc.erl +++ b/src/mod_muc/mod_muc.erl @@ -42,6 +42,7 @@ process_iq_disco_items/4, broadcast_service_message/2, register_room/3, + node_up/1, migrate/1, get_vh_rooms/1, can_use_nick/3]). @@ -175,14 +176,33 @@ migrate(After) -> ['$$']}]), lists:foreach( fun([NameHost, Pid]) -> - case ejabberd_cluster:get_node_new(NameHost) of + case ejabberd_cluster:get_node(NameHost) of Node when Node /= node() -> - mod_muc_room:migrate(Pid, Node, After); + mod_muc_room:migrate(Pid, Node, random:uniform(After)); _ -> ok end end, Rs). +node_up(Node) -> + copy_rooms(mnesia:dirty_first(muc_online_room)). + +copy_rooms('$end_of_table') -> + ok; +copy_rooms(Key) -> + case mnesia:dirty_read(muc_online_room, Key) of + [#muc_online_room{name_host = NameHost} = Room] -> + case ejabberd_cluster:get_node_new(NameHost) of + Node when node() /= Node -> + rpc:cast(Node, mnesia, dirty_write, [Room]); + _ -> + ok + end; + _ -> + ok + end, + copy_rooms(mnesia:dirty_next(muc_online_room, Key)). + %%==================================================================== %% gen_server callbacks %%==================================================================== @@ -219,6 +239,7 @@ init([Host, Opts]) -> DefRoomOpts = gen_mod:get_opt(default_room_options, Opts, []), RoomShaper = gen_mod:get_opt(room_shaper, Opts, none), ejabberd_router:register_route(MyHost), + ejabberd_hooks:add(node_up, ?MODULE, node_up, 100), ejabberd_hooks:add(node_hash_update, ?MODULE, migrate, 100), load_permanent_rooms(MyHost, Host, {Access, AccessCreate, AccessAdmin, AccessPersistent}, @@ -306,7 +327,15 @@ handle_info({room_destroyed, RoomHost, Pid}, State) -> mnesia:delete_object(#muc_online_room{name_host = RoomHost, pid = Pid}) end, - mnesia:sync_dirty(F), + mnesia:async_dirty(F), + case ejabberd_cluster:get_node_new(RoomHost) of + Node when Node /= node() -> + rpc:cast(Node, mnesia, dirty_delete_object, + [#muc_online_room{name_host = RoomHost, + pid = Pid}]); + _ -> + ok + end, {noreply, State}; handle_info(_Info, State) -> {noreply, State}. @@ -319,6 +348,7 @@ handle_info(_Info, State) -> %% The return value is ignored. %%-------------------------------------------------------------------- terminate(_Reason, State) -> + ejabberd_hooks:delete(node_up, ?MODULE, node_up, 100), ejabberd_hooks:delete(node_hash_update, ?MODULE, migrate, 100), ejabberd_router:unregister_route(State#state.host), ok. @@ -507,14 +537,20 @@ do_route1(Host, ServerHost, Access, HistorySize, RoomShaper, AccessCreate, From, Room) of true -> - {ok, Pid} = start_new_room( - Host, ServerHost, Access, - Room, HistorySize, - RoomShaper, From, - Nick, DefRoomOpts), - register_room(Host, Room, Pid), - mod_muc_room:route(Pid, From, Nick, Packet), - ok; + case start_new_room( + Host, ServerHost, Access, + Room, HistorySize, + RoomShaper, From, + Nick, DefRoomOpts) of + {ok, Pid} -> + mod_muc_room:route(Pid, From, Nick, Packet), + register_room(Host, Room, Pid), + ok; + _Err -> + Err = jlib:make_error_reply( + Packet, ?ERR_INTERNAL_SERVER_ERROR), + ejabberd_router:route(To, From, Err) + end; false -> Lang = xml:get_attr_s("xml:lang", Attrs), ErrText = "Room creation is denied by service policy", @@ -603,7 +639,28 @@ register_room(Host, Room, Pid) -> mnesia:write(#muc_online_room{name_host = {Room, Host}, pid = Pid}) end, - mnesia:sync_dirty(F). + mnesia:async_dirty(F), + case ejabberd_cluster:get_node_new({Room, Host}) of + Node when Node /= node() -> + %% New node has just been added. But we may miss MUC records + %% copy procedure, so we copy the MUC record manually just + %% to make sure + rpc:cast(Node, mnesia, dirty_write, + [#muc_online_room{name_host = {Room, Host}, + pid = Pid}]), + case ejabberd_cluster:get_node({Room, Host}) of + Node when node() /= Node -> + %% Migration to new node has completed, and seems like + %% we missed it, so we migrate the MUC room pid manually. + %% It is not a problem if we have already got migration + %% notification: dups are just ignored by the MUC room pid. + mod_muc_room:migrate(Pid, Node, 0); + _ -> + ok + end; + _ -> + ok + end. iq_disco_info(Lang) -> [{xmlelement, "identity", diff --git a/src/mod_muc/mod_muc_room.erl b/src/mod_muc/mod_muc_room.erl index 913d408d6..2891cf261 100644 --- a/src/mod_muc/mod_muc_room.erl +++ b/src/mod_muc/mod_muc_room.erl @@ -110,7 +110,7 @@ start_link(StateName, StateData) -> ?GEN_FSM:start_link(?MODULE, [StateName, StateData], ?FSMOPTS). migrate(FsmRef, Node, After) -> - ?GEN_FSM:send_all_state_event(FsmRef, {migrate, Node, After}). + erlang:send_after(After, FsmRef, {migrate, Node}). %%%---------------------------------------------------------------------- %%% Callback functions from gen_fsm @@ -597,9 +597,6 @@ handle_event(destroy, StateName, StateData) -> handle_event({set_affiliations, Affiliations}, StateName, StateData) -> {next_state, StateName, StateData#state{affiliations = Affiliations}}; -handle_event({migrate, Node, After}, StateName, StateData) when Node /= node() -> - {migrate, StateData, - {Node, ?MODULE, start, [StateName, StateData]}, After * 2}; handle_event(_Event, StateName, StateData) -> {next_state, StateName, StateData}. @@ -716,6 +713,13 @@ handle_info({captcha_failed, From}, normal_state, StateData) -> StateData end, {next_state, normal_state, NewState}; +handle_info({migrate, Node}, StateName, StateData) -> + if Node /= node() -> + {migrate, StateData, + {Node, ?MODULE, start, [StateName, StateData]}, 0}; + true -> + {next_state, StateName, StateData} + end; handle_info(_Info, StateName, StateData) -> {next_state, StateName, StateData}.