diff --git a/deps/rabbit/docs/rabbitmq.conf.example b/deps/rabbit/docs/rabbitmq.conf.example index 81b8220bac6d..47e2c41d0fab 100644 --- a/deps/rabbit/docs/rabbitmq.conf.example +++ b/deps/rabbit/docs/rabbitmq.conf.example @@ -487,6 +487,46 @@ ## when there are multiple publishers sending to the same quorum queue. # quorum_queue.commands_soft_limit = 32 +## Enables or disables WAL checksums for quorum queues. +## +# quorum_queue.wal_compute_checksums = true + +## Enables or disables segment file checksums for quorum queues. +## +# quorum_queue.segment_compute_checksums = true + +## Sets the maximum batch size for append entries RPC for quorum queues. +## +# quorum_queue.max_append_entries_rpc_batch_size = 16 + +## Enables or disables memory table compression for quorum queues. +## +# quorum_queue.compress_mem_tables = true + +## Sets the maximum size of Raft log segment files for quorum queues. +## +# quorum_queue.segment_max_size_bytes = 64000000 + +## Sets the maximum number of entries per Raft log segment for quorum queues. +## +# quorum_queue.segment_max_entries = 4096 + +## Sets the maximum size of the Write-Ahead Log (WAL) for quorum queues. +## +# quorum_queue.wal_max_size_bytes = 536870912 + +## Sets the maximum number of entries in the Write-Ahead Log (WAL) for quorum queues. +## +# quorum_queue.wal_max_entries = 500000 + +## Sets the maximum batch size for WAL writes for quorum queues. +## +# quorum_queue.wal_max_batch_size = 4096 + +## Sets the maximum size of snapshot chunks for quorum queues. +## +# quorum_queue.snapshot_chunk_size = 1000000 + ## Changes classic queue storage implementation version. ## As of 4.0.0, version 2 is the default and this is a forward compatibility setting, ## that is, it will be useful when a new version is developed. diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 7b708dcdc9ca..d87c8ef9368f 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2754,6 +2754,53 @@ end}. {validators, ["non_zero_positive_integer"]} ]}. +{mapping, "quorum_queue.wal_compute_checksums", "rabbit.quorum_wal_compute_checksums", [ + {datatype, {enum, [true, false]}} +]}. + +{mapping, "quorum_queue.segment_compute_checksums", "rabbit.quorum_segment_compute_checksums", [ + {datatype, {enum, [true, false]}} +]}. + +{mapping, "quorum_queue.max_append_entries_rpc_batch_size", "rabbit.quorum_max_append_entries_rpc_batch_size", [ + {datatype, integer}, + {validators, ["non_zero_positive_integer"]} +]}. + +{mapping, "quorum_queue.compress_mem_tables", "rabbit.quorum_compress_mem_tables", [ + {datatype, {enum, [true, false]}} +]}. + +{mapping, "quorum_queue.segment_max_size_bytes", "rabbit.quorum_segment_max_size_bytes", [ + {datatype, integer}, + {validators, ["non_zero_positive_integer"]} +]}. + +{mapping, "quorum_queue.segment_max_entries", "rabbit.quorum_segment_max_entries", [ + {datatype, integer}, + {validators, ["non_zero_positive_integer", "positive_16_bit_unsigned_integer"]} +]}. + +{mapping, "quorum_queue.wal_max_size_bytes", "rabbit.quorum_wal_max_size_bytes", [ + {datatype, integer}, + {validators, ["non_zero_positive_integer"]} +]}. + +{mapping, "quorum_queue.wal_max_entries", "rabbit.quorum_wal_max_entries", [ + {datatype, integer}, + {validators, ["non_zero_positive_integer"]} +]}. + +{mapping, "quorum_queue.wal_max_batch_size", "rabbit.quorum_wal_max_batch_size", [ + {datatype, integer}, + {validators, ["non_zero_positive_integer"]} +]}. + +{mapping, "quorum_queue.snapshot_chunk_size", "rabbit.quorum_snapshot_chunk_size", [ + {datatype, integer}, + {validators, ["non_zero_positive_integer"]} +]}. + %% %% Quorum Queue membership reconciliation %% diff --git a/deps/rabbit/src/rabbit_ra_systems.erl b/deps/rabbit/src/rabbit_ra_systems.erl index 960a0f804507..f76602d4e9e3 100644 --- a/deps/rabbit/src/rabbit_ra_systems.erl +++ b/deps/rabbit/src/rabbit_ra_systems.erl @@ -24,7 +24,8 @@ -define(COORD_WAL_MAX_SIZE_B, 64_000_000). -define(QUORUM_AER_MAX_RPC_SIZE, 16). --define(QUORUM_DEFAULT_WAL_MAX_ENTRIES, 500_000). +-define(QUORUM_DEFAULT_WAL_MAX_SIZE_B, 536_870_912). +-define(QUORUM_DEFAULT_WAL_MAX_BATCH_SIZE, 4096). %% the default min bin vheap value in OTP 26 -define(MIN_BIN_VHEAP_SIZE_DEFAULT, 46422). -define(MIN_BIN_VHEAP_SIZE_MULT, 64). @@ -112,18 +113,13 @@ ensure_ra_system_started(RaSystem) -> -spec get_config(ra_system_name()) -> ra_system:config(). get_config(quorum_queues = RaSystem) -> - DefaultConfig = get_default_config(), + DefaultConfig = ra_system:default_config(), Checksums = application:get_env(rabbit, quorum_compute_checksums, true), WalChecksums = application:get_env(rabbit, quorum_wal_compute_checksums, Checksums), SegmentChecksums = application:get_env(rabbit, quorum_segment_compute_checksums, Checksums), - WalMaxEntries = case DefaultConfig of - #{wal_max_entries := MaxEntries} - when is_integer(MaxEntries) -> - MaxEntries; - _ -> - ?QUORUM_DEFAULT_WAL_MAX_ENTRIES - end, + WalMaxEntries = application:get_env(rabbit, quorum_wal_max_entries, + maps:get(wal_max_entries, DefaultConfig)), AERBatchSize = application:get_env(rabbit, quorum_max_append_entries_rpc_batch_size, ?QUORUM_AER_MAX_RPC_SIZE), CompressMemTables = application:get_env(rabbit, quorum_compress_mem_tables, true), @@ -133,6 +129,16 @@ get_config(quorum_queues = RaSystem) -> _ -> ?MIN_BIN_VHEAP_SIZE_DEFAULT end, + SegmentMaxSizeBytes = application:get_env(rabbit, quorum_segment_max_size_bytes, + maps:get(segment_max_size_bytes, DefaultConfig)), + SegmentMaxEntries = application:get_env(rabbit, quorum_segment_max_entries, + maps:get(segment_max_entries, DefaultConfig)), + WalMaxSizeBytes = application:get_env(rabbit, quorum_wal_max_size_bytes, + ?QUORUM_DEFAULT_WAL_MAX_SIZE_B), + WalMaxBatchSize = application:get_env(rabbit, quorum_wal_max_batch_size, + ?QUORUM_DEFAULT_WAL_MAX_BATCH_SIZE), + SnapshotChunkSize = application:get_env(rabbit, quorum_snapshot_chunk_size, + maps:get(snapshot_chunk_size, DefaultConfig)), DefaultConfig#{name => RaSystem, wal_min_bin_vheap_size => MinBinVheapSize, @@ -142,10 +148,15 @@ get_config(quorum_queues = RaSystem) -> wal_max_entries => WalMaxEntries, segment_compute_checksums => SegmentChecksums, compress_mem_tables => CompressMemTables, + segment_max_size_bytes => SegmentMaxSizeBytes, + segment_max_entries => SegmentMaxEntries, + wal_max_size_bytes => WalMaxSizeBytes, + wal_max_batch_size => WalMaxBatchSize, + snapshot_chunk_size => SnapshotChunkSize, server_recovery_strategy => {rabbit_quorum_queue, system_recover, []}}; get_config(coordination = RaSystem) -> - DefaultConfig = get_default_config(), + DefaultConfig = ra_system:default_config(), CoordDataDir = filename:join( [rabbit:data_dir(), "coordination", node()]), DefaultConfig#{name => RaSystem, @@ -154,13 +165,7 @@ get_config(coordination = RaSystem) -> wal_max_size_bytes => ?COORD_WAL_MAX_SIZE_B, names => ra_system:derive_names(RaSystem)}. --spec get_default_config() -> ra_system:config(). - -get_default_config() -> - ra_system:default_config(). - -spec ensure_stopped() -> ok | no_return(). - ensure_stopped() -> ?LOG_DEBUG( "Stopping Ra systems", diff --git a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets index 649d097f30a4..9beb4f529cec 100644 --- a/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets +++ b/deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets @@ -1160,6 +1160,76 @@ credential_validator.regexp = ^abc\\d+", ]}], []}, + {quorum_queue_wal_compute_checksums, + "quorum_queue.wal_compute_checksums = true", + [{rabbit, [ + {quorum_wal_compute_checksums, true} + ]}], + []}, + + {quorum_queue_segment_compute_checksums, + "quorum_queue.segment_compute_checksums = true", + [{rabbit, [ + {quorum_segment_compute_checksums, true} + ]}], + []}, + + {quorum_queue_max_append_entries_rpc_batch_size, + "quorum_queue.max_append_entries_rpc_batch_size = 16", + [{rabbit, [ + {quorum_max_append_entries_rpc_batch_size, 16} + ]}], + []}, + + {quorum_queue_compress_mem_tables, + "quorum_queue.compress_mem_tables = true", + [{rabbit, [ + {quorum_compress_mem_tables, true} + ]}], + []}, + + {quorum_queue_segment_max_size_bytes, + "quorum_queue.segment_max_size_bytes = 64000000", + [{rabbit, [ + {quorum_segment_max_size_bytes, 64000000} + ]}], + []}, + + {quorum_queue_segment_max_entries, + "quorum_queue.segment_max_entries = 4096", + [{rabbit, [ + {quorum_segment_max_entries, 4096} + ]}], + []}, + + {quorum_queue_wal_max_size_bytes, + "quorum_queue.wal_max_size_bytes = 536870912", + [{rabbit, [ + {quorum_wal_max_size_bytes, 536870912} + ]}], + []}, + + {quorum_queue_wal_max_entries, + "quorum_queue.wal_max_entries = 500000", + [{rabbit, [ + {quorum_wal_max_entries, 500000} + ]}], + []}, + + {quorum_queue_wal_max_batch_size, + "quorum_queue.wal_max_batch_size = 4096", + [{rabbit, [ + {quorum_wal_max_batch_size, 4096} + ]}], + []}, + + {quorum_queue_snapshot_chunk_size, + "quorum_queue.snapshot_chunk_size = 1000000", + [{rabbit, [ + {quorum_snapshot_chunk_size, 1000000} + ]}], + []}, + %% %% Runtime parameters %%