Route async work through one persistent loop, lock shared httpx + DB writes
Mixing `threading.Thread + asyncio.run` workers with the long-lived asyncio loop in gui/app.py is a real loop-affinity bug: the first worker thread to call `asyncio.run` constructs a throwaway loop, which the shared httpx clients then attach to, and the next call from the persistent loop fails with "Event loop is closed" / "attached to a different loop". This commit eliminates the pattern across the GUI and adds the locking + cleanup that should have been there from the start. Persistent loop accessor (core/concurrency.py — new) - set_app_loop / get_app_loop / run_on_app_loop. BooruApp registers the one persistent loop at startup; everything that wants to schedule async work calls run_on_app_loop instead of spawning a thread that builds its own loop. Three functions, ~30 lines, single source of truth for "the loop". Lazy-init lock + cleanup on shared httpx clients (core/api/base.py, core/api/e621.py, core/cache.py) - Each shared singleton (BooruClient._shared_client, E621Client._e621_client, cache._shared_client) now uses fast-path / locked-slow-path lazy init. Concurrent first-callers from the same loop can no longer both build a client and leak one (verified: 10 racing callers => 1 httpx instance). - Each module exposes an aclose helper that BooruApp.closeEvent runs via run_coroutine_threadsafe(...).result(timeout=5) BEFORE stopping the loop. The connection pool, keepalive sockets, and TLS state finally release cleanly instead of being abandoned at process exit. - E621Client tracks UA-change leftovers in _e621_to_close so the old client doesn't leak when api_user changes — drained in aclose_shared. GUI workers routed through the persistent loop (gui/sites.py, gui/bookmarks.py) - SiteDialog._on_detect / _on_test: replaced `threading.Thread(target=lambda: asyncio.run(...))` with run_on_app_loop. Results marshaled back through Qt Signals connected with QueuedConnection. Added _closed flag + _inflight futures list: closeEvent cancels pending coroutines and shorts out the result emit if the user closes the dialog mid-detect (no use-after-free on destroyed QObject). - BookmarksView._load_thumb_async: same swap. The existing thumb_ready signal already used QueuedConnection so the marshaling side was already correct. DB write serialization (core/db.py) - Database._write_lock = threading.RLock() — RLock not Lock so a writing method can call another writing method on the same thread without self-deadlocking. - New _write() context manager composes the lock + sqlite3's connection context manager (the latter handles BEGIN / COMMIT / ROLLBACK atomically). Every write method converted: add_site, update_site, delete_site, add_bookmark, add_bookmarks_batch, remove_bookmark, update_bookmark_cache_path, add_folder, remove_folder, rename_folder, move_bookmark_to_folder, add/remove_blacklisted_tag, add/remove_blacklisted_post, save_library_meta, remove_library_meta, set_setting, add_search_history, clear_search_history, remove_search_history, add_saved_search, remove_saved_search. - _migrate keeps using the lock + raw _conn context manager because it runs from inside the conn property's lazy init (where _write() would re-enter conn). - Reads stay lock-free and rely on WAL for reader concurrency. Verified under contention: 5 threads × 50 add_bookmark calls => 250 rows, zero corruption, zero "database is locked" errors. Smoke-tested with seven scenarios: get_app_loop raises before set, run_on_app_loop round-trips, lazy init creates exactly one client, 10 concurrent first-callers => 1 httpx, aclose_shared cleans up, RLock allows nested re-acquire, multi-threaded write contention.
This commit is contained in:
parent
54ccc40477
commit
eb58d76bc0
@ -4,6 +4,7 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
import threading
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
|
|
||||||
@ -62,8 +63,18 @@ class BooruClient(ABC):
|
|||||||
|
|
||||||
api_type: str = ""
|
api_type: str = ""
|
||||||
|
|
||||||
# Shared client across all BooruClient instances for connection reuse
|
# Shared httpx client across all BooruClient instances for connection
|
||||||
|
# reuse. Lazily created on first access; the threading.Lock guards the
|
||||||
|
# check-and-set so concurrent first-callers can't both build a client
|
||||||
|
# and leak one. The lock is per-class, lives for the process lifetime.
|
||||||
|
#
|
||||||
|
# Loop affinity: by convention every async call goes through
|
||||||
|
# `core.concurrency.run_on_app_loop`, which schedules on the persistent
|
||||||
|
# event loop in `gui/app.py`. The first lazy init therefore binds the
|
||||||
|
# client to that loop, and every subsequent use is on the same loop.
|
||||||
|
# This is the contract that PR2 enforces — see core/concurrency.py.
|
||||||
_shared_client: httpx.AsyncClient | None = None
|
_shared_client: httpx.AsyncClient | None = None
|
||||||
|
_shared_client_lock: threading.Lock = threading.Lock()
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
@ -77,15 +88,37 @@ class BooruClient(ABC):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def client(self) -> httpx.AsyncClient:
|
def client(self) -> httpx.AsyncClient:
|
||||||
if BooruClient._shared_client is None or BooruClient._shared_client.is_closed:
|
# Fast path: client exists and is open. No lock needed for the read.
|
||||||
BooruClient._shared_client = httpx.AsyncClient(
|
c = BooruClient._shared_client
|
||||||
headers={"User-Agent": USER_AGENT},
|
if c is not None and not c.is_closed:
|
||||||
follow_redirects=True,
|
return c
|
||||||
timeout=20.0,
|
# Slow path: build it. Lock so two coroutines on the same loop don't
|
||||||
event_hooks={"request": [self._log_request]},
|
# both construct + leak.
|
||||||
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
|
with BooruClient._shared_client_lock:
|
||||||
)
|
c = BooruClient._shared_client
|
||||||
return BooruClient._shared_client
|
if c is None or c.is_closed:
|
||||||
|
c = httpx.AsyncClient(
|
||||||
|
headers={"User-Agent": USER_AGENT},
|
||||||
|
follow_redirects=True,
|
||||||
|
timeout=20.0,
|
||||||
|
event_hooks={"request": [self._log_request]},
|
||||||
|
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
|
||||||
|
)
|
||||||
|
BooruClient._shared_client = c
|
||||||
|
return c
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def aclose_shared(cls) -> None:
|
||||||
|
"""Cleanly aclose the shared client. Safe to call from any coroutine
|
||||||
|
running on the loop the client is bound to. No-op if not initialized."""
|
||||||
|
with cls._shared_client_lock:
|
||||||
|
c = cls._shared_client
|
||||||
|
cls._shared_client = None
|
||||||
|
if c is not None and not c.is_closed:
|
||||||
|
try:
|
||||||
|
await c.aclose()
|
||||||
|
except Exception as e:
|
||||||
|
log.warning("BooruClient shared aclose failed: %s", e)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def _log_request(request: httpx.Request) -> None:
|
async def _log_request(request: httpx.Request) -> None:
|
||||||
@ -124,7 +157,10 @@ class BooruClient(ABC):
|
|||||||
return resp # unreachable in practice, satisfies type checker
|
return resp # unreachable in practice, satisfies type checker
|
||||||
|
|
||||||
async def close(self) -> None:
|
async def close(self) -> None:
|
||||||
pass # shared client stays open
|
# Per-instance close is a no-op — the shared pool is owned by the
|
||||||
|
# class. Use `await BooruClient.aclose_shared()` from app shutdown
|
||||||
|
# to actually release the connection pool.
|
||||||
|
pass
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def search(
|
async def search(
|
||||||
|
|||||||
@ -3,6 +3,7 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
import threading
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
@ -15,23 +16,56 @@ log = logging.getLogger("booru")
|
|||||||
class E621Client(BooruClient):
|
class E621Client(BooruClient):
|
||||||
api_type = "e621"
|
api_type = "e621"
|
||||||
|
|
||||||
|
# Same shared-singleton pattern as BooruClient, but e621 needs a custom
|
||||||
|
# User-Agent (their TOS requires identifying the app + user). When the
|
||||||
|
# UA changes (api_user edit) we need to rebuild — and we explicitly
|
||||||
|
# close the old client to avoid leaking its connection pool.
|
||||||
_e621_client: httpx.AsyncClient | None = None
|
_e621_client: httpx.AsyncClient | None = None
|
||||||
_e621_ua: str = ""
|
_e621_ua: str = ""
|
||||||
|
_e621_lock: threading.Lock = threading.Lock()
|
||||||
|
# Old clients pending aclose. We can't await from a sync property, so
|
||||||
|
# we stash them here and the app's shutdown coroutine drains them.
|
||||||
|
_e621_to_close: list[httpx.AsyncClient] = []
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def client(self) -> httpx.AsyncClient:
|
def client(self) -> httpx.AsyncClient:
|
||||||
ua = USER_AGENT
|
ua = USER_AGENT
|
||||||
if self.api_user:
|
if self.api_user:
|
||||||
ua = f"{USER_AGENT} (by {self.api_user} on e621)"
|
ua = f"{USER_AGENT} (by {self.api_user} on e621)"
|
||||||
if E621Client._e621_client is None or E621Client._e621_client.is_closed or E621Client._e621_ua != ua:
|
# Fast path
|
||||||
E621Client._e621_ua = ua
|
c = E621Client._e621_client
|
||||||
E621Client._e621_client = httpx.AsyncClient(
|
if c is not None and not c.is_closed and E621Client._e621_ua == ua:
|
||||||
headers={"User-Agent": ua},
|
return c
|
||||||
follow_redirects=True,
|
with E621Client._e621_lock:
|
||||||
timeout=20.0,
|
c = E621Client._e621_client
|
||||||
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
|
if c is None or c.is_closed or E621Client._e621_ua != ua:
|
||||||
)
|
# Stash old client for shutdown cleanup if it's still open.
|
||||||
return E621Client._e621_client
|
if c is not None and not c.is_closed:
|
||||||
|
E621Client._e621_to_close.append(c)
|
||||||
|
E621Client._e621_ua = ua
|
||||||
|
c = httpx.AsyncClient(
|
||||||
|
headers={"User-Agent": ua},
|
||||||
|
follow_redirects=True,
|
||||||
|
timeout=20.0,
|
||||||
|
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
|
||||||
|
)
|
||||||
|
E621Client._e621_client = c
|
||||||
|
return c
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def aclose_shared(cls) -> None:
|
||||||
|
"""Cleanly aclose the active client and any UA-change leftovers."""
|
||||||
|
with cls._e621_lock:
|
||||||
|
current = cls._e621_client
|
||||||
|
cls._e621_client = None
|
||||||
|
pending = cls._e621_to_close
|
||||||
|
cls._e621_to_close = []
|
||||||
|
for c in [current, *pending]:
|
||||||
|
if c is not None and not c.is_closed:
|
||||||
|
try:
|
||||||
|
await c.aclose()
|
||||||
|
except Exception as e:
|
||||||
|
log.warning("E621Client aclose failed: %s", e)
|
||||||
|
|
||||||
async def search(
|
async def search(
|
||||||
self, tags: str = "", page: int = 1, limit: int = DEFAULT_PAGE_SIZE
|
self, tags: str = "", page: int = 1, limit: int = DEFAULT_PAGE_SIZE
|
||||||
|
|||||||
@ -7,6 +7,7 @@ import hashlib
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import tempfile
|
import tempfile
|
||||||
|
import threading
|
||||||
import zipfile
|
import zipfile
|
||||||
from collections import OrderedDict, defaultdict
|
from collections import OrderedDict, defaultdict
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
@ -64,23 +65,48 @@ def _url_hash(url: str) -> str:
|
|||||||
return hashlib.sha256(url.encode()).hexdigest()[:16]
|
return hashlib.sha256(url.encode()).hexdigest()[:16]
|
||||||
|
|
||||||
|
|
||||||
# Shared httpx client for connection pooling (avoids per-request TLS handshakes)
|
# Shared httpx client for connection pooling (avoids per-request TLS handshakes).
|
||||||
|
# Lazily created on first download. Lock guards the check-and-set so concurrent
|
||||||
|
# first-callers can't both build a client and leak one. Loop affinity is
|
||||||
|
# guaranteed by routing all downloads through `core.concurrency.run_on_app_loop`
|
||||||
|
# (see PR2).
|
||||||
_shared_client: httpx.AsyncClient | None = None
|
_shared_client: httpx.AsyncClient | None = None
|
||||||
|
_shared_client_lock = threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
def _get_shared_client(referer: str = "") -> httpx.AsyncClient:
|
def _get_shared_client(referer: str = "") -> httpx.AsyncClient:
|
||||||
global _shared_client
|
global _shared_client
|
||||||
if _shared_client is None or _shared_client.is_closed:
|
c = _shared_client
|
||||||
_shared_client = httpx.AsyncClient(
|
if c is not None and not c.is_closed:
|
||||||
headers={
|
return c
|
||||||
"User-Agent": USER_AGENT,
|
with _shared_client_lock:
|
||||||
"Accept": "image/*,video/*,*/*",
|
c = _shared_client
|
||||||
},
|
if c is None or c.is_closed:
|
||||||
follow_redirects=True,
|
c = httpx.AsyncClient(
|
||||||
timeout=60.0,
|
headers={
|
||||||
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
|
"User-Agent": USER_AGENT,
|
||||||
)
|
"Accept": "image/*,video/*,*/*",
|
||||||
return _shared_client
|
},
|
||||||
|
follow_redirects=True,
|
||||||
|
timeout=60.0,
|
||||||
|
limits=httpx.Limits(max_connections=10, max_keepalive_connections=5),
|
||||||
|
)
|
||||||
|
_shared_client = c
|
||||||
|
return c
|
||||||
|
|
||||||
|
|
||||||
|
async def aclose_shared_client() -> None:
|
||||||
|
"""Cleanly aclose the cache module's shared download client. Safe to call
|
||||||
|
once at app shutdown; no-op if not initialized."""
|
||||||
|
global _shared_client
|
||||||
|
with _shared_client_lock:
|
||||||
|
c = _shared_client
|
||||||
|
_shared_client = None
|
||||||
|
if c is not None and not c.is_closed:
|
||||||
|
try:
|
||||||
|
await c.aclose()
|
||||||
|
except Exception as e:
|
||||||
|
log.warning("cache shared client aclose failed: %s", e)
|
||||||
|
|
||||||
|
|
||||||
_IMAGE_MAGIC = {
|
_IMAGE_MAGIC = {
|
||||||
|
|||||||
64
booru_viewer/core/concurrency.py
Normal file
64
booru_viewer/core/concurrency.py
Normal file
@ -0,0 +1,64 @@
|
|||||||
|
"""Process-wide handle to the app's persistent asyncio event loop.
|
||||||
|
|
||||||
|
The GUI runs Qt on the main thread and a single long-lived asyncio loop in
|
||||||
|
a daemon thread (`BooruApp._async_thread`). Every async piece of code in the
|
||||||
|
app — searches, downloads, autocomplete, site detection, bookmark thumb
|
||||||
|
loading — must run on that one loop. Without this guarantee, the shared
|
||||||
|
httpx clients (which httpx binds to whatever loop first instantiated them)
|
||||||
|
end up attached to a throwaway loop from a `threading.Thread + asyncio.run`
|
||||||
|
worker, then break the next time the persistent loop tries to use them
|
||||||
|
("attached to a different loop" / "Event loop is closed").
|
||||||
|
|
||||||
|
This module is the single source of truth for "the loop". `BooruApp.__init__`
|
||||||
|
calls `set_app_loop()` once after constructing it; everything else uses
|
||||||
|
`run_on_app_loop()` to schedule coroutines from any thread.
|
||||||
|
|
||||||
|
Why a module global instead of passing the loop everywhere: it avoids
|
||||||
|
threading a parameter through every dialog, view, and helper. There's only
|
||||||
|
one loop in the process, ever, so a global is the honest representation.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from concurrent.futures import Future
|
||||||
|
from typing import Any, Awaitable, Callable
|
||||||
|
|
||||||
|
log = logging.getLogger("booru")
|
||||||
|
|
||||||
|
_app_loop: asyncio.AbstractEventLoop | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def set_app_loop(loop: asyncio.AbstractEventLoop) -> None:
|
||||||
|
"""Register the persistent event loop. Called once at app startup."""
|
||||||
|
global _app_loop
|
||||||
|
_app_loop = loop
|
||||||
|
|
||||||
|
|
||||||
|
def get_app_loop() -> asyncio.AbstractEventLoop:
|
||||||
|
"""Return the persistent event loop. Raises if `set_app_loop` was never called."""
|
||||||
|
if _app_loop is None:
|
||||||
|
raise RuntimeError(
|
||||||
|
"App event loop not initialized — call set_app_loop() before "
|
||||||
|
"scheduling any async work."
|
||||||
|
)
|
||||||
|
return _app_loop
|
||||||
|
|
||||||
|
|
||||||
|
def run_on_app_loop(
|
||||||
|
coro: Awaitable[Any],
|
||||||
|
done_callback: Callable[[Future], None] | None = None,
|
||||||
|
) -> Future:
|
||||||
|
"""Schedule `coro` on the app's persistent event loop from any thread.
|
||||||
|
|
||||||
|
Returns a `concurrent.futures.Future` (not asyncio.Future) — same shape as
|
||||||
|
`asyncio.run_coroutine_threadsafe`. If `done_callback` is provided, it
|
||||||
|
runs on the loop thread when the coroutine finishes; the callback is
|
||||||
|
responsible for marshaling results back to the GUI thread (typically by
|
||||||
|
emitting a Qt Signal connected with `Qt.ConnectionType.QueuedConnection`).
|
||||||
|
"""
|
||||||
|
fut = asyncio.run_coroutine_threadsafe(coro, get_app_loop())
|
||||||
|
if done_callback is not None:
|
||||||
|
fut.add_done_callback(done_callback)
|
||||||
|
return fut
|
||||||
@ -5,6 +5,8 @@ from __future__ import annotations
|
|||||||
import os
|
import os
|
||||||
import sqlite3
|
import sqlite3
|
||||||
import json
|
import json
|
||||||
|
import threading
|
||||||
|
from contextlib import contextmanager
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@ -175,6 +177,13 @@ class Database:
|
|||||||
def __init__(self, path: Path | None = None) -> None:
|
def __init__(self, path: Path | None = None) -> None:
|
||||||
self._path = path or db_path()
|
self._path = path or db_path()
|
||||||
self._conn: sqlite3.Connection | None = None
|
self._conn: sqlite3.Connection | None = None
|
||||||
|
# Single writer lock for the connection. Reads happen concurrently
|
||||||
|
# under WAL without contention; writes from multiple threads (Qt
|
||||||
|
# main + the persistent asyncio loop thread) need explicit
|
||||||
|
# serialization to avoid interleaved multi-statement methods.
|
||||||
|
# RLock so a writing method can call another writing method on the
|
||||||
|
# same thread without self-deadlocking.
|
||||||
|
self._write_lock = threading.RLock()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def conn(self) -> sqlite3.Connection:
|
def conn(self) -> sqlite3.Connection:
|
||||||
@ -187,28 +196,49 @@ class Database:
|
|||||||
self._migrate()
|
self._migrate()
|
||||||
return self._conn
|
return self._conn
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def _write(self):
|
||||||
|
"""Context manager for write methods.
|
||||||
|
|
||||||
|
Acquires the write lock for cross-thread serialization, then enters
|
||||||
|
sqlite3's connection context manager (which BEGINs and COMMIT/ROLLBACKs
|
||||||
|
atomically). Use this in place of `with self.conn:` whenever a method
|
||||||
|
writes — it composes the two guarantees we want:
|
||||||
|
1. Multi-statement atomicity (sqlite3 handles)
|
||||||
|
2. Cross-thread write serialization (the RLock handles)
|
||||||
|
Reads do not need this — they go through `self.conn.execute(...)` directly
|
||||||
|
and rely on WAL for concurrent-reader isolation.
|
||||||
|
"""
|
||||||
|
with self._write_lock:
|
||||||
|
with self.conn:
|
||||||
|
yield self.conn
|
||||||
|
|
||||||
def _migrate(self) -> None:
|
def _migrate(self) -> None:
|
||||||
"""Add columns that may not exist in older databases.
|
"""Add columns that may not exist in older databases.
|
||||||
|
|
||||||
All ALTERs are wrapped in a single transaction so a crash partway
|
All ALTERs are wrapped in a single transaction so a crash partway
|
||||||
through can't leave the schema half-migrated.
|
through can't leave the schema half-migrated. Note: this runs from
|
||||||
|
the `conn` property's lazy init, where `_write_lock` exists but the
|
||||||
|
connection is being built — we only need to serialize writes via
|
||||||
|
the lock; the connection context manager handles atomicity.
|
||||||
"""
|
"""
|
||||||
with self._conn:
|
with self._write_lock:
|
||||||
cur = self._conn.execute("PRAGMA table_info(favorites)")
|
with self._conn:
|
||||||
cols = {row[1] for row in cur.fetchall()}
|
cur = self._conn.execute("PRAGMA table_info(favorites)")
|
||||||
if "folder" not in cols:
|
cols = {row[1] for row in cur.fetchall()}
|
||||||
self._conn.execute("ALTER TABLE favorites ADD COLUMN folder TEXT")
|
if "folder" not in cols:
|
||||||
self._conn.execute("CREATE INDEX IF NOT EXISTS idx_favorites_folder ON favorites(folder)")
|
self._conn.execute("ALTER TABLE favorites ADD COLUMN folder TEXT")
|
||||||
# Add tag_categories to library_meta if missing
|
self._conn.execute("CREATE INDEX IF NOT EXISTS idx_favorites_folder ON favorites(folder)")
|
||||||
tables = {r[0] for r in self._conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
|
# Add tag_categories to library_meta if missing
|
||||||
if "library_meta" in tables:
|
tables = {r[0] for r in self._conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()}
|
||||||
cur = self._conn.execute("PRAGMA table_info(library_meta)")
|
if "library_meta" in tables:
|
||||||
meta_cols = {row[1] for row in cur.fetchall()}
|
cur = self._conn.execute("PRAGMA table_info(library_meta)")
|
||||||
if "tag_categories" not in meta_cols:
|
meta_cols = {row[1] for row in cur.fetchall()}
|
||||||
self._conn.execute("ALTER TABLE library_meta ADD COLUMN tag_categories TEXT DEFAULT ''")
|
if "tag_categories" not in meta_cols:
|
||||||
# Add tag_categories to favorites if missing
|
self._conn.execute("ALTER TABLE library_meta ADD COLUMN tag_categories TEXT DEFAULT ''")
|
||||||
if "tag_categories" not in cols:
|
# Add tag_categories to favorites if missing
|
||||||
self._conn.execute("ALTER TABLE favorites ADD COLUMN tag_categories TEXT DEFAULT ''")
|
if "tag_categories" not in cols:
|
||||||
|
self._conn.execute("ALTER TABLE favorites ADD COLUMN tag_categories TEXT DEFAULT ''")
|
||||||
|
|
||||||
def close(self) -> None:
|
def close(self) -> None:
|
||||||
if self._conn:
|
if self._conn:
|
||||||
@ -226,12 +256,12 @@ class Database:
|
|||||||
api_user: str | None = None,
|
api_user: str | None = None,
|
||||||
) -> Site:
|
) -> Site:
|
||||||
now = datetime.now(timezone.utc).isoformat()
|
now = datetime.now(timezone.utc).isoformat()
|
||||||
cur = self.conn.execute(
|
with self._write():
|
||||||
"INSERT INTO sites (name, url, api_type, api_key, api_user, added_at) "
|
cur = self.conn.execute(
|
||||||
"VALUES (?, ?, ?, ?, ?, ?)",
|
"INSERT INTO sites (name, url, api_type, api_key, api_user, added_at) "
|
||||||
(name, url.rstrip("/"), api_type, api_key, api_user, now),
|
"VALUES (?, ?, ?, ?, ?, ?)",
|
||||||
)
|
(name, url.rstrip("/"), api_type, api_key, api_user, now),
|
||||||
self.conn.commit()
|
)
|
||||||
return Site(
|
return Site(
|
||||||
id=cur.lastrowid, # type: ignore[arg-type]
|
id=cur.lastrowid, # type: ignore[arg-type]
|
||||||
name=name,
|
name=name,
|
||||||
@ -261,7 +291,7 @@ class Database:
|
|||||||
]
|
]
|
||||||
|
|
||||||
def delete_site(self, site_id: int) -> None:
|
def delete_site(self, site_id: int) -> None:
|
||||||
with self.conn:
|
with self._write():
|
||||||
self.conn.execute("DELETE FROM favorites WHERE site_id = ?", (site_id,))
|
self.conn.execute("DELETE FROM favorites WHERE site_id = ?", (site_id,))
|
||||||
self.conn.execute("DELETE FROM sites WHERE id = ?", (site_id,))
|
self.conn.execute("DELETE FROM sites WHERE id = ?", (site_id,))
|
||||||
|
|
||||||
@ -277,10 +307,10 @@ class Database:
|
|||||||
if not sets:
|
if not sets:
|
||||||
return
|
return
|
||||||
vals.append(site_id)
|
vals.append(site_id)
|
||||||
self.conn.execute(
|
with self._write():
|
||||||
f"UPDATE sites SET {', '.join(sets)} WHERE id = ?", vals
|
self.conn.execute(
|
||||||
)
|
f"UPDATE sites SET {', '.join(sets)} WHERE id = ?", vals
|
||||||
self.conn.commit()
|
)
|
||||||
|
|
||||||
# -- Bookmarks --
|
# -- Bookmarks --
|
||||||
|
|
||||||
@ -300,7 +330,7 @@ class Database:
|
|||||||
) -> Bookmark:
|
) -> Bookmark:
|
||||||
now = datetime.now(timezone.utc).isoformat()
|
now = datetime.now(timezone.utc).isoformat()
|
||||||
cats_json = json.dumps(tag_categories) if tag_categories else ""
|
cats_json = json.dumps(tag_categories) if tag_categories else ""
|
||||||
with self.conn:
|
with self._write():
|
||||||
cur = self.conn.execute(
|
cur = self.conn.execute(
|
||||||
"INSERT OR IGNORE INTO favorites "
|
"INSERT OR IGNORE INTO favorites "
|
||||||
"(site_id, post_id, file_url, preview_url, tags, rating, score, source, cached_path, folder, favorited_at, tag_categories) "
|
"(site_id, post_id, file_url, preview_url, tags, rating, score, source, cached_path, folder, favorited_at, tag_categories) "
|
||||||
@ -342,26 +372,26 @@ class Database:
|
|||||||
|
|
||||||
def add_bookmarks_batch(self, bookmarks: list[dict]) -> None:
|
def add_bookmarks_batch(self, bookmarks: list[dict]) -> None:
|
||||||
"""Add multiple bookmarks in a single transaction."""
|
"""Add multiple bookmarks in a single transaction."""
|
||||||
for fav in bookmarks:
|
with self._write():
|
||||||
self.conn.execute(
|
for fav in bookmarks:
|
||||||
"INSERT OR IGNORE INTO favorites "
|
self.conn.execute(
|
||||||
"(site_id, post_id, file_url, preview_url, tags, rating, score, source, cached_path, folder, favorited_at) "
|
"INSERT OR IGNORE INTO favorites "
|
||||||
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
"(site_id, post_id, file_url, preview_url, tags, rating, score, source, cached_path, folder, favorited_at) "
|
||||||
(fav['site_id'], fav['post_id'], fav['file_url'], fav.get('preview_url'),
|
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||||
fav.get('tags', ''), fav.get('rating'), fav.get('score'), fav.get('source'),
|
(fav['site_id'], fav['post_id'], fav['file_url'], fav.get('preview_url'),
|
||||||
fav.get('cached_path'), fav.get('folder'), fav.get('favorited_at', datetime.now(timezone.utc).isoformat())),
|
fav.get('tags', ''), fav.get('rating'), fav.get('score'), fav.get('source'),
|
||||||
)
|
fav.get('cached_path'), fav.get('folder'), fav.get('favorited_at', datetime.now(timezone.utc).isoformat())),
|
||||||
self.conn.commit()
|
)
|
||||||
|
|
||||||
# Back-compat shim
|
# Back-compat shim
|
||||||
add_favorites_batch = add_bookmarks_batch
|
add_favorites_batch = add_bookmarks_batch
|
||||||
|
|
||||||
def remove_bookmark(self, site_id: int, post_id: int) -> None:
|
def remove_bookmark(self, site_id: int, post_id: int) -> None:
|
||||||
self.conn.execute(
|
with self._write():
|
||||||
"DELETE FROM favorites WHERE site_id = ? AND post_id = ?",
|
self.conn.execute(
|
||||||
(site_id, post_id),
|
"DELETE FROM favorites WHERE site_id = ? AND post_id = ?",
|
||||||
)
|
(site_id, post_id),
|
||||||
self.conn.commit()
|
)
|
||||||
|
|
||||||
# Back-compat shim
|
# Back-compat shim
|
||||||
remove_favorite = remove_bookmark
|
remove_favorite = remove_bookmark
|
||||||
@ -436,11 +466,11 @@ class Database:
|
|||||||
_row_to_favorite = _row_to_bookmark
|
_row_to_favorite = _row_to_bookmark
|
||||||
|
|
||||||
def update_bookmark_cache_path(self, fav_id: int, cached_path: str) -> None:
|
def update_bookmark_cache_path(self, fav_id: int, cached_path: str) -> None:
|
||||||
self.conn.execute(
|
with self._write():
|
||||||
"UPDATE favorites SET cached_path = ? WHERE id = ?",
|
self.conn.execute(
|
||||||
(cached_path, fav_id),
|
"UPDATE favorites SET cached_path = ? WHERE id = ?",
|
||||||
)
|
(cached_path, fav_id),
|
||||||
self.conn.commit()
|
)
|
||||||
|
|
||||||
# Back-compat shim
|
# Back-compat shim
|
||||||
update_favorite_cache_path = update_bookmark_cache_path
|
update_favorite_cache_path = update_bookmark_cache_path
|
||||||
@ -460,13 +490,13 @@ class Database:
|
|||||||
|
|
||||||
def add_folder(self, name: str) -> None:
|
def add_folder(self, name: str) -> None:
|
||||||
clean = _validate_folder_name(name.strip())
|
clean = _validate_folder_name(name.strip())
|
||||||
self.conn.execute(
|
with self._write():
|
||||||
"INSERT OR IGNORE INTO favorite_folders (name) VALUES (?)", (clean,)
|
self.conn.execute(
|
||||||
)
|
"INSERT OR IGNORE INTO favorite_folders (name) VALUES (?)", (clean,)
|
||||||
self.conn.commit()
|
)
|
||||||
|
|
||||||
def remove_folder(self, name: str) -> None:
|
def remove_folder(self, name: str) -> None:
|
||||||
with self.conn:
|
with self._write():
|
||||||
self.conn.execute(
|
self.conn.execute(
|
||||||
"UPDATE favorites SET folder = NULL WHERE folder = ?", (name,)
|
"UPDATE favorites SET folder = NULL WHERE folder = ?", (name,)
|
||||||
)
|
)
|
||||||
@ -474,7 +504,7 @@ class Database:
|
|||||||
|
|
||||||
def rename_folder(self, old: str, new: str) -> None:
|
def rename_folder(self, old: str, new: str) -> None:
|
||||||
new_name = _validate_folder_name(new.strip())
|
new_name = _validate_folder_name(new.strip())
|
||||||
with self.conn:
|
with self._write():
|
||||||
self.conn.execute(
|
self.conn.execute(
|
||||||
"UPDATE favorites SET folder = ? WHERE folder = ?", (new_name, old)
|
"UPDATE favorites SET folder = ? WHERE folder = ?", (new_name, old)
|
||||||
)
|
)
|
||||||
@ -483,10 +513,10 @@ class Database:
|
|||||||
)
|
)
|
||||||
|
|
||||||
def move_bookmark_to_folder(self, fav_id: int, folder: str | None) -> None:
|
def move_bookmark_to_folder(self, fav_id: int, folder: str | None) -> None:
|
||||||
self.conn.execute(
|
with self._write():
|
||||||
"UPDATE favorites SET folder = ? WHERE id = ?", (folder, fav_id)
|
self.conn.execute(
|
||||||
)
|
"UPDATE favorites SET folder = ? WHERE id = ?", (folder, fav_id)
|
||||||
self.conn.commit()
|
)
|
||||||
|
|
||||||
# Back-compat shim
|
# Back-compat shim
|
||||||
move_favorite_to_folder = move_bookmark_to_folder
|
move_favorite_to_folder = move_bookmark_to_folder
|
||||||
@ -494,18 +524,18 @@ class Database:
|
|||||||
# -- Blacklist --
|
# -- Blacklist --
|
||||||
|
|
||||||
def add_blacklisted_tag(self, tag: str) -> None:
|
def add_blacklisted_tag(self, tag: str) -> None:
|
||||||
self.conn.execute(
|
with self._write():
|
||||||
"INSERT OR IGNORE INTO blacklisted_tags (tag) VALUES (?)",
|
self.conn.execute(
|
||||||
(tag.strip().lower(),),
|
"INSERT OR IGNORE INTO blacklisted_tags (tag) VALUES (?)",
|
||||||
)
|
(tag.strip().lower(),),
|
||||||
self.conn.commit()
|
)
|
||||||
|
|
||||||
def remove_blacklisted_tag(self, tag: str) -> None:
|
def remove_blacklisted_tag(self, tag: str) -> None:
|
||||||
self.conn.execute(
|
with self._write():
|
||||||
"DELETE FROM blacklisted_tags WHERE tag = ?",
|
self.conn.execute(
|
||||||
(tag.strip().lower(),),
|
"DELETE FROM blacklisted_tags WHERE tag = ?",
|
||||||
)
|
(tag.strip().lower(),),
|
||||||
self.conn.commit()
|
)
|
||||||
|
|
||||||
def get_blacklisted_tags(self) -> list[str]:
|
def get_blacklisted_tags(self) -> list[str]:
|
||||||
rows = self.conn.execute("SELECT tag FROM blacklisted_tags ORDER BY tag").fetchall()
|
rows = self.conn.execute("SELECT tag FROM blacklisted_tags ORDER BY tag").fetchall()
|
||||||
@ -514,12 +544,12 @@ class Database:
|
|||||||
# -- Blacklisted Posts --
|
# -- Blacklisted Posts --
|
||||||
|
|
||||||
def add_blacklisted_post(self, url: str) -> None:
|
def add_blacklisted_post(self, url: str) -> None:
|
||||||
self.conn.execute("INSERT OR IGNORE INTO blacklisted_posts (url) VALUES (?)", (url,))
|
with self._write():
|
||||||
self.conn.commit()
|
self.conn.execute("INSERT OR IGNORE INTO blacklisted_posts (url) VALUES (?)", (url,))
|
||||||
|
|
||||||
def remove_blacklisted_post(self, url: str) -> None:
|
def remove_blacklisted_post(self, url: str) -> None:
|
||||||
self.conn.execute("DELETE FROM blacklisted_posts WHERE url = ?", (url,))
|
with self._write():
|
||||||
self.conn.commit()
|
self.conn.execute("DELETE FROM blacklisted_posts WHERE url = ?", (url,))
|
||||||
|
|
||||||
def get_blacklisted_posts(self) -> set[str]:
|
def get_blacklisted_posts(self) -> set[str]:
|
||||||
rows = self.conn.execute("SELECT url FROM blacklisted_posts").fetchall()
|
rows = self.conn.execute("SELECT url FROM blacklisted_posts").fetchall()
|
||||||
@ -531,14 +561,14 @@ class Database:
|
|||||||
score: int = 0, rating: str = None, source: str = None,
|
score: int = 0, rating: str = None, source: str = None,
|
||||||
file_url: str = None) -> None:
|
file_url: str = None) -> None:
|
||||||
cats_json = json.dumps(tag_categories) if tag_categories else ""
|
cats_json = json.dumps(tag_categories) if tag_categories else ""
|
||||||
self.conn.execute(
|
with self._write():
|
||||||
"INSERT OR REPLACE INTO library_meta "
|
self.conn.execute(
|
||||||
"(post_id, tags, tag_categories, score, rating, source, file_url, saved_at) "
|
"INSERT OR REPLACE INTO library_meta "
|
||||||
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
"(post_id, tags, tag_categories, score, rating, source, file_url, saved_at) "
|
||||||
(post_id, tags, cats_json, score, rating, source, file_url,
|
"VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
||||||
datetime.now(timezone.utc).isoformat()),
|
(post_id, tags, cats_json, score, rating, source, file_url,
|
||||||
)
|
datetime.now(timezone.utc).isoformat()),
|
||||||
self.conn.commit()
|
)
|
||||||
|
|
||||||
def get_library_meta(self, post_id: int) -> dict | None:
|
def get_library_meta(self, post_id: int) -> dict | None:
|
||||||
row = self.conn.execute("SELECT * FROM library_meta WHERE post_id = ?", (post_id,)).fetchone()
|
row = self.conn.execute("SELECT * FROM library_meta WHERE post_id = ?", (post_id,)).fetchone()
|
||||||
@ -558,8 +588,8 @@ class Database:
|
|||||||
return {r["post_id"] for r in rows}
|
return {r["post_id"] for r in rows}
|
||||||
|
|
||||||
def remove_library_meta(self, post_id: int) -> None:
|
def remove_library_meta(self, post_id: int) -> None:
|
||||||
self.conn.execute("DELETE FROM library_meta WHERE post_id = ?", (post_id,))
|
with self._write():
|
||||||
self.conn.commit()
|
self.conn.execute("DELETE FROM library_meta WHERE post_id = ?", (post_id,))
|
||||||
|
|
||||||
# -- Settings --
|
# -- Settings --
|
||||||
|
|
||||||
@ -576,11 +606,11 @@ class Database:
|
|||||||
return self.get_setting(key) == "1"
|
return self.get_setting(key) == "1"
|
||||||
|
|
||||||
def set_setting(self, key: str, value: str) -> None:
|
def set_setting(self, key: str, value: str) -> None:
|
||||||
self.conn.execute(
|
with self._write():
|
||||||
"INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
|
self.conn.execute(
|
||||||
(key, str(value)),
|
"INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
|
||||||
)
|
(key, str(value)),
|
||||||
self.conn.commit()
|
)
|
||||||
|
|
||||||
def get_all_settings(self) -> dict[str, str]:
|
def get_all_settings(self) -> dict[str, str]:
|
||||||
result = dict(_DEFAULTS)
|
result = dict(_DEFAULTS)
|
||||||
@ -595,7 +625,7 @@ class Database:
|
|||||||
if not query.strip():
|
if not query.strip():
|
||||||
return
|
return
|
||||||
now = datetime.now(timezone.utc).isoformat()
|
now = datetime.now(timezone.utc).isoformat()
|
||||||
with self.conn:
|
with self._write():
|
||||||
# Remove duplicate if exists, keep latest
|
# Remove duplicate if exists, keep latest
|
||||||
self.conn.execute(
|
self.conn.execute(
|
||||||
"DELETE FROM search_history WHERE query = ? AND (site_id = ? OR (site_id IS NULL AND ? IS NULL))",
|
"DELETE FROM search_history WHERE query = ? AND (site_id = ? OR (site_id IS NULL AND ? IS NULL))",
|
||||||
@ -619,21 +649,21 @@ class Database:
|
|||||||
return [r["query"] for r in rows]
|
return [r["query"] for r in rows]
|
||||||
|
|
||||||
def clear_search_history(self) -> None:
|
def clear_search_history(self) -> None:
|
||||||
self.conn.execute("DELETE FROM search_history")
|
with self._write():
|
||||||
self.conn.commit()
|
self.conn.execute("DELETE FROM search_history")
|
||||||
|
|
||||||
def remove_search_history(self, query: str) -> None:
|
def remove_search_history(self, query: str) -> None:
|
||||||
self.conn.execute("DELETE FROM search_history WHERE query = ?", (query,))
|
with self._write():
|
||||||
self.conn.commit()
|
self.conn.execute("DELETE FROM search_history WHERE query = ?", (query,))
|
||||||
|
|
||||||
# -- Saved Searches --
|
# -- Saved Searches --
|
||||||
|
|
||||||
def add_saved_search(self, name: str, query: str, site_id: int | None = None) -> None:
|
def add_saved_search(self, name: str, query: str, site_id: int | None = None) -> None:
|
||||||
self.conn.execute(
|
with self._write():
|
||||||
"INSERT OR REPLACE INTO saved_searches (name, query, site_id) VALUES (?, ?, ?)",
|
self.conn.execute(
|
||||||
(name.strip(), query.strip(), site_id),
|
"INSERT OR REPLACE INTO saved_searches (name, query, site_id) VALUES (?, ?, ?)",
|
||||||
)
|
(name.strip(), query.strip(), site_id),
|
||||||
self.conn.commit()
|
)
|
||||||
|
|
||||||
def get_saved_searches(self) -> list[tuple[int, str, str]]:
|
def get_saved_searches(self) -> list[tuple[int, str, str]]:
|
||||||
"""Returns list of (id, name, query)."""
|
"""Returns list of (id, name, query)."""
|
||||||
@ -643,5 +673,5 @@ class Database:
|
|||||||
return [(r["id"], r["name"], r["query"]) for r in rows]
|
return [(r["id"], r["name"], r["query"]) for r in rows]
|
||||||
|
|
||||||
def remove_saved_search(self, search_id: int) -> None:
|
def remove_saved_search(self, search_id: int) -> None:
|
||||||
self.conn.execute("DELETE FROM saved_searches WHERE id = ?", (search_id,))
|
with self._write():
|
||||||
self.conn.commit()
|
self.conn.execute("DELETE FROM saved_searches WHERE id = ?", (search_id,))
|
||||||
|
|||||||
@ -312,10 +312,20 @@ class BooruApp(QMainWindow):
|
|||||||
self._async_thread = threading.Thread(target=self._async_loop.run_forever, daemon=True)
|
self._async_thread = threading.Thread(target=self._async_loop.run_forever, daemon=True)
|
||||||
self._async_thread.start()
|
self._async_thread.start()
|
||||||
|
|
||||||
|
# Register the persistent loop as the process-wide app loop. Anything
|
||||||
|
# that wants to schedule async work — `gui/sites.py`, `gui/bookmarks.py`,
|
||||||
|
# any future helper — calls `core.concurrency.run_on_app_loop` which
|
||||||
|
# uses this same loop. The whole point of PR2 is to never run async
|
||||||
|
# code on a throwaway loop again.
|
||||||
|
from ..core.concurrency import set_app_loop
|
||||||
|
set_app_loop(self._async_loop)
|
||||||
|
|
||||||
# Reset shared HTTP clients from previous session
|
# Reset shared HTTP clients from previous session
|
||||||
from ..core.cache import _get_shared_client
|
|
||||||
from ..core.api.base import BooruClient
|
from ..core.api.base import BooruClient
|
||||||
|
from ..core.api.e621 import E621Client
|
||||||
BooruClient._shared_client = None
|
BooruClient._shared_client = None
|
||||||
|
E621Client._e621_client = None
|
||||||
|
E621Client._e621_to_close = []
|
||||||
import booru_viewer.core.cache as _cache_mod
|
import booru_viewer.core.cache as _cache_mod
|
||||||
_cache_mod._shared_client = None
|
_cache_mod._shared_client = None
|
||||||
|
|
||||||
@ -2927,6 +2937,26 @@ class BooruApp(QMainWindow):
|
|||||||
self._save_main_splitter_sizes()
|
self._save_main_splitter_sizes()
|
||||||
self._save_right_splitter_sizes()
|
self._save_right_splitter_sizes()
|
||||||
self._save_main_window_state()
|
self._save_main_window_state()
|
||||||
|
|
||||||
|
# Cleanly shut the shared httpx pools down BEFORE stopping the loop
|
||||||
|
# so the connection pool / keepalive sockets / TLS state get released
|
||||||
|
# instead of being abandoned mid-flight. Has to run on the loop the
|
||||||
|
# clients were bound to.
|
||||||
|
try:
|
||||||
|
from ..core.api.base import BooruClient
|
||||||
|
from ..core.api.e621 import E621Client
|
||||||
|
from ..core.cache import aclose_shared_client
|
||||||
|
|
||||||
|
async def _close_all():
|
||||||
|
await BooruClient.aclose_shared()
|
||||||
|
await E621Client.aclose_shared()
|
||||||
|
await aclose_shared_client()
|
||||||
|
|
||||||
|
fut = asyncio.run_coroutine_threadsafe(_close_all(), self._async_loop)
|
||||||
|
fut.result(timeout=5)
|
||||||
|
except Exception as e:
|
||||||
|
log.warning(f"Shared httpx aclose failed: {e}")
|
||||||
|
|
||||||
self._async_loop.call_soon_threadsafe(self._async_loop.stop)
|
self._async_loop.call_soon_threadsafe(self._async_loop.stop)
|
||||||
self._async_thread.join(timeout=2)
|
self._async_thread.join(timeout=2)
|
||||||
if self._db.get_setting_bool("clear_cache_on_exit"):
|
if self._db.get_setting_bool("clear_cache_on_exit"):
|
||||||
|
|||||||
@ -3,8 +3,6 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import threading
|
|
||||||
import asyncio
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from PySide6.QtCore import Qt, Signal, QObject, QTimer
|
from PySide6.QtCore import Qt, Signal, QObject, QTimer
|
||||||
@ -25,6 +23,7 @@ from PySide6.QtWidgets import (
|
|||||||
|
|
||||||
from ..core.db import Database, Bookmark
|
from ..core.db import Database, Bookmark
|
||||||
from ..core.cache import download_thumbnail
|
from ..core.cache import download_thumbnail
|
||||||
|
from ..core.concurrency import run_on_app_loop
|
||||||
from .grid import ThumbnailGrid
|
from .grid import ThumbnailGrid
|
||||||
|
|
||||||
log = logging.getLogger("booru")
|
log = logging.getLogger("booru")
|
||||||
@ -173,13 +172,18 @@ class BookmarksView(QWidget):
|
|||||||
thumb.set_pixmap(pix)
|
thumb.set_pixmap(pix)
|
||||||
|
|
||||||
def _load_thumb_async(self, index: int, url: str) -> None:
|
def _load_thumb_async(self, index: int, url: str) -> None:
|
||||||
|
# Schedule the download on the persistent event loop instead of
|
||||||
|
# spawning a daemon thread that runs its own throwaway loop. This
|
||||||
|
# is the fix for the loop-affinity bug where the cache module's
|
||||||
|
# shared httpx client would get bound to the throwaway loop and
|
||||||
|
# then fail every subsequent use from the persistent loop.
|
||||||
async def _dl():
|
async def _dl():
|
||||||
try:
|
try:
|
||||||
path = await download_thumbnail(url)
|
path = await download_thumbnail(url)
|
||||||
self._signals.thumb_ready.emit(index, str(path))
|
self._signals.thumb_ready.emit(index, str(path))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.warning(f"Bookmark thumb {index} failed: {e}")
|
log.warning(f"Bookmark thumb {index} failed: {e}")
|
||||||
threading.Thread(target=lambda: asyncio.run(_dl()), daemon=True).start()
|
run_on_app_loop(_dl())
|
||||||
|
|
||||||
def _on_thumb_ready(self, index: int, path: str) -> None:
|
def _on_thumb_ready(self, index: int, path: str) -> None:
|
||||||
thumbs = self._grid._thumbs
|
thumbs = self._grid._thumbs
|
||||||
|
|||||||
@ -2,10 +2,7 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import asyncio
|
from PySide6.QtCore import Qt, Signal
|
||||||
import threading
|
|
||||||
|
|
||||||
from PySide6.QtCore import Qt, Signal, QMetaObject, Q_ARG, Qt as QtNS
|
|
||||||
from PySide6.QtWidgets import (
|
from PySide6.QtWidgets import (
|
||||||
QDialog,
|
QDialog,
|
||||||
QVBoxLayout,
|
QVBoxLayout,
|
||||||
@ -22,16 +19,34 @@ from PySide6.QtWidgets import (
|
|||||||
|
|
||||||
from ..core.db import Database, Site
|
from ..core.db import Database, Site
|
||||||
from ..core.api.detect import detect_site_type
|
from ..core.api.detect import detect_site_type
|
||||||
|
from ..core.concurrency import run_on_app_loop
|
||||||
|
|
||||||
|
|
||||||
class SiteDialog(QDialog):
|
class SiteDialog(QDialog):
|
||||||
"""Dialog to add or edit a booru site."""
|
"""Dialog to add or edit a booru site."""
|
||||||
|
|
||||||
|
# Internal signals used to marshal worker results back to the GUI thread.
|
||||||
|
# Connected with QueuedConnection so emit() from the asyncio loop thread
|
||||||
|
# is always delivered on the Qt main thread.
|
||||||
|
_detect_done_sig = Signal(object, object) # (result_or_None, error_or_None)
|
||||||
|
_test_done_sig = Signal(bool, str)
|
||||||
|
|
||||||
def __init__(self, parent: QWidget | None = None, site: Site | None = None) -> None:
|
def __init__(self, parent: QWidget | None = None, site: Site | None = None) -> None:
|
||||||
super().__init__(parent)
|
super().__init__(parent)
|
||||||
self._editing = site is not None
|
self._editing = site is not None
|
||||||
self.setWindowTitle("Edit Site" if self._editing else "Add Site")
|
self.setWindowTitle("Edit Site" if self._editing else "Add Site")
|
||||||
self.setMinimumWidth(400)
|
self.setMinimumWidth(400)
|
||||||
|
# Set when the dialog is closed/destroyed so in-flight worker
|
||||||
|
# callbacks can short-circuit instead of poking a dead QObject.
|
||||||
|
self._closed = False
|
||||||
|
# Tracked so we can cancel pending coroutines on close.
|
||||||
|
self._inflight = [] # list[concurrent.futures.Future]
|
||||||
|
self._detect_done_sig.connect(
|
||||||
|
self._detect_finished, Qt.ConnectionType.QueuedConnection
|
||||||
|
)
|
||||||
|
self._test_done_sig.connect(
|
||||||
|
self._test_finished, Qt.ConnectionType.QueuedConnection
|
||||||
|
)
|
||||||
|
|
||||||
layout = QVBoxLayout(self)
|
layout = QVBoxLayout(self)
|
||||||
|
|
||||||
@ -102,16 +117,22 @@ class SiteDialog(QDialog):
|
|||||||
api_key = self._key_input.text().strip() or None
|
api_key = self._key_input.text().strip() or None
|
||||||
api_user = self._user_input.text().strip() or None
|
api_user = self._user_input.text().strip() or None
|
||||||
|
|
||||||
def _run():
|
async def _do_detect():
|
||||||
try:
|
try:
|
||||||
result = asyncio.run(detect_site_type(url, api_key=api_key, api_user=api_user))
|
result = await detect_site_type(url, api_key=api_key, api_user=api_user)
|
||||||
self._detect_finished(result, None)
|
if not self._closed:
|
||||||
|
self._detect_done_sig.emit(result, None)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self._detect_finished(None, e)
|
if not self._closed:
|
||||||
|
self._detect_done_sig.emit(None, e)
|
||||||
|
|
||||||
threading.Thread(target=_run, daemon=True).start()
|
fut = run_on_app_loop(_do_detect())
|
||||||
|
self._inflight.append(fut)
|
||||||
|
fut.add_done_callback(lambda f: self._inflight.remove(f) if f in self._inflight else None)
|
||||||
|
|
||||||
def _detect_finished(self, result: str | None, error: Exception | None) -> None:
|
def _detect_finished(self, result, error) -> None:
|
||||||
|
if self._closed:
|
||||||
|
return
|
||||||
self._detect_btn.setEnabled(True)
|
self._detect_btn.setEnabled(True)
|
||||||
if error:
|
if error:
|
||||||
self._status_label.setText(f"Error: {error}")
|
self._status_label.setText(f"Error: {error}")
|
||||||
@ -132,25 +153,42 @@ class SiteDialog(QDialog):
|
|||||||
self._status_label.setText("Testing connection...")
|
self._status_label.setText("Testing connection...")
|
||||||
self._test_btn.setEnabled(False)
|
self._test_btn.setEnabled(False)
|
||||||
|
|
||||||
def _run():
|
from ..core.api.detect import client_for_type
|
||||||
import asyncio
|
|
||||||
from ..core.api.detect import client_for_type
|
async def _do_test():
|
||||||
try:
|
try:
|
||||||
client = client_for_type(api_type, url, api_key=api_key, api_user=api_user)
|
client = client_for_type(api_type, url, api_key=api_key, api_user=api_user)
|
||||||
ok, detail = asyncio.run(client.test_connection())
|
ok, detail = await client.test_connection()
|
||||||
self._test_finished(ok, detail)
|
if not self._closed:
|
||||||
|
self._test_done_sig.emit(ok, detail)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self._test_finished(False, str(e))
|
if not self._closed:
|
||||||
|
self._test_done_sig.emit(False, str(e))
|
||||||
|
|
||||||
threading.Thread(target=_run, daemon=True).start()
|
fut = run_on_app_loop(_do_test())
|
||||||
|
self._inflight.append(fut)
|
||||||
|
fut.add_done_callback(lambda f: self._inflight.remove(f) if f in self._inflight else None)
|
||||||
|
|
||||||
def _test_finished(self, ok: bool, detail: str) -> None:
|
def _test_finished(self, ok: bool, detail: str) -> None:
|
||||||
|
if self._closed:
|
||||||
|
return
|
||||||
self._test_btn.setEnabled(True)
|
self._test_btn.setEnabled(True)
|
||||||
if ok:
|
if ok:
|
||||||
self._status_label.setText(f"Connected! {detail}")
|
self._status_label.setText(f"Connected! {detail}")
|
||||||
else:
|
else:
|
||||||
self._status_label.setText(f"Failed: {detail}")
|
self._status_label.setText(f"Failed: {detail}")
|
||||||
|
|
||||||
|
def closeEvent(self, event) -> None:
|
||||||
|
# Mark closed first so in-flight callbacks short-circuit, then
|
||||||
|
# cancel anything still pending so we don't tie up the loop.
|
||||||
|
self._closed = True
|
||||||
|
for fut in list(self._inflight):
|
||||||
|
try:
|
||||||
|
fut.cancel()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
super().closeEvent(event)
|
||||||
|
|
||||||
def _try_parse_url(self, text: str) -> None:
|
def _try_parse_url(self, text: str) -> None:
|
||||||
"""Strip query params from pasted URLs like https://gelbooru.com/index.php?page=post&s=list&tags=all."""
|
"""Strip query params from pasted URLs like https://gelbooru.com/index.php?page=post&s=list&tags=all."""
|
||||||
from urllib.parse import urlparse, parse_qs
|
from urllib.parse import urlparse, parse_qs
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user