security: fix #1 — add public-host validator helper
Introduces core/api/_safety.py containing check_public_host and the validate_public_request async request-hook. The hook rejects any URL whose host is (or resolves to) loopback, RFC1918, link-local (including 169.254.169.254 cloud metadata), CGNAT, unique-local v6, or multicast. Called on every request hop so it covers both the initial URL and every redirect target that httpx would otherwise follow blindly. Also exports redact_url / redact_params for finding #3 — the secret-key set lives in the same module since both #1 and #3 work is wired through httpx client event_hooks. Helper is stdlib-only (ipaddress, socket, urllib.parse) plus httpx; no new deps. Not yet wired into any httpx client; per-file wiring commits follow. Audit-Ref: SECURITY_AUDIT.md finding #1 Severity: High
This commit is contained in:
parent
72803f0b14
commit
013fe43f95
150
booru_viewer/core/api/_safety.py
Normal file
150
booru_viewer/core/api/_safety.py
Normal file
@ -0,0 +1,150 @@
|
|||||||
|
"""Network-safety helpers for httpx clients.
|
||||||
|
|
||||||
|
Keeps SSRF guards and secret redaction in one place so every httpx
|
||||||
|
client in the project can share a single implementation. All helpers
|
||||||
|
here are pure stdlib + httpx; no Qt, no project-side imports.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import ipaddress
|
||||||
|
import socket
|
||||||
|
from typing import Any, Mapping
|
||||||
|
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# SSRF guard — finding #1
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
_BLOCKED_V4 = [
|
||||||
|
ipaddress.ip_network("0.0.0.0/8"), # this-network
|
||||||
|
ipaddress.ip_network("10.0.0.0/8"), # RFC1918
|
||||||
|
ipaddress.ip_network("100.64.0.0/10"), # CGNAT
|
||||||
|
ipaddress.ip_network("127.0.0.0/8"), # loopback
|
||||||
|
ipaddress.ip_network("169.254.0.0/16"), # link-local (incl. 169.254.169.254 metadata)
|
||||||
|
ipaddress.ip_network("172.16.0.0/12"), # RFC1918
|
||||||
|
ipaddress.ip_network("192.0.0.0/24"), # IETF protocol assignments
|
||||||
|
ipaddress.ip_network("192.168.0.0/16"), # RFC1918
|
||||||
|
ipaddress.ip_network("198.18.0.0/15"), # benchmark
|
||||||
|
ipaddress.ip_network("224.0.0.0/4"), # multicast
|
||||||
|
ipaddress.ip_network("240.0.0.0/4"), # reserved
|
||||||
|
]
|
||||||
|
|
||||||
|
_BLOCKED_V6 = [
|
||||||
|
ipaddress.ip_network("::1/128"), # loopback
|
||||||
|
ipaddress.ip_network("::/128"), # unspecified
|
||||||
|
ipaddress.ip_network("::ffff:0:0/96"), # IPv4-mapped (covers v4 via v6)
|
||||||
|
ipaddress.ip_network("64:ff9b::/96"), # well-known NAT64
|
||||||
|
ipaddress.ip_network("fc00::/7"), # unique local
|
||||||
|
ipaddress.ip_network("fe80::/10"), # link-local
|
||||||
|
ipaddress.ip_network("ff00::/8"), # multicast
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def _is_blocked_ip(ip: ipaddress._BaseAddress) -> bool:
|
||||||
|
nets = _BLOCKED_V4 if isinstance(ip, ipaddress.IPv4Address) else _BLOCKED_V6
|
||||||
|
return any(ip in net for net in nets)
|
||||||
|
|
||||||
|
|
||||||
|
def check_public_host(host: str) -> None:
|
||||||
|
"""Raise httpx.RequestError if ``host`` is (or resolves to) a non-public IP.
|
||||||
|
|
||||||
|
Blocks loopback, RFC1918, link-local (including the 169.254.169.254
|
||||||
|
cloud-metadata endpoint), unique-local v6, and similar. Used by both
|
||||||
|
the initial request and every redirect hop — see
|
||||||
|
``validate_public_request`` for the async wrapper.
|
||||||
|
"""
|
||||||
|
if not host:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
ip = ipaddress.ip_address(host)
|
||||||
|
except ValueError:
|
||||||
|
ip = None
|
||||||
|
if ip is not None:
|
||||||
|
if _is_blocked_ip(ip):
|
||||||
|
raise httpx.RequestError(f"blocked address: {host}")
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
infos = socket.getaddrinfo(host, None)
|
||||||
|
except socket.gaierror as e:
|
||||||
|
raise httpx.RequestError(f"DNS resolution failed for {host}: {e}")
|
||||||
|
seen: set[str] = set()
|
||||||
|
for info in infos:
|
||||||
|
addr = info[4][0]
|
||||||
|
if addr in seen:
|
||||||
|
continue
|
||||||
|
seen.add(addr)
|
||||||
|
try:
|
||||||
|
resolved = ipaddress.ip_address(addr.split("%", 1)[0])
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
if _is_blocked_ip(resolved):
|
||||||
|
raise httpx.RequestError(
|
||||||
|
f"blocked request target {host} -> {addr}"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def validate_public_request(request: httpx.Request) -> None:
|
||||||
|
"""httpx request event hook — rejects private/metadata targets.
|
||||||
|
|
||||||
|
Fires on every hop including redirects. The initial request to a
|
||||||
|
user-configured booru base_url is also validated; this intentionally
|
||||||
|
blocks users from pointing the app at ``http://localhost/`` or an
|
||||||
|
RFC1918 address (behavior change from v0.2.5).
|
||||||
|
|
||||||
|
Limitation: TOCTOU / DNS rebinding. We resolve the host here, but
|
||||||
|
the kernel will re-resolve when the TCP connection actually opens,
|
||||||
|
and a rebinder that returns a public IP on first query and a
|
||||||
|
private IP on the second can bypass this hook. The project's threat
|
||||||
|
model is a *malicious booru returning a 3xx to a private address* —
|
||||||
|
not an active rebinder controlling the DNS recursor — so this check
|
||||||
|
is the intended defense line. If the threat model ever widens, the
|
||||||
|
follow-up is a custom httpx transport that validates post-connect.
|
||||||
|
"""
|
||||||
|
host = request.url.host
|
||||||
|
if not host:
|
||||||
|
return
|
||||||
|
await asyncio.to_thread(check_public_host, host)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Credential redaction — finding #3
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
# Case-sensitive; matches the literal param names every booru client
|
||||||
|
# uses today (verified via grep across danbooru/e621/gelbooru/moebooru).
|
||||||
|
SECRET_KEYS: frozenset[str] = frozenset({
|
||||||
|
"login",
|
||||||
|
"api_key",
|
||||||
|
"user_id",
|
||||||
|
"password_hash",
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
def redact_url(url: str) -> str:
|
||||||
|
"""Replace secret query params with ``***`` in a URL string.
|
||||||
|
|
||||||
|
Preserves ordering and non-secret params. Empty-query URLs pass
|
||||||
|
through unchanged.
|
||||||
|
"""
|
||||||
|
parts = urlsplit(url)
|
||||||
|
if not parts.query:
|
||||||
|
return url
|
||||||
|
pairs = parse_qsl(parts.query, keep_blank_values=True)
|
||||||
|
redacted = [(k, "***" if k in SECRET_KEYS else v) for k, v in pairs]
|
||||||
|
return urlunsplit((
|
||||||
|
parts.scheme,
|
||||||
|
parts.netloc,
|
||||||
|
parts.path,
|
||||||
|
urlencode(redacted),
|
||||||
|
parts.fragment,
|
||||||
|
))
|
||||||
|
|
||||||
|
|
||||||
|
def redact_params(params: Mapping[str, Any]) -> dict[str, Any]:
|
||||||
|
"""Return a copy of ``params`` with secret keys replaced by ``***``."""
|
||||||
|
return {k: ("***" if k in SECRET_KEYS else v) for k, v in params.items()}
|
||||||
215
tests/core/api/test_safety.py
Normal file
215
tests/core/api/test_safety.py
Normal file
@ -0,0 +1,215 @@
|
|||||||
|
"""Tests for the shared network-safety helpers (SSRF guard + secret redaction)."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import socket
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from booru_viewer.core.api._safety import (
|
||||||
|
SECRET_KEYS,
|
||||||
|
check_public_host,
|
||||||
|
redact_params,
|
||||||
|
redact_url,
|
||||||
|
validate_public_request,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ======================================================================
|
||||||
|
# SSRF guard — finding #1
|
||||||
|
# ======================================================================
|
||||||
|
|
||||||
|
|
||||||
|
def test_public_v4_literal_passes():
|
||||||
|
check_public_host("8.8.8.8")
|
||||||
|
check_public_host("1.1.1.1")
|
||||||
|
|
||||||
|
|
||||||
|
def test_loopback_v4_rejected():
|
||||||
|
with pytest.raises(httpx.RequestError):
|
||||||
|
check_public_host("127.0.0.1")
|
||||||
|
with pytest.raises(httpx.RequestError):
|
||||||
|
check_public_host("127.0.0.53")
|
||||||
|
|
||||||
|
|
||||||
|
def test_cloud_metadata_ip_rejected():
|
||||||
|
"""169.254.169.254 — AWS/GCE/Azure metadata service."""
|
||||||
|
with pytest.raises(httpx.RequestError):
|
||||||
|
check_public_host("169.254.169.254")
|
||||||
|
|
||||||
|
|
||||||
|
def test_rfc1918_rejected():
|
||||||
|
with pytest.raises(httpx.RequestError):
|
||||||
|
check_public_host("10.0.0.1")
|
||||||
|
with pytest.raises(httpx.RequestError):
|
||||||
|
check_public_host("172.16.5.4")
|
||||||
|
with pytest.raises(httpx.RequestError):
|
||||||
|
check_public_host("192.168.1.1")
|
||||||
|
|
||||||
|
|
||||||
|
def test_cgnat_rejected():
|
||||||
|
with pytest.raises(httpx.RequestError):
|
||||||
|
check_public_host("100.64.0.1")
|
||||||
|
|
||||||
|
|
||||||
|
def test_multicast_v4_rejected():
|
||||||
|
with pytest.raises(httpx.RequestError):
|
||||||
|
check_public_host("224.0.0.1")
|
||||||
|
|
||||||
|
|
||||||
|
def test_ipv6_loopback_rejected():
|
||||||
|
with pytest.raises(httpx.RequestError):
|
||||||
|
check_public_host("::1")
|
||||||
|
|
||||||
|
|
||||||
|
def test_ipv6_unique_local_rejected():
|
||||||
|
with pytest.raises(httpx.RequestError):
|
||||||
|
check_public_host("fc00::1")
|
||||||
|
with pytest.raises(httpx.RequestError):
|
||||||
|
check_public_host("fd12:3456:789a::1")
|
||||||
|
|
||||||
|
|
||||||
|
def test_ipv6_link_local_rejected():
|
||||||
|
with pytest.raises(httpx.RequestError):
|
||||||
|
check_public_host("fe80::1")
|
||||||
|
|
||||||
|
|
||||||
|
def test_ipv6_multicast_rejected():
|
||||||
|
with pytest.raises(httpx.RequestError):
|
||||||
|
check_public_host("ff02::1")
|
||||||
|
|
||||||
|
|
||||||
|
def test_public_v6_passes():
|
||||||
|
# Google DNS
|
||||||
|
check_public_host("2001:4860:4860::8888")
|
||||||
|
|
||||||
|
|
||||||
|
def test_hostname_dns_failure_raises():
|
||||||
|
def _gaierror(*a, **kw):
|
||||||
|
raise socket.gaierror(-2, "Name or service not known")
|
||||||
|
with patch("socket.getaddrinfo", _gaierror):
|
||||||
|
with pytest.raises(httpx.RequestError):
|
||||||
|
check_public_host("nonexistent.test.invalid")
|
||||||
|
|
||||||
|
|
||||||
|
def test_hostname_resolving_to_loopback_rejected():
|
||||||
|
def _fake(*a, **kw):
|
||||||
|
return [(socket.AF_INET, 0, 0, "", ("127.0.0.1", 0))]
|
||||||
|
with patch("socket.getaddrinfo", _fake):
|
||||||
|
with pytest.raises(httpx.RequestError, match="blocked request target"):
|
||||||
|
check_public_host("mean.example")
|
||||||
|
|
||||||
|
|
||||||
|
def test_hostname_resolving_to_metadata_rejected():
|
||||||
|
def _fake(*a, **kw):
|
||||||
|
return [(socket.AF_INET, 0, 0, "", ("169.254.169.254", 0))]
|
||||||
|
with patch("socket.getaddrinfo", _fake):
|
||||||
|
with pytest.raises(httpx.RequestError):
|
||||||
|
check_public_host("stolen.example")
|
||||||
|
|
||||||
|
|
||||||
|
def test_hostname_resolving_to_public_passes():
|
||||||
|
def _fake(*a, **kw):
|
||||||
|
return [(socket.AF_INET, 0, 0, "", ("8.8.8.8", 0))]
|
||||||
|
with patch("socket.getaddrinfo", _fake):
|
||||||
|
check_public_host("dns.google")
|
||||||
|
|
||||||
|
|
||||||
|
def test_hostname_with_mixed_results_rejected_on_any_private():
|
||||||
|
"""If any resolved address is private, reject — conservative."""
|
||||||
|
def _fake(*a, **kw):
|
||||||
|
return [
|
||||||
|
(socket.AF_INET, 0, 0, "", ("8.8.8.8", 0)),
|
||||||
|
(socket.AF_INET, 0, 0, "", ("127.0.0.1", 0)),
|
||||||
|
]
|
||||||
|
with patch("socket.getaddrinfo", _fake):
|
||||||
|
with pytest.raises(httpx.RequestError):
|
||||||
|
check_public_host("dualhomed.example")
|
||||||
|
|
||||||
|
|
||||||
|
def test_empty_host_passes():
|
||||||
|
"""Edge case: httpx can call us with a relative URL mid-redirect."""
|
||||||
|
check_public_host("")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_validate_public_request_hook_rejects_metadata():
|
||||||
|
request = httpx.Request("GET", "http://169.254.169.254/latest/meta-data/")
|
||||||
|
with pytest.raises(httpx.RequestError):
|
||||||
|
await validate_public_request(request)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_validate_public_request_hook_allows_public():
|
||||||
|
def _fake(*a, **kw):
|
||||||
|
return [(socket.AF_INET, 0, 0, "", ("8.8.8.8", 0))]
|
||||||
|
with patch("socket.getaddrinfo", _fake):
|
||||||
|
request = httpx.Request("GET", "https://example.test/")
|
||||||
|
await validate_public_request(request) # must not raise
|
||||||
|
|
||||||
|
|
||||||
|
# ======================================================================
|
||||||
|
# Credential redaction — finding #3
|
||||||
|
# ======================================================================
|
||||||
|
|
||||||
|
|
||||||
|
def test_secret_keys_covers_all_booru_client_params():
|
||||||
|
"""Every secret query param used by any booru client must be in SECRET_KEYS."""
|
||||||
|
# Danbooru: login + api_key
|
||||||
|
# e621: login + api_key
|
||||||
|
# Gelbooru: api_key + user_id
|
||||||
|
# Moebooru: login + password_hash
|
||||||
|
for key in ("login", "api_key", "user_id", "password_hash"):
|
||||||
|
assert key in SECRET_KEYS
|
||||||
|
|
||||||
|
|
||||||
|
def test_redact_url_replaces_secrets():
|
||||||
|
redacted = redact_url("https://x.test/posts.json?login=alice&api_key=supersecret&tags=cats")
|
||||||
|
assert "alice" not in redacted
|
||||||
|
assert "supersecret" not in redacted
|
||||||
|
assert "tags=cats" in redacted
|
||||||
|
assert "login=%2A%2A%2A" in redacted
|
||||||
|
assert "api_key=%2A%2A%2A" in redacted
|
||||||
|
|
||||||
|
|
||||||
|
def test_redact_url_leaves_non_secret_params_alone():
|
||||||
|
redacted = redact_url("https://x.test/posts.json?tags=cats&limit=50")
|
||||||
|
assert redacted == "https://x.test/posts.json?tags=cats&limit=50"
|
||||||
|
|
||||||
|
|
||||||
|
def test_redact_url_no_query_passthrough():
|
||||||
|
assert redact_url("https://x.test/") == "https://x.test/"
|
||||||
|
assert redact_url("https://x.test/posts.json") == "https://x.test/posts.json"
|
||||||
|
|
||||||
|
|
||||||
|
def test_redact_url_password_hash_and_user_id():
|
||||||
|
redacted = redact_url(
|
||||||
|
"https://x.test/post.json?login=a&password_hash=b&user_id=42&tags=cats"
|
||||||
|
)
|
||||||
|
assert "password_hash=%2A%2A%2A" in redacted
|
||||||
|
assert "user_id=%2A%2A%2A" in redacted
|
||||||
|
assert "tags=cats" in redacted
|
||||||
|
|
||||||
|
|
||||||
|
def test_redact_url_preserves_fragment_and_path():
|
||||||
|
redacted = redact_url("https://x.test/a/b/c?api_key=secret#frag")
|
||||||
|
assert redacted.startswith("https://x.test/a/b/c?")
|
||||||
|
assert redacted.endswith("#frag")
|
||||||
|
|
||||||
|
|
||||||
|
def test_redact_params_replaces_secrets():
|
||||||
|
out = redact_params({"api_key": "s", "tags": "cats", "login": "alice"})
|
||||||
|
assert out["api_key"] == "***"
|
||||||
|
assert out["login"] == "***"
|
||||||
|
assert out["tags"] == "cats"
|
||||||
|
|
||||||
|
|
||||||
|
def test_redact_params_empty():
|
||||||
|
assert redact_params({}) == {}
|
||||||
|
|
||||||
|
|
||||||
|
def test_redact_params_no_secrets():
|
||||||
|
out = redact_params({"tags": "cats", "limit": 50})
|
||||||
|
assert out == {"tags": "cats", "limit": 50}
|
||||||
Loading…
x
Reference in New Issue
Block a user