Untrack tests/ directory and related dev tooling

Removes the tests/ folder from git tracking and adds it to .gitignore.
The 81 tests (16 Phase A core + 65 popout state machine) stay on
disk as local-only working notes, the same way docs/ and project.md
are gitignored. Running them is `pytest tests/` from the project
root inside .venv as before — nothing about the tests themselves
changed, just whether they're version-controlled.

Reverts the related additions in pyproject.toml and README.md from
commit bf14466 (Phase A baseline) so the public surface doesn't
reference a tests/ folder that no longer ships:

  - pyproject.toml: drops [project.optional-dependencies] test extra
    and [tool.pytest.ini_options]. pytest + pytest-asyncio are still
    installed in the local .venv via the previous pip install -e ".[test]"
    so the suite keeps running locally; new clones won't get them
    automatically.

  - README.md: drops the "Run tests:" section from the Linux install
    block. The README's install instructions return to their pre-
    Phase-A state.

  - .gitignore: adds `tests/` alongside the existing `docs/` and
    `project.md` lines (the same convention used for the refactor
    inventory / plan / notes / final report docs).

The 12 test files removed from tracking (`git rm -r --cached`):
  tests/__init__.py
  tests/conftest.py
  tests/core/__init__.py
  tests/core/test_cache.py
  tests/core/test_concurrency.py
  tests/core/test_config.py
  tests/core/test_db.py
  tests/core/api/__init__.py
  tests/core/api/test_base.py
  tests/gui/__init__.py
  tests/gui/popout/__init__.py
  tests/gui/popout/test_state.py

Verification:
  - tests/ still exists on disk
  - `pytest tests/` still runs and passes 81 / 81 in 0.11s
  - `git ls-files tests/` returns nothing
  - `git status` is clean
This commit is contained in:
pax 2026-04-08 20:47:50 -05:00
parent a2b759be90
commit 1b66b03a30
15 changed files with 1 additions and 1266 deletions

1
.gitignore vendored
View File

@ -9,5 +9,6 @@ build/
venv/
docs/
project.md
tests/
*.bak/
*.dll

View File

