Initial commit: Reorganiser le projet FFA Calendar Scraper

- Créer une arborescence propre (src/, scripts/, config/, data/, docs/, tests/)
- Déplacer les modules Python dans src/
- Déplacer les scripts autonomes dans scripts/
- Nettoyer les fichiers temporaires et __pycache__
- Mettre à jour le README.md avec documentation complète
- Mettre à jour les imports dans les scripts pour la nouvelle structure
- Configurer le .gitignore pour ignorer les données et logs
- Organiser les données dans data/ (courses, resultats, clubs, exports)

Structure du projet:
- src/: Modules principaux (ffa_scraper, ffa_analyzer)
- scripts/: Scripts CLI et utilitaires
- config/: Configuration (config.env)
- data/: Données générées
- docs/: Documentation
- tests/: Tests unitaires

💘 Generated with Crush

Assisted-by: GLM-4.7 via Crush <crush@charm.land>
This commit is contained in:
Muyue
2026-01-01 18:05:14 +01:00
commit a5406a4e89
16 changed files with 3920 additions and 0 deletions

360
scripts/athlete_summary.py Executable file
View File

@@ -0,0 +1,360 @@
#!/usr/bin/env python3
"""
Script pour générer un récapitulatif complet d'un athlète
Utilise les fichiers CSV pour extraire toutes les informations
"""
import pandas as pd
import os
import sys
import argparse
import logging
from datetime import datetime
from collections import defaultdict
import re
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def parse_time(time_str):
"""Convertir un temps en secondes pour comparaison"""
if not time_str or pd.isna(time_str):
return None
# Format HH:MM:SS
if ':' in time_str:
parts = time_str.split(':')
if len(parts) == 3:
try:
h, m, s = parts
return int(h) * 3600 + int(m) * 60 + float(s)
except:
pass
elif len(parts) == 2:
try:
m, s = parts
return int(m) * 60 + float(s)
except:
pass
# Format en secondes ou minutes
try:
# Enlever les non-numériques
clean = re.sub(r'[^\d.,]', '', str(time_str))
return float(clean.replace(',', '.'))
except:
return None
def format_time(seconds):
"""Formater des secondes en HH:MM:SS"""
if not seconds:
return "N/A"
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = seconds % 60
if hours > 0:
return f"{hours}:{minutes:02d}:{secs:05.2f}"
elif minutes > 0:
return f"{minutes}:{secs:05.2f}"
else:
return f"{secs:.2f}s"
def get_athlete_summary(nom, prenom=None, data_dir="data"):
"""Générer un récapitulatif complet d'un athlète"""
results_path = os.path.join(data_dir, 'resultats', 'results.csv')
courses_path = os.path.join(data_dir, 'courses', 'courses_list.csv')
if not os.path.exists(results_path):
logging.error(f"Fichier de résultats introuvable: {results_path}")
return None
try:
df_results = pd.read_csv(results_path, encoding='utf-8-sig')
# Filtre par nom
mask = df_results['nom'].str.contains(nom, case=False, na=False)
if prenom:
mask &= df_results['prenom'].str.contains(prenom, case=False, na=False)
results = df_results[mask]
if results.empty:
return None
# Charger les courses pour plus d'info
courses_df = None
if os.path.exists(courses_path):
courses_df = pd.read_csv(courses_path, encoding='utf-8-sig')
# Créer le récapitulatif
summary = {
'nom': results.iloc[0]['nom'],
'prenom': results.iloc[0]['prenom'],
'club': results.iloc[0]['club'],
'total_courses': len(results),
'categories': sorted(results['categorie'].dropna().unique().tolist()),
'results': [],
'statistics': {}
}
# Calculer les statistiques
places = []
times = []
podiums = 0
victoires = 0
courses_by_year = defaultdict(int)
courses_by_type = defaultdict(int)
for idx, result in results.iterrows():
# Extraire les places
try:
place = int(result['place'])
places.append(place)
if place == 1:
victoires += 1
podiums += 1
elif place <= 3:
podiums += 1
except:
pass
# Extraire les temps
time_seconds = parse_time(result.get('temps', result.get('resultat', '')))
if time_seconds:
times.append(time_seconds)
# Extraire les infos de course si disponible
course_info = {}
if courses_df is not None and 'course_url' in result:
course_match = courses_df[courses_df['lien'] == result['course_url']]
if not course_match.empty:
course_info = course_match.iloc[0].to_dict()
# Extraire l'année de la course
if 'date' in result and pd.notna(result['date']):
try:
year = str(result['date']).split('-')[0]
courses_by_year[year] += 1
except:
pass
elif course_info.get('date'):
try:
year = str(course_info['date']).split('-')[0]
courses_by_year[year] += 1
except:
pass
# Extraire le type de course
if course_info.get('type'):
courses_by_type[course_info['type']] += 1
# Ajouter le résultat détaillé
detailed_result = result.to_dict()
detailed_result['course_details'] = course_info
summary['results'].append(detailed_result)
# Calculer les statistiques finales
summary['statistics'] = {
'victoires': victoires,
'podiums': podiums,
'places_moyenne': sum(places) / len(places) if places else 0,
'meilleure_place': min(places) if places else None,
'pire_place': max(places) if places else None,
'meilleur_temps': min(times) if times else None,
'temps_moyen': sum(times) / len(times) if times else None,
'courses_par_annee': dict(courses_by_year),
'courses_par_type': dict(courses_by_type),
'categories': summary['categories']
}
return summary
except Exception as e:
logging.error(f"Erreur lors de la génération du récapitulatif: {e}")
return None
def display_athlete_summary(summary, show_full_results=False):
"""Afficher le récapitulatif de l'athlète"""
if not summary:
print("\n❌ Impossible de générer le récapitulatif")
return
stats = summary['statistics']
print(f"\n{'='*80}")
print(f"📊 RÉCAPITULATIF DE {summary['prenom']} {summary['nom']}")
print(f"{'='*80}\n")
# Informations générales
print(f"🏟️ Club: {summary['club']}")
print(f"🏃 Total des courses: {summary['total_courses']}")
print()
# Statistiques de performance
print(f"🏆 STATISTIQUES DE PERFORMANCE")
print(f"{''*40}")
print(f"Victoires: {stats['victoires']}")
print(f"Podiums (top 3): {stats['podiums']}")
print(f"Meilleure place: {stats['meilleure_place']}")
print(f"Place moyenne: {stats['places_moyenne']:.2f}")
print()
# Statistiques de temps
if stats['meilleur_temps']:
print(f"⏱️ STATISTIQUES DE TEMPS")
print(f"{''*40}")
print(f"Meilleur temps: {format_time(stats['meilleur_temps'])}")
print(f"Temps moyen: {format_time(stats['temps_moyen'])}")
print()
# Répartition par année
if stats['courses_par_annee']:
print(f"📅 RÉPARTITION PAR ANNÉE")
print(f"{''*40}")
for year, count in sorted(stats['courses_par_annee'].items()):
print(f"{year}: {count} course(s)")
print()
# Répartition par type
if stats['courses_par_type']:
print(f"🏷️ RÉPARTITION PAR TYPE DE COURSE")
print(f"{''*40}")
for course_type, count in sorted(stats['courses_par_type'].items(), key=lambda x: x[1], reverse=True):
print(f"{course_type}: {count} course(s)")
print()
# Catégories
if stats['categories']:
print(f"📊 CATÉGORIES")
print(f"{''*40}")
print(", ".join(cat for cat in stats['categories'] if cat))
print()
print(f"{'='*80}\n")
# Résultats détaillés
if show_full_results:
print(f"📋 LISTE COMPLÈTE DES RÉSULTATS")
print(f"{'='*80}\n")
for i, result in enumerate(summary['results'], 1):
print(f"{i}. {result.get('course_details', {}).get('nom', 'Inconnu')}")
if result.get('course_details', {}).get('date'):
print(f" 📅 {result['course_details']['date']}")
if result.get('course_details', {}).get('lieu'):
print(f" 📍 {result['course_details']['lieu']}")
print(f" 🏆 Place: {result.get('place', 'N/A')}")
temps = result.get('temps', result.get('resultat', 'N/A'))
print(f" ⏱️ Temps: {temps}")
if result.get('categorie'):
print(f" 🏷️ Catégorie: {result['categorie']}")
if result.get('points'):
print(f" 🎯 Points: {result['points']}")
print()
print(f"{'='*80}\n")
def export_summary_txt(summary, output_dir="data"):
"""Exporter le récapitulatif en fichier texte"""
os.makedirs(os.path.join(output_dir, 'exports'), exist_ok=True)
filename = f"summary_{summary['nom']}_{summary['prenom']}.txt"
filepath = os.path.join(output_dir, 'exports', filename.replace(" ", "_"))
with open(filepath, 'w', encoding='utf-8') as f:
f.write("="*80 + "\n")
f.write(f"RÉCAPITULATIF DE {summary['prenom']} {summary['nom']}\n")
f.write("="*80 + "\n\n")
stats = summary['statistics']
# Informations générales
f.write(f"Club: {summary['club']}\n")
f.write(f"Total des courses: {summary['total_courses']}\n\n")
# Statistiques
f.write("STATISTIQUES DE PERFORMANCE\n")
f.write("-"*40 + "\n")
f.write(f"Victoires: {stats['victoires']}\n")
f.write(f"Podiums: {stats['podiums']}\n")
f.write(f"Meilleure place: {stats['meilleure_place']}\n")
f.write(f"Place moyenne: {stats['places_moyenne']:.2f}\n\n")
# Temps
if stats['meilleur_temps']:
f.write("STATISTIQUES DE TEMPS\n")
f.write("-"*40 + "\n")
f.write(f"Meilleur temps: {format_time(stats['meilleur_temps'])}\n")
f.write(f"Temps moyen: {format_time(stats['temps_moyen'])}\n\n")
# Répartition
if stats['courses_par_annee']:
f.write("RÉPARTITION PAR ANNÉE\n")
f.write("-"*40 + "\n")
for year, count in sorted(stats['courses_par_annee'].items()):
f.write(f"{year}: {count}\n")
f.write("\n")
if stats['courses_par_type']:
f.write("RÉPARTITION PAR TYPE\n")
f.write("-"*40 + "\n")
for course_type, count in sorted(stats['courses_par_type'].items(), key=lambda x: x[1], reverse=True):
f.write(f"{course_type}: {count}\n")
f.write("\n")
# Liste des courses
f.write("LISTE DES COURSES\n")
f.write("="*80 + "\n\n")
for i, result in enumerate(summary['results'], 1):
f.write(f"{i}. {result.get('course_details', {}).get('nom', 'Inconnu')}\n")
f.write(f" Date: {result.get('course_details', {}).get('date', 'N/A')}\n")
f.write(f" Place: {result.get('place', 'N/A')}\n")
f.write(f" Temps: {result.get('temps', result.get('resultat', 'N/A'))}\n")
f.write("\n")
logging.info(f"Exporté le récapitulatif dans {filepath}")
return filepath
def main():
parser = argparse.ArgumentParser(description='Générer un récapitulatif complet d\'un athlète')
parser.add_argument('nom', help='Nom de l\'athlète')
parser.add_argument('--prenom', help='Prénom de l\'athlète')
parser.add_argument('--data-dir', default='data',
help='Répertoire des données CSV')
parser.add_argument('--full', action='store_true',
help='Afficher la liste complète des résultats')
parser.add_argument('--export', action='store_true',
help='Exporter le récapitulatif en fichier texte')
args = parser.parse_args()
# Générer le récapitulatif
print(f"\n📊 Génération du récapitulatif pour {args.prenom or ''} {args.nom}...")
summary = get_athlete_summary(args.nom, args.prenom, args.data_dir)
if summary:
display_athlete_summary(summary, show_full_results=args.full)
if args.export:
filepath = export_summary_txt(summary, args.data_dir)
print(f"💾 Exporté dans: {filepath}")
else:
print("\n❌ Aucun résultat trouvé pour cet athlète")
print("💡 Vérifiez que les données ont été scrapées avec:")
print(" python ffa_cli.py scrape --fetch-details")
if __name__ == "__main__":
main()

