25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-12-22 17:28:25 +01:00

Merge pull request #1666 from weiss/fix-csi

Let CSI keep latest stanzas of each given full JID
This commit is contained in:
Christophe Romain 2017-04-07 15:10:39 +02:00 committed by GitHub
commit f2ca4cb3cd

View File

@ -47,17 +47,17 @@
-define(CSI_QUEUE_MAX, 100). -define(CSI_QUEUE_MAX, 100).
-type csi_type() :: presence | chatstate | {pep, binary()}. -type csi_type() :: presence | chatstate | {pep, binary()}.
-type csi_queue() :: {non_neg_integer(), non_neg_integer(), map()}. -type csi_queue() :: {non_neg_integer(), map()}.
-type csi_timestamp() :: {non_neg_integer(), erlang:timestamp()}. -type csi_timestamp() :: {non_neg_integer(), erlang:timestamp()}.
-type csi_key() :: {ljid(), csi_type()}.
-type csi_element() :: {csi_timestamp(), stanza()}.
-type c2s_state() :: ejabberd_c2s:state(). -type c2s_state() :: ejabberd_c2s:state().
-type filter_acc() :: {stanza() | drop, c2s_state()}. -type filter_acc() :: {stanza() | drop, c2s_state()}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_mod callbacks. %% gen_mod callbacks.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec start(binary(), gen_mod:opts()) -> ok. -spec start(binary(), gen_mod:opts()) -> ok.
start(Host, Opts) -> start(Host, Opts) ->
QueuePresence = QueuePresence =
gen_mod:get_opt(queue_presence, Opts, gen_mod:get_opt(queue_presence, Opts,
@ -92,7 +92,6 @@ start(Host, Opts) ->
end. end.
-spec stop(binary()) -> ok. -spec stop(binary()) -> ok.
stop(Host) -> stop(Host) ->
QueuePresence = QueuePresence =
gen_mod:get_module_opt(Host, ?MODULE, queue_presence, gen_mod:get_module_opt(Host, ?MODULE, queue_presence,
@ -165,7 +164,6 @@ reload(Host, NewOpts, _OldOpts) ->
end. end.
-spec mod_opt_type(atom()) -> fun((term()) -> term()) | [atom()]. -spec mod_opt_type(atom()) -> fun((term()) -> term()) | [atom()].
mod_opt_type(queue_presence) -> mod_opt_type(queue_presence) ->
fun(B) when is_boolean(B) -> B end; fun(B) when is_boolean(B) -> B end;
mod_opt_type(queue_chat_states) -> mod_opt_type(queue_chat_states) ->
@ -175,7 +173,6 @@ mod_opt_type(queue_pep) ->
mod_opt_type(_) -> [queue_presence, queue_chat_states, queue_pep]. mod_opt_type(_) -> [queue_presence, queue_chat_states, queue_pep].
-spec depends(binary(), gen_mod:opts()) -> [{module(), hard | soft}]. -spec depends(binary(), gen_mod:opts()) -> [{module(), hard | soft}].
depends(_Host, _Opts) -> depends(_Host, _Opts) ->
[]. [].
@ -320,25 +317,21 @@ enqueue_stanza(Type, Stanza, #{csi_state := inactive,
C2SState1 = flush_queue(C2SState), C2SState1 = flush_queue(C2SState),
enqueue_stanza(Type, Stanza, C2SState1); enqueue_stanza(Type, Stanza, C2SState1);
false -> false ->
#jid{luser = U, lserver = S} = xmpp:get_from(Stanza), From = jid:tolower(xmpp:get_from(Stanza)),
Q1 = queue_in({U, S}, Type, Stanza, Q), Q1 = queue_in({From, Type}, Stanza, Q),
{stop, {drop, C2SState#{csi_queue => Q1}}} {stop, {drop, C2SState#{csi_queue => Q1}}}
end; end;
enqueue_stanza(_Type, Stanza, State) -> enqueue_stanza(_Type, Stanza, State) ->
{Stanza, State}. {Stanza, State}.
-spec dequeue_sender(jid(), c2s_state()) -> c2s_state(). -spec dequeue_sender(jid(), c2s_state()) -> c2s_state().
dequeue_sender(#jid{luser = U, lserver = S}, dequeue_sender(#jid{luser = U, lserver = S} = Sender,
#{csi_queue := Q, jid := JID} = C2SState) -> #{csi_queue := Q, jid := JID} = C2SState) ->
?DEBUG("Flushing packets of ~s@~s from CSI queue of ~s", ?DEBUG("Flushing packets of ~s@~s from CSI queue of ~s",
[U, S, jid:encode(JID)]), [U, S, jid:encode(JID)]),
case queue_take({U, S}, Q) of {Elems, Q1} = queue_take(Sender, Q),
{Stanzas, Q1} -> C2SState1 = flush_stanzas(C2SState, Elems),
C2SState1 = flush_stanzas(C2SState, Stanzas), C2SState1#{csi_queue => Q1}.
C2SState1#{csi_queue => Q1};
error ->
C2SState
end.
-spec flush_queue(c2s_state()) -> c2s_state(). -spec flush_queue(c2s_state()) -> c2s_state().
flush_queue(#{csi_queue := Q, jid := JID} = C2SState) -> flush_queue(#{csi_queue := Q, jid := JID} = C2SState) ->
@ -350,7 +343,7 @@ flush_queue(#{csi_queue := Q, jid := JID} = C2SState) ->
[{csi_type(), csi_timestamp(), stanza()}]) -> c2s_state(). [{csi_type(), csi_timestamp(), stanza()}]) -> c2s_state().
flush_stanzas(#{lserver := LServer} = C2SState, Elems) -> flush_stanzas(#{lserver := LServer} = C2SState, Elems) ->
lists:foldl( lists:foldl(
fun({_Type, Time, Stanza}, AccState) -> fun({Time, Stanza}, AccState) ->
Stanza1 = add_delay_info(Stanza, LServer, Time), Stanza1 = add_delay_info(Stanza, LServer, Time),
ejabberd_c2s:send(AccState, Stanza1) ejabberd_c2s:send(AccState, Stanza1)
end, C2SState, Elems). end, C2SState, Elems).
@ -379,48 +372,29 @@ get_pep_node(#message{} = Msg) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec queue_new() -> csi_queue(). -spec queue_new() -> csi_queue().
queue_new() -> queue_new() ->
{0, 0, #{}}. {0, #{}}.
-spec queue_in(term(), term(), term(), csi_queue()) -> csi_queue(). -spec queue_in(csi_key(), stanza(), csi_queue()) -> csi_queue().
queue_in(Key, Type, Val, {N, Seq, Q}) -> queue_in(Key, Stanza, {Seq, Q}) ->
Seq1 = Seq + 1, Seq1 = Seq + 1,
Time = {Seq1, p1_time_compat:timestamp()}, Time = {Seq1, p1_time_compat:timestamp()},
case maps:get(Key, Q, error) of Q1 = maps:put(Key, {Time, Stanza}, Q),
error -> {Seq1, Q1}.
Q1 = maps:put(Key, [{Type, Time, Val}], Q),
{N + 1, Seq1, Q1};
TypeVals ->
case lists:keymember(Type, 1, TypeVals) of
true ->
TypeVals1 = lists:keyreplace(
Type, 1, TypeVals, {Type, Time, Val}),
Q1 = maps:put(Key, TypeVals1, Q),
{N, Seq1, Q1};
false ->
TypeVals1 = [{Type, Time, Val}|TypeVals],
Q1 = maps:put(Key, TypeVals1, Q),
{N + 1, Seq1, Q1}
end
end.
-spec queue_take(term(), csi_queue()) -> {list(), csi_queue()} | error. -spec queue_take(jid(), csi_queue()) -> {[csi_element()], csi_queue()}.
queue_take(Key, {N, Seq, Q}) -> queue_take(#jid{luser = LUser, lserver = LServer}, {Seq, Q}) ->
case maps:get(Key, Q, error) of {Vals, Q1} = maps:fold(fun({{U, S, _}, _} = Key, Val, {AccVals, AccQ})
error -> when U == LUser, S == LServer ->
error; {[Val | AccVals], maps:remove(Key, AccQ)};
TypeVals -> (_, _, Acc) ->
Q1 = maps:remove(Key, Q), Acc
{lists:keysort(2, TypeVals), {N-length(TypeVals), Seq, Q1}} end, {[], Q}, Q),
end. {lists:keysort(1, Vals), {Seq, Q1}}.
-spec queue_len(csi_queue()) -> non_neg_integer(). -spec queue_len(csi_queue()) -> non_neg_integer().
queue_len({N, _, _}) -> queue_len({_, Q}) ->
N. maps:size(Q).
-spec queue_to_list(csi_queue()) -> [term()]. -spec queue_to_list(csi_queue()) -> [csi_element()].
queue_to_list({_, _, Q}) -> queue_to_list({_, Q}) ->
TypeVals = maps:fold( lists:keysort(1, maps:values(Q)).
fun(_, Vals, Acc) ->
Vals ++ Acc
end, [], Q),
lists:keysort(2, TypeVals).