improve send last published items spawning

SVN Revision: 2036
This commit is contained in:
Christophe Romain 2009-04-22 22:19:41 +00:00
parent 6acf7fb5ec
commit eab7a509f9
2 changed files with 136 additions and 69 deletions

View File

@ -1,3 +1,9 @@
2009-04-23 Christophe Romain <christophe.romain@process-one.net>
* src/mod_pubsub/mod_pubsub.erl: improve send last published items
(not spawned as much) and allow to send last PEP items of our offline
contacts if configured for (fix discussion issue on standars ML)
2009-04-22 Badlop <badlop@process-one.net>
* src/ejabberd.cfg.example: Fix English typos. Fix line length:

View File

@ -108,6 +108,10 @@
code_change/3
]).
%% calls for parallel sending of last items
-export([send_loop/1
]).
-define(PROCNAME, ejabberd_mod_pubsub).
-define(PLUGIN_PREFIX, "node_").
-define(TREE_PREFIX, "nodetree_").
@ -116,8 +120,10 @@
host,
access,
pep_mapping = [],
pep_sendlast_offline,
nodetree = ?STDTREE,
plugins = [?STDNODE]}).
plugins = [?STDNODE],
send_loop}).
%%====================================================================
%% API
@ -157,6 +163,7 @@ init([ServerHost, Opts]) ->
?DEBUG("pubsub init ~p ~p",[ServerHost,Opts]),
Host = gen_mod:get_opt_host(ServerHost, Opts, "pubsub.@HOST@"),
Access = gen_mod:get_opt(access_createnode, Opts, all),
PepOffline = gen_mod:get_opt(pep_sendlast_offline, Opts, false),
IQDisc = gen_mod:get_opt(iqdisc, Opts, one_queue),
mod_disco:register_feature(ServerHost, ?NS_PUBSUB),
ejabberd_hooks:add(disco_sm_identity, ServerHost, ?MODULE, disco_sm_identity, 75),
@ -188,12 +195,15 @@ init([ServerHost, Opts]) ->
ets:insert(gen_mod:get_module_proc(ServerHost, pubsub_state), {plugins, Plugins}),
ets:insert(gen_mod:get_module_proc(ServerHost, pubsub_state), {pep_mapping, PepMapping}),
init_nodes(Host, ServerHost),
{ok, #state{host = Host,
State = #state{host = Host,
server_host = ServerHost,
access = Access,
pep_mapping = PepMapping,
pep_sendlast_offline = PepOffline,
nodetree = NodeTree,
plugins = Plugins}}.
plugins = Plugins},
SendLoop = spawn(?MODULE, send_loop, [State]), %% TODO supervise that process
{ok, State#state{send_loop = SendLoop}}.
%% @spec (Host, ServerHost, Opts) -> Plugins
%% Host = mod_pubsub:host() Opts = [{Key,Value}]
@ -308,6 +318,113 @@ update_database(Host) ->
ok
end.
send_loop(State) ->
receive
{presence, JID, Pid} ->
Host = State#state.host,
ServerHost = State#state.server_host,
LJID = jlib:jid_tolower(JID),
BJID = jlib:jid_remove_resource(LJID),
%% for each node From is subscribed to
%% and if the node is so configured, send the last published item to From
lists:foreach(fun(Type) ->
{result, Subscriptions} = node_action(Type, get_entity_subscriptions, [Host, JID]),
lists:foreach(
fun({Node, subscribed, SubJID}) ->
if (SubJID == LJID) or (SubJID == BJID) ->
case tree_action(Host, get_node, [Host, Node, JID]) of
#pubsub_node{options = Options} ->
case get_option(Options, send_last_published_item) of
on_sub_and_presence ->
send_items(Host, Node, SubJID, last);
_ ->
ok
end;
_ ->
ok
end;
true ->
% resource not concerned about that subscription
ok
end;
(_) ->
ok
end, Subscriptions)
end, State#state.plugins),
%% and force send the last PEP events published by its offline and local contacts
%% only if pubsub is explicitely configured for that.
%% this is a hack in a sense that PEP should only be based on presence
%% and is not able to "store" events of remote users (via s2s)
%% this makes that hack only work for local domain by now
if State#state.pep_sendlast_offline ->
case catch ejabberd_c2s:get_subscribed(Pid) of
Contacts when is_list(Contacts) ->
{User, Server, Resource} = jlib:jid_tolower(JID),
lists:foreach(
fun({U, S, R}) -> %% local contacts
case ejabberd_sm:get_user_resources(U, S) of
[] -> %% offline
case S of
ServerHost -> %% local contact, so we may have pep items
PeerJID = jlib:make_jid(U, S, R),
handle_cast({presence, User, Server, [Resource], PeerJID}, State);
_ -> %% remote contact, no items available
ok
end;
_ -> %% online
% this is already handled by presence probe
ok
end;
(_) -> %% remote contacts
% we can not do anything in any cases
ok
end, Contacts);
_ ->
ok
end;
true ->
ok
end,
send_loop(State);
{presence, User, Server, Resources, JID} ->
Owner = jlib:jid_remove_resource(jlib:jid_tolower(JID)),
Host = State#state.host,
ServerHost = State#state.server_host,
lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, options = Options}) ->
case get_option(Options, send_last_published_item) of
on_sub_and_presence ->
lists:foreach(fun(Resource) ->
LJID = {User, Server, Resource},
case is_caps_notify(ServerHost, Node, LJID) of
true ->
Subscribed = case get_option(Options, access_model) of
open -> true;
presence -> true;
whitelist -> false; % subscribers are added manually
authorize -> false; % likewise
roster ->
Grps = get_option(Options, roster_groups_allowed, []),
{OU, OS, _} = Owner,
element(2, get_roster_info(OU, OS, LJID, Grps))
end,
if Subscribed ->
send_items(Owner, Node, LJID, last);
true ->
ok
end;
false ->
ok
end
end, Resources);
_ ->
ok
end
end, tree_action(Host, get_nodes, [Owner, JID])),
send_loop(State);
stop ->
ok
end.
%% -------
%% disco hooks handling functions
%%
@ -415,9 +532,9 @@ disco_sm_items(Acc, From, To, Node, _Lang) ->
%% presence hooks handling functions
%%
presence_probe(#jid{luser = User, lserver = Server, lresource = Resource} = JID, JID, _Pid) ->
presence_probe(#jid{luser = User, lserver = Server, lresource = Resource} = JID, JID, Pid) ->
Proc = gen_mod:get_module_proc(Server, ?PROCNAME),
gen_server:cast(Proc, {presence, JID}),
gen_server:cast(Proc, {presence, JID, Pid}),
gen_server:cast(Proc, {presence, User, Server, [Resource], JID});
presence_probe(#jid{luser = User, lserver = Server, lresource = Resource}, #jid{lserver = Host} = JID, _Pid) ->
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
@ -478,72 +595,14 @@ handle_call(stop, _From, State) ->
%% Description: Handling cast messages
%%--------------------------------------------------------------------
%% @private
handle_cast({presence, JID}, State) ->
handle_cast({presence, JID, Pid}, State) ->
%% A new resource is available. send last published items
Host = State#state.host,
LJID = jlib:jid_tolower(JID),
%% for each node From is subscribed to
%% and if the node is so configured, send the last published item to From
spawn(fun() ->
lists:foreach(fun(Type) ->
{result, Subscriptions} = node_action(Type, get_entity_subscriptions, [Host, JID]),
lists:foreach(
fun({Node, subscribed, _SubJID}) ->
case tree_action(Host, get_node, [Host, Node, JID]) of
#pubsub_node{options = Options} ->
case get_option(Options, send_last_published_item) of
on_sub_and_presence ->
send_items(Host, Node, LJID, last);
_ ->
ok
end;
_ ->
ok
end;
(_) ->
ok
end, Subscriptions)
end, State#state.plugins)
end),
State#state.send_loop ! {presence, JID, Pid},
{noreply, State};
handle_cast({presence, User, Server, Resources, JID}, State) ->
%% A new resource is available. send last published PEP items
Owner = jlib:jid_remove_resource(jlib:jid_tolower(JID)),
Host = State#state.host,
ServerHost = State#state.server_host,
spawn(fun() ->
lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, options = Options}) ->
case get_option(Options, send_last_published_item) of
on_sub_and_presence ->
lists:foreach(fun(Resource) ->
LJID = {User, Server, Resource},
case is_caps_notify(ServerHost, Node, LJID) of
true ->
Subscribed = case get_option(Options, access_model) of
open -> true;
presence -> true;
whitelist -> false; % subscribers are added manually
authorize -> false; % likewise
roster ->
Grps = get_option(Options, roster_groups_allowed, []),
{OU, OS, _} = Owner,
element(2, get_roster_info(OU, OS, LJID, Grps))
end,
if Subscribed ->
send_items(Owner, Node, LJID, last);
true ->
ok
end;
false ->
ok
end
end, Resources);
_ ->
ok
end
end, tree_action(Host, get_nodes, [Owner, JID]))
end),
State#state.send_loop ! {presence, User, Server, Resources, JID},
{noreply, State};
handle_cast({remove_user, LUser, LServer}, State) ->
@ -600,7 +659,8 @@ handle_info(_Info, State) ->
terminate(_Reason, #state{host = Host,
server_host = ServerHost,
nodetree = TreePlugin,
plugins = Plugins}) ->
plugins = Plugins,
send_loop = SendLoop}) ->
terminate_plugins(Host, ServerHost, Plugins, TreePlugin),
ejabberd_router:unregister_route(Host),
case lists:member(?PEPNODE, Plugins) of
@ -622,6 +682,7 @@ terminate(_Reason, #state{host = Host,
gen_iq_handler:remove_iq_handler(ejabberd_sm, ServerHost, ?NS_PUBSUB),
gen_iq_handler:remove_iq_handler(ejabberd_sm, ServerHost, ?NS_PUBSUB_OWNER),
mod_disco:unregister_feature(ServerHost, ?NS_PUBSUB),
SendLoop ! stop,
ok.
%%--------------------------------------------------------------------
@ -820,7 +881,7 @@ iq_disco_items(Host, Item, From) ->
%% Note: Multiple Node Discovery not supported (mask on pubsub#type)
%% TODO this code is also back-compatible with pubsub v1.8 (for client issue)
%% TODO make it pubsub v1.12 compliant (breaks client compatibility ?)
%% TODO That is, remove name attribute
%% TODO That is, remove name attribute (or node?, please check)
Action =
fun(#pubsub_node{type = Type}) ->
NodeItems = case node_call(Type, get_items, [Host, Node, From]) of
@ -1563,7 +1624,7 @@ publish_item(Host, ServerHost, Node, Publisher, ItemId, Payload) ->
end
end,
ejabberd_hooks:run(pubsub_publish_item, ServerHost, [ServerHost, Node, Publisher, service_jid(Host), ItemId, Payload]),
Reply = [],
Reply = [], %% TODO EJAB-909
case transaction(Host, Node, Action, sync_dirty) of
{error, ?ERR_ITEM_NOT_FOUND} ->
%% handles auto-create feature