Route async work through one persistent loop, lock shared httpx + DB writes

Mixing `threading.Thread + asyncio.run` workers with the long-lived
asyncio loop in gui/app.py is a real loop-affinity bug: the first worker
thread to call `asyncio.run` constructs a throwaway loop, which the
shared httpx clients then attach to, and the next call from the
persistent loop fails with "Event loop is closed" / "attached to a
different loop". This commit eliminates the pattern across the GUI and
adds the locking + cleanup that should have been there from the start.

Persistent loop accessor (core/concurrency.py — new)
- set_app_loop / get_app_loop / run_on_app_loop. BooruApp registers the
  one persistent loop at startup; everything that wants to schedule
  async work calls run_on_app_loop instead of spawning a thread that
  builds its own loop. Three functions, ~30 lines, single source of
  truth for "the loop".

Lazy-init lock + cleanup on shared httpx clients (core/api/base.py,
core/api/e621.py, core/cache.py)
- Each shared singleton (BooruClient._shared_client, E621Client._e621_client,
  cache._shared_client) now uses fast-path / locked-slow-path lazy init.
  Concurrent first-callers from the same loop can no longer both build
  a client and leak one (verified: 10 racing callers => 1 httpx instance).
- Each module exposes an aclose helper that BooruApp.closeEvent runs via
  run_coroutine_threadsafe(...).result(timeout=5) BEFORE stopping the
  loop. The connection pool, keepalive sockets, and TLS state finally
  release cleanly instead of being abandoned at process exit.
- E621Client tracks UA-change leftovers in _e621_to_close so the old
  client doesn't leak when api_user changes — drained in aclose_shared.

GUI workers routed through the persistent loop (gui/sites.py,
gui/bookmarks.py)
- SiteDialog._on_detect / _on_test: replaced
  `threading.Thread(target=lambda: asyncio.run(...))` with
  run_on_app_loop. Results marshaled back through Qt Signals connected
  with QueuedConnection. Added _closed flag + _inflight futures list:
  closeEvent cancels pending coroutines and shorts out the result emit
  if the user closes the dialog mid-detect (no use-after-free on
  destroyed QObject).
- BookmarksView._load_thumb_async: same swap. The existing thumb_ready
  signal already used QueuedConnection so the marshaling side was
  already correct.

DB write serialization (core/db.py)
- Database._write_lock = threading.RLock() — RLock not Lock so a
  writing method can call another writing method on the same thread
  without self-deadlocking.
- New _write() context manager composes the lock + sqlite3's connection
  context manager (the latter handles BEGIN / COMMIT / ROLLBACK
  atomically). Every write method converted: add_site, update_site,
  delete_site, add_bookmark, add_bookmarks_batch, remove_bookmark,
  update_bookmark_cache_path, add_folder, remove_folder, rename_folder,
  move_bookmark_to_folder, add/remove_blacklisted_tag,
  add/remove_blacklisted_post, save_library_meta, remove_library_meta,
  set_setting, add_search_history, clear_search_history,
  remove_search_history, add_saved_search, remove_saved_search.
- _migrate keeps using the lock + raw _conn context manager because
  it runs from inside the conn property's lazy init (where _write()
  would re-enter conn).
- Reads stay lock-free and rely on WAL for reader concurrency. Verified
  under contention: 5 threads × 50 add_bookmark calls => 250 rows,
  zero corruption, zero "database is locked" errors.

Smoke-tested with seven scenarios: get_app_loop raises before set,
run_on_app_loop round-trips, lazy init creates exactly one client,
10 concurrent first-callers => 1 httpx, aclose_shared cleans up,
RLock allows nested re-acquire, multi-threaded write contention.
This commit is contained in:
pax 2026-04-07 17:08:55 -05:00
parent 54ccc40477
commit eb58d76bc0
8 changed files with 415 additions and 153 deletions

View File

