diff --git a/src/mod_push.erl b/src/mod_push.erl index dd16e87fa..077556ffd 100644 --- a/src/mod_push.erl +++ b/src/mod_push.erl @@ -44,7 +44,7 @@ -export([get_commands_spec/0, delete_old_sessions/1]). %% API (used by mod_push_keepalive). --export([notify/2, notify/4, notify/6]). +-export([notify/2, notify/4, notify/6, is_message_with_body/1]). %% For IQ callbacks -export([delete_session/3]). @@ -388,7 +388,7 @@ c2s_session_pending(#{push_enabled := true, mgmt_queue := Queue} = State) -> case p1_queue:len(Queue) of Len when Len > 0 -> ?DEBUG("Notifying client of unacknowledged stanza(s)", []), - Pkt = queue_find(fun is_message_with_body/1, Queue), + Pkt = mod_stream_mgmt:queue_find(fun is_message_with_body/1, Queue), notify(State, Pkt), State; 0 -> @@ -462,6 +462,15 @@ notify(LServer, PushLJID, Node, XData, Pkt, HandleResponse) -> sub_els = [PubSub]}, ejabberd_router:route_iq(IQ, HandleResponse). +%%-------------------------------------------------------------------- +%% Miscellaneous. +%%-------------------------------------------------------------------- +-spec is_message_with_body(stanza()) -> boolean(). +is_message_with_body(#message{} = Msg) -> + get_body_text(Msg) /= none; +is_message_with_body(_Stanza) -> + false. + %%-------------------------------------------------------------------- %% Internal functions. %%-------------------------------------------------------------------- @@ -583,21 +592,6 @@ drop_online_sessions(LUser, LServer, Clients) -> [Client || {TS, _, _, _} = Client <- Clients, lists:keyfind(TS, 1, SessIDs) == false]. --spec queue_find(fun((stanza()) -> boolean()), p1_queue:queue()) - -> stanza() | none. -queue_find(Pred, Queue) -> - case p1_queue:out(Queue) of - {{value, {_, _, Pkt}}, Queue1} -> - case Pred(Pkt) of - true -> - Pkt; - false -> - queue_find(Pred, Queue1) - end; - {empty, _Queue1} -> - none - end. - -spec make_summary(binary(), xmpp_element() | xmlel() | none) -> xdata() | undefined. make_summary(Host, #message{from = From} = Pkt) -> @@ -630,12 +624,6 @@ make_summary(Host, #message{from = From} = Pkt) -> make_summary(_Host, _Pkt) -> undefined. --spec is_message_with_body(stanza()) -> boolean(). -is_message_with_body(#message{} = Msg) -> - get_body_text(Msg) /= none; -is_message_with_body(_Stanza) -> - false. - -spec get_body_text(message()) -> binary() | none. get_body_text(#message{body = Body} = Msg) -> case xmpp:get_text(Body) of diff --git a/src/mod_push_keepalive.erl b/src/mod_push_keepalive.erl index 7c1815c02..1122d84e1 100644 --- a/src/mod_push_keepalive.erl +++ b/src/mod_push_keepalive.erl @@ -130,18 +130,24 @@ unregister_hooks(Host) -> %%-------------------------------------------------------------------- -spec c2s_stanza(c2s_state(), xmpp_element() | xmlel(), term()) -> c2s_state(). c2s_stanza(#{push_enabled := true, mgmt_state := pending} = State, - _Pkt, _SendResult) -> - maybe_restore_resume_timeout(State); + Pkt, _SendResult) -> + case mod_push:is_message_with_body(Pkt) of + true -> + maybe_restore_resume_timeout(State); + false -> + State + end; c2s_stanza(State, _Pkt, _SendResult) -> State. -spec c2s_session_pending(c2s_state()) -> c2s_state(). c2s_session_pending(#{push_enabled := true, mgmt_queue := Queue} = State) -> - case p1_queue:len(Queue) of - 0 -> + case mod_stream_mgmt:queue_find(fun mod_push:is_message_with_body/1, + Queue) of + none -> State1 = maybe_adjust_resume_timeout(State), maybe_start_wakeup_timer(State1); - _ -> + _Msg -> State end; c2s_session_pending(State) -> diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl index a8aeaaef0..5ebd16cb0 100644 --- a/src/mod_stream_mgmt.erl +++ b/src/mod_stream_mgmt.erl @@ -33,8 +33,8 @@ c2s_unbinded_packet/2, c2s_closed/2, c2s_terminated/2, c2s_handle_send/3, c2s_handle_info/2, c2s_handle_call/3, c2s_handle_recv/3]). -%% adjust pending session timeout --export([get_resume_timeout/1, set_resume_timeout/2]). +%% adjust pending session timeout / access queue +-export([get_resume_timeout/1, set_resume_timeout/2, queue_find/2]). -include("ejabberd.hrl"). -include("xmpp.hrl"). @@ -304,7 +304,7 @@ c2s_terminated(State, _Reason) -> State. %%%=================================================================== -%%% Adjust pending session timeout +%%% Adjust pending session timeout / access queue %%%=================================================================== -spec get_resume_timeout(state()) -> non_neg_integer(). get_resume_timeout(#{mgmt_timeout := Timeout}) -> @@ -317,6 +317,21 @@ set_resume_timeout(State, Timeout) -> State1 = restart_pending_timer(State, Timeout), State1#{mgmt_timeout => Timeout}. +-spec queue_find(fun((stanza()) -> boolean()), p1_queue:queue()) + -> stanza() | none. +queue_find(Pred, Queue) -> + case p1_queue:out(Queue) of + {{value, {_, _, Pkt}}, Queue1} -> + case Pred(Pkt) of + true -> + Pkt; + false -> + queue_find(Pred, Queue1) + end; + {empty, _Queue1} -> + none + end. + %%%=================================================================== %%% Internal functions %%%===================================================================