349
scripts/extract_races.py Executable file
View File

@@ -0,0 +1,349 @@
#!/usr/bin/env python3
"""
Script pour extraire les types de courses, distances et statistiques
Analyse les données pour identifier les patterns de courses (100m, marathon, etc.)
"""
import pandas as pd
import os
import sys
import argparse
import logging
import re
from collections import defaultdict, Counter
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
# Patterns pour extraire les distances des noms de courses
DISTANCE_PATTERNS = [
(r'(\d+)\s*m', lambda x: int(x.group(1)), 'm'), # 100m, 5000m
(r'(\d+)\s*km', lambda x: int(x.group(1)) * 1000, 'km'), # 10km, semi-marathon
(r'marathon', lambda x: 42195, 'marathon'),
(r'semi[-\s]?marathon', lambda x: 21097, 'semi-marathon'),
(r'demi[-\s]?fond', lambda x: 0, 'demi-fond'),
(r'fond', lambda x: 0, 'fond'),
(r'sprint', lambda x: 0, 'sprint'),
(r'haies', lambda x: 0, 'haies'),
(r'cross', lambda x: 0, 'cross country'),
(r'route', lambda x: 0, 'route'),
(r'trail', lambda x: 0, 'trail'),
(r'ultra', lambda x: 0, 'ultra'),
]
def extract_distance_from_name(course_name):
"""Extraire la distance à partir du nom de course"""
if pd.isna(course_name):
return None, None
course_name_lower = course_name.lower()
for pattern, extractor, unit in DISTANCE_PATTERNS:
match = re.search(pattern, course_name_lower, re.IGNORECASE)
if match:
try:
distance = extractor(match)
return distance, unit
except:
continue
return None, None
def categorize_course(course_type, course_name):
"""Catégoriser une course"""
if pd.isna(course_type):
course_type = ''
if pd.isna(course_name):
course_name = ''
combined = (course_type + ' ' + course_name).lower()
# Catégories principales
if any(x in combined for x in ['100m', '200m', '400m', 'sprint']):
return 'Sprint'
elif any(x in combined for x in ['800m', '1500m', 'demi-fond']):
return 'Demi-fond'
elif any(x in combined for x in ['5000m', '10000m', 'fond']):
return 'Fond'
elif 'marathon' in combined:
return 'Marathon'
elif any(x in combined for x in ['semi', '21km']):
return 'Semi-marathon'
elif 'trail' in combined:
return 'Trail'
elif 'cross' in combined:
return 'Cross country'
elif 'route' in combined and 'km' in combined:
return 'Route'
elif 'haies' in combined:
return 'Haies'
else:
return 'Autre'
def analyze_courses(data_dir="data"):
"""Analyser toutes les courses et extraire les statistiques"""
courses_path = os.path.join(data_dir, 'courses', 'courses_list.csv')
if not os.path.exists(courses_path):
logging.error(f"Fichier de courses introuvable: {courses_path}")
return None
try:
df = pd.read_csv(courses_path, encoding='utf-8-sig')
logging.info(f"Chargé {len(df)} courses")
# Extraire les distances
df['distance_meters'], df['distance_unit'] = zip(
*df['nom'].apply(extract_distance_from_name)
)
# Catégoriser les courses
df['category'] = df.apply(
lambda row: categorize_course(row['type'], row['nom']),
axis=1
)
# Statistiques globales
stats = {
'total_courses': len(df),
'types': {},
'categories': {},
'distances': {},
'by_type': {},
'by_location': {},
'by_date': {}
}
# Analyse par type
type_counts = df['type'].value_counts()
for course_type, count in type_counts.items():
stats['types'][course_type] = count
# Analyse par catégorie
category_counts = df['category'].value_counts()
for category, count in category_counts.items():
stats['categories'][category] = count
# Analyse par distance (pour les courses avec distance)
df_with_distance = df[df['distance_meters'] > 0]
distance_counts = df_with_distance['distance_meters'].value_counts()
for distance, count in distance_counts.items():
stats['distances'][distance] = count
# Détails par type
for course_type in df['type'].unique():
if pd.notna(course_type):
type_df = df[df['type'] == course_type]
stats['by_type'][course_type] = {
'count': len(type_df),
'categories': type_df['category'].value_counts().to_dict(),
'locations': type_df['lieu'].value_counts().head(10).to_dict()
}
# Détails par lieu
location_counts = df['lieu'].value_counts().head(20)
for location, count in location_counts.items():
stats['by_location'][location] = count
# Détails par date (mois/année)
df['date'] = pd.to_datetime(df['date'], errors='coerce')
df['month_year'] = df['date'].dt.to_period('M')
date_counts = df['month_year'].value_counts().sort_index()
for period, count in date_counts.items():
stats['by_date'][str(period)] = count
return df, stats
except Exception as e:
logging.error(f"Erreur lors de l'analyse des courses: {e}")
return None, None
def display_analysis(stats, df=None, show_details=False):
"""Afficher les résultats de l'analyse"""
if not stats:
print("\n❌ Impossible d'analyser les courses")
return
print(f"\n{'='*80}")
print(f"📊 ANALYSE DES COURSES")
print(f"{'='*80}\n")
# Vue d'ensemble
print(f"📋 VUE D'ENSEMBLE")
print(f"{''*40}")
print(f"Total des courses: {stats['total_courses']}")
print()
# Types de courses
print(f"🏷️ TYPES DE COURSES")
print(f"{''*40}")
for course_type, count in sorted(stats['types'].items(), key=lambda x: x[1], reverse=True):
print(f" {course_type}: {count} courses")
print()
# Catégories
print(f"📊 CATÉGORIES")
print(f"{''*40}")
for category, count in sorted(stats['categories'].items(), key=lambda x: x[1], reverse=True):
print(f" {category}: {count} courses")
print()
# Distances
if stats['distances']:
print(f"📏 DISTANCES EXTRACTÉES")
print(f"{''*40}")
# Trier par distance
for distance in sorted(stats['distances'].keys()):
count = stats['distances'][distance]
if distance == 42195:
distance_str = "Marathon (42.195 km)"
elif distance == 21097:
distance_str = "Semi-marathon (21.097 km)"
elif distance >= 1000:
distance_str = f"{distance/1000:.1f} km"
else:
distance_str = f"{distance} m"
print(f" {distance_str}: {count} courses")
print()
# Lieux les plus populaires
print(f"📍 LIEUX LES PLUS POPULAIRES (Top 20)")
print(f"{''*40}")
for i, (location, count) in enumerate(sorted(stats['by_location'].items(), key=lambda x: x[1], reverse=True), 1):
print(f" {i:2d}. {location}: {count} courses")
print()
# Répartition par date
if stats['by_date']:
print(f"📅 RÉPARTITION PAR DATE")
print(f"{''*40}")
for period, count in list(stats['by_date'].items())[-12:]: # Derniers 12 mois
print(f" {period}: {count} courses")
print()
print(f"{'='*80}\n")
# Détails par type
if show_details and stats['by_type']:
print(f"📋 DÉTAILS PAR TYPE DE COURSE")
print(f"{'='*80}\n")
for course_type, details in sorted(stats['by_type'].items(), key=lambda x: x[1]['count'], reverse=True):
print(f"🔹 {course_type}")
print(f" Nombre de courses: {details['count']}")
print(f" Répartition par catégorie:")
for category, count in sorted(details['categories'].items(), key=lambda x: x[1], reverse=True)[:5]:
print(f" - {category}: {count}")
print(f" Top lieux:")
for i, (location, count) in enumerate(sorted(details['locations'].items(), key=lambda x: x[1], reverse=True)[:5], 1):
print(f" {i}. {location}: {count}")
print()
def export_analysis_csv(stats, df, output_dir="data"):
"""Exporter l'analyse en CSV"""
os.makedirs(os.path.join(output_dir, 'exports'), exist_ok=True)
# Exporter le DataFrame enrichi avec distances et catégories
courses_with_analysis = os.path.join(output_dir, 'exports', 'courses_analysis.csv')
if df is not None:
df.to_csv(courses_with_analysis, index=False, encoding='utf-8-sig')
logging.info(f"Exporté {len(df)} courses analysées dans {courses_with_analysis}")
# Exporter les statistiques par type
types_csv = os.path.join(output_dir, 'exports', 'courses_by_type.csv')
if stats['types']:
types_df = pd.DataFrame(list(stats['types'].items()), columns=['Type', 'Count'])
types_df.to_csv(types_csv, index=False, encoding='utf-8-sig')
# Exporter les statistiques par catégorie
categories_csv = os.path.join(output_dir, 'exports', 'courses_by_category.csv')
if stats['categories']:
categories_df = pd.DataFrame(list(stats['categories'].items()), columns=['Category', 'Count'])
categories_df.to_csv(categories_csv, index=False, encoding='utf-8-sig')
# Exporter les statistiques par distance
distances_csv = os.path.join(output_dir, 'exports', 'courses_by_distance.csv')
if stats['distances']:
distances_df = pd.DataFrame(list(stats['distances'].items()), columns=['Distance (m)', 'Count'])
distances_df = distances_df.sort_values('Distance (m)')
distances_df.to_csv(distances_csv, index=False, encoding='utf-8-sig')
return {
'courses_analysis': courses_with_analysis,
'by_type': types_csv,
'by_category': categories_csv,
'by_distance': distances_csv
}
def search_courses_by_distance(df, min_distance=None, max_distance=None):
"""Rechercher des courses par distance"""
if df is None:
return []
mask = df['distance_meters'] > 0
if min_distance is not None:
mask &= df['distance_meters'] >= min_distance
if max_distance is not None:
mask &= df['distance_meters'] <= max_distance
courses = df[mask].to_dict('records')
return courses
def main():
parser = argparse.ArgumentParser(description='Extraire et analyser les types de courses et distances')
parser.add_argument('--data-dir', default='data',
help='Répertoire des données CSV')
parser.add_argument('--details', action='store_true',
help='Afficher les détails par type de course')
parser.add_argument('--export', action='store_true',
help='Exporter l\'analyse en CSV')
parser.add_argument('--search-distance', action='store_true',
help='Rechercher des courses par distance')
parser.add_argument('--min-distance', type=int,
help='Distance minimum en mètres')
parser.add_argument('--max-distance', type=int,
help='Distance maximum en mètres')
args = parser.parse_args()
# Analyse des courses
print(f"\n📊 Analyse des courses depuis {args.data_dir}/...")
df, stats = analyze_courses(args.data_dir)
if df is not None and stats is not None:
# Affichage
display_analysis(stats, df, show_details=args.details)
# Recherche par distance
if args.search_distance:
print(f"\n🔍 Recherche de courses par distance:")
print(f" Min: {args.min_distance}m, Max: {args.max_distance}m")
courses = search_courses_by_distance(df, args.min_distance, args.max_distance)
if courses:
print(f"\n Trouvé {len(courses)} courses:")
for i, course in enumerate(courses[:20], 1):
print(f" {i}. {course['nom']} - {course['distance_meters']}m")
if len(courses) > 20:
print(f" ... et {len(courses) - 20} autres")
else:
print(" Aucune course trouvée avec ces critères")
# Export
if args.export:
files = export_analysis_csv(stats, df, args.data_dir)
print(f"\n💾 Exporté dans:")
for key, filepath in files.items():
print(f" {key}: {filepath}")
else:
print("\n❌ Impossible d'analyser les courses")
print("💡 Vérifiez que les données ont été scrapées avec:")
print(" python ffa_cli.py scrape")
if __name__ == "__main__":
main()

