feat(examples, lightrag): fix logging and apply code improvements

Fix logging output in evaluation test harness and examples:
- Replace print() statements with logger calls in e2e_test_harness.py
- Update copy_llm_cache_to_another_storage.py to use logger instead of print
- Remove redundant logging configuration in copy_llm_cache_to_another_storage.py
Fix path handling and typos:
- Correct makedirs() call in lightrag_openai_demo.py to create log_dir directly
- Update constants.py comments to clarify SOURCE_IDS_LIMIT_METHOD options
- Remove duplicate return statement in utils.py normalize_extracted_info()
- Fix error string formatting in chroma_impl.py with !s conversion
- Remove unused pipmaster import from chroma_impl.py
This commit is contained in:
clssck 2025-12-05 18:10:19 +01:00
parent dd1413f3eb
commit 65d2cd16b1
6 changed files with 50 additions and 77 deletions

View file

@ -24,7 +24,7 @@ def configure_logging():
log_file_path = os.path.abspath(os.path.join(log_dir, 'lightrag_demo.log')) log_file_path = os.path.abspath(os.path.join(log_dir, 'lightrag_demo.log'))
print(f'\nLightRAG demo log file: {log_file_path}\n') print(f'\nLightRAG demo log file: {log_file_path}\n')
os.makedirs(os.path.dirname(log_dir), exist_ok=True) os.makedirs(log_dir, exist_ok=True)
# Get log file max size and backup count from environment variables # Get log file max size and backup count from environment variables
log_max_bytes = int(os.getenv('LOG_MAX_BYTES', 10485760)) # Default 10MB log_max_bytes = int(os.getenv('LOG_MAX_BYTES', 10485760)) # Default 10MB

View file

@ -5,7 +5,6 @@ This handy script helps you to copy the LLM caches from one storage solution to
""" """
import asyncio import asyncio
import logging
import os import os
from dotenv import load_dotenv from dotenv import load_dotenv
@ -13,13 +12,12 @@ from dotenv import load_dotenv
from lightrag.kg.json_kv_impl import JsonKVStorage from lightrag.kg.json_kv_impl import JsonKVStorage
from lightrag.kg.postgres_impl import PGKVStorage, PostgreSQLDB from lightrag.kg.postgres_impl import PGKVStorage, PostgreSQLDB
from lightrag.namespace import NameSpace from lightrag.namespace import NameSpace
from lightrag.utils import logger
load_dotenv() load_dotenv()
ROOT_DIR = os.environ.get('ROOT_DIR') ROOT_DIR = os.environ.get('ROOT_DIR')
WORKING_DIR = f'{ROOT_DIR}/dickens' WORKING_DIR = f'{ROOT_DIR}/dickens'
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.INFO)
if not os.path.exists(WORKING_DIR): if not os.path.exists(WORKING_DIR):
os.mkdir(WORKING_DIR) os.mkdir(WORKING_DIR)
@ -66,13 +64,13 @@ async def copy_from_postgres_to_json():
if mode not in kv: if mode not in kv:
kv[mode] = {} kv[mode] = {}
kv[mode][hash_value] = cache_entry kv[mode][hash_value] = cache_entry
print(f'Copying {flattened_key} -> {mode}[{hash_value}]') logger.info(f'Copying {flattened_key} -> {mode}[{hash_value}]')
else: else:
print(f'Skipping invalid key format: {flattened_key}') logger.warning(f'Skipping invalid key format: {flattened_key}')
await to_llm_response_cache.upsert(kv) await to_llm_response_cache.upsert(kv)
await to_llm_response_cache.index_done_callback() await to_llm_response_cache.index_done_callback()
print('Mission accomplished!') logger.info('Mission accomplished!')
async def copy_from_json_to_postgres(): async def copy_from_json_to_postgres():

View file

