|
| 1 | +"""URL validation shared by every HTTP-based communication protocol. |
| 2 | +
|
| 3 | +Centralised so all three HTTP protocols (http, streamable_http, sse) enforce |
| 4 | +the same trust boundary at every network edge — manual discovery AND tool |
| 5 | +invocation. Issue #83 (CVE-class SSRF) was caused by the runtime invocation |
| 6 | +path forgetting the discovery-time check, so this module also provides an |
| 7 | +explicit ``ensure_secure_url`` to call before every aiohttp request. |
| 8 | +""" |
| 9 | + |
| 10 | +from __future__ import annotations |
| 11 | + |
| 12 | +from ipaddress import ip_address |
| 13 | +from typing import Optional |
| 14 | +from urllib.parse import urlparse |
| 15 | + |
# Hostnames considered safe to talk to over plain HTTP.
# ``urlparse(...).hostname`` strips the brackets from IPv6 literals, so the
# bracketed ``[::1]`` entry is defensive only; it is kept so the set's
# contents stay backward-compatible.
_LOOPBACK_HOSTNAMES = frozenset({"localhost", "127.0.0.1", "::1", "[::1]"})


def _scheme_and_host(url: str) -> tuple[str, str]:
    """Return ``(scheme, hostname)`` of ``url``, both lower-cased.

    Returns ``("", "")`` when ``url`` is not a non-empty string or when
    ``urlparse`` rejects it (e.g. an unbalanced IPv6 bracket). A component
    that is missing from an otherwise parseable URL comes back as ``""``.
    """
    if not isinstance(url, str) or not url:
        return "", ""

    try:
        parsed = urlparse(url)
    except ValueError:
        return "", ""

    return (parsed.scheme or "").lower(), (parsed.hostname or "").lower()


def _is_loopback_host(host: str) -> bool:
    """Return True if ``host`` is ``localhost`` or a literal loopback IP.

    ``host`` is expected to be lower-cased and bracket-free, i.e. the value
    produced by ``_scheme_and_host``. The comparison is exact, never a prefix
    match, so ``localhost.evil.com`` and ``127.0.0.1.attacker.example`` are
    rejected.
    """
    if host in _LOOPBACK_HOSTNAMES:
        return True

    # Any other IP literal inside a loopback range (e.g. ``127.0.0.2``).
    # Anything that is not a well-formed IP literal, including spellings
    # with leading zeros such as ``127.000.000.001``, raises ValueError and
    # is treated as non-loopback.
    try:
        address = ip_address(host)
    except ValueError:
        return False

    # ``::ffff:127.0.0.1`` reaches the IPv4 loopback interface, but not every
    # interpreter version reports it via ``IPv6Address.is_loopback``. Unwrap
    # the embedded IPv4 address so the answer does not depend on the runtime.
    # IPv4Address objects have no ``ipv4_mapped`` attribute, hence getattr.
    embedded_v4 = getattr(address, "ipv4_mapped", None)
    if embedded_v4 is not None:
        address = embedded_v4

    return address.is_loopback


def is_secure_url(url: str) -> bool:
    """Return True if ``url`` is safe to fetch from a UTCP HTTP protocol.

    Allowed:
      - Any ``https://`` URL that has a host.
      - ``http://`` URLs whose host is exactly ``localhost`` or a literal
        loopback IP (``127.0.0.1``, ``::1``, any other address in a
        loopback range, or an IPv4-mapped IPv6 form of one).

    Disallowed:
      - Plain ``http://`` to any other host (MITM exposure).
      - URLs whose hostname merely *starts* with ``localhost`` /
        ``127.0.0.1`` but isn't actually loopback (e.g.
        ``http://localhost.evil.com``,
        ``http://127.0.0.1.attacker.example``). The earlier ``startswith``
        check let these through.
      - Any scheme other than ``http`` / ``https`` (``file://``,
        ``gopher://``, ``javascript:``, ...), and URLs with no host.
      - Non-string or empty input, and strings ``urlparse`` cannot parse.
    """
    scheme, host = _scheme_and_host(url)
    if scheme not in {"http", "https"} or not host:
        return False

    if scheme == "https":
        return True

    # http:// is only allowed for loopback.
    return _is_loopback_host(host)


def is_loopback_url(url: str) -> bool:
    """Return True if ``url``'s host is a literal loopback address.

    Used by the OpenAPI converter to detect the SSRF case where a remote spec
    declares ``servers: [{ url: "http://127.0.0.1:..." }]`` to redirect tool
    invocation at the host running the agent. Hostname-based, not a string
    prefix, so ``http://localhost.evil.com`` returns False. The scheme is
    not inspected: ``https://localhost`` is loopback too.

    NOTE(review): only well-formed IP literals and ``localhost`` are
    recognised. Non-canonical spellings that some resolvers accept
    (``127.1``, decimal or hex integers) and the unspecified address
    ``0.0.0.0`` return False here; confirm whether callers need those
    covered as well.
    """
    _, host = _scheme_and_host(url)
    return bool(host) and _is_loopback_host(host)
| 94 | + |
| 95 | + |
def ensure_secure_url(url: str, *, context: Optional[str] = None) -> None:
    """Validate ``url`` with ``is_secure_url`` and raise if it is rejected.

    Args:
        url: The URL about to be fetched.
        context: Optional short label (``"manual discovery"``,
            ``"tool invocation"``, ...) woven into the error text so log
            readers can tell which trust boundary was breached.

    Raises:
        ValueError: If ``is_secure_url(url)`` is false.
    """
    if not is_secure_url(url):
        # Only mention the call site when the caller supplied a label.
        where = "" if not context else f" during {context}"
        message = (
            f"Security error{where}: URL must use HTTPS or be a literal loopback "
            f"address (localhost / 127.0.0.1 / ::1). Got: {url!r}. "
            "Plain HTTP to any other host is rejected to prevent MITM attacks "
            "and SSRF into internal services."
        )
        raise ValueError(message)
0 commit comments