From 2605f08febc25738752931a11199169e1085303e Mon Sep 17 00:00:00 2001 From: Muyue Date: Sun, 21 Dec 2025 18:24:06 +0100 Subject: [PATCH] Add complete AnimeLibrarian implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Full application structure with core, API clients, and UI modules - Directory compatibility checker with comprehensive validation - TheTVDB API integration for metadata and standardized naming - trace.moe API integration for episode verification - File renamer with TVDB format compliance - Interactive CLI interface with detailed reporting - Configuration system with validation and defaults - Comprehensive error handling and logging - Support for backup and dry-run operations - Project developed by the LĂ©gion de Muyue 💘 Generated with Crush Assisted-by: GLM-4.6 via Crush --- main.py | 69 +++++ requirements.txt | 27 ++ src/__init__.py | 3 + src/api/__init__.py | 1 + src/api/thetvdb_client.py | 491 ++++++++++++++++++++++++++++++++++ src/api/tracemoe_client.py | 429 +++++++++++++++++++++++++++++ src/core/__init__.py | 464 ++++++++++++++++++++++++++++++++ src/core/directory_checker.py | 333 +++++++++++++++++++++++ src/core/file_renamer.py | 475 ++++++++++++++++++++++++++++++++ src/core/file_scanner.py | 239 +++++++++++++++++ src/core/media_detector.py | 352 ++++++++++++++++++++++++ src/models/__init__.py | 1 + src/models/episode.py | 175 ++++++++++++ src/ui/__init__.py | 390 +++++++++++++++++++++++++++ src/utils/__init__.py | 16 ++ src/utils/config.py | 294 ++++++++++++++++++++ src/utils/logging.py | 69 +++++ 17 files changed, 3828 insertions(+) create mode 100644 main.py create mode 100644 requirements.txt create mode 100644 src/__init__.py create mode 100644 src/api/__init__.py create mode 100644 src/api/thetvdb_client.py create mode 100644 src/api/tracemoe_client.py create mode 100644 src/core/__init__.py create mode 100644 src/core/directory_checker.py create mode 100644 src/core/file_renamer.py 
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AnimeLibrarian - anime collection organisation and verification tool.

Main entry point: parses CLI options, loads configuration, wires the
core engine to the interactive UI and starts it.
"""

import os
import sys
import argparse
from pathlib import Path

# Make the local ``src`` package importable when the script is run directly.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from src.core import AnimeLibrarianCore
from src.ui import AnimeLibrarianUI
from src.utils import setup_logging, load_config


def main():
    """Parse command-line arguments and launch the interactive UI.

    Exceptions from configuration loading or the UI are left to the
    ``__main__`` guard, which reports them and exits non-zero.
    """
    # NOTE: user-facing strings were double-encoded (mojibake) in the
    # original; they are restored to proper UTF-8 French here.
    parser = argparse.ArgumentParser(
        description="AnimeLibrarian - Outil d'organisation et de vérification de collections d'anime"
    )
    parser.add_argument(
        "--config", "-c",
        type=str,
        help="Chemin vers le fichier de configuration",
        default="config.json",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Active le mode verbeux",
    )
    parser.add_argument(
        "--directory", "-d",
        type=str,
        help="Répertoire de base à analyser (contournement de la sélection interactive)",
    )

    args = parser.parse_args()

    # Configure logging first so configuration loading is already traced.
    setup_logging(verbose=args.verbose)

    config = load_config(args.config)

    core = AnimeLibrarianCore(config)
    ui = AnimeLibrarianUI(core)

    # --directory bypasses the interactive directory selection.
    ui.run(preselected_directory=args.directory)
__name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\nOpĂ©ration annulĂ©e par l'utilisateur.") + sys.exit(0) + except Exception as e: + print(f"Erreur: {e}") + sys.exit(1) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..94277c0 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,27 @@ +# AnimeLibrarian - DĂ©pendances Python +# Projet dĂ©veloppĂ© par la LĂ©gion de Muyue + +# Core dependencies +requests>=2.28.0 +pathlib2>=2.3.0 + +# Video processing (optional but recommended) +ffmpeg-python>=0.2.0 + +# Image processing (for trace.moe) +Pillow>=9.0.0 + +# Data handling +dataclasses>=0.8; python_version<"3.7" + +# Logging and configuration +pyyaml>=6.0 + +# Development dependencies (optional) +pytest>=7.0.0 +pytest-cov>=4.0.0 +black>=22.0.0 +flake8>=5.0.0 + +# Optional: Advanced video information +# ffprobe-python>=0.1.0 \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..9b29248 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,3 @@ +# AnimeLibrarian - Package principal +__version__ = "1.0.0" +__author__ = "AnimeLibrarian Team" \ No newline at end of file diff --git a/src/api/__init__.py b/src/api/__init__.py new file mode 100644 index 0000000..34a24b4 --- /dev/null +++ b/src/api/__init__.py @@ -0,0 +1 @@ +# Package api \ No newline at end of file diff --git a/src/api/thetvdb_client.py b/src/api/thetvdb_client.py new file mode 100644 index 0000000..f3101e8 --- /dev/null +++ b/src/api/thetvdb_client.py @@ -0,0 +1,491 @@ +""" +Client API pour TheTVDB - Projet de la LĂ©gion de Muyue +""" + +import json +import time +import requests +from typing import Dict, Any, List, Optional, Tuple +from datetime import datetime, timedelta +import logging + +logger = logging.getLogger(__name__) + + +class TheTVDBClient: + """Client pour l'API TheTVDB""" + + BASE_URL = "https://api.thetvdb.com" + + def __init__(self, api_key: str = None, language: 
def __init__(self, api_key: str = None, language: str = "fra"):
    """Initialise the TheTVDB client.

    Args:
        api_key: TheTVDB API key (required for authenticated requests).
        language: Metadata language code ('fra', 'eng', ...).
    """
    self.api_key = api_key
    self.language = language
    self.token = None
    self.token_expires = None
    self.session = requests.Session()

    # Default headers for every request made through this session.
    self.session.headers.update({
        "Content-Type": "application/json",
        "Accept": "application/json"
    })


def login(self) -> bool:
    """Authenticate against TheTVDB and store the JWT bearer token.

    Returns:
        True on success; False when no API key is configured or the
        request fails (mojibake in the log strings repaired).
    """
    if not self.api_key:
        logger.warning("Aucune clé API TheTVDB fournie - utilisation en mode limité")
        return False

    try:
        auth_data = {"apikey": self.api_key}

        response = self.session.post(
            f"{self.BASE_URL}/login",
            json=auth_data,
            timeout=10,
        )

        if response.status_code == 200:
            auth_response = response.json()
            self.token = auth_response.get("token")

            if self.token:
                # All subsequent requests carry the bearer token.
                self.session.headers.update({
                    "Authorization": f"Bearer {self.token}"
                })

                # TheTVDB v2 tokens are valid for 24 hours.
                self.token_expires = datetime.now() + timedelta(hours=24)

                logger.info("Authentification TheTVDB réussie")
                return True

            logger.error("Token non trouvé dans la réponse TheTVDB")
            return False

        logger.error(f"Échec de l'authentification TheTVDB: {response.status_code} - {response.text}")
        return False

    except Exception as e:
        logger.error(f"Erreur lors de l'authentification TheTVDB: {e}")
        return False


def ensure_authenticated(self) -> bool:
    """Return True when a valid token is held, re-authenticating if needed."""
    # Without an API key authentication is impossible.
    if not self.api_key:
        return False

    # Current token still within its validity window?
    if self.token and self.token_expires and datetime.now() < self.token_expires:
        return True

    return self.login()


def search_series(self, name: str) -> List[Dict[str, Any]]:
    """Search series by name.

    Args:
        name: Series name to look up.

    Returns:
        List of matching series records (empty on error).
    """
    try:
        params = {"name": name, "type": "series"}
        url = f"{self.BASE_URL}/search/series"

        response = self.session.get(url, params=params, timeout=10)

        if response.status_code == 200:
            return response.json().get("data", [])
        if response.status_code == 401 and self.ensure_authenticated():
            # Retry once with a fresh token.
            response = self.session.get(url, params=params, timeout=10)
            if response.status_code == 200:
                return response.json().get("data", [])

        logger.error(f"Erreur recherche série {name}: {response.status_code} - {response.text}")
        return []

    except Exception as e:
        logger.error(f"Erreur lors de la recherche de série {name}: {e}")
        return []


def get_series_by_id(self, series_id: int) -> Optional[Dict[str, Any]]:
    """Fetch detailed series information by TheTVDB id.

    Args:
        series_id: TheTVDB series id.

    Returns:
        The series record, or None on error.
    """
    try:
        url = f"{self.BASE_URL}/series/{series_id}"
        params = {"lang": self.language}

        response = self.session.get(url, params=params, timeout=10)

        if response.status_code == 200:
            return response.json().get("data")
        if response.status_code == 401 and self.ensure_authenticated():
            response = self.session.get(url, params=params, timeout=10)
            if response.status_code == 200:
                return response.json().get("data")

        logger.error(f"Erreur récupération série {series_id}: {response.status_code}")
        return None

    except Exception as e:
        logger.error(f"Erreur lors de la récupération de la série {series_id}: {e}")
        return None
def get_episodes(self, series_id: int, page: int = 0) -> List[Dict[str, Any]]:
    """Fetch one page of episodes for a series.

    Args:
        series_id: TheTVDB series id.
        page: Result page number (API pagination).

    Returns:
        Episode records for that page (empty on error).
    """
    try:
        url = f"{self.BASE_URL}/series/{series_id}/episodes/query"
        params = {
            "lang": self.language,
            "page": page,
            "pageSize": 100,  # API maximum per page
        }

        response = self.session.get(url, params=params, timeout=10)

        if response.status_code == 200:
            return response.json().get("data", [])
        if response.status_code == 401 and self.ensure_authenticated():
            # Retry once with a fresh token.
            response = self.session.get(url, params=params, timeout=10)
            if response.status_code == 200:
                return response.json().get("data", [])

        logger.error(f"Erreur récupération épisodes série {series_id}: {response.status_code}")
        return []

    except Exception as e:
        logger.error(f"Erreur lors de la récupération des épisodes de la série {series_id}: {e}")
        return []


def get_all_episodes(self, series_id: int) -> List[Dict[str, Any]]:
    """Fetch every episode of a series by walking all result pages.

    An empty page (which is also what ``get_episodes`` returns on error)
    terminates the pagination loop.
    """
    all_episodes = []
    page = 0

    while True:
        episodes = self.get_episodes(series_id, page)
        if not episodes:
            break

        all_episodes.extend(episodes)
        page += 1

        # Small pause so we do not hammer the API.
        time.sleep(0.1)

    logger.info(f"Récupéré {len(all_episodes)} épisodes pour la série {series_id}")
    return all_episodes


def get_episode_by_number(self, series_id: int, season: int, episode: int) -> Optional[Dict[str, Any]]:
    """Fetch a single episode by its (season, episode) numbering.

    Args:
        series_id: TheTVDB series id.
        season: Season number.
        episode: Episode number within the season.

    Returns:
        The episode record, or None when not found / on error.
    """
    try:
        url = f"{self.BASE_URL}/series/{series_id}/episodes/{season}/{episode}"
        params = {"lang": self.language}

        response = self.session.get(url, params=params, timeout=10)

        if response.status_code == 200:
            return response.json().get("data")
        if response.status_code == 401 and self.ensure_authenticated():
            response = self.session.get(url, params=params, timeout=10)
            if response.status_code == 200:
                return response.json().get("data")

        return None

    except Exception as e:
        logger.error(f"Erreur lors de la récupération de l'épisode S{season:02d}E{episode:02d}: {e}")
        return None


def get_series_artwork(self, series_id: int) -> List[Dict[str, Any]]:
    """Fetch artworks (posters, banners) for a series.

    Returns:
        List of artwork records (empty on error).
    """
    try:
        url = f"{self.BASE_URL}/series/{series_id}/images/query"
        params = {
            "keyType": "poster,series",
            "lang": self.language
        }

        response = self.session.get(url, params=params, timeout=10)

        if response.status_code == 200:
            return response.json().get("data", [])
        if response.status_code == 401 and self.ensure_authenticated():
            response = self.session.get(url, params=params, timeout=10)
            if response.status_code == 200:
                return response.json().get("data", [])

        return []

    except Exception as e:
        logger.error(f"Erreur lors de la récupération des artworks de la série {series_id}: {e}")
        return []
def build_episode_map(self, series_id: int) -> Dict[Tuple[int, int], Dict[str, Any]]:
    """Build a (season, episode) -> episode-record map for fast lookup."""
    episodes = self.get_all_episodes(series_id)
    episode_map = {}

    for ep in episodes:
        season = ep.get("airedSeason", 1)
        episode_num = ep.get("airedEpisodeNumber", 0)

        # Skip entries with missing or zero numbering.
        if season and episode_num:
            episode_map[(season, episode_num)] = ep

    logger.info(f"Map d'épisodes construite: {len(episode_map)} entrées")
    return episode_map


def get_episode_title(self, series_id: int, season: int, episode: int, fallback_title: str = None) -> str:
    """Return the episode title, falling back to a generic label.

    Args:
        series_id: TheTVDB series id.
        season: Season number.
        episode: Episode number.
        fallback_title: Title used when the API has none.
    """
    episode_data = self.get_episode_by_number(series_id, season, episode)

    if episode_data and episode_data.get("episodeName"):
        return episode_data["episodeName"]

    if fallback_title:
        return fallback_title

    return f"Episode {episode:02d}"


def search_best_match(self, search_name: str, year: int = None) -> Optional[Dict[str, Any]]:
    """Find the best-matching series for a name, optionally biased by year.

    Scoring: exact name = 100, substring either way = 70, otherwise 20
    per common word; +15 for an exact year match, +5 for +/-1 year.
    Matches scoring below 50 are rejected.
    """
    search_term = search_name
    if year:
        search_term = f"{search_term} ({year})"

    results = self.search_series(search_term)

    if not results and year:
        # Retry without the year qualifier.
        results = self.search_series(search_name)

    if not results:
        return None

    best_match = None
    best_score = 0
    search_lower = search_name.lower()

    for series in results:
        name = series.get("seriesName", "").lower()

        if name == search_lower:
            score = 100
        elif search_lower in name or name in search_lower:
            score = 70
        else:
            search_words = set(search_lower.split())
            name_words = set(name.split())
            score = len(search_words.intersection(name_words)) * 20

        if year:
            series_year = self._extract_year_from_series(series)
            if series_year == year:
                score += 15
            elif abs(series_year - year) <= 1:
                score += 5

        if score > best_score:
            best_score = score
            best_match = series

    if best_score >= 50:
        logger.info(f"Meilleure correspondance pour '{search_name}': {best_match.get('seriesName')} (score: {best_score})")
        return best_match

    logger.warning(f"Pas de correspondance suffisante pour '{search_name}' (meilleur score: {best_score})")
    return None


