video_player: add stream-record for cache population during playback
Replaces the parallel httpx download with mpv's stream-record per-file option. When play_file receives an HTTP URL, it passes stream_record pointing at a .part temp file alongside the URL. mpv writes the incoming network stream to disk as it decodes, so a single HTTP connection serves both playback and cache population. On clean EOF the .part is promoted to the real cache path via os.replace. Seeks invalidate the recording (mpv may skip byte ranges), so _seeked_during_record flags it for discard. stop() and rapid-click cleanup also discard incomplete .part files. At this commit both pipelines are active — _load still runs the httpx download in parallel. Whichever finishes second wins os.replace. The next commit removes the httpx path.
This commit is contained in:
parent
264c421dff
commit
a01ac34944
@ -2,6 +2,10 @@
|
|||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from PySide6.QtCore import Qt, QTimer, Signal, Property
|
from PySide6.QtCore import Qt, QTimer, Signal, Property
|
||||||
from PySide6.QtGui import QColor
|
from PySide6.QtGui import QColor
|
||||||
from PySide6.QtWidgets import (
|
from PySide6.QtWidgets import (
|
||||||
@ -10,6 +14,8 @@ from PySide6.QtWidgets import (
|
|||||||
|
|
||||||
import mpv as mpvlib
|
import mpv as mpvlib
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
from .mpv_gl import _MpvGLWidget
|
from .mpv_gl import _MpvGLWidget
|
||||||
|
|
||||||
|
|
||||||
@ -232,6 +238,14 @@ class VideoPlayer(QWidget):
|
|||||||
# spawn unmuted by default. _ensure_mpv replays this on creation.
|
# spawn unmuted by default. _ensure_mpv replays this on creation.
|
||||||
self._pending_mute: bool = False
|
self._pending_mute: bool = False
|
||||||
|
|
||||||
|
# Stream-record state: mpv's stream-record option tees its
|
||||||
|
# network stream into a .part file that gets promoted to the
|
||||||
|
# real cache path on clean EOF. Eliminates the parallel httpx
|
||||||
|
# download that used to race with mpv for the same bytes.
|
||||||
|
self._stream_record_tmp: Path | None = None
|
||||||
|
self._stream_record_target: Path | None = None
|
||||||
|
self._seeked_during_record: bool = False
|
||||||
|
|
||||||
def _ensure_mpv(self) -> mpvlib.MPV:
|
def _ensure_mpv(self) -> mpvlib.MPV:
|
||||||
"""Set up mpv callbacks on first use. MPV instance is pre-created."""
|
"""Set up mpv callbacks on first use. MPV instance is pre-created."""
|
||||||
if self._mpv is not None:
|
if self._mpv is not None:
|
||||||
@ -313,6 +327,8 @@ class VideoPlayer(QWidget):
|
|||||||
def seek_to_ms(self, ms: int) -> None:
|
def seek_to_ms(self, ms: int) -> None:
|
||||||
if self._mpv:
|
if self._mpv:
|
||||||
self._mpv.seek(ms / 1000.0, 'absolute+exact')
|
self._mpv.seek(ms / 1000.0, 'absolute+exact')
|
||||||
|
if self._stream_record_target is not None:
|
||||||
|
self._seeked_during_record = True
|
||||||
|
|
||||||
def play_file(self, path: str, info: str = "") -> None:
|
def play_file(self, path: str, info: str = "") -> None:
|
||||||
"""Play a file from a local path OR a remote http(s) URL.
|
"""Play a file from a local path OR a remote http(s) URL.
|
||||||
@ -347,11 +363,23 @@ class VideoPlayer(QWidget):
|
|||||||
self._eof_ignore_until = _time.monotonic() + self._eof_ignore_window_secs
|
self._eof_ignore_until = _time.monotonic() + self._eof_ignore_window_secs
|
||||||
self._last_video_size = None # reset dedupe so new file fires a fit
|
self._last_video_size = None # reset dedupe so new file fires a fit
|
||||||
self._apply_loop_to_mpv()
|
self._apply_loop_to_mpv()
|
||||||
|
|
||||||
|
# Clean up any leftover .part from a previous play_file that
|
||||||
|
# didn't finish (rapid clicks, popout closed mid-stream, etc).
|
||||||
|
self._discard_stream_record()
|
||||||
|
|
||||||
if path.startswith(("http://", "https://")):
|
if path.startswith(("http://", "https://")):
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
from ...core.cache import _referer_for
|
from ...core.cache import _referer_for, cached_path_for
|
||||||
referer = _referer_for(urlparse(path))
|
referer = _referer_for(urlparse(path))
|
||||||
m.loadfile(path, "replace", referrer=referer)
|
target = cached_path_for(path)
|
||||||
|
target.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
tmp = target.with_suffix(target.suffix + ".part")
|
||||||
|
m.loadfile(path, "replace",
|
||||||
|
referrer=referer,
|
||||||
|
stream_record=tmp.as_posix())
|
||||||
|
self._stream_record_tmp = tmp
|
||||||
|
self._stream_record_target = target
|
||||||
else:
|
else:
|
||||||
m.loadfile(path)
|
m.loadfile(path)
|
||||||
if self._autoplay:
|
if self._autoplay:
|
||||||
@ -362,6 +390,7 @@ class VideoPlayer(QWidget):
|
|||||||
self._poll_timer.start()
|
self._poll_timer.start()
|
||||||
|
|
||||||
def stop(self) -> None:
|
def stop(self) -> None:
|
||||||
|
self._discard_stream_record()
|
||||||
self._poll_timer.stop()
|
self._poll_timer.stop()
|
||||||
if self._mpv:
|
if self._mpv:
|
||||||
self._mpv.command('stop')
|
self._mpv.command('stop')
|
||||||
@ -419,6 +448,8 @@ class VideoPlayer(QWidget):
|
|||||||
"""
|
"""
|
||||||
if self._mpv:
|
if self._mpv:
|
||||||
self._mpv.seek(pos / 1000.0, 'absolute+exact')
|
self._mpv.seek(pos / 1000.0, 'absolute+exact')
|
||||||
|
if self._stream_record_target is not None:
|
||||||
|
self._seeked_during_record = True
|
||||||
|
|
||||||
def _seek_relative(self, ms: int) -> None:
|
def _seek_relative(self, ms: int) -> None:
|
||||||
if self._mpv:
|
if self._mpv:
|
||||||
@ -516,12 +547,61 @@ class VideoPlayer(QWidget):
|
|||||||
if not self._eof_pending:
|
if not self._eof_pending:
|
||||||
return
|
return
|
||||||
self._eof_pending = False
|
self._eof_pending = False
|
||||||
|
self._finalize_stream_record()
|
||||||
if self._loop_state == 1: # Once
|
if self._loop_state == 1: # Once
|
||||||
self.pause()
|
self.pause()
|
||||||
elif self._loop_state == 2: # Next
|
elif self._loop_state == 2: # Next
|
||||||
self.pause()
|
self.pause()
|
||||||
self.play_next.emit()
|
self.play_next.emit()
|
||||||
|
|
||||||
|
# -- Stream-record helpers --
|
||||||
|
|
||||||
|
def _discard_stream_record(self) -> None:
|
||||||
|
"""Remove any pending stream-record temp file without promoting."""
|
||||||
|
tmp = self._stream_record_tmp
|
||||||
|
self._stream_record_tmp = None
|
||||||
|
self._stream_record_target = None
|
||||||
|
self._seeked_during_record = False
|
||||||
|
if tmp is not None:
|
||||||
|
try:
|
||||||
|
tmp.unlink(missing_ok=True)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _finalize_stream_record(self) -> None:
|
||||||
|
"""Promote the stream-record .part file to its final cache path.
|
||||||
|
|
||||||
|
Only promotes if: (a) there is a pending stream-record, (b) the
|
||||||
|
user did not seek during playback (seeking invalidates the file
|
||||||
|
because mpv may have skipped byte ranges), and (c) the .part
|
||||||
|
file exists and is non-empty.
|
||||||
|
"""
|
||||||
|
tmp = self._stream_record_tmp
|
||||||
|
target = self._stream_record_target
|
||||||
|
self._stream_record_tmp = None
|
||||||
|
self._stream_record_target = None
|
||||||
|
if tmp is None or target is None:
|
||||||
|
return
|
||||||
|
if self._seeked_during_record:
|
||||||
|
log.debug("Stream-record discarded (seek during playback): %s", tmp.name)
|
||||||
|
try:
|
||||||
|
tmp.unlink(missing_ok=True)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
return
|
||||||
|
if not tmp.exists() or tmp.stat().st_size == 0:
|
||||||
|
log.debug("Stream-record .part missing or empty: %s", tmp.name)
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
os.replace(tmp, target)
|
||||||
|
log.debug("Stream-record promoted: %s -> %s", tmp.name, target.name)
|
||||||
|
except OSError as e:
|
||||||
|
log.warning("Stream-record promote failed: %s", e)
|
||||||
|
try:
|
||||||
|
tmp.unlink(missing_ok=True)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _fmt(ms: int) -> str:
|
def _fmt(ms: int) -> str:
|
||||||
s = ms // 1000
|
s = ms // 1000
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user