mirror of
https://github.com/processone/ejabberd.git
synced 2024-12-08 16:52:59 +01:00
Use cache in front of Redis/SQL RAM backends
This commit is contained in:
parent
aa7d5df6a0
commit
e40baf0bda
@ -47,3 +47,5 @@
|
||||
|
||||
-define(HEADER(CType),
|
||||
[CType, ?AC_ALLOW_ORIGIN, ?AC_ALLOW_HEADERS]).
|
||||
|
||||
-define(BOSH_CACHE, bosh_cache).
|
||||
|
@ -1,3 +1,5 @@
|
||||
-define(ROUTES_CACHE, routes_cache).
|
||||
|
||||
-type local_hint() :: integer() | {apply, atom(), atom()}.
|
||||
|
||||
-record(route, {domain :: binary() | '_',
|
||||
|
@ -21,6 +21,8 @@
|
||||
-ifndef(EJABBERD_SM_HRL).
|
||||
-define(EJABBERD_SM_HRL, true).
|
||||
|
||||
-define(SM_CACHE, sm_cache).
|
||||
|
||||
-record(session, {sid, usr, us, priority, info = []}).
|
||||
-record(session_counter, {vhost, count}).
|
||||
-type sid() :: {erlang:timestamp(), pid()}.
|
||||
|
@ -22,3 +22,5 @@
|
||||
-record(carboncopy, {us :: {binary(), binary()} | matchspec_atom(),
|
||||
resource :: binary() | matchspec_atom(),
|
||||
version :: binary() | matchspec_atom()}).
|
||||
|
||||
-define(CARBONCOPY_CACHE, carboncopy_cache).
|
||||
|
@ -20,7 +20,7 @@
|
||||
|
||||
{deps, [{lager, ".*", {git, "https://github.com/basho/lager", {tag, "3.2.1"}}},
|
||||
{p1_utils, ".*", {git, "https://github.com/processone/p1_utils", {tag, "1.0.8"}}},
|
||||
{cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "1.0.7"}}},
|
||||
{cache_tab, ".*", {git, "https://github.com/processone/cache_tab", "35cc9904fde"}},
|
||||
{fast_tls, ".*", {git, "https://github.com/processone/fast_tls", {tag, "1.0.11"}}},
|
||||
{stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.8"}}},
|
||||
{fast_xml, ".*", {git, "https://github.com/processone/fast_xml", {tag, "1.1.21"}}},
|
||||
|
@ -26,7 +26,8 @@
|
||||
-module(ejabberd_cluster).
|
||||
|
||||
%% API
|
||||
-export([get_nodes/0, call/4, multicall/3, multicall/4]).
|
||||
-export([get_nodes/0, call/4, multicall/3, multicall/4,
|
||||
eval_everywhere/3, eval_everywhere/4]).
|
||||
-export([join/1, leave/1, get_known_nodes/0]).
|
||||
-export([node_id/0, get_node_by_id/1]).
|
||||
|
||||
@ -59,6 +60,18 @@ multicall(Module, Function, Args) ->
|
||||
multicall(Nodes, Module, Function, Args) ->
|
||||
rpc:multicall(Nodes, Module, Function, Args, 5000).
|
||||
|
||||
-spec eval_everywhere(module(), atom(), [any()]) -> ok.
|
||||
|
||||
eval_everywhere(Module, Function, Args) ->
|
||||
eval_everywhere(get_nodes(), Module, Function, Args),
|
||||
ok.
|
||||
|
||||
-spec eval_everywhere([node()], module(), atom(), [any()]) -> ok.
|
||||
|
||||
eval_everywhere(Nodes, Module, Function, Args) ->
|
||||
rpc:eval_everywhere(Nodes, Module, Function, Args),
|
||||
ok.
|
||||
|
||||
-spec join(node()) -> ok | {error, any()}.
|
||||
|
||||
join(Node) ->
|
||||
|
@ -37,7 +37,8 @@
|
||||
env_binary_to_list/2, opt_type/1, may_hide_data/1,
|
||||
is_elixir_enabled/0, v_dbs/1, v_dbs_mods/1,
|
||||
default_db/1, default_db/2, default_ram_db/1, default_ram_db/2,
|
||||
default_queue_type/1, queue_dir/0, fsm_limit_opts/1]).
|
||||
default_queue_type/1, queue_dir/0, fsm_limit_opts/1,
|
||||
use_cache/1, cache_size/1, cache_missed/1, cache_life_time/1]).
|
||||
|
||||
-export([start/2]).
|
||||
|
||||
@ -1460,9 +1461,24 @@ opt_type(queue_dir) ->
|
||||
fun iolist_to_binary/1;
|
||||
opt_type(queue_type) ->
|
||||
fun(ram) -> ram; (file) -> file end;
|
||||
opt_type(use_cache) ->
|
||||
fun(B) when is_boolean(B) -> B end;
|
||||
opt_type(cache_size) ->
|
||||
fun(I) when is_integer(I), I>0 -> I;
|
||||
(infinity) -> infinity;
|
||||
(unlimited) -> infinity
|
||||
end;
|
||||
opt_type(cache_missed) ->
|
||||
fun(B) when is_boolean(B) -> B end;
|
||||
opt_type(cache_life_time) ->
|
||||
fun(I) when is_integer(I), I>0 -> I;
|
||||
(infinity) -> infinity;
|
||||
(unlimited) -> infinity
|
||||
end;
|
||||
opt_type(_) ->
|
||||
[hide_sensitive_log_data, hosts, language, max_fsm_queue,
|
||||
default_db, default_ram_db, queue_type, queue_dir, loglevel].
|
||||
default_db, default_ram_db, queue_type, queue_dir, loglevel,
|
||||
use_cache, cache_size, cache_missed, cache_life_time].
|
||||
|
||||
-spec may_hide_data(any()) -> any().
|
||||
may_hide_data(Data) ->
|
||||
@ -1499,3 +1515,20 @@ queue_dir() ->
|
||||
-spec default_queue_type(binary()) -> ram | file.
|
||||
default_queue_type(Host) ->
|
||||
get_option({queue_type, Host}, opt_type(queue_type), ram).
|
||||
|
||||
-spec use_cache(binary() | global) -> boolean().
|
||||
use_cache(Host) ->
|
||||
get_option({use_cache, Host}, opt_type(use_cache), true).
|
||||
|
||||
-spec cache_size(binary() | global) -> pos_integer() | infinity.
|
||||
cache_size(Host) ->
|
||||
get_option({cache_size, Host}, opt_type(cache_size), 1000).
|
||||
|
||||
-spec cache_missed(binary() | global) -> boolean().
|
||||
cache_missed(Host) ->
|
||||
get_option({cache_missed, Host}, opt_type(cache_missed), true).
|
||||
|
||||
-spec cache_life_time(binary() | global) -> pos_integer() | infinity.
|
||||
%% NOTE: the integer value returned is in *seconds*
|
||||
cache_life_time(Host) ->
|
||||
get_option({cache_life_time, Host}, opt_type(cache_life_time), 3600).
|
||||
|
@ -31,11 +31,12 @@
|
||||
-compile({no_auto_import, [get/1, put/2]}).
|
||||
|
||||
%% API
|
||||
-export([start_link/1, get_proc/1, q/1, qp/1, format_error/1]).
|
||||
-export([start_link/1, get_proc/1, get_connection/1, q/1, qp/1, format_error/1]).
|
||||
%% Commands
|
||||
-export([multi/1, get/1, set/2, del/1,
|
||||
sadd/2, srem/2, smembers/1, sismember/2, scard/1,
|
||||
hget/2, hset/3, hdel/2, hlen/1, hgetall/1, hkeys/1]).
|
||||
hget/2, hset/3, hdel/2, hlen/1, hgetall/1, hkeys/1,
|
||||
subscribe/1, publish/2]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
@ -53,14 +54,18 @@
|
||||
|
||||
-record(state, {connection :: pid() | undefined,
|
||||
num :: pos_integer(),
|
||||
subscriptions = #{} :: map(),
|
||||
pending_q :: p1_queue:queue()}).
|
||||
|
||||
-type redis_error() :: {error, binary() | timeout | disconnected | overloaded}.
|
||||
-type error_reason() :: binary() | timeout | disconnected | overloaded.
|
||||
-type redis_error() :: {error, error_reason()}.
|
||||
-type redis_reply() :: binary() | [binary()].
|
||||
-type redis_command() :: [binary()].
|
||||
-type redis_pipeline() :: [redis_command()].
|
||||
-type state() :: #state{}.
|
||||
|
||||
-export_type([error_reason/0]).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
@ -79,11 +84,11 @@ get_connection(I) ->
|
||||
|
||||
-spec q(redis_command()) -> {ok, redis_reply()} | redis_error().
|
||||
q(Command) ->
|
||||
call(get_worker(), {q, Command}, ?MAX_RETRIES).
|
||||
call(get_rnd_id(), {q, Command}, ?MAX_RETRIES).
|
||||
|
||||
-spec qp(redis_pipeline()) -> {ok, [redis_reply()]} | redis_error().
|
||||
qp(Pipeline) ->
|
||||
call(get_worker(), {qp, Pipeline}, ?MAX_RETRIES).
|
||||
call(get_rnd_id(), {qp, Pipeline}, ?MAX_RETRIES).
|
||||
|
||||
-spec multi(fun(() -> any())) -> {ok, [redis_reply()]} | redis_error().
|
||||
multi(F) ->
|
||||
@ -288,6 +293,30 @@ hkeys(Key) ->
|
||||
erlang:error(transaction_unsupported)
|
||||
end.
|
||||
|
||||
-spec subscribe([binary()]) -> ok | redis_error().
|
||||
subscribe(Channels) ->
|
||||
try ?GEN_SERVER:call(get_proc(1), {subscribe, self(), Channels}, ?CALL_TIMEOUT)
|
||||
catch exit:{Why, {?GEN_SERVER, call, _}} ->
|
||||
Reason = case Why of
|
||||
timeout -> timeout;
|
||||
_ -> disconnected
|
||||
end,
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
-spec publish(iodata(), iodata()) -> {ok, non_neg_integer()} | redis_error() | queued.
|
||||
publish(Channel, Data) ->
|
||||
Cmd = [<<"PUBLISH">>, Channel, Data],
|
||||
case erlang:get(?TR_STACK) of
|
||||
undefined ->
|
||||
case q(Cmd) of
|
||||
{ok, N} -> {ok, binary_to_integer(N)};
|
||||
{error, _} = Err -> Err
|
||||
end;
|
||||
Stack ->
|
||||
tr_enq(Cmd, Stack)
|
||||
end.
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%===================================================================
|
||||
@ -315,6 +344,15 @@ handle_call(connect, From, #state{connection = Pid} = State) ->
|
||||
self() ! connect,
|
||||
handle_call(connect, From, State#state{connection = undefined})
|
||||
end;
|
||||
handle_call({subscribe, Caller, Channels}, _From,
|
||||
#state{connection = Pid, subscriptions = Subs} = State) ->
|
||||
Subs1 = lists:foldl(
|
||||
fun(Channel, Acc) ->
|
||||
Callers = maps:get(Channel, Acc, []) -- [Caller],
|
||||
maps:put(Channel, [Caller|Callers], Acc)
|
||||
end, Subs, Channels),
|
||||
eredis_subscribe(Pid, Channels),
|
||||
{reply, ok, State#state{subscriptions = Subs1}};
|
||||
handle_call(Request, _From, State) ->
|
||||
?WARNING_MSG("unexepected call: ~p", [Request]),
|
||||
{noreply, State}.
|
||||
@ -326,6 +364,7 @@ handle_info(connect, #state{connection = undefined} = State) ->
|
||||
NewState = case connect(State) of
|
||||
{ok, Connection} ->
|
||||
Q1 = flush_queue(State#state.pending_q),
|
||||
re_subscribe(Connection, State#state.subscriptions),
|
||||
State#state{connection = Connection, pending_q = Q1};
|
||||
{error, _} ->
|
||||
State
|
||||
@ -342,6 +381,31 @@ handle_info({'EXIT', Pid, _}, State) ->
|
||||
_ ->
|
||||
{noreply, State}
|
||||
end;
|
||||
handle_info({subscribed, Channel, Pid}, State) ->
|
||||
case State#state.connection of
|
||||
Pid ->
|
||||
case maps:is_key(Channel, State#state.subscriptions) of
|
||||
true -> eredis_sub:ack_message(Pid);
|
||||
false ->
|
||||
?WARNING_MSG("got subscription ack for unknown channel ~s",
|
||||
[Channel])
|
||||
end;
|
||||
_ ->
|
||||
ok
|
||||
end,
|
||||
{noreply, State};
|
||||
handle_info({message, Channel, Data, Pid}, State) ->
|
||||
case State#state.connection of
|
||||
Pid ->
|
||||
lists:foreach(
|
||||
fun(Subscriber) ->
|
||||
erlang:send(Subscriber, {redis_message, Channel, Data})
|
||||
end, maps:get(Channel, State#state.subscriptions, [])),
|
||||
eredis_sub:ack_message(Pid);
|
||||
_ ->
|
||||
ok
|
||||
end,
|
||||
{noreply, State};
|
||||
handle_info(Info, State) ->
|
||||
?WARNING_MSG("unexpected info = ~p", [Info]),
|
||||
{noreply, State}.
|
||||
@ -377,8 +441,7 @@ connect(#state{num = Num}) ->
|
||||
redis_connect_timeout,
|
||||
fun(I) when is_integer(I), I>0 -> I end,
|
||||
1)),
|
||||
try case eredis:start_link(Server, Port, DB, Pass,
|
||||
no_reconnect, ConnTimeout) of
|
||||
try case do_connect(Num, Server, Port, Pass, DB, ConnTimeout) of
|
||||
{ok, Client} ->
|
||||
?DEBUG("Connection #~p established to Redis at ~s:~p",
|
||||
[Num, Server, Port]),
|
||||
@ -397,12 +460,24 @@ connect(#state{num = Num}) ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
-spec call({atom(), atom()}, {q, redis_command()}, integer()) ->
|
||||
do_connect(1, Server, Port, Pass, _DB, _ConnTimeout) ->
|
||||
%% First connection in the pool is always a subscriber
|
||||
Res = eredis_sub:start_link(Server, Port, Pass, no_reconnect, infinity, drop),
|
||||
case Res of
|
||||
{ok, Pid} -> eredis_sub:controlling_process(Pid);
|
||||
_ -> ok
|
||||
end,
|
||||
Res;
|
||||
do_connect(_, Server, Port, Pass, DB, ConnTimeout) ->
|
||||
eredis:start_link(Server, Port, DB, Pass, no_reconnect, ConnTimeout).
|
||||
|
||||
-spec call(pos_integer(), {q, redis_command()}, integer()) ->
|
||||
{ok, redis_reply()} | redis_error();
|
||||
({atom(), atom()}, {qp, redis_pipeline()}, integer()) ->
|
||||
(pos_integer(), {qp, redis_pipeline()}, integer()) ->
|
||||
{ok, [redis_reply()]} | redis_error().
|
||||
call({Conn, Parent}, {F, Cmd}, Retries) ->
|
||||
call(I, {F, Cmd}, Retries) ->
|
||||
?DEBUG("redis query: ~p", [Cmd]),
|
||||
Conn = get_connection(I),
|
||||
Res = try eredis:F(Conn, Cmd, ?CALL_TIMEOUT) of
|
||||
{error, Reason} when is_atom(Reason) ->
|
||||
try exit(whereis(Conn), kill) catch _:_ -> ok end,
|
||||
@ -414,8 +489,8 @@ call({Conn, Parent}, {F, Cmd}, Retries) ->
|
||||
end,
|
||||
case Res of
|
||||
{error, disconnected} when Retries > 0 ->
|
||||
try ?GEN_SERVER:call(Parent, connect, ?CALL_TIMEOUT) of
|
||||
ok -> call({Conn, Parent}, {F, Cmd}, Retries-1);
|
||||
try ?GEN_SERVER:call(get_proc(I), connect, ?CALL_TIMEOUT) of
|
||||
ok -> call(I, {F, Cmd}, Retries-1);
|
||||
{error, _} = Err -> Err
|
||||
catch exit:{Why, {?GEN_SERVER, call, _}} ->
|
||||
Reason1 = case Why of
|
||||
@ -439,11 +514,9 @@ log_error(Cmd, Reason) ->
|
||||
"** response = ~s",
|
||||
[Cmd, format_error(Reason)]).
|
||||
|
||||
-spec get_worker() -> {atom(), atom()}.
|
||||
get_worker() ->
|
||||
Time = p1_time_compat:system_time(),
|
||||
I = erlang:phash2(Time, ejabberd_redis_sup:get_pool_size()) + 1,
|
||||
{get_connection(I), get_proc(I)}.
|
||||
-spec get_rnd_id() -> pos_integer().
|
||||
get_rnd_id() ->
|
||||
randoms:uniform(2, ejabberd_redis_sup:get_pool_size()).
|
||||
|
||||
-spec get_result([{error, atom() | binary()} | {ok, iodata()}]) ->
|
||||
{ok, [redis_reply()]} | {error, binary()}.
|
||||
@ -531,3 +604,13 @@ clean_queue(Q, CurrTime) ->
|
||||
true ->
|
||||
Q1
|
||||
end.
|
||||
|
||||
re_subscribe(Pid, Subs) ->
|
||||
case maps:keys(Subs) of
|
||||
[] -> ok;
|
||||
Channels -> eredis_subscribe(Pid, Channels)
|
||||
end.
|
||||
|
||||
eredis_subscribe(Pid, Channels) ->
|
||||
?DEBUG("redis query: ~p", [[<<"SUBSCRIBE">>|Channels]]),
|
||||
eredis_sub:subscribe(Pid, Channels).
|
||||
|
@ -136,7 +136,7 @@ get_pool_size() ->
|
||||
ejabberd_config:get_option(
|
||||
redis_pool_size,
|
||||
fun(N) when is_integer(N), N >= 1 -> N end,
|
||||
?DEFAULT_POOL_SIZE).
|
||||
?DEFAULT_POOL_SIZE) + 1.
|
||||
|
||||
iolist_to_list(IOList) ->
|
||||
binary_to_list(iolist_to_binary(IOList)).
|
||||
|
@ -49,7 +49,8 @@
|
||||
get_all_routes/0,
|
||||
is_my_route/1,
|
||||
is_my_host/1,
|
||||
find_routes/0,
|
||||
clean_cache/1,
|
||||
config_reloaded/0,
|
||||
get_backend/0]).
|
||||
|
||||
-export([start_link/0]).
|
||||
@ -70,12 +71,8 @@
|
||||
-callback register_route(binary(), binary(), local_hint(),
|
||||
undefined | pos_integer(), pid()) -> ok | {error, term()}.
|
||||
-callback unregister_route(binary(), undefined | pos_integer(), pid()) -> ok | {error, term()}.
|
||||
-callback find_routes(binary()) -> [#route{}].
|
||||
-callback find_routes() -> [#route{}].
|
||||
-callback host_of_route(binary()) -> {ok, binary()} | error.
|
||||
-callback is_my_route(binary()) -> boolean().
|
||||
-callback is_my_host(binary()) -> boolean().
|
||||
-callback get_all_routes() -> [binary()].
|
||||
-callback find_routes(binary()) -> {ok, [#route{}]} | {error, any()}.
|
||||
-callback get_all_routes() -> {ok, [binary()]} | {error, any()}.
|
||||
|
||||
-record(state, {}).
|
||||
|
||||
@ -159,7 +156,8 @@ register_route(Domain, ServerHost, LocalHint, Pid) ->
|
||||
case Mod:register_route(LDomain, LServerHost, LocalHint,
|
||||
get_component_number(LDomain), Pid) of
|
||||
ok ->
|
||||
?DEBUG("Route registered: ~s", [LDomain]);
|
||||
?DEBUG("Route registered: ~s", [LDomain]),
|
||||
delete_cache(Mod, LDomain);
|
||||
{error, Err} ->
|
||||
?ERROR_MSG("Failed to register route ~s: ~p",
|
||||
[LDomain, Err])
|
||||
@ -186,7 +184,8 @@ unregister_route(Domain, Pid) ->
|
||||
case Mod:unregister_route(
|
||||
LDomain, get_component_number(LDomain), Pid) of
|
||||
ok ->
|
||||
?DEBUG("Route unregistered: ~s", [LDomain]);
|
||||
?DEBUG("Route unregistered: ~s", [LDomain]),
|
||||
delete_cache(Mod, LDomain);
|
||||
{error, Err} ->
|
||||
?ERROR_MSG("Failed to unregister route ~s: ~p",
|
||||
[LDomain, Err])
|
||||
@ -199,15 +198,55 @@ unregister_routes(Domains) ->
|
||||
end,
|
||||
Domains).
|
||||
|
||||
-spec find_routes(binary()) -> [#route{}].
|
||||
find_routes(Domain) ->
|
||||
Mod = get_backend(),
|
||||
case use_cache(Mod) of
|
||||
true ->
|
||||
case ets_cache:lookup(
|
||||
?ROUTES_CACHE, {route, Domain},
|
||||
fun() ->
|
||||
case Mod:find_routes(Domain) of
|
||||
{ok, Rs} when Rs /= [] ->
|
||||
{ok, Rs};
|
||||
_ ->
|
||||
error
|
||||
end
|
||||
end) of
|
||||
{ok, Rs} -> Rs;
|
||||
error -> []
|
||||
end;
|
||||
false ->
|
||||
case Mod:find_routes(Domain) of
|
||||
{ok, Rs} -> Rs;
|
||||
_ -> []
|
||||
end
|
||||
end.
|
||||
|
||||
-spec get_all_routes() -> [binary()].
|
||||
get_all_routes() ->
|
||||
Mod = get_backend(),
|
||||
Mod:get_all_routes().
|
||||
|
||||
-spec find_routes() -> [#route{}].
|
||||
find_routes() ->
|
||||
Mod = get_backend(),
|
||||
Mod:find_routes().
|
||||
case use_cache(Mod) of
|
||||
true ->
|
||||
case ets_cache:lookup(
|
||||
?ROUTES_CACHE, routes,
|
||||
fun() ->
|
||||
case Mod:get_all_routes() of
|
||||
{ok, Rs} when Rs /= [] ->
|
||||
{ok, Rs};
|
||||
_ ->
|
||||
error
|
||||
end
|
||||
end) of
|
||||
{ok, Rs} -> Rs;
|
||||
error -> []
|
||||
end;
|
||||
false ->
|
||||
case Mod:get_all_routes() of
|
||||
{ok, Rs} -> Rs;
|
||||
_ -> []
|
||||
end
|
||||
end.
|
||||
|
||||
-spec host_of_route(binary()) -> binary().
|
||||
host_of_route(Domain) ->
|
||||
@ -215,10 +254,11 @@ host_of_route(Domain) ->
|
||||
error ->
|
||||
erlang:error({invalid_domain, Domain});
|
||||
LDomain ->
|
||||
Mod = get_backend(),
|
||||
case Mod:host_of_route(LDomain) of
|
||||
{ok, ServerHost} -> ServerHost;
|
||||
error -> erlang:error({unregistered_route, Domain})
|
||||
case find_routes(LDomain) of
|
||||
[#route{server_host = ServerHost}|_] ->
|
||||
ServerHost;
|
||||
_ ->
|
||||
erlang:error({unregistered_route, Domain})
|
||||
end
|
||||
end.
|
||||
|
||||
@ -228,8 +268,7 @@ is_my_route(Domain) ->
|
||||
error ->
|
||||
erlang:error({invalid_domain, Domain});
|
||||
LDomain ->
|
||||
Mod = get_backend(),
|
||||
Mod:is_my_route(LDomain)
|
||||
lists:member(LDomain, get_all_routes())
|
||||
end.
|
||||
|
||||
-spec is_my_host(binary()) -> boolean().
|
||||
@ -238,8 +277,10 @@ is_my_host(Domain) ->
|
||||
error ->
|
||||
erlang:error({invalid_domain, Domain});
|
||||
LDomain ->
|
||||
Mod = get_backend(),
|
||||
Mod:is_my_host(LDomain)
|
||||
case find_routes(LDomain) of
|
||||
[#route{server_host = LDomain}|_] -> true;
|
||||
_ -> false
|
||||
end
|
||||
end.
|
||||
|
||||
-spec process_iq(iq()) -> any().
|
||||
@ -250,12 +291,20 @@ process_iq(#iq{to = To} = IQ) ->
|
||||
ejabberd_sm:process_iq(IQ)
|
||||
end.
|
||||
|
||||
-spec config_reloaded() -> ok.
|
||||
config_reloaded() ->
|
||||
Mod = get_backend(),
|
||||
init_cache(Mod).
|
||||
|
||||
%%====================================================================
|
||||
%% gen_server callbacks
|
||||
%%====================================================================
|
||||
init([]) ->
|
||||
ejabberd_hooks:add(config_reloaded, ?MODULE, config_reloaded, 50),
|
||||
Mod = get_backend(),
|
||||
init_cache(Mod),
|
||||
Mod:init(),
|
||||
clean_cache(),
|
||||
{ok, #state{}}.
|
||||
|
||||
handle_call(_Request, _From, State) ->
|
||||
@ -273,7 +322,7 @@ handle_info(Info, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
ok.
|
||||
ejabberd_hooks:add(config_reloaded, ?MODULE, config_reloaded, 50).
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
@ -290,8 +339,7 @@ do_route(OrigPacket) ->
|
||||
Packet ->
|
||||
To = xmpp:get_to(Packet),
|
||||
LDstDomain = To#jid.lserver,
|
||||
Mod = get_backend(),
|
||||
case Mod:find_routes(LDstDomain) of
|
||||
case find_routes(LDstDomain) of
|
||||
[] ->
|
||||
ejabberd_s2s:route(Packet);
|
||||
[Route] ->
|
||||
@ -366,6 +414,80 @@ get_backend() ->
|
||||
end,
|
||||
list_to_atom("ejabberd_router_" ++ atom_to_list(DBType)).
|
||||
|
||||
-spec cache_nodes(module()) -> [node()].
|
||||
cache_nodes(Mod) ->
|
||||
case erlang:function_exported(Mod, cache_nodes, 0) of
|
||||
true -> Mod:cache_nodes();
|
||||
false -> ejabberd_cluster:get_nodes()
|
||||
end.
|
||||
|
||||
-spec use_cache(module()) -> boolean().
|
||||
use_cache(Mod) ->
|
||||
case erlang:function_exported(Mod, use_cache, 0) of
|
||||
true -> Mod:use_cache();
|
||||
false ->
|
||||
ejabberd_config:get_option(
|
||||
router_use_cache, opt_type(router_use_cache),
|
||||
ejabberd_config:use_cache(global))
|
||||
end.
|
||||
|
||||
-spec delete_cache(module(), binary()) -> ok.
|
||||
delete_cache(Mod, Domain) ->
|
||||
case use_cache(Mod) of
|
||||
true ->
|
||||
ets_cache:delete(?ROUTES_CACHE, {route, Domain}, cache_nodes(Mod)),
|
||||
ets_cache:delete(?ROUTES_CACHE, routes, cache_nodes(Mod));
|
||||
false ->
|
||||
ok
|
||||
end.
|
||||
|
||||
-spec init_cache(module()) -> ok.
|
||||
init_cache(Mod) ->
|
||||
case use_cache(Mod) of
|
||||
true ->
|
||||
ets_cache:new(?ROUTES_CACHE, cache_opts());
|
||||
false ->
|
||||
ets_cache:delete(?ROUTES_CACHE)
|
||||
end.
|
||||
|
||||
-spec cache_opts() -> [proplists:property()].
|
||||
cache_opts() ->
|
||||
MaxSize = ejabberd_config:get_option(
|
||||
router_cache_size,
|
||||
opt_type(router_cache_size),
|
||||
ejabberd_config:cache_size(global)),
|
||||
CacheMissed = ejabberd_config:get_option(
|
||||
router_cache_missed,
|
||||
opt_type(router_cache_missed),
|
||||
ejabberd_config:cache_missed(global)),
|
||||
LifeTime = case ejabberd_config:get_option(
|
||||
router_cache_life_time,
|
||||
opt_type(router_cache_life_time),
|
||||
ejabberd_config:cache_life_time(global)) of
|
||||
infinity -> infinity;
|
||||
I -> timer:seconds(I)
|
||||
end,
|
||||
[{max_size, MaxSize}, {cache_missed, CacheMissed}, {life_time, LifeTime}].
|
||||
|
||||
-spec clean_cache(node()) -> ok.
|
||||
clean_cache(Node) ->
|
||||
ets_cache:filter(
|
||||
?ROUTES_CACHE,
|
||||
fun(_, error) ->
|
||||
false;
|
||||
(routes, _) ->
|
||||
false;
|
||||
({route, _}, {ok, Rs}) ->
|
||||
not lists:any(
|
||||
fun(#route{pid = Pid}) ->
|
||||
node(Pid) == Node
|
||||
end, Rs)
|
||||
end).
|
||||
|
||||
-spec clean_cache() -> ok.
|
||||
clean_cache() ->
|
||||
ejabberd_cluster:eval_everywhere(?MODULE, clean_cache, [node()]).
|
||||
|
||||
opt_type(domain_balancing) ->
|
||||
fun (random) -> random;
|
||||
(source) -> source;
|
||||
@ -376,6 +498,14 @@ opt_type(domain_balancing) ->
|
||||
opt_type(domain_balancing_component_number) ->
|
||||
fun (N) when is_integer(N), N > 1 -> N end;
|
||||
opt_type(router_db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
|
||||
opt_type(O) when O == router_use_cache; O == router_cache_missed ->
|
||||
fun(B) when is_boolean(B) -> B end;
|
||||
opt_type(O) when O == router_cache_size; O == router_cache_life_time ->
|
||||
fun(I) when is_integer(I), I>0 -> I;
|
||||
(unlimited) -> infinity;
|
||||
(infinity) -> infinity
|
||||
end;
|
||||
opt_type(_) ->
|
||||
[domain_balancing, domain_balancing_component_number,
|
||||
router_db_type].
|
||||
router_db_type, router_use_cache, router_cache_size,
|
||||
router_cache_missed, router_cache_life_time].
|
||||
|
@ -25,8 +25,7 @@
|
||||
|
||||
%% API
|
||||
-export([init/0, register_route/5, unregister_route/3, find_routes/1,
|
||||
host_of_route/1, is_my_route/1, is_my_host/1, get_all_routes/0,
|
||||
find_routes/0]).
|
||||
get_all_routes/0, use_cache/0]).
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_cast/2, handle_call/3, handle_info/2,
|
||||
terminate/2, code_change/3, start_link/0]).
|
||||
@ -54,6 +53,9 @@ init() ->
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
use_cache() ->
|
||||
false.
|
||||
|
||||
register_route(Domain, ServerHost, LocalHint, undefined, Pid) ->
|
||||
F = fun () ->
|
||||
mnesia:write(#route{domain = Domain,
|
||||
@ -124,37 +126,15 @@ unregister_route(Domain, _, Pid) ->
|
||||
transaction(F).
|
||||
|
||||
find_routes(Domain) ->
|
||||
mnesia:dirty_read(route, Domain).
|
||||
|
||||
host_of_route(Domain) ->
|
||||
case mnesia:dirty_read(route, Domain) of
|
||||
[#route{server_host = ServerHost}|_] ->
|
||||
{ok, ServerHost};
|
||||
[] ->
|
||||
error
|
||||
end.
|
||||
|
||||
is_my_route(Domain) ->
|
||||
mnesia:dirty_read(route, Domain) /= [].
|
||||
|
||||
is_my_host(Domain) ->
|
||||
case mnesia:dirty_read(route, Domain) of
|
||||
[#route{server_host = Host}|_] ->
|
||||
Host == Domain;
|
||||
[] ->
|
||||
false
|
||||
end.
|
||||
{ok, mnesia:dirty_read(route, Domain)}.
|
||||
|
||||
get_all_routes() ->
|
||||
mnesia:dirty_select(
|
||||
route,
|
||||
ets:fun2ms(
|
||||
fun(#route{domain = Domain, server_host = ServerHost})
|
||||
when Domain /= ServerHost -> Domain
|
||||
end)).
|
||||
|
||||
find_routes() ->
|
||||
ets:tab2list(route).
|
||||
{ok, mnesia:dirty_select(
|
||||
route,
|
||||
ets:fun2ms(
|
||||
fun(#route{domain = Domain, server_host = ServerHost})
|
||||
when Domain /= ServerHost -> Domain
|
||||
end))}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_server callbacks
|
||||
@ -227,7 +207,7 @@ transaction(F) ->
|
||||
ok;
|
||||
{aborted, Reason} ->
|
||||
?ERROR_MSG("Mnesia transaction failed: ~p", [Reason]),
|
||||
{error, Reason}
|
||||
{error, db_failure}
|
||||
end.
|
||||
|
||||
-spec update_tables() -> ok.
|
||||
|
@ -22,23 +22,37 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(ejabberd_router_redis).
|
||||
-behaviour(ejabberd_router).
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([init/0, register_route/5, unregister_route/3, find_routes/1,
|
||||
host_of_route/1, is_my_route/1, is_my_host/1, get_all_routes/0,
|
||||
find_routes/0]).
|
||||
get_all_routes/0]).
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_cast/2, handle_call/3, handle_info/2,
|
||||
terminate/2, code_change/3, start_link/0]).
|
||||
|
||||
-include("ejabberd.hrl").
|
||||
-include("logger.hrl").
|
||||
-include("ejabberd_router.hrl").
|
||||
|
||||
-define(ROUTES_KEY, "ejabberd:routes").
|
||||
-record(state, {}).
|
||||
|
||||
-define(ROUTES_KEY, <<"ejabberd:routes">>).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
init() ->
|
||||
clean_table().
|
||||
Spec = {?MODULE, {?MODULE, start_link, []},
|
||||
transient, 5000, worker, [?MODULE]},
|
||||
case supervisor:start_child(ejabberd_backend_sup, Spec) of
|
||||
{ok, _Pid} -> ok;
|
||||
Err -> Err
|
||||
end.
|
||||
|
||||
-spec start_link() -> {ok, pid()} | {error, any()}.
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
register_route(Domain, ServerHost, LocalHint, _, Pid) ->
|
||||
DomKey = domain_key(Domain),
|
||||
@ -83,47 +97,48 @@ find_routes(Domain) ->
|
||||
DomKey = domain_key(Domain),
|
||||
case ejabberd_redis:hgetall(DomKey) of
|
||||
{ok, Vals} ->
|
||||
decode_routes(Domain, Vals);
|
||||
{error, _} ->
|
||||
[]
|
||||
end.
|
||||
|
||||
host_of_route(Domain) ->
|
||||
DomKey = domain_key(Domain),
|
||||
case ejabberd_redis:hgetall(DomKey) of
|
||||
{ok, [{_Pid, Data}|_]} ->
|
||||
{ServerHost, _} = binary_to_term(Data),
|
||||
{ok, ServerHost};
|
||||
{ok, decode_routes(Domain, Vals)};
|
||||
_ ->
|
||||
error
|
||||
{error, db_failure}
|
||||
end.
|
||||
|
||||
is_my_route(Domain) ->
|
||||
case ejabberd_redis:sismember(?ROUTES_KEY, Domain) of
|
||||
{ok, Bool} ->
|
||||
Bool;
|
||||
{error, _} ->
|
||||
false
|
||||
end.
|
||||
|
||||
is_my_host(Domain) ->
|
||||
{ok, Domain} == host_of_route(Domain).
|
||||
|
||||
get_all_routes() ->
|
||||
case ejabberd_redis:smembers(?ROUTES_KEY) of
|
||||
{ok, Routes} ->
|
||||
Routes;
|
||||
{error, _} ->
|
||||
[]
|
||||
{ok, Routes};
|
||||
_ ->
|
||||
{error, db_failure}
|
||||
end.
|
||||
|
||||
find_routes() ->
|
||||
lists:flatmap(fun find_routes/1, get_all_routes()).
|
||||
%%%===================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%===================================================================
|
||||
init([]) ->
|
||||
clean_table(),
|
||||
{ok, #state{}}.
|
||||
|
||||
handle_call(_Request, _From, State) ->
|
||||
Reply = ok,
|
||||
{reply, Reply, State}.
|
||||
|
||||
handle_cast(_Msg, State) ->
|
||||
{noreply, State}.
|
||||
|
||||
handle_info(Info, State) ->
|
||||
?ERROR_MSG("unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
clean_table() ->
|
||||
?INFO_MSG("Cleaning Redis route entries...", []),
|
||||
lists:foreach(
|
||||
fun(#route{domain = Domain, pid = Pid}) when node(Pid) == node() ->
|
||||
unregister_route(Domain, undefined, Pid);
|
||||
@ -131,6 +146,20 @@ clean_table() ->
|
||||
ok
|
||||
end, find_routes()).
|
||||
|
||||
find_routes() ->
|
||||
case get_all_routes() of
|
||||
{ok, Domains} ->
|
||||
lists:flatmap(
|
||||
fun(Domain) ->
|
||||
case find_routes(Domain) of
|
||||
{ok, Routes} -> Routes;
|
||||
{error, _} -> []
|
||||
end
|
||||
end, Domains);
|
||||
{error, _} ->
|
||||
[]
|
||||
end.
|
||||
|
||||
domain_key(Domain) ->
|
||||
<<"ejabberd:route:", Domain/binary>>.
|
||||
|
||||
|
@ -27,8 +27,7 @@
|
||||
|
||||
%% API
|
||||
-export([init/0, register_route/5, unregister_route/3, find_routes/1,
|
||||
host_of_route/1, is_my_route/1, is_my_host/1, get_all_routes/0,
|
||||
find_routes/0]).
|
||||
get_all_routes/0]).
|
||||
|
||||
-include("ejabberd.hrl").
|
||||
-include("logger.hrl").
|
||||
@ -64,18 +63,22 @@ register_route(Domain, ServerHost, LocalHint, _, Pid) ->
|
||||
ok;
|
||||
Err ->
|
||||
?ERROR_MSG("failed to update 'route' table: ~p", [Err]),
|
||||
{error, Err}
|
||||
{error, db_failure}
|
||||
end.
|
||||
|
||||
unregister_route(Domain, _, Pid) ->
|
||||
PidS = misc:encode_pid(Pid),
|
||||
Node = erlang:atom_to_binary(node(Pid), latin1),
|
||||
ejabberd_sql:sql_query(
|
||||
?MYNAME,
|
||||
?SQL("delete from route where domain=%(Domain)s "
|
||||
"and pid=%(PidS)s and node=%(Node)s")),
|
||||
%% TODO: return meaningful error
|
||||
ok.
|
||||
case ejabberd_sql:sql_query(
|
||||
?MYNAME,
|
||||
?SQL("delete from route where domain=%(Domain)s "
|
||||
"and pid=%(PidS)s and node=%(Node)s")) of
|
||||
{updated, _} ->
|
||||
ok;
|
||||
Err ->
|
||||
?ERROR_MSG("failed to delete from 'route' table: ~p", [Err]),
|
||||
{error, db_failure}
|
||||
end.
|
||||
|
||||
find_routes(Domain) ->
|
||||
case ejabberd_sql:sql_query(
|
||||
@ -83,61 +86,24 @@ find_routes(Domain) ->
|
||||
?SQL("select @(server_host)s, @(node)s, @(pid)s, @(local_hint)s "
|
||||
"from route where domain=%(Domain)s")) of
|
||||
{selected, Rows} ->
|
||||
lists:flatmap(
|
||||
fun(Row) ->
|
||||
row_to_route(Domain, Row)
|
||||
end, Rows);
|
||||
{ok, lists:flatmap(
|
||||
fun(Row) ->
|
||||
row_to_route(Domain, Row)
|
||||
end, Rows)};
|
||||
Err ->
|
||||
?ERROR_MSG("failed to select from 'route' table: ~p", [Err]),
|
||||
{error, Err}
|
||||
{error, db_failure}
|
||||
end.
|
||||
|
||||
host_of_route(Domain) ->
|
||||
case ejabberd_sql:sql_query(
|
||||
?MYNAME,
|
||||
?SQL("select @(server_host)s from route where domain=%(Domain)s")) of
|
||||
{selected, [{ServerHost}|_]} ->
|
||||
{ok, ServerHost};
|
||||
{selected, []} ->
|
||||
error;
|
||||
Err ->
|
||||
?ERROR_MSG("failed to select from 'route' table: ~p", [Err]),
|
||||
error
|
||||
end.
|
||||
|
||||
is_my_route(Domain) ->
|
||||
case host_of_route(Domain) of
|
||||
{ok, _} -> true;
|
||||
_ -> false
|
||||
end.
|
||||
|
||||
is_my_host(Domain) ->
|
||||
{ok, Domain} == host_of_route(Domain).
|
||||
|
||||
get_all_routes() ->
|
||||
case ejabberd_sql:sql_query(
|
||||
?MYNAME,
|
||||
?SQL("select @(domain)s from route where domain <> server_host")) of
|
||||
{selected, Domains} ->
|
||||
[Domain || {Domain} <- Domains];
|
||||
{ok, [Domain || {Domain} <- Domains]};
|
||||
Err ->
|
||||
?ERROR_MSG("failed to select from 'route' table: ~p", [Err]),
|
||||
[]
|
||||
end.
|
||||
|
||||
find_routes() ->
|
||||
case ejabberd_sql:sql_query(
|
||||
?MYNAME,
|
||||
?SQL("select @(domain)s, @(server_host)s, @(node)s, @(pid)s, "
|
||||
"@(local_hint)s from route")) of
|
||||
{selected, Rows} ->
|
||||
lists:flatmap(
|
||||
fun({Domain, ServerHost, Node, Pid, LocalHint}) ->
|
||||
row_to_route(Domain, {ServerHost, Node, Pid, LocalHint})
|
||||
end, Rows);
|
||||
Err ->
|
||||
?ERROR_MSG("failed to select from 'route' table: ~p", [Err]),
|
||||
[]
|
||||
{error, db_failure}
|
||||
end.
|
||||
|
||||
%%%===================================================================
|
||||
|
@ -76,7 +76,9 @@
|
||||
c2s_handle_info/2,
|
||||
host_up/1,
|
||||
host_down/1,
|
||||
make_sid/0
|
||||
make_sid/0,
|
||||
clean_cache/1,
|
||||
config_reloaded/0
|
||||
]).
|
||||
|
||||
-export([init/1, handle_call/3, handle_cast/2,
|
||||
@ -91,13 +93,15 @@
|
||||
-include("ejabberd_sm.hrl").
|
||||
|
||||
-callback init() -> ok | {error, any()}.
|
||||
-callback set_session(#session{}) -> ok.
|
||||
-callback delete_session(binary(), binary(), binary(), sid()) ->
|
||||
{ok, #session{}} | {error, notfound}.
|
||||
-callback set_session(#session{}) -> ok | {error, any()}.
|
||||
-callback delete_session(#session{}) -> ok | {error, any()}.
|
||||
-callback get_sessions() -> [#session{}].
|
||||
-callback get_sessions(binary()) -> [#session{}].
|
||||
-callback get_sessions(binary(), binary()) -> [#session{}].
|
||||
-callback get_sessions(binary(), binary(), binary()) -> [#session{}].
|
||||
-callback get_sessions(binary(), binary()) -> {ok, [#session{}]} | {error, any()}.
|
||||
-callback use_cache(binary()) -> boolean().
|
||||
-callback cache_nodes(binary()) -> [node()].
|
||||
|
||||
-optional_callbacks([use_cache/1, cache_nodes/1]).
|
||||
|
||||
-record(state, {}).
|
||||
|
||||
@ -158,9 +162,12 @@ close_session(SID, User, Server, Resource) ->
|
||||
LServer = jid:nameprep(Server),
|
||||
LResource = jid:resourceprep(Resource),
|
||||
Mod = get_sm_backend(LServer),
|
||||
Info = case Mod:delete_session(LUser, LServer, LResource, SID) of
|
||||
{ok, #session{info = I}} -> I;
|
||||
{error, notfound} -> []
|
||||
Info = case get_sessions(Mod, LUser, LServer, LResource) of
|
||||
[#session{info = I} = Session|_] ->
|
||||
delete_session(Mod, Session),
|
||||
I;
|
||||
[] ->
|
||||
[]
|
||||
end,
|
||||
JID = jid:make(User, Server, Resource),
|
||||
ejabberd_hooks:run(sm_remove_connection_hook,
|
||||
@ -196,14 +203,14 @@ get_user_resources(User, Server) ->
|
||||
LUser = jid:nodeprep(User),
|
||||
LServer = jid:nameprep(Server),
|
||||
Mod = get_sm_backend(LServer),
|
||||
Ss = online(Mod:get_sessions(LUser, LServer)),
|
||||
Ss = online(get_sessions(Mod, LUser, LServer)),
|
||||
[element(3, S#session.usr) || S <- clean_session_list(Ss)].
|
||||
|
||||
-spec get_user_present_resources(binary(), binary()) -> [tuple()].
|
||||
|
||||
get_user_present_resources(LUser, LServer) ->
|
||||
Mod = get_sm_backend(LServer),
|
||||
Ss = online(Mod:get_sessions(LUser, LServer)),
|
||||
Ss = online(get_sessions(Mod, LUser, LServer)),
|
||||
[{S#session.priority, element(3, S#session.usr)}
|
||||
|| S <- clean_session_list(Ss), is_integer(S#session.priority)].
|
||||
|
||||
@ -214,7 +221,7 @@ get_user_ip(User, Server, Resource) ->
|
||||
LServer = jid:nameprep(Server),
|
||||
LResource = jid:resourceprep(Resource),
|
||||
Mod = get_sm_backend(LServer),
|
||||
case online(Mod:get_sessions(LUser, LServer, LResource)) of
|
||||
case online(get_sessions(Mod, LUser, LServer, LResource)) of
|
||||
[] ->
|
||||
undefined;
|
||||
Ss ->
|
||||
@ -227,7 +234,7 @@ get_user_info(User, Server) ->
|
||||
LUser = jid:nodeprep(User),
|
||||
LServer = jid:nameprep(Server),
|
||||
Mod = get_sm_backend(LServer),
|
||||
Ss = online(Mod:get_sessions(LUser, LServer)),
|
||||
Ss = online(get_sessions(Mod, LUser, LServer)),
|
||||
[{LResource, [{node, node(Pid)}|Info]}
|
||||
|| #session{usr = {_, _, LResource},
|
||||
info = Info,
|
||||
@ -240,7 +247,7 @@ get_user_info(User, Server, Resource) ->
|
||||
LServer = jid:nameprep(Server),
|
||||
LResource = jid:resourceprep(Resource),
|
||||
Mod = get_sm_backend(LServer),
|
||||
case online(Mod:get_sessions(LUser, LServer, LResource)) of
|
||||
case online(get_sessions(Mod, LUser, LServer, LResource)) of
|
||||
[] ->
|
||||
offline;
|
||||
Ss ->
|
||||
@ -288,7 +295,7 @@ get_session_pid(User, Server, Resource) ->
|
||||
LServer = jid:nameprep(Server),
|
||||
LResource = jid:resourceprep(Resource),
|
||||
Mod = get_sm_backend(LServer),
|
||||
case online(Mod:get_sessions(LUser, LServer, LResource)) of
|
||||
case online(get_sessions(Mod, LUser, LServer, LResource)) of
|
||||
[#session{sid = {_, Pid}}] -> Pid;
|
||||
_ -> none
|
||||
end.
|
||||
@ -309,7 +316,7 @@ get_offline_info(Time, User, Server, Resource) ->
|
||||
LServer = jid:nameprep(Server),
|
||||
LResource = jid:resourceprep(Resource),
|
||||
Mod = get_sm_backend(LServer),
|
||||
case Mod:get_sessions(LUser, LServer, LResource) of
|
||||
case get_sessions(Mod, LUser, LServer, LResource) of
|
||||
[#session{sid = {Time, _}, info = Info}] ->
|
||||
case proplists:get_bool(offline, Info) of
|
||||
true ->
|
||||
@ -326,7 +333,7 @@ get_offline_info(Time, User, Server, Resource) ->
|
||||
dirty_get_sessions_list() ->
|
||||
lists:flatmap(
|
||||
fun(Mod) ->
|
||||
[S#session.usr || S <- online(Mod:get_sessions())]
|
||||
[S#session.usr || S <- online(get_sessions(Mod))]
|
||||
end, get_sm_backends()).
|
||||
|
||||
-spec dirty_get_my_sessions_list() -> [#session{}].
|
||||
@ -334,7 +341,7 @@ dirty_get_sessions_list() ->
|
||||
dirty_get_my_sessions_list() ->
|
||||
lists:flatmap(
|
||||
fun(Mod) ->
|
||||
[S || S <- online(Mod:get_sessions()),
|
||||
[S || S <- online(get_sessions(Mod)),
|
||||
node(element(2, S#session.sid)) == node()]
|
||||
end, get_sm_backends()).
|
||||
|
||||
@ -343,14 +350,14 @@ dirty_get_my_sessions_list() ->
|
||||
get_vh_session_list(Server) ->
|
||||
LServer = jid:nameprep(Server),
|
||||
Mod = get_sm_backend(LServer),
|
||||
[S#session.usr || S <- online(Mod:get_sessions(LServer))].
|
||||
[S#session.usr || S <- online(get_sessions(Mod, LServer))].
|
||||
|
||||
-spec get_all_pids() -> [pid()].
|
||||
|
||||
get_all_pids() ->
|
||||
lists:flatmap(
|
||||
fun(Mod) ->
|
||||
[element(2, S#session.sid) || S <- online(Mod:get_sessions())]
|
||||
[element(2, S#session.sid) || S <- online(get_sessions(Mod))]
|
||||
end, get_sm_backends()).
|
||||
|
||||
-spec get_vh_session_number(binary()) -> non_neg_integer().
|
||||
@ -358,7 +365,7 @@ get_all_pids() ->
|
||||
get_vh_session_number(Server) ->
|
||||
LServer = jid:nameprep(Server),
|
||||
Mod = get_sm_backend(LServer),
|
||||
length(online(Mod:get_sessions(LServer))).
|
||||
length(online(get_sessions(Mod, LServer))).
|
||||
|
||||
-spec register_iq_handler(binary(), binary(), atom(), atom(), list()) -> ok.
|
||||
|
||||
@ -387,16 +394,23 @@ c2s_handle_info(#{lang := Lang} = State, {exit, Reason}) ->
|
||||
c2s_handle_info(State, _) ->
|
||||
State.
|
||||
|
||||
-spec config_reloaded() -> ok.
|
||||
config_reloaded() ->
|
||||
init_cache().
|
||||
|
||||
%%====================================================================
|
||||
%% gen_server callbacks
|
||||
%%====================================================================
|
||||
|
||||
init([]) ->
|
||||
process_flag(trap_exit, true),
|
||||
init_cache(),
|
||||
lists:foreach(fun(Mod) -> Mod:init() end, get_sm_backends()),
|
||||
clean_cache(),
|
||||
ets:new(sm_iqtable, [named_table, public, {read_concurrency, true}]),
|
||||
ejabberd_hooks:add(host_up, ?MODULE, host_up, 50),
|
||||
ejabberd_hooks:add(host_down, ?MODULE, host_down, 60),
|
||||
ejabberd_hooks:add(config_reloaded, ?MODULE, config_reloaded, 50),
|
||||
lists:foreach(fun host_up/1, ?MYHOSTS),
|
||||
ejabberd_commands:register_commands(get_commands_spec()),
|
||||
{ok, #state{}}.
|
||||
@ -432,6 +446,7 @@ terminate(_Reason, _State) ->
|
||||
lists:foreach(fun host_down/1, ?MYHOSTS),
|
||||
ejabberd_hooks:delete(host_up, ?MODULE, host_up, 50),
|
||||
ejabberd_hooks:delete(host_down, ?MODULE, host_down, 60),
|
||||
ejabberd_hooks:delete(config_reloaded, ?MODULE, config_reloaded, 50),
|
||||
ejabberd_commands:unregister_commands(get_commands_spec()),
|
||||
ok.
|
||||
|
||||
@ -460,7 +475,7 @@ host_down(Host) ->
|
||||
ejabberd_c2s:send(Pid, xmpp:serr_system_shutdown());
|
||||
(_) ->
|
||||
ok
|
||||
end, Mod:get_sessions(Host)),
|
||||
end, get_sessions(Mod, Host)),
|
||||
ejabberd_hooks:delete(c2s_handle_info, Host,
|
||||
ejabberd_sm, c2s_handle_info, 50),
|
||||
ejabberd_hooks:delete(roster_in_subscription, Host,
|
||||
@ -472,7 +487,7 @@ host_down(Host) ->
|
||||
ejabberd_c2s:host_down(Host).
|
||||
|
||||
-spec set_session(sid(), binary(), binary(), binary(),
|
||||
prio(), info()) -> ok.
|
||||
prio(), info()) -> ok | {error, any()}.
|
||||
|
||||
set_session(SID, User, Server, Resource, Priority, Info) ->
|
||||
LUser = jid:nodeprep(User),
|
||||
@ -481,8 +496,69 @@ set_session(SID, User, Server, Resource, Priority, Info) ->
|
||||
US = {LUser, LServer},
|
||||
USR = {LUser, LServer, LResource},
|
||||
Mod = get_sm_backend(LServer),
|
||||
Mod:set_session(#session{sid = SID, usr = USR, us = US,
|
||||
priority = Priority, info = Info}).
|
||||
case Mod:set_session(#session{sid = SID, usr = USR, us = US,
|
||||
priority = Priority, info = Info}) of
|
||||
ok ->
|
||||
case use_cache(Mod, LServer) of
|
||||
true ->
|
||||
ets_cache:delete(?SM_CACHE, {LUser, LServer},
|
||||
cache_nodes(Mod, LServer));
|
||||
false ->
|
||||
ok
|
||||
end;
|
||||
{error, _} = Err ->
|
||||
Err
|
||||
end.
|
||||
|
||||
-spec get_sessions(module()) -> [#session{}].
|
||||
get_sessions(Mod) ->
|
||||
Mod:get_sessions().
|
||||
|
||||
-spec get_sessions(module(), binary()) -> [#session{}].
|
||||
get_sessions(Mod, LServer) ->
|
||||
Mod:get_sessions(LServer).
|
||||
|
||||
-spec get_sessions(module(), binary(), binary()) -> [#session{}].
|
||||
get_sessions(Mod, LUser, LServer) ->
|
||||
case use_cache(Mod, LServer) of
|
||||
true ->
|
||||