#!/usr/bin/env python3
"""
Cross-benchmark analysis orchestration script.

Downloads the qa-benchmarks Modal volume and processes each benchmark folder.
"""

import os
import subprocess
import sys
from pathlib import Path

import pandas as pd

from analysis.analyze_single_benchmark import analyze_single_benchmark_folder

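# Expected layout of the downloaded volume, inferred from the checks in this
# script (the per-benchmark contents are an assumption about what the upstream
# evaluation and analyze_single_benchmark_folder() produce, not something this
# script verifies):
#
#   <download_path>/
#       <benchmark_name>/
#           evaluated/                  # *.json files; required before analysis runs
#           analysis/                   # metrics_aggregate.csv plus other CSV output
#       cross_benchmark_summary.csv     # written at the volume root by this script
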
def download_modal_volume(volume_name: str, download_path: str) -> None:
    """Download entire modal volume to local directory."""
    print(f"📥 Downloading modal volume: {volume_name}")

    # Create download directory if it doesn't exist
    Path(download_path).mkdir(parents=True, exist_ok=True)

    original_dir = os.getcwd()
    os.chdir(download_path)

    try:
        cmd = ["modal", "volume", "get", volume_name, "/"]
        result = subprocess.run(cmd, capture_output=True, text=True)

        if result.returncode == 0:
            print("✅ Successfully downloaded modal volume")
        else:
            print(f"❌ Error downloading volume: {result.stderr}")
            sys.exit(1)
    finally:
        os.chdir(original_dir)

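# Rough shell equivalent of the download step above (assumes an authenticated
# Modal CLI on PATH; the remote path "/" pulls the whole volume into the
# current directory):
#
#   cd <download_path> && modal volume get qa-benchmarks /
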
def get_benchmark_folders(volume_path: str) -> list:
    """Get list of benchmark folders from downloaded volume."""
    volume_dir = Path(volume_path)

    if not volume_dir.exists():
        print(f"❌ Volume directory does not exist: {volume_path}")
        return []

    benchmark_folders = []
    for item in volume_dir.iterdir():
        if item.is_dir() and not item.name.startswith("."):
            benchmark_folders.append(item.name)

    print(f"📁 Found {len(benchmark_folders)} benchmark folders")
    return sorted(benchmark_folders)

def check_evaluated_folder_exists(benchmark_path: str) -> bool:
    """Check if evaluated folder exists and contains JSON files."""
    evaluated_path = Path(benchmark_path) / "evaluated"

    if not evaluated_path.exists():
        print(f"⚠️ No evaluated folder found: {evaluated_path}")
        return False

    json_files = list(evaluated_path.glob("*.json"))
    if not json_files:
        print(f"⚠️ No JSON files found in evaluated folder: {evaluated_path}")
        return False

    print(f"✅ Found {len(json_files)} JSON files in evaluated folder")
    return True

def check_analysis_files_exist(benchmark_path: str) -> bool:
    """Check if analysis files already exist for this benchmark."""
    analysis_path = Path(benchmark_path) / "analysis"

    if not analysis_path.exists():
        return False

    # Check for any CSV files in analysis folder
    csv_files = list(analysis_path.glob("*.csv"))
    return len(csv_files) > 0

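# Note on re-runs: any *.csv under <benchmark>/analysis/ is treated by the
# check above as "analysis already exists", so process_single_benchmark()
# (defined below) will skip that benchmark; removing those CSVs is how to
# force the analysis to run again for it.
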
def create_analysis_folder(benchmark_path: str) -> str:
    """Create analysis folder for benchmark if it doesn't exist."""
    analysis_path = Path(benchmark_path) / "analysis"
    analysis_path.mkdir(parents=True, exist_ok=True)
    return str(analysis_path)

def process_single_benchmark(benchmark_folder: str, volume_path: str) -> bool:
    """Process a single benchmark folder."""
    benchmark_path = Path(volume_path) / benchmark_folder

    print(f"\n🔄 Processing benchmark: {benchmark_folder}")

    # Check if evaluated folder exists
    if not check_evaluated_folder_exists(benchmark_path):
        return False

    # Check if analysis already exists
    if check_analysis_files_exist(benchmark_path):
        print(f"⏭️ Analysis files already exist, skipping: {benchmark_folder}")
        return False

    try:
        # Run analysis for this benchmark
        analyze_single_benchmark_folder(str(benchmark_path))
        print(f"✅ Successfully processed: {benchmark_folder}")
        return True
    except Exception as e:
        print(f"❌ Error processing {benchmark_folder}: {e}")
        return False

def print_progress_update(current: int, total: int) -> None:
    """Print progress update every 1/5 of total."""
    if current % max(1, total // 5) == 0:
        print(f"📊 Progress: {current}/{total} benchmarks processed")

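# For illustration: with total=10 benchmarks the check above reduces to
# current % 2 == 0, so progress lines are printed at current = 0, 2, 4, 6 and 8,
# i.e. roughly every fifth of the run, including an initial "0/10" line.
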
def process_all_benchmarks(volume_path: str) -> dict:
    """Process all benchmark folders with progress tracking."""
    benchmark_folders = get_benchmark_folders(volume_path)

    results = {"processed": [], "skipped": [], "failed": []}

    if not benchmark_folders:
        print("❌ No benchmark folders found")
        return results

    print(f"\n🚀 Starting analysis of {len(benchmark_folders)} benchmarks")

    for i, folder in enumerate(benchmark_folders):
        print_progress_update(i, len(benchmark_folders))

        success = process_single_benchmark(folder, volume_path)

        if success:
            results["processed"].append(folder)
        else:
            # process_single_benchmark() only returns a bool, so benchmarks that
            # fail and benchmarks that are skipped both land in "skipped"; the
            # "failed" bucket is currently never populated.
            results["skipped"].append(folder)

    return results

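# The summary below reads each processed benchmark's analysis/metrics_aggregate.csv
# and expects one row per question plus per-metric "<metric>_mean" columns
# (directllm_correctness_mean, deepeval_correctness_mean, EM_mean, f1_mean).
# That schema is an assumption about what analyze_single_benchmark_folder()
# writes, based on how the columns are looked up here; missing columns simply
# yield empty "<metric>_avg" values.
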
def create_cross_benchmark_summary(volume_path: str, results: dict) -> None:
    """Create a summary CSV with average metrics from all processed benchmarks."""
    print("\n📊 Creating cross-benchmark summary...")

    summary_data = []
    metrics = ["directllm_correctness", "deepeval_correctness", "EM", "f1"]

    for benchmark_folder in results["processed"]:
        benchmark_path = Path(volume_path) / benchmark_folder
        aggregate_csv_path = benchmark_path / "analysis" / "metrics_aggregate.csv"

        if aggregate_csv_path.exists():
            try:
                # Read the aggregate metrics CSV
                df = pd.read_csv(aggregate_csv_path, index_col=0)

                # Calculate average of averages for each metric
                benchmark_summary = {"benchmark": benchmark_folder, "questions_count": len(df)}

                for metric in metrics:
                    mean_col = f"{metric}_mean"
                    if mean_col in df.columns:
                        benchmark_summary[f"{metric}_avg"] = df[mean_col].mean()
                    else:
                        benchmark_summary[f"{metric}_avg"] = None

                summary_data.append(benchmark_summary)
                print(f"  ✅ Added {benchmark_folder}: {len(df)} questions")

            except Exception as e:
                print(f"  ❌ Error reading {benchmark_folder}: {e}")
        else:
            print(f"  ⚠️ No aggregate file found for {benchmark_folder}")

    if summary_data:
        # Create summary DataFrame
        summary_df = pd.DataFrame(summary_data)

        # Sort by overall performance (average of all metrics)
        metric_cols = [f"{metric}_avg" for metric in metrics]
        valid_metrics = [col for col in metric_cols if col in summary_df.columns]

        if valid_metrics:
            summary_df["overall_avg"] = summary_df[valid_metrics].mean(axis=1)
            summary_df = summary_df.sort_values("overall_avg", ascending=False)

        # Save summary CSV
        summary_path = Path(volume_path) / "cross_benchmark_summary.csv"
        summary_df.to_csv(summary_path, index=False)

        print(f"📈 Cross-benchmark summary saved to: {summary_path}")
        print(f"📊 Processed {len(summary_df)} benchmarks")

        # Print top performers (enumerate gives the rank; the DataFrame index
        # still carries the pre-sort row labels, so it is not used here)
        print("\n🏆 Top 3 performers:")
        for rank, (_, row) in enumerate(summary_df.head(3).iterrows(), start=1):
            overall = row.get("overall_avg")
            overall_str = f"{overall:.4f}" if overall is not None else "N/A"
            print(f"  {rank}. {row['benchmark']}: {overall_str}")
    else:
        print("❌ No benchmark data found for summary")

def print_summary(results: dict) -> None:
    """Print summary of processing results."""
    print("\n" + "=" * 50)
    print("📊 PROCESSING SUMMARY")
    print("=" * 50)

    print(f"✅ Successfully processed: {len(results['processed'])}")
    print(f"⏭️ Skipped (already exists): {len(results['skipped'])}")
    print(f"❌ Failed: {len(results['failed'])}")

    if results["processed"]:
        print("\n📁 Processed benchmarks:")
        for folder in results["processed"]:
            print(f"  - {folder}")

    if results["skipped"]:
        print("\n⏭️ Skipped benchmarks:")
        for folder in results["skipped"]:
            print(f"  - {folder}")

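# Example invocation (the filename is illustrative; the script requires the
# `modal` CLI to be installed and authenticated, and the `analysis` package to
# be importable):
#
#   python run_cross_benchmark_analysis.py
#
# The downloaded volume contents and the generated cross_benchmark_summary.csv
# land under ./temp.
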
def main():
    """Main orchestration function."""
    VOLUME_NAME = "qa-benchmarks"
    DOWNLOAD_PATH = "temp"

    print("🚀 Starting cross-benchmark analysis")
    print(f"📦 Modal volume: {VOLUME_NAME}")
    print(f"📁 Download path: {DOWNLOAD_PATH}")
    print("-" * 50)

    # Download modal volume
    download_modal_volume(VOLUME_NAME, DOWNLOAD_PATH)

    # Process all benchmarks
    results = process_all_benchmarks(DOWNLOAD_PATH)

    # Create cross-benchmark summary
    create_cross_benchmark_summary(DOWNLOAD_PATH, results)

    # Print summary
    print_summary(results)

    print("\n🎉 Cross-benchmark analysis completed!")

if __name__ == "__main__":
    main()