25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-12-24 17:29:28 +01:00

New migration procedure

This commit is contained in:
Evgeniy Khramtsov 2010-11-15 01:09:46 +09:00
parent 33d4126290
commit 4a2f62062e
6 changed files with 208 additions and 59 deletions

View File

@ -194,7 +194,7 @@ stop(FsmRef) ->
?GEN_FSM:send_event(FsmRef, closed).
migrate(FsmRef, Node, After) ->
?GEN_FSM:send_all_state_event(FsmRef, {migrate, Node, After}).
erlang:send_after(After, FsmRef, {migrate, Node}).
%%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm
@ -282,6 +282,7 @@ init([StateName, StateData, _FSMLimitOpts]) ->
El ->
get_priority_from_presence(El)
end,
ejabberd_sm:drop_session(StateData#state.sid),
ejabberd_sm:open_session(
SID,
StateData#state.user,
@ -289,6 +290,7 @@ init([StateName, StateData, _FSMLimitOpts]) ->
StateData#state.resource,
Priority,
Info),
%%ejabberd_sm:drop_session(StateData#state.sid),
NewStateData = StateData#state{sid = SID, socket_monitor = MRef},
StateData2 = change_reception(NewStateData, true),
StateData3 = start_keepalive_timer(StateData2),
@ -1232,9 +1234,6 @@ session_established2(El, StateData) ->
%% {next_state, NextStateName, NextStateData, Timeout} |
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
handle_event({migrate, Node, After}, StateName, StateData) when Node /= node() ->
fsm_migrate(StateName, StateData, Node, After * 2);
handle_event({add_rosteritem, IJID, ISubscription}, StateName, StateData) ->
NewStateData = roster_change(IJID, ISubscription, StateData),
fsm_next_state(StateName, NewStateData);
@ -1624,6 +1623,12 @@ handle_info({force_update_presence, LUser}, StateName,
StateData
end,
{next_state, StateName, NewStateData};
%% Migration request message ({migrate, Node}), as scheduled by migrate/3.
%% The session is handed over only when the target is a remote node; a
%% request that names the local node leaves the FSM in its current state.
handle_info({migrate, Node}, StateName, StateData) when Node /= node() ->
    fsm_migrate(StateName, StateData, Node, 0);
handle_info({migrate, _LocalNode}, StateName, StateData) ->
    fsm_next_state(StateName, StateData);
handle_info({broadcast, Type, From, Packet}, StateName, StateData) ->
Recipients = ejabberd_hooks:run_fold(
c2s_broadcast_recipients, StateData#state.server,
@ -2535,13 +2540,6 @@ maybe_migrate(StateName, StateData) ->
StateData2 = change_reception(PackedStateData, true),
StateData3 = start_keepalive_timer(StateData2),
erlang:garbage_collect(),
case ejabberd_cluster:get_node_new({U, S}) of
Node ->
ok;
NewNode ->
After = ejabberd_cluster:rehash_timeout(),
migrate(self(), NewNode, After)
end,
fsm_next_state(StateName, StateData3);
Node ->
fsm_migrate(StateName, PackedStateData, Node, 0)

View File

@ -22,7 +22,11 @@
-define(HASHTBL, nodes_hash).
-define(HASHTBL_NEW, nodes_hash_new).
-define(POINTS, 64).
-define(REHASH_TIMEOUT, 30000).
-define(REHASH_TIMEOUT, timer:seconds(30)).
-define(MIGRATE_TIMEOUT, timer:minutes(2)).
%%-define(REHASH_TIMEOUT, timer:seconds(10)).
%%-define(MIGRATE_TIMEOUT, timer:seconds(5)).
-define(LOCK, {migrate, node()}).
-record(state, {}).
@ -80,13 +84,16 @@ shutdown() ->
%% gen_server callbacks
%%====================================================================
init([]) ->
{A, B, C} = now(),
random:seed(A, B, C),
net_kernel:monitor_nodes(true, [{node_type, visible}]),
ets:new(?HASHTBL, [named_table, public, ordered_set]),
ets:new(?HASHTBL_NEW, [named_table, public, ordered_set]),
register_node(),
AllNodes = mnesia:system_info(running_db_nodes),
pg2:create(?MODULE),
AllNodes = cluster_group(),
OtherNodes = case AllNodes of
[_] ->
[_MyNode] ->
AllNodes;
_ ->
AllNodes -- [node()]
@ -96,35 +103,55 @@ init([]) ->
{ok, #state{}}.
handle_call(announce, _From, State) ->
case mnesia:system_info(running_db_nodes) of
case global:set_lock(?LOCK, cluster_group(), 0) of
false ->
?INFO_MSG("Another node is recently attached to "
"the cluster and is being rebalanced. "
"Waiting for the rebalancing to be completed "
"before starting this node. "
"This may take serveral minutes. "
"Please, be patient.", []),
global:set_lock(?LOCK, cluster_group(), infinity);
true ->
ok
end,
case cluster_group() of
[_MyNode] ->
ok;
join_cluster_group(),
global:del_lock(?LOCK);
Nodes ->
OtherNodes = Nodes -- [node()],
lists:foreach(
fun(Node) ->
{?MODULE, Node} ! {node_ready, node()}
end, OtherNodes),
?INFO_MSG("waiting for migration from nodes: ~w",
[OtherNodes]),
timer:sleep(?REHASH_TIMEOUT),
append_node(?HASHTBL, node())
{_Res, BadNodes} = gen_server:multi_call(
OtherNodes, ?MODULE,
{node_ready, node()}, ?REHASH_TIMEOUT),
append_node(?HASHTBL, node()),
join_cluster_group(),
gen_server:abcast(OtherNodes -- BadNodes,
?MODULE, {node_ready, node()}),
erlang:send_after(?MIGRATE_TIMEOUT, self(), del_lock)
end,
{reply, ok, State};
handle_call({node_ready, Node}, _From, State) ->
?INFO_MSG("node ~p is ready, preparing migration", [Node]),
append_node(?HASHTBL_NEW, Node),
ejabberd_hooks:run(node_up, [Node]),
{reply, ok, State};
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
%% Asynchronous requests.
%% {node_ready, Node}: add Node to the active hash table and run the
%% node_hash_update hook with the migration timeout.
%% Any other cast is ignored.
handle_cast({node_ready, ReadyNode}, State) ->
    add_node_and_start_migration(ReadyNode),
    {noreply, State};
handle_cast(_Other, State) ->
    {noreply, State}.

%% Logs the event, appends the node to ?HASHTBL and then fires the
%% node_hash_update hook (in that order).
add_node_and_start_migration(ReadyNode) ->
    ?INFO_MSG("adding node ~p to hash and starting migration", [ReadyNode]),
    append_node(?HASHTBL, ReadyNode),
    ejabberd_hooks:run(node_hash_update, [?MIGRATE_TIMEOUT]).
handle_info({node_ready, Node}, State) ->
?INFO_MSG("node ~p is ready, starting migration", [Node]),
append_node(?HASHTBL_NEW, Node),
ejabberd_hooks:run(node_hash_update, [?REHASH_TIMEOUT]),
timer:sleep(?REHASH_TIMEOUT),
?INFO_MSG("adding node ~p to hash", [Node]),
append_node(?HASHTBL, Node),
handle_info(del_lock, State) ->
global:del_lock(?LOCK),
{noreply, State};
handle_info({node_down, Node}, State) ->
delete_node(?HASHTBL, Node),
@ -187,3 +214,9 @@ get_node_by_hash(Tab, Hash) ->
register_node() ->
global:register_name(list_to_atom(node_id()), self()).
%% Returns the nodes taking part in the cluster: the local node first,
%% followed by the nodes hosting the members of the pg2 group named after
%% this module.
%% The local node and repeated members are filtered out of the
%% member-derived part, so every node appears exactly once even after the
%% local server has joined the group. Otherwise `Nodes -- [node()]` would
%% still contain the local node and a `[_MyNode]` match would fail.
cluster_group() ->
    Local = node(),
    Remote = [N || P <- pg2:get_members(?MODULE),
                   N <- [node(P)],
                   N =/= Local],
    [Local | lists:usort(Remote)].
%% Adds the locally registered ?MODULE process to the pg2 group of the
%% same name.
join_cluster_group() ->
    Self = whereis(?MODULE),
    pg2:join(?MODULE, Self).

View File

@ -203,9 +203,9 @@ migrate(After) ->
['$$']}]),
lists:foreach(
fun([FromTo, Pid]) ->
case ejabberd_cluster:get_node_new(FromTo) of
case ejabberd_cluster:get_node(FromTo) of
Node when Node /= node() ->
ejabberd_s2s_out:stop_connection(Pid, After * 2);
ejabberd_s2s_out:stop_connection(Pid, After);
_ ->
ok
end

View File

@ -37,6 +37,7 @@
open_session/6,
close_session/4,
close_migrated_session/4,
drop_session/1,
check_in_subscription/6,
bounce_offline_message/3,
disconnect_removed_user/2,
@ -60,6 +61,7 @@
get_session_pid/3,
get_user_info/3,
get_user_ip/3,
node_up/1,
migrate/1
]).
@ -108,27 +110,37 @@ open_session(SID, User, Server, Resource, Priority, Info) ->
[SID, JID, Info]).
close_session(SID, User, Server, Resource) ->
Info = do_close_session(SID, User, Server, Resource),
Info = do_close_session(SID),
US = {jlib:nodeprep(User), jlib:nameprep(Server)},
case ejabberd_cluster:get_node_new(US) of
Node when Node /= node() ->
rpc:cast(Node, ?MODULE, drop_session, [SID]);
_ ->
ok
end,
JID = jlib:make_jid(User, Server, Resource),
ejabberd_hooks:run(sm_remove_connection_hook, JID#jid.lserver,
[SID, JID, Info]).
close_migrated_session(SID, User, Server, Resource) ->
Info = do_close_session(SID, User, Server, Resource),
Info = do_close_session(SID),
JID = jlib:make_jid(User, Server, Resource),
ejabberd_hooks:run(sm_remove_migrated_connection_hook, JID#jid.lserver,
[SID, JID, Info]).
do_close_session(SID, User, Server, Resource) ->
do_close_session(SID) ->
Info = case mnesia:dirty_read({session, SID}) of
[] -> [];
[#session{info=I}] -> I
end,
[] -> [];
[#session{info=I}] -> I
end,
drop_session(SID),
Info.
drop_session(SID) ->
F = fun() ->
mnesia:delete({session, SID})
end,
mnesia:sync_dirty(F),
Info.
mnesia:sync_dirty(F).
check_in_subscription(Acc, User, Server, _JID, _Type, _Reason) ->
case ejabberd_auth:is_user_exists(User, Server) of
@ -312,14 +324,33 @@ migrate(After) ->
['$$']}]),
lists:foreach(
fun([US, Pid]) ->
case ejabberd_cluster:get_node_new(US) of
case ejabberd_cluster:get_node(US) of
Node when Node /= node() ->
ejabberd_c2s:migrate(Pid, Node, After);
ejabberd_c2s:migrate(Pid, Node, random:uniform(After));
_ ->
ok
end
end, Ss).
%% node_up hook callback: walks the whole session table, starting from its
%% first key, and copies the relevant records (see copy_sessions/1).
%% The node argument itself is not used.
node_up(_Node) ->
    FirstKey = mnesia:dirty_first(session),
    copy_sessions(FirstKey).