283
scripts/ffa_cli.py Normal file
View File

@@ -0,0 +1,283 @@
#!/usr/bin/env python3
"""
Interface en ligne de commande pour le scraper FFA
"""
import argparse
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../src'))
from ffa_scraper import FFAScraper
from ffa_analyzer import FFADataAnalyzer
import logging
def setup_logging(verbose=False):
"""Configurer le logging"""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def scrape_command(args):
"""Commande de scraping"""
scraper = FFAScraper(output_dir=args.output)
use_multithreading = args.multithreading and not args.no_multithreading
stats = scraper.scrap_all_data(
limit_courses=args.limit_courses,
limit_results=args.limit_results,
fetch_details=args.fetch_details,
max_pages=args.max_pages,
use_multithreading=use_multithreading
)
print(f"\nScraping terminé:")
print(f"- Clubs: {stats['clubs_count']}")
print(f"- Courses: {stats['courses_count']}")
print(f"- Résultats: {stats['results_count']}")
def check_command(args):
"""Commande de vérification du nombre de courses"""
scraper = FFAScraper(output_dir=args.output)
total_pages, total_courses, courses_per_page = scraper._detect_pagination_info()
print("\n" + "="*60)
print("📊 Informations de pagination")
print("="*60)
if total_pages:
print(f"Nombre total de pages: {total_pages}")
print(f"Estimation du nombre total de courses: ~{total_courses}")
print(f"Courses par page: ~{courses_per_page}")
print(f"\n⏱️ Estimation du temps de scraping:")
use_multithreading = args.multithreading and not args.no_multithreading
if use_multithreading:
print(f" - Multithreading (4 workers): ~{total_pages / 4 * 2:.0f} secondes")
else:
print(f" - Séquentiel: ~{total_pages * 2:.0f} secondes")
if total_pages > 10:
print(f"\n⚠️ Attention: {total_pages} pages à scraper!")
if args.auto:
print(f"\nUtilisation de {total_pages} pages pour le scraping.")
stats = scraper.scrap_all_data(
limit_courses=args.limit_courses,
limit_results=args.limit_results,
fetch_details=args.fetch_details,
max_pages=total_pages,
use_multithreading=use_multithreading
)
print(f"\nScraping terminé:")
print(f"- Courses: {stats['courses_count']}")
else:
print("⚠️ Impossible de détecter la pagination. Utilisez --max-pages pour spécifier le nombre de pages.")
print("="*60)
def list_command(args):
"""Commande de listing des données"""
analyzer = FFADataAnalyzer(data_dir=args.data_dir)
print("\n=== Données disponibles ===")
if analyzer.courses_df is not None:
print(f"\n📅 Courses: {len(analyzer.courses_df)} compétitions")
if len(analyzer.courses_df) > 0:
print(" Types de courses:")
types = analyzer.courses_df['type'].value_counts()
for course_type, count in types.head(5).items():
print(f" - {course_type}: {count}")
if analyzer.results_df is not None:
print(f"\n🏃 Résultats: {len(analyzer.results_df)} entrées")
if len(analyzer.results_df) > 0:
print(" Clubs les plus représentés:")
clubs = analyzer.results_df['club'].value_counts().head(5)
for club, count in clubs.items():
print(f" - {club}: {count} résultats")
print("\n Premiers résultats:")
for i, result in enumerate(analyzer.results_df.head(3).to_dict('records'), 1):
print(f" {i}. {result.get('prenom', '')} {result.get('nom', '')} - {result.get('club', '')} - Place: {result.get('place', '')}")
if analyzer.clubs_df is not None and len(analyzer.clubs_df) > 0:
print(f"\n🏟️ Clubs: {len(analyzer.clubs_df)} clubs")
print("\n=== === ===\n")
def search_command(args):
"""Commande de recherche"""
analyzer = FFADataAnalyzer(data_dir=args.data_dir)
if args.type == 'athlete':
results = analyzer.search_athlete(args.nom, args.prenom)
print(f"\nTrouvé {len(results)} résultats pour {args.nom} {args.prenom or ''}")
for i, result in enumerate(results[:20], 1): # Limiter l'affichage
print(f"{i}. {result['prenom']} {result['nom']} - {result['club']} - Place: {result['place']} - {result.get('course_url', '')}")
elif args.type == 'course':
courses = analyzer.get_course_by_date(args.start_date, args.end_date)
print(f"\nTrouvé {len(courses)} courses entre {args.start_date} et {args.end_date}")
for i, course in enumerate(courses[:20], 1):
print(f"{i}. {course.get('nom', 'Inconnu')} - {course.get('date', 'Date inconnue')} - {course.get('lieu', 'Lieu inconnu')}")
elif args.type == 'club':
club_info = analyzer.search_club_in_results(args.nom)
if club_info and club_info.get('athletes'):
print(f"\nClub: {args.nom}")
print(f"Athlètes: {len(club_info.get('athletes', []))}")
for i, athlete in enumerate(club_info.get('athletes', [])[:10], 1):
print(f"{i}. {athlete['prenom']} {athlete['nom']} - {len(athlete['results'])} résultats")
else:
print(f"\nAucun résultat trouvé pour le club: {args.nom}")
def stats_command(args):
"""Commande de statistiques"""
analyzer = FFADataAnalyzer(data_dir=args.data_dir)
if args.type == 'athlete':
stats = analyzer.get_athlete_stats(args.nom, args.prenom)
if stats:
print(f"\nStatistiques pour {stats['prenom']} {stats['nom']}:")
print(f"- Club: {stats.get('club', 'Inconnu')}")
print(f"- Courses total: {stats.get('total_courses', 0)}")
print(f"- Victoires: {stats.get('victoires', 0)}")
print(f"- Podiums: {stats.get('podiums', 0)}")
print(f"- Catégories: {', '.join(stats.get('categories', []))}")
print(f"- Courses par année: {stats.get('courses_par_annee', {})}")
elif args.type == 'club':
rankings = analyzer.get_club_rankings(args.course_url)
print(f"\nClassement par club pour la course {args.course_url}:")
for i, club in enumerate(rankings[:10], 1):
print(f"{i}. {club['club']} - Score: {club['score']} - Participants: {club['participants']}")
def top_command(args):
"""Commande pour afficher le top des athlètes"""
analyzer = FFADataAnalyzer(data_dir=args.data_dir)
top_athletes = analyzer.get_top_athletes(limit=args.limit, min_results=args.min_results)
print(f"\n=== Top {len(top_athletes)} athlètes ===")
print(f"(Minimum {args.min_results} résultats)\n")
for i, athlete in enumerate(top_athletes, 1):
print(f"{i}. {athlete['prenom']} {athlete['nom']}")
print(f" Club: {athlete.get('club', 'Inconnu')}")
print(f" Victoires: {athlete['victoires']} | Podiums: {athlete['podiums']} | Courses: {athlete['results_count']}")
if athlete.get('place_moyenne'):
print(f" Place moyenne: {athlete['place_moyenne']:.2f}")
print()
def export_command(args):
"""Commande d'export"""
analyzer = FFADataAnalyzer(data_dir=args.data_dir)
if args.type == 'athlete':
filepath = analyzer.export_athlete_csv(args.nom, args.prenom, args.filename)
if filepath:
print(f"Exporté dans: {filepath}")
else:
print("Aucun résultat trouvé pour cet athlète")
def main():
parser = argparse.ArgumentParser(description='FFA Calendar Scraper - Outil de scraping et d\'analyse des données de la FFA')
parser.add_argument('--verbose', '-v', action='store_true', help='Mode verbeux')
subparsers = parser.add_subparsers(dest='command', help='Commande à exécuter')
# Commande scrape
scrape_parser = subparsers.add_parser('scrape', help='Lancer le scraping des données')
scrape_parser.add_argument('--output', '-o', default='data', help='Répertoire de sortie des données')
scrape_parser.add_argument('--limit-courses', type=int, help='Limiter le nombre de courses à scraper')
scrape_parser.add_argument('--limit-results', type=int, help='Limiter le nombre de résultats à scraper')
scrape_parser.add_argument('--fetch-details', action='store_true', help='Récupérer les détails et résultats de chaque course (plus lent)')
scrape_parser.add_argument('--max-pages', type=int, default=10, help='Nombre maximum de pages à scraper (défaut: 10)')
scrape_parser.add_argument('--multithreading', action='store_true', default=True, help='Activer le multithreading pour accélérer le scraping (défaut: True)')
scrape_parser.add_argument('--no-multithreading', action='store_true', help='Désactiver le multithreading (scraping séquentiel)')
# Commande list
list_parser = subparsers.add_parser('list', help='Lister les données disponibles')
list_parser.add_argument('--data-dir', default='data', help='Répertoire des données')
# Commande search
search_parser = subparsers.add_parser('search', help='Rechercher des données')
search_parser.add_argument('type', choices=['athlete', 'club', 'course'], help='Type de recherche')
search_parser.add_argument('--data-dir', default='data', help='Répertoire des données')
# Arguments spécifiques à la recherche d'athlète
search_parser.add_argument('--nom', help='Nom de l\'athlète ou du club')
search_parser.add_argument('--prenom', help='Prénom de l\'athlète')
search_parser.add_argument('--start-date', help='Date de début (format: YYYY-MM-DD)')
search_parser.add_argument('--end-date', help='Date de fin (format: YYYY-MM-DD)')
# Commande stats
stats_parser = subparsers.add_parser('stats', help='Afficher des statistiques')
stats_parser.add_argument('type', choices=['athlete', 'club'], help='Type de statistiques')
stats_parser.add_argument('--nom', help='Nom de l\'athlète ou du club')
stats_parser.add_argument('--prenom', help='Prénom de l\'athlète')
stats_parser.add_argument('--course-url', help='URL de la course pour le classement par club')
stats_parser.add_argument('--data-dir', default='data', help='Répertoire des données')
# Commande top
top_parser = subparsers.add_parser('top', help='Afficher le top des athlètes')
top_parser.add_argument('--limit', type=int, default=10, help='Nombre d\'athlètes à afficher (défaut: 10)')
top_parser.add_argument('--min-results', type=int, default=3, help='Nombre minimum de résultats (défaut: 3)')
top_parser.add_argument('--data-dir', default='data', help='Répertoire des données')
# Commande export
export_parser = subparsers.add_parser('export', help='Exporter des données en CSV')
export_parser.add_argument('type', choices=['athlete'], help='Type d\'export')
export_parser.add_argument('--nom', help='Nom de l\'athlète ou du club')
export_parser.add_argument('--prenom', help='Prénom de l\'athlète')
export_parser.add_argument('--filename', help='Nom du fichier de sortie')
export_parser.add_argument('--data-dir', default='data', help='Répertoire des données')
# Commande check
check_parser = subparsers.add_parser('check', help='Vérifier le nombre total de courses disponibles')
check_parser.add_argument('--output', '-o', default='data', help='Répertoire de sortie des données')
check_parser.add_argument('--limit-courses', type=int, help='Limiter le nombre de courses à scraper')
check_parser.add_argument('--limit-results', type=int, help='Limiter le nombre de résultats à scraper')
check_parser.add_argument('--fetch-details', action='store_true', help='Récupérer les détails et résultats de chaque course (plus lent)')
check_parser.add_argument('--auto', action='store_true', help='Lancer automatiquement le scraping après la vérification')
check_parser.add_argument('--multithreading', action='store_true', default=True, help='Activer le multithreading pour accélérer le scraping (défaut: True)')
check_parser.add_argument('--no-multithreading', action='store_true', help='Désactiver le multithreading (scraping séquentiel)')
args = parser.parse_args()
if not args.command:
parser.print_help()
return
setup_logging(args.verbose)
try:
if args.command == 'scrape':
scrape_command(args)
elif args.command == 'list':
list_command(args)
elif args.command == 'search':
search_command(args)
elif args.command == 'top':
top_command(args)
elif args.command == 'stats':
stats_command(args)
elif args.command == 'export':
export_command(args)
elif args.command == 'check':
check_command(args)
except Exception as e:
logging.error(f"Erreur lors de l'exécution de la commande {args.command}: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

181
scripts/list_clubs.py Executable file
View File

@@ -0,0 +1,181 @@
#!/usr/bin/env python3
"""
Script pour lister tous les clubs présents dans les résultats FFA
Utilise les fichiers CSV générés par le scraper ou les données live depuis l'URL FFA
"""
import pandas as pd
import os
import sys
import argparse
import logging
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../src'))
from ffa_scraper import FFAScraper
from collections import defaultdict
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def list_clubs_from_csv(data_dir="data"):
"""Lister tous les clubs à partir des fichiers CSV"""
results_path = os.path.join(data_dir, 'resultats', 'results.csv')
if not os.path.exists(results_path):
logging.error(f"Fichier de résultats introuvable: {results_path}")
print("\n💡 Pour générer les résultats, utilisez:")
print(" python ffa_cli.py scrape --fetch-details")
return []
try:
df = pd.read_csv(results_path, encoding='utf-8-sig')
logging.info(f"Chargé {len(df)} résultats")
# Extraire les clubs uniques
clubs_info = df.groupby('club').agg({
'nom': lambda x: x.nunique(), # Nombre d'athlètes uniques
'dept': lambda x: x.mode()[0] if len(x.mode()) > 0 else '',
'ligue': lambda x: x.mode()[0] if len(x.mode()) > 0 else ''
}).reset_index()
clubs_info.columns = ['club', 'athletes_count', 'departement', 'ligue']
clubs_info = clubs_info.sort_values('athletes_count', ascending=False)
return clubs_info.to_dict('records')
except Exception as e:
logging.error(f"Erreur lors de la lecture des CSV: {e}")
return []
def list_clubs_live():
"""Lister les clubs depuis l'URL FFA (besoin de scraping live)"""
scraper = FFAScraper()
logging.info("Récupération des données en direct depuis le site FFA...")
logging.warning("Note: Cette méthode nécessite un scraping complet, ce qui peut prendre du temps")
# Récupérer les résultats depuis le site
# Pour simplifier, nous récupérons quelques courses et extrayons les clubs
total_pages, total_courses, _ = scraper._detect_pagination_info()
if not total_pages:
logging.error("Impossible de détecter les données")
return []
# Limiter à quelques pages pour éviter trop de temps
max_pages = min(5, total_pages)
logging.info(f"Analyse de {max_pages} pages pour extraire les clubs...")
clubs = defaultdict(lambda: {
'count': 0,
'athletes': set(),
'dept': '',
'ligue': ''
})
# Scraper les courses et récupérer les résultats
courses = scraper.get_courses_list(max_pages=max_pages, use_multithreading=True)
for course in courses:
if course.get('resultats_url'):
results = scraper.get_course_results(course['resultats_url'])
for result in results:
club = result.get('club', 'Inconnu')
clubs[club]['count'] += 1
clubs[club]['athletes'].add(f"{result.get('prenom', '')} {result.get('nom', '')}")
if not clubs[club]['dept'] and result.get('dept'):
clubs[club]['dept'] = result['dept']
if not clubs[club]['ligue'] and result.get('ligue'):
clubs[club]['ligue'] = result['ligue']
# Convertir en liste et trier
clubs_list = []
for club, info in clubs.items():
clubs_list.append({
'club': club,
'athletes_count': len(info['athletes']),
'results_count': info['count'],
'departement': info['dept'],
'ligue': info['ligue']
})
clubs_list.sort(key=lambda x: x['athletes_count'], ascending=False)
return clubs_list
def display_clubs(clubs, limit=None, show_details=False):
"""Afficher la liste des clubs"""
if not clubs:
print("\n❌ Aucun club trouvé")
return
print(f"\n{'='*80}")
print(f"📊 LISTE DES CLUBS ({len(clubs)} clubs trouvés)")
print(f"{'='*80}\n")
if limit:
clubs = clubs[:limit]
for i, club in enumerate(clubs, 1):
print(f"{i:3d}. {club['club']}")
print(f" Athlètes: {club['athletes_count']}")
if show_details:
if 'results_count' in club:
print(f" Résultats totaux: {club['results_count']}")
if club.get('departement'):
print(f" Département: {club['departement']}")
if club.get('ligue'):
print(f" Ligue: {club['ligue']}")
print()
print(f"{'='*80}")
def export_clubs_csv(clubs, filename="clubs_list_export.csv", output_dir="data"):
"""Exporter la liste des clubs en CSV"""
os.makedirs(output_dir, exist_ok=True)
filepath = os.path.join(output_dir, filename)
df = pd.DataFrame(clubs)
df.to_csv(filepath, index=False, encoding='utf-8-sig')
logging.info(f"Exporté {len(clubs)} clubs dans {filepath}")
return filepath
def main():
parser = argparse.ArgumentParser(description='Lister tous les clubs des résultats FFA')
parser.add_argument('--csv', action='store_true', default=True,
help='Utiliser les fichiers CSV existants (défaut)')
parser.add_argument('--live', action='store_true',
help='Récupérer les données en direct depuis le site FFA')
parser.add_argument('--limit', type=int,
help='Limiter le nombre de clubs affichés')
parser.add_argument('--details', action='store_true',
help='Afficher les détails (dpt, ligue, résultats)')
parser.add_argument('--export', action='store_true',
help='Exporter la liste en CSV')
parser.add_argument('--output', default='data',
help='Répertoire de sortie pour les données CSV')
parser.add_argument('--export-filename', default='clubs_list_export.csv',
help='Nom du fichier CSV exporté')
args = parser.parse_args()
# Choisir la source des données
if args.live:
print("\n⚠️ Mode live: récupération des données depuis le site FFA...")
clubs = list_clubs_live()
else:
print(f"\n📂 Mode CSV: utilisation des fichiers dans {args.output}/")
clubs = list_clubs_from_csv(args.output)
# Afficher les résultats
display_clubs(clubs, limit=args.limit, show_details=args.details)
# Exporter si demandé
if args.export and clubs:
filepath = export_clubs_csv(clubs, args.export_filename, args.output)
print(f"\n💾 Exporté dans: {filepath}")
if __name__ == "__main__":
main()

72
scripts/monitor_scraping.py Executable file
View File

@@ -0,0 +1,72 @@
#!/usr/bin/env python3
"""
Script de surveillance du scraping FFA
"""
import time
import pandas as pd
from pathlib import Path
def monitor_scraping(data_dir="data_2024_2025"):
"""Surveiller le scraping et afficher les statistiques"""
results_file = Path(data_dir) / "resultats" / "results.csv"
courses_file = Path(data_dir) / "courses" / "courses_list.csv"
while True:
print("\n" + "="*60)
print(f"📊 Surveillance du scraping - {time.strftime('%H:%M:%S')}")
print("="*60)
# Statistiques des courses
if courses_file.exists():
courses_df = pd.read_csv(courses_file)
print(f"📅 Courses disponibles: {len(courses_df)}")
# Statistiques des résultats
if results_file.exists():
results_df = pd.read_csv(results_file)
print(f"🏃 Résultats récupérés: {len(results_df)}")
print(f"💾 Taille fichier: {results_file.stat().st_size / (1024*1024):.2f} Mo")
# Recherche Augustin ROUX
augustin_mask = (
results_df['nom'].str.contains('ROUX', case=False, na=False) &
results_df['prenom'].str.contains('Augustin', case=False, na=False)
)
augustin_results = results_df[augustin_mask]
print(f"\n🎯 Recherche: Augustin ROUX")
print(f" Résultats trouvés: {len(augustin_results)}")
if len(augustin_results) > 0:
print(f"\n Détails des résultats:")
for idx, row in augustin_results.iterrows():
print(f" - Place {row['place']}: {row['resultat']} ({row['date'] if 'date' in row else 'N/A'})")
if 'club' in row and pd.notna(row['club']):
print(f" Club: {row['club']}")
# Top 5 clubs par nombre de résultats
print(f"\n🏟️ Top 5 clubs par nombre de résultats:")
top_clubs = results_df['club'].value_counts().head(5)
for club, count in top_clubs.items():
print(f" - {club}: {count} résultats")
# Recherche clubs Charente-Maritime (17)
print(f"\n📍 Clubs Charente-Maritime (17):")
dept17_mask = results_df['club'].str.contains(r'[\( ]17[\) ]', na=False)
dept17_results = results_df[dept17_mask]
dept17_clubs = dept17_results['club'].unique() if len(dept17_results) > 0 else []
if len(dept17_clubs) > 0:
for club in dept17_clubs[:10]:
count = len(results_df[results_df['club'] == club])
print(f" - {club}: {count} résultats")
else:
print(f" Aucun résultat trouvé pour le département 17")
print("\n⏳ Prochaine vérification dans 60 secondes...")
time.sleep(60)
if __name__ == "__main__":
monitor_scraping()

298
scripts/post_process.py Executable file
View File

@@ -0,0 +1,298 @@
#!/usr/bin/env python3
"""
Script de post-traitement pour analyser et trier les données scrapées
"""
import os
import sys
import logging
import pandas as pd
from collections import defaultdict
from datetime import datetime
def analyze_clubs(data_dir):
"""Analyser et extraire tous les clubs"""
logging.info("=== Analyse des clubs ===")
results_path = os.path.join(data_dir, 'resultats', 'results.csv')
if os.path.exists(results_path):
df = pd.read_csv(results_path, encoding='utf-8-sig')
# Extraire les clubs uniques
clubs_info = df.groupby('club').agg({
'nom': lambda x: x.nunique(),
'prenom': lambda x: x.nunique()
}).reset_index()
clubs_info.columns = ['club', 'athletes_count', 'unique_athletes']
clubs_info = clubs_info.sort_values('athletes_count', ascending=False)
# Sauvegarder
clubs_dir = os.path.join(data_dir, 'clubs')
os.makedirs(clubs_dir, exist_ok=True)
clubs_file = os.path.join(clubs_dir, 'clubs_list.csv')
clubs_info.to_csv(clubs_file, index=False, encoding='utf-8-sig')
logging.info(f"{len(clubs_info)} clubs exportés dans {clubs_file}")
logging.info(f" Top 5 clubs:")
for i, club in clubs_info.head(5).iterrows():
logging.info(f" {i+1}. {club['club']}: {club['athletes_count']} résultats")
return clubs_info
else:
logging.warning("⚠️ Fichier de résultats introuvable")
return None
def analyze_courses(data_dir):
"""Analyser et extraire les statistiques des courses"""
logging.info("=== Analyse des courses ===")
courses_path = os.path.join(data_dir, 'courses', 'courses_list.csv')
if os.path.exists(courses_path):
df = pd.read_csv(courses_path, encoding='utf-8-sig')
# Convertir les dates
df['date'] = pd.to_datetime(df['date'], errors='coerce')
df['année'] = df['date'].dt.year
df['mois'] = df['date'].dt.month
# Statistiques par année
courses_by_year = df.groupby('année').size().reset_index(name='count')
courses_by_year = courses_by_year.sort_values('année')
# Statistiques par type
courses_by_type = df['type'].value_counts().reset_index()
courses_by_type.columns = ['type', 'count']
# Statistiques par lieu (top 50)
courses_by_location = df['lieu'].value_counts().head(50).reset_index()
courses_by_location.columns = ['lieu', 'count']
# Sauvegarder les statistiques
stats_dir = os.path.join(data_dir, 'statistics')
os.makedirs(stats_dir, exist_ok=True)
# Export par année
year_file = os.path.join(stats_dir, 'courses_by_year.csv')
courses_by_year.to_csv(year_file, index=False, encoding='utf-8-sig')
# Export par type
type_file = os.path.join(stats_dir, 'courses_by_type.csv')
courses_by_type.to_csv(type_file, index=False, encoding='utf-8-sig')
# Export par lieu
location_file = os.path.join(stats_dir, 'courses_by_location.csv')
courses_by_location.to_csv(location_file, index=False, encoding='utf-8-sig')
logging.info(f"✅ Statistiques exportées dans {stats_dir}")
logging.info(f" Années: {len(courses_by_year)}")
logging.info(f" Types: {len(courses_by_type)}")
logging.info(f" Lieux: {len(courses_by_location)}")
# Récapitulatif
logging.info(f"\n📊 RÉCAPITULATIF DES COURSES:")
logging.info(f" Total: {len(df)} courses")
logging.info(f" Plage de dates: {df['date'].min()} au {df['date'].max()}")
logging.info(f" Années: {len(courses_by_year)}")
logging.info(f" Types: {len(courses_by_type)}")
return {
'total': len(df),
'years': len(courses_by_year),
'types': len(courses_by_type),
'locations': len(courses_by_location)
}
else:
logging.warning("⚠️ Fichier de courses introuvable")
return None
def extract_distances_from_courses(data_dir):
"""Extraire et catégoriser les distances des courses"""
logging.info("=== Extraction des distances ===")
courses_path = os.path.join(data_dir, 'courses', 'courses_list.csv')
if os.path.exists(courses_path):
df = pd.read_csv(courses_path, encoding='utf-8-sig')
import re
# Fonction pour extraire la distance
def extract_distance(course_name):
patterns = [
(r'(\d+)\s*km', lambda m: int(m.group(1)) * 1000),
(r'(\d+)\s*m', lambda m: int(m.group(1))),
(r'marathon', lambda m: 42195),
(r'semi[-\s]?marathon', lambda m: 21097),
]
for pattern, extractor in patterns:
match = re.search(pattern, course_name, re.IGNORECASE)
if match:
try:
return extractor(match)
except:
pass
return None
# Extraire les distances
df['distance_meters'] = df['nom'].apply(extract_distance)
# Catégoriser
def categorize_distance(distance):
if pd.isna(distance):
return 'Autre'
elif distance < 400:
return 'Sprint'
elif distance < 2000:
return 'Demi-fond'
elif distance < 5000:
return 'Fond'
elif distance < 10000:
return 'Intermédiaire'
elif distance < 21000:
return '10km'
elif distance < 22000:
return 'Semi-marathon'
elif distance < 43000:
return 'Longue distance'
elif distance < 50000:
return 'Marathon'
else:
return 'Ultra'
df['category'] = df['distance_meters'].apply(categorize_distance)
# Statistiques par catégorie
categories = df['category'].value_counts().reset_index()
categories.columns = ['category', 'count']
# Sauvegarder les courses avec distances
courses_with_distance = os.path.join(data_dir, 'courses', 'courses_with_distances.csv')
df.to_csv(courses_with_distance, index=False, encoding='utf-8-sig')
# Sauvegarder les statistiques
stats_dir = os.path.join(data_dir, 'statistics')
categories_file = os.path.join(stats_dir, 'courses_by_category.csv')
categories.to_csv(categories_file, index=False, encoding='utf-8-sig')
logging.info(f"✅ Distances extraites et exportées")
logging.info(f" Catégories: {len(categories)}")
logging.info(f"\nRépartition par catégorie:")
for _, row in categories.head(10).iterrows():
logging.info(f" {row['category']}: {row['count']} courses")
return categories
else:
logging.warning("⚠️ Fichier de courses introuvable")
return None
def create_summary(data_dir):
"""Créer un récapitulatif global"""
logging.info("=== Création du récapitulatif ===")
summary_dir = os.path.join(data_dir, 'summary')
os.makedirs(summary_dir, exist_ok=True)
# Créer un fichier de récapitulatif
summary_file = os.path.join(summary_dir, 'global_summary.txt')
with open(summary_file, 'w', encoding='utf-8') as f:
f.write("="*80 + "\n")
f.write("RÉCAPITULATIF GLOBAL DES DONNÉES FFA\n")
f.write("="*80 + "\n")
f.write(f"Date de génération: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
# Courses
courses_path = os.path.join(data_dir, 'courses', 'courses_list.csv')
if os.path.exists(courses_path):
df_courses = pd.read_csv(courses_path, encoding='utf-8-sig')
f.write(f"COURSES\n")
f.write("-"*40 + "\n")
f.write(f"Total des courses: {len(df_courses)}\n")
df_courses['date'] = pd.to_datetime(df_courses['date'], errors='coerce')
f.write(f"Première course: {df_courses['date'].min()}\n")
f.write(f"Dernière course: {df_courses['date'].max()}\n")
years = df_courses['date'].dt.year.dropna().unique()
f.write(f"Années couvertes: {len(years)} ({min(years)} à {max(years)})\n\n")
# Résultats
results_path = os.path.join(data_dir, 'resultats', 'results.csv')
if os.path.exists(results_path):
df_results = pd.read_csv(results_path, encoding='utf-8-sig')
f.write(f"RÉSULTATS\n")
f.write("-"*40 + "\n")
f.write(f"Total des résultats: {len(df_results)}\n")
clubs = df_results['club'].nunique()
f.write(f"Clubs uniques: {clubs}\n")
f.write(f"Athlètes uniques: {df_results['nom'].nunique()}\n\n")
# Clubs
clubs_path = os.path.join(data_dir, 'clubs', 'clubs_list.csv')
if os.path.exists(clubs_path):
df_clubs = pd.read_csv(clubs_path, encoding='utf-8-sig')
f.write(f"CLUBS\n")
f.write("-"*40 + "\n")
f.write(f"Total des clubs: {len(df_clubs)}\n\n")
f.write(f"Top 10 clubs:\n")
for i, club in df_clubs.head(10).iterrows():
f.write(f" {i+1}. {club['club']}: {club['athletes_count']} résultats\n")
f.write("\n")
logging.info(f"✅ Récapitulatif global créé dans {summary_file}")
return summary_file
def main():
"""Fonction principale"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
data_dir = sys.argv[1] if len(sys.argv) > 1 else 'data_2010_2026'
logging.info(f"{'='*80}")
logging.info(f"POST-TRAITEMENT DES DONNÉES FFA")
logging.info(f"{'='*80}")
logging.info(f"Répertoire: {data_dir}\n")
# Analyser les clubs
clubs = analyze_clubs(data_dir)
# Analyser les courses
courses_stats = analyze_courses(data_dir)
# Extraire les distances
categories = extract_distances_from_courses(data_dir)
# Créer le récapitulatif
summary = create_summary(data_dir)
logging.info(f"\n{'='*80}")
logging.info(f"POST-TRAITEMENT TERMINÉ")
logging.info(f"{'='*80}")
# Afficher les statistiques
if courses_stats:
print(f"\n📊 STATISTIQUES FINALES:")
print(f" Courses: {courses_stats['total']}")
print(f" Années: {courses_stats['years']}")
print(f" Types: {courses_stats['types']}")
if clubs is not None:
print(f" Clubs: {len(clubs)}")
if categories is not None:
print(f" Catégories: {len(categories)}")
print(f"\n✅ Toutes les données ont été analysées et exportées!")
print(f"📁 Répertoire principal: {data_dir}")
if __name__ == "__main__":
main()

312
scripts/scrape_all_periods.py Executable file
View File

@@ -0,0 +1,312 @@
#!/usr/bin/env python3
"""
Script de scraping FFA avec multithreading maximal
Scrape par périodes de 15 jours et exécute les scripts de post-traitement
"""
import os
import sys
import time
import logging
import subprocess
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import pandas as pd
# Charger le module scraper
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../src'))
from ffa_scraper import FFAScraper
def get_15_day_periods(start_year=2010, end_year=2026):
"""Générer les périodes de 15 jours entre start_year et end_year"""
periods = []
start_date = datetime(start_year, 1, 1)
end_date = datetime(end_year, 12, 31)
current_date = start_date
while current_date <= end_date:
period_end = current_date + timedelta(days=14)
if period_end > end_date:
period_end = end_date
period_name = f"{current_date.strftime('%Y-%m-%d')}_to_{period_end.strftime('%Y-%m-%d')}"
periods.append({
'name': period_name,
'start': current_date,
'end': period_end
})
current_date = period_end + timedelta(days=1)
logging.info(f"Nombre total de périodes de 15 jours: {len(periods)}")
return periods
def scrape_period(period, period_index, total_periods):
"""Scraper une période spécifique"""
scraper = FFAScraper()
start_str = period['start'].strftime('%Y-%m-%d')
end_str = period['end'].strftime('%Y-%m-%d')
year = period['start'].year
# Construire l'URL pour cette période
url = (
f"https://www.athle.fr/bases/liste.aspx?frmpostback=true"
f"&frmbase=calendrier&frmmode=1&frmespace=0"
f"&frmsaisonffa={year}"
f"&frmdate1={start_str}&frmdate2={end_str}"
f"&frmtype1=&frmniveau=&frmligue=&frmdepartement=&frmniveaulab="
f"&frmepreuve=&frmtype2=&frmtype3=&frmtype4=&frmposition=4"
)
try:
# Scraper avec multithreading interne en utilisant l'URL personnalisée
courses = scraper.get_courses_list(max_pages=1, use_multithreading=False, calendar_url=url)
if courses:
logging.info(f"[{period_index + 1}/{total_periods}] {len(courses)} courses pour {start_str} au {end_str}")
# Sauvegarder immédiatement dans un fichier spécifique à la période
output_dir = os.getenv('OUTPUT_DIR', 'data_2010_2026')
period_dir = os.path.join(output_dir, 'courses', 'periods')
os.makedirs(period_dir, exist_ok=True)
period_file = os.path.join(period_dir, f"courses_{period['name']}.csv")
df = pd.DataFrame(courses)
df.to_csv(period_file, index=False, encoding='utf-8-sig')
return {
'period': period,
'courses': courses,
'success': True
}
else:
return {
'period': period,
'courses': [],
'success': True
}
except Exception as e:
logging.error(f"Erreur pour {start_str} au {end_str}: {e}")
return {
'period': period,
'courses': [],
'success': False,
'error': str(e)
}
finally:
scraper._close_all_selenium()
def scrape_all_periods_multithreaded(periods, max_workers=8):
"""Scraper toutes les périodes avec multithreading maximal"""
all_courses = []
total_periods = len(periods)
logging.info(f"=== Scraping avec {max_workers} workers ===")
logging.info(f"Périodes à scraper: {total_periods}")
with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix='scraper') as executor:
# Soumettre toutes les tâches
future_to_period = {
executor.submit(scrape_period, period, i, total_periods): i
for i, period in enumerate(periods)
}
# Barre de progression
with tqdm(total=total_periods, desc="Périodes scrapées", unit="période") as pbar:
for future in as_completed(future_to_period):
period_index = future_to_period[future]
try:
result = future.result()
all_courses.extend(result['courses'])
pbar.update(1)
pbar.set_postfix({
'total': len(all_courses),
'success': result['success']
})
except Exception as e:
logging.error(f"Erreur sur la période {period_index}: {e}")
pbar.update(1)
return all_courses
def merge_all_period_courses(output_dir):
"""Fusionner tous les fichiers CSV de périodes"""
logging.info(f"\n=== Fusion de tous les fichiers CSV ===")
periods_dir = os.path.join(output_dir, 'courses', 'periods')
all_courses = []
# Lire tous les fichiers CSV
if os.path.exists(periods_dir):
period_files = [f for f in os.listdir(periods_dir) if f.endswith('.csv')]
for period_file in tqdm(period_files, desc="Fusion des fichiers"):
file_path = os.path.join(periods_dir, period_file)
try:
df = pd.read_csv(file_path, encoding='utf-8-sig')
all_courses.append(df)
except Exception as e:
logging.warning(f"Erreur lors de la lecture de {period_file}: {e}")
if all_courses:
# Fusionner tous les DataFrames
merged_df = pd.concat(all_courses, ignore_index=True)
# Sauvegarder le fichier consolidé
courses_list_path = os.path.join(output_dir, 'courses', 'courses_list.csv')
os.makedirs(os.path.dirname(courses_list_path), exist_ok=True)
merged_df.to_csv(courses_list_path, index=False, encoding='utf-8-sig')
logging.info(f"✅ Fusionné {len(all_courses)} fichiers dans {courses_list_path}")
logging.info(f" Total: {len(merged_df)} courses")
return merged_df
else:
logging.error("❌ Aucun fichier CSV à fusionner")
return None
def run_post_processing(output_dir):
"""Exécuter les scripts de post-traitement"""
logging.info(f"\n=== Exécution des scripts de post-traitement ===")
# Exécuter le script de post-traitement principal
post_process_script = os.path.join('.', 'post_process.py')
if os.path.exists(post_process_script):
logging.info(f"\n📝 Exécution de post_process.py...")
try:
result = subprocess.run(
[sys.executable, post_process_script, output_dir],
capture_output=True,
text=True,
timeout=600
)
if result.returncode == 0:
logging.info(f"✅ post_process.py terminé avec succès")
# Afficher les résultats
output_lines = result.stdout.split('\n')
for line in output_lines[-30:]: # Dernières 30 lignes
if line.strip():
logging.info(f" {line}")
else:
logging.error(f"❌ post_process.py a échoué")
logging.error(f" Erreur: {result.stderr[:500]}")
except subprocess.TimeoutExpired:
logging.warning(f"⏰ post_process.py a expiré après 10 minutes")
except Exception as e:
logging.error(f"❌ Erreur lors de l'exécution de post_process.py: {e}")
else:
logging.warning(f"⚠️ Script post_process.py introuvable")
# Exécuter les scripts utilitaires supplémentaires
additional_scripts = [
('list_clubs.py', ['--output', output_dir, '--details']),
('extract_races.py', ['--data-dir', output_dir, '--details']),
]
for script_name, args in additional_scripts:
script_path = os.path.join('.', script_name)
if os.path.exists(script_path):
logging.info(f"\n📝 Exécution de {script_name}...")
try:
result = subprocess.run(
[sys.executable, script_path] + args,
capture_output=True,
text=True,
timeout=300
)
if result.returncode == 0:
logging.info(f"{script_name} terminé avec succès")
else:
logging.warning(f"⚠️ {script_name} a rencontré des erreurs")
except subprocess.TimeoutExpired:
logging.warning(f"{script_name} a expiré après 5 minutes")
except Exception as e:
logging.warning(f"⚠️ Erreur lors de l'exécution de {script_name}: {e}")
else:
logging.warning(f"⚠️ Script {script_name} introuvable")
def main():
"""Fonction principale"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('ffa_scraper.log'),
logging.StreamHandler()
]
)
# Configuration
start_year = 2010
end_year = 2026
max_workers = 8 # Workers pour le multithreading
logging.info(f"{'='*80}")
logging.info(f"SCRAPING FFA COMPLET ({start_year}-{end_year})")
logging.info(f"{'='*80}")
logging.info(f"Mode: Multithreading avec {max_workers} workers")
logging.info(f"Périodes: 15 jours par période")
# Générer les périodes
periods = get_15_day_periods(start_year, end_year)
# Scraper toutes les périodes
start_time = time.time()
all_courses = scrape_all_periods_multithreaded(periods, max_workers)
end_time = time.time()
# Statistiques
logging.info(f"\n{'='*80}")
logging.info(f"RÉSUMÉ DU SCRAPING")
logging.info(f"{'='*80}")
logging.info(f"Temps total: {(end_time - start_time)/60:.1f} minutes")
logging.info(f"Courses récupérées: {len(all_courses)}")
logging.info(f"Temps moyen par période: {(end_time - start_time)/len(periods):.1f} secondes")
# Fusionner tous les fichiers CSV
output_dir = os.getenv('OUTPUT_DIR', 'data_2010_2026')
merged_df = merge_all_period_courses(output_dir)
if merged_df is not None:
# Statistiques supplémentaires
print(f"\n{'='*80}")
print(f"STATISTIQUES DES COURSES")
print(f"{'='*80}")
print(f"Total: {len(merged_df)} courses")
# Courses par année
merged_df['date'] = pd.to_datetime(merged_df['date'], errors='coerce')
merged_df['année'] = merged_df['date'].dt.year
print(f"\nCourses par année:")
for year in sorted(merged_df['année'].dropna().unique()):
count = len(merged_df[merged_df['année'] == year])
print(f" {year}: {count} courses")
print(f"\n✅ Scraping terminé avec succès!")
# Exécuter les scripts de post-traitement
run_post_processing(output_dir)
print(f"\n{'='*80}")
print(f"TOUTES LES DONNÉES SONT DISPONIBLES DANS: {output_dir}")
print(f"{'='*80}")
else:
logging.error("❌ Erreur lors de la fusion des fichiers")
sys.exit(1)
if __name__ == "__main__":
main()

