forked from beenuar/AiSOC
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathssrf_guard.py
More file actions
236 lines (185 loc) · 8.37 KB
/
Copy pathssrf_guard.py
File metadata and controls
236 lines (185 loc) · 8.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
"""
SSRF guard for playbook outbound HTTP steps.
Playbooks let analysts configure ``http`` and ``notify`` steps that make
outbound HTTP requests to arbitrary URLs. Without validation, a playbook
author can target cloud instance metadata services (e.g. AWS
``169.254.169.254``, GCP ``metadata.google.internal``), loopback / private
RFC1918 addresses, or non-HTTP schemes like ``file://`` — a classic
Server-Side Request Forgery (SSRF) class of bug.
This module provides :func:`validate_outbound_url`, called by playbook step
handlers before they hand a URL to ``httpx``. It rejects:
* Schemes other than ``http`` / ``https`` (overridable via
``AISOC_SSRF_ALLOWED_SCHEMES``).
* URLs with userinfo (``http://user:pw@host/...``) — these are a common
credential-smuggling and host-spoofing vector.
* Hostnames or IP literals that resolve to loopback, link-local, private,
reserved, unspecified, or multicast ranges.
* A blocklist of well-known cloud metadata endpoints (AWS, GCP, Azure,
Alibaba, Oracle) even if their hostnames happen to resolve to a public IP.
* Hostnames where *any* of the returned addresses is unsafe (defence in
depth against DNS records that mix one public and one private answer).
Operators can:
* Set ``AISOC_SSRF_ALLOW_PRIVATE=1`` to relax the private/loopback check
for on-prem deployments where playbooks must reach internal services
(still rejects cloud-metadata literals and link-local/multicast).
* Set ``AISOC_SSRF_EXTRA_BLOCKED_HOSTS=foo.internal,bar.svc`` to extend
the metadata host blocklist.
Failures raise :class:`SSRFError`, which propagates up to the engine and
marks the step as failed via the normal exception path.
"""
from __future__ import annotations
import ipaddress
import os
import socket
from urllib.parse import urlsplit
class SSRFError(ValueError):
    """Signal that the SSRF guard refused an outbound URL.

    Derives from :class:`ValueError`, so callers that already treat a bad URL
    as a value error keep working without a dedicated ``except`` clause.
    """
# ---------------------------------------------------------------------------
# Static blocklists
# ---------------------------------------------------------------------------
# Hostnames that must never be reached regardless of DNS resolution, e.g.
# operators sometimes wire ``metadata.google.internal`` to a public proxy.
# Comparison is case-insensitive against the URL's hostname (no port).
# Entries are stored lowercase because the guard lowercases the URL hostname
# before the membership test. IP literals are written in the compressed form
# that ``str(ipaddress.ip_address(...))`` produces.
_DEFAULT_BLOCKED_HOSTS: frozenset[str] = frozenset(
    {
        # IMDS IPv4 literal shared by AWS, Azure, GCP and Oracle Cloud
        # Infrastructure.
        "169.254.169.254",
        # AWS IMDS IPv6 endpoint. It is a unique-local address, so without an
        # explicit entry it would become reachable as soon as private ranges
        # are allowed.
        "fd00:ec2::254",
        # GCP metadata server hostnames.
        "metadata.google.internal",
        "metadata",
        # Azure IMDS uses the shared IPv4 literal above; hostname forms vary.
        "metadata.azure.com",
        # Alibaba Cloud metadata literal (lives in 100.64.0.0/10, which the
        # ``ipaddress`` module does not classify as private).
        "100.100.100.200",
        # Oracle Cloud (classic) metadata literal.
        "192.0.0.192",
        # Wildcard-DNS alias whose name encodes the IMDS IPv4 literal.
        "169.254.169.254.nip.io",
    }
)
# URL schemes accepted when no scheme override is configured.
_DEFAULT_ALLOWED_SCHEMES: frozenset[str] = frozenset(("http", "https"))
def _env_set(name: str) -> frozenset[str]:
    """Parse the comma-separated environment variable ``name``.

    Each item is stripped of surrounding whitespace and lowercased; items
    that are empty after stripping are dropped. An unset or empty variable
    yields an empty frozenset.
    """
    raw_value = os.environ.get(name) or ""
    trimmed = (piece.strip() for piece in raw_value.split(","))
    return frozenset(token.lower() for token in trimmed if token)
