Skip to content

Commit 6e845de

Browse files
authored
feat(linter): Adapter for bandit (#132)
Lintrunner adapter for bandit https://github.com/PyCQA/bandit
1 parent 3ae2575 commit 6e845de

File tree

2 files changed

+265
-0
lines changed

2 files changed

+265
-0
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# SPDX-FileCopyrightText: Copyright 2026 Arm Limited and/or its affiliates <open-source-office@arm.com>
2+
# SPDX-License-Identifier: MIT
3+
4+
[[linter]]
5+
code = 'BANDIT'
6+
include_patterns = ['**/*.py']
7+
exclude_patterns = []
8+
command = [
9+
'python',
10+
'-m',
11+
'lintrunner_adapters',
12+
'run',
13+
'bandit_linter',
14+
'--configfile',
15+
'.bandit.yaml',
16+
'--',
17+
'@{{PATHSFILE}}',
18+
]
19+
init_command = [
20+
'python',
21+
'-m',
22+
'lintrunner_adapters',
23+
'run',
24+
'pip_init',
25+
'--dry-run={{DRYRUN}}',
26+
'--requirement=requirements-lintrunner.txt',
27+
]
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
# SPDX-FileCopyrightText: Copyright 2026 Arm Limited and/or its affiliates <open-source-office@arm.com>
2+
# SPDX-License-Identifier: MIT
3+
"""Adapter for https://bandit.readthedocs.io/en/latest/"""
4+
5+
from __future__ import annotations
6+
7+
import argparse
8+
import concurrent.futures
9+
import json
10+
import logging
11+
import os
12+
import sys
13+
from typing import Any
14+
15+
from lintrunner_adapters import (
16+
LintMessage,
17+
LintSeverity,
18+
add_default_options,
19+
run_command,
20+
)
21+
22+
LINTER_CODE = "BANDIT"
23+
24+
SEVERITIES = {
25+
"LOW": LintSeverity.ADVICE,
26+
"MEDIUM": LintSeverity.WARNING,
27+
"HIGH": LintSeverity.ERROR,
28+
}
29+
30+
31+
def format_lint_messages(
32+
message: str,
33+
code: str,
34+
string_code: str | None,
35+
) -> str:
36+
suffix = string_code or code
37+
formatted = f"{message} ({suffix})" if suffix else message
38+
return formatted
39+
40+
41+
def decode_failed_message(filename: str, err: UnicodeDecodeError) -> LintMessage:
42+
return LintMessage(
43+
path=filename,
44+
line=None,
45+
char=None,
46+
code=LINTER_CODE,
47+
severity=LintSeverity.ERROR,
48+
name="decode-failed",
49+
original=None,
50+
replacement=None,
51+
description=(f"utf-8 decoding failed due to {err.__class__.__name__}:\n{err}"),
52+
)
53+
54+
55+
def command_failed_message(filename: str, err: Exception) -> LintMessage:
56+
return LintMessage(
57+
path=filename,
58+
line=None,
59+
char=None,
60+
code=LINTER_CODE,
61+
severity=LintSeverity.ERROR,
62+
name="command-failed",
63+
original=None,
64+
replacement=None,
65+
description=(f"Failed due to {err.__class__.__name__}:\n{err}"),
66+
)
67+
68+
69+
def run_bandit(
70+
filename: str,
71+
configfile: str | None,
72+
timeout: int,
73+
) -> bytes | None:
74+
return run_command(
75+
[
76+
"bandit",
77+
"--format",
78+
"json",
79+
"--quiet",
80+
"--exit-zero",
81+
*([f"--configfile={configfile}"] if configfile else []),
82+
filename,
83+
],
84+
check=True,
85+
timeout=timeout,
86+
).stdout
87+
88+
89+
def parse_payload(
90+
stdout_bytes: bytes | None,
91+
filename: str,
92+
) -> tuple[dict[str, Any] | None, list[LintMessage]]:
93+
if not stdout_bytes:
94+
return None, []
95+
try:
96+
stdout = stdout_bytes.decode("utf-8").strip()
97+
except UnicodeDecodeError as err:
98+
return None, [decode_failed_message(filename, err)]
99+
if not stdout:
100+
return None, []
101+
return json.loads(stdout), []
102+
103+
104+
def coerce_optional_int(value: Any) -> int | None:
105+
try:
106+
return int(value) if value is not None else None
107+
except (TypeError, ValueError):
108+
return None
109+
110+
111+
def errors_from_payload(payload: dict[str, Any], filename: str) -> list[LintMessage]:
112+
return [
113+
LintMessage(
114+
path=error.get("filename") or filename,
115+
line=None,
116+
char=None,
117+
code=LINTER_CODE,
118+
severity=LintSeverity.ERROR,
119+
name="bandit-error",
120+
original=None,
121+
replacement=None,
122+
description=error.get("reason") or "Bandit reported an error.",
123+
)
124+
for error in payload.get("errors", [])
125+
]
126+
127+
128+
def issue_to_message(issue: dict[str, Any], filename: str) -> LintMessage:
129+
code = issue.get("test_id") or LINTER_CODE
130+
issue_text = issue.get("issue_text") or "Bandit reported an issue."
131+
severity = SEVERITIES.get(
132+
(issue.get("issue_severity") or "").upper(),
133+
LintSeverity.WARNING,
134+
)
135+
return LintMessage(
136+
path=issue.get("filename") or filename,
137+
name=code,
138+
description=format_lint_messages(
139+
issue_text,
140+
code,
141+
issue.get("test_name"),
142+
),
143+
line=coerce_optional_int(issue.get("line_number")),
144+
char=coerce_optional_int(issue.get("col_offset")),
145+
code=LINTER_CODE,
146+
severity=severity,
147+
original=None,
148+
replacement=None,
149+
)
150+
151+
152+
def issues_from_payload(payload: dict[str, Any], filename: str) -> list[LintMessage]:
153+
return [issue_to_message(issue, filename) for issue in payload.get("results", [])]
154+
155+
156+
def check_file(
157+
filename: str,
158+
*,
159+
configfile: str | None,
160+
timeout: int,
161+
) -> list[LintMessage]:
162+
try:
163+
stdout_bytes = run_bandit(
164+
filename=filename,
165+
configfile=configfile,
166+
timeout=timeout,
167+
)
168+
except Exception as err:
169+
return [command_failed_message(filename, err)]
170+
171+
payload, decode_messages = parse_payload(stdout_bytes, filename)
172+
if decode_messages:
173+
return decode_messages
174+
if payload is None:
175+
return []
176+
177+
return [
178+
*errors_from_payload(payload, filename),
179+
*issues_from_payload(payload, filename),
180+
]
181+
182+
183+
def main() -> None:
184+
parser = argparse.ArgumentParser(
185+
description=f"bandit wrapper linter. Linter code: {LINTER_CODE}",
186+
fromfile_prefix_chars="@",
187+
)
188+
parser.add_argument(
189+
"--configfile",
190+
default=None,
191+
type=str,
192+
help="bandit config file",
193+
)
194+
parser.add_argument(
195+
"--timeout",
196+
default=90,
197+
type=int,
198+
help="seconds to wait for bandit",
199+
)
200+
add_default_options(parser)
201+
args = parser.parse_args()
202+
203+
logging.basicConfig(
204+
format="<%(threadName)s:%(levelname)s> %(message)s",
205+
level=(
206+
logging.NOTSET
207+
if args.verbose
208+
else logging.DEBUG
209+
if len(args.filenames) < 1000
210+
else logging.INFO
211+
),
212+
stream=sys.stderr,
213+
)
214+
215+
with concurrent.futures.ThreadPoolExecutor(
216+
max_workers=os.cpu_count(),
217+
thread_name_prefix="Thread",
218+
) as executor:
219+
futures = {
220+
executor.submit(
221+
check_file,
222+
filename,
223+
configfile=args.configfile,
224+
timeout=args.timeout,
225+
): filename
226+
for filename in args.filenames
227+
}
228+
for future in concurrent.futures.as_completed(futures):
229+
try:
230+
for lint_message in future.result():
231+
lint_message.display()
232+
except Exception:
233+
logging.critical('Failed at "%s".', futures[future])
234+
raise
235+
236+
237+
if __name__ == "__main__":
238+
main()

0 commit comments

Comments
 (0)