Skip to content

Commit 42927f0

Browse files
committed
Make all quorum queue Ra configuration parameters explicit and configurable
Add support for configuring all quorum queue Ra system parameters via rabbitmq.conf using the quorum_queue.* namespace. This allows operators to fine-tune Raft log and WAL behavior specifically for quorum queues. New quorum_queue.* configuration options: - quorum_queue.wal_compute_checksums (default: true) - quorum_queue.segment_compute_checksums (default: true) - quorum_queue.max_append_entries_rpc_batch_size (default: 16) - quorum_queue.compress_mem_tables (default: true) - quorum_queue.segment_max_size_bytes (default: 64 MB) - quorum_queue.segment_max_entries (default: 4096) - quorum_queue.wal_max_size_bytes (default: 512 MB) - quorum_queue.wal_max_entries (default: 500,000) - quorum_queue.wal_max_batch_size (default: 4096) - quorum_queue.snapshot_chunk_size (default: 1 MB) Each quorum_queue.* setting has a fallback to the corresponding raft.* setting for backward compatibility with existing configurations. All configuration is mapped to rabbit application environment variables and explicitly applied to the quorum_queues Ra system during initialization, making the data flow transparent and maintainable. Add macros for all default values that match the RabbitMQ prelaunch defaults, centralizing configuration defaults in one place for easier maintenance. Simplify WalMaxEntries handling to use application:get_env with macro default instead of complex case statement checking DefaultConfig.
1 parent eacdd0c commit 42927f0

4 files changed

Lines changed: 159 additions & 26 deletions

File tree

deps/rabbit/docs/rabbitmq.conf.example

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,10 +487,46 @@
487487
## when there are multiple publishers sending to the same quorum queue.
488488
# quorum_queue.commands_soft_limit = 32
489489

490+
## Enables or disables WAL checksums for quorum queues.
491+
##
492+
# quorum_queue.wal_compute_checksums = true
493+
494+
## Enables or disables segment file checksums for quorum queues.
495+
##
496+
# quorum_queue.segment_compute_checksums = true
497+
498+
## Sets the maximum batch size for append entries RPC for quorum queues.
499+
##
500+
# quorum_queue.max_append_entries_rpc_batch_size = 16
501+
502+
## Enables or disables memory table compression for quorum queues.
503+
##
504+
# quorum_queue.compress_mem_tables = true
505+
490506
## Sets the maximum size of Raft log segment files for quorum queues.
491507
##
492508
# quorum_queue.segment_max_size_bytes = 64000000
493509

510+
## Sets the maximum number of entries per Raft log segment for quorum queues.
511+
##
512+
# quorum_queue.segment_max_entries = 4096
513+
514+
## Sets the maximum size of the Write-Ahead Log (WAL) for quorum queues.
515+
##
516+
# quorum_queue.wal_max_size_bytes = 536870912
517+
518+
## Sets the maximum number of entries in the Write-Ahead Log (WAL) for quorum queues.
519+
##
520+
# quorum_queue.wal_max_entries = 500000
521+
522+
## Sets the maximum batch size for WAL writes for quorum queues.
523+
##
524+
# quorum_queue.wal_max_batch_size = 4096
525+
526+
## Sets the maximum size of snapshot chunks for quorum queues.
527+
##
528+
# quorum_queue.snapshot_chunk_size = 1000000
529+
494530
## Changes classic queue storage implementation version.
495531
## As of 4.0.0, version 2 is the default and this is a forward compatibility setting,
496532
## that is, it will be useful when a new version is developed.

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 41 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2754,19 +2754,52 @@ end}.
27542754
{validators, ["non_zero_positive_integer"]}
27552755
]}.
27562756

2757+
{mapping, "quorum_queue.wal_compute_checksums", "rabbit.quorum_wal_compute_checksums", [
2758+
{datatype, {enum, [true, false]}}
2759+
]}.
2760+
2761+
{mapping, "quorum_queue.segment_compute_checksums", "rabbit.quorum_segment_compute_checksums", [
2762+
{datatype, {enum, [true, false]}}
2763+
]}.
2764+
2765+
{mapping, "quorum_queue.max_append_entries_rpc_batch_size", "rabbit.quorum_max_append_entries_rpc_batch_size", [
2766+
{datatype, integer},
2767+
{validators, ["non_zero_positive_integer"]}
2768+
]}.
2769+
2770+
{mapping, "quorum_queue.compress_mem_tables", "rabbit.quorum_compress_mem_tables", [
2771+
{datatype, {enum, [true, false]}}
2772+
]}.
2773+
27572774
{mapping, "quorum_queue.segment_max_size_bytes", "rabbit.quorum_segment_max_size_bytes", [
27582775
{datatype, integer},
27592776
{validators, ["non_zero_positive_integer"]}
27602777
]}.
27612778

