diff --git a/docker-compose.test.yml b/docker-compose.test.yml index 2bb5ffdc..e427ffe2 100644 --- a/docker-compose.test.yml +++ b/docker-compose.test.yml @@ -14,11 +14,35 @@ services: - "5433:5432" # Use 5433 to avoid conflict with agent-sdk postgres volumes: - pgdata_test:/var/lib/postgresql/data + command: | + postgres + -c shared_preload_libraries='vector,age' + -c max_connections=150 + -c shared_buffers=768MB + -c work_mem=32MB + -c checkpoint_completion_target=0.9 + -c effective_cache_size=2GB + -c maintenance_work_mem=192MB + -c wal_compression=on + -c checkpoint_timeout=10min + -c max_wal_size=1GB + -c random_page_cost=1.1 + -c effective_io_concurrency=200 + -c max_worker_processes=12 + -c max_parallel_workers_per_gather=4 + -c max_parallel_workers=8 + -c max_parallel_maintenance_workers=4 + -c jit_above_cost=50000 + -c jit_inline_above_cost=250000 + -c jit_optimize_above_cost=250000 + -c default_statistics_target=200 + -c hash_mem_multiplier=4 healthcheck: test: ["CMD-SHELL", "pg_isready -U lightrag -d lightrag"] interval: 5s timeout: 5s retries: 5 + mem_limit: 2g lightrag: container_name: lightrag-test @@ -67,8 +91,14 @@ services: - ENTITY_RESOLUTION_VECTOR_THRESHOLD=0.5 - ENTITY_RESOLUTION_MAX_CANDIDATES=3 - # Processing - - MAX_ASYNC=4 + # Processing - Aggressive settings from agent-sdk + - MAX_ASYNC=96 + - MAX_PARALLEL_INSERT=10 + - EMBEDDING_FUNC_MAX_ASYNC=16 + - EMBEDDING_BATCH_NUM=48 + + # Gunicorn - 8 workers x 4 threads = 32 concurrent handlers + - GUNICORN_CMD_ARGS=--workers=8 --worker-class=gthread --threads=4 --worker-connections=1000 --timeout=120 --keep-alive=5 --graceful-timeout=30 # Extraction Optimization - Reduce Orphan Nodes - CHUNK_SIZE=800 # Smaller chunks for focused extraction @@ -84,12 +114,23 @@ services: depends_on: postgres: condition: service_healthy + entrypoint: [] + command: + - python + - /app/lightrag/api/run_with_gunicorn.py + - --workers + - "8" + - --llm-binding + - openai + - --embedding-binding + - openai healthcheck: 
test: ["CMD-SHELL", "curl -f http://localhost:9621/health || exit 1"] interval: 10s timeout: 5s retries: 10 - start_period: 30s + start_period: 60s + mem_limit: 2g volumes: pgdata_test: diff --git a/lightrag/evaluation/compare_results.py b/lightrag/evaluation/compare_results.py new file mode 100644 index 00000000..b10350f5 --- /dev/null +++ b/lightrag/evaluation/compare_results.py @@ -0,0 +1,322 @@ +#!/usr/bin/env python3 +""" +A/B Test Results Comparator for RAGAS Evaluation + +Compares two RAGAS evaluation result files to determine if a change +(e.g., orphan connections) improved or degraded retrieval quality. + +Usage: + python lightrag/evaluation/compare_results.py baseline.json experiment.json + python lightrag/evaluation/compare_results.py results_a.json results_b.json --output comparison.json +""" + +import argparse +import json +import math +import sys +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Any + + +@dataclass +class MetricComparison: + """Comparison of a single metric between two runs.""" + metric_name: str + baseline_value: float + experiment_value: float + absolute_change: float + relative_change_percent: float + improved: bool + significant: bool # > 5% change + + +def safe_float(value: Any, default: float = 0.0) -> float: + """Safely convert a value to float, handling NaN.""" + if value is None: + return default + try: + f = float(value) + if math.isnan(f): + return default + return f + except (ValueError, TypeError): + return default + + +def compare_metrics(baseline: dict, experiment: dict) -> list[MetricComparison]: + """ + Compare metrics between baseline and experiment. 
+ + Args: + baseline: Benchmark stats from baseline run + experiment: Benchmark stats from experiment run + + Returns: + List of MetricComparison objects + """ + comparisons = [] + + baseline_avg = baseline.get("average_metrics", {}) + experiment_avg = experiment.get("average_metrics", {}) + + metrics_to_compare = [ + ("faithfulness", "Faithfulness"), + ("answer_relevance", "Answer Relevance"), + ("context_recall", "Context Recall"), + ("context_precision", "Context Precision"), + ("ragas_score", "RAGAS Score"), + ] + + for metric_key, metric_name in metrics_to_compare: + b_val = safe_float(baseline_avg.get(metric_key, 0)) + e_val = safe_float(experiment_avg.get(metric_key, 0)) + + abs_change = e_val - b_val + rel_change = (abs_change / b_val * 100) if b_val > 0 else 0 + + comparisons.append(MetricComparison( + metric_name=metric_name, + baseline_value=b_val, + experiment_value=e_val, + absolute_change=abs_change, + relative_change_percent=rel_change, + improved=abs_change > 0, + significant=abs(rel_change) > 5, # > 5% is significant + )) + + return comparisons + + +def analyze_results(baseline_path: Path, experiment_path: Path) -> dict: + """ + Perform comprehensive A/B analysis. 
+ + Args: + baseline_path: Path to baseline results JSON + experiment_path: Path to experiment results JSON + + Returns: + Analysis results dictionary + """ + # Load results + with open(baseline_path) as f: + baseline = json.load(f) + with open(experiment_path) as f: + experiment = json.load(f) + + baseline_stats = baseline.get("benchmark_stats", {}) + experiment_stats = experiment.get("benchmark_stats", {}) + + # Compare metrics + comparisons = compare_metrics(baseline_stats, experiment_stats) + + # Calculate overall verdict + improvements = sum(1 for c in comparisons if c.improved) + regressions = sum(1 for c in comparisons if not c.improved and c.absolute_change != 0) + significant_improvements = sum(1 for c in comparisons if c.improved and c.significant) + significant_regressions = sum(1 for c in comparisons if not c.improved and c.significant) + + # Determine verdict + ragas_comparison = next((c for c in comparisons if c.metric_name == "RAGAS Score"), None) + + if ragas_comparison: + if ragas_comparison.improved and ragas_comparison.significant: + verdict = "SIGNIFICANT_IMPROVEMENT" + verdict_description = f"RAGAS Score improved by {ragas_comparison.relative_change_percent:.1f}%" + elif ragas_comparison.improved: + verdict = "MINOR_IMPROVEMENT" + verdict_description = f"RAGAS Score slightly improved by {ragas_comparison.relative_change_percent:.1f}%" + elif ragas_comparison.significant: + verdict = "SIGNIFICANT_REGRESSION" + verdict_description = f"RAGAS Score regressed by {abs(ragas_comparison.relative_change_percent):.1f}%" + elif ragas_comparison.absolute_change == 0: + verdict = "NO_CHANGE" + verdict_description = "No measurable difference between runs" + else: + verdict = "MINOR_REGRESSION" + verdict_description = f"RAGAS Score slightly regressed by {abs(ragas_comparison.relative_change_percent):.1f}%" + else: + verdict = "UNKNOWN" + verdict_description = "Could not determine RAGAS score comparison" + + return { + "analysis_timestamp": 
datetime.now().isoformat(), + "baseline_file": str(baseline_path), + "experiment_file": str(experiment_path), + "verdict": verdict, + "verdict_description": verdict_description, + "summary": { + "metrics_improved": improvements, + "metrics_regressed": regressions, + "significant_improvements": significant_improvements, + "significant_regressions": significant_regressions, + }, + "metrics": [ + { + "name": c.metric_name, + "baseline": round(c.baseline_value, 4), + "experiment": round(c.experiment_value, 4), + "change": round(c.absolute_change, 4), + "change_percent": round(c.relative_change_percent, 2), + "improved": c.improved, + "significant": c.significant, + } + for c in comparisons + ], + "baseline_summary": { + "total_tests": baseline_stats.get("total_tests", 0), + "successful_tests": baseline_stats.get("successful_tests", 0), + "success_rate": baseline_stats.get("success_rate", 0), + }, + "experiment_summary": { + "total_tests": experiment_stats.get("total_tests", 0), + "successful_tests": experiment_stats.get("successful_tests", 0), + "success_rate": experiment_stats.get("success_rate", 0), + }, + } + + +def print_comparison_report(analysis: dict): + """Print a formatted comparison report to stdout.""" + print("=" * 70) + print("A/B TEST COMPARISON REPORT") + print("=" * 70) + print(f"Baseline: {analysis['baseline_file']}") + print(f"Experiment: {analysis['experiment_file']}") + print("-" * 70) + + # Verdict + verdict = analysis["verdict"] + verdict_icon = { + "SIGNIFICANT_IMPROVEMENT": "PASS", + "MINOR_IMPROVEMENT": "PASS", + "NO_CHANGE": "~", + "MINOR_REGRESSION": "WARN", + "SIGNIFICANT_REGRESSION": "FAIL", + "UNKNOWN": "?", + }.get(verdict, "?") + + print(f"\n[{verdict_icon}] VERDICT: {verdict}") + print(f" {analysis['verdict_description']}") + + # Metrics table + print("\n" + "-" * 70) + print(f"{'Metric':<20} {'Baseline':>10} {'Experiment':>10} {'Change':>10} {'Status':>10}") + print("-" * 70) + + for metric in analysis["metrics"]: + name = 
metric["name"] + baseline = f"{metric['baseline']:.4f}" + experiment = f"{metric['experiment']:.4f}" + + change = metric["change"] + change_pct = metric["change_percent"] + if change > 0: + change_str = f"+{change:.4f}" + status = f"+{change_pct:.1f}%" + elif change < 0: + change_str = f"{change:.4f}" + status = f"{change_pct:.1f}%" + else: + change_str = "0.0000" + status = "0.0%" + + if metric["significant"]: + if metric["improved"]: + status = f"[UP] {status}" + else: + status = f"[DOWN] {status}" + else: + status = f" {status}" + + print(f"{name:<20} {baseline:>10} {experiment:>10} {change_str:>10} {status:>10}") + + print("-" * 70) + + # Summary + summary = analysis["summary"] + print(f"\nSummary: {summary['metrics_improved']} improved, {summary['metrics_regressed']} regressed") + print(f" {summary['significant_improvements']} significant improvements, {summary['significant_regressions']} significant regressions") + + # Test counts + b_summary = analysis["baseline_summary"] + e_summary = analysis["experiment_summary"] + print(f"\nBaseline: {b_summary['successful_tests']}/{b_summary['total_tests']} tests ({b_summary['success_rate']:.1f}% success)") + print(f"Experiment: {e_summary['successful_tests']}/{e_summary['total_tests']} tests ({e_summary['success_rate']:.1f}% success)") + + print("=" * 70) + + +def main(): + parser = argparse.ArgumentParser( + description="Compare RAGAS evaluation results from two runs", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Compare baseline vs experiment + python lightrag/evaluation/compare_results.py baseline.json experiment.json + + # Save comparison to file + python lightrag/evaluation/compare_results.py baseline.json experiment.json --output comparison.json + + # Compare with/without orphan connections + python lightrag/evaluation/compare_results.py results_without_orphans.json results_with_orphans.json + """, + ) + + parser.add_argument( + "baseline", + type=str, + help="Path to 
baseline results JSON file", + ) + + parser.add_argument( + "experiment", + type=str, + help="Path to experiment results JSON file", + ) + + parser.add_argument( + "--output", + "-o", + type=str, + default=None, + help="Output path for comparison JSON (optional)", + ) + + args = parser.parse_args() + + baseline_path = Path(args.baseline) + experiment_path = Path(args.experiment) + + # Validate files exist + if not baseline_path.exists(): + print(f"Error: Baseline file not found: {baseline_path}") + sys.exit(1) + if not experiment_path.exists(): + print(f"Error: Experiment file not found: {experiment_path}") + sys.exit(1) + + # Run analysis + analysis = analyze_results(baseline_path, experiment_path) + + # Print report + print_comparison_report(analysis) + + # Save to file if requested + if args.output: + output_path = Path(args.output) + with open(output_path, "w") as f: + json.dump(analysis, f, indent=2) + print(f"\nComparison saved to: {output_path}") + + # Exit with status based on verdict + if analysis["verdict"] in ("SIGNIFICANT_REGRESSION",): + sys.exit(1) + sys.exit(0) + + +if __name__ == "__main__": + main() diff --git a/lightrag/evaluation/download_wikipedia.py b/lightrag/evaluation/download_wikipedia.py new file mode 100644 index 00000000..a8862aab --- /dev/null +++ b/lightrag/evaluation/download_wikipedia.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python3 +""" +Download Wikipedia articles for LightRAG ingestion testing. 
+ +This script fetches plain text from Wikipedia articles across diverse domains +to create a test dataset with intentional entity overlap for testing: +- Entity merging and summarization +- Cross-domain relationships +- Parallel processing optimizations + +Usage: + python lightrag/evaluation/download_wikipedia.py + python lightrag/evaluation/download_wikipedia.py --output wiki_docs/ + python lightrag/evaluation/download_wikipedia.py --domains medical,climate +""" + +import argparse +import asyncio +from pathlib import Path + +import httpx + +# Wikipedia API endpoint (no auth required) +WIKI_API = "https://en.wikipedia.org/w/api.php" + +# User-Agent required by Wikipedia API policy +# See: https://meta.wikimedia.org/wiki/User-Agent_policy +USER_AGENT = "LightRAG-Test-Downloader/1.0 (https://github.com/HKUDS/LightRAG; claude@example.com)" + +# Article selection by domain - chosen for entity overlap +# WHO → Medical + Climate +# Carbon/Emissions → Climate + Finance (ESG) +# Germany/Brazil → Sports + general knowledge +ARTICLES = { + "medical": ["Diabetes", "COVID-19"], + "finance": ["Stock_market", "Cryptocurrency"], + "climate": ["Climate_change", "Renewable_energy"], + "sports": ["FIFA_World_Cup", "Olympic_Games"], +} + + +async def fetch_article(title: str, client: httpx.AsyncClient) -> dict | None: + """Fetch Wikipedia article text via API. 
+ + Args: + title: Wikipedia article title (use underscores for spaces) + client: Async HTTP client + + Returns: + Dict with title, content, and source; or None if not found + """ + params = { + "action": "query", + "titles": title, + "prop": "extracts", + "explaintext": True, # Plain text, no HTML + "format": "json", + } + response = await client.get(WIKI_API, params=params) + + # Check for HTTP errors + if response.status_code != 200: + print(f" HTTP {response.status_code} for {title}") + return None + + # Handle empty response + if not response.content: + print(f" Empty response for {title}") + return None + + try: + data = response.json() + except Exception as e: + print(f" JSON parse error for {title}: {e}") + return None + + pages = data.get("query", {}).get("pages", {}) + + for page_id, page in pages.items(): + if page_id != "-1": # -1 = not found + return { + "title": page.get("title", title), + "content": page.get("extract", ""), + "source": f"wikipedia_{title}", + } + return None + + +async def download_articles( + domains: list[str], + output_dir: Path, +) -> list[dict]: + """Download all articles for selected domains. 
+ + Args: + domains: List of domain names (e.g., ["medical", "climate"]) + output_dir: Directory to save downloaded articles + + Returns: + List of article metadata dicts + """ + output_dir.mkdir(parents=True, exist_ok=True) + articles = [] + + headers = {"User-Agent": USER_AGENT} + async with httpx.AsyncClient(timeout=30.0, headers=headers) as client: + for domain in domains: + titles = ARTICLES.get(domain, []) + if not titles: + print(f"[{domain.upper()}] Unknown domain, skipping") + continue + + print(f"[{domain.upper()}] Downloading {len(titles)} articles...") + + for title in titles: + article = await fetch_article(title, client) + if article: + # Save to file + filename = f"{domain}_{title.lower().replace(' ', '_')}.txt" + filepath = output_dir / filename + filepath.write_text(article["content"]) + + word_count = len(article["content"].split()) + print(f" ✓ {title}: {word_count:,} words") + + articles.append( + { + "domain": domain, + "title": article["title"], + "file": str(filepath), + "words": word_count, + "source": article["source"], + } + ) + else: + print(f" ✗ {title}: Not found") + + return articles + + +async def main(): + parser = argparse.ArgumentParser(description="Download Wikipedia test articles") + parser.add_argument( + "--output", + "-o", + type=str, + default="lightrag/evaluation/wiki_documents", + help="Output directory for downloaded articles", + ) + parser.add_argument( + "--domains", + "-d", + type=str, + default="medical,finance,climate,sports", + help="Comma-separated domains to download", + ) + args = parser.parse_args() + + domains = [d.strip() for d in args.domains.split(",")] + output_dir = Path(args.output) + + print("=== Wikipedia Article Downloader ===") + print(f"Domains: {', '.join(domains)}") + print(f"Output: {output_dir}/") + print() + + articles = await download_articles(domains, output_dir) + + total_words = sum(a["words"] for a in articles) + print() + print(f"✓ Downloaded {len(articles)} articles ({total_words:,} words 
total)") + print(f" Output: {output_dir}/") + + # Print summary by domain + print("\nBy domain:") + for domain in domains: + domain_articles = [a for a in articles if a["domain"] == domain] + domain_words = sum(a["words"] for a in domain_articles) + print(f" {domain}: {len(domain_articles)} articles, {domain_words:,} words") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/lightrag/evaluation/e2e_test_harness.py b/lightrag/evaluation/e2e_test_harness.py new file mode 100644 index 00000000..07e040a1 --- /dev/null +++ b/lightrag/evaluation/e2e_test_harness.py @@ -0,0 +1,531 @@ +#!/usr/bin/env python3 +""" +E2E RAGAS Test Harness for LightRAG + +Complete end-to-end testing pipeline: +1. Download arXiv papers (reproducible test data) +2. Clear existing data (optional) +3. Ingest papers into LightRAG +4. Wait for processing +5. Generate Q&A dataset +6. Run RAGAS evaluation +7. Optional: A/B comparison + +Usage: + # Full E2E test + python lightrag/evaluation/e2e_test_harness.py + + # A/B comparison (with/without orphan connections) + python lightrag/evaluation/e2e_test_harness.py --ab-test + + # Skip download if papers exist + python lightrag/evaluation/e2e_test_harness.py --skip-download + + # Use existing dataset + python lightrag/evaluation/e2e_test_harness.py --dataset existing_dataset.json +""" + +import argparse +import asyncio +import json +import os +import subprocess +import sys +import time +from datetime import datetime +from pathlib import Path +from typing import Any + +import httpx +from dotenv import load_dotenv + +# Add parent directory to path +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + +# Load environment variables +load_dotenv(dotenv_path=".env", override=False) + +# Configuration +DEFAULT_RAG_URL = "http://localhost:9622" +DEFAULT_PAPERS = ["2312.10997", "2404.10981", "2005.11401"] +POLL_INTERVAL_SECONDS = 10 +MAX_WAIT_SECONDS = 600 # 10 minutes max wait for processing + + +class E2ETestHarness: + """End-to-end 
test harness for LightRAG RAGAS evaluation.""" + + def __init__( + self, + rag_url: str = None, + paper_ids: list[str] = None, + questions_per_paper: int = 5, + skip_download: bool = False, + skip_ingest: bool = False, + dataset_path: str = None, + output_dir: str = None, + ): + self.rag_url = (rag_url or os.getenv("LIGHTRAG_API_URL", DEFAULT_RAG_URL)).rstrip("/") + self.paper_ids = paper_ids or DEFAULT_PAPERS + self.questions_per_paper = questions_per_paper + self.skip_download = skip_download + self.skip_ingest = skip_ingest + self.dataset_path = Path(dataset_path) if dataset_path else None + + # Determine directories + self.eval_dir = Path(__file__).parent + self.papers_dir = self.eval_dir / "papers" + self.results_dir = Path(output_dir) if output_dir else self.eval_dir / "results" + self.results_dir.mkdir(parents=True, exist_ok=True) + + # API key for LightRAG + self.api_key = os.getenv("LIGHTRAG_API_KEY") + + async def check_lightrag_health(self) -> bool: + """Check if LightRAG API is accessible.""" + try: + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.get(f"{self.rag_url}/health") + response.raise_for_status() + print(f"[OK] LightRAG API accessible at {self.rag_url}") + return True + except Exception as e: + print(f"[ERROR] Cannot connect to LightRAG API: {e}") + return False + + async def download_papers(self) -> list[str]: + """Download arXiv papers.""" + if self.skip_download: + print("[SKIP] Paper download (--skip-download)") + # Check existing papers + existing = [ + str(self.papers_dir / f"{pid}.pdf") + for pid in self.paper_ids + if (self.papers_dir / f"{pid}.pdf").exists() + ] + print(f"[INFO] Found {len(existing)} existing papers") + return existing + + print("\n" + "=" * 60) + print("STEP 1: Download arXiv Papers") + print("=" * 60) + + from lightrag.evaluation.download_arxiv import download_papers + + results = await download_papers(self.paper_ids, self.papers_dir) + return [r["path"] for r in results if 
r["status"] in ("downloaded", "exists")] + + async def clear_existing_data(self) -> bool: + """Clear existing documents in LightRAG (optional).""" + print("\n[INFO] Clearing existing data...") + try: + headers = {"X-API-Key": self.api_key} if self.api_key else {} + async with httpx.AsyncClient(timeout=60.0) as client: + # Get current documents + response = await client.get( + f"{self.rag_url}/documents", + headers=headers, + ) + response.raise_for_status() + docs = response.json() + + # Clear all documents + statuses = docs.get("statuses", {}) + all_docs = [] + for status_docs in statuses.values(): + all_docs.extend(status_docs) + + if all_docs: + print(f"[INFO] Clearing {len(all_docs)} existing documents...") + for doc in all_docs: + doc_id = doc.get("id") + if doc_id: + await client.delete( + f"{self.rag_url}/documents/{doc_id}", + headers=headers, + ) + print("[OK] Cleared existing documents") + else: + print("[OK] No existing documents to clear") + + return True + except Exception as e: + print(f"[WARN] Could not clear data: {e}") + return False + + async def ingest_papers(self, paper_paths: list[str]) -> bool: + """Ingest papers into LightRAG.""" + if self.skip_ingest: + print("[SKIP] Paper ingestion (--skip-ingest)") + return True + + print("\n" + "=" * 60) + print("STEP 2: Ingest Papers into LightRAG") + print("=" * 60) + + headers = {"X-API-Key": self.api_key} if self.api_key else {} + + async with httpx.AsyncClient(timeout=300.0) as client: + for paper_path in paper_paths: + path = Path(paper_path) + if not path.exists(): + print(f"[WARN] Paper not found: {paper_path}") + continue + + print(f"[UPLOAD] {path.name}") + + try: + with open(path, "rb") as f: + files = {"file": (path.name, f, "application/pdf")} + response = await client.post( + f"{self.rag_url}/documents/upload", + files=files, + headers=headers, + ) + response.raise_for_status() + result = response.json() + print(f" [OK] Uploaded: {result}") + except Exception as e: + print(f" [ERROR] Upload 
failed: {e}") + + return True + + async def wait_for_processing(self) -> bool: + """Wait for all documents to finish processing.""" + print("\n" + "=" * 60) + print("STEP 3: Wait for Document Processing") + print("=" * 60) + + headers = {"X-API-Key": self.api_key} if self.api_key else {} + start_time = time.time() + + async with httpx.AsyncClient(timeout=30.0) as client: + while time.time() - start_time < MAX_WAIT_SECONDS: + try: + response = await client.get( + f"{self.rag_url}/documents", + headers=headers, + ) + response.raise_for_status() + docs = response.json() + + statuses = docs.get("statuses", {}) + # API returns lowercase status keys + processing = len(statuses.get("processing", [])) + pending = len(statuses.get("pending", [])) + completed = len(statuses.get("processed", [])) # Note: "processed" not "completed" + failed = len(statuses.get("failed", [])) + + elapsed = int(time.time() - start_time) + print(f" [{elapsed}s] Processing: {processing}, Pending: {pending}, Completed: {completed}, Failed: {failed}") + + if processing == 0 and pending == 0: + print("[OK] All documents processed") + return True + + except Exception as e: + print(f" [WARN] Status check failed: {e}") + + await asyncio.sleep(POLL_INTERVAL_SECONDS) + + print("[ERROR] Timeout waiting for document processing") + return False + + async def generate_dataset(self) -> Path: + """Generate Q&A dataset from ingested papers.""" + if self.dataset_path and self.dataset_path.exists(): + print(f"[SKIP] Using existing dataset: {self.dataset_path}") + return self.dataset_path + + print("\n" + "=" * 60) + print("STEP 4: Generate Q&A Dataset") + print("=" * 60) + + from lightrag.evaluation.generate_arxiv_dataset import generate_dataset + + output_path = self.eval_dir / f"arxiv_dataset_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" + + await generate_dataset( + paper_ids=self.paper_ids, + questions_per_paper=self.questions_per_paper, + rag_url=self.rag_url, + output_path=output_path, + ) + + return 
output_path + + async def run_ragas_evaluation(self, dataset_path: Path) -> dict: + """Run RAGAS evaluation.""" + print("\n" + "=" * 60) + print("STEP 5: Run RAGAS Evaluation") + print("=" * 60) + + from lightrag.evaluation.eval_rag_quality import RAGEvaluator + + evaluator = RAGEvaluator( + test_dataset_path=str(dataset_path), + rag_api_url=self.rag_url, + ) + + results = await evaluator.run() + return results + + async def run_full_pipeline(self) -> dict: + """Run the complete E2E test pipeline.""" + print("=" * 70) + print("E2E RAGAS TEST HARNESS FOR LIGHTRAG") + print("=" * 70) + print(f"RAG URL: {self.rag_url}") + print(f"Papers: {', '.join(self.paper_ids)}") + print(f"Questions: {self.questions_per_paper} per paper") + print(f"Results: {self.results_dir}") + print("=" * 70) + + start_time = time.time() + + # Check LightRAG is accessible + if not await self.check_lightrag_health(): + return {"error": "LightRAG API not accessible"} + + # Step 1: Download papers + paper_paths = await self.download_papers() + if not paper_paths: + return {"error": "No papers to process"} + + # Step 2: Ingest papers + if not await self.ingest_papers(paper_paths): + return {"error": "Paper ingestion failed"} + + # Step 3: Wait for processing + if not self.skip_ingest: + if not await self.wait_for_processing(): + return {"error": "Document processing timeout"} + + # Step 4: Generate dataset + dataset_path = await self.generate_dataset() + + # Step 5: Run RAGAS evaluation + results = await self.run_ragas_evaluation(dataset_path) + + elapsed_time = time.time() - start_time + + # Save summary + summary = { + "pipeline_completed_at": datetime.now().isoformat(), + "total_elapsed_seconds": round(elapsed_time, 2), + "papers": self.paper_ids, + "dataset_path": str(dataset_path), + "ragas_results": results.get("benchmark_stats", {}), + } + + summary_path = self.results_dir / f"e2e_summary_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" + with open(summary_path, "w") as f: + 
json.dump(summary, f, indent=2) + + print("\n" + "=" * 70) + print("E2E PIPELINE COMPLETE") + print("=" * 70) + print(f"Total time: {elapsed_time:.1f} seconds") + print(f"Summary saved: {summary_path}") + print("=" * 70) + + return summary + + +async def run_ab_test( + harness_config: dict, + clear_between_runs: bool = True, +) -> dict: + """ + Run A/B test comparing with/without orphan connections. + + Args: + harness_config: Configuration for E2ETestHarness + clear_between_runs: Clear data between A and B runs + + Returns: + A/B comparison results + """ + print("=" * 70) + print("A/B TEST: WITH vs WITHOUT ORPHAN CONNECTIONS") + print("=" * 70) + + results = {} + + # Test A: WITHOUT orphan connections + print("\n[A] Running WITHOUT orphan connections...") + os.environ["AUTO_CONNECT_ORPHANS"] = "false" + + harness_a = E2ETestHarness(**harness_config) + results["without_orphans"] = await harness_a.run_full_pipeline() + + # Clear for next run + if clear_between_runs: + await harness_a.clear_existing_data() + + # Test B: WITH orphan connections + print("\n[B] Running WITH orphan connections...") + os.environ["AUTO_CONNECT_ORPHANS"] = "true" + + # Force re-ingest for test B + harness_config_b = harness_config.copy() + harness_config_b["skip_download"] = True # Papers already downloaded + harness_config_b["skip_ingest"] = False # Need to re-ingest + + harness_b = E2ETestHarness(**harness_config_b) + results["with_orphans"] = await harness_b.run_full_pipeline() + + # Compare results + print("\n" + "=" * 70) + print("A/B COMPARISON") + print("=" * 70) + + a_stats = results["without_orphans"].get("ragas_results", {}).get("average_metrics", {}) + b_stats = results["with_orphans"].get("ragas_results", {}).get("average_metrics", {}) + + comparison = { + "timestamp": datetime.now().isoformat(), + "without_orphans": a_stats, + "with_orphans": b_stats, + "improvement": {}, + } + + for metric in ["faithfulness", "answer_relevance", "context_recall", "context_precision", 
"ragas_score"]: + a_val = a_stats.get(metric, 0) + b_val = b_stats.get(metric, 0) + diff = b_val - a_val + pct = (diff / a_val * 100) if a_val > 0 else 0 + + comparison["improvement"][metric] = { + "absolute": round(diff, 4), + "percent": round(pct, 2), + } + + status = "UP" if diff > 0 else ("DOWN" if diff < 0 else "~") + print(f" {metric:<20} A: {a_val:.4f} B: {b_val:.4f} [{status}] {pct:+.1f}%") + + # Verdict + ragas_improvement = comparison["improvement"].get("ragas_score", {}).get("percent", 0) + if ragas_improvement > 5: + verdict = "ORPHAN CONNECTIONS IMPROVE QUALITY" + elif ragas_improvement < -5: + verdict = "ORPHAN CONNECTIONS DEGRADE QUALITY" + else: + verdict = "NO SIGNIFICANT DIFFERENCE" + + comparison["verdict"] = verdict + print(f"\nVERDICT: {verdict}") + + # Save comparison + comp_path = harness_a.results_dir / f"ab_comparison_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" + with open(comp_path, "w") as f: + json.dump(comparison, f, indent=2) + print(f"\nComparison saved: {comp_path}") + + return comparison + + +async def main(): + parser = argparse.ArgumentParser( + description="E2E RAGAS Test Harness for LightRAG", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Full E2E test + python lightrag/evaluation/e2e_test_harness.py + + # A/B test (with/without orphan connections) + python lightrag/evaluation/e2e_test_harness.py --ab-test + + # Skip paper download + python lightrag/evaluation/e2e_test_harness.py --skip-download + + # Use existing dataset + python lightrag/evaluation/e2e_test_harness.py --dataset arxiv_dataset.json + """, + ) + + parser.add_argument( + "--rag-url", + "-r", + type=str, + default=None, + help=f"LightRAG API URL (default: {DEFAULT_RAG_URL})", + ) + + parser.add_argument( + "--papers", + "-p", + type=str, + default=None, + help="Comma-separated arXiv paper IDs", + ) + + parser.add_argument( + "--questions", + "-q", + type=int, + default=5, + help="Questions per paper (default: 5)", + ) + + 
parser.add_argument( + "--skip-download", + action="store_true", + help="Skip paper download (use existing)", + ) + + parser.add_argument( + "--skip-ingest", + action="store_true", + help="Skip paper ingestion (use existing data)", + ) + + parser.add_argument( + "--dataset", + "-d", + type=str, + default=None, + help="Path to existing Q&A dataset (skip generation)", + ) + + parser.add_argument( + "--output-dir", + "-o", + type=str, + default=None, + help="Output directory for results", + ) + + parser.add_argument( + "--ab-test", + action="store_true", + help="Run A/B test comparing with/without orphan connections", + ) + + args = parser.parse_args() + + # Parse paper IDs + paper_ids = None + if args.papers: + paper_ids = [p.strip() for p in args.papers.split(",")] + + harness_config = { + "rag_url": args.rag_url, + "paper_ids": paper_ids, + "questions_per_paper": args.questions, + "skip_download": args.skip_download, + "skip_ingest": args.skip_ingest, + "dataset_path": args.dataset, + "output_dir": args.output_dir, + } + + if args.ab_test: + await run_ab_test(harness_config) + else: + harness = E2ETestHarness(**harness_config) + await harness.run_full_pipeline() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/lightrag/evaluation/ingest_test_docs.py b/lightrag/evaluation/ingest_test_docs.py new file mode 100644 index 00000000..5b3ee313 --- /dev/null +++ b/lightrag/evaluation/ingest_test_docs.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 +""" +Ingest test documents into LightRAG for testing. + +This script reads text files from a directory and batch-uploads them to +LightRAG via the /documents/texts API endpoint, then polls for completion. 
+ +Usage: + python lightrag/evaluation/ingest_test_docs.py + python lightrag/evaluation/ingest_test_docs.py --input wiki_documents/ --rag-url http://localhost:9622 +""" + +import argparse +import asyncio +import os +import time +from pathlib import Path + +import httpx + +DEFAULT_RAG_URL = "http://localhost:9622" + + +async def ingest_documents( + input_dir: Path, + rag_url: str, +) -> dict: + """Ingest all text files from directory into LightRAG. + + Args: + input_dir: Directory containing .txt or .md files + rag_url: LightRAG API base URL + + Returns: + Dict with ingestion statistics + """ + timeout = httpx.Timeout(120.0, connect=30.0) + api_key = os.getenv("LIGHTRAG_API_KEY") + headers = {"X-API-Key": api_key} if api_key else {} + + async with httpx.AsyncClient(timeout=timeout) as client: + # Check health + try: + health = await client.get(f"{rag_url}/health") + if health.status_code != 200: + raise ConnectionError(f"LightRAG not healthy: {health.status_code}") + except httpx.ConnectError: + raise ConnectionError(f"Cannot connect to LightRAG at {rag_url}") + + print(f"✓ Connected to LightRAG at {rag_url}") + + # Collect all text files + files = list(input_dir.glob("*.txt")) + list(input_dir.glob("*.md")) + if not files: + print(f"✗ No .txt or .md files found in {input_dir}") + return {"documents": 0, "elapsed_seconds": 0} + + print(f" Found {len(files)} documents to ingest") + + # Read all texts + texts = [] + sources = [] + for file in sorted(files): + content = file.read_text() + texts.append(content) + sources.append(file.name) + word_count = len(content.split()) + print(f" {file.name}: {word_count:,} words") + + # Batch ingest via /documents/texts + print(f"\n Uploading {len(texts)} documents...") + start = time.time() + + response = await client.post( + f"{rag_url}/documents/texts", + json={"texts": texts, "file_sources": sources}, + headers=headers, + ) + response.raise_for_status() + result = response.json() + + track_id = result.get("track_id", "") + 
print(f"   Track ID: {track_id}")
+
+        # Poll for completion - wait for processing to start first
+        print("   Waiting for processing to start...")
+        await asyncio.sleep(2)  # Give server time to queue documents
+
+        last_status = ""
+        processed_count = 0
+        expected_total = len(texts)
+        initial_check = True
+
+        while True:
+            status_response = await client.get(f"{rag_url}/documents")
+            docs = status_response.json()
+            statuses = docs.get("statuses", {})
+
+            processing = len(statuses.get("processing", []))
+            pending = len(statuses.get("pending", []))
+            processed = len(statuses.get("processed", []))
+            total_visible = processing + pending + processed
+
+            current_status = f"Pending: {pending}, Processing: {processing}, Processed: {processed}"
+            if current_status != last_status:
+                print(f"   {current_status}")
+                last_status = current_status
+                processed_count = processed
+
+            # Wait until we see at least some of our docs in the queue
+            if initial_check and (pending > 0 or processing > 0):
+                initial_check = False
+                print("   Processing started!")
+
+            # Exit once processing has started and both the pending and processing queues are empty
+            if processing == 0 and pending == 0 and not initial_check:
+                break
+
+            await asyncio.sleep(5)
+
+        elapsed = time.time() - start
+        print(f"\n✓ Ingestion complete in {elapsed:.1f}s")
+        print(f"  Documents processed: {processed_count}")
+        print(f"  Average: {elapsed / len(texts):.1f}s per document")
+
+        return {
+            "documents": len(texts),
+            "processed": processed_count,
+            "elapsed_seconds": elapsed,
+            "track_id": track_id,
+        }
+
+
+async def main():
+    parser = argparse.ArgumentParser(description="Ingest test documents into LightRAG")
+    parser.add_argument(
+        "--input",
+        "-i",
+        type=str,
+        default="lightrag/evaluation/wiki_documents",
+        help="Input directory with text files",
+    )
+    parser.add_argument(
+        "--rag-url",
+        "-r",
+        type=str,
+        default=None,
+        help=f"LightRAG API URL (default: {DEFAULT_RAG_URL})",
+    )
+    args = parser.parse_args()
+
+    input_dir = Path(args.input)
+ rag_url = args.rag_url or os.getenv("LIGHTRAG_API_URL", DEFAULT_RAG_URL) + + print("=== LightRAG Document Ingestion ===") + print(f"Input: {input_dir}/") + print(f"RAG URL: {rag_url}") + print() + + if not input_dir.exists(): + print(f"✗ Input directory not found: {input_dir}") + print(" Run download_wikipedia.py first:") + print(" python lightrag/evaluation/download_wikipedia.py") + return + + await ingest_documents(input_dir, rag_url) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/lightrag/evaluation/populate_test_data.sh b/lightrag/evaluation/populate_test_data.sh new file mode 100755 index 00000000..3efb94ca --- /dev/null +++ b/lightrag/evaluation/populate_test_data.sh @@ -0,0 +1,43 @@ +#!/bin/bash +# Quick script to populate LightRAG with diverse test documents +# +# This downloads Wikipedia articles across 4 domains (Medical, Finance, Climate, Sports) +# and ingests them into LightRAG. The articles are chosen to have entity overlap +# (WHO, Carbon/Emissions, Organizations) to test entity merging and summarization. +# +# Usage: +# ./lightrag/evaluation/populate_test_data.sh +# LIGHTRAG_API_URL=http://localhost:9622 ./lightrag/evaluation/populate_test_data.sh + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +RAG_URL="${LIGHTRAG_API_URL:-http://localhost:9622}" + +echo "=== LightRAG Test Data Population ===" +echo "RAG URL: $RAG_URL" +echo "" + +# Check if LightRAG is running +if ! curl -s "$RAG_URL/health" > /dev/null 2>&1; then + echo "✗ Cannot connect to LightRAG at $RAG_URL" + echo " Make sure LightRAG is running first" + exit 1 +fi + +# 1. Download Wikipedia articles +echo "[1/2] Downloading Wikipedia articles..." +python3 "$SCRIPT_DIR/download_wikipedia.py" + +# 2. Ingest into LightRAG +echo "" +echo "[2/2] Ingesting documents..." +python3 "$SCRIPT_DIR/ingest_test_docs.py" --rag-url "$RAG_URL" + +echo "" +echo "=== Done! ===" +echo "Documents ingested into LightRAG." 
+echo "" +echo "Next steps:" +echo " - Check graph stats: curl $RAG_URL/graph/statistics" +echo " - Query the data: curl '$RAG_URL/query?mode=global&query=What+is+climate+change'" diff --git a/lightrag/operate.py b/lightrag/operate.py index acdef5b5..dcde684a 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -280,12 +280,12 @@ async def _handle_entity_relation_summary( f" Summarizing {entity_or_relation_name}: Map {len(current_list)} descriptions into {len(chunks)} groups" ) - # Reduce phase: summarize each group from chunks - new_summaries = [] - for chunk in chunks: + # Reduce phase: summarize each group from chunks IN PARALLEL + async def _summarize_single_chunk(chunk: list[str]) -> tuple[str, bool]: + """Summarize a single chunk, returning (summary, used_llm).""" if len(chunk) == 1: # Optimization: single description chunks don't need LLM summarization - new_summaries.append(chunk[0]) + return chunk[0], False else: # Multiple descriptions need LLM summarization summary = await _summarize_descriptions( @@ -295,8 +295,18 @@ async def _handle_entity_relation_summary( global_config, llm_response_cache, ) - new_summaries.append(summary) - llm_was_used = True # Mark that LLM was used in reduce phase + return summary, True + + # Create tasks for all chunks and run in parallel + tasks = [ + asyncio.create_task(_summarize_single_chunk(chunk)) for chunk in chunks + ] + results = await asyncio.gather(*tasks) + + # Collect results while preserving order + new_summaries = [result[0] for result in results] + if any(result[1] for result in results): + llm_was_used = True # Mark that LLM was used in reduce phase # Update current list with new summaries for next iteration current_list = new_summaries @@ -2115,7 +2125,7 @@ async def _merge_nodes_then_upsert( deduplicated_num = already_fragment + len(nodes_data) - num_fragment dd_message = "" if deduplicated_num > 0: - # Duplicated description detected across multiple trucks for the same entity + # Duplicated 
description detected across multiple chunks for the same entity dd_message = f"dd {deduplicated_num}" if dd_message or truncation_info_log: @@ -2459,7 +2469,7 @@ async def _merge_edges_then_upsert( deduplicated_num = already_fragment + len(edges_data) - num_fragment dd_message = "" if deduplicated_num > 0: - # Duplicated description detected across multiple trucks for the same entity + # Duplicated description detected across multiple chunks for the same entity dd_message = f"dd {deduplicated_num}" if dd_message or truncation_info_log: