Skip to content

Commit 5568d1f

Browse files
xiangfu0claude
andcommitted
feat: add Apache Pinot vector search client
Adds a complete Apache Pinot client for VectorDBBench. Index types: HNSW (Lucene), IVF_FLAT, IVF_PQ, IVF_ON_DISK Metrics: L2, IP, COSINE Filters: NumGE, StrEqual Optional dep: pip install "vectordb-bench[pinot]" Parallel loading: thread_safe=True — each worker thread maintains its own row buffer and flushes to Pinot via a fresh HTTP session. Since Pinot's ingestFromFile is synchronous (blocks until HNSW index is built, ~6 min per 100K×768D segment), concurrent flushes across threads reduce load time significantly vs sequential flushing. Benchmark results: Small dataset (OpenAI 50K, 768D, L2): HNSW: 798 QPS, recall=1.000 IVF_FLAT: 800 QPS, recall=1.000 IVF_PQ: 795 QPS, recall=1.000 IVF_ON_DISK: 691 QPS, recall=1.000 Large dataset (Cohere 1M, 768D, COSINE): HNSW m=16: 74 QPS, recall=0.982 Filter benchmark (Cohere 1M, COSINE, HNSW m=32): 1% NumGE: 71 QPS, recall=0.977 99% NumGE: 97 QPS, recall=0.649 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 77d76ab commit 5568d1f

File tree

7 files changed

+768
-0
lines changed

7 files changed

+768
-0
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ turbopuffer = [ "turbopuffer" ]
8282
zvec = [ "zvec" ]
8383
endee = [ "endee==0.1.10" ]
8484
lindorm = [ "opensearch-py" ]
85+
pinot = [ "requests" ]
8586

8687
[project.urls]
8788
Repository = "https://github.com/zilliztech/VectorDBBench"

vectordb_bench/backend/clients/__init__.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class DB(Enum):
6161
Lindorm = "Lindorm"
6262
VectorChord = "VectorChord"
6363
PolarDB = "PolarDB"
64+
Pinot = "Pinot"
6465

6566
@property
6667
def init_cls(self) -> type[VectorDB]: # noqa: PLR0911, PLR0912, C901, PLR0915
@@ -257,6 +258,11 @@ def init_cls(self) -> type[VectorDB]: # noqa: PLR0911, PLR0912, C901, PLR0915
257258

258259
return PolarDB
259260

261+
if self == DB.Pinot:
262+
from .pinot.pinot import Pinot
263+
264+
return Pinot
265+
260266
msg = f"Unknown DB: {self.name}"
261267
raise ValueError(msg)
262268

@@ -455,6 +461,11 @@ def config_cls(self) -> type[DBConfig]: # noqa: PLR0911, PLR0912, C901, PLR0915
455461

456462
return PolarDBConfig
457463

464+
if self == DB.Pinot:
465+
from .pinot.config import PinotConfig
466+
467+
return PinotConfig
468+
458469
msg = f"Unknown DB: {self.name}"
459470
raise ValueError(msg)
460471

