diff --git a/src/mod_mqtt.erl b/src/mod_mqtt.erl index c76d910a4..e0e0a58e3 100644 --- a/src/mod_mqtt.erl +++ b/src/mod_mqtt.erl @@ -204,7 +204,7 @@ mod_options(Host) -> [{match_retained_limit, 1000}, {max_topic_depth, 8}, {max_topic_aliases, 100}, - {session_expiry, 300}, + {session_expiry, timer:minutes(5)}, {max_queue, 5000}, {access_subscribe, []}, {access_publish, []}, @@ -219,7 +219,9 @@ mod_options(Host) -> mod_opt_type(max_queue) -> econf:pos_int(unlimited); mod_opt_type(session_expiry) -> - econf:non_neg_int(); + econf:either( + econf:int(0, 0), + econf:timeout(second)); mod_opt_type(match_retained_limit) -> econf:pos_int(infinity); mod_opt_type(max_topic_depth) -> diff --git a/src/mod_mqtt_session.erl b/src/mod_mqtt_session.erl index 0ac5be895..1c5a44649 100644 --- a/src/mod_mqtt_session.erl +++ b/src/mod_mqtt_session.erl @@ -36,9 +36,9 @@ peername :: undefined | peername(), timeout = infinity :: timer(), jid :: undefined | jid:jid(), - session_expiry = 0 :: seconds(), + session_expiry = 0 :: milli_seconds(), will :: undefined | publish(), - will_delay = 0 :: seconds(), + will_delay = 0 :: milli_seconds(), stop_reason :: undefined | error_reason(), acks = #{} :: acks(), subscriptions = #{} :: subscriptions(), @@ -178,7 +178,7 @@ handle_call({get_state, Pid}, From, State) -> ?DEBUG("Transferring MQTT session state to ~p at ~s", [Pid, node(Pid)]), Q1 = p1_queue:file_to_ram(State1#state.queue), p1_server:reply(From, {ok, State1#state{queue = Q1}}), - SessionExpiry = timer:seconds(State1#state.session_expiry), + SessionExpiry = State1#state.session_expiry, State2 = set_timeout(State1, min(SessionExpiry, ?RELAY_TIMEOUT)), State3 = State2#state{queue = undefined, stop_reason = {resumed, Pid}, @@ -348,7 +348,7 @@ handle_packet(#disconnect{code = Code, properties = Props}, Reason = maps:get(reason_string, Props, <<>>), Expiry = case maps:get(session_expiry_interval, Props, undefined) of undefined -> State#state.session_expiry; - SE -> min(SE, session_expiry(Server)) + SE -> min(timer:seconds(SE), session_expiry(Server)) end, State1 = State#state{session_expiry = Expiry}, State2 = case Code of @@ -407,13 +407,12 @@ stop(#state{session_expiry = SessExp} = State, Reason) -> State2 = if WillDelay == 0 -> publish_will(State1); WillDelay < SessExp -> - erlang:start_timer( - timer:seconds(WillDelay), self(), publish_will), + erlang:start_timer(WillDelay, self(), publish_will), State1; true -> State1 end, - State3 = set_timeout(State2, timer:seconds(SessExp)), + State3 = set_timeout(State2, SessExp), State4 = State3#state{stop_reason = Reason}, noreply(State4) end. @@ -656,7 +655,7 @@ set_session_properties(#state{version = Version, properties = Props} = Pkt) -> SEMin = case CleanStart of false when Version == ?MQTT_VERSION_4 -> infinity; - _ -> maps:get(session_expiry_interval, Props, 0) + _ -> timer:seconds(maps:get(session_expiry_interval, Props, 0)) end, SEConfig = session_expiry(Server), State1 = State#state{session_expiry = min(SEMin, SEConfig)}, @@ -671,7 +670,7 @@ set_will_properties(State, #connect{will = #publish{} = Will, Ret -> Ret end, State#state{will = Will#publish{properties = Props1}, - will_delay = WillDelay}; + will_delay = timer:seconds(WillDelay)}; set_will_properties(State, _) -> State. @@ -683,7 +682,7 @@ get_connack_properties(#state{session_expiry = SessExp, jid = JID}, <<>> -> #{assigned_client_identifier => JID#jid.lresource}; _ -> #{} end, - Props1#{session_expiry_interval => SessExp, + Props1#{session_expiry_interval => SessExp div 1000, shared_subscription_available => false, topic_alias_maximum => topic_alias_maximum(JID#jid.lserver), server_keep_alive => KeepAlive}. @@ -1072,7 +1071,7 @@ queue_type(Host) -> queue_limit(Host) -> mod_mqtt_opt:max_queue(Host). --spec session_expiry(binary()) -> seconds(). +-spec session_expiry(binary()) -> milli_seconds(). session_expiry(Host) -> mod_mqtt_opt:session_expiry(Host).