mirror of
https://github.com/processone/ejabberd.git
synced 2024-11-22 16:20:52 +01:00
385 lines
12 KiB
Erlang
385 lines
12 KiB
Erlang
%%
|
|
%% %CopyrightBegin%
|
|
%%
|
|
%% Copyright Ericsson AB 1997-2011. All Rights Reserved.
|
|
%%
|
|
%% The contents of this file are subject to the Erlang Public License,
|
|
%% Version 1.1, (the "License"); you may not use this file except in
|
|
%% compliance with the License. You should have received a copy of the
|
|
%% Erlang Public License along with this software. If not, it can be
|
|
%% retrieved online at http://www.erlang.org/.
|
|
%%
|
|
%% Software distributed under the License is distributed on an "AS IS"
|
|
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
|
|
%% the License for the specific language governing rights and limitations
|
|
%% under the License.
|
|
%%
|
|
%% %CopyrightEnd%
|
|
%%
|
|
-module(pg2_backport).
|
|
|
|
-export([create/1, delete/1, join/2, leave/2]).
|
|
-export([get_members/1, get_local_members/1]).
|
|
-export([get_closest_pid/1, which_groups/0]).
|
|
-export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2,
|
|
terminate/2]).
|
|
|
|
%%% As of R13B03 monitors are used instead of links.
|
|
|
|
%%%
|
|
%%% Exported functions
|
|
%%%
|
|
|
|
%% Start the server as a gen_server registered locally under the module
%% name and linked to the calling process.
-spec start_link() -> {'ok', pid()} | {'error', term()}.

start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
|
|
|
|
%% Make sure the server is running; the result of ensure_started/0 is
%% returned unchanged.
-spec start() -> {'ok', pid()} | {'error', term()}.

start() ->
    Started = ensure_started(),
    Started.
|
|
|
|
%% Create the group Name.
%%
%% If the group is already present in the local table nothing is done.
%% Otherwise a {create, Name} request is multi-called to the servers
%% inside a global transaction locked on {?MODULE, Name}. The outcome of
%% the transaction is ignored; 'ok' is always returned.
-spec create(term()) -> 'ok'.

create(Name) ->
    ensure_started(),
    LockId = {{?MODULE, Name}, self()},
    CreateEverywhere =
        fun() ->
                gen_server:multi_call(?MODULE, {create, Name})
        end,
    case ets:member(pg2_table, {group, Name}) of
        true ->
            ok;
        false ->
            global:trans(LockId, CreateEverywhere),
            ok
    end.
|
|
|
|
-type name() :: term().
|
|
|
|
%% Delete the group Name.
%%
%% A {delete, Name} request is multi-called to the servers inside a
%% global transaction locked on {?MODULE, Name}. The outcome of the
%% transaction is ignored; 'ok' is always returned.
-spec delete(name()) -> 'ok'.

delete(Name) ->
    ensure_started(),
    LockId = {{?MODULE, Name}, self()},
    DeleteEverywhere =
        fun() ->
                gen_server:multi_call(?MODULE, {delete, Name})
        end,
    global:trans(LockId, DeleteEverywhere),
    ok.
|
|
|
|
%% Add Pid to the group Name.
%%
%% Returns {error, {no_such_group, Name}} when the group is not present
%% in the local table. Otherwise a {join, Name, Pid} request is
%% multi-called to the servers inside a global transaction locked on
%% {?MODULE, Name}; the outcome of the transaction is ignored and 'ok'
%% is returned.
-spec join(name(), pid()) -> 'ok' | {'error', {'no_such_group', term()}}.

join(Name, Pid) when is_pid(Pid) ->
    ensure_started(),
    LockId = {{?MODULE, Name}, self()},
    JoinEverywhere =
        fun() ->
                gen_server:multi_call(?MODULE, {join, Name, Pid})
        end,
    case ets:member(pg2_table, {group, Name}) of
        true ->
            global:trans(LockId, JoinEverywhere),
            ok;
        false ->
            {error, {no_such_group, Name}}
    end.
|
|
|
|
%% Remove Pid from the group Name.
%%
%% Returns {error, {no_such_group, Name}} when the group is not present
%% in the local table. Otherwise a {leave, Name, Pid} request is
%% multi-called to the servers inside a global transaction locked on
%% {?MODULE, Name}; the outcome of the transaction is ignored and 'ok'
%% is returned.
-spec leave(name(), pid()) -> 'ok' | {'error', {'no_such_group', name()}}.

