Skip to content

Commit ab1b76a

Browse files
committed
Make all quorum queue Ra configuration parameters explicit and configurable
Add support for configuring all quorum queue Ra system parameters via rabbitmq.conf using the quorum_queue.* namespace. This allows operators to fine-tune Raft log and WAL behavior specifically for quorum queues. New quorum_queue.* configuration options: - quorum_queue.wal_compute_checksums (default: true) - quorum_queue.segment_compute_checksums (default: true) - quorum_queue.max_append_entries_rpc_batch_size (default: 16) - quorum_queue.compress_mem_tables (default: true) - quorum_queue.segment_max_size_bytes (default: 64 MB) - quorum_queue.segment_max_entries (default: 4096) - quorum_queue.wal_max_size_bytes (default: 512 MB) - quorum_queue.wal_max_entries (default: 500,000) - quorum_queue.wal_max_batch_size (default: 4096) - quorum_queue.snapshot_chunk_size (default: 1 MB) Each quorum_queue.* setting has a fallback to the corresponding raft.* setting for backward compatibility with existing configurations. All configuration is mapped to rabbit application environment variables and explicitly applied to the quorum_queues Ra system during initialization, making the data flow transparent and maintainable. Add macros for all default values that match the RabbitMQ prelaunch defaults, centralizing configuration defaults in one place for easier maintenance. Simplify WalMaxEntries handling to use application:get_env with macro default instead of complex case statement checking DefaultConfig.
1 parent eacdd0c commit ab1b76a

4 files changed

Lines changed: 229 additions & 8 deletions

File tree

deps/rabbit/docs/rabbitmq.conf.example

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,10 +487,46 @@
487487
## when there are multiple publishers sending to the same quorum queue.
488488
# quorum_queue.commands_soft_limit = 32
489489

490+
## Enables or disables WAL checksums for quorum queues.
491+
##
492+
# quorum_queue.wal_compute_checksums = true
493+
494+
## Enables or disables segment file checksums for quorum queues.
495+
##
496+
# quorum_queue.segment_compute_checksums = true
497+
498+
## Sets the maximum batch size for append entries RPC for quorum queues.
499+
##
500+
# quorum_queue.max_append_entries_rpc_batch_size = 16
501+
502+
## Enables or disables memory table compression for quorum queues.
503+
##
504+
# quorum_queue.compress_mem_tables = true
505+
490506
## Sets the maximum size of Raft log segment files for quorum queues.
491507
##
492508
# quorum_queue.segment_max_size_bytes = 64000000
493509

510+
## Sets the maximum number of entries per Raft log segment for quorum queues.
511+
##
512+
# quorum_queue.segment_max_entries = 65536
513+
514+
## Sets the maximum size of the Write-Ahead Log (WAL) for quorum queues.
515+
##
516+
# quorum_queue.wal_max_size_bytes = 536870912
517+
518+
## Sets the maximum number of entries in the Write-Ahead Log (WAL) for quorum queues.
519+
##
520+
# quorum_queue.wal_max_entries = 500000
521+
522+
## Sets the maximum batch size for WAL writes for quorum queues.
523+
##
524+
# quorum_queue.wal_max_batch_size = 4096
525+
526+
## Sets the maximum size of snapshot chunks for quorum queues.
527+
##
528+
# quorum_queue.snapshot_chunk_size = 1000000
529+
494530
## Changes classic queue storage implementation version.
495531
## As of 4.0.0, version 2 is the default and this is a forward compatibility setting,
496532
## that is, it will be useful when a new version is developed.

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 112 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2754,6 +2754,23 @@ end}.
27542754
{validators, ["non_zero_positive_integer"]}
27552755
]}.
27562756