@ -147,12 +147,6 @@ booru-viewer
Or without installing: `python3 -m booru_viewer.main_gui`
**Run tests:**
```sh
pip install -e ".[test]"
pytest tests/
```
**Desktop entry:** To add booru-viewer to your app launcher, create `~/.local/share/applications/booru-viewer.desktop`:
```ini
[Desktop Entry]

View File

@ -14,16 +14,6 @@ dependencies = [
"python-mpv>=1.0",
]
[project.optional-dependencies]
test = [
"pytest>=8.0",
"pytest-asyncio>=0.23",
]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
[project.scripts]
booru-viewer = "booru_viewer.main_gui:main"

View File

View File

@ -1,71 +0,0 @@
"""Shared fixtures for the booru-viewer test suite.
All fixtures here are pure-Python no Qt, no mpv, no network. Filesystem
writes go through `tmp_path` (or fixtures that wrap it). Module-level globals
that the production code mutates (the concurrency loop, the httpx singletons)
get reset around each test that touches them.
"""
from __future__ import annotations
import pytest
@pytest.fixture
def tmp_db(tmp_path):
"""Fresh `Database` instance writing to a temp file. Auto-closes."""
from booru_viewer.core.db import Database
db = Database(tmp_path / "test.db")
yield db
db.close()
@pytest.fixture
def tmp_library(tmp_path):
"""Point `saved_dir()` at `tmp_path/saved` for the duration of the test.
Uses `core.config.set_library_dir` (the official override hook) so the
redirect goes through the same code path the GUI uses for the
user-configurable library location. Tear-down restores the previous
value so tests can run in any order without bleed.
"""
from booru_viewer.core import config
saved = tmp_path / "saved"
saved.mkdir()
original = config._library_dir_override
config.set_library_dir(saved)
yield saved
config.set_library_dir(original)
@pytest.fixture
def reset_app_loop():
"""Reset `concurrency._app_loop` between tests.
The module global is set once at app startup in production; tests need
to start from a clean slate to assert the unset-state behavior.
"""
from booru_viewer.core import concurrency
original = concurrency._app_loop
concurrency._app_loop = None
yield
concurrency._app_loop = original
@pytest.fixture
def reset_shared_clients():
"""Reset both shared httpx singletons (cache module + BooruClient class).
Both are class/module-level globals; tests that exercise the lazy-init
+ lock pattern need them cleared so the test sees a fresh first-call
race instead of a leftover instance from a previous test.
"""
from booru_viewer.core.api.base import BooruClient
from booru_viewer.core import cache
original_booru = BooruClient._shared_client
original_cache = cache._shared_client
BooruClient._shared_client = None
cache._shared_client = None
yield
BooruClient._shared_client = original_booru
cache._shared_client = original_cache

View File

View File

@ -1,77 +0,0 @@
"""Tests for `booru_viewer.core.api.base` — the lazy `_shared_client`
singleton on `BooruClient`.
Locks in the lock-and-recheck pattern at `base.py:90-108`. Without it,
two threads racing on first `.client` access would both see
`_shared_client is None`, both build an `httpx.AsyncClient`, and one of
them would leak (overwritten without aclose).
"""
from __future__ import annotations
import threading
from unittest.mock import patch, MagicMock
import pytest
from booru_viewer.core.api.base import BooruClient
class _StubClient(BooruClient):
"""Concrete subclass so we can instantiate `BooruClient` for the test
the base class has abstract `search` / `get_post` methods."""
api_type = "stub"
async def search(self, tags="", page=1, limit=40):
return []
async def get_post(self, post_id):
return None
def test_shared_client_singleton_under_concurrency(reset_shared_clients):
"""N threads racing on first `.client` access must result in exactly
one `httpx.AsyncClient` constructor call. The threading.Lock guards
the check-and-set so the second-and-later callers re-read the now-set
`_shared_client` after acquiring the lock instead of building their
own."""
constructor_calls = 0
constructor_lock = threading.Lock()
def _fake_async_client(*args, **kwargs):
nonlocal constructor_calls
with constructor_lock:
constructor_calls += 1
m = MagicMock()
m.is_closed = False
return m
# Barrier so all threads hit the property at the same moment
n_threads = 10
barrier = threading.Barrier(n_threads)
results = []
results_lock = threading.Lock()
client_instance = _StubClient("http://example.test")
def _worker():
barrier.wait()
c = client_instance.client
with results_lock:
results.append(c)
with patch("booru_viewer.core.api.base.httpx.AsyncClient",
side_effect=_fake_async_client):
threads = [threading.Thread(target=_worker) for _ in range(n_threads)]
for t in threads:
t.start()
for t in threads:
t.join(timeout=5)
assert constructor_calls == 1, (
f"Expected exactly one httpx.AsyncClient construction, "
f"got {constructor_calls}"
)
# All threads got back the same shared instance
assert len(results) == n_threads
assert all(r is results[0] for r in results)

View File

@ -1,224 +0,0 @@
"""Tests for `booru_viewer.core.cache` — Referer hostname matching, ugoira
zip-bomb defenses, download size caps, and validity-check fallback.
Locks in:
- `_referer_for` proper hostname suffix matching (`54ccc40` security fix)
guarding against `imgblahgelbooru.attacker.com` mapping to gelbooru.com
- `_convert_ugoira_to_gif` cap enforcement (frame count + uncompressed size)
before any decompression defense against ugoira zip bombs
- `_do_download` MAX_DOWNLOAD_BYTES enforcement, both the Content-Length
pre-check and the running-total chunk-loop guard
- `_is_valid_media` returning True on OSError so a transient EBUSY/lock
doesn't kick off a delete + re-download loop
"""
from __future__ import annotations
import asyncio
import io
import zipfile
from pathlib import Path
from unittest.mock import patch
from urllib.parse import urlparse
import pytest
from booru_viewer.core import cache
from booru_viewer.core.cache import (
MAX_DOWNLOAD_BYTES,
_convert_ugoira_to_gif,
_do_download,
_is_valid_media,
_referer_for,
)
# -- _referer_for hostname suffix matching --
def test_referer_for_exact_and_suffix_match():
"""Real booru hostnames map to the canonical Referer for their CDN.
Exact match and subdomain-suffix match both rewrite the Referer host
to the canonical apex (gelbooru `gelbooru.com`, donmai
`danbooru.donmai.us`). The actual request netloc is dropped the
point is to look like a navigation from the canonical site.
"""
# gelbooru exact host
assert _referer_for(urlparse("https://gelbooru.com/index.php")) \
== "https://gelbooru.com/"
# gelbooru subdomain rewrites to the canonical apex
assert _referer_for(urlparse("https://img3.gelbooru.com/images/abc.jpg")) \
== "https://gelbooru.com/"
# donmai exact host
assert _referer_for(urlparse("https://donmai.us/posts/123")) \
== "https://danbooru.donmai.us/"
# donmai subdomain rewrites to the canonical danbooru host
assert _referer_for(urlparse("https://safebooru.donmai.us/posts/123")) \
== "https://danbooru.donmai.us/"
def test_referer_for_rejects_substring_attacker():
"""An attacker host that contains `gelbooru.com` or `donmai.us` as a
SUBSTRING (not a hostname suffix) must NOT pick up the booru Referer.
Without proper suffix matching, `imgblahgelbooru.attacker.com` would
leak the gelbooru Referer to the attacker that's the `54ccc40`
security fix.
"""
# Attacker host that ends with attacker-controlled TLD
parsed = urlparse("https://imgblahgelbooru.attacker.com/x.jpg")
referer = _referer_for(parsed)
assert "gelbooru.com" not in referer
assert "imgblahgelbooru.attacker.com" in referer
parsed = urlparse("https://donmai.us.attacker.com/x.jpg")
referer = _referer_for(parsed)
assert "danbooru.donmai.us" not in referer
assert "donmai.us.attacker.com" in referer
# Completely unrelated host preserved as-is
parsed = urlparse("https://example.test/x.jpg")
assert _referer_for(parsed) == "https://example.test/"
# -- Ugoira zip-bomb defenses --
def _build_ugoira_zip(path: Path, n_frames: int, frame_bytes: bytes = b"x") -> Path:
"""Build a synthetic ugoira-shaped zip with `n_frames` numbered .jpg
entries. Content is whatever the caller passes; defaults to 1 byte.
The cap-enforcement tests don't need decodable JPEGs — the cap fires
before any decode happens. The filenames just need .jpg suffixes so
`_convert_ugoira_to_gif` recognizes them as frames.
"""
with zipfile.ZipFile(path, "w") as zf:
for i in range(n_frames):
zf.writestr(f"{i:04d}.jpg", frame_bytes)
return path
def test_ugoira_frame_count_cap_rejects_bomb(tmp_path, monkeypatch):
"""A zip with more than `UGOIRA_MAX_FRAMES` frames must be refused
BEFORE any decompression. We monkeypatch the cap down so the test
builds a tiny zip instead of a 5001-entry one the cap check is
cap > N, not cap == 5000."""
monkeypatch.setattr(cache, "UGOIRA_MAX_FRAMES", 2)
zip_path = _build_ugoira_zip(tmp_path / "bomb.zip", n_frames=3)
gif_path = zip_path.with_suffix(".gif")
result = _convert_ugoira_to_gif(zip_path)
# Function returned the original zip (refusal path)
assert result == zip_path
# No .gif was written
assert not gif_path.exists()
def test_ugoira_uncompressed_size_cap_rejects_bomb(tmp_path, monkeypatch):
"""A zip whose `ZipInfo.file_size` headers sum past
`UGOIRA_MAX_UNCOMPRESSED_BYTES` must be refused before decompression.
Same monkeypatch trick to keep the test data small."""
monkeypatch.setattr(cache, "UGOIRA_MAX_UNCOMPRESSED_BYTES", 50)
# Three 100-byte frames → 300 total > 50 cap
zip_path = _build_ugoira_zip(
tmp_path / "bomb.zip", n_frames=3, frame_bytes=b"x" * 100
)
gif_path = zip_path.with_suffix(".gif")
result = _convert_ugoira_to_gif(zip_path)
assert result == zip_path
assert not gif_path.exists()
# -- _do_download MAX_DOWNLOAD_BYTES caps --
class _FakeHeaders:
def __init__(self, mapping):
self._m = mapping
def get(self, key, default=None):
return self._m.get(key.lower(), default)
class _FakeResponse:
def __init__(self, headers, chunks):
self.headers = _FakeHeaders({k.lower(): v for k, v in headers.items()})
self._chunks = chunks
def raise_for_status(self):
pass
async def aiter_bytes(self, _size):
for chunk in self._chunks:
yield chunk
class _FakeStreamCtx:
def __init__(self, response):
self._resp = response
async def __aenter__(self):
return self._resp
async def __aexit__(self, *_args):
return False
class _FakeClient:
def __init__(self, response):
self._resp = response
def stream(self, _method, _url, headers=None):
return _FakeStreamCtx(self._resp)
def test_download_cap_content_length_pre_check(tmp_path):
"""When the server advertises a Content-Length larger than
MAX_DOWNLOAD_BYTES, `_do_download` must raise BEFORE iterating any
bytes. This is the cheap pre-check that protects against the trivial
OOM/disk-fill attack we don't even start streaming."""
too_big = MAX_DOWNLOAD_BYTES + 1
response = _FakeResponse(
headers={"content-type": "image/jpeg", "content-length": str(too_big)},
chunks=[b"never read"],
)
client = _FakeClient(response)
local = tmp_path / "out.jpg"
with pytest.raises(ValueError, match="Download too large"):
asyncio.run(_do_download(client, "http://example.test/x.jpg", {}, local, None))
# No file should have been written
assert not local.exists()
def test_download_cap_running_total_aborts(tmp_path, monkeypatch):
"""Servers can lie about Content-Length. The chunk loop must enforce
the running-total cap independently and abort mid-stream as soon as
cumulative bytes exceed `MAX_DOWNLOAD_BYTES`. We monkeypatch the cap
down to 1024 to keep the test fast."""
monkeypatch.setattr(cache, "MAX_DOWNLOAD_BYTES", 1024)
# Advertise 0 (unknown) so the small-payload branch runs and the
# running-total guard inside the chunk loop is what fires.
response = _FakeResponse(
headers={"content-type": "image/jpeg", "content-length": "0"},
chunks=[b"x" * 600, b"x" * 600], # 1200 total > 1024 cap
)
client = _FakeClient(response)
local = tmp_path / "out.jpg"
with pytest.raises(ValueError, match="exceeded cap mid-stream"):
asyncio.run(_do_download(client, "http://example.test/x.jpg", {}, local, None))
# The buffered-write path only writes after the loop finishes, so the
# mid-stream abort means no file lands on disk.
assert not local.exists()
# -- _is_valid_media OSError fallback --
def test_is_valid_media_returns_true_on_oserror(tmp_path):
"""If the file can't be opened (transient EBUSY, lock, permissions),
`_is_valid_media` must return True so the caller doesn't delete the
cached file. The previous behavior of returning False kicked off a
delete + re-download loop on every access while the underlying
OS issue persisted."""
nonexistent = tmp_path / "definitely-not-here.jpg"
assert _is_valid_media(nonexistent) is True

View File

@ -1,62 +0,0 @@
"""Tests for `booru_viewer.core.concurrency` — the persistent-loop handle.
Locks in:
- `get_app_loop` raises a clear RuntimeError if `set_app_loop` was never
called (the production code uses this to bail loudly when async work
is scheduled before the loop thread starts)
- `run_on_app_loop` round-trips a coroutine result from a worker-thread
loop back to the calling thread via `concurrent.futures.Future`
"""
from __future__ import annotations
import asyncio
import threading
import pytest
from booru_viewer.core import concurrency
from booru_viewer.core.concurrency import (
get_app_loop,
run_on_app_loop,
set_app_loop,
)
def test_get_app_loop_raises_before_set(reset_app_loop):
"""Calling `get_app_loop` before `set_app_loop` is a configuration
error the production code expects a clear RuntimeError so callers
bail loudly instead of silently scheduling work onto a None loop."""
with pytest.raises(RuntimeError, match="not initialized"):
get_app_loop()
def test_run_on_app_loop_round_trips_result(reset_app_loop):
"""Spin up a real asyncio loop in a worker thread, register it via
`set_app_loop`, then from the test (main) thread schedule a coroutine
via `run_on_app_loop` and assert the result comes back through the
`concurrent.futures.Future` interface."""
loop = asyncio.new_event_loop()
ready = threading.Event()
def _run_loop():
asyncio.set_event_loop(loop)
ready.set()
loop.run_forever()
t = threading.Thread(target=_run_loop, daemon=True)
t.start()
ready.wait(timeout=2)
try:
set_app_loop(loop)
async def _produce():
return 42
fut = run_on_app_loop(_produce())
assert fut.result(timeout=2) == 42
finally:
loop.call_soon_threadsafe(loop.stop)
t.join(timeout=2)
loop.close()

View File

@ -1,57 +0,0 @@
"""Tests for `booru_viewer.core.config` — path traversal guard on
`saved_folder_dir` and the shallow walk in `find_library_files`.
Locks in:
- `saved_folder_dir` resolve-and-relative_to check (`54ccc40` defense in
depth alongside `_validate_folder_name`)
- `find_library_files` matching exactly the root + 1-level subdirectory
layout that the library uses, with the right MEDIA_EXTENSIONS filter
"""
from __future__ import annotations
import pytest
from booru_viewer.core import config
from booru_viewer.core.config import find_library_files, saved_folder_dir
# -- saved_folder_dir traversal guard --
def test_saved_folder_dir_rejects_dotdot(tmp_library):
"""`..` and any path that resolves outside `saved_dir()` must raise
ValueError, not silently mkdir somewhere unexpected. We test literal
`..` shapes only symlink escapes are filesystem-dependent and
flaky in tests."""
with pytest.raises(ValueError, match="escapes saved directory"):
saved_folder_dir("..")
with pytest.raises(ValueError, match="escapes saved directory"):
saved_folder_dir("../escape")
with pytest.raises(ValueError, match="escapes saved directory"):
saved_folder_dir("foo/../..")
# -- find_library_files shallow walk --
def test_find_library_files_walks_root_and_one_level(tmp_library):
"""Library has a flat shape: `saved/<post_id>.<ext>` at the root, or
`saved/<folder>/<post_id>.<ext>` one level deep. The walk must:
- find matches at both depths
- filter by MEDIA_EXTENSIONS (skip .txt and other non-media)
- filter by exact stem (skip unrelated post ids)
"""
# Root-level match
(tmp_library / "123.jpg").write_bytes(b"")
# One-level subfolder match
(tmp_library / "folder1").mkdir()
(tmp_library / "folder1" / "123.png").write_bytes(b"")
# Different post id — must be excluded
(tmp_library / "folder2").mkdir()
(tmp_library / "folder2" / "456.gif").write_bytes(b"")
# Wrong extension — must be excluded even with the right stem
(tmp_library / "123.txt").write_bytes(b"")
matches = find_library_files(123)
match_names = {p.name for p in matches}
assert match_names == {"123.jpg", "123.png"}

View File

@ -1,98 +0,0 @@
"""Tests for `booru_viewer.core.db` — folder name validation, INSERT OR
IGNORE collision handling, and LIKE escaping.
These tests lock in the `54ccc40` security/correctness fixes:
- `_validate_folder_name` rejects path-traversal shapes before they hit the
filesystem in `saved_folder_dir`
- `add_bookmark` re-SELECTs the actual row id after an INSERT OR IGNORE
collision so the returned `Bookmark.id` is never the bogus 0 that broke
`update_bookmark_cache_path`
- `get_bookmarks` escapes the SQL LIKE wildcards `_` and `%` so a search for
`cat_ear` doesn't bleed into `catear` / `catXear`
"""
from __future__ import annotations
import pytest
from booru_viewer.core.db import _validate_folder_name
# -- _validate_folder_name --
def test_validate_folder_name_rejects_traversal():
"""Every shape that could escape the saved-images dir or hit a hidden
file must raise ValueError. One assertion per rejection rule so a
failure points at the exact case."""
with pytest.raises(ValueError):
_validate_folder_name("") # empty
with pytest.raises(ValueError):
_validate_folder_name("..") # dotdot literal
with pytest.raises(ValueError):
_validate_folder_name(".") # dot literal
with pytest.raises(ValueError):
_validate_folder_name("/foo") # forward slash
with pytest.raises(ValueError):
_validate_folder_name("foo/bar") # embedded forward slash
with pytest.raises(ValueError):
_validate_folder_name("\\foo") # backslash
with pytest.raises(ValueError):
_validate_folder_name(".hidden") # leading dot
with pytest.raises(ValueError):
_validate_folder_name("~user") # leading tilde
def test_validate_folder_name_accepts_unicode_and_punctuation():
"""Common real-world folder names must pass through unchanged. The
guard is meant to block escape shapes, not normal naming."""
assert _validate_folder_name("miku(lewd)") == "miku(lewd)"
assert _validate_folder_name("cat ear") == "cat ear"
assert _validate_folder_name("日本語") == "日本語"
assert _validate_folder_name("foo-bar") == "foo-bar"
assert _validate_folder_name("foo.bar") == "foo.bar" # dot OK if not leading
# -- add_bookmark INSERT OR IGNORE collision --
def test_add_bookmark_collision_returns_existing_id(tmp_db):
"""Calling `add_bookmark` twice with the same (site_id, post_id) must
return the same row id on the second call, not the stale `lastrowid`
of 0 that INSERT OR IGNORE leaves behind. Without the re-SELECT fix,
any downstream `update_bookmark_cache_path(id=0, ...)` silently
no-ops, breaking the cache-path linkage."""
site = tmp_db.add_site("test", "http://example.test", "danbooru")
bm1 = tmp_db.add_bookmark(
site_id=site.id, post_id=42, file_url="http://example.test/42.jpg",
preview_url=None, tags="cat",
)
bm2 = tmp_db.add_bookmark(
site_id=site.id, post_id=42, file_url="http://example.test/42.jpg",
preview_url=None, tags="cat",
)
assert bm1.id != 0
assert bm2.id == bm1.id
# -- get_bookmarks LIKE escaping --
def test_get_bookmarks_like_escaping(tmp_db):
"""A search for the literal tag `cat_ear` must NOT match `catear` or
`catXear`. SQLite's LIKE treats `_` as a single-char wildcard unless
explicitly escaped without `ESCAPE '\\\\'` the search would return
all three rows."""
site = tmp_db.add_site("test", "http://example.test", "danbooru")
tmp_db.add_bookmark(
site_id=site.id, post_id=1, file_url="http://example.test/1.jpg",
preview_url=None, tags="cat_ear",
)
tmp_db.add_bookmark(
site_id=site.id, post_id=2, file_url="http://example.test/2.jpg",
preview_url=None, tags="catear",
)
tmp_db.add_bookmark(
site_id=site.id, post_id=3, file_url="http://example.test/3.jpg",
preview_url=None, tags="catXear",
)
results = tmp_db.get_bookmarks(search="cat_ear")
tags_returned = {b.tags for b in results}
assert tags_returned == {"cat_ear"}

View File

View File

@ -1,661 +0,0 @@
"""Pure-Python state machine tests for the popout viewer.
Imports `booru_viewer.gui.popout.state` directly without standing up a
QApplication. The state machine module is required to be import-pure
(no PySide6, mpv, httpx, subprocess, or any module that imports them);
this test file is the forcing function. If state.py grows a Qt or mpv
import, these tests fail to collect and the test suite breaks.
Test categories (from docs/POPOUT_REFACTOR_PLAN.md "Test plan"):
1. Per-state transition tests
2. Race-fix invariant tests (six structural fixes)
3. Illegal transition tests
4. Read-path query tests
**Commit 3 expectation:** most tests fail because state.py's dispatch
handlers are stubs returning []. Tests progressively pass as commits
4-11 land transitions. The trivially-passing tests at commit 3 (initial
state, slider display read-path, terminal Closing guard) document the
parts of the skeleton that are already real.
Refactor plan: docs/POPOUT_REFACTOR_PLAN.md
Architecture: docs/POPOUT_ARCHITECTURE.md
"""
from __future__ import annotations
import pytest
from booru_viewer.gui.popout.state import (
# Enums
InvalidTransition,
LoopMode,
MediaKind,
State,
StateMachine,
# Events
CloseRequested,
ContentArrived,
FullscreenToggled,
HyprlandDriftDetected,
LoopModeSet,
MuteToggleRequested,
NavigateRequested,
Open,
SeekCompleted,
SeekRequested,
TogglePlayRequested,
VideoEofReached,
VideoSizeKnown,
VideoStarted,
VolumeSet,
WindowMoved,
WindowResized,
# Effects
ApplyLoopMode,
ApplyMute,
ApplyVolume,
EmitClosed,
EmitNavigate,
EmitPlayNextRequested,
EnterFullscreen,
ExitFullscreen,
FitWindowToContent,
LoadImage,
LoadVideo,
SeekVideoTo,
StopMedia,
)
from booru_viewer.gui.popout.viewport import Viewport
# ----------------------------------------------------------------------
# Helpers — direct field mutation for setup. Tests construct a fresh
# StateMachine and write the state field directly to skip the dispatch
# chain. This is a deliberate test-fixture-vs-production-code split:
# the tests don't depend on the dispatch chain being correct in order
# to test individual transitions.
# ----------------------------------------------------------------------
def _new_in(state: State) -> StateMachine:
m = StateMachine()
m.state = state
return m
# ----------------------------------------------------------------------
# Read-path queries (commit 2 — already passing)
# ----------------------------------------------------------------------
def test_initial_state():
m = StateMachine()
assert m.state == State.AWAITING_CONTENT
assert m.is_first_content_load is True
assert m.fullscreen is False
assert m.mute is False
assert m.volume == 50
assert m.loop_mode == LoopMode.LOOP
assert m.viewport is None
assert m.seek_target_ms == 0
def test_compute_slider_display_ms_passthrough_when_not_seeking():
m = StateMachine()
m.state = State.PLAYING_VIDEO
assert m.compute_slider_display_ms(7500) == 7500
def test_compute_slider_display_ms_pinned_when_seeking():
m = StateMachine()
m.state = State.SEEKING_VIDEO
m.seek_target_ms = 7000
# mpv's reported position can be anywhere; the slider must show
# the user's target while we're in SeekingVideo.
assert m.compute_slider_display_ms(5000) == 7000
assert m.compute_slider_display_ms(7000) == 7000
assert m.compute_slider_display_ms(9999) == 7000
def test_dispatch_in_closing_returns_empty():
"""Closing is terminal — every event from Closing returns [] and
the state stays Closing."""
m = _new_in(State.CLOSING)
for event in [
NavigateRequested(direction=1),
ContentArrived("/x.jpg", "info", MediaKind.IMAGE),
VideoEofReached(),
SeekRequested(target_ms=1000),
CloseRequested(),
]:
effects = m.dispatch(event)
assert effects == []
assert m.state == State.CLOSING
# ----------------------------------------------------------------------
# Per-state transition tests
# ----------------------------------------------------------------------
#
# These all rely on the per-event handlers in state.py returning real
# effect lists. They fail at commit 3 (handlers are stubs returning [])
# and pass progressively as commits 4-11 land.
# -- AwaitingContent transitions --
def test_awaiting_open_stashes_saved_geo():
"""Open event in AwaitingContent stashes saved_geo, saved_fullscreen,
monitor for the first ContentArrived to consume."""
m = StateMachine()
effects = m.dispatch(Open(saved_geo=(100, 200, 800, 600),
saved_fullscreen=False, monitor=""))
assert m.state == State.AWAITING_CONTENT
assert m.saved_geo == (100, 200, 800, 600)
assert m.saved_fullscreen is False
assert effects == []
def test_awaiting_content_arrived_image_loads_and_transitions():
m = StateMachine()
effects = m.dispatch(ContentArrived(
path="/path/img.jpg", info="i", kind=MediaKind.IMAGE,
width=1920, height=1080,
))
assert m.state == State.DISPLAYING_IMAGE
assert m.is_first_content_load is False
assert m.current_path == "/path/img.jpg"
assert any(isinstance(e, LoadImage) for e in effects)
assert any(isinstance(e, FitWindowToContent) for e in effects)
def test_awaiting_content_arrived_gif_loads_as_animated():
m = StateMachine()
effects = m.dispatch(ContentArrived(
path="/path/anim.gif", info="i", kind=MediaKind.GIF,
width=480, height=480,
))
assert m.state == State.DISPLAYING_IMAGE
load = next(e for e in effects if isinstance(e, LoadImage))
assert load.is_gif is True
def test_awaiting_content_arrived_video_transitions_to_loading():
m = StateMachine()
effects = m.dispatch(ContentArrived(
path="/path/v.mp4", info="i", kind=MediaKind.VIDEO,
width=1280, height=720,
))
assert m.state == State.LOADING_VIDEO
assert any(isinstance(e, LoadVideo) for e in effects)
def test_awaiting_content_arrived_video_emits_persistence_effects():
"""First content load also emits ApplyMute / ApplyVolume /
ApplyLoopMode so the state machine's persistent values land in
the freshly-created mpv on PlayingVideo entry. (The skeleton
might emit these on LoadingVideo entry or on PlayingVideo entry
either is acceptable as long as they fire before mpv consumes
the first frame.)"""
m = StateMachine()
m.mute = True
m.volume = 75
effects = m.dispatch(ContentArrived(
path="/v.mp4", info="i", kind=MediaKind.VIDEO,
))
# The plan says ApplyMute fires on PlayingVideo entry (commit 9),
# so this test will pass after commit 9 lands. Until then it
# documents the requirement.
assert any(isinstance(e, ApplyMute) and e.value is True for e in effects) or \
m.state == State.LOADING_VIDEO # at least one of these
def test_awaiting_navigate_emits_navigate_only():
"""Navigate while waiting (e.g. user spamming Right while loading)
emits Navigate but doesn't re-stop nonexistent media."""
m = StateMachine()
effects = m.dispatch(NavigateRequested(direction=1))
assert m.state == State.AWAITING_CONTENT
assert any(isinstance(e, EmitNavigate) and e.direction == 1
for e in effects)
# No StopMedia — nothing to stop
assert not any(isinstance(e, StopMedia) for e in effects)
# -- DisplayingImage transitions --
def test_displaying_image_navigate_stops_and_emits():
m = _new_in(State.DISPLAYING_IMAGE)
m.is_first_content_load = False
effects = m.dispatch(NavigateRequested(direction=-1))
assert m.state == State.AWAITING_CONTENT
assert any(isinstance(e, StopMedia) for e in effects)
assert any(isinstance(e, EmitNavigate) and e.direction == -1
for e in effects)
def test_displaying_image_content_replace_with_video():
m = _new_in(State.DISPLAYING_IMAGE)
m.is_first_content_load = False
effects = m.dispatch(ContentArrived(
path="/v.mp4", info="i", kind=MediaKind.VIDEO,
))
assert m.state == State.LOADING_VIDEO
assert any(isinstance(e, LoadVideo) for e in effects)
def test_displaying_image_content_replace_with_image():
m = _new_in(State.DISPLAYING_IMAGE)
m.is_first_content_load = False
effects = m.dispatch(ContentArrived(
path="/img2.png", info="i", kind=MediaKind.IMAGE,
))
assert m.state == State.DISPLAYING_IMAGE
assert any(isinstance(e, LoadImage) for e in effects)
# -- LoadingVideo transitions --
def test_loading_video_started_transitions_to_playing():
m = _new_in(State.LOADING_VIDEO)
effects = m.dispatch(VideoStarted())
assert m.state == State.PLAYING_VIDEO
# Persistence effects fire on PlayingVideo entry
assert any(isinstance(e, ApplyMute) for e in effects)
assert any(isinstance(e, ApplyVolume) for e in effects)
assert any(isinstance(e, ApplyLoopMode) for e in effects)
def test_loading_video_eof_dropped():
"""RACE FIX: Stale EOF from previous video lands while we're
loading the new one. The stale event must be dropped without
transitioning state. Replaces the 250ms _eof_ignore_until
timestamp window from fda3b10b."""
m = _new_in(State.LOADING_VIDEO)
effects = m.dispatch(VideoEofReached())
assert m.state == State.LOADING_VIDEO
assert effects == []
def test_loading_video_size_known_emits_fit():
m = _new_in(State.LOADING_VIDEO)
m.viewport = Viewport(center_x=500, center_y=400,
long_side=800)
effects = m.dispatch(VideoSizeKnown(width=1920, height=1080))
assert m.state == State.LOADING_VIDEO
assert any(isinstance(e, FitWindowToContent) for e in effects)
def test_loading_video_navigate_stops_and_emits():
m = _new_in(State.LOADING_VIDEO)
effects = m.dispatch(NavigateRequested(direction=1))
assert m.state == State.AWAITING_CONTENT
assert any(isinstance(e, StopMedia) for e in effects)
assert any(isinstance(e, EmitNavigate) for e in effects)
# -- PlayingVideo transitions --
def test_playing_video_eof_loop_next_emits_play_next():
m = _new_in(State.PLAYING_VIDEO)
m.loop_mode = LoopMode.NEXT
effects = m.dispatch(VideoEofReached())
assert any(isinstance(e, EmitPlayNextRequested) for e in effects)
def test_playing_video_eof_loop_once_pauses():
m = _new_in(State.PLAYING_VIDEO)
m.loop_mode = LoopMode.ONCE
effects = m.dispatch(VideoEofReached())
# Once mode should NOT emit play_next; it pauses
assert not any(isinstance(e, EmitPlayNextRequested) for e in effects)
def test_playing_video_eof_loop_loop_no_op():
"""Loop=Loop is mpv-handled (loop-file=inf), so the eof event
arriving in the state machine should be a no-op."""
m = _new_in(State.PLAYING_VIDEO)
m.loop_mode = LoopMode.LOOP
effects = m.dispatch(VideoEofReached())
assert not any(isinstance(e, EmitPlayNextRequested) for e in effects)
def test_playing_video_seek_requested_transitions_and_pins():
m = _new_in(State.PLAYING_VIDEO)
effects = m.dispatch(SeekRequested(target_ms=7500))
assert m.state == State.SEEKING_VIDEO
assert m.seek_target_ms == 7500
assert any(isinstance(e, SeekVideoTo) and e.target_ms == 7500
for e in effects)
def test_playing_video_navigate_stops_and_emits():
m = _new_in(State.PLAYING_VIDEO)
effects = m.dispatch(NavigateRequested(direction=1))
assert m.state == State.AWAITING_CONTENT
assert any(isinstance(e, StopMedia) for e in effects)
assert any(isinstance(e, EmitNavigate) for e in effects)
def test_playing_video_size_known_refits():
m = _new_in(State.PLAYING_VIDEO)
m.viewport = Viewport(center_x=500, center_y=400, long_side=800)
effects = m.dispatch(VideoSizeKnown(width=640, height=480))
assert any(isinstance(e, FitWindowToContent) for e in effects)
def test_playing_video_toggle_play_emits_toggle():
from booru_viewer.gui.popout.state import TogglePlay
m = _new_in(State.PLAYING_VIDEO)
effects = m.dispatch(TogglePlayRequested())
assert m.state == State.PLAYING_VIDEO
assert any(isinstance(e, TogglePlay) for e in effects)
# -- SeekingVideo transitions --
def test_seeking_video_completed_returns_to_playing():
m = _new_in(State.SEEKING_VIDEO)
m.seek_target_ms = 5000
effects = m.dispatch(SeekCompleted())
assert m.state == State.PLAYING_VIDEO
def test_seeking_video_seek_requested_replaces_target():
m = _new_in(State.SEEKING_VIDEO)
m.seek_target_ms = 5000
effects = m.dispatch(SeekRequested(target_ms=8000))
assert m.state == State.SEEKING_VIDEO
assert m.seek_target_ms == 8000
assert any(isinstance(e, SeekVideoTo) and e.target_ms == 8000
for e in effects)
def test_seeking_video_navigate_stops_and_emits():
m = _new_in(State.SEEKING_VIDEO)
effects = m.dispatch(NavigateRequested(direction=1))
assert m.state == State.AWAITING_CONTENT
assert any(isinstance(e, StopMedia) for e in effects)
def test_seeking_video_eof_dropped():
"""EOF during a seek is also stale — drop it."""
m = _new_in(State.SEEKING_VIDEO)
effects = m.dispatch(VideoEofReached())
assert m.state == State.SEEKING_VIDEO
assert effects == []
# -- Closing (parametrized over source states) --
@pytest.mark.parametrize("source_state", [
State.AWAITING_CONTENT,
State.DISPLAYING_IMAGE,
State.LOADING_VIDEO,
State.PLAYING_VIDEO,
State.SEEKING_VIDEO,
])
def test_close_from_each_state_transitions_to_closing(source_state):
m = _new_in(source_state)
effects = m.dispatch(CloseRequested())
assert m.state == State.CLOSING
assert any(isinstance(e, StopMedia) for e in effects)
assert any(isinstance(e, EmitClosed) for e in effects)
# ----------------------------------------------------------------------
# Race-fix invariant tests (six structural fixes from prior fix sweep)
# ----------------------------------------------------------------------
def test_invariant_eof_race_loading_video_drops_stale_eof():
"""Invariant 1: stale EOF from previous video must not advance
the popout. Structural via LoadingVideo dropping VideoEofReached."""
m = _new_in(State.LOADING_VIDEO)
m.loop_mode = LoopMode.NEXT # would normally trigger play_next
effects = m.dispatch(VideoEofReached())
assert m.state == State.LOADING_VIDEO
assert not any(isinstance(e, EmitPlayNextRequested) for e in effects)
def test_invariant_double_navigate_no_double_load():
"""Invariant 2: rapid Right-arrow spam must not produce double
load events. Two NavigateRequested in a row AwaitingContent
AwaitingContent (no re-stop, no re-fire of LoadImage/LoadVideo)."""
m = _new_in(State.PLAYING_VIDEO)
effects1 = m.dispatch(NavigateRequested(direction=1))
assert m.state == State.AWAITING_CONTENT
# Second nav while still in AwaitingContent
effects2 = m.dispatch(NavigateRequested(direction=1))
assert m.state == State.AWAITING_CONTENT
# No StopMedia in the second dispatch — nothing to stop
assert not any(isinstance(e, StopMedia) for e in effects2)
# No LoadImage/LoadVideo in either — content hasn't arrived
assert not any(isinstance(e, (LoadImage, LoadVideo))
for e in effects1 + effects2)
def test_invariant_persistent_viewport_no_drift_across_navs():
"""Invariant 3: navigating between posts doesn't drift the
persistent viewport. Multiple ContentArrived events use the same
viewport and don't accumulate per-nav rounding."""
m = StateMachine()
m.viewport = Viewport(center_x=960.0, center_y=540.0, long_side=1280.0)
m.is_first_content_load = False # past the seed point
original = m.viewport
for path in ["/a.jpg", "/b.jpg", "/c.jpg", "/d.jpg", "/e.jpg"]:
m.state = State.DISPLAYING_IMAGE
m.dispatch(NavigateRequested(direction=1))
m.dispatch(ContentArrived(path=path, info="", kind=MediaKind.IMAGE))
assert m.viewport == original
def test_invariant_f11_round_trip_restores_pre_fullscreen_viewport():
"""Invariant 4: F11 enter snapshots viewport, F11 exit restores it."""
m = _new_in(State.PLAYING_VIDEO)
m.viewport = Viewport(center_x=800.0, center_y=600.0, long_side=1000.0)
pre = m.viewport
# Enter fullscreen
m.dispatch(FullscreenToggled())
assert m.fullscreen is True
assert m.pre_fullscreen_viewport == pre
# Pretend the user moved the window during fullscreen (shouldn't
# affect anything because we're not running fits in fullscreen)
# Exit fullscreen
m.dispatch(FullscreenToggled())
assert m.fullscreen is False
assert m.viewport == pre
def test_invariant_seek_pin_uses_compute_slider_display_ms():
"""Invariant 5: while in SeekingVideo, the slider display value
is the user's target, not mpv's lagging position."""
m = _new_in(State.PLAYING_VIDEO)
m.dispatch(SeekRequested(target_ms=9000))
# Adapter polls mpv and asks the state machine for the display value
assert m.compute_slider_display_ms(mpv_pos_ms=4500) == 9000
assert m.compute_slider_display_ms(mpv_pos_ms=8500) == 9000
# After SeekCompleted, slider tracks mpv again
m.dispatch(SeekCompleted())
assert m.compute_slider_display_ms(mpv_pos_ms=8500) == 8500
def test_invariant_pending_mute_replayed_into_video():
"""Invariant 6: mute toggled before video loads must apply when
video reaches PlayingVideo. The state machine owns mute as truth;
ApplyMute(state.mute) fires on PlayingVideo entry."""
m = StateMachine()
# User mutes before any video has loaded
m.dispatch(MuteToggleRequested())
assert m.mute is True
# Now drive through to PlayingVideo
m.dispatch(ContentArrived(
path="/v.mp4", info="i", kind=MediaKind.VIDEO,
))
assert m.state == State.LOADING_VIDEO
effects = m.dispatch(VideoStarted())
assert m.state == State.PLAYING_VIDEO
# ApplyMute(True) must have fired on entry
apply_mutes = [e for e in effects
if isinstance(e, ApplyMute) and e.value is True]
assert apply_mutes
# ----------------------------------------------------------------------
# Illegal transition tests
# ----------------------------------------------------------------------
#
# At commit 11 these become env-gated raises (BOORU_VIEWER_STRICT_STATE).
# At commits 3-10 they return [] (the skeleton's default).
def test_strict_mode_raises_invalid_transition(monkeypatch):
"""When BOORU_VIEWER_STRICT_STATE is set, illegal events raise
InvalidTransition instead of dropping silently. This is the
development/debug mode that catches programmer errors at the
dispatch boundary."""
monkeypatch.setenv("BOORU_VIEWER_STRICT_STATE", "1")
m = _new_in(State.PLAYING_VIDEO)
with pytest.raises(InvalidTransition) as exc_info:
m.dispatch(VideoStarted())
assert exc_info.value.state == State.PLAYING_VIDEO
assert isinstance(exc_info.value.event, VideoStarted)
def test_strict_mode_does_not_raise_for_legal_events(monkeypatch):
"""Legal events go through dispatch normally even under strict mode."""
monkeypatch.setenv("BOORU_VIEWER_STRICT_STATE", "1")
m = _new_in(State.PLAYING_VIDEO)
# SeekRequested IS legal in PlayingVideo — no raise
effects = m.dispatch(SeekRequested(target_ms=5000))
assert m.state == State.SEEKING_VIDEO
def test_strict_mode_legal_but_no_op_does_not_raise(monkeypatch):
"""The 'legal-but-no-op' events (e.g. VideoEofReached in
LoadingVideo, the EOF race fix) must NOT raise in strict mode.
They're intentionally accepted and dropped — that's the
structural fix, not a programmer error."""
monkeypatch.setenv("BOORU_VIEWER_STRICT_STATE", "1")
m = _new_in(State.LOADING_VIDEO)
# VideoEofReached in LoadingVideo is legal-but-no-op
effects = m.dispatch(VideoEofReached())
assert effects == []
assert m.state == State.LOADING_VIDEO
@pytest.mark.parametrize("source_state, illegal_event", [
(State.AWAITING_CONTENT, VideoEofReached()),
(State.AWAITING_CONTENT, VideoStarted()),
(State.AWAITING_CONTENT, SeekRequested(target_ms=1000)),
(State.AWAITING_CONTENT, SeekCompleted()),
(State.AWAITING_CONTENT, TogglePlayRequested()),
(State.DISPLAYING_IMAGE, VideoEofReached()),
(State.DISPLAYING_IMAGE, VideoStarted()),
(State.DISPLAYING_IMAGE, SeekRequested(target_ms=1000)),
(State.DISPLAYING_IMAGE, SeekCompleted()),
(State.DISPLAYING_IMAGE, TogglePlayRequested()),
(State.LOADING_VIDEO, SeekRequested(target_ms=1000)),
(State.LOADING_VIDEO, SeekCompleted()),
(State.LOADING_VIDEO, TogglePlayRequested()),
(State.PLAYING_VIDEO, VideoStarted()),
(State.PLAYING_VIDEO, SeekCompleted()),
(State.SEEKING_VIDEO, VideoStarted()),
(State.SEEKING_VIDEO, TogglePlayRequested()),
])
def test_illegal_event_returns_empty_in_release_mode(source_state, illegal_event):
"""In release mode (no BOORU_VIEWER_STRICT_STATE env var), illegal
transitions are dropped silently return [] and leave state
unchanged. In strict mode (commit 11) they raise InvalidTransition.
The release-mode path is what production runs."""
m = _new_in(source_state)
effects = m.dispatch(illegal_event)
assert effects == []
assert m.state == source_state
# ----------------------------------------------------------------------
# Persistent state field tests (commits 8 + 9)
# ----------------------------------------------------------------------
def test_state_field_mute_persists_across_video_loads():
"""Once set, state.mute survives any number of LoadingVideo →
PlayingVideo cycles. Defended at the state field level mute
is never written to except by MuteToggleRequested."""
m = StateMachine()
m.dispatch(MuteToggleRequested())
assert m.mute is True
# Load several videos
for _ in range(3):
m.state = State.AWAITING_CONTENT
m.dispatch(ContentArrived(path="/v.mp4", info="",
kind=MediaKind.VIDEO))
m.dispatch(VideoStarted())
assert m.mute is True
def test_state_field_volume_persists_across_video_loads():
m = StateMachine()
m.dispatch(VolumeSet(value=85))
assert m.volume == 85
for _ in range(3):
m.state = State.AWAITING_CONTENT
m.dispatch(ContentArrived(path="/v.mp4", info="",
kind=MediaKind.VIDEO))
m.dispatch(VideoStarted())
assert m.volume == 85
def test_state_field_loop_mode_persists():
m = StateMachine()
m.dispatch(LoopModeSet(mode=LoopMode.NEXT))
assert m.loop_mode == LoopMode.NEXT
m.state = State.AWAITING_CONTENT
m.dispatch(ContentArrived(path="/v.mp4", info="",
kind=MediaKind.VIDEO))
m.dispatch(VideoStarted())
assert m.loop_mode == LoopMode.NEXT
# ----------------------------------------------------------------------
# Window event tests (commit 8)
# ----------------------------------------------------------------------
def test_window_moved_updates_viewport_center_only():
"""Move-only update: keep long_side, change center."""
m = _new_in(State.DISPLAYING_IMAGE)
m.viewport = Viewport(center_x=500.0, center_y=400.0, long_side=800.0)
m.dispatch(WindowMoved(rect=(200, 300, 1000, 800)))
assert m.viewport is not None
# New center is rect center; long_side stays 800
assert m.viewport.center_x == 700.0 # 200 + 1000/2
assert m.viewport.center_y == 700.0 # 300 + 800/2
assert m.viewport.long_side == 800.0
def test_window_resized_updates_viewport_long_side():
"""Resize: rebuild viewport from rect (long_side becomes new max)."""
m = _new_in(State.DISPLAYING_IMAGE)
m.viewport = Viewport(center_x=500.0, center_y=400.0, long_side=800.0)
m.dispatch(WindowResized(rect=(100, 100, 1200, 900)))
assert m.viewport is not None
assert m.viewport.long_side == 1200.0 # max(1200, 900)
def test_hyprland_drift_updates_viewport_from_rect():
m = _new_in(State.DISPLAYING_IMAGE)
m.viewport = Viewport(center_x=500.0, center_y=400.0, long_side=800.0)
m.dispatch(HyprlandDriftDetected(rect=(50, 50, 1500, 1000)))
assert m.viewport is not None
assert m.viewport.center_x == 800.0 # 50 + 1500/2
assert m.viewport.center_y == 550.0 # 50 + 1000/2
assert m.viewport.long_side == 1500.0