def _extract_year_from_series(self, series: Dict[str, Any]) -> int:
    """Extract the first-aired year of a series record; 0 when unknown."""
    first_aired = series.get("firstAired", "")
    if first_aired:
        try:
            return datetime.strptime(first_aired, "%Y-%m-%d").year
        except (ValueError, TypeError):
            # Malformed or non-string date — was a bare ``except:`` before,
            # which also swallowed KeyboardInterrupt/SystemExit.
            pass

    return 0


def get_recommended_format(self, series: Dict[str, Any], episode_info: Dict[str, Any]) -> str:
    """Build the TVDB-style file name: 'Series - SxxEyy[ - Title]'.

    Args:
        series: Series record (uses ``seriesName``).
        episode_info: Dict with ``season``, ``episode``, ``title``,
            and optional ``special`` flag.
    """
    series_name = series.get("seriesName", "Unknown Series")

    season = episode_info.get("season", 1)
    episode = episode_info.get("episode", 0)
    title = episode_info.get("title", "")

    clean_series = self._clean_string(series_name)
    clean_title = self._clean_string(title) if title else ""

    if episode_info.get("special", False):
        # Specials live in season 0 per TVDB convention.
        formatted = f"{clean_series} - S00E{episode:02d}"
    else:
        formatted = f"{clean_series} - S{season:02d}E{episode:02d}"

    if clean_title:
        formatted += f" - {clean_title}"

    return formatted


def _clean_string(self, text: str) -> str:
    """Strip filesystem-invalid characters and collapse whitespace runs."""
    if not text:
        return ""

    # Remove characters that are invalid in file names.
    invalid_chars = '<>:"/\\|?*'
    for char in invalid_chars:
        text = text.replace(char, '')

    # Collapse any run of whitespace to a single space; the original
    # single ``replace`` pass missed runs of three or more spaces.
    return ' '.join(text.split())


def get_languages(self) -> List[Dict[str, Any]]:
    """Return the list of languages supported by the API (empty on error)."""
    try:
        url = f"{self.BASE_URL}/languages"

        response = self.session.get(url, timeout=10)

        if response.status_code == 200:
            return response.json().get("data", [])

        return []

    except Exception as e:
        logger.error(f"Erreur lors de la récupération des langues: {e}")
        return []


# --- TraceMoeClient (src/api/tracemoe_client.py) -------------------------

def __init__(self, config: Dict[str, Any] = None):
    """Initialise the trace.moe client.

    Args:
        config: Optional configuration dict (API key, retry/timeout knobs).
    """
    self.config = config or {}
    self.api_key = self.config.get("trace_moe_api_key")
    self.session = requests.Session()

    # Retry behaviour.
    self.max_retries = self.config.get("max_retries", 3)
    self.retry_delay = self.config.get("retry_delay", 1.0)
    self.timeout = self.config.get("timeout", 30)

    # Minimum delay between consecutive API calls, in seconds.
    self.rate_limit = self.config.get("rate_limit", 1.0)
    self.last_request_time = 0
def check_rate_limit(self):
    """Sleep as needed to honour the minimum delay between API calls."""
    current_time = time.time()
    elapsed = current_time - self.last_request_time

    if elapsed < self.rate_limit:
        sleep_time = self.rate_limit - elapsed
        logger.debug(f"Rate limit: attente de {sleep_time:.1f}s")
        time.sleep(sleep_time)

    self.last_request_time = time.time()


def extract_frame_from_video(self, video_path: Path, timestamp: float = None) -> Optional[bytes]:
    """Extract a single frame from a video file via ffmpeg.

    Args:
        video_path: Video file to sample.
        timestamp: Position in seconds; when None, 30% of the probed
            duration (capped at 60s) is used, falling back to 60s.

    Returns:
        PNG image bytes, or None on failure.
    """
    try:
        # Imported lazily so ffmpeg support stays optional.
        import subprocess

        if timestamp is None:
            # Probe the duration to pick a representative frame.
            try:
                cmd = [
                    'ffprobe', '-v', 'quiet', '-show_entries',
                    'format=duration', '-of', 'csv=p=0', str(video_path)
                ]
                result = subprocess.run(
                    cmd, capture_output=True, text=True, timeout=10
                )
                if result.returncode == 0:
                    duration = float(result.stdout.strip())
                    timestamp = min(duration * 0.3, 60)  # 30% of duration, max 60s
                else:
                    timestamp = 60
            except (ValueError, OSError, subprocess.SubprocessError):
                # ffprobe missing, timed out, or printed a non-numeric
                # duration — was a bare ``except:`` before.
                timestamp = 60

        # Pipe exactly one PNG-encoded frame to stdout.
        cmd = [
            'ffmpeg', '-i', str(video_path),
            '-ss', str(timestamp),
            '-frames:v', '1',
            '-f', 'image2pipe',
            '-vcodec', 'png',
            '-'
        ]

        result = subprocess.run(cmd, capture_output=True, timeout=20)

        if result.returncode == 0 and result.stdout:
            return result.stdout

        logger.error(f"Erreur extraction frame: {result.stderr.decode('utf-8')}")
        return None

    except subprocess.TimeoutExpired:
        logger.error("Timeout lors de l'extraction de frame")
        return None
    except Exception as e:
        logger.error(f"Erreur lors de l'extraction de frame: {e}")
        return None


def analyze_frame(self, frame_data: bytes) -> Optional[Dict[str, Any]]:
    """Submit an image to the trace.moe /search endpoint.

    Args:
        frame_data: Raw image bytes (PNG/JPEG).

    Returns:
        Parsed JSON response, or None after all retries failed.
    """
    self.check_rate_limit()

    try:
        # NOTE(review): trace.moe documents multipart upload or a ``url``
        # parameter for /search; this base64 JSON body should be confirmed
        # against the current API before relying on it.
        encoded_image = base64.b64encode(frame_data).decode('utf-8')

        url = f"{self.BASE_URL}/search"
        data = {"image": encoded_image}

        # An API key raises the anonymous rate limits.
        if self.api_key:
            data["key"] = self.api_key

        for attempt in range(self.max_retries):
            try:
                response = self.session.post(
                    url, json=data, timeout=self.timeout
                )

                if response.status_code == 200:
                    return response.json()
                if response.status_code == 429:
                    # Too Many Requests — back off progressively.
                    logger.warning("API rate limit atteint, augmentation du délai")
                    time.sleep(self.rate_limit * (attempt + 1))
                    continue

                logger.error(f"Erreur API trace.moe: {response.status_code} - {response.text}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)
                    continue

            except requests.exceptions.Timeout:
                logger.error(f"Timeout API (tentative {attempt + 1})")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay * (attempt + 1))
                    continue
            except Exception as e:
                logger.error(f"Erreur requête API (tentative {attempt + 1}): {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)
                    continue

        return None

    except Exception as e:
        logger.error(f"Erreur lors de l'analyse de frame: {e}")
        return None
def search_video(self, video_path: Path, timestamp: float = None) -> Optional[Dict[str, Any]]:
    """Extract a frame from the video and run it through trace.moe.

    Args:
        video_path: Video file to analyse.
        timestamp: Optional specific position (seconds) to sample.

    Returns:
        The raw API result annotated with provenance, or None.
    """
    logger.debug(f"Analyse de la vidéo: {video_path.name}")

    frame_data = self.extract_frame_from_video(video_path, timestamp)
    if not frame_data:
        logger.error(f"Impossible d'extraire une frame de {video_path}")
        return None

    result = self.analyze_frame(frame_data)
    if result:
        # Annotate with provenance for downstream reporting.
        result['source_file'] = str(video_path)
        result['timestamp_used'] = timestamp
        result['frame_size'] = len(frame_data)

        logger.debug(f"Analyse réussie pour {video_path.name}")

    return result


def identify_anime_episode(self, video_path: Path, timestamp: float = None) -> Optional[Dict[str, Any]]:
    """Identify the anime and episode shown in a video file.

    Returns:
        A processed result dict (see ``_process_trace_moe_result``),
        or None when no frame could be analysed at all.
    """
    result = self.search_video(video_path, timestamp)

    if not result:
        return None

    return self._process_trace_moe_result(result)


def _process_trace_moe_result(self, result: Dict[str, Any]) -> Dict[str, Any]:
    """Normalise a raw trace.moe response into a flat result dict.

    Keys: ``success``, ``matches``, ``best_match``, ``error``; on success
    also ``high_confidence`` (>= 0.85) and ``medium_confidence`` (>= 0.70).
    """
    processed = {
        "success": False,
        "matches": [],
        "best_match": None,
        "error": None
    }

    try:
        if result.get("error"):
            processed["error"] = result["error"]
            return processed

        docs = result.get("result", [])
        if not docs:
            processed["error"] = "Aucune correspondance trouvée"
            return processed

        processed["matches"] = docs

        # trace.moe orders results by similarity — first entry is best.
        best_match = docs[0]
        anilist = best_match.get("anilist", {})
        # NOTE(review): assumes the query requested anilistInfo so that
        # "anilist" is an object rather than a bare id — confirm.
        titles = anilist.get("title", {})

        processed["best_match"] = {
            "anime_title": titles.get("romaji", ""),
            "anime_title_en": titles.get("english", ""),
            "anime_title_native": titles.get("native", ""),
            "episode": best_match.get("episode", None),
            "timestamp": best_match.get("from", None),
            "similarity": best_match.get("similarity", 0),
            "anilist_id": anilist.get("id", None),
            "mal_id": anilist.get("idMal", None),
            "filename": best_match.get("filename", ""),
            # NOTE(review): trace.moe does not document a "season" field;
            # this will usually remain None — confirm.
            "season": best_match.get("season", None)
        }

        if best_match.get("from"):
            processed["best_match"]["timestamp_formatted"] = self._format_timestamp(
                best_match.get("from")
            )

        # Confidence-threshold flags for the caller.
        confidence = best_match.get("similarity", 0)
        processed["high_confidence"] = confidence >= 0.85
        processed["medium_confidence"] = confidence >= 0.70

        processed["success"] = True

    except Exception as e:
        processed["error"] = f"Erreur lors du traitement des résultats: {e}"
        logger.error(f"Erreur traitement trace.moe: {e}")

    return processed


def _format_timestamp(self, seconds: float) -> str:
    """Format a duration in seconds as HH:MM:SS (None/zero-safe)."""
    if not seconds:
        return "00:00:00"

    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)

    return f"{hours:02d}:{minutes:02d}:{secs:02d}"
verification["confidence"] = best_match.get("similarity", 0) + + if identified_episode is not None and identified_episode == expected_episode: + verification["episode_match"] = True + + # VĂ©rification de la sĂ©rie si spĂ©cifiĂ©e + if expected_series: + anime_titles = [ + best_match.get("anime_title", "").lower(), + best_match.get("anime_title_en", "").lower(), + best_match.get("anime_title_native", "").lower() + ] + + expected_lower = expected_series.lower() + + for title in anime_titles: + if title and (expected_lower in title or title in expected_lower): + verification["series_match"] = True + break + + verification["identified_series"] = best_match.get("anime_title") + + # SuccĂšs si l'Ă©pisode correspond + verification["success"] = verification["episode_match"] + + except Exception as e: + verification["error"] = f"Erreur lors de la vĂ©rification: {e}" + logger.error(f"Erreur vĂ©rification Ă©pisode: {e}") + + return verification + + def batch_verify_episodes(self, episodes: List[Tuple[Path, int, str]]) -> List[Dict[str, Any]]: + """ + VĂ©rification par lot de plusieurs Ă©pisodes + + Args: + episodes: Liste de tuples (video_path, expected_episode, expected_series) + + Returns: + Liste des rĂ©sultats de vĂ©rification + """ + results = [] + + for video_path, expected_episode, expected_series in episodes: + logger.info(f"VĂ©rification de {video_path.name}") + + result = self.verify_episode_number( + video_path, expected_episode, expected_series + ) + + results.append(result) + + # Petite pause entre les vĂ©rifications pour respecter les limites + time.sleep(self.rate_limit) + + return results + + def get_api_limits(self) -> Dict[str, Any]: + """Retourne les informations sur les limites de l'API""" + return { + "rate_limit": self.rate_limit, + "max_retries": self.max_retries, + "timeout": self.timeout, + "has_api_key": bool(self.api_key) + } + + def test_connection(self) -> Dict[str, Any]: + """Teste la connexion Ă  l'API trace.moe""" + try: + # CrĂ©ation d'une 
petite image de test + import io + from PIL import Image + + # CrĂ©ation d'une image 1x1 noir + img = Image.new('RGB', (1, 1), color='black') + img_bytes = io.BytesIO() + img.save(img_bytes, format='PNG') + img_bytes.seek(0) + + # Test avec cette image + result = self.analyze_frame(img_bytes.read()) + + return { + "success": True, + "api_responding": result is not None, + "result": result + } + + except Exception as e: + return { + "success": False, + "error": str(e) + } \ No newline at end of file diff --git a/src/core/__init__.py b/src/core/__init__.py new file mode 100644 index 0000000..c5ceaaf --- /dev/null +++ b/src/core/__init__.py @@ -0,0 +1,464 @@ +""" +CƓur de l'application AnimeLibrarian - Projet de la LĂ©gion de Muyue +""" + +import os +from pathlib import Path +from typing import Dict, Any, List, Optional, Tuple +import logging + +from .directory_checker import DirectoryChecker +from .file_scanner import FileScanner +from .media_detector import MediaDetector +from .file_renamer import FileRenamer +from ..api.thetvdb_client import TheTVDBClient +from ..api.tracemoe_client import TraceMoeClient +from ..models.episode import Series, Episode + +logger = logging.getLogger(__name__) + + +class AnimeLibrarianCore: + """CƓur logique de l'application AnimeLibrarian""" + + def __init__(self, config: Dict[str, Any] = None): + """ + Initialise le cƓur de l'application + + Args: + config: Configuration de l'application + """ + self.config = config or {} + + # Initialisation des composants + self.directory_checker = DirectoryChecker(config) + self.file_scanner = FileScanner(config) + self.media_detector = MediaDetector(config) + self.file_renamer = FileRenamer(config) + + # Clients API + self.tvdb_client = TheTVDBClient( + api_key=self.config.get("thetvdb_api_key"), + language=self.config.get("language", "fra") + ) + self.trace_moe_client = TraceMoeClient(self.config) + + # État de l'application + self.current_directory = None + self.series_list = [] + 
def check_directory_compatibility(self, directory_path: str) -> Dict[str, Any]:
    """Validate a directory and remember it when it passes.

    Args:
        directory_path: Directory to validate.

    Returns:
        The DirectoryChecker result as a plain dict (``__dict__``).
    """
    logger.info(f"Vérification de compatibilité: {directory_path}")

    result = self.directory_checker.check_directory(directory_path)

    if result.is_compatible:
        # Remember the directory for subsequent scans.
        self.current_directory = Path(directory_path)
        logger.info(f"Répertoire compatible: {directory_path}")
    else:
        logger.warning(f"Répertoire incompatible: {directory_path}")

    return result.__dict__


