From c0dfe65d1aa839d9dec980d9f1efea4b4df19cbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20MANSUY?= Date: Thu, 4 Dec 2025 19:14:30 +0800 Subject: [PATCH] cherry-pick 6b9f13c7 --- lightrag/tools/migrate_llm_cache.py | 134 +++++++++++----------------- tools/README_MIGRATE_LLM_CACHE.md | 25 ++++-- 2 files changed, 70 insertions(+), 89 deletions(-) diff --git a/lightrag/tools/migrate_llm_cache.py b/lightrag/tools/migrate_llm_cache.py index 165ea1f4..a339d985 100644 --- a/lightrag/tools/migrate_llm_cache.py +++ b/lightrag/tools/migrate_llm_cache.py @@ -561,33 +561,20 @@ class MigrationTool: Yields: Dictionary batches of cache entries - - Note: - This method creates a snapshot of matching items while holding the lock, - then releases the lock before yielding batches. This prevents deadlock - when the target storage (also JsonKVStorage) tries to acquire the same - lock during upsert operations. """ - # Create a snapshot of matching items while holding the lock async with storage._storage_lock: - matching_items = [ - (key, value) - for key, value in storage._data.items() - if key.startswith("default:extract:") - or key.startswith("default:summary:") - ] - - # Now iterate over snapshot without holding lock - batch = {} - for key, value in matching_items: - batch[key] = value - if len(batch) >= batch_size: + batch = {} + for key, value in storage._data.items(): + if key.startswith("default:extract:") or key.startswith( + "default:summary:" + ): + batch[key] = value + if len(batch) >= batch_size: + yield batch + batch = {} + # Yield remaining items + if batch: yield batch - batch = {} - - # Yield remaining items - if batch: - yield batch async def stream_default_caches_redis(self, storage, batch_size: int): """Stream default caches from RedisKVStorage - yields batches @@ -816,18 +803,39 @@ class MigrationTool: for key, value in STORAGE_TYPES.items(): print(f"[{key}] {value}") + def get_user_choice( + self, prompt: str, valid_choices: list, allow_exit: bool = False + ) -> str: + """Get user choice with validation + + Args: + prompt: Prompt message + valid_choices: List of valid choices + allow_exit: If True, allow user to press Enter or input '0' to exit + + Returns: + User's choice, or None if user chose to exit + """ + exit_hint = " (Press Enter or 0 to exit)" if allow_exit else "" + while True: + choice = input(f"\n{prompt}{exit_hint}: ").strip() + + # Check for exit + if allow_exit and (choice == "" or choice == "0"): + return None + + if choice in valid_choices: + return choice + print(f"✗ Invalid choice, please enter one of: {', '.join(valid_choices)}") + async def setup_storage( - self, - storage_type: str, - use_streaming: bool = False, - exclude_storage_name: str = None, + self, storage_type: str, use_streaming: bool = False ) -> tuple: """Setup and initialize storage Args: storage_type: Type label (source/target) use_streaming: If True, only count records without loading. If False, load all data (legacy mode) - exclude_storage_name: Storage type to exclude from selection (e.g., to prevent selecting same as source) Returns: Tuple of (storage_instance, storage_name, workspace, total_count) @@ -835,58 +843,20 @@ class MigrationTool: """ print(f"\n=== {storage_type} Storage Setup ===") - # Filter and remap available storage types if exclusion is specified - if exclude_storage_name: - # Get available storage types (excluding source) - available_list = [ - (k, v) for k, v in STORAGE_TYPES.items() if v != exclude_storage_name - ] + # Get storage type choice - allow exit for source storage + allow_exit = storage_type == "Source" + choice = self.get_user_choice( + f"Select {storage_type} storage type (1-4)", + list(STORAGE_TYPES.keys()), + allow_exit=allow_exit, + ) - # Remap to sequential numbering (1, 2, 3...) - remapped_types = { - str(i + 1): name for i, (_, name) in enumerate(available_list) - } + # Handle exit + if choice is None: + print("\n✓ Migration cancelled by user") + return None, None, None, 0 - # Print available types with new sequential numbers - print( - f"\nAvailable Storage Types for Target (source: {exclude_storage_name} excluded):" - ) - for key, value in remapped_types.items(): - print(f"[{key}] {value}") - - available_types = remapped_types - else: - # For source storage, use original numbering - available_types = STORAGE_TYPES.copy() - self.print_storage_types() - - # Generate dynamic prompt based on number of options - num_options = len(available_types) - if num_options == 1: - prompt_range = "1" - else: - prompt_range = f"1-{num_options}" - - # Custom input handling with exit support - while True: - choice = input( - f"\nSelect {storage_type} storage type ({prompt_range}) (Press Enter or 0 to exit): " - ).strip() - - # Check for exit - if choice == "" or choice == "0": - print("\n✓ Migration cancelled by user") - return None, None, None, 0 - - # Check if choice is valid - if choice in available_types: - break - - print( - f"✗ Invalid choice. Please enter one of: {', '.join(available_types.keys())}" - ) - - storage_name = available_types[choice] + storage_name = STORAGE_TYPES[choice] # Check environment variables print("\nChecking environment variables...") @@ -1171,6 +1141,7 @@ class MigrationTool: # Print header self.print_header() + self.print_storage_types() # Setup source storage with streaming (only count, don't load all data) ( @@ -1191,15 +1162,12 @@ class MigrationTool: return # Setup target storage with streaming (only count, don't load all data) - # Exclude source storage type from target selection ( self.target_storage, target_storage_name, self.target_workspace, target_count, - ) = await self.setup_storage( - "Target", use_streaming=True, exclude_storage_name=source_storage_name - ) + ) = await self.setup_storage("Target", use_streaming=True) if not self.target_storage: print("\n✗ Target storage setup failed") diff --git a/tools/README_MIGRATE_LLM_CACHE.md b/tools/README_MIGRATE_LLM_CACHE.md index edd49acd..594bc771 100644 --- a/tools/README_MIGRATE_LLM_CACHE.md +++ b/tools/README_MIGRATE_LLM_CACHE.md @@ -93,9 +93,11 @@ Supported KV Storage Types: [3] PGKVStorage [4] MongoKVStorage -Select Source storage type (1-4): 1 +Select Source storage type (1-4) (Press Enter or 0 to exit): 1 ``` +**Note**: You can press Enter or type `0` at the source storage selection to exit gracefully. + #### 2. Source Storage Validation The tool will: - Check required environment variables @@ -113,22 +115,33 @@ Initializing Source storage... - Connection Status: ✓ Success Counting cache records... -- default:extract: 8,500 records -- default:summary: 234 records - Total: 8,734 records ``` +**Progress Display by Storage Type:** +- **JsonKVStorage**: Fast in-memory counting, no progress display needed +- **RedisKVStorage**: Real-time scanning progress + ``` + Scanning Redis keys... found 8,734 records + ``` +- **PostgreSQL**: Shows timing if operation takes >1 second + ``` + Counting PostgreSQL records... (took 2.3s) + ``` +- **MongoDB**: Shows timing if operation takes >1 second + ``` + Counting MongoDB documents... (took 1.8s) + ``` + #### 3. Select Target Storage Type Repeat steps 1-2 to select and validate the target storage. #### 4. Confirm Migration -Review the migration summary and confirm: - ``` -Migration Confirmation ================================================== +Migration Confirmation Source: JsonKVStorage (workspace: space1) - 8,734 records Target: MongoKVStorage (workspace: space1) - 0 records Batch Size: 1,000 records/batch