Route async work through one persistent loop, lock shared httpx + DB writes

Mixing `threading.Thread + asyncio.run` workers with the long-lived
asyncio loop in gui/app.py is a real loop-affinity bug: the first worker
thread to call `asyncio.run` constructs a throwaway loop, which the
shared httpx clients then attach to, and the next call from the
persistent loop fails with "Event loop is closed" / "attached to a
different loop". This commit eliminates the pattern across the GUI and
adds the locking + cleanup that should have been there from the start.

Persistent loop accessor (core/concurrency.py — new)
- set_app_loop / get_app_loop / run_on_app_loop. BooruApp registers the
  one persistent loop at startup; everything that wants to schedule
  async work calls run_on_app_loop instead of spawning a thread that
  builds its own loop. Three functions, ~30 lines, single source of
  truth for "the loop".

Lazy-init lock + cleanup on shared httpx clients (core/api/base.py,
core/api/e621.py, core/cache.py)
- Each shared singleton (BooruClient._shared_client, E621Client._e621_client,
  cache._shared_client) now uses fast-path / locked-slow-path lazy init.
  Concurrent first-callers from the same loop can no longer both build
  a client and leak one (verified: 10 racing callers => 1 httpx instance).
- Each module exposes an aclose helper that BooruApp.closeEvent runs via
  run_coroutine_threadsafe(...).result(timeout=5) BEFORE stopping the
  loop. The connection pool, keepalive sockets, and TLS state finally
  release cleanly instead of being abandoned at process exit.
- E621Client tracks UA-change leftovers in _e621_to_close so the old
  client doesn't leak when api_user changes — drained in aclose_shared.

GUI workers routed through the persistent loop (gui/sites.py,
gui/bookmarks.py)
- SiteDialog._on_detect / _on_test: replaced
  `threading.Thread(target=lambda: asyncio.run(...))` with
  run_on_app_loop. Results marshaled back through Qt Signals connected
  with QueuedConnection. Added _closed flag + _inflight futures list:
  closeEvent cancels pending coroutines and shorts out the result emit
  if the user closes the dialog mid-detect (no use-after-free on
  destroyed QObject).
- BookmarksView._load_thumb_async: same swap. The existing thumb_ready
  signal already used QueuedConnection so the marshaling side was
  already correct.

DB write serialization (core/db.py)
- Database._write_lock = threading.RLock() — RLock not Lock so a
  writing method can call another writing method on the same thread
  without self-deadlocking.
- New _write() context manager composes the lock + sqlite3's connection
  context manager (the latter handles BEGIN / COMMIT / ROLLBACK
  atomically). Every write method converted: add_site, update_site,
  delete_site, add_bookmark, add_bookmarks_batch, remove_bookmark,
  update_bookmark_cache_path, add_folder, remove_folder, rename_folder,
  move_bookmark_to_folder, add/remove_blacklisted_tag,
  add/remove_blacklisted_post, save_library_meta, remove_library_meta,
  set_setting, add_search_history, clear_search_history,
  remove_search_history, add_saved_search, remove_saved_search.
- _migrate keeps using the lock + raw _conn context manager because
  it runs from inside the conn property's lazy init (where _write()
  would re-enter conn).
- Reads stay lock-free and rely on WAL for reader concurrency. Verified
  under contention: 5 threads × 50 add_bookmark calls => 250 rows,
  zero corruption, zero "database is locked" errors.

Smoke-tested with seven scenarios: get_app_loop raises before set,
run_on_app_loop round-trips, lazy init creates exactly one client,
10 concurrent first-callers => 1 httpx, aclose_shared cleans up,
RLock allows nested re-acquire, multi-threaded write contention.
This commit is contained in:
pax 2026-04-07 17:08:55 -05:00
parent 54ccc40477
commit eb58d76bc0
8 changed files with 415 additions and 153 deletions

View File

