mirror of
https://github.com/processone/ejabberd.git
synced 2024-12-20 17:27:00 +01:00
PubSub: refactor send_last_items remove send_loop
This commit is contained in:
parent
8f5a1c4a2a
commit
9c5427e0c2
@ -52,7 +52,7 @@
|
|||||||
%% exports for hooks
|
%% exports for hooks
|
||||||
-export([presence_probe/3, caps_add/3, caps_update/3,
|
-export([presence_probe/3, caps_add/3, caps_update/3,
|
||||||
in_subscription/6, out_subscription/4,
|
in_subscription/6, out_subscription/4,
|
||||||
on_user_offline/3, remove_user/2,
|
on_user_online/1, on_user_offline/2, remove_user/2,
|
||||||
disco_local_identity/5, disco_local_features/5,
|
disco_local_identity/5, disco_local_features/5,
|
||||||
disco_local_items/5, disco_sm_identity/5,
|
disco_local_items/5, disco_sm_identity/5,
|
||||||
disco_sm_features/5, disco_sm_items/5,
|
disco_sm_features/5, disco_sm_items/5,
|
||||||
@ -89,11 +89,7 @@
|
|||||||
%% API and gen_server callbacks
|
%% API and gen_server callbacks
|
||||||
-export([start/2, stop/1, init/1,
|
-export([start/2, stop/1, init/1,
|
||||||
handle_call/3, handle_cast/2, handle_info/2,
|
handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3, depends/2, export/1]).
|
terminate/2, code_change/3, depends/2, export/1, mod_opt_type/1]).
|
||||||
|
|
||||||
-export([send_loop/1, mod_opt_type/1]).
|
|
||||||
|
|
||||||
-define(LOOPNAME, ejabberd_mod_pubsub_loop).
|
|
||||||
|
|
||||||
%%====================================================================
|
%%====================================================================
|
||||||
%% API
|
%% API
|
||||||
@ -300,7 +296,9 @@ init([ServerHost, Opts]) ->
|
|||||||
?MODULE, process_commands, IQDisc),
|
?MODULE, process_commands, IQDisc),
|
||||||
Plugins
|
Plugins
|
||||||
end, Hosts),
|
end, Hosts),
|
||||||
ejabberd_hooks:add(sm_remove_connection_hook, ServerHost,
|
ejabberd_hooks:add(c2s_session_opened, ServerHost,
|
||||||
|
?MODULE, on_user_online, 75),
|
||||||
|
ejabberd_hooks:add(c2s_terminated, ServerHost,
|
||||||
?MODULE, on_user_offline, 75),
|
?MODULE, on_user_offline, 75),
|
||||||
ejabberd_hooks:add(disco_local_identity, ServerHost,
|
ejabberd_hooks:add(disco_local_identity, ServerHost,
|
||||||
?MODULE, disco_local_identity, 75),
|
?MODULE, disco_local_identity, 75),
|
||||||
@ -337,34 +335,16 @@ init([ServerHost, Opts]) ->
|
|||||||
false ->
|
false ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
{_, State} = init_send_loop(ServerHost, Hosts),
|
|
||||||
{ok, State}.
|
|
||||||
|
|
||||||
init_send_loop(ServerHost, Hosts) ->
|
|
||||||
NodeTree = config(ServerHost, nodetree),
|
NodeTree = config(ServerHost, nodetree),
|
||||||
Plugins = config(ServerHost, plugins),
|
Plugins = config(ServerHost, plugins),
|
||||||
LastItemCache = config(ServerHost, last_item_cache),
|
|
||||||
MaxItemsNode = config(ServerHost, max_items_node),
|
|
||||||
PepMapping = config(ServerHost, pep_mapping),
|
PepMapping = config(ServerHost, pep_mapping),
|
||||||
PepOffline = config(ServerHost, ignore_pep_from_offline),
|
|
||||||
Access = config(ServerHost, access),
|
|
||||||
DBType = gen_mod:db_type(ServerHost, ?MODULE),
|
DBType = gen_mod:db_type(ServerHost, ?MODULE),
|
||||||
State = #state{hosts = Hosts, server_host = ServerHost,
|
{ok, #state{hosts = Hosts, server_host = ServerHost,
|
||||||
access = Access, pep_mapping = PepMapping,
|
access = Access, pep_mapping = PepMapping,
|
||||||
ignore_pep_from_offline = PepOffline,
|
ignore_pep_from_offline = PepOffline,
|
||||||
last_item_cache = LastItemCache,
|
last_item_cache = LastItemCache,
|
||||||
max_items_node = MaxItemsNode, nodetree = NodeTree,
|
max_items_node = MaxItemsNode, nodetree = NodeTree,
|
||||||
plugins = Plugins, db_type = DBType},
|
plugins = Plugins, db_type = DBType}}.
|
||||||
Proc = gen_mod:get_module_proc(ServerHost, ?LOOPNAME),
|
|
||||||
Pid = case whereis(Proc) of
|
|
||||||
undefined ->
|
|
||||||
SendLoop = spawn(?MODULE, send_loop, [State]),
|
|
||||||
register(Proc, SendLoop),
|
|
||||||
SendLoop;
|
|
||||||
Loop ->
|
|
||||||
Loop
|
|
||||||
end,
|
|
||||||
{Pid, State}.
|
|
||||||
|
|
||||||
depends(ServerHost, Opts) ->
|
depends(ServerHost, Opts) ->
|
||||||
Host = gen_mod:get_opt_host(ServerHost, Opts, <<"pubsub.@HOST@">>),
|
Host = gen_mod:get_opt_host(ServerHost, Opts, <<"pubsub.@HOST@">>),
|
||||||
@ -414,94 +394,6 @@ terminate_plugins(Host, ServerHost, Plugins, TreePlugin) ->
|
|||||||
TreePlugin:terminate(Host, ServerHost),
|
TreePlugin:terminate(Host, ServerHost),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
get_subscribed(User, Server) ->
|
|
||||||
Items = ejabberd_hooks:run_fold(roster_get, Server, [], [{User, Server}]),
|
|
||||||
lists:filtermap(
|
|
||||||
fun(#roster{jid = LJID, subscription = Sub})
|
|
||||||
when Sub == both orelse Sub == from ->
|
|
||||||
{true, LJID};
|
|
||||||
(_) ->
|
|
||||||
false
|
|
||||||
end, Items).
|
|
||||||
|
|
||||||
send_loop(State) ->
|
|
||||||
receive
|
|
||||||
{presence, JID, _Pid} ->
|
|
||||||
ServerHost = State#state.server_host,
|
|
||||||
Host = host(State#state.server_host),
|
|
||||||
DBType = State#state.db_type,
|
|
||||||
LJID = jid:tolower(JID),
|
|
||||||
BJID = jid:remove_resource(LJID),
|
|
||||||
lists:foreach(
|
|
||||||
fun(PType) ->
|
|
||||||
Subs = get_subscriptions_for_send_last(Host, PType, DBType, JID, LJID, BJID),
|
|
||||||
lists:foreach(
|
|
||||||
fun({NodeRec, _, _, SubJID}) ->
|
|
||||||
{_, Node} = NodeRec#pubsub_node.nodeid,
|
|
||||||
Nidx = NodeRec#pubsub_node.id,
|
|
||||||
Options = NodeRec#pubsub_node.options,
|
|
||||||
[send_items(Host, Node, Nidx, PType, Options, SubJID, last)
|
|
||||||
|| NodeRec#pubsub_node.type == PType]
|
|
||||||
end,
|
|
||||||
lists:usort(Subs))
|
|
||||||
end,
|
|
||||||
State#state.plugins),
|
|
||||||
if not State#state.ignore_pep_from_offline ->
|
|
||||||
{User, Server, Resource} = LJID,
|
|
||||||
Contacts = get_subscribed(User, Server),
|
|
||||||
lists:foreach(
|
|
||||||
fun({U, S, R}) when S == ServerHost ->
|
|
||||||
case user_resources(U, S) of
|
|
||||||
[] -> %% offline
|
|
||||||
PeerJID = jid:make(U, S, R),
|
|
||||||
self() ! {presence, User, Server, [Resource], PeerJID};
|
|
||||||
_ -> %% online
|
|
||||||
%% this is already handled by presence probe
|
|
||||||
ok
|
|
||||||
end;
|
|
||||||
(_) ->
|
|
||||||
%% we can not do anything in any cases
|
|
||||||
ok
|
|
||||||
end, Contacts);
|
|
||||||
true ->
|
|
||||||
ok
|
|
||||||
end,
|
|
||||||
send_loop(State);
|
|
||||||
{presence, User, Server, Resources, JID} ->
|
|
||||||
spawn(fun() ->
|
|
||||||
Host = host(State#state.server_host),
|
|
||||||
Owner = jid:remove_resource(jid:tolower(JID)),
|
|
||||||
lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, type = Type, id = Nidx, options = Options}) ->
|
|
||||||
case match_option(Options, send_last_published_item, on_sub_and_presence) of
|
|
||||||
true ->
|
|
||||||
lists:foreach(fun(Resource) ->
|
|
||||||
LJID = {User, Server, Resource},
|
|
||||||
Subscribed = case get_option(Options, access_model) of
|
|
||||||
open -> true;
|
|
||||||
presence -> true;
|
|
||||||
whitelist -> false; % subscribers are added manually
|
|
||||||
authorize -> false; % likewise
|
|
||||||
roster ->
|
|
||||||
Grps = get_option(Options, roster_groups_allowed, []),
|
|
||||||
{OU, OS, _} = Owner,
|
|
||||||
element(2, get_roster_info(OU, OS, LJID, Grps))
|
|
||||||
end,
|
|
||||||
if Subscribed -> send_items(Owner, Node, Nidx, Type, Options, LJID, last);
|
|
||||||
true -> ok
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
Resources);
|
|
||||||
_ ->
|
|
||||||
ok
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
tree_action(Host, get_nodes, [Owner, JID]))
|
|
||||||
end),
|
|
||||||
send_loop(State);
|
|
||||||
stop ->
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% -------
|
%% -------
|
||||||
%% disco hooks handling functions
|
%% disco hooks handling functions
|
||||||
%%
|
%%
|
||||||
@ -660,12 +552,12 @@ disco_items(Host, Node, From) ->
|
|||||||
end.
|
end.
|
||||||
|
|
||||||
%% -------
|
%% -------
|
||||||
%% presence hooks handling functions
|
%% presence and session hooks handling functions
|
||||||
%%
|
%%
|
||||||
|
|
||||||
-spec caps_add(jid(), jid(), [binary()]) -> ok.
|
-spec caps_add(jid(), jid(), [binary()]) -> ok.
|
||||||
caps_add(#jid{luser = U, lserver = S, lresource = R}, #jid{lserver = Host} = JID, _Features)
|
caps_add(#jid{lserver = S1} = From, #jid{lserver = S2} = To, _Features)
|
||||||
when Host =/= S ->
|
when S1 =/= S2 ->
|
||||||
%% When a remote contact goes online while the local user is offline, the
|
%% When a remote contact goes online while the local user is offline, the
|
||||||
%% remote contact won't receive last items from the local user even if
|
%% remote contact won't receive last items from the local user even if
|
||||||
%% ignore_pep_from_offline is set to false. To work around this issue a bit,
|
%% ignore_pep_from_offline is set to false. To work around this issue a bit,
|
||||||
@ -675,30 +567,36 @@ caps_add(#jid{luser = U, lserver = S, lresource = R}, #jid{lserver = Host} = JID
|
|||||||
%% contact becomes available; the former is also executed when the local
|
%% contact becomes available; the former is also executed when the local
|
||||||
%% user goes online (because that triggers the contact to send a presence
|
%% user goes online (because that triggers the contact to send a presence
|
||||||
%% packet with CAPS).
|
%% packet with CAPS).
|
||||||
presence(Host, {presence, U, S, [R], JID});
|
send_last_pep(To, From);
|
||||||
caps_add(_From, _To, _Feature) ->
|
caps_add(_From, _To, _Feature) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
-spec caps_update(jid(), jid(), [binary()]) -> ok.
|
-spec caps_update(jid(), jid(), [binary()]) -> ok.
|
||||||
caps_update(#jid{luser = U, lserver = S, lresource = R}, #jid{lserver = Host} = JID, _Features) ->
|
caps_update(#jid{lserver = S1} = From, #jid{lserver = S2} = To, _Features)
|
||||||
presence(Host, {presence, U, S, [R], JID}).
|
when S1 =/= S2 ->
|
||||||
|
send_last_pep(To, From).
|
||||||
|
|
||||||
-spec presence_probe(jid(), jid(), pid()) -> ok.
|
-spec presence_probe(jid(), jid(), pid()) -> ok.
|
||||||
presence_probe(#jid{luser = U, lserver = S}, #jid{luser = U, lserver = S}, _Pid) ->
|
presence_probe(#jid{luser = U, lserver = S}, #jid{luser = U, lserver = S}, _Pid) ->
|
||||||
%% ignore presence_probe from my other ressources
|
%% ignore presence_probe from my other ressources
|
||||||
%% to not get duplicated last items
|
|
||||||
ok;
|
ok;
|
||||||
presence_probe(#jid{lserver = S} = From, #jid{lserver = S} = To, Pid) ->
|
presence_probe(#jid{lserver = S} = From, #jid{lserver = S} = To, _Pid) ->
|
||||||
presence(S, {presence, From, Pid}),
|
send_last_pep(To, From);
|
||||||
presence(S, {presence, From#jid.luser, S, [From#jid.lresource], To});
|
|
||||||
presence_probe(_From, _To, _Pid) ->
|
presence_probe(_From, _To, _Pid) ->
|
||||||
%% ignore presence_probe from remote contacts,
|
%% ignore presence_probe from remote contacts, those are handled via caps_add
|
||||||
%% those are handled via caps_add
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
presence(ServerHost, Presence) ->
|
-spec on_user_online(ejabberd_c2s:state()) -> ejabberd_c2s:state().
|
||||||
gen_mod:get_module_proc(ServerHost, ?LOOPNAME) ! Presence,
|
on_user_online(C2SState) ->
|
||||||
ok.
|
JID = maps:get(jid, C2SState),
|
||||||
|
send_last_items(JID),
|
||||||
|
C2SState.
|
||||||
|
|
||||||
|
-spec on_user_offline(ejabberd_c2s:state(), atom()) -> ejabberd_c2s:state().
|
||||||
|
on_user_offline(C2SState, _Reason) ->
|
||||||
|
JID = maps:get(jid, C2SState),
|
||||||
|
purge_offline(jid:tolower(JID)),
|
||||||
|
C2SState.
|
||||||
|
|
||||||
%% -------
|
%% -------
|
||||||
%% subscription hooks handling functions
|
%% subscription hooks handling functions
|
||||||
@ -707,14 +605,8 @@ presence(ServerHost, Presence) ->
|
|||||||
-spec out_subscription(
|
-spec out_subscription(
|
||||||
binary(), binary(), jid(),
|
binary(), binary(), jid(),
|
||||||
subscribed | unsubscribed | subscribe | unsubscribe) -> boolean().
|
subscribed | unsubscribed | subscribe | unsubscribe) -> boolean().
|
||||||
out_subscription(User, Server, JID, subscribed) ->
|
out_subscription(User, Server, To, subscribed) ->
|
||||||
Owner = jid:make(User, Server),
|
send_last_pep(jid:make(User, Server), To),
|
||||||
{PUser, PServer, PResource} = jid:tolower(JID),
|
|
||||||
PResources = case PResource of
|
|
||||||
<<>> -> user_resources(PUser, PServer);
|
|
||||||
_ -> [PResource]
|
|
||||||
end,
|
|
||||||
presence(Server, {presence, PUser, PServer, PResources, Owner}),
|
|
||||||
true;
|
true;
|
||||||
out_subscription(_, _, _, _) ->
|
out_subscription(_, _, _, _) ->
|
||||||
true.
|
true.
|
||||||
@ -883,7 +775,9 @@ terminate(_Reason,
|
|||||||
false ->
|
false ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
ejabberd_hooks:delete(sm_remove_connection_hook, ServerHost,
|
ejabberd_hooks:delete(c2s_session_opened, ServerHost,
|
||||||
|
?MODULE, on_user_online, 75),
|
||||||
|
ejabberd_hooks:delete(c2s_terminated, ServerHost,
|
||||||
?MODULE, on_user_offline, 75),
|
?MODULE, on_user_offline, 75),
|
||||||
ejabberd_hooks:delete(disco_local_identity, ServerHost,
|
ejabberd_hooks:delete(disco_local_identity, ServerHost,
|
||||||
?MODULE, disco_local_identity, 75),
|
?MODULE, disco_local_identity, 75),
|
||||||
@ -901,12 +795,6 @@ terminate(_Reason,
|
|||||||
?MODULE, remove_user, 50),
|
?MODULE, remove_user, 50),
|
||||||
ejabberd_hooks:delete(c2s_handle_info, ServerHost,
|
ejabberd_hooks:delete(c2s_handle_info, ServerHost,
|
||||||
?MODULE, c2s_handle_info, 50),
|
?MODULE, c2s_handle_info, 50),
|
||||||
case whereis(gen_mod:get_module_proc(ServerHost, ?LOOPNAME)) of
|
|
||||||
undefined ->
|
|
||||||
?ERROR_MSG("~s process is dead, pubsub was broken", [?LOOPNAME]);
|
|
||||||
Pid ->
|
|
||||||
Pid ! stop
|
|
||||||
end,
|
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(Host) ->
|
fun(Host) ->
|
||||||
gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO),
|
gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_DISCO_INFO),
|
||||||
@ -1797,7 +1685,7 @@ subscribe_node(Host, Node, From, JID, Configuration) ->
|
|||||||
Nidx = TNode#pubsub_node.id,
|
Nidx = TNode#pubsub_node.id,
|
||||||
Type = TNode#pubsub_node.type,
|
Type = TNode#pubsub_node.type,
|
||||||
Options = TNode#pubsub_node.options,
|
Options = TNode#pubsub_node.options,
|
||||||
send_items(Host, Node, Nidx, Type, Options, Subscriber, last),
|
send_items(Host, Node, Nidx, Type, Options, Subscriber, 1),
|
||||||
ServerHost = serverhost(Host),
|
ServerHost = serverhost(Host),
|
||||||
ejabberd_hooks:run(pubsub_subscribe_node, ServerHost,
|
ejabberd_hooks:run(pubsub_subscribe_node, ServerHost,
|
||||||
[ServerHost, Host, Node, Subscriber, SubId]),
|
[ServerHost, Host, Node, Subscriber, SubId]),
|
||||||
@ -2069,8 +1957,6 @@ purge_node(Host, Node, Owner) ->
|
|||||||
%% @doc <p>Return the items of a given node.</p>
|
%% @doc <p>Return the items of a given node.</p>
|
||||||
%% <p>The number of items to return is limited by MaxItems.</p>
|
%% <p>The number of items to return is limited by MaxItems.</p>
|
||||||
%% <p>The permission are not checked in this function.</p>
|
%% <p>The permission are not checked in this function.</p>
|
||||||
%% @todo We probably need to check that the user doing the query has the right
|
|
||||||
%% to read the items.
|
|
||||||
-spec get_items(host(), binary(), jid(), binary(),
|
-spec get_items(host(), binary(), jid(), binary(),
|
||||||
binary(), [binary()], undefined | rsm_set()) ->
|
binary(), [binary()], undefined | rsm_set()) ->
|
||||||
{result, pubsub()} | {error, stanza_error()}.
|
{result, pubsub()} | {error, stanza_error()}.
|
||||||
@ -2153,43 +2039,23 @@ get_allowed_items_call(Host, Nidx, From, Type, Options, Owners, RSM) ->
|
|||||||
{PS, RG} = get_presence_and_roster_permissions(Host, From, Owners, AccessModel, AllowedGroups),
|
{PS, RG} = get_presence_and_roster_permissions(Host, From, Owners, AccessModel, AllowedGroups),
|
||||||
node_call(Host, Type, get_items, [Nidx, From, AccessModel, PS, RG, undefined, RSM]).
|
node_call(Host, Type, get_items, [Nidx, From, AccessModel, PS, RG, undefined, RSM]).
|
||||||
|
|
||||||
get_last_items(Host, Type, Nidx, LJID, Count) ->
|
get_last_items(Host, Type, Nidx, LJID, 1) ->
|
||||||
case get_cached_item(Host, Nidx) of
|
case get_cached_item(Host, Nidx) of
|
||||||
undefined ->
|
undefined ->
|
||||||
case node_action(Host, Type, get_last_items, [Nidx, LJID, Count]) of
|
case node_action(Host, Type, get_last_items, [Nidx, LJID, 1]) of
|
||||||
{result, Items} -> Items;
|
{result, Items} -> Items;
|
||||||
_ -> []
|
_ -> []
|
||||||
end;
|
end;
|
||||||
LastItem ->
|
LastItem ->
|
||||||
[LastItem]
|
[LastItem]
|
||||||
end.
|
|
||||||
|
|
||||||
%% @doc <p>Resend the items of a node to the user.</p>
|
|
||||||
%% @todo use cache-last-item feature
|
|
||||||
send_items(Host, Node, Nidx, Type, Options, LJID, last) ->
|
|
||||||
case get_last_items(Host, Type, Nidx, LJID, 1) of
|
|
||||||
[LastItem] ->
|
|
||||||
Stanza = items_event_stanza(Node, Options, [LastItem]),
|
|
||||||
dispatch_items(Host, LJID, Node, Stanza);
|
|
||||||
_ ->
|
|
||||||
ok
|
|
||||||
end;
|
end;
|
||||||
send_items(Host, Node, Nidx, Type, Options, LJID, Number) when Number > 0 ->
|
get_last_items(Host, Type, Nidx, LJID, Count) when Count > 1 ->
|
||||||
Stanza = items_event_stanza(Node, Options, get_last_items(Host, Type, Nidx, Number, LJID)),
|
case node_action(Host, Type, get_last_items, [Nidx, LJID, Count]) of
|
||||||
dispatch_items(Host, LJID, Node, Stanza);
|
{result, Items} -> Items;
|
||||||
send_items(Host, Node, _Nidx, _Type, Options, LJID, _) ->
|
_ -> []
|
||||||
Stanza = items_event_stanza(Node, Options, []),
|
end;
|
||||||
dispatch_items(Host, LJID, Node, Stanza).
|
get_last_items(_Host, _Type, _Nidx, _LJID, _Count) ->
|
||||||
|
[].
|
||||||
dispatch_items({FromU, FromS, FromR}, To, Node, Stanza) ->
|
|
||||||
SenderResource = user_resource(FromU, FromS, FromR),
|
|
||||||
ejabberd_sm:route(jid:make(FromU, FromS, SenderResource),
|
|
||||||
{send_filtered, {pep_message, <<((Node))/binary, "+notify">>},
|
|
||||||
jid:make(FromU, FromS), jid:make(To),
|
|
||||||
Stanza});
|
|
||||||
dispatch_items(From, To, _Node, Stanza) ->
|
|
||||||
ejabberd_router:route(
|
|
||||||
xmpp:set_from_to(Stanza, service_jid(From), jid:make(To))).
|
|
||||||
|
|
||||||
%% @doc <p>Return the list of affiliations as an XMPP response.</p>
|
%% @doc <p>Return the list of affiliations as an XMPP response.</p>
|
||||||
-spec get_affiliations(host(), binary(), jid(), [binary()]) ->
|
-spec get_affiliations(host(), binary(), jid(), [binary()]) ->
|
||||||
@ -2536,14 +2402,15 @@ get_subscriptions_for_send_last(Host, PType, sql, JID, LJID, BJID) ->
|
|||||||
{result, Subs} = node_action(Host, PType,
|
{result, Subs} = node_action(Host, PType,
|
||||||
get_entity_subscriptions_for_send_last,
|
get_entity_subscriptions_for_send_last,
|
||||||
[Host, JID]),
|
[Host, JID]),
|
||||||
[{Node, Sub, SubId, SubJID}
|
[{Node, SubId, SubJID}
|
||||||
|| {Node, Sub, SubId, SubJID} <- Subs,
|
|| {Node, Sub, SubId, SubJID} <- Subs,
|
||||||
Sub =:= subscribed, (SubJID == LJID) or (SubJID == BJID)];
|
Sub =:= subscribed, (SubJID == LJID) or (SubJID == BJID)];
|
||||||
|
% sql version already filter result by on_sub_and_presence
|
||||||
get_subscriptions_for_send_last(Host, PType, _, JID, LJID, BJID) ->
|
get_subscriptions_for_send_last(Host, PType, _, JID, LJID, BJID) ->
|
||||||
{result, Subs} = node_action(Host, PType,
|
{result, Subs} = node_action(Host, PType,
|
||||||
get_entity_subscriptions,
|
get_entity_subscriptions,
|
||||||
[Host, JID]),
|
[Host, JID]),
|
||||||
[{Node, Sub, SubId, SubJID}
|
[{Node, SubId, SubJID}
|
||||||
|| {Node, Sub, SubId, SubJID} <- Subs,
|
|| {Node, Sub, SubId, SubJID} <- Subs,
|
||||||
Sub =:= subscribed, (SubJID == LJID) or (SubJID == BJID),
|
Sub =:= subscribed, (SubJID == LJID) or (SubJID == BJID),
|
||||||
match_option(Node, send_last_published_item, on_sub_and_presence)].
|
match_option(Node, send_last_published_item, on_sub_and_presence)].
|
||||||
@ -2932,8 +2799,9 @@ get_options_for_subs(Host, Nidx, Subs, true) ->
|
|||||||
broadcast_stanza(Host, _Node, _Nidx, _Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM) ->
|
broadcast_stanza(Host, _Node, _Nidx, _Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM) ->
|
||||||
NotificationType = get_option(NodeOptions, notification_type, headline),
|
NotificationType = get_option(NodeOptions, notification_type, headline),
|
||||||
BroadcastAll = get_option(NodeOptions, broadcast_all_resources), %% XXX this is not standard, but usefull
|
BroadcastAll = get_option(NodeOptions, broadcast_all_resources), %% XXX this is not standard, but usefull
|
||||||
From = service_jid(Host),
|
Stanza = add_message_type(
|
||||||
Stanza = add_message_type(BaseStanza, NotificationType),
|
xmpp:set_from(BaseStanza, service_jid(Host)),
|
||||||
|
NotificationType),
|
||||||
%% Handles explicit subscriptions
|
%% Handles explicit subscriptions
|
||||||
SubIDsByJID = subscribed_nodes_by_jid(NotifyType, SubsByDepth),
|
SubIDsByJID = subscribed_nodes_by_jid(NotifyType, SubsByDepth),
|
||||||
lists:foreach(fun ({LJID, _NodeName, SubIDs}) ->
|
lists:foreach(fun ({LJID, _NodeName, SubIDs}) ->
|
||||||
@ -2956,7 +2824,7 @@ broadcast_stanza(Host, _Node, _Nidx, _Type, NodeOptions, SubsByDepth, NotifyType
|
|||||||
end,
|
end,
|
||||||
lists:foreach(fun(To) ->
|
lists:foreach(fun(To) ->
|
||||||
ejabberd_router:route(
|
ejabberd_router:route(
|
||||||
xmpp:set_from_to(StanzaToSend, From, jid:make(To)))
|
xmpp:set_to(StanzaToSend, jid:make(To)))
|
||||||
end, LJIDs)
|
end, LJIDs)
|
||||||
end, SubIDsByJID).
|
end, SubIDsByJID).
|
||||||
|
|
||||||
@ -2965,55 +2833,145 @@ broadcast_stanza({LUser, LServer, LResource}, Publisher, Node, Nidx, Type, NodeO
|
|||||||
%% Handles implicit presence subscriptions
|
%% Handles implicit presence subscriptions
|
||||||
SenderResource = user_resource(LUser, LServer, LResource),
|
SenderResource = user_resource(LUser, LServer, LResource),
|
||||||
NotificationType = get_option(NodeOptions, notification_type, headline),
|
NotificationType = get_option(NodeOptions, notification_type, headline),
|
||||||
Stanza = add_message_type(BaseStanza, NotificationType),
|
Stanza = add_message_type(
|
||||||
|
xmpp:set_from(BaseStanza, jid:make(LUser, LServer)),
|
||||||
|
NotificationType),
|
||||||
%% set the from address on the notification to the bare JID of the account owner
|
%% set the from address on the notification to the bare JID of the account owner
|
||||||
%% Also, add "replyto" if entity has presence subscription to the account owner
|
%% Also, add "replyto" if entity has presence subscription to the account owner
|
||||||
%% See XEP-0163 1.1 section 4.3.1
|
%% See XEP-0163 1.1 section 4.3.1
|
||||||
ejabberd_sm:route(jid:make(LUser, LServer, SenderResource),
|
ejabberd_sm:route(jid:make(LUser, LServer, SenderResource),
|
||||||
{pep_message, <<((Node))/binary, "+notify">>,
|
{pep_message, <<((Node))/binary, "+notify">>,
|
||||||
jid:make(LUser, LServer),
|
|
||||||
add_extended_headers(
|
add_extended_headers(
|
||||||
Stanza, extended_headers([Publisher]))});
|
Stanza, extended_headers([Publisher]))});
|
||||||
broadcast_stanza(Host, _Publisher, Node, Nidx, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM) ->
|
broadcast_stanza(Host, _Publisher, Node, Nidx, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM) ->
|
||||||
broadcast_stanza(Host, Node, Nidx, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM).
|
broadcast_stanza(Host, Node, Nidx, Type, NodeOptions, SubsByDepth, NotifyType, BaseStanza, SHIM).
|
||||||
|
|
||||||
-spec c2s_handle_info(ejabberd_c2s:state(), term()) -> ejabberd_c2s:state().
|
-spec c2s_handle_info(ejabberd_c2s:state(), term()) -> ejabberd_c2s:state().
|
||||||
c2s_handle_info(#{server := Server} = C2SState,
|
c2s_handle_info(#{lserver := LServer} = C2SState,
|
||||||
{pep_message, Feature, From, Packet}) ->
|
{pep_message, Feature, Packet}) ->
|
||||||
LServer = jid:nameprep(Server),
|
[maybe_send_pep_stanza(LServer, USR, Caps, Feature, Packet)
|
||||||
lists:foreach(
|
|| {USR, Caps} <- mod_caps:list_features(C2SState)],
|
||||||
fun({USR, Caps}) ->
|
|
||||||
Features = mod_caps:get_features(LServer, Caps),
|
|
||||||
case lists:member(Feature, Features) of
|
|
||||||
true ->
|
|
||||||
To = jid:make(USR),
|
|
||||||
NewPacket = xmpp:set_from_to(Packet, From, To),
|
|
||||||
ejabberd_router:route(NewPacket);
|
|
||||||
false ->
|
|
||||||
ok
|
|
||||||
end
|
|
||||||
end, mod_caps:list_features(C2SState)),
|
|
||||||
{stop, C2SState};
|
{stop, C2SState};
|
||||||
c2s_handle_info(#{server := Server} = C2SState,
|
c2s_handle_info(#{lserver := LServer} = C2SState,
|
||||||
{send_filtered, {pep_message, Feature}, From, To, Packet}) ->
|
{pep_message, Feature, Packet, USR}) ->
|
||||||
LServer = jid:nameprep(Server),
|
case mod_caps:get_user_caps(USR, C2SState) of
|
||||||
case mod_caps:get_user_caps(To, C2SState) of
|
{ok, Caps} -> maybe_send_pep_stanza(LServer, USR, Caps, Feature, Packet);
|
||||||
{ok, Caps} ->
|
error -> ok
|
||||||
Features = mod_caps:get_features(LServer, Caps),
|
|
||||||
case lists:member(Feature, Features) of
|
|
||||||
true ->
|
|
||||||
NewPacket = xmpp:set_from_to(Packet, From, To),
|
|
||||||
ejabberd_router:route(NewPacket);
|
|
||||||
false ->
|
|
||||||
ok
|
|
||||||
end;
|
|
||||||
error ->
|
|
||||||
ok
|
|
||||||
end,
|
end,
|
||||||
{stop, C2SState};
|
{stop, C2SState};
|
||||||
c2s_handle_info(C2SState, _) ->
|
c2s_handle_info(C2SState, _) ->
|
||||||
C2SState.
|
C2SState.
|
||||||
|
|
||||||
|
send_items(Host, Node, Nidx, Type, Options, LJID, Number) ->
|
||||||
|
send_items(Host, Node, Nidx, Type, Options, Host, LJID, LJID, Number).
|
||||||
|
send_items(Host, Node, Nidx, Type, Options, Publisher, SubLJID, ToLJID, Number) ->
|
||||||
|
case get_last_items(Host, Type, Nidx, SubLJID, Number) of
|
||||||
|
[] ->
|
||||||
|
ok;
|
||||||
|
Items ->
|
||||||
|
Stanza = items_event_stanza(Node, Options, Items),
|
||||||
|
send_stanza(Host, Publisher, ToLJID, Node, Stanza)
|
||||||
|
end.
|
||||||
|
|
||||||
|
send_stanza({LUser, LServer, LResource}, Publisher, USR, Node, BaseStanza) ->
|
||||||
|
SenderResource = user_resource(LUser, LServer, LResource),
|
||||||
|
Stanza = xmpp:set_from(BaseStanza, jid:make(LUser, LServer)),
|
||||||
|
USRs = case USR of
|
||||||
|
{PUser, PServer, <<>>} ->
|
||||||
|
[{PUser, PServer, PRessource}
|
||||||
|
|| PRessource <- user_resources(PUser, PServer)];
|
||||||
|
_ ->
|
||||||
|
[USR]
|
||||||
|
end,
|
||||||
|
[ejabberd_sm:route(jid:make(LUser, LServer, SenderResource),
|
||||||
|
{pep_message, <<((Node))/binary, "+notify">>,
|
||||||
|
add_extended_headers(
|
||||||
|
Stanza, extended_headers([Publisher])),
|
||||||
|
To}) || To <- USRs];
|
||||||
|
send_stanza(Host, _Publisher, USR, _Node, Stanza) ->
|
||||||
|
ejabberd_router:route(
|
||||||
|
xmpp:set_from_to(Stanza, service_jid(Host), jid:make(USR))).
|
||||||
|
|
||||||
|
maybe_send_pep_stanza(LServer, USR, Caps, Feature, Packet) ->
|
||||||
|
Features = mod_caps:get_features(LServer, Caps),
|
||||||
|
case lists:member(Feature, Features) of
|
||||||
|
true ->
|
||||||
|
ejabberd_router:route(xmpp:set_to(Packet, jid:make(USR)));
|
||||||
|
false ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
send_last_items(JID) ->
|
||||||
|
?DEBUG("~s", [jid:to_string(JID)]),
|
||||||
|
ServerHost = JID#jid.lserver,
|
||||||
|
Host = host(ServerHost),
|
||||||
|
DBType = config(ServerHost, db_type),
|
||||||
|
LJID = jid:tolower(JID),
|
||||||
|
BJID = jid:remove_resource(LJID),
|
||||||
|
lists:foreach(
|
||||||
|
fun(PType) ->
|
||||||
|
Subs = get_subscriptions_for_send_last(Host, PType, DBType, JID, LJID, BJID),
|
||||||
|
lists:foreach(
|
||||||
|
fun({#pubsub_node{nodeid = {_, Node}, type = Type, id = Nidx,
|
||||||
|
options = Options}, _, SubJID})
|
||||||
|
when Type == PType->
|
||||||
|
send_items(Host, Node, Nidx, PType, Options, Host, SubJID, LJID, 1);
|
||||||
|
(_) ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
lists:usort(Subs))
|
||||||
|
end, config(ServerHost, plugins)).
|
||||||
|
% pep_from_offline hack can not work anymore, as sender c2s does not
|
||||||
|
% exists when sender is offline, so we can't get match receiver caps
|
||||||
|
% does it make sens to send PEP from an offline contact anyway ?
|
||||||
|
% case config(ServerHost, ignore_pep_from_offline) of
|
||||||
|
% false ->
|
||||||
|
% Roster = ejabberd_hooks:run_fold(roster_get, ServerHost, [],
|
||||||
|
% [{JID#jid.luser, ServerHost}]),
|
||||||
|
% lists:foreach(
|
||||||
|
% fun(#roster{jid = {U, S, R}, subscription = Sub})
|
||||||
|
% when Sub == both orelse Sub == from,
|
||||||
|
% S == ServerHost ->
|
||||||
|
% case user_resources(U, S) of
|
||||||
|
% [] -> send_last_pep(jid:make(U, S, R), JID);
|
||||||
|
% _ -> ok %% this is already handled by presence probe
|
||||||
|
% end;
|
||||||
|
% (_) ->
|
||||||
|
% ok %% we can not do anything in any cases
|
||||||
|
% end, Roster);
|
||||||
|
% true ->
|
||||||
|
% ok
|
||||||
|
% end.
|
||||||
|
send_last_pep(From, To) ->
|
||||||
|
?DEBUG("~s -> ~s", [jid:to_string(From), jid:to_string(To)]),
|
||||||
|
ServerHost = From#jid.lserver,
|
||||||
|
Host = host(ServerHost),
|
||||||
|
Publisher = jid:tolower(From),
|
||||||
|
Owner = jid:remove_resource(Publisher),
|
||||||
|
lists:foreach(
|
||||||
|
fun(#pubsub_node{nodeid = {_, Node}, type = Type, id = Nidx, options = Options}) ->
|
||||||
|
case match_option(Options, send_last_published_item, on_sub_and_presence) of
|
||||||
|
true ->
|
||||||
|
LJID = jid:tolower(To),
|
||||||
|
Subscribed = case get_option(Options, access_model) of
|
||||||
|
open -> true;
|
||||||
|
presence -> true;
|
||||||
|
whitelist -> false; % subscribers are added manually
|
||||||
|
authorize -> false; % likewise
|
||||||
|
roster ->
|
||||||
|
Grps = get_option(Options, roster_groups_allowed, []),
|
||||||
|
{OU, OS, _} = Owner,
|
||||||
|
element(2, get_roster_info(OU, OS, LJID, Grps))
|
||||||
|
end,
|
||||||
|
if Subscribed -> send_items(Owner, Node, Nidx, Type, Options, Publisher, LJID, LJID, 1);
|
||||||
|
true -> ok
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
tree_action(Host, get_nodes, [Owner, From])).
|
||||||
|
|
||||||
subscribed_nodes_by_jid(NotifyType, SubsByDepth) ->
|
subscribed_nodes_by_jid(NotifyType, SubsByDepth) ->
|
||||||
NodesToDeliver = fun (Depth, Node, Subs, Acc) ->
|
NodesToDeliver = fun (Depth, Node, Subs, Acc) ->
|
||||||
NodeName = case Node#pubsub_node.nodeid of
|
NodeName = case Node#pubsub_node.nodeid of
|
||||||
@ -3186,9 +3144,6 @@ node_owners_call(_Host, _Type, _Nidx, Owners) ->
|
|||||||
%% @doc <p>Return the maximum number of items for a given node.</p>
|
%% @doc <p>Return the maximum number of items for a given node.</p>
|
||||||
%% <p>Unlimited means that there is no limit in the number of items that can
|
%% <p>Unlimited means that there is no limit in the number of items that can
|
||||||
%% be stored.</p>
|
%% be stored.</p>
|
||||||
%% @todo In practice, the current data structure means that we cannot manage
|
|
||||||
%% millions of items on a given node. This should be addressed in a new
|
|
||||||
%% version.
|
|
||||||
-spec max_items(host(), [{atom(), any()}]) -> non_neg_integer().
|
-spec max_items(host(), [{atom(), any()}]) -> non_neg_integer().
|
||||||
max_items(Host, Options) ->
|
max_items(Host, Options) ->
|
||||||
case get_option(Options, persist_items) of
|
case get_option(Options, persist_items) of
|
||||||
@ -3792,14 +3747,6 @@ subid_shim(SubIds) ->
|
|||||||
extended_headers(Jids) ->
|
extended_headers(Jids) ->
|
||||||
[#address{type = replyto, jid = Jid} || Jid <- Jids].
|
[#address{type = replyto, jid = Jid} || Jid <- Jids].
|
||||||
|
|
||||||
-spec on_user_offline(ejabberd_sm:sid(), jid(), ejabberd_sm:info()) -> ok.
|
|
||||||
on_user_offline(_, JID, _) ->
|
|
||||||
{User, Server, Resource} = jid:tolower(JID),
|
|
||||||
case user_resources(User, Server) of
|
|
||||||
[] -> purge_offline({User, Server, Resource});
|
|
||||||
_ -> ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
-spec purge_offline(ljid()) -> ok.
|
-spec purge_offline(ljid()) -> ok.
|
||||||
purge_offline(LJID) ->
|
purge_offline(LJID) ->
|
||||||
Host = host(element(2, LJID)),
|
Host = host(element(2, LJID)),
|
||||||
@ -3840,7 +3787,7 @@ purge_offline(LJID) ->
|
|||||||
end
|
end
|
||||||
end, lists:usort(lists:flatten(Affs)));
|
end, lists:usort(lists:flatten(Affs)));
|
||||||
{Error, _} ->
|
{Error, _} ->
|
||||||
?DEBUG("on_user_offline ~p", [Error])
|
?ERROR_MSG("can not purge offline: ~p", [Error])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec purge_offline(host(), ljid(), binary()) -> ok | {error, stanza_error()}.
|
-spec purge_offline(host(), ljid(), binary()) -> ok | {error, stanza_error()}.
|
||||||
@ -3852,13 +3799,13 @@ purge_offline(Host, LJID, Node) ->
|
|||||||
{result, {[], _}} ->
|
{result, {[], _}} ->
|
||||||
ok;
|
ok;
|
||||||
{result, {Items, _}} ->
|
{result, {Items, _}} ->
|
||||||
{User, Server, _} = LJID,
|
{User, Server, Resource} = LJID,
|
||||||
PublishModel = get_option(Options, publish_model),
|
PublishModel = get_option(Options, publish_model),
|
||||||
ForceNotify = get_option(Options, notify_retract),
|
ForceNotify = get_option(Options, notify_retract),
|
||||||
{_, NodeId} = Node#pubsub_node.nodeid,
|
{_, NodeId} = Node#pubsub_node.nodeid,
|
||||||
lists:foreach(fun
|
lists:foreach(fun
|
||||||
(#pubsub_item{itemid = {ItemId, _}, modification = {_, {U, S, _}}})
|
(#pubsub_item{itemid = {ItemId, _}, modification = {_, {U, S, R}}})
|
||||||
when (U == User) and (S == Server) ->
|
when (U == User) and (S == Server) and (R == Resource) ->
|
||||||
case node_action(Host, Type, delete_item, [Nidx, {U, S, <<>>}, PublishModel, ItemId]) of
|
case node_action(Host, Type, delete_item, [Nidx, {U, S, <<>>}, PublishModel, ItemId]) of
|
||||||
{result, {_, broadcast}} ->
|
{result, {_, broadcast}} ->
|
||||||
broadcast_retract_items(Host, NodeId, Nidx, Type, Options, [ItemId], ForceNotify),
|
broadcast_retract_items(Host, NodeId, Nidx, Type, Options, [ItemId], ForceNotify),
|
||||||
|
Loading…
Reference in New Issue
Block a user