Keep info about carbons inside session table

Accordingly, Mnesia/SQL/Riak table 'carboncopy' is not used anymore
and can be safely removed.

As a consequence, the commit deprecates the following options of
mod_carboncopy:
- ram_db_type
- use_cache
- cache_size
- cache_missed
- cache_life_time

Fixes #2663
This commit is contained in:
Evgeny Khramtsov 2018-12-01 13:33:44 +03:00
parent a9539fef22
commit 109ed8f2f6
14 changed files with 93 additions and 690 deletions

View File

@ -1,27 +0,0 @@
%%%----------------------------------------------------------------------
%%%
%%% ejabberd, Copyright (C) 2002-2018 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%----------------------------------------------------------------------
-type matchspec_atom() :: '_' | '$1' | '$2' | '$3' | '$4'.
-record(carboncopy, {us :: {binary(), binary()} | matchspec_atom(),
resource :: binary() | matchspec_atom(),
version :: binary() | matchspec_atom(),
node = node() :: node() | matchspec_atom()}).
-define(CARBONCOPY_CACHE, carboncopy_cache).

View File

@ -387,17 +387,6 @@ CREATE TABLE bosh (
CREATE UNIQUE INDEX i_bosh_sid ON bosh(sid);
CREATE TABLE carboncopy (
username text NOT NULL,
server_host text NOT NULL,
resource text NOT NULL,
namespace text NOT NULL,
node text NOT NULL,
PRIMARY KEY (server_host, username, resource)
);
CREATE INDEX i_carboncopy_sh_user ON carboncopy (server_host, username);
CREATE TABLE proxy65 (
sid text NOT NULL,
pid_t text NOT NULL,

View File

@ -357,16 +357,6 @@ CREATE TABLE bosh (
CREATE UNIQUE INDEX i_bosh_sid ON bosh(sid);
CREATE TABLE carboncopy (
username text NOT NULL,
resource text NOT NULL,
namespace text NOT NULL,
node text NOT NULL
);
CREATE UNIQUE INDEX i_carboncopy_ur ON carboncopy (username, resource);
CREATE INDEX i_carboncopy_user ON carboncopy (username);
CREATE TABLE proxy65 (
sid text NOT NULL,
pid_t text NOT NULL,

View File

@ -531,19 +531,6 @@ CREATE TABLE [dbo].[bosh] (
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON)
);
CREATE TABLE [dbo].[carboncopy] (
[username] [varchar] (255) NOT NULL,
[resource] [varchar] (255) NOT NULL,
[namespace] [varchar] (255) NOT NULL,
[node] [varchar] (255) NOT NULL
);
CREATE UNIQUE CLUSTERED INDEX [carboncopy_ur] ON [carboncopy] (username, resource)
WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON);
CREATE INDEX [carboncopy_user] ON [carboncopy] (username)
WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON);
CREATE TABLE [dbo].[push_session] (
[username] [varchar] (255) NOT NULL,
[timestamp] [bigint] NOT NULL,

View File

@ -403,17 +403,6 @@ CREATE TABLE bosh (
CREATE UNIQUE INDEX i_bosh_sid ON bosh(sid(75));
CREATE TABLE carboncopy (
username text NOT NULL,
server_host text NOT NULL,
resource text NOT NULL,
namespace text NOT NULL,
node text NOT NULL,
PRIMARY KEY (server_host(191), username(191), resource(191))
) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE INDEX i_carboncopy_sh_user ON carboncopy (server_host(191), username(75));
CREATE TABLE proxy65 (
sid text NOT NULL,
pid_t text NOT NULL,

View File

@ -373,16 +373,6 @@ CREATE TABLE bosh (
CREATE UNIQUE INDEX i_bosh_sid ON bosh(sid(75));
CREATE TABLE carboncopy (
username text NOT NULL,
resource text NOT NULL,
namespace text NOT NULL,
node text NOT NULL
) ENGINE=InnoDB CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE UNIQUE INDEX i_carboncopy_ur ON carboncopy (username(75), resource(75));
CREATE INDEX i_carboncopy_user ON carboncopy (username(75));
CREATE TABLE proxy65 (
sid text NOT NULL,
pid_t text NOT NULL,

View File

@ -156,13 +156,6 @@
-- CREATE INDEX i_sm_sh_username ON sm USING btree (server_host, username);
-- ALTER TABLE sm ALTER COLUMN server_host DROP DEFAULT;
-- ALTER TABLE carboncopy ADD COLUMN server_host text NOT NULL DEFAULT '<HOST>';
-- DROP INDEX i_carboncopy_ur;
-- DROP INDEX i_carboncopy_user;
-- ALTER TABLE carboncopy ADD PRIMARY KEY (server_host, username, resource);
-- CREATE INDEX i_carboncopy_sh_user ON carboncopy USING btree (server_host, username);
-- ALTER TABLE carboncopy ALTER COLUMN server_host DROP DEFAULT;
CREATE TABLE users (
username text NOT NULL,
@ -555,17 +548,6 @@ CREATE TABLE bosh (
CREATE UNIQUE INDEX i_bosh_sid ON bosh USING btree (sid);
CREATE TABLE carboncopy (
username text NOT NULL,
server_host text NOT NULL,
resource text NOT NULL,
namespace text NOT NULL,
node text NOT NULL,
PRIMARY KEY (server_host, username, resource)
);
CREATE INDEX i_carboncopy_sh_user ON carboncopy USING btree (server_host, username);
CREATE TABLE proxy65 (
sid text NOT NULL,
pid_t text NOT NULL,

View File

@ -377,16 +377,6 @@ CREATE TABLE bosh (
CREATE UNIQUE INDEX i_bosh_sid ON bosh USING btree (sid);
CREATE TABLE carboncopy (
username text NOT NULL,
resource text NOT NULL,
namespace text NOT NULL,
node text NOT NULL
);
CREATE UNIQUE INDEX i_carboncopy_ur ON carboncopy USING btree (username, resource);
CREATE INDEX i_carboncopy_user ON carboncopy USING btree (username);
CREATE TABLE proxy65 (
sid text NOT NULL,
pid_t text NOT NULL,

View File

@ -66,6 +66,8 @@
get_session_sids/2,
get_user_info/2,
get_user_info/3,
set_user_info/5,
del_user_info/4,
get_user_ip/3,
get_max_user_sessions/2,
get_all_pids/0,
@ -266,6 +268,44 @@ get_user_info(User, Server, Resource) ->
|Session#session.info]
end.
-spec set_user_info(binary(), binary(), binary(), atom(), term()) -> ok | {error, any()}.
set_user_info(User, Server, Resource, Key, Val) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
case get_sessions(Mod, LUser, LServer, LResource) of
[] -> {error, notfound};
Ss ->
lists:foldl(
fun(#session{sid = {_, Pid},
info = Info} = Session, _) when Pid == self() ->
Info1 = lists:keystore(Key, 1, Info, {Key, Val}),
set_session(Session#session{info = Info1});
(_, Acc) ->
Acc
end, {error, not_owner}, Ss)
end.
-spec del_user_info(binary(), binary(), binary(), atom()) -> ok | {error, any()}.
del_user_info(User, Server, Resource, Key) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
case get_sessions(Mod, LUser, LServer, LResource) of
[] -> {error, notfound};
Ss ->
lists:foldl(
fun(#session{sid = {_, Pid},
info = Info} = Session, _) when Pid == self() ->
Info1 = lists:keydelete(Key, 1, Info),
set_session(Session#session{info = Info1});
(_, Acc) ->
Acc
end, {error, not_owner}, Ss)
end.
-spec set_presence(sid(), binary(), binary(), binary(),
prio(), presence(), info()) -> ok.
@ -483,9 +523,13 @@ set_session(SID, User, Server, Resource, Priority, Info) ->
LResource = jid:resourceprep(Resource),
US = {LUser, LServer},
USR = {LUser, LServer, LResource},
set_session(#session{sid = SID, usr = USR, us = US,
priority = Priority, info = Info}).
-spec set_session(#session{}) -> ok | {error, any()}.
set_session(#session{us = {LUser, LServer}} = Session) ->
Mod = get_sm_backend(LServer),
case Mod:set_session(#session{sid = SID, usr = USR, us = US,
priority = Priority, info = Info}) of
case Mod:set_session(Session) of
ok ->
case use_cache(Mod, LServer) of
true ->

View File

@ -35,38 +35,25 @@
-export([start/2, stop/1, reload/3]).
-export([user_send_packet/1, user_receive_packet/1,
iq_handler/1, remove_connection/4, disco_features/5,
is_carbon_copy/1, mod_opt_type/1, depends/2, clean_cache/1,
iq_handler/1, disco_features/5,
is_carbon_copy/1, mod_opt_type/1, depends/2,
mod_options/1]).
%% For debugging purposes
-export([list/2]).
-include("logger.hrl").
-include("xmpp.hrl").
-include("mod_carboncopy.hrl").
-type direction() :: sent | received.
-callback init(binary(), gen_mod:opts()) -> any().
-callback enable(binary(), binary(), binary(), binary()) -> ok | {error, any()}.
-callback disable(binary(), binary(), binary()) -> ok | {error, any()}.
-callback list(binary(), binary()) -> [{binary(), binary(), node()}].
-callback use_cache(binary()) -> boolean().
-callback cache_nodes(binary()) -> [node()].
-optional_callbacks([use_cache/1, cache_nodes/1]).
-spec is_carbon_copy(stanza()) -> boolean().
is_carbon_copy(#message{meta = #{carbon_copy := true}}) ->
true;
is_carbon_copy(_) ->
false.
start(Host, Opts) ->
start(Host, _Opts) ->
ejabberd_hooks:add(disco_local_features, Host, ?MODULE, disco_features, 50),
Mod = gen_mod:ram_db_mod(Host, ?MODULE),
init_cache(Mod, Host, Opts),
Mod:init(Host, Opts),
clean_cache(),
ejabberd_hooks:add(unset_presence_hook,Host, ?MODULE, remove_connection, 10),
%% why priority 89: to define clearly that we must run BEFORE mod_logdb hook (90)
ejabberd_hooks:add(user_send_packet,Host, ?MODULE, user_send_packet, 89),
ejabberd_hooks:add(user_receive_packet,Host, ?MODULE, user_receive_packet, 89),
@ -77,23 +64,10 @@ stop(Host) ->
ejabberd_hooks:delete(disco_local_features, Host, ?MODULE, disco_features, 50),
%% why priority 89: to define clearly that we must run BEFORE mod_logdb hook (90)
ejabberd_hooks:delete(user_send_packet,Host, ?MODULE, user_send_packet, 89),
ejabberd_hooks:delete(user_receive_packet,Host, ?MODULE, user_receive_packet, 89),
ejabberd_hooks:delete(unset_presence_hook,Host, ?MODULE, remove_connection, 10).
ejabberd_hooks:delete(user_receive_packet,Host, ?MODULE, user_receive_packet, 89).
reload(Host, NewOpts, OldOpts) ->
NewMod = gen_mod:ram_db_mod(Host, NewOpts, ?MODULE),
OldMod = gen_mod:ram_db_mod(Host, OldOpts, ?MODULE),
if NewMod /= OldMod ->
NewMod:init(Host, NewOpts);
true ->
ok
end,
case use_cache(NewMod, Host) of
true ->
ets_cache:new(?CARBONCOPY_CACHE, cache_opts(NewOpts));
false ->
ok
end.
reload(_Host, _NewOpts, _OldOpts) ->
ok.
-spec disco_features({error, stanza_error()} | {result, [binary()]} | empty,
jid(), jid(), binary(), binary()) ->
@ -113,19 +87,13 @@ iq_handler(#iq{type = set, lang = Lang, from = From,
is_record(El, carbons_disable) ->
{U, S, R} = jid:tolower(From),
Result = case El of
#carbons_enable{} ->
?DEBUG("Carbons enabled for user ~s@~s/~s", [U,S,R]),
enable(S, U, R, ?NS_CARBONS_2);
#carbons_disable{} ->
?DEBUG("Carbons disabled for user ~s@~s/~s", [U,S,R]),
disable(S, U, R)
#carbons_enable{} -> enable(S, U, R, ?NS_CARBONS_2);
#carbons_disable{} -> disable(S, U, R)
end,
case Result of
ok ->
?DEBUG("carbons IQ result: ok", []),
xmpp:make_iq_result(IQ);
{error,_Error} ->
?ERROR_MSG("Error enabling / disabling carbons: ~p", [Result]),
{error, _} ->
Txt = <<"Database failure">>,
xmpp:make_error(IQ, xmpp:err_internal_server_error(Txt, Lang))
end;
@ -180,12 +148,6 @@ check_and_forward(JID, To, Packet, Direction)->
Packet
end.
-spec remove_connection(binary(), binary(), binary(), binary()) -> ok.
remove_connection(User, Server, Resource, _Status)->
disable(Server, User, Resource),
ok.
%%% Internal
%% Direction = received | sent <received xmlns='urn:xmpp:carbons:1'/>
-spec send_copies(jid(), jid(), message(), direction()) -> ok.
@ -248,22 +210,26 @@ build_forward_packet(JID, #message{type = T} = Msg, Sender, Dest, Direction) ->
-spec enable(binary(), binary(), binary(), binary()) -> ok | {error, any()}.
enable(Host, U, R, CC)->
?DEBUG("enabling for ~p", [U]),
Mod = gen_mod:ram_db_mod(Host, ?MODULE),
case Mod:enable(U, Host, R, CC) of
ok ->
delete_cache(Mod, U, Host);
{error, _} = Err ->
?DEBUG("Enabling carbons for ~s@~s/~s", [U, Host, R]),
case ejabberd_sm:set_user_info(U, Host, R, carboncopy, CC) of
ok -> ok;
{error, Reason} = Err ->
?ERROR_MSG("Failed to disable carbons for ~s@~s/~s: ~p",
[U, Host, R, Reason]),
Err
end.
-spec disable(binary(), binary(), binary()) -> ok | {error, any()}.
disable(Host, U, R)->
?DEBUG("disabling for ~p", [U]),
Mod = gen_mod:ram_db_mod(Host, ?MODULE),
Res = Mod:disable(U, Host, R),
delete_cache(Mod, U, Host),
Res.
?DEBUG("Disabling carbons for ~s@~s/~s", [U, Host, R]),
case ejabberd_sm:del_user_info(U, Host, R, carboncopy) of
ok -> ok;
{error, notfound} -> ok;
{error, Reason} = Err ->
?ERROR_MSG("Failed to disable carbons for ~s@~s/~s: ~p",
[U, Host, R, Reason]),
Err
end.
-spec complete_packet(jid(), message(), direction()) -> message().
complete_packet(From, #message{from = undefined} = Msg, sent) ->
@ -291,99 +257,30 @@ is_received_muc_pm(_To, Packet, received) ->
-spec list(binary(), binary()) -> [{Resource :: binary(), Namespace :: binary()}].
list(User, Server) ->
Mod = gen_mod:ram_db_mod(Server, ?MODULE),
case use_cache(Mod, Server) of
true ->
case ets_cache:lookup(
?CARBONCOPY_CACHE, {User, Server},
fun() ->
case Mod:list(User, Server) of
{ok, L} when L /= [] -> {ok, L};
_ -> error
end
end) of
{ok, L} -> [{Resource, NS} || {Resource, NS, _} <- L];
error -> []
end;
false ->
case Mod:list(User, Server) of
{ok, L} -> [{Resource, NS} || {Resource, NS, _} <- L];
error -> []
end
end.
-spec init_cache(module(), binary(), gen_mod:opts()) -> ok.
init_cache(Mod, Host, Opts) ->
case use_cache(Mod, Host) of
true ->
ets_cache:new(?CARBONCOPY_CACHE, cache_opts(Opts));
false ->
ets_cache:delete(?CARBONCOPY_CACHE)
end.
-spec cache_opts(gen_mod:opts()) -> [proplists:property()].
cache_opts(Opts) ->
MaxSize = gen_mod:get_opt(cache_size, Opts),
CacheMissed = gen_mod:get_opt(cache_missed, Opts),
LifeTime = case gen_mod:get_opt(cache_life_time, Opts) of
infinity -> infinity;
I -> timer:seconds(I)
end,
[{max_size, MaxSize}, {cache_missed, CacheMissed}, {life_time, LifeTime}].
-spec use_cache(module(), binary()) -> boolean().
use_cache(Mod, Host) ->
case erlang:function_exported(Mod, use_cache, 1) of
true -> Mod:use_cache(Host);
false -> gen_mod:get_module_opt(Host, ?MODULE, use_cache)
end.
-spec cache_nodes(module(), binary()) -> [node()].
cache_nodes(Mod, Host) ->
case erlang:function_exported(Mod, cache_nodes, 1) of
true -> Mod:cache_nodes(Host);
false -> ejabberd_cluster:get_nodes()
end.
-spec clean_cache(node()) -> non_neg_integer().
clean_cache(Node) ->
ets_cache:filter(
?CARBONCOPY_CACHE,
fun(_, error) ->
false;
(_, {ok, L}) ->
not lists:any(fun({_, _, N}) -> N == Node end, L)
end).
-spec clean_cache() -> ok.
clean_cache() ->
ejabberd_cluster:eval_everywhere(?MODULE, clean_cache, [node()]).
-spec delete_cache(module(), binary(), binary()) -> ok.
delete_cache(Mod, User, Server) ->
case use_cache(Mod, Server) of
true ->
ets_cache:delete(?CARBONCOPY_CACHE, {User, Server},
cache_nodes(Mod, Server));
false ->
ok
end.
lists:filtermap(
fun({Resource, Info}) ->
case lists:keyfind(carboncopy, 1, Info) of
{_, NS} -> {true, {Resource, NS}};
false -> false
end
end, ejabberd_sm:get_user_info(User, Server)).
depends(_Host, _Opts) ->
[].
mod_opt_type(ram_db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
mod_opt_type(O) when O == use_cache; O == cache_missed ->
fun(B) when is_boolean(B) -> B end;
mod_opt_type(O) when O == cache_size; O == cache_life_time ->
fun(I) when is_integer(I), I>0 -> I;
(unlimited) -> infinity;
(infinity) -> infinity
mod_opt_type(O) when O == cache_size; O == cache_life_time;
O == use_cache; O == cache_missed;
O == ram_db_type ->
fun(deprecated) -> deprecated;
(_) ->
?WARNING_MSG("Option ~s of ~s has no effect anymore "
"and will be ingored", [O, ?MODULE]),
deprecated
end.
mod_options(Host) ->
[{ram_db_type, ejabberd_config:default_ram_db(Host, ?MODULE)},
{use_cache, ejabberd_config:use_cache(Host)},
{cache_size, ejabberd_config:cache_size(Host)},
{cache_missed, ejabberd_config:cache_missed(Host)},
{cache_life_time, ejabberd_config:cache_life_time(Host)}].
mod_options(_) ->
[{ram_db_type, deprecated},
{use_cache, deprecated},
{cache_size, deprecated},
{cache_missed, deprecated},
{cache_life_time, deprecated}].

View File

@ -1,79 +0,0 @@
%%%-------------------------------------------------------------------
%%% File : mod_carboncopy_mnesia.erl
%%% Author : Evgeny Khramtsov <ekhramtsov@process-one.net>
%%% Created : 15 Apr 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2018 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%----------------------------------------------------------------------
-module(mod_carboncopy_mnesia).
-behaviour(mod_carboncopy).
%% API
-export([init/2, enable/4, disable/3, list/2, use_cache/1]).
-include("mod_carboncopy.hrl").
%%%===================================================================
%%% API
%%%===================================================================
init(_Host, _Opts) ->
Fields = record_info(fields, carboncopy),
try mnesia:table_info(carboncopy, attributes) of
Fields ->
ok;
_ ->
%% recreate..
mnesia:delete_table(carboncopy)
catch _:_Error ->
%% probably table don't exist
ok
end,
ejabberd_mnesia:create(?MODULE, carboncopy,
[{ram_copies, [node()]},
{attributes, record_info(fields, carboncopy)},
{type, bag}]).
enable(LUser, LServer, LResource, NS) ->
mnesia:dirty_write(
#carboncopy{us = {LUser, LServer},
resource = LResource,
version = NS}).
disable(LUser, LServer, LResource) ->
ToDelete = mnesia:dirty_match_object(
#carboncopy{us = {LUser, LServer},
resource = LResource,
_ = '_'}),
lists:foreach(fun mnesia:dirty_delete_object/1, ToDelete).
list(LUser, LServer) ->
{ok, mnesia:dirty_select(
carboncopy,
[{#carboncopy{us = {LUser, LServer}, resource = '$2',
version = '$3', node = '$4'},
[], [{{'$2','$3','$4'}}]}])}.
use_cache(_LServer) ->
false.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -1,176 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
%%% Created : 30 Mar 2017 by Evgeny Khramtsov <ekhramtsov@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2018 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%-------------------------------------------------------------------
-module(mod_carboncopy_redis).
-behaviour(mod_carboncopy).
-behaviour(gen_server).
%% API
-export([init/2, enable/4, disable/3, list/2, cache_nodes/1]).
%% gen_server callbacks
-export([init/1, handle_cast/2, handle_call/3, handle_info/2,
terminate/2, code_change/3, start_link/0]).
-include("logger.hrl").
-include("mod_carboncopy.hrl").
-define(CARBONCOPY_KEY, <<"ejabberd:carboncopy">>).
-record(state, {}).
%%%===================================================================
%%% API
%%%===================================================================
init(_Host, _Opts) ->
Spec = {?MODULE, {?MODULE, start_link, []},
transient, 5000, worker, [?MODULE]},
case supervisor:start_child(ejabberd_backend_sup, Spec) of
{ok, _Pid} -> ok;
Err -> Err
end.
-spec start_link() -> {ok, pid()} | {error, any()}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
cache_nodes(_LServer) ->
[node()].
enable(LUser, LServer, LResource, NS) ->
USKey = us_key(LUser, LServer),
NodeKey = node_key(),
JID = jid:encode({LUser, LServer, LResource}),
Data = term_to_binary({NS, node()}),
case ejabberd_redis:multi(
fun() ->
ejabberd_redis:hset(USKey, LResource, Data),
ejabberd_redis:sadd(NodeKey, [JID]),
ejabberd_redis:publish(
?CARBONCOPY_KEY,
term_to_binary({delete, {LUser, LServer}}))
end) of
{ok, _} ->
ok;
{error, _} ->
{error, db_failure}
end.
disable(LUser, LServer, LResource) ->
USKey = us_key(LUser, LServer),
NodeKey = node_key(),
JID = jid:encode({LUser, LServer, LResource}),
case ejabberd_redis:multi(
fun() ->
ejabberd_redis:hdel(USKey, [LResource]),
ejabberd_redis:srem(NodeKey, [JID]),
ejabberd_redis:publish(
?CARBONCOPY_KEY,
term_to_binary({delete, {LUser, LServer}}))
end) of
{ok, _} ->
ok;
{error, _} ->
{error, db_failure}
end.
list(LUser, LServer) ->
USKey = us_key(LUser, LServer),
case ejabberd_redis:hgetall(USKey) of
{ok, Pairs} ->
{ok, lists:flatmap(
fun({Resource, Data}) ->
try
{NS, Node} = binary_to_term(Data),
[{Resource, NS, Node}]
catch _:_ ->
?ERROR_MSG("invalid term stored in Redis "
"(key = ~s): ~p",
[USKey, Data]),
[]
end
end, Pairs)};
{error, _} ->
{error, db_failure}
end.
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([]) ->
ejabberd_redis:subscribe([?CARBONCOPY_KEY]),
clean_table(),
{ok, #state{}}.
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({redis_message, ?CARBONCOPY_KEY, Data}, State) ->
case binary_to_term(Data) of
{delete, Key} ->
ets_cache:delete(?CARBONCOPY_CACHE, Key);
Msg ->
?WARNING_MSG("unexpected redis message: ~p", [Msg])
end,
{noreply, State};
handle_info(Info, State) ->
?ERROR_MSG("unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
clean_table() ->
?DEBUG("Cleaning Redis 'carboncopy' table...", []),
NodeKey = node_key(),
case ejabberd_redis:smembers(NodeKey) of
{ok, JIDs} ->
ejabberd_redis:multi(
fun() ->
lists:foreach(
fun(JID) ->
{U, S, R} = jid:split(jid:decode(JID)),
USKey = us_key(U, S),
ejabberd_redis:hdel(USKey, [R])
end, JIDs)
end);
{error, _} ->
ok
end,
ejabberd_redis:del([NodeKey]),
ok.
us_key(LUser, LServer) ->
<<"ejabberd:carboncopy:users:", LUser/binary, $@, LServer/binary>>.
node_key() ->
Node = erlang:atom_to_binary(node(), latin1),
<<"ejabberd:carboncopy:nodes:", Node/binary>>.

View File

@ -1,82 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
%%% Created : 15 Apr 2017 by Evgeny Khramtsov <ekhramtsov@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2018 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%-------------------------------------------------------------------
-module(mod_carboncopy_riak).
-behaviour(mod_carboncopy).
%% API
-export([init/2, enable/4, disable/3, list/2]).
-include("logger.hrl").
-include("mod_carboncopy.hrl").
%%%===================================================================
%%% API
%%%===================================================================
init(_Host, _Opts) ->
clean_table().
enable(LUser, LServer, LResource, NS) ->
ejabberd_riak:put(#carboncopy{us = {LUser, LServer},
resource = LResource,
version = NS},
carboncopy_schema(),
[{i, {LUser, LServer, LResource}},
{'2i', [{<<"us">>, {LUser, LServer}}]}]).
disable(LUser, LServer, LResource) ->
ejabberd_riak:delete(carboncopy, {LUser, LServer, LResource}).
list(LUser, LServer) ->
case ejabberd_riak:get_by_index(
carboncopy, carboncopy_schema(),
<<"us">>, {LUser, LServer}) of
{ok, Rs} ->
{ok, [{Resource, NS, Node}
|| #carboncopy{resource = Resource,
version = NS,
node = Node} <- Rs]};
{error, _} = Err ->
Err
end.
%%%===================================================================
%%% Internal functions
%%%===================================================================
carboncopy_schema() ->
{record_info(fields, carboncopy), #carboncopy{}}.
clean_table() ->
?DEBUG("Cleaning Riak 'carboncopy' table...", []),
case ejabberd_riak:get(carboncopy, carboncopy_schema()) of
{ok, Rs} ->
lists:foreach(
fun(#carboncopy{us = {U, S}, resource = R, node = Node})
when Node == node() ->
ejabberd_riak:delete(carboncopy, {U, S, R});
(_) ->
ok
end, Rs);
{error, Reason} = Err ->
?ERROR_MSG("Failed to clean Riak 'carboncopy' table: ~p", [Reason]),
Err
end.

View File

@ -1,91 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
%%% Created : 29 Mar 2017 by Evgeny Khramtsov <ekhramtsov@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2018 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
%%% published by the Free Software Foundation; either version 2 of the
%%% License, or (at your option) any later version.
%%%
%%% This program is distributed in the hope that it will be useful,
%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
%%% General Public License for more details.
%%%
%%% You should have received a copy of the GNU General Public License along
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
%%%-------------------------------------------------------------------
-module(mod_carboncopy_sql).
-behaviour(mod_carboncopy).
-compile([{parse_transform, ejabberd_sql_pt}]).
%% API
-export([init/2, enable/4, disable/3, list/2]).
-include("logger.hrl").
-include("ejabberd_sql_pt.hrl").
%%%===================================================================
%%% API
%%%===================================================================
init(Host, _Opts) ->
clean_table(Host).
enable(LUser, LServer, LResource, NS) ->
NodeS = erlang:atom_to_binary(node(), latin1),
case ?SQL_UPSERT(LServer, "carboncopy",
["!username=%(LUser)s",
"!server_host=%(LServer)s",
"!resource=%(LResource)s",
"namespace=%(NS)s",
"node=%(NodeS)s"]) of
ok ->
ok;
_Err ->
{error, db_failure}
end.
disable(LUser, LServer, LResource) ->
case ejabberd_sql:sql_query(
LServer,
?SQL("delete from carboncopy where username=%(LUser)s "
"and %(LServer)H and resource=%(LResource)s")) of
{updated, _} ->
ok;
_Err ->
{error, db_failure}
end.
list(LUser, LServer) ->
case ejabberd_sql:sql_query(
LServer,
?SQL("select @(resource)s, @(namespace)s, @(node)s from carboncopy "
"where username=%(LUser)s and %(LServer)H")) of
{selected, Rows} ->
{ok, [{Resource, NS, binary_to_atom(Node, latin1)}
|| {Resource, NS, Node} <- Rows]};
_Err ->
{error, db_failure}
end.
%%%===================================================================
%%% Internal functions
%%%===================================================================
clean_table(LServer) ->
NodeS = erlang:atom_to_binary(node(), latin1),
?DEBUG("Cleaning SQL 'carboncopy' table...", []),
case ejabberd_sql:sql_query(
LServer,
?SQL("delete from carboncopy where node=%(NodeS)s")) of
{updated, _} ->
ok;
Err ->
?ERROR_MSG("failed to clean 'carboncopy' table: ~p", [Err]),
{error, db_failure}
end.