Improve Riak pool management

This commit is contained in:
Evgeniy Khramtsov 2014-07-09 16:40:18 +04:00
parent f1d0b05db5
commit c15dc01cff
2 changed files with 137 additions and 106 deletions

View File

@ -27,7 +27,7 @@
-behaviour(gen_server).
%% API
-export([start_link/3, make_bucket/1, put/1, put/2,
-export([start_link/4, get_proc/1, make_bucket/1, put/1, put/2,
get/1, get/2, get_by_index/3, delete/1, delete/2,
count_by_index/3, get_by_index_range/4,
get_keys/1, get_keys_by_index/3,
@ -61,8 +61,14 @@
%%% API
%%%===================================================================
%% @private
start_link(Server, Port, _StartInterval) ->
gen_server:start_link(?MODULE, [Server, Port], []).
start_link(Num, Server, Port, _StartInterval) ->
gen_server:start_link({local, get_proc(Num)}, ?MODULE, [Server, Port], []).
%% @private
get_proc(I) ->
jlib:binary_to_atom(
iolist_to_binary(
[atom_to_list(?MODULE), $_, integer_to_list(I)])).
-spec make_bucket(atom()) -> binary().
%% @doc Makes a bucket from a table name
@ -101,21 +107,21 @@ put_raw(Table, Key, Value, Indexes) ->
true ->
Obj
end,
riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj1).
catch riakc_pb_socket:put(get_random_pid(), Obj1).
get_object_raw(Table, Key) ->
Bucket = make_bucket(Table),
riakc_pb_socket:get(ejabberd_riak_sup:get_random_pid(), Bucket, Key).
catch riakc_pb_socket:get(get_random_pid(), Bucket, Key).
-spec get(atom()) -> {ok, [any()]} | {error, any()}.
%% @doc Returns all objects from table `Table'
get(Table) ->
Bucket = make_bucket(Table),
case riakc_pb_socket:mapred(
ejabberd_riak_sup:get_random_pid(),
Bucket,
[{map, {modfun, riak_kv_mapreduce, map_object_value},
none, true}]) of
case catch riakc_pb_socket:mapred(
get_random_pid(),
Bucket,
[{map, {modfun, riak_kv_mapreduce, map_object_value},
none, true}]) of
{ok, [{_, Objs}]} ->
{ok, lists:flatmap(
fun(Obj) ->
@ -131,7 +137,7 @@ get(Table) ->
end, Objs)};
{error, notfound} ->
{ok, []};
Error ->
{error, _} = Error ->
Error
end.
@ -228,13 +234,13 @@ get_raw(Table, Key) ->
%% @doc Returns a list of index values
get_keys(Table) ->
Bucket = make_bucket(Table),
case riakc_pb_socket:mapred(
ejabberd_riak_sup:get_random_pid(),
Bucket,
[{map, {modfun, ?MODULE, map_key}, none, true}]) of
case catch riakc_pb_socket:mapred(
get_random_pid(),
Bucket,
[{map, {modfun, ?MODULE, map_key}, none, true}]) of
{ok, [{_, Keys}]} ->
{ok, Keys};
Error ->
{error, _} = Error ->
log_error(Error, get_keys, [{table, Table}]),
Error
end.
@ -245,13 +251,13 @@ get_keys(Table) ->
get_keys_by_index(Table, Index, Key) ->
{NewIndex, NewKey} = encode_index_key(Index, Key),
Bucket = make_bucket(Table),
case riakc_pb_socket:mapred(
ejabberd_riak_sup:get_random_pid(),
{index, Bucket, NewIndex, NewKey},
[{map, {modfun, ?MODULE, map_key}, none, true}]) of
case catch riakc_pb_socket:mapred(
get_random_pid(),
{index, Bucket, NewIndex, NewKey},
[{map, {modfun, ?MODULE, map_key}, none, true}]) of
{ok, [{_, Keys}]} ->
{ok, Keys};
Error ->
{error, _} = Error ->
log_error(Error, get_keys_by_index, [{table, Table},
{index, Index},
{key, Key}]),
@ -260,31 +266,31 @@ get_keys_by_index(Table, Index, Key) ->
%% @hidden
get_tables() ->
riakc_pb_socket:list_buckets(ejabberd_riak_sup:get_random_pid()).
catch riakc_pb_socket:list_buckets(get_random_pid()).
get_by_index_raw(Table, Index, Key) ->
Bucket = make_bucket(Table),
case riakc_pb_socket:mapred(
ejabberd_riak_sup:get_random_pid(),
get_random_pid(),
{index, Bucket, Index, Key},
[{map, {modfun, riak_kv_mapreduce, map_object_value},
none, true}]) of
{ok, [{_, Objs}]} ->
{ok, Objs};
Error ->
{error, _} = Error ->
Error
end.
get_by_index_range_raw(Table, Index, FromKey, ToKey) ->
Bucket = make_bucket(Table),
case riakc_pb_socket:mapred(
ejabberd_riak_sup:get_random_pid(),
{index, Bucket, Index, FromKey, ToKey},
[{map, {modfun, riak_kv_mapreduce, map_object_value},
none, true}]) of
case catch riakc_pb_socket:mapred(
get_random_pid(),
{index, Bucket, Index, FromKey, ToKey},
[{map, {modfun, riak_kv_mapreduce, map_object_value},
none, true}]) of
{ok, [{_, Objs}]} ->
{ok, Objs};
Error ->
{error, _} = Error ->
Error
end.
@ -292,14 +298,14 @@ get_by_index_range_raw(Table, Index, FromKey, ToKey) ->
%% @doc Returns the number of objects in the `Table'
count(Table) ->
Bucket = make_bucket(Table),
case riakc_pb_socket:mapred(
ejabberd_riak_sup:get_random_pid(),
Bucket,
[{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
none, true}]) of
case catch riakc_pb_socket:mapred(
get_random_pid(),
Bucket,
[{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
none, true}]) of
{ok, [{_, [Cnt]}]} ->
{ok, Cnt};
Error ->
{error, _} = Error ->
log_error(Error, count, [{table, Table}]),
Error
end.
@ -324,14 +330,14 @@ count_by_index(Tab, Index, Key) ->
count_by_index_raw(Table, Index, Key) ->
Bucket = make_bucket(Table),
case riakc_pb_socket:mapred(
ejabberd_riak_sup:get_random_pid(),
{index, Bucket, Index, Key},
[{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
none, true}]) of
case catch riakc_pb_socket:mapred(
get_random_pid(),
{index, Bucket, Index, Key},
[{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
none, true}]) of
{ok, [{_, [Cnt]}]} ->
{ok, Cnt};
Error ->
{error, _} = Error ->
Error
end.
@ -368,7 +374,7 @@ delete(Table, Key) when is_atom(Table) ->
delete_raw(Table, Key) ->
Bucket = make_bucket(Table),
riakc_pb_socket:delete(ejabberd_riak_sup:get_random_pid(), Bucket, Key).
catch riakc_pb_socket:delete(get_random_pid(), Bucket, Key).
-spec delete_by_index(atom(), binary(), any()) -> ok | {error, any()}.
%% @doc Deletes objects by index
@ -407,13 +413,14 @@ init([Server, Port]) ->
[auto_reconnect]) of
{ok, Pid} ->
erlang:monitor(process, Pid),
ejabberd_riak_sup:add_pid(Pid),
{ok, #state{pid = Pid}};
Err ->
{stop, Err}
end.
%% @private
handle_call(get_pid, _From, #state{pid = Pid} = State) ->
{reply, {ok, Pid}, State};
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
@ -430,8 +437,7 @@ handle_info(_Info, State) ->
{noreply, State}.
%% @private
terminate(_Reason, State) ->
ejabberd_riak_sup:remove_pid(State#state.pid),
terminate(_Reason, _State) ->
ok.
%% @private
@ -486,3 +492,14 @@ log_error(_, _, _) ->
make_invalid_object(Val) ->
list_to_binary(io_lib:fwrite("Invalid object: ~p", [Val])).
get_random_pid() ->
PoolPid = ejabberd_riak_sup:get_random_pid(),
case catch gen_server:call(PoolPid, get_pid) of
{ok, Pid} ->
Pid;
{'EXIT', {timeout, _}} ->
throw({error, timeout});
{'EXIT', Err} ->
throw({error, Err})
end.

View File

@ -31,34 +31,52 @@
-export([start/0,
start_link/0,
init/1,
add_pid/1,
remove_pid/1,
get_pids/0,
get_random_pid/0
transform_options/1,
get_random_pid/0,
get_random_pid/1
]).
-include("ejabberd.hrl").
-include("logger.hrl").
-define(DEFAULT_POOL_SIZE, 10).
-define(DEFAULT_RIAK_START_INTERVAL, 30). % 30 seconds
-define(DEFAULT_RIAK_HOST, "127.0.0.1").
-define(DEFAULT_RIAK_PORT, 8087).
% time to wait for the supervisor to start its child before returning
% a timeout error to the request
-define(CONNECT_TIMEOUT, 500). % milliseconds
-record(riak_pool, {undefined, pid}).
start() ->
StartRiak = ejabberd_config:get_local_option(
riak_server, fun(_) -> true end, false),
if
StartRiak ->
case lists:any(
fun(Host) ->
is_riak_configured(Host)
end, ?MYHOSTS) of
true ->
ejabberd:start_app(riakc),
do_start();
true ->
ok
false ->
ok
end.
is_riak_configured(Host) ->
ServerConfigured = ejabberd_config:get_option(
{riak_server, Host},
fun(_) -> true end, false),
PortConfigured = ejabberd_config:get_option(
{riak_port, Host},
fun(_) -> true end, false),
Modules = ejabberd_config:get_option(
{modules, Host},
fun(L) when is_list(L) -> L end, []),
ModuleWithRiakDBConfigured = lists:any(
fun({_Module, Opts}) ->
gen_mod:db_type(Opts) == riak
end, Modules),
ServerConfigured or PortConfigured or ModuleWithRiakDBConfigured.
do_start() ->
SupervisorName = ?MODULE,
ChildSpec =
@ -79,65 +97,61 @@ do_start() ->
end.
start_link() ->
mnesia:create_table(riak_pool,
[{ram_copies, [node()]},
{type, bag},
{local_content, true},
{attributes, record_info(fields, riak_pool)}]),
mnesia:add_table_copy(riak_pool, node(), ram_copies),
F = fun() ->
mnesia:delete({riak_pool, undefined})
end,
mnesia:ets(F),
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
PoolSize =
ejabberd_config:get_local_option(
riak_pool_size,
fun(N) when is_integer(N), N >= 1 -> N end,
?DEFAULT_POOL_SIZE),
StartInterval =
ejabberd_config:get_local_option(
riak_start_interval,
fun(N) when is_integer(N), N >= 1 -> N end,
?DEFAULT_RIAK_START_INTERVAL),
{Server, Port} =
ejabberd_config:get_local_option(
riak_server,
fun({S, P}) when is_integer(P), P > 0, P < 65536 ->
{binary_to_list(iolist_to_binary(S)), P}
end, {"127.0.0.1", 8081}),
PoolSize = get_pool_size(),
StartInterval = get_start_interval(),
Server = get_riak_server(),
Port = get_riak_port(),
{ok, {{one_for_one, PoolSize*10, 1},
lists:map(
fun(I) ->
{I,
{ejabberd_riak:get_proc(I),
{ejabberd_riak, start_link,
[Server, Port, StartInterval*1000]},
transient,
2000,
worker,
[?MODULE]}
[I, Server, Port, StartInterval*1000]},
transient, 2000, worker, [?MODULE]}
end, lists:seq(1, PoolSize))}}.
get_start_interval() ->
ejabberd_config:get_option(
riak_start_interval,
fun(N) when is_integer(N), N >= 1 -> N end,
?DEFAULT_RIAK_START_INTERVAL).
get_pool_size() ->
ejabberd_config:get_option(
riak_pool_size,
fun(N) when is_integer(N), N >= 1 -> N end,
?DEFAULT_POOL_SIZE).
get_riak_server() ->
ejabberd_config:get_option(
riak_server,
fun(S) ->
binary_to_list(iolist_to_binary(S))
end, ?DEFAULT_RIAK_HOST).
get_riak_port() ->
ejabberd_config:get_option(
riak_port,
fun(P) when is_integer(P), P > 0, P < 65536 -> P end,
?DEFAULT_RIAK_PORT).
get_pids() ->
Rs = mnesia:dirty_read(riak_pool, undefined),
[R#riak_pool.pid || R <- Rs].
[ejabberd_riak:get_proc(I) || I <- lists:seq(1, get_pool_size())].
get_random_pid() ->
Pids = get_pids(),
lists:nth(erlang:phash(now(), length(Pids)), Pids).
get_random_pid(now()).
add_pid(Pid) ->
F = fun() ->
mnesia:write(
#riak_pool{pid = Pid})
end,
mnesia:ets(F).
get_random_pid(Term) ->
I = erlang:phash2(Term, get_pool_size()) + 1,
ejabberd_riak:get_proc(I).
remove_pid(Pid) ->
F = fun() ->
mnesia:delete_object(
#riak_pool{pid = Pid})
end,
mnesia:ets(F).
transform_options(Opts) ->
lists:foldl(fun transform_options/2, [], Opts).
transform_options({riak_server, {S, P}}, Opts) ->
[{riak_server, S}, {riak_port, P}|Opts];
transform_options(Opt, Opts) ->
[Opt|Opts].