pax 526606c7c5 Convert Pixiv ugoira zips to animated GIFs, add filetype to info panel
- Detect .zip files (Pixiv ugoira) and convert frames to animated GIF
- Cache the converted GIF so subsequent loads are instant
- Add filetype field to the info panel
- Add ZIP to valid media magic bytes
2026-04-04 19:40:49 -05:00

242 lines
8.0 KiB
Python

"""Download manager and local file cache."""
from __future__ import annotations

import hashlib
import io
import zipfile
from pathlib import Path

import httpx
from PIL import Image

from .config import USER_AGENT, cache_dir, thumbnails_dir
def _url_hash(url: str) -> str:
return hashlib.sha256(url.encode()).hexdigest()[:16]
_IMAGE_MAGIC = {
b'\x89PNG': True,
b'\xff\xd8\xff': True, # JPEG
b'GIF8': True,
b'RIFF': True, # WebP
b'\x00\x00\x00': True, # MP4/MOV
b'\x1aE\xdf\xa3': True, # WebM/MKV
b'PK\x03\x04': True, # ZIP (ugoira)
}
def _is_valid_media(path: Path) -> bool:
"""Check if a file looks like actual media, not an HTML error page."""
try:
with open(path, "rb") as f:
header = f.read(16)
if not header or header.startswith(b'<') or header.startswith(b'<!'):
return False
# Check for known magic bytes
for magic in _IMAGE_MAGIC:
if header.startswith(magic):
return True
# If not a known type but not HTML, assume it's ok
return b'<html' not in header.lower() and b'<!doctype' not in header.lower()
except Exception:
return False
def _ext_from_url(url: str) -> str:
path = url.split("?")[0]
if "." in path.split("/")[-1]:
return "." + path.split("/")[-1].rsplit(".", 1)[-1]
return ".jpg"
def _convert_ugoira_to_gif(zip_path: Path, frame_duration_ms: int = 80) -> Path:
    """Convert a Pixiv ugoira zip (numbered JPEG/PNG frames) to an animated GIF.

    Frames are assembled in sorted-name order (Pixiv numbers them
    sequentially).  On success the source zip is deleted so the cache holds
    only the GIF; an existing GIF next to the zip is reused as a cache hit.

    frame_duration_ms: per-frame delay for the GIF.  Pixiv's real per-frame
    delays live in the API response, not in the zip, so a fixed delay
    (~12.5 fps by default) is used as an approximation.

    Raises ValueError if the archive contains no frames.
    """
    gif_path = zip_path.with_suffix(".gif")
    if gif_path.exists():
        return gif_path
    frames: list[Image.Image] = []
    with zipfile.ZipFile(zip_path, "r") as zf:
        # Skip directory entries defensively; ugoira archives normally
        # contain only image files.
        for name in sorted(n for n in zf.namelist() if not n.endswith("/")):
            frames.append(Image.open(io.BytesIO(zf.read(name))).convert("RGBA"))
    if not frames:
        raise ValueError("Zip contains no image frames")
    frames[0].save(
        gif_path, save_all=True, append_images=frames[1:],
        duration=frame_duration_ms, loop=0, disposal=2,
    )
    zip_path.unlink()  # Replace the zip with the GIF in the cache.
    return gif_path
async def download_image(
    url: str,
    client: httpx.AsyncClient | None = None,
    dest_dir: Path | None = None,
    progress_callback=None,
) -> Path:
    """Download an image to the cache, returning the local path. Skips if already cached.

    url: the remote media URL; its hash determines the cache filename.
    client: optional shared httpx.AsyncClient; if omitted, a temporary one is
        created (with Referer/User-Agent headers) and closed before returning.
    dest_dir: target directory; defaults to the main cache directory.
    progress_callback: optional callable(bytes_downloaded, total_bytes); when
        given, the response is streamed in chunks so progress can be reported.

    Raises ValueError when the server returns HTML (captcha/block page) or the
    downloaded bytes fail the media magic-byte check; httpx errors propagate.
    """
    dest_dir = dest_dir or cache_dir()
    # Cache filename = hash of the URL + the URL's original extension.
    filename = _url_hash(url) + _ext_from_url(url)
    local = dest_dir / filename
    # Check if a ugoira zip was already converted to gif; conversion deletes
    # the zip, so the .gif is the real cache entry for .zip URLs.
    if local.suffix.lower() == ".zip":
        gif_path = local.with_suffix(".gif")
        if gif_path.exists():
            return gif_path
    # Validate cached file isn't corrupt (e.g. HTML error page saved as image)
    if local.exists():
        if _is_valid_media(local):
            return local
        else:
            local.unlink()  # Remove corrupt cache entry, then re-download
    # Extract referer from URL domain (needed for Gelbooru CDN etc.)
    from urllib.parse import urlparse
    parsed = urlparse(url)
    # Map CDN hostnames back to the main site so the Referer looks legitimate
    referer_host = parsed.netloc
    if referer_host.startswith("img") and "gelbooru" in referer_host:
        referer_host = "gelbooru.com"
    elif referer_host.startswith("cdn") and "donmai" in referer_host:
        referer_host = "danbooru.donmai.us"
    referer = f"{parsed.scheme}://{referer_host}/"
    # Only close the client in `finally` if we created it ourselves.
    own_client = client is None
    if own_client:
        client = httpx.AsyncClient(
            headers={
                "User-Agent": USER_AGENT,
                "Referer": referer,
                "Accept": "image/*,video/*,*/*",
            },
            follow_redirects=True,
            timeout=60.0,
        )
    try:
        if progress_callback:
            # Streaming path: accumulate chunks and report progress per chunk.
            async with client.stream("GET", url) as resp:
                resp.raise_for_status()
                content_type = resp.headers.get("content-type", "")
                if "text/html" in content_type:
                    raise ValueError(f"Server returned HTML instead of media (possible captcha/block)")
                # content-length may be absent; total == 0 signals "unknown".
                total = int(resp.headers.get("content-length", 0))
                downloaded = 0
                chunks = []
                async for chunk in resp.aiter_bytes(8192):
                    chunks.append(chunk)
                    downloaded += len(chunk)
                    progress_callback(downloaded, total)
                data = b"".join(chunks)
                local.write_bytes(data)
        else:
            # Simple path: buffer the whole response in memory, then write.
            resp = await client.get(url)
            resp.raise_for_status()
            content_type = resp.headers.get("content-type", "")
            if "text/html" in content_type:
                raise ValueError(f"Server returned HTML instead of media (possible captcha/block)")
            local.write_bytes(resp.content)
        # Verify the downloaded file really is media (magic-byte check)
        if not _is_valid_media(local):
            local.unlink()
            raise ValueError("Downloaded file is not valid media")
        # Convert ugoira zip to animated GIF (deletes the zip, returns .gif path)
        if local.suffix.lower() == ".zip" and zipfile.is_zipfile(local):
            local = _convert_ugoira_to_gif(local)
    finally:
        if own_client:
            await client.aclose()
    return local
