Serialize records to proplists before storing then in Riak

This commit is contained in:
Evgeniy Khramtsov 2014-07-14 07:49:02 +04:00
parent 19cc687928
commit 792b5a24df
15 changed files with 237 additions and 105 deletions

View File

@ -40,6 +40,7 @@
get_password_s/2, is_user_exists/2, remove_user/2,
remove_user/3, store_type/0, export/1, import/3,
plain_password_required/0]).
-export([passwd_schema/0]).
-include("ejabberd.hrl").
@ -63,10 +64,13 @@ store_type() ->
true -> scram %% allows: PLAIN SCRAM
end.
passwd_schema() ->
{record_info(fields, passwd), #passwd{}}.
check_password(User, Server, Password) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
case ejabberd_riak:get(passwd, {LUser, LServer}) of
case ejabberd_riak:get(passwd, passwd_schema(), {LUser, LServer}) of
{ok, #passwd{password = Password}} when is_binary(Password) ->
Password /= <<"">>;
{ok, #passwd{password = Scram}} when is_record(Scram, scram) ->
@ -79,7 +83,7 @@ check_password(User, Server, Password, Digest,
DigestGen) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
case ejabberd_riak:get(passwd, {LUser, LServer}) of
case ejabberd_riak:get(passwd, passwd_schema(), {LUser, LServer}) of
{ok, #passwd{password = Passwd}} when is_binary(Passwd) ->
DigRes = if Digest /= <<"">> ->
Digest == DigestGen(Passwd);
@ -114,6 +118,7 @@ set_password(User, Server, Password) ->
false -> Password
end,
ok = ejabberd_riak:put(#passwd{us = US, password = Password2},
passwd_schema(),
[{'2i', [{<<"host">>, LServer}]}])
end.
@ -125,7 +130,7 @@ try_register(User, Server, PasswordList) ->
if (LUser == error) or (LServer == error) ->
{error, invalid_jid};
true ->
case ejabberd_riak:get(passwd, US) of
case ejabberd_riak:get(passwd, passwd_schema(), US) of
{error, notfound} ->
Password2 = case is_scrammed() and
is_binary(Password)
@ -136,6 +141,7 @@ try_register(User, Server, PasswordList) ->
{atomic, ejabberd_riak:put(
#passwd{us = US,
password = Password2},
passwd_schema(),
[{'2i', [{<<"host">>, LServer}]}])};
{ok, _} ->
exists;
@ -177,7 +183,7 @@ get_vh_registered_users_number(Server, _) ->
get_password(User, Server) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
case ejabberd_riak:get(passwd, {LUser, LServer}) of
case ejabberd_riak:get(passwd, passwd_schema(), {LUser, LServer}) of
{ok, #passwd{password = Password}}
when is_binary(Password) ->
Password;
@ -193,7 +199,7 @@ get_password(User, Server) ->
get_password_s(User, Server) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
case ejabberd_riak:get(passwd, {LUser, LServer}) of
case ejabberd_riak:get(passwd, passwd_schema(), {LUser, LServer}) of
{ok, #passwd{password = Password}}
when is_binary(Password) ->
Password;
@ -206,7 +212,7 @@ get_password_s(User, Server) ->
is_user_exists(User, Server) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
case ejabberd_riak:get(passwd, {LUser, LServer}) of
case ejabberd_riak:get(passwd, passwd_schema(), {LUser, LServer}) of
{error, notfound} -> false;
{ok, _} -> true;
Err -> Err
@ -221,7 +227,7 @@ remove_user(User, Server) ->
remove_user(User, Server, Password) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
case ejabberd_riak:get(passwd, {LUser, LServer}) of
case ejabberd_riak:get(passwd, passwd_schema(), {LUser, LServer}) of
{ok, #passwd{password = Password}}
when is_binary(Password) ->
ejabberd_riak:delete(passwd, {LUser, LServer}),
@ -285,6 +291,6 @@ export(_Server) ->
end}].
import(LServer, riak, #passwd{} = Passwd) ->
ejabberd_riak:put(Passwd, [{'2i', [{<<"host">>, LServer}]}]);
ejabberd_riak:put(Passwd, passwd_schema(), [{'2i', [{<<"host">>, LServer}]}]);
import(_, _, _) ->
pass.

View File

@ -27,9 +27,9 @@
-behaviour(gen_server).
%% API
-export([start_link/4, get_proc/1, make_bucket/1, put/1, put/2,
get/1, get/2, get_by_index/3, delete/1, delete/2,
count_by_index/3, get_by_index_range/4,
-export([start_link/4, get_proc/1, make_bucket/1, put/2, put/3,
get/2, get/3, get_by_index/4, delete/1, delete/2,
count_by_index/3, get_by_index_range/5,
get_keys/1, get_keys_by_index/3, is_connected/0,
count/1, delete_by_index/3]).
%% For debugging
@ -50,6 +50,11 @@
-type index_info() :: [{i, any()} | {'2i', [index()]}].
%% The `record_schema()' is just a tuple:
%% {record_info(fields, some_record), #some_record{}}
-type record_schema() :: {[atom()], tuple()}.
%% The `index_info()' is used in put/delete functions:
%% `i' defines a primary index, `` '2i' '' defines secondary indexes.
%% There must be only one primary index. If `i' is not specified,
@ -81,19 +86,19 @@ get_proc(I) ->
make_bucket(Table) ->
erlang:atom_to_binary(Table, utf8).
-spec put(tuple()) -> ok | {error, any()}.
-spec put(tuple(), record_schema()) -> ok | {error, any()}.
%% @equiv put(Record, [])
put(Record) ->
?MODULE:put(Record, []).
put(Record, RecFields) ->
?MODULE:put(Record, RecFields, []).
-spec put(tuple(), index_info()) -> ok | {error, any()}.
-spec put(tuple(), record_schema(), index_info()) -> ok | {error, any()}.
%% @doc Stores a record `Rec' with indexes described in ``IndexInfo''
put(Rec, IndexInfo) ->
put(Rec, RecSchema, IndexInfo) ->
Key = encode_key(proplists:get_value(i, IndexInfo, element(2, Rec))),
SecIdxs = [encode_index_key(K, V) ||
{K, V} <- proplists:get_value('2i', IndexInfo, [])],
Table = element(1, Rec),
Value = term_to_binary(Rec),
Value = encode_record(Rec, RecSchema),
case put_raw(Table, Key, Value, SecIdxs) of
ok ->
ok;
@ -118,9 +123,9 @@ get_object_raw(Table, Key) ->
Bucket = make_bucket(Table),
catch riakc_pb_socket:get(get_random_pid(), Bucket, Key).
-spec get(atom()) -> {ok, [any()]} | {error, any()}.
-spec get(atom(), record_schema()) -> {ok, [any()]} | {error, any()}.
%% @doc Returns all objects from table `Table'
get(Table) ->
get(Table, RecSchema) ->
Bucket = make_bucket(Table),
case catch riakc_pb_socket:mapred(
get_random_pid(),
@ -130,7 +135,7 @@ get(Table) ->
{ok, [{_, Objs}]} ->
{ok, lists:flatmap(
fun(Obj) ->
case catch binary_to_term(Obj) of
case catch decode_record(Obj, RecSchema) of
{'EXIT', _} ->
Error = {error, make_invalid_object(Obj)},
log_error(Error, get,
@ -148,12 +153,12 @@ get(Table) ->
Error
end.
-spec get(atom(), any()) -> {ok, any()} | {error, any()}.
-spec get(atom(), record_schema(), any()) -> {ok, any()} | {error, any()}.
%% @doc Reads record by `Key' from table `Table'
get(Table, Key) ->
get(Table, RecSchema, Key) ->
case get_raw(Table, encode_key(Key)) of
{ok, Val} ->
case catch binary_to_term(Val) of
case catch decode_record(Val, RecSchema) of
{'EXIT', _} ->
Error = {error, make_invalid_object(Val)},
log_error(Error, get, [{table, Table}, {key, Key}]),
@ -167,15 +172,16 @@ get(Table, Key) ->
Error
end.
-spec get_by_index(atom(), binary(), any()) -> {ok, [any()]} | {error, any()}.
-spec get_by_index(atom(), record_schema(), binary(), any()) ->
{ok, [any()]} | {error, any()}.
%% @doc Reads records by `Index' and value `Key' from `Table'
get_by_index(Table, Index, Key) ->
get_by_index(Table, RecSchema, Index, Key) ->
{NewIndex, NewKey} = encode_index_key(Index, Key),
case get_by_index_raw(Table, NewIndex, NewKey) of
{ok, Vals} ->
{ok, lists:flatmap(
fun(Val) ->
case catch binary_to_term(Val) of
case catch decode_record(Val, RecSchema) of
{'EXIT', _} ->
Error = {error, make_invalid_object(Val)},
log_error(Error, get_by_index,
@ -197,17 +203,17 @@ get_by_index(Table, Index, Key) ->
Error
end.
-spec get_by_index_range(atom(), binary(), any(), any()) ->
-spec get_by_index_range(atom(), record_schema(), binary(), any(), any()) ->
{ok, [any()]} | {error, any()}.
%% @doc Reads records by `Index' in the range `FromKey'..`ToKey' from `Table'
get_by_index_range(Table, Index, FromKey, ToKey) ->
get_by_index_range(Table, RecSchema, Index, FromKey, ToKey) ->
{NewIndex, NewFromKey} = encode_index_key(Index, FromKey),
{NewIndex, NewToKey} = encode_index_key(Index, ToKey),
case get_by_index_range_raw(Table, NewIndex, NewFromKey, NewToKey) of
{ok, Vals} ->
{ok, lists:flatmap(
fun(Val) ->
case catch binary_to_term(Val) of
case catch decode_record(Val, RecSchema) of
{'EXIT', _} ->
Error = {error, make_invalid_object(Val)},
log_error(Error, get_by_index_range,
@ -518,3 +524,31 @@ get_random_pid() ->
{'EXIT', Err} ->
throw({error, Err})
end.
encode_record(Rec, {Fields, DefRec}) ->
term_to_binary(encode_record(Rec, Fields, DefRec, 2)).
encode_record(Rec, [FieldName|Fields], DefRec, Pos) ->
Value = element(Pos, Rec),
DefValue = element(Pos, DefRec),
if Value == DefValue ->
encode_record(Rec, Fields, DefRec, Pos+1);
true ->
[{FieldName, Value}|encode_record(Rec, Fields, DefRec, Pos+1)]
end;
encode_record(_, [], _, _) ->
[].
decode_record(Bin, {Fields, DefRec}) ->
decode_record(binary_to_term(Bin), Fields, DefRec, 2).
decode_record(KeyVals, [FieldName|Fields], Rec, Pos) ->
case lists:keyfind(FieldName, 1, KeyVals) of
{_, Value} ->
NewRec = setelement(Pos, Rec, Value),
decode_record(KeyVals, Fields, NewRec, Pos+1);
false ->
decode_record(KeyVals, Fields, Rec, Pos+1)
end;
decode_record(_, [], Rec, _) ->
Rec.

View File

@ -797,6 +797,7 @@ announce_motd(Host, Packet) ->
lists:foreach(
fun({U, S, _R}) ->
ok = ejabberd_riak:put(#motd_users{us = {U, S}},
motd_users_schema(),
[{'2i', [{<<"server">>, S}]}])
end, Sessions),
{atomic, ok}
@ -850,7 +851,8 @@ announce_motd_update(LServer, Packet) ->
mnesia:transaction(F);
riak ->
{atomic, ejabberd_riak:put(#motd{server = LServer,
packet = Packet})};
packet = Packet},
motd_schema())};
odbc ->
XML = ejabberd_odbc:escape(xml:element_to_binary(Packet)),
F = fun() ->
@ -940,17 +942,17 @@ send_motd(#jid{luser = LUser, lserver = LServer} = JID, mnesia) ->
ok
end;
send_motd(#jid{luser = LUser, lserver = LServer} = JID, riak) ->
case catch ejabberd_riak:get(motd, LServer) of
case catch ejabberd_riak:get(motd, motd_schema(), LServer) of
{ok, #motd{packet = Packet}} ->
US = {LUser, LServer},
case ejabberd_riak:get(motd_users, US) of
case ejabberd_riak:get(motd_users, motd_users_schema(), US) of
{ok, #motd_users{}} ->
ok;
_ ->
Local = jlib:make_jid(<<>>, LServer, <<>>),
ejabberd_router:route(Local, JID, Packet),
{atomic, ejabberd_riak:put(
#motd_users{us = US},
#motd_users{us = US}, motd_users_schema(),
[{'2i', [{<<"server">>, LServer}]}])}
end;
_ ->
@ -1007,7 +1009,7 @@ get_stored_motd_packet(LServer, mnesia) ->
error
end;
get_stored_motd_packet(LServer, riak) ->
case ejabberd_riak:get(motd, LServer) of
case ejabberd_riak:get(motd, motd_schema(), LServer) of
{ok, #motd{packet = Packet}} ->
{ok, Packet};
_ ->
@ -1100,6 +1102,12 @@ update_motd_users_table() ->
mnesia:transform_table(motd_users, ignore, Fields)
end.
motd_schema() ->
{record_info(fields, motd), #motd{}}.
motd_users_schema() ->
{record_info(fields, motd_users), #motd_users{}}.
export(_Server) ->
[{motd,
fun(Host, #motd{server = LServer, packet = El})
@ -1138,8 +1146,9 @@ import(_LServer, mnesia, #motd{} = Motd) ->
import(_LServer, mnesia, #motd_users{} = Users) ->
mnesia:dirty_write(Users);
import(_LServer, riak, #motd{} = Motd) ->
ejabberd_riak:put(Motd);
ejabberd_riak:put(Motd, motd_schema());
import(_LServer, riak, #motd_users{us = {_, S}} = Users) ->
ejabberd_riak:put(Users, [{'2i', [{<<"server">>, S}]}]);
ejabberd_riak:put(Users, motd_users_schema(),
[{'2i', [{<<"server">>, S}]}]);
import(_, _, _) ->
pass.

View File

@ -185,7 +185,8 @@ process_blocklist_block(LUser, LServer, Filter,
riak) ->
{atomic,
begin
case ejabberd_riak:get(privacy, {LUser, LServer}) of
case ejabberd_riak:get(privacy, mod_privacy:privacy_schema(),
{LUser, LServer}) of
{ok, #privacy{default = Default, lists = Lists} = P} ->
case lists:keysearch(Default, 1, Lists) of
{value, {_, List}} ->
@ -205,7 +206,8 @@ process_blocklist_block(LUser, LServer, Filter,
NewList = Filter(List),
NewLists = [{NewDefault, NewList} | NewLists1],
case ejabberd_riak:put(P#privacy{default = NewDefault,
lists = NewLists}) of
lists = NewLists},
mod_privacy:privacy_schema()) of
ok ->
{ok, NewDefault, NewList};
Err ->
@ -389,7 +391,8 @@ process_blocklist_unblock(LUser, LServer, Filter,
process_blocklist_unblock(LUser, LServer, Filter,
riak) ->
{atomic,
case ejabberd_riak:get(privacy, {LUser, LServer}) of
case ejabberd_riak:get(privacy, mod_privacy:privacy_schema(),
{LUser, LServer}) of
{error, _} ->
%% No lists, nothing to unblock
ok;
@ -399,7 +402,8 @@ process_blocklist_unblock(LUser, LServer, Filter,
NewList = Filter(List),
NewLists1 = lists:keydelete(Default, 1, Lists),
NewLists = [{Default, NewList} | NewLists1],
case ejabberd_riak:put(P#privacy{lists = NewLists}) of
case ejabberd_riak:put(P#privacy{lists = NewLists},
mod_privacy:privacy_schema()) of
ok ->
{ok, Default, NewList};
Err ->
@ -489,7 +493,8 @@ process_blocklist_get(LUser, LServer, mnesia) ->
end
end;
process_blocklist_get(LUser, LServer, riak) ->
case ejabberd_riak:get(privacy, {LUser, LServer}) of
case ejabberd_riak:get(privacy, {LUser, LServer},
mod_privacy:privacy_schema()) of
{ok, #privacy{default = Default, lists = Lists}} ->
case lists:keysearch(Default, 1, Lists) of
{value, {_, List}} -> List;

View File

@ -444,7 +444,7 @@ caps_read_fun(_LServer, Node, mnesia) ->
end;
caps_read_fun(_LServer, Node, riak) ->
fun() ->
case ejabberd_riak:get(caps_features, Node) of
case ejabberd_riak:get(caps_features, caps_features_schema(), Node) of
{ok, #caps_features{features = Features}} -> {ok, Features};
_ -> error
end
@ -482,7 +482,8 @@ caps_write_fun(_LServer, Node, Features, mnesia) ->
caps_write_fun(_LServer, Node, Features, riak) ->
fun () ->
ejabberd_riak:put(#caps_features{node_pair = Node,
features = Features})
features = Features},
caps_features_schema())
end;
caps_write_fun(LServer, NodePair, Features, odbc) ->
fun () ->
@ -676,6 +677,9 @@ sql_write_features_t({Node, SubNode}, Features) ->
<<"values ('">>, SNode, <<"', '">>, SSubNode, <<"', '">>,
ejabberd_odbc:escape(F), <<"');">>] || F <- NewFeatures]].
caps_features_schema() ->
{record_info(fields, caps_features), #caps_features{}}.
export(_Server) ->
[{caps_features,
fun(_Host, #caps_features{node_pair = NodePair,
@ -717,13 +721,15 @@ import_next(LServer, DBType, NodePair) ->
#caps_features{node_pair = NodePair, features = I});
[I] when is_integer(I), DBType == riak ->
ejabberd_riak:put(
#caps_features{node_pair = NodePair, features = I});
#caps_features{node_pair = NodePair, features = I},
caps_features_schema());
_ when DBType == mnesia ->
mnesia:dirty_write(
#caps_features{node_pair = NodePair, features = Features});
_ when DBType == riak ->
ejabberd_riak:put(
#caps_features{node_pair = NodePair, features = Features});
#caps_features{node_pair = NodePair, features = Features},
caps_features_schema());
_ when DBType == odbc ->
ok
end,

View File

@ -594,7 +594,7 @@ get_data(_LServer, Host, From, mnesia) ->
get_data(LServer, Host, From, riak) ->
#jid{luser = LUser, lserver = LServer} = From,
US = {LUser, LServer},
case ejabberd_riak:get(irc_custom, {US, Host}) of
case ejabberd_riak:get(irc_custom, irc_custom_schema(), {US, Host}) of
{ok, #irc_custom{data = Data}} ->
Data;
{error, notfound} ->
@ -738,7 +738,8 @@ set_data(LServer, Host, From, Data, riak) ->
{LUser, LServer, _} = jlib:jid_tolower(From),
US = {LUser, LServer},
{atomic, ejabberd_riak:put(#irc_custom{us_host = {US, Host},
data = Data})};
data = Data},
irc_custom_schema())};
set_data(LServer, Host, From, Data, odbc) ->
SJID =
ejabberd_odbc:escape(jlib:jid_to_string(jlib:jid_tolower(jlib:jid_remove_resource(From)))),
@ -1284,6 +1285,9 @@ conn_params_to_list(Params) ->
Port, binary_to_list(P)}
end, Params).
irc_custom_schema() ->
{record_info(fields, irc_custom), #irc_custom{}}.
update_table() ->
Fields = record_info(fields, irc_custom),
case mnesia:table_info(irc_custom, attributes) of
@ -1338,6 +1342,6 @@ import(_LServer) ->
import(_LServer, mnesia, #irc_custom{} = R) ->
mnesia:dirty_write(R);
import(_LServer, riak, #irc_custom{} = R) ->
ejabberd_riak:put(R);
ejabberd_riak:put(R, irc_custom_schema());
import(_, _, _) ->
pass.

View File

@ -169,7 +169,8 @@ get_last(LUser, LServer, mnesia) ->
{ok, TimeStamp, Status}
end;
get_last(LUser, LServer, riak) ->
case ejabberd_riak:get(last_activity, {LUser, LServer}) of
case ejabberd_riak:get(last_activity, last_activity_schema(),
{LUser, LServer}) of
{ok, #last_activity{timestamp = TimeStamp,
status = Status}} ->
{ok, TimeStamp, Status};
@ -250,7 +251,8 @@ store_last_info(LUser, LServer, TimeStamp, Status,
US = {LUser, LServer},
{atomic, ejabberd_riak:put(#last_activity{us = US,
timestamp = TimeStamp,
status = Status})};
status = Status},
last_activity_schema())};
store_last_info(LUser, LServer, TimeStamp, Status,
odbc) ->
Username = ejabberd_odbc:escape(LUser),
@ -301,6 +303,9 @@ update_table() ->
mnesia:transform_table(last_activity, ignore, Fields)
end.
last_activity_schema() ->
{record_info(fields, last_activity), #last_activity{}}.
export(_Server) ->
[{last_activity,
fun(Host, #last_activity{us = {LUser, LServer},
@ -331,7 +336,7 @@ import(LServer) ->
import(_LServer, mnesia, #last_activity{} = LA) ->
mnesia:dirty_write(LA);
import(_LServer, riak, #last_activity{} = LA) ->
ejabberd_riak:put(LA);
ejabberd_riak:put(LA, last_activity_schema());
import(_, _, _) ->
pass.

View File

@ -149,7 +149,8 @@ store_room(_LServer, Host, Name, Opts, mnesia) ->
mnesia:transaction(F);
store_room(_LServer, Host, Name, Opts, riak) ->
{atomic, ejabberd_riak:put(#muc_room{name_host = {Name, Host},
opts = Opts})};
opts = Opts},
muc_room_schema())};
store_room(LServer, Host, Name, Opts, odbc) ->
SName = ejabberd_odbc:escape(Name),
SHost = ejabberd_odbc:escape(Host),
@ -174,7 +175,7 @@ restore_room(_LServer, Host, Name, mnesia) ->
_ -> error
end;
restore_room(_LServer, Host, Name, riak) ->
case ejabberd_riak:get(muc_room, {Name, Host}) of
case ejabberd_riak:get(muc_room, muc_room_schema(), {Name, Host}) of
{ok, #muc_room{opts = Opts}} -> Opts;
_ -> error
end;
@ -245,6 +246,7 @@ can_use_nick(LServer, Host, JID, Nick, riak) ->
{LUser, LServer, _} = jlib:jid_tolower(JID),
LUS = {LUser, LServer},
case ejabberd_riak:get_by_index(muc_registered,
muc_registered_schema(),
<<"nick_host">>, {Nick, Host}) of
{ok, []} ->
true;
@ -640,7 +642,7 @@ get_rooms(_LServer, Host, mnesia) ->
Rs -> Rs
end;
get_rooms(_LServer, Host, riak) ->
case ejabberd_riak:get(muc_room) of
case ejabberd_riak:get(muc_room, muc_room_schema()) of
{ok, Rs} ->
lists:filter(
fun(#muc_room{name_host = {_, H}}) ->
@ -874,7 +876,9 @@ get_nick(_LServer, Host, From, mnesia) ->
get_nick(LServer, Host, From, riak) ->
{LUser, LServer, _} = jlib:jid_tolower(From),
US = {LUser, LServer},
case ejabberd_riak:get(muc_registered, {US, Host}) of
case ejabberd_riak:get(muc_registered,
muc_registered_schema(),
{US, Host}) of
{ok, #muc_registered{nick = Nick}} -> Nick;
{error, _} -> error
end;
@ -971,6 +975,7 @@ set_nick(LServer, Host, From, Nick, riak) ->
_ ->
Allow = case ejabberd_riak:get_by_index(
muc_registered,
muc_registered_schema(),
<<"nick_host">>, {Nick, Host}) of
{ok, []} ->
true;
@ -982,6 +987,7 @@ set_nick(LServer, Host, From, Nick, riak) ->
if Allow ->
ejabberd_riak:put(#muc_registered{us_host = {LUS, Host},
nick = Nick},
muc_registered_schema(),
[{'2i', [{<<"nick_host">>,
{Nick, Host}}]}]);
true ->
@ -1173,6 +1179,12 @@ update_tables(Host) ->
update_muc_room_table(Host),
update_muc_registered_table(Host).
muc_room_schema() ->
{record_info(fields, muc_room), #muc_room{}}.
muc_registered_schema() ->
{record_info(fields, muc_registered), #muc_registered{}}.
update_muc_room_table(_Host) ->
Fields = record_info(fields, muc_room),
case mnesia:table_info(muc_room, attributes) of
@ -1269,9 +1281,10 @@ import(_LServer, mnesia, #muc_room{} = R) ->
import(_LServer, mnesia, #muc_registered{} = R) ->
mnesia:dirty_write(R);
import(_LServer, riak, #muc_room{} = R) ->
ejabberd_riak:put(R);
ejabberd_riak:put(R, muc_room_schema());
import(_LServer, riak,
#muc_registered{us_host = {_, Host}, nick = Nick} = R) ->
ejabberd_riak:put(R, [{'2i', [{<<"nick_host">>, {Nick, Host}}]}]);
ejabberd_riak:put(R, muc_registered_schema(),
[{'2i', [{<<"nick_host">>, {Nick, Host}}]}]);
import(_, _, _) ->
pass.

View File

@ -189,7 +189,8 @@ store_offline_msg(Host, {User, _}, Msgs, Len, MaxOfflineMsgs,
lists:foreach(
fun(#offline_msg{us = US,
timestamp = TS} = M) ->
ejabberd_riak:put(M, [{i, TS}, {'2i', [{<<"us">>, US}]}])
ejabberd_riak:put(M, offline_msg_schema(),
[{i, TS}, {'2i', [{<<"us">>, US}]}])
end, Msgs)
end.
@ -440,7 +441,7 @@ pop_offline_messages(Ls, LUser, LServer, odbc) ->
_ -> Ls
end;
pop_offline_messages(Ls, LUser, LServer, riak) ->
case ejabberd_riak:get_by_index(offline_msg,
case ejabberd_riak:get_by_index(offline_msg, offline_msg_schema(),
<<"us">>, {LUser, LServer}) of
{ok, Rs} ->
try
@ -655,7 +656,8 @@ read_all_msgs(LUser, LServer, mnesia) ->
mnesia:dirty_read({offline_msg, US}));
read_all_msgs(LUser, LServer, riak) ->
case ejabberd_riak:get_by_index(
offline_msg, <<"us">>, {LUser, LServer}) of
offline_msg, offline_msg_schema(),
<<"us">>, {LUser, LServer}) of
{ok, Rs} ->
lists:keysort(#offline_msg.timestamp, Rs);
_Err ->
@ -1014,6 +1016,9 @@ count_offline_messages(_Acc, User, Server) ->
N = count_offline_messages(User, Server),
{stop, N}.
offline_msg_schema() ->
{record_info(fields, offline_msg), #offline_msg{}}.
export(_Server) ->
[{offline_msg,
fun(Host, #offline_msg{us = {LUser, LServer},
@ -1073,6 +1078,7 @@ import(LServer) ->
import(_LServer, mnesia, #offline_msg{} = Msg) ->
mnesia:dirty_write(Msg);
import(_LServer, riak, #offline_msg{us = US, timestamp = TS} = M) ->
ejabberd_riak:put(M, [{i, TS}, {'2i', [{<<"us">>, US}]}]);
ejabberd_riak:put(M, offline_msg_schema(),
[{i, TS}, {'2i', [{<<"us">>, US}]}]);
import(_, _, _) ->
pass.

View File

@ -43,7 +43,7 @@
sql_get_privacy_list_data_by_id_t/1,
sql_get_privacy_list_id_t/2,
sql_set_default_privacy_list/2,
sql_set_privacy_list/2]).
sql_set_privacy_list/2, privacy_schema/0]).
-include("ejabberd.hrl").
-include("logger.hrl").
@ -52,6 +52,9 @@
-include("mod_privacy.hrl").
privacy_schema() ->
{record_info(fields, privacy), #privacy{}}.
start(Host, Opts) ->
IQDisc = gen_mod:get_opt(iqdisc, Opts, fun gen_iq_handler:check_type/1,
one_queue),
@ -161,7 +164,7 @@ process_lists_get(LUser, LServer, _Active, mnesia) ->
{Default, LItems}
end;
process_lists_get(LUser, LServer, _Active, riak) ->
case ejabberd_riak:get(privacy, {LUser, LServer}) of
case ejabberd_riak:get(privacy, privacy_schema(), {LUser, LServer}) of
{ok, #privacy{default = Default, lists = Lists}} ->
LItems = lists:map(fun ({N, _}) ->
#xmlel{name = <<"list">>,
@ -225,7 +228,7 @@ process_list_get(LUser, LServer, Name, mnesia) ->
end
end;
process_list_get(LUser, LServer, Name, riak) ->
case ejabberd_riak:get(privacy, {LUser, LServer}) of
case ejabberd_riak:get(privacy, privacy_schema(), {LUser, LServer}) of
{ok, #privacy{lists = Lists}} ->
case lists:keysearch(Name, 1, Lists) of
{value, {_, List}} -> List;
@ -383,12 +386,13 @@ process_default_set(LUser, LServer, {value, Name},
mnesia:transaction(F);
process_default_set(LUser, LServer, {value, Name}, riak) ->
{atomic,
case ejabberd_riak:get(privacy, {LUser, LServer}) of
case ejabberd_riak:get(privacy, privacy_schema(), {LUser, LServer}) of
{ok, #privacy{lists = Lists} = P} ->
case lists:keymember(Name, 1, Lists) of
true ->
ejabberd_riak:put(P#privacy{default = Name,
lists = Lists});
lists = Lists},
privacy_schema());
false ->
not_found
end;
@ -418,9 +422,9 @@ process_default_set(LUser, LServer, false, mnesia) ->
mnesia:transaction(F);
process_default_set(LUser, LServer, false, riak) ->
{atomic,
case ejabberd_riak:get(privacy, {LUser, LServer}) of
case ejabberd_riak:get(privacy, privacy_schema(), {LUser, LServer}) of
{ok, R} ->
ejabberd_riak:put(R#privacy{default = none});
ejabberd_riak:put(R#privacy{default = none}, privacy_schema());
{error, _} ->
ok
end};
@ -457,7 +461,7 @@ process_active_set(LUser, LServer, Name, mnesia) ->
end
end;
process_active_set(LUser, LServer, Name, riak) ->
case ejabberd_riak:get(privacy, {LUser, LServer}) of
case ejabberd_riak:get(privacy, privacy_schema(), {LUser, LServer}) of
{ok, #privacy{lists = Lists}} ->
case lists:keysearch(Name, 1, Lists) of
{value, {_, List}} -> List;
@ -499,13 +503,14 @@ remove_privacy_list(LUser, LServer, Name, mnesia) ->
mnesia:transaction(F);
remove_privacy_list(LUser, LServer, Name, riak) ->
{atomic,
case ejabberd_riak:get(privacy, {LUser, LServer}) of
case ejabberd_riak:get(privacy, privacy_schema(), {LUser, LServer}) of
{ok, #privacy{default = Default, lists = Lists} = P} ->
if Name == Default ->
conflict;
true ->
NewLists = lists:keydelete(Name, 1, Lists),
ejabberd_riak:put(P#privacy{lists = NewLists})
ejabberd_riak:put(P#privacy{lists = NewLists},
privacy_schema())
end;
{error, _} ->
ok
@ -539,15 +544,16 @@ set_privacy_list(LUser, LServer, Name, List, mnesia) ->
mnesia:transaction(F);
set_privacy_list(LUser, LServer, Name, List, riak) ->
{atomic,
case ejabberd_riak:get(privacy, {LUser, LServer}) of
case ejabberd_riak:get(privacy, privacy_schema(), {LUser, LServer}) of
{ok, #privacy{lists = Lists} = P} ->
NewLists1 = lists:keydelete(Name, 1, Lists),
NewLists = [{Name, List} | NewLists1],
ejabberd_riak:put(P#privacy{lists = NewLists});
ejabberd_riak:put(P#privacy{lists = NewLists}, privacy_schema());
{error, _} ->
NewLists = [{Name, List}],
ejabberd_riak:put(#privacy{us = {LUser, LServer},
lists = NewLists})
lists = NewLists},
privacy_schema())
end};
set_privacy_list(LUser, LServer, Name, List, odbc) ->
RItems = lists:map(fun item_to_raw/1, List),
@ -734,7 +740,7 @@ get_user_list(_, LUser, LServer, mnesia) ->
_ -> {none, []}
end;
get_user_list(_, LUser, LServer, riak) ->
case ejabberd_riak:get(privacy, {LUser, LServer}) of
case ejabberd_riak:get(privacy, privacy_schema(), {LUser, LServer}) of
{ok, #privacy{default = Default, lists = Lists}} ->
case Default of
none -> {none, []};
@ -779,7 +785,7 @@ get_user_lists(LUser, LServer, mnesia) ->
error
end;
get_user_lists(LUser, LServer, riak) ->
case ejabberd_riak:get(privacy, {LUser, LServer}) of
case ejabberd_riak:get(privacy, privacy_schema(), {LUser, LServer}) of
{ok, #privacy{} = P} ->
{ok, P};
{error, _} ->
@ -1242,6 +1248,6 @@ import(LServer) ->
import(_LServer, mnesia, #privacy{} = P) ->
mnesia:dirty_write(P);
import(_LServer, riak, #privacy{} = P) ->
ejabberd_riak:put(P);
ejabberd_riak:put(P, privacy_schema());
import(_, _, _) ->
pass.

View File

@ -154,6 +154,7 @@ set_data(LUser, LServer, {XMLNS, El}, odbc) ->
set_data(LUser, LServer, {XMLNS, El}, riak) ->
ejabberd_riak:put(#private_storage{usns = {LUser, LServer, XMLNS},
xml = El},
private_storage_schema(),
[{'2i', [{<<"us">>, {LUser, LServer}}]}]).
get_data(LUser, LServer, Data) ->
@ -191,7 +192,8 @@ get_data(LUser, LServer, odbc, [{XMLNS, El} | Els],
end;
get_data(LUser, LServer, riak, [{XMLNS, El} | Els],
Res) ->
case ejabberd_riak:get(private_storage, {LUser, LServer, XMLNS}) of
case ejabberd_riak:get(private_storage, private_storage_schema(),
{LUser, LServer, XMLNS}) of
{ok, #private_storage{xml = NewEl}} ->
get_data(LUser, LServer, riak, Els, [NewEl|Res]);
_ ->
@ -226,13 +228,17 @@ get_all_data(LUser, LServer, odbc) ->
end;
get_all_data(LUser, LServer, riak) ->
case ejabberd_riak:get_by_index(
private_storage, <<"us">>, {LUser, LServer}) of
private_storage, private_storage_schema(),
<<"us">>, {LUser, LServer}) of
{ok, Res} ->
[El || #private_storage{xml = El} <- Res];
_ ->
[]
end.
private_storage_schema() ->
{record_info(fields, private_storage), #private_storage{}}.
remove_user(User, Server) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
@ -307,7 +313,9 @@ import(LServer) ->
import(_LServer, mnesia, #private_storage{} = PS) ->
mnesia:dirty_write(PS);
import(_LServer, riak, #private_storage{usns = {LUser, LServer, _}} = PS) ->
ejabberd_riak:put(PS, [{'2i', [{<<"us">>, {LUser, LServer}}]}]);
ejabberd_riak:put(PS, private_storage_schema(),
[{'2i', [{<<"us">>, {LUser, LServer}}]}]);
import(_, _, _) ->
pass.

View File

@ -206,7 +206,8 @@ read_roster_version(LUser, LServer, odbc) ->
{selected, [<<"version">>], []} -> error
end;
read_roster_version(LServer, LUser, riak) ->
case ejabberd_riak:get(roster_version, {LUser, LServer}) of
case ejabberd_riak:get(roster_version, roster_version_schema(),
{LUser, LServer}) of
{ok, #roster_version{version = V}} -> V;
_Err -> error
end.
@ -248,7 +249,8 @@ write_roster_version(LUser, LServer, InTransaction, Ver,
write_roster_version(LUser, LServer, _InTransaction, Ver,
riak) ->
US = {LUser, LServer},
ejabberd_riak:put(#roster_version{us = US, version = Ver}).
ejabberd_riak:put(#roster_version{us = US, version = Ver},
roster_version_schema()).
%% Load roster from DB only if neccesary.
%% It is neccesary if
@ -357,7 +359,8 @@ get_roster(LUser, LServer, mnesia) ->
_ -> []
end;
get_roster(LUser, LServer, riak) ->
case ejabberd_riak:get_by_index(roster, <<"us">>, {LUser, LServer}) of
case ejabberd_riak:get_by_index(roster, roster_schema(),
<<"us">>, {LUser, LServer}) of
{ok, Items} -> Items;
_Err -> []
end;
@ -471,7 +474,7 @@ get_roster_by_jid_t(LUser, LServer, LJID, odbc) ->
end
end;
get_roster_by_jid_t(LUser, LServer, LJID, riak) ->
case ejabberd_riak:get(roster, {LUser, LServer, LJID}) of
case ejabberd_riak:get(roster, roster_schema(), {LUser, LServer, LJID}) of
{ok, I} ->
I#roster{jid = LJID, name = <<"">>, groups = [],
xs = []};
@ -660,7 +663,8 @@ get_subscription_lists(_, LUser, LServer, odbc) ->
_ -> []
end;
get_subscription_lists(_, LUser, LServer, riak) ->
case ejabberd_riak:get_by_index(roster, <<"us">>, {LUser, LServer}) of
case ejabberd_riak:get_by_index(roster, roster_schema(),
<<"us">>, {LUser, LServer}) of
{ok, Items} -> Items;
_Err -> []
end.
@ -703,7 +707,7 @@ roster_subscribe_t(LUser, LServer, LJID, Item, odbc) ->
odbc_queries:roster_subscribe(LServer, Username, SJID,
ItemVals);
roster_subscribe_t(LUser, LServer, _LJID, Item, riak) ->
ejabberd_riak:put(Item,
ejabberd_riak:put(Item, roster_schema(),
[{'2i', [{<<"us">>, {LUser, LServer}}]}]).
transaction(LServer, F) ->
@ -763,7 +767,7 @@ get_roster_by_jid_with_groups_t(LUser, LServer, LJID,
us = {LUser, LServer}, jid = LJID}
end;
get_roster_by_jid_with_groups_t(LUser, LServer, LJID, riak) ->
case ejabberd_riak:get(roster, {LUser, LServer, LJID}) of
case ejabberd_riak:get(roster, roster_schema(), {LUser, LServer, LJID}) of
{ok, I} ->
I;
{error, notfound} ->
@ -1057,7 +1061,7 @@ update_roster_t(LUser, LServer, LJID, Item, odbc) ->
odbc_queries:update_roster(LServer, Username, SJID, ItemVals,
ItemGroups);
update_roster_t(LUser, LServer, _LJID, Item, riak) ->
ejabberd_riak:put(Item,
ejabberd_riak:put(Item, roster_schema(),
[{'2i', [{<<"us">>, {LUser, LServer}}]}]).
del_roster_t(LUser, LServer, LJID) ->
@ -1254,7 +1258,7 @@ read_subscription_and_groups(LUser, LServer, LJID,
end;
read_subscription_and_groups(LUser, LServer, LJID,
riak) ->
case ejabberd_riak:get(roster, {LUser, LServer, LJID}) of
case ejabberd_riak:get(roster, roster_schema(), {LUser, LServer, LJID}) of
{ok, #roster{subscription = Subscription,
groups = Groups}} ->
{Subscription, Groups};
@ -1698,6 +1702,11 @@ is_managed_from_id(<<"roster-remotely-managed">>) ->
is_managed_from_id(_Id) ->
false.
roster_schema() ->
{record_info(fields, roster), #roster{}}.
roster_version_schema() ->
{record_info(fields, roster_version), #roster_version{}}.
export(_Server) ->
[{roster,
@ -1749,8 +1758,9 @@ import(_LServer, mnesia, #roster{} = R) ->
import(_LServer, mnesia, #roster_version{} = RV) ->
mnesia:dirty_write(RV);
import(_LServer, riak, #roster{us = {LUser, LServer}} = R) ->
ejabberd_riak:put(R, [{'2i', [{<<"us">>, {LUser, LServer}}]}]);
ejabberd_riak:put(R, roster_schema(),
[{'2i', [{<<"us">>, {LUser, LServer}}]}]);
import(_LServer, riak, #roster_version{} = RV) ->
ejabberd_riak:put(RV);
ejabberd_riak:put(RV, roster_version_schema());
import(_, _, _) ->
pass.

View File

@ -425,7 +425,8 @@ groups_with_opts(Host, mnesia) ->
[], [['$1', '$2']]}]),
lists:map(fun ([G, O]) -> {G, O} end, Gs);
groups_with_opts(Host, riak) ->
case ejabberd_riak:get_by_index(sr_group, <<"host">>, Host) of
case ejabberd_riak:get_by_index(sr_group, sr_group_schema(),
<<"host">>, Host) of
{ok, Rs} ->
[{G, O} || #sr_group{group_host = {G, _}, opts = O} <- Rs];
_ ->
@ -455,6 +456,7 @@ create_group(Host, Group, Opts, mnesia) ->
create_group(Host, Group, Opts, riak) ->
{atomic, ejabberd_riak:put(#sr_group{group_host = {Group, Host},
opts = Opts},
sr_group_schema(),
[{'2i', [{<<"host">>, Host}]}])};
create_group(Host, Group, Opts, odbc) ->
SGroup = ejabberd_odbc:escape(Group),
@ -511,7 +513,7 @@ get_group_opts(Host, Group, mnesia) ->
_ -> error
end;
get_group_opts(Host, Group, riak) ->
case ejabberd_riak:get(sr_group, {Group, Host}) of
case ejabberd_riak:get(sr_group, sr_group_schema(), {Group, Host}) of
{ok, #sr_group{opts = Opts}} -> Opts;
_ -> error
end;
@ -537,6 +539,7 @@ set_group_opts(Host, Group, Opts, mnesia) ->
set_group_opts(Host, Group, Opts, riak) ->
{atomic, ejabberd_riak:put(#sr_group{group_host = {Group, Host},
opts = Opts},
sr_group_schema(),
[{'2i', [{<<"host">>, Host}]}])};
set_group_opts(Host, Group, Opts, odbc) ->
SGroup = ejabberd_odbc:escape(Group),
@ -562,7 +565,7 @@ get_user_groups(US, Host, mnesia) ->
_ -> []
end;
get_user_groups(US, Host, riak) ->
case ejabberd_riak:get_by_index(sr_user, <<"us">>, US) of
case ejabberd_riak:get_by_index(sr_user, sr_user_schema(), <<"us">>, US) of
{ok, Rs} ->
[Group || #sr_user{group_host = {Group, H}} <- Rs, H == Host];
_ ->
@ -639,8 +642,8 @@ get_group_explicit_users(Host, Group, mnesia) ->
_ -> []
end;
get_group_explicit_users(Host, Group, riak) ->
case ejabberd_riak:get_by_index(sr_user, <<"group_host">>,
{Group, Host}) of
case ejabberd_riak:get_by_index(sr_user, sr_user_schema(),
<<"group_host">>, {Group, Host}) of
{ok, Rs} ->
[R#sr_user.us || R <- Rs];
_ ->
@ -733,7 +736,7 @@ get_user_displayed_groups(LUser, LServer, GroupsOpts,
end;
get_user_displayed_groups(LUser, LServer, GroupsOpts,
riak) ->
case ejabberd_riak:get_by_index(sr_user,
case ejabberd_riak:get_by_index(sr_user, sr_user_schema(),
<<"us">>, {LUser, LServer}) of
{ok, Rs} ->
[{Group, proplists:get_value(Group, GroupsOpts, [])}
@ -788,7 +791,7 @@ is_user_in_group(US, Group, Host, mnesia) ->
_ -> true
end;
is_user_in_group(US, Group, Host, riak) ->
case ejabberd_riak:get_by_index(sr_user, <<"us">>, US) of
case ejabberd_riak:get_by_index(sr_user, sr_user_schema(), <<"us">>, US) of
{ok, Rs} ->
case lists:any(
fun(#sr_user{group_host = {G, H}}) ->
@ -844,6 +847,7 @@ add_user_to_group(Host, US, Group, mnesia) ->
add_user_to_group(Host, US, Group, riak) ->
{atomic, ejabberd_riak:put(
#sr_user{us = US, group_host = {Group, Host}},
sr_user_schema(),
[{i, {US, {Group, Host}}},
{'2i', [{<<"us">>, US},
{<<"group_host">>, {Group, Host}}]}])};
@ -1358,6 +1362,12 @@ opts_to_binary(Opts) ->
Opt
end, Opts).
sr_group_schema() ->
{record_info(fields, sr_group), #sr_group{}}.
sr_user_schema() ->
{record_info(fields, sr_user), #sr_user{}}.
update_tables() ->
update_sr_group_table(),
update_sr_user_table().
@ -1439,12 +1449,13 @@ import(LServer) ->
import(_LServer, mnesia, #sr_group{} = G) ->
mnesia:dirty_write(G);
import(_LServer, mnesia, #sr_user{} = U) ->
mnesia:dirty_write(U);
import(_LServer, riak, #sr_group{group_host = {_, Host}} = G) ->
ejabberd_riak:put(G, [{'2i', [{<<"host">>, Host}]}]);
ejabberd_riak:put(G, sr_group_schema(), [{'2i', [{<<"host">>, Host}]}]);
import(_LServer, riak, #sr_user{us = US, group_host = {Group, Host}} = User) ->
ejabberd_riak:put(User,
ejabberd_riak:put(User, sr_user_schema(),
[{i, {US, {Group, Host}}},
{'2i', [{<<"us">>, US},
{<<"group_host">>, {Group, Host}}]}]);

View File

@ -214,7 +214,7 @@ get_vcard(LUser, LServer, odbc) ->
_ -> error
end;
get_vcard(LUser, LServer, riak) ->
case ejabberd_riak:get(vcard, {LUser, LServer}) of
case ejabberd_riak:get(vcard, vcard_schema(), {LUser, LServer}) of
{ok, R} ->
[R#vcard.vcard];
{error, notfound} ->
@ -301,6 +301,7 @@ set_vcard(User, LServer, VCARD) ->
riak ->
US = {LUser, LServer},
ejabberd_riak:put(#vcard{us = US, vcard = VCARD},
vcard_schema(),
[{'2i', [{<<"user">>, User},
{<<"luser">>, LUser},
{<<"fn">>, FN},
@ -972,6 +973,9 @@ update_vcard_search_table() ->
mnesia:transform_table(vcard_search, ignore, Fields)
end.
vcard_schema() ->
{record_info(fields, vcard), #vcard{}}.
export(_Server) ->
[{vcard,
fun(Host, #vcard{us = {LUser, LServer}, vcard = VCARD})
@ -1121,7 +1125,7 @@ import(_LServer, riak, #vcard{us = {LUser, _}, vcard = El} = VCard) ->
LEMail = string2lower(EMail),
LOrgName = string2lower(OrgName),
LOrgUnit = string2lower(OrgUnit),
ejabberd_riak:put(VCard,
ejabberd_riak:put(VCard, vcard_schema(),
[{'2i', [{<<"user">>, LUser},
{<<"luser">>, LUser},
{<<"fn">>, FN},

View File

@ -90,7 +90,8 @@ add_xupdate(LUser, LServer, Hash, mnesia) ->
mnesia:transaction(F);
add_xupdate(LUser, LServer, Hash, riak) ->
{atomic, ejabberd_riak:put(#vcard_xupdate{us = {LUser, LServer},
hash = Hash})};
hash = Hash},
vcard_xupdate_schema())};
add_xupdate(LUser, LServer, Hash, odbc) ->
Username = ejabberd_odbc:escape(LUser),
SHash = ejabberd_odbc:escape(Hash),
@ -113,7 +114,8 @@ get_xupdate(LUser, LServer, mnesia) ->
_ -> undefined
end;
get_xupdate(LUser, LServer, riak) ->
case ejabberd_riak:get(vcard_xupdate, {LUser, LServer}) of
case ejabberd_riak:get(vcard_xupdate, vcard_xupdate_schema(),
{LUser, LServer}) of
{ok, #vcard_xupdate{hash = Hash}} -> Hash;
_ -> undefined
end;
@ -182,6 +184,9 @@ build_xphotoel(User, Host) ->
attrs = [{<<"xmlns">>, ?NS_VCARD_UPDATE}],
children = PhotoEl}.
vcard_xupdate_schema() ->
{record_info(fields, vcard_xupdate), #vcard_xupdate{}}.
update_table() ->
Fields = record_info(fields, vcard_xupdate),
case mnesia:table_info(vcard_xupdate, attributes) of
@ -223,6 +228,6 @@ import(LServer) ->
import(_LServer, mnesia, #vcard_xupdate{} = R) ->
mnesia:dirty_write(R);
import(_LServer, riak, #vcard_xupdate{} = R) ->
ejabberd_riak:put(R);
ejabberd_riak:put(R, vcard_xupdate_schema());
import(_, _, _) ->
pass.