%% Iterates over the session table with dirty reads, starting at Key, until
%% '$end_of_table' is reached. Each record is handed to
%% copy_session_record/1 before the next key is fetched.
copy_sessions('$end_of_table') ->
    ok;
copy_sessions(Key) ->
    copy_session_record(mnesia:dirty_read(session, Key)),
    copy_sessions(mnesia:dirty_next(session, Key)).

%% When the new hash assigns the session's US to another node, the record
%% is written there with an asynchronous rpc cast. Anything other than a
%% single session record is skipped.
copy_session_record([#session{us = US} = Session]) ->
    NewNode = ejabberd_cluster:get_node_new(US),
    if
        NewNode /= node() ->
            rpc:cast(NewNode, mnesia, dirty_write, [Session]);
        true ->
            ok
    end;
copy_session_record(_) ->
    ok.
%%====================================================================
%% gen_server callbacks
%%====================================================================
@ -341,6 +372,7 @@ init([]) ->
mnesia:add_table_index(session, us),
mnesia:add_table_copy(session, node(), ram_copies),
ets:new(sm_iqtable, [named_table]),
ejabberd_hooks:add(node_up, ?MODULE, node_up, 100),
ejabberd_hooks:add(node_hash_update, ?MODULE, migrate, 100),
lists:foreach(
fun(Host) ->
@ -418,6 +450,7 @@ handle_info(_Info, State) ->
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ejabberd_hooks:delete(node_up, ?MODULE, node_up, 100),
ejabberd_hooks:delete(node_hash_update, ?MODULE, migrate, 100),
ejabberd_commands:unregister_commands(commands()),
ok.
@ -433,7 +466,7 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%--------------------------------------------------------------------
set_session(SID, User, Server, Resource, Priority, Info) ->
set_session({_, Pid} = SID, User, Server, Resource, Priority, Info) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
LResource = jlib:resourceprep(Resource),
@ -446,7 +479,31 @@ set_session(SID, User, Server, Resource, Priority, Info) ->
priority = Priority,
info = Info})
end,
mnesia:sync_dirty(F).
mnesia:sync_dirty(F),
case ejabberd_cluster:get_node_new(US) of
Node when node() /= Node ->
%% New node has just been added. But we may miss session records
%% copy procedure, so we copy the session record manually just
%% to make sure
rpc:cast(Node, mnesia, dirty_write,
[#session{sid = SID,
usr = USR,
us = US,
priority = Priority,
info = Info}]),
case ejabberd_cluster:get_node(US) of
Node when node() /= Node ->
%% Migration to new node has completed, and seems like
%% we missed it, so we migrate the session pid manually.
%% It is not a problem if we have already got migration
%% notification: dups are just ignored by the c2s pid.
ejabberd_c2s:migrate(Pid, Node, 0);
_ ->
ok
end;
_ ->
ok
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

