25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-11-28 16:34:13 +01:00

mod_client_state: Queue stanzas of each full JID

Keep the latest stanzas of each given full JID, rather than dropping
them when stanzas from a different resource are received.  This change
makes sure the recipient receives the latest status of all clients of
each contact.  It also ensures the recipient will see the current list
of occupants of joined MUC rooms.
This commit is contained in:
Holger Weiss 2017-04-06 22:19:00 +02:00
parent 7827faae4b
commit 0ddd2e0ebf

View File

@ -49,6 +49,8 @@
-type csi_type() :: presence | chatstate | {pep, binary()}. -type csi_type() :: presence | chatstate | {pep, binary()}.
-type csi_queue() :: {non_neg_integer(), map()}. -type csi_queue() :: {non_neg_integer(), map()}.
-type csi_timestamp() :: {non_neg_integer(), erlang:timestamp()}. -type csi_timestamp() :: {non_neg_integer(), erlang:timestamp()}.
-type csi_key() :: {ljid(), csi_type()}.
-type csi_element() :: {csi_timestamp(), stanza()}.
-type c2s_state() :: ejabberd_c2s:state(). -type c2s_state() :: ejabberd_c2s:state().
-type filter_acc() :: {stanza() | drop, c2s_state()}. -type filter_acc() :: {stanza() | drop, c2s_state()}.
@ -320,25 +322,21 @@ enqueue_stanza(Type, Stanza, #{csi_state := inactive,
C2SState1 = flush_queue(C2SState), C2SState1 = flush_queue(C2SState),
enqueue_stanza(Type, Stanza, C2SState1); enqueue_stanza(Type, Stanza, C2SState1);
false -> false ->
#jid{luser = U, lserver = S} = xmpp:get_from(Stanza), From = jid:tolower(xmpp:get_from(Stanza)),
Q1 = queue_in({U, S}, Type, Stanza, Q), Q1 = queue_in({From, Type}, Stanza, Q),
{stop, {drop, C2SState#{csi_queue => Q1}}} {stop, {drop, C2SState#{csi_queue => Q1}}}
end; end;
enqueue_stanza(_Type, Stanza, State) -> enqueue_stanza(_Type, Stanza, State) ->
{Stanza, State}. {Stanza, State}.
-spec dequeue_sender(jid(), c2s_state()) -> c2s_state(). -spec dequeue_sender(jid(), c2s_state()) -> c2s_state().
dequeue_sender(#jid{luser = U, lserver = S}, dequeue_sender(#jid{luser = U, lserver = S} = Sender,
#{csi_queue := Q, jid := JID} = C2SState) -> #{csi_queue := Q, jid := JID} = C2SState) ->
?DEBUG("Flushing packets of ~s@~s from CSI queue of ~s", ?DEBUG("Flushing packets of ~s@~s from CSI queue of ~s",
[U, S, jid:encode(JID)]), [U, S, jid:encode(JID)]),
case queue_take({U, S}, Q) of {Elems, Q1} = queue_take(Sender, Q),
{Stanzas, Q1} -> C2SState1 = flush_stanzas(C2SState, Elems),
C2SState1 = flush_stanzas(C2SState, Stanzas), C2SState1#{csi_queue => Q1}.
C2SState1#{csi_queue => Q1};
error ->
C2SState
end.
-spec flush_queue(c2s_state()) -> c2s_state(). -spec flush_queue(c2s_state()) -> c2s_state().
flush_queue(#{csi_queue := Q, jid := JID} = C2SState) -> flush_queue(#{csi_queue := Q, jid := JID} = C2SState) ->
@ -350,7 +348,7 @@ flush_queue(#{csi_queue := Q, jid := JID} = C2SState) ->
[{csi_type(), csi_timestamp(), stanza()}]) -> c2s_state(). [{csi_type(), csi_timestamp(), stanza()}]) -> c2s_state().
flush_stanzas(#{lserver := LServer} = C2SState, Elems) -> flush_stanzas(#{lserver := LServer} = C2SState, Elems) ->
lists:foldl( lists:foldl(
fun({_Type, Time, Stanza}, AccState) -> fun({Time, Stanza}, AccState) ->
Stanza1 = add_delay_info(Stanza, LServer, Time), Stanza1 = add_delay_info(Stanza, LServer, Time),
ejabberd_c2s:send(AccState, Stanza1) ejabberd_c2s:send(AccState, Stanza1)
end, C2SState, Elems). end, C2SState, Elems).
@ -381,46 +379,27 @@ get_pep_node(#message{} = Msg) ->
queue_new() -> queue_new() ->
{0, #{}}. {0, #{}}.
-spec queue_in(term(), term(), term(), csi_queue()) -> csi_queue(). -spec queue_in(csi_key(), csi_element(), csi_queue()) -> csi_queue().
queue_in(Key, Type, Val, {Seq, Q}) -> queue_in(Key, Val, {Seq, Q}) ->
Seq1 = Seq + 1, Seq1 = Seq + 1,
Time = {Seq1, p1_time_compat:timestamp()}, Time = {Seq1, p1_time_compat:timestamp()},
case maps:get(Key, Q, error) of Q1 = maps:put(Key, {Time, Val}, Q),
error -> {Seq1, Q1}.
Q1 = maps:put(Key, [{Type, Time, Val}], Q),
{Seq1, Q1};
TypeVals ->
case lists:keymember(Type, 1, TypeVals) of
true ->
TypeVals1 = lists:keyreplace(
Type, 1, TypeVals, {Type, Time, Val}),
Q1 = maps:put(Key, TypeVals1, Q),
{Seq1, Q1};
false ->
TypeVals1 = [{Type, Time, Val}|TypeVals],
Q1 = maps:put(Key, TypeVals1, Q),
{Seq1, Q1}
end
end.
-spec queue_take(term(), csi_queue()) -> {list(), csi_queue()} | error. -spec queue_take(jid(), csi_queue()) -> {[csi_element()], csi_queue()}.
queue_take(Key, {Seq, Q}) -> queue_take(#jid{luser = LUser, lserver = LServer}, {Seq, Q}) ->
case maps:get(Key, Q, error) of {Vals, Q1} = maps:fold(fun({{U, S, _}, _} = Key, Val, {AccVals, AccQ})
error -> when U == LUser, S == LServer ->
error; {[Val | AccVals], maps:remove(Key, AccQ)};
TypeVals -> (_, _, Acc) ->
Q1 = maps:remove(Key, Q), Acc
{lists:keysort(2, TypeVals), {Seq, Q1}} end, {[], Q}, Q),
end. {lists:keysort(1, Vals), {Seq, Q1}}.
-spec queue_len(csi_queue()) -> non_neg_integer(). -spec queue_len(csi_queue()) -> non_neg_integer().
queue_len({_, Q}) -> queue_len({_, Q}) ->
maps:size(Q). maps:size(Q).
-spec queue_to_list(csi_queue()) -> [term()]. -spec queue_to_list(csi_queue()) -> [csi_element()].
queue_to_list({_, _, Q}) -> queue_to_list({_, Q}) ->
TypeVals = maps:fold( lists:keysort(1, maps:values(Q)).
fun(_, Vals, Acc) ->
Vals ++ Acc
end, [], Q),
lists:keysort(2, TypeVals).