@ -61,9 +61,9 @@ DEFAULT_RERANK_BINDING = 'null'
# Default source ids limit in meta data for entity and relation # Default source ids limit in meta data for entity and relation
DEFAULT_MAX_SOURCE_IDS_PER_ENTITY = 300 DEFAULT_MAX_SOURCE_IDS_PER_ENTITY = 300
DEFAULT_MAX_SOURCE_IDS_PER_RELATION = 300 DEFAULT_MAX_SOURCE_IDS_PER_RELATION = 300
### control chunk_ids limitation method: FIFO, FIFO ### control chunk_ids limitation method: KEEP, FIFO
### FIFO: First in first out
### KEEP: Keep oldest (less merge action and faster) ### KEEP: Keep oldest (less merge action and faster)
### FIFO: First in first out
SOURCE_IDS_LIMIT_METHOD_KEEP = 'KEEP' SOURCE_IDS_LIMIT_METHOD_KEEP = 'KEEP'
SOURCE_IDS_LIMIT_METHOD_FIFO = 'FIFO' SOURCE_IDS_LIMIT_METHOD_FIFO = 'FIFO'
DEFAULT_SOURCE_IDS_LIMIT_METHOD = SOURCE_IDS_LIMIT_METHOD_FIFO DEFAULT_SOURCE_IDS_LIMIT_METHOD = SOURCE_IDS_LIMIT_METHOD_FIFO

View file

