diff --git a/src/mod_multicast.erl b/src/mod_multicast.erl new file mode 100644 index 000000000..b4471b269 --- /dev/null +++ b/src/mod_multicast.erl @@ -0,0 +1,1262 @@ +%%%%---------------------------------------------------------------------- +%%% File : mod_multicast.erl +%%% Author : Badlop +%%% Purpose : Extended Stanza Addressing (XEP-0033) support +%%% Created : 29 May 2007 by Badlop +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2010 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License +%%% along with this program; if not, write to the Free Software +%%% Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA +%%% 02111-1307 USA +%%% +%%%---------------------------------------------------------------------- + +%%%================================== +%%%% Headers + +-module(mod_multicast). +-author('badlop@process-one.net'). + +-behaviour(gen_server). +-behaviour(gen_mod). + +%% API +-export([start_link/2, start/2, stop/1]). + +%% gen_server callbacks +-export([init/1, + handle_info/2, + handle_call/3, + handle_cast/2, + terminate/2, + code_change/3 + ]). + +-export([ + purge_loop/1 + ]). + +-include_lib("exmpp/include/exmpp.hrl"). + +-include("ejabberd.hrl"). +-include("jlib.hrl"). + +%% Copied from mod_muc_room.erl +-define(ERR(Packet,Type, Lang, ErrText), + exmpp_stanza:error(Packet#xmlel.ns, + Type, + {Lang, translate:translate(Lang, ErrText)})). + +-record(state, {lserver, lservice, access, service_limits}). + +-record(multicastc, {rserver, response, ts}). +%% ts: timestamp (in seconds) when the cache item was last updated + +-record(dest, {jid_string, jid_jid, type, full_xml}). +%% jid_string = string() +%% jid_jid = jid() +%% full_xml = xml() + +-record(group, {server, dests, multicast, others, addresses}). +%% server = string() +%% dests = [string()] +%% multicast = {cached, local_server} | {cached, string()} | {cached, not_supported} | {obsolete, not_supported} | {obsolete, string()} | not_cached +%% after being updated, possible values are: local | multicast_not_supported | {multicast_supported, string(), limits()} +%% others = [xml()] +%% packet = xml() + +-record(waiter, {awaiting, group, renewal=false, sender, packet, aattrs, addresses}). +%% awaiting = {[Remote_service], Local_service, Type_awaiting} +%% Remote_service = Local_service = string() +%% Type_awaiting = info | items +%% group = #group +%% renewal = true | false +%% sender = From +%% packet = xml() +%% aattrs = [xml()] + +-record(limits, {message, presence}). +%% message = presence = integer() | infinite + +-record(service_limits, {local, remote}). +%% local = remote = limits() + +%% All the elements are of type value() + +-define(VERSION_MULTICAST, ?VERSION). +-define(PROCNAME, ejabberd_mod_multicast). + +-define(PURGE_PROCNAME, ejabberd_mod_multicast_purgeloop). + +%% Time in seconds +-define(MAXTIME_CACHE_POSITIVE, 86400). +-define(MAXTIME_CACHE_NEGATIVE, 86400). + +%% Time in miliseconds +-define(CACHE_PURGE_TIMER, 86400000). % Purge the cache every 24 hours +-define(DISCO_QUERY_TIMEOUT, 10000). % After 10 seconds of delay the server is declared dead + +-define(DEFAULT_LIMIT_LOCAL_MESSAGE, 100). +-define(DEFAULT_LIMIT_LOCAL_PRESENCE, 100). +-define(DEFAULT_LIMIT_REMOTE_MESSAGE, 20). +-define(DEFAULT_LIMIT_REMOTE_PRESENCE,20). + + +%%%==================================================================== +%%%% API +%%-------------------------------------------------------------------- +%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} +%% Description: Starts the server +%%-------------------------------------------------------------------- +start_link(LServerS, Opts) -> + Proc = gen_mod:get_module_proc(LServerS, ?PROCNAME), + gen_server:start_link({local, Proc}, ?MODULE, [LServerS, Opts], []). + +start(LServerS, Opts) -> + Proc = gen_mod:get_module_proc(LServerS, ?PROCNAME), + ChildSpec = { + Proc, + {?MODULE, start_link, [LServerS, Opts]}, + temporary, + 1000, + worker, + [?MODULE]}, + supervisor:start_child(ejabberd_sup, ChildSpec). + +stop(LServerS) -> + Proc = gen_mod:get_module_proc(LServerS, ?PROCNAME), + gen_server:call(Proc, stop), + supervisor:terminate_child(ejabberd_sup, Proc), + supervisor:delete_child(ejabberd_sup, Proc). + +%%%==================================================================== +%%%% gen_server callbacks + +%%-------------------------------------------------------------------- +%% Function: init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%-------------------------------------------------------------------- +init([LServerS, Opts]) -> + LServiceS = gen_mod:get_opt(host, Opts, "multicast." ++ LServerS), + Access = gen_mod:get_opt(access, Opts, all), + SLimits = build_service_limit_record(gen_mod:get_opt(limits, Opts, [])), + create_cache(), + try_start_loop(), + create_pool(), + ejabberd_router_multicast:register_route(LServerS), + ejabberd_router:register_route(LServiceS), + {ok, #state{lservice = LServiceS, + lserver = LServerS, + access = Access, + service_limits = SLimits}}. + +%%-------------------------------------------------------------------- +%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% Description: Handling call messages +%%-------------------------------------------------------------------- +handle_call(stop, _From, State) -> + try_stop_loop(), + {stop, normal, ok, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling cast messages +%%-------------------------------------------------------------------- +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling all non call/cast messages +%%-------------------------------------------------------------------- + +handle_info({route, From, To, #xmlel{name=iq} = Packet}, State) -> + IQ = exmpp_iq:xmlel_to_iq(Packet), + case catch process_iq(From, IQ, State) of + Result when is_record(Result, iq) -> + ejabberd_router:route(To, From, exmpp_iq:iq_to_xmlel(Result)); + {'EXIT', Reason} -> + ?ERROR_MSG("Error when processing IQ stanza: ~p", [Reason]), + Err = exmpp_stanza:reply_with_error(Packet, 'internal-server-error'), + ejabberd_router:route(To, From, Err); + reply -> + LServiceS = jts(To), + case exmpp_xml:get_attribute_as_list(Packet, type, "error") of + "result" -> process_iqreply_result(From, LServiceS, Packet, State); + "error" -> process_iqreply_error(From, LServiceS, Packet) + end + end, + {noreply, State}; + +%% XEP33 allows only 'message' and 'presence' stanza type +handle_info({route, From, To, #xmlel{name = Stanza_type} = Packet}, + #state{lservice = LServiceS, + lserver = LServerS, + access = Access, + service_limits = SLimits} = State) + when (Stanza_type == message) or (Stanza_type == presence) -> + %%io:format("Multicast packet: ~nFrom: ~p~nTo: ~p~nPacket: ~p~n", [From, To, Packet]), + route_untrusted(LServiceS, LServerS, Access, SLimits, From, To, Packet), + {noreply, State}; + +%% Handle multicast packets sent by trusted local services +handle_info({route_trusted, From, Destinations, Packet}, + #state{lservice = LServiceS, + lserver = LServerS} = State) -> + %%io:format("Multicast packet2: ~nFrom: ~p~nDestinations: ~p~nPacket: ~p~n", [From, Destinations, Packet]), + route_trusted(LServiceS, LServerS, From, Destinations, Packet), + {noreply, State}; + +handle_info({get_host, Pid}, State) -> + Pid ! {my_host, State#state.lservice}, + {noreply, State}; + +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> void() +%% Description: This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%%-------------------------------------------------------------------- +terminate(_Reason, State) -> + ejabberd_router_multicast:unregister_route(State#state.lserver), + ejabberd_router:unregister_route(State#state.lservice), + ok. + +%%-------------------------------------------------------------------- +%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} +%% Description: Convert process state when code is changed +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%==================================================================== +%% Internal functions +%%==================================================================== + +%%%================================== +%%%% IQ Request Processing + +%% disco#info request +process_iq(From, #iq{type = get, ns = ?NS_DISCO_INFO, lang = Lang} = IQ, State) -> + Response = #xmlel{ns = ?NS_DISCO_INFO, name = 'query', + children = iq_disco_info(From, Lang, State)}, + exmpp_iq:result(IQ, Response); + +%% disco#items request +process_iq(_, #iq{type = get, ns = ?NS_DISCO_ITEMS} = IQ, _) -> + Response = #xmlel{ns = ?NS_DISCO_ITEMS, name = 'query'}, + exmpp_iq:result(IQ, Response); + +%% vCard request +process_iq(_, #iq{type = get, ns = ?NS_VCARD, lang = Lang} = IQ, _) -> + Response = #xmlel{ns = ?NS_VCARD, name = 'vCard', children = iq_vcard(Lang)}, + exmpp_iq:result(IQ, Response); + +%% version request +process_iq(_, #iq{type = get, ns = ?NS_SOFT_VERSION} = IQ, _) -> + Response = #xmlel{ns = ?NS_SOFT_VERSION, name = 'query', + children = iq_version()}, + exmpp_iq:result(IQ, Response); + +%% Unknown "set" or "get" request +process_iq(_, #iq{type=Type} = IQ, _) when Type==get; Type==set -> + exmpp_iq:error(IQ, 'service-unavailable'); + +%% IQ "result" or "error". +process_iq(_, #iq{type=Type}, _) when Type==result; Type==error -> + reply; + +%% Other IQ +process_iq(_, _IQ, _) -> + unknown_iq. + +-define(FEATURE(Feat), #xmlel{name = feature, attrs = [#xmlattr{name = var, value = Feat}]}). + +iq_disco_info(From, Lang, State) -> + [#xmlel{ns = ?NS_DISCO_INFO, name = 'identity', + attrs = [?XMLATTR('category', <<"service">>), + ?XMLATTR('type', <<"multicast">>), + ?XMLATTR('name', translate:translate(Lang, + "Multicast"))]}, + ?FEATURE(?NS_DISCO_INFO_b), + ?FEATURE(?NS_DISCO_ITEMS_b), + ?FEATURE(?NS_VCARD_b), + ?FEATURE(?NS_ADDRESS_b)] ++ + iq_disco_info_extras(From, State). + +iq_vcard(Lang) -> + [#xmlel{ns = ?NS_VCARD, name = 'FN', children = [#xmlcdata{cdata = <<"ejabberd/mod_multicast">>}]}, + #xmlel{ns = ?NS_VCARD, name = 'URL', children = [#xmlcdata{cdata = list_to_binary(?EJABBERD_URI)}]}, + #xmlel{ns = ?NS_VCARD, name ='DESC', children = [#xmlcdata{cdata = list_to_binary( + translate:translate(Lang, "ejabberd Multicast service") ++ + "\nCopyright (c) 2003-2010 ProcessOne")}]} + ]. + +iq_version() -> + [#xmlel{ns = ?NS_VCARD, name = 'name', children = [#xmlcdata{cdata = <<"mod_multicast">>}]}, + #xmlel{ns = ?NS_VCARD, name = 'version', children = [#xmlcdata{cdata = list_to_binary(?VERSION_MULTICAST)}]} + ]. + +%%%================================== +%%%% Route + +%% Destinations = [string()] +route_trusted(LServiceS, LServerS, FromJID, Destinations, Packet) -> + + %% Strip 'addresses' from packet + Packet_stripped = Packet, + AAttrs = [], %{"xmlns", ?NS_ADDRESS}], + Delivereds = [], + + Dests2 = lists:map( + fun(D) -> + DS = jts(D), + XML = #xmlel{name = address, + ns = ?NS_ADDRESS, + attrs = [#xmlattr{name = type, value = <<"bcc">>}, + #xmlattr{name = jid, value = list_to_binary(DS)}] }, + #dest{jid_string = DS, + jid_jid = D, + type = "bcc", + full_xml = XML} + end, + Destinations), + + %% Group Not_delivered by server + Groups = group_dests(Dests2), + + route_common(LServerS, LServiceS, FromJID, Groups, Delivereds, Packet_stripped, AAttrs). + +route_untrusted(LServiceS, LServerS, Access, SLimits, From, To, Packet) -> + try route_untrusted2(LServiceS, LServerS, Access, SLimits, From, Packet) + catch + throw:adenied -> route_error(To, From, Packet, forbidden, "Access denied by service policy"); + throw:eadsele -> route_error(To, From, Packet, 'bad-request', "No addresses element found"); + throw:eadeles -> route_error(To, From, Packet, 'bad-request', "No address elements found"); + throw:ewxmlns -> route_error(To, From, Packet, 'bad-request', "Wrong xmlns"); + throw:etoorec -> route_error(To, From, Packet, 'not-acceptable', "Too many receiver fields were specified"); + throw:edrelay -> route_error(To, From, Packet, forbidden, "Packet relay is denied by service policy"); + EType:EReason -> + ?ERROR_MSG("Multicast unknown error: Type: ~p, Reason: ~p~nStacktace: ~p", [EType, EReason, erlang:get_stacktrace()]), + route_error(To, From, Packet, 'internal-server-error', "Unknown problem") + end. + +route_untrusted2(LServiceS, LServerS, Access, SLimits, FromJID, Packet) -> + ok = check_access(LServerS, Access, FromJID), + + %% Strip 'addresses' from packet + {ok, Packet_stripped, AAttrs, Addresses} = strip_addresses_element(Packet), + + %% Split Addresses in To_deliver and Delivered + {To_deliver, Delivereds} = split_addresses_todeliver(Addresses), + + %% Convert XML to record + Dests = convert_dest_record(To_deliver), + + %% Split the destinations by JID + {Dests2, Not_jids} = split_dests_jid(Dests), + report_not_jid(FromJID, Packet, Not_jids), + + %% Check limit + ok = check_limit_dests(SLimits, FromJID, Packet, Dests2), + + %% Group Not_delivered by server + Groups = group_dests(Dests2), + + %% Check relay for each Group + ok = check_relay(exmpp_jid:domain_as_list(FromJID), LServerS, Groups), + + route_common(LServerS, LServiceS, FromJID, Groups, Delivereds, Packet_stripped, AAttrs). + +route_common(LServerS, LServiceS, FromJID, Groups, Delivereds, Packet_stripped, AAttrs) -> + %% Gather multicast service for each Group + Groups2 = look_cached_servers(LServerS, Groups), + + %% Create Delivered XML element for each Group + Groups3 = build_others_xml(Groups2), + + %% Add preliminary packet for each group + Groups4 = add_addresses(Delivereds, Groups3), + + %% Decide action for each Group + AGroups = decide_action_groups(Groups4), + + act_groups(FromJID, Packet_stripped, AAttrs, LServiceS, AGroups). + +act_groups(FromJID, Packet_stripped, AAttrs, LServiceS, AGroups) -> + [perform(FromJID, Packet_stripped, AAttrs, LServiceS, AGroup) || AGroup <- AGroups]. + +perform(From, Packet, AAttrs, _, {route_single, Group}) -> + [route_packet(From, ToUser, Packet, AAttrs, Group#group.addresses) || ToUser <- Group#group.dests]; + +perform(From, Packet, AAttrs, _, {{route_multicast, JID, RLimits}, Group}) -> + route_packet_multicast(From, JID, Packet, AAttrs, Group#group.dests, Group#group.addresses, RLimits); + +perform(From, Packet, AAttrs, LServiceS, {{ask, Old_service, renewal}, Group}) -> + send_query_info(Old_service, LServiceS), + add_waiter(#waiter{awaiting = {[Old_service], LServiceS, info}, + group = Group, + renewal = true, + sender = From, + packet = Packet, + aattrs = AAttrs, + addresses = Group#group.addresses + }); + +perform(From, Packet, AAttrs, LServiceS, {{ask, Server, not_renewal}, Group}) -> + send_query_info(Server, LServiceS), + add_waiter(#waiter{awaiting = {[Server], LServiceS, info}, + group = Group, + renewal = false, + sender = From, + packet = Packet, + aattrs = AAttrs, + addresses = Group#group.addresses + }). + +%%%================================== +%%%% Check access permission + +check_access(LServerS, Access, From) -> + case acl:match_rule(LServerS, Access, From) of + allow -> + ok; + _ -> + throw(adenied) + end. + +%%%================================== +%%%% Strip 'addresses' XML element + +strip_addresses_element(Packet) -> + case exmpp_xml:get_element(Packet, addresses) of + undefined -> throw(eadsele); + AddrEl -> + case exmpp_xml:get_ns_as_atom(AddrEl) of + ?NS_ADDRESS -> + Packet_stripped = exmpp_xml:remove_elements(Packet, addresses), + {ok, Packet_stripped, AddrEl#xmlel.attrs, exmpp_xml:get_child_elements(AddrEl)}; + _ -> throw(ewxmlns) + end + end. + +%%%================================== +%%%% Split Addresses + +%% @spec (Addresses) -> {To_deliver, Delivereds} +%% @doc Split the addresses list into two lists: the ones to deliver, and the ones already delivered. +split_addresses_todeliver(Addresses) -> + lists:partition( + fun(XML) -> + case XML of + #xmlel{name = address} = Packet -> + case exmpp_xml:get_attribute_as_binary(Packet, "delivered", no_delivered) of + <<"true">> -> false; + _ -> + Type = exmpp_xml:get_attribute_as_binary(Packet, "type", no_type), + case Type of + <<"to">> -> true; + <<"cc">> -> true; + <<"bcc">> -> true; + _ -> false + end + end; + _ -> false + end + end, + Addresses). + +%%%================================== +%%%% Check does not exceed limit of destinations + +check_limit_dests(SLimits, FromJID, Packet, Addresses) -> + SenderT = sender_type(FromJID), + Limits = get_slimit_group(SenderT, SLimits), + Type_of_stanza = Packet#xmlel.name, + {_Type, Limit_number} = get_limit_number(Type_of_stanza, Limits), + case length(Addresses) > Limit_number of + false -> + ok; + true -> + throw(etoorec) + end. + +%%%================================== +%%%% Convert Destination XML to record + +convert_dest_record(XMLs) -> + lists:map( + fun(XML) -> + case exmpp_xml:get_attribute_as_list(XML, jid, "") of + [] -> + #dest{jid_string = none, full_xml = XML}; + JIDS -> + Type = exmpp_xml:get_attribute_as_list(XML, type, ""), + JIDJ = stj(JIDS), + #dest{jid_string = JIDS, + jid_jid = JIDJ, + type = Type, + full_xml = XML} + end + end, + XMLs). + +%%%================================== +%%%% Split destinations by existence of JID and send error messages for other dests + +split_dests_jid(Dests) -> + lists:partition( + fun(Dest) -> + case Dest#dest.jid_string of + none -> false; + _ -> true + end + end, + Dests). + +%% Sends an error message for each unknown address +%% Currently only 'jid' addresses are acceptable on ejabberd +report_not_jid(From, Packet, Dests) -> + Dests2 = [exmpp_xml:document_to_list(Dest#dest.full_xml) || Dest <- Dests], + [route_error(From, From, Packet, 'jid-malformed', + "This service can not process the address: " ++ D) + || D <- Dests2]. + +%%%================================== +%%%% Group destinations by their servers + +group_dests(Dests) -> + D = lists:foldl( + fun(Dest, Dict) -> + ServerS = exmpp_jid:domain_as_list(Dest#dest.jid_jid), + dict:append(ServerS, Dest, Dict) + end, + dict:new(), + Dests), + Keys = dict:fetch_keys(D), + [ #group{server = Key, dests = dict:fetch(Key, D)} || Key <- Keys ]. + +%%%================================== +%%%% Look for cached responses + +look_cached_servers(LServerS, Groups) -> + [look_cached(LServerS, Group) || Group <- Groups]. + +look_cached(LServerS, G) -> + Maxtime_positive = ?MAXTIME_CACHE_POSITIVE, + Maxtime_negative = ?MAXTIME_CACHE_NEGATIVE, + + Cached_response = + search_server_on_cache(G#group.server, LServerS, + {Maxtime_positive, Maxtime_negative}), + G#group{multicast = Cached_response}. + +%%%================================== +%%%% Build delivered XML element + +build_others_xml(Groups) -> + [Group#group{others = build_other_xml(Group#group.dests)} || Group <- Groups]. + +%% Add delivered=true +%% and remove addresses which type == bcc +build_other_xml(Dests) -> + lists:foldl( + fun(Dest, R) -> + XML = Dest#dest.full_xml, + case Dest#dest.type of + "to" -> [add_delivered(XML) | R]; + "cc" -> [add_delivered(XML) | R]; + "bcc" -> R; + _ -> [XML | R] + end + end, + [], + Dests). + +add_delivered(Stanza) -> + exmpp_xml:set_attribute(Stanza, delivered, 'true'). + +%%%================================== +%%%% Add preliminary packets + +add_addresses(Delivereds, Groups) -> + Ps = [Group#group.others || Group <- Groups], + add_addresses2(Delivereds, Groups, [], [], Ps). + +add_addresses2(_, [], Res, _, []) -> + Res; + +add_addresses2(Delivereds, [Group | Groups], Res, Pa, [Pi | Pz]) -> + Addresses = lists:append([Delivereds, Pi] ++ Pa ++ Pz), + Group2 = Group#group{addresses = Addresses}, + add_addresses2(Delivereds, Groups, [Group2 | Res], [Pi | Pa], Pz). + +%%%================================== +%%%% Decide action groups + +decide_action_groups(Groups) -> + [{decide_action_group(Group), Group} || Group <- Groups]. + +decide_action_group(Group) -> + Server = Group#group.server, + case Group#group.multicast of + + {cached, local_server} -> + %% Send a copy of the packet to each local user on Dests + route_single; + + {cached, not_supported} -> + %% Send a copy of the packet to each remote user on Dests + route_single; + + {cached, {multicast_supported, JID, RLimits}} -> + %% XEP33 is supported by the server, thanks to this service + {route_multicast, JID, RLimits}; + + {obsolete, {multicast_supported, Old_service, _RLimits}} -> + {ask, Old_service, renewal}; + + {obsolete, not_supported} -> + {ask, Server, not_renewal}; + + not_cached -> + {ask, Server, not_renewal} + + end. + +%%%================================== +%%%% Route packet + +%% Build and send packet to this group of destinations +%% From = jid() +%% ToS = string() +route_packet(From, ToDest, Packet, AAttrs, Addresses) -> + Dests = case ToDest#dest.type of + "bcc" -> []; + _ -> [] + end, + route_packet2(From, ToDest#dest.jid_string, Dests, Packet, AAttrs, Addresses). + +route_packet_multicast(From, ToS, Packet, AAttrs, Dests, Addresses, Limits) -> + Type_of_stanza = Packet#xmlel.name, + {_Type, Limit_number} = get_limit_number(Type_of_stanza, Limits), + Fragmented_dests = fragment_dests(Dests, Limit_number), + [route_packet2(From, ToS, DFragment, Packet, AAttrs, Addresses) || DFragment <- Fragmented_dests]. + +route_packet2(From, ToS, Dests, Packet, AAttrs, Addresses) -> + Packet2 = case append_dests(Dests, Addresses) of + [] -> + Packet; + ACs -> + Children = [#xmlel{name = addresses, ns = ?NS_ADDRESS, attrs = AAttrs, children = ACs}], + exmpp_xml:append_children(Packet, Children) + end, + ToJID = stj(ToS), + ejabberd_router:route(From, ToJID, Packet2). + +append_dests([], Addresses) -> + Addresses; +append_dests([Dest | Dests], Addresses) -> + append_dests(Dests, [Dest#dest.full_xml | Addresses]). + +%%%================================== +%%%% Check relay + +check_relay(RS, LS, Gs) -> + case check_relay_required(RS, LS, Gs) of + false -> ok; + true -> throw(edrelay) + end. + +%% If the sender is external, and at least one destination is external, +%% then this package requires relaying +check_relay_required(RServer, LServerS, Groups) -> + case string:str(RServer, LServerS) > 0 of + true -> false; + false -> check_relay_required(LServerS, Groups) + end. + +check_relay_required(LServerS, Groups) -> + lists:any( + fun(Group) -> + Group#group.server /= LServerS + end, + Groups). + +%%%================================== +%%%% Check protocol support: Send request + +%% Ask the server if it supports XEP33 +send_query_info(RServerS, LServiceS) -> + %% Don't ask a service which JID is "echo.*", + case string:str(RServerS, "echo.") of + 1 -> false; + _ -> send_query(RServerS, LServiceS, ?NS_DISCO_INFO) + end. + +send_query_items(RServerS, LServiceS) -> + send_query(RServerS, LServiceS, ?NS_DISCO_ITEMS). + +send_query(RServerS, LServiceS, XMLNS) -> + Request = #xmlel{ns = XMLNS, name = 'query'}, + Packet = exmpp_stanza:set_recipient( + exmpp_iq:get(?NS_JABBER_CLIENT, Request), + RServerS), + ejabberd_router:route(stj(LServiceS), stj(RServerS), Packet). + +%%%================================== +%%%% Check protocol support: Receive response: Error + +process_iqreply_error(From, LServiceS, _Packet) -> + %% We don't need to change the TO attribute in the outgoing XMPP packet, + %% since ejabberd will do it + + %% We do not change the FROM attribute in the outgoing XMPP packet, + %% this way the user will know what server reported the error + + FromS = jts(From), + case search_waiter(FromS, LServiceS, info) of + {found_waiter, Waiter} -> + received_awaiter(FromS, Waiter, LServiceS); + _ -> ok + end. + +%%%================================== +%%%% Check protocol support: Receive response: Disco + +process_iqreply_result(From, LServiceS, Packet, State) -> + QueryEl = exmpp_xml:get_element(Packet, 'query'), + Els2 = exmpp_xml:get_child_elements(QueryEl), + case exmpp_xml:get_ns_as_atom(QueryEl) of + ?NS_DISCO_INFO -> + process_discoinfo_result(From, LServiceS, Els2, State); + ?NS_DISCO_ITEMS -> + process_discoitems_result(From, LServiceS, Els2) + end. + +%%%================================== +%%%% Check protocol support: Receive response: Disco Info + +process_discoinfo_result(From, LServiceS, Els, _State) -> + FromS = jts(From), + case search_waiter(FromS, LServiceS, info) of + {found_waiter, Waiter} -> + process_discoinfo_result2(From, FromS, LServiceS, Els, Waiter); + _ -> + ok + end. + +process_discoinfo_result2(From, FromS, LServiceS, Els, Waiter) -> + %% Check the response, to see if it includes the XEP33 feature. If support == + Multicast_support = + lists:any( + fun(XML) -> + case XML of + #xmlel{name = feature, attrs = Attrs} -> + ?NS_ADDRESS_b == exmpp_xml:get_attribute_from_list_as_binary(Attrs, var, ""); + _ -> false + end + end, + Els), + + Group = Waiter#waiter.group, + RServer = Group#group.server, + + case Multicast_support of + true -> + %% Inspect the XML of the disco#info response to get the limits of the remote service + SenderT = sender_type(From), + RLimits = get_limits_xml(Els, SenderT), + + %% Store this response on cache + add_response(RServer, {multicast_supported, FromS, RLimits}), + + %% Send XEP33 packet to JID + FromM = Waiter#waiter.sender, + DestsM = Group#group.dests, + PacketM = Waiter#waiter.packet, + AAttrsM = Waiter#waiter.aattrs, + AddressesM = Waiter#waiter.addresses, + RServiceM = FromS, + route_packet_multicast(FromM, RServiceM, PacketM, AAttrsM, DestsM, AddressesM, RLimits), + + %% Remove from Pool + delo_waiter(Waiter); + + false -> + %% So we now know that JID does not support XEP33 + case FromS of + + RServer -> + %% We asked the server, now let's see if any component supports it: + + %% Send disco#items query to JID + send_query_items(FromS, LServiceS), + + %% Store on Pool + delo_waiter(Waiter), + add_waiter(Waiter#waiter{ + awaiting = {[FromS], LServiceS, items}, + renewal = false + }); + + %% We asked a component, and it does not support XEP33 + _ -> + received_awaiter(FromS, Waiter, LServiceS) + + end + end. + +get_limits_xml(Els, SenderT) -> + %% Get limits reported by the remote service + LimitOpts = get_limits_els(Els), + + %% Build the final list of limits + %% For the ones not reported, put default numbers + build_remote_limit_record(LimitOpts, SenderT). + +%% Look for disco#info extras which may report limits +get_limits_els(Els) -> + lists:foldl( + fun(XML, R) -> + case XML of + #xmlel{name = x, attrs = Attrs, children = SubEls} -> + case (?NS_DATA_FORMS_b == exmpp_xml:get_attribute_from_list_as_binary(Attrs, xmlns, "")) and + (<<"result">> == exmpp_xml:get_attribute_from_list_as_binary(Attrs, type, "")) of + true -> get_limits_fields(SubEls) ++ R; + false -> R + end; + _ -> R + end + end, + [], + Els + ). + +get_limits_fields(Fields) -> + {Head, Tail} = lists:partition( + fun(Field) -> + case Field of + #xmlel{name = field, attrs = Attrs} -> + (<<"FORM_TYPE">> == exmpp_xml:get_attribute_from_list_as_binary(Attrs, var, "")) + and (<<"hidden">> == exmpp_xml:get_attribute_from_list_as_binary(Attrs, type, "")); + _ -> false + end + end, + Fields + ), + case Head of + [] -> []; + _ -> get_limits_values(Tail) + end. + +get_limits_values(Values) -> + lists:foldl( + fun(Value, R) -> + case Value of + #xmlel{name = field, attrs = Attrs, children = SubEls} -> + %% TODO: Only one subel is expected here, but there may be several + #xmlel{children = SubElsV} = exmpp_xml:get_element(SubEls, value), + Number = exmpp_xml:get_cdata_from_list_as_list(SubElsV), + Name = exmpp_xml:get_attribute_from_list_as_list(Attrs, var, ""), + [{list_to_atom(Name), list_to_integer(Number)} | R]; + _ -> R + end + end, + [], + Values + ). + +%%%================================== +%%%% Check protocol support: Receive response: Disco Items + +process_discoitems_result(From, LServiceS, Els) -> + %% Convert list of xmlel into list of strings + List = lists:foldl( + fun(XML, Res) -> + %% For each one, if it's "item", look for jid + case XML of + #xmlel{name = item, attrs = Attrs} -> + Res ++ [exmpp_xml:get_attribute_from_list_as_list(Attrs, jid, "")]; + _ -> Res + end + end, + [], + Els), + + %% Send disco#info queries to each item + [send_query_info(Item, LServiceS) || Item <- List], + + %% Search who was awaiting a disco#items response from this JID + FromS = jts(From), + {found_waiter, Waiter} = search_waiter(FromS, LServiceS, items), + + case List of + [] -> + %% The server doesn't support XEP33, and it has no items + received_awaiter(FromS, Waiter, LServiceS); + _ -> + delo_waiter(Waiter), + add_waiter(Waiter#waiter{ + awaiting = {List, LServiceS, info}, + renewal = false + }) + end. + +%%%================================== +%%%% Check protocol support: Receive response: Received awaiter + +received_awaiter(JID, Waiter, LServiceS) -> + {JIDs, LServiceS, _WaiterType} = Waiter#waiter.awaiting, + delo_waiter(Waiter), + Group = Waiter#waiter.group, + RServer = Group#group.server, + + %% Remove this awaiter from the list of awaiting JIDs. + case lists:delete(JID, JIDs) of + + [] -> + %% We couldn't find any service in this server that supports XEP33 + case Waiter#waiter.renewal of + + false -> + %% Store on cache the response + add_response(RServer, not_supported), + + %% Send a copy of the packet to each remote user on Dests + From = Waiter#waiter.sender, + Packet = Waiter#waiter.packet, + AAttrs = Waiter#waiter.aattrs, + Addresses = Waiter#waiter.addresses, + [route_packet(From, ToUser, Packet, AAttrs, Addresses) + || ToUser <- Group#group.dests]; + + true -> + %% We asked this component because the cache + %% said it would support XEP33, but it doesn't! + send_query_info(RServer, LServiceS), + add_waiter(Waiter#waiter{ + awaiting = {[RServer], LServiceS, info}, + renewal = false + }) + end; + + JIDs2 -> + %% Maybe other component on the server supports XEP33 + add_waiter(Waiter#waiter{ + awaiting = {JIDs2, LServiceS, info}, + renewal = false + }) + end. + +%%%================================== +%%%% Cache + +create_cache() -> + mnesia:create_table(multicastc, [{ram_copies, [node()]}, + {attributes, record_info(fields, multicastc)}]). + +%% Add this response to the cache. +%% If a previous response still exists, it's overwritten +add_response(RServer, Response) -> + Secs = calendar:datetime_to_gregorian_seconds(calendar:now_to_datetime(now())), + mnesia:dirty_write(#multicastc{rserver = RServer, + response = Response, + ts = Secs}). + +%% Search on the cache if there is a response for the server +%% If there is a response but is obsolete, +%% don't bother removing since it will later be overwritten anyway +search_server_on_cache(RServer, LServerS, _Maxmins) + when RServer == LServerS -> + {cached, local_server}; + +search_server_on_cache(RServer, _LServerS, Maxmins) -> + case look_server(RServer) of + not_cached -> + not_cached; + {cached, Response, Ts} -> + Now = calendar:datetime_to_gregorian_seconds(calendar:now_to_datetime(now())), + case is_obsolete(Response, Ts, Now, Maxmins) of + false -> {cached, Response}; + true -> {obsolete, Response} + end + end. + +look_server(RServer) -> + case mnesia:dirty_read(multicastc, RServer) of + [] -> not_cached; + [M] -> {cached, M#multicastc.response, M#multicastc.ts} + end. + +is_obsolete(Response, Ts, Now, {Max_pos, Max_neg}) -> + Max = case Response of + multicast_not_supported -> Max_neg; + _ -> Max_pos + end, + (Now - Ts) > Max. + +%%%================================== +%%%% Purge cache + +purge() -> + Maxmins_positive = ?MAXTIME_CACHE_POSITIVE, + Maxmins_negative = ?MAXTIME_CACHE_NEGATIVE, + Now = calendar:datetime_to_gregorian_seconds(calendar:now_to_datetime(now())), + purge(Now, {Maxmins_positive, Maxmins_negative}). + +purge(Now, Maxmins) -> + F = fun() -> + mnesia:foldl( + fun(R, _) -> + #multicastc{response = Response, ts = Ts} = R, + %% If this record is obsolete, delete it + case is_obsolete(Response, Ts, Now, Maxmins) of + true -> mnesia:delete_object(R); + false -> ok + end + end, + none, + multicastc) + end, + mnesia:transaction(F). + +%%%================================== +%%%% Purge cache loop + +try_start_loop() -> + case lists:member(?PURGE_PROCNAME, registered()) of + true -> ok; + false -> start_loop() + end, + ?PURGE_PROCNAME ! new_module. + +start_loop() -> + register(?PURGE_PROCNAME, spawn(?MODULE, purge_loop, [0])), + ?PURGE_PROCNAME ! purge_now. + +try_stop_loop() -> + ?PURGE_PROCNAME ! try_stop. + +%% NM = number of modules are running on this node +purge_loop(NM) -> + receive + purge_now -> + purge(), + timer:send_after(?CACHE_PURGE_TIMER, ?PURGE_PROCNAME, purge_now), + purge_loop(NM); + new_module -> + purge_loop(NM + 1); + try_stop when NM > 1 -> + purge_loop(NM - 1); + try_stop -> + purge_loop_finished + end. + +%%%================================== +%%%% Pool + +create_pool() -> + catch ets:new(multicastp, [duplicate_bag, public, named_table, {keypos, 2}]). + +%% If a Waiter with the same key exists, it overwrites it +add_waiter(Waiter) -> + true = ets:insert(multicastp, Waiter). + +delo_waiter(Waiter) -> + true = ets:delete_object(multicastp, Waiter). + +%% Search on the Pool who is waiting for this result +%% If there are several matches, pick the first one only +search_waiter(JID, LServiceS, Type) -> + Rs = ets:foldl( + fun(W, Res) -> + {JIDs, LServiceS1, Type1} = W#waiter.awaiting, + case lists:member(JID, JIDs) + and (LServiceS == LServiceS1) + and (Type1 == Type) of + true -> Res ++ [W]; + false -> Res + end + end, + [], + multicastp + ), + case Rs of + [R | _] -> {found_waiter, R}; + [] -> waiter_not_found + end. + +%%%================================== +%%%% Limits: utils + +%% Type definitions for data structures related with XEP33 limits +%% limit() = {Name, Value} +%% Name = atom() +%% Value = {Type, Number} +%% Type = default | custom +%% Number = integer() | infinite + +list_of_limits(local) -> + [{message, ?DEFAULT_LIMIT_LOCAL_MESSAGE}, + {presence, ?DEFAULT_LIMIT_LOCAL_PRESENCE}]; + +list_of_limits(remote) -> + [{message, ?DEFAULT_LIMIT_REMOTE_MESSAGE}, + {presence, ?DEFAULT_LIMIT_REMOTE_PRESENCE}]. + +build_service_limit_record(LimitOpts) -> + LimitOptsL = get_from_limitopts(LimitOpts, local), + LimitOptsR = get_from_limitopts(LimitOpts, remote), + {service_limits, + build_limit_record(LimitOptsL, local), + build_limit_record(LimitOptsR, remote) + }. + +get_from_limitopts(LimitOpts, SenderT) -> + [{StanzaT, Number} + || {SenderT2, StanzaT, Number} <- LimitOpts, + SenderT =:= SenderT2]. + +%% Build a record of type #limits{} +%% In fact, it builds a list and then converts to tuple +%% It is important to put the elements in the list in +%% the same order than the elements in record #limits +build_remote_limit_record(LimitOpts, SenderT) -> + build_limit_record(LimitOpts, SenderT). + +build_limit_record(LimitOpts, SenderT) -> + Limits = [ + get_limit_value(Name, Default, LimitOpts) + || {Name, Default} <- list_of_limits(SenderT)], + list_to_tuple([limits | Limits]). + +get_limit_value(Name, Default, LimitOpts) -> + case lists:keysearch(Name, 1, LimitOpts) of + {value, {Name, Number}} -> + {custom, Number}; + false -> + {default, Default} + end. + +get_limit_number(message, Limits) -> Limits#limits.message; +get_limit_number(presence, Limits) -> Limits#limits.presence. + +get_slimit_group(local, SLimits) -> SLimits#service_limits.local; +get_slimit_group(remote, SLimits) -> SLimits#service_limits.remote. + +fragment_dests(Dests, Limit_number) -> + {R, _} = lists:foldl( + fun(Dest, {Res, Count}) -> + case Count of + Limit_number -> + Head2 = [Dest], + {[Head2 | Res], 0}; + _ -> + [Head | Tail] = Res, + Head2 = [Dest | Head], + {[Head2 | Tail], Count+1} + end + end, + {[[]], 0}, + Dests), + R. + +%%%================================== +%%%% Limits: XEP-0128 Service Discovery Extensions + +%% Some parts of code are borrowed from mod_muc_room.erl + +-define(RFIELDT(Type, Var, Val), + #xmlel{name = 'field', attrs = [?XMLATTR('type', Type), + ?XMLATTR('var', Var)], + children = [#xmlel{name = 'value', + children = [#xmlcdata{cdata = Val}]}]}). + +-define(RFIELDV(Var, Val), + #xmlel{name = 'field', attrs = [?XMLATTR('var', Var)], + children = [#xmlel{name = 'value', + children = [#xmlcdata{cdata = Val}]}]}). + +iq_disco_info_extras(From, State) -> + SenderT = sender_type(From), + Service_limits = State#state.service_limits, + case iq_disco_info_extras2(SenderT, Service_limits) of + [] -> []; + List_limits_xmpp -> + Children = [?RFIELDT("hidden", "FORM_TYPE", ?NS_ADDRESS)] ++ List_limits_xmpp, + [#xmlel{name = x, ns = ?NS_DATA_FORMS, attrs = [#xmlattr{name = type, value = <<"result">>}], children = Children}] + end. + +sender_type(From) -> + Local_hosts = ?MYHOSTS, + case lists:member(exmpp_jid:domain_as_list(From), Local_hosts) of + true -> local; + false -> remote + end. + +iq_disco_info_extras2(SenderT, SLimits) -> + %% And report only the limits that are interesting for this sender + Limits = get_slimit_group(SenderT, SLimits), + Stanza_types = [message, presence], + lists:foldl( + fun(Type_of_stanza, R) -> + %% Report only custom limits + case get_limit_number(Type_of_stanza, Limits) of + {custom, Number} -> + [?RFIELDV(to_string(Type_of_stanza), to_string(Number)) | R]; + {default, _} -> R + end + end, + [], + Stanza_types). + +to_string(A) -> + hd(io_lib:format("~p",[A])). + +%%%================================== +%%%% Error report + +route_error(From, To, Packet, ErrType, ErrText) -> + Lang = exmpp_stanza:get_lang(Packet), + Err = exmpp_stanza:reply_with_error( + Packet, + ?ERR(Packet, ErrType, Lang, ErrText) + ), + ejabberd_router:route(From, To, Err). + +stj(String) -> exmpp_jid:parse(String). +jts(String) -> exmpp_jid:prep_to_list(String). + +%%%================================== + +%%% vim: set foldmethod=marker foldmarker=%%%%,%%%=: