Improve Riak support

This commit is contained in:
Evgeniy Khramtsov 2012-11-13 22:56:27 +10:00
parent a4b02c38db
commit 0490c2f139
16 changed files with 1405 additions and 408 deletions

285
src/ejabberd_auth_riak.erl Normal file
View File

@ -0,0 +1,285 @@
%%%----------------------------------------------------------------------
%%% File : ejabberd_auth_riak.erl
%%% Author : Evgeniy Khramtsov <ekhramtsov@process-one.net>
%%% Purpose : Authentification via Riak
%%% Created : 12 Nov 2012 by Evgeniy Khramtsov <ekhramtsov@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2012 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License
%%% along with this program; if not, write to the Free Software
%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
%%% 02111-1307 USA
%%%
%%%----------------------------------------------------------------------
-module(ejabberd_auth_riak).
-author('alexey@process-one.net').
-behaviour(ejabberd_auth).
%% External exports
-export([start/1, set_password/3, check_password/3,
check_password/5, try_register/3,
dirty_get_registered_users/0, get_vh_registered_users/1,
get_vh_registered_users/2,
get_vh_registered_users_number/1,
get_vh_registered_users_number/2, get_password/2,
get_password_s/2, is_user_exists/2, remove_user/2,
remove_user/3, store_type/0, export/1,
plain_password_required/0]).
-include("ejabberd.hrl").
-record(passwd, {us = {<<"">>, <<"">>} :: {binary(), binary()} | '$1',
password = <<"">> :: binary() | scram() | '_'}).
-define(SALT_LENGTH, 16).
start(_Host) ->
ok.
plain_password_required() ->
case is_scrammed() of
false -> false;
true -> true
end.
store_type() ->
case is_scrammed() of
false -> plain; %% allows: PLAIN DIGEST-MD5 SCRAM
true -> scram %% allows: PLAIN SCRAM
end.
check_password(User, Server, Password) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
case ejabberd_riak:get(passwd, {LUser, LServer}) of
{ok, #passwd{password = Password}} when is_binary(Password) ->
Password /= <<"">>;
{ok, #passwd{password = Scram}} when is_record(Scram, scram) ->
is_password_scram_valid(Password, Scram);
_ ->
false
end.
check_password(User, Server, Password, Digest,
DigestGen) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
case ejabberd_riak:get(passwd, {LUser, LServer}) of
{ok, #passwd{password = Passwd}} when is_binary(Passwd) ->
DigRes = if Digest /= <<"">> ->
Digest == DigestGen(Passwd);
true -> false
end,
if DigRes -> true;
true -> (Passwd == Password) and (Password /= <<"">>)
end;
{ok, #passwd{password = Scram}}
when is_record(Scram, scram) ->
Passwd = jlib:decode_base64(Scram#scram.storedkey),
DigRes = if Digest /= <<"">> ->
Digest == DigestGen(Passwd);
true -> false
end,
if DigRes -> true;
true -> (Passwd == Password) and (Password /= <<"">>)
end;
_ -> false
end.
set_password(User, Server, Password) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
US = {LUser, LServer},
if (LUser == error) or (LServer == error) ->
{error, invalid_jid};
true ->
Password2 = case is_scrammed() and is_binary(Password)
of
true -> password_to_scram(Password);
false -> Password
end,
ok = ejabberd_riak:put(#passwd{us = US, password = Password2},
[{'2i', [{<<"host">>, LServer}]}])
end.
try_register(User, Server, PasswordList) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
Password = iolist_to_binary(PasswordList),
US = {LUser, LServer},
if (LUser == error) or (LServer == error) ->
{error, invalid_jid};
true ->
case ejabberd_riak:get(passwd, US) of
{error, notfound} ->
Password2 = case is_scrammed() and
is_binary(Password)
of
true -> password_to_scram(Password);
false -> Password
end,
{atomic, ejabberd_riak:put(
#passwd{us = US,
password = Password2},
[{'2i', [{<<"host">>, LServer}]}])};
{ok, _} ->
exists;
Err ->
{atomic, Err}
end
end.
dirty_get_registered_users() ->
lists:flatmap(
fun(Server) ->
get_vh_registered_users(Server)
end, ejabberd_config:get_vh_by_auth_method(riak)).
get_vh_registered_users(Server) ->
LServer = jlib:nameprep(Server),
case ejabberd_riak:get_keys_by_index(passwd, <<"host">>, LServer) of
{ok, Users} ->
Users;
_ ->
[]
end.
get_vh_registered_users(Server, _) ->
get_vh_registered_users(Server).
get_vh_registered_users_number(Server) ->
LServer = jlib:nameprep(Server),
case ejabberd_riak:count_by_index(passwd, <<"host">>, LServer) of
{ok, N} ->
N;
_ ->
0
end.
get_vh_registered_users_number(Server, _) ->
get_vh_registered_users_number(Server).
get_password(User, Server) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
case ejabberd_riak:get(passwd, {LUser, LServer}) of
{ok, #passwd{password = Password}}
when is_binary(Password) ->
Password;
{ok, #passwd{password = Scram}}
when is_record(Scram, scram) ->
{jlib:decode_base64(Scram#scram.storedkey),
jlib:decode_base64(Scram#scram.serverkey),
jlib:decode_base64(Scram#scram.salt),
Scram#scram.iterationcount};
_ -> false
end.
get_password_s(User, Server) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
case ejabberd_riak:get(passwd, {LUser, LServer}) of
{ok, #passwd{password = Password}}
when is_binary(Password) ->
Password;
{ok, #passwd{password = Scram}}
when is_record(Scram, scram) ->
<<"">>;
_ -> <<"">>
end.
is_user_exists(User, Server) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
case ejabberd_riak:get(passwd, {LUser, LServer}) of
{error, notfound} -> false;
{ok, _} -> true;
Err -> Err
end.
remove_user(User, Server) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
ejabberd_riak:delete(passwd, {LUser, LServer}),
ok.
remove_user(User, Server, Password) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
case ejabberd_riak:get(passwd, {LUser, LServer}) of
{ok, #passwd{password = Password}}
when is_binary(Password) ->
ejabberd_riak:delete(passwd, {LUser, LServer}),
ok;
{ok, #passwd{password = Scram}}
when is_record(Scram, scram) ->
case is_password_scram_valid(Password, Scram) of
true ->
ejabberd_riak:delete(passwd, {LUser, LServer}),
ok;
false -> not_allowed
end;
_ -> not_exists
end.
%%%
%%% SCRAM
%%%
is_scrammed() ->
scram ==
ejabberd_config:get_local_option({auth_password_format, ?MYNAME},
fun(V) -> V end).
password_to_scram(Password) ->
password_to_scram(Password,
?SCRAM_DEFAULT_ITERATION_COUNT).
password_to_scram(Password, IterationCount) ->
Salt = crypto:rand_bytes(?SALT_LENGTH),
SaltedPassword = scram:salted_password(Password, Salt,
IterationCount),
StoredKey =
scram:stored_key(scram:client_key(SaltedPassword)),
ServerKey = scram:server_key(SaltedPassword),
#scram{storedkey = jlib:encode_base64(StoredKey),
serverkey = jlib:encode_base64(ServerKey),
salt = jlib:encode_base64(Salt),
iterationcount = IterationCount}.
is_password_scram_valid(Password, Scram) ->
IterationCount = Scram#scram.iterationcount,
Salt = jlib:decode_base64(Scram#scram.salt),
SaltedPassword = scram:salted_password(Password, Salt,
IterationCount),
StoredKey =
scram:stored_key(scram:client_key(SaltedPassword)),
jlib:decode_base64(Scram#scram.storedkey) == StoredKey.
export(_Server) ->
[{passwd,
fun(Host, #passwd{us = {LUser, LServer}, password = Password})
when LServer == Host ->
Username = ejabberd_odbc:escape(LUser),
Pass = ejabberd_odbc:escape(Password),
[[<<"delete from users where username='">>, Username, <<"';">>],
[<<"insert into users(username, password) "
"values ('">>, Username, <<"', '">>, Pass, <<"');">>]];
(_Host, _R) ->
[]
end}].

View File

@ -1,11 +1,10 @@
%%%----------------------------------------------------------------------
%%% File : ejabberd_riak.erl
%%% Author : Alexey Shchepin <alexey@process-one.net>
%%% Purpose : Serve Riak connection
%%%-------------------------------------------------------------------
%%% @author Alexey Shchepin <alexey@process-one.net>
%%% @doc
%%% Interface for Riak database
%%% @end
%%% Created : 29 Dec 2011 by Alexey Shchepin <alexey@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2011 ProcessOne
%%% @copyright (C) 2002-2012 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
@ -22,76 +21,249 @@
%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA
%%% 02111-1307 USA
%%%
%%%----------------------------------------------------------------------
%%%-------------------------------------------------------------------
-module(ejabberd_riak).
-author('alexey@process-one.net').
%% External exports
-export([start_link/3,
put/4,
put/5,
get_object/3,
get/3,
get_objects_by_index/4,
get_by_index/4,
get_keys_by_index/4,
count_by_index/4,
delete/3]).
-behaviour(gen_server).
%% API
-export([start_link/3, make_bucket/1, put/1, put/2,
get/1, get/2, get_by_index/3, delete/1, delete/2,
count_by_index/3, get_by_index_range/4,
get_keys/1, get_keys_by_index/3,
count/1, delete_by_index/3]).
%% For debugging
-export([get_tables/0]).
%% map/reduce exports
-export([map_key/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-include("ejabberd.hrl").
%%%----------------------------------------------------------------------
-record(state, {pid = self() :: pid()}).
-type index() :: {binary(), any()}.
-type index_info() :: [{i, any()} | {'2i', [index()]}].
%% The `index_info()' is used in put/delete functions:
%% `i' defines a primary index, `` '2i' '' defines secondary indexes.
%% There must be only one primary index. If `i' is not specified,
%% the first element of the record is assumed as a primary index,
%% i.e. `i' = element(2, Record).
-export_types([index_info/0]).
%%%===================================================================
%%% API
%%%----------------------------------------------------------------------
start_link(Server, Port, StartInterval) ->
{ok, Pid} = riakc_pb_socket:start_link(
Server, Port,
[auto_reconnect]),
ejabberd_riak_sup:add_pid(Pid),
{ok, Pid}.
%%%===================================================================
%% @private
start_link(Server, Port, _StartInterval) ->
gen_server:start_link(?MODULE, [Server, Port], []).
make_bucket(Host, Table) ->
iolist_to_binary([Host, $@, Table]).
-spec make_bucket(atom()) -> binary().
%% @doc Makes a bucket from a table name
%% @private
make_bucket(Table) ->
erlang:atom_to_binary(Table, utf8).
put(Host, Table, Key, Value) ->
Bucket = make_bucket(Host, Table),
Obj = riakc_obj:new(Bucket, Key, Value),
riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj).
-spec put(tuple()) -> ok | {error, any()}.
%% @equiv put(Record, [])
put(Record) ->
?MODULE:put(Record, []).
put(Host, Table, Key, Value, Indexes) ->
Bucket = make_bucket(Host, Table),
Obj = riakc_obj:new(Bucket, Key, Value),
MetaData = dict:store(<<"index">>, Indexes, dict:new()),
Obj2 = riakc_obj:update_metadata(Obj, MetaData),
riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj2).
-spec put(tuple(), index_info()) -> ok | {error, any()}.
%% @doc Stores a record `Rec' with indexes described in ``IndexInfo''
put(Rec, IndexInfo) ->
Key = encode_key(proplists:get_value(i, IndexInfo, element(2, Rec))),
SecIdxs = [encode_index_key(K, V) ||
{K, V} <- proplists:get_value('2i', IndexInfo, [])],
Table = element(1, Rec),
Value = term_to_binary(Rec),
case put_raw(Table, Key, Value, SecIdxs) of
ok ->
ok;
Error ->
log_error(Error, put, [{record, Rec},
{index_info, IndexInfo}]),
Error
end.
get_object(Host, Table, Key) ->
Bucket = make_bucket(Host, Table),
put_raw(Table, Key, Value, Indexes) ->
Bucket = make_bucket(Table),
Obj = riakc_obj:new(Bucket, Key, Value, "application/x-erlang-term"),
Obj1 = if Indexes /= [] ->
MetaData = dict:store(<<"index">>, Indexes, dict:new()),
riakc_obj:update_metadata(Obj, MetaData);
true ->
Obj
end,
riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj1).
get_object_raw(Table, Key) ->
Bucket = make_bucket(Table),
riakc_pb_socket:get(ejabberd_riak_sup:get_random_pid(), Bucket, Key).
get(Host, Table, Key) ->
case get_object(Host, Table, Key) of
-spec get(atom()) -> {ok, [any()]} | {error, any()}.
%% @doc Returns all objects from table `Table'
get(Table) ->
Bucket = make_bucket(Table),
case riakc_pb_socket:mapred(
ejabberd_riak_sup:get_random_pid(),
Bucket,
[{map, {modfun, riak_kv_mapreduce, map_object_value},
none, true}]) of
{ok, [{_, Objs}]} ->
{ok, lists:flatmap(
fun(Obj) ->
case catch binary_to_term(Obj) of
{'EXIT', _} ->
Error = {error, make_invalid_object(Obj)},
log_error(Error, get,
[{table, Table}]),
[];
Term ->
[Term]
end
end, Objs)};
{error, notfound} ->
{ok, []};
Error ->
Error
end.
-spec get(atom(), any()) -> {ok, any()} | {error, any()}.
%% @doc Reads record by `Key' from table `Table'
get(Table, Key) ->
case get_raw(Table, encode_key(Key)) of
{ok, Val} ->
case catch binary_to_term(Val) of
{'EXIT', _} ->
Error = {error, make_invalid_object(Val)},
log_error(Error, get, [{table, Table}, {key, Key}]),
{error, notfound};
Term ->
{ok, Term}
end;
Error ->
log_error(Error, get, [{table, Table},
{key, Key}]),
Error
end.
-spec get_by_index(atom(), binary(), any()) -> {ok, [any()]} | {error, any()}.
%% @doc Reads records by `Index' and value `Key' from `Table'
get_by_index(Table, Index, Key) ->
{NewIndex, NewKey} = encode_index_key(Index, Key),
case get_by_index_raw(Table, NewIndex, NewKey) of
{ok, Vals} ->
{ok, lists:flatmap(
fun(Val) ->
case catch binary_to_term(Val) of
{'EXIT', _} ->
Error = {error, make_invalid_object(Val)},
log_error(Error, get_by_index,
[{table, Table},
{index, Index},
{key, Key}]),
[];
Term ->
[Term]
end
end, Vals)};
{error, notfound} ->
{ok, []};
Error ->
log_error(Error, get_by_index,
[{table, Table},
{index, Index},
{key, Key}]),
Error
end.
-spec get_by_index_range(atom(), binary(), any(), any()) ->
{ok, [any()]} | {error, any()}.
%% @doc Reads records by `Index' in the range `FromKey'..`ToKey' from `Table'
get_by_index_range(Table, Index, FromKey, ToKey) ->
{NewIndex, NewFromKey} = encode_index_key(Index, FromKey),
{NewIndex, NewToKey} = encode_index_key(Index, ToKey),
case get_by_index_range_raw(Table, NewIndex, NewFromKey, NewToKey) of
{ok, Vals} ->
{ok, lists:flatmap(
fun(Val) ->
case catch binary_to_term(Val) of
{'EXIT', _} ->
Error = {error, make_invalid_object(Val)},
log_error(Error, get_by_index_range,
[{table, Table},
{index, Index},
{start_key, FromKey},
{end_key, ToKey}]),
[];
Term ->
[Term]
end
end, Vals)};
{error, notfound} ->
{ok, []};
Error ->
log_error(Error, get_by_index_range,
[{table, Table}, {index, Index},
{start_key, FromKey}, {end_key, ToKey}]),
Error
end.
get_raw(Table, Key) ->
case get_object_raw(Table, Key) of
{ok, Obj} ->
{ok, riakc_obj:get_value(Obj)};
Error ->
Error
end.
get_objects_by_index(Host, Table, Index, Key) ->
Bucket = make_bucket(Host, Table),
-spec get_keys(atom()) -> {ok, [any()]} | {error, any()}.
%% @doc Returns a list of index values
get_keys(Table) ->
Bucket = make_bucket(Table),
case riakc_pb_socket:mapred(
ejabberd_riak_sup:get_random_pid(),
{index, Bucket, Index, Key},
[{map, {modfun, riak_kv_mapreduce, map_identity}, none, true}]) of
{ok, [{_, Objs}]} ->
{ok, Objs};
Bucket,
[{map, {modfun, ?MODULE, map_key}, none, true}]) of
{ok, [{_, Keys}]} ->
{ok, Keys};
Error ->
log_error(Error, get_keys, [{table, Table}]),
Error
end.
get_by_index(Host, Table, Index, Key) ->
Bucket = make_bucket(Host, Table),
-spec get_keys_by_index(atom(), binary(),
any()) -> {ok, [any()]} | {error, any()}.
%% @doc Returns a list of primary keys of objects indexed by `Key'.
get_keys_by_index(Table, Index, Key) ->
{NewIndex, NewKey} = encode_index_key(Index, Key),
Bucket = make_bucket(Table),
case riakc_pb_socket:mapred(
ejabberd_riak_sup:get_random_pid(),
{index, Bucket, NewIndex, NewKey},
[{map, {modfun, ?MODULE, map_key}, none, true}]) of
{ok, [{_, Keys}]} ->
{ok, Keys};
Error ->
log_error(Error, get_keys_by_index, [{table, Table},
{index, Index},
{key, Key}]),
Error
end.
%% @hidden
get_tables() ->
riakc_pb_socket:list_buckets(ejabberd_riak_sup:get_random_pid()).
get_by_index_raw(Table, Index, Key) ->
Bucket = make_bucket(Table),
case riakc_pb_socket:mapred(
ejabberd_riak_sup:get_random_pid(),
{index, Bucket, Index, Key},
@ -103,20 +275,55 @@ get_by_index(Host, Table, Index, Key) ->
Error
end.
get_keys_by_index(Host, Table, Index, Key) ->
Bucket = make_bucket(Host, Table),
get_by_index_range_raw(Table, Index, FromKey, ToKey) ->
Bucket = make_bucket(Table),
case riakc_pb_socket:mapred(
ejabberd_riak_sup:get_random_pid(),
{index, Bucket, Index, Key},
[]) of
{ok, [{_, Ls}]} ->
{ok, [K || {_, K} <- Ls]};
{index, Bucket, Index, FromKey, ToKey},
[{map, {modfun, riak_kv_mapreduce, map_object_value},
none, true}]) of
{ok, [{_, Objs}]} ->
{ok, Objs};
Error ->
Error
end.
count_by_index(Host, Table, Index, Key) ->
Bucket = make_bucket(Host, Table),
-spec count(atom()) -> {ok, non_neg_integer()} | {error, any()}.
%% @doc Returns the number of objects in the `Table'
count(Table) ->
Bucket = make_bucket(Table),
case riakc_pb_socket:mapred(
ejabberd_riak_sup:get_random_pid(),
Bucket,
[{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
none, true}]) of
{ok, [{_, [Cnt]}]} ->
{ok, Cnt};
Error ->
log_error(Error, count, [{table, Table}]),
Error
end.
-spec count_by_index(atom(), binary(), any()) ->
{ok, non_neg_integer()} | {error, any()}.
%% @doc Returns the number of objects in the `Table' by index
count_by_index(Tab, Index, Key) ->
{NewIndex, NewKey} = encode_index_key(Index, Key),
case count_by_index_raw(Tab, NewIndex, NewKey) of
{ok, Cnt} ->
{ok, Cnt};
{error, notfound} ->
{ok, 0};
Error ->
log_error(Error, count_by_index,
[{table, Tab},
{index, Index},
{key, Key}]),
Error
end.
count_by_index_raw(Table, Index, Key) ->
Bucket = make_bucket(Table),
case riakc_pb_socket:mapred(
ejabberd_riak_sup:get_random_pid(),
{index, Bucket, Index, Key},
@ -128,7 +335,154 @@ count_by_index(Host, Table, Index, Key) ->
Error
end.
delete(Host, Table, Key) ->
Bucket = make_bucket(Host, Table),
-spec delete(tuple() | atom()) -> ok | {error, any()}.
%% @doc Same as delete(T, []) when T is record.
%% Or deletes all elements from table if T is atom.
delete(Rec) when is_tuple(Rec) ->
delete(Rec, []);
delete(Table) when is_atom(Table) ->
try
{ok, Keys} = ?MODULE:get_keys(Table),
lists:foreach(
fun(K) ->
ok = delete(Table, K)
end, Keys)
catch _:{badmatch, Err} ->
Err
end.
-spec delete(tuple() | atom(), index_info() | any()) -> ok | {error, any()}.
%% @doc Delete an object
delete(Rec, Opts) when is_tuple(Rec) ->
Table = element(1, Rec),
Key = proplists:get_value(i, Opts, element(2, Rec)),
delete(Table, Key);
delete(Table, Key) when is_atom(Table) ->
case delete_raw(Table, encode_key(Key)) of
ok ->
ok;
Err ->
log_error(Err, delete, [{table, Table}, {key, Key}]),
Err
end.
delete_raw(Table, Key) ->
Bucket = make_bucket(Table),
riakc_pb_socket:delete(ejabberd_riak_sup:get_random_pid(), Bucket, Key).
-spec delete_by_index(atom(), binary(), any()) -> ok | {error, any()}.
%% @doc Deletes objects by index
delete_by_index(Table, Index, Key) ->
try
{ok, Keys} = get_keys_by_index(Table, Index, Key),
lists:foreach(
fun(K) ->
ok = delete(Table, K)
end, Keys)
catch _:{badmatch, Err} ->
Err
end.
%%%===================================================================
%%% map/reduce functions
%%%===================================================================
%% @private
map_key(Obj, _, _) ->
[case riak_object:key(Obj) of
<<"b_", B/binary>> ->
B;
<<"i_", B/binary>> ->
list_to_integer(binary_to_list(B));
B ->
erlang:binary_to_term(B)
end].
%%%===================================================================
%%% gen_server API
%%%===================================================================
%% @private
init([Server, Port]) ->
case riakc_pb_socket:start(
Server, Port,
[auto_reconnect]) of
{ok, Pid} ->
erlang:monitor(process, Pid),
ejabberd_riak_sup:add_pid(Pid),
{ok, #state{pid = Pid}};
Err ->
{stop, Err}
end.
%% @private
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
%% @private
handle_cast(_Msg, State) ->
{noreply, State}.
%% @private
handle_info({'DOWN', _MonitorRef, _Type, _Object, _Info}, State) ->
{stop, normal, State};
handle_info(_Info, State) ->
?ERROR_MSG("unexpected info: ~p", [_Info]),
{noreply, State}.
%% @private
terminate(_Reason, State) ->
ejabberd_riak_sup:remove_pid(State#state.pid),
ok.
%% @private
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
encode_index_key(Idx, Key) when is_integer(Key) ->
{<<Idx/binary, "_int">>, Key};
encode_index_key(Idx, Key) ->
{<<Idx/binary, "_bin">>, encode_key(Key)}.
encode_key(Bin) when is_binary(Bin) ->
<<"b_", Bin/binary>>;
encode_key(Int) when is_integer(Int) ->
<<"i_", (list_to_binary(integer_to_list(Int)))/binary>>;
encode_key(Term) ->
erlang:term_to_binary(Term).
log_error({error, notfound}, _, _) ->
ok;
log_error({error, Why} = Err, Function, Opts) ->
Txt = lists:map(
fun({table, Table}) ->
io_lib:fwrite("** Table: ~p~n", [Table]);
({key, Key}) ->
io_lib:fwrite("** Key: ~p~n", [Key]);
({index, Index}) ->
io_lib:fwrite("** Index = ~p~n", [Index]);
({start_key, Key}) ->
io_lib:fwrite("** Start Key: ~p~n", [Key]);
({end_key, Key}) ->
io_lib:fwrite("** End Key: ~p~n", [Key]);
({record, Rec}) ->
io_lib:fwrite("** Record = ~p~n", [Rec]);
({index_info, IdxInfo}) ->
io_lib:fwrite("** Index info = ~p~n", [IdxInfo]);
(_) ->
""
end, Opts),
ErrTxt = if is_binary(Why) ->
io_lib:fwrite("** Error: ~s", [Why]);
true ->
io_lib:fwrite("** Error: ~p", [Err])
end,
?ERROR_MSG("database error:~n** Function: ~p~n~s~s",
[Function, Txt, ErrTxt]);
log_error(_, _, _) ->
ok.
make_invalid_object(Val) ->
list_to_binary(io_lib:fwrite("Invalid object: ~p", [Val])).

View File

@ -105,8 +105,9 @@ init([]) ->
{Server, Port} =
ejabberd_config:get_local_option(
riak_server,
fun({S, P}) when is_list(S), is_integer(P), P >= 1 -> {S, P} end,
{"127.0.0.1", 8081}),
fun({S, P}) when is_integer(P), P > 0, P < 65536 ->
{binary_to_list(iolist_to_binary(S)), P}
end, {"127.0.0.1", 8081}),
{ok, {{one_for_one, PoolSize*10, 1},
lists:map(
fun(I) ->

View File

@ -792,6 +792,17 @@ announce_motd(Host, Packet) ->
end, Sessions)
end,
mnesia:transaction(F);
riak ->
try
lists:foreach(
fun({U, S, _R}) ->
ok = ejabberd_riak:put(#motd_users{us = {U, S}},
[{'2i', [{<<"server">>, S}]}])
end, Sessions),
{atomic, ok}
catch _:{badmatch, Err} ->
{atomic, Err}
end;
odbc ->
F = fun() ->
lists:foreach(
@ -837,6 +848,9 @@ announce_motd_update(LServer, Packet) ->
mnesia:write(#motd{server = LServer, packet = Packet})
end,
mnesia:transaction(F);
riak ->
{atomic, ejabberd_riak:put(#motd{server = LServer,
packet = Packet})};
odbc ->
XML = ejabberd_odbc:escape(xml:element_to_binary(Packet)),
F = fun() ->
@ -887,6 +901,16 @@ announce_motd_delete(LServer) ->
end, Users)
end,
mnesia:transaction(F);
riak ->
try
ok = ejabberd_riak:delete(motd, LServer),
ok = ejabberd_riak:delete_by_index(motd_users,
<<"server">>,
LServer),
{atomic, ok}
catch _:{badmatch, Err} ->
{atomic, Err}
end;
odbc ->
F = fun() ->
ejabberd_odbc:sql_query_t([<<"delete from motd;">>])
@ -915,6 +939,23 @@ send_motd(#jid{luser = LUser, lserver = LServer} = JID, mnesia) ->
_ ->
ok
end;
send_motd(#jid{luser = LUser, lserver = LServer} = JID, riak) ->
case catch ejabberd_riak:get(motd, LServer) of
{ok, #motd{packet = Packet}} ->
US = {LUser, LServer},
case ejabberd_riak:get(motd_users, US) of
{ok, #motd_users{}} ->
ok;
_ ->
Local = jlib:make_jid(<<>>, LServer, <<>>),
ejabberd_router:route(Local, JID, Packet),
{atomic, ejabberd_riak:put(
#motd_users{us = US},
[{'2i', [{<<"server">>, LServer}]}])}
end;
_ ->
ok
end;
send_motd(#jid{luser = LUser, lserver = LServer} = JID, odbc) when LUser /= <<>> ->
case catch ejabberd_odbc:sql_query(
LServer, [<<"select xml from motd where username='';">>]) of
@ -965,6 +1006,13 @@ get_stored_motd_packet(LServer, mnesia) ->
_ ->
error
end;
get_stored_motd_packet(LServer, riak) ->
case ejabberd_riak:get(motd, LServer) of
{ok, #motd{packet = Packet}} ->
{ok, Packet};
_ ->
error
end;
get_stored_motd_packet(LServer, odbc) ->
case catch ejabberd_odbc:sql_query(
LServer, [<<"select xml from motd where username='';">>]) of

View File

@ -181,6 +181,37 @@ process_blocklist_block(LUser, LServer, Filter,
{ok, NewDefault, NewList}
end,
mnesia:transaction(F);
process_blocklist_block(LUser, LServer, Filter,
riak) ->
{atomic,
begin
case ejabberd_riak:get(privacy, {LUser, LServer}) of
{ok, #privacy{default = Default, lists = Lists} = P} ->
case lists:keysearch(Default, 1, Lists) of
{value, {_, List}} ->
NewDefault = Default,
NewLists1 = lists:keydelete(Default, 1, Lists);
false ->
NewDefault = <<"Blocked contacts">>,
NewLists1 = Lists,
List = []
end;
{error, _} ->
P = #privacy{us = {LUser, LServer}},
NewDefault = <<"Blocked contacts">>,
NewLists1 = [],
List = []
end,
NewList = Filter(List),
NewLists = [{NewDefault, NewList} | NewLists1],
case ejabberd_riak:put(P#privacy{default = NewDefault,
lists = NewLists}) of
ok ->
{ok, NewDefault, NewList};
Err ->
Err
end
end};
process_blocklist_block(LUser, LServer, Filter, odbc) ->
F = fun () ->
Default = case
@ -256,6 +287,30 @@ process_blocklist_unblock_all(LUser, LServer, Filter,
end
end,
mnesia:transaction(F);
process_blocklist_unblock_all(LUser, LServer, Filter,
riak) ->
{atomic,
case ejabberd_riak:get(privacy, {LUser, LServer}) of
{ok, #privacy{default = Default, lists = Lists} = P} ->
case lists:keysearch(Default, 1, Lists) of
{value, {_, List}} ->
NewList = Filter(List),
NewLists1 = lists:keydelete(Default, 1, Lists),
NewLists = [{Default, NewList} | NewLists1],
case ejabberd_riak:put(P#privacy{lists = NewLists}) of
ok ->
{ok, Default, NewList};
Err ->
Err
end;
false ->
%% No default list, nothing to unblock
ok
end;
{error, _} ->
%% No lists, nothing to unblock
ok
end};
process_blocklist_unblock_all(LUser, LServer, Filter,
odbc) ->
F = fun () ->
@ -331,6 +386,30 @@ process_blocklist_unblock(LUser, LServer, Filter,
end
end,
mnesia:transaction(F);
process_blocklist_unblock(LUser, LServer, Filter,
riak) ->
{atomic,
case ejabberd_riak:get(privacy, {LUser, LServer}) of
{error, _} ->
%% No lists, nothing to unblock
ok;
{ok, #privacy{default = Default, lists = Lists} = P} ->
case lists:keysearch(Default, 1, Lists) of
{value, {_, List}} ->
NewList = Filter(List),
NewLists1 = lists:keydelete(Default, 1, Lists),
NewLists = [{Default, NewList} | NewLists1],
case ejabberd_riak:put(P#privacy{lists = NewLists}) of
ok ->
{ok, Default, NewList};
Err ->
Err
end;
false ->
%% No default list, nothing to unblock
ok
end
end};
process_blocklist_unblock(LUser, LServer, Filter,
odbc) ->
F = fun () ->
@ -409,6 +488,18 @@ process_blocklist_get(LUser, LServer, mnesia) ->
_ -> []
end
end;
process_blocklist_get(LUser, LServer, riak) ->
case ejabberd_riak:get(privacy, {LUser, LServer}) of
{ok, #privacy{default = Default, lists = Lists}} ->
case lists:keysearch(Default, 1, Lists) of
{value, {_, List}} -> List;
_ -> []
end;
{error, notfound} ->
[];
{error, _} ->
error
end;
process_blocklist_get(LUser, LServer, odbc) ->
case catch
mod_privacy:sql_get_default_privacy_list(LUser, LServer)

View File

@ -430,12 +430,49 @@ caps_read_fun(Node) ->
[#caps_features{features = Features}] -> {ok, Features};
_ -> error
end
end;
caps_read_fun(_LServer, Node, riak) ->
fun() ->
case ejabberd_riak:get(caps_features, Node) of
{ok, #caps_features{features = Features}} -> {ok, Features};
_ -> error
end
end;
caps_read_fun(LServer, {Node, SubNode}, odbc) ->
fun() ->
SNode = ejabberd_odbc:escape(Node),
SSubNode = ejabberd_odbc:escape(SubNode),
case ejabberd_odbc:sql_query(
LServer, [<<"select feature from caps_features where ">>,
<<"node='">>, SNode, <<"' and subnode='">>,
SSubNode, <<"';">>]) of
{selected, [<<"feature">>], [[H]|_] = Fs} ->
case catch jlib:binary_to_integer(H) of
Int when is_integer(Int), Int>=0 ->
{ok, Int};
_ ->
{ok, lists:flatten(Fs)}
end;
_ ->
error
end
end.
caps_write_fun(Node, Features) ->
fun () ->
mnesia:dirty_write(#caps_features{node_pair = Node,
features = Features})
end;
caps_write_fun(_LServer, Node, Features, riak) ->
fun () ->
ejabberd_riak:put(#caps_features{node_pair = Node,
features = Features})
end;
caps_write_fun(LServer, NodePair, Features, odbc) ->
fun () ->
ejabberd_odbc:sql_transaction(
LServer,
sql_write_features_t(NodePair, Features))
end.
make_my_disco_hash(Host) ->

View File

@ -591,6 +591,17 @@ get_data(_LServer, Host, From, mnesia) ->
[] -> empty;
[#irc_custom{data = Data}] -> Data
end;
get_data(LServer, Host, From, riak) ->
#jid{luser = LUser, lserver = LServer} = From,
US = {LUser, LServer},
case ejabberd_riak:get(irc_custom, {US, Host}) of
{ok, #irc_custom{data = Data}} ->
Data;
{error, notfound} ->
empty;
_Err ->
error
end;
get_data(LServer, Host, From, odbc) ->
SJID =
ejabberd_odbc:escape(jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(From)))),
@ -723,6 +734,11 @@ set_data(_LServer, Host, From, Data, mnesia) ->
data = Data})
end,
mnesia:transaction(F);
set_data(LServer, Host, From, Data, riak) ->
{LUser, LServer, _} = jlib:jid_tolower(From),
US = {LUser, LServer},
{atomic, ejabberd_riak:put(#irc_custom{us_host = {US, Host},
data = Data})};
set_data(LServer, Host, From, Data, odbc) ->
SJID =
ejabberd_odbc:escape(jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(From)))),

View File

@ -168,6 +168,16 @@ get_last(LUser, LServer, mnesia) ->
status = Status}] ->
{ok, TimeStamp, Status}
end;
get_last(LUser, LServer, riak) ->
case ejabberd_riak:get(last_activity, {LUser, LServer}) of
{ok, #last_activity{timestamp = TimeStamp,
status = Status}} ->
{ok, TimeStamp, Status};
{error, notfound} ->
not_found;
Err ->
Err
end;
get_last(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
case catch odbc_queries:get_last(LServer, Username) of
@ -235,6 +245,12 @@ store_last_info(LUser, LServer, TimeStamp, Status,
status = Status})
end,
mnesia:transaction(F);
store_last_info(LUser, LServer, TimeStamp, Status,
riak) ->
US = {LUser, LServer},
{atomic, ejabberd_riak:put(#last_activity{us = US,
timestamp = TimeStamp,
status = Status})};
store_last_info(LUser, LServer, TimeStamp, Status,
odbc) ->
Username = ejabberd_odbc:escape(LUser),
@ -264,7 +280,9 @@ remove_user(LUser, LServer, mnesia) ->
mnesia:transaction(F);
remove_user(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
odbc_queries:del_last(LServer, Username).
odbc_queries:del_last(LServer, Username);
remove_user(LUser, LServer, riak) ->
{atomic, ejabberd_riak:delete(last_activity, {LUser, LServer})}.
update_table() ->
Fields = record_info(fields, last_activity),

View File

@ -147,6 +147,9 @@ store_room(_LServer, Host, Name, Opts, mnesia) ->
opts = Opts})
end,
mnesia:transaction(F);
store_room(_LServer, Host, Name, Opts, riak) ->
{atomic, ejabberd_riak:put(#muc_room{name_host = {Name, Host},
opts = Opts})};
store_room(LServer, Host, Name, Opts, odbc) ->
SName = ejabberd_odbc:escape(Name),
SHost = ejabberd_odbc:escape(Host),
@ -170,6 +173,11 @@ restore_room(_LServer, Host, Name, mnesia) ->
[#muc_room{opts = Opts}] -> Opts;
_ -> error
end;
restore_room(_LServer, Host, Name, riak) ->
case ejabberd_riak:get(muc_room, {Name, Host}) of
{ok, #muc_room{opts = Opts}} -> Opts;
_ -> error
end;
restore_room(LServer, Host, Name, odbc) ->
SName = ejabberd_odbc:escape(Name),
SHost = ejabberd_odbc:escape(Host),
@ -192,6 +200,8 @@ forget_room(_LServer, Host, Name, mnesia) ->
F = fun () -> mnesia:delete({muc_room, {Name, Host}})
end,
mnesia:transaction(F);
forget_room(_LServer, Host, Name, riak) ->
{atomic, ejabberd_riak:delete(muc_room, {Name, Host})};
forget_room(LServer, Host, Name, odbc) ->
SName = ejabberd_odbc:escape(Name),
SHost = ejabberd_odbc:escape(Host),
@ -231,6 +241,18 @@ can_use_nick(_LServer, Host, JID, Nick, mnesia) ->
[] -> true;
[#muc_registered{us_host = {U, _Host}}] -> U == LUS
end;
can_use_nick(LServer, Host, JID, Nick, riak) ->
{LUser, LServer, _} = jlib:jid_tolower(JID),
LUS = {LUser, LServer},
case ejabberd_riak:get_by_index(muc_registered,
<<"nick_host">>, {Nick, Host}) of
{ok, []} ->
true;
{ok, [#muc_registered{us_host = {U, _Host}}]} ->
U == LUS;
{error, _} ->
true
end;
can_use_nick(LServer, Host, JID, Nick, odbc) ->
SJID =
jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(JID))),
@ -617,6 +639,16 @@ get_rooms(_LServer, Host, mnesia) ->
{'EXIT', Reason} -> ?ERROR_MSG("~p", [Reason]), [];
Rs -> Rs
end;
get_rooms(_LServer, Host, riak) ->
case ejabberd_riak:get(muc_room) of
{ok, Rs} ->
lists:filter(
fun(#muc_room{name_host = {_, H}}) ->
Host == H
end, Rs);
_Err ->
[]
end;
get_rooms(LServer, Host, odbc) ->
SHost = ejabberd_odbc:escape(Host),
case catch ejabberd_odbc:sql_query(LServer,
@ -839,6 +871,13 @@ get_nick(_LServer, Host, From, mnesia) ->
[] -> error;
[#muc_registered{nick = Nick}] -> Nick
end;
get_nick(LServer, Host, From, riak) ->
{LUser, LServer, _} = jlib:jid_tolower(From),
US = {LUser, LServer},
case ejabberd_riak:get(muc_registered, {US, Host}) of
{ok, #muc_registered{nick = Nick}} -> Nick;
{error, _} -> error
end;
get_nick(LServer, Host, From, odbc) ->
SJID =
ejabberd_odbc:escape(jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(From)))),
@ -922,6 +961,33 @@ set_nick(_LServer, Host, From, Nick, mnesia) ->
end
end,
mnesia:transaction(F);
set_nick(LServer, Host, From, Nick, riak) ->
{LUser, LServer, _} = jlib:jid_tolower(From),
LUS = {LUser, LServer},
{atomic,
case Nick of
<<"">> ->
ejabberd_riak:delete(muc_registered, {LUS, Host});
_ ->
Allow = case ejabberd_riak:get_by_index(
muc_registered,
<<"nick_host">>, {Nick, Host}) of
{ok, []} ->
true;
{ok, [#muc_registered{us_host = {U, _Host}}]} ->
U == LUS;
{error, _} ->
false
end,
if Allow ->
ejabberd_riak:put(#muc_registered{us_host = {LUS, Host},
nick = Nick},
[{'2i', [{<<"nick_host">>,
{Nick, Host}}]}]);
true ->
false
end
end};
set_nick(LServer, Host, From, Nick, odbc) ->
JID =
jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(From))),

View File

@ -187,43 +187,9 @@ store_offline_msg(Host, {User, _}, Msgs, Len, MaxOfflineMsgs,
discard_warn_sender(Msgs);
true ->
lists:foreach(
fun(M) ->
Username = User,
From = M#offline_msg.from,
To = M#offline_msg.to,
#xmlel{name = Name, attrs = Attrs,
children = Els} =
M#offline_msg.packet,
Attrs2 = jlib:replace_from_to_attrs(
jlib:jid_to_string(From),
jlib:jid_to_string(To),
Attrs),
Packet = #xmlel{name = Name,
attrs = Attrs2,
children =
Els ++
[jlib:timestamp_to_xml(
calendar:now_to_universal_time(
M#offline_msg.timestamp),
utc,
jlib:make_jid(<<"">>, Host, <<"">>),
<<"Offline Storage">>),
jlib:timestamp_to_xml(
calendar:now_to_universal_time(
M#offline_msg.timestamp))]},
XML = xml:element_to_binary(Packet),
{MegaSecs, Secs, MicroSecs} =
M#offline_msg.timestamp,
TS =
iolist_to_binary(
io_lib:format("~6..0w~6..0w.~6..0w",
[MegaSecs, Secs, MicroSecs])),
ejabberd_riak:put(
Host, <<"offline">>,
undefined, XML,
[{<<"user_bin">>, Username},
{<<"timestamp_bin">>, TS}
])
fun(#offline_msg{us = US,
timestamp = TS} = M) ->
ejabberd_riak:put(M, [{i, TS}, {'2i', [{<<"us">>, US}]}])
end, Msgs)
end.
@ -244,7 +210,7 @@ receive_all(US, Msgs, DBType) ->
case DBType of
mnesia -> Msgs;
odbc -> lists:reverse(Msgs);
riak -> lists:reverse(Msgs)
riak -> Msgs
end
end.
@ -474,41 +440,30 @@ pop_offline_messages(Ls, LUser, LServer, odbc) ->
_ -> Ls
end;
pop_offline_messages(Ls, LUser, LServer, riak) ->
Username = LUser,
case ejabberd_riak:get_objects_by_index(
LServer, <<"offline">>, <<"user_bin">>, Username) of
case ejabberd_riak:get_by_index(offline_msg,
<<"us">>, {LUser, LServer}) of
{ok, Rs} ->
SortedRs =
lists:sort(fun(X, Y) ->
MX = riak_object:get_metadata(X),
{ok, IX} = dict:find(<<"index">>, MX),
{value, TSX} = lists:keysearch(
<<"timestamp_bin">>, 1,
IX),
MY = riak_object:get_metadata(Y),
{ok, IY} = dict:find(<<"index">>, MY),
{value, TSY} = lists:keysearch(
<<"timestamp_bin">>, 1,
IY),
TSX =< TSY
end, Rs),
Ls ++ lists:flatmap(
fun(R) ->
Key = riak_object:key(R),
ejabberd_riak:delete(LServer, <<"offline">>, Key),
XML = riak_object:get_value(R),
case xml_stream:parse_element(XML) of
{error, _Reason} ->
[];
El ->
case offline_msg_to_route(LServer, El) of
error ->
[];
RouteMsg ->
[RouteMsg]
end
end
end, SortedRs);
try
lists:foreach(
fun(#offline_msg{timestamp = T}) ->
ok = ejabberd_riak:delete(offline_msg, T)
end, Rs),
TS = now(),
Ls ++ lists:map(
fun (R) ->
offline_msg_to_route(LServer, R)
end,
lists:filter(
fun(R) ->
case R#offline_msg.expire of
never -> true;
TimeStamp -> TS < TimeStamp
end
end,
lists:keysort(#offline_msg.timestamp, Rs)))
catch _:{badmatch, _} ->
Ls
end;
_ ->
Ls
end.
@ -579,17 +534,8 @@ remove_user(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
odbc_queries:del_spool_msg(LServer, Username);
remove_user(LUser, LServer, riak) ->
Username = LUser,
case ejabberd_riak:get_keys_by_index(
LServer, <<"offline">>, <<"user_bin">>, Username) of
{ok, Keys} ->
lists:foreach(
fun(Key) ->
ejabberd_riak:delete(LServer, <<"offline">>, Key)
end, Keys);
_ ->
ok
end.
{atomic, ejabberd_riak:delete_by_index(offline_msg,
<<"us">>, {LUser, LServer})}.
jid_to_binary(#jid{user = U, server = S, resource = R,
luser = LU, lserver = LS, lresource = LR}) ->
@ -650,6 +596,8 @@ get_offline_els(LUser, LServer) ->
get_offline_els(LUser, LServer, mnesia) ->
Msgs = read_all_msgs(LUser, LServer, mnesia),
get_offline_els(LUser, LServer, DBType) when DBType == mnesia; DBType == riak ->
Msgs = read_all_msgs(LUser, LServer, DBType),
lists:map(
fun(Msg) ->
{route, From, To, Packet} = offline_msg_to_route(LServer, Msg),
@ -706,6 +654,14 @@ read_all_msgs(LUser, LServer, mnesia) ->
US = {LUser, LServer},
lists:keysort(#offline_msg.timestamp,
mnesia:dirty_read({offline_msg, US}));
read_all_msgs(LUser, LServer, riak) ->
case ejabberd_riak:get_by_index(
offline_msg, <<"us">>, {LUser, LServer}) of
{ok, Rs} ->
lists:keysort(#offline_msg.timestamp, Rs);
_Err ->
[]
end;
read_all_msgs(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
case catch ejabberd_odbc:sql_query(LServer,
@ -723,7 +679,7 @@ read_all_msgs(LUser, LServer, odbc) ->
_ -> []
end.
format_user_queue(Msgs, mnesia) ->
format_user_queue(Msgs, DBType) when DBType == mnesia; DBType == riak ->
lists:map(fun (#offline_msg{timestamp = TimeStamp,
from = From, to = To,
packet =
@ -831,6 +787,26 @@ user_queue_parse_query(LUser, LServer, Query, mnesia) ->
ok;
false -> nothing
end;
user_queue_parse_query(LUser, LServer, Query, riak) ->
case lists:keysearch(<<"delete">>, 1, Query) of
{value, _} ->
Msgs = read_all_msgs(LUser, LServer, riak),
lists:foreach(
fun (Msg) ->
ID = jlib:encode_base64((term_to_binary(Msg))),
case lists:member({<<"selected">>, ID}, Query) of
true ->
ejabberd_riak:delete(offline_msg,
Msg#offline_msg.timestamp);
false ->
ok
end
end,
Msgs),
ok;
false ->
nothing
end;
user_queue_parse_query(LUser, LServer, Query, odbc) ->
Username = ejabberd_odbc:escape(LUser),
case lists:keysearch(<<"delete">>, 1, Query) of
@ -889,6 +865,14 @@ get_queue_length(LUser, LServer) ->
get_queue_length(LUser, LServer, mnesia) ->
length(mnesia:dirty_read({offline_msg,
{LUser, LServer}}));
get_queue_length(LUser, LServer, riak) ->
case ejabberd_riak:count_by_index(offline_msg,
<<"us">>, {LUser, LServer}) of
{ok, N} ->
N;
_ ->
0
end;
get_queue_length(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
case catch ejabberd_odbc:sql_query(LServer,
@ -917,7 +901,8 @@ get_messages_subset(User, Host, MsgsAll, DBType) ->
get_messages_subset2(Max, Length, MsgsAll, _DBType)
when Length =< Max * 2 ->
MsgsAll;
get_messages_subset2(Max, Length, MsgsAll, mnesia) ->
get_messages_subset2(Max, Length, MsgsAll, DBType)
when DBType == mnesia; DBType == riak ->
FirstN = Max,
{MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll),
MsgsLastN = lists:nthtail(Length - FirstN - FirstN,
@ -965,6 +950,10 @@ delete_all_msgs(LUser, LServer, mnesia) ->
mnesia:dirty_read({offline_msg, US}))
end,
mnesia:transaction(F);
delete_all_msgs(LUser, LServer, riak) ->
Res = ejabberd_riak:delete_by_index(offline_msg,
<<"us">>, {LUser, LServer}),
{atomic, Res};
delete_all_msgs(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
odbc_queries:del_spool_msg(LServer, Username),
@ -987,16 +976,44 @@ webadmin_user_parse_query(Acc, _Action, _User, _Server,
Acc.
%% Returns as integer the number of offline messages for a given user
count_offline_messages(LUser, LServer) ->
count_offline_messages(User, Server) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
DBType = gen_mod:db_type(LServer, ?MODULE),
count_offline_messages(LUser, LServer, DBType).
count_offline_messages(LUser, LServer, mnesia) ->
US = {LUser, LServer},
F = fun () ->
p1_mnesia:count_records(offline_msg,
#offline_msg{us = US, _ = '_'})
end,
case catch mnesia:async_dirty(F) of
I when is_integer(I) -> I;
_ -> 0
end;
count_offline_messages(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
case catch odbc_queries:count_records_where(
LServer, "spool",
<<"where username='", Username/binary, "'">>) of
{selected, [_], [[Res]]} ->
jlib:binary_to_integer(Res);
case catch odbc_queries:count_records_where(LServer,
<<"spool">>,
<<"where username='",
Username/binary, "'">>)
of
{selected, [_], [[Res]]} ->
jlib:binary_to_integer(Res);
_ -> 0
end;
count_offline_messages(LUser, LServer, riak) ->
case ejabberd_riak:count_by_index(
offline_msg, <<"us">>, {LUser, LServer}) of
{ok, Res} ->
Res;
_ ->
0
end.
end;
count_offline_messages(_Acc, User, Server) ->
N = count_offline_messages(User, Server),
{stop, N}.
export(_Server) ->
[{offline_msg,

View File

@ -160,6 +160,21 @@ process_lists_get(LUser, LServer, _Active, mnesia) ->
Lists),
{Default, LItems}
end;
process_lists_get(LUser, LServer, _Active, riak) ->
case ejabberd_riak:get(privacy, {LUser, LServer}) of
{ok, #privacy{default = Default, lists = Lists}} ->
LItems = lists:map(fun ({N, _}) ->
#xmlel{name = <<"list">>,
attrs = [{<<"name">>, N}],
children = []}
end,
Lists),
{Default, LItems};
{error, notfound} ->
{none, []};
{error, _} ->
error
end;
process_lists_get(LUser, LServer, _Active, odbc) ->
Default = case catch sql_get_default_privacy_list(LUser,
LServer)
@ -209,6 +224,18 @@ process_list_get(LUser, LServer, Name, mnesia) ->
_ -> not_found
end
end;
process_list_get(LUser, LServer, Name, riak) ->
case ejabberd_riak:get(privacy, {LUser, LServer}) of
{ok, #privacy{lists = Lists}} ->
case lists:keysearch(Name, 1, Lists) of
{value, {_, List}} -> List;
_ -> not_found
end;
{error, notfound} ->
not_found;
{error, _} ->
error
end;
process_list_get(LUser, LServer, Name, odbc) ->
case catch sql_get_privacy_list_id(LUser, LServer, Name)
of
@ -354,6 +381,20 @@ process_default_set(LUser, LServer, {value, Name},
end
end,
mnesia:transaction(F);
process_default_set(LUser, LServer, {value, Name}, riak) ->
{atomic,
case ejabberd_riak:get(privacy, {LUser, LServer}) of
{ok, #privacy{lists = Lists} = P} ->
case lists:keymember(Name, 1, Lists) of
true ->
ejabberd_riak:put(P#privacy{default = Name,
lists = Lists});
false ->
not_found
end;
{error, _} ->
not_found
end};
process_default_set(LUser, LServer, {value, Name},
odbc) ->
F = fun () ->
@ -375,6 +416,14 @@ process_default_set(LUser, LServer, false, mnesia) ->
end
end,
mnesia:transaction(F);
process_default_set(LUser, LServer, false, riak) ->
{atomic,
case ejabberd_riak:get(privacy, {LUser, LServer}) of
{ok, R} ->
ejabberd_riak:put(R#privacy{default = none});
{error, _} ->
ok
end};
process_default_set(LUser, LServer, false, odbc) ->
case catch sql_unset_default_privacy_list(LUser,
LServer)
@ -407,6 +456,16 @@ process_active_set(LUser, LServer, Name, mnesia) ->
false -> error
end
end;
process_active_set(LUser, LServer, Name, riak) ->
case ejabberd_riak:get(privacy, {LUser, LServer}) of
{ok, #privacy{lists = Lists}} ->
case lists:keysearch(Name, 1, Lists) of
{value, {_, List}} -> List;
false -> error
end;
{error, _} ->
error
end;
process_active_set(LUser, LServer, Name, odbc) ->
case catch sql_get_privacy_list_id(LUser, LServer, Name)
of
@ -438,6 +497,19 @@ remove_privacy_list(LUser, LServer, Name, mnesia) ->
end
end,
mnesia:transaction(F);
remove_privacy_list(LUser, LServer, Name, riak) ->
{atomic,
case ejabberd_riak:get(privacy, {LUser, LServer}) of
{ok, #privacy{default = Default, lists = Lists} = P} ->
if Name == Default ->
conflict;
true ->
NewLists = lists:keydelete(Name, 1, Lists),
ejabberd_riak:put(P#privacy{lists = NewLists})
end;
{error, _} ->
ok
end};
remove_privacy_list(LUser, LServer, Name, odbc) ->
F = fun () ->
case sql_get_default_privacy_list_t(LUser) of
@ -465,6 +537,18 @@ set_privacy_list(LUser, LServer, Name, List, mnesia) ->
end
end,
mnesia:transaction(F);
set_privacy_list(LUser, LServer, Name, List, riak) ->
{atomic,
case ejabberd_riak:get(privacy, {LUser, LServer}) of
{ok, #privacy{lists = Lists} = P} ->
NewLists1 = lists:keydelete(Name, 1, Lists),
NewLists = [{Name, List} | NewLists1],
ejabberd_riak:put(P#privacy{lists = NewLists});
{error, _} ->
NewLists = [{Name, List}],
ejabberd_riak:put(#privacy{us = {LUser, LServer},
lists = NewLists})
end};
set_privacy_list(LUser, LServer, Name, List, odbc) ->
RItems = lists:map(fun item_to_raw/1, List),
F = fun () ->
@ -649,6 +733,20 @@ get_user_list(_, LUser, LServer, mnesia) ->
end;
_ -> {none, []}
end;
get_user_list(_, LUser, LServer, riak) ->
case ejabberd_riak:get(privacy, {LUser, LServer}) of
{ok, #privacy{default = Default, lists = Lists}} ->
case Default of
none -> {none, []};
_ ->
case lists:keysearch(Default, 1, Lists) of
{value, {_, List}} -> {Default, List};
_ -> {none, []}
end
end;
{error, _} ->
{none, []}
end;
get_user_list(_, LUser, LServer, odbc) ->
case catch sql_get_default_privacy_list(LUser, LServer)
of
@ -680,6 +778,13 @@ get_user_lists(LUser, LServer, mnesia) ->
_ ->
error
end;
get_user_lists(LUser, LServer, riak) ->
case ejabberd_riak:get(privacy, {LUser, LServer}) of
{ok, #privacy{} = P} ->
{ok, P};
{error, _} ->
error
end;
get_user_lists(LUser, LServer, odbc) ->
Default = case catch sql_get_default_privacy_list(LUser, LServer) of
{selected, [<<"name">>], []} ->
@ -843,6 +948,8 @@ remove_user(LUser, LServer, mnesia) ->
F = fun () -> mnesia:delete({privacy, {LUser, LServer}})
end,
mnesia:transaction(F);
remove_user(LUser, LServer, riak) ->
{atomic, ejabberd_riak:delete(privacy, {LUser, LServer})};
remove_user(LUser, LServer, odbc) ->
sql_del_privacy_lists(LUser, LServer).

View File

@ -152,13 +152,9 @@ set_data(LUser, LServer, {XMLNS, El}, odbc) ->
odbc_queries:set_private_data(LServer, Username, LXMLNS,
SData);
set_data(LUser, LServer, {XMLNS, El}, riak) ->
Username = LUser,
Key = <<LUser/binary, $@, LServer/binary, $@, XMLNS/binary>>,
SData = xml:element_to_binary(El),
ejabberd_riak:put(
LServer, <<"private">>, Key, SData,
[{<<"user_bin">>, Username}]),
ok.
ejabberd_riak:put(#private_storage{usns = {LUser, LServer, XMLNS},
xml = El},
[{'2i', [{<<"us">>, {LUser, LServer}}]}]).
get_data(LUser, LServer, Data) ->
get_data(LUser, LServer,
@ -195,18 +191,13 @@ get_data(LUser, LServer, odbc, [{XMLNS, El} | Els],
end;
get_data(LUser, LServer, riak, [{XMLNS, El} | Els],
Res) ->
Key = <<LUser/binary, $@, LServer/binary, $@, XMLNS/binary>>,
case ejabberd_riak:get(LServer, <<"private">>, Key) of
{ok, SData} ->
case xml_stream:parse_element(SData) of
Data when element(1, Data) == xmlelement ->
get_data(LUser, LServer, riak, Els, [Data | Res])
end;
_ ->
get_data(LUser, LServer, riak, Els, [El | Res])
case ejabberd_riak:get(private_storage, {LUser, LServer, XMLNS}) of
{ok, #private_storage{xml = NewEl}} ->
get_data(LUser, LServer, riak, Els, [NewEl|Res]);
_ ->
get_data(LUser, LServer, riak, Els, [El|Res])
end.
get_data(LUser, LServer) ->
get_all_data(LUser, LServer,
gen_mod:db_type(LServer, ?MODULE)).
@ -234,19 +225,10 @@ get_all_data(LUser, LServer, odbc) ->
[]
end;
get_all_data(LUser, LServer, riak) ->
Username = LUser,
case ejabberd_riak:get_by_index(
LServer, <<"private">>, <<"user_bin">>, Username) of
private_storage, <<"us">>, {LUser, LServer}) of
{ok, Res} ->
lists:flatmap(
fun(SData) ->
case xml_stream:parse_element(SData) of
#xmlel{} = El ->
[El];
_ ->
[]
end
end, Res);
[El || #private_storage{xml = El} <- Res];
_ ->
[]
end.
@ -279,17 +261,8 @@ remove_user(LUser, LServer, odbc) ->
odbc_queries:del_user_private_storage(LServer,
Username);
remove_user(LUser, LServer, riak) ->
Username = LUser,
case ejabberd_riak:get_keys_by_index(
LServer, <<"private">>, <<"user_bin">>, Username) of
{ok, Keys} ->
lists:foreach(
fun(Key) ->
ejabberd_riak:delete(LServer, <<"private">>, Key)
end, Keys);
_ ->
ok
end.
{atomic, ejabberd_riak:delete_by_index(private_storage,
<<"us">>, {LUser, LServer})}.
update_table() ->
Fields = record_info(fields, private_storage),

View File

@ -206,11 +206,9 @@ read_roster_version(LUser, LServer, odbc) ->
{selected, [<<"version">>], []} -> error
end;
read_roster_version(LServer, LUser, riak) ->
Username = LUser,
case ejabberd_riak:get(LServer, <<"roster_version">>,
Username) of
case ejabberd_riak:get(roster_version, {LUser, LServer}) of
{ok, Version} -> Version;
{error, notfound} -> error
_Err -> error
end.
write_roster_version(LUser, LServer) ->
@ -249,8 +247,8 @@ write_roster_version(LUser, LServer, InTransaction, Ver,
end;
write_roster_version(LUser, LServer, _InTransaction, Ver,
riak) ->
Username = LUser,
riak_set_roster_version(LServer, Username, Ver).
US = {LUser, LServer},
ejabberd_riak:put(#roster_version{us = US, version = Ver}).
%% Load roster from DB only if neccesary.
%% It is neccesary if
@ -358,6 +356,11 @@ get_roster(LUser, LServer, mnesia) ->
Items when is_list(Items)-> Items;
_ -> []
end;
get_roster(LUser, LServer, riak) ->
case ejabberd_riak:get_by_index(roster, <<"us">>, {LUser, LServer}) of
{ok, Items} -> Items;
_Err -> []
end;
get_roster(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
case catch odbc_queries:get_roster(LServer, Username) of
@ -399,37 +402,6 @@ get_roster(LUser, LServer, odbc) ->
Items),
RItems;
_ -> []
end;
get_roster(LUser, LServer, riak) ->
Username = LUser,
case catch riak_get_roster(LServer, Username) of
{ok, Items} when is_list(Items) ->
JIDGroups = case riak_get_roster_jid_groups(LServer, Username) of
{ok, JGrps} when is_list(JGrps) ->
JGrps;
_ ->
[]
end,
GroupsDict = dict:from_list(JIDGroups),
RItems = lists:flatmap(
fun(I) ->
case riak_raw_to_record(LServer, I) of
%% Bad JID in database:
error ->
[];
R ->
SJID = jlib:jid_to_string(R#roster.jid),
Groups =
case dict:find(SJID, GroupsDict) of
{ok, Gs} -> Gs;
error -> []
end,
[R#roster{groups = Groups}]
end
end, Items),
RItems;
_ ->
[]
end.
item_to_xml(Item) ->
@ -499,29 +471,15 @@ get_roster_by_jid_t(LUser, LServer, LJID, odbc) ->
end
end;
get_roster_by_jid_t(LUser, LServer, LJID, riak) ->
Username = LUser,
SJID = jlib:jid_to_string(LJID),
Res = riak_get_roster_by_jid(LServer, Username, SJID),
case Res of
{error, _} ->
#roster{usj = {LUser, LServer, LJID},
us = {LUser, LServer},
jid = LJID};
case ejabberd_riak:get(roster, {LUser, LServer, LJID}) of
{ok, I} ->
R = riak_raw_to_record(LServer, I),
case R of
%% Bad JID in database:
error ->
#roster{usj = {LUser, LServer, LJID},
us = {LUser, LServer},
jid = LJID};
_ ->
R#roster{
usj = {LUser, LServer, LJID},
us = {LUser, LServer},
jid = LJID,
name = ""}
end
I#roster{jid = LJID, name = <<"">>, groups = [],
xs = []};
{error, notfound} ->
#roster{usj = {LUser, LServer, LJID},
us = {LUser, LServer}, jid = LJID};
Err ->
exit(Err)
end.
try_process_iq_set(From, To, #iq{sub_el = SubEl} = IQ) ->
@ -702,12 +660,9 @@ get_subscription_lists(_, LUser, LServer, odbc) ->
_ -> []
end;
get_subscription_lists(_, LUser, LServer, riak) ->
Username = LUser,
case catch riak_get_roster(LServer, Username) of
{ok, Items} when is_list(Items) ->
lists:map(fun(I) -> riak_raw_to_record(LServer, I) end, Items);
_ ->
[]
case ejabberd_riak:get_by_index(roster, <<"us">>, {LUser, LServer}) of
{ok, Items} -> Items;
_Err -> []
end.
fill_subscription_lists(LServer, [#roster{} = I | Is],
@ -747,11 +702,9 @@ roster_subscribe_t(LUser, LServer, LJID, Item, odbc) ->
SJID = ejabberd_odbc:escape(jlib:jid_to_string(LJID)),
odbc_queries:roster_subscribe(LServer, Username, SJID,
ItemVals);
roster_subscribe_t(LUser, LServer, LJID, Item, riak) ->
ItemVals = riak_record_to_string(Item),
Username = LUser,
SJID = jlib:jid_to_string(LJID),
riak_roster_subscribe(LServer, Username, SJID, ItemVals).
roster_subscribe_t(LUser, LServer, _LJID, Item, riak) ->
ejabberd_riak:put(Item,
[{'2i', [{<<"us">>, {LUser, LServer}}]}]).
transaction(LServer, F) ->
case gen_mod:db_type(LServer, ?MODULE) of
@ -810,23 +763,14 @@ get_roster_by_jid_with_groups_t(LUser, LServer, LJID,
us = {LUser, LServer}, jid = LJID}
end;
get_roster_by_jid_with_groups_t(LUser, LServer, LJID, riak) ->
Username = LUser,
SJID = jlib:jid_to_string(LJID),
case riak_get_roster_by_jid(LServer, Username, SJID) of
case ejabberd_riak:get(roster, {LUser, LServer, LJID}) of
{ok, I} ->
R = riak_raw_to_record(LServer, I),
Groups =
case riak_get_roster_groups(LServer, Username, SJID) of
{ok, JGrps} when is_list(JGrps) ->
JGrps;
_ ->
[]
end,
R#roster{groups = Groups};
{error, _} ->
I;
{error, notfound} ->
#roster{usj = {LUser, LServer, LJID},
us = {LUser, LServer},
jid = LJID}
us = {LUser, LServer}, jid = LJID};
Err ->
exit(Err)
end.
process_subscription(Direction, User, Server, JID1,
@ -1040,9 +984,7 @@ remove_user(LUser, LServer, odbc) ->
odbc_queries:del_user_roster_t(LServer, Username),
ok;
remove_user(LUser, LServer, riak) ->
Username = LUser,
riak_del_user_roster(LServer, Username),
ok.
{atomic, ejabberd_riak:delete_by_index(roster, <<"us">>, {LUser, LServer})}.
%% For each contact with Subscription:
%% Both or From, send a "unsubscribed" presence stanza;
@ -1114,13 +1056,9 @@ update_roster_t(LUser, LServer, LJID, Item, odbc) ->
ItemGroups = groups_to_string(Item),
odbc_queries:update_roster(LServer, Username, SJID, ItemVals,
ItemGroups);
update_roster_t(LUser, LServer, LJID, Item, riak) ->
Username = LUser,
SJID = jlib:jid_to_string(LJID),
ItemVals = riak_record_to_string(Item),
ItemGroups = riak_groups_to_binary(Item),
riak_update_roster(
LServer, Username, SJID, ItemVals, ItemGroups).
update_roster_t(LUser, LServer, _LJID, Item, riak) ->
ejabberd_riak:put(Item,
[{'2i', [{<<"us">>, {LUser, LServer}}]}]).
del_roster_t(LUser, LServer, LJID) ->
DBType = gen_mod:db_type(LServer, ?MODULE),
@ -1133,9 +1071,7 @@ del_roster_t(LUser, LServer, LJID, odbc) ->
SJID = ejabberd_odbc:escape(jlib:jid_to_string(LJID)),
odbc_queries:del_roster(LServer, Username, SJID);
del_roster_t(LUser, LServer, LJID, riak) ->
Username = LUser,
SJID = jlib:jid_to_string(LJID),
riak_del_roster(LServer, Username, SJID).
ejabberd_riak:delete(roster, {LUser, LServer, LJID}).
process_item_set_t(LUser, LServer,
#xmlel{attrs = Attrs, children = Els}) ->
@ -1201,40 +1137,35 @@ get_in_pending_subscriptions(Ls, User, Server) ->
get_in_pending_subscriptions(Ls, User, Server,
gen_mod:db_type(LServer, ?MODULE)).
get_in_pending_subscriptions(Ls, User, Server,
mnesia) ->
get_in_pending_subscriptions(Ls, User, Server, DBType)
when DBType == mnesia; DBType == riak ->
JID = jlib:make_jid(User, Server, <<"">>),
US = {JID#jid.luser, JID#jid.lserver},
case mnesia:dirty_index_read(roster, US, #roster.us) of
Result when is_list(Result) ->
Ls ++
lists:map(fun (R) ->
Message = R#roster.askmessage,
Status = if is_binary(Message) -> (Message);
true -> <<"">>
end,
#xmlel{name = <<"presence">>,
attrs =
[{<<"from">>,
jlib:jid_to_string(R#roster.jid)},
{<<"to">>, jlib:jid_to_string(JID)},
{<<"type">>, <<"subscribe">>}],
children =
[#xmlel{name = <<"status">>,
attrs = [],
children =
[{xmlcdata, Status}]}]}
end,
lists:filter(fun (R) ->
case R#roster.ask of
in -> true;
both -> true;
_ -> false
end
end,
Result));
_ -> Ls
end;
Result = get_roster(JID#jid.luser, JID#jid.lserver, DBType),
Ls ++ lists:map(fun (R) ->
Message = R#roster.askmessage,
Status = if is_binary(Message) -> (Message);
true -> <<"">>
end,
#xmlel{name = <<"presence">>,
attrs =
[{<<"from">>,
jlib:jid_to_string(R#roster.jid)},
{<<"to">>, jlib:jid_to_string(JID)},
{<<"type">>, <<"subscribe">>}],
children =
[#xmlel{name = <<"status">>,
attrs = [],
children =
[{xmlcdata, Status}]}]}
end,
lists:filter(fun (R) ->
case R#roster.ask of
in -> true;
both -> true;
_ -> false
end
end,
Result));
get_in_pending_subscriptions(Ls, User, Server, odbc) ->
JID = jlib:make_jid(User, Server, <<"">>),
LUser = JID#jid.luser,
@ -1276,44 +1207,6 @@ get_in_pending_subscriptions(Ls, User, Server, odbc) ->
end,
Items));
_ -> Ls
end;
get_in_pending_subscriptions(Ls, User, Server, riak) ->
JID = jlib:make_jid(User, Server, <<"">>),
LUser = JID#jid.luser,
LServer = JID#jid.lserver,
Username = LUser,
case catch riak_get_roster(LServer, Username) of
{ok, Items} when is_list(Items) ->
Ls ++ lists:map(
fun(R) ->
Message = R#roster.askmessage,
#xmlel{name = <<"presence">>,
attrs = [{<<"from">>,
jlib:jid_to_string(R#roster.jid)},
{<<"to">>, jlib:jid_to_string(JID)},
{<<"type">>, <<"subscribe">>}],
children = [#xmlel{name = <<"status">>,
attrs = [],
children =
[{xmlcdata, Message}]}]}
end,
lists:flatmap(
fun(I) ->
case riak_raw_to_record(LServer, I) of
%% Bad JID in database:
error ->
[];
R ->
case R#roster.ask of
in -> [R];
both -> [R];
_ -> []
end
end
end,
Items));
_ ->
Ls
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -1361,18 +1254,12 @@ read_subscription_and_groups(LUser, LServer, LJID,
end;
read_subscription_and_groups(LUser, LServer, LJID,
riak) ->
Username = LUser,
SJID = jlib:jid_to_string(LJID),
case catch riak_get_subscription(LServer, Username, SJID) of
{ok, Subscription} ->
Groups = case riak_get_roster_jid_groups(LServer, Username) of
{ok, JGrps} when is_list(JGrps) ->
JGrps;
_ ->
[]
end,
{Subscription, Groups};
_ -> error
case ejabberd_riak:get(roster, {LUser, LServer, LJID}) of
{ok, #roster{subscription = Subscription,
groups = Groups}} ->
{Subscription, Groups};
_ ->
error
end.
get_jid_info(_, User, Server, JID) ->

View File

@ -400,6 +400,13 @@ list_groups(Host, mnesia) ->
mnesia:dirty_select(sr_group,
[{#sr_group{group_host = {'$1', '$2'}, _ = '_'},
[{'==', '$2', Host}], ['$1']}]);
list_groups(Host, riak) ->
case ejabberd_riak:get_keys_by_index(sr_group, <<"host">>, Host) of
{ok, Gs} ->
[G || {G, _} <- Gs];
_ ->
[]
end;
list_groups(Host, odbc) ->
case ejabberd_odbc:sql_query(Host,
[<<"select name from sr_group;">>])
@ -417,6 +424,13 @@ groups_with_opts(Host, mnesia) ->
_ = '_'},
[], [['$1', '$2']]}]),
lists:map(fun ([G, O]) -> {G, O} end, Gs);
groups_with_opts(Host, riak) ->
case ejabberd_riak:get_by_index(sr_group, <<"host">>, Host) of
{ok, Rs} ->
[{G, O} || #sr_group{group_host = {G, _}, opts = O} <- Rs];
_ ->
[]
end;
groups_with_opts(Host, odbc) ->
case ejabberd_odbc:sql_query(Host,
[<<"select name, opts from sr_group;">>])
@ -438,6 +452,10 @@ create_group(Host, Group, Opts, mnesia) ->
R = #sr_group{group_host = {Group, Host}, opts = Opts},
F = fun () -> mnesia:write(R) end,
mnesia:transaction(F);
create_group(Host, Group, Opts, riak) ->
{atomic, ejabberd_riak:put(#sr_group{group_host = {Group, Host},
opts = Opts},
[{'2i', [{<<"host">>, Host}]}])};
create_group(Host, Group, Opts, odbc) ->
SGroup = ejabberd_odbc:escape(Group),
SOpts = ejabberd_odbc:encode_term(Opts),
@ -464,6 +482,15 @@ delete_group(Host, Group, mnesia) ->
Users)
end,
mnesia:transaction(F);
delete_group(Host, Group, riak) ->
try
ok = ejabberd_riak:delete(sr_group, {Group, Host}),
ok = ejabberd_riak:delete_by_index(sr_user, <<"group_host">>,
{Group, Host}),
{atomic, ok}
catch _:{badmatch, Err} ->
{atomic, Err}
end;
delete_group(Host, Group, odbc) ->
SGroup = ejabberd_odbc:escape(Group),
F = fun () ->
@ -483,6 +510,11 @@ get_group_opts(Host, Group, mnesia) ->
[#sr_group{opts = Opts}] -> Opts;
_ -> error
end;
get_group_opts(Host, Group, riak) ->
case ejabberd_riak:get(sr_group, {Group, Host}) of
{ok, #sr_group{opts = Opts}} -> Opts;
_ -> error
end;
get_group_opts(Host, Group, odbc) ->
SGroup = ejabberd_odbc:escape(Group),
case catch ejabberd_odbc:sql_query(Host,
@ -502,6 +534,10 @@ set_group_opts(Host, Group, Opts, mnesia) ->
R = #sr_group{group_host = {Group, Host}, opts = Opts},
F = fun () -> mnesia:write(R) end,
mnesia:transaction(F);
set_group_opts(Host, Group, Opts, riak) ->
{atomic, ejabberd_riak:put(#sr_group{group_host = {Group, Host},
opts = Opts},
[{'2i', [{<<"host">>, Host}]}])};
set_group_opts(Host, Group, Opts, odbc) ->
SGroup = ejabberd_odbc:escape(Group),
SOpts = ejabberd_odbc:encode_term(Opts),
@ -525,6 +561,13 @@ get_user_groups(US, Host, mnesia) ->
|| #sr_user{group_host = {Group, H}} <- Rs, H == Host];
_ -> []
end;
get_user_groups(US, Host, riak) ->
case ejabberd_riak:get_by_index(sr_user, <<"us">>, US) of
{ok, Rs} ->
[Group || #sr_user{group_host = {Group, H}} <- Rs, H == Host];
_ ->
[]
end;
get_user_groups(US, Host, odbc) ->
SJID = make_jid_s(US),
case catch ejabberd_odbc:sql_query(Host,
@ -595,6 +638,14 @@ get_group_explicit_users(Host, Group, mnesia) ->
Rs when is_list(Rs) -> [R#sr_user.us || R <- Rs];
_ -> []
end;
get_group_explicit_users(Host, Group, riak) ->
case ejabberd_riak:get_by_index(sr_user, <<"group_host">>,
{Group, Host}) of
{ok, Rs} ->
[R#sr_user.us || R <- Rs];
_ ->
[]
end;
get_group_explicit_users(Host, Group, odbc) ->
SGroup = ejabberd_odbc:escape(Group),
case catch ejabberd_odbc:sql_query(Host,
@ -680,6 +731,16 @@ get_user_displayed_groups(LUser, LServer, GroupsOpts,
H == LServer];
_ -> []
end;
get_user_displayed_groups(LUser, LServer, GroupsOpts,
riak) ->
case ejabberd_riak:get_by_index(sr_user,
<<"us">>, {LUser, LServer}) of
{ok, Rs} ->
[{Group, proplists:get_value(Group, GroupsOpts, [])}
|| #sr_user{group_host = {Group, _}} <- Rs];
_ ->
[]
end;
get_user_displayed_groups(LUser, LServer, GroupsOpts,
odbc) ->
SJID = make_jid_s(LUser, LServer),
@ -726,6 +787,21 @@ is_user_in_group(US, Group, Host, mnesia) ->
[] -> lists:member(US, get_group_users(Host, Group));
_ -> true
end;
is_user_in_group(US, Group, Host, riak) ->
case ejabberd_riak:get_by_index(sr_user, <<"us">>, US) of
{ok, Rs} ->
case lists:any(
fun(#sr_user{group_host = {G, H}}) ->
(Group == G) and (Host == H)
end, Rs) of
false ->
lists:member(US, get_group_users(Host, Group));
true ->
true
end;
_Err ->
false
end;
is_user_in_group(US, Group, Host, odbc) ->
SJID = make_jid_s(US),
SGroup = ejabberd_odbc:escape(Group),
@ -765,6 +841,12 @@ add_user_to_group(Host, US, Group, mnesia) ->
R = #sr_user{us = US, group_host = {Group, Host}},
F = fun () -> mnesia:write(R) end,
mnesia:transaction(F);
add_user_to_group(Host, US, Group, riak) ->
{atomic, ejabberd_riak:put(
#sr_user{us = US, group_host = {Group, Host}},
[{i, {US, {Group, Host}}},
{'2i', [{<<"us">>, US},
{<<"group_host">>, {Group, Host}}]}])};
add_user_to_group(Host, US, Group, odbc) ->
SJID = make_jid_s(US),
SGroup = ejabberd_odbc:escape(Group),
@ -816,6 +898,8 @@ remove_user_from_group(Host, US, Group, mnesia) ->
R = #sr_user{us = US, group_host = {Group, Host}},
F = fun () -> mnesia:delete_object(R) end,
mnesia:transaction(F);
remove_user_from_group(Host, US, Group, riak) ->
{atomic, ejabberd_riak:delete(sr_group, {US, {Group, Host}})};
remove_user_from_group(Host, US, Group, odbc) ->
SJID = make_jid_s(US),
SGroup = ejabberd_odbc:escape(Group),

View File

@ -46,7 +46,7 @@
lbday, ctry, lctry, locality, llocality, email, lemail,
orgname, lorgname, orgunit, lorgunit}).
-record(vcard, {us = {<<"">>, <<"">>} :: {binary(), binary()},
-record(vcard, {us = {<<"">>, <<"">>} :: {binary(), binary()} | binary(),
vcard = #xmlel{} :: xmlel()}).
-define(PROCNAME, ejabberd_mod_vcard).
@ -214,13 +214,9 @@ get_vcard(LUser, LServer, odbc) ->
_ -> error
end;
get_vcard(LUser, LServer, riak) ->
Username = LUser,
case catch ejabberd_riak:get(LServer, <<"vcard">>, Username) of
{ok, SVCARD} ->
case xml_stream:parse_element(SVCARD) of
{error, _Reason} -> error;
VCARD -> [VCARD]
end;
case ejabberd_riak:get(vcard, {LUser, LServer}) of
{ok, R} ->
[R#vcard.vcard];
{error, notfound} ->
[];
_ ->
@ -302,6 +298,33 @@ set_vcard(User, LServer, VCARD) ->
lorgunit = LOrgUnit})
end,
mnesia:transaction(F);
riak ->
US = {LUser, LServer},
ejabberd_riak:put(#vcard{us = US, vcard = VCARD},
[{'2i', [{<<"user">>, User},
{<<"luser">>, LUser},
{<<"fn">>, FN},
{<<"lfn">>, LFN},
{<<"family">>, Family},
{<<"lfamily">>, LFamily},
{<<"given">>, Given},
{<<"lgiven">>, LGiven},
{<<"middle">>, Middle},
{<<"lmiddle">>, LMiddle},
{<<"nickname">>, Nickname},
{<<"lnickname">>, LNickname},
{<<"bday">>, BDay},
{<<"lbday">>, LBDay},
{<<"ctry">>, CTRY},
{<<"lctry">>, LCTRY},
{<<"locality">>, Locality},
{<<"llocality">>, LLocality},
{<<"email">>, EMail},
{<<"lemail">>, LEMail},
{<<"orgname">>, OrgName},
{<<"lorgname">>, LOrgName},
{<<"orgunit">>, OrgUnit},
{<<"lorgunit">>, LOrgUnit}]}]);
odbc ->
Username = ejabberd_odbc:escape(User),
LUsername = ejabberd_odbc:escape(LUser),
@ -335,25 +358,7 @@ set_vcard(User, LServer, VCARD) ->
SLGiven, SLLocality, SLMiddle,
SLNickname, SLOrgName, SLOrgUnit,
SLocality, SMiddle, SNickname, SOrgName,
SOrgUnit, SVCARD, Username);
riak ->
Username = LUser,
SVCARD = xml:element_to_binary(VCARD),
ejabberd_riak:put(
LServer, <<"vcard">>, Username, SVCARD,
[{<<"bday_bin">>, LBDay},
{<<"ctry_bin">>, LCTRY},
{<<"email_bin">>, LEMail},
{<<"fn_bin">>, LFN},
{<<"family_bin">>, LFamily},
{<<"given_bin">>, LGiven},
{<<"locality_bin">>, LLocality},
{<<"middle_bin">>, LMiddle},
{<<"nickname_bin">>, LNickname},
{<<"orgname_bin">>, LOrgName},
{<<"orgunit_bin">>, LOrgUnit},
{<<"user_bin">>, Username}])
SOrgUnit, SVCARD, Username)
end,
ejabberd_hooks:run(vcard_set, LServer,
[LUser, LServer, VCARD])
@ -921,9 +926,7 @@ remove_user(LUser, LServer, odbc) ->
[<<"delete from vcard_search where lusername='">>,
Username, <<"';">>]]);
remove_user(LUser, LServer, riak) ->
Username = LUser,
ejabberd_riak:delete(LServer, <<"vcard">>, Username),
ok.
{atomic, ejabberd_riak:delete(vcard, {LUser, LServer})}.
update_tables() ->
update_vcard_table(),

View File

@ -88,6 +88,9 @@ add_xupdate(LUser, LServer, Hash, mnesia) ->
hash = Hash})
end,
mnesia:transaction(F);
add_xupdate(LUser, LServer, Hash, riak) ->
{atomic, ejabberd_riak:put(#vcard_xupdate{us = {LUser, LServer},
hash = Hash})};
add_xupdate(LUser, LServer, Hash, odbc) ->
Username = ejabberd_odbc:escape(LUser),
SHash = ejabberd_odbc:escape(Hash),
@ -109,6 +112,11 @@ get_xupdate(LUser, LServer, mnesia) ->
[#vcard_xupdate{hash = Hash}] -> Hash;
_ -> undefined
end;
get_xupdate(LUser, LServer, riak) ->
case ejabberd_riak:get(vcard_xupdate, {LUser, LServer}) of
{ok, #vcard_xupdate{hash = Hash}} -> Hash;
_ -> undefined
end;
get_xupdate(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
case ejabberd_odbc:sql_query(LServer,
@ -129,6 +137,8 @@ remove_xupdate(LUser, LServer, mnesia) ->
mnesia:delete({vcard_xupdate, {LUser, LServer}})
end,
mnesia:transaction(F);
remove_xupdate(LUser, LServer, riak) ->
{atomic, ejabberd_riak:delete(vcard_xupdate, {LUser, LServer})};
remove_xupdate(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
F = fun () ->