@ -36,6 +36,7 @@ from pathlib import Path
import httpx import httpx
from dotenv import load_dotenv from dotenv import load_dotenv
from lightrag.utils import logger
# Add parent directory to path # Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent.parent))
@ -85,28 +86,26 @@ class E2ETestHarness:
async with httpx.AsyncClient(timeout=30.0) as client: async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(f'{self.rag_url}/health') response = await client.get(f'{self.rag_url}/health')
response.raise_for_status() response.raise_for_status()
print(f'[OK] LightRAG API accessible at {self.rag_url}') logger.info(f'LightRAG API accessible at {self.rag_url}')
return True return True
except Exception as e: except Exception as e:
print(f'[ERROR] Cannot connect to LightRAG API: {e}') logger.error(f'Cannot connect to LightRAG API: {e}')
return False return False
async def download_papers(self) -> list[str]: async def download_papers(self) -> list[str]:
"""Download arXiv papers.""" """Download arXiv papers."""
if self.skip_download: if self.skip_download:
print('[SKIP] Paper download (--skip-download)') logger.info('Paper download skipped (--skip-download)')
# Check existing papers # Check existing papers
existing = [ existing = [
str(self.papers_dir / f'{pid}.pdf') str(self.papers_dir / f'{pid}.pdf')
for pid in self.paper_ids for pid in self.paper_ids
if (self.papers_dir / f'{pid}.pdf').exists() if (self.papers_dir / f'{pid}.pdf').exists()
] ]
print(f'[INFO] Found {len(existing)} existing papers') logger.info(f'Found {len(existing)} existing papers')
return existing return existing
print('\n' + '=' * 60) logger.info('STEP 1: Download arXiv Papers')
print('STEP 1: Download arXiv Papers')
print('=' * 60)
from lightrag.evaluation.download_arxiv import download_papers from lightrag.evaluation.download_arxiv import download_papers
@ -115,7 +114,7 @@ class E2ETestHarness:
async def clear_existing_data(self) -> bool: async def clear_existing_data(self) -> bool:
"""Clear existing documents in LightRAG (optional).""" """Clear existing documents in LightRAG (optional)."""
print('\n[INFO] Clearing existing data...') logger.info('Clearing existing data...')
try: try:
headers = {'X-API-Key': self.api_key} if self.api_key else {} headers = {'X-API-Key': self.api_key} if self.api_key else {}
async with httpx.AsyncClient(timeout=60.0) as client: async with httpx.AsyncClient(timeout=60.0) as client:
@ -134,7 +133,7 @@ class E2ETestHarness:
all_docs.extend(status_docs) all_docs.extend(status_docs)
if all_docs: if all_docs:
print(f'[INFO] Clearing {len(all_docs)} existing documents...') logger.info(f'Clearing {len(all_docs)} existing documents...')
for doc in all_docs: for doc in all_docs:
doc_id = doc.get('id') doc_id = doc.get('id')
if doc_id: if doc_id:
@ -142,24 +141,22 @@ class E2ETestHarness:
f'{self.rag_url}/documents/{doc_id}', f'{self.rag_url}/documents/{doc_id}',
headers=headers, headers=headers,
) )
print('[OK] Cleared existing documents') logger.info('Cleared existing documents')
else: else:
print('[OK] No existing documents to clear') logger.info('No existing documents to clear')
return True return True
except Exception as e: except Exception as e:
print(f'[WARN] Could not clear data: {e}') logger.warning(f'Could not clear data: {e}')
return False return False
async def ingest_papers(self, paper_paths: list[str]) -> bool: async def ingest_papers(self, paper_paths: list[str]) -> bool:
"""Ingest papers into LightRAG.""" """Ingest papers into LightRAG."""
if self.skip_ingest: if self.skip_ingest:
print('[SKIP] Paper ingestion (--skip-ingest)') logger.info('Paper ingestion skipped (--skip-ingest)')
return True return True
print('\n' + '=' * 60) logger.info('STEP 2: Ingest Papers into LightRAG')
print('STEP 2: Ingest Papers into LightRAG')
print('=' * 60)
headers = {'X-API-Key': self.api_key} if self.api_key else {} headers = {'X-API-Key': self.api_key} if self.api_key else {}
@ -167,10 +164,10 @@ class E2ETestHarness:
for paper_path in paper_paths: for paper_path in paper_paths:
path = Path(paper_path) path = Path(paper_path)
if not path.exists(): if not path.exists():
print(f'[WARN] Paper not found: {paper_path}') logger.warning(f'Paper not found: {paper_path}')
continue continue
print(f'[UPLOAD] {path.name}') logger.info(f'Uploading {path.name}')
try: try:
with open(path, 'rb') as f: with open(path, 'rb') as f:
@ -182,17 +179,15 @@ class E2ETestHarness:
) )
response.raise_for_status() response.raise_for_status()
result = response.json() result = response.json()
print(f' [OK] Uploaded: {result}') logger.info(f'Uploaded: {result}')
except Exception as e: except Exception as e:
print(f' [ERROR] Upload failed: {e}') logger.error(f'Upload failed: {e}')
return True return True
async def wait_for_processing(self) -> bool: async def wait_for_processing(self) -> bool:
"""Wait for all documents to finish processing.""" """Wait for all documents to finish processing."""
print('\n' + '=' * 60) logger.info('STEP 3: Wait for Document Processing')
print('STEP 3: Wait for Document Processing')
print('=' * 60)
headers = {'X-API-Key': self.api_key} if self.api_key else {} headers = {'X-API-Key': self.api_key} if self.api_key else {}
start_time = time.time() start_time = time.time()
@ -215,31 +210,29 @@ class E2ETestHarness:
failed = len(statuses.get('failed', [])) failed = len(statuses.get('failed', []))
elapsed = int(time.time() - start_time) elapsed = int(time.time() - start_time)
print( logger.info(
f' [{elapsed}s] Processing: {processing}, Pending: {pending}, Completed: {completed}, Failed: {failed}' f'[{elapsed}s] Processing: {processing}, Pending: {pending}, Completed: {completed}, Failed: {failed}'
) )
if processing == 0 and pending == 0: if processing == 0 and pending == 0:
print('[OK] All documents processed') logger.info('All documents processed')
return True return True
except Exception as e: except Exception as e:
print(f' [WARN] Status check failed: {e}') logger.warning(f'Status check failed: {e}')
await asyncio.sleep(POLL_INTERVAL_SECONDS) await asyncio.sleep(POLL_INTERVAL_SECONDS)
print('[ERROR] Timeout waiting for document processing') logger.error('Timeout waiting for document processing')
return False return False
async def generate_dataset(self) -> Path: async def generate_dataset(self) -> Path:
"""Generate Q&A dataset from ingested papers.""" """Generate Q&A dataset from ingested papers."""
if self.dataset_path and self.dataset_path.exists(): if self.dataset_path and self.dataset_path.exists():
print(f'[SKIP] Using existing dataset: {self.dataset_path}') logger.info(f'Using existing dataset: {self.dataset_path}')
return self.dataset_path return self.dataset_path
print('\n' + '=' * 60) logger.info('STEP 4: Generate Q&A Dataset')
print('STEP 4: Generate Q&A Dataset')
print('=' * 60)
from lightrag.evaluation.generate_arxiv_dataset import generate_dataset from lightrag.evaluation.generate_arxiv_dataset import generate_dataset
@ -256,9 +249,7 @@ class E2ETestHarness:
async def run_ragas_evaluation(self, dataset_path: Path) -> dict: async def run_ragas_evaluation(self, dataset_path: Path) -> dict:
"""Run RAGAS evaluation.""" """Run RAGAS evaluation."""
print('\n' + '=' * 60) logger.info('STEP 5: Run RAGAS Evaluation')
print('STEP 5: Run RAGAS Evaluation')
print('=' * 60)
from lightrag.evaluation.eval_rag_quality import RAGEvaluator from lightrag.evaluation.eval_rag_quality import RAGEvaluator
@ -272,14 +263,11 @@ class E2ETestHarness:
async def run_full_pipeline(self) -> dict: async def run_full_pipeline(self) -> dict:
"""Run the complete E2E test pipeline.""" """Run the complete E2E test pipeline."""
print('=' * 70) logger.info('E2E RAGAS TEST HARNESS FOR LIGHTRAG')
print('E2E RAGAS TEST HARNESS FOR LIGHTRAG') logger.info(f'RAG URL: {self.rag_url}')
print('=' * 70) logger.info(f'Papers: {", ".join(self.paper_ids)}')
print(f'RAG URL: {self.rag_url}') logger.info(f'Questions: {self.questions_per_paper} per paper')
print(f'Papers: {", ".join(self.paper_ids)}') logger.info(f'Results: {self.results_dir}')
print(f'Questions: {self.questions_per_paper} per paper')
print(f'Results: {self.results_dir}')
print('=' * 70)
start_time = time.time() start_time = time.time()
@ -321,12 +309,9 @@ class E2ETestHarness:
with open(summary_path, 'w') as f: with open(summary_path, 'w') as f:
json.dump(summary, f, indent=2) json.dump(summary, f, indent=2)
print('\n' + '=' * 70) logger.info('E2E PIPELINE COMPLETE')
print('E2E PIPELINE COMPLETE') logger.info(f'Total time: {elapsed_time:.1f} seconds')
print('=' * 70) logger.info(f'Summary saved: {summary_path}')
print(f'Total time: {elapsed_time:.1f} seconds')
print(f'Summary saved: {summary_path}')
print('=' * 70)
return summary return summary
@ -345,14 +330,12 @@ async def run_ab_test(
Returns: Returns:
A/B comparison results A/B comparison results
""" """
print('=' * 70) logger.info('A/B TEST: WITH vs WITHOUT ORPHAN CONNECTIONS')
print('A/B TEST: WITH vs WITHOUT ORPHAN CONNECTIONS')
print('=' * 70)
results = {} results = {}
# Test A: WITHOUT orphan connections # Test A: WITHOUT orphan connections
print('\n[A] Running WITHOUT orphan connections...') logger.info('[A] Running WITHOUT orphan connections...')
os.environ['AUTO_CONNECT_ORPHANS'] = 'false' os.environ['AUTO_CONNECT_ORPHANS'] = 'false'
harness_a = E2ETestHarness(**harness_config) harness_a = E2ETestHarness(**harness_config)
@ -363,7 +346,7 @@ async def run_ab_test(
await harness_a.clear_existing_data() await harness_a.clear_existing_data()
# Test B: WITH orphan connections # Test B: WITH orphan connections
print('\n[B] Running WITH orphan connections...') logger.info('[B] Running WITH orphan connections...')
os.environ['AUTO_CONNECT_ORPHANS'] = 'true' os.environ['AUTO_CONNECT_ORPHANS'] = 'true'
# Force re-ingest for test B # Force re-ingest for test B
@ -375,9 +358,7 @@ async def run_ab_test(
results['with_orphans'] = await harness_b.run_full_pipeline() results['with_orphans'] = await harness_b.run_full_pipeline()
# Compare results # Compare results
print('\n' + '=' * 70) logger.info('A/B COMPARISON')
print('A/B COMPARISON')
print('=' * 70)
a_stats = results['without_orphans'].get('ragas_results', {}).get('average_metrics', {}) a_stats = results['without_orphans'].get('ragas_results', {}).get('average_metrics', {})
b_stats = results['with_orphans'].get('ragas_results', {}).get('average_metrics', {}) b_stats = results['with_orphans'].get('ragas_results', {}).get('average_metrics', {})
@ -401,7 +382,7 @@ async def run_ab_test(
} }
status = 'UP' if diff > 0 else ('DOWN' if diff < 0 else '~') status = 'UP' if diff > 0 else ('DOWN' if diff < 0 else '~')
print(f' {metric:<20} A: {a_val:.4f} B: {b_val:.4f} [{status}] {pct:+.1f}%') logger.info(f'{metric:<20} A: {a_val:.4f} B: {b_val:.4f} [{status}] {pct:+.1f}%')
# Verdict # Verdict
ragas_improvement = comparison['improvement'].get('ragas_score', {}).get('percent', 0) ragas_improvement = comparison['improvement'].get('ragas_score', {}).get('percent', 0)
@ -413,13 +394,13 @@ async def run_ab_test(
verdict = 'NO SIGNIFICANT DIFFERENCE' verdict = 'NO SIGNIFICANT DIFFERENCE'
comparison['verdict'] = verdict comparison['verdict'] = verdict
print(f'\nVERDICT: {verdict}') logger.info(f'VERDICT: {verdict}')
# Save comparison # Save comparison
comp_path = harness_a.results_dir / f'ab_comparison_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json' comp_path = harness_a.results_dir / f'ab_comparison_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'
with open(comp_path, 'w') as f: with open(comp_path, 'w') as f:
json.dump(comparison, f, indent=2) json.dump(comparison, f, indent=2)
print(f'\nComparison saved: {comp_path}') logger.info(f'Comparison saved: {comp_path}')
return comparison return comparison

View file

@ -4,14 +4,10 @@ from dataclasses import dataclass
from typing import Any, final from typing import Any, final
import numpy as np import numpy as np
import pipmaster as pm
from lightrag.base import BaseVectorStorage from lightrag.base import BaseVectorStorage
from lightrag.utils import logger from lightrag.utils import logger
if not pm.is_installed('chromadb'):
pm.install('chromadb')
from chromadb import HttpClient, PersistentClient # type: ignore from chromadb import HttpClient, PersistentClient # type: ignore
from chromadb.config import Settings # type: ignore from chromadb.config import Settings # type: ignore
@ -217,7 +213,7 @@ class ChromaVectorDBStorage(BaseVectorStorage):
self._collection.delete(ids=ids) self._collection.delete(ids=ids)
logger.debug(f'Successfully deleted {len(ids)} vectors from {self.namespace}') logger.debug(f'Successfully deleted {len(ids)} vectors from {self.namespace}')
except Exception as e: except Exception as e:
logger.error(f'Error while deleting vectors from {self.namespace}: {e}') logger.error(f'Error while deleting vectors from {self.namespace}: {e!s}')
raise raise
async def get_by_id(self, id: str) -> dict[str, Any] | None: async def get_by_id(self, id: str) -> dict[str, Any] | None:
@ -245,7 +241,7 @@ class ChromaVectorDBStorage(BaseVectorStorage):
**result['metadatas'][0], **result['metadatas'][0],
} }
except Exception as e: except Exception as e:
logger.error(f'Error retrieving vector data for ID {id}: {e}') logger.error(f'Error retrieving vector data for ID {id}: {e!s}')
return None return None
async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]: async def get_by_ids(self, ids: list[str]) -> list[dict[str, Any]]:

View file

@ -2042,8 +2042,6 @@ def normalize_extracted_info(name: str, remove_inner_quotes=False) -> str:
if len(name) < 6 and should_filter_by_dots(name): if len(name) < 6 and should_filter_by_dots(name):
# Filter out mixed numeric and dot content with length < 6 # Filter out mixed numeric and dot content with length < 6
return '' return ''
# Filter out mixed numeric and dot content with length < 6, requiring at least one dot
return ''
return name return name