Add complete AnimeLibrarian implementation

- Full application structure with core, API clients, and UI modules
- Directory compatibility checker with comprehensive validation
- TheTVDB API integration for metadata and standardized naming
- trace.moe API integration for episode verification
- File renamer with TVDB format compliance
- Interactive CLI interface with detailed reporting
- Configuration system with validation and defaults
- Comprehensive error handling and logging
- Support for backup and dry-run operations
- Project developed by the Légion de Muyue

💘 Generated with Crush

Assisted-by: GLM-4.6 via Crush <crush@charm.land>
This commit is contained in:
2025-12-21 18:24:06 +01:00
parent 1ba3055895
commit 2605f08feb
17 changed files with 3828 additions and 0 deletions

69
main.py Normal file
View File

@@ -0,0 +1,69 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
AnimeLibrarian - Outil d'organisation et de vérification de collections d'anime
Point d'entrée principal de l'application
"""
import os
import sys
import argparse
from pathlib import Path
# Ajout du répertoire courant au path pour importer les modules locaux
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from src.core import AnimeLibrarianCore
from src.ui import AnimeLibrarianUI
from src.utils import setup_logging, load_config
def main():
"""Point d'entrée principal de l'application"""
parser = argparse.ArgumentParser(
description="AnimeLibrarian - Outil d'organisation et de vérification de collections d'anime"
)
parser.add_argument(
"--config", "-c",
type=str,
help="Chemin vers le fichier de configuration",
default="config.json"
)
parser.add_argument(
"--verbose", "-v",
action="store_true",
help="Active le mode verbeux"
)
parser.add_argument(
"--directory", "-d",
type=str,
help="Répertoire de base à analyser (contournement de la sélection interactive)"
)
args = parser.parse_args()
# Configuration du logging
setup_logging(verbose=args.verbose)
# Chargement de la configuration
config = load_config(args.config)
# Initialisation du cœur de l'application
core = AnimeLibrarianCore(config)
# Initialisation de l'interface utilisateur
ui = AnimeLibrarianUI(core)
# Lancement de l'interface interactive
ui.run(preselected_directory=args.directory)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\nOpération annulée par l'utilisateur.")
sys.exit(0)
except Exception as e:
print(f"Erreur: {e}")
sys.exit(1)

27
requirements.txt Normal file
View File

@@ -0,0 +1,27 @@
# AnimeLibrarian - Dépendances Python
# Projet développé par la Légion de Muyue
# Core dependencies
requests>=2.28.0
pathlib2>=2.3.0
# Video processing (optional but recommended)
ffmpeg-python>=0.2.0
# Image processing (for trace.moe)
Pillow>=9.0.0
# Data handling
dataclasses>=0.8; python_version<"3.7"
# Logging and configuration
pyyaml>=6.0
# Development dependencies (optional)
pytest>=7.0.0
pytest-cov>=4.0.0
black>=22.0.0
flake8>=5.0.0
# Optional: Advanced video information
# ffprobe-python>=0.1.0

3
src/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
# AnimeLibrarian - Package principal
__version__ = "1.0.0"
__author__ = "AnimeLibrarian Team"

1
src/api/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Package api

491
src/api/thetvdb_client.py Normal file
View File

@@ -0,0 +1,491 @@
"""
Client API pour TheTVDB - Projet de la Légion de Muyue
"""
import json
import time
import requests
from typing import Dict, Any, List, Optional, Tuple
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class TheTVDBClient:
"""Client pour l'API TheTVDB"""
BASE_URL = "https://api.thetvdb.com"
def __init__(self, api_key: str = None, language: str = "fra"):
"""
Initialise le client TheTVDB
Args:
api_key: Clé API TheTVDB (requise pour les requêtes authentifiées)
language: Code de langue pour les métadonnées ('fra', 'eng', etc.)
"""
self.api_key = api_key
self.language = language
self.token = None
self.token_expires = None
self.session = requests.Session()
# Headers par défaut
self.session.headers.update({
"Content-Type": "application/json",
"Accept": "application/json"
})
def login(self) -> bool:
"""
Authentifie le client auprès de l'API TheTVDB
Returns:
bool: True si l'authentification a réussi
"""
if not self.api_key:
logger.warning("Aucune clé API TheTVDB fournie - utilisation en mode limité")
return False
try:
auth_data = {
"apikey": self.api_key
}
response = self.session.post(
f"{self.BASE_URL}/login",
json=auth_data,
timeout=10
)
if response.status_code == 200:
auth_response = response.json()
self.token = auth_response.get("token")
if self.token:
# Ajout du token aux headers
self.session.headers.update({
"Authorization": f"Bearer {self.token}"
})
# Le token expire après 24 heures
self.token_expires = datetime.now() + timedelta(hours=24)
logger.info("Authentification TheTVDB réussie")
return True
else:
logger.error("Token non trouvé dans la réponse TheTVDB")
return False
else:
logger.error(f"Échec de l'authentification TheTVDB: {response.status_code} - {response.text}")
return False
except Exception as e:
logger.error(f"Erreur lors de l'authentification TheTVDB: {e}")
return False
def ensure_authenticated(self) -> bool:
"""Vérifie que le client est authentifié et ré-authentifie si nécessaire"""
# Si pas de clé API, on ne peut pas s'authentifier
if not self.api_key:
return False
# Si on a un token valide
if self.token and self.token_expires and datetime.now() < self.token_expires:
return True
# Sinon, on s'authentifie
return self.login()
def search_series(self, name: str) -> List[Dict[str, Any]]:
"""
Recherche des séries par nom
Args:
name: Nom de la série à rechercher
Returns:
Liste des séries trouvées
"""
try:
# Ajout de paramètres pour les résultats pertinents
params = {
"name": name,
"type": "series"
}
# Utilisation de l'endpoint public pour la recherche
url = f"{self.BASE_URL}/search/series"
response = self.session.get(url, params=params, timeout=10)
if response.status_code == 200:
data = response.json()
return data.get("data", [])
elif response.status_code == 401 and self.ensure_authenticated():
# Réessayer avec authentification
response = self.session.get(url, params=params, timeout=10)
if response.status_code == 200:
data = response.json()
return data.get("data", [])
logger.error(f"Erreur recherche série {name}: {response.status_code} - {response.text}")
return []
except Exception as e:
logger.error(f"Erreur lors de la recherche de série {name}: {e}")
return []
def get_series_by_id(self, series_id: int) -> Optional[Dict[str, Any]]:
"""
Récupère les informations détaillées d'une série par son ID
Args:
series_id: ID TheTVDB de la série
Returns:
Dictionnaire avec les informations de la série ou None
"""
try:
url = f"{self.BASE_URL}/series/{series_id}"
params = {"lang": self.language}
response = self.session.get(url, params=params, timeout=10)
if response.status_code == 200:
return response.json().get("data")
elif response.status_code == 401 and self.ensure_authenticated():
response = self.session.get(url, params=params, timeout=10)
if response.status_code == 200:
return response.json().get("data")
logger.error(f"Erreur récupération série {series_id}: {response.status_code}")
return None
except Exception as e:
logger.error(f"Erreur lors de la récupération de la série {series_id}: {e}")
return None
def get_episodes(self, series_id: int, page: int = 0) -> List[Dict[str, Any]]:
"""
Récupère les épisodes d'une série
Args:
series_id: ID TheTVDB de la série
page: Page de résultats (pagination)
Returns:
Liste des épisodes
"""
try:
url = f"{self.BASE_URL}/series/{series_id}/episodes/query"
params = {
"lang": self.language,
"page": page,
"pageSize": 100 # Maximum par page
}
response = self.session.get(url, params=params, timeout=10)
if response.status_code == 200:
data = response.json()
return data.get("data", [])
elif response.status_code == 401 and self.ensure_authenticated():
response = self.session.get(url, params=params, timeout=10)
if response.status_code == 200:
data = response.json()
return data.get("data", [])
logger.error(f"Erreur récupération épisodes série {series_id}: {response.status_code}")
return []
except Exception as e:
logger.error(f"Erreur lors de la récupération des épisodes de la série {series_id}: {e}")
return []
def get_all_episodes(self, series_id: int) -> List[Dict[str, Any]]:
"""
Récupère tous les épisodes d'une série (toutes les pages)
Args:
series_id: ID TheTVDB de la série
Returns:
Liste complète de tous les épisodes
"""
all_episodes = []
page = 0
while True:
episodes = self.get_episodes(series_id, page)
if not episodes:
break
all_episodes.extend(episodes)
page += 1
# Petite pause pour éviter de surcharger l'API
time.sleep(0.1)
logger.info(f"Récupéré {len(all_episodes)} épisodes pour la série {series_id}")
return all_episodes
def get_episode_by_number(self, series_id: int, season: int, episode: int) -> Optional[Dict[str, Any]]:
"""
Récupère un épisode spécifique par son numéro
Args:
series_id: ID TheTVDB de la série
season: Numéro de saison
episode: Numéro d'épisode
Returns:
Dictionnaire avec les informations de l'épisode ou None
"""
try:
url = f"{self.BASE_URL}/series/{series_id}/episodes/{season}/{episode}"
params = {"lang": self.language}
response = self.session.get(url, params=params, timeout=10)
if response.status_code == 200:
return response.json().get("data")
elif response.status_code == 401 and self.ensure_authenticated():
response = self.session.get(url, params=params, timeout=10)
if response.status_code == 200:
return response.json().get("data")
return None
except Exception as e:
logger.error(f"Erreur lors de la récupération de l'épisode S{season:02d}E{episode:02d}: {e}")
return None
def get_series_artwork(self, series_id: int) -> List[Dict[str, Any]]:
"""
Récupère les artworks (posters, bannières) d'une série
Args:
series_id: ID TheTVDB de la série
Returns:
Liste des artworks
"""
try:
url = f"{self.BASE_URL}/series/{series_id}/images/query"
params = {
"keyType": "poster,series",
"lang": self.language
}
response = self.session.get(url, params=params, timeout=10)
if response.status_code == 200:
data = response.json()
return data.get("data", [])
elif response.status_code == 401 and self.ensure_authenticated():
response = self.session.get(url, params=params, timeout=10)
if response.status_code == 200:
data = response.json()
return data.get("data", [])
return []
except Exception as e:
logger.error(f"Erreur lors de la récupération des artworks de la série {series_id}: {e}")
return []
def build_episode_map(self, series_id: int) -> Dict[Tuple[int, int], Dict[str, Any]]:
"""
Construit une map des épisodes pour lookup rapide
Returns:
Dict avec (saison, épisode) -> données épisode
"""
episodes = self.get_all_episodes(series_id)
episode_map = {}
for ep in episodes:
season = ep.get("airedSeason", 1)
episode_num = ep.get("airedEpisodeNumber", 0)
if season and episode_num:
episode_map[(season, episode_num)] = ep
logger.info(f"Map d'épisodes construite: {len(episode_map)} entrées")
return episode_map
def get_episode_title(self, series_id: int, season: int, episode: int, fallback_title: str = None) -> str:
"""
Récupère le titre d'un épisode
Args:
series_id: ID TheTVDB de la série
season: Numéro de saison
episode: Numéro d'épisode
fallback_title: Titre par défaut si non trouvé
Returns:
Titre de l'épisode
"""
episode_data = self.get_episode_by_number(series_id, season, episode)
if episode_data and episode_data.get("episodeName"):
return episode_data["episodeName"]
# Utiliser le titre de fallback ou générer un titre générique
if fallback_title:
return fallback_title
return f"Episode {episode:02d}"
def search_best_match(self, search_name: str, year: int = None) -> Optional[Dict[str, Any]]:
"""
Recherche la meilleure correspondance pour une série
Args:
search_name: Nom à rechercher
year: Année de sortie pour affiner la recherche
Returns:
Meilleure correspondance ou None
"""
search_term = search_name
# Ajouter l'année si disponible
if year:
search_term = f"{search_term} ({year})"
results = self.search_series(search_term)
if not results and year:
# Réessayer sans l'année
results = self.search_series(search_name)
if not results:
return None
# Score de correspondance
best_match = None
best_score = 0
search_lower = search_name.lower()
for series in results:
name = series.get("seriesName", "").lower()
# Score basique de similarité
score = 0
# Correspondance exacte
if name == search_lower:
score = 100
# Correspondance partielle
elif search_lower in name or name in search_lower:
score = 70
# Correspondance de mots
else:
search_words = set(search_lower.split())
name_words = set(name.split())
common_words = search_words.intersection(name_words)
score = len(common_words) * 20
# Bonus pour l'année
if year:
series_year = self._extract_year_from_series(series)
if series_year == year:
score += 15
elif abs(series_year - year) <= 1:
score += 5
# Mise à jour de la meilleure correspondance
if score > best_score:
best_score = score
best_match = series
# Seuil minimum de confiance
if best_score >= 50:
logger.info(f"Meilleure correspondance pour '{search_name}': {best_match.get('seriesName')} (score: {best_score})")
return best_match
logger.warning(f"Pas de correspondance suffisante pour '{search_name}' (meilleur score: {best_score})")
return None
def _extract_year_from_series(self, series: Dict[str, Any]) -> int:
"""Extrait l'année de la série"""
first_aired = series.get("firstAired", "")
if first_aired:
try:
return datetime.strptime(first_aired, "%Y-%m-%d").year
except:
pass
return 0
def get_recommended_format(self, series: Dict[str, Any], episode_info: Dict[str, Any]) -> str:
"""
Génère le nom de fichier recommandé selon le format TheTVDB
Args:
series: Informations de la série
episode_info: Informations de l'épisode
Returns:
Nom de fichier formaté
"""
series_name = series.get("seriesName", "Unknown Series")
season = episode_info.get("season", 1)
episode = episode_info.get("episode", 0)
title = episode_info.get("title", "")
# Nettoyage et formatage
clean_series = self._clean_string(series_name)
clean_title = self._clean_string(title) if title else ""
# Construction du nom
if episode_info.get("special", False):
# Épisode spécial
formatted = f"{clean_series} - S00E{episode:02d}"
else:
# Épisode normal
formatted = f"{clean_series} - S{season:02d}E{episode:02d}"
# Ajout du titre
if clean_title:
formatted += f" - {clean_title}"
return formatted
def _clean_string(self, text: str) -> str:
"""Nettoie une chaîne pour le formatage de nom de fichier"""
if not text:
return ""
# Suppression des caractères invalides
invalid_chars = '<>:"/\\|?*'
for char in invalid_chars:
text = text.replace(char, '')
# Normalisation des espaces
text = text.replace(' ', ' ').strip()
return text
def get_languages(self) -> List[Dict[str, Any]]:
"""Récupère la liste des langues disponibles"""
try:
url = f"{self.BASE_URL}/languages"
response = self.session.get(url, timeout=10)
if response.status_code == 200:
return response.json().get("data", [])
return []
except Exception as e:
logger.error(f"Erreur lors de la récupération des langues: {e}")
return []

