mirror of
https://github.com/processone/ejabberd.git
synced 2024-11-22 16:20:52 +01:00
Use pg2 from R14B in systems with older versions (EJAB-1349)
pg2_backport.erl is a copy of pg2.erl from Erlang/OTP R14B. That module is used in ejabberd installations where an OTP previous to R14 is installed.
This commit is contained in:
parent
33116be0ae
commit
2aa56cd86e
@ -43,6 +43,12 @@
|
||||
|
||||
-record(state, {groups = []}).
|
||||
|
||||
-ifdef(SSL40).
|
||||
-define(PG2, pg2).
|
||||
-else.
|
||||
-define(PG2, pg2_backport).
|
||||
-endif.
|
||||
|
||||
%%====================================================================
|
||||
%% API
|
||||
%%====================================================================
|
||||
@ -64,20 +70,20 @@ start_link() ->
|
||||
|
||||
join(Name) ->
|
||||
PG = {?MODULE, Name},
|
||||
pg2:create(PG),
|
||||
pg2:join(PG, whereis(?MODULE)).
|
||||
?PG2:create(PG),
|
||||
?PG2:join(PG, whereis(?MODULE)).
|
||||
|
||||
leave(Name) ->
|
||||
PG = {?MODULE, Name},
|
||||
pg2:leave(PG, whereis(?MODULE)).
|
||||
?PG2:leave(PG, whereis(?MODULE)).
|
||||
|
||||
get_members(Name) ->
|
||||
PG = {?MODULE, Name},
|
||||
[node(P) || P <- pg2:get_members(PG)].
|
||||
[node(P) || P <- ?PG2:get_members(PG)].
|
||||
|
||||
get_closest_node(Name) ->
|
||||
PG = {?MODULE, Name},
|
||||
node(pg2:get_closest_pid(PG)).
|
||||
node(?PG2:get_closest_pid(PG)).
|
||||
|
||||
%%====================================================================
|
||||
%% gen_server callbacks
|
||||
|
@ -9,6 +9,7 @@ ASN_FLAGS = -bber_bin +optimize +driver
|
||||
ERLANG_CFLAGS = @ERLANG_CFLAGS@
|
||||
ERLANG_LIBS = @ERLANG_LIBS@
|
||||
|
||||
EFLAGS += @ERLANG_SSLVER@
|
||||
EFLAGS += -I ..
|
||||
EFLAGS += -pz ..
|
||||
|
||||
|
@ -37,6 +37,12 @@
|
||||
|
||||
-include("ejabberd.hrl").
|
||||
|
||||
-ifdef(SSL40).
|
||||
-define(PG2, pg2).
|
||||
-else.
|
||||
-define(PG2, pg2_backport).
|
||||
-endif.
|
||||
|
||||
%%====================================================================
|
||||
%% API
|
||||
%%====================================================================
|
||||
@ -51,14 +57,14 @@ modify_passwd(PoolName, DN, Passwd) ->
|
||||
|
||||
start_link(Name, Hosts, Backups, Port, Rootdn, Passwd, Opts) ->
|
||||
PoolName = make_id(Name),
|
||||
pg2:create(PoolName),
|
||||
?PG2:create(PoolName),
|
||||
lists:foreach(
|
||||
fun(Host) ->
|
||||
ID = erlang:ref_to_list(make_ref()),
|
||||
case catch eldap:start_link(ID, [Host|Backups], Port,
|
||||
Rootdn, Passwd, Opts) of
|
||||
{ok, Pid} ->
|
||||
pg2:join(PoolName, Pid);
|
||||
?PG2:join(PoolName, Pid);
|
||||
_ ->
|
||||
error
|
||||
end
|
||||
@ -68,7 +74,7 @@ start_link(Name, Hosts, Backups, Port, Rootdn, Passwd, Opts) ->
|
||||
%% Internal functions
|
||||
%%====================================================================
|
||||
do_request(Name, {F, Args}) ->
|
||||
case pg2:get_closest_pid(make_id(Name)) of
|
||||
case ?PG2:get_closest_pid(make_id(Name)) of
|
||||
Pid when is_pid(Pid) ->
|
||||
case catch apply(eldap, F, [Pid | Args]) of
|
||||
{'EXIT', {timeout, _}} ->
|
||||
|
381
src/pg2_backport.erl
Normal file
381
src/pg2_backport.erl
Normal file
@ -0,0 +1,381 @@
|
||||
%%
|
||||
%% %CopyrightBegin%
|
||||
%%
|
||||
%% Copyright Ericsson AB 1997-2010. All Rights Reserved.
|
||||
%%
|
||||
%% The contents of this file are subject to the Erlang Public License,
|
||||
%% Version 1.1, (the "License"); you may not use this file except in
|
||||
%% compliance with the License. You should have received a copy of the
|
||||
%% Erlang Public License along with this software. If not, it can be
|
||||
%% retrieved online at http://www.erlang.org/.
|
||||
%%
|
||||
%% Software distributed under the License is distributed on an "AS IS"
|
||||
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
|
||||
%% the License for the specific language governing rights and limitations
|
||||
%% under the License.
|
||||
%%
|
||||
%% %CopyrightEnd%
|
||||
%%
|
||||
-module(pg2_backport).
|
||||
|
||||
-export([create/1, delete/1, join/2, leave/2]).
|
||||
-export([get_members/1, get_local_members/1]).
|
||||
-export([get_closest_pid/1, which_groups/0]).
|
||||
-export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2,
|
||||
terminate/2]).
|
||||
|
||||
%%% As of R13B03 monitors are used instead of links.
|
||||
|
||||
%%%
|
||||
%%% Exported functions
|
||||
%%%
|
||||
|
||||
-spec start_link() -> {'ok', pid()} | {'error', term()}.
|
||||
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||
|
||||
-spec start() -> {'ok', pid()} | {'error', term()}.
|
||||
|
||||
start() ->
|
||||
ensure_started().
|
||||
|
||||
-spec create(term()) -> 'ok'.
|
||||
|
||||
create(Name) ->
|
||||
ensure_started(),
|
||||
case ets:member(pg2_table, {group, Name}) of
|
||||
false ->
|
||||
global:trans({{?MODULE, Name}, self()},
|
||||
fun() ->
|
||||
gen_server:multi_call(?MODULE, {create, Name})
|
||||
end),
|
||||
ok;
|
||||
true ->
|
||||
ok
|
||||
end.
|
||||
|
||||
-type name() :: term().
|
||||
|
||||
-spec delete(name()) -> 'ok'.
|
||||
|
||||
delete(Name) ->
|
||||
ensure_started(),
|
||||
global:trans({{?MODULE, Name}, self()},
|
||||
fun() ->
|
||||
gen_server:multi_call(?MODULE, {delete, Name})
|
||||
end),
|
||||
ok.
|
||||
|
||||
-spec join(name(), pid()) -> 'ok' | {'error', {'no_such_group', term()}}.
|
||||
|
||||
join(Name, Pid) when is_pid(Pid) ->
|
||||
ensure_started(),
|
||||
case ets:member(pg2_table, {group, Name}) of
|
||||
false ->
|
||||
{error, {no_such_group, Name}};
|
||||
true ->
|
||||
global:trans({{?MODULE, Name}, self()},
|
||||
fun() ->
|
||||
gen_server:multi_call(?MODULE,
|
||||
{join, Name, Pid})
|
||||
end),
|
||||
ok
|
||||
end.
|
||||
|
||||
-spec leave(name(), pid()) -> 'ok' | {'error', {'no_such_group', name()}}.
|
||||
|
||||
leave(Name, Pid) when is_pid(Pid) ->
|
||||
ensure_started(),
|
||||
case ets:member(pg2_table, {group, Name}) of
|
||||
false ->
|
||||
{error, {no_such_group, Name}};
|
||||
true ->
|
||||
global:trans({{?MODULE, Name}, self()},
|
||||
fun() ->
|
||||
gen_server:multi_call(?MODULE,
|
||||
{leave, Name, Pid})
|
||||
end),
|
||||
ok
|
||||
end.
|
||||
|
||||
-type get_members_ret() :: [pid()] | {'error', {'no_such_group', name()}}.
|
||||
|
||||
-spec get_members(name()) -> get_members_ret().
|
||||
|
||||
get_members(Name) ->
|
||||
ensure_started(),
|
||||
case ets:member(pg2_table, {group, Name}) of
|
||||
true ->
|
||||
group_members(Name);
|
||||
false ->
|
||||
{error, {no_such_group, Name}}
|
||||
end.
|
||||
|
||||
-spec get_local_members(name()) -> get_members_ret().
|
||||
|
||||
get_local_members(Name) ->
|
||||
ensure_started(),
|
||||
case ets:member(pg2_table, {group, Name}) of
|
||||
true ->
|
||||
local_group_members(Name);
|
||||
false ->
|
||||
{error, {no_such_group, Name}}
|
||||
end.
|
||||
|
||||
-spec which_groups() -> [name()].
|
||||
|
||||
which_groups() ->
|
||||
ensure_started(),
|
||||
all_groups().
|
||||
|
||||
-type gcp_error_reason() :: {'no_process', term()} | {'no_such_group', term()}.
|
||||
|
||||
-spec get_closest_pid(term()) -> pid() | {'error', gcp_error_reason()}.
|
||||
|
||||
get_closest_pid(Name) ->
|
||||
case get_local_members(Name) of
|
||||
[Pid] ->
|
||||
Pid;
|
||||
[] ->
|
||||
{_,_,X} = erlang:now(),
|
||||
case get_members(Name) of
|
||||
[] -> {error, {no_process, Name}};
|
||||
Members ->
|
||||
lists:nth((X rem length(Members))+1, Members)
|
||||
end;
|
||||
Members when is_list(Members) ->
|
||||
{_,_,X} = erlang:now(),
|
||||
lists:nth((X rem length(Members))+1, Members);
|
||||
Else ->
|
||||
Else
|
||||
end.
|
||||
|
||||
%%%
|
||||
%%% Callback functions from gen_server
|
||||
%%%
|
||||
|
||||
-record(state, {}).
|
||||
|
||||
-spec init([]) -> {'ok', #state{}}.
|
||||
|
||||
init([]) ->
|
||||
Ns = nodes(),
|
||||
net_kernel:monitor_nodes(true),
|
||||
lists:foreach(fun(N) ->
|
||||
{?MODULE, N} ! {new_pg2, node()},
|
||||
self() ! {nodeup, N}
|
||||
end, Ns),
|
||||
pg2_table = ets:new(pg2_table, [ordered_set, protected, named_table]),
|
||||
{ok, #state{}}.
|
||||
|
||||
-type call() :: {'create', name()}
|
||||
| {'delete', name()}
|
||||
| {'join', name(), pid()}
|
||||
| {'leave', name(), pid()}.
|
||||
|
||||
-spec handle_call(call(), _, #state{}) ->
|
||||
{'reply', 'ok', #state{}}.
|
||||
|
||||
handle_call({create, Name}, _From, S) ->
|
||||
assure_group(Name),
|
||||
{reply, ok, S};
|
||||
handle_call({join, Name, Pid}, _From, S) ->
|
||||
ets:member(pg2_table, {group, Name}) andalso join_group(Name, Pid),
|
||||
{reply, ok, S};
|
||||
handle_call({leave, Name, Pid}, _From, S) ->
|
||||
ets:member(pg2_table, {group, Name}) andalso leave_group(Name, Pid),
|
||||
{reply, ok, S};
|
||||
handle_call({delete, Name}, _From, S) ->
|
||||
delete_group(Name),
|
||||
{reply, ok, S};
|
||||
handle_call(Request, From, S) ->
|
||||
error_logger:warning_msg("The pg2 server received an unexpected message:\n"
|
||||
"handle_call(~p, ~p, _)\n",
|
||||
[Request, From]),
|
||||
{noreply, S}.
|
||||
|
||||
-type all_members() :: [[name(),...]].
|
||||
-type cast() :: {'exchange', node(), all_members()}
|
||||
| {'del_member', name(), pid()}.
|
||||
|
||||
-spec handle_cast(cast(), #state{}) -> {'noreply', #state{}}.
|
||||
|
||||
handle_cast({exchange, _Node, List}, S) ->
|
||||
store(List),
|
||||
{noreply, S};
|
||||
handle_cast(_, S) ->
|
||||
%% Ignore {del_member, Name, Pid}.
|
||||
{noreply, S}.
|
||||
|
||||
-spec handle_info(tuple(), #state{}) -> {'noreply', #state{}}.
|
||||
|
||||
handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) ->
|
||||
member_died(MonitorRef),
|
||||
{noreply, S};
|
||||
handle_info({nodeup, Node}, S) ->
|
||||
gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}),
|
||||
{noreply, S};
|
||||
handle_info({new_pg2, Node}, S) ->
|
||||
gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}),
|
||||
{noreply, S};
|
||||
handle_info(_, S) ->
|
||||
{noreply, S}.
|
||||
|
||||
-spec terminate(term(), #state{}) -> 'ok'.
|
||||
|
||||
terminate(_Reason, _S) ->
|
||||
true = ets:delete(pg2_table),
|
||||
ok.
|
||||
|
||||
%%%
|
||||
%%% Local functions
|
||||
%%%
|
||||
|
||||
%%% One ETS table, pg2_table, is used for bookkeeping. The type of the
|
||||
%%% table is ordered_set, and the fast matching of partially
|
||||
%%% instantiated keys is used extensively.
|
||||
%%%
|
||||
%%% {{group, Name}}
|
||||
%%% Process group Name.
|
||||
%%% {{ref, Pid}, RPid, MonitorRef, Counter}
|
||||
%%% {{ref, MonitorRef}, Pid}
|
||||
%%% Each process has one monitor. Sometimes a process is spawned to
|
||||
%%% monitor the pid (RPid). Counter is incremented when the Pid joins
|
||||
%%% some group.
|
||||
%%% {{member, Name, Pid}, GroupCounter}
|
||||
%%% {{local_member, Name, Pid}}
|
||||
%%% Pid is a member of group Name, GroupCounter is incremented when the
|
||||
%%% Pid joins the group Name.
|
||||
%%% {{pid, Pid, Name}}
|
||||
%%% Pid is a member of group Name.
|
||||
|
||||
store(List) ->
|
||||
_ = [(assure_group(Name)
|
||||
andalso
|
||||
[join_group(Name, P) || P <- Members -- group_members(Name)]) ||
|
||||
[Name, Members] <- List],
|
||||
ok.
|
||||
|
||||
assure_group(Name) ->
|
||||
Key = {group, Name},
|
||||
ets:member(pg2_table, Key) orelse true =:= ets:insert(pg2_table, {Key}).
|
||||
|
||||
delete_group(Name) ->
|
||||
_ = [leave_group(Name, Pid) || Pid <- group_members(Name)],
|
||||
true = ets:delete(pg2_table, {group, Name}),
|
||||
ok.
|
||||
|
||||
member_died(Ref) ->
|
||||
[{{ref, Ref}, Pid}] = ets:lookup(pg2_table, {ref, Ref}),
|
||||
Names = member_groups(Pid),
|
||||
_ = [leave_group(Name, P) ||
|
||||
Name <- Names,
|
||||
P <- member_in_group(Pid, Name)],
|
||||
%% Kept for backward compatibility with links. Can be removed, eventually.
|
||||
_ = [gen_server:abcast(nodes(), ?MODULE, {del_member, Name, Pid}) ||
|
||||
Name <- Names],
|
||||
ok.
|
||||
|
||||
join_group(Name, Pid) ->
|
||||
Ref_Pid = {ref, Pid},
|
||||
try _ = ets:update_counter(pg2_table, Ref_Pid, {4, +1})
|
||||
catch _:_ ->
|
||||
{RPid, Ref} = do_monitor(Pid),
|
||||
true = ets:insert(pg2_table, {Ref_Pid, RPid, Ref, 1}),
|
||||
true = ets:insert(pg2_table, {{ref, Ref}, Pid})
|
||||
end,
|
||||
Member_Name_Pid = {member, Name, Pid},
|
||||
try _ = ets:update_counter(pg2_table, Member_Name_Pid, {2, +1})
|
||||
catch _:_ ->
|
||||
true = ets:insert(pg2_table, {Member_Name_Pid, 1}),
|
||||
_ = [ets:insert(pg2_table, {{local_member, Name, Pid}}) ||
|
||||
node(Pid) =:= node()],
|
||||
true = ets:insert(pg2_table, {{pid, Pid, Name}})
|
||||
end.
|
||||
|
||||
leave_group(Name, Pid) ->
|
||||
Member_Name_Pid = {member, Name, Pid},
|
||||
try ets:update_counter(pg2_table, Member_Name_Pid, {2, -1}) of
|
||||
N ->
|
||||
if
|
||||
N =:= 0 ->
|
||||
true = ets:delete(pg2_table, {pid, Pid, Name}),
|
||||
_ = [ets:delete(pg2_table, {local_member, Name, Pid}) ||
|
||||
node(Pid) =:= node()],
|
||||
true = ets:delete(pg2_table, Member_Name_Pid);
|
||||
true ->
|
||||
ok
|
||||
end,
|
||||
Ref_Pid = {ref, Pid},
|
||||
case ets:update_counter(pg2_table, Ref_Pid, {4, -1}) of
|
||||
0 ->
|
||||
[{Ref_Pid,RPid,Ref,0}] = ets:lookup(pg2_table, Ref_Pid),
|
||||
true = ets:delete(pg2_table, {ref, Ref}),
|
||||
true = ets:delete(pg2_table, Ref_Pid),
|
||||
true = erlang:demonitor(Ref, [flush]),
|
||||
kill_monitor_proc(RPid, Pid);
|
||||
_ ->
|
||||
ok
|
||||
end
|
||||
catch _:_ ->
|
||||
ok
|
||||
end.
|
||||
|
||||
all_members() ->
|
||||
[[G, group_members(G)] || G <- all_groups()].
|
||||
|
||||
group_members(Name) ->
|
||||
[P ||
|
||||
[P, N] <- ets:match(pg2_table, {{member, Name, '$1'},'$2'}),
|
||||
_ <- lists:seq(1, N)].
|
||||
|
||||
local_group_members(Name) ->
|
||||
[P ||
|
||||
[Pid] <- ets:match(pg2_table, {{local_member, Name, '$1'}}),
|
||||
P <- member_in_group(Pid, Name)].
|
||||
|
||||
member_in_group(Pid, Name) ->
|
||||
case ets:lookup(pg2_table, {member, Name, Pid}) of
|
||||
[] -> [];
|
||||
[{{member, Name, Pid}, N}] ->
|
||||
lists:duplicate(N, Pid)
|
||||
end.
|
||||
|
||||
member_groups(Pid) ->
|
||||
[Name || [Name] <- ets:match(pg2_table, {{pid, Pid, '$1'}})].
|
||||
|
||||
all_groups() ->
|
||||
[N || [N] <- ets:match(pg2_table, {{group,'$1'}})].
|
||||
|
||||
ensure_started() ->
|
||||
case whereis(?MODULE) of
|
||||
undefined ->
|
||||
C = {pg2, {?MODULE, start_link, []}, permanent,
|
||||
1000, worker, [?MODULE]},
|
||||
supervisor:start_child(kernel_safe_sup, C);
|
||||
Pg2Pid ->
|
||||
{ok, Pg2Pid}
|
||||
end.
|
||||
|
||||
|
||||
kill_monitor_proc(RPid, Pid) ->
|
||||
RPid =:= Pid orelse exit(RPid, kill).
|
||||
|
||||
%% When/if erlang:monitor() returns before trying to connect to the
|
||||
%% other node this function can be removed.
|
||||
do_monitor(Pid) ->
|
||||
case (node(Pid) =:= node()) orelse lists:member(node(Pid), nodes()) of
|
||||
true ->
|
||||
%% Assume the node is still up
|
||||
{Pid, erlang:monitor(process, Pid)};
|
||||
false ->
|
||||
F = fun() ->
|
||||
Ref = erlang:monitor(process, Pid),
|
||||
receive
|
||||
{'DOWN', Ref, process, Pid, _Info} ->
|
||||
exit(normal)
|
||||
end
|
||||
end,
|
||||
erlang:spawn_monitor(F)
|
||||
end.
|
Loading…
Reference in New Issue
Block a user