From 0490c2f139d3fee2001aaa22fe08f1ac1c4c810d Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Tue, 13 Nov 2012 22:56:27 +1000 Subject: [PATCH] Improve Riak support --- src/ejabberd_auth_riak.erl | 285 ++++++++++++++++++++++ src/ejabberd_riak.erl | 480 ++++++++++++++++++++++++++++++++----- src/ejabberd_riak_sup.erl | 5 +- src/mod_announce.erl | 48 ++++ src/mod_blocking.erl | 91 +++++++ src/mod_caps.erl | 37 +++ src/mod_irc.erl | 16 ++ src/mod_last.erl | 20 +- src/mod_muc.erl | 66 +++++ src/mod_offline.erl | 201 +++++++++------- src/mod_privacy.erl | 107 +++++++++ src/mod_private.erl | 51 +--- src/mod_roster.erl | 249 ++++++------------- src/mod_shared_roster.erl | 84 +++++++ src/mod_vcard.erl | 63 ++--- src/mod_vcard_xupdate.erl | 10 + 16 files changed, 1405 insertions(+), 408 deletions(-) create mode 100644 src/ejabberd_auth_riak.erl diff --git a/src/ejabberd_auth_riak.erl b/src/ejabberd_auth_riak.erl new file mode 100644 index 000000000..870aa4890 --- /dev/null +++ b/src/ejabberd_auth_riak.erl @@ -0,0 +1,285 @@ +%%%---------------------------------------------------------------------- +%%% File : ejabberd_auth_riak.erl +%%% Author : Evgeniy Khramtsov +%%% Purpose : Authentification via Riak +%%% Created : 12 Nov 2012 by Evgeniy Khramtsov +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2012 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License +%%% along with this program; if not, write to the Free Software +%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA +%%% 02111-1307 USA +%%% +%%%---------------------------------------------------------------------- + +-module(ejabberd_auth_riak). + +-author('alexey@process-one.net'). + +-behaviour(ejabberd_auth). + +%% External exports +-export([start/1, set_password/3, check_password/3, + check_password/5, try_register/3, + dirty_get_registered_users/0, get_vh_registered_users/1, + get_vh_registered_users/2, + get_vh_registered_users_number/1, + get_vh_registered_users_number/2, get_password/2, + get_password_s/2, is_user_exists/2, remove_user/2, + remove_user/3, store_type/0, export/1, + plain_password_required/0]). + +-include("ejabberd.hrl"). + +-record(passwd, {us = {<<"">>, <<"">>} :: {binary(), binary()} | '$1', + password = <<"">> :: binary() | scram() | '_'}). + +-define(SALT_LENGTH, 16). + +start(_Host) -> + ok. + +plain_password_required() -> + case is_scrammed() of + false -> false; + true -> true + end. + +store_type() -> + case is_scrammed() of + false -> plain; %% allows: PLAIN DIGEST-MD5 SCRAM + true -> scram %% allows: PLAIN SCRAM + end. + +check_password(User, Server, Password) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + case ejabberd_riak:get(passwd, {LUser, LServer}) of + {ok, #passwd{password = Password}} when is_binary(Password) -> + Password /= <<"">>; + {ok, #passwd{password = Scram}} when is_record(Scram, scram) -> + is_password_scram_valid(Password, Scram); + _ -> + false + end. + +check_password(User, Server, Password, Digest, + DigestGen) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + case ejabberd_riak:get(passwd, {LUser, LServer}) of + {ok, #passwd{password = Passwd}} when is_binary(Passwd) -> + DigRes = if Digest /= <<"">> -> + Digest == DigestGen(Passwd); + true -> false + end, + if DigRes -> true; + true -> (Passwd == Password) and (Password /= <<"">>) + end; + {ok, #passwd{password = Scram}} + when is_record(Scram, scram) -> + Passwd = jlib:decode_base64(Scram#scram.storedkey), + DigRes = if Digest /= <<"">> -> + Digest == DigestGen(Passwd); + true -> false + end, + if DigRes -> true; + true -> (Passwd == Password) and (Password /= <<"">>) + end; + _ -> false + end. + +set_password(User, Server, Password) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + US = {LUser, LServer}, + if (LUser == error) or (LServer == error) -> + {error, invalid_jid}; + true -> + Password2 = case is_scrammed() and is_binary(Password) + of + true -> password_to_scram(Password); + false -> Password + end, + ok = ejabberd_riak:put(#passwd{us = US, password = Password2}, + [{'2i', [{<<"host">>, LServer}]}]) + end. + +try_register(User, Server, PasswordList) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + Password = iolist_to_binary(PasswordList), + US = {LUser, LServer}, + if (LUser == error) or (LServer == error) -> + {error, invalid_jid}; + true -> + case ejabberd_riak:get(passwd, US) of + {error, notfound} -> + Password2 = case is_scrammed() and + is_binary(Password) + of + true -> password_to_scram(Password); + false -> Password + end, + {atomic, ejabberd_riak:put( + #passwd{us = US, + password = Password2}, + [{'2i', [{<<"host">>, LServer}]}])}; + {ok, _} -> + exists; + Err -> + {atomic, Err} + end + end. + +dirty_get_registered_users() -> + lists:flatmap( + fun(Server) -> + get_vh_registered_users(Server) + end, ejabberd_config:get_vh_by_auth_method(riak)). + +get_vh_registered_users(Server) -> + LServer = jlib:nameprep(Server), + case ejabberd_riak:get_keys_by_index(passwd, <<"host">>, LServer) of + {ok, Users} -> + Users; + _ -> + [] + end. + +get_vh_registered_users(Server, _) -> + get_vh_registered_users(Server). + +get_vh_registered_users_number(Server) -> + LServer = jlib:nameprep(Server), + case ejabberd_riak:count_by_index(passwd, <<"host">>, LServer) of + {ok, N} -> + N; + _ -> + 0 + end. + +get_vh_registered_users_number(Server, _) -> + get_vh_registered_users_number(Server). + +get_password(User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + case ejabberd_riak:get(passwd, {LUser, LServer}) of + {ok, #passwd{password = Password}} + when is_binary(Password) -> + Password; + {ok, #passwd{password = Scram}} + when is_record(Scram, scram) -> + {jlib:decode_base64(Scram#scram.storedkey), + jlib:decode_base64(Scram#scram.serverkey), + jlib:decode_base64(Scram#scram.salt), + Scram#scram.iterationcount}; + _ -> false + end. + +get_password_s(User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + case ejabberd_riak:get(passwd, {LUser, LServer}) of + {ok, #passwd{password = Password}} + when is_binary(Password) -> + Password; + {ok, #passwd{password = Scram}} + when is_record(Scram, scram) -> + <<"">>; + _ -> <<"">> + end. + +is_user_exists(User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + case ejabberd_riak:get(passwd, {LUser, LServer}) of + {error, notfound} -> false; + {ok, _} -> true; + Err -> Err + end. + +remove_user(User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + ejabberd_riak:delete(passwd, {LUser, LServer}), + ok. + +remove_user(User, Server, Password) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + case ejabberd_riak:get(passwd, {LUser, LServer}) of + {ok, #passwd{password = Password}} + when is_binary(Password) -> + ejabberd_riak:delete(passwd, {LUser, LServer}), + ok; + {ok, #passwd{password = Scram}} + when is_record(Scram, scram) -> + case is_password_scram_valid(Password, Scram) of + true -> + ejabberd_riak:delete(passwd, {LUser, LServer}), + ok; + false -> not_allowed + end; + _ -> not_exists + end. + +%%% +%%% SCRAM +%%% + +is_scrammed() -> + scram == + ejabberd_config:get_local_option({auth_password_format, ?MYNAME}, + fun(V) -> V end). + +password_to_scram(Password) -> + password_to_scram(Password, + ?SCRAM_DEFAULT_ITERATION_COUNT). + +password_to_scram(Password, IterationCount) -> + Salt = crypto:rand_bytes(?SALT_LENGTH), + SaltedPassword = scram:salted_password(Password, Salt, + IterationCount), + StoredKey = + scram:stored_key(scram:client_key(SaltedPassword)), + ServerKey = scram:server_key(SaltedPassword), + #scram{storedkey = jlib:encode_base64(StoredKey), + serverkey = jlib:encode_base64(ServerKey), + salt = jlib:encode_base64(Salt), + iterationcount = IterationCount}. + +is_password_scram_valid(Password, Scram) -> + IterationCount = Scram#scram.iterationcount, + Salt = jlib:decode_base64(Scram#scram.salt), + SaltedPassword = scram:salted_password(Password, Salt, + IterationCount), + StoredKey = + scram:stored_key(scram:client_key(SaltedPassword)), + jlib:decode_base64(Scram#scram.storedkey) == StoredKey. + +export(_Server) -> + [{passwd, + fun(Host, #passwd{us = {LUser, LServer}, password = Password}) + when LServer == Host -> + Username = ejabberd_odbc:escape(LUser), + Pass = ejabberd_odbc:escape(Password), + [[<<"delete from users where username='">>, Username, <<"';">>], + [<<"insert into users(username, password) " + "values ('">>, Username, <<"', '">>, Pass, <<"');">>]]; + (_Host, _R) -> + [] + end}]. diff --git a/src/ejabberd_riak.erl b/src/ejabberd_riak.erl index 892e8fd69..04ff1ea11 100644 --- a/src/ejabberd_riak.erl +++ b/src/ejabberd_riak.erl @@ -1,11 +1,10 @@ -%%%---------------------------------------------------------------------- -%%% File : ejabberd_riak.erl -%%% Author : Alexey Shchepin -%%% Purpose : Serve Riak connection +%%%------------------------------------------------------------------- +%%% @author Alexey Shchepin +%%% @doc +%%% Interface for Riak database +%%% @end %%% Created : 29 Dec 2011 by Alexey Shchepin -%%% -%%% -%%% ejabberd, Copyright (C) 2002-2011 ProcessOne +%%% @copyright (C) 2002-2012 ProcessOne %%% %%% This program is free software; you can redistribute it and/or %%% modify it under the terms of the GNU General Public License as @@ -22,76 +21,249 @@ %%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA %%% 02111-1307 USA %%% -%%%---------------------------------------------------------------------- - +%%%------------------------------------------------------------------- -module(ejabberd_riak). --author('alexey@process-one.net'). -%% External exports --export([start_link/3, - put/4, - put/5, - get_object/3, - get/3, - get_objects_by_index/4, - get_by_index/4, - get_keys_by_index/4, - count_by_index/4, - delete/3]). +-behaviour(gen_server). + +%% API +-export([start_link/3, make_bucket/1, put/1, put/2, + get/1, get/2, get_by_index/3, delete/1, delete/2, + count_by_index/3, get_by_index_range/4, + get_keys/1, get_keys_by_index/3, + count/1, delete_by_index/3]). +%% For debugging +-export([get_tables/0]). +%% map/reduce exports +-export([map_key/3]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). -include("ejabberd.hrl"). -%%%---------------------------------------------------------------------- +-record(state, {pid = self() :: pid()}). + +-type index() :: {binary(), any()}. + +-type index_info() :: [{i, any()} | {'2i', [index()]}]. + +%% The `index_info()' is used in put/delete functions: +%% `i' defines a primary index, `` '2i' '' defines secondary indexes. +%% There must be only one primary index. If `i' is not specified, +%% the first element of the record is assumed as a primary index, +%% i.e. `i' = element(2, Record). + +-export_types([index_info/0]). + +%%%=================================================================== %%% API -%%%---------------------------------------------------------------------- -start_link(Server, Port, StartInterval) -> - {ok, Pid} = riakc_pb_socket:start_link( - Server, Port, - [auto_reconnect]), - ejabberd_riak_sup:add_pid(Pid), - {ok, Pid}. +%%%=================================================================== +%% @private +start_link(Server, Port, _StartInterval) -> + gen_server:start_link(?MODULE, [Server, Port], []). -make_bucket(Host, Table) -> - iolist_to_binary([Host, $@, Table]). +-spec make_bucket(atom()) -> binary(). +%% @doc Makes a bucket from a table name +%% @private +make_bucket(Table) -> + erlang:atom_to_binary(Table, utf8). -put(Host, Table, Key, Value) -> - Bucket = make_bucket(Host, Table), - Obj = riakc_obj:new(Bucket, Key, Value), - riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj). +-spec put(tuple()) -> ok | {error, any()}. +%% @equiv put(Record, []) +put(Record) -> + ?MODULE:put(Record, []). -put(Host, Table, Key, Value, Indexes) -> - Bucket = make_bucket(Host, Table), - Obj = riakc_obj:new(Bucket, Key, Value), - MetaData = dict:store(<<"index">>, Indexes, dict:new()), - Obj2 = riakc_obj:update_metadata(Obj, MetaData), - riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj2). +-spec put(tuple(), index_info()) -> ok | {error, any()}. +%% @doc Stores a record `Rec' with indexes described in ``IndexInfo'' +put(Rec, IndexInfo) -> + Key = encode_key(proplists:get_value(i, IndexInfo, element(2, Rec))), + SecIdxs = [encode_index_key(K, V) || + {K, V} <- proplists:get_value('2i', IndexInfo, [])], + Table = element(1, Rec), + Value = term_to_binary(Rec), + case put_raw(Table, Key, Value, SecIdxs) of + ok -> + ok; + Error -> + log_error(Error, put, [{record, Rec}, + {index_info, IndexInfo}]), + Error + end. -get_object(Host, Table, Key) -> - Bucket = make_bucket(Host, Table), +put_raw(Table, Key, Value, Indexes) -> + Bucket = make_bucket(Table), + Obj = riakc_obj:new(Bucket, Key, Value, "application/x-erlang-term"), + Obj1 = if Indexes /= [] -> + MetaData = dict:store(<<"index">>, Indexes, dict:new()), + riakc_obj:update_metadata(Obj, MetaData); + true -> + Obj + end, + riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj1). + +get_object_raw(Table, Key) -> + Bucket = make_bucket(Table), riakc_pb_socket:get(ejabberd_riak_sup:get_random_pid(), Bucket, Key). -get(Host, Table, Key) -> - case get_object(Host, Table, Key) of +-spec get(atom()) -> {ok, [any()]} | {error, any()}. +%% @doc Returns all objects from table `Table' +get(Table) -> + Bucket = make_bucket(Table), + case riakc_pb_socket:mapred( + ejabberd_riak_sup:get_random_pid(), + Bucket, + [{map, {modfun, riak_kv_mapreduce, map_object_value}, + none, true}]) of + {ok, [{_, Objs}]} -> + {ok, lists:flatmap( + fun(Obj) -> + case catch binary_to_term(Obj) of + {'EXIT', _} -> + Error = {error, make_invalid_object(Obj)}, + log_error(Error, get, + [{table, Table}]), + []; + Term -> + [Term] + end + end, Objs)}; + {error, notfound} -> + {ok, []}; + Error -> + Error + end. + +-spec get(atom(), any()) -> {ok, any()} | {error, any()}. +%% @doc Reads record by `Key' from table `Table' +get(Table, Key) -> + case get_raw(Table, encode_key(Key)) of + {ok, Val} -> + case catch binary_to_term(Val) of + {'EXIT', _} -> + Error = {error, make_invalid_object(Val)}, + log_error(Error, get, [{table, Table}, {key, Key}]), + {error, notfound}; + Term -> + {ok, Term} + end; + Error -> + log_error(Error, get, [{table, Table}, + {key, Key}]), + Error + end. + +-spec get_by_index(atom(), binary(), any()) -> {ok, [any()]} | {error, any()}. +%% @doc Reads records by `Index' and value `Key' from `Table' +get_by_index(Table, Index, Key) -> + {NewIndex, NewKey} = encode_index_key(Index, Key), + case get_by_index_raw(Table, NewIndex, NewKey) of + {ok, Vals} -> + {ok, lists:flatmap( + fun(Val) -> + case catch binary_to_term(Val) of + {'EXIT', _} -> + Error = {error, make_invalid_object(Val)}, + log_error(Error, get_by_index, + [{table, Table}, + {index, Index}, + {key, Key}]), + []; + Term -> + [Term] + end + end, Vals)}; + {error, notfound} -> + {ok, []}; + Error -> + log_error(Error, get_by_index, + [{table, Table}, + {index, Index}, + {key, Key}]), + Error + end. + +-spec get_by_index_range(atom(), binary(), any(), any()) -> + {ok, [any()]} | {error, any()}. +%% @doc Reads records by `Index' in the range `FromKey'..`ToKey' from `Table' +get_by_index_range(Table, Index, FromKey, ToKey) -> + {NewIndex, NewFromKey} = encode_index_key(Index, FromKey), + {NewIndex, NewToKey} = encode_index_key(Index, ToKey), + case get_by_index_range_raw(Table, NewIndex, NewFromKey, NewToKey) of + {ok, Vals} -> + {ok, lists:flatmap( + fun(Val) -> + case catch binary_to_term(Val) of + {'EXIT', _} -> + Error = {error, make_invalid_object(Val)}, + log_error(Error, get_by_index_range, + [{table, Table}, + {index, Index}, + {start_key, FromKey}, + {end_key, ToKey}]), + []; + Term -> + [Term] + end + end, Vals)}; + {error, notfound} -> + {ok, []}; + Error -> + log_error(Error, get_by_index_range, + [{table, Table}, {index, Index}, + {start_key, FromKey}, {end_key, ToKey}]), + Error + end. + +get_raw(Table, Key) -> + case get_object_raw(Table, Key) of {ok, Obj} -> {ok, riakc_obj:get_value(Obj)}; Error -> Error end. -get_objects_by_index(Host, Table, Index, Key) -> - Bucket = make_bucket(Host, Table), +-spec get_keys(atom()) -> {ok, [any()]} | {error, any()}. +%% @doc Returns a list of index values +get_keys(Table) -> + Bucket = make_bucket(Table), case riakc_pb_socket:mapred( ejabberd_riak_sup:get_random_pid(), - {index, Bucket, Index, Key}, - [{map, {modfun, riak_kv_mapreduce, map_identity}, none, true}]) of - {ok, [{_, Objs}]} -> - {ok, Objs}; + Bucket, + [{map, {modfun, ?MODULE, map_key}, none, true}]) of + {ok, [{_, Keys}]} -> + {ok, Keys}; Error -> + log_error(Error, get_keys, [{table, Table}]), Error end. -get_by_index(Host, Table, Index, Key) -> - Bucket = make_bucket(Host, Table), +-spec get_keys_by_index(atom(), binary(), + any()) -> {ok, [any()]} | {error, any()}. +%% @doc Returns a list of primary keys of objects indexed by `Key'. +get_keys_by_index(Table, Index, Key) -> + {NewIndex, NewKey} = encode_index_key(Index, Key), + Bucket = make_bucket(Table), + case riakc_pb_socket:mapred( + ejabberd_riak_sup:get_random_pid(), + {index, Bucket, NewIndex, NewKey}, + [{map, {modfun, ?MODULE, map_key}, none, true}]) of + {ok, [{_, Keys}]} -> + {ok, Keys}; + Error -> + log_error(Error, get_keys_by_index, [{table, Table}, + {index, Index}, + {key, Key}]), + Error + end. + +%% @hidden +get_tables() -> + riakc_pb_socket:list_buckets(ejabberd_riak_sup:get_random_pid()). + +get_by_index_raw(Table, Index, Key) -> + Bucket = make_bucket(Table), case riakc_pb_socket:mapred( ejabberd_riak_sup:get_random_pid(), {index, Bucket, Index, Key}, @@ -103,20 +275,55 @@ get_by_index(Host, Table, Index, Key) -> Error end. -get_keys_by_index(Host, Table, Index, Key) -> - Bucket = make_bucket(Host, Table), +get_by_index_range_raw(Table, Index, FromKey, ToKey) -> + Bucket = make_bucket(Table), case riakc_pb_socket:mapred( ejabberd_riak_sup:get_random_pid(), - {index, Bucket, Index, Key}, - []) of - {ok, [{_, Ls}]} -> - {ok, [K || {_, K} <- Ls]}; + {index, Bucket, Index, FromKey, ToKey}, + [{map, {modfun, riak_kv_mapreduce, map_object_value}, + none, true}]) of + {ok, [{_, Objs}]} -> + {ok, Objs}; Error -> Error end. -count_by_index(Host, Table, Index, Key) -> - Bucket = make_bucket(Host, Table), +-spec count(atom()) -> {ok, non_neg_integer()} | {error, any()}. +%% @doc Returns the number of objects in the `Table' +count(Table) -> + Bucket = make_bucket(Table), + case riakc_pb_socket:mapred( + ejabberd_riak_sup:get_random_pid(), + Bucket, + [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs}, + none, true}]) of + {ok, [{_, [Cnt]}]} -> + {ok, Cnt}; + Error -> + log_error(Error, count, [{table, Table}]), + Error + end. + +-spec count_by_index(atom(), binary(), any()) -> + {ok, non_neg_integer()} | {error, any()}. +%% @doc Returns the number of objects in the `Table' by index +count_by_index(Tab, Index, Key) -> + {NewIndex, NewKey} = encode_index_key(Index, Key), + case count_by_index_raw(Tab, NewIndex, NewKey) of + {ok, Cnt} -> + {ok, Cnt}; + {error, notfound} -> + {ok, 0}; + Error -> + log_error(Error, count_by_index, + [{table, Tab}, + {index, Index}, + {key, Key}]), + Error + end. + +count_by_index_raw(Table, Index, Key) -> + Bucket = make_bucket(Table), case riakc_pb_socket:mapred( ejabberd_riak_sup:get_random_pid(), {index, Bucket, Index, Key}, @@ -128,7 +335,154 @@ count_by_index(Host, Table, Index, Key) -> Error end. -delete(Host, Table, Key) -> - Bucket = make_bucket(Host, Table), +-spec delete(tuple() | atom()) -> ok | {error, any()}. +%% @doc Same as delete(T, []) when T is record. +%% Or deletes all elements from table if T is atom. +delete(Rec) when is_tuple(Rec) -> + delete(Rec, []); +delete(Table) when is_atom(Table) -> + try + {ok, Keys} = ?MODULE:get_keys(Table), + lists:foreach( + fun(K) -> + ok = delete(Table, K) + end, Keys) + catch _:{badmatch, Err} -> + Err + end. + +-spec delete(tuple() | atom(), index_info() | any()) -> ok | {error, any()}. +%% @doc Delete an object +delete(Rec, Opts) when is_tuple(Rec) -> + Table = element(1, Rec), + Key = proplists:get_value(i, Opts, element(2, Rec)), + delete(Table, Key); +delete(Table, Key) when is_atom(Table) -> + case delete_raw(Table, encode_key(Key)) of + ok -> + ok; + Err -> + log_error(Err, delete, [{table, Table}, {key, Key}]), + Err + end. + +delete_raw(Table, Key) -> + Bucket = make_bucket(Table), riakc_pb_socket:delete(ejabberd_riak_sup:get_random_pid(), Bucket, Key). +-spec delete_by_index(atom(), binary(), any()) -> ok | {error, any()}. +%% @doc Deletes objects by index +delete_by_index(Table, Index, Key) -> + try + {ok, Keys} = get_keys_by_index(Table, Index, Key), + lists:foreach( + fun(K) -> + ok = delete(Table, K) + end, Keys) + catch _:{badmatch, Err} -> + Err + end. + +%%%=================================================================== +%%% map/reduce functions +%%%=================================================================== +%% @private +map_key(Obj, _, _) -> + [case riak_object:key(Obj) of + <<"b_", B/binary>> -> + B; + <<"i_", B/binary>> -> + list_to_integer(binary_to_list(B)); + B -> + erlang:binary_to_term(B) + end]. + +%%%=================================================================== +%%% gen_server API +%%%=================================================================== +%% @private +init([Server, Port]) -> + case riakc_pb_socket:start( + Server, Port, + [auto_reconnect]) of + {ok, Pid} -> + erlang:monitor(process, Pid), + ejabberd_riak_sup:add_pid(Pid), + {ok, #state{pid = Pid}}; + Err -> + {stop, Err} + end. + +%% @private +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +%% @private +handle_cast(_Msg, State) -> + {noreply, State}. + +%% @private +handle_info({'DOWN', _MonitorRef, _Type, _Object, _Info}, State) -> + {stop, normal, State}; +handle_info(_Info, State) -> + ?ERROR_MSG("unexpected info: ~p", [_Info]), + {noreply, State}. + +%% @private +terminate(_Reason, State) -> + ejabberd_riak_sup:remove_pid(State#state.pid), + ok. + +%% @private +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +encode_index_key(Idx, Key) when is_integer(Key) -> + {<>, Key}; +encode_index_key(Idx, Key) -> + {<>, encode_key(Key)}. + +encode_key(Bin) when is_binary(Bin) -> + <<"b_", Bin/binary>>; +encode_key(Int) when is_integer(Int) -> + <<"i_", (list_to_binary(integer_to_list(Int)))/binary>>; +encode_key(Term) -> + erlang:term_to_binary(Term). + +log_error({error, notfound}, _, _) -> + ok; +log_error({error, Why} = Err, Function, Opts) -> + Txt = lists:map( + fun({table, Table}) -> + io_lib:fwrite("** Table: ~p~n", [Table]); + ({key, Key}) -> + io_lib:fwrite("** Key: ~p~n", [Key]); + ({index, Index}) -> + io_lib:fwrite("** Index = ~p~n", [Index]); + ({start_key, Key}) -> + io_lib:fwrite("** Start Key: ~p~n", [Key]); + ({end_key, Key}) -> + io_lib:fwrite("** End Key: ~p~n", [Key]); + ({record, Rec}) -> + io_lib:fwrite("** Record = ~p~n", [Rec]); + ({index_info, IdxInfo}) -> + io_lib:fwrite("** Index info = ~p~n", [IdxInfo]); + (_) -> + "" + end, Opts), + ErrTxt = if is_binary(Why) -> + io_lib:fwrite("** Error: ~s", [Why]); + true -> + io_lib:fwrite("** Error: ~p", [Err]) + end, + ?ERROR_MSG("database error:~n** Function: ~p~n~s~s", + [Function, Txt, ErrTxt]); +log_error(_, _, _) -> + ok. + +make_invalid_object(Val) -> + list_to_binary(io_lib:fwrite("Invalid object: ~p", [Val])). diff --git a/src/ejabberd_riak_sup.erl b/src/ejabberd_riak_sup.erl index 4ad7d4130..d19b9fbe9 100644 --- a/src/ejabberd_riak_sup.erl +++ b/src/ejabberd_riak_sup.erl @@ -105,8 +105,9 @@ init([]) -> {Server, Port} = ejabberd_config:get_local_option( riak_server, - fun({S, P}) when is_list(S), is_integer(P), P >= 1 -> {S, P} end, - {"127.0.0.1", 8081}), + fun({S, P}) when is_integer(P), P > 0, P < 65536 -> + {binary_to_list(iolist_to_binary(S)), P} + end, {"127.0.0.1", 8081}), {ok, {{one_for_one, PoolSize*10, 1}, lists:map( fun(I) -> diff --git a/src/mod_announce.erl b/src/mod_announce.erl index fba6d3b81..6ccc5990e 100644 --- a/src/mod_announce.erl +++ b/src/mod_announce.erl @@ -792,6 +792,17 @@ announce_motd(Host, Packet) -> end, Sessions) end, mnesia:transaction(F); + riak -> + try + lists:foreach( + fun({U, S, _R}) -> + ok = ejabberd_riak:put(#motd_users{us = {U, S}}, + [{'2i', [{<<"server">>, S}]}]) + end, Sessions), + {atomic, ok} + catch _:{badmatch, Err} -> + {atomic, Err} + end; odbc -> F = fun() -> lists:foreach( @@ -837,6 +848,9 @@ announce_motd_update(LServer, Packet) -> mnesia:write(#motd{server = LServer, packet = Packet}) end, mnesia:transaction(F); + riak -> + {atomic, ejabberd_riak:put(#motd{server = LServer, + packet = Packet})}; odbc -> XML = ejabberd_odbc:escape(xml:element_to_binary(Packet)), F = fun() -> @@ -887,6 +901,16 @@ announce_motd_delete(LServer) -> end, Users) end, mnesia:transaction(F); + riak -> + try + ok = ejabberd_riak:delete(motd, LServer), + ok = ejabberd_riak:delete_by_index(motd_users, + <<"server">>, + LServer), + {atomic, ok} + catch _:{badmatch, Err} -> + {atomic, Err} + end; odbc -> F = fun() -> ejabberd_odbc:sql_query_t([<<"delete from motd;">>]) @@ -915,6 +939,23 @@ send_motd(#jid{luser = LUser, lserver = LServer} = JID, mnesia) -> _ -> ok end; +send_motd(#jid{luser = LUser, lserver = LServer} = JID, riak) -> + case catch ejabberd_riak:get(motd, LServer) of + {ok, #motd{packet = Packet}} -> + US = {LUser, LServer}, + case ejabberd_riak:get(motd_users, US) of + {ok, #motd_users{}} -> + ok; + _ -> + Local = jlib:make_jid(<<>>, LServer, <<>>), + ejabberd_router:route(Local, JID, Packet), + {atomic, ejabberd_riak:put( + #motd_users{us = US}, + [{'2i', [{<<"server">>, LServer}]}])} + end; + _ -> + ok + end; send_motd(#jid{luser = LUser, lserver = LServer} = JID, odbc) when LUser /= <<>> -> case catch ejabberd_odbc:sql_query( LServer, [<<"select xml from motd where username='';">>]) of @@ -965,6 +1006,13 @@ get_stored_motd_packet(LServer, mnesia) -> _ -> error end; +get_stored_motd_packet(LServer, riak) -> + case ejabberd_riak:get(motd, LServer) of + {ok, #motd{packet = Packet}} -> + {ok, Packet}; + _ -> + error + end; get_stored_motd_packet(LServer, odbc) -> case catch ejabberd_odbc:sql_query( LServer, [<<"select xml from motd where username='';">>]) of diff --git a/src/mod_blocking.erl b/src/mod_blocking.erl index 797b7573b..1bd7ae3b5 100644 --- a/src/mod_blocking.erl +++ b/src/mod_blocking.erl @@ -181,6 +181,37 @@ process_blocklist_block(LUser, LServer, Filter, {ok, NewDefault, NewList} end, mnesia:transaction(F); +process_blocklist_block(LUser, LServer, Filter, + riak) -> + {atomic, + begin + case ejabberd_riak:get(privacy, {LUser, LServer}) of + {ok, #privacy{default = Default, lists = Lists} = P} -> + case lists:keysearch(Default, 1, Lists) of + {value, {_, List}} -> + NewDefault = Default, + NewLists1 = lists:keydelete(Default, 1, Lists); + false -> + NewDefault = <<"Blocked contacts">>, + NewLists1 = Lists, + List = [] + end; + {error, _} -> + P = #privacy{us = {LUser, LServer}}, + NewDefault = <<"Blocked contacts">>, + NewLists1 = [], + List = [] + end, + NewList = Filter(List), + NewLists = [{NewDefault, NewList} | NewLists1], + case ejabberd_riak:put(P#privacy{default = NewDefault, + lists = NewLists}) of + ok -> + {ok, NewDefault, NewList}; + Err -> + Err + end + end}; process_blocklist_block(LUser, LServer, Filter, odbc) -> F = fun () -> Default = case @@ -256,6 +287,30 @@ process_blocklist_unblock_all(LUser, LServer, Filter, end end, mnesia:transaction(F); +process_blocklist_unblock_all(LUser, LServer, Filter, + riak) -> + {atomic, + case ejabberd_riak:get(privacy, {LUser, LServer}) of + {ok, #privacy{default = Default, lists = Lists} = P} -> + case lists:keysearch(Default, 1, Lists) of + {value, {_, List}} -> + NewList = Filter(List), + NewLists1 = lists:keydelete(Default, 1, Lists), + NewLists = [{Default, NewList} | NewLists1], + case ejabberd_riak:put(P#privacy{lists = NewLists}) of + ok -> + {ok, Default, NewList}; + Err -> + Err + end; + false -> + %% No default list, nothing to unblock + ok + end; + {error, _} -> + %% No lists, nothing to unblock + ok + end}; process_blocklist_unblock_all(LUser, LServer, Filter, odbc) -> F = fun () -> @@ -331,6 +386,30 @@ process_blocklist_unblock(LUser, LServer, Filter, end end, mnesia:transaction(F); +process_blocklist_unblock(LUser, LServer, Filter, + riak) -> + {atomic, + case ejabberd_riak:get(privacy, {LUser, LServer}) of + {error, _} -> + %% No lists, nothing to unblock + ok; + {ok, #privacy{default = Default, lists = Lists} = P} -> + case lists:keysearch(Default, 1, Lists) of + {value, {_, List}} -> + NewList = Filter(List), + NewLists1 = lists:keydelete(Default, 1, Lists), + NewLists = [{Default, NewList} | NewLists1], + case ejabberd_riak:put(P#privacy{lists = NewLists}) of + ok -> + {ok, Default, NewList}; + Err -> + Err + end; + false -> + %% No default list, nothing to unblock + ok + end + end}; process_blocklist_unblock(LUser, LServer, Filter, odbc) -> F = fun () -> @@ -409,6 +488,18 @@ process_blocklist_get(LUser, LServer, mnesia) -> _ -> [] end end; +process_blocklist_get(LUser, LServer, riak) -> + case ejabberd_riak:get(privacy, {LUser, LServer}) of + {ok, #privacy{default = Default, lists = Lists}} -> + case lists:keysearch(Default, 1, Lists) of + {value, {_, List}} -> List; + _ -> [] + end; + {error, notfound} -> + []; + {error, _} -> + error + end; process_blocklist_get(LUser, LServer, odbc) -> case catch mod_privacy:sql_get_default_privacy_list(LUser, LServer) diff --git a/src/mod_caps.erl b/src/mod_caps.erl index 5f529bd28..df00dbfb1 100644 --- a/src/mod_caps.erl +++ b/src/mod_caps.erl @@ -430,12 +430,49 @@ caps_read_fun(Node) -> [#caps_features{features = Features}] -> {ok, Features}; _ -> error end + end; +caps_read_fun(_LServer, Node, riak) -> + fun() -> + case ejabberd_riak:get(caps_features, Node) of + {ok, #caps_features{features = Features}} -> {ok, Features}; + _ -> error + end + end; +caps_read_fun(LServer, {Node, SubNode}, odbc) -> + fun() -> + SNode = ejabberd_odbc:escape(Node), + SSubNode = ejabberd_odbc:escape(SubNode), + case ejabberd_odbc:sql_query( + LServer, [<<"select feature from caps_features where ">>, + <<"node='">>, SNode, <<"' and subnode='">>, + SSubNode, <<"';">>]) of + {selected, [<<"feature">>], [[H]|_] = Fs} -> + case catch jlib:binary_to_integer(H) of + Int when is_integer(Int), Int>=0 -> + {ok, Int}; + _ -> + {ok, lists:flatten(Fs)} + end; + _ -> + error + end end. caps_write_fun(Node, Features) -> fun () -> mnesia:dirty_write(#caps_features{node_pair = Node, features = Features}) + end; +caps_write_fun(_LServer, Node, Features, riak) -> + fun () -> + ejabberd_riak:put(#caps_features{node_pair = Node, + features = Features}) + end; +caps_write_fun(LServer, NodePair, Features, odbc) -> + fun () -> + ejabberd_odbc:sql_transaction( + LServer, + sql_write_features_t(NodePair, Features)) end. make_my_disco_hash(Host) -> diff --git a/src/mod_irc.erl b/src/mod_irc.erl index c7dda8303..0e52b2342 100644 --- a/src/mod_irc.erl +++ b/src/mod_irc.erl @@ -591,6 +591,17 @@ get_data(_LServer, Host, From, mnesia) -> [] -> empty; [#irc_custom{data = Data}] -> Data end; +get_data(LServer, Host, From, riak) -> + #jid{luser = LUser, lserver = LServer} = From, + US = {LUser, LServer}, + case ejabberd_riak:get(irc_custom, {US, Host}) of + {ok, #irc_custom{data = Data}} -> + Data; + {error, notfound} -> + empty; + _Err -> + error + end; get_data(LServer, Host, From, odbc) -> SJID = ejabberd_odbc:escape(jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(From)))), @@ -723,6 +734,11 @@ set_data(_LServer, Host, From, Data, mnesia) -> data = Data}) end, mnesia:transaction(F); +set_data(LServer, Host, From, Data, riak) -> + {LUser, LServer, _} = jlib:jid_tolower(From), + US = {LUser, LServer}, + {atomic, ejabberd_riak:put(#irc_custom{us_host = {US, Host}, + data = Data})}; set_data(LServer, Host, From, Data, odbc) -> SJID = ejabberd_odbc:escape(jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(From)))), diff --git a/src/mod_last.erl b/src/mod_last.erl index 6b7a06bed..22f9ded97 100644 --- a/src/mod_last.erl +++ b/src/mod_last.erl @@ -168,6 +168,16 @@ get_last(LUser, LServer, mnesia) -> status = Status}] -> {ok, TimeStamp, Status} end; +get_last(LUser, LServer, riak) -> + case ejabberd_riak:get(last_activity, {LUser, LServer}) of + {ok, #last_activity{timestamp = TimeStamp, + status = Status}} -> + {ok, TimeStamp, Status}; + {error, notfound} -> + not_found; + Err -> + Err + end; get_last(LUser, LServer, odbc) -> Username = ejabberd_odbc:escape(LUser), case catch odbc_queries:get_last(LServer, Username) of @@ -235,6 +245,12 @@ store_last_info(LUser, LServer, TimeStamp, Status, status = Status}) end, mnesia:transaction(F); +store_last_info(LUser, LServer, TimeStamp, Status, + riak) -> + US = {LUser, LServer}, + {atomic, ejabberd_riak:put(#last_activity{us = US, + timestamp = TimeStamp, + status = Status})}; store_last_info(LUser, LServer, TimeStamp, Status, odbc) -> Username = ejabberd_odbc:escape(LUser), @@ -264,7 +280,9 @@ remove_user(LUser, LServer, mnesia) -> mnesia:transaction(F); remove_user(LUser, LServer, odbc) -> Username = ejabberd_odbc:escape(LUser), - odbc_queries:del_last(LServer, Username). + odbc_queries:del_last(LServer, Username); +remove_user(LUser, LServer, riak) -> + {atomic, ejabberd_riak:delete(last_activity, {LUser, LServer})}. update_table() -> Fields = record_info(fields, last_activity), diff --git a/src/mod_muc.erl b/src/mod_muc.erl index 160b9009d..408700909 100644 --- a/src/mod_muc.erl +++ b/src/mod_muc.erl @@ -147,6 +147,9 @@ store_room(_LServer, Host, Name, Opts, mnesia) -> opts = Opts}) end, mnesia:transaction(F); +store_room(_LServer, Host, Name, Opts, riak) -> + {atomic, ejabberd_riak:put(#muc_room{name_host = {Name, Host}, + opts = Opts})}; store_room(LServer, Host, Name, Opts, odbc) -> SName = ejabberd_odbc:escape(Name), SHost = ejabberd_odbc:escape(Host), @@ -170,6 +173,11 @@ restore_room(_LServer, Host, Name, mnesia) -> [#muc_room{opts = Opts}] -> Opts; _ -> error end; +restore_room(_LServer, Host, Name, riak) -> + case ejabberd_riak:get(muc_room, {Name, Host}) of + {ok, #muc_room{opts = Opts}} -> Opts; + _ -> error + end; restore_room(LServer, Host, Name, odbc) -> SName = ejabberd_odbc:escape(Name), SHost = ejabberd_odbc:escape(Host), @@ -192,6 +200,8 @@ forget_room(_LServer, Host, Name, mnesia) -> F = fun () -> mnesia:delete({muc_room, {Name, Host}}) end, mnesia:transaction(F); +forget_room(_LServer, Host, Name, riak) -> + {atomic, ejabberd_riak:delete(muc_room, {Name, Host})}; forget_room(LServer, Host, Name, odbc) -> SName = ejabberd_odbc:escape(Name), SHost = ejabberd_odbc:escape(Host), @@ -231,6 +241,18 @@ can_use_nick(_LServer, Host, JID, Nick, mnesia) -> [] -> true; [#muc_registered{us_host = {U, _Host}}] -> U == LUS end; +can_use_nick(LServer, Host, JID, Nick, riak) -> + {LUser, LServer, _} = jlib:jid_tolower(JID), + LUS = {LUser, LServer}, + case ejabberd_riak:get_by_index(muc_registered, + <<"nick_host">>, {Nick, Host}) of + {ok, []} -> + true; + {ok, [#muc_registered{us_host = {U, _Host}}]} -> + U == LUS; + {error, _} -> + true + end; can_use_nick(LServer, Host, JID, Nick, odbc) -> SJID = jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(JID))), @@ -617,6 +639,16 @@ get_rooms(_LServer, Host, mnesia) -> {'EXIT', Reason} -> ?ERROR_MSG("~p", [Reason]), []; Rs -> Rs end; +get_rooms(_LServer, Host, riak) -> + case ejabberd_riak:get(muc_room) of + {ok, Rs} -> + lists:filter( + fun(#muc_room{name_host = {_, H}}) -> + Host == H + end, Rs); + _Err -> + [] + end; get_rooms(LServer, Host, odbc) -> SHost = ejabberd_odbc:escape(Host), case catch ejabberd_odbc:sql_query(LServer, @@ -839,6 +871,13 @@ get_nick(_LServer, Host, From, mnesia) -> [] -> error; [#muc_registered{nick = Nick}] -> Nick end; +get_nick(LServer, Host, From, riak) -> + {LUser, LServer, _} = jlib:jid_tolower(From), + US = {LUser, LServer}, + case ejabberd_riak:get(muc_registered, {US, Host}) of + {ok, #muc_registered{nick = Nick}} -> Nick; + {error, _} -> error + end; get_nick(LServer, Host, From, odbc) -> SJID = ejabberd_odbc:escape(jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(From)))), @@ -922,6 +961,33 @@ set_nick(_LServer, Host, From, Nick, mnesia) -> end end, mnesia:transaction(F); +set_nick(LServer, Host, From, Nick, riak) -> + {LUser, LServer, _} = jlib:jid_tolower(From), + LUS = {LUser, LServer}, + {atomic, + case Nick of + <<"">> -> + ejabberd_riak:delete(muc_registered, {LUS, Host}); + _ -> + Allow = case ejabberd_riak:get_by_index( + muc_registered, + <<"nick_host">>, {Nick, Host}) of + {ok, []} -> + true; + {ok, [#muc_registered{us_host = {U, _Host}}]} -> + U == LUS; + {error, _} -> + false + end, + if Allow -> + ejabberd_riak:put(#muc_registered{us_host = {LUS, Host}, + nick = Nick}, + [{'2i', [{<<"nick_host">>, + {Nick, Host}}]}]); + true -> + false + end + end}; set_nick(LServer, Host, From, Nick, odbc) -> JID = jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(From))), diff --git a/src/mod_offline.erl b/src/mod_offline.erl index 5e2d80aa2..c5dc305d6 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -187,43 +187,9 @@ store_offline_msg(Host, {User, _}, Msgs, Len, MaxOfflineMsgs, discard_warn_sender(Msgs); true -> lists:foreach( - fun(M) -> - Username = User, - From = M#offline_msg.from, - To = M#offline_msg.to, - #xmlel{name = Name, attrs = Attrs, - children = Els} = - M#offline_msg.packet, - Attrs2 = jlib:replace_from_to_attrs( - jlib:jid_to_string(From), - jlib:jid_to_string(To), - Attrs), - Packet = #xmlel{name = Name, - attrs = Attrs2, - children = - Els ++ - [jlib:timestamp_to_xml( - calendar:now_to_universal_time( - M#offline_msg.timestamp), - utc, - jlib:make_jid(<<"">>, Host, <<"">>), - <<"Offline Storage">>), - jlib:timestamp_to_xml( - calendar:now_to_universal_time( - M#offline_msg.timestamp))]}, - XML = xml:element_to_binary(Packet), - {MegaSecs, Secs, MicroSecs} = - M#offline_msg.timestamp, - TS = - iolist_to_binary( - io_lib:format("~6..0w~6..0w.~6..0w", - [MegaSecs, Secs, MicroSecs])), - ejabberd_riak:put( - Host, <<"offline">>, - undefined, XML, - [{<<"user_bin">>, Username}, - {<<"timestamp_bin">>, TS} - ]) + fun(#offline_msg{us = US, + timestamp = TS} = M) -> + ejabberd_riak:put(M, [{i, TS}, {'2i', [{<<"us">>, US}]}]) end, Msgs) end. @@ -244,7 +210,7 @@ receive_all(US, Msgs, DBType) -> case DBType of mnesia -> Msgs; odbc -> lists:reverse(Msgs); - riak -> lists:reverse(Msgs) + riak -> Msgs end end. @@ -474,41 +440,30 @@ pop_offline_messages(Ls, LUser, LServer, odbc) -> _ -> Ls end; pop_offline_messages(Ls, LUser, LServer, riak) -> - Username = LUser, - case ejabberd_riak:get_objects_by_index( - LServer, <<"offline">>, <<"user_bin">>, Username) of + case ejabberd_riak:get_by_index(offline_msg, + <<"us">>, {LUser, LServer}) of {ok, Rs} -> - SortedRs = - lists:sort(fun(X, Y) -> - MX = riak_object:get_metadata(X), - {ok, IX} = dict:find(<<"index">>, MX), - {value, TSX} = lists:keysearch( - <<"timestamp_bin">>, 1, - IX), - MY = riak_object:get_metadata(Y), - {ok, IY} = dict:find(<<"index">>, MY), - {value, TSY} = lists:keysearch( - <<"timestamp_bin">>, 1, - IY), - TSX =< TSY - end, Rs), - Ls ++ lists:flatmap( - fun(R) -> - Key = riak_object:key(R), - ejabberd_riak:delete(LServer, <<"offline">>, Key), - XML = riak_object:get_value(R), - case xml_stream:parse_element(XML) of - {error, _Reason} -> - []; - El -> - case offline_msg_to_route(LServer, El) of - error -> - []; - RouteMsg -> - [RouteMsg] - end - end - end, SortedRs); + try + lists:foreach( + fun(#offline_msg{timestamp = T}) -> + ok = ejabberd_riak:delete(offline_msg, T) + end, Rs), + TS = now(), + Ls ++ lists:map( + fun (R) -> + offline_msg_to_route(LServer, R) + end, + lists:filter( + fun(R) -> + case R#offline_msg.expire of + never -> true; + TimeStamp -> TS < TimeStamp + end + end, + lists:keysort(#offline_msg.timestamp, Rs))) + catch _:{badmatch, _} -> + Ls + end; _ -> Ls end. @@ -579,17 +534,8 @@ remove_user(LUser, LServer, odbc) -> Username = ejabberd_odbc:escape(LUser), odbc_queries:del_spool_msg(LServer, Username); remove_user(LUser, LServer, riak) -> - Username = LUser, - case ejabberd_riak:get_keys_by_index( - LServer, <<"offline">>, <<"user_bin">>, Username) of - {ok, Keys} -> - lists:foreach( - fun(Key) -> - ejabberd_riak:delete(LServer, <<"offline">>, Key) - end, Keys); - _ -> - ok - end. + {atomic, ejabberd_riak:delete_by_index(offline_msg, + <<"us">>, {LUser, LServer})}. jid_to_binary(#jid{user = U, server = S, resource = R, luser = LU, lserver = LS, lresource = LR}) -> @@ -650,6 +596,8 @@ get_offline_els(LUser, LServer) -> get_offline_els(LUser, LServer, mnesia) -> Msgs = read_all_msgs(LUser, LServer, mnesia), +get_offline_els(LUser, LServer, DBType) when DBType == mnesia; DBType == riak -> + Msgs = read_all_msgs(LUser, LServer, DBType), lists:map( fun(Msg) -> {route, From, To, Packet} = offline_msg_to_route(LServer, Msg), @@ -706,6 +654,14 @@ read_all_msgs(LUser, LServer, mnesia) -> US = {LUser, LServer}, lists:keysort(#offline_msg.timestamp, mnesia:dirty_read({offline_msg, US})); +read_all_msgs(LUser, LServer, riak) -> + case ejabberd_riak:get_by_index( + offline_msg, <<"us">>, {LUser, LServer}) of + {ok, Rs} -> + lists:keysort(#offline_msg.timestamp, Rs); + _Err -> + [] + end; read_all_msgs(LUser, LServer, odbc) -> Username = ejabberd_odbc:escape(LUser), case catch ejabberd_odbc:sql_query(LServer, @@ -723,7 +679,7 @@ read_all_msgs(LUser, LServer, odbc) -> _ -> [] end. -format_user_queue(Msgs, mnesia) -> +format_user_queue(Msgs, DBType) when DBType == mnesia; DBType == riak -> lists:map(fun (#offline_msg{timestamp = TimeStamp, from = From, to = To, packet = @@ -831,6 +787,26 @@ user_queue_parse_query(LUser, LServer, Query, mnesia) -> ok; false -> nothing end; +user_queue_parse_query(LUser, LServer, Query, riak) -> + case lists:keysearch(<<"delete">>, 1, Query) of + {value, _} -> + Msgs = read_all_msgs(LUser, LServer, riak), + lists:foreach( + fun (Msg) -> + ID = jlib:encode_base64((term_to_binary(Msg))), + case lists:member({<<"selected">>, ID}, Query) of + true -> + ejabberd_riak:delete(offline_msg, + Msg#offline_msg.timestamp); + false -> + ok + end + end, + Msgs), + ok; + false -> + nothing + end; user_queue_parse_query(LUser, LServer, Query, odbc) -> Username = ejabberd_odbc:escape(LUser), case lists:keysearch(<<"delete">>, 1, Query) of @@ -889,6 +865,14 @@ get_queue_length(LUser, LServer) -> get_queue_length(LUser, LServer, mnesia) -> length(mnesia:dirty_read({offline_msg, {LUser, LServer}})); +get_queue_length(LUser, LServer, riak) -> + case ejabberd_riak:count_by_index(offline_msg, + <<"us">>, {LUser, LServer}) of + {ok, N} -> + N; + _ -> + 0 + end; get_queue_length(LUser, LServer, odbc) -> Username = ejabberd_odbc:escape(LUser), case catch ejabberd_odbc:sql_query(LServer, @@ -917,7 +901,8 @@ get_messages_subset(User, Host, MsgsAll, DBType) -> get_messages_subset2(Max, Length, MsgsAll, _DBType) when Length =< Max * 2 -> MsgsAll; -get_messages_subset2(Max, Length, MsgsAll, mnesia) -> +get_messages_subset2(Max, Length, MsgsAll, DBType) + when DBType == mnesia; DBType == riak -> FirstN = Max, {MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll), MsgsLastN = lists:nthtail(Length - FirstN - FirstN, @@ -965,6 +950,10 @@ delete_all_msgs(LUser, LServer, mnesia) -> mnesia:dirty_read({offline_msg, US})) end, mnesia:transaction(F); +delete_all_msgs(LUser, LServer, riak) -> + Res = ejabberd_riak:delete_by_index(offline_msg, + <<"us">>, {LUser, LServer}), + {atomic, Res}; delete_all_msgs(LUser, LServer, odbc) -> Username = ejabberd_odbc:escape(LUser), odbc_queries:del_spool_msg(LServer, Username), @@ -987,16 +976,44 @@ webadmin_user_parse_query(Acc, _Action, _User, _Server, Acc. %% Returns as integer the number of offline messages for a given user -count_offline_messages(LUser, LServer) -> +count_offline_messages(User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + DBType = gen_mod:db_type(LServer, ?MODULE), + count_offline_messages(LUser, LServer, DBType). + +count_offline_messages(LUser, LServer, mnesia) -> + US = {LUser, LServer}, + F = fun () -> + p1_mnesia:count_records(offline_msg, + #offline_msg{us = US, _ = '_'}) + end, + case catch mnesia:async_dirty(F) of + I when is_integer(I) -> I; + _ -> 0 + end; +count_offline_messages(LUser, LServer, odbc) -> Username = ejabberd_odbc:escape(LUser), - case catch odbc_queries:count_records_where( - LServer, "spool", - <<"where username='", Username/binary, "'">>) of - {selected, [_], [[Res]]} -> - jlib:binary_to_integer(Res); + case catch odbc_queries:count_records_where(LServer, + <<"spool">>, + <<"where username='", + Username/binary, "'">>) + of + {selected, [_], [[Res]]} -> + jlib:binary_to_integer(Res); + _ -> 0 + end; +count_offline_messages(LUser, LServer, riak) -> + case ejabberd_riak:count_by_index( + offline_msg, <<"us">>, {LUser, LServer}) of + {ok, Res} -> + Res; _ -> 0 - end. + end; +count_offline_messages(_Acc, User, Server) -> + N = count_offline_messages(User, Server), + {stop, N}. export(_Server) -> [{offline_msg, diff --git a/src/mod_privacy.erl b/src/mod_privacy.erl index 6b852bb47..2286875e3 100644 --- a/src/mod_privacy.erl +++ b/src/mod_privacy.erl @@ -160,6 +160,21 @@ process_lists_get(LUser, LServer, _Active, mnesia) -> Lists), {Default, LItems} end; +process_lists_get(LUser, LServer, _Active, riak) -> + case ejabberd_riak:get(privacy, {LUser, LServer}) of + {ok, #privacy{default = Default, lists = Lists}} -> + LItems = lists:map(fun ({N, _}) -> + #xmlel{name = <<"list">>, + attrs = [{<<"name">>, N}], + children = []} + end, + Lists), + {Default, LItems}; + {error, notfound} -> + {none, []}; + {error, _} -> + error + end; process_lists_get(LUser, LServer, _Active, odbc) -> Default = case catch sql_get_default_privacy_list(LUser, LServer) @@ -209,6 +224,18 @@ process_list_get(LUser, LServer, Name, mnesia) -> _ -> not_found end end; +process_list_get(LUser, LServer, Name, riak) -> + case ejabberd_riak:get(privacy, {LUser, LServer}) of + {ok, #privacy{lists = Lists}} -> + case lists:keysearch(Name, 1, Lists) of + {value, {_, List}} -> List; + _ -> not_found + end; + {error, notfound} -> + not_found; + {error, _} -> + error + end; process_list_get(LUser, LServer, Name, odbc) -> case catch sql_get_privacy_list_id(LUser, LServer, Name) of @@ -354,6 +381,20 @@ process_default_set(LUser, LServer, {value, Name}, end end, mnesia:transaction(F); +process_default_set(LUser, LServer, {value, Name}, riak) -> + {atomic, + case ejabberd_riak:get(privacy, {LUser, LServer}) of + {ok, #privacy{lists = Lists} = P} -> + case lists:keymember(Name, 1, Lists) of + true -> + ejabberd_riak:put(P#privacy{default = Name, + lists = Lists}); + false -> + not_found + end; + {error, _} -> + not_found + end}; process_default_set(LUser, LServer, {value, Name}, odbc) -> F = fun () -> @@ -375,6 +416,14 @@ process_default_set(LUser, LServer, false, mnesia) -> end end, mnesia:transaction(F); +process_default_set(LUser, LServer, false, riak) -> + {atomic, + case ejabberd_riak:get(privacy, {LUser, LServer}) of + {ok, R} -> + ejabberd_riak:put(R#privacy{default = none}); + {error, _} -> + ok + end}; process_default_set(LUser, LServer, false, odbc) -> case catch sql_unset_default_privacy_list(LUser, LServer) @@ -407,6 +456,16 @@ process_active_set(LUser, LServer, Name, mnesia) -> false -> error end end; +process_active_set(LUser, LServer, Name, riak) -> + case ejabberd_riak:get(privacy, {LUser, LServer}) of + {ok, #privacy{lists = Lists}} -> + case lists:keysearch(Name, 1, Lists) of + {value, {_, List}} -> List; + false -> error + end; + {error, _} -> + error + end; process_active_set(LUser, LServer, Name, odbc) -> case catch sql_get_privacy_list_id(LUser, LServer, Name) of @@ -438,6 +497,19 @@ remove_privacy_list(LUser, LServer, Name, mnesia) -> end end, mnesia:transaction(F); +remove_privacy_list(LUser, LServer, Name, riak) -> + {atomic, + case ejabberd_riak:get(privacy, {LUser, LServer}) of + {ok, #privacy{default = Default, lists = Lists} = P} -> + if Name == Default -> + conflict; + true -> + NewLists = lists:keydelete(Name, 1, Lists), + ejabberd_riak:put(P#privacy{lists = NewLists}) + end; + {error, _} -> + ok + end}; remove_privacy_list(LUser, LServer, Name, odbc) -> F = fun () -> case sql_get_default_privacy_list_t(LUser) of @@ -465,6 +537,18 @@ set_privacy_list(LUser, LServer, Name, List, mnesia) -> end end, mnesia:transaction(F); +set_privacy_list(LUser, LServer, Name, List, riak) -> + {atomic, + case ejabberd_riak:get(privacy, {LUser, LServer}) of + {ok, #privacy{lists = Lists} = P} -> + NewLists1 = lists:keydelete(Name, 1, Lists), + NewLists = [{Name, List} | NewLists1], + ejabberd_riak:put(P#privacy{lists = NewLists}); + {error, _} -> + NewLists = [{Name, List}], + ejabberd_riak:put(#privacy{us = {LUser, LServer}, + lists = NewLists}) + end}; set_privacy_list(LUser, LServer, Name, List, odbc) -> RItems = lists:map(fun item_to_raw/1, List), F = fun () -> @@ -649,6 +733,20 @@ get_user_list(_, LUser, LServer, mnesia) -> end; _ -> {none, []} end; +get_user_list(_, LUser, LServer, riak) -> + case ejabberd_riak:get(privacy, {LUser, LServer}) of + {ok, #privacy{default = Default, lists = Lists}} -> + case Default of + none -> {none, []}; + _ -> + case lists:keysearch(Default, 1, Lists) of + {value, {_, List}} -> {Default, List}; + _ -> {none, []} + end + end; + {error, _} -> + {none, []} + end; get_user_list(_, LUser, LServer, odbc) -> case catch sql_get_default_privacy_list(LUser, LServer) of @@ -680,6 +778,13 @@ get_user_lists(LUser, LServer, mnesia) -> _ -> error end; +get_user_lists(LUser, LServer, riak) -> + case ejabberd_riak:get(privacy, {LUser, LServer}) of + {ok, #privacy{} = P} -> + {ok, P}; + {error, _} -> + error + end; get_user_lists(LUser, LServer, odbc) -> Default = case catch sql_get_default_privacy_list(LUser, LServer) of {selected, [<<"name">>], []} -> @@ -843,6 +948,8 @@ remove_user(LUser, LServer, mnesia) -> F = fun () -> mnesia:delete({privacy, {LUser, LServer}}) end, mnesia:transaction(F); +remove_user(LUser, LServer, riak) -> + {atomic, ejabberd_riak:delete(privacy, {LUser, LServer})}; remove_user(LUser, LServer, odbc) -> sql_del_privacy_lists(LUser, LServer). diff --git a/src/mod_private.erl b/src/mod_private.erl index 301880e97..a925eecdf 100644 --- a/src/mod_private.erl +++ b/src/mod_private.erl @@ -152,13 +152,9 @@ set_data(LUser, LServer, {XMLNS, El}, odbc) -> odbc_queries:set_private_data(LServer, Username, LXMLNS, SData); set_data(LUser, LServer, {XMLNS, El}, riak) -> - Username = LUser, - Key = <>, - SData = xml:element_to_binary(El), - ejabberd_riak:put( - LServer, <<"private">>, Key, SData, - [{<<"user_bin">>, Username}]), - ok. + ejabberd_riak:put(#private_storage{usns = {LUser, LServer, XMLNS}, + xml = El}, + [{'2i', [{<<"us">>, {LUser, LServer}}]}]). get_data(LUser, LServer, Data) -> get_data(LUser, LServer, @@ -195,18 +191,13 @@ get_data(LUser, LServer, odbc, [{XMLNS, El} | Els], end; get_data(LUser, LServer, riak, [{XMLNS, El} | Els], Res) -> - Key = <>, - case ejabberd_riak:get(LServer, <<"private">>, Key) of - {ok, SData} -> - case xml_stream:parse_element(SData) of - Data when element(1, Data) == xmlelement -> - get_data(LUser, LServer, riak, Els, [Data | Res]) - end; - _ -> - get_data(LUser, LServer, riak, Els, [El | Res]) + case ejabberd_riak:get(private_storage, {LUser, LServer, XMLNS}) of + {ok, #private_storage{xml = NewEl}} -> + get_data(LUser, LServer, riak, Els, [NewEl|Res]); + _ -> + get_data(LUser, LServer, riak, Els, [El|Res]) end. - get_data(LUser, LServer) -> get_all_data(LUser, LServer, gen_mod:db_type(LServer, ?MODULE)). @@ -234,19 +225,10 @@ get_all_data(LUser, LServer, odbc) -> [] end; get_all_data(LUser, LServer, riak) -> - Username = LUser, case ejabberd_riak:get_by_index( - LServer, <<"private">>, <<"user_bin">>, Username) of + private_storage, <<"us">>, {LUser, LServer}) of {ok, Res} -> - lists:flatmap( - fun(SData) -> - case xml_stream:parse_element(SData) of - #xmlel{} = El -> - [El]; - _ -> - [] - end - end, Res); + [El || #private_storage{xml = El} <- Res]; _ -> [] end. @@ -279,17 +261,8 @@ remove_user(LUser, LServer, odbc) -> odbc_queries:del_user_private_storage(LServer, Username); remove_user(LUser, LServer, riak) -> - Username = LUser, - case ejabberd_riak:get_keys_by_index( - LServer, <<"private">>, <<"user_bin">>, Username) of - {ok, Keys} -> - lists:foreach( - fun(Key) -> - ejabberd_riak:delete(LServer, <<"private">>, Key) - end, Keys); - _ -> - ok - end. + {atomic, ejabberd_riak:delete_by_index(private_storage, + <<"us">>, {LUser, LServer})}. update_table() -> Fields = record_info(fields, private_storage), diff --git a/src/mod_roster.erl b/src/mod_roster.erl index 8ef70eb59..24386e8e8 100644 --- a/src/mod_roster.erl +++ b/src/mod_roster.erl @@ -206,11 +206,9 @@ read_roster_version(LUser, LServer, odbc) -> {selected, [<<"version">>], []} -> error end; read_roster_version(LServer, LUser, riak) -> - Username = LUser, - case ejabberd_riak:get(LServer, <<"roster_version">>, - Username) of + case ejabberd_riak:get(roster_version, {LUser, LServer}) of {ok, Version} -> Version; - {error, notfound} -> error + _Err -> error end. write_roster_version(LUser, LServer) -> @@ -249,8 +247,8 @@ write_roster_version(LUser, LServer, InTransaction, Ver, end; write_roster_version(LUser, LServer, _InTransaction, Ver, riak) -> - Username = LUser, - riak_set_roster_version(LServer, Username, Ver). + US = {LUser, LServer}, + ejabberd_riak:put(#roster_version{us = US, version = Ver}). %% Load roster from DB only if neccesary. %% It is neccesary if @@ -358,6 +356,11 @@ get_roster(LUser, LServer, mnesia) -> Items when is_list(Items)-> Items; _ -> [] end; +get_roster(LUser, LServer, riak) -> + case ejabberd_riak:get_by_index(roster, <<"us">>, {LUser, LServer}) of + {ok, Items} -> Items; + _Err -> [] + end; get_roster(LUser, LServer, odbc) -> Username = ejabberd_odbc:escape(LUser), case catch odbc_queries:get_roster(LServer, Username) of @@ -399,37 +402,6 @@ get_roster(LUser, LServer, odbc) -> Items), RItems; _ -> [] - end; -get_roster(LUser, LServer, riak) -> - Username = LUser, - case catch riak_get_roster(LServer, Username) of - {ok, Items} when is_list(Items) -> - JIDGroups = case riak_get_roster_jid_groups(LServer, Username) of - {ok, JGrps} when is_list(JGrps) -> - JGrps; - _ -> - [] - end, - GroupsDict = dict:from_list(JIDGroups), - RItems = lists:flatmap( - fun(I) -> - case riak_raw_to_record(LServer, I) of - %% Bad JID in database: - error -> - []; - R -> - SJID = jlib:jid_to_string(R#roster.jid), - Groups = - case dict:find(SJID, GroupsDict) of - {ok, Gs} -> Gs; - error -> [] - end, - [R#roster{groups = Groups}] - end - end, Items), - RItems; - _ -> - [] end. item_to_xml(Item) -> @@ -499,29 +471,15 @@ get_roster_by_jid_t(LUser, LServer, LJID, odbc) -> end end; get_roster_by_jid_t(LUser, LServer, LJID, riak) -> - Username = LUser, - SJID = jlib:jid_to_string(LJID), - Res = riak_get_roster_by_jid(LServer, Username, SJID), - case Res of - {error, _} -> - #roster{usj = {LUser, LServer, LJID}, - us = {LUser, LServer}, - jid = LJID}; + case ejabberd_riak:get(roster, {LUser, LServer, LJID}) of {ok, I} -> - R = riak_raw_to_record(LServer, I), - case R of - %% Bad JID in database: - error -> - #roster{usj = {LUser, LServer, LJID}, - us = {LUser, LServer}, - jid = LJID}; - _ -> - R#roster{ - usj = {LUser, LServer, LJID}, - us = {LUser, LServer}, - jid = LJID, - name = ""} - end + I#roster{jid = LJID, name = <<"">>, groups = [], + xs = []}; + {error, notfound} -> + #roster{usj = {LUser, LServer, LJID}, + us = {LUser, LServer}, jid = LJID}; + Err -> + exit(Err) end. try_process_iq_set(From, To, #iq{sub_el = SubEl} = IQ) -> @@ -702,12 +660,9 @@ get_subscription_lists(_, LUser, LServer, odbc) -> _ -> [] end; get_subscription_lists(_, LUser, LServer, riak) -> - Username = LUser, - case catch riak_get_roster(LServer, Username) of - {ok, Items} when is_list(Items) -> - lists:map(fun(I) -> riak_raw_to_record(LServer, I) end, Items); - _ -> - [] + case ejabberd_riak:get_by_index(roster, <<"us">>, {LUser, LServer}) of + {ok, Items} -> Items; + _Err -> [] end. fill_subscription_lists(LServer, [#roster{} = I | Is], @@ -747,11 +702,9 @@ roster_subscribe_t(LUser, LServer, LJID, Item, odbc) -> SJID = ejabberd_odbc:escape(jlib:jid_to_string(LJID)), odbc_queries:roster_subscribe(LServer, Username, SJID, ItemVals); -roster_subscribe_t(LUser, LServer, LJID, Item, riak) -> - ItemVals = riak_record_to_string(Item), - Username = LUser, - SJID = jlib:jid_to_string(LJID), - riak_roster_subscribe(LServer, Username, SJID, ItemVals). +roster_subscribe_t(LUser, LServer, _LJID, Item, riak) -> + ejabberd_riak:put(Item, + [{'2i', [{<<"us">>, {LUser, LServer}}]}]). transaction(LServer, F) -> case gen_mod:db_type(LServer, ?MODULE) of @@ -810,23 +763,14 @@ get_roster_by_jid_with_groups_t(LUser, LServer, LJID, us = {LUser, LServer}, jid = LJID} end; get_roster_by_jid_with_groups_t(LUser, LServer, LJID, riak) -> - Username = LUser, - SJID = jlib:jid_to_string(LJID), - case riak_get_roster_by_jid(LServer, Username, SJID) of + case ejabberd_riak:get(roster, {LUser, LServer, LJID}) of {ok, I} -> - R = riak_raw_to_record(LServer, I), - Groups = - case riak_get_roster_groups(LServer, Username, SJID) of - {ok, JGrps} when is_list(JGrps) -> - JGrps; - _ -> - [] - end, - R#roster{groups = Groups}; - {error, _} -> + I; + {error, notfound} -> #roster{usj = {LUser, LServer, LJID}, - us = {LUser, LServer}, - jid = LJID} + us = {LUser, LServer}, jid = LJID}; + Err -> + exit(Err) end. process_subscription(Direction, User, Server, JID1, @@ -1040,9 +984,7 @@ remove_user(LUser, LServer, odbc) -> odbc_queries:del_user_roster_t(LServer, Username), ok; remove_user(LUser, LServer, riak) -> - Username = LUser, - riak_del_user_roster(LServer, Username), - ok. + {atomic, ejabberd_riak:delete_by_index(roster, <<"us">>, {LUser, LServer})}. %% For each contact with Subscription: %% Both or From, send a "unsubscribed" presence stanza; @@ -1114,13 +1056,9 @@ update_roster_t(LUser, LServer, LJID, Item, odbc) -> ItemGroups = groups_to_string(Item), odbc_queries:update_roster(LServer, Username, SJID, ItemVals, ItemGroups); -update_roster_t(LUser, LServer, LJID, Item, riak) -> - Username = LUser, - SJID = jlib:jid_to_string(LJID), - ItemVals = riak_record_to_string(Item), - ItemGroups = riak_groups_to_binary(Item), - riak_update_roster( - LServer, Username, SJID, ItemVals, ItemGroups). +update_roster_t(LUser, LServer, _LJID, Item, riak) -> + ejabberd_riak:put(Item, + [{'2i', [{<<"us">>, {LUser, LServer}}]}]). del_roster_t(LUser, LServer, LJID) -> DBType = gen_mod:db_type(LServer, ?MODULE), @@ -1133,9 +1071,7 @@ del_roster_t(LUser, LServer, LJID, odbc) -> SJID = ejabberd_odbc:escape(jlib:jid_to_string(LJID)), odbc_queries:del_roster(LServer, Username, SJID); del_roster_t(LUser, LServer, LJID, riak) -> - Username = LUser, - SJID = jlib:jid_to_string(LJID), - riak_del_roster(LServer, Username, SJID). + ejabberd_riak:delete(roster, {LUser, LServer, LJID}). process_item_set_t(LUser, LServer, #xmlel{attrs = Attrs, children = Els}) -> @@ -1201,40 +1137,35 @@ get_in_pending_subscriptions(Ls, User, Server) -> get_in_pending_subscriptions(Ls, User, Server, gen_mod:db_type(LServer, ?MODULE)). -get_in_pending_subscriptions(Ls, User, Server, - mnesia) -> +get_in_pending_subscriptions(Ls, User, Server, DBType) + when DBType == mnesia; DBType == riak -> JID = jlib:make_jid(User, Server, <<"">>), - US = {JID#jid.luser, JID#jid.lserver}, - case mnesia:dirty_index_read(roster, US, #roster.us) of - Result when is_list(Result) -> - Ls ++ - lists:map(fun (R) -> - Message = R#roster.askmessage, - Status = if is_binary(Message) -> (Message); - true -> <<"">> - end, - #xmlel{name = <<"presence">>, - attrs = - [{<<"from">>, - jlib:jid_to_string(R#roster.jid)}, - {<<"to">>, jlib:jid_to_string(JID)}, - {<<"type">>, <<"subscribe">>}], - children = - [#xmlel{name = <<"status">>, - attrs = [], - children = - [{xmlcdata, Status}]}]} - end, - lists:filter(fun (R) -> - case R#roster.ask of - in -> true; - both -> true; - _ -> false - end - end, - Result)); - _ -> Ls - end; + Result = get_roster(JID#jid.luser, JID#jid.lserver, DBType), + Ls ++ lists:map(fun (R) -> + Message = R#roster.askmessage, + Status = if is_binary(Message) -> (Message); + true -> <<"">> + end, + #xmlel{name = <<"presence">>, + attrs = + [{<<"from">>, + jlib:jid_to_string(R#roster.jid)}, + {<<"to">>, jlib:jid_to_string(JID)}, + {<<"type">>, <<"subscribe">>}], + children = + [#xmlel{name = <<"status">>, + attrs = [], + children = + [{xmlcdata, Status}]}]} + end, + lists:filter(fun (R) -> + case R#roster.ask of + in -> true; + both -> true; + _ -> false + end + end, + Result)); get_in_pending_subscriptions(Ls, User, Server, odbc) -> JID = jlib:make_jid(User, Server, <<"">>), LUser = JID#jid.luser, @@ -1276,44 +1207,6 @@ get_in_pending_subscriptions(Ls, User, Server, odbc) -> end, Items)); _ -> Ls - end; -get_in_pending_subscriptions(Ls, User, Server, riak) -> - JID = jlib:make_jid(User, Server, <<"">>), - LUser = JID#jid.luser, - LServer = JID#jid.lserver, - Username = LUser, - case catch riak_get_roster(LServer, Username) of - {ok, Items} when is_list(Items) -> - Ls ++ lists:map( - fun(R) -> - Message = R#roster.askmessage, - #xmlel{name = <<"presence">>, - attrs = [{<<"from">>, - jlib:jid_to_string(R#roster.jid)}, - {<<"to">>, jlib:jid_to_string(JID)}, - {<<"type">>, <<"subscribe">>}], - children = [#xmlel{name = <<"status">>, - attrs = [], - children = - [{xmlcdata, Message}]}]} - end, - lists:flatmap( - fun(I) -> - case riak_raw_to_record(LServer, I) of - %% Bad JID in database: - error -> - []; - R -> - case R#roster.ask of - in -> [R]; - both -> [R]; - _ -> [] - end - end - end, - Items)); - _ -> - Ls end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -1361,18 +1254,12 @@ read_subscription_and_groups(LUser, LServer, LJID, end; read_subscription_and_groups(LUser, LServer, LJID, riak) -> - Username = LUser, - SJID = jlib:jid_to_string(LJID), - case catch riak_get_subscription(LServer, Username, SJID) of - {ok, Subscription} -> - Groups = case riak_get_roster_jid_groups(LServer, Username) of - {ok, JGrps} when is_list(JGrps) -> - JGrps; - _ -> - [] - end, - {Subscription, Groups}; - _ -> error + case ejabberd_riak:get(roster, {LUser, LServer, LJID}) of + {ok, #roster{subscription = Subscription, + groups = Groups}} -> + {Subscription, Groups}; + _ -> + error end. get_jid_info(_, User, Server, JID) -> diff --git a/src/mod_shared_roster.erl b/src/mod_shared_roster.erl index 8a1423c76..58a79d920 100644 --- a/src/mod_shared_roster.erl +++ b/src/mod_shared_roster.erl @@ -400,6 +400,13 @@ list_groups(Host, mnesia) -> mnesia:dirty_select(sr_group, [{#sr_group{group_host = {'$1', '$2'}, _ = '_'}, [{'==', '$2', Host}], ['$1']}]); +list_groups(Host, riak) -> + case ejabberd_riak:get_keys_by_index(sr_group, <<"host">>, Host) of + {ok, Gs} -> + [G || {G, _} <- Gs]; + _ -> + [] + end; list_groups(Host, odbc) -> case ejabberd_odbc:sql_query(Host, [<<"select name from sr_group;">>]) @@ -417,6 +424,13 @@ groups_with_opts(Host, mnesia) -> _ = '_'}, [], [['$1', '$2']]}]), lists:map(fun ([G, O]) -> {G, O} end, Gs); +groups_with_opts(Host, riak) -> + case ejabberd_riak:get_by_index(sr_group, <<"host">>, Host) of + {ok, Rs} -> + [{G, O} || #sr_group{group_host = {G, _}, opts = O} <- Rs]; + _ -> + [] + end; groups_with_opts(Host, odbc) -> case ejabberd_odbc:sql_query(Host, [<<"select name, opts from sr_group;">>]) @@ -438,6 +452,10 @@ create_group(Host, Group, Opts, mnesia) -> R = #sr_group{group_host = {Group, Host}, opts = Opts}, F = fun () -> mnesia:write(R) end, mnesia:transaction(F); +create_group(Host, Group, Opts, riak) -> + {atomic, ejabberd_riak:put(#sr_group{group_host = {Group, Host}, + opts = Opts}, + [{'2i', [{<<"host">>, Host}]}])}; create_group(Host, Group, Opts, odbc) -> SGroup = ejabberd_odbc:escape(Group), SOpts = ejabberd_odbc:encode_term(Opts), @@ -464,6 +482,15 @@ delete_group(Host, Group, mnesia) -> Users) end, mnesia:transaction(F); +delete_group(Host, Group, riak) -> + try + ok = ejabberd_riak:delete(sr_group, {Group, Host}), + ok = ejabberd_riak:delete_by_index(sr_user, <<"group_host">>, + {Group, Host}), + {atomic, ok} + catch _:{badmatch, Err} -> + {atomic, Err} + end; delete_group(Host, Group, odbc) -> SGroup = ejabberd_odbc:escape(Group), F = fun () -> @@ -483,6 +510,11 @@ get_group_opts(Host, Group, mnesia) -> [#sr_group{opts = Opts}] -> Opts; _ -> error end; +get_group_opts(Host, Group, riak) -> + case ejabberd_riak:get(sr_group, {Group, Host}) of + {ok, #sr_group{opts = Opts}} -> Opts; + _ -> error + end; get_group_opts(Host, Group, odbc) -> SGroup = ejabberd_odbc:escape(Group), case catch ejabberd_odbc:sql_query(Host, @@ -502,6 +534,10 @@ set_group_opts(Host, Group, Opts, mnesia) -> R = #sr_group{group_host = {Group, Host}, opts = Opts}, F = fun () -> mnesia:write(R) end, mnesia:transaction(F); +set_group_opts(Host, Group, Opts, riak) -> + {atomic, ejabberd_riak:put(#sr_group{group_host = {Group, Host}, + opts = Opts}, + [{'2i', [{<<"host">>, Host}]}])}; set_group_opts(Host, Group, Opts, odbc) -> SGroup = ejabberd_odbc:escape(Group), SOpts = ejabberd_odbc:encode_term(Opts), @@ -525,6 +561,13 @@ get_user_groups(US, Host, mnesia) -> || #sr_user{group_host = {Group, H}} <- Rs, H == Host]; _ -> [] end; +get_user_groups(US, Host, riak) -> + case ejabberd_riak:get_by_index(sr_user, <<"us">>, US) of + {ok, Rs} -> + [Group || #sr_user{group_host = {Group, H}} <- Rs, H == Host]; + _ -> + [] + end; get_user_groups(US, Host, odbc) -> SJID = make_jid_s(US), case catch ejabberd_odbc:sql_query(Host, @@ -595,6 +638,14 @@ get_group_explicit_users(Host, Group, mnesia) -> Rs when is_list(Rs) -> [R#sr_user.us || R <- Rs]; _ -> [] end; +get_group_explicit_users(Host, Group, riak) -> + case ejabberd_riak:get_by_index(sr_user, <<"group_host">>, + {Group, Host}) of + {ok, Rs} -> + [R#sr_user.us || R <- Rs]; + _ -> + [] + end; get_group_explicit_users(Host, Group, odbc) -> SGroup = ejabberd_odbc:escape(Group), case catch ejabberd_odbc:sql_query(Host, @@ -680,6 +731,16 @@ get_user_displayed_groups(LUser, LServer, GroupsOpts, H == LServer]; _ -> [] end; +get_user_displayed_groups(LUser, LServer, GroupsOpts, + riak) -> + case ejabberd_riak:get_by_index(sr_user, + <<"us">>, {LUser, LServer}) of + {ok, Rs} -> + [{Group, proplists:get_value(Group, GroupsOpts, [])} + || #sr_user{group_host = {Group, _}} <- Rs]; + _ -> + [] + end; get_user_displayed_groups(LUser, LServer, GroupsOpts, odbc) -> SJID = make_jid_s(LUser, LServer), @@ -726,6 +787,21 @@ is_user_in_group(US, Group, Host, mnesia) -> [] -> lists:member(US, get_group_users(Host, Group)); _ -> true end; +is_user_in_group(US, Group, Host, riak) -> + case ejabberd_riak:get_by_index(sr_user, <<"us">>, US) of + {ok, Rs} -> + case lists:any( + fun(#sr_user{group_host = {G, H}}) -> + (Group == G) and (Host == H) + end, Rs) of + false -> + lists:member(US, get_group_users(Host, Group)); + true -> + true + end; + _Err -> + false + end; is_user_in_group(US, Group, Host, odbc) -> SJID = make_jid_s(US), SGroup = ejabberd_odbc:escape(Group), @@ -765,6 +841,12 @@ add_user_to_group(Host, US, Group, mnesia) -> R = #sr_user{us = US, group_host = {Group, Host}}, F = fun () -> mnesia:write(R) end, mnesia:transaction(F); +add_user_to_group(Host, US, Group, riak) -> + {atomic, ejabberd_riak:put( + #sr_user{us = US, group_host = {Group, Host}}, + [{i, {US, {Group, Host}}}, + {'2i', [{<<"us">>, US}, + {<<"group_host">>, {Group, Host}}]}])}; add_user_to_group(Host, US, Group, odbc) -> SJID = make_jid_s(US), SGroup = ejabberd_odbc:escape(Group), @@ -816,6 +898,8 @@ remove_user_from_group(Host, US, Group, mnesia) -> R = #sr_user{us = US, group_host = {Group, Host}}, F = fun () -> mnesia:delete_object(R) end, mnesia:transaction(F); +remove_user_from_group(Host, US, Group, riak) -> + {atomic, ejabberd_riak:delete(sr_group, {US, {Group, Host}})}; remove_user_from_group(Host, US, Group, odbc) -> SJID = make_jid_s(US), SGroup = ejabberd_odbc:escape(Group), diff --git a/src/mod_vcard.erl b/src/mod_vcard.erl index e7dd77224..8ffe6642b 100644 --- a/src/mod_vcard.erl +++ b/src/mod_vcard.erl @@ -46,7 +46,7 @@ lbday, ctry, lctry, locality, llocality, email, lemail, orgname, lorgname, orgunit, lorgunit}). --record(vcard, {us = {<<"">>, <<"">>} :: {binary(), binary()}, +-record(vcard, {us = {<<"">>, <<"">>} :: {binary(), binary()} | binary(), vcard = #xmlel{} :: xmlel()}). -define(PROCNAME, ejabberd_mod_vcard). @@ -214,13 +214,9 @@ get_vcard(LUser, LServer, odbc) -> _ -> error end; get_vcard(LUser, LServer, riak) -> - Username = LUser, - case catch ejabberd_riak:get(LServer, <<"vcard">>, Username) of - {ok, SVCARD} -> - case xml_stream:parse_element(SVCARD) of - {error, _Reason} -> error; - VCARD -> [VCARD] - end; + case ejabberd_riak:get(vcard, {LUser, LServer}) of + {ok, R} -> + [R#vcard.vcard]; {error, notfound} -> []; _ -> @@ -302,6 +298,33 @@ set_vcard(User, LServer, VCARD) -> lorgunit = LOrgUnit}) end, mnesia:transaction(F); + riak -> + US = {LUser, LServer}, + ejabberd_riak:put(#vcard{us = US, vcard = VCARD}, + [{'2i', [{<<"user">>, User}, + {<<"luser">>, LUser}, + {<<"fn">>, FN}, + {<<"lfn">>, LFN}, + {<<"family">>, Family}, + {<<"lfamily">>, LFamily}, + {<<"given">>, Given}, + {<<"lgiven">>, LGiven}, + {<<"middle">>, Middle}, + {<<"lmiddle">>, LMiddle}, + {<<"nickname">>, Nickname}, + {<<"lnickname">>, LNickname}, + {<<"bday">>, BDay}, + {<<"lbday">>, LBDay}, + {<<"ctry">>, CTRY}, + {<<"lctry">>, LCTRY}, + {<<"locality">>, Locality}, + {<<"llocality">>, LLocality}, + {<<"email">>, EMail}, + {<<"lemail">>, LEMail}, + {<<"orgname">>, OrgName}, + {<<"lorgname">>, LOrgName}, + {<<"orgunit">>, OrgUnit}, + {<<"lorgunit">>, LOrgUnit}]}]); odbc -> Username = ejabberd_odbc:escape(User), LUsername = ejabberd_odbc:escape(LUser), @@ -335,25 +358,7 @@ set_vcard(User, LServer, VCARD) -> SLGiven, SLLocality, SLMiddle, SLNickname, SLOrgName, SLOrgUnit, SLocality, SMiddle, SNickname, SOrgName, - SOrgUnit, SVCARD, Username); - riak -> - Username = LUser, - SVCARD = xml:element_to_binary(VCARD), - - ejabberd_riak:put( - LServer, <<"vcard">>, Username, SVCARD, - [{<<"bday_bin">>, LBDay}, - {<<"ctry_bin">>, LCTRY}, - {<<"email_bin">>, LEMail}, - {<<"fn_bin">>, LFN}, - {<<"family_bin">>, LFamily}, - {<<"given_bin">>, LGiven}, - {<<"locality_bin">>, LLocality}, - {<<"middle_bin">>, LMiddle}, - {<<"nickname_bin">>, LNickname}, - {<<"orgname_bin">>, LOrgName}, - {<<"orgunit_bin">>, LOrgUnit}, - {<<"user_bin">>, Username}]) + SOrgUnit, SVCARD, Username) end, ejabberd_hooks:run(vcard_set, LServer, [LUser, LServer, VCARD]) @@ -921,9 +926,7 @@ remove_user(LUser, LServer, odbc) -> [<<"delete from vcard_search where lusername='">>, Username, <<"';">>]]); remove_user(LUser, LServer, riak) -> - Username = LUser, - ejabberd_riak:delete(LServer, <<"vcard">>, Username), - ok. + {atomic, ejabberd_riak:delete(vcard, {LUser, LServer})}. update_tables() -> update_vcard_table(), diff --git a/src/mod_vcard_xupdate.erl b/src/mod_vcard_xupdate.erl index b2ea34419..74dd30f27 100644 --- a/src/mod_vcard_xupdate.erl +++ b/src/mod_vcard_xupdate.erl @@ -88,6 +88,9 @@ add_xupdate(LUser, LServer, Hash, mnesia) -> hash = Hash}) end, mnesia:transaction(F); +add_xupdate(LUser, LServer, Hash, riak) -> + {atomic, ejabberd_riak:put(#vcard_xupdate{us = {LUser, LServer}, + hash = Hash})}; add_xupdate(LUser, LServer, Hash, odbc) -> Username = ejabberd_odbc:escape(LUser), SHash = ejabberd_odbc:escape(Hash), @@ -109,6 +112,11 @@ get_xupdate(LUser, LServer, mnesia) -> [#vcard_xupdate{hash = Hash}] -> Hash; _ -> undefined end; +get_xupdate(LUser, LServer, riak) -> + case ejabberd_riak:get(vcard_xupdate, {LUser, LServer}) of + {ok, #vcard_xupdate{hash = Hash}} -> Hash; + _ -> undefined + end; get_xupdate(LUser, LServer, odbc) -> Username = ejabberd_odbc:escape(LUser), case ejabberd_odbc:sql_query(LServer, @@ -129,6 +137,8 @@ remove_xupdate(LUser, LServer, mnesia) -> mnesia:delete({vcard_xupdate, {LUser, LServer}}) end, mnesia:transaction(F); +remove_xupdate(LUser, LServer, riak) -> + {atomic, ejabberd_riak:delete(vcard_xupdate, {LUser, LServer})}; remove_xupdate(LUser, LServer, odbc) -> Username = ejabberd_odbc:escape(LUser), F = fun () ->