mirror of
https://github.com/processone/ejabberd.git
synced 2024-06-08 21:43:07 +02:00
Optionally queue outgoing data
Support queueing outgoing stanzas and stream management elements for up to a configurable number of milliseconds (with a configurable queue size limit). This allows for batching up multiple XML elements into a single TCP packet in order to reduce the TCP/IP overhead. The feature is supported by ejabberd_c2s, ejabberd_s2s_out, and ejabberd_service. It can be enabled by configuring the max. number of milliseconds to queue an element (default: 0), and optionally the max. number of elements to queue (default: 10). This can be done by using the following new ejabberd_c2s/ejabberd_service listener options: - max_send_queue_size - max_send_queue_delay For ejabberd_c2s, the following global options can be specified instead: - c2s_max_send_queue_size - c2s_max_send_queue_delay For ejabberd_s2s_out, the following global options can be specified: - s2s_max_send_queue_size - s2s_max_send_queue_delay
This commit is contained in:
parent
a89b1f332d
commit
abfb62f27a
|
@ -70,7 +70,7 @@
|
|||
{stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.28"}}},
|
||||
{if_var_true, stun,
|
||||
{stun, ".*", {git, "https://github.com/processone/stun", {tag, "1.2.2"}}}},
|
||||
{xmpp, ".*", {git, "https://github.com/processone/xmpp", {tag, "1.5.8"}}},
|
||||
{xmpp, ".*", {git, "https://github.com/weiss/xmpp", {branch, "feature/send-queue"}}},
|
||||
{yconf, ".*", {git, "https://github.com/processone/yconf", {tag, "1.0.13"}}}
|
||||
]}.
|
||||
|
||||
|
|
|
@ -555,6 +555,18 @@ init([State, Opts]) ->
|
|||
TLSVerify = proplists:get_bool(tls_verify, Opts),
|
||||
Zlib = proplists:get_bool(zlib, Opts),
|
||||
Timeout = ejabberd_option:negotiation_timeout(),
|
||||
MaxQSize = case ejabberd_option:c2s_max_send_queue_size() of
|
||||
undefined ->
|
||||
proplists:get_value(max_send_queue_size, Opts, 10);
|
||||
C2SMaxQSize ->
|
||||
C2SMaxQSize
|
||||
end,
|
||||
MaxQDelay = case ejabberd_option:c2s_max_send_queue_delay() of
|
||||
undefined ->
|
||||
proplists:get_value(max_send_queue_delay, Opts, 0);
|
||||
C2SMaxQDelay ->
|
||||
C2SMaxQDelay
|
||||
end,
|
||||
State1 = State#{tls_options => TLSOpts2,
|
||||
tls_required => TLSRequired,
|
||||
tls_enabled => TLSEnabled,
|
||||
|
@ -567,7 +579,8 @@ init([State, Opts]) ->
|
|||
access => Access,
|
||||
shaper => Shaper},
|
||||
State2 = xmpp_stream_in:set_timeout(State1, Timeout),
|
||||
ejabberd_hooks:run_fold(c2s_init, {ok, State2}, [Opts]).
|
||||
State3 = xmpp_stream_in:configure_queue(State2, MaxQSize, MaxQDelay),
|
||||
ejabberd_hooks:run_fold(c2s_init, {ok, State3}, [Opts]).
|
||||
|
||||
handle_call(get_presence, From, #{jid := JID} = State) ->
|
||||
Pres = case maps:get(pres_last, State, error) of
|
||||
|
@ -1022,4 +1035,6 @@ listen_options() ->
|
|||
{tls_verify, false},
|
||||
{zlib, false},
|
||||
{max_stanza_size, infinity},
|
||||
{max_send_queue_size, 10},
|
||||
{max_send_queue_delay, 0},
|
||||
{max_fsm_queue, 10000}].
|
||||
|
|
|
@ -680,6 +680,10 @@ listen_opt_type(tls) ->
|
|||
econf:bool();
|
||||
listen_opt_type(max_stanza_size) ->
|
||||
econf:pos_int(infinity);
|
||||
listen_opt_type(max_send_queue_size) ->
|
||||
econf:non_neg_int();
|
||||
listen_opt_type(max_send_queue_delay) ->
|
||||
econf:non_neg_int();
|
||||
listen_opt_type(max_fsm_queue) ->
|
||||
econf:pos_int();
|
||||
listen_opt_type(send_timeout) ->
|
||||
|
|
|
@ -22,6 +22,8 @@
|
|||
-export([c2s_cafile/0, c2s_cafile/1]).
|
||||
-export([c2s_ciphers/0, c2s_ciphers/1]).
|
||||
-export([c2s_dhfile/0, c2s_dhfile/1]).
|
||||
-export([c2s_max_send_queue_delay/0]).
|
||||
-export([c2s_max_send_queue_size/0]).
|
||||
-export([c2s_protocol_options/0, c2s_protocol_options/1]).
|
||||
-export([c2s_tls_compression/0, c2s_tls_compression/1]).
|
||||
-export([ca_file/0]).
|
||||
|
@ -124,6 +126,8 @@
|
|||
-export([s2s_dns_retries/0, s2s_dns_retries/1]).
|
||||
-export([s2s_dns_timeout/0, s2s_dns_timeout/1]).
|
||||
-export([s2s_max_retry_delay/0]).
|
||||
-export([s2s_max_send_queue_delay/0, s2s_max_send_queue_delay/1]).
|
||||
-export([s2s_max_send_queue_size/0, s2s_max_send_queue_size/1]).
|
||||
-export([s2s_protocol_options/0, s2s_protocol_options/1]).
|
||||
-export([s2s_queue_type/0, s2s_queue_type/1]).
|
||||
-export([s2s_timeout/0, s2s_timeout/1]).
|
||||
|
@ -275,6 +279,14 @@ c2s_dhfile() ->
|
|||
c2s_dhfile(Host) ->
|
||||
ejabberd_config:get_option({c2s_dhfile, Host}).
|
||||
|
||||
-spec c2s_max_send_queue_delay() -> 'undefined' | non_neg_integer().
|
||||
c2s_max_send_queue_delay() ->
|
||||
ejabberd_config:get_option({c2s_max_send_queue_delay, global}).
|
||||
|
||||
-spec c2s_max_send_queue_size() -> 'undefined' | non_neg_integer().
|
||||
c2s_max_send_queue_size() ->
|
||||
ejabberd_config:get_option({c2s_max_send_queue_size, global}).
|
||||
|
||||
-spec c2s_protocol_options() -> 'undefined' | binary().
|
||||
c2s_protocol_options() ->
|
||||
c2s_protocol_options(global).
|
||||
|
@ -851,6 +863,20 @@ s2s_dns_timeout(Host) ->
|
|||
s2s_max_retry_delay() ->
|
||||
ejabberd_config:get_option({s2s_max_retry_delay, global}).
|
||||
|
||||
-spec s2s_max_send_queue_delay() -> 'undefined' | non_neg_integer().
|
||||
s2s_max_send_queue_delay() ->
|
||||
s2s_max_send_queue_delay(global).
|
||||
-spec s2s_max_send_queue_delay(global | binary()) -> 'undefined' | non_neg_integer().
|
||||
s2s_max_send_queue_delay(Host) ->
|
||||
ejabberd_config:get_option({s2s_max_send_queue_delay, Host}).
|
||||
|
||||
-spec s2s_max_send_queue_size() -> 'undefined' | non_neg_integer().
|
||||
s2s_max_send_queue_size() ->
|
||||
s2s_max_send_queue_size(global).
|
||||
-spec s2s_max_send_queue_size(global | binary()) -> 'undefined' | non_neg_integer().
|
||||
s2s_max_send_queue_size(Host) ->
|
||||
ejabberd_config:get_option({s2s_max_send_queue_size, Host}).
|
||||
|
||||
-spec s2s_protocol_options() -> 'undefined' | binary().
|
||||
s2s_protocol_options() ->
|
||||
s2s_protocol_options(global).
|
||||
|
|
|
@ -95,6 +95,10 @@ opt_type(c2s_ciphers) ->
|
|||
end;
|
||||
opt_type(c2s_dhfile) ->
|
||||
econf:file();
|
||||
opt_type(c2s_max_send_queue_delay) ->
|
||||
econf:non_neg_int();
|
||||
opt_type(c2s_max_send_queue_size) ->
|
||||
econf:non_neg_int();
|
||||
opt_type(c2s_protocol_options) ->
|
||||
econf:and_then(
|
||||
econf:list(econf:binary(), [unique]),
|
||||
|
@ -337,6 +341,10 @@ opt_type(s2s_dns_timeout) ->
|
|||
econf:timeout(second, infinity);
|
||||
opt_type(s2s_max_retry_delay) ->
|
||||
econf:timeout(second);
|
||||
opt_type(s2s_max_send_queue_delay) ->
|
||||
econf:non_neg_int();
|
||||
opt_type(s2s_max_send_queue_size) ->
|
||||
econf:non_neg_int();
|
||||
opt_type(s2s_protocol_options) ->
|
||||
opt_type(c2s_protocol_options);
|
||||
opt_type(s2s_queue_type) ->
|
||||
|
@ -527,6 +535,8 @@ options() ->
|
|||
{c2s_cafile, undefined},
|
||||
{c2s_ciphers, undefined},
|
||||
{c2s_dhfile, undefined},
|
||||
{c2s_max_send_queue_delay, undefined},
|
||||
{c2s_max_send_queue_size, undefined},
|
||||
{c2s_protocol_options, undefined},
|
||||
{c2s_tls_compression, undefined},
|
||||
{ca_file, iolist_to_binary(pkix:get_cafile())},
|
||||
|
@ -635,6 +645,8 @@ options() ->
|
|||
{s2s_dns_retries, 2},
|
||||
{s2s_dns_timeout, timer:seconds(10)},
|
||||
{s2s_max_retry_delay, timer:seconds(300)},
|
||||
{s2s_max_send_queue_delay, 0},
|
||||
{s2s_max_send_queue_size, 10},
|
||||
{s2s_protocol_options, undefined},
|
||||
{s2s_queue_type,
|
||||
fun(Host) -> ejabberd_config:get_option({queue_type, Host}) end},
|
||||
|
@ -705,6 +717,8 @@ globals() ->
|
|||
auth_cache_life_time,
|
||||
auth_cache_missed,
|
||||
auth_cache_size,
|
||||
c2s_max_send_queue_delay,
|
||||
c2s_max_send_queue_size,
|
||||
ca_file,
|
||||
captcha_cmd,
|
||||
captcha_host,
|
||||
|
@ -752,6 +766,8 @@ globals() ->
|
|||
router_use_cache,
|
||||
rpc_timeout,
|
||||
s2s_max_retry_delay,
|
||||
c2s_max_send_queue_delay,
|
||||
c2s_max_send_queue_size,
|
||||
shaper,
|
||||
sm_cache_life_time,
|
||||
sm_cache_missed,
|
||||
|
|
|
@ -430,6 +430,31 @@ doc() ->
|
|||
"dhparam -out dh.pem 2048\". If this option is not specified, "
|
||||
"2048-bit MODP Group with 256-bit Prime Order Subgroup will be "
|
||||
"used as defined in RFC5114 Section 2.3.")}},
|
||||
{c2s_max_send_queue_delay,
|
||||
#{value => ?T("non_neg_integer()"),
|
||||
desc =>
|
||||
[?T("Specifies the maximum number of milliseconds to queue an "
|
||||
"outgoing stanza or stream management element. Setting this "
|
||||
"option to a positive (non-zero) number allows for batching up "
|
||||
"multiple XML elements into a single TCP packet in order to "
|
||||
"reduce the TCP/IP overhead. The default value is '0', which "
|
||||
"disables queueing."), "",
|
||||
?T("To set a specific file per listener, use the listener's "
|
||||
"http://../listen-options/#max_send_queue_delay[max_send_queue_delay] "
|
||||
"option. Please note that 'c2s_max_send_queue_delay' overrides "
|
||||
"the listener's 'max_send_queue_delay' option."), ""]}},
|
||||
{c2s_max_send_queue_size,
|
||||
#{value => ?T("non_neg_integer()"),
|
||||
desc =>
|
||||
[?T("Specifies the maximum number of elements to add to the send "
|
||||
"queue. The default value is '10'. Note that this option has "
|
||||
"no effect if 'max_send_queue_delay' isn't set to a value "
|
||||
"larger than '0'. Setting this option to '0' disables "
|
||||
"queueing."), "",
|
||||
?T("To set a specific file per listener, use the listener's "
|
||||
"http://../listen-options/#max_send_queue_size[max_send_queue_size] "
|
||||
"option. Please note that 'c2s_max_send_queue_size' overrides "
|
||||
"the listener's 'max_send_queue_size' option."), ""]}},
|
||||
{c2s_protocol_options,
|
||||
#{value => "[Option, ...]",
|
||||
desc =>
|
||||
|
@ -1118,6 +1143,31 @@ doc() ->
|
|||
"dhparam -out dh.pem 2048\". If this option is not specified, "
|
||||
"2048-bit MODP Group with 256-bit Prime Order Subgroup will be "
|
||||
"used as defined in RFC5114 Section 2.3.")}},
|
||||
{s2s_max_send_queue_delay,
|
||||
#{value => ?T("non_neg_integer()"),
|
||||
desc =>
|
||||
[?T("Specifies the maximum number of milliseconds to queue an "
|
||||
"outgoing stanza or stream management element. Setting this "
|
||||
"option to a positive (non-zero) number allows for batching up "
|
||||
"multiple XML elements into a single TCP packet in order to "
|
||||
"reduce the TCP/IP overhead. The default value is '0', which "
|
||||
"disables queueing."), "",
|
||||
?T("To set a specific file per listener, use the listener's "
|
||||
"http://../listen-options/#max_send_queue_delay[max_send_queue_delay] "
|
||||
"option. Please note that 's2s_max_send_queue_delay' overrides "
|
||||
"the listener's 'max_send_queue_delay' option."), ""]}},
|
||||
{s2s_max_send_queue_size,
|
||||
#{value => ?T("non_neg_integer()"),
|
||||
desc =>
|
||||
[?T("Specifies the maximum number of elements to add to the send "
|
||||
"queue. The default value is '10'. Note that this option has "
|
||||
"no effect if 'max_send_queue_delay' isn't set to a value "
|
||||
"larger than '0'. Setting this option to '0' disables "
|
||||
"queueing."), "",
|
||||
?T("To set a specific file per listener, use the listener's "
|
||||
"http://../listen-options/#max_send_queue_size[max_send_queue_size] "
|
||||
"option. Please note that 's2s_max_send_queue_size' overrides "
|
||||
"the listener's 'max_send_queue_size' option."), ""]}},
|
||||
{s2s_protocol_options,
|
||||
#{value => "[Option, ...]",
|
||||
desc =>
|
||||
|
|
|
@ -279,6 +279,8 @@ init([#{server := LServer, remote_server := RServer} = State, Opts]) ->
|
|||
false -> unlimited
|
||||
end,
|
||||
Timeout = ejabberd_option:negotiation_timeout(),
|
||||
MaxQSize = ejabberd_option:s2s_max_send_queue_size(),
|
||||
MaxQDelay = ejabberd_option:s2s_max_send_queue_delay(),
|
||||
State1 = State#{on_route => queue,
|
||||
queue => p1_queue:new(QueueType, QueueLimit),
|
||||
xmlns => ?NS_SERVER,
|
||||
|
@ -286,9 +288,10 @@ init([#{server := LServer, remote_server := RServer} = State, Opts]) ->
|
|||
server_host => ServerHost,
|
||||
shaper => none},
|
||||
State2 = xmpp_stream_out:set_timeout(State1, Timeout),
|
||||
State3 = xmpp_stream_out:configure_queue(State2, MaxQSize, MaxQDelay),
|
||||
?INFO_MSG("Outbound s2s connection started: ~ts -> ~ts",
|
||||
[LServer, RServer]),
|
||||
ejabberd_hooks:run_fold(s2s_out_init, ServerHost, {ok, State2}, [Opts]).
|
||||
ejabberd_hooks:run_fold(s2s_out_init, ServerHost, {ok, State3}, [Opts]).
|
||||
|
||||
handle_call(Request, From, #{server_host := ServerHost} = State) ->
|
||||
ejabberd_hooks:run_fold(s2s_out_handle_call, ServerHost, State, [Request, From]).
|
||||
|
|
|
@ -117,10 +117,13 @@ init([State, Opts]) ->
|
|||
true -> TLSOpts1
|
||||
end,
|
||||
GlobalRoutes = proplists:get_value(global_routes, Opts, true),
|
||||
MaxQSize = proplists:get_value(max_send_queue_size, Opts, 10),
|
||||
MaxQDelay = proplists:get_value(max_send_queue_delay, Opts, 0),
|
||||
Timeout = ejabberd_option:negotiation_timeout(),
|
||||
State1 = xmpp_stream_in:change_shaper(State, ejabberd_shaper:new(Shaper)),
|
||||
State2 = xmpp_stream_in:set_timeout(State1, Timeout),
|
||||
State3 = State2#{access => Access,
|
||||
State3 = xmpp_stream_in:configure_queue(State2, MaxQSize, MaxQDelay),
|
||||
State4 = State3#{access => Access,
|
||||
xmlns => ?NS_COMPONENT,
|
||||
lang => ejabberd_option:language(),
|
||||
server => ejabberd_config:get_myname(),
|
||||
|
@ -129,7 +132,7 @@ init([State, Opts]) ->
|
|||
tls_options => TLSOpts,
|
||||
global_routes => GlobalRoutes,
|
||||
check_from => CheckFrom},
|
||||
ejabberd_hooks:run_fold(component_init, {ok, State3}, [Opts]).
|
||||
ejabberd_hooks:run_fold(component_init, {ok, State4}, [Opts]).
|
||||
|
||||
handle_stream_start(_StreamStart,
|
||||
#{remote_server := RemoteServer,
|
||||
|
@ -302,6 +305,8 @@ listen_options() ->
|
|||
{tls, false},
|
||||
{tls_compression, false},
|
||||
{max_stanza_size, infinity},
|
||||
{max_send_queue_size, 10},
|
||||
{max_send_queue_delay, 0},
|
||||
{max_fsm_queue, 10000},
|
||||
{password, undefined},
|
||||
{hosts, []},
|
||||
|
|
Loading…
Reference in New Issue
Block a user