429
src/api/tracemoe_client.py Normal file
View File

@@ -0,0 +1,429 @@
"""
Client pour l'API trace.moe - Projet de la Légion de Muyue
"""
import base64
import json
import time
import requests
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple
import logging
logger = logging.getLogger(__name__)
class TraceMoeClient:
"""Client pour l'API trace.moe de reconnaissance de scènes d'anime"""
BASE_URL = "https://api.trace.moe"
def __init__(self, config: Dict[str, Any] = None):
"""
Initialise le client trace.moe
Args:
config: Configuration du client
"""
self.config = config or {}
self.api_key = self.config.get("trace_moe_api_key")
self.session = requests.Session()
# Configuration des limites
self.max_retries = self.config.get("max_retries", 3)
self.retry_delay = self.config.get("retry_delay", 1.0)
self.timeout = self.config.get("timeout", 30)
# Limites de l'API
self.rate_limit = self.config.get("rate_limit", 1.0) # secondes entre requêtes
self.last_request_time = 0
def check_rate_limit(self):
"""Vérifie et respecte les limites de taux de l'API"""
current_time = time.time()
elapsed = current_time - self.last_request_time
if elapsed < self.rate_limit:
sleep_time = self.rate_limit - elapsed
logger.debug(f"Rate limit: attente de {sleep_time:.1f}s")
time.sleep(sleep_time)
self.last_request_time = time.time()
def extract_frame_from_video(self, video_path: Path, timestamp: float = None) -> Optional[bytes]:
"""
Extrait une frame d'une vidéo pour analyse
Args:
video_path: Chemin vers le fichier vidéo
timestamp: Temps en secondes (sinon 60s ou 30% de la durée)
Returns:
Image en bytes ou None en cas d'erreur
"""
try:
# Import ici pour éviter les dépendances si non utilisé
import subprocess
# Détermination du timestamp
if timestamp is None:
# Essayer d'obtenir la durée de la vidéo
try:
cmd = [
'ffprobe', '-v', 'quiet', '-show_entries',
'format=duration', '-of', 'csv=p=0', str(video_path)
]
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=10
)
if result.returncode == 0:
duration = float(result.stdout.strip())
timestamp = min(duration * 0.3, 60) # 30% de la durée, max 60s
else:
timestamp = 60
except:
timestamp = 60
# Extraction de la frame avec ffmpeg
cmd = [
'ffmpeg', '-i', str(video_path),
'-ss', str(timestamp),
'-frames:v', '1',
'-f', 'image2pipe',
'-vcodec', 'png',
'-'
]
result = subprocess.run(
cmd, capture_output=True, timeout=20
)
if result.returncode == 0 and result.stdout:
return result.stdout
else:
logger.error(f"Erreur extraction frame: {result.stderr.decode('utf-8')}")
return None
except subprocess.TimeoutExpired:
logger.error("Timeout lors de l'extraction de frame")
return None
except Exception as e:
logger.error(f"Erreur lors de l'extraction de frame: {e}")
return None
def analyze_frame(self, frame_data: bytes) -> Optional[Dict[str, Any]]:
"""
Analyse une frame avec l'API trace.moe
Args:
frame_data: Données de l'image en bytes
Returns:
Réponse de l'API ou None en cas d'erreur
"""
self.check_rate_limit()
try:
# Encodage en base64
encoded_image = base64.b64encode(frame_data).decode('utf-8')
# Préparation de la requête
url = f"{self.BASE_URL}/search"
data = {
"image": encoded_image
}
# Ajout de la clé API si disponible
if self.api_key:
data["key"] = self.api_key
# Requête avec retry
for attempt in range(self.max_retries):
try:
response = self.session.post(
url, json=data, timeout=self.timeout
)
if response.status_code == 200:
return response.json()
elif response.status_code == 429:
# Too Many Requests - augmenter le délai
logger.warning("API rate limit atteint, augmentation du délai")
time.sleep(self.rate_limit * (attempt + 1))
continue
else:
logger.error(f"Erreur API trace.moe: {response.status_code} - {response.text}")
if attempt < self.max_retries - 1:
time.sleep(self.retry_delay)
continue
except requests.exceptions.Timeout:
logger.error(f"Timeout API (tentative {attempt + 1})")
if attempt < self.max_retries - 1:
time.sleep(self.retry_delay * (attempt + 1))
continue
except Exception as e:
logger.error(f"Erreur requête API (tentative {attempt + 1}): {e}")
if attempt < self.max_retries - 1:
time.sleep(self.retry_delay)
continue
return None
except Exception as e:
logger.error(f"Erreur lors de l'analyse de frame: {e}")
return None
def search_video(self, video_path: Path, timestamp: float = None) -> Optional[Dict[str, Any]]:
"""
Analyse un fichier vidéo complet
Args:
video_path: Chemin vers le fichier vidéo
timestamp: Timestamp spécifique à analyser
Returns:
Résultat de l'analyse ou None
"""
logger.debug(f"Analyse de la vidéo: {video_path.name}")
# Extraction de frame
frame_data = self.extract_frame_from_video(video_path, timestamp)
if not frame_data:
logger.error(f"Impossible d'extraire une frame de {video_path}")
return None
# Analyse de la frame
result = self.analyze_frame(frame_data)
if result:
# Ajout d'informations supplémentaires
result['source_file'] = str(video_path)
result['timestamp_used'] = timestamp
result['frame_size'] = len(frame_data)
logger.debug(f"Analyse réussie pour {video_path.name}")
return result
def identify_anime_episode(self, video_path: Path, timestamp: float = None) -> Optional[Dict[str, Any]]:
"""
Identifie l'anime et l'épisode à partir d'une vidéo
Args:
video_path: Chemin vers le fichier vidéo
timestamp: Timestamp spécifique à analyser
Returns:
Informations détaillées sur l'épisode identifié
"""
result = self.search_video(video_path, timestamp)
if not result:
return None
# Traitement des résultats
return self._process_trace_moe_result(result)
def _process_trace_moe_result(self, result: Dict[str, Any]) -> Dict[str, Any]:
"""Traite et formate les résultats de trace.moe"""
processed = {
"success": False,
"matches": [],
"best_match": None,
"error": None
}
try:
# Vérification des erreurs
if result.get("error"):
processed["error"] = result["error"]
return processed
# Extraction des résultats
docs = result.get("result", [])
if not docs:
processed["error"] = "Aucune correspondance trouvée"
return processed
processed["matches"] = docs
# Identification de la meilleure correspondance
if docs:
best_match = docs[0] # Le premier est généralement le plus probable
processed["best_match"] = {
"anime_title": best_match.get("anilist", {}).get("title", {}).get("romaji", ""),
"anime_title_en": best_match.get("anilist", {}).get("title", {}).get("english", ""),
"anime_title_native": best_match.get("anilist", {}).get("title", {}).get("native", ""),
"episode": best_match.get("episode", None),
"timestamp": best_match.get("from", None),
"similarity": best_match.get("similarity", 0),
"anilist_id": best_match.get("anilist", {}).get("id", None),
"mal_id": best_match.get("anilist", {}).get("idMal", None),
"filename": best_match.get("filename", ""),
"season": best_match.get("season", None)
}
# Conversion du timestamp
if best_match.get("from"):
processed["best_match"]["timestamp_formatted"] = self._format_timestamp(
best_match.get("from")
)
# Validation du seuil de confiance
confidence = best_match.get("similarity", 0)
processed["high_confidence"] = confidence >= 0.85
processed["medium_confidence"] = confidence >= 0.70
processed["success"] = True
except Exception as e:
processed["error"] = f"Erreur lors du traitement des résultats: {e}"
logger.error(f"Erreur traitement trace.moe: {e}")
return processed
def _format_timestamp(self, seconds: float) -> str:
"""Formate un timestamp en HH:MM:SS"""
if not seconds:
return "00:00:00"
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
def verify_episode_number(self, video_path: Path, expected_episode: int,
expected_series: str = None) -> Dict[str, Any]:
"""
Vérifie si un fichier vidéo correspond à l'épisode attendu
Args:
video_path: Chemin vers le fichier vidéo
expected_episode: Numéro d'épisode attendu
expected_series: Nom de série attendu (optionnel)
Returns:
Résultat de la vérification
"""
verification = {
"success": False,
"episode_match": False,
"series_match": False,
"confidence": 0,
"identified_episode": None,
"identified_series": None,
"error": None
}
try:
# Analyse de la vidéo
result = self.identify_anime_episode(video_path)
if not result or not result["success"]:
verification["error"] = result.get("error", "Erreur d'identification")
return verification
best_match = result["best_match"]
if not best_match:
verification["error"] = "Aucune correspondance trouvée"
return verification
# Vérification de l'épisode
identified_episode = best_match.get("episode")
verification["identified_episode"] = identified_episode
verification["confidence"] = best_match.get("similarity", 0)
if identified_episode is not None and identified_episode == expected_episode:
verification["episode_match"] = True
# Vérification de la série si spécifiée
if expected_series:
anime_titles = [
best_match.get("anime_title", "").lower(),
best_match.get("anime_title_en", "").lower(),
best_match.get("anime_title_native", "").lower()
]
expected_lower = expected_series.lower()
for title in anime_titles:
if title and (expected_lower in title or title in expected_lower):
verification["series_match"] = True
break
verification["identified_series"] = best_match.get("anime_title")
# Succès si l'épisode correspond
verification["success"] = verification["episode_match"]
except Exception as e:
verification["error"] = f"Erreur lors de la vérification: {e}"
logger.error(f"Erreur vérification épisode: {e}")
return verification
def batch_verify_episodes(self, episodes: List[Tuple[Path, int, str]]) -> List[Dict[str, Any]]:
"""
Vérification par lot de plusieurs épisodes
Args:
episodes: Liste de tuples (video_path, expected_episode, expected_series)
Returns:
Liste des résultats de vérification
"""
results = []
for video_path, expected_episode, expected_series in episodes:
logger.info(f"Vérification de {video_path.name}")
result = self.verify_episode_number(
video_path, expected_episode, expected_series
)
results.append(result)
# Petite pause entre les vérifications pour respecter les limites
time.sleep(self.rate_limit)
return results
def get_api_limits(self) -> Dict[str, Any]:
"""Retourne les informations sur les limites de l'API"""
return {
"rate_limit": self.rate_limit,
"max_retries": self.max_retries,
"timeout": self.timeout,
"has_api_key": bool(self.api_key)
}
def test_connection(self) -> Dict[str, Any]:
"""Teste la connexion à l'API trace.moe"""
try:
# Création d'une petite image de test
import io
from PIL import Image
# Création d'une image 1x1 noir
img = Image.new('RGB', (1, 1), color='black')
img_bytes = io.BytesIO()
img.save(img_bytes, format='PNG')
img_bytes.seek(0)
# Test avec cette image
result = self.analyze_frame(img_bytes.read())
return {
"success": True,
"api_responding": result is not None,
"result": result
}
except Exception as e:
return {
"success": False,
"error": str(e)
}

464
src/core/__init__.py Normal file
View File

