mirror of
https://github.com/processone/ejabberd.git
synced 2024-09-09 13:36:03 +02:00
Remove Riak support
Reasons: - Riak DB development is almost halted after Basho - riak-erlang-client is abandoned and doesn't work correctly with OTP22 - Riak is slow in comparison to other databases - Missing key ordering makes it impossible to implement range queries efficiently (e.g. MAM queries)
This commit is contained in:
parent
11e4b9d882
commit
3f7d9e3ad6
@ -44,7 +44,7 @@ before_script:
|
||||
|
||||
script:
|
||||
- ./autogen.sh
|
||||
- ./configure --prefix=/tmp/ejabberd --enable-all --disable-odbc --disable-riak --disable-elixir
|
||||
- ./configure --prefix=/tmp/ejabberd --enable-all --disable-odbc --disable-elixir
|
||||
- make
|
||||
- make install -s
|
||||
- make xref
|
||||
|
17
configure.ac
17
configure.ac
@ -109,10 +109,10 @@ AC_ARG_ENABLE(mssql,
|
||||
esac],[db_type=generic])
|
||||
|
||||
AC_ARG_ENABLE(all,
|
||||
[AC_HELP_STRING([--enable-all], [same as --enable-odbc --enable-mysql --enable-pgsql --enable-sqlite --enable-pam --enable-zlib --enable-riak --enable-redis --enable-elixir --enable-stun --enable-sip --enable-debug --enable-tools (useful for Dialyzer checks, default: no)])],
|
||||
[AC_HELP_STRING([--enable-all], [same as --enable-odbc --enable-mysql --enable-pgsql --enable-sqlite --enable-pam --enable-zlib --enable-redis --enable-elixir --enable-stun --enable-sip --enable-debug --enable-tools (useful for Dialyzer checks, default: no)])],
|
||||
[case "${enableval}" in
|
||||
yes) odbc=true mysql=true pgsql=true sqlite=true pam=true zlib=true riak=true redis=true elixir=true stun=true sip=true debug=true tools=true ;;
|
||||
no) odbc=false mysql=false pgsql=false sqlite=false pam=false zlib=false riak=false redis=false elixir=false stun=false sip=false debug=false tools=false ;;
|
||||
yes) odbc=true mysql=true pgsql=true sqlite=true pam=true zlib=true redis=true elixir=true stun=true sip=true debug=true tools=true ;;
|
||||
no) odbc=false mysql=false pgsql=false sqlite=false pam=false zlib=false redis=false elixir=false stun=false sip=false debug=false tools=false ;;
|
||||
*) AC_MSG_ERROR(bad value ${enableval} for --enable-all) ;;
|
||||
esac],[])
|
||||
|
||||
@ -172,14 +172,6 @@ AC_ARG_ENABLE(zlib,
|
||||
*) AC_MSG_ERROR(bad value ${enableval} for --enable-zlib) ;;
|
||||
esac],[if test "x$zlib" = "x"; then zlib=true; fi])
|
||||
|
||||
AC_ARG_ENABLE(riak,
|
||||
[AC_HELP_STRING([--enable-riak], [enable Riak support (default: no)])],
|
||||
[case "${enableval}" in
|
||||
yes) riak=true ;;
|
||||
no) riak=false ;;
|
||||
*) AC_MSG_ERROR(bad value ${enableval} for --enable-riak) ;;
|
||||
esac],[if test "x$riak" = "x"; then riak=false; fi])
|
||||
|
||||
AC_ARG_ENABLE(redis,
|
||||
[AC_HELP_STRING([--enable-redis], [enable Redis support (default: no)])],
|
||||
[case "${enableval}" in
|
||||
@ -275,7 +267,7 @@ if test "$sqlite" = "true"; then
|
||||
fi
|
||||
|
||||
enabled_backends=""
|
||||
for backend in odbc mysql pgsql sqlite riak redis; do
|
||||
for backend in odbc mysql pgsql sqlite redis; do
|
||||
if eval test x\${$backend} = xtrue; then
|
||||
if test "x$enabled_backends" = "x"; then
|
||||
enabled_backends=$backend
|
||||
@ -296,7 +288,6 @@ AC_SUBST(pgsql)
|
||||
AC_SUBST(sqlite)
|
||||
AC_SUBST(pam)
|
||||
AC_SUBST(zlib)
|
||||
AC_SUBST(riak)
|
||||
AC_SUBST(redis)
|
||||
AC_SUBST(elixir)
|
||||
AC_SUBST(stun)
|
||||
|
1
mix.exs
1
mix.exs
@ -107,7 +107,6 @@ defmodule Ejabberd.Mixfile do
|
||||
|
||||
defp cond_deps do
|
||||
for {:true, dep} <- [{config(:sqlite), {:sqlite3, "~> 1.1"}},
|
||||
{config(:riak), {:riakc, "~> 2.4"}},
|
||||
{config(:redis), {:eredis, "~> 1.0"}},
|
||||
{config(:zlib), {:ezlib, "~> 1.0"}},
|
||||
{config(:pam), {:epam, "~> 1.0"}},
|
||||
|
2
mix.lock
2
mix.lock
@ -28,8 +28,6 @@
|
||||
"p1_pgsql": {:hex, :p1_pgsql, "1.1.7", "ef64d34adbbe08258cc10b1532649446d8c086ff8663d44f430d837ec31a89f8", [:rebar3], [], "hexpm"},
|
||||
"p1_utils": {:hex, :p1_utils, "1.0.15", "731f76ae1f31f4554afb2ae629cb5589d53bd13efc72b11f5a7c3b1242f91046", [:rebar3], [], "hexpm"},
|
||||
"pkix": {:hex, :pkix, "1.0.2", "f618455c4edbcc7ebf8be5e655b269876fa36e890b806d18d8b2b708dfb1e583", [:rebar3], [], "hexpm"},
|
||||
"riak_pb": {:hex, :riak_pb, "2.3.2", "48ffbf66dbb3f136ab9a7134bac4e496754baa5ef58c4f50a61326736d996390", [:make, :mix, :rebar3], [{:hamcrest, "~> 0.4.1", [hex: :basho_hamcrest, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
"riakc": {:hex, :riakc, "2.5.3", "6132d9e687a0dfd314b2b24c4594302ca8b55568a5d733c491d8fb6cd4004763", [:make, :mix, :rebar3], [{:riak_pb, "~> 2.3", [hex: :riak_pb, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
"sqlite3": {:hex, :sqlite3, "1.1.6", "4ea71af0b45908b5f02c9b09e4c87177039ef404f20accb35049cd8924cc417c", [:rebar3], [], "hexpm"},
|
||||
"stringprep": {:hex, :stringprep, "1.0.16", "5a7e617cabba5791aed45b394307d46b9f22ac2eef3bbcf6a4b639637079c84d", [:rebar3], [{:p1_utils, "1.0.15", [hex: :p1_utils, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
"stun": {:hex, :stun, "1.0.28", "ee81bc075f955f529679213157f76c3d3355a1dec4625108a62872006c3db8a0", [:rebar3], [{:fast_tls, "1.1.1", [hex: :fast_tls, repo: "hexpm", optional: false]}, {:p1_utils, "1.0.15", [hex: :p1_utils, repo: "hexpm", optional: false]}], "hexpm"},
|
||||
|
@ -45,8 +45,6 @@
|
||||
{tag, "1.0.6"}}}},
|
||||
{if_var_true, zlib, {ezlib, ".*", {git, "https://github.com/processone/ezlib",
|
||||
{tag, "1.0.6"}}}},
|
||||
{if_var_true, riak, {riakc, ".*", {git, "https://github.com/processone/riak-erlang-client",
|
||||
{tag, {if_version_above, "19", "develop", "2.5.3"}}}}},
|
||||
%% Elixir support, needed to run tests
|
||||
{if_var_true, elixir, {elixir, ".*", {git, "https://github.com/elixir-lang/elixir",
|
||||
{tag, {if_version_above, "17", "v1.4.4", "v1.1.1"}}}}},
|
||||
@ -129,8 +127,6 @@
|
||||
{if_var_false, mysql, "(\".*mysql.*\":_/_)"},
|
||||
{if_var_false, pgsql, "(\".*pgsql.*\":_/_)"},
|
||||
{if_var_false, pam, "(\"epam\":_/_)"},
|
||||
{if_var_false, riak, "(\"riak.*\":_/_)"},
|
||||
{if_var_true, riak, "(\"riak_object\":_/_)"},
|
||||
{if_var_false, zlib, "(\"ezlib\":_/_)"},
|
||||
{if_var_false, http, "(\"lhttpc\":_/_)"},
|
||||
{if_var_false, odbc, "(\"odbc\":_/_)"},
|
||||
|
@ -278,7 +278,7 @@ get_commands_spec() ->
|
||||
args_example = ["example.com"],
|
||||
args = [{host, binary}], result = {res, rescode}},
|
||||
|
||||
#ejabberd_commands{name = import_prosody, tags = [mnesia, sql, riak],
|
||||
#ejabberd_commands{name = import_prosody, tags = [mnesia, sql],
|
||||
desc = "Import data from Prosody",
|
||||
longdesc = "Note: this method requires ejabberd compiled with optional tools support "
|
||||
"and package must provide optional luerl dependency.",
|
||||
|
@ -901,7 +901,5 @@ import_start(_LServer, _) ->
|
||||
|
||||
import(Server, {sql, _}, mnesia, <<"users">>, Fields) ->
|
||||
ejabberd_auth_mnesia:import(Server, Fields);
|
||||
import(Server, {sql, _}, riak, <<"users">>, Fields) ->
|
||||
ejabberd_auth_riak:import(Server, Fields);
|
||||
import(_LServer, {sql, _}, sql, <<"users">>, _) ->
|
||||
ok.
|
||||
|
@ -1,127 +0,0 @@
|
||||
%%%----------------------------------------------------------------------
|
||||
%%% File : ejabberd_auth_riak.erl
|
||||
%%% Author : Evgeniy Khramtsov <ekhramtsov@process-one.net>
|
||||
%%% Purpose : Authentication via Riak
|
||||
%%% Created : 12 Nov 2012 by Evgeniy Khramtsov <ekhramtsov@process-one.net>
|
||||
%%%
|
||||
%%%
|
||||
%%% ejabberd, Copyright (C) 2002-2019 ProcessOne
|
||||
%%%
|
||||
%%% This program is free software; you can redistribute it and/or
|
||||
%%% modify it under the terms of the GNU General Public License as
|
||||
%%% published by the Free Software Foundation; either version 2 of the
|
||||
%%% License, or (at your option) any later version.
|
||||
%%%
|
||||
%%% This program is distributed in the hope that it will be useful,
|
||||
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
%%% General Public License for more details.
|
||||
%%%
|
||||
%%% You should have received a copy of the GNU General Public License along
|
||||
%%% with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
%%%
|
||||
%%%----------------------------------------------------------------------
|
||||
|
||||
-module(ejabberd_auth_riak).
|
||||
|
||||
-author('alexey@process-one.net').
|
||||
|
||||
-behaviour(ejabberd_auth).
|
||||
|
||||
%% External exports
|
||||
-export([start/1, stop/1, set_password/3, try_register/3,
|
||||
get_users/2, count_users/2,
|
||||
get_password/2, remove_user/2, store_type/1, export/1, import/2,
|
||||
plain_password_required/1]).
|
||||
-export([passwd_schema/0]).
|
||||
|
||||
-include("ejabberd_sql_pt.hrl").
|
||||
-include("scram.hrl").
|
||||
-include("ejabberd_auth.hrl").
|
||||
|
||||
start(_Host) ->
|
||||
ok.
|
||||
|
||||
stop(_Host) ->
|
||||
ok.
|
||||
|
||||
plain_password_required(Server) ->
|
||||
store_type(Server) == scram.
|
||||
|
||||
store_type(Server) ->
|
||||
ejabberd_auth:password_format(Server).
|
||||
|
||||
passwd_schema() ->
|
||||
{record_info(fields, passwd), #passwd{}}.
|
||||
|
||||
set_password(User, Server, Password) ->
|
||||
case ejabberd_riak:put(#passwd{us = {User, Server}, password = Password},
|
||||
passwd_schema(),
|
||||
[{'2i', [{<<"host">>, Server}]}]) of
|
||||
ok -> {cache, {ok, Password}};
|
||||
{error, _} -> {nocache, {error, db_failure}}
|
||||
end.
|
||||
|
||||
try_register(User, Server, Password) ->
|
||||
US = {User, Server},
|
||||
case ejabberd_riak:get(passwd, passwd_schema(), US) of
|
||||
{error, notfound} ->
|
||||
case ejabberd_riak:put(#passwd{us = US, password = Password},
|
||||
passwd_schema(),
|
||||
[{'2i', [{<<"host">>, Server}]}]) of
|
||||
ok -> {cache, {ok, Password}};
|
||||
{error, _} -> {nocache, {error, db_failure}}
|
||||
end;
|
||||
{ok, _} ->
|
||||
{cache, {error, exists}};
|
||||
{error, _} ->
|
||||
{nocache, {error, db_failure}}
|
||||
end.
|
||||
|
||||
get_users(Server, _) ->
|
||||
case ejabberd_riak:get_keys_by_index(passwd, <<"host">>, Server) of
|
||||
{ok, Users} ->
|
||||
Users;
|
||||
_ ->
|
||||
[]
|
||||
end.
|
||||
|
||||
count_users(Server, _) ->
|
||||
case ejabberd_riak:count_by_index(passwd, <<"host">>, Server) of
|
||||
{ok, N} ->
|
||||
N;
|
||||
_ ->
|
||||
0
|
||||
end.
|
||||
|
||||
get_password(User, Server) ->
|
||||
case ejabberd_riak:get(passwd, passwd_schema(), {User, Server}) of
|
||||
{ok, Password} ->
|
||||
{cache, {ok, Password}};
|
||||
{error, notfound} ->
|
||||
{cache, error};
|
||||
{error, _} ->
|
||||
{nocache, error}
|
||||
end.
|
||||
|
||||
remove_user(User, Server) ->
|
||||
ejabberd_riak:delete(passwd, {User, Server}).
|
||||
|
||||
export(_Server) ->
|
||||
[{passwd,
|
||||
fun(Host, #passwd{us = {LUser, LServer}, password = Password})
|
||||
when LServer == Host ->
|
||||
[?SQL("delete from users where username=%(LUser)s and %(LServer)H;"),
|
||||
?SQL_INSERT(
|
||||
"users",
|
||||
["username=%(LUser)s",
|
||||
"server_host=%(LServer)s",
|
||||
"password=%(Password)s"])];
|
||||
(_Host, _R) ->
|
||||
[]
|
||||
end}].
|
||||
|
||||
import(LServer, [LUser, Password, _TimeStamp]) ->
|
||||
Passwd = #passwd{us = {LUser, LServer}, password = Password},
|
||||
ejabberd_riak:put(Passwd, passwd_schema(), [{'2i', [{<<"host">>, LServer}]}]).
|
@ -60,8 +60,7 @@ transform(Opts) ->
|
||||
Opts1 = transform_register(Opts),
|
||||
Opts2 = transform_s2s(Opts1),
|
||||
Opts3 = transform_listeners(Opts2),
|
||||
Opts4 = transform_sql(Opts3),
|
||||
Opts5 = transform_riak(Opts4),
|
||||
Opts5 = transform_sql(Opts3),
|
||||
Opts6 = transform_shaper(Opts5),
|
||||
Opts7 = transform_s2s_out(Opts6),
|
||||
Opts8 = transform_acl(Opts7),
|
||||
@ -412,17 +411,6 @@ transform_sql({odbc_pool_size, N}, Opts) ->
|
||||
transform_sql(Opt, Opts) ->
|
||||
[Opt|Opts].
|
||||
|
||||
%%%===================================================================
|
||||
%%% Riak
|
||||
%%%===================================================================
|
||||
transform_riak(Opts) ->
|
||||
lists:foldl(fun transform_riak/2, [], Opts).
|
||||
|
||||
transform_riak({riak_server, {S, P}}, Opts) ->
|
||||
[{riak_server, S}, {riak_port, P}|Opts];
|
||||
transform_riak(Opt, Opts) ->
|
||||
[Opt|Opts].
|
||||
|
||||
%%%===================================================================
|
||||
%%% modules
|
||||
%%%===================================================================
|
||||
|
@ -105,13 +105,6 @@
|
||||
-export([redis_server/0]).
|
||||
-export([registration_timeout/0]).
|
||||
-export([resource_conflict/0, resource_conflict/1]).
|
||||
-export([riak_cacertfile/0]).
|
||||
-export([riak_password/0]).
|
||||
-export([riak_pool_size/0]).
|
||||
-export([riak_port/0]).
|
||||
-export([riak_server/0]).
|
||||
-export([riak_start_interval/0]).
|
||||
-export([riak_username/0]).
|
||||
-export([router_cache_life_time/0]).
|
||||
-export([router_cache_missed/0]).
|
||||
-export([router_cache_size/0]).
|
||||
@ -327,17 +320,17 @@ cluster_backend() ->
|
||||
cluster_nodes() ->
|
||||
ejabberd_config:get_option({cluster_nodes, global}).
|
||||
|
||||
-spec default_db() -> 'mnesia' | 'riak' | 'sql'.
|
||||
-spec default_db() -> 'mnesia' | 'sql'.
|
||||
default_db() ->
|
||||
default_db(global).
|
||||
-spec default_db(global | binary()) -> 'mnesia' | 'riak' | 'sql'.
|
||||
-spec default_db(global | binary()) -> 'mnesia' | 'sql'.
|
||||
default_db(Host) ->
|
||||
ejabberd_config:get_option({default_db, Host}).
|
||||
|
||||
-spec default_ram_db() -> 'mnesia' | 'redis' | 'riak' | 'sql'.
|
||||
-spec default_ram_db() -> 'mnesia' | 'redis' | 'sql'.
|
||||
default_ram_db() ->
|
||||
default_ram_db(global).
|
||||
-spec default_ram_db(global | binary()) -> 'mnesia' | 'redis' | 'riak' | 'sql'.
|
||||
-spec default_ram_db(global | binary()) -> 'mnesia' | 'redis' | 'sql'.
|
||||
default_ram_db(Host) ->
|
||||
ejabberd_config:get_option({default_ram_db, Host}).
|
||||
|
||||
@ -735,34 +728,6 @@ resource_conflict() ->
|
||||
resource_conflict(Host) ->
|
||||
ejabberd_config:get_option({resource_conflict, Host}).
|
||||
|
||||
-spec riak_cacertfile() -> 'nil' | string().
|
||||
riak_cacertfile() ->
|
||||
ejabberd_config:get_option({riak_cacertfile, global}).
|
||||
|
||||
-spec riak_password() -> 'nil' | string().
|
||||
riak_password() ->
|
||||
ejabberd_config:get_option({riak_password, global}).
|
||||
|
||||
-spec riak_pool_size() -> pos_integer().
|
||||
riak_pool_size() ->
|
||||
ejabberd_config:get_option({riak_pool_size, global}).
|
||||
|
||||
-spec riak_port() -> 1..1114111.
|
||||
riak_port() ->
|
||||
ejabberd_config:get_option({riak_port, global}).
|
||||
|
||||
-spec riak_server() -> string().
|
||||
riak_server() ->
|
||||
ejabberd_config:get_option({riak_server, global}).
|
||||
|
||||
-spec riak_start_interval() -> pos_integer().
|
||||
riak_start_interval() ->
|
||||
ejabberd_config:get_option({riak_start_interval, global}).
|
||||
|
||||
-spec riak_username() -> 'nil' | string().
|
||||
riak_username() ->
|
||||
ejabberd_config:get_option({riak_username, global}).
|
||||
|
||||
-spec router_cache_life_time() -> 'infinity' | pos_integer().
|
||||
router_cache_life_time() ->
|
||||
ejabberd_config:get_option({router_cache_life_time, global}).
|
||||
|
@ -104,9 +104,9 @@ opt_type(cluster_backend) ->
|
||||
opt_type(cluster_nodes) ->
|
||||
econf:list(econf:atom(), [unique]);
|
||||
opt_type(default_db) ->
|
||||
econf:enum([mnesia, riak, sql]);
|
||||
econf:enum([mnesia, sql]);
|
||||
opt_type(default_ram_db) ->
|
||||
econf:enum([mnesia, riak, sql, redis]);
|
||||
econf:enum([mnesia, sql, redis]);
|
||||
opt_type(define_macro) ->
|
||||
econf:any();
|
||||
opt_type(disable_sasl_mechanisms) ->
|
||||
@ -280,20 +280,6 @@ opt_type(registration_timeout) ->
|
||||
econf:timeout(second, infinity);
|
||||
opt_type(resource_conflict) ->
|
||||
econf:enum([setresource, closeold, closenew, acceptnew]);
|
||||
opt_type(riak_cacertfile) ->
|
||||
econf:and_then(econf:pem(), econf:string());
|
||||
opt_type(riak_password) ->
|
||||
econf:string();
|
||||
opt_type(riak_pool_size) ->
|
||||
econf:pos_int();
|
||||
opt_type(riak_port) ->
|
||||
econf:port();
|
||||
opt_type(riak_server) ->
|
||||
econf:string();
|
||||
opt_type(riak_start_interval) ->
|
||||
econf:timeout(second);
|
||||
opt_type(riak_username) ->
|
||||
econf:string();
|
||||
opt_type(router_cache_life_time) ->
|
||||
econf:timeout(second, infinity);
|
||||
opt_type(router_cache_missed) ->
|
||||
@ -570,13 +556,6 @@ options() ->
|
||||
{redis_server, "localhost"},
|
||||
{registration_timeout, timer:seconds(600)},
|
||||
{resource_conflict, acceptnew},
|
||||
{riak_cacertfile, nil},
|
||||
{riak_password, nil},
|
||||
{riak_pool_size, 10},
|
||||
{riak_port, 8087},
|
||||
{riak_server, "127.0.0.1"},
|
||||
{riak_start_interval, timer:seconds(30)},
|
||||
{riak_username, nil},
|
||||
{router_cache_life_time,
|
||||
fun(Host) -> ejabberd_config:get_option({cache_life_time, Host}) end},
|
||||
{router_cache_missed,
|
||||
@ -702,13 +681,6 @@ globals() ->
|
||||
redis_queue_type,
|
||||
redis_server,
|
||||
registration_timeout,
|
||||
riak_cacertfile,
|
||||
riak_password,
|
||||
riak_pool_size,
|
||||
riak_port,
|
||||
riak_server,
|
||||
riak_start_interval,
|
||||
riak_username,
|
||||
router_cache_life_time,
|
||||
router_cache_missed,
|
||||
router_cache_size,
|
||||
|
@ -1,568 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% File : ejabberd_riak.erl
|
||||
%%% Author : Alexey Shchepin <alexey@process-one.net>
|
||||
%%% Purpose : Interface for Riak database
|
||||
%%% Created : 29 Dec 2011 by Alexey Shchepin <alexey@process-one.net>
|
||||
%%%
|
||||
%%%
|
||||
%%% ejabberd, Copyright (C) 2002-2019 ProcessOne
|
||||
%%%
|
||||
%%% This program is free software; you can redistribute it and/or
|
||||
%%% modify it under the terms of the GNU General Public License as
|
||||
%%% published by the Free Software Foundation; either version 2 of the
|
||||
%%% License, or (at your option) any later version.
|
||||
%%%
|
||||
%%% This program is distributed in the hope that it will be useful,
|
||||
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
%%% General Public License for more details.
|
||||
%%%
|
||||
%%% You should have received a copy of the GNU General Public License along
|
||||
%%% with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
%%%
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(ejabberd_riak).
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_link/5, get_proc/1, make_bucket/1, put/2, put/3,
|
||||
get/2, get/3, get_by_index/4, delete/1, delete/2,
|
||||
count_by_index/3, get_by_index_range/5,
|
||||
get_keys/1, get_keys_by_index/3, is_connected/0,
|
||||
count/1, delete_by_index/3]).
|
||||
%% For debugging
|
||||
-export([get_tables/0]).
|
||||
%% map/reduce exports
|
||||
-export([map_key/3]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
terminate/2, code_change/3]).
|
||||
|
||||
-include("logger.hrl").
|
||||
|
||||
-record(state, {pid = self() :: pid()}).
|
||||
|
||||
-type index() :: {binary(), any()}.
|
||||
|
||||
-type index_info() :: [{i, any()} | {'2i', [index()]}].
|
||||
|
||||
%% The `record_schema()' is just a tuple:
|
||||
%% {record_info(fields, some_record), #some_record{}}
|
||||
|
||||
-type record_schema() :: {[atom()], tuple()}.
|
||||
|
||||
%% The `index_info()' is used in put/delete functions:
|
||||
%% `i' defines a primary index, `` '2i' '' defines secondary indexes.
|
||||
%% There must be only one primary index. If `i' is not specified,
|
||||
%% the first element of the record is assumed as a primary index,
|
||||
%% i.e. `i' = element(2, Record).
|
||||
|
||||
-export_type([index_info/0]).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
%% @private
|
||||
start_link(Num, Server, Port, _StartInterval, Options) ->
|
||||
gen_server:start_link({local, get_proc(Num)}, ?MODULE, [Server, Port, Options], []).
|
||||
|
||||
%% @private
|
||||
is_connected() ->
|
||||
lists:all(
|
||||
fun({_Id, Pid, _Type, _Modules}) when is_pid(Pid) ->
|
||||
case catch riakc_pb_socket:is_connected(get_riak_pid(Pid)) of
|
||||
true -> true;
|
||||
_ -> false
|
||||
end;
|
||||
(_) ->
|
||||
false
|
||||
end, supervisor:which_children(ejabberd_riak_sup)).
|
||||
|
||||
%% @private
|
||||
get_proc(I) ->
|
||||
misc:binary_to_atom(
|
||||
iolist_to_binary(
|
||||
[atom_to_list(?MODULE), $_, integer_to_list(I)])).
|
||||
|
||||
-spec make_bucket(atom()) -> binary().
|
||||
%% @doc Makes a bucket from a table name
|
||||
%% @private
|
||||
make_bucket(Table) ->
|
||||
erlang:atom_to_binary(Table, utf8).
|
||||
|
||||
-spec put(tuple(), record_schema()) -> ok | {error, any()}.
|
||||
%% @equiv put(Record, [])
|
||||
put(Record, RecFields) ->
|
||||
?MODULE:put(Record, RecFields, []).
|
||||
|
||||
-spec put(tuple(), record_schema(), index_info()) -> ok | {error, any()}.
|
||||
%% @doc Stores a record `Rec' with indexes described in ``IndexInfo''
|
||||
put(Rec, RecSchema, IndexInfo) ->
|
||||
Key = encode_key(proplists:get_value(i, IndexInfo, element(2, Rec))),
|
||||
SecIdxs = [encode_index_key(K, V) ||
|
||||
{K, V} <- proplists:get_value('2i', IndexInfo, [])],
|
||||
Table = element(1, Rec),
|
||||
Value = encode_record(Rec, RecSchema),
|
||||
case put_raw(Table, Key, Value, SecIdxs) of
|
||||
ok ->
|
||||
ok;
|
||||
{error, _} = Error ->
|
||||
log_error(Error, put, [{record, Rec},
|
||||
{index_info, IndexInfo}]),
|
||||
Error
|
||||
end.
|
||||
|
||||
put_raw(Table, Key, Value, Indexes) ->
|
||||
Bucket = make_bucket(Table),
|
||||
Obj = riakc_obj:new(Bucket, Key, Value, "application/x-erlang-term"),
|
||||
Obj1 = if Indexes /= [] ->
|
||||
MetaData = dict:store(<<"index">>, Indexes, dict:new()),
|
||||
riakc_obj:update_metadata(Obj, MetaData);
|
||||
true ->
|
||||
Obj
|
||||
end,
|
||||
catch riakc_pb_socket:put(get_random_pid(), Obj1).
|
||||
|
||||
get_object_raw(Table, Key) ->
|
||||
Bucket = make_bucket(Table),
|
||||
catch riakc_pb_socket:get(get_random_pid(), Bucket, Key).
|
||||
|
||||
-spec get(atom(), record_schema()) -> {ok, [any()]} | {error, any()}.
|
||||
%% @doc Returns all objects from table `Table'
|
||||
get(Table, RecSchema) ->
|
||||
Bucket = make_bucket(Table),
|
||||
case catch riakc_pb_socket:mapred(
|
||||
get_random_pid(),
|
||||
Bucket,
|
||||
[{map, {modfun, riak_kv_mapreduce, map_object_value},
|
||||
none, true}]) of
|
||||
{ok, [{_, Objs}]} ->
|
||||
{ok, lists:flatmap(
|
||||
fun(Obj) ->
|
||||
case catch decode_record(Obj, RecSchema) of
|
||||
{'EXIT', _} ->
|
||||
Error = {error, make_invalid_object(Obj)},
|
||||
log_error(Error, get,
|
||||
[{table, Table}]),
|
||||
[];
|
||||
Term ->
|
||||
[Term]
|
||||
end
|
||||
end, Objs)};
|
||||
{ok, []} ->
|
||||
{ok, []};
|
||||
{error, notfound} ->
|
||||
{ok, []};
|
||||
{error, _} = Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
-spec get(atom(), record_schema(), any()) -> {ok, any()} | {error, any()}.
|
||||
%% @doc Reads record by `Key' from table `Table'
|
||||
get(Table, RecSchema, Key) ->
|
||||
case get_raw(Table, encode_key(Key)) of
|
||||
{ok, Val} ->
|
||||
case catch decode_record(Val, RecSchema) of
|
||||
{'EXIT', _} ->
|
||||
Error = {error, make_invalid_object(Val)},
|
||||
log_error(Error, get, [{table, Table}, {key, Key}]),
|
||||
{error, notfound};
|
||||
Term ->
|
||||
{ok, Term}
|
||||
end;
|
||||
{error, _} = Error ->
|
||||
log_error(Error, get, [{table, Table},
|
||||
{key, Key}]),
|
||||
Error
|
||||
end.
|
||||
|
||||
-spec get_by_index(atom(), record_schema(), binary(), any()) ->
|
||||
{ok, [any()]} | {error, any()}.
|
||||
%% @doc Reads records by `Index' and value `Key' from `Table'
|
||||
get_by_index(Table, RecSchema, Index, Key) ->
|
||||
{NewIndex, NewKey} = encode_index_key(Index, Key),
|
||||
case get_by_index_raw(Table, NewIndex, NewKey) of
|
||||
{ok, Vals} ->
|
||||
{ok, lists:flatmap(
|
||||
fun(Val) ->
|
||||
case catch decode_record(Val, RecSchema) of
|
||||
{'EXIT', _} ->
|
||||
Error = {error, make_invalid_object(Val)},
|
||||
log_error(Error, get_by_index,
|
||||
[{table, Table},
|
||||
{index, Index},
|
||||
{key, Key}]),
|
||||
[];
|
||||
Term ->
|
||||
[Term]
|
||||
end
|
||||
end, Vals)};
|
||||
{error, notfound} ->
|
||||
{ok, []};
|
||||
{error, _} = Error ->
|
||||
log_error(Error, get_by_index,
|
||||
[{table, Table},
|
||||
{index, Index},
|
||||
{key, Key}]),
|
||||
Error
|
||||
end.
|
||||
|
||||
-spec get_by_index_range(atom(), record_schema(), binary(), any(), any()) ->
|
||||
{ok, [any()]} | {error, any()}.
|
||||
%% @doc Reads records by `Index' in the range `FromKey'..`ToKey' from `Table'
|
||||
get_by_index_range(Table, RecSchema, Index, FromKey, ToKey) ->
|
||||
{NewIndex, NewFromKey} = encode_index_key(Index, FromKey),
|
||||
{NewIndex, NewToKey} = encode_index_key(Index, ToKey),
|
||||
case get_by_index_range_raw(Table, NewIndex, NewFromKey, NewToKey) of
|
||||
{ok, Vals} ->
|
||||
{ok, lists:flatmap(
|
||||
fun(Val) ->
|
||||
case catch decode_record(Val, RecSchema) of
|
||||
{'EXIT', _} ->
|
||||
Error = {error, make_invalid_object(Val)},
|
||||
log_error(Error, get_by_index_range,
|
||||
[{table, Table},
|
||||
{index, Index},
|
||||
{start_key, FromKey},
|
||||
{end_key, ToKey}]),
|
||||
[];
|
||||
Term ->
|
||||
[Term]
|
||||
end
|
||||
end, Vals)};
|
||||
{error, notfound} ->
|
||||
{ok, []};
|
||||
{error, _} = Error ->
|
||||
log_error(Error, get_by_index_range,
|
||||
[{table, Table}, {index, Index},
|
||||
{start_key, FromKey}, {end_key, ToKey}]),
|
||||
Error
|
||||
end.
|
||||
|
||||
get_raw(Table, Key) ->
|
||||
case get_object_raw(Table, Key) of
|
||||
{ok, Obj} ->
|
||||
{ok, riakc_obj:get_value(Obj)};
|
||||
{error, _} = Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
-spec get_keys(atom()) -> {ok, [any()]} | {error, any()}.
|
||||
%% @doc Returns a list of index values
|
||||
get_keys(Table) ->
|
||||
Bucket = make_bucket(Table),
|
||||
case catch riakc_pb_socket:mapred(
|
||||
get_random_pid(),
|
||||
Bucket,
|
||||
[{map, {modfun, ?MODULE, map_key}, none, true}]) of
|
||||
{ok, [{_, Keys}]} ->
|
||||
{ok, Keys};
|
||||
{ok, []} ->
|
||||
{ok, []};
|
||||
{error, _} = Error ->
|
||||
log_error(Error, get_keys, [{table, Table}]),
|
||||
Error
|
||||
end.
|
||||
|
||||
-spec get_keys_by_index(atom(), binary(),
|
||||
any()) -> {ok, [any()]} | {error, any()}.
|
||||
%% @doc Returns a list of primary keys of objects indexed by `Key'.
|
||||
get_keys_by_index(Table, Index, Key) ->
|
||||
{NewIndex, NewKey} = encode_index_key(Index, Key),
|
||||
Bucket = make_bucket(Table),
|
||||
case catch riakc_pb_socket:mapred(
|
||||
get_random_pid(),
|
||||
{index, Bucket, NewIndex, NewKey},
|
||||
[{map, {modfun, ?MODULE, map_key}, none, true}]) of
|
||||
{ok, [{_, Keys}]} ->
|
||||
{ok, Keys};
|
||||
{ok, []} ->
|
||||
{ok, []};
|
||||
{error, _} = Error ->
|
||||
log_error(Error, get_keys_by_index, [{table, Table},
|
||||
{index, Index},
|
||||
{key, Key}]),
|
||||
Error
|
||||
end.
|
||||
|
||||
%% @hidden
|
||||
get_tables() ->
|
||||
catch riakc_pb_socket:list_buckets(get_random_pid()).
|
||||
|
||||
get_by_index_raw(Table, Index, Key) ->
|
||||
Bucket = make_bucket(Table),
|
||||
case riakc_pb_socket:mapred(
|
||||
get_random_pid(),
|
||||
{index, Bucket, Index, Key},
|
||||
[{map, {modfun, riak_kv_mapreduce, map_object_value},
|
||||
none, true}]) of
|
||||
{ok, [{_, Objs}]} ->
|
||||
{ok, Objs};
|
||||
{ok, []} ->
|
||||
{ok, []};
|
||||
{error, _} = Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
get_by_index_range_raw(Table, Index, FromKey, ToKey) ->
|
||||
Bucket = make_bucket(Table),
|
||||
case catch riakc_pb_socket:mapred(
|
||||
get_random_pid(),
|
||||
{index, Bucket, Index, FromKey, ToKey},
|
||||
[{map, {modfun, riak_kv_mapreduce, map_object_value},
|
||||
none, true}]) of
|
||||
{ok, [{_, Objs}]} ->
|
||||
{ok, Objs};
|
||||
{ok, []} ->
|
||||
{ok, []};
|
||||
{error, _} = Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
-spec count(atom()) -> {ok, non_neg_integer()} | {error, any()}.
|
||||
%% @doc Returns the number of objects in the `Table'
|
||||
count(Table) ->
|
||||
Bucket = make_bucket(Table),
|
||||
case catch riakc_pb_socket:mapred(
|
||||
get_random_pid(),
|
||||
Bucket,
|
||||
[{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
|
||||
none, true}]) of
|
||||
{ok, [{_, [Cnt]}]} ->
|
||||
{ok, Cnt};
|
||||
{error, _} = Error ->
|
||||
log_error(Error, count, [{table, Table}]),
|
||||
Error
|
||||
end.
|
||||
|
||||
-spec count_by_index(atom(), binary(), any()) ->
|
||||
{ok, non_neg_integer()} | {error, any()}.
|
||||
%% @doc Returns the number of objects in the `Table' by index
|
||||
count_by_index(Tab, Index, Key) ->
|
||||
{NewIndex, NewKey} = encode_index_key(Index, Key),
|
||||
case count_by_index_raw(Tab, NewIndex, NewKey) of
|
||||
{ok, Cnt} ->
|
||||
{ok, Cnt};
|
||||
{error, notfound} ->
|
||||
{ok, 0};
|
||||
{error, _} = Error ->
|
||||
log_error(Error, count_by_index,
|
||||
[{table, Tab},
|
||||
{index, Index},
|
||||
{key, Key}]),
|
||||
Error
|
||||
end.
|
||||
|
||||
count_by_index_raw(Table, Index, Key) ->
|
||||
Bucket = make_bucket(Table),
|
||||
case catch riakc_pb_socket:mapred(
|
||||
get_random_pid(),
|
||||
{index, Bucket, Index, Key},
|
||||
[{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
|
||||
none, true}]) of
|
||||
{ok, [{_, [Cnt]}]} ->
|
||||
{ok, Cnt};
|
||||
{error, _} = Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
-spec delete(tuple() | atom()) -> ok | {error, any()}.
|
||||
%% @doc Same as delete(T, []) when T is record.
|
||||
%% Or deletes all elements from table if T is atom.
|
||||
delete(Rec) when is_tuple(Rec) ->
|
||||
delete(Rec, []);
|
||||
delete(Table) when is_atom(Table) ->
|
||||
try
|
||||
{ok, Keys} = ?MODULE:get_keys(Table),
|
||||
lists:foreach(
|
||||
fun(K) ->
|
||||
ok = delete(Table, K)
|
||||
end, Keys)
|
||||
catch _:{badmatch, Err} ->
|
||||
Err
|
||||
end.
|
||||
|
||||
-spec delete(tuple() | atom(), index_info() | any()) -> ok | {error, any()}.
|
||||
%% @doc Delete an object
|
||||
delete(Rec, Opts) when is_tuple(Rec) ->
|
||||
Table = element(1, Rec),
|
||||
Key = proplists:get_value(i, Opts, element(2, Rec)),
|
||||
delete(Table, Key);
|
||||
delete(Table, Key) when is_atom(Table) ->
|
||||
case delete_raw(Table, encode_key(Key)) of
|
||||
ok ->
|
||||
ok;
|
||||
Err ->
|
||||
log_error(Err, delete, [{table, Table}, {key, Key}]),
|
||||
Err
|
||||
end.
|
||||
|
||||
delete_raw(Table, Key) ->
|
||||
Bucket = make_bucket(Table),
|
||||
catch riakc_pb_socket:delete(get_random_pid(), Bucket, Key).
|
||||
|
||||
-spec delete_by_index(atom(), binary(), any()) -> ok | {error, any()}.
|
||||
%% @doc Deletes objects by index
|
||||
delete_by_index(Table, Index, Key) ->
|
||||
try
|
||||
{ok, Keys} = get_keys_by_index(Table, Index, Key),
|
||||
lists:foreach(
|
||||
fun(K) ->
|
||||
ok = delete(Table, K)
|
||||
end, Keys)
|
||||
catch _:{badmatch, Err} ->
|
||||
Err
|
||||
end.
|
||||
|
||||
%%%===================================================================
|
||||
%%% map/reduce functions
|
||||
%%%===================================================================
|
||||
%% @private
|
||||
map_key(Obj, _, _) ->
|
||||
[case riak_object:key(Obj) of
|
||||
<<"b_", B/binary>> ->
|
||||
B;
|
||||
<<"i_", B/binary>> ->
|
||||
(binary_to_integer(B));
|
||||
B ->
|
||||
erlang:binary_to_term(B)
|
||||
end].
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_server API
|
||||
%%%===================================================================
|
||||
%% @private
|
||||
init([Server, Port, Options]) ->
|
||||
case riakc_pb_socket:start(Server, Port, Options) of
|
||||
{ok, Pid} ->
|
||||
erlang:monitor(process, Pid),
|
||||
{ok, #state{pid = Pid}};
|
||||
Err ->
|
||||
{stop, Err}
|
||||
end.
|
||||
|
||||
%% @private
|
||||
handle_call(get_pid, _From, #state{pid = Pid} = State) ->
|
||||
{reply, {ok, Pid}, State};
|
||||
handle_call(Request, From, State) ->
|
||||
?WARNING_MSG("Unexpected call from ~p: ~p", [From, Request]),
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
handle_cast(Msg, State) ->
|
||||
?WARNING_MSG("Unexpected cast: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
handle_info({'DOWN', _MonitorRef, _Type, _Object, _Info}, State) ->
|
||||
{stop, normal, State};
|
||||
handle_info(Info, State) ->
|
||||
?ERROR_MSG("Unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
terminate(_Reason, _State) ->
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
encode_index_key(Idx, Key) when is_integer(Key) ->
|
||||
{<<Idx/binary, "_int">>, Key};
|
||||
encode_index_key(Idx, Key) ->
|
||||
{<<Idx/binary, "_bin">>, encode_key(Key)}.
|
||||
|
||||
encode_key(Bin) when is_binary(Bin) ->
|
||||
<<"b_", Bin/binary>>;
|
||||
encode_key(Int) when is_integer(Int) ->
|
||||
<<"i_", ((integer_to_binary(Int)))/binary>>;
|
||||
encode_key(Term) ->
|
||||
erlang:term_to_binary(Term).
|
||||
|
||||
log_error({error, notfound}, _, _) ->
|
||||
ok;
|
||||
log_error({error, Why} = Err, Function, Opts) ->
|
||||
Txt = lists:map(
|
||||
fun({table, Table}) ->
|
||||
io_lib:fwrite("** Table: ~p~n", [Table]);
|
||||
({key, Key}) ->
|
||||
io_lib:fwrite("** Key: ~p~n", [Key]);
|
||||
({index, Index}) ->
|
||||
io_lib:fwrite("** Index = ~p~n", [Index]);
|
||||
({start_key, Key}) ->
|
||||
io_lib:fwrite("** Start Key: ~p~n", [Key]);
|
||||
({end_key, Key}) ->
|
||||
io_lib:fwrite("** End Key: ~p~n", [Key]);
|
||||
({record, Rec}) ->
|
||||
io_lib:fwrite("** Record = ~p~n", [Rec]);
|
||||
({index_info, IdxInfo}) ->
|
||||
io_lib:fwrite("** Index info = ~p~n", [IdxInfo]);
|
||||
(_) ->
|
||||
""
|
||||
end, Opts),
|
||||
ErrTxt = if is_binary(Why) ->
|
||||
io_lib:fwrite("** Error: ~s", [Why]);
|
||||
true ->
|
||||
io_lib:fwrite("** Error: ~p", [Err])
|
||||
end,
|
||||
?ERROR_MSG("Database error:~n** Function: ~p~n~s~s",
|
||||
[Function, Txt, ErrTxt]);
|
||||
log_error(_, _, _) ->
|
||||
ok.
|
||||
|
||||
make_invalid_object(Val) ->
|
||||
(str:format("Invalid object: ~p", [Val])).
|
||||
|
||||
get_random_pid() ->
|
||||
case ejabberd_riak_sup:start() of
|
||||
ok ->
|
||||
PoolPid = ejabberd_riak_sup:get_random_pid(),
|
||||
get_riak_pid(PoolPid);
|
||||
{error, _} = Err ->
|
||||
Err
|
||||
end.
|
||||
|
||||
get_riak_pid(PoolPid) ->
|
||||
case catch gen_server:call(PoolPid, get_pid) of
|
||||
{ok, Pid} ->
|
||||
Pid;
|
||||
{'EXIT', {timeout, _}} ->
|
||||
throw({error, timeout});
|
||||
{'EXIT', Err} ->
|
||||
throw({error, Err})
|
||||
end.
|
||||
|
||||
encode_record(Rec, {Fields, DefRec}) ->
|
||||
term_to_binary(encode_record(Rec, Fields, DefRec, 2)).
|
||||
|
||||
encode_record(Rec, [FieldName|Fields], DefRec, Pos) ->
|
||||
Value = element(Pos, Rec),
|
||||
DefValue = element(Pos, DefRec),
|
||||
if Value == DefValue ->
|
||||
encode_record(Rec, Fields, DefRec, Pos+1);
|
||||
true ->
|
||||
[{FieldName, Value}|encode_record(Rec, Fields, DefRec, Pos+1)]
|
||||
end;
|
||||
encode_record(_, [], _, _) ->
|
||||
[].
|
||||
|
||||
decode_record(Bin, {Fields, DefRec}) ->
|
||||
decode_record(binary_to_term(Bin), Fields, DefRec, 2).
|
||||
|
||||
decode_record(KeyVals, [FieldName|Fields], Rec, Pos) ->
|
||||
case lists:keyfind(FieldName, 1, KeyVals) of
|
||||
{_, Value} ->
|
||||
NewRec = setelement(Pos, Rec, Value),
|
||||
decode_record(KeyVals, Fields, NewRec, Pos+1);
|
||||
false ->
|
||||
decode_record(KeyVals, Fields, Rec, Pos+1)
|
||||
end;
|
||||
decode_record(_, [], Rec, _) ->
|
||||
Rec.
|
@ -1,142 +0,0 @@
|
||||
%%%----------------------------------------------------------------------
|
||||
%%% File : ejabberd_riak_sup.erl
|
||||
%%% Author : Alexey Shchepin <alexey@process-one.net>
|
||||
%%% Purpose : Riak connections supervisor
|
||||
%%% Created : 29 Dec 2011 by Alexey Shchepin <alexey@process-one.net>
|
||||
%%%
|
||||
%%%
|
||||
%%% ejabberd, Copyright (C) 2002-2019 ProcessOne
|
||||
%%%
|
||||
%%% This program is free software; you can redistribute it and/or
|
||||
%%% modify it under the terms of the GNU General Public License as
|
||||
%%% published by the Free Software Foundation; either version 2 of the
|
||||
%%% License, or (at your option) any later version.
|
||||
%%%
|
||||
%%% This program is distributed in the hope that it will be useful,
|
||||
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
%%% General Public License for more details.
|
||||
%%%
|
||||
%%% You should have received a copy of the GNU General Public License along
|
||||
%%% with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
%%%
|
||||
%%%----------------------------------------------------------------------
|
||||
|
||||
-module(ejabberd_riak_sup).
|
||||
|
||||
-behaviour(supervisor).
|
||||
-author('alexey@process-one.net').
|
||||
|
||||
-export([start/0, start_link/0, init/1, get_pids/0,
|
||||
get_random_pid/0, config_reloaded/0]).
|
||||
|
||||
-include("logger.hrl").
|
||||
|
||||
% time to wait for the supervisor to start its child before returning
|
||||
% a timeout error to the request
|
||||
-define(CONNECT_TIMEOUT, 500). % milliseconds
|
||||
|
||||
start() ->
|
||||
case is_started() of
|
||||
true -> ok;
|
||||
false ->
|
||||
ejabberd:start_app(riakc),
|
||||
Spec = {?MODULE, {?MODULE, start_link, []},
|
||||
permanent, infinity, supervisor, [?MODULE]},
|
||||
case supervisor:start_child(ejabberd_db_sup, Spec) of
|
||||
{ok, _} -> ok;
|
||||
{error, {already_started, _}} -> ok;
|
||||
{error, Why} = Err ->
|
||||
?ERROR_MSG("Failed to start ~s: ~p",
|
||||
[?MODULE, Why]),
|
||||
Err
|
||||
end
|
||||
end.
|
||||
|
||||
config_reloaded() ->
|
||||
case is_started() of
|
||||
true ->
|
||||
lists:foreach(
|
||||
fun(Spec) ->
|
||||
supervisor:start_child(?MODULE, Spec)
|
||||
end, get_specs()),
|
||||
PoolSize = get_pool_size(),
|
||||
lists:foreach(
|
||||
fun({Id, _, _, _}) when Id > PoolSize ->
|
||||
case supervisor:terminate_child(?MODULE, Id) of
|
||||
ok -> supervisor:delete_child(?MODULE, Id);
|
||||
_ -> ok
|
||||
end;
|
||||
(_) ->
|
||||
ok
|
||||
end, supervisor:which_children(?MODULE));
|
||||
false ->
|
||||
ok
|
||||
end.
|
||||
|
||||
start_link() ->
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
init([]) ->
|
||||
ejabberd_hooks:add(config_reloaded, ?MODULE, config_reloaded, 20),
|
||||
{ok, {{one_for_one, 500, 1}, get_specs()}}.
|
||||
|
||||
is_started() ->
|
||||
whereis(?MODULE) /= undefined.
|
||||
|
||||
-spec get_specs() -> [supervisor:child_spec()].
|
||||
get_specs() ->
|
||||
PoolSize = get_pool_size(),
|
||||
StartInterval = get_start_interval(),
|
||||
Server = get_riak_server(),
|
||||
Port = get_riak_port(),
|
||||
CACertFile = get_riak_cacertfile(),
|
||||
Username = get_riak_username(),
|
||||
Password = get_riak_password(),
|
||||
Options = lists:filter(
|
||||
fun(X) -> X /= nil end,
|
||||
[auto_reconnect,
|
||||
{keepalive, true},
|
||||
if CACertFile /= nil -> {cacertfile ,CACertFile};
|
||||
true -> nil
|
||||
end,
|
||||
if (Username /= nil) and (Password /= nil) ->
|
||||
{credentials, Username, Password};
|
||||
true -> nil
|
||||
end]),
|
||||
lists:map(
|
||||
fun(I) ->
|
||||
{ejabberd_riak:get_proc(I),
|
||||
{ejabberd_riak, start_link,
|
||||
[I, Server, Port, StartInterval, Options]},
|
||||
transient, 2000, worker, [?MODULE]}
|
||||
end, lists:seq(1, PoolSize)).
|
||||
|
||||
get_start_interval() ->
|
||||
ejabberd_option:riak_start_interval().
|
||||
|
||||
get_pool_size() ->
|
||||
ejabberd_option:riak_pool_size().
|
||||
|
||||
get_riak_server() ->
|
||||
ejabberd_option:riak_server().
|
||||
|
||||
get_riak_cacertfile() ->
|
||||
ejabberd_option:riak_cacertfile().
|
||||
|
||||
get_riak_username() ->
|
||||
ejabberd_option:riak_username().
|
||||
|
||||
get_riak_password() ->
|
||||
ejabberd_option:riak_password().
|
||||
|
||||
get_riak_port() ->
|
||||
ejabberd_option:riak_port().
|
||||
|
||||
get_pids() ->
|
||||
[ejabberd_riak:get_proc(I) || I <- lists:seq(1, get_pool_size())].
|
||||
|
||||
get_random_pid() ->
|
||||
I = p1_rand:round_robin(get_pool_size()) + 1,
|
||||
ejabberd_riak:get_proc(I).
|
@ -1,83 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
|
||||
%%% Created : 15 Apr 2017 by Evgeny Khramtsov <ekhramtsov@process-one.net>
|
||||
%%%
|
||||
%%%
|
||||
%%% ejabberd, Copyright (C) 2002-2019 ProcessOne
|
||||
%%%
|
||||
%%% This program is free software; you can redistribute it and/or
|
||||
%%% modify it under the terms of the GNU General Public License as
|
||||
%%% published by the Free Software Foundation; either version 2 of the
|
||||
%%% License, or (at your option) any later version.
|
||||
%%%
|
||||
%%% This program is distributed in the hope that it will be useful,
|
||||
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
%%% General Public License for more details.
|
||||
%%%
|
||||
%%% You should have received a copy of the GNU General Public License along
|
||||
%%% with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
%%%
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(ejabberd_router_riak).
|
||||
-behaviour(ejabberd_router).
|
||||
|
||||
%% API
|
||||
-export([init/0, register_route/5, unregister_route/3, find_routes/1,
|
||||
get_all_routes/0]).
|
||||
|
||||
-include("logger.hrl").
|
||||
-include("ejabberd_router.hrl").
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
init() ->
|
||||
clean_table().
|
||||
|
||||
register_route(Domain, ServerHost, LocalHint, _, Pid) ->
|
||||
ejabberd_riak:put(#route{domain = Domain,
|
||||
server_host = ServerHost,
|
||||
local_hint = LocalHint,
|
||||
pid = Pid},
|
||||
route_schema(),
|
||||
[{i, {Domain, Pid}}, {'2i', [{<<"route">>, Domain}]}]).
|
||||
|
||||
unregister_route(Domain, _, Pid) ->
|
||||
ejabberd_riak:delete(route, {Domain, Pid}).
|
||||
|
||||
find_routes(Domain) ->
|
||||
ejabberd_riak:get_by_index(route, route_schema(), <<"route">>, Domain).
|
||||
|
||||
get_all_routes() ->
|
||||
case ejabberd_riak:get(route, route_schema()) of
|
||||
{ok, Routes} ->
|
||||
{ok, lists:flatmap(
|
||||
fun(#route{domain = D, server_host = S}) when D /= S ->
|
||||
[D];
|
||||
(_) ->
|
||||
[]
|
||||
end, Routes)};
|
||||
Err ->
|
||||
Err
|
||||
end.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
route_schema() ->
|
||||
{record_info(fields, route), #route{domain = <<>>, server_host = <<>>}}.
|
||||
|
||||
clean_table() ->
|
||||
?DEBUG("Cleaning Riak 'route' table...", []),
|
||||
case ejabberd_riak:get(route, route_schema()) of
|
||||
{ok, Routes} ->
|
||||
lists:foreach(
|
||||
fun(#route{domain = Domain, pid = Pid}) ->
|
||||
ejabberd_riak:delete(route, {Domain, Pid})
|
||||
end, Routes);
|
||||
{error, Err} ->
|
||||
?ERROR_MSG("Failed to clean Riak 'route' table: ~p", [Err]),
|
||||
Err
|
||||
end.
|
@ -1,72 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
|
||||
%%% Created : 15 Apr 2017 by Evgeny Khramtsov <ekhramtsov@process-one.net>
|
||||
%%%
|
||||
%%%
|
||||
%%% ejabberd, Copyright (C) 2002-2019 ProcessOne
|
||||
%%%
|
||||
%%% This program is free software; you can redistribute it and/or
|
||||
%%% modify it under the terms of the GNU General Public License as
|
||||
%%% published by the Free Software Foundation; either version 2 of the
|
||||
%%% License, or (at your option) any later version.
|
||||
%%%
|
||||
%%% This program is distributed in the hope that it will be useful,
|
||||
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
%%% General Public License for more details.
|
||||
%%%
|
||||
%%% You should have received a copy of the GNU General Public License along
|
||||
%%% with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
%%%
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(ejabberd_sm_riak).
|
||||
-behaviour(ejabberd_sm).
|
||||
|
||||
%% API
|
||||
-export([init/0, set_session/1, delete_session/1, get_sessions/0,
|
||||
get_sessions/1, get_sessions/2]).
|
||||
|
||||
-include("ejabberd_sm.hrl").
|
||||
-include("logger.hrl").
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
init() ->
|
||||
clean_table().
|
||||
|
||||
set_session(Session) ->
|
||||
ejabberd_riak:put(Session, session_schema(),
|
||||
[{'2i', [{<<"us">>, Session#session.us}]}]).
|
||||
|
||||
delete_session(Session) ->
|
||||
ejabberd_riak:delete(session, Session#session.sid).
|
||||
|
||||
get_sessions() ->
|
||||
case ejabberd_riak:get(session, session_schema()) of
|
||||
{ok, Ss} -> Ss;
|
||||
{error, _} -> []
|
||||
end.
|
||||
|
||||
get_sessions(LServer) ->
|
||||
[S || S <- get_sessions(), element(2, S#session.us) == LServer].
|
||||
|
||||