2757+
{mapping, "quorum_queue.wal_compute_checksums", "rabbit.quorum_wal_compute_checksums", [
2758+
{datatype, {enum, [true, false]}}
2759+
]}.
2760+
2761+
{mapping, "quorum_queue.segment_compute_checksums", "rabbit.quorum_segment_compute_checksums", [
2762+
{datatype, {enum, [true, false]}}
2763+
]}.
2764+
2765+
{mapping, "quorum_queue.max_append_entries_rpc_batch_size", "rabbit.quorum_max_append_entries_rpc_batch_size", [
2766+
{datatype, integer},
2767+
{validators, ["non_zero_positive_integer"]}
2768+
]}.
2769+
2770+
{mapping, "quorum_queue.compress_mem_tables", "rabbit.quorum_compress_mem_tables", [
2771+
{datatype, {enum, [true, false]}}
2772+
]}.
2773+
27572774
{mapping, "quorum_queue.segment_max_size_bytes", "rabbit.quorum_segment_max_size_bytes", [
27582775
{datatype, integer},
27592776
{validators, ["non_zero_positive_integer"]}
@@ -2762,7 +2779,101 @@ end}.
27622779
{translation, "rabbit.quorum_segment_max_size_bytes",
27632780
fun(Conf) ->
27642781
case cuttlefish:conf_get("quorum_queue.segment_max_size_bytes", Conf, undefined) of
2765-
undefined -> cuttlefish:unset();
2782+
undefined ->
2783+
case cuttlefish:conf_get("raft.segment_max_size_bytes", Conf, undefined) of
2784+
undefined -> cuttlefish:unset();
2785+
Val -> Val
2786+
end;
2787+
Val -> Val
2788+
end
2789+
end
2790+
}.
2791+
2792+
{mapping, "quorum_queue.segment_max_entries", "rabbit.quorum_segment_max_entries", [
2793+
{datatype, integer},
2794+
{validators, ["non_zero_positive_integer", "positive_16_bit_unsigned_integer"]}
2795+
]}.
2796+
2797+
{translation, "rabbit.quorum_segment_max_entries",
2798+
fun(Conf) ->
2799+
case cuttlefish:conf_get("quorum_queue.segment_max_entries", Conf, undefined) of
2800+
undefined ->
2801+
case cuttlefish:conf_get("raft.segment_max_entries", Conf, undefined) of
2802+
undefined -> cuttlefish:unset();
2803+
Val -> Val
2804+
end;
2805+
Val -> Val
2806+
end
2807+
end
2808+
}.
2809+
2810+
{mapping, "quorum_queue.wal_max_size_bytes", "rabbit.quorum_wal_max_size_bytes", [
2811+
{datatype, integer},
2812+
{validators, ["non_zero_positive_integer"]}
2813+
]}.
2814+
2815+
{translation, "rabbit.quorum_wal_max_size_bytes",
2816+
fun(Conf) ->
2817+
case cuttlefish:conf_get("quorum_queue.wal_max_size_bytes", Conf, undefined) of
2818+
undefined ->
2819+
case cuttlefish:conf_get("raft.wal_max_size_bytes", Conf, undefined) of
2820+
undefined -> cuttlefish:unset();
2821+
Val -> Val
2822+
end;
2823+
Val -> Val
2824+
end
2825+
end
2826+
}.
2827+
2828+
{mapping, "quorum_queue.wal_max_entries", "rabbit.quorum_wal_max_entries", [
2829+
{datatype, integer},
2830+
{validators, ["non_zero_positive_integer"]}
2831+
]}.
2832+
2833+
{translation, "rabbit.quorum_wal_max_entries",
2834+
fun(Conf) ->
2835+
case cuttlefish:conf_get("quorum_queue.wal_max_entries", Conf, undefined) of
2836+
undefined ->
2837+
case cuttlefish:conf_get("raft.wal_max_entries", Conf, undefined) of
2838+
undefined -> cuttlefish:unset();
2839+
Val -> Val
2840+
end;
2841+
Val -> Val
2842+
end
2843+
end
2844+
}.
2845+
2846+
{mapping, "quorum_queue.wal_max_batch_size", "rabbit.quorum_wal_max_batch_size", [
2847+
{datatype, integer},
2848+
{validators, ["non_zero_positive_integer"]}
2849+
]}.
2850+
2851+
{translation, "rabbit.quorum_wal_max_batch_size",
2852+
fun(Conf) ->
2853+
case cuttlefish:conf_get("quorum_queue.wal_max_batch_size", Conf, undefined) of
2854+
undefined ->
2855+
case cuttlefish:conf_get("raft.wal_max_batch_size", Conf, undefined) of
2856+
undefined -> cuttlefish:unset();
2857+
Val -> Val
2858+
end;
2859+
Val -> Val
2860+
end
2861+
end
2862+
}.
2863+
2864+
{mapping, "quorum_queue.snapshot_chunk_size", "rabbit.quorum_snapshot_chunk_size", [
2865+
{datatype, integer},
2866+
{validators, ["non_zero_positive_integer"]}
2867+
]}.
2868+
2869+
{translation, "rabbit.quorum_snapshot_chunk_size",
2870+
fun(Conf) ->
2871+
case cuttlefish:conf_get("quorum_queue.snapshot_chunk_size", Conf, undefined) of
2872+
undefined ->
2873+
case cuttlefish:conf_get("raft.snapshot_chunk_size", Conf, undefined) of
2874+
undefined -> cuttlefish:unset();
2875+
Val -> Val
2876+
end;
27662877
Val -> Val
27672878
end
27682879
end

deps/rabbit/src/rabbit_ra_systems.erl

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@
2626
-define(QUORUM_AER_MAX_RPC_SIZE, 16).
2727
-define(QUORUM_DEFAULT_WAL_MAX_ENTRIES, 500_000).
2828
-define(QUORUM_DEFAULT_SEGMENT_MAX_SIZE_B, 64_000_000).
29+
-define(QUORUM_DEFAULT_SEGMENT_MAX_ENTRIES, 4096).
30+
-define(QUORUM_DEFAULT_WAL_MAX_SIZE_B, 536_870_912).
31+
-define(QUORUM_DEFAULT_WAL_MAX_BATCH_SIZE, 8192).
32+
-define(QUORUM_DEFAULT_SNAPSHOT_CHUNK_SIZE, 1_000_000).
2933
%% the default min bin vheap value in OTP 26
3034
-define(MIN_BIN_VHEAP_SIZE_DEFAULT, 46422).
3135
-define(MIN_BIN_VHEAP_SIZE_MULT, 64).
@@ -118,13 +122,8 @@ get_config(quorum_queues = RaSystem) ->
118122
WalChecksums = application:get_env(rabbit, quorum_wal_compute_checksums, Checksums),
119123
SegmentChecksums = application:get_env(rabbit, quorum_segment_compute_checksums,
120124
Checksums),
121-
WalMaxEntries = case DefaultConfig of
122-
#{wal_max_entries := MaxEntries}
123-
when is_integer(MaxEntries) ->
124-
MaxEntries;
125-
_ ->
126-
?QUORUM_DEFAULT_WAL_MAX_ENTRIES
127-
end,
125+
WalMaxEntries = application:get_env(rabbit, quorum_wal_max_entries,
126+
?QUORUM_DEFAULT_WAL_MAX_ENTRIES),
128127
AERBatchSize = application:get_env(rabbit, quorum_max_append_entries_rpc_batch_size,
129128
?QUORUM_AER_MAX_RPC_SIZE),
130129
CompressMemTables = application:get_env(rabbit, quorum_compress_mem_tables, true),
@@ -136,6 +135,14 @@ get_config(quorum_queues = RaSystem) ->
136135
end,
137136
SegmentMaxSizeBytes = application:get_env(rabbit, quorum_segment_max_size_bytes,
138137
?QUORUM_DEFAULT_SEGMENT_MAX_SIZE_B),
138+
SegmentMaxEntries = application:get_env(rabbit, quorum_segment_max_entries,
139+
?QUORUM_DEFAULT_SEGMENT_MAX_ENTRIES),
140+
WalMaxSizeBytes = application:get_env(rabbit, quorum_wal_max_size_bytes,
141+
?QUORUM_DEFAULT_WAL_MAX_SIZE_B),
142+
WalMaxBatchSize = application:get_env(rabbit, quorum_wal_max_batch_size,
143+
?QUORUM_DEFAULT_WAL_MAX_BATCH_SIZE),
144+
SnapshotChunkSize = application:get_env(rabbit, quorum_snapshot_chunk_size,
145+
?QUORUM_DEFAULT_SNAPSHOT_CHUNK_SIZE),
139146

140147
DefaultConfig#{name => RaSystem,
141148
wal_min_bin_vheap_size => MinBinVheapSize,
@@ -146,6 +153,10 @@ get_config(quorum_queues = RaSystem) ->
146153
segment_compute_checksums => SegmentChecksums,
147154
compress_mem_tables => CompressMemTables,
148155
segment_max_size_bytes => SegmentMaxSizeBytes,
156+
segment_max_entries => SegmentMaxEntries,
157+
wal_max_size_bytes => WalMaxSizeBytes,
158+
wal_max_batch_size => WalMaxBatchSize,
159+
snapshot_chunk_size => SnapshotChunkSize,
149160
server_recovery_strategy => {rabbit_quorum_queue,
150161
system_recover, []}};
151162
get_config(coordination = RaSystem) ->

deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1160,13 +1160,76 @@ credential_validator.regexp = ^abc\\d+",
11601160
]}],
11611161
[]},
11621162