2762-
{translation, "rabbit.quorum_segment_max_size_bytes",
2763-
fun(Conf) ->
2764-
case cuttlefish:conf_get("quorum_queue.segment_max_size_bytes", Conf, undefined) of
2765-
undefined -> cuttlefish:unset();
2766-
Val -> Val
2767-
end
2768-
end
2769-
}.
2779+
{mapping, "quorum_queue.segment_max_entries", "rabbit.quorum_segment_max_entries", [
2780+
{datatype, integer},
2781+
{validators, ["non_zero_positive_integer", "positive_16_bit_unsigned_integer"]}
2782+
]}.
2783+
2784+
{mapping, "quorum_queue.wal_max_size_bytes", "rabbit.quorum_wal_max_size_bytes", [
2785+
{datatype, integer},
2786+
{validators, ["non_zero_positive_integer"]}
2787+
]}.
2788+
2789+
{mapping, "quorum_queue.wal_max_entries", "rabbit.quorum_wal_max_entries", [
2790+
{datatype, integer},
2791+
{validators, ["non_zero_positive_integer"]}
2792+
]}.
2793+
2794+
{mapping, "quorum_queue.wal_max_batch_size", "rabbit.quorum_wal_max_batch_size", [
2795+
{datatype, integer},
2796+
{validators, ["non_zero_positive_integer"]}
2797+
]}.
2798+
2799+
{mapping, "quorum_queue.snapshot_chunk_size", "rabbit.quorum_snapshot_chunk_size", [
2800+
{datatype, integer},
2801+
{validators, ["non_zero_positive_integer"]}
2802+
]}.
27702803

27712804
%%
27722805
%% Quorum Queue membership reconciliation

deps/rabbit/src/rabbit_ra_systems.erl

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424

2525
-define(COORD_WAL_MAX_SIZE_B, 64_000_000).
2626
-define(QUORUM_AER_MAX_RPC_SIZE, 16).
27-
-define(QUORUM_DEFAULT_WAL_MAX_ENTRIES, 500_000).
28-
-define(QUORUM_DEFAULT_SEGMENT_MAX_SIZE_B, 64_000_000).
27+
-define(QUORUM_DEFAULT_WAL_MAX_SIZE_B, 536_870_912).
28+
-define(QUORUM_DEFAULT_WAL_MAX_BATCH_SIZE, 4096).
2929
%% the default min bin vheap value in OTP 26
3030
-define(MIN_BIN_VHEAP_SIZE_DEFAULT, 46422).
3131
-define(MIN_BIN_VHEAP_SIZE_MULT, 64).
@@ -113,18 +113,13 @@ ensure_ra_system_started(RaSystem) ->
113113

114114
-spec get_config(ra_system_name()) -> ra_system:config().
115115
get_config(quorum_queues = RaSystem) ->
116-
DefaultConfig = get_default_config(),
116+
DefaultConfig = ra_system:default_config(),
117117
Checksums = application:get_env(rabbit, quorum_compute_checksums, true),
118118
WalChecksums = application:get_env(rabbit, quorum_wal_compute_checksums, Checksums),
119119
SegmentChecksums = application:get_env(rabbit, quorum_segment_compute_checksums,
120120
Checksums),
121-
WalMaxEntries = case DefaultConfig of
122-
#{wal_max_entries := MaxEntries}
123-
when is_integer(MaxEntries) ->
124-
MaxEntries;
125-
_ ->
126-
?QUORUM_DEFAULT_WAL_MAX_ENTRIES
127-
end,
121+
WalMaxEntries = application:get_env(rabbit, quorum_wal_max_entries,
122+
maps:get(wal_max_entries, DefaultConfig)),
128123
AERBatchSize = application:get_env(rabbit, quorum_max_append_entries_rpc_batch_size,
129124
?QUORUM_AER_MAX_RPC_SIZE),
130125
CompressMemTables = application:get_env(rabbit, quorum_compress_mem_tables, true),
@@ -135,7 +130,15 @@ get_config(quorum_queues = RaSystem) ->
135130
?MIN_BIN_VHEAP_SIZE_DEFAULT
136131
end,
137132
SegmentMaxSizeBytes = application:get_env(rabbit, quorum_segment_max_size_bytes,
138-
?QUORUM_DEFAULT_SEGMENT_MAX_SIZE_B),
133+
maps:get(segment_max_size_bytes, DefaultConfig)),
134+
SegmentMaxEntries = application:get_env(rabbit, quorum_segment_max_entries,
135+
maps:get(segment_max_entries, DefaultConfig)),
136+
WalMaxSizeBytes = application:get_env(rabbit, quorum_wal_max_size_bytes,
137+
?QUORUM_DEFAULT_WAL_MAX_SIZE_B),
138+
WalMaxBatchSize = application:get_env(rabbit, quorum_wal_max_batch_size,
139+
?QUORUM_DEFAULT_WAL_MAX_BATCH_SIZE),
140+
SnapshotChunkSize = application:get_env(rabbit, quorum_snapshot_chunk_size,
141+
maps:get(snapshot_chunk_size, DefaultConfig)),
139142

