diff --git a/src/ejabberd_app.erl b/src/ejabberd_app.erl index b8c9ae644..1b0289752 100644 --- a/src/ejabberd_app.erl +++ b/src/ejabberd_app.erl @@ -65,6 +65,8 @@ start(normal, _Args) -> %ejabberd_debug:fprof_start(), maybe_add_nameservers(), start_modules(), + ejabberd_cluster:announce(), + ejabberd_node_groups:start(), ejabberd_listener:start_listeners(), ?INFO_MSG("ejabberd ~s is started in the node ~p", [?VERSION, node()]), Sup; diff --git a/src/ejabberd_auth_anonymous.erl b/src/ejabberd_auth_anonymous.erl index cc4cf91fb..53b251f06 100644 --- a/src/ejabberd_auth_anonymous.erl +++ b/src/ejabberd_auth_anonymous.erl @@ -67,9 +67,12 @@ start(Host) when is_list(Host) -> HostB = list_to_binary(Host), %% TODO: Check cluster mode + update_tables(), mnesia:create_table(anonymous, [{ram_copies, [node()]}, {type, bag}, - {attributes, record_info(fields, anonymous)}]), + {type, bag}, {local_content, true}, + {attributes, record_info(fields, anonymous)}]), + mnesia:add_table_copy(anonymous, node(), ram_copies), %% The hooks are needed to add / remove users from the anonymous tables ejabberd_hooks:add(sm_register_connection_hook, HostB, ?MODULE, register_connection, 100), @@ -168,7 +171,7 @@ remove_connection(SID, LUser, LServer) when is_list(LUser), is_list(LServer) -> F = fun() -> mnesia:delete_object({anonymous, US, SID}) end, - mnesia:transaction(F). + mnesia:async_dirty(F). %% @spec (SID, JID, Info) -> term() %% SID = term() @@ -184,7 +187,7 @@ register_connection(SID, JID, Info) when ?IS_JID(JID) -> ok; ?MODULE -> US = {LUser, LServer}, - mnesia:sync_dirty( + mnesia:async_dirty( fun() -> mnesia:write(#anonymous{us = US, sid=SID}) end); _ -> @@ -345,3 +348,11 @@ remove_user(_User, _Server, _Password) -> plain_password_required() -> false. + +update_tables() -> + case catch mnesia:table_info(anonymous, local_content) of + false -> + mnesia:delete_table(anonymous); + _ -> + ok + end. diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 2991677e1..11a0a39ab 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -35,7 +35,7 @@ %% External exports -export([start/2, stop/1, - start_link/2, + start_link/3, send_text/2, send_element/2, socket_type/0, @@ -56,8 +56,9 @@ code_change/4, handle_info/3, terminate/3, - print_state/1 - ]). + print_state/1, + migrate/3 + ]). -export([get_state/1]). @@ -100,6 +101,7 @@ conn = unknown, auth_module = unknown, ip, + fsm_limit_opts, lang}). %-define(DBGFSM, true). @@ -112,11 +114,12 @@ %% Module start with or without supervisor: -ifdef(NO_TRANSIENT_SUPERVISORS). --define(SUPERVISOR_START, ?GEN_FSM:start(ejabberd_c2s, [SockData, Opts], - fsm_limit_opts(Opts) ++ ?FSMOPTS)). +-define(SUPERVISOR_START, ?GEN_FSM:start(ejabberd_c2s, + [SockData, Opts, FSMLimitOpts], + FSMLimitOpts ++ ?FSMOPTS)). -else. -define(SUPERVISOR_START, supervisor:start_child(ejabberd_c2s_sup, - [SockData, Opts])). + [SockData, Opts, FSMLimitOpts])). -endif. %% This is the timeout to apply between event when starting a new @@ -155,12 +158,17 @@ %%%---------------------------------------------------------------------- %%% API %%%---------------------------------------------------------------------- +start(StateName, #state{fsm_limit_opts = Opts} = State) -> + start(StateName, State, Opts); start(SockData, Opts) -> + start(SockData, Opts, fsm_limit_opts(Opts)). + +start(SockData, Opts, FSMLimitOpts) -> ?SUPERVISOR_START. -start_link(SockData, Opts) -> - ?GEN_FSM:start_link(ejabberd_c2s, [SockData, Opts], - fsm_limit_opts(Opts) ++ ?FSMOPTS). +start_link(SockData, Opts, FSMLimitOpts) -> + ?GEN_FSM:start_link(ejabberd_c2s, [SockData, Opts, FSMLimitOpts], + FSMLimitOpts ++ ?FSMOPTS). socket_type() -> xml_stream. @@ -177,6 +185,9 @@ get_state(FsmRef) -> stop(FsmRef) -> ?GEN_FSM:send_event(FsmRef, closed). +migrate(FsmRef, Node, After) -> + ?GEN_FSM:send_all_state_event(FsmRef, {migrate, Node, After}). + %%%---------------------------------------------------------------------- %%% Callback functions from gen_fsm %%%---------------------------------------------------------------------- @@ -188,7 +199,7 @@ stop(FsmRef) -> %% ignore | %% {stop, StopReason} %%---------------------------------------------------------------------- -init([{SockMod, Socket}, Opts]) -> +init([{SockMod, Socket}, Opts, FSMLimitOpts]) -> Access = case lists:keysearch(access, 1, Opts) of {value, {_, A}} -> A; _ -> all @@ -212,7 +223,12 @@ init([{SockMod, Socket}, Opts]) -> (_) -> false end, Opts), TLSOpts = [verify_none | TLSOpts1], - IP = peerip(SockMod, Socket), + IP = case lists:keysearch(frontend_ip, 1, Opts) of + {value, {_, IP1}} -> + IP1; + _ -> + peerip(SockMod, Socket) + end, %% Check if IP is blacklisted: case is_ip_blacklisted(IP) of true -> @@ -240,8 +256,29 @@ init([{SockMod, Socket}, Opts]) -> streamid = new_id(), access = Access, shaper = Shaper, - ip = IP}, + ip = IP, + fsm_limit_opts = FSMLimitOpts}, ?C2S_OPEN_TIMEOUT} + end; +init([StateName, StateData, _FSMLimitOpts]) -> + MRef = (StateData#state.sockmod):monitor(StateData#state.socket), + if StateName == session_established -> + Conn = get_conn_type(StateData), + Info = [{ip, StateData#state.ip}, {conn, Conn}, + {auth_module, StateData#state.auth_module}], + {Time, _} = StateData#state.sid, + SID = {Time, self()}, + Priority = case StateData#state.pres_last of + undefined -> + undefined; + El -> + exmpp_presence:get_priority(El) + end, + ejabberd_sm:open_session(SID, StateData#state.jid, Priority, Info), + NewStateData = StateData#state{sid = SID, socket_monitor = MRef}, + {ok, StateName, NewStateData}; + true -> + {ok, StateName, StateData#state{socket_monitor = MRef}} end. %% Return list of all available resources of contacts, @@ -460,12 +497,12 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> exmpp_jid:to_binary(JID), AuthModule]), SID = {now(), self()}, Conn = get_conn_type(StateData), - Info = [{ip, StateData#state.ip}, {conn, Conn}, - {auth_module, AuthModule}], + %% Info = [{ip, StateData#state.ip}, {conn, Conn}, + %% {auth_module, AuthModule}], Res = exmpp_server_legacy_auth:success(El), send_element(StateData, Res), - ejabberd_sm:open_session( - SID, exmpp_jid:make(U, StateData#state.server, R), Info), + %% ejabberd_sm:open_session( + %% SID, exmpp_jid:make(U, StateData#state.server, R), Info), change_shaper(StateData, JID), {Fs, Ts} = ejabberd_hooks:run_fold( roster_get_subscription_lists, @@ -479,19 +516,19 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> privacy_get_user_list, StateData#state.server, #userlist{}, [UBinary, StateData#state.server]), - fsm_next_state(session_established, - StateData#state{ - sasl_state = 'undefined', - %not used anymore, let the GC work. - user = list_to_binary(U), - resource = list_to_binary(R), - jid = JID, - sid = SID, - conn = Conn, - auth_module = AuthModule, - pres_f = ?SETS:from_list(Fs1), - pres_t = ?SETS:from_list(Ts1), - privacy_list = PrivList}); + maybe_migrate(session_established, + StateData#state{ + sasl_state = 'undefined', + %not used anymore, let the GC work. + user = list_to_binary(U), + resource = list_to_binary(R), + jid = JID, + sid = SID, + conn = Conn, + auth_module = AuthModule, + pres_f = ?SETS:from_list(Fs1), + pres_t = ?SETS:from_list(Ts1), + privacy_list = PrivList}); _ -> ?INFO_MSG( "(~w) Failed legacy authentication for ~s", @@ -805,19 +842,19 @@ wait_for_session({xmlstreamelement, El}, StateData) -> [StateData#state.user, StateData#state.server]), SID = {now(), self()}, Conn = get_conn_type(StateData), - Info = [{ip, StateData#state.ip}, {conn, Conn}, - {auth_module, StateData#state.auth_module}], - ejabberd_sm:open_session( - SID, JID, Info), - fsm_next_state(session_established, - StateData#state{ - sasl_state = 'undefined', - %not used anymore, let the GC work. - sid = SID, - conn = Conn, - pres_f = ?SETS:from_list(Fs1), - pres_t = ?SETS:from_list(Ts1), - privacy_list = PrivList}); + %% Info = [{ip, StateData#state.ip}, {conn, Conn}, + %% {auth_module, StateData#state.auth_module}], + %% ejabberd_sm:open_session( + %% SID, JID, Info), + maybe_migrate(session_established, + StateData#state{ + sasl_state = 'undefined', + %%not used anymore, let the GC work. + sid = SID, + conn = Conn, + pres_f = ?SETS:from_list(Fs1), + pres_t = ?SETS:from_list(Ts1), + privacy_list = PrivList}); _ -> ejabberd_hooks:run(forbidden_session_hook, StateData#state.server, [JID]), @@ -997,6 +1034,8 @@ session_established2(El, StateData) -> %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- +handle_event({migrate, Node, After}, StateName, StateData) when Node /= node() -> + fsm_migrate(StateName, StateData, Node, After * 2); handle_event(_Event, StateName, StateData) -> fsm_next_state(StateName, StateData). @@ -1337,6 +1376,20 @@ print_state(State = #state{pres_t = T, pres_f = F, pres_a = A, pres_i = I}) -> %% Purpose: Shutdown the fsm %% Returns: any %%---------------------------------------------------------------------- +terminate({migrated, ClonePid}, StateName, StateData) -> + if StateName == session_established -> + ?INFO_MSG("(~w) Migrating ~s to ~p on node ~p", + [StateData#state.socket, + exmpp_jid:to_binary(StateData#state.jid), + ClonePid, node(ClonePid)]), + ejabberd_sm:close_session(StateData#state.sid, + StateData#state.jid); + true -> + ok + end, + (StateData#state.sockmod):change_controller( + StateData#state.socket, ClonePid), + ok; terminate(_Reason, StateName, StateData) -> %%TODO: resource could be 'undefined' if terminate before bind? case StateName of @@ -1504,16 +1557,21 @@ get_auth_tags([], U, P, D, R) -> get_conn_type(StateData) -> case (StateData#state.sockmod):get_sockmod(StateData#state.socket) of - gen_tcp -> c2s; - tls -> c2s_tls; - ejabberd_zlib -> - case ejabberd_zlib:get_sockmod((StateData#state.socket)#socket_state.socket) of - gen_tcp -> c2s_compressed; - tls -> c2s_compressed_tls - end; - ejabberd_http_poll -> http_poll; - ejabberd_http_bind -> http_bind; - _ -> unknown + gen_tcp -> c2s; + tls -> c2s_tls; + ejabberd_zlib -> + if is_pid(StateData#state.socket) -> + unknown; + true -> + case ejabberd_zlib:get_sockmod( + (StateData#state.socket)#socket_state.socket) of + gen_tcp -> c2s_compressed; + tls -> c2s_compressed_tls + end + end; + ejabberd_http_poll -> http_poll; + ejabberd_http_bind -> http_bind; + _ -> unknown end. process_presence_probe(From, To, StateData) -> @@ -2072,6 +2130,27 @@ peerip(SockMod, Socket) -> _ -> undefined end. +maybe_migrate(StateName, StateData) -> + case ejabberd_cluster:get_node({StateData#state.user, + StateData#state.server}) of + Node when Node == node() -> + Conn = get_conn_type(StateData), + Info = [{ip, StateData#state.ip}, {conn, Conn}, + {auth_module, StateData#state.auth_module}], + #state{user = U, server = S, jid = JID, sid = SID} = StateData, + ejabberd_sm:open_session(SID, JID, Info), + case ejabberd_cluster:get_node_new({U, S}) of + Node -> + ok; + NewNode -> + After = ejabberd_cluster:rehash_timeout(), + migrate(self(), NewNode, After) + end, + fsm_next_state(StateName, StateData); + Node -> + fsm_migrate(StateName, StateData, Node, 0) + end. + %% fsm_next_state: Generate the next_state FSM tuple with different %% timeout, depending on the future state fsm_next_state(session_established, StateData) -> @@ -2079,6 +2158,10 @@ fsm_next_state(session_established, StateData) -> fsm_next_state(StateName, StateData) -> {next_state, StateName, StateData, ?C2S_OPEN_TIMEOUT}. +fsm_migrate(StateName, StateData, Node, Timeout) -> + {migrate, StateData, + {Node, ?MODULE, start, [StateName, StateData]}, Timeout}. + %% fsm_reply: Generate the reply FSM tuple with different timeout, %% depending on the future state fsm_reply(Reply, session_established, StateData) -> diff --git a/src/ejabberd_captcha.erl b/src/ejabberd_captcha.erl index d933a19f3..4bf1eaacb 100644 --- a/src/ejabberd_captcha.erl +++ b/src/ejabberd_captcha.erl @@ -35,7 +35,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([create_captcha/6, build_captcha_html/2, check_captcha/2, +-export([create_captcha/5, build_captcha_html/2, check_captcha/2, process_reply/1, process/2, is_feature_available/0]). -include_lib("exmpp/include/exmpp.hrl"). @@ -65,19 +65,11 @@ -define(CAPTCHA_TEXT(Lang), translate:translate(Lang, "Enter the text you see")). -define(CAPTCHA_LIFETIME, 120000). % two minutes +-define(RPC_TIMEOUT, 5000). -record(state, {}). -record(captcha, {id, pid, key, tref, args}). --define(T(S), - case catch mnesia:transaction(fun() -> S end) of - {atomic, Res} -> - Res; - {_, Reason} -> - ?ERROR_MSG("mnesia transaction failed: ~p", [Reason]), - {error, Reason} - end). - %%==================================================================== %% API %%==================================================================== @@ -88,10 +80,11 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -create_captcha(Id, SID, From, To, Lang, Args) - when is_list(Id), is_list(SID) -> +create_captcha(SID, From, To, Lang, Args) + when is_binary(Lang), is_list(SID) -> case create_image() of {ok, Type, Key, Image} -> + Id = randoms:get_string() ++ "-" ++ ejabberd_cluster:node_id(), B64Image = jlib:encode_base64(binary_to_list(Image)), JID = exmpp_jid:to_list(From), CID = "sha1+" ++ sha:sha(Image) ++ "@bob.xmpp.org", @@ -216,13 +209,9 @@ create_captcha(Id, SID, From, To, Lang, Args) %OOB = {xmlelement, "x", [{"xmlns", ?NS_OOBD_X_s}], % [{xmlelement, "url", [], [{xmlcdata, get_url(Id)}]}]}, Tref = erlang:send_after(?CAPTCHA_LIFETIME, ?MODULE, {remove_id, Id}), - case ?T(mnesia:write(#captcha{id=Id, pid=self(), key=Key, - tref=Tref, args=Args})) of - ok -> - {ok, [Body, OOB, Captcha, Data]}; - _Err -> - error - end; + ets:insert(captcha, #captcha{id=Id, pid=self(), key=Key, + tref=Tref, args=Args}), + {ok, Id, [Body, OOB, Captcha, Data]}; _Err -> error end. @@ -234,8 +223,8 @@ create_captcha(Id, SID, From, To, Lang, Args) %% IdEl = xmlelement() %% KeyEl = xmlelement() build_captcha_html(Id, Lang) -> - case mnesia:dirty_read(captcha, Id) of - [#captcha{}] -> + case lookup_captcha(Id) of + {ok, _} -> %ImgEl = {xmlelement, "img", [{"src", get_url(Id ++ "/image")}], []}, ImgEl = #xmlel{ @@ -362,20 +351,25 @@ build_captcha_html(Id, Lang) -> %% @spec (Id::string(), ProvidedKey::string()) -> captcha_valid | captcha_non_valid | captcha_not_found check_captcha(Id, ProvidedKey) -> - ?T(case mnesia:read(captcha, Id, write) of - [#captcha{pid=Pid, args=Args, key=StoredKey, tref=Tref}] -> - mnesia:delete({captcha, Id}), - erlang:cancel_timer(Tref), - if StoredKey == ProvidedKey -> - Pid ! {captcha_succeed, Args}, - captcha_valid; - true -> - Pid ! {captcha_failed, Args}, - captcha_non_valid - end; - _ -> - captcha_not_found - end). + case string:tokens(Id, "-") of + [_, NodeID] -> + case ejabberd_cluster:get_node_by_id(NodeID) of + Node when Node == node() -> + do_check_captcha(Id, ProvidedKey); + Node -> + case catch rpc:call(Node, ?MODULE, check_captcha, + [Id, ProvidedKey], ?RPC_TIMEOUT) of + {'EXIT', _} -> + captcha_not_found; + {badrpc, _} -> + captcha_not_found; + Res -> + Res + end + end; + _ -> + captcha_not_found + end. process_reply(El) -> case {exmpp_xml:element_matches(El, captcha), @@ -389,20 +383,14 @@ process_reply(El) -> case {proplists:get_value("challenge", Fields), proplists:get_value("ocr", Fields)} of {[Id|_], [OCR|_]} -> - ?T(case mnesia:read(captcha, Id, write) of - [#captcha{pid=Pid, args=Args, key=Key, tref=Tref}] -> - mnesia:delete({captcha, Id}), - erlang:cancel_timer(Tref), - if OCR == Key -> - Pid ! {captcha_succeed, Args}, - ok; - true -> - Pid ! {captcha_failed, Args}, - {error, bad_match} - end; - _ -> - {error, not_found} - end); + case check_captcha(Id, OCR) of + captcha_valid -> + ok; + captcha_non_valid -> + {error, bad_match}; + captcha_not_found -> + {error, not_found} + end; _ -> {error, malformed} end @@ -431,8 +419,8 @@ process(_Handlers, #request{method='GET', lang=Lang, path=[_, Id]}) -> end; process(_Handlers, #request{method='GET', path=[_, Id, "image"]}) -> - case mnesia:dirty_read(captcha, Id) of - [#captcha{key=Key}] -> + case lookup_captcha(Id) of + {ok, #captcha{key=Key}} -> case create_image(Key) of {ok, Type, _, Img} -> {200, @@ -477,10 +465,8 @@ process(_Handlers, _Request) -> %% gen_server callbacks %%==================================================================== init([]) -> - mnesia:create_table(captcha, - [{ram_copies, [node()]}, - {attributes, record_info(fields, captcha)}]), - mnesia:add_table_copy(captcha, node(), ram_copies), + mnesia:delete_table(captcha), + ets:new(captcha, [named_table, public, {keypos, #captcha.id}]), check_captcha_setup(), {ok, #state{}}. @@ -492,13 +478,13 @@ handle_cast(_Msg, State) -> handle_info({remove_id, Id}, State) -> ?DEBUG("captcha ~p timed out", [Id]), - _ = ?T(case mnesia:read(captcha, Id, write) of - [#captcha{args=Args, pid=Pid}] -> - Pid ! {captcha_failed, Args}, - mnesia:delete({captcha, Id}); - _ -> - ok - end), + case ets:lookup(captcha, Id) of + [#captcha{args=Args, pid=Pid}] -> + Pid ! {captcha_failed, Args}, + ets:delete(captcha, Id); + _ -> + ok + end, {noreply, State}; handle_info(_Info, State) -> @@ -642,3 +628,43 @@ check_captcha_setup() -> false -> ok end. + +lookup_captcha(Id) -> + case string:tokens(Id, "-") of + [_, NodeID] -> + case ejabberd_cluster:get_node_by_id(NodeID) of + Node when Node == node() -> + case ets:lookup(captcha, Id) of + [C] -> + {ok, C}; + _ -> + {error, enoent} + end; + Node -> + case catch rpc:call(Node, ets, lookup, + [captcha, Id], ?RPC_TIMEOUT) of + [C] -> + {ok, C}; + _ -> + {error, enoent} + end + end; + _ -> + {error, enoent} + end. + +do_check_captcha(Id, ProvidedKey) -> + case ets:lookup(captcha, Id) of + [#captcha{pid = Pid, args = Args, key = ValidKey, tref = Tref}] -> + ets:delete(captcha, Id), + erlang:cancel_timer(Tref), + if ValidKey == ProvidedKey -> + Pid ! {captcha_succeed, Args}, + captcha_valid; + true -> + Pid ! {captcha_failed, Args}, + captcha_non_valid + end; + _ -> + captcha_not_found + end. diff --git a/src/ejabberd_cluster.erl b/src/ejabberd_cluster.erl new file mode 100644 index 000000000..3db8dd29d --- /dev/null +++ b/src/ejabberd_cluster.erl @@ -0,0 +1,177 @@ +%%%------------------------------------------------------------------- +%%% File : ejabberd_cluster.erl +%%% Author : Evgeniy Khramtsov +%%% Description : +%%% +%%% Created : 2 Apr 2010 by Evgeniy Khramtsov +%%%------------------------------------------------------------------- +-module(ejabberd_cluster). + +-behaviour(gen_server). + +%% API +-export([start_link/0, get_node/1, get_node_new/1, announce/0, + node_id/0, get_node_by_id/1, get_nodes/0, rehash_timeout/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-include("ejabberd.hrl"). + +-define(HASHTBL, nodes_hash). +-define(HASHTBL_NEW, nodes_hash_new). +-define(POINTS, 16). +-define(REHASH_TIMEOUT, 5000). + +-record(state, {}). + +%%==================================================================== +%% API +%%==================================================================== +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +get_node(Key) -> + Hash = erlang:phash2(Key), + get_node_by_hash(?HASHTBL, Hash). + +get_node_new(Key) -> + Hash = erlang:phash2(Key), + get_node_by_hash(?HASHTBL_NEW, Hash). + +get_nodes() -> + %% TODO + mnesia:system_info(running_db_nodes). + +announce() -> + gen_server:call(?MODULE, announce, infinity). + +node_id() -> + integer_to_list(erlang:phash2(node())). + +rehash_timeout() -> + ?REHASH_TIMEOUT. + +get_node_by_id(NodeID) when is_list(NodeID) -> + case catch list_to_existing_atom(NodeID) of + {'EXIT', _} -> + node(); + Res -> + get_node_by_id(Res) + end; +get_node_by_id(NodeID) -> + case global:whereis_name(NodeID) of + Pid when is_pid(Pid) -> + node(Pid); + _ -> + node() + end. + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== +init([]) -> + net_kernel:monitor_nodes(true, [{node_type, visible}]), + ets:new(?HASHTBL, [named_table, public, ordered_set]), + ets:new(?HASHTBL_NEW, [named_table, public, ordered_set]), + register_node(), + AllNodes = mnesia:system_info(running_db_nodes), + OtherNodes = case AllNodes of + [_] -> + AllNodes; + _ -> + AllNodes -- [node()] + end, + append_nodes(?HASHTBL, OtherNodes), + append_nodes(?HASHTBL_NEW, AllNodes), + {ok, #state{}}. + +handle_call(announce, _From, State) -> + case mnesia:system_info(running_db_nodes) of + [_MyNode] -> + ok; + Nodes -> + OtherNodes = Nodes -- [node()], + lists:foreach( + fun(Node) -> + {?MODULE, Node} ! {node_ready, node()} + end, OtherNodes), + ?INFO_MSG("waiting for migration from nodes: ~w", + [OtherNodes]), + timer:sleep(?REHASH_TIMEOUT), + append_node(?HASHTBL, node()) + end, + {reply, ok, State}; +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({node_ready, Node}, State) -> + ?INFO_MSG("node ~p is ready, starting migration", [Node]), + append_node(?HASHTBL_NEW, Node), + ejabberd_hooks:run(node_hash_update, [?REHASH_TIMEOUT]), + timer:sleep(?REHASH_TIMEOUT), + ?INFO_MSG("adding node ~p to hash", [Node]), + append_node(?HASHTBL, Node), + {noreply, State}; +handle_info({nodedown, Node, _}, State) -> + ?INFO_MSG("node ~p goes down", [Node]), + delete_node(?HASHTBL, Node), + delete_node(?HASHTBL_NEW, Node), + {noreply, State}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- +append_nodes(Tab, Nodes) -> + lists:foreach( + fun(Node) -> + append_node(Tab, Node) + end, Nodes). + +append_node(Tab, Node) -> + lists:foreach( + fun(I) -> + Hash = erlang:phash2({I, Node}), + ets:insert(Tab, {Hash, Node}) + end, lists:seq(1, ?POINTS)). + +delete_node(Tab, Node) -> + lists:foreach( + fun(I) -> + Hash = erlang:phash2({I, Node}), + ets:delete(Tab, Hash) + end, lists:seq(1, ?POINTS)). + +get_node_by_hash(Tab, Hash) -> + NodeHash = case ets:next(Tab, Hash) of + '$end_of_table' -> + ets:first(Tab); + NH -> + NH + end, + if NodeHash == '$end_of_table' -> + erlang:error(no_running_nodes); + true -> + case ets:lookup(Tab, NodeHash) of + [] -> + get_node_by_hash(Tab, Hash); + [{_, Node}] -> + Node + end + end. + +register_node() -> + global:register_name(list_to_atom(node_id()), self()). diff --git a/src/ejabberd_frontend_socket.erl b/src/ejabberd_frontend_socket.erl index 82106da55..88d77aed9 100644 --- a/src/ejabberd_frontend_socket.erl +++ b/src/ejabberd_frontend_socket.erl @@ -45,6 +45,8 @@ get_peer_certificate/1, get_verify_result/1, close/1, + setopts/2, + change_controller/2, sockname/1, peername/1]). %% gen_server callbacks @@ -94,17 +96,15 @@ start(Module, SockMod, Socket, Opts) -> todo end. -starttls(FsmRef, _TLSOpts) -> - %gen_server:call(FsmRef, {starttls, TLSOpts}), - FsmRef. +starttls(FsmRef, TLSOpts) -> + starttls(FsmRef, TLSOpts, undefined). starttls(FsmRef, TLSOpts, Data) -> gen_server:call(FsmRef, {starttls, TLSOpts, Data}), FsmRef. compress(FsmRef) -> - gen_server:call(FsmRef, compress), - FsmRef. + compress(FsmRef, undefined). compress(FsmRef, Data) -> gen_server:call(FsmRef, {compress, Data}), @@ -137,10 +137,14 @@ close(FsmRef) -> sockname(FsmRef) -> gen_server:call(FsmRef, sockname). -peername(_FsmRef) -> - %gen_server:call(FsmRef, peername). - {ok, {{0, 0, 0, 0}, 0}}. +setopts(FsmRef, Opts) -> + gen_server:call(FsmRef, {setopts, Opts}). +change_controller(FsmRef, C2SPid) -> + gen_server:call(FsmRef, {change_controller, C2SPid}). + +peername(FsmRef) -> + gen_server:call(FsmRef, peername). %%==================================================================== %% gen_server callbacks @@ -156,9 +160,16 @@ peername(_FsmRef) -> init([Module, SockMod, Socket, Opts, Receiver]) -> %% TODO: monitor the receiver Node = ejabberd_node_groups:get_closest_node(backend), + IP = case peername(SockMod, Socket) of + {ok, IP1} -> + IP1; + _ -> + undefined + end, {SockMod2, Socket2} = check_starttls(SockMod, Socket, Receiver, Opts), {ok, Pid} = - rpc:call(Node, Module, start, [{?MODULE, self()}, Opts]), + rpc:call(Node, Module, start, + [{?MODULE, self()}, [{frontend_ip, IP} | Opts]]), ejabberd_receiver:become_controller(Receiver, Pid), {ok, #state{sockmod = SockMod2, socket = Socket2, @@ -173,38 +184,16 @@ init([Module, SockMod, Socket, Opts, Receiver]) -> %% {stop, Reason, State} %% Description: Handling call messages %%-------------------------------------------------------------------- -handle_call({starttls, TLSOpts}, _From, State) -> - {ok, TLSSocket} = tls:tcp_to_tls(State#state.socket, TLSOpts), - ejabberd_receiver:starttls(State#state.receiver, TLSSocket), - Reply = ok, - {reply, Reply, State#state{socket = TLSSocket, sockmod = tls}, - ?HIBERNATE_TIMEOUT}; - handle_call({starttls, TLSOpts, Data}, _From, State) -> - {ok, TLSSocket} = tls:tcp_to_tls(State#state.socket, TLSOpts), - ejabberd_receiver:starttls(State#state.receiver, TLSSocket), - catch (State#state.sockmod):send( - State#state.socket, Data), + {ok, TLSSocket} = ejabberd_receiver:starttls( + State#state.receiver, TLSOpts, Data), Reply = ok, {reply, Reply, State#state{socket = TLSSocket, sockmod = tls}, ?HIBERNATE_TIMEOUT}; -handle_call(compress, _From, State) -> - {ok, ZlibSocket} = ejabberd_zlib:enable_zlib( - State#state.sockmod, - State#state.socket), - ejabberd_receiver:compress(State#state.receiver, ZlibSocket), - Reply = ok, - {reply, Reply, State#state{socket = ZlibSocket, sockmod = ejabberd_zlib}, - ?HIBERNATE_TIMEOUT}; - handle_call({compress, Data}, _From, State) -> - {ok, ZlibSocket} = ejabberd_zlib:enable_zlib( - State#state.sockmod, - State#state.socket), - ejabberd_receiver:compress(State#state.receiver, ZlibSocket), - catch (State#state.sockmod):send( - State#state.socket, Data), + {ok, ZlibSocket} = ejabberd_receiver:compress( + State#state.receiver, Data), Reply = ok, {reply, Reply, State#state{socket = ZlibSocket, sockmod = ejabberd_zlib}, ?HIBERNATE_TIMEOUT}; @@ -244,13 +233,7 @@ handle_call(close, _From, State) -> handle_call(sockname, _From, State) -> #state{sockmod = SockMod, socket = Socket} = State, - Reply = - case SockMod of - gen_tcp -> - inet:sockname(Socket); - _ -> - SockMod:sockname(Socket) - end, + Reply = peername(SockMod, Socket), {reply, Reply, State, ?HIBERNATE_TIMEOUT}; handle_call(peername, _From, State) -> @@ -264,6 +247,14 @@ handle_call(peername, _From, State) -> end, {reply, Reply, State, ?HIBERNATE_TIMEOUT}; +handle_call({setopts, Opts}, _From, State) -> + ejabberd_receiver:setopts(State#state.receiver, Opts), + {reply, ok, State, ?HIBERNATE_TIMEOUT}; + +handle_call({change_controller, Pid}, _From, State) -> + ejabberd_receiver:change_controller(State#state.receiver, Pid), + {reply, ok, State, ?HIBERNATE_TIMEOUT}; + handle_call(_Request, _From, State) -> Reply = ok, {reply, Reply, State, ?HIBERNATE_TIMEOUT}. @@ -316,10 +307,16 @@ check_starttls(SockMod, Socket, Receiver, Opts) -> end, Opts), if TLSEnabled -> - {ok, TLSSocket} = tls:tcp_to_tls(Socket, TLSOpts), - ejabberd_receiver:starttls(Receiver, TLSSocket), + {ok, TLSSocket} = ejabberd_receiver:starttls(Receiver, TLSOpts), {tls, TLSSocket}; true -> {SockMod, Socket} end. +peername(SockMod, Socket) -> + case SockMod of + gen_tcp -> + inet:peername(Socket); + _ -> + SockMod:peername(Socket) + end. diff --git a/src/ejabberd_local.erl b/src/ejabberd_local.erl index 107aadc0b..65086d972 100644 --- a/src/ejabberd_local.erl +++ b/src/ejabberd_local.erl @@ -142,7 +142,7 @@ route(From, To, Packet) -> route_iq(From, To, #iq{type = Type} = IQ, F) when is_function(F) -> Packet = if Type == set; Type == get -> - ID = list_to_binary(randoms:get_string()), + ID = list_to_binary(ejabberd_router:make_id()), Host = exmpp_jid:prep_domain(From), register_iq_response_handler(Host, ID, undefined, F), exmpp_iq:iq_to_xmlel(IQ#iq{id = ID}); @@ -153,10 +153,10 @@ route_iq(From, To, #iq{type = Type} = IQ, F) when is_function(F) -> register_iq_response_handler(_Host, ID, Module, Function) -> TRef = erlang:start_timer(?IQ_TIMEOUT, ejabberd_local, ID), - mnesia:dirty_write(#iq_response{id = ID, - module = Module, - function = Function, - timer = TRef}). + ets:insert(iq_response, #iq_response{id = ID, + module = Module, + function = Function, + timer = TRef}). register_iq_handler(Host, XMLNS, Module, Fun) -> ejabberd_local ! {register_iq_handler, Host, XMLNS, Module, Fun}. @@ -198,11 +198,9 @@ init([]) -> ?MODULE, bounce_resource_packet, 100) end, ?MYHOSTS), catch ets:new(?IQTABLE, [named_table, public]), - update_table(), - mnesia:create_table(iq_response, - [{ram_copies, [node()]}, - {attributes, record_info(fields, iq_response)}]), - mnesia:add_table_copy(iq_response, node(), ram_copies), + mnesia:delete_table(iq_response), + catch ets:new(iq_response, [named_table, public, + {keypos, #iq_response.id}]), {ok, #state{}}. %%-------------------------------------------------------------------- @@ -286,7 +284,7 @@ handle_info(refresh_iq_handlers, State) -> end, ets:tab2list(?IQTABLE)), {noreply, State}; handle_info({timeout, _TRef, ID}, State) -> - process_iq_timeout(ID), + spawn(fun() -> process_iq_timeout(ID) end), {noreply, State}; handle_info(_Info, State) -> {noreply, State}. @@ -342,40 +340,22 @@ do_route(From, To, Packet) -> end end. -update_table() -> - case catch mnesia:table_info(iq_response, attributes) of - [id, module, function] -> - mnesia:delete_table(iq_response); - [id, module, function, timer] -> - ok; - {'EXIT', _} -> - ok - end. - get_iq_callback(ID) -> - case mnesia:dirty_read(iq_response, ID) of + case ets:lookup(iq_response, ID) of [#iq_response{module = Module, timer = TRef, function = Function}] -> cancel_timer(TRef), - mnesia:dirty_delete(iq_response, ID), + ets:delete(iq_response, ID), {ok, Module, Function}; _ -> error end. process_iq_timeout(ID) -> - spawn(fun process_iq_timeout/0) ! ID. - -process_iq_timeout() -> - receive - ID -> - case get_iq_callback(ID) of - {ok, undefined, Function} -> - Function(timeout); - _ -> - ok - end - after 5000 -> + case get_iq_callback(ID) of + {ok, undefined, Function} -> + Function(timeout); + _ -> ok end. diff --git a/src/ejabberd_node_groups.erl b/src/ejabberd_node_groups.erl index fc1b4ded5..055b96aef 100644 --- a/src/ejabberd_node_groups.erl +++ b/src/ejabberd_node_groups.erl @@ -31,6 +31,7 @@ %% API -export([start_link/0, + start/0, join/1, leave/1, get_members/1, @@ -40,7 +41,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {}). +-record(state, {groups = []}). %%==================================================================== %% API @@ -49,6 +50,15 @@ %% Function: start_link() -> {ok,Pid} | ignore | {error,Error} %% Description: Starts the server %%-------------------------------------------------------------------- +start() -> + ChildSpec = {?MODULE, + {?MODULE, start_link, []}, + permanent, + brutal_kill, + worker, + [?MODULE]}, + supervisor:start_child(ejabberd_sup, ChildSpec). + start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -81,30 +91,19 @@ get_closest_node(Name) -> %% Description: Initiates the server %%-------------------------------------------------------------------- init([]) -> - {FE, BE} = + Groups = case ejabberd_config:get_local_option(node_type) of frontend -> - {true, false}; + [frontend]; backend -> - {false, true}; + [backend]; generic -> - {true, true}; + [frontend, backend]; undefined -> - {true, true} + [frontend, backend] end, - if - FE -> - join(frontend); - true -> - ok - end, - if - BE -> - join(backend); - true -> - ok - end, - {ok, #state{}}. + lists:foreach(fun join/1, Groups), + {ok, #state{groups = Groups}}. %%-------------------------------------------------------------------- %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | @@ -144,7 +143,8 @@ handle_info(_Info, State) -> %% cleaning up. When it returns, the gen_server terminates with Reason. %% The return value is ignored. %%-------------------------------------------------------------------- -terminate(_Reason, _State) -> +terminate(_Reason, #state{groups = Groups}) -> + lists:foreach(fun leave/1, Groups), ok. %%-------------------------------------------------------------------- diff --git a/src/ejabberd_receiver.erl b/src/ejabberd_receiver.erl index d0c464d30..f0cef9fe8 100644 --- a/src/ejabberd_receiver.erl +++ b/src/ejabberd_receiver.erl @@ -36,8 +36,12 @@ change_shaper/2, reset_stream/1, starttls/2, + starttls/3, compress/2, + send/2, become_controller/2, + change_controller/2, + setopts/2, close/1]). %% gen_server callbacks @@ -52,6 +56,7 @@ c2s_pid, max_stanza_size, xml_stream_state, + tref, timeout}). -define(HIBERNATE_TIMEOUT, 90000). @@ -86,15 +91,32 @@ change_shaper(Pid, Shaper) -> reset_stream(Pid) -> gen_server:call(Pid, reset_stream). -starttls(Pid, TLSSocket) -> - gen_server:call(Pid, {starttls, TLSSocket}). +starttls(Pid, TLSOpts) -> + starttls(Pid, TLSOpts, undefined). -compress(Pid, ZlibSocket) -> - gen_server:call(Pid, {compress, ZlibSocket}). +starttls(Pid, TLSOpts, Data) -> + gen_server:call(Pid, {starttls, TLSOpts, Data}). + +compress(Pid, Data) -> + gen_server:call(Pid, {compress, Data}). become_controller(Pid, C2SPid) -> gen_server:call(Pid, {become_controller, C2SPid}). +change_controller(Pid, C2SPid) -> + gen_server:call(Pid, {change_controller, C2SPid}). + +setopts(Pid, Opts) -> + case lists:member({active, false}, Opts) of + true -> + gen_server:call(Pid, deactivate_socket); + false -> + ok + end. + +send(Pid, Data) -> + gen_server:call(Pid, {send, Data}). + close(Pid) -> gen_server:cast(Pid, close). @@ -132,28 +154,42 @@ init([Socket, SockMod, Shaper, MaxStanzaSize]) -> %% {stop, Reason, State} %% Description: Handling call messages %%-------------------------------------------------------------------- -handle_call({starttls, TLSSocket}, _From, State) -> +handle_call({starttls, TLSOpts, Data}, _From, State) -> + {ok, TLSSocket} = tls:tcp_to_tls(State#state.socket, TLSOpts), + if Data /= undefined -> + do_send(State, Data); + true -> + ok + end, NewXMLStreamState = do_reset_stream(State), NewState = State#state{socket = TLSSocket, sock_mod = tls, xml_stream_state = NewXMLStreamState}, case tls:recv_data(TLSSocket, "") of {ok, TLSData} -> - {NextState, Hib} = process_data(TLSData, NewState), - {reply, ok, NextState, Hib}; + {NextState, Hib} = process_data(TLSData, NewState), + {reply, {ok, TLSSocket}, NextState, Hib}; {error, _Reason} -> {stop, normal, ok, NewState} end; -handle_call({compress, ZlibSocket}, _From, - #state{xml_stream_state = XMLStreamState} = State) -> +handle_call({compress, Data}, _From, + #state{xml_stream_state = XMLStreamState, + sock_mod = SockMod, + socket = Socket} = State) -> + {ok, ZlibSocket} = ejabberd_zlib:enable_zlib(SockMod, Socket), + if Data /= undefined -> + do_send(State, Data); + true -> + ok + end, NewXMLStreamState = exmpp_xmlstream:reset(XMLStreamState), NewState = State#state{socket = ZlibSocket, sock_mod = ejabberd_zlib, xml_stream_state = NewXMLStreamState}, case ejabberd_zlib:recv_data(ZlibSocket, "") of {ok, ZlibData} -> - {NextState, Hib} = process_data(ZlibData, NewState), - {reply, ok, NextState, Hib}; + {NextState, Hib} = process_data(ZlibData, NewState), + {reply, {ok, ZlibSocket}, NextState, Hib}; {error, _Reason} -> {stop, normal, ok, NewState} end; @@ -164,6 +200,7 @@ handle_call(reset_stream, _From, {reply, Reply, State#state{xml_stream_state = NewXMLStreamState}, ?HIBERNATE_TIMEOUT}; handle_call({become_controller, C2SPid}, _From, State) -> + erlang:monitor(process, C2SPid), close_stream(State#state.xml_stream_state), XMLStreamState = new_xmlstream(C2SPid, State#state.max_stanza_size), NewState = State#state{c2s_pid = C2SPid, @@ -171,6 +208,24 @@ handle_call({become_controller, C2SPid}, _From, State) -> activate_socket(NewState), Reply = ok, {reply, Reply, NewState, ?HIBERNATE_TIMEOUT}; +handle_call({change_controller, C2SPid}, _From, State) -> + erlang:monitor(process, C2SPid), + NewXMLStreamState = exmpp_xmlstream:change_callback( + State#state.xml_stream_state, {gen_fsm, C2SPid}), + NewState = State#state{c2s_pid = C2SPid, + xml_stream_state = NewXMLStreamState}, + activate_socket(NewState), + {reply, ok, NewState, ?HIBERNATE_TIMEOUT}; +handle_call({send, Data}, _From, State) -> + case do_send(State, Data) of + ok -> + {reply, ok, State, ?HIBERNATE_TIMEOUT}; + {error, _Reason} = Err -> + {stop, normal, Err, State} + end; +handle_call(deactivate_socket, _From, State) -> + deactivate_socket(State), + {reply, ok, State, ?HIBERNATE_TIMEOUT}; handle_call(_Request, _From, State) -> Reply = ok, {reply, Reply, State, ?HIBERNATE_TIMEOUT}. @@ -231,6 +286,9 @@ handle_info({Tag, _TCPSocket, Reason}, State) _ -> {stop, normal, State} end; +handle_info({'DOWN', _MRef, process, C2SPid, _}, + #state{c2s_pid = C2SPid} = State) -> + {stop, normal, State}; handle_info({timeout, _Ref, activate}, State) -> activate_socket(State), {noreply, State, ?HIBERNATE_TIMEOUT}; @@ -288,6 +346,17 @@ activate_socket(#state{socket = Socket, ok end. +deactivate_socket(#state{socket = Socket, + tref = TRef, + sock_mod = SockMod}) -> + cancel_timer(TRef), + case SockMod of + gen_tcp -> + inet:setopts(Socket, [{active, false}]); + _ -> + SockMod:setopts(Socket, [{active, false}]) + end. + %% Data processing for connectors directly generating xmlel in %% Erlang data structure. %% WARNING: Shaper does not work with Erlang data structure. @@ -309,25 +378,25 @@ process_data([Element|Els], #state{c2s_pid = C2SPid} = State) %% Data processing for connectors receivind data as string. process_data(Data, #state{xml_stream_state = XMLStreamState, + tref = TRef, shaper_state = ShaperState, c2s_pid = C2SPid} = State) -> ?DEBUG("Received XML on stream = ~p", [Data]), {ok, XMLStreamState1} = exmpp_xmlstream:parse(XMLStreamState, Data), {NewShaperState, Pause} = shaper:update(ShaperState, size(Data)), - HibTimeout = + {NewTRef, HibTimeout} = if - C2SPid == undefined -> - infinity; + C2SPid == undefined -> + {TRef, infinity}; Pause > 0 -> - erlang:start_timer(Pause, self(), activate), - hibernate; - - true -> - activate_socket(State), - ?HIBERNATE_TIMEOUT + {erlang:start_timer(Pause, self(), activate), hibernate}; + true -> + activate_socket(State), + {TRef, ?HIBERNATE_TIMEOUT} end, {State#state{xml_stream_state = XMLStreamState1, - shaper_state = NewShaperState}, HibTimeout}. + tref = NewTRef, + shaper_state = NewShaperState}, HibTimeout}. %% Element coming from XML parser are wrapped inside xmlstreamelement %% When we receive directly xmlel tuple (from a socket module @@ -345,6 +414,23 @@ close_stream(XMLStreamState) -> exmpp_xml:stop_parser(exmpp_xmlstream:get_parser(XMLStreamState)), exmpp_xmlstream:stop(XMLStreamState). +do_send(State, Data) -> + (State#state.sock_mod):send(State#state.socket, Data). + +cancel_timer(TRef) when is_reference(TRef) -> + case erlang:cancel_timer(TRef) of + false -> + receive + {timeout, TRef, _} -> + ok + after 0 -> + ok + end; + _ -> + ok + end; +cancel_timer(_) -> + ok. do_reset_stream(#state{xml_stream_state = undefined, c2s_pid = C2SPid, max_stanza_size = MaxStanzaSize}) -> new_xmlstream(C2SPid, MaxStanzaSize); diff --git a/src/ejabberd_router.erl b/src/ejabberd_router.erl index 122afa4df..d6927f398 100644 --- a/src/ejabberd_router.erl +++ b/src/ejabberd_router.erl @@ -38,7 +38,8 @@ unregister_route/1, unregister_routes/1, dirty_get_all_routes/0, - dirty_get_all_domains/0 + dirty_get_all_domains/0, + make_id/0 ]). -export([start_link/0]). @@ -54,6 +55,9 @@ -record(route, {domain, pid, local_hint}). -record(state, {}). +%% "rr" stands for Record-Route. +-define(ROUTE_PREFIX, "rr-"). + %%==================================================================== %% API %%==================================================================== @@ -76,7 +80,7 @@ route(FromOld, ToOld, #xmlelement{} = PacketOld) -> [{?NS_XMPP, ?NS_XMPP_pfx}]), route(From, To, Packet); route(From, To, Packet) -> - case catch do_route(From, To, Packet) of + case catch route_check_id(From, To, Packet) of {'EXIT', Reason} -> ?ERROR_MSG("~p~nwhen processing: ~p", [Reason, {From, To, Packet}]); @@ -210,6 +214,8 @@ dirty_get_all_domains() -> lists:map(fun erlang:binary_to_list/1, mnesia:dirty_all_keys(route))). +make_id() -> + ?ROUTE_PREFIX ++ randoms:get_string() ++ "-" ++ ejabberd_cluster:node_id(). %%==================================================================== %% gen_server callbacks @@ -338,6 +344,32 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- +route_check_id(From, To, #xmlel{name = iq} = Packet) -> + case exmpp_xml:get_attribute_as_list(Packet, 'id', "") of + ?ROUTE_PREFIX ++ Rest -> + Type = exmpp_xml:get_attribute_as_list(Packet, 'type', ""), + if Type == "error"; Type == "result" -> + case string:tokens(Rest, "-") of + [_, NodeID] -> + case ejabberd_cluster:get_node_by_id(NodeID) of + Node when Node == node() -> + do_route(From, To, Packet); + Node -> + {ejabberd_router, Node} ! + {route, From, To, Packet} + end; + _ -> + do_route(From, To, Packet) + end; + true -> + do_route(From, To, Packet) + end; + _ -> + do_route(From, To, Packet) + end; +route_check_id(From, To, Packet) -> + do_route(From, To, Packet). + do_route(OrigFrom, OrigTo, OrigPacket) -> ?DEBUG("route~n\tfrom ~p~n\tto ~p~n\tpacket ~p~n", [OrigFrom, OrigTo, OrigPacket]), diff --git a/src/ejabberd_s2s.erl b/src/ejabberd_s2s.erl index f5946a416..59af8d9aa 100644 --- a/src/ejabberd_s2s.erl +++ b/src/ejabberd_s2s.erl @@ -40,7 +40,8 @@ dirty_get_connections/0, allow_host/2, incoming_s2s_number/0, - outgoing_s2s_number/0 + outgoing_s2s_number/0, + migrate/1 ]). %% gen_server callbacks @@ -113,30 +114,63 @@ remove_connection(FromTo, Pid, Key) -> end. have_connection(FromTo) -> - case catch mnesia:dirty_read(s2s, FromTo) of - [_] -> - true; - _ -> - false + case ejabberd_cluster:get_node(FromTo) of + Node when Node == node() -> + case mnesia:dirty_read(s2s, FromTo) of + [_] -> + true; + _ -> + false + end; + Node -> + case catch rpc:call(Node, mnesia, dirty_read, + [s2s, FromTo], 5000) of + [_] -> + true; + _ -> + false + end end. has_key(FromTo, Key) -> - case mnesia:dirty_select(s2s, - [{#s2s{fromto = FromTo, key = Key, _ = '_'}, - [], - ['$_']}]) of - [] -> - false; - _ -> - true + Query = [{#s2s{fromto = FromTo, key = Key, _ = '_'}, + [], + ['$_']}], + case ejabberd_cluster:get_node(FromTo) of + Node when Node == node() -> + case mnesia:dirty_select(s2s, Query) of + [] -> + false; + _ -> + true + end; + Node -> + case catch rpc:call(Node, mnesia, dirty_select, + [s2s, Query], 5000) of + [_|_] -> + true; + _ -> + false + end end. get_connections_pids(FromTo) -> - case catch mnesia:dirty_read(s2s, FromTo) of - L when is_list(L) -> - [Connection#s2s.pid || Connection <- L]; - _ -> - [] + case ejabberd_cluster:get_node(FromTo) of + Node when Node == node() -> + case catch mnesia:dirty_read(s2s, FromTo) of + L when is_list(L) -> + [Connection#s2s.pid || Connection <- L]; + _ -> + [] + end; + Node -> + case catch rpc:call(Node, mnesia, dirty_read, + [s2s, FromTo], 5000) of + L when is_list(L) -> + [Connection#s2s.pid || Connection <- L]; + _ -> + [] + end end. try_register(FromTo) -> @@ -167,7 +201,33 @@ try_register(FromTo) -> end. dirty_get_connections() -> - mnesia:dirty_all_keys(s2s). + lists:flatmap( + fun(Node) when Node == node() -> + mnesia:dirty_all_keys(s2s); + (Node) -> + case catch rpc:call(Node, mnesia, dirty_all_keys, [s2s], 5000) of + L when is_list(L) -> + L; + _ -> + [] + end + end, ejabberd_cluster:get_nodes()). + +migrate(After) -> + Ss = mnesia:dirty_select( + s2s, + [{#s2s{fromto = '$1', pid = '$2', _ = '_'}, + [], + ['$$']}]), + lists:foreach( + fun([FromTo, Pid]) -> + case ejabberd_cluster:get_node_new(FromTo) of + Node when Node /= node() -> + ejabberd_s2s_out:stop_connection(Pid, After * 2); + _ -> + ok + end + end, Ss). %%==================================================================== %% gen_server callbacks @@ -182,10 +242,11 @@ dirty_get_connections() -> %%-------------------------------------------------------------------- init([]) -> update_tables(), - mnesia:create_table(s2s, [{ram_copies, [node()]}, {type, bag}, + mnesia:create_table(s2s, [{ram_copies, [node()]}, + {type, bag}, {local_content, true}, {attributes, record_info(fields, s2s)}]), mnesia:add_table_copy(s2s, node(), ram_copies), - mnesia:subscribe(system), + ejabberd_hooks:add(node_hash_update, ?MODULE, migrate, 100), ejabberd_commands:register_commands(commands()), {ok, #state{}}. @@ -217,9 +278,6 @@ handle_cast(_Msg, State) -> %% {stop, Reason, State} %% Description: Handling all non call/cast messages %%-------------------------------------------------------------------- -handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> - clean_table_from_bad_node(Node), - {noreply, State}; %% #xmlelement{} used for retro-compatibility handle_info({route, FromOld, ToOld, #xmlelement{} = PacketOld}, State) -> catch throw(for_stacktrace), % To have a stacktrace. @@ -251,6 +309,7 @@ handle_info(_Info, State) -> %% The return value is ignored. %%-------------------------------------------------------------------- terminate(_Reason, _State) -> + ejabberd_hooks:delete(node_hash_update, ?MODULE, migrate, 100), ejabberd_commands:unregister_commands(commands()), ok. @@ -264,22 +323,19 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- -clean_table_from_bad_node(Node) -> - F = fun() -> - Es = mnesia:select( - s2s, - [{#s2s{pid = '$1', _ = '_'}, - [{'==', {node, '$1'}, Node}], - ['$_']}]), - lists:foreach(fun(E) -> - mnesia:delete_object(E) - end, Es) - end, - mnesia:async_dirty(F). - do_route(From, To, Packet) -> ?DEBUG("s2s manager~n\tfrom ~p~n\tto ~p~n\tpacket ~P~n", [From, To, Packet, 8]), + FromTo = {exmpp_jid:prep_domain_as_list(From), + exmpp_jid:prep_domain_as_list(To)}, + case ejabberd_cluster:get_node(FromTo) of + Node when Node == node() -> + do_route1(From, To, Packet); + Node -> + {?MODULE, Node} ! {route, From, To, Packet} + end. + +do_route1(From, To, Packet) -> case find_connection(From, To) of {atomic, Pid} when is_pid(Pid) -> ?DEBUG("sending to process ~p~n", [Pid]), @@ -510,6 +566,12 @@ update_tables() -> mnesia:delete_table(local_s2s); false -> ok + end, + case catch mnesia:table_info(s2s, local_content) of + false -> + mnesia:delete_table(s2s); + _ -> + ok end. %% Check if host is in blacklist or white list diff --git a/src/ejabberd_s2s_out.erl b/src/ejabberd_s2s_out.erl index b2e8764e1..37b45d390 100644 --- a/src/ejabberd_s2s_out.erl +++ b/src/ejabberd_s2s_out.erl @@ -34,7 +34,8 @@ start_link/3, start_connection/1, terminate_if_waiting_delay/2, - stop_connection/1]). + stop_connection/1, + stop_connection/2]). %% p1_fsm callbacks (same as gen_fsm) -export([init/1, @@ -52,7 +53,7 @@ handle_info/3, terminate/3, code_change/4, - print_state/1, + print_state/1, test_get_addr_port/1, get_addr_port/1]). @@ -85,10 +86,11 @@ %% Module start with or without supervisor: -ifdef(NO_TRANSIENT_SUPERVISORS). --define(SUPERVISOR_START, p1_fsm:start(ejabberd_s2s_out, [From, Host, Type], - fsm_limit_opts() ++ ?FSMOPTS)). +-define(SUPERVISOR_START, rpc:call(Node, p1_fsm, start, + [ejabberd_s2s_out, [From, Host, Type], + fsm_limit_opts() ++ ?FSMOPTS])). -else. --define(SUPERVISOR_START, supervisor:start_child(ejabberd_s2s_out_sup, +-define(SUPERVISOR_START, supervisor:start_child({ejabberd_s2s_out_sup, Node}, [From, Host, Type])). -endif. @@ -115,6 +117,7 @@ %%% API %%%---------------------------------------------------------------------- start(From, Host, Type) -> + Node = ejabberd_cluster:get_node({From, Host}), ?SUPERVISOR_START. start_link(From, Host, Type) -> @@ -125,7 +128,10 @@ start_connection(Pid) -> p1_fsm:send_event(Pid, init). stop_connection(Pid) -> - p1_fsm:send_event(Pid, stop). + p1_fsm:send_event(Pid, closed). + +stop_connection(Pid, Timeout) -> + p1_fsm:send_all_state_event(Pid, {closed, Timeout}). %%%---------------------------------------------------------------------- %%% Callback functions from p1_fsm @@ -736,6 +742,9 @@ stream_established(closed, StateData) -> %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- +handle_event({closed, Timeout}, StateName, StateData) -> + p1_fsm:send_event_after(Timeout, closed), + {next_state, StateName, StateData}; handle_event(_Event, StateName, StateData) -> {next_state, StateName, StateData, get_timeout_interval(StateName)}. diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index 08059082f..3d1984ad0 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -32,7 +32,10 @@ %% API -export([start_link/0, route/3, - open_session/3, close_session/2, + set_session/4, + open_session/3, + open_session/4, + close_session/2, check_in_subscription/6, bounce_offline_message/3, disconnect_removed_user/2, @@ -43,6 +46,7 @@ dirty_get_sessions_list/0, dirty_get_my_sessions_list/0, get_vh_session_list/1, + get_vh_my_session_list/1, get_vh_session_number/1, register_iq_handler/4, register_iq_handler/5, @@ -53,7 +57,8 @@ user_resources/2, get_session_pid/1, get_user_info/3, - get_user_ip/1 + get_user_ip/1, + migrate/1 ]). %% gen_server callbacks @@ -67,7 +72,6 @@ -include("mod_privacy.hrl"). -record(session, {sid, usr, us, priority, info}). --record(session_counter, {vhost, count}). -record(state, {}). %% default value for the maximum number of user connections @@ -112,10 +116,11 @@ route(From, To, Packet) -> ok end. -open_session(SID, JID, Info) when ?IS_JID(JID) -> - set_session(SID, JID, undefined, Info), - mnesia:dirty_update_counter(session_counter, - exmpp_jid:domain(JID), 1), +open_session(SID, JID, Info) -> + open_session(SID, JID, undefined, Info). + +open_session(SID, JID, Priority, Info) when ?IS_JID(JID) -> + set_session(SID, JID, Priority, Info), check_for_sessions_to_replace(JID), ejabberd_hooks:run(sm_register_connection_hook, exmpp_jid:prep_domain(JID), [SID, JID, Info]). @@ -126,9 +131,7 @@ close_session(SID, JID ) when ?IS_JID(JID)-> [#session{info=I}] -> I end, F = fun() -> - mnesia:delete({session, SID}), - mnesia:dirty_update_counter(session_counter, - exmpp_jid:domain(JID), -1) + mnesia:delete({session, SID}) end, mnesia:sync_dirty(F), ejabberd_hooks:run(sm_remove_connection_hook, exmpp_jid:prep_domain(JID), @@ -156,25 +159,37 @@ disconnect_removed_user(User, Server) -> children = [{exit, "User removed"}]}). get_user_resources(User, Server) - when is_binary(User), is_binary(Server) -> + when is_binary(User), is_binary(Server) -> US = {User, Server}, - case catch mnesia:dirty_index_read(session, US, #session.us) of - {'EXIT', _Reason} -> - []; - Ss -> - [element(3, S#session.usr) || S <- clean_session_list(Ss)] + Ss = case ejabberd_cluster:get_node({User, Server}) of + Node when Node == node() -> + catch mnesia:dirty_index_read(session, US, #session.us); + Node -> + catch rpc:call(Node, mnesia, dirty_index_read, + [session, US, #session.us], 5000) + end, + if is_list(Ss) -> + [element(3, S#session.usr) || S <- clean_session_list(Ss)]; + true -> + [] end. get_user_ip(JID) when ?IS_JID(JID) -> - USR = {exmpp_jid:prep_node(JID), - exmpp_jid:prep_domain(JID), + USR = {LUser = exmpp_jid:prep_node(JID), + LServer = exmpp_jid:prep_domain(JID), exmpp_jid:prep_resource(JID)}, - case mnesia:dirty_index_read(session, USR, #session.usr) of - [] -> - undefined; - Ss -> + Ss = case ejabberd_cluster:get_node({LUser, LServer}) of + Node when Node == node() -> + mnesia:dirty_index_read(session, USR, #session.usr); + Node -> + catch rpc:call(Node, mnesia, dirty_index_read, + [session, USR, #session.usr], 5000) + end, + if is_list(Ss), Ss /= [] -> Session = lists:max(Ss), - proplists:get_value(ip, Session#session.info) + proplists:get_value(ip, Session#session.info); + true -> + undefined end. get_user_info(User, Server, Resource) @@ -187,15 +202,21 @@ get_user_info(User, Server, Resource) USR = {LUser, LServer, LResource}, - case mnesia:dirty_index_read(session, USR, #session.usr) of - [] -> - offline; - Ss -> + Ss = case ejabberd_cluster:get_node({LUser, LServer}) of + Node when Node == node() -> + mnesia:dirty_index_read(session, USR, #session.usr); + Node -> + catch rpc:call(Node, mnesia, dirty_index_read, + [session, USR, #session.usr], 5000) + end, + if is_list(Ss), Ss /= [] -> Session = lists:max(Ss), - Node = node(element(2, Session#session.sid)), + N = node(element(2, Session#session.sid)), Conn = proplists:get_value(conn, Session#session.info), IP = proplists:get_value(ip, Session#session.info), - [{node, Node}, {conn, Conn}, {ip, IP}] + [{node, N}, {conn, Conn}, {ip, IP}]; + true -> + offline end. set_presence(SID, JID, Priority, Presence, Info) when ?IS_JID(JID) -> @@ -229,27 +250,38 @@ get_session_pid(JID) when ?IS_JID(JID) -> get_session_pid({exmpp_jid:prep_node(JID), exmpp_jid:prep_domain(JID), exmpp_jid:prep_resource(JID)}); -get_session_pid(USR) -> - case catch mnesia:dirty_index_read(session, USR, #session.usr) of +get_session_pid({LUser, LServer, _} = USR) -> + Res = case ejabberd_cluster:get_node({LUser, LServer}) of + Node when Node == node() -> + mnesia:dirty_index_read(session, USR, #session.usr); + Node -> + catch rpc:call(Node, mnesia, dirty_index_read, + [session, USR, #session.usr], 5000) + end, + case Res of [#session{sid = {_, Pid}}] -> Pid; _ -> none end. dirty_get_sessions_list() -> - mnesia:dirty_select( - session, - [{#session{usr = '$1', _ = '_'}, - [], - ['$1']}]). + Match = [{#session{usr = '$1', _ = '_'}, [], ['$1']}], + lists:flatmap( + fun(Node) when Node == node() -> + mnesia:dirty_select(session, Match); + (Node) -> + case catch rpc:call(Node, mnesia, dirty_select, + [session, Match], 5000) of + Ss when is_list(Ss) -> + Ss; + _ -> + [] + end + end, ejabberd_cluster:get_nodes()). dirty_get_my_sessions_list() -> - mnesia:dirty_select( - session, - [{#session{sid = {'_', '$1'}, _ = '_'}, - [{'==', {node, '$1'}, node()}], - ['$_']}]). + mnesia:dirty_match_object(#session{_ = '_'}). -get_vh_session_list(Server) when is_binary(Server) -> +get_vh_my_session_list(Server) when is_binary(Server) -> LServer = exmpp_stringprep:nameprep(Server), mnesia:dirty_select( session, @@ -257,19 +289,24 @@ get_vh_session_list(Server) when is_binary(Server) -> [{'==', {element, 2, '$1'}, LServer}], ['$1']}]). +get_vh_session_list(Server) when is_binary(Server) -> + lists:flatmap( + fun(Node) when Node == node() -> + get_vh_my_session_list(Server); + (Node) -> + case catch rpc:call(Node, ?MODULE, get_vh_my_session_list, + [Server], 5000) of + Ss when is_list(Ss) -> + Ss; + _ -> + [] + end + end, ejabberd_cluster:get_nodes()). + get_vh_session_number(Server) -> - LServer = exmpp_jid:prep_domain(exmpp_jid:parse(Server)), - Query = mnesia:dirty_select( - session_counter, - [{#session_counter{vhost = LServer, count = '$1'}, - [], - ['$1']}]), - case Query of - [Count] -> - Count; - _ -> 0 - end. - + %% TODO + length(get_vh_session_list(Server)). + register_iq_handler(Host, XMLNS, Module, Fun) -> ejabberd_sm ! {register_iq_handler, Host, XMLNS, Module, Fun}. @@ -279,6 +316,21 @@ register_iq_handler(Host, XMLNS, Module, Fun, Opts) -> unregister_iq_handler(Host, XMLNS) -> ejabberd_sm ! {unregister_iq_handler, Host, XMLNS}. +migrate(After) -> + Ss = mnesia:dirty_select( + session, + [{#session{us = '$1', sid = {'_', '$2'}, _ = '_'}, + [], + ['$$']}]), + lists:foreach( + fun([US, Pid]) -> + case ejabberd_cluster:get_node_new(US) of + Node when Node /= node() -> + ejabberd_c2s:migrate(Pid, Node, After); + _ -> + ok + end + end, Ss). %%==================================================================== %% gen_server callbacks @@ -295,16 +347,13 @@ init([]) -> update_tables(), mnesia:create_table(session, [{ram_copies, [node()]}, + {local_content, true}, {attributes, record_info(fields, session)}]), - mnesia:create_table(session_counter, - [{ram_copies, [node()]}, - {attributes, record_info(fields, session_counter)}]), mnesia:add_table_index(session, usr), mnesia:add_table_index(session, us), mnesia:add_table_copy(session, node(), ram_copies), - mnesia:add_table_copy(session_counter, node(), ram_copies), - mnesia:subscribe(system), ets:new(sm_iqtable, [named_table]), + ejabberd_hooks:add(node_hash_update, ?MODULE, migrate, 100), lists:foreach( fun(Host) -> HostB = list_to_binary(Host), @@ -367,9 +416,6 @@ handle_info({route, From, To, Packet}, State) -> ok end, {noreply, State}; -handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> - recount_session_table(Node), - {noreply, State}; handle_info({register_iq_handler, Host, XMLNS, Module, Function}, State) -> ets:insert(sm_iqtable, {{XMLNS, Host}, Module, Function}), {noreply, State}; @@ -396,6 +442,7 @@ handle_info(_Info, State) -> %% The return value is ignored. %%-------------------------------------------------------------------- terminate(_Reason, _State) -> + ejabberd_hooks:delete(node_hash_update, ?MODULE, migrate, 100), ejabberd_commands:unregister_commands(commands()), ok. @@ -424,38 +471,19 @@ set_session(SID, JID, Priority, Info) -> end, mnesia:sync_dirty(F). -%% Recalculates alive sessions when Node goes down -%% and updates session and session_counter tables -recount_session_table(Node) -> - F = fun() -> - Es = mnesia:select( - session, - [{#session{sid = {'_', '$1'}, _ = '_'}, - [{'==', {node, '$1'}, Node}], - ['$_']}]), - lists:foreach(fun(E) -> - mnesia:delete({session, E#session.sid}) - end, Es), - %% reset session_counter table with active sessions - mnesia:clear_table(session_counter), - lists:foreach(fun(Server) -> - LServer = exmpp_jid:prep_domain(exmpp_jid:parse(Server)), - Hs = mnesia:select(session, - [{#session{usr = '$1', _ = '_'}, - [{'==', {element, 2, '$1'}, LServer}], - ['$1']}]), - mnesia:write( - #session_counter{vhost = LServer, - count = length(Hs)}) - end, ?MYHOSTS) - end, - mnesia:async_dirty(F). - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - do_route(From, To, Packet) -> ?DEBUG("session manager~n\tfrom ~p~n\tto ~p~n\tpacket ~P~n", [From, To, Packet, 8]), + {U, S} = {exmpp_jid:prep_node(To), exmpp_jid:prep_domain(To)}, + case ejabberd_cluster:get_node({U, S}) of + Node when Node /= node() -> + {?MODULE, Node} ! {route, From, To, Packet}; + _ -> + do_route1(From, To, Packet) + end. + +do_route1(From, To, Packet) -> case exmpp_jid:prep_resource(To) of undefined -> case Packet of @@ -835,4 +863,11 @@ update_tables() -> mnesia:delete_table(local_session); false -> ok + end, + mnesia:delete_table(session_counter), + case catch mnesia:table_info(session, local_content) of + false -> + mnesia:delete_table(session); + _ -> + ok end. diff --git a/src/ejabberd_socket.erl b/src/ejabberd_socket.erl index 25ff64c27..8eeda452d 100644 --- a/src/ejabberd_socket.erl +++ b/src/ejabberd_socket.erl @@ -44,6 +44,7 @@ get_peer_certificate/1, get_verify_result/1, close/1, + change_controller/2, sockname/1, peername/1]). -include("ejabberd.hrl"). @@ -129,29 +130,19 @@ connect(Addr, Port, Opts, Timeout) -> end. starttls(SocketData, TLSOpts) -> - {ok, TLSSocket} = tls:tcp_to_tls(SocketData#socket_state.socket, TLSOpts), - ejabberd_receiver:starttls(SocketData#socket_state.receiver, TLSSocket), - SocketData#socket_state{socket = TLSSocket, sockmod = tls}. + starttls(SocketData, TLSOpts, undefined). starttls(SocketData, TLSOpts, Data) -> - {ok, TLSSocket} = tls:tcp_to_tls(SocketData#socket_state.socket, TLSOpts), - ejabberd_receiver:starttls(SocketData#socket_state.receiver, TLSSocket), - send(SocketData, Data), + {ok, TLSSocket} = ejabberd_receiver:starttls( + SocketData#socket_state.receiver, TLSOpts, Data), SocketData#socket_state{socket = TLSSocket, sockmod = tls}. compress(SocketData) -> - {ok, ZlibSocket} = ejabberd_zlib:enable_zlib( - SocketData#socket_state.sockmod, - SocketData#socket_state.socket), - ejabberd_receiver:compress(SocketData#socket_state.receiver, ZlibSocket), - SocketData#socket_state{socket = ZlibSocket, sockmod = ejabberd_zlib}. + compress(SocketData, undefined). compress(SocketData, Data) -> - {ok, ZlibSocket} = ejabberd_zlib:enable_zlib( - SocketData#socket_state.sockmod, - SocketData#socket_state.socket), - ejabberd_receiver:compress(SocketData#socket_state.receiver, ZlibSocket), - send(SocketData, Data), + {ok, ZlibSocket} = ejabberd_receiver:compress( + SocketData#socket_state.receiver, Data), SocketData#socket_state{socket = ZlibSocket, sockmod = ejabberd_zlib}. reset_stream(SocketData) when is_pid(SocketData#socket_state.receiver) -> @@ -160,10 +151,25 @@ reset_stream(SocketData) when is_atom(SocketData#socket_state.receiver) -> (SocketData#socket_state.receiver):reset_stream( SocketData#socket_state.socket). +change_controller(#socket_state{receiver = Recv}, Pid) when is_pid(Recv) -> + ejabberd_receiver:setopts(Recv, [{active, false}]), + sync_events(Pid), + ejabberd_receiver:change_controller(Recv, Pid); +change_controller(#socket_state{socket = Socket, receiver = Mod}, Pid) -> + Mod:setopts(Socket, [{active, false}]), + sync_events(Pid), + Mod:change_controller(Socket, Pid). + %% sockmod=gen_tcp|tls|ejabberd_zlib send(SocketData, Data) -> - case catch (SocketData#socket_state.sockmod):send( - SocketData#socket_state.socket, Data) of + Res = if node(SocketData#socket_state.receiver) == node() -> + catch (SocketData#socket_state.sockmod):send( + SocketData#socket_state.socket, Data); + true -> + catch ejabberd_receiver:send( + SocketData#socket_state.receiver, Data) + end, + case Res of ok -> ok; {error, timeout} -> ?INFO_MSG("Timeout on ~p:send",[SocketData#socket_state.sockmod]), @@ -225,3 +231,21 @@ peername(#socket_state{sockmod = SockMod, socket = Socket}) -> %%==================================================================== %% Internal functions %%==================================================================== +%% dirty hack to relay queued messages from +%% old owner to new owner. The idea is based +%% on code of gen_tcp:controlling_process/2. +sync_events(C2SPid) -> + receive + {'$gen_event', El} = Event when element(1, El) == xmlel; + element(1, El) == xmlstreamstart; + element(1, El) == xmlstreamelement; + element(1, El) == xmlstreamend; + element(1, El) == xmlstreamerror -> + C2SPid ! Event, + sync_events(C2SPid); + closed -> + C2SPid ! closed, + sync_events(C2SPid) + after 0 -> + ok + end. diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl index b3a1a5ea1..1fc193e5d 100644 --- a/src/ejabberd_sup.erl +++ b/src/ejabberd_sup.erl @@ -42,13 +42,6 @@ init([]) -> brutal_kill, worker, [ejabberd_hooks]}, - NodeGroups = - {ejabberd_node_groups, - {ejabberd_node_groups, start_link, []}, - permanent, - brutal_kill, - worker, - [ejabberd_node_groups]}, SystemMonitor = {ejabberd_system_monitor, {ejabberd_system_monitor, start_link, []}, @@ -184,9 +177,16 @@ init([]) -> infinity, supervisor, [ejabberd_tmp_sup]}, + Cluster = + {ejabberd_cluster, + {ejabberd_cluster, start_link, []}, + permanent, + brutal_kill, + worker, + [ejabberd_cluster]}, {ok, {{one_for_one, 10, 1}, [Hooks, - NodeGroups, + Cluster, SystemMonitor, Router, Router_multicast, diff --git a/src/mod_muc/mod_muc.erl b/src/mod_muc/mod_muc.erl index c75c3e8cb..b02d8b8a4 100644 --- a/src/mod_muc/mod_muc.erl +++ b/src/mod_muc/mod_muc.erl @@ -41,6 +41,9 @@ create_room/5, process_iq_disco_items/4, broadcast_service_message/2, + register_room/3, + migrate/1, + get_vh_rooms/1, can_use_nick/3]). %% gen_server callbacks @@ -112,7 +115,9 @@ room_destroyed(Host, Room, Pid, ServerHost) when is_binary(Host), %% Else use the passed options as defined in mod_muc_room. create_room(Host, Name, From, Nick, Opts) -> Proc = gen_mod:get_module_proc(Host, ?PROCNAME), - gen_server:call(Proc, {create, Name, From, Nick, Opts}). + RoomHost = gen_mod:get_module_opt_host(Host, ?MODULE, "conference.@HOST@"), + Node = ejabberd_cluster:get_node({Name, RoomHost}), + gen_server:call({Proc, Node}, {create, Name, From, Nick, Opts}). store_room(Host, Name, Opts) when is_binary(Host), is_binary(Name) -> F = fun() -> @@ -163,6 +168,22 @@ can_use_nick(Host, JID, Nick) when is_binary(Host), is_binary(Nick) -> U == LUS end. +migrate(After) -> + Rs = mnesia:dirty_select( + muc_online_room, + [{#muc_online_room{name_host = '$1', pid = '$2', _ = '_'}, + [], + ['$$']}]), + lists:foreach( + fun([NameHost, Pid]) -> + case ejabberd_cluster:get_node_new(NameHost) of + Node when Node /= node() -> + mod_muc_room:migrate(Pid, Node, After); + _ -> + ok + end + end, Rs). + %%==================================================================== %% gen_server callbacks %%==================================================================== @@ -175,6 +196,7 @@ can_use_nick(Host, JID, Nick) when is_binary(Host), is_binary(Nick) -> %% Description: Initiates the server %%-------------------------------------------------------------------- init([Host, Opts]) -> + update_muc_online_table(), mnesia:create_table(muc_room, [{disc_copies, [node()]}, {attributes, record_info(fields, muc_room)}]), @@ -183,15 +205,14 @@ init([Host, Opts]) -> {attributes, record_info(fields, muc_registered)}]), mnesia:create_table(muc_online_room, [{ram_copies, [node()]}, + {local_content, true}, {attributes, record_info(fields, muc_online_room)}]), mnesia:add_table_copy(muc_online_room, node(), ram_copies), catch ets:new(muc_online_users, [bag, named_table, public, {keypos, 2}]), MyHost_L = gen_mod:get_opt_host(Host, Opts, "conference.@HOST@"), MyHost = list_to_binary(MyHost_L), update_tables(MyHost), - clean_table_from_bad_node(node(), MyHost), mnesia:add_table_index(muc_registered, nick), - mnesia:subscribe(system), Access = gen_mod:get_opt(access, Opts, all), AccessCreate = gen_mod:get_opt(access_create, Opts, all), AccessAdmin = gen_mod:get_opt(access_admin, Opts, none), @@ -200,6 +221,7 @@ init([Host, Opts]) -> DefRoomOpts = gen_mod:get_opt(default_room_options, Opts, []), RoomShaper = gen_mod:get_opt(room_shaper, Opts, none), ejabberd_router:register_route(MyHost_L), + ejabberd_hooks:add(node_hash_update, ?MODULE, migrate, 100), load_permanent_rooms(MyHost, Host, {Access, AccessCreate, AccessAdmin, AccessPersistent}, HistorySize, @@ -266,12 +288,19 @@ handle_info({route, From, To, Packet}, default_room_opts = DefRoomOpts, history_size = HistorySize, room_shaper = RoomShaper} = State) -> - case catch do_route(Host, ServerHost, Access, HistorySize, RoomShaper, - From, To, Packet, DefRoomOpts) of - {'EXIT', Reason} -> - ?ERROR_MSG("~p", [Reason]); - _ -> - ok + US = {exmpp_jid:prep_node(To), exmpp_jid:prep_domain(To)}, + case ejabberd_cluster:get_node(US) of + Node when Node == node() -> + case catch do_route(Host, ServerHost, Access, HistorySize, + RoomShaper, From, To, Packet, DefRoomOpts) of + {'EXIT', Reason} -> + ?ERROR_MSG("~p", [Reason]); + _ -> + ok + end; + Node -> + Proc = gen_mod:get_module_proc(ServerHost, ?PROCNAME), + {Proc, Node} ! {route, From, To, Packet} end, {noreply, State}; handle_info({room_destroyed, RoomHost, Pid}, State) -> @@ -279,10 +308,7 @@ handle_info({room_destroyed, RoomHost, Pid}, State) -> mnesia:delete_object(#muc_online_room{name_host = RoomHost, pid = Pid}) end, - mnesia:transaction(F), - {noreply, State}; -handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> - clean_table_from_bad_node(Node), + mnesia:async_dirty(F), {noreply, State}; handle_info(_Info, State) -> {noreply, State}. @@ -295,6 +321,7 @@ handle_info(_Info, State) -> %% The return value is ignored. %%-------------------------------------------------------------------- terminate(_Reason, State) -> + ejabberd_hooks:delete(node_hash_update, ?MODULE, migrate, 100), ejabberd_router:unregister_route(binary_to_list(State#state.host)), ok. @@ -516,17 +543,22 @@ load_permanent_rooms(Host, ServerHost, Access, HistorySize, RoomShaper) -> lists:foreach( fun(R) -> {Room, Host} = R#muc_room.name_host, - case mnesia:dirty_read(muc_online_room, {Room, Host}) of - [] -> - {ok, Pid} = mod_muc_room:start( - Host, - ServerHost, - Access, - Room, - HistorySize, - RoomShaper, - R#muc_room.opts), - register_room(Host, Room, Pid); + case ejabberd_cluster:get_node({Room, Host}) of + Node when Node == node() -> + case mnesia:dirty_read(muc_online_room, {Room, Host}) of + [] -> + {ok, Pid} = mod_muc_room:start( + Host, + ServerHost, + Access, + Room, + HistorySize, + RoomShaper, + R#muc_room.opts), + register_room(Host, Room, Pid); + _ -> + ok + end; _ -> ok end @@ -555,7 +587,7 @@ register_room(Host, Room, Pid) when is_binary(Host), is_binary(Room) -> mnesia:write(#muc_online_room{name_host = {Room, Host}, pid = Pid}) end, - mnesia:transaction(F). + mnesia:async_dirty(F). iq_disco_info(Lang) -> @@ -605,7 +637,7 @@ iq_disco_items(Host, From, Lang, none) when is_binary(Host) -> _ -> false end - end, get_vh_rooms(Host)); + end, get_vh_rooms_all_nodes(Host)); iq_disco_items(Host, From, Lang, Rsm) -> {Rooms, RsmO} = get_vh_rooms(Host, Rsm), @@ -639,19 +671,9 @@ iq_disco_items(Host, From, Lang, Rsm) -> end, Rooms) ++ RsmOut. get_vh_rooms(Host, #rsm_in{max=M, direction=Direction, id=I, index=Index})-> - AllRooms = lists:sort(get_vh_rooms(Host)), + AllRooms = get_vh_rooms_all_nodes(Host), Count = erlang:length(AllRooms), - Guard = case Direction of - _ when Index =/= undefined -> [{'==', {element, 2, '$1'}, Host}]; - aft -> [{'==', {element, 2, '$1'}, Host}, {'>=',{element, 1, '$1'} ,I}]; - before when I =/= []-> [{'==', {element, 2, '$1'}, Host}, {'=<',{element, 1, '$1'} ,I}]; - _ -> [{'==', {element, 2, '$1'}, Host}] - end, - L = lists:sort( - mnesia:dirty_select(muc_online_room, - [{#muc_online_room{name_host = '$1', _ = '_'}, - Guard, - ['$_']}])), + L = get_vh_rooms_direction(Direction, I, Index, AllRooms), L2 = if Index == undefined andalso Direction == before -> lists:reverse(lists:sublist(lists:reverse(L), 1, M)); @@ -674,6 +696,27 @@ get_vh_rooms(Host, #rsm_in{max=M, direction=Direction, id=I, index=Index})-> {L2, #rsm_out{first=F, last=Last, count=Count, index=NewIndex}} end. +get_vh_rooms_direction(_Direction, _I, Index, AllRooms) when Index =/= undefined -> + AllRooms; +get_vh_rooms_direction(aft, I, _Index, AllRooms) -> + {_Before, After} = + lists:splitwith( + fun(#muc_online_room{name_host = {Na, _}}) -> + Na < I end, AllRooms), + case After of + [] -> []; + [#muc_online_room{name_host = {I, _Host}} | AfterTail] -> AfterTail; + _ -> After + end; +get_vh_rooms_direction(before, I, _Index, AllRooms) when I =/= []-> + {Before, _} = + lists:splitwith( + fun(#muc_online_room{name_host = {Na, _}}) -> + Na < I end, AllRooms), + Before; +get_vh_rooms_direction(_Direction, _I, _Index, AllRooms) -> + AllRooms. + %% @doc Return the position of desired room in the list of rooms. %% The room must exist in the list. The count starts in 0. %% @spec (Desired::muc_online_room(), Rooms::[muc_online_room()]) -> integer() @@ -847,7 +890,22 @@ broadcast_service_message(Host, Msg) -> fun(#muc_online_room{pid = Pid}) -> gen_fsm:send_all_state_event( Pid, {service_message, Msg}) - end, get_vh_rooms(Host)). + end, get_vh_rooms_all_nodes(Host)). + +get_vh_rooms_all_nodes(Host) -> + Rooms = lists:foldl( + fun(Node, Acc) when Node == node() -> + get_vh_rooms(Host) ++ Acc; + (Node, Acc) -> + case catch rpc:call(Node, ?MODULE, get_vh_rooms, + [Host], 5000) of + Res when is_list(Res) -> + Res ++ Acc; + _ -> + Acc + end + end, [], ejabberd_cluster:get_nodes()), + lists:ukeysort(#muc_online_room.name_host, Rooms). get_vh_rooms(Host) when is_binary(Host) -> mnesia:dirty_select(muc_online_room, @@ -855,39 +913,18 @@ get_vh_rooms(Host) when is_binary(Host) -> [{'==', {element, 2, '$1'}, Host}], ['$_']}]). - -clean_table_from_bad_node(Node) -> - F = fun() -> - Es = mnesia:select( - muc_online_room, - [{#muc_online_room{pid = '$1', _ = '_'}, - [{'==', {node, '$1'}, Node}], - ['$_']}]), - lists:foreach(fun(E) -> - mnesia:delete_object(E) - end, Es) - end, - mnesia:async_dirty(F). - -clean_table_from_bad_node(Node, Host) -> - F = fun() -> - Es = mnesia:select( - muc_online_room, - [{#muc_online_room{pid = '$1', - name_host = {'_', Host}, - _ = '_'}, - [{'==', {node, '$1'}, Node}], - ['$_']}]), - lists:foreach(fun(E) -> - mnesia:delete_object(E) - end, Es) - end, - mnesia:async_dirty(F). - update_tables(Host) -> update_muc_room_table(Host), update_muc_registered_table(Host). +update_muc_online_table() -> + case catch mnesia:table_info(muc_online_room, local_content) of + false -> + mnesia:delete_table(muc_online_room); + _ -> + ok + end. + update_muc_room_table(Host) -> Fields = record_info(fields, muc_room), case mnesia:table_info(muc_room, attributes) of diff --git a/src/mod_muc/mod_muc_log.erl b/src/mod_muc/mod_muc_log.erl index 71d07ee27..1b5175ec0 100644 --- a/src/mod_muc/mod_muc_log.erl +++ b/src/mod_muc/mod_muc_log.erl @@ -74,11 +74,11 @@ %% Description: Starts the server %%-------------------------------------------------------------------- start_link(Host, Opts) -> - Proc = gen_mod:get_module_proc(Host, ?PROCNAME), - gen_server:start_link({local, Proc}, ?MODULE, [Host, Opts], []). + Proc = get_proc_name(Host), + gen_server:start_link(Proc, ?MODULE, [Host, Opts], []). start(Host, Opts) -> - Proc = gen_mod:get_module_proc(Host, ?PROCNAME), + Proc = get_proc_name(Host), ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]}, @@ -89,7 +89,7 @@ start(Host, Opts) -> supervisor:start_child(ejabberd_sup, ChildSpec). stop(Host) -> - Proc = gen_mod:get_module_proc(Host, ?PROCNAME), + Proc = get_proc_name(Host), gen_server:call(Proc, stop), supervisor:delete_child(ejabberd_sup, Proc). @@ -955,7 +955,8 @@ get_room_state(RoomPid) -> {ok, R} = gen_fsm:sync_send_all_state_event(RoomPid, get_state), R. -get_proc_name(Host) -> gen_mod:get_module_proc(Host, ?PROCNAME). +get_proc_name(Host) -> + {global, gen_mod:get_module_proc(Host, ?PROCNAME)}. calc_hour_offset(TimeHere) -> TimeZero = calendar:now_to_universal_time(now()), diff --git a/src/mod_muc/mod_muc_room.erl b/src/mod_muc/mod_muc_room.erl index 8d2229e99..dccbd1a97 100644 --- a/src/mod_muc/mod_muc_room.erl +++ b/src/mod_muc/mod_muc_room.erl @@ -27,14 +27,17 @@ -module(mod_muc_room). -author('alexey@process-one.net'). --behaviour(gen_fsm). +-define(GEN_FSM, p1_fsm). %% External exports -export([start_link/9, start_link/7, + start_link/2, start/9, start/7, + start/2, + migrate/3, route/4]). %% gen_fsm callbacks @@ -44,6 +47,7 @@ handle_sync_event/4, handle_info/3, terminate/3, + print_state/1, code_change/4]). -include_lib("exmpp/include/exmpp.hrl"). @@ -65,19 +69,14 @@ %% Module start with or without supervisor: -ifdef(NO_TRANSIENT_SUPERVISORS). --define(SUPERVISOR_START, - gen_fsm:start(?MODULE, [Host, ServerHost, Access, Room, HistorySize, - RoomShaper, Creator, Nick, DefRoomOpts], - ?FSMOPTS)). +-define(SUPERVISOR_START(Args), + ?GEN_FSM:start(?MODULE, Args, ?FSMOPTS)). -else. --define(SUPERVISOR_START, +-define(SUPERVISOR_START(Args), Supervisor = gen_mod:get_module_proc(ServerHost, ejabberd_mod_muc_sup), - supervisor:start_child( - Supervisor, [Host, ServerHost, Access, Room, HistorySize, RoomShaper, - Creator, Nick, DefRoomOpts])). + supervisor:start_child(Supervisor, Args)). -endif. - -define(ERR(Packet,Type, Lang, ErrText), exmpp_stanza:error(Packet#xmlel.ns, Type, @@ -88,7 +87,8 @@ %%%---------------------------------------------------------------------- start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Creator, Nick, DefRoomOpts) -> - ?SUPERVISOR_START. + ?SUPERVISOR_START([Host, ServerHost, Access, Room, HistorySize, + RoomShaper, Creator, Nick, DefRoomOpts]). start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts) -> Supervisor = gen_mod:get_module_proc(ServerHost, ejabberd_mod_muc_sup), @@ -96,16 +96,26 @@ start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts) -> Supervisor, [Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts]). +start(StateName, StateData) -> + ServerHost = StateData#state.server_host, + ?SUPERVISOR_START([StateName, StateData]). + start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Creator, Nick, DefRoomOpts) -> - gen_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize, - RoomShaper, Creator, Nick, DefRoomOpts], - ?FSMOPTS). + ?GEN_FSM:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize, + RoomShaper, Creator, Nick, DefRoomOpts], + ?FSMOPTS). start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts) -> - gen_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize, - RoomShaper, Opts], - ?FSMOPTS). + ?GEN_FSM:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize, + RoomShaper, Opts], + ?FSMOPTS). + +start_link(StateName, StateData) -> + ?GEN_FSM:start_link(?MODULE, [StateName, StateData], ?FSMOPTS). + +migrate(FsmRef, Node, After) -> + ?GEN_FSM:send_all_state_event(FsmRef, {migrate, Node, After}). %%%---------------------------------------------------------------------- %%% Callback functions from gen_fsm @@ -147,7 +157,11 @@ init([Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts]) -> jid = exmpp_jid:make(Room, Host), room_shaper = Shaper}), add_to_log(room_existence, started, State), - {ok, normal_state, State}. + {ok, normal_state, State}; +init([StateName, #state{room = Room, host = Host} = StateData]) -> + process_flag(trap_exit, true), + mod_muc:register_room(Host, Room, self()), + {ok, StateName, StateData}. %%---------------------------------------------------------------------- %% Func: StateName/2 @@ -600,6 +614,9 @@ handle_event(destroy, StateName, StateData) -> handle_event({set_affiliations, Affiliations}, StateName, StateData) -> {next_state, StateName, StateData#state{affiliations = Affiliations}}; +handle_event({migrate, Node, After}, StateName, StateData) when Node /= node() -> + {migrate, StateData, + {Node, ?MODULE, start, [StateName, StateData]}, After * 2}; handle_event(_Event, StateName, StateData) -> {next_state, StateName, StateData}. @@ -644,6 +661,9 @@ handle_sync_event(_Event, _From, StateName, StateData) -> code_change(_OldVsn, StateName, StateData, _Extra) -> {ok, StateName, StateData}. +print_state(StateData) -> + StateData. + %%---------------------------------------------------------------------- %% Func: handle_info/3 %% Returns: {next_state, NextStateName, NextStateData} | @@ -733,6 +753,13 @@ handle_info(_Info, StateName, StateData) -> %% Purpose: Shutdown the fsm %% Returns: any %%---------------------------------------------------------------------- +terminate({migrated, Clone}, _StateName, StateData) -> + ?INFO_MSG("Migrating room ~s@~s to ~p on node ~p", + [StateData#state.room, StateData#state.host, + Clone, node(Clone)]), + mod_muc:room_destroyed(StateData#state.host, StateData#state.room, + self(), StateData#state.server_host), + ok; terminate(Reason, _StateName, StateData) -> ?INFO_MSG("Stopping MUC room ~s@~s", [StateData#state.room, StateData#state.host]), @@ -778,7 +805,7 @@ terminate(Reason, _StateName, StateData) -> %%%---------------------------------------------------------------------- route(Pid, From, ToNick, Packet) -> - gen_fsm:send_event(Pid, {route, From, ToNick, Packet}). + ?GEN_FSM:send_event(Pid, {route, From, ToNick, Packet}). process_groupchat_message(From, #xmlel{name = 'message'} = Packet, StateData) -> @@ -1673,13 +1700,15 @@ add_new_user(From, Nick, Packet, StateData) -> From, Err), StateData; captcha_required -> - ID = randoms:get_string(), - SID = case exmpp_stanza:get_id(Packet) of undefined -> ""; SID1 -> SID1 end, + SID = case exmpp_stanza:get_id(Packet) of + undefined -> ""; + SID1 -> SID1 + end, RoomJID = StateData#state.jid, To = jid_replace_resource(RoomJID, Nick), case ejabberd_captcha:create_captcha( - ID, SID, RoomJID, To, Lang, From) of - {ok, CaptchaEls} -> + SID, RoomJID, To, Lang, From) of + {ok, ID, CaptchaEls} -> MsgPkt = #xmlel{name = 'message', attrs = [#xmlattr{name = 'id', value = ID}], children = CaptchaEls}, diff --git a/src/mod_proxy65/mod_proxy65_sm.erl b/src/mod_proxy65/mod_proxy65_sm.erl index 569458f6a..bdd1b5bb8 100644 --- a/src/mod_proxy65/mod_proxy65_sm.erl +++ b/src/mod_proxy65/mod_proxy65_sm.erl @@ -71,7 +71,9 @@ start_link(Host, Opts) -> gen_server:start_link({local, Proc}, ?MODULE, [Opts], []). init([Opts]) -> + update_tables(), mnesia:create_table(bytestream, [{ram_copies, [node()]}, + {local_content, true}, {attributes, record_info(fields, bytestream)}]), mnesia:add_table_copy(bytestream, node(), ram_copies), MaxConnections = gen_mod:get_opt(max_connections, Opts, infinity), @@ -179,3 +181,11 @@ activate_stream(SHA1, IJid, TJid, Host) when is_list(SHA1) -> _ -> error end. + +update_tables() -> + case catch mnesia:table_info(bytestream, local_content) of + false -> + mnesia:delete_table(bytestream); + _ -> + ok + end. diff --git a/src/p1_fsm.erl b/src/p1_fsm.erl index 03ff7f8ce..9ca924112 100644 --- a/src/p1_fsm.erl +++ b/src/p1_fsm.erl @@ -517,6 +517,25 @@ print_event(Dev, return, {Name, StateName}) -> io:format(Dev, "*DBG* ~p switched to state ~w~n", [Name, StateName]). +relay_messages(MRef, TRef, Clone, Queue) -> + lists:foreach( + fun(Msg) -> Clone ! Msg end, + queue:to_list(Queue)), + relay_messages(MRef, TRef, Clone). + +relay_messages(MRef, TRef, Clone) -> + receive + {'DOWN', MRef, process, Clone, Reason} -> + Reason; + {'EXIT', _Parent, _Reason} -> + {migrated, Clone}; + {timeout, TRef, timeout} -> + {migrated, Clone}; + Msg -> + Clone ! Msg, + relay_messages(MRef, TRef, Clone) + end. + handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time, Limits, Queue, QueueLen) -> %No debug here From = from(Msg), @@ -535,6 +554,23 @@ handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time, reply(From, Reply), loop(Parent, Name, NStateName, NStateData, Mod, Time1, [], Limits, Queue, QueueLen); + {migrate, NStateData, {Node, M, F, A}, Time1} -> + Reason = case catch rpc:call(Node, M, F, A, 5000) of + {badrpc, _} = Err -> + {migration_error, Err}; + {'EXIT', _} = Err -> + {migration_error, Err}; + {error, _} = Err -> + {migration_error, Err}; + {ok, Clone} -> + process_flag(trap_exit, true), + MRef = erlang:monitor(process, Clone), + TRef = erlang:start_timer(Time1, self(), timeout), + relay_messages(MRef, TRef, Clone, Queue); + Reply -> + {migration_error, {bad_reply, Reply}} + end, + terminate(Reason, Name, Msg, Mod, StateName, NStateData, []); {stop, Reason, NStateData} -> terminate(Reason, Name, Msg, Mod, StateName, NStateData, []); {stop, Reason, Reply, NStateData} when From =/= undefined -> @@ -571,6 +607,23 @@ handle_msg(Msg, Parent, Name, StateName, StateData, Debug1 = reply(Name, From, Reply, Debug, NStateName), loop(Parent, Name, NStateName, NStateData, Mod, Time1, Debug1, Limits, Queue, QueueLen); + {migrate, NStateData, {Node, M, F, A}, Time1} -> + Reason = case catch rpc:call(Node, M, F, A, Time1) of + {badrpc, R} -> + {migration_error, R}; + {'EXIT', R} -> + {migration_error, R}; + {error, R} -> + {migration_error, R}; + {ok, Clone} -> + process_flag(trap_exit, true), + MRef = erlang:monitor(process, Clone), + TRef = erlang:start_timer(Time1, self(), timeout), + relay_messages(MRef, TRef, Clone, Queue); + Reply -> + {migration_error, {bad_reply, Reply}} + end, + terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug); {stop, Reason, NStateData} -> terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug); {stop, Reason, Reply, NStateData} when From =/= undefined -> @@ -633,12 +686,10 @@ terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug) -> %% Priority shutdown should be considered as %% shutdown by SASL exit(shutdown); - {process_limit, Limit} -> - %% Priority shutdown should be considered as - %% shutdown by SASL - error_logger:error_msg("FSM limit reached (~p): ~p~n", - [self(), Limit]), - exit(shutdown); + {process_limit, _Limit} -> + exit(Reason); + {migrated, _Clone} -> + exit(normal); _ -> error_info(Mod, Reason, Name, Msg, StateName, StateData, Debug), exit(Reason) @@ -705,7 +756,12 @@ get_msg(Msg) -> Msg. format_status(Opt, StatusData) -> [PDict, SysState, Parent, Debug, [Name, StateName, StateData, Mod, _Time]] = StatusData, - Header = lists:concat(["Status for state machine ", Name]), + NameTag = if is_pid(Name) -> + pid_to_list(Name); + is_atom(Name) -> + Name + end, + Header = lists:concat(["Status for state machine ", NameTag]), Log = sys:get_debug(log, Debug, []), Specfic = case erlang:function_exported(Mod, format_status, 2) of diff --git a/src/web/ejabberd_http_bind.erl b/src/web/ejabberd_http_bind.erl index bef9c268d..d25c7e060 100644 --- a/src/web/ejabberd_http_bind.erl +++ b/src/web/ejabberd_http_bind.erl @@ -27,6 +27,7 @@ setopts/2, controlling_process/2, become_controller/2, + change_controller/2, custom_receiver/1, reset_stream/1, change_shaper/2, @@ -114,9 +115,19 @@ start(XMPPDomain, Sid, Key, IP) -> ?DEBUG("Starting session", []), case catch supervisor:start_child(ejabberd_http_bind_sup, [Sid, Key, IP]) of - {ok, Pid} -> {ok, Pid}; - _ -> check_bind_module(XMPPDomain), - {error, "Cannot start HTTP bind session"} + {ok, Pid} -> + {ok, Pid}; + {error, _} = Err -> + case check_bind_module(XMPPDomain) of + false -> + {error, "Cannot start HTTP bind session"}; + true -> + ?ERROR_MSG("Cannot start HTTP bind session: ~p", [Err]), + Err + end; + Exit -> + ?ERROR_MSG("Cannot start HTTP bind session: ~p", [Exit]), + {error, Exit} end. start_link(Sid, Key, IP) -> @@ -133,7 +144,13 @@ setopts({http_bind, FsmRef, _IP}, Opts) -> true -> gen_fsm:send_all_state_event(FsmRef, {activate, self()}); _ -> - ok + case lists:member({active, false}, Opts) of + true -> + gen_fsm:sync_send_all_state_event( + FsmRef, deactivate_socket); + _ -> + ok + end end. controlling_process(_Socket, _Pid) -> @@ -145,6 +162,9 @@ custom_receiver({http_bind, FsmRef, _IP}) -> become_controller(FsmRef, C2SPid) -> gen_fsm:send_all_state_event(FsmRef, {become_controller, C2SPid}). +change_controller({http_bind, FsmRef, _IP}, C2SPid) -> + become_controller(FsmRef, C2SPid). + reset_stream({http_bind, _FsmRef, _IP}) -> ok. @@ -185,12 +205,13 @@ process_request(Data, IP) -> "xmlns='" ++ ?NS_HTTP_BIND_s ++ "'/>"}; XmppDomain -> %% create new session - Sid = sha:sha(term_to_binary({now(), make_ref()})), + Sid = make_sid(), case start(XmppDomain, Sid, "", IP) of {error, _} -> - {200, ?HEADER, "BOSH module not started"}; + "xmlns='" ++ ?NS_HTTP_BIND_s ++ + "'>Internal Server Error"}; {ok, Pid} -> handle_session_start( Pid, XmppDomain, Sid, Rid, Attrs, @@ -216,10 +237,10 @@ process_request(Data, IP) -> handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize, StreamStart, IP); {size_limit, Sid} -> - case mnesia:dirty_read({http_bind, Sid}) of - [] -> + case get_session(Sid) of + {error, _} -> {404, ?HEADER, ""}; - [#http_bind{pid = FsmRef}] -> + {ok, #http_bind{pid = FsmRef}} -> gen_fsm:sync_send_all_state_event(FsmRef, {stop, close}), {200, ?HEADER, " mnesia:write( #http_bind{id = Sid, @@ -320,6 +341,7 @@ init([Sid, Key, IP]) -> %% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- handle_event({become_controller, C2SPid}, StateName, StateData) -> + erlang:monitor(process, C2SPid), case StateData#state.input of cancel -> {next_state, StateName, StateData#state{ @@ -384,6 +406,14 @@ handle_sync_event({stop,close}, _From, _StateName, StateData) -> handle_sync_event({stop,stream_closed}, _From, _StateName, StateData) -> Reply = ok, {stop, normal, Reply, StateData}; +handle_sync_event(deactivate_socket, _From, StateName, StateData) -> + %% Input = case StateData#state.input of + %% cancel -> + %% queue:new(); + %% Q -> + %% Q + %% end, + {reply, ok, StateName, StateData#state{waiting_input = false}}; handle_sync_event({stop,Reason}, _From, _StateName, StateData) -> ?DEBUG("Closing bind session ~p - Reason: ~p", [StateData#state.id, Reason]), Reply = ok, @@ -517,6 +547,9 @@ handle_info({timeout, ShaperTimer, _}, StateName, #state{shaper_timer = ShaperTimer} = StateData) -> {next_state, StateName, StateData#state{shaper_timer = undefined}}; +handle_info({'DOWN', _MRef, process, C2SPid, _}, _StateName, + #state{waiting_input = C2SPid} = StateData) -> + {stop, normal, StateData}; handle_info(_, StateName, StateData) -> {next_state, StateName, StateData}. @@ -788,10 +821,10 @@ handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) -> http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) -> ?DEBUG("Looking for session: ~p", [Sid]), - case mnesia:dirty_read({http_bind, Sid}) of - [] -> + case get_session(Sid) of + {error, _} -> {error, not_exists}; - [#http_bind{pid = FsmRef, hold=Hold, to={To, StreamVersion}}=Sess] -> + {ok, #http_bind{pid = FsmRef, hold=Hold, to={To, StreamVersion}}=Sess}-> NewStream = case StreamStart of true -> @@ -1305,7 +1338,36 @@ check_default_xmlns(#xmlel{name = Name, ns = Xmlns, attrs = Attrs, children = El %% Print a warning in log file if this is not the case. check_bind_module(XmppDomain) -> case gen_mod:is_loaded(XmppDomain, mod_http_bind) of - true -> ok; + true -> true; false -> ?ERROR_MSG("You are trying to use BOSH (HTTP Bind), but the module mod_http_bind is not started.~n" - "Check your 'modules' section in your ejabberd configuration file.",[]) + "Check your 'modules' section in your ejabberd configuration file.",[]), + false + end. + +make_sid() -> + sha:sha(term_to_binary({now(), make_ref()})) + ++ "-" ++ ejabberd_cluster:node_id(). + +get_session(SID) -> + case string:tokens(SID, "-") of + [_, NodeID] -> + case ejabberd_cluster:get_node_by_id(NodeID) of + Node when Node == node() -> + case mnesia:dirty_read({http_bind, SID}) of + [] -> + {error, enoent}; + [Session] -> + {ok, Session} + end; + Node -> + case catch rpc:call(Node, mnesia, dirty_read, + [{http_bind, SID}], 5000) of + [Session] -> + {ok, Session}; + _ -> + {error, enoent} + end + end; + _ -> + {error, enoent} end. diff --git a/src/web/ejabberd_http_poll.erl b/src/web/ejabberd_http_poll.erl index e1af0ad82..66685b413 100644 --- a/src/web/ejabberd_http_poll.erl +++ b/src/web/ejabberd_http_poll.erl @@ -83,9 +83,12 @@ %%% API %%%---------------------------------------------------------------------- start(ID, Key, IP) -> + update_tables(), mnesia:create_table(http_poll, [{ram_copies, [node()]}, + {local_content, true}, {attributes, record_info(fields, http_poll)}]), + mnesia:add_table_copy(http_poll, node(), ram_copies), supervisor:start_child(ejabberd_http_poll_sup, [ID, Key, IP]). start_link(ID, Key, IP) -> @@ -121,9 +124,9 @@ process([], #request{data = Data, {ok, ID1, Key, NewKey, Packet} -> ID = if (ID1 == "0") or (ID1 == "mobile") -> - NewID = sha:sha(term_to_binary({now(), make_ref()})), + NewID = make_sid(), {ok, Pid} = start(NewID, "", IP), - mnesia:transaction( + mnesia:async_dirty( fun() -> mnesia:write(#http_poll{id = NewID, pid = Pid}) @@ -358,7 +361,7 @@ handle_info(_, StateName, StateData) -> %% Returns: any %%---------------------------------------------------------------------- terminate(_Reason, _StateName, StateData) -> - mnesia:transaction( + mnesia:async_dirty( fun() -> mnesia:delete({http_poll, StateData#state.id}) end), @@ -383,19 +386,19 @@ terminate(_Reason, _StateName, StateData) -> %%%---------------------------------------------------------------------- http_put(ID, Key, NewKey, Packet) -> - case mnesia:dirty_read({http_poll, ID}) of - [] -> + case get_session(ID) of + {error, _} -> {error, not_exists}; - [#http_poll{pid = FsmRef}] -> + {ok, #http_poll{pid = FsmRef}} -> gen_fsm:sync_send_all_state_event( FsmRef, {http_put, Key, NewKey, Packet}) end. http_get(ID) -> - case mnesia:dirty_read({http_poll, ID}) of - [] -> + case get_session(ID) of + {error, _} -> {error, not_exists}; - [#http_poll{pid = FsmRef}] -> + {ok, #http_poll{pid = FsmRef}} -> gen_fsm:sync_send_all_state_event(FsmRef, http_get) end. @@ -461,3 +464,39 @@ get_jid("to", ParsedPacket) -> From -> exmpp_jid:parse(From) end. + +update_tables() -> + case catch mnesia:table_info(http_poll, local_content) of + false -> + mnesia:delete_table(http_poll); + _ -> + ok + end. + +make_sid() -> + sha:sha(term_to_binary({now(), make_ref()})) + ++ "-" ++ ejabberd_cluster:node_id(). + +get_session(SID) -> + case string:tokens(SID, "-") of + [_, NodeID] -> + case ejabberd_cluster:get_node_by_id(NodeID) of + Node when Node == node() -> + case mnesia:dirty_read({http_poll, SID}) of + [] -> + {error, enoent}; + [Session] -> + {ok, Session} + end; + Node -> + case catch rpc:call(Node, mnesia, dirty_read, + [{http_poll, SID}], 5000) of + [Session] -> + {ok, Session}; + _ -> + {error, enoent} + end + end; + _ -> + {error, enoent} + end. diff --git a/src/web/mod_http_bind.erl b/src/web/mod_http_bind.erl index a5ff62fa6..0c422b812 100644 --- a/src/web/mod_http_bind.erl +++ b/src/web/mod_http_bind.erl @@ -137,7 +137,9 @@ setup_database() -> migrate_database(), mnesia:create_table(http_bind, [{ram_copies, [node()]}, - {attributes, record_info(fields, http_bind)}]). + {local_content, true}, + {attributes, record_info(fields, http_bind)}]), + mnesia:add_table_copy(http_bind, node(), ram_copies). migrate_database() -> case catch mnesia:table_info(http_bind, attributes) of @@ -147,4 +149,10 @@ migrate_database() -> %% Since the stored information is not important, instead %% of actually migrating data, let's just destroy the table mnesia:delete_table(http_bind) + end, + case catch mnesia:table_info(http_bind, local_content) of + false -> + mnesia:delete_table(http_bind); + _ -> + ok end.