Skip to content

Commit 6b517ff

Browse files
fix(rate-limiter): address review findings across engine, tests, and benchmarks
Correctness: - Fix sliding window member collision in multi-process Redis by including PID in sorted-set member string (redis_backend.rs) - Reject zero-count rate strings in Python _parse_rate() to match Rust (rate_limiter.py) - Fix allow_many() returning fewer results than input checks (rate_limiter.py) - Guard sub-second window_nanos producing reset_timestamp == now (memory.rs) - Fix EvalDimension docstring incorrectly describing EvalResult (types.rs) - Deduplicate from_dims empty case via unlimited(0) (types.rs) Performance: - Add EVALSHA caching with NOSCRIPT fallback to Rust Redis backend, matching the Python _ensure_scripts_loaded/_evalsha pattern (redis_backend.rs) - Cache parsed rate strings in Python MemoryBackend to avoid per-request _parse_rate() overhead (rate_limiter.py) - Fix Criterion benchmarks to advance clock between iterations so they measure steady-state under-limit path, not blocked path (benches/rate_limiter.rs) Tests: - Add TokenBucketAlgorithm.sweep() eviction tests - Add _extract_user_identity dict fallback chain tests (email/id/sub) - Add prompt_pre_fetch Rust async Redis path coverage - Remove unused jwt_token_alice/jwt_token_bob fixtures from integration tests - Fix time.sleep() -> asyncio.sleep() in async integration tests Load tests and Makefile: - Fix response misclassification: HTTP 429 now classified as [rate-limited] instead of [infra-error] in backend correctness and scale load tests - Use HTTP 429 as primary signal in capacity test, keep 422 string match as secondary fallback - Add missing RL_REQS_PER_SECOND and RL_PROMPT_ID Makefile defaults - Fix misleading "500 users" comment to reflect actual default of 100 Signed-off-by: Pratik Gandhi <gandhipratik203@gmail.com>
1 parent 28a4eb8 commit 6b517ff

11 files changed

Lines changed: 304 additions & 111 deletions

File tree

Makefile

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -2326,6 +2326,8 @@ MCP_RATE_LIMITER_REDIS_CAPACITY_LOCUSTFILE ?= tests/loadtest/locustfile_rate_lim
23262326
RL_ALGORITHM ?= fixed_window
23272327
RL_USERS ?= 100
23282328
RL_SPAWN_RATE ?= 10
2329+
RL_REQS_PER_SECOND ?= 0.25
2330+
RL_PROMPT_ID ?=
23292331
MCP_PROTOCOL_HOST ?= http://localhost:4444
23302332
MCP_BENCHMARK_HOST ?= http://localhost:8080
23312333
MCP_BENCHMARK_SERVER_ID ?= 9779b6698cbd4b4995ee04a4fab38737
@@ -2446,7 +2448,7 @@ benchmark-rate-limiter: ## Rate limiter correctness test (1
24462448
# help: benchmark-rate-limiter-scale - Multi-user scale test showing Redis memory divergence across algorithms
24472449
.PHONY: benchmark-rate-limiter-scale
24482450
RL_RUN_TIME ?= 300s
2449-
benchmark-rate-limiter-scale: ## Scale test: 500 unique users, Redis memory timeline per algorithm
2451+
benchmark-rate-limiter-scale: ## Scale test: RL_USERS unique users (default 100), Redis memory timeline per algorithm
24502452
@echo "📈 Running rate limiter scale test (resource divergence)..."
24512453
@echo " Algorithm: $(RL_ALGORITHM) (must match plugins/config.yaml)"
24522454
@echo " Users: $(RL_USERS) unique identities (each creates own Redis key)"

plugins/rate_limiter/rate_limiter.py

