Route async work through one persistent loop, lock shared httpx + DB writes
Mixing `threading.Thread + asyncio.run` workers with the long-lived asyncio loop in gui/app.py is a real loop-affinity bug: the first worker thread to call `asyncio.run` constructs a throwaway loop, which the shared httpx clients then attach to, and the next call from the persistent loop fails with "Event loop is closed" / "attached to a different loop". This commit eliminates the pattern across the GUI and adds the locking + cleanup that should have been there from the start. Persistent loop accessor (core/concurrency.py — new) - set_app_loop / get_app_loop / run_on_app_loop. BooruApp registers the one persistent loop at startup; everything that wants to schedule async work calls run_on_app_loop instead of spawning a thread that builds its own loop. Three functions, ~30 lines, single source of truth for "the loop". Lazy-init lock + cleanup on shared httpx clients (core/api/base.py, core/api/e621.py, core/cache.py) - Each shared singleton (BooruClient._shared_client, E621Client._e621_client, cache._shared_client) now uses fast-path / locked-slow-path lazy init. Concurrent first-callers from the same loop can no longer both build a client and leak one (verified: 10 racing callers => 1 httpx instance). - Each module exposes an aclose helper that BooruApp.closeEvent runs via run_coroutine_threadsafe(...).result(timeout=5) BEFORE stopping the loop. The connection pool, keepalive sockets, and TLS state finally release cleanly instead of being abandoned at process exit. - E621Client tracks UA-change leftovers in _e621_to_close so the old client doesn't leak when api_user changes — drained in aclose_shared. GUI workers routed through the persistent loop (gui/sites.py, gui/bookmarks.py) - SiteDialog._on_detect / _on_test: replaced `threading.Thread(target=lambda: asyncio.run(...))` with run_on_app_loop. Results marshaled back through Qt Signals connected with QueuedConnection. Added _closed flag + _inflight futures list: closeEvent cancels pending coroutines and shorts out the result emit if the user closes the dialog mid-detect (no use-after-free on destroyed QObject). - BookmarksView._load_thumb_async: same swap. The existing thumb_ready signal already used QueuedConnection so the marshaling side was already correct. DB write serialization (core/db.py) - Database._write_lock = threading.RLock() — RLock not Lock so a writing method can call another writing method on the same thread without self-deadlocking. - New _write() context manager composes the lock + sqlite3's connection context manager (the latter handles BEGIN / COMMIT / ROLLBACK atomically). Every write method converted: add_site, update_site, delete_site, add_bookmark, add_bookmarks_batch, remove_bookmark, update_bookmark_cache_path, add_folder, remove_folder, rename_folder, move_bookmark_to_folder, add/remove_blacklisted_tag, add/remove_blacklisted_post, save_library_meta, remove_library_meta, set_setting, add_search_history, clear_search_history, remove_search_history, add_saved_search, remove_saved_search. - _migrate keeps using the lock + raw _conn context manager because it runs from inside the conn property's lazy init (where _write() would re-enter conn). - Reads stay lock-free and rely on WAL for reader concurrency. Verified under contention: 5 threads × 50 add_bookmark calls => 250 rows, zero corruption, zero "database is locked" errors. Smoke-tested with seven scenarios: get_app_loop raises before set, run_on_app_loop round-trips, lazy init creates exactly one client, 10 concurrent first-callers => 1 httpx, aclose_shared cleans up, RLock allows nested re-acquire, multi-threaded write contention.
This commit is contained in:
parent
54ccc40477
commit
eb58d76bc0
@ -4,6 +4,7 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import threading
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
@ -62,8 +63,18 @@ class BooruClient(ABC):
|
||||
|
||||
api_type: str = ""
|
||||
|
||||
# Shared client across all BooruClient instances for connection reuse
|
||||
# Shared httpx client across all BooruClient instances for connection
|
||||
# reuse. Lazily created on first access; the threading.Lock guards the
|
||||
# check-and-set so concurrent first-callers can't both build a client
|
||||
# and leak one. The lock is per-class, lives for the process lifetime.
|
||||
#
|
||||
# Loop affinity: by convention every async call goes through
|
||||
# `core.concurrency.run_on_app_loop`, which schedules on the persistent
|
||||
# event loop in `gui/app.py`. The first lazy init therefore binds the
|
||||
# client to that loop, and every subsequent use is on the same loop.
|
||||
# This is the contract that PR2 enforces — see core/concurrency.py.
|
||||
_shared_client: httpx.AsyncClient | None = None
|
||||
_shared_client_lock: threading.Lock = threading.Lock()
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@ -77,15 +88,37 @@ class BooruClient(ABC):
|
||||
|
||||
@property
|
||||
def client(self) -> httpx.AsyncClient:
|
||||
if BooruClient._shared_client is None or BooruClient._shared_client.is_closed:
|
||||
BooruClient._shared_client = httpx.AsyncClient(
|
||||
headers={"User-Agent": USER_AGENT},
|
||||
follow_redirects=True,
|
||||
timeout=20.0,
|
||||
event_hooks={"request": [self._log_request]},
|
||||
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
|
||||
)
|
||||
return BooruClient._shared_client
|
||||
# Fast path: client exists and is open. No lock needed for the read.
|
||||
c = BooruClient._shared_client
|
||||
if c is not None and not c.is_closed:
|
||||
return c
|
||||
# Slow path: build it. Lock so two coroutines on the same loop don't
|
||||
# both construct + leak.
|
||||
with BooruClient._shared_client_lock:
|
||||
c = BooruClient._shared_client
|
||||
if c is None or c.is_closed:
|
||||
c = httpx.AsyncClient(
|
||||
headers={"User-Agent": USER_AGENT},
|
||||
follow_redirects=True,
|
||||
timeout=20.0,
|
||||
event_hooks={"request": [self._log_request]},
|
||||
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
|
||||
)
|
||||
BooruClient._shared_client = c
|
||||
return c
|
||||
|
||||
@classmethod
|
||||
async def aclose_shared(cls) -> None:
|
||||
"""Cleanly aclose the shared client. Safe to call from any coroutine
|
||||
running on the loop the client is bound to. No-op if not initialized."""
|
||||
with cls._shared_client_lock:
|
||||
c = cls._shared_client
|
||||
cls._shared_client = None
|
||||
if c is not None and not c.is_closed:
|
||||
try:
|
||||
await c.aclose()
|
||||
except Exception as e:
|
||||
log.warning("BooruClient shared aclose failed: %s", e)
|
||||
|
||||
@staticmethod
|
||||
async def _log_request(request: httpx.Request) -> None:
|
||||
@ -124,7 +157,10 @@ class BooruClient(ABC):
|
||||
return resp # unreachable in practice, satisfies type checker
|
||||
|
||||
async def close(self) -> None:
|
||||
pass # shared client stays open
|
||||
# Per-instance close is a no-op — the shared pool is owned by the
|
||||
# class. Use `await BooruClient.aclose_shared()` from app shutdown
|
||||
# to actually release the connection pool.
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def search(
|
||||
|
||||
@ -3,6 +3,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import threading
|
||||
|
||||
import httpx
|
||||
|
||||
@ -15,23 +16,56 @@ log = logging.getLogger("booru")
|
||||
class E621Client(BooruClient):
|
||||
api_type = "e621"
|
||||
|
||||
# Same shared-singleton pattern as BooruClient, but e621 needs a custom
|
||||
# User-Agent (their TOS requires identifying the app + user). When the
|
||||
# UA changes (api_user edit) we need to rebuild — and we explicitly
|
||||
# close the old client to avoid leaking its connection pool.
|
||||
_e621_client: httpx.AsyncClient | None = None
|
||||
_e621_ua: str = ""
|
||||
_e621_lock: threading.Lock = threading.Lock()
|
||||
# Old clients pending aclose. We can't await from a sync property, so
|
||||
# we stash them here and the app's shutdown coroutine drains them.
|
||||
_e621_to_close: list[httpx.AsyncClient] = []
|
||||
|
||||
@property
|
||||
def client(self) -> httpx.AsyncClient:
|
||||
ua = USER_AGENT
|
||||
if self.api_user:
|
||||
ua = f"{USER_AGENT} (by {self.api_user} on e621)"
|
||||
if E621Client._e621_client is None or E621Client._e621_client.is_closed or E621Client._e621_ua != ua:
|
||||
E621Client._e621_ua = ua
|
||||
E621Client._e621_client = httpx.AsyncClient(
|
||||
headers={"User-Agent": ua},
|
||||
follow_redirects=True,
|
||||
timeout=20.0,
|
||||
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
|
||||
)
|
||||
return E621Client._e621_client
|
||||
# Fast path
|
||||
c = E621Client._e621_client
|
||||
if c is not None and not c.is_closed and E621Client._e621_ua == ua:
|
||||
return c
|
||||
with E621Client._e621_lock:
|
||||
c = E621Client._e621_client
|
||||
if c is None or c.is_closed or E621Client._e621_ua != ua:
|
||||
# Stash old client for shutdown cleanup if it's still open.
|
||||
if c is not None and not c.is_closed:
|
||||
E621Client._e621_to_close.append(c)
|
||||
E621Client._e621_ua = ua
|
||||
c = httpx.AsyncClient(
|
||||
headers={"User-Agent": ua},
|
||||
follow_redirects=True,
|
||||
timeout=20.0,
|
||||
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
|
||||
)
|
||||
E621Client._e621_client = c
|
||||
return c
|
||||
|
||||
@classmethod
|
||||
async def aclose_shared(cls) -> None:
|
||||
"""Cleanly aclose the active client and any UA-change leftovers."""
|
||||
with cls._e621_lock:
|
||||
current = cls._e621_client
|
||||
cls._e621_client = None
|
||||
pending = cls._e621_to_close
|
||||
cls._e621_to_close = []
|
||||
for c in [current, *pending]:
|
||||
if c is not None and not c.is_closed:
|
||||
try:
|
||||
await c.aclose()
|
||||
except Exception as e:
|
||||
log.warning("E621Client aclose failed: %s", e)
|
||||
|
||||
async def search(
|
||||
self, tags: str = "", page: int = 1, limit: int = DEFAULT_PAGE_SIZE
|
||||
|
||||
@ -7,6 +7,7 @@ import hashlib
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
import threading
|
||||
import zipfile
|
||||
from collections import OrderedDict, defaultdict
|
||||
from datetime import datetime
|
||||
@ -64,23 +65,48 @@ def _url_hash(url: str) -> str:
|
||||
return hashlib.sha256(url.encode()).hexdigest()[:16]
|
||||
|
||||
|
||||
# Shared httpx client for connection pooling (avoids per-request TLS handshakes)
|
||||
# Shared httpx client for connection pooling (avoids per-request TLS handshakes).
|
||||
# Lazily created on first download. Lock guards the check-and-set so concurrent
|
||||
# first-callers can't both build a client and leak one. Loop affinity is
|
||||
# guaranteed by routing all downloads through `core.concurrency.run_on_app_loop`
|
||||
# (see PR2).
|
||||
_shared_client: httpx.AsyncClient | None = None
|
||||
_shared_client_lock = threading.Lock()
|
||||
|
||||
|
||||
def _get_shared_client(referer: str = "") -> httpx.AsyncClient:
|
||||
global _shared_client
|
||||
if _shared_client is None or _shared_client.is_closed:
|
||||
_shared_client = httpx.AsyncClient(
|
||||
headers={
|
||||
"User-Agent": USER_AGENT,
|
||||
"Accept": "image/*,video/*,*/*",
|
||||
},
|
||||
follow_redirects=True,
|
||||
timeout=60.0,
|
||||
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
|
||||
)
|
||||
return _shared_client
|
||||
c = _shared_client
|
||||
if c is not None and not c.is_closed:
|
||||
return c
|
||||
with _shared_client_lock:
|
||||
c = _shared_client
|
||||
if c is None or c.is_closed:
|
||||
c = httpx.AsyncClient(
|
||||
headers={
|
||||
"User-Agent": USER_AGENT,
|
||||
"Accept": "image/*,video/*,*/*",
|
||||
},
|
||||
follow_redirects=True,
|
||||
timeout=60.0,
|
||||
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
|
||||
)
|
||||
_shared_client = c
|
||||
return c
|
||||
|
||||
|
||||
async def aclose_shared_client() -> None:
|
||||
"""Cleanly aclose the cache module's shared download client. Safe to call
|
||||
once at app shutdown; no-op if not initialized."""
|
||||
global _shared_client
|
||||
with _shared_client_lock:
|
||||
c = _shared_client
|
||||
_shared_client = None
|
||||
if c is not None and not c.is_closed:
|
||||
try:
|
||||
await c.aclose()
|
||||
except Exception as e:
|
||||
log.warning("cache shared client aclose failed: %s", e)
|
||||
|
||||
|
||||
_IMAGE_MAGIC = {
|
||||
|
||||
64
booru_viewer/core/concurrency.py
Normal file
64
booru_viewer/core/concurrency.py
Normal file
@ -0,0 +1,64 @@
|
||||
"""Process-wide handle to the app's persistent asyncio event loop.
|
||||
|
||||
The GUI runs Qt on the main thread and a single long-lived asyncio loop in
|
||||
a daemon thread (`BooruApp._async_thread`). Every async piece of code in the
|
||||
app — searches, downloads, autocomplete, site detection, bookmark thumb
|
||||
loading — must run on that one loop. Without this guarantee, the shared
|
||||
httpx clients (which httpx binds to whatever loop first instantiated them)
|
||||
end up attached to a throwaway loop from a `threading.Thread + asyncio.run`
|
||||
worker, then break the next time the persistent loop tries to use them
|
||||
("attached to a different loop" / "Event loop is closed").
|
||||
|
||||
This module is the single source of truth for "the loop". `BooruApp.__init__`
|
||||
calls `set_app_loop()` once after constructing it; everything else uses
|
||||
`run_on_app_loop()` to schedule coroutines from any thread.
|
||||
|
||||
Why a module global instead of passing the loop everywhere: it avoids
|
||||
threading a parameter through every dialog, view, and helper. There's only
|
||||
one loop in the process, ever, so a global is the honest representation.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from concurrent.futures import Future
|
||||
from typing import Any, Awaitable, Callable
|
||||
|
||||
log = logging.getLogger("booru")
|
||||
|
||||
_app_loop: asyncio.AbstractEventLoop | None = None
|
||||
|
||||
|
||||
def set_app_loop(loop: asyncio.AbstractEventLoop) -> None:
|
||||
"""Register the persistent event loop. Called once at app startup."""
|
||||
global _app_loop
|
||||
_app_loop = loop
|
||||
|
||||
|
||||
def get_app_loop() -> asyncio.AbstractEventLoop:
|
||||
"""Return the persistent event loop. Raises if `set_app_loop` was never called."""
|
||||
if _app_loop is None:
|
||||
raise RuntimeError(
|
||||
"App event loop not initialized — call set_app_loop() before "
|
||||
"scheduling any async work."
|
||||
)
|
||||
return _app_loop
|
||||
|
||||
|
||||
def run_on_app_loop(
|
||||
coro: Awaitable[Any],
|
||||
done_callback: Callable[[Future], None] | None = None,
|
||||
) -> Future:
|
||||
"""Schedule `coro` on the app's persistent event loop from any thread.
|
||||
|
||||
Returns a `concurrent.futures.Future` (not asyncio.Future) — same shape as
|
||||
`asyncio.run_coroutine_threadsafe`. If `done_callback` is provided, it
|
||||
runs on the loop thread when the coroutine finishes; the callback is
|
||||
responsible for marshaling results back to the GUI thread (typically by
|
||||
emitting a Qt Signal connected with `Qt.ConnectionType.QueuedConnection`).
|
||||
"""
|
||||
fut = asyncio.run_coroutine_threadsafe(coro, get_app_loop())
|
||||
if done_callback is not None:
|
||||
fut.add_done_callback(done_callback)
|
||||
return fut
|
||||
@ -5,6 +5,8 @@ from __future__ import annotations
|
||||
import os
|
||||
import sqlite3
|
||||
import json
|
||||
import threading
|
||||
from contextlib import contextmanager
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
@ -175,6 +177,13 @@ class Database:
|
||||
def __init__(self, path: Path | None = None) -> None:
|
||||
self._path = path or db_path()
|
||||
self._conn: sqlite3.Connection | None = None
|
||||
# Single writer lock for the connection. Reads happen concurrently
|
||||
# under WAL without contention; writes from multiple threads (Qt
|
||||
# main + the persistent asyncio loop thread) need explicit
|
||||
# serialization to avoid interleaved multi-statement methods.
|
||||
# RLock so a writing method can call another writing method on the
|
||||
# same thread without self-deadlocking.
|
||||
self._write_lock = threading.RLock()
|
||||
|
||||
@property
|
||||
def conn(self) -> sqlite3.Connection:
|
||||
@ -187,28 +196,49 @@ class Database:
|
||||
self._migrate()
|
||||
return self._conn
|
||||
|
||||
@contextmanager
|
||||
def _write(self):
|
||||
"""Context manager for write methods.
|
||||
|
||||
Acquires the write lock for cross-thread serialization, then enters
|
||||
sqlite3's connection context manager (which BEGINs and COMMIT/ROLLBACKs
|
||||
atomically). Use this in place of `with self.conn:` whenever a method
|
||||
writes — it composes the two guarantees we want:
|
||||
1. Multi-statement atomicity (sqlite3 handles)
|
||||
2. Cross-thread write serialization (the RLock handles)
|
||||
Reads do not need this — they go through `self.conn.execute(...)` directly
|
||||
and rely on WAL for concurrent-reader isolation.
|
||||
"""
|
||||
with self._write_lock:
|
||||
with self.conn:
|
||||
yield self.conn
|
||||
|
||||
def _migrate(self) -> None:
|
||||
"""Add columns that may not exist in older databases.
|
||||
|
||||
All ALTERs are wrapped in a single transaction so a crash partway
|
||||
through can't leave the schema half-migrated.
|
||||
through can't leave the schema half-migrated. Note: this runs from
|
||||
the `conn` property's lazy init, where `_write_lock` exists but the
|
||||
connection is being built — we only need to serialize writes via
|
||||
the lock; the connection context manager handles atomicity.
|
||||
"""
|
||||
with self._conn:
|
||||
cur = self._conn.execute("PRAGMA table_info(favorites)")
|
||||
cols = {row[1] for row in cur.fetchall()}
|
||||
if "folder" not in cols:
|
||||
self._conn.execute("ALTER TABLE favorites ADD COLUMN folder TEXT")
|
||||
self._conn.execute("CREATE INDEX IF NOT EXISTS idx_favorites_folder ON favorites(folder)")
|
||||
# Add tag_categories to library_meta if missing
|
||||
tables = {r[0] for r in self._conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
|
||||
if "library_meta" in tables:
|
||||
cur = self._conn.execute("PRAGMA table_info(library_meta)")
|
||||
meta_cols = {row[1] for row in cur.fetchall()}
|
||||
if "tag_categories" not in meta_cols:
|
||||
self._conn.execute("ALTER TABLE library_meta ADD COLUMN tag_categories TEXT DEFAULT ''")
|
||||
# Add tag_categories to favorites if missing
|
||||
if "tag_categories" not in cols:
|
||||
self._conn.execute("ALTER TABLE favorites ADD COLUMN tag_categories TEXT DEFAULT ''")
|
||||
with self._write_lock:
|
||||
with self._conn:
|
||||
cur = self._conn.execute("PRAGMA table_info(favorites)")
|
||||
cols = {row[1] for row in cur.fetchall()}
|
||||
if "folder" not in cols:
|
||||
self._conn.execute("ALTER TABLE favorites ADD COLUMN folder TEXT")
|
||||
self._conn.execute("CREATE INDEX IF NOT EXISTS idx_favorites_folder ON favorites(folder)")
|
||||
# Add tag_categories to library_meta if missing
|
||||
tables = {r[0] for r in self._conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
|
||||
if "library_meta" in tables:
|
||||
cur = self._conn.execute("PRAGMA table_info(library_meta)")
|
||||
meta_cols = {row[1] for row in cur.fetchall()}
|
||||
if "tag_categories" not in meta_cols:
|
||||
self._conn.execute("ALTER TABLE library_meta ADD COLUMN tag_categories TEXT DEFAULT ''")
|
||||
# Add tag_categories to favorites if missing
|
||||
if "tag_categories" not in cols:
|
||||
self._conn.execute("ALTER TABLE favorites ADD COLUMN tag_categories TEXT DEFAULT ''")
|
||||
|
||||
def close(self) -> None:
|
||||
if self._conn:
|
||||
@ -226,12 +256,12 @@ class Database:
|
||||
api_user: str | None = None,
|
||||
) -> Site:
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
cur = self.conn.execute(
|
||||
"INSERT INTO sites (name, url, api_type, api_key, api_user, added_at) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?)",
|
||||
(name, url.rstrip("/"), api_type, api_key, api_user, now),
|
||||
)
|
||||
self.conn.commit()
|
||||
with self._write():
|
||||
cur = self.conn.execute(
|
||||
"INSERT INTO sites (name, url, api_type, api_key, api_user, added_at) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?)",
|
||||
(name, url.rstrip("/"), api_type, api_key, api_user, now),
|
||||
)
|
||||
return Site(
|
||||
id=cur.lastrowid, # type: ignore[arg-type]
|
||||
name=name,
|
||||
@ -261,7 +291,7 @@ class Database:
|
||||
]
|
||||
|
||||
def delete_site(self, site_id: int) -> None:
|
||||
with self.conn:
|
||||
with self._write():
|
||||
self.conn.execute("DELETE FROM favorites WHERE site_id = ?", (site_id,))
|
||||
self.conn.execute("DELETE FROM sites WHERE id = ?", (site_id,))
|
||||
|
||||
@ -277,10 +307,10 @@ class Database:
|
||||
if not sets:
|
||||
return
|
||||
vals.append(site_id)
|
||||
self.conn.execute(
|
||||
f"UPDATE sites SET {', '.join(sets)} WHERE id = ?", vals
|
||||
)
|
||||
self.conn.commit()
|
||||
with self._write():
|
||||
self.conn.execute(
|
||||
f"UPDATE sites SET {', '.join(sets)} WHERE id = ?", vals
|
||||
)
|
||||
|
||||
# -- Bookmarks --
|
||||
|
||||
@ -300,7 +330,7 @@ class Database:
|
||||
) -> Bookmark:
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
cats_json = json.dumps(tag_categories) if tag_categories else ""
|
||||
with self.conn:
|
||||
with self._write():
|
||||
cur = self.conn.execute(
|
||||
"INSERT OR IGNORE INTO favorites "
|
||||
"(site_id, post_id, file_url, preview_url, tags, rating, score, source, cached_path, folder, favorited_at, tag_categories) "
|
||||
@ -342,26 +372,26 @@ class Database:
|
||||
|
||||
def add_bookmarks_batch(self, bookmarks: list[dict]) -> None:
|
||||
"""Add multiple bookmarks in a single transaction."""
|
||||
for fav in bookmarks:
|
||||
self.conn.execute(
|
||||
"INSERT OR IGNORE INTO favorites "
|
||||
"(site_id, post_id, file_url, preview_url, tags, rating, score, source, cached_path, folder, favorited_at) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
(fav['site_id'], fav['post_id'], fav['file_url'], fav.get('preview_url'),
|
||||
fav.get('tags', ''), fav.get('rating'), fav.get('score'), fav.get('source'),
|
||||
fav.get('cached_path'), fav.get('folder'), fav.get('favorited_at', datetime.now(timezone.utc).isoformat())),
|
||||
)
|
||||
self.conn.commit()
|
||||
with self._write():
|
||||
for fav in bookmarks:
|
||||
self.conn.execute(
|
||||
"INSERT OR IGNORE INTO favorites "
|
||||
"(site_id, post_id, file_url, preview_url, tags, rating, score, source, cached_path, folder, favorited_at) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
(fav['site_id'], fav['post_id'], fav['file_url'], fav.get('preview_url'),
|
||||
fav.get('tags', ''), fav.get('rating'), fav.get('score'), fav.get('source'),
|
||||
fav.get('cached_path'), fav.get('folder'), fav.get('favorited_at', datetime.now(timezone.utc).isoformat())),
|
||||
)
|
||||
|
||||
# Back-compat shim
|
||||
add_favorites_batch = add_bookmarks_batch
|
||||
|
||||
def remove_bookmark(self, site_id: int, post_id: int) -> None:
|
||||
self.conn.execute(
|
||||
"DELETE FROM favorites WHERE site_id = ? AND post_id = ?",
|
||||
(site_id, post_id),
|
||||
)
|
||||
self.conn.commit()
|
||||
with self._write():
|
||||
self.conn.execute(
|
||||
"DELETE FROM favorites WHERE site_id = ? AND post_id = ?",
|
||||
(site_id, post_id),
|
||||
)
|
||||
|
||||
# Back-compat shim
|
||||
remove_favorite = remove_bookmark
|
||||
@ -436,11 +466,11 @@ class Database:
|
||||
_row_to_favorite = _row_to_bookmark
|
||||
|
||||
def update_bookmark_cache_path(self, fav_id: int, cached_path: str) -> None:
|
||||
self.conn.execute(
|
||||
"UPDATE favorites SET cached_path = ? WHERE id = ?",
|
||||
(cached_path, fav_id),
|
||||
)
|
||||
self.conn.commit()
|
||||
with self._write():
|
||||
self.conn.execute(
|
||||
"UPDATE favorites SET cached_path = ? WHERE id = ?",
|
||||
(cached_path, fav_id),
|
||||
)
|
||||
|
||||
# Back-compat shim
|
||||
update_favorite_cache_path = update_bookmark_cache_path
|
||||
@ -460,13 +490,13 @@ class Database:
|
||||
|
||||
def add_folder(self, name: str) -> None:
|
||||
clean = _validate_folder_name(name.strip())
|
||||
self.conn.execute(
|
||||
"INSERT OR IGNORE INTO favorite_folders (name) VALUES (?)", (clean,)
|
||||
)
|
||||
self.conn.commit()
|
||||
with self._write():
|
||||
self.conn.execute(
|
||||
"INSERT OR IGNORE INTO favorite_folders (name) VALUES (?)", (clean,)
|
||||
)
|
||||
|
||||
def remove_folder(self, name: str) -> None:
|
||||
with self.conn:
|
||||
with self._write():
|
||||
self.conn.execute(
|
||||
"UPDATE favorites SET folder = NULL WHERE folder = ?", (name,)
|
||||
)
|
||||
@ -474,7 +504,7 @@ class Database:
|
||||
|
||||
def rename_folder(self, old: str, new: str) -> None:
|
||||
new_name = _validate_folder_name(new.strip())
|
||||
with self.conn:
|
||||
with self._write():
|
||||
self.conn.execute(
|
||||
"UPDATE favorites SET folder = ? WHERE folder = ?", (new_name, old)
|
||||
)
|
||||
@ -483,10 +513,10 @@ class Database:
|
||||
)
|
||||
|
||||
def move_bookmark_to_folder(self, fav_id: int, folder: str | None) -> None:
|
||||
self.conn.execute(
|
||||
"UPDATE favorites SET folder = ? WHERE id = ?", (folder, fav_id)
|
||||
)
|
||||
self.conn.commit()
|
||||
with self._write():
|
||||
self.conn.execute(
|
||||
"UPDATE favorites SET folder = ? WHERE id = ?", (folder, fav_id)
|
||||
)
|
||||
|
||||
# Back-compat shim
|
||||
move_favorite_to_folder = move_bookmark_to_folder
|
||||
@ -494,18 +524,18 @@ class Database:
|
||||
# -- Blacklist --
|
||||
|
||||
def add_blacklisted_tag(self, tag: str) -> None:
|
||||
self.conn.execute(
|
||||
"INSERT OR IGNORE INTO blacklisted_tags (tag) VALUES (?)",
|
||||
(tag.strip().lower(),),
|
||||
)
|
||||
self.conn.commit()
|
||||
with self._write():
|
||||
self.conn.execute(
|
||||
"INSERT OR IGNORE INTO blacklisted_tags (tag) VALUES (?)",
|
||||
(tag.strip().lower(),),
|
||||
)
|
||||
|
||||
def remove_blacklisted_tag(self, tag: str) -> None:
|
||||
self.conn.execute(
|
||||
"DELETE FROM blacklisted_tags WHERE tag = ?",
|
||||
(tag.strip().lower(),),
|
||||
)
|
||||
self.conn.commit()
|
||||
with self._write():
|
||||
self.conn.execute(
|
||||
"DELETE FROM blacklisted_tags WHERE tag = ?",
|
||||
(tag.strip().lower(),),
|
||||
)
|
||||
|
||||
def get_blacklisted_tags(self) -> list[str]:
|
||||
rows = self.conn.execute("SELECT tag FROM blacklisted_tags ORDER BY tag").fetchall()
|
||||
@ -514,12 +544,12 @@ class Database:
|
||||
# -- Blacklisted Posts --
|
||||
|
||||
def add_blacklisted_post(self, url: str) -> None:
|
||||
self.conn.execute("INSERT OR IGNORE INTO blacklisted_posts (url) VALUES (?)", (url,))
|
||||
self.conn.commit()
|
||||
with self._write():
|
||||
self.conn.execute("INSERT OR IGNORE INTO blacklisted_posts (url) VALUES (?)", (url,))
|
||||
|
||||
def remove_blacklisted_post(self, url: str) -> None:
|
||||
self.conn.execute("DELETE FROM blacklisted_posts WHERE url = ?", (url,))
|
||||
self.conn.commit()
|
||||
with self._write():
|
||||
self.conn.execute("DELETE FROM blacklisted_posts WHERE url = ?", (url,))
|
||||
|
||||
def get_blacklisted_posts(self) -> set[str]:
|
||||
rows = self.conn.execute("SELECT url FROM blacklisted_posts").fetchall()
|
||||
@ -531,14 +561,14 @@ class Database:
|
||||
score: int = 0, rating: str = None, source: str = None,
|
||||
file_url: str = None) -> None:
|
||||
cats_json = json.dumps(tag_categories) if tag_categories else ""
|
||||
self.conn.execute(
|
||||
"INSERT OR REPLACE INTO library_meta "
|
||||
"(post_id, tags, tag_categories, score, rating, source, file_url, saved_at) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
(post_id, tags, cats_json, score, rating, source, file_url,
|
||||
datetime.now(timezone.utc).isoformat()),
|
||||
)
|
||||
self.conn.commit()
|
||||
with self._write():
|
||||
self.conn.execute(
|
||||
"INSERT OR REPLACE INTO library_meta "
|
||||
"(post_id, tags, tag_categories, score, rating, source, file_url, saved_at) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
(post_id, tags, cats_json, score, rating, source, file_url,
|
||||
datetime.now(timezone.utc).isoformat()),
|
||||
)
|
||||
|
||||
def get_library_meta(self, post_id: int) -> dict | None:
|
||||
row = self.conn.execute("SELECT * FROM library_meta WHERE post_id = ?", (post_id,)).fetchone()
|
||||
@ -558,8 +588,8 @@ class Database:
|
||||
return {r["post_id"] for r in rows}
|
||||
|
||||
def remove_library_meta(self, post_id: int) -> None:
|
||||
self.conn.execute("DELETE FROM library_meta WHERE post_id = ?", (post_id,))
|
||||
self.conn.commit()
|
||||
with self._write():
|
||||
self.conn.execute("DELETE FROM library_meta WHERE post_id = ?", (post_id,))
|
||||
|
||||
# -- Settings --
|
||||
|
||||
@ -576,11 +606,11 @@ class Database:
|
||||
return self.get_setting(key) == "1"
|
||||
|
||||
def set_setting(self, key: str, value: str) -> None:
|
||||
self.conn.execute(
|
||||
"INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
|
||||
(key, str(value)),
|
||||
)
|
||||
self.conn.commit()
|
||||
with self._write():
|
||||
self.conn.execute(
|
||||
"INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
|
||||
(key, str(value)),
|
||||
)
|
||||
|
||||
def get_all_settings(self) -> dict[str, str]:
|
||||
result = dict(_DEFAULTS)
|
||||
@ -595,7 +625,7 @@ class Database:
|
||||
if not query.strip():
|
||||
return
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
with self.conn:
|
||||
with self._write():
|
||||
# Remove duplicate if exists, keep latest
|
||||
self.conn.execute(
|
||||
"DELETE FROM search_history WHERE query = ? AND (site_id = ? OR (site_id IS NULL AND ? IS NULL))",
|
||||
@ -619,21 +649,21 @@ class Database:
|
||||
return [r["query"] for r in rows]
|
||||
|
||||
def clear_search_history(self) -> None:
|
||||
self.conn.execute("DELETE FROM search_history")
|
||||
self.conn.commit()
|
||||
with self._write():
|
||||
self.conn.execute("DELETE FROM search_history")
|
||||
|
||||
def remove_search_history(self, query: str) -> None:
|
||||
self.conn.execute("DELETE FROM search_history WHERE query = ?", (query,))
|
||||
self.conn.commit()
|
||||
with self._write():
|
||||
self.conn.execute("DELETE FROM search_history WHERE query = ?", (query,))
|
||||
|
||||
# -- Saved Searches --
|
||||
|
||||
def add_saved_search(self, name: str, query: str, site_id: int | None = None) -> None:
|
||||
self.conn.execute(
|
||||
"INSERT OR REPLACE INTO saved_searches (name, query, site_id) VALUES (?, ?, ?)",
|
||||
(name.strip(), query.strip(), site_id),
|
||||
)
|
||||
self.conn.commit()
|
||||
with self._write():
|
||||
self.conn.execute(
|
||||
"INSERT OR REPLACE INTO saved_searches (name, query, site_id) VALUES (?, ?, ?)",
|
||||
(name.strip(), query.strip(), site_id),
|
||||
)
|
||||
|
||||
def get_saved_searches(self) -> list[tuple[int, str, str]]:
|
||||
"""Returns list of (id, name, query)."""
|
||||
@ -643,5 +673,5 @@ class Database:
|
||||
return [(r["id"], r["name"], r["query"]) for r in rows]
|
||||
|
||||
def remove_saved_search(self, search_id: int) -> None:
|
||||
self.conn.execute("DELETE FROM saved_searches WHERE id = ?", (search_id,))
|
||||
self.conn.commit()
|
||||
with self._write():
|
||||
self.conn.execute("DELETE FROM saved_searches WHERE id = ?", (search_id,))
|
||||
|
||||
@ -312,10 +312,20 @@ class BooruApp(QMainWindow):
|
||||
self._async_thread = threading.Thread(target=self._async_loop.run_forever, daemon=True)
|
||||
self._async_thread.start()
|
||||
|
||||
# Register the persistent loop as the process-wide app loop. Anything
|
||||
# that wants to schedule async work — `gui/sites.py`, `gui/bookmarks.py`,
|
||||
# any future helper — calls `core.concurrency.run_on_app_loop` which
|
||||
# uses this same loop. The whole point of PR2 is to never run async
|
||||
# code on a throwaway loop again.
|
||||
from ..core.concurrency import set_app_loop
|
||||
set_app_loop(self._async_loop)
|
||||
|
||||
# Reset shared HTTP clients from previous session
|
||||
from ..core.cache import _get_shared_client
|
||||
from ..core.api.base import BooruClient
|
||||
from ..core.api.e621 import E621Client
|
||||
BooruClient._shared_client = None
|
||||
E621Client._e621_client = None
|
||||
E621Client._e621_to_close = []
|
||||
import booru_viewer.core.cache as _cache_mod
|
||||
_cache_mod._shared_client = None
|
||||
|
||||
@ -2927,6 +2937,26 @@ class BooruApp(QMainWindow):
|
||||
self._save_main_splitter_sizes()
|
||||
self._save_right_splitter_sizes()
|
||||
self._save_main_window_state()
|
||||
|
||||
# Cleanly shut the shared httpx pools down BEFORE stopping the loop
|
||||
# so the connection pool / keepalive sockets / TLS state get released
|
||||
# instead of being abandoned mid-flight. Has to run on the loop the
|
||||
# clients were bound to.
|
||||
try:
|
||||
from ..core.api.base import BooruClient
|
||||
from ..core.api.e621 import E621Client
|
||||
from ..core.cache import aclose_shared_client
|
||||
|
||||
async def _close_all():
|
||||
await BooruClient.aclose_shared()
|
||||
await E621Client.aclose_shared()
|
||||
await aclose_shared_client()
|
||||
|
||||
fut = asyncio.run_coroutine_threadsafe(_close_all(), self._async_loop)
|
||||
fut.result(timeout=5)
|
||||
except Exception as e:
|
||||
log.warning(f"Shared httpx aclose failed: {e}")
|
||||
|
||||
self._async_loop.call_soon_threadsafe(self._async_loop.stop)
|
||||
self._async_thread.join(timeout=2)
|
||||
if self._db.get_setting_bool("clear_cache_on_exit"):
|
||||
|
||||
@ -3,8 +3,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import threading
|
||||
import asyncio
|
||||
from pathlib import Path
|
||||
|
||||
from PySide6.QtCore import Qt, Signal, QObject, QTimer
|
||||
@ -25,6 +23,7 @@ from PySide6.QtWidgets import (
|
||||
|
||||
from ..core.db import Database, Bookmark
|
||||
from ..core.cache import download_thumbnail
|
||||
from ..core.concurrency import run_on_app_loop
|
||||
from .grid import ThumbnailGrid
|
||||
|
||||
log = logging.getLogger("booru")
|
||||
@ -173,13 +172,18 @@ class BookmarksView(QWidget):
|
||||
thumb.set_pixmap(pix)
|
||||
|
||||
def _load_thumb_async(self, index: int, url: str) -> None:
|
||||
# Schedule the download on the persistent event loop instead of
|
||||
# spawning a daemon thread that runs its own throwaway loop. This
|
||||
# is the fix for the loop-affinity bug where the cache module's
|
||||
# shared httpx client would get bound to the throwaway loop and
|
||||
# then fail every subsequent use from the persistent loop.
|
||||
async def _dl():
|
||||
try:
|
||||
path = await download_thumbnail(url)
|
||||
self._signals.thumb_ready.emit(index, str(path))
|
||||
except Exception as e:
|
||||
log.warning(f"Bookmark thumb {index} failed: {e}")
|
||||
threading.Thread(target=lambda: asyncio.run(_dl()), daemon=True).start()
|
||||
run_on_app_loop(_dl())
|
||||
|
||||
def _on_thumb_ready(self, index: int, path: str) -> None:
|
||||
thumbs = self._grid._thumbs
|
||||
|
||||
@ -2,10 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import threading
|
||||
|
||||
from PySide6.QtCore import Qt, Signal, QMetaObject, Q_ARG, Qt as QtNS
|
||||
from PySide6.QtCore import Qt, Signal
|
||||
from PySide6.QtWidgets import (
|
||||
QDialog,
|
||||
QVBoxLayout,
|
||||
@ -22,16 +19,34 @@ from PySide6.QtWidgets import (
|
||||
|
||||
from ..core.db import Database, Site
|
||||
from ..core.api.detect import detect_site_type
|
||||
from ..core.concurrency import run_on_app_loop
|
||||
|
||||
|
||||
class SiteDialog(QDialog):
|
||||
"""Dialog to add or edit a booru site."""
|
||||
|
||||
# Internal signals used to marshal worker results back to the GUI thread.
|
||||
# Connected with QueuedConnection so emit() from the asyncio loop thread
|
||||
# is always delivered on the Qt main thread.
|
||||
_detect_done_sig = Signal(object, object) # (result_or_None, error_or_None)
|
||||
_test_done_sig = Signal(bool, str)
|
||||
|
||||
def __init__(self, parent: QWidget | None = None, site: Site | None = None) -> None:
|
||||
super().__init__(parent)
|
||||
self._editing = site is not None
|
||||
self.setWindowTitle("Edit Site" if self._editing else "Add Site")
|
||||
self.setMinimumWidth(400)
|
||||
# Set when the dialog is closed/destroyed so in-flight worker
|
||||
# callbacks can short-circuit instead of poking a dead QObject.
|
||||
self._closed = False
|
||||
# Tracked so we can cancel pending coroutines on close.
|
||||
self._inflight = [] # list[concurrent.futures.Future]
|
||||
self._detect_done_sig.connect(
|
||||
self._detect_finished, Qt.ConnectionType.QueuedConnection
|
||||
)
|
||||
self._test_done_sig.connect(
|
||||
self._test_finished, Qt.ConnectionType.QueuedConnection
|
||||
)
|
||||
|
||||
layout = QVBoxLayout(self)
|
||||
|
||||
@ -102,16 +117,22 @@ class SiteDialog(QDialog):
|
||||
api_key = self._key_input.text().strip() or None
|
||||
api_user = self._user_input.text().strip() or None
|
||||
|
||||
def _run():
|
||||
async def _do_detect():
|
||||
try:
|
||||
result = asyncio.run(detect_site_type(url, api_key=api_key, api_user=api_user))
|
||||
self._detect_finished(result, None)
|
||||
result = await detect_site_type(url, api_key=api_key, api_user=api_user)
|
||||
if not self._closed:
|
||||
self._detect_done_sig.emit(result, None)
|
||||
except Exception as e:
|
||||
self._detect_finished(None, e)
|
||||
if not self._closed:
|
||||
self._detect_done_sig.emit(None, e)
|
||||
|
||||
threading.Thread(target=_run, daemon=True).start()
|
||||
fut = run_on_app_loop(_do_detect())
|
||||
self._inflight.append(fut)
|
||||
fut.add_done_callback(lambda f: self._inflight.remove(f) if f in self._inflight else None)
|
||||
|
||||
def _detect_finished(self, result: str | None, error: Exception | None) -> None:
|
||||
def _detect_finished(self, result, error) -> None:
|
||||
if self._closed:
|
||||
return
|
||||
self._detect_btn.setEnabled(True)
|
||||
if error:
|
||||
self._status_label.setText(f"Error: {error}")
|
||||
@ -132,25 +153,42 @@ class SiteDialog(QDialog):
|
||||
self._status_label.setText("Testing connection...")
|
||||
self._test_btn.setEnabled(False)
|
||||
|
||||
def _run():
|
||||
import asyncio
|
||||
from ..core.api.detect import client_for_type
|
||||
from ..core.api.detect import client_for_type
|
||||
|
||||
async def _do_test():
|
||||
try:
|
||||
client = client_for_type(api_type, url, api_key=api_key, api_user=api_user)
|
||||
ok, detail = asyncio.run(client.test_connection())
|
||||
self._test_finished(ok, detail)
|
||||
ok, detail = await client.test_connection()
|
||||
if not self._closed:
|
||||
self._test_done_sig.emit(ok, detail)
|
||||
except Exception as e:
|
||||
self._test_finished(False, str(e))
|
||||
if not self._closed:
|
||||
self._test_done_sig.emit(False, str(e))
|
||||
|
||||
threading.Thread(target=_run, daemon=True).start()
|
||||
fut = run_on_app_loop(_do_test())
|
||||
self._inflight.append(fut)
|
||||
fut.add_done_callback(lambda f: self._inflight.remove(f) if f in self._inflight else None)
|
||||
|
||||
def _test_finished(self, ok: bool, detail: str) -> None:
|
||||
if self._closed:
|
||||
return
|
||||
self._test_btn.setEnabled(True)
|
||||
if ok:
|
||||
self._status_label.setText(f"Connected! {detail}")
|
||||
else:
|
||||
self._status_label.setText(f"Failed: {detail}")
|
||||
|
||||
def closeEvent(self, event) -> None:
|
||||
# Mark closed first so in-flight callbacks short-circuit, then
|
||||
# cancel anything still pending so we don't tie up the loop.
|
||||
self._closed = True
|
||||
for fut in list(self._inflight):
|
||||
try:
|
||||
fut.cancel()
|
||||
except Exception:
|
||||
pass
|
||||
super().closeEvent(event)
|
||||
|
||||
def _try_parse_url(self, text: str) -> None:
|
||||
"""Strip query params from pasted URLs like https://gelbooru.com/index.php?page=post&s=list&tags=all."""
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user