215
scripts/search_athlete.py Executable file
View File

@@ -0,0 +1,215 @@
#!/usr/bin/env python3
"""
Script pour rechercher un athlète dans les résultats FFA
Peut utiliser les fichiers CSV ou chercher directement depuis l'URL FFA
"""
import pandas as pd
import os
import sys
import argparse
import logging
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../src'))
from ffa_scraper import FFAScraper
from ffa_analyzer import FFADataAnalyzer
from datetime import datetime
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def search_athlete_csv(nom, prenom=None, data_dir="data"):
"""Rechercher un athlète dans les fichiers CSV"""
results_path = os.path.join(data_dir, 'resultats', 'results.csv')
if not os.path.exists(results_path):
logging.error(f"Fichier de résultats introuvable: {results_path}")
return []
try:
df = pd.read_csv(results_path, encoding='utf-8-sig')
# Filtre par nom (obligatoire)
mask = df['nom'].str.contains(nom, case=False, na=False)
# Filtre par prénom (optionnel)
if prenom:
mask &= df['prenom'].str.contains(prenom, case=False, na=False)
results = df[mask].to_dict('records')
logging.info(f"Trouvé {len(results)} résultats pour {nom} {prenom or ''}")
return results
except Exception as e:
logging.error(f"Erreur lors de la recherche dans les CSV: {e}")
return []
def search_athlete_live(nom, prenom=None, max_pages=5):
"""Rechercher un athlète en direct depuis le site FFA"""
scraper = FFAScraper()
logging.info(f"Recherche en direct pour {nom} {prenom or ''}...")
logging.warning("Note: Cela peut prendre du temps car il faut scraper les résultats")
# Récupérer les courses et leurs résultats
total_pages, total_courses, _ = scraper._detect_pagination_info()
if not total_pages:
logging.error("Impossible de détecter les données")
return []
max_pages = min(max_pages, total_pages)
logging.info(f"Analyse de {max_pages} pages...")
all_results = []
courses = scraper.get_courses_list(max_pages=max_pages, use_multithreading=True)
for course in courses:
if course.get('resultats_url'):
results = scraper.get_course_results(course['resultats_url'])
# Filtrer les résultats pour cet athlète
for result in results:
match_nom = nom.lower() in result.get('nom', '').lower()
match_prenom = not prenom or prenom.lower() in result.get('prenom', '').lower()
if match_nom and match_prenom:
# Ajouter des infos de la course
result['course_nom'] = course.get('nom', '')
result['course_date'] = course.get('date', '')
result['course_lieu'] = course.get('lieu', '')
all_results.append(result)
logging.info(f"Trouvé {len(all_results)} résultats en direct")
return all_results
def display_athlete_results(results, show_details=False, limit=None):
"""Afficher les résultats d'un athlète"""
if not results:
print("\n❌ Aucun résultat trouvé pour cet athlète")
return
# Identifier l'athlète
athlete_nom = results[0].get('nom', 'Inconnu')
athlete_prenom = results[0].get('prenom', '')
print(f"\n{'='*80}")
print(f"🏃 RÉSULTATS POUR {athlete_prenom} {athlete_nom}")
print(f"{'='*80}\n")
if limit:
results = results[:limit]
# Afficher les informations générales
print(f"Club: {results[0].get('club', 'Inconnu')}")
print(f"Total des courses: {len(results)}")
if show_details:
# Calculer des statistiques
podiums = 0
victoires = 0
places = []
for result in results:
try:
place = int(result.get('place', 0))
if place == 1:
victoires += 1
podiums += 1
elif place <= 3:
podiums += 1
places.append(place)
except:
pass
print(f"Victoires: {victoires}")
print(f"Podiums: {podiums}")
if places:
avg_place = sum(places) / len(places)
print(f"Place moyenne: {avg_place:.2f}")
print(f"\n{'='*80}\n")
# Afficher les résultats individuels
print(f"{'📋 LISTE DES COURSES':<}")
print(f"{'='*80}\n")
for i, result in enumerate(results, 1):
print(f"{i}. {result.get('course_nom', result.get('course_url', 'Inconnu'))}")
if result.get('course_date'):
print(f" 📅 Date: {result['course_date']}")
if result.get('course_lieu'):
print(f" 📍 Lieu: {result['course_lieu']}")
print(f" 🏆 Place: {result.get('place', 'N/A')}")
print(f" ⏱️ Temps: {result.get('temps', result.get('resultat', 'N/A'))}")
print(f" 🏷️ Catégorie: {result.get('categorie', 'N/A')}")
if show_details:
if result.get('points'):
print(f" 🎯 Points: {result['points']}")
if result.get('niveau'):
print(f" 📊 Niveau: {result['niveau']}")
print()
print(f"{'='*80}")
def export_results_csv(results, nom, prenom=None, output_dir="data"):
"""Exporter les résultats d'un athlète en CSV"""
os.makedirs(os.path.join(output_dir, 'exports'), exist_ok=True)
if prenom:
filename = f"athlete_{nom}_{prenom}_results.csv"
else:
filename = f"athlete_{nom}_results.csv"
filepath = os.path.join(output_dir, 'exports', filename.replace(" ", "_"))
df = pd.DataFrame(results)
df.to_csv(filepath, index=False, encoding='utf-8-sig')
logging.info(f"Exporté {len(results)} résultats dans {filepath}")
return filepath
def main():
parser = argparse.ArgumentParser(description='Rechercher un athlète dans les résultats FFA')
parser.add_argument('nom', help='Nom de l\'athlète à rechercher')
parser.add_argument('--prenom', help='Prénom de l\'athlète (optionnel)')
parser.add_argument('--csv', action='store_true', default=True,
help='Utiliser les fichiers CSV existants (défaut)')
parser.add_argument('--live', action='store_true',
help='Récupérer les données en direct depuis le site FFA')
parser.add_argument('--data-dir', default='data',
help='Répertoire des données CSV')
parser.add_argument('--max-pages', type=int, default=5,
help='Nombre maximum de pages à scraper en mode live (défaut: 5)')
parser.add_argument('--details', action='store_true',
help='Afficher les détails complets')
parser.add_argument('--limit', type=int,
help='Limiter le nombre de résultats affichés')
parser.add_argument('--export', action='store_true',
help='Exporter les résultats en CSV')
args = parser.parse_args()
# Recherche
if args.live:
print(f"\n🔍 Mode live: recherche en direct sur le site FFA...")
results = search_athlete_live(args.nom, args.prenom, args.max_pages)
else:
print(f"\n📂 Mode CSV: recherche dans {args.data_dir}/")
results = search_athlete_csv(args.nom, args.prenom, args.data_dir)
# Affichage
display_athlete_results(results, show_details=args.details, limit=args.limit)
# Export
if args.export and results:
filepath = export_results_csv(results, args.nom, args.prenom, args.data_dir)
print(f"\n💾 Exporté dans: {filepath}")
if __name__ == "__main__":
main()

