Fix deadlock in JSON cache migration and prevent selecting the same storage type as both source and target

- Snapshot JSON data before yielding batches
- Release lock during batch processing
- Exclude source type from target selection
- Add detailed docstring for lock behavior
- Filter the list of available storage types before prompting for the target

(cherry picked from commit 5be04263b2)
This commit is contained in:
yangdx 2025-11-08 19:58:36 +08:00 committed by Raphaël MANSUY
parent 5a5e583b9c
commit fa5510e6f6

View file

@ -65,10 +65,6 @@ DEFAULT_BATCH_SIZE = 1000
# Default count batch size for efficient counting
DEFAULT_COUNT_BATCH_SIZE = 1000
# ANSI color codes for terminal output
BOLD_CYAN = "\033[1;36m"
RESET = "\033[0m"
@dataclass
class MigrationStats:
@ -128,100 +124,27 @@ class MigrationTool:
workspace = os.getenv("WORKSPACE", "")
return workspace
def check_config_ini_for_storage(self, storage_name: str) -> bool:
"""Check if config.ini has configuration for the storage type
Args:
storage_name: Storage implementation name
Returns:
True if config.ini has the necessary configuration
"""
try:
import configparser
config = configparser.ConfigParser()
config.read("config.ini", "utf-8")
if storage_name == "RedisKVStorage":
return config.has_option("redis", "uri")
elif storage_name == "PGKVStorage":
return (
config.has_option("postgres", "user")
and config.has_option("postgres", "password")
and config.has_option("postgres", "database")
)
elif storage_name == "MongoKVStorage":
return config.has_option("mongodb", "uri") and config.has_option(
"mongodb", "database"
)
return False
except Exception:
return False
def check_env_vars(self, storage_name: str) -> bool:
"""Check environment variables, show warnings if missing but don't fail
"""Check if all required environment variables exist
Args:
storage_name: Storage implementation name
Returns:
Always returns True (warnings only, no hard failure)
True if all required env vars exist, False otherwise
"""
required_vars = STORAGE_ENV_REQUIREMENTS.get(storage_name, [])
if not required_vars:
print("✓ No environment variables required")
return True
missing_vars = [var for var in required_vars if var not in os.environ]
if missing_vars:
print(
f"⚠️ Warning: Missing environment variables: {', '.join(missing_vars)}"
f"✗ Missing required environment variables: {', '.join(missing_vars)}"
)
# Check if config.ini has configuration
has_config = self.check_config_ini_for_storage(storage_name)
if has_config:
print(" ✓ Found configuration in config.ini")
else:
print(f" Will attempt to use defaults for {storage_name}")
return True
return False
print("✓ All required environment variables are set")
return True
def count_available_storage_types(self) -> int:
"""Count available storage types (with env vars, config.ini, or defaults)
Returns:
Number of available storage types
"""
available_count = 0
for storage_name in STORAGE_TYPES.values():
# Check if storage requires configuration
required_vars = STORAGE_ENV_REQUIREMENTS.get(storage_name, [])
if not required_vars:
# JsonKVStorage, MongoKVStorage etc. - no config needed
available_count += 1
else:
# Check if has environment variables
has_env = all(var in os.environ for var in required_vars)
if has_env:
available_count += 1
else:
# Check if has config.ini configuration
has_config = self.check_config_ini_for_storage(storage_name)
if has_config:
available_count += 1
return available_count
def get_storage_class(self, storage_name: str):
"""Dynamically import and return storage class
@ -251,7 +174,7 @@ class MigrationTool:
raise ValueError(f"Unsupported storage type: {storage_name}")
async def initialize_storage(self, storage_name: str, workspace: str):
"""Initialize storage instance with fallback to config.ini and defaults
"""Initialize storage instance
Args:
storage_name: Storage implementation name
@ -259,9 +182,6 @@ class MigrationTool:
Returns:
Initialized storage instance
Raises:
Exception: If initialization fails
"""
storage_class = self.get_storage_class(storage_name)
@ -279,7 +199,7 @@ class MigrationTool:
embedding_func=None,
)
# Initialize the storage (may raise exception if connection fails)
# Initialize the storage
await storage.initialize()
return storage
@ -896,30 +816,30 @@ class MigrationTool:
for key, value in STORAGE_TYPES.items():
print(f"[{key}] {value}")
def format_workspace(self, workspace: str) -> str:
"""Format workspace name with highlighting
def get_user_choice(
self, prompt: str, valid_choices: list, allow_exit: bool = False
) -> str:
"""Get user choice with validation
Args:
workspace: Workspace name (may be empty)
prompt: Prompt message
valid_choices: List of valid choices
allow_exit: If True, allow user to press Enter or input '0' to exit
Returns:
Formatted workspace string with ANSI color codes
User's choice, or None if user chose to exit
"""
if workspace:
return f"{BOLD_CYAN}{workspace}{RESET}"
else:
return f"{BOLD_CYAN}(default){RESET}"
exit_hint = " (Press Enter or 0 to exit)" if allow_exit else ""
while True:
choice = input(f"\n{prompt}{exit_hint}: ").strip()
def format_storage_name(self, storage_name: str) -> str:
"""Format storage type name with highlighting
# Check for exit
if allow_exit and (choice == "" or choice == "0"):
return None
Args:
storage_name: Storage type name
Returns:
Formatted storage name string with ANSI color codes
"""
return f"{BOLD_CYAN}{storage_name}{RESET}"
if choice in valid_choices:
return choice
print(f"✗ Invalid choice, please enter one of: {', '.join(valid_choices)}")
async def setup_storage(
self,
@ -927,7 +847,7 @@ class MigrationTool:
use_streaming: bool = False,
exclude_storage_name: str = None,
) -> tuple:
"""Setup and initialize storage with config.ini fallback support
"""Setup and initialize storage
Args:
storage_type: Type label (source/target)
@ -940,129 +860,54 @@ class MigrationTool:
"""
print(f"\n=== {storage_type} Storage Setup ===")
# Filter and remap available storage types if exclusion is specified
# Filter available storage types if exclusion is specified
available_types = STORAGE_TYPES.copy()
if exclude_storage_name:
# Get available storage types (excluding source)
available_list = [
(k, v) for k, v in STORAGE_TYPES.items() if v != exclude_storage_name
]
# Remap to sequential numbering (1, 2, 3...)
remapped_types = {
str(i + 1): name for i, (_, name) in enumerate(available_list)
# Remove the excluded storage type from available options
available_types = {
k: v for k, v in STORAGE_TYPES.items() if v != exclude_storage_name
}
# Print available types with new sequential numbers
print(
f"\nAvailable Storage Types for Target (source: {exclude_storage_name} excluded):"
)
for key, value in remapped_types.items():
# Print available types
print("\nAvailable Storage Types for Target:")
for key, value in available_types.items():
print(f"[{key}] {value}")
available_types = remapped_types
else:
# For source storage, use original numbering
available_types = STORAGE_TYPES.copy()
# Print all storage types for source
self.print_storage_types()
# Generate dynamic prompt based on number of options
num_options = len(available_types)
if num_options == 1:
prompt_range = "1"
else:
prompt_range = f"1-{num_options}"
# Get storage type choice - allow exit for source storage
allow_exit = storage_type == "Source"
choice = self.get_user_choice(
f"Select {storage_type} storage type (1-4)",
list(available_types.keys()),
allow_exit=allow_exit,
)
# Custom input handling with exit support
while True:
choice = input(
f"\nSelect {storage_type} storage type ({prompt_range}) (Press Enter or 0 to exit): "
).strip()
# Handle exit
if choice is None:
print("\n✓ Migration cancelled by user")
return None, None, None, 0
# Check for exit
if choice == "" or choice == "0":
print("\n✓ Migration cancelled by user")
return None, None, None, 0
storage_name = STORAGE_TYPES[choice]
# Check if choice is valid
if choice in available_types:
break
print(
f"✗ Invalid choice. Please enter one of: {', '.join(available_types.keys())}"
)
storage_name = available_types[choice]
# Check configuration (warnings only, doesn't block)
print("\nChecking configuration...")
self.check_env_vars(storage_name)
# Check environment variables
print("\nChecking environment variables...")
if not self.check_env_vars(storage_name):
return None, None, None, 0
# Get workspace
workspace = self.get_workspace_for_storage(storage_name)
# Initialize storage (real validation point)
# Initialize storage
print(f"\nInitializing {storage_type} storage...")
try:
storage = await self.initialize_storage(storage_name, workspace)
print(f"- Storage Type: {storage_name}")
print(f"- Workspace: {workspace if workspace else '(default)'}")
print("- Connection Status: ✓ Success")
# Show configuration source for transparency
if storage_name == "RedisKVStorage":
config_source = (
"environment variable"
if "REDIS_URI" in os.environ
else "config.ini or default"
)
print(f"- Configuration Source: {config_source}")
elif storage_name == "PGKVStorage":
config_source = (
"environment variables"
if all(
var in os.environ
for var in STORAGE_ENV_REQUIREMENTS[storage_name]
)
else "config.ini or defaults"
)
print(f"- Configuration Source: {config_source}")
elif storage_name == "MongoKVStorage":
config_source = (
"environment variables"
if all(
var in os.environ
for var in STORAGE_ENV_REQUIREMENTS[storage_name]
)
else "config.ini or defaults"
)
print(f"- Configuration Source: {config_source}")
except Exception as e:
print(f"✗ Initialization failed: {e}")
print(f"\nFor {storage_name}, you can configure using:")
print(" 1. Environment variables (highest priority)")
# Show specific environment variable requirements
if storage_name in STORAGE_ENV_REQUIREMENTS:
for var in STORAGE_ENV_REQUIREMENTS[storage_name]:
print(f" - {var}")
print(" 2. config.ini file (medium priority)")
if storage_name == "RedisKVStorage":
print(" [redis]")
print(" uri = redis://localhost:6379")
elif storage_name == "PGKVStorage":
print(" [postgres]")
print(" host = localhost")
print(" port = 5432")
print(" user = postgres")
print(" password = yourpassword")
print(" database = lightrag")
elif storage_name == "MongoKVStorage":
print(" [mongodb]")
print(" uri = mongodb://root:root@localhost:27017/")
print(" database = LightRAG")
return None, None, None, 0
# Count cache records efficiently
@ -1320,7 +1165,7 @@ class MigrationTool:
print("=" * 60)
async def run(self):
"""Run the migration tool with streaming approach and early validation"""
"""Run the migration tool with streaming approach"""
try:
# Initialize shared storage (REQUIRED for storage classes to work)
from lightrag.kg.shared_storage import initialize_share_data
@ -1329,6 +1174,7 @@ class MigrationTool:
# Print header
self.print_header()
self.print_storage_types()
# Setup source storage with streaming (only count, don't load all data)
(
@ -1342,32 +1188,8 @@ class MigrationTool:
if self.source_storage is None:
return
# Check if there are at least 2 storage types available
available_count = self.count_available_storage_types()
if available_count <= 1:
print("\n" + "=" * 60)
print("⚠️ Warning: Migration Not Possible")
print("=" * 60)
print(f"Only {available_count} storage type(s) available.")
print("Migration requires at least 2 different storage types.")
print("\nTo enable migration, configure additional storage:")
print(" 1. Set environment variables, OR")
print(" 2. Update config.ini file")
print("\nSupported storage types:")
for name in STORAGE_TYPES.values():
if name != source_storage_name:
print(f" - {name}")
if name in STORAGE_ENV_REQUIREMENTS:
for var in STORAGE_ENV_REQUIREMENTS[name]:
print(f" Required: {var}")
print("=" * 60)
# Cleanup
await self.source_storage.finalize()
return
if source_count == 0:
print("\n Source storage has no cache records to migrate")
print("\n⚠ Source storage has no cache records to migrate")
# Cleanup
await self.source_storage.finalize()
return
@ -1394,10 +1216,10 @@ class MigrationTool:
print("Migration Confirmation")
print("=" * 50)
print(
f"Source: {self.format_storage_name(source_storage_name)} (workspace: {self.format_workspace(self.source_workspace)}) - {source_count:,} records"
f"Source: {source_storage_name} (workspace: {self.source_workspace if self.source_workspace else '(default)'}) - {source_count:,} records"
)
print(
f"Target: {self.format_storage_name(target_storage_name)} (workspace: {self.format_workspace(self.target_workspace)}) - {target_count:,} records"
f"Target: {target_storage_name} (workspace: {self.target_workspace if self.target_workspace else '(default)'}) - {target_count:,} records"
)
print(f"Batch Size: {self.batch_size:,} records/batch")
print("Memory Mode: Streaming (memory-optimized)")