chore(docker-compose, lightrag): optimize test infrastructure and add evaluation tools

Add comprehensive E2E testing infrastructure with PostgreSQL performance tuning,
Gunicorn multi-worker support, and evaluation scripts for RAGAS-based quality
assessment. Introduce four new evaluation utilities: compare_results.py for A/B test
analysis, download_wikipedia.py for reproducible test datasets, e2e_test_harness.py
for automated evaluation pipelines, and ingest_test_docs.py for batch document
ingestion, plus a populate_test_data.sh helper that chains the download and ingestion
steps. Update docker-compose.test.yml with aggressive async settings, memory limits,
and optimized chunking parameters. Parallelize entity summarization in operate.py for
faster extraction. Fix "trucks" → "chunks" typos in the merge node/edge deduplication
comments.
clssck 2025-11-29 10:39:20 +01:00
parent d2c9e6e2ec
commit ef7327bb3e
7 changed files with 1311 additions and 11 deletions

View file: docker-compose.test.yml

@@ -14,11 +14,35 @@ services:
- "5433:5432" # Use 5433 to avoid conflict with agent-sdk postgres
volumes:
- pgdata_test:/var/lib/postgresql/data
command: |
postgres
-c shared_preload_libraries='vector,age'
-c max_connections=150
-c shared_buffers=768MB
-c work_mem=32MB
-c checkpoint_completion_target=0.9
-c effective_cache_size=2GB
-c maintenance_work_mem=192MB
-c wal_compression=on
-c checkpoint_timeout=10min
-c max_wal_size=1GB
-c random_page_cost=1.1
-c effective_io_concurrency=200
-c max_worker_processes=12
-c max_parallel_workers_per_gather=4
-c max_parallel_workers=8
-c max_parallel_maintenance_workers=4
-c jit_above_cost=50000
-c jit_inline_above_cost=250000
-c jit_optimize_above_cost=250000
-c default_statistics_target=200
-c hash_mem_multiplier=4
healthcheck:
test: ["CMD-SHELL", "pg_isready -U lightrag -d lightrag"]
interval: 5s
timeout: 5s
retries: 5
mem_limit: 2g
lightrag:
container_name: lightrag-test
@@ -67,8 +91,14 @@ services:
- ENTITY_RESOLUTION_VECTOR_THRESHOLD=0.5
- ENTITY_RESOLUTION_MAX_CANDIDATES=3
# Processing
- MAX_ASYNC=4
# Processing - Aggressive settings from agent-sdk
- MAX_ASYNC=96
- MAX_PARALLEL_INSERT=10
- EMBEDDING_FUNC_MAX_ASYNC=16
- EMBEDDING_BATCH_NUM=48
# Gunicorn - 8 workers x 4 threads = 32 concurrent handlers
- GUNICORN_CMD_ARGS=--workers=8 --worker-class=gthread --threads=4 --worker-connections=1000 --timeout=120 --keep-alive=5 --graceful-timeout=30
# Extraction Optimization - Reduce Orphan Nodes
- CHUNK_SIZE=800 # Smaller chunks for focused extraction
@@ -84,12 +114,23 @@ services:
depends_on:
postgres:
condition: service_healthy
entrypoint: []
command:
- python
- /app/lightrag/api/run_with_gunicorn.py
- --workers
- "8"
- --llm-binding
- openai
- --embedding-binding
- openai
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:9621/health || exit 1"]
interval: 10s
timeout: 5s
retries: 10
start_period: 30s
start_period: 60s
mem_limit: 2g
volumes:
pgdata_test:

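A minimal sketch (not part of this commit) for smoke-testing the tuned stack from the host before kicking off the evaluation scripts: it polls the same /health endpoint the compose healthcheck curls, but reaches it through LIGHTRAG_API_URL, which is an assumption borrowed from the evaluation scripts' defaults rather than a value defined in docker-compose.test.yml.

#!/usr/bin/env python3
"""Minimal sketch: wait until the lightrag-test service reports healthy."""
import os
import time

import httpx

# Assumed host-side URL; the in-container healthcheck uses http://localhost:9621/health
RAG_URL = os.getenv("LIGHTRAG_API_URL", "http://localhost:9622").rstrip("/")


def wait_for_health(timeout_seconds: int = 120) -> bool:
    """Poll /health until it returns 200 or the timeout expires."""
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        try:
            if httpx.get(f"{RAG_URL}/health", timeout=5.0).status_code == 200:
                return True
        except httpx.HTTPError:
            pass  # Gunicorn workers not up yet; keep polling
        time.sleep(5)
    return False


if __name__ == "__main__":
    print("healthy" if wait_for_health() else "timed out waiting for /health")
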
View file: lightrag/evaluation/compare_results.py

@@ -0,0 +1,322 @@
#!/usr/bin/env python3
"""
A/B Test Results Comparator for RAGAS Evaluation
Compares two RAGAS evaluation result files to determine if a change
(e.g., orphan connections) improved or degraded retrieval quality.
Usage:
python lightrag/evaluation/compare_results.py baseline.json experiment.json
python lightrag/evaluation/compare_results.py results_a.json results_b.json --output comparison.json
"""
import argparse
import json
import math
import sys
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any
@dataclass
class MetricComparison:
"""Comparison of a single metric between two runs."""
metric_name: str
baseline_value: float
experiment_value: float
absolute_change: float
relative_change_percent: float
improved: bool
significant: bool # > 5% change
def safe_float(value: Any, default: float = 0.0) -> float:
"""Safely convert a value to float, handling NaN."""
if value is None:
return default
try:
f = float(value)
if math.isnan(f):
return default
return f
except (ValueError, TypeError):
return default
def compare_metrics(baseline: dict, experiment: dict) -> list[MetricComparison]:
"""
Compare metrics between baseline and experiment.
Args:
baseline: Benchmark stats from baseline run
experiment: Benchmark stats from experiment run
Returns:
List of MetricComparison objects
"""
comparisons = []
baseline_avg = baseline.get("average_metrics", {})
experiment_avg = experiment.get("average_metrics", {})
metrics_to_compare = [
("faithfulness", "Faithfulness"),
("answer_relevance", "Answer Relevance"),
("context_recall", "Context Recall"),
("context_precision", "Context Precision"),
("ragas_score", "RAGAS Score"),
]
for metric_key, metric_name in metrics_to_compare:
b_val = safe_float(baseline_avg.get(metric_key, 0))
e_val = safe_float(experiment_avg.get(metric_key, 0))
abs_change = e_val - b_val
rel_change = (abs_change / b_val * 100) if b_val > 0 else 0
comparisons.append(MetricComparison(
metric_name=metric_name,
baseline_value=b_val,
experiment_value=e_val,
absolute_change=abs_change,
relative_change_percent=rel_change,
improved=abs_change > 0,
significant=abs(rel_change) > 5, # > 5% is significant
))
return comparisons
def analyze_results(baseline_path: Path, experiment_path: Path) -> dict:
"""
Perform comprehensive A/B analysis.
Args:
baseline_path: Path to baseline results JSON
experiment_path: Path to experiment results JSON
Returns:
Analysis results dictionary
"""
# Load results
with open(baseline_path) as f:
baseline = json.load(f)
with open(experiment_path) as f:
experiment = json.load(f)
baseline_stats = baseline.get("benchmark_stats", {})
experiment_stats = experiment.get("benchmark_stats", {})
# Compare metrics
comparisons = compare_metrics(baseline_stats, experiment_stats)
# Calculate overall verdict
improvements = sum(1 for c in comparisons if c.improved)
regressions = sum(1 for c in comparisons if not c.improved and c.absolute_change != 0)
significant_improvements = sum(1 for c in comparisons if c.improved and c.significant)
significant_regressions = sum(1 for c in comparisons if not c.improved and c.significant)
# Determine verdict
ragas_comparison = next((c for c in comparisons if c.metric_name == "RAGAS Score"), None)
if ragas_comparison:
if ragas_comparison.improved and ragas_comparison.significant:
verdict = "SIGNIFICANT_IMPROVEMENT"
verdict_description = f"RAGAS Score improved by {ragas_comparison.relative_change_percent:.1f}%"
elif ragas_comparison.improved:
verdict = "MINOR_IMPROVEMENT"
verdict_description = f"RAGAS Score slightly improved by {ragas_comparison.relative_change_percent:.1f}%"
elif ragas_comparison.significant:
verdict = "SIGNIFICANT_REGRESSION"
verdict_description = f"RAGAS Score regressed by {abs(ragas_comparison.relative_change_percent):.1f}%"
elif ragas_comparison.absolute_change == 0:
verdict = "NO_CHANGE"
verdict_description = "No measurable difference between runs"
else:
verdict = "MINOR_REGRESSION"
verdict_description = f"RAGAS Score slightly regressed by {abs(ragas_comparison.relative_change_percent):.1f}%"
else:
verdict = "UNKNOWN"
verdict_description = "Could not determine RAGAS score comparison"
return {
"analysis_timestamp": datetime.now().isoformat(),
"baseline_file": str(baseline_path),
"experiment_file": str(experiment_path),
"verdict": verdict,
"verdict_description": verdict_description,
"summary": {
"metrics_improved": improvements,
"metrics_regressed": regressions,
"significant_improvements": significant_improvements,
"significant_regressions": significant_regressions,
},
"metrics": [
{
"name": c.metric_name,
"baseline": round(c.baseline_value, 4),
"experiment": round(c.experiment_value, 4),
"change": round(c.absolute_change, 4),
"change_percent": round(c.relative_change_percent, 2),
"improved": c.improved,
"significant": c.significant,
}
for c in comparisons
],
"baseline_summary": {
"total_tests": baseline_stats.get("total_tests", 0),
"successful_tests": baseline_stats.get("successful_tests", 0),
"success_rate": baseline_stats.get("success_rate", 0),
},
"experiment_summary": {
"total_tests": experiment_stats.get("total_tests", 0),
"successful_tests": experiment_stats.get("successful_tests", 0),
"success_rate": experiment_stats.get("success_rate", 0),
},
}
def print_comparison_report(analysis: dict):
"""Print a formatted comparison report to stdout."""
print("=" * 70)
print("A/B TEST COMPARISON REPORT")
print("=" * 70)
print(f"Baseline: {analysis['baseline_file']}")
print(f"Experiment: {analysis['experiment_file']}")
print("-" * 70)
# Verdict
verdict = analysis["verdict"]
verdict_icon = {
"SIGNIFICANT_IMPROVEMENT": "PASS",
"MINOR_IMPROVEMENT": "PASS",
"NO_CHANGE": "~",
"MINOR_REGRESSION": "WARN",
"SIGNIFICANT_REGRESSION": "FAIL",
"UNKNOWN": "?",
}.get(verdict, "?")
print(f"\n[{verdict_icon}] VERDICT: {verdict}")
print(f" {analysis['verdict_description']}")
# Metrics table
print("\n" + "-" * 70)
print(f"{'Metric':<20} {'Baseline':>10} {'Experiment':>10} {'Change':>10} {'Status':>10}")
print("-" * 70)
for metric in analysis["metrics"]:
name = metric["name"]
baseline = f"{metric['baseline']:.4f}"
experiment = f"{metric['experiment']:.4f}"
change = metric["change"]
change_pct = metric["change_percent"]
if change > 0:
change_str = f"+{change:.4f}"
status = f"+{change_pct:.1f}%"
elif change < 0:
change_str = f"{change:.4f}"
status = f"{change_pct:.1f}%"
else:
change_str = "0.0000"
status = "0.0%"
if metric["significant"]:
if metric["improved"]:
status = f"[UP] {status}"
else:
status = f"[DOWN] {status}"
else:
status = f" {status}"
print(f"{name:<20} {baseline:>10} {experiment:>10} {change_str:>10} {status:>10}")
print("-" * 70)
# Summary
summary = analysis["summary"]
print(f"\nSummary: {summary['metrics_improved']} improved, {summary['metrics_regressed']} regressed")
print(f" {summary['significant_improvements']} significant improvements, {summary['significant_regressions']} significant regressions")
# Test counts
b_summary = analysis["baseline_summary"]
e_summary = analysis["experiment_summary"]
print(f"\nBaseline: {b_summary['successful_tests']}/{b_summary['total_tests']} tests ({b_summary['success_rate']:.1f}% success)")
print(f"Experiment: {e_summary['successful_tests']}/{e_summary['total_tests']} tests ({e_summary['success_rate']:.1f}% success)")
print("=" * 70)
def main():
parser = argparse.ArgumentParser(
description="Compare RAGAS evaluation results from two runs",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Compare baseline vs experiment
python lightrag/evaluation/compare_results.py baseline.json experiment.json
# Save comparison to file
python lightrag/evaluation/compare_results.py baseline.json experiment.json --output comparison.json
# Compare with/without orphan connections
python lightrag/evaluation/compare_results.py results_without_orphans.json results_with_orphans.json
""",
)
parser.add_argument(
"baseline",
type=str,
help="Path to baseline results JSON file",
)
parser.add_argument(
"experiment",
type=str,
help="Path to experiment results JSON file",
)
parser.add_argument(
"--output",
"-o",
type=str,
default=None,
help="Output path for comparison JSON (optional)",
)
args = parser.parse_args()
baseline_path = Path(args.baseline)
experiment_path = Path(args.experiment)
# Validate files exist
if not baseline_path.exists():
print(f"Error: Baseline file not found: {baseline_path}")
sys.exit(1)
if not experiment_path.exists():
print(f"Error: Experiment file not found: {experiment_path}")
sys.exit(1)
# Run analysis
analysis = analyze_results(baseline_path, experiment_path)
# Print report
print_comparison_report(analysis)
# Save to file if requested
if args.output:
output_path = Path(args.output)
with open(output_path, "w") as f:
json.dump(analysis, f, indent=2)
print(f"\nComparison saved to: {output_path}")
# Exit with status based on verdict
if analysis["verdict"] in ("SIGNIFICANT_REGRESSION",):
sys.exit(1)
sys.exit(0)
if __name__ == "__main__":
main()

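Beyond the CLI shown in the docstring, the comparator can be driven from Python; a minimal sketch, assuming the module is importable as lightrag.evaluation.compare_results (the package-style import the E2E harness uses for its sibling modules) and that baseline.json / experiment.json are placeholder result files:

# Minimal sketch: programmatic A/B comparison (file names are placeholders)
from pathlib import Path

from lightrag.evaluation.compare_results import analyze_results, print_comparison_report

analysis = analyze_results(Path("baseline.json"), Path("experiment.json"))
print_comparison_report(analysis)

# Mirror the CLI behaviour: fail only on a significant regression
if analysis["verdict"] == "SIGNIFICANT_REGRESSION":
    raise SystemExit(1)
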
View file: lightrag/evaluation/download_wikipedia.py

@@ -0,0 +1,183 @@
#!/usr/bin/env python3
"""
Download Wikipedia articles for LightRAG ingestion testing.
This script fetches plain text from Wikipedia articles across diverse domains
to create a test dataset with intentional entity overlap for testing:
- Entity merging and summarization
- Cross-domain relationships
- Parallel processing optimizations
Usage:
python lightrag/evaluation/download_wikipedia.py
python lightrag/evaluation/download_wikipedia.py --output wiki_docs/
python lightrag/evaluation/download_wikipedia.py --domains medical,climate
"""
import argparse
import asyncio
from pathlib import Path
import httpx
# Wikipedia API endpoint (no auth required)
WIKI_API = "https://en.wikipedia.org/w/api.php"
# User-Agent required by Wikipedia API policy
# See: https://meta.wikimedia.org/wiki/User-Agent_policy
USER_AGENT = "LightRAG-Test-Downloader/1.0 (https://github.com/HKUDS/LightRAG; claude@example.com)"
# Article selection by domain - chosen for entity overlap
# WHO → Medical + Climate
# Carbon/Emissions → Climate + Finance (ESG)
# Germany/Brazil → Sports + general knowledge
ARTICLES = {
"medical": ["Diabetes", "COVID-19"],
"finance": ["Stock_market", "Cryptocurrency"],
"climate": ["Climate_change", "Renewable_energy"],
"sports": ["FIFA_World_Cup", "Olympic_Games"],
}
async def fetch_article(title: str, client: httpx.AsyncClient) -> dict | None:
"""Fetch Wikipedia article text via API.
Args:
title: Wikipedia article title (use underscores for spaces)
client: Async HTTP client
Returns:
Dict with title, content, and source; or None if not found
"""
params = {
"action": "query",
"titles": title,
"prop": "extracts",
"explaintext": True, # Plain text, no HTML
"format": "json",
}
response = await client.get(WIKI_API, params=params)
# Check for HTTP errors
if response.status_code != 200:
print(f" HTTP {response.status_code} for {title}")
return None
# Handle empty response
if not response.content:
print(f" Empty response for {title}")
return None
try:
data = response.json()
except Exception as e:
print(f" JSON parse error for {title}: {e}")
return None
pages = data.get("query", {}).get("pages", {})
for page_id, page in pages.items():
if page_id != "-1": # -1 = not found
return {
"title": page.get("title", title),
"content": page.get("extract", ""),
"source": f"wikipedia_{title}",
}
return None
async def download_articles(
domains: list[str],
output_dir: Path,
) -> list[dict]:
"""Download all articles for selected domains.
Args:
domains: List of domain names (e.g., ["medical", "climate"])
output_dir: Directory to save downloaded articles
Returns:
List of article metadata dicts
"""
output_dir.mkdir(parents=True, exist_ok=True)
articles = []
headers = {"User-Agent": USER_AGENT}
async with httpx.AsyncClient(timeout=30.0, headers=headers) as client:
for domain in domains:
titles = ARTICLES.get(domain, [])
if not titles:
print(f"[{domain.upper()}] Unknown domain, skipping")
continue
print(f"[{domain.upper()}] Downloading {len(titles)} articles...")
for title in titles:
article = await fetch_article(title, client)
if article:
# Save to file
filename = f"{domain}_{title.lower().replace(' ', '_')}.txt"
filepath = output_dir / filename
filepath.write_text(article["content"])
word_count = len(article["content"].split())
print(f"{title}: {word_count:,} words")
articles.append(
{
"domain": domain,
"title": article["title"],
"file": str(filepath),
"words": word_count,
"source": article["source"],
}
)
else:
print(f"{title}: Not found")
return articles
async def main():
parser = argparse.ArgumentParser(description="Download Wikipedia test articles")
parser.add_argument(
"--output",
"-o",
type=str,
default="lightrag/evaluation/wiki_documents",
help="Output directory for downloaded articles",
)
parser.add_argument(
"--domains",
"-d",
type=str,
default="medical,finance,climate,sports",
help="Comma-separated domains to download",
)
args = parser.parse_args()
domains = [d.strip() for d in args.domains.split(",")]
output_dir = Path(args.output)
print("=== Wikipedia Article Downloader ===")
print(f"Domains: {', '.join(domains)}")
print(f"Output: {output_dir}/")
print()
articles = await download_articles(domains, output_dir)
total_words = sum(a["words"] for a in articles)
print()
print(f"✓ Downloaded {len(articles)} articles ({total_words:,} words total)")
print(f" Output: {output_dir}/")
# Print summary by domain
print("\nBy domain:")
for domain in domains:
domain_articles = [a for a in articles if a["domain"] == domain]
domain_words = sum(a["words"] for a in domain_articles)
print(f" {domain}: {len(domain_articles)} articles, {domain_words:,} words")
if __name__ == "__main__":
asyncio.run(main())

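The downloader is also usable as a library; a minimal sketch, again assuming the lightrag.evaluation package import path and using a placeholder output directory:

# Minimal sketch: download a single domain's articles programmatically
import asyncio
from pathlib import Path

from lightrag.evaluation.download_wikipedia import download_articles

articles = asyncio.run(download_articles(["medical"], Path("wiki_documents")))
for article in articles:
    print(f'{article["domain"]}: {article["title"]} -> {article["file"]} ({article["words"]:,} words)')
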
View file: lightrag/evaluation/e2e_test_harness.py

@@ -0,0 +1,531 @@
#!/usr/bin/env python3
"""
E2E RAGAS Test Harness for LightRAG
Complete end-to-end testing pipeline:
1. Download arXiv papers (reproducible test data)
2. Clear existing data (optional)
3. Ingest papers into LightRAG
4. Wait for processing
5. Generate Q&A dataset
6. Run RAGAS evaluation
7. Optional: A/B comparison
Usage:
# Full E2E test
python lightrag/evaluation/e2e_test_harness.py
# A/B comparison (with/without orphan connections)
python lightrag/evaluation/e2e_test_harness.py --ab-test
# Skip download if papers exist
python lightrag/evaluation/e2e_test_harness.py --skip-download
# Use existing dataset
python lightrag/evaluation/e2e_test_harness.py --dataset existing_dataset.json
"""
import argparse
import asyncio
import json
import os
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any
import httpx
from dotenv import load_dotenv
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
# Load environment variables
load_dotenv(dotenv_path=".env", override=False)
# Configuration
DEFAULT_RAG_URL = "http://localhost:9622"
DEFAULT_PAPERS = ["2312.10997", "2404.10981", "2005.11401"]
POLL_INTERVAL_SECONDS = 10
MAX_WAIT_SECONDS = 600 # 10 minutes max wait for processing
class E2ETestHarness:
"""End-to-end test harness for LightRAG RAGAS evaluation."""
def __init__(
self,
rag_url: str = None,
paper_ids: list[str] = None,
questions_per_paper: int = 5,
skip_download: bool = False,
skip_ingest: bool = False,
dataset_path: str = None,
output_dir: str = None,
):
self.rag_url = (rag_url or os.getenv("LIGHTRAG_API_URL", DEFAULT_RAG_URL)).rstrip("/")
self.paper_ids = paper_ids or DEFAULT_PAPERS
self.questions_per_paper = questions_per_paper
self.skip_download = skip_download
self.skip_ingest = skip_ingest
self.dataset_path = Path(dataset_path) if dataset_path else None
# Determine directories
self.eval_dir = Path(__file__).parent
self.papers_dir = self.eval_dir / "papers"
self.results_dir = Path(output_dir) if output_dir else self.eval_dir / "results"
self.results_dir.mkdir(parents=True, exist_ok=True)
# API key for LightRAG
self.api_key = os.getenv("LIGHTRAG_API_KEY")
async def check_lightrag_health(self) -> bool:
"""Check if LightRAG API is accessible."""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(f"{self.rag_url}/health")
response.raise_for_status()
print(f"[OK] LightRAG API accessible at {self.rag_url}")
return True
except Exception as e:
print(f"[ERROR] Cannot connect to LightRAG API: {e}")
return False
async def download_papers(self) -> list[str]:
"""Download arXiv papers."""
if self.skip_download:
print("[SKIP] Paper download (--skip-download)")
# Check existing papers
existing = [
str(self.papers_dir / f"{pid}.pdf")
for pid in self.paper_ids
if (self.papers_dir / f"{pid}.pdf").exists()
]
print(f"[INFO] Found {len(existing)} existing papers")
return existing
print("\n" + "=" * 60)
print("STEP 1: Download arXiv Papers")
print("=" * 60)
from lightrag.evaluation.download_arxiv import download_papers
results = await download_papers(self.paper_ids, self.papers_dir)
return [r["path"] for r in results if r["status"] in ("downloaded", "exists")]
async def clear_existing_data(self) -> bool:
"""Clear existing documents in LightRAG (optional)."""
print("\n[INFO] Clearing existing data...")
try:
headers = {"X-API-Key": self.api_key} if self.api_key else {}
async with httpx.AsyncClient(timeout=60.0) as client:
# Get current documents
response = await client.get(
f"{self.rag_url}/documents",
headers=headers,
)
response.raise_for_status()
docs = response.json()
# Clear all documents
statuses = docs.get("statuses", {})
all_docs = []
for status_docs in statuses.values():
all_docs.extend(status_docs)
if all_docs:
print(f"[INFO] Clearing {len(all_docs)} existing documents...")
for doc in all_docs:
doc_id = doc.get("id")
if doc_id:
await client.delete(
f"{self.rag_url}/documents/{doc_id}",
headers=headers,
)
print("[OK] Cleared existing documents")
else:
print("[OK] No existing documents to clear")
return True
except Exception as e:
print(f"[WARN] Could not clear data: {e}")
return False
async def ingest_papers(self, paper_paths: list[str]) -> bool:
"""Ingest papers into LightRAG."""
if self.skip_ingest:
print("[SKIP] Paper ingestion (--skip-ingest)")
return True
print("\n" + "=" * 60)
print("STEP 2: Ingest Papers into LightRAG")
print("=" * 60)
headers = {"X-API-Key": self.api_key} if self.api_key else {}
async with httpx.AsyncClient(timeout=300.0) as client:
for paper_path in paper_paths:
path = Path(paper_path)
if not path.exists():
print(f"[WARN] Paper not found: {paper_path}")
continue
print(f"[UPLOAD] {path.name}")
try:
with open(path, "rb") as f:
files = {"file": (path.name, f, "application/pdf")}
response = await client.post(
f"{self.rag_url}/documents/upload",
files=files,
headers=headers,
)
response.raise_for_status()
result = response.json()
print(f" [OK] Uploaded: {result}")
except Exception as e:
print(f" [ERROR] Upload failed: {e}")
return True
async def wait_for_processing(self) -> bool:
"""Wait for all documents to finish processing."""
print("\n" + "=" * 60)
print("STEP 3: Wait for Document Processing")
print("=" * 60)
headers = {"X-API-Key": self.api_key} if self.api_key else {}
start_time = time.time()
async with httpx.AsyncClient(timeout=30.0) as client:
while time.time() - start_time < MAX_WAIT_SECONDS:
try:
response = await client.get(
f"{self.rag_url}/documents",
headers=headers,
)
response.raise_for_status()
docs = response.json()
statuses = docs.get("statuses", {})
# API returns lowercase status keys
processing = len(statuses.get("processing", []))
pending = len(statuses.get("pending", []))
completed = len(statuses.get("processed", [])) # Note: "processed" not "completed"
failed = len(statuses.get("failed", []))
elapsed = int(time.time() - start_time)
print(f" [{elapsed}s] Processing: {processing}, Pending: {pending}, Completed: {completed}, Failed: {failed}")
if processing == 0 and pending == 0:
print("[OK] All documents processed")
return True
except Exception as e:
print(f" [WARN] Status check failed: {e}")
await asyncio.sleep(POLL_INTERVAL_SECONDS)
print("[ERROR] Timeout waiting for document processing")
return False
async def generate_dataset(self) -> Path:
"""Generate Q&A dataset from ingested papers."""
if self.dataset_path and self.dataset_path.exists():
print(f"[SKIP] Using existing dataset: {self.dataset_path}")
return self.dataset_path
print("\n" + "=" * 60)
print("STEP 4: Generate Q&A Dataset")
print("=" * 60)
from lightrag.evaluation.generate_arxiv_dataset import generate_dataset
output_path = self.eval_dir / f"arxiv_dataset_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
await generate_dataset(
paper_ids=self.paper_ids,
questions_per_paper=self.questions_per_paper,
rag_url=self.rag_url,
output_path=output_path,
)
return output_path
async def run_ragas_evaluation(self, dataset_path: Path) -> dict:
"""Run RAGAS evaluation."""
print("\n" + "=" * 60)
print("STEP 5: Run RAGAS Evaluation")
print("=" * 60)
from lightrag.evaluation.eval_rag_quality import RAGEvaluator
evaluator = RAGEvaluator(
test_dataset_path=str(dataset_path),
rag_api_url=self.rag_url,
)
results = await evaluator.run()
return results
async def run_full_pipeline(self) -> dict:
"""Run the complete E2E test pipeline."""
print("=" * 70)
print("E2E RAGAS TEST HARNESS FOR LIGHTRAG")
print("=" * 70)
print(f"RAG URL: {self.rag_url}")
print(f"Papers: {', '.join(self.paper_ids)}")
print(f"Questions: {self.questions_per_paper} per paper")
print(f"Results: {self.results_dir}")
print("=" * 70)
start_time = time.time()
# Check LightRAG is accessible
if not await self.check_lightrag_health():
return {"error": "LightRAG API not accessible"}
# Step 1: Download papers
paper_paths = await self.download_papers()
if not paper_paths:
return {"error": "No papers to process"}
# Step 2: Ingest papers
if not await self.ingest_papers(paper_paths):
return {"error": "Paper ingestion failed"}
# Step 3: Wait for processing
if not self.skip_ingest:
if not await self.wait_for_processing():
return {"error": "Document processing timeout"}
# Step 4: Generate dataset
dataset_path = await self.generate_dataset()
# Step 5: Run RAGAS evaluation
results = await self.run_ragas_evaluation(dataset_path)
elapsed_time = time.time() - start_time
# Save summary
summary = {
"pipeline_completed_at": datetime.now().isoformat(),
"total_elapsed_seconds": round(elapsed_time, 2),
"papers": self.paper_ids,
"dataset_path": str(dataset_path),
"ragas_results": results.get("benchmark_stats", {}),
}
summary_path = self.results_dir / f"e2e_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(summary_path, "w") as f:
json.dump(summary, f, indent=2)
print("\n" + "=" * 70)
print("E2E PIPELINE COMPLETE")
print("=" * 70)
print(f"Total time: {elapsed_time:.1f} seconds")
print(f"Summary saved: {summary_path}")
print("=" * 70)
return summary
async def run_ab_test(
harness_config: dict,
clear_between_runs: bool = True,
) -> dict:
"""
Run A/B test comparing with/without orphan connections.
Args:
harness_config: Configuration for E2ETestHarness
clear_between_runs: Clear data between A and B runs
Returns:
A/B comparison results
"""
print("=" * 70)
print("A/B TEST: WITH vs WITHOUT ORPHAN CONNECTIONS")
print("=" * 70)
results = {}
# Test A: WITHOUT orphan connections
print("\n[A] Running WITHOUT orphan connections...")
os.environ["AUTO_CONNECT_ORPHANS"] = "false"
harness_a = E2ETestHarness(**harness_config)
results["without_orphans"] = await harness_a.run_full_pipeline()
# Clear for next run
if clear_between_runs:
await harness_a.clear_existing_data()
# Test B: WITH orphan connections
print("\n[B] Running WITH orphan connections...")
os.environ["AUTO_CONNECT_ORPHANS"] = "true"
# Force re-ingest for test B
harness_config_b = harness_config.copy()
harness_config_b["skip_download"] = True # Papers already downloaded
harness_config_b["skip_ingest"] = False # Need to re-ingest
harness_b = E2ETestHarness(**harness_config_b)
results["with_orphans"] = await harness_b.run_full_pipeline()
# Compare results
print("\n" + "=" * 70)
print("A/B COMPARISON")
print("=" * 70)
a_stats = results["without_orphans"].get("ragas_results", {}).get("average_metrics", {})
b_stats = results["with_orphans"].get("ragas_results", {}).get("average_metrics", {})
comparison = {
"timestamp": datetime.now().isoformat(),
"without_orphans": a_stats,
"with_orphans": b_stats,
"improvement": {},
}
for metric in ["faithfulness", "answer_relevance", "context_recall", "context_precision", "ragas_score"]:
a_val = a_stats.get(metric, 0)
b_val = b_stats.get(metric, 0)
diff = b_val - a_val
pct = (diff / a_val * 100) if a_val > 0 else 0
comparison["improvement"][metric] = {
"absolute": round(diff, 4),
"percent": round(pct, 2),
}
status = "UP" if diff > 0 else ("DOWN" if diff < 0 else "~")
print(f" {metric:<20} A: {a_val:.4f} B: {b_val:.4f} [{status}] {pct:+.1f}%")
# Verdict
ragas_improvement = comparison["improvement"].get("ragas_score", {}).get("percent", 0)
if ragas_improvement > 5:
verdict = "ORPHAN CONNECTIONS IMPROVE QUALITY"
elif ragas_improvement < -5:
verdict = "ORPHAN CONNECTIONS DEGRADE QUALITY"
else:
verdict = "NO SIGNIFICANT DIFFERENCE"
comparison["verdict"] = verdict
print(f"\nVERDICT: {verdict}")
# Save comparison
comp_path = harness_a.results_dir / f"ab_comparison_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(comp_path, "w") as f:
json.dump(comparison, f, indent=2)
print(f"\nComparison saved: {comp_path}")
return comparison
async def main():
parser = argparse.ArgumentParser(
description="E2E RAGAS Test Harness for LightRAG",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Full E2E test
python lightrag/evaluation/e2e_test_harness.py
# A/B test (with/without orphan connections)
python lightrag/evaluation/e2e_test_harness.py --ab-test
# Skip paper download
python lightrag/evaluation/e2e_test_harness.py --skip-download
# Use existing dataset
python lightrag/evaluation/e2e_test_harness.py --dataset arxiv_dataset.json
""",
)
parser.add_argument(
"--rag-url",
"-r",
type=str,
default=None,
help=f"LightRAG API URL (default: {DEFAULT_RAG_URL})",
)
parser.add_argument(
"--papers",
"-p",
type=str,
default=None,
help="Comma-separated arXiv paper IDs",
)
parser.add_argument(
"--questions",
"-q",
type=int,
default=5,
help="Questions per paper (default: 5)",
)
parser.add_argument(
"--skip-download",
action="store_true",
help="Skip paper download (use existing)",
)
parser.add_argument(
"--skip-ingest",
action="store_true",
help="Skip paper ingestion (use existing data)",
)
parser.add_argument(
"--dataset",
"-d",
type=str,
default=None,
help="Path to existing Q&A dataset (skip generation)",
)
parser.add_argument(
"--output-dir",
"-o",
type=str,
default=None,
help="Output directory for results",
)
parser.add_argument(
"--ab-test",
action="store_true",
help="Run A/B test comparing with/without orphan connections",
)
args = parser.parse_args()
# Parse paper IDs
paper_ids = None
if args.papers:
paper_ids = [p.strip() for p in args.papers.split(",")]
harness_config = {
"rag_url": args.rag_url,
"paper_ids": paper_ids,
"questions_per_paper": args.questions,
"skip_download": args.skip_download,
"skip_ingest": args.skip_ingest,
"dataset_path": args.dataset,
"output_dir": args.output_dir,
}
if args.ab_test:
await run_ab_test(harness_config)
else:
harness = E2ETestHarness(**harness_config)
await harness.run_full_pipeline()
if __name__ == "__main__":
asyncio.run(main())

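The harness can also be embedded in other test code instead of being launched from the CLI; a minimal sketch built only from the constructor arguments and return shape shown above (URL and dataset path are placeholders, and PDFs are assumed to already sit in lightrag/evaluation/papers/):

# Minimal sketch: drive the pipeline programmatically against an existing dataset
import asyncio

from lightrag.evaluation.e2e_test_harness import E2ETestHarness

harness = E2ETestHarness(
    rag_url="http://localhost:9622",    # placeholder; None falls back to LIGHTRAG_API_URL
    skip_download=True,                 # reuse PDFs already in lightrag/evaluation/papers/
    dataset_path="arxiv_dataset.json",  # placeholder; skips Q&A generation if the file exists
)
summary = asyncio.run(harness.run_full_pipeline())
print(summary.get("ragas_results") or summary.get("error"))
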
View file: lightrag/evaluation/ingest_test_docs.py

@@ -0,0 +1,170 @@
#!/usr/bin/env python3
"""
Ingest test documents into LightRAG for testing.
This script reads text files from a directory and batch-uploads them to
LightRAG via the /documents/texts API endpoint, then polls for completion.
Usage:
python lightrag/evaluation/ingest_test_docs.py
python lightrag/evaluation/ingest_test_docs.py --input wiki_documents/ --rag-url http://localhost:9622
"""
import argparse
import asyncio
import os
import time
from pathlib import Path
import httpx
DEFAULT_RAG_URL = "http://localhost:9622"
async def ingest_documents(
input_dir: Path,
rag_url: str,
) -> dict:
"""Ingest all text files from directory into LightRAG.
Args:
input_dir: Directory containing .txt or .md files
rag_url: LightRAG API base URL
Returns:
Dict with ingestion statistics
"""
timeout = httpx.Timeout(120.0, connect=30.0)
api_key = os.getenv("LIGHTRAG_API_KEY")
headers = {"X-API-Key": api_key} if api_key else {}
async with httpx.AsyncClient(timeout=timeout) as client:
# Check health
try:
health = await client.get(f"{rag_url}/health")
if health.status_code != 200:
raise ConnectionError(f"LightRAG not healthy: {health.status_code}")
except httpx.ConnectError:
raise ConnectionError(f"Cannot connect to LightRAG at {rag_url}")
print(f"✓ Connected to LightRAG at {rag_url}")
# Collect all text files
files = list(input_dir.glob("*.txt")) + list(input_dir.glob("*.md"))
if not files:
print(f"✗ No .txt or .md files found in {input_dir}")
return {"documents": 0, "elapsed_seconds": 0}
print(f" Found {len(files)} documents to ingest")
# Read all texts
texts = []
sources = []
for file in sorted(files):
content = file.read_text()
texts.append(content)
sources.append(file.name)
word_count = len(content.split())
print(f" {file.name}: {word_count:,} words")
# Batch ingest via /documents/texts
print(f"\n Uploading {len(texts)} documents...")
start = time.time()
response = await client.post(
f"{rag_url}/documents/texts",
json={"texts": texts, "file_sources": sources},
headers=headers,
)
response.raise_for_status()
result = response.json()
track_id = result.get("track_id", "")
print(f" Track ID: {track_id}")
# Poll for completion - wait for processing to start first
print(" Waiting for processing to start...")
await asyncio.sleep(2) # Give server time to queue documents
last_status = ""
processed_count = 0
expected_total = len(texts)
initial_check = True
while True:
status_response = await client.get(f"{rag_url}/documents")
docs = status_response.json()
statuses = docs.get("statuses", {})
processing = len(statuses.get("processing", []))
pending = len(statuses.get("pending", []))
processed = len(statuses.get("processed", []))
total_visible = processing + pending + processed
current_status = f"Pending: {pending}, Processing: {processing}, Processed: {processed}"
if current_status != last_status:
print(f" {current_status}")
last_status = current_status
processed_count = processed
# Wait until we see at least some of our docs in the queue
if initial_check and (pending > 0 or processing > 0):
initial_check = False
print(" Processing started!")
# Only exit once processing has started (initial_check cleared) and both queues are empty
if processing == 0 and pending == 0 and not initial_check:
break
await asyncio.sleep(5)
elapsed = time.time() - start
print(f"\n✓ Ingestion complete in {elapsed:.1f}s")
print(f" Documents processed: {processed_count}")
print(f" Average: {elapsed / len(texts):.1f}s per document")
return {
"documents": len(texts),
"processed": processed_count,
"elapsed_seconds": elapsed,
"track_id": track_id,
}
async def main():
parser = argparse.ArgumentParser(description="Ingest test documents into LightRAG")
parser.add_argument(
"--input",
"-i",
type=str,
default="lightrag/evaluation/wiki_documents",
help="Input directory with text files",
)
parser.add_argument(
"--rag-url",
"-r",
type=str,
default=None,
help=f"LightRAG API URL (default: {DEFAULT_RAG_URL})",
)
args = parser.parse_args()
input_dir = Path(args.input)
rag_url = args.rag_url or os.getenv("LIGHTRAG_API_URL", DEFAULT_RAG_URL)
print("=== LightRAG Document Ingestion ===")
print(f"Input: {input_dir}/")
print(f"RAG URL: {rag_url}")
print()
if not input_dir.exists():
print(f"✗ Input directory not found: {input_dir}")
print(" Run download_wikipedia.py first:")
print(" python lightrag/evaluation/download_wikipedia.py")
return
await ingest_documents(input_dir, rag_url)
if __name__ == "__main__":
asyncio.run(main())

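ingest_documents can likewise be called directly, for example from a pytest fixture; a minimal sketch with placeholder paths and URL:

# Minimal sketch: batch-ingest a directory of .txt/.md files and report the stats
import asyncio
from pathlib import Path

from lightrag.evaluation.ingest_test_docs import ingest_documents

stats = asyncio.run(
    ingest_documents(Path("lightrag/evaluation/wiki_documents"), "http://localhost:9622")
)
print(f'{stats["documents"]} uploaded, {stats.get("processed", 0)} processed '
      f'in {stats["elapsed_seconds"]:.1f}s')
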
View file: lightrag/evaluation/populate_test_data.sh

@@ -0,0 +1,43 @@
#!/bin/bash
# Quick script to populate LightRAG with diverse test documents
#
# This downloads Wikipedia articles across 4 domains (Medical, Finance, Climate, Sports)
# and ingests them into LightRAG. The articles are chosen to have entity overlap
# (WHO, Carbon/Emissions, Organizations) to test entity merging and summarization.
#
# Usage:
# ./lightrag/evaluation/populate_test_data.sh
# LIGHTRAG_API_URL=http://localhost:9622 ./lightrag/evaluation/populate_test_data.sh
set -e
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
RAG_URL="${LIGHTRAG_API_URL:-http://localhost:9622}"
echo "=== LightRAG Test Data Population ==="
echo "RAG URL: $RAG_URL"
echo ""
# Check if LightRAG is running
if ! curl -s "$RAG_URL/health" > /dev/null 2>&1; then
echo "✗ Cannot connect to LightRAG at $RAG_URL"
echo " Make sure LightRAG is running first"
exit 1
fi
# 1. Download Wikipedia articles
echo "[1/2] Downloading Wikipedia articles..."
python3 "$SCRIPT_DIR/download_wikipedia.py"
# 2. Ingest into LightRAG
echo ""
echo "[2/2] Ingesting documents..."
python3 "$SCRIPT_DIR/ingest_test_docs.py" --rag-url "$RAG_URL"
echo ""
echo "=== Done! ==="
echo "Documents ingested into LightRAG."
echo ""
echo "Next steps:"
echo " - Check graph stats: curl $RAG_URL/graph/statistics"
echo " - Query the data: curl '$RAG_URL/query?mode=global&query=What+is+climate+change'"

View file: lightrag/operate.py

@@ -280,12 +280,12 @@ async def _handle_entity_relation_summary(
f" Summarizing {entity_or_relation_name}: Map {len(current_list)} descriptions into {len(chunks)} groups"
)
# Reduce phase: summarize each group from chunks
new_summaries = []
for chunk in chunks:
# Reduce phase: summarize each group from chunks IN PARALLEL
async def _summarize_single_chunk(chunk: list[str]) -> tuple[str, bool]:
"""Summarize a single chunk, returning (summary, used_llm)."""
if len(chunk) == 1:
# Optimization: single description chunks don't need LLM summarization
new_summaries.append(chunk[0])
return chunk[0], False
else:
# Multiple descriptions need LLM summarization
summary = await _summarize_descriptions(
@@ -295,8 +295,18 @@ async def _handle_entity_relation_summary(
global_config,
llm_response_cache,
)
new_summaries.append(summary)
llm_was_used = True # Mark that LLM was used in reduce phase
return summary, True
# Create tasks for all chunks and run in parallel
tasks = [
asyncio.create_task(_summarize_single_chunk(chunk)) for chunk in chunks
]
results = await asyncio.gather(*tasks)
# Collect results while preserving order
new_summaries = [result[0] for result in results]
if any(result[1] for result in results):
llm_was_used = True # Mark that LLM was used in reduce phase
# Update current list with new summaries for next iteration
current_list = new_summaries
@@ -2115,7 +2125,7 @@ async def _merge_nodes_then_upsert(
deduplicated_num = already_fragment + len(nodes_data) - num_fragment
dd_message = ""
if deduplicated_num > 0:
# Duplicated description detected across multiple trucks for the same entity
# Duplicated description detected across multiple chunks for the same entity
dd_message = f"dd {deduplicated_num}"
if dd_message or truncation_info_log:
@@ -2459,7 +2469,7 @@ async def _merge_edges_then_upsert(
deduplicated_num = already_fragment + len(edges_data) - num_fragment
dd_message = ""
if deduplicated_num > 0:
# Duplicated description detected across multiple trucks for the same entity
# Duplicated description detected across multiple chunks for the same entity
dd_message = f"dd {deduplicated_num}"
if dd_message or truncation_info_log:
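
The operate.py change relies on asyncio.gather preserving input order, so the reduced summaries stay aligned with their source chunks even though the LLM calls now run concurrently. A minimal standalone sketch of that pattern, with a sleep standing in for the real _summarize_descriptions call:

# Minimal sketch of the order-preserving parallel reduce used in _handle_entity_relation_summary
import asyncio


async def summarize_chunk(chunk: list[str]) -> tuple[str, bool]:
    if len(chunk) == 1:
        return chunk[0], False      # single description: no LLM call needed
    await asyncio.sleep(0.1)        # stand-in for the real LLM summarization call
    return " / ".join(chunk), True


async def reduce_chunks(chunks: list[list[str]]) -> tuple[list[str], bool]:
    results = await asyncio.gather(*(summarize_chunk(c) for c in chunks))
    summaries = [summary for summary, _ in results]  # gather preserves chunk order
    llm_used = any(used for _, used in results)
    return summaries, llm_used


if __name__ == "__main__":
    print(asyncio.run(reduce_chunks([["only one"], ["a", "b"], ["c", "d", "e"]])))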