diff --git a/src/mod_multicast.erl b/src/mod_multicast.erl index 9f02995fe..846049a55 100644 --- a/src/mod_multicast.erl +++ b/src/mod_multicast.erl @@ -47,52 +47,36 @@ -include("translate.hrl"). -include("xmpp.hrl"). --record(state, - {lserver, lservice, access, service_limits}). +-record(multicastc, {rserver :: binary(), + response, + ts :: integer()}). + +-record(dest, {jid_string :: binary() | none, + jid_jid :: xmpp:jid(), + type :: to | cc | bcc, + address :: address()}). + +-type limit_value() :: {default | custom, integer()}. +-record(limits, {message :: limit_value(), + presence :: limit_value()}). + +-record(service_limits, {local :: #limits{}, + remote :: #limits{}}). + +-type routing() :: route_single | {route_multicast, binary(), #service_limits{}}. + +-record(group, {server :: binary(), + dests :: [#dest{}], + multicast :: routing(), + others :: [#address{}], + addresses :: [#address{}]}). + +-record(state, {lserver :: binary(), + lservice :: binary(), + access :: atom(), + service_limits :: #service_limits{}}). -type state() :: #state{}. --record(multicastc, {rserver, response, ts}). - -%% ts: timestamp (in seconds) when the cache item was last updated - --record(dest, {jid_string = none :: binary(), - jid_jid :: jid(), - type :: atom(), - full_xml :: address()}). - -%% jid_string = string() -%% jid_jid = jid() -%% full_xml = xml() - --record(group, - {server, dests, multicast, others, addresses}). - -%% server = string() -%% dests = [string()] -%% multicast = {cached, local_server} | {cached, string()} | {cached, not_supported} | {obsolete, not_supported} | {obsolete, string()} | not_cached -%% after being updated, possible values are: local | multicast_not_supported | {multicast_supported, string(), limits()} -%% others = [xml()] -%% packet = xml() - --record(waiter, - {awaiting, group, renewal = false, sender, packet, - aattrs, addresses}). - -%% awaiting = {[Remote_service], Local_service, Type_awaiting} -%% Remote_service = Local_service = string() -%% Type_awaiting = info | items -%% group = #group -%% renewal = true | false -%% sender = From -%% packet = xml() -%% aattrs = [xml()] - --record(limits, {message, presence}). - -%% message = presence = integer() | infinite - --record(service_limits, {local, remote}). - %% All the elements are of type value() -define(VERSION_MULTICAST, <<"$Revision: 440 $ ">>). @@ -104,6 +88,8 @@ -define(MAXTIME_CACHE_NEGATIVE, 86400). +-define(MAXTIME_CACHE_NEGOTIATING, 600). + -define(CACHE_PURGE_TIMER, 86400000). -define(DISCO_QUERY_TIMEOUT, 10000). @@ -130,6 +116,7 @@ reload(LServerS, NewOpts, OldOpts) -> %% gen_server callbacks %%==================================================================== +-spec init(list()) -> {ok, state()}. init([LServerS, Opts]) -> process_flag(trap_exit, true), [LServiceS|_] = gen_mod:get_opt_hosts(LServerS, Opts), @@ -137,7 +124,6 @@ init([LServerS, Opts]) -> SLimits = build_service_limit_record(gen_mod:get_opt(limits, Opts)), create_cache(), try_start_loop(), - create_pool(), ejabberd_router_multicast:register_route(LServerS), ejabberd_router:register_route(LServiceS, LServerS), {ok, @@ -277,21 +263,22 @@ iq_vcard(Lang) -> %%% Route %%%------------------------- +-spec route_trusted(binary(), binary(), jid(), [jid()], stanza()) -> 'ok'. route_trusted(LServiceS, LServerS, FromJID, Destinations, Packet) -> Packet_stripped = Packet, - AAttrs = [], Delivereds = [], Dests2 = lists:map( fun(D) -> #dest{jid_string = jid:encode(D), - jid_jid = D, type = bcc, - full_xml = #address{type = bcc, jid = D}} + jid_jid = D, type = bcc, + address = #address{type = bcc, jid = D}} end, Destinations), Groups = group_dests(Dests2), route_common(LServerS, LServiceS, FromJID, Groups, - Delivereds, Packet_stripped, AAttrs). + Delivereds, Packet_stripped). +-spec route_untrusted(binary(), binary(), atom(), #service_limits{}, stanza()) -> 'ok'. route_untrusted(LServiceS, LServerS, Access, SLimits, Packet) -> try route_untrusted2(LServiceS, LServerS, Access, SLimits, Packet) @@ -321,6 +308,7 @@ route_untrusted(LServiceS, LServerS, Access, SLimits, Packet) -> <<"Unknown problem">>) end. +-spec route_untrusted2(binary(), binary(), atom(), #service_limits{}, stanza()) -> 'ok'. route_untrusted2(LServiceS, LServerS, Access, SLimits, Packet) -> FromJID = xmpp:get_from(Packet), ok = check_access(LServerS, Access, FromJID), @@ -333,53 +321,40 @@ route_untrusted2(LServiceS, LServerS, Access, SLimits, Packet) -> Groups = group_dests(Dests2), ok = check_relay(FromJID#jid.server, LServerS, Groups), route_common(LServerS, LServiceS, FromJID, Groups, - Delivereds, Packet_stripped, []). + Delivereds, Packet_stripped). -spec route_common(binary(), binary(), jid(), [#group{}], - [address()], stanza(), list()) -> any(). + [address()], stanza()) -> 'ok'. route_common(LServerS, LServiceS, FromJID, Groups, - Delivereds, Packet_stripped, AAttrs) -> - Groups2 = look_cached_servers(LServerS, Groups), + Delivereds, Packet_stripped) -> + Groups2 = look_cached_servers(LServerS, LServiceS, Groups), Groups3 = build_others_xml(Groups2), Groups4 = add_addresses(Delivereds, Groups3), AGroups = decide_action_groups(Groups4), - act_groups(FromJID, Packet_stripped, AAttrs, LServiceS, + act_groups(FromJID, Packet_stripped, LServiceS, AGroups). -act_groups(FromJID, Packet_stripped, AAttrs, LServiceS, - AGroups) -> - [perform(FromJID, Packet_stripped, AAttrs, LServiceS, - AGroup) - || AGroup <- AGroups]. +-spec act_groups(jid(), stanza(), binary(), [{routing(), #group{}}]) -> 'ok'. +act_groups(FromJID, Packet_stripped, LServiceS, AGroups) -> + lists:foreach( + fun(AGroup) -> + perform(FromJID, Packet_stripped, LServiceS, + AGroup) + end, AGroups). -perform(From, Packet, AAttrs, _, +-spec perform(jid(), stanza(), binary(), + {routing(), #group{}}) -> 'ok'. +perform(From, Packet, _, {route_single, Group}) -> - [route_packet(From, ToUser, Packet, AAttrs, - Group#group.others, Group#group.addresses) - || ToUser <- Group#group.dests]; -perform(From, Packet, AAttrs, _, + lists:foreach( + fun(ToUser) -> + route_packet(From, ToUser, Packet, + Group#group.others, Group#group.addresses) + end, Group#group.dests); +perform(From, Packet, _, {{route_multicast, JID, RLimits}, Group}) -> - route_packet_multicast(From, JID, Packet, AAttrs, - Group#group.dests, Group#group.addresses, RLimits); -perform(From, Packet, AAttrs, LServiceS, - {{ask, Old_service, renewal}, Group}) -> - send_query_info(Old_service, LServiceS), - add_waiter(#waiter{awaiting = - {[Old_service], LServiceS, info}, - group = Group, renewal = true, sender = From, - packet = Packet, aattrs = AAttrs, - addresses = Group#group.addresses}); -perform(_From, _Packet, _AAttrs, LServiceS, - {{ask, LServiceS, _}, _Group}) -> - ok; -perform(From, Packet, AAttrs, LServiceS, - {{ask, Server, not_renewal}, Group}) -> - send_query_info(Server, LServiceS), - add_waiter(#waiter{awaiting = - {[Server], LServiceS, info}, - group = Group, renewal = false, sender = From, - packet = Packet, aattrs = AAttrs, - addresses = Group#group.addresses}). + route_packet_multicast(From, JID, Packet, + Group#group.dests, Group#group.addresses, RLimits). %%%------------------------- %%% Check access permission @@ -427,7 +402,7 @@ split_addresses_todeliver(Addresses) -> %%% Check does not exceed limit of destinations %%%------------------------- --spec check_limit_dests(_, jid(), stanza(), [address()]) -> ok. +-spec check_limit_dests(#service_limits{}, jid(), stanza(), [address()]) -> ok. check_limit_dests(SLimits, FromJID, Packet, Addresses) -> SenderT = sender_type(FromJID), @@ -448,10 +423,10 @@ check_limit_dests(SLimits, FromJID, Packet, convert_dest_record(Addrs) -> lists:map( fun(#address{jid = undefined} = Addr) -> - #dest{jid_string = none, full_xml = Addr}; + #dest{jid_string = none, address = Addr}; (#address{jid = JID, type = Type} = Addr) -> #dest{jid_string = jid:encode(JID), jid_jid = JID, - type = Type, full_xml = Addr} + type = Type, address = Addr} end, Addrs). %%%------------------------- @@ -469,9 +444,9 @@ split_dests_jid(Dests) -> end, Dests). --spec report_not_jid(jid(), stanza(), #dest{}) -> any(). +-spec report_not_jid(jid(), stanza(), [#dest{}]) -> any(). report_not_jid(From, Packet, Dests) -> - Dests2 = [fxml:element_to_binary(xmpp:encode(Dest#dest.full_xml)) + Dests2 = [fxml:element_to_binary(xmpp:encode(Dest#dest.address)) || Dest <- Dests], [route_error(xmpp:set_from_to(Packet, From, From), jid_malformed, <<"This service can not process the address: ", @@ -497,14 +472,14 @@ group_dests(Dests) -> %%% Look for cached responses %%%------------------------- -look_cached_servers(LServerS, Groups) -> - [look_cached(LServerS, Group) || Group <- Groups]. +look_cached_servers(LServerS, LServiceS, Groups) -> + [look_cached(LServerS, LServiceS, Group) || Group <- Groups]. -look_cached(LServerS, G) -> +look_cached(LServerS, LServiceS, G) -> Maxtime_positive = (?MAXTIME_CACHE_POSITIVE), Maxtime_negative = (?MAXTIME_CACHE_NEGATIVE), Cached_response = search_server_on_cache(G#group.server, - LServerS, + LServerS, LServiceS, {Maxtime_positive, Maxtime_negative}), G#group{multicast = Cached_response}. @@ -520,7 +495,7 @@ build_others_xml(Groups) -> build_other_xml(Dests) -> lists:foldl(fun (Dest, R) -> - XML = Dest#dest.full_xml, + XML = Dest#dest.address, case Dest#dest.type of to -> [add_delivered(XML) | R]; cc -> [add_delivered(XML) | R]; @@ -554,53 +529,38 @@ add_addresses2(Delivereds, [Group | Groups], Res, Pa, %%% Decide action groups %%%------------------------- +-spec decide_action_groups([#group{}]) -> [{routing(), #group{}}]. decide_action_groups(Groups) -> - [{decide_action_group(Group), Group} + [{Group#group.multicast, Group} || Group <- Groups]. -decide_action_group(Group) -> - Server = Group#group.server, - case Group#group.multicast of - {cached, local_server} -> - %% Send a copy of the packet to each local user on Dests - route_single; - {cached, not_supported} -> - %% Send a copy of the packet to each remote user on Dests - route_single; - {cached, {multicast_supported, JID, RLimits}} -> - {route_multicast, JID, RLimits}; - {obsolete, - {multicast_supported, Old_service, _RLimits}} -> - {ask, Old_service, renewal}; - {obsolete, not_supported} -> {ask, Server, not_renewal}; - not_cached -> {ask, Server, not_renewal} - end. - %%%------------------------- %%% Route packet %%%------------------------- -route_packet(From, ToDest, Packet, AAttrs, Others, Addresses) -> +-spec route_packet(jid(), #dest{}, xmpp:stanza(), [addresses()], [addresses()]) -> 'ok'. +route_packet(From, ToDest, Packet, Others, Addresses) -> Dests = case ToDest#dest.type of bcc -> []; _ -> [ToDest] end, route_packet2(From, ToDest#dest.jid_string, Dests, - Packet, AAttrs, {Others, Addresses}). + Packet, {Others, Addresses}). -route_packet_multicast(From, ToS, Packet, AAttrs, Dests, +-spec route_packet_multicast(jid(), binary(), xmpp:stanza(), [#dest{}], [address()], #limits{}) -> 'ok'. +route_packet_multicast(From, ToS, Packet, Dests, Addresses, Limits) -> Type_of_stanza = type_of_stanza(Packet), {_Type, Limit_number} = get_limit_number(Type_of_stanza, Limits), Fragmented_dests = fragment_dests(Dests, Limit_number), - [route_packet2(From, ToS, DFragment, Packet, AAttrs, - Addresses) - || DFragment <- Fragmented_dests]. + lists:foreach(fun(DFragment) -> + route_packet2(From, ToS, DFragment, Packet, + Addresses) + end, Fragmented_dests). --spec route_packet2(jid(), binary(), [#dest{}], stanza(), list(), [address()]) -> ok. -route_packet2(From, ToS, Dests, Packet, _AAttrs, - Addresses) -> +-spec route_packet2(jid(), binary(), [#dest{}], xmpp:stanza(), {[address()], [address()]} | [address()]) -> 'ok'. +route_packet2(From, ToS, Dests, Packet, Addresses) -> Els = case append_dests(Dests, Addresses) of [] -> xmpp:get_els(Packet); @@ -613,10 +573,10 @@ route_packet2(From, ToS, Dests, Packet, _AAttrs, -spec append_dests([#dest{}], {[address()], [address()]} | [address()]) -> [address()]. append_dests(_Dests, {Others, Addresses}) -> - Addresses++Others; + Addresses ++ Others; append_dests([], Addresses) -> Addresses; append_dests([Dest | Dests], Addresses) -> - append_dests(Dests, [Dest#dest.full_xml | Addresses]). + append_dests(Dests, [Dest#dest.address | Addresses]). %%%------------------------- %%% Check relay @@ -647,20 +607,22 @@ check_relay_required(LServerS, Groups) -> %%% Check protocol support: Send request %%%------------------------- -send_query_info(RServerS, LServiceS) -> +-spec send_query_info(binary(), binary(), binary()) -> ok. +send_query_info(RServerS, LServiceS, ID) -> case str:str(RServerS, <<"echo.">>) of - 1 -> false; - _ -> send_query(RServerS, LServiceS, #disco_info{}) + 1 -> ok; + _ -> send_query(RServerS, LServiceS, ID, #disco_info{}) end. -send_query_items(RServerS, LServiceS) -> - send_query(RServerS, LServiceS, #disco_items{}). +-spec send_query_items(binary(), binary(), binary()) -> ok. +send_query_items(RServerS, LServiceS, ID) -> + send_query(RServerS, LServiceS, ID, #disco_items{}). --spec send_query(binary(), binary(), [disco_info()|disco_items()]) -> ok. -send_query(RServerS, LServiceS, SubEl) -> +-spec send_query(binary(), binary(), binary(), disco_info()|disco_items()) -> ok. +send_query(RServerS, LServiceS, ID, SubEl) -> Packet = #iq{from = stj(LServiceS), to = stj(RServerS), - id = randoms:get_string(), + id = ID, type = get, sub_els = [SubEl]}, ejabberd_router:route(Packet). @@ -670,10 +632,31 @@ send_query(RServerS, LServiceS, SubEl) -> process_iqreply_error(LServiceS, Packet) -> FromS = jts(xmpp:get_from(Packet)), - case search_waiter(FromS, LServiceS, info) of - {found_waiter, Waiter} -> - received_awaiter(FromS, Waiter, LServiceS); - _ -> ok + ID = Packet#iq.id, + case str:tokens(ID, <<"/">>) of + [RServer, _] -> + case look_server(RServer) of + {cached, {_Response, {wait_for_info, ID}}, _TS} + when RServer == FromS -> + add_response(RServer, not_supported, cached); + {cached, {_Response, {wait_for_items, ID}}, _TS} + when RServer == FromS -> + add_response(RServer, not_supported, cached); + {cached, {Response, {wait_for_items_info, ID, Items}}, + _TS} -> + case lists:member(FromS, Items) of + true -> + received_awaiter( + FromS, RServer, Response, ID, Items, + LServiceS); + false -> + ok + end; + _ -> + ok + end; + _ -> + ok end. %%%------------------------- @@ -681,12 +664,12 @@ process_iqreply_error(LServiceS, Packet) -> %%%------------------------- -spec process_iqreply_result(binary(), iq()) -> any(). -process_iqreply_result(LServiceS, #iq{from = From, sub_els = [SubEl]}) -> +process_iqreply_result(LServiceS, #iq{from = From, id = ID, sub_els = [SubEl]}) -> case SubEl of #disco_info{} -> - process_discoinfo_result(From, LServiceS, SubEl); + process_discoinfo_result(From, LServiceS, ID, SubEl); #disco_items{} -> - process_discoitems_result(From, LServiceS, SubEl); + process_discoitems_result(From, LServiceS, ID, SubEl); _ -> ok end. @@ -695,46 +678,53 @@ process_iqreply_result(LServiceS, #iq{from = From, sub_els = [SubEl]}) -> %%% Check protocol support: Receive response: Disco Info %%%------------------------- -process_discoinfo_result(From, LServiceS, DiscoInfo) -> +process_discoinfo_result(From, LServiceS, ID, DiscoInfo) -> FromS = jts(From), - case search_waiter(FromS, LServiceS, info) of - {found_waiter, Waiter} -> - process_discoinfo_result2(From, FromS, LServiceS, DiscoInfo, - Waiter); - _ -> ok + case str:tokens(ID, <<"/">>) of + [RServer, _] -> + case look_server(RServer) of + {cached, {Response, {wait_for_info, ID} = ST}, _TS} + when RServer == FromS -> + process_discoinfo_result2( + From, FromS, LServiceS, DiscoInfo, + RServer, Response, ST); + {cached, {Response, {wait_for_items_info, ID, Items} = ST}, + _TS} -> + case lists:member(FromS, Items) of + true -> + process_discoinfo_result2( + From, FromS, LServiceS, DiscoInfo, + RServer, Response, ST); + false -> + ok + end; + _ -> + ok + end; + _ -> + ok end. process_discoinfo_result2(From, FromS, LServiceS, #disco_info{features = Feats} = DiscoInfo, - Waiter) -> + RServer, Response, ST) -> Multicast_support = lists:member(?NS_ADDRESS, Feats), - Group = Waiter#waiter.group, - RServer = Group#group.server, case Multicast_support of true -> SenderT = sender_type(From), RLimits = get_limits_xml(DiscoInfo, SenderT), - add_response(RServer, {multicast_supported, FromS, RLimits}), - FromM = Waiter#waiter.sender, - DestsM = Group#group.dests, - PacketM = Waiter#waiter.packet, - AAttrsM = Waiter#waiter.aattrs, - AddressesM = Waiter#waiter.addresses, - RServiceM = FromS, - route_packet_multicast(FromM, RServiceM, PacketM, - AAttrsM, DestsM, AddressesM, RLimits), - delo_waiter(Waiter); + add_response(RServer, {multicast_supported, FromS, RLimits}, cached); false -> - case FromS of - RServer -> - send_query_items(FromS, LServiceS), - delo_waiter(Waiter), - add_waiter(Waiter#waiter{awaiting = - {[FromS], LServiceS, items}, - renewal = false}); - %% We asked a component, and it does not support XEP33 - _ -> received_awaiter(FromS, Waiter, LServiceS) - end + case ST of + {wait_for_info, _ID} -> + Random = randoms:get_string(), + ID = <>, + send_query_items(FromS, LServiceS, ID), + add_response(RServer, Response, {wait_for_items, ID}); + %% We asked a component, and it does not support XEP33 + {wait_for_items_info, ID, Items} -> + received_awaiter(FromS, RServer, Response, ID, Items, LServiceS) + end end. get_limits_xml(DiscoInfo, SenderT) -> @@ -778,27 +768,32 @@ get_limits_values(Fields) -> %%% Check protocol support: Receive response: Disco Items %%%------------------------- -process_discoitems_result(From, LServiceS, #disco_items{items = Items}) -> +process_discoitems_result(From, LServiceS, ID, #disco_items{items = Items}) -> FromS = jts(From), - case search_waiter(FromS, LServiceS, items) of - {found_waiter, Waiter} -> - List = lists:flatmap( - fun(#disco_item{jid = #jid{luser = <<"">>, - lserver = LServer, - lresource = <<"">>}}) -> - [LServer]; - (_) -> - [] - end, Items), - case List of - [] -> - received_awaiter(FromS, Waiter, LServiceS); + case str:tokens(ID, <<"/">>) of + [FromS = RServer, _] -> + case look_server(RServer) of + {cached, {Response, {wait_for_items, ID}}, _TS} -> + List = lists:flatmap( + fun(#disco_item{jid = #jid{luser = <<"">>, + lserver = LServer, + lresource = <<"">>}}) -> + [LServer]; + (_) -> + [] + end, Items), + case List of + [] -> + add_response(RServer, not_supported, cached); + _ -> + Random = randoms:get_string(), + ID2 = <>, + [send_query_info(Item, LServiceS, ID2) || Item <- List], + add_response(RServer, Response, + {wait_for_items_info, ID2, List}) + end; _ -> - [send_query_info(Item, LServiceS) || Item <- List], - delo_waiter(Waiter), - add_waiter(Waiter#waiter{awaiting = - {List, LServiceS, info}, - renewal = false}) + ok end; _ -> ok @@ -808,33 +803,12 @@ process_discoitems_result(From, LServiceS, #disco_items{items = Items}) -> %%% Check protocol support: Receive response: Received awaiter %%%------------------------- -received_awaiter(JID, Waiter, LServiceS) -> - {JIDs, LServiceS, _} = Waiter#waiter.awaiting, - delo_waiter(Waiter), - Group = Waiter#waiter.group, - RServer = Group#group.server, +received_awaiter(JID, RServer, Response, ID, JIDs, _LServiceS) -> case lists:delete(JID, JIDs) of - [] -> - case Waiter#waiter.renewal of - false -> - add_response(RServer, not_supported), - From = Waiter#waiter.sender, - Packet = Waiter#waiter.packet, - AAttrs = Waiter#waiter.aattrs, - Others = Group#group.others, - Addresses = Waiter#waiter.addresses, - [route_packet(From, ToUser, Packet, AAttrs, Others, Addresses) - || ToUser <- Group#group.dests]; - true -> - send_query_info(RServer, LServiceS), - add_waiter(Waiter#waiter{awaiting = - {[RServer], LServiceS, info}, - renewal = false}) - end; - JIDs2 -> - add_waiter(Waiter#waiter{awaiting = - {JIDs2, LServiceS, info}, - renewal = false}) + [] -> + add_response(RServer, not_supported, cached); + JIDs2 -> + add_response(RServer, Response, {wait_for_items_info, ID, JIDs2}) end. %%%------------------------- @@ -846,25 +820,52 @@ create_cache() -> [{ram_copies, [node()]}, {attributes, record_info(fields, multicastc)}]). -add_response(RServer, Response) -> +add_response(RServer, Response, State) -> Secs = calendar:datetime_to_gregorian_seconds(calendar:local_time()), mnesia:dirty_write(#multicastc{rserver = RServer, - response = Response, ts = Secs}). + response = {Response, State}, ts = Secs}). -search_server_on_cache(RServer, LServerS, _Maxmins) +search_server_on_cache(RServer, LServerS, _LServiceS, _Maxmins) when RServer == LServerS -> - {cached, local_server}; -search_server_on_cache(RServer, _LServerS, Maxmins) -> + route_single; +search_server_on_cache(RServer, _LServerS, LServiceS, Maxmins) -> case look_server(RServer) of - not_cached -> not_cached; - {cached, Response, Ts} -> - Now = calendar:datetime_to_gregorian_seconds(calendar:local_time()), - case is_obsolete(Response, Ts, Now, Maxmins) of - false -> {cached, Response}; - true -> {obsolete, Response} - end + not_cached -> + query_info(RServer, LServiceS, not_supported), + route_single; + {cached, {Response, State}, TS} -> + Now = calendar:datetime_to_gregorian_seconds(calendar:local_time()), + Response2 = + case State of + cached -> + case is_obsolete(Response, TS, Now, Maxmins) of + false -> ok; + true -> + query_info(RServer, LServiceS, Response) + end, + Response; + _ -> + if + Now - TS > ?MAXTIME_CACHE_NEGOTIATING -> + query_info(RServer, LServiceS, not_supported), + not_supported; + true -> + Response + end + end, + case Response2 of + not_supported -> route_single; + {multicast_supported, Service, Limits} -> + {route_multicast, Service, Limits} + end end. +query_info(RServer, LServiceS, Response) -> + Random = randoms:get_string(), + ID = <>, + send_query_info(RServer, LServiceS, ID), + add_response(RServer, Response, {wait_for_info, ID}). + look_server(RServer) -> case mnesia:dirty_read(multicastc, RServer) of [] -> not_cached; @@ -935,44 +936,6 @@ purge_loop(NM) -> try_stop -> purge_loop_finished end. -%%%------------------------- -%%% Pool -%%%------------------------- - -create_pool() -> - catch - begin - ets:new(multicastp, - [duplicate_bag, public, named_table, {keypos, 2}]), - ets:give_away(multicastp, whereis(ejabberd), ok) - end. - -add_waiter(Waiter) -> - true = ets:insert(multicastp, Waiter). - -delo_waiter(Waiter) -> - true = ets:delete_object(multicastp, Waiter). - --spec search_waiter(binary(), binary(), info | items) -> - {found_waiter, #waiter{}} | waiter_not_found. - -search_waiter(JID, LServiceS, Type) -> - Rs = ets:foldl(fun (W, Res) -> - {JIDs, LServiceS1, Type1} = W#waiter.awaiting, - case lists:member(JID, JIDs) and - (LServiceS == LServiceS1) - and (Type1 == Type) - of - true -> Res ++ [W]; - false -> Res - end - end, - [], multicastp), - case Rs of - [R | _] -> {found_waiter, R}; - [] -> waiter_not_found - end. - %%%------------------------- %%% Limits: utils %%%------------------------- @@ -1006,11 +969,13 @@ get_from_limitopts(LimitOpts, SenderT) -> build_remote_limit_record(LimitOpts, SenderT) -> build_limit_record(LimitOpts, SenderT). +-spec build_limit_record(any(), local | remote) -> #limits{}. build_limit_record(LimitOpts, SenderT) -> Limits = [get_limit_value(Name, Default, LimitOpts) || {Name, Default} <- list_of_limits(SenderT)], list_to_tuple([limits | Limits]). +-spec get_limit_value(atom(), integer(), any()) -> limit_value(). get_limit_value(Name, Default, LimitOpts) -> case lists:keysearch(Name, 1, LimitOpts) of {value, {Name, Number}} -> {custom, Number}; @@ -1019,11 +984,13 @@ get_limit_value(Name, Default, LimitOpts) -> type_of_stanza(Stanza) -> element(1, Stanza). +-spec get_limit_number(message | presence, #limits{}) -> limit_value(). get_limit_number(message, Limits) -> Limits#limits.message; get_limit_number(presence, Limits) -> Limits#limits.presence. +-spec get_slimit_group(local | remote, #service_limits{}) -> #limits{}. get_slimit_group(local, SLimits) -> SLimits#service_limits.local; get_slimit_group(remote, SLimits) ->