From 013fe43f956185091690ba707fbfce6990b337c2 Mon Sep 17 00:00:00 2001 From: pax Date: Sat, 11 Apr 2026 16:09:53 -0500 Subject: [PATCH] =?UTF-8?q?security:=20fix=20#1=20=E2=80=94=20add=20public?= =?UTF-8?q?-host=20validator=20helper?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces core/api/_safety.py containing check_public_host and the validate_public_request async request-hook. The hook rejects any URL whose host is (or resolves to) loopback, RFC1918, link-local (including 169.254.169.254 cloud metadata), CGNAT, unique-local v6, or multicast. Called on every request hop so it covers both the initial URL and every redirect target that httpx would otherwise follow blindly. Also exports redact_url / redact_params for finding #3 — the secret-key set lives in the same module since both #1 and #3 work is wired through httpx client event_hooks. Helper is stdlib-only (ipaddress, socket, urllib.parse) plus httpx; no new deps. Not yet wired into any httpx client; per-file wiring commits follow. Audit-Ref: SECURITY_AUDIT.md finding #1 Severity: High --- booru_viewer/core/api/_safety.py | 150 +++++++++++++++++++++ tests/core/api/test_safety.py | 215 +++++++++++++++++++++++++++++++ 2 files changed, 365 insertions(+) create mode 100644 booru_viewer/core/api/_safety.py create mode 100644 tests/core/api/test_safety.py diff --git a/booru_viewer/core/api/_safety.py b/booru_viewer/core/api/_safety.py new file mode 100644 index 0000000..ee0dac8 --- /dev/null +++ b/booru_viewer/core/api/_safety.py @@ -0,0 +1,150 @@ +"""Network-safety helpers for httpx clients. + +Keeps SSRF guards and secret redaction in one place so every httpx +client in the project can share a single implementation. All helpers +here are pure stdlib + httpx; no Qt, no project-side imports. +""" + +from __future__ import annotations + +import asyncio +import ipaddress +import socket +from typing import Any, Mapping +from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit + +import httpx + + +# --------------------------------------------------------------------------- +# SSRF guard — finding #1 +# --------------------------------------------------------------------------- + +_BLOCKED_V4 = [ + ipaddress.ip_network("0.0.0.0/8"), # this-network + ipaddress.ip_network("10.0.0.0/8"), # RFC1918 + ipaddress.ip_network("100.64.0.0/10"), # CGNAT + ipaddress.ip_network("127.0.0.0/8"), # loopback + ipaddress.ip_network("169.254.0.0/16"), # link-local (incl. 169.254.169.254 metadata) + ipaddress.ip_network("172.16.0.0/12"), # RFC1918 + ipaddress.ip_network("192.0.0.0/24"), # IETF protocol assignments + ipaddress.ip_network("192.168.0.0/16"), # RFC1918 + ipaddress.ip_network("198.18.0.0/15"), # benchmark + ipaddress.ip_network("224.0.0.0/4"), # multicast + ipaddress.ip_network("240.0.0.0/4"), # reserved +] + +_BLOCKED_V6 = [ + ipaddress.ip_network("::1/128"), # loopback + ipaddress.ip_network("::/128"), # unspecified + ipaddress.ip_network("::ffff:0:0/96"), # IPv4-mapped (covers v4 via v6) + ipaddress.ip_network("64:ff9b::/96"), # well-known NAT64 + ipaddress.ip_network("fc00::/7"), # unique local + ipaddress.ip_network("fe80::/10"), # link-local + ipaddress.ip_network("ff00::/8"), # multicast +] + + +def _is_blocked_ip(ip: ipaddress._BaseAddress) -> bool: + nets = _BLOCKED_V4 if isinstance(ip, ipaddress.IPv4Address) else _BLOCKED_V6 + return any(ip in net for net in nets) + + +def check_public_host(host: str) -> None: + """Raise httpx.RequestError if ``host`` is (or resolves to) a non-public IP. + + Blocks loopback, RFC1918, link-local (including the 169.254.169.254 + cloud-metadata endpoint), unique-local v6, and similar. Used by both + the initial request and every redirect hop — see + ``validate_public_request`` for the async wrapper. + """ + if not host: + return + try: + ip = ipaddress.ip_address(host) + except ValueError: + ip = None + if ip is not None: + if _is_blocked_ip(ip): + raise httpx.RequestError(f"blocked address: {host}") + return + try: + infos = socket.getaddrinfo(host, None) + except socket.gaierror as e: + raise httpx.RequestError(f"DNS resolution failed for {host}: {e}") + seen: set[str] = set() + for info in infos: + addr = info[4][0] + if addr in seen: + continue + seen.add(addr) + try: + resolved = ipaddress.ip_address(addr.split("%", 1)[0]) + except ValueError: + continue + if _is_blocked_ip(resolved): + raise httpx.RequestError( + f"blocked request target {host} -> {addr}" + ) + + +async def validate_public_request(request: httpx.Request) -> None: + """httpx request event hook — rejects private/metadata targets. + + Fires on every hop including redirects. The initial request to a + user-configured booru base_url is also validated; this intentionally + blocks users from pointing the app at ``http://localhost/`` or an + RFC1918 address (behavior change from v0.2.5). + + Limitation: TOCTOU / DNS rebinding. We resolve the host here, but + the kernel will re-resolve when the TCP connection actually opens, + and a rebinder that returns a public IP on first query and a + private IP on the second can bypass this hook. The project's threat + model is a *malicious booru returning a 3xx to a private address* — + not an active rebinder controlling the DNS recursor — so this check + is the intended defense line. If the threat model ever widens, the + follow-up is a custom httpx transport that validates post-connect. + """ + host = request.url.host + if not host: + return + await asyncio.to_thread(check_public_host, host) + + +# --------------------------------------------------------------------------- +# Credential redaction — finding #3 +# --------------------------------------------------------------------------- + +# Case-sensitive; matches the literal param names every booru client +# uses today (verified via grep across danbooru/e621/gelbooru/moebooru). +SECRET_KEYS: frozenset[str] = frozenset({ + "login", + "api_key", + "user_id", + "password_hash", +}) + + +def redact_url(url: str) -> str: + """Replace secret query params with ``***`` in a URL string. + + Preserves ordering and non-secret params. Empty-query URLs pass + through unchanged. + """ + parts = urlsplit(url) + if not parts.query: + return url + pairs = parse_qsl(parts.query, keep_blank_values=True) + redacted = [(k, "***" if k in SECRET_KEYS else v) for k, v in pairs] + return urlunsplit(( + parts.scheme, + parts.netloc, + parts.path, + urlencode(redacted), + parts.fragment, + )) + + +def redact_params(params: Mapping[str, Any]) -> dict[str, Any]: + """Return a copy of ``params`` with secret keys replaced by ``***``.""" + return {k: ("***" if k in SECRET_KEYS else v) for k, v in params.items()} diff --git a/tests/core/api/test_safety.py b/tests/core/api/test_safety.py new file mode 100644 index 0000000..81ba9a0 --- /dev/null +++ b/tests/core/api/test_safety.py @@ -0,0 +1,215 @@ +"""Tests for the shared network-safety helpers (SSRF guard + secret redaction).""" + +from __future__ import annotations + +import socket +from unittest.mock import patch + +import httpx +import pytest + +from booru_viewer.core.api._safety import ( + SECRET_KEYS, + check_public_host, + redact_params, + redact_url, + validate_public_request, +) + + +# ====================================================================== +# SSRF guard — finding #1 +# ====================================================================== + + +def test_public_v4_literal_passes(): + check_public_host("8.8.8.8") + check_public_host("1.1.1.1") + + +def test_loopback_v4_rejected(): + with pytest.raises(httpx.RequestError): + check_public_host("127.0.0.1") + with pytest.raises(httpx.RequestError): + check_public_host("127.0.0.53") + + +def test_cloud_metadata_ip_rejected(): + """169.254.169.254 — AWS/GCE/Azure metadata service.""" + with pytest.raises(httpx.RequestError): + check_public_host("169.254.169.254") + + +def test_rfc1918_rejected(): + with pytest.raises(httpx.RequestError): + check_public_host("10.0.0.1") + with pytest.raises(httpx.RequestError): + check_public_host("172.16.5.4") + with pytest.raises(httpx.RequestError): + check_public_host("192.168.1.1") + + +def test_cgnat_rejected(): + with pytest.raises(httpx.RequestError): + check_public_host("100.64.0.1") + + +def test_multicast_v4_rejected(): + with pytest.raises(httpx.RequestError): + check_public_host("224.0.0.1") + + +def test_ipv6_loopback_rejected(): + with pytest.raises(httpx.RequestError): + check_public_host("::1") + + +def test_ipv6_unique_local_rejected(): + with pytest.raises(httpx.RequestError): + check_public_host("fc00::1") + with pytest.raises(httpx.RequestError): + check_public_host("fd12:3456:789a::1") + + +def test_ipv6_link_local_rejected(): + with pytest.raises(httpx.RequestError): + check_public_host("fe80::1") + + +def test_ipv6_multicast_rejected(): + with pytest.raises(httpx.RequestError): + check_public_host("ff02::1") + + +def test_public_v6_passes(): + # Google DNS + check_public_host("2001:4860:4860::8888") + + +def test_hostname_dns_failure_raises(): + def _gaierror(*a, **kw): + raise socket.gaierror(-2, "Name or service not known") + with patch("socket.getaddrinfo", _gaierror): + with pytest.raises(httpx.RequestError): + check_public_host("nonexistent.test.invalid") + + +def test_hostname_resolving_to_loopback_rejected(): + def _fake(*a, **kw): + return [(socket.AF_INET, 0, 0, "", ("127.0.0.1", 0))] + with patch("socket.getaddrinfo", _fake): + with pytest.raises(httpx.RequestError, match="blocked request target"): + check_public_host("mean.example") + + +def test_hostname_resolving_to_metadata_rejected(): + def _fake(*a, **kw): + return [(socket.AF_INET, 0, 0, "", ("169.254.169.254", 0))] + with patch("socket.getaddrinfo", _fake): + with pytest.raises(httpx.RequestError): + check_public_host("stolen.example") + + +def test_hostname_resolving_to_public_passes(): + def _fake(*a, **kw): + return [(socket.AF_INET, 0, 0, "", ("8.8.8.8", 0))] + with patch("socket.getaddrinfo", _fake): + check_public_host("dns.google") + + +def test_hostname_with_mixed_results_rejected_on_any_private(): + """If any resolved address is private, reject — conservative.""" + def _fake(*a, **kw): + return [ + (socket.AF_INET, 0, 0, "", ("8.8.8.8", 0)), + (socket.AF_INET, 0, 0, "", ("127.0.0.1", 0)), + ] + with patch("socket.getaddrinfo", _fake): + with pytest.raises(httpx.RequestError): + check_public_host("dualhomed.example") + + +def test_empty_host_passes(): + """Edge case: httpx can call us with a relative URL mid-redirect.""" + check_public_host("") + + +@pytest.mark.asyncio +async def test_validate_public_request_hook_rejects_metadata(): + request = httpx.Request("GET", "http://169.254.169.254/latest/meta-data/") + with pytest.raises(httpx.RequestError): + await validate_public_request(request) + + +@pytest.mark.asyncio +async def test_validate_public_request_hook_allows_public(): + def _fake(*a, **kw): + return [(socket.AF_INET, 0, 0, "", ("8.8.8.8", 0))] + with patch("socket.getaddrinfo", _fake): + request = httpx.Request("GET", "https://example.test/") + await validate_public_request(request) # must not raise + + +# ====================================================================== +# Credential redaction — finding #3 +# ====================================================================== + + +def test_secret_keys_covers_all_booru_client_params(): + """Every secret query param used by any booru client must be in SECRET_KEYS.""" + # Danbooru: login + api_key + # e621: login + api_key + # Gelbooru: api_key + user_id + # Moebooru: login + password_hash + for key in ("login", "api_key", "user_id", "password_hash"): + assert key in SECRET_KEYS + + +def test_redact_url_replaces_secrets(): + redacted = redact_url("https://x.test/posts.json?login=alice&api_key=supersecret&tags=cats") + assert "alice" not in redacted + assert "supersecret" not in redacted + assert "tags=cats" in redacted + assert "login=%2A%2A%2A" in redacted + assert "api_key=%2A%2A%2A" in redacted + + +def test_redact_url_leaves_non_secret_params_alone(): + redacted = redact_url("https://x.test/posts.json?tags=cats&limit=50") + assert redacted == "https://x.test/posts.json?tags=cats&limit=50" + + +def test_redact_url_no_query_passthrough(): + assert redact_url("https://x.test/") == "https://x.test/" + assert redact_url("https://x.test/posts.json") == "https://x.test/posts.json" + + +def test_redact_url_password_hash_and_user_id(): + redacted = redact_url( + "https://x.test/post.json?login=a&password_hash=b&user_id=42&tags=cats" + ) + assert "password_hash=%2A%2A%2A" in redacted + assert "user_id=%2A%2A%2A" in redacted + assert "tags=cats" in redacted + + +def test_redact_url_preserves_fragment_and_path(): + redacted = redact_url("https://x.test/a/b/c?api_key=secret#frag") + assert redacted.startswith("https://x.test/a/b/c?") + assert redacted.endswith("#frag") + + +def test_redact_params_replaces_secrets(): + out = redact_params({"api_key": "s", "tags": "cats", "login": "alice"}) + assert out["api_key"] == "***" + assert out["login"] == "***" + assert out["tags"] == "cats" + + +def test_redact_params_empty(): + assert redact_params({}) == {} + + +def test_redact_params_no_secrets(): + out = redact_params({"tags": "cats", "limit": 50}) + assert out == {"tags": "cats", "limit": 50}