This commit is contained in:
Holger Weiß 2022-06-26 21:51:27 +02:00 committed by GitHub
commit 047e930f5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 124 additions and 5 deletions

View File

@ -70,7 +70,7 @@
{stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.28"}}},
{if_var_true, stun,
{stun, ".*", {git, "https://github.com/processone/stun", {tag, "1.2.2"}}}},
{xmpp, ".*", {git, "https://github.com/processone/xmpp", {tag, "1.5.8"}}},
{xmpp, ".*", {git, "https://github.com/weiss/xmpp", {branch, "feature/send-queue"}}},
{yconf, ".*", {git, "https://github.com/processone/yconf", {tag, "1.0.13"}}}
]}.

View File

@ -555,6 +555,18 @@ init([State, Opts]) ->
TLSVerify = proplists:get_bool(tls_verify, Opts),
Zlib = proplists:get_bool(zlib, Opts),
Timeout = ejabberd_option:negotiation_timeout(),
MaxQSize = case ejabberd_option:c2s_max_send_queue_size() of
undefined ->
proplists:get_value(max_send_queue_size, Opts, 10);
C2SMaxQSize ->
C2SMaxQSize
end,
MaxQDelay = case ejabberd_option:c2s_max_send_queue_delay() of
undefined ->
proplists:get_value(max_send_queue_delay, Opts, 0);
C2SMaxQDelay ->
C2SMaxQDelay
end,
State1 = State#{tls_options => TLSOpts2,
tls_required => TLSRequired,
tls_enabled => TLSEnabled,
@ -567,7 +579,8 @@ init([State, Opts]) ->
access => Access,
shaper => Shaper},
State2 = xmpp_stream_in:set_timeout(State1, Timeout),
ejabberd_hooks:run_fold(c2s_init, {ok, State2}, [Opts]).
State3 = xmpp_stream_in:configure_queue(State2, MaxQSize, MaxQDelay),
ejabberd_hooks:run_fold(c2s_init, {ok, State3}, [Opts]).
handle_call(get_presence, From, #{jid := JID} = State) ->
Pres = case maps:get(pres_last, State, error) of
@ -1022,4 +1035,6 @@ listen_options() ->
{tls_verify, false},
{zlib, false},
{max_stanza_size, infinity},
{max_send_queue_size, 10},
{max_send_queue_delay, 0},
{max_fsm_queue, 10000}].

View File

@ -688,6 +688,10 @@ listen_opt_type(tls) ->
econf:bool();
listen_opt_type(max_stanza_size) ->
econf:pos_int(infinity);
listen_opt_type(max_send_queue_size) ->
econf:non_neg_int();
listen_opt_type(max_send_queue_delay) ->
econf:non_neg_int();
listen_opt_type(max_fsm_queue) ->
econf:pos_int();
listen_opt_type(send_timeout) ->

View File

@ -22,6 +22,8 @@
-export([c2s_cafile/0, c2s_cafile/1]).
-export([c2s_ciphers/0, c2s_ciphers/1]).
-export([c2s_dhfile/0, c2s_dhfile/1]).
-export([c2s_max_send_queue_delay/0]).
-export([c2s_max_send_queue_size/0]).
-export([c2s_protocol_options/0, c2s_protocol_options/1]).
-export([c2s_tls_compression/0, c2s_tls_compression/1]).
-export([ca_file/0]).
@ -124,6 +126,8 @@
-export([s2s_dns_retries/0, s2s_dns_retries/1]).
-export([s2s_dns_timeout/0, s2s_dns_timeout/1]).
-export([s2s_max_retry_delay/0]).
-export([s2s_max_send_queue_delay/0, s2s_max_send_queue_delay/1]).
-export([s2s_max_send_queue_size/0, s2s_max_send_queue_size/1]).
-export([s2s_protocol_options/0, s2s_protocol_options/1]).
-export([s2s_queue_type/0, s2s_queue_type/1]).
-export([s2s_timeout/0, s2s_timeout/1]).
@ -275,6 +279,14 @@ c2s_dhfile() ->
c2s_dhfile(Host) ->
ejabberd_config:get_option({c2s_dhfile, Host}).
-spec c2s_max_send_queue_delay() -> 'undefined' | non_neg_integer().
c2s_max_send_queue_delay() ->
ejabberd_config:get_option({c2s_max_send_queue_delay, global}).
-spec c2s_max_send_queue_size() -> 'undefined' | non_neg_integer().
c2s_max_send_queue_size() ->
ejabberd_config:get_option({c2s_max_send_queue_size, global}).
-spec c2s_protocol_options() -> 'undefined' | binary().
c2s_protocol_options() ->
c2s_protocol_options(global).
@ -851,6 +863,20 @@ s2s_dns_timeout(Host) ->
s2s_max_retry_delay() ->
ejabberd_config:get_option({s2s_max_retry_delay, global}).
-spec s2s_max_send_queue_delay() -> 'undefined' | non_neg_integer().
s2s_max_send_queue_delay() ->
s2s_max_send_queue_delay(global).
-spec s2s_max_send_queue_delay(global | binary()) -> 'undefined' | non_neg_integer().
s2s_max_send_queue_delay(Host) ->
ejabberd_config:get_option({s2s_max_send_queue_delay, Host}).
-spec s2s_max_send_queue_size() -> 'undefined' | non_neg_integer().
s2s_max_send_queue_size() ->
s2s_max_send_queue_size(global).
-spec s2s_max_send_queue_size(global | binary()) -> 'undefined' | non_neg_integer().
s2s_max_send_queue_size(Host) ->
ejabberd_config:get_option({s2s_max_send_queue_size, Host}).
-spec s2s_protocol_options() -> 'undefined' | binary().
s2s_protocol_options() ->
s2s_protocol_options(global).

