1919 is_allowed_origin ,
2020 is_allowed_sec_fetch_site ,
2121 parse_allow_hosts ,
22+ peer_ip_allowed ,
2223)
2324
2425# ---------------------------------------------------------------------------
@@ -243,8 +244,14 @@ async def _call_middleware(
243244 * ,
244245 headers : list [tuple [bytes , bytes ]],
245246 scope_type : str = "http" ,
247+ client : tuple [str , int ] | None = None ,
246248) -> tuple [list [dict ], bool ]:
247- """Run the middleware against a synthetic scope; return (sent, inner_called)."""
249+ """Run the middleware against a synthetic scope; return (sent, inner_called).
250+
251+ ``client`` populates ASGI's ``scope["client"]`` (the real ``(host, port)``
252+ peer). Omitted by default so loopback-only tests don't need it — the peer
253+ gate is a pass-through unless ``allowed_networks`` is set.
254+ """
248255 inner_called = False
249256
250257 async def inner (scope , receive , send ):
@@ -263,6 +270,8 @@ async def receive():
263270 return {"type" : "http.request" , "body" : b"" , "more_body" : False }
264271
265272 scope = {"type" : scope_type , "method" : "GET" , "path" : "/x" , "headers" : headers }
273+ if client is not None :
274+ scope ["client" ] = client
266275 await middleware (scope , receive , send )
267276 return sent , inner_called
268277
@@ -518,8 +527,8 @@ def test_is_allowed_host_without_networks_unchanged() -> None:
518527
519528def test_evaluate_loopback_native_lan_agent_passes () -> None :
520529 nets = parse_allow_hosts (["192.168.1.0/24" ])
521- # Native remote agent: LAN Host, no Origin , no Sec-Fetch-Site.
522- assert evaluate_loopback (["192.168.1.50:8000" ], [], [], nets ) is True
530+ # Native remote agent: in-range peer, LAN Host , no Origin/ Sec-Fetch-Site.
531+ assert evaluate_loopback (["192.168.1.50:8000" ], [], [], nets , peer_ip = "192.168.1.50" ) is True
523532
524533
525534def test_evaluate_loopback_lan_browser_origin_rejected () -> None :
@@ -532,14 +541,34 @@ def test_evaluate_loopback_lan_browser_origin_rejected() -> None:
532541 ["http://192.168.1.50:8000" ],
533542 [],
534543 nets ,
544+ peer_ip = "192.168.1.50" ,
535545 )
536546 is False
537547 )
538548
539549
540550def test_evaluate_loopback_lan_cross_site_subresource_rejected () -> None :
541551 nets = parse_allow_hosts (["192.168.1.0/24" ])
542- assert evaluate_loopback (["192.168.1.50:8000" ], [], ["cross-site" ], nets ) is False
552+ assert (
553+ evaluate_loopback (["192.168.1.50:8000" ], [], ["cross-site" ], nets , peer_ip = "192.168.1.50" )
554+ is False
555+ )
556+
557+
558+ def test_evaluate_loopback_spoofed_host_foreign_peer_rejected () -> None :
559+ """The core --allow-host fix: an out-of-range peer that spoofs an
560+ in-range Host header is rejected on the peer gate. Host alone is not
561+ the authority."""
562+ nets = parse_allow_hosts (["192.168.1.0/24" ])
563+ # Attacker at 10.0.0.5 (outside the CIDR) sends Host: 192.168.1.50.
564+ assert evaluate_loopback (["192.168.1.50:8000" ], [], [], nets , peer_ip = "10.0.0.5" ) is False
565+
566+
567+ def test_evaluate_loopback_in_range_peer_loopback_host_passes () -> None :
568+ """A local client on the LAN-bound server (peer 127.0.0.1, Host
569+ loopback) still works after the opt-in."""
570+ nets = parse_allow_hosts (["192.168.1.0/24" ])
571+ assert evaluate_loopback (["127.0.0.1:8000" ], [], [], nets , peer_ip = "127.0.0.1" ) is True
543572
544573
545574async def test_middleware_allows_lan_host_when_opted_in () -> None :
@@ -548,6 +577,7 @@ async def test_middleware_allows_lan_host_when_opted_in() -> None:
548577 sent , inner_called = await _call_middleware (
549578 middleware ,
550579 headers = [(b"host" , b"192.168.1.50:8000" )],
580+ client = ("192.168.1.50" , 51234 ),
551581 )
552582 assert inner_called is True
553583 assert sent [0 ]["status" ] == 200
@@ -562,17 +592,62 @@ async def test_middleware_rejects_lan_host_browser_origin_when_opted_in() -> Non
562592 (b"host" , b"192.168.1.50:8000" ),
563593 (b"origin" , b"http://192.168.1.50:8000" ),
564594 ],
595+ client = ("192.168.1.50" , 51234 ),
596+ )
597+ assert inner_called is False
598+ assert sent [0 ]["status" ] == 403
599+
600+
601+ async def test_middleware_rejects_spoofed_host_from_foreign_peer () -> None :
602+ """The --allow-host fix at the transport boundary: a peer OUTSIDE the
603+ allowed range spoofs an in-range Host header. The peer gate refuses it
604+ even though the Host passes the range check."""
605+ nets = parse_allow_hosts (["192.168.1.0/24" ])
606+ middleware = LocalhostOnlyHTTPMiddleware (app = None , allowed_networks = nets ) # type: ignore[arg-type]
607+ sent , inner_called = await _call_middleware (
608+ middleware ,
609+ headers = [(b"host" , b"192.168.1.50:8000" )],
610+ client = ("10.0.0.5" , 51234 ),
611+ )
612+ assert inner_called is False
613+ assert sent [0 ]["status" ] == 403
614+
615+
616+ async def test_middleware_rejects_when_opted_in_and_peer_unknown () -> None :
617+ """Bound off loopback but the ASGI server didn't populate scope["client"]
618+ — fail closed rather than fall back to the spoofable Host header."""
619+ nets = parse_allow_hosts (["192.168.1.0/24" ])
620+ middleware = LocalhostOnlyHTTPMiddleware (app = None , allowed_networks = nets ) # type: ignore[arg-type]
621+ sent , inner_called = await _call_middleware (
622+ middleware ,
623+ headers = [(b"host" , b"192.168.1.50:8000" )],
624+ client = None ,
565625 )
566626 assert inner_called is False
567627 assert sent [0 ]["status" ] == 403
568628
569629
630+ async def test_middleware_allows_loopback_peer_when_opted_in () -> None :
631+ """A local client (peer 127.0.0.1) on a LAN-bound server still works."""
632+ nets = parse_allow_hosts (["192.168.1.0/24" ])
633+ middleware = LocalhostOnlyHTTPMiddleware (app = None , allowed_networks = nets ) # type: ignore[arg-type]
634+ sent , inner_called = await _call_middleware (
635+ middleware ,
636+ headers = [(b"host" , b"127.0.0.1:8000" )],
637+ client = ("127.0.0.1" , 51234 ),
638+ )
639+ assert inner_called is True
640+ assert sent [0 ]["status" ] == 200
641+
642+
570643async def test_middleware_rejects_lan_host_without_opt_in () -> None :
571- # Default middleware (no networks) keeps rejecting LAN hosts.
644+ # Default middleware (no networks) keeps rejecting LAN hosts. A loopback
645+ # peer is irrelevant — the loopback bind already guarantees it.
572646 middleware = LocalhostOnlyHTTPMiddleware (app = None ) # type: ignore[arg-type]
573647 sent , inner_called = await _call_middleware (
574648 middleware ,
575649 headers = [(b"host" , b"192.168.1.50:8000" )],
650+ client = ("127.0.0.1" , 51234 ),
576651 )
577652 assert inner_called is False
578653 assert sent [0 ]["status" ] == 403
@@ -599,3 +674,56 @@ def test_bind_host_for_networks_prioritizes_ipv4_reachability() -> None:
599674 # and silently drop IPv4 reachability. See bind_host_for_networks docstring.
600675 assert bind_host_for_networks (parse_allow_hosts (["192.168.1.0/24" , "fd00::/8" ])) == "0.0.0.0"
601676 assert bind_host_for_networks (parse_allow_hosts (["fd00::/8" , "10.0.0.0/8" ])) == "0.0.0.0"
677+
678+
679+ # ---------------------------------------------------------------------------
680+ # peer_ip_allowed — the authoritative, unforgeable LAN gate (--allow-host fix)
681+ # ---------------------------------------------------------------------------
682+
683+
684+ def test_peer_ip_allowed_no_networks_is_passthrough () -> None :
685+ # Loopback-only default: the kernel bind guarantees a loopback peer, so the
686+ # gate is a pass-through — even when the peer is unknown — keeping behavior
687+ # byte-for-byte unchanged.
688+ assert peer_ip_allowed (None , None ) is True
689+ assert peer_ip_allowed ("127.0.0.1" , None ) is True
690+ assert peer_ip_allowed ("192.168.1.50" , None ) is True
691+ assert peer_ip_allowed ("10.0.0.5" , []) is True
692+
693+
694+ def test_peer_ip_allowed_in_network () -> None :
695+ nets = parse_allow_hosts (["192.168.1.0/24" ])
696+ assert peer_ip_allowed ("192.168.1.50" , nets ) is True
697+ assert peer_ip_allowed ("192.168.1.1" , nets ) is True
698+
699+
700+ def test_peer_ip_allowed_out_of_network_rejected () -> None :
701+ nets = parse_allow_hosts (["192.168.1.0/24" ])
702+ assert peer_ip_allowed ("192.168.2.50" , nets ) is False
703+ assert peer_ip_allowed ("10.0.0.5" , nets ) is False
704+
705+
706+ def test_peer_ip_allowed_loopback_always_ok_when_opted_in () -> None :
707+ nets = parse_allow_hosts (["192.168.1.0/24" ])
708+ assert peer_ip_allowed ("127.0.0.1" , nets ) is True
709+ assert peer_ip_allowed ("::1" , nets ) is True
710+
711+
712+ def test_peer_ip_allowed_missing_fails_closed_when_opted_in () -> None :
713+ nets = parse_allow_hosts (["192.168.1.0/24" ])
714+ assert peer_ip_allowed (None , nets ) is False
715+ assert peer_ip_allowed ("" , nets ) is False
716+
717+
718+ def test_peer_ip_allowed_unparseable_fails_closed () -> None :
719+ nets = parse_allow_hosts (["192.168.1.0/24" ])
720+ assert peer_ip_allowed ("not-an-ip" , nets ) is False
721+ # A DNS name never reaches here (peer is always an IP literal), but if it
722+ # somehow did, it must not slip through.
723+ assert peer_ip_allowed ("attacker.example.com" , nets ) is False
724+
725+
726+ def test_peer_ip_allowed_strips_ipv6_zone_id () -> None :
727+ nets = parse_allow_hosts (["fd00::/8" ])
728+ assert peer_ip_allowed ("fd00::1%eth0" , nets ) is True
729+ assert peer_ip_allowed ("2001:db8::1%eth0" , nets ) is False
0 commit comments