@@ -631,6 +642,15 @@ def case_config_cls( # noqa: C901, PLR0911, PLR0912, PLR0915
631642

632643
return _vectorchord_case_config.get(index_type)
633644

645+
if self == DB.Pinot:
646+
from .pinot.config import PinotHNSWConfig, PinotIVFFlatConfig, PinotIVFPQConfig
647+
648+
return {
649+
IndexType.HNSW: PinotHNSWConfig,
650+
IndexType.IVFFlat: PinotIVFFlatConfig,
651+
IndexType.IVFPQ: PinotIVFPQConfig,
652+
}.get(index_type, PinotHNSWConfig)
653+
634654
# DB.Pinecone, DB.Redis
635655
return EmptyDBCaseConfig
636656

vectordb_bench/backend/clients/pinot/__init__.py

Whitespace-only changes.
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
from typing import Annotated, TypedDict, Unpack
2+
3+
import click
4+
from pydantic import SecretStr
5+
6+
from ....cli.cli import (
7+
CommonTypedDict,
8+
HNSWFlavor2,
9+
click_parameter_decorators_from_typed_dict,
10+
run,
11+
)
12+
from .. import DB
13+
14+
15+
class PinotTypedDict(TypedDict):
16+
controller_host: Annotated[
17+
str,
18+
click.option("--controller-host", type=str, default="localhost", help="Pinot Controller host"),
19+
]
20+
controller_port: Annotated[
21+
int,
22+
click.option("--controller-port", type=int, default=9000, help="Pinot Controller port"),
23+
]
24+
broker_host: Annotated[
25+
str,
26+
click.option("--broker-host", type=str, default="localhost", help="Pinot Broker host"),
27+
]
28+
broker_port: Annotated[
29+
int,
30+
click.option("--broker-port", type=int, default=8099, help="Pinot Broker port"),
31+
]
32+
username: Annotated[
33+
str,
34+
click.option("--username", type=str, default=None, help="Pinot username (optional)"),
35+
]
36+
password: Annotated[
37+
str,
38+
click.option("--password", type=str, default=None, help="Pinot password (optional)"),
39+
]
40+
ingest_batch_size: Annotated[
41+
int,
42+
click.option(
43+
"--ingest-batch-size",
44+
type=int,
45+
default=100_000,
46+
show_default=True,
47+
help=(
48+
"Rows buffered before flushing one Pinot segment (one ingestFromFile call). "
49+
"Larger values mean fewer segments and better IVF training / query performance. "
50+
"Reduce if memory is constrained (100K x 768-dim float32 ~= 300 MB)."
51+
),
52+
),
53+
]
54+
55+
56+
def _pinot_db_config(parameters: dict):
57+
from .config import PinotConfig
58+
59+
return PinotConfig(
60+
db_label=parameters["db_label"],
61+
controller_host=parameters["controller_host"],
62+
controller_port=parameters["controller_port"],
63+
broker_host=parameters["broker_host"],
64+
broker_port=parameters["broker_port"],
65+
username=parameters.get("username"),
66+
password=SecretStr(parameters["password"]) if parameters.get("password") else None,
67+
ingest_batch_size=parameters["ingest_batch_size"],
68+
)
69+
70+
71+
@click.group()
72+
def Pinot():
73+
"""Apache Pinot vector search benchmarks."""
74+
75+
76+
# ---------------------------------------------------------------------------
77+
# HNSW
78+
# ---------------------------------------------------------------------------
79+
80+
81+
class PinotHNSWTypedDict(CommonTypedDict, PinotTypedDict, HNSWFlavor2): ...
82+
83+
84+
@Pinot.command("hnsw")
85+
@click_parameter_decorators_from_typed_dict(PinotHNSWTypedDict)
86+
def pinot_hnsw(**parameters: Unpack[PinotHNSWTypedDict]):
87+
from .config import PinotHNSWConfig
88+
89+
run(
90+
db=DB.Pinot,
91+
db_config=_pinot_db_config(parameters),
92+
db_case_config=PinotHNSWConfig(
93+
m=parameters["m"],
94+
ef_construction=parameters["ef_construction"],
95+
ef=parameters["ef_runtime"],
96+
),
97+
**parameters,
98+
)
99+
100+
101+
# ---------------------------------------------------------------------------
102+
# IVF_FLAT
103+
# ---------------------------------------------------------------------------
104+
105+
106+
class PinotIVFFlatTypedDict(CommonTypedDict, PinotTypedDict):
107+
nlist: Annotated[
108+
int,
109+
click.option("--nlist", type=int, default=128, help="Number of Voronoi cells (IVF nlist)"),
110+
]
111+
quantizer: Annotated[
112+
str,
113+
click.option(
114+
"--quantizer",
115+
type=click.Choice(["FLAT", "SQ8", "SQ4"]),
116+
default="FLAT",
117+
help="Quantizer type for IVF_FLAT",
118+
),
119+
]
120+
nprobe: Annotated[
121+
int,
122+
click.option("--nprobe", type=int, default=8, help="Number of cells to probe at query time"),
123+
]
124+
train_sample_size: Annotated[
125+
int,
126+
click.option(
127+
"--train-sample-size",
128+
type=int,
129+
default=None,
130+
help="Training sample size (defaults to max(nlist*50, 1000) if not set)",
131+
),
132+
]
133+
134+
135+
@Pinot.command("ivf-flat")
136+
@click_parameter_decorators_from_typed_dict(PinotIVFFlatTypedDict)
137+
def pinot_ivf_flat(**parameters: Unpack[PinotIVFFlatTypedDict]):
138+
from .config import PinotIVFFlatConfig
139+
140+
run(
141+
db=DB.Pinot,
142+
db_config=_pinot_db_config(parameters),
143+
db_case_config=PinotIVFFlatConfig(
144+
nlist=parameters["nlist"],
145+
quantizer=parameters["quantizer"],
146+
nprobe=parameters["nprobe"],
147+
train_sample_size=parameters.get("train_sample_size"),
148+
),
149+
**parameters,
150+
)
151+
152+
153+
# ---------------------------------------------------------------------------
154+
# IVF_PQ
155+
# ---------------------------------------------------------------------------
156+
157+
158+
class PinotIVFPQTypedDict(CommonTypedDict, PinotTypedDict):
159+
nlist: Annotated[
160+
int,
161+
click.option("--nlist", type=int, default=128, help="Number of Voronoi cells (IVF nlist)"),
162+
]
163+
pq_m: Annotated[
164+
int,
165+
click.option("--pq-m", type=int, default=8, help="Number of PQ sub-quantizers (must divide dimension)"),
166+
]
167+
pq_nbits: Annotated[
168+
int,
169+
click.option(
170+
"--pq-nbits",
171+
type=click.Choice(["4", "6", "8"]),
172+
default="8",
173+
help="Bits per PQ code (4, 6, or 8)",
174+
),
175+
]
176+
train_sample_size: Annotated[
177+
int,
178+
click.option("--train-sample-size", type=int, default=6400, help="Training sample size (must be >= nlist)"),
179+
]
180+
nprobe: Annotated[
181+
int,
182+
click.option("--nprobe", type=int, default=8, help="Number of cells to probe at query time"),
183+
]
184+
185+
186+
@Pinot.command("ivf-pq")
187+
@click_parameter_decorators_from_typed_dict(PinotIVFPQTypedDict)
188+
def pinot_ivf_pq(**parameters: Unpack[PinotIVFPQTypedDict]):
189+
from .config import PinotIVFPQConfig
190+
191+
run(
192+
db=DB.Pinot,
193+
db_config=_pinot_db_config(parameters),
194+
db_case_config=PinotIVFPQConfig(
195+
nlist=parameters["nlist"],
196+
pq_m=parameters["pq_m"],
197+
pq_nbits=int(parameters["pq_nbits"]),
198+
train_sample_size=parameters["train_sample_size"],
199+
nprobe=parameters["nprobe"],
200+
),
201+
**parameters,
202+
)
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
from pydantic import BaseModel, SecretStr
2+
3+
from ..api import DBCaseConfig, DBConfig, MetricType
4+
5+
6+
class PinotConfig(DBConfig):
7+
controller_host: str = "localhost"
8+
controller_port: int = 9000
9+
broker_host: str = "localhost"
10+
broker_port: int = 8099
11+
username: str | None = None
12+
password: SecretStr | None = None
13+
# Rows buffered before flushing one Pinot segment (one ingestFromFile call).
14+
# Larger values → fewer segments → better IVF training & query perf.
15+
# 100_000 rows x 768-dim float32 ~= 300 MB in-memory.
16+
ingest_batch_size: int = 100_000
17+
18+
def to_dict(self) -> dict:
19+
return {
20+
"controller_host": self.controller_host,
21+
"controller_port": self.controller_port,
22+
"broker_host": self.broker_host,
23+
"broker_port": self.broker_port,
24+
"username": self.username,
25+
"password": self.password.get_secret_value() if self.password else None,
26+
"ingest_batch_size": self.ingest_batch_size,
27+
}
28+
29+
30+
class PinotHNSWConfig(BaseModel, DBCaseConfig):
31+
"""HNSW vector index config for Apache Pinot (Lucene-based)."""
32+
33+
metric_type: MetricType | None = None
34+
m: int = 16 # maxCon: max connections per node
35+
ef_construction: int = 100 # beamWidth: construction beam width
36+
ef: int | None = None # ef_search: HNSW candidate list size at query time (default=k)
37+
38+
def index_param(self) -> dict:
39+
return {
40+
"vectorIndexType": "HNSW",
41+
"maxCon": str(self.m),
42+
"beamWidth": str(self.ef_construction),
43+
}
44+
45+
def search_param(self) -> dict:
46+
# ef controls the HNSW candidate list during search via vectorSimilarity(col, q, ef).
47+
# Larger ef → better recall, slightly higher latency. Defaults to k if not set.
48+
return {"ef": self.ef} if self.ef is not None else {}
49+
50+
51+
class PinotIVFFlatConfig(BaseModel, DBCaseConfig):
52+
"""IVF_FLAT vector index config for Apache Pinot."""
53+
54+
metric_type: MetricType | None = None
55+
nlist: int = 128 # number of Voronoi cells (centroids)
56+
quantizer: str = "FLAT" # FLAT, SQ8, or SQ4
57+
train_sample_size: int | None = None # defaults to max(nlist*50, 1000) if None
58+
nprobe: int = 8 # number of cells to probe at query time
59+
60+
def index_param(self) -> dict:
61+
params: dict = {
62+
"vectorIndexType": "IVF_FLAT",
63+
"nlist": str(self.nlist),
64+
"quantizer": self.quantizer,
65+
}
66+
if self.train_sample_size is not None:
67+
params["trainSampleSize"] = str(self.train_sample_size)
68+
return params
69+
70+
def search_param(self) -> dict:
71+
return {"nprobe": self.nprobe}
72+
73+
74+
class PinotIVFPQConfig(BaseModel, DBCaseConfig):
75+
"""IVF_PQ vector index config for Apache Pinot (residual product quantization)."""
76+
77+
metric_type: MetricType | None = None
78+
nlist: int = 128 # number of Voronoi cells (centroids)
79+
pq_m: int = 8 # number of sub-quantizers (must divide vectorDimension)
80+
pq_nbits: int = 8 # bits per sub-quantizer code: 4, 6, or 8
81+
train_sample_size: int = 6400 # training sample size (must be >= nlist)
82+
nprobe: int = 8 # number of cells to probe at query time
83+
84+
def index_param(self) -> dict:
85+
return {
86+
"vectorIndexType": "IVF_PQ",
87+
"nlist": str(self.nlist),
88+
"pqM": str(self.pq_m),
89+
"pqNbits": str(self.pq_nbits),
90+
"trainSampleSize": str(self.train_sample_size),
91+
}
92+
93+
def search_param(self) -> dict:
94+
return {"nprobe": self.nprobe}

0 commit comments

Comments
 (0)