View File

@ -95,6 +95,10 @@ opt_type(c2s_ciphers) ->
end;
opt_type(c2s_dhfile) ->
econf:file();
opt_type(c2s_max_send_queue_delay) ->
econf:non_neg_int();
opt_type(c2s_max_send_queue_size) ->
econf:non_neg_int();
opt_type(c2s_protocol_options) ->
econf:and_then(
econf:list(econf:binary(), [unique]),
@ -337,6 +341,10 @@ opt_type(s2s_dns_timeout) ->
econf:timeout(second, infinity);
opt_type(s2s_max_retry_delay) ->
econf:timeout(second);
opt_type(s2s_max_send_queue_delay) ->
econf:non_neg_int();
opt_type(s2s_max_send_queue_size) ->
econf:non_neg_int();
opt_type(s2s_protocol_options) ->
opt_type(c2s_protocol_options);
opt_type(s2s_queue_type) ->
@ -527,6 +535,8 @@ options() ->
{c2s_cafile, undefined},
{c2s_ciphers, undefined},
{c2s_dhfile, undefined},
{c2s_max_send_queue_delay, undefined},
{c2s_max_send_queue_size, undefined},
{c2s_protocol_options, undefined},
{c2s_tls_compression, undefined},
{ca_file, iolist_to_binary(pkix:get_cafile())},
@ -635,6 +645,8 @@ options() ->
{s2s_dns_retries, 2},
{s2s_dns_timeout, timer:seconds(10)},
{s2s_max_retry_delay, timer:seconds(300)},
{s2s_max_send_queue_delay, 0},
{s2s_max_send_queue_size, 10},
{s2s_protocol_options, undefined},
{s2s_queue_type,
fun(Host) -> ejabberd_config:get_option({queue_type, Host}) end},
@ -705,6 +717,8 @@ globals() ->
auth_cache_life_time,
auth_cache_missed,
auth_cache_size,
c2s_max_send_queue_delay,
c2s_max_send_queue_size,
ca_file,
captcha_cmd,
captcha_host,
@ -752,6 +766,8 @@ globals() ->
router_use_cache,
rpc_timeout,
s2s_max_retry_delay,
c2s_max_send_queue_delay,
c2s_max_send_queue_size,
shaper,
sm_cache_life_time,
sm_cache_missed,

View File

@ -430,6 +430,31 @@ doc() ->
"dhparam -out dh.pem 2048\". If this option is not specified, "
"2048-bit MODP Group with 256-bit Prime Order Subgroup will be "
"used as defined in RFC5114 Section 2.3.")}},
{c2s_max_send_queue_delay,
#{value => ?T("non_neg_integer()"),
desc =>
[?T("Specifies the maximum number of milliseconds to queue an "
"outgoing stanza or stream management element. Setting this "
"option to a positive (non-zero) number allows for batching up "
"multiple XML elements into a single TCP packet in order to "
"reduce the TCP/IP overhead. The default value is '0', which "
"disables queueing."), "",
?T("To set a specific file per listener, use the listener's "
"http://../listen-options/#max_send_queue_delay[max_send_queue_delay] "
"option. Please note that 'c2s_max_send_queue_delay' overrides "
"the listener's 'max_send_queue_delay' option."), ""]}},
{c2s_max_send_queue_size,
#{value => ?T("non_neg_integer()"),
desc =>
[?T("Specifies the maximum number of elements to add to the send "
"queue. The default value is '10'. Note that this option has "
"no effect if 'max_send_queue_delay' isn't set to a value "
"larger than '0'. Setting this option to '0' disables "
"queueing."), "",
?T("To set a specific file per listener, use the listener's "
"http://../listen-options/#max_send_queue_size[max_send_queue_size] "
"option. Please note that 'c2s_max_send_queue_size' overrides "
"the listener's 'max_send_queue_size' option."), ""]}},
{c2s_protocol_options,
#{value => "[Option, ...]",
desc =>
@ -1118,6 +1143,31 @@ doc() ->
"dhparam -out dh.pem 2048\". If this option is not specified, "
"2048-bit MODP Group with 256-bit Prime Order Subgroup will be "
"used as defined in RFC5114 Section 2.3.")}},
{s2s_max_send_queue_delay,
#{value => ?T("non_neg_integer()"),
desc =>
[?T("Specifies the maximum number of milliseconds to queue an "
"outgoing stanza or stream management element. Setting this "
"option to a positive (non-zero) number allows for batching up "
"multiple XML elements into a single TCP packet in order to "
"reduce the TCP/IP overhead. The default value is '0', which "
"disables queueing."), "",
?T("To set a specific file per listener, use the listener's "
"http://../listen-options/#max_send_queue_delay[max_send_queue_delay] "
"option. Please note that 's2s_max_send_queue_delay' overrides "
"the listener's 'max_send_queue_delay' option."), ""]}},
{s2s_max_send_queue_size,
#{value => ?T("non_neg_integer()"),
desc =>
[?T("Specifies the maximum number of elements to add to the send "
"queue. The default value is '10'. Note that this option has "
"no effect if 'max_send_queue_delay' isn't set to a value "
"larger than '0'. Setting this option to '0' disables "
"queueing."), "",
?T("To set a specific file per listener, use the listener's "
"http://../listen-options/#max_send_queue_size[max_send_queue_size] "
"option. Please note that 's2s_max_send_queue_size' overrides "
"the listener's 'max_send_queue_size' option."), ""]}},
{s2s_protocol_options,
#{value => "[Option, ...]",
desc =>

View File

@ -279,6 +279,8 @@ init([#{server := LServer, remote_server := RServer} = State, Opts]) ->
false -> unlimited
end,
Timeout = ejabberd_option:negotiation_timeout(),
MaxQSize = ejabberd_option:s2s_max_send_queue_size(),
MaxQDelay = ejabberd_option:s2s_max_send_queue_delay(),
State1 = State#{on_route => queue,
queue => p1_queue:new(QueueType, QueueLimit),
xmlns => ?NS_SERVER,
@ -286,9 +288,10 @@ init([#{server := LServer, remote_server := RServer} = State, Opts]) ->
server_host => ServerHost,
shaper => none},
State2 = xmpp_stream_out:set_timeout(State1, Timeout),
State3 = xmpp_stream_out:configure_queue(State2, MaxQSize, MaxQDelay),
?INFO_MSG("Outbound s2s connection started: ~ts -> ~ts",
[LServer, RServer]),
ejabberd_hooks:run_fold(s2s_out_init, ServerHost, {ok, State2}, [Opts]).
ejabberd_hooks:run_fold(s2s_out_init, ServerHost, {ok, State3}, [Opts]).
handle_call(Request, From, #{server_host := ServerHost} = State) ->
ejabberd_hooks:run_fold(s2s_out_handle_call, ServerHost, State, [Request, From]).

View File

@ -117,10 +117,13 @@ init([State, Opts]) ->
true -> TLSOpts1
end,
GlobalRoutes = proplists:get_value(global_routes, Opts, true),
MaxQSize = proplists:get_value(max_send_queue_size, Opts, 10),
MaxQDelay = proplists:get_value(max_send_queue_delay, Opts, 0),
Timeout = ejabberd_option:negotiation_timeout(),
State1 = xmpp_stream_in:change_shaper(State, ejabberd_shaper:new(Shaper)),
State2 = xmpp_stream_in:set_timeout(State1, Timeout),
State3 = State2#{access => Access,
State3 = xmpp_stream_in:configure_queue(State2, MaxQSize, MaxQDelay),
State4 = State3#{access => Access,
xmlns => ?NS_COMPONENT,
lang => ejabberd_option:language(),
server => ejabberd_config:get_myname(),
@ -129,7 +132,7 @@ init([State, Opts]) ->
tls_options => TLSOpts,
global_routes => GlobalRoutes,
check_from => CheckFrom},
ejabberd_hooks:run_fold(component_init, {ok, State3}, [Opts]).
ejabberd_hooks:run_fold(component_init, {ok, State4}, [Opts]).
handle_stream_start(_StreamStart,
#{remote_server := RemoteServer,
@ -302,6 +305,8 @@ listen_options() ->
{tls, false},
{tls_compression, false},
{max_stanza_size, infinity},
{max_send_queue_size, 10},
{max_send_queue_delay, 0},
{max_fsm_queue, 10000},
{password, undefined},
{hosts, []},