diff --git a/main.py b/main.py new file mode 100644 index 0000000..89cde60 --- /dev/null +++ b/main.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +AnimeLibrarian - Outil d'organisation et de vérification de collections d'anime +Point d'entrée principal de l'application +""" + +import os +import sys +import argparse +from pathlib import Path + +# Ajout du répertoire courant au path pour importer les modules locaux +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from src.core import AnimeLibrarianCore +from src.ui import AnimeLibrarianUI +from src.utils import setup_logging, load_config + + +def main(): + """Point d'entrée principal de l'application""" + parser = argparse.ArgumentParser( + description="AnimeLibrarian - Outil d'organisation et de vérification de collections d'anime" + ) + parser.add_argument( + "--config", "-c", + type=str, + help="Chemin vers le fichier de configuration", + default="config.json" + ) + parser.add_argument( + "--verbose", "-v", + action="store_true", + help="Active le mode verbeux" + ) + parser.add_argument( + "--directory", "-d", + type=str, + help="Répertoire de base à analyser (contournement de la sélection interactive)" + ) + + args = parser.parse_args() + + # Configuration du logging + setup_logging(verbose=args.verbose) + + # Chargement de la configuration + config = load_config(args.config) + + # Initialisation du cœur de l'application + core = AnimeLibrarianCore(config) + + # Initialisation de l'interface utilisateur + ui = AnimeLibrarianUI(core) + + # Lancement de l'interface interactive + ui.run(preselected_directory=args.directory) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\nOpération annulée par l'utilisateur.") + sys.exit(0) + except Exception as e: + print(f"Erreur: {e}") + sys.exit(1) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..94277c0 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,27 @@ +# AnimeLibrarian - Dépendances Python +# Projet développé par la Légion de Muyue + +# Core dependencies +requests>=2.28.0 +pathlib2>=2.3.0 + +# Video processing (optional but recommended) +ffmpeg-python>=0.2.0 + +# Image processing (for trace.moe) +Pillow>=9.0.0 + +# Data handling +dataclasses>=0.8; python_version<"3.7" + +# Logging and configuration +pyyaml>=6.0 + +# Development dependencies (optional) +pytest>=7.0.0 +pytest-cov>=4.0.0 +black>=22.0.0 +flake8>=5.0.0 + +# Optional: Advanced video information +# ffprobe-python>=0.1.0 \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..9b29248 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,3 @@ +# AnimeLibrarian - Package principal +__version__ = "1.0.0" +__author__ = "AnimeLibrarian Team" \ No newline at end of file diff --git a/src/api/__init__.py b/src/api/__init__.py new file mode 100644 index 0000000..34a24b4 --- /dev/null +++ b/src/api/__init__.py @@ -0,0 +1 @@ +# Package api \ No newline at end of file diff --git a/src/api/thetvdb_client.py b/src/api/thetvdb_client.py new file mode 100644 index 0000000..f3101e8 --- /dev/null +++ b/src/api/thetvdb_client.py @@ -0,0 +1,491 @@ +""" +Client API pour TheTVDB - Projet de la Légion de Muyue +""" + +import json +import time +import requests +from typing import Dict, Any, List, Optional, Tuple +from datetime import datetime, timedelta +import logging + +logger = logging.getLogger(__name__) + + +class TheTVDBClient: + """Client pour l'API TheTVDB""" + + BASE_URL = "https://api.thetvdb.com" + + def __init__(self, api_key: str = None, language: str = "fra"): + """ + Initialise le client TheTVDB + + Args: + api_key: Clé API TheTVDB (requise pour les requêtes authentifiées) + language: Code de langue pour les métadonnées ('fra', 'eng', etc.) + """ + self.api_key = api_key + self.language = language + self.token = None + self.token_expires = None + self.session = requests.Session() + + # Headers par défaut + self.session.headers.update({ + "Content-Type": "application/json", + "Accept": "application/json" + }) + + def login(self) -> bool: + """ + Authentifie le client auprès de l'API TheTVDB + + Returns: + bool: True si l'authentification a réussi + """ + if not self.api_key: + logger.warning("Aucune clé API TheTVDB fournie - utilisation en mode limité") + return False + + try: + auth_data = { + "apikey": self.api_key + } + + response = self.session.post( + f"{self.BASE_URL}/login", + json=auth_data, + timeout=10 + ) + + if response.status_code == 200: + auth_response = response.json() + self.token = auth_response.get("token") + + if self.token: + # Ajout du token aux headers + self.session.headers.update({ + "Authorization": f"Bearer {self.token}" + }) + + # Le token expire après 24 heures + self.token_expires = datetime.now() + timedelta(hours=24) + + logger.info("Authentification TheTVDB réussie") + return True + else: + logger.error("Token non trouvé dans la réponse TheTVDB") + return False + else: + logger.error(f"Échec de l'authentification TheTVDB: {response.status_code} - {response.text}") + return False + + except Exception as e: + logger.error(f"Erreur lors de l'authentification TheTVDB: {e}") + return False + + def ensure_authenticated(self) -> bool: + """Vérifie que le client est authentifié et ré-authentifie si nécessaire""" + + # Si pas de clé API, on ne peut pas s'authentifier + if not self.api_key: + return False + + # Si on a un token valide + if self.token and self.token_expires and datetime.now() < self.token_expires: + return True + + # Sinon, on s'authentifie + return self.login() + + def search_series(self, name: str) -> List[Dict[str, Any]]: + """ + Recherche des séries par nom + + Args: + name: Nom de la série à rechercher + + Returns: + Liste des séries trouvées + """ + try: + # Ajout de paramètres pour les résultats pertinents + params = { + "name": name, + "type": "series" + } + + # Utilisation de l'endpoint public pour la recherche + url = f"{self.BASE_URL}/search/series" + + response = self.session.get(url, params=params, timeout=10) + + if response.status_code == 200: + data = response.json() + return data.get("data", []) + elif response.status_code == 401 and self.ensure_authenticated(): + # Réessayer avec authentification + response = self.session.get(url, params=params, timeout=10) + if response.status_code == 200: + data = response.json() + return data.get("data", []) + + logger.error(f"Erreur recherche série {name}: {response.status_code} - {response.text}") + return [] + + except Exception as e: + logger.error(f"Erreur lors de la recherche de série {name}: {e}") + return [] + + def get_series_by_id(self, series_id: int) -> Optional[Dict[str, Any]]: + """ + Récupère les informations détaillées d'une série par son ID + + Args: + series_id: ID TheTVDB de la série + + Returns: + Dictionnaire avec les informations de la série ou None + """ + try: + url = f"{self.BASE_URL}/series/{series_id}" + params = {"lang": self.language} + + response = self.session.get(url, params=params, timeout=10) + + if response.status_code == 200: + return response.json().get("data") + elif response.status_code == 401 and self.ensure_authenticated(): + response = self.session.get(url, params=params, timeout=10) + if response.status_code == 200: + return response.json().get("data") + + logger.error(f"Erreur récupération série {series_id}: {response.status_code}") + return None + + except Exception as e: + logger.error(f"Erreur lors de la récupération de la série {series_id}: {e}") + return None + + def get_episodes(self, series_id: int, page: int = 0) -> List[Dict[str, Any]]: + """ + Récupère les épisodes d'une série + + Args: + series_id: ID TheTVDB de la série + page: Page de résultats (pagination) + + Returns: + Liste des épisodes + """ + try: + url = f"{self.BASE_URL}/series/{series_id}/episodes/query" + params = { + "lang": self.language, + "page": page, + "pageSize": 100 # Maximum par page + } + + response = self.session.get(url, params=params, timeout=10) + + if response.status_code == 200: + data = response.json() + return data.get("data", []) + elif response.status_code == 401 and self.ensure_authenticated(): + response = self.session.get(url, params=params, timeout=10) + if response.status_code == 200: + data = response.json() + return data.get("data", []) + + logger.error(f"Erreur récupération épisodes série {series_id}: {response.status_code}") + return [] + + except Exception as e: + logger.error(f"Erreur lors de la récupération des épisodes de la série {series_id}: {e}") + return [] + + def get_all_episodes(self, series_id: int) -> List[Dict[str, Any]]: + """ + Récupère tous les épisodes d'une série (toutes les pages) + + Args: + series_id: ID TheTVDB de la série + + Returns: + Liste complète de tous les épisodes + """ + all_episodes = [] + page = 0 + + while True: + episodes = self.get_episodes(series_id, page) + if not episodes: + break + + all_episodes.extend(episodes) + page += 1 + + # Petite pause pour éviter de surcharger l'API + time.sleep(0.1) + + logger.info(f"Récupéré {len(all_episodes)} épisodes pour la série {series_id}") + return all_episodes + + def get_episode_by_number(self, series_id: int, season: int, episode: int) -> Optional[Dict[str, Any]]: + """ + Récupère un épisode spécifique par son numéro + + Args: + series_id: ID TheTVDB de la série + season: Numéro de saison + episode: Numéro d'épisode + + Returns: + Dictionnaire avec les informations de l'épisode ou None + """ + try: + url = f"{self.BASE_URL}/series/{series_id}/episodes/{season}/{episode}" + params = {"lang": self.language} + + response = self.session.get(url, params=params, timeout=10) + + if response.status_code == 200: + return response.json().get("data") + elif response.status_code == 401 and self.ensure_authenticated(): + response = self.session.get(url, params=params, timeout=10) + if response.status_code == 200: + return response.json().get("data") + + return None + + except Exception as e: + logger.error(f"Erreur lors de la récupération de l'épisode S{season:02d}E{episode:02d}: {e}") + return None + + def get_series_artwork(self, series_id: int) -> List[Dict[str, Any]]: + """ + Récupère les artworks (posters, bannières) d'une série + + Args: + series_id: ID TheTVDB de la série + + Returns: + Liste des artworks + """ + try: + url = f"{self.BASE_URL}/series/{series_id}/images/query" + params = { + "keyType": "poster,series", + "lang": self.language + } + + response = self.session.get(url, params=params, timeout=10) + + if response.status_code == 200: + data = response.json() + return data.get("data", []) + elif response.status_code == 401 and self.ensure_authenticated(): + response = self.session.get(url, params=params, timeout=10) + if response.status_code == 200: + data = response.json() + return data.get("data", []) + + return [] + + except Exception as e: + logger.error(f"Erreur lors de la récupération des artworks de la série {series_id}: {e}") + return [] + + def build_episode_map(self, series_id: int) -> Dict[Tuple[int, int], Dict[str, Any]]: + """ + Construit une map des épisodes pour lookup rapide + + Returns: + Dict avec (saison, épisode) -> données épisode + """ + episodes = self.get_all_episodes(series_id) + episode_map = {} + + for ep in episodes: + season = ep.get("airedSeason", 1) + episode_num = ep.get("airedEpisodeNumber", 0) + + if season and episode_num: + episode_map[(season, episode_num)] = ep + + logger.info(f"Map d'épisodes construite: {len(episode_map)} entrées") + return episode_map + + def get_episode_title(self, series_id: int, season: int, episode: int, fallback_title: str = None) -> str: + """ + Récupère le titre d'un épisode + + Args: + series_id: ID TheTVDB de la série + season: Numéro de saison + episode: Numéro d'épisode + fallback_title: Titre par défaut si non trouvé + + Returns: + Titre de l'épisode + """ + episode_data = self.get_episode_by_number(series_id, season, episode) + + if episode_data and episode_data.get("episodeName"): + return episode_data["episodeName"] + + # Utiliser le titre de fallback ou générer un titre générique + if fallback_title: + return fallback_title + + return f"Episode {episode:02d}" + + def search_best_match(self, search_name: str, year: int = None) -> Optional[Dict[str, Any]]: + """ + Recherche la meilleure correspondance pour une série + + Args: + search_name: Nom à rechercher + year: Année de sortie pour affiner la recherche + + Returns: + Meilleure correspondance ou None + """ + search_term = search_name + + # Ajouter l'année si disponible + if year: + search_term = f"{search_term} ({year})" + + results = self.search_series(search_term) + + if not results and year: + # Réessayer sans l'année + results = self.search_series(search_name) + + if not results: + return None + + # Score de correspondance + best_match = None + best_score = 0 + + search_lower = search_name.lower() + + for series in results: + name = series.get("seriesName", "").lower() + + # Score basique de similarité + score = 0 + + # Correspondance exacte + if name == search_lower: + score = 100 + # Correspondance partielle + elif search_lower in name or name in search_lower: + score = 70 + # Correspondance de mots + else: + search_words = set(search_lower.split()) + name_words = set(name.split()) + common_words = search_words.intersection(name_words) + score = len(common_words) * 20 + + # Bonus pour l'année + if year: + series_year = self._extract_year_from_series(series) + if series_year == year: + score += 15 + elif abs(series_year - year) <= 1: + score += 5 + + # Mise à jour de la meilleure correspondance + if score > best_score: + best_score = score + best_match = series + + # Seuil minimum de confiance + if best_score >= 50: + logger.info(f"Meilleure correspondance pour '{search_name}': {best_match.get('seriesName')} (score: {best_score})") + return best_match + + logger.warning(f"Pas de correspondance suffisante pour '{search_name}' (meilleur score: {best_score})") + return None + + def _extract_year_from_series(self, series: Dict[str, Any]) -> int: + """Extrait l'année de la série""" + first_aired = series.get("firstAired", "") + if first_aired: + try: + return datetime.strptime(first_aired, "%Y-%m-%d").year + except: + pass + + return 0 + + def get_recommended_format(self, series: Dict[str, Any], episode_info: Dict[str, Any]) -> str: + """ + Génère le nom de fichier recommandé selon le format TheTVDB + + Args: + series: Informations de la série + episode_info: Informations de l'épisode + + Returns: + Nom de fichier formaté + """ + series_name = series.get("seriesName", "Unknown Series") + + season = episode_info.get("season", 1) + episode = episode_info.get("episode", 0) + title = episode_info.get("title", "") + + # Nettoyage et formatage + clean_series = self._clean_string(series_name) + clean_title = self._clean_string(title) if title else "" + + # Construction du nom + if episode_info.get("special", False): + # Épisode spécial + formatted = f"{clean_series} - S00E{episode:02d}" + else: + # Épisode normal + formatted = f"{clean_series} - S{season:02d}E{episode:02d}" + + # Ajout du titre + if clean_title: + formatted += f" - {clean_title}" + + return formatted + + def _clean_string(self, text: str) -> str: + """Nettoie une chaîne pour le formatage de nom de fichier""" + if not text: + return "" + + # Suppression des caractères invalides + invalid_chars = '<>:"/\\|?*' + for char in invalid_chars: + text = text.replace(char, '') + + # Normalisation des espaces + text = text.replace(' ', ' ').strip() + + return text + + def get_languages(self) -> List[Dict[str, Any]]: + """Récupère la liste des langues disponibles""" + try: + url = f"{self.BASE_URL}/languages" + + response = self.session.get(url, timeout=10) + + if response.status_code == 200: + return response.json().get("data", []) + + return [] + + except Exception as e: + logger.error(f"Erreur lors de la récupération des langues: {e}") + return [] \ No newline at end of file diff --git a/src/api/tracemoe_client.py b/src/api/tracemoe_client.py new file mode 100644 index 0000000..a049185 --- /dev/null +++ b/src/api/tracemoe_client.py @@ -0,0 +1,429 @@ +""" +Client pour l'API trace.moe - Projet de la Légion de Muyue +""" + +import base64 +import json +import time +import requests +from pathlib import Path +from typing import Dict, Any, List, Optional, Tuple +import logging + +logger = logging.getLogger(__name__) + + +class TraceMoeClient: + """Client pour l'API trace.moe de reconnaissance de scènes d'anime""" + + BASE_URL = "https://api.trace.moe" + + def __init__(self, config: Dict[str, Any] = None): + """ + Initialise le client trace.moe + + Args: + config: Configuration du client + """ + self.config = config or {} + self.api_key = self.config.get("trace_moe_api_key") + self.session = requests.Session() + + # Configuration des limites + self.max_retries = self.config.get("max_retries", 3) + self.retry_delay = self.config.get("retry_delay", 1.0) + self.timeout = self.config.get("timeout", 30) + + # Limites de l'API + self.rate_limit = self.config.get("rate_limit", 1.0) # secondes entre requêtes + self.last_request_time = 0 + + def check_rate_limit(self): + """Vérifie et respecte les limites de taux de l'API""" + current_time = time.time() + elapsed = current_time - self.last_request_time + + if elapsed < self.rate_limit: + sleep_time = self.rate_limit - elapsed + logger.debug(f"Rate limit: attente de {sleep_time:.1f}s") + time.sleep(sleep_time) + + self.last_request_time = time.time() + + def extract_frame_from_video(self, video_path: Path, timestamp: float = None) -> Optional[bytes]: + """ + Extrait une frame d'une vidéo pour analyse + + Args: + video_path: Chemin vers le fichier vidéo + timestamp: Temps en secondes (sinon 60s ou 30% de la durée) + + Returns: + Image en bytes ou None en cas d'erreur + """ + try: + # Import ici pour éviter les dépendances si non utilisé + import subprocess + + # Détermination du timestamp + if timestamp is None: + # Essayer d'obtenir la durée de la vidéo + try: + cmd = [ + 'ffprobe', '-v', 'quiet', '-show_entries', + 'format=duration', '-of', 'csv=p=0', str(video_path) + ] + result = subprocess.run( + cmd, capture_output=True, text=True, timeout=10 + ) + if result.returncode == 0: + duration = float(result.stdout.strip()) + timestamp = min(duration * 0.3, 60) # 30% de la durée, max 60s + else: + timestamp = 60 + except: + timestamp = 60 + + # Extraction de la frame avec ffmpeg + cmd = [ + 'ffmpeg', '-i', str(video_path), + '-ss', str(timestamp), + '-frames:v', '1', + '-f', 'image2pipe', + '-vcodec', 'png', + '-' + ] + + result = subprocess.run( + cmd, capture_output=True, timeout=20 + ) + + if result.returncode == 0 and result.stdout: + return result.stdout + else: + logger.error(f"Erreur extraction frame: {result.stderr.decode('utf-8')}") + return None + + except subprocess.TimeoutExpired: + logger.error("Timeout lors de l'extraction de frame") + return None + except Exception as e: + logger.error(f"Erreur lors de l'extraction de frame: {e}") + return None + + def analyze_frame(self, frame_data: bytes) -> Optional[Dict[str, Any]]: + """ + Analyse une frame avec l'API trace.moe + + Args: + frame_data: Données de l'image en bytes + + Returns: + Réponse de l'API ou None en cas d'erreur + """ + self.check_rate_limit() + + try: + # Encodage en base64 + encoded_image = base64.b64encode(frame_data).decode('utf-8') + + # Préparation de la requête + url = f"{self.BASE_URL}/search" + data = { + "image": encoded_image + } + + # Ajout de la clé API si disponible + if self.api_key: + data["key"] = self.api_key + + # Requête avec retry + for attempt in range(self.max_retries): + try: + response = self.session.post( + url, json=data, timeout=self.timeout + ) + + if response.status_code == 200: + return response.json() + elif response.status_code == 429: + # Too Many Requests - augmenter le délai + logger.warning("API rate limit atteint, augmentation du délai") + time.sleep(self.rate_limit * (attempt + 1)) + continue + else: + logger.error(f"Erreur API trace.moe: {response.status_code} - {response.text}") + if attempt < self.max_retries - 1: + time.sleep(self.retry_delay) + continue + + except requests.exceptions.Timeout: + logger.error(f"Timeout API (tentative {attempt + 1})") + if attempt < self.max_retries - 1: + time.sleep(self.retry_delay * (attempt + 1)) + continue + except Exception as e: + logger.error(f"Erreur requête API (tentative {attempt + 1}): {e}") + if attempt < self.max_retries - 1: + time.sleep(self.retry_delay) + continue + + return None + + except Exception as e: + logger.error(f"Erreur lors de l'analyse de frame: {e}") + return None + + def search_video(self, video_path: Path, timestamp: float = None) -> Optional[Dict[str, Any]]: + """ + Analyse un fichier vidéo complet + + Args: + video_path: Chemin vers le fichier vidéo + timestamp: Timestamp spécifique à analyser + + Returns: + Résultat de l'analyse ou None + """ + logger.debug(f"Analyse de la vidéo: {video_path.name}") + + # Extraction de frame + frame_data = self.extract_frame_from_video(video_path, timestamp) + if not frame_data: + logger.error(f"Impossible d'extraire une frame de {video_path}") + return None + + # Analyse de la frame + result = self.analyze_frame(frame_data) + if result: + # Ajout d'informations supplémentaires + result['source_file'] = str(video_path) + result['timestamp_used'] = timestamp + result['frame_size'] = len(frame_data) + + logger.debug(f"Analyse réussie pour {video_path.name}") + + return result + + def identify_anime_episode(self, video_path: Path, timestamp: float = None) -> Optional[Dict[str, Any]]: + """ + Identifie l'anime et l'épisode à partir d'une vidéo + + Args: + video_path: Chemin vers le fichier vidéo + timestamp: Timestamp spécifique à analyser + + Returns: + Informations détaillées sur l'épisode identifié + """ + result = self.search_video(video_path, timestamp) + + if not result: + return None + + # Traitement des résultats + return self._process_trace_moe_result(result) + + def _process_trace_moe_result(self, result: Dict[str, Any]) -> Dict[str, Any]: + """Traite et formate les résultats de trace.moe""" + + processed = { + "success": False, + "matches": [], + "best_match": None, + "error": None + } + + try: + # Vérification des erreurs + if result.get("error"): + processed["error"] = result["error"] + return processed + + # Extraction des résultats + docs = result.get("result", []) + if not docs: + processed["error"] = "Aucune correspondance trouvée" + return processed + + processed["matches"] = docs + + # Identification de la meilleure correspondance + if docs: + best_match = docs[0] # Le premier est généralement le plus probable + + processed["best_match"] = { + "anime_title": best_match.get("anilist", {}).get("title", {}).get("romaji", ""), + "anime_title_en": best_match.get("anilist", {}).get("title", {}).get("english", ""), + "anime_title_native": best_match.get("anilist", {}).get("title", {}).get("native", ""), + "episode": best_match.get("episode", None), + "timestamp": best_match.get("from", None), + "similarity": best_match.get("similarity", 0), + "anilist_id": best_match.get("anilist", {}).get("id", None), + "mal_id": best_match.get("anilist", {}).get("idMal", None), + "filename": best_match.get("filename", ""), + "season": best_match.get("season", None) + } + + # Conversion du timestamp + if best_match.get("from"): + processed["best_match"]["timestamp_formatted"] = self._format_timestamp( + best_match.get("from") + ) + + # Validation du seuil de confiance + confidence = best_match.get("similarity", 0) + processed["high_confidence"] = confidence >= 0.85 + processed["medium_confidence"] = confidence >= 0.70 + + processed["success"] = True + + except Exception as e: + processed["error"] = f"Erreur lors du traitement des résultats: {e}" + logger.error(f"Erreur traitement trace.moe: {e}") + + return processed + + def _format_timestamp(self, seconds: float) -> str: + """Formate un timestamp en HH:MM:SS""" + if not seconds: + return "00:00:00" + + hours = int(seconds // 3600) + minutes = int((seconds % 3600) // 60) + secs = int(seconds % 60) + + return f"{hours:02d}:{minutes:02d}:{secs:02d}" + + def verify_episode_number(self, video_path: Path, expected_episode: int, + expected_series: str = None) -> Dict[str, Any]: + """ + Vérifie si un fichier vidéo correspond à l'épisode attendu + + Args: + video_path: Chemin vers le fichier vidéo + expected_episode: Numéro d'épisode attendu + expected_series: Nom de série attendu (optionnel) + + Returns: + Résultat de la vérification + """ + verification = { + "success": False, + "episode_match": False, + "series_match": False, + "confidence": 0, + "identified_episode": None, + "identified_series": None, + "error": None + } + + try: + # Analyse de la vidéo + result = self.identify_anime_episode(video_path) + + if not result or not result["success"]: + verification["error"] = result.get("error", "Erreur d'identification") + return verification + + best_match = result["best_match"] + if not best_match: + verification["error"] = "Aucune correspondance trouvée" + return verification + + # Vérification de l'épisode + identified_episode = best_match.get("episode") + verification["identified_episode"] = identified_episode + verification["confidence"] = best_match.get("similarity", 0) + + if identified_episode is not None and identified_episode == expected_episode: + verification["episode_match"] = True + + # Vérification de la série si spécifiée + if expected_series: + anime_titles = [ + best_match.get("anime_title", "").lower(), + best_match.get("anime_title_en", "").lower(), + best_match.get("anime_title_native", "").lower() + ] + + expected_lower = expected_series.lower() + + for title in anime_titles: + if title and (expected_lower in title or title in expected_lower): + verification["series_match"] = True + break + + verification["identified_series"] = best_match.get("anime_title") + + # Succès si l'épisode correspond + verification["success"] = verification["episode_match"] + + except Exception as e: + verification["error"] = f"Erreur lors de la vérification: {e}" + logger.error(f"Erreur vérification épisode: {e}") + + return verification + + def batch_verify_episodes(self, episodes: List[Tuple[Path, int, str]]) -> List[Dict[str, Any]]: + """ + Vérification par lot de plusieurs épisodes + + Args: + episodes: Liste de tuples (video_path, expected_episode, expected_series) + + Returns: + Liste des résultats de vérification + """ + results = [] + + for video_path, expected_episode, expected_series in episodes: + logger.info(f"Vérification de {video_path.name}") + + result = self.verify_episode_number( + video_path, expected_episode, expected_series + ) + + results.append(result) + + # Petite pause entre les vérifications pour respecter les limites + time.sleep(self.rate_limit) + + return results + + def get_api_limits(self) -> Dict[str, Any]: + """Retourne les informations sur les limites de l'API""" + return { + "rate_limit": self.rate_limit, + "max_retries": self.max_retries, + "timeout": self.timeout, + "has_api_key": bool(self.api_key) + } + + def test_connection(self) -> Dict[str, Any]: + """Teste la connexion à l'API trace.moe""" + try: + # Création d'une petite image de test + import io + from PIL import Image + + # Création d'une image 1x1 noir + img = Image.new('RGB', (1, 1), color='black') + img_bytes = io.BytesIO() + img.save(img_bytes, format='PNG') + img_bytes.seek(0) + + # Test avec cette image + result = self.analyze_frame(img_bytes.read()) + + return { + "success": True, + "api_responding": result is not None, + "result": result + } + + except Exception as e: + return { + "success": False, + "error": str(e) + } \ No newline at end of file diff --git a/src/core/__init__.py b/src/core/__init__.py new file mode 100644 index 0000000..c5ceaaf --- /dev/null +++ b/src/core/__init__.py @@ -0,0 +1,464 @@ +""" +Cœur de l'application AnimeLibrarian - Projet de la Légion de Muyue +""" + +import os +from pathlib import Path +from typing import Dict, Any, List, Optional, Tuple +import logging + +from .directory_checker import DirectoryChecker +from .file_scanner import FileScanner +from .media_detector import MediaDetector +from .file_renamer import FileRenamer +from ..api.thetvdb_client import TheTVDBClient +from ..api.tracemoe_client import TraceMoeClient +from ..models.episode import Series, Episode + +logger = logging.getLogger(__name__) + + +class AnimeLibrarianCore: + """Cœur logique de l'application AnimeLibrarian""" + + def __init__(self, config: Dict[str, Any] = None): + """ + Initialise le cœur de l'application + + Args: + config: Configuration de l'application + """ + self.config = config or {} + + # Initialisation des composants + self.directory_checker = DirectoryChecker(config) + self.file_scanner = FileScanner(config) + self.media_detector = MediaDetector(config) + self.file_renamer = FileRenamer(config) + + # Clients API + self.tvdb_client = TheTVDBClient( + api_key=self.config.get("thetvdb_api_key"), + language=self.config.get("language", "fra") + ) + self.trace_moe_client = TraceMoeClient(self.config) + + # État de l'application + self.current_directory = None + self.series_list = [] + self.selected_series = [] + + logger.info("Cœur d'AnimeLibrarian initialisé") + + def check_directory_compatibility(self, directory_path: str) -> Dict[str, Any]: + """ + Vérifie la compatibilité d'un répertoire + + Args: + directory_path: Chemin du répertoire à vérifier + + Returns: + Résultat détaillé de la vérification + """ + logger.info(f"Vérification de compatibilité: {directory_path}") + + result = self.directory_checker.check_directory(directory_path) + + if result.is_compatible: + self.current_directory = Path(directory_path) + logger.info(f"Répertoire compatible: {directory_path}") + else: + logger.warning(f"Répertoire incompatible: {directory_path}") + + return result.__dict__ + + def scan_series(self, directory_path: str = None) -> List[Dict[str, Any]]: + """ + Scan les séries dans le répertoire + + Args: + directory_path: Chemin du répertoire (utilise le courant si None) + + Returns: + Liste des séries trouvées + """ + if directory_path: + self.current_directory = Path(directory_path) + + if not self.current_directory: + raise ValueError("Aucun répertoire spécifié ou compatible") + + logger.info(f"Scan des séries dans: {self.current_directory}") + + # Scan des fichiers multimédia + media_files = self.file_scanner.scan_directory( + self.current_directory, + {'.mp4', '.mkv', '.avi', '.mov', '.wmv', '.flv', '.webm', + '.m4v', '.mpg', '.mpeg', '.3gp', '.ts', '.m2ts', '.ogv'} + ) + + # Détection de la structure des séries + series_structure = self._group_files_by_series(media_files) + + # Création des objets Series + self.series_list = [] + + for series_name, files in series_structure.items(): + series = self._create_series(series_name, files) + self.series_list.append(series) + + logger.info(f"Trouvé {len(self.series_list)} séries") + + return [self._serialize_series(series) for series in self.series_list] + + def verify_episodes_numbers(self, series_indices: List[int]) -> Dict[str, Any]: + """ + Vérifie les numéros d'épisodes avec trace.moe + + Args: + series_indices: Indices des séries à vérifier + + Returns: + Résultats de la vérification + """ + logger.info(f"Vérification des numéros d'épisodes pour {len(series_indices)} séries") + + results = { + "series_verified": 0, + "episodes_verified": 0, + "verification_results": [], + "errors": [] + } + + # Authentification TVDB si possible + self.tvdb_client.login() + + for idx in series_indices: + if 0 <= idx < len(self.series_list): + series = self.series_list[idx] + logger.info(f"Vérification de la série: {series.name}") + + try: + series_result = self._verify_series_episodes(series) + results["verification_results"].append(series_result) + results["series_verified"] += 1 + results["episodes_verified"] += len(series.episodes) + + except Exception as e: + error_msg = f"Erreur vérification série {series.name}: {e}" + logger.error(error_msg) + results["errors"].append(error_msg) + + logger.info(f"Vérification terminée: {results['series_verified']} séries, {results['episodes_verified']} épisodes") + return results + + def verify_files_integrity(self, series_indices: List[int]) -> Dict[str, Any]: + """ + Vérifie l'intégrité des fichiers + + Args: + series_indices: Indices des séries à vérifier + + Returns: + Résultats de la vérification d'intégrité + """ + logger.info(f"Vérification d'intégrité pour {len(series_indices)} séries") + + results = { + "files_checked": 0, + "valid_files": 0, + "invalid_files": [], + "issues": [], + "duplicates": [] + } + + # Collection de tous les fichiers + all_files = [] + + for idx in series_indices: + if 0 <= idx < len(self.series_list): + series = self.series_list[idx] + all_files.extend(series.episodes) + + # Vérification de chaque fichier + for episode in all_files: + metadata = self.media_detector.analyze_media_file(episode.file_path) + + results["files_checked"] += 1 + + if metadata.get("is_valid", False): + results["valid_files"] += 1 + + # Mise à jour des métadonnées de l'épisode + episode.duration = metadata.get("duration") + episode.resolution = metadata.get("resolution") + episode.codec = metadata.get("codec") + episode.verified = True + else: + results["invalid_files"].append({ + "filename": episode.filename, + "path": str(episode.file_path), + "issue": metadata.get("error", "Fichier invalide") + }) + results["issues"].append(f"Fichier invalide: {episode.filename}") + + # Détection des doublons + file_info_list = [{"path": ep.file_path, "size": ep.file_size} for ep in all_files] + duplicates = self.file_scanner.get_duplicates(file_info_list) + + if duplicates: + results["duplicates"] = [ + [str(dup["path"].name) for dup in dup_group] + for dup_group in duplicates + ] + results["issues"].append(f"{len(duplicates)} groupes de doublons détectés") + + logger.info(f"Intégrité vérifiée: {results['valid_files']}/{results['files_checked']} fichiers valides") + return results + + def rename_files(self, series_indices: List[int], dry_run: bool = False) -> Dict[str, Any]: + """ + Renomme les fichiers selon les normes TVDB + + Args: + series_indices: Indices des séries à traiter + dry_run: Si True, simule le renommage sans exécuter + + Returns: + Résultats du renommage + """ + logger.info(f"Renommage pour {len(series_indices)} séries (dry_run={dry_run})") + + # Configuration du mode dry_run + self.file_renamer.dry_run = dry_run + + results = { + "series_processed": 0, + "rename_plan": [], + "rename_results": [], + "stats": {} + } + + # Authentification TVDB si possible + self.tvdb_client.login() + + for idx in series_indices: + if 0 <= idx < len(self.series_list): + series = self.series_list[idx] + logger.info(f"Préparation du renommage pour: {series.name}") + + # Préparation du plan de renommage + rename_plan = self.file_renamer.prepare_rename_plan(series) + results["rename_plan"].extend(rename_plan) + + # Exécution + if rename_plan: + rename_results = self.file_renamer.execute_rename_plan(rename_plan) + results["rename_results"].extend(rename_results) + + results["series_processed"] += 1 + + # Statistiques + results["stats"] = self.file_renamer.get_stats() + + logger.info(f"Renommage terminé: {results['stats']['renamed']} fichiers renommés") + return results + + def _group_files_by_series(self, media_files: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]: + """Groupe les fichiers par série""" + import re + + series_structure = {} + + for file_info in media_files: + filename = file_info['path'].name + + # Extraction du nom de série + patterns = [ + r'^\[?([^\[\]]+?)\]?\s*[-_.]?\s*S\d{1,2}E\d{1,2}', + r'^([^-_\[\]]+?)(?:\s*[-_.]\s*)?(?:S\d{1,2}E\d{1,2}|Episode\s*\d{1,3}|E\d{1,3})', + r'^([^-_()]+)(?:\s*[-_.]\s*)?\d{1,3}', + ] + + series_name = filename # Fallback + + for pattern in patterns: + match = re.search(pattern, filename, re.IGNORECASE) + if match: + series_name = match.group(1).strip() + break + + # Nettoyage du nom + series_name = re.sub(r'[^\w\s-]', '', series_name) + series_name = re.sub(r'\s+', ' ', series_name).strip() + + if series_name not in series_structure: + series_structure[series_name] = [] + + series_structure[series_name].append(file_info) + + return series_structure + + def _create_series(self, series_name: str, files: List[Dict[str, Any]]) -> Series: + """Crée un objet Series à partir de fichiers""" + series = Series( + name=series_name, + directory=self.current_directory / series_name if self.current_directory else Path("."), + total_episodes=0 + ) + + # Recherche TVDB + tvdb_match = self.tvdb_client.search_best_match(series_name) + if tvdb_match: + series.tvdb_id = tvdb_match.get("id") + series.total_episodes = tvdb_match.get("totalEpisodes") + logger.debug(f"Série trouvée sur TVDB: {series_name} -> ID: {series.tvdb_id}") + + # Création des épisodes + episodes = [] + + for file_info in files: + # Analyse du nom de fichier + episode_info = self.media_detector.extract_episode_info(file_info['path'].name) + + # Création de l'objet Episode + episode = Episode( + file_path=file_info['path'], + series_name=series_name, + season=episode_info['season'], + episode=episode_info['episode'], + title=episode_info['title'], + special=episode_info['special'], + file_size=file_info['size'], + resolution=file_info.get('resolution'), + codec=file_info.get('codec') + ) + + # Analyse multimédia + media_metadata = self.media_detector.analyze_media_file(file_info['path']) + if media_metadata: + episode.duration = media_metadata.get('duration') + episode.resolution = media_metadata.get('resolution') or episode.resolution + episode.codec = media_metadata.get('codec') or episode.codec + episode.verified = media_metadata.get('is_valid', False) + + episodes.append(episode) + + # Tri des épisodes + episodes.sort(key=lambda ep: (ep.season, ep.episode)) + + # Ajout à la série + for episode in episodes: + series.add_episode(episode) + + return series + + def _verify_series_episodes(self, series: Series) -> Dict[str, Any]: + """Vérifie les épisodes d'une série avec trace.moe""" + result = { + "series_name": series.name, + "episodes_verified": 0, + "episode_results": [], + "summary": {} + } + + # Préparation des épisodes à vérifier + episodes_to_verify = [] + + for episode in series.episodes: + if not episode.special: # Ne vérifier que les épisodes normaux + episodes_to_verify.append((episode.file_path, episode.episode, series.name)) + + # Vérification par lot + if episodes_to_verify: + verification_results = self.trace_moe_client.batch_verify_episodes(episodes_to_verify) + + for i, (episode, verification_result) in enumerate(zip(series.episodes, verification_results)): + if not episode.special: # seulement les épisodes normaux + episode_result = { + "filename": episode.filename, + "expected_episode": episode.episode, + "identified_episode": verification_result.get("identified_episode"), + "confidence": verification_result.get("confidence", 0), + "match": verification_result.get("episode_match", False), + "error": verification_result.get("error") + } + + result["episode_results"].append(episode_result) + result["episodes_verified"] += 1 + + # Résumé + matches = sum(1 for r in result["episode_results"] if r.get("match", False)) + result["summary"] = { + "total": len(result["episode_results"]), + "matches": matches, + "mismatches": len(result["episode_results"]) - matches, + "match_rate": matches / len(result["episode_results"]) if result["episode_results"] else 0 + } + + return result + + def _serialize_series(self, series: Series) -> Dict[str, Any]: + """Sérialise un objet Series pour l'UI""" + completeness = series.check_completeness() + + return { + "name": series.name, + "directory": str(series.directory), + "total_episodes": len(series.episodes), + "regular_episodes": len(series.get_regular_episodes()), + "special_episodes": len(series.get_specials()), + "tvdb_id": series.tvdb_id, + "completeness": completeness, + "is_complete": completeness.get("is_complete", False), + "missing_episodes": completeness.get("missing_episodes", []), + "duplicate_episodes": completeness.get("duplicate_episodes", []), + "total_size": sum(ep.file_size for ep in series.episodes) + } + + def get_series_details(self, series_index: int) -> Optional[Dict[str, Any]]: + """ + Retourne les détails d'une série + + Args: + series_index: Index de la série + + Returns: + Détails de la série ou None + """ + if 0 <= series_index < len(self.series_list): + series = self.series_list[series_index] + + return { + "info": self._serialize_series(series), + "episodes": [ + { + "filename": ep.filename, + "season": ep.season, + "episode": ep.episode, + "title": ep.title, + "special": ep.special, + "duration": ep.duration, + "resolution": ep.resolution, + "codec": ep.codec, + "file_size": ep.file_size, + "verified": ep.verified, + "absolute_number": ep.absolute_number + } + for ep in series.episodes + ] + } + + return None + + def get_application_status(self) -> Dict[str, Any]: + """Retourne le statut actuel de l'application""" + return { + "current_directory": str(self.current_directory) if self.current_directory else None, + "series_count": len(self.series_list), + "total_episodes": sum(len(series.episodes) for series in self.series_list), + "tvdb_configured": bool(self.config.get("thetvdb_api_key")), + "trace_moe_configured": bool(self.config.get("trace_moe_api_key")), + "tvdb_authenticated": self.tvdb_client.token is not None, + "trace_moe_limits": self.trace_moe_client.get_api_limits() + } \ No newline at end of file diff --git a/src/core/directory_checker.py b/src/core/directory_checker.py new file mode 100644 index 0000000..7b609e9 --- /dev/null +++ b/src/core/directory_checker.py @@ -0,0 +1,333 @@ +""" +Vérificateur de compatibilité des répertoires pour AnimeLibrarian +""" + +import os +import shutil +import stat +from pathlib import Path +from typing import List, Dict, Any, Optional +import logging + +from ..models.episode import DirectoryCompatibilityResult, Series, Episode +from .file_scanner import FileScanner +from .media_detector import MediaDetector + +logger = logging.getLogger(__name__) + + +class DirectoryChecker: + """Vérifie si un répertoire est compatible avec AnimeLibrarian""" + + def __init__(self, config: Dict[str, Any] = None): + self.config = config or {} + self.file_scanner = FileScanner(config) + self.media_detector = MediaDetector() + + # Extensions vidéo supportées + self.video_extensions = { + '.mp4', '.mkv', '.avi', '.mov', '.wmv', '.flv', '.webm', + '.m4v', '.mpg', '.mpeg', '.3gp', '.ts', '.m2ts', '.ogv' + } + + # Tailles minimales/maximales pour les fichiers vidéo + self.min_video_size = 50 * 1024 * 1024 # 50 Mo + self.max_video_size = 50 * 1024 * 1024 * 1024 # 50 Go + + def check_directory(self, directory_path: str) -> DirectoryCompatibilityResult: + """ + Vérifie la compatibilité d'un répertoire + + Args: + directory_path: Chemin du répertoire à vérifier + + Returns: + DirectoryCompatibilityResult: Résultat détaillé de la vérification + """ + logger.info(f"Vérification du répertoire: {directory_path}") + + path = Path(directory_path).resolve() + result = DirectoryCompatibilityResult(path=path) + + # 1. Vérification de base du répertoire + self._check_basic_directory(path, result) + + if not result.is_compatible: + return result + + # 2. Vérification des permissions + self._check_permissions(path, result) + + # 3. Vérification de l'espace disque + self._check_disk_space(path, result) + + # 4. Analyse des séries et épisodes + if not self._analyze_media_structure(path, result): + return result + + # 5. Vérification finale de compatibilité + self._finalize_compatibility(result) + + logger.info(f"Vérification terminée: {result.summary()}") + return result + + def _check_basic_directory(self, path: Path, result: DirectoryCompatibilityResult): + """Vérifications de base du répertoire""" + + # Existence du répertoire + if not path.exists(): + result.add_error(f"Le répertoire n'existe pas: {path}") + return + + # C'est bien un répertoire + if not path.is_dir(): + result.add_error(f"Le chemin n'est pas un répertoire: {path}") + return + + # Accessibilité + if not os.access(path, os.R_OK): + result.add_error(f"Impossible de lire le répertoire: {path}") + return + + # Non-vide + try: + items = list(path.iterdir()) + if not items: + result.add_error("Le répertoire est vide") + return + except PermissionError: + result.add_error(f"Permission refusée lors de l'accès à: {path}") + return + + logger.debug("✅ Vérifications de base réussies") + + def _check_permissions(self, path: Path, result: DirectoryCompatibilityResult): + """Vérifie les permissions du répertoire""" + permissions = {} + + # Lecture + permissions['read'] = os.access(path, os.R_OK) + + # Écriture + permissions['write'] = os.access(path, os.W_OK) + + # Exécution (navigation) + permissions['execute'] = os.access(path, os.X_OK) + + # Propriétaire + try: + stat_info = path.stat() + permissions['is_owner'] = (stat_info.st_uid == os.getuid()) + except: + permissions['is_owner'] = False + + result.permissions = permissions + + # Ajout des avertissements si nécessaire + if not permissions['write']: + result.add_warning("Le répertoire n'est pas accessible en écriture (renommage impossible)") + + if not permissions['execute']: + result.add_error("Impossible de naviguer dans le répertoire") + result.is_compatible = False + + logger.debug(f"Permissions: {permissions}") + + def _check_disk_space(self, path: Path, result: DirectoryCompatibilityResult): + """Vérifie l'espace disque disponible""" + try: + stat_info = shutil.disk_usage(path) + + total = stat_info.total + free = stat_info.free + used = stat_info.used + + result.disk_space = { + 'total': total, + 'free': free, + 'used': used, + 'free_percent': round((free / total) * 100, 2) + } + + # Avertissement si moins de 10% d'espace libre + if result.disk_space['free_percent'] < 10: + result.add_warning(f"Moins de 10% d'espace disque disponible ({result.disk_space['free_percent']:.1f}%)") + + # Erreur si moins de 1 Go + if free < 1024 * 1024 * 1024: + result.add_warning("Moins de 1 Go d'espace disque disponible") + + except Exception as e: + result.add_warning(f"Impossible de vérifier l'espace disque: {e}") + + logger.debug(f"Espace disque: {result.disk_space}") + + def _analyze_media_structure(self, path: Path, result: DirectoryCompatibilityResult) -> bool: + """ + Analyse la structure des médias dans le répertoire + + Returns: + bool: True si la structure est valide + """ + try: + # Scan des fichiers multimédia + media_files = self.file_scanner.scan_directory(path, self.video_extensions) + + if not media_files: + result.add_error("Aucun fichier multimédia trouvé") + return False + + result.total_episodes = len(media_files) + + # Analyse de la structure des répertoires + series_found = self._detect_series_structure(path, media_files) + + if not series_found: + result.add_error("Structure de séries non détectée (doit contenir des sous-répertoires par série)") + return False + + result.found_series = list(series_found.keys()) + + # Calcul de la taille totale + total_size = 0 + for file_info in media_files: + total_size += file_info['size'] + + result.total_size = total_size + + # Validation de la qualité des fichiers + self._validate_media_files(media_files, result) + + logger.debug(f"Séries trouvées: {len(result.found_series)}, Épisodes: {result.total_episodes}") + return True + + except Exception as e: + result.add_error(f"Erreur lors de l'analyse des médias: {e}") + return False + + def _detect_series_structure(self, path: Path, media_files: List[Dict]) -> Dict[str, List[Dict]]: + """ + Détecte la structure des séries + + Returns: + Dict: Nom de série -> Liste de fichiers + """ + series_structure = {} + + # Analyse de la structure de répertoires + for item in path.iterdir(): + if item.is_dir(): + # Vérifier si le répertoire contient des vidéos + series_videos = self.file_scanner.scan_directory(item, self.video_extensions) + if series_videos: + series_name = item.name + series_structure[series_name] = series_videos + + # Si pas de sous-répertoires, essayer de grouper par nom de fichier + if not series_structure: + series_structure = self._group_flat_files(media_files) + + return series_structure + + def _group_flat_files(self, media_files: List[Dict]) -> Dict[str, List[Dict]]: + """Groupe les fichiers plats par nom de série""" + import re + + series_structure = {} + + for file_info in media_files: + filename = file_info['path'].stem + + # Tentative d'extraction du nom de série + # Format attendu: [SeriesName] S01E01 ou SeriesName S01E01 + pattern = r'^\[?([^\[\]]+)\]?\s*[_-]?\s*S\d{1,2}E\d{1,2}' + match = re.search(pattern, filename, re.IGNORECASE) + + if match: + series_name = match.group(1).strip() + else: + # Si pas de format SxxEyy, utiliser le début du nom + pattern = r'^([^-_]+)' + match = re.search(pattern, filename) + series_name = match.group(1).strip() if match else filename + + if series_name not in series_structure: + series_structure[series_name] = [] + + series_structure[series_name].append(file_info) + + return series_structure + + def _validate_media_files(self, media_files: List[Dict], result: DirectoryCompatibilityResult): + """Valide la qualité des fichiers multimédia""" + invalid_files = [] + + for file_info in media_files: + file_path = file_info['path'] + file_size = file_info['size'] + + # Vérification de la taille + if file_size < self.min_video_size: + invalid_files.append(f"{file_path.name} (trop petit: {file_size / 1024 / 1024:.1f} Mo)") + continue + + if file_size > self.max_video_size: + result.add_warning(f"{file_path.name} (taille inhabituelle: {file_size / 1024 / 1024:.1f} Mo)") + + # Vérification de l'extention + if file_path.suffix.lower() not in self.video_extensions: + invalid_files.append(f"{file_path.name} (extension non supportée)") + + if invalid_files: + result.add_error(f"Fichiers invalides détectés: {', '.join(invalid_files[:5])}") + if len(invalid_files) > 5: + result.add_error(f"...et {len(invalid_files) - 5} autres fichiers") + + def _finalize_compatibility(self, result: DirectoryCompatibilityResult): + """Finalise la décision de compatibilité""" + + # Si on a des erreurs, c'est incompatible + if result.errors: + result.is_compatible = False + return + + # Vérifications minimales + if len(result.found_series) == 0: + result.add_error("Aucune série détectée") + result.is_compatible = False + return + + if result.total_episodes == 0: + result.add_error("Aucun épisode détecté") + result.is_compatible = False + return + + # Si on peut écrire, c'est compatible + if result.permissions.get('write', False): + result.is_compatible = True + else: + # On peut quand même utiliser en lecture seule + result.is_compatible = True + result.add_warning("Mode lecture seule (renommage impossible)") + + def get_compatibility_recommendations(self, result: DirectoryCompatibilityResult) -> List[str]: + """Retourne des recommandations pour améliorer la compatibilité""" + recommendations = [] + + if result.errors: + recommendations.append("Corrigez les erreurs avant de continuer") + + if not result.permissions.get('write', False): + recommendations.append("Assurez-vous d'avoir les droits d'écriture pour le renommage") + + if result.disk_space.get('free_percent', 100) < 10: + recommendations.append("Libérez de l'espace disque avant de commencer") + + if len(result.found_series) == 0: + recommendations.append("Organisez vos fichiers en sous-répertoires par série") + recommendations.append("Exemple: /Series/NomDeLaSaison/S01E01.mkv") + + if result.total_episodes > 0 and result.total_size == 0: + recommendations.append("Certains fichiers semblent corrompus ou vides") + + return recommendations \ No newline at end of file diff --git a/src/core/file_renamer.py b/src/core/file_renamer.py new file mode 100644 index 0000000..2efa201 --- /dev/null +++ b/src/core/file_renamer.py @@ -0,0 +1,475 @@ +""" +Module de renommage de fichiers - Projet de la Légion de Muyue +""" + +import os +import shutil +from pathlib import Path +from typing import Dict, Any, List, Optional, Tuple +import logging + +from ..models.episode import Episode, Series +from ..api.thetvdb_client import TheTVDBClient + +logger = logging.getLogger(__name__) + + +class FileRenamer: + """Gère le renommage des fichiers selon les normes TVDB""" + + def __init__(self, config: Dict[str, Any] = None): + """ + Initialise le module de renommage + + Args: + config: Configuration du renommage + """ + self.config = config or {} + + # Options de renommage + self.dry_run = self.config.get("dry_run", False) + self.backup_original = self.config.get("backup_original", True) + self.include_absolute_number = self.config.get("include_absolute_number", True) + self.include_episode_title = self.config.get("include_episode_title", True) + self.include_technical_info = self.config.get("include_technical_info", True) + + # Client TVDB pour les titres + self.tvdb_client = TheTVDBClient( + api_key=self.config.get("thetvdb_api_key"), + language=self.config.get("language", "fra") + ) + + # Cache pour les titres d'épisodes + self.title_cache = {} + + # Statistiques + self.stats = { + "renamed": 0, + "skipped": 0, + "errors": 0, + "backups_created": 0 + } + + def prepare_rename_plan(self, series: Series) -> List[Dict[str, Any]]: + """ + Prépare un plan de renommage pour une série + + Args: + series: Série à traiter + + Returns: + Liste des opérations de renommage planifiées + """ + logger.info(f"Préparation du plan de renommage pour: {series.name}") + + # Authentification TVDB si possible + self.tvdb_client.login() + + # Recherche de la série sur TVDB + tvdb_series = None + if series.tvdb_id: + tvdb_series = self.tvdb_client.get_series_by_id(series.tvdb_id) + else: + tvdb_series = self.tvdb_client.search_best_match(series.name) + if tvdb_series: + series.tvdb_id = tvdb_series.get("id") + + # Construction de la map d'épisodes TVDB si disponible + episode_map = {} + if tvdb_series and series.tvdb_id: + episode_map = self.tvdb_client.build_episode_map(series.tvdb_id) + + # Préparation du plan de renommage + rename_plan = [] + + for episode in series.episodes: + # Titre de l'épisode + episode_title = self._get_episode_title(episode, tvdb_series, episode_map) + + # Génération du nouveau nom + new_name = self._generate_new_name(episode, series.name, episode_title) + + # Vérification si le renommage est nécessaire + if new_name != episode.filename: + rename_plan.append({ + "episode": episode, + "old_path": episode.file_path, + "new_name": new_name, + "new_path": episode.file_path.parent / new_name, + "reason": self._get_rename_reason(episode, series.name, episode_title), + "tvdb_matched": bool(episode_title and episode_title != f"Episode {episode.episode:02d}") + }) + + logger.info(f"Plan de renommage: {len(rename_plan)} fichiers à renommer") + return rename_plan + + def execute_rename_plan(self, rename_plan: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """ + Exécute un plan de renommage + + Args: + rename_plan: Plan de renommage à exécuter + + Returns: + Résultats des opérations de renommage + """ + logger.info(f"Exécution du plan de renommage: {len(rename_plan)} opérations") + + results = [] + + for operation in rename_plan: + result = self._rename_file(operation) + results.append(result) + + if result["success"]: + self.stats["renamed"] += 1 + if result["backup_created"]: + self.stats["backups_created"] += 1 + else: + self.stats["errors"] += 1 + + logger.info(f"Plan exécuté: {self.stats['renamed']} renommés, {self.stats['errors']} erreurs") + return results + + def _rename_file(self, operation: Dict[str, Any]) -> Dict[str, Any]: + """ + Renomme un fichier spécifique + + Args: + operation: Détails de l'opération de renommage + + Returns: + Résultat de l'opération + """ + episode = operation["episode"] + old_path = operation["old_path"] + new_path = operation["new_path"] + + result = { + "episode": episode, + "old_path": old_path, + "new_path": new_path, + "success": False, + "error": None, + "backup_created": False, + "dry_run": self.dry_run + } + + try: + # Vérifications préalables + if not old_path.exists(): + result["error"] = "Le fichier source n'existe plus" + return result + + if new_path.exists(): + result["error"] = "Le fichier de destination existe déjà" + return result + + if not old_path.parent.exists(): + result["error"] = "Le répertoire source n'existe plus" + return result + + # Vérification des permissions + if not os.access(old_path, os.R_OK): + result["error"] = "Permission de lecture refusée" + return result + + if not os.access(old_path.parent, os.W_OK): + result["error"] = "Permission d'écriture refusée" + return result + + # Mode dry-run: juste simulation + if self.dry_run: + result["success"] = True + result["message"] = f"[DRY-RUN] {old_path.name} -> {new_path.name}" + logger.info(f"[DRY-RUN] Renommage simulé: {old_path.name} -> {new_path.name}") + return result + + # Création de la sauvegarde si demandé + backup_path = None + if self.backup_original: + backup_path = self._create_backup(old_path) + result["backup_created"] = backup_path is not None + + # Renommage effectif + try: + old_path.rename(new_path) + result["success"] = True + result["message"] = f"Renommé: {old_path.name} -> {new_path.name}" + logger.info(f"Fichier renommé: {old_path.name} -> {new_path.name}") + + # Mise à jour du chemin dans l'épisode + episode.file_path = new_path + + except Exception as rename_error: + # Restauration de la sauvegarde si disponible + if backup_path and backup_path.exists(): + try: + shutil.copy2(backup_path, old_path) + logger.info(f"Sauvegarde restaurée suite à l'erreur: {old_path}") + except: + pass + + result["error"] = f"Erreur lors du renommage: {rename_error}" + logger.error(f"Erreur renommage {old_path}: {rename_error}") + + except Exception as e: + result["error"] = f"Erreur lors de la préparation du renommage: {e}" + logger.error(f"Erreur préparation renommage {old_path}: {e}") + + return result + + def _create_backup(self, file_path: Path) -> Optional[Path]: + """ + Crée une sauvegarde du fichier + + Args: + file_path: Chemin du fichier à sauvegarder + + Returns: + Chemin de la sauvegarde ou None si erreur + """ + try: + # Nom de la sauvegarde avec timestamp + import datetime + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + backup_name = f"{file_path.stem}_backup_{timestamp}{file_path.suffix}" + backup_path = file_path.parent / ".backups" / backup_name + + # Création du répertoire de sauvegarde + backup_path.parent.mkdir(exist_ok=True) + + # Copie du fichier + shutil.copy2(file_path, backup_path) + logger.debug(f"Sauvegarde créée: {backup_path}") + + return backup_path + + except Exception as e: + logger.error(f"Erreur création sauvegarde {file_path}: {e}") + return None + + def _get_episode_title(self, episode: Episode, tvdb_series: Dict[str, Any], + episode_map: Dict[Tuple[int, int], Dict[str, Any]]) -> str: + """ + Récupère le titre d'un épisode depuis TVDB + + Args: + episode: Épisode concerné + tvdb_series: Série TVDB + episode_map: Map des épisodes TVDB + + Returns: + Titre de l'épisode + """ + # Vérification du cache + cache_key = (episode.series_name, episode.season, episode.episode) + if cache_key in self.title_cache: + return self.title_cache[cache_key] + + # Récupération depuis TVDB + title = None + if tvdb_series and episode_map: + episode_key = (episode.season, episode.episode) + if episode_key in episode_map: + title = episode_map[episode_key].get("episodeName") + + # Fallback: utilisation du titre existant ou titre générique + if not title: + title = episode.title if episode.title else f"Episode {episode.episode:02d}" + + # Mise en cache + self.title_cache[cache_key] = title + return title + + def _generate_new_name(self, episode: Episode, series_name: str, episode_title: str) -> str: + """ + Génère le nouveau nom de fichier pour un épisode + + Args: + episode: Épisode à renommer + series_name: Nom de la série + episode_title: Titre de l'épisode + + Returns: + Nouveau nom de fichier + """ + # Nettoyage du nom de série + clean_series = self._clean_filename(series_name) + + # Construction du nom de base + if episode.special: + base_name = f"{clean_series} - S00E{episode.episode:02d}" + else: + base_name = f"{clean_series} - S{episode.season:02d}E{episode.episode:02d}" + + # Numéro absolu si disponible + if self.include_absolute_number and episode.absolute_number: + base_name += f" ({episode.absolute_number})" + + # Titre de l'épisode + if self.include_episode_title and episode_title: + clean_title = self._clean_filename(episode_title) + base_name += f" - {clean_title}" + + # Informations techniques + if self.include_technical_info: + technical_info = self._build_technical_info(episode) + if technical_info: + base_name += f" [{technical_info}]" + + return f"{base_name}{episode.file_path.suffix}" + + def _clean_filename(self, name: str) -> str: + """Nettoie une chaîne pour l'utiliser dans un nom de fichier""" + if not name: + return "" + + # Suppression des caractères invalides + invalid_chars = '<>:"/\\|?*' + for char in invalid_chars: + name = name.replace(char, '') + + # Remplacement des caractères problématiques + name = name.replace('\n', ' ').replace('\r', ' ') + + # Normalisation des espaces + while ' ' in name: + name = name.replace(' ', ' ') + + # Nettoyage au début et à la fin + name = name.strip(' -._') + + return name + + def _build_technical_info(self, episode: Episode) -> str: + """Construit la chaîne d'informations techniques""" + parts = [] + + # Résolution + if episode.resolution: + parts.append(episode.resolution) + + # Codec + if episode.codec: + parts.append(episode.codec) + + # Source (extrait du nom si disponible) + source = self._extract_source_from_filename(episode.filename) + if source: + parts.append(source) + + # Checksum si disponible + if episode.checksum and len(episode.checksum) >= 8: + parts.append(episode.checksum[:8]) + + return ' '.join(parts) + + def _extract_source_from_filename(self, filename: str) -> Optional[str]: + """Extrait la source du nom de fichier""" + import re + + patterns = [ + r'(BD|Blu-?Ray|BluRay)', + r'(WEB[-\s]?DL|WEBRip)', + r'(HDTV|TV)', + r'(DVD)' + ] + + for pattern in patterns: + match = re.search(pattern, filename, re.IGNORECASE) + if match: + return match.group(1).upper() + + return None + + def _get_rename_reason(self, episode: Episode, series_name: str, episode_title: str) -> str: + """Génère la raison du renommage pour les logs""" + reasons = [] + + # Vérification du format SxxEyy + if not self._has_standard_episode_format(episode.filename): + reasons.append("Format SxxEyy manquant") + + # Vérification du nom de série + if not self._has_correct_series_name(episode.filename, series_name): + reasons.append("Nom de série incorrect") + + # Vérification du titre d'épisode + if self.include_episode_title and episode_title: + if not self._has_episode_title(episode.filename): + reasons.append("Titre d'épisode manquant") + + # Vérification des informations techniques + if self.include_technical_info and not self._has_technical_info(episode.filename): + reasons.append("Informations techniques manquantes") + + # Vérification de la propreté du nom + if self._has_invalid_characters(episode.filename): + reasons.append("Caractères invalides détectés") + + return "; ".join(reasons) if reasons else "Formatage selon les standards TVDB" + + def _has_standard_episode_format(self, filename: str) -> bool: + """Vérifie si le nom de fichier utilise le format SxxEyy standard""" + import re + return bool(re.search(r'S\d{1,2}E\d{1,2}', filename, re.IGNORECASE)) + + def _has_correct_series_name(self, filename: str, series_name: str) -> bool: + """Vérifie si le nom de série est correct""" + import re + clean_series = self._clean_filename(series_name).lower() + clean_filename = filename.lower() + + return clean_series in clean_filename + + def _has_episode_title(self, filename: str) -> bool: + """Vérifie si le nom de fichier contient un titre d'épisode""" + # Heuristique: recherche de patterns typiques de titres + import re + + # Patterns qui suggèrent un titre + patterns = [ + r'S\d{1,2}E\d{1,2}\s*[-_]\s*[^.\[\]()]+', # S01E01 - Title + r'S\d{1,2}E\d{1,2}\s*([^.\[\]()]+)', # S01E01 Title + ] + + for pattern in patterns: + if re.search(pattern, filename, re.IGNORECASE): + return True + + return False + + def _has_technical_info(self, filename: str) -> bool: + """Vérifie si le nom de fichier contient des informations techniques""" + import re + + tech_patterns = [ + r'\d{3,4}p', # 1080p, 720p + r'(H\.?264|H\.?265|x264|x265)', # Codecs + r'(BD|WEB|DVD|HDTV)' # Sources + ] + + for pattern in tech_patterns: + if re.search(pattern, filename, re.IGNORECASE): + return True + + return False + + def _has_invalid_characters(self, filename: str) -> bool: + """Vérifie la présence de caractères invalides""" + invalid_chars = '<>:"|?*' + return any(char in filename for char in invalid_chars) + + def get_stats(self) -> Dict[str, Any]: + """Retourne les statistiques de renommage""" + return self.stats.copy() + + def reset_stats(self): + """Réinitialise les statistiques""" + self.stats = { + "renamed": 0, + "skipped": 0, + "errors": 0, + "backups_created": 0 + } \ No newline at end of file diff --git a/src/core/file_scanner.py b/src/core/file_scanner.py new file mode 100644 index 0000000..9d6a9ed --- /dev/null +++ b/src/core/file_scanner.py @@ -0,0 +1,239 @@ +""" +Scanneur de fichiers pour AnimeLibrarian +""" + +import os +import hashlib +from pathlib import Path +from typing import List, Dict, Any, Set, Optional +import logging + +logger = logging.getLogger(__name__) + + +class FileScanner: + """Scanne les fichiers multimédia dans les répertoires""" + + def __init__(self, config: Dict[str, Any] = None): + self.config = config or {} + + # Filtres de fichiers et répertoires à ignorer + self.ignore_patterns = { + # Répertoires + '.git', '.svn', '.hg', '.bzr', '__pycache__', 'node_modules', '.vscode', + '.idea', '.DS_Store', 'Thumbs.db', 'Desktop.ini', 'System Volume Information', + '$RECYCLE.BIN', 'Recycled', 'Temp', 'tmp', 'cache', 'Cache', + + # Extensions de fichiers à ignorer + '.txt', '.log', '.nfo', '.sfv', '.md5', '.sha1', '.dat', '.db', + '.ini', '.conf', '.cfg', '.xml', '.json', '.yml', '.yaml', + '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.ico', + '.mp3', '.wav', '.flac', '.ogg', '.m4a', '.aac', + '.srt', '.ass', '.ssa', '.sub', '.idx', '.vtt', + '.zip', '.rar', '.7z', '.tar', '.gz', '.bz2', '.exe', '.dmg', + + # Fichiers de samples + 'sample', 'SAMPLE', 'Sample' + } + + def scan_directory(self, directory: Path, extensions: Set[str], + recursive: bool = True, max_depth: int = 10) -> List[Dict[str, Any]]: + """ + Scanne un répertoire à la recherche de fichiers avec les extensions spécifiées + + Args: + directory: Répertoire à scanner + extensions: Set d'extensions de fichiers à rechercher + recursive: Si True, scan récursivement les sous-répertoires + max_depth: Profondeur maximale de scan récursif + + Returns: + Liste de dictionnaires avec informations sur chaque fichier + """ + logger.debug(f"Scan du répertoire: {directory} (extensions: {extensions})") + + if not directory.exists() or not directory.is_dir(): + logger.warning(f"Répertoire invalide: {directory}") + return [] + + media_files = [] + + try: + if recursive: + for depth, root, dirs, files in self._walk_directory(directory, max_depth): + # Filtrage des répertoires à ignorer + dirs[:] = [d for d in dirs if not self._should_ignore(d, is_directory=True)] + + # Analyse des fichiers + for filename in files: + if self._should_ignore(filename): + continue + + file_path = Path(root) / filename + if file_path.suffix.lower() in extensions: + file_info = self._analyze_file(file_path) + if file_info: + media_files.append(file_info) + else: + for item in directory.iterdir(): + if item.is_file() and not self._should_ignore(item.name): + if item.suffix.lower() in extensions: + file_info = self._analyze_file(item) + if file_info: + media_files.append(file_info) + + except PermissionError: + logger.error(f"Permission refusée lors du scan de: {directory}") + except Exception as e: + logger.error(f"Erreur lors du scan de {directory}: {e}") + + logger.debug(f"Trouvé {len(media_files)} fichiers multimédia") + return media_files + + def _walk_directory(self, directory: Path, max_depth: int): + """Générateur pour parcourir les répertoires avec contrôle de profondeur""" + directory = directory.resolve() + for root, dirs, files in os.walk(directory): + depth = Path(root).relative_to(directory).parts + if len(depth) > max_depth: + dirs[:] = [] # Ne pas explorer plus profondément + continue + + yield (len(depth), root, dirs, files) + + def _should_ignore(self, name: str, is_directory: bool = False) -> bool: + """Détermine si un fichier/répertoire doit être ignoré""" + + # Ignorer les fichiers cachés + if name.startswith('.') and not name.startswith('._'): + return True + + # Ignorer les fichiers système + if name.lower() in {'system volume information', '$recycle.bin', 'recycled'}: + return True + + # Vérification des patterns d'ignore + for pattern in self.ignore_patterns: + if pattern.lower() in name.lower(): + return True + + return False + + def _analyze_file(self, file_path: Path) -> Optional[Dict[str, Any]]: + """Analyse un fichier et retourne ses métadonnées""" + try: + stat_info = file_path.stat() + + file_info = { + 'path': file_path, + 'name': file_path.name, + 'stem': file_path.stem, + 'suffix': file_path.suffix.lower(), + 'size': stat_info.st_size, + 'created': stat_info.st_ctime, + 'modified': stat_info.st_mtime, + 'is_readable': os.access(file_path, os.R_OK), + 'is_writable': os.access(file_path, os.W_OK) + } + + # Calcul du checksum si nécessaire (optionnel pour les gros fichiers) + if stat_info.st_size < 1024 * 1024 * 100: # < 100 Mo + try: + file_info['checksum'] = self._calculate_checksum(file_path) + except: + file_info['checksum'] = None + + return file_info + + except Exception as e: + logger.error(f"Erreur lors de l'analyse du fichier {file_path}: {e}") + return None + + def _calculate_checksum(self, file_path: Path, algorithm: str = 'md5') -> str: + """Calcule le checksum d'un fichier""" + hash_func = hashlib.new(algorithm) + + with open(file_path, 'rb') as f: + for chunk in iter(lambda: f.read(8192), b''): + hash_func.update(chunk) + + return hash_func.hexdigest() + + def get_duplicates(self, files: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]: + """Identifie les fichiers en double basés sur le nom et la taille""" + + # Groupement par nom et taille + groups = {} + + for file_info in files: + key = (file_info['name'], file_info['size']) + if key not in groups: + groups[key] = [] + groups[key].append(file_info) + + # Filtrage pour ne garder que les groupes avec doublons + duplicates = [group for group in groups.values() if len(group) > 1] + + return duplicates + + def get_checksum_duplicates(self, files: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]: + """Identifie les fichiers en double basés sur le checksum""" + + # S'assurer que tous les fichiers ont un checksum + for file_info in files: + if 'checksum' not in file_info or not file_info['checksum']: + file_info['checksum'] = self._calculate_checksum(file_info['path']) + + # Groupement par checksum + groups = {} + for file_info in files: + checksum = file_info['checksum'] + if checksum: + if checksum not in groups: + groups[checksum] = [] + groups[checksum].append(file_info) + + # Filtrage pour ne garder que les groupes avec doublons + duplicates = [group for group in groups.values() if len(group) > 1] + + return duplicates + + def get_file_stats(self, files: List[Dict[str, Any]]) -> Dict[str, Any]: + """Retourne des statistiques sur les fichiers""" + if not files: + return { + 'total_files': 0, + 'total_size': 0, + 'avg_size': 0, + 'max_size': 0, + 'min_size': 0, + 'extensions': {}, + 'total_readable': 0, + 'total_writable': 0 + } + + total_size = sum(f['size'] for f in files) + avg_size = total_size // len(files) + max_size = max(f['size'] for f in files) + min_size = min(f['size'] for f in files) + + # Compte par extension + extensions = {} + for file_info in files: + ext = file_info['suffix'] + extensions[ext] = extensions.get(ext, 0) + 1 + + # Permissions + readable = sum(1 for f in files if f['is_readable']) + writable = sum(1 for f in files if f['is_writable']) + + return { + 'total_files': len(files), + 'total_size': total_size, + 'avg_size': avg_size, + 'max_size': max_size, + 'min_size': min_size, + 'extensions': extensions, + 'total_readable': readable, + 'total_writable': writable + } \ No newline at end of file diff --git a/src/core/media_detector.py b/src/core/media_detector.py new file mode 100644 index 0000000..57c31f9 --- /dev/null +++ b/src/core/media_detector.py @@ -0,0 +1,352 @@ +""" +Détecteur de médias et analyseur de fichiers multimédia +""" + +import os +import re +import subprocess +from pathlib import Path +from typing import List, Dict, Any, Optional, Tuple +import logging + +logger = logging.getLogger(__name__) + + +class MediaDetector: + """Détecte et analyse les fichiers multimédia""" + + def __init__(self, config: Dict[str, Any] = None): + self.config = config or {} + self.ffprobe_path = self._find_ffprobe() + + # Patterns pour détecter les numéros d'épisodes + self.episode_patterns = [ + # S01E01, S1E1 + r'(?:S(\d{1,2})\s*[-_.]?\s*)?E(\d{1,2})', + # 01, Episode 01, Ep 01 + r'(?:Episode|Ep)\s*(\d{1,3})', + # Numbers in brackets: [01], (01) + r'[\[({](\d{1,3})[\])}]', + # Standalone numbers at end + r'(\d{1,3})(?:\.\w+)?$', + # Pattern for absolute numbering: 001-100 + r'(\d{3})-(?:\d{3})' + ] + + # Patterns pour détecter les informations techniques + self.resolution_patterns = [ + r'(\d{3,4}p)', # 1080p, 720p, 480p + r'(4K|UHD)', # 4K, UHD + r'(HD|SD)', # HD, SD + ] + + self.codec_patterns = [ + r'(H\.?264|H\.?265|AVC|HEVC|x264|x265)', + r'(XVID|DIVX)', + r'(VP9|AV1)', + ] + + self.source_patterns = [ + r'(BD|Blu-?Ray)', + r'(WEB|WEB-?DL|WEBRip)', + r'(DVD)', + r'(HDTV|TV)', + ] + + def _find_ffprobe(self) -> Optional[str]: + """Cherche le chemin de ffprobe""" + try: + result = subprocess.run( + ['ffprobe', '-version'], + capture_output=True, + text=True, + timeout=5 + ) + if result.returncode == 0: + return 'ffprobe' + except: + pass + + # Chemins courants + common_paths = [ + '/usr/bin/ffprobe', + '/usr/local/bin/ffprobe', + '/opt/homebrew/bin/ffprobe', + ] + + for path in common_paths: + if os.path.exists(path): + return path + + return None + + def analyze_media_file(self, file_path: Path) -> Dict[str, Any]: + """ + Analyse un fichier multimédia et extrait ses métadonnées + + Returns: + Dict contenant les métadonnées du fichier + """ + logger.debug(f"Analyse du fichier: {file_path}") + + metadata = { + 'file_path': file_path, + 'duration': None, + 'resolution': None, + 'codec': None, + 'bitrate': None, + 'audio_tracks': 0, + 'subtitle_tracks': 0, + 'file_size': 0, + 'is_valid': False + } + + # Taille du fichier + try: + metadata['file_size'] = file_path.stat().st_size + except: + pass + + # Analyse avec ffprobe si disponible + if self.ffprobe_path: + metadata.update(self._analyze_with_ffprobe(file_path)) + + # Analyse basée sur le nom de fichier + filename_info = self._analyze_filename(file_path.name) + metadata.update(filename_info) + + # Validation du fichier + metadata['is_valid'] = self._validate_file(metadata) + + return metadata + + def _analyze_with_ffprobe(self, file_path: Path) -> Dict[str, Any]: + """Analyse un fichier avec ffprobe""" + metadata = {} + + try: + cmd = [ + self.ffprobe_path, + '-v', 'quiet', + '-print_format', 'json', + '-show_format', + '-show_streams', + str(file_path) + ] + + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=30 + ) + + if result.returncode == 0: + import json + probe_data = json.loads(result.stdout) + + # Informations du format + format_info = probe_data.get('format', {}) + metadata['duration'] = float(format_info.get('duration', 0)) + metadata['bitrate'] = int(format_info.get('bit_rate', 0)) + + # Analyse des streams + video_streams = [] + audio_streams = [] + subtitle_streams = [] + + for stream in probe_data.get('streams', []): + if stream.get('codec_type') == 'video': + video_streams.append(stream) + elif stream.get('codec_type') == 'audio': + audio_streams.append(stream) + elif stream.get('codec_type') == 'subtitle': + subtitle_streams.append(stream) + + metadata['audio_tracks'] = len(audio_streams) + metadata['subtitle_tracks'] = len(subtitle_streams) + + # Informations vidéo + if video_streams: + video = video_streams[0] + metadata['codec'] = video.get('codec_name', '').upper() + + width = video.get('width') + height = video.get('height') + if width and height: + metadata['resolution'] = f"{height}p" + + # Détection du 4K + if height >= 2160: + metadata['resolution'] = "4K" + + except subprocess.TimeoutExpired: + logger.warning(f"Timeout lors de l'analyse de {file_path}") + except Exception as e: + logger.warning(f"Erreur ffprobe pour {file_path}: {e}") + + return metadata + + def _analyze_filename(self, filename: str) -> Dict[str, Any]: + """Extrait les informations du nom de fichier""" + metadata = {} + + # Nettoyage du nom de fichier (sans l'extension) + clean_name = Path(filename).stem.lower() + + # Recherche des informations techniques + for pattern in self.resolution_patterns: + match = re.search(pattern, clean_name, re.IGNORECASE) + if match and 'resolution' not in metadata: + metadata['resolution'] = match.group(1).upper() + break + + for pattern in self.codec_patterns: + match = re.search(pattern, clean_name, re.IGNORECASE) + if match and 'codec' not in metadata: + metadata['codec'] = match.group(1).upper() + break + + # Détection de la source + for pattern in self.source_patterns: + match = re.search(pattern, clean_name, re.IGNORECASE) + if match and 'source' not in metadata: + metadata['source'] = match.group(1).upper() + break + + return metadata + + def _validate_file(self, metadata: Dict[str, Any]) -> bool: + """Valide si le fichier semble être un épisode valide""" + + # Taille minimale (50 Mo) + if metadata.get('file_size', 0) < 50 * 1024 * 1024: + return False + + # Durée minimale (5 minutes) + if metadata.get('duration', 0) and metadata['duration'] < 300: + return False + + # Codec vidéo + valid_codecs = {'H264', 'H265', 'AVC', 'HEVC', 'X264', 'X265', 'VP9', 'AV1'} + codec = metadata.get('codec', '').upper() + if codec and codec not in valid_codecs: + return False + + return True + + def extract_episode_info(self, filename: str) -> Dict[str, Any]: + """ + Extrait les informations d'épisode du nom de fichier + + Returns: + Dict avec season, episode, title, etc. + """ + episode_info = { + 'season': 1, # Par défaut + 'episode': None, + 'title': None, + 'special': False, + 'absolute_number': None + } + + clean_name = Path(filename).stem + + # Tentatives avec différents patterns + for pattern in self.episode_patterns: + match = re.search(pattern, clean_name, re.IGNORECASE) + if match: + groups = match.groups() + + # S01E01 format + if len(groups) >= 2 and groups[0] and groups[1]: + episode_info['season'] = int(groups[0]) + episode_info['episode'] = int(groups[1]) + # E01 or Episode 01 format + elif len(groups) >= 1 and groups[0]: + episode_num = int(groups[0]) + + # Si c'est un grand nombre, c'est probablement un épisode spécial + if episode_num > 100: + episode_info['episode'] = episode_num + episode_info['special'] = True + else: + episode_info['episode'] = episode_num + + break + + # Extraction du titre si possible + episode_info['title'] = self._extract_title(clean_name, episode_info) + + return episode_info + + def _extract_title(self, filename: str, episode_info: Dict[str, Any]) -> Optional[str]: + """Extrait le titre de l'épisode du nom de fichier""" + + # Suppression des numéros d'épisode et patterns techniques + title_part = filename + + # Suppression du pattern SxxEyy + title_part = re.sub(r'S\d{1,2}\s*[-_.]?\s*E\d{1,2}', '', title_part, flags=re.IGNORECASE) + + # Suppression du pattern Episode xx + title_part = re.sub(r'(?:Episode|Ep)\s*\d{1,3}', '', title_part, flags=re.IGNORECASE) + + # Suppression des patterns techniques + for pattern in self.resolution_patterns + self.codec_patterns + self.source_patterns: + title_part = re.sub(pattern, '', title_part, flags=re.IGNORECASE) + + # Suppression des crochets et parenthèses vides + title_part = re.sub(r'[\[\](){}]', '', title_part) + + # Nettoyage final + title_part = re.sub(r'^[-_\s]+|[-_\s]+$', '', title_part) # Suppression au début/fin + title_part = re.sub(r'[-_\s]{2,}', ' ', title_part) # Remplacement multiples par un seul + + return title_part.strip() if title_part.strip() else None + + def get_series_name_from_filename(self, filename: str) -> str: + """Extrait le nom de la série du nom de fichier""" + + clean_name = Path(filename).stem + + # Pattern pour le nom avant le numéro d'épisode + patterns = [ + r'^\[?([^\[\]]+?)\]?\s*[-_.]?\s*S\d{1,2}E\d{1,2}', + r'^([^-_\[\]]+?)(?:\s*[-_.]\s*)?(?:S\d{1,2}E\d{1,2}|Episode\s*\d{1,3}|E\d{1,3})', + r'^([^-_()]+)(?:\s*[-_.]\s*)?\d{1,3}', + ] + + for pattern in patterns: + match = re.search(pattern, clean_name, re.IGNORECASE) + if match: + series_name = match.group(1).strip() + # Nettoyage du nom + series_name = re.sub(r'[^\w\s-]', '', series_name) + series_name = re.sub(r'\s+', ' ', series_name).strip() + return series_name + + # Fallback: retourne le début du nom + words = clean_name.split() + if len(words) >= 2: + return ' '.join(words[:2]) + + return clean_name + + def batch_analyze(self, file_paths: List[Path]) -> List[Dict[str, Any]]: + """Analyse par lots plusieurs fichiers multimédia""" + results = [] + + for file_path in file_paths: + try: + metadata = self.analyze_media_file(file_path) + results.append(metadata) + except Exception as e: + logger.error(f"Erreur lors de l'analyse de {file_path}: {e}") + results.append({ + 'file_path': file_path, + 'error': str(e), + 'is_valid': False + }) + + return results \ No newline at end of file diff --git a/src/models/__init__.py b/src/models/__init__.py new file mode 100644 index 0000000..43539fe --- /dev/null +++ b/src/models/__init__.py @@ -0,0 +1 @@ +# Package models \ No newline at end of file diff --git a/src/models/episode.py b/src/models/episode.py new file mode 100644 index 0000000..ba1f6e5 --- /dev/null +++ b/src/models/episode.py @@ -0,0 +1,175 @@ +""" +Modèle de données pour les séries et épisodes +""" + +from dataclasses import dataclass, field +from typing import List, Optional, Dict, Any +from pathlib import Path +import datetime + + +@dataclass +class Episode: + """Représente un épisode d'anime""" + file_path: Path + series_name: str + season: int = 1 + episode: int = 0 + title: Optional[str] = None + absolute_number: Optional[int] = None + special: bool = False + file_size: int = 0 + duration: Optional[float] = None + checksum: Optional[str] = None + resolution: Optional[str] = None + codec: Optional[str] = None + verified: bool = False + metadata: Dict[str, Any] = field(default_factory=dict) + + @property + def season_episode_str(self) -> str: + """Retourne la représentation SXXEYY""" + if self.special: + return f"S00E{self.episode:02d}" + return f"S{self.season:02d}E{self.episode:02d}" + + @property + def filename(self) -> str: + """Retourne le nom de fichier actuel""" + return self.file_path.name + + def get_recommended_name(self, series_name: str, include_absolute: bool = False) -> str: + """Génère le nom recommandé selon le format TVDB""" + # Nettoyage du nom de série + clean_series = self._clean_series_name(series_name) + + # Construction du nom de base + if self.special: + base_name = f"{clean_series} - S00E{self.episode:02d}" + else: + base_name = f"{clean_series} - S{self.season:02d}E{self.episode:02d}" + if include_absolute and self.absolute_number: + base_name += f" ({self.absolute_number})" + + # Ajout du titre si disponible + if self.title: + clean_title = self._clean_title(self.title) + base_name += f" - {clean_title}" + + # Ajout des informations techniques + suffix = "" + if self.resolution: + suffix += f" [{self.resolution}]" + if self.codec: + suffix += f" [{self.codec}]" + if self.checksum: + suffix += f" [{self.checksum}]" + + return f"{base_name}{suffix}{self.file_path.suffix}" + + def _clean_series_name(self, name: str) -> str: + """Nettoie le nom de série pour le formatage""" + # Suppression des caractères spéciaux et normalisation + import re + name = re.sub(r'[<>:"/\\|?*]', '', name) + name = re.sub(r'\s+', ' ', name).strip() + return name + + def _clean_title(self, title: str) -> str: + """Nettoie le titre d'épisode pour le formatage""" + import re + title = re.sub(r'[<>:"/\\|?*]', '', title) + title = re.sub(r'\s+', ' ', title).strip() + return title + + +@dataclass +class Series: + """Représente une série d'anime""" + name: str + directory: Path + episodes: List[Episode] = field(default_factory=list) + tvdb_id: Optional[int] = None + anidb_id: Optional[int] = None + total_episodes: Optional[int] = None + special_count: int = 0 + languages: List[str] = field(default_factory=list) + metadata: Dict[str, Any] = field(default_factory=dict) + last_scan: Optional[datetime.datetime] = None + + def add_episode(self, episode: Episode): + """Ajoute un épisode à la série""" + self.episodes.append(episode) + + def get_episode_by_number(self, season: int, episode: int) -> Optional[Episode]: + """Récupère un épisode par son numéro""" + for ep in self.episodes: + if ep.season == season and ep.episode == episode: + return ep + return None + + def get_specials(self) -> List[Episode]: + """Retourne la liste des épisodes spéciaux""" + return [ep for ep in self.episodes if ep.special] + + def get_regular_episodes(self) -> List[Episode]: + """Retourne la liste des épisodes réguliers""" + return [ep for ep in self.episodes if not ep.special] + + def check_completeness(self) -> Dict[str, Any]: + """Vérifie la complétude de la série""" + regular_episodes = self.get_regular_episodes() + episode_numbers = [ep.episode for ep in regular_episodes] + + result = { + 'total_found': len(regular_episodes), + 'specials_found': len(self.get_specials()), + 'missing_episodes': [], + 'duplicate_episodes': [], + 'is_complete': False + } + + if self.total_episodes: + expected = set(range(1, self.total_episodes + 1)) + found = set(episode_numbers) + result['missing_episodes'] = list(expected - found) + + # Vérification des doublons + from collections import Counter + counts = Counter(episode_numbers) + result['duplicate_episodes'] = [num for num, count in counts.items() if count > 1] + + result['is_complete'] = ( + len(result['missing_episodes']) == 0 and + len(result['duplicate_episodes']) == 0 + ) + + return result + + +@dataclass +class DirectoryCompatibilityResult: + """Résultat de la vérification de compatibilité de répertoire""" + is_compatible: bool = False + path: Path = field(default_factory=Path) + errors: List[str] = field(default_factory=list) + warnings: List[str] = field(default_factory=list) + found_series: List[str] = field(default_factory=list) + total_episodes: int = 0 + total_size: int = 0 + permissions: Dict[str, bool] = field(default_factory=dict) + disk_space: Dict[str, int] = field(default_factory=dict) + + def add_error(self, error: str): + """Ajoute une erreur""" + self.errors.append(error) + self.is_compatible = False + + def add_warning(self, warning: str): + """Ajoute un avertissement""" + self.warnings.append(warning) + + def summary(self) -> str: + """Retourne un résumé du résultat""" + status = "✅ Compatible" if self.is_compatible else "❌ Incompatible" + return f"{status} - {len(self.found_series)} séries, {self.total_episodes} épisodes" \ No newline at end of file diff --git a/src/ui/__init__.py b/src/ui/__init__.py new file mode 100644 index 0000000..aef63b8 --- /dev/null +++ b/src/ui/__init__.py @@ -0,0 +1,390 @@ +""" +Interface utilisateur en ligne de commande - Projet de la Légion de Muyue +""" + +import sys +import os +from pathlib import Path +from typing import Dict, Any, List, Optional +import logging + +from .core import AnimeLibrarianCore + +logger = logging.getLogger(__name__) + + +class AnimeLibrarianUI: + """Interface utilisateur interactive pour AnimeLibrarian""" + + def __init__(self, core: AnimeLibrarianCore): + """ + Initialise l'interface utilisateur + + Args: + core: Cœur de l'application + """ + self.core = core + self.current_series_list = [] + self.selected_series = [] + + logger.info("Interface utilisateur initialisée") + + def run(self, preselected_directory: str = None): + """ + Lance l'interface interactive + + Args: + preselected_directory: Répertoire pré-sélectionné (contourne la sélection) + """ + self._print_banner() + + try: + # Étape 1: Sélection du répertoire + directory = self._select_directory(preselected_directory) + if not directory: + return + + # Étape 2: Scan des séries + self._scan_series(directory) + if not self.current_series_list: + print("❌ Aucune série trouvée dans ce répertoire.") + return + + # Étape 3: Sélection des séries + self._select_series() + if not self.selected_series: + print("❌ Aucune série sélectionnée.") + return + + # Étape 4: Configuration des opérations + operations = self._configure_operations() + + # Étape 5: Exécution des opérations + self._execute_operations(operations) + + # Étape 6: Rapport final + self._generate_report() + + except KeyboardInterrupt: + print("\n\n⚠️ Opération annulée par l'utilisateur.") + except Exception as e: + print(f"\n❌ Erreur inattendue: {e}") + logger.error(f"Erreur UI: {e}") + + def _print_banner(self): + """Affiche la bannière de l'application""" + print("\n" + "="*60) + print("🎬 AnimeLibrarian - Organisation de collections d'anime") + print("="*60) + print("📋 Projet développé par la Légion de Muyue") + print("="*60) + print() + + def _select_directory(self, preselected_directory: str = None) -> Optional[str]: + """Sélection du répertoire contenant les séries""" + + if preselected_directory: + directory = preselected_directory + print(f"📁 Répertoire pré-sélectionné: {directory}") + else: + directory = input("📁 Entrez le chemin du répertoire contenant vos séries: ").strip() + + if not directory: + print("❌ Aucun répertoire spécifié.") + return None + + # Vérification de compatibilité + print(f"\n🔍 Vérification de la compatibilité du répertoire...") + result = self.core.check_directory_compatibility(directory) + + if not result.get("is_compatible"): + print("❌ Le répertoire n'est pas compatible:") + for error in result.get("errors", []): + print(f" • {error}") + + # Affichage des recommandations + recommendations = self.core.directory_checker.get_compatibility_recommendations( + type('Result', (), result)() + ) + if recommendations: + print("\n💡 Recommandations:") + for rec in recommendations: + print(f" • {rec}") + + return None + + print("✅ Le répertoire est compatible!") + + # Résumé du répertoire + print(f" 📊 Séries trouvées: {len(result.get('found_series', []))}") + print(f" 📺 Épisodes trouvés: {result.get('total_episodes', 0)}") + print(f" 💾 Taille totale: {self._format_size(result.get('total_size', 0))}") + + return directory + + def _scan_series(self, directory: str): + """Scan des séries dans le répertoire""" + print(f"\n🔍 Scan des séries en cours...") + + try: + self.current_series_list = self.core.scan_series(directory) + print(f"✅ {len(self.current_series_list)} séries trouvées!") + except Exception as e: + print(f"❌ Erreur lors du scan: {e}") + raise + + def _select_series(self): + """Sélection des séries à traiter""" + print("\n📋 Séries disponibles:") + print("-" * 60) + + for i, series in enumerate(self.current_series_list): + status = "✅" if series["is_complete"] else "⚠️" + episodes = series["total_episodes"] + size = self._format_size(series["total_size"]) + + print(f"{i+1:2d}. {status} {series['name']}") + print(f" 📺 {episodes} épisodes • 💾 {size}") + + # Afficher les problèmes si non complète + if not series["is_complete"]: + if series["completeness"]["missing_episodes"]: + print(f" ⚠️ Manque: {series['completeness']['missing_episodes']}") + if series["completeness"]["duplicate_episodes"]: + print(f" ⚠️ Doublons: {series['completeness']['duplicate_episodes']}") + + print("\n" + "-"*60) + + # Sélection + selection = input("🎯 Choisissez les séries à traiter (ex: 1,3,5-8, * pour tout): ").strip() + + if not selection: + self.selected_series = [] + return + + # Traitement de la sélection + indices = self._parse_selection(selection, len(self.current_series_list)) + + if not indices: + print("❌ Sélection invalide.") + self.selected_series = [] + return + + # Conversion en objets séries + self.selected_series = [ + self.current_series_list[i] for i in indices + ] + + print(f"✅ {len(self.selected_series)} série(s) sélectionnée(s):") + for series in self.selected_series: + print(f" • {series['name']}") + + def _configure_operations(self) -> Dict[str, bool]: + """Configuration des opérations à effectuer""" + print("\n⚙️ Configuration des opérations:") + print("-" * 40) + + operations = {} + + # Vérification des numéros d'épisodes + if self._confirm_operation("Vérifier les numéros d'épisodes avec trace.moe?"): + operations["verify_episodes"] = True + else: + operations["verify_episodes"] = False + + # Vérification de l'intégrité + if self._confirm_operation("Vérifier l'intégrité des fichiers?"): + operations["verify_integrity"] = True + else: + operations["verify_integrity"] = False + + # Renommage + if self._confirm_operation("Renommer les fichiers selon les standards TVDB?"): + operations["rename_files"] = True + + # Options de renommage + if self._confirm_operation(" • Créer des sauvegardes avant renommage? (recommandé)"): + operations["backup_files"] = True + else: + operations["backup_files"] = False + else: + operations["rename_files"] = False + operations["backup_files"] = False + + return operations + + def _execute_operations(self, operations: Dict[str, bool]): + """Exécute les opérations configurées""" + print("\n🚀 Exécution des opérations...") + + indices = [self.current_series_list.index(series) for series in self.selected_series] + + # 1. Vérification des épisodes + if operations.get("verify_episodes"): + print(f"\n🔍 Vérification des numéros d'épisodes...") + try: + results = self.core.verify_episodes_numbers(indices) + self._display_episode_verification_results(results) + except Exception as e: + print(f"❌ Erreur lors de la vérification des épisodes: {e}") + + # 2. Vérification d'intégrité + if operations.get("verify_integrity"): + print(f"\n🛡️ Vérification de l'intégrité des fichiers...") + try: + results = self.core.verify_files_integrity(indices) + self._display_integrity_results(results) + except Exception as e: + print(f"❌ Erreur lors de la vérification d'intégrité: {e}") + + # 3. Renommage + if operations.get("rename_files"): + print(f"\n📝 Renommage des fichiers...") + + # Configuration du renommage + self.core.file_renamer.backup_original = operations.get("backup_files", True) + + # Confirmation finale + if self._confirm_operation("⚠️ Confirmer le renommage des fichiers?"): + try: + results = self.core.rename_files(indices, dry_run=False) + self._display_rename_results(results) + except Exception as e: + print(f"❌ Erreur lors du renommage: {e}") + else: + print("❌ Renommage annulé.") + + def _display_episode_verification_results(self, results: Dict[str, Any]): + """Affiche les résultats de vérification des épisodes""" + print(f"✅ {results['series_verified']} séries vérifiées") + print(f"📺 {results['episodes_verified']} épisodes vérifiés") + + for series_result in results["verification_results"]: + series_name = series_result["series_name"] + summary = series_result["summary"] + + print(f"\n📋 {series_name}:") + print(f" ✅ Correspondances: {summary['matches']}/{summary['total']}") + print(f" 📊 Taux de réussite: {summary['match_rate']:.1%}") + + if summary['mismatches'] > 0: + print(f" ⚠️ Incohérences: {summary['mismatches']}") + + def _display_integrity_results(self, results: Dict[str, Any]): + """Affiche les résultats de vérification d'intégrité""" + print(f"📊 Fichiers vérifiés: {results['files_checked']}") + print(f"✅ Fichiers valides: {results['valid_files']}") + + if results["invalid_files"]: + print(f"❌ Fichiers invalides: {len(results['invalid_files'])}") + for file_info in results["invalid_files"][:5]: # Limite à 5 + print(f" • {file_info['filename']}: {file_info['issue']}") + + if len(results["invalid_files"]) > 5: + print(f" ...et {len(results['invalid_files']) - 5} autres fichiers") + + if results["duplicates"]: + print(f"🔄 Doublons détectés: {len(results['duplicates'])}") + for dup_group in results["duplicates"][:3]: # Limite à 3 + print(f" • {', '.join(dup_group)}") + + def _display_rename_results(self, results: Dict[str, Any]): + """Affiche les résultats du renommage""" + stats = results["stats"] + + print(f"📝 Renommage terminé:") + print(f" ✅ Fichiers renommés: {stats['renamed']}") + print(f" 💾 Sauvegardes créées: {stats['backups_created']}") + print(f" ❌ Erreurs: {stats['errors']}") + + if results["rename_plan"]: + print(f"\n📋 Plan de renommage ({len(results['rename_plan'])} fichiers):") + for result in results["rename_results"][:5]: # Limite à 5 + if result["success"]: + print(f" ✅ {result['old_path'].name} → {result['new_path'].name}") + else: + print(f" ❌ {result['old_path'].name}: {result['error']}") + + if len(results["rename_results"]) > 5: + print(f" ...et {len(results['rename_results']) - 5} autres fichiers") + + def _generate_report(self): + """Génère un rapport final""" + print("\n" + "="*60) + print("📊 RAPPORT FINAL - AnimeLibrarian") + print("="*60) + + # Statistiques globales + print(f"🎯 Séries traitées: {len(self.selected_series)}") + + total_episodes = sum(series["total_episodes"] for series in self.selected_series) + print(f"📺 Épisodes analysés: {total_episodes}") + + total_size = sum(series["total_size"] for series in self.selected_series) + print(f"💾 Taille totale: {self._format_size(total_size)}") + + # Statistiques de l'application + status = self.core.get_application_status() + print(f"🔗 TVDB configuré: {'Oui' if status['tvdb_configured'] else 'Non'}") + print(f"🔍 Trace.moe configuré: {'Oui' if status['trace_moe_configured'] else 'Non'}") + + print("\n✅ Opération terminée avec succès!") + print("Merci d'utiliser AnimeLibrarian - Projet de la Légion de Muyue") + print("="*60) + + def _confirm_operation(self, message: str) -> bool: + """Demande confirmation à l'utilisateur""" + response = input(f"{message} (o/n): ").strip().lower() + return response in ('o', 'oui', 'y', 'yes') + + def _parse_selection(self, selection: str, max_index: int) -> List[int]: + """Parse une sélection de séries (1,3,5-8, *)""" + if selection.strip() == '*': + return list(range(max_index)) + + indices = set() + + parts = selection.split(',') + for part in parts: + part = part.strip() + + if '-' in part: + # Range (5-8) + try: + start, end = part.split('-', 1) + start = int(start.strip()) + end = int(end.strip()) + + # Validation + if 1 <= start <= max_index and 1 <= end <= max_index: + indices.update(range(start - 1, end)) + except ValueError: + continue + else: + # Single number + try: + idx = int(part) + if 1 <= idx <= max_index: + indices.add(idx - 1) + except ValueError: + continue + + return sorted(indices) + + def _format_size(self, size_bytes: int) -> str: + """Formate une taille en octets en format lisible""" + if not size_bytes: + return "0 B" + + units = ["B", "KB", "MB", "GB", "TB"] + unit_index = 0 + size = float(size_bytes) + + while size >= 1024 and unit_index < len(units) - 1: + size /= 1024 + unit_index += 1 + + if unit_index == 0: + return f"{int(size)} {units[unit_index]}" + else: + return f"{size:.1f} {units[unit_index]}" \ No newline at end of file diff --git a/src/utils/__init__.py b/src/utils/__init__.py new file mode 100644 index 0000000..f7295d2 --- /dev/null +++ b/src/utils/__init__.py @@ -0,0 +1,16 @@ +""" +Utils et helpers pour AnimeLibrarian - Projet de la Légion de Muyue +""" + +from .config import load_config, save_config, get_config_template, validate_config, setup_directories, mask_sensitive_data +from .logging import setup_logging + +__all__ = [ + 'load_config', + 'save_config', + 'get_config_template', + 'validate_config', + 'setup_directories', + 'mask_sensitive_data', + 'setup_logging' +] \ No newline at end of file diff --git a/src/utils/config.py b/src/utils/config.py new file mode 100644 index 0000000..701f221 --- /dev/null +++ b/src/utils/config.py @@ -0,0 +1,294 @@ +""" +Fichier de configuration pour AnimeLibrarian - Projet de la Légion de Muyue +""" + +import json +import os +from pathlib import Path +from typing import Dict, Any, Optional +import logging + +logger = logging.getLogger(__name__) + + +DEFAULT_CONFIG = { + # Configuration générale + "language": "fra", + "log_level": "INFO", + "temp_directory": "/tmp/animelibrarian", + + # Configuration TVDB + "thetvdb_api_key": None, + "thetvdb_base_url": "https://api.thetvdb.com", + + # Configuration trace.moe + "trace_moe_api_key": None, + "trace_moe_base_url": "https://api.trace.moe", + "trace_moe_rate_limit": 1.0, + "trace_moe_timeout": 30, + "trace_moe_max_retries": 3, + + # Configuration du scan de fichiers + "video_extensions": [".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".webm", + ".m4v", ".mpg", ".mpeg", ".3gp", ".ts", ".m2ts", ".ogv"], + "max_scan_depth": 10, + "ignore_patterns": [".git", ".svn", ".hg", "__pycache__", "node_modules", + ".vscode", ".idea", "Thumbs.db", ".DS_Store"], + + # Configuration de détection multimédia + "min_video_size": 50 * 1024 * 1024, # 50 Mo + "max_video_size": 50 * 1024 * 1024 * 1024, # 50 Go + "min_video_duration": 300, # 5 minutes + + # Configuration du renommage + "backup_original": True, + "backup_directory": ".backups", + "include_absolute_number": True, + "include_episode_title": True, + "include_technical_info": True, + "dry_run": False, + + # Configuration du cache + "enable_cache": True, + "cache_directory": ".cache", + "cache_expiry_hours": 24, + + # Configuration de la compatibilité + "min_free_space_percent": 5, + "min_free_space_bytes": 1024 * 1024 * 1024, # 1 Go + + # Configuration de l'interface + "max_display_items": 20, + "confirm_operations": True +} + + +def load_config(config_path: str = "config.json") -> Dict[str, Any]: + """ + Charge la configuration depuis un fichier + + Args: + config_path: Chemin vers le fichier de configuration + + Returns: + Dictionnaire de configuration + """ + config = DEFAULT_CONFIG.copy() + + if not config_path: + return config + + config_file = Path(config_path) + + if not config_file.exists(): + logger.info(f"Fichier de configuration non trouvé: {config_path}") + logger.info("Utilisation de la configuration par défaut") + save_config(config, config_path) + return config + + try: + with open(config_file, 'r', encoding='utf-8') as f: + user_config = json.load(f) + + # Fusion avec la configuration par défaut + config.update(user_config) + logger.info(f"Configuration chargée depuis: {config_path}") + + except json.JSONDecodeError as e: + logger.error(f"Erreur de format JSON dans {config_path}: {e}") + logger.info("Utilisation de la configuration par défaut") + except Exception as e: + logger.error(f"Erreur lors du chargement de la configuration: {e}") + logger.info("Utilisation de la configuration par défaut") + + return config + + +def save_config(config: Dict[str, Any], config_path: str = "config.json"): + """ + Sauvegarde la configuration dans un fichier + + Args: + config: Configuration à sauvegarder + config_path: Chemin vers le fichier de configuration + """ + try: + config_file = Path(config_path) + + # Création du répertoire parent si nécessaire + config_file.parent.mkdir(parents=True, exist_ok=True) + + with open(config_file, 'w', encoding='utf-8') as f: + json.dump(config, f, indent=2, ensure_ascii=False) + + logger.info(f"Configuration sauvegardée dans: {config_path}") + + except Exception as e: + logger.error(f"Erreur lors de la sauvegarde de la configuration: {e}") + + +def get_config_template() -> str: + """Retourne un modèle de configuration avec commentaires""" + template = """{ + // Configuration générale + "language": "fra", + "log_level": "INFO", + "temp_directory": "/tmp/animelibrarian", + + // Configuration TVDB - Obtenez votre clé API sur https://thetvdb.com/ + "thetvdb_api_key": null, + "thetvdb_base_url": "https://api.thetvdb.com", + + // Configuration trace.moe - Clé API optionnelle pour des limites plus élevées + "trace_moe_api_key": null, + "trace_moe_base_url": "https://api.trace.moe", + "trace_moe_rate_limit": 1.0, + "trace_moe_timeout": 30, + "trace_moe_max_retries": 3, + + // Extensions vidéo supportées + "video_extensions": [ + ".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".webm", + ".m4v", ".mpg", ".mpeg", ".3gp", ".ts", ".m2ts", ".ogv" + ], + + // Configuration du scan + "max_scan_depth": 10, + "ignore_patterns": [ + ".git", ".svn", ".hg", "__pycache__", "node_modules", + ".vscode", ".idea", "Thumbs.db", ".DS_Store" + ], + + // Configuration de détection multimédia + "min_video_size": 52428800, + "max_video_size": 53687091200, + "min_video_duration": 300, + + // Configuration du renommage + "backup_original": true, + "backup_directory": ".backups", + "include_absolute_number": true, + "include_episode_title": true, + "include_technical_info": true, + "dry_run": false, + + // Configuration du cache + "enable_cache": true, + "cache_directory": ".cache", + "cache_expiry_hours": 24, + + // Configuration de la compatibilité + "min_free_space_percent": 5, + "min_free_space_bytes": 1073741824, + + // Configuration de l'interface + "max_display_items": 20, + "confirm_operations": true +}""" + + return template + + +def validate_config(config: Dict[str, Any]) -> Dict[str, Any]: + """ + Valide la configuration et retourne les problèmes + + Args: + config: Configuration à valider + + Returns: + Dictionnaire avec les erreurs et avertissements + """ + result = { + "errors": [], + "warnings": [], + "is_valid": True + } + + # Validation des clés API + if not config.get("thetvdb_api_key"): + result["warnings"].append("Clé API TVDB non configurée - fonctionnalités limitées") + + # Validation des chemins + temp_dir = Path(config.get("temp_directory", "/tmp/animelibrarian")) + if not temp_dir.parent.exists(): + result["errors"].append(f"Répertoire parent invalide pour temp_directory: {temp_dir.parent}") + + # Validation des valeurs numériques + if config.get("min_video_size", 0) < 0: + result["errors"].append("min_video_size doit être positif") + + if config.get("max_video_size", 0) <= config.get("min_video_size", 0): + result["errors"].append("max_video_size doit être supérieur à min_video_size") + + if config.get("trace_moe_rate_limit", 0) <= 0: + result["errors"].append("trace_moe_rate_limit doit être positif") + + # Validation des extensions + extensions = config.get("video_extensions", []) + if not extensions or not all(ext.startswith('.') for ext in extensions): + result["errors"].append("video_extensions doit contenir des extensions valides (ex: .mp4)") + + # Validation du niveau de log + valid_log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] + if config.get("log_level", "").upper() not in valid_log_levels: + result["errors"].append(f"log_level doit être l'une de: {', '.join(valid_log_levels)}") + + result["is_valid"] = len(result["errors"]) == 0 + + return result + + +def setup_directories(config: Dict[str, Any]): + """ + Crée les répertoires nécessaires selon la configuration + + Args: + config: Configuration de l'application + """ + directories = [] + + # Répertoire temporaire + if config.get("temp_directory"): + directories.append(Path(config["temp_directory"])) + + # Répertoire de sauvegarde + if config.get("backup_original") and config.get("backup_directory"): + # Ce sera créé relativement au répertoire de travail + pass + + # Répertoire de cache + if config.get("enable_cache") and config.get("cache_directory"): + directories.append(Path(config["cache_directory"])) + + for directory in directories: + try: + directory.mkdir(parents=True, exist_ok=True) + logger.debug(f"Répertoire créé/vérifié: {directory}") + except Exception as e: + logger.warning(f"Impossible de créer le répertoire {directory}: {e}") + + +def mask_sensitive_data(config: Dict[str, Any]) -> Dict[str, Any]: + """ + Masque les données sensibles pour l'affichage + + Args: + config: Configuration originale + + Returns: + Configuration avec les données sensibles masquées + """ + masked = config.copy() + + # Liste des clés sensibles + sensitive_keys = [ + "thetvdb_api_key", + "trace_moe_api_key" + ] + + for key in sensitive_keys: + if key in masked and masked[key]: + masked[key] = "***masqué***" + + return masked \ No newline at end of file diff --git a/src/utils/logging.py b/src/utils/logging.py new file mode 100644 index 0000000..1ff2be1 --- /dev/null +++ b/src/utils/logging.py @@ -0,0 +1,69 @@ +""" +Configuration du logging pour AnimeLibrarian - Projet de la Légion de Muyue +""" + +import logging +import logging.handlers +import os +from pathlib import Path +from typing import Optional + + +def setup_logging(verbose: bool = False, log_file: Optional[str] = None): + """ + Configure le système de logging + + Args: + verbose: Active le mode verbeux (DEBUG) + log_file: Fichier de log (optionnel) + """ + + # Niveau de log + level = logging.DEBUG if verbose else logging.INFO + + # Configuration du formatter + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + # Configuration du logger racine + root_logger = logging.getLogger() + root_logger.setLevel(level) + + # Suppression des handlers existants + for handler in root_logger.handlers[:]: + root_logger.removeHandler(handler) + + # Handler console + console_handler = logging.StreamHandler() + console_handler.setLevel(level) + console_handler.setFormatter(formatter) + root_logger.addHandler(console_handler) + + # Handler fichier si spécifié + if log_file: + try: + log_path = Path(log_file) + log_path.parent.mkdir(parents=True, exist_ok=True) + + file_handler = logging.handlers.RotatingFileHandler( + log_file, + maxBytes=10 * 1024 * 1024, # 10 Mo + backupCount=5 + ) + file_handler.setLevel(logging.DEBUG) # Toujours DEBUG dans les fichiers + file_handler.setFormatter(formatter) + root_logger.addHandler(file_handler) + + except Exception as e: + print(f"⚠️ Impossible de créer le fichier de log {log_file}: {e}") + + # Configuration spécifique pour les modules externes + external_modules = [ + 'urllib3.connectionpool', + 'requests.packages.urllib3.connectionpool' + ] + + for module in external_modules: + logging.getLogger(module).setLevel(logging.WARNING) \ No newline at end of file