From 47763c10e3072208d762b4808ab8d66d02628f16 Mon Sep 17 00:00:00 2001 From: Alexey Shchepin Date: Wed, 25 Jan 2012 12:02:16 +0200 Subject: [PATCH] Preliminary Riak support --- src/ejabberd_riak.erl | 134 ++++ src/ejabberd_riak_sup.erl | 142 ++++ src/mod_offline_riak.erl | 533 +++++++++++++++ src/mod_private_riak.erl | 139 ++++ src/mod_roster_riak.erl | 1310 +++++++++++++++++++++++++++++++++++++ src/mod_vcard_riak.erl | 209 ++++++ 6 files changed, 2467 insertions(+) create mode 100644 src/ejabberd_riak.erl create mode 100644 src/ejabberd_riak_sup.erl create mode 100644 src/mod_offline_riak.erl create mode 100644 src/mod_private_riak.erl create mode 100644 src/mod_roster_riak.erl create mode 100644 src/mod_vcard_riak.erl diff --git a/src/ejabberd_riak.erl b/src/ejabberd_riak.erl new file mode 100644 index 000000000..ca7df6833 --- /dev/null +++ b/src/ejabberd_riak.erl @@ -0,0 +1,134 @@ +%%%---------------------------------------------------------------------- +%%% File : ejabberd_riak.erl +%%% Author : Alexey Shchepin +%%% Purpose : Serve Riak connection +%%% Created : 29 Dec 2011 by Alexey Shchepin +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2011 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License +%%% along with this program; if not, write to the Free Software +%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA +%%% 02111-1307 USA +%%% +%%%---------------------------------------------------------------------- + +-module(ejabberd_riak). +-author('alexey@process-one.net'). + +%% External exports +-export([start_link/1, + put/4, + put/5, + get_object/3, + get/3, + get_objects_by_index/4, + get_by_index/4, + get_keys_by_index/4, + count_by_index/4, + delete/3]). + +-include("ejabberd.hrl"). + +%%%---------------------------------------------------------------------- +%%% API +%%%---------------------------------------------------------------------- +start_link(StartInterval) -> + {ok, Pid} = riakc_pb_socket:start_link( + "127.0.0.1", 8081, + [auto_reconnect]), + ejabberd_riak_sup:add_pid(Pid), + {ok, Pid}. + +make_bucket(Host, Table) -> + iolist_to_binary([Host, $@, Table]). + +put(Host, Table, Key, Value) -> + Bucket = make_bucket(Host, Table), + Obj = riakc_obj:new(Bucket, Key, Value), + riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj). + +put(Host, Table, Key, Value, Indexes) -> + Bucket = make_bucket(Host, Table), + Obj = riakc_obj:new(Bucket, Key, Value), + MetaData = dict:store(<<"index">>, Indexes, dict:new()), + Obj2 = riakc_obj:update_metadata(Obj, MetaData), + riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj2). + +get_object(Host, Table, Key) -> + Bucket = make_bucket(Host, Table), + riakc_pb_socket:get(ejabberd_riak_sup:get_random_pid(), Bucket, Key). + +get(Host, Table, Key) -> + case get_object(Host, Table, Key) of + {ok, Obj} -> + {ok, riakc_obj:get_value(Obj)}; + Error -> + Error + end. + +get_objects_by_index(Host, Table, Index, Key) -> + Bucket = make_bucket(Host, Table), + case riakc_pb_socket:mapred( + ejabberd_riak_sup:get_random_pid(), + {index, Bucket, Index, Key}, + [{map, {modfun, riak_kv_mapreduce, map_identity}, none, true}]) of + {ok, [{_, Objs}]} -> + {ok, Objs}; + Error -> + Error + end. + +get_by_index(Host, Table, Index, Key) -> + Bucket = make_bucket(Host, Table), + case riakc_pb_socket:mapred( + ejabberd_riak_sup:get_random_pid(), + {index, Bucket, Index, Key}, + [{map, {modfun, riak_kv_mapreduce, map_object_value}, + none, true}]) of + {ok, [{_, Objs}]} -> + {ok, Objs}; + Error -> + Error + end. + +get_keys_by_index(Host, Table, Index, Key) -> + Bucket = make_bucket(Host, Table), + case riakc_pb_socket:mapred( + ejabberd_riak_sup:get_random_pid(), + {index, Bucket, Index, Key}, + []) of + {ok, [{_, Ls}]} -> + {ok, [K || {_, K} <- Ls]}; + Error -> + Error + end. + +count_by_index(Host, Table, Index, Key) -> + Bucket = make_bucket(Host, Table), + case riakc_pb_socket:mapred( + ejabberd_riak_sup:get_random_pid(), + {index, Bucket, Index, Key}, + [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs}, + none, true}]) of + {ok, [{_, [Cnt]}]} -> + {ok, Cnt}; + Error -> + Error + end. + +delete(Host, Table, Key) -> + Bucket = make_bucket(Host, Table), + riakc_pb_socket:delete(ejabberd_riak_sup:get_random_pid(), Bucket, Key). + diff --git a/src/ejabberd_riak_sup.erl b/src/ejabberd_riak_sup.erl new file mode 100644 index 000000000..38fb202bd --- /dev/null +++ b/src/ejabberd_riak_sup.erl @@ -0,0 +1,142 @@ +%%%---------------------------------------------------------------------- +%%% File : ejabberd_riak_sup.erl +%%% Author : Alexey Shchepin +%%% Purpose : Riak connections supervisor +%%% Created : 29 Dec 2011 by Alexey Shchepin +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2011 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License +%%% along with this program; if not, write to the Free Software +%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA +%%% 02111-1307 USA +%%% +%%%---------------------------------------------------------------------- + +-module(ejabberd_riak_sup). +-author('alexey@process-one.net'). + +%% API +-export([start/0, + start_link/0, + init/1, + add_pid/1, + remove_pid/1, + get_pids/0, + get_random_pid/0 + ]). + +-include("ejabberd.hrl"). + +-define(DEFAULT_POOL_SIZE, 10). +-define(DEFAULT_RIAK_START_INTERVAL, 30). % 30 seconds + +% time to wait for the supervisor to start its child before returning +% a timeout error to the request +-define(CONNECT_TIMEOUT, 500). % milliseconds + + +-record(riak_pool, {undefined, pid}). + +start() -> + SupervisorName = ?MODULE, + ChildSpec = + {SupervisorName, + {?MODULE, start_link, []}, + transient, + infinity, + supervisor, + [?MODULE]}, + case supervisor:start_child(ejabberd_sup, ChildSpec) of + {ok, _PID} -> + ok; + _Error -> + ?ERROR_MSG("Start of supervisor ~p failed:~n~p~nRetrying...~n", + [SupervisorName, _Error]), + timer:sleep(5000), + start() + end. + +start_link() -> + mnesia:create_table(riak_pool, + [{ram_copies, [node()]}, + {type, bag}, + {local_content, true}, + {attributes, record_info(fields, riak_pool)}]), + mnesia:add_table_copy(riak_pool, node(), ram_copies), + F = fun() -> + mnesia:delete({riak_pool, undefined}) + end, + mnesia:ets(F), + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + PoolSize = + case ejabberd_config:get_local_option(riak_pool_size) of + I when is_integer(I) -> + I; + undefined -> + ?DEFAULT_POOL_SIZE; + Other -> + ?ERROR_MSG("Wrong riak_pool_size definition '~p' " + "default to ~p~n", + [Other, ?DEFAULT_POOL_SIZE]), + ?DEFAULT_POOL_SIZE + end, + StartInterval = + case ejabberd_config:get_local_option(riak_start_interval) of + Interval when is_integer(Interval) -> + Interval; + undefined -> + ?DEFAULT_RIAK_START_INTERVAL; + _Other2 -> + ?ERROR_MSG("Wrong riak_start_interval " + "definition '~p', " + "defaulting to ~p~n", + [_Other2, + ?DEFAULT_RIAK_START_INTERVAL]), + ?DEFAULT_RIAK_START_INTERVAL + end, + {ok, {{one_for_one, PoolSize*10, 1}, + lists:map( + fun(I) -> + {I, + {ejabberd_riak, start_link, [StartInterval*1000]}, + transient, + 2000, + worker, + [?MODULE]} + end, lists:seq(1, PoolSize))}}. + +get_pids() -> + Rs = mnesia:dirty_read(riak_pool, undefined), + [R#riak_pool.pid || R <- Rs]. + +get_random_pid() -> + Pids = get_pids(), + lists:nth(erlang:phash(now(), length(Pids)), Pids). + +add_pid(Pid) -> + F = fun() -> + mnesia:write( + #riak_pool{pid = Pid}) + end, + mnesia:ets(F). + +remove_pid(Pid) -> + F = fun() -> + mnesia:delete_object( + #riak_pool{pid = Pid}) + end, + mnesia:ets(F). diff --git a/src/mod_offline_riak.erl b/src/mod_offline_riak.erl new file mode 100644 index 000000000..433e583e8 --- /dev/null +++ b/src/mod_offline_riak.erl @@ -0,0 +1,533 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_offline_riak.erl +%%% Author : Alexey Shchepin +%%% Purpose : Store and manage offline messages in Riak. +%%% Created : 4 Jan 2012 by Alexey Shchepin +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2011 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License +%%% along with this program; if not, write to the Free Software +%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA +%%% 02111-1307 USA +%%% +%%%---------------------------------------------------------------------- + +-module(mod_offline_riak). +-author('alexey@process-one.net'). + +-behaviour(gen_mod). + +-export([count_offline_messages/2]). + +-export([start/2, + init/2, + stop/1, + store_packet/3, + pop_offline_messages/3, + remove_user/2, + webadmin_page/3, + webadmin_user/4, + webadmin_user_parse_query/5, + count_offline_messages/3]). + +-include("ejabberd.hrl"). +-include("jlib.hrl"). +-include("web/ejabberd_http.hrl"). +-include("web/ejabberd_web_admin.hrl"). + +-record(offline_msg, {user, timestamp, expire, from, to, packet}). + +-define(PROCNAME, ejabberd_offline). +-define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000). + +start(Host, Opts) -> + ejabberd_hooks:add(offline_message_hook, Host, + ?MODULE, store_packet, 50), + ejabberd_hooks:add(resend_offline_messages_hook, Host, + ?MODULE, pop_offline_messages, 50), + ejabberd_hooks:add(remove_user, Host, + ?MODULE, remove_user, 50), + ejabberd_hooks:add(anonymous_purge_hook, Host, + ?MODULE, remove_user, 50), + ejabberd_hooks:add(webadmin_page_host, Host, + ?MODULE, webadmin_page, 50), + ejabberd_hooks:add(webadmin_user, Host, + ?MODULE, webadmin_user, 50), + ejabberd_hooks:add(webadmin_user_parse_query, Host, + ?MODULE, webadmin_user_parse_query, 50), + ejabberd_hooks:add(count_offline_messages, Host, + ?MODULE, count_offline_messages, 50), + MaxOfflineMsgs = gen_mod:get_opt(user_max_messages, Opts, infinity), + register(gen_mod:get_module_proc(Host, ?PROCNAME), + spawn(?MODULE, init, [Host, MaxOfflineMsgs])). + +%% MaxOfflineMsgs is either infinity of integer > 0 +init(Host, infinity) -> + loop(Host, infinity); +init(Host, MaxOfflineMsgs) + when is_integer(MaxOfflineMsgs), MaxOfflineMsgs > 0 -> + loop(Host, MaxOfflineMsgs). + +loop(Host, MaxOfflineMsgs) -> + receive + #offline_msg{user = User} = Msg -> + Msgs = receive_all(User, [Msg]), + Len = length(Msgs), + + %% Only count existing messages if needed: + Count = if MaxOfflineMsgs =/= infinity -> + Len + count_offline_messages(User, Host); + true -> 0 + end, + if + Count > MaxOfflineMsgs -> + discard_warn_sender(Msgs); + true -> + lists:foreach( + fun(M) -> + Username = list_to_binary(User), + From = M#offline_msg.from, + To = M#offline_msg.to, + {xmlelement, Name, Attrs, Els} = + M#offline_msg.packet, + Attrs2 = jlib:replace_from_to_attrs( + jlib:jid_to_string(From), + jlib:jid_to_string(To), + Attrs), + Packet = {xmlelement, Name, Attrs2, + Els ++ + [jlib:timestamp_to_xml( + calendar:now_to_universal_time( + M#offline_msg.timestamp))]}, + XML = + iolist_to_binary( + xml:element_to_string(Packet)), + {MegaSecs, Secs, MicroSecs} = + M#offline_msg.timestamp, + TS = + iolist_to_binary( + io_lib:format("~6..0w~6..0w.~6..0w", + [MegaSecs, Secs, MicroSecs])), + ejabberd_riak:put( + Host, <<"offline">>, + undefined, XML, + [{<<"user_bin">>, Username}, + {<<"timestamp_bin">>, TS} + ]) + end, Msgs) + end, + loop(Host, MaxOfflineMsgs); + _ -> + loop(Host, MaxOfflineMsgs) + end. + +receive_all(Username, Msgs) -> + receive + #offline_msg{user=Username} = Msg -> + receive_all(Username, [Msg | Msgs]) + after 0 -> + lists:reverse(Msgs) + end. + + +stop(Host) -> + ejabberd_hooks:delete(offline_message_hook, Host, + ?MODULE, store_packet, 50), + ejabberd_hooks:delete(resend_offline_messages_hook, Host, + ?MODULE, pop_offline_messages, 50), + ejabberd_hooks:delete(remove_user, Host, + ?MODULE, remove_user, 50), + ejabberd_hooks:delete(anonymous_purge_hook, Host, + ?MODULE, remove_user, 50), + ejabberd_hooks:delete(webadmin_page_host, Host, + ?MODULE, webadmin_page, 50), + ejabberd_hooks:delete(webadmin_user, Host, + ?MODULE, webadmin_user, 50), + ejabberd_hooks:delete(webadmin_user_parse_query, Host, + ?MODULE, webadmin_user_parse_query, 50), + Proc = gen_mod:get_module_proc(Host, ?PROCNAME), + exit(whereis(Proc), stop), + ok. + +store_packet(From, To, Packet) -> + Type = xml:get_tag_attr_s("type", Packet), + if + (Type /= "error") and (Type /= "groupchat") and + (Type /= "headline") -> + case check_event(From, To, Packet) of + true -> + #jid{luser = LUser} = To, + TimeStamp = now(), + {xmlelement, _Name, _Attrs, Els} = Packet, + Expire = find_x_expire(TimeStamp, Els), + gen_mod:get_module_proc(To#jid.lserver, ?PROCNAME) ! + #offline_msg{user = LUser, + timestamp = TimeStamp, + expire = Expire, + from = From, + to = To, + packet = Packet}, + stop; + _ -> + ok + end; + true -> + ok + end. + +check_event(From, To, Packet) -> + {xmlelement, Name, Attrs, Els} = Packet, + case find_x_event(Els) of + false -> + true; + El -> + case xml:get_subtag(El, "id") of + false -> + case xml:get_subtag(El, "offline") of + false -> + true; + _ -> + ID = case xml:get_tag_attr_s("id", Packet) of + "" -> + {xmlelement, "id", [], []}; + S -> + {xmlelement, "id", [], + [{xmlcdata, S}]} + end, + ejabberd_router:route( + To, From, {xmlelement, Name, Attrs, + [{xmlelement, "x", + [{"xmlns", ?NS_EVENT}], + [ID, + {xmlelement, "offline", [], []}]}] + }), + true + end; + _ -> + false + end + end. + +find_x_event([]) -> + false; +find_x_event([{xmlcdata, _} | Els]) -> + find_x_event(Els); +find_x_event([El | Els]) -> + case xml:get_tag_attr_s("xmlns", El) of + ?NS_EVENT -> + El; + _ -> + find_x_event(Els) + end. + +find_x_expire(_, []) -> + never; +find_x_expire(TimeStamp, [{xmlcdata, _} | Els]) -> + find_x_expire(TimeStamp, Els); +find_x_expire(TimeStamp, [El | Els]) -> + case xml:get_tag_attr_s("xmlns", El) of + ?NS_EXPIRE -> + Val = xml:get_tag_attr_s("seconds", El), + case catch list_to_integer(Val) of + {'EXIT', _} -> + never; + Int when Int > 0 -> + {MegaSecs, Secs, MicroSecs} = TimeStamp, + S = MegaSecs * 1000000 + Secs + Int, + MegaSecs1 = S div 1000000, + Secs1 = S rem 1000000, + {MegaSecs1, Secs1, MicroSecs}; + _ -> + never + end; + _ -> + find_x_expire(TimeStamp, Els) + end. + + +pop_offline_messages(Ls, User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + Username = list_to_binary(LUser), + case ejabberd_riak:get_objects_by_index( + LServer, <<"offline">>, <<"user_bin">>, Username) of + {ok, Rs} -> + SortedRs = + lists:sort(fun(X, Y) -> + MX = riak_object:get_metadata(X), + {ok, IX} = dict:find(<<"index">>, MX), + {value, TSX} = lists:keysearch( + <<"timestamp_bin">>, 1, + IX), + MY = riak_object:get_metadata(Y), + {ok, IY} = dict:find(<<"index">>, MY), + {value, TSY} = lists:keysearch( + <<"timestamp_bin">>, 1, + IY), + TSX =< TSY + end, Rs), + Ls ++ lists:flatmap( + fun(R) -> + Key = riak_object:key(R), + ejabberd_riak:delete(LServer, <<"offline">>, Key), + XML = riak_object:get_value(R), + case xml_stream:parse_element(XML) of + {error, _Reason} -> + []; + El -> + To = jlib:string_to_jid( + xml:get_tag_attr_s("to", El)), + From = jlib:string_to_jid( + xml:get_tag_attr_s("from", El)), + if + (To /= error) and + (From /= error) -> + [{route, From, To, El}]; + true -> + [] + end + end + end, SortedRs); + _ -> + Ls + end. + + +remove_user(User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + Username = list_to_binary(LUser), + case ejabberd_riak:get_keys_by_index( + LServer, <<"offline">>, <<"user_bin">>, Username) of + {ok, Keys} -> + lists:foreach( + fun(Key) -> + ejabberd_riak:delete(LServer, <<"offline">>, Key) + end, Keys); + _ -> + ok + end. + + +%% Helper functions: + +%% TODO: Warning - This function is a duplicate from mod_offline.erl +%% It is duplicate to stay consistent (many functions are duplicated +%% in this module). It will be refactored later on. +%% Warn senders that their messages have been discarded: +discard_warn_sender(Msgs) -> + lists:foreach( + fun(#offline_msg{from=From, to=To, packet=Packet}) -> + ErrText = "Your contact offline message queue is full. The message has been discarded.", + Lang = xml:get_tag_attr_s("xml:lang", Packet), + Err = jlib:make_error_reply( + Packet, ?ERRT_RESOURCE_CONSTRAINT(Lang, ErrText)), + ejabberd_router:route( + To, + From, Err) + end, Msgs). + + +webadmin_page(_, Host, + #request{us = _US, + path = ["user", U, "queue"], + q = Query, + lang = Lang} = _Request) -> + Res = user_queue(U, Host, Query, Lang), + {stop, Res}; + +webadmin_page(Acc, _, _) -> Acc. + +user_queue(User, Server, Query, Lang) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + Username = ejabberd_odbc:escape(LUser), + US = {LUser, LServer}, + Res = user_queue_parse_query(Username, LServer, Query), + Msgs = case catch ejabberd_odbc:sql_query( + LServer, + ["select username, xml from spool" + " where username='", Username, "'" + " order by seq;"]) of + {selected, ["username", "xml"], Rs} -> + lists:flatmap( + fun({_, XML}) -> + case xml_stream:parse_element(XML) of + {error, _Reason} -> + []; + El -> + [El] + end + end, Rs); + _ -> + [] + end, + FMsgs = + lists:map( + fun({xmlelement, _Name, _Attrs, _Els} = Msg) -> + ID = jlib:encode_base64(binary_to_list(term_to_binary(Msg))), + Packet = Msg, + FPacket = ejabberd_web_admin:pretty_print_xml(Packet), + ?XE("tr", + [?XAE("td", [{"class", "valign"}], [?INPUT("checkbox", "selected", ID)]), + ?XAE("td", [{"class", "valign"}], [?XC("pre", FPacket)])] + ) + end, Msgs), + [?XC("h1", io_lib:format(?T("~s's Offline Messages Queue"), + [us_to_list(US)]))] ++ + case Res of + ok -> [?XREST("Submitted")]; + nothing -> [] + end ++ + [?XAE("form", [{"action", ""}, {"method", "post"}], + [?XE("table", + [?XE("thead", + [?XE("tr", + [?X("td"), + ?XCT("td", "Packet") + ])]), + ?XE("tbody", + if + FMsgs == [] -> + [?XE("tr", + [?XAC("td", [{"colspan", "4"}], " ")] + )]; + true -> + FMsgs + end + )]), + ?BR, + ?INPUTT("submit", "delete", "Delete Selected") + ])]. + +user_queue_parse_query(Username, LServer, Query) -> + case lists:keysearch("delete", 1, Query) of + {value, _} -> + Msgs = case catch ejabberd_odbc:sql_query( + LServer, + ["select xml, seq from spool" + " where username='", Username, "'" + " order by seq;"]) of + {selected, ["xml", "seq"], Rs} -> + lists:flatmap( + fun({XML, Seq}) -> + case xml_stream:parse_element(XML) of + {error, _Reason} -> + []; + El -> + [{El, Seq}] + end + end, Rs); + _ -> + [] + end, + F = fun() -> + lists:foreach( + fun({Msg, Seq}) -> + ID = jlib:encode_base64( + binary_to_list(term_to_binary(Msg))), + case lists:member({"selected", ID}, Query) of + true -> + SSeq = ejabberd_odbc:escape(Seq), + catch ejabberd_odbc:sql_query( + LServer, + ["delete from spool" + " where username='", Username, "'" + " and seq='", SSeq, "';"]); + false -> + ok + end + end, Msgs) + end, + mnesia:transaction(F), + ok; + false -> + nothing + end. + +us_to_list({User, Server}) -> + jlib:jid_to_string({User, Server, ""}). + +webadmin_user(Acc, User, Server, Lang) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + Username = ejabberd_odbc:escape(LUser), + QueueLen = case catch ejabberd_odbc:sql_query( + LServer, + ["select count(*) from spool" + " where username='", Username, "';"]) of + {selected, [_], [{SCount}]} -> + SCount; + _ -> + 0 + end, + FQueueLen = [?AC("queue/", QueueLen)], + Acc ++ [?XCT("h3", "Offline Messages:")] ++ FQueueLen ++ [?C(" "), ?INPUTT("submit", "removealloffline", "Remove All Offline Messages")]. + +webadmin_user_parse_query(_, "removealloffline", User, Server, _Query) -> + case catch odbc_queries:del_spool_msg(Server, User) of + {'EXIT', Reason} -> + ?ERROR_MSG("Failed to remove offline messages: ~p", [Reason]), + {stop, error}; + {error, Reason} -> + ?ERROR_MSG("Failed to remove offline messages: ~p", [Reason]), + {stop, error}; + _ -> + ?INFO_MSG("Removed all offline messages for ~s@~s", [User, Server]), + {stop, ok} + end; +webadmin_user_parse_query(Acc, _Action, _User, _Server, _Query) -> + Acc. + +%% ------------------------------------------------ +%% mod_offline: number of messages quota management + +%% Returns as integer the number of offline messages for a given user +count_offline_messages(LUser, LServer) -> + Username = list_to_binary([LUser, $@, LServer]), + case catch ejabberd_riak:count_by_index( + LServer, <<"offline">>, <<"user_bin">>, Username) of + {ok, Res} when is_integer(Res) -> + Res; + _ -> + 0 + end. + +count_offline_messages(_Acc, User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + Num = case catch ejabberd_odbc:sql_query( + LServer, + ["select xml from spool" + " where username='", LUser, "';"]) of + {selected, ["xml"], Rs} -> + lists:foldl( + fun({XML}, Acc) -> + case xml_stream:parse_element(XML) of + {error, _Reason} -> + Acc; + El -> + case xml:get_subtag(El, "body") of + false -> + Acc; + _ -> + Acc + 1 + end + end + end, 0, Rs); + _ -> + 0 + end, + {stop, Num}. diff --git a/src/mod_private_riak.erl b/src/mod_private_riak.erl new file mode 100644 index 000000000..e1b4f896d --- /dev/null +++ b/src/mod_private_riak.erl @@ -0,0 +1,139 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_private_riak.erl +%%% Author : Alexey Shchepin +%%% Purpose : Private storage support +%%% Created : 6 Jan 2012 by Alexey Shchepin +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2011 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License +%%% along with this program; if not, write to the Free Software +%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA +%%% 02111-1307 USA +%%% +%%%---------------------------------------------------------------------- + +-module(mod_private_riak). +-author('alexey@process-one.net'). + +-behaviour(gen_mod). + +-export([start/2, + stop/1, + process_sm_iq/3, + remove_user/2]). + +-include("ejabberd.hrl"). +-include("jlib.hrl"). + +start(Host, Opts) -> + IQDisc = gen_mod:get_opt(iqdisc, Opts, one_queue), + ejabberd_hooks:add(remove_user, Host, + ?MODULE, remove_user, 50), + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_PRIVATE, + ?MODULE, process_sm_iq, IQDisc). + +stop(Host) -> + ejabberd_hooks:delete(remove_user, Host, + ?MODULE, remove_user, 50), + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_PRIVATE). + + +process_sm_iq(From, _To, #iq{type = Type, sub_el = SubEl} = IQ) -> + #jid{luser = LUser, lserver = LServer} = From, + case lists:member(LServer, ?MYHOSTS) of + true -> + {xmlelement, Name, Attrs, Els} = SubEl, + case Type of + set -> + lists:foreach( + fun(El) -> + set_data(LUser, LServer, El) + end, Els), + IQ#iq{type = result, + sub_el = [{xmlelement, Name, Attrs, []}]}; + get -> + case catch get_data(LUser, LServer, Els) of + {'EXIT', _Reason} -> + IQ#iq{type = error, + sub_el = [SubEl, + ?ERR_INTERNAL_SERVER_ERROR]}; + Res -> + IQ#iq{type = result, + sub_el = [{xmlelement, Name, Attrs, Res}]} + end + end; + false -> + IQ#iq{type = error, sub_el = [SubEl, ?ERR_NOT_ALLOWED]} + end. + +set_data(LUser, LServer, El) -> + case El of + {xmlelement, _Name, Attrs, _Els} -> + XMLNS = xml:get_attr_s("xmlns", Attrs), + case XMLNS of + "" -> + ignore; + _ -> + Username = list_to_binary(LUser), + Key = list_to_binary([LUser, $@, LServer, $@, XMLNS]), + SData = xml:element_to_binary(El), + ejabberd_riak:put( + LServer, <<"private">>, Key, SData, + [{<<"user_bin">>, Username}]), + ok + end; + _ -> + ignore + end. + +get_data(LUser, LServer, Els) -> + get_data(LUser, LServer, Els, []). + +get_data(_LUser, _LServer, [], Res) -> + lists:reverse(Res); +get_data(LUser, LServer, [El | Els], Res) -> + case El of + {xmlelement, _Name, Attrs, _} -> + XMLNS = xml:get_attr_s("xmlns", Attrs), + Key = list_to_binary([LUser, $@, LServer, $@, XMLNS]), + case ejabberd_riak:get(LServer, <<"private">>, Key) of + {ok, SData} -> + case xml_stream:parse_element(SData) of + Data when element(1, Data) == xmlelement -> + get_data(LUser, LServer, Els, + [Data | Res]) + end; + _ -> + get_data(LUser, LServer, Els, [El | Res]) + end; + _ -> + get_data(LUser, LServer, Els, Res) + end. + + +remove_user(User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + Username = list_to_binary(LUser), + case ejabberd_riak:get_keys_by_index( + LServer, <<"private">>, <<"user_bin">>, Username) of + {ok, Keys} -> + lists:foreach( + fun(Key) -> + ejabberd_riak:delete(LServer, <<"private">>, Key) + end, Keys); + _ -> + ok + end. diff --git a/src/mod_roster_riak.erl b/src/mod_roster_riak.erl new file mode 100644 index 000000000..e66800bca --- /dev/null +++ b/src/mod_roster_riak.erl @@ -0,0 +1,1310 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_roster_riak.erl +%%% Author : Alexey Shchepin +%%% Purpose : Roster management +%%% Created : 6 Jan 2012 by Alexey Shchepin +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2011 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License +%%% along with this program; if not, write to the Free Software +%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA +%%% 02111-1307 USA +%%% +%%%---------------------------------------------------------------------- + +%%% @doc Roster management (Mnesia storage). +%%% +%%% Includes support for XEP-0237: Roster Versioning. +%%% The roster versioning follows an all-or-nothing strategy: +%%% - If the version supplied by the client is the latest, return an empty response. +%%% - If not, return the entire new roster (with updated version string). +%%% Roster version is a hash digest of the entire roster. +%%% No additional data is stored in DB. + +-module(mod_roster_riak). +-author('alexey@process-one.net'). + +-behaviour(gen_mod). + +-export([start/2, stop/1, + process_iq/3, + process_local_iq/3, + get_user_roster/2, + get_subscription_lists/3, + get_in_pending_subscriptions/3, + in_subscription/6, + out_subscription/4, + set_items/3, + remove_user/2, + get_jid_info/4, + webadmin_page/3, + webadmin_user/4, + get_versioning_feature/2, + roster_versioning_enabled/1]). + +-include("ejabberd.hrl"). +-include("jlib.hrl"). +-include("mod_roster.hrl"). +-include("web/ejabberd_http.hrl"). +-include("web/ejabberd_web_admin.hrl"). + + +start(Host, _Opts) -> + IQDisc = no_queue, + ejabberd_hooks:add(roster_get, Host, + ?MODULE, get_user_roster, 50), + ejabberd_hooks:add(roster_in_subscription, Host, + ?MODULE, in_subscription, 50), + ejabberd_hooks:add(roster_out_subscription, Host, + ?MODULE, out_subscription, 50), + ejabberd_hooks:add(roster_get_subscription_lists, Host, + ?MODULE, get_subscription_lists, 50), + ejabberd_hooks:add(roster_get_jid_info, Host, + ?MODULE, get_jid_info, 50), + ejabberd_hooks:add(remove_user, Host, + ?MODULE, remove_user, 50), + ejabberd_hooks:add(anonymous_purge_hook, Host, + ?MODULE, remove_user, 50), + ejabberd_hooks:add(resend_subscription_requests_hook, Host, + ?MODULE, get_in_pending_subscriptions, 50), + ejabberd_hooks:add(roster_get_versioning_feature, Host, + ?MODULE, get_versioning_feature, 50), + ejabberd_hooks:add(webadmin_page_host, Host, + ?MODULE, webadmin_page, 50), + ejabberd_hooks:add(webadmin_user, Host, + ?MODULE, webadmin_user, 50), + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_ROSTER, + ?MODULE, process_iq, IQDisc). + +stop(Host) -> + ejabberd_hooks:delete(roster_get, Host, + ?MODULE, get_user_roster, 50), + ejabberd_hooks:delete(roster_in_subscription, Host, + ?MODULE, in_subscription, 50), + ejabberd_hooks:delete(roster_out_subscription, Host, + ?MODULE, out_subscription, 50), + ejabberd_hooks:delete(roster_get_subscription_lists, Host, + ?MODULE, get_subscription_lists, 50), + ejabberd_hooks:delete(roster_get_jid_info, Host, + ?MODULE, get_jid_info, 50), + ejabberd_hooks:delete(remove_user, Host, + ?MODULE, remove_user, 50), + ejabberd_hooks:delete(anonymous_purge_hook, Host, + ?MODULE, remove_user, 50), + ejabberd_hooks:delete(resend_subscription_requests_hook, Host, + ?MODULE, get_in_pending_subscriptions, 50), + ejabberd_hooks:delete(roster_get_versioning_feature, Host, + ?MODULE, get_versioning_feature, 50), + ejabberd_hooks:delete(webadmin_page_host, Host, + ?MODULE, webadmin_page, 50), + ejabberd_hooks:delete(webadmin_user, Host, + ?MODULE, webadmin_user, 50), + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_ROSTER). + + +process_iq(From, To, IQ) -> + #iq{sub_el = SubEl} = IQ, + #jid{lserver = LServer} = From, + case lists:member(LServer, ?MYHOSTS) of + true -> + process_local_iq(From, To, IQ); + _ -> + IQ#iq{type = error, sub_el = [SubEl, ?ERR_ITEM_NOT_FOUND]} + end. + +process_local_iq(From, To, #iq{type = Type} = IQ) -> + case Type of + set -> + process_iq_set(From, To, IQ); + get -> + process_iq_get(From, To, IQ) + end. + + +roster_hash(Items) -> + sha:sha(term_to_binary( + lists:sort( + [R#roster{groups = lists:sort(Grs)} || + R = #roster{groups = Grs} <- Items]))). + +roster_versioning_enabled(Host) -> + gen_mod:get_module_opt(Host, ?MODULE, versioning, false). + +roster_version_on_db(Host) -> + gen_mod:get_module_opt(Host, ?MODULE, store_current_id, false). + +%% Returns a list that may contain an xmlelement with the XEP-237 feature if it's enabled. +get_versioning_feature(Acc, Host) -> + case roster_versioning_enabled(Host) of + true -> + Feature = {xmlelement, + "ver", + [{"xmlns", ?NS_ROSTER_VER}], + [{xmlelement, "optional", [], []}]}, + [Feature | Acc]; + false -> [] + end. + +roster_version(LServer, LUser) -> + US = {LUser, LServer}, + Username = list_to_binary(LUser), + case roster_version_on_db(LServer) of + true -> + case ejabberd_riak:get(LServer, <<"roster_version">>, + Username) of + {ok, Version} -> Version; + {error, notfound} -> + %% If for some reason we don't had it on DB. Create a version Id and store it. + %% (we did the same on process_iq_get, that is called when client get roster, + %% not sure why it can still not be on DB at this point) + RosterVersion = sha:sha(term_to_binary(now())), + riak_set_roster_version(LServer, Username, RosterVersion), + RosterVersion + end; + false -> + roster_hash(ejabberd_hooks:run_fold(roster_get, LServer, [], [US])) + end. + +%% Load roster from DB only if neccesary. +%% It is neccesary if +%% - roster versioning is disabled in server OR +%% - roster versioning is not used by the client OR +%% - roster versioning is used by server and client, BUT the server isn't storing versions on db OR +%% - the roster version from client don't match current version. +process_iq_get(From, To, #iq{sub_el = SubEl} = IQ) -> + LUser = From#jid.luser, + LServer = From#jid.lserver, + US = {LUser, LServer}, + Username = list_to_binary(LUser), + + try + {ItemsToSend, VersionToSend} = + case {xml:get_tag_attr("ver", SubEl), + roster_versioning_enabled(LServer), + roster_version_on_db(LServer)} of + {{value, RequestedVersion}, true, true} -> + BRequestedVersion = iolist_to_binary(RequestedVersion), + %% Retrieve version from DB. Only load entire roster + %% when neccesary. + case ejabberd_riak:get(LServer, <<"roster_version">>, + Username) of + {ok, BRequestedVersion} -> + {false, false}; + {ok, NewVersion} -> + {lists:map(fun item_to_xml/1, + ejabberd_hooks:run_fold(roster_get, To#jid.lserver, [], [US])), NewVersion}; + {error, notfound} -> + RosterVersion = sha:sha(term_to_binary(now())), + ejabberd_riak:put( + LServer, <<"roster_version">>, + Username, list_to_binary(RosterVersion)), + {lists:map(fun item_to_xml/1, + ejabberd_hooks:run_fold(roster_get, To#jid.lserver, [], [US])), RosterVersion} + end; + + {{value, RequestedVersion}, true, false} -> + RosterItems = ejabberd_hooks:run_fold(roster_get, To#jid.lserver, [] , [US]), + case roster_hash(RosterItems) of + RequestedVersion -> + {false, false}; + New -> + {lists:map(fun item_to_xml/1, RosterItems), New} + end; + + _ -> + {lists:map(fun item_to_xml/1, + ejabberd_hooks:run_fold(roster_get, To#jid.lserver, [], [US])), false} + end, + IQ#iq{type = result, + sub_el = case {ItemsToSend, VersionToSend} of + {false, false} -> + []; + {Items, false} -> + [{xmlelement, "query", + [{"xmlns", ?NS_ROSTER}], + Items}]; + {Items, Version} -> + [{xmlelement, "query", + [{"xmlns", ?NS_ROSTER}, + {"ver", Version}], + Items}] + end} + catch + _:_ -> + IQ#iq{type = error, sub_el = [SubEl, ?ERR_INTERNAL_SERVER_ERROR]} + end. + + +get_user_roster(Acc, {LUser, LServer}) -> + Items = get_roster(LUser, LServer), + lists:filter(fun(#roster{subscription = none, ask = in}) -> + false; + (_) -> + true + end, Items) ++ Acc. + +get_roster(LUser, LServer) -> + Username = list_to_binary(LUser), + case catch riak_get_roster(LServer, Username) of + {ok, Items} when is_list(Items) -> + JIDGroups = case riak_get_roster_jid_groups(LServer, Username) of + {ok, JGrps} when is_list(JGrps) -> + JGrps; + _ -> + [] + end, + GroupsDict = dict:from_list(JIDGroups), + RItems = lists:flatmap( + fun(I) -> + case raw_to_record(LServer, I) of + %% Bad JID in database: + error -> + []; + R -> + SJID = jlib:jid_to_string(R#roster.jid), + Groups = + case dict:find(SJID, GroupsDict) of + {ok, Gs} -> Gs; + error -> [] + end, + [R#roster{groups = Groups}] + end + end, Items), + RItems; + _ -> + [] + end. + + +item_to_xml(Item) -> + Attrs1 = [{"jid", jlib:jid_to_string(Item#roster.jid)}], + Attrs2 = case Item#roster.name of + "" -> + Attrs1; + Name -> + [{"name", Name} | Attrs1] + end, + Attrs3 = case Item#roster.subscription of + none -> + [{"subscription", "none"} | Attrs2]; + from -> + [{"subscription", "from"} | Attrs2]; + to -> + [{"subscription", "to"} | Attrs2]; + both -> + [{"subscription", "both"} | Attrs2]; + remove -> + [{"subscription", "remove"} | Attrs2] + end, + Attrs = case ask_to_pending(Item#roster.ask) of + out -> + [{"ask", "subscribe"} | Attrs3]; + both -> + [{"ask", "subscribe"} | Attrs3]; + _ -> + Attrs3 + end, + SubEls = lists:map(fun(G) -> + {xmlelement, "group", [], [{xmlcdata, G}]} + end, Item#roster.groups), + {xmlelement, "item", Attrs, SubEls}. + + +process_iq_set(From, To, #iq{sub_el = SubEl} = IQ) -> + {xmlelement, _Name, _Attrs, Els} = SubEl, + lists:foreach(fun(El) -> process_item_set(From, To, El) end, Els), + IQ#iq{type = result, sub_el = []}. + +process_item_set(From, To, {xmlelement, _Name, Attrs, Els}) -> + JID1 = jlib:string_to_jid(xml:get_attr_s("jid", Attrs)), + #jid{user = User, luser = LUser, lserver = LServer} = From, + case JID1 of + error -> + ok; + _ -> + LJID = jlib:jid_tolower(JID1), + Username = list_to_binary(LUser), + SJID = list_to_binary(jlib:jid_to_string(LJID)), + F = fun() -> + Res = riak_get_roster_by_jid(LServer, Username, SJID), + Item = case Res of + {error, _} -> + #roster{usj = {LUser, LServer, LJID}, + us = {LUser, LServer}, + jid = LJID}; + {ok, I} -> + R = raw_to_record(LServer, I), + case R of + %% Bad JID in database: + error -> + #roster{usj = {LUser, LServer, LJID}, + us = {LUser, LServer}, + jid = LJID}; + _ -> + R#roster{ + usj = {LUser, LServer, LJID}, + us = {LUser, LServer}, + jid = LJID, + name = ""} + end + end, + Item1 = process_item_attrs(Item, Attrs), + Item2 = process_item_els(Item1, Els), + case Item2#roster.subscription of + remove -> + riak_del_roster(LServer, Username, SJID); + _ -> + ItemVals = record_to_string(Item2), + ItemGroups = groups_to_binary(Item2), + riak_update_roster(LServer, Username, SJID, ItemVals, ItemGroups) + end, + %% If the item exist in shared roster, take the + %% subscription information from there: + Item3 = ejabberd_hooks:run_fold(roster_process_item, + LServer, Item2, [LServer]), + case roster_version_on_db(LServer) of + true -> + RosterVersion = sha:sha(term_to_binary(now())), + riak_set_roster_version( + LServer, Username, RosterVersion); + false -> ok + end, + {ok, Item, Item3} + end, + case catch F() of + {ok, OldItem, Item} -> + push_item(User, LServer, To, Item), + case Item#roster.subscription of + remove -> + send_unsubscribing_presence(From, OldItem), + ok; + _ -> + ok + end; + E -> + ?ERROR_MSG("ROSTER: roster item set error: ~p~n", [E]), + ok + end + end; +process_item_set(_From, _To, _) -> + ok. + +process_item_attrs(Item, [{Attr, Val} | Attrs]) -> + case Attr of + "jid" -> + case jlib:string_to_jid(Val) of + error -> + process_item_attrs(Item, Attrs); + JID1 -> + JID = {JID1#jid.luser, JID1#jid.lserver, JID1#jid.lresource}, + process_item_attrs(Item#roster{jid = JID}, Attrs) + end; + "name" -> + process_item_attrs(Item#roster{name = Val}, Attrs); + "subscription" -> + case Val of + "remove" -> + process_item_attrs(Item#roster{subscription = remove}, + Attrs); + _ -> + process_item_attrs(Item, Attrs) + end; + "ask" -> + process_item_attrs(Item, Attrs); + _ -> + process_item_attrs(Item, Attrs) + end; +process_item_attrs(Item, []) -> + Item. + + +process_item_els(Item, [{xmlelement, Name, _Attrs, SEls} | Els]) -> + case Name of + "group" -> + Groups = [xml:get_cdata(SEls) | Item#roster.groups], + process_item_els(Item#roster{groups = Groups}, Els); + _ -> + process_item_els(Item, Els) + end; +process_item_els(Item, [{xmlcdata, _} | Els]) -> + process_item_els(Item, Els); +process_item_els(Item, []) -> + Item. + + +push_item(User, Server, From, Item) -> + ejabberd_sm:route(jlib:make_jid("", "", ""), + jlib:make_jid(User, Server, ""), + {xmlelement, "broadcast", [], + [{item, + Item#roster.jid, + Item#roster.subscription}]}), + case roster_versioning_enabled(Server) of + true -> + push_item_version(Server, User, From, Item, roster_version(Server, User)); + false -> + lists:foreach(fun(Resource) -> + push_item(User, Server, Resource, From, Item) + end, ejabberd_sm:get_user_resources(User, Server)) + end. + +% TODO: don't push to those who not load roster +push_item(User, Server, Resource, From, Item) -> + ResIQ = #iq{type = set, xmlns = ?NS_ROSTER, + id = "push" ++ randoms:get_string(), + sub_el = [{xmlelement, "query", + [{"xmlns", ?NS_ROSTER}], + [item_to_xml(Item)]}]}, + ejabberd_router:route( + From, + jlib:make_jid(User, Server, Resource), + jlib:iq_to_xml(ResIQ)). + +%% @doc Roster push, calculate and include the version attribute. +%% TODO: don't push to those who didn't load roster +push_item_version(Server, User, From, Item, RosterVersion) -> + lists:foreach(fun(Resource) -> + push_item_version(User, Server, Resource, From, Item, RosterVersion) + end, ejabberd_sm:get_user_resources(User, Server)). + +push_item_version(User, Server, Resource, From, Item, RosterVersion) -> + IQPush = #iq{type = 'set', xmlns = ?NS_ROSTER, + id = "push" ++ randoms:get_string(), + sub_el = [{xmlelement, "query", + [{"xmlns", ?NS_ROSTER}, + {"ver", RosterVersion}], + [item_to_xml(Item)]}]}, + ejabberd_router:route( + From, + jlib:make_jid(User, Server, Resource), + jlib:iq_to_xml(IQPush)). + +get_subscription_lists(_, User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + Username = list_to_binary(LUser), + case catch riak_get_roster(LServer, Username) of + {ok, Items} when is_list(Items) -> + fill_subscription_lists(LServer, Items, [], []); + _ -> + {[], []} + end. + +fill_subscription_lists(LServer, [RawI | Is], F, T) -> + I = raw_to_record(LServer, RawI), + case I of + %% Bad JID in database: + error -> + fill_subscription_lists(LServer, Is, F, T); + _ -> + J = I#roster.jid, + case I#roster.subscription of + both -> + fill_subscription_lists(LServer, Is, [J | F], [J | T]); + from -> + fill_subscription_lists(LServer, Is, [J | F], T); + to -> + fill_subscription_lists(LServer, Is, F, [J | T]); + _ -> + fill_subscription_lists(LServer, Is, F, T) + end + end; +fill_subscription_lists(_LServer, [], F, T) -> + {F, T}. + +ask_to_pending(subscribe) -> out; +ask_to_pending(unsubscribe) -> none; +ask_to_pending(Ask) -> Ask. + + + +in_subscription(_, User, Server, JID, Type, Reason) -> + process_subscription(in, User, Server, JID, Type, Reason). + +out_subscription(User, Server, JID, Type) -> + process_subscription(out, User, Server, JID, Type, []). + +process_subscription(Direction, User, Server, JID1, Type, Reason) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + LJID = jlib:jid_tolower(JID1), + Username = list_to_binary(LUser), + SJID = list_to_binary(jlib:jid_to_string(LJID)), + F = fun() -> + Item = + case riak_get_roster_by_jid(LServer, Username, SJID) of + {ok, I} -> + %% raw_to_record can return error, but + %% jlib_to_string would fail before this point + R = raw_to_record(LServer, I), + Groups = + case riak_get_roster_groups(LServer, Username, SJID) of + {selected, ["grp"], JGrps} when is_list(JGrps) -> + [JGrp || {JGrp} <- JGrps]; + _ -> + [] + end, + R#roster{groups = Groups}; + {error, _} -> + #roster{usj = {LUser, LServer, LJID}, + us = {LUser, LServer}, + jid = LJID} + end, + NewState = case Direction of + out -> + out_state_change(Item#roster.subscription, + Item#roster.ask, + Type); + in -> + in_state_change(Item#roster.subscription, + Item#roster.ask, + Type) + end, + AutoReply = case Direction of + out -> + none; + in -> + in_auto_reply(Item#roster.subscription, + Item#roster.ask, + Type) + end, + AskMessage = case NewState of + {_, both} -> Reason; + {_, in} -> Reason; + _ -> "" + end, + case NewState of + none -> + {ok, none, AutoReply}; + {none, none} when Item#roster.subscription == none, + Item#roster.ask == in -> + riak_del_roster(LServer, Username, SJID), + {ok, none, AutoReply}; + {Subscription, Pending} -> + NewItem = Item#roster{subscription = Subscription, + ask = Pending, + askmessage = AskMessage}, + ItemVals = record_to_string(NewItem), + riak_roster_subscribe(LServer, Username, SJID, ItemVals), + case roster_version_on_db(LServer) of + true -> + riak_set_roster_version( + LServer, Username, + sha:sha(term_to_binary(now()))); + false -> ok + end, + {ok, {push, NewItem}, AutoReply} + end + end, + case catch F() of + {ok, Push, AutoReply} -> + case AutoReply of + none -> + ok; + _ -> + T = case AutoReply of + subscribed -> "subscribed"; + unsubscribed -> "unsubscribed" + end, + ejabberd_router:route( + jlib:make_jid(User, Server, ""), JID1, + {xmlelement, "presence", [{"type", T}], []}) + end, + case Push of + {push, Item} -> + if + Item#roster.subscription == none, + Item#roster.ask == in -> + ok; + true -> + push_item(User, Server, + jlib:make_jid(User, Server, ""), Item) + end, + true; + none -> + false + end; + E -> + ?ERROR_MSG("subscription error: ~p~n", [E]), + false + end. + + +%% in_state_change(Subscription, Pending, Type) -> NewState +%% NewState = none | {NewSubscription, NewPending} +-ifdef(ROSTER_GATEWAY_WORKAROUND). +-define(NNSD, {to, none}). +-define(NISD, {to, in}). +-else. +-define(NNSD, none). +-define(NISD, none). +-endif. + +in_state_change(none, none, subscribe) -> {none, in}; +in_state_change(none, none, subscribed) -> ?NNSD; +in_state_change(none, none, unsubscribe) -> none; +in_state_change(none, none, unsubscribed) -> none; +in_state_change(none, out, subscribe) -> {none, both}; +in_state_change(none, out, subscribed) -> {to, none}; +in_state_change(none, out, unsubscribe) -> none; +in_state_change(none, out, unsubscribed) -> {none, none}; +in_state_change(none, in, subscribe) -> none; +in_state_change(none, in, subscribed) -> ?NISD; +in_state_change(none, in, unsubscribe) -> {none, none}; +in_state_change(none, in, unsubscribed) -> none; +in_state_change(none, both, subscribe) -> none; +in_state_change(none, both, subscribed) -> {to, in}; +in_state_change(none, both, unsubscribe) -> {none, out}; +in_state_change(none, both, unsubscribed) -> {none, in}; +in_state_change(to, none, subscribe) -> {to, in}; +in_state_change(to, none, subscribed) -> none; +in_state_change(to, none, unsubscribe) -> none; +in_state_change(to, none, unsubscribed) -> {none, none}; +in_state_change(to, in, subscribe) -> none; +in_state_change(to, in, subscribed) -> none; +in_state_change(to, in, unsubscribe) -> {to, none}; +in_state_change(to, in, unsubscribed) -> {none, in}; +in_state_change(from, none, subscribe) -> none; +in_state_change(from, none, subscribed) -> {both, none}; +in_state_change(from, none, unsubscribe) -> {none, none}; +in_state_change(from, none, unsubscribed) -> none; +in_state_change(from, out, subscribe) -> none; +in_state_change(from, out, subscribed) -> {both, none}; +in_state_change(from, out, unsubscribe) -> {none, out}; +in_state_change(from, out, unsubscribed) -> {from, none}; +in_state_change(both, none, subscribe) -> none; +in_state_change(both, none, subscribed) -> none; +in_state_change(both, none, unsubscribe) -> {to, none}; +in_state_change(both, none, unsubscribed) -> {from, none}. + +out_state_change(none, none, subscribe) -> {none, out}; +out_state_change(none, none, subscribed) -> none; +out_state_change(none, none, unsubscribe) -> none; +out_state_change(none, none, unsubscribed) -> none; +out_state_change(none, out, subscribe) -> {none, out}; %% We need to resend query (RFC3921, section 9.2) +out_state_change(none, out, subscribed) -> none; +out_state_change(none, out, unsubscribe) -> {none, none}; +out_state_change(none, out, unsubscribed) -> none; +out_state_change(none, in, subscribe) -> {none, both}; +out_state_change(none, in, subscribed) -> {from, none}; +out_state_change(none, in, unsubscribe) -> none; +out_state_change(none, in, unsubscribed) -> {none, none}; +out_state_change(none, both, subscribe) -> none; +out_state_change(none, both, subscribed) -> {from, out}; +out_state_change(none, both, unsubscribe) -> {none, in}; +out_state_change(none, both, unsubscribed) -> {none, out}; +out_state_change(to, none, subscribe) -> none; +out_state_change(to, none, subscribed) -> {both, none}; +out_state_change(to, none, unsubscribe) -> {none, none}; +out_state_change(to, none, unsubscribed) -> none; +out_state_change(to, in, subscribe) -> none; +out_state_change(to, in, subscribed) -> {both, none}; +out_state_change(to, in, unsubscribe) -> {none, in}; +out_state_change(to, in, unsubscribed) -> {to, none}; +out_state_change(from, none, subscribe) -> {from, out}; +out_state_change(from, none, subscribed) -> none; +out_state_change(from, none, unsubscribe) -> none; +out_state_change(from, none, unsubscribed) -> {none, none}; +out_state_change(from, out, subscribe) -> none; +out_state_change(from, out, subscribed) -> none; +out_state_change(from, out, unsubscribe) -> {from, none}; +out_state_change(from, out, unsubscribed) -> {none, out}; +out_state_change(both, none, subscribe) -> none; +out_state_change(both, none, subscribed) -> none; +out_state_change(both, none, unsubscribe) -> {from, none}; +out_state_change(both, none, unsubscribed) -> {to, none}. + +in_auto_reply(from, none, subscribe) -> subscribed; +in_auto_reply(from, out, subscribe) -> subscribed; +in_auto_reply(both, none, subscribe) -> subscribed; +in_auto_reply(none, in, unsubscribe) -> unsubscribed; +in_auto_reply(none, both, unsubscribe) -> unsubscribed; +in_auto_reply(to, in, unsubscribe) -> unsubscribed; +in_auto_reply(from, none, unsubscribe) -> unsubscribed; +in_auto_reply(from, out, unsubscribe) -> unsubscribed; +in_auto_reply(both, none, unsubscribe) -> unsubscribed; +in_auto_reply(_, _, _) -> none. + + +remove_user(User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + Username = list_to_binary(LUser), + send_unsubscription_to_rosteritems(LUser, LServer), + riak_del_user_roster(LServer, Username), + ok. + +%% For each contact with Subscription: +%% Both or From, send a "unsubscribed" presence stanza; +%% Both or To, send a "unsubscribe" presence stanza. +send_unsubscription_to_rosteritems(LUser, LServer) -> + RosterItems = get_user_roster([], {LUser, LServer}), + From = jlib:make_jid({LUser, LServer, ""}), + lists:foreach(fun(RosterItem) -> + send_unsubscribing_presence(From, RosterItem) + end, + RosterItems). + +%% @spec (From::jid(), Item::roster()) -> ok +send_unsubscribing_presence(From, Item) -> + IsTo = case Item#roster.subscription of + both -> true; + to -> true; + _ -> false + end, + IsFrom = case Item#roster.subscription of + both -> true; + from -> true; + _ -> false + end, + if IsTo -> + send_presence_type( + jlib:jid_remove_resource(From), + jlib:make_jid(Item#roster.jid), "unsubscribe"); + true -> ok + end, + if IsFrom -> + send_presence_type( + jlib:jid_remove_resource(From), + jlib:make_jid(Item#roster.jid), "unsubscribed"); + true -> ok + end, + ok. + +send_presence_type(From, To, Type) -> + ejabberd_router:route( + From, To, + {xmlelement, "presence", + [{"type", Type}], + []}). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +set_items(User, Server, SubEl) -> + {xmlelement, _Name, _Attrs, Els} = SubEl, + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + lists:foreach(fun(El) -> + process_item_set_t(LUser, LServer, El) + end, Els). + +process_item_set_t(LUser, LServer, {xmlelement, _Name, Attrs, Els}) -> + JID1 = jlib:string_to_jid(xml:get_attr_s("jid", Attrs)), + case JID1 of + error -> + []; + _ -> + LJID = {JID1#jid.luser, JID1#jid.lserver, JID1#jid.lresource}, + Username = list_to_binary(LUser), + SJID = list_to_binary(jlib:jid_to_string(LJID)), + Item = #roster{usj = {LUser, LServer, LJID}, + us = {LUser, LServer}, + jid = LJID}, + Item1 = process_item_attrs_ws(Item, Attrs), + Item2 = process_item_els(Item1, Els), + case Item2#roster.subscription of + remove -> + riak_del_roster(LServer, Username, SJID); + _ -> + ItemVals = record_to_string(Item1), + ItemGroups = groups_to_binary(Item2), + riak_update_roster( + LServer, Username, SJID, ItemVals, ItemGroups) + end + end; +process_item_set_t(_LUser, _LServer, _) -> + []. + +process_item_attrs_ws(Item, [{Attr, Val} | Attrs]) -> + case Attr of + "jid" -> + case jlib:string_to_jid(Val) of + error -> + process_item_attrs_ws(Item, Attrs); + JID1 -> + JID = {JID1#jid.luser, JID1#jid.lserver, JID1#jid.lresource}, + process_item_attrs_ws(Item#roster{jid = JID}, Attrs) + end; + "name" -> + process_item_attrs_ws(Item#roster{name = Val}, Attrs); + "subscription" -> + case Val of + "remove" -> + process_item_attrs_ws(Item#roster{subscription = remove}, + Attrs); + "none" -> + process_item_attrs_ws(Item#roster{subscription = none}, + Attrs); + "both" -> + process_item_attrs_ws(Item#roster{subscription = both}, + Attrs); + "from" -> + process_item_attrs_ws(Item#roster{subscription = from}, + Attrs); + "to" -> + process_item_attrs_ws(Item#roster{subscription = to}, + Attrs); + _ -> + process_item_attrs_ws(Item, Attrs) + end; + "ask" -> + process_item_attrs_ws(Item, Attrs); + _ -> + process_item_attrs_ws(Item, Attrs) + end; +process_item_attrs_ws(Item, []) -> + Item. + +get_in_pending_subscriptions(Ls, User, Server) -> + JID = jlib:make_jid(User, Server, ""), + LUser = JID#jid.luser, + LServer = JID#jid.lserver, + Username = list_to_binary(LUser), + case catch riak_get_roster(LServer, Username) of + {ok, Items} when is_list(Items) -> + Ls ++ lists:map( + fun(R) -> + Message = R#roster.askmessage, + {xmlelement, "presence", + [{"from", jlib:jid_to_string(R#roster.jid)}, + {"to", jlib:jid_to_string(JID)}, + {"type", "subscribe"}], + [{xmlelement, "status", [], + [{xmlcdata, Message}]}]} + end, + lists:flatmap( + fun(I) -> + case raw_to_record(LServer, I) of + %% Bad JID in database: + error -> + []; + R -> + case R#roster.ask of + in -> [R]; + both -> [R]; + _ -> [] + end + end + end, + Items)); + _ -> + Ls + end. + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +get_jid_info(_, User, Server, JID) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + LJID = jlib:jid_tolower(JID), + Username = list_to_binary(LUser), + SJID = list_to_binary(jlib:jid_to_string(LJID)), + case catch riak_get_subscription(LServer, Username, SJID) of + {ok, Subscription} -> + Groups = + case catch riak_get_roster_groups(LServer, Username, SJID) of + {ok, JGrps} when is_list(JGrps) -> + lists:map(fun binary_to_list/1, JGrps); + _ -> + [] + end, + {Subscription, Groups}; + _ -> + LRJID = jlib:jid_tolower(jlib:jid_remove_resource(JID)), + if + LRJID == LJID -> + {none, []}; + true -> + SRJID = list_to_binary(jlib:jid_to_string(LRJID)), + case catch riak_get_subscription(LServer, Username, SRJID) of + {ok, Subscription} -> + Groups = case catch riak_get_roster_groups(LServer, Username, SRJID) of + {ok, JGrps} when is_list(JGrps) -> + lists:map(fun binary_to_list/1, + JGrps); + _ -> + [] + end, + {Subscription, Groups}; + _ -> + {none, []} + end + end + end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +raw_to_record(LServer, + <>) -> + User = binary_to_list(Username), + case jlib:string_to_jid(binary_to_list(SJID)) of + error -> + error; + JID -> + LJID = jlib:jid_tolower(JID), + Subscription = case SSubscription of + $B -> both; + $T -> to; + $F -> from; + _ -> none + end, + Ask = case SAsk of + $S -> subscribe; + $U -> unsubscribe; + $B -> both; + $O -> out; + $I -> in; + _ -> none + end, + #roster{usj = {User, LServer, LJID}, + us = {User, LServer}, + jid = LJID, + name = binary_to_list(Nick), + subscription = Subscription, + ask = Ask, + askmessage = SAskMessage} + end. + +record_to_string(#roster{us = {User, _Server}, + jid = JID, + name = Name, + subscription = Subscription, + ask = Ask, + askmessage = AskMessage}) -> + Username = list_to_binary(User), + UsernameLen = size(Username), + SJID = list_to_binary(jlib:jid_to_string(jlib:jid_tolower(JID))), + SJIDLen = size(SJID), + Nick = list_to_binary(Name), + NickLen = size(Nick), + SSubscription = case Subscription of + both -> $B; + to -> $T; + from -> $F; + none -> $N + end, + SAsk = case Ask of + subscribe -> $S; + unsubscribe -> $U; + both -> $B; + out -> $O; + in -> $I; + none -> $N + end, + SAskMessage = iolist_to_binary(AskMessage), + SAskMessageLen = size(SAskMessage), + <>. + +groups_to_binary(#roster{jid = JID, groups = Groups}) -> + SJID = list_to_binary(jlib:jid_to_string(jlib:jid_tolower(JID))), + SJIDLen = size(SJID), + %% Empty groups do not need to be converted to string to be inserted in + %% the database + lists:foldl( + fun([], Acc) -> + Acc; + (Group, Acc) -> + G = list_to_binary(Group), + Len = size(G), + <> + end, <>, Groups). + +binary_to_groups(<>) -> + {binary_to_list(SJID), binary_to_groups(Rest, [])}. + +binary_to_groups(<>, Res) -> + binary_to_groups(Rest, [G | Res]); +binary_to_groups(_, Res) -> + Res. + + +webadmin_page(_, Host, + #request{us = _US, + path = ["user", U, "roster"], + q = Query, + lang = Lang} = _Request) -> + Res = user_roster(U, Host, Query, Lang), + {stop, Res}; + +webadmin_page(Acc, _, _) -> Acc. + +user_roster(User, Server, Query, Lang) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + US = {LUser, LServer}, + Items1 = get_roster(LUser, LServer), + Res = user_roster_parse_query(User, Server, Items1, Query), + Items = get_roster(LUser, LServer), + SItems = lists:sort(Items), + FItems = + case SItems of + [] -> + [?CT("None")]; + _ -> + [?XE("table", + [?XE("thead", + [?XE("tr", + [?XCT("td", "Jabber ID"), + ?XCT("td", "Nickname"), + ?XCT("td", "Subscription"), + ?XCT("td", "Pending"), + ?XCT("td", "Groups") + ])]), + ?XE("tbody", + lists:map( + fun(R) -> + Groups = + lists:flatmap( + fun(Group) -> + [?C(Group), ?BR] + end, R#roster.groups), + Pending = ask_to_pending(R#roster.ask), + TDJID = build_contact_jid_td(R#roster.jid), + ?XE("tr", + [TDJID, + ?XAC("td", [{"class", "valign"}], + R#roster.name), + ?XAC("td", [{"class", "valign"}], + atom_to_list(R#roster.subscription)), + ?XAC("td", [{"class", "valign"}], + atom_to_list(Pending)), + ?XAE("td", [{"class", "valign"}], Groups), + if + Pending == in -> + ?XAE("td", [{"class", "valign"}], + [?INPUTT("submit", + "validate" ++ + ejabberd_web_admin:term_to_id(R#roster.jid), + "Validate")]); + true -> + ?X("td") + end, + ?XAE("td", [{"class", "valign"}], + [?INPUTT("submit", + "remove" ++ + ejabberd_web_admin:term_to_id(R#roster.jid), + "Remove")])]) + end, SItems))])] + end, + [?XC("h1", ?T("Roster of ") ++ us_to_list(US))] ++ + case Res of + ok -> [?XREST("Submitted")]; + error -> [?XREST("Bad format")]; + nothing -> [] + end ++ + [?XAE("form", [{"action", ""}, {"method", "post"}], + FItems ++ + [?P, + ?INPUT("text", "newjid", ""), ?C(" "), + ?INPUTT("submit", "addjid", "Add Jabber ID") + ])]. + +build_contact_jid_td(RosterJID) -> + %% Convert {U, S, R} into {jid, U, S, R, U, S, R}: + ContactJID = jlib:make_jid(RosterJID), + JIDURI = case {ContactJID#jid.luser, ContactJID#jid.lserver} of + {"", _} -> ""; + {CUser, CServer} -> + case lists:member(CServer, ?MYHOSTS) of + false -> ""; + true -> "/admin/server/" ++ CServer ++ "/user/" ++ CUser ++ "/" + end + end, + case JIDURI of + [] -> + ?XAC("td", [{"class", "valign"}], jlib:jid_to_string(RosterJID)); + URI when is_list(URI) -> + ?XAE("td", [{"class", "valign"}], [?AC(JIDURI, jlib:jid_to_string(RosterJID))]) + end. + +user_roster_parse_query(User, Server, Items, Query) -> + case lists:keysearch("addjid", 1, Query) of + {value, _} -> + case lists:keysearch("newjid", 1, Query) of + {value, {_, undefined}} -> + error; + {value, {_, SJID}} -> + case jlib:string_to_jid(SJID) of + JID when is_record(JID, jid) -> + user_roster_subscribe_jid(User, Server, JID), + ok; + error -> + error + end; + false -> + error + end; + false -> + case catch user_roster_item_parse_query( + User, Server, Items, Query) of + submitted -> + ok; + {'EXIT', _Reason} -> + error; + _ -> + nothing + end + end. + + +user_roster_subscribe_jid(User, Server, JID) -> + out_subscription(User, Server, JID, subscribe), + UJID = jlib:make_jid(User, Server, ""), + ejabberd_router:route( + UJID, JID, {xmlelement, "presence", [{"type", "subscribe"}], []}). + +user_roster_item_parse_query(User, Server, Items, Query) -> + lists:foreach( + fun(R) -> + JID = R#roster.jid, + case lists:keysearch( + "validate" ++ ejabberd_web_admin:term_to_id(JID), 1, Query) of + {value, _} -> + JID1 = jlib:make_jid(JID), + out_subscription( + User, Server, JID1, subscribed), + UJID = jlib:make_jid(User, Server, ""), + ejabberd_router:route( + UJID, JID1, {xmlelement, "presence", + [{"type", "subscribed"}], []}), + throw(submitted); + false -> + case lists:keysearch( + "remove" ++ ejabberd_web_admin:term_to_id(JID), 1, Query) of + {value, _} -> + UJID = jlib:make_jid(User, Server, ""), + process_iq( + UJID, UJID, + #iq{type = set, + sub_el = {xmlelement, "query", + [{"xmlns", ?NS_ROSTER}], + [{xmlelement, "item", + [{"jid", jlib:jid_to_string(JID)}, + {"subscription", "remove"}], + []}]}}), + throw(submitted); + false -> + ok + end + + end + end, Items), + nothing. + +us_to_list({User, Server}) -> + jlib:jid_to_string({User, Server, ""}). + +webadmin_user(Acc, _User, _Server, Lang) -> + Acc ++ [?XE("h3", [?ACT("roster/", "Roster")])]. + + +riak_get_roster(LServer, Username) -> + ejabberd_riak:get_by_index( + LServer, <<"roster">>, <<"user_bin">>, Username). + +riak_get_roster_jid_groups(LServer, Username) -> + case ejabberd_riak:get_by_index( + LServer, <<"roster_groups">>, <<"user_bin">>, Username) of + {ok, JGs} -> + Res = lists:map(fun binary_to_groups/1, JGs), + {ok, Res}; + Error -> Error + end. + +riak_get_roster_groups(LServer, Username, SJID) -> + Key = <>, + case ejabberd_riak:get(LServer, <<"roster_groups">>, Key) of + {ok, Gs} -> + {_, Res} = binary_to_groups(Gs), + {ok, Res}; + {error, notfound} -> + {ok, []}; + Error -> Error + end. + +riak_get_roster_by_jid(LServer, Username, SJID) -> + Key = <>, + ejabberd_riak:get(LServer, <<"roster">>, Key). + +riak_del_roster(LServer, Username, SJID) -> + Key = <>, + ejabberd_riak:delete(LServer, <<"roster">>, Key). + +riak_update_roster(LServer, Username, SJID, ItemVals, ItemGroups) -> + Key = <>, + ejabberd_riak:put( + LServer, <<"roster">>, Key, ItemVals, + [{<<"user_bin">>, Username}]), + ejabberd_riak:put( + LServer, <<"roster_groups">>, Key, ItemGroups, + [{<<"user_bin">>, Username}]). + +riak_roster_subscribe(LServer, Username, SJID, ItemVals) -> + Key = <>, + ejabberd_riak:put( + LServer, <<"roster">>, Key, ItemVals, + [{<<"user_bin">>, Username}]). + +riak_get_subscription(LServer, Username, SJID) -> + case riak_get_roster_by_jid(LServer, Username, SJID) of + {ok, SR} -> + case raw_to_record(LServer, SR) of + error -> + {error, bad_record}; + R -> + {ok, R#roster.subscription} + end; + Error -> + Error + end. + +riak_set_roster_version(LServer, Username, RosterVersion) -> + ejabberd_riak:put(LServer, <<"roster_version">>, + Username, list_to_binary(RosterVersion)). + + +riak_del_user_roster(LServer, Username) -> + case ejabberd_riak:get_keys_by_index( + LServer, <<"roster">>, <<"user_bin">>, Username) of + {ok, Keys} -> + lists:foreach( + fun(Key) -> + ejabberd_riak:delete(LServer, <<"roster">>, Key) + end, Keys); + _ -> + ok + end, + case ejabberd_riak:get_keys_by_index( + LServer, <<"roster_groups">>, <<"user_bin">>, Username) of + {ok, GKeys} -> + lists:foreach( + fun(Key) -> + ejabberd_riak:delete(LServer, <<"roster_groups">>, Key) + end, GKeys); + _ -> + ok + end, + ejabberd_riak:delete(LServer, <<"roster_version">>, Username). + diff --git a/src/mod_vcard_riak.erl b/src/mod_vcard_riak.erl new file mode 100644 index 000000000..44ce35ff2 --- /dev/null +++ b/src/mod_vcard_riak.erl @@ -0,0 +1,209 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_vcard_riak.erl +%%% Author : Alexey Shchepin +%%% Purpose : vCard support via Riak +%%% Created : 6 Jan 2012 by Alexey Shchepin +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2011 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License +%%% along with this program; if not, write to the Free Software +%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA +%%% 02111-1307 USA +%%% +%%%---------------------------------------------------------------------- + +-module(mod_vcard_riak). +-author('alexey@process-one.net'). + +-behaviour(gen_mod). + +-export([start/2, stop/1, + get_sm_features/5, + process_local_iq/3, + process_sm_iq/3, + remove_user/2]). + +-include("ejabberd.hrl"). +-include("jlib.hrl"). + + +-define(JUD_MATCHES, 30). +-define(PROCNAME, ejabberd_mod_vcard). + +start(Host, Opts) -> + ejabberd_hooks:add(remove_user, Host, + ?MODULE, remove_user, 50), + IQDisc = gen_mod:get_opt(iqdisc, Opts, one_queue), + gen_iq_handler:add_iq_handler(ejabberd_local, Host, ?NS_VCARD, + ?MODULE, process_local_iq, IQDisc), + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_VCARD, + ?MODULE, process_sm_iq, IQDisc), + ejabberd_hooks:add(disco_sm_features, Host, ?MODULE, get_sm_features, 50), + ok. + +stop(Host) -> + ejabberd_hooks:delete(remove_user, Host, + ?MODULE, remove_user, 50), + gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_VCARD), + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_VCARD), + ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, get_sm_features, 50), + ok. + +get_sm_features({error, _Error} = Acc, _From, _To, _Node, _Lang) -> + Acc; + +get_sm_features(Acc, _From, _To, Node, _Lang) -> + case Node of + [] -> + case Acc of + {result, Features} -> + {result, [?NS_VCARD | Features]}; + empty -> + {result, [?NS_VCARD]} + end; + _ -> + Acc + end. + +process_local_iq(_From, _To, #iq{type = Type, lang = Lang, sub_el = SubEl} = IQ) -> + case Type of + set -> + IQ#iq{type = error, sub_el = [SubEl, ?ERR_NOT_ALLOWED]}; + get -> + IQ#iq{type = result, + sub_el = [{xmlelement, "vCard", + [{"xmlns", ?NS_VCARD}], + [{xmlelement, "FN", [], + [{xmlcdata, "ejabberd"}]}, + {xmlelement, "URL", [], + [{xmlcdata, ?EJABBERD_URI}]}, + {xmlelement, "DESC", [], + [{xmlcdata, + translate:translate( + Lang, + "Erlang Jabber Server") ++ + "\nCopyright (c) 2002-2011 ProcessOne"}]}, + {xmlelement, "BDAY", [], + [{xmlcdata, "2002-11-16"}]} + ]}]} + end. + + +process_sm_iq(From, To, #iq{type = Type, sub_el = SubEl} = IQ) -> + case Type of + set -> + #jid{user = User, lserver = LServer} = From, + case lists:member(LServer, ?MYHOSTS) of + true -> + set_vcard(User, LServer, SubEl), + IQ#iq{type = result, sub_el = []}; + false -> + IQ#iq{type = error, sub_el = [SubEl, ?ERR_NOT_ALLOWED]} + end; + get -> + #jid{luser = LUser, lserver = LServer} = To, + Username = list_to_binary(LUser), + case catch ejabberd_riak:get(LServer, <<"vcard">>, Username) of + {ok, SVCARD} -> + case xml_stream:parse_element(SVCARD) of + {error, _Reason} -> + IQ#iq{type = error, + sub_el = [SubEl, ?ERR_SERVICE_UNAVAILABLE]}; + VCARD -> + IQ#iq{type = result, sub_el = [VCARD]} + end; + {error, notfound} -> + IQ#iq{type = result, sub_el = []}; + _ -> + IQ#iq{type = error, + sub_el = [SubEl, ?ERR_INTERNAL_SERVER_ERROR]} + end + end. + +set_vcard(User, LServer, VCARD) -> + FN = xml:get_path_s(VCARD, [{elem, "FN"}, cdata]), + Family = xml:get_path_s(VCARD, [{elem, "N"}, {elem, "FAMILY"}, cdata]), + Given = xml:get_path_s(VCARD, [{elem, "N"}, {elem, "GIVEN"}, cdata]), + Middle = xml:get_path_s(VCARD, [{elem, "N"}, {elem, "MIDDLE"}, cdata]), + Nickname = xml:get_path_s(VCARD, [{elem, "NICKNAME"}, cdata]), + BDay = xml:get_path_s(VCARD, [{elem, "BDAY"}, cdata]), + CTRY = xml:get_path_s(VCARD, [{elem, "ADR"}, {elem, "CTRY"}, cdata]), + Locality = xml:get_path_s(VCARD, [{elem, "ADR"}, {elem, "LOCALITY"},cdata]), + EMail1 = xml:get_path_s(VCARD, [{elem, "EMAIL"}, {elem, "USERID"},cdata]), + EMail2 = xml:get_path_s(VCARD, [{elem, "EMAIL"}, cdata]), + OrgName = xml:get_path_s(VCARD, [{elem, "ORG"}, {elem, "ORGNAME"}, cdata]), + OrgUnit = xml:get_path_s(VCARD, [{elem, "ORG"}, {elem, "ORGUNIT"}, cdata]), + EMail = case EMail1 of + "" -> + EMail2; + _ -> + EMail1 + end, + + LUser = jlib:nodeprep(User), + LFN = stringprep:tolower(FN), + LFamily = stringprep:tolower(Family), + LGiven = stringprep:tolower(Given), + LMiddle = stringprep:tolower(Middle), + LNickname = stringprep:tolower(Nickname), + LBDay = stringprep:tolower(BDay), + LCTRY = stringprep:tolower(CTRY), + LLocality = stringprep:tolower(Locality), + LEMail = stringprep:tolower(EMail), + LOrgName = stringprep:tolower(OrgName), + LOrgUnit = stringprep:tolower(OrgUnit), + + if + (LUser == error) or + (LFN == error) or + (LFamily == error) or + (LGiven == error) or + (LMiddle == error) or + (LNickname == error) or + (LBDay == error) or + (LCTRY == error) or + (LLocality == error) or + (LEMail == error) or + (LOrgName == error) or + (LOrgUnit == error) -> + {error, badarg}; + true -> + Username = list_to_binary(LUser), + SVCARD = xml:element_to_binary(VCARD), + + ejabberd_riak:put( + LServer, <<"vcard">>, Username, SVCARD, + [{<<"bday_bin">>, list_to_binary(LBDay)}, + {<<"ctry_bin">>, list_to_binary(LCTRY)}, + {<<"email_bin">>, list_to_binary(LEMail)}, + {<<"fn_bin">>, list_to_binary(LFN)}, + {<<"family_bin">>, list_to_binary(LFamily)}, + {<<"given_bin">>, list_to_binary(LGiven)}, + {<<"locality_bin">>, list_to_binary(LLocality)}, + {<<"middle_bin">>, list_to_binary(LMiddle)}, + {<<"nickname_bin">>, list_to_binary(LNickname)}, + {<<"orgname_bin">>, list_to_binary(LOrgName)}, + {<<"orgunit_bin">>, list_to_binary(LOrgUnit)}, + {<<"user_bin">>, Username}]), + + ejabberd_hooks:run(vcard_set, LServer, [LUser, LServer, VCARD]) + end. + +remove_user(User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + Username = list_to_binary(LUser), + ejabberd_riak:delete(LServer, <<"vcard">>, Username), + ok.