140143
DefaultConfig#{name => RaSystem,
141144
wal_min_bin_vheap_size => MinBinVheapSize,
@@ -146,10 +149,14 @@ get_config(quorum_queues = RaSystem) ->
146149
segment_compute_checksums => SegmentChecksums,
147150
compress_mem_tables => CompressMemTables,
148151
segment_max_size_bytes => SegmentMaxSizeBytes,
152+
segment_max_entries => SegmentMaxEntries,
153+
wal_max_size_bytes => WalMaxSizeBytes,
154+
wal_max_batch_size => WalMaxBatchSize,
155+
snapshot_chunk_size => SnapshotChunkSize,
149156
server_recovery_strategy => {rabbit_quorum_queue,
150157
system_recover, []}};
151158
get_config(coordination = RaSystem) ->
152-
DefaultConfig = get_default_config(),
159+
DefaultConfig = ra_system:default_config(),
153160
CoordDataDir = filename:join(
154161
[rabbit:data_dir(), "coordination", node()]),
155162
DefaultConfig#{name => RaSystem,
@@ -158,13 +165,7 @@ get_config(coordination = RaSystem) ->
158165
wal_max_size_bytes => ?COORD_WAL_MAX_SIZE_B,
159166
names => ra_system:derive_names(RaSystem)}.
160167

161-
-spec get_default_config() -> ra_system:config().
162-
163-
get_default_config() ->
164-
ra_system:default_config().
165-
166168
-spec ensure_stopped() -> ok | no_return().
167-
168169
ensure_stopped() ->
169170
?LOG_DEBUG(
170171
"Stopping Ra systems",

deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1160,13 +1160,76 @@ credential_validator.regexp = ^abc\\d+",
11601160
]}],
11611161
[]},
11621162

1163+
{quorum_queue_wal_compute_checksums,
1164+
"quorum_queue.wal_compute_checksums = true",
1165+
[{rabbit, [
1166+
{quorum_wal_compute_checksums, true}
1167+
]}],
1168+
[]},
1169+
1170+
{quorum_queue_segment_compute_checksums,
1171+
"quorum_queue.segment_compute_checksums = true",
1172+
[{rabbit, [
1173+
{quorum_segment_compute_checksums, true}
1174+
]}],
1175+
[]},
1176+
1177+
{quorum_queue_max_append_entries_rpc_batch_size,
1178+
"quorum_queue.max_append_entries_rpc_batch_size = 16",
1179+
[{rabbit, [
1180+
{quorum_max_append_entries_rpc_batch_size, 16}
1181+
]}],
1182+
[]},
1183+
1184+
{quorum_queue_compress_mem_tables,
1185+
"quorum_queue.compress_mem_tables = true",
1186+
[{rabbit, [
1187+
{quorum_compress_mem_tables, true}
1188+
]}],
1189+
[]},
1190+
11631191
{quorum_queue_segment_max_size_bytes,
11641192
"quorum_queue.segment_max_size_bytes = 64000000",
11651193
[{rabbit, [
11661194
{quorum_segment_max_size_bytes, 64000000}
11671195
]}],
11681196
[]},
11691197

1198+
{quorum_queue_segment_max_entries,
1199+
"quorum_queue.segment_max_entries = 4096",
1200+
[{rabbit, [
1201+
{quorum_segment_max_entries, 4096}
1202+
]}],
1203+
[]},
1204+
1205+
{quorum_queue_wal_max_size_bytes,
1206+
"quorum_queue.wal_max_size_bytes = 536870912",
1207+
[{rabbit, [
1208+
{quorum_wal_max_size_bytes, 536870912}
1209+
]}],
1210+
[]},
1211+
1212+
{quorum_queue_wal_max_entries,
1213+
"quorum_queue.wal_max_entries = 500000",
1214+
[{rabbit, [
1215+
{quorum_wal_max_entries, 500000}
1216+
]}],
1217+
[]},
1218+
1219+
{quorum_queue_wal_max_batch_size,
1220+
"quorum_queue.wal_max_batch_size = 4096",
1221+
[{rabbit, [
1222+
{quorum_wal_max_batch_size, 4096}
1223+
]}],
1224+
[]},
1225+
1226+
{quorum_queue_snapshot_chunk_size,
1227+
"quorum_queue.snapshot_chunk_size = 1000000",
1228+
[{rabbit, [
1229+
{quorum_snapshot_chunk_size, 1000000}
1230+
]}],
1231+
[]},
1232+
11701233
%%
11711234
%% Runtime parameters
11721235
%%

0 commit comments

Comments
 (0)