1163+
{quorum_queue_wal_compute_checksums,
1164+
"quorum_queue.wal_compute_checksums = true",
1165+
[{rabbit, [
1166+
{quorum_wal_compute_checksums, true}
1167+
]}],
1168+
[]},
1169+
1170+
{quorum_queue_segment_compute_checksums,
1171+
"quorum_queue.segment_compute_checksums = true",
1172+
[{rabbit, [
1173+
{quorum_segment_compute_checksums, true}
1174+
]}],
1175+
[]},
1176+
1177+
{quorum_queue_max_append_entries_rpc_batch_size,
1178+
"quorum_queue.max_append_entries_rpc_batch_size = 16",
1179+
[{rabbit, [
1180+
{quorum_max_append_entries_rpc_batch_size, 16}
1181+
]}],
1182+
[]},
1183+
1184+
{quorum_queue_compress_mem_tables,
1185+
"quorum_queue.compress_mem_tables = true",
1186+
[{rabbit, [
1187+
{quorum_compress_mem_tables, true}
1188+
]}],
1189+
[]},
1190+
11631191
{quorum_queue_segment_max_size_bytes,
11641192
"quorum_queue.segment_max_size_bytes = 64000000",
11651193
[{rabbit, [
11661194
{quorum_segment_max_size_bytes, 64000000}
11671195
]}],
11681196
[]},
11691197

