diff --git a/lightrag/api/routers/query_routes.py b/lightrag/api/routers/query_routes.py
index f0ee0e98..b8f95a8f 100644
--- a/lightrag/api/routers/query_routes.py
+++ b/lightrag/api/routers/query_routes.py
@@ -103,6 +103,11 @@ class QueryRequest(BaseModel):
         description="If True, includes reference list in responses. Affects /query and /query/stream endpoints. /query/data always includes references.",
     )
 
+    include_chunk_content: Optional[bool] = Field(
+        default=False,
+        description="If True, includes actual chunk text content in references. Only applies when include_references=True. Useful for evaluation and debugging.",
+    )
+
     stream: Optional[bool] = Field(
         default=True,
         description="If True, enables streaming output for real-time responses. Only affects /query/stream endpoint.",
@@ -130,7 +135,10 @@ class QueryRequest(BaseModel):
     def to_query_params(self, is_stream: bool) -> "QueryParam":
         """Converts a QueryRequest instance into a QueryParam instance."""
         # Use Pydantic's `.model_dump(exclude_none=True)` to remove None values automatically
-        request_data = self.model_dump(exclude_none=True, exclude={"query"})
+        # Exclude API-level parameters that don't belong in QueryParam
+        request_data = self.model_dump(
+            exclude_none=True, exclude={"query", "include_chunk_content"}
+        )
 
         # Ensure `mode` and `stream` are set explicitly
         param = QueryParam(**request_data)
@@ -368,13 +376,39 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60):
 
             # Extract LLM response and references from unified result
             llm_response = result.get("llm_response", {})
-            references = result.get("data", {}).get("references", [])
+            data = result.get("data", {})
+            references = data.get("references", [])
 
             # Get the non-streaming response content
             response_content = llm_response.get("content", "")
             if not response_content:
                 response_content = "No relevant context found for the query."
 
+            # Enrich references with chunk content if requested
+            if request.include_references and request.include_chunk_content:
+                chunks = data.get("chunks", [])
+                # Create a mapping from reference_id to chunk content
+                ref_id_to_content = {}
+                for chunk in chunks:
+                    ref_id = chunk.get("reference_id", "")
+                    content = chunk.get("content", "")
+                    if ref_id and content:
+                        # If multiple chunks have same reference_id, concatenate
+                        if ref_id in ref_id_to_content:
+                            ref_id_to_content[ref_id] += "\n\n" + content
+                        else:
+                            ref_id_to_content[ref_id] = content
+
+                # Add content to references
+                enriched_references = []
+                for ref in references:
+                    ref_copy = ref.copy()
+                    ref_id = ref.get("reference_id", "")
+                    if ref_id in ref_id_to_content:
+                        ref_copy["content"] = ref_id_to_content[ref_id]
+                    enriched_references.append(ref_copy)
+                references = enriched_references
+
             # Return response with or without references based on request
             if request.include_references:
                 return QueryResponse(response=response_content, references=references)
diff --git a/lightrag/evaluation/eval_rag_quality.py b/lightrag/evaluation/eval_rag_quality.py
index 1a26a103..0b5dff11 100644
--- a/lightrag/evaluation/eval_rag_quality.py
+++ b/lightrag/evaluation/eval_rag_quality.py
@@ -10,8 +10,8 @@ Evaluates RAG response quality using RAGAS metrics:
 
 Usage:
     python lightrag/evaluation/eval_rag_quality.py
-    python lightrag/evaluation/eval_rag_quality.py http://localhost:8000
-    python lightrag/evaluation/eval_rag_quality.py http://your-rag-server.com:8000
+    python lightrag/evaluation/eval_rag_quality.py http://localhost:9621
+    python lightrag/evaluation/eval_rag_quality.py http://your-rag-server.com:9621
 
 Results are saved to: lightrag/evaluation/results/
     - results_YYYYMMDD_HHMMSS.csv (CSV export for analysis)
@@ -70,17 +70,17 @@ class RAGEvaluator:
 
         Args:
             test_dataset_path: Path to test dataset JSON file
-            rag_api_url: Base URL of LightRAG API (e.g., http://localhost:8000)
+            rag_api_url: Base URL of LightRAG API (e.g., http://localhost:9621)
                 If None, will try to read from environment or use default
         """
         if test_dataset_path is None:
             test_dataset_path = Path(__file__).parent / "sample_dataset.json"
 
         if rag_api_url is None:
-            rag_api_url = os.getenv("LIGHTRAG_API_URL", "http://localhost:8000")
+            rag_api_url = os.getenv("LIGHTRAG_API_URL", "http://localhost:9621")
 
         self.test_dataset_path = Path(test_dataset_path)
-        self.rag_api_url = rag_api_url.rstrip("/")  # Remove trailing slash
+        self.rag_api_url = rag_api_url.rstrip("/")
         self.results_dir = Path(__file__).parent / "results"
         self.results_dir.mkdir(exist_ok=True)
 
@@ -100,12 +100,14 @@ class RAGEvaluator:
     async def generate_rag_response(
         self,
         question: str,
+        client: httpx.AsyncClient,
     ) -> Dict[str, Any]:
         """
         Generate RAG response by calling LightRAG API.
 
         Args:
             question: The user query.
+            client: Shared httpx AsyncClient for connection pooling.
 
         Returns:
             Dictionary with 'answer' and 'contexts' keys.
@@ -115,71 +117,104 @@ class RAGEvaluator:
             Exception: If LightRAG API is unavailable.
         """
         try:
-            async with httpx.AsyncClient(timeout=60.0) as client:
-                payload = {
-                    "query": question,
-                    "mode": "mix",
-                    "include_references": True,
-                    "response_type": "Multiple Paragraphs",
-                    "top_k": 10,
-                }
+            payload = {
+                "query": question,
+                "mode": "mix",
+                "include_references": True,
+                "include_chunk_content": True,  # NEW: Request chunk content in references
+                "response_type": "Multiple Paragraphs",
+                "top_k": 10,
+            }
 
-                response = await client.post(
-                    f"{self.rag_api_url}/query",
-                    json=payload,
-                )
-                response.raise_for_status()  # Better error handling
-                result = response.json()
+            # Single optimized API call - gets both answer AND chunk content
+            response = await client.post(
+                f"{self.rag_api_url}/query",
+                json=payload,
+            )
+            response.raise_for_status()
+            result = response.json()
 
-                # Extract text content from each reference document
-                references = result.get("references", [])
-                contexts = [
-                    ref.get("text", "") for ref in references if ref.get("text")
-                ]
+            answer = result.get("response", "No response generated")
+            references = result.get("references", [])
 
-                return {
-                    "answer": result.get("response", "No response generated"),
-                    "contexts": contexts,  # List of strings, not JSON dump
-                }
+            # DEBUG: Inspect the API response
+            print(f"   šŸ” References Count: {len(references)}")
+            if references:
+                first_ref = references[0]
+                print(f"   šŸ” First Reference Keys: {list(first_ref.keys())}")
+                if "content" in first_ref:
+                    print(f"   šŸ” Content Preview: {first_ref['content'][:100]}...")
 
-        except httpx.ConnectError:
+            # Extract chunk content from enriched references
+            contexts = [
+                ref.get("content", "") for ref in references if ref.get("content")
+            ]
+
+            return {
+                "answer": answer,
+                "contexts": contexts,  # List of strings from actual retrieved chunks
+            }
+
+        except httpx.ConnectError as e:
             raise Exception(
                 f"āŒ Cannot connect to LightRAG API at {self.rag_api_url}\n"
                 f"   Make sure LightRAG server is running:\n"
-                f"   python -m lightrag.api.lightrag_server"
+                f"   python -m lightrag.api.lightrag_server\n"
+                f"   Error: {str(e)}"
             )
         except httpx.HTTPStatusError as e:
             raise Exception(
                 f"LightRAG API error {e.response.status_code}: {e.response.text}"
             )
+        except httpx.ReadTimeout as e:
+            raise Exception(
+                f"Request timeout after waiting for response\n"
+                f"   Question: {question[:100]}...\n"
+                f"   Error: {str(e)}"
+            )
         except Exception as e:
-            raise Exception(f"Error calling LightRAG API: {str(e)}")
+            raise Exception(f"Error calling LightRAG API: {type(e).__name__}: {str(e)}")
 
-    async def evaluate_responses(self) -> List[Dict[str, Any]]:
+    async def evaluate_single_case(
+        self,
+        idx: int,
+        test_case: Dict[str, str],
+        semaphore: asyncio.Semaphore,
+        client: httpx.AsyncClient,
+    ) -> Dict[str, Any]:
         """
-        Evaluate all test cases and return metrics
+        Evaluate a single test case with concurrency control
+
+        Args:
+            idx: Test case index (1-based)
+            test_case: Test case dictionary with question and ground_truth
+            semaphore: Semaphore to control concurrency
+            client: Shared httpx AsyncClient for connection pooling
 
         Returns:
-            List of evaluation results with metrics
+            Evaluation result dictionary
         """
-        print("\n" + "=" * 70)
-        print("šŸš€ Starting RAGAS Evaluation of Portfolio RAG System")
-        print("=" * 70 + "\n")
-
-        results = []
-
-        for idx, test_case in enumerate(self.test_cases, 1):
+        async with semaphore:
             question = test_case["question"]
             ground_truth = test_case["ground_truth"]
 
             print(f"[{idx}/{len(self.test_cases)}] Evaluating: {question[:60]}...")
 
             # Generate RAG response by calling actual LightRAG API
-            rag_response = await self.generate_rag_response(question=question)
+            rag_response = await self.generate_rag_response(
+                question=question, client=client
+            )
 
             # *** CRITICAL FIX: Use actual retrieved contexts, NOT ground_truth ***
             retrieved_contexts = rag_response["contexts"]
 
+            # DEBUG: Print what was actually retrieved
+            print(f"   šŸ“ Retrieved {len(retrieved_contexts)} contexts")
+            if retrieved_contexts:
+                print(f"   šŸ“„ First context preview: {retrieved_contexts[0][:100]}...")
+            else:
+                print("   āš ļø  WARNING: No contexts retrieved!")
+
             # Prepare dataset for RAGAS evaluation with CORRECT contexts
             eval_dataset = Dataset.from_dict(
                 {
@@ -236,8 +271,6 @@ class RAGEvaluator:
                 ragas_score = sum(metrics.values()) / len(metrics) if metrics else 0
                 result["ragas_score"] = round(ragas_score, 4)
 
-                results.append(result)
-
                 # Print metrics
                 print(f"   āœ… Faithfulness: {metrics['faithfulness']:.4f}")
                 print(f"   āœ… Answer Relevance: {metrics['answer_relevance']:.4f}")
@@ -245,21 +278,58 @@ class RAGEvaluator:
                 print(f"   āœ… Context Recall: {metrics['context_recall']:.4f}")
                 print(f"   āœ… Context Precision: {metrics['context_precision']:.4f}")
                 print(f"   šŸ“Š RAGAS Score: {result['ragas_score']:.4f}\n")
 
+                return result
+
         except Exception as e:
             import traceback
 
             print(f"   āŒ Error evaluating: {str(e)}")
             print(f"   šŸ” Full traceback:\n{traceback.format_exc()}\n")
-            result = {
+            return {
                 "question": question,
                 "error": str(e),
                 "metrics": {},
                 "ragas_score": 0,
                 "timestamp": datetime.now().isoformat(),
             }
-            results.append(result)
 
-        return results
+    async def evaluate_responses(self) -> List[Dict[str, Any]]:
+        """
+        Evaluate all test cases in parallel and return metrics
+
+        Returns:
+            List of evaluation results with metrics
+        """
+        # Get MAX_ASYNC from environment (default to 4 if not set)
+        max_async = int(os.getenv("MAX_ASYNC", "4"))
+
+        print("\n" + "=" * 70)
+        print("šŸš€ Starting RAGAS Evaluation of Portfolio RAG System")
+        print(f"šŸ”§ Parallel evaluations: {max_async}")
+        print("=" * 70 + "\n")
+
+        # Create semaphore to limit concurrent evaluations
+        semaphore = asyncio.Semaphore(max_async)
+
+        # Create shared HTTP client with connection pooling and proper timeouts
+        # Timeout: 3 minutes for connect, 5 minutes for read (LLM can be slow)
+        timeout = httpx.Timeout(180.0, connect=180.0, read=300.0)
+        limits = httpx.Limits(
+            max_connections=max_async * 2,  # Allow some buffer
+            max_keepalive_connections=max_async,
+        )
+
+        async with httpx.AsyncClient(timeout=timeout, limits=limits) as client:
+            # Create tasks for all test cases
+            tasks = [
+                self.evaluate_single_case(idx, test_case, semaphore, client)
+                for idx, test_case in enumerate(self.test_cases, 1)
+            ]
+
+            # Run all evaluations in parallel (limited by semaphore)
+            results = await asyncio.gather(*tasks)
+
+        return list(results)
 
     def _export_to_csv(self, results: List[Dict[str, Any]]) -> Path:
         """
@@ -321,6 +391,111 @@ class RAGEvaluator:
 
         return csv_path
 
+    def _calculate_benchmark_stats(
+        self, results: List[Dict[str, Any]]
+    ) -> Dict[str, Any]:
+        """
+        Calculate benchmark statistics from evaluation results
+
+        Args:
+            results: List of evaluation results
+
+        Returns:
+            Dictionary with benchmark statistics
+        """
+        # Filter out results with errors
+        valid_results = [r for r in results if r.get("metrics")]
+        total_tests = len(results)
+        successful_tests = len(valid_results)
+        failed_tests = total_tests - successful_tests
+
+        if not valid_results:
+            return {
+                "total_tests": total_tests,
+                "successful_tests": 0,
+                "failed_tests": failed_tests,
+                "success_rate": 0.0,
+            }
+
+        # Calculate averages for each metric (handling NaN values)
+        import math
+
+        metrics_sum = {
+            "faithfulness": 0.0,
+            "answer_relevance": 0.0,
+            "context_recall": 0.0,
+            "context_precision": 0.0,
+            "ragas_score": 0.0,
+        }
+
+        for result in valid_results:
+            metrics = result.get("metrics", {})
+            # Skip NaN values when summing
+            faithfulness = metrics.get("faithfulness", 0)
+            if (
+                not math.isnan(faithfulness)
+                if isinstance(faithfulness, float)
+                else True
+            ):
+                metrics_sum["faithfulness"] += faithfulness
+
+            answer_relevance = metrics.get("answer_relevance", 0)
+            if (
+                not math.isnan(answer_relevance)
+                if isinstance(answer_relevance, float)
+                else True
+            ):
+                metrics_sum["answer_relevance"] += answer_relevance
+
+            context_recall = metrics.get("context_recall", 0)
+            if (
+                not math.isnan(context_recall)
+                if isinstance(context_recall, float)
+                else True
+            ):
+                metrics_sum["context_recall"] += context_recall
+
+            context_precision = metrics.get("context_precision", 0)
+            if (
+                not math.isnan(context_precision)
+                if isinstance(context_precision, float)
+                else True
+            ):
+                metrics_sum["context_precision"] += context_precision
+
+            ragas_score = result.get("ragas_score", 0)
+            if not math.isnan(ragas_score) if isinstance(ragas_score, float) else True:
+                metrics_sum["ragas_score"] += ragas_score
+
+        # Calculate averages
+        n = len(valid_results)
+        avg_metrics = {}
+        for k, v in metrics_sum.items():
+            avg_val = v / n if n > 0 else 0
+            # Handle NaN in average
+            avg_metrics[k] = round(avg_val, 4) if not math.isnan(avg_val) else 0.0
+
+        # Find min and max RAGAS scores (filter out NaN)
+        ragas_scores = []
+        for r in valid_results:
+            score = r.get("ragas_score", 0)
+            if isinstance(score, float) and math.isnan(score):
+                continue  # Skip NaN values
+            ragas_scores.append(score)
+
+        min_score = min(ragas_scores) if ragas_scores else 0
+        max_score = max(ragas_scores) if ragas_scores else 0
+
+        return {
+            "total_tests": total_tests,
+            "successful_tests": successful_tests,
+            "failed_tests": failed_tests,
+            "success_rate": round(successful_tests / total_tests * 100, 2),
+            "average_metrics": avg_metrics,
+            "min_ragas_score": round(min_score, 4),
+            "max_ragas_score": round(max_score, 4),
+        }
+
     async def run(self) -> Dict[str, Any]:
         """Run complete evaluation pipeline"""
 
@@ -331,11 +506,15 @@ class RAGEvaluator:
 
         elapsed_time = time.time() - start_time
 
+        # Calculate benchmark statistics
+        benchmark_stats = self._calculate_benchmark_stats(results)
+
         # Save results
         summary = {
             "timestamp": datetime.now().isoformat(),
             "total_tests": len(results),
             "elapsed_time_seconds": round(elapsed_time, 2),
+            "benchmark_stats": benchmark_stats,
             "results": results,
         }
 
@@ -357,9 +536,29 @@ class RAGEvaluator:
         print("šŸ“Š EVALUATION COMPLETE")
         print("=" * 70)
         print(f"Total Tests: {len(results)}")
+        print(f"Successful: {benchmark_stats['successful_tests']}")
+        print(f"Failed: {benchmark_stats['failed_tests']}")
+        print(f"Success Rate: {benchmark_stats['success_rate']:.2f}%")
         print(f"Elapsed Time: {elapsed_time:.2f} seconds")
+        print(f"Avg Time/Test: {elapsed_time / len(results):.2f} seconds")
+
+        # Print benchmark metrics
+        print("\n" + "=" * 70)
+        print("šŸ“ˆ BENCHMARK RESULTS (Averages)")
+        print("=" * 70)
+        avg = benchmark_stats["average_metrics"]
+        print(f"Average Faithfulness: {avg['faithfulness']:.4f}")
+        print(f"Average Answer Relevance: {avg['answer_relevance']:.4f}")
+        print(f"Average Context Recall: {avg['context_recall']:.4f}")
+        print(f"Average Context Precision: {avg['context_precision']:.4f}")
+        print(f"Average RAGAS Score: {avg['ragas_score']:.4f}")
+        print(f"\nMin RAGAS Score: {benchmark_stats['min_ragas_score']:.4f}")
+        print(f"Max RAGAS Score: {benchmark_stats['max_ragas_score']:.4f}")
+
+        print("\n" + "=" * 70)
+        print("šŸ“ GENERATED FILES")
+        print("=" * 70)
         print(f"Results Dir: {self.results_dir.absolute()}")
-        print("\nšŸ“ Generated Files:")
         print(f"  • CSV: {csv_path.name}")
         print(f"  • JSON: {json_path.name}")
         print("=" * 70 + "\n")
@@ -373,8 +572,8 @@ async def main():
 
     Usage:
         python lightrag/evaluation/eval_rag_quality.py
-        python lightrag/evaluation/eval_rag_quality.py http://localhost:8000
-        python lightrag/evaluation/eval_rag_quality.py http://your-server.com:8000
+        python lightrag/evaluation/eval_rag_quality.py http://localhost:9621
+        python lightrag/evaluation/eval_rag_quality.py http://your-server.com:9621
     """
     try:
        # Get RAG API URL from command line or environment
@@ -388,7 +587,7 @@ async def main():
         if rag_api_url:
             print(f"šŸ“” RAG API URL: {rag_api_url}")
         else:
-            print("šŸ“” RAG API URL: http://localhost:8000 (default)")
+            print("šŸ“” RAG API URL: http://localhost:9621 (default)")
         print("=" * 70 + "\n")
 
         evaluator = RAGEvaluator(rag_api_url=rag_api_url)
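
Example (not part of the patch): a minimal client-side sketch of the new `include_chunk_content` flag, assuming a LightRAG server at http://localhost:9621 and the response shape produced by this diff (`response` plus `references`, where each reference gains a `content` field when enrichment is requested). The helper name `fetch_contexts` and the sample question are illustrative only.

```python
import asyncio

import httpx


async def fetch_contexts(question: str, base_url: str = "http://localhost:9621") -> dict:
    """Query LightRAG once and pull chunk text out of the enriched references."""
    payload = {
        "query": question,
        "mode": "mix",
        "include_references": True,
        "include_chunk_content": True,  # flag added by this patch
        "response_type": "Multiple Paragraphs",
        "top_k": 10,
    }
    async with httpx.AsyncClient(timeout=httpx.Timeout(180.0, read=300.0)) as client:
        resp = await client.post(f"{base_url}/query", json=payload)
        resp.raise_for_status()
        result = resp.json()

    # References carry "content" only when include_chunk_content=True was sent
    contexts = [
        ref.get("content", "") for ref in result.get("references", []) if ref.get("content")
    ]
    return {"answer": result.get("response", ""), "contexts": contexts}


if __name__ == "__main__":
    print(asyncio.run(fetch_contexts("What is LightRAG?")))
```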
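Example (not part of the patch): driving the reworked evaluator programmatically, assuming `RAGEvaluator.run()` returns the summary dict it writes to disk (the diff shows the summary gaining a `benchmark_stats` key but not the return statement). `MAX_ASYNC` bounds how many test cases `evaluate_responses()` runs concurrently through the shared semaphore and pooled httpx client.

```python
import asyncio
import os

from lightrag.evaluation.eval_rag_quality import RAGEvaluator

# Limit concurrent test cases; evaluate_responses() reads MAX_ASYNC (default "4")
os.environ.setdefault("MAX_ASYNC", "4")


async def main() -> None:
    evaluator = RAGEvaluator(rag_api_url="http://localhost:9621")
    # run() is assumed to return the saved summary dict; fall back to {} if it returns None
    summary = await evaluator.run() or {}
    print(summary.get("benchmark_stats", {}))


asyncio.run(main())
```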