def scan_series(self, directory_path: str = None) -> List[Dict[str, Any]]:
    """Scan the current (or given) directory and build the series list.

    Args:
        directory_path: Directory to scan; defaults to the one validated
            by ``check_directory_compatibility``.

    Returns:
        Serialised series dicts.

    Raises:
        ValueError: When no directory was supplied or validated before.
    """
    if directory_path:
        self.current_directory = Path(directory_path)

    if not self.current_directory:
        raise ValueError("Aucun répertoire spécifié ou compatible")

    logger.info(f"Scan des séries dans: {self.current_directory}")

    # Recognised video container extensions.
    media_files = self.file_scanner.scan_directory(
        self.current_directory,
        {'.mp4', '.mkv', '.avi', '.mov', '.wmv', '.flv', '.webm',
         '.m4v', '.mpg', '.mpeg', '.3gp', '.ts', '.m2ts', '.ogv'}
    )

    # Group the flat file list into per-series buckets (project helper).
    series_structure = self._group_files_by_series(media_files)

    self.series_list = []
    for series_name, files in series_structure.items():
        self.series_list.append(self._create_series(series_name, files))

    logger.info(f"Trouvé {len(self.series_list)} séries")

    return [self._serialize_series(series) for series in self.series_list]


def verify_episodes_numbers(self, series_indices: List[int]) -> Dict[str, Any]:
    """Verify episode numbering via trace.moe for the selected series.

    Args:
        series_indices: Indices into ``self.series_list``.

    Returns:
        Aggregate counters, per-series results, and error messages.
    """
    logger.info(f"Vérification des numéros d'épisodes pour {len(series_indices)} séries")

    results = {
        "series_verified": 0,
        "episodes_verified": 0,
        "verification_results": [],
        "errors": []
    }

    # Best-effort TheTVDB authentication (no-op without an API key).
    self.tvdb_client.login()

    for idx in series_indices:
        # Out-of-range indices are ignored silently.
        if 0 <= idx < len(self.series_list):
            series = self.series_list[idx]
            logger.info(f"Vérification de la série: {series.name}")

            try:
                series_result = self._verify_series_episodes(series)
                results["verification_results"].append(series_result)
                results["series_verified"] += 1
                results["episodes_verified"] += len(series.episodes)

            except Exception as e:
                error_msg = f"Erreur vérification série {series.name}: {e}"
                logger.error(error_msg)
                results["errors"].append(error_msg)

    logger.info(f"Vérification terminée: {results['series_verified']} séries, {results['episodes_verified']} épisodes")
    return results


def verify_files_integrity(self, series_indices: List[int]) -> Dict[str, Any]:
    """Check media-file integrity and detect duplicates.

    Args:
        series_indices: Indices into ``self.series_list``.

    Returns:
        Counters, invalid-file details, issue messages, and duplicate groups.
    """
    logger.info(f"Vérification d'intégrité pour {len(series_indices)} séries")

    results = {
        "files_checked": 0,
        "valid_files": 0,
        "invalid_files": [],
        "issues": [],
        "duplicates": []
    }

    # Collect the episodes of every selected series.
    all_files = []
    for idx in series_indices:
        if 0 <= idx < len(self.series_list):
            all_files.extend(self.series_list[idx].episodes)

    for episode in all_files:
        metadata = self.media_detector.analyze_media_file(episode.file_path)

        results["files_checked"] += 1

        if metadata.get("is_valid", False):
            results["valid_files"] += 1

            # Cache the probed metadata on the episode object.
            episode.duration = metadata.get("duration")
            episode.resolution = metadata.get("resolution")
            episode.codec = metadata.get("codec")
            episode.verified = True
        else:
            results["invalid_files"].append({
                "filename": episode.filename,
                "path": str(episode.file_path),
                "issue": metadata.get("error", "Fichier invalide")
            })
            results["issues"].append(f"Fichier invalide: {episode.filename}")

    # Duplicate detection over all collected files.
    file_info_list = [{"path": ep.file_path, "size": ep.file_size} for ep in all_files]
    duplicates = self.file_scanner.get_duplicates(file_info_list)

    if duplicates:
        results["duplicates"] = [
            [dup["path"].name for dup in dup_group]  # Path.name is already a str
            for dup_group in duplicates
        ]
        results["issues"].append(f"{len(duplicates)} groupes de doublons détectés")

    logger.info(f"Intégrité vérifiée: {results['valid_files']}/{results['files_checked']} fichiers valides")
    return results
results["rename_results"].extend(rename_results) + + results["series_processed"] += 1 + + # Statistiques + results["stats"] = self.file_renamer.get_stats() + + logger.info(f"Renommage terminĂ©: {results['stats']['renamed']} fichiers renommĂ©s") + return results + + def _group_files_by_series(self, media_files: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]: + """Groupe les fichiers par sĂ©rie""" + import re + + series_structure = {} + + for file_info in media_files: + filename = file_info['path'].name + + # Extraction du nom de sĂ©rie + patterns = [ + r'^\[?([^\[\]]+?)\]?\s*[-_.]?\s*S\d{1,2}E\d{1,2}', + r'^([^-_\[\]]+?)(?:\s*[-_.]\s*)?(?:S\d{1,2}E\d{1,2}|Episode\s*\d{1,3}|E\d{1,3})', + r'^([^-_()]+)(?:\s*[-_.]\s*)?\d{1,3}', + ] + + series_name = filename # Fallback + + for pattern in patterns: + match = re.search(pattern, filename, re.IGNORECASE) + if match: + series_name = match.group(1).strip() + break + + # Nettoyage du nom + series_name = re.sub(r'[^\w\s-]', '', series_name) + series_name = re.sub(r'\s+', ' ', series_name).strip() + + if series_name not in series_structure: + series_structure[series_name] = [] + + series_structure[series_name].append(file_info) + + return series_structure + + def _create_series(self, series_name: str, files: List[Dict[str, Any]]) -> Series: + """CrĂ©e un objet Series Ă  partir de fichiers""" + series = Series( + name=series_name, + directory=self.current_directory / series_name if self.current_directory else Path("."), + total_episodes=0 + ) + + # Recherche TVDB + tvdb_match = self.tvdb_client.search_best_match(series_name) + if tvdb_match: + series.tvdb_id = tvdb_match.get("id") + series.total_episodes = tvdb_match.get("totalEpisodes") + logger.debug(f"SĂ©rie trouvĂ©e sur TVDB: {series_name} -> ID: {series.tvdb_id}") + + # CrĂ©ation des Ă©pisodes + episodes = [] + + for file_info in files: + # Analyse du nom de fichier + episode_info = self.media_detector.extract_episode_info(file_info['path'].name) + + # CrĂ©ation 
de l'objet Episode + episode = Episode( + file_path=file_info['path'], + series_name=series_name, + season=episode_info['season'], + episode=episode_info['episode'], + title=episode_info['title'], + special=episode_info['special'], + file_size=file_info['size'], + resolution=file_info.get('resolution'), + codec=file_info.get('codec') + ) + + # Analyse multimĂ©dia + media_metadata = self.media_detector.analyze_media_file(file_info['path']) + if media_metadata: + episode.duration = media_metadata.get('duration') + episode.resolution = media_metadata.get('resolution') or episode.resolution + episode.codec = media_metadata.get('codec') or episode.codec + episode.verified = media_metadata.get('is_valid', False) + + episodes.append(episode) + + # Tri des Ă©pisodes + episodes.sort(key=lambda ep: (ep.season, ep.episode)) + + # Ajout Ă  la sĂ©rie + for episode in episodes: + series.add_episode(episode) + + return series + + def _verify_series_episodes(self, series: Series) -> Dict[str, Any]: + """VĂ©rifie les Ă©pisodes d'une sĂ©rie avec trace.moe""" + result = { + "series_name": series.name, + "episodes_verified": 0, + "episode_results": [], + "summary": {} + } + + # PrĂ©paration des Ă©pisodes Ă  vĂ©rifier + episodes_to_verify = [] + + for episode in series.episodes: + if not episode.special: # Ne vĂ©rifier que les Ă©pisodes normaux + episodes_to_verify.append((episode.file_path, episode.episode, series.name)) + + # VĂ©rification par lot + if episodes_to_verify: + verification_results = self.trace_moe_client.batch_verify_episodes(episodes_to_verify) + + for i, (episode, verification_result) in enumerate(zip(series.episodes, verification_results)): + if not episode.special: # seulement les Ă©pisodes normaux + episode_result = { + "filename": episode.filename, + "expected_episode": episode.episode, + "identified_episode": verification_result.get("identified_episode"), + "confidence": verification_result.get("confidence", 0), + "match": verification_result.get("episode_match", 
False), + "error": verification_result.get("error") + } + + result["episode_results"].append(episode_result) + result["episodes_verified"] += 1 + + # RĂ©sumĂ© + matches = sum(1 for r in result["episode_results"] if r.get("match", False)) + result["summary"] = { + "total": len(result["episode_results"]), + "matches": matches, + "mismatches": len(result["episode_results"]) - matches, + "match_rate": matches / len(result["episode_results"]) if result["episode_results"] else 0 + } + + return result + + def _serialize_series(self, series: Series) -> Dict[str, Any]: + """SĂ©rialise un objet Series pour l'UI""" + completeness = series.check_completeness() + + return { + "name": series.name, + "directory": str(series.directory), + "total_episodes": len(series.episodes), + "regular_episodes": len(series.get_regular_episodes()), + "special_episodes": len(series.get_specials()), + "tvdb_id": series.tvdb_id, + "completeness": completeness, + "is_complete": completeness.get("is_complete", False), + "missing_episodes": completeness.get("missing_episodes", []), + "duplicate_episodes": completeness.get("duplicate_episodes", []), + "total_size": sum(ep.file_size for ep in series.episodes) + } + + def get_series_details(self, series_index: int) -> Optional[Dict[str, Any]]: + """ + Retourne les dĂ©tails d'une sĂ©rie + + Args: + series_index: Index de la sĂ©rie + + Returns: + DĂ©tails de la sĂ©rie ou None + """ + if 0 <= series_index < len(self.series_list): + series = self.series_list[series_index] + + return { + "info": self._serialize_series(series), + "episodes": [ + { + "filename": ep.filename, + "season": ep.season, + "episode": ep.episode, + "title": ep.title, + "special": ep.special, + "duration": ep.duration, + "resolution": ep.resolution, + "codec": ep.codec, + "file_size": ep.file_size, + "verified": ep.verified, + "absolute_number": ep.absolute_number + } + for ep in series.episodes + ] + } + + return None + + def get_application_status(self) -> Dict[str, Any]: + 
"""Retourne le statut actuel de l'application""" + return { + "current_directory": str(self.current_directory) if self.current_directory else None, + "series_count": len(self.series_list), + "total_episodes": sum(len(series.episodes) for series in self.series_list), + "tvdb_configured": bool(self.config.get("thetvdb_api_key")), + "trace_moe_configured": bool(self.config.get("trace_moe_api_key")), + "tvdb_authenticated": self.tvdb_client.token is not None, + "trace_moe_limits": self.trace_moe_client.get_api_limits() + } \ No newline at end of file diff --git a/src/core/directory_checker.py b/src/core/directory_checker.py new file mode 100644 index 0000000..7b609e9 --- /dev/null +++ b/src/core/directory_checker.py @@ -0,0 +1,333 @@ +""" +VĂ©rificateur de compatibilitĂ© des rĂ©pertoires pour AnimeLibrarian +""" + +import os +import shutil +import stat +from pathlib import Path +from typing import List, Dict, Any, Optional +import logging + +from ..models.episode import DirectoryCompatibilityResult, Series, Episode +from .file_scanner import FileScanner +from .media_detector import MediaDetector + +logger = logging.getLogger(__name__) + + +class DirectoryChecker: + """VĂ©rifie si un rĂ©pertoire est compatible avec AnimeLibrarian""" + + def __init__(self, config: Dict[str, Any] = None): + self.config = config or {} + self.file_scanner = FileScanner(config) + self.media_detector = MediaDetector() + + # Extensions vidĂ©o supportĂ©es + self.video_extensions = { + '.mp4', '.mkv', '.avi', '.mov', '.wmv', '.flv', '.webm', + '.m4v', '.mpg', '.mpeg', '.3gp', '.ts', '.m2ts', '.ogv' + } + + # Tailles minimales/maximales pour les fichiers vidĂ©o + self.min_video_size = 50 * 1024 * 1024 # 50 Mo + self.max_video_size = 50 * 1024 * 1024 * 1024 # 50 Go + + def check_directory(self, directory_path: str) -> DirectoryCompatibilityResult: + """ + VĂ©rifie la compatibilitĂ© d'un rĂ©pertoire + + Args: + directory_path: Chemin du rĂ©pertoire Ă  vĂ©rifier + + Returns: + 
DirectoryCompatibilityResult: RĂ©sultat dĂ©taillĂ© de la vĂ©rification + """ + logger.info(f"VĂ©rification du rĂ©pertoire: {directory_path}") + + path = Path(directory_path).resolve() + result = DirectoryCompatibilityResult(path=path) + + # 1. VĂ©rification de base du rĂ©pertoire + self._check_basic_directory(path, result) + + if not result.is_compatible: + return result + + # 2. VĂ©rification des permissions + self._check_permissions(path, result) + + # 3. VĂ©rification de l'espace disque + self._check_disk_space(path, result) + + # 4. Analyse des sĂ©ries et Ă©pisodes + if not self._analyze_media_structure(path, result): + return result + + # 5. VĂ©rification finale de compatibilitĂ© + self._finalize_compatibility(result) + + logger.info(f"VĂ©rification terminĂ©e: {result.summary()}") + return result + + def _check_basic_directory(self, path: Path, result: DirectoryCompatibilityResult): + """VĂ©rifications de base du rĂ©pertoire""" + + # Existence du rĂ©pertoire + if not path.exists(): + result.add_error(f"Le rĂ©pertoire n'existe pas: {path}") + return + + # C'est bien un rĂ©pertoire + if not path.is_dir(): + result.add_error(f"Le chemin n'est pas un rĂ©pertoire: {path}") + return + + # AccessibilitĂ© + if not os.access(path, os.R_OK): + result.add_error(f"Impossible de lire le rĂ©pertoire: {path}") + return + + # Non-vide + try: + items = list(path.iterdir()) + if not items: + result.add_error("Le rĂ©pertoire est vide") + return + except PermissionError: + result.add_error(f"Permission refusĂ©e lors de l'accĂšs Ă : {path}") + return + + logger.debug("✅ VĂ©rifications de base rĂ©ussies") + + def _check_permissions(self, path: Path, result: DirectoryCompatibilityResult): + """VĂ©rifie les permissions du rĂ©pertoire""" + permissions = {} + + # Lecture + permissions['read'] = os.access(path, os.R_OK) + + # Écriture + permissions['write'] = os.access(path, os.W_OK) + + # ExĂ©cution (navigation) + permissions['execute'] = os.access(path, os.X_OK) + + # PropriĂ©taire + 
try: + stat_info = path.stat() + permissions['is_owner'] = (stat_info.st_uid == os.getuid()) + except: + permissions['is_owner'] = False + + result.permissions = permissions + + # Ajout des avertissements si nĂ©cessaire + if not permissions['write']: + result.add_warning("Le rĂ©pertoire n'est pas accessible en Ă©criture (renommage impossible)") + + if not permissions['execute']: + result.add_error("Impossible de naviguer dans le rĂ©pertoire") + result.is_compatible = False + + logger.debug(f"Permissions: {permissions}") + + def _check_disk_space(self, path: Path, result: DirectoryCompatibilityResult): + """VĂ©rifie l'espace disque disponible""" + try: + stat_info = shutil.disk_usage(path) + + total = stat_info.total + free = stat_info.free + used = stat_info.used + + result.disk_space = { + 'total': total, + 'free': free, + 'used': used, + 'free_percent': round((free / total) * 100, 2) + } + + # Avertissement si moins de 10% d'espace libre + if result.disk_space['free_percent'] < 10: + result.add_warning(f"Moins de 10% d'espace disque disponible ({result.disk_space['free_percent']:.1f}%)") + + # Erreur si moins de 1 Go + if free < 1024 * 1024 * 1024: + result.add_warning("Moins de 1 Go d'espace disque disponible") + + except Exception as e: + result.add_warning(f"Impossible de vĂ©rifier l'espace disque: {e}") + + logger.debug(f"Espace disque: {result.disk_space}") + + def _analyze_media_structure(self, path: Path, result: DirectoryCompatibilityResult) -> bool: + """ + Analyse la structure des mĂ©dias dans le rĂ©pertoire + + Returns: + bool: True si la structure est valide + """ + try: + # Scan des fichiers multimĂ©dia + media_files = self.file_scanner.scan_directory(path, self.video_extensions) + + if not media_files: + result.add_error("Aucun fichier multimĂ©dia trouvĂ©") + return False + + result.total_episodes = len(media_files) + + # Analyse de la structure des rĂ©pertoires + series_found = self._detect_series_structure(path, media_files) + + if not 
series_found: + result.add_error("Structure de sĂ©ries non dĂ©tectĂ©e (doit contenir des sous-rĂ©pertoires par sĂ©rie)") + return False + + result.found_series = list(series_found.keys()) + + # Calcul de la taille totale + total_size = 0 + for file_info in media_files: + total_size += file_info['size'] + + result.total_size = total_size + + # Validation de la qualitĂ© des fichiers + self._validate_media_files(media_files, result) + + logger.debug(f"SĂ©ries trouvĂ©es: {len(result.found_series)}, Épisodes: {result.total_episodes}") + return True + + except Exception as e: + result.add_error(f"Erreur lors de l'analyse des mĂ©dias: {e}") + return False + + def _detect_series_structure(self, path: Path, media_files: List[Dict]) -> Dict[str, List[Dict]]: + """ + DĂ©tecte la structure des sĂ©ries + + Returns: + Dict: Nom de sĂ©rie -> Liste de fichiers + """ + series_structure = {} + + # Analyse de la structure de rĂ©pertoires + for item in path.iterdir(): + if item.is_dir(): + # VĂ©rifier si le rĂ©pertoire contient des vidĂ©os + series_videos = self.file_scanner.scan_directory(item, self.video_extensions) + if series_videos: + series_name = item.name + series_structure[series_name] = series_videos + + # Si pas de sous-rĂ©pertoires, essayer de grouper par nom de fichier + if not series_structure: + series_structure = self._group_flat_files(media_files) + + return series_structure + + def _group_flat_files(self, media_files: List[Dict]) -> Dict[str, List[Dict]]: + """Groupe les fichiers plats par nom de sĂ©rie""" + import re + + series_structure = {} + + for file_info in media_files: + filename = file_info['path'].stem + + # Tentative d'extraction du nom de sĂ©rie + # Format attendu: [SeriesName] S01E01 ou SeriesName S01E01 + pattern = r'^\[?([^\[\]]+)\]?\s*[_-]?\s*S\d{1,2}E\d{1,2}' + match = re.search(pattern, filename, re.IGNORECASE) + + if match: + series_name = match.group(1).strip() + else: + # Si pas de format SxxEyy, utiliser le dĂ©but du nom + pattern = r'^([^-_]+)' 
+ match = re.search(pattern, filename) + series_name = match.group(1).strip() if match else filename + + if series_name not in series_structure: + series_structure[series_name] = [] + + series_structure[series_name].append(file_info) + + return series_structure + + def _validate_media_files(self, media_files: List[Dict], result: DirectoryCompatibilityResult): + """Valide la qualitĂ© des fichiers multimĂ©dia""" + invalid_files = [] + + for file_info in media_files: + file_path = file_info['path'] + file_size = file_info['size'] + + # VĂ©rification de la taille + if file_size < self.min_video_size: + invalid_files.append(f"{file_path.name} (trop petit: {file_size / 1024 / 1024:.1f} Mo)") + continue + + if file_size > self.max_video_size: + result.add_warning(f"{file_path.name} (taille inhabituelle: {file_size / 1024 / 1024:.1f} Mo)") + + # VĂ©rification de l'extention + if file_path.suffix.lower() not in self.video_extensions: + invalid_files.append(f"{file_path.name} (extension non supportĂ©e)") + + if invalid_files: + result.add_error(f"Fichiers invalides dĂ©tectĂ©s: {', '.join(invalid_files[:5])}") + if len(invalid_files) > 5: + result.add_error(f"...et {len(invalid_files) - 5} autres fichiers") + + def _finalize_compatibility(self, result: DirectoryCompatibilityResult): + """Finalise la dĂ©cision de compatibilitĂ©""" + + # Si on a des erreurs, c'est incompatible + if result.errors: + result.is_compatible = False + return + + # VĂ©rifications minimales + if len(result.found_series) == 0: + result.add_error("Aucune sĂ©rie dĂ©tectĂ©e") + result.is_compatible = False + return + + if result.total_episodes == 0: + result.add_error("Aucun Ă©pisode dĂ©tectĂ©") + result.is_compatible = False + return + + # Si on peut Ă©crire, c'est compatible + if result.permissions.get('write', False): + result.is_compatible = True + else: + # On peut quand mĂȘme utiliser en lecture seule + result.is_compatible = True + result.add_warning("Mode lecture seule (renommage impossible)") + + 
def get_compatibility_recommendations(self, result: DirectoryCompatibilityResult) -> List[str]: + """Retourne des recommandations pour amĂ©liorer la compatibilitĂ©""" + recommendations = [] + + if result.errors: + recommendations.append("Corrigez les erreurs avant de continuer") + + if not result.permissions.get('write', False): + recommendations.append("Assurez-vous d'avoir les droits d'Ă©criture pour le renommage") + + if result.disk_space.get('free_percent', 100) < 10: + recommendations.append("LibĂ©rez de l'espace disque avant de commencer") + + if len(result.found_series) == 0: + recommendations.append("Organisez vos fichiers en sous-rĂ©pertoires par sĂ©rie") + recommendations.append("Exemple: /Series/NomDeLaSaison/S01E01.mkv") + + if result.total_episodes > 0 and result.total_size == 0: + recommendations.append("Certains fichiers semblent corrompus ou vides") + + return recommendations \ No newline at end of file diff --git a/src/core/file_renamer.py b/src/core/file_renamer.py new file mode 100644 index 0000000..2efa201 --- /dev/null +++ b/src/core/file_renamer.py @@ -0,0 +1,475 @@ +""" +Module de renommage de fichiers - Projet de la LĂ©gion de Muyue +""" + +import os +import shutil +from pathlib import Path +from typing import Dict, Any, List, Optional, Tuple +import logging + +from ..models.episode import Episode, Series +from ..api.thetvdb_client import TheTVDBClient + +logger = logging.getLogger(__name__) + + +class FileRenamer: + """GĂšre le renommage des fichiers selon les normes TVDB""" + + def __init__(self, config: Dict[str, Any] = None): + """ + Initialise le module de renommage + + Args: + config: Configuration du renommage + """ + self.config = config or {} + + # Options de renommage + self.dry_run = self.config.get("dry_run", False) + self.backup_original = self.config.get("backup_original", True) + self.include_absolute_number = self.config.get("include_absolute_number", True) + self.include_episode_title = 
self.config.get("include_episode_title", True) + self.include_technical_info = self.config.get("include_technical_info", True) + + # Client TVDB pour les titres + self.tvdb_client = TheTVDBClient( + api_key=self.config.get("thetvdb_api_key"), + language=self.config.get("language", "fra") + ) + + # Cache pour les titres d'Ă©pisodes + self.title_cache = {} + + # Statistiques + self.stats = { + "renamed": 0, + "skipped": 0, + "errors": 0, + "backups_created": 0 + } + + def prepare_rename_plan(self, series: Series) -> List[Dict[str, Any]]: + """ + PrĂ©pare un plan de renommage pour une sĂ©rie + + Args: + series: SĂ©rie Ă  traiter + + Returns: + Liste des opĂ©rations de renommage planifiĂ©es + """ + logger.info(f"PrĂ©paration du plan de renommage pour: {series.name}") + + # Authentification TVDB si possible + self.tvdb_client.login() + + # Recherche de la sĂ©rie sur TVDB + tvdb_series = None + if series.tvdb_id: + tvdb_series = self.tvdb_client.get_series_by_id(series.tvdb_id) + else: + tvdb_series = self.tvdb_client.search_best_match(series.name) + if tvdb_series: + series.tvdb_id = tvdb_series.get("id") + + # Construction de la map d'Ă©pisodes TVDB si disponible + episode_map = {} + if tvdb_series and series.tvdb_id: + episode_map = self.tvdb_client.build_episode_map(series.tvdb_id) + + # PrĂ©paration du plan de renommage + rename_plan = [] + + for episode in series.episodes: + # Titre de l'Ă©pisode + episode_title = self._get_episode_title(episode, tvdb_series, episode_map) + + # GĂ©nĂ©ration du nouveau nom + new_name = self._generate_new_name(episode, series.name, episode_title) + + # VĂ©rification si le renommage est nĂ©cessaire + if new_name != episode.filename: + rename_plan.append({ + "episode": episode, + "old_path": episode.file_path, + "new_name": new_name, + "new_path": episode.file_path.parent / new_name, + "reason": self._get_rename_reason(episode, series.name, episode_title), + "tvdb_matched": bool(episode_title and episode_title != f"Episode 
{episode.episode:02d}") + }) + + logger.info(f"Plan de renommage: {len(rename_plan)} fichiers Ă  renommer") + return rename_plan + + def execute_rename_plan(self, rename_plan: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + ExĂ©cute un plan de renommage + + Args: + rename_plan: Plan de renommage Ă  exĂ©cuter + + Returns: + RĂ©sultats des opĂ©rations de renommage + """ + logger.info(f"ExĂ©cution du plan de renommage: {len(rename_plan)} opĂ©rations") + + results = [] + + for operation in rename_plan: + result = self._rename_file(operation) + results.append(result) + + if result["success"]: + self.stats["renamed"] += 1 + if result["backup_created"]: + self.stats["backups_created"] += 1 + else: + self.stats["errors"] += 1 + + logger.info(f"Plan exĂ©cutĂ©: {self.stats['renamed']} renommĂ©s, {self.stats['errors']} erreurs") + return results + + def _rename_file(self, operation: Dict[str, Any]) -> Dict[str, Any]: + """ + Renomme un fichier spĂ©cifique + + Args: + operation: DĂ©tails de l'opĂ©ration de renommage + + Returns: + RĂ©sultat de l'opĂ©ration + """ + episode = operation["episode"] + old_path = operation["old_path"] + new_path = operation["new_path"] + + result = { + "episode": episode, + "old_path": old_path, + "new_path": new_path, + "success": False, + "error": None, + "backup_created": False, + "dry_run": self.dry_run + } + + try: + # VĂ©rifications prĂ©alables + if not old_path.exists(): + result["error"] = "Le fichier source n'existe plus" + return result + + if new_path.exists(): + result["error"] = "Le fichier de destination existe dĂ©jĂ " + return result + + if not old_path.parent.exists(): + result["error"] = "Le rĂ©pertoire source n'existe plus" + return result + + # VĂ©rification des permissions + if not os.access(old_path, os.R_OK): + result["error"] = "Permission de lecture refusĂ©e" + return result + + if not os.access(old_path.parent, os.W_OK): + result["error"] = "Permission d'Ă©criture refusĂ©e" + return result + + # Mode dry-run: juste 
simulation + if self.dry_run: + result["success"] = True + result["message"] = f"[DRY-RUN] {old_path.name} -> {new_path.name}" + logger.info(f"[DRY-RUN] Renommage simulĂ©: {old_path.name} -> {new_path.name}") + return result + + # CrĂ©ation de la sauvegarde si demandĂ© + backup_path = None + if self.backup_original: + backup_path = self._create_backup(old_path) + result["backup_created"] = backup_path is not None + + # Renommage effectif + try: + old_path.rename(new_path) + result["success"] = True + result["message"] = f"RenommĂ©: {old_path.name} -> {new_path.name}" + logger.info(f"Fichier renommĂ©: {old_path.name} -> {new_path.name}") + + # Mise Ă  jour du chemin dans l'Ă©pisode + episode.file_path = new_path + + except Exception as rename_error: + # Restauration de la sauvegarde si disponible + if backup_path and backup_path.exists(): + try: + shutil.copy2(backup_path, old_path) + logger.info(f"Sauvegarde restaurĂ©e suite Ă  l'erreur: {old_path}") + except: + pass + + result["error"] = f"Erreur lors du renommage: {rename_error}" + logger.error(f"Erreur renommage {old_path}: {rename_error}") + + except Exception as e: + result["error"] = f"Erreur lors de la prĂ©paration du renommage: {e}" + logger.error(f"Erreur prĂ©paration renommage {old_path}: {e}") + + return result + + def _create_backup(self, file_path: Path) -> Optional[Path]: + """ + CrĂ©e une sauvegarde du fichier + + Args: + file_path: Chemin du fichier Ă  sauvegarder + + Returns: + Chemin de la sauvegarde ou None si erreur + """ + try: + # Nom de la sauvegarde avec timestamp + import datetime + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + backup_name = f"{file_path.stem}_backup_{timestamp}{file_path.suffix}" + backup_path = file_path.parent / ".backups" / backup_name + + # CrĂ©ation du rĂ©pertoire de sauvegarde + backup_path.parent.mkdir(exist_ok=True) + + # Copie du fichier + shutil.copy2(file_path, backup_path) + logger.debug(f"Sauvegarde créée: {backup_path}") + + return 
backup_path + + except Exception as e: + logger.error(f"Erreur crĂ©ation sauvegarde {file_path}: {e}") + return None + + def _get_episode_title(self, episode: Episode, tvdb_series: Dict[str, Any], + episode_map: Dict[Tuple[int, int], Dict[str, Any]]) -> str: + """ + RĂ©cupĂšre le titre d'un Ă©pisode depuis TVDB + + Args: + episode: Épisode concernĂ© + tvdb_series: SĂ©rie TVDB + episode_map: Map des Ă©pisodes TVDB + + Returns: + Titre de l'Ă©pisode + """ + # VĂ©rification du cache + cache_key = (episode.series_name, episode.season, episode.episode) + if cache_key in self.title_cache: + return self.title_cache[cache_key] + + # RĂ©cupĂ©ration depuis TVDB + title = None + if tvdb_series and episode_map: + episode_key = (episode.season, episode.episode) + if episode_key in episode_map: + title = episode_map[episode_key].get("episodeName") + + # Fallback: utilisation du titre existant ou titre gĂ©nĂ©rique + if not title: + title = episode.title if episode.title else f"Episode {episode.episode:02d}" + + # Mise en cache + self.title_cache[cache_key] = title + return title + + def _generate_new_name(self, episode: Episode, series_name: str, episode_title: str) -> str: + """ + GĂ©nĂšre le nouveau nom de fichier pour un Ă©pisode + + Args: + episode: Épisode Ă  renommer + series_name: Nom de la sĂ©rie + episode_title: Titre de l'Ă©pisode + + Returns: + Nouveau nom de fichier + """ + # Nettoyage du nom de sĂ©rie + clean_series = self._clean_filename(series_name) + + # Construction du nom de base + if episode.special: + base_name = f"{clean_series} - S00E{episode.episode:02d}" + else: + base_name = f"{clean_series} - S{episode.season:02d}E{episode.episode:02d}" + + # NumĂ©ro absolu si disponible + if self.include_absolute_number and episode.absolute_number: + base_name += f" ({episode.absolute_number})" + + # Titre de l'Ă©pisode + if self.include_episode_title and episode_title: + clean_title = self._clean_filename(episode_title) + base_name += f" - {clean_title}" + + # 
Informations techniques + if self.include_technical_info: + technical_info = self._build_technical_info(episode) + if technical_info: + base_name += f" [{technical_info}]" + + return f"{base_name}{episode.file_path.suffix}" + + def _clean_filename(self, name: str) -> str: + """Nettoie une chaĂźne pour l'utiliser dans un nom de fichier""" + if not name: + return "" + + # Suppression des caractĂšres invalides + invalid_chars = '<>:"/\\|?*' + for char in invalid_chars: + name = name.replace(char, '') + + # Remplacement des caractĂšres problĂ©matiques + name = name.replace('\n', ' ').replace('\r', ' ') + + # Normalisation des espaces + while ' ' in name: + name = name.replace(' ', ' ') + + # Nettoyage au dĂ©but et Ă  la fin + name = name.strip(' -._') + + return name + + def _build_technical_info(self, episode: Episode) -> str: + """Construit la chaĂźne d'informations techniques""" + parts = [] + + # RĂ©solution + if episode.resolution: + parts.append(episode.resolution) + + # Codec + if episode.codec: + parts.append(episode.codec) + + # Source (extrait du nom si disponible) + source = self._extract_source_from_filename(episode.filename) + if source: + parts.append(source) + + # Checksum si disponible + if episode.checksum and len(episode.checksum) >= 8: + parts.append(episode.checksum[:8]) + + return ' '.join(parts) + + def _extract_source_from_filename(self, filename: str) -> Optional[str]: + """Extrait la source du nom de fichier""" + import re + + patterns = [ + r'(BD|Blu-?Ray|BluRay)', + r'(WEB[-\s]?DL|WEBRip)', + r'(HDTV|TV)', + r'(DVD)' + ] + + for pattern in patterns: + match = re.search(pattern, filename, re.IGNORECASE) + if match: + return match.group(1).upper() + + return None + + def _get_rename_reason(self, episode: Episode, series_name: str, episode_title: str) -> str: + """GĂ©nĂšre la raison du renommage pour les logs""" + reasons = [] + + # VĂ©rification du format SxxEyy + if not self._has_standard_episode_format(episode.filename): + 
reasons.append("Format SxxEyy manquant") + + # VĂ©rification du nom de sĂ©rie + if not self._has_correct_series_name(episode.filename, series_name): + reasons.append("Nom de sĂ©rie incorrect") + + # VĂ©rification du titre d'Ă©pisode + if self.include_episode_title and episode_title: + if not self._has_episode_title(episode.filename): + reasons.append("Titre d'Ă©pisode manquant") + + # VĂ©rification des informations techniques + if self.include_technical_info and not self._has_technical_info(episode.filename): + reasons.append("Informations techniques manquantes") + + # VĂ©rification de la propretĂ© du nom + if self._has_invalid_characters(episode.filename): + reasons.append("CaractĂšres invalides dĂ©tectĂ©s") + + return "; ".join(reasons) if reasons else "Formatage selon les standards TVDB" + + def _has_standard_episode_format(self, filename: str) -> bool: + """VĂ©rifie si le nom de fichier utilise le format SxxEyy standard""" + import re + return bool(re.search(r'S\d{1,2}E\d{1,2}', filename, re.IGNORECASE)) + + def _has_correct_series_name(self, filename: str, series_name: str) -> bool: + """VĂ©rifie si le nom de sĂ©rie est correct""" + import re + clean_series = self._clean_filename(series_name).lower() + clean_filename = filename.lower() + + return clean_series in clean_filename + + def _has_episode_title(self, filename: str) -> bool: + """VĂ©rifie si le nom de fichier contient un titre d'Ă©pisode""" + # Heuristique: recherche de patterns typiques de titres + import re + + # Patterns qui suggĂšrent un titre + patterns = [ + r'S\d{1,2}E\d{1,2}\s*[-_]\s*[^.\[\]()]+', # S01E01 - Title + r'S\d{1,2}E\d{1,2}\s*([^.\[\]()]+)', # S01E01 Title + ] + + for pattern in patterns: + if re.search(pattern, filename, re.IGNORECASE): + return True + + return False + + def _has_technical_info(self, filename: str) -> bool: + """VĂ©rifie si le nom de fichier contient des informations techniques""" + import re + + tech_patterns = [ + r'\d{3,4}p', # 1080p, 720p + 
r'(H\.?264|H\.?265|x264|x265)', # Codecs + r'(BD|WEB|DVD|HDTV)' # Sources + ] + + for pattern in tech_patterns: + if re.search(pattern, filename, re.IGNORECASE): + return True + + return False + + def _has_invalid_characters(self, filename: str) -> bool: + """VĂ©rifie la prĂ©sence de caractĂšres invalides""" + invalid_chars = '<>:"|?*' + return any(char in filename for char in invalid_chars) + + def get_stats(self) -> Dict[str, Any]: + """Retourne les statistiques de renommage""" + return self.stats.copy() + + def reset_stats(self): + """RĂ©initialise les statistiques""" + self.stats = { + "renamed": 0, + "skipped": 0, + "errors": 0, + "backups_created": 0 + } \ No newline at end of file diff --git a/src/core/file_scanner.py b/src/core/file_scanner.py new file mode 100644 index 0000000..9d6a9ed --- /dev/null +++ b/src/core/file_scanner.py @@ -0,0 +1,239 @@ +""" +Scanneur de fichiers pour AnimeLibrarian +""" + +import os +import hashlib +from pathlib import Path +from typing import List, Dict, Any, Set, Optional +import logging + +logger = logging.getLogger(__name__) + + +class FileScanner: + """Scanne les fichiers multimĂ©dia dans les rĂ©pertoires""" + + def __init__(self, config: Dict[str, Any] = None): + self.config = config or {} + + # Filtres de fichiers et rĂ©pertoires Ă  ignorer + self.ignore_patterns = { + # RĂ©pertoires + '.git', '.svn', '.hg', '.bzr', '__pycache__', 'node_modules', '.vscode', + '.idea', '.DS_Store', 'Thumbs.db', 'Desktop.ini', 'System Volume Information', + '$RECYCLE.BIN', 'Recycled', 'Temp', 'tmp', 'cache', 'Cache', + + # Extensions de fichiers Ă  ignorer + '.txt', '.log', '.nfo', '.sfv', '.md5', '.sha1', '.dat', '.db', + '.ini', '.conf', '.cfg', '.xml', '.json', '.yml', '.yaml', + '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.ico', + '.mp3', '.wav', '.flac', '.ogg', '.m4a', '.aac', + '.srt', '.ass', '.ssa', '.sub', '.idx', '.vtt', + '.zip', '.rar', '.7z', '.tar', '.gz', '.bz2', '.exe', '.dmg', + + # Fichiers de samples + 'sample', 
    def scan_directory(self, directory: Path, extensions: Set[str],
                       recursive: bool = True, max_depth: int = 10) -> List[Dict[str, Any]]:
        """
        Scan a directory for files whose extensions match *extensions*.

        Args:
            directory: Directory to scan.
            extensions: Extensions to keep; suffixes are lower-cased before
                comparison, so entries are expected lower-case with a
                leading dot (e.g. {'.mkv'}) — TODO confirm against callers.
            recursive: If True, walk sub-directories as well.
            max_depth: Maximum depth for the recursive walk.

        Returns:
            List of per-file metadata dicts produced by _analyze_file;
            files whose analysis fails are silently dropped.
        """
        logger.debug(f"Scan du répertoire: {directory} (extensions: {extensions})")

        if not directory.exists() or not directory.is_dir():
            logger.warning(f"Répertoire invalide: {directory}")
            return []

        media_files = []

        try:
            if recursive:
                # depth yielded by _walk_directory is not used here.
                for depth, root, dirs, files in self._walk_directory(directory, max_depth):
                    # Prune ignored directories in place so the underlying
                    # os.walk (inside _walk_directory) does not descend into them.
                    dirs[:] = [d for d in dirs if not self._should_ignore(d, is_directory=True)]

                    # Examine every file of the current directory level.
                    for filename in files:
                        if self._should_ignore(filename):
                            continue

                        file_path = Path(root) / filename
                        if file_path.suffix.lower() in extensions:
                            file_info = self._analyze_file(file_path)
                            if file_info:
                                media_files.append(file_info)
            else:
                # Non-recursive: only direct children of *directory*.
                for item in directory.iterdir():
                    if item.is_file() and not self._should_ignore(item.name):
                        if item.suffix.lower() in extensions:
                            file_info = self._analyze_file(item)
                            if file_info:
                                media_files.append(file_info)

        except PermissionError:
            logger.error(f"Permission refusée lors du scan de: {directory}")
        except Exception as e:
            # Best-effort scan: an unexpected error aborts this walk but the
            # files collected so far are still returned.
            logger.error(f"Erreur lors du scan de {directory}: {e}")

        logger.debug(f"Trouvé {len(media_files)} fichiers multimédia")
        return media_files
