From 3a96d72a7f8aec54ab19e1b88968b48d714ce01e Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Mon, 22 May 2017 10:34:57 +0300 Subject: [PATCH] Implement cache for mod_private --- src/mod_private.erl | 161 +++++++++++++++++++++++++++++++------ src/mod_private_mnesia.erl | 43 +++++++--- src/mod_private_riak.erl | 31 ++++--- src/mod_private_sql.erl | 62 +++++++++----- 4 files changed, 229 insertions(+), 68 deletions(-) diff --git a/src/mod_private.erl b/src/mod_private.erl index 8f9059d90..1cc5e3c11 100644 --- a/src/mod_private.erl +++ b/src/mod_private.erl @@ -37,21 +37,27 @@ -include("ejabberd.hrl"). -include("logger.hrl"). - -include("xmpp.hrl"). -include("mod_private.hrl"). +-define(PRIVATE_CACHE, private_cache). + -callback init(binary(), gen_mod:opts()) -> any(). -callback import(binary(), binary(), [binary()]) -> ok. --callback set_data(binary(), binary(), [{binary(), xmlel()}]) -> {atomic, any()}. --callback get_data(binary(), binary(), binary()) -> {ok, xmlel()} | error. --callback get_all_data(binary(), binary()) -> [xmlel()]. --callback remove_user(binary(), binary()) -> any(). +-callback set_data(binary(), binary(), [{binary(), xmlel()}]) -> ok | {error, any()}. +-callback get_data(binary(), binary(), binary()) -> {ok, xmlel()} | error | {error, any()}. +-callback get_all_data(binary(), binary()) -> {ok, [xmlel()]} | error | {error, any()}. +-callback del_data(binary(), binary()) -> ok | {error, any()}. +-callback use_cache(binary()) -> boolean(). +-callback cache_nodes(binary()) -> [node()]. + +-optional_callbacks([use_cache/1, cache_nodes/1]). start(Host, Opts) -> IQDisc = gen_mod:get_opt(iqdisc, Opts, gen_iq_handler:iqdisc(Host)), Mod = gen_mod:db_mod(Host, Opts, ?MODULE), Mod:init(Host, Opts), + init_cache(Mod, Host, Opts), ejabberd_hooks:add(remove_user, Host, ?MODULE, remove_user, 50), gen_iq_handler:add_iq_handler(ejabberd_sm, Host, @@ -71,6 +77,7 @@ reload(Host, NewOpts, OldOpts) -> true -> ok end, + init_cache(NewMod, Host, NewOpts), case gen_mod:is_equal_opt(iqdisc, NewOpts, OldOpts, gen_iq_handler:iqdisc(Host)) of {false, IQDisc, _} -> gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_PRIVATE, @@ -89,11 +96,23 @@ process_sm_iq(#iq{type = Type, lang = Lang, Txt = <<"No private data found in this query">>, xmpp:make_error(IQ, xmpp:err_bad_request(Txt, Lang)); Data when Type == set -> - set_data(LUser, LServer, Data), - xmpp:make_iq_result(IQ); + case set_data(LUser, LServer, Data) of + ok -> + xmpp:make_iq_result(IQ); + {error, _} -> + Txt = <<"Database failure">>, + Err = xmpp:err_internal_server_error(Txt, Lang), + xmpp:make_error(IQ, Err) + end; Data when Type == get -> - StorageEls = get_data(LUser, LServer, Data), - xmpp:make_iq_result(IQ, #private{xml_els = StorageEls}) + case get_data(LUser, LServer, Data) of + {error, _} -> + Txt = <<"Database failure">>, + Err = xmpp:err_internal_server_error(Txt, Lang), + xmpp:make_error(IQ, Err); + Els -> + xmpp:make_iq_result(IQ, #private{xml_els = Els}) + end end; process_sm_iq(#iq{lang = Lang} = IQ) -> Txt = <<"Query to another users is forbidden">>, @@ -109,35 +128,124 @@ filter_xmlels(Els) -> end end, Els). --spec set_data(binary(), binary(), [{binary(), xmlel()}]) -> {atomic, any()}. +-spec set_data(binary(), binary(), [{binary(), xmlel()}]) -> ok | {error, _}. set_data(LUser, LServer, Data) -> Mod = gen_mod:db_mod(LServer, ?MODULE), - Mod:set_data(LUser, LServer, Data). + case Mod:set_data(LUser, LServer, Data) of + ok -> + delete_cache(Mod, LServer, LServer, Data); + {error, _} = Err -> + Err + end. --spec get_data(binary(), binary(), [{binary(), xmlel()}]) -> [xmlel()]. +-spec get_data(binary(), binary(), [{binary(), xmlel()}]) -> [xmlel()] | {error, _}. get_data(LUser, LServer, Data) -> Mod = gen_mod:db_mod(LServer, ?MODULE), - lists:map( - fun({NS, El}) -> - case Mod:get_data(LUser, LServer, NS) of + lists:foldr( + fun(_, {error, _} = Err) -> + Err; + ({NS, El}, Els) -> + Res = case use_cache(Mod, LServer) of + true -> + ets_cache:lookup( + ?PRIVATE_CACHE, {LUser, LServer, NS}, + fun() -> Mod:get_data(LUser, LServer, NS) end); + false -> + Mod:get_data(LUser, LServer, NS) + end, + case Res of {ok, StorageEl} -> - StorageEl; + [StorageEl|Els]; error -> - El + [El|Els]; + {error, _} = Err -> + Err end - end, Data). + end, [], Data). --spec get_data(binary(), binary()) -> [xmlel()]. +-spec get_data(binary(), binary()) -> [xmlel()] | {error, _}. get_data(LUser, LServer) -> Mod = gen_mod:db_mod(LServer, ?MODULE), - Mod:get_all_data(LUser, LServer). + case Mod:get_all_data(LUser, LServer) of + {ok, Els} -> Els; + error -> []; + {error, _} = Err -> Err + end. --spec remove_user(binary(), binary()) -> any(). +-spec remove_user(binary(), binary()) -> ok. remove_user(User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), Mod = gen_mod:db_mod(Server, ?MODULE), - Mod:remove_user(LUser, LServer). + Data = case use_cache(Mod, LServer) of + true -> + case Mod:get_all_data(LUser, LServer) of + {ok, Els} -> filter_xmlels(Els); + _ -> [] + end; + false -> + [] + end, + Mod:del_data(LUser, LServer), + delete_cache(Mod, LUser, LServer, Data). + +-spec delete_cache(module(), binary(), binary(), [{binary(), xmlel()}]) -> ok. +delete_cache(Mod, LUser, LServer, Data) -> + case use_cache(Mod, LServer) of + true -> + Nodes = cache_nodes(Mod, LServer), + lists:foreach( + fun({NS, _}) -> + ets_cache:delete(?PRIVATE_CACHE, + {LUser, LServer, NS}, + Nodes) + end, Data); + false -> + ok + end. + +-spec init_cache(module(), binary(), gen_mod:opts()) -> ok. +init_cache(Mod, Host, Opts) -> + case use_cache(Mod, Host) of + true -> + CacheOpts = cache_opts(Host, Opts), + ets_cache:new(?PRIVATE_CACHE, CacheOpts); + false -> + ets_cache:delete(?PRIVATE_CACHE) + end. + +-spec cache_opts(binary(), gen_mod:opts()) -> [proplists:property()]. +cache_opts(Host, Opts) -> + MaxSize = gen_mod:get_opt( + cache_size, Opts, + ejabberd_config:cache_size(Host)), + CacheMissed = gen_mod:get_opt( + cache_missed, Opts, + ejabberd_config:cache_missed(Host)), + LifeTime = case gen_mod:get_opt( + cache_life_time, Opts, + ejabberd_config:cache_life_time(Host)) of + infinity -> infinity; + I -> timer:seconds(I) + end, + [{max_size, MaxSize}, {cache_missed, CacheMissed}, {life_time, LifeTime}]. + +-spec use_cache(module(), binary()) -> boolean(). +use_cache(Mod, Host) -> + case erlang:function_exported(Mod, use_cache, 1) of + true -> Mod:use_cache(Host); + false -> + gen_mod:get_module_opt( + Host, ?MODULE, use_cache, + ejabberd_config:use_cache(Host)) + end. + +-spec cache_nodes(module(), binary()) -> [node()]. +cache_nodes(Mod, Host) -> + case erlang:function_exported(Mod, cache_nodes, 1) of + true -> Mod:cache_nodes(Host); + false -> ejabberd_cluster:get_nodes() + end. import_info() -> [{<<"private_storage">>, 4}]. @@ -159,4 +267,11 @@ depends(_Host, _Opts) -> mod_opt_type(db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end; mod_opt_type(iqdisc) -> fun gen_iq_handler:check_type/1; -mod_opt_type(_) -> [db_type, iqdisc]. +mod_opt_type(O) when O == cache_life_time; O == cache_size -> + fun (I) when is_integer(I), I > 0 -> I; + (infinity) -> infinity + end; +mod_opt_type(O) when O == use_cache; O == cache_missed -> + fun (B) when is_boolean(B) -> B end; +mod_opt_type(_) -> + [db_type, iqdisc, cache_life_time, cache_size, use_cache, cache_missed]. diff --git a/src/mod_private_mnesia.erl b/src/mod_private_mnesia.erl index 67417bb7f..04c1a04a1 100644 --- a/src/mod_private_mnesia.erl +++ b/src/mod_private_mnesia.erl @@ -27,8 +27,8 @@ -behaviour(mod_private). %% API --export([init/2, set_data/3, get_data/3, get_all_data/2, remove_user/2, - import/3]). +-export([init/2, set_data/3, get_data/3, get_all_data/2, del_data/2, + use_cache/1, import/3]). -export([need_transform/1, transform/1]). -include("xmpp.hrl"). @@ -43,6 +43,16 @@ init(_Host, _Opts) -> [{disc_only_copies, [node()]}, {attributes, record_info(fields, private_storage)}]). +use_cache(Host) -> + case mnesia:table_info(private_storage, storage_type) of + disc_only_copies -> + gen_mod:get_module_opt( + Host, mod_private, use_cache, + ejabberd_config:use_cache(Host)); + _ -> + false + end. + set_data(LUser, LServer, Data) -> F = fun () -> lists:foreach( @@ -53,7 +63,7 @@ set_data(LUser, LServer, Data) -> xml = Xmlel}) end, Data) end, - mnesia:transaction(F). + transaction(F). get_data(LUser, LServer, XmlNS) -> case mnesia:dirty_read(private_storage, {LUser, LServer, XmlNS}) of @@ -64,13 +74,18 @@ get_data(LUser, LServer, XmlNS) -> end. get_all_data(LUser, LServer) -> - lists:flatten( - mnesia:dirty_select(private_storage, - [{#private_storage{usns = {LUser, LServer, '_'}, - xml = '$1'}, - [], ['$1']}])). + case lists:flatten( + mnesia:dirty_select(private_storage, + [{#private_storage{usns = {LUser, LServer, '_'}, + xml = '$1'}, + [], ['$1']}])) of + [] -> + error; + Res -> + {ok, Res} + end. -remove_user(LUser, LServer) -> +del_data(LUser, LServer) -> F = fun () -> Namespaces = mnesia:select(private_storage, [{#private_storage{usns = @@ -86,7 +101,7 @@ remove_user(LUser, LServer) -> end, Namespaces) end, - mnesia:transaction(F). + transaction(F). import(LServer, <<"private_storage">>, [LUser, XMLNS, XML, _TimeStamp]) -> @@ -110,3 +125,11 @@ transform(#private_storage{usns = {U, S, NS}, xml = El} = R) -> %%%=================================================================== %%% Internal functions %%%=================================================================== +transaction(F) -> + case mnesia:transaction(F) of + {atomic, Res} -> + Res; + {aborted, Reason} -> + ?ERROR_MSG("Mnesia transaction failed: ~p", [Reason]), + {error, db_failure} + end. diff --git a/src/mod_private_riak.erl b/src/mod_private_riak.erl index 5c5185573..be175f071 100644 --- a/src/mod_private_riak.erl +++ b/src/mod_private_riak.erl @@ -27,7 +27,7 @@ -behaviour(mod_private). %% API --export([init/2, set_data/3, get_data/3, get_all_data/2, remove_user/2, +-export([init/2, set_data/3, get_data/3, get_all_data/2, del_data/2, import/3]). -include("xmpp.hrl"). @@ -40,37 +40,42 @@ init(_Host, _Opts) -> ok. set_data(LUser, LServer, Data) -> - lists:foreach( - fun({XMLNS, El}) -> + lists:foldl( + fun(_, {error, _} = Err) -> + Err; + ({XMLNS, El}, _) -> ejabberd_riak:put(#private_storage{usns = {LUser, LServer, XMLNS}, xml = El}, private_storage_schema(), [{'2i', [{<<"us">>, {LUser, LServer}}]}]) - end, Data), - {atomic, ok}. + end, ok, Data). get_data(LUser, LServer, XMLNS) -> case ejabberd_riak:get(private_storage, private_storage_schema(), {LUser, LServer, XMLNS}) of {ok, #private_storage{xml = El}} -> {ok, El}; - _ -> - error + {error, notfound} -> + error; + Err -> + Err end. get_all_data(LUser, LServer) -> case ejabberd_riak:get_by_index( private_storage, private_storage_schema(), <<"us">>, {LUser, LServer}) of + {ok, []} -> + error; {ok, Res} -> - [El || #private_storage{xml = El} <- Res]; - _ -> - [] + {ok, [El || #private_storage{xml = El} <- Res]}; + Err -> + Err end. -remove_user(LUser, LServer) -> - {atomic, ejabberd_riak:delete_by_index(private_storage, - <<"us">>, {LUser, LServer})}. +del_data(LUser, LServer) -> + ejabberd_riak:delete_by_index(private_storage, + <<"us">>, {LUser, LServer}). import(LServer, <<"private_storage">>, [LUser, XMLNS, XML, _TimeStamp]) -> diff --git a/src/mod_private_sql.erl b/src/mod_private_sql.erl index 800b31a21..d7e4b3aef 100644 --- a/src/mod_private_sql.erl +++ b/src/mod_private_sql.erl @@ -27,11 +27,12 @@ -behaviour(mod_private). %% API --export([init/2, set_data/3, get_data/3, get_all_data/2, remove_user/2, +-export([init/2, set_data/3, get_data/3, get_all_data/2, del_data/2, import/3, export/1]). -include("xmpp.hrl"). -include("mod_private.hrl"). +-include("logger.hrl"). %%%=================================================================== %%% API @@ -48,39 +49,46 @@ set_data(LUser, LServer, Data) -> LServer, LUser, XMLNS, SData) end, Data) end, - ejabberd_sql:sql_transaction(LServer, F). + case ejabberd_sql:sql_transaction(LServer, F) of + {atomic, ok} -> + ok; + _ -> + {error, db_failure} + end. get_data(LUser, LServer, XMLNS) -> - case catch sql_queries:get_private_data(LServer, LUser, XMLNS) of + case sql_queries:get_private_data(LServer, LUser, XMLNS) of {selected, [{SData}]} -> - case fxml_stream:parse_element(SData) of - Data when is_record(Data, xmlel) -> - {ok, Data}; - _ -> - error - end; + parse_element(LUser, LServer, SData); + {selected, []} -> + error; _ -> - error + {error, db_failure} end. get_all_data(LUser, LServer) -> case catch sql_queries:get_private_data(LServer, LUser) of + {selected, []} -> + error; {selected, Res} -> - lists:flatmap( - fun({_, SData}) -> - case fxml_stream:parse_element(SData) of - #xmlel{} = El -> - [El]; - _ -> - [] - end - end, Res); + {ok, lists:flatmap( + fun({_, SData}) -> + case parse_element(LUser, LServer, SData) of + {ok, El} -> [El]; + error -> [] + end + end, Res)}; _ -> - [] + {error, db_failure} end. -remove_user(LUser, LServer) -> - sql_queries:del_user_private_storage(LServer, LUser). +del_data(LUser, LServer) -> + case sql_queries:del_user_private_storage(LServer, LUser) of + {updated, _} -> + ok; + _ -> + {error, db_failure} + end. export(_Server) -> [{private_storage, @@ -99,3 +107,13 @@ import(_, _, _) -> %%%=================================================================== %%% Internal functions %%%=================================================================== +parse_element(LUser, LServer, XML) -> + case fxml_stream:parse_element(XML) of + El when is_record(El, xmlel) -> + {ok, El}; + _ -> + ?ERROR_MSG("malformed XML element in SQL table " + "'private_storage' for user ~s@~s: ~s", + [LUser, LServer, XML]), + error + end.