diff --git a/.gitignore b/.gitignore index 3683245..348a9cd 100644 --- a/.gitignore +++ b/.gitignore @@ -9,5 +9,6 @@ build/ venv/ docs/ project.md +tests/ *.bak/ *.dll diff --git a/README.md b/README.md index 9ea4862..7ff8d5e 100644 --- a/README.md +++ b/README.md @@ -147,12 +147,6 @@ booru-viewer Or without installing: `python3 -m booru_viewer.main_gui` -**Run tests:** -```sh -pip install -e ".[test]" -pytest tests/ -``` - **Desktop entry:** To add booru-viewer to your app launcher, create `~/.local/share/applications/booru-viewer.desktop`: ```ini [Desktop Entry] diff --git a/pyproject.toml b/pyproject.toml index f0320f8..f2e6eb5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,16 +14,6 @@ dependencies = [ "python-mpv>=1.0", ] -[project.optional-dependencies] -test = [ - "pytest>=8.0", - "pytest-asyncio>=0.23", -] - -[tool.pytest.ini_options] -asyncio_mode = "auto" -testpaths = ["tests"] - [project.scripts] booru-viewer = "booru_viewer.main_gui:main" diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/conftest.py b/tests/conftest.py deleted file mode 100644 index 4c10b49..0000000 --- a/tests/conftest.py +++ /dev/null @@ -1,71 +0,0 @@ -"""Shared fixtures for the booru-viewer test suite. - -All fixtures here are pure-Python — no Qt, no mpv, no network. Filesystem -writes go through `tmp_path` (or fixtures that wrap it). Module-level globals -that the production code mutates (the concurrency loop, the httpx singletons) -get reset around each test that touches them. -""" - -from __future__ import annotations - -import pytest - - -@pytest.fixture -def tmp_db(tmp_path): - """Fresh `Database` instance writing to a temp file. Auto-closes.""" - from booru_viewer.core.db import Database - db = Database(tmp_path / "test.db") - yield db - db.close() - - -@pytest.fixture -def tmp_library(tmp_path): - """Point `saved_dir()` at `tmp_path/saved` for the duration of the test. - - Uses `core.config.set_library_dir` (the official override hook) so the - redirect goes through the same code path the GUI uses for the - user-configurable library location. Tear-down restores the previous - value so tests can run in any order without bleed. - """ - from booru_viewer.core import config - saved = tmp_path / "saved" - saved.mkdir() - original = config._library_dir_override - config.set_library_dir(saved) - yield saved - config.set_library_dir(original) - - -@pytest.fixture -def reset_app_loop(): - """Reset `concurrency._app_loop` between tests. - - The module global is set once at app startup in production; tests need - to start from a clean slate to assert the unset-state behavior. - """ - from booru_viewer.core import concurrency - original = concurrency._app_loop - concurrency._app_loop = None - yield - concurrency._app_loop = original - - -@pytest.fixture -def reset_shared_clients(): - """Reset both shared httpx singletons (cache module + BooruClient class). - - Both are class/module-level globals; tests that exercise the lazy-init - + lock pattern need them cleared so the test sees a fresh first-call - race instead of a leftover instance from a previous test. - """ - from booru_viewer.core.api.base import BooruClient - from booru_viewer.core import cache - original_booru = BooruClient._shared_client - original_cache = cache._shared_client - BooruClient._shared_client = None - cache._shared_client = None - yield - BooruClient._shared_client = original_booru - cache._shared_client = original_cache diff --git a/tests/core/__init__.py b/tests/core/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/core/api/__init__.py b/tests/core/api/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/core/api/test_base.py b/tests/core/api/test_base.py deleted file mode 100644 index 330b9c0..0000000 --- a/tests/core/api/test_base.py +++ /dev/null @@ -1,77 +0,0 @@ -"""Tests for `booru_viewer.core.api.base` — the lazy `_shared_client` -singleton on `BooruClient`. - -Locks in the lock-and-recheck pattern at `base.py:90-108`. Without it, -two threads racing on first `.client` access would both see -`_shared_client is None`, both build an `httpx.AsyncClient`, and one of -them would leak (overwritten without aclose). -""" - -from __future__ import annotations - -import threading -from unittest.mock import patch, MagicMock - -import pytest - -from booru_viewer.core.api.base import BooruClient - - -class _StubClient(BooruClient): - """Concrete subclass so we can instantiate `BooruClient` for the test - — the base class has abstract `search` / `get_post` methods.""" - api_type = "stub" - - async def search(self, tags="", page=1, limit=40): - return [] - - async def get_post(self, post_id): - return None - - -def test_shared_client_singleton_under_concurrency(reset_shared_clients): - """N threads racing on first `.client` access must result in exactly - one `httpx.AsyncClient` constructor call. The threading.Lock guards - the check-and-set so the second-and-later callers re-read the now-set - `_shared_client` after acquiring the lock instead of building their - own.""" - constructor_calls = 0 - constructor_lock = threading.Lock() - - def _fake_async_client(*args, **kwargs): - nonlocal constructor_calls - with constructor_lock: - constructor_calls += 1 - m = MagicMock() - m.is_closed = False - return m - - # Barrier so all threads hit the property at the same moment - n_threads = 10 - barrier = threading.Barrier(n_threads) - results = [] - results_lock = threading.Lock() - - client_instance = _StubClient("http://example.test") - - def _worker(): - barrier.wait() - c = client_instance.client - with results_lock: - results.append(c) - - with patch("booru_viewer.core.api.base.httpx.AsyncClient", - side_effect=_fake_async_client): - threads = [threading.Thread(target=_worker) for _ in range(n_threads)] - for t in threads: - t.start() - for t in threads: - t.join(timeout=5) - - assert constructor_calls == 1, ( - f"Expected exactly one httpx.AsyncClient construction, " - f"got {constructor_calls}" - ) - # All threads got back the same shared instance - assert len(results) == n_threads - assert all(r is results[0] for r in results) diff --git a/tests/core/test_cache.py b/tests/core/test_cache.py deleted file mode 100644 index 87393c9..0000000 --- a/tests/core/test_cache.py +++ /dev/null @@ -1,224 +0,0 @@ -"""Tests for `booru_viewer.core.cache` — Referer hostname matching, ugoira -zip-bomb defenses, download size caps, and validity-check fallback. - -Locks in: -- `_referer_for` proper hostname suffix matching (`54ccc40` security fix) - guarding against `imgblahgelbooru.attacker.com` mapping to gelbooru.com -- `_convert_ugoira_to_gif` cap enforcement (frame count + uncompressed size) - before any decompression — defense against ugoira zip bombs -- `_do_download` MAX_DOWNLOAD_BYTES enforcement, both the Content-Length - pre-check and the running-total chunk-loop guard -- `_is_valid_media` returning True on OSError so a transient EBUSY/lock - doesn't kick off a delete + re-download loop -""" - -from __future__ import annotations - -import asyncio -import io -import zipfile -from pathlib import Path -from unittest.mock import patch -from urllib.parse import urlparse - -import pytest - -from booru_viewer.core import cache -from booru_viewer.core.cache import ( - MAX_DOWNLOAD_BYTES, - _convert_ugoira_to_gif, - _do_download, - _is_valid_media, - _referer_for, -) - - -# -- _referer_for hostname suffix matching -- - -def test_referer_for_exact_and_suffix_match(): - """Real booru hostnames map to the canonical Referer for their CDN. - - Exact match and subdomain-suffix match both rewrite the Referer host - to the canonical apex (gelbooru → `gelbooru.com`, donmai → - `danbooru.donmai.us`). The actual request netloc is dropped — the - point is to look like a navigation from the canonical site. - """ - # gelbooru exact host - assert _referer_for(urlparse("https://gelbooru.com/index.php")) \ - == "https://gelbooru.com/" - # gelbooru subdomain rewrites to the canonical apex - assert _referer_for(urlparse("https://img3.gelbooru.com/images/abc.jpg")) \ - == "https://gelbooru.com/" - - # donmai exact host - assert _referer_for(urlparse("https://donmai.us/posts/123")) \ - == "https://danbooru.donmai.us/" - # donmai subdomain rewrites to the canonical danbooru host - assert _referer_for(urlparse("https://safebooru.donmai.us/posts/123")) \ - == "https://danbooru.donmai.us/" - - -def test_referer_for_rejects_substring_attacker(): - """An attacker host that contains `gelbooru.com` or `donmai.us` as a - SUBSTRING (not a hostname suffix) must NOT pick up the booru Referer. - - Without proper suffix matching, `imgblahgelbooru.attacker.com` would - leak the gelbooru Referer to the attacker — that's the `54ccc40` - security fix. - """ - # Attacker host that ends with attacker-controlled TLD - parsed = urlparse("https://imgblahgelbooru.attacker.com/x.jpg") - referer = _referer_for(parsed) - assert "gelbooru.com" not in referer - assert "imgblahgelbooru.attacker.com" in referer - - parsed = urlparse("https://donmai.us.attacker.com/x.jpg") - referer = _referer_for(parsed) - assert "danbooru.donmai.us" not in referer - assert "donmai.us.attacker.com" in referer - - # Completely unrelated host preserved as-is - parsed = urlparse("https://example.test/x.jpg") - assert _referer_for(parsed) == "https://example.test/" - - -# -- Ugoira zip-bomb defenses -- - -def _build_ugoira_zip(path: Path, n_frames: int, frame_bytes: bytes = b"x") -> Path: - """Build a synthetic ugoira-shaped zip with `n_frames` numbered .jpg - entries. Content is whatever the caller passes; defaults to 1 byte. - - The cap-enforcement tests don't need decodable JPEGs — the cap fires - before any decode happens. The filenames just need .jpg suffixes so - `_convert_ugoira_to_gif` recognizes them as frames. - """ - with zipfile.ZipFile(path, "w") as zf: - for i in range(n_frames): - zf.writestr(f"{i:04d}.jpg", frame_bytes) - return path - - -def test_ugoira_frame_count_cap_rejects_bomb(tmp_path, monkeypatch): - """A zip with more than `UGOIRA_MAX_FRAMES` frames must be refused - BEFORE any decompression. We monkeypatch the cap down so the test - builds a tiny zip instead of a 5001-entry one — the cap check is - cap > N, not cap == 5000.""" - monkeypatch.setattr(cache, "UGOIRA_MAX_FRAMES", 2) - zip_path = _build_ugoira_zip(tmp_path / "bomb.zip", n_frames=3) - gif_path = zip_path.with_suffix(".gif") - - result = _convert_ugoira_to_gif(zip_path) - - # Function returned the original zip (refusal path) - assert result == zip_path - # No .gif was written - assert not gif_path.exists() - - -def test_ugoira_uncompressed_size_cap_rejects_bomb(tmp_path, monkeypatch): - """A zip whose `ZipInfo.file_size` headers sum past - `UGOIRA_MAX_UNCOMPRESSED_BYTES` must be refused before decompression. - Same monkeypatch trick to keep the test data small.""" - monkeypatch.setattr(cache, "UGOIRA_MAX_UNCOMPRESSED_BYTES", 50) - # Three 100-byte frames → 300 total > 50 cap - zip_path = _build_ugoira_zip( - tmp_path / "bomb.zip", n_frames=3, frame_bytes=b"x" * 100 - ) - gif_path = zip_path.with_suffix(".gif") - - result = _convert_ugoira_to_gif(zip_path) - - assert result == zip_path - assert not gif_path.exists() - - -# -- _do_download MAX_DOWNLOAD_BYTES caps -- - - -class _FakeHeaders: - def __init__(self, mapping): - self._m = mapping - def get(self, key, default=None): - return self._m.get(key.lower(), default) - - -class _FakeResponse: - def __init__(self, headers, chunks): - self.headers = _FakeHeaders({k.lower(): v for k, v in headers.items()}) - self._chunks = chunks - def raise_for_status(self): - pass - async def aiter_bytes(self, _size): - for chunk in self._chunks: - yield chunk - - -class _FakeStreamCtx: - def __init__(self, response): - self._resp = response - async def __aenter__(self): - return self._resp - async def __aexit__(self, *_args): - return False - - -class _FakeClient: - def __init__(self, response): - self._resp = response - def stream(self, _method, _url, headers=None): - return _FakeStreamCtx(self._resp) - - -def test_download_cap_content_length_pre_check(tmp_path): - """When the server advertises a Content-Length larger than - MAX_DOWNLOAD_BYTES, `_do_download` must raise BEFORE iterating any - bytes. This is the cheap pre-check that protects against the trivial - OOM/disk-fill attack — we don't even start streaming.""" - too_big = MAX_DOWNLOAD_BYTES + 1 - response = _FakeResponse( - headers={"content-type": "image/jpeg", "content-length": str(too_big)}, - chunks=[b"never read"], - ) - client = _FakeClient(response) - local = tmp_path / "out.jpg" - - with pytest.raises(ValueError, match="Download too large"): - asyncio.run(_do_download(client, "http://example.test/x.jpg", {}, local, None)) - - # No file should have been written - assert not local.exists() - - -def test_download_cap_running_total_aborts(tmp_path, monkeypatch): - """Servers can lie about Content-Length. The chunk loop must enforce - the running-total cap independently and abort mid-stream as soon as - cumulative bytes exceed `MAX_DOWNLOAD_BYTES`. We monkeypatch the cap - down to 1024 to keep the test fast.""" - monkeypatch.setattr(cache, "MAX_DOWNLOAD_BYTES", 1024) - # Advertise 0 (unknown) so the small-payload branch runs and the - # running-total guard inside the chunk loop is what fires. - response = _FakeResponse( - headers={"content-type": "image/jpeg", "content-length": "0"}, - chunks=[b"x" * 600, b"x" * 600], # 1200 total > 1024 cap - ) - client = _FakeClient(response) - local = tmp_path / "out.jpg" - - with pytest.raises(ValueError, match="exceeded cap mid-stream"): - asyncio.run(_do_download(client, "http://example.test/x.jpg", {}, local, None)) - - # The buffered-write path only writes after the loop finishes, so the - # mid-stream abort means no file lands on disk. - assert not local.exists() - - -# -- _is_valid_media OSError fallback -- - -def test_is_valid_media_returns_true_on_oserror(tmp_path): - """If the file can't be opened (transient EBUSY, lock, permissions), - `_is_valid_media` must return True so the caller doesn't delete the - cached file. The previous behavior of returning False kicked off a - delete + re-download loop on every access while the underlying - OS issue persisted.""" - nonexistent = tmp_path / "definitely-not-here.jpg" - assert _is_valid_media(nonexistent) is True diff --git a/tests/core/test_concurrency.py b/tests/core/test_concurrency.py deleted file mode 100644 index bb3bff0..0000000 --- a/tests/core/test_concurrency.py +++ /dev/null @@ -1,62 +0,0 @@ -"""Tests for `booru_viewer.core.concurrency` — the persistent-loop handle. - -Locks in: -- `get_app_loop` raises a clear RuntimeError if `set_app_loop` was never - called (the production code uses this to bail loudly when async work - is scheduled before the loop thread starts) -- `run_on_app_loop` round-trips a coroutine result from a worker-thread - loop back to the calling thread via `concurrent.futures.Future` -""" - -from __future__ import annotations - -import asyncio -import threading - -import pytest - -from booru_viewer.core import concurrency -from booru_viewer.core.concurrency import ( - get_app_loop, - run_on_app_loop, - set_app_loop, -) - - -def test_get_app_loop_raises_before_set(reset_app_loop): - """Calling `get_app_loop` before `set_app_loop` is a configuration - error — the production code expects a clear RuntimeError so callers - bail loudly instead of silently scheduling work onto a None loop.""" - with pytest.raises(RuntimeError, match="not initialized"): - get_app_loop() - - -def test_run_on_app_loop_round_trips_result(reset_app_loop): - """Spin up a real asyncio loop in a worker thread, register it via - `set_app_loop`, then from the test (main) thread schedule a coroutine - via `run_on_app_loop` and assert the result comes back through the - `concurrent.futures.Future` interface.""" - loop = asyncio.new_event_loop() - ready = threading.Event() - - def _run_loop(): - asyncio.set_event_loop(loop) - ready.set() - loop.run_forever() - - t = threading.Thread(target=_run_loop, daemon=True) - t.start() - ready.wait(timeout=2) - - try: - set_app_loop(loop) - - async def _produce(): - return 42 - - fut = run_on_app_loop(_produce()) - assert fut.result(timeout=2) == 42 - finally: - loop.call_soon_threadsafe(loop.stop) - t.join(timeout=2) - loop.close() diff --git a/tests/core/test_config.py b/tests/core/test_config.py deleted file mode 100644 index b5ef43c..0000000 --- a/tests/core/test_config.py +++ /dev/null @@ -1,57 +0,0 @@ -"""Tests for `booru_viewer.core.config` — path traversal guard on -`saved_folder_dir` and the shallow walk in `find_library_files`. - -Locks in: -- `saved_folder_dir` resolve-and-relative_to check (`54ccc40` defense in - depth alongside `_validate_folder_name`) -- `find_library_files` matching exactly the root + 1-level subdirectory - layout that the library uses, with the right MEDIA_EXTENSIONS filter -""" - -from __future__ import annotations - -import pytest - -from booru_viewer.core import config -from booru_viewer.core.config import find_library_files, saved_folder_dir - - -# -- saved_folder_dir traversal guard -- - -def test_saved_folder_dir_rejects_dotdot(tmp_library): - """`..` and any path that resolves outside `saved_dir()` must raise - ValueError, not silently mkdir somewhere unexpected. We test literal - `..` shapes only — symlink escapes are filesystem-dependent and - flaky in tests.""" - with pytest.raises(ValueError, match="escapes saved directory"): - saved_folder_dir("..") - with pytest.raises(ValueError, match="escapes saved directory"): - saved_folder_dir("../escape") - with pytest.raises(ValueError, match="escapes saved directory"): - saved_folder_dir("foo/../..") - - -# -- find_library_files shallow walk -- - -def test_find_library_files_walks_root_and_one_level(tmp_library): - """Library has a flat shape: `saved/.` at the root, or - `saved//.` one level deep. The walk must: - - find matches at both depths - - filter by MEDIA_EXTENSIONS (skip .txt and other non-media) - - filter by exact stem (skip unrelated post ids) - """ - # Root-level match - (tmp_library / "123.jpg").write_bytes(b"") - # One-level subfolder match - (tmp_library / "folder1").mkdir() - (tmp_library / "folder1" / "123.png").write_bytes(b"") - # Different post id — must be excluded - (tmp_library / "folder2").mkdir() - (tmp_library / "folder2" / "456.gif").write_bytes(b"") - # Wrong extension — must be excluded even with the right stem - (tmp_library / "123.txt").write_bytes(b"") - - matches = find_library_files(123) - match_names = {p.name for p in matches} - - assert match_names == {"123.jpg", "123.png"} diff --git a/tests/core/test_db.py b/tests/core/test_db.py deleted file mode 100644 index 3543356..0000000 --- a/tests/core/test_db.py +++ /dev/null @@ -1,98 +0,0 @@ -"""Tests for `booru_viewer.core.db` — folder name validation, INSERT OR -IGNORE collision handling, and LIKE escaping. - -These tests lock in the `54ccc40` security/correctness fixes: -- `_validate_folder_name` rejects path-traversal shapes before they hit the - filesystem in `saved_folder_dir` -- `add_bookmark` re-SELECTs the actual row id after an INSERT OR IGNORE - collision so the returned `Bookmark.id` is never the bogus 0 that broke - `update_bookmark_cache_path` -- `get_bookmarks` escapes the SQL LIKE wildcards `_` and `%` so a search for - `cat_ear` doesn't bleed into `catear` / `catXear` -""" - -from __future__ import annotations - -import pytest - -from booru_viewer.core.db import _validate_folder_name - - -# -- _validate_folder_name -- - -def test_validate_folder_name_rejects_traversal(): - """Every shape that could escape the saved-images dir or hit a hidden - file must raise ValueError. One assertion per rejection rule so a - failure points at the exact case.""" - with pytest.raises(ValueError): - _validate_folder_name("") # empty - with pytest.raises(ValueError): - _validate_folder_name("..") # dotdot literal - with pytest.raises(ValueError): - _validate_folder_name(".") # dot literal - with pytest.raises(ValueError): - _validate_folder_name("/foo") # forward slash - with pytest.raises(ValueError): - _validate_folder_name("foo/bar") # embedded forward slash - with pytest.raises(ValueError): - _validate_folder_name("\\foo") # backslash - with pytest.raises(ValueError): - _validate_folder_name(".hidden") # leading dot - with pytest.raises(ValueError): - _validate_folder_name("~user") # leading tilde - - -def test_validate_folder_name_accepts_unicode_and_punctuation(): - """Common real-world folder names must pass through unchanged. The - guard is meant to block escape shapes, not normal naming.""" - assert _validate_folder_name("miku(lewd)") == "miku(lewd)" - assert _validate_folder_name("cat ear") == "cat ear" - assert _validate_folder_name("日本語") == "日本語" - assert _validate_folder_name("foo-bar") == "foo-bar" - assert _validate_folder_name("foo.bar") == "foo.bar" # dot OK if not leading - - -# -- add_bookmark INSERT OR IGNORE collision -- - -def test_add_bookmark_collision_returns_existing_id(tmp_db): - """Calling `add_bookmark` twice with the same (site_id, post_id) must - return the same row id on the second call, not the stale `lastrowid` - of 0 that INSERT OR IGNORE leaves behind. Without the re-SELECT fix, - any downstream `update_bookmark_cache_path(id=0, ...)` silently - no-ops, breaking the cache-path linkage.""" - site = tmp_db.add_site("test", "http://example.test", "danbooru") - bm1 = tmp_db.add_bookmark( - site_id=site.id, post_id=42, file_url="http://example.test/42.jpg", - preview_url=None, tags="cat", - ) - bm2 = tmp_db.add_bookmark( - site_id=site.id, post_id=42, file_url="http://example.test/42.jpg", - preview_url=None, tags="cat", - ) - assert bm1.id != 0 - assert bm2.id == bm1.id - - -# -- get_bookmarks LIKE escaping -- - -def test_get_bookmarks_like_escaping(tmp_db): - """A search for the literal tag `cat_ear` must NOT match `catear` or - `catXear`. SQLite's LIKE treats `_` as a single-char wildcard unless - explicitly escaped — without `ESCAPE '\\\\'` the search would return - all three rows.""" - site = tmp_db.add_site("test", "http://example.test", "danbooru") - tmp_db.add_bookmark( - site_id=site.id, post_id=1, file_url="http://example.test/1.jpg", - preview_url=None, tags="cat_ear", - ) - tmp_db.add_bookmark( - site_id=site.id, post_id=2, file_url="http://example.test/2.jpg", - preview_url=None, tags="catear", - ) - tmp_db.add_bookmark( - site_id=site.id, post_id=3, file_url="http://example.test/3.jpg", - preview_url=None, tags="catXear", - ) - results = tmp_db.get_bookmarks(search="cat_ear") - tags_returned = {b.tags for b in results} - assert tags_returned == {"cat_ear"} diff --git a/tests/gui/__init__.py b/tests/gui/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/gui/popout/__init__.py b/tests/gui/popout/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/gui/popout/test_state.py b/tests/gui/popout/test_state.py deleted file mode 100644 index 8c891ba..0000000 --- a/tests/gui/popout/test_state.py +++ /dev/null @@ -1,661 +0,0 @@ -"""Pure-Python state machine tests for the popout viewer. - -Imports `booru_viewer.gui.popout.state` directly without standing up a -QApplication. The state machine module is required to be import-pure -(no PySide6, mpv, httpx, subprocess, or any module that imports them); -this test file is the forcing function. If state.py grows a Qt or mpv -import, these tests fail to collect and the test suite breaks. - -Test categories (from docs/POPOUT_REFACTOR_PLAN.md "Test plan"): - 1. Per-state transition tests - 2. Race-fix invariant tests (six structural fixes) - 3. Illegal transition tests - 4. Read-path query tests - -**Commit 3 expectation:** most tests fail because state.py's dispatch -handlers are stubs returning []. Tests progressively pass as commits -4-11 land transitions. The trivially-passing tests at commit 3 (initial -state, slider display read-path, terminal Closing guard) document the -parts of the skeleton that are already real. - -Refactor plan: docs/POPOUT_REFACTOR_PLAN.md -Architecture: docs/POPOUT_ARCHITECTURE.md -""" - -from __future__ import annotations - -import pytest - -from booru_viewer.gui.popout.state import ( - # Enums - InvalidTransition, - LoopMode, - MediaKind, - State, - StateMachine, - # Events - CloseRequested, - ContentArrived, - FullscreenToggled, - HyprlandDriftDetected, - LoopModeSet, - MuteToggleRequested, - NavigateRequested, - Open, - SeekCompleted, - SeekRequested, - TogglePlayRequested, - VideoEofReached, - VideoSizeKnown, - VideoStarted, - VolumeSet, - WindowMoved, - WindowResized, - # Effects - ApplyLoopMode, - ApplyMute, - ApplyVolume, - EmitClosed, - EmitNavigate, - EmitPlayNextRequested, - EnterFullscreen, - ExitFullscreen, - FitWindowToContent, - LoadImage, - LoadVideo, - SeekVideoTo, - StopMedia, -) -from booru_viewer.gui.popout.viewport import Viewport - - -# ---------------------------------------------------------------------- -# Helpers — direct field mutation for setup. Tests construct a fresh -# StateMachine and write the state field directly to skip the dispatch -# chain. This is a deliberate test-fixture-vs-production-code split: -# the tests don't depend on the dispatch chain being correct in order -# to test individual transitions. -# ---------------------------------------------------------------------- - - -def _new_in(state: State) -> StateMachine: - m = StateMachine() - m.state = state - return m - - -# ---------------------------------------------------------------------- -# Read-path queries (commit 2 — already passing) -# ---------------------------------------------------------------------- - - -def test_initial_state(): - m = StateMachine() - assert m.state == State.AWAITING_CONTENT - assert m.is_first_content_load is True - assert m.fullscreen is False - assert m.mute is False - assert m.volume == 50 - assert m.loop_mode == LoopMode.LOOP - assert m.viewport is None - assert m.seek_target_ms == 0 - - -def test_compute_slider_display_ms_passthrough_when_not_seeking(): - m = StateMachine() - m.state = State.PLAYING_VIDEO - assert m.compute_slider_display_ms(7500) == 7500 - - -def test_compute_slider_display_ms_pinned_when_seeking(): - m = StateMachine() - m.state = State.SEEKING_VIDEO - m.seek_target_ms = 7000 - # mpv's reported position can be anywhere; the slider must show - # the user's target while we're in SeekingVideo. - assert m.compute_slider_display_ms(5000) == 7000 - assert m.compute_slider_display_ms(7000) == 7000 - assert m.compute_slider_display_ms(9999) == 7000 - - -def test_dispatch_in_closing_returns_empty(): - """Closing is terminal — every event from Closing returns [] and - the state stays Closing.""" - m = _new_in(State.CLOSING) - for event in [ - NavigateRequested(direction=1), - ContentArrived("/x.jpg", "info", MediaKind.IMAGE), - VideoEofReached(), - SeekRequested(target_ms=1000), - CloseRequested(), - ]: - effects = m.dispatch(event) - assert effects == [] - assert m.state == State.CLOSING - - -# ---------------------------------------------------------------------- -# Per-state transition tests -# ---------------------------------------------------------------------- -# -# These all rely on the per-event handlers in state.py returning real -# effect lists. They fail at commit 3 (handlers are stubs returning []) -# and pass progressively as commits 4-11 land. - - -# -- AwaitingContent transitions -- - - -def test_awaiting_open_stashes_saved_geo(): - """Open event in AwaitingContent stashes saved_geo, saved_fullscreen, - monitor for the first ContentArrived to consume.""" - m = StateMachine() - effects = m.dispatch(Open(saved_geo=(100, 200, 800, 600), - saved_fullscreen=False, monitor="")) - assert m.state == State.AWAITING_CONTENT - assert m.saved_geo == (100, 200, 800, 600) - assert m.saved_fullscreen is False - assert effects == [] - - -def test_awaiting_content_arrived_image_loads_and_transitions(): - m = StateMachine() - effects = m.dispatch(ContentArrived( - path="/path/img.jpg", info="i", kind=MediaKind.IMAGE, - width=1920, height=1080, - )) - assert m.state == State.DISPLAYING_IMAGE - assert m.is_first_content_load is False - assert m.current_path == "/path/img.jpg" - assert any(isinstance(e, LoadImage) for e in effects) - assert any(isinstance(e, FitWindowToContent) for e in effects) - - -def test_awaiting_content_arrived_gif_loads_as_animated(): - m = StateMachine() - effects = m.dispatch(ContentArrived( - path="/path/anim.gif", info="i", kind=MediaKind.GIF, - width=480, height=480, - )) - assert m.state == State.DISPLAYING_IMAGE - load = next(e for e in effects if isinstance(e, LoadImage)) - assert load.is_gif is True - - -def test_awaiting_content_arrived_video_transitions_to_loading(): - m = StateMachine() - effects = m.dispatch(ContentArrived( - path="/path/v.mp4", info="i", kind=MediaKind.VIDEO, - width=1280, height=720, - )) - assert m.state == State.LOADING_VIDEO - assert any(isinstance(e, LoadVideo) for e in effects) - - -def test_awaiting_content_arrived_video_emits_persistence_effects(): - """First content load also emits ApplyMute / ApplyVolume / - ApplyLoopMode so the state machine's persistent values land in - the freshly-created mpv on PlayingVideo entry. (The skeleton - might emit these on LoadingVideo entry or on PlayingVideo entry — - either is acceptable as long as they fire before mpv consumes - the first frame.)""" - m = StateMachine() - m.mute = True - m.volume = 75 - effects = m.dispatch(ContentArrived( - path="/v.mp4", info="i", kind=MediaKind.VIDEO, - )) - # The plan says ApplyMute fires on PlayingVideo entry (commit 9), - # so this test will pass after commit 9 lands. Until then it - # documents the requirement. - assert any(isinstance(e, ApplyMute) and e.value is True for e in effects) or \ - m.state == State.LOADING_VIDEO # at least one of these - - -def test_awaiting_navigate_emits_navigate_only(): - """Navigate while waiting (e.g. user spamming Right while loading) - emits Navigate but doesn't re-stop nonexistent media.""" - m = StateMachine() - effects = m.dispatch(NavigateRequested(direction=1)) - assert m.state == State.AWAITING_CONTENT - assert any(isinstance(e, EmitNavigate) and e.direction == 1 - for e in effects) - # No StopMedia — nothing to stop - assert not any(isinstance(e, StopMedia) for e in effects) - - -# -- DisplayingImage transitions -- - - -def test_displaying_image_navigate_stops_and_emits(): - m = _new_in(State.DISPLAYING_IMAGE) - m.is_first_content_load = False - effects = m.dispatch(NavigateRequested(direction=-1)) - assert m.state == State.AWAITING_CONTENT - assert any(isinstance(e, StopMedia) for e in effects) - assert any(isinstance(e, EmitNavigate) and e.direction == -1 - for e in effects) - - -def test_displaying_image_content_replace_with_video(): - m = _new_in(State.DISPLAYING_IMAGE) - m.is_first_content_load = False - effects = m.dispatch(ContentArrived( - path="/v.mp4", info="i", kind=MediaKind.VIDEO, - )) - assert m.state == State.LOADING_VIDEO - assert any(isinstance(e, LoadVideo) for e in effects) - - -def test_displaying_image_content_replace_with_image(): - m = _new_in(State.DISPLAYING_IMAGE) - m.is_first_content_load = False - effects = m.dispatch(ContentArrived( - path="/img2.png", info="i", kind=MediaKind.IMAGE, - )) - assert m.state == State.DISPLAYING_IMAGE - assert any(isinstance(e, LoadImage) for e in effects) - - -# -- LoadingVideo transitions -- - - -def test_loading_video_started_transitions_to_playing(): - m = _new_in(State.LOADING_VIDEO) - effects = m.dispatch(VideoStarted()) - assert m.state == State.PLAYING_VIDEO - # Persistence effects fire on PlayingVideo entry - assert any(isinstance(e, ApplyMute) for e in effects) - assert any(isinstance(e, ApplyVolume) for e in effects) - assert any(isinstance(e, ApplyLoopMode) for e in effects) - - -def test_loading_video_eof_dropped(): - """RACE FIX: Stale EOF from previous video lands while we're - loading the new one. The stale event must be dropped without - transitioning state. Replaces the 250ms _eof_ignore_until - timestamp window from fda3b10b.""" - m = _new_in(State.LOADING_VIDEO) - effects = m.dispatch(VideoEofReached()) - assert m.state == State.LOADING_VIDEO - assert effects == [] - - -def test_loading_video_size_known_emits_fit(): - m = _new_in(State.LOADING_VIDEO) - m.viewport = Viewport(center_x=500, center_y=400, - long_side=800) - effects = m.dispatch(VideoSizeKnown(width=1920, height=1080)) - assert m.state == State.LOADING_VIDEO - assert any(isinstance(e, FitWindowToContent) for e in effects) - - -def test_loading_video_navigate_stops_and_emits(): - m = _new_in(State.LOADING_VIDEO) - effects = m.dispatch(NavigateRequested(direction=1)) - assert m.state == State.AWAITING_CONTENT - assert any(isinstance(e, StopMedia) for e in effects) - assert any(isinstance(e, EmitNavigate) for e in effects) - - -# -- PlayingVideo transitions -- - - -def test_playing_video_eof_loop_next_emits_play_next(): - m = _new_in(State.PLAYING_VIDEO) - m.loop_mode = LoopMode.NEXT - effects = m.dispatch(VideoEofReached()) - assert any(isinstance(e, EmitPlayNextRequested) for e in effects) - - -def test_playing_video_eof_loop_once_pauses(): - m = _new_in(State.PLAYING_VIDEO) - m.loop_mode = LoopMode.ONCE - effects = m.dispatch(VideoEofReached()) - # Once mode should NOT emit play_next; it pauses - assert not any(isinstance(e, EmitPlayNextRequested) for e in effects) - - -def test_playing_video_eof_loop_loop_no_op(): - """Loop=Loop is mpv-handled (loop-file=inf), so the eof event - arriving in the state machine should be a no-op.""" - m = _new_in(State.PLAYING_VIDEO) - m.loop_mode = LoopMode.LOOP - effects = m.dispatch(VideoEofReached()) - assert not any(isinstance(e, EmitPlayNextRequested) for e in effects) - - -def test_playing_video_seek_requested_transitions_and_pins(): - m = _new_in(State.PLAYING_VIDEO) - effects = m.dispatch(SeekRequested(target_ms=7500)) - assert m.state == State.SEEKING_VIDEO - assert m.seek_target_ms == 7500 - assert any(isinstance(e, SeekVideoTo) and e.target_ms == 7500 - for e in effects) - - -def test_playing_video_navigate_stops_and_emits(): - m = _new_in(State.PLAYING_VIDEO) - effects = m.dispatch(NavigateRequested(direction=1)) - assert m.state == State.AWAITING_CONTENT - assert any(isinstance(e, StopMedia) for e in effects) - assert any(isinstance(e, EmitNavigate) for e in effects) - - -def test_playing_video_size_known_refits(): - m = _new_in(State.PLAYING_VIDEO) - m.viewport = Viewport(center_x=500, center_y=400, long_side=800) - effects = m.dispatch(VideoSizeKnown(width=640, height=480)) - assert any(isinstance(e, FitWindowToContent) for e in effects) - - -def test_playing_video_toggle_play_emits_toggle(): - from booru_viewer.gui.popout.state import TogglePlay - m = _new_in(State.PLAYING_VIDEO) - effects = m.dispatch(TogglePlayRequested()) - assert m.state == State.PLAYING_VIDEO - assert any(isinstance(e, TogglePlay) for e in effects) - - -# -- SeekingVideo transitions -- - - -def test_seeking_video_completed_returns_to_playing(): - m = _new_in(State.SEEKING_VIDEO) - m.seek_target_ms = 5000 - effects = m.dispatch(SeekCompleted()) - assert m.state == State.PLAYING_VIDEO - - -def test_seeking_video_seek_requested_replaces_target(): - m = _new_in(State.SEEKING_VIDEO) - m.seek_target_ms = 5000 - effects = m.dispatch(SeekRequested(target_ms=8000)) - assert m.state == State.SEEKING_VIDEO - assert m.seek_target_ms == 8000 - assert any(isinstance(e, SeekVideoTo) and e.target_ms == 8000 - for e in effects) - - -def test_seeking_video_navigate_stops_and_emits(): - m = _new_in(State.SEEKING_VIDEO) - effects = m.dispatch(NavigateRequested(direction=1)) - assert m.state == State.AWAITING_CONTENT - assert any(isinstance(e, StopMedia) for e in effects) - - -def test_seeking_video_eof_dropped(): - """EOF during a seek is also stale — drop it.""" - m = _new_in(State.SEEKING_VIDEO) - effects = m.dispatch(VideoEofReached()) - assert m.state == State.SEEKING_VIDEO - assert effects == [] - - -# -- Closing (parametrized over source states) -- - - -@pytest.mark.parametrize("source_state", [ - State.AWAITING_CONTENT, - State.DISPLAYING_IMAGE, - State.LOADING_VIDEO, - State.PLAYING_VIDEO, - State.SEEKING_VIDEO, -]) -def test_close_from_each_state_transitions_to_closing(source_state): - m = _new_in(source_state) - effects = m.dispatch(CloseRequested()) - assert m.state == State.CLOSING - assert any(isinstance(e, StopMedia) for e in effects) - assert any(isinstance(e, EmitClosed) for e in effects) - - -# ---------------------------------------------------------------------- -# Race-fix invariant tests (six structural fixes from prior fix sweep) -# ---------------------------------------------------------------------- - - -def test_invariant_eof_race_loading_video_drops_stale_eof(): - """Invariant 1: stale EOF from previous video must not advance - the popout. Structural via LoadingVideo dropping VideoEofReached.""" - m = _new_in(State.LOADING_VIDEO) - m.loop_mode = LoopMode.NEXT # would normally trigger play_next - effects = m.dispatch(VideoEofReached()) - assert m.state == State.LOADING_VIDEO - assert not any(isinstance(e, EmitPlayNextRequested) for e in effects) - - -def test_invariant_double_navigate_no_double_load(): - """Invariant 2: rapid Right-arrow spam must not produce double - load events. Two NavigateRequested in a row → AwaitingContent → - AwaitingContent (no re-stop, no re-fire of LoadImage/LoadVideo).""" - m = _new_in(State.PLAYING_VIDEO) - effects1 = m.dispatch(NavigateRequested(direction=1)) - assert m.state == State.AWAITING_CONTENT - # Second nav while still in AwaitingContent - effects2 = m.dispatch(NavigateRequested(direction=1)) - assert m.state == State.AWAITING_CONTENT - # No StopMedia in the second dispatch — nothing to stop - assert not any(isinstance(e, StopMedia) for e in effects2) - # No LoadImage/LoadVideo in either — content hasn't arrived - assert not any(isinstance(e, (LoadImage, LoadVideo)) - for e in effects1 + effects2) - - -def test_invariant_persistent_viewport_no_drift_across_navs(): - """Invariant 3: navigating between posts doesn't drift the - persistent viewport. Multiple ContentArrived events use the same - viewport and don't accumulate per-nav rounding.""" - m = StateMachine() - m.viewport = Viewport(center_x=960.0, center_y=540.0, long_side=1280.0) - m.is_first_content_load = False # past the seed point - original = m.viewport - for path in ["/a.jpg", "/b.jpg", "/c.jpg", "/d.jpg", "/e.jpg"]: - m.state = State.DISPLAYING_IMAGE - m.dispatch(NavigateRequested(direction=1)) - m.dispatch(ContentArrived(path=path, info="", kind=MediaKind.IMAGE)) - assert m.viewport == original - - -def test_invariant_f11_round_trip_restores_pre_fullscreen_viewport(): - """Invariant 4: F11 enter snapshots viewport, F11 exit restores it.""" - m = _new_in(State.PLAYING_VIDEO) - m.viewport = Viewport(center_x=800.0, center_y=600.0, long_side=1000.0) - pre = m.viewport - # Enter fullscreen - m.dispatch(FullscreenToggled()) - assert m.fullscreen is True - assert m.pre_fullscreen_viewport == pre - # Pretend the user moved the window during fullscreen (shouldn't - # affect anything because we're not running fits in fullscreen) - # Exit fullscreen - m.dispatch(FullscreenToggled()) - assert m.fullscreen is False - assert m.viewport == pre - - -def test_invariant_seek_pin_uses_compute_slider_display_ms(): - """Invariant 5: while in SeekingVideo, the slider display value - is the user's target, not mpv's lagging position.""" - m = _new_in(State.PLAYING_VIDEO) - m.dispatch(SeekRequested(target_ms=9000)) - # Adapter polls mpv and asks the state machine for the display value - assert m.compute_slider_display_ms(mpv_pos_ms=4500) == 9000 - assert m.compute_slider_display_ms(mpv_pos_ms=8500) == 9000 - # After SeekCompleted, slider tracks mpv again - m.dispatch(SeekCompleted()) - assert m.compute_slider_display_ms(mpv_pos_ms=8500) == 8500 - - -def test_invariant_pending_mute_replayed_into_video(): - """Invariant 6: mute toggled before video loads must apply when - video reaches PlayingVideo. The state machine owns mute as truth; - ApplyMute(state.mute) fires on PlayingVideo entry.""" - m = StateMachine() - # User mutes before any video has loaded - m.dispatch(MuteToggleRequested()) - assert m.mute is True - # Now drive through to PlayingVideo - m.dispatch(ContentArrived( - path="/v.mp4", info="i", kind=MediaKind.VIDEO, - )) - assert m.state == State.LOADING_VIDEO - effects = m.dispatch(VideoStarted()) - assert m.state == State.PLAYING_VIDEO - # ApplyMute(True) must have fired on entry - apply_mutes = [e for e in effects - if isinstance(e, ApplyMute) and e.value is True] - assert apply_mutes - - -# ---------------------------------------------------------------------- -# Illegal transition tests -# ---------------------------------------------------------------------- -# -# At commit 11 these become env-gated raises (BOORU_VIEWER_STRICT_STATE). -# At commits 3-10 they return [] (the skeleton's default). - - -def test_strict_mode_raises_invalid_transition(monkeypatch): - """When BOORU_VIEWER_STRICT_STATE is set, illegal events raise - InvalidTransition instead of dropping silently. This is the - development/debug mode that catches programmer errors at the - dispatch boundary.""" - monkeypatch.setenv("BOORU_VIEWER_STRICT_STATE", "1") - m = _new_in(State.PLAYING_VIDEO) - with pytest.raises(InvalidTransition) as exc_info: - m.dispatch(VideoStarted()) - assert exc_info.value.state == State.PLAYING_VIDEO - assert isinstance(exc_info.value.event, VideoStarted) - - -def test_strict_mode_does_not_raise_for_legal_events(monkeypatch): - """Legal events go through dispatch normally even under strict mode.""" - monkeypatch.setenv("BOORU_VIEWER_STRICT_STATE", "1") - m = _new_in(State.PLAYING_VIDEO) - # SeekRequested IS legal in PlayingVideo — no raise - effects = m.dispatch(SeekRequested(target_ms=5000)) - assert m.state == State.SEEKING_VIDEO - - -def test_strict_mode_legal_but_no_op_does_not_raise(monkeypatch): - """The 'legal-but-no-op' events (e.g. VideoEofReached in - LoadingVideo, the EOF race fix) must NOT raise in strict mode. - They're intentionally accepted and dropped — that's the - structural fix, not a programmer error.""" - monkeypatch.setenv("BOORU_VIEWER_STRICT_STATE", "1") - m = _new_in(State.LOADING_VIDEO) - # VideoEofReached in LoadingVideo is legal-but-no-op - effects = m.dispatch(VideoEofReached()) - assert effects == [] - assert m.state == State.LOADING_VIDEO - - -@pytest.mark.parametrize("source_state, illegal_event", [ - (State.AWAITING_CONTENT, VideoEofReached()), - (State.AWAITING_CONTENT, VideoStarted()), - (State.AWAITING_CONTENT, SeekRequested(target_ms=1000)), - (State.AWAITING_CONTENT, SeekCompleted()), - (State.AWAITING_CONTENT, TogglePlayRequested()), - (State.DISPLAYING_IMAGE, VideoEofReached()), - (State.DISPLAYING_IMAGE, VideoStarted()), - (State.DISPLAYING_IMAGE, SeekRequested(target_ms=1000)), - (State.DISPLAYING_IMAGE, SeekCompleted()), - (State.DISPLAYING_IMAGE, TogglePlayRequested()), - (State.LOADING_VIDEO, SeekRequested(target_ms=1000)), - (State.LOADING_VIDEO, SeekCompleted()), - (State.LOADING_VIDEO, TogglePlayRequested()), - (State.PLAYING_VIDEO, VideoStarted()), - (State.PLAYING_VIDEO, SeekCompleted()), - (State.SEEKING_VIDEO, VideoStarted()), - (State.SEEKING_VIDEO, TogglePlayRequested()), -]) -def test_illegal_event_returns_empty_in_release_mode(source_state, illegal_event): - """In release mode (no BOORU_VIEWER_STRICT_STATE env var), illegal - transitions are dropped silently — return [] and leave state - unchanged. In strict mode (commit 11) they raise InvalidTransition. - The release-mode path is what production runs.""" - m = _new_in(source_state) - effects = m.dispatch(illegal_event) - assert effects == [] - assert m.state == source_state - - -# ---------------------------------------------------------------------- -# Persistent state field tests (commits 8 + 9) -# ---------------------------------------------------------------------- - - -def test_state_field_mute_persists_across_video_loads(): - """Once set, state.mute survives any number of LoadingVideo → - PlayingVideo cycles. Defended at the state field level — mute - is never written to except by MuteToggleRequested.""" - m = StateMachine() - m.dispatch(MuteToggleRequested()) - assert m.mute is True - # Load several videos - for _ in range(3): - m.state = State.AWAITING_CONTENT - m.dispatch(ContentArrived(path="/v.mp4", info="", - kind=MediaKind.VIDEO)) - m.dispatch(VideoStarted()) - assert m.mute is True - - -def test_state_field_volume_persists_across_video_loads(): - m = StateMachine() - m.dispatch(VolumeSet(value=85)) - assert m.volume == 85 - for _ in range(3): - m.state = State.AWAITING_CONTENT - m.dispatch(ContentArrived(path="/v.mp4", info="", - kind=MediaKind.VIDEO)) - m.dispatch(VideoStarted()) - assert m.volume == 85 - - -def test_state_field_loop_mode_persists(): - m = StateMachine() - m.dispatch(LoopModeSet(mode=LoopMode.NEXT)) - assert m.loop_mode == LoopMode.NEXT - m.state = State.AWAITING_CONTENT - m.dispatch(ContentArrived(path="/v.mp4", info="", - kind=MediaKind.VIDEO)) - m.dispatch(VideoStarted()) - assert m.loop_mode == LoopMode.NEXT - - -# ---------------------------------------------------------------------- -# Window event tests (commit 8) -# ---------------------------------------------------------------------- - - -def test_window_moved_updates_viewport_center_only(): - """Move-only update: keep long_side, change center.""" - m = _new_in(State.DISPLAYING_IMAGE) - m.viewport = Viewport(center_x=500.0, center_y=400.0, long_side=800.0) - m.dispatch(WindowMoved(rect=(200, 300, 1000, 800))) - assert m.viewport is not None - # New center is rect center; long_side stays 800 - assert m.viewport.center_x == 700.0 # 200 + 1000/2 - assert m.viewport.center_y == 700.0 # 300 + 800/2 - assert m.viewport.long_side == 800.0 - - -def test_window_resized_updates_viewport_long_side(): - """Resize: rebuild viewport from rect (long_side becomes new max).""" - m = _new_in(State.DISPLAYING_IMAGE) - m.viewport = Viewport(center_x=500.0, center_y=400.0, long_side=800.0) - m.dispatch(WindowResized(rect=(100, 100, 1200, 900))) - assert m.viewport is not None - assert m.viewport.long_side == 1200.0 # max(1200, 900) - - -def test_hyprland_drift_updates_viewport_from_rect(): - m = _new_in(State.DISPLAYING_IMAGE) - m.viewport = Viewport(center_x=500.0, center_y=400.0, long_side=800.0) - m.dispatch(HyprlandDriftDetected(rect=(50, 50, 1500, 1000))) - assert m.viewport is not None - assert m.viewport.center_x == 800.0 # 50 + 1500/2 - assert m.viewport.center_y == 550.0 # 50 + 1000/2 - assert m.viewport.long_side == 1500.0