From 55274dde597fe2a4adab5f1b2cd6ad865f4a2e78 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 8 Nov 2025 17:57:22 +0800 Subject: [PATCH 01/12] Add LLM cache migration tool for KV storage backends - Supports JSON/Redis/PostgreSQL/MongoDB - Batch migration with error tracking - Workspace-aware data transfer - Memory-efficient pagination - Comprehensive migration reporting --- tools/README_MIGRATE_LLM_CACHE.md | 401 +++++++++++++++++ tools/migrate_llm_cache.py | 721 ++++++++++++++++++++++++++++++ 2 files changed, 1122 insertions(+) create mode 100644 tools/README_MIGRATE_LLM_CACHE.md create mode 100644 tools/migrate_llm_cache.py diff --git a/tools/README_MIGRATE_LLM_CACHE.md b/tools/README_MIGRATE_LLM_CACHE.md new file mode 100644 index 00000000..edd49acd --- /dev/null +++ b/tools/README_MIGRATE_LLM_CACHE.md @@ -0,0 +1,401 @@ +# LLM Cache Migration Tool - User Guide + +## Overview + +This tool migrates LightRAG's LLM response cache between different KV storage implementations. It specifically migrates caches generated during file extraction (mode `default`), including entity extraction and summary caches. + +## Supported Storage Types + +1. **JsonKVStorage** - File-based JSON storage +2. **RedisKVStorage** - Redis database storage +3. **PGKVStorage** - PostgreSQL database storage +4. **MongoKVStorage** - MongoDB database storage + +## Cache Types + +The tool migrates the following cache types: +- `default:extract:*` - Entity and relationship extraction caches +- `default:summary:*` - Entity and relationship summary caches + +**Note**: Query caches (modes like `local`, `global`, etc.) are NOT migrated. + +## Prerequisites + +### 1. Environment Variable Configuration + +Ensure the relevant storage environment variables are configured in your `.env` file: + +#### Workspace Configuration (Optional) +```bash +# Generic workspace (shared by all storages) +WORKSPACE=space1 + +# Or configure independent workspace for specific storage +POSTGRES_WORKSPACE=pg_space +MONGODB_WORKSPACE=mongo_space +REDIS_WORKSPACE=redis_space +``` + +**Workspace Priority**: Storage-specific > Generic WORKSPACE > Empty string + +#### JsonKVStorage +```bash +WORKING_DIR=./rag_storage +``` + +#### RedisKVStorage +```bash +REDIS_URI=redis://localhost:6379 +``` + +#### PGKVStorage +```bash +POSTGRES_HOST=localhost +POSTGRES_PORT=5432 +POSTGRES_USER=your_username +POSTGRES_PASSWORD=your_password +POSTGRES_DATABASE=your_database +``` + +#### MongoKVStorage +```bash +MONGO_URI=mongodb://root:root@localhost:27017/ +MONGO_DATABASE=LightRAG +``` + +### 2. Install Dependencies + +Ensure LightRAG and its dependencies are installed: + +```bash +pip install -r requirements.txt +``` + +## Usage + +### Basic Usage + +Run from the LightRAG project root directory: + +```bash +python tools/migrate_llm_cache.py +``` + +### Interactive Workflow + +The tool guides you through the following steps: + +#### 1. Select Source Storage Type +``` +Supported KV Storage Types: +[1] JsonKVStorage +[2] RedisKVStorage +[3] PGKVStorage +[4] MongoKVStorage + +Select Source storage type (1-4): 1 +``` + +#### 2. Source Storage Validation +The tool will: +- Check required environment variables +- Auto-detect workspace configuration +- Initialize and connect to storage +- Count cache records available for migration + +``` +Checking environment variables... +✓ All required environment variables are set + +Initializing Source storage... +- Storage Type: JsonKVStorage +- Workspace: space1 +- Connection Status: ✓ Success + +Counting cache records... 
+- default:extract: 8,500 records +- default:summary: 234 records +- Total: 8,734 records +``` + +#### 3. Select Target Storage Type + +Repeat steps 1-2 to select and validate the target storage. + +#### 4. Confirm Migration + +Review the migration summary and confirm: + +``` +Migration Confirmation +================================================== +Source: JsonKVStorage (workspace: space1) - 8,734 records +Target: MongoKVStorage (workspace: space1) - 0 records +Batch Size: 1,000 records/batch + +⚠ Warning: Target storage already has 0 records +Migration will overwrite records with the same keys + +Continue? (y/n): y +``` + +#### 5. Execute Migration + +Observe migration progress: + +``` +=== Starting Migration === +Batch 1/9: ████████░░ 1000/8734 (11%) - default:extract +Batch 2/9: ████████████████░░ 2000/8734 (23%) - default:extract +... +Batch 9/9: ████████████████████ 8734/8734 (100%) - default:summary + +Persisting data to disk... +``` + +#### 6. Review Migration Report + +The tool provides a comprehensive final report showing statistics and any errors encountered: + +**Successful Migration:** +``` +Migration Complete - Final Report + +📊 Statistics: + Total source records: 8,734 + Total batches: 9 + Successful batches: 9 + Failed batches: 0 + Successfully migrated: 8,734 + Failed to migrate: 0 + Success rate: 100.00% + +✓ SUCCESS: All records migrated successfully! +``` + +**Migration with Errors:** +``` +Migration Complete - Final Report + +📊 Statistics: + Total source records: 8,734 + Total batches: 9 + Successful batches: 8 + Failed batches: 1 + Successfully migrated: 7,734 + Failed to migrate: 1,000 + Success rate: 88.55% + +⚠️ Errors encountered: 1 + +Error Details: +------------------------------------------------------------ + +Error Summary: + - ConnectionError: 1 occurrence(s) + +First 5 errors: + + 1. Batch 2 + Type: ConnectionError + Message: Connection timeout after 30s + Records lost: 1,000 + +⚠️ WARNING: Migration completed with errors! + Please review the error details above. +``` + +## Technical Details + +### Workspace Handling + +The tool retrieves workspace in the following priority order: + +1. **Storage-specific workspace environment variables** + - PGKVStorage: `POSTGRES_WORKSPACE` + - MongoKVStorage: `MONGODB_WORKSPACE` + - RedisKVStorage: `REDIS_WORKSPACE` + +2. **Generic workspace environment variable** + - `WORKSPACE` + +3. **Default value** + - Empty string (uses storage's default workspace) + +### Batch Migration + +- Default batch size: 1000 records/batch +- Avoids memory overflow from loading too much data at once +- Each batch is committed independently, supporting resume capability + +### Memory-Efficient Pagination + +For large datasets, the tool implements storage-specific pagination strategies: + +- **JsonKVStorage**: Direct in-memory access (data already loaded in shared storage) +- **RedisKVStorage**: Cursor-based SCAN with pipeline batching (1000 keys/batch) +- **PGKVStorage**: SQL LIMIT/OFFSET pagination (1000 records/batch) +- **MongoKVStorage**: Cursor streaming with batch_size (1000 documents/batch) + +This ensures the tool can handle millions of cache records without memory issues. 
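+
+As a minimal sketch, the Redis path is essentially a cursor-based `SCAN` loop combined with pipelined `GET`s. The snippet below is a simplified excerpt of that pattern (the full implementation in `migrate_llm_cache.py` also falls back to individual `GET`s when a pipeline fails); here `redis` is assumed to be an async Redis client and `namespace` the storage's key prefix:
+
+```python
+import asyncio
+import json
+
+async def scan_default_caches(redis, namespace: str, batch_size: int = 1000) -> dict:
+    """Collect default:extract/summary cache entries without loading every key at once."""
+    cache_data = {}
+    for pattern in ("default:extract:*", "default:summary:*"):
+        cursor = 0
+        while True:
+            # SCAN returns a new cursor plus up to ~batch_size matching keys
+            cursor, keys = await redis.scan(
+                cursor, match=f"{namespace}:{pattern}", count=batch_size
+            )
+            if keys:
+                pipe = redis.pipeline()
+                for key in keys:
+                    pipe.get(key)
+                for key, value in zip(keys, await pipe.execute()):
+                    if value:
+                        key_str = key.decode() if isinstance(key, bytes) else key
+                        # Strip the namespace prefix to recover the original cache key
+                        cache_data[key_str.replace(f"{namespace}:", "", 1)] = json.loads(value)
+            if cursor == 0:  # cursor 0 signals the scan is complete
+                break
+            await asyncio.sleep(0)  # yield control between batches
+    return cache_data
+```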
+ +### Prefix Filtering Implementation + +The tool uses optimized filtering methods for different storage types: + +- **JsonKVStorage**: Direct dictionary iteration with lock protection +- **RedisKVStorage**: SCAN command with namespace-prefixed patterns + pipeline for bulk GET +- **PGKVStorage**: SQL LIKE queries with proper field mapping (id, return_value, etc.) +- **MongoKVStorage**: MongoDB regex queries on `_id` field with cursor streaming + +## Error Handling & Resilience + +The tool implements comprehensive error tracking to ensure transparent and resilient migrations: + +### Batch-Level Error Tracking +- Each batch is independently error-checked +- Failed batches are logged but don't stop the migration +- Successful batches are committed even if later batches fail +- Real-time progress shows ✓ (success) or ✗ (failed) for each batch + +### Error Reporting +After migration completes, a detailed report includes: +- **Statistics**: Total records, success/failure counts, success rate +- **Error Summary**: Grouped by error type with occurrence counts +- **Error Details**: Batch number, error type, message, and records lost +- **Recommendations**: Clear indication of success or need for review + +### No Double Data Loading +- Unlike traditional verification approaches, the tool does NOT reload all target data +- Errors are detected during migration, not after +- This eliminates memory overhead and handles pre-existing target data correctly + +## Important Notes + +1. **Data Overwrite Warning** + - Migration will overwrite records with the same keys in the target storage + - Tool displays a warning if target storage already has data + - Pre-existing data in target storage is handled correctly + +2. **Workspace Consistency** + - Recommended to use the same workspace for source and target + - Cache data in different workspaces are completely isolated + +3. **Interrupt and Resume** + - Migration can be interrupted at any time (Ctrl+C) + - Already migrated data will remain in target storage + - Re-running will overwrite existing records + - Failed batches can be manually retried + +4. **Performance Considerations** + - Large data migration may take considerable time + - Recommend migrating during off-peak hours + - Ensure stable network connection (for remote databases) + - Memory usage stays constant regardless of dataset size + +## Troubleshooting + +### Missing Environment Variables +``` +✗ Missing required environment variables: POSTGRES_USER, POSTGRES_PASSWORD +``` +**Solution**: Add missing variables to your `.env` file + +### Connection Failed +``` +✗ Initialization failed: Connection refused +``` +**Solutions**: +- Check if database service is running +- Verify connection parameters (host, port, credentials) +- Check firewall settings + +**Solutions**: +- Check migration process for error logs +- Re-run migration tool +- Check target storage capacity and permissions + +## Example Scenarios + +### Scenario 1: JSON to MongoDB Migration + +Use case: Migrating from single-machine development to production + +```bash +# 1. Configure environment variables +WORKSPACE=production +MONGO_URI=mongodb://user:pass@prod-server:27017/ +MONGO_DATABASE=LightRAG + +# 2. Run tool +python tools/migrate_llm_cache.py + +# 3. Select: 1 (JsonKVStorage) -> 4 (MongoKVStorage) +``` + +### Scenario 2: PostgreSQL Database Switch + +Use case: Database migration or upgrade + +```bash +# 1. Configure old and new databases +POSTGRES_WORKSPACE=old_db # Source +# ... Configure new database as default + +# 2. 
Run tool and select same storage type +``` + +### Scenario 3: Redis to PostgreSQL + +Use case: Migrating from cache storage to relational database + +```bash +# 1. Ensure both databases are accessible +REDIS_URI=redis://old-redis:6379 +POSTGRES_HOST=new-postgres-server +# ... Other PostgreSQL configs + +# 2. Run tool +python tools/migrate_llm_cache.py + +# 3. Select: 2 (RedisKVStorage) -> 3 (PGKVStorage) +``` + +## Tool Limitations + +1. **Only Default Mode Caches** + - Only migrates `default:extract:*` and `default:summary:*` + - Query caches are not included + +2. **Workspace Isolation** + - Different workspaces are treated as completely separate + - Cross-workspace migration requires manual workspace reconfiguration + +3. **Network Dependency** + - Tool requires stable network connection for remote databases + - Large datasets may fail if connection is interrupted + +## Best Practices + +1. **Backup Before Migration** + - Always backup your data before migration + - Test migration on non-production data first + +2. **Verify Results** + - Check the verification output after migration + - Manually verify a few cache entries if needed + +3. **Monitor Performance** + - Watch database resource usage during migration + - Consider migrating in smaller batches if needed + +4. **Clean Old Data** + - After successful migration, consider cleaning old cache data + - Keep backups for a reasonable period before deletion + +## Support + +For issues or questions: +- Check LightRAG documentation +- Review error logs for detailed information +- Ensure all environment variables are correctly configured diff --git a/tools/migrate_llm_cache.py b/tools/migrate_llm_cache.py new file mode 100644 index 00000000..cb48f394 --- /dev/null +++ b/tools/migrate_llm_cache.py @@ -0,0 +1,721 @@ +#!/usr/bin/env python3 +""" +LLM Cache Migration Tool for LightRAG + +This tool migrates LLM response cache (default:extract:* and default:summary:*) +between different KV storage implementations while preserving workspace isolation. 
+ +Usage: + python tools/migrate_llm_cache.py + +Supported KV Storage Types: + - JsonKVStorage + - RedisKVStorage + - PGKVStorage + - MongoKVStorage +""" + +import asyncio +import os +import sys +import time +from typing import Any, Dict, List +from dataclasses import dataclass, field +from dotenv import load_dotenv + +# Add parent directory to path for imports +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +from lightrag.kg import STORAGE_ENV_REQUIREMENTS +from lightrag.namespace import NameSpace +from lightrag.utils import setup_logger + +# Load environment variables +load_dotenv(dotenv_path=".env", override=False) + +# Setup logger +setup_logger("lightrag", level="INFO") + +# Storage type configurations +STORAGE_TYPES = { + "1": "JsonKVStorage", + "2": "RedisKVStorage", + "3": "PGKVStorage", + "4": "MongoKVStorage", +} + +# Workspace environment variable mapping +WORKSPACE_ENV_MAP = { + "PGKVStorage": "POSTGRES_WORKSPACE", + "MongoKVStorage": "MONGODB_WORKSPACE", + "RedisKVStorage": "REDIS_WORKSPACE", +} + +# Default batch size for migration +DEFAULT_BATCH_SIZE = 1000 + + +@dataclass +class MigrationStats: + """Migration statistics and error tracking""" + total_source_records: int = 0 + total_batches: int = 0 + successful_batches: int = 0 + failed_batches: int = 0 + successful_records: int = 0 + failed_records: int = 0 + errors: List[Dict[str, Any]] = field(default_factory=list) + + def add_error(self, batch_idx: int, error: Exception, batch_size: int): + """Record batch error""" + self.errors.append({ + 'batch': batch_idx, + 'error_type': type(error).__name__, + 'error_msg': str(error), + 'records_lost': batch_size, + 'timestamp': time.time() + }) + self.failed_batches += 1 + self.failed_records += batch_size + + +class MigrationTool: + """LLM Cache Migration Tool""" + + def __init__(self): + self.source_storage = None + self.target_storage = None + self.source_workspace = "" + self.target_workspace = "" + self.batch_size = DEFAULT_BATCH_SIZE + + def get_workspace_for_storage(self, storage_name: str) -> str: + """Get workspace for a specific storage type + + Priority: Storage-specific env var > WORKSPACE env var > empty string + + Args: + storage_name: Storage implementation name + + Returns: + Workspace name + """ + # Check storage-specific workspace + if storage_name in WORKSPACE_ENV_MAP: + specific_workspace = os.getenv(WORKSPACE_ENV_MAP[storage_name]) + if specific_workspace: + return specific_workspace + + # Check generic WORKSPACE + workspace = os.getenv("WORKSPACE", "") + return workspace + + def check_env_vars(self, storage_name: str) -> bool: + """Check if all required environment variables exist + + Args: + storage_name: Storage implementation name + + Returns: + True if all required env vars exist, False otherwise + """ + required_vars = STORAGE_ENV_REQUIREMENTS.get(storage_name, []) + missing_vars = [var for var in required_vars if var not in os.environ] + + if missing_vars: + print(f"✗ Missing required environment variables: {', '.join(missing_vars)}") + return False + + print("✓ All required environment variables are set") + return True + + def get_storage_class(self, storage_name: str): + """Dynamically import and return storage class + + Args: + storage_name: Storage implementation name + + Returns: + Storage class + """ + if storage_name == "JsonKVStorage": + from lightrag.kg.json_kv_impl import JsonKVStorage + return JsonKVStorage + elif storage_name == "RedisKVStorage": + from lightrag.kg.redis_impl import RedisKVStorage + return 
RedisKVStorage + elif storage_name == "PGKVStorage": + from lightrag.kg.postgres_impl import PGKVStorage + return PGKVStorage + elif storage_name == "MongoKVStorage": + from lightrag.kg.mongo_impl import MongoKVStorage + return MongoKVStorage + else: + raise ValueError(f"Unsupported storage type: {storage_name}") + + async def initialize_storage(self, storage_name: str, workspace: str): + """Initialize storage instance + + Args: + storage_name: Storage implementation name + workspace: Workspace name + + Returns: + Initialized storage instance + """ + storage_class = self.get_storage_class(storage_name) + + # Create global config + global_config = { + "working_dir": os.getenv("WORKING_DIR", "./rag_storage"), + "embedding_batch_num": 10, + } + + # Initialize storage + storage = storage_class( + namespace=NameSpace.KV_STORE_LLM_RESPONSE_CACHE, + workspace=workspace, + global_config=global_config, + embedding_func=None, + ) + + # Initialize the storage + await storage.initialize() + + return storage + + async def get_default_caches_json(self, storage) -> Dict[str, Any]: + """Get default caches from JsonKVStorage + + Args: + storage: JsonKVStorage instance + + Returns: + Dictionary of cache entries with default:extract:* or default:summary:* keys + """ + # Access _data directly - it's a dict from shared_storage + async with storage._storage_lock: + filtered = {} + for key, value in storage._data.items(): + if key.startswith("default:extract:") or key.startswith("default:summary:"): + filtered[key] = value + return filtered + + async def get_default_caches_redis(self, storage, batch_size: int = 1000) -> Dict[str, Any]: + """Get default caches from RedisKVStorage with pagination + + Args: + storage: RedisKVStorage instance + batch_size: Number of keys to process per batch + + Returns: + Dictionary of cache entries with default:extract:* or default:summary:* keys + """ + import json + + cache_data = {} + + # Use _get_redis_connection() context manager + async with storage._get_redis_connection() as redis: + for pattern in ["default:extract:*", "default:summary:*"]: + # Add namespace prefix to pattern + prefixed_pattern = f"{storage.final_namespace}:{pattern}" + cursor = 0 + + while True: + # SCAN already implements cursor-based pagination + cursor, keys = await redis.scan( + cursor, + match=prefixed_pattern, + count=batch_size + ) + + if keys: + # Process this batch using pipeline with error handling + try: + pipe = redis.pipeline() + for key in keys: + pipe.get(key) + values = await pipe.execute() + + for key, value in zip(keys, values): + if value: + key_str = key.decode() if isinstance(key, bytes) else key + # Remove namespace prefix to get original key + original_key = key_str.replace(f"{storage.final_namespace}:", "", 1) + cache_data[original_key] = json.loads(value) + + except Exception as e: + # Pipeline execution failed, fall back to individual gets + print(f"⚠️ Pipeline execution failed for batch, using individual gets: {e}") + for key in keys: + try: + value = await redis.get(key) + if value: + key_str = key.decode() if isinstance(key, bytes) else key + original_key = key_str.replace(f"{storage.final_namespace}:", "", 1) + cache_data[original_key] = json.loads(value) + except Exception as individual_error: + print(f"⚠️ Failed to get individual key {key}: {individual_error}") + continue + + if cursor == 0: + break + + # Yield control periodically to avoid blocking + await asyncio.sleep(0) + + return cache_data + + async def get_default_caches_pg(self, storage, batch_size: int = 1000) -> 
Dict[str, Any]: + """Get default caches from PGKVStorage with pagination + + Args: + storage: PGKVStorage instance + batch_size: Number of records to fetch per batch + + Returns: + Dictionary of cache entries with default:extract:* or default:summary:* keys + """ + from lightrag.kg.postgres_impl import namespace_to_table_name + + cache_data = {} + table_name = namespace_to_table_name(storage.namespace) + offset = 0 + + while True: + # Use LIMIT and OFFSET for pagination + query = f""" + SELECT id as key, original_prompt, return_value, chunk_id, cache_type, queryparam, + EXTRACT(EPOCH FROM create_time)::BIGINT as create_time, + EXTRACT(EPOCH FROM update_time)::BIGINT as update_time + FROM {table_name} + WHERE workspace = $1 + AND (id LIKE 'default:extract:%' OR id LIKE 'default:summary:%') + ORDER BY id + LIMIT $2 OFFSET $3 + """ + + results = await storage.db.query( + query, + [storage.workspace, batch_size, offset], + multirows=True + ) + + if not results: + break + + for row in results: + # Map PostgreSQL fields to cache format + cache_entry = { + "return": row.get("return_value", ""), + "cache_type": row.get("cache_type"), + "original_prompt": row.get("original_prompt", ""), + "chunk_id": row.get("chunk_id"), + "queryparam": row.get("queryparam"), + "create_time": row.get("create_time", 0), + "update_time": row.get("update_time", 0), + } + cache_data[row["key"]] = cache_entry + + # If we got fewer results than batch_size, we're done + if len(results) < batch_size: + break + + offset += batch_size + + # Yield control periodically + await asyncio.sleep(0) + + return cache_data + + async def get_default_caches_mongo(self, storage, batch_size: int = 1000) -> Dict[str, Any]: + """Get default caches from MongoKVStorage with cursor-based pagination + + Args: + storage: MongoKVStorage instance + batch_size: Number of documents to process per batch + + Returns: + Dictionary of cache entries with default:extract:* or default:summary:* keys + """ + cache_data = {} + + # MongoDB query with regex - use _data not collection + query = {"_id": {"$regex": "^default:(extract|summary):"}} + + # Use cursor without to_list() - process in batches + cursor = storage._data.find(query).batch_size(batch_size) + + async for doc in cursor: + # Process each document as it comes + doc_copy = doc.copy() + key = doc_copy.pop("_id") + + # Filter ALL MongoDB/database-specific fields + # Following .clinerules: "Always filter deprecated/incompatible fields during deserialization" + for field_name in ["namespace", "workspace", "_id", "content"]: + doc_copy.pop(field_name, None) + + cache_data[key] = doc_copy + + # Periodically yield control (every batch_size documents) + if len(cache_data) % batch_size == 0: + await asyncio.sleep(0) + + return cache_data + + async def get_default_caches(self, storage, storage_name: str) -> Dict[str, Any]: + """Get default caches from any storage type + + Args: + storage: Storage instance + storage_name: Storage type name + + Returns: + Dictionary of cache entries + """ + if storage_name == "JsonKVStorage": + return await self.get_default_caches_json(storage) + elif storage_name == "RedisKVStorage": + return await self.get_default_caches_redis(storage) + elif storage_name == "PGKVStorage": + return await self.get_default_caches_pg(storage) + elif storage_name == "MongoKVStorage": + return await self.get_default_caches_mongo(storage) + else: + raise ValueError(f"Unsupported storage type: {storage_name}") + + async def count_cache_types(self, cache_data: Dict[str, Any]) -> Dict[str, int]: + 
"""Count cache entries by type + + Args: + cache_data: Dictionary of cache entries + + Returns: + Dictionary with counts for each cache type + """ + counts = { + "extract": 0, + "summary": 0, + } + + for key in cache_data.keys(): + if key.startswith("default:extract:"): + counts["extract"] += 1 + elif key.startswith("default:summary:"): + counts["summary"] += 1 + + return counts + + def print_header(self): + """Print tool header""" + print("\n" + "=" * 50) + print("LLM Cache Migration Tool - LightRAG") + print("=" * 50) + + def print_storage_types(self): + """Print available storage types""" + print("\nSupported KV Storage Types:") + for key, value in STORAGE_TYPES.items(): + print(f"[{key}] {value}") + + def get_user_choice(self, prompt: str, valid_choices: list) -> str: + """Get user choice with validation + + Args: + prompt: Prompt message + valid_choices: List of valid choices + + Returns: + User's choice + """ + while True: + choice = input(f"\n{prompt}: ").strip() + if choice in valid_choices: + return choice + print(f"✗ Invalid choice, please enter one of: {', '.join(valid_choices)}") + + async def setup_storage(self, storage_type: str) -> tuple: + """Setup and initialize storage + + Args: + storage_type: Type label (source/target) + + Returns: + Tuple of (storage_instance, storage_name, workspace, cache_data) + """ + print(f"\n=== {storage_type} Storage Setup ===") + + # Get storage type choice + choice = self.get_user_choice( + f"Select {storage_type} storage type (1-4)", + list(STORAGE_TYPES.keys()) + ) + storage_name = STORAGE_TYPES[choice] + + # Check environment variables + print("\nChecking environment variables...") + if not self.check_env_vars(storage_name): + return None, None, None, None + + # Get workspace + workspace = self.get_workspace_for_storage(storage_name) + + # Initialize storage + print(f"\nInitializing {storage_type} storage...") + try: + storage = await self.initialize_storage(storage_name, workspace) + print(f"- Storage Type: {storage_name}") + print(f"- Workspace: {workspace if workspace else '(default)'}") + print("- Connection Status: ✓ Success") + except Exception as e: + print(f"✗ Initialization failed: {e}") + return None, None, None, None + + # Get cache data + print("\nCounting cache records...") + try: + cache_data = await self.get_default_caches(storage, storage_name) + counts = await self.count_cache_types(cache_data) + + print(f"- default:extract: {counts['extract']:,} records") + print(f"- default:summary: {counts['summary']:,} records") + print(f"- Total: {len(cache_data):,} records") + except Exception as e: + print(f"✗ Counting failed: {e}") + return None, None, None, None + + return storage, storage_name, workspace, cache_data + + async def migrate_caches( + self, + source_data: Dict[str, Any], + target_storage, + target_storage_name: str + ) -> MigrationStats: + """Migrate caches in batches with error tracking + + Args: + source_data: Source cache data + target_storage: Target storage instance + target_storage_name: Target storage type name + + Returns: + MigrationStats object with migration results and errors + """ + stats = MigrationStats() + stats.total_source_records = len(source_data) + + if stats.total_source_records == 0: + print("\nNo records to migrate") + return stats + + # Convert to list for batching + items = list(source_data.items()) + stats.total_batches = (stats.total_source_records + self.batch_size - 1) // self.batch_size + + print("\n=== Starting Migration ===") + + for batch_idx in range(stats.total_batches): + start_idx 
= batch_idx * self.batch_size + end_idx = min((batch_idx + 1) * self.batch_size, stats.total_source_records) + batch_items = items[start_idx:end_idx] + batch_data = dict(batch_items) + + # Determine current cache type for display + current_key = batch_items[0][0] + cache_type = "extract" if "extract" in current_key else "summary" + + try: + # Attempt to write batch + await target_storage.upsert(batch_data) + + # Success - update stats + stats.successful_batches += 1 + stats.successful_records += len(batch_data) + + # Calculate progress + progress = (end_idx / stats.total_source_records) * 100 + bar_length = 20 + filled_length = int(bar_length * end_idx // stats.total_source_records) + bar = "█" * filled_length + "░" * (bar_length - filled_length) + + print(f"Batch {batch_idx + 1}/{stats.total_batches}: {bar} " + f"{end_idx:,}/{stats.total_source_records:,} ({progress:.0f}%) - " + f"default:{cache_type} ✓") + + except Exception as e: + # Error - record and continue + stats.add_error(batch_idx + 1, e, len(batch_data)) + + print(f"Batch {batch_idx + 1}/{stats.total_batches}: ✗ FAILED - " + f"{type(e).__name__}: {str(e)}") + + # Final persist + print("\nPersisting data to disk...") + try: + await target_storage.index_done_callback() + print("✓ Data persisted successfully") + except Exception as e: + print(f"✗ Persist failed: {e}") + stats.add_error(0, e, 0) # batch 0 = persist error + + return stats + + def print_migration_report(self, stats: MigrationStats): + """Print comprehensive migration report + + Args: + stats: MigrationStats object with migration results + """ + print("\n" + "=" * 60) + print("Migration Complete - Final Report") + print("=" * 60) + + # Overall statistics + print("\n📊 Statistics:") + print(f" Total source records: {stats.total_source_records:,}") + print(f" Total batches: {stats.total_batches:,}") + print(f" Successful batches: {stats.successful_batches:,}") + print(f" Failed batches: {stats.failed_batches:,}") + print(f" Successfully migrated: {stats.successful_records:,}") + print(f" Failed to migrate: {stats.failed_records:,}") + + # Success rate + success_rate = (stats.successful_records / stats.total_source_records * 100) if stats.total_source_records > 0 else 0 + print(f" Success rate: {success_rate:.2f}%") + + # Error details + if stats.errors: + print(f"\n⚠️ Errors encountered: {len(stats.errors)}") + print("\nError Details:") + print("-" * 60) + + # Group errors by type + error_types = {} + for error in stats.errors: + err_type = error['error_type'] + error_types[err_type] = error_types.get(err_type, 0) + 1 + + print("\nError Summary:") + for err_type, count in sorted(error_types.items(), key=lambda x: -x[1]): + print(f" - {err_type}: {count} occurrence(s)") + + print("\nFirst 5 errors:") + for i, error in enumerate(stats.errors[:5], 1): + print(f"\n {i}. Batch {error['batch']}") + print(f" Type: {error['error_type']}") + print(f" Message: {error['error_msg']}") + print(f" Records lost: {error['records_lost']:,}") + + if len(stats.errors) > 5: + print(f"\n ... 
and {len(stats.errors) - 5} more errors") + + print("\n" + "=" * 60) + print("⚠️ WARNING: Migration completed with errors!") + print(" Please review the error details above.") + print("=" * 60) + else: + print("\n" + "=" * 60) + print("✓ SUCCESS: All records migrated successfully!") + print("=" * 60) + + async def run(self): + """Run the migration tool""" + try: + # Print header + self.print_header() + self.print_storage_types() + + # Setup source storage + ( + self.source_storage, + source_storage_name, + self.source_workspace, + source_data + ) = await self.setup_storage("Source") + + if not self.source_storage: + print("\n✗ Source storage setup failed") + return + + if not source_data: + print("\n⚠ Source storage has no cache records to migrate") + # Cleanup + await self.source_storage.finalize() + return + + # Setup target storage + ( + self.target_storage, + target_storage_name, + self.target_workspace, + target_data + ) = await self.setup_storage("Target") + + if not self.target_storage: + print("\n✗ Target storage setup failed") + # Cleanup source + await self.source_storage.finalize() + return + + # Show migration summary + print("\n" + "=" * 50) + print("Migration Confirmation") + print("=" * 50) + print(f"Source: {source_storage_name} (workspace: {self.source_workspace if self.source_workspace else '(default)'}) - {len(source_data):,} records") + print(f"Target: {target_storage_name} (workspace: {self.target_workspace if self.target_workspace else '(default)'}) - {len(target_data):,} records") + print(f"Batch Size: {self.batch_size:,} records/batch") + + if target_data: + print(f"\n⚠ Warning: Target storage already has {len(target_data):,} records") + print("Migration will overwrite records with the same keys") + + # Confirm migration + confirm = input("\nContinue? 
(y/n): ").strip().lower() + if confirm != 'y': + print("\n✗ Migration cancelled") + # Cleanup + await self.source_storage.finalize() + await self.target_storage.finalize() + return + + # Perform migration with error tracking + stats = await self.migrate_caches(source_data, self.target_storage, target_storage_name) + + # Print comprehensive migration report + self.print_migration_report(stats) + + # Cleanup + await self.source_storage.finalize() + await self.target_storage.finalize() + + except KeyboardInterrupt: + print("\n\n✗ Migration interrupted by user") + except Exception as e: + print(f"\n✗ Migration failed: {e}") + import traceback + traceback.print_exc() + finally: + # Ensure cleanup + if self.source_storage: + try: + await self.source_storage.finalize() + except Exception: + pass + if self.target_storage: + try: + await self.target_storage.finalize() + except Exception: + pass + + +async def main(): + """Main entry point""" + tool = MigrationTool() + await tool.run() + + +if __name__ == "__main__": + asyncio.run(main()) From 0f2c0de8df2f84f60d8def43f8c7edf8bf799411 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 8 Nov 2025 18:16:03 +0800 Subject: [PATCH 02/12] Fix linting --- tools/README_MIGRATE_LLM_CACHE.md | 2 +- tools/migrate_llm_cache.py | 344 +++++++++++++++++------------- 2 files changed, 194 insertions(+), 152 deletions(-) diff --git a/tools/README_MIGRATE_LLM_CACHE.md b/tools/README_MIGRATE_LLM_CACHE.md index edd49acd..364083d7 100644 --- a/tools/README_MIGRATE_LLM_CACHE.md +++ b/tools/README_MIGRATE_LLM_CACHE.md @@ -261,7 +261,7 @@ The tool implements comprehensive error tracking to ensure transparent and resil ### Error Reporting After migration completes, a detailed report includes: - **Statistics**: Total records, success/failure counts, success rate -- **Error Summary**: Grouped by error type with occurrence counts +- **Error Summary**: Grouped by error type with occurrence counts - **Error Details**: Batch number, error type, message, and records lost - **Recommendations**: Clear indication of success or need for review diff --git a/tools/migrate_llm_cache.py b/tools/migrate_llm_cache.py index cb48f394..26b7c81c 100644 --- a/tools/migrate_llm_cache.py +++ b/tools/migrate_llm_cache.py @@ -10,7 +10,7 @@ Usage: Supported KV Storage Types: - JsonKVStorage - - RedisKVStorage + - RedisKVStorage - PGKVStorage - MongoKVStorage """ @@ -58,6 +58,7 @@ DEFAULT_BATCH_SIZE = 1000 @dataclass class MigrationStats: """Migration statistics and error tracking""" + total_source_records: int = 0 total_batches: int = 0 successful_batches: int = 0 @@ -65,16 +66,18 @@ class MigrationStats: successful_records: int = 0 failed_records: int = 0 errors: List[Dict[str, Any]] = field(default_factory=list) - + def add_error(self, batch_idx: int, error: Exception, batch_size: int): """Record batch error""" - self.errors.append({ - 'batch': batch_idx, - 'error_type': type(error).__name__, - 'error_msg': str(error), - 'records_lost': batch_size, - 'timestamp': time.time() - }) + self.errors.append( + { + "batch": batch_idx, + "error_type": type(error).__name__, + "error_msg": str(error), + "records_lost": batch_size, + "timestamp": time.time(), + } + ) self.failed_batches += 1 self.failed_records += batch_size @@ -91,12 +94,12 @@ class MigrationTool: def get_workspace_for_storage(self, storage_name: str) -> str: """Get workspace for a specific storage type - + Priority: Storage-specific env var > WORKSPACE env var > empty string - + Args: storage_name: Storage implementation name - + Returns: 
Workspace name """ @@ -105,72 +108,78 @@ class MigrationTool: specific_workspace = os.getenv(WORKSPACE_ENV_MAP[storage_name]) if specific_workspace: return specific_workspace - + # Check generic WORKSPACE workspace = os.getenv("WORKSPACE", "") return workspace def check_env_vars(self, storage_name: str) -> bool: """Check if all required environment variables exist - + Args: storage_name: Storage implementation name - + Returns: True if all required env vars exist, False otherwise """ required_vars = STORAGE_ENV_REQUIREMENTS.get(storage_name, []) missing_vars = [var for var in required_vars if var not in os.environ] - + if missing_vars: - print(f"✗ Missing required environment variables: {', '.join(missing_vars)}") + print( + f"✗ Missing required environment variables: {', '.join(missing_vars)}" + ) return False - + print("✓ All required environment variables are set") return True def get_storage_class(self, storage_name: str): """Dynamically import and return storage class - + Args: storage_name: Storage implementation name - + Returns: Storage class """ if storage_name == "JsonKVStorage": from lightrag.kg.json_kv_impl import JsonKVStorage + return JsonKVStorage elif storage_name == "RedisKVStorage": from lightrag.kg.redis_impl import RedisKVStorage + return RedisKVStorage elif storage_name == "PGKVStorage": from lightrag.kg.postgres_impl import PGKVStorage + return PGKVStorage elif storage_name == "MongoKVStorage": from lightrag.kg.mongo_impl import MongoKVStorage + return MongoKVStorage else: raise ValueError(f"Unsupported storage type: {storage_name}") async def initialize_storage(self, storage_name: str, workspace: str): """Initialize storage instance - + Args: storage_name: Storage implementation name workspace: Workspace name - + Returns: Initialized storage instance """ storage_class = self.get_storage_class(storage_name) - + # Create global config global_config = { "working_dir": os.getenv("WORKING_DIR", "./rag_storage"), "embedding_batch_num": 10, } - + # Initialize storage storage = storage_class( namespace=NameSpace.KV_STORE_LLM_RESPONSE_CACHE, @@ -178,18 +187,18 @@ class MigrationTool: global_config=global_config, embedding_func=None, ) - + # Initialize the storage await storage.initialize() - + return storage async def get_default_caches_json(self, storage) -> Dict[str, Any]: """Get default caches from JsonKVStorage - + Args: storage: JsonKVStorage instance - + Returns: Dictionary of cache entries with default:extract:* or default:summary:* keys """ @@ -197,39 +206,41 @@ class MigrationTool: async with storage._storage_lock: filtered = {} for key, value in storage._data.items(): - if key.startswith("default:extract:") or key.startswith("default:summary:"): + if key.startswith("default:extract:") or key.startswith( + "default:summary:" + ): filtered[key] = value return filtered - async def get_default_caches_redis(self, storage, batch_size: int = 1000) -> Dict[str, Any]: + async def get_default_caches_redis( + self, storage, batch_size: int = 1000 + ) -> Dict[str, Any]: """Get default caches from RedisKVStorage with pagination - + Args: storage: RedisKVStorage instance batch_size: Number of keys to process per batch - + Returns: Dictionary of cache entries with default:extract:* or default:summary:* keys """ import json - + cache_data = {} - + # Use _get_redis_connection() context manager async with storage._get_redis_connection() as redis: for pattern in ["default:extract:*", "default:summary:*"]: # Add namespace prefix to pattern prefixed_pattern = 
f"{storage.final_namespace}:{pattern}" cursor = 0 - + while True: # SCAN already implements cursor-based pagination cursor, keys = await redis.scan( - cursor, - match=prefixed_pattern, - count=batch_size + cursor, match=prefixed_pattern, count=batch_size ) - + if keys: # Process this batch using pipeline with error handling try: @@ -237,74 +248,88 @@ class MigrationTool: for key in keys: pipe.get(key) values = await pipe.execute() - + for key, value in zip(keys, values): if value: - key_str = key.decode() if isinstance(key, bytes) else key + key_str = ( + key.decode() if isinstance(key, bytes) else key + ) # Remove namespace prefix to get original key - original_key = key_str.replace(f"{storage.final_namespace}:", "", 1) + original_key = key_str.replace( + f"{storage.final_namespace}:", "", 1 + ) cache_data[original_key] = json.loads(value) - + except Exception as e: # Pipeline execution failed, fall back to individual gets - print(f"⚠️ Pipeline execution failed for batch, using individual gets: {e}") + print( + f"⚠️ Pipeline execution failed for batch, using individual gets: {e}" + ) for key in keys: try: value = await redis.get(key) if value: - key_str = key.decode() if isinstance(key, bytes) else key - original_key = key_str.replace(f"{storage.final_namespace}:", "", 1) + key_str = ( + key.decode() + if isinstance(key, bytes) + else key + ) + original_key = key_str.replace( + f"{storage.final_namespace}:", "", 1 + ) cache_data[original_key] = json.loads(value) except Exception as individual_error: - print(f"⚠️ Failed to get individual key {key}: {individual_error}") + print( + f"⚠️ Failed to get individual key {key}: {individual_error}" + ) continue - + if cursor == 0: break - + # Yield control periodically to avoid blocking await asyncio.sleep(0) - + return cache_data - async def get_default_caches_pg(self, storage, batch_size: int = 1000) -> Dict[str, Any]: + async def get_default_caches_pg( + self, storage, batch_size: int = 1000 + ) -> Dict[str, Any]: """Get default caches from PGKVStorage with pagination - + Args: storage: PGKVStorage instance batch_size: Number of records to fetch per batch - + Returns: Dictionary of cache entries with default:extract:* or default:summary:* keys """ from lightrag.kg.postgres_impl import namespace_to_table_name - + cache_data = {} table_name = namespace_to_table_name(storage.namespace) offset = 0 - + while True: # Use LIMIT and OFFSET for pagination query = f""" SELECT id as key, original_prompt, return_value, chunk_id, cache_type, queryparam, EXTRACT(EPOCH FROM create_time)::BIGINT as create_time, EXTRACT(EPOCH FROM update_time)::BIGINT as update_time - FROM {table_name} - WHERE workspace = $1 + FROM {table_name} + WHERE workspace = $1 AND (id LIKE 'default:extract:%' OR id LIKE 'default:summary:%') ORDER BY id LIMIT $2 OFFSET $3 """ - + results = await storage.db.query( - query, - [storage.workspace, batch_size, offset], - multirows=True + query, [storage.workspace, batch_size, offset], multirows=True ) - + if not results: break - + for row in results: # Map PostgreSQL fields to cache format cache_entry = { @@ -317,61 +342,63 @@ class MigrationTool: "update_time": row.get("update_time", 0), } cache_data[row["key"]] = cache_entry - + # If we got fewer results than batch_size, we're done if len(results) < batch_size: break - + offset += batch_size - + # Yield control periodically await asyncio.sleep(0) - + return cache_data - async def get_default_caches_mongo(self, storage, batch_size: int = 1000) -> Dict[str, Any]: + async def 
get_default_caches_mongo( + self, storage, batch_size: int = 1000 + ) -> Dict[str, Any]: """Get default caches from MongoKVStorage with cursor-based pagination - + Args: storage: MongoKVStorage instance batch_size: Number of documents to process per batch - + Returns: Dictionary of cache entries with default:extract:* or default:summary:* keys """ cache_data = {} - + # MongoDB query with regex - use _data not collection query = {"_id": {"$regex": "^default:(extract|summary):"}} - + # Use cursor without to_list() - process in batches cursor = storage._data.find(query).batch_size(batch_size) - + async for doc in cursor: # Process each document as it comes doc_copy = doc.copy() key = doc_copy.pop("_id") - + # Filter ALL MongoDB/database-specific fields # Following .clinerules: "Always filter deprecated/incompatible fields during deserialization" for field_name in ["namespace", "workspace", "_id", "content"]: doc_copy.pop(field_name, None) - + cache_data[key] = doc_copy - + # Periodically yield control (every batch_size documents) if len(cache_data) % batch_size == 0: await asyncio.sleep(0) - + return cache_data async def get_default_caches(self, storage, storage_name: str) -> Dict[str, Any]: """Get default caches from any storage type - + Args: storage: Storage instance storage_name: Storage type name - + Returns: Dictionary of cache entries """ @@ -388,10 +415,10 @@ class MigrationTool: async def count_cache_types(self, cache_data: Dict[str, Any]) -> Dict[str, int]: """Count cache entries by type - + Args: cache_data: Dictionary of cache entries - + Returns: Dictionary with counts for each cache type """ @@ -399,13 +426,13 @@ class MigrationTool: "extract": 0, "summary": 0, } - + for key in cache_data.keys(): if key.startswith("default:extract:"): counts["extract"] += 1 elif key.startswith("default:summary:"): counts["summary"] += 1 - + return counts def print_header(self): @@ -422,11 +449,11 @@ class MigrationTool: def get_user_choice(self, prompt: str, valid_choices: list) -> str: """Get user choice with validation - + Args: prompt: Prompt message valid_choices: List of valid choices - + Returns: User's choice """ @@ -438,30 +465,29 @@ class MigrationTool: async def setup_storage(self, storage_type: str) -> tuple: """Setup and initialize storage - + Args: storage_type: Type label (source/target) - + Returns: Tuple of (storage_instance, storage_name, workspace, cache_data) """ print(f"\n=== {storage_type} Storage Setup ===") - + # Get storage type choice choice = self.get_user_choice( - f"Select {storage_type} storage type (1-4)", - list(STORAGE_TYPES.keys()) + f"Select {storage_type} storage type (1-4)", list(STORAGE_TYPES.keys()) ) storage_name = STORAGE_TYPES[choice] - + # Check environment variables print("\nChecking environment variables...") if not self.check_env_vars(storage_name): return None, None, None, None - + # Get workspace workspace = self.get_workspace_for_storage(storage_name) - + # Initialize storage print(f"\nInitializing {storage_type} storage...") try: @@ -472,86 +498,89 @@ class MigrationTool: except Exception as e: print(f"✗ Initialization failed: {e}") return None, None, None, None - + # Get cache data print("\nCounting cache records...") try: cache_data = await self.get_default_caches(storage, storage_name) counts = await self.count_cache_types(cache_data) - + print(f"- default:extract: {counts['extract']:,} records") print(f"- default:summary: {counts['summary']:,} records") print(f"- Total: {len(cache_data):,} records") except Exception as e: print(f"✗ Counting 
failed: {e}") return None, None, None, None - + return storage, storage_name, workspace, cache_data async def migrate_caches( - self, - source_data: Dict[str, Any], - target_storage, - target_storage_name: str + self, source_data: Dict[str, Any], target_storage, target_storage_name: str ) -> MigrationStats: """Migrate caches in batches with error tracking - + Args: source_data: Source cache data target_storage: Target storage instance target_storage_name: Target storage type name - + Returns: MigrationStats object with migration results and errors """ stats = MigrationStats() stats.total_source_records = len(source_data) - + if stats.total_source_records == 0: print("\nNo records to migrate") return stats - + # Convert to list for batching items = list(source_data.items()) - stats.total_batches = (stats.total_source_records + self.batch_size - 1) // self.batch_size - + stats.total_batches = ( + stats.total_source_records + self.batch_size - 1 + ) // self.batch_size + print("\n=== Starting Migration ===") - + for batch_idx in range(stats.total_batches): start_idx = batch_idx * self.batch_size end_idx = min((batch_idx + 1) * self.batch_size, stats.total_source_records) batch_items = items[start_idx:end_idx] batch_data = dict(batch_items) - + # Determine current cache type for display current_key = batch_items[0][0] cache_type = "extract" if "extract" in current_key else "summary" - + try: # Attempt to write batch await target_storage.upsert(batch_data) - + # Success - update stats stats.successful_batches += 1 stats.successful_records += len(batch_data) - + # Calculate progress progress = (end_idx / stats.total_source_records) * 100 bar_length = 20 filled_length = int(bar_length * end_idx // stats.total_source_records) bar = "█" * filled_length + "░" * (bar_length - filled_length) - - print(f"Batch {batch_idx + 1}/{stats.total_batches}: {bar} " - f"{end_idx:,}/{stats.total_source_records:,} ({progress:.0f}%) - " - f"default:{cache_type} ✓") - + + print( + f"Batch {batch_idx + 1}/{stats.total_batches}: {bar} " + f"{end_idx:,}/{stats.total_source_records:,} ({progress:.0f}%) - " + f"default:{cache_type} ✓" + ) + except Exception as e: # Error - record and continue stats.add_error(batch_idx + 1, e, len(batch_data)) - - print(f"Batch {batch_idx + 1}/{stats.total_batches}: ✗ FAILED - " - f"{type(e).__name__}: {str(e)}") - + + print( + f"Batch {batch_idx + 1}/{stats.total_batches}: ✗ FAILED - " + f"{type(e).__name__}: {str(e)}" + ) + # Final persist print("\nPersisting data to disk...") try: @@ -560,19 +589,19 @@ class MigrationTool: except Exception as e: print(f"✗ Persist failed: {e}") stats.add_error(0, e, 0) # batch 0 = persist error - + return stats def print_migration_report(self, stats: MigrationStats): """Print comprehensive migration report - + Args: stats: MigrationStats object with migration results """ print("\n" + "=" * 60) print("Migration Complete - Final Report") print("=" * 60) - + # Overall statistics print("\n📊 Statistics:") print(f" Total source records: {stats.total_source_records:,}") @@ -581,37 +610,41 @@ class MigrationTool: print(f" Failed batches: {stats.failed_batches:,}") print(f" Successfully migrated: {stats.successful_records:,}") print(f" Failed to migrate: {stats.failed_records:,}") - + # Success rate - success_rate = (stats.successful_records / stats.total_source_records * 100) if stats.total_source_records > 0 else 0 + success_rate = ( + (stats.successful_records / stats.total_source_records * 100) + if stats.total_source_records > 0 + else 0 + ) print(f" Success 
rate: {success_rate:.2f}%") - + # Error details if stats.errors: print(f"\n⚠️ Errors encountered: {len(stats.errors)}") print("\nError Details:") print("-" * 60) - + # Group errors by type error_types = {} for error in stats.errors: - err_type = error['error_type'] + err_type = error["error_type"] error_types[err_type] = error_types.get(err_type, 0) + 1 - + print("\nError Summary:") for err_type, count in sorted(error_types.items(), key=lambda x: -x[1]): print(f" - {err_type}: {count} occurrence(s)") - + print("\nFirst 5 errors:") for i, error in enumerate(stats.errors[:5], 1): print(f"\n {i}. Batch {error['batch']}") print(f" Type: {error['error_type']}") print(f" Message: {error['error_msg']}") print(f" Records lost: {error['records_lost']:,}") - + if len(stats.errors) > 5: print(f"\n ... and {len(stats.errors) - 5} more errors") - + print("\n" + "=" * 60) print("⚠️ WARNING: Migration completed with errors!") print(" Please review the error details above.") @@ -627,75 +660,84 @@ class MigrationTool: # Print header self.print_header() self.print_storage_types() - + # Setup source storage ( self.source_storage, source_storage_name, self.source_workspace, - source_data + source_data, ) = await self.setup_storage("Source") - + if not self.source_storage: print("\n✗ Source storage setup failed") return - + if not source_data: print("\n⚠ Source storage has no cache records to migrate") # Cleanup await self.source_storage.finalize() return - + # Setup target storage ( self.target_storage, target_storage_name, self.target_workspace, - target_data + target_data, ) = await self.setup_storage("Target") - + if not self.target_storage: print("\n✗ Target storage setup failed") # Cleanup source await self.source_storage.finalize() return - + # Show migration summary print("\n" + "=" * 50) print("Migration Confirmation") print("=" * 50) - print(f"Source: {source_storage_name} (workspace: {self.source_workspace if self.source_workspace else '(default)'}) - {len(source_data):,} records") - print(f"Target: {target_storage_name} (workspace: {self.target_workspace if self.target_workspace else '(default)'}) - {len(target_data):,} records") + print( + f"Source: {source_storage_name} (workspace: {self.source_workspace if self.source_workspace else '(default)'}) - {len(source_data):,} records" + ) + print( + f"Target: {target_storage_name} (workspace: {self.target_workspace if self.target_workspace else '(default)'}) - {len(target_data):,} records" + ) print(f"Batch Size: {self.batch_size:,} records/batch") - + if target_data: - print(f"\n⚠ Warning: Target storage already has {len(target_data):,} records") + print( + f"\n⚠ Warning: Target storage already has {len(target_data):,} records" + ) print("Migration will overwrite records with the same keys") - + # Confirm migration confirm = input("\nContinue? 
(y/n): ").strip().lower() - if confirm != 'y': + if confirm != "y": print("\n✗ Migration cancelled") # Cleanup await self.source_storage.finalize() await self.target_storage.finalize() return - + # Perform migration with error tracking - stats = await self.migrate_caches(source_data, self.target_storage, target_storage_name) - + stats = await self.migrate_caches( + source_data, self.target_storage, target_storage_name + ) + # Print comprehensive migration report self.print_migration_report(stats) - + # Cleanup await self.source_storage.finalize() await self.target_storage.finalize() - + except KeyboardInterrupt: print("\n\n✗ Migration interrupted by user") except Exception as e: print(f"\n✗ Migration failed: {e}") import traceback + traceback.print_exc() finally: # Ensure cleanup From 6fc54d3625a9272521e8565ea691cd8084b5343d Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 8 Nov 2025 18:33:13 +0800 Subject: [PATCH 03/12] Move LLM cache migration tool to lightrag.tools module - Relocated tool to proper package structure - Updated import paths and documentation - Added shared storage initialization - Fixed module path resolution - Updated usage instructions --- .../tools}/README_MIGRATE_LLM_CACHE.md | 8 +++++--- .../tools}/migrate_llm_cache.py | 19 ++++++++++++++++--- 2 files changed, 21 insertions(+), 6 deletions(-) rename {tools => lightrag/tools}/README_MIGRATE_LLM_CACHE.md (98%) rename {tools => lightrag/tools}/migrate_llm_cache.py (97%) diff --git a/tools/README_MIGRATE_LLM_CACHE.md b/lightrag/tools/README_MIGRATE_LLM_CACHE.md similarity index 98% rename from tools/README_MIGRATE_LLM_CACHE.md rename to lightrag/tools/README_MIGRATE_LLM_CACHE.md index 364083d7..3fef0303 100644 --- a/tools/README_MIGRATE_LLM_CACHE.md +++ b/lightrag/tools/README_MIGRATE_LLM_CACHE.md @@ -78,7 +78,9 @@ pip install -r requirements.txt Run from the LightRAG project root directory: ```bash -python tools/migrate_llm_cache.py +python -m lightrag.tools.migrate_llm_cache +# or +python lightrag/tools/migrate_llm_cache.py ``` ### Interactive Workflow @@ -328,7 +330,7 @@ MONGO_URI=mongodb://user:pass@prod-server:27017/ MONGO_DATABASE=LightRAG # 2. Run tool -python tools/migrate_llm_cache.py +python -m lightrag.tools.migrate_llm_cache # 3. Select: 1 (JsonKVStorage) -> 4 (MongoKVStorage) ``` @@ -356,7 +358,7 @@ POSTGRES_HOST=new-postgres-server # ... Other PostgreSQL configs # 2. Run tool -python tools/migrate_llm_cache.py +python -m lightrag.tools.migrate_llm_cache # 3. Select: 2 (RedisKVStorage) -> 3 (PGKVStorage) ``` diff --git a/tools/migrate_llm_cache.py b/lightrag/tools/migrate_llm_cache.py similarity index 97% rename from tools/migrate_llm_cache.py rename to lightrag/tools/migrate_llm_cache.py index 26b7c81c..68866264 100644 --- a/tools/migrate_llm_cache.py +++ b/lightrag/tools/migrate_llm_cache.py @@ -6,7 +6,9 @@ This tool migrates LLM response cache (default:extract:* and default:summary:*) between different KV storage implementations while preserving workspace isolation. 
Usage: - python tools/migrate_llm_cache.py + python -m lightrag.tools.migrate_llm_cache + # or + python lightrag/tools/migrate_llm_cache.py Supported KV Storage Types: - JsonKVStorage @@ -23,8 +25,8 @@ from typing import Any, Dict, List from dataclasses import dataclass, field from dotenv import load_dotenv -# Add parent directory to path for imports -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +# Add project root to path for imports +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) from lightrag.kg import STORAGE_ENV_REQUIREMENTS from lightrag.namespace import NameSpace @@ -657,6 +659,10 @@ class MigrationTool: async def run(self): """Run the migration tool""" try: + # Initialize shared storage (REQUIRED for storage classes to work) + from lightrag.kg.shared_storage import initialize_share_data + initialize_share_data(workers=1) + # Print header self.print_header() self.print_storage_types() @@ -751,6 +757,13 @@ class MigrationTool: await self.target_storage.finalize() except Exception: pass + + # Finalize shared storage + try: + from lightrag.kg.shared_storage import finalize_share_data + finalize_share_data() + except Exception: + pass async def main(): From d0d31e9262c3b5b1492dd64efd517edb5e4906a2 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 8 Nov 2025 18:52:33 +0800 Subject: [PATCH 04/12] Improve LLM cache migration tool configuration and messaging --- lightrag/tools/migrate_llm_cache.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lightrag/tools/migrate_llm_cache.py b/lightrag/tools/migrate_llm_cache.py index 68866264..a9d1f18a 100644 --- a/lightrag/tools/migrate_llm_cache.py +++ b/lightrag/tools/migrate_llm_cache.py @@ -33,6 +33,9 @@ from lightrag.namespace import NameSpace from lightrag.utils import setup_logger # Load environment variables +# use the .env that is inside the current folder +# allows to use different .env file for each lightrag instance +# the OS environment variables take precedence over the .env file load_dotenv(dotenv_path=".env", override=False) # Setup logger @@ -713,7 +716,7 @@ class MigrationTool: if target_data: print( - f"\n⚠ Warning: Target storage already has {len(target_data):,} records" + f"\n⚠️ Warning: Target storage already has {len(target_data):,} records" ) print("Migration will overwrite records with the same keys") From 6b9f13c79287ac26ae598da851a6406d392f3421 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 8 Nov 2025 19:38:00 +0800 Subject: [PATCH 05/12] Enhance LLM cache migration tool with streaming and improved UX - Add streaming migration for memory efficiency - Implement graceful exit with Enter/0 - Add progress indicators for counting - Optimize batch processing by storage type - Update docs with new progress displays --- lightrag/tools/README_MIGRATE_LLM_CACHE.md | 25 +- lightrag/tools/migrate_llm_cache.py | 559 +++++++++++++++++++-- 2 files changed, 538 insertions(+), 46 deletions(-) diff --git a/lightrag/tools/README_MIGRATE_LLM_CACHE.md b/lightrag/tools/README_MIGRATE_LLM_CACHE.md index 3fef0303..b06efa5b 100644 --- a/lightrag/tools/README_MIGRATE_LLM_CACHE.md +++ b/lightrag/tools/README_MIGRATE_LLM_CACHE.md @@ -95,9 +95,11 @@ Supported KV Storage Types: [3] PGKVStorage [4] MongoKVStorage -Select Source storage type (1-4): 1 +Select Source storage type (1-4) (Press Enter or 0 to exit): 1 ``` +**Note**: You can press Enter or type `0` at the source storage selection to exit gracefully. + #### 2. 
Source Storage Validation The tool will: - Check required environment variables @@ -115,22 +117,33 @@ Initializing Source storage... - Connection Status: ✓ Success Counting cache records... -- default:extract: 8,500 records -- default:summary: 234 records - Total: 8,734 records ``` +**Progress Display by Storage Type:** +- **JsonKVStorage**: Fast in-memory counting, no progress display needed +- **RedisKVStorage**: Real-time scanning progress + ``` + Scanning Redis keys... found 8,734 records + ``` +- **PostgreSQL**: Shows timing if operation takes >1 second + ``` + Counting PostgreSQL records... (took 2.3s) + ``` +- **MongoDB**: Shows timing if operation takes >1 second + ``` + Counting MongoDB documents... (took 1.8s) + ``` + #### 3. Select Target Storage Type Repeat steps 1-2 to select and validate the target storage. #### 4. Confirm Migration -Review the migration summary and confirm: - ``` -Migration Confirmation ================================================== +Migration Confirmation Source: JsonKVStorage (workspace: space1) - 8,734 records Target: MongoKVStorage (workspace: space1) - 0 records Batch Size: 1,000 records/batch diff --git a/lightrag/tools/migrate_llm_cache.py b/lightrag/tools/migrate_llm_cache.py index a9d1f18a..a339d985 100644 --- a/lightrag/tools/migrate_llm_cache.py +++ b/lightrag/tools/migrate_llm_cache.py @@ -26,7 +26,9 @@ from dataclasses import dataclass, field from dotenv import load_dotenv # Add project root to path for imports -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) +sys.path.insert( + 0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +) from lightrag.kg import STORAGE_ENV_REQUIREMENTS from lightrag.namespace import NameSpace @@ -60,6 +62,10 @@ WORKSPACE_ENV_MAP = { DEFAULT_BATCH_SIZE = 1000 +# Default count batch size for efficient counting +DEFAULT_COUNT_BATCH_SIZE = 1000 + + @dataclass class MigrationStats: """Migration statistics and error tracking""" @@ -418,6 +424,351 @@ class MigrationTool: else: raise ValueError(f"Unsupported storage type: {storage_name}") + async def count_default_caches_json(self, storage) -> int: + """Count default caches in JsonKVStorage - O(N) but very fast in-memory + + Args: + storage: JsonKVStorage instance + + Returns: + Total count of cache records + """ + async with storage._storage_lock: + return sum( + 1 + for key in storage._data.keys() + if key.startswith("default:extract:") + or key.startswith("default:summary:") + ) + + async def count_default_caches_redis(self, storage) -> int: + """Count default caches in RedisKVStorage using SCAN with progress display + + Args: + storage: RedisKVStorage instance + + Returns: + Total count of cache records + """ + count = 0 + print("Scanning Redis keys...", end="", flush=True) + + async with storage._get_redis_connection() as redis: + for pattern in ["default:extract:*", "default:summary:*"]: + prefixed_pattern = f"{storage.final_namespace}:{pattern}" + cursor = 0 + while True: + cursor, keys = await redis.scan( + cursor, match=prefixed_pattern, count=DEFAULT_COUNT_BATCH_SIZE + ) + count += len(keys) + + # Show progress + print( + f"\rScanning Redis keys... 
found {count:,} records", + end="", + flush=True, + ) + + if cursor == 0: + break + + print() # New line after progress + return count + + async def count_default_caches_pg(self, storage) -> int: + """Count default caches in PostgreSQL using COUNT(*) with progress indicator + + Args: + storage: PGKVStorage instance + + Returns: + Total count of cache records + """ + from lightrag.kg.postgres_impl import namespace_to_table_name + + table_name = namespace_to_table_name(storage.namespace) + + query = f""" + SELECT COUNT(*) as count + FROM {table_name} + WHERE workspace = $1 + AND (id LIKE 'default:extract:%' OR id LIKE 'default:summary:%') + """ + + print("Counting PostgreSQL records...", end="", flush=True) + start_time = time.time() + + result = await storage.db.query(query, [storage.workspace]) + + elapsed = time.time() - start_time + if elapsed > 1: + print(f" (took {elapsed:.1f}s)", end="") + print() # New line + + return result["count"] if result else 0 + + async def count_default_caches_mongo(self, storage) -> int: + """Count default caches in MongoDB using count_documents with progress indicator + + Args: + storage: MongoKVStorage instance + + Returns: + Total count of cache records + """ + query = {"_id": {"$regex": "^default:(extract|summary):"}} + + print("Counting MongoDB documents...", end="", flush=True) + start_time = time.time() + + count = await storage._data.count_documents(query) + + elapsed = time.time() - start_time + if elapsed > 1: + print(f" (took {elapsed:.1f}s)", end="") + print() # New line + + return count + + async def count_default_caches(self, storage, storage_name: str) -> int: + """Count default caches from any storage type efficiently + + Args: + storage: Storage instance + storage_name: Storage type name + + Returns: + Total count of cache records + """ + if storage_name == "JsonKVStorage": + return await self.count_default_caches_json(storage) + elif storage_name == "RedisKVStorage": + return await self.count_default_caches_redis(storage) + elif storage_name == "PGKVStorage": + return await self.count_default_caches_pg(storage) + elif storage_name == "MongoKVStorage": + return await self.count_default_caches_mongo(storage) + else: + raise ValueError(f"Unsupported storage type: {storage_name}") + + async def stream_default_caches_json(self, storage, batch_size: int): + """Stream default caches from JsonKVStorage - yields batches + + Args: + storage: JsonKVStorage instance + batch_size: Size of each batch to yield + + Yields: + Dictionary batches of cache entries + """ + async with storage._storage_lock: + batch = {} + for key, value in storage._data.items(): + if key.startswith("default:extract:") or key.startswith( + "default:summary:" + ): + batch[key] = value + if len(batch) >= batch_size: + yield batch + batch = {} + # Yield remaining items + if batch: + yield batch + + async def stream_default_caches_redis(self, storage, batch_size: int): + """Stream default caches from RedisKVStorage - yields batches + + Args: + storage: RedisKVStorage instance + batch_size: Size of each batch to yield + + Yields: + Dictionary batches of cache entries + """ + import json + + async with storage._get_redis_connection() as redis: + for pattern in ["default:extract:*", "default:summary:*"]: + prefixed_pattern = f"{storage.final_namespace}:{pattern}" + cursor = 0 + + while True: + cursor, keys = await redis.scan( + cursor, match=prefixed_pattern, count=batch_size + ) + + if keys: + try: + pipe = redis.pipeline() + for key in keys: + pipe.get(key) + values = await 
pipe.execute() + + batch = {} + for key, value in zip(keys, values): + if value: + key_str = ( + key.decode() if isinstance(key, bytes) else key + ) + original_key = key_str.replace( + f"{storage.final_namespace}:", "", 1 + ) + batch[original_key] = json.loads(value) + + if batch: + yield batch + + except Exception as e: + print(f"⚠️ Pipeline execution failed for batch: {e}") + # Fall back to individual gets + batch = {} + for key in keys: + try: + value = await redis.get(key) + if value: + key_str = ( + key.decode() + if isinstance(key, bytes) + else key + ) + original_key = key_str.replace( + f"{storage.final_namespace}:", "", 1 + ) + batch[original_key] = json.loads(value) + except Exception as individual_error: + print( + f"⚠️ Failed to get individual key {key}: {individual_error}" + ) + continue + + if batch: + yield batch + + if cursor == 0: + break + + await asyncio.sleep(0) + + async def stream_default_caches_pg(self, storage, batch_size: int): + """Stream default caches from PostgreSQL - yields batches + + Args: + storage: PGKVStorage instance + batch_size: Size of each batch to yield + + Yields: + Dictionary batches of cache entries + """ + from lightrag.kg.postgres_impl import namespace_to_table_name + + table_name = namespace_to_table_name(storage.namespace) + offset = 0 + + while True: + query = f""" + SELECT id as key, original_prompt, return_value, chunk_id, cache_type, queryparam, + EXTRACT(EPOCH FROM create_time)::BIGINT as create_time, + EXTRACT(EPOCH FROM update_time)::BIGINT as update_time + FROM {table_name} + WHERE workspace = $1 + AND (id LIKE 'default:extract:%' OR id LIKE 'default:summary:%') + ORDER BY id + LIMIT $2 OFFSET $3 + """ + + results = await storage.db.query( + query, [storage.workspace, batch_size, offset], multirows=True + ) + + if not results: + break + + batch = {} + for row in results: + cache_entry = { + "return": row.get("return_value", ""), + "cache_type": row.get("cache_type"), + "original_prompt": row.get("original_prompt", ""), + "chunk_id": row.get("chunk_id"), + "queryparam": row.get("queryparam"), + "create_time": row.get("create_time", 0), + "update_time": row.get("update_time", 0), + } + batch[row["key"]] = cache_entry + + if batch: + yield batch + + if len(results) < batch_size: + break + + offset += batch_size + await asyncio.sleep(0) + + async def stream_default_caches_mongo(self, storage, batch_size: int): + """Stream default caches from MongoDB - yields batches + + Args: + storage: MongoKVStorage instance + batch_size: Size of each batch to yield + + Yields: + Dictionary batches of cache entries + """ + query = {"_id": {"$regex": "^default:(extract|summary):"}} + cursor = storage._data.find(query).batch_size(batch_size) + + batch = {} + async for doc in cursor: + doc_copy = doc.copy() + key = doc_copy.pop("_id") + + # Filter MongoDB/database-specific fields + for field_name in ["namespace", "workspace", "_id", "content"]: + doc_copy.pop(field_name, None) + + batch[key] = doc_copy + + if len(batch) >= batch_size: + yield batch + batch = {} + + # Yield remaining items + if batch: + yield batch + + async def stream_default_caches( + self, storage, storage_name: str, batch_size: int = None + ): + """Stream default caches from any storage type - unified interface + + Args: + storage: Storage instance + storage_name: Storage type name + batch_size: Size of each batch to yield (defaults to self.batch_size) + + Yields: + Dictionary batches of cache entries + """ + if batch_size is None: + batch_size = self.batch_size + + if storage_name == 
"JsonKVStorage": + async for batch in self.stream_default_caches_json(storage, batch_size): + yield batch + elif storage_name == "RedisKVStorage": + async for batch in self.stream_default_caches_redis(storage, batch_size): + yield batch + elif storage_name == "PGKVStorage": + async for batch in self.stream_default_caches_pg(storage, batch_size): + yield batch + elif storage_name == "MongoKVStorage": + async for batch in self.stream_default_caches_mongo(storage, batch_size): + yield batch + else: + raise ValueError(f"Unsupported storage type: {storage_name}") + async def count_cache_types(self, cache_data: Dict[str, Any]) -> Dict[str, int]: """Count cache entries by type @@ -452,43 +803,65 @@ class MigrationTool: for key, value in STORAGE_TYPES.items(): print(f"[{key}] {value}") - def get_user_choice(self, prompt: str, valid_choices: list) -> str: + def get_user_choice( + self, prompt: str, valid_choices: list, allow_exit: bool = False + ) -> str: """Get user choice with validation Args: prompt: Prompt message valid_choices: List of valid choices + allow_exit: If True, allow user to press Enter or input '0' to exit Returns: - User's choice + User's choice, or None if user chose to exit """ + exit_hint = " (Press Enter or 0 to exit)" if allow_exit else "" while True: - choice = input(f"\n{prompt}: ").strip() + choice = input(f"\n{prompt}{exit_hint}: ").strip() + + # Check for exit + if allow_exit and (choice == "" or choice == "0"): + return None + if choice in valid_choices: return choice print(f"✗ Invalid choice, please enter one of: {', '.join(valid_choices)}") - async def setup_storage(self, storage_type: str) -> tuple: + async def setup_storage( + self, storage_type: str, use_streaming: bool = False + ) -> tuple: """Setup and initialize storage Args: storage_type: Type label (source/target) + use_streaming: If True, only count records without loading. 
If False, load all data (legacy mode) Returns: - Tuple of (storage_instance, storage_name, workspace, cache_data) + Tuple of (storage_instance, storage_name, workspace, total_count) + Returns (None, None, None, 0) if user chooses to exit """ print(f"\n=== {storage_type} Storage Setup ===") - # Get storage type choice + # Get storage type choice - allow exit for source storage + allow_exit = storage_type == "Source" choice = self.get_user_choice( - f"Select {storage_type} storage type (1-4)", list(STORAGE_TYPES.keys()) + f"Select {storage_type} storage type (1-4)", + list(STORAGE_TYPES.keys()), + allow_exit=allow_exit, ) + + # Handle exit + if choice is None: + print("\n✓ Migration cancelled by user") + return None, None, None, 0 + storage_name = STORAGE_TYPES[choice] # Check environment variables print("\nChecking environment variables...") if not self.check_env_vars(storage_name): - return None, None, None, None + return None, None, None, 0 # Get workspace workspace = self.get_workspace_for_storage(storage_name) @@ -502,27 +875,34 @@ class MigrationTool: print("- Connection Status: ✓ Success") except Exception as e: print(f"✗ Initialization failed: {e}") - return None, None, None, None + return None, None, None, 0 - # Get cache data - print("\nCounting cache records...") + # Count cache records efficiently + print(f"\n{'Counting' if use_streaming else 'Loading'} cache records...") try: - cache_data = await self.get_default_caches(storage, storage_name) - counts = await self.count_cache_types(cache_data) + if use_streaming: + # Use efficient counting without loading data + total_count = await self.count_default_caches(storage, storage_name) + print(f"- Total: {total_count:,} records") + else: + # Legacy mode: load all data + cache_data = await self.get_default_caches(storage, storage_name) + counts = await self.count_cache_types(cache_data) + total_count = len(cache_data) - print(f"- default:extract: {counts['extract']:,} records") - print(f"- default:summary: {counts['summary']:,} records") - print(f"- Total: {len(cache_data):,} records") + print(f"- default:extract: {counts['extract']:,} records") + print(f"- default:summary: {counts['summary']:,} records") + print(f"- Total: {total_count:,} records") except Exception as e: - print(f"✗ Counting failed: {e}") - return None, None, None, None + print(f"✗ {'Counting' if use_streaming else 'Loading'} failed: {e}") + return None, None, None, 0 - return storage, storage_name, workspace, cache_data + return storage, storage_name, workspace, total_count async def migrate_caches( self, source_data: Dict[str, Any], target_storage, target_storage_name: str ) -> MigrationStats: - """Migrate caches in batches with error tracking + """Migrate caches in batches with error tracking (Legacy mode - loads all data) Args: source_data: Source cache data @@ -597,6 +977,98 @@ class MigrationTool: return stats + async def migrate_caches_streaming( + self, + source_storage, + source_storage_name: str, + target_storage, + target_storage_name: str, + total_records: int, + ) -> MigrationStats: + """Migrate caches using streaming approach - minimal memory footprint + + Args: + source_storage: Source storage instance + source_storage_name: Source storage type name + target_storage: Target storage instance + target_storage_name: Target storage type name + total_records: Total number of records to migrate + + Returns: + MigrationStats object with migration results and errors + """ + stats = MigrationStats() + stats.total_source_records = total_records + + if 
stats.total_source_records == 0: + print("\nNo records to migrate") + return stats + + # Calculate total batches + stats.total_batches = (total_records + self.batch_size - 1) // self.batch_size + + print("\n=== Starting Streaming Migration ===") + print( + f"💡 Memory-optimized mode: Processing {self.batch_size:,} records at a time\n" + ) + + batch_idx = 0 + + # Stream batches from source and write to target immediately + async for batch in self.stream_default_caches( + source_storage, source_storage_name + ): + batch_idx += 1 + + # Determine current cache type for display + if batch: + first_key = next(iter(batch.keys())) + cache_type = "extract" if "extract" in first_key else "summary" + else: + cache_type = "unknown" + + try: + # Write batch to target storage + await target_storage.upsert(batch) + + # Success - update stats + stats.successful_batches += 1 + stats.successful_records += len(batch) + + # Calculate progress with known total + progress = (stats.successful_records / total_records) * 100 + bar_length = 20 + filled_length = int( + bar_length * stats.successful_records // total_records + ) + bar = "█" * filled_length + "░" * (bar_length - filled_length) + + print( + f"Batch {batch_idx}/{stats.total_batches}: {bar} " + f"{stats.successful_records:,}/{total_records:,} ({progress:.1f}%) - " + f"default:{cache_type} ✓" + ) + + except Exception as e: + # Error - record and continue + stats.add_error(batch_idx, e, len(batch)) + + print( + f"Batch {batch_idx}/{stats.total_batches}: ✗ FAILED - " + f"{type(e).__name__}: {str(e)}" + ) + + # Final persist + print("\nPersisting data to disk...") + try: + await target_storage.index_done_callback() + print("✓ Data persisted successfully") + except Exception as e: + print(f"✗ Persist failed: {e}") + stats.add_error(0, e, 0) # batch 0 = persist error + + return stats + def print_migration_report(self, stats: MigrationStats): """Print comprehensive migration report @@ -660,41 +1132,42 @@ class MigrationTool: print("=" * 60) async def run(self): - """Run the migration tool""" + """Run the migration tool with streaming approach""" try: # Initialize shared storage (REQUIRED for storage classes to work) from lightrag.kg.shared_storage import initialize_share_data + initialize_share_data(workers=1) - + # Print header self.print_header() self.print_storage_types() - # Setup source storage + # Setup source storage with streaming (only count, don't load all data) ( self.source_storage, source_storage_name, self.source_workspace, - source_data, - ) = await self.setup_storage("Source") + source_count, + ) = await self.setup_storage("Source", use_streaming=True) - if not self.source_storage: - print("\n✗ Source storage setup failed") + # Check if user cancelled (setup_storage returns None for all fields) + if self.source_storage is None: return - if not source_data: + if source_count == 0: print("\n⚠ Source storage has no cache records to migrate") # Cleanup await self.source_storage.finalize() return - # Setup target storage + # Setup target storage with streaming (only count, don't load all data) ( self.target_storage, target_storage_name, self.target_workspace, - target_data, - ) = await self.setup_storage("Target") + target_count, + ) = await self.setup_storage("Target", use_streaming=True) if not self.target_storage: print("\n✗ Target storage setup failed") @@ -707,16 +1180,17 @@ class MigrationTool: print("Migration Confirmation") print("=" * 50) print( - f"Source: {source_storage_name} (workspace: {self.source_workspace if self.source_workspace else 
'(default)'}) - {len(source_data):,} records" + f"Source: {source_storage_name} (workspace: {self.source_workspace if self.source_workspace else '(default)'}) - {source_count:,} records" ) print( - f"Target: {target_storage_name} (workspace: {self.target_workspace if self.target_workspace else '(default)'}) - {len(target_data):,} records" + f"Target: {target_storage_name} (workspace: {self.target_workspace if self.target_workspace else '(default)'}) - {target_count:,} records" ) print(f"Batch Size: {self.batch_size:,} records/batch") + print("Memory Mode: Streaming (memory-optimized)") - if target_data: + if target_count > 0: print( - f"\n⚠️ Warning: Target storage already has {len(target_data):,} records" + f"\n⚠️ Warning: Target storage already has {target_count:,} records" ) print("Migration will overwrite records with the same keys") @@ -729,9 +1203,13 @@ class MigrationTool: await self.target_storage.finalize() return - # Perform migration with error tracking - stats = await self.migrate_caches( - source_data, self.target_storage, target_storage_name + # Perform streaming migration with error tracking + stats = await self.migrate_caches_streaming( + self.source_storage, + source_storage_name, + self.target_storage, + target_storage_name, + source_count, ) # Print comprehensive migration report @@ -760,10 +1238,11 @@ class MigrationTool: await self.target_storage.finalize() except Exception: pass - + # Finalize shared storage try: from lightrag.kg.shared_storage import finalize_share_data + finalize_share_data() except Exception: pass From 5be04263b2f81a147937ddc2c102869b9b521e17 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 8 Nov 2025 19:58:36 +0800 Subject: [PATCH 06/12] Fix deadlock in JSON cache migration and prevent same storage selection - Snapshot JSON data before yielding batches - Release lock during batch processing - Exclude source type from target selection - Add detailed docstring for lock behavior - Filter available storage types properly --- lightrag/tools/migrate_llm_cache.py | 64 ++++++++++++++++++++++------- 1 file changed, 50 insertions(+), 14 deletions(-) diff --git a/lightrag/tools/migrate_llm_cache.py b/lightrag/tools/migrate_llm_cache.py index a339d985..4c147aae 100644 --- a/lightrag/tools/migrate_llm_cache.py +++ b/lightrag/tools/migrate_llm_cache.py @@ -561,20 +561,33 @@ class MigrationTool: Yields: Dictionary batches of cache entries + + Note: + This method creates a snapshot of matching items while holding the lock, + then releases the lock before yielding batches. This prevents deadlock + when the target storage (also JsonKVStorage) tries to acquire the same + lock during upsert operations. 
""" + # Create a snapshot of matching items while holding the lock async with storage._storage_lock: - batch = {} - for key, value in storage._data.items(): - if key.startswith("default:extract:") or key.startswith( - "default:summary:" - ): - batch[key] = value - if len(batch) >= batch_size: - yield batch - batch = {} - # Yield remaining items - if batch: + matching_items = [ + (key, value) + for key, value in storage._data.items() + if key.startswith("default:extract:") + or key.startswith("default:summary:") + ] + + # Now iterate over snapshot without holding lock + batch = {} + for key, value in matching_items: + batch[key] = value + if len(batch) >= batch_size: yield batch + batch = {} + + # Yield remaining items + if batch: + yield batch async def stream_default_caches_redis(self, storage, batch_size: int): """Stream default caches from RedisKVStorage - yields batches @@ -829,13 +842,17 @@ class MigrationTool: print(f"✗ Invalid choice, please enter one of: {', '.join(valid_choices)}") async def setup_storage( - self, storage_type: str, use_streaming: bool = False + self, + storage_type: str, + use_streaming: bool = False, + exclude_storage_name: str = None, ) -> tuple: """Setup and initialize storage Args: storage_type: Type label (source/target) use_streaming: If True, only count records without loading. If False, load all data (legacy mode) + exclude_storage_name: Storage type to exclude from selection (e.g., to prevent selecting same as source) Returns: Tuple of (storage_instance, storage_name, workspace, total_count) @@ -843,11 +860,27 @@ class MigrationTool: """ print(f"\n=== {storage_type} Storage Setup ===") + # Filter available storage types if exclusion is specified + available_types = STORAGE_TYPES.copy() + if exclude_storage_name: + # Remove the excluded storage type from available options + available_types = { + k: v for k, v in STORAGE_TYPES.items() if v != exclude_storage_name + } + + # Print available types + print("\nAvailable Storage Types for Target:") + for key, value in available_types.items(): + print(f"[{key}] {value}") + else: + # Print all storage types for source + self.print_storage_types() + # Get storage type choice - allow exit for source storage allow_exit = storage_type == "Source" choice = self.get_user_choice( f"Select {storage_type} storage type (1-4)", - list(STORAGE_TYPES.keys()), + list(available_types.keys()), allow_exit=allow_exit, ) @@ -1162,12 +1195,15 @@ class MigrationTool: return # Setup target storage with streaming (only count, don't load all data) + # Exclude source storage type from target selection ( self.target_storage, target_storage_name, self.target_workspace, target_count, - ) = await self.setup_storage("Target", use_streaming=True) + ) = await self.setup_storage( + "Target", use_streaming=True, exclude_storage_name=source_storage_name + ) if not self.target_storage: print("\n✗ Target storage setup failed") From b72632e4d4cfa5ead6896da59af8c93bfbd307c8 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 8 Nov 2025 20:03:59 +0800 Subject: [PATCH 07/12] Add async generator lock management rule to cline extension --- .clinerules/01-basic.md | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/.clinerules/01-basic.md b/.clinerules/01-basic.md index 566a8159..955afa83 100644 --- a/.clinerules/01-basic.md +++ b/.clinerules/01-basic.md @@ -47,6 +47,31 @@ else np.frombuffer(base64.b64decode(dp.embedding), dtype=np.float32) **Location**: Neo4j storage finalization **Impact**: Prevents application shutdown failures +### 
6. Async Generator Lock Management (CRITICAL) +**Pattern**: Never hold locks across async generator yields - create snapshots instead +**Issue**: Holding locks while yielding causes deadlock when consumers need the same lock +**Location**: `lightrag/tools/migrate_llm_cache.py` - `stream_default_caches_json` +**Solution**: Create snapshot of data while holding lock, release lock, then iterate over snapshot +```python +# WRONG - Deadlock prone: +async with storage._storage_lock: + for key, value in storage._data.items(): + batch[key] = value + if len(batch) >= batch_size: + yield batch # Lock still held! + +# CORRECT - Snapshot approach: +async with storage._storage_lock: + matching_items = [(k, v) for k, v in storage._data.items() if condition] +# Lock released here +for key, value in matching_items: + batch[key] = value + if len(batch) >= batch_size: + yield batch # No lock held +``` +**Impact**: Prevents deadlocks in Json→Json migrations and similar scenarios where source/target share locks +**Applicable To**: Any async generator that needs to access shared resources while yielding + ## Architecture Patterns ### 1. Dependency Injection From e95b02fb55d005c5c068a277064c48dbeddc9599 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 8 Nov 2025 20:42:27 +0800 Subject: [PATCH 08/12] Refactor storage selection UI with dynamic numbering and inline prompts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Remove standalone get_user_choice method • Add dynamic sequential numbering • Inline choice validation logic • Remove redundant storage type prints • Improve excluded storage handling --- lightrag/tools/migrate_llm_cache.py | 90 ++++++++++++++--------------- 1 file changed, 43 insertions(+), 47 deletions(-) diff --git a/lightrag/tools/migrate_llm_cache.py b/lightrag/tools/migrate_llm_cache.py index 4c147aae..165ea1f4 100644 --- a/lightrag/tools/migrate_llm_cache.py +++ b/lightrag/tools/migrate_llm_cache.py @@ -816,31 +816,6 @@ class MigrationTool: for key, value in STORAGE_TYPES.items(): print(f"[{key}] {value}") - def get_user_choice( - self, prompt: str, valid_choices: list, allow_exit: bool = False - ) -> str: - """Get user choice with validation - - Args: - prompt: Prompt message - valid_choices: List of valid choices - allow_exit: If True, allow user to press Enter or input '0' to exit - - Returns: - User's choice, or None if user chose to exit - """ - exit_hint = " (Press Enter or 0 to exit)" if allow_exit else "" - while True: - choice = input(f"\n{prompt}{exit_hint}: ").strip() - - # Check for exit - if allow_exit and (choice == "" or choice == "0"): - return None - - if choice in valid_choices: - return choice - print(f"✗ Invalid choice, please enter one of: {', '.join(valid_choices)}") - async def setup_storage( self, storage_type: str, @@ -860,36 +835,58 @@ class MigrationTool: """ print(f"\n=== {storage_type} Storage Setup ===") - # Filter available storage types if exclusion is specified - available_types = STORAGE_TYPES.copy() + # Filter and remap available storage types if exclusion is specified if exclude_storage_name: - # Remove the excluded storage type from available options - available_types = { - k: v for k, v in STORAGE_TYPES.items() if v != exclude_storage_name + # Get available storage types (excluding source) + available_list = [ + (k, v) for k, v in STORAGE_TYPES.items() if v != exclude_storage_name + ] + + # Remap to sequential numbering (1, 2, 3...) 
+ remapped_types = { + str(i + 1): name for i, (_, name) in enumerate(available_list) } - # Print available types - print("\nAvailable Storage Types for Target:") - for key, value in available_types.items(): + # Print available types with new sequential numbers + print( + f"\nAvailable Storage Types for Target (source: {exclude_storage_name} excluded):" + ) + for key, value in remapped_types.items(): print(f"[{key}] {value}") + + available_types = remapped_types else: - # Print all storage types for source + # For source storage, use original numbering + available_types = STORAGE_TYPES.copy() self.print_storage_types() - # Get storage type choice - allow exit for source storage - allow_exit = storage_type == "Source" - choice = self.get_user_choice( - f"Select {storage_type} storage type (1-4)", - list(available_types.keys()), - allow_exit=allow_exit, - ) + # Generate dynamic prompt based on number of options + num_options = len(available_types) + if num_options == 1: + prompt_range = "1" + else: + prompt_range = f"1-{num_options}" - # Handle exit - if choice is None: - print("\n✓ Migration cancelled by user") - return None, None, None, 0 + # Custom input handling with exit support + while True: + choice = input( + f"\nSelect {storage_type} storage type ({prompt_range}) (Press Enter or 0 to exit): " + ).strip() - storage_name = STORAGE_TYPES[choice] + # Check for exit + if choice == "" or choice == "0": + print("\n✓ Migration cancelled by user") + return None, None, None, 0 + + # Check if choice is valid + if choice in available_types: + break + + print( + f"✗ Invalid choice. Please enter one of: {', '.join(available_types.keys())}" + ) + + storage_name = available_types[choice] # Check environment variables print("\nChecking environment variables...") @@ -1174,7 +1171,6 @@ class MigrationTool: # Print header self.print_header() - self.print_storage_types() # Setup source storage with streaming (only count, don't load all data) ( From 1864b282427e98c703656eaebc5fa4e1f54e0566 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 8 Nov 2025 21:16:41 +0800 Subject: [PATCH 09/12] Add colored output formatting to migration confirmation display --- lightrag/tools/migrate_llm_cache.py | 33 +++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/lightrag/tools/migrate_llm_cache.py b/lightrag/tools/migrate_llm_cache.py index 165ea1f4..db6933b2 100644 --- a/lightrag/tools/migrate_llm_cache.py +++ b/lightrag/tools/migrate_llm_cache.py @@ -65,6 +65,10 @@ DEFAULT_BATCH_SIZE = 1000 # Default count batch size for efficient counting DEFAULT_COUNT_BATCH_SIZE = 1000 +# ANSI color codes for terminal output +BOLD_CYAN = "\033[1;36m" +RESET = "\033[0m" + @dataclass class MigrationStats: @@ -816,6 +820,31 @@ class MigrationTool: for key, value in STORAGE_TYPES.items(): print(f"[{key}] {value}") + def format_workspace(self, workspace: str) -> str: + """Format workspace name with highlighting + + Args: + workspace: Workspace name (may be empty) + + Returns: + Formatted workspace string with ANSI color codes + """ + if workspace: + return f"{BOLD_CYAN}{workspace}{RESET}" + else: + return f"{BOLD_CYAN}(default){RESET}" + + def format_storage_name(self, storage_name: str) -> str: + """Format storage type name with highlighting + + Args: + storage_name: Storage type name + + Returns: + Formatted storage name string with ANSI color codes + """ + return f"{BOLD_CYAN}{storage_name}{RESET}" + async def setup_storage( self, storage_type: str, @@ -1212,10 +1241,10 @@ class MigrationTool: 
print("Migration Confirmation") print("=" * 50) print( - f"Source: {source_storage_name} (workspace: {self.source_workspace if self.source_workspace else '(default)'}) - {source_count:,} records" + f"Source: {self.format_storage_name(source_storage_name)} (workspace: {self.format_workspace(self.source_workspace)}) - {source_count:,} records" ) print( - f"Target: {target_storage_name} (workspace: {self.target_workspace if self.target_workspace else '(default)'}) - {target_count:,} records" + f"Target: {self.format_storage_name(target_storage_name)} (workspace: {self.format_workspace(self.target_workspace)}) - {target_count:,} records" ) print(f"Batch Size: {self.batch_size:,} records/batch") print("Memory Mode: Streaming (memory-optimized)") From 1a91bcdb5f98e946760511c21c818ac4b5ec4181 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 8 Nov 2025 22:48:49 +0800 Subject: [PATCH 10/12] Improve storage config validation and add config.ini fallback support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Add MongoDB env requirements • Support config.ini fallback • Warn on missing env vars • Check available storage count • Show config source info --- lightrag/kg/__init__.py | 27 +++- lightrag/tools/migrate_llm_cache.py | 183 +++++++++++++++++++++++++--- 2 files changed, 189 insertions(+), 21 deletions(-) diff --git a/lightrag/kg/__init__.py b/lightrag/kg/__init__.py index 8d42441a..c62f7a3d 100644 --- a/lightrag/kg/__init__.py +++ b/lightrag/kg/__init__.py @@ -45,13 +45,19 @@ STORAGE_IMPLEMENTATIONS = { STORAGE_ENV_REQUIREMENTS: dict[str, list[str]] = { # KV Storage Implementations "JsonKVStorage": [], - "MongoKVStorage": [], + "MongoKVStorage": [ + "MONGO_URI", + "MONGO_DATABASE", + ], "RedisKVStorage": ["REDIS_URI"], "PGKVStorage": ["POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_DATABASE"], # Graph Storage Implementations "NetworkXStorage": [], "Neo4JStorage": ["NEO4J_URI", "NEO4J_USERNAME", "NEO4J_PASSWORD"], - "MongoGraphStorage": [], + "MongoGraphStorage": [ + "MONGO_URI", + "MONGO_DATABASE", + ], "MemgraphStorage": ["MEMGRAPH_URI"], "AGEStorage": [ "AGE_POSTGRES_DB", @@ -65,17 +71,26 @@ STORAGE_ENV_REQUIREMENTS: dict[str, list[str]] = { ], # Vector Storage Implementations "NanoVectorDBStorage": [], - "MilvusVectorDBStorage": [], - "ChromaVectorDBStorage": [], + "MilvusVectorDBStorage": [ + "MILVUS_URI", + "MILVUS_DB_NAME", + ], + # "ChromaVectorDBStorage": [], "PGVectorStorage": ["POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_DATABASE"], "FaissVectorDBStorage": [], "QdrantVectorDBStorage": ["QDRANT_URL"], # QDRANT_API_KEY has default value None - "MongoVectorDBStorage": [], + "MongoVectorDBStorage": [ + "MONGO_URI", + "MONGO_DATABASE", + ], # Document Status Storage Implementations "JsonDocStatusStorage": [], "RedisDocStatusStorage": ["REDIS_URI"], "PGDocStatusStorage": ["POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_DATABASE"], - "MongoDocStatusStorage": [], + "MongoDocStatusStorage": [ + "MONGO_URI", + "MONGO_DATABASE", + ], } # Storage implementation module mapping diff --git a/lightrag/tools/migrate_llm_cache.py b/lightrag/tools/migrate_llm_cache.py index db6933b2..43dfc9dc 100644 --- a/lightrag/tools/migrate_llm_cache.py +++ b/lightrag/tools/migrate_llm_cache.py @@ -128,27 +128,100 @@ class MigrationTool: workspace = os.getenv("WORKSPACE", "") return workspace - def check_env_vars(self, storage_name: str) -> bool: - """Check if all required environment variables exist + def check_config_ini_for_storage(self, storage_name: str) -> bool: + 
"""Check if config.ini has configuration for the storage type Args: storage_name: Storage implementation name Returns: - True if all required env vars exist, False otherwise + True if config.ini has the necessary configuration + """ + try: + import configparser + + config = configparser.ConfigParser() + config.read("config.ini", "utf-8") + + if storage_name == "RedisKVStorage": + return config.has_option("redis", "uri") + elif storage_name == "PGKVStorage": + return ( + config.has_option("postgres", "user") + and config.has_option("postgres", "password") + and config.has_option("postgres", "database") + ) + elif storage_name == "MongoKVStorage": + return config.has_option("mongodb", "uri") and config.has_option( + "mongodb", "database" + ) + + return False + except Exception: + return False + + def check_env_vars(self, storage_name: str) -> bool: + """Check environment variables, show warnings if missing but don't fail + + Args: + storage_name: Storage implementation name + + Returns: + Always returns True (warnings only, no hard failure) """ required_vars = STORAGE_ENV_REQUIREMENTS.get(storage_name, []) + + if not required_vars: + print("✓ No environment variables required") + return True + missing_vars = [var for var in required_vars if var not in os.environ] if missing_vars: print( - f"✗ Missing required environment variables: {', '.join(missing_vars)}" + f"⚠️ Warning: Missing environment variables: {', '.join(missing_vars)}" ) - return False + + # Check if config.ini has configuration + has_config = self.check_config_ini_for_storage(storage_name) + if has_config: + print(" ✓ Found configuration in config.ini") + else: + print(f" Will attempt to use defaults for {storage_name}") + + return True print("✓ All required environment variables are set") return True + def count_available_storage_types(self) -> int: + """Count available storage types (with env vars, config.ini, or defaults) + + Returns: + Number of available storage types + """ + available_count = 0 + + for storage_name in STORAGE_TYPES.values(): + # Check if storage requires configuration + required_vars = STORAGE_ENV_REQUIREMENTS.get(storage_name, []) + + if not required_vars: + # JsonKVStorage, MongoKVStorage etc. 
- no config needed + available_count += 1 + else: + # Check if has environment variables + has_env = all(var in os.environ for var in required_vars) + if has_env: + available_count += 1 + else: + # Check if has config.ini configuration + has_config = self.check_config_ini_for_storage(storage_name) + if has_config: + available_count += 1 + + return available_count + def get_storage_class(self, storage_name: str): """Dynamically import and return storage class @@ -178,7 +251,7 @@ class MigrationTool: raise ValueError(f"Unsupported storage type: {storage_name}") async def initialize_storage(self, storage_name: str, workspace: str): - """Initialize storage instance + """Initialize storage instance with fallback to config.ini and defaults Args: storage_name: Storage implementation name @@ -186,6 +259,9 @@ class MigrationTool: Returns: Initialized storage instance + + Raises: + Exception: If initialization fails """ storage_class = self.get_storage_class(storage_name) @@ -203,7 +279,7 @@ class MigrationTool: embedding_func=None, ) - # Initialize the storage + # Initialize the storage (may raise exception if connection fails) await storage.initialize() return storage @@ -851,7 +927,7 @@ class MigrationTool: use_streaming: bool = False, exclude_storage_name: str = None, ) -> tuple: - """Setup and initialize storage + """Setup and initialize storage with config.ini fallback support Args: storage_type: Type label (source/target) @@ -917,23 +993,76 @@ class MigrationTool: storage_name = available_types[choice] - # Check environment variables - print("\nChecking environment variables...") - if not self.check_env_vars(storage_name): - return None, None, None, 0 + # Check configuration (warnings only, doesn't block) + print("\nChecking configuration...") + self.check_env_vars(storage_name) # Get workspace workspace = self.get_workspace_for_storage(storage_name) - # Initialize storage + # Initialize storage (real validation point) print(f"\nInitializing {storage_type} storage...") try: storage = await self.initialize_storage(storage_name, workspace) print(f"- Storage Type: {storage_name}") print(f"- Workspace: {workspace if workspace else '(default)'}") print("- Connection Status: ✓ Success") + + # Show configuration source for transparency + if storage_name == "RedisKVStorage": + config_source = ( + "environment variable" + if "REDIS_URI" in os.environ + else "config.ini or default" + ) + print(f"- Configuration Source: {config_source}") + elif storage_name == "PGKVStorage": + config_source = ( + "environment variables" + if all( + var in os.environ + for var in STORAGE_ENV_REQUIREMENTS[storage_name] + ) + else "config.ini or defaults" + ) + print(f"- Configuration Source: {config_source}") + elif storage_name == "MongoKVStorage": + config_source = ( + "environment variables" + if all( + var in os.environ + for var in STORAGE_ENV_REQUIREMENTS[storage_name] + ) + else "config.ini or defaults" + ) + print(f"- Configuration Source: {config_source}") + except Exception as e: print(f"✗ Initialization failed: {e}") + print(f"\nFor {storage_name}, you can configure using:") + print(" 1. Environment variables (highest priority)") + + # Show specific environment variable requirements + if storage_name in STORAGE_ENV_REQUIREMENTS: + for var in STORAGE_ENV_REQUIREMENTS[storage_name]: + print(f" - {var}") + + print(" 2. 
config.ini file (medium priority)") + if storage_name == "RedisKVStorage": + print(" [redis]") + print(" uri = redis://localhost:6379") + elif storage_name == "PGKVStorage": + print(" [postgres]") + print(" host = localhost") + print(" port = 5432") + print(" user = postgres") + print(" password = yourpassword") + print(" database = lightrag") + elif storage_name == "MongoKVStorage": + print(" [mongodb]") + print(" uri = mongodb://root:root@localhost:27017/") + print(" database = LightRAG") + return None, None, None, 0 # Count cache records efficiently @@ -1191,7 +1320,7 @@ class MigrationTool: print("=" * 60) async def run(self): - """Run the migration tool with streaming approach""" + """Run the migration tool with streaming approach and early validation""" try: # Initialize shared storage (REQUIRED for storage classes to work) from lightrag.kg.shared_storage import initialize_share_data @@ -1213,8 +1342,32 @@ class MigrationTool: if self.source_storage is None: return + # Check if there are at least 2 storage types available + available_count = self.count_available_storage_types() + if available_count <= 1: + print("\n" + "=" * 60) + print("⚠️ Warning: Migration Not Possible") + print("=" * 60) + print(f"Only {available_count} storage type(s) available.") + print("Migration requires at least 2 different storage types.") + print("\nTo enable migration, configure additional storage:") + print(" 1. Set environment variables, OR") + print(" 2. Update config.ini file") + print("\nSupported storage types:") + for name in STORAGE_TYPES.values(): + if name != source_storage_name: + print(f" - {name}") + if name in STORAGE_ENV_REQUIREMENTS: + for var in STORAGE_ENV_REQUIREMENTS[name]: + print(f" Required: {var}") + print("=" * 60) + + # Cleanup + await self.source_storage.finalize() + return + if source_count == 0: - print("\n⚠ Source storage has no cache records to migrate") + print("\n⚠️ Source storage has no cache records to migrate") # Cleanup await self.source_storage.finalize() return From 987bc09cabee3525cbc6d59dd2a77e866338c841 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 8 Nov 2025 23:48:19 +0800 Subject: [PATCH 11/12] Update LLM cache migration docs and improve UX prompts --- ...{README.md => README_EVALUASTION_RAGAS.md} | 0 lightrag/tools/README_MIGRATE_LLM_CACHE.md | 249 +++++++++++------- lightrag/tools/migrate_llm_cache.py | 2 +- 3 files changed, 153 insertions(+), 98 deletions(-) rename lightrag/evaluation/{README.md => README_EVALUASTION_RAGAS.md} (100%) diff --git a/lightrag/evaluation/README.md b/lightrag/evaluation/README_EVALUASTION_RAGAS.md similarity index 100% rename from lightrag/evaluation/README.md rename to lightrag/evaluation/README_EVALUASTION_RAGAS.md diff --git a/lightrag/tools/README_MIGRATE_LLM_CACHE.md b/lightrag/tools/README_MIGRATE_LLM_CACHE.md index b06efa5b..8dd72984 100644 --- a/lightrag/tools/README_MIGRATE_LLM_CACHE.md +++ b/lightrag/tools/README_MIGRATE_LLM_CACHE.md @@ -17,59 +17,11 @@ The tool migrates the following cache types: - `default:extract:*` - Entity and relationship extraction caches - `default:summary:*` - Entity and relationship summary caches -**Note**: Query caches (modes like `local`, `global`, etc.) are NOT migrated. +**Note**: Query caches (modes like `mix`,`local`, `global`, etc.) are NOT migrated. ## Prerequisites -### 1. 
Environment Variable Configuration - -Ensure the relevant storage environment variables are configured in your `.env` file: - -#### Workspace Configuration (Optional) -```bash -# Generic workspace (shared by all storages) -WORKSPACE=space1 - -# Or configure independent workspace for specific storage -POSTGRES_WORKSPACE=pg_space -MONGODB_WORKSPACE=mongo_space -REDIS_WORKSPACE=redis_space -``` - -**Workspace Priority**: Storage-specific > Generic WORKSPACE > Empty string - -#### JsonKVStorage -```bash -WORKING_DIR=./rag_storage -``` - -#### RedisKVStorage -```bash -REDIS_URI=redis://localhost:6379 -``` - -#### PGKVStorage -```bash -POSTGRES_HOST=localhost -POSTGRES_PORT=5432 -POSTGRES_USER=your_username -POSTGRES_PASSWORD=your_password -POSTGRES_DATABASE=your_database -``` - -#### MongoKVStorage -```bash -MONGO_URI=mongodb://root:root@localhost:27017/ -MONGO_DATABASE=LightRAG -``` - -### 2. Install Dependencies - -Ensure LightRAG and its dependencies are installed: - -```bash -pip install -r requirements.txt -``` +The LLM Cache Migration Tool reads the storage configuration of the LightRAG Server and provides an LLM migration option to select source and destination storage. Ensure that both the source and destination storage have been correctly configured and are accessible via the LightRAG Server before cache migration. ## Usage @@ -95,10 +47,10 @@ Supported KV Storage Types: [3] PGKVStorage [4] MongoKVStorage -Select Source storage type (1-4) (Press Enter or 0 to exit): 1 +Select Source storage type (1-4) (Press Enter to exit): 1 ``` -**Note**: You can press Enter or type `0` at the source storage selection to exit gracefully. +**Note**: You can press Enter or type `0` at any storage selection prompt to exit gracefully. #### 2. Source Storage Validation The tool will: @@ -121,23 +73,43 @@ Counting cache records... ``` **Progress Display by Storage Type:** -- **JsonKVStorage**: Fast in-memory counting, no progress display needed -- **RedisKVStorage**: Real-time scanning progress +- **JsonKVStorage**: Fast in-memory counting, displays final count without incremental progress + ``` + Counting cache records... + - Total: 8,734 records + ``` +- **RedisKVStorage**: Real-time scanning progress with incremental counts ``` Scanning Redis keys... found 8,734 records ``` -- **PostgreSQL**: Shows timing if operation takes >1 second +- **PostgreSQL**: Quick COUNT(*) query, shows timing only if operation takes >1 second ``` Counting PostgreSQL records... (took 2.3s) ``` -- **MongoDB**: Shows timing if operation takes >1 second +- **MongoDB**: Fast count_documents(), shows timing only if operation takes >1 second ``` Counting MongoDB documents... (took 1.8s) ``` #### 3. Select Target Storage Type -Repeat steps 1-2 to select and validate the target storage. +The tool automatically excludes the source storage type from the target selection and renumbers the remaining options sequentially: + +``` +Available Storage Types for Target (source: JsonKVStorage excluded): +[1] RedisKVStorage +[2] PGKVStorage +[3] MongoKVStorage + +Select Target storage type (1-3) (Press Enter or 0 to exit): 1 +``` + +**Important Notes:** +- You **cannot** select the same storage type for both source and target +- Options are automatically renumbered (e.g., [1], [2], [3] instead of [2], [3], [4]) +- You can press Enter or type `0` to exit at this point as well + +The tool then validates the target storage following the same process as the source (checking environment variables, initializing connection, counting records). #### 4. 
Confirm Migration @@ -147,8 +119,9 @@ Migration Confirmation Source: JsonKVStorage (workspace: space1) - 8,734 records Target: MongoKVStorage (workspace: space1) - 0 records Batch Size: 1,000 records/batch +Memory Mode: Streaming (memory-optimized) -⚠ Warning: Target storage already has 0 records +⚠️ Warning: Target storage already has 0 records Migration will overwrite records with the same keys Continue? (y/n): y @@ -156,18 +129,27 @@ Continue? (y/n): y #### 5. Execute Migration -Observe migration progress: +The tool uses **streaming migration** by default for memory efficiency. Observe migration progress: ``` -=== Starting Migration === -Batch 1/9: ████████░░ 1000/8734 (11%) - default:extract -Batch 2/9: ████████████████░░ 2000/8734 (23%) - default:extract +=== Starting Streaming Migration === +💡 Memory-optimized mode: Processing 1,000 records at a time + +Batch 1/9: ████████░░░░░░░░░░░░ 1000/8734 (11.4%) - default:extract ✓ +Batch 2/9: ████████████░░░░░░░░ 2000/8734 (22.9%) - default:extract ✓ ... -Batch 9/9: ████████████████████ 8734/8734 (100%) - default:summary +Batch 9/9: ████████████████████ 8734/8734 (100.0%) - default:summary ✓ Persisting data to disk... +✓ Data persisted successfully ``` +**Key Features:** +- **Streaming mode**: Processes data in batches without loading entire dataset into memory +- **Real-time progress**: Shows progress bar with precise percentage and cache type +- **Success indicators**: ✓ for successful batches, ✗ for failed batches +- **Constant memory usage**: Handles millions of records efficiently + #### 6. Review Migration Report The tool provides a comprehensive final report showing statistics and any errors encountered: @@ -290,24 +272,96 @@ After migration completes, a detailed report includes: 1. **Data Overwrite Warning** - Migration will overwrite records with the same keys in the target storage - Tool displays a warning if target storage already has data + - Data migration can be performed repeatedly - Pre-existing data in target storage is handled correctly - -2. **Workspace Consistency** - - Recommended to use the same workspace for source and target - - Cache data in different workspaces are completely isolated - 3. **Interrupt and Resume** - Migration can be interrupted at any time (Ctrl+C) - Already migrated data will remain in target storage - Re-running will overwrite existing records - Failed batches can be manually retried - 4. **Performance Considerations** - Large data migration may take considerable time - Recommend migrating during off-peak hours - Ensure stable network connection (for remote databases) - Memory usage stays constant regardless of dataset size +## Storage Configuration + +The tool supports multiple configuration methods with the following priority: + +1. **Environment variables** (highest priority) +2. **config.ini file** (medium priority) +3. 
**Default values** (lowest priority) + +#### Option A: Environment Variable Configuration + +Configure storage settings in your `.env` file: + +#### Workspace Configuration (Optional) + +```bash +# Generic workspace (shared by all storages) +WORKSPACE=space1 + +# Or configure independent workspace for specific storage +POSTGRES_WORKSPACE=pg_space +MONGODB_WORKSPACE=mongo_space +REDIS_WORKSPACE=redis_space +``` + +**Workspace Priority**: Storage-specific > Generic WORKSPACE > Empty string + +#### JsonKVStorage + +```bash +WORKING_DIR=./rag_storage +``` + +#### RedisKVStorage + +```bash +REDIS_URI=redis://localhost:6379 +``` + +#### PGKVStorage + +```bash +POSTGRES_HOST=localhost +POSTGRES_PORT=5432 +POSTGRES_USER=your_username +POSTGRES_PASSWORD=your_password +POSTGRES_DATABASE=your_database +``` + +#### MongoKVStorage + +```bash +MONGO_URI=mongodb://root:root@localhost:27017/ +MONGO_DATABASE=LightRAG +``` + +#### Option B: config.ini Configuration + +Alternatively, create a `config.ini` file in the project root: + +```ini +[redis] +uri = redis://localhost:6379 + +[postgres] +host = localhost +port = 5432 +user = postgres +password = yourpassword +database = lightrag + +[mongodb] +uri = mongodb://root:root@localhost:27017/ +database = LightRAG +``` + +**Note**: Environment variables take precedence over config.ini settings. JsonKVStorage uses `WORKING_DIR` environment variable or defaults to `./rag_storage`. + ## Troubleshooting ### Missing Environment Variables @@ -345,22 +399,12 @@ MONGO_DATABASE=LightRAG # 2. Run tool python -m lightrag.tools.migrate_llm_cache -# 3. Select: 1 (JsonKVStorage) -> 4 (MongoKVStorage) +# 3. Select: 1 (JsonKVStorage) -> 1 (MongoKVStorage - renumbered from 4) ``` -### Scenario 2: PostgreSQL Database Switch +**Note**: After selecting JsonKVStorage as source, MongoKVStorage will be shown as option [1] in the target selection since options are renumbered after excluding the source. -Use case: Database migration or upgrade - -```bash -# 1. Configure old and new databases -POSTGRES_WORKSPACE=old_db # Source -# ... Configure new database as default - -# 2. Run tool and select same storage type -``` - -### Scenario 3: Redis to PostgreSQL +### Scenario 2: Redis to PostgreSQL Use case: Migrating from cache storage to relational database @@ -373,20 +417,38 @@ POSTGRES_HOST=new-postgres-server # 2. Run tool python -m lightrag.tools.migrate_llm_cache -# 3. Select: 2 (RedisKVStorage) -> 3 (PGKVStorage) +# 3. Select: 2 (RedisKVStorage) -> 2 (PGKVStorage - renumbered from 3) ``` +**Note**: After selecting RedisKVStorage as source, PGKVStorage will be shown as option [2] in the target selection. + +### Scenario 3: Different Workspaces Migration + +Use case: Migrating data between different workspace environments + +```bash +# Configure separate workspaces for source and target +POSTGRES_WORKSPACE=dev_workspace # For development environment +MONGODB_WORKSPACE=prod_workspace # For production environment + +# Run tool +python -m lightrag.tools.migrate_llm_cache + +# Select: 3 (PGKVStorage with dev_workspace) -> 3 (MongoKVStorage with prod_workspace) +``` + +**Note**: This allows you to migrate between different logical data partitions while changing storage backends. + ## Tool Limitations -1. **Only Default Mode Caches** +1. 
**Same Storage Type Not Allowed** + - You cannot migrate between the same storage type (e.g., PostgreSQL to PostgreSQL) + - This is enforced by the tool automatically excluding the source storage type from target selection + - For same-storage migrations (e.g., database switches), use database-native tools instead +2. **Only Default Mode Caches** - Only migrates `default:extract:*` and `default:summary:*` - Query caches are not included - -2. **Workspace Isolation** - - Different workspaces are treated as completely separate - - Cross-workspace migration requires manual workspace reconfiguration - -3. **Network Dependency** +4. **Network Dependency** - Tool requires stable network connection for remote databases - Large datasets may fail if connection is interrupted @@ -407,10 +469,3 @@ python -m lightrag.tools.migrate_llm_cache 4. **Clean Old Data** - After successful migration, consider cleaning old cache data - Keep backups for a reasonable period before deletion - -## Support - -For issues or questions: -- Check LightRAG documentation -- Review error logs for detailed information -- Ensure all environment variables are correctly configured diff --git a/lightrag/tools/migrate_llm_cache.py b/lightrag/tools/migrate_llm_cache.py index 43dfc9dc..942f244c 100644 --- a/lightrag/tools/migrate_llm_cache.py +++ b/lightrag/tools/migrate_llm_cache.py @@ -975,7 +975,7 @@ class MigrationTool: # Custom input handling with exit support while True: choice = input( - f"\nSelect {storage_type} storage type ({prompt_range}) (Press Enter or 0 to exit): " + f"\nSelect {storage_type} storage type ({prompt_range}) (Press Enter to exit): " ).strip() # Check for exit From a75efb06dcb6a783bfb39e039d2f4de53e162754 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 9 Nov 2025 00:02:19 +0800 Subject: [PATCH 12/12] Fix: prevent source data corruption by target upsert function MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Prevent mutations bugs by using copy() when storing cache values • Protect filtered cache data and ensure batch data isolation --- lightrag/tools/migrate_llm_cache.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lightrag/tools/migrate_llm_cache.py b/lightrag/tools/migrate_llm_cache.py index 942f244c..b0d4823c 100644 --- a/lightrag/tools/migrate_llm_cache.py +++ b/lightrag/tools/migrate_llm_cache.py @@ -300,7 +300,7 @@ class MigrationTool: if key.startswith("default:extract:") or key.startswith( "default:summary:" ): - filtered[key] = value + filtered[key] = value.copy() return filtered async def get_default_caches_redis( @@ -475,7 +475,7 @@ class MigrationTool: for field_name in ["namespace", "workspace", "_id", "content"]: doc_copy.pop(field_name, None) - cache_data[key] = doc_copy + cache_data[key] = doc_copy.copy() # Periodically yield control (every batch_size documents) if len(cache_data) % batch_size == 0: @@ -660,7 +660,7 @@ class MigrationTool: # Now iterate over snapshot without holding lock batch = {} for key, value in matching_items: - batch[key] = value + batch[key] = value.copy() if len(batch) >= batch_size: yield batch batch = {} @@ -821,7 +821,7 @@ class MigrationTool: for field_name in ["namespace", "workspace", "_id", "content"]: doc_copy.pop(field_name, None) - batch[key] = doc_copy + batch[key] = doc_copy.copy() if len(batch) >= batch_size: yield batch
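
The final patch above guards the source data with `value.copy()` / `doc_copy.copy()` before handing batches to the target storage. The sketch below is not LightRAG code — the `FakeTargetStore` class and its field names are invented for illustration — but it shows the aliasing bug the patch prevents: when a migration batch holds references to the source's own dicts, any in-place mutation performed by the target's `upsert()` silently corrupts the source data.

```python
# Minimal illustration of the aliasing bug fixed by batch[key] = value.copy():
# if the batch references the source's own dicts, the target's upsert()
# mutates the source data; a shallow copy keeps the source pristine.

source_data = {
    "default:extract:abc": {"return": "entity list", "cache_type": "extract"},
}


class FakeTargetStore:
    """Hypothetical target store whose upsert stamps extra fields on each value."""

    def __init__(self):
        self.rows = {}

    def upsert(self, batch):
        for key, value in batch.items():
            value["_id"] = key            # mutation happens in place
            value["workspace"] = "prod"
            self.rows[key] = value


# Without copy(): the batch aliases the source dicts, so upsert() corrupts them.
unsafe_batch = {k: v for k, v in source_data.items()}
FakeTargetStore().upsert(unsafe_batch)
assert "_id" in source_data["default:extract:abc"]  # source was modified

# Reset, then migrate with copy(): the source stays untouched, as in the patch.
source_data["default:extract:abc"].pop("_id")
source_data["default:extract:abc"].pop("workspace")
safe_batch = {k: v.copy() for k, v in source_data.items()}
FakeTargetStore().upsert(safe_batch)
assert "_id" not in source_data["default:extract:abc"]  # source preserved
```

A shallow copy is sufficient here because the migration only adds or strips top-level fields on each cache entry; if the values carried nested mutable structures that the target also mutated, `copy.deepcopy` would be the safer choice.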