Lines changed: 27 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -118,6 +118,8 @@ def _parse_rate(rate: str) -> tuple[int, int]:
118118
count = int(count_str)
119119
except (ValueError, AttributeError):
120120
raise ValueError(f"Invalid rate string {rate!r}: expected '<count>/<unit>' e.g. '60/m'")
121+
if count <= 0:
122+
raise ValueError(f"Invalid rate string {rate!r}: count must be > 0, got {count}")
121123
per = per.strip().lower()
122124
if per in ("s", "sec", "second"):
123125
return count, 1
@@ -449,6 +451,7 @@ def __init__(self, algorithm: FixedWindowAlgorithm | SlidingWindowAlgorithm | To
449451
self._lock = asyncio.Lock()
450452
self._sweep_interval = sweep_interval
451453
self._sweep_task: Optional[asyncio.Task] = None # type: ignore[type-arg]
454+
self._parsed_cache: Dict[str, tuple[int, int]] = {} # rate_str → (count, window)
452455

453456
def _ensure_sweep_task(self) -> None:
454457
"""Start the background sweep task if it is not already running."""
@@ -470,7 +473,11 @@ async def allow(self, key: str, limit: Optional[str]) -> tuple[bool, int, int, d
470473
self._ensure_sweep_task()
471474
if not limit:
472475
return True, 0, 0, {"limited": False}
473-
count, window = _parse_rate(limit)
476+
parsed = self._parsed_cache.get(limit)
477+
if parsed is None:
478+
parsed = _parse_rate(limit)
479+
self._parsed_cache[limit] = parsed
480+
count, window = parsed
474481
return await self._algorithm.allow(self._lock, key, count, window)
475482

476483

@@ -826,27 +833,37 @@ async def allow_many(self, checks: List[Tuple[str, str]]) -> List[tuple[bool, in
826833
Returns:
827834
One (allowed, limit, reset_timestamp, metadata) tuple per input check.
828835
"""
829-
active = [(key, limit) for key, limit in checks if limit]
830-
if not active:
831-
return [(True, 0, 0, {"limited": False})] * len(checks)
836+
no_limit: tuple[bool, int, int, dict[str, Any]] = (True, 0, 0, {"limited": False})
837+
active_indices = [i for i, (_, limit) in enumerate(checks) if limit]
838+
if not active_indices:
839+
return [no_limit] * len(checks)
832840

841+
active = [checks[i] for i in active_indices]
833842
parsed: List[Tuple[str, int, int]] = [(key, *_parse_rate(limit)) for key, limit in active] # type: ignore[misc]
834843
redis_keys = [f"{self._prefix}:{key}:{window}" for key, _count, window in parsed]
835844

836845
try:
837846
client = await self._get_client()
838847
await self._ensure_scripts_loaded(client)
839848
if self._algorithm_name == ALGORITHM_SLIDING_WINDOW:
840-
return await self._allow_many_sliding(client, parsed, redis_keys)
841-
if self._algorithm_name == ALGORITHM_TOKEN_BUCKET:
842-
return await self._allow_many_token_bucket(client, parsed, redis_keys)
843-
return await self._allow_many_fixed(client, parsed, redis_keys)
849+
active_results = await self._allow_many_sliding(client, parsed, redis_keys)
850+
elif self._algorithm_name == ALGORITHM_TOKEN_BUCKET:
851+
active_results = await self._allow_many_token_bucket(client, parsed, redis_keys)
852+
else:
853+
active_results = await self._allow_many_fixed(client, parsed, redis_keys)
844854

845855
except Exception:
846856
logger.exception("RedisBackend.allow_many failed; %s", "falling back to memory" if self._fallback else "allowing request")
847857
if self._fallback is not None:
848-
return [await self._fallback.allow(key, limit) for key, limit in active]
849-
return [(True, 0, 0, {"limited": False})] * len(active)
858+
active_results = [await self._fallback.allow(key, limit) for key, limit in active]
859+
else:
860+
active_results = [no_limit] * len(active)
861+
862+
# Map active results back to the full input list.
863+
results: List[tuple[bool, int, int, dict[str, Any]]] = [no_limit] * len(checks)
864+
for idx, result in zip(active_indices, active_results):
865+
results[idx] = result
866+
return results
850867

851868
async def _allow_many_fixed(
852869
self, client: Any, parsed: List[Tuple[str, int, int]], redis_keys: List[str]

plugins_rust/rate_limiter/benches/rate_limiter.rs

Lines changed: 31 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -3,85 +3,96 @@
33
//
44
// Criterion benchmarks for the rate limiter memory backend.
55
// PERF-01, MEM-02, MEM-03, MEM-04.
6+
//
7+
// Each iteration measures a single "under-limit" request — the realistic
8+
// hot path in production. The clock advances between iterations so the
9+
// window resets and counters stay low, preventing benchmark drift into the
10+
// "blocked" code path or unbounded VecDeque growth (sliding window).
611

712
use std::hint::black_box;
813
use std::sync::Arc;
914

1015
use criterion::{Criterion, criterion_group, criterion_main};
1116
use rate_limiter_rust::{
12-
clock::FakeClock,
17+
clock::{FakeClock, FakeClockHandle},
1318
config::{Algorithm, EngineConfig, parse_rate},
1419
engine::RateLimiterEngine,
1520
};
1621

1722
const T0_UNIX: i64 = 1_000_000;
18-
const WINDOW: u64 = 1_000_000_000; // 1s
23+
const LIMIT: u64 = 100;
24+
const WINDOW: u64 = 60_000_000_000; // 60s in nanos
1925

20-
fn make_engine(algorithm: Algorithm) -> RateLimiterEngine {
21-
let (clock, _handle) = FakeClock::new(T0_UNIX);
26+
fn make_engine(algorithm: Algorithm) -> (RateLimiterEngine, FakeClockHandle) {
27+
let (clock, handle) = FakeClock::new(T0_UNIX);
2228
let cfg = EngineConfig {
23-
by_user: Some(parse_rate("1000000/s").unwrap()),
29+
by_user: Some(parse_rate(&format!("{}/m", LIMIT)).unwrap()),
2430
by_tenant: None,
2531
by_tool: Default::default(),
2632
algorithm,
2733
};
28-
RateLimiterEngine::new_with_clock(cfg, Arc::new(clock))
34+
(RateLimiterEngine::new_with_clock(cfg, Arc::new(clock)), handle)
2935
}
3036

3137
fn bench_fixed_window(c: &mut Criterion) {
32-
let engine = make_engine(Algorithm::FixedWindow);
38+
let (engine, handle) = make_engine(Algorithm::FixedWindow);
3339
c.bench_function("fixed_window/single_key", |b| {
3440
b.iter(|| {
41+
// Advance past the window so each iteration is a fresh "allowed" request.
42+
handle.advance_secs(61);
3543
engine
3644
.evaluate_many(
37-
black_box(vec![("user:bench".to_string(), 1_000_000, WINDOW)]),
38-
T0_UNIX,
45+
black_box(vec![("user:bench".to_string(), LIMIT, WINDOW)]),
46+
handle.unix_secs(),
3947
)
4048
.unwrap()
4149
})
4250
});
4351
}
4452

4553
fn bench_token_bucket(c: &mut Criterion) {
46-
let engine = make_engine(Algorithm::TokenBucket);
54+
let (engine, handle) = make_engine(Algorithm::TokenBucket);
4755
c.bench_function("token_bucket/single_key", |b| {
4856
b.iter(|| {
57+
handle.advance_secs(61);
4958
engine
5059
.evaluate_many(
51-
black_box(vec![("user:bench".to_string(), 1_000_000, WINDOW)]),
52-
T0_UNIX,
60+
black_box(vec![("user:bench".to_string(), LIMIT, WINDOW)]),
61+
handle.unix_secs(),
5362
)
5463
.unwrap()
5564
})
5665
});
5766
}
5867

5968
fn bench_sliding_window(c: &mut Criterion) {
60-
let engine = make_engine(Algorithm::SlidingWindow);
69+
let (engine, handle) = make_engine(Algorithm::SlidingWindow);
6170
c.bench_function("sliding_window/single_key", |b| {
6271
b.iter(|| {
72+
handle.advance_secs(61);
6373
engine
6474
.evaluate_many(
65-
black_box(vec![("user:bench".to_string(), 1_000_000, WINDOW)]),
66-
T0_UNIX,
75+
black_box(vec![("user:bench".to_string(), LIMIT, WINDOW)]),
76+
handle.unix_secs(),
6777
)
6878
.unwrap()
6979
})
7080
});
7181
}
7282

7383
fn bench_multi_dim(c: &mut Criterion) {
74-
let engine = make_engine(Algorithm::FixedWindow);
84+
let (engine, handle) = make_engine(Algorithm::FixedWindow);
7585
c.bench_function("fixed_window/three_dims", |b| {
7686
b.iter(|| {
87+
handle.advance_secs(61);
7788
engine
7889
.evaluate_many(
7990
black_box(vec![
80-
("user:alice".to_string(), 1_000_000, WINDOW),
81-
("tenant:acme".to_string(), 10_000_000, WINDOW),
82-
("tool:search".to_string(), 100_000, WINDOW),
91+
("user:alice".to_string(), LIMIT, WINDOW),
92+
("tenant:acme".to_string(), LIMIT * 100, WINDOW),
93+
("tool:search".to_string(), LIMIT / 10, WINDOW),
8394
]),
84-
T0_UNIX,
95+
handle.unix_secs(),
8596
)
8697
.unwrap()
8798
})

plugins_rust/rate_limiter/src/memory.rs

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -206,7 +206,10 @@ fn fixed_window(
206206
*window_start_unix = now_unix;
207207
}
208208

209-
let window_secs = (window_nanos / 1_000_000_000) as i64;
209+
// At least 1 second so reset_timestamp is always in the future, even if
210+
// window_nanos < 1 billion (sub-second window — currently unreachable via
211+
// config parsing but guarded defensively).
212+
let window_secs = (window_nanos / 1_000_000_000).max(1) as i64;
210213
// Constant within a window — matches Python backend behaviour (CORR-02).
211214
let reset_timestamp = *window_start_unix + window_secs;
212215

0 commit comments

Comments (0)