diff --git a/src/mod_client_state.erl b/src/mod_client_state.erl index a5ac611f6..0d92cb2df 100644 --- a/src/mod_client_state.erl +++ b/src/mod_client_state.erl @@ -49,6 +49,8 @@ -type csi_type() :: presence | chatstate | {pep, binary()}. -type csi_queue() :: {non_neg_integer(), map()}. -type csi_timestamp() :: {non_neg_integer(), erlang:timestamp()}. +-type csi_key() :: {ljid(), csi_type()}. +-type csi_element() :: {csi_timestamp(), stanza()}. -type c2s_state() :: ejabberd_c2s:state(). -type filter_acc() :: {stanza() | drop, c2s_state()}. @@ -320,25 +322,21 @@ enqueue_stanza(Type, Stanza, #{csi_state := inactive, C2SState1 = flush_queue(C2SState), enqueue_stanza(Type, Stanza, C2SState1); false -> - #jid{luser = U, lserver = S} = xmpp:get_from(Stanza), - Q1 = queue_in({U, S}, Type, Stanza, Q), + From = jid:tolower(xmpp:get_from(Stanza)), + Q1 = queue_in({From, Type}, Stanza, Q), {stop, {drop, C2SState#{csi_queue => Q1}}} end; enqueue_stanza(_Type, Stanza, State) -> {Stanza, State}. -spec dequeue_sender(jid(), c2s_state()) -> c2s_state(). -dequeue_sender(#jid{luser = U, lserver = S}, +dequeue_sender(#jid{luser = U, lserver = S} = Sender, #{csi_queue := Q, jid := JID} = C2SState) -> ?DEBUG("Flushing packets of ~s@~s from CSI queue of ~s", [U, S, jid:encode(JID)]), - case queue_take({U, S}, Q) of - {Stanzas, Q1} -> - C2SState1 = flush_stanzas(C2SState, Stanzas), - C2SState1#{csi_queue => Q1}; - error -> - C2SState - end. + {Elems, Q1} = queue_take(Sender, Q), + C2SState1 = flush_stanzas(C2SState, Elems), + C2SState1#{csi_queue => Q1}. -spec flush_queue(c2s_state()) -> c2s_state(). flush_queue(#{csi_queue := Q, jid := JID} = C2SState) -> @@ -350,7 +348,7 @@ flush_queue(#{csi_queue := Q, jid := JID} = C2SState) -> [{csi_type(), csi_timestamp(), stanza()}]) -> c2s_state(). flush_stanzas(#{lserver := LServer} = C2SState, Elems) -> lists:foldl( - fun({_Type, Time, Stanza}, AccState) -> + fun({Time, Stanza}, AccState) -> Stanza1 = add_delay_info(Stanza, LServer, Time), ejabberd_c2s:send(AccState, Stanza1) end, C2SState, Elems). @@ -381,46 +379,27 @@ get_pep_node(#message{} = Msg) -> queue_new() -> {0, #{}}. --spec queue_in(term(), term(), term(), csi_queue()) -> csi_queue(). -queue_in(Key, Type, Val, {Seq, Q}) -> +-spec queue_in(csi_key(), csi_element(), csi_queue()) -> csi_queue(). +queue_in(Key, Val, {Seq, Q}) -> Seq1 = Seq + 1, Time = {Seq1, p1_time_compat:timestamp()}, - case maps:get(Key, Q, error) of - error -> - Q1 = maps:put(Key, [{Type, Time, Val}], Q), - {Seq1, Q1}; - TypeVals -> - case lists:keymember(Type, 1, TypeVals) of - true -> - TypeVals1 = lists:keyreplace( - Type, 1, TypeVals, {Type, Time, Val}), - Q1 = maps:put(Key, TypeVals1, Q), - {Seq1, Q1}; - false -> - TypeVals1 = [{Type, Time, Val}|TypeVals], - Q1 = maps:put(Key, TypeVals1, Q), - {Seq1, Q1} - end - end. + Q1 = maps:put(Key, {Time, Val}, Q), + {Seq1, Q1}. --spec queue_take(term(), csi_queue()) -> {list(), csi_queue()} | error. -queue_take(Key, {Seq, Q}) -> - case maps:get(Key, Q, error) of - error -> - error; - TypeVals -> - Q1 = maps:remove(Key, Q), - {lists:keysort(2, TypeVals), {Seq, Q1}} - end. +-spec queue_take(jid(), csi_queue()) -> {[csi_element()], csi_queue()}. +queue_take(#jid{luser = LUser, lserver = LServer}, {Seq, Q}) -> + {Vals, Q1} = maps:fold(fun({{U, S, _}, _} = Key, Val, {AccVals, AccQ}) + when U == LUser, S == LServer -> + {[Val | AccVals], maps:remove(Key, AccQ)}; + (_, _, Acc) -> + Acc + end, {[], Q}, Q), + {lists:keysort(1, Vals), {Seq, Q1}}. -spec queue_len(csi_queue()) -> non_neg_integer(). queue_len({_, Q}) -> maps:size(Q). --spec queue_to_list(csi_queue()) -> [term()]. -queue_to_list({_, _, Q}) -> - TypeVals = maps:fold( - fun(_, Vals, Acc) -> - Vals ++ Acc - end, [], Q), - lists:keysort(2, TypeVals). +-spec queue_to_list(csi_queue()) -> [csi_element()]. +queue_to_list({_, Q}) -> + lists:keysort(1, maps:values(Q)).