Defensive hardening across core/* and popout overlay fix

Sweep of defensive hardening across the core layers, plus a fix for a
related popout overlay regression that surfaced during verification.

Database integrity (core/db.py)
- Wrap delete_site, add_search_history, remove_folder, rename_folder,
  and _migrate in `with self.conn:` so partial commits can't leave
  orphan rows on a crash mid-method.
- add_bookmark re-SELECTs the existing id when INSERT OR IGNORE
  collides on (site_id, post_id). It previously returned Bookmark(id=0)
  silently, which made update_bookmark_cache_path a no-op the next
  time the post was bookmarked.
- get_bookmarks LIKE clauses now ESCAPE '%', '_', '\\' so user search
  literals stop acting as SQL wildcards ('cat_ear' no longer matches
  'catXear').
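The ESCAPE fix can be sketched in miniature (helper name and schema here are illustrative, not the actual core/db.py code):

```python
import sqlite3

def escape_like(term: str) -> str:
    # Backslash first, then the two LIKE wildcards.
    return term.replace("\\", "\\\\").replace("%", r"\%").replace("_", r"\_")

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE favorites (tags TEXT)")
conn.executemany("INSERT INTO favorites VALUES (?)",
                 [("cat_ear",), ("catXear",)])

# Unescaped, '_' matches any single character and both rows come back;
# with ESCAPE, only the literal tag matches.
rows = conn.execute(
    r"SELECT tags FROM favorites WHERE tags LIKE ? ESCAPE '\'",
    (f"%{escape_like('cat_ear')}%",),
).fetchall()
print(rows)  # [('cat_ear',)]
```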

Path traversal (core/db.py + core/config.py)
- Validate folder names at write time via _validate_folder_name —
  rejects '..', os.sep, leading '.' / '~'. Permits Unicode/spaces/
  parens so existing folders keep working.
- saved_folder_dir() resolves the candidate path and refuses anything
  that fails a relative_to check against the saved-images base. Defense
  in depth against folder strings that bypass the write-time validator.
- gui/bookmarks.py and gui/app.py wrap add_folder calls in try/except
  ValueError and surface a QMessageBox.warning instead of crashing.
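The resolve-then-relative_to pattern, reduced to a sketch (directory names and the helper are invented for illustration):

```python
import tempfile
from pathlib import Path

def safe_subdir(base: Path, folder: str) -> Path:
    # Resolve symlinks/'..' first, then require the result to stay under base.
    resolved_base = base.resolve()
    candidate = (resolved_base / folder).resolve()
    candidate.relative_to(resolved_base)  # raises ValueError on escape
    return candidate

base = Path(tempfile.mkdtemp())

ok = safe_subdir(base, "miku(lewd)")  # Unicode/spaces/parens still allowed
assert ok.name == "miku(lewd)"

try:
    safe_subdir(base, "../../etc/cron.d")
    escaped = True
except ValueError:
    escaped = False
assert not escaped  # traversal attempt rejected
```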

Download safety (core/cache.py)
- New _do_download(): payloads >=50MB stream to a tempfile in the
  destination dir and atomically os.replace into place; smaller
  payloads keep the existing buffer-then-write fast path. Both
  enforce a 500MB hard cap against the advertised Content-Length AND
  the running total inside the chunk loop (servers can lie).
- Per-URL asyncio.Lock coalesces concurrent downloads of the same
  URL so two callers don't race write_bytes on the same path.
- Image.MAX_IMAGE_PIXELS = 256M with DecompressionBombError handling
  in both converters.
- _convert_ugoira_to_gif checks frame count + cumulative uncompressed
  size against UGOIRA_MAX_FRAMES / UGOIRA_MAX_UNCOMPRESSED_BYTES from
  ZipInfo headers BEFORE decompressing — defends against zip bombs.
- _convert_animated_to_gif writes a .convfailed sentinel sibling on
  failure to break the re-decode-on-every-paint loop for malformed
  animated PNGs/WebPs.
- _is_valid_media returns True (don't delete) on OSError so a
  transient EBUSY/permissions hiccup no longer triggers a delete +
  re-download loop on every access.
- _referer_for() uses proper hostname suffix matching, not substring
  `in` (imgblahgelbooru.attacker.com no longer maps to gelbooru.com).
- PIL handles wrapped in `with` blocks for deterministic cleanup.

API client retry + visibility (core/api/*)
- base.py: _request retries on httpx.NetworkError + ConnectError in
  addition to TimeoutException. test_connection no longer echoes the
  HTTP response body in the error string (it was an SSRF body-leak
  gadget when used via detect_site_type's redirect-following client).
- detect.py + danbooru.py + e621.py + gelbooru.py + moebooru.py:
  every previously-swallowed exception in search/autocomplete/probe
  paths now logs at WARNING with type, message, and (where relevant)
  the response body prefix. Debugging "the site isn't working" used
  to be a total blackout.
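The retry shape added in base.py, generalized (names here are illustrative; the real code retries httpx.TimeoutException/ConnectError/NetworkError):

```python
import asyncio

class TransientError(Exception):
    """Stands in for httpx's timeout/connect/network errors."""

async def request_with_retry(do_request, attempts: int = 2):
    # One retry on transient failures; the last attempt re-raises so
    # the caller still sees a real error when the network stays down.
    for attempt in range(attempts):
        try:
            return await do_request()
        except TransientError as e:
            if attempt == attempts - 1:
                raise
            print(f"retrying after {type(e).__name__}: {e}")
            await asyncio.sleep(0)

calls = {"n": 0}

async def flaky():
    calls["n"] += 1
    if calls["n"] == 1:
        raise TransientError("connection reset")
    return "ok"

result = asyncio.run(request_with_retry(flaky))
print(result)  # ok
```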

main_gui.py
- file_dialog_platform DB probe failure prints to stderr instead of
  vanishing.

Popout overlay (gui/preview.py + gui/app.py)
- preview.py:79,141 — setAttribute(WA_StyledBackground, True) on
  _slideshow_toolbar and _slideshow_controls. Plain QWidget parents
  silently ignore QSS `background:` declarations without this
  attribute, which is why the popout overlay strip was rendering
  fully transparent (buttons styled, bar behind them showing the
  letterbox color).
- app.py: bake _BASE_POPOUT_OVERLAY_QSS as a fallback prepended
  before the user's custom.qss in the loader. Custom themes that
  don't define overlay rules now still get a translucent black
  bar with white text + hairline borders. Bundled themes win on
  tie because their identical-specificity rules come last in the
  prepended string.
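A minimal sketch of what such a fallback stylesheet could look like (selectors are hypothetical; the real rules live in _BASE_POPOUT_OVERLAY_QSS):

```css
/* Hypothetical object names — the real widgets are _slideshow_toolbar
   and _slideshow_controls in gui/preview.py. A plain QWidget only honors
   the background rule if WA_StyledBackground is set on it. */
QWidget#slideshowToolbar, QWidget#slideshowControls {
    background: rgba(0, 0, 0, 180);            /* translucent black bar */
    color: white;
    border: 1px solid rgba(255, 255, 255, 40); /* hairline border */
}
```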
Author: pax, 2026-04-07 16:54:40 -05:00
parent 7d02aa8588
commit 54ccc40477
13 changed files with 536 additions and 198 deletions

core/api/base.py

@@ -96,7 +96,7 @@ class BooruClient(ABC):
     async def _request(
         self, method: str, url: str, *, params: dict | None = None
     ) -> httpx.Response:
-        """Issue an HTTP request with a single retry on 429/503/timeout."""
+        """Issue an HTTP request with a single retry on 429/503/timeout/network error."""
         for attempt in range(2):
             try:
                 resp = await self.client.request(method, url, params=params)
@@ -114,10 +114,12 @@ class BooruClient(ABC):
                     wait = 2.0
                 log.info(f"Retrying {url} after {resp.status_code} (wait {wait}s)")
                 await asyncio.sleep(wait)
-            except httpx.TimeoutException:
+            except (httpx.TimeoutException, httpx.ConnectError, httpx.NetworkError) as e:
+                # Retry on transient DNS/TCP/timeout failures. Without this,
+                # a single DNS hiccup or RST blows up the whole search.
                 if attempt == 1:
                     raise
-                log.info(f"Retrying {url} after timeout")
+                log.info(f"Retrying {url} after {type(e).__name__}: {e}")
                 await asyncio.sleep(1.0)
         return resp  # unreachable in practice, satisfies type checker
@@ -139,11 +141,18 @@ class BooruClient(ABC):
             return []

     async def test_connection(self) -> tuple[bool, str]:
-        """Test connection. Returns (success, detail_message)."""
+        """Test connection. Returns (success, detail_message).
+
+        Deliberately does NOT echo the response body in the error string:
+        when used from `detect_site_type` (which follows redirects), echoing
+        the body of an arbitrary HTTP response back into UI text becomes a
+        body-leak gadget if the URL ever points anywhere unexpected.
+        """
         try:
             posts = await self.search(limit=1)
             return True, f"OK — got {len(posts)} post(s)"
         except httpx.HTTPStatusError as e:
-            return False, f"HTTP {e.response.status_code}: {e.response.text[:200]}"
+            reason = e.response.reason_phrase or ""
+            return False, f"HTTP {e.response.status_code} {reason}".strip()
         except Exception as e:
             return False, str(e)

core/api/danbooru.py

@@ -31,7 +31,9 @@ class DanbooruClient(BooruClient):
         resp.raise_for_status()
         try:
             data = resp.json()
-        except Exception:
+        except Exception as e:
+            log.warning("Danbooru search JSON parse failed: %s: %s — body: %s",
+                        type(e).__name__, e, resp.text[:200])
             return []
         # Some Danbooru forks wrap in {"posts": [...]}
@@ -97,7 +99,9 @@ class DanbooruClient(BooruClient):
             )
             resp.raise_for_status()
             return [item.get("value", item.get("label", "")) for item in resp.json()]
-        except Exception:
+        except Exception as e:
+            log.warning("Danbooru autocomplete failed for %r: %s: %s",
+                        query, type(e).__name__, e)
             return []

     @staticmethod

core/api/detect.py

@@ -2,6 +2,8 @@
 from __future__ import annotations

+import logging
+
 import httpx

 from ..config import USER_AGENT
@@ -11,6 +13,8 @@ from .moebooru import MoebooruClient
 from .e621 import E621Client
 from .base import BooruClient

+log = logging.getLogger("booru")
+
 async def detect_site_type(
     url: str,
@@ -61,8 +65,9 @@ async def detect_site_type(
             if "e621" in url or "e926" in url:
                 return "e621"
             return "danbooru"
-    except Exception:
-        pass
+    except Exception as e:
+        log.warning("Danbooru/e621 probe failed for %s: %s: %s",
+                    url, type(e).__name__, e)

     # Try Gelbooru — /index.php?page=dapi
     try:
@@ -84,8 +89,9 @@ async def detect_site_type(
         elif resp.status_code in (401, 403):
             if "gelbooru" in url or "safebooru.org" in url or "rule34" in url:
                 return "gelbooru"
-    except Exception:
-        pass
+    except Exception as e:
+        log.warning("Gelbooru probe failed for %s: %s: %s",
+                    url, type(e).__name__, e)

     # Try Moebooru — /post.json (singular)
     try:
@@ -100,8 +106,9 @@ async def detect_site_type(
             return "moebooru"
         elif resp.status_code in (401, 403):
             return "moebooru"
-    except Exception:
-        pass
+    except Exception as e:
+        log.warning("Moebooru probe failed for %s: %s: %s",
+                    url, type(e).__name__, e)

     return None

core/api/e621.py

@@ -51,7 +51,9 @@ class E621Client(BooruClient):
         resp.raise_for_status()
         try:
             data = resp.json()
-        except Exception:
+        except Exception as e:
+            log.warning("e621 search JSON parse failed: %s: %s — body: %s",
+                        type(e).__name__, e, resp.text[:200])
             return []
         # e621 wraps posts in {"posts": [...]}
@@ -123,7 +125,9 @@ class E621Client(BooruClient):
             )
             resp.raise_for_status()
             return [item.get("name", "") for item in resp.json() if item.get("name")]
-        except Exception:
+        except Exception as e:
+            log.warning("e621 autocomplete failed for %r: %s: %s",
+                        query, type(e).__name__, e)
             return []

     @staticmethod

core/api/gelbooru.py

@@ -139,5 +139,7 @@ class GelbooruClient(BooruClient):
             if isinstance(data, dict):
                 data = data.get("tag", [])
             return [t.get("name", "") for t in data if t.get("name")]
-        except Exception:
+        except Exception as e:
+            log.warning("Gelbooru autocomplete failed for %r: %s: %s",
+                        query, type(e).__name__, e)
             return []

core/api/moebooru.py

@@ -25,7 +25,9 @@ class MoebooruClient(BooruClient):
         resp.raise_for_status()
         try:
             data = resp.json()
-        except Exception:
+        except Exception as e:
+            log.warning("Moebooru search JSON parse failed: %s: %s — body: %s",
+                        type(e).__name__, e, resp.text[:200])
             return []
         if isinstance(data, dict):
             data = data.get("posts", data.get("post", []))
@@ -93,5 +95,7 @@ class MoebooruClient(BooruClient):
             )
             resp.raise_for_status()
             return [t["name"] for t in resp.json() if "name" in t]
-        except Exception:
+        except Exception as e:
+            log.warning("Moebooru autocomplete failed for %r: %s: %s",
+                        query, type(e).__name__, e)
             return []

core/cache.py

@@ -4,8 +4,11 @@ from __future__ import annotations

 import asyncio
 import hashlib
+import logging
+import os
+import tempfile
 import zipfile
-from collections import OrderedDict
+from collections import OrderedDict, defaultdict
 from datetime import datetime
 from pathlib import Path
 from urllib.parse import urlparse
@@ -15,6 +18,31 @@ from PIL import Image

 from .config import cache_dir, thumbnails_dir, USER_AGENT

+log = logging.getLogger("booru")
+
+# Hard cap on a single download. Anything advertising larger via
+# Content-Length is rejected before allocating; the running-total guard
+# in the chunk loop catches lying servers. Generous enough for typical
+# booru uploads (long doujinshi/HD video) without leaving the door open
+# to multi-GB OOM/disk-fill from a hostile or misconfigured site.
+MAX_DOWNLOAD_BYTES = 500 * 1024 * 1024  # 500 MB
+
+# Threshold above which we stream to a tempfile + atomic os.replace
+# instead of buffering. Below this, the existing path is fine and the
+# regression risk of the streaming rewrite is zero.
+STREAM_TO_DISK_THRESHOLD = 50 * 1024 * 1024  # 50 MB
+
+# Cap PIL's decompression-bomb guard at 256M pixels (~1 GB raw RGBA).
+# The default only warns above ~89M; we want a hard fail so
+# DecompressionBombError can be caught and treated as a download failure.
+Image.MAX_IMAGE_PIXELS = 256 * 1024 * 1024
+
+# Defends `_convert_ugoira_to_gif` against zip bombs. A real ugoira is
+# typically <500 frames at 1080p; these caps comfortably allow legit
+# content while refusing million-frame archives.
+UGOIRA_MAX_FRAMES = 5000
+UGOIRA_MAX_UNCOMPRESSED_BYTES = 500 * 1024 * 1024  # 500 MB
+
 # Track all outgoing connections: {host: [timestamp, ...]}
 _connection_log: OrderedDict[str, list[str]] = OrderedDict()
@@ -67,20 +95,27 @@ _IMAGE_MAGIC = {

 def _is_valid_media(path: Path) -> bool:
-    """Check if a file looks like actual media, not an HTML error page."""
+    """Check if a file looks like actual media, not an HTML error page.
+
+    On transient IO errors (file locked, EBUSY, permissions hiccup), returns
+    True so the caller does NOT delete the cached file. The previous behavior
+    treated IO errors as "invalid", causing a delete + re-download loop on
+    every access while the underlying issue persisted.
+    """
     try:
         with open(path, "rb") as f:
             header = f.read(16)
-        if not header or header.startswith(b'<') or header.startswith(b'<!'):
-            return False
-        # Check for known magic bytes
-        for magic in _IMAGE_MAGIC:
-            if header.startswith(magic):
-                return True
-        # If not a known type but not HTML, assume it's ok
-        return b'<html' not in header.lower() and b'<!doctype' not in header.lower()
-    except Exception:
+    except OSError as e:
+        log.warning("Cannot read %s for validation (%s); treating as valid", path, e)
+        return True
+    if not header or header.startswith(b'<') or header.startswith(b'<!'):
         return False
+    # Check for known magic bytes
+    for magic in _IMAGE_MAGIC:
+        if header.startswith(magic):
+            return True
+    # If not a known type but not HTML, assume it's ok
+    return b'<html' not in header.lower() and b'<!doctype' not in header.lower()

 def _ext_from_url(url: str) -> str:
@@ -91,48 +126,86 @@ def _ext_from_url(url: str) -> str:

 def _convert_ugoira_to_gif(zip_path: Path) -> Path:
-    """Convert a Pixiv ugoira zip (numbered JPEG/PNG frames) to an animated GIF."""
+    """Convert a Pixiv ugoira zip (numbered JPEG/PNG frames) to an animated GIF.
+
+    Defends against zip bombs by capping frame count and cumulative
+    uncompressed size, both checked from `ZipInfo` headers BEFORE any
+    decompression. Falls back to returning the original zip on any error
+    so the caller still has a usable file.
+    """
     import io
     gif_path = zip_path.with_suffix(".gif")
     if gif_path.exists():
         return gif_path
     _IMG_EXTS = {".jpg", ".jpeg", ".png", ".bmp", ".webp"}
-    with zipfile.ZipFile(zip_path, "r") as zf:
-        names = sorted(n for n in zf.namelist() if Path(n).suffix.lower() in _IMG_EXTS)
-        frames = []
-        for name in names:
-            try:
-                data = zf.read(name)
-                frames.append(Image.open(io.BytesIO(data)).convert("RGBA"))
-            except Exception:
-                continue
-    if not frames:
-        # Can't convert — just return the zip path as-is
-        return zip_path
-    frames[0].save(
-        gif_path, save_all=True, append_images=frames[1:],
-        duration=80, loop=0, disposal=2,
-    )
+    try:
+        with zipfile.ZipFile(zip_path, "r") as zf:
+            infos = [zi for zi in zf.infolist()
+                     if Path(zi.filename).suffix.lower() in _IMG_EXTS]
+            if len(infos) > UGOIRA_MAX_FRAMES:
+                log.warning(
+                    "Ugoira %s has %d frames (cap %d); skipping conversion",
+                    zip_path.name, len(infos), UGOIRA_MAX_FRAMES,
+                )
+                return zip_path
+            total_uncompressed = sum(zi.file_size for zi in infos)
+            if total_uncompressed > UGOIRA_MAX_UNCOMPRESSED_BYTES:
+                log.warning(
+                    "Ugoira %s uncompressed size %d exceeds cap %d; skipping",
+                    zip_path.name, total_uncompressed, UGOIRA_MAX_UNCOMPRESSED_BYTES,
+                )
+                return zip_path
+            infos.sort(key=lambda zi: zi.filename)
+            frames = []
+            for zi in infos:
+                try:
+                    data = zf.read(zi)
+                    with Image.open(io.BytesIO(data)) as im:
+                        frames.append(im.convert("RGBA"))
+                except Exception as e:
+                    log.debug("Skipping ugoira frame %s: %s", zi.filename, e)
+                    continue
+    except (zipfile.BadZipFile, OSError) as e:
+        log.warning("Ugoira zip read failed for %s: %s", zip_path.name, e)
+        return zip_path
+    if not frames:
+        return zip_path
+    try:
+        frames[0].save(
+            gif_path, save_all=True, append_images=frames[1:],
+            duration=80, loop=0, disposal=2,
+        )
+    except Exception as e:
+        log.warning("Ugoira GIF write failed for %s: %s", zip_path.name, e)
+        return zip_path
     if gif_path.exists():
         zip_path.unlink()
     return gif_path

 def _convert_animated_to_gif(source_path: Path) -> Path:
-    """Convert animated PNG or WebP to GIF for Qt playback."""
+    """Convert animated PNG or WebP to GIF for Qt playback.
+
+    Writes a `.convfailed` sentinel sibling on conversion failure so we
+    don't re-attempt on every access; re-decoding a malformed file on
+    every paint used to chew CPU silently.
+    """
     gif_path = source_path.with_suffix(".gif")
     if gif_path.exists():
         return gif_path
+    sentinel = source_path.with_suffix(source_path.suffix + ".convfailed")
+    if sentinel.exists():
+        return source_path
     try:
-        img = Image.open(source_path)
-        if not getattr(img, 'is_animated', False):
-            return source_path  # not animated, keep as-is
-        frames = []
-        durations = []
-        for i in range(img.n_frames):
-            img.seek(i)
-            frames.append(img.convert("RGBA").copy())
-            durations.append(img.info.get("duration", 80))
+        with Image.open(source_path) as img:
+            if not getattr(img, 'is_animated', False):
+                return source_path  # not animated, keep as-is
+            frames = []
+            durations = []
+            for i in range(img.n_frames):
+                img.seek(i)
+                frames.append(img.convert("RGBA").copy())
+                durations.append(img.info.get("duration", 80))
         if not frames:
             return source_path
         frames[0].save(
@@ -142,11 +215,39 @@ def _convert_animated_to_gif(source_path: Path) -> Path:
         if gif_path.exists():
             source_path.unlink()
         return gif_path
-    except Exception:
-        pass
+    except Exception as e:
+        log.warning("Animated->GIF conversion failed for %s: %s", source_path.name, e)
+        try:
+            sentinel.touch()
+        except OSError:
+            pass
     return source_path

+def _referer_for(parsed) -> str:
+    """Build a Referer header value for booru CDNs that gate downloads.
+
+    Uses proper hostname suffix matching instead of substring `in` to avoid
+    `imgblahgelbooru.attacker.com` falsely mapping to `gelbooru.com`.
+    """
+    netloc = parsed.netloc
+    bare = netloc.split(":", 1)[0].lower()  # strip any port
+    referer_host = netloc
+    if bare.endswith(".gelbooru.com") or bare == "gelbooru.com":
+        referer_host = "gelbooru.com"
+    elif bare.endswith(".donmai.us") or bare == "donmai.us":
+        referer_host = "danbooru.donmai.us"
+    return f"{parsed.scheme}://{referer_host}/"
+
+# Per-URL coalescing locks. When two callers race on the same URL (e.g.
+# grid prefetch + an explicit click on the same thumbnail), only one
+# does the actual download; the other waits and reads the cached file.
+# Loop-bound, but the existing module is already loop-bound, so this
+# doesn't make anything worse and is fixed cleanly in PR2.
+_url_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
+
 async def download_image(
     url: str,
     client: httpx.AsyncClient | None = None,
@@ -161,75 +262,45 @@ async def download_image(
     filename = _url_hash(url) + _ext_from_url(url)
     local = dest_dir / filename

-    # Check if a ugoira zip was already converted to gif
-    if local.suffix.lower() == ".zip":
-        gif_path = local.with_suffix(".gif")
-        if gif_path.exists():
-            return gif_path
-        # If the zip is cached but not yet converted, convert it now.
-        # PIL frame iteration is CPU-bound and would block the asyncio
-        # loop for hundreds of ms — run it in a worker thread instead.
-        if local.exists() and zipfile.is_zipfile(local):
-            return await asyncio.to_thread(_convert_ugoira_to_gif, local)
-
-    # Check if animated PNG/WebP was already converted to gif
-    if local.suffix.lower() in (".png", ".webp"):
-        gif_path = local.with_suffix(".gif")
-        if gif_path.exists():
-            return gif_path
-
-    # Validate cached file isn't corrupt (e.g. HTML error page saved as image)
-    if local.exists():
-        if _is_valid_media(local):
-            # Convert animated PNG/WebP on access if not yet converted
-            if local.suffix.lower() in (".png", ".webp"):
-                converted = await asyncio.to_thread(_convert_animated_to_gif, local)
-                if converted != local:
-                    return converted
-            return local
-        else:
-            local.unlink()  # Remove corrupt cache entry
-
-    # Extract referer from URL domain (needed for Gelbooru CDN etc.)
-    parsed = urlparse(url)
-    # Map CDN hostnames back to the main site
-    referer_host = parsed.netloc
-    if referer_host.startswith("img") and "gelbooru" in referer_host:
-        referer_host = "gelbooru.com"
-    elif referer_host.startswith("cdn") and "donmai" in referer_host:
-        referer_host = "danbooru.donmai.us"
-    referer = f"{parsed.scheme}://{referer_host}/"
-
-    log_connection(url)
-
-    req_headers = {"Referer": referer}
-    own_client = client is None
-    if own_client:
-        client = _get_shared_client()
-    try:
-        if progress_callback:
-            async with client.stream("GET", url, headers=req_headers) as resp:
-                resp.raise_for_status()
-                content_type = resp.headers.get("content-type", "")
-                if "text/html" in content_type:
-                    raise ValueError(f"Server returned HTML instead of media (possible captcha/block)")
-                total = int(resp.headers.get("content-length", 0))
-                downloaded = 0
-                chunks = []
-                async for chunk in resp.aiter_bytes(8192):
-                    chunks.append(chunk)
-                    downloaded += len(chunk)
-                    progress_callback(downloaded, total)
-                data = b"".join(chunks)
-                local.write_bytes(data)
-        else:
-            resp = await client.get(url, headers=req_headers)
-            resp.raise_for_status()
-            content_type = resp.headers.get("content-type", "")
-            if "text/html" in content_type:
-                raise ValueError(f"Server returned HTML instead of media (possible captcha/block)")
-            local.write_bytes(resp.content)
+    async with _url_locks[_url_hash(url)]:
+        # Check if a ugoira zip was already converted to gif
+        if local.suffix.lower() == ".zip":
+            gif_path = local.with_suffix(".gif")
+            if gif_path.exists():
+                return gif_path
+            # If the zip is cached but not yet converted, convert it now.
+            # PIL frame iteration is CPU-bound and would block the asyncio
+            # loop for hundreds of ms — run it in a worker thread instead.
+            if local.exists() and zipfile.is_zipfile(local):
+                return await asyncio.to_thread(_convert_ugoira_to_gif, local)
+
+        # Check if animated PNG/WebP was already converted to gif
+        if local.suffix.lower() in (".png", ".webp"):
+            gif_path = local.with_suffix(".gif")
+            if gif_path.exists():
+                return gif_path
+
+        # Validate cached file isn't corrupt (e.g. HTML error page saved as image)
+        if local.exists():
+            if _is_valid_media(local):
+                # Convert animated PNG/WebP on access if not yet converted
+                if local.suffix.lower() in (".png", ".webp"):
+                    converted = await asyncio.to_thread(_convert_animated_to_gif, local)
+                    if converted != local:
+                        return converted
+                return local
+            else:
+                local.unlink()  # Remove corrupt cache entry
+
+        parsed = urlparse(url)
+        referer = _referer_for(parsed)
+        log_connection(url)
+        req_headers = {"Referer": referer}
+
+        if client is None:
+            client = _get_shared_client()
+        await _do_download(client, url, req_headers, local, progress_callback)

         # Verify the downloaded file
         if not _is_valid_media(local):
@@ -243,11 +314,82 @@ async def download_image(
         # Convert animated PNG/WebP to GIF for Qt playback
         elif local.suffix.lower() in (".png", ".webp"):
             local = await asyncio.to_thread(_convert_animated_to_gif, local)
-    finally:
-        pass  # shared client stays open for connection reuse
     return local

+async def _do_download(
+    client: httpx.AsyncClient,
+    url: str,
+    req_headers: dict,
+    local: Path,
+    progress_callback,
+) -> None:
+    """Perform the actual HTTP fetch and write to `local`.
+
+    Splits on size: small/unknown payloads buffer in memory and write in one
+    go; large payloads stream to a tempfile in the same directory and
+    `os.replace` on completion. The split keeps the existing fast path for
+    thumbnails (the vast majority of downloads) while preventing OOM on
+    multi-hundred-MB videos. Both paths enforce `MAX_DOWNLOAD_BYTES` against
+    the advertised Content-Length AND the running total (servers can lie
+    about length).
+    """
+    async with client.stream("GET", url, headers=req_headers) as resp:
+        resp.raise_for_status()
+        content_type = resp.headers.get("content-type", "")
+        if "text/html" in content_type:
+            raise ValueError("Server returned HTML instead of media (possible captcha/block)")
+        try:
+            total = int(resp.headers.get("content-length", 0))
+        except (TypeError, ValueError):
+            total = 0
+        if total > MAX_DOWNLOAD_BYTES:
+            raise ValueError(
+                f"Download too large: {total} bytes (cap {MAX_DOWNLOAD_BYTES})"
+            )
+
+        if total >= STREAM_TO_DISK_THRESHOLD:
+            # Large download: stream to tempfile in the same dir, atomic replace.
+            local.parent.mkdir(parents=True, exist_ok=True)
+            fd, tmp_name = tempfile.mkstemp(
+                prefix=f".{local.name}.", suffix=".part", dir=str(local.parent)
+            )
+            tmp_path = Path(tmp_name)
+            try:
+                downloaded = 0
+                with os.fdopen(fd, "wb") as out:
+                    async for chunk in resp.aiter_bytes(64 * 1024):
+                        out.write(chunk)
+                        downloaded += len(chunk)
+                        if downloaded > MAX_DOWNLOAD_BYTES:
+                            raise ValueError(
+                                f"Download exceeded cap mid-stream: {downloaded} bytes"
+                            )
+                        if progress_callback:
+                            progress_callback(downloaded, total)
+                os.replace(tmp_path, local)
+            except BaseException:
+                try:
+                    tmp_path.unlink(missing_ok=True)
+                except OSError:
+                    pass
+                raise
+        else:
+            # Small/unknown size: buffer in memory, write in one go.
+            chunks: list[bytes] = []
+            downloaded = 0
+            async for chunk in resp.aiter_bytes(8192):
+                chunks.append(chunk)
+                downloaded += len(chunk)
+                if downloaded > MAX_DOWNLOAD_BYTES:
+                    raise ValueError(
+                        f"Download exceeded cap mid-stream: {downloaded} bytes"
+                    )
+                if progress_callback:
+                    progress_callback(downloaded, total)
+            local.write_bytes(b"".join(chunks))
+
 async def download_thumbnail(
     url: str,
     client: httpx.AsyncClient | None = None,

core/config.py

@@ -86,10 +86,22 @@ def saved_dir() -> Path:

 def saved_folder_dir(folder: str) -> Path:
-    """Return a subfolder inside saved images."""
-    path = saved_dir() / folder
-    path.mkdir(parents=True, exist_ok=True)
-    return path
+    """Return a subfolder inside saved images, refusing path traversal.
+
+    Folder names should normally be filtered by `db._validate_folder_name`
+    before reaching the filesystem, but this is a defense-in-depth check:
+    resolve the candidate path and ensure it's still inside `saved_dir()`.
+    Anything that escapes (`..`, absolute paths, symlink shenanigans) raises
+    ValueError instead of silently writing to disk wherever the string points.
+    """
+    base = saved_dir().resolve()
+    candidate = (base / folder).resolve()
+    try:
+        candidate.relative_to(base)
+    except ValueError as e:
+        raise ValueError(f"Folder escapes saved directory: {folder!r}") from e
+    candidate.mkdir(parents=True, exist_ok=True)
+    return candidate

 def db_path() -> Path:

View File

@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
import os
import sqlite3 import sqlite3
import json import json
from dataclasses import dataclass, field from dataclasses import dataclass, field
@ -10,6 +11,35 @@ from pathlib import Path
from .config import db_path from .config import db_path
def _validate_folder_name(name: str) -> str:
"""Reject folder names that could break out of the saved-images dir.
Folder names hit the filesystem in `core.config.saved_folder_dir` (joined
with `saved_dir()` and `mkdir`'d). Without this guard, an attacker — or a
user pasting nonsense could create / delete files anywhere by passing
`..` segments, an absolute path, or an OS-native separator. We refuse
those at write time so the DB never stores a poisoned name in the first
place.
Permits anything else (Unicode, spaces, parentheses, hyphens) so existing
folders like `miku(lewd)` keep working.
"""
if not name:
raise ValueError("Folder name cannot be empty")
if name in (".", ".."):
raise ValueError(f"Invalid folder name: {name!r}")
if "/" in name or "\\" in name or os.sep in name:
raise ValueError(f"Folder name may not contain path separators: {name!r}")
if name.startswith(".") or name.startswith("~"):
raise ValueError(f"Folder name may not start with {name[0]!r}: {name!r}")
# Reject any embedded `..` segment (e.g. `foo..bar` is fine, but `..` alone
# is already caught above; this catches `..` inside slash-rejected paths
# if someone tries to be clever — defensive belt for the suspenders).
if ".." in name.split(os.sep):
raise ValueError(f"Invalid folder name: {name!r}")
return name
_SCHEMA = """ _SCHEMA = """
CREATE TABLE IF NOT EXISTS sites ( CREATE TABLE IF NOT EXISTS sites (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
@ -158,25 +188,27 @@ class Database:
=== core/db.py ===

         return self._conn

     def _migrate(self) -> None:
-        """Add columns that may not exist in older databases."""
-        cur = self._conn.execute("PRAGMA table_info(favorites)")
-        cols = {row[1] for row in cur.fetchall()}
-        if "folder" not in cols:
-            self._conn.execute("ALTER TABLE favorites ADD COLUMN folder TEXT")
-            self._conn.commit()
-        self._conn.execute("CREATE INDEX IF NOT EXISTS idx_favorites_folder ON favorites(folder)")
-        # Add tag_categories to library_meta if missing
-        tables = {r[0] for r in self._conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
-        if "library_meta" in tables:
-            cur = self._conn.execute("PRAGMA table_info(library_meta)")
-            meta_cols = {row[1] for row in cur.fetchall()}
-            if "tag_categories" not in meta_cols:
-                self._conn.execute("ALTER TABLE library_meta ADD COLUMN tag_categories TEXT DEFAULT ''")
-                self._conn.commit()
-        # Add tag_categories to favorites if missing
-        if "tag_categories" not in cols:
-            self._conn.execute("ALTER TABLE favorites ADD COLUMN tag_categories TEXT DEFAULT ''")
-            self._conn.commit()
+        """Add columns that may not exist in older databases.
+
+        All ALTERs are wrapped in a single transaction so a crash partway
+        through can't leave the schema half-migrated.
+        """
+        with self._conn:
+            cur = self._conn.execute("PRAGMA table_info(favorites)")
+            cols = {row[1] for row in cur.fetchall()}
+            if "folder" not in cols:
+                self._conn.execute("ALTER TABLE favorites ADD COLUMN folder TEXT")
+            self._conn.execute("CREATE INDEX IF NOT EXISTS idx_favorites_folder ON favorites(folder)")
+            # Add tag_categories to library_meta if missing
+            tables = {r[0] for r in self._conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
+            if "library_meta" in tables:
+                cur = self._conn.execute("PRAGMA table_info(library_meta)")
+                meta_cols = {row[1] for row in cur.fetchall()}
+                if "tag_categories" not in meta_cols:
+                    self._conn.execute("ALTER TABLE library_meta ADD COLUMN tag_categories TEXT DEFAULT ''")
+            # Add tag_categories to favorites if missing
+            if "tag_categories" not in cols:
+                self._conn.execute("ALTER TABLE favorites ADD COLUMN tag_categories TEXT DEFAULT ''")

     def close(self) -> None:
         if self._conn:
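The `with self._conn:` blocks in this migration rely on sqlite3's connection context manager, which commits the whole block on success and rolls it back on an exception. A minimal standalone sketch (in-memory database, illustrative table name):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE favorites (id INTEGER PRIMARY KEY, folder TEXT)")

# On a clean exit, the context manager commits the whole block atomically.
with conn:
    conn.execute("INSERT INTO favorites (folder) VALUES ('a')")

# On an exception, it rolls the whole block back: no partial writes survive.
try:
    with conn:
        conn.execute("INSERT INTO favorites (folder) VALUES ('b')")
        raise RuntimeError("crash mid-migration")
except RuntimeError:
    pass

rows = [r[0] for r in conn.execute("SELECT folder FROM favorites")]
print(rows)  # → ['a']
```

Note the context manager controls transactions only; it does not close the connection.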
@@ -229,9 +261,9 @@ class Database:
         ]

     def delete_site(self, site_id: int) -> None:
-        self.conn.execute("DELETE FROM favorites WHERE site_id = ?", (site_id,))
-        self.conn.execute("DELETE FROM sites WHERE id = ?", (site_id,))
-        self.conn.commit()
+        with self.conn:
+            self.conn.execute("DELETE FROM favorites WHERE site_id = ?", (site_id,))
+            self.conn.execute("DELETE FROM sites WHERE id = ?", (site_id,))

     def update_site(self, site_id: int, **fields: str | None) -> None:
         allowed = {"name", "url", "api_type", "api_key", "api_user", "enabled"}
@@ -268,15 +300,30 @@ class Database:
     ) -> Bookmark:
         now = datetime.now(timezone.utc).isoformat()
         cats_json = json.dumps(tag_categories) if tag_categories else ""
-        cur = self.conn.execute(
-            "INSERT OR IGNORE INTO favorites "
-            "(site_id, post_id, file_url, preview_url, tags, rating, score, source, cached_path, folder, favorited_at, tag_categories) "
-            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
-            (site_id, post_id, file_url, preview_url, tags, rating, score, source, cached_path, folder, now, cats_json),
-        )
-        self.conn.commit()
+        with self.conn:
+            cur = self.conn.execute(
+                "INSERT OR IGNORE INTO favorites "
+                "(site_id, post_id, file_url, preview_url, tags, rating, score, source, cached_path, folder, favorited_at, tag_categories) "
+                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+                (site_id, post_id, file_url, preview_url, tags, rating, score, source, cached_path, folder, now, cats_json),
+            )
+            if cur.rowcount == 0:
+                # Row already existed (UNIQUE collision on site_id, post_id);
+                # INSERT OR IGNORE leaves lastrowid stale, so re-SELECT the
+                # actual id. Without this, the returned Bookmark.id is bogus
+                # (e.g. 0) and any subsequent update keyed on that id silently
+                # no-ops — see app.py update_bookmark_cache_path callsite.
+                row = self.conn.execute(
+                    "SELECT id, favorited_at FROM favorites WHERE site_id = ? AND post_id = ?",
+                    (site_id, post_id),
+                ).fetchone()
+                bm_id = row["id"]
+                bookmarked_at = row["favorited_at"]
+            else:
+                bm_id = cur.lastrowid
+                bookmarked_at = now
         return Bookmark(
-            id=cur.lastrowid,  # type: ignore[arg-type]
+            id=bm_id,
             site_id=site_id,
             post_id=post_id,
             file_url=file_url,
@@ -287,7 +334,7 @@ class Database:
             source=source,
             cached_path=cached_path,
             folder=folder,
-            bookmarked_at=now,
+            bookmarked_at=bookmarked_at,
         )

     # Back-compat shim
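The `cur.rowcount == 0` branch exists because an `INSERT OR IGNORE` that collides on a UNIQUE constraint inserts nothing and produces no fresh `lastrowid`. A small sketch of the failure mode and the re-SELECT fix (hypothetical minimal schema, not the app's real table definition):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
    "CREATE TABLE favorites ("
    " id INTEGER PRIMARY KEY, site_id INT, post_id INT,"
    " UNIQUE (site_id, post_id))"
)

ins = "INSERT OR IGNORE INTO favorites (site_id, post_id) VALUES (?, ?)"
first = conn.execute(ins, (1, 42))
assert first.rowcount == 1          # actually inserted
real_id = first.lastrowid

dup = conn.execute(ins, (1, 42))    # UNIQUE collision, silently ignored
assert dup.rowcount == 0            # the tell: nothing was inserted

# An ignored insert does not yield a usable lastrowid, so re-SELECT
# the row that already exists to recover the real primary key.
row = conn.execute(
    "SELECT id FROM favorites WHERE site_id = ? AND post_id = ?", (1, 42)
).fetchone()
print(row["id"] == real_id)  # → True
```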
@@ -347,8 +394,16 @@ class Database:
             params.append(folder)
         if search:
             for tag in search.strip().split():
-                q += " AND tags LIKE ?"
-                params.append(f"%{tag}%")
+                # Escape SQL LIKE wildcards in user input. Without ESCAPE,
+                # `_` matches any single char and `%` matches any sequence,
+                # so searching `cat_ear` would also match `catear`/`catxear`.
+                escaped = (
+                    tag.replace("\\", "\\\\")
+                    .replace("%", "\\%")
+                    .replace("_", "\\_")
+                )
+                q += " AND tags LIKE ? ESCAPE '\\'"
+                params.append(f"%{escaped}%")
         q += " ORDER BY favorited_at DESC LIMIT ? OFFSET ?"
         params.extend([limit, offset])
         rows = self.conn.execute(q, params).fetchall()
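The escaping can be exercised standalone. Without `ESCAPE`, `_` in the user's literal acts as a single-character wildcard, so `cat_ear` also matches `catxear`; with the escape applied, only the literal string matches (illustrative one-column table):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE favorites (tags TEXT)")
conn.executemany(
    "INSERT INTO favorites (tags) VALUES (?)",
    [("cat_ear",), ("catear",), ("catxear",)],
)

def matches(tag: str) -> list:
    # Same escaping as the diff: backslash first, then % and _.
    escaped = tag.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    cur = conn.execute(
        "SELECT tags FROM favorites WHERE tags LIKE ? ESCAPE '\\'",
        (f"%{escaped}%",),
    )
    return [r[0] for r in cur.fetchall()]

# Unescaped: '_' is a one-char wildcard, so 'cat_ear' also hits 'catxear'.
raw = conn.execute(
    "SELECT count(*) FROM favorites WHERE tags LIKE ?", ("%cat_ear%",)
).fetchone()[0]
print(raw)                 # → 2
print(matches("cat_ear"))  # → ['cat_ear']
```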
@@ -404,26 +459,28 @@ class Database:
         return [r["name"] for r in rows]

     def add_folder(self, name: str) -> None:
+        clean = _validate_folder_name(name.strip())
         self.conn.execute(
-            "INSERT OR IGNORE INTO favorite_folders (name) VALUES (?)", (name.strip(),)
+            "INSERT OR IGNORE INTO favorite_folders (name) VALUES (?)", (clean,)
         )
         self.conn.commit()

     def remove_folder(self, name: str) -> None:
-        self.conn.execute(
-            "UPDATE favorites SET folder = NULL WHERE folder = ?", (name,)
-        )
-        self.conn.execute("DELETE FROM favorite_folders WHERE name = ?", (name,))
-        self.conn.commit()
+        with self.conn:
+            self.conn.execute(
+                "UPDATE favorites SET folder = NULL WHERE folder = ?", (name,)
+            )
+            self.conn.execute("DELETE FROM favorite_folders WHERE name = ?", (name,))

     def rename_folder(self, old: str, new: str) -> None:
-        self.conn.execute(
-            "UPDATE favorites SET folder = ? WHERE folder = ?", (new.strip(), old)
-        )
-        self.conn.execute(
-            "UPDATE favorite_folders SET name = ? WHERE name = ?", (new.strip(), old)
-        )
-        self.conn.commit()
+        new_name = _validate_folder_name(new.strip())
+        with self.conn:
+            self.conn.execute(
+                "UPDATE favorites SET folder = ? WHERE folder = ?", (new_name, old)
+            )
+            self.conn.execute(
+                "UPDATE favorite_folders SET name = ? WHERE name = ?", (new_name, old)
+            )

     def move_bookmark_to_folder(self, fav_id: int, folder: str | None) -> None:
         self.conn.execute(
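`_validate_folder_name` itself is defined elsewhere in `core/db.py` and is not shown in this diff. The sketch below is a hypothetical reconstruction of the rules as described (reject `..`, `os.sep`, and leading `.` or `~`; permit Unicode, spaces, and parens), not the actual implementation:

```python
import os

def validate_folder_name(name: str) -> str:
    """Hypothetical sketch of a write-time folder-name validator."""
    if not name:
        raise ValueError("folder name is empty")
    # Reject anything that could step out of the saved-images base dir.
    if ".." in name or os.sep in name or (os.altsep and os.altsep in name):
        raise ValueError(f"path separator or '..' in folder name: {name!r}")
    # Reject hidden-file and home-expansion prefixes.
    if name[0] in (".", "~"):
        raise ValueError(f"folder name may not start with '.' or '~': {name!r}")
    return name

# Unicode, spaces, and parens pass so existing folders keep working.
ok = validate_folder_name("Cat Pics (2024)")
for bad in ("../escape", "a" + os.sep + "b", ".hidden", "~root"):
    try:
        validate_folder_name(bad)
    except ValueError:
        pass
    else:
        raise AssertionError(f"{bad!r} should have been rejected")
```

Write-time validation alone is not sufficient, which is why the diff pairs it with the resolve-and-`relative_to` check in `saved_folder_dir()`.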
@@ -538,21 +595,21 @@ class Database:
         if not query.strip():
             return
         now = datetime.now(timezone.utc).isoformat()
-        # Remove duplicate if exists, keep latest
-        self.conn.execute(
-            "DELETE FROM search_history WHERE query = ? AND (site_id = ? OR (site_id IS NULL AND ? IS NULL))",
-            (query.strip(), site_id, site_id),
-        )
-        self.conn.execute(
-            "INSERT INTO search_history (query, site_id, searched_at) VALUES (?, ?, ?)",
-            (query.strip(), site_id, now),
-        )
-        # Keep only last 50
-        self.conn.execute(
-            "DELETE FROM search_history WHERE id NOT IN "
-            "(SELECT id FROM search_history ORDER BY searched_at DESC LIMIT 50)"
-        )
-        self.conn.commit()
+        with self.conn:
+            # Remove duplicate if exists, keep latest
+            self.conn.execute(
+                "DELETE FROM search_history WHERE query = ? AND (site_id = ? OR (site_id IS NULL AND ? IS NULL))",
+                (query.strip(), site_id, site_id),
+            )
+            self.conn.execute(
+                "INSERT INTO search_history (query, site_id, searched_at) VALUES (?, ?, ?)",
+                (query.strip(), site_id, now),
+            )
+            # Keep only last 50
+            self.conn.execute(
+                "DELETE FROM search_history WHERE id NOT IN "
+                "(SELECT id FROM search_history ORDER BY searched_at DESC LIMIT 50)"
+            )

     def get_search_history(self, limit: int = 20) -> list[str]:
         rows = self.conn.execute(

=== gui/app.py ===

@@ -2241,10 +2241,14 @@ class BooruApp(QMainWindow):
         elif action == save_lib_unsorted:
             self._save_to_library(post, None)
         elif action == save_lib_new:
-            from PySide6.QtWidgets import QInputDialog
+            from PySide6.QtWidgets import QInputDialog, QMessageBox
             name, ok = QInputDialog.getText(self, "New Folder", "Folder name:")
             if ok and name.strip():
-                self._db.add_folder(name.strip())
+                try:
+                    self._db.add_folder(name.strip())
+                except ValueError as e:
+                    QMessageBox.warning(self, "Invalid Folder Name", str(e))
+                    return
                 self._save_to_library(post, name.strip())
         elif id(action) in save_lib_folders:
             self._save_to_library(post, save_lib_folders[id(action)])
@@ -2372,10 +2376,14 @@ class BooruApp(QMainWindow):
         elif action == save_unsorted:
             self._bulk_save(indices, posts, None)
         elif action == save_new:
-            from PySide6.QtWidgets import QInputDialog
+            from PySide6.QtWidgets import QInputDialog, QMessageBox
             name, ok = QInputDialog.getText(self, "New Folder", "Folder name:")
             if ok and name.strip():
-                self._db.add_folder(name.strip())
+                try:
+                    self._db.add_folder(name.strip())
+                except ValueError as e:
+                    QMessageBox.warning(self, "Invalid Folder Name", str(e))
+                    return
                 self._bulk_save(indices, posts, name.strip())
         elif id(action) in save_folder_actions:
             self._bulk_save(indices, posts, save_folder_actions[id(action)])
@@ -3009,6 +3017,61 @@ def _apply_windows_dark_mode(app: QApplication) -> None:
         log.warning(f"Operation failed: {e}")

+# Base popout overlay style — always loaded *before* the user QSS so the
+# floating top toolbar (`#_slideshow_toolbar`) and bottom video controls
+# (`#_slideshow_controls`) get a sane translucent-black-with-white-text
+# look on themes that don't define their own overlay rules. Bundled themes
+# in `themes/` redefine the same selectors with their @palette colors and
+# win on tie (last rule of equal specificity wins in QSS), so anyone using
+# a packaged theme keeps the themed overlay; anyone with a stripped-down
+# custom.qss still gets a usable overlay instead of bare letterbox.
+_BASE_POPOUT_OVERLAY_QSS = """
+QWidget#_slideshow_toolbar,
+QWidget#_slideshow_controls {
+    background: rgba(0, 0, 0, 160);
+}
+QWidget#_slideshow_toolbar *,
+QWidget#_slideshow_controls * {
+    background: transparent;
+    color: white;
+    border: none;
+}
+QWidget#_slideshow_toolbar QPushButton,
+QWidget#_slideshow_controls QPushButton {
+    background: transparent;
+    color: white;
+    border: 1px solid rgba(255, 255, 255, 80);
+    padding: 2px 6px;
+}
+QWidget#_slideshow_toolbar QPushButton:hover,
+QWidget#_slideshow_controls QPushButton:hover {
+    background: rgba(255, 255, 255, 30);
+}
+QWidget#_slideshow_toolbar QSlider::groove:horizontal,
+QWidget#_slideshow_controls QSlider::groove:horizontal {
+    background: rgba(255, 255, 255, 40);
+    height: 4px;
+    border: none;
+}
+QWidget#_slideshow_toolbar QSlider::handle:horizontal,
+QWidget#_slideshow_controls QSlider::handle:horizontal {
+    background: white;
+    width: 10px;
+    margin: -4px 0;
+    border: none;
+}
+QWidget#_slideshow_toolbar QSlider::sub-page:horizontal,
+QWidget#_slideshow_controls QSlider::sub-page:horizontal {
+    background: white;
+}
+QWidget#_slideshow_toolbar QLabel,
+QWidget#_slideshow_controls QLabel {
+    background: transparent;
+    color: white;
+}
+"""
+
 def _load_user_qss(path: Path) -> str:
     """Load a QSS file with optional @palette variable substitution.

@@ -3141,7 +3204,10 @@ def run() -> None:
                     super().drawPrimitive(element, option, painter, widget)

         app.setStyle(_DarkArrowStyle("Fusion"))
-        app.setStyleSheet(css_text)
+        # Prepend the base overlay defaults so even minimal custom.qss
+        # files get a usable popout overlay. User rules with the same
+        # selectors come last and win on tie.
+        app.setStyleSheet(_BASE_POPOUT_OVERLAY_QSS + "\n" + css_text)

         # Extract selection color for grid highlight
         pal = app.palette()
@@ -3151,6 +3217,11 @@ def run() -> None:
             app.setPalette(pal)
         except Exception as e:
             log.warning(f"Operation failed: {e}")
+    else:
+        # No custom.qss — still install the popout overlay defaults so the
+        # floating toolbar/controls have a sane background instead of bare
+        # letterbox color.
+        app.setStyleSheet(_BASE_POPOUT_OVERLAY_QSS)

     # Set app icon (works in taskbar on all platforms)
     from PySide6.QtGui import QIcon

=== gui/bookmarks.py ===

@@ -223,7 +223,11 @@ class BookmarksView(QWidget):
     def _new_folder(self) -> None:
         name, ok = QInputDialog.getText(self, "New Folder", "Folder name:")
         if ok and name.strip():
-            self._db.add_folder(name.strip())
+            try:
+                self._db.add_folder(name.strip())
+            except ValueError as e:
+                QMessageBox.warning(self, "Invalid Folder Name", str(e))
+                return
             self._refresh_folders()

     def _on_context_menu(self, index: int, pos) -> None:
@@ -297,7 +301,11 @@ class BookmarksView(QWidget):
         elif action == save_lib_new:
             name, ok = QInputDialog.getText(self, "New Folder", "Folder name:")
             if ok and name.strip():
-                self._db.add_folder(name.strip())
+                try:
+                    self._db.add_folder(name.strip())
+                except ValueError as e:
+                    QMessageBox.warning(self, "Invalid Folder Name", str(e))
+                    return
                 self._copy_to_library(fav, name.strip())
                 self._db.move_bookmark_to_folder(fav.id, name.strip())
                 self.refresh()
@@ -343,7 +351,11 @@ class BookmarksView(QWidget):
         elif action == move_new:
             name, ok = QInputDialog.getText(self, "New Folder", "Folder name:")
             if ok and name.strip():
-                self._db.add_folder(name.strip())
+                try:
+                    self._db.add_folder(name.strip())
+                except ValueError as e:
+                    QMessageBox.warning(self, "Invalid Folder Name", str(e))
+                    return
                 self._db.move_bookmark_to_folder(fav.id, name.strip())
                 self._copy_to_library(fav, name.strip())
                 self.refresh()

=== fullscreen preview module (FullscreenPreview) ===

@@ -77,6 +77,10 @@ class FullscreenPreview(QMainWindow):
         # an explicit unpolish/polish cycle, which we want to avoid.
         self._toolbar = QWidget(central)
         self._toolbar.setObjectName("_slideshow_toolbar")
+        # Plain QWidget ignores QSS `background:` declarations unless this
+        # attribute is set — without it the toolbar paints transparently
+        # and the popout buttons sit on bare letterbox color.
+        self._toolbar.setAttribute(Qt.WidgetAttribute.WA_StyledBackground, True)
         toolbar = QHBoxLayout(self._toolbar)
         toolbar.setContentsMargins(8, 4, 8, 4)

@@ -139,6 +143,9 @@ class FullscreenPreview(QMainWindow):
         # `#_slideshow_controls` selector.
         self._video._controls_bar.setParent(central)
         self._video._controls_bar.setObjectName("_slideshow_controls")
+        # Same fix as the toolbar above — plain QWidget needs this attribute
+        # for the QSS `background: ${overlay_bg}` rule to render.
+        self._video._controls_bar.setAttribute(Qt.WidgetAttribute.WA_StyledBackground, True)
         cb_style = self._video._controls_bar.style()
         cb_style.unpolish(self._video._controls_bar)
         cb_style.polish(self._video._controls_bar)

=== package entry point (main) ===

@@ -25,8 +25,15 @@ def main() -> None:
         if platform == "gtk":
             # Use xdg-desktop-portal which routes to GTK portal (Thunar)
             os.environ.setdefault("QT_QPA_PLATFORMTHEME", "xdgdesktopportal")
-    except Exception:
-        pass
+    except Exception as e:
+        # Surface DB-init failures to stderr — silently swallowing meant
+        # users debugging "why is my file picker the wrong one" had no
+        # signal at all when the DB was missing or corrupt.
+        print(
+            f"booru-viewer: file_dialog_platform DB probe failed: "
+            f"{type(e).__name__}: {e}",
+            file=sys.stderr,
+        )

     from booru_viewer.gui.app import run
     run()
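The diagnostic format used above, prefix plus `type(e).__name__` plus the message, can be reproduced standalone (hypothetical error, printed into a buffer here instead of stderr):

```python
import io

def probe() -> None:
    # Hypothetical stand-in for the DB probe that can fail.
    raise KeyError("sites table missing")

buf = io.StringIO()
try:
    probe()
except Exception as e:
    # Same shape as the entry point: prefix, exception class name, message.
    print(
        f"booru-viewer: file_dialog_platform DB probe failed: "
        f"{type(e).__name__}: {e}",
        file=buf,
    )

line = buf.getvalue().strip()
print(line)
```

Including `type(e).__name__` matters because a bare `{e}` for exceptions like `KeyError` prints only the quoted key, with no hint of what went wrong.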