Description
EsMetadataMigrationPipeline.migrateAll() currently migrates index metadata sequentially using concatMap. Since index creation is independent across indices, this could be parallelized for better performance on clusters with many indices.
Current Behavior
Flux.fromIterable(indices)
    .concatMap(indexName -> migrateIndexMetadata(indexName).thenReturn(indexName))
Proposed Change
Replace concatMap with flatMap and a bounded concurrency:
Flux.fromIterable(indices)
    .flatMap(indexName -> migrateIndexMetadata(indexName).thenReturn(indexName), 5)
A default concurrency of 5 should be safe since index creation operations are independent.
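To make the bound concrete, here is a minimal JDK-only sketch of what "at most 5 migrations in flight" means. It is not Reactor code and not the pipeline's implementation: a fixed thread pool stands in for the concurrency argument of flatMap, and the class name, the simulated migrateIndexMetadata body, and the peakConcurrency helper are all hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; the real pipeline uses Reactor, not a thread pool.
class BoundedMigrationSketch {

    // Stand-in for migrateIndexMetadata(indexName): sleeps briefly to simulate work
    // and records how many calls are running at the same time.
    static String migrateIndexMetadata(String indexName, AtomicInteger inFlight, AtomicInteger peak)
            throws InterruptedException {
        int now = inFlight.incrementAndGet();
        peak.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(20);
            return indexName;
        } finally {
            inFlight.decrementAndGet();
        }
    }

    // Runs all simulated migrations with at most `concurrency` running at once
    // and returns the highest number observed in flight simultaneously.
    static int peakConcurrency(List<String> indices, int concurrency) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String name : indices) {
                futures.add(pool.submit(() -> migrateIndexMetadata(name, inFlight, peak)));
            }
            for (Future<String> f : futures) {
                f.get(); // wait for completion and surface any failure
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        List<String> indices = List.of("idx-1", "idx-2", "idx-3", "idx-4",
                                       "idx-5", "idx-6", "idx-7", "idx-8");
        System.out.println("peak with bound 5: " + peakConcurrency(indices, 5));
        System.out.println("peak with bound 1: " + peakConcurrency(indices, 1));
    }
}
```

A bound of 1 corresponds to the current sequential behavior. One difference to keep in mind when switching operators: flatMap does not guarantee that results are emitted in source order, whereas concatMap does, so anything downstream of migrateAll() that relies on the order of index names would need checking.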
Context
Identified during PR review of the RfsPipeline refactor. Kept sequential to match existing behavior and limit PR scope.