def _allowed_schemes() -> frozenset[str]:
    """Return the URL schemes the guard accepts.

    A non-empty ``AISOC_SSRF_ALLOWED_SCHEMES`` replaces the built-in default
    set entirely; otherwise the default set is returned.
    """
    configured = _env_set("AISOC_SSRF_ALLOWED_SCHEMES")
    if configured:
        return configured
    return _DEFAULT_ALLOWED_SCHEMES
def _blocked_hosts() -> frozenset[str]:
    """Return the built-in host blocklist plus any operator additions.

    Additions come from ``AISOC_SSRF_EXTRA_BLOCKED_HOSTS`` and can only extend
    the built-in set, never shrink it.
    """
    operator_extras = _env_set("AISOC_SSRF_EXTRA_BLOCKED_HOSTS")
    return _DEFAULT_BLOCKED_HOSTS.union(operator_extras)
def _allow_private_by_default() -> bool:
    """Report whether ``AISOC_SSRF_ALLOW_PRIVATE`` is switched on.

    The value is compared case-insensitively, ignoring surrounding
    whitespace, against ``1``, ``true`` and ``yes``; anything else (including
    an unset variable) counts as off.
    """
    flag = os.getenv("AISOC_SSRF_ALLOW_PRIVATE", "")
    normalized = flag.strip().lower()
    return normalized in ("1", "true", "yes")
# ---------------------------------------------------------------------------
# IP classification
# ---------------------------------------------------------------------------
# RFC 6598 shared address space (carrier-grade NAT). ``ipaddress`` reports it
# as neither private nor global, so it needs an explicit check.
_SHARED_ADDRESS_SPACE = ipaddress.ip_network("100.64.0.0/10")


def _is_disallowed_address(
    ip: ipaddress.IPv4Address | ipaddress.IPv6Address, *, allow_private: bool
) -> tuple[bool, str]:
    """Return ``(blocked, reason)`` for an IP address.

    IPv6 addresses that merely wrap an IPv4 address (IPv4-mapped
    ``::ffff:a.b.c.d`` and 6to4 ``2002::/16``) are classified by the IPv4
    address they embed.

    Parameters
    ----------
    ip:
        The address to classify.
    allow_private:
        When True, private/RFC1918, ULA and shared (100.64.0.0/10) addresses
        are permitted. Loopback, link-local, unspecified, multicast and
        reserved addresses are rejected regardless.

    Returns
    -------
    tuple[bool, str]
        ``(True, reason)`` when the address must not be contacted, otherwise
        ``(False, "")``.
    """
    if isinstance(ip, ipaddress.IPv6Address):
        # Unwrap first: some Python versions classify the whole ::ffff:0:0/96
        # block as reserved, which would otherwise make the verdict (and the
        # reported reason) depend on the interpreter version.
        embedded = ip.ipv4_mapped
        if embedded is None:
            embedded = ip.sixtofour
        if embedded is not None:
            return _is_disallowed_address(embedded, allow_private=allow_private)

    if ip.is_loopback:
        return True, "loopback address"
    if ip.is_link_local:
        return True, "link-local address (cloud metadata range)"
    if ip.is_unspecified:
        return True, "unspecified address"
    if ip.is_multicast:
        return True, "multicast address"
    if ip.is_reserved:
        return True, "reserved address"

    if not allow_private:
        if ip.is_private:
            return True, "private/RFC1918 address"
        if isinstance(ip, ipaddress.IPv4Address) and ip in _SHARED_ADDRESS_SPACE:
            return True, "shared address space (RFC 6598 / CGNAT)"
    return False, ""
def _coerce_ip(value: str) -> ipaddress._BaseAddress | None:
    """Parse ``value`` as an IPv4 or IPv6 address.

    Returns the parsed address object, or ``None`` when ``value`` is not an
    IP literal (for example an ordinary DNS name).
    """
    try:
        parsed = ipaddress.ip_address(value)
    except ValueError:
        parsed = None
    return parsed
