From 86e03f3828445ab6b10c730135d5e8fb89974494 Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Mon, 9 Mar 2015 14:57:33 +0300 Subject: [PATCH] Separate db backend from the SM --- src/ejabberd_sm.erl | 251 +++++++++++-------------------------- src/ejabberd_sm_mnesia.erl | 148 ++++++++++++++++++++++ 2 files changed, 223 insertions(+), 176 deletions(-) create mode 100644 src/ejabberd_sm_mnesia.erl diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index b3a46ba2d..9925d26dd 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -73,11 +73,18 @@ -include("jlib.hrl"). -include("ejabberd_commands.hrl"). --include_lib("stdlib/include/ms_transform.hrl"). -include("mod_privacy.hrl"). +-include("ejabberd_sm.hrl"). + +-callback init() -> ok | {error, any()}. +-callback get_session(sid()) -> {ok, #session{}} | {error, notfound}. +-callback set_session(#session{}) -> ok. +-callback delete_session(sid()) -> ok. +-callback get_sessions() -> [#session{}]. +-callback get_sessions(binary()) -> [#session{}]. +-callback get_sessions(binary(), binary()) -> [#session{}]. +-callback get_sessions(binary(), binary(), binary()) -> [#session{}]. --record(session, {sid, usr, us, priority, info}). --record(session_counter, {vhost, count}). -record(state, {}). %% default value for the maximum number of user connections @@ -90,12 +97,6 @@ %% Function: start_link() -> {ok,Pid} | ignore | {error,Error} %% Description: Starts the server %%-------------------------------------------------------------------- --type sid() :: {erlang:timestamp(), pid()}. --type ip() :: {inet:ip_address(), inet:port_number()} | undefined. --type info() :: [{conn, atom()} | {ip, ip()} | {node, atom()} - | {oor, boolean()} | {auth_module, atom()}]. --type prio() :: undefined | integer(). - -export_type([sid/0]). start_link() -> @@ -116,8 +117,6 @@ route(From, To, Packet) -> open_session(SID, User, Server, Resource, Priority, Info) -> set_session(SID, User, Server, Resource, Priority, Info), - mnesia:dirty_update_counter(session_counter, - jlib:nameprep(Server), 1), check_for_sessions_to_replace(User, Server, Resource), JID = jlib:make_jid(User, Server, Resource), ejabberd_hooks:run(sm_register_connection_hook, @@ -131,16 +130,12 @@ open_session(SID, User, Server, Resource, Info) -> -spec close_session(sid(), binary(), binary(), binary()) -> ok. close_session(SID, User, Server, Resource) -> - Info = case mnesia:dirty_read({session, SID}) of - [] -> []; - [#session{info=I}] -> I - end, - F = fun() -> - mnesia:delete({session, SID}), - mnesia:dirty_update_counter(session_counter, - jlib:nameprep(Server), -1) - end, - mnesia:sync_dirty(F), + Mod = get_sm_backend(), + Info = case Mod:get_session(SID) of + {ok, #session{info = I}} -> I; + {error, notfound} -> [] + end, + Mod:delete_session(SID), JID = jlib:make_jid(User, Server, Resource), ejabberd_hooks:run(sm_remove_connection_hook, JID#jid.lserver, [SID, JID, Info]). @@ -169,27 +164,17 @@ disconnect_removed_user(User, Server) -> get_user_resources(User, Server) -> LUser = jlib:nodeprep(User), LServer = jlib:nameprep(Server), - US = {LUser, LServer}, - case catch mnesia:dirty_index_read(session, US, #session.us) of - {'EXIT', _Reason} -> - []; - Ss -> - [element(3, S#session.usr) || S <- clean_session_list(Ss)] - end. + Mod = get_sm_backend(), + Ss = Mod:get_sessions(LUser, LServer), + [element(3, S#session.usr) || S <- clean_session_list(Ss)]. -spec get_user_present_resources(binary(), binary()) -> [tuple()]. get_user_present_resources(LUser, LServer) -> - US = {LUser, LServer}, - case catch mnesia:dirty_index_read(session, US, - #session.us) - of - {'EXIT', _Reason} -> []; - Ss -> - [{S#session.priority, element(3, S#session.usr)} - || S <- clean_session_list(Ss), - is_integer(S#session.priority)] - end. + Mod = get_sm_backend(), + Ss = Mod:get_sessions(LUser, LServer), + [{S#session.priority, element(3, S#session.usr)} + || S <- clean_session_list(Ss), is_integer(S#session.priority)]. -spec get_user_ip(binary(), binary(), binary()) -> ip(). @@ -197,8 +182,8 @@ get_user_ip(User, Server, Resource) -> LUser = jlib:nodeprep(User), LServer = jlib:nameprep(Server), LResource = jlib:resourceprep(Resource), - USR = {LUser, LServer, LResource}, - case mnesia:dirty_index_read(session, USR, #session.usr) of + Mod = get_sm_backend(), + case Mod:get_sessions(LUser, LServer, LResource) of [] -> undefined; Ss -> @@ -212,8 +197,8 @@ get_user_info(User, Server, Resource) -> LUser = jlib:nodeprep(User), LServer = jlib:nameprep(Server), LResource = jlib:resourceprep(Resource), - USR = {LUser, LServer, LResource}, - case mnesia:dirty_index_read(session, USR, #session.usr) of + Mod = get_sm_backend(), + case Mod:get_sessions(LUser, LServer, LResource) of [] -> offline; Ss -> @@ -262,8 +247,8 @@ get_session_pid(User, Server, Resource) -> LUser = jlib:nodeprep(User), LServer = jlib:nameprep(Server), LResource = jlib:resourceprep(Resource), - USR = {LUser, LServer, LResource}, - case catch mnesia:dirty_index_read(session, USR, #session.usr) of + Mod = get_sm_backend(), + case Mod:get_sessions(LUser, LServer, LResource) of [#session{sid = {_, Pid}}] -> Pid; _ -> none end. @@ -271,49 +256,30 @@ get_session_pid(User, Server, Resource) -> -spec dirty_get_sessions_list() -> [ljid()]. dirty_get_sessions_list() -> - mnesia:dirty_select( - session, - [{#session{usr = '$1', _ = '_'}, - [], - ['$1']}]). + Mod = get_sm_backend(), + [S#session.usr || S <- Mod:get_sessions()]. dirty_get_my_sessions_list() -> - mnesia:dirty_select( - session, - [{#session{sid = {'_', '$1'}, _ = '_'}, - [{'==', {node, '$1'}, node()}], - ['$_']}]). + Mod = get_sm_backend(), + [S || S <- Mod:get_sessions(), node(element(2, S#session.sid)) == node()]. -spec get_vh_session_list(binary()) -> [ljid()]. get_vh_session_list(Server) -> LServer = jlib:nameprep(Server), - mnesia:dirty_select(session, - [{#session{usr = '$1', _ = '_'}, - [{'==', {element, 2, '$1'}, LServer}], ['$1']}]). + Mod = get_sm_backend(), + [S#session.usr || S <- Mod:get_sessions(LServer)]. -spec get_all_pids() -> [pid()]. get_all_pids() -> - mnesia:dirty_select( - session, - ets:fun2ms( - fun(#session{sid = {_, Pid}}) -> - Pid - end)). + Mod = get_sm_backend(), + [element(2, S#session.sid) || S <- Mod:get_sessions()]. get_vh_session_number(Server) -> LServer = jlib:nameprep(Server), - Query = mnesia:dirty_select( - session_counter, - [{#session_counter{vhost = LServer, count = '$1'}, - [], - ['$1']}]), - case Query of - [Count] -> - Count; - _ -> 0 - end. + Mod = get_sm_backend(), + length(Mod:get_sessions(LServer)). register_iq_handler(Host, XMLNS, Module, Fun) -> ejabberd_sm ! @@ -343,18 +309,8 @@ unregister_iq_handler(Host, XMLNS) -> %% Description: Initiates the server %%-------------------------------------------------------------------- init([]) -> - update_tables(), - mnesia:create_table(session, - [{ram_copies, [node()]}, - {attributes, record_info(fields, session)}]), - mnesia:create_table(session_counter, - [{ram_copies, [node()]}, - {attributes, record_info(fields, session_counter)}]), - mnesia:add_table_index(session, usr), - mnesia:add_table_index(session, us), - mnesia:add_table_copy(session, node(), ram_copies), - mnesia:add_table_copy(session_counter, node(), ram_copies), - mnesia:subscribe(system), + Mod = get_sm_backend(), + Mod:init(), ets:new(sm_iqtable, [named_table]), lists:foreach( fun(Host) -> @@ -366,7 +322,6 @@ init([]) -> ejabberd_sm, disconnect_removed_user, 100) end, ?MYHOSTS), ejabberd_commands:register_commands(commands()), - {ok, #state{}}. %%-------------------------------------------------------------------- @@ -404,9 +359,6 @@ handle_info({route, From, To, Packet}, State) -> ok end, {noreply, State}; -handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> - recount_session_table(Node), - {noreply, State}; handle_info({register_iq_handler, Host, XMLNS, Module, Function}, State) -> ets:insert(sm_iqtable, {{XMLNS, Host}, Module, Function}), {noreply, State}; @@ -454,38 +406,9 @@ set_session(SID, User, Server, Resource, Priority, Info) -> LResource = jlib:resourceprep(Resource), US = {LUser, LServer}, USR = {LUser, LServer, LResource}, - F = fun () -> - mnesia:write(#session{sid = SID, usr = USR, us = US, - priority = Priority, info = Info}) - end, - mnesia:sync_dirty(F). - -%% Recalculates alive sessions when Node goes down -%% and updates session and session_counter tables -recount_session_table(Node) -> - F = fun() -> - Es = mnesia:select( - session, - [{#session{sid = {'_', '$1'}, _ = '_'}, - [{'==', {node, '$1'}, Node}], - ['$_']}]), - lists:foreach(fun(E) -> - mnesia:delete({session, E#session.sid}) - end, Es), - %% reset session_counter table with active sessions - mnesia:clear_table(session_counter), - lists:foreach(fun(Server) -> - LServer = jlib:nameprep(Server), - Hs = mnesia:select(session, - [{#session{usr = '$1', _ = '_'}, - [{'==', {element, 2, '$1'}, LServer}], - ['$1']}]), - mnesia:write( - #session_counter{vhost = LServer, - count = length(Hs)}) - end, ?MYHOSTS) - end, - mnesia:async_dirty(F). + Mod = get_sm_backend(), + Mod:set_session(#session{sid = SID, usr = USR, us = US, + priority = Priority, info = Info}). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -499,8 +422,9 @@ do_route(From, To, {broadcast, _} = Packet) -> end, get_user_resources(To#jid.user, To#jid.server)); _ -> - USR = jlib:jid_tolower(To), - case mnesia:dirty_index_read(session, USR, #session.usr) of + {U, S, R} = jlib:jid_tolower(To), + Mod = get_sm_backend(), + case Mod:get_sessions(U, S, R) of [] -> ?DEBUG("packet dropped~n", []); Ss -> @@ -589,9 +513,8 @@ do_route(From, To, #xmlel{} = Packet) -> _ -> ok end; _ -> - USR = {LUser, LServer, LResource}, - case mnesia:dirty_index_read(session, USR, #session.usr) - of + Mod = get_sm_backend(), + case Mod:get_sessions(LUser, LServer, LResource) of [] -> case Name of <<"message">> -> route_message(From, To, Packet); @@ -646,10 +569,9 @@ route_message(From, To, Packet) -> when is_integer(Priority), Priority >= 0 -> lists:foreach(fun ({P, R}) when P == Priority -> LResource = jlib:resourceprep(R), - USR = {LUser, LServer, LResource}, - case mnesia:dirty_index_read(session, USR, - #session.usr) - of + Mod = get_sm_backend(), + case Mod:get_sessions(LUser, LServer, + LResource) of [] -> ok; % Race condition Ss -> @@ -730,17 +652,15 @@ is_existing_resource(LUser, LServer, LResource) -> [] /= get_resource_sessions(LUser, LServer, LResource). get_resource_sessions(User, Server, Resource) -> - USR = {jlib:nodeprep(User), jlib:nameprep(Server), - jlib:resourceprep(Resource)}, - mnesia:dirty_select(session, - [{#session{sid = '$1', usr = USR, _ = '_'}, [], - ['$1']}]). + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + LResource = jlib:resourceprep(Resource), + Mod = get_sm_backend(), + [S#session.sid || S <- Mod:get_sessions(LUser, LServer, LResource)]. check_max_sessions(LUser, LServer) -> - SIDs = mnesia:dirty_select(session, - [{#session{sid = '$1', us = {LUser, LServer}, - _ = '_'}, - [], ['$1']}]), + Mod = get_sm_backend(), + SIDs = [S#session.sid || S <- Mod:get_sessions(LUser, LServer)], MaxSessions = get_max_user_sessions(LUser, LServer), if length(SIDs) =< MaxSessions -> ok; true -> {_, Pid} = lists:min(SIDs), Pid ! replaced @@ -790,17 +710,22 @@ process_iq(From, To, Packet) -> -spec force_update_presence({binary(), binary()}) -> any(). -force_update_presence({LUser, _LServer} = US) -> - case catch mnesia:dirty_index_read(session, US, - #session.us) - of - {'EXIT', _Reason} -> ok; - Ss -> - lists:foreach(fun (#session{sid = {_, Pid}}) -> - Pid ! {force_update_presence, LUser} - end, - Ss) - end. +force_update_presence({LUser, LServer}) -> + Mod = get_sm_backend(), + Ss = Mod:get_sessions(LUser, LServer), + lists:foreach(fun (#session{sid = {_, Pid}}) -> + Pid ! {force_update_presence, LUser} + end, + Ss). + +-spec get_sm_backend() -> module(). + +get_sm_backend() -> + DBType = ejabberd_config:get_option(sm_db_type, + fun(mnesia) -> mnesia; + (internal) -> mnesia + end, mnesia), + list_to_atom("ejabberd_sm_" ++ atom_to_list(DBType)). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% ejabberd commands @@ -852,29 +777,3 @@ kick_user(User, Server) -> PID ! kick end, Resources), length(Resources). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%%% Update Mnesia tables - -update_tables() -> - case catch mnesia:table_info(session, attributes) of - [ur, user, node] -> mnesia:delete_table(session); - [ur, user, pid] -> mnesia:delete_table(session); - [usr, us, pid] -> mnesia:delete_table(session); - [usr, us, sid, priority, info] -> mnesia:delete_table(session); - [sid, usr, us, priority] -> - mnesia:delete_table(session); - [sid, usr, us, priority, info] -> ok; - {'EXIT', _} -> ok - end, - case lists:member(presence, mnesia:system_info(tables)) - of - true -> mnesia:delete_table(presence); - false -> ok - end, - case lists:member(local_session, mnesia:system_info(tables)) of - true -> - mnesia:delete_table(local_session); - false -> - ok - end. diff --git a/src/ejabberd_sm_mnesia.erl b/src/ejabberd_sm_mnesia.erl new file mode 100644 index 000000000..504fa9842 --- /dev/null +++ b/src/ejabberd_sm_mnesia.erl @@ -0,0 +1,148 @@ +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov +%%% @copyright (C) 2015, Evgeny Khramtsov +%%% @doc +%%% +%%% @end +%%% Created : 9 Mar 2015 by Evgeny Khramtsov +%%%------------------------------------------------------------------- +-module(ejabberd_sm_mnesia). + +-behaviour(gen_server). +-behaviour(ejabberd_sm). + +%% API +-export([init/0, + get_session/1, + set_session/1, + delete_session/1, + get_sessions/0, + get_sessions/1, + get_sessions/2, + get_sessions/3]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-include("ejabberd.hrl"). +-include("ejabberd_sm.hrl"). +-include("jlib.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). + +-record(state, {}). + +%%%=================================================================== +%%% API +%%%=================================================================== +-spec init() -> ok | {error, any()}. +init() -> + case gen_server:start_link({local, ?MODULE}, ?MODULE, [], []) of + {ok, _Pid} -> + ok; + Err -> + Err + end. + +-spec get_session(sid()) -> {ok, #session{}} | {error, notfound}. +get_session(SID) -> + case mnesia:dirty_read(session, SID) of + [] -> + {error, notfound}; + [Session] -> + {ok, Session} + end. + +-spec set_session(#session{}) -> ok. +set_session(Session) -> + mnesia:dirty_write(Session). + +-spec delete_session(sid()) -> ok. +delete_session(SID) -> + mnesia:dirty_delete(session, SID). + +-spec get_sessions() -> [#session{}]. +get_sessions() -> + ets:tab2list(session). + +-spec get_sessions(binary()) -> [#session{}]. +get_sessions(LServer) -> + mnesia:dirty_select(session, + [{#session{usr = '$1', _ = '_'}, + [{'==', {element, 2, '$1'}, LServer}], ['$_']}]). + +-spec get_sessions(binary(), binary()) -> [#session{}]. +get_sessions(LUser, LServer) -> + mnesia:dirty_index_read(session, {LUser, LServer}, #session.us). + +-spec get_sessions(binary(), binary(), binary()) -> [#session{}]. +get_sessions(LUser, LServer, LResource) -> + mnesia:dirty_index_read(session, {LUser, LServer, LResource}, #session.usr). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== +init([]) -> + update_tables(), + mnesia:create_table(session, + [{ram_copies, [node()]}, + {attributes, record_info(fields, session)}]), + mnesia:create_table(session_counter, + [{ram_copies, [node()]}, + {attributes, record_info(fields, session_counter)}]), + mnesia:add_table_index(session, usr), + mnesia:add_table_index(session, us), + mnesia:add_table_copy(session, node(), ram_copies), + mnesia:add_table_copy(session_counter, node(), ram_copies), + mnesia:subscribe(system), + {ok, #state{}}. + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> + ets:select_delete( + session, + ets:fun2ms( + fun(#session{sid = {_, Pid}}) -> + node(Pid) == Node + end)), + {noreply, State}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +update_tables() -> + case catch mnesia:table_info(session, attributes) of + [ur, user, node] -> mnesia:delete_table(session); + [ur, user, pid] -> mnesia:delete_table(session); + [usr, us, pid] -> mnesia:delete_table(session); + [usr, us, sid, priority, info] -> mnesia:delete_table(session); + [sid, usr, us, priority] -> + mnesia:delete_table(session); + [sid, usr, us, priority, info] -> ok; + {'EXIT', _} -> ok + end, + case lists:member(presence, mnesia:system_info(tables)) + of + true -> mnesia:delete_table(presence); + false -> ok + end, + case lists:member(local_session, mnesia:system_info(tables)) of + true -> + mnesia:delete_table(local_session); + false -> + ok + end.