@ -4,6 +4,7 @@ from __future__ import annotations
import asyncio import asyncio
import logging import logging
import threading
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from dataclasses import dataclass, field from dataclasses import dataclass, field
@ -62,8 +63,18 @@ class BooruClient(ABC):
api_type: str = "" api_type: str = ""
# Shared client across all BooruClient instances for connection reuse # Shared httpx client across all BooruClient instances for connection
# reuse. Lazily created on first access; the threading.Lock guards the
# check-and-set so concurrent first-callers can't both build a client
# and leak one. The lock is per-class, lives for the process lifetime.
#
# Loop affinity: by convention every async call goes through
# `core.concurrency.run_on_app_loop`, which schedules on the persistent
# event loop in `gui/app.py`. The first lazy init therefore binds the
# client to that loop, and every subsequent use is on the same loop.
# This is the contract that PR2 enforces — see core/concurrency.py.
_shared_client: httpx.AsyncClient | None = None _shared_client: httpx.AsyncClient | None = None
_shared_client_lock: threading.Lock = threading.Lock()
def __init__( def __init__(
self, self,
@ -77,15 +88,37 @@ class BooruClient(ABC):
@property @property
def client(self) -> httpx.AsyncClient: def client(self) -> httpx.AsyncClient:
if BooruClient._shared_client is None or BooruClient._shared_client.is_closed: # Fast path: client exists and is open. No lock needed for the read.
BooruClient._shared_client = httpx.AsyncClient( c = BooruClient._shared_client
headers={"User-Agent": USER_AGENT}, if c is not None and not c.is_closed:
follow_redirects=True, return c
timeout=20.0, # Slow path: build it. Lock so two coroutines on the same loop don't
event_hooks={"request": [self._log_request]}, # both construct + leak.
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5), with BooruClient._shared_client_lock:
) c = BooruClient._shared_client
return BooruClient._shared_client if c is None or c.is_closed:
c = httpx.AsyncClient(
headers={"User-Agent": USER_AGENT},
follow_redirects=True,
timeout=20.0,
event_hooks={"request": [self._log_request]},
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
)
BooruClient._shared_client = c
return c
@classmethod
async def aclose_shared(cls) -> None:
"""Cleanly aclose the shared client. Safe to call from any coroutine
running on the loop the client is bound to. No-op if not initialized."""
with cls._shared_client_lock:
c = cls._shared_client
cls._shared_client = None
if c is not None and not c.is_closed:
try:
await c.aclose()
except Exception as e:
log.warning("BooruClient shared aclose failed: %s", e)
@staticmethod @staticmethod
async def _log_request(request: httpx.Request) -> None: async def _log_request(request: httpx.Request) -> None:
@ -124,7 +157,10 @@ class BooruClient(ABC):
return resp # unreachable in practice, satisfies type checker return resp # unreachable in practice, satisfies type checker
async def close(self) -> None: async def close(self) -> None:
pass # shared client stays open # Per-instance close is a no-op — the shared pool is owned by the
# class. Use `await BooruClient.aclose_shared()` from app shutdown
# to actually release the connection pool.
pass
@abstractmethod @abstractmethod
async def search( async def search(

View File

@ -3,6 +3,7 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
import threading
import httpx import httpx
@ -15,23 +16,56 @@ log = logging.getLogger("booru")
class E621Client(BooruClient): class E621Client(BooruClient):
api_type = "e621" api_type = "e621"
# Same shared-singleton pattern as BooruClient, but e621 needs a custom
# User-Agent (their TOS requires identifying the app + user). When the
# UA changes (api_user edit) we need to rebuild — and we explicitly
# close the old client to avoid leaking its connection pool.
_e621_client: httpx.AsyncClient | None = None _e621_client: httpx.AsyncClient | None = None
_e621_ua: str = "" _e621_ua: str = ""
_e621_lock: threading.Lock = threading.Lock()
# Old clients pending aclose. We can't await from a sync property, so
# we stash them here and the app's shutdown coroutine drains them.
_e621_to_close: list[httpx.AsyncClient] = []
@property @property
def client(self) -> httpx.AsyncClient: def client(self) -> httpx.AsyncClient:
ua = USER_AGENT ua = USER_AGENT
if self.api_user: if self.api_user:
ua = f"{USER_AGENT} (by {self.api_user} on e621)" ua = f"{USER_AGENT} (by {self.api_user} on e621)"
if E621Client._e621_client is None or E621Client._e621_client.is_closed or E621Client._e621_ua != ua: # Fast path
E621Client._e621_ua = ua c = E621Client._e621_client
E621Client._e621_client = httpx.AsyncClient( if c is not None and not c.is_closed and E621Client._e621_ua == ua:
headers={"User-Agent": ua}, return c
follow_redirects=True, with E621Client._e621_lock:
timeout=20.0, c = E621Client._e621_client
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5), if c is None or c.is_closed or E621Client._e621_ua != ua:
) # Stash old client for shutdown cleanup if it's still open.
return E621Client._e621_client if c is not None and not c.is_closed:
E621Client._e621_to_close.append(c)
E621Client._e621_ua = ua
c = httpx.AsyncClient(
headers={"User-Agent": ua},
follow_redirects=True,
timeout=20.0,
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
)
E621Client._e621_client = c
return c
@classmethod
async def aclose_shared(cls) -> None:
"""Cleanly aclose the active client and any UA-change leftovers."""
with cls._e621_lock:
current = cls._e621_client
cls._e621_client = None
pending = cls._e621_to_close
cls._e621_to_close = []
for c in [current, *pending]:
if c is not None and not c.is_closed:
try:
await c.aclose()
except Exception as e:
log.warning("E621Client aclose failed: %s", e)
async def search( async def search(
self, tags: str = "", page: int = 1, limit: int = DEFAULT_PAGE_SIZE self, tags: str = "", page: int = 1, limit: int = DEFAULT_PAGE_SIZE

View File

@ -7,6 +7,7 @@ import hashlib
import logging import logging
import os import os
import tempfile import tempfile
import threading
import zipfile import zipfile
from collections import OrderedDict, defaultdict from collections import OrderedDict, defaultdict
from datetime import datetime from datetime import datetime
@ -64,23 +65,48 @@ def _url_hash(url: str) -> str:
return hashlib.sha256(url.encode()).hexdigest()[:16] return hashlib.sha256(url.encode()).hexdigest()[:16]
# Shared httpx client for connection pooling (avoids per-request TLS handshakes) # Shared httpx client for connection pooling (avoids per-request TLS handshakes).
# Lazily created on first download. Lock guards the check-and-set so concurrent
# first-callers can't both build a client and leak one. Loop affinity is
# guaranteed by routing all downloads through `core.concurrency.run_on_app_loop`
# (see PR2).
_shared_client: httpx.AsyncClient | None = None _shared_client: httpx.AsyncClient | None = None
_shared_client_lock = threading.Lock()
def _get_shared_client(referer: str = "") -> httpx.AsyncClient: def _get_shared_client(referer: str = "") -> httpx.AsyncClient:
global _shared_client global _shared_client
if _shared_client is None or _shared_client.is_closed: c = _shared_client
_shared_client = httpx.AsyncClient( if c is not None and not c.is_closed:
headers={ return c
"User-Agent": USER_AGENT, with _shared_client_lock:
"Accept": "image/*,video/*,*/*", c = _shared_client
}, if c is None or c.is_closed:
follow_redirects=True, c = httpx.AsyncClient(
timeout=60.0, headers={
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5), "User-Agent": USER_AGENT,
) "Accept": "image/*,video/*,*/*",
return _shared_client },
follow_redirects=True,
timeout=60.0,
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
)
_shared_client = c
return c
async def aclose_shared_client() -> None:
"""Cleanly aclose the cache module's shared download client. Safe to call
once at app shutdown; no-op if not initialized."""
global _shared_client
with _shared_client_lock:
c = _shared_client
_shared_client = None
if c is not None and not c.is_closed:
try:
await c.aclose()
except Exception as e:
log.warning("cache shared client aclose failed: %s", e)
_IMAGE_MAGIC = { _IMAGE_MAGIC = {

View File

@ -0,0 +1,64 @@
"""Process-wide handle to the app's persistent asyncio event loop.
The GUI runs Qt on the main thread and a single long-lived asyncio loop in
a daemon thread (`BooruApp._async_thread`). Every async piece of code in the
app — searches, downloads, autocomplete, site detection, bookmark thumb
loading — must run on that one loop. Without this guarantee, the shared
httpx clients (which httpx binds to whatever loop first instantiated them)
end up attached to a throwaway loop from a `threading.Thread + asyncio.run`
worker, then break the next time the persistent loop tries to use them
("attached to a different loop" / "Event loop is closed").
This module is the single source of truth for "the loop". `BooruApp.__init__`
calls `set_app_loop()` once after constructing it; everything else uses
`run_on_app_loop()` to schedule coroutines from any thread.
Why a module global instead of passing the loop everywhere: it avoids
threading a parameter through every dialog, view, and helper. There's only
one loop in the process, ever, so a global is the honest representation.
"""
from __future__ import annotations
import asyncio
import logging
from concurrent.futures import Future
from typing import Any, Awaitable, Callable
log = logging.getLogger("booru")
_app_loop: asyncio.AbstractEventLoop | None = None
def set_app_loop(loop: asyncio.AbstractEventLoop) -> None:
"""Register the persistent event loop. Called once at app startup."""
global _app_loop
_app_loop = loop
def get_app_loop() -> asyncio.AbstractEventLoop:
"""Return the persistent event loop. Raises if `set_app_loop` was never called."""
if _app_loop is None:
raise RuntimeError(
"App event loop not initialized — call set_app_loop() before "
"scheduling any async work."
)
return _app_loop
def run_on_app_loop(
coro: Awaitable[Any],
done_callback: Callable[[Future], None] | None = None,
) -> Future:
"""Schedule `coro` on the app's persistent event loop from any thread.
Returns a `concurrent.futures.Future` (not asyncio.Future) — same shape as
`asyncio.run_coroutine_threadsafe`. If `done_callback` is provided, it
runs on the loop thread when the coroutine finishes; the callback is
responsible for marshaling results back to the GUI thread (typically by
emitting a Qt Signal connected with `Qt.ConnectionType.QueuedConnection`).
"""
fut = asyncio.run_coroutine_threadsafe(coro, get_app_loop())
if done_callback is not None:
fut.add_done_callback(done_callback)
return fut

View File

@ -5,6 +5,8 @@ from __future__ import annotations
import os import os
import sqlite3 import sqlite3
import json import json
import threading
from contextlib import contextmanager
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
@ -175,6 +177,13 @@ class Database:
def __init__(self, path: Path | None = None) -> None: def __init__(self, path: Path | None = None) -> None:
self._path = path or db_path() self._path = path or db_path()
self._conn: sqlite3.Connection | None = None self._conn: sqlite3.Connection | None = None
# Single writer lock for the connection. Reads happen concurrently
# under WAL without contention; writes from multiple threads (Qt
# main + the persistent asyncio loop thread) need explicit
# serialization to avoid interleaved multi-statement methods.
# RLock so a writing method can call another writing method on the
# same thread without self-deadlocking.
self._write_lock = threading.RLock()
@property @property
def conn(self) -> sqlite3.Connection: def conn(self) -> sqlite3.Connection:
@ -187,28 +196,49 @@ class Database:
self._migrate() self._migrate()
return self._conn return self._conn
@contextmanager
def _write(self):
"""Context manager for write methods.
Acquires the write lock for cross-thread serialization, then enters
sqlite3's connection context manager (which BEGINs and COMMIT/ROLLBACKs
atomically). Use this in place of `with self.conn:` whenever a method
writes — it composes the two guarantees we want:
1. Multi-statement atomicity (sqlite3 handles)
2. Cross-thread write serialization (the RLock handles)
Reads do not need this — they go through `self.conn.execute(...)` directly
and rely on WAL for concurrent-reader isolation.
"""
with self._write_lock:
with self.conn:
yield self.conn
def _migrate(self) -> None: def _migrate(self) -> None:
"""Add columns that may not exist in older databases. """Add columns that may not exist in older databases.
All ALTERs are wrapped in a single transaction so a crash partway All ALTERs are wrapped in a single transaction so a crash partway
through can't leave the schema half-migrated. through can't leave the schema half-migrated. Note: this runs from
the `conn` property's lazy init, where `_write_lock` exists but the
connection is being built we only need to serialize writes via
the lock; the connection context manager handles atomicity.
""" """
with self._conn: with self._write_lock:
cur = self._conn.execute("PRAGMA table_info(favorites)") with self._conn:
cols = {row[1] for row in cur.fetchall()} cur = self._conn.execute("PRAGMA table_info(favorites)")
if "folder" not in cols: cols = {row[1] for row in cur.fetchall()}
self._conn.execute("ALTER TABLE favorites ADD COLUMN folder TEXT") if "folder" not in cols:
self._conn.execute("CREATE INDEX IF NOT EXISTS idx_favorites_folder ON favorites(folder)") self._conn.execute("ALTER TABLE favorites ADD COLUMN folder TEXT")
# Add tag_categories to library_meta if missing self._conn.execute("CREATE INDEX IF NOT EXISTS idx_favorites_folder ON favorites(folder)")
tables = {r[0] for r in self._conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()} # Add tag_categories to library_meta if missing
if "library_meta" in tables: tables = {r[0] for r in self._conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
cur = self._conn.execute("PRAGMA table_info(library_meta)") if "library_meta" in tables:
meta_cols = {row[1] for row in cur.fetchall()} cur = self._conn.execute("PRAGMA table_info(library_meta)")
if "tag_categories" not in meta_cols: meta_cols = {row[1] for row in cur.fetchall()}
self._conn.execute("ALTER TABLE library_meta ADD COLUMN tag_categories TEXT DEFAULT ''") if "tag_categories" not in meta_cols:
# Add tag_categories to favorites if missing self._conn.execute("ALTER TABLE library_meta ADD COLUMN tag_categories TEXT DEFAULT ''")
if "tag_categories" not in cols: # Add tag_categories to favorites if missing
self._conn.execute("ALTER TABLE favorites ADD COLUMN tag_categories TEXT DEFAULT ''") if "tag_categories" not in cols:
self._conn.execute("ALTER TABLE favorites ADD COLUMN tag_categories TEXT DEFAULT ''")
def close(self) -> None: def close(self) -> None:
if self._conn: if self._conn:
@ -226,12 +256,12 @@ class Database:
api_user: str | None = None, api_user: str | None = None,
) -> Site: ) -> Site:
now = datetime.now(timezone.utc).isoformat() now = datetime.now(timezone.utc).isoformat()
cur = self.conn.execute( with self._write():
"INSERT INTO sites (name, url, api_type, api_key, api_user, added_at) " cur = self.conn.execute(
"VALUES (?, ?, ?, ?, ?, ?)", "INSERT INTO sites (name, url, api_type, api_key, api_user, added_at) "
(name, url.rstrip("/"), api_type, api_key, api_user, now), "VALUES (?, ?, ?, ?, ?, ?)",
) (name, url.rstrip("/"), api_type, api_key, api_user, now),
self.conn.commit() )
return Site( return Site(
id=cur.lastrowid, # type: ignore[arg-type] id=cur.lastrowid, # type: ignore[arg-type]
name=name, name=name,
@ -261,7 +291,7 @@ class Database:
] ]
def delete_site(self, site_id: int) -> None: def delete_site(self, site_id: int) -> None:
with self.conn: with self._write():
self.conn.execute("DELETE FROM favorites WHERE site_id = ?", (site_id,)) self.conn.execute("DELETE FROM favorites WHERE site_id = ?", (site_id,))
self.conn.execute("DELETE FROM sites WHERE id = ?", (site_id,)) self.conn.execute("DELETE FROM sites WHERE id = ?", (site_id,))
@ -277,10 +307,10 @@ class Database:
if not sets: if not sets:
return return
vals.append(site_id) vals.append(site_id)
self.conn.execute( with self._write():
f"UPDATE sites SET {', '.join(sets)} WHERE id = ?", vals self.conn.execute(
) f"UPDATE sites SET {', '.join(sets)} WHERE id = ?", vals
self.conn.commit() )
# -- Bookmarks -- # -- Bookmarks --
@ -300,7 +330,7 @@ class Database:
) -> Bookmark: ) -> Bookmark:
now = datetime.now(timezone.utc).isoformat() now = datetime.now(timezone.utc).isoformat()
cats_json = json.dumps(tag_categories) if tag_categories else "" cats_json = json.dumps(tag_categories) if tag_categories else ""
with self.conn: with self._write():
cur = self.conn.execute( cur = self.conn.execute(
"INSERT OR IGNORE INTO favorites " "INSERT OR IGNORE INTO favorites "
"(site_id, post_id, file_url, preview_url, tags, rating, score, source, cached_path, folder, favorited_at, tag_categories) " "(site_id, post_id, file_url, preview_url, tags, rating, score, source, cached_path, folder, favorited_at, tag_categories) "
@ -342,26 +372,26 @@ class Database:
def add_bookmarks_batch(self, bookmarks: list[dict]) -> None: def add_bookmarks_batch(self, bookmarks: list[dict]) -> None:
"""Add multiple bookmarks in a single transaction.""" """Add multiple bookmarks in a single transaction."""
for fav in bookmarks: with self._write():
self.conn.execute( for fav in bookmarks:
"INSERT OR IGNORE INTO favorites " self.conn.execute(
"(site_id, post_id, file_url, preview_url, tags, rating, score, source, cached_path, folder, favorited_at) " "INSERT OR IGNORE INTO favorites "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", "(site_id, post_id, file_url, preview_url, tags, rating, score, source, cached_path, folder, favorited_at) "
(fav['site_id'], fav['post_id'], fav['file_url'], fav.get('preview_url'), "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
fav.get('tags', ''), fav.get('rating'), fav.get('score'), fav.get('source'), (fav['site_id'], fav['post_id'], fav['file_url'], fav.get('preview_url'),
fav.get('cached_path'), fav.get('folder'), fav.get('favorited_at', datetime.now(timezone.utc).isoformat())), fav.get('tags', ''), fav.get('rating'), fav.get('score'), fav.get('source'),
) fav.get('cached_path'), fav.get('folder'), fav.get('favorited_at', datetime.now(timezone.utc).isoformat())),
self.conn.commit() )
# Back-compat shim # Back-compat shim
add_favorites_batch = add_bookmarks_batch add_favorites_batch = add_bookmarks_batch
def remove_bookmark(self, site_id: int, post_id: int) -> None: def remove_bookmark(self, site_id: int, post_id: int) -> None:
self.conn.execute( with self._write():
"DELETE FROM favorites WHERE site_id = ? AND post_id = ?", self.conn.execute(
(site_id, post_id), "DELETE FROM favorites WHERE site_id = ? AND post_id = ?",
) (site_id, post_id),
self.conn.commit() )
# Back-compat shim # Back-compat shim
remove_favorite = remove_bookmark remove_favorite = remove_bookmark
@ -436,11 +466,11 @@ class Database:
_row_to_favorite = _row_to_bookmark _row_to_favorite = _row_to_bookmark
def update_bookmark_cache_path(self, fav_id: int, cached_path: str) -> None: def update_bookmark_cache_path(self, fav_id: int, cached_path: str) -> None:
self.conn.execute( with self._write():
"UPDATE favorites SET cached_path = ? WHERE id = ?", self.conn.execute(
(cached_path, fav_id), "UPDATE favorites SET cached_path = ? WHERE id = ?",
) (cached_path, fav_id),
self.conn.commit() )
# Back-compat shim # Back-compat shim
update_favorite_cache_path = update_bookmark_cache_path update_favorite_cache_path = update_bookmark_cache_path
@ -460,13 +490,13 @@ class Database:
def add_folder(self, name: str) -> None: def add_folder(self, name: str) -> None:
clean = _validate_folder_name(name.strip()) clean = _validate_folder_name(name.strip())
self.conn.execute( with self._write():
"INSERT OR IGNORE INTO favorite_folders (name) VALUES (?)", (clean,) self.conn.execute(
) "INSERT OR IGNORE INTO favorite_folders (name) VALUES (?)", (clean,)
self.conn.commit() )
def remove_folder(self, name: str) -> None: def remove_folder(self, name: str) -> None:
with self.conn: with self._write():
self.conn.execute( self.conn.execute(
"UPDATE favorites SET folder = NULL WHERE folder = ?", (name,) "UPDATE favorites SET folder = NULL WHERE folder = ?", (name,)
) )
@ -474,7 +504,7 @@ class Database:
def rename_folder(self, old: str, new: str) -> None: def rename_folder(self, old: str, new: str) -> None:
new_name = _validate_folder_name(new.strip()) new_name = _validate_folder_name(new.strip())
with self.conn: with self._write():
self.conn.execute( self.conn.execute(
"UPDATE favorites SET folder = ? WHERE folder = ?", (new_name, old) "UPDATE favorites SET folder = ? WHERE folder = ?", (new_name, old)
) )
@ -483,10 +513,10 @@ class Database:
) )
def move_bookmark_to_folder(self, fav_id: int, folder: str | None) -> None: def move_bookmark_to_folder(self, fav_id: int, folder: str | None) -> None:
self.conn.execute( with self._write():
"UPDATE favorites SET folder = ? WHERE id = ?", (folder, fav_id) self.conn.execute(
) "UPDATE favorites SET folder = ? WHERE id = ?", (folder, fav_id)
self.conn.commit() )
# Back-compat shim # Back-compat shim
move_favorite_to_folder = move_bookmark_to_folder move_favorite_to_folder = move_bookmark_to_folder
@ -494,18 +524,18 @@ class Database:
# -- Blacklist -- # -- Blacklist --
def add_blacklisted_tag(self, tag: str) -> None: def add_blacklisted_tag(self, tag: str) -> None:
self.conn.execute( with self._write():
"INSERT OR IGNORE INTO blacklisted_tags (tag) VALUES (?)", self.conn.execute(
(tag.strip().lower(),), "INSERT OR IGNORE INTO blacklisted_tags (tag) VALUES (?)",
) (tag.strip().lower(),),
self.conn.commit() )
def remove_blacklisted_tag(self, tag: str) -> None: def remove_blacklisted_tag(self, tag: str) -> None:
self.conn.execute( with self._write():
"DELETE FROM blacklisted_tags WHERE tag = ?", self.conn.execute(
(tag.strip().lower(),), "DELETE FROM blacklisted_tags WHERE tag = ?",
) (tag.strip().lower(),),
self.conn.commit() )
def get_blacklisted_tags(self) -> list[str]: def get_blacklisted_tags(self) -> list[str]:
rows = self.conn.execute("SELECT tag FROM blacklisted_tags ORDER BY tag").fetchall() rows = self.conn.execute("SELECT tag FROM blacklisted_tags ORDER BY tag").fetchall()
@ -514,12 +544,12 @@ class Database:
# -- Blacklisted Posts -- # -- Blacklisted Posts --
def add_blacklisted_post(self, url: str) -> None: def add_blacklisted_post(self, url: str) -> None:
self.conn.execute("INSERT OR IGNORE INTO blacklisted_posts (url) VALUES (?)", (url,)) with self._write():
self.conn.commit() self.conn.execute("INSERT OR IGNORE INTO blacklisted_posts (url) VALUES (?)", (url,))
def remove_blacklisted_post(self, url: str) -> None: def remove_blacklisted_post(self, url: str) -> None:
self.conn.execute("DELETE FROM blacklisted_posts WHERE url = ?", (url,)) with self._write():
self.conn.commit() self.conn.execute("DELETE FROM blacklisted_posts WHERE url = ?", (url,))
def get_blacklisted_posts(self) -> set[str]: def get_blacklisted_posts(self) -> set[str]:
rows = self.conn.execute("SELECT url FROM blacklisted_posts").fetchall() rows = self.conn.execute("SELECT url FROM blacklisted_posts").fetchall()
@ -531,14 +561,14 @@ class Database:
score: int = 0, rating: str = None, source: str = None, score: int = 0, rating: str = None, source: str = None,
file_url: str = None) -> None: file_url: str = None) -> None:
cats_json = json.dumps(tag_categories) if tag_categories else "" cats_json = json.dumps(tag_categories) if tag_categories else ""
self.conn.execute( with self._write():
"INSERT OR REPLACE INTO library_meta " self.conn.execute(
"(post_id, tags, tag_categories, score, rating, source, file_url, saved_at) " "INSERT OR REPLACE INTO library_meta "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)", "(post_id, tags, tag_categories, score, rating, source, file_url, saved_at) "
(post_id, tags, cats_json, score, rating, source, file_url, "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
datetime.now(timezone.utc).isoformat()), (post_id, tags, cats_json, score, rating, source, file_url,
) datetime.now(timezone.utc).isoformat()),
self.conn.commit() )
def get_library_meta(self, post_id: int) -> dict | None: def get_library_meta(self, post_id: int) -> dict | None:
row = self.conn.execute("SELECT * FROM library_meta WHERE post_id = ?", (post_id,)).fetchone() row = self.conn.execute("SELECT * FROM library_meta WHERE post_id = ?", (post_id,)).fetchone()
@ -558,8 +588,8 @@ class Database:
return {r["post_id"] for r in rows} return {r["post_id"] for r in rows}
def remove_library_meta(self, post_id: int) -> None: def remove_library_meta(self, post_id: int) -> None:
self.conn.execute("DELETE FROM library_meta WHERE post_id = ?", (post_id,)) with self._write():
self.conn.commit() self.conn.execute("DELETE FROM library_meta WHERE post_id = ?", (post_id,))
# -- Settings -- # -- Settings --
@ -576,11 +606,11 @@ class Database:
return self.get_setting(key) == "1" return self.get_setting(key) == "1"
def set_setting(self, key: str, value: str) -> None: def set_setting(self, key: str, value: str) -> None:
self.conn.execute( with self._write():
"INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)", self.conn.execute(
(key, str(value)), "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
) (key, str(value)),
self.conn.commit() )
def get_all_settings(self) -> dict[str, str]: def get_all_settings(self) -> dict[str, str]:
result = dict(_DEFAULTS) result = dict(_DEFAULTS)
@ -595,7 +625,7 @@ class Database:
if not query.strip(): if not query.strip():
return return
now = datetime.now(timezone.utc).isoformat() now = datetime.now(timezone.utc).isoformat()
with self.conn: with self._write():
# Remove duplicate if exists, keep latest # Remove duplicate if exists, keep latest
self.conn.execute( self.conn.execute(
"DELETE FROM search_history WHERE query = ? AND (site_id = ? OR (site_id IS NULL AND ? IS NULL))", "DELETE FROM search_history WHERE query = ? AND (site_id = ? OR (site_id IS NULL AND ? IS NULL))",
@ -619,21 +649,21 @@ class Database:
return [r["query"] for r in rows] return [r["query"] for r in rows]
def clear_search_history(self) -> None: def clear_search_history(self) -> None:
self.conn.execute("DELETE FROM search_history") with self._write():
self.conn.commit() self.conn.execute("DELETE FROM search_history")
def remove_search_history(self, query: str) -> None: def remove_search_history(self, query: str) -> None:
self.conn.execute("DELETE FROM search_history WHERE query = ?", (query,)) with self._write():
self.conn.commit() self.conn.execute("DELETE FROM search_history WHERE query = ?", (query,))
# -- Saved Searches -- # -- Saved Searches --
def add_saved_search(self, name: str, query: str, site_id: int | None = None) -> None: def add_saved_search(self, name: str, query: str, site_id: int | None = None) -> None:
self.conn.execute( with self._write():
"INSERT OR REPLACE INTO saved_searches (name, query, site_id) VALUES (?, ?, ?)", self.conn.execute(
(name.strip(), query.strip(), site_id), "INSERT OR REPLACE INTO saved_searches (name, query, site_id) VALUES (?, ?, ?)",
) (name.strip(), query.strip(), site_id),
self.conn.commit() )
def get_saved_searches(self) -> list[tuple[int, str, str]]: def get_saved_searches(self) -> list[tuple[int, str, str]]:
"""Returns list of (id, name, query).""" """Returns list of (id, name, query)."""
@ -643,5 +673,5 @@ class Database:
return [(r["id"], r["name"], r["query"]) for r in rows] return [(r["id"], r["name"], r["query"]) for r in rows]
def remove_saved_search(self, search_id: int) -> None:
    """Delete the saved search with primary key *search_id*.

    A no-op if the id does not exist. Wrapped in ``self._write()`` so the
    delete is serialized with other writers and committed on exit.
    """
    with self._write():
        self.conn.execute(
            "DELETE FROM saved_searches WHERE id = ?", (search_id,)
        )

View File

@ -312,10 +312,20 @@ class BooruApp(QMainWindow):
self._async_thread = threading.Thread(target=self._async_loop.run_forever, daemon=True) self._async_thread = threading.Thread(target=self._async_loop.run_forever, daemon=True)
self._async_thread.start() self._async_thread.start()
# Register the persistent loop as the process-wide app loop. Anything
# that wants to schedule async work — `gui/sites.py`, `gui/bookmarks.py`,
# any future helper — calls `core.concurrency.run_on_app_loop` which
# uses this same loop. The whole point of PR2 is to never run async
# code on a throwaway loop again.
from ..core.concurrency import set_app_loop
set_app_loop(self._async_loop)
# Reset shared HTTP clients from previous session # Reset shared HTTP clients from previous session
from ..core.cache import _get_shared_client
from ..core.api.base import BooruClient from ..core.api.base import BooruClient
from ..core.api.e621 import E621Client
BooruClient._shared_client = None BooruClient._shared_client = None
E621Client._e621_client = None
E621Client._e621_to_close = []
import booru_viewer.core.cache as _cache_mod import booru_viewer.core.cache as _cache_mod
_cache_mod._shared_client = None _cache_mod._shared_client = None
@ -2927,6 +2937,26 @@ class BooruApp(QMainWindow):
self._save_main_splitter_sizes() self._save_main_splitter_sizes()
self._save_right_splitter_sizes() self._save_right_splitter_sizes()
self._save_main_window_state() self._save_main_window_state()
# Cleanly shut the shared httpx pools down BEFORE stopping the loop
# so the connection pool / keepalive sockets / TLS state get released
# instead of being abandoned mid-flight. Has to run on the loop the
# clients were bound to.
try:
from ..core.api.base import BooruClient
from ..core.api.e621 import E621Client
from ..core.cache import aclose_shared_client
async def _close_all():
await BooruClient.aclose_shared()
await E621Client.aclose_shared()
await aclose_shared_client()
fut = asyncio.run_coroutine_threadsafe(_close_all(), self._async_loop)
fut.result(timeout=5)
except Exception as e:
log.warning(f"Shared httpx aclose failed: {e}")
self._async_loop.call_soon_threadsafe(self._async_loop.stop) self._async_loop.call_soon_threadsafe(self._async_loop.stop)
self._async_thread.join(timeout=2) self._async_thread.join(timeout=2)
if self._db.get_setting_bool("clear_cache_on_exit"): if self._db.get_setting_bool("clear_cache_on_exit"):

View File

@ -3,8 +3,6 @@
from __future__ import annotations from __future__ import annotations
import logging import logging
import threading
import asyncio
from pathlib import Path from pathlib import Path
from PySide6.QtCore import Qt, Signal, QObject, QTimer from PySide6.QtCore import Qt, Signal, QObject, QTimer
@ -25,6 +23,7 @@ from PySide6.QtWidgets import (
from ..core.db import Database, Bookmark from ..core.db import Database, Bookmark
from ..core.cache import download_thumbnail from ..core.cache import download_thumbnail
from ..core.concurrency import run_on_app_loop
from .grid import ThumbnailGrid from .grid import ThumbnailGrid
log = logging.getLogger("booru") log = logging.getLogger("booru")
@ -173,13 +172,18 @@ class BookmarksView(QWidget):
thumb.set_pixmap(pix) thumb.set_pixmap(pix)
def _load_thumb_async(self, index: int, url: str) -> None:
    """Fetch the thumbnail for *url* on the persistent asyncio loop.

    Scheduling through ``run_on_app_loop`` — instead of the old
    ``threading.Thread(target=lambda: asyncio.run(...))`` pattern — keeps
    the cache module's shared httpx client bound to the one long-lived
    loop; a per-thread throwaway loop would leave the client attached to
    a dead loop and break every subsequent request from the persistent
    loop.

    On success, the downloaded path is marshaled back to the GUI thread
    via the ``thumb_ready`` signal; failures are logged and swallowed so
    a single bad thumbnail never takes down the view.
    """
    async def _dl() -> None:
        try:
            path = await download_thumbnail(url)
            self._signals.thumb_ready.emit(index, str(path))
        except Exception as e:
            log.warning(f"Bookmark thumb {index} failed: {e}")

    run_on_app_loop(_dl())
def _on_thumb_ready(self, index: int, path: str) -> None: def _on_thumb_ready(self, index: int, path: str) -> None:
thumbs = self._grid._thumbs thumbs = self._grid._thumbs

View File

@ -2,10 +2,7 @@
from __future__ import annotations from __future__ import annotations
import asyncio from PySide6.QtCore import Qt, Signal
import threading
from PySide6.QtCore import Qt, Signal, QMetaObject, Q_ARG, Qt as QtNS
from PySide6.QtWidgets import ( from PySide6.QtWidgets import (
QDialog, QDialog,
QVBoxLayout, QVBoxLayout,
@ -22,16 +19,34 @@ from PySide6.QtWidgets import (
from ..core.db import Database, Site from ..core.db import Database, Site
from ..core.api.detect import detect_site_type from ..core.api.detect import detect_site_type
from ..core.concurrency import run_on_app_loop
class SiteDialog(QDialog): class SiteDialog(QDialog):
"""Dialog to add or edit a booru site.""" """Dialog to add or edit a booru site."""
# Internal signals used to marshal worker results back to the GUI thread.
# Connected with QueuedConnection so emit() from the asyncio loop thread
# is always delivered on the Qt main thread.
_detect_done_sig = Signal(object, object) # (result_or_None, error_or_None)
_test_done_sig = Signal(bool, str)
def __init__(self, parent: QWidget | None = None, site: Site | None = None) -> None: def __init__(self, parent: QWidget | None = None, site: Site | None = None) -> None:
super().__init__(parent) super().__init__(parent)
self._editing = site is not None self._editing = site is not None
self.setWindowTitle("Edit Site" if self._editing else "Add Site") self.setWindowTitle("Edit Site" if self._editing else "Add Site")
self.setMinimumWidth(400) self.setMinimumWidth(400)
# Set when the dialog is closed/destroyed so in-flight worker
# callbacks can short-circuit instead of poking a dead QObject.
self._closed = False
# Tracked so we can cancel pending coroutines on close.
self._inflight = [] # list[concurrent.futures.Future]
self._detect_done_sig.connect(
self._detect_finished, Qt.ConnectionType.QueuedConnection
)
self._test_done_sig.connect(
self._test_finished, Qt.ConnectionType.QueuedConnection
)
layout = QVBoxLayout(self) layout = QVBoxLayout(self)
@ -102,16 +117,22 @@ class SiteDialog(QDialog):
api_key = self._key_input.text().strip() or None api_key = self._key_input.text().strip() or None
api_user = self._user_input.text().strip() or None api_user = self._user_input.text().strip() or None
def _run(): async def _do_detect():
try: try:
result = asyncio.run(detect_site_type(url, api_key=api_key, api_user=api_user)) result = await detect_site_type(url, api_key=api_key, api_user=api_user)
self._detect_finished(result, None) if not self._closed:
self._detect_done_sig.emit(result, None)
except Exception as e: except Exception as e:
self._detect_finished(None, e) if not self._closed:
self._detect_done_sig.emit(None, e)
threading.Thread(target=_run, daemon=True).start() fut = run_on_app_loop(_do_detect())
self._inflight.append(fut)
fut.add_done_callback(lambda f: self._inflight.remove(f) if f in self._inflight else None)
def _detect_finished(self, result: str | None, error: Exception | None) -> None: def _detect_finished(self, result, error) -> None:
if self._closed:
return
self._detect_btn.setEnabled(True) self._detect_btn.setEnabled(True)
if error: if error:
self._status_label.setText(f"Error: {error}") self._status_label.setText(f"Error: {error}")
@ -132,25 +153,42 @@ class SiteDialog(QDialog):
self._status_label.setText("Testing connection...") self._status_label.setText("Testing connection...")
self._test_btn.setEnabled(False) self._test_btn.setEnabled(False)
def _run(): from ..core.api.detect import client_for_type
import asyncio
from ..core.api.detect import client_for_type async def _do_test():
try: try:
client = client_for_type(api_type, url, api_key=api_key, api_user=api_user) client = client_for_type(api_type, url, api_key=api_key, api_user=api_user)
ok, detail = asyncio.run(client.test_connection()) ok, detail = await client.test_connection()
self._test_finished(ok, detail) if not self._closed:
self._test_done_sig.emit(ok, detail)
except Exception as e: except Exception as e:
self._test_finished(False, str(e)) if not self._closed:
self._test_done_sig.emit(False, str(e))
threading.Thread(target=_run, daemon=True).start() fut = run_on_app_loop(_do_test())
self._inflight.append(fut)
fut.add_done_callback(lambda f: self._inflight.remove(f) if f in self._inflight else None)
def _test_finished(self, ok: bool, detail: str) -> None: def _test_finished(self, ok: bool, detail: str) -> None:
if self._closed:
return
self._test_btn.setEnabled(True) self._test_btn.setEnabled(True)
if ok: if ok:
self._status_label.setText(f"Connected! {detail}") self._status_label.setText(f"Connected! {detail}")
else: else:
self._status_label.setText(f"Failed: {detail}") self._status_label.setText(f"Failed: {detail}")
def closeEvent(self, event) -> None:
    """Tear down worker bookkeeping before the dialog closes.

    Sets ``_closed`` FIRST so any in-flight callback that fires during
    teardown short-circuits instead of poking a dead QObject, then
    cancels every pending future in ``_inflight`` so abandoned
    coroutines don't keep occupying the app loop. Iterates over a copy
    because done-callbacks may mutate ``_inflight`` while we cancel.
    """
    self._closed = True
    for fut in list(self._inflight):
        try:
            fut.cancel()
        except Exception:
            # Best-effort: a future that is already done/cancelled (or
            # otherwise refuses) must not block window close.
            pass
    super().closeEvent(event)
def _try_parse_url(self, text: str) -> None: def _try_parse_url(self, text: str) -> None:
"""Strip query params from pasted URLs like https://gelbooru.com/index.php?page=post&s=list&tags=all.""" """Strip query params from pasted URLs like https://gelbooru.com/index.php?page=post&s=list&tags=all."""
from urllib.parse import urlparse, parse_qs from urllib.parse import urlparse, parse_qs