diff --git a/lightrag/tools/migrate_llm_cache.py b/lightrag/tools/migrate_llm_cache.py index 43dfc9dc..4c147aae 100644 --- a/lightrag/tools/migrate_llm_cache.py +++ b/lightrag/tools/migrate_llm_cache.py @@ -65,10 +65,6 @@ DEFAULT_BATCH_SIZE = 1000 # Default count batch size for efficient counting DEFAULT_COUNT_BATCH_SIZE = 1000 -# ANSI color codes for terminal output -BOLD_CYAN = "\033[1;36m" -RESET = "\033[0m" - @dataclass class MigrationStats: @@ -128,100 +124,27 @@ class MigrationTool: workspace = os.getenv("WORKSPACE", "") return workspace - def check_config_ini_for_storage(self, storage_name: str) -> bool: - """Check if config.ini has configuration for the storage type - - Args: - storage_name: Storage implementation name - - Returns: - True if config.ini has the necessary configuration - """ - try: - import configparser - - config = configparser.ConfigParser() - config.read("config.ini", "utf-8") - - if storage_name == "RedisKVStorage": - return config.has_option("redis", "uri") - elif storage_name == "PGKVStorage": - return ( - config.has_option("postgres", "user") - and config.has_option("postgres", "password") - and config.has_option("postgres", "database") - ) - elif storage_name == "MongoKVStorage": - return config.has_option("mongodb", "uri") and config.has_option( - "mongodb", "database" - ) - - return False - except Exception: - return False - def check_env_vars(self, storage_name: str) -> bool: - """Check environment variables, show warnings if missing but don't fail + """Check if all required environment variables exist Args: storage_name: Storage implementation name Returns: - Always returns True (warnings only, no hard failure) + True if all required env vars exist, False otherwise """ required_vars = STORAGE_ENV_REQUIREMENTS.get(storage_name, []) - - if not required_vars: - print("✓ No environment variables required") - return True - missing_vars = [var for var in required_vars if var not in os.environ] if missing_vars: print( - f"⚠️ Warning: Missing environment variables: {', '.join(missing_vars)}" + f"✗ Missing required environment variables: {', '.join(missing_vars)}" ) - - # Check if config.ini has configuration - has_config = self.check_config_ini_for_storage(storage_name) - if has_config: - print(" ✓ Found configuration in config.ini") - else: - print(f" Will attempt to use defaults for {storage_name}") - - return True + return False print("✓ All required environment variables are set") return True - def count_available_storage_types(self) -> int: - """Count available storage types (with env vars, config.ini, or defaults) - - Returns: - Number of available storage types - """ - available_count = 0 - - for storage_name in STORAGE_TYPES.values(): - # Check if storage requires configuration - required_vars = STORAGE_ENV_REQUIREMENTS.get(storage_name, []) - - if not required_vars: - # JsonKVStorage, MongoKVStorage etc. - no config needed - available_count += 1 - else: - # Check if has environment variables - has_env = all(var in os.environ for var in required_vars) - if has_env: - available_count += 1 - else: - # Check if has config.ini configuration - has_config = self.check_config_ini_for_storage(storage_name) - if has_config: - available_count += 1 - - return available_count - def get_storage_class(self, storage_name: str): """Dynamically import and return storage class @@ -251,7 +174,7 @@ class MigrationTool: raise ValueError(f"Unsupported storage type: {storage_name}") async def initialize_storage(self, storage_name: str, workspace: str): - """Initialize storage instance with fallback to config.ini and defaults + """Initialize storage instance Args: storage_name: Storage implementation name @@ -259,9 +182,6 @@ class MigrationTool: Returns: Initialized storage instance - - Raises: - Exception: If initialization fails """ storage_class = self.get_storage_class(storage_name) @@ -279,7 +199,7 @@ class MigrationTool: embedding_func=None, ) - # Initialize the storage (may raise exception if connection fails) + # Initialize the storage await storage.initialize() return storage @@ -896,30 +816,30 @@ class MigrationTool: for key, value in STORAGE_TYPES.items(): print(f"[{key}] {value}") - def format_workspace(self, workspace: str) -> str: - """Format workspace name with highlighting + def get_user_choice( + self, prompt: str, valid_choices: list, allow_exit: bool = False + ) -> str: + """Get user choice with validation Args: - workspace: Workspace name (may be empty) + prompt: Prompt message + valid_choices: List of valid choices + allow_exit: If True, allow user to press Enter or input '0' to exit Returns: - Formatted workspace string with ANSI color codes + User's choice, or None if user chose to exit """ - if workspace: - return f"{BOLD_CYAN}{workspace}{RESET}" - else: - return f"{BOLD_CYAN}(default){RESET}" + exit_hint = " (Press Enter or 0 to exit)" if allow_exit else "" + while True: + choice = input(f"\n{prompt}{exit_hint}: ").strip() - def format_storage_name(self, storage_name: str) -> str: - """Format storage type name with highlighting + # Check for exit + if allow_exit and (choice == "" or choice == "0"): + return None - Args: - storage_name: Storage type name - - Returns: - Formatted storage name string with ANSI color codes - """ - return f"{BOLD_CYAN}{storage_name}{RESET}" + if choice in valid_choices: + return choice + print(f"✗ Invalid choice, please enter one of: {', '.join(valid_choices)}") async def setup_storage( self, @@ -927,7 +847,7 @@ class MigrationTool: use_streaming: bool = False, exclude_storage_name: str = None, ) -> tuple: - """Setup and initialize storage with config.ini fallback support + """Setup and initialize storage Args: storage_type: Type label (source/target) @@ -940,129 +860,54 @@ class MigrationTool: """ print(f"\n=== {storage_type} Storage Setup ===") - # Filter and remap available storage types if exclusion is specified + # Filter available storage types if exclusion is specified + available_types = STORAGE_TYPES.copy() if exclude_storage_name: - # Get available storage types (excluding source) - available_list = [ - (k, v) for k, v in STORAGE_TYPES.items() if v != exclude_storage_name - ] - - # Remap to sequential numbering (1, 2, 3...) - remapped_types = { - str(i + 1): name for i, (_, name) in enumerate(available_list) + # Remove the excluded storage type from available options + available_types = { + k: v for k, v in STORAGE_TYPES.items() if v != exclude_storage_name } - # Print available types with new sequential numbers - print( - f"\nAvailable Storage Types for Target (source: {exclude_storage_name} excluded):" - ) - for key, value in remapped_types.items(): + # Print available types + print("\nAvailable Storage Types for Target:") + for key, value in available_types.items(): print(f"[{key}] {value}") - - available_types = remapped_types else: - # For source storage, use original numbering - available_types = STORAGE_TYPES.copy() + # Print all storage types for source self.print_storage_types() - # Generate dynamic prompt based on number of options - num_options = len(available_types) - if num_options == 1: - prompt_range = "1" - else: - prompt_range = f"1-{num_options}" + # Get storage type choice - allow exit for source storage + allow_exit = storage_type == "Source" + choice = self.get_user_choice( + f"Select {storage_type} storage type (1-4)", + list(available_types.keys()), + allow_exit=allow_exit, + ) - # Custom input handling with exit support - while True: - choice = input( - f"\nSelect {storage_type} storage type ({prompt_range}) (Press Enter or 0 to exit): " - ).strip() + # Handle exit + if choice is None: + print("\n✓ Migration cancelled by user") + return None, None, None, 0 - # Check for exit - if choice == "" or choice == "0": - print("\n✓ Migration cancelled by user") - return None, None, None, 0 + storage_name = STORAGE_TYPES[choice] - # Check if choice is valid - if choice in available_types: - break - - print( - f"✗ Invalid choice. Please enter one of: {', '.join(available_types.keys())}" - ) - - storage_name = available_types[choice] - - # Check configuration (warnings only, doesn't block) - print("\nChecking configuration...") - self.check_env_vars(storage_name) + # Check environment variables + print("\nChecking environment variables...") + if not self.check_env_vars(storage_name): + return None, None, None, 0 # Get workspace workspace = self.get_workspace_for_storage(storage_name) - # Initialize storage (real validation point) + # Initialize storage print(f"\nInitializing {storage_type} storage...") try: storage = await self.initialize_storage(storage_name, workspace) print(f"- Storage Type: {storage_name}") print(f"- Workspace: {workspace if workspace else '(default)'}") print("- Connection Status: ✓ Success") - - # Show configuration source for transparency - if storage_name == "RedisKVStorage": - config_source = ( - "environment variable" - if "REDIS_URI" in os.environ - else "config.ini or default" - ) - print(f"- Configuration Source: {config_source}") - elif storage_name == "PGKVStorage": - config_source = ( - "environment variables" - if all( - var in os.environ - for var in STORAGE_ENV_REQUIREMENTS[storage_name] - ) - else "config.ini or defaults" - ) - print(f"- Configuration Source: {config_source}") - elif storage_name == "MongoKVStorage": - config_source = ( - "environment variables" - if all( - var in os.environ - for var in STORAGE_ENV_REQUIREMENTS[storage_name] - ) - else "config.ini or defaults" - ) - print(f"- Configuration Source: {config_source}") - except Exception as e: print(f"✗ Initialization failed: {e}") - print(f"\nFor {storage_name}, you can configure using:") - print(" 1. Environment variables (highest priority)") - - # Show specific environment variable requirements - if storage_name in STORAGE_ENV_REQUIREMENTS: - for var in STORAGE_ENV_REQUIREMENTS[storage_name]: - print(f" - {var}") - - print(" 2. config.ini file (medium priority)") - if storage_name == "RedisKVStorage": - print(" [redis]") - print(" uri = redis://localhost:6379") - elif storage_name == "PGKVStorage": - print(" [postgres]") - print(" host = localhost") - print(" port = 5432") - print(" user = postgres") - print(" password = yourpassword") - print(" database = lightrag") - elif storage_name == "MongoKVStorage": - print(" [mongodb]") - print(" uri = mongodb://root:root@localhost:27017/") - print(" database = LightRAG") - return None, None, None, 0 # Count cache records efficiently @@ -1320,7 +1165,7 @@ class MigrationTool: print("=" * 60) async def run(self): - """Run the migration tool with streaming approach and early validation""" + """Run the migration tool with streaming approach""" try: # Initialize shared storage (REQUIRED for storage classes to work) from lightrag.kg.shared_storage import initialize_share_data @@ -1329,6 +1174,7 @@ class MigrationTool: # Print header self.print_header() + self.print_storage_types() # Setup source storage with streaming (only count, don't load all data) ( @@ -1342,32 +1188,8 @@ class MigrationTool: if self.source_storage is None: return - # Check if there are at least 2 storage types available - available_count = self.count_available_storage_types() - if available_count <= 1: - print("\n" + "=" * 60) - print("⚠️ Warning: Migration Not Possible") - print("=" * 60) - print(f"Only {available_count} storage type(s) available.") - print("Migration requires at least 2 different storage types.") - print("\nTo enable migration, configure additional storage:") - print(" 1. Set environment variables, OR") - print(" 2. Update config.ini file") - print("\nSupported storage types:") - for name in STORAGE_TYPES.values(): - if name != source_storage_name: - print(f" - {name}") - if name in STORAGE_ENV_REQUIREMENTS: - for var in STORAGE_ENV_REQUIREMENTS[name]: - print(f" Required: {var}") - print("=" * 60) - - # Cleanup - await self.source_storage.finalize() - return - if source_count == 0: - print("\n⚠️ Source storage has no cache records to migrate") + print("\n⚠ Source storage has no cache records to migrate") # Cleanup await self.source_storage.finalize() return @@ -1394,10 +1216,10 @@ class MigrationTool: print("Migration Confirmation") print("=" * 50) print( - f"Source: {self.format_storage_name(source_storage_name)} (workspace: {self.format_workspace(self.source_workspace)}) - {source_count:,} records" + f"Source: {source_storage_name} (workspace: {self.source_workspace if self.source_workspace else '(default)'}) - {source_count:,} records" ) print( - f"Target: {self.format_storage_name(target_storage_name)} (workspace: {self.format_workspace(self.target_workspace)}) - {target_count:,} records" + f"Target: {target_storage_name} (workspace: {self.target_workspace if self.target_workspace else '(default)'}) - {target_count:,} records" ) print(f"Batch Size: {self.batch_size:,} records/batch") print("Memory Mode: Streaming (memory-optimized)")