mirror of
https://github.com/processone/ejabberd.git
synced 2024-11-22 16:20:52 +01:00
Merge remote-tracking branch 'processone/pr/1122'
* processone/pr/1122: mod_client_state: Add function specifications mod_client_state: Add "queue_pep" option mod_client_state: Queue chat state notifications Move CSI queue handling into mod_client_state
This commit is contained in:
commit
1b02c5fbf3
@ -104,7 +104,6 @@
|
|||||||
ip,
|
ip,
|
||||||
aux_fields = [],
|
aux_fields = [],
|
||||||
csi_state = active,
|
csi_state = active,
|
||||||
csi_queue = [],
|
|
||||||
mgmt_state,
|
mgmt_state,
|
||||||
mgmt_xmlns,
|
mgmt_xmlns,
|
||||||
mgmt_queue,
|
mgmt_queue,
|
||||||
@ -1147,7 +1146,7 @@ session_established({xmlstreamelement,
|
|||||||
#xmlel{name = <<"active">>,
|
#xmlel{name = <<"active">>,
|
||||||
attrs = [{<<"xmlns">>, ?NS_CLIENT_STATE}]}},
|
attrs = [{<<"xmlns">>, ?NS_CLIENT_STATE}]}},
|
||||||
StateData) ->
|
StateData) ->
|
||||||
NewStateData = csi_queue_flush(StateData),
|
NewStateData = csi_flush_queue(StateData),
|
||||||
fsm_next_state(session_established, NewStateData#state{csi_state = active});
|
fsm_next_state(session_established, NewStateData#state{csi_state = active});
|
||||||
session_established({xmlstreamelement,
|
session_established({xmlstreamelement,
|
||||||
#xmlel{name = <<"inactive">>,
|
#xmlel{name = <<"inactive">>,
|
||||||
@ -2763,7 +2762,7 @@ handle_resume(StateData, Attrs) ->
|
|||||||
#xmlel{name = <<"r">>,
|
#xmlel{name = <<"r">>,
|
||||||
attrs = [{<<"xmlns">>, AttrXmlns}],
|
attrs = [{<<"xmlns">>, AttrXmlns}],
|
||||||
children = []}),
|
children = []}),
|
||||||
FlushedState = csi_queue_flush(NewState),
|
FlushedState = csi_flush_queue(NewState),
|
||||||
NewStateData = FlushedState#state{csi_state = active},
|
NewStateData = FlushedState#state{csi_state = active},
|
||||||
?INFO_MSG("Resumed session for ~s",
|
?INFO_MSG("Resumed session for ~s",
|
||||||
[jid:to_string(NewStateData#state.jid)]),
|
[jid:to_string(NewStateData#state.jid)]),
|
||||||
@ -2995,7 +2994,6 @@ inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) ->
|
|||||||
privacy_list = OldStateData#state.privacy_list,
|
privacy_list = OldStateData#state.privacy_list,
|
||||||
aux_fields = OldStateData#state.aux_fields,
|
aux_fields = OldStateData#state.aux_fields,
|
||||||
csi_state = OldStateData#state.csi_state,
|
csi_state = OldStateData#state.csi_state,
|
||||||
csi_queue = OldStateData#state.csi_queue,
|
|
||||||
mgmt_xmlns = OldStateData#state.mgmt_xmlns,
|
mgmt_xmlns = OldStateData#state.mgmt_xmlns,
|
||||||
mgmt_queue = OldStateData#state.mgmt_queue,
|
mgmt_queue = OldStateData#state.mgmt_queue,
|
||||||
mgmt_timeout = OldStateData#state.mgmt_timeout,
|
mgmt_timeout = OldStateData#state.mgmt_timeout,
|
||||||
@ -3028,65 +3026,25 @@ add_resent_delay_info(#state{server = From}, El, Time) ->
|
|||||||
%%% XEP-0352
|
%%% XEP-0352
|
||||||
%%%----------------------------------------------------------------------
|
%%%----------------------------------------------------------------------
|
||||||
|
|
||||||
csi_filter_stanza(#state{csi_state = CsiState, jid = JID} = StateData,
|
csi_filter_stanza(#state{csi_state = CsiState, server = Server} = StateData,
|
||||||
Stanza) ->
|
Stanza) ->
|
||||||
Action = ejabberd_hooks:run_fold(csi_filter_stanza,
|
{StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_filter_stanza, Server,
|
||||||
StateData#state.server,
|
{StateData, [Stanza]},
|
||||||
send, [Stanza]),
|
[Server, Stanza]),
|
||||||
?DEBUG("Going to ~p stanza for inactive client ~p",
|
StateData2 = lists:foldl(fun(CurStanza, AccState) ->
|
||||||
[Action, jid:to_string(JID)]),
|
send_stanza(AccState, CurStanza)
|
||||||
case Action of
|
end, StateData1#state{csi_state = active},
|
||||||
queue -> csi_queue_add(StateData, Stanza);
|
Stanzas),
|
||||||
drop -> StateData;
|
StateData2#state{csi_state = CsiState}.
|
||||||
send ->
|
|
||||||
From = fxml:get_tag_attr_s(<<"from">>, Stanza),
|
|
||||||
StateData1 = csi_queue_send(StateData, From),
|
|
||||||
StateData2 = send_stanza(StateData1#state{csi_state = active},
|
|
||||||
Stanza),
|
|
||||||
StateData2#state{csi_state = CsiState}
|
|
||||||
end.
|
|
||||||
|
|
||||||
csi_queue_add(#state{csi_queue = Queue} = StateData, Stanza) ->
|
csi_flush_queue(#state{csi_state = CsiState, server = Server} = StateData) ->
|
||||||
case length(StateData#state.csi_queue) >= csi_max_queue(StateData) of
|
{StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_flush_queue, Server,
|
||||||
true -> csi_queue_add(csi_queue_flush(StateData), Stanza);
|
{StateData, []}, [Server]),
|
||||||
false ->
|
StateData2 = lists:foldl(fun(CurStanza, AccState) ->
|
||||||
From = fxml:get_tag_attr_s(<<"from">>, Stanza),
|
send_stanza(AccState, CurStanza)
|
||||||
NewQueue = lists:keystore(From, 1, Queue, {From, p1_time_compat:timestamp(), Stanza}),
|
end, StateData1#state{csi_state = active},
|
||||||
StateData#state{csi_queue = NewQueue}
|
Stanzas),
|
||||||
end.
|
StateData2#state{csi_state = CsiState}.
|
||||||
|
|
||||||
csi_queue_send(#state{csi_queue = Queue, csi_state = CsiState, server = Host} =
|
|
||||||
StateData, From) ->
|
|
||||||
case lists:keytake(From, 1, Queue) of
|
|
||||||
{value, {From, Time, Stanza}, NewQueue} ->
|
|
||||||
NewStanza = jlib:add_delay_info(Stanza, Host, Time,
|
|
||||||
<<"Client Inactive">>),
|
|
||||||
NewStateData = send_stanza(StateData#state{csi_state = active},
|
|
||||||
NewStanza),
|
|
||||||
NewStateData#state{csi_queue = NewQueue, csi_state = CsiState};
|
|
||||||
false -> StateData
|
|
||||||
end.
|
|
||||||
|
|
||||||
csi_queue_flush(#state{csi_queue = Queue, csi_state = CsiState, jid = JID,
|
|
||||||
server = Host} = StateData) ->
|
|
||||||
?DEBUG("Flushing CSI queue for ~s", [jid:to_string(JID)]),
|
|
||||||
NewStateData =
|
|
||||||
lists:foldl(fun({_From, Time, Stanza}, AccState) ->
|
|
||||||
NewStanza =
|
|
||||||
jlib:add_delay_info(Stanza, Host, Time,
|
|
||||||
<<"Client Inactive">>),
|
|
||||||
send_stanza(AccState, NewStanza)
|
|
||||||
end, StateData#state{csi_state = active}, Queue),
|
|
||||||
NewStateData#state{csi_queue = [], csi_state = CsiState}.
|
|
||||||
|
|
||||||
%% Make sure we won't push too many messages to the XEP-0198 queue when the
|
|
||||||
%% client becomes 'active' again. Otherwise, the client might not manage to
|
|
||||||
%% acknowledge the message flood in time. Also, don't let the queue grow to
|
|
||||||
%% more than 100 stanzas.
|
|
||||||
csi_max_queue(#state{mgmt_max_queue = infinity}) -> 100;
|
|
||||||
csi_max_queue(#state{mgmt_max_queue = Max}) when Max > 200 -> 100;
|
|
||||||
csi_max_queue(#state{mgmt_max_queue = Max}) when Max < 2 -> 1;
|
|
||||||
csi_max_queue(#state{mgmt_max_queue = Max}) -> Max div 2.
|
|
||||||
|
|
||||||
%%%----------------------------------------------------------------------
|
%%%----------------------------------------------------------------------
|
||||||
%%% JID Set memory footprint reduction code
|
%%% JID Set memory footprint reduction code
|
||||||
|
@ -30,22 +30,44 @@
|
|||||||
|
|
||||||
-behavior(gen_mod).
|
-behavior(gen_mod).
|
||||||
|
|
||||||
-export([start/2, stop/1, add_stream_feature/2,
|
%% gen_mod callbacks.
|
||||||
filter_presence/2, filter_chat_states/2,
|
-export([start/2, stop/1, mod_opt_type/1]).
|
||||||
mod_opt_type/1]).
|
|
||||||
|
%% ejabberd_hooks callbacks.
|
||||||
|
-export([filter_presence/3, filter_chat_states/3, filter_pep/3, filter_other/3,
|
||||||
|
flush_queue/2, add_stream_feature/2]).
|
||||||
|
|
||||||
-include("ejabberd.hrl").
|
-include("ejabberd.hrl").
|
||||||
-include("logger.hrl").
|
-include("logger.hrl").
|
||||||
-include("jlib.hrl").
|
-include("jlib.hrl").
|
||||||
|
|
||||||
|
-define(CSI_QUEUE_MAX, 100).
|
||||||
|
|
||||||
|
-type csi_type() :: presence | chatstate | {pep, binary(), binary()}.
|
||||||
|
-type csi_key() :: {ljid(), csi_type()}.
|
||||||
|
-type csi_stanza() :: {csi_key(), erlang:timestamp(), xmlel()}.
|
||||||
|
-type csi_queue() :: [csi_stanza()].
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% gen_mod callbacks.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec start(binary(), gen_mod:opts()) -> ok.
|
||||||
|
|
||||||
start(Host, Opts) ->
|
start(Host, Opts) ->
|
||||||
QueuePresence = gen_mod:get_opt(queue_presence, Opts,
|
QueuePresence =
|
||||||
|
gen_mod:get_opt(queue_presence, Opts,
|
||||||
fun(B) when is_boolean(B) -> B end,
|
fun(B) when is_boolean(B) -> B end,
|
||||||
true),
|
true),
|
||||||
DropChatStates = gen_mod:get_opt(drop_chat_states, Opts,
|
QueueChatStates =
|
||||||
|
gen_mod:get_opt(queue_chat_states, Opts,
|
||||||
fun(B) when is_boolean(B) -> B end,
|
fun(B) when is_boolean(B) -> B end,
|
||||||
true),
|
true),
|
||||||
if QueuePresence; DropChatStates ->
|
QueuePEP =
|
||||||
|
gen_mod:get_opt(queue_pep, Opts,
|
||||||
|
fun(B) when is_boolean(B) -> B end,
|
||||||
|
false),
|
||||||
|
if QueuePresence; QueueChatStates; QueuePEP ->
|
||||||
ejabberd_hooks:add(c2s_post_auth_features, Host, ?MODULE,
|
ejabberd_hooks:add(c2s_post_auth_features, Host, ?MODULE,
|
||||||
add_stream_feature, 50),
|
add_stream_feature, 50),
|
||||||
if QueuePresence ->
|
if QueuePresence ->
|
||||||
@ -53,23 +75,39 @@ start(Host, Opts) ->
|
|||||||
filter_presence, 50);
|
filter_presence, 50);
|
||||||
true -> ok
|
true -> ok
|
||||||
end,
|
end,
|
||||||
if DropChatStates ->
|
if QueueChatStates ->
|
||||||
ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE,
|
ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE,
|
||||||
filter_chat_states, 50);
|
filter_chat_states, 50);
|
||||||
true -> ok
|
true -> ok
|
||||||
end;
|
end,
|
||||||
|
if QueuePEP ->
|
||||||
|
ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE,
|
||||||
|
filter_pep, 50);
|
||||||
true -> ok
|
true -> ok
|
||||||
end,
|
end,
|
||||||
ok.
|
ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE,
|
||||||
|
filter_other, 100),
|
||||||
|
ejabberd_hooks:add(csi_flush_queue, Host, ?MODULE,
|
||||||
|
flush_queue, 50);
|
||||||
|
true -> ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec stop(binary()) -> ok.
|
||||||
|
|
||||||
stop(Host) ->
|
stop(Host) ->
|
||||||
QueuePresence = gen_mod:get_module_opt(Host, ?MODULE, queue_presence,
|
QueuePresence =
|
||||||
|
gen_mod:get_module_opt(Host, ?MODULE, queue_presence,
|
||||||
fun(B) when is_boolean(B) -> B end,
|
fun(B) when is_boolean(B) -> B end,
|
||||||
true),
|
true),
|
||||||
DropChatStates = gen_mod:get_module_opt(Host, ?MODULE, drop_chat_states,
|
QueueChatStates =
|
||||||
|
gen_mod:get_module_opt(Host, ?MODULE, queue_chat_states,
|
||||||
fun(B) when is_boolean(B) -> B end,
|
fun(B) when is_boolean(B) -> B end,
|
||||||
true),
|
true),
|
||||||
if QueuePresence; DropChatStates ->
|
QueuePEP =
|
||||||
|
gen_mod:get_module_opt(Host, ?MODULE, queue_pep,
|
||||||
|
fun(B) when is_boolean(B) -> B end,
|
||||||
|
false),
|
||||||
|
if QueuePresence; QueueChatStates; QueuePEP ->
|
||||||
ejabberd_hooks:delete(c2s_post_auth_features, Host, ?MODULE,
|
ejabberd_hooks:delete(c2s_post_auth_features, Host, ?MODULE,
|
||||||
add_stream_feature, 50),
|
add_stream_feature, 50),
|
||||||
if QueuePresence ->
|
if QueuePresence ->
|
||||||
@ -77,14 +115,95 @@ stop(Host) ->
|
|||||||
filter_presence, 50);
|
filter_presence, 50);
|
||||||
true -> ok
|
true -> ok
|
||||||
end,
|
end,
|
||||||
if DropChatStates ->
|
if QueueChatStates ->
|
||||||
ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE,
|
ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE,
|
||||||
filter_chat_states, 50);
|
filter_chat_states, 50);
|
||||||
true -> ok
|
true -> ok
|
||||||
end;
|
end,
|
||||||
|
if QueuePEP ->
|
||||||
|
ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE,
|
||||||
|
filter_pep, 50);
|
||||||
true -> ok
|
true -> ok
|
||||||
end,
|
end,
|
||||||
ok.
|
ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE,
|
||||||
|
filter_other, 100),
|
||||||
|
ejabberd_hooks:delete(csi_flush_queue, Host, ?MODULE,
|
||||||
|
flush_queue, 50);
|
||||||
|
true -> ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec mod_opt_type(atom()) -> fun((term()) -> term()) | [atom()].
|
||||||
|
|
||||||
|
mod_opt_type(queue_presence) ->
|
||||||
|
fun(B) when is_boolean(B) -> B end;
|
||||||
|
mod_opt_type(queue_chat_states) ->
|
||||||
|
fun(B) when is_boolean(B) -> B end;
|
||||||
|
mod_opt_type(queue_pep) ->
|
||||||
|
fun(B) when is_boolean(B) -> B end;
|
||||||
|
mod_opt_type(_) -> [queue_presence, queue_chat_states, queue_pep].
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% ejabberd_hooks callbacks.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec filter_presence({term(), [xmlel()]}, binary(), xmlel())
|
||||||
|
-> {term(), [xmlel()]} | {stop, {term(), [xmlel()]}}.
|
||||||
|
|
||||||
|
filter_presence({C2SState, _OutStanzas} = Acc, Host,
|
||||||
|
#xmlel{name = <<"presence">>, attrs = Attrs} = Stanza) ->
|
||||||
|
case fxml:get_attr(<<"type">>, Attrs) of
|
||||||
|
{value, Type} when Type /= <<"unavailable">> ->
|
||||||
|
Acc;
|
||||||
|
_ ->
|
||||||
|
?DEBUG("Got availability presence stanza", []),
|
||||||
|
queue_add(presence, Stanza, Host, C2SState)
|
||||||
|
end;
|
||||||
|
filter_presence(Acc, _Host, _Stanza) -> Acc.
|
||||||
|
|
||||||
|
-spec filter_chat_states({term(), [xmlel()]}, binary(), xmlel())
|
||||||
|
-> {term(), [xmlel()]} | {stop, {term(), [xmlel()]}}.
|
||||||
|
|
||||||
|
filter_chat_states({C2SState, _OutStanzas} = Acc, Host,
|
||||||
|
#xmlel{name = <<"message">>} = Stanza) ->
|
||||||
|
case jlib:is_standalone_chat_state(Stanza) of
|
||||||
|
true ->
|
||||||
|
?DEBUG("Got standalone chat state notification", []),
|
||||||
|
queue_add(chatstate, Stanza, Host, C2SState);
|
||||||
|
false ->
|
||||||
|
Acc
|
||||||
|
end;
|
||||||
|
filter_chat_states(Acc, _Host, _Stanza) -> Acc.
|
||||||
|
|
||||||
|
-spec filter_pep({term(), [xmlel()]}, binary(), xmlel())
|
||||||
|
-> {term(), [xmlel()]} | {stop, {term(), [xmlel()]}}.
|
||||||
|
|
||||||
|
filter_pep({C2SState, _OutStanzas} = Acc, Host,
|
||||||
|
#xmlel{name = <<"message">>} = Stanza) ->
|
||||||
|
case find_pep(Stanza) of
|
||||||
|
{value, Type} ->
|
||||||
|
?DEBUG("Got PEP notification", []),
|
||||||
|
queue_add(Type, Stanza, Host, C2SState);
|
||||||
|
false ->
|
||||||
|
Acc
|
||||||
|
end;
|
||||||
|
filter_pep(Acc, _Host, _Stanza) -> Acc.
|
||||||
|
|
||||||
|
-spec filter_other({term(), [xmlel()]}, binary(), xmlel())
|
||||||
|
-> {stop, {term(), [xmlel()]}}.
|
||||||
|
|
||||||
|
filter_other({C2SState, _OutStanzas}, Host, Stanza) ->
|
||||||
|
?DEBUG("Won't add stanza to CSI queue", []),
|
||||||
|
queue_take(Stanza, Host, C2SState).
|
||||||
|
|
||||||
|
-spec flush_queue({term(), [xmlel()]}, binary()) -> {term(), [xmlel()]}.
|
||||||
|
|
||||||
|
flush_queue({C2SState, _OutStanzas}, Host) ->
|
||||||
|
?DEBUG("Going to flush CSI queue", []),
|
||||||
|
Queue = get_queue(C2SState),
|
||||||
|
NewState = set_queue([], C2SState),
|
||||||
|
{NewState, get_stanzas(Queue, Host)}.
|
||||||
|
|
||||||
|
-spec add_stream_feature([xmlel()], binary) -> [xmlel()].
|
||||||
|
|
||||||
add_stream_feature(Features, _Host) ->
|
add_stream_feature(Features, _Host) ->
|
||||||
Feature = #xmlel{name = <<"csi">>,
|
Feature = #xmlel{name = <<"csi">>,
|
||||||
@ -92,30 +211,100 @@ add_stream_feature(Features, _Host) ->
|
|||||||
children = []},
|
children = []},
|
||||||
[Feature | Features].
|
[Feature | Features].
|
||||||
|
|
||||||
filter_presence(_Action, #xmlel{name = <<"presence">>, attrs = Attrs}) ->
|
%%--------------------------------------------------------------------
|
||||||
case fxml:get_attr(<<"type">>, Attrs) of
|
%% Internal functions.
|
||||||
{value, Type} when Type /= <<"unavailable">> ->
|
%%--------------------------------------------------------------------
|
||||||
?DEBUG("Got important presence stanza", []),
|
|
||||||
{stop, send};
|
-spec queue_add(csi_type(), xmlel(), binary(), term())
|
||||||
|
-> {stop, {term(), [xmlel()]}}.
|
||||||
|
|
||||||
|
queue_add(Type, Stanza, Host, C2SState) ->
|
||||||
|
case get_queue(C2SState) of
|
||||||
|
Queue when length(Queue) >= ?CSI_QUEUE_MAX ->
|
||||||
|
?DEBUG("CSI queue too large, going to flush it", []),
|
||||||
|
NewState = set_queue([], C2SState),
|
||||||
|
{stop, {NewState, get_stanzas(Queue, Host) ++ [Stanza]}};
|
||||||
|
Queue ->
|
||||||
|
?DEBUG("Adding stanza to CSI queue", []),
|
||||||
|
From = fxml:get_tag_attr_s(<<"from">>, Stanza),
|
||||||
|
Key = {jid:tolower(jid:from_string(From)), Type},
|
||||||
|
Entry = {Key, p1_time_compat:timestamp(), Stanza},
|
||||||
|
NewQueue = lists:keystore(Key, 1, Queue, Entry),
|
||||||
|
NewState = set_queue(NewQueue, C2SState),
|
||||||
|
{stop, {NewState, []}}
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec queue_take(xmlel(), binary(), term()) -> {stop, {term(), [xmlel()]}}.
|
||||||
|
|
||||||
|
queue_take(Stanza, Host, C2SState) ->
|
||||||
|
From = fxml:get_tag_attr_s(<<"from">>, Stanza),
|
||||||
|
{LUser, LServer, _LResource} = jid:tolower(jid:from_string(From)),
|
||||||
|
{Selected, Rest} = lists:partition(
|
||||||
|
fun({{{U, S, _R}, _Type}, _Time, _Stanza}) ->
|
||||||
|
U == LUser andalso S == LServer
|
||||||
|
end, get_queue(C2SState)),
|
||||||
|
NewState = set_queue(Rest, C2SState),
|
||||||
|
{stop, {NewState, get_stanzas(Selected, Host) ++ [Stanza]}}.
|
||||||
|
|
||||||
|
-spec set_queue(csi_queue(), term()) -> term().
|
||||||
|
|
||||||
|
set_queue(Queue, C2SState) ->
|
||||||
|
ejabberd_c2s:set_aux_field(csi_queue, Queue, C2SState).
|
||||||
|
|
||||||
|
-spec get_queue(term()) -> csi_queue().
|
||||||
|
|
||||||
|
get_queue(C2SState) ->
|
||||||
|
case ejabberd_c2s:get_aux_field(csi_queue, C2SState) of
|
||||||
|
{ok, Queue} ->
|
||||||
|
Queue;
|
||||||
|
error ->
|
||||||
|
[]
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec get_stanzas(csi_queue(), binary()) -> [xmlel()].
|
||||||
|
|
||||||
|
get_stanzas(Queue, Host) ->
|
||||||
|
lists:map(fun({_Key, Time, Stanza}) ->
|
||||||
|
jlib:add_delay_info(Stanza, Host, Time,
|
||||||
|
<<"Client Inactive">>)
|
||||||
|
end, Queue).
|
||||||
|
|
||||||
|
-spec find_pep(xmlel()) -> {pep, binary(), binary()} | false.
|
||||||
|
|
||||||
|
find_pep(#xmlel{name = <<"message">>} = Stanza) ->
|
||||||
|
From = fxml:get_tag_attr_s(<<"from">>, Stanza),
|
||||||
|
case jid:from_string(From) of
|
||||||
|
#jid{luser = <<>>} -> % It's not PEP.
|
||||||
|
false;
|
||||||
_ ->
|
_ ->
|
||||||
?DEBUG("Got availability presence stanza", []),
|
case fxml:get_subtag_with_xmlns(Stanza, <<"event">>,
|
||||||
{stop, queue}
|
?NS_PUBSUB_EVENT) of
|
||||||
end;
|
#xmlel{children = Els} ->
|
||||||
filter_presence(Action, _Stanza) -> Action.
|
get_pep_node_and_xmlns(fxml:remove_cdata(Els));
|
||||||
|
|
||||||
filter_chat_states(_Action, #xmlel{name = <<"message">>} = Stanza) ->
|
|
||||||
case jlib:is_standalone_chat_state(Stanza) of
|
|
||||||
true ->
|
|
||||||
?DEBUG("Got standalone chat state notification", []),
|
|
||||||
{stop, drop};
|
|
||||||
false ->
|
false ->
|
||||||
?DEBUG("Got message stanza", []),
|
false
|
||||||
{stop, send}
|
end
|
||||||
end;
|
end.
|
||||||
filter_chat_states(Action, _Stanza) -> Action.
|
|
||||||
|
|
||||||
mod_opt_type(drop_chat_states) ->
|
-spec get_pep_node_and_xmlns([xmlel()]) -> {pep, binary(), binary()} | false.
|
||||||
fun(B) when is_boolean(B) -> B end;
|
|
||||||
mod_opt_type(queue_presence) ->
|
get_pep_node_and_xmlns([#xmlel{name = <<"items">>, attrs = ItemsAttrs,
|
||||||
fun(B) when is_boolean(B) -> B end;
|
children = Item}]) ->
|
||||||
mod_opt_type(_) -> [drop_chat_states, queue_presence].
|
case {fxml:get_attr(<<"node">>, ItemsAttrs), fxml:remove_cdata(Item)} of
|
||||||
|
{{value, Node}, [#xmlel{name = <<"item">>, children = Payload}]} ->
|
||||||
|
case fxml:remove_cdata(Payload) of
|
||||||
|
[#xmlel{attrs = PayloadAttrs}] ->
|
||||||
|
case fxml:get_attr(<<"xmlns">>, PayloadAttrs) of
|
||||||
|
{value, XMLNS} ->
|
||||||
|
{value, {pep, Node, XMLNS}};
|
||||||
|
false ->
|
||||||
|
false
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
false
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
false
|
||||||
|
end;
|
||||||
|
get_pep_node_and_xmlns(_) ->
|
||||||
|
false.
|
||||||
|
@ -2255,15 +2255,46 @@ client_state_master(Config) ->
|
|||||||
ChatState = #message{to = Peer, thread = <<"1">>,
|
ChatState = #message{to = Peer, thread = <<"1">>,
|
||||||
sub_els = [#chatstate{type = active}]},
|
sub_els = [#chatstate{type = active}]},
|
||||||
Message = ChatState#message{body = [#text{data = <<"body">>}]},
|
Message = ChatState#message{body = [#text{data = <<"body">>}]},
|
||||||
|
PepPayload = xmpp_codec:encode(#presence{}),
|
||||||
|
PepOne = #message{
|
||||||
|
to = Peer,
|
||||||
|
sub_els =
|
||||||
|
[#pubsub_event{
|
||||||
|
items =
|
||||||
|
[#pubsub_event_items{
|
||||||
|
node = <<"foo-1">>,
|
||||||
|
items =
|
||||||
|
[#pubsub_event_item{
|
||||||
|
id = <<"pep-1">>,
|
||||||
|
xml_els = [PepPayload]}]}]}]},
|
||||||
|
PepTwo = #message{
|
||||||
|
to = Peer,
|
||||||
|
sub_els =
|
||||||
|
[#pubsub_event{
|
||||||
|
items =
|
||||||
|
[#pubsub_event_items{
|
||||||
|
node = <<"foo-2">>,
|
||||||
|
items =
|
||||||
|
[#pubsub_event_item{
|
||||||
|
id = <<"pep-2">>,
|
||||||
|
xml_els = [PepPayload]}]}]}]},
|
||||||
%% Wait for the slave to become inactive.
|
%% Wait for the slave to become inactive.
|
||||||
wait_for_slave(Config),
|
wait_for_slave(Config),
|
||||||
%% Should be dropped:
|
|
||||||
send(Config, ChatState),
|
|
||||||
%% Should be queued (but see below):
|
%% Should be queued (but see below):
|
||||||
send(Config, Presence),
|
send(Config, Presence),
|
||||||
%% Should replace the previous presence in the queue:
|
%% Should replace the previous presence in the queue:
|
||||||
send(Config, Presence#presence{type = unavailable}),
|
send(Config, Presence#presence{type = unavailable}),
|
||||||
%% Should be sent immediately, together with the previous presence:
|
%% The following two PEP stanzas should be queued (but see below):
|
||||||
|
send(Config, PepOne),
|
||||||
|
send(Config, PepTwo),
|
||||||
|
%% The following two PEP stanzas should replace the previous two:
|
||||||
|
send(Config, PepOne),
|
||||||
|
send(Config, PepTwo),
|
||||||
|
%% Should be queued (but see below):
|
||||||
|
send(Config, ChatState),
|
||||||
|
%% Should replace the previous chat state in the queue:
|
||||||
|
send(Config, ChatState#message{sub_els = [#chatstate{type = composing}]}),
|
||||||
|
%% Should be sent immediately, together with the queued stanzas:
|
||||||
send(Config, Message),
|
send(Config, Message),
|
||||||
%% Wait for the slave to become active.
|
%% Wait for the slave to become active.
|
||||||
wait_for_slave(Config),
|
wait_for_slave(Config),
|
||||||
@ -2277,6 +2308,31 @@ client_state_slave(Config) ->
|
|||||||
wait_for_master(Config),
|
wait_for_master(Config),
|
||||||
?recv1(#presence{from = Peer, type = unavailable,
|
?recv1(#presence{from = Peer, type = unavailable,
|
||||||
sub_els = [#delay{}]}),
|
sub_els = [#delay{}]}),
|
||||||
|
#message{
|
||||||
|
from = Peer,
|
||||||
|
sub_els =
|
||||||
|
[#pubsub_event{
|
||||||
|
items =
|
||||||
|
[#pubsub_event_items{
|
||||||
|
node = <<"foo-1">>,
|
||||||
|
items =
|
||||||
|
[#pubsub_event_item{
|
||||||
|
id = <<"pep-1">>}]}]},
|
||||||
|
#delay{}]} = recv(),
|
||||||
|
#message{
|
||||||
|
from = Peer,
|
||||||
|
sub_els =
|
||||||
|
[#pubsub_event{
|
||||||
|
items =
|
||||||
|
[#pubsub_event_items{
|
||||||
|
node = <<"foo-2">>,
|
||||||
|
items =
|
||||||
|
[#pubsub_event_item{
|
||||||
|
id = <<"pep-2">>}]}]},
|
||||||
|
#delay{}]} = recv(),
|
||||||
|
?recv1(#message{from = Peer, thread = <<"1">>,
|
||||||
|
sub_els = [#chatstate{type = composing},
|
||||||
|
#delay{}]}),
|
||||||
?recv1(#message{from = Peer, thread = <<"1">>,
|
?recv1(#message{from = Peer, thread = <<"1">>,
|
||||||
body = [#text{data = <<"body">>}],
|
body = [#text{data = <<"body">>}],
|
||||||
sub_els = [#chatstate{type = active}]}),
|
sub_els = [#chatstate{type = active}]}),
|
||||||
|
@ -213,8 +213,9 @@ Welcome to this XMPP server."
|
|||||||
db_type: internal
|
db_type: internal
|
||||||
mod_carboncopy: []
|
mod_carboncopy: []
|
||||||
mod_client_state:
|
mod_client_state:
|
||||||
drop_chat_states: true
|
|
||||||
queue_presence: true
|
queue_presence: true
|
||||||
|
queue_chat_states: true
|
||||||
|
queue_pep: true
|
||||||
mod_adhoc: []
|
mod_adhoc: []
|
||||||
mod_configure: []
|
mod_configure: []
|
||||||
mod_disco: []
|
mod_disco: []
|
||||||
@ -269,8 +270,9 @@ Welcome to this XMPP server."
|
|||||||
db_type: internal
|
db_type: internal
|
||||||
mod_carboncopy: []
|
mod_carboncopy: []
|
||||||
mod_client_state:
|
mod_client_state:
|
||||||
drop_chat_states: true
|
|
||||||
queue_presence: true
|
queue_presence: true
|
||||||
|
queue_chat_states: true
|
||||||
|
queue_pep: true
|
||||||
mod_adhoc: []
|
mod_adhoc: []
|
||||||
mod_configure: []
|
mod_configure: []
|
||||||
mod_disco: []
|
mod_disco: []
|
||||||
|
Loading…
Reference in New Issue
Block a user