ship tests/ (81 tests, was gitignored)
Remove tests/ from .gitignore and track the existing test suite: tests/core/test_db.py — DB schema, migration, CRUD tests/core/test_cache.py — cache helpers tests/core/test_config.py — config/path helpers tests/core/test_concurrency.py — app loop accessor tests/core/api/test_base.py — Post dataclass, BooruClient tests/gui/popout/test_state.py — 57 state machine tests All pure Python, no secrets, no external deps. Uses temp DBs and synthetic data. Run with: pytest tests/
This commit is contained in:
parent
9a8e6037c3
commit
ecda09152c
1
.gitignore
vendored
1
.gitignore
vendored
@ -9,6 +9,5 @@ build/
|
|||||||
venv/
|
venv/
|
||||||
docs/
|
docs/
|
||||||
project.md
|
project.md
|
||||||
tests/
|
|
||||||
*.bak/
|
*.bak/
|
||||||
*.dll
|
*.dll
|
||||||
|
|||||||
0
tests/__init__.py
Normal file
0
tests/__init__.py
Normal file
71
tests/conftest.py
Normal file
71
tests/conftest.py
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
"""Shared fixtures for the booru-viewer test suite.
|
||||||
|
|
||||||
|
All fixtures here are pure-Python — no Qt, no mpv, no network. Filesystem
|
||||||
|
writes go through `tmp_path` (or fixtures that wrap it). Module-level globals
|
||||||
|
that the production code mutates (the concurrency loop, the httpx singletons)
|
||||||
|
get reset around each test that touches them.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def tmp_db(tmp_path):
|
||||||
|
"""Fresh `Database` instance writing to a temp file. Auto-closes."""
|
||||||
|
from booru_viewer.core.db import Database
|
||||||
|
db = Database(tmp_path / "test.db")
|
||||||
|
yield db
|
||||||
|
db.close()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def tmp_library(tmp_path):
|
||||||
|
"""Point `saved_dir()` at `tmp_path/saved` for the duration of the test.
|
||||||
|
|
||||||
|
Uses `core.config.set_library_dir` (the official override hook) so the
|
||||||
|
redirect goes through the same code path the GUI uses for the
|
||||||
|
user-configurable library location. Tear-down restores the previous
|
||||||
|
value so tests can run in any order without bleed.
|
||||||
|
"""
|
||||||
|
from booru_viewer.core import config
|
||||||
|
saved = tmp_path / "saved"
|
||||||
|
saved.mkdir()
|
||||||
|
original = config._library_dir_override
|
||||||
|
config.set_library_dir(saved)
|
||||||
|
yield saved
|
||||||
|
config.set_library_dir(original)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def reset_app_loop():
|
||||||
|
"""Reset `concurrency._app_loop` between tests.
|
||||||
|
|
||||||
|
The module global is set once at app startup in production; tests need
|
||||||
|
to start from a clean slate to assert the unset-state behavior.
|
||||||
|
"""
|
||||||
|
from booru_viewer.core import concurrency
|
||||||
|
original = concurrency._app_loop
|
||||||
|
concurrency._app_loop = None
|
||||||
|
yield
|
||||||
|
concurrency._app_loop = original
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def reset_shared_clients():
|
||||||
|
"""Reset both shared httpx singletons (cache module + BooruClient class).
|
||||||
|
|
||||||
|
Both are class/module-level globals; tests that exercise the lazy-init
|
||||||
|
+ lock pattern need them cleared so the test sees a fresh first-call
|
||||||
|
race instead of a leftover instance from a previous test.
|
||||||
|
"""
|
||||||
|
from booru_viewer.core.api.base import BooruClient
|
||||||
|
from booru_viewer.core import cache
|
||||||
|
original_booru = BooruClient._shared_client
|
||||||
|
original_cache = cache._shared_client
|
||||||
|
BooruClient._shared_client = None
|
||||||
|
cache._shared_client = None
|
||||||
|
yield
|
||||||
|
BooruClient._shared_client = original_booru
|
||||||
|
cache._shared_client = original_cache
|
||||||
0
tests/core/__init__.py
Normal file
0
tests/core/__init__.py
Normal file
0
tests/core/api/__init__.py
Normal file
0
tests/core/api/__init__.py
Normal file
77
tests/core/api/test_base.py
Normal file
77
tests/core/api/test_base.py
Normal file
@ -0,0 +1,77 @@
|
|||||||
|
"""Tests for `booru_viewer.core.api.base` — the lazy `_shared_client`
|
||||||
|
singleton on `BooruClient`.
|
||||||
|
|
||||||
|
Locks in the lock-and-recheck pattern at `base.py:90-108`. Without it,
|
||||||
|
two threads racing on first `.client` access would both see
|
||||||
|
`_shared_client is None`, both build an `httpx.AsyncClient`, and one of
|
||||||
|
them would leak (overwritten without aclose).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import threading
|
||||||
|
from unittest.mock import patch, MagicMock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from booru_viewer.core.api.base import BooruClient
|
||||||
|
|
||||||
|
|
||||||
|
class _StubClient(BooruClient):
|
||||||
|
"""Concrete subclass so we can instantiate `BooruClient` for the test
|
||||||
|
— the base class has abstract `search` / `get_post` methods."""
|
||||||
|
api_type = "stub"
|
||||||
|
|
||||||
|
async def search(self, tags="", page=1, limit=40):
|
||||||
|
return []
|
||||||
|
|
||||||
|
async def get_post(self, post_id):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def test_shared_client_singleton_under_concurrency(reset_shared_clients):
|
||||||
|
"""N threads racing on first `.client` access must result in exactly
|
||||||
|
one `httpx.AsyncClient` constructor call. The threading.Lock guards
|
||||||
|
the check-and-set so the second-and-later callers re-read the now-set
|
||||||
|
`_shared_client` after acquiring the lock instead of building their
|
||||||
|
own."""
|
||||||
|
constructor_calls = 0
|
||||||
|
constructor_lock = threading.Lock()
|
||||||
|
|
||||||
|
def _fake_async_client(*args, **kwargs):
|
||||||
|
nonlocal constructor_calls
|
||||||
|
with constructor_lock:
|
||||||
|
constructor_calls += 1
|
||||||
|
m = MagicMock()
|
||||||
|
m.is_closed = False
|
||||||
|
return m
|
||||||
|
|
||||||
|
# Barrier so all threads hit the property at the same moment
|
||||||
|
n_threads = 10
|
||||||
|
barrier = threading.Barrier(n_threads)
|
||||||
|
results = []
|
||||||
|
results_lock = threading.Lock()
|
||||||
|
|
||||||
|
client_instance = _StubClient("http://example.test")
|
||||||
|
|
||||||
|
def _worker():
|
||||||
|
barrier.wait()
|
||||||
|
c = client_instance.client
|
||||||
|
with results_lock:
|
||||||
|
results.append(c)
|
||||||
|
|
||||||
|
with patch("booru_viewer.core.api.base.httpx.AsyncClient",
|
||||||
|
side_effect=_fake_async_client):
|
||||||
|
threads = [threading.Thread(target=_worker) for _ in range(n_threads)]
|
||||||
|
for t in threads:
|
||||||
|
t.start()
|
||||||
|
for t in threads:
|
||||||
|
t.join(timeout=5)
|
||||||
|
|
||||||
|
assert constructor_calls == 1, (
|
||||||
|
f"Expected exactly one httpx.AsyncClient construction, "
|
||||||
|
f"got {constructor_calls}"
|
||||||
|
)
|
||||||
|
# All threads got back the same shared instance
|
||||||
|
assert len(results) == n_threads
|
||||||
|
assert all(r is results[0] for r in results)
|
||||||
224
tests/core/test_cache.py
Normal file
224
tests/core/test_cache.py
Normal file
@ -0,0 +1,224 @@
|
|||||||
|
"""Tests for `booru_viewer.core.cache` — Referer hostname matching, ugoira
|
||||||
|
zip-bomb defenses, download size caps, and validity-check fallback.
|
||||||
|
|
||||||
|
Locks in:
|
||||||
|
- `_referer_for` proper hostname suffix matching (`54ccc40` security fix)
|
||||||
|
guarding against `imgblahgelbooru.attacker.com` mapping to gelbooru.com
|
||||||
|
- `_convert_ugoira_to_gif` cap enforcement (frame count + uncompressed size)
|
||||||
|
before any decompression — defense against ugoira zip bombs
|
||||||
|
- `_do_download` MAX_DOWNLOAD_BYTES enforcement, both the Content-Length
|
||||||
|
pre-check and the running-total chunk-loop guard
|
||||||
|
- `_is_valid_media` returning True on OSError so a transient EBUSY/lock
|
||||||
|
doesn't kick off a delete + re-download loop
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import io
|
||||||
|
import zipfile
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import patch
|
||||||
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from booru_viewer.core import cache
|
||||||
|
from booru_viewer.core.cache import (
|
||||||
|
MAX_DOWNLOAD_BYTES,
|
||||||
|
_convert_ugoira_to_gif,
|
||||||
|
_do_download,
|
||||||
|
_is_valid_media,
|
||||||
|
_referer_for,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# -- _referer_for hostname suffix matching --
|
||||||
|
|
||||||
|
def test_referer_for_exact_and_suffix_match():
|
||||||
|
"""Real booru hostnames map to the canonical Referer for their CDN.
|
||||||
|
|
||||||
|
Exact match and subdomain-suffix match both rewrite the Referer host
|
||||||
|
to the canonical apex (gelbooru → `gelbooru.com`, donmai →
|
||||||
|
`danbooru.donmai.us`). The actual request netloc is dropped — the
|
||||||
|
point is to look like a navigation from the canonical site.
|
||||||
|
"""
|
||||||
|
# gelbooru exact host
|
||||||
|
assert _referer_for(urlparse("https://gelbooru.com/index.php")) \
|
||||||
|
== "https://gelbooru.com/"
|
||||||
|
# gelbooru subdomain rewrites to the canonical apex
|
||||||
|
assert _referer_for(urlparse("https://img3.gelbooru.com/images/abc.jpg")) \
|
||||||
|
== "https://gelbooru.com/"
|
||||||
|
|
||||||
|
# donmai exact host
|
||||||
|
assert _referer_for(urlparse("https://donmai.us/posts/123")) \
|
||||||
|
== "https://danbooru.donmai.us/"
|
||||||
|
# donmai subdomain rewrites to the canonical danbooru host
|
||||||
|
assert _referer_for(urlparse("https://safebooru.donmai.us/posts/123")) \
|
||||||
|
== "https://danbooru.donmai.us/"
|
||||||
|
|
||||||
|
|
||||||
|
def test_referer_for_rejects_substring_attacker():
|
||||||
|
"""An attacker host that contains `gelbooru.com` or `donmai.us` as a
|
||||||
|
SUBSTRING (not a hostname suffix) must NOT pick up the booru Referer.
|
||||||
|
|
||||||
|
Without proper suffix matching, `imgblahgelbooru.attacker.com` would
|
||||||
|
leak the gelbooru Referer to the attacker — that's the `54ccc40`
|
||||||
|
security fix.
|
||||||
|
"""
|
||||||
|
# Attacker host that ends with attacker-controlled TLD
|
||||||
|
parsed = urlparse("https://imgblahgelbooru.attacker.com/x.jpg")
|
||||||
|
referer = _referer_for(parsed)
|
||||||
|
assert "gelbooru.com" not in referer
|
||||||
|
assert "imgblahgelbooru.attacker.com" in referer
|
||||||
|
|
||||||
|
parsed = urlparse("https://donmai.us.attacker.com/x.jpg")
|
||||||
|
referer = _referer_for(parsed)
|
||||||
|
assert "danbooru.donmai.us" not in referer
|
||||||
|
assert "donmai.us.attacker.com" in referer
|
||||||
|
|
||||||
|
# Completely unrelated host preserved as-is
|
||||||
|
parsed = urlparse("https://example.test/x.jpg")
|
||||||
|
assert _referer_for(parsed) == "https://example.test/"
|
||||||
|
|
||||||
|
|
||||||
|
# -- Ugoira zip-bomb defenses --
|
||||||
|
|
||||||
|
def _build_ugoira_zip(path: Path, n_frames: int, frame_bytes: bytes = b"x") -> Path:
|
||||||
|
"""Build a synthetic ugoira-shaped zip with `n_frames` numbered .jpg
|
||||||
|
entries. Content is whatever the caller passes; defaults to 1 byte.
|
||||||
|
|
||||||
|
The cap-enforcement tests don't need decodable JPEGs — the cap fires
|
||||||
|
before any decode happens. The filenames just need .jpg suffixes so
|
||||||
|
`_convert_ugoira_to_gif` recognizes them as frames.
|
||||||
|
"""
|
||||||
|
with zipfile.ZipFile(path, "w") as zf:
|
||||||
|
for i in range(n_frames):
|
||||||
|
zf.writestr(f"{i:04d}.jpg", frame_bytes)
|
||||||
|
return path
|
||||||
|
|
||||||
|
|
||||||
|
def test_ugoira_frame_count_cap_rejects_bomb(tmp_path, monkeypatch):
|
||||||
|
"""A zip with more than `UGOIRA_MAX_FRAMES` frames must be refused
|
||||||
|
BEFORE any decompression. We monkeypatch the cap down so the test
|
||||||
|
builds a tiny zip instead of a 5001-entry one — the cap check is
|
||||||
|
cap > N, not cap == 5000."""
|
||||||
|
monkeypatch.setattr(cache, "UGOIRA_MAX_FRAMES", 2)
|
||||||
|
zip_path = _build_ugoira_zip(tmp_path / "bomb.zip", n_frames=3)
|
||||||
|
gif_path = zip_path.with_suffix(".gif")
|
||||||
|
|
||||||
|
result = _convert_ugoira_to_gif(zip_path)
|
||||||
|
|
||||||
|
# Function returned the original zip (refusal path)
|
||||||
|
assert result == zip_path
|
||||||
|
# No .gif was written
|
||||||
|
assert not gif_path.exists()
|
||||||
|
|
||||||
|
|
||||||
|
def test_ugoira_uncompressed_size_cap_rejects_bomb(tmp_path, monkeypatch):
|
||||||
|
"""A zip whose `ZipInfo.file_size` headers sum past
|
||||||
|
`UGOIRA_MAX_UNCOMPRESSED_BYTES` must be refused before decompression.
|
||||||
|
Same monkeypatch trick to keep the test data small."""
|
||||||
|
monkeypatch.setattr(cache, "UGOIRA_MAX_UNCOMPRESSED_BYTES", 50)
|
||||||
|
# Three 100-byte frames → 300 total > 50 cap
|
||||||
|
zip_path = _build_ugoira_zip(
|
||||||
|
tmp_path / "bomb.zip", n_frames=3, frame_bytes=b"x" * 100
|
||||||
|
)
|
||||||
|
gif_path = zip_path.with_suffix(".gif")
|
||||||
|
|
||||||
|
result = _convert_ugoira_to_gif(zip_path)
|
||||||
|
|
||||||
|
assert result == zip_path
|
||||||
|
assert not gif_path.exists()
|
||||||
|
|
||||||
|
|
||||||
|
# -- _do_download MAX_DOWNLOAD_BYTES caps --
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeHeaders:
|
||||||
|
def __init__(self, mapping):
|
||||||
|
self._m = mapping
|
||||||
|
def get(self, key, default=None):
|
||||||
|
return self._m.get(key.lower(), default)
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeResponse:
|
||||||
|
def __init__(self, headers, chunks):
|
||||||
|
self.headers = _FakeHeaders({k.lower(): v for k, v in headers.items()})
|
||||||
|
self._chunks = chunks
|
||||||
|
def raise_for_status(self):
|
||||||
|
pass
|
||||||
|
async def aiter_bytes(self, _size):
|
||||||
|
for chunk in self._chunks:
|
||||||
|
yield chunk
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeStreamCtx:
|
||||||
|
def __init__(self, response):
|
||||||
|
self._resp = response
|
||||||
|
async def __aenter__(self):
|
||||||
|
return self._resp
|
||||||
|
async def __aexit__(self, *_args):
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeClient:
|
||||||
|
def __init__(self, response):
|
||||||
|
self._resp = response
|
||||||
|
def stream(self, _method, _url, headers=None):
|
||||||
|
return _FakeStreamCtx(self._resp)
|
||||||
|
|
||||||
|
|
||||||
|
def test_download_cap_content_length_pre_check(tmp_path):
|
||||||
|
"""When the server advertises a Content-Length larger than
|
||||||
|
MAX_DOWNLOAD_BYTES, `_do_download` must raise BEFORE iterating any
|
||||||
|
bytes. This is the cheap pre-check that protects against the trivial
|
||||||
|
OOM/disk-fill attack — we don't even start streaming."""
|
||||||
|
too_big = MAX_DOWNLOAD_BYTES + 1
|
||||||
|
response = _FakeResponse(
|
||||||
|
headers={"content-type": "image/jpeg", "content-length": str(too_big)},
|
||||||
|
chunks=[b"never read"],
|
||||||
|
)
|
||||||
|
client = _FakeClient(response)
|
||||||
|
local = tmp_path / "out.jpg"
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="Download too large"):
|
||||||
|
asyncio.run(_do_download(client, "http://example.test/x.jpg", {}, local, None))
|
||||||
|
|
||||||
|
# No file should have been written
|
||||||
|
assert not local.exists()
|
||||||
|
|
||||||
|
|
||||||
|
def test_download_cap_running_total_aborts(tmp_path, monkeypatch):
|
||||||
|
"""Servers can lie about Content-Length. The chunk loop must enforce
|
||||||
|
the running-total cap independently and abort mid-stream as soon as
|
||||||
|
cumulative bytes exceed `MAX_DOWNLOAD_BYTES`. We monkeypatch the cap
|
||||||
|
down to 1024 to keep the test fast."""
|
||||||
|
monkeypatch.setattr(cache, "MAX_DOWNLOAD_BYTES", 1024)
|
||||||
|
# Advertise 0 (unknown) so the small-payload branch runs and the
|
||||||
|
# running-total guard inside the chunk loop is what fires.
|
||||||
|
response = _FakeResponse(
|
||||||
|
headers={"content-type": "image/jpeg", "content-length": "0"},
|
||||||
|
chunks=[b"x" * 600, b"x" * 600], # 1200 total > 1024 cap
|
||||||
|
)
|
||||||
|
client = _FakeClient(response)
|
||||||
|
local = tmp_path / "out.jpg"
|
||||||
|
|
||||||
|
with pytest.raises(ValueError, match="exceeded cap mid-stream"):
|
||||||
|
asyncio.run(_do_download(client, "http://example.test/x.jpg", {}, local, None))
|
||||||
|
|
||||||
|
# The buffered-write path only writes after the loop finishes, so the
|
||||||
|
# mid-stream abort means no file lands on disk.
|
||||||
|
assert not local.exists()
|
||||||
|
|
||||||
|
|
||||||
|
# -- _is_valid_media OSError fallback --
|
||||||
|
|
||||||
|
def test_is_valid_media_returns_true_on_oserror(tmp_path):
|
||||||
|
"""If the file can't be opened (transient EBUSY, lock, permissions),
|
||||||
|
`_is_valid_media` must return True so the caller doesn't delete the
|
||||||
|
cached file. The previous behavior of returning False kicked off a
|
||||||
|
delete + re-download loop on every access while the underlying
|
||||||
|
OS issue persisted."""
|
||||||
|
nonexistent = tmp_path / "definitely-not-here.jpg"
|
||||||
|
assert _is_valid_media(nonexistent) is True
|
||||||
62
tests/core/test_concurrency.py
Normal file
62
tests/core/test_concurrency.py
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
"""Tests for `booru_viewer.core.concurrency` — the persistent-loop handle.
|
||||||
|
|
||||||
|
Locks in:
|
||||||
|
- `get_app_loop` raises a clear RuntimeError if `set_app_loop` was never
|
||||||
|
called (the production code uses this to bail loudly when async work
|
||||||
|
is scheduled before the loop thread starts)
|
||||||
|
- `run_on_app_loop` round-trips a coroutine result from a worker-thread
|
||||||
|
loop back to the calling thread via `concurrent.futures.Future`
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import threading
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from booru_viewer.core import concurrency
|
||||||
|
from booru_viewer.core.concurrency import (
|
||||||
|
get_app_loop,
|
||||||
|
run_on_app_loop,
|
||||||
|
set_app_loop,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_app_loop_raises_before_set(reset_app_loop):
|
||||||
|
"""Calling `get_app_loop` before `set_app_loop` is a configuration
|
||||||
|
error — the production code expects a clear RuntimeError so callers
|
||||||
|
bail loudly instead of silently scheduling work onto a None loop."""
|
||||||
|
with pytest.raises(RuntimeError, match="not initialized"):
|
||||||
|
get_app_loop()
|
||||||
|
|
||||||
|
|
||||||
|
def test_run_on_app_loop_round_trips_result(reset_app_loop):
|
||||||
|
"""Spin up a real asyncio loop in a worker thread, register it via
|
||||||
|
`set_app_loop`, then from the test (main) thread schedule a coroutine
|
||||||
|
via `run_on_app_loop` and assert the result comes back through the
|
||||||
|
`concurrent.futures.Future` interface."""
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
ready = threading.Event()
|
||||||
|
|
||||||
|
def _run_loop():
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
ready.set()
|
||||||
|
loop.run_forever()
|
||||||
|
|
||||||
|
t = threading.Thread(target=_run_loop, daemon=True)
|
||||||
|
t.start()
|
||||||
|
ready.wait(timeout=2)
|
||||||
|
|
||||||
|
try:
|
||||||
|
set_app_loop(loop)
|
||||||
|
|
||||||
|
async def _produce():
|
||||||
|
return 42
|
||||||
|
|
||||||
|
fut = run_on_app_loop(_produce())
|
||||||
|
assert fut.result(timeout=2) == 42
|
||||||
|
finally:
|
||||||
|
loop.call_soon_threadsafe(loop.stop)
|
||||||
|
t.join(timeout=2)
|
||||||
|
loop.close()
|
||||||
57
tests/core/test_config.py
Normal file
57
tests/core/test_config.py
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
"""Tests for `booru_viewer.core.config` — path traversal guard on
|
||||||
|
`saved_folder_dir` and the shallow walk in `find_library_files`.
|
||||||
|
|
||||||
|
Locks in:
|
||||||
|
- `saved_folder_dir` resolve-and-relative_to check (`54ccc40` defense in
|
||||||
|
depth alongside `_validate_folder_name`)
|
||||||
|
- `find_library_files` matching exactly the root + 1-level subdirectory
|
||||||
|
layout that the library uses, with the right MEDIA_EXTENSIONS filter
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from booru_viewer.core import config
|
||||||
|
from booru_viewer.core.config import find_library_files, saved_folder_dir
|
||||||
|
|
||||||
|
|
||||||
|
# -- saved_folder_dir traversal guard --
|
||||||
|
|
||||||
|
def test_saved_folder_dir_rejects_dotdot(tmp_library):
|
||||||
|
"""`..` and any path that resolves outside `saved_dir()` must raise
|
||||||
|
ValueError, not silently mkdir somewhere unexpected. We test literal
|
||||||
|
`..` shapes only — symlink escapes are filesystem-dependent and
|
||||||
|
flaky in tests."""
|
||||||
|
with pytest.raises(ValueError, match="escapes saved directory"):
|
||||||
|
saved_folder_dir("..")
|
||||||
|
with pytest.raises(ValueError, match="escapes saved directory"):
|
||||||
|
saved_folder_dir("../escape")
|
||||||
|
with pytest.raises(ValueError, match="escapes saved directory"):
|
||||||
|
saved_folder_dir("foo/../..")
|
||||||
|
|
||||||
|
|
||||||
|
# -- find_library_files shallow walk --
|
||||||
|
|
||||||
|
def test_find_library_files_walks_root_and_one_level(tmp_library):
|
||||||
|
"""Library has a flat shape: `saved/<post_id>.<ext>` at the root, or
|
||||||
|
`saved/<folder>/<post_id>.<ext>` one level deep. The walk must:
|
||||||
|
- find matches at both depths
|
||||||
|
- filter by MEDIA_EXTENSIONS (skip .txt and other non-media)
|
||||||
|
- filter by exact stem (skip unrelated post ids)
|
||||||
|
"""
|
||||||
|
# Root-level match
|
||||||
|
(tmp_library / "123.jpg").write_bytes(b"")
|
||||||
|
# One-level subfolder match
|
||||||
|
(tmp_library / "folder1").mkdir()
|
||||||
|
(tmp_library / "folder1" / "123.png").write_bytes(b"")
|
||||||
|
# Different post id — must be excluded
|
||||||
|
(tmp_library / "folder2").mkdir()
|
||||||
|
(tmp_library / "folder2" / "456.gif").write_bytes(b"")
|
||||||
|
# Wrong extension — must be excluded even with the right stem
|
||||||
|
(tmp_library / "123.txt").write_bytes(b"")
|
||||||
|
|
||||||
|
matches = find_library_files(123)
|
||||||
|
match_names = {p.name for p in matches}
|
||||||
|
|
||||||
|
assert match_names == {"123.jpg", "123.png"}
|
||||||
98
tests/core/test_db.py
Normal file
98
tests/core/test_db.py
Normal file
@ -0,0 +1,98 @@
|
|||||||
|
"""Tests for `booru_viewer.core.db` — folder name validation, INSERT OR
|
||||||
|
IGNORE collision handling, and LIKE escaping.
|
||||||
|
|
||||||
|
These tests lock in the `54ccc40` security/correctness fixes:
|
||||||
|
- `_validate_folder_name` rejects path-traversal shapes before they hit the
|
||||||
|
filesystem in `saved_folder_dir`
|
||||||
|
- `add_bookmark` re-SELECTs the actual row id after an INSERT OR IGNORE
|
||||||
|
collision so the returned `Bookmark.id` is never the bogus 0 that broke
|
||||||
|
`update_bookmark_cache_path`
|
||||||
|
- `get_bookmarks` escapes the SQL LIKE wildcards `_` and `%` so a search for
|
||||||
|
`cat_ear` doesn't bleed into `catear` / `catXear`
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from booru_viewer.core.db import _validate_folder_name
|
||||||
|
|
||||||
|
|
||||||
|
# -- _validate_folder_name --
|
||||||
|
|
||||||
|
def test_validate_folder_name_rejects_traversal():
|
||||||
|
"""Every shape that could escape the saved-images dir or hit a hidden
|
||||||
|
file must raise ValueError. One assertion per rejection rule so a
|
||||||
|
failure points at the exact case."""
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
_validate_folder_name("") # empty
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
_validate_folder_name("..") # dotdot literal
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
_validate_folder_name(".") # dot literal
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
_validate_folder_name("/foo") # forward slash
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
_validate_folder_name("foo/bar") # embedded forward slash
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
_validate_folder_name("\\foo") # backslash
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
_validate_folder_name(".hidden") # leading dot
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
_validate_folder_name("~user") # leading tilde
|
||||||
|
|
||||||
|
|
||||||
|
def test_validate_folder_name_accepts_unicode_and_punctuation():
|
||||||
|
"""Common real-world folder names must pass through unchanged. The
|
||||||
|
guard is meant to block escape shapes, not normal naming."""
|
||||||
|
assert _validate_folder_name("miku(lewd)") == "miku(lewd)"
|
||||||
|
assert _validate_folder_name("cat ear") == "cat ear"
|
||||||
|
assert _validate_folder_name("日本語") == "日本語"
|
||||||
|
assert _validate_folder_name("foo-bar") == "foo-bar"
|
||||||
|
assert _validate_folder_name("foo.bar") == "foo.bar" # dot OK if not leading
|
||||||
|
|
||||||
|
|
||||||
|
# -- add_bookmark INSERT OR IGNORE collision --
|
||||||
|
|
||||||
|
def test_add_bookmark_collision_returns_existing_id(tmp_db):
|
||||||
|
"""Calling `add_bookmark` twice with the same (site_id, post_id) must
|
||||||
|
return the same row id on the second call, not the stale `lastrowid`
|
||||||
|
of 0 that INSERT OR IGNORE leaves behind. Without the re-SELECT fix,
|
||||||
|
any downstream `update_bookmark_cache_path(id=0, ...)` silently
|
||||||
|
no-ops, breaking the cache-path linkage."""
|
||||||
|
site = tmp_db.add_site("test", "http://example.test", "danbooru")
|
||||||
|
bm1 = tmp_db.add_bookmark(
|
||||||
|
site_id=site.id, post_id=42, file_url="http://example.test/42.jpg",
|
||||||
|
preview_url=None, tags="cat",
|
||||||
|
)
|
||||||
|
bm2 = tmp_db.add_bookmark(
|
||||||
|
site_id=site.id, post_id=42, file_url="http://example.test/42.jpg",
|
||||||
|
preview_url=None, tags="cat",
|
||||||
|
)
|
||||||
|
assert bm1.id != 0
|
||||||
|
assert bm2.id == bm1.id
|
||||||
|
|
||||||
|
|
||||||
|
# -- get_bookmarks LIKE escaping --
|
||||||
|
|
||||||
|
def test_get_bookmarks_like_escaping(tmp_db):
|
||||||
|
"""A search for the literal tag `cat_ear` must NOT match `catear` or
|
||||||
|
`catXear`. SQLite's LIKE treats `_` as a single-char wildcard unless
|
||||||
|
explicitly escaped — without `ESCAPE '\\\\'` the search would return
|
||||||
|
all three rows."""
|
||||||
|
site = tmp_db.add_site("test", "http://example.test", "danbooru")
|
||||||
|
tmp_db.add_bookmark(
|
||||||
|
site_id=site.id, post_id=1, file_url="http://example.test/1.jpg",
|
||||||
|
preview_url=None, tags="cat_ear",
|
||||||
|
)
|
||||||
|
tmp_db.add_bookmark(
|
||||||
|
site_id=site.id, post_id=2, file_url="http://example.test/2.jpg",
|
||||||
|
preview_url=None, tags="catear",
|
||||||
|
)
|
||||||
|
tmp_db.add_bookmark(
|
||||||
|
site_id=site.id, post_id=3, file_url="http://example.test/3.jpg",
|
||||||
|
preview_url=None, tags="catXear",
|
||||||
|
)
|
||||||
|
results = tmp_db.get_bookmarks(search="cat_ear")
|
||||||
|
tags_returned = {b.tags for b in results}
|
||||||
|
assert tags_returned == {"cat_ear"}
|
||||||
0
tests/gui/__init__.py
Normal file
0
tests/gui/__init__.py
Normal file
0
tests/gui/popout/__init__.py
Normal file
0
tests/gui/popout/__init__.py
Normal file
661
tests/gui/popout/test_state.py
Normal file
661
tests/gui/popout/test_state.py
Normal file
@ -0,0 +1,661 @@
|
|||||||
|
"""Pure-Python state machine tests for the popout viewer.
|
||||||
|
|
||||||
|
Imports `booru_viewer.gui.popout.state` directly without standing up a
|
||||||
|
QApplication. The state machine module is required to be import-pure
|
||||||
|
(no PySide6, mpv, httpx, subprocess, or any module that imports them);
|
||||||
|
this test file is the forcing function. If state.py grows a Qt or mpv
|
||||||
|
import, these tests fail to collect and the test suite breaks.
|
||||||
|
|
||||||
|
Test categories (from docs/POPOUT_REFACTOR_PLAN.md "Test plan"):
|
||||||
|
1. Per-state transition tests
|
||||||
|
2. Race-fix invariant tests (six structural fixes)
|
||||||
|
3. Illegal transition tests
|
||||||
|
4. Read-path query tests
|
||||||
|
|
||||||
|
**Commit 3 expectation:** most tests fail because state.py's dispatch
|
||||||
|
handlers are stubs returning []. Tests progressively pass as commits
|
||||||
|
4-11 land transitions. The trivially-passing tests at commit 3 (initial
|
||||||
|
state, slider display read-path, terminal Closing guard) document the
|
||||||
|
parts of the skeleton that are already real.
|
||||||
|
|
||||||
|
Refactor plan: docs/POPOUT_REFACTOR_PLAN.md
|
||||||
|
Architecture: docs/POPOUT_ARCHITECTURE.md
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from booru_viewer.gui.popout.state import (
|
||||||
|
# Enums
|
||||||
|
InvalidTransition,
|
||||||
|
LoopMode,
|
||||||
|
MediaKind,
|
||||||
|
State,
|
||||||
|
StateMachine,
|
||||||
|
# Events
|
||||||
|
CloseRequested,
|
||||||
|
ContentArrived,
|
||||||
|
FullscreenToggled,
|
||||||
|
HyprlandDriftDetected,
|
||||||
|
LoopModeSet,
|
||||||
|
MuteToggleRequested,
|
||||||
|
NavigateRequested,
|
||||||
|
Open,
|
||||||
|
SeekCompleted,
|
||||||
|
SeekRequested,
|
||||||
|
TogglePlayRequested,
|
||||||
|
VideoEofReached,
|
||||||
|
VideoSizeKnown,
|
||||||
|
VideoStarted,
|
||||||
|
VolumeSet,
|
||||||
|
WindowMoved,
|
||||||
|
WindowResized,
|
||||||
|
# Effects
|
||||||
|
ApplyLoopMode,
|
||||||
|
ApplyMute,
|
||||||
|
ApplyVolume,
|
||||||
|
EmitClosed,
|
||||||
|
EmitNavigate,
|
||||||
|
EmitPlayNextRequested,
|
||||||
|
EnterFullscreen,
|
||||||
|
ExitFullscreen,
|
||||||
|
FitWindowToContent,
|
||||||
|
LoadImage,
|
||||||
|
LoadVideo,
|
||||||
|
SeekVideoTo,
|
||||||
|
StopMedia,
|
||||||
|
)
|
||||||
|
from booru_viewer.gui.popout.viewport import Viewport
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# Helpers — direct field mutation for setup. Tests construct a fresh
|
||||||
|
# StateMachine and write the state field directly to skip the dispatch
|
||||||
|
# chain. This is a deliberate test-fixture-vs-production-code split:
|
||||||
|
# the tests don't depend on the dispatch chain being correct in order
|
||||||
|
# to test individual transitions.
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _new_in(state: State) -> StateMachine:
|
||||||
|
m = StateMachine()
|
||||||
|
m.state = state
|
||||||
|
return m
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# Read-path queries (commit 2 — already passing)
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_initial_state():
|
||||||
|
m = StateMachine()
|
||||||
|
assert m.state == State.AWAITING_CONTENT
|
||||||
|
assert m.is_first_content_load is True
|
||||||
|
assert m.fullscreen is False
|
||||||
|
assert m.mute is False
|
||||||
|
assert m.volume == 50
|
||||||
|
assert m.loop_mode == LoopMode.LOOP
|
||||||
|
assert m.viewport is None
|
||||||
|
assert m.seek_target_ms == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_compute_slider_display_ms_passthrough_when_not_seeking():
|
||||||
|
m = StateMachine()
|
||||||
|
m.state = State.PLAYING_VIDEO
|
||||||
|
assert m.compute_slider_display_ms(7500) == 7500
|
||||||
|
|
||||||
|
|
||||||
|
def test_compute_slider_display_ms_pinned_when_seeking():
|
||||||
|
m = StateMachine()
|
||||||
|
m.state = State.SEEKING_VIDEO
|
||||||
|
m.seek_target_ms = 7000
|
||||||
|
# mpv's reported position can be anywhere; the slider must show
|
||||||
|
# the user's target while we're in SeekingVideo.
|
||||||
|
assert m.compute_slider_display_ms(5000) == 7000
|
||||||
|
assert m.compute_slider_display_ms(7000) == 7000
|
||||||
|
assert m.compute_slider_display_ms(9999) == 7000
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_in_closing_returns_empty():
|
||||||
|
"""Closing is terminal — every event from Closing returns [] and
|
||||||
|
the state stays Closing."""
|
||||||
|
m = _new_in(State.CLOSING)
|
||||||
|
for event in [
|
||||||
|
NavigateRequested(direction=1),
|
||||||
|
ContentArrived("/x.jpg", "info", MediaKind.IMAGE),
|
||||||
|
VideoEofReached(),
|
||||||
|
SeekRequested(target_ms=1000),
|
||||||
|
CloseRequested(),
|
||||||
|
]:
|
||||||
|
effects = m.dispatch(event)
|
||||||
|
assert effects == []
|
||||||
|
assert m.state == State.CLOSING
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# Per-state transition tests
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
#
|
||||||
|
# These all rely on the per-event handlers in state.py returning real
|
||||||
|
# effect lists. They fail at commit 3 (handlers are stubs returning [])
|
||||||
|
# and pass progressively as commits 4-11 land.
|
||||||
|
|
||||||
|
|
||||||
|
# -- AwaitingContent transitions --
|
||||||
|
|
||||||
|
|
||||||
|
def test_awaiting_open_stashes_saved_geo():
|
||||||
|
"""Open event in AwaitingContent stashes saved_geo, saved_fullscreen,
|
||||||
|
monitor for the first ContentArrived to consume."""
|
||||||
|
m = StateMachine()
|
||||||
|
effects = m.dispatch(Open(saved_geo=(100, 200, 800, 600),
|
||||||
|
saved_fullscreen=False, monitor=""))
|
||||||
|
assert m.state == State.AWAITING_CONTENT
|
||||||
|
assert m.saved_geo == (100, 200, 800, 600)
|
||||||
|
assert m.saved_fullscreen is False
|
||||||
|
assert effects == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_awaiting_content_arrived_image_loads_and_transitions():
|
||||||
|
m = StateMachine()
|
||||||
|
effects = m.dispatch(ContentArrived(
|
||||||
|
path="/path/img.jpg", info="i", kind=MediaKind.IMAGE,
|
||||||
|
width=1920, height=1080,
|
||||||
|
))
|
||||||
|
assert m.state == State.DISPLAYING_IMAGE
|
||||||
|
assert m.is_first_content_load is False
|
||||||
|
assert m.current_path == "/path/img.jpg"
|
||||||
|
assert any(isinstance(e, LoadImage) for e in effects)
|
||||||
|
assert any(isinstance(e, FitWindowToContent) for e in effects)
|
||||||
|
|
||||||
|
|
||||||
|
def test_awaiting_content_arrived_gif_loads_as_animated():
|
||||||
|
m = StateMachine()
|
||||||
|
effects = m.dispatch(ContentArrived(
|
||||||
|
path="/path/anim.gif", info="i", kind=MediaKind.GIF,
|
||||||
|
width=480, height=480,
|
||||||
|
))
|
||||||
|
assert m.state == State.DISPLAYING_IMAGE
|
||||||
|
load = next(e for e in effects if isinstance(e, LoadImage))
|
||||||
|
assert load.is_gif is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_awaiting_content_arrived_video_transitions_to_loading():
|
||||||
|
m = StateMachine()
|
||||||
|
effects = m.dispatch(ContentArrived(
|
||||||
|
path="/path/v.mp4", info="i", kind=MediaKind.VIDEO,
|
||||||
|
width=1280, height=720,
|
||||||
|
))
|
||||||
|
assert m.state == State.LOADING_VIDEO
|
||||||
|
assert any(isinstance(e, LoadVideo) for e in effects)
|
||||||
|
|
||||||
|
|
||||||
|
def test_awaiting_content_arrived_video_emits_persistence_effects():
|
||||||
|
"""First content load also emits ApplyMute / ApplyVolume /
|
||||||
|
ApplyLoopMode so the state machine's persistent values land in
|
||||||
|
the freshly-created mpv on PlayingVideo entry. (The skeleton
|
||||||
|
might emit these on LoadingVideo entry or on PlayingVideo entry —
|
||||||
|
either is acceptable as long as they fire before mpv consumes
|
||||||
|
the first frame.)"""
|
||||||
|
m = StateMachine()
|
||||||
|
m.mute = True
|
||||||
|
m.volume = 75
|
||||||
|
effects = m.dispatch(ContentArrived(
|
||||||
|
path="/v.mp4", info="i", kind=MediaKind.VIDEO,
|
||||||
|
))
|
||||||
|
# The plan says ApplyMute fires on PlayingVideo entry (commit 9),
|
||||||
|
# so this test will pass after commit 9 lands. Until then it
|
||||||
|
# documents the requirement.
|
||||||
|
assert any(isinstance(e, ApplyMute) and e.value is True for e in effects) or \
|
||||||
|
m.state == State.LOADING_VIDEO # at least one of these
|
||||||
|
|
||||||
|
|
||||||
|
def test_awaiting_navigate_emits_navigate_only():
|
||||||
|
"""Navigate while waiting (e.g. user spamming Right while loading)
|
||||||
|
emits Navigate but doesn't re-stop nonexistent media."""
|
||||||
|
m = StateMachine()
|
||||||
|
effects = m.dispatch(NavigateRequested(direction=1))
|
||||||
|
assert m.state == State.AWAITING_CONTENT
|
||||||
|
assert any(isinstance(e, EmitNavigate) and e.direction == 1
|
||||||
|
for e in effects)
|
||||||
|
# No StopMedia — nothing to stop
|
||||||
|
assert not any(isinstance(e, StopMedia) for e in effects)
|
||||||
|
|
||||||
|
|
||||||
|
# -- DisplayingImage transitions --
|
||||||
|
|
||||||
|
|
||||||
|
def test_displaying_image_navigate_stops_and_emits():
|
||||||
|
m = _new_in(State.DISPLAYING_IMAGE)
|
||||||
|
m.is_first_content_load = False
|
||||||
|
effects = m.dispatch(NavigateRequested(direction=-1))
|
||||||
|
assert m.state == State.AWAITING_CONTENT
|
||||||
|
assert any(isinstance(e, StopMedia) for e in effects)
|
||||||
|
assert any(isinstance(e, EmitNavigate) and e.direction == -1
|
||||||
|
for e in effects)
|
||||||
|
|
||||||
|
|
||||||
|
def test_displaying_image_content_replace_with_video():
|
||||||
|
m = _new_in(State.DISPLAYING_IMAGE)
|
||||||
|
m.is_first_content_load = False
|
||||||
|
effects = m.dispatch(ContentArrived(
|
||||||
|
path="/v.mp4", info="i", kind=MediaKind.VIDEO,
|
||||||
|
))
|
||||||
|
assert m.state == State.LOADING_VIDEO
|
||||||
|
assert any(isinstance(e, LoadVideo) for e in effects)
|
||||||
|
|
||||||
|
|
||||||
|
def test_displaying_image_content_replace_with_image():
|
||||||
|
m = _new_in(State.DISPLAYING_IMAGE)
|
||||||
|
m.is_first_content_load = False
|
||||||
|
effects = m.dispatch(ContentArrived(
|
||||||
|
path="/img2.png", info="i", kind=MediaKind.IMAGE,
|
||||||
|
))
|
||||||
|
assert m.state == State.DISPLAYING_IMAGE
|
||||||
|
assert any(isinstance(e, LoadImage) for e in effects)
|
||||||
|
|
||||||
|
|
||||||
|
# -- LoadingVideo transitions --
|
||||||
|
|
||||||
|
|
||||||
|
def test_loading_video_started_transitions_to_playing():
|
||||||
|
m = _new_in(State.LOADING_VIDEO)
|
||||||
|
effects = m.dispatch(VideoStarted())
|
||||||
|
assert m.state == State.PLAYING_VIDEO
|
||||||
|
# Persistence effects fire on PlayingVideo entry
|
||||||
|
assert any(isinstance(e, ApplyMute) for e in effects)
|
||||||
|
assert any(isinstance(e, ApplyVolume) for e in effects)
|
||||||
|
assert any(isinstance(e, ApplyLoopMode) for e in effects)
|
||||||
|
|
||||||
|
|
||||||
|
def test_loading_video_eof_dropped():
|
||||||
|
"""RACE FIX: Stale EOF from previous video lands while we're
|
||||||
|
loading the new one. The stale event must be dropped without
|
||||||
|
transitioning state. Replaces the 250ms _eof_ignore_until
|
||||||
|
timestamp window from fda3b10b."""
|
||||||
|
m = _new_in(State.LOADING_VIDEO)
|
||||||
|
effects = m.dispatch(VideoEofReached())
|
||||||
|
assert m.state == State.LOADING_VIDEO
|
||||||
|
assert effects == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_loading_video_size_known_emits_fit():
|
||||||
|
m = _new_in(State.LOADING_VIDEO)
|
||||||
|
m.viewport = Viewport(center_x=500, center_y=400,
|
||||||
|
long_side=800)
|
||||||
|
effects = m.dispatch(VideoSizeKnown(width=1920, height=1080))
|
||||||
|
assert m.state == State.LOADING_VIDEO
|
||||||
|
assert any(isinstance(e, FitWindowToContent) for e in effects)
|
||||||
|
|
||||||
|
|
||||||
|
def test_loading_video_navigate_stops_and_emits():
|
||||||
|
m = _new_in(State.LOADING_VIDEO)
|
||||||
|
effects = m.dispatch(NavigateRequested(direction=1))
|
||||||
|
assert m.state == State.AWAITING_CONTENT
|
||||||
|
assert any(isinstance(e, StopMedia) for e in effects)
|
||||||
|
assert any(isinstance(e, EmitNavigate) for e in effects)
|
||||||
|
|
||||||
|
|
||||||
|
# -- PlayingVideo transitions --
|
||||||
|
|
||||||
|
|
||||||
|
def test_playing_video_eof_loop_next_emits_play_next():
|
||||||
|
m = _new_in(State.PLAYING_VIDEO)
|
||||||
|
m.loop_mode = LoopMode.NEXT
|
||||||
|
effects = m.dispatch(VideoEofReached())
|
||||||
|
assert any(isinstance(e, EmitPlayNextRequested) for e in effects)
|
||||||
|
|
||||||
|
|
||||||
|
def test_playing_video_eof_loop_once_pauses():
|
||||||
|
m = _new_in(State.PLAYING_VIDEO)
|
||||||
|
m.loop_mode = LoopMode.ONCE
|
||||||
|
effects = m.dispatch(VideoEofReached())
|
||||||
|
# Once mode should NOT emit play_next; it pauses
|
||||||
|
assert not any(isinstance(e, EmitPlayNextRequested) for e in effects)
|
||||||
|
|
||||||
|
|
||||||
|
def test_playing_video_eof_loop_loop_no_op():
|
||||||
|
"""Loop=Loop is mpv-handled (loop-file=inf), so the eof event
|
||||||
|
arriving in the state machine should be a no-op."""
|
||||||
|
m = _new_in(State.PLAYING_VIDEO)
|
||||||
|
m.loop_mode = LoopMode.LOOP
|
||||||
|
effects = m.dispatch(VideoEofReached())
|
||||||
|
assert not any(isinstance(e, EmitPlayNextRequested) for e in effects)
|
||||||
|
|
||||||
|
|
||||||
|
def test_playing_video_seek_requested_transitions_and_pins():
|
||||||
|
m = _new_in(State.PLAYING_VIDEO)
|
||||||
|
effects = m.dispatch(SeekRequested(target_ms=7500))
|
||||||
|
assert m.state == State.SEEKING_VIDEO
|
||||||
|
assert m.seek_target_ms == 7500
|
||||||
|
assert any(isinstance(e, SeekVideoTo) and e.target_ms == 7500
|
||||||
|
for e in effects)
|
||||||
|
|
||||||
|
|
||||||
|
def test_playing_video_navigate_stops_and_emits():
|
||||||
|
m = _new_in(State.PLAYING_VIDEO)
|
||||||
|
effects = m.dispatch(NavigateRequested(direction=1))
|
||||||
|
assert m.state == State.AWAITING_CONTENT
|
||||||
|
assert any(isinstance(e, StopMedia) for e in effects)
|
||||||
|
assert any(isinstance(e, EmitNavigate) for e in effects)
|
||||||
|
|
||||||
|
|
||||||
|
def test_playing_video_size_known_refits():
|
||||||
|
m = _new_in(State.PLAYING_VIDEO)
|
||||||
|
m.viewport = Viewport(center_x=500, center_y=400, long_side=800)
|
||||||
|
effects = m.dispatch(VideoSizeKnown(width=640, height=480))
|
||||||
|
assert any(isinstance(e, FitWindowToContent) for e in effects)
|
||||||
|
|
||||||
|
|
||||||
|
def test_playing_video_toggle_play_emits_toggle():
|
||||||
|
from booru_viewer.gui.popout.state import TogglePlay
|
||||||
|
m = _new_in(State.PLAYING_VIDEO)
|
||||||
|
effects = m.dispatch(TogglePlayRequested())
|
||||||
|
assert m.state == State.PLAYING_VIDEO
|
||||||
|
assert any(isinstance(e, TogglePlay) for e in effects)
|
||||||
|
|
||||||
|
|
||||||
|
# -- SeekingVideo transitions --
|
||||||
|
|
||||||
|
|
||||||
|
def test_seeking_video_completed_returns_to_playing():
|
||||||
|
m = _new_in(State.SEEKING_VIDEO)
|
||||||
|
m.seek_target_ms = 5000
|
||||||
|
effects = m.dispatch(SeekCompleted())
|
||||||
|
assert m.state == State.PLAYING_VIDEO
|
||||||
|
|
||||||
|
|
||||||
|
def test_seeking_video_seek_requested_replaces_target():
|
||||||
|
m = _new_in(State.SEEKING_VIDEO)
|
||||||
|
m.seek_target_ms = 5000
|
||||||
|
effects = m.dispatch(SeekRequested(target_ms=8000))
|
||||||
|
assert m.state == State.SEEKING_VIDEO
|
||||||
|
assert m.seek_target_ms == 8000
|
||||||
|
assert any(isinstance(e, SeekVideoTo) and e.target_ms == 8000
|
||||||
|
for e in effects)
|
||||||
|
|
||||||
|
|
||||||
|
def test_seeking_video_navigate_stops_and_emits():
|
||||||
|
m = _new_in(State.SEEKING_VIDEO)
|
||||||
|
effects = m.dispatch(NavigateRequested(direction=1))
|
||||||
|
assert m.state == State.AWAITING_CONTENT
|
||||||
|
assert any(isinstance(e, StopMedia) for e in effects)
|
||||||
|
|
||||||
|
|
||||||
|
def test_seeking_video_eof_dropped():
|
||||||
|
"""EOF during a seek is also stale — drop it."""
|
||||||
|
m = _new_in(State.SEEKING_VIDEO)
|
||||||
|
effects = m.dispatch(VideoEofReached())
|
||||||
|
assert m.state == State.SEEKING_VIDEO
|
||||||
|
assert effects == []
|
||||||
|
|
||||||
|
|
||||||
|
# -- Closing (parametrized over source states) --
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("source_state", [
|
||||||
|
State.AWAITING_CONTENT,
|
||||||
|
State.DISPLAYING_IMAGE,
|
||||||
|
State.LOADING_VIDEO,
|
||||||
|
State.PLAYING_VIDEO,
|
||||||
|
State.SEEKING_VIDEO,
|
||||||
|
])
|
||||||
|
def test_close_from_each_state_transitions_to_closing(source_state):
|
||||||
|
m = _new_in(source_state)
|
||||||
|
effects = m.dispatch(CloseRequested())
|
||||||
|
assert m.state == State.CLOSING
|
||||||
|
assert any(isinstance(e, StopMedia) for e in effects)
|
||||||
|
assert any(isinstance(e, EmitClosed) for e in effects)
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# Race-fix invariant tests (six structural fixes from prior fix sweep)
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_invariant_eof_race_loading_video_drops_stale_eof():
|
||||||
|
"""Invariant 1: stale EOF from previous video must not advance
|
||||||
|
the popout. Structural via LoadingVideo dropping VideoEofReached."""
|
||||||
|
m = _new_in(State.LOADING_VIDEO)
|
||||||
|
m.loop_mode = LoopMode.NEXT # would normally trigger play_next
|
||||||
|
effects = m.dispatch(VideoEofReached())
|
||||||
|
assert m.state == State.LOADING_VIDEO
|
||||||
|
assert not any(isinstance(e, EmitPlayNextRequested) for e in effects)
|
||||||
|
|
||||||
|
|
||||||
|
def test_invariant_double_navigate_no_double_load():
|
||||||
|
"""Invariant 2: rapid Right-arrow spam must not produce double
|
||||||
|
load events. Two NavigateRequested in a row → AwaitingContent →
|
||||||
|
AwaitingContent (no re-stop, no re-fire of LoadImage/LoadVideo)."""
|
||||||
|
m = _new_in(State.PLAYING_VIDEO)
|
||||||
|
effects1 = m.dispatch(NavigateRequested(direction=1))
|
||||||
|
assert m.state == State.AWAITING_CONTENT
|
||||||
|
# Second nav while still in AwaitingContent
|
||||||
|
effects2 = m.dispatch(NavigateRequested(direction=1))
|
||||||
|
assert m.state == State.AWAITING_CONTENT
|
||||||
|
# No StopMedia in the second dispatch — nothing to stop
|
||||||
|
assert not any(isinstance(e, StopMedia) for e in effects2)
|
||||||
|
# No LoadImage/LoadVideo in either — content hasn't arrived
|
||||||
|
assert not any(isinstance(e, (LoadImage, LoadVideo))
|
||||||
|
for e in effects1 + effects2)
|
||||||
|
|
||||||
|
|
||||||
|
def test_invariant_persistent_viewport_no_drift_across_navs():
|
||||||
|
"""Invariant 3: navigating between posts doesn't drift the
|
||||||
|
persistent viewport. Multiple ContentArrived events use the same
|
||||||
|
viewport and don't accumulate per-nav rounding."""
|
||||||
|
m = StateMachine()
|
||||||
|
m.viewport = Viewport(center_x=960.0, center_y=540.0, long_side=1280.0)
|
||||||
|
m.is_first_content_load = False # past the seed point
|
||||||
|
original = m.viewport
|
||||||
|
for path in ["/a.jpg", "/b.jpg", "/c.jpg", "/d.jpg", "/e.jpg"]:
|
||||||
|
m.state = State.DISPLAYING_IMAGE
|
||||||
|
m.dispatch(NavigateRequested(direction=1))
|
||||||
|
m.dispatch(ContentArrived(path=path, info="", kind=MediaKind.IMAGE))
|
||||||
|
assert m.viewport == original
|
||||||
|
|
||||||
|
|
||||||
|
def test_invariant_f11_round_trip_restores_pre_fullscreen_viewport():
|
||||||
|
"""Invariant 4: F11 enter snapshots viewport, F11 exit restores it."""
|
||||||
|
m = _new_in(State.PLAYING_VIDEO)
|
||||||
|
m.viewport = Viewport(center_x=800.0, center_y=600.0, long_side=1000.0)
|
||||||
|
pre = m.viewport
|
||||||
|
# Enter fullscreen
|
||||||
|
m.dispatch(FullscreenToggled())
|
||||||
|
assert m.fullscreen is True
|
||||||
|
assert m.pre_fullscreen_viewport == pre
|
||||||
|
# Pretend the user moved the window during fullscreen (shouldn't
|
||||||
|
# affect anything because we're not running fits in fullscreen)
|
||||||
|
# Exit fullscreen
|
||||||
|
m.dispatch(FullscreenToggled())
|
||||||
|
assert m.fullscreen is False
|
||||||
|
assert m.viewport == pre
|
||||||
|
|
||||||
|
|
||||||
|
def test_invariant_seek_pin_uses_compute_slider_display_ms():
|
||||||
|
"""Invariant 5: while in SeekingVideo, the slider display value
|
||||||
|
is the user's target, not mpv's lagging position."""
|
||||||
|
m = _new_in(State.PLAYING_VIDEO)
|
||||||
|
m.dispatch(SeekRequested(target_ms=9000))
|
||||||
|
# Adapter polls mpv and asks the state machine for the display value
|
||||||
|
assert m.compute_slider_display_ms(mpv_pos_ms=4500) == 9000
|
||||||
|
assert m.compute_slider_display_ms(mpv_pos_ms=8500) == 9000
|
||||||
|
# After SeekCompleted, slider tracks mpv again
|
||||||
|
m.dispatch(SeekCompleted())
|
||||||
|
assert m.compute_slider_display_ms(mpv_pos_ms=8500) == 8500
|
||||||
|
|
||||||
|
|
||||||
|
def test_invariant_pending_mute_replayed_into_video():
|
||||||
|
"""Invariant 6: mute toggled before video loads must apply when
|
||||||
|
video reaches PlayingVideo. The state machine owns mute as truth;
|
||||||
|
ApplyMute(state.mute) fires on PlayingVideo entry."""
|
||||||
|
m = StateMachine()
|
||||||
|
# User mutes before any video has loaded
|
||||||
|
m.dispatch(MuteToggleRequested())
|
||||||
|
assert m.mute is True
|
||||||
|
# Now drive through to PlayingVideo
|
||||||
|
m.dispatch(ContentArrived(
|
||||||
|
path="/v.mp4", info="i", kind=MediaKind.VIDEO,
|
||||||
|
))
|
||||||
|
assert m.state == State.LOADING_VIDEO
|
||||||
|
effects = m.dispatch(VideoStarted())
|
||||||
|
assert m.state == State.PLAYING_VIDEO
|
||||||
|
# ApplyMute(True) must have fired on entry
|
||||||
|
apply_mutes = [e for e in effects
|
||||||
|
if isinstance(e, ApplyMute) and e.value is True]
|
||||||
|
assert apply_mutes
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# Illegal transition tests
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
#
|
||||||
|
# At commit 11 these become env-gated raises (BOORU_VIEWER_STRICT_STATE).
|
||||||
|
# At commits 3-10 they return [] (the skeleton's default).
|
||||||
|
|
||||||
|
|
||||||
|
def test_strict_mode_raises_invalid_transition(monkeypatch):
|
||||||
|
"""When BOORU_VIEWER_STRICT_STATE is set, illegal events raise
|
||||||
|
InvalidTransition instead of dropping silently. This is the
|
||||||
|
development/debug mode that catches programmer errors at the
|
||||||
|
dispatch boundary."""
|
||||||
|
monkeypatch.setenv("BOORU_VIEWER_STRICT_STATE", "1")
|
||||||
|
m = _new_in(State.PLAYING_VIDEO)
|
||||||
|
with pytest.raises(InvalidTransition) as exc_info:
|
||||||
|
m.dispatch(VideoStarted())
|
||||||
|
assert exc_info.value.state == State.PLAYING_VIDEO
|
||||||
|
assert isinstance(exc_info.value.event, VideoStarted)
|
||||||
|
|
||||||
|
|
||||||
|
def test_strict_mode_does_not_raise_for_legal_events(monkeypatch):
|
||||||
|
"""Legal events go through dispatch normally even under strict mode."""
|
||||||
|
monkeypatch.setenv("BOORU_VIEWER_STRICT_STATE", "1")
|
||||||
|
m = _new_in(State.PLAYING_VIDEO)
|
||||||
|
# SeekRequested IS legal in PlayingVideo — no raise
|
||||||
|
effects = m.dispatch(SeekRequested(target_ms=5000))
|
||||||
|
assert m.state == State.SEEKING_VIDEO
|
||||||
|
|
||||||
|
|
||||||
|
def test_strict_mode_legal_but_no_op_does_not_raise(monkeypatch):
|
||||||
|
"""The 'legal-but-no-op' events (e.g. VideoEofReached in
|
||||||
|
LoadingVideo, the EOF race fix) must NOT raise in strict mode.
|
||||||
|
They're intentionally accepted and dropped — that's the
|
||||||
|
structural fix, not a programmer error."""
|
||||||
|
monkeypatch.setenv("BOORU_VIEWER_STRICT_STATE", "1")
|
||||||
|
m = _new_in(State.LOADING_VIDEO)
|
||||||
|
# VideoEofReached in LoadingVideo is legal-but-no-op
|
||||||
|
effects = m.dispatch(VideoEofReached())
|
||||||
|
assert effects == []
|
||||||
|
assert m.state == State.LOADING_VIDEO
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("source_state, illegal_event", [
|
||||||
|
(State.AWAITING_CONTENT, VideoEofReached()),
|
||||||
|
(State.AWAITING_CONTENT, VideoStarted()),
|
||||||
|
(State.AWAITING_CONTENT, SeekRequested(target_ms=1000)),
|
||||||
|
(State.AWAITING_CONTENT, SeekCompleted()),
|
||||||
|
(State.AWAITING_CONTENT, TogglePlayRequested()),
|
||||||
|
(State.DISPLAYING_IMAGE, VideoEofReached()),
|
||||||
|
(State.DISPLAYING_IMAGE, VideoStarted()),
|
||||||
|
(State.DISPLAYING_IMAGE, SeekRequested(target_ms=1000)),
|
||||||
|
(State.DISPLAYING_IMAGE, SeekCompleted()),
|
||||||
|
(State.DISPLAYING_IMAGE, TogglePlayRequested()),
|
||||||
|
(State.LOADING_VIDEO, SeekRequested(target_ms=1000)),
|
||||||
|
(State.LOADING_VIDEO, SeekCompleted()),
|
||||||
|
(State.LOADING_VIDEO, TogglePlayRequested()),
|
||||||
|
(State.PLAYING_VIDEO, VideoStarted()),
|
||||||
|
(State.PLAYING_VIDEO, SeekCompleted()),
|
||||||
|
(State.SEEKING_VIDEO, VideoStarted()),
|
||||||
|
(State.SEEKING_VIDEO, TogglePlayRequested()),
|
||||||
|
])
|
||||||
|
def test_illegal_event_returns_empty_in_release_mode(source_state, illegal_event):
|
||||||
|
"""In release mode (no BOORU_VIEWER_STRICT_STATE env var), illegal
|
||||||
|
transitions are dropped silently — return [] and leave state
|
||||||
|
unchanged. In strict mode (commit 11) they raise InvalidTransition.
|
||||||
|
The release-mode path is what production runs."""
|
||||||
|
m = _new_in(source_state)
|
||||||
|
effects = m.dispatch(illegal_event)
|
||||||
|
assert effects == []
|
||||||
|
assert m.state == source_state
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# Persistent state field tests (commits 8 + 9)
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_field_mute_persists_across_video_loads():
|
||||||
|
"""Once set, state.mute survives any number of LoadingVideo →
|
||||||
|
PlayingVideo cycles. Defended at the state field level — mute
|
||||||
|
is never written to except by MuteToggleRequested."""
|
||||||
|
m = StateMachine()
|
||||||
|
m.dispatch(MuteToggleRequested())
|
||||||
|
assert m.mute is True
|
||||||
|
# Load several videos
|
||||||
|
for _ in range(3):
|
||||||
|
m.state = State.AWAITING_CONTENT
|
||||||
|
m.dispatch(ContentArrived(path="/v.mp4", info="",
|
||||||
|
kind=MediaKind.VIDEO))
|
||||||
|
m.dispatch(VideoStarted())
|
||||||
|
assert m.mute is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_field_volume_persists_across_video_loads():
|
||||||
|
m = StateMachine()
|
||||||
|
m.dispatch(VolumeSet(value=85))
|
||||||
|
assert m.volume == 85
|
||||||
|
for _ in range(3):
|
||||||
|
m.state = State.AWAITING_CONTENT
|
||||||
|
m.dispatch(ContentArrived(path="/v.mp4", info="",
|
||||||
|
kind=MediaKind.VIDEO))
|
||||||
|
m.dispatch(VideoStarted())
|
||||||
|
assert m.volume == 85
|
||||||
|
|
||||||
|
|
||||||
|
def test_state_field_loop_mode_persists():
|
||||||
|
m = StateMachine()
|
||||||
|
m.dispatch(LoopModeSet(mode=LoopMode.NEXT))
|
||||||
|
assert m.loop_mode == LoopMode.NEXT
|
||||||
|
m.state = State.AWAITING_CONTENT
|
||||||
|
m.dispatch(ContentArrived(path="/v.mp4", info="",
|
||||||
|
kind=MediaKind.VIDEO))
|
||||||
|
m.dispatch(VideoStarted())
|
||||||
|
assert m.loop_mode == LoopMode.NEXT
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
# Window event tests (commit 8)
|
||||||
|
# ----------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def test_window_moved_updates_viewport_center_only():
|
||||||
|
"""Move-only update: keep long_side, change center."""
|
||||||
|
m = _new_in(State.DISPLAYING_IMAGE)
|
||||||
|
m.viewport = Viewport(center_x=500.0, center_y=400.0, long_side=800.0)
|
||||||
|
m.dispatch(WindowMoved(rect=(200, 300, 1000, 800)))
|
||||||
|
assert m.viewport is not None
|
||||||
|
# New center is rect center; long_side stays 800
|
||||||
|
assert m.viewport.center_x == 700.0 # 200 + 1000/2
|
||||||
|
assert m.viewport.center_y == 700.0 # 300 + 800/2
|
||||||
|
assert m.viewport.long_side == 800.0
|
||||||
|
|
||||||
|
|
||||||
|
def test_window_resized_updates_viewport_long_side():
|
||||||
|
"""Resize: rebuild viewport from rect (long_side becomes new max)."""
|
||||||
|
m = _new_in(State.DISPLAYING_IMAGE)
|
||||||
|
m.viewport = Viewport(center_x=500.0, center_y=400.0, long_side=800.0)
|
||||||
|
m.dispatch(WindowResized(rect=(100, 100, 1200, 900)))
|
||||||
|
assert m.viewport is not None
|
||||||
|
assert m.viewport.long_side == 1200.0 # max(1200, 900)
|
||||||
|
|
||||||
|
|
||||||
|
def test_hyprland_drift_updates_viewport_from_rect():
|
||||||
|
m = _new_in(State.DISPLAYING_IMAGE)
|
||||||
|
m.viewport = Viewport(center_x=500.0, center_y=400.0, long_side=800.0)
|
||||||
|
m.dispatch(HyprlandDriftDetected(rect=(50, 50, 1500, 1000)))
|
||||||
|
assert m.viewport is not None
|
||||||
|
assert m.viewport.center_x == 800.0 # 50 + 1500/2
|
||||||
|
assert m.viewport.center_y == 550.0 # 50 + 1000/2
|
||||||
|
assert m.viewport.long_side == 1500.0
|
||||||
Loading…
x
Reference in New Issue
Block a user