25
1
mirror of https://github.com/processone/ejabberd.git synced 2024-12-22 17:28:25 +01:00

Allow flexible timeout format in session_expiry option of mod_mqtt

This commit is contained in:
Evgeny Khramtsov 2019-07-17 22:58:14 +03:00
parent d718b35d46
commit dd301306d3
2 changed files with 14 additions and 13 deletions

View File

@ -204,7 +204,7 @@ mod_options(Host) ->
[{match_retained_limit, 1000}, [{match_retained_limit, 1000},
{max_topic_depth, 8}, {max_topic_depth, 8},
{max_topic_aliases, 100}, {max_topic_aliases, 100},
{session_expiry, 300}, {session_expiry, timer:minutes(5)},
{max_queue, 5000}, {max_queue, 5000},
{access_subscribe, []}, {access_subscribe, []},
{access_publish, []}, {access_publish, []},
@ -219,7 +219,9 @@ mod_options(Host) ->
mod_opt_type(max_queue) -> mod_opt_type(max_queue) ->
econf:pos_int(unlimited); econf:pos_int(unlimited);
mod_opt_type(session_expiry) -> mod_opt_type(session_expiry) ->
econf:non_neg_int(); econf:either(
econf:int(0, 0),
econf:timeout(second));
mod_opt_type(match_retained_limit) -> mod_opt_type(match_retained_limit) ->
econf:pos_int(infinity); econf:pos_int(infinity);
mod_opt_type(max_topic_depth) -> mod_opt_type(max_topic_depth) ->

View File

@ -36,9 +36,9 @@
peername :: undefined | peername(), peername :: undefined | peername(),
timeout = infinity :: timer(), timeout = infinity :: timer(),
jid :: undefined | jid:jid(), jid :: undefined | jid:jid(),
session_expiry = 0 :: seconds(), session_expiry = 0 :: milli_seconds(),
will :: undefined | publish(), will :: undefined | publish(),
will_delay = 0 :: seconds(), will_delay = 0 :: milli_seconds(),
stop_reason :: undefined | error_reason(), stop_reason :: undefined | error_reason(),
acks = #{} :: acks(), acks = #{} :: acks(),
subscriptions = #{} :: subscriptions(), subscriptions = #{} :: subscriptions(),
@ -178,7 +178,7 @@ handle_call({get_state, Pid}, From, State) ->
?DEBUG("Transferring MQTT session state to ~p at ~s", [Pid, node(Pid)]), ?DEBUG("Transferring MQTT session state to ~p at ~s", [Pid, node(Pid)]),
Q1 = p1_queue:file_to_ram(State1#state.queue), Q1 = p1_queue:file_to_ram(State1#state.queue),
p1_server:reply(From, {ok, State1#state{queue = Q1}}), p1_server:reply(From, {ok, State1#state{queue = Q1}}),
SessionExpiry = timer:seconds(State1#state.session_expiry), SessionExpiry = State1#state.session_expiry,
State2 = set_timeout(State1, min(SessionExpiry, ?RELAY_TIMEOUT)), State2 = set_timeout(State1, min(SessionExpiry, ?RELAY_TIMEOUT)),
State3 = State2#state{queue = undefined, State3 = State2#state{queue = undefined,
stop_reason = {resumed, Pid}, stop_reason = {resumed, Pid},
@ -348,7 +348,7 @@ handle_packet(#disconnect{code = Code, properties = Props},
Reason = maps:get(reason_string, Props, <<>>), Reason = maps:get(reason_string, Props, <<>>),
Expiry = case maps:get(session_expiry_interval, Props, undefined) of Expiry = case maps:get(session_expiry_interval, Props, undefined) of
undefined -> State#state.session_expiry; undefined -> State#state.session_expiry;
SE -> min(SE, session_expiry(Server)) SE -> min(timer:seconds(SE), session_expiry(Server))
end, end,
State1 = State#state{session_expiry = Expiry}, State1 = State#state{session_expiry = Expiry},
State2 = case Code of State2 = case Code of
@ -407,13 +407,12 @@ stop(#state{session_expiry = SessExp} = State, Reason) ->
State2 = if WillDelay == 0 -> State2 = if WillDelay == 0 ->
publish_will(State1); publish_will(State1);
WillDelay < SessExp -> WillDelay < SessExp ->
erlang:start_timer( erlang:start_timer(WillDelay, self(), publish_will),
timer:seconds(WillDelay), self(), publish_will),
State1; State1;
true -> true ->
State1 State1
end, end,
State3 = set_timeout(State2, timer:seconds(SessExp)), State3 = set_timeout(State2, SessExp),
State4 = State3#state{stop_reason = Reason}, State4 = State3#state{stop_reason = Reason},
noreply(State4) noreply(State4)
end. end.
@ -656,7 +655,7 @@ set_session_properties(#state{version = Version,
properties = Props} = Pkt) -> properties = Props} = Pkt) ->
SEMin = case CleanStart of SEMin = case CleanStart of
false when Version == ?MQTT_VERSION_4 -> infinity; false when Version == ?MQTT_VERSION_4 -> infinity;
_ -> maps:get(session_expiry_interval, Props, 0) _ -> timer:seconds(maps:get(session_expiry_interval, Props, 0))
end, end,
SEConfig = session_expiry(Server), SEConfig = session_expiry(Server),
State1 = State#state{session_expiry = min(SEMin, SEConfig)}, State1 = State#state{session_expiry = min(SEMin, SEConfig)},
@ -671,7 +670,7 @@ set_will_properties(State, #connect{will = #publish{} = Will,
Ret -> Ret Ret -> Ret
end, end,
State#state{will = Will#publish{properties = Props1}, State#state{will = Will#publish{properties = Props1},
will_delay = WillDelay}; will_delay = timer:seconds(WillDelay)};
set_will_properties(State, _) -> set_will_properties(State, _) ->
State. State.
@ -683,7 +682,7 @@ get_connack_properties(#state{session_expiry = SessExp, jid = JID},
<<>> -> #{assigned_client_identifier => JID#jid.lresource}; <<>> -> #{assigned_client_identifier => JID#jid.lresource};
_ -> #{} _ -> #{}
end, end,
Props1#{session_expiry_interval => SessExp, Props1#{session_expiry_interval => SessExp div 1000,
shared_subscription_available => false, shared_subscription_available => false,
topic_alias_maximum => topic_alias_maximum(JID#jid.lserver), topic_alias_maximum => topic_alias_maximum(JID#jid.lserver),
server_keep_alive => KeepAlive}. server_keep_alive => KeepAlive}.
@ -1072,7 +1071,7 @@ queue_type(Host) ->
queue_limit(Host) -> queue_limit(Host) ->
mod_mqtt_opt:max_queue(Host). mod_mqtt_opt:max_queue(Host).
-spec session_expiry(binary()) -> seconds(). -spec session_expiry(binary()) -> milli_seconds().
session_expiry(Host) -> session_expiry(Host) ->
mod_mqtt_opt:session_expiry(Host). mod_mqtt_opt:session_expiry(Host).