-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun_benchmark.py
More file actions
99 lines (76 loc) · 3.42 KB
/
run_benchmark.py
File metadata and controls
99 lines (76 loc) · 3.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import re
import asyncio
from pathlib import Path
from ai_powered_jenkins_auditor.security_advisor_agent.workflow import app_workflow
from ai_powered_jenkins_auditor.security_advisor_agent.state import AgentState
from tests.benchmark_data import BENCHMARK_CASES, GROUND_TRUTH
RESULTS_DIR = Path("benchmark_results")
async def run_benchmark_case(case_name, pipeline_builder):
    """
    Execute one benchmark case through the security-advisor workflow.

    Builds the pipeline object, runs the agent workflow over it, persists
    the generated report under RESULTS_DIR, and returns the report text
    together with the case's ground-truth entry.
    """
    print(f"\n--- Running Benchmark Case: {case_name} ---")

    pipeline = pipeline_builder()
    truth = GROUND_TRUTH[case_name]

    # Seed the agent with an empty working state for this pipeline.
    state: AgentState = {
        "pipeline": pipeline,
        "tasks_to_do": [],
        "raw_findings": [],
        "final_report": ""
    }
    result_state = await app_workflow.ainvoke(state)
    report = result_state.get("final_report", "Error: Report generation failed.")

    # Persist the full report so individual runs can be inspected later.
    report_path = RESULTS_DIR / f"{case_name}_report.md"
    report_path.write_text(report, encoding='utf-8')
    print(f"--- Full report saved to: {report_path} ---")

    return report, truth
async def run_and_generate_table():
    """
    Run every benchmark case and print a markdown summary table.

    For each case the generated report is compared against the ground truth:
    a case passes when every expected vulnerability string appears in the
    report and the number of "#### Vulnerability:" sections does not exceed
    the number of expected issues found.
    """
    RESULTS_DIR.mkdir(exist_ok=True)
    table_data = []

    print("--- Starting Benchmark Run ---")
    for case_name, pipeline_builder in BENCHMARK_CASES.items():
        final_report, ground_truth = await run_benchmark_case(case_name, pipeline_builder)

        report_lower = final_report.lower()
        expected_vulnerabilities = ground_truth["expected_vulnerabilities"]
        # Lower-case both sides of the comparison: previously a ground-truth
        # entry containing any uppercase character could never match the
        # already-lowercased report text.
        found_count = sum(
            1 for vuln in expected_vulnerabilities if vuln.lower() in report_lower
        )
        # Total vulnerability sections the report claims, expected or not.
        reported_count = len(re.findall(r"#### vulnerability:", report_lower))

        if not expected_vulnerabilities:
            # Clean pipeline: any reported vulnerability is a false positive.
            if reported_count > 0:
                status = "❌ FAIL"
                details = f"Reported {reported_count} false positive(s)."
            else:
                status = "✅ PASS"
                details = "Correctly found no issues."
        else:
            status = "✅ PASS"
            details = f"Found {found_count}/{len(expected_vulnerabilities)} expected issues."
            if found_count < len(expected_vulnerabilities):
                status = "❌ FAIL"
                details = f"Missed {len(expected_vulnerabilities) - found_count} expected issue(s)."
            # Sections beyond the matched expected issues count as unexpected.
            false_positives = reported_count - found_count
            if false_positives > 0:
                status = "❌ FAIL"
                details += f" Reported {false_positives} unexpected issue(s)."

        table_data.append({
            "case": case_name,
            "status": status,
            "details": details
        })

    # Render the collected results as a markdown table.
    print("\n\n" + "="*65)
    print("--- BENCHMARK RESULTS SUMMARY ---")
    print("="*65 + "\n")
    print(f"| {'Benchmark Case'.ljust(25)} | {'Status'.ljust(8)} | {'Details'.ljust(40)} |")
    print(f"|:{'-'*25}|:{'-'*8}|:{'-'*40}|")
    for row in table_data:
        print(f"| {row['case'].ljust(25)} | {row['status'].ljust(8)} | {row['details'].ljust(40)} |")
    print("\n" + "="*65)
    print(f"Detailed reports for each run are saved in the '{RESULTS_DIR}' directory.")
    print("="*65 + "\n")
async def main():
    """Async entry point: delegate to the benchmark runner."""
    await run_and_generate_table()
# Script entry point: run the async benchmark under a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())