os.walk(directory): + depth = Path(root).relative_to(directory).parts + if len(depth) > max_depth: + dirs[:] = [] # Ne pas explorer plus profondĂ©ment + continue + + yield (len(depth), root, dirs, files) + + def _should_ignore(self, name: str, is_directory: bool = False) -> bool: + """DĂ©termine si un fichier/rĂ©pertoire doit ĂȘtre ignorĂ©""" + + # Ignorer les fichiers cachĂ©s + if name.startswith('.') and not name.startswith('._'): + return True + + # Ignorer les fichiers systĂšme + if name.lower() in {'system volume information', '$recycle.bin', 'recycled'}: + return True + + # VĂ©rification des patterns d'ignore + for pattern in self.ignore_patterns: + if pattern.lower() in name.lower(): + return True + + return False + + def _analyze_file(self, file_path: Path) -> Optional[Dict[str, Any]]: + """Analyse un fichier et retourne ses mĂ©tadonnĂ©es""" + try: + stat_info = file_path.stat() + + file_info = { + 'path': file_path, + 'name': file_path.name, + 'stem': file_path.stem, + 'suffix': file_path.suffix.lower(), + 'size': stat_info.st_size, + 'created': stat_info.st_ctime, + 'modified': stat_info.st_mtime, + 'is_readable': os.access(file_path, os.R_OK), + 'is_writable': os.access(file_path, os.W_OK) + } + + # Calcul du checksum si nĂ©cessaire (optionnel pour les gros fichiers) + if stat_info.st_size < 1024 * 1024 * 100: # < 100 Mo + try: + file_info['checksum'] = self._calculate_checksum(file_path) + except: + file_info['checksum'] = None + + return file_info + + except Exception as e: + logger.error(f"Erreur lors de l'analyse du fichier {file_path}: {e}") + return None + + def _calculate_checksum(self, file_path: Path, algorithm: str = 'md5') -> str: + """Calcule le checksum d'un fichier""" + hash_func = hashlib.new(algorithm) + + with open(file_path, 'rb') as f: + for chunk in iter(lambda: f.read(8192), b''): + hash_func.update(chunk) + + return hash_func.hexdigest() + + def get_duplicates(self, files: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]: + 
"""Identifie les fichiers en double basĂ©s sur le nom et la taille""" + + # Groupement par nom et taille + groups = {} + + for file_info in files: + key = (file_info['name'], file_info['size']) + if key not in groups: + groups[key] = [] + groups[key].append(file_info) + + # Filtrage pour ne garder que les groupes avec doublons + duplicates = [group for group in groups.values() if len(group) > 1] + + return duplicates + + def get_checksum_duplicates(self, files: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]: + """Identifie les fichiers en double basĂ©s sur le checksum""" + + # S'assurer que tous les fichiers ont un checksum + for file_info in files: + if 'checksum' not in file_info or not file_info['checksum']: + file_info['checksum'] = self._calculate_checksum(file_info['path']) + + # Groupement par checksum + groups = {} + for file_info in files: + checksum = file_info['checksum'] + if checksum: + if checksum not in groups: + groups[checksum] = [] + groups[checksum].append(file_info) + + # Filtrage pour ne garder que les groupes avec doublons + duplicates = [group for group in groups.values() if len(group) > 1] + + return duplicates + + def get_file_stats(self, files: List[Dict[str, Any]]) -> Dict[str, Any]: + """Retourne des statistiques sur les fichiers""" + if not files: + return { + 'total_files': 0, + 'total_size': 0, + 'avg_size': 0, + 'max_size': 0, + 'min_size': 0, + 'extensions': {}, + 'total_readable': 0, + 'total_writable': 0 + } + + total_size = sum(f['size'] for f in files) + avg_size = total_size // len(files) + max_size = max(f['size'] for f in files) + min_size = min(f['size'] for f in files) + + # Compte par extension + extensions = {} + for file_info in files: + ext = file_info['suffix'] + extensions[ext] = extensions.get(ext, 0) + 1 + + # Permissions + readable = sum(1 for f in files if f['is_readable']) + writable = sum(1 for f in files if f['is_writable']) + + return { + 'total_files': len(files), + 'total_size': total_size, + 
    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize the detector.

        Args:
            config: Optional configuration mapping; defaults to empty.
        """
        self.config = config or {}
        # Path to the ffprobe binary, or None when it is not available.
        self.ffprobe_path = self._find_ffprobe()

        # Patterns used to detect episode numbers, tried in this order.
        self.episode_patterns = [
            # S01E01, S1E1 — the season group is optional, so "E01" alone
            # also matches with group(1) == None.
            # NOTE(review): extract_episode_info requires group(1) to be
            # set, so season-less matches are currently dropped — confirm.
            r'(?:S(\d{1,2})\s*[-_.]?\s*)?E(\d{1,2})',
            # 01, Episode 01, Ep 01
            r'(?:Episode|Ep)\s*(\d{1,3})',
            # Numbers in brackets: [01], (01)
            r'[\[({](\d{1,3})[\])}]',
            # Standalone number at the end of the stem
            r'(\d{1,3})(?:\.\w+)?$',
            # Absolute numbering ranges: 001-100
            r'(\d{3})-(?:\d{3})'
        ]

        # Patterns for technical information embedded in filenames.
        self.resolution_patterns = [
            r'(\d{3,4}p)',  # 1080p, 720p, 480p
            r'(4K|UHD)',    # 4K, UHD
            r'(HD|SD)',     # HD, SD
        ]

        self.codec_patterns = [
            r'(H\.?264|H\.?265|AVC|HEVC|x264|x265)',
            r'(XVID|DIVX)',
            r'(VP9|AV1)',
        ]

        self.source_patterns = [
            r'(BD|Blu-?Ray)',
            r'(WEB|WEB-?DL|WEBRip)',
            r'(DVD)',
            r'(HDTV|TV)',
        ]
+ + for path in common_paths: + if os.path.exists(path): + return path + + return None + + def analyze_media_file(self, file_path: Path) -> Dict[str, Any]: + """ + Analyse un fichier multimĂ©dia et extrait ses mĂ©tadonnĂ©es + + Returns: + Dict contenant les mĂ©tadonnĂ©es du fichier + """ + logger.debug(f"Analyse du fichier: {file_path}") + + metadata = { + 'file_path': file_path, + 'duration': None, + 'resolution': None, + 'codec': None, + 'bitrate': None, + 'audio_tracks': 0, + 'subtitle_tracks': 0, + 'file_size': 0, + 'is_valid': False + } + + # Taille du fichier + try: + metadata['file_size'] = file_path.stat().st_size + except: + pass + + # Analyse avec ffprobe si disponible + if self.ffprobe_path: + metadata.update(self._analyze_with_ffprobe(file_path)) + + # Analyse basĂ©e sur le nom de fichier + filename_info = self._analyze_filename(file_path.name) + metadata.update(filename_info) + + # Validation du fichier + metadata['is_valid'] = self._validate_file(metadata) + + return metadata + + def _analyze_with_ffprobe(self, file_path: Path) -> Dict[str, Any]: + """Analyse un fichier avec ffprobe""" + metadata = {} + + try: + cmd = [ + self.ffprobe_path, + '-v', 'quiet', + '-print_format', 'json', + '-show_format', + '-show_streams', + str(file_path) + ] + + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=30 + ) + + if result.returncode == 0: + import json + probe_data = json.loads(result.stdout) + + # Informations du format + format_info = probe_data.get('format', {}) + metadata['duration'] = float(format_info.get('duration', 0)) + metadata['bitrate'] = int(format_info.get('bit_rate', 0)) + + # Analyse des streams + video_streams = [] + audio_streams = [] + subtitle_streams = [] + + for stream in probe_data.get('streams', []): + if stream.get('codec_type') == 'video': + video_streams.append(stream) + elif stream.get('codec_type') == 'audio': + audio_streams.append(stream) + elif stream.get('codec_type') == 'subtitle': + 
subtitle_streams.append(stream) + + metadata['audio_tracks'] = len(audio_streams) + metadata['subtitle_tracks'] = len(subtitle_streams) + + # Informations vidĂ©o + if video_streams: + video = video_streams[0] + metadata['codec'] = video.get('codec_name', '').upper() + + width = video.get('width') + height = video.get('height') + if width and height: + metadata['resolution'] = f"{height}p" + + # DĂ©tection du 4K + if height >= 2160: + metadata['resolution'] = "4K" + + except subprocess.TimeoutExpired: + logger.warning(f"Timeout lors de l'analyse de {file_path}") + except Exception as e: + logger.warning(f"Erreur ffprobe pour {file_path}: {e}") + + return metadata + + def _analyze_filename(self, filename: str) -> Dict[str, Any]: + """Extrait les informations du nom de fichier""" + metadata = {} + + # Nettoyage du nom de fichier (sans l'extension) + clean_name = Path(filename).stem.lower() + + # Recherche des informations techniques + for pattern in self.resolution_patterns: + match = re.search(pattern, clean_name, re.IGNORECASE) + if match and 'resolution' not in metadata: + metadata['resolution'] = match.group(1).upper() + break + + for pattern in self.codec_patterns: + match = re.search(pattern, clean_name, re.IGNORECASE) + if match and 'codec' not in metadata: + metadata['codec'] = match.group(1).upper() + break + + # DĂ©tection de la source + for pattern in self.source_patterns: + match = re.search(pattern, clean_name, re.IGNORECASE) + if match and 'source' not in metadata: + metadata['source'] = match.group(1).upper() + break + + return metadata + + def _validate_file(self, metadata: Dict[str, Any]) -> bool: + """Valide si le fichier semble ĂȘtre un Ă©pisode valide""" + + # Taille minimale (50 Mo) + if metadata.get('file_size', 0) < 50 * 1024 * 1024: + return False + + # DurĂ©e minimale (5 minutes) + if metadata.get('duration', 0) and metadata['duration'] < 300: + return False + + # Codec vidĂ©o + valid_codecs = {'H264', 'H265', 'AVC', 'HEVC', 'X264', 'X265', 
'VP9', 'AV1'} + codec = metadata.get('codec', '').upper() + if codec and codec not in valid_codecs: + return False + + return True + + def extract_episode_info(self, filename: str) -> Dict[str, Any]: + """ + Extrait les informations d'Ă©pisode du nom de fichier + + Returns: + Dict avec season, episode, title, etc. + """ + episode_info = { + 'season': 1, # Par dĂ©faut + 'episode': None, + 'title': None, + 'special': False, + 'absolute_number': None + } + + clean_name = Path(filename).stem + + # Tentatives avec diffĂ©rents patterns + for pattern in self.episode_patterns: + match = re.search(pattern, clean_name, re.IGNORECASE) + if match: + groups = match.groups() + + # S01E01 format + if len(groups) >= 2 and groups[0] and groups[1]: + episode_info['season'] = int(groups[0]) + episode_info['episode'] = int(groups[1]) + # E01 or Episode 01 format + elif len(groups) >= 1 and groups[0]: + episode_num = int(groups[0]) + + # Si c'est un grand nombre, c'est probablement un Ă©pisode spĂ©cial + if episode_num > 100: + episode_info['episode'] = episode_num + episode_info['special'] = True + else: + episode_info['episode'] = episode_num + + break + + # Extraction du titre si possible + episode_info['title'] = self._extract_title(clean_name, episode_info) + + return episode_info + + def _extract_title(self, filename: str, episode_info: Dict[str, Any]) -> Optional[str]: + """Extrait le titre de l'Ă©pisode du nom de fichier""" + + # Suppression des numĂ©ros d'Ă©pisode et patterns techniques + title_part = filename + + # Suppression du pattern SxxEyy + title_part = re.sub(r'S\d{1,2}\s*[-_.]?\s*E\d{1,2}', '', title_part, flags=re.IGNORECASE) + + # Suppression du pattern Episode xx + title_part = re.sub(r'(?:Episode|Ep)\s*\d{1,3}', '', title_part, flags=re.IGNORECASE) + + # Suppression des patterns techniques + for pattern in self.resolution_patterns + self.codec_patterns + self.source_patterns: + title_part = re.sub(pattern, '', title_part, flags=re.IGNORECASE) + + # Suppression des 
crochets et parenthĂšses vides + title_part = re.sub(r'[\[\](){}]', '', title_part) + + # Nettoyage final + title_part = re.sub(r'^[-_\s]+|[-_\s]+$', '', title_part) # Suppression au dĂ©but/fin + title_part = re.sub(r'[-_\s]{2,}', ' ', title_part) # Remplacement multiples par un seul + + return title_part.strip() if title_part.strip() else None + + def get_series_name_from_filename(self, filename: str) -> str: + """Extrait le nom de la sĂ©rie du nom de fichier""" + + clean_name = Path(filename).stem + + # Pattern pour le nom avant le numĂ©ro d'Ă©pisode + patterns = [ + r'^\[?([^\[\]]+?)\]?\s*[-_.]?\s*S\d{1,2}E\d{1,2}', + r'^([^-_\[\]]+?)(?:\s*[-_.]\s*)?(?:S\d{1,2}E\d{1,2}|Episode\s*\d{1,3}|E\d{1,3})', + r'^([^-_()]+)(?:\s*[-_.]\s*)?\d{1,3}', + ] + + for pattern in patterns: + match = re.search(pattern, clean_name, re.IGNORECASE) + if match: + series_name = match.group(1).strip() + # Nettoyage du nom + series_name = re.sub(r'[^\w\s-]', '', series_name) + series_name = re.sub(r'\s+', ' ', series_name).strip() + return series_name + + # Fallback: retourne le dĂ©but du nom + words = clean_name.split() + if len(words) >= 2: + return ' '.join(words[:2]) + + return clean_name + + def batch_analyze(self, file_paths: List[Path]) -> List[Dict[str, Any]]: + """Analyse par lots plusieurs fichiers multimĂ©dia""" + results = [] + + for file_path in file_paths: + try: + metadata = self.analyze_media_file(file_path) + results.append(metadata) + except Exception as e: + logger.error(f"Erreur lors de l'analyse de {file_path}: {e}") + results.append({ + 'file_path': file_path, + 'error': str(e), + 'is_valid': False + }) + + return results \ No newline at end of file diff --git a/src/models/__init__.py b/src/models/__init__.py new file mode 100644 index 0000000..43539fe --- /dev/null +++ b/src/models/__init__.py @@ -0,0 +1 @@ +# Package models \ No newline at end of file diff --git a/src/models/episode.py b/src/models/episode.py new file mode 100644 index 0000000..ba1f6e5 --- 
/dev/null +++ b/src/models/episode.py @@ -0,0 +1,175 @@ +""" +ModĂšle de donnĂ©es pour les sĂ©ries et Ă©pisodes +""" + +from dataclasses import dataclass, field +from typing import List, Optional, Dict, Any +from pathlib import Path +import datetime + + +@dataclass +class Episode: + """ReprĂ©sente un Ă©pisode d'anime""" + file_path: Path + series_name: str + season: int = 1 + episode: int = 0 + title: Optional[str] = None + absolute_number: Optional[int] = None + special: bool = False + file_size: int = 0 + duration: Optional[float] = None + checksum: Optional[str] = None + resolution: Optional[str] = None + codec: Optional[str] = None + verified: bool = False + metadata: Dict[str, Any] = field(default_factory=dict) + + @property + def season_episode_str(self) -> str: + """Retourne la reprĂ©sentation SXXEYY""" + if self.special: + return f"S00E{self.episode:02d}" + return f"S{self.season:02d}E{self.episode:02d}" + + @property + def filename(self) -> str: + """Retourne le nom de fichier actuel""" + return self.file_path.name + + def get_recommended_name(self, series_name: str, include_absolute: bool = False) -> str: + """GĂ©nĂšre le nom recommandĂ© selon le format TVDB""" + # Nettoyage du nom de sĂ©rie + clean_series = self._clean_series_name(series_name) + + # Construction du nom de base + if self.special: + base_name = f"{clean_series} - S00E{self.episode:02d}" + else: + base_name = f"{clean_series} - S{self.season:02d}E{self.episode:02d}" + if include_absolute and self.absolute_number: + base_name += f" ({self.absolute_number})" + + # Ajout du titre si disponible + if self.title: + clean_title = self._clean_title(self.title) + base_name += f" - {clean_title}" + + # Ajout des informations techniques + suffix = "" + if self.resolution: + suffix += f" [{self.resolution}]" + if self.codec: + suffix += f" [{self.codec}]" + if self.checksum: + suffix += f" [{self.checksum}]" + + return f"{base_name}{suffix}{self.file_path.suffix}" + + def _clean_series_name(self, name: 
str) -> str: + """Nettoie le nom de sĂ©rie pour le formatage""" + # Suppression des caractĂšres spĂ©ciaux et normalisation + import re + name = re.sub(r'[<>:"/\\|?*]', '', name) + name = re.sub(r'\s+', ' ', name).strip() + return name + + def _clean_title(self, title: str) -> str: + """Nettoie le titre d'Ă©pisode pour le formatage""" + import re + title = re.sub(r'[<>:"/\\|?*]', '', title) + title = re.sub(r'\s+', ' ', title).strip() + return title + + +@dataclass +class Series: + """ReprĂ©sente une sĂ©rie d'anime""" + name: str + directory: Path + episodes: List[Episode] = field(default_factory=list) + tvdb_id: Optional[int] = None + anidb_id: Optional[int] = None + total_episodes: Optional[int] = None + special_count: int = 0 + languages: List[str] = field(default_factory=list) + metadata: Dict[str, Any] = field(default_factory=dict) + last_scan: Optional[datetime.datetime] = None + + def add_episode(self, episode: Episode): + """Ajoute un Ă©pisode Ă  la sĂ©rie""" + self.episodes.append(episode) + + def get_episode_by_number(self, season: int, episode: int) -> Optional[Episode]: + """RĂ©cupĂšre un Ă©pisode par son numĂ©ro""" + for ep in self.episodes: + if ep.season == season and ep.episode == episode: + return ep + return None + + def get_specials(self) -> List[Episode]: + """Retourne la liste des Ă©pisodes spĂ©ciaux""" + return [ep for ep in self.episodes if ep.special] + + def get_regular_episodes(self) -> List[Episode]: + """Retourne la liste des Ă©pisodes rĂ©guliers""" + return [ep for ep in self.episodes if not ep.special] + + def check_completeness(self) -> Dict[str, Any]: + """VĂ©rifie la complĂ©tude de la sĂ©rie""" + regular_episodes = self.get_regular_episodes() + episode_numbers = [ep.episode for ep in regular_episodes] + + result = { + 'total_found': len(regular_episodes), + 'specials_found': len(self.get_specials()), + 'missing_episodes': [], + 'duplicate_episodes': [], + 'is_complete': False + } + + if self.total_episodes: + expected = set(range(1, 
@dataclass
class DirectoryCompatibilityResult:
    """Outcome of a directory compatibility check.

    Accumulates blocking errors, non-blocking warnings and the summary
    statistics gathered while inspecting a candidate library directory.
    """
    is_compatible: bool = False  # flips to False permanently on first error
    path: Path = field(default_factory=Path)
    errors: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)
    found_series: List[str] = field(default_factory=list)
    total_episodes: int = 0
    total_size: int = 0
    permissions: Dict[str, bool] = field(default_factory=dict)
    disk_space: Dict[str, int] = field(default_factory=dict)

    def add_error(self, error: str):
        """Record a blocking error; any error marks the result incompatible."""
        self.errors.append(error)
        self.is_compatible = False

    def add_warning(self, warning: str):
        """Record a non-blocking warning."""
        self.warnings.append(warning)

    def summary(self) -> str:
        """One-line human-readable status."""
        label = "✅ Compatible" if self.is_compatible else "❌ Incompatible"
        return f"{label} - {len(self.found_series)} séries, {self.total_episodes} épisodes"
    def run(self, preselected_directory: Optional[str] = None):
        """
        Run the interactive command-line workflow.

        Steps: directory selection → series scan → series selection →
        operation configuration → execution → final report.  Each early
        step may abort the whole run by returning.

        Args:
            preselected_directory: Directory chosen up front (skips the
                interactive prompt).
        """
        self._print_banner()

        try:
            # Step 1: pick the base directory (or use the preselected one).
            directory = self._select_directory(preselected_directory)
            if not directory:
                return

            # Step 2: scan it for series; abort when nothing is found.
            self._scan_series(directory)
            if not self.current_series_list:
                print("❌ Aucune série trouvée dans ce répertoire.")
                return

            # Step 3: let the user pick which series to process.
            self._select_series()
            if not self.selected_series:
                print("❌ Aucune série sélectionnée.")
                return

            # Step 4: ask which operations to perform.
            operations = self._configure_operations()

            # Step 5: run them.
            self._execute_operations(operations)

            # Step 6: final summary report.
            self._generate_report()

        except KeyboardInterrupt:
            # Ctrl-C anywhere in the flow is a deliberate user abort.
            print("\n\n⚠ Opération annulée par l'utilisateur.")
        except Exception as e:
            # Last-resort guard so the CLI never dies with a raw traceback.
            print(f"\n❌ Erreur inattendue: {e}")
            logger.error(f"Erreur UI: {e}")
