diff --git a/include/mod_muc_room.hrl b/include/mod_muc_room.hrl index 46eb149bb..2f09e5a5c 100644 --- a/include/mod_muc_room.hrl +++ b/include/mod_muc_room.hrl @@ -24,7 +24,7 @@ -record(lqueue, { - queue = p1_queue:new() :: p1_queue:queue(), + queue = p1_queue:new() :: p1_queue:queue(lqueue_elem()), max = 0 :: integer() }). @@ -118,7 +118,7 @@ just_created = erlang:system_time(microsecond) :: true | integer(), activity = treap:empty() :: treap:treap(), room_shaper = none :: ejabberd_shaper:shaper(), - room_queue :: p1_queue:queue() | undefined + room_queue :: p1_queue:queue({message | presence, jid()}) | undefined }). -type users() :: #{ljid() => #user{}}. diff --git a/rebar.config b/rebar.config index f81e0821c..9809c28b6 100644 --- a/rebar.config +++ b/rebar.config @@ -19,7 +19,7 @@ %%%---------------------------------------------------------------------- {deps, [{lager, ".*", {git, "https://github.com/erlang-lager/lager", "3.6.10"}}, - {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", {tag, "1.0.15"}}}, + {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", "2887223"}}, {cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "1.0.19"}}}, {fast_tls, ".*", {git, "https://github.com/processone/fast_tls", {tag, "1.1.1"}}}, {stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.16"}}}, diff --git a/src/ejabberd_redis.erl b/src/ejabberd_redis.erl index 18a73414b..a9ceb1833 100644 --- a/src/ejabberd_redis.erl +++ b/src/ejabberd_redis.erl @@ -55,8 +55,9 @@ -record(state, {connection :: pid() | undefined, num :: pos_integer(), subscriptions = #{} :: subscriptions(), - pending_q :: p1_queue:queue()}). + pending_q :: queue()}). +-type queue() :: p1_queue:queue({{pid(), term()}, integer()}). -type subscriptions() :: #{binary() => [pid()]}. -type error_reason() :: binary() | timeout | disconnected | overloaded. -type redis_error() :: {error, error_reason()}. @@ -592,7 +593,7 @@ fsm_limit_opts() -> get_queue_type() -> ejabberd_option:redis_queue_type(). --spec flush_queue(p1_queue:queue()) -> p1_queue:queue(). +-spec flush_queue(queue()) -> queue(). flush_queue(Q) -> CurrTime = erlang:monotonic_time(millisecond), p1_queue:dropwhile( @@ -605,7 +606,7 @@ flush_queue(Q) -> true end, Q). --spec clean_queue(p1_queue:queue(), integer()) -> p1_queue:queue(). +-spec clean_queue(queue(), integer()) -> queue(). clean_queue(Q, CurrTime) -> Q1 = p1_queue:dropwhile( fun({_From, Time}) -> diff --git a/src/mod_mqtt_session.erl b/src/mod_mqtt_session.erl index 9d90ab0a8..dd7a7c47f 100644 --- a/src/mod_mqtt_session.erl +++ b/src/mod_mqtt_session.erl @@ -46,7 +46,7 @@ id = 0 :: non_neg_integer(), in_flight :: undefined | publish() | pubrel(), codec :: mqtt_codec:state(), - queue :: undefined | p1_queue:queue(), + queue :: undefined | p1_queue:queue(publish()), tls :: boolean()}). -type acks() :: #{non_neg_integer() => pubrec()}. diff --git a/src/mod_muc_room.erl b/src/mod_muc_room.erl index 415a7f5de..bbbd3a2ec 100644 --- a/src/mod_muc_room.erl +++ b/src/mod_muc_room.erl @@ -1540,7 +1540,7 @@ get_service_max_users(StateData) -> get_max_users_admin_threshold(StateData) -> mod_muc_opt:max_users_admin_threshold(StateData#state.server_host). --spec room_queue_new(binary(), ejabberd_shaper:shaper(), _) -> p1_queue:queue(). +-spec room_queue_new(binary(), ejabberd_shaper:shaper(), _) -> p1_queue:queue({message | presence, jid()}) | undefined. room_queue_new(ServerHost, Shaper, QueueType) -> HaveRoomShaper = Shaper /= none, HaveMessageShaper = mod_muc_opt:user_message_shaper(ServerHost) /= none, @@ -2121,7 +2121,7 @@ get_history(Nick, Packet, #state{history = History}) -> p1_queue:to_list(History#lqueue.queue) end. --spec filter_history(p1_queue:queue(), erlang:timestamp(), +-spec filter_history(p1_queue:queue(lqueue_elem()), erlang:timestamp(), binary(), muc_history()) -> [lqueue_elem()]. filter_history(Queue, Now, Nick, #muc_history{since = Since, @@ -2516,7 +2516,7 @@ lqueue_in(Item, #lqueue{queue = Q1, max = Max}) -> true -> #lqueue{queue = Q2, max = Max} end. --spec lqueue_cut(p1_queue:queue(), non_neg_integer()) -> p1_queue:queue(). +-spec lqueue_cut(p1_queue:queue(lqueue_elem()), non_neg_integer()) -> p1_queue:queue(lqueue_elem()). lqueue_cut(Q, 0) -> Q; lqueue_cut(Q, N) -> {_, Q1} = p1_queue:out(Q), diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl index 09af0c7cf..c151807ff 100644 --- a/src/mod_stream_mgmt.erl +++ b/src/mod_stream_mgmt.erl @@ -50,6 +50,7 @@ is_record(Pkt, sm_r)). -type state() :: ejabberd_c2s:state(). +-type queue() :: p1_queue:queue({non_neg_integer(), erlang:timestamp(), xmpp_element() | xmlel()}). -type error_reason() :: session_not_found | session_timed_out | session_is_dead | session_has_exited | session_was_killed | session_copy_timed_out | @@ -298,7 +299,7 @@ set_resume_timeout(State, Timeout) -> State1 = restart_pending_timer(State, Timeout), State1#{mgmt_timeout => Timeout}. --spec queue_find(fun((stanza()) -> boolean()), p1_queue:queue()) +-spec queue_find(fun((stanza()) -> boolean()), queue()) -> stanza() | none. queue_find(Pred, Queue) -> case p1_queue:out(Queue) of