def _resolve_host_addresses(host: str) -> list[ipaddress._BaseAddress]:
    """Look up ``host`` with ``getaddrinfo`` and return its distinct addresses.

    Addresses keep the order in which the resolver first reported them.
    Entries that cannot be parsed as an IP address are skipped.

    Raises :class:`SSRFError` when the lookup fails or when no parseable
    address is left.
    """
    try:
        records = socket.getaddrinfo(host, None, type=socket.SOCK_STREAM)
    except OSError as exc:
        raise SSRFError(f"could not resolve hostname {host!r}: {exc}") from exc

    # A dict doubles as an insertion-ordered set of address strings.
    distinct: dict[str, None] = {}
    for record in records:
        endpoint = record[4]
        text = endpoint[0] if endpoint else ""
        # Drop any IPv6 zone identifier ("fe80::1%eth0" -> "fe80::1").
        text = text.split("%", 1)[0]
        if text:
            distinct.setdefault(text, None)

    resolved = [ip for ip in map(_coerce_ip, distinct) if ip is not None]
    if not resolved:
        raise SSRFError(f"hostname {host!r} resolved to no usable addresses")
    return resolved
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def _is_blocklisted_address(
    ip: ipaddress.IPv4Address | ipaddress.IPv6Address, blocked: frozenset[str]
) -> bool:
    """Return True when ``ip``, or the IPv4 address it maps to, is in ``blocked``."""
    # str() yields the canonical spelling, so "fd00:ec2:0::254" and
    # "fd00:ec2::254" compare equal against the blocklist.
    candidates = {str(ip)}
    mapped = getattr(ip, "ipv4_mapped", None)
    if mapped is not None:
        candidates.add(str(mapped))
    return not blocked.isdisjoint(candidates)


def validate_outbound_url(url: str, *, allow_private: bool | None = None) -> str:
    """Validate ``url`` for outbound HTTP calls from playbook steps.

    Parameters
    ----------
    url:
        The candidate URL string.
    allow_private:
        Override the env-driven default. ``True`` permits private/RFC1918
        addresses (still blocks loopback, link-local, metadata literals,
        and explicit blocklist entries). ``None`` falls back to
        ``AISOC_SSRF_ALLOW_PRIVATE``.

    Returns
    -------
    str
        The original URL on success.

    Raises
    ------
    SSRFError
        If the URL is blocked for any reason.

    Notes
    -----
    The hostname is resolved here only to vet it; the URL string is returned
    unchanged. NOTE(review): if the caller resolves the name again when
    connecting, a DNS answer that changes in between is not covered by this
    check — confirm how the HTTP client is wired.
    """
    if not isinstance(url, str) or not url.strip():
        raise SSRFError("URL is empty")

    try:
        parts = urlsplit(url.strip())
    except ValueError as exc:
        raise SSRFError(f"could not parse URL: {exc}") from exc

    allowed_schemes = _allowed_schemes()
    scheme = (parts.scheme or "").lower()
    if scheme not in allowed_schemes:
        raise SSRFError(f"scheme {scheme!r} is not allowed (must be one of {sorted(allowed_schemes)})")

    # Reject userinfo to avoid credential smuggling and host-confusion
    # attacks. Testing for "@" also catches an empty userinfo ("http://@host/"),
    # which leaves both username and password falsy.
    if "@" in parts.netloc or parts.username or parts.password:
        raise SSRFError("URL must not include userinfo (user:password@host)")

    # Drop the trailing root dot so "metadata.google.internal." cannot slip
    # past the blocklist as a differently-spelled name.
    host = (parts.hostname or "").strip().lower().rstrip(".")
    if not host:
        raise SSRFError("URL has no hostname")

    # Explicit blocklist takes precedence over IP / DNS checks.
    blocked = _blocked_hosts()
    if host in blocked:
        raise SSRFError(f"hostname {host!r} is on the SSRF blocklist (cloud metadata or operator-blocked)")

    if allow_private is None:
        allow_private = _allow_private_by_default()

    # If host is an IP literal, validate it directly.
    literal_ip = _coerce_ip(host)
    if literal_ip is not None:
        # Re-check the blocklist with the canonical spelling of the literal.
        if _is_blocklisted_address(literal_ip, blocked):
            raise SSRFError(f"hostname {host!r} is on the SSRF blocklist (cloud metadata or operator-blocked)")
        blocked_flag, reason = _is_disallowed_address(literal_ip, allow_private=allow_private)
        if blocked_flag:
            raise SSRFError(f"host {host!r} is blocked: {reason}")
        return url

    # Otherwise, resolve the hostname and validate every returned address:
    # one unsafe answer is enough to reject the URL.
    for ip in _resolve_host_addresses(host):
        # A harmless-looking name must not be an alias for a blocklisted IP.
        if _is_blocklisted_address(ip, blocked):
            raise SSRFError(f"hostname {host!r} resolves to a blocked address {ip} (on the SSRF blocklist)")
        blocked_flag, reason = _is_disallowed_address(ip, allow_private=allow_private)
        if blocked_flag:
            raise SSRFError(f"hostname {host!r} resolves to a blocked address {ip} ({reason})")
    return url
# Public surface of the module: the guard entry point and its error type.
__all__ = [
    "SSRFError",
    "validate_outbound_url",
]