diff --git a/lightrag/evaluation/README.md b/lightrag/evaluation/README_EVALUASTION_RAGAS.md similarity index 100% rename from lightrag/evaluation/README.md rename to lightrag/evaluation/README_EVALUASTION_RAGAS.md diff --git a/lightrag/tools/README_MIGRATE_LLM_CACHE.md b/lightrag/tools/README_MIGRATE_LLM_CACHE.md index 36499ebb..beb7e6b0 100644 --- a/lightrag/tools/README_MIGRATE_LLM_CACHE.md +++ b/lightrag/tools/README_MIGRATE_LLM_CACHE.md @@ -17,59 +17,11 @@ The tool migrates the following cache types: - `default:extract:*` - Entity and relationship extraction caches - `default:summary:*` - Entity and relationship summary caches -**Note**: Query caches (modes like `local`, `global`, etc.) are NOT migrated. +**Note**: Query caches (modes like `mix`,`local`, `global`, etc.) are NOT migrated. ## Prerequisites -### 1. Environment Variable Configuration - -Ensure the relevant storage environment variables are configured in your `.env` file: - -#### Workspace Configuration (Optional) -```bash -# Generic workspace (shared by all storages) -WORKSPACE=space1 - -# Or configure independent workspace for specific storage -POSTGRES_WORKSPACE=pg_space -MONGODB_WORKSPACE=mongo_space -REDIS_WORKSPACE=redis_space -``` - -**Workspace Priority**: Storage-specific > Generic WORKSPACE > Empty string - -#### JsonKVStorage -```bash -WORKING_DIR=./rag_storage -``` - -#### RedisKVStorage -```bash -REDIS_URI=redis://localhost:6379 -``` - -#### PGKVStorage -```bash -POSTGRES_HOST=localhost -POSTGRES_PORT=5432 -POSTGRES_USER=your_username -POSTGRES_PASSWORD=your_password -POSTGRES_DATABASE=your_database -``` - -#### MongoKVStorage -```bash -MONGO_URI=mongodb://root:root@localhost:27017/ -MONGO_DATABASE=LightRAG -``` - -### 2. Install Dependencies - -Ensure LightRAG and its dependencies are installed: - -```bash -pip install -r requirements.txt -``` +The LLM Cache Migration Tool reads the storage configuration of the LightRAG Server and provides an LLM migration option to select source and destination storage. Ensure that both the source and destination storage have been correctly configured and are accessible via the LightRAG Server before cache migration. ## Usage @@ -95,10 +47,10 @@ Supported KV Storage Types: [3] PGKVStorage [4] MongoKVStorage -Select Source storage type (1-4) (Press Enter or 0 to exit): 1 +Select Source storage type (1-4) (Press Enter to exit): 1 ``` -**Note**: You can press Enter or type `0` at the source storage selection to exit gracefully. +**Note**: You can press Enter or type `0` at any storage selection prompt to exit gracefully. #### 2. Source Storage Validation The tool will: @@ -121,23 +73,43 @@ Counting cache records... ``` **Progress Display by Storage Type:** -- **JsonKVStorage**: Fast in-memory counting, no progress display needed -- **RedisKVStorage**: Real-time scanning progress +- **JsonKVStorage**: Fast in-memory counting, displays final count without incremental progress + ``` + Counting cache records... + - Total: 8,734 records + ``` +- **RedisKVStorage**: Real-time scanning progress with incremental counts ``` Scanning Redis keys... found 8,734 records ``` -- **PostgreSQL**: Shows timing if operation takes >1 second +- **PostgreSQL**: Quick COUNT(*) query, shows timing only if operation takes >1 second ``` Counting PostgreSQL records... (took 2.3s) ``` -- **MongoDB**: Shows timing if operation takes >1 second +- **MongoDB**: Fast count_documents(), shows timing only if operation takes >1 second ``` Counting MongoDB documents... (took 1.8s) ``` #### 3. 
Select Target Storage Type -Repeat steps 1-2 to select and validate the target storage. +The tool automatically excludes the source storage type from the target selection and renumbers the remaining options sequentially: + +``` +Available Storage Types for Target (source: JsonKVStorage excluded): +[1] RedisKVStorage +[2] PGKVStorage +[3] MongoKVStorage + +Select Target storage type (1-3) (Press Enter or 0 to exit): 1 +``` + +**Important Notes:** +- You **cannot** select the same storage type for both source and target +- Options are automatically renumbered (e.g., [1], [2], [3] instead of [2], [3], [4]) +- You can press Enter or type `0` to exit at this point as well + +The tool then validates the target storage following the same process as the source (checking environment variables, initializing connection, counting records). #### 4. Confirm Migration @@ -147,8 +119,9 @@ Migration Confirmation Source: JsonKVStorage (workspace: space1) - 8,734 records Target: MongoKVStorage (workspace: space1) - 0 records Batch Size: 1,000 records/batch +Memory Mode: Streaming (memory-optimized) -⚠ Warning: Target storage already has 0 records +⚠️ Warning: Target storage already has 0 records Migration will overwrite records with the same keys Continue? (y/n): y @@ -156,18 +129,27 @@ Continue? (y/n): y #### 5. Execute Migration -Observe migration progress: +The tool uses **streaming migration** by default for memory efficiency. Observe migration progress: ``` -=== Starting Migration === -Batch 1/9: ████████░░ 1000/8734 (11%) - default:extract -Batch 2/9: ████████████████░░ 2000/8734 (23%) - default:extract +=== Starting Streaming Migration === +💡 Memory-optimized mode: Processing 1,000 records at a time + +Batch 1/9: ████████░░░░░░░░░░░░ 1000/8734 (11.4%) - default:extract ✓ +Batch 2/9: ████████████░░░░░░░░ 2000/8734 (22.9%) - default:extract ✓ ... -Batch 9/9: ████████████████████ 8734/8734 (100%) - default:summary +Batch 9/9: ████████████████████ 8734/8734 (100.0%) - default:summary ✓ Persisting data to disk... +✓ Data persisted successfully ``` +**Key Features:** +- **Streaming mode**: Processes data in batches without loading entire dataset into memory +- **Real-time progress**: Shows progress bar with precise percentage and cache type +- **Success indicators**: ✓ for successful batches, ✗ for failed batches +- **Constant memory usage**: Handles millions of records efficiently + #### 6. Review Migration Report The tool provides a comprehensive final report showing statistics and any errors encountered: @@ -290,24 +272,96 @@ After migration completes, a detailed report includes: 1. **Data Overwrite Warning** - Migration will overwrite records with the same keys in the target storage - Tool displays a warning if target storage already has data + - Data migration can be performed repeatedly - Pre-existing data in target storage is handled correctly - -2. **Workspace Consistency** - - Recommended to use the same workspace for source and target - - Cache data in different workspaces are completely isolated - 3. **Interrupt and Resume** - Migration can be interrupted at any time (Ctrl+C) - Already migrated data will remain in target storage - Re-running will overwrite existing records - Failed batches can be manually retried - 4. 
**Performance Considerations**
    - Large data migration may take considerable time
    - Recommend migrating during off-peak hours
    - Ensure stable network connection (for remote databases)
    - Memory usage stays constant regardless of dataset size
 
+## Storage Configuration
+
+The tool supports multiple configuration methods with the following priority:
+
+1. **Environment variables** (highest priority)
+2. **config.ini file** (medium priority)
+3. **Default values** (lowest priority)
+
+### Option A: Environment Variable Configuration
+
+Configure storage settings in your `.env` file:
+
+#### Workspace Configuration (Optional)
+
+```bash
+# Generic workspace (shared by all storages)
+WORKSPACE=space1
+
+# Or configure independent workspace for specific storage
+POSTGRES_WORKSPACE=pg_space
+MONGODB_WORKSPACE=mongo_space
+REDIS_WORKSPACE=redis_space
+```
+
+**Workspace Priority**: Storage-specific > Generic WORKSPACE > Empty string
+
+#### JsonKVStorage
+
+```bash
+WORKING_DIR=./rag_storage
+```
+
+#### RedisKVStorage
+
+```bash
+REDIS_URI=redis://localhost:6379
+```
+
+#### PGKVStorage
+
+```bash
+POSTGRES_HOST=localhost
+POSTGRES_PORT=5432
+POSTGRES_USER=your_username
+POSTGRES_PASSWORD=your_password
+POSTGRES_DATABASE=your_database
+```
+
+#### MongoKVStorage
+
+```bash
+MONGO_URI=mongodb://root:root@localhost:27017/
+MONGO_DATABASE=LightRAG
+```
+
+### Option B: config.ini Configuration
+
+Alternatively, create a `config.ini` file in the project root:
+
+```ini
+[redis]
+uri = redis://localhost:6379
+
+[postgres]
+host = localhost
+port = 5432
+user = postgres
+password = yourpassword
+database = lightrag
+
+[mongodb]
+uri = mongodb://root:root@localhost:27017/
+database = LightRAG
+```
+
+**Note**: Environment variables take precedence over config.ini settings. JsonKVStorage uses the `WORKING_DIR` environment variable or defaults to `./rag_storage`.
+
 ## Troubleshooting
 
 ### Missing Environment Variables
@@ -345,22 +399,12 @@ MONGO_DATABASE=LightRAG
 # 2. Run tool
 python -m lightrag.tools.migrate_llm_cache
-# 3. Select: 1 (JsonKVStorage) -> 4 (MongoKVStorage)
+# 3. Select: 1 (JsonKVStorage) -> 3 (MongoKVStorage - renumbered from 4)
 ```
 
-### Scenario 2: PostgreSQL Database Switch
+**Note**: After selecting JsonKVStorage as source, MongoKVStorage will be shown as option [3] in the target selection since options are renumbered after excluding the source.
 
-Use case: Database migration or upgrade
-
-```bash
-# 1. Configure old and new databases
-POSTGRES_WORKSPACE=old_db # Source
-# ... Configure new database as default
-
-# 2. Run tool and select same storage type
-```
-
-### Scenario 3: Redis to PostgreSQL
+### Scenario 2: Redis to PostgreSQL
 
 Use case: Migrating from cache storage to relational database
 
 ```bash
 # 1. Configure connection info for both databases
 REDIS_URI=redis://localhost:6379
 POSTGRES_HOST=new-postgres-server
 
 # 2. Run tool
 python -m lightrag.tools.migrate_llm_cache
-# 3. Select: 2 (RedisKVStorage) -> 3 (PGKVStorage)
+# 3. Select: 2 (RedisKVStorage) -> 2 (PGKVStorage - renumbered from 3)
 ```
+
+**Note**: After selecting RedisKVStorage as source, PGKVStorage will be shown as option [2] in the target selection.
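+
+A quick way to sanity-check the result is to count the default-mode cache records in the target storage after the run. The minimal sketch below reuses the storage classes and the same COUNT query the migration tool itself uses for PostgreSQL; the script itself is an illustrative assumption (it is not shipped with the tool) and it expects the same environment variables described above.
+
+```python
+# Post-migration spot check for a PostgreSQL target (illustrative sketch).
+import asyncio
+import os
+
+from dotenv import load_dotenv
+
+from lightrag.kg.postgres_impl import PGKVStorage, namespace_to_table_name
+from lightrag.kg.shared_storage import finalize_share_data, initialize_share_data
+from lightrag.namespace import NameSpace
+
+
+async def main():
+    load_dotenv(dotenv_path=".env", override=False)
+    initialize_share_data(workers=1)  # required before using the storage classes
+
+    # Same workspace resolution order the tool uses: storage-specific > generic > empty
+    workspace = os.getenv("POSTGRES_WORKSPACE") or os.getenv("WORKSPACE", "")
+
+    storage = PGKVStorage(
+        namespace=NameSpace.KV_STORE_LLM_RESPONSE_CACHE,
+        workspace=workspace,
+        global_config={
+            "working_dir": os.getenv("WORKING_DIR", "./rag_storage"),
+            "embedding_batch_num": 10,
+        },
+        embedding_func=None,
+    )
+    await storage.initialize()
+    try:
+        table_name = namespace_to_table_name(storage.namespace)
+        # Same default-mode cache filter the migration tool counts
+        query = f"""
+            SELECT COUNT(*) AS count
+            FROM {table_name}
+            WHERE workspace = $1
+              AND (id LIKE 'default:extract:%' OR id LIKE 'default:summary:%')
+        """
+        result = await storage.db.query(query, [storage.workspace])
+        count = result["count"] if result else 0
+        print(f"default-mode cache records in target: {count:,}")
+    finally:
+        await storage.finalize()
+        finalize_share_data()
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
+```
+
+Because the query is read-only, it can be run both before and after migration to confirm that the totals in the final report match what actually landed in the target database.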
+ +### Scenario 3: Different Workspaces Migration + +Use case: Migrating data between different workspace environments + +```bash +# Configure separate workspaces for source and target +POSTGRES_WORKSPACE=dev_workspace # For development environment +MONGODB_WORKSPACE=prod_workspace # For production environment + +# Run tool +python -m lightrag.tools.migrate_llm_cache + +# Select: 3 (PGKVStorage with dev_workspace) -> 3 (MongoKVStorage with prod_workspace) +``` + +**Note**: This allows you to migrate between different logical data partitions while changing storage backends. + ## Tool Limitations -1. **Only Default Mode Caches** +1. **Same Storage Type Not Allowed** + - You cannot migrate between the same storage type (e.g., PostgreSQL to PostgreSQL) + - This is enforced by the tool automatically excluding the source storage type from target selection + - For same-storage migrations (e.g., database switches), use database-native tools instead +2. **Only Default Mode Caches** - Only migrates `default:extract:*` and `default:summary:*` - Query caches are not included - -2. **Workspace Isolation** - - Different workspaces are treated as completely separate - - Cross-workspace migration requires manual workspace reconfiguration - -3. **Network Dependency** +4. **Network Dependency** - Tool requires stable network connection for remote databases - Large datasets may fail if connection is interrupted @@ -407,10 +469,3 @@ python -m lightrag.tools.migrate_llm_cache 4. **Clean Old Data** - After successful migration, consider cleaning old cache data - Keep backups for a reasonable period before deletion - -## Support - -For issues or questions: -- Check LightRAG documentation -- Review error logs for detailed information -- Ensure all environment variables are correctly configured diff --git a/lightrag/tools/migrate_llm_cache.py b/lightrag/tools/migrate_llm_cache.py index 2440bea8..942f244c 100644 --- a/lightrag/tools/migrate_llm_cache.py +++ b/lightrag/tools/migrate_llm_cache.py @@ -12,7 +12,7 @@ Usage: Supported KV Storage Types: - JsonKVStorage - - RedisKVStorage + - RedisKVStorage - PGKVStorage - MongoKVStorage """ @@ -26,13 +26,18 @@ from dataclasses import dataclass, field from dotenv import load_dotenv # Add project root to path for imports -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) +sys.path.insert( + 0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +) from lightrag.kg import STORAGE_ENV_REQUIREMENTS from lightrag.namespace import NameSpace from lightrag.utils import setup_logger # Load environment variables +# use the .env that is inside the current folder +# allows to use different .env file for each lightrag instance +# the OS environment variables take precedence over the .env file load_dotenv(dotenv_path=".env", override=False) # Setup logger @@ -57,9 +62,18 @@ WORKSPACE_ENV_MAP = { DEFAULT_BATCH_SIZE = 1000 +# Default count batch size for efficient counting +DEFAULT_COUNT_BATCH_SIZE = 1000 + +# ANSI color codes for terminal output +BOLD_CYAN = "\033[1;36m" +RESET = "\033[0m" + + @dataclass class MigrationStats: """Migration statistics and error tracking""" + total_source_records: int = 0 total_batches: int = 0 successful_batches: int = 0 @@ -67,16 +81,18 @@ class MigrationStats: successful_records: int = 0 failed_records: int = 0 errors: List[Dict[str, Any]] = field(default_factory=list) - + def add_error(self, batch_idx: int, error: Exception, batch_size: int): """Record batch error""" - 
self.errors.append({ - 'batch': batch_idx, - 'error_type': type(error).__name__, - 'error_msg': str(error), - 'records_lost': batch_size, - 'timestamp': time.time() - }) + self.errors.append( + { + "batch": batch_idx, + "error_type": type(error).__name__, + "error_msg": str(error), + "records_lost": batch_size, + "timestamp": time.time(), + } + ) self.failed_batches += 1 self.failed_records += batch_size @@ -93,12 +109,12 @@ class MigrationTool: def get_workspace_for_storage(self, storage_name: str) -> str: """Get workspace for a specific storage type - + Priority: Storage-specific env var > WORKSPACE env var > empty string - + Args: storage_name: Storage implementation name - + Returns: Workspace name """ @@ -107,72 +123,154 @@ class MigrationTool: specific_workspace = os.getenv(WORKSPACE_ENV_MAP[storage_name]) if specific_workspace: return specific_workspace - + # Check generic WORKSPACE workspace = os.getenv("WORKSPACE", "") return workspace - def check_env_vars(self, storage_name: str) -> bool: - """Check if all required environment variables exist - + def check_config_ini_for_storage(self, storage_name: str) -> bool: + """Check if config.ini has configuration for the storage type + Args: storage_name: Storage implementation name - + Returns: - True if all required env vars exist, False otherwise + True if config.ini has the necessary configuration + """ + try: + import configparser + + config = configparser.ConfigParser() + config.read("config.ini", "utf-8") + + if storage_name == "RedisKVStorage": + return config.has_option("redis", "uri") + elif storage_name == "PGKVStorage": + return ( + config.has_option("postgres", "user") + and config.has_option("postgres", "password") + and config.has_option("postgres", "database") + ) + elif storage_name == "MongoKVStorage": + return config.has_option("mongodb", "uri") and config.has_option( + "mongodb", "database" + ) + + return False + except Exception: + return False + + def check_env_vars(self, storage_name: str) -> bool: + """Check environment variables, show warnings if missing but don't fail + + Args: + storage_name: Storage implementation name + + Returns: + Always returns True (warnings only, no hard failure) """ required_vars = STORAGE_ENV_REQUIREMENTS.get(storage_name, []) + + if not required_vars: + print("✓ No environment variables required") + return True + missing_vars = [var for var in required_vars if var not in os.environ] - + if missing_vars: - print(f"✗ Missing required environment variables: {', '.join(missing_vars)}") - return False - + print( + f"⚠️ Warning: Missing environment variables: {', '.join(missing_vars)}" + ) + + # Check if config.ini has configuration + has_config = self.check_config_ini_for_storage(storage_name) + if has_config: + print(" ✓ Found configuration in config.ini") + else: + print(f" Will attempt to use defaults for {storage_name}") + + return True + print("✓ All required environment variables are set") return True + def count_available_storage_types(self) -> int: + """Count available storage types (with env vars, config.ini, or defaults) + + Returns: + Number of available storage types + """ + available_count = 0 + + for storage_name in STORAGE_TYPES.values(): + # Check if storage requires configuration + required_vars = STORAGE_ENV_REQUIREMENTS.get(storage_name, []) + + if not required_vars: + # JsonKVStorage, MongoKVStorage etc. 
- no config needed + available_count += 1 + else: + # Check if has environment variables + has_env = all(var in os.environ for var in required_vars) + if has_env: + available_count += 1 + else: + # Check if has config.ini configuration + has_config = self.check_config_ini_for_storage(storage_name) + if has_config: + available_count += 1 + + return available_count + def get_storage_class(self, storage_name: str): """Dynamically import and return storage class - + Args: storage_name: Storage implementation name - + Returns: Storage class """ if storage_name == "JsonKVStorage": from lightrag.kg.json_kv_impl import JsonKVStorage + return JsonKVStorage elif storage_name == "RedisKVStorage": from lightrag.kg.redis_impl import RedisKVStorage + return RedisKVStorage elif storage_name == "PGKVStorage": from lightrag.kg.postgres_impl import PGKVStorage + return PGKVStorage elif storage_name == "MongoKVStorage": from lightrag.kg.mongo_impl import MongoKVStorage + return MongoKVStorage else: raise ValueError(f"Unsupported storage type: {storage_name}") async def initialize_storage(self, storage_name: str, workspace: str): - """Initialize storage instance - + """Initialize storage instance with fallback to config.ini and defaults + Args: storage_name: Storage implementation name workspace: Workspace name - + Returns: Initialized storage instance + + Raises: + Exception: If initialization fails """ storage_class = self.get_storage_class(storage_name) - + # Create global config global_config = { "working_dir": os.getenv("WORKING_DIR", "./rag_storage"), "embedding_batch_num": 10, } - + # Initialize storage storage = storage_class( namespace=NameSpace.KV_STORE_LLM_RESPONSE_CACHE, @@ -180,18 +278,18 @@ class MigrationTool: global_config=global_config, embedding_func=None, ) - - # Initialize the storage + + # Initialize the storage (may raise exception if connection fails) await storage.initialize() - + return storage async def get_default_caches_json(self, storage) -> Dict[str, Any]: """Get default caches from JsonKVStorage - + Args: storage: JsonKVStorage instance - + Returns: Dictionary of cache entries with default:extract:* or default:summary:* keys """ @@ -199,39 +297,41 @@ class MigrationTool: async with storage._storage_lock: filtered = {} for key, value in storage._data.items(): - if key.startswith("default:extract:") or key.startswith("default:summary:"): + if key.startswith("default:extract:") or key.startswith( + "default:summary:" + ): filtered[key] = value return filtered - async def get_default_caches_redis(self, storage, batch_size: int = 1000) -> Dict[str, Any]: + async def get_default_caches_redis( + self, storage, batch_size: int = 1000 + ) -> Dict[str, Any]: """Get default caches from RedisKVStorage with pagination - + Args: storage: RedisKVStorage instance batch_size: Number of keys to process per batch - + Returns: Dictionary of cache entries with default:extract:* or default:summary:* keys """ import json - + cache_data = {} - + # Use _get_redis_connection() context manager async with storage._get_redis_connection() as redis: for pattern in ["default:extract:*", "default:summary:*"]: # Add namespace prefix to pattern prefixed_pattern = f"{storage.final_namespace}:{pattern}" cursor = 0 - + while True: # SCAN already implements cursor-based pagination cursor, keys = await redis.scan( - cursor, - match=prefixed_pattern, - count=batch_size + cursor, match=prefixed_pattern, count=batch_size ) - + if keys: # Process this batch using pipeline with error handling try: @@ -239,74 +339,88 @@ 
class MigrationTool: for key in keys: pipe.get(key) values = await pipe.execute() - + for key, value in zip(keys, values): if value: - key_str = key.decode() if isinstance(key, bytes) else key + key_str = ( + key.decode() if isinstance(key, bytes) else key + ) # Remove namespace prefix to get original key - original_key = key_str.replace(f"{storage.final_namespace}:", "", 1) + original_key = key_str.replace( + f"{storage.final_namespace}:", "", 1 + ) cache_data[original_key] = json.loads(value) - + except Exception as e: # Pipeline execution failed, fall back to individual gets - print(f"⚠️ Pipeline execution failed for batch, using individual gets: {e}") + print( + f"⚠️ Pipeline execution failed for batch, using individual gets: {e}" + ) for key in keys: try: value = await redis.get(key) if value: - key_str = key.decode() if isinstance(key, bytes) else key - original_key = key_str.replace(f"{storage.final_namespace}:", "", 1) + key_str = ( + key.decode() + if isinstance(key, bytes) + else key + ) + original_key = key_str.replace( + f"{storage.final_namespace}:", "", 1 + ) cache_data[original_key] = json.loads(value) except Exception as individual_error: - print(f"⚠️ Failed to get individual key {key}: {individual_error}") + print( + f"⚠️ Failed to get individual key {key}: {individual_error}" + ) continue - + if cursor == 0: break - + # Yield control periodically to avoid blocking await asyncio.sleep(0) - + return cache_data - async def get_default_caches_pg(self, storage, batch_size: int = 1000) -> Dict[str, Any]: + async def get_default_caches_pg( + self, storage, batch_size: int = 1000 + ) -> Dict[str, Any]: """Get default caches from PGKVStorage with pagination - + Args: storage: PGKVStorage instance batch_size: Number of records to fetch per batch - + Returns: Dictionary of cache entries with default:extract:* or default:summary:* keys """ from lightrag.kg.postgres_impl import namespace_to_table_name - + cache_data = {} table_name = namespace_to_table_name(storage.namespace) offset = 0 - + while True: # Use LIMIT and OFFSET for pagination query = f""" SELECT id as key, original_prompt, return_value, chunk_id, cache_type, queryparam, EXTRACT(EPOCH FROM create_time)::BIGINT as create_time, EXTRACT(EPOCH FROM update_time)::BIGINT as update_time - FROM {table_name} - WHERE workspace = $1 + FROM {table_name} + WHERE workspace = $1 AND (id LIKE 'default:extract:%' OR id LIKE 'default:summary:%') ORDER BY id LIMIT $2 OFFSET $3 """ - + results = await storage.db.query( - query, - [storage.workspace, batch_size, offset], - multirows=True + query, [storage.workspace, batch_size, offset], multirows=True ) - + if not results: break - + for row in results: # Map PostgreSQL fields to cache format cache_entry = { @@ -319,61 +433,63 @@ class MigrationTool: "update_time": row.get("update_time", 0), } cache_data[row["key"]] = cache_entry - + # If we got fewer results than batch_size, we're done if len(results) < batch_size: break - + offset += batch_size - + # Yield control periodically await asyncio.sleep(0) - + return cache_data - async def get_default_caches_mongo(self, storage, batch_size: int = 1000) -> Dict[str, Any]: + async def get_default_caches_mongo( + self, storage, batch_size: int = 1000 + ) -> Dict[str, Any]: """Get default caches from MongoKVStorage with cursor-based pagination - + Args: storage: MongoKVStorage instance batch_size: Number of documents to process per batch - + Returns: Dictionary of cache entries with default:extract:* or default:summary:* keys """ cache_data = {} - + # 
MongoDB query with regex - use _data not collection query = {"_id": {"$regex": "^default:(extract|summary):"}} - + # Use cursor without to_list() - process in batches cursor = storage._data.find(query).batch_size(batch_size) - + async for doc in cursor: # Process each document as it comes doc_copy = doc.copy() key = doc_copy.pop("_id") - + # Filter ALL MongoDB/database-specific fields # Following .clinerules: "Always filter deprecated/incompatible fields during deserialization" for field_name in ["namespace", "workspace", "_id", "content"]: doc_copy.pop(field_name, None) - + cache_data[key] = doc_copy - + # Periodically yield control (every batch_size documents) if len(cache_data) % batch_size == 0: await asyncio.sleep(0) - + return cache_data async def get_default_caches(self, storage, storage_name: str) -> Dict[str, Any]: """Get default caches from any storage type - + Args: storage: Storage instance storage_name: Storage type name - + Returns: Dictionary of cache entries """ @@ -388,12 +504,370 @@ class MigrationTool: else: raise ValueError(f"Unsupported storage type: {storage_name}") + async def count_default_caches_json(self, storage) -> int: + """Count default caches in JsonKVStorage - O(N) but very fast in-memory + + Args: + storage: JsonKVStorage instance + + Returns: + Total count of cache records + """ + async with storage._storage_lock: + return sum( + 1 + for key in storage._data.keys() + if key.startswith("default:extract:") + or key.startswith("default:summary:") + ) + + async def count_default_caches_redis(self, storage) -> int: + """Count default caches in RedisKVStorage using SCAN with progress display + + Args: + storage: RedisKVStorage instance + + Returns: + Total count of cache records + """ + count = 0 + print("Scanning Redis keys...", end="", flush=True) + + async with storage._get_redis_connection() as redis: + for pattern in ["default:extract:*", "default:summary:*"]: + prefixed_pattern = f"{storage.final_namespace}:{pattern}" + cursor = 0 + while True: + cursor, keys = await redis.scan( + cursor, match=prefixed_pattern, count=DEFAULT_COUNT_BATCH_SIZE + ) + count += len(keys) + + # Show progress + print( + f"\rScanning Redis keys... 
found {count:,} records", + end="", + flush=True, + ) + + if cursor == 0: + break + + print() # New line after progress + return count + + async def count_default_caches_pg(self, storage) -> int: + """Count default caches in PostgreSQL using COUNT(*) with progress indicator + + Args: + storage: PGKVStorage instance + + Returns: + Total count of cache records + """ + from lightrag.kg.postgres_impl import namespace_to_table_name + + table_name = namespace_to_table_name(storage.namespace) + + query = f""" + SELECT COUNT(*) as count + FROM {table_name} + WHERE workspace = $1 + AND (id LIKE 'default:extract:%' OR id LIKE 'default:summary:%') + """ + + print("Counting PostgreSQL records...", end="", flush=True) + start_time = time.time() + + result = await storage.db.query(query, [storage.workspace]) + + elapsed = time.time() - start_time + if elapsed > 1: + print(f" (took {elapsed:.1f}s)", end="") + print() # New line + + return result["count"] if result else 0 + + async def count_default_caches_mongo(self, storage) -> int: + """Count default caches in MongoDB using count_documents with progress indicator + + Args: + storage: MongoKVStorage instance + + Returns: + Total count of cache records + """ + query = {"_id": {"$regex": "^default:(extract|summary):"}} + + print("Counting MongoDB documents...", end="", flush=True) + start_time = time.time() + + count = await storage._data.count_documents(query) + + elapsed = time.time() - start_time + if elapsed > 1: + print(f" (took {elapsed:.1f}s)", end="") + print() # New line + + return count + + async def count_default_caches(self, storage, storage_name: str) -> int: + """Count default caches from any storage type efficiently + + Args: + storage: Storage instance + storage_name: Storage type name + + Returns: + Total count of cache records + """ + if storage_name == "JsonKVStorage": + return await self.count_default_caches_json(storage) + elif storage_name == "RedisKVStorage": + return await self.count_default_caches_redis(storage) + elif storage_name == "PGKVStorage": + return await self.count_default_caches_pg(storage) + elif storage_name == "MongoKVStorage": + return await self.count_default_caches_mongo(storage) + else: + raise ValueError(f"Unsupported storage type: {storage_name}") + + async def stream_default_caches_json(self, storage, batch_size: int): + """Stream default caches from JsonKVStorage - yields batches + + Args: + storage: JsonKVStorage instance + batch_size: Size of each batch to yield + + Yields: + Dictionary batches of cache entries + + Note: + This method creates a snapshot of matching items while holding the lock, + then releases the lock before yielding batches. This prevents deadlock + when the target storage (also JsonKVStorage) tries to acquire the same + lock during upsert operations. 
+ """ + # Create a snapshot of matching items while holding the lock + async with storage._storage_lock: + matching_items = [ + (key, value) + for key, value in storage._data.items() + if key.startswith("default:extract:") + or key.startswith("default:summary:") + ] + + # Now iterate over snapshot without holding lock + batch = {} + for key, value in matching_items: + batch[key] = value + if len(batch) >= batch_size: + yield batch + batch = {} + + # Yield remaining items + if batch: + yield batch + + async def stream_default_caches_redis(self, storage, batch_size: int): + """Stream default caches from RedisKVStorage - yields batches + + Args: + storage: RedisKVStorage instance + batch_size: Size of each batch to yield + + Yields: + Dictionary batches of cache entries + """ + import json + + async with storage._get_redis_connection() as redis: + for pattern in ["default:extract:*", "default:summary:*"]: + prefixed_pattern = f"{storage.final_namespace}:{pattern}" + cursor = 0 + + while True: + cursor, keys = await redis.scan( + cursor, match=prefixed_pattern, count=batch_size + ) + + if keys: + try: + pipe = redis.pipeline() + for key in keys: + pipe.get(key) + values = await pipe.execute() + + batch = {} + for key, value in zip(keys, values): + if value: + key_str = ( + key.decode() if isinstance(key, bytes) else key + ) + original_key = key_str.replace( + f"{storage.final_namespace}:", "", 1 + ) + batch[original_key] = json.loads(value) + + if batch: + yield batch + + except Exception as e: + print(f"⚠️ Pipeline execution failed for batch: {e}") + # Fall back to individual gets + batch = {} + for key in keys: + try: + value = await redis.get(key) + if value: + key_str = ( + key.decode() + if isinstance(key, bytes) + else key + ) + original_key = key_str.replace( + f"{storage.final_namespace}:", "", 1 + ) + batch[original_key] = json.loads(value) + except Exception as individual_error: + print( + f"⚠️ Failed to get individual key {key}: {individual_error}" + ) + continue + + if batch: + yield batch + + if cursor == 0: + break + + await asyncio.sleep(0) + + async def stream_default_caches_pg(self, storage, batch_size: int): + """Stream default caches from PostgreSQL - yields batches + + Args: + storage: PGKVStorage instance + batch_size: Size of each batch to yield + + Yields: + Dictionary batches of cache entries + """ + from lightrag.kg.postgres_impl import namespace_to_table_name + + table_name = namespace_to_table_name(storage.namespace) + offset = 0 + + while True: + query = f""" + SELECT id as key, original_prompt, return_value, chunk_id, cache_type, queryparam, + EXTRACT(EPOCH FROM create_time)::BIGINT as create_time, + EXTRACT(EPOCH FROM update_time)::BIGINT as update_time + FROM {table_name} + WHERE workspace = $1 + AND (id LIKE 'default:extract:%' OR id LIKE 'default:summary:%') + ORDER BY id + LIMIT $2 OFFSET $3 + """ + + results = await storage.db.query( + query, [storage.workspace, batch_size, offset], multirows=True + ) + + if not results: + break + + batch = {} + for row in results: + cache_entry = { + "return": row.get("return_value", ""), + "cache_type": row.get("cache_type"), + "original_prompt": row.get("original_prompt", ""), + "chunk_id": row.get("chunk_id"), + "queryparam": row.get("queryparam"), + "create_time": row.get("create_time", 0), + "update_time": row.get("update_time", 0), + } + batch[row["key"]] = cache_entry + + if batch: + yield batch + + if len(results) < batch_size: + break + + offset += batch_size + await asyncio.sleep(0) + + async def 
stream_default_caches_mongo(self, storage, batch_size: int): + """Stream default caches from MongoDB - yields batches + + Args: + storage: MongoKVStorage instance + batch_size: Size of each batch to yield + + Yields: + Dictionary batches of cache entries + """ + query = {"_id": {"$regex": "^default:(extract|summary):"}} + cursor = storage._data.find(query).batch_size(batch_size) + + batch = {} + async for doc in cursor: + doc_copy = doc.copy() + key = doc_copy.pop("_id") + + # Filter MongoDB/database-specific fields + for field_name in ["namespace", "workspace", "_id", "content"]: + doc_copy.pop(field_name, None) + + batch[key] = doc_copy + + if len(batch) >= batch_size: + yield batch + batch = {} + + # Yield remaining items + if batch: + yield batch + + async def stream_default_caches( + self, storage, storage_name: str, batch_size: int = None + ): + """Stream default caches from any storage type - unified interface + + Args: + storage: Storage instance + storage_name: Storage type name + batch_size: Size of each batch to yield (defaults to self.batch_size) + + Yields: + Dictionary batches of cache entries + """ + if batch_size is None: + batch_size = self.batch_size + + if storage_name == "JsonKVStorage": + async for batch in self.stream_default_caches_json(storage, batch_size): + yield batch + elif storage_name == "RedisKVStorage": + async for batch in self.stream_default_caches_redis(storage, batch_size): + yield batch + elif storage_name == "PGKVStorage": + async for batch in self.stream_default_caches_pg(storage, batch_size): + yield batch + elif storage_name == "MongoKVStorage": + async for batch in self.stream_default_caches_mongo(storage, batch_size): + yield batch + else: + raise ValueError(f"Unsupported storage type: {storage_name}") + async def count_cache_types(self, cache_data: Dict[str, Any]) -> Dict[str, int]: """Count cache entries by type - + Args: cache_data: Dictionary of cache entries - + Returns: Dictionary with counts for each cache type """ @@ -401,13 +875,13 @@ class MigrationTool: "extract": 0, "summary": 0, } - + for key in cache_data.keys(): if key.startswith("default:extract:"): counts["extract"] += 1 elif key.startswith("default:summary:"): counts["summary"] += 1 - + return counts def print_header(self): @@ -422,138 +896,264 @@ class MigrationTool: for key, value in STORAGE_TYPES.items(): print(f"[{key}] {value}") - def get_user_choice(self, prompt: str, valid_choices: list) -> str: - """Get user choice with validation - - Args: - prompt: Prompt message - valid_choices: List of valid choices - - Returns: - User's choice - """ - while True: - choice = input(f"\n{prompt}: ").strip() - if choice in valid_choices: - return choice - print(f"✗ Invalid choice, please enter one of: {', '.join(valid_choices)}") + def format_workspace(self, workspace: str) -> str: + """Format workspace name with highlighting + + Args: + workspace: Workspace name (may be empty) + + Returns: + Formatted workspace string with ANSI color codes + """ + if workspace: + return f"{BOLD_CYAN}{workspace}{RESET}" + else: + return f"{BOLD_CYAN}(default){RESET}" + + def format_storage_name(self, storage_name: str) -> str: + """Format storage type name with highlighting + + Args: + storage_name: Storage type name + + Returns: + Formatted storage name string with ANSI color codes + """ + return f"{BOLD_CYAN}{storage_name}{RESET}" + + async def setup_storage( + self, + storage_type: str, + use_streaming: bool = False, + exclude_storage_name: str = None, + ) -> tuple: + """Setup and initialize storage 
with config.ini fallback support - async def setup_storage(self, storage_type: str) -> tuple: - """Setup and initialize storage - Args: storage_type: Type label (source/target) - + use_streaming: If True, only count records without loading. If False, load all data (legacy mode) + exclude_storage_name: Storage type to exclude from selection (e.g., to prevent selecting same as source) + Returns: - Tuple of (storage_instance, storage_name, workspace, cache_data) + Tuple of (storage_instance, storage_name, workspace, total_count) + Returns (None, None, None, 0) if user chooses to exit """ print(f"\n=== {storage_type} Storage Setup ===") - - # Get storage type choice - choice = self.get_user_choice( - f"Select {storage_type} storage type (1-4)", - list(STORAGE_TYPES.keys()) - ) - storage_name = STORAGE_TYPES[choice] - - # Check environment variables - print("\nChecking environment variables...") - if not self.check_env_vars(storage_name): - return None, None, None, None - + + # Filter and remap available storage types if exclusion is specified + if exclude_storage_name: + # Get available storage types (excluding source) + available_list = [ + (k, v) for k, v in STORAGE_TYPES.items() if v != exclude_storage_name + ] + + # Remap to sequential numbering (1, 2, 3...) + remapped_types = { + str(i + 1): name for i, (_, name) in enumerate(available_list) + } + + # Print available types with new sequential numbers + print( + f"\nAvailable Storage Types for Target (source: {exclude_storage_name} excluded):" + ) + for key, value in remapped_types.items(): + print(f"[{key}] {value}") + + available_types = remapped_types + else: + # For source storage, use original numbering + available_types = STORAGE_TYPES.copy() + self.print_storage_types() + + # Generate dynamic prompt based on number of options + num_options = len(available_types) + if num_options == 1: + prompt_range = "1" + else: + prompt_range = f"1-{num_options}" + + # Custom input handling with exit support + while True: + choice = input( + f"\nSelect {storage_type} storage type ({prompt_range}) (Press Enter to exit): " + ).strip() + + # Check for exit + if choice == "" or choice == "0": + print("\n✓ Migration cancelled by user") + return None, None, None, 0 + + # Check if choice is valid + if choice in available_types: + break + + print( + f"✗ Invalid choice. 
Please enter one of: {', '.join(available_types.keys())}" + ) + + storage_name = available_types[choice] + + # Check configuration (warnings only, doesn't block) + print("\nChecking configuration...") + self.check_env_vars(storage_name) + # Get workspace workspace = self.get_workspace_for_storage(storage_name) - - # Initialize storage + + # Initialize storage (real validation point) print(f"\nInitializing {storage_type} storage...") try: storage = await self.initialize_storage(storage_name, workspace) print(f"- Storage Type: {storage_name}") print(f"- Workspace: {workspace if workspace else '(default)'}") print("- Connection Status: ✓ Success") + + # Show configuration source for transparency + if storage_name == "RedisKVStorage": + config_source = ( + "environment variable" + if "REDIS_URI" in os.environ + else "config.ini or default" + ) + print(f"- Configuration Source: {config_source}") + elif storage_name == "PGKVStorage": + config_source = ( + "environment variables" + if all( + var in os.environ + for var in STORAGE_ENV_REQUIREMENTS[storage_name] + ) + else "config.ini or defaults" + ) + print(f"- Configuration Source: {config_source}") + elif storage_name == "MongoKVStorage": + config_source = ( + "environment variables" + if all( + var in os.environ + for var in STORAGE_ENV_REQUIREMENTS[storage_name] + ) + else "config.ini or defaults" + ) + print(f"- Configuration Source: {config_source}") + except Exception as e: print(f"✗ Initialization failed: {e}") - return None, None, None, None - - # Get cache data - print("\nCounting cache records...") + print(f"\nFor {storage_name}, you can configure using:") + print(" 1. Environment variables (highest priority)") + + # Show specific environment variable requirements + if storage_name in STORAGE_ENV_REQUIREMENTS: + for var in STORAGE_ENV_REQUIREMENTS[storage_name]: + print(f" - {var}") + + print(" 2. 
config.ini file (medium priority)") + if storage_name == "RedisKVStorage": + print(" [redis]") + print(" uri = redis://localhost:6379") + elif storage_name == "PGKVStorage": + print(" [postgres]") + print(" host = localhost") + print(" port = 5432") + print(" user = postgres") + print(" password = yourpassword") + print(" database = lightrag") + elif storage_name == "MongoKVStorage": + print(" [mongodb]") + print(" uri = mongodb://root:root@localhost:27017/") + print(" database = LightRAG") + + return None, None, None, 0 + + # Count cache records efficiently + print(f"\n{'Counting' if use_streaming else 'Loading'} cache records...") try: - cache_data = await self.get_default_caches(storage, storage_name) - counts = await self.count_cache_types(cache_data) - - print(f"- default:extract: {counts['extract']:,} records") - print(f"- default:summary: {counts['summary']:,} records") - print(f"- Total: {len(cache_data):,} records") + if use_streaming: + # Use efficient counting without loading data + total_count = await self.count_default_caches(storage, storage_name) + print(f"- Total: {total_count:,} records") + else: + # Legacy mode: load all data + cache_data = await self.get_default_caches(storage, storage_name) + counts = await self.count_cache_types(cache_data) + total_count = len(cache_data) + + print(f"- default:extract: {counts['extract']:,} records") + print(f"- default:summary: {counts['summary']:,} records") + print(f"- Total: {total_count:,} records") except Exception as e: - print(f"✗ Counting failed: {e}") - return None, None, None, None - - return storage, storage_name, workspace, cache_data + print(f"✗ {'Counting' if use_streaming else 'Loading'} failed: {e}") + return None, None, None, 0 + + return storage, storage_name, workspace, total_count async def migrate_caches( - self, - source_data: Dict[str, Any], - target_storage, - target_storage_name: str + self, source_data: Dict[str, Any], target_storage, target_storage_name: str ) -> MigrationStats: - """Migrate caches in batches with error tracking - + """Migrate caches in batches with error tracking (Legacy mode - loads all data) + Args: source_data: Source cache data target_storage: Target storage instance target_storage_name: Target storage type name - + Returns: MigrationStats object with migration results and errors """ stats = MigrationStats() stats.total_source_records = len(source_data) - + if stats.total_source_records == 0: print("\nNo records to migrate") return stats - + # Convert to list for batching items = list(source_data.items()) - stats.total_batches = (stats.total_source_records + self.batch_size - 1) // self.batch_size - + stats.total_batches = ( + stats.total_source_records + self.batch_size - 1 + ) // self.batch_size + print("\n=== Starting Migration ===") - + for batch_idx in range(stats.total_batches): start_idx = batch_idx * self.batch_size end_idx = min((batch_idx + 1) * self.batch_size, stats.total_source_records) batch_items = items[start_idx:end_idx] batch_data = dict(batch_items) - + # Determine current cache type for display current_key = batch_items[0][0] cache_type = "extract" if "extract" in current_key else "summary" - + try: # Attempt to write batch await target_storage.upsert(batch_data) - + # Success - update stats stats.successful_batches += 1 stats.successful_records += len(batch_data) - + # Calculate progress progress = (end_idx / stats.total_source_records) * 100 bar_length = 20 filled_length = int(bar_length * end_idx // stats.total_source_records) bar = "█" * filled_length + "░" * 
(bar_length - filled_length) - - print(f"Batch {batch_idx + 1}/{stats.total_batches}: {bar} " - f"{end_idx:,}/{stats.total_source_records:,} ({progress:.0f}%) - " - f"default:{cache_type} ✓") - + + print( + f"Batch {batch_idx + 1}/{stats.total_batches}: {bar} " + f"{end_idx:,}/{stats.total_source_records:,} ({progress:.0f}%) - " + f"default:{cache_type} ✓" + ) + except Exception as e: # Error - record and continue stats.add_error(batch_idx + 1, e, len(batch_data)) - - print(f"Batch {batch_idx + 1}/{stats.total_batches}: ✗ FAILED - " - f"{type(e).__name__}: {str(e)}") - + + print( + f"Batch {batch_idx + 1}/{stats.total_batches}: ✗ FAILED - " + f"{type(e).__name__}: {str(e)}" + ) + # Final persist print("\nPersisting data to disk...") try: @@ -562,19 +1162,111 @@ class MigrationTool: except Exception as e: print(f"✗ Persist failed: {e}") stats.add_error(0, e, 0) # batch 0 = persist error - + + return stats + + async def migrate_caches_streaming( + self, + source_storage, + source_storage_name: str, + target_storage, + target_storage_name: str, + total_records: int, + ) -> MigrationStats: + """Migrate caches using streaming approach - minimal memory footprint + + Args: + source_storage: Source storage instance + source_storage_name: Source storage type name + target_storage: Target storage instance + target_storage_name: Target storage type name + total_records: Total number of records to migrate + + Returns: + MigrationStats object with migration results and errors + """ + stats = MigrationStats() + stats.total_source_records = total_records + + if stats.total_source_records == 0: + print("\nNo records to migrate") + return stats + + # Calculate total batches + stats.total_batches = (total_records + self.batch_size - 1) // self.batch_size + + print("\n=== Starting Streaming Migration ===") + print( + f"💡 Memory-optimized mode: Processing {self.batch_size:,} records at a time\n" + ) + + batch_idx = 0 + + # Stream batches from source and write to target immediately + async for batch in self.stream_default_caches( + source_storage, source_storage_name + ): + batch_idx += 1 + + # Determine current cache type for display + if batch: + first_key = next(iter(batch.keys())) + cache_type = "extract" if "extract" in first_key else "summary" + else: + cache_type = "unknown" + + try: + # Write batch to target storage + await target_storage.upsert(batch) + + # Success - update stats + stats.successful_batches += 1 + stats.successful_records += len(batch) + + # Calculate progress with known total + progress = (stats.successful_records / total_records) * 100 + bar_length = 20 + filled_length = int( + bar_length * stats.successful_records // total_records + ) + bar = "█" * filled_length + "░" * (bar_length - filled_length) + + print( + f"Batch {batch_idx}/{stats.total_batches}: {bar} " + f"{stats.successful_records:,}/{total_records:,} ({progress:.1f}%) - " + f"default:{cache_type} ✓" + ) + + except Exception as e: + # Error - record and continue + stats.add_error(batch_idx, e, len(batch)) + + print( + f"Batch {batch_idx}/{stats.total_batches}: ✗ FAILED - " + f"{type(e).__name__}: {str(e)}" + ) + + # Final persist + print("\nPersisting data to disk...") + try: + await target_storage.index_done_callback() + print("✓ Data persisted successfully") + except Exception as e: + print(f"✗ Persist failed: {e}") + stats.add_error(0, e, 0) # batch 0 = persist error + return stats def print_migration_report(self, stats: MigrationStats): """Print comprehensive migration report - + Args: stats: MigrationStats object with 
migration results """ print("\n" + "=" * 60) print("Migration Complete - Final Report") print("=" * 60) - + # Overall statistics print("\n📊 Statistics:") print(f" Total source records: {stats.total_source_records:,}") @@ -583,37 +1275,41 @@ class MigrationTool: print(f" Failed batches: {stats.failed_batches:,}") print(f" Successfully migrated: {stats.successful_records:,}") print(f" Failed to migrate: {stats.failed_records:,}") - + # Success rate - success_rate = (stats.successful_records / stats.total_source_records * 100) if stats.total_source_records > 0 else 0 + success_rate = ( + (stats.successful_records / stats.total_source_records * 100) + if stats.total_source_records > 0 + else 0 + ) print(f" Success rate: {success_rate:.2f}%") - + # Error details if stats.errors: print(f"\n⚠️ Errors encountered: {len(stats.errors)}") print("\nError Details:") print("-" * 60) - + # Group errors by type error_types = {} for error in stats.errors: - err_type = error['error_type'] + err_type = error["error_type"] error_types[err_type] = error_types.get(err_type, 0) + 1 - + print("\nError Summary:") for err_type, count in sorted(error_types.items(), key=lambda x: -x[1]): print(f" - {err_type}: {count} occurrence(s)") - + print("\nFirst 5 errors:") for i, error in enumerate(stats.errors[:5], 1): print(f"\n {i}. Batch {error['batch']}") print(f" Type: {error['error_type']}") print(f" Message: {error['error_msg']}") print(f" Records lost: {error['records_lost']:,}") - + if len(stats.errors) > 5: print(f"\n ... and {len(stats.errors) - 5} more errors") - + print("\n" + "=" * 60) print("⚠️ WARNING: Migration completed with errors!") print(" Please review the error details above.") @@ -624,84 +1320,125 @@ class MigrationTool: print("=" * 60) async def run(self): - """Run the migration tool""" + """Run the migration tool with streaming approach and early validation""" try: # Initialize shared storage (REQUIRED for storage classes to work) from lightrag.kg.shared_storage import initialize_share_data + initialize_share_data(workers=1) - + # Print header self.print_header() - self.print_storage_types() - - # Setup source storage + + # Setup source storage with streaming (only count, don't load all data) ( self.source_storage, source_storage_name, self.source_workspace, - source_data - ) = await self.setup_storage("Source") - - if not self.source_storage: - print("\n✗ Source storage setup failed") + source_count, + ) = await self.setup_storage("Source", use_streaming=True) + + # Check if user cancelled (setup_storage returns None for all fields) + if self.source_storage is None: return - - if not source_data: - print("\n⚠ Source storage has no cache records to migrate") + + # Check if there are at least 2 storage types available + available_count = self.count_available_storage_types() + if available_count <= 1: + print("\n" + "=" * 60) + print("⚠️ Warning: Migration Not Possible") + print("=" * 60) + print(f"Only {available_count} storage type(s) available.") + print("Migration requires at least 2 different storage types.") + print("\nTo enable migration, configure additional storage:") + print(" 1. Set environment variables, OR") + print(" 2. 
Update config.ini file") + print("\nSupported storage types:") + for name in STORAGE_TYPES.values(): + if name != source_storage_name: + print(f" - {name}") + if name in STORAGE_ENV_REQUIREMENTS: + for var in STORAGE_ENV_REQUIREMENTS[name]: + print(f" Required: {var}") + print("=" * 60) + # Cleanup await self.source_storage.finalize() return - - # Setup target storage + + if source_count == 0: + print("\n⚠️ Source storage has no cache records to migrate") + # Cleanup + await self.source_storage.finalize() + return + + # Setup target storage with streaming (only count, don't load all data) + # Exclude source storage type from target selection ( self.target_storage, target_storage_name, self.target_workspace, - target_data - ) = await self.setup_storage("Target") - + target_count, + ) = await self.setup_storage( + "Target", use_streaming=True, exclude_storage_name=source_storage_name + ) + if not self.target_storage: print("\n✗ Target storage setup failed") # Cleanup source await self.source_storage.finalize() return - + # Show migration summary print("\n" + "=" * 50) print("Migration Confirmation") print("=" * 50) - print(f"Source: {source_storage_name} (workspace: {self.source_workspace if self.source_workspace else '(default)'}) - {len(source_data):,} records") - print(f"Target: {target_storage_name} (workspace: {self.target_workspace if self.target_workspace else '(default)'}) - {len(target_data):,} records") + print( + f"Source: {self.format_storage_name(source_storage_name)} (workspace: {self.format_workspace(self.source_workspace)}) - {source_count:,} records" + ) + print( + f"Target: {self.format_storage_name(target_storage_name)} (workspace: {self.format_workspace(self.target_workspace)}) - {target_count:,} records" + ) print(f"Batch Size: {self.batch_size:,} records/batch") - - if target_data: - print(f"\n⚠ Warning: Target storage already has {len(target_data):,} records") + print("Memory Mode: Streaming (memory-optimized)") + + if target_count > 0: + print( + f"\n⚠️ Warning: Target storage already has {target_count:,} records" + ) print("Migration will overwrite records with the same keys") - + # Confirm migration confirm = input("\nContinue? (y/n): ").strip().lower() - if confirm != 'y': + if confirm != "y": print("\n✗ Migration cancelled") # Cleanup await self.source_storage.finalize() await self.target_storage.finalize() return - - # Perform migration with error tracking - stats = await self.migrate_caches(source_data, self.target_storage, target_storage_name) - + + # Perform streaming migration with error tracking + stats = await self.migrate_caches_streaming( + self.source_storage, + source_storage_name, + self.target_storage, + target_storage_name, + source_count, + ) + # Print comprehensive migration report self.print_migration_report(stats) - + # Cleanup await self.source_storage.finalize() await self.target_storage.finalize() - + except KeyboardInterrupt: print("\n\n✗ Migration interrupted by user") except Exception as e: print(f"\n✗ Migration failed: {e}") import traceback + traceback.print_exc() finally: # Ensure cleanup @@ -715,10 +1452,11 @@ class MigrationTool: await self.target_storage.finalize() except Exception: pass - + # Finalize shared storage try: from lightrag.kg.shared_storage import finalize_share_data + finalize_share_data() except Exception: pass