Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/godot_ai/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,12 @@ def main(argv: Sequence[str] | None = None) -> None:
"Expose the server to a named LAN range for remote agents (issue "
"#421). Takes a CIDR or bare IP (repeatable, or comma-separated), "
"e.g. --allow-host 192.168.1.0/24. When set, both transports bind "
"off loopback and the rebinding guard widens its Host allowlist to "
"these networks ONLY — browser Origin / Sec-Fetch-Site checks stay "
"on. Omit for the default loopback-only behavior. Prefer an SSH "
"tunnel / Tailscale on untrusted networks; only name ranges you trust."
"off loopback and access is gated on the real socket peer address "
"(unforgeable) falling inside these networks ONLY — the Host header "
"is range-checked too but is not the authority, and browser Origin "
"/ Sec-Fetch-Site checks stay on. Omit for the default loopback-only "
"behavior. Prefer an SSH tunnel / Tailscale on untrusted networks; "
"only name ranges you trust."
),
)
parser.add_argument(
Expand Down
101 changes: 85 additions & 16 deletions src/godot_ai/transport/origin_guard.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@
``no-cors`` subresources that wouldn't carry an Origin — is refused with
HTTP 403 long before reaching FastMCP or our session registry.

When the ``--allow-host`` opt-in (#421) binds the transport off loopback,
the **real socket peer** (``scope["client"]`` / ``remote_address``) is the
authoritative LAN gate — see :func:`peer_ip_allowed`. The ``Host`` header
is client-controlled, so a peer outside the allowed range could otherwise
pass the range check by spoofing ``Host: <an-allowed-ip>``; gating on the
unforgeable peer address closes that. In the default loopback-only mode the
peer gate is a pass-through and behavior is byte-for-byte unchanged.

See umbrella #343, finding #1 (audit-v2).
"""

Comment on lines +44 to 48
Expand Down Expand Up @@ -158,6 +166,45 @@ def _host_ip_in_networks(host_header: str, networks: Sequence[IPNetwork] | None)
return any(ip in net for net in networks)


def peer_ip_allowed(peer_ip: str | None, allowed_networks: Sequence[IPNetwork] | None) -> bool:
"""Whether the real TCP peer address is permitted to reach the transport.

This is the authoritative LAN gate. The ``Host`` header is
client-controlled — a peer outside an allowed network can spoof
``Host: <an-allowed-ip>`` and pass :func:`is_allowed_host` — so when
the transport is bound off loopback (the ``--allow-host`` opt-in, #421)
the *socket peer* (``scope["client"]`` for ASGI, ``remote_address`` for
the WebSocket server), which the client cannot forge, must itself be
loopback or fall inside an allowed network.

With no allowlist (the default), the transport stays bound to loopback,
so the kernel already guarantees a loopback peer and this is a
pass-through that keeps the original behavior byte-for-byte — including
contexts (e.g. unit scopes) where the peer address isn't populated.
When an allowlist *is* set but the peer can't be determined, it fails
closed: better to refuse than to fall back to the spoofable header.
"""
if not allowed_networks:
return True
if not peer_ip:
return False
## Strip an IPv6 zone id (``fe80::1%eth0``) before parsing.
candidate = peer_ip.split("%", 1)[0].strip()
try:
ip = ipaddress.ip_address(candidate)
except ValueError:
return False
## A dual-stack listener can report an IPv4 peer in IPv4-mapped IPv6 form
## (``::ffff:127.0.0.1``), whose ``.is_loopback`` is False and which never
## matches an IPv4 allowlist network. Unwrap it to the real IPv4 address so
## a genuine loopback / in-network peer isn't wrongly rejected.
if isinstance(ip, ipaddress.IPv6Address) and ip.ipv4_mapped is not None:
ip = ip.ipv4_mapped
if ip.is_loopback:
return True
return any(ip in net for net in allowed_networks)
Comment on lines +193 to +205


def is_allowed_host(
host_header: str | None,
allowed_networks: Sequence[IPNetwork] | None = None,
Expand Down Expand Up @@ -230,22 +277,30 @@ def evaluate_loopback(
origins: list[str],
sec_fetch_sites: list[str] | None = None,
allowed_networks: Sequence[IPNetwork] | None = None,
peer_ip: str | None = None,
) -> bool:
"""Return True iff the request's headers pass the allowlist.
"""Return True iff the request passes the allowlist.

Both transports (ASGI middleware + WebSocket ``process_request``)
funnel their per-request header extraction through this helper so
the duplicate-header smuggling rule, the value-allowlist rule, and
the Sec-Fetch-Site cross-origin reject rule are evaluated identically.
A divergence between the two transports would be a security
regression — this helper exists to prevent it.

``allowed_networks`` (the ``--allow-host`` opt-in, #421) only widens
the *Host* allowlist to named LAN CIDRs. The Origin and Sec-Fetch-Site
rules are deliberately left untouched: a browser on the LAN sends a
non-loopback Origin (rejected) and a foreign Sec-Fetch-Site (rejected),
so DNS-rebinding defense survives the opt-in. A native remote agent
sends neither header, so it passes once its Host IP is allowed.
funnel their per-request extraction through this helper so the
duplicate-header smuggling rule, the value-allowlist rule, the
Sec-Fetch-Site cross-origin reject rule, and the peer-address gate are
evaluated identically. A divergence between the two transports would be
a security regression — this helper exists to prevent it.

``allowed_networks`` (the ``--allow-host`` opt-in, #421) widens access
to named LAN CIDRs. Authorization is anchored on ``peer_ip`` — the real
socket peer, which the client cannot forge — not on the ``Host`` header
(which it can). The Host header is still range-checked, but only as a
secondary filter; the peer gate is what actually keeps out-of-range
hosts off the server. The Origin and Sec-Fetch-Site rules are left
untouched: a browser on the LAN sends a non-loopback Origin (rejected)
and a foreign Sec-Fetch-Site (rejected), so DNS-rebinding defense
survives the opt-in. A native remote agent sends neither header, so it
passes once both its peer address and Host IP are allowed.

When ``allowed_networks`` is None (the loopback-only default), the peer
gate is a pass-through and behavior is byte-for-byte unchanged.
"""
if len(hosts) > 1 or len(origins) > 1:
return False
Expand All @@ -255,7 +310,8 @@ def evaluate_loopback(
origin = origins[0] if origins else None
sec_fetch_site = sec_fetch_sites[0] if sec_fetch_sites else None
return (
is_allowed_host(host, allowed_networks)
peer_ip_allowed(peer_ip, allowed_networks)
and is_allowed_host(host, allowed_networks)
and is_allowed_origin(origin)
and is_allowed_sec_fetch_site(sec_fetch_site)
)
Expand Down Expand Up @@ -300,7 +356,13 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
elif key == b"sec-fetch-site":
sec_fetch_sites.append(raw_value.decode("latin-1"))

if evaluate_loopback(hosts, origins, sec_fetch_sites, self.allowed_networks):
## ASGI populates ``scope["client"]`` with the real ``(host, port)``
## peer — unforgeable, unlike the Host header. ``None`` in unusual
## servers; the peer gate fails closed for it only when opted in.
client = scope.get("client")
peer_ip = client[0] if client else None

if evaluate_loopback(hosts, origins, sec_fetch_sites, self.allowed_networks, peer_ip):
await self.app(scope, receive, send)
return
await _send_forbidden(send)
Expand Down Expand Up @@ -342,7 +404,14 @@ async def guard(connection, request):
hosts = list(request.headers.get_all("Host"))
origins = list(request.headers.get_all("Origin"))
sec_fetch_sites = list(request.headers.get_all("Sec-Fetch-Site"))
if evaluate_loopback(hosts, origins, sec_fetch_sites, networks):
## ``remote_address`` is the real TCP peer (set before the HTTP
## upgrade) — unforgeable, unlike the Host header. It's a
## ``(host, port[, flowinfo, scopeid])`` tuple, or None if the
## socket is already gone; the peer gate fails closed for None
## only when opted in.
Comment on lines +407 to +411
remote = getattr(connection, "remote_address", None)
peer_ip = remote[0] if remote else None
if evaluate_loopback(hosts, origins, sec_fetch_sites, networks, peer_ip):
return None
return connection.respond(HTTPStatus.FORBIDDEN, FORBIDDEN_BODY_TEXT)

Expand Down
149 changes: 144 additions & 5 deletions tests/unit/test_origin_guard.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
is_allowed_origin,
is_allowed_sec_fetch_site,
parse_allow_hosts,
peer_ip_allowed,
)

# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -243,8 +244,14 @@ async def _call_middleware(
*,
headers: list[tuple[bytes, bytes]],
scope_type: str = "http",
client: tuple[str, int] | None = None,
) -> tuple[list[dict], bool]:
"""Run the middleware against a synthetic scope; return (sent, inner_called)."""
"""Run the middleware against a synthetic scope; return (sent, inner_called).

``client`` populates ASGI's ``scope["client"]`` (the real ``(host, port)``
peer). Omitted by default so loopback-only tests don't need it — the peer
gate is a pass-through unless ``allowed_networks`` is set.
"""
inner_called = False

async def inner(scope, receive, send):
Expand All @@ -263,6 +270,8 @@ async def receive():
return {"type": "http.request", "body": b"", "more_body": False}

scope = {"type": scope_type, "method": "GET", "path": "/x", "headers": headers}
if client is not None:
scope["client"] = client
await middleware(scope, receive, send)
return sent, inner_called

Expand Down Expand Up @@ -518,8 +527,8 @@ def test_is_allowed_host_without_networks_unchanged() -> None:

def test_evaluate_loopback_native_lan_agent_passes() -> None:
nets = parse_allow_hosts(["192.168.1.0/24"])
# Native remote agent: LAN Host, no Origin, no Sec-Fetch-Site.
assert evaluate_loopback(["192.168.1.50:8000"], [], [], nets) is True
# Native remote agent: in-range peer, LAN Host, no Origin/Sec-Fetch-Site.
assert evaluate_loopback(["192.168.1.50:8000"], [], [], nets, peer_ip="192.168.1.50") is True


def test_evaluate_loopback_lan_browser_origin_rejected() -> None:
Expand All @@ -532,14 +541,34 @@ def test_evaluate_loopback_lan_browser_origin_rejected() -> None:
["http://192.168.1.50:8000"],
[],
nets,
peer_ip="192.168.1.50",
)
is False
)


