diff --git a/examples/lightrag_openai_demo.py b/examples/lightrag_openai_demo.py
index c72f0a82..0f8095f8 100644
--- a/examples/lightrag_openai_demo.py
+++ b/examples/lightrag_openai_demo.py
@@ -24,7 +24,7 @@ def configure_logging():
     log_file_path = os.path.abspath(os.path.join(log_dir, 'lightrag_demo.log'))
     print(f'\nLightRAG demo log file: {log_file_path}\n')
 
-    os.makedirs(os.path.dirname(log_dir), exist_ok=True)
+    os.makedirs(log_dir, exist_ok=True)
 
     # Get log file max size and backup count from environment variables
     log_max_bytes = int(os.getenv('LOG_MAX_BYTES', 10485760))  # Default 10MB
diff --git a/examples/unofficial-sample/copy_llm_cache_to_another_storage.py b/examples/unofficial-sample/copy_llm_cache_to_another_storage.py
index ad7a188d..2a4ebf5a 100644
--- a/examples/unofficial-sample/copy_llm_cache_to_another_storage.py
+++ b/examples/unofficial-sample/copy_llm_cache_to_another_storage.py
@@ -5,7 +5,6 @@ This handy script helps you to copy the LLM caches from one storage solution to
 """
 
 import asyncio
-import logging
 import os
 
 from dotenv import load_dotenv
@@ -13,13 +12,12 @@ from dotenv import load_dotenv
 from lightrag.kg.json_kv_impl import JsonKVStorage
 from lightrag.kg.postgres_impl import PGKVStorage, PostgreSQLDB
 from lightrag.namespace import NameSpace
+from lightrag.utils import logger
 
 load_dotenv()
 ROOT_DIR = os.environ.get('ROOT_DIR')
 WORKING_DIR = f'{ROOT_DIR}/dickens'
 
-logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
-
 if not os.path.exists(WORKING_DIR):
     os.mkdir(WORKING_DIR)
 
@@ -66,13 +64,13 @@ async def copy_from_postgres_to_json():
                 if mode not in kv:
                     kv[mode] = {}
                 kv[mode][hash_value] = cache_entry
-                print(f'Copying {flattened_key} -> {mode}[{hash_value}]')
+                logger.info(f'Copying {flattened_key} -> {mode}[{hash_value}]')
             else:
-                print(f'Skipping invalid key format: {flattened_key}')
+                logger.warning(f'Skipping invalid key format: {flattened_key}')
 
     await to_llm_response_cache.upsert(kv)
     await to_llm_response_cache.index_done_callback()
-    print('Mission accomplished!')
+    logger.info('Mission accomplished!')
 
 
 async def copy_from_json_to_postgres():
diff --git a/lightrag/constants.py b/lightrag/constants.py
index 8d5fc826..d46433e0 100644
--- a/lightrag/constants.py
+++ b/lightrag/constants.py
@@ -61,9 +61,9 @@ DEFAULT_RERANK_BINDING = 'null'
 # Default source ids limit in meta data for entity and relation
 DEFAULT_MAX_SOURCE_IDS_PER_ENTITY = 300
 DEFAULT_MAX_SOURCE_IDS_PER_RELATION = 300
-### control chunk_ids limitation method: FIFO, FIFO
-### FIFO: First in first out
+### control chunk_ids limitation method: KEEP, FIFO
 ### KEEP: Keep oldest (less merge action and faster)
+### FIFO: First in first out
 SOURCE_IDS_LIMIT_METHOD_KEEP = 'KEEP'
 SOURCE_IDS_LIMIT_METHOD_FIFO = 'FIFO'
 DEFAULT_SOURCE_IDS_LIMIT_METHOD = SOURCE_IDS_LIMIT_METHOD_FIFO
diff --git a/lightrag/evaluation/e2e_test_harness.py b/lightrag/evaluation/e2e_test_harness.py
index 3eeb86d4..1447e912 100644
--- a/lightrag/evaluation/e2e_test_harness.py
+++ b/lightrag/evaluation/e2e_test_harness.py
@@ -36,6 +36,7 @@ from pathlib import Path
 import httpx
 from dotenv import load_dotenv
 
 # Add parent directory to path
 sys.path.insert(0, str(Path(__file__).parent.parent.parent))
+from lightrag.utils import logger
 
@@ -85,28 +86,26 @@ class E2ETestHarness:
         async with httpx.AsyncClient(timeout=30.0) as client:
             response = await client.get(f'{self.rag_url}/health')
             response.raise_for_status()
-            print(f'[OK] LightRAG API accessible at {self.rag_url}')
+            logger.info(f'LightRAG API accessible at {self.rag_url}')
             return True
         except Exception as e:
-            print(f'[ERROR] Cannot connect to LightRAG API: {e}')
+            logger.error(f'Cannot connect to LightRAG API: {e}')
             return False
 
     async def download_papers(self) -> list[str]:
         """Download arXiv papers."""
         if self.skip_download:
-            print('[SKIP] Paper download (--skip-download)')
+            logger.info('Paper download skipped (--skip-download)')
             # Check existing papers
             existing = [
                 str(self.papers_dir / f'{pid}.pdf')
                 for pid in self.paper_ids
                 if (self.papers_dir / f'{pid}.pdf').exists()
             ]
-            print(f'[INFO] Found {len(existing)} existing papers')
+            logger.info(f'Found {len(existing)} existing papers')
             return existing
 
-        print('\n' + '=' * 60)
-        print('STEP 1: Download arXiv Papers')
-        print('=' * 60)
+        logger.info('STEP 1: Download arXiv Papers')
 
         from lightrag.evaluation.download_arxiv import download_papers
 
@@ -115,7 +114,7 @@
 
     async def clear_existing_data(self) -> bool:
         """Clear existing documents in LightRAG (optional)."""
-        print('\n[INFO] Clearing existing data...')
+        logger.info('Clearing existing data...')
         try:
             headers = {'X-API-Key': self.api_key} if self.api_key else {}
             async with httpx.AsyncClient(timeout=60.0) as client:
@@ -134,7 +133,7 @@
                     all_docs.extend(status_docs)
 
                 if all_docs:
-                    print(f'[INFO] Clearing {len(all_docs)} existing documents...')
+                    logger.info(f'Clearing {len(all_docs)} existing documents...')
                     for doc in all_docs:
                         doc_id = doc.get('id')
                         if doc_id:
@@ -142,24 +141,22 @@
                                 f'{self.rag_url}/documents/{doc_id}',
                                 headers=headers,
                             )
-                    print('[OK] Cleared existing documents')
+                    logger.info('Cleared existing documents')
                 else:
-                    print('[OK] No existing documents to clear')
+                    logger.info('No existing documents to clear')
             return True
         except Exception as e:
-            print(f'[WARN] Could not clear data: {e}')
+            logger.warning(f'Could not clear data: {e}')
             return False
 
     async def ingest_papers(self, paper_paths: list[str]) -> bool:
         """Ingest papers into LightRAG."""
         if self.skip_ingest:
-            print('[SKIP] Paper ingestion (--skip-ingest)')
+            logger.info('Paper ingestion skipped (--skip-ingest)')
             return True
 
-        print('\n' + '=' * 60)
-        print('STEP 2: Ingest Papers into LightRAG')
-        print('=' * 60)
+        logger.info('STEP 2: Ingest Papers into LightRAG')
 
         headers = {'X-API-Key': self.api_key} if self.api_key else {}
 
@@ -167,10 +164,10 @@
         for paper_path in paper_paths:
             path = Path(paper_path)
             if not path.exists():
-                print(f'[WARN] Paper not found: {paper_path}')
+                logger.warning(f'Paper not found: {paper_path}')
                 continue
 
-            print(f'[UPLOAD] {path.name}')
+            logger.info(f'Uploading {path.name}')
 
             try:
                 with open(path, 'rb') as f:
@@ -182,17 +179,15 @@
                 )
                 response.raise_for_status()
                 result = response.json()
-                print(f'  [OK] Uploaded: {result}')
+                logger.info(f'Uploaded: {result}')
             except Exception as e:
-                print(f'  [ERROR] Upload failed: {e}')
+                logger.error(f'Upload failed: {e}')
 
         return True
 
     async def wait_for_processing(self) -> bool:
         """Wait for all documents to finish processing."""
-        print('\n' + '=' * 60)
-        print('STEP 3: Wait for Document Processing')
-        print('=' * 60)
+        logger.info('STEP 3: Wait for Document Processing')
 
         headers = {'X-API-Key': self.api_key} if self.api_key else {}
         start_time = time.time()
@@ -215,31 +210,29 @@
             failed = len(statuses.get('failed', []))
 
             elapsed = int(time.time() - start_time)
-            print(
-                f'  [{elapsed}s] Processing: {processing}, Pending: {pending}, Completed: {completed}, Failed: {failed}'
+            logger.info(
+                f'[{elapsed}s] Processing: {processing}, Pending: {pending}, Completed: {completed}, Failed: {failed}'
             )
 
             if processing == 0 and pending == 0:
-                print('[OK] All documents processed')
+                logger.info('All documents processed')
                 return True
 
         except Exception as e:
-            print(f'  [WARN] Status check failed: {e}')
+            logger.warning(f'Status check failed: {e}')
 
             await asyncio.sleep(POLL_INTERVAL_SECONDS)
 
-        print('[ERROR] Timeout waiting for document processing')
+        logger.error('Timeout waiting for document processing')
         return False
 
     async def generate_dataset(self) -> Path:
         """Generate Q&A dataset from ingested papers."""
         if self.dataset_path and self.dataset_path.exists():
-            print(f'[SKIP] Using existing dataset: {self.dataset_path}')
+            logger.info(f'Using existing dataset: {self.dataset_path}')
            return self.dataset_path
 
-        print('\n' + '=' * 60)
-        print('STEP 4: Generate Q&A Dataset')
-        print('=' * 60)
+        logger.info('STEP 4: Generate Q&A Dataset')
 
         from lightrag.evaluation.generate_arxiv_dataset import generate_dataset
 
@@ -256,9 +249,7 @@
 
     async def run_ragas_evaluation(self, dataset_path: Path) -> dict:
         """Run RAGAS evaluation."""
-        print('\n' + '=' * 60)
-        print('STEP 5: Run RAGAS Evaluation')
-        print('=' * 60)
+        logger.info('STEP 5: Run RAGAS Evaluation')
 
         from lightrag.evaluation.eval_rag_quality import RAGEvaluator
 
@@ -272,14 +263,11 @@
 
     async def run_full_pipeline(self) -> dict:
         """Run the complete E2E test pipeline."""
-        print('=' * 70)
-        print('E2E RAGAS TEST HARNESS FOR LIGHTRAG')
-        print('=' * 70)
-        print(f'RAG URL: {self.rag_url}')
-        print(f'Papers: {", ".join(self.paper_ids)}')
-        print(f'Questions: {self.questions_per_paper} per paper')
-        print(f'Results: {self.results_dir}')
-        print('=' * 70)
+        logger.info('E2E RAGAS TEST HARNESS FOR LIGHTRAG')
+        logger.info(f'RAG URL: {self.rag_url}')
+        logger.info(f'Papers: {", ".join(self.paper_ids)}')
+        logger.info(f'Questions: {self.questions_per_paper} per paper')
+        logger.info(f'Results: {self.results_dir}')
 
         start_time = time.time()
 
@@ -321,12 +309,9 @@
         with open(summary_path, 'w') as f:
             json.dump(summary, f, indent=2)
 
-        print('\n' + '=' * 70)
-        print('E2E PIPELINE COMPLETE')
-        print('=' * 70)
-        print(f'Total time: {elapsed_time:.1f} seconds')
-        print(f'Summary saved: {summary_path}')
-        print('=' * 70)
+        logger.info('E2E PIPELINE COMPLETE')
+        logger.info(f'Total time: {elapsed_time:.1f} seconds')
+        logger.info(f'Summary saved: {summary_path}')
 
         return summary
 
@@ -345,14 +330,12 @@
     Returns:
         A/B comparison results
     """