1198+
{quorum_queue_segment_max_entries,
1199+
"quorum_queue.segment_max_entries = 65536",
1200+
[{rabbit, [
1201+
{quorum_segment_max_entries, 65536}
1202+
]}],
1203+
[]},
1204+
1205+
{quorum_queue_wal_max_size_bytes,
1206+
"quorum_queue.wal_max_size_bytes = 536870912",
1207+
[{rabbit, [
1208+
{quorum_wal_max_size_bytes, 536870912}
1209+
]}],
1210+
[]},
1211+
1212+
{quorum_queue_wal_max_entries,
1213+
"quorum_queue.wal_max_entries = 500000",
1214+
[{rabbit, [
1215+
{quorum_wal_max_entries, 500000}
1216+
]}],
1217+
[]},
1218+
1219+
{quorum_queue_wal_max_batch_size,
1220+
"quorum_queue.wal_max_batch_size = 4096",
1221+
[{rabbit, [
1222+
{quorum_wal_max_batch_size, 4096}
1223+
]}],
1224+
[]},
1225+
1226+
{quorum_queue_snapshot_chunk_size,
1227+
"quorum_queue.snapshot_chunk_size = 1000000",
1228+
[{rabbit, [
1229+
{quorum_snapshot_chunk_size, 1000000}
1230+
]}],
1231+
[]},
1232+
11701233
%%
11711234
%% Runtime parameters
11721235
%%

0 commit comments

Comments
 (0)