async def download_thumbnail(
    url: str,
    client: httpx.AsyncClient | None = None,
) -> Path:
    """Fetch a thumbnail preview image into the thumbnail cache directory."""
    dest = thumbnails_dir()
    return await download_image(url, client, dest)
def cached_path_for(url: str, dest_dir: Path | None = None) -> Path:
    """Return the expected cache path for a URL (may not exist yet)."""
    base = dest_dir or cache_dir()
    return base / f"{_url_hash(url)}{_ext_from_url(url)}"
def is_cached(url: str, dest_dir: Path | None = None) -> bool:
    """True if the media for *url* is already present in the cache."""
    path = cached_path_for(url, dest_dir)
    return path.exists()
def delete_from_library(post_id: int, folder: str | None = None) -> bool:
    """Delete a saved image from the library. Returns True if a file was deleted."""
    # Imported lazily to avoid circular imports at module load time.
    from .config import MEDIA_EXTENSIONS, saved_dir, saved_folder_dir
    base = saved_folder_dir(folder) if folder else saved_dir()
    # The saved filename is the post id plus one of the known media extensions;
    # stop at the first match.
    for ext in MEDIA_EXTENSIONS:
        candidate = base / f"{post_id}{ext}"
        if candidate.exists():
            candidate.unlink()
            return True
    return False
def cache_size_bytes(include_thumbnails: bool = True) -> int:
    """Total size of all cached files in bytes."""
    def _dir_size(directory: Path) -> int:
        # Sum only regular files; subdirectories are ignored.
        return sum(entry.stat().st_size for entry in directory.iterdir() if entry.is_file())

    total = _dir_size(cache_dir())
    if include_thumbnails:
        total += _dir_size(thumbnails_dir())
    return total
def cache_file_count(include_thumbnails: bool = True) -> tuple[int, int]:
    """Return (image_count, thumbnail_count)."""
    image_count = sum(1 for entry in cache_dir().iterdir() if entry.is_file())
    thumb_count = 0
    if include_thumbnails:
        thumb_count = sum(1 for entry in thumbnails_dir().iterdir() if entry.is_file())
    return image_count, thumb_count
def evict_oldest(max_bytes: int, protected_paths: set[str] | None = None) -> int:
    """Delete oldest non-protected cached images until under max_bytes. Returns count deleted."""
    protected = protected_paths or set()
    # Oldest first, ordered by modification time.
    candidates = sorted(cache_dir().iterdir(), key=lambda p: p.stat().st_mtime)
    current = cache_size_bytes(include_thumbnails=False)
    deleted = 0
    for candidate in candidates:
        if current <= max_bytes:
            break
        # Skip non-files and anything the caller marked as protected.
        if not candidate.is_file() or str(candidate) in protected:
            continue
        freed = candidate.stat().st_size
        candidate.unlink()
        current -= freed
        deleted += 1
    return deleted
def clear_cache(clear_images: bool = True, clear_thumbnails: bool = True) -> int:
    """Delete all cached files. Returns count deleted."""
    def _purge(directory: Path) -> int:
        # Remove every regular file in the directory; leave subdirectories.
        removed = 0
        for entry in directory.iterdir():
            if entry.is_file():
                entry.unlink()
                removed += 1
        return removed

    deleted = 0
    if clear_images:
        deleted += _purge(cache_dir())
    if clear_thumbnails:
        deleted += _purge(thumbnails_dir())
    return deleted