-    print('=' * 70)
-    print('A/B TEST: WITH vs WITHOUT ORPHAN CONNECTIONS')
-    print('=' * 70)
+    logger.info('A/B TEST: WITH vs WITHOUT ORPHAN CONNECTIONS')
 
     results = {}
 
     # Test A: WITHOUT orphan connections
-    print('\n[A] Running WITHOUT orphan connections...')
+    logger.info('[A] Running WITHOUT orphan connections...')
     os.environ['AUTO_CONNECT_ORPHANS'] = 'false'
 
     harness_a = E2ETestHarness(**harness_config)
@@ -363,7 +346,7 @@ async def run_ab_test(
     await harness_a.clear_existing_data()
 
     # Test B: WITH orphan connections
-    print('\n[B] Running WITH orphan connections...')
+    logger.info('[B] Running WITH orphan connections...')
     os.environ['AUTO_CONNECT_ORPHANS'] = 'true'
 
     # Force re-ingest for test B
@@ -375,9 +358,7 @@
     results['with_orphans'] = await harness_b.run_full_pipeline()
 
     # Compare results
-    print('\n' + '=' * 70)
-    print('A/B COMPARISON')
-    print('=' * 70)
+    logger.info('A/B COMPARISON')
 
     a_stats = results['without_orphans'].get('ragas_results', {}).get('average_metrics', {})
     b_stats = results['with_orphans'].get('ragas_results', {}).get('average_metrics', {})
@@ -401,7 +382,7 @@ async def run_ab_test(
         }
 
         status = 'UP' if diff > 0 else ('DOWN' if diff < 0 else '~')
-        print(f'  {metric:<20} A: {a_val:.4f}  B: {b_val:.4f}  [{status}] {pct:+.1f}%')
+        logger.info(f'{metric:<20} A: {a_val:.4f} B: {b_val:.4f} [{status}] {pct:+.1f}%')
 
     # Verdict
     ragas_improvement = comparison['improvement'].get('ragas_score', {}).get('percent', 0)
@@ -413,13 +394,13 @@
         verdict = 'NO SIGNIFICANT DIFFERENCE'
 
     comparison['verdict'] = verdict
-    print(f'\nVERDICT: {verdict}')
+    logger.info(f'VERDICT: {verdict}')
 
     # Save comparison
     comp_path = harness_a.results_dir / f'ab_comparison_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'
     with open(comp_path, 'w') as f:
         json.dump(comparison, f, indent=2)
-    print(f'\nComparison saved: {comp_path}')
+    logger.info(f'Comparison saved: {comp_path}')
 
     return comparison
diff --git a/lightrag/kg/deprecated/chroma_impl.py b/lightrag/kg/deprecated/chroma_impl.py
index 98df9605..25b20419 100644
--- a/lightrag/kg/deprecated/chroma_impl.py
+++ b/lightrag/kg/deprecated/chroma_impl.py
@@ -4,14 +4,10 @@ from dataclasses import dataclass
 from typing import Any, final
 
 import numpy as np
-import pipmaster as pm
 
 from lightrag.base import BaseVectorStorage
 from lightrag.utils import logger
 
-if not pm.is_installed('chromadb'):
-    pm.install('chromadb')
-
 from chromadb import HttpClient, PersistentClient  # type: ignore
 from chromadb.config import Settings  # type: ignore
 
@@ -217,7 +213,7 @@
             self._collection.delete(ids=ids)
             logger.debug(f'Successfully deleted {len(ids)} vectors from {self.namespace}')
         except Exception as e:
-            logger.error(f'Error while deleting vectors from {self.namespace}: {e}')
+            logger.error(f'Error while deleting vectors from {self.namespace}: {e!s}')
             raise
 
     async def get_by_id(self, id: str) -> dict[str, Any] | None:
@@ -245,7 +241,7 @@
                 **result['metadatas'][0],
             }
         except Exception as e:
-            logger.error(f'Error retrieving vector data for ID {id}: {e}')
+            logger.error(f'Error retrieving vector data for ID {id}: {e!s}')
             return None
 
     async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:
diff --git a/lightrag/utils.py b/lightrag/utils.py
index 51014323..bf9ee415 100644
--- a/lightrag/utils.py
+++ b/lightrag/utils.py
@@ -2042,8 +2042,6 @@ def normalize_extracted_info(name: str, remove_inner_quotes=False) -> str:
 
     if len(name) < 6 and should_filter_by_dots(name):
         # Filter out mixed numeric and dot content with length < 6
         return ''
-        # Filter out mixed numeric and dot content with length < 6, requiring at least one dot
-        return ''
 
     return name