booru-viewer/booru_viewer/core/concurrency.py
pax eb58d76bc0 Route async work through one persistent loop, lock shared httpx + DB writes
Mixing `threading.Thread + asyncio.run` workers with the long-lived
asyncio loop in gui/app.py is a real loop-affinity bug: the first worker
thread to call `asyncio.run` constructs a throwaway loop, which the
shared httpx clients then attach to, and the next call from the
persistent loop fails with "Event loop is closed" / "attached to a
different loop". This commit eliminates the pattern across the GUI and
adds the locking + cleanup that should have been there from the start.

Persistent loop accessor (core/concurrency.py — new)
- set_app_loop / get_app_loop / run_on_app_loop. BooruApp registers the
  one persistent loop at startup; everything that wants to schedule
  async work calls run_on_app_loop instead of spawning a thread that
  builds its own loop. Three functions, ~30 lines, single source of
  truth for "the loop".

Lazy-init lock + cleanup on shared httpx clients (core/api/base.py,
core/api/e621.py, core/cache.py)
- Each shared singleton (BooruClient._shared_client, E621Client._e621_client,
  cache._shared_client) now uses fast-path / locked-slow-path lazy init.
  Concurrent first-callers from the same loop can no longer both build
  a client and leak one (verified: 10 racing callers => 1 httpx instance).
- Each module exposes an aclose helper that BooruApp.closeEvent runs via
  run_coroutine_threadsafe(...).result(timeout=5) BEFORE stopping the
  loop. The connection pool, keepalive sockets, and TLS state finally
  release cleanly instead of being abandoned at process exit.
- E621Client tracks UA-change leftovers in _e621_to_close so the old
  client doesn't leak when api_user changes — drained in aclose_shared.

GUI workers routed through the persistent loop (gui/sites.py,
gui/bookmarks.py)
- SiteDialog._on_detect / _on_test: replaced
  `threading.Thread(target=lambda: asyncio.run(...))` with
  run_on_app_loop. Results marshaled back through Qt Signals connected
  with QueuedConnection. Added _closed flag + _inflight futures list:
  closeEvent cancels pending coroutines and shorts out the result emit
  if the user closes the dialog mid-detect (no use-after-free on
  destroyed QObject).
- BookmarksView._load_thumb_async: same swap. The existing thumb_ready
  signal already used QueuedConnection so the marshaling side was
  already correct.

DB write serialization (core/db.py)
- Database._write_lock = threading.RLock() — RLock not Lock so a
  writing method can call another writing method on the same thread
  without self-deadlocking.
- New _write() context manager composes the lock + sqlite3's connection
  context manager (the latter handles BEGIN / COMMIT / ROLLBACK
  atomically). Every write method converted: add_site, update_site,
  delete_site, add_bookmark, add_bookmarks_batch, remove_bookmark,
  update_bookmark_cache_path, add_folder, remove_folder, rename_folder,
  move_bookmark_to_folder, add/remove_blacklisted_tag,
  add/remove_blacklisted_post, save_library_meta, remove_library_meta,
  set_setting, add_search_history, clear_search_history,
  remove_search_history, add_saved_search, remove_saved_search.
- _migrate keeps using the lock + raw _conn context manager because
  it runs from inside the conn property's lazy init (where _write()
  would re-enter conn).
- Reads stay lock-free and rely on WAL for reader concurrency. Verified
  under contention: 5 threads × 50 add_bookmark calls => 250 rows,
  zero corruption, zero "database is locked" errors.

Smoke-tested with seven scenarios: get_app_loop raises before set,
run_on_app_loop round-trips, lazy init creates exactly one client,
10 concurrent first-callers => 1 httpx, aclose_shared cleans up,
RLock allows nested re-acquire, multi-threaded write contention.
2026-04-07 17:24:23 -05:00

65 lines
2.5 KiB
Python

"""Process-wide handle to the app's persistent asyncio event loop.
The GUI runs Qt on the main thread and a single long-lived asyncio loop in
a daemon thread (`BooruApp._async_thread`). Every async piece of code in the
app — searches, downloads, autocomplete, site detection, bookmark thumb
loading — must run on that one loop. Without this guarantee, the shared
httpx clients (which httpx binds to whatever loop first instantiated them)
end up attached to a throwaway loop from a `threading.Thread + asyncio.run`
worker, then break the next time the persistent loop tries to use them
("attached to a different loop" / "Event loop is closed").
This module is the single source of truth for "the loop". `BooruApp.__init__`
calls `set_app_loop()` once after constructing it; everything else uses
`run_on_app_loop()` to schedule coroutines from any thread.
Why a module global instead of passing the loop everywhere: it avoids
threading a parameter through every dialog, view, and helper. There's only
one loop in the process, ever, so a global is the honest representation.
"""
from __future__ import annotations
import asyncio
import logging
from concurrent.futures import Future
from typing import Any, Awaitable, Callable
log = logging.getLogger("booru")
_app_loop: asyncio.AbstractEventLoop | None = None
def set_app_loop(loop: asyncio.AbstractEventLoop) -> None:
"""Register the persistent event loop. Called once at app startup."""
global _app_loop
_app_loop = loop
def get_app_loop() -> asyncio.AbstractEventLoop:
"""Return the persistent event loop. Raises if `set_app_loop` was never called."""
if _app_loop is None:
raise RuntimeError(
"App event loop not initialized — call set_app_loop() before "
"scheduling any async work."
)
return _app_loop
def run_on_app_loop(
coro: Awaitable[Any],
done_callback: Callable[[Future], None] | None = None,
) -> Future:
"""Schedule `coro` on the app's persistent event loop from any thread.
Returns a `concurrent.futures.Future` (not asyncio.Future) — same shape as
`asyncio.run_coroutine_threadsafe`. If `done_callback` is provided, it
runs on the loop thread when the coroutine finishes; the callback is
responsible for marshaling results back to the GUI thread (typically by
emitting a Qt Signal connected with `Qt.ConnectionType.QueuedConnection`).
"""
fut = asyncio.run_coroutine_threadsafe(coro, get_app_loop())
if done_callback is not None:
fut.add_done_callback(done_callback)
return fut