@@ -0,0 +1,464 @@
"""
Cœur de l'application AnimeLibrarian - Projet de la Légion de Muyue
"""
import os
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple
import logging
from .directory_checker import DirectoryChecker
from .file_scanner import FileScanner
from .media_detector import MediaDetector
from .file_renamer import FileRenamer
from ..api.thetvdb_client import TheTVDBClient
from ..api.tracemoe_client import TraceMoeClient
from ..models.episode import Series, Episode
logger = logging.getLogger(__name__)
class AnimeLibrarianCore:
"""Cœur logique de l'application AnimeLibrarian"""
def __init__(self, config: Dict[str, Any] = None):
"""
Initialise le cœur de l'application
Args:
config: Configuration de l'application
"""
self.config = config or {}
# Initialisation des composants
self.directory_checker = DirectoryChecker(config)
self.file_scanner = FileScanner(config)
self.media_detector = MediaDetector(config)
self.file_renamer = FileRenamer(config)
# Clients API
self.tvdb_client = TheTVDBClient(
api_key=self.config.get("thetvdb_api_key"),
language=self.config.get("language", "fra")
)
self.trace_moe_client = TraceMoeClient(self.config)
# État de l'application
self.current_directory = None
self.series_list = []
self.selected_series = []
logger.info("Cœur d'AnimeLibrarian initialisé")
def check_directory_compatibility(self, directory_path: str) -> Dict[str, Any]:
"""
Vérifie la compatibilité d'un répertoire
Args:
directory_path: Chemin du répertoire à vérifier
Returns:
Résultat détaillé de la vérification
"""
logger.info(f"Vérification de compatibilité: {directory_path}")
result = self.directory_checker.check_directory(directory_path)
if result.is_compatible:
self.current_directory = Path(directory_path)
logger.info(f"Répertoire compatible: {directory_path}")
else:
logger.warning(f"Répertoire incompatible: {directory_path}")
return result.__dict__
def scan_series(self, directory_path: str = None) -> List[Dict[str, Any]]:
"""
Scan les séries dans le répertoire
Args:
directory_path: Chemin du répertoire (utilise le courant si None)
Returns:
Liste des séries trouvées
"""
if directory_path:
self.current_directory = Path(directory_path)
if not self.current_directory:
raise ValueError("Aucun répertoire spécifié ou compatible")
logger.info(f"Scan des séries dans: {self.current_directory}")
# Scan des fichiers multimédia
media_files = self.file_scanner.scan_directory(
self.current_directory,
{'.mp4', '.mkv', '.avi', '.mov', '.wmv', '.flv', '.webm',
'.m4v', '.mpg', '.mpeg', '.3gp', '.ts', '.m2ts', '.ogv'}
)
# Détection de la structure des séries
series_structure = self._group_files_by_series(media_files)
# Création des objets Series
self.series_list = []
for series_name, files in series_structure.items():
series = self._create_series(series_name, files)
self.series_list.append(series)
logger.info(f"Trouvé {len(self.series_list)} séries")
return [self._serialize_series(series) for series in self.series_list]
def verify_episodes_numbers(self, series_indices: List[int]) -> Dict[str, Any]:
"""
Vérifie les numéros d'épisodes avec trace.moe
Args:
series_indices: Indices des séries à vérifier
Returns:
Résultats de la vérification
"""
logger.info(f"Vérification des numéros d'épisodes pour {len(series_indices)} séries")
results = {
"series_verified": 0,
"episodes_verified": 0,
"verification_results": [],
"errors": []
}
# Authentification TVDB si possible
self.tvdb_client.login()
for idx in series_indices:
if 0 <= idx < len(self.series_list):
series = self.series_list[idx]
logger.info(f"Vérification de la série: {series.name}")
try:
series_result = self._verify_series_episodes(series)
results["verification_results"].append(series_result)
results["series_verified"] += 1
results["episodes_verified"] += len(series.episodes)
except Exception as e:
error_msg = f"Erreur vérification série {series.name}: {e}"
logger.error(error_msg)
results["errors"].append(error_msg)
logger.info(f"Vérification terminée: {results['series_verified']} séries, {results['episodes_verified']} épisodes")
return results
def verify_files_integrity(self, series_indices: List[int]) -> Dict[str, Any]:
"""
Vérifie l'intégrité des fichiers
Args:
series_indices: Indices des séries à vérifier
Returns:
Résultats de la vérification d'intégrité
"""
logger.info(f"Vérification d'intégrité pour {len(series_indices)} séries")
results = {
"files_checked": 0,
"valid_files": 0,
"invalid_files": [],
"issues": [],
"duplicates": []
}
# Collection de tous les fichiers
all_files = []
for idx in series_indices:
if 0 <= idx < len(self.series_list):
series = self.series_list[idx]
all_files.extend(series.episodes)
# Vérification de chaque fichier
for episode in all_files:
metadata = self.media_detector.analyze_media_file(episode.file_path)
results["files_checked"] += 1
if metadata.get("is_valid", False):
results["valid_files"] += 1
# Mise à jour des métadonnées de l'épisode
episode.duration = metadata.get("duration")
episode.resolution = metadata.get("resolution")
episode.codec = metadata.get("codec")
episode.verified = True
else:
results["invalid_files"].append({
"filename": episode.filename,
"path": str(episode.file_path),
"issue": metadata.get("error", "Fichier invalide")
})
results["issues"].append(f"Fichier invalide: {episode.filename}")
# Détection des doublons
file_info_list = [{"path": ep.file_path, "size": ep.file_size} for ep in all_files]
duplicates = self.file_scanner.get_duplicates(file_info_list)
if duplicates:
results["duplicates"] = [
[str(dup["path"].name) for dup in dup_group]
for dup_group in duplicates
]
results["issues"].append(f"{len(duplicates)} groupes de doublons détectés")
logger.info(f"Intégrité vérifiée: {results['valid_files']}/{results['files_checked']} fichiers valides")
return results
def rename_files(self, series_indices: List[int], dry_run: bool = False) -> Dict[str, Any]:
"""
Renomme les fichiers selon les normes TVDB
Args:
series_indices: Indices des séries à traiter
dry_run: Si True, simule le renommage sans exécuter
Returns:
Résultats du renommage
"""
logger.info(f"Renommage pour {len(series_indices)} séries (dry_run={dry_run})")
# Configuration du mode dry_run
self.file_renamer.dry_run = dry_run
results = {
"series_processed": 0,
"rename_plan": [],
"rename_results": [],
"stats": {}
}
# Authentification TVDB si possible
self.tvdb_client.login()
for idx in series_indices:
if 0 <= idx < len(self.series_list):
series = self.series_list[idx]
logger.info(f"Préparation du renommage pour: {series.name}")
# Préparation du plan de renommage
rename_plan = self.file_renamer.prepare_rename_plan(series)
results["rename_plan"].extend(rename_plan)
# Exécution
if rename_plan:
rename_results = self.file_renamer.execute_rename_plan(rename_plan)
results["rename_results"].extend(rename_results)
results["series_processed"] += 1
# Statistiques
results["stats"] = self.file_renamer.get_stats()
logger.info(f"Renommage terminé: {results['stats']['renamed']} fichiers renommés")
return results
def _group_files_by_series(self, media_files: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
"""Groupe les fichiers par série"""
import re
series_structure = {}
for file_info in media_files:
filename = file_info['path'].name
# Extraction du nom de série
patterns = [
r'^\[?([^\[\]]+?)\]?\s*[-_.]?\s*S\d{1,2}E\d{1,2}',
r'^([^-_\[\]]+?)(?:\s*[-_.]\s*)?(?:S\d{1,2}E\d{1,2}|Episode\s*\d{1,3}|E\d{1,3})',
r'^([^-_()]+)(?:\s*[-_.]\s*)?\d{1,3}',
]
series_name = filename # Fallback
for pattern in patterns:
match = re.search(pattern, filename, re.IGNORECASE)
if match:
series_name = match.group(1).strip()
break
# Nettoyage du nom
series_name = re.sub(r'[^\w\s-]', '', series_name)
series_name = re.sub(r'\s+', ' ', series_name).strip()
if series_name not in series_structure:
series_structure[series_name] = []
series_structure[series_name].append(file_info)
return series_structure
def _create_series(self, series_name: str, files: List[Dict[str, Any]]) -> Series:
"""Crée un objet Series à partir de fichiers"""
series = Series(
name=series_name,
directory=self.current_directory / series_name if self.current_directory else Path("."),
total_episodes=0
)
# Recherche TVDB
tvdb_match = self.tvdb_client.search_best_match(series_name)
if tvdb_match:
series.tvdb_id = tvdb_match.get("id")
series.total_episodes = tvdb_match.get("totalEpisodes")
logger.debug(f"Série trouvée sur TVDB: {series_name} -> ID: {series.tvdb_id}")
# Création des épisodes
episodes = []
for file_info in files:
# Analyse du nom de fichier
episode_info = self.media_detector.extract_episode_info(file_info['path'].name)
# Création de l'objet Episode
episode = Episode(
file_path=file_info['path'],
series_name=series_name,
season=episode_info['season'],
episode=episode_info['episode'],
title=episode_info['title'],
special=episode_info['special'],
file_size=file_info['size'],
resolution=file_info.get('resolution'),
codec=file_info.get('codec')
)
# Analyse multimédia
media_metadata = self.media_detector.analyze_media_file(file_info['path'])
if media_metadata:
episode.duration = media_metadata.get('duration')
episode.resolution = media_metadata.get('resolution') or episode.resolution
episode.codec = media_metadata.get('codec') or episode.codec
episode.verified = media_metadata.get('is_valid', False)
episodes.append(episode)
# Tri des épisodes
episodes.sort(key=lambda ep: (ep.season, ep.episode))
# Ajout à la série
for episode in episodes:
series.add_episode(episode)
return series
def _verify_series_episodes(self, series: Series) -> Dict[str, Any]:
"""Vérifie les épisodes d'une série avec trace.moe"""
result = {
"series_name": series.name,
"episodes_verified": 0,
"episode_results": [],
"summary": {}
}
# Préparation des épisodes à vérifier
episodes_to_verify = []
for episode in series.episodes:
if not episode.special: # Ne vérifier que les épisodes normaux
episodes_to_verify.append((episode.file_path, episode.episode, series.name))
# Vérification par lot
if episodes_to_verify:
verification_results = self.trace_moe_client.batch_verify_episodes(episodes_to_verify)
for i, (episode, verification_result) in enumerate(zip(series.episodes, verification_results)):
if not episode.special: # seulement les épisodes normaux
episode_result = {
"filename": episode.filename,
"expected_episode": episode.episode,
"identified_episode": verification_result.get("identified_episode"),
"confidence": verification_result.get("confidence", 0),
"match": verification_result.get("episode_match", False),
"error": verification_result.get("error")
}
result["episode_results"].append(episode_result)
result["episodes_verified"] += 1
# Résumé
matches = sum(1 for r in result["episode_results"] if r.get("match", False))
result["summary"] = {
"total": len(result["episode_results"]),
"matches": matches,
"mismatches": len(result["episode_results"]) - matches,
"match_rate": matches / len(result["episode_results"]) if result["episode_results"] else 0
}
return result
def _serialize_series(self, series: Series) -> Dict[str, Any]:
"""Sérialise un objet Series pour l'UI"""
completeness = series.check_completeness()
return {
"name": series.name,
"directory": str(series.directory),
"total_episodes": len(series.episodes),
"regular_episodes": len(series.get_regular_episodes()),
"special_episodes": len(series.get_specials()),
"tvdb_id": series.tvdb_id,
"completeness": completeness,
"is_complete": completeness.get("is_complete", False),
"missing_episodes": completeness.get("missing_episodes", []),
"duplicate_episodes": completeness.get("duplicate_episodes", []),
"total_size": sum(ep.file_size for ep in series.episodes)
}
def get_series_details(self, series_index: int) -> Optional[Dict[str, Any]]:
"""
Retourne les détails d'une série
Args:
series_index: Index de la série
Returns:
Détails de la série ou None
"""
if 0 <= series_index < len(self.series_list):
series = self.series_list[series_index]
return {
"info": self._serialize_series(series),
"episodes": [
{
"filename": ep.filename,
"season": ep.season,
"episode": ep.episode,
"title": ep.title,
"special": ep.special,
"duration": ep.duration,
"resolution": ep.resolution,
"codec": ep.codec,
"file_size": ep.file_size,
"verified": ep.verified,
"absolute_number": ep.absolute_number
}
for ep in series.episodes
]
}
return None
def get_application_status(self) -> Dict[str, Any]:
"""Retourne le statut actuel de l'application"""
return {
"current_directory": str(self.current_directory) if self.current_directory else None,
"series_count": len(self.series_list),
"total_episodes": sum(len(series.episodes) for series in self.series_list),
"tvdb_configured": bool(self.config.get("thetvdb_api_key")),
"trace_moe_configured": bool(self.config.get("trace_moe_api_key")),
"tvdb_authenticated": self.tvdb_client.token is not None,
"trace_moe_limits": self.trace_moe_client.get_api_limits()
}

View File

@@ -0,0 +1,333 @@
"""
Vérificateur de compatibilité des répertoires pour AnimeLibrarian
"""
import os
import shutil
import stat
from pathlib import Path
from typing import List, Dict, Any, Optional
import logging
from ..models.episode import DirectoryCompatibilityResult, Series, Episode
from .file_scanner import FileScanner
from .media_detector import MediaDetector
logger = logging.getLogger(__name__)
class DirectoryChecker:
"""Vérifie si un répertoire est compatible avec AnimeLibrarian"""
def __init__(self, config: Dict[str, Any] = None):
self.config = config or {}
self.file_scanner = FileScanner(config)
self.media_detector = MediaDetector()
# Extensions vidéo supportées
self.video_extensions = {
'.mp4', '.mkv', '.avi', '.mov', '.wmv', '.flv', '.webm',
'.m4v', '.mpg', '.mpeg', '.3gp', '.ts', '.m2ts', '.ogv'
}
# Tailles minimales/maximales pour les fichiers vidéo
self.min_video_size = 50 * 1024 * 1024 # 50 Mo
self.max_video_size = 50 * 1024 * 1024 * 1024 # 50 Go
def check_directory(self, directory_path: str) -> DirectoryCompatibilityResult:
"""
Vérifie la compatibilité d'un répertoire
Args:
directory_path: Chemin du répertoire à vérifier
Returns:
DirectoryCompatibilityResult: Résultat détaillé de la vérification
"""
logger.info(f"Vérification du répertoire: {directory_path}")
path = Path(directory_path).resolve()
result = DirectoryCompatibilityResult(path=path)
# 1. Vérification de base du répertoire
self._check_basic_directory(path, result)
if not result.is_compatible:
return result
# 2. Vérification des permissions
self._check_permissions(path, result)
# 3. Vérification de l'espace disque
self._check_disk_space(path, result)
# 4. Analyse des séries et épisodes
if not self._analyze_media_structure(path, result):
return result
# 5. Vérification finale de compatibilité
self._finalize_compatibility(result)
logger.info(f"Vérification terminée: {result.summary()}")
return result
def _check_basic_directory(self, path: Path, result: DirectoryCompatibilityResult):
"""Vérifications de base du répertoire"""
# Existence du répertoire
if not path.exists():
result.add_error(f"Le répertoire n'existe pas: {path}")
return
# C'est bien un répertoire
if not path.is_dir():
result.add_error(f"Le chemin n'est pas un répertoire: {path}")
return
# Accessibilité
if not os.access(path, os.R_OK):
result.add_error(f"Impossible de lire le répertoire: {path}")
return
# Non-vide
try:
items = list(path.iterdir())
if not items:
result.add_error("Le répertoire est vide")
return
except PermissionError:
result.add_error(f"Permission refusée lors de l'accès à: {path}")
return
logger.debug("✅ Vérifications de base réussies")
def _check_permissions(self, path: Path, result: DirectoryCompatibilityResult):
"""Vérifie les permissions du répertoire"""
permissions = {}
# Lecture
permissions['read'] = os.access(path, os.R_OK)
# Écriture
permissions['write'] = os.access(path, os.W_OK)
# Exécution (navigation)
permissions['execute'] = os.access(path, os.X_OK)
# Propriétaire
try:
stat_info = path.stat()
permissions['is_owner'] = (stat_info.st_uid == os.getuid())
except:
permissions['is_owner'] = False
result.permissions = permissions
# Ajout des avertissements si nécessaire
if not permissions['write']:
result.add_warning("Le répertoire n'est pas accessible en écriture (renommage impossible)")
if not permissions['execute']:
result.add_error("Impossible de naviguer dans le répertoire")
result.is_compatible = False
logger.debug(f"Permissions: {permissions}")
def _check_disk_space(self, path: Path, result: DirectoryCompatibilityResult):
"""Vérifie l'espace disque disponible"""
try:
stat_info = shutil.disk_usage(path)
total = stat_info.total
free = stat_info.free
used = stat_info.used
result.disk_space = {
'total': total,
'free': free,
'used': used,
'free_percent': round((free / total) * 100, 2)
}
# Avertissement si moins de 10% d'espace libre
if result.disk_space['free_percent'] < 10:
result.add_warning(f"Moins de 10% d'espace disque disponible ({result.disk_space['free_percent']:.1f}%)")
# Erreur si moins de 1 Go
if free < 1024 * 1024 * 1024:
result.add_warning("Moins de 1 Go d'espace disque disponible")
except Exception as e:
result.add_warning(f"Impossible de vérifier l'espace disque: {e}")
logger.debug(f"Espace disque: {result.disk_space}")
def _analyze_media_structure(self, path: Path, result: DirectoryCompatibilityResult) -> bool:
"""
Analyse la structure des médias dans le répertoire
Returns:
bool: True si la structure est valide
"""
try:
# Scan des fichiers multimédia
media_files = self.file_scanner.scan_directory(path, self.video_extensions)
if not media_files:
result.add_error("Aucun fichier multimédia trouvé")
return False
result.total_episodes = len(media_files)
# Analyse de la structure des répertoires
series_found = self._detect_series_structure(path, media_files)
if not series_found:
result.add_error("Structure de séries non détectée (doit contenir des sous-répertoires par série)")
return False
result.found_series = list(series_found.keys())
# Calcul de la taille totale
total_size = 0
for file_info in media_files:
total_size += file_info['size']
result.total_size = total_size
# Validation de la qualité des fichiers
self._validate_media_files(media_files, result)
logger.debug(f"Séries trouvées: {len(result.found_series)}, Épisodes: {result.total_episodes}")
return True
except Exception as e:
result.add_error(f"Erreur lors de l'analyse des médias: {e}")
return False
def _detect_series_structure(self, path: Path, media_files: List[Dict]) -> Dict[str, List[Dict]]:
"""
Détecte la structure des séries
Returns:
Dict: Nom de série -> Liste de fichiers
"""
series_structure = {}
# Analyse de la structure de répertoires
for item in path.iterdir():
if item.is_dir():
# Vérifier si le répertoire contient des vidéos
series_videos = self.file_scanner.scan_directory(item, self.video_extensions)
if series_videos:
series_name = item.name
series_structure[series_name] = series_videos
# Si pas de sous-répertoires, essayer de grouper par nom de fichier
if not series_structure:
series_structure = self._group_flat_files(media_files)
return series_structure
def _group_flat_files(self, media_files: List[Dict]) -> Dict[str, List[Dict]]:
"""Groupe les fichiers plats par nom de série"""
import re
series_structure = {}
for file_info in media_files:
filename = file_info['path'].stem
# Tentative d'extraction du nom de série
# Format attendu: [SeriesName] S01E01 ou SeriesName S01E01
pattern = r'^\[?([^\[\]]+)\]?\s*[_-]?\s*S\d{1,2}E\d{1,2}'
match = re.search(pattern, filename, re.IGNORECASE)
if match:
series_name = match.group(1).strip()
else:
# Si pas de format SxxEyy, utiliser le début du nom
pattern = r'^([^-_]+)'
match = re.search(pattern, filename)
series_name = match.group(1).strip() if match else filename
if series_name not in series_structure:
series_structure[series_name] = []
series_structure[series_name].append(file_info)
return series_structure
def _validate_media_files(self, media_files: List[Dict], result: DirectoryCompatibilityResult):
"""Valide la qualité des fichiers multimédia"""
invalid_files = []
for file_info in media_files:
file_path = file_info['path']
file_size = file_info['size']
# Vérification de la taille
if file_size < self.min_video_size:
invalid_files.append(f"{file_path.name} (trop petit: {file_size / 1024 / 1024:.1f} Mo)")
continue
if file_size > self.max_video_size:
result.add_warning(f"{file_path.name} (taille inhabituelle: {file_size / 1024 / 1024:.1f} Mo)")
# Vérification de l'extention
if file_path.suffix.lower() not in self.video_extensions:
invalid_files.append(f"{file_path.name} (extension non supportée)")
if invalid_files:
result.add_error(f"Fichiers invalides détectés: {', '.join(invalid_files[:5])}")
if len(invalid_files) > 5:
result.add_error(f"...et {len(invalid_files) - 5} autres fichiers")
def _finalize_compatibility(self, result: DirectoryCompatibilityResult):
"""Finalise la décision de compatibilité"""
# Si on a des erreurs, c'est incompatible
if result.errors:
result.is_compatible = False
return
# Vérifications minimales
if len(result.found_series) == 0:
result.add_error("Aucune série détectée")
result.is_compatible = False
return
if result.total_episodes == 0:
result.add_error("Aucun épisode détecté")
result.is_compatible = False
return
# Si on peut écrire, c'est compatible
if result.permissions.get('write', False):
result.is_compatible = True
else:
# On peut quand même utiliser en lecture seule
result.is_compatible = True
result.add_warning("Mode lecture seule (renommage impossible)")
def get_compatibility_recommendations(self, result: DirectoryCompatibilityResult) -> List[str]:
"""Retourne des recommandations pour améliorer la compatibilité"""
recommendations = []
if result.errors:
recommendations.append("Corrigez les erreurs avant de continuer")
if not result.permissions.get('write', False):
recommendations.append("Assurez-vous d'avoir les droits d'écriture pour le renommage")
if result.disk_space.get('free_percent', 100) < 10:
recommendations.append("Libérez de l'espace disque avant de commencer")
if len(result.found_series) == 0:
recommendations.append("Organisez vos fichiers en sous-répertoires par série")
recommendations.append("Exemple: /Series/NomDeLaSaison/S01E01.mkv")
if result.total_episodes > 0 and result.total_size == 0:
recommendations.append("Certains fichiers semblent corrompus ou vides")
return recommendations

475
src/core/file_renamer.py Normal file
View File

@@ -0,0 +1,475 @@
"""
Module de renommage de fichiers - Projet de la Légion de Muyue
"""
import os
import shutil
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple
import logging
from ..models.episode import Episode, Series
from ..api.thetvdb_client import TheTVDBClient
logger = logging.getLogger(__name__)
class FileRenamer:
"""Gère le renommage des fichiers selon les normes TVDB"""
def __init__(self, config: Dict[str, Any] = None):
"""
Initialise le module de renommage
Args:
config: Configuration du renommage
"""
self.config = config or {}
# Options de renommage
self.dry_run = self.config.get("dry_run", False)
self.backup_original = self.config.get("backup_original", True)
self.include_absolute_number = self.config.get("include_absolute_number", True)
self.include_episode_title = self.config.get("include_episode_title", True)
self.include_technical_info = self.config.get("include_technical_info", True)
# Client TVDB pour les titres
self.tvdb_client = TheTVDBClient(
api_key=self.config.get("thetvdb_api_key"),
language=self.config.get("language", "fra")
)
# Cache pour les titres d'épisodes
self.title_cache = {}
# Statistiques
self.stats = {
"renamed": 0,
"skipped": 0,
"errors": 0,
"backups_created": 0
}
def prepare_rename_plan(self, series: Series) -> List[Dict[str, Any]]:
"""
Prépare un plan de renommage pour une série
Args:
series: Série à traiter
Returns:
Liste des opérations de renommage planifiées
"""
logger.info(f"Préparation du plan de renommage pour: {series.name}")
# Authentification TVDB si possible
self.tvdb_client.login()
# Recherche de la série sur TVDB
tvdb_series = None
if series.tvdb_id:
tvdb_series = self.tvdb_client.get_series_by_id(series.tvdb_id)
else:
tvdb_series = self.tvdb_client.search_best_match(series.name)
if tvdb_series:
series.tvdb_id = tvdb_series.get("id")
# Construction de la map d'épisodes TVDB si disponible
episode_map = {}
if tvdb_series and series.tvdb_id:
episode_map = self.tvdb_client.build_episode_map(series.tvdb_id)
# Préparation du plan de renommage
rename_plan = []
for episode in series.episodes:
# Titre de l'épisode
episode_title = self._get_episode_title(episode, tvdb_series, episode_map)
# Génération du nouveau nom
new_name = self._generate_new_name(episode, series.name, episode_title)
# Vérification si le renommage est nécessaire
if new_name != episode.filename:
rename_plan.append({
"episode": episode,
"old_path": episode.file_path,
"new_name": new_name,
"new_path": episode.file_path.parent / new_name,
"reason": self._get_rename_reason(episode, series.name, episode_title),
"tvdb_matched": bool(episode_title and episode_title != f"Episode {episode.episode:02d}")
})
logger.info(f"Plan de renommage: {len(rename_plan)} fichiers à renommer")
return rename_plan
def execute_rename_plan(self, rename_plan: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Exécute un plan de renommage
Args:
rename_plan: Plan de renommage à exécuter
Returns:
Résultats des opérations de renommage
"""
logger.info(f"Exécution du plan de renommage: {len(rename_plan)} opérations")
results = []
for operation in rename_plan:
result = self._rename_file(operation)
results.append(result)
if result["success"]:
self.stats["renamed"] += 1
if result["backup_created"]:
self.stats["backups_created"] += 1
else:
self.stats["errors"] += 1
logger.info(f"Plan exécuté: {self.stats['renamed']} renommés, {self.stats['errors']} erreurs")
return results
def _rename_file(self, operation: Dict[str, Any]) -> Dict[str, Any]:
"""
Renomme un fichier spécifique
Args:
operation: Détails de l'opération de renommage
Returns:
Résultat de l'opération
"""
episode = operation["episode"]
old_path = operation["old_path"]
new_path = operation["new_path"]
result = {
"episode": episode,
"old_path": old_path,
"new_path": new_path,
"success": False,
"error": None,
"backup_created": False,
"dry_run": self.dry_run
}
try:
# Vérifications préalables
if not old_path.exists():
result["error"] = "Le fichier source n'existe plus"
return result
if new_path.exists():
result["error"] = "Le fichier de destination existe déjà"
return result
if not old_path.parent.exists():
result["error"] = "Le répertoire source n'existe plus"
return result
# Vérification des permissions
if not os.access(old_path, os.R_OK):
result["error"] = "Permission de lecture refusée"
return result
if not os.access(old_path.parent, os.W_OK):
result["error"] = "Permission d'écriture refusée"
return result
# Mode dry-run: juste simulation
if self.dry_run:
result["success"] = True
result["message"] = f"[DRY-RUN] {old_path.name} -> {new_path.name}"
logger.info(f"[DRY-RUN] Renommage simulé: {old_path.name} -> {new_path.name}")
return result
# Création de la sauvegarde si demandé
backup_path = None
if self.backup_original:
backup_path = self._create_backup(old_path)
result["backup_created"] = backup_path is not None
# Renommage effectif
try:
old_path.rename(new_path)
result["success"] = True
result["message"] = f"Renommé: {old_path.name} -> {new_path.name}"
logger.info(f"Fichier renommé: {old_path.name} -> {new_path.name}")
# Mise à jour du chemin dans l'épisode
episode.file_path = new_path
except Exception as rename_error:
# Restauration de la sauvegarde si disponible
if backup_path and backup_path.exists():
try:
shutil.copy2(backup_path, old_path)
logger.info(f"Sauvegarde restaurée suite à l'erreur: {old_path}")
except:
pass
result["error"] = f"Erreur lors du renommage: {rename_error}"
logger.error(f"Erreur renommage {old_path}: {rename_error}")
except Exception as e:
result["error"] = f"Erreur lors de la préparation du renommage: {e}"
logger.error(f"Erreur préparation renommage {old_path}: {e}")
return result
def _create_backup(self, file_path: Path) -> Optional[Path]:
"""
Crée une sauvegarde du fichier
Args:
file_path: Chemin du fichier à sauvegarder
Returns:
Chemin de la sauvegarde ou None si erreur
"""
try:
# Nom de la sauvegarde avec timestamp
import datetime
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_name = f"{file_path.stem}_backup_{timestamp}{file_path.suffix}"
backup_path = file_path.parent / ".backups" / backup_name
# Création du répertoire de sauvegarde
backup_path.parent.mkdir(exist_ok=True)
# Copie du fichier
shutil.copy2(file_path, backup_path)
logger.debug(f"Sauvegarde créée: {backup_path}")
return backup_path
except Exception as e:
logger.error(f"Erreur création sauvegarde {file_path}: {e}")
return None
def _get_episode_title(self, episode: Episode, tvdb_series: Dict[str, Any],
episode_map: Dict[Tuple[int, int], Dict[str, Any]]) -> str:
"""
Récupère le titre d'un épisode depuis TVDB
Args:
episode: Épisode concerné
tvdb_series: Série TVDB
episode_map: Map des épisodes TVDB
Returns:
Titre de l'épisode
"""
# Vérification du cache
cache_key = (episode.series_name, episode.season, episode.episode)
if cache_key in self.title_cache:
return self.title_cache[cache_key]
# Récupération depuis TVDB
title = None
if tvdb_series and episode_map:
episode_key = (episode.season, episode.episode)
if episode_key in episode_map:
title = episode_map[episode_key].get("episodeName")
# Fallback: utilisation du titre existant ou titre générique
if not title:
title = episode.title if episode.title else f"Episode {episode.episode:02d}"
# Mise en cache
self.title_cache[cache_key] = title
return title
def _generate_new_name(self, episode: Episode, series_name: str, episode_title: str) -> str:
"""
Génère le nouveau nom de fichier pour un épisode
Args:
episode: Épisode à renommer
series_name: Nom de la série
episode_title: Titre de l'épisode
Returns:
Nouveau nom de fichier
"""
# Nettoyage du nom de série
clean_series = self._clean_filename(series_name)
# Construction du nom de base
if episode.special:
base_name = f"{clean_series} - S00E{episode.episode:02d}"
else:
base_name = f"{clean_series} - S{episode.season:02d}E{episode.episode:02d}"
# Numéro absolu si disponible
if self.include_absolute_number and episode.absolute_number:
base_name += f" ({episode.absolute_number})"
# Titre de l'épisode
if self.include_episode_title and episode_title:
clean_title = self._clean_filename(episode_title)
base_name += f" - {clean_title}"
# Informations techniques
if self.include_technical_info:
technical_info = self._build_technical_info(episode)
if technical_info:
base_name += f" [{technical_info}]"
return f"{base_name}{episode.file_path.suffix}"
def _clean_filename(self, name: str) -> str:
"""Nettoie une chaîne pour l'utiliser dans un nom de fichier"""
if not name:
return ""
# Suppression des caractères invalides
invalid_chars = '<>:"/\\|?*'
for char in invalid_chars:
name = name.replace(char, '')
# Remplacement des caractères problématiques
name = name.replace('\n', ' ').replace('\r', ' ')
# Normalisation des espaces
while ' ' in name:
name = name.replace(' ', ' ')
# Nettoyage au début et à la fin
name = name.strip(' -._')
return name
def _build_technical_info(self, episode: Episode) -> str:
"""Construit la chaîne d'informations techniques"""
parts = []
# Résolution
if episode.resolution:
parts.append(episode.resolution)
# Codec
if episode.codec:
parts.append(episode.codec)
# Source (extrait du nom si disponible)
source = self._extract_source_from_filename(episode.filename)
if source:
parts.append(source)
# Checksum si disponible
if episode.checksum and len(episode.checksum) >= 8:
parts.append(episode.checksum[:8])
return ' '.join(parts)
def _extract_source_from_filename(self, filename: str) -> Optional[str]:
"""Extrait la source du nom de fichier"""
import re
patterns = [
r'(BD|Blu-?Ray|BluRay)',
r'(WEB[-\s]?DL|WEBRip)',
r'(HDTV|TV)',
r'(DVD)'
]
for pattern in patterns:
match = re.search(pattern, filename, re.IGNORECASE)
if match:
return match.group(1).upper()
return None
def _get_rename_reason(self, episode: Episode, series_name: str, episode_title: str) -> str:
"""Génère la raison du renommage pour les logs"""
reasons = []
# Vérification du format SxxEyy
if not self._has_standard_episode_format(episode.filename):
reasons.append("Format SxxEyy manquant")
# Vérification du nom de série
if not self._has_correct_series_name(episode.filename, series_name):
reasons.append("Nom de série incorrect")
# Vérification du titre d'épisode
if self.include_episode_title and episode_title:
if not self._has_episode_title(episode.filename):
reasons.append("Titre d'épisode manquant")
# Vérification des informations techniques
if self.include_technical_info and not self._has_technical_info(episode.filename):
reasons.append("Informations techniques manquantes")
# Vérification de la propreté du nom
if self._has_invalid_characters(episode.filename):
reasons.append("Caractères invalides détectés")
return "; ".join(reasons) if reasons else "Formatage selon les standards TVDB"
def _has_standard_episode_format(self, filename: str) -> bool:
"""Vérifie si le nom de fichier utilise le format SxxEyy standard"""
import re
return bool(re.search(r'S\d{1,2}E\d{1,2}', filename, re.IGNORECASE))
def _has_correct_series_name(self, filename: str, series_name: str) -> bool:
"""Vérifie si le nom de série est correct"""
import re
clean_series = self._clean_filename(series_name).lower()
clean_filename = filename.lower()
return clean_series in clean_filename
def _has_episode_title(self, filename: str) -> bool:
"""Vérifie si le nom de fichier contient un titre d'épisode"""
# Heuristique: recherche de patterns typiques de titres
import re
# Patterns qui suggèrent un titre
patterns = [
r'S\d{1,2}E\d{1,2}\s*[-_]\s*[^.\[\]()]+', # S01E01 - Title
r'S\d{1,2}E\d{1,2}\s*([^.\[\]()]+)', # S01E01 Title
]
for pattern in patterns:
if re.search(pattern, filename, re.IGNORECASE):
return True
return False
def _has_technical_info(self, filename: str) -> bool:
"""Vérifie si le nom de fichier contient des informations techniques"""
import re
tech_patterns = [
r'\d{3,4}p', # 1080p, 720p
r'(H\.?264|H\.?265|x264|x265)', # Codecs
r'(BD|WEB|DVD|HDTV)' # Sources
]
for pattern in tech_patterns:
if re.search(pattern, filename, re.IGNORECASE):
return True
return False
def _has_invalid_characters(self, filename: str) -> bool:
"""Vérifie la présence de caractères invalides"""
invalid_chars = '<>:"|?*'
return any(char in filename for char in invalid_chars)
def get_stats(self) -> Dict[str, Any]:
"""Retourne les statistiques de renommage"""
return self.stats.copy()
def reset_stats(self):
"""Réinitialise les statistiques"""
self.stats = {
"renamed": 0,
"skipped": 0,
"errors": 0,
"backups_created": 0
}

239
src/core/file_scanner.py Normal file
View File

@@ -0,0 +1,239 @@
"""
Scanneur de fichiers pour AnimeLibrarian
"""
import os
import hashlib
from pathlib import Path
from typing import List, Dict, Any, Set, Optional
import logging
logger = logging.getLogger(__name__)
class FileScanner:
"""Scanne les fichiers multimédia dans les répertoires"""
def __init__(self, config: Dict[str, Any] = None):
self.config = config or {}
# Filtres de fichiers et répertoires à ignorer
self.ignore_patterns = {
# Répertoires
'.git', '.svn', '.hg', '.bzr', '__pycache__', 'node_modules', '.vscode',
'.idea', '.DS_Store', 'Thumbs.db', 'Desktop.ini', 'System Volume Information',
'$RECYCLE.BIN', 'Recycled', 'Temp', 'tmp', 'cache', 'Cache',
# Extensions de fichiers à ignorer
'.txt', '.log', '.nfo', '.sfv', '.md5', '.sha1', '.dat', '.db',
'.ini', '.conf', '.cfg', '.xml', '.json', '.yml', '.yaml',
'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.ico',
'.mp3', '.wav', '.flac', '.ogg', '.m4a', '.aac',
'.srt', '.ass', '.ssa', '.sub', '.idx', '.vtt',
'.zip', '.rar', '.7z', '.tar', '.gz', '.bz2', '.exe', '.dmg',
# Fichiers de samples
'sample', 'SAMPLE', 'Sample'
}
def scan_directory(self, directory: Path, extensions: Set[str],
recursive: bool = True, max_depth: int = 10) -> List[Dict[str, Any]]:
"""
Scanne un répertoire à la recherche de fichiers avec les extensions spécifiées
Args:
directory: Répertoire à scanner
extensions: Set d'extensions de fichiers à rechercher
recursive: Si True, scan récursivement les sous-répertoires
max_depth: Profondeur maximale de scan récursif
Returns:
Liste de dictionnaires avec informations sur chaque fichier
"""
logger.debug(f"Scan du répertoire: {directory} (extensions: {extensions})")
if not directory.exists() or not directory.is_dir():
logger.warning(f"Répertoire invalide: {directory}")
return []
media_files = []
try:
if recursive:
for depth, root, dirs, files in self._walk_directory(directory, max_depth):
# Filtrage des répertoires à ignorer
dirs[:] = [d for d in dirs if not self._should_ignore(d, is_directory=True)]
# Analyse des fichiers
for filename in files:
if self._should_ignore(filename):
continue
file_path = Path(root) / filename
if file_path.suffix.lower() in extensions:
file_info = self._analyze_file(file_path)
if file_info:
media_files.append(file_info)
else:
for item in directory.iterdir():
if item.is_file() and not self._should_ignore(item.name):
if item.suffix.lower() in extensions:
file_info = self._analyze_file(item)
if file_info:
media_files.append(file_info)
except PermissionError:
logger.error(f"Permission refusée lors du scan de: {directory}")
except Exception as e:
logger.error(f"Erreur lors du scan de {directory}: {e}")
logger.debug(f"Trouvé {len(media_files)} fichiers multimédia")
return media_files
def _walk_directory(self, directory: Path, max_depth: int):
"""Générateur pour parcourir les répertoires avec contrôle de profondeur"""
directory = directory.resolve()
for root, dirs, files in os.walk(directory):
depth = Path(root).relative_to(directory).parts
if len(depth) > max_depth:
dirs[:] = [] # Ne pas explorer plus profondément
continue
yield (len(depth), root, dirs, files)
def _should_ignore(self, name: str, is_directory: bool = False) -> bool:
"""Détermine si un fichier/répertoire doit être ignoré"""
# Ignorer les fichiers cachés
if name.startswith('.') and not name.startswith('._'):
return True
# Ignorer les fichiers système
if name.lower() in {'system volume information', '$recycle.bin', 'recycled'}:
return True
# Vérification des patterns d'ignore
for pattern in self.ignore_patterns:
if pattern.lower() in name.lower():
return True
return False
def _analyze_file(self, file_path: Path) -> Optional[Dict[str, Any]]:
"""Analyse un fichier et retourne ses métadonnées"""
try:
stat_info = file_path.stat()
file_info = {
'path': file_path,
'name': file_path.name,
'stem': file_path.stem,
'suffix': file_path.suffix.lower(),
'size': stat_info.st_size,
'created': stat_info.st_ctime,
'modified': stat_info.st_mtime,
'is_readable': os.access(file_path, os.R_OK),
'is_writable': os.access(file_path, os.W_OK)
}
# Calcul du checksum si nécessaire (optionnel pour les gros fichiers)
if stat_info.st_size < 1024 * 1024 * 100: # < 100 Mo
try:
file_info['checksum'] = self._calculate_checksum(file_path)
except:
file_info['checksum'] = None
return file_info
except Exception as e:
logger.error(f"Erreur lors de l'analyse du fichier {file_path}: {e}")
return None
def _calculate_checksum(self, file_path: Path, algorithm: str = 'md5') -> str:
"""Calcule le checksum d'un fichier"""
hash_func = hashlib.new(algorithm)
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(8192), b''):
hash_func.update(chunk)
return hash_func.hexdigest()
def get_duplicates(self, files: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]:
"""Identifie les fichiers en double basés sur le nom et la taille"""
# Groupement par nom et taille
groups = {}
for file_info in files:
key = (file_info['name'], file_info['size'])
if key not in groups:
groups[key] = []
groups[key].append(file_info)
# Filtrage pour ne garder que les groupes avec doublons
duplicates = [group for group in groups.values() if len(group) > 1]
return duplicates
def get_checksum_duplicates(self, files: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]:
"""Identifie les fichiers en double basés sur le checksum"""
# S'assurer que tous les fichiers ont un checksum
for file_info in files:
if 'checksum' not in file_info or not file_info['checksum']:
file_info['checksum'] = self._calculate_checksum(file_info['path'])
# Groupement par checksum
groups = {}
for file_info in files:
checksum = file_info['checksum']
if checksum:
if checksum not in groups:
groups[checksum] = []
groups[checksum].append(file_info)
# Filtrage pour ne garder que les groupes avec doublons
duplicates = [group for group in groups.values() if len(group) > 1]
return duplicates
def get_file_stats(self, files: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Retourne des statistiques sur les fichiers"""
if not files:
return {
'total_files': 0,
'total_size': 0,
'avg_size': 0,
'max_size': 0,
'min_size': 0,
'extensions': {},
'total_readable': 0,
'total_writable': 0
}
total_size = sum(f['size'] for f in files)
avg_size = total_size // len(files)
max_size = max(f['size'] for f in files)
min_size = min(f['size'] for f in files)
# Compte par extension
extensions = {}
for file_info in files:
ext = file_info['suffix']
extensions[ext] = extensions.get(ext, 0) + 1
# Permissions
readable = sum(1 for f in files if f['is_readable'])
writable = sum(1 for f in files if f['is_writable'])
return {
'total_files': len(files),
'total_size': total_size,
'avg_size': avg_size,
'max_size': max_size,
'min_size': min_size,
'extensions': extensions,
'total_readable': readable,
'total_writable': writable
}

352
src/core/media_detector.py Normal file
View File

@@ -0,0 +1,352 @@
"""
Détecteur de médias et analyseur de fichiers multimédia
"""
import os
import re
import subprocess
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
import logging
logger = logging.getLogger(__name__)
class MediaDetector:
"""Détecte et analyse les fichiers multimédia"""
def __init__(self, config: Dict[str, Any] = None):
self.config = config or {}
self.ffprobe_path = self._find_ffprobe()
# Patterns pour détecter les numéros d'épisodes
self.episode_patterns = [
# S01E01, S1E1
r'(?:S(\d{1,2})\s*[-_.]?\s*)?E(\d{1,2})',
# 01, Episode 01, Ep 01
r'(?:Episode|Ep)\s*(\d{1,3})',
# Numbers in brackets: [01], (01)
r'[\[({](\d{1,3})[\])}]',
# Standalone numbers at end
r'(\d{1,3})(?:\.\w+)?$',
# Pattern for absolute numbering: 001-100
r'(\d{3})-(?:\d{3})'
]
# Patterns pour détecter les informations techniques
self.resolution_patterns = [
r'(\d{3,4}p)', # 1080p, 720p, 480p
r'(4K|UHD)', # 4K, UHD
r'(HD|SD)', # HD, SD
]
self.codec_patterns = [
r'(H\.?264|H\.?265|AVC|HEVC|x264|x265)',
r'(XVID|DIVX)',
r'(VP9|AV1)',
]
self.source_patterns = [
r'(BD|Blu-?Ray)',
r'(WEB|WEB-?DL|WEBRip)',
r'(DVD)',
r'(HDTV|TV)',
]
def _find_ffprobe(self) -> Optional[str]:
"""Cherche le chemin de ffprobe"""
try:
result = subprocess.run(
['ffprobe', '-version'],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
return 'ffprobe'
except:
pass
# Chemins courants
common_paths = [
'/usr/bin/ffprobe',
'/usr/local/bin/ffprobe',
'/opt/homebrew/bin/ffprobe',
]
for path in common_paths:
if os.path.exists(path):
return path
return None
def analyze_media_file(self, file_path: Path) -> Dict[str, Any]:
"""
Analyse un fichier multimédia et extrait ses métadonnées
Returns:
Dict contenant les métadonnées du fichier
"""
logger.debug(f"Analyse du fichier: {file_path}")
metadata = {
'file_path': file_path,
'duration': None,
'resolution': None,
'codec': None,
'bitrate': None,
'audio_tracks': 0,
'subtitle_tracks': 0,
'file_size': 0,
'is_valid': False
}
# Taille du fichier
try:
metadata['file_size'] = file_path.stat().st_size
except:
pass
# Analyse avec ffprobe si disponible
if self.ffprobe_path:
metadata.update(self._analyze_with_ffprobe(file_path))
# Analyse basée sur le nom de fichier
filename_info = self._analyze_filename(file_path.name)
metadata.update(filename_info)
# Validation du fichier
metadata['is_valid'] = self._validate_file(metadata)
return metadata
def _analyze_with_ffprobe(self, file_path: Path) -> Dict[str, Any]:
"""Analyse un fichier avec ffprobe"""
metadata = {}
try:
cmd = [
self.ffprobe_path,
'-v', 'quiet',
'-print_format', 'json',
'-show_format',
'-show_streams',
str(file_path)
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
import json
probe_data = json.loads(result.stdout)
# Informations du format
format_info = probe_data.get('format', {})
metadata['duration'] = float(format_info.get('duration', 0))
metadata['bitrate'] = int(format_info.get('bit_rate', 0))
# Analyse des streams
video_streams = []
audio_streams = []
subtitle_streams = []
for stream in probe_data.get('streams', []):
if stream.get('codec_type') == 'video':
video_streams.append(stream)
elif stream.get('codec_type') == 'audio':
audio_streams.append(stream)
elif stream.get('codec_type') == 'subtitle':
subtitle_streams.append(stream)
metadata['audio_tracks'] = len(audio_streams)
metadata['subtitle_tracks'] = len(subtitle_streams)
# Informations vidéo
if video_streams:
video = video_streams[0]
metadata['codec'] = video.get('codec_name', '').upper()
width = video.get('width')
height = video.get('height')
if width and height:
metadata['resolution'] = f"{height}p"
# Détection du 4K
if height >= 2160:
metadata['resolution'] = "4K"
except subprocess.TimeoutExpired:
logger.warning(f"Timeout lors de l'analyse de {file_path}")
except Exception as e:
logger.warning(f"Erreur ffprobe pour {file_path}: {e}")
return metadata
def _analyze_filename(self, filename: str) -> Dict[str, Any]:
"""Extrait les informations du nom de fichier"""
metadata = {}
# Nettoyage du nom de fichier (sans l'extension)
clean_name = Path(filename).stem.lower()
# Recherche des informations techniques
for pattern in self.resolution_patterns:
match = re.search(pattern, clean_name, re.IGNORECASE)
if match and 'resolution' not in metadata:
metadata['resolution'] = match.group(1).upper()
break
for pattern in self.codec_patterns:
match = re.search(pattern, clean_name, re.IGNORECASE)
if match and 'codec' not in metadata:
metadata['codec'] = match.group(1).upper()
break
# Détection de la source
for pattern in self.source_patterns:
match = re.search(pattern, clean_name, re.IGNORECASE)
if match and 'source' not in metadata:
metadata['source'] = match.group(1).upper()
break
return metadata
def _validate_file(self, metadata: Dict[str, Any]) -> bool:
"""Valide si le fichier semble être un épisode valide"""
# Taille minimale (50 Mo)
if metadata.get('file_size', 0) < 50 * 1024 * 1024:
return False
# Durée minimale (5 minutes)
if metadata.get('duration', 0) and metadata['duration'] < 300:
return False
# Codec vidéo
valid_codecs = {'H264', 'H265', 'AVC', 'HEVC', 'X264', 'X265', 'VP9', 'AV1'}
codec = metadata.get('codec', '').upper()
if codec and codec not in valid_codecs:
return False
return True
def extract_episode_info(self, filename: str) -> Dict[str, Any]:
"""
Extrait les informations d'épisode du nom de fichier
Returns:
Dict avec season, episode, title, etc.
"""
episode_info = {
'season': 1, # Par défaut
'episode': None,
'title': None,
'special': False,
'absolute_number': None
}
clean_name = Path(filename).stem
# Tentatives avec différents patterns
for pattern in self.episode_patterns:
match = re.search(pattern, clean_name, re.IGNORECASE)
if match:
groups = match.groups()
# S01E01 format
if len(groups) >= 2 and groups[0] and groups[1]:
episode_info['season'] = int(groups[0])
episode_info['episode'] = int(groups[1])
# E01 or Episode 01 format
elif len(groups) >= 1 and groups[0]:
episode_num = int(groups[0])
# Si c'est un grand nombre, c'est probablement un épisode spécial
if episode_num > 100:
episode_info['episode'] = episode_num
episode_info['special'] = True
else:
episode_info['episode'] = episode_num
break
# Extraction du titre si possible
episode_info['title'] = self._extract_title(clean_name, episode_info)
return episode_info
def _extract_title(self, filename: str, episode_info: Dict[str, Any]) -> Optional[str]:
"""Extrait le titre de l'épisode du nom de fichier"""
# Suppression des numéros d'épisode et patterns techniques
title_part = filename
# Suppression du pattern SxxEyy
title_part = re.sub(r'S\d{1,2}\s*[-_.]?\s*E\d{1,2}', '', title_part, flags=re.IGNORECASE)
# Suppression du pattern Episode xx
title_part = re.sub(r'(?:Episode|Ep)\s*\d{1,3}', '', title_part, flags=re.IGNORECASE)
# Suppression des patterns techniques
for pattern in self.resolution_patterns + self.codec_patterns + self.source_patterns:
title_part = re.sub(pattern, '', title_part, flags=re.IGNORECASE)
# Suppression des crochets et parenthèses vides
title_part = re.sub(r'[\[\](){}]', '', title_part)
# Nettoyage final
title_part = re.sub(r'^[-_\s]+|[-_\s]+$', '', title_part) # Suppression au début/fin
title_part = re.sub(r'[-_\s]{2,}', ' ', title_part) # Remplacement multiples par un seul
return title_part.strip() if title_part.strip() else None
def get_series_name_from_filename(self, filename: str) -> str:
"""Extrait le nom de la série du nom de fichier"""
clean_name = Path(filename).stem
# Pattern pour le nom avant le numéro d'épisode
patterns = [
r'^\[?([^\[\]]+?)\]?\s*[-_.]?\s*S\d{1,2}E\d{1,2}',
r'^([^-_\[\]]+?)(?:\s*[-_.]\s*)?(?:S\d{1,2}E\d{1,2}|Episode\s*\d{1,3}|E\d{1,3})',
r'^([^-_()]+)(?:\s*[-_.]\s*)?\d{1,3}',
]
for pattern in patterns:
match = re.search(pattern, clean_name, re.IGNORECASE)
if match:
series_name = match.group(1).strip()
# Nettoyage du nom
series_name = re.sub(r'[^\w\s-]', '', series_name)
series_name = re.sub(r'\s+', ' ', series_name).strip()
return series_name
# Fallback: retourne le début du nom
words = clean_name.split()
if len(words) >= 2:
return ' '.join(words[:2])
return clean_name
def batch_analyze(self, file_paths: List[Path]) -> List[Dict[str, Any]]:
"""Analyse par lots plusieurs fichiers multimédia"""
results = []
for file_path in file_paths:
try:
metadata = self.analyze_media_file(file_path)
results.append(metadata)
except Exception as e:
logger.error(f"Erreur lors de l'analyse de {file_path}: {e}")
results.append({
'file_path': file_path,
'error': str(e),
'is_valid': False
})
return results

1
src/models/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Package models

175
src/models/episode.py Normal file
View File

@@ -0,0 +1,175 @@
"""
Modèle de données pour les séries et épisodes
"""
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from pathlib import Path
import datetime
@dataclass
class Episode:
"""Représente un épisode d'anime"""
file_path: Path
series_name: str
season: int = 1
episode: int = 0
title: Optional[str] = None
absolute_number: Optional[int] = None
special: bool = False
file_size: int = 0
duration: Optional[float] = None
checksum: Optional[str] = None
resolution: Optional[str] = None
codec: Optional[str] = None
verified: bool = False
metadata: Dict[str, Any] = field(default_factory=dict)
@property
def season_episode_str(self) -> str:
"""Retourne la représentation SXXEYY"""
if self.special:
return f"S00E{self.episode:02d}"
return f"S{self.season:02d}E{self.episode:02d}"
@property
def filename(self) -> str:
"""Retourne le nom de fichier actuel"""
return self.file_path.name
def get_recommended_name(self, series_name: str, include_absolute: bool = False) -> str:
"""Génère le nom recommandé selon le format TVDB"""
# Nettoyage du nom de série
clean_series = self._clean_series_name(series_name)
# Construction du nom de base
if self.special:
base_name = f"{clean_series} - S00E{self.episode:02d}"
else:
base_name = f"{clean_series} - S{self.season:02d}E{self.episode:02d}"
if include_absolute and self.absolute_number:
base_name += f" ({self.absolute_number})"
# Ajout du titre si disponible
if self.title:
clean_title = self._clean_title(self.title)
base_name += f" - {clean_title}"
# Ajout des informations techniques
suffix = ""
if self.resolution:
suffix += f" [{self.resolution}]"
if self.codec:
suffix += f" [{self.codec}]"
if self.checksum:
suffix += f" [{self.checksum}]"
return f"{base_name}{suffix}{self.file_path.suffix}"
def _clean_series_name(self, name: str) -> str:
"""Nettoie le nom de série pour le formatage"""
# Suppression des caractères spéciaux et normalisation
import re
name = re.sub(r'[<>:"/\\|?*]', '', name)
name = re.sub(r'\s+', ' ', name).strip()
return name
def _clean_title(self, title: str) -> str:
"""Nettoie le titre d'épisode pour le formatage"""
import re
title = re.sub(r'[<>:"/\\|?*]', '', title)
title = re.sub(r'\s+', ' ', title).strip()
return title
@dataclass
class Series:
"""Représente une série d'anime"""
name: str
directory: Path
episodes: List[Episode] = field(default_factory=list)
tvdb_id: Optional[int] = None
anidb_id: Optional[int] = None
total_episodes: Optional[int] = None
special_count: int = 0
languages: List[str] = field(default_factory=list)
metadata: Dict[str, Any] = field(default_factory=dict)
last_scan: Optional[datetime.datetime] = None
def add_episode(self, episode: Episode):
"""Ajoute un épisode à la série"""
self.episodes.append(episode)
def get_episode_by_number(self, season: int, episode: int) -> Optional[Episode]:
"""Récupère un épisode par son numéro"""
for ep in self.episodes:
if ep.season == season and ep.episode == episode:
return ep
return None
def get_specials(self) -> List[Episode]:
"""Retourne la liste des épisodes spéciaux"""
return [ep for ep in self.episodes if ep.special]
def get_regular_episodes(self) -> List[Episode]:
"""Retourne la liste des épisodes réguliers"""
return [ep for ep in self.episodes if not ep.special]
def check_completeness(self) -> Dict[str, Any]:
"""Vérifie la complétude de la série"""
regular_episodes = self.get_regular_episodes()
episode_numbers = [ep.episode for ep in regular_episodes]
result = {
'total_found': len(regular_episodes),
'specials_found': len(self.get_specials()),
'missing_episodes': [],
'duplicate_episodes': [],
'is_complete': False
}
if self.total_episodes:
expected = set(range(1, self.total_episodes + 1))
found = set(episode_numbers)
result['missing_episodes'] = list(expected - found)
# Vérification des doublons
from collections import Counter
counts = Counter(episode_numbers)
result['duplicate_episodes'] = [num for num, count in counts.items() if count > 1]
result['is_complete'] = (
len(result['missing_episodes']) == 0 and
len(result['duplicate_episodes']) == 0
)
return result
@dataclass
class DirectoryCompatibilityResult:
"""Résultat de la vérification de compatibilité de répertoire"""
is_compatible: bool = False
path: Path = field(default_factory=Path)
errors: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
found_series: List[str] = field(default_factory=list)
total_episodes: int = 0
total_size: int = 0
permissions: Dict[str, bool] = field(default_factory=dict)
disk_space: Dict[str, int] = field(default_factory=dict)
def add_error(self, error: str):
"""Ajoute une erreur"""
self.errors.append(error)
self.is_compatible = False
def add_warning(self, warning: str):
"""Ajoute un avertissement"""
self.warnings.append(warning)
def summary(self) -> str:
"""Retourne un résumé du résultat"""
status = "✅ Compatible" if self.is_compatible else "❌ Incompatible"
return f"{status} - {len(self.found_series)} séries, {self.total_episodes} épisodes"

390
src/ui/__init__.py Normal file
View File

@@ -0,0 +1,390 @@
"""
Interface utilisateur en ligne de commande - Projet de la Légion de Muyue
"""
import sys
import os
from pathlib import Path
from typing import Dict, Any, List, Optional
import logging
from .core import AnimeLibrarianCore
logger = logging.getLogger(__name__)
class AnimeLibrarianUI:
"""Interface utilisateur interactive pour AnimeLibrarian"""
def __init__(self, core: AnimeLibrarianCore):
"""
Initialise l'interface utilisateur
Args:
core: Cœur de l'application
"""
self.core = core
self.current_series_list = []
self.selected_series = []
logger.info("Interface utilisateur initialisée")
def run(self, preselected_directory: str = None):
"""
Lance l'interface interactive
Args:
preselected_directory: Répertoire pré-sélectionné (contourne la sélection)
"""
self._print_banner()
try:
# Étape 1: Sélection du répertoire
directory = self._select_directory(preselected_directory)
if not directory:
return
# Étape 2: Scan des séries
self._scan_series(directory)
if not self.current_series_list:
print("❌ Aucune série trouvée dans ce répertoire.")
return
# Étape 3: Sélection des séries
self._select_series()
if not self.selected_series:
print("❌ Aucune série sélectionnée.")
return
# Étape 4: Configuration des opérations
operations = self._configure_operations()
# Étape 5: Exécution des opérations
self._execute_operations(operations)
# Étape 6: Rapport final
self._generate_report()
except KeyboardInterrupt:
print("\n\n⚠️ Opération annulée par l'utilisateur.")
except Exception as e:
print(f"\n❌ Erreur inattendue: {e}")
logger.error(f"Erreur UI: {e}")
def _print_banner(self):
"""Affiche la bannière de l'application"""
print("\n" + "="*60)
print("🎬 AnimeLibrarian - Organisation de collections d'anime")
print("="*60)
print("📋 Projet développé par la Légion de Muyue")
print("="*60)
print()
def _select_directory(self, preselected_directory: str = None) -> Optional[str]:
"""Sélection du répertoire contenant les séries"""
if preselected_directory:
directory = preselected_directory
print(f"📁 Répertoire pré-sélectionné: {directory}")
else:
directory = input("📁 Entrez le chemin du répertoire contenant vos séries: ").strip()
if not directory:
print("❌ Aucun répertoire spécifié.")
return None
# Vérification de compatibilité
print(f"\n🔍 Vérification de la compatibilité du répertoire...")
result = self.core.check_directory_compatibility(directory)
if not result.get("is_compatible"):
print("❌ Le répertoire n'est pas compatible:")
for error in result.get("errors", []):
print(f"{error}")
# Affichage des recommandations
recommendations = self.core.directory_checker.get_compatibility_recommendations(
type('Result', (), result)()
)
if recommendations:
print("\n💡 Recommandations:")
for rec in recommendations:
print(f"{rec}")
return None
print("✅ Le répertoire est compatible!")
# Résumé du répertoire
print(f" 📊 Séries trouvées: {len(result.get('found_series', []))}")
print(f" 📺 Épisodes trouvés: {result.get('total_episodes', 0)}")
print(f" 💾 Taille totale: {self._format_size(result.get('total_size', 0))}")
return directory
def _scan_series(self, directory: str):
"""Scan des séries dans le répertoire"""
print(f"\n🔍 Scan des séries en cours...")
try:
self.current_series_list = self.core.scan_series(directory)
print(f"{len(self.current_series_list)} séries trouvées!")
except Exception as e:
print(f"❌ Erreur lors du scan: {e}")
raise
def _select_series(self):
"""Sélection des séries à traiter"""
print("\n📋 Séries disponibles:")
print("-" * 60)
for i, series in enumerate(self.current_series_list):
status = "" if series["is_complete"] else "⚠️"
episodes = series["total_episodes"]
size = self._format_size(series["total_size"])
print(f"{i+1:2d}. {status} {series['name']}")
print(f" 📺 {episodes} épisodes • 💾 {size}")
# Afficher les problèmes si non complète
if not series["is_complete"]:
if series["completeness"]["missing_episodes"]:
print(f" ⚠️ Manque: {series['completeness']['missing_episodes']}")
if series["completeness"]["duplicate_episodes"]:
print(f" ⚠️ Doublons: {series['completeness']['duplicate_episodes']}")
print("\n" + "-"*60)
# Sélection
selection = input("🎯 Choisissez les séries à traiter (ex: 1,3,5-8, * pour tout): ").strip()
if not selection:
self.selected_series = []
return
# Traitement de la sélection
indices = self._parse_selection(selection, len(self.current_series_list))
if not indices:
print("❌ Sélection invalide.")
self.selected_series = []
return
# Conversion en objets séries
self.selected_series = [
self.current_series_list[i] for i in indices
]
print(f"{len(self.selected_series)} série(s) sélectionnée(s):")
for series in self.selected_series:
print(f"{series['name']}")
def _configure_operations(self) -> Dict[str, bool]:
"""Configuration des opérations à effectuer"""
print("\n⚙️ Configuration des opérations:")
print("-" * 40)
operations = {}
# Vérification des numéros d'épisodes
if self._confirm_operation("Vérifier les numéros d'épisodes avec trace.moe?"):
operations["verify_episodes"] = True
else:
operations["verify_episodes"] = False
# Vérification de l'intégrité
if self._confirm_operation("Vérifier l'intégrité des fichiers?"):
operations["verify_integrity"] = True
else:
operations["verify_integrity"] = False
# Renommage
if self._confirm_operation("Renommer les fichiers selon les standards TVDB?"):
operations["rename_files"] = True
# Options de renommage
if self._confirm_operation(" • Créer des sauvegardes avant renommage? (recommandé)"):
operations["backup_files"] = True
else:
operations["backup_files"] = False
else:
operations["rename_files"] = False
operations["backup_files"] = False
return operations
def _execute_operations(self, operations: Dict[str, bool]):
"""Exécute les opérations configurées"""
print("\n🚀 Exécution des opérations...")
indices = [self.current_series_list.index(series) for series in self.selected_series]
# 1. Vérification des épisodes
if operations.get("verify_episodes"):
print(f"\n🔍 Vérification des numéros d'épisodes...")
try:
results = self.core.verify_episodes_numbers(indices)
self._display_episode_verification_results(results)
except Exception as e:
print(f"❌ Erreur lors de la vérification des épisodes: {e}")
# 2. Vérification d'intégrité
if operations.get("verify_integrity"):
print(f"\n🛡️ Vérification de l'intégrité des fichiers...")
try:
results = self.core.verify_files_integrity(indices)
self._display_integrity_results(results)
except Exception as e:
print(f"❌ Erreur lors de la vérification d'intégrité: {e}")
# 3. Renommage
if operations.get("rename_files"):
print(f"\n📝 Renommage des fichiers...")
# Configuration du renommage
self.core.file_renamer.backup_original = operations.get("backup_files", True)
# Confirmation finale
if self._confirm_operation("⚠️ Confirmer le renommage des fichiers?"):
try:
results = self.core.rename_files(indices, dry_run=False)
self._display_rename_results(results)
except Exception as e:
print(f"❌ Erreur lors du renommage: {e}")
else:
print("❌ Renommage annulé.")
def _display_episode_verification_results(self, results: Dict[str, Any]):
"""Affiche les résultats de vérification des épisodes"""
print(f"{results['series_verified']} séries vérifiées")
print(f"📺 {results['episodes_verified']} épisodes vérifiés")
for series_result in results["verification_results"]:
series_name = series_result["series_name"]
summary = series_result["summary"]
print(f"\n📋 {series_name}:")
print(f" ✅ Correspondances: {summary['matches']}/{summary['total']}")
print(f" 📊 Taux de réussite: {summary['match_rate']:.1%}")
if summary['mismatches'] > 0:
print(f" ⚠️ Incohérences: {summary['mismatches']}")
def _display_integrity_results(self, results: Dict[str, Any]):
"""Affiche les résultats de vérification d'intégrité"""
print(f"📊 Fichiers vérifiés: {results['files_checked']}")
print(f"✅ Fichiers valides: {results['valid_files']}")
if results["invalid_files"]:
print(f"❌ Fichiers invalides: {len(results['invalid_files'])}")
for file_info in results["invalid_files"][:5]: # Limite à 5
print(f"{file_info['filename']}: {file_info['issue']}")
if len(results["invalid_files"]) > 5:
print(f" ...et {len(results['invalid_files']) - 5} autres fichiers")
if results["duplicates"]:
print(f"🔄 Doublons détectés: {len(results['duplicates'])}")
for dup_group in results["duplicates"][:3]: # Limite à 3
print(f"{', '.join(dup_group)}")
def _display_rename_results(self, results: Dict[str, Any]):
"""Affiche les résultats du renommage"""
stats = results["stats"]
print(f"📝 Renommage terminé:")
print(f" ✅ Fichiers renommés: {stats['renamed']}")
print(f" 💾 Sauvegardes créées: {stats['backups_created']}")
print(f" ❌ Erreurs: {stats['errors']}")
if results["rename_plan"]:
print(f"\n📋 Plan de renommage ({len(results['rename_plan'])} fichiers):")
for result in results["rename_results"][:5]: # Limite à 5
if result["success"]:
print(f"{result['old_path'].name}{result['new_path'].name}")
else:
print(f"{result['old_path'].name}: {result['error']}")
if len(results["rename_results"]) > 5:
print(f" ...et {len(results['rename_results']) - 5} autres fichiers")
def _generate_report(self):
"""Génère un rapport final"""
print("\n" + "="*60)
print("📊 RAPPORT FINAL - AnimeLibrarian")
print("="*60)
# Statistiques globales
print(f"🎯 Séries traitées: {len(self.selected_series)}")
total_episodes = sum(series["total_episodes"] for series in self.selected_series)
print(f"📺 Épisodes analysés: {total_episodes}")
total_size = sum(series["total_size"] for series in self.selected_series)
print(f"💾 Taille totale: {self._format_size(total_size)}")
# Statistiques de l'application
status = self.core.get_application_status()
print(f"🔗 TVDB configuré: {'Oui' if status['tvdb_configured'] else 'Non'}")
print(f"🔍 Trace.moe configuré: {'Oui' if status['trace_moe_configured'] else 'Non'}")
print("\n✅ Opération terminée avec succès!")
print("Merci d'utiliser AnimeLibrarian - Projet de la Légion de Muyue")
print("="*60)
def _confirm_operation(self, message: str) -> bool:
"""Demande confirmation à l'utilisateur"""
response = input(f"{message} (o/n): ").strip().lower()
return response in ('o', 'oui', 'y', 'yes')
def _parse_selection(self, selection: str, max_index: int) -> List[int]:
"""Parse une sélection de séries (1,3,5-8, *)"""
if selection.strip() == '*':
return list(range(max_index))
indices = set()
parts = selection.split(',')
for part in parts:
part = part.strip()
if '-' in part:
# Range (5-8)
try:
start, end = part.split('-', 1)
start = int(start.strip())
end = int(end.strip())
# Validation
if 1 <= start <= max_index and 1 <= end <= max_index:
indices.update(range(start - 1, end))
except ValueError:
continue
else:
# Single number
try:
idx = int(part)
if 1 <= idx <= max_index:
indices.add(idx - 1)
except ValueError:
continue
return sorted(indices)
def _format_size(self, size_bytes: int) -> str:
"""Formate une taille en octets en format lisible"""
if not size_bytes:
return "0 B"
units = ["B", "KB", "MB", "GB", "TB"]
unit_index = 0
size = float(size_bytes)
while size >= 1024 and unit_index < len(units) - 1:
size /= 1024
unit_index += 1
if unit_index == 0:
return f"{int(size)} {units[unit_index]}"
else:
return f"{size:.1f} {units[unit_index]}"

16
src/utils/__init__.py Normal file
View File

@@ -0,0 +1,16 @@
"""
Utils et helpers pour AnimeLibrarian - Projet de la Légion de Muyue
"""
from .config import load_config, save_config, get_config_template, validate_config, setup_directories, mask_sensitive_data
from .logging import setup_logging
__all__ = [
'load_config',
'save_config',
'get_config_template',
'validate_config',
'setup_directories',
'mask_sensitive_data',
'setup_logging'
]

294
src/utils/config.py Normal file
View File

@@ -0,0 +1,294 @@
"""
Fichier de configuration pour AnimeLibrarian - Projet de la Légion de Muyue
"""
import json
import os
from pathlib import Path
from typing import Dict, Any, Optional
import logging
logger = logging.getLogger(__name__)
DEFAULT_CONFIG = {
# Configuration générale
"language": "fra",
"log_level": "INFO",
"temp_directory": "/tmp/animelibrarian",
# Configuration TVDB
"thetvdb_api_key": None,
"thetvdb_base_url": "https://api.thetvdb.com",
# Configuration trace.moe
"trace_moe_api_key": None,
"trace_moe_base_url": "https://api.trace.moe",
"trace_moe_rate_limit": 1.0,
"trace_moe_timeout": 30,
"trace_moe_max_retries": 3,
# Configuration du scan de fichiers
"video_extensions": [".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".webm",
".m4v", ".mpg", ".mpeg", ".3gp", ".ts", ".m2ts", ".ogv"],
"max_scan_depth": 10,
"ignore_patterns": [".git", ".svn", ".hg", "__pycache__", "node_modules",
".vscode", ".idea", "Thumbs.db", ".DS_Store"],
# Configuration de détection multimédia
"min_video_size": 50 * 1024 * 1024, # 50 Mo
"max_video_size": 50 * 1024 * 1024 * 1024, # 50 Go
"min_video_duration": 300, # 5 minutes
# Configuration du renommage
"backup_original": True,
"backup_directory": ".backups",
"include_absolute_number": True,
"include_episode_title": True,
"include_technical_info": True,
"dry_run": False,
# Configuration du cache
"enable_cache": True,
"cache_directory": ".cache",
"cache_expiry_hours": 24,
# Configuration de la compatibilité
"min_free_space_percent": 5,
"min_free_space_bytes": 1024 * 1024 * 1024, # 1 Go
# Configuration de l'interface
"max_display_items": 20,
"confirm_operations": True
}
def load_config(config_path: str = "config.json") -> Dict[str, Any]:
"""
Charge la configuration depuis un fichier
Args:
config_path: Chemin vers le fichier de configuration
Returns:
Dictionnaire de configuration
"""
config = DEFAULT_CONFIG.copy()
if not config_path:
return config
config_file = Path(config_path)
if not config_file.exists():
logger.info(f"Fichier de configuration non trouvé: {config_path}")
logger.info("Utilisation de la configuration par défaut")
save_config(config, config_path)
return config
try:
with open(config_file, 'r', encoding='utf-8') as f:
user_config = json.load(f)
# Fusion avec la configuration par défaut
config.update(user_config)
logger.info(f"Configuration chargée depuis: {config_path}")
except json.JSONDecodeError as e:
logger.error(f"Erreur de format JSON dans {config_path}: {e}")
logger.info("Utilisation de la configuration par défaut")
except Exception as e:
logger.error(f"Erreur lors du chargement de la configuration: {e}")
logger.info("Utilisation de la configuration par défaut")
return config
def save_config(config: Dict[str, Any], config_path: str = "config.json"):
"""
Sauvegarde la configuration dans un fichier
Args:
config: Configuration à sauvegarder
config_path: Chemin vers le fichier de configuration
"""
try:
config_file = Path(config_path)
# Création du répertoire parent si nécessaire
config_file.parent.mkdir(parents=True, exist_ok=True)
with open(config_file, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2, ensure_ascii=False)
logger.info(f"Configuration sauvegardée dans: {config_path}")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde de la configuration: {e}")
def get_config_template() -> str:
"""Retourne un modèle de configuration avec commentaires"""
template = """{
// Configuration générale
"language": "fra",
"log_level": "INFO",
"temp_directory": "/tmp/animelibrarian",
// Configuration TVDB - Obtenez votre clé API sur https://thetvdb.com/
"thetvdb_api_key": null,
"thetvdb_base_url": "https://api.thetvdb.com",
// Configuration trace.moe - Clé API optionnelle pour des limites plus élevées
"trace_moe_api_key": null,
"trace_moe_base_url": "https://api.trace.moe",
"trace_moe_rate_limit": 1.0,
"trace_moe_timeout": 30,
"trace_moe_max_retries": 3,
// Extensions vidéo supportées
"video_extensions": [
".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", ".webm",
".m4v", ".mpg", ".mpeg", ".3gp", ".ts", ".m2ts", ".ogv"
],
// Configuration du scan
"max_scan_depth": 10,
"ignore_patterns": [
".git", ".svn", ".hg", "__pycache__", "node_modules",
".vscode", ".idea", "Thumbs.db", ".DS_Store"
],
// Configuration de détection multimédia
"min_video_size": 52428800,
"max_video_size": 53687091200,
"min_video_duration": 300,
// Configuration du renommage
"backup_original": true,
"backup_directory": ".backups",
"include_absolute_number": true,
"include_episode_title": true,
"include_technical_info": true,
"dry_run": false,
// Configuration du cache
"enable_cache": true,
"cache_directory": ".cache",
"cache_expiry_hours": 24,
// Configuration de la compatibilité
"min_free_space_percent": 5,
"min_free_space_bytes": 1073741824,
// Configuration de l'interface
"max_display_items": 20,
"confirm_operations": true
}"""
return template
def validate_config(config: Dict[str, Any]) -> Dict[str, Any]:
"""
Valide la configuration et retourne les problèmes
Args:
config: Configuration à valider
Returns:
Dictionnaire avec les erreurs et avertissements
"""
result = {
"errors": [],
"warnings": [],
"is_valid": True
}
# Validation des clés API
if not config.get("thetvdb_api_key"):
result["warnings"].append("Clé API TVDB non configurée - fonctionnalités limitées")
# Validation des chemins
temp_dir = Path(config.get("temp_directory", "/tmp/animelibrarian"))
if not temp_dir.parent.exists():
result["errors"].append(f"Répertoire parent invalide pour temp_directory: {temp_dir.parent}")
# Validation des valeurs numériques
if config.get("min_video_size", 0) < 0:
result["errors"].append("min_video_size doit être positif")
if config.get("max_video_size", 0) <= config.get("min_video_size", 0):
result["errors"].append("max_video_size doit être supérieur à min_video_size")
if config.get("trace_moe_rate_limit", 0) <= 0:
result["errors"].append("trace_moe_rate_limit doit être positif")
# Validation des extensions
extensions = config.get("video_extensions", [])
if not extensions or not all(ext.startswith('.') for ext in extensions):
result["errors"].append("video_extensions doit contenir des extensions valides (ex: .mp4)")
# Validation du niveau de log
valid_log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
if config.get("log_level", "").upper() not in valid_log_levels:
result["errors"].append(f"log_level doit être l'une de: {', '.join(valid_log_levels)}")
result["is_valid"] = len(result["errors"]) == 0
return result
def setup_directories(config: Dict[str, Any]):
"""
Crée les répertoires nécessaires selon la configuration
Args:
config: Configuration de l'application
"""
directories = []
# Répertoire temporaire
if config.get("temp_directory"):
directories.append(Path(config["temp_directory"]))
# Répertoire de sauvegarde
if config.get("backup_original") and config.get("backup_directory"):
# Ce sera créé relativement au répertoire de travail
pass
# Répertoire de cache
if config.get("enable_cache") and config.get("cache_directory"):
directories.append(Path(config["cache_directory"]))
for directory in directories:
try:
directory.mkdir(parents=True, exist_ok=True)
logger.debug(f"Répertoire créé/vérifié: {directory}")
except Exception as e:
logger.warning(f"Impossible de créer le répertoire {directory}: {e}")
def mask_sensitive_data(config: Dict[str, Any]) -> Dict[str, Any]:
"""
Masque les données sensibles pour l'affichage
Args:
config: Configuration originale
Returns:
Configuration avec les données sensibles masquées
"""
masked = config.copy()
# Liste des clés sensibles
sensitive_keys = [
"thetvdb_api_key",
"trace_moe_api_key"
]
for key in sensitive_keys:
if key in masked and masked[key]:
masked[key] = "***masqué***"
return masked

69
src/utils/logging.py Normal file
View File

@@ -0,0 +1,69 @@
"""
Configuration du logging pour AnimeLibrarian - Projet de la Légion de Muyue
"""
import logging
import logging.handlers
import os
from pathlib import Path
from typing import Optional
def setup_logging(verbose: bool = False, log_file: Optional[str] = None):
"""
Configure le système de logging
Args:
verbose: Active le mode verbeux (DEBUG)
log_file: Fichier de log (optionnel)
"""
# Niveau de log
level = logging.DEBUG if verbose else logging.INFO
# Configuration du formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Configuration du logger racine
root_logger = logging.getLogger()
root_logger.setLevel(level)
# Suppression des handlers existants
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
# Handler console
console_handler = logging.StreamHandler()
console_handler.setLevel(level)
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
# Handler fichier si spécifié
if log_file:
try:
log_path = Path(log_file)
log_path.parent.mkdir(parents=True, exist_ok=True)
file_handler = logging.handlers.RotatingFileHandler(
log_file,
maxBytes=10 * 1024 * 1024, # 10 Mo
backupCount=5
)
file_handler.setLevel(logging.DEBUG) # Toujours DEBUG dans les fichiers
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
except Exception as e:
print(f"⚠️ Impossible de créer le fichier de log {log_file}: {e}")
# Configuration spécifique pour les modules externes
external_modules = [
'urllib3.connectionpool',
'requests.packages.urllib3.connectionpool'
]
for module in external_modules:
logging.getLogger(module).setLevel(logging.WARNING)