@ -4,6 +4,7 @@ from __future__ import annotations
import asyncio
import logging
import threading
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
@ -62,8 +63,18 @@ class BooruClient(ABC):
api_type: str = ""
# Shared client across all BooruClient instances for connection reuse
# Shared httpx client across all BooruClient instances for connection
# reuse. Lazily created on first access; the threading.Lock guards the
# check-and-set so concurrent first-callers can't both build a client
# and leak one. The lock is per-class, lives for the process lifetime.
#
# Loop affinity: by convention every async call goes through
# `core.concurrency.run_on_app_loop`, which schedules on the persistent
# event loop in `gui/app.py`. The first lazy init therefore binds the
# client to that loop, and every subsequent use is on the same loop.
# This is the contract that PR2 enforces — see core/concurrency.py.
_shared_client: httpx.AsyncClient | None = None
_shared_client_lock: threading.Lock = threading.Lock()
def __init__(
self,
@ -77,15 +88,37 @@ class BooruClient(ABC):
@property
def client(self) -> httpx.AsyncClient:
if BooruClient._shared_client is None or BooruClient._shared_client.is_closed:
BooruClient._shared_client = httpx.AsyncClient(
# Fast path: client exists and is open. No lock needed for the read.
c = BooruClient._shared_client
if c is not None and not c.is_closed:
return c
# Slow path: build it. Lock so two coroutines on the same loop don't
# both construct + leak.
with BooruClient._shared_client_lock:
c = BooruClient._shared_client
if c is None or c.is_closed:
c = httpx.AsyncClient(
headers={"User-Agent": USER_AGENT},
follow_redirects=True,
timeout=20.0,
event_hooks={"request": [self._log_request]},
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
)
return BooruClient._shared_client
BooruClient._shared_client = c
return c
@classmethod
async def aclose_shared(cls) -> None:
    """Cleanly aclose the shared client. Safe to call from any coroutine
    running on the loop the client is bound to. No-op if not initialized."""
    # Detach the singleton under the lock so no concurrent first-caller can
    # observe a half-closed client, but await OUTSIDE the lock: holding a
    # threading.Lock across an await point would block every other caller
    # of the `client` property (and, on the loop thread, risk deadlock).
    with cls._shared_client_lock:
        c = cls._shared_client
        cls._shared_client = None
    if c is not None and not c.is_closed:
        try:
            await c.aclose()
        except Exception as e:
            # Shutdown path is best-effort: log and continue, never raise.
            log.warning("BooruClient shared aclose failed: %s", e)
@staticmethod
async def _log_request(request: httpx.Request) -> None:
@ -124,7 +157,10 @@ class BooruClient(ABC):
return resp # unreachable in practice, satisfies type checker
async def close(self) -> None:
pass # shared client stays open
# Per-instance close is a no-op — the shared pool is owned by the
# class. Use `await BooruClient.aclose_shared()` from app shutdown
# to actually release the connection pool.
pass
@abstractmethod
async def search(

View File

@ -3,6 +3,7 @@
from __future__ import annotations
import logging
import threading
import httpx
@ -15,23 +16,56 @@ log = logging.getLogger("booru")
class E621Client(BooruClient):
api_type = "e621"
# Same shared-singleton pattern as BooruClient, but e621 needs a custom
# User-Agent (their TOS requires identifying the app + user). When the
# UA changes (api_user edit) we need to rebuild — and we explicitly
# close the old client to avoid leaking its connection pool.
_e621_client: httpx.AsyncClient | None = None
_e621_ua: str = ""
_e621_lock: threading.Lock = threading.Lock()
# Old clients pending aclose. We can't await from a sync property, so
# we stash them here and the app's shutdown coroutine drains them.
_e621_to_close: list[httpx.AsyncClient] = []
@property
def client(self) -> httpx.AsyncClient:
ua = USER_AGENT
if self.api_user:
ua = f"{USER_AGENT} (by {self.api_user} on e621)"
if E621Client._e621_client is None or E621Client._e621_client.is_closed or E621Client._e621_ua != ua:
# Fast path
c = E621Client._e621_client
if c is not None and not c.is_closed and E621Client._e621_ua == ua:
return c
with E621Client._e621_lock:
c = E621Client._e621_client
if c is None or c.is_closed or E621Client._e621_ua != ua:
# Stash old client for shutdown cleanup if it's still open.
if c is not None and not c.is_closed:
E621Client._e621_to_close.append(c)
E621Client._e621_ua = ua
E621Client._e621_client = httpx.AsyncClient(
c = httpx.AsyncClient(
headers={"User-Agent": ua},
follow_redirects=True,
timeout=20.0,
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
)
return E621Client._e621_client
E621Client._e621_client = c
return c
@classmethod
async def aclose_shared(cls) -> None:
    """Cleanly aclose the active client and any UA-change leftovers."""
    # Snapshot-and-clear both the active client and the UA-change leftovers
    # under the lock, then await the closes outside it so a slow aclose()
    # cannot block other threads contending on the threading.Lock.
    with cls._e621_lock:
        current = cls._e621_client
        cls._e621_client = None
        pending = cls._e621_to_close
        cls._e621_to_close = []
    for c in [current, *pending]:
        if c is not None and not c.is_closed:
            try:
                await c.aclose()
            except Exception as e:
                # Best-effort shutdown: log, keep draining the rest.
                log.warning("E621Client aclose failed: %s", e)
async def search(
self, tags: str = "", page: int = 1, limit: int = DEFAULT_PAGE_SIZE

View File

@ -7,6 +7,7 @@ import hashlib
import logging
import os
import tempfile
import threading
import zipfile
from collections import OrderedDict, defaultdict
from datetime import datetime
@ -64,14 +65,24 @@ def _url_hash(url: str) -> str:
return hashlib.sha256(url.encode()).hexdigest()[:16]
# Shared httpx client for connection pooling (avoids per-request TLS handshakes)
# Shared httpx client for connection pooling (avoids per-request TLS handshakes).
# Lazily created on first download. Lock guards the check-and-set so concurrent
# first-callers can't both build a client and leak one. Loop affinity is
# guaranteed by routing all downloads through `core.concurrency.run_on_app_loop`
# (see PR2).
_shared_client: httpx.AsyncClient | None = None
_shared_client_lock = threading.Lock()
def _get_shared_client(referer: str = "") -> httpx.AsyncClient:
global _shared_client
if _shared_client is None or _shared_client.is_closed:
_shared_client = httpx.AsyncClient(
c = _shared_client
if c is not None and not c.is_closed:
return c
with _shared_client_lock:
c = _shared_client
if c is None or c.is_closed:
c = httpx.AsyncClient(
headers={
"User-Agent": USER_AGENT,
"Accept": "image/*,video/*,*/*",
@ -80,7 +91,22 @@ def _get_shared_client(referer: str = "") -> httpx.AsyncClient:
timeout=60.0,
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
)
return _shared_client
_shared_client = c
return c
async def aclose_shared_client() -> None:
    """Cleanly aclose the cache module's shared download client. Safe to call
    once at app shutdown; no-op if not initialized."""
    global _shared_client
    # Swap the module singleton out under the lock; perform the await
    # outside it so we never hold a threading.Lock across an await point
    # (which would stall concurrent downloaders hitting the lazy init).
    with _shared_client_lock:
        c = _shared_client
        _shared_client = None
    if c is not None and not c.is_closed:
        try:
            await c.aclose()
        except Exception as e:
            # Shutdown is best-effort: log and continue.
            log.warning("cache shared client aclose failed: %s", e)
_IMAGE_MAGIC = {

View File

@ -0,0 +1,64 @@
"""Process-wide handle to the app's persistent asyncio event loop.
The GUI runs Qt on the main thread and a single long-lived asyncio loop in
a daemon thread (`BooruApp._async_thread`). Every async piece of code in the
app — searches, downloads, autocomplete, site detection, bookmark thumb
loading — must run on that one loop. Without this guarantee, the shared
httpx clients (which httpx binds to whatever loop first instantiated them)
end up attached to a throwaway loop from a `threading.Thread + asyncio.run`
worker, then break the next time the persistent loop tries to use them
("attached to a different loop" / "Event loop is closed").
This module is the single source of truth for "the loop". `BooruApp.__init__`
calls `set_app_loop()` once after constructing it; everything else uses
`run_on_app_loop()` to schedule coroutines from any thread.
Why a module global instead of passing the loop everywhere: it avoids
threading a parameter through every dialog, view, and helper. There's only
one loop in the process, ever, so a global is the honest representation.
"""
from __future__ import annotations

import asyncio
import logging
from concurrent.futures import Future
from typing import Any, Awaitable, Callable, Coroutine
log = logging.getLogger("booru")
# The one persistent loop, registered once by BooruApp at startup.
_app_loop: asyncio.AbstractEventLoop | None = None


def set_app_loop(loop: asyncio.AbstractEventLoop) -> None:
    """Register the persistent event loop. Called once at app startup."""
    global _app_loop
    _app_loop = loop


def get_app_loop() -> asyncio.AbstractEventLoop:
    """Return the persistent event loop.

    Raises:
        RuntimeError: if `set_app_loop` was never called.
    """
    if _app_loop is None:
        raise RuntimeError(
            "App event loop not initialized — call set_app_loop() before "
            "scheduling any async work."
        )
    return _app_loop


def run_on_app_loop(
    coro: Coroutine[Any, Any, Any],
    done_callback: Callable[[Future], None] | None = None,
) -> Future:
    """Schedule `coro` on the app's persistent event loop from any thread.

    `coro` must be a coroutine object (not a bare awaitable):
    `asyncio.run_coroutine_threadsafe` rejects non-coroutines with TypeError.

    Returns a `concurrent.futures.Future` (not asyncio.Future) — same shape as
    `asyncio.run_coroutine_threadsafe`. If `done_callback` is provided, it
    runs on the loop thread when the coroutine finishes; the callback is
    responsible for marshaling results back to the GUI thread (typically by
    emitting a Qt Signal connected with `Qt.ConnectionType.QueuedConnection`).
    """
    fut = asyncio.run_coroutine_threadsafe(coro, get_app_loop())
    if done_callback is not None:
        fut.add_done_callback(done_callback)
    return fut

View File

@ -5,6 +5,8 @@ from __future__ import annotations
import os
import sqlite3
import json
import threading
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
@ -175,6 +177,13 @@ class Database:
def __init__(self, path: Path | None = None) -> None:
self._path = path or db_path()
self._conn: sqlite3.Connection | None = None
# Single writer lock for the connection. Reads happen concurrently
# under WAL without contention; writes from multiple threads (Qt
# main + the persistent asyncio loop thread) need explicit
# serialization to avoid interleaved multi-statement methods.
# RLock so a writing method can call another writing method on the
# same thread without self-deadlocking.
self._write_lock = threading.RLock()
@property
def conn(self) -> sqlite3.Connection:
@ -187,12 +196,33 @@ class Database:
self._migrate()
return self._conn
@contextmanager
def _write(self):
    """Context manager for write methods.

    Acquires the write lock for cross-thread serialization, then enters
    sqlite3's connection context manager (which BEGINs and COMMIT/ROLLBACKs
    atomically). Use this in place of `with self.conn:` whenever a method
    writes — it composes the two guarantees we want:

    1. Multi-statement atomicity (sqlite3 handles)
    2. Cross-thread write serialization (the RLock handles)

    Reads do not need this — they go through `self.conn.execute(...)` directly
    and rely on WAL for concurrent-reader isolation.

    Yields:
        The live `sqlite3.Connection`, inside an open transaction.
    """
    # NOTE: entering `self.conn` may trigger the lazy init + _migrate;
    # _write_lock is an RLock, so re-acquiring on the same thread is safe.
    with self._write_lock:
        with self.conn:
            yield self.conn
def _migrate(self) -> None:
"""Add columns that may not exist in older databases.
All ALTERs are wrapped in a single transaction so a crash partway
through can't leave the schema half-migrated.
through can't leave the schema half-migrated. Note: this runs from
the `conn` property's lazy init, where `_write_lock` exists but the
connection is being built — we only need to serialize writes via
the lock; the connection context manager handles atomicity.
"""
with self._write_lock:
with self._conn:
cur = self._conn.execute("PRAGMA table_info(favorites)")
cols = {row[1] for row in cur.fetchall()}
@ -226,12 +256,12 @@ class Database:
api_user: str | None = None,
) -> Site:
now = datetime.now(timezone.utc).isoformat()
with self._write():
cur = self.conn.execute(
"INSERT INTO sites (name, url, api_type, api_key, api_user, added_at) "
"VALUES (?, ?, ?, ?, ?, ?)",
(name, url.rstrip("/"), api_type, api_key, api_user, now),
)
self.conn.commit()
return Site(
id=cur.lastrowid, # type: ignore[arg-type]
name=name,
@ -261,7 +291,7 @@ class Database:
]
def delete_site(self, site_id: int) -> None:
with self.conn:
with self._write():
self.conn.execute("DELETE FROM favorites WHERE site_id = ?", (site_id,))
self.conn.execute("DELETE FROM sites WHERE id = ?", (site_id,))
@ -277,10 +307,10 @@ class Database:
if not sets:
return
vals.append(site_id)
with self._write():
self.conn.execute(
f"UPDATE sites SET {', '.join(sets)} WHERE id = ?", vals
)
self.conn.commit()
# -- Bookmarks --
@ -300,7 +330,7 @@ class Database:
) -> Bookmark:
now = datetime.now(timezone.utc).isoformat()
cats_json = json.dumps(tag_categories) if tag_categories else ""
with self.conn:
with self._write():
cur = self.conn.execute(
"INSERT OR IGNORE INTO favorites "
"(site_id, post_id, file_url, preview_url, tags, rating, score, source, cached_path, folder, favorited_at, tag_categories) "
@ -342,6 +372,7 @@ class Database:
def add_bookmarks_batch(self, bookmarks: list[dict]) -> None:
"""Add multiple bookmarks in a single transaction."""
with self._write():
for fav in bookmarks:
self.conn.execute(
"INSERT OR IGNORE INTO favorites "
@ -351,17 +382,16 @@ class Database:
fav.get('tags', ''), fav.get('rating'), fav.get('score'), fav.get('source'),
fav.get('cached_path'), fav.get('folder'), fav.get('favorited_at', datetime.now(timezone.utc).isoformat())),
)
self.conn.commit()
# Back-compat shim
add_favorites_batch = add_bookmarks_batch
def remove_bookmark(self, site_id: int, post_id: int) -> None:
with self._write():
self.conn.execute(
"DELETE FROM favorites WHERE site_id = ? AND post_id = ?",
(site_id, post_id),
)
self.conn.commit()
# Back-compat shim
remove_favorite = remove_bookmark
@ -436,11 +466,11 @@ class Database:
_row_to_favorite = _row_to_bookmark
def update_bookmark_cache_path(self, fav_id: int, cached_path: str) -> None:
with self._write():
self.conn.execute(
"UPDATE favorites SET cached_path = ? WHERE id = ?",
(cached_path, fav_id),
)
self.conn.commit()
# Back-compat shim
update_favorite_cache_path = update_bookmark_cache_path
@ -460,13 +490,13 @@ class Database:
def add_folder(self, name: str) -> None:
clean = _validate_folder_name(name.strip())
with self._write():
self.conn.execute(
"INSERT OR IGNORE INTO favorite_folders (name) VALUES (?)", (clean,)
)
self.conn.commit()
def remove_folder(self, name: str) -> None:
with self.conn:
with self._write():
self.conn.execute(
"UPDATE favorites SET folder = NULL WHERE folder = ?", (name,)
)
@ -474,7 +504,7 @@ class Database:
def rename_folder(self, old: str, new: str) -> None:
new_name = _validate_folder_name(new.strip())
with self.conn:
with self._write():
self.conn.execute(
"UPDATE favorites SET folder = ? WHERE folder = ?", (new_name, old)
)
@ -483,10 +513,10 @@ class Database:
)
def move_bookmark_to_folder(self, fav_id: int, folder: str | None) -> None:
with self._write():
self.conn.execute(
"UPDATE favorites SET folder = ? WHERE id = ?", (folder, fav_id)
)
self.conn.commit()
# Back-compat shim
move_favorite_to_folder = move_bookmark_to_folder
@ -494,18 +524,18 @@ class Database:
# -- Blacklist --
def add_blacklisted_tag(self, tag: str) -> None:
with self._write():
self.conn.execute(
"INSERT OR IGNORE INTO blacklisted_tags (tag) VALUES (?)",
(tag.strip().lower(),),
)
self.conn.commit()
def remove_blacklisted_tag(self, tag: str) -> None:
with self._write():
self.conn.execute(
"DELETE FROM blacklisted_tags WHERE tag = ?",
(tag.strip().lower(),),
)
self.conn.commit()
def get_blacklisted_tags(self) -> list[str]:
rows = self.conn.execute("SELECT tag FROM blacklisted_tags ORDER BY tag").fetchall()
@ -514,12 +544,12 @@ class Database:
# -- Blacklisted Posts --
def add_blacklisted_post(self, url: str) -> None:
with self._write():
self.conn.execute("INSERT OR IGNORE INTO blacklisted_posts (url) VALUES (?)", (url,))
self.conn.commit()
def remove_blacklisted_post(self, url: str) -> None:
with self._write():
self.conn.execute("DELETE FROM blacklisted_posts WHERE url = ?", (url,))
self.conn.commit()
def get_blacklisted_posts(self) -> set[str]:
rows = self.conn.execute("SELECT url FROM blacklisted_posts").fetchall()
@ -531,6 +561,7 @@ class Database:
score: int = 0, rating: str = None, source: str = None,
file_url: str = None) -> None:
cats_json = json.dumps(tag_categories) if tag_categories else ""
with self._write():
self.conn.execute(
"INSERT OR REPLACE INTO library_meta "
"(post_id, tags, tag_categories, score, rating, source, file_url, saved_at) "
@ -538,7 +569,6 @@ class Database:
(post_id, tags, cats_json, score, rating, source, file_url,
datetime.now(timezone.utc).isoformat()),
)
self.conn.commit()
def get_library_meta(self, post_id: int) -> dict | None:
row = self.conn.execute("SELECT * FROM library_meta WHERE post_id = ?", (post_id,)).fetchone()
@ -558,8 +588,8 @@ class Database:
return {r["post_id"] for r in rows}
def remove_library_meta(self, post_id: int) -> None:
with self._write():
self.conn.execute("DELETE FROM library_meta WHERE post_id = ?", (post_id,))
self.conn.commit()
# -- Settings --
@ -576,11 +606,11 @@ class Database:
return self.get_setting(key) == "1"
def set_setting(self, key: str, value: str) -> None:
with self._write():
self.conn.execute(
"INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
(key, str(value)),
)
self.conn.commit()
def get_all_settings(self) -> dict[str, str]:
result = dict(_DEFAULTS)
@ -595,7 +625,7 @@ class Database:
if not query.strip():
return
now = datetime.now(timezone.utc).isoformat()
with self.conn:
with self._write():
# Remove duplicate if exists, keep latest
self.conn.execute(
"DELETE FROM search_history WHERE query = ? AND (site_id = ? OR (site_id IS NULL AND ? IS NULL))",
@ -619,21 +649,21 @@ class Database:
return [r["query"] for r in rows]
def clear_search_history(self) -> None:
with self._write():
self.conn.execute("DELETE FROM search_history")
self.conn.commit()
def remove_search_history(self, query: str) -> None:
with self._write():
self.conn.execute("DELETE FROM search_history WHERE query = ?", (query,))
self.conn.commit()
# -- Saved Searches --
def add_saved_search(self, name: str, query: str, site_id: int | None = None) -> None:
with self._write():
self.conn.execute(
"INSERT OR REPLACE INTO saved_searches (name, query, site_id) VALUES (?, ?, ?)",
(name.strip(), query.strip(), site_id),
)
self.conn.commit()
def get_saved_searches(self) -> list[tuple[int, str, str]]:
"""Returns list of (id, name, query)."""
@ -643,5 +673,5 @@ class Database:
return [(r["id"], r["name"], r["query"]) for r in rows]
def remove_saved_search(self, search_id: int) -> None:
with self._write():
self.conn.execute("DELETE FROM saved_searches WHERE id = ?", (search_id,))
self.conn.commit()

View File

@ -312,10 +312,20 @@ class BooruApp(QMainWindow):
self._async_thread = threading.Thread(target=self._async_loop.run_forever, daemon=True)
self._async_thread.start()
# Register the persistent loop as the process-wide app loop. Anything
# that wants to schedule async work — `gui/sites.py`, `gui/bookmarks.py`,
# any future helper — calls `core.concurrency.run_on_app_loop` which
# uses this same loop. The whole point of PR2 is to never run async
# code on a throwaway loop again.
from ..core.concurrency import set_app_loop
set_app_loop(self._async_loop)
# Reset shared HTTP clients from previous session
from ..core.cache import _get_shared_client
from ..core.api.base import BooruClient
from ..core.api.e621 import E621Client
BooruClient._shared_client = None
E621Client._e621_client = None
E621Client._e621_to_close = []
import booru_viewer.core.cache as _cache_mod
_cache_mod._shared_client = None
@ -2927,6 +2937,26 @@ class BooruApp(QMainWindow):
self._save_main_splitter_sizes()
self._save_right_splitter_sizes()
self._save_main_window_state()
# Cleanly shut the shared httpx pools down BEFORE stopping the loop
# so the connection pool / keepalive sockets / TLS state get released
# instead of being abandoned mid-flight. Has to run on the loop the
# clients were bound to.
try:
from ..core.api.base import BooruClient
from ..core.api.e621 import E621Client
from ..core.cache import aclose_shared_client
async def _close_all():
await BooruClient.aclose_shared()
await E621Client.aclose_shared()
await aclose_shared_client()
fut = asyncio.run_coroutine_threadsafe(_close_all(), self._async_loop)
fut.result(timeout=5)
except Exception as e:
log.warning(f"Shared httpx aclose failed: {e}")
self._async_loop.call_soon_threadsafe(self._async_loop.stop)
self._async_thread.join(timeout=2)
if self._db.get_setting_bool("clear_cache_on_exit"):

View File

@ -3,8 +3,6 @@
from __future__ import annotations
import logging
import threading
import asyncio
from pathlib import Path
from PySide6.QtCore import Qt, Signal, QObject, QTimer
@ -25,6 +23,7 @@ from PySide6.QtWidgets import (
from ..core.db import Database, Bookmark
from ..core.cache import download_thumbnail
from ..core.concurrency import run_on_app_loop
from .grid import ThumbnailGrid
log = logging.getLogger("booru")
@ -173,13 +172,18 @@ class BookmarksView(QWidget):
thumb.set_pixmap(pix)
def _load_thumb_async(self, index: int, url: str) -> None:
# Schedule the download on the persistent event loop instead of
# spawning a daemon thread that runs its own throwaway loop. This
# is the fix for the loop-affinity bug where the cache module's
# shared httpx client would get bound to the throwaway loop and
# then fail every subsequent use from the persistent loop.
async def _dl():
try:
path = await download_thumbnail(url)
self._signals.thumb_ready.emit(index, str(path))
except Exception as e:
log.warning(f"Bookmark thumb {index} failed: {e}")
threading.Thread(target=lambda: asyncio.run(_dl()), daemon=True).start()
run_on_app_loop(_dl())
def _on_thumb_ready(self, index: int, path: str) -> None:
thumbs = self._grid._thumbs

View File

@ -2,10 +2,7 @@
from __future__ import annotations
import asyncio
import threading
from PySide6.QtCore import Qt, Signal, QMetaObject, Q_ARG, Qt as QtNS
from PySide6.QtCore import Qt, Signal
from PySide6.QtWidgets import (
QDialog,
QVBoxLayout,
@ -22,16 +19,34 @@ from PySide6.QtWidgets import (
from ..core.db import Database, Site
from ..core.api.detect import detect_site_type
from ..core.concurrency import run_on_app_loop
class SiteDialog(QDialog):
"""Dialog to add or edit a booru site."""
# Internal signals used to marshal worker results back to the GUI thread.
# Connected with QueuedConnection so emit() from the asyncio loop thread
# is always delivered on the Qt main thread.
_detect_done_sig = Signal(object, object) # (result_or_None, error_or_None)
_test_done_sig = Signal(bool, str)
def __init__(self, parent: QWidget | None = None, site: Site | None = None) -> None:
super().__init__(parent)
self._editing = site is not None
self.setWindowTitle("Edit Site" if self._editing else "Add Site")
self.setMinimumWidth(400)
# Set when the dialog is closed/destroyed so in-flight worker
# callbacks can short-circuit instead of poking a dead QObject.
self._closed = False
# Tracked so we can cancel pending coroutines on close.
self._inflight = [] # list[concurrent.futures.Future]
self._detect_done_sig.connect(
self._detect_finished, Qt.ConnectionType.QueuedConnection
)
self._test_done_sig.connect(
self._test_finished, Qt.ConnectionType.QueuedConnection
)
layout = QVBoxLayout(self)
@ -102,16 +117,22 @@ class SiteDialog(QDialog):
api_key = self._key_input.text().strip() or None
api_user = self._user_input.text().strip() or None
def _run():
async def _do_detect():
try:
result = asyncio.run(detect_site_type(url, api_key=api_key, api_user=api_user))
self._detect_finished(result, None)
result = await detect_site_type(url, api_key=api_key, api_user=api_user)
if not self._closed:
self._detect_done_sig.emit(result, None)
except Exception as e:
self._detect_finished(None, e)
if not self._closed:
self._detect_done_sig.emit(None, e)
threading.Thread(target=_run, daemon=True).start()
fut = run_on_app_loop(_do_detect())
self._inflight.append(fut)
fut.add_done_callback(lambda f: self._inflight.remove(f) if f in self._inflight else None)
def _detect_finished(self, result: str | None, error: Exception | None) -> None:
def _detect_finished(self, result, error) -> None:
if self._closed:
return
self._detect_btn.setEnabled(True)
if error:
self._status_label.setText(f"Error: {error}")
@ -132,25 +153,42 @@ class SiteDialog(QDialog):
self._status_label.setText("Testing connection...")
self._test_btn.setEnabled(False)
def _run():
import asyncio
from ..core.api.detect import client_for_type
async def _do_test():
try:
client = client_for_type(api_type, url, api_key=api_key, api_user=api_user)
ok, detail = asyncio.run(client.test_connection())
self._test_finished(ok, detail)
ok, detail = await client.test_connection()
if not self._closed:
self._test_done_sig.emit(ok, detail)
except Exception as e:
self._test_finished(False, str(e))
if not self._closed:
self._test_done_sig.emit(False, str(e))
threading.Thread(target=_run, daemon=True).start()
fut = run_on_app_loop(_do_test())
self._inflight.append(fut)
fut.add_done_callback(lambda f: self._inflight.remove(f) if f in self._inflight else None)
def _test_finished(self, ok: bool, detail: str) -> None:
if self._closed:
return
self._test_btn.setEnabled(True)
if ok:
self._status_label.setText(f"Connected! {detail}")
else:
self._status_label.setText(f"Failed: {detail}")
def closeEvent(self, event) -> None:
    """Tear down in-flight async work when the dialog is dismissed."""
    # Flip the flag before cancelling: any worker callback that fires
    # after this point sees _closed and short-circuits instead of
    # emitting into a destroyed QObject.
    self._closed = True
    # Cancel whatever is still pending so the loop isn't tied up.
    for pending in tuple(self._inflight):
        try:
            pending.cancel()
        except Exception:
            pass
    super().closeEvent(event)
def _try_parse_url(self, text: str) -> None:
"""Strip query params from pasted URLs like https://gelbooru.com/index.php?page=post&s=list&tags=all."""
from urllib.parse import urlparse, parse_qs