From c15dc01cffecccc3fd626acc15460dbf2f95a1bc Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Wed, 9 Jul 2014 16:40:18 +0400 Subject: [PATCH] Improve Riak pool management --- src/ejabberd_riak.erl | 109 ++++++++++++++++++------------- src/ejabberd_riak_sup.erl | 134 +++++++++++++++++++++----------------- 2 files changed, 137 insertions(+), 106 deletions(-) diff --git a/src/ejabberd_riak.erl b/src/ejabberd_riak.erl index 04ff1ea11..0b576ca61 100644 --- a/src/ejabberd_riak.erl +++ b/src/ejabberd_riak.erl @@ -27,7 +27,7 @@ -behaviour(gen_server). %% API --export([start_link/3, make_bucket/1, put/1, put/2, +-export([start_link/4, get_proc/1, make_bucket/1, put/1, put/2, get/1, get/2, get_by_index/3, delete/1, delete/2, count_by_index/3, get_by_index_range/4, get_keys/1, get_keys_by_index/3, @@ -61,8 +61,14 @@ %%% API %%%=================================================================== %% @private -start_link(Server, Port, _StartInterval) -> - gen_server:start_link(?MODULE, [Server, Port], []). +start_link(Num, Server, Port, _StartInterval) -> + gen_server:start_link({local, get_proc(Num)}, ?MODULE, [Server, Port], []). + +%% @private +get_proc(I) -> + jlib:binary_to_atom( + iolist_to_binary( + [atom_to_list(?MODULE), $_, integer_to_list(I)])). -spec make_bucket(atom()) -> binary(). %% @doc Makes a bucket from a table name @@ -101,21 +107,21 @@ put_raw(Table, Key, Value, Indexes) -> true -> Obj end, - riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj1). + catch riakc_pb_socket:put(get_random_pid(), Obj1). get_object_raw(Table, Key) -> Bucket = make_bucket(Table), - riakc_pb_socket:get(ejabberd_riak_sup:get_random_pid(), Bucket, Key). + catch riakc_pb_socket:get(get_random_pid(), Bucket, Key). -spec get(atom()) -> {ok, [any()]} | {error, any()}. %% @doc Returns all objects from table `Table' get(Table) -> Bucket = make_bucket(Table), - case riakc_pb_socket:mapred( - ejabberd_riak_sup:get_random_pid(), - Bucket, - [{map, {modfun, riak_kv_mapreduce, map_object_value}, - none, true}]) of + case catch riakc_pb_socket:mapred( + get_random_pid(), + Bucket, + [{map, {modfun, riak_kv_mapreduce, map_object_value}, + none, true}]) of {ok, [{_, Objs}]} -> {ok, lists:flatmap( fun(Obj) -> @@ -131,7 +137,7 @@ get(Table) -> end, Objs)}; {error, notfound} -> {ok, []}; - Error -> + {error, _} = Error -> Error end. @@ -228,13 +234,13 @@ get_raw(Table, Key) -> %% @doc Returns a list of index values get_keys(Table) -> Bucket = make_bucket(Table), - case riakc_pb_socket:mapred( - ejabberd_riak_sup:get_random_pid(), - Bucket, - [{map, {modfun, ?MODULE, map_key}, none, true}]) of + case catch riakc_pb_socket:mapred( + get_random_pid(), + Bucket, + [{map, {modfun, ?MODULE, map_key}, none, true}]) of {ok, [{_, Keys}]} -> {ok, Keys}; - Error -> + {error, _} = Error -> log_error(Error, get_keys, [{table, Table}]), Error end. @@ -245,13 +251,13 @@ get_keys(Table) -> get_keys_by_index(Table, Index, Key) -> {NewIndex, NewKey} = encode_index_key(Index, Key), Bucket = make_bucket(Table), - case riakc_pb_socket:mapred( - ejabberd_riak_sup:get_random_pid(), - {index, Bucket, NewIndex, NewKey}, - [{map, {modfun, ?MODULE, map_key}, none, true}]) of + case catch riakc_pb_socket:mapred( + get_random_pid(), + {index, Bucket, NewIndex, NewKey}, + [{map, {modfun, ?MODULE, map_key}, none, true}]) of {ok, [{_, Keys}]} -> {ok, Keys}; - Error -> + {error, _} = Error -> log_error(Error, get_keys_by_index, [{table, Table}, {index, Index}, {key, Key}]), @@ -260,31 +266,31 @@ get_keys_by_index(Table, Index, Key) -> %% @hidden get_tables() -> - riakc_pb_socket:list_buckets(ejabberd_riak_sup:get_random_pid()). + catch riakc_pb_socket:list_buckets(get_random_pid()). get_by_index_raw(Table, Index, Key) -> Bucket = make_bucket(Table), case riakc_pb_socket:mapred( - ejabberd_riak_sup:get_random_pid(), + get_random_pid(), {index, Bucket, Index, Key}, [{map, {modfun, riak_kv_mapreduce, map_object_value}, none, true}]) of {ok, [{_, Objs}]} -> {ok, Objs}; - Error -> + {error, _} = Error -> Error end. get_by_index_range_raw(Table, Index, FromKey, ToKey) -> Bucket = make_bucket(Table), - case riakc_pb_socket:mapred( - ejabberd_riak_sup:get_random_pid(), - {index, Bucket, Index, FromKey, ToKey}, - [{map, {modfun, riak_kv_mapreduce, map_object_value}, - none, true}]) of + case catch riakc_pb_socket:mapred( + get_random_pid(), + {index, Bucket, Index, FromKey, ToKey}, + [{map, {modfun, riak_kv_mapreduce, map_object_value}, + none, true}]) of {ok, [{_, Objs}]} -> {ok, Objs}; - Error -> + {error, _} = Error -> Error end. @@ -292,14 +298,14 @@ get_by_index_range_raw(Table, Index, FromKey, ToKey) -> %% @doc Returns the number of objects in the `Table' count(Table) -> Bucket = make_bucket(Table), - case riakc_pb_socket:mapred( - ejabberd_riak_sup:get_random_pid(), - Bucket, - [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs}, - none, true}]) of + case catch riakc_pb_socket:mapred( + get_random_pid(), + Bucket, + [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs}, + none, true}]) of {ok, [{_, [Cnt]}]} -> {ok, Cnt}; - Error -> + {error, _} = Error -> log_error(Error, count, [{table, Table}]), Error end. @@ -324,14 +330,14 @@ count_by_index(Tab, Index, Key) -> count_by_index_raw(Table, Index, Key) -> Bucket = make_bucket(Table), - case riakc_pb_socket:mapred( - ejabberd_riak_sup:get_random_pid(), - {index, Bucket, Index, Key}, - [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs}, - none, true}]) of + case catch riakc_pb_socket:mapred( + get_random_pid(), + {index, Bucket, Index, Key}, + [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs}, + none, true}]) of {ok, [{_, [Cnt]}]} -> {ok, Cnt}; - Error -> + {error, _} = Error -> Error end. @@ -368,7 +374,7 @@ delete(Table, Key) when is_atom(Table) -> delete_raw(Table, Key) -> Bucket = make_bucket(Table), - riakc_pb_socket:delete(ejabberd_riak_sup:get_random_pid(), Bucket, Key). + catch riakc_pb_socket:delete(get_random_pid(), Bucket, Key). -spec delete_by_index(atom(), binary(), any()) -> ok | {error, any()}. %% @doc Deletes objects by index @@ -407,13 +413,14 @@ init([Server, Port]) -> [auto_reconnect]) of {ok, Pid} -> erlang:monitor(process, Pid), - ejabberd_riak_sup:add_pid(Pid), {ok, #state{pid = Pid}}; Err -> {stop, Err} end. %% @private +handle_call(get_pid, _From, #state{pid = Pid} = State) -> + {reply, {ok, Pid}, State}; handle_call(_Request, _From, State) -> Reply = ok, {reply, Reply, State}. @@ -430,8 +437,7 @@ handle_info(_Info, State) -> {noreply, State}. %% @private -terminate(_Reason, State) -> - ejabberd_riak_sup:remove_pid(State#state.pid), +terminate(_Reason, _State) -> ok. %% @private @@ -486,3 +492,14 @@ log_error(_, _, _) -> make_invalid_object(Val) -> list_to_binary(io_lib:fwrite("Invalid object: ~p", [Val])). + +get_random_pid() -> + PoolPid = ejabberd_riak_sup:get_random_pid(), + case catch gen_server:call(PoolPid, get_pid) of + {ok, Pid} -> + Pid; + {'EXIT', {timeout, _}} -> + throw({error, timeout}); + {'EXIT', Err} -> + throw({error, Err}) + end. diff --git a/src/ejabberd_riak_sup.erl b/src/ejabberd_riak_sup.erl index d19b9fbe9..a066a3c8c 100644 --- a/src/ejabberd_riak_sup.erl +++ b/src/ejabberd_riak_sup.erl @@ -31,34 +31,52 @@ -export([start/0, start_link/0, init/1, - add_pid/1, - remove_pid/1, get_pids/0, - get_random_pid/0 + transform_options/1, + get_random_pid/0, + get_random_pid/1 ]). -include("ejabberd.hrl"). +-include("logger.hrl"). -define(DEFAULT_POOL_SIZE, 10). -define(DEFAULT_RIAK_START_INTERVAL, 30). % 30 seconds +-define(DEFAULT_RIAK_HOST, "127.0.0.1"). +-define(DEFAULT_RIAK_PORT, 8087). % time to wait for the supervisor to start its child before returning % a timeout error to the request -define(CONNECT_TIMEOUT, 500). % milliseconds - --record(riak_pool, {undefined, pid}). - start() -> - StartRiak = ejabberd_config:get_local_option( - riak_server, fun(_) -> true end, false), - if - StartRiak -> + case lists:any( + fun(Host) -> + is_riak_configured(Host) + end, ?MYHOSTS) of + true -> + ejabberd:start_app(riakc), do_start(); - true -> - ok + false -> + ok end. +is_riak_configured(Host) -> + ServerConfigured = ejabberd_config:get_option( + {riak_server, Host}, + fun(_) -> true end, false), + PortConfigured = ejabberd_config:get_option( + {riak_port, Host}, + fun(_) -> true end, false), + Modules = ejabberd_config:get_option( + {modules, Host}, + fun(L) when is_list(L) -> L end, []), + ModuleWithRiakDBConfigured = lists:any( + fun({_Module, Opts}) -> + gen_mod:db_type(Opts) == riak + end, Modules), + ServerConfigured or PortConfigured or ModuleWithRiakDBConfigured. + do_start() -> SupervisorName = ?MODULE, ChildSpec = @@ -79,65 +97,61 @@ do_start() -> end. start_link() -> - mnesia:create_table(riak_pool, - [{ram_copies, [node()]}, - {type, bag}, - {local_content, true}, - {attributes, record_info(fields, riak_pool)}]), - mnesia:add_table_copy(riak_pool, node(), ram_copies), - F = fun() -> - mnesia:delete({riak_pool, undefined}) - end, - mnesia:ets(F), supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - PoolSize = - ejabberd_config:get_local_option( - riak_pool_size, - fun(N) when is_integer(N), N >= 1 -> N end, - ?DEFAULT_POOL_SIZE), - StartInterval = - ejabberd_config:get_local_option( - riak_start_interval, - fun(N) when is_integer(N), N >= 1 -> N end, - ?DEFAULT_RIAK_START_INTERVAL), - {Server, Port} = - ejabberd_config:get_local_option( - riak_server, - fun({S, P}) when is_integer(P), P > 0, P < 65536 -> - {binary_to_list(iolist_to_binary(S)), P} - end, {"127.0.0.1", 8081}), + PoolSize = get_pool_size(), + StartInterval = get_start_interval(), + Server = get_riak_server(), + Port = get_riak_port(), {ok, {{one_for_one, PoolSize*10, 1}, lists:map( fun(I) -> - {I, + {ejabberd_riak:get_proc(I), {ejabberd_riak, start_link, - [Server, Port, StartInterval*1000]}, - transient, - 2000, - worker, - [?MODULE]} + [I, Server, Port, StartInterval*1000]}, + transient, 2000, worker, [?MODULE]} end, lists:seq(1, PoolSize))}}. +get_start_interval() -> + ejabberd_config:get_option( + riak_start_interval, + fun(N) when is_integer(N), N >= 1 -> N end, + ?DEFAULT_RIAK_START_INTERVAL). + +get_pool_size() -> + ejabberd_config:get_option( + riak_pool_size, + fun(N) when is_integer(N), N >= 1 -> N end, + ?DEFAULT_POOL_SIZE). + +get_riak_server() -> + ejabberd_config:get_option( + riak_server, + fun(S) -> + binary_to_list(iolist_to_binary(S)) + end, ?DEFAULT_RIAK_HOST). + +get_riak_port() -> + ejabberd_config:get_option( + riak_port, + fun(P) when is_integer(P), P > 0, P < 65536 -> P end, + ?DEFAULT_RIAK_PORT). + get_pids() -> - Rs = mnesia:dirty_read(riak_pool, undefined), - [R#riak_pool.pid || R <- Rs]. + [ejabberd_riak:get_proc(I) || I <- lists:seq(1, get_pool_size())]. get_random_pid() -> - Pids = get_pids(), - lists:nth(erlang:phash(now(), length(Pids)), Pids). + get_random_pid(now()). -add_pid(Pid) -> - F = fun() -> - mnesia:write( - #riak_pool{pid = Pid}) - end, - mnesia:ets(F). +get_random_pid(Term) -> + I = erlang:phash2(Term, get_pool_size()) + 1, + ejabberd_riak:get_proc(I). -remove_pid(Pid) -> - F = fun() -> - mnesia:delete_object( - #riak_pool{pid = Pid}) - end, - mnesia:ets(F). +transform_options(Opts) -> + lists:foldl(fun transform_options/2, [], Opts). + +transform_options({riak_server, {S, P}}, Opts) -> + [{riak_server, S}, {riak_port, P}|Opts]; +transform_options(Opt, Opts) -> + [Opt|Opts].