Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions deps/rabbit/docs/rabbitmq.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,46 @@
## when there are multiple publishers sending to the same quorum queue.
# quorum_queue.commands_soft_limit = 32

## Enables or disables WAL checksums for quorum queues.
##
# quorum_queue.wal_compute_checksums = true

## Enables or disables segment file checksums for quorum queues.
##
# quorum_queue.segment_compute_checksums = true

## Sets the maximum batch size for append entries RPC for quorum queues.
##
# quorum_queue.max_append_entries_rpc_batch_size = 16

## Enables or disables memory table compression for quorum queues.
##
# quorum_queue.compress_mem_tables = true

## Sets the maximum size of Raft log segment files for quorum queues.
##
# quorum_queue.segment_max_size_bytes = 64000000

## Sets the maximum number of entries per Raft log segment for quorum queues.
##
# quorum_queue.segment_max_entries = 4096

## Sets the maximum size of the Write-Ahead Log (WAL) for quorum queues.
##
# quorum_queue.wal_max_size_bytes = 536870912

## Sets the maximum number of entries in the Write-Ahead Log (WAL) for quorum queues.
##
# quorum_queue.wal_max_entries = 500000

## Sets the maximum batch size for WAL writes for quorum queues.
##
# quorum_queue.wal_max_batch_size = 4096

## Sets the maximum size of snapshot chunks for quorum queues.
##
# quorum_queue.snapshot_chunk_size = 1000000

## Changes classic queue storage implementation version.
## As of 4.0.0, version 2 is the default and this is a forward compatibility setting,
## that is, it will be useful when a new version is developed.
Expand Down
47 changes: 47 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -2754,6 +2754,53 @@ end}.
{validators, ["non_zero_positive_integer"]}
]}.

{mapping, "quorum_queue.wal_compute_checksums", "rabbit.quorum_wal_compute_checksums", [
{datatype, {enum, [true, false]}}
]}.

{mapping, "quorum_queue.segment_compute_checksums", "rabbit.quorum_segment_compute_checksums", [
{datatype, {enum, [true, false]}}
]}.

{mapping, "quorum_queue.max_append_entries_rpc_batch_size", "rabbit.quorum_max_append_entries_rpc_batch_size", [
{datatype, integer},
{validators, ["non_zero_positive_integer"]}
]}.

{mapping, "quorum_queue.compress_mem_tables", "rabbit.quorum_compress_mem_tables", [
{datatype, {enum, [true, false]}}
]}.

{mapping, "quorum_queue.segment_max_size_bytes", "rabbit.quorum_segment_max_size_bytes", [
{datatype, integer},
{validators, ["non_zero_positive_integer"]}
]}.

{mapping, "quorum_queue.segment_max_entries", "rabbit.quorum_segment_max_entries", [
{datatype, integer},
{validators, ["non_zero_positive_integer", "positive_16_bit_unsigned_integer"]}
]}.

{mapping, "quorum_queue.wal_max_size_bytes", "rabbit.quorum_wal_max_size_bytes", [
{datatype, integer},
{validators, ["non_zero_positive_integer"]}
]}.

{mapping, "quorum_queue.wal_max_entries", "rabbit.quorum_wal_max_entries", [
{datatype, integer},
{validators, ["non_zero_positive_integer"]}
]}.

{mapping, "quorum_queue.wal_max_batch_size", "rabbit.quorum_wal_max_batch_size", [
{datatype, integer},
{validators, ["non_zero_positive_integer"]}
]}.

{mapping, "quorum_queue.snapshot_chunk_size", "rabbit.quorum_snapshot_chunk_size", [
{datatype, integer},
{validators, ["non_zero_positive_integer"]}
]}.

%%
%% Quorum Queue membership reconciliation
%%
Expand Down
37 changes: 21 additions & 16 deletions deps/rabbit/src/rabbit_ra_systems.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@

-define(COORD_WAL_MAX_SIZE_B, 64_000_000).
-define(QUORUM_AER_MAX_RPC_SIZE, 16).
-define(QUORUM_DEFAULT_WAL_MAX_ENTRIES, 500_000).
-define(QUORUM_DEFAULT_WAL_MAX_SIZE_B, 536_870_912).
-define(QUORUM_DEFAULT_WAL_MAX_BATCH_SIZE, 4096).
%% the default min bin vheap value in OTP 26
-define(MIN_BIN_VHEAP_SIZE_DEFAULT, 46422).
-define(MIN_BIN_VHEAP_SIZE_MULT, 64).
Expand Down Expand Up @@ -112,18 +113,13 @@ ensure_ra_system_started(RaSystem) ->

