From ba74c1c367cba56feb31e896bec3c07645bde8f6 Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Tue, 17 May 2016 19:27:18 +0200 Subject: [PATCH 1/4] Move CSI queue handling into mod_client_state Let mod_client_state handle the queueing of stanzas, not just their classification. This simplifies the ejabberd_c2s code and gives (custom) CSI modules more flexibility. --- src/ejabberd_c2s.erl | 80 +++++++---------------------- src/mod_client_state.erl | 107 ++++++++++++++++++++++++++++++--------- 2 files changed, 103 insertions(+), 84 deletions(-) diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 080880bec..5e30d5ffc 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -104,7 +104,6 @@ ip, aux_fields = [], csi_state = active, - csi_queue = [], mgmt_state, mgmt_xmlns, mgmt_queue, @@ -1147,7 +1146,7 @@ session_established({xmlstreamelement, #xmlel{name = <<"active">>, attrs = [{<<"xmlns">>, ?NS_CLIENT_STATE}]}}, StateData) -> - NewStateData = csi_queue_flush(StateData), + NewStateData = csi_flush_queue(StateData), fsm_next_state(session_established, NewStateData#state{csi_state = active}); session_established({xmlstreamelement, #xmlel{name = <<"inactive">>, @@ -2763,7 +2762,7 @@ handle_resume(StateData, Attrs) -> #xmlel{name = <<"r">>, attrs = [{<<"xmlns">>, AttrXmlns}], children = []}), - FlushedState = csi_queue_flush(NewState), + FlushedState = csi_flush_queue(NewState), NewStateData = FlushedState#state{csi_state = active}, ?INFO_MSG("Resumed session for ~s", [jid:to_string(NewStateData#state.jid)]), @@ -2995,7 +2994,6 @@ inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) -> privacy_list = OldStateData#state.privacy_list, aux_fields = OldStateData#state.aux_fields, csi_state = OldStateData#state.csi_state, - csi_queue = OldStateData#state.csi_queue, mgmt_xmlns = OldStateData#state.mgmt_xmlns, mgmt_queue = OldStateData#state.mgmt_queue, mgmt_timeout = OldStateData#state.mgmt_timeout, @@ -3028,65 +3026,25 @@ add_resent_delay_info(#state{server = From}, El, Time) -> %%% XEP-0352 %%%---------------------------------------------------------------------- -csi_filter_stanza(#state{csi_state = CsiState, jid = JID} = StateData, +csi_filter_stanza(#state{csi_state = CsiState, server = Server} = StateData, Stanza) -> - Action = ejabberd_hooks:run_fold(csi_filter_stanza, - StateData#state.server, - send, [Stanza]), - ?DEBUG("Going to ~p stanza for inactive client ~p", - [Action, jid:to_string(JID)]), - case Action of - queue -> csi_queue_add(StateData, Stanza); - drop -> StateData; - send -> - From = fxml:get_tag_attr_s(<<"from">>, Stanza), - StateData1 = csi_queue_send(StateData, From), - StateData2 = send_stanza(StateData1#state{csi_state = active}, - Stanza), - StateData2#state{csi_state = CsiState} - end. + {StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_filter_stanza, Server, + {StateData, [Stanza]}, + [Server, Stanza]), + StateData2 = lists:foldl(fun(CurStanza, AccState) -> + send_stanza(AccState, CurStanza) + end, StateData1#state{csi_state = active}, + Stanzas), + StateData2#state{csi_state = CsiState}. -csi_queue_add(#state{csi_queue = Queue} = StateData, Stanza) -> - case length(StateData#state.csi_queue) >= csi_max_queue(StateData) of - true -> csi_queue_add(csi_queue_flush(StateData), Stanza); - false -> - From = fxml:get_tag_attr_s(<<"from">>, Stanza), - NewQueue = lists:keystore(From, 1, Queue, {From, p1_time_compat:timestamp(), Stanza}), - StateData#state{csi_queue = NewQueue} - end. - -csi_queue_send(#state{csi_queue = Queue, csi_state = CsiState, server = Host} = - StateData, From) -> - case lists:keytake(From, 1, Queue) of - {value, {From, Time, Stanza}, NewQueue} -> - NewStanza = jlib:add_delay_info(Stanza, Host, Time, - <<"Client Inactive">>), - NewStateData = send_stanza(StateData#state{csi_state = active}, - NewStanza), - NewStateData#state{csi_queue = NewQueue, csi_state = CsiState}; - false -> StateData - end. - -csi_queue_flush(#state{csi_queue = Queue, csi_state = CsiState, jid = JID, - server = Host} = StateData) -> - ?DEBUG("Flushing CSI queue for ~s", [jid:to_string(JID)]), - NewStateData = - lists:foldl(fun({_From, Time, Stanza}, AccState) -> - NewStanza = - jlib:add_delay_info(Stanza, Host, Time, - <<"Client Inactive">>), - send_stanza(AccState, NewStanza) - end, StateData#state{csi_state = active}, Queue), - NewStateData#state{csi_queue = [], csi_state = CsiState}. - -%% Make sure we won't push too many messages to the XEP-0198 queue when the -%% client becomes 'active' again. Otherwise, the client might not manage to -%% acknowledge the message flood in time. Also, don't let the queue grow to -%% more than 100 stanzas. -csi_max_queue(#state{mgmt_max_queue = infinity}) -> 100; -csi_max_queue(#state{mgmt_max_queue = Max}) when Max > 200 -> 100; -csi_max_queue(#state{mgmt_max_queue = Max}) when Max < 2 -> 1; -csi_max_queue(#state{mgmt_max_queue = Max}) -> Max div 2. +csi_flush_queue(#state{csi_state = CsiState, server = Server} = StateData) -> + {StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_flush_queue, Server, + {StateData, []}, [Server]), + StateData2 = lists:foldl(fun(CurStanza, AccState) -> + send_stanza(AccState, CurStanza) + end, StateData1#state{csi_state = active}, + Stanzas), + StateData2#state{csi_state = CsiState}. %%%---------------------------------------------------------------------- %%% JID Set memory footprint reduction code diff --git a/src/mod_client_state.erl b/src/mod_client_state.erl index 790e808f1..7d23beb0d 100644 --- a/src/mod_client_state.erl +++ b/src/mod_client_state.erl @@ -31,20 +31,22 @@ -behavior(gen_mod). -export([start/2, stop/1, add_stream_feature/2, - filter_presence/2, filter_chat_states/2, + filter_presence/3, filter_chat_states/3, filter_other/3, flush_queue/2, mod_opt_type/1]). -include("ejabberd.hrl"). -include("logger.hrl"). -include("jlib.hrl"). +-define(CSI_QUEUE_MAX, 100). + start(Host, Opts) -> QueuePresence = gen_mod:get_opt(queue_presence, Opts, fun(B) when is_boolean(B) -> B end, true), DropChatStates = gen_mod:get_opt(drop_chat_states, Opts, - fun(B) when is_boolean(B) -> B end, - true), + fun(B) when is_boolean(B) -> B end, + true), if QueuePresence; DropChatStates -> ejabberd_hooks:add(c2s_post_auth_features, Host, ?MODULE, add_stream_feature, 50), @@ -57,10 +59,13 @@ start(Host, Opts) -> ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE, filter_chat_states, 50); true -> ok - end; + end, + ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE, + filter_other, 100), + ejabberd_hooks:add(csi_flush_queue, Host, ?MODULE, + flush_queue, 50); true -> ok - end, - ok. + end. stop(Host) -> QueuePresence = gen_mod:get_module_opt(Host, ?MODULE, queue_presence, @@ -81,10 +86,13 @@ stop(Host) -> ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE, filter_chat_states, 50); true -> ok - end; + end, + ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE, + filter_other, 100), + ejabberd_hooks:delete(csi_flush_queue, Host, ?MODULE, + flush_queue, 50); true -> ok - end, - ok. + end. add_stream_feature(Features, _Host) -> Feature = #xmlel{name = <<"csi">>, @@ -92,30 +100,83 @@ add_stream_feature(Features, _Host) -> children = []}, [Feature | Features]. -filter_presence(_Action, #xmlel{name = <<"presence">>, attrs = Attrs}) -> +filter_presence({C2SState, _OutStanzas} = Acc, Host, + #xmlel{name = <<"presence">>, attrs = Attrs} = Stanza) -> case fxml:get_attr(<<"type">>, Attrs) of {value, Type} when Type /= <<"unavailable">> -> - ?DEBUG("Got important presence stanza", []), - {stop, send}; + Acc; _ -> ?DEBUG("Got availability presence stanza", []), - {stop, queue} + queue_add(presence, Stanza, Host, C2SState) end; -filter_presence(Action, _Stanza) -> Action. +filter_presence(Acc, _Host, _Stanza) -> Acc. -filter_chat_states(_Action, #xmlel{name = <<"message">>} = Stanza) -> +filter_chat_states({C2SState, _OutStanzas} = Acc, _Host, + #xmlel{name = <<"message">>} = Stanza) -> case jlib:is_standalone_chat_state(Stanza) of - true -> + true -> % Drop the stanza. ?DEBUG("Got standalone chat state notification", []), - {stop, drop}; + {stop, {C2SState, []}}; false -> - ?DEBUG("Got message stanza", []), - {stop, send} + Acc end; -filter_chat_states(Action, _Stanza) -> Action. +filter_chat_states(Acc, _Host, _Stanza) -> Acc. + +filter_other({C2SState, _OutStanzas}, Host, Stanza) -> + ?DEBUG("Won't add stanza to CSI queue", []), + queue_take(Stanza, Host, C2SState). + +flush_queue({C2SState, _OutStanzas}, Host) -> + ?DEBUG("Going to flush CSI queue", []), + Queue = get_queue(C2SState), + NewState = set_queue([], C2SState), + {stop, {NewState, get_stanzas(Queue, Host)}}. + +queue_add(Type, Stanza, Host, C2SState) -> + case get_queue(C2SState) of + Queue when length(Queue) >= ?CSI_QUEUE_MAX -> + ?DEBUG("CSI queue too large, going to flush it", []), + NewState = set_queue([], C2SState), + {stop, {NewState, get_stanzas(Queue, Host) ++ [Stanza]}}; + Queue -> + ?DEBUG("Adding stanza to CSI queue", []), + From = fxml:get_tag_attr_s(<<"from">>, Stanza), + Key = {jid:tolower(jid:from_string(From)), Type}, + Entry = {Key, p1_time_compat:timestamp(), Stanza}, + NewQueue = lists:keystore(Key, 1, Queue, Entry), + NewState = set_queue(NewQueue, C2SState), + {stop, {NewState, []}} + end. + +queue_take(Stanza, Host, C2SState) -> + From = fxml:get_tag_attr_s(<<"from">>, Stanza), + {LUser, LServer, _LResource} = jid:tolower(jid:from_string(From)), + {Selected, Rest} = lists:partition( + fun({{{U, S, _R}, _Type}, _Time, _Stanza}) -> + U == LUser andalso S == LServer + end, get_queue(C2SState)), + NewState = set_queue(Rest, C2SState), + {stop, {NewState, get_stanzas(Selected, Host) ++ [Stanza]}}. + +set_queue(Queue, C2SState) -> + ejabberd_c2s:set_aux_field(csi_queue, Queue, C2SState). + +get_queue(C2SState) -> + case ejabberd_c2s:get_aux_field(csi_queue, C2SState) of + {ok, Queue} -> + Queue; + error -> + [] + end. + +get_stanzas(Queue, Host) -> + lists:map(fun({_Key, Time, Stanza}) -> + jlib:add_delay_info(Stanza, Host, Time, + <<"Client Inactive">>) + end, Queue). -mod_opt_type(drop_chat_states) -> - fun(B) when is_boolean(B) -> B end; mod_opt_type(queue_presence) -> fun(B) when is_boolean(B) -> B end; -mod_opt_type(_) -> [drop_chat_states, queue_presence]. +mod_opt_type(drop_chat_states) -> + fun(B) when is_boolean(B) -> B end; +mod_opt_type(_) -> [queue_presence, drop_chat_states]. From 4f009e64fc11ca352443e6a9df5fa4d7cb715f5c Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Tue, 17 May 2016 20:55:45 +0200 Subject: [PATCH 2/4] mod_client_state: Queue chat state notifications Queue standalone chat states instead of simply dropping them when the client is inactive. Only the most recent chat state of a given client is queued. --- src/mod_client_state.erl | 30 +++++++++++++-------------- test/ejabberd_SUITE.erl | 11 +++++++--- test/ejabberd_SUITE_data/ejabberd.yml | 4 ++-- 3 files changed, 25 insertions(+), 20 deletions(-) diff --git a/src/mod_client_state.erl b/src/mod_client_state.erl index 7d23beb0d..f51a7fd24 100644 --- a/src/mod_client_state.erl +++ b/src/mod_client_state.erl @@ -44,10 +44,10 @@ start(Host, Opts) -> QueuePresence = gen_mod:get_opt(queue_presence, Opts, fun(B) when is_boolean(B) -> B end, true), - DropChatStates = gen_mod:get_opt(drop_chat_states, Opts, - fun(B) when is_boolean(B) -> B end, - true), - if QueuePresence; DropChatStates -> + QueueChatStates = gen_mod:get_opt(queue_chat_states, Opts, + fun(B) when is_boolean(B) -> B end, + true), + if QueuePresence; QueueChatStates -> ejabberd_hooks:add(c2s_post_auth_features, Host, ?MODULE, add_stream_feature, 50), if QueuePresence -> @@ -55,7 +55,7 @@ start(Host, Opts) -> filter_presence, 50); true -> ok end, - if DropChatStates -> + if QueueChatStates -> ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE, filter_chat_states, 50); true -> ok @@ -71,10 +71,10 @@ stop(Host) -> QueuePresence = gen_mod:get_module_opt(Host, ?MODULE, queue_presence, fun(B) when is_boolean(B) -> B end, true), - DropChatStates = gen_mod:get_module_opt(Host, ?MODULE, drop_chat_states, - fun(B) when is_boolean(B) -> B end, - true), - if QueuePresence; DropChatStates -> + QueueChatStates = gen_mod:get_module_opt(Host, ?MODULE, queue_chat_states, + fun(B) when is_boolean(B) -> B end, + true), + if QueuePresence; QueueChatStates -> ejabberd_hooks:delete(c2s_post_auth_features, Host, ?MODULE, add_stream_feature, 50), if QueuePresence -> @@ -82,7 +82,7 @@ stop(Host) -> filter_presence, 50); true -> ok end, - if DropChatStates -> + if QueueChatStates -> ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE, filter_chat_states, 50); true -> ok @@ -111,12 +111,12 @@ filter_presence({C2SState, _OutStanzas} = Acc, Host, end; filter_presence(Acc, _Host, _Stanza) -> Acc. -filter_chat_states({C2SState, _OutStanzas} = Acc, _Host, +filter_chat_states({C2SState, _OutStanzas} = Acc, Host, #xmlel{name = <<"message">>} = Stanza) -> case jlib:is_standalone_chat_state(Stanza) of - true -> % Drop the stanza. + true -> ?DEBUG("Got standalone chat state notification", []), - {stop, {C2SState, []}}; + queue_add(chatstate, Stanza, Host, C2SState); false -> Acc end; @@ -177,6 +177,6 @@ get_stanzas(Queue, Host) -> mod_opt_type(queue_presence) -> fun(B) when is_boolean(B) -> B end; -mod_opt_type(drop_chat_states) -> +mod_opt_type(queue_chat_states) -> fun(B) when is_boolean(B) -> B end; -mod_opt_type(_) -> [queue_presence, drop_chat_states]. +mod_opt_type(_) -> [queue_presence, queue_chat_states]. diff --git a/test/ejabberd_SUITE.erl b/test/ejabberd_SUITE.erl index aa465fb66..f3f7ebde3 100644 --- a/test/ejabberd_SUITE.erl +++ b/test/ejabberd_SUITE.erl @@ -2257,13 +2257,15 @@ client_state_master(Config) -> Message = ChatState#message{body = [#text{data = <<"body">>}]}, %% Wait for the slave to become inactive. wait_for_slave(Config), - %% Should be dropped: - send(Config, ChatState), %% Should be queued (but see below): send(Config, Presence), %% Should replace the previous presence in the queue: send(Config, Presence#presence{type = unavailable}), - %% Should be sent immediately, together with the previous presence: + %% Should be queued (but see below): + send(Config, ChatState), + %% Should replace the previous chat state in the queue: + send(Config, ChatState#message{sub_els = [#chatstate{type = composing}]}), + %% Should be sent immediately, together with the queued stanzas: send(Config, Message), %% Wait for the slave to become active. wait_for_slave(Config), @@ -2277,6 +2279,9 @@ client_state_slave(Config) -> wait_for_master(Config), ?recv1(#presence{from = Peer, type = unavailable, sub_els = [#delay{}]}), + ?recv1(#message{from = Peer, thread = <<"1">>, + sub_els = [#chatstate{type = composing}, + #delay{}]}), ?recv1(#message{from = Peer, thread = <<"1">>, body = [#text{data = <<"body">>}], sub_els = [#chatstate{type = active}]}), diff --git a/test/ejabberd_SUITE_data/ejabberd.yml b/test/ejabberd_SUITE_data/ejabberd.yml index 869c24c7a..30fff88fc 100644 --- a/test/ejabberd_SUITE_data/ejabberd.yml +++ b/test/ejabberd_SUITE_data/ejabberd.yml @@ -213,8 +213,8 @@ Welcome to this XMPP server." db_type: internal mod_carboncopy: [] mod_client_state: - drop_chat_states: true queue_presence: true + queue_chat_states: true mod_adhoc: [] mod_configure: [] mod_disco: [] @@ -269,8 +269,8 @@ Welcome to this XMPP server." db_type: internal mod_carboncopy: [] mod_client_state: - drop_chat_states: true queue_presence: true + queue_chat_states: true mod_adhoc: [] mod_configure: [] mod_disco: [] From 8f72c27b88cd83e50879caf80d8d2546fd19ef2d Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Tue, 17 May 2016 22:12:04 +0200 Subject: [PATCH 3/4] mod_client_state: Add "queue_pep" option If the new "queue_pep" option is enabled and the client is inactive, PEP notifications are throttled in a similar way to presence stanzas and chat states. Only the most recent notification of a given node and payload type will be queued from a given contact. --- src/mod_client_state.erl | 105 +++++++++++++++++++++----- test/ejabberd_SUITE.erl | 51 +++++++++++++ test/ejabberd_SUITE_data/ejabberd.yml | 2 + 3 files changed, 141 insertions(+), 17 deletions(-) diff --git a/src/mod_client_state.erl b/src/mod_client_state.erl index f51a7fd24..4e5938f06 100644 --- a/src/mod_client_state.erl +++ b/src/mod_client_state.erl @@ -31,8 +31,8 @@ -behavior(gen_mod). -export([start/2, stop/1, add_stream_feature/2, - filter_presence/3, filter_chat_states/3, filter_other/3, flush_queue/2, - mod_opt_type/1]). + filter_presence/3, filter_chat_states/3, filter_pep/3, filter_other/3, + flush_queue/2, mod_opt_type/1]). -include("ejabberd.hrl"). -include("logger.hrl"). @@ -41,13 +41,19 @@ -define(CSI_QUEUE_MAX, 100). start(Host, Opts) -> - QueuePresence = gen_mod:get_opt(queue_presence, Opts, - fun(B) when is_boolean(B) -> B end, - true), - QueueChatStates = gen_mod:get_opt(queue_chat_states, Opts, - fun(B) when is_boolean(B) -> B end, - true), - if QueuePresence; QueueChatStates -> + QueuePresence = + gen_mod:get_opt(queue_presence, Opts, + fun(B) when is_boolean(B) -> B end, + true), + QueueChatStates = + gen_mod:get_opt(queue_chat_states, Opts, + fun(B) when is_boolean(B) -> B end, + true), + QueuePEP = + gen_mod:get_opt(queue_pep, Opts, + fun(B) when is_boolean(B) -> B end, + false), + if QueuePresence; QueueChatStates; QueuePEP -> ejabberd_hooks:add(c2s_post_auth_features, Host, ?MODULE, add_stream_feature, 50), if QueuePresence -> @@ -60,6 +66,11 @@ start(Host, Opts) -> filter_chat_states, 50); true -> ok end, + if QueuePEP -> + ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE, + filter_pep, 50); + true -> ok + end, ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE, filter_other, 100), ejabberd_hooks:add(csi_flush_queue, Host, ?MODULE, @@ -68,13 +79,19 @@ start(Host, Opts) -> end. stop(Host) -> - QueuePresence = gen_mod:get_module_opt(Host, ?MODULE, queue_presence, - fun(B) when is_boolean(B) -> B end, - true), - QueueChatStates = gen_mod:get_module_opt(Host, ?MODULE, queue_chat_states, - fun(B) when is_boolean(B) -> B end, - true), - if QueuePresence; QueueChatStates -> + QueuePresence = + gen_mod:get_module_opt(Host, ?MODULE, queue_presence, + fun(B) when is_boolean(B) -> B end, + true), + QueueChatStates = + gen_mod:get_module_opt(Host, ?MODULE, queue_chat_states, + fun(B) when is_boolean(B) -> B end, + true), + QueuePEP = + gen_mod:get_module_opt(Host, ?MODULE, queue_pep, + fun(B) when is_boolean(B) -> B end, + false), + if QueuePresence; QueueChatStates; QueuePEP -> ejabberd_hooks:delete(c2s_post_auth_features, Host, ?MODULE, add_stream_feature, 50), if QueuePresence -> @@ -87,6 +104,11 @@ stop(Host) -> filter_chat_states, 50); true -> ok end, + if QueuePEP -> + ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE, + filter_pep, 50); + true -> ok + end, ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE, filter_other, 100), ejabberd_hooks:delete(csi_flush_queue, Host, ?MODULE, @@ -122,6 +144,17 @@ filter_chat_states({C2SState, _OutStanzas} = Acc, Host, end; filter_chat_states(Acc, _Host, _Stanza) -> Acc. +filter_pep({C2SState, _OutStanzas} = Acc, Host, + #xmlel{name = <<"message">>} = Stanza) -> + case find_pep(Stanza) of + {value, Type} -> + ?DEBUG("Got PEP notification", []), + queue_add(Type, Stanza, Host, C2SState); + false -> + Acc + end; +filter_pep(Acc, _Host, _Stanza) -> Acc. + filter_other({C2SState, _OutStanzas}, Host, Stanza) -> ?DEBUG("Won't add stanza to CSI queue", []), queue_take(Stanza, Host, C2SState). @@ -132,6 +165,42 @@ flush_queue({C2SState, _OutStanzas}, Host) -> NewState = set_queue([], C2SState), {stop, {NewState, get_stanzas(Queue, Host)}}. +find_pep(#xmlel{name = <<"message">>} = Stanza) -> + From = fxml:get_tag_attr_s(<<"from">>, Stanza), + case jid:from_string(From) of + #jid{luser = <<>>} -> % It's not PEP. + false; + _ -> + case fxml:get_subtag_with_xmlns(Stanza, <<"event">>, + ?NS_PUBSUB_EVENT) of + #xmlel{children = Els} -> + get_pep_node_and_xmlns(fxml:remove_cdata(Els)); + false -> + false + end + end. + +get_pep_node_and_xmlns([#xmlel{name = <<"items">>, attrs = ItemsAttrs, + children = Item}]) -> + case {fxml:get_attr(<<"node">>, ItemsAttrs), fxml:remove_cdata(Item)} of + {{value, Node}, [#xmlel{name = <<"item">>, children = Payload}]} -> + case fxml:remove_cdata(Payload) of + [#xmlel{attrs = PayloadAttrs}] -> + case fxml:get_attr(<<"xmlns">>, PayloadAttrs) of + {value, XMLNS} -> + {value, {Node, XMLNS}}; + false -> + false + end; + _ -> + false + end; + _ -> + false + end; +get_pep_node_and_xmlns(_) -> + false. + queue_add(Type, Stanza, Host, C2SState) -> case get_queue(C2SState) of Queue when length(Queue) >= ?CSI_QUEUE_MAX -> @@ -179,4 +248,6 @@ mod_opt_type(queue_presence) -> fun(B) when is_boolean(B) -> B end; mod_opt_type(queue_chat_states) -> fun(B) when is_boolean(B) -> B end; -mod_opt_type(_) -> [queue_presence, queue_chat_states]. +mod_opt_type(queue_pep) -> + fun(B) when is_boolean(B) -> B end; +mod_opt_type(_) -> [queue_presence, queue_chat_states, queue_pep]. diff --git a/test/ejabberd_SUITE.erl b/test/ejabberd_SUITE.erl index f3f7ebde3..800d5ebf3 100644 --- a/test/ejabberd_SUITE.erl +++ b/test/ejabberd_SUITE.erl @@ -2255,12 +2255,41 @@ client_state_master(Config) -> ChatState = #message{to = Peer, thread = <<"1">>, sub_els = [#chatstate{type = active}]}, Message = ChatState#message{body = [#text{data = <<"body">>}]}, + PepPayload = xmpp_codec:encode(#presence{}), + PepOne = #message{ + to = Peer, + sub_els = + [#pubsub_event{ + items = + [#pubsub_event_items{ + node = <<"foo-1">>, + items = + [#pubsub_event_item{ + id = <<"pep-1">>, + xml_els = [PepPayload]}]}]}]}, + PepTwo = #message{ + to = Peer, + sub_els = + [#pubsub_event{ + items = + [#pubsub_event_items{ + node = <<"foo-2">>, + items = + [#pubsub_event_item{ + id = <<"pep-2">>, + xml_els = [PepPayload]}]}]}]}, %% Wait for the slave to become inactive. wait_for_slave(Config), %% Should be queued (but see below): send(Config, Presence), %% Should replace the previous presence in the queue: send(Config, Presence#presence{type = unavailable}), + %% The following two PEP stanzas should be queued (but see below): + send(Config, PepOne), + send(Config, PepTwo), + %% The following two PEP stanzas should replace the previous two: + send(Config, PepOne), + send(Config, PepTwo), %% Should be queued (but see below): send(Config, ChatState), %% Should replace the previous chat state in the queue: @@ -2279,6 +2308,28 @@ client_state_slave(Config) -> wait_for_master(Config), ?recv1(#presence{from = Peer, type = unavailable, sub_els = [#delay{}]}), + #message{ + from = Peer, + sub_els = + [#pubsub_event{ + items = + [#pubsub_event_items{ + node = <<"foo-1">>, + items = + [#pubsub_event_item{ + id = <<"pep-1">>}]}]}, + #delay{}]} = recv(), + #message{ + from = Peer, + sub_els = + [#pubsub_event{ + items = + [#pubsub_event_items{ + node = <<"foo-2">>, + items = + [#pubsub_event_item{ + id = <<"pep-2">>}]}]}, + #delay{}]} = recv(), ?recv1(#message{from = Peer, thread = <<"1">>, sub_els = [#chatstate{type = composing}, #delay{}]}), diff --git a/test/ejabberd_SUITE_data/ejabberd.yml b/test/ejabberd_SUITE_data/ejabberd.yml index 30fff88fc..1adbcce8a 100644 --- a/test/ejabberd_SUITE_data/ejabberd.yml +++ b/test/ejabberd_SUITE_data/ejabberd.yml @@ -215,6 +215,7 @@ Welcome to this XMPP server." mod_client_state: queue_presence: true queue_chat_states: true + queue_pep: true mod_adhoc: [] mod_configure: [] mod_disco: [] @@ -271,6 +272,7 @@ Welcome to this XMPP server." mod_client_state: queue_presence: true queue_chat_states: true + queue_pep: true mod_adhoc: [] mod_configure: [] mod_disco: [] From 420ae655900103ef2125e0a0b4bc59ac94aea227 Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Wed, 18 May 2016 21:30:38 +0200 Subject: [PATCH 4/4] mod_client_state: Add function specifications Add function specifications and apply cosmetic changes to mod_client_state. --- src/mod_client_state.erl | 157 ++++++++++++++++++++++++++------------- 1 file changed, 107 insertions(+), 50 deletions(-) diff --git a/src/mod_client_state.erl b/src/mod_client_state.erl index 4e5938f06..0d3a289a3 100644 --- a/src/mod_client_state.erl +++ b/src/mod_client_state.erl @@ -30,9 +30,12 @@ -behavior(gen_mod). --export([start/2, stop/1, add_stream_feature/2, - filter_presence/3, filter_chat_states/3, filter_pep/3, filter_other/3, - flush_queue/2, mod_opt_type/1]). +%% gen_mod callbacks. +-export([start/2, stop/1, mod_opt_type/1]). + +%% ejabberd_hooks callbacks. +-export([filter_presence/3, filter_chat_states/3, filter_pep/3, filter_other/3, + flush_queue/2, add_stream_feature/2]). -include("ejabberd.hrl"). -include("logger.hrl"). @@ -40,6 +43,17 @@ -define(CSI_QUEUE_MAX, 100). +-type csi_type() :: presence | chatstate | {pep, binary(), binary()}. +-type csi_key() :: {ljid(), csi_type()}. +-type csi_stanza() :: {csi_key(), erlang:timestamp(), xmlel()}. +-type csi_queue() :: [csi_stanza()]. + +%%-------------------------------------------------------------------- +%% gen_mod callbacks. +%%-------------------------------------------------------------------- + +-spec start(binary(), gen_mod:opts()) -> ok. + start(Host, Opts) -> QueuePresence = gen_mod:get_opt(queue_presence, Opts, @@ -78,6 +92,8 @@ start(Host, Opts) -> true -> ok end. +-spec stop(binary()) -> ok. + stop(Host) -> QueuePresence = gen_mod:get_module_opt(Host, ?MODULE, queue_presence, @@ -116,11 +132,22 @@ stop(Host) -> true -> ok end. -add_stream_feature(Features, _Host) -> - Feature = #xmlel{name = <<"csi">>, - attrs = [{<<"xmlns">>, ?NS_CLIENT_STATE}], - children = []}, - [Feature | Features]. +-spec mod_opt_type(atom()) -> fun((term()) -> term()) | [atom()]. + +mod_opt_type(queue_presence) -> + fun(B) when is_boolean(B) -> B end; +mod_opt_type(queue_chat_states) -> + fun(B) when is_boolean(B) -> B end; +mod_opt_type(queue_pep) -> + fun(B) when is_boolean(B) -> B end; +mod_opt_type(_) -> [queue_presence, queue_chat_states, queue_pep]. + +%%-------------------------------------------------------------------- +%% ejabberd_hooks callbacks. +%%-------------------------------------------------------------------- + +-spec filter_presence({term(), [xmlel()]}, binary(), xmlel()) + -> {term(), [xmlel()]} | {stop, {term(), [xmlel()]}}. filter_presence({C2SState, _OutStanzas} = Acc, Host, #xmlel{name = <<"presence">>, attrs = Attrs} = Stanza) -> @@ -133,6 +160,9 @@ filter_presence({C2SState, _OutStanzas} = Acc, Host, end; filter_presence(Acc, _Host, _Stanza) -> Acc. +-spec filter_chat_states({term(), [xmlel()]}, binary(), xmlel()) + -> {term(), [xmlel()]} | {stop, {term(), [xmlel()]}}. + filter_chat_states({C2SState, _OutStanzas} = Acc, Host, #xmlel{name = <<"message">>} = Stanza) -> case jlib:is_standalone_chat_state(Stanza) of @@ -144,6 +174,9 @@ filter_chat_states({C2SState, _OutStanzas} = Acc, Host, end; filter_chat_states(Acc, _Host, _Stanza) -> Acc. +-spec filter_pep({term(), [xmlel()]}, binary(), xmlel()) + -> {term(), [xmlel()]} | {stop, {term(), [xmlel()]}}. + filter_pep({C2SState, _OutStanzas} = Acc, Host, #xmlel{name = <<"message">>} = Stanza) -> case find_pep(Stanza) of @@ -155,51 +188,35 @@ filter_pep({C2SState, _OutStanzas} = Acc, Host, end; filter_pep(Acc, _Host, _Stanza) -> Acc. +-spec filter_other({term(), [xmlel()]}, binary(), xmlel()) + -> {stop, {term(), [xmlel()]}}. + filter_other({C2SState, _OutStanzas}, Host, Stanza) -> ?DEBUG("Won't add stanza to CSI queue", []), queue_take(Stanza, Host, C2SState). +-spec flush_queue({term(), [xmlel()]}, binary()) -> {term(), [xmlel()]}. + flush_queue({C2SState, _OutStanzas}, Host) -> ?DEBUG("Going to flush CSI queue", []), Queue = get_queue(C2SState), NewState = set_queue([], C2SState), - {stop, {NewState, get_stanzas(Queue, Host)}}. + {NewState, get_stanzas(Queue, Host)}. -find_pep(#xmlel{name = <<"message">>} = Stanza) -> - From = fxml:get_tag_attr_s(<<"from">>, Stanza), - case jid:from_string(From) of - #jid{luser = <<>>} -> % It's not PEP. - false; - _ -> - case fxml:get_subtag_with_xmlns(Stanza, <<"event">>, - ?NS_PUBSUB_EVENT) of - #xmlel{children = Els} -> - get_pep_node_and_xmlns(fxml:remove_cdata(Els)); - false -> - false - end - end. +-spec add_stream_feature([xmlel()], binary) -> [xmlel()]. -get_pep_node_and_xmlns([#xmlel{name = <<"items">>, attrs = ItemsAttrs, - children = Item}]) -> - case {fxml:get_attr(<<"node">>, ItemsAttrs), fxml:remove_cdata(Item)} of - {{value, Node}, [#xmlel{name = <<"item">>, children = Payload}]} -> - case fxml:remove_cdata(Payload) of - [#xmlel{attrs = PayloadAttrs}] -> - case fxml:get_attr(<<"xmlns">>, PayloadAttrs) of - {value, XMLNS} -> - {value, {Node, XMLNS}}; - false -> - false - end; - _ -> - false - end; - _ -> - false - end; -get_pep_node_and_xmlns(_) -> - false. +add_stream_feature(Features, _Host) -> + Feature = #xmlel{name = <<"csi">>, + attrs = [{<<"xmlns">>, ?NS_CLIENT_STATE}], + children = []}, + [Feature | Features]. + +%%-------------------------------------------------------------------- +%% Internal functions. +%%-------------------------------------------------------------------- + +-spec queue_add(csi_type(), xmlel(), binary(), term()) + -> {stop, {term(), [xmlel()]}}. queue_add(Type, Stanza, Host, C2SState) -> case get_queue(C2SState) of @@ -217,6 +234,8 @@ queue_add(Type, Stanza, Host, C2SState) -> {stop, {NewState, []}} end. +-spec queue_take(xmlel(), binary(), term()) -> {stop, {term(), [xmlel()]}}. + queue_take(Stanza, Host, C2SState) -> From = fxml:get_tag_attr_s(<<"from">>, Stanza), {LUser, LServer, _LResource} = jid:tolower(jid:from_string(From)), @@ -227,9 +246,13 @@ queue_take(Stanza, Host, C2SState) -> NewState = set_queue(Rest, C2SState), {stop, {NewState, get_stanzas(Selected, Host) ++ [Stanza]}}. +-spec set_queue(csi_queue(), term()) -> term(). + set_queue(Queue, C2SState) -> ejabberd_c2s:set_aux_field(csi_queue, Queue, C2SState). +-spec get_queue(term()) -> csi_queue(). + get_queue(C2SState) -> case ejabberd_c2s:get_aux_field(csi_queue, C2SState) of {ok, Queue} -> @@ -238,16 +261,50 @@ get_queue(C2SState) -> [] end. +-spec get_stanzas(csi_queue(), binary()) -> [xmlel()]. + get_stanzas(Queue, Host) -> lists:map(fun({_Key, Time, Stanza}) -> jlib:add_delay_info(Stanza, Host, Time, <<"Client Inactive">>) end, Queue). -mod_opt_type(queue_presence) -> - fun(B) when is_boolean(B) -> B end; -mod_opt_type(queue_chat_states) -> - fun(B) when is_boolean(B) -> B end; -mod_opt_type(queue_pep) -> - fun(B) when is_boolean(B) -> B end; -mod_opt_type(_) -> [queue_presence, queue_chat_states, queue_pep]. +-spec find_pep(xmlel()) -> {pep, binary(), binary()} | false. + +find_pep(#xmlel{name = <<"message">>} = Stanza) -> + From = fxml:get_tag_attr_s(<<"from">>, Stanza), + case jid:from_string(From) of + #jid{luser = <<>>} -> % It's not PEP. + false; + _ -> + case fxml:get_subtag_with_xmlns(Stanza, <<"event">>, + ?NS_PUBSUB_EVENT) of + #xmlel{children = Els} -> + get_pep_node_and_xmlns(fxml:remove_cdata(Els)); + false -> + false + end + end. + +-spec get_pep_node_and_xmlns([xmlel()]) -> {pep, binary(), binary()} | false. + +get_pep_node_and_xmlns([#xmlel{name = <<"items">>, attrs = ItemsAttrs, + children = Item}]) -> + case {fxml:get_attr(<<"node">>, ItemsAttrs), fxml:remove_cdata(Item)} of + {{value, Node}, [#xmlel{name = <<"item">>, children = Payload}]} -> + case fxml:remove_cdata(Payload) of + [#xmlel{attrs = PayloadAttrs}] -> + case fxml:get_attr(<<"xmlns">>, PayloadAttrs) of + {value, XMLNS} -> + {value, {pep, Node, XMLNS}}; + false -> + false + end; + _ -> + false + end; + _ -> + false + end; +get_pep_node_and_xmlns(_) -> + false.