Improve mod_multicast

This commit is contained in:
Paweł Chmielowski 2018-04-04 12:05:37 +02:00
parent e2652ce02f
commit 45a3c7e0ce
1 changed files with 231 additions and 264 deletions

View File

@ -47,52 +47,36 @@
-include("translate.hrl"). -include("translate.hrl").
-include("xmpp.hrl"). -include("xmpp.hrl").
-record(state, -record(multicastc, {rserver :: binary(),
{lserver, lservice, access, service_limits}). response,
ts :: integer()}).
-record(dest, {jid_string :: binary() | none,
jid_jid :: xmpp:jid(),
type :: to | cc | bcc,
address :: address()}).
-type limit_value() :: {default | custom, integer()}.
-record(limits, {message :: limit_value(),
presence :: limit_value()}).
-record(service_limits, {local :: #limits{},
remote :: #limits{}}).
-type routing() :: route_single | {route_multicast, binary(), #service_limits{}}.
-record(group, {server :: binary(),
dests :: [#dest{}],
multicast :: routing(),
others :: [#address{}],
addresses :: [#address{}]}).
-record(state, {lserver :: binary(),
lservice :: binary(),
access :: atom(),
service_limits :: #service_limits{}}).
-type state() :: #state{}. -type state() :: #state{}.
-record(multicastc, {rserver, response, ts}).
%% ts: timestamp (in seconds) when the cache item was last updated
-record(dest, {jid_string = none :: binary(),
jid_jid :: jid(),
type :: atom(),
full_xml :: address()}).
%% jid_string = string()
%% jid_jid = jid()
%% full_xml = xml()
-record(group,
{server, dests, multicast, others, addresses}).
%% server = string()
%% dests = [string()]
%% multicast = {cached, local_server} | {cached, string()} | {cached, not_supported} | {obsolete, not_supported} | {obsolete, string()} | not_cached
%% after being updated, possible values are: local | multicast_not_supported | {multicast_supported, string(), limits()}
%% others = [xml()]
%% packet = xml()
-record(waiter,
{awaiting, group, renewal = false, sender, packet,
aattrs, addresses}).
%% awaiting = {[Remote_service], Local_service, Type_awaiting}
%% Remote_service = Local_service = string()
%% Type_awaiting = info | items
%% group = #group
%% renewal = true | false
%% sender = From
%% packet = xml()
%% aattrs = [xml()]
-record(limits, {message, presence}).
%% message = presence = integer() | infinite
-record(service_limits, {local, remote}).
%% All the elements are of type value() %% All the elements are of type value()
-define(VERSION_MULTICAST, <<"$Revision: 440 $ ">>). -define(VERSION_MULTICAST, <<"$Revision: 440 $ ">>).
@ -104,6 +88,8 @@
-define(MAXTIME_CACHE_NEGATIVE, 86400). -define(MAXTIME_CACHE_NEGATIVE, 86400).
-define(MAXTIME_CACHE_NEGOTIATING, 600).
-define(CACHE_PURGE_TIMER, 86400000). -define(CACHE_PURGE_TIMER, 86400000).
-define(DISCO_QUERY_TIMEOUT, 10000). -define(DISCO_QUERY_TIMEOUT, 10000).
@ -130,6 +116,7 @@ reload(LServerS, NewOpts, OldOpts) ->
%% gen_server callbacks %% gen_server callbacks
%%==================================================================== %%====================================================================
-spec init(list()) -> {ok, state()}.
init([LServerS, Opts]) -> init([LServerS, Opts]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
[LServiceS|_] = gen_mod:get_opt_hosts(LServerS, Opts), [LServiceS|_] = gen_mod:get_opt_hosts(LServerS, Opts),
@ -137,7 +124,6 @@ init([LServerS, Opts]) ->
SLimits = build_service_limit_record(gen_mod:get_opt(limits, Opts)), SLimits = build_service_limit_record(gen_mod:get_opt(limits, Opts)),
create_cache(), create_cache(),
try_start_loop(), try_start_loop(),
create_pool(),
ejabberd_router_multicast:register_route(LServerS), ejabberd_router_multicast:register_route(LServerS),
ejabberd_router:register_route(LServiceS, LServerS), ejabberd_router:register_route(LServiceS, LServerS),
{ok, {ok,
@ -277,21 +263,22 @@ iq_vcard(Lang) ->
%%% Route %%% Route
%%%------------------------- %%%-------------------------
-spec route_trusted(binary(), binary(), jid(), [jid()], stanza()) -> 'ok'.
route_trusted(LServiceS, LServerS, FromJID, route_trusted(LServiceS, LServerS, FromJID,
Destinations, Packet) -> Destinations, Packet) ->
Packet_stripped = Packet, Packet_stripped = Packet,
AAttrs = [],
Delivereds = [], Delivereds = [],
Dests2 = lists:map( Dests2 = lists:map(
fun(D) -> fun(D) ->
#dest{jid_string = jid:encode(D), #dest{jid_string = jid:encode(D),
jid_jid = D, type = bcc, jid_jid = D, type = bcc,
full_xml = #address{type = bcc, jid = D}} address = #address{type = bcc, jid = D}}
end, Destinations), end, Destinations),
Groups = group_dests(Dests2), Groups = group_dests(Dests2),
route_common(LServerS, LServiceS, FromJID, Groups, route_common(LServerS, LServiceS, FromJID, Groups,
Delivereds, Packet_stripped, AAttrs). Delivereds, Packet_stripped).
-spec route_untrusted(binary(), binary(), atom(), #service_limits{}, stanza()) -> 'ok'.
route_untrusted(LServiceS, LServerS, Access, SLimits, Packet) -> route_untrusted(LServiceS, LServerS, Access, SLimits, Packet) ->
try route_untrusted2(LServiceS, LServerS, Access, try route_untrusted2(LServiceS, LServerS, Access,
SLimits, Packet) SLimits, Packet)
@ -321,6 +308,7 @@ route_untrusted(LServiceS, LServerS, Access, SLimits, Packet) ->
<<"Unknown problem">>) <<"Unknown problem">>)
end. end.
-spec route_untrusted2(binary(), binary(), atom(), #service_limits{}, stanza()) -> 'ok'.
route_untrusted2(LServiceS, LServerS, Access, SLimits, Packet) -> route_untrusted2(LServiceS, LServerS, Access, SLimits, Packet) ->
FromJID = xmpp:get_from(Packet), FromJID = xmpp:get_from(Packet),
ok = check_access(LServerS, Access, FromJID), ok = check_access(LServerS, Access, FromJID),
@ -333,53 +321,40 @@ route_untrusted2(LServiceS, LServerS, Access, SLimits, Packet) ->
Groups = group_dests(Dests2), Groups = group_dests(Dests2),
ok = check_relay(FromJID#jid.server, LServerS, Groups), ok = check_relay(FromJID#jid.server, LServerS, Groups),
route_common(LServerS, LServiceS, FromJID, Groups, route_common(LServerS, LServiceS, FromJID, Groups,
Delivereds, Packet_stripped, []). Delivereds, Packet_stripped).
-spec route_common(binary(), binary(), jid(), [#group{}], -spec route_common(binary(), binary(), jid(), [#group{}],
[address()], stanza(), list()) -> any(). [address()], stanza()) -> 'ok'.
route_common(LServerS, LServiceS, FromJID, Groups, route_common(LServerS, LServiceS, FromJID, Groups,
Delivereds, Packet_stripped, AAttrs) -> Delivereds, Packet_stripped) ->
Groups2 = look_cached_servers(LServerS, Groups), Groups2 = look_cached_servers(LServerS, LServiceS, Groups),
Groups3 = build_others_xml(Groups2), Groups3 = build_others_xml(Groups2),
Groups4 = add_addresses(Delivereds, Groups3), Groups4 = add_addresses(Delivereds, Groups3),
AGroups = decide_action_groups(Groups4), AGroups = decide_action_groups(Groups4),
act_groups(FromJID, Packet_stripped, AAttrs, LServiceS, act_groups(FromJID, Packet_stripped, LServiceS,
AGroups). AGroups).
act_groups(FromJID, Packet_stripped, AAttrs, LServiceS, -spec act_groups(jid(), stanza(), binary(), [{routing(), #group{}}]) -> 'ok'.
AGroups) -> act_groups(FromJID, Packet_stripped, LServiceS, AGroups) ->
[perform(FromJID, Packet_stripped, AAttrs, LServiceS, lists:foreach(
AGroup) fun(AGroup) ->
|| AGroup <- AGroups]. perform(FromJID, Packet_stripped, LServiceS,
AGroup)
end, AGroups).
perform(From, Packet, AAttrs, _, -spec perform(jid(), stanza(), binary(),
{routing(), #group{}}) -> 'ok'.
perform(From, Packet, _,
{route_single, Group}) -> {route_single, Group}) ->
[route_packet(From, ToUser, Packet, AAttrs, lists:foreach(
Group#group.others, Group#group.addresses) fun(ToUser) ->
|| ToUser <- Group#group.dests]; route_packet(From, ToUser, Packet,
perform(From, Packet, AAttrs, _, Group#group.others, Group#group.addresses)
end, Group#group.dests);
perform(From, Packet, _,
{{route_multicast, JID, RLimits}, Group}) -> {{route_multicast, JID, RLimits}, Group}) ->
route_packet_multicast(From, JID, Packet, AAttrs, route_packet_multicast(From, JID, Packet,
Group#group.dests, Group#group.addresses, RLimits); Group#group.dests, Group#group.addresses, RLimits).
perform(From, Packet, AAttrs, LServiceS,
{{ask, Old_service, renewal}, Group}) ->
send_query_info(Old_service, LServiceS),
add_waiter(#waiter{awaiting =
{[Old_service], LServiceS, info},
group = Group, renewal = true, sender = From,
packet = Packet, aattrs = AAttrs,
addresses = Group#group.addresses});
perform(_From, _Packet, _AAttrs, LServiceS,
{{ask, LServiceS, _}, _Group}) ->
ok;
perform(From, Packet, AAttrs, LServiceS,
{{ask, Server, not_renewal}, Group}) ->
send_query_info(Server, LServiceS),
add_waiter(#waiter{awaiting =
{[Server], LServiceS, info},
group = Group, renewal = false, sender = From,
packet = Packet, aattrs = AAttrs,
addresses = Group#group.addresses}).
%%%------------------------- %%%-------------------------
%%% Check access permission %%% Check access permission
@ -427,7 +402,7 @@ split_addresses_todeliver(Addresses) ->
%%% Check does not exceed limit of destinations %%% Check does not exceed limit of destinations
%%%------------------------- %%%-------------------------
-spec check_limit_dests(_, jid(), stanza(), [address()]) -> ok. -spec check_limit_dests(#service_limits{}, jid(), stanza(), [address()]) -> ok.
check_limit_dests(SLimits, FromJID, Packet, check_limit_dests(SLimits, FromJID, Packet,
Addresses) -> Addresses) ->
SenderT = sender_type(FromJID), SenderT = sender_type(FromJID),
@ -448,10 +423,10 @@ check_limit_dests(SLimits, FromJID, Packet,
convert_dest_record(Addrs) -> convert_dest_record(Addrs) ->
lists:map( lists:map(
fun(#address{jid = undefined} = Addr) -> fun(#address{jid = undefined} = Addr) ->
#dest{jid_string = none, full_xml = Addr}; #dest{jid_string = none, address = Addr};
(#address{jid = JID, type = Type} = Addr) -> (#address{jid = JID, type = Type} = Addr) ->
#dest{jid_string = jid:encode(JID), jid_jid = JID, #dest{jid_string = jid:encode(JID), jid_jid = JID,
type = Type, full_xml = Addr} type = Type, address = Addr}
end, Addrs). end, Addrs).
%%%------------------------- %%%-------------------------
@ -469,9 +444,9 @@ split_dests_jid(Dests) ->
end, end,
Dests). Dests).
-spec report_not_jid(jid(), stanza(), #dest{}) -> any(). -spec report_not_jid(jid(), stanza(), [#dest{}]) -> any().
report_not_jid(From, Packet, Dests) -> report_not_jid(From, Packet, Dests) ->
Dests2 = [fxml:element_to_binary(xmpp:encode(Dest#dest.full_xml)) Dests2 = [fxml:element_to_binary(xmpp:encode(Dest#dest.address))
|| Dest <- Dests], || Dest <- Dests],
[route_error(xmpp:set_from_to(Packet, From, From), jid_malformed, [route_error(xmpp:set_from_to(Packet, From, From), jid_malformed,
<<"This service can not process the address: ", <<"This service can not process the address: ",
@ -497,14 +472,14 @@ group_dests(Dests) ->
%%% Look for cached responses %%% Look for cached responses
%%%------------------------- %%%-------------------------
look_cached_servers(LServerS, Groups) -> look_cached_servers(LServerS, LServiceS, Groups) ->
[look_cached(LServerS, Group) || Group <- Groups]. [look_cached(LServerS, LServiceS, Group) || Group <- Groups].
look_cached(LServerS, G) -> look_cached(LServerS, LServiceS, G) ->
Maxtime_positive = (?MAXTIME_CACHE_POSITIVE), Maxtime_positive = (?MAXTIME_CACHE_POSITIVE),
Maxtime_negative = (?MAXTIME_CACHE_NEGATIVE), Maxtime_negative = (?MAXTIME_CACHE_NEGATIVE),
Cached_response = search_server_on_cache(G#group.server, Cached_response = search_server_on_cache(G#group.server,
LServerS, LServerS, LServiceS,
{Maxtime_positive, {Maxtime_positive,
Maxtime_negative}), Maxtime_negative}),
G#group{multicast = Cached_response}. G#group{multicast = Cached_response}.
@ -520,7 +495,7 @@ build_others_xml(Groups) ->
build_other_xml(Dests) -> build_other_xml(Dests) ->
lists:foldl(fun (Dest, R) -> lists:foldl(fun (Dest, R) ->
XML = Dest#dest.full_xml, XML = Dest#dest.address,
case Dest#dest.type of case Dest#dest.type of
to -> [add_delivered(XML) | R]; to -> [add_delivered(XML) | R];
cc -> [add_delivered(XML) | R]; cc -> [add_delivered(XML) | R];
@ -554,53 +529,38 @@ add_addresses2(Delivereds, [Group | Groups], Res, Pa,
%%% Decide action groups %%% Decide action groups
%%%------------------------- %%%-------------------------
-spec decide_action_groups([#group{}]) -> [{routing(), #group{}}].
decide_action_groups(Groups) -> decide_action_groups(Groups) ->
[{decide_action_group(Group), Group} [{Group#group.multicast, Group}
|| Group <- Groups]. || Group <- Groups].
decide_action_group(Group) ->
Server = Group#group.server,
case Group#group.multicast of
{cached, local_server} ->
%% Send a copy of the packet to each local user on Dests
route_single;
{cached, not_supported} ->
%% Send a copy of the packet to each remote user on Dests
route_single;
{cached, {multicast_supported, JID, RLimits}} ->
{route_multicast, JID, RLimits};
{obsolete,
{multicast_supported, Old_service, _RLimits}} ->
{ask, Old_service, renewal};
{obsolete, not_supported} -> {ask, Server, not_renewal};
not_cached -> {ask, Server, not_renewal}
end.
%%%------------------------- %%%-------------------------
%%% Route packet %%% Route packet
%%%------------------------- %%%-------------------------
route_packet(From, ToDest, Packet, AAttrs, Others, Addresses) -> -spec route_packet(jid(), #dest{}, xmpp:stanza(), [addresses()], [addresses()]) -> 'ok'.
route_packet(From, ToDest, Packet, Others, Addresses) ->
Dests = case ToDest#dest.type of Dests = case ToDest#dest.type of
bcc -> []; bcc -> [];
_ -> [ToDest] _ -> [ToDest]
end, end,
route_packet2(From, ToDest#dest.jid_string, Dests, route_packet2(From, ToDest#dest.jid_string, Dests,
Packet, AAttrs, {Others, Addresses}). Packet, {Others, Addresses}).
route_packet_multicast(From, ToS, Packet, AAttrs, Dests, -spec route_packet_multicast(jid(), binary(), xmpp:stanza(), [#dest{}], [address()], #limits{}) -> 'ok'.
route_packet_multicast(From, ToS, Packet, Dests,
Addresses, Limits) -> Addresses, Limits) ->
Type_of_stanza = type_of_stanza(Packet), Type_of_stanza = type_of_stanza(Packet),
{_Type, Limit_number} = get_limit_number(Type_of_stanza, {_Type, Limit_number} = get_limit_number(Type_of_stanza,
Limits), Limits),
Fragmented_dests = fragment_dests(Dests, Limit_number), Fragmented_dests = fragment_dests(Dests, Limit_number),
[route_packet2(From, ToS, DFragment, Packet, AAttrs, lists:foreach(fun(DFragment) ->
Addresses) route_packet2(From, ToS, DFragment, Packet,
|| DFragment <- Fragmented_dests]. Addresses)
end, Fragmented_dests).
-spec route_packet2(jid(), binary(), [#dest{}], stanza(), list(), [address()]) -> ok. -spec route_packet2(jid(), binary(), [#dest{}], xmpp:stanza(), {[address()], [address()]} | [address()]) -> 'ok'.
route_packet2(From, ToS, Dests, Packet, _AAttrs, route_packet2(From, ToS, Dests, Packet, Addresses) ->
Addresses) ->
Els = case append_dests(Dests, Addresses) of Els = case append_dests(Dests, Addresses) of
[] -> [] ->
xmpp:get_els(Packet); xmpp:get_els(Packet);
@ -613,10 +573,10 @@ route_packet2(From, ToS, Dests, Packet, _AAttrs,
-spec append_dests([#dest{}], {[address()], [address()]} | [address()]) -> [address()]. -spec append_dests([#dest{}], {[address()], [address()]} | [address()]) -> [address()].
append_dests(_Dests, {Others, Addresses}) -> append_dests(_Dests, {Others, Addresses}) ->
Addresses++Others; Addresses ++ Others;
append_dests([], Addresses) -> Addresses; append_dests([], Addresses) -> Addresses;
append_dests([Dest | Dests], Addresses) -> append_dests([Dest | Dests], Addresses) ->
append_dests(Dests, [Dest#dest.full_xml | Addresses]). append_dests(Dests, [Dest#dest.address | Addresses]).
%%%------------------------- %%%-------------------------
%%% Check relay %%% Check relay
@ -647,20 +607,22 @@ check_relay_required(LServerS, Groups) ->
%%% Check protocol support: Send request %%% Check protocol support: Send request
%%%------------------------- %%%-------------------------
send_query_info(RServerS, LServiceS) -> -spec send_query_info(binary(), binary(), binary()) -> ok.
send_query_info(RServerS, LServiceS, ID) ->
case str:str(RServerS, <<"echo.">>) of case str:str(RServerS, <<"echo.">>) of
1 -> false; 1 -> ok;
_ -> send_query(RServerS, LServiceS, #disco_info{}) _ -> send_query(RServerS, LServiceS, ID, #disco_info{})
end. end.
send_query_items(RServerS, LServiceS) -> -spec send_query_items(binary(), binary(), binary()) -> ok.
send_query(RServerS, LServiceS, #disco_items{}). send_query_items(RServerS, LServiceS, ID) ->
send_query(RServerS, LServiceS, ID, #disco_items{}).
-spec send_query(binary(), binary(), [disco_info()|disco_items()]) -> ok. -spec send_query(binary(), binary(), binary(), disco_info()|disco_items()) -> ok.
send_query(RServerS, LServiceS, SubEl) -> send_query(RServerS, LServiceS, ID, SubEl) ->
Packet = #iq{from = stj(LServiceS), Packet = #iq{from = stj(LServiceS),
to = stj(RServerS), to = stj(RServerS),
id = randoms:get_string(), id = ID,
type = get, sub_els = [SubEl]}, type = get, sub_els = [SubEl]},
ejabberd_router:route(Packet). ejabberd_router:route(Packet).
@ -670,10 +632,31 @@ send_query(RServerS, LServiceS, SubEl) ->
process_iqreply_error(LServiceS, Packet) -> process_iqreply_error(LServiceS, Packet) ->
FromS = jts(xmpp:get_from(Packet)), FromS = jts(xmpp:get_from(Packet)),
case search_waiter(FromS, LServiceS, info) of ID = Packet#iq.id,
{found_waiter, Waiter} -> case str:tokens(ID, <<"/">>) of
received_awaiter(FromS, Waiter, LServiceS); [RServer, _] ->
_ -> ok case look_server(RServer) of
{cached, {_Response, {wait_for_info, ID}}, _TS}
when RServer == FromS ->
add_response(RServer, not_supported, cached);
{cached, {_Response, {wait_for_items, ID}}, _TS}
when RServer == FromS ->
add_response(RServer, not_supported, cached);
{cached, {Response, {wait_for_items_info, ID, Items}},
_TS} ->
case lists:member(FromS, Items) of
true ->
received_awaiter(
FromS, RServer, Response, ID, Items,
LServiceS);
false ->
ok
end;
_ ->
ok
end;
_ ->
ok
end. end.
%%%------------------------- %%%-------------------------
@ -681,12 +664,12 @@ process_iqreply_error(LServiceS, Packet) ->
%%%------------------------- %%%-------------------------
-spec process_iqreply_result(binary(), iq()) -> any(). -spec process_iqreply_result(binary(), iq()) -> any().
process_iqreply_result(LServiceS, #iq{from = From, sub_els = [SubEl]}) -> process_iqreply_result(LServiceS, #iq{from = From, id = ID, sub_els = [SubEl]}) ->
case SubEl of case SubEl of
#disco_info{} -> #disco_info{} ->
process_discoinfo_result(From, LServiceS, SubEl); process_discoinfo_result(From, LServiceS, ID, SubEl);
#disco_items{} -> #disco_items{} ->
process_discoitems_result(From, LServiceS, SubEl); process_discoitems_result(From, LServiceS, ID, SubEl);
_ -> _ ->
ok ok
end. end.
@ -695,46 +678,53 @@ process_iqreply_result(LServiceS, #iq{from = From, sub_els = [SubEl]}) ->
%%% Check protocol support: Receive response: Disco Info %%% Check protocol support: Receive response: Disco Info
%%%------------------------- %%%-------------------------
process_discoinfo_result(From, LServiceS, DiscoInfo) -> process_discoinfo_result(From, LServiceS, ID, DiscoInfo) ->
FromS = jts(From), FromS = jts(From),
case search_waiter(FromS, LServiceS, info) of case str:tokens(ID, <<"/">>) of
{found_waiter, Waiter} -> [RServer, _] ->
process_discoinfo_result2(From, FromS, LServiceS, DiscoInfo, case look_server(RServer) of
Waiter); {cached, {Response, {wait_for_info, ID} = ST}, _TS}
_ -> ok when RServer == FromS ->
process_discoinfo_result2(
From, FromS, LServiceS, DiscoInfo,
RServer, Response, ST);
{cached, {Response, {wait_for_items_info, ID, Items} = ST},
_TS} ->
case lists:member(FromS, Items) of
true ->
process_discoinfo_result2(
From, FromS, LServiceS, DiscoInfo,
RServer, Response, ST);
false ->
ok
end;
_ ->
ok
end;
_ ->
ok
end. end.
process_discoinfo_result2(From, FromS, LServiceS, process_discoinfo_result2(From, FromS, LServiceS,
#disco_info{features = Feats} = DiscoInfo, #disco_info{features = Feats} = DiscoInfo,
Waiter) -> RServer, Response, ST) ->
Multicast_support = lists:member(?NS_ADDRESS, Feats), Multicast_support = lists:member(?NS_ADDRESS, Feats),
Group = Waiter#waiter.group,
RServer = Group#group.server,
case Multicast_support of case Multicast_support of
true -> true ->
SenderT = sender_type(From), SenderT = sender_type(From),
RLimits = get_limits_xml(DiscoInfo, SenderT), RLimits = get_limits_xml(DiscoInfo, SenderT),
add_response(RServer, {multicast_supported, FromS, RLimits}), add_response(RServer, {multicast_supported, FromS, RLimits}, cached);
FromM = Waiter#waiter.sender,
DestsM = Group#group.dests,
PacketM = Waiter#waiter.packet,
AAttrsM = Waiter#waiter.aattrs,
AddressesM = Waiter#waiter.addresses,
RServiceM = FromS,
route_packet_multicast(FromM, RServiceM, PacketM,
AAttrsM, DestsM, AddressesM, RLimits),
delo_waiter(Waiter);
false -> false ->
case FromS of case ST of
RServer -> {wait_for_info, _ID} ->
send_query_items(FromS, LServiceS), Random = randoms:get_string(),
delo_waiter(Waiter), ID = <<RServer/binary, $/, Random/binary>>,
add_waiter(Waiter#waiter{awaiting = send_query_items(FromS, LServiceS, ID),
{[FromS], LServiceS, items}, add_response(RServer, Response, {wait_for_items, ID});
renewal = false}); %% We asked a component, and it does not support XEP33
%% We asked a component, and it does not support XEP33 {wait_for_items_info, ID, Items} ->
_ -> received_awaiter(FromS, Waiter, LServiceS) received_awaiter(FromS, RServer, Response, ID, Items, LServiceS)
end end
end. end.
get_limits_xml(DiscoInfo, SenderT) -> get_limits_xml(DiscoInfo, SenderT) ->
@ -778,27 +768,32 @@ get_limits_values(Fields) ->
%%% Check protocol support: Receive response: Disco Items %%% Check protocol support: Receive response: Disco Items
%%%------------------------- %%%-------------------------
process_discoitems_result(From, LServiceS, #disco_items{items = Items}) -> process_discoitems_result(From, LServiceS, ID, #disco_items{items = Items}) ->
FromS = jts(From), FromS = jts(From),
case search_waiter(FromS, LServiceS, items) of case str:tokens(ID, <<"/">>) of
{found_waiter, Waiter} -> [FromS = RServer, _] ->
List = lists:flatmap( case look_server(RServer) of
fun(#disco_item{jid = #jid{luser = <<"">>, {cached, {Response, {wait_for_items, ID}}, _TS} ->
lserver = LServer, List = lists:flatmap(
lresource = <<"">>}}) -> fun(#disco_item{jid = #jid{luser = <<"">>,
[LServer]; lserver = LServer,
(_) -> lresource = <<"">>}}) ->
[] [LServer];
end, Items), (_) ->
case List of []
[] -> end, Items),
received_awaiter(FromS, Waiter, LServiceS); case List of
[] ->
add_response(RServer, not_supported, cached);
_ ->
Random = randoms:get_string(),
ID2 = <<RServer/binary, $/, Random/binary>>,
[send_query_info(Item, LServiceS, ID2) || Item <- List],
add_response(RServer, Response,
{wait_for_items_info, ID2, List})
end;
_ -> _ ->
[send_query_info(Item, LServiceS) || Item <- List], ok
delo_waiter(Waiter),
add_waiter(Waiter#waiter{awaiting =
{List, LServiceS, info},
renewal = false})
end; end;
_ -> _ ->
ok ok
@ -808,33 +803,12 @@ process_discoitems_result(From, LServiceS, #disco_items{items = Items}) ->
%%% Check protocol support: Receive response: Received awaiter %%% Check protocol support: Receive response: Received awaiter
%%%------------------------- %%%-------------------------
received_awaiter(JID, Waiter, LServiceS) -> received_awaiter(JID, RServer, Response, ID, JIDs, _LServiceS) ->
{JIDs, LServiceS, _} = Waiter#waiter.awaiting,
delo_waiter(Waiter),
Group = Waiter#waiter.group,
RServer = Group#group.server,
case lists:delete(JID, JIDs) of case lists:delete(JID, JIDs) of
[] -> [] ->
case Waiter#waiter.renewal of add_response(RServer, not_supported, cached);
false -> JIDs2 ->
add_response(RServer, not_supported), add_response(RServer, Response, {wait_for_items_info, ID, JIDs2})
From = Waiter#waiter.sender,
Packet = Waiter#waiter.packet,
AAttrs = Waiter#waiter.aattrs,
Others = Group#group.others,
Addresses = Waiter#waiter.addresses,
[route_packet(From, ToUser, Packet, AAttrs, Others, Addresses)
|| ToUser <- Group#group.dests];
true ->
send_query_info(RServer, LServiceS),
add_waiter(Waiter#waiter{awaiting =
{[RServer], LServiceS, info},
renewal = false})
end;
JIDs2 ->
add_waiter(Waiter#waiter{awaiting =
{JIDs2, LServiceS, info},
renewal = false})
end. end.
%%%------------------------- %%%-------------------------
@ -846,25 +820,52 @@ create_cache() ->
[{ram_copies, [node()]}, [{ram_copies, [node()]},
{attributes, record_info(fields, multicastc)}]). {attributes, record_info(fields, multicastc)}]).
add_response(RServer, Response) -> add_response(RServer, Response, State) ->
Secs = calendar:datetime_to_gregorian_seconds(calendar:local_time()), Secs = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
mnesia:dirty_write(#multicastc{rserver = RServer, mnesia:dirty_write(#multicastc{rserver = RServer,
response = Response, ts = Secs}). response = {Response, State}, ts = Secs}).
search_server_on_cache(RServer, LServerS, _Maxmins) search_server_on_cache(RServer, LServerS, _LServiceS, _Maxmins)
when RServer == LServerS -> when RServer == LServerS ->
{cached, local_server}; route_single;
search_server_on_cache(RServer, _LServerS, Maxmins) -> search_server_on_cache(RServer, _LServerS, LServiceS, Maxmins) ->
case look_server(RServer) of case look_server(RServer) of
not_cached -> not_cached; not_cached ->
{cached, Response, Ts} -> query_info(RServer, LServiceS, not_supported),
Now = calendar:datetime_to_gregorian_seconds(calendar:local_time()), route_single;
case is_obsolete(Response, Ts, Now, Maxmins) of {cached, {Response, State}, TS} ->
false -> {cached, Response}; Now = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
true -> {obsolete, Response} Response2 =
end case State of
cached ->
case is_obsolete(Response, TS, Now, Maxmins) of
false -> ok;
true ->
query_info(RServer, LServiceS, Response)
end,
Response;
_ ->
if
Now - TS > ?MAXTIME_CACHE_NEGOTIATING ->
query_info(RServer, LServiceS, not_supported),
not_supported;
true ->
Response
end
end,
case Response2 of
not_supported -> route_single;
{multicast_supported, Service, Limits} ->
{route_multicast, Service, Limits}
end
end. end.
query_info(RServer, LServiceS, Response) ->
Random = randoms:get_string(),
ID = <<RServer/binary, $/, Random/binary>>,
send_query_info(RServer, LServiceS, ID),
add_response(RServer, Response, {wait_for_info, ID}).
look_server(RServer) -> look_server(RServer) ->
case mnesia:dirty_read(multicastc, RServer) of case mnesia:dirty_read(multicastc, RServer) of
[] -> not_cached; [] -> not_cached;
@ -935,44 +936,6 @@ purge_loop(NM) ->
try_stop -> purge_loop_finished try_stop -> purge_loop_finished
end. end.
%%%-------------------------
%%% Pool
%%%-------------------------
create_pool() ->
catch
begin
ets:new(multicastp,
[duplicate_bag, public, named_table, {keypos, 2}]),
ets:give_away(multicastp, whereis(ejabberd), ok)
end.
add_waiter(Waiter) ->
true = ets:insert(multicastp, Waiter).
delo_waiter(Waiter) ->
true = ets:delete_object(multicastp, Waiter).
-spec search_waiter(binary(), binary(), info | items) ->
{found_waiter, #waiter{}} | waiter_not_found.
search_waiter(JID, LServiceS, Type) ->
Rs = ets:foldl(fun (W, Res) ->
{JIDs, LServiceS1, Type1} = W#waiter.awaiting,
case lists:member(JID, JIDs) and
(LServiceS == LServiceS1)
and (Type1 == Type)
of
true -> Res ++ [W];
false -> Res
end
end,
[], multicastp),
case Rs of
[R | _] -> {found_waiter, R};
[] -> waiter_not_found
end.
%%%------------------------- %%%-------------------------
%%% Limits: utils %%% Limits: utils
%%%------------------------- %%%-------------------------
@ -1006,11 +969,13 @@ get_from_limitopts(LimitOpts, SenderT) ->
build_remote_limit_record(LimitOpts, SenderT) -> build_remote_limit_record(LimitOpts, SenderT) ->
build_limit_record(LimitOpts, SenderT). build_limit_record(LimitOpts, SenderT).
-spec build_limit_record(any(), local | remote) -> #limits{}.
build_limit_record(LimitOpts, SenderT) -> build_limit_record(LimitOpts, SenderT) ->
Limits = [get_limit_value(Name, Default, LimitOpts) Limits = [get_limit_value(Name, Default, LimitOpts)
|| {Name, Default} <- list_of_limits(SenderT)], || {Name, Default} <- list_of_limits(SenderT)],
list_to_tuple([limits | Limits]). list_to_tuple([limits | Limits]).
-spec get_limit_value(atom(), integer(), any()) -> limit_value().
get_limit_value(Name, Default, LimitOpts) -> get_limit_value(Name, Default, LimitOpts) ->
case lists:keysearch(Name, 1, LimitOpts) of case lists:keysearch(Name, 1, LimitOpts) of
{value, {Name, Number}} -> {custom, Number}; {value, {Name, Number}} -> {custom, Number};
@ -1019,11 +984,13 @@ get_limit_value(Name, Default, LimitOpts) ->
type_of_stanza(Stanza) -> element(1, Stanza). type_of_stanza(Stanza) -> element(1, Stanza).
-spec get_limit_number(message | presence, #limits{}) -> limit_value().
get_limit_number(message, Limits) -> get_limit_number(message, Limits) ->
Limits#limits.message; Limits#limits.message;
get_limit_number(presence, Limits) -> get_limit_number(presence, Limits) ->
Limits#limits.presence. Limits#limits.presence.
-spec get_slimit_group(local | remote, #service_limits{}) -> #limits{}.
get_slimit_group(local, SLimits) -> get_slimit_group(local, SLimits) ->
SLimits#service_limits.local; SLimits#service_limits.local;
get_slimit_group(remote, SLimits) -> get_slimit_group(remote, SLimits) ->