Implement cache for roster

This commit is contained in:
Evgeniy Khramtsov 2017-05-17 14:47:35 +03:00
parent 3f13396d73
commit f782955c06
16 changed files with 646 additions and 644 deletions

View File

@ -104,13 +104,6 @@
).
%% @type affiliation() = 'none' | 'owner' | 'publisher' | 'publish-only' | 'member' | 'outcast'.
-type(subscription() :: 'none'
| 'pending'
| 'unconfigured'
| 'subscribed'
).
%% @type subscription() = 'none' | 'pending' | 'unconfigured' | 'subscribed'.
-type(accessModel() :: 'open'
| 'presence'
| 'roster'

View File

@ -20,7 +20,7 @@
{deps, [{lager, ".*", {git, "https://github.com/basho/lager", {tag, "3.2.1"}}},
{p1_utils, ".*", {git, "https://github.com/processone/p1_utils", "470539a"}},
{cache_tab, ".*", {git, "https://github.com/processone/cache_tab", "51eee22"}},
{cache_tab, ".*", {git, "https://github.com/processone/cache_tab", "6f762a59"}},
{fast_tls, ".*", {git, "https://github.com/processone/fast_tls", {tag, "1.0.11"}}},
{stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.8"}}},
{fast_xml, ".*", {git, "https://github.com/processone/fast_xml", {tag, "1.1.21"}}},

View File

@ -46,7 +46,7 @@
reject_unauthenticated_packet/2, process_closed/2,
process_terminated/2, process_info/2]).
%% API
-export([get_presence/1, get_subscription/2, get_subscribed/1,
-export([get_presence/1, resend_presence/1, resend_presence/2,
open_session/1, call/3, send/2, close/1, close/2, stop/1,
reply/2, copy_state/2, set_timeout/2, route/2,
host_up/1, host_down/1]).
@ -54,6 +54,7 @@
-include("ejabberd.hrl").
-include("xmpp.hrl").
-include("logger.hrl").
-include("mod_roster.hrl").
-define(SETS, gb_sets).
@ -93,23 +94,13 @@ reply(Ref, Reply) ->
get_presence(Ref) ->
call(Ref, get_presence, 1000).
-spec get_subscription(jid() | ljid(), state()) -> both | from | to | none.
get_subscription(#jid{} = From, State) ->
get_subscription(jid:tolower(From), State);
get_subscription(LFrom, #{pres_f := PresF, pres_t := PresT}) ->
LBFrom = jid:remove_resource(LFrom),
F = ?SETS:is_element(LFrom, PresF) orelse ?SETS:is_element(LBFrom, PresF),
T = ?SETS:is_element(LFrom, PresT) orelse ?SETS:is_element(LBFrom, PresT),
if F and T -> both;
F -> from;
T -> to;
true -> none
end.
-spec resend_presence(pid()) -> ok.
resend_presence(Pid) ->
resend_presence(Pid, undefined).
-spec get_subscribed(pid()) -> [ljid()].
%% Return list of all available resources of contacts
get_subscribed(Ref) ->
call(Ref, get_subscribed, 1000).
-spec resend_presence(pid(), jid() | undefined) -> ok.
resend_presence(Pid, To) ->
route(Pid, {resend_presence, To}).
-spec close(pid()) -> ok;
(state()) -> state().
@ -183,8 +174,7 @@ host_down(Host) ->
copy_state(#{owner := Owner} = NewState,
#{jid := JID, resource := Resource, sid := {Time, _},
auth_module := AuthModule, lserver := LServer,
pres_t := PresT, pres_a := PresA,
pres_f := PresF} = OldState) ->
pres_a := PresA} = OldState) ->
State1 = case OldState of
#{pres_last := Pres, pres_timestamp := PresTS} ->
NewState#{pres_last => Pres, pres_timestamp => PresTS};
@ -196,8 +186,7 @@ copy_state(#{owner := Owner} = NewState,
conn => Conn,
sid => {Time, Owner},
auth_module => AuthModule,
pres_t => PresT, pres_a => PresA,
pres_f => PresF},
pres_a => PresA},
ejabberd_hooks:run_fold(c2s_copy_session, LServer, State2, [OldState]).
-spec open_session(state()) -> {ok, state()} | state().
@ -238,10 +227,17 @@ process_info(#{lserver := LServer} = State, {route, Packet}) ->
true ->
State1
end;
process_info(State, force_update_presence) ->
process_info(#{jid := JID} = State, {resend_presence, To}) ->
case maps:get(pres_last, State, error) of
error -> State;
Pres -> process_self_presence(State, Pres)
Pres when To == undefined ->
process_self_presence(State, Pres);
Pres when To#jid.luser == JID#jid.luser andalso
To#jid.lserver == JID#jid.lserver andalso
To#jid.lresource == <<"">> ->
process_self_presence(State, Pres);
Pres ->
process_presence_out(State, xmpp:set_to(Pres, To))
end;
process_info(State, Info) ->
?WARNING_MSG("got unexpected info: ~p", [Info]),
@ -390,15 +386,11 @@ bind(R, #{user := U, server := S, access := Access, lang := Lang,
allow ->
State1 = open_session(State#{resource => Resource,
sid => ejabberd_sm:make_sid()}),
LBJID = jid:remove_resource(jid:tolower(JID)),
PresF = ?SETS:add_element(LBJID, maps:get(pres_f, State1)),
PresT = ?SETS:add_element(LBJID, maps:get(pres_t, State1)),
State2 = State1#{pres_f => PresF, pres_t => PresT},
State3 = ejabberd_hooks:run_fold(
c2s_session_opened, LServer, State2, []),
State2 = ejabberd_hooks:run_fold(
c2s_session_opened, LServer, State1, []),
?INFO_MSG("(~s) Opened c2s session for ~s",
[SockMod:pp(Socket), jid:encode(JID)]),
{ok, State3};
{ok, State2};
deny ->
ejabberd_hooks:run(forbidden_session_hook, LServer, [JID]),
?INFO_MSG("(~s) Forbidden c2s session for ~s",
@ -513,8 +505,6 @@ init([State, Opts]) ->
tls_enabled => TLSEnabled,
tls_verify => TLSVerify,
pres_a => ?SETS:new(),
pres_f => ?SETS:new(),
pres_t => ?SETS:new(),
zlib => Zlib,
lang => ?MYLANG,
server => ?MYNAME,
@ -532,9 +522,6 @@ handle_call(get_presence, From, #{jid := JID} = State) ->
end,
reply(From, Pres),
State;
handle_call(get_subscribed, From, #{pres_f := PresF} = State) ->
reply(From, ?SETS:to_list(PresF)),
State;
handle_call(Request, From, #{lserver := LServer} = State) ->
ejabberd_hooks:run_fold(
c2s_handle_call, LServer, State, [Request, From]).
@ -589,36 +576,36 @@ process_message_in(State, #message{type = T} = Msg) ->
-spec process_presence_in(state(), presence()) -> {boolean(), state()}.
process_presence_in(#{lserver := LServer, pres_a := PresA} = State0,
#presence{from = From, to = To, type = T} = Pres) ->
#presence{from = From, type = T} = Pres) ->
State = ejabberd_hooks:run_fold(c2s_presence_in, LServer, State0, [Pres]),
case T of
probe ->
NewState = add_to_pres_a(State, From),
route_probe_reply(From, To, NewState),
{false, NewState};
route_probe_reply(From, State),
{false, State};
error ->
A = ?SETS:del_element(jid:tolower(From), PresA),
{true, State#{pres_a => A}};
_ ->
case privacy_check_packet(State, Pres, in) of
allow ->
NewState = add_to_pres_a(State, From),
{true, NewState};
{true, State};
deny ->
{false, State}
end
end.
-spec route_probe_reply(jid(), jid(), state()) -> ok.
route_probe_reply(From, To, #{lserver := LServer, pres_f := PresF,
pres_last := LastPres,
pres_timestamp := TS} = State) ->
LFrom = jid:tolower(From),
LBFrom = jid:remove_resource(LFrom),
case ?SETS:is_element(LFrom, PresF)
orelse ?SETS:is_element(LBFrom, PresF) of
true ->
%% To is my JID
-spec route_probe_reply(jid(), state()) -> ok.
route_probe_reply(From, #{jid := To,
pres_last := LastPres,
pres_timestamp := TS} = State) ->
{LUser, LServer, LResource} = jid:tolower(To),
IsAnotherResource = case jid:tolower(From) of
{LUser, LServer, R} when R /= LResource -> true;
_ -> false
end,
Subscription = get_subscription(To, From),
if IsAnotherResource orelse
Subscription == both orelse Subscription == from ->
Packet = xmpp_util:add_delay_info(LastPres, To, TS),
case privacy_check_packet(State, Packet, out) of
deny ->
@ -627,19 +614,12 @@ route_probe_reply(From, To, #{lserver := LServer, pres_f := PresF,
ejabberd_hooks:run(presence_probe_hook,
LServer,
[From, To, self()]),
%% Don't route a presence probe to oneself
case From == To of
false ->
ejabberd_router:route(
xmpp:set_from_to(Packet, To, From));
true ->
ok
end
ejabberd_router:route(xmpp:set_from_to(Packet, To, From))
end;
false ->
true ->
ok
end;
route_probe_reply(_, _, _) ->
route_probe_reply(_, _) ->
ok.
-spec process_presence_out(state(), presence()) -> state().
@ -675,11 +655,22 @@ process_presence_out(#{user := User, server := Server, lserver := LServer,
State;
allow ->
ejabberd_router:route(Pres),
A = case Type of
available -> ?SETS:add_element(LTo, PresA);
unavailable -> ?SETS:del_element(LTo, PresA)
end,
State#{pres_a => A}
LBareTo = jid:remove_resource(LTo),
LBareFrom = jid:remove_resource(jid:tolower(From)),
if LBareTo /= LBareFrom ->
Subscription = get_subscription(From, To),
if Subscription /= both andalso Subscription /= from ->
A = case Type of
available -> ?SETS:add_element(LTo, PresA);
unavailable -> ?SETS:del_element(LTo, PresA)
end,
State#{pres_a => A};
true ->
State
end;
true ->
State
end
end.
-spec process_self_presence(state(), presence()) -> state().
@ -716,24 +707,81 @@ update_priority(#{ip := IP, conn := Conn, auth_module := AuthMod,
ejabberd_sm:set_presence(SID, U, S, R, Priority, Pres, Info).
-spec broadcast_presence_unavailable(state(), presence()) -> state().
broadcast_presence_unavailable(#{pres_a := PresA} = State, Pres) ->
JIDs = filter_blocked(State, Pres, PresA),
broadcast_presence_unavailable(#{jid := JID, pres_a := PresA} = State, Pres) ->
#jid{luser = LUser, lserver = LServer} = JID,
BareJID = jid:remove_resource(JID),
Items1 = ejabberd_hooks:run_fold(roster_get, LServer,
[], [{LUser, LServer}]),
Items2 = ?SETS:fold(
fun(LJID, Items) ->
[#roster{jid = LJID, subscription = from}|Items]
end, Items1, PresA),
JIDs = lists:foldl(
fun(#roster{jid = LJID, subscription = Sub}, Tos)
when Sub == both orelse Sub == from ->
To = jid:make(LJID),
P = xmpp:set_to(Pres, jid:make(LJID)),
case privacy_check_packet(State, P, out) of
allow -> [To|Tos];
deny -> Tos
end;
(_, Tos) ->
Tos
end, [BareJID], Items2),
route_multiple(State, JIDs, Pres),
State#{pres_a => ?SETS:new()}.
-spec broadcast_presence_available(state(), presence(), boolean()) -> state().
broadcast_presence_available(#{pres_a := PresA, pres_f := PresF,
pres_t := PresT, jid := JID} = State,
broadcast_presence_available(#{jid := JID} = State,
Pres, _FromUnavailable = true) ->
Probe = #presence{from = JID, type = probe},
TJIDs = filter_blocked(State, Probe, PresT),
FJIDs = filter_blocked(State, Pres, PresF),
#jid{luser = LUser, lserver = LServer} = JID,
BareJID = jid:remove_resource(JID),
Items = ejabberd_hooks:run_fold(roster_get, LServer,
[], [{LUser, LServer}]),
{FJIDs, TJIDs} =
lists:foldl(
fun(#roster{jid = LJID, subscription = Sub}, {F, T}) ->
To = jid:make(LJID),
F1 = if Sub == both orelse Sub == from ->
Pres1 = xmpp:set_to(Pres, To),
case privacy_check_packet(State, Pres1, out) of
allow -> [To|F];
deny -> F
end;
true -> F
end,
T1 = if Sub == both orelse Sub == to ->
Probe1 = xmpp:set_to(Probe, To),
case privacy_check_packet(State, Probe1, out) of
allow -> [To|T];
deny -> T
end;
true -> T
end,
{F1, T1}
end, {[BareJID], [BareJID]}, Items),
route_multiple(State, TJIDs, Probe),
route_multiple(State, FJIDs, Pres),
State#{pres_a => ?SETS:union(PresA, PresF)};
broadcast_presence_available(#{pres_a := PresA, pres_f := PresF} = State,
State;
broadcast_presence_available(#{jid := JID} = State,
Pres, _FromUnavailable = false) ->
JIDs = filter_blocked(State, Pres, ?SETS:intersection(PresA, PresF)),
#jid{luser = LUser, lserver = LServer} = JID,
BareJID = jid:remove_resource(JID),
Items = ejabberd_hooks:run_fold(
roster_get, LServer, [], [{LUser, LServer}]),
JIDs = lists:foldl(
fun(#roster{jid = LJID, subscription = Sub}, Tos)
when Sub == both orelse Sub == from ->
To = jid:make(LJID),
P = xmpp:set_to(Pres, jid:make(LJID)),
case privacy_check_packet(State, P, out) of
allow -> [To|Tos];
deny -> Tos
end;
(_, Tos) ->
Tos
end, [BareJID], Items),
route_multiple(State, JIDs, Pres),
State.
@ -761,23 +809,17 @@ get_priority_from_presence(#presence{priority = Prio}) ->
_ -> Prio
end.
-spec filter_blocked(state(), presence(), ?SETS:set()) -> [jid()].
filter_blocked(#{jid := From} = State, Pres, LJIDSet) ->
?SETS:fold(
fun(LJID, Acc) ->
To = jid:make(LJID),
Pkt = xmpp:set_from_to(Pres, From, To),
case privacy_check_packet(State, Pkt, out) of
allow -> [To|Acc];
deny -> Acc
end
end, [], LJIDSet).
-spec route_multiple(state(), [jid()], stanza()) -> ok.
route_multiple(#{lserver := LServer}, JIDs, Pkt) ->
From = xmpp:get_from(Pkt),
ejabberd_router_multicast:route_multicast(From, LServer, JIDs, Pkt).
get_subscription(#jid{luser = LUser, lserver = LServer}, JID) ->
{Subscription, _} = ejabberd_hooks:run_fold(
roster_get_jid_info, LServer, {none, []},
[LUser, LServer, JID]),
Subscription.
-spec resource_conflict_action(binary(), binary(), binary()) ->
{accept_resource, binary()} | closenew.
resource_conflict_action(U, S, R) ->
@ -855,30 +897,6 @@ change_shaper(#{shaper := ShaperName, ip := IP, lserver := LServer,
LServer),
xmpp_stream_in:change_shaper(State, Shaper).
-spec add_to_pres_a(state(), jid()) -> state().
add_to_pres_a(#{pres_a := PresA, pres_f := PresF} = State, From) ->
LFrom = jid:tolower(From),
LBFrom = jid:remove_resource(LFrom),
case (?SETS):is_element(LFrom, PresA) orelse
(?SETS):is_element(LBFrom, PresA) of
true ->
State;
false ->
case (?SETS):is_element(LFrom, PresF) of
true ->
A = (?SETS):add_element(LFrom, PresA),
State#{pres_a => A};
false ->
case (?SETS):is_element(LBFrom, PresF) of
true ->
A = (?SETS):add_element(LBFrom, PresA),
State#{pres_a => A};
false ->
State
end
end
end.
-spec format_reason(state(), term()) -> binary().
format_reason(#{stop_reason := Reason}, _) ->
xmpp_stream_in:format_error(Reason);

View File

@ -858,7 +858,7 @@ force_update_presence({LUser, LServer}) ->
Mod = get_sm_backend(LServer),
Ss = online(get_sessions(Mod, LUser, LServer)),
lists:foreach(fun (#session{sid = {_, Pid}}) ->
ejabberd_c2s:route(Pid, force_update_presence)
ejabberd_c2s:resend_presence(Pid)
end,
Ss).

View File

@ -57,39 +57,40 @@ filter_packet({#message{} = Msg, State} = Acc) ->
From = xmpp:get_from(Msg),
LFrom = jid:tolower(From),
LBFrom = jid:remove_resource(LFrom),
#{pres_a := PresA,
pres_t := PresT,
pres_f := PresF} = State,
#{pres_a := PresA, jid := JID, lserver := LServer} = State,
case (Msg#message.body == [] andalso
Msg#message.subject == [])
orelse ejabberd_router:is_my_route(From#jid.lserver)
orelse (?SETS):is_element(LFrom, PresA)
orelse (?SETS):is_element(LBFrom, PresA)
orelse sets_bare_member(LBFrom, PresA)
orelse (?SETS):is_element(LFrom, PresT)
orelse (?SETS):is_element(LBFrom, PresT)
orelse (?SETS):is_element(LFrom, PresF)
orelse (?SETS):is_element(LBFrom, PresF) of
true ->
Acc;
orelse sets_bare_member(LBFrom, PresA) of
false ->
#{lserver := LServer} = State,
Drop = gen_mod:get_module_opt(LServer, ?MODULE, drop, true),
Log = gen_mod:get_module_opt(LServer, ?MODULE, log, false),
if
Log ->
?INFO_MSG("Drop packet: ~s",
[fxml:element_to_binary(
xmpp:encode(Msg, ?NS_CLIENT))]);
true ->
ok
end,
if
Drop ->
{stop, {drop, State}};
true ->
Acc
end
{Sub, _} = ejabberd_hooks:run_fold(
roster_get_jid_info, LServer,
{none, []}, [JID#jid.luser, LServer, From]),
case Sub of
none ->
Drop = gen_mod:get_module_opt(LServer, ?MODULE, drop, true),
Log = gen_mod:get_module_opt(LServer, ?MODULE, log, false),
if
Log ->
?INFO_MSG("Drop packet: ~s",
[fxml:element_to_binary(
xmpp:encode(Msg, ?NS_CLIENT))]);
true ->
ok
end,
if
Drop ->
{stop, {drop, State}};
true ->
Acc
end;
_ ->
Acc
end;
true ->
Acc
end;
filter_packet(Acc) ->
Acc.

View File

@ -203,7 +203,9 @@ disco_info(Acc, _, _, _Node, _Lang) ->
-spec c2s_presence_in(ejabberd_c2s:state(), presence()) -> ejabberd_c2s:state().
c2s_presence_in(C2SState,
#presence{from = From, to = To, type = Type} = Presence) ->
Subscription = ejabberd_c2s:get_subscription(From, C2SState),
{Subscription, _} = ejabberd_hooks:run_fold(
roster_get_jid_info, To#jid.lserver,
{none, []}, [To#jid.luser, To#jid.lserver, From]),
Insert = (Type == available)
and ((Subscription == both) or (Subscription == to)),
Delete = (Type == unavailable) or (Type == error),

View File

@ -157,7 +157,10 @@ privacy_check_packet(allow, C2SState,
when T == get; T == set ->
case xmpp:has_subtag(IQ, #last{}) of
true ->
Sub = ejabberd_c2s:get_subscription(From, C2SState),
#jid{luser = LUser, lserver = LServer} = To,
{Sub, _} = ejabberd_hooks:run_fold(
roster_get_jid_info, LServer,
{none, []}, [LUser, LServer, From]),
if Sub == from; Sub == both ->
Pres = #presence{from = To, to = From},
case ejabberd_hooks:run_fold(

View File

@ -255,7 +255,7 @@ set_room_option(_Acc, {mam, Val}, _Lang) ->
set_room_option(Acc, _Property, _Lang) ->
Acc.
-spec user_receive_packet({stanza(), ejabberd_c2s:state()}) -> {stanza(), ejabberd_c2s:state()}.
-spec user_receive_packet({stanza(), c2s_state()}) -> {stanza(), c2s_state()}.
user_receive_packet({Pkt, #{jid := JID} = C2SState}) ->
Peer = xmpp:get_from(Pkt),
LUser = JID#jid.luser,
@ -263,7 +263,7 @@ user_receive_packet({Pkt, #{jid := JID} = C2SState}) ->
Pkt2 = case should_archive(Pkt, LServer) of
true ->
Pkt1 = strip_my_archived_tag(Pkt, LServer),
case store_msg(C2SState, Pkt1, LUser, LServer, Peer, recv) of
case store_msg(Pkt1, LUser, LServer, Peer, recv) of
{ok, ID} ->
set_stanza_id(Pkt1, JID, ID);
_ ->
@ -274,7 +274,7 @@ user_receive_packet({Pkt, #{jid := JID} = C2SState}) ->
end,
{Pkt2, C2SState}.
-spec user_send_packet({stanza(), ejabberd_c2s:state()}) -> {stanza(), ejabberd_c2s:state()}.
-spec user_send_packet({stanza(), c2s_state()}) -> {stanza(), c2s_state()}.
user_send_packet({Pkt, #{jid := JID} = C2SState}) ->
Peer = xmpp:get_to(Pkt),
LUser = JID#jid.luser,
@ -282,7 +282,7 @@ user_send_packet({Pkt, #{jid := JID} = C2SState}) ->
Pkt2 = case should_archive(Pkt, LServer) of
true ->
Pkt1 = strip_my_archived_tag(Pkt, LServer),
case store_msg(C2SState, xmpp:set_from_to(Pkt1, JID, Peer),
case store_msg(xmpp:set_from_to(Pkt1, JID, Peer),
LUser, LServer, Peer, send) of
{ok, ID} ->
set_stanza_id(Pkt1, JID, ID);
@ -301,7 +301,7 @@ offline_message({_Action, #message{from = Peer, to = To} = Pkt} = Acc) ->
case should_archive(Pkt, LServer) of
true ->
Pkt1 = strip_my_archived_tag(Pkt, LServer),
case store_msg(undefined, Pkt1, LUser, LServer, Peer, recv) of
case store_msg(Pkt1, LUser, LServer, Peer, recv) of
{ok, ID} ->
{archived, set_stanza_id(Pkt1, To, ID)};
_ ->
@ -311,8 +311,8 @@ offline_message({_Action, #message{from = Peer, to = To} = Pkt} = Acc) ->
Acc
end.
-spec user_send_packet_strip_tag({stanza(), ejabberd_c2s:state()}) ->
{stanza(), ejabberd_c2s:state()}.
-spec user_send_packet_strip_tag({stanza(), c2s_state()}) ->
{stanza(), c2s_state()}.
user_send_packet_strip_tag({Pkt, #{jid := JID} = C2SState}) ->
LServer = JID#jid.lserver,
{strip_my_archived_tag(Pkt, LServer), C2SState}.
@ -415,16 +415,16 @@ disco_sm_features({result, OtherFeatures},
disco_sm_features(Acc, _From, _To, _Node, _Lang) ->
Acc.
-spec message_is_archived(boolean(), ejabberd_c2s:state(), message()) -> boolean().
-spec message_is_archived(boolean(), c2s_state(), message()) -> boolean().
message_is_archived(true, _C2SState, _Pkt) ->
true;
message_is_archived(false, #{jid := JID} = C2SState, Pkt) ->
message_is_archived(false, #{jid := JID}, Pkt) ->
#jid{luser = LUser, lserver = LServer} = JID,
Peer = xmpp:get_from(Pkt),
case gen_mod:get_module_opt(LServer, ?MODULE, assume_mam_usage, false) of
true ->
should_archive(strip_my_archived_tag(Pkt, LServer), LServer)
andalso should_archive_peer(C2SState, LUser, LServer,
andalso should_archive_peer(LUser, LServer,
get_prefs(LUser, LServer),
Peer);
false ->
@ -615,9 +615,9 @@ strip_x_jid_tags(Pkt) ->
end, Els),
xmpp:set_els(Pkt, NewEls).
-spec should_archive_peer(c2s_state() | undefined, binary(), binary(),
-spec should_archive_peer(binary(), binary(),
#archive_prefs{}, jid()) -> boolean().
should_archive_peer(C2SState, LUser, LServer,
should_archive_peer(LUser, LServer,
#archive_prefs{default = Default,
always = Always,
never = Never},
@ -635,23 +635,11 @@ should_archive_peer(C2SState, LUser, LServer,
always -> true;
never -> false;
roster ->
Sub = case C2SState of
undefined ->
{S, _} = ejabberd_hooks:run_fold(
roster_get_jid_info,
LServer, {none, []},
[LUser, LServer, Peer]),
S;
_ ->
ejabberd_c2s:get_subscription(
LPeer, C2SState)
end,
case Sub of
both -> true;
from -> true;
to -> true;
_ -> false
end
{Sub, _} = ejabberd_hooks:run_fold(
roster_get_jid_info,
LServer, {none, []},
[LUser, LServer, Peer]),
Sub == both orelse Sub == from orelse Sub == to
end
end
end.
@ -719,12 +707,12 @@ may_enter_room(From,
may_enter_room(From, MUCState) ->
mod_muc_room:is_occupant_or_admin(From, MUCState).
-spec store_msg(c2s_state() | undefined, stanza(),
-spec store_msg(stanza(),
binary(), binary(), jid(), send | recv) ->
{ok, binary()} | pass.
store_msg(C2SState, Pkt, LUser, LServer, Peer, Dir) ->
store_msg(Pkt, LUser, LServer, Peer, Dir) ->
Prefs = get_prefs(LUser, LServer),
case should_archive_peer(C2SState, LUser, LServer, Prefs, Peer) of
case should_archive_peer(LUser, LServer, Prefs, Peer) of
true ->
US = {LUser, LServer},
case ejabberd_hooks:run_fold(store_mam_message, LServer, Pkt,

View File

@ -43,6 +43,7 @@
-include("logger.hrl").
-include("xmpp.hrl").
-include("pubsub.hrl").
-include("mod_roster.hrl").
-define(STDTREE, <<"tree">>).
-define(STDNODE, <<"flat">>).
@ -405,9 +406,19 @@ terminate_plugins(Host, ServerHost, Plugins, TreePlugin) ->
TreePlugin:terminate(Host, ServerHost),
ok.
get_subscribed(User, Server) ->
Items = ejabberd_hooks:run_fold(roster_get, Server, [], [{User, Server}]),
lists:filtermap(
fun(#roster{jid = LJID, subscription = Sub})
when Sub == both orelse Sub == from ->
{true, LJID};
(_) ->
false
end, Items).
send_loop(State) ->
receive
{presence, JID, Pid} ->
{presence, JID, _Pid} ->
Host = State#state.host,
ServerHost = State#state.server_host,
DBType = State#state.db_type,
@ -429,26 +440,21 @@ send_loop(State) ->
State#state.plugins),
if not State#state.ignore_pep_from_offline ->
{User, Server, Resource} = LJID,
case catch ejabberd_c2s:get_subscribed(Pid) of
Contacts when is_list(Contacts) ->
lists:foreach(
fun({U, S, R}) when S == ServerHost ->
case user_resources(U, S) of
[] -> %% offline
PeerJID = jid:make(U, S, R),
self() ! {presence, User, Server, [Resource], PeerJID};
_ -> %% online
% this is already handled by presence probe
ok
end;
(_) ->
% we can not do anything in any cases
ok
end,
Contacts);
_ ->
ok
end;
Contacts = get_subscribed(User, Server),
lists:foreach(
fun({U, S, R}) when S == ServerHost ->
case user_resources(U, S) of
[] -> %% offline
PeerJID = jid:make(U, S, R),
self() ! {presence, User, Server, [Resource], PeerJID};
_ -> %% online
%% this is already handled by presence probe
ok
end;
(_) ->
%% we can not do anything in any cases
ok
end, Contacts);
true ->
ok
end,

View File

<
@ -43,8 +43,8 @@
-export([start/2, stop/1, reload/3, process_iq/1, export/1,
import_info/0, process_local_iq/1, get_user_roster/2,
import/5, c2s_session_opened/1, get_roster/2,
import_start/2, import_stop/2, user_receive_packet/1,
import/5, get_roster/2,
import_start/2, import_stop/2,
c2s_self_presence/1, in_subscription/6,
out_subscription/4, set_items/3, remove_user/2,
get_jid_info/4, encode_item/1, webadmin_page/3,
@ -63,38 +63,41 @@
-include("ejabberd_web_admin.hrl").
-define(SETS, gb_sets).
-define(ROSTER_CACHE, roster_cache).
-define(ROSTER_ITEM_CACHE, roster_item_cache).
-define(ROSTER_VERSION_CACHE, roster_version_cache).
-export_type([subscription/0]).
-callback init(binary(), gen_mod:opts()) -> any().
-callback import(binary(), binary(), #roster{} | [binary()]) -> ok.
-callback read_roster_version(binary(), binary()) -> binary() | error.
-callback read_roster_version(binary(), binary()) -> {ok, binary()} | error.
-callback write_roster_version(binary(), binary(), boolean(), binary()) -> any().
-callback get_roster(binary(), binary()) -> [#roster{}].
-callback get_roster_by_jid(binary(), binary(), ljid()) -> #roster{}.
-callback get_only_items(binary(), binary()) -> [#roster{}].
-callback get_roster(binary(), binary()) -> {ok, [#roster{}]} | error.
-callback get_roster_item(binary(), binary(), ljid()) -> {ok, #roster{}} | error.
-callback read_subscription_and_groups(binary(), binary(), ljid())
-> {ok, {subscription(), [binary()]}} | error.
-callback roster_subscribe(binary(), binary(), ljid(), #roster{}) -> any().
-callback transaction(binary(), function()) -> {atomic, any()} | {aborted, any()}.
-callback get_roster_by_jid_with_groups(binary(), binary(), ljid()) -> #roster{}.
-callback remove_user(binary(), binary()) -> {atomic, any()}.
-callback remove_user(binary(), binary()) -> any().
-callback update_roster(binary(), binary(), ljid(), #roster{}) -> any().
-callback del_roster(binary(), binary(), ljid()) -> any().
-callback read_subscription_and_groups(binary(), binary(), ljid()) ->
{subscription(), [binary()]}.
-callback use_cache(binary(), roster | roster_version) -> boolean().
-callback cache_nodes(binary()) -> [node()].
-optional_callbacks([use_cache/2, cache_nodes/1]).
start(Host, Opts) ->
IQDisc = gen_mod:get_opt(iqdisc, Opts, gen_iq_handler:iqdisc(Host)),
Mod = gen_mod:db_mod(Host, Opts, ?MODULE),
Mod:init(Host, Opts),
init_cache(Mod, Host, Opts),
ejabberd_hooks:add(roster_get, Host, ?MODULE,
get_user_roster, 50),
ejabberd_hooks:add(roster_in_subscription, Host,
?MODULE, in_subscription, 50),
ejabberd_hooks:add(roster_out_subscription, Host,
?MODULE, out_subscription, 50),
ejabberd_hooks:add(c2s_session_opened, Host, ?MODULE,
c2s_session_opened, 50),
ejabberd_hooks:add(roster_get_jid_info, Host, ?MODULE,
get_jid_info, 50),
ejabberd_hooks:add(remove_user, Host, ?MODULE,
@ -107,8 +110,6 @@ start(Host, Opts) ->
webadmin_page, 50),
ejabberd_hooks:add(webadmin_user, Host, ?MODULE,
webadmin_user, 50),
ejabberd_hooks:add(user_receive_packet, Host, ?MODULE,
user_receive_packet, 50),
gen_iq_handler:add_iq_handler(ejabberd_sm, Host,
?NS_ROSTER, ?MODULE, process_iq, IQDisc).
@ -119,8 +120,6 @@ stop(Host) ->
?MODULE, in_subscription, 50),
ejabberd_hooks:delete(roster_out_subscription, Host,
?MODULE, out_subscription, 50),
ejabberd_hooks:delete(c2s_session_opened, Host, ?MODULE,
c2s_session_opened, 50),
ejabberd_hooks:delete(roster_get_jid_info, Host,
?MODULE, get_jid_info, 50),
ejabberd_hooks:delete(remove_user, Host, ?MODULE,
@ -133,8 +132,6 @@ stop(Host) ->
webadmin_page, 50),
ejabberd_hooks:delete(webadmin_user, Host, ?MODULE,
webadmin_user, 50),
ejabberd_hooks:delete(user_receive_packet, Host, ?MODULE,
user_receive_packet, 50),
gen_iq_handler:remove_iq_handler(ejabberd_sm, Host,
?NS_ROSTER).
@ -244,7 +241,7 @@ roster_version(LServer, LUser) ->
true ->
case read_roster_version(LUser, LServer) of
error -> not_found;
V -> V
{ok, V} -> V
end;
false ->
roster_hash(ejabberd_hooks:run_fold(roster_get, LServer,
@ -252,8 +249,12 @@ roster_version(LServer, LUser) ->
end.
read_roster_version(LUser, LServer) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
Mod:read_roster_version(LUser, LServer).
ets_cache:lookup(
?ROSTER_VERSION_CACHE, {LUser, LServer},
fun() ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
Mod:read_roster_version(LUser, LServer)
end).
write_roster_version(LUser, LServer) ->
write_roster_version(LUser, LServer, false).
@ -265,6 +266,11 @@ write_roster_version(LUser, LServer, InTransaction) ->
Ver = str:sha(term_to_binary(p1_time_compat:unique_integer())),
Mod = gen_mod:db_mod(LServer, ?MODULE),
Mod:write_roster_version(LUser, LServer, InTransaction, Ver),
if InTransaction -> ok;
true ->
ets_cache:delete(?ROSTER_VERSION_CACHE, {LUser, LServer},
cache_nodes(Mod, LServer))
end,
Ver.
%% Load roster from DB only if neccesary.
@ -289,9 +295,9 @@ process_iq_get(#iq{to = To, lang = Lang,
ejabberd_hooks:run_fold(
roster_get, To#jid.lserver, [], [US])),
RosterVersion};
RequestedVersion ->
{ok, RequestedVersion} ->
{false, false};
NewVersion ->
{ok, NewVersion} ->
{lists:map(fun encode_item/1,
ejabberd_hooks:run_fold(
roster_get, To#jid.lserver, [], [US])),
@ -343,18 +349,72 @@ get_user_roster(Acc, {LUser, LServer}) ->
get_roster(LUser, LServer) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
Mod:get_roster(LUser, LServer).
R = case use_cache(Mod, LServer, roster) of
true ->
ets_cache:lookup(
?ROSTER_CACHE, {LUser, LServer},
fun() -> Mod:get_roster(LUser, LServer) end);
false ->
Mod:get_roster(LUser, LServer)
end,
case R of
{ok, Items} -> Items;
error -> []
end.
get_roster_item(LUser, LServer, LJID) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
case Mod:get_roster_item(LUser, LServer, LJID) of
{ok, Item} ->
Item;
error ->
#roster{usj = {LUser, LServer, LJID},
us = {LUser, LServer}, jid = LJID}
end.
get_subscription_and_groups(LUser, LServer, LJID) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
Res = case use_cache(Mod, LServer, roster) of
true ->
LBJID = jid:remove_resource(LJID),
ets_cache:lookup(
?ROSTER_ITEM_CACHE, {LUser, LServer, LBJID},
fun() ->
Items = get_roster(LUser, LServer),
case lists:keyfind(LBJID, #roster.jid, Items) of
#roster{subscription = Sub, groups = Groups} ->
{ok, {Sub, Groups}};
false when element(3, LJID) == <<"">> ->
error;
false ->
case lists:keyfind(LJID, #roster.jid, Items) of
{Sub, Groups} ->
{ok, {Sub, Groups}};
false ->
error
end
end
end);
false ->
Mod:read_subscription_and_groups(LUser, LServer, LJID)
end,
case Res of
{ok, SubAndGroups} ->
SubAndGroups;
error ->
{none, []}
end.
set_roster(#roster{us = {LUser, LServer}, jid = LJID} = Item) ->
transaction(
LServer,
LUser, LServer, [LJID],
fun() ->
update_roster_t(LUser, LServer, LJID, Item)
end).
del_roster(LUser, LServer, LJID) ->
transaction(
LServer,
LUser, LServer, [LJID],
fun() ->
del_roster_t(LUser, LServer, LJID)
end).
@ -387,46 +447,36 @@ decode_item(Item, R, Managed) ->
end,
groups = Item#roster_item.groups}.
get_roster_by_jid_t(LUser, LServer, LJID) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
Mod:get_roster_by_jid(LUser, LServer, LJID).
process_iq_set(#iq{from = From, to = To,
sub_els = [#roster_query{items = QueryItems}]} = IQ) ->
sub_els = [#roster_query{items = [QueryItem]}]} = IQ) ->
#jid{user = User, luser = LUser, lserver = LServer} = To,
Managed = {From#jid.luser, From#jid.lserver} /= {LUser, LServer},
LJID = jid:tolower(QueryItem#roster_item.jid),
F = fun () ->
lists:map(
fun(#roster_item{jid = JID1} = QueryItem) ->
LJID = jid:tolower(JID1),
Item = get_roster_by_jid_t(LUser, LServer, LJID),
Item2 = decode_item(QueryItem, Item, Managed),
Item3 = ejabberd_hooks:run_fold(roster_process_item,
LServer, Item2,
[LServer]),
case Item3#roster.subscription of
remove -> del_roster_t(LUser, LServer, LJID);
_ -> update_roster_t(LUser, LServer, LJID, Item3)
end,
case roster_version_on_db(LServer) of
true -> write_roster_version_t(LUser, LServer);
false -> ok
end,
{Item, Item3}
end, QueryItems)
Item = get_roster_item(LUser, LServer, LJID),
Item2 = decode_item(QueryItem, Item, Managed),
Item3 = ejabberd_hooks:run_fold(roster_process_item,
LServer, Item2,
[LServer]),
case Item3#roster.subscription of
remove -> del_roster_t(LUser, LServer, LJID);
_ -> update_roster_t(LUser, LServer, LJID, Item3)
end,
case roster_version_on_db(LServer) of
true -> write_roster_version_t(LUser, LServer);
false -> ok
end,
{Item, Item3}
end,
case transaction(LServer, F) of
{atomic, ItemPairs} ->
lists:foreach(
fun({OldItem, Item}) ->
push_item(User, LServer, To, Item),
case Item#roster.subscription of
remove ->
send_unsubscribing_presence(To, OldItem);
_ ->
ok
end
end, ItemPairs),
case transaction(LUser, LServer, [LJID], F) of
{atomic, {OldItem, Item}} ->
push_item(User, LServer, To, OldItem, Item),
case Item#roster.subscription of
remove ->
send_unsubscribing_presence(To, OldItem);
_ ->
ok
end,
xmpp:make_iq_result(IQ);
E ->
?ERROR_MSG("roster set failed:~nIQ = ~s~nError = ~p",
@ -434,126 +484,66 @@ process_iq_set(#iq{from = From, to = To,
xmpp:make_error(IQ, xmpp:err_internal_server_error())
end.
push_item(User, Server, From, Item) ->
push_item(User, Server, From, OldItem, NewItem) ->
case roster_versioning_enabled(Server) of
true ->
push_item_version(Server, User, From, Item,
roster_version(Server, User));
false ->
lists:foreach(fun (Resource) ->
push_item(User, Server, Resource, From, Item)
end,
ejabberd_sm:get_user_resources(User, Server))
true ->
push_item_version(Server, User, From, OldItem, NewItem,
roster_version(Server, User));
false ->
lists:foreach(
fun(Resource) ->
push_item(User, Server, Resource, From, OldItem, NewItem)
end, ejabberd_sm:get_user_resources(User, Server))
end.
push_item(User, Server, Resource, From, Item) ->
push_item(User, Server, Resource, From, Item,
not_found).
push_item(User, Server, Resource, From, OldItem, NewItem) ->
push_item(User, Server, Resource, From, OldItem, NewItem, undefined).
push_item(User, Server, Resource, From, Item,
RosterVersion) ->
Ver = case RosterVersion of
not_found -> undefined;
_ -> RosterVersion
end,
push_item(User, Server, Resource, From, OldItem, NewItem, Ver) ->
To = jid:make(User, Server, Resource),
route_presence_change(To, OldItem, NewItem),
ResIQ = #iq{type = set, from = From, to = To,
id = <<"push", (randoms:get_string())/binary>>,
sub_els = [#roster_query{ver = Ver,
items = [encode_item(Item)]}]},
ejabberd_router:route(xmpp:put_meta(ResIQ, roster_item, Item)).
items = [encode_item(NewItem)]}]},
ejabberd_router:route(ResIQ).
push_item_version(Server, User, From, Item,
RosterVersion) ->
lists:foreach(fun (Resource) ->
push_item(User, Server, Resource, From, Item,
RosterVersion)
end,
ejabberd_sm:get_user_resources(User, Server)).
push_item_version(Server, User, From, OldItem, NewItem, RosterVersion) ->
lists:foreach(
fun(Resource) ->
push_item(User, Server, Resource, From,
OldItem, NewItem, RosterVersion)
end, ejabberd_sm:get_user_resources(User, Server)).
-spec user_receive_packet({stanza(), ejabberd_c2s:state()}) -> {stanza(), ejabberd_c2s:state()}.
user_receive_packet({#iq{type = set, meta = #{roster_item := Item}} = IQ, State}) ->
{IQ, roster_change(State, Item)};
user_receive_packet(Acc) ->
Acc.
-spec roster_change(ejabberd_c2s:state(), #roster{}) -> ejabberd_c2s:state().
roster_change(#{user := U, server := S, resource := R,
pres_a := PresA, pres_f := PresF, pres_t := PresT} = State,
#roster{jid = IJID, subscription = ISubscription}) ->
LIJID = jid:tolower(IJID),
IsFrom = (ISubscription == both) or (ISubscription == from),
IsTo = (ISubscription == both) or (ISubscription == to),
OldIsFrom = ?SETS:is_element(LIJID, PresF),
FSet = if IsFrom -> ?SETS:add_element(LIJID, PresF);
true -> ?SETS:del_element(LIJID, PresF)
end,
TSet = if IsTo -> ?SETS:add_element(LIJID, PresT);
true -> ?SETS:del_element(LIJID, PresT)
end,
State1 = State#{pres_f => FSet, pres_t => TSet},
case maps:get(pres_last, State, undefined) of
undefined ->
State1;
LastPres ->
From = jid:make(U, S, R),
To = jid:make(IJID),
Cond1 = IsFrom andalso not OldIsFrom,
Cond2 = not IsFrom andalso OldIsFrom andalso
?SETS:is_element(LIJID, PresA),
if Cond1 ->
case ejabberd_hooks:run_fold(
privacy_check_packet, allow,
[State1, LastPres, out]) of
deny ->
ok;
allow ->
Pres = xmpp:set_from_to(LastPres, From, To),
ejabberd_router:route(Pres)
end,
A = ?SETS:add_element(LIJID, PresA),
State1#{pres_a => A};
Cond2 ->
PU = #presence{from = From, to = To, type = unavailable},
case ejabberd_hooks:run_fold(
privacy_check_packet, allow,
[State1, PU, out]) of
deny ->
ok;
allow ->
ejabberd_router:route(PU)
end,
A = ?SETS:del_element(LIJID, PresA),
State1#{pres_a => A};
true ->
State1
end
-spec route_presence_change(jid(), #roster{}, #roster{}) -> ok.
route_presence_change(From, OldItem, NewItem) ->
OldSub = OldItem#roster.subscription,
NewSub = NewItem#roster.subscription,
To = jid:make(NewItem#roster.jid),
NewIsFrom = NewSub == both orelse NewSub == from,
OldIsFrom = OldSub == both orelse OldSub == from,
if NewIsFrom andalso not OldIsFrom ->
case ejabberd_sm:get_session_pid(
From#jid.luser, From#jid.lserver, From#jid.lresource) of
none ->
ok;
Pid ->
ejabberd_c2s:resend_presence(Pid, To)
end;
OldIsFrom andalso not NewIsFrom ->
PU = #presence{from = From, to = To, type = unavailable},
case ejabberd_hooks:run_fold(
privacy_check_packet, allow,
[From, PU, out]) of
deny ->
ok;
allow ->
ejabberd_router:route(PU)
end;
true ->
ok
end.
-spec c2s_session_opened(ejabberd_c2s:state()) -> ejabberd_c2s:state().
c2s_session_opened(#{jid := #jid{luser = LUser, lserver = LServer},
pres_f := PresF, pres_t := PresT} = State) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
Items = Mod:get_only_items(LUser, LServer),
{F, T} = fill_subscription_lists(Items, PresF, PresT),
State#{pres_f => F, pres_t => T}.
fill_subscription_lists([I | Is], F, T) ->
J = element(3, I#roster.usj),
{F1, T1} = case I#roster.subscription of
both ->
{?SETS:add_element(J, F), ?SETS:add_element(J, T)};
from ->
{?SETS:add_element(J, F), T};
to ->
{F, ?SETS:add_element(J, T)};
_ ->
{F, T}
end,
fill_subscription_lists(Is, F1, T1);
fill_subscription_lists([], F, T) ->
{F, T}.
ask_to_pending(subscribe) -> out;
ask_to_pending(unsubscribe) -> none;
ask_to_pending(Ask) -> Ask.
@ -562,9 +552,15 @@ roster_subscribe_t(LUser, LServer, LJID, Item) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
Mod:roster_subscribe(LUser, LServer, LJID, Item).
transaction(LServer, F) ->
transaction(LUser, LServer, LJIDs, F) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
Mod:transaction(LServer, F).
case Mod:transaction(LServer, F) of
{atomic, _} = Result ->
delete_cache(Mod, LUser, LServer, LJIDs),
Result;
Err ->
Err
end.
-spec in_subscription(boolean(), binary(), binary(), jid(),
subscribe | subscribed | unsubscribe | unsubscribed,
@ -579,18 +575,13 @@ in_subscription(_, User, Server, JID, Type, Reason) ->
out_subscription(User, Server, JID, Type) ->
process_subscription(out, User, Server, JID, Type, <<"">>).
get_roster_by_jid_with_groups_t(LUser, LServer, LJID) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
Mod:get_roster_by_jid_with_groups(LUser, LServer, LJID).
process_subscription(Direction, User, Server, JID1,
Type, Reason) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
LJID = jid:tolower(JID1),
F = fun () ->
Item = get_roster_by_jid_with_groups_t(LUser, LServer,
LJID),
Item = get_roster_item(LUser, LServer, LJID),
NewState = case Direction of
out ->
out_state_change(Item#roster.subscription,
@ -611,46 +602,48 @@ process_subscription(Direction, User, Server, JID1,
_ -> <<"">>
end,
case NewState of
none -> {none, AutoReply};
{none, none}
when Item#roster.subscription == none,
Item#roster.ask == in ->
del_roster_t(LUser, LServer, LJID), {none, AutoReply};
{Subscription, Pending} ->
NewItem = Item#roster{subscription = Subscription,
ask = Pending,
askmessage = AskMessage},
roster_subscribe_t(LUser, LServer, LJID, NewItem),
case roster_version_on_db(LServer) of
true -> write_roster_version_t(LUser, LServer);
false -> ok
end,
{{push, NewItem}, AutoReply}
none ->
{none, AutoReply};
{none, none} when Item#roster.subscription == none,
Item#roster.ask == in ->
del_roster_t(LUser, LServer, LJID), {none, AutoReply};
{Subscription, Pending} ->
NewItem = Item#roster{subscription = Subscription,
ask = Pending,
askmessage = AskMessage},