perf: optimize V1 planInputPartitions() with batch API and eliminate S3 traversal#62
Open
Thor-ChenBiao wants to merge 1 commit intozilliztech:mainfrom
Open
perf: optimize V1 planInputPartitions() with batch API and eliminate S3 traversal#62Thor-ChenBiao wants to merge 1 commit intozilliztech:mainfrom
Thor-ChenBiao wants to merge 1 commit intozilliztech:mainfrom
Conversation
…S3 traversal
## Problem
planInputPartitions() runs on Driver single-threaded. For N segments:
- N HTTP calls to get segment info (one per segment)
- 2N S3 listStatus calls to traverse binlog directories
## Solution
### 1. Batch Segment Info API
- Add `getSegmentsInfoBatch()` in MilvusClient.scala
- Single HTTP call fetches all segment insertLogIDs at once
- N HTTP calls → 1 HTTP call
### 2. Eliminate S3 Traversal
Before: Even though API returned insertLogIDs, code still traversed S3 to "verify":
fs.listStatus(segmentPath) // 1st S3 call: list field directories
→ fs.listStatus(fieldPath) // 2nd S3 call: list binlog files per field
→ filter by insertLogIDs // then filter results
After: Trust API response, build paths directly via string concatenation:
insertLogIDs: ["100/123456", "101/789012"]
↓ split("/")
fieldID: "100", logID: "123456"
↓ concat
fullPath: s"${rootPath}/${fieldID}/${logID}"
Result: 2N S3 calls → 0 S3 calls
### 3. Parallel Segment Processing
- Use .par.foreach with ConcurrentHashMap for thread-safe parallel processing
- Note: After batch API optimization, this has minimal impact since
remaining operations are pure memory/CPU (no I/O)
## Changes
- MilvusClient.scala: Add getSegmentsInfoBatch() method
- MilvusDataSource.scala:
- Batch fetch at start of planInputPartitions()
- New buildFieldMapFromLogInfo() builds paths without S3
- Remove SegmentInfoCache (no longer needed with batch API)
- Fail-fast on batch API failure (no silent fallback)
Collaborator
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: Thor-ChenBiao The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
Collaborator
|
Welcome @Thor-ChenBiao! It looks like this is your first PR to zilliztech/spark-milvus 🎉 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
planInputPartitions() runs on Driver single-threaded. For N segments:
Solution
1. Batch Segment Info API
getSegmentsInfoBatch()in MilvusClient.scala2. Eliminate S3 Traversal
Before: Even though API returned insertLogIDs, code still traversed S3 to "verify":
fs.listStatus(segmentPath) // 1st S3 call: list field directories
→ fs.listStatus(fieldPath) // 2nd S3 call: list binlog files per field
→ filter by insertLogIDs // then filter results
After: Trust API response, build paths directly via string concatenation:
insertLogIDs: ["100/123456", "101/789012"]
↓ split("/")
fieldID: "100", logID: "123456"
↓ concat
fullPath: s"${rootPath}/${fieldID}/${logID}"
Result: 2N S3 calls → 0 S3 calls
3. Parallel Segment Processing
Changes