From f8dd973373a139b9857720dcb4405509e5e4fc76 Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Tue, 10 Aug 2010 19:42:22 +1000 Subject: [PATCH 1/8] fixes typo for table copy --- src/odbc/ejabberd_odbc_sup.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/odbc/ejabberd_odbc_sup.erl b/src/odbc/ejabberd_odbc_sup.erl index 45ede1835..8fb6e2552 100644 --- a/src/odbc/ejabberd_odbc_sup.erl +++ b/src/odbc/ejabberd_odbc_sup.erl @@ -54,7 +54,7 @@ start_link(Host) -> {type, bag}, {local_content, true}, {attributes, record_info(fields, sql_pool)}]), - mnesia:add_table_copy(local_config, node(), ram_copies), + mnesia:add_table_copy(sql_pool, node(), ram_copies), F = fun() -> mnesia:delete({sql_pool, Host}) end, From 3024bb0cbf359f3e14b5386ed14ecfb168e8a267 Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Thu, 19 Aug 2010 16:28:31 +1000 Subject: [PATCH 2/8] fixes ampersand escaping (EJAB-1258) --- src/eldap/eldap_filter.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/eldap/eldap_filter.erl b/src/eldap/eldap_filter.erl index 51dac5ec7..341fba13a 100644 --- a/src/eldap/eldap_filter.erl +++ b/src/eldap/eldap_filter.erl @@ -171,7 +171,7 @@ do_sub(S, {RegExp, New, Times}, Iter) -> end. replace_amps(String) -> - lists:map( + lists:flatmap( fun($&) -> "\\&"; - (Chr) -> Chr + (Chr) -> [Chr] end, String). From d87fff1a4c500d9abe3945cbfea792b4eddfa3f2 Mon Sep 17 00:00:00 2001 From: Badlop Date: Thu, 19 Aug 2010 17:37:21 +0200 Subject: [PATCH 3/8] Bugfix http-poll for correctly parsing binary (thanks to Peter Lemenkov) --- src/web/ejabberd_http_poll.erl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/web/ejabberd_http_poll.erl b/src/web/ejabberd_http_poll.erl index 0bdbc6287..faa0f3c17 100644 --- a/src/web/ejabberd_http_poll.erl +++ b/src/web/ejabberd_http_poll.erl @@ -272,7 +272,13 @@ handle_event(_Event, StateName, StateData) -> %% {stop, Reason, Reply, NewStateData} %%---------------------------------------------------------------------- handle_sync_event({send, Packet}, _From, StateName, StateData) -> - Output = StateData#state.output ++ [lists:flatten(Packet)], + Packet2 = if + is_binary(Packet) -> + binary_to_list(Packet); + true -> + Packet + end, + Output = StateData#state.output ++ [lists:flatten(Packet2)], Reply = ok, {reply, Reply, StateName, StateData#state{output = Output}}; From 4b5ef8f2cefbddfaa468fe30a0a205da51b330d2 Mon Sep 17 00:00:00 2001 From: Badlop Date: Sat, 21 Aug 2010 18:56:01 +0200 Subject: [PATCH 4/8] Add mod_pubsub_odbc to list of ODBC modules (EJAB-1293) --- doc/guide.html | 5 +++-- doc/guide.tex | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/doc/guide.html b/doc/guide.html index 57873a873..50cdca74a 100644 --- a/doc/guide.html +++ b/doc/guide.html @@ -1944,8 +1944,8 @@ Mnesia as backend. (see 3.2) as backend.
  • ‘_ldap’, this means that the module needs an LDAP server as backend.
  • If you want to, -it is possible to use a relational database to store pieces of -information. You can do this by changing the module name to a name with an +it is possible to use a relational database to store the tables created by some ejabberd modules. +You can do this by changing the module name to a name with an _odbc suffix in ejabberd config file. You can use a relational database for the following data:

    • Last connection date and time: Use mod_last_odbc instead of @@ -1956,6 +1956,7 @@ Last connection date and time: Use mod_last_odbc instead of
    • Users’ VCARD: Use mod_vcard_odbc instead of mod_vcard.
    • Private XML storage: Use mod_private_odbc instead of mod_private.
    • User rules for blocking communications: Use mod_privacy_odbc instead of mod_privacy. +
    • Pub-Sub nodes, items and subscriptions: Use mod_pubsub_odbc instead of mod_pubsub.

    You can find more contributed modules on the ejabberd website. Please remember that these contributions might not work or diff --git a/doc/guide.tex b/doc/guide.tex index 044fab829..73f848f52 100644 --- a/doc/guide.tex +++ b/doc/guide.tex @@ -2560,8 +2560,8 @@ You can see which database backend each module needs by looking at the suffix: \end{itemize} If you want to, -it is possible to use a relational database to store pieces of -information. You can do this by changing the module name to a name with an +it is possible to use a relational database to store the tables created by some ejabberd modules. +You can do this by changing the module name to a name with an \term{\_odbc} suffix in \ejabberd{} config file. You can use a relational database for the following data: @@ -2574,6 +2574,7 @@ database for the following data: \item Users' VCARD: Use \term{mod\_vcard\_odbc} instead of \term{mod\_vcard}. \item Private XML storage: Use \term{mod\_private\_odbc} instead of \term{mod\_private}. \item User rules for blocking communications: Use \term{mod\_privacy\_odbc} instead of \term{mod\_privacy}. +\item Pub-Sub nodes, items and subscriptions: Use \term{mod\_pubsub\_odbc} instead of \term{mod\_pubsub}. \end{itemize} You can find more From 695232450925071d40fedd0c5e727d336e1d5af6 Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Mon, 30 Aug 2010 14:02:47 +1000 Subject: [PATCH 5/8] implemented backend-independed key-value cacheing table; applied to mod_caps.erl first --- src/cache_tab.erl | 480 ++++++++++++++++++++++++++++++++++++++++++ src/cache_tab_sup.erl | 34 +++ src/ejabberd_sup.erl | 8 + src/mod_caps.erl | 61 ++++-- 4 files changed, 569 insertions(+), 14 deletions(-) create mode 100644 src/cache_tab.erl create mode 100644 src/cache_tab_sup.erl diff --git a/src/cache_tab.erl b/src/cache_tab.erl new file mode 100644 index 000000000..e2551ec78 --- /dev/null +++ b/src/cache_tab.erl @@ -0,0 +1,480 @@ +%%%------------------------------------------------------------------- +%%% File : cache_tab.erl +%%% Author : Evgeniy Khramtsov +%%% Description : Caching key-value table +%%% +%%% Created : 29 Aug 2010 by Evgeniy Khramtsov +%%%------------------------------------------------------------------- +-module(cache_tab). + +-define(GEN_SERVER, gen_server). + +-behaviour(?GEN_SERVER). + +%% API +-export([start_link/4, new/2, delete/1, delete/3, lookup/3, + insert/4, info/2, tab2list/1, setopts/2, all/0, test/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-include("ejabberd.hrl"). + +-record(state, {tab = treap:empty(), + name, + size = 0, + owner, + max_size, + life_time, + warn, + hits = 0, + miss = 0, + procs_num, + cache_missed, + shrink_size}). + +-define(PROCNAME, ?MODULE). +-define(CALL_TIMEOUT, 60000). + +%% Defaults +-define(MAX_SIZE, 1000). +-define(WARN, true). +-define(CACHE_MISSED, true). +-define(LIFETIME, 600). %% 10 minutes + +%%==================================================================== +%% API +%%==================================================================== +start_link(Proc, Tab, Opts, Owner) -> + ?GEN_SERVER:start_link( + {local, Proc}, ?MODULE, [Tab, Opts, get_proc_num(), Owner], []). + +new(Tab, Opts) -> + Res = lists:flatmap( + fun(Proc) -> + Spec = {{Tab, Proc}, + {?MODULE, start_link, + [Proc, Tab, Opts, self()]}, + permanent, + brutal_kill, + worker, + [?MODULE]}, + case supervisor:start_child(cache_tab_sup, Spec) of + {ok, _Pid} -> + [ok]; + R -> + [R] + end + end, get_all_procs(Tab)), + case lists:filter(fun(ok) -> false; (_) -> true end, Res) of + [] -> + ok; + Err -> + {error, Err} + end. + +delete(Tab) -> + lists:foreach( + fun(Proc) -> + supervisor:terminate_child(cache_tab_sup, {Tab, Proc}), + supervisor:delete_child(cache_tab_sup, {Tab, Proc}) + end, get_all_procs(Tab)). + +delete(Tab, Key, F) -> + ?GEN_SERVER:call( + get_proc_by_hash(Tab, Key), {delete, Key, F}, ?CALL_TIMEOUT). + +lookup(Tab, Key, F) -> + ?GEN_SERVER:call( + get_proc_by_hash(Tab, Key), {lookup, Key, F}, ?CALL_TIMEOUT). + +insert(Tab, Key, Val, F) -> + ?GEN_SERVER:call( + get_proc_by_hash(Tab, Key), {insert, Key, Val, F}, ?CALL_TIMEOUT). + +info(Tab, Info) -> + case lists:map( + fun(Proc) -> + ?GEN_SERVER:call(Proc, {info, Info}, ?CALL_TIMEOUT) + end, get_all_procs(Tab)) of + Res when Info == size -> + {ok, lists:sum(Res)}; + Res when Info == all -> + {ok, Res}; + Res when Info == ratio -> + {H, M} = lists:foldl( + fun({Hits, Miss}, {HitsAcc, MissAcc}) -> + {HitsAcc + Hits, MissAcc + Miss} + end, {0, 0}, Res), + {ok, [{hits, H}, {miss, M}]}; + _ -> + {error, badarg} + end. + +setopts(Tab, Opts) -> + lists:foreach( + fun(Proc) -> + ?GEN_SERVER:call(Proc, {setopts, Opts}, ?CALL_TIMEOUT) + end, get_all_procs(Tab)). + +tab2list(Tab) -> + lists:flatmap( + fun(Proc) -> + ?GEN_SERVER:call(Proc, tab2list, ?CALL_TIMEOUT) + end, get_all_procs(Tab)). + +all() -> + lists:usort( + [Tab || {{Tab, _}, _, _, _} <- supervisor:which_children(cache_tab_sup)]). + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== +init([Tab, Opts, N, Pid]) -> + State = #state{procs_num = N, + owner = Pid, + name = Tab}, + {ok, do_setopts(State, Opts)}. + +handle_call({lookup, Key, F}, _From, #state{tab = T} = State) -> + case treap:lookup(Key, T) of + {ok, _Prio, Val} -> + Hits = State#state.hits, + NewState = treap_update(Key, Val, State#state{hits = Hits + 1}), + case Val of + '$cached_mismatch' -> + {reply, error, NewState}; + _ -> + {reply, {ok, Val}, NewState} + end; + _ -> + case catch F() of + {ok, Val} -> + Miss = State#state.miss, + NewState = treap_insert(Key, Val, State), + {reply, {ok, Val}, NewState#state{miss = Miss + 1}}; + {'EXIT', Reason} -> + print_error(lookup, [Key], Reason, State), + {reply, error, State}; + _ -> + Miss = State#state.miss, + NewState = State#state{miss = Miss + 1}, + if State#state.cache_missed -> + {reply, error, + treap_insert(Key, '$cached_mismatch', NewState)}; + true -> + {reply, error, NewState} + end + end + end; +handle_call({insert, Key, Val, F}, _From, #state{tab = T} = State) -> + case treap:lookup(Key, T) of + {ok, _Prio, Val} -> + {reply, ok, State}; + Res -> + case catch F() of + {'EXIT', Reason} -> + print_error(insert, [Key, Val], Reason, State), + {reply, ok, State}; + _ -> + NewState = case Res of + {ok, _, _} -> + treap_update(Key, Val, State); + _ -> + treap_insert(Key, Val, State) + end, + {reply, ok, NewState} + end + end; +handle_call({delete, Key, F}, _From, State) -> + NewState = treap_delete(Key, State), + case catch F() of + {'EXIT', Reason} -> + print_error(delete, [Key], Reason, State); + _ -> + ok + end, + {reply, ok, NewState}; +handle_call({info, Info}, _From, State) -> + Res = case Info of + size -> + State#state.size; + ratio -> + {State#state.hits, State#state.miss}; + all -> + [{max_size, State#state.max_size}, + {life_time, State#state.life_time}, + {shrink_size, State#state.shrink_size}, + {size, State#state.size}, + {owner, State#state.owner}, + {hits, State#state.hits}, + {miss, State#state.miss}, + {cache_missed, State#state.cache_missed}, + {warn, State#state.warn}]; + _ -> + badarg + end, + {reply, Res, State}; +handle_call(tab2list, _From, #state{tab = T} = State) -> + Res = treap:fold( + fun({Key, _, Val}, Acc) -> + [{Key, Val}|Acc] + end, [], T), + {reply, Res, State}; +handle_call({setopts, Opts}, _From, State) -> + {reply, ok, do_setopts(State, Opts)}; +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- +do_setopts(#state{procs_num = N} = State, Opts) -> + MaxSize = case {proplists:get_value(max_size, Opts), + State#state.max_size} of + {MS, _} when is_integer(MS), MS > 0 -> + round(MS/N); + {unlimited, _} -> + unlimited; + {_, undefined} -> + round(?MAX_SIZE/N); + {_, MS} -> + MS + end, + LifeTime = case {proplists:get_value(life_time, Opts), + State#state.life_time} of + {LT, _} when is_integer(LT), LT > 0 -> + LT*1000; + {unlimited, _} -> + unlimited; + {_, undefined} -> + ?LIFETIME*1000; + {_, LT} -> + LT + end, + ShrinkSize = case {proplists:get_value(shrink_size, Opts), + State#state.shrink_size} of + {SS, _} when is_integer(SS), SS > 0 -> + round(SS/N); + _ when is_integer(MaxSize) -> + round(MaxSize/2); + _ -> + unlimited + end, + Warn = case {proplists:get_value(warn, Opts), + State#state.warn} of + {true, _} -> + true; + {false, _} -> + false; + {_, undefined} -> + ?WARN; + {_, W} -> + W + end, + CacheMissed = case proplists:get_value( + cache_missed, Opts, State#state.cache_missed) of + false -> + false; + true -> + true; + _ -> + ?CACHE_MISSED + end, + State#state{max_size = MaxSize, + warn = Warn, + life_time = LifeTime, + cache_missed = CacheMissed, + shrink_size = ShrinkSize}. + +get_proc_num() -> + erlang:system_info(logical_processors). + +get_proc_by_hash(Tab, Term) -> + N = erlang:phash2(Term, get_proc_num()) + 1, + get_proc(Tab, N). + +get_proc(Tab, N) -> + list_to_atom(atom_to_list(?PROCNAME) ++ "_" ++ + atom_to_list(Tab) ++ "_" ++ integer_to_list(N)). + +get_all_procs(Tab) -> + [get_proc(Tab, N) || N <- lists:seq(1, get_proc_num())]. + +now_priority() -> + {MSec, Sec, USec} = now(), + -((MSec*1000000 + Sec)*1000000 + USec). + +treap_update(Key, Val, #state{tab = T} = State) -> + Priority = now_priority(), + NewT = treap:insert(Key, Priority, Val, T), + State#state{tab = NewT}. + +treap_insert(Key, Val, State) -> + State1 = clean_treap(State), + #state{size = Size} = State2 = shrink_treap(State1), + treap_update(Key, Val, State2#state{size = Size+1}). + +treap_delete(Key, #state{tab = T, size = Size} = State) -> + case treap:lookup(Key, T) of + {ok, _, _} -> + NewT = treap:delete(Key, T), + clean_treap(State#state{tab = NewT, size = Size-1}); + _ -> + State + end. + +clean_treap(#state{tab = T, size = Size, life_time = LifeTime} = State) -> + if is_integer(LifeTime) -> + Priority = now_priority(), + {Cleaned, NewT} = clean_treap(T, Priority + LifeTime, 0), + State#state{size = Size - Cleaned, tab = NewT}; + true -> + State + end. + +clean_treap(Treap, CleanPriority, N) -> + case treap:is_empty(Treap) of + true -> + {N, Treap}; + false -> + {_Key, Priority, _Value} = treap:get_root(Treap), + if Priority > CleanPriority -> + clean_treap(treap:delete_root(Treap), CleanPriority, N+1); + true -> + {N, Treap} + end + end. + +shrink_treap(#state{tab = T, + max_size = MaxSize, + shrink_size = ShrinkSize, + warn = Warn, + size = Size} = State) when Size >= MaxSize -> + if Warn -> + ?WARNING_MSG("shrinking table:~n" + "** Table: ~p~n" + "** Processes Number: ~p~n" + "** Max Size: ~p items~n" + "** Shrink Size: ~p items~n" + "** Life Time: ~p microseconds~n" + "** Hits/Miss: ~p/~p~n" + "** Owner: ~p~n" + "** Cache Missed: ~p~n" + "** Instruction: you have to tune cacheing options" + " if this message repeats too frequently", + [State#state.name, State#state.procs_num, + MaxSize, ShrinkSize, State#state.life_time, + State#state.hits, State#state.miss, + State#state.owner, State#state.cache_missed]); + true -> + ok + end, + {Shrinked, NewT} = shrink_treap(T, ShrinkSize, 0), + State#state{tab = NewT, size = Size - Shrinked}; +shrink_treap(State) -> + State. + +shrink_treap(T, ShrinkSize, ShrinkSize) -> + {ShrinkSize, T}; +shrink_treap(T, ShrinkSize, N) -> + case treap:is_empty(T) of + true -> + {N, T}; + false -> + shrink_treap(treap:delete_root(T), ShrinkSize, N+1) + end. + +print_error(Operation, Args, Reason, State) -> + ?ERROR_MSG("callback failed:~n" + "** Tab: ~p~n" + "** Owner: ~p~n" + "** Operation: ~p~n" + "** Args: ~p~n" + "** Reason: ~p", + [State#state.name, State#state.owner, + Operation, Args, Reason]). + +%%-------------------------------------------------------------------- +%%% Tests +%%-------------------------------------------------------------------- +test() -> + LifeTime = 2, + ok = new(test_tbl, [{life_time, LifeTime}, {max_size, unlimited}]), + check([]), + ok = insert(test_tbl, "key", "value", fun() -> ok end), + check([{"key", "value"}]), + {ok, "value"} = lookup(test_tbl, "key", fun() -> error end), + check([{"key", "value"}]), + io:format("** waiting for ~p seconds to check if cleaner works fine...~n", + [LifeTime+1]), + timer:sleep(timer:seconds(LifeTime+1)), + ok = insert(test_tbl, "key1", "value1", fun() -> ok end), + check([{"key1", "value1"}]), + ok = delete(test_tbl, "key1", fun() -> ok end), + {ok, "value"} = lookup(test_tbl, "key", fun() -> {ok, "value"} end), + check([{"key", "value"}]), + ok = delete(test_tbl, "key", fun() -> ok end), + check([]), + %% io:format("** testing buggy callbacks...~n"), + %% delete(test_tbl, "key", fun() -> erlang:error(badarg) end), + %% insert(test_tbl, "key", "val", fun() -> erlang:error(badarg) end), + %% lookup(test_tbl, "key", fun() -> erlang:error(badarg) end), + check([]), + delete(test_tbl), + test1(). + +test1() -> + MaxSize = 10, + ok = new(test_tbl, [{max_size, MaxSize}, {shrink_size, 1}, {warn, false}]), + lists:foreach( + fun(N) -> + ok = insert(test_tbl, N, N, fun() -> ok end) + end, lists:seq(1, MaxSize*get_proc_num())), + {ok, MaxSize} = info(test_tbl, size), + delete(test_tbl), + success. +%% io:format("** testing speed, this may take a while...~n"), +%% test2(1000), +%% test2(10000), +%% test2(100000), +%% test2(1000000). + +%% test2(Iter) -> +%% ok = new(test_tbl, [{max_size, unlimited}, {life_time, unlimited}]), +%% L = lists:seq(1, Iter), +%% T1 = now(), +%% lists:foreach( +%% fun(N) -> +%% ok = insert(test_tbl, N, N, fun() -> ok end) +%% end, L), +%% io:format("** average insert (size = ~p): ~p usec~n", +%% [Iter, round(timer:now_diff(now(), T1)/Iter)]), +%% T2 = now(), +%% lists:foreach( +%% fun(N) -> +%% {ok, N} = lookup(test_tbl, N, fun() -> ok end) +%% end, L), +%% io:format("** average lookup (size = ~p): ~p usec~n", +%% [Iter, round(timer:now_diff(now(), T2)/Iter)]), +%% {ok, Iter} = info(test_tbl, size), +%% delete(test_tbl). + +check(List) -> + Size = length(List), + {ok, Size} = info(test_tbl, size), + List = tab2list(test_tbl). diff --git a/src/cache_tab_sup.erl b/src/cache_tab_sup.erl new file mode 100644 index 000000000..d417c5488 --- /dev/null +++ b/src/cache_tab_sup.erl @@ -0,0 +1,34 @@ +%%%------------------------------------------------------------------- +%%% File : cache_tab_sup.erl +%%% Author : Evgeniy Khramtsov +%%% Description : +%%% +%%% Created : 30 Aug 2010 by Evgeniy Khramtsov +%%%------------------------------------------------------------------- +-module(cache_tab_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +-define(SERVER, ?MODULE). + +%%==================================================================== +%% API functions +%%==================================================================== +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%%==================================================================== +%% Supervisor callbacks +%%==================================================================== +init([]) -> + {ok, {{one_for_one,10,1}, []}}. + +%%==================================================================== +%% Internal functions +%%==================================================================== diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl index f0119326a..7dc3068ed 100644 --- a/src/ejabberd_sup.erl +++ b/src/ejabberd_sup.erl @@ -177,6 +177,13 @@ init([]) -> infinity, supervisor, [ejabberd_tmp_sup]}, + CacheTabSupervisor = + {cache_tab_sup, + {cache_tab_sup, start_link, []}, + permanent, + infinity, + supervisor, + [cache_tab_sup]}, {ok, {{one_for_one, 10, 1}, [Hooks, NodeGroups, @@ -196,6 +203,7 @@ init([]) -> IQSupervisor, STUNSupervisor, FrontendSocketSupervisor, + CacheTabSupervisor, Listener]}}. diff --git a/src/mod_caps.erl b/src/mod_caps.erl index d9f4f30c8..01056fbe5 100644 --- a/src/mod_caps.erl +++ b/src/mod_caps.erl @@ -98,11 +98,12 @@ get_features(#caps{node = Node, version = Version, exts = Exts}) -> SubNodes = [Version | Exts], lists:foldl( fun(SubNode, Acc) -> - case mnesia:dirty_read({caps_features, - node_to_binary(Node, SubNode)}) of - [] -> + BinaryNode = node_to_binary(Node, SubNode), + case cache_tab:lookup(caps_features, BinaryNode, + caps_read_fun(BinaryNode)) of + error -> Acc; - [#caps_features{features = Features}] -> + {ok, Features} -> binary_to_features(Features) ++ Acc end end, [], SubNodes). @@ -196,12 +197,23 @@ disco_info(Acc, _Host, _Module, _Node, _Lang) -> %%==================================================================== %% gen_server callbacks %%==================================================================== -init([Host, _Opts]) -> +init([Host, Opts]) -> + case catch mnesia:table_info(caps_features, storage_type) of + {'EXIT', _} -> + ok; + disc_only_copies -> + ok; + _ -> + mnesia:delete_table(caps_features) + end, mnesia:create_table(caps_features, - [{disc_copies, [node()]}, + [{disc_only_copies, [node()]}, {local_content, true}, {attributes, record_info(fields, caps_features)}]), - mnesia:add_table_copy(caps_features, node(), disc_copies), + mnesia:add_table_copy(caps_features, node(), disc_only_copies), + MaxSize = gen_mod:get_opt(cache_size, Opts, 1000), + LifeTime = gen_mod:get_opt(cache_life_time, Opts, timer:hours(24)), + cache_tab:new(caps_features, [{max_size, MaxSize}, {life_time, LifeTime}]), ejabberd_hooks:add(user_send_packet, Host, ?MODULE, user_send_packet, 75), ejabberd_hooks:add(c2s_stream_features, Host, @@ -252,8 +264,9 @@ code_change(_OldVsn, State, _Extra) -> feature_request(Host, From, Caps, [SubNode | Tail] = SubNodes) -> Node = Caps#caps.node, BinaryNode = node_to_binary(Node, SubNode), - case mnesia:dirty_read({caps_features, BinaryNode}) of - [] -> + case cache_tab:lookup(caps_features, BinaryNode, + caps_read_fun(BinaryNode)) of + error -> IQ = #iq{type = get, xmlns = ?NS_DISCO_INFO, sub_el = [{xmlelement, "query", @@ -284,11 +297,13 @@ feature_response(#iq{type = result, (_) -> [] end, Els), - mnesia:dirty_write( - #caps_features{node_pair = BinaryNode, - features = features_to_binary(Features)}); + BinaryFeatures = features_to_binary(Features), + cache_tab:insert( + caps_features, BinaryNode, BinaryFeatures, + caps_write_fun(BinaryNode, BinaryFeatures)); false -> - mnesia:dirty_write(#caps_features{node_pair = BinaryNode}) + cache_tab:insert(caps_features, BinaryNode, [], + caps_write_fun(BinaryNode, [])) end, feature_request(Host, From, Caps, SubNodes); feature_response(timeout, _Host, _From, _Caps, _SubNodes) -> @@ -297,7 +312,8 @@ feature_response(_IQResult, Host, From, Caps, [SubNode | SubNodes]) -> %% We got type=error or invalid type=result stanza, so %% we cache empty feature not to probe the client permanently BinaryNode = node_to_binary(Caps#caps.node, SubNode), - mnesia:dirty_write(#caps_features{node_pair = BinaryNode}), + cache_tab:insert(caps_features, BinaryNode, [], + caps_write_fun(BinaryNode, [])), feature_request(Host, From, Caps, SubNodes). node_to_binary(Node, SubNode) -> @@ -306,6 +322,23 @@ node_to_binary(Node, SubNode) -> features_to_binary(L) -> [list_to_binary(I) || I <- L]. binary_to_features(L) -> [binary_to_list(I) || I <- L]. +caps_read_fun(Node) -> + fun() -> + case mnesia:dirty_read({caps_features, Node}) of + [#caps_features{features = Features}] -> + {ok, Features}; + _ -> + error + end + end. + +caps_write_fun(Node, Features) -> + fun() -> + mnesia:dirty_write( + #caps_features{node_pair = Node, + features = Features}) + end. + make_my_disco_hash(Host) -> JID = jlib:make_jid("", Host, ""), case {ejabberd_hooks:run_fold(disco_local_features, From c754c91ad1e68bb10afc541321dc13c4711fc665 Mon Sep 17 00:00:00 2001 From: Badlop Date: Mon, 30 Aug 2010 23:23:28 +0200 Subject: [PATCH 6/8] Add IPv6 support to mod_irc (thanks to Matthias Schiffer)(EJAB-1298) --- src/mod_irc/mod_irc_connection.erl | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/mod_irc/mod_irc_connection.erl b/src/mod_irc/mod_irc_connection.erl index 107e6602e..39a06a294 100644 --- a/src/mod_irc/mod_irc_connection.erl +++ b/src/mod_irc/mod_irc_connection.erl @@ -105,8 +105,16 @@ init([From, Host, Server, Username, Encoding, Port, Password]) -> open_socket(init, StateData) -> Addr = StateData#state.server, Port = StateData#state.port, - ?DEBUG("connecting to ~s:~p~n", [Addr, Port]), - case gen_tcp:connect(Addr, Port, [binary, {packet, 0}]) of + ?DEBUG("Connecting with IPv6 to ~s:~p", [Addr, Port]), + Connect6 = gen_tcp:connect(Addr, Port, [inet6, binary, {packet, 0}]), + Connect = case Connect6 of + {error, _} -> + ?DEBUG("Connection with IPv6 to ~s:~p failed. Now using IPv4.", [Addr, Port]), + gen_tcp:connect(Addr, Port, [inet, binary, {packet, 0}]); + _ -> + Connect6 + end, + case Connect of {ok, Socket} -> NewStateData = StateData#state{socket = Socket}, if From c75b7b2b125f9b6dd199a2e6a4a309c4add893f3 Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Tue, 31 Aug 2010 18:06:02 +1000 Subject: [PATCH 7/8] Implemented dirty (non-atomic) functions; added copyright notice --- src/cache_tab.erl | 155 ++++++++++++++++++++++++++++++++---------- src/cache_tab_sup.erl | 21 +++++- 2 files changed, 139 insertions(+), 37 deletions(-) diff --git a/src/cache_tab.erl b/src/cache_tab.erl index e2551ec78..75ef6159a 100644 --- a/src/cache_tab.erl +++ b/src/cache_tab.erl @@ -4,6 +4,25 @@ %%% Description : Caching key-value table %%% %%% Created : 29 Aug 2010 by Evgeniy Khramtsov +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2010 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License +%%% along with this program; if not, write to the Free Software +%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA +%%% 02111-1307 USA +%%% %%%------------------------------------------------------------------- -module(cache_tab). @@ -13,7 +32,9 @@ %% API -export([start_link/4, new/2, delete/1, delete/3, lookup/3, - insert/4, info/2, tab2list/1, setopts/2, all/0, test/0]). + insert/4, info/2, tab2list/1, setopts/2, + dirty_lookup/3, dirty_insert/4, dirty_delete/3, + all/0, test/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -83,15 +104,43 @@ delete(Tab) -> delete(Tab, Key, F) -> ?GEN_SERVER:call( - get_proc_by_hash(Tab, Key), {delete, Key, F}, ?CALL_TIMEOUT). + get_proc_by_hash(Tab, Key), {delete, Key, F}, ?CALL_TIMEOUT). + +dirty_delete(Tab, Key, F) -> + F(), + ?GEN_SERVER:call( + get_proc_by_hash(Tab, Key), {dirty_delete, Key}, ?CALL_TIMEOUT). lookup(Tab, Key, F) -> ?GEN_SERVER:call( - get_proc_by_hash(Tab, Key), {lookup, Key, F}, ?CALL_TIMEOUT). + get_proc_by_hash(Tab, Key), {lookup, Key, F}, ?CALL_TIMEOUT). + +dirty_lookup(Tab, Key, F) -> + Proc = get_proc_by_hash(Tab, Key), + case ?GEN_SERVER:call(Proc, {dirty_lookup, Key}, ?CALL_TIMEOUT) of + {ok, '$cached_mismatch'} -> + error; + {ok, Val} -> + {ok, Val}; + _ -> + case F() of + {ok, Val} -> + ?GEN_SERVER:call( + Proc, {dirty_insert, Key, Val}, ?CALL_TIMEOUT), + {ok, Val}; + _ -> + error + end + end. insert(Tab, Key, Val, F) -> ?GEN_SERVER:call( - get_proc_by_hash(Tab, Key), {insert, Key, Val, F}, ?CALL_TIMEOUT). + get_proc_by_hash(Tab, Key), {insert, Key, Val, F}, ?CALL_TIMEOUT). + +dirty_insert(Tab, Key, Val, F) -> + F(), + ?GEN_SERVER:call( + get_proc_by_hash(Tab, Key), {dirty_insert, Key, Val}, ?CALL_TIMEOUT). info(Tab, Info) -> case lists:map( @@ -168,6 +217,22 @@ handle_call({lookup, Key, F}, _From, #state{tab = T} = State) -> end end end; +handle_call({dirty_lookup, Key}, _From, #state{tab = T} = State) -> + case treap:lookup(Key, T) of + {ok, _Prio, Val} -> + Hits = State#state.hits, + NewState = treap_update(Key, Val, State#state{hits = Hits + 1}), + {reply, {ok, Val}, NewState}; + _ -> + Miss = State#state.miss, + NewState = State#state{miss = Miss + 1}, + if State#state.cache_missed -> + {reply, error, + treap_insert(Key, '$cached_mismatch', NewState)}; + true -> + {reply, error, NewState} + end + end; handle_call({insert, Key, Val, F}, _From, #state{tab = T} = State) -> case treap:lookup(Key, T) of {ok, _Prio, Val} -> @@ -187,6 +252,15 @@ handle_call({insert, Key, Val, F}, _From, #state{tab = T} = State) -> {reply, ok, NewState} end end; +handle_call({dirty_insert, Key, Val}, _From, #state{tab = T} = State) -> + case treap:lookup(Key, T) of + {ok, _Prio, Val} -> + {reply, ok, State}; + {ok, _, _} -> + {reply, ok, treap_update(Key, Val, State)}; + _ -> + {reply, ok, treap_insert(Key, Val, State)} + end; handle_call({delete, Key, F}, _From, State) -> NewState = treap_delete(Key, State), case catch F() of @@ -196,6 +270,9 @@ handle_call({delete, Key, F}, _From, State) -> ok end, {reply, ok, NewState}; +handle_call({dirty_delete, Key}, _From, State) -> + NewState = treap_delete(Key, State), + {reply, ok, NewState}; handle_call({info, Info}, _From, State) -> Res = case Info of size -> @@ -412,23 +489,30 @@ print_error(Operation, Args, Reason, State) -> %%-------------------------------------------------------------------- %%% Tests %%-------------------------------------------------------------------- +-define(lookup, dirty_lookup). +-define(delete, dirty_delete). +-define(insert, dirty_insert). +%% -define(lookup, lookup). +%% -define(delete, delete). +%% -define(insert, insert). + test() -> LifeTime = 2, ok = new(test_tbl, [{life_time, LifeTime}, {max_size, unlimited}]), check([]), - ok = insert(test_tbl, "key", "value", fun() -> ok end), + ok = ?insert(test_tbl, "key", "value", fun() -> ok end), check([{"key", "value"}]), - {ok, "value"} = lookup(test_tbl, "key", fun() -> error end), + {ok, "value"} = ?lookup(test_tbl, "key", fun() -> error end), check([{"key", "value"}]), io:format("** waiting for ~p seconds to check if cleaner works fine...~n", [LifeTime+1]), timer:sleep(timer:seconds(LifeTime+1)), - ok = insert(test_tbl, "key1", "value1", fun() -> ok end), + ok = ?insert(test_tbl, "key1", "value1", fun() -> ok end), check([{"key1", "value1"}]), - ok = delete(test_tbl, "key1", fun() -> ok end), - {ok, "value"} = lookup(test_tbl, "key", fun() -> {ok, "value"} end), + ok = ?delete(test_tbl, "key1", fun() -> ok end), + {ok, "value"} = ?lookup(test_tbl, "key", fun() -> {ok, "value"} end), check([{"key", "value"}]), - ok = delete(test_tbl, "key", fun() -> ok end), + ok = ?delete(test_tbl, "key", fun() -> ok end), check([]), %% io:format("** testing buggy callbacks...~n"), %% delete(test_tbl, "key", fun() -> erlang:error(badarg) end), @@ -443,36 +527,35 @@ test1() -> ok = new(test_tbl, [{max_size, MaxSize}, {shrink_size, 1}, {warn, false}]), lists:foreach( fun(N) -> - ok = insert(test_tbl, N, N, fun() -> ok end) + ok = ?insert(test_tbl, N, N, fun() -> ok end) end, lists:seq(1, MaxSize*get_proc_num())), {ok, MaxSize} = info(test_tbl, size), delete(test_tbl), - success. -%% io:format("** testing speed, this may take a while...~n"), -%% test2(1000), -%% test2(10000), -%% test2(100000), -%% test2(1000000). + io:format("** testing speed, this may take a while...~n"), + test2(1000), + test2(10000), + test2(100000), + test2(1000000). -%% test2(Iter) -> -%% ok = new(test_tbl, [{max_size, unlimited}, {life_time, unlimited}]), -%% L = lists:seq(1, Iter), -%% T1 = now(), -%% lists:foreach( -%% fun(N) -> -%% ok = insert(test_tbl, N, N, fun() -> ok end) -%% end, L), -%% io:format("** average insert (size = ~p): ~p usec~n", -%% [Iter, round(timer:now_diff(now(), T1)/Iter)]), -%% T2 = now(), -%% lists:foreach( -%% fun(N) -> -%% {ok, N} = lookup(test_tbl, N, fun() -> ok end) -%% end, L), -%% io:format("** average lookup (size = ~p): ~p usec~n", -%% [Iter, round(timer:now_diff(now(), T2)/Iter)]), -%% {ok, Iter} = info(test_tbl, size), -%% delete(test_tbl). +test2(Iter) -> + ok = new(test_tbl, [{max_size, unlimited}, {life_time, unlimited}]), + L = lists:seq(1, Iter), + T1 = now(), + lists:foreach( + fun(N) -> + ok = ?insert(test_tbl, N, N, fun() -> ok end) + end, L), + io:format("** average insert (size = ~p): ~p usec~n", + [Iter, round(timer:now_diff(now(), T1)/Iter)]), + T2 = now(), + lists:foreach( + fun(N) -> + {ok, N} = ?lookup(test_tbl, N, fun() -> ok end) + end, L), + io:format("** average lookup (size = ~p): ~p usec~n", + [Iter, round(timer:now_diff(now(), T2)/Iter)]), + {ok, Iter} = info(test_tbl, size), + delete(test_tbl). check(List) -> Size = length(List), diff --git a/src/cache_tab_sup.erl b/src/cache_tab_sup.erl index d417c5488..a49593f5e 100644 --- a/src/cache_tab_sup.erl +++ b/src/cache_tab_sup.erl @@ -1,9 +1,28 @@ %%%------------------------------------------------------------------- %%% File : cache_tab_sup.erl %%% Author : Evgeniy Khramtsov -%%% Description : +%%% Description : Cache tables supervisor %%% %%% Created : 30 Aug 2010 by Evgeniy Khramtsov +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2010 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License +%%% along with this program; if not, write to the Free Software +%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA +%%% 02111-1307 USA +%%% %%%------------------------------------------------------------------- -module(cache_tab_sup). From c29b2fda991a4d090b0a729a41b6fb0e0022e050 Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Wed, 8 Sep 2010 02:33:49 +1000 Subject: [PATCH 8/8] cache lifetime should be converted in microseconds --- src/cache_tab.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cache_tab.erl b/src/cache_tab.erl index 75ef6159a..b05e78b0e 100644 --- a/src/cache_tab.erl +++ b/src/cache_tab.erl @@ -335,11 +335,11 @@ do_setopts(#state{procs_num = N} = State, Opts) -> LifeTime = case {proplists:get_value(life_time, Opts), State#state.life_time} of {LT, _} when is_integer(LT), LT > 0 -> - LT*1000; + LT*1000*1000; {unlimited, _} -> unlimited; {_, undefined} -> - ?LIFETIME*1000; + ?LIFETIME*1000*1000; {_, LT} -> LT end,