Separate db backend from the SM

This commit is contained in:
Evgeniy Khramtsov 2015-03-09 14:57:33 +03:00
parent db9a400279
commit 86e03f3828
2 changed files with 223 additions and 176 deletions

View File

@ -73,11 +73,18 @@
-include("jlib.hrl").
-include("ejabberd_commands.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-include("mod_privacy.hrl").
-include("ejabberd_sm.hrl").
-callback init() -> ok | {error, any()}.
-callback get_session(sid()) -> {ok, #session{}} | {error, notfound}.
-callback set_session(#session{}) -> ok.
-callback delete_session(sid()) -> ok.
-callback get_sessions() -> [#session{}].
-callback get_sessions(binary()) -> [#session{}].
-callback get_sessions(binary(), binary()) -> [#session{}].
-callback get_sessions(binary(), binary(), binary()) -> [#session{}].
-record(session, {sid, usr, us, priority, info}).
-record(session_counter, {vhost, count}).
-record(state, {}).
%% default value for the maximum number of user connections
@ -90,12 +97,6 @@
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
-type sid() :: {erlang:timestamp(), pid()}.
-type ip() :: {inet:ip_address(), inet:port_number()} | undefined.
-type info() :: [{conn, atom()} | {ip, ip()} | {node, atom()}
| {oor, boolean()} | {auth_module, atom()}].
-type prio() :: undefined | integer().
-export_type([sid/0]).
start_link() ->
@ -116,8 +117,6 @@ route(From, To, Packet) ->
open_session(SID, User, Server, Resource, Priority, Info) ->
set_session(SID, User, Server, Resource, Priority, Info),
mnesia:dirty_update_counter(session_counter,
jlib:nameprep(Server), 1),
check_for_sessions_to_replace(User, Server, Resource),
JID = jlib:make_jid(User, Server, Resource),
ejabberd_hooks:run(sm_register_connection_hook,
@ -131,16 +130,12 @@ open_session(SID, User, Server, Resource, Info) ->
-spec close_session(sid(), binary(), binary(), binary()) -> ok.
close_session(SID, User, Server, Resource) ->
Info = case mnesia:dirty_read({session, SID}) of
[] -> [];
[#session{info=I}] -> I
end,
F = fun() ->
mnesia:delete({session, SID}),
mnesia:dirty_update_counter(session_counter,
jlib:nameprep(Server), -1)
end,
mnesia:sync_dirty(F),
Mod = get_sm_backend(),
Info = case Mod:get_session(SID) of
{ok, #session{info = I}} -> I;
{error, notfound} -> []
end,
Mod:delete_session(SID),
JID = jlib:make_jid(User, Server, Resource),
ejabberd_hooks:run(sm_remove_connection_hook,
JID#jid.lserver, [SID, JID, Info]).
@ -169,27 +164,17 @@ disconnect_removed_user(User, Server) ->
get_user_resources(User, Server) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
US = {LUser, LServer},
case catch mnesia:dirty_index_read(session, US, #session.us) of
{'EXIT', _Reason} ->
[];
Ss ->
[element(3, S#session.usr) || S <- clean_session_list(Ss)]
end.
Mod = get_sm_backend(),
Ss = Mod:get_sessions(LUser, LServer),
[element(3, S#session.usr) || S <- clean_session_list(Ss)].
-spec get_user_present_resources(binary(), binary()) -> [tuple()].
get_user_present_resources(LUser, LServer) ->
US = {LUser, LServer},
case catch mnesia:dirty_index_read(session, US,
#session.us)
of
{'EXIT', _Reason} -> [];
Ss ->
[{S#session.priority, element(3, S#session.usr)}
|| S <- clean_session_list(Ss),
is_integer(S#session.priority)]
end.
Mod = get_sm_backend(),
Ss = Mod:get_sessions(LUser, LServer),
[{S#session.priority, element(3, S#session.usr)}
|| S <- clean_session_list(Ss), is_integer(S#session.priority)].
-spec get_user_ip(binary(), binary(), binary()) -> ip().
@ -197,8 +182,8 @@ get_user_ip(User, Server, Resource) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
LResource = jlib:resourceprep(Resource),
USR = {LUser, LServer, LResource},
case mnesia:dirty_index_read(session, USR, #session.usr) of
Mod = get_sm_backend(),
case Mod:get_sessions(LUser, LServer, LResource) of
[] ->
undefined;
Ss ->
@ -212,8 +197,8 @@ get_user_info(User, Server, Resource) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
LResource = jlib:resourceprep(Resource),
USR = {LUser, LServer, LResource},
case mnesia:dirty_index_read(session, USR, #session.usr) of
Mod = get_sm_backend(),
case Mod:get_sessions(LUser, LServer, LResource) of
[] ->
offline;
Ss ->
@ -262,8 +247,8 @@ get_session_pid(User, Server, Resource) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
LResource = jlib:resourceprep(Resource),
USR = {LUser, LServer, LResource},
case catch mnesia:dirty_index_read(session, USR, #session.usr) of
Mod = get_sm_backend(),
case Mod:get_sessions(LUser, LServer, LResource) of
[#session{sid = {_, Pid}}] -> Pid;
_ -> none
end.
@ -271,49 +256,30 @@ get_session_pid(User, Server, Resource) ->
-spec dirty_get_sessions_list() -> [ljid()].
dirty_get_sessions_list() ->
mnesia:dirty_select(
session,
[{#session{usr = '$1', _ = '_'},
[],
['$1']}]).
Mod = get_sm_backend(),
[S#session.usr || S <- Mod:get_sessions()].
dirty_get_my_sessions_list() ->
mnesia:dirty_select(
session,
[{#session{sid = {'_', '$1'}, _ = '_'},
[{'==', {node, '$1'}, node()}],
['$_']}]).
Mod = get_sm_backend(),
[S || S <- Mod:get_sessions(), node(element(2, S#session.sid)) == node()].
-spec get_vh_session_list(binary()) -> [ljid()].
get_vh_session_list(Server) ->
LServer = jlib:nameprep(Server),
mnesia:dirty_select(session,
[{#session{usr = '$1', _ = '_'},
[{'==', {element, 2, '$1'}, LServer}], ['$1']}]).
Mod = get_sm_backend(),
[S#session.usr || S <- Mod:get_sessions(LServer)].
-spec get_all_pids() -> [pid()].
get_all_pids() ->
mnesia:dirty_select(
session,
ets:fun2ms(
fun(#session{sid = {_, Pid}}) ->
Pid
end)).
Mod = get_sm_backend(),
[element(2, S#session.sid) || S <- Mod:get_sessions()].
get_vh_session_number(Server) ->
LServer = jlib:nameprep(Server),
Query = mnesia:dirty_select(
session_counter,
[{#session_counter{vhost = LServer, count = '$1'},
[],
['$1']}]),
case Query of
[Count] ->
Count;
_ -> 0
end.
Mod = get_sm_backend(),
length(Mod:get_sessions(LServer)).
register_iq_handler(Host, XMLNS, Module, Fun) ->
ejabberd_sm !
@ -343,18 +309,8 @@ unregister_iq_handler(Host, XMLNS) ->
%% Description: Initiates the server
%%--------------------------------------------------------------------
init([]) ->
update_tables(),
mnesia:create_table(session,
[{ram_copies, [node()]},
{attributes, record_info(fields, session)}]),
mnesia:create_table(session_counter,
[{ram_copies, [node()]},
{attributes, record_info(fields, session_counter)}]),
mnesia:add_table_index(session, usr),
mnesia:add_table_index(session, us),
mnesia:add_table_copy(session, node(), ram_copies),
mnesia:add_table_copy(session_counter, node(), ram_copies),
mnesia:subscribe(system),
Mod = get_sm_backend(),
Mod:init(),
ets:new(sm_iqtable, [named_table]),
lists:foreach(
fun(Host) ->
@ -366,7 +322,6 @@ init([]) ->
ejabberd_sm, disconnect_removed_user, 100)
end, ?MYHOSTS),
ejabberd_commands:register_commands(commands()),
{ok, #state{}}.
%%--------------------------------------------------------------------
@ -404,9 +359,6 @@ handle_info({route, From, To, Packet}, State) ->
ok
end,
{noreply, State};
handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
recount_session_table(Node),
{noreply, State};
handle_info({register_iq_handler, Host, XMLNS, Module, Function}, State) ->
ets:insert(sm_iqtable, {{XMLNS, Host}, Module, Function}),
{noreply, State};
@ -454,38 +406,9 @@ set_session(SID, User, Server, Resource, Priority, Info) ->
LResource = jlib:resourceprep(Resource),
US = {LUser, LServer},
USR = {LUser, LServer, LResource},
F = fun () ->
mnesia:write(#session{sid = SID, usr = USR, us = US,
priority = Priority, info = Info})
end,
mnesia:sync_dirty(F).
%% Recalculates alive sessions when Node goes down
%% and updates session and session_counter tables
recount_session_table(Node) ->
F = fun() ->
Es = mnesia:select(
session,
[{#session{sid = {'_', '$1'}, _ = '_'},
[{'==', {node, '$1'}, Node}],
['$_']}]),
lists:foreach(fun(E) ->
mnesia:delete({session, E#session.sid})
end, Es),
%% reset session_counter table with active sessions
mnesia:clear_table(session_counter),
lists:foreach(fun(Server) ->
LServer = jlib:nameprep(Server),
Hs = mnesia:select(session,
[{#session{usr = '$1', _ = '_'},
[{'==', {element, 2, '$1'}, LServer}],
['$1']}]),
mnesia:write(
#session_counter{vhost = LServer,
count = length(Hs)})
end, ?MYHOSTS)
end,
mnesia:async_dirty(F).
Mod = get_sm_backend(),
Mod:set_session(#session{sid = SID, usr = USR, us = US,
priority = Priority, info = Info}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -499,8 +422,9 @@ do_route(From, To, {broadcast, _} = Packet) ->
end,
get_user_resources(To#jid.user, To#jid.server));
_ ->
USR = jlib:jid_tolower(To),
case mnesia:dirty_index_read(session, USR, #session.usr) of
{U, S, R} = jlib:jid_tolower(To),
Mod = get_sm_backend(),
case Mod:get_sessions(U, S, R) of
[] ->
?DEBUG("packet dropped~n", []);
Ss ->
@ -589,9 +513,8 @@ do_route(From, To, #xmlel{} = Packet) ->
_ -> ok
end;
_ ->
USR = {LUser, LServer, LResource},
case mnesia:dirty_index_read(session, USR, #session.usr)
of
Mod = get_sm_backend(),
case Mod:get_sessions(LUser, LServer, LResource) of
[] ->
case Name of
<<"message">> -> route_message(From, To, Packet);
@ -646,10 +569,9 @@ route_message(From, To, Packet) ->
when is_integer(Priority), Priority >= 0 ->
lists:foreach(fun ({P, R}) when P == Priority ->
LResource = jlib:resourceprep(R),
USR = {LUser, LServer, LResource},
case mnesia:dirty_index_read(session, USR,
#session.usr)
of
Mod = get_sm_backend(),
case Mod:get_sessions(LUser, LServer,
LResource) of
[] ->
ok; % Race condition
Ss ->
@ -730,17 +652,15 @@ is_existing_resource(LUser, LServer, LResource) ->
[] /= get_resource_sessions(LUser, LServer, LResource).
get_resource_sessions(User, Server, Resource) ->
USR = {jlib:nodeprep(User), jlib:nameprep(Server),
jlib:resourceprep(Resource)},
mnesia:dirty_select(session,
[{#session{sid = '$1', usr = USR, _ = '_'}, [],
['$1']}]).
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
LResource = jlib:resourceprep(Resource),
Mod = get_sm_backend(),
[S#session.sid || S <- Mod:get_sessions(LUser, LServer, LResource)].
check_max_sessions(LUser, LServer) ->
SIDs = mnesia:dirty_select(session,
[{#session{sid = '$1', us = {LUser, LServer},
_ = '_'},
[], ['$1']}]),
Mod = get_sm_backend(),
SIDs = [S#session.sid || S <- Mod:get_sessions(LUser, LServer)],
MaxSessions = get_max_user_sessions(LUser, LServer),
if length(SIDs) =< MaxSessions -> ok;
true -> {_, Pid} = lists:min(SIDs), Pid ! replaced
@ -790,17 +710,22 @@ process_iq(From, To, Packet) ->
-spec force_update_presence({binary(), binary()}) -> any().
force_update_presence({LUser, _LServer} = US) ->
case catch mnesia:dirty_index_read(session, US,
#session.us)
of
{'EXIT', _Reason} -> ok;
Ss ->
lists:foreach(fun (#session{sid = {_, Pid}}) ->
Pid ! {force_update_presence, LUser}
end,
Ss)
end.
force_update_presence({LUser, LServer}) ->
Mod = get_sm_backend(),
Ss = Mod:get_sessions(LUser, LServer),
lists:foreach(fun (#session{sid = {_, Pid}}) ->
Pid ! {force_update_presence, LUser}
end,
Ss).
-spec get_sm_backend() -> module().
get_sm_backend() ->
DBType = ejabberd_config:get_option(sm_db_type,
fun(mnesia) -> mnesia;
(internal) -> mnesia
end, mnesia),
list_to_atom("ejabberd_sm_" ++ atom_to_list(DBType)).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% ejabberd commands
@ -852,29 +777,3 @@ kick_user(User, Server) ->
PID ! kick
end, Resources),
length(Resources).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Update Mnesia tables
update_tables() ->
case catch mnesia:table_info(session, attributes) of
[ur, user, node] -> mnesia:delete_table(session);
[ur, user, pid] -> mnesia:delete_table(session);
[usr, us, pid] -> mnesia:delete_table(session);
[usr, us, sid, priority, info] -> mnesia:delete_table(session);
[sid, usr, us, priority] ->
mnesia:delete_table(session);
[sid, usr, us, priority, info] -> ok;
{'EXIT', _} -> ok
end,
case lists:member(presence, mnesia:system_info(tables))
of
true -> mnesia:delete_table(presence);
false -> ok
end,
case lists:member(local_session, mnesia:system_info(tables)) of
true ->
mnesia:delete_table(local_session);
false ->
ok
end.

148
src/ejabberd_sm_mnesia.erl Normal file
View File

@ -0,0 +1,148 @@
%%%-------------------------------------------------------------------
%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
%%% @copyright (C) 2015, Evgeny Khramtsov
%%% @doc
%%%
%%% @end
%%% Created : 9 Mar 2015 by Evgeny Khramtsov <ekhramtsov@process-one.net>
%%%-------------------------------------------------------------------
-module(ejabberd_sm_mnesia).
-behaviour(gen_server).
-behaviour(ejabberd_sm).
%% API
-export([init/0,
get_session/1,
set_session/1,
delete_session/1,
get_sessions/0,
get_sessions/1,
get_sessions/2,
get_sessions/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-include("ejabberd.hrl").
-include("ejabberd_sm.hrl").
-include("jlib.hrl").
-include_lib("stdlib/include/ms_transform.hrl").
-record(state, {}).
%%%===================================================================
%%% API
%%%===================================================================
-spec init() -> ok | {error, any()}.
init() ->
case gen_server:start_link({local, ?MODULE}, ?MODULE, [], []) of
{ok, _Pid} ->
ok;
Err ->
Err
end.
-spec get_session(sid()) -> {ok, #session{}} | {error, notfound}.
get_session(SID) ->
case mnesia:dirty_read(session, SID) of
[] ->
{error, notfound};
[Session] ->
{ok, Session}
end.
-spec set_session(#session{}) -> ok.
set_session(Session) ->
mnesia:dirty_write(Session).
-spec delete_session(sid()) -> ok.
delete_session(SID) ->
mnesia:dirty_delete(session, SID).
-spec get_sessions() -> [#session{}].
get_sessions() ->
ets:tab2list(session).
-spec get_sessions(binary()) -> [#session{}].
get_sessions(LServer) ->
mnesia:dirty_select(session,
[{#session{usr = '$1', _ = '_'},
[{'==', {element, 2, '$1'}, LServer}], ['$_']}]).
-spec get_sessions(binary(), binary()) -> [#session{}].
get_sessions(LUser, LServer) ->
mnesia:dirty_index_read(session, {LUser, LServer}, #session.us).
-spec get_sessions(binary(), binary(), binary()) -> [#session{}].
get_sessions(LUser, LServer, LResource) ->
mnesia:dirty_index_read(session, {LUser, LServer, LResource}, #session.usr).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([]) ->
update_tables(),
mnesia:create_table(session,
[{ram_copies, [node()]},
{attributes, record_info(fields, session)}]),
mnesia:create_table(session_counter,
[{ram_copies, [node()]},
{attributes, record_info(fields, session_counter)}]),
mnesia:add_table_index(session, usr),
mnesia:add_table_index(session, us),
mnesia:add_table_copy(session, node(), ram_copies),
mnesia:add_table_copy(session_counter, node(), ram_copies),
mnesia:subscribe(system),
{ok, #state{}}.
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
ets:select_delete(
session,
ets:fun2ms(
fun(#session{sid = {_, Pid}}) ->
node(Pid) == Node
end)),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
update_tables() ->
case catch mnesia:table_info(session, attributes) of
[ur, user, node] -> mnesia:delete_table(session);
[ur, user, pid] -> mnesia:delete_table(session);
[usr, us, pid] -> mnesia:delete_table(session);
[usr, us, sid, priority, info] -> mnesia:delete_table(session);
[sid, usr, us, priority] ->
mnesia:delete_table(session);
[sid, usr, us, priority, info] -> ok;
{'EXIT', _} -> ok
end,
case lists:member(presence, mnesia:system_info(tables))
of
true -> mnesia:delete_table(presence);
false -> ok
end,
case lists:member(local_session, mnesia:system_info(tables)) of
true ->
mnesia:delete_table(local_session);
false ->
ok
end.