def test_evaluate_loopback_lan_cross_site_subresource_rejected() -> None:
nets = parse_allow_hosts(["192.168.1.0/24"])
assert evaluate_loopback(["192.168.1.50:8000"], [], ["cross-site"], nets) is False
assert (
evaluate_loopback(["192.168.1.50:8000"], [], ["cross-site"], nets, peer_ip="192.168.1.50")
is False
)


def test_evaluate_loopback_spoofed_host_foreign_peer_rejected() -> None:
"""The core --allow-host fix: an out-of-range peer that spoofs an
in-range Host header is rejected on the peer gate. Host alone is not
the authority."""
nets = parse_allow_hosts(["192.168.1.0/24"])
# Attacker at 10.0.0.5 (outside the CIDR) sends Host: 192.168.1.50.
assert evaluate_loopback(["192.168.1.50:8000"], [], [], nets, peer_ip="10.0.0.5") is False


def test_evaluate_loopback_in_range_peer_loopback_host_passes() -> None:
"""A local client on the LAN-bound server (peer 127.0.0.1, Host
loopback) still works after the opt-in."""
nets = parse_allow_hosts(["192.168.1.0/24"])
assert evaluate_loopback(["127.0.0.1:8000"], [], [], nets, peer_ip="127.0.0.1") is True


