Ray Data integration for Milvus Storage, providing efficient datasource and datasink implementations for reading from and writing to the Milvus Storage format.
ray-milvus enables seamless integration between Ray Data and Milvus Storage, allowing you to:
- Read data from Milvus Storage into Ray Datasets for distributed processing
- Write Ray Datasets to Milvus Storage format (Apache Arrow/Parquet)
- Leverage Ray's parallel processing capabilities with Milvus Storage
- Build scalable data pipelines for vector data and machine learning workloads
# Using pip
pip install ray-milvus
# Using uv
uv add ray-milvus
# For development
git clone https://github.com/your-repo/ray-milvus.git
cd ray-milvus
uv sync

Requirements:
- Python >= 3.10
- ray[data] >= 2.51.1
- pyarrow >= 21.0.0
- numpy >= 1.24.0
- milvus-storage
import ray
import pyarrow as pa
from ray_milvus import write_milvus
# Initialize Ray
ray.init()
# Create a Ray dataset
ds = ray.data.range(1000)
# Define schema for Milvus storage
schema = pa.schema([
pa.field("id", pa.int64(), nullable=True, metadata={"PARQUET:field_id": "1"})
])
# Write to Milvus storage
write_milvus(
ds,
path="/tmp/my_dataset",
schema=schema,
properties={"fs.storage_type": "local", "fs.root_path": "/tmp/"}
)

from ray_milvus import read_milvus
# Define column groups (JSON manifest)
column_groups = '''
{
"column_groups": [
{
"columns": ["id", "vector", "text"],
"format": "parquet",
"paths": ["/tmp/my_dataset/column_group_0.parquet"]
}
]
}'''
# Read from Milvus storage
ds = read_milvus(
column_groups=column_groups,
schema=schema,
properties={"fs.storage_type": "local", "fs.root_path": "/tmp/"}
)
# Process with Ray Data
filtered = ds.filter(lambda row: row["id"] > 500)
print(f"Filtered dataset: {filtered.count()} rows")

Ray Data datasource for reading from Milvus Storage.
Key Features:
- Parallel reading with configurable parallelism
- Column projection support (read only specific columns)
- Predicate pushdown for efficient filtering
- Automatic batch processing
Usage:
import ray
from ray_milvus import MilvusDatasource
datasource = MilvusDatasource(
column_groups=[column_groups_json],
schema=schema,
columns=["id", "vector"], # Optional: read specific columns
predicate="id > 100", # Optional: filter expression
properties={"fs.storage_type": "local"}
)
ds = ray.data.read_datasource(datasource, parallelism=4)

Ray Data datasink for writing to Milvus Storage.
Key Features:
- Parallel writing with Ray workers
- Automatic schema conversion
- Progress tracking and statistics
- Resource cleanup and error handling
Usage:
from ray_milvus import MilvusDatasink
datasink = MilvusDatasink(
path="/tmp/output",
schema=schema,
properties={"fs.storage_type": "local"}
)
ds.write_datasink(datasink)

# Read from multiple column groups in parallel
column_groups = [
'{"segments": [{"path": "/tmp/cg1"}]}',
'{"segments": [{"path": "/tmp/cg2"}]}',
'{"segments": [{"path": "/tmp/cg3"}]}'
]
ds = read_milvus(
column_groups=column_groups,
schema=schema,
properties=properties,
parallelism=8 # Control parallel tasks
)

# Read only specific columns for better performance
ds = read_milvus(
column_groups=column_groups,
schema=schema,
columns=["id", "vector"], # Only read these columns
properties=properties
)

import ray
import numpy as np
from ray_milvus import read_milvus, write_milvus
# Read data
ds = read_milvus(column_groups, schema, properties=properties)
# Transform with Ray
def normalize_vector(row):
    """Rescale ``row["vector"]`` to unit L2 length and return the same row.

    A vector whose norm is not positive (e.g. all zeros) is stored back
    unchanged, converted to a plain Python list.
    """
    arr = np.array(row["vector"])
    length = np.linalg.norm(arr)
    if length > 0:
        arr = arr / length
    row["vector"] = arr.tolist()
    return row
processed_ds = ds.map(normalize_vector)
# Filter
filtered_ds = processed_ds.filter(lambda row: row["id"] > 1000)
# Write back
write_milvus(filtered_ds, "/tmp/processed", schema, properties)

# Group and aggregate
def compute_stats(batch):
    """Summarize one ``label`` group as a single-row batch.

    ``batch`` is one group handed over by ``groupby(...).map_groups``; only its
    ``to_pandas()`` method is used. The result has one row with the group's
    label, its row count, and the mean L2 norm of its ``vector`` column.

    Each value is wrapped in a one-element list because ``map_groups`` output
    goes through Ray's batch validation, which expects column-shaped values
    (lists / arrays), not bare scalars.
    """
    import numpy as np  # keep this snippet self-contained

    df = batch.to_pandas()
    # Stack the per-row vectors into a 2-D array so norms can be taken per row.
    vectors = np.array(df["vector"].tolist())
    return {
        "label": [df["label"].iloc[0]],
        "count": [len(df)],
        "mean_norm": [float(np.linalg.norm(vectors, axis=1).mean())],
    }
stats = ds.groupby("label").map_groups(
compute_stats,
batch_format="pyarrow"
)
for stat in stats.take_all():
print(stat)

Common properties for Milvus Storage:
properties = {
"fs.storage_type": "local", # or "s3", "azure", etc.
"fs.root_path": "/tmp/", # Base path for storage
# S3 configuration (if using S3)
# "fs.s3.endpoint": "s3.amazonaws.com",
# "fs.s3.access_key": "your-access-key",
# "fs.s3.secret_key": "your-secret-key",
# "fs.s3.region": "us-west-2",
}

Define schemas with PARQUET field IDs for Milvus Storage:
schema = pa.schema([
pa.field("id", pa.int64(), nullable=True,
metadata={"PARQUET:field_id": "1"}),
pa.field("vector", pa.list_(pa.float32()), nullable=True,
metadata={"PARQUET:field_id": "2"}),
pa.field("text", pa.string(), nullable=True,
metadata={"PARQUET:field_id": "3"}),
])

The examples/ directory contains a complete working example:
- basic_example.py - Basic read/write operations and data processing
Run the example:
python examples/basic_example.py

def read_milvus(
column_groups: Union[str, List[str]],
schema: pa.Schema,
columns: Optional[List[str]] = None,
predicate: Optional[str] = None,
properties: Optional[Dict[str, str]] = None,
parallelism: int = -1,
**read_args
) -> ray.data.Dataset

Parameters:
- column_groups: JSON string(s) with column group manifests
- schema: PyArrow schema for the dataset
- columns: Optional list of columns to read
- predicate: Optional filter expression
- properties: Storage configuration properties
- parallelism: Number of parallel read tasks (-1 for default)
- **read_args: Additional Ray read arguments

Returns: Ray Dataset
def write_milvus(
dataset: ray.data.Dataset,
path: str,
schema: pa.Schema,
properties: Optional[Dict[str, str]] = None,
**write_args
) -> None

Parameters:
- dataset: Ray Dataset to write
- path: Base path for output
- schema: PyArrow schema for the dataset
- properties: Storage configuration properties
- **write_args: Additional Ray write arguments
# Clone the repository
git clone https://github.com/your-repo/ray-milvus.git
cd ray-milvus
# Install with dev dependencies
uv sync
# Run tests
pytest tests/

# Run all tests
pytest
# Run with coverage
pytest --cov=ray_milvus
# Run specific test file
pytest tests/test_integration.py

Performance tips:
- Parallelism: Tune the `parallelism` parameter based on your cluster size and data distribution.
- Column Projection: Only read the columns you need to reduce I/O.
- Batch Size: Adjust Ray's batch size for your workload
- Memory: Monitor memory usage when processing large datasets
Import Error: Cannot import milvus_storage

pip install milvus-storage

Schema Mismatch Error: Ensure your PyArrow schema matches the data types in your Milvus Storage files.
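Memory Issues: Reduce parallelism or process the data in smaller batches before writing (`write_milvus` is a module-level function, not a Dataset method):
write_milvus(ds.map_batches(fn, batch_size=100), path, schema, properties)

Contributions are welcome! Please: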
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request
This project is licensed under the Server Side Public License v1 (SSPLv1) and the GNU Affero General Public License v3 (AGPLv3).
- Initial release
- MilvusDatasource implementation
- MilvusDatasink implementation
- Support for parallel reading and writing
- Column projection and predicate pushdown
- Examples and documentation