mirror of
https://github.com/processone/ejabberd.git
synced 2024-11-30 16:36:29 +01:00
improve send last published items spawning
SVN Revision: 2037
This commit is contained in:
parent
0141779b9c
commit
1d44abfc8a
@ -1,3 +1,10 @@
|
|||||||
|
|
||||||
|
2009-04-23 Christophe Romain <christophe.romain@process-one.net>
|
||||||
|
|
||||||
|
* src/mod_pubsub/mod_pubsub.erl: improve send last published items
|
||||||
|
(not spawned as much) and allow to send last PEP items of our offline
|
||||||
|
contacts if configured for (fix discussion issue on standars ML)
|
||||||
|
|
||||||
2009-04-22 Badlop <badlop@process-one.net>
|
2009-04-22 Badlop <badlop@process-one.net>
|
||||||
|
|
||||||
* src/ejabberd.cfg.example: Fix English typos. Fix line length:
|
* src/ejabberd.cfg.example: Fix English typos. Fix line length:
|
||||||
|
@ -109,6 +109,10 @@
|
|||||||
code_change/3
|
code_change/3
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
%% calls for parallel sending of last items
|
||||||
|
-export([send_loop/1
|
||||||
|
]).
|
||||||
|
|
||||||
-define(PROCNAME, ejabberd_mod_pubsub).
|
-define(PROCNAME, ejabberd_mod_pubsub).
|
||||||
-define(PLUGIN_PREFIX, "node_").
|
-define(PLUGIN_PREFIX, "node_").
|
||||||
-define(TREE_PREFIX, "nodetree_").
|
-define(TREE_PREFIX, "nodetree_").
|
||||||
@ -117,8 +121,10 @@
|
|||||||
host,
|
host,
|
||||||
access,
|
access,
|
||||||
pep_mapping = [],
|
pep_mapping = [],
|
||||||
|
pep_sendlast_offline,
|
||||||
nodetree = ?STDTREE,
|
nodetree = ?STDTREE,
|
||||||
plugins = [?STDNODE]}).
|
plugins = [?STDNODE],
|
||||||
|
send_loop}).
|
||||||
|
|
||||||
%%====================================================================
|
%%====================================================================
|
||||||
%% API
|
%% API
|
||||||
@ -158,6 +164,7 @@ init([ServerHost, Opts]) ->
|
|||||||
?DEBUG("pubsub init ~p ~p",[ServerHost,Opts]),
|
?DEBUG("pubsub init ~p ~p",[ServerHost,Opts]),
|
||||||
Host = gen_mod:get_opt_host(ServerHost, Opts, "pubsub.@HOST@"),
|
Host = gen_mod:get_opt_host(ServerHost, Opts, "pubsub.@HOST@"),
|
||||||
Access = gen_mod:get_opt(access_createnode, Opts, all),
|
Access = gen_mod:get_opt(access_createnode, Opts, all),
|
||||||
|
PepOffline = gen_mod:get_opt(pep_sendlast_offline, Opts, false),
|
||||||
IQDisc = gen_mod:get_opt(iqdisc, Opts, one_queue),
|
IQDisc = gen_mod:get_opt(iqdisc, Opts, one_queue),
|
||||||
ServerHostB = list_to_binary(ServerHost),
|
ServerHostB = list_to_binary(ServerHost),
|
||||||
mod_disco:register_feature(ServerHost, ?NS_PUBSUB_s),
|
mod_disco:register_feature(ServerHost, ?NS_PUBSUB_s),
|
||||||
@ -190,12 +197,15 @@ init([ServerHost, Opts]) ->
|
|||||||
ets:insert(gen_mod:get_module_proc(ServerHost, pubsub_state), {plugins, Plugins}),
|
ets:insert(gen_mod:get_module_proc(ServerHost, pubsub_state), {plugins, Plugins}),
|
||||||
ets:insert(gen_mod:get_module_proc(ServerHost, pubsub_state), {pep_mapping, PepMapping}),
|
ets:insert(gen_mod:get_module_proc(ServerHost, pubsub_state), {pep_mapping, PepMapping}),
|
||||||
init_nodes(Host, ServerHost),
|
init_nodes(Host, ServerHost),
|
||||||
{ok, #state{host = Host,
|
State = #state{host = Host,
|
||||||
server_host = ServerHost,
|
server_host = ServerHost,
|
||||||
access = Access,
|
access = Access,
|
||||||
pep_mapping = PepMapping,
|
pep_mapping = PepMapping,
|
||||||
|
pep_sendlast_offline = PepOffline,
|
||||||
nodetree = NodeTree,
|
nodetree = NodeTree,
|
||||||
plugins = Plugins}}.
|
plugins = Plugins},
|
||||||
|
SendLoop = spawn(?MODULE, send_loop, [State]), %% TODO supervise that process
|
||||||
|
{ok, State#state{send_loop = SendLoop}}.
|
||||||
|
|
||||||
%% @spec (Host, ServerHost, Opts) -> Plugins
|
%% @spec (Host, ServerHost, Opts) -> Plugins
|
||||||
%% Host = mod_pubsub:host() Opts = [{Key,Value}]
|
%% Host = mod_pubsub:host() Opts = [{Key,Value}]
|
||||||
@ -310,6 +320,113 @@ update_database(Host) ->
|
|||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
send_loop(State) ->
|
||||||
|
receive
|
||||||
|
{presence, JID, Pid} ->
|
||||||
|
Host = State#state.host,
|
||||||
|
ServerHost = State#state.server_host,
|
||||||
|
LJID = jlib:short_prepd_jid(JID),
|
||||||
|
BJID = jlib:short_prepd_bare_jid(JID),
|
||||||
|
%% for each node From is subscribed to
|
||||||
|
%% and if the node is so configured, send the last published item to From
|
||||||
|
lists:foreach(fun(Type) ->
|
||||||
|
{result, Subscriptions} = node_action(Type, get_entity_subscriptions, [Host, JID]),
|
||||||
|
lists:foreach(
|
||||||
|
fun({Node, subscribed, SubJID}) ->
|
||||||
|
if (SubJID == LJID) or (SubJID == BJID) ->
|
||||||
|
case tree_action(Host, get_node, [Host, Node, JID]) of
|
||||||
|
#pubsub_node{options = Options} ->
|
||||||
|
case get_option(Options, send_last_published_item) of
|
||||||
|
on_sub_and_presence ->
|
||||||
|
send_items(Host, Node, SubJID, last);
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end;
|
||||||
|
true ->
|
||||||
|
% resource not concerned about that subscription
|
||||||
|
ok
|
||||||
|
end;
|
||||||
|
(_) ->
|
||||||
|
ok
|
||||||
|
end, Subscriptions)
|
||||||
|
end, State#state.plugins),
|
||||||
|
%% and force send the last PEP events published by its offline and local contacts
|
||||||
|
%% only if pubsub is explicitely configured for that.
|
||||||
|
%% this is a hack in a sense that PEP should only be based on presence
|
||||||
|
%% and is not able to "store" events of remote users (via s2s)
|
||||||
|
%% this makes that hack only work for local domain by now
|
||||||
|
if State#state.pep_sendlast_offline ->
|
||||||
|
case catch ejabberd_c2s:get_subscribed(Pid) of
|
||||||
|
Contacts when is_list(Contacts) ->
|
||||||
|
{User, Server, Resource} = LJID,
|
||||||
|
lists:foreach(
|
||||||
|
fun({U, S, R}) -> %% local contacts
|
||||||
|
case ejabberd_sm:get_user_resources(U, S) of
|
||||||
|
[] -> %% offline
|
||||||
|
case S of
|
||||||
|
ServerHost -> %% local contact, so we may have pep items
|
||||||
|
PeerJID = exmpp_jlib:make_jid(U, S, R),
|
||||||
|
handle_cast({presence, User, Server, [Resource], PeerJID}, State);
|
||||||
|
_ -> %% remote contact, no items available
|
||||||
|
ok
|
||||||
|
end;
|
||||||
|
_ -> %% online
|
||||||
|
% this is already handled by presence probe
|
||||||
|
ok
|
||||||
|
end;
|
||||||
|
(_) -> %% remote contacts
|
||||||
|
% we can not do anything in any cases
|
||||||
|
ok
|
||||||
|
end, Contacts);
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end;
|
||||||
|
true ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
send_loop(State);
|
||||||
|
{presence, User, Server, Resources, JID} ->
|
||||||
|
Owner = jlib:short_prepd_bare_jid(JID),
|
||||||
|
Host = State#state.host,
|
||||||
|
ServerHost = State#state.server_host,
|
||||||
|
lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, options = Options}) ->
|
||||||
|
case get_option(Options, send_last_published_item) of
|
||||||
|
on_sub_and_presence ->
|
||||||
|
lists:foreach(fun(Resource) ->
|
||||||
|
LJID = {User, Server, Resource},
|
||||||
|
case is_caps_notify(ServerHost, Node, LJID) of
|
||||||
|
true ->
|
||||||
|
Subscribed = case get_option(Options, access_model) of
|
||||||
|
open -> true;
|
||||||
|
presence -> true;
|
||||||
|
whitelist -> false; % subscribers are added manually
|
||||||
|
authorize -> false; % likewise
|
||||||
|
roster ->
|
||||||
|
Grps = get_option(Options, roster_groups_allowed, []),
|
||||||
|
{OU, OS, _} = Owner,
|
||||||
|
element(2, get_roster_info(OU, OS, LJID, Grps))
|
||||||
|
end,
|
||||||
|
if Subscribed ->
|
||||||
|
send_items(Owner, Node, LJID, last);
|
||||||
|
true ->
|
||||||
|
ok
|
||||||
|
end;
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
end, Resources);
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
end, tree_action(Host, get_nodes, [Owner, JID])),
|
||||||
|
send_loop(State);
|
||||||
|
stop ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
%% -------
|
%% -------
|
||||||
%% disco hooks handling functions
|
%% disco hooks handling functions
|
||||||
%%
|
%%
|
||||||
@ -416,11 +533,11 @@ disco_sm_items(Acc, From, To, NodeB, _Lang) ->
|
|||||||
%% presence hooks handling functions
|
%% presence hooks handling functions
|
||||||
%%
|
%%
|
||||||
|
|
||||||
presence_probe(JID, JID, _Pid) ->
|
presence_probe(JID, JID, Pid) ->
|
||||||
{U, S, R} = jlib:short_prepd_jid(JID),
|
{U, S, R} = jlib:short_prepd_jid(JID),
|
||||||
Host = S, % exmpp_jid:ldomain_as_list(JID),
|
Host = S, % exmpp_jid:ldomain_as_list(JID),
|
||||||
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
|
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
|
||||||
gen_server:cast(Proc, {presence, JID}),
|
gen_server:cast(Proc, {presence, JID, Pid}),
|
||||||
gen_server:cast(Proc, {presence, U, S, [R], JID});
|
gen_server:cast(Proc, {presence, U, S, [R], JID});
|
||||||
presence_probe(Peer, JID, _Pid) ->
|
presence_probe(Peer, JID, _Pid) ->
|
||||||
{U, S, R} = jlib:short_prepd_jid(Peer),
|
{U, S, R} = jlib:short_prepd_jid(Peer),
|
||||||
@ -483,72 +600,14 @@ handle_call(stop, _From, State) ->
|
|||||||
%% Description: Handling cast messages
|
%% Description: Handling cast messages
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% @private
|
%% @private
|
||||||
handle_cast({presence, JID}, State) ->
|
handle_cast({presence, JID, Pid}, State) ->
|
||||||
%% A new resource is available. send last published items
|
%% A new resource is available. send last published items
|
||||||
Host = State#state.host,
|
State#state.send_loop ! {presence, JID, Pid},
|
||||||
LJID = jlib:short_prepd_jid(JID),
|
|
||||||
%% for each node From is subscribed to
|
|
||||||
%% and if the node is so configured, send the last published item to From
|
|
||||||
spawn(fun() ->
|
|
||||||
lists:foreach(fun(Type) ->
|
|
||||||
{result, Subscriptions} = node_action(Type, get_entity_subscriptions, [Host, JID]),
|
|
||||||
lists:foreach(
|
|
||||||
fun({Node, subscribed, _SubJID}) ->
|
|
||||||
case tree_action(Host, get_node, [Host, Node, JID]) of
|
|
||||||
#pubsub_node{options = Options} ->
|
|
||||||
case get_option(Options, send_last_published_item) of
|
|
||||||
on_sub_and_presence ->
|
|
||||||
send_items(Host, Node, LJID, last);
|
|
||||||
_ ->
|
|
||||||
ok
|
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
ok
|
|
||||||
end;
|
|
||||||
(_) ->
|
|
||||||
ok
|
|
||||||
end, Subscriptions)
|
|
||||||
end, State#state.plugins)
|
|
||||||
end),
|
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_cast({presence, User, Server, Resources, JID}, State) ->
|
handle_cast({presence, User, Server, Resources, JID}, State) ->
|
||||||
%% A new resource is available. send last published PEP items
|
%% A new resource is available. send last published PEP items
|
||||||
Owner = jlib:short_prepd_bare_jid(JID),
|
State#state.send_loop ! {presence, User, Server, Resources, JID},
|
||||||
Host = State#state.host,
|
|
||||||
ServerHost = State#state.server_host,
|
|
||||||
spawn(fun() ->
|
|
||||||
lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, options = Options}) ->
|
|
||||||
case get_option(Options, send_last_published_item) of
|
|
||||||
on_sub_and_presence ->
|
|
||||||
lists:foreach(fun(Resource) ->
|
|
||||||
LJID = {User, Server, Resource},
|
|
||||||
case is_caps_notify(ServerHost, Node, LJID) of
|
|
||||||
true ->
|
|
||||||
Subscribed = case get_option(Options, access_model) of
|
|
||||||
open -> true;
|
|
||||||
presence -> true;
|
|
||||||
whitelist -> false; % subscribers are added manually
|
|
||||||
authorize -> false; % likewise
|
|
||||||
roster ->
|
|
||||||
Grps = get_option(Options, roster_groups_allowed, []),
|
|
||||||
{OU, OS, _} = Owner,
|
|
||||||
element(2, get_roster_info(OU, OS, LJID, Grps))
|
|
||||||
end,
|
|
||||||
if Subscribed ->
|
|
||||||
send_items(Owner, Node, LJID, last);
|
|
||||||
true ->
|
|
||||||
ok
|
|
||||||
end;
|
|
||||||
false ->
|
|
||||||
ok
|
|
||||||
end
|
|
||||||
end, Resources);
|
|
||||||
_ ->
|
|
||||||
ok
|
|
||||||
end
|
|
||||||
end, tree_action(ServerHost, get_nodes, [Owner, JID]))
|
|
||||||
end),
|
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_cast({remove_user, LUser, LServer}, State) ->
|
handle_cast({remove_user, LUser, LServer}, State) ->
|
||||||
@ -605,7 +664,8 @@ handle_info(_Info, State) ->
|
|||||||
terminate(_Reason, #state{host = Host,
|
terminate(_Reason, #state{host = Host,
|
||||||
server_host = ServerHost,
|
server_host = ServerHost,
|
||||||
nodetree = TreePlugin,
|
nodetree = TreePlugin,
|
||||||
plugins = Plugins}) ->
|
plugins = Plugins,
|
||||||
|
send_loop = SendLoop}) ->
|
||||||
terminate_plugins(Host, ServerHost, Plugins, TreePlugin),
|
terminate_plugins(Host, ServerHost, Plugins, TreePlugin),
|
||||||
ejabberd_router:unregister_route(Host),
|
ejabberd_router:unregister_route(Host),
|
||||||
ServerHostB = list_to_binary(ServerHost),
|
ServerHostB = list_to_binary(ServerHost),
|
||||||
@ -631,6 +691,7 @@ terminate(_Reason, #state{host = Host,
|
|||||||
gen_iq_handler:remove_iq_handler(ejabberd_sm, ServerHostB, ?NS_PUBSUB),
|
gen_iq_handler:remove_iq_handler(ejabberd_sm, ServerHostB, ?NS_PUBSUB),
|
||||||
gen_iq_handler:remove_iq_handler(ejabberd_sm, ServerHostB, ?NS_PUBSUB_OWNER),
|
gen_iq_handler:remove_iq_handler(ejabberd_sm, ServerHostB, ?NS_PUBSUB_OWNER),
|
||||||
mod_disco:unregister_feature(ServerHost, ?NS_PUBSUB_s),
|
mod_disco:unregister_feature(ServerHost, ?NS_PUBSUB_s),
|
||||||
|
SendLoop ! stop,
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
@ -829,7 +890,7 @@ iq_disco_items(Host, Item, From) ->
|
|||||||
%% Note: Multiple Node Discovery not supported (mask on pubsub#type)
|
%% Note: Multiple Node Discovery not supported (mask on pubsub#type)
|
||||||
%% TODO this code is also back-compatible with pubsub v1.8 (for client issue)
|
%% TODO this code is also back-compatible with pubsub v1.8 (for client issue)
|
||||||
%% TODO make it pubsub v1.12 compliant (breaks client compatibility ?)
|
%% TODO make it pubsub v1.12 compliant (breaks client compatibility ?)
|
||||||
%% TODO That is, remove name attribute
|
%% TODO That is, remove name attribute (or node?, please check)
|
||||||
Action =
|
Action =
|
||||||
fun(#pubsub_node{type = Type}) ->
|
fun(#pubsub_node{type = Type}) ->
|
||||||
NodeItems = case node_call(Type, get_items, [Host, Node, From]) of
|
NodeItems = case node_call(Type, get_items, [Host, Node, From]) of
|
||||||
@ -1570,7 +1631,7 @@ publish_item(Host, ServerHost, Node, Publisher, ItemId, Payload) ->
|
|||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
ejabberd_hooks:run(pubsub_publish_item, ServerHost, [ServerHost, Node, Publisher, service_jid(Host), ItemId, Payload]),
|
ejabberd_hooks:run(pubsub_publish_item, ServerHost, [ServerHost, Node, Publisher, service_jid(Host), ItemId, Payload]),
|
||||||
Reply = [],
|
Reply = [], %% TODO EJAB-909
|
||||||
case transaction(Host, Node, Action, sync_dirty) of
|
case transaction(Host, Node, Action, sync_dirty) of
|
||||||
{error, 'item-not-found'} ->
|
{error, 'item-not-found'} ->
|
||||||
%% handles auto-create feature
|
%% handles auto-create feature
|
||||||
|
Loading…
Reference in New Issue
Block a user