spĂ©cifiĂ©.") + return None + + # VĂ©rification de compatibilitĂ© + print(f"\n🔍 VĂ©rification de la compatibilitĂ© du rĂ©pertoire...") + result = self.core.check_directory_compatibility(directory) + + if not result.get("is_compatible"): + print("❌ Le rĂ©pertoire n'est pas compatible:") + for error in result.get("errors", []): + print(f" ‱ {error}") + + # Affichage des recommandations + recommendations = self.core.directory_checker.get_compatibility_recommendations( + type('Result', (), result)() + ) + if recommendations: + print("\n💡 Recommandations:") + for rec in recommendations: + print(f" ‱ {rec}") + + return None + + print("✅ Le rĂ©pertoire est compatible!") + + # RĂ©sumĂ© du rĂ©pertoire + print(f" 📊 SĂ©ries trouvĂ©es: {len(result.get('found_series', []))}") + print(f" đŸ“ș Épisodes trouvĂ©s: {result.get('total_episodes', 0)}") + print(f" đŸ’Ÿ Taille totale: {self._format_size(result.get('total_size', 0))}") + + return directory + + def _scan_series(self, directory: str): + """Scan des sĂ©ries dans le rĂ©pertoire""" + print(f"\n🔍 Scan des sĂ©ries en cours...") + + try: + self.current_series_list = self.core.scan_series(directory) + print(f"✅ {len(self.current_series_list)} sĂ©ries trouvĂ©es!") + except Exception as e: + print(f"❌ Erreur lors du scan: {e}") + raise + + def _select_series(self): + """SĂ©lection des sĂ©ries Ă  traiter""" + print("\n📋 SĂ©ries disponibles:") + print("-" * 60) + + for i, series in enumerate(self.current_series_list): + status = "✅" if series["is_complete"] else "⚠" + episodes = series["total_episodes"] + size = self._format_size(series["total_size"]) + + print(f"{i+1:2d}. 
{status} {series['name']}") + print(f" đŸ“ș {episodes} Ă©pisodes ‱ đŸ’Ÿ {size}") + + # Afficher les problĂšmes si non complĂšte + if not series["is_complete"]: + if series["completeness"]["missing_episodes"]: + print(f" ⚠ Manque: {series['completeness']['missing_episodes']}") + if series["completeness"]["duplicate_episodes"]: + print(f" ⚠ Doublons: {series['completeness']['duplicate_episodes']}") + + print("\n" + "-"*60) + + # SĂ©lection + selection = input("🎯 Choisissez les sĂ©ries Ă  traiter (ex: 1,3,5-8, * pour tout): ").strip() + + if not selection: + self.selected_series = [] + return + + # Traitement de la sĂ©lection + indices = self._parse_selection(selection, len(self.current_series_list)) + + if not indices: + print("❌ SĂ©lection invalide.") + self.selected_series = [] + return + + # Conversion en objets sĂ©ries + self.selected_series = [ + self.current_series_list[i] for i in indices + ] + + print(f"✅ {len(self.selected_series)} sĂ©rie(s) sĂ©lectionnĂ©e(s):") + for series in self.selected_series: + print(f" ‱ {series['name']}") + + def _configure_operations(self) -> Dict[str, bool]: + """Configuration des opĂ©rations Ă  effectuer""" + print("\n⚙ Configuration des opĂ©rations:") + print("-" * 40) + + operations = {} + + # VĂ©rification des numĂ©ros d'Ă©pisodes + if self._confirm_operation("VĂ©rifier les numĂ©ros d'Ă©pisodes avec trace.moe?"): + operations["verify_episodes"] = True + else: + operations["verify_episodes"] = False + + # VĂ©rification de l'intĂ©gritĂ© + if self._confirm_operation("VĂ©rifier l'intĂ©gritĂ© des fichiers?"): + operations["verify_integrity"] = True + else: + operations["verify_integrity"] = False + + # Renommage + if self._confirm_operation("Renommer les fichiers selon les standards TVDB?"): + operations["rename_files"] = True + + # Options de renommage + if self._confirm_operation(" ‱ CrĂ©er des sauvegardes avant renommage? 
(recommandĂ©)"): + operations["backup_files"] = True + else: + operations["backup_files"] = False + else: + operations["rename_files"] = False + operations["backup_files"] = False + + return operations + + def _execute_operations(self, operations: Dict[str, bool]): + """ExĂ©cute les opĂ©rations configurĂ©es""" + print("\n🚀 ExĂ©cution des opĂ©rations...") + + indices = [self.current_series_list.index(series) for series in self.selected_series] + + # 1. VĂ©rification des Ă©pisodes + if operations.get("verify_episodes"): + print(f"\n🔍 VĂ©rification des numĂ©ros d'Ă©pisodes...") + try: + results = self.core.verify_episodes_numbers(indices) + self._display_episode_verification_results(results) + except Exception as e: + print(f"❌ Erreur lors de la vĂ©rification des Ă©pisodes: {e}") + + # 2. VĂ©rification d'intĂ©gritĂ© + if operations.get("verify_integrity"): + print(f"\nđŸ›Ąïž VĂ©rification de l'intĂ©gritĂ© des fichiers...") + try: + results = self.core.verify_files_integrity(indices) + self._display_integrity_results(results) + except Exception as e: + print(f"❌ Erreur lors de la vĂ©rification d'intĂ©gritĂ©: {e}") + + # 3. 
    def _display_integrity_results(self, results: Dict[str, Any]):
        """Print a summary of the file-integrity verification results.

        Args:
            results: Mapping with keys files_checked, valid_files,
                invalid_files (list of dicts with 'filename'/'issue') and
                duplicates (list of filename groups).
        """
        print(f"📊 Fichiers vérifiés: {results['files_checked']}")
        print(f"✅ Fichiers valides: {results['valid_files']}")

        if results["invalid_files"]:
            print(f"❌ Fichiers invalides: {len(results['invalid_files'])}")
            for file_info in results["invalid_files"][:5]:  # show at most 5
                print(f" • {file_info['filename']}: {file_info['issue']}")

            if len(results["invalid_files"]) > 5:
                print(f" ...et {len(results['invalid_files']) - 5} autres fichiers")

        if results["duplicates"]:
            print(f"🔄 Doublons détectés: {len(results['duplicates'])}")
            for dup_group in results["duplicates"][:3]:  # show at most 3
                print(f" • {', '.join(dup_group)}")
results: Dict[str, Any]):
+        """Displays the rename results and a preview of the rename plan."""
+        stats = results["stats"]
+
+        print(f"📝 Renommage terminĂ©:")
+        print(f"   ✅ Fichiers renommĂ©s: {stats['renamed']}")
+        print(f"   đŸ’Ÿ Sauvegardes créées: {stats['backups_created']}")
+        print(f"   ❌ Erreurs: {stats['errors']}")
+
+        if results["rename_plan"]:
+            print(f"\n📋 Plan de renommage ({len(results['rename_plan'])} fichiers):")
+            for result in results["rename_results"][:5]:  # Limit output to 5
+                if result["success"]:
+                    print(f"   ✅ {result['old_path'].name} → {result['new_path'].name}")
+                else:
+                    print(f"   ❌ {result['old_path'].name}: {result['error']}")
+
+            if len(results["rename_results"]) > 5:
+                print(f"   ...et {len(results['rename_results']) - 5} autres fichiers")
+
+    def _generate_report(self):
+        """Generates the final summary report for the processed series."""
+        print("\n" + "="*60)
+        print("📊 RAPPORT FINAL - AnimeLibrarian")
+        print("="*60)
+
+        # Global statistics over the selected series
+        print(f"🎯 SĂ©ries traitĂ©es: {len(self.selected_series)}")
+
+        total_episodes = sum(series["total_episodes"] for series in self.selected_series)
+        print(f"đŸ“ș Épisodes analysĂ©s: {total_episodes}")
+
+        total_size = sum(series["total_size"] for series in self.selected_series)
+        print(f"đŸ’Ÿ Taille totale: {self._format_size(total_size)}")
+
+        # Application-level status (API configuration)
+        status = self.core.get_application_status()
+        print(f"🔗 TVDB configurĂ©: {'Oui' if status['tvdb_configured'] else 'Non'}")
+        print(f"🔍 Trace.moe configurĂ©: {'Oui' if status['trace_moe_configured'] else 'Non'}")
+
+        print("\n✅ OpĂ©ration terminĂ©e avec succĂšs!")
+        print("Merci d'utiliser AnimeLibrarian - Projet de la LĂ©gion de Muyue")
+        print("="*60)
+
+    def _confirm_operation(self, message: str) -> bool:
+        """Asks the user for a yes/no confirmation; accepts French and English answers."""
+        response = input(f"{message} (o/n): ").strip().lower()
+        return response in ('o', 'oui', 'y', 'yes')
+
+    def _parse_selection(self, selection: str, max_index: int) -> List[int]:
+        """Parses a 1-based series selection string ("1,3,5-8" or "*") into 0-based indices."""
+        if selection.strip() == '*':
+            return list(range(max_index))
+
+        indices = set()
+
+        parts = selection.split(',')
+        for part in parts:
+            part = part.strip()
+
+            if '-' in part:
+                # Range form, e.g. "5-8" (inclusive on both ends)
+                try:
+                    start, end = part.split('-', 1)
+                    start = int(start.strip())
+                    end = int(end.strip())
+
+                    # Bounds validation; out-of-range or reversed ranges yield nothing
+                    if 1 <= start <= max_index and 1 <= end <= max_index:
+                        indices.update(range(start - 1, end))
+                except ValueError:
+                    continue  # non-numeric part — silently skipped
+            else:
+                # Single 1-based number
+                try:
+                    idx = int(part)
+                    if 1 <= idx <= max_index:
+                        indices.add(idx - 1)
+                except ValueError:
+                    continue  # non-numeric part — silently skipped
+
+        return sorted(indices)
+
+    def _format_size(self, size_bytes: int) -> str:
+        """Formats a byte count into a human-readable size string (binary units)."""
+        if not size_bytes:
+            return "0 B"
+
+        units = ["B", "KB", "MB", "GB", "TB"]
+        unit_index = 0
+        size = float(size_bytes)
+
+        while size >= 1024 and unit_index < len(units) - 1:
+            size /= 1024
+            unit_index += 1
+
+        # Bytes are printed as integers; larger units with one decimal
+        if unit_index == 0:
+            return f"{int(size)} {units[unit_index]}"
+        else:
+            return f"{size:.1f} {units[unit_index]}"
\ No newline at end of file
diff --git a/src/utils/__init__.py b/src/utils/__init__.py
new file mode 100644
index 0000000..f7295d2
--- /dev/null
+++ b/src/utils/__init__.py
@@ -0,0 +1,16 @@
+"""
+Utilities and helpers for AnimeLibrarian - Project of the LĂ©gion de Muyue
+"""
+
+from .config import load_config, save_config, get_config_template, validate_config, setup_directories, mask_sensitive_data
+from .logging import setup_logging
+
+__all__ = [
+    'load_config',
+    'save_config',
+    'get_config_template',
+    'validate_config',
+    'setup_directories',
+    'mask_sensitive_data',
+    'setup_logging'
+]
\ No newline at end of file
diff --git a/src/utils/config.py b/src/utils/config.py
new file mode 100644
index 0000000..701f221
--- /dev/null
+++ b/src/utils/config.py
@@ -0,0 +1,294 @@
+"""
+Configuration module for AnimeLibrarian - Project of the LĂ©gion de Muyue
+"""
+
+import json
+import os
+from pathlib import Path
+from typing import Dict, Any, Optional
+import 
logging
+
+logger = logging.getLogger(__name__)
+
+
+DEFAULT_CONFIG = {
+    # General configuration
+    "language": "fra",
+    "log_level": "INFO",
+    "temp_directory": "/tmp/animelibrarian",
+
+    # TVDB configuration
+    "thetvdb_api_key": None,
+    "thetvdb_base_url": "https://api.thetvdb.com",
+
+    # trace.moe configuration
+    "trace_moe_api_key": None,
+    "trace_moe_base_url": "https://api.trace.moe",
+    "trace_moe_rate_limit": 1.0,
+    "trace_moe_timeout": 30,
+    "trace_moe_max_retries": 3,
+
+    # File-scanning configuration
+    "video_extensions": [".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".webm",
+                         ".m4v", ".mpg", ".mpeg", ".3gp", ".ts", ".m2ts", ".ogv"],
+    "max_scan_depth": 10,
+    "ignore_patterns": [".git", ".svn", ".hg", "__pycache__", "node_modules",
+                        ".vscode", ".idea", "Thumbs.db", ".DS_Store"],
+
+    # Media-detection configuration
+    "min_video_size": 50 * 1024 * 1024,  # 50 MB
+    "max_video_size": 50 * 1024 * 1024 * 1024,  # 50 GB
+    "min_video_duration": 300,  # 5 minutes
+
+    # Renaming configuration
+    "backup_original": True,
+    "backup_directory": ".backups",
+    "include_absolute_number": True,
+    "include_episode_title": True,
+    "include_technical_info": True,
+    "dry_run": False,
+
+    # Cache configuration
+    "enable_cache": True,
+    "cache_directory": ".cache",
+    "cache_expiry_hours": 24,
+
+    # Compatibility configuration
+    "min_free_space_percent": 5,
+    "min_free_space_bytes": 1024 * 1024 * 1024,  # 1 GB
+
+    # Interface configuration
+    "max_display_items": 20,
+    "confirm_operations": True
+}
+
+
+def load_config(config_path: str = "config.json") -> Dict[str, Any]:
+    """
+    Loads the configuration from a file, merged over the defaults.
+
+    Args:
+        config_path: Path to the configuration file
+
+    Returns:
+        Configuration dictionary (defaults on any load failure)
+    """
+    config = DEFAULT_CONFIG.copy()
+
+    if not config_path:
+        return config
+
+    config_file = Path(config_path)
+
+    if not config_file.exists():
+        logger.info(f"Fichier de configuration non trouvé: {config_path}")
+        logger.info("Utilisation de la configuration par défaut")
+        save_config(config, config_path)
+        return config
+
+    try:
+        with open(config_file, 'r', encoding='utf-8') as f:
+            user_config = json.load(f)
+
+        # Shallow merge: user values override defaults key by key
+        config.update(user_config)
+        logger.info(f"Configuration chargée depuis: {config_path}")
+
+    except json.JSONDecodeError as e:
+        logger.error(f"Erreur de format JSON dans {config_path}: {e}")
+        logger.info("Utilisation de la configuration par défaut")
+    except Exception as e:
+        logger.error(f"Erreur lors du chargement de la configuration: {e}")
+        logger.info("Utilisation de la configuration par défaut")
+
+    return config
+
+
+def save_config(config: Dict[str, Any], config_path: str = "config.json"):
+    """
+    Saves the configuration to a JSON file.
+
+    Args:
+        config: Configuration to save
+        config_path: Path to the configuration file
+    """
+    try:
+        config_file = Path(config_path)
+
+        # Create the parent directory if necessary
+        config_file.parent.mkdir(parents=True, exist_ok=True)
+
+        with open(config_file, 'w', encoding='utf-8') as f:
+            json.dump(config, f, indent=2, ensure_ascii=False)
+
+        logger.info(f"Configuration sauvegardée dans: {config_path}")
+
+    except Exception as e:
+        logger.error(f"Erreur lors de la sauvegarde de la configuration: {e}")
+
+
+def get_config_template() -> str:
+    """Returns a commented configuration template (JSON-with-comments, not strict JSON)."""
+    template = """{
+    // Configuration générale
+    "language": "fra",
+    "log_level": "INFO",
+    "temp_directory": "/tmp/animelibrarian",
+
+    // Configuration TVDB - Obtenez votre clé API sur https://thetvdb.com/
+    "thetvdb_api_key": null,
+    "thetvdb_base_url": "https://api.thetvdb.com",
+
+    // Configuration trace.moe - Clé API optionnelle pour des limites plus élevées
+    "trace_moe_api_key": null,
+    "trace_moe_base_url": "https://api.trace.moe",
+    "trace_moe_rate_limit": 1.0,
+    "trace_moe_timeout": 30,
+    "trace_moe_max_retries": 3,
+
+    // Extensions vidéo supportées
+    "video_extensions": [
+        ".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".webm",
+        ".m4v", ".mpg", ".mpeg", ".3gp", ".ts", ".m2ts", ".ogv"
+    ],
+
+    // Configuration du scan
+    "max_scan_depth": 10,
+    "ignore_patterns": [
+        ".git", ".svn", ".hg", "__pycache__", "node_modules",
+        ".vscode", ".idea", "Thumbs.db", ".DS_Store"
+    ],
+
+    // Configuration de détection multimédia
+    "min_video_size": 52428800,
+    "max_video_size": 53687091200,
+    "min_video_duration": 300,
+
+    // Configuration du renommage
+    "backup_original": true,
+    "backup_directory": ".backups",
+    "include_absolute_number": true,
+    "include_episode_title": true,
+    "include_technical_info": true,
+    "dry_run": false,
+
+    // Configuration du cache
+    "enable_cache": true,
+    "cache_directory": ".cache",
+    "cache_expiry_hours": 24,
+
+    // Configuration de la compatibilité
+    "min_free_space_percent": 5,
+    "min_free_space_bytes": 1073741824,
+
+    // Configuration de l'interface
+    "max_display_items": 20,
+    "confirm_operations": true
+}"""
+
+    return template
+
+
+def validate_config(config: Dict[str, Any]) -> Dict[str, Any]:
+    """
+    Validates the configuration and returns the problems found.
+
+    Args:
+        config: Configuration to validate
+
+    Returns:
+        Dict with "errors", "warnings" lists and an "is_valid" flag
+    """
+    result = {
+        "errors": [],
+        "warnings": [],
+        "is_valid": True
+    }
+
+    # API key validation (missing TVDB key is only a warning)
+    if not config.get("thetvdb_api_key"):
+        result["warnings"].append("Clé API TVDB non configurée - fonctionnalités limitées")
+
+    # Path validation
+    temp_dir = Path(config.get("temp_directory", "/tmp/animelibrarian"))
+    if not temp_dir.parent.exists():
+        result["errors"].append(f"Répertoire parent invalide pour temp_directory: {temp_dir.parent}")
+
+    # Numeric value validation
+    if config.get("min_video_size", 0) < 0:
+        result["errors"].append("min_video_size doit ĂȘtre positif")
+
+    if config.get("max_video_size", 0) <= config.get("min_video_size", 0):
+        result["errors"].append("max_video_size doit ĂȘtre supĂ©rieur Ă  min_video_size")
+
+    if config.get("trace_moe_rate_limit", 0) <= 0:
+        result["errors"].append("trace_moe_rate_limit doit ĂȘtre positif")
+
+    # Extension validation: non-empty, every entry dot-prefixed
+    extensions = config.get("video_extensions", [])
+    if not extensions or not all(ext.startswith('.') for ext in extensions):
+        result["errors"].append("video_extensions doit contenir des extensions valides (ex: .mp4)")
+
+    # Log level validation (case-insensitive)
+    valid_log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
+    if config.get("log_level", "").upper() not in valid_log_levels:
+        result["errors"].append(f"log_level doit ĂȘtre l'une de: {', '.join(valid_log_levels)}")
+
+    result["is_valid"] = len(result["errors"]) == 0
+
+    return result
+
+
+def setup_directories(config: Dict[str, Any]):
+    """
+    Creates the directories required by the configuration (best-effort).
+
+    Args:
+        config: Application configuration
+    """
+    directories = []
+
+    # Temporary directory
+    if config.get("temp_directory"):
+        directories.append(Path(config["temp_directory"]))
+
+    # Backup directory
+    if config.get("backup_original") and config.get("backup_directory"):
+        # This one is created relative to the working directory, elsewhere
+        pass
+
+    # Cache directory
+    if config.get("enable_cache") and config.get("cache_directory"):
+        directories.append(Path(config["cache_directory"]))
+
+    for directory in directories:
+        try:
+            directory.mkdir(parents=True, exist_ok=True)
+            logger.debug(f"Répertoire créé/vérifié: {directory}")
+        except Exception as e:
+            logger.warning(f"Impossible de créer le répertoire {directory}: {e}")
+
+
+def mask_sensitive_data(config: Dict[str, Any]) -> Dict[str, Any]:
+    """
+    Masks sensitive data for display purposes.
+
+    Args:
+        config: Original configuration
+
+    Returns:
+        Shallow copy of the configuration with sensitive values masked
+    """
+    masked = config.copy()
+
+    # List of sensitive keys
+    sensitive_keys = [
+        "thetvdb_api_key",
+        "trace_moe_api_key"
+    ]
+
+    # Only mask keys that are present AND non-empty (None/empty stay visible)
+    for key in sensitive_keys:
+        if key in masked and masked[key]:
+            masked[key] = "***masqué***"
+
+    return masked
\ No newline at end of file
diff --git a/src/utils/logging.py b/src/utils/logging.py
new file mode 100644
index 0000000..1ff2be1
--- /dev/null
+++ b/src/utils/logging.py
@@ -0,0 +1,69 @@
+"""
+Logging configuration for AnimeLibrarian - Project of the LĂ©gion de Muyue
+"""
+
+import logging
+import logging.handlers
+import os
+from pathlib import Path
+from typing import Optional
+
+
+def setup_logging(verbose: bool = False, log_file: Optional[str] = None):
+    """
+    Configures the logging system (console, and optionally a rotating file).
+
+    Args:
+        verbose: Enables verbose mode (DEBUG)
+        log_file: Log file path (optional)
+    """
+
+    # Log level for console output
+    level = logging.DEBUG if verbose else logging.INFO
+
+    # Formatter configuration
+    formatter = logging.Formatter(
+        '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
+        datefmt='%Y-%m-%d %H:%M:%S'
+    )
+
+    # Root logger configuration
+    root_logger = logging.getLogger()
+    root_logger.setLevel(level)
+
+    # Remove existing handlers (avoids duplicates on repeated setup calls)
+    for handler in root_logger.handlers[:]:
+        root_logger.removeHandler(handler)
+
+    # Console handler
+    console_handler = logging.StreamHandler()
+    console_handler.setLevel(level)
+    console_handler.setFormatter(formatter)
+    root_logger.addHandler(console_handler)
+
+    # File handler, if a path was given
+    if log_file:
+        try:
+            log_path = Path(log_file)
+            log_path.parent.mkdir(parents=True, exist_ok=True)
+
+            file_handler = logging.handlers.RotatingFileHandler(
+                log_file,
+                maxBytes=10 * 1024 * 1024,  # 10 MB per file
+                backupCount=5
+            )
+            file_handler.setLevel(logging.DEBUG)  # Always DEBUG in files
+            file_handler.setFormatter(formatter)
+            root_logger.addHandler(file_handler)
+
+        except Exception as e:
+            print(f"⚠ Impossible de crĂ©er le fichier de log {log_file}: {e}")
+
+    # Quiet down noisy external modules
+    external_modules = [
+        'urllib3.connectionpool',
+        'requests.packages.urllib3.connectionpool'
+    ]
+
+    for module in external_modules:
+        logging.getLogger(module).setLevel(logging.WARNING)
\ No newline at end of file