async def test_middleware_allows_lan_host_when_opted_in() -> None:
Expand All @@ -548,6 +577,7 @@ async def test_middleware_allows_lan_host_when_opted_in() -> None:
sent, inner_called = await _call_middleware(
middleware,
headers=[(b"host", b"192.168.1.50:8000")],
client=("192.168.1.50", 51234),
)
assert inner_called is True
assert sent[0]["status"] == 200
Expand All @@ -562,17 +592,62 @@ async def test_middleware_rejects_lan_host_browser_origin_when_opted_in() -> Non
(b"host", b"192.168.1.50:8000"),
(b"origin", b"http://192.168.1.50:8000"),
],
client=("192.168.1.50", 51234),
)
assert inner_called is False
assert sent[0]["status"] == 403


async def test_middleware_rejects_spoofed_host_from_foreign_peer() -> None:
"""The --allow-host fix at the transport boundary: a peer OUTSIDE the
allowed range spoofs an in-range Host header. The peer gate refuses it
even though the Host passes the range check."""
nets = parse_allow_hosts(["192.168.1.0/24"])
middleware = LocalhostOnlyHTTPMiddleware(app=None, allowed_networks=nets) # type: ignore[arg-type]
sent, inner_called = await _call_middleware(
middleware,
headers=[(b"host", b"192.168.1.50:8000")],
client=("10.0.0.5", 51234),
)
assert inner_called is False
assert sent[0]["status"] == 403


async def test_middleware_rejects_when_opted_in_and_peer_unknown() -> None:
"""Bound off loopback but the ASGI server didn't populate scope["client"]
— fail closed rather than fall back to the spoofable Host header."""
nets = parse_allow_hosts(["192.168.1.0/24"])
middleware = LocalhostOnlyHTTPMiddleware(app=None, allowed_networks=nets) # type: ignore[arg-type]
sent, inner_called = await _call_middleware(
middleware,
headers=[(b"host", b"192.168.1.50:8000")],
client=None,
)
assert inner_called is False
assert sent[0]["status"] == 403