-spec get_config(ra_system_name()) -> ra_system:config().
get_config(quorum_queues = RaSystem) ->
DefaultConfig = get_default_config(),
DefaultConfig = ra_system:default_config(),
Checksums = application:get_env(rabbit, quorum_compute_checksums, true),
WalChecksums = application:get_env(rabbit, quorum_wal_compute_checksums, Checksums),
SegmentChecksums = application:get_env(rabbit, quorum_segment_compute_checksums,
Checksums),
WalMaxEntries = case DefaultConfig of
#{wal_max_entries := MaxEntries}
when is_integer(MaxEntries) ->
MaxEntries;
_ ->
?QUORUM_DEFAULT_WAL_MAX_ENTRIES
end,
WalMaxEntries = application:get_env(rabbit, quorum_wal_max_entries,
maps:get(wal_max_entries, DefaultConfig)),
AERBatchSize = application:get_env(rabbit, quorum_max_append_entries_rpc_batch_size,
?QUORUM_AER_MAX_RPC_SIZE),
CompressMemTables = application:get_env(rabbit, quorum_compress_mem_tables, true),
Expand All @@ -133,6 +129,16 @@ get_config(quorum_queues = RaSystem) ->
_ ->
?MIN_BIN_VHEAP_SIZE_DEFAULT
end,
SegmentMaxSizeBytes = application:get_env(rabbit, quorum_segment_max_size_bytes,
maps:get(segment_max_size_bytes, DefaultConfig)),
SegmentMaxEntries = application:get_env(rabbit, quorum_segment_max_entries,
maps:get(segment_max_entries, DefaultConfig)),
WalMaxSizeBytes = application:get_env(rabbit, quorum_wal_max_size_bytes,
?QUORUM_DEFAULT_WAL_MAX_SIZE_B),
WalMaxBatchSize = application:get_env(rabbit, quorum_wal_max_batch_size,
?QUORUM_DEFAULT_WAL_MAX_BATCH_SIZE),
SnapshotChunkSize = application:get_env(rabbit, quorum_snapshot_chunk_size,
maps:get(snapshot_chunk_size, DefaultConfig)),

DefaultConfig#{name => RaSystem,
wal_min_bin_vheap_size => MinBinVheapSize,
Expand All @@ -142,10 +148,15 @@ get_config(quorum_queues = RaSystem) ->
wal_max_entries => WalMaxEntries,
segment_compute_checksums => SegmentChecksums,
compress_mem_tables => CompressMemTables,
segment_max_size_bytes => SegmentMaxSizeBytes,
segment_max_entries => SegmentMaxEntries,
wal_max_size_bytes => WalMaxSizeBytes,
wal_max_batch_size => WalMaxBatchSize,
snapshot_chunk_size => SnapshotChunkSize,
server_recovery_strategy => {rabbit_quorum_queue,
system_recover, []}};
get_config(coordination = RaSystem) ->
DefaultConfig = get_default_config(),
DefaultConfig = ra_system:default_config(),
CoordDataDir = filename:join(
[rabbit:data_dir(), "coordination", node()]),
DefaultConfig#{name => RaSystem,
Expand All @@ -154,13 +165,7 @@ get_config(coordination = RaSystem) ->
wal_max_size_bytes => ?COORD_WAL_MAX_SIZE_B,
names => ra_system:derive_names(RaSystem)}.

-spec get_default_config() -> ra_system:config().

get_default_config() ->
ra_system:default_config().

-spec ensure_stopped() -> ok | no_return().

ensure_stopped() ->
?LOG_DEBUG(
"Stopping Ra systems",
Expand Down
70 changes: 70 additions & 0 deletions deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,76 @@ credential_validator.regexp = ^abc\\d+",
]}],
[]},

{quorum_queue_wal_compute_checksums,
"quorum_queue.wal_compute_checksums = true",
[{rabbit, [
{quorum_wal_compute_checksums, true}
]}],
[]},

{quorum_queue_segment_compute_checksums,
"quorum_queue.segment_compute_checksums = true",
[{rabbit, [
{quorum_segment_compute_checksums, true}
]}],
[]},

{quorum_queue_max_append_entries_rpc_batch_size,
"quorum_queue.max_append_entries_rpc_batch_size = 16",
[{rabbit, [
{quorum_max_append_entries_rpc_batch_size, 16}
]}],
[]},

{quorum_queue_compress_mem_tables,
"quorum_queue.compress_mem_tables = true",
[{rabbit, [
{quorum_compress_mem_tables, true}
]}],
[]},

{quorum_queue_segment_max_size_bytes,
"quorum_queue.segment_max_size_bytes = 64000000",
[{rabbit, [
{quorum_segment_max_size_bytes, 64000000}
]}],
[]},

{quorum_queue_segment_max_entries,
"quorum_queue.segment_max_entries = 4096",
[{rabbit, [
{quorum_segment_max_entries, 4096}
]}],
[]},

{quorum_queue_wal_max_size_bytes,
"quorum_queue.wal_max_size_bytes = 536870912",
[{rabbit, [
{quorum_wal_max_size_bytes, 536870912}
]}],
[]},

{quorum_queue_wal_max_entries,
"quorum_queue.wal_max_entries = 500000",
[{rabbit, [
{quorum_wal_max_entries, 500000}
]}],
[]},

{quorum_queue_wal_max_batch_size,
"quorum_queue.wal_max_batch_size = 4096",
[{rabbit, [
{quorum_wal_max_batch_size, 4096}
]}],
[]},

{quorum_queue_snapshot_chunk_size,
"quorum_queue.snapshot_chunk_size = 1000000",
[{rabbit, [
{quorum_snapshot_chunk_size, 1000000}
]}],
[]},

%%
%% Runtime parameters
%%
Expand Down
Loading