View File

@ -42,6 +42,7 @@
process_iq_disco_items/4,
broadcast_service_message/2,
register_room/3,
node_up/1,
migrate/1,
get_vh_rooms/1,
can_use_nick/3]).
@ -175,14 +176,33 @@ migrate(After) ->
['$$']}]),
lists:foreach(
fun([NameHost, Pid]) ->
case ejabberd_cluster:get_node_new(NameHost) of
case ejabberd_cluster:get_node(NameHost) of
Node when Node /= node() ->
mod_muc_room:migrate(Pid, Node, After);
mod_muc_room:migrate(Pid, Node, random:uniform(After));
_ ->
ok
end
end, Rs).
%% node_up hook callback: walks the whole muc_online_room table, starting
%% from its first key, and copies the relevant records (see copy_rooms/1).
%% The node argument is not used, hence the underscore prefix (this avoids
%% an "unused variable" compiler warning).
node_up(_Node) ->
    copy_rooms(mnesia:dirty_first(muc_online_room)).
%% Iterates over the muc_online_room table with dirty reads, starting at
%% Key, until '$end_of_table' is reached. Each record is handed to
%% copy_room_record/1 before the next key is fetched.
copy_rooms('$end_of_table') ->
    ok;
copy_rooms(Key) ->
    copy_room_record(mnesia:dirty_read(muc_online_room, Key)),
    copy_rooms(mnesia:dirty_next(muc_online_room, Key)).

%% When the new hash assigns the room's {Name, Host} to another node, the
%% record is written there with an asynchronous rpc cast. Anything other
%% than a single room record is skipped.
copy_room_record([#muc_online_room{name_host = NameHost} = Room]) ->
    NewNode = ejabberd_cluster:get_node_new(NameHost),
    if
        NewNode /= node() ->
            rpc:cast(NewNode, mnesia, dirty_write, [Room]);
        true ->
            ok
    end;
copy_room_record(_) ->
    ok.
%%====================================================================
%% gen_server callbacks
%%====================================================================
@ -219,6 +239,7 @@ init([Host, Opts]) ->
DefRoomOpts = gen_mod:get_opt(default_room_options, Opts, []),
RoomShaper = gen_mod:get_opt(room_shaper, Opts, none),
ejabberd_router:register_route(MyHost),
ejabberd_hooks:add(node_up, ?MODULE, node_up, 100),
ejabberd_hooks:add(node_hash_update, ?MODULE, migrate, 100),
load_permanent_rooms(MyHost, Host,
{Access, AccessCreate, AccessAdmin, AccessPersistent},
@ -306,7 +327,15 @@ handle_info({room_destroyed, RoomHost, Pid}, State) ->
mnesia:delete_object(#muc_online_room{name_host = RoomHost,
pid = Pid})
end,
mnesia:sync_dirty(F),
mnesia:async_dirty(F),
case ejabberd_cluster:get_node_new(RoomHost) of
Node when Node /= node() ->
rpc:cast(Node, mnesia, dirty_delete_object,
[#muc_online_room{name_host = RoomHost,
pid = Pid}]);
_ ->
ok
end,
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
@ -319,6 +348,7 @@ handle_info(_Info, State) ->
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, State) ->
ejabberd_hooks:delete(node_up, ?MODULE, node_up, 100),
ejabberd_hooks:delete(node_hash_update, ?MODULE, migrate, 100),
ejabberd_router:unregister_route(State#state.host),
ok.
@ -507,14 +537,20 @@ do_route1(Host, ServerHost, Access, HistorySize, RoomShaper,
AccessCreate, From,
Room) of
true ->
{ok, Pid} = start_new_room(
Host, ServerHost, Access,
Room, HistorySize,
RoomShaper, From,
Nick, DefRoomOpts),
register_room(Host, Room, Pid),
mod_muc_room:route(Pid, From, Nick, Packet),
ok;
case start_new_room(
Host, ServerHost, Access,
Room, HistorySize,
RoomShaper, From,
Nick, DefRoomOpts) of
{ok, Pid} ->
mod_muc_room:route(Pid, From, Nick, Packet),
register_room(Host, Room, Pid),
ok;
_Err ->
Err = jlib:make_error_reply(
Packet, ?ERR_INTERNAL_SERVER_ERROR),
ejabberd_router:route(To, From, Err)
end;
false ->
Lang = xml:get_attr_s("xml:lang", Attrs),
ErrText = "Room creation is denied by service policy",
@ -603,7 +639,28 @@ register_room(Host, Room, Pid) ->
mnesia:write(#muc_online_room{name_host = {Room, Host},
pid = Pid})
end,
mnesia:sync_dirty(F).
mnesia:async_dirty(F),
case ejabberd_cluster:get_node_new({Room, Host}) of
Node when Node /= node() ->
%% New node has just been added. But we may miss MUC records
%% copy procedure, so we copy the MUC record manually just
%% to make sure
rpc:cast(Node, mnesia, dirty_write,
[#muc_online_room{name_host = {Room, Host},
pid = Pid}]),
case ejabberd_cluster:get_node({Room, Host}) of
Node when node() /= Node ->
%% Migration to new node has completed, and seems like
%% we missed it, so we migrate the MUC room pid manually.
%% It is not a problem if we have already got migration
%% notification: dups are just ignored by the MUC room pid.
mod_muc_room:migrate(Pid, Node, 0);
_ ->
ok
end;
_ ->
ok
end.
iq_disco_info(Lang) ->
[{xmlelement, "identity",

View File

@ -110,7 +110,7 @@ start_link(StateName, StateData) ->
?GEN_FSM:start_link(?MODULE, [StateName, StateData], ?FSMOPTS).
migrate(FsmRef, Node, After) ->
?GEN_FSM:send_all_state_event(FsmRef, {migrate, Node, After}).
erlang:send_after(After, FsmRef, {migrate, Node}).
%%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm
@ -597,9 +597,6 @@ handle_event(destroy, StateName, StateData) ->
handle_event({set_affiliations, Affiliations}, StateName, StateData) ->
{next_state, StateName, StateData#state{affiliations = Affiliations}};
handle_event({migrate, Node, After}, StateName, StateData) when Node /= node() ->
{migrate, StateData,
{Node, ?MODULE, start, [StateName, StateData]}, After * 2};
handle_event(_Event, StateName, StateData) ->
{next_state, StateName, StateData}.
@ -716,6 +713,13 @@ handle_info({captcha_failed, From}, normal_state, StateData) ->
StateData
end,
{next_state, normal_state, NewState};
%% Migration request message ({migrate, Node}), as scheduled by migrate/3.
%% A remote target yields a migrate tuple that restarts the room there via
%% ?MODULE:start/2 with no delay; a request that names the local node
%% leaves the FSM in its current state.
handle_info({migrate, Node}, StateName, StateData) when Node /= node() ->
    {migrate, StateData,
     {Node, ?MODULE, start, [StateName, StateData]}, 0};
handle_info({migrate, _LocalNode}, StateName, StateData) ->
    {next_state, StateName, StateData};
handle_info(_Info, StateName, StateData) ->
{next_state, StateName, StateData}.