leave(Name, Pid) when is_pid(Pid) ->
    ensure_started(),
    LockId = {{?MODULE, Name}, self()},
    LeaveEverywhere =
        fun() ->
                gen_server:multi_call(?MODULE, {leave, Name, Pid})
        end,
    case ets:member(pg2_table, {group, Name}) of
        true ->
            global:trans(LockId, LeaveEverywhere),
            ok;
        false ->
            {error, {no_such_group, Name}}
    end.
|
|
|
|
-type get_members_ret() :: [pid()] | {'error', {'no_such_group', name()}}.
|
|
|
|
%% Return the members of group Name as computed by group_members/1, or
%% {error, {no_such_group, Name}} when the group is not in the table.
-spec get_members(name()) -> get_members_ret().

get_members(Name) ->
    ensure_started(),
    GroupKey = {group, Name},
    case ets:member(pg2_table, GroupKey) of
        false ->
            {error, {no_such_group, Name}};
        true ->
            group_members(Name)
    end.
|
|
|
|
%% Return the members of group Name as computed by
%% local_group_members/1, or {error, {no_such_group, Name}} when the
%% group is not in the table.
-spec get_local_members(name()) -> get_members_ret().

get_local_members(Name) ->
    ensure_started(),
    GroupKey = {group, Name},
    case ets:member(pg2_table, GroupKey) of
        false ->
            {error, {no_such_group, Name}};
        true ->
            local_group_members(Name)
    end.
|
|
|
|
%% Return the names of all groups currently recorded in the table.
-spec which_groups() -> [name()].

which_groups() ->
    ensure_started(),
    Groups = all_groups(),
    Groups.
|
|
|
|
-type gcp_error_reason() :: {'no_process', term()} | {'no_such_group', term()}.
|
|
|
|
%% Pick one member of group Name, preferring members on the local node.
%%
%% - Exactly one local member: that pid is returned.
%% - Several local members: one of them is picked.
%% - No local member: one of all members is picked, or
%%   {error, {no_process, Name}} is returned when the group is empty.
%% - Any error tuple from get_local_members/1 or get_members/1 (for
%%   example {error, {no_such_group, Name}}) is returned unchanged.
%%
%% The group may be deleted between the local lookup and the lookup of
%% all members, in which case get_members/1 returns an error tuple. That
%% tuple is passed through instead of being treated as a member list.
-spec get_closest_pid(term()) -> pid() | {'error', gcp_error_reason()}.

get_closest_pid(Name) ->
    case get_local_members(Name) of
        [Pid] ->
            Pid;
        [] ->
            case get_members(Name) of
                [] ->
                    {error, {no_process, Name}};
                [_ | _] = Members ->
                    pick_member_by_clock(Members);
                Error ->
                    Error
            end;
        [_ | _] = Members ->
            pick_member_by_clock(Members);
        Else ->
            Else
    end.

%% Select an element of a non-empty list, indexed by the microsecond
%% field of erlang:now() modulo the list length.
pick_member_by_clock(Members) ->
    {_, _, Micro} = erlang:now(),
    lists:nth((Micro rem length(Members)) + 1, Members).
