#!/usr/bin/env python3
"""
Postprocessing script to create benchmark summary JSON from cross-benchmark analysis results.
Converts CSV data into JSON format with confidence intervals.
"""

import os
import json
import pandas as pd
from pathlib import Path
from typing import Dict, List, Any, Tuple
import numpy as np


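# Illustrative output shape (a sketch, not verbatim output): each entry produced by
# format_benchmark_entry() below uses the display names from map_metric_names(); the
# benchmark name and all numbers shown here are placeholders.
#
#   [
#     {
#       "system": "<benchmark_name>",
#       "Human-LLM Correctness": 0.812,
#       "Human-LLM Correctness Error": [0.795, 0.828],
#       "DeepEval Correctness": 0.744,
#       "DeepEval Correctness Error": [0.721, 0.767],
#       "DeepEval EM": 0.501,
#       "DeepEval EM Error": [0.478, 0.523],
#       "DeepEval F1": 0.655,
#       "DeepEval F1 Error": [0.631, 0.678]
#     }
#   ]

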
def validate_csv_exists(csv_path: str) -> bool:
    """Validate that the CSV file exists and is readable."""
    if not os.path.exists(csv_path):
        print(f"❌ CSV file not found: {csv_path}")
        return False

    if not csv_path.endswith(".csv"):
        print(f"❌ File is not a CSV: {csv_path}")
        return False

    print(f"✅ CSV file found: {csv_path}")
    return True


def read_summary_dataframe(csv_path: str) -> pd.DataFrame:
    """Read the cross-benchmark summary CSV into a DataFrame."""
    try:
        df = pd.read_csv(csv_path)
        print(f"✅ Successfully loaded CSV with {len(df)} rows and {len(df.columns)} columns")
        return df
    except Exception as e:
        print(f"❌ Error reading CSV: {e}")
        raise


def validate_required_columns(df: pd.DataFrame) -> bool:
    """Validate that the DataFrame has all required columns."""
    required_columns = [
        "benchmark",
        "directllm_correctness_avg",
        "deepeval_correctness_avg",
        "EM_avg",
        "f1_avg",
    ]

    missing_columns = [col for col in required_columns if col not in df.columns]
    if missing_columns:
        print(f"❌ Missing required columns: {missing_columns}")
        print(f"Available columns: {list(df.columns)}")
        return False

    print(f"✅ All required columns found: {required_columns}")
    return True


def load_cross_benchmark_data(csv_path: str) -> pd.DataFrame:
    """Load cross-benchmark summary CSV data."""
    print(f"📊 Loading cross-benchmark data from {csv_path}")

    # Validate file exists
    if not validate_csv_exists(csv_path):
        raise FileNotFoundError(f"CSV file not found: {csv_path}")

    # Read DataFrame
    df = read_summary_dataframe(csv_path)

    # Validate required columns
    if not validate_required_columns(df):
        raise ValueError("CSV missing required columns")

    print("✅ Successfully loaded cross-benchmark data")
    return df


def get_benchmark_analysis_path(benchmark_name: str, temp_dir: str) -> str:
    """Get the path to the analysis folder for a benchmark."""
    analysis_path = os.path.join(temp_dir, benchmark_name, "analysis")
    return analysis_path


def load_aggregate_metrics(benchmark_name: str, temp_dir: str) -> pd.DataFrame:
    """Load the metrics_aggregate.csv file for a benchmark."""
    analysis_path = get_benchmark_analysis_path(benchmark_name, temp_dir)
    aggregate_csv_path = os.path.join(analysis_path, "metrics_aggregate.csv")

    if not os.path.exists(aggregate_csv_path):
        raise FileNotFoundError(f"Aggregate metrics file not found: {aggregate_csv_path}")

    try:
        df = pd.read_csv(aggregate_csv_path, index_col=0)
        print(f"✅ Loaded aggregate metrics for {benchmark_name}: {len(df)} questions")
        return df
    except Exception as e:
        print(f"❌ Error loading aggregate metrics for {benchmark_name}: {e}")
        raise


def bootstrap_confidence_interval(
    data: List[float], n_bootstrap: int = 1000, confidence: float = 0.95
) -> List[float]:
    """Calculate bootstrap confidence interval for given data."""
    bootstrap_means = []

    for _ in range(n_bootstrap):
        # Resample with replacement
        resampled = np.random.choice(data, size=len(data), replace=True)
        # Calculate mean of resampled data
        bootstrap_means.append(np.mean(resampled))

    # Calculate confidence interval
    alpha = 1 - confidence
    lower_percentile = (alpha / 2) * 100
    upper_percentile = (1 - alpha / 2) * 100

    lower_bound = np.percentile(bootstrap_means, lower_percentile)
    upper_bound = np.percentile(bootstrap_means, upper_percentile)

    return [lower_bound, upper_bound]


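# Usage sketch for bootstrap_confidence_interval() (illustrative; the scores below are
# made up). The function returns the 2.5th and 97.5th percentiles of the bootstrap
# distribution of the mean, so the interval typically brackets the sample mean:
#
#   scores = [0.8, 0.9, 0.7, 1.0, 0.6]  # hypothetical per-run scores for one metric
#   lower, upper = bootstrap_confidence_interval(scores, n_bootstrap=1000, confidence=0.95)
#   lower <= float(np.mean(scores)) <= upper  # holds in the typical case

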
def load_all_run_scores(benchmark_name: str, temp_dir: str, metric: str) -> List[float]:
    """Load all individual run scores for a metric from the metrics CSV files."""
    analysis_path = get_benchmark_analysis_path(benchmark_name, temp_dir)
    metrics_csv_path = os.path.join(analysis_path, f"metrics_{metric}.csv")

    if not os.path.exists(metrics_csv_path):
        raise FileNotFoundError(f"Metrics CSV file not found: {metrics_csv_path}")

    try:
        df = pd.read_csv(metrics_csv_path, index_col=0)
        # Flatten all scores (each row is a question, each column is a run)
        all_scores = df.values.flatten()
        # Remove any NaN values
        all_scores = all_scores[~np.isnan(all_scores)]

        print(
            f" 📊 {metric}: loaded {len(all_scores)} individual run scores from {len(df)} questions × {len(df.columns)} runs"
        )
        return all_scores.tolist()
    except Exception as e:
        print(f"❌ Error loading run scores for {metric} in {benchmark_name}: {e}")
        raise


def process_single_metric_with_bootstrap(
    benchmark_name: str, temp_dir: str, metric: str, cross_benchmark_mean: float = None
) -> Tuple[float, List[float]]:
    """Process a single metric: load run scores, calculate bootstrap CI, and return mean and CI."""
    print(f"📊 Calculating {metric} for {benchmark_name}")

    # Load all individual run scores for bootstrapping
    all_run_scores = load_all_run_scores(benchmark_name, temp_dir, metric)

    # Use provided mean or calculate from run scores
    if cross_benchmark_mean is not None:
        final_mean = round(cross_benchmark_mean, 3)
        print(f" 📊 {metric}: using cross-benchmark mean={final_mean:.3f}")
    else:
        final_mean = round(np.mean(all_run_scores), 3)
        print(f" 📊 {metric}: calculated mean from run scores={final_mean:.3f}")

    # Calculate bootstrap confidence interval from all run scores
    confidence_interval = bootstrap_confidence_interval(all_run_scores)

    # Round confidence interval to 3 decimal places
    confidence_interval = [round(ci, 3) for ci in confidence_interval]

    print(f" 📊 {metric}: run scores range=[{min(all_run_scores):.3f}, {max(all_run_scores):.3f}]")
    print(
        f" ✅ {metric}: mean={final_mean:.3f}, CI=[{confidence_interval[0]:.3f}, {confidence_interval[1]:.3f}]"
    )
    return final_mean, confidence_interval


def process_single_benchmark(
    benchmark_name: str, temp_dir: str, cross_benchmark_means: Dict[str, float] = None
) -> Dict[str, Any]:
    """Process a single benchmark and return formatted data."""
    print(f"🔄 Processing benchmark: {benchmark_name}")

    # Define metrics to process
    metrics = ["directllm_correctness", "deepeval_correctness", "EM", "f1"]

    # Calculate values for each metric
    metric_values = {}
    for metric in metrics:
        try:
            # Get cross-benchmark mean if available
            cross_benchmark_mean = None
            if cross_benchmark_means and metric in cross_benchmark_means:
                cross_benchmark_mean = cross_benchmark_means[metric]

            mean, confidence_interval = process_single_metric_with_bootstrap(
                benchmark_name, temp_dir, metric, cross_benchmark_mean
            )
            metric_values[metric] = {"mean": mean, "confidence_interval": confidence_interval}
        except Exception as e:
            print(f"❌ Error processing {metric} for {benchmark_name}: {e}")
            return None

    print(f"✅ Successfully processed {benchmark_name} with {len(metric_values)} metrics")
    return metric_values


def extract_confidence_intervals(
    metric_values: Dict[str, Dict[str, Any]],
) -> Dict[str, List[float]]:
    """Extract confidence intervals from processed metric values."""
    print(f"📊 Extracting confidence intervals for {len(metric_values)} metrics")

    confidence_intervals = {}
    for metric, data in metric_values.items():
        if "confidence_interval" in data:
            confidence_intervals[metric] = data["confidence_interval"]
            print(
                f" ✅ {metric}: CI=[{data['confidence_interval'][0]:.4f}, {data['confidence_interval'][1]:.4f}]"
            )
        else:
            print(f" ❌ {metric}: No confidence interval found")
            confidence_intervals[metric] = [0.0, 0.0]  # Fallback

    return confidence_intervals


def map_metric_names(metric: str) -> str:
    """Map internal metric names to output format names."""
    mapping = {
        "directllm_correctness": "Human-LLM Correctness",
        "deepeval_correctness": "DeepEval Correctness",
        "f1": "DeepEval F1",
        "EM": "DeepEval EM",
    }
    return mapping.get(metric, metric)


def create_metric_entry(
    metric_name: str, mean: float, confidence_interval: List[float]
) -> Tuple[str, float, str, List[float]]:
    """Create a formatted metric entry with proper name mapping."""
    mapped_name = map_metric_names(metric_name)
    error_name = f"{mapped_name} Error"
    return mapped_name, mean, error_name, confidence_interval


def format_benchmark_entry(
    benchmark_name: str, means: Dict[str, float], confidence_intervals: Dict[str, List[float]]
) -> Dict[str, Any]:
    """Format benchmark data into required JSON structure."""
    print(f"📝 Formatting benchmark entry for {benchmark_name}")

    formatted_entry = {"system": benchmark_name}

    # Process each metric
    for metric, mean in means.items():
        if metric in confidence_intervals:
            mapped_name, mean_value, error_name, ci = create_metric_entry(
                metric, mean, confidence_intervals[metric]
            )

            # Add metric value (already rounded to 3 decimal places)
            formatted_entry[mapped_name] = mean_value
            # Add error interval (already rounded to 3 decimal places)
            formatted_entry[error_name] = ci

            print(f" ✅ {mapped_name}: {mean_value:.3f}, Error: [{ci[0]:.3f}, {ci[1]:.3f}]")
        else:
            print(f" ❌ {metric}: No confidence interval found")

    return formatted_entry


def validate_benchmark_folder(benchmark_name: str, temp_dir: str) -> bool:
    """Validate that a benchmark folder has the required analysis files."""
    analysis_path = get_benchmark_analysis_path(benchmark_name, temp_dir)

    if not os.path.exists(analysis_path):
        print(f" ❌ Analysis folder not found: {analysis_path}")
        return False

    # Check for required metric files
    required_files = [
        "metrics_directllm_correctness.csv",
        "metrics_deepeval_correctness.csv",
        "metrics_EM.csv",
        "metrics_f1.csv",
    ]

    missing_files = []
    for file in required_files:
        file_path = os.path.join(analysis_path, file)
        if not os.path.exists(file_path):
            missing_files.append(file)

    if missing_files:
        print(f" ❌ Missing required files: {missing_files}")
        return False

    print(" ✅ Benchmark folder validated")
    return True


def handle_processing_errors(benchmark_name: str, error: Exception) -> None:
    """Handle and log processing errors for a benchmark."""
    print(f" ❌ Error processing {benchmark_name}: {error}")
    print(f" 📝 Skipping {benchmark_name} and continuing with next benchmark")


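# Expected on-disk layout, derived from the path handling in this script (with the
# default TEMP_DIR="temp" used in main(); no other directory names are assumed):
#
#   temp/
#     cross_benchmark_summary.csv
#     <benchmark_name>/
#       analysis/
#         metrics_aggregate.csv
#         metrics_directllm_correctness.csv
#         metrics_deepeval_correctness.csv
#         metrics_EM.csv
#         metrics_f1.csv

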
def process_all_benchmarks(temp_dir: str, max_benchmarks: int = 3) -> List[Dict[str, Any]]:
    """Process all benchmarks with optional limit for testing."""
    print(f"Processing benchmarks from {temp_dir} (max: {max_benchmarks})")

    # Load cross-benchmark summary to get benchmark names
    csv_path = os.path.join(temp_dir, "cross_benchmark_summary.csv")
    summary_df = load_cross_benchmark_data(csv_path)

    results = []
    processed_count = 0
    skipped_count = 0
    error_count = 0

    print(f"\n📊 Found {len(summary_df)} benchmarks to process")

    # Process each benchmark
    for _, row in summary_df.iterrows():
        if max_benchmarks is not None and processed_count >= max_benchmarks:
            print(f"⏹️ Reached max benchmark limit ({max_benchmarks})")
            break

        benchmark_name = row["benchmark"]
        total_benchmarks = len(summary_df)
        current_progress = processed_count + 1
        print(f"\n📊 Processing benchmark {current_progress}/{total_benchmarks}: {benchmark_name}")

        # Validate benchmark folder (PHASE 6 - IMPLEMENTED)
        if not validate_benchmark_folder(benchmark_name, temp_dir):
            print(f" ⏭️ Skipping {benchmark_name} due to validation failure")
            skipped_count += 1
            continue

        # Get cross-benchmark means for this benchmark
        cross_benchmark_means = {
            "directllm_correctness": row.get("directllm_correctness_avg"),
            "deepeval_correctness": row.get("deepeval_correctness_avg"),
            "EM": row.get("EM_avg"),
            "f1": row.get("f1_avg"),
        }

        # Process single benchmark with error handling (PHASE 6 - IMPLEMENTED)
        try:
            metric_values = process_single_benchmark(
                benchmark_name, temp_dir, cross_benchmark_means
            )

            if metric_values:
                # Extract confidence intervals (PHASE 4 - IMPLEMENTED)
                print(f"📊 Extracting confidence intervals for {benchmark_name}")
                confidence_intervals = extract_confidence_intervals(metric_values)

                # Extract means for formatting
                means = {metric: data["mean"] for metric, data in metric_values.items()}

                # Format benchmark entry (PHASE 5 - IMPLEMENTED)
                formatted_entry = format_benchmark_entry(
                    benchmark_name, means, confidence_intervals
                )

                print(f"✅ Successfully processed and formatted {benchmark_name}")
                results.append(formatted_entry)
                processed_count += 1
            else:
                print(f"❌ Failed to process {benchmark_name}")
                error_count += 1

        except Exception as e:
            handle_processing_errors(benchmark_name, e)
            error_count += 1

    # Print final summary (PHASE 6 - IMPLEMENTED)
    print("\n📊 Processing Summary:")
    print(f" ✅ Successfully processed: {processed_count}")
    print(f" ⏭️ Skipped (validation): {skipped_count}")
    print(f" ❌ Errors: {error_count}")
    print(f" 📁 Total benchmarks found: {len(summary_df)}")

    return results


def validate_output_data(results: List[Dict[str, Any]]) -> bool:
    """Validate that the output data has the correct structure."""
    if not results:
        print("❌ No results to save")
        return False

    print(f"📊 Validating {len(results)} benchmark results")

    for i, result in enumerate(results):
        # Check required fields
        if "system" not in result:
            print(f"❌ Result {i}: Missing 'system' field")
            return False

        # Check that we have metric data
        metric_count = 0
        for key in result.keys():
            if key != "system" and not key.endswith(" Error"):
                metric_count += 1

        if metric_count == 0:
            print(f"❌ Result {i}: No metric data found")
            return False

        print(f" ✅ Result {i}: {result['system']} with {metric_count} metrics")

    print("✅ Output data validation passed")
    return True


def format_json_output(results: List[Dict[str, Any]]) -> str:
    """Format the results as a JSON string with proper indentation."""
    try:
        json_string = json.dumps(results, indent=2, ensure_ascii=False)
        print(f"✅ Successfully formatted JSON output ({len(json_string)} characters)")
        return json_string
    except Exception as e:
        print(f"❌ Error formatting JSON: {e}")
        raise


def create_output_directory(output_path: str) -> None:
    """Create output directory if it doesn't exist."""
    output_dir = os.path.dirname(output_path)
    if output_dir and not os.path.exists(output_dir):
        os.makedirs(output_dir)
        print(f"📁 Created output directory: {output_dir}")


def save_benchmark_summary_json(results: List[Dict[str, Any]], output_path: str) -> None:
    """Save benchmark summary to JSON file."""
    print(f"💾 Saving {len(results)} benchmark results to {output_path}")

    # Validate output data (PHASE 7 - IMPLEMENTED)
    if not validate_output_data(results):
        raise ValueError("Output data validation failed")

    # Create output directory if needed
    create_output_directory(output_path)

    # Format JSON output (PHASE 7 - IMPLEMENTED)
    json_string = format_json_output(results)

    # Save to file
    try:
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(json_string)
        print(f"✅ Successfully saved JSON to {output_path}")

        # Print file size
        file_size = os.path.getsize(output_path)
        print(f"📄 File size: {file_size} bytes")

    except Exception as e:
        print(f"❌ Error saving JSON file: {e}")
        raise


def main():
    """Main function to orchestrate the benchmark summary creation."""
    print("🚀 Starting benchmark summary JSON creation")
    print("-" * 50)

    # Configuration
    TEMP_DIR = "temp"
    CROSS_BENCHMARK_CSV = f"{TEMP_DIR}/cross_benchmark_summary.csv"
    OUTPUT_PATH = "benchmark_summary.json"
    MAX_BENCHMARKS = None  # Process all benchmarks

    print(f"📁 Temp directory: {TEMP_DIR}")
    print(f"📊 Cross-benchmark CSV: {CROSS_BENCHMARK_CSV}")
    print(f"💾 Output path: {OUTPUT_PATH}")
    print(f"🔢 Max benchmarks to process: {MAX_BENCHMARKS}")
    print("-" * 50)

    # Check if temp directory exists
    if not os.path.exists(TEMP_DIR):
        print(f"❌ Temp directory not found: {TEMP_DIR}")
        print("Please run run_cross_benchmark_analysis.py first")
        return

    # Check if cross-benchmark CSV exists
    if not os.path.exists(CROSS_BENCHMARK_CSV):
        print(f"❌ Cross-benchmark CSV not found: {CROSS_BENCHMARK_CSV}")
        print("Please run run_cross_benchmark_analysis.py first")
        return

    print("✅ Required files found")

    # Load cross-benchmark data (PHASE 2 - IMPLEMENTED)
    print("🔄 Loading cross-benchmark data...")
    try:
        summary_df = load_cross_benchmark_data(CROSS_BENCHMARK_CSV)
        print(f"📊 Loaded {len(summary_df)} benchmarks from CSV")

        # Show all benchmarks found
        if len(summary_df) > 0:
            print("📋 All benchmarks found:")
            for i, row in summary_df.iterrows():
                # Guard against a missing overall_avg value, which would break the :.4f format
                overall_avg = row.get("overall_avg")
                if overall_avg is not None and not pd.isna(overall_avg):
                    print(f" {i + 1}. {row['benchmark']}: {overall_avg:.4f}")
                else:
                    print(f" {i + 1}. {row['benchmark']}: N/A")
        else:
            print("⚠️ No benchmarks found in CSV")

    except Exception as e:
        print(f"❌ Error loading cross-benchmark data: {e}")
        return

    # Process benchmarks (PHASE 3, 4, 5 & 6 - IMPLEMENTED)
    print("🔄 Processing and formatting benchmarks with validation...")
    results = process_all_benchmarks(TEMP_DIR, MAX_BENCHMARKS)

    print(f"\n📊 Processed {len(results)} benchmarks")

    # Save results (PHASE 7 - IMPLEMENTED)
    print("💾 Saving results...")
    try:
        save_benchmark_summary_json(results, OUTPUT_PATH)
        print(f"\n🎉 Success! JSON saved to: {OUTPUT_PATH}")
        print("📄 You can now use the benchmark summary JSON file")
    except Exception as e:
        print(f"❌ Error saving results: {e}")
        return

    print("\n🎉 Benchmark summary creation completed!")


if __name__ == "__main__":
    main()