This document provides a comprehensive guide to all parameter configurations for the Milvus Spark Connector.
This connector requires Milvus 2.6 or later (Storage V2).
For Milvus 2.5 and earlier versions, please use the legacy branch which is no longer actively maintained.
Milvus Spark Connector provides the milvus data source format for reading and writing Milvus data.
Additionally, a convenient MilvusDataReader utility class is provided to simplify collection data reading operations.
MilvusDataReader provides a convenient method to read collection data.
import com.zilliz.spark.connector.{MilvusDataReader, MilvusDataReaderConfig, MilvusOption}
// Basic usage
val milvusDF = MilvusDataReader.read(
spark,
MilvusDataReaderConfig(
uri = "http://localhost:19530",
token = "your-token",
collectionName = "your_collection"
)
)
// Usage with additional options
val milvusDFWithOptions = MilvusDataReader.read(
spark,
MilvusDataReaderConfig(
uri = "http://localhost:19530",
token = "your-token",
collectionName = "your_collection",
options = Map(
MilvusOption.MilvusDatabaseName -> "your_database",
MilvusOption.MilvusPartitionName -> "your_partition"
)
)
)| Parameter | Type | Required | Description |
|---|---|---|---|
uri |
String | Yes | Milvus server connection URI |
token |
String | Yes | Milvus authentication token |
collectionName |
String | Yes | Collection name |
options |
Map[String, String] | No | Additional configuration options, supports the following parameters |
Basic Connection Parameters:
MilvusOption.MilvusDatabaseName- Database nameMilvusOption.MilvusPartitionName- Partition name
S3 Storage Parameters:
MilvusOption.S3Endpoint- S3 service endpointMilvusOption.S3BucketName- S3 bucket nameMilvusOption.S3RootPath- S3 root pathMilvusOption.S3AccessKey- S3 access keyMilvusOption.S3SecretKey- S3 secret keyMilvusOption.S3UseSSL- Whether to use SSL connection ("true"/"false")MilvusOption.S3PathStyleAccess- Whether to use path-style access ("true"/"false")- Note: For Alibaba Cloud OSS, this must be manually set to "false"
- Other S3-compatible storage typically doesn't require setting this parameter
// Alibaba Cloud OSS Configuration Example
val ossOptions = Map(
MilvusOption.S3Endpoint -> "oss-cn-hangzhou.aliyuncs.com",
MilvusOption.S3BucketName -> "your-bucket-name",
MilvusOption.S3RootPath -> "your-root-path",
MilvusOption.S3AccessKey -> "your-access-key",
MilvusOption.S3SecretKey -> "your-secret-key",
MilvusOption.S3UseSSL -> "true",
MilvusOption.S3PathStyleAccess -> "false", // Must be set to false for OSS
MilvusOption.MilvusDatabaseName -> "default"
)
// AWS S3 Configuration Example
val s3Options = Map(
MilvusOption.S3Endpoint -> "s3.amazonaws.com",
MilvusOption.S3BucketName -> "your-bucket-name",
MilvusOption.S3RootPath -> "your-root-path",
MilvusOption.S3AccessKey -> "your-access-key",
MilvusOption.S3SecretKey -> "your-secret-key",
MilvusOption.S3UseSSL -> "true",
MilvusOption.MilvusDatabaseName -> "default"
// S3PathStyleAccess typically doesn't need to be set
)MilvusDataReader reads data using the Storage V2 (Loon) FFI interface. Delete operations are handled internally by the storage layer, so no separate delete log processing is needed.
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
MilvusOption.MilvusUri |
String | Yes | - | Milvus server connection URI, format: http://host:port or https://host:port |
MilvusOption.MilvusToken |
String | No | "" | Milvus server authentication token |
MilvusOption.MilvusDatabaseName |
String | No | "" | Database name, defaults to default database |
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
MilvusOption.MilvusServerPemPath |
String | No | "" | Server certificate file path (one-way TLS) |
MilvusOption.MilvusClientKeyPath |
String | No | "" | Client private key file path (mutual TLS) |
MilvusOption.MilvusClientPemPath |
String | No | "" | Client certificate file path (mutual TLS) |
MilvusOption.MilvusCaPemPath |
String | No | "" | CA certificate file path (mutual TLS) |
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
MilvusOption.MilvusCollectionName |
String | Yes | - | Collection name |
MilvusOption.MilvusPartitionName |
String | No | "" | Partition name, operates on all partitions when empty |
MilvusOption.MilvusCollectionID |
String | No | "" | Collection ID, usually auto-retrieved |
MilvusOption.MilvusPartitionID |
String | No | "" | Partition ID, usually auto-retrieved |
MilvusOption.MilvusSegmentID |
String | No | "" | Segment ID, for reading specific segments |
MilvusOption.ReaderFieldIDs |
String | No | "" | Comma-separated field ID list, for reading specific fields |
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
MilvusOption.MilvusInsertMaxBatchSize |
Int | No | 5000 | Maximum batch size for single insert operation |
MilvusOption.MilvusRetryCount |
Int | No | 3 | Number of retries on operation failure |
MilvusOption.MilvusRetryInterval |
Int | No | 1000 | Retry interval in milliseconds |
val df = spark.read
.format("milvus")
.option(MilvusOption.MilvusUri, "http://localhost:19530")
.option(MilvusOption.MilvusToken, "your-token")
.option(MilvusOption.MilvusCollectionName, "your_collection")
.option(MilvusOption.MilvusDatabaseName, "your_database")
.option(MilvusOption.ReaderFieldIDs, "1,2,100,101") // Read only specified fields
.load()df.write
.format("milvus")
.option(MilvusOption.MilvusUri, "http://localhost:19530")
.option(MilvusOption.MilvusToken, "your-token")
.option(MilvusOption.MilvusCollectionName, "your_collection")
.option(MilvusOption.MilvusDatabaseName, "your_database")
.option(MilvusOption.MilvusInsertMaxBatchSize, "1000")
.option(MilvusOption.MilvusRetryCount, "5")
.save()The output schema for milvus format depends on the Milvus collection schema and includes:
- User-defined fields (based on collection schema)
$meta(StringType) - Dynamic fields (if enabled)
- Version Requirement: This connector requires Milvus 2.6+ with Storage V2
- SSL/TLS Configuration: Supports both one-way and mutual TLS authentication, configure certificate files as needed
- Batch Size: Properly setting
MilvusOption.MilvusInsertMaxBatchSizecan optimize write performance - Retry Mechanism: Built-in retry mechanism improves operation reliability
- Parameter Constants: It's recommended to use constants defined in the
MilvusOptionclass to avoid string spelling errors
- Bool
- Int8, Int16, Int32, Int64
- Float, Double
- String, VarChar
- JSON
- FloatVector
- Float16Vector
- BFloat16Vector
- BinaryVector
- Int8Vector
- SparseFloatVector
- Array (supports scalar element types)