|
|
|
|
%%%
|
|
%%% Callback functions from gen_server
|
|
%%%
|
|
|
|
-record(state, {}).
|
|
|
|
%% gen_server init callback.
%%
%% Subscribes to node up/down events, announces this server to the
%% server registered under the same name on every currently connected
%% node ({new_pg2, node()}), queues a {nodeup, Node} message to itself
%% for each of those nodes, and creates the bookkeeping table pg2_table.
-spec init([]) -> {'ok', #state{}}.

init([]) ->
    ConnectedNodes = nodes(),
    net_kernel:monitor_nodes(true),
    Announce =
        fun(Node) ->
                {?MODULE, Node} ! {new_pg2, node()},
                self() ! {nodeup, Node}
        end,
    lists:foreach(Announce, ConnectedNodes),
    TableOpts = [ordered_set, protected, named_table],
    pg2_table = ets:new(pg2_table, TableOpts),
    {ok, #state{}}.
|
|
|
|
%% Requests the server knows how to serve synchronously.
-type call() :: {'create', name()}
              | {'delete', name()}
              | {'join', name(), pid()}
              | {'leave', name(), pid()}.

%% gen_server handle_call callback.
%%
%% {create, Name}     makes sure the group exists in the local table.
%% {join, Name, Pid}  adds Pid to the group if the group exists locally.
%% {leave, Name, Pid} removes Pid from the group if the group exists
%%                    locally.
%% {delete, Name}     removes all members and then the group itself.
%%
%% All of the above reply 'ok'. Any other request is logged and left
%% unanswered ({noreply, State}), so the spec covers both the known
%% request shapes and the catch-all clause.
-spec handle_call(call() | term(), _, #state{}) ->
                         {'reply', 'ok', #state{}} | {'noreply', #state{}}.

handle_call({create, Name}, _From, S) ->
    assure_group(Name),
    {reply, ok, S};
handle_call({join, Name, Pid}, _From, S) ->
    %% Joining a group unknown to this node is silently skipped.
    ets:member(pg2_table, {group, Name}) andalso join_group(Name, Pid),
    {reply, ok, S};
handle_call({leave, Name, Pid}, _From, S) ->
    %% Leaving a group unknown to this node is silently skipped.
    ets:member(pg2_table, {group, Name}) andalso leave_group(Name, Pid),
    {reply, ok, S};
handle_call({delete, Name}, _From, S) ->
    delete_group(Name),
    {reply, ok, S};
handle_call(Request, From, S) ->
    error_logger:warning_msg("The pg2 server received an unexpected message:\n"
                             "handle_call(~p, ~p, _)\n",
                             [Request, From]),
    {noreply, S}.
|
|
|
|
-type all_members() :: [[name(),...]].
-type cast() :: {'exchange', node(), all_members()}
              | {'del_member', name(), pid()}.

%% gen_server handle_cast callback.
%%
%% An {exchange, Node, List} cast carries another server's group/member
%% list, which is merged into the local table via store/1. Every other
%% cast is dropped.
-spec handle_cast(cast(), #state{}) -> {'noreply', #state{}}.

handle_cast({exchange, _FromNode, GroupList}, State) ->
    store(GroupList),
    {noreply, State};
handle_cast(_Other, State) ->
    %% Ignore {del_member, Name, Pid}.
    {noreply, State}.
|
|
|
|
%% gen_server handle_info callback.
%%
%% 'DOWN' messages are passed to member_died/1 with the monitor
%% reference. On {nodeup, Node} and {new_pg2, Node} the full local
%% group/member list is cast to the server on Node. Anything else is
%% dropped.
-spec handle_info(tuple(), #state{}) -> {'noreply', #state{}}.

handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, State) ->
    member_died(MonitorRef),
    {noreply, State};
handle_info({nodeup, Node}, State) ->
    Peer = {?MODULE, Node},
    gen_server:cast(Peer, {exchange, node(), all_members()}),
    {noreply, State};
handle_info({new_pg2, Node}, State) ->
    Peer = {?MODULE, Node},
    gen_server:cast(Peer, {exchange, node(), all_members()}),
    {noreply, State};
handle_info(_Other, State) ->
    {noreply, State}.
|
|
|
|
%% gen_server terminate callback: drop the bookkeeping table.
-spec terminate(term(), #state{}) -> 'ok'.

terminate(_Reason, _State) ->
    Deleted = ets:delete(pg2_table),
    true = Deleted,
    ok.
|
|
|
|
%%%
|
|
%%% Local functions
|
|
%%%
|
|
|
|
%%% One ETS table, pg2_table, is used for bookkeeping. The type of the
|
|
%%% table is ordered_set, and the fast matching of partially
|
|
%%% instantiated keys is used extensively.
|
|
%%%
|
|
%%% {{group, Name}}
|
|
%%% Process group Name.
|
|
%%% {{ref, Pid}, RPid, MonitorRef, Counter}
|
|
%%% {{ref, MonitorRef}, Pid}
|
|
%%% Each process has one monitor. Sometimes a process is spawned to
|
|
%%% monitor the pid (RPid). Counter is incremented when the Pid joins
|
|
%%% some group.
|
|
%%% {{member, Name, Pid}, GroupCounter}
|
|
%%% {{local_member, Name, Pid}}
|
|
%%% Pid is a member of group Name, GroupCounter is incremented when the
|
|
%%% Pid joins the group Name.
|
|
%%% {{pid, Pid, Name}}
|
|
%%% Pid is a member of group Name.
|
|
|
|
%% Merge a list of [Name, Members] pairs into the local table: each
%% group is created if needed and its missing members are joined.
%% Elements that are not two-element lists are skipped.
store(List) ->
    MergeGroup =
        fun([Name, Members]) ->
                assure_group(Name) andalso store2(Name, Members);
           (_NotAPair) ->
                skipped
        end,
    lists:foreach(MergeGroup, List),
    ok.
|
|
%% Join to group Name every entry of Members that is not already
%% accounted for in the local member list (list subtraction, so
%% duplicates are honoured). Always returns true.
store2(Name, Members) ->
    Missing = Members -- group_members(Name),
    lists:foreach(fun(Pid) -> join_group(Name, Pid) end, Missing),
    true.
|
|
|
|
%% Make sure a {{group, Name}} row exists in pg2_table, inserting it
%% when absent. Returns true.
assure_group(Name) ->
    GroupKey = {group, Name},
    case ets:member(pg2_table, GroupKey) of
        true ->
            true;
        false ->
            ets:insert(pg2_table, {GroupKey}) =:= true
    end.
|
|
|
|
%% Remove every membership of group Name (one leave per recorded join)
%% and then the group row itself.
delete_group(Name) ->
    Members = group_members(Name),
    lists:foreach(fun(Pid) -> leave_group(Name, Pid) end, Members),
    true = ets:delete(pg2_table, {group, Name}),
    ok.
|
|
|
|
%% Handle the 'DOWN' of monitor Ref: look up the pid registered for the
%% reference, remove all of its memberships in every group it belongs
%% to, and broadcast {del_member, Name, Pid} to the other nodes.
member_died(Ref) ->
    [{{ref, Ref}, Pid}] = ets:lookup(pg2_table, {ref, Ref}),
    Groups = member_groups(Pid),
    LeaveAll =
        fun(Group) ->
                Copies = member_in_group(Pid, Group),
                lists:foreach(fun(Member) -> leave_group(Group, Member) end,
                              Copies)
        end,
    lists:foreach(LeaveAll, Groups),
    %% Kept for backward compatibility with links. Can be removed, eventually.
    Notify =
        fun(Group) ->
                gen_server:abcast(nodes(), ?MODULE, {del_member, Group, Pid})
        end,
    lists:foreach(Notify, Groups),
    ok.
|
|
|
|
%% Record one more membership of Pid in group Name.
%%
%% First the per-pid counter (4th field of the {ref, Pid} row) is
%% bumped; if that fails, a monitor is set up via do_monitor/1 and the
%% {ref, Pid} and {ref, MonitorRef} rows are created. Then the
%% per-group counter (2nd field of the {member, Name, Pid} row) is
%% bumped; if that fails, the member row, the local_member row (only
%% for pids on this node) and the {pid, Pid, Name} row are created.
join_group(Name, Pid) ->
    PidKey = {ref, Pid},
    try _ = ets:update_counter(pg2_table, PidKey, {4, +1})
    catch _:_ ->
            {RPid, MonitorRef} = do_monitor(Pid),
            true = ets:insert(pg2_table, {PidKey, RPid, MonitorRef, 1}),
            true = ets:insert(pg2_table, {{ref, MonitorRef}, Pid})
    end,
    MemberKey = {member, Name, Pid},
    try _ = ets:update_counter(pg2_table, MemberKey, {2, +1})
    catch _:_ ->
            true = ets:insert(pg2_table, {MemberKey, 1}),
            case node(Pid) =:= node() of
                true ->
                    ets:insert(pg2_table, {{local_member, Name, Pid}});
                false ->
                    ok
            end,
            true = ets:insert(pg2_table, {{pid, Pid, Name}})
    end.
|
|
|
|
%% Remove one membership of Pid from group Name.
%%
%% The per-group counter is decremented; if that fails (Pid has no
%% member row for Name) nothing else happens and 'ok' is returned. When
%% the counter reaches zero the membership rows are removed. In either
%% case the per-pid counter is then decremented as well.
leave_group(Name, Pid) ->
    MemberKey = {member, Name, Pid},
    try ets:update_counter(pg2_table, MemberKey, {2, -1}) of
        0 ->
            purge_member_rows(Name, Pid),
            release_pid_ref(Pid);
        _StillJoined ->
            release_pid_ref(Pid)
    catch _:_ ->
            ok
    end.

%% Delete the rows that tie Pid to group Name; the local_member row is
%% only touched for pids on this node.
purge_member_rows(Name, Pid) ->
    true = ets:delete(pg2_table, {pid, Pid, Name}),
    case node(Pid) =:= node() of
        true ->
            ets:delete(pg2_table, {local_member, Name, Pid});
        false ->
            ok
    end,
    true = ets:delete(pg2_table, {member, Name, Pid}).

%% Decrement the per-pid counter; when it reaches zero drop both ref
%% rows, remove the monitor (flushing any pending 'DOWN') and kill the
%% helper monitor process, if one was spawned.
release_pid_ref(Pid) ->
    PidKey = {ref, Pid},
    case ets:update_counter(pg2_table, PidKey, {4, -1}) of
        0 ->
            [{PidKey, RPid, MonitorRef, 0}] = ets:lookup(pg2_table, PidKey),
            true = ets:delete(pg2_table, {ref, MonitorRef}),
            true = ets:delete(pg2_table, PidKey),
            true = erlang:demonitor(MonitorRef, [flush]),
            kill_monitor_proc(RPid, Pid);
        _ ->
            ok
    end.
|
|
|
|
%% Build the list of [Name, Members] pairs for every group in the table.
all_members() ->
    lists:map(fun(Group) -> [Group, group_members(Group)] end,
              all_groups()).
|
|
|
|
%% Return the members of group Name; a pid that joined N times appears
%% N times in the result.
group_members(Name) ->
    Rows = ets:match(pg2_table, {{member, Name, '$1'}, '$2'}),
    lists:append([lists:duplicate(Count, Pid) || [Pid, Count] <- Rows]).
|
|
|
|
%% Return the members of group Name that have a local_member row,
%% repeated according to their join count (see member_in_group/2).
local_group_members(Name) ->
    LocalRows = ets:match(pg2_table, {{local_member, Name, '$1'}}),
    lists:flatmap(fun([Pid]) -> member_in_group(Pid, Name) end, LocalRows).
|
|
|
|
%% Return Pid repeated once per recorded join of group Name, or [] when
%% there is no member row for the pair.
member_in_group(Pid, Name) ->
    MemberKey = {member, Name, Pid},
    case ets:lookup(pg2_table, MemberKey) of
        [{MemberKey, Count}] ->
            lists:duplicate(Count, Pid);
        [] ->
            []
    end.
|
|
|
|
%% Return the names of all groups that have a {pid, Pid, Name} row.
member_groups(Pid) ->
    Rows = ets:match(pg2_table, {{pid, Pid, '$1'}}),
    lists:map(fun([Group]) -> Group end, Rows).
|
|
|
|
%% Return the names of all groups that have a {group, Name} row.
all_groups() ->
    Rows = ets:match(pg2_table, {{group, '$1'}}),
    lists:map(fun([Group]) -> Group end, Rows).
|
|
|
|
%% Return {ok, Pid} when a process is already registered under the
%% module name; otherwise ask kernel_safe_sup to start this module as a
%% permanent worker and return the supervisor's answer.
%%
%% NOTE(review): the child id is the atom 'pg2', not ?MODULE; presumably
%% inherited from the original pg2 module. Confirm it cannot clash with
%% a stock pg2 child under kernel_safe_sup.
ensure_started() ->
    case whereis(?MODULE) of
        undefined ->
            StartMFA = {?MODULE, start_link, []},
            ChildSpec = {pg2, StartMFA, permanent, 1000, worker, [?MODULE]},
            supervisor:start_child(kernel_safe_sup, ChildSpec);
        Running ->
            {ok, Running}
    end.
|
|
|
|
|
|
%% Kill the helper process RPid unless it is the monitored pid itself,
%% which is the case when no helper was spawned. Returns true.
kill_monitor_proc(Pid, Pid) ->
    true;
kill_monitor_proc(RPid, _Pid) ->
    exit(RPid, kill).
|
|
|
|
%% Set up monitoring of Pid and return {MonitoredPid, MonitorRef}.
%%
%% When Pid lives on this node or on a currently connected node it is
%% monitored directly and {Pid, Ref} is returned. Otherwise a helper
%% process is spawned with spawn_monitor/1; the helper monitors Pid and
%% exits normally once Pid goes down, so the returned {HelperPid, Ref}
%% monitor fires in turn.
%%
%% When/if erlang:monitor() returns before trying to connect to the
%% other node this function can be removed.
do_monitor(Pid) ->
    PidNode = node(Pid),
    Reachable = (PidNode =:= node()) orelse lists:member(PidNode, nodes()),
    case Reachable of
        false ->
            Watcher =
                fun() ->
                        Ref = erlang:monitor(process, Pid),
                        receive
                            {'DOWN', Ref, process, Pid, _Info} ->
                                exit(normal)
                        end
                end,
            erlang:spawn_monitor(Watcher);
        true ->
            %% Assume the node is still up
            {Pid, erlang:monitor(process, Pid)}
    end.
|