269
scripts/search_race.py Executable file
View File

@@ -0,0 +1,269 @@
#!/usr/bin/env python3
"""
Script pour rechercher une course dans les données FFA
Peut utiliser les fichiers CSV ou chercher directement depuis l'URL FFA
"""
import pandas as pd
import os
import sys
import argparse
import logging
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../src'))
from ffa_scraper import FFAScraper
from datetime import datetime
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def search_race_by_name_csv(nom_course, data_dir="data"):
"""Rechercher une course par nom dans les fichiers CSV"""
courses_path = os.path.join(data_dir, 'courses', 'courses_list.csv')
if not os.path.exists(courses_path):
logging.error(f"Fichier de courses introuvable: {courses_path}")
return []
try:
df = pd.read_csv(courses_path, encoding='utf-8-sig')
mask = df['nom'].str.contains(nom_course, case=False, na=False)
courses = df[mask].to_dict('records')
logging.info(f"Trouvé {len(courses)} courses correspondant à '{nom_course}'")
return courses
except Exception as e:
logging.error(f"Erreur lors de la recherche dans les CSV: {e}")
return []
def search_race_by_date_csv(start_date, end_date=None, data_dir="data"):
"""Rechercher des courses par date dans les fichiers CSV"""
courses_path = os.path.join(data_dir, 'courses', 'courses_list.csv')
if not os.path.exists(courses_path):
logging.error(f"Fichier de courses introuvable: {courses_path}")
return []
try:
df = pd.read_csv(courses_path, encoding='utf-8-sig')
# Convertir les dates
df['date'] = pd.to_datetime(df['date'], errors='coerce')
if end_date:
mask = (df['date'] >= pd.to_datetime(start_date)) & (df['date'] <= pd.to_datetime(end_date))
else:
mask = df['date'] == pd.to_datetime(start_date)
courses = df[mask].sort_values('date').to_dict('records')
logging.info(f"Trouvé {len(courses)} courses dans la période")
return courses
except Exception as e:
logging.error(f"Erreur lors de la recherche par date: {e}")
return []
def search_race_by_type_csv(type_course, data_dir="data"):
"""Rechercher des courses par type dans les fichiers CSV"""
courses_path = os.path.join(data_dir, 'courses', 'courses_list.csv')
if not os.path.exists(courses_path):
logging.error(f"Fichier de courses introuvable: {courses_path}")
return []
try:
df = pd.read_csv(courses_path, encoding='utf-8-sig')
mask = df['type'].str.contains(type_course, case=False, na=False)
courses = df[mask].to_dict('records')
logging.info(f"Trouvé {len(courses)} courses de type '{type_course}'")
return courses
except Exception as e:
logging.error(f"Erreur lors de la recherche par type: {e}")
return []
def search_race_by_location_csv(lieu, data_dir="data"):
"""Rechercher des courses par lieu dans les fichiers CSV"""
courses_path = os.path.join(data_dir, 'courses', 'courses_list.csv')
if not os.path.exists(courses_path):
logging.error(f"Fichier de courses introuvable: {courses_path}")
return []
try:
df = pd.read_csv(courses_path, encoding='utf-8-sig')
mask = df['lieu'].str.contains(lieu, case=False, na=False)
courses = df[mask].to_dict('records')
logging.info(f"Trouvé {len(courses)} courses à '{lieu}'")
return courses
except Exception as e:
logging.error(f"Erreur lors de la recherche par lieu: {e}")
return []
def search_race_live(search_term, search_type="name", max_pages=5):
"""Rechercher une course en direct depuis le site FFA"""
scraper = FFAScraper()
logging.info(f"Recherche en direct de courses (type: {search_type})...")
logging.warning("Note: Cela peut prendre du temps")
# Récupérer les courses
total_pages, total_courses, _ = scraper._detect_pagination_info()
if not total_pages:
logging.error("Impossible de détecter les données")
return []
max_pages = min(max_pages, total_pages)
logging.info(f"Analyse de {max_pages} pages...")
courses = scraper.get_courses_list(max_pages=max_pages, use_multithreading=True)
# Filtrer selon le type de recherche
filtered_courses = []
for course in courses:
match = False
if search_type == "name":
match = search_term.lower() in course.get('nom', '').lower()
elif search_type == "type":
match = search_term.lower() in course.get('type', '').lower()
elif search_type == "location":
match = search_term.lower() in course.get('lieu', '').lower()
if match:
filtered_courses.append(course)
logging.info(f"Trouvé {len(filtered_courses)} courses en direct")
return filtered_courses
def display_race_results(courses, show_details=False, limit=None):
"""Afficher les résultats de recherche de courses"""
if not courses:
print("\n❌ Aucune course trouvée")
return
print(f"\n{'='*80}")
print(f"📅 RÉSULTATS DE LA RECHERCHE ({len(courses)} courses trouvées)")
print(f"{'='*80}\n")
if limit:
courses = courses[:limit]
for i, course in enumerate(courses, 1):
print(f"{i}. {course.get('nom', 'Inconnu')}")
if course.get('date'):
print(f" 📅 Date: {course['date']}")
if course.get('lieu'):
print(f" 📍 Lieu: {course['lieu']}")
if course.get('type'):
print(f" 🏷️ Type: {course['type']}")
if course.get('niveau'):
print(f" 📊 Niveau: {course['niveau']}")
if show_details:
if course.get('discipline'):
print(f" 🎯 Discipline: {course['discipline']}")
if course.get('fiche_detail'):
print(f" 🔗 Détails: {course['fiche_detail']}")
if course.get('resultats_url'):
print(f" 🏆 Résultats: {course['resultats_url']}")
if course.get('page'):
print(f" 📄 Page: {course['page']}")
print()
print(f"{'='*80}")
def export_race_csv(courses, filename, output_dir="data"):
"""Exporter les résultats de recherche en CSV"""
os.makedirs(os.path.join(output_dir, 'exports'), exist_ok=True)
filepath = os.path.join(output_dir, 'exports', filename.replace(" ", "_"))
df = pd.DataFrame(courses)
df.to_csv(filepath, index=False, encoding='utf-8-sig')
logging.info(f"Exporté {len(courses)} courses dans {filepath}")
return filepath
def main():
parser = argparse.ArgumentParser(description='Rechercher une course dans les données FFA')
parser.add_argument('--csv', action='store_true', default=True,
help='Utiliser les fichiers CSV existants (défaut)')
parser.add_argument('--live', action='store_true',
help='Récupérer les données en direct depuis le site FFA')
parser.add_argument('--data-dir', default='data',
help='Répertoire des données CSV')
parser.add_argument('--max-pages', type=int, default=5,
help='Nombre maximum de pages à scraper en mode live (défaut: 5)')
parser.add_argument('--details', action='store_true',
help='Afficher les détails complets')
parser.add_argument('--limit', type=int,
help='Limiter le nombre de courses affichées')
parser.add_argument('--export', action='store_true',
help='Exporter les résultats en CSV')
parser.add_argument('--export-filename',
help='Nom du fichier CSV exporté')
# Critères de recherche
parser.add_argument('--name', help='Rechercher par nom de course')
parser.add_argument('--location', help='Rechercher par lieu')
parser.add_argument('--type', help='Rechercher par type de course')
parser.add_argument('--start-date', help='Date de début (format: YYYY-MM-DD)')
parser.add_argument('--end-date', help='Date de fin (format: YYYY-MM-DD)')
args = parser.parse_args()
# Vérifier qu'au moins un critère de recherche est fourni
if not any([args.name, args.location, args.type, args.start_date]):
parser.error("Au moins un critère de recherche est requis (--name, --location, --type, --start-date)")
# Recherche
courses = []
if args.live:
# Mode live
if args.name:
print(f"\n🔍 Mode live: recherche de courses par nom '{args.name}'...")
courses = search_race_live(args.name, "name", args.max_pages)
elif args.type:
print(f"\n🔍 Mode live: recherche de courses par type '{args.type}'...")
courses = search_race_live(args.type, "type", args.max_pages)
elif args.location:
print(f"\n🔍 Mode live: recherche de courses par lieu '{args.location}'...")
courses = search_race_live(args.location, "location", args.max_pages)
else:
# Mode CSV
print(f"\n📂 Mode CSV: recherche dans {args.data_dir}/")
if args.name:
courses = search_race_by_name_csv(args.name, args.data_dir)
elif args.location:
courses = search_race_by_location_csv(args.location, args.data_dir)
elif args.type:
courses = search_race_by_type_csv(args.type, args.data_dir)
elif args.start_date:
courses = search_race_by_date_csv(args.start_date, args.end_date, args.data_dir)
# Affichage
display_race_results(courses, show_details=args.details, limit=args.limit)
# Export
if args.export and courses:
if args.export_filename:
filename = args.export_filename
else:
filename = f"race_search_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
filepath = export_race_csv(courses, filename, args.data_dir)
print(f"\n💾 Exporté dans: {filepath}")
if __name__ == "__main__":
main()