async def test_middleware_allows_loopback_peer_when_opted_in() -> None:
"""A local client (peer 127.0.0.1) on a LAN-bound server still works."""
nets = parse_allow_hosts(["192.168.1.0/24"])
middleware = LocalhostOnlyHTTPMiddleware(app=None, allowed_networks=nets) # type: ignore[arg-type]
sent, inner_called = await _call_middleware(
middleware,
headers=[(b"host", b"127.0.0.1:8000")],
client=("127.0.0.1", 51234),
)
assert inner_called is True
assert sent[0]["status"] == 200


async def test_middleware_rejects_lan_host_without_opt_in() -> None:
# Default middleware (no networks) keeps rejecting LAN hosts.
# Default middleware (no networks) keeps rejecting LAN hosts. A loopback
# peer is irrelevant — the loopback bind already guarantees it.
middleware = LocalhostOnlyHTTPMiddleware(app=None) # type: ignore[arg-type]
sent, inner_called = await _call_middleware(
middleware,
headers=[(b"host", b"192.168.1.50:8000")],
client=("127.0.0.1", 51234),
)
assert inner_called is False
assert sent[0]["status"] == 403
Expand All @@ -599,3 +674,67 @@ def test_bind_host_for_networks_prioritizes_ipv4_reachability() -> None:
# and silently drop IPv4 reachability. See bind_host_for_networks docstring.
assert bind_host_for_networks(parse_allow_hosts(["192.168.1.0/24", "fd00::/8"])) == "0.0.0.0"
assert bind_host_for_networks(parse_allow_hosts(["fd00::/8", "10.0.0.0/8"])) == "0.0.0.0"


# ---------------------------------------------------------------------------
# peer_ip_allowed — the authoritative, unforgeable LAN gate (--allow-host fix)
# ---------------------------------------------------------------------------


def test_peer_ip_allowed_no_networks_is_passthrough() -> None:
# Loopback-only default: the kernel bind guarantees a loopback peer, so the
# gate is a pass-through — even when the peer is unknown — keeping behavior
# byte-for-byte unchanged.
assert peer_ip_allowed(None, None) is True
assert peer_ip_allowed("127.0.0.1", None) is True
assert peer_ip_allowed("192.168.1.50", None) is True
assert peer_ip_allowed("10.0.0.5", []) is True


def test_peer_ip_allowed_in_network() -> None:
nets = parse_allow_hosts(["192.168.1.0/24"])
assert peer_ip_allowed("192.168.1.50", nets) is True
assert peer_ip_allowed("192.168.1.1", nets) is True


def test_peer_ip_allowed_out_of_network_rejected() -> None:
nets = parse_allow_hosts(["192.168.1.0/24"])
assert peer_ip_allowed("192.168.2.50", nets) is False
assert peer_ip_allowed("10.0.0.5", nets) is False


def test_peer_ip_allowed_loopback_always_ok_when_opted_in() -> None:
nets = parse_allow_hosts(["192.168.1.0/24"])
assert peer_ip_allowed("127.0.0.1", nets) is True
assert peer_ip_allowed("::1", nets) is True

Comment on lines +706 to +710

def test_peer_ip_allowed_missing_fails_closed_when_opted_in() -> None:
nets = parse_allow_hosts(["192.168.1.0/24"])
assert peer_ip_allowed(None, nets) is False
assert peer_ip_allowed("", nets) is False


def test_peer_ip_allowed_unparseable_fails_closed() -> None:
nets = parse_allow_hosts(["192.168.1.0/24"])
assert peer_ip_allowed("not-an-ip", nets) is False
# A DNS name never reaches here (peer is always an IP literal), but if it
# somehow did, it must not slip through.
assert peer_ip_allowed("attacker.example.com", nets) is False


def test_peer_ip_allowed_strips_ipv6_zone_id() -> None:
nets = parse_allow_hosts(["fd00::/8"])
assert peer_ip_allowed("fd00::1%eth0", nets) is True
assert peer_ip_allowed("2001:db8::1%eth0", nets) is False


def test_peer_ip_allowed_unwraps_ipv4_mapped_ipv6() -> None:
## A dual-stack listener can report an IPv4 peer as IPv4-mapped IPv6
## (``::ffff:a.b.c.d``); ``.is_loopback`` is False on that form and it
## never matches an IPv4 network, so without unwrapping a genuine
## loopback / in-network peer would be wrongly rejected.
nets = parse_allow_hosts(["192.168.1.0/24"])
assert peer_ip_allowed("::ffff:127.0.0.1", nets) is True # mapped loopback
assert peer_ip_allowed("::ffff:192.168.1.50", nets) is True # mapped in-network
assert peer_ip_allowed("::ffff:10.0.0.5", nets) is False # mapped out-of-network
Loading