Skip to content

Commit ee0f238

Browse files
committed
fix(db): resolve transaction leak and connection pool exhaustion
Critical fixes for load test failures at 4000 concurrent users: Issue #1 - Transaction leak in streamablehttp_transport.py (CRITICAL): - Add explicit asyncio.CancelledError handling in get_db() context manager - When MCP handlers are cancelled (client disconnect, timeout), the finally block may not execute properly, leaving transactions "idle in transaction" - Now explicitly rollback and close before re-raising CancelledError - Add rollback in direct SessionLocal usage at line ~1425 Issue #2 - Missing db parameter in admin routes (HIGH): - Add `db: Session = Depends(get_db)` to 73 remaining admin routes - Routes with @require_permission but no db param caused decorator to create fresh session via fresh_db_session() for EVERY permission check - This doubled connection usage for affected routes under load Issue #3 - Slow recovery from transaction leaks (MEDIUM): - Reduce IDLE_TRANSACTION_TIMEOUT from 300s to 30s in docker-compose.yml - Reduce CLIENT_IDLE_TIMEOUT from 300s to 60s - Leaked transactions now killed faster, preventing pool exhaustion Root cause confirmed: list_resources() MCP handler was primary source, with 155+ connections stuck on `SELECT resources.*` for up to 273 seconds. See todo/rca2.md for full analysis including live test data showing connection leak progression and 606+ idle transaction timeout errors. Signed-off-by: Mihai Criveti <crivetimihai@gmail.com>
1 parent b63923e commit ee0f238

3 files changed

Lines changed: 71 additions & 16 deletions

File tree

docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -719,14 +719,14 @@ services:
719719
- SERVER_IDLE_TIMEOUT=600 # Close unused server connections after 10 min
720720
# Timeout settings
721721
- QUERY_WAIT_TIMEOUT=60 # Max wait for available connection before failing request
722-
- CLIENT_IDLE_TIMEOUT=300 # Close idle client connections (aligned with IDLE_TRANSACTION_TIMEOUT)
722+
- CLIENT_IDLE_TIMEOUT=60 # Close idle client connections (aligned with IDLE_TRANSACTION_TIMEOUT)
723723
- SERVER_CONNECT_TIMEOUT=5 # Timeout for new connections to PostgreSQL
724724
# Transaction cleanup - critical for avoiding idle-in-transaction buildup
725725
# NOTE: In transaction pooling, session-level advisory locks (used by migrations)
726726
# can stick unless the reset query clears them; DISCARD ALL is safest.
727727
- SERVER_RESET_QUERY=DISCARD ALL # Reset connection state when returned to pool
728728
- SERVER_RESET_QUERY_ALWAYS=1 # Always run reset query even after clean transactions
729-
- IDLE_TRANSACTION_TIMEOUT=300 # Kill transactions idle > 300s (increased from 120s to handle slow MCP calls)
729+
- IDLE_TRANSACTION_TIMEOUT=30 # Kill transactions idle > 30s to prevent connection pool exhaustion
730730
# Authentication
731731
- AUTH_TYPE=scram-sha-256 # Match PostgreSQL auth method
732732
depends_on:

mcpgateway/admin.py

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1089,6 +1089,7 @@ async def update_global_passthrough_headers(
10891089
@rate_limit(requests_per_minute=10) # Strict limit for cache operations
10901090
async def invalidate_passthrough_headers_cache(
10911091
_user=Depends(get_current_user_with_permissions),
1092+
db: Session = Depends(get_db),
10921093
) -> Dict[str, Any]:
10931094
"""Invalidate the GlobalConfig cache.
10941095

@@ -1126,6 +1127,7 @@ async def invalidate_passthrough_headers_cache(
11261127
@rate_limit(requests_per_minute=30)
11271128
async def get_passthrough_headers_cache_stats(
11281129
_user=Depends(get_current_user_with_permissions),
1130+
db: Session = Depends(get_db),
11291131
) -> Dict[str, Any]:
11301132
"""Get GlobalConfig cache statistics.
11311133

@@ -1161,6 +1163,7 @@ async def get_passthrough_headers_cache_stats(
11611163
@rate_limit(requests_per_minute=10)
11621164
async def invalidate_a2a_stats_cache(
11631165
_user=Depends(get_current_user_with_permissions),
1166+
db: Session = Depends(get_db),
11641167
) -> Dict[str, Any]:
11651168
"""Invalidate the A2A stats cache.
11661169

@@ -1196,6 +1199,7 @@ async def invalidate_a2a_stats_cache(
11961199
@rate_limit(requests_per_minute=30)
11971200
async def get_a2a_stats_cache_stats(
11981201
_user=Depends(get_current_user_with_permissions),
1202+
db: Session = Depends(get_db),
11991203
) -> Dict[str, Any]:
12001204
"""Get A2A stats cache statistics.
12011205

@@ -1225,6 +1229,7 @@ async def get_a2a_stats_cache_stats(
12251229
async def get_mcp_session_pool_metrics(
12261230
request: Request, # pylint: disable=unused-argument
12271231
_user=Depends(get_current_user_with_permissions),
1232+
db: Session = Depends(get_db),
12281233
) -> Dict[str, Any]:
12291234
"""Get MCP session pool metrics.
12301235

@@ -10354,7 +10359,7 @@ async def admin_set_prompt_state(
1035410359

1035510360
@admin_router.post("/roots")
1035610361
@require_permission("admin.system_config", allow_admin_bypass=False)
10357-
async def admin_add_root(request: Request, user=Depends(get_current_user_with_permissions)) -> RedirectResponse:
10362+
async def admin_add_root(request: Request, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> RedirectResponse:
1035810363
"""Add a new root via the admin UI.
1035910364

1036010365
Expects form fields:
@@ -10388,7 +10393,7 @@ async def admin_add_root(request: Request, user=Depends(get_current_user_with_pe
1038810393

1038910394
@admin_router.post("/roots/{uri:path}/delete")
1039010395
@require_permission("admin.system_config", allow_admin_bypass=False)
10391-
async def admin_delete_root(uri: str, request: Request, user=Depends(get_current_user_with_permissions)) -> RedirectResponse:
10396+
async def admin_delete_root(uri: str, request: Request, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)) -> RedirectResponse:
1039210397
"""
1039310398
Delete a root via the admin UI.
1039410399

@@ -10778,7 +10783,7 @@ async def admin_test_gateway(
1077810783
# Event Streaming via SSE to the Admin UI
1077910784
@admin_router.get("/events")
1078010785
@require_permission("admin.events", allow_admin_bypass=False)
10781-
async def admin_events(request: Request, _user=Depends(get_current_user_with_permissions)):
10786+
async def admin_events(request: Request, _user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
1078210787
"""
1078310788
Stream admin events from all services via SSE (Server-Sent Events).
1078410789

@@ -11264,6 +11269,7 @@ async def admin_get_logs(
1126411269
offset: int = 0,
1126511270
order: str = "desc",
1126611271
user=Depends(get_current_user_with_permissions), # pylint: disable=unused-argument
11272+
db: Session = Depends(get_db),
1126711273
) -> Dict[str, Any]:
1126811274
"""Get filtered log entries from the in-memory buffer.
1126911275

@@ -11349,6 +11355,7 @@ async def admin_stream_logs(
1134911355
entity_id: Optional[str] = None,
1135011356
level: Optional[str] = None,
1135111357
user=Depends(get_current_user_with_permissions), # pylint: disable=unused-argument
11358+
db: Session = Depends(get_db),
1135211359
):
1135311360
"""Stream real-time log updates via Server-Sent Events.
1135411361

@@ -11433,6 +11440,7 @@ async def generate():
1143311440
async def admin_get_log_file(
1143411441
filename: Optional[str] = None,
1143511442
user=Depends(get_current_user_with_permissions), # pylint: disable=unused-argument
11443+
db: Session = Depends(get_db),
1143611444
):
1143711445
"""Download log file.
1143811446

@@ -11563,6 +11571,7 @@ async def admin_export_logs(
1156311571
request_id: Optional[str] = None,
1156411572
search: Optional[str] = None,
1156511573
user=Depends(get_current_user_with_permissions), # pylint: disable=unused-argument
11574+
db: Session = Depends(get_db),
1156611575
):
1156711576
"""Export filtered logs in JSON or CSV format.
1156811577

@@ -11952,7 +11961,7 @@ async def admin_import_configuration(request: Request, db: Session = Depends(get
1195211961

1195311962
@admin_router.get("/import/status/{import_id}")
1195411963
@require_permission("admin.system_config", allow_admin_bypass=False)
11955-
async def admin_get_import_status(import_id: str, user=Depends(get_current_user_with_permissions)):
11964+
async def admin_get_import_status(import_id: str, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
1195611965
"""Get import status via Admin UI.
1195711966

1195811967
Args:
@@ -11976,7 +11985,7 @@ async def admin_get_import_status(import_id: str, user=Depends(get_current_user_
1197611985

1197711986
@admin_router.get("/import/status")
1197811987
@require_permission("admin.system_config", allow_admin_bypass=False)
11979-
async def admin_list_import_statuses(user=Depends(get_current_user_with_permissions)):
11988+
async def admin_list_import_statuses(user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
1198011989
"""List all import statuses via Admin UI.
1198111990

1198211991
Args:
@@ -13879,6 +13888,7 @@ async def admin_generate_support_bundle(
1387913888
include_env: bool = Query(default=True, description="Include environment config"),
1388013889
include_system: bool = Query(default=True, description="Include system info"),
1388113890
user=Depends(get_current_user_with_permissions),
13891+
db: Session = Depends(get_db),
1388213892
):
1388313893
"""
1388413894
Generate and download a support bundle with sanitized diagnostics.
@@ -13955,6 +13965,7 @@ async def admin_generate_support_bundle(
1395513965
async def get_maintenance_partial(
1395613966
request: Request,
1395713967
_user=Depends(get_current_user_with_permissions),
13968+
db: Session = Depends(get_db),
1395813969
):
1395913970
"""Render the maintenance dashboard partial (platform admin only).
1396013971

@@ -14000,7 +14011,7 @@ async def get_maintenance_partial(
1400014011

1400114012
@admin_router.get("/observability/partial", response_class=HTMLResponse)
1400214013
@require_permission("admin.system_config", allow_admin_bypass=False)
14003-
async def get_observability_partial(request: Request, _user=Depends(get_current_user_with_permissions)):
14014+
async def get_observability_partial(request: Request, _user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
1400414015
"""Render the observability dashboard partial.
1400514016

1400614017
Args:
@@ -14016,7 +14027,7 @@ async def get_observability_partial(request: Request, _user=Depends(get_current_
1401614027

1401714028
@admin_router.get("/observability/metrics/partial", response_class=HTMLResponse)
1401814029
@require_permission("admin.system_config", allow_admin_bypass=False)
14019-
async def get_observability_metrics_partial(request: Request, _user=Depends(get_current_user_with_permissions)):
14030+
async def get_observability_metrics_partial(request: Request, _user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
1402014031
"""Render the advanced metrics dashboard partial.
1402114032

1402214033
Args:
@@ -14032,7 +14043,7 @@ async def get_observability_metrics_partial(request: Request, _user=Depends(get_
1403214043

1403314044
@admin_router.get("/observability/stats", response_class=HTMLResponse)
1403414045
@require_permission("admin.system_config", allow_admin_bypass=False)
14035-
async def get_observability_stats(request: Request, hours: int = Query(24, ge=1, le=168), _user=Depends(get_current_user_with_permissions)):
14046+
async def get_observability_stats(request: Request, hours: int = Query(24, ge=1, le=168), _user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
1403614047
"""Get observability statistics for the dashboard.
1403714048

1403814049
Args:
@@ -14089,6 +14100,7 @@ async def get_observability_traces(
1408914100
attribute_search: Optional[str] = Query(None),
1409014101
tool_name: Optional[str] = Query(None),
1409114102
_user=Depends(get_current_user_with_permissions),
14103+
db: Session = Depends(get_db),
1409214104
):
1409314105
"""Get list of traces for the dashboard.
1409414106

@@ -14175,7 +14187,7 @@ async def get_observability_traces(
1417514187

1417614188
@admin_router.get("/observability/trace/{trace_id}", response_class=HTMLResponse)
1417714189
@require_permission("admin.system_config", allow_admin_bypass=False)
14178-
async def get_observability_trace_detail(request: Request, trace_id: str, _user=Depends(get_current_user_with_permissions)):
14190+
async def get_observability_trace_detail(request: Request, trace_id: str, _user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)):
1417914191
"""Get detailed trace information with spans.
1418014192

1418114193
Args:
@@ -14215,6 +14227,7 @@ async def save_observability_query(
1421514227
filter_config: dict = Body(..., description="Filter configuration as JSON"),
1421614228
is_shared: bool = Body(False, description="Whether query is shared with team"),
1421714229
user=Depends(get_current_user_with_permissions),
14230+
db: Session = Depends(get_db),
1421814231
):
1421914232
"""Save a new observability query filter configuration.
1422014233

@@ -14259,7 +14272,7 @@ async def save_observability_query(
1425914272

1426014273
@admin_router.get("/observability/queries", response_model=list)
1426114274
@require_permission("admin.system_config", allow_admin_bypass=False)
14262-
async def list_observability_queries(request: Request, user=Depends(get_current_user_with_permissions)): # pylint: disable=unused-argument
14275+
async def list_observability_queries(request: Request, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): # pylint: disable=unused-argument
1426314276
"""List saved observability queries for the current user.
1426414277

1426514278
Returns user's own queries plus any shared queries.
@@ -14307,7 +14320,7 @@ async def list_observability_queries(request: Request, user=Depends(get_current_
1430714320

1430814321
@admin_router.get("/observability/queries/{query_id}", response_model=dict)
1430914322
@require_permission("admin.system_config", allow_admin_bypass=False)
14310-
async def get_observability_query(request: Request, query_id: int, user=Depends(get_current_user_with_permissions)): # pylint: disable=unused-argument
14323+
async def get_observability_query(request: Request, query_id: int, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): # pylint: disable=unused-argument
1431114324
"""Get a specific saved query by ID.
1431214325

1431314326
Args:
@@ -14362,6 +14375,7 @@ async def update_observability_query(
1436214375
filter_config: Optional[dict] = Body(None),
1436314376
is_shared: Optional[bool] = Body(None),
1436414377
user=Depends(get_current_user_with_permissions),
14378+
db: Session = Depends(get_db),
1436514379
):
1436614380
"""Update an existing saved query.
1436714381

@@ -14427,7 +14441,7 @@ async def update_observability_query(
1442714441

1442814442
@admin_router.delete("/observability/queries/{query_id}", status_code=204)
1442914443
@require_permission("admin.system_config", allow_admin_bypass=False)
14430-
async def delete_observability_query(request: Request, query_id: int, user=Depends(get_current_user_with_permissions)): # pylint: disable=unused-argument
14444+
async def delete_observability_query(request: Request, query_id: int, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): # pylint: disable=unused-argument
1443114445
"""Delete a saved query.
1443214446

1443314447
Args:
@@ -14460,7 +14474,7 @@ async def delete_observability_query(request: Request, query_id: int, user=Depen
1446014474

1446114475
@admin_router.post("/observability/queries/{query_id}/use", response_model=dict)
1446214476
@require_permission("admin.system_config", allow_admin_bypass=False)
14463-
async def track_query_usage(request: Request, query_id: int, user=Depends(get_current_user_with_permissions)): # pylint: disable=unused-argument
14477+
async def track_query_usage(request: Request, query_id: int, user=Depends(get_current_user_with_permissions), db: Session = Depends(get_db)): # pylint: disable=unused-argument
1446414478
"""Track usage of a saved query (increments use count and updates last_used_at).
1446514479

1446614480
Args:
@@ -14515,6 +14529,7 @@ async def get_latency_percentiles(
1451514529
hours: int = Query(24, ge=1, le=168, description="Time range in hours"),
1451614530
interval_minutes: int = Query(60, ge=5, le=1440, description="Aggregation interval in minutes"),
1451714531
_user=Depends(get_current_user_with_permissions),
14532+
db: Session = Depends(get_db),
1451814533
):
1451914534
"""Get latency percentiles (p50, p90, p95, p99) over time.
1452014535

@@ -14682,6 +14697,7 @@ async def get_timeseries_metrics(
1468214697
hours: int = Query(24, ge=1, le=168, description="Time range in hours"),
1468314698
interval_minutes: int = Query(60, ge=5, le=1440, description="Aggregation interval in minutes"),
1468414699
_user=Depends(get_current_user_with_permissions),
14700+
db: Session = Depends(get_db),
1468514701
):
1468614702
"""Get time-series metrics (request rate, error rate, throughput).
1468714703

@@ -15008,6 +15024,7 @@ async def get_top_slow_endpoints(
1500815024
hours: int = Query(24, ge=1, le=168, description="Time range in hours"),
1500915025
limit: int = Query(10, ge=1, le=100, description="Number of results"),
1501015026
_user=Depends(get_current_user_with_permissions),
15027+
db: Session = Depends(get_db),
1501115028
):
1501215029
"""Get top N slowest endpoints by average duration.
1501315030

@@ -15075,6 +15092,7 @@ async def get_top_volume_endpoints(
1507515092
hours: int = Query(24, ge=1, le=168, description="Time range in hours"),
1507615093
limit: int = Query(10, ge=1, le=100, description="Number of results"),
1507715094
_user=Depends(get_current_user_with_permissions),
15095+
db: Session = Depends(get_db),
1507815096
):
1507915097
"""Get top N highest volume endpoints by request count.
1508015098

@@ -15140,6 +15158,7 @@ async def get_top_error_endpoints(
1514015158
hours: int = Query(24, ge=1, le=168, description="Time range in hours"),
1514115159
limit: int = Query(10, ge=1, le=100, description="Number of results"),
1514215160
_user=Depends(get_current_user_with_permissions),
15161+
db: Session = Depends(get_db),
1514315162
):
1514415163
"""Get top N error-prone endpoints by error count and rate.
1514515164

@@ -15209,6 +15228,7 @@ async def get_latency_heatmap(
1520915228
time_buckets: int = Query(24, ge=10, le=100, description="Number of time buckets"),
1521015229
latency_buckets: int = Query(20, ge=5, le=50, description="Number of latency buckets"),
1521115230
_user=Depends(get_current_user_with_permissions),
15231+
db: Session = Depends(get_db),
1521215232
):
1521315233
"""Get latency distribution heatmap data.
1521415234

@@ -15255,6 +15275,7 @@ async def get_tool_usage(
1525515275
hours: int = Query(24, ge=1, le=168, description="Time range in hours"),
1525615276
limit: int = Query(20, ge=5, le=100, description="Number of tools to return"),
1525715277
_user=Depends(get_current_user_with_permissions),
15278+
db: Session = Depends(get_db),
1525815279
):
1525915280
"""Get tool usage frequency statistics.
1526015281

@@ -15326,6 +15347,7 @@ async def get_tool_performance(
1532615347
hours: int = Query(24, ge=1, le=168, description="Time range in hours"),
1532715348
limit: int = Query(20, ge=5, le=100, description="Number of tools to return"),
1532815349
_user=Depends(get_current_user_with_permissions),
15350+
db: Session = Depends(get_db),
1532915351
):
1533015352
"""Get tool performance metrics (avg, min, max duration).
1533115353

@@ -15376,6 +15398,7 @@ async def get_tool_errors(
1537615398
hours: int = Query(24, ge=1, le=168, description="Time range in hours"),
1537715399
limit: int = Query(20, ge=5, le=100, description="Number of tools to return"),
1537815400
_user=Depends(get_current_user_with_permissions),
15401+
db: Session = Depends(get_db),
1537915402
):
1538015403
"""Get tool error rates and statistics.
1538115404

@@ -15446,6 +15469,7 @@ async def get_tool_chains(
1544615469
hours: int = Query(24, ge=1, le=168, description="Time range in hours"),
1544715470
limit: int = Query(20, ge=5, le=100, description="Number of chains to return"),
1544815471
_user=Depends(get_current_user_with_permissions),
15472+
db: Session = Depends(get_db),
1544915473
):
1545015474
"""Get tool chain analysis (which tools are invoked together in the same trace).
1545115475

@@ -15522,6 +15546,7 @@ async def get_tool_chains(
1552215546
async def get_tools_partial(
1552315547
request: Request,
1552415548
_user=Depends(get_current_user_with_permissions),
15549+
db: Session = Depends(get_db),
1552515550
):
1552615551
"""Render the tool invocation metrics dashboard HTML partial.
1552715552

@@ -15555,6 +15580,7 @@ async def get_prompt_usage(
1555515580
hours: int = Query(24, ge=1, le=168, description="Time range in hours"),
1555615581
limit: int = Query(20, ge=5, le=100, description="Number of prompts to return"),
1555715582
_user=Depends(get_current_user_with_permissions),
15583+
db: Session = Depends(get_db),
1555815584
):
1555915585
"""Get prompt rendering frequency statistics.
1556015586

@@ -15626,6 +15652,7 @@ async def get_prompt_performance(
1562615652
hours: int = Query(24, ge=1, le=168, description="Time range in hours"),
1562715653
limit: int = Query(20, ge=5, le=100, description="Number of prompts to return"),
1562815654
_user=Depends(get_current_user_with_permissions),
15655+
db: Session = Depends(get_db),
1562915656
):
1563015657
"""Get prompt performance metrics (avg, min, max duration).
1563115658

@@ -15675,6 +15702,7 @@ async def get_prompts_errors(
1567515702
hours: int = Query(24, description="Time range in hours"),
1567615703
limit: int = Query(20, description="Maximum number of results"),
1567715704
_user=Depends(get_current_user_with_permissions),
15705+
db: Session = Depends(get_db),
1567815706
):
1567915707
"""Get prompt error rates.
1568015708

@@ -15736,6 +15764,7 @@ async def get_prompts_errors(
1573615764
async def get_prompts_partial(
1573715765
request: Request,
1573815766
_user=Depends(get_current_user_with_permissions),
15767+
db: Session = Depends(get_db),
1573915768
):
1574015769
"""Render the prompt rendering metrics dashboard HTML partial.
1574115770

@@ -15769,6 +15798,7 @@ async def get_resource_usage(
1576915798
hours: int = Query(24, ge=1, le=168, description="Time range in hours"),
1577015799
limit: int = Query(20, ge=5, le=100, description="Number of resources to return"),
1577115800
_user=Depends(get_current_user_with_permissions),
15801+
db: Session = Depends(get_db),
1577215802
):
1577315803
"""Get resource fetch frequency statistics.
1577415804

@@ -15840,6 +15870,7 @@ async def get_resource_performance(
1584015870
hours: int = Query(24, ge=1, le=168, description="Time range in hours"),
1584115871
limit: int = Query(20, ge=5, le=100, description="Number of resources to return"),
1584215872
_user=Depends(get_current_user_with_permissions),
15873+
db: Session = Depends(get_db),
1584315874
):
1584415875
"""Get resource performance metrics (avg, min, max duration).
1584515876

@@ -15889,6 +15920,7 @@ async def get_resources_errors(
1588915920
hours: int = Query(24, description="Time range in hours"),
1589015921
limit: int = Query(20, description="Maximum number of results"),
1589115922
_user=Depends(get_current_user_with_permissions),
15923+
db: Session = Depends(get_db),
1589215924
):
1589315925
"""Get resource error rates.
1589415926

@@ -15950,6 +15982,7 @@ async def get_resources_errors(
1595015982
async def get_resources_partial(
1595115983
request: Request,
1595215984
_user=Depends(get_current_user_with_permissions),
15985+
db: Session = Depends(get_db),
1595315986
):
1595415987
"""Render the resource fetch metrics dashboard HTML partial.
1595515988

0 commit comments

Comments
 (0)