From 5be04263b2f81a147937ddc2c102869b9b521e17 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sat, 8 Nov 2025 19:58:36 +0800 Subject: [PATCH] Fix deadlock in JSON cache migration and prevent same storage selection - Snapshot JSON data before yielding batches - Release lock during batch processing - Exclude source type from target selection - Add detailed docstring for lock behavior - Filter available storage types properly --- lightrag/tools/migrate_llm_cache.py | 64 ++++++++++++++++++++++------- 1 file changed, 50 insertions(+), 14 deletions(-) diff --git a/lightrag/tools/migrate_llm_cache.py b/lightrag/tools/migrate_llm_cache.py index a339d985..4c147aae 100644 --- a/lightrag/tools/migrate_llm_cache.py +++ b/lightrag/tools/migrate_llm_cache.py @@ -561,20 +561,33 @@ class MigrationTool: Yields: Dictionary batches of cache entries + + Note: + This method creates a snapshot of matching items while holding the lock, + then releases the lock before yielding batches. This prevents deadlock + when the target storage (also JsonKVStorage) tries to acquire the same + lock during upsert operations. """ + # Create a snapshot of matching items while holding the lock async with storage._storage_lock: - batch = {} - for key, value in storage._data.items(): - if key.startswith("default:extract:") or key.startswith( - "default:summary:" - ): - batch[key] = value - if len(batch) >= batch_size: - yield batch - batch = {} - # Yield remaining items - if batch: + matching_items = [ + (key, value) + for key, value in storage._data.items() + if key.startswith("default:extract:") + or key.startswith("default:summary:") + ] + + # Now iterate over snapshot without holding lock + batch = {} + for key, value in matching_items: + batch[key] = value + if len(batch) >= batch_size: yield batch + batch = {} + + # Yield remaining items + if batch: + yield batch async def stream_default_caches_redis(self, storage, batch_size: int): """Stream default caches from RedisKVStorage - yields batches @@ -829,13 +842,17 @@ class MigrationTool: print(f"āœ— Invalid choice, please enter one of: {', '.join(valid_choices)}") async def setup_storage( - self, storage_type: str, use_streaming: bool = False + self, + storage_type: str, + use_streaming: bool = False, + exclude_storage_name: str = None, ) -> tuple: """Setup and initialize storage Args: storage_type: Type label (source/target) use_streaming: If True, only count records without loading. If False, load all data (legacy mode) + exclude_storage_name: Storage type to exclude from selection (e.g., to prevent selecting same as source) Returns: Tuple of (storage_instance, storage_name, workspace, total_count) @@ -843,11 +860,27 @@ class MigrationTool: """ print(f"\n=== {storage_type} Storage Setup ===") + # Filter available storage types if exclusion is specified + available_types = STORAGE_TYPES.copy() + if exclude_storage_name: + # Remove the excluded storage type from available options + available_types = { + k: v for k, v in STORAGE_TYPES.items() if v != exclude_storage_name + } + + # Print available types + print("\nAvailable Storage Types for Target:") + for key, value in available_types.items(): + print(f"[{key}] {value}") + else: + # Print all storage types for source + self.print_storage_types() + # Get storage type choice - allow exit for source storage allow_exit = storage_type == "Source" choice = self.get_user_choice( f"Select {storage_type} storage type (1-4)", - list(STORAGE_TYPES.keys()), + list(available_types.keys()), allow_exit=allow_exit, ) @@ -1162,12 +1195,15 @@ class MigrationTool: return # Setup target storage with streaming (only count, don't load all data) + # Exclude source storage type from target selection ( self.target_storage, target_storage_name, self.target_workspace, target_count, - ) = await self.setup_storage("Target", use_streaming=True) + ) = await self.setup_storage( + "Target", use_streaming=True, exclude_storage_name=source_storage_name + ) if not self.target_storage: print("\nāœ— Target storage setup failed")