"""Download manager and local file cache.""" from __future__ import annotations import hashlib from pathlib import Path import httpx from .config import cache_dir, thumbnails_dir, USER_AGENT def _url_hash(url: str) -> str: return hashlib.sha256(url.encode()).hexdigest()[:16] _IMAGE_MAGIC = { b'\x89PNG': True, b'\xff\xd8\xff': True, # JPEG b'GIF8': True, b'RIFF': True, # WebP b'\x00\x00\x00': True, # MP4/MOV b'\x1aE\xdf\xa3': True, # WebM/MKV } def _is_valid_media(path: Path) -> bool: """Check if a file looks like actual media, not an HTML error page.""" try: with open(path, "rb") as f: header = f.read(16) if not header or header.startswith(b'<') or header.startswith(b' str: path = url.split("?")[0] if "." in path.split("/")[-1]: return "." + path.split("/")[-1].rsplit(".", 1)[-1] return ".jpg" async def download_image( url: str, client: httpx.AsyncClient | None = None, dest_dir: Path | None = None, progress_callback=None, ) -> Path: """Download an image to the cache, returning the local path. Skips if already cached. progress_callback: optional callable(bytes_downloaded, total_bytes) """ dest_dir = dest_dir or cache_dir() filename = _url_hash(url) + _ext_from_url(url) local = dest_dir / filename # Validate cached file isn't corrupt (e.g. HTML error page saved as image) if local.exists(): if _is_valid_media(local): return local else: local.unlink() # Remove corrupt cache entry # Extract referer from URL domain (needed for Gelbooru CDN etc.) from urllib.parse import urlparse parsed = urlparse(url) # Map CDN hostnames back to the main site referer_host = parsed.netloc if referer_host.startswith("img") and "gelbooru" in referer_host: referer_host = "gelbooru.com" elif referer_host.startswith("cdn") and "donmai" in referer_host: referer_host = "danbooru.donmai.us" referer = f"{parsed.scheme}://{referer_host}/" own_client = client is None if own_client: client = httpx.AsyncClient( headers={ "User-Agent": USER_AGENT, "Referer": referer, "Accept": "image/*,video/*,*/*", }, follow_redirects=True, timeout=60.0, ) try: if progress_callback: async with client.stream("GET", url) as resp: resp.raise_for_status() content_type = resp.headers.get("content-type", "") if "text/html" in content_type: raise ValueError(f"Server returned HTML instead of media (possible captcha/block)") total = int(resp.headers.get("content-length", 0)) downloaded = 0 chunks = [] async for chunk in resp.aiter_bytes(8192): chunks.append(chunk) downloaded += len(chunk) progress_callback(downloaded, total) data = b"".join(chunks) local.write_bytes(data) else: resp = await client.get(url) resp.raise_for_status() content_type = resp.headers.get("content-type", "") if "text/html" in content_type: raise ValueError(f"Server returned HTML instead of media (possible captcha/block)") local.write_bytes(resp.content) # Verify the downloaded file if not _is_valid_media(local): local.unlink() raise ValueError("Downloaded file is not valid media") finally: if own_client: await client.aclose() return local async def download_thumbnail( url: str, client: httpx.AsyncClient | None = None, ) -> Path: """Download a thumbnail preview image.""" return await download_image(url, client, thumbnails_dir()) def cached_path_for(url: str, dest_dir: Path | None = None) -> Path: """Return the expected cache path for a URL (may not exist yet).""" dest_dir = dest_dir or cache_dir() return dest_dir / (_url_hash(url) + _ext_from_url(url)) def is_cached(url: str, dest_dir: Path | None = None) -> bool: return cached_path_for(url, dest_dir).exists() def delete_from_library(post_id: int, folder: str | None = None) -> bool: """Delete a saved image from the library. Returns True if a file was deleted.""" from .config import saved_dir, saved_folder_dir search_dir = saved_folder_dir(folder) if folder else saved_dir() from .config import MEDIA_EXTENSIONS for ext in MEDIA_EXTENSIONS: path = search_dir / f"{post_id}{ext}" if path.exists(): path.unlink() return True return False def cache_size_bytes(include_thumbnails: bool = True) -> int: """Total size of all cached files in bytes.""" total = sum(f.stat().st_size for f in cache_dir().iterdir() if f.is_file()) if include_thumbnails: total += sum(f.stat().st_size for f in thumbnails_dir().iterdir() if f.is_file()) return total def cache_file_count(include_thumbnails: bool = True) -> tuple[int, int]: """Return (image_count, thumbnail_count).""" images = sum(1 for f in cache_dir().iterdir() if f.is_file()) thumbs = sum(1 for f in thumbnails_dir().iterdir() if f.is_file()) if include_thumbnails else 0 return images, thumbs def evict_oldest(max_bytes: int, protected_paths: set[str] | None = None) -> int: """Delete oldest non-protected cached images until under max_bytes. Returns count deleted.""" protected = protected_paths or set() files = sorted(cache_dir().iterdir(), key=lambda f: f.stat().st_mtime) deleted = 0 current = cache_size_bytes(include_thumbnails=False) for f in files: if current <= max_bytes: break if not f.is_file() or str(f) in protected: continue size = f.stat().st_size f.unlink() current -= size deleted += 1 return deleted def clear_cache(clear_images: bool = True, clear_thumbnails: bool = True) -> int: """Delete all cached files. Returns count deleted.""" deleted = 0 if clear_images: for f in cache_dir().iterdir(): if f.is_file(): f.unlink() deleted += 1 if clear_thumbnails: for f in thumbnails_dir().iterdir(): if f.is_file(): f.unlink() deleted += 1 return deleted