Add LLM cache migration tool for KV storage backends

- Supports JSON/Redis/PostgreSQL/MongoDB
- Batch migration with error tracking
- Workspace-aware data transfer
- Memory-efficient pagination
- Comprehensive migration reporting
This commit is contained in:
parent cf732dbfc6
commit 55274dde59

2 changed files with 1122 additions and 0 deletions

401 tools/README_MIGRATE_LLM_CACHE.md (new file)

@@ -0,0 +1,401 @@
# LLM Cache Migration Tool - User Guide

## Overview

This tool migrates LightRAG's LLM response cache between different KV storage implementations. It specifically migrates caches generated during file extraction (mode `default`), including entity extraction and summary caches.

## Supported Storage Types

1. **JsonKVStorage** - File-based JSON storage
2. **RedisKVStorage** - Redis database storage
3. **PGKVStorage** - PostgreSQL database storage
4. **MongoKVStorage** - MongoDB database storage

## Cache Types

The tool migrates the following cache types:

- `default:extract:*` - Entity and relationship extraction caches
- `default:summary:*` - Entity and relationship summary caches

**Note**: Query caches (modes like `local`, `global`, etc.) are NOT migrated.
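As an illustration (not part of the tool itself), the prefix rule above can be expressed as a one-line check:

```python
# Illustrative helper (not in the tool): True for keys the migration
# transfers, False for query caches and anything else.
def is_migratable(key: str) -> bool:
    return key.startswith(("default:extract:", "default:summary:"))

print(is_migratable("default:extract:abc123"))  # True
print(is_migratable("local:query:xyz"))         # False
```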
## Prerequisites

### 1. Environment Variable Configuration

Ensure the relevant storage environment variables are configured in your `.env` file:

#### Workspace Configuration (Optional)

```bash
# Generic workspace (shared by all storages)
WORKSPACE=space1

# Or configure an independent workspace for a specific storage
POSTGRES_WORKSPACE=pg_space
MONGODB_WORKSPACE=mongo_space
REDIS_WORKSPACE=redis_space
```

**Workspace Priority**: Storage-specific > Generic `WORKSPACE` > Empty string
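A minimal sketch of that priority order (mirroring the tool's mapping; `JsonKVStorage` has no storage-specific variable):

```python
import os

# Storage-specific env var names, as used by the tool
WORKSPACE_ENV_MAP = {
    "PGKVStorage": "POSTGRES_WORKSPACE",
    "MongoKVStorage": "MONGODB_WORKSPACE",
    "RedisKVStorage": "REDIS_WORKSPACE",
}

def resolve_workspace(storage_name: str) -> str:
    # Storage-specific variable wins, then generic WORKSPACE, then ""
    specific = os.getenv(WORKSPACE_ENV_MAP.get(storage_name, ""), "")
    return specific or os.getenv("WORKSPACE", "")

os.environ["WORKSPACE"] = "space1"
os.environ["POSTGRES_WORKSPACE"] = "pg_space"
print(resolve_workspace("PGKVStorage"))    # pg_space (storage-specific wins)
print(resolve_workspace("JsonKVStorage"))  # space1 (falls back to WORKSPACE)
```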
#### JsonKVStorage

```bash
WORKING_DIR=./rag_storage
```

#### RedisKVStorage

```bash
REDIS_URI=redis://localhost:6379
```

#### PGKVStorage

```bash
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_USER=your_username
POSTGRES_PASSWORD=your_password
POSTGRES_DATABASE=your_database
```

#### MongoKVStorage

```bash
MONGO_URI=mongodb://root:root@localhost:27017/
MONGO_DATABASE=LightRAG
```

### 2. Install Dependencies

Ensure LightRAG and its dependencies are installed:

```bash
pip install -r requirements.txt
```

## Usage

### Basic Usage

Run from the LightRAG project root directory:

```bash
python tools/migrate_llm_cache.py
```
### Interactive Workflow

The tool guides you through the following steps:

#### 1. Select Source Storage Type

```
Supported KV Storage Types:
[1] JsonKVStorage
[2] RedisKVStorage
[3] PGKVStorage
[4] MongoKVStorage

Select Source storage type (1-4): 1
```

#### 2. Source Storage Validation

The tool will:

- Check required environment variables
- Auto-detect workspace configuration
- Initialize and connect to storage
- Count cache records available for migration

```
Checking environment variables...
✓ All required environment variables are set

Initializing Source storage...
- Storage Type: JsonKVStorage
- Workspace: space1
- Connection Status: ✓ Success

Counting cache records...
- default:extract: 8,500 records
- default:summary: 234 records
- Total: 8,734 records
```

#### 3. Select Target Storage Type

Repeat steps 1-2 to select and validate the target storage.

#### 4. Confirm Migration

Review the migration summary and confirm:

```
Migration Confirmation
==================================================
Source: JsonKVStorage (workspace: space1) - 8,734 records
Target: MongoKVStorage (workspace: space1) - 0 records
Batch Size: 1,000 records/batch

Migration will overwrite records with the same keys

Continue? (y/n): y
```

(A warning line is shown here only when the target storage already contains records.)
#### 5. Execute Migration

Observe migration progress:

```
=== Starting Migration ===
Batch 1/9: ████████░░ 1000/8734 (11%) - default:extract
Batch 2/9: ████████████████░░ 2000/8734 (23%) - default:extract
...
Batch 9/9: ████████████████████ 8734/8734 (100%) - default:summary

Persisting data to disk...
```

#### 6. Review Migration Report

The tool provides a comprehensive final report showing statistics and any errors encountered:

**Successful Migration:**

```
Migration Complete - Final Report

📊 Statistics:
  Total source records: 8,734
  Total batches: 9
  Successful batches: 9
  Failed batches: 0
  Successfully migrated: 8,734
  Failed to migrate: 0
  Success rate: 100.00%

✓ SUCCESS: All records migrated successfully!
```

**Migration with Errors:**

```
Migration Complete - Final Report

📊 Statistics:
  Total source records: 8,734
  Total batches: 9
  Successful batches: 8
  Failed batches: 1
  Successfully migrated: 7,734
  Failed to migrate: 1,000
  Success rate: 88.55%

⚠️ Errors encountered: 1

Error Details:
------------------------------------------------------------

Error Summary:
  - ConnectionError: 1 occurrence(s)

First 5 errors:

  1. Batch 2
     Type: ConnectionError
     Message: Connection timeout after 30s
     Records lost: 1,000

⚠️ WARNING: Migration completed with errors!
Please review the error details above.
```
## Technical Details

### Workspace Handling

The tool retrieves the workspace in the following priority order:

1. **Storage-specific workspace environment variables**
   - PGKVStorage: `POSTGRES_WORKSPACE`
   - MongoKVStorage: `MONGODB_WORKSPACE`
   - RedisKVStorage: `REDIS_WORKSPACE`

2. **Generic workspace environment variable**
   - `WORKSPACE`

3. **Default value**
   - Empty string (uses the storage's default workspace)

### Batch Migration

- Default batch size: 1000 records/batch
- Avoids memory overflow from loading too much data at once
- Each batch is committed independently, so an interrupted run leaves earlier batches in place
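The batch count follows from ceiling division; a quick sketch using the numbers from the sample run above:

```python
# Ceiling division used to compute the number of batches
total_records = 8734
batch_size = 1000
total_batches = (total_records + batch_size - 1) // batch_size
print(total_batches)  # 9

# The last batch holds the remainder
last_batch_size = total_records - (total_batches - 1) * batch_size
print(last_batch_size)  # 734
```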
### Memory-Efficient Pagination

For large datasets, the tool implements storage-specific pagination strategies:

- **JsonKVStorage**: Direct in-memory access (data already loaded in shared storage)
- **RedisKVStorage**: Cursor-based SCAN with pipeline batching (1000 keys/batch)
- **PGKVStorage**: SQL LIMIT/OFFSET pagination (1000 records/batch)
- **MongoKVStorage**: Cursor streaming with batch_size (1000 documents/batch)

This ensures the tool can handle millions of cache records without memory issues.
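Independent of any particular driver, the LIMIT/OFFSET strategy boils down to a loop like this (a sketch, not the tool's exact code; `fetch_page` is a hypothetical stand-in for a real database query):

```python
# Generic LIMIT/OFFSET pagination sketch
def paginate(fetch_page, batch_size=1000):
    offset = 0
    results = []
    while True:
        page = fetch_page(batch_size, offset)
        if not page:
            break
        results.extend(page)
        if len(page) < batch_size:  # a short page means we reached the end
            break
        offset += batch_size
    return results

# Simulate a table of 2,500 rows with a list slice
rows = list(range(2500))
fetched = paginate(lambda limit, off: rows[off:off + limit], batch_size=1000)
print(len(fetched))  # 2500
```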
### Prefix Filtering Implementation

The tool uses optimized filtering methods for different storage types:

- **JsonKVStorage**: Direct dictionary iteration with lock protection
- **RedisKVStorage**: SCAN command with namespace-prefixed patterns + pipeline for bulk GET
- **PGKVStorage**: SQL LIKE queries with proper field mapping (id, return_value, etc.)
- **MongoKVStorage**: MongoDB regex queries on the `_id` field with cursor streaming

## Error Handling & Resilience

The tool implements comprehensive error tracking to ensure transparent and resilient migrations:

### Batch-Level Error Tracking

- Each batch is independently error-checked
- Failed batches are logged but don't stop the migration
- Successful batches are committed even if later batches fail
- Real-time progress shows ✓ (success) or ✗ (failed) for each batch

### Error Reporting

After migration completes, a detailed report includes:

- **Statistics**: Total records, success/failure counts, success rate
- **Error Summary**: Grouped by error type with occurrence counts
- **Error Details**: Batch number, error type, message, and records lost
- **Recommendations**: Clear indication of success or need for review
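Grouping errors by type for the summary is a straightforward tally; a minimal sketch over hypothetical error records shaped like the tool's stats entries:

```python
from collections import Counter

# Hypothetical error records (illustration only)
errors = [
    {"batch": 2, "error_type": "ConnectionError", "records_lost": 1000},
    {"batch": 5, "error_type": "ConnectionError", "records_lost": 1000},
    {"batch": 7, "error_type": "TimeoutError", "records_lost": 1000},
]

# Tally occurrences per error type, most frequent first
summary = Counter(e["error_type"] for e in errors)
for err_type, count in summary.most_common():
    print(f"  - {err_type}: {count} occurrence(s)")
```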
### No Double Data Loading

- Unlike traditional verification approaches, the tool does NOT reload all target data
- Errors are detected during migration, not after
- This eliminates memory overhead and handles pre-existing target data correctly

## Important Notes

1. **Data Overwrite Warning**
   - Migration will overwrite records with the same keys in the target storage
   - The tool displays a warning if the target storage already has data
   - Pre-existing data in the target storage is handled correctly

2. **Workspace Consistency**
   - It is recommended to use the same workspace for source and target
   - Cache data in different workspaces is completely isolated

3. **Interrupt and Resume**
   - Migration can be interrupted at any time (Ctrl+C)
   - Already migrated data will remain in the target storage
   - Re-running will overwrite existing records
   - Failed batches can be manually retried

4. **Performance Considerations**
   - Large data migration may take considerable time
   - Migrate during off-peak hours where possible
   - Ensure a stable network connection (for remote databases)
   - Memory usage stays constant regardless of dataset size

## Troubleshooting

### Missing Environment Variables

```
✗ Missing required environment variables: POSTGRES_USER, POSTGRES_PASSWORD
```

**Solution**: Add the missing variables to your `.env` file

### Connection Failed

```
✗ Initialization failed: Connection refused
```

**Solutions**:
- Check if the database service is running
- Verify connection parameters (host, port, credentials)
- Check firewall settings

### Migration Errors

**Solutions**:
- Check the migration output for error logs
- Re-run the migration tool
- Check target storage capacity and permissions
## Example Scenarios

### Scenario 1: JSON to MongoDB Migration

Use case: Migrating from single-machine development to production

```bash
# 1. Configure environment variables
WORKSPACE=production
MONGO_URI=mongodb://user:pass@prod-server:27017/
MONGO_DATABASE=LightRAG

# 2. Run tool
python tools/migrate_llm_cache.py

# 3. Select: 1 (JsonKVStorage) -> 4 (MongoKVStorage)
```

### Scenario 2: PostgreSQL Database Switch

Use case: Database migration or upgrade

```bash
# 1. Configure old and new databases
POSTGRES_WORKSPACE=old_db  # Source
# ... Configure new database as default

# 2. Run tool and select the same storage type
```

### Scenario 3: Redis to PostgreSQL

Use case: Migrating from cache storage to a relational database

```bash
# 1. Ensure both databases are accessible
REDIS_URI=redis://old-redis:6379
POSTGRES_HOST=new-postgres-server
# ... Other PostgreSQL configs

# 2. Run tool
python tools/migrate_llm_cache.py

# 3. Select: 2 (RedisKVStorage) -> 3 (PGKVStorage)
```

## Tool Limitations

1. **Only Default Mode Caches**
   - Only migrates `default:extract:*` and `default:summary:*`
   - Query caches are not included

2. **Workspace Isolation**
   - Different workspaces are treated as completely separate
   - Cross-workspace migration requires manual workspace reconfiguration

3. **Network Dependency**
   - The tool requires a stable network connection for remote databases
   - Large datasets may fail if the connection is interrupted

## Best Practices

1. **Backup Before Migration**
   - Always back up your data before migration
   - Test migration on non-production data first

2. **Verify Results**
   - Check the final migration report after migration
   - Manually verify a few cache entries if needed

3. **Monitor Performance**
   - Watch database resource usage during migration
   - Consider migrating in smaller batches if needed

4. **Clean Old Data**
   - After a successful migration, consider cleaning old cache data
   - Keep backups for a reasonable period before deletion

## Support

For issues or questions:

- Check the LightRAG documentation
- Review error logs for detailed information
- Ensure all environment variables are correctly configured
721 tools/migrate_llm_cache.py (new file)

@@ -0,0 +1,721 @@
#!/usr/bin/env python3
"""
LLM Cache Migration Tool for LightRAG

This tool migrates LLM response cache (default:extract:* and default:summary:*)
between different KV storage implementations while preserving workspace isolation.

Usage:
    python tools/migrate_llm_cache.py

Supported KV Storage Types:
    - JsonKVStorage
    - RedisKVStorage
    - PGKVStorage
    - MongoKVStorage
"""

import asyncio
import os
import sys
import time
from typing import Any, Dict, List
from dataclasses import dataclass, field
from dotenv import load_dotenv

# Add parent directory to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from lightrag.kg import STORAGE_ENV_REQUIREMENTS
from lightrag.namespace import NameSpace
from lightrag.utils import setup_logger

# Load environment variables
load_dotenv(dotenv_path=".env", override=False)

# Setup logger
setup_logger("lightrag", level="INFO")

# Storage type configurations
STORAGE_TYPES = {
    "1": "JsonKVStorage",
    "2": "RedisKVStorage",
    "3": "PGKVStorage",
    "4": "MongoKVStorage",
}

# Workspace environment variable mapping
WORKSPACE_ENV_MAP = {
    "PGKVStorage": "POSTGRES_WORKSPACE",
    "MongoKVStorage": "MONGODB_WORKSPACE",
    "RedisKVStorage": "REDIS_WORKSPACE",
}

# Default batch size for migration
DEFAULT_BATCH_SIZE = 1000

@dataclass
class MigrationStats:
    """Migration statistics and error tracking"""

    total_source_records: int = 0
    total_batches: int = 0
    successful_batches: int = 0
    failed_batches: int = 0
    successful_records: int = 0
    failed_records: int = 0
    errors: List[Dict[str, Any]] = field(default_factory=list)

    def add_error(self, batch_idx: int, error: Exception, batch_size: int):
        """Record batch error"""
        self.errors.append({
            'batch': batch_idx,
            'error_type': type(error).__name__,
            'error_msg': str(error),
            'records_lost': batch_size,
            'timestamp': time.time()
        })
        self.failed_batches += 1
        self.failed_records += batch_size

class MigrationTool:
    """LLM Cache Migration Tool"""

    def __init__(self):
        self.source_storage = None
        self.target_storage = None
        self.source_workspace = ""
        self.target_workspace = ""
        self.batch_size = DEFAULT_BATCH_SIZE

    def get_workspace_for_storage(self, storage_name: str) -> str:
        """Get workspace for a specific storage type

        Priority: Storage-specific env var > WORKSPACE env var > empty string

        Args:
            storage_name: Storage implementation name

        Returns:
            Workspace name
        """
        # Check storage-specific workspace
        if storage_name in WORKSPACE_ENV_MAP:
            specific_workspace = os.getenv(WORKSPACE_ENV_MAP[storage_name])
            if specific_workspace:
                return specific_workspace

        # Check generic WORKSPACE
        workspace = os.getenv("WORKSPACE", "")
        return workspace

    def check_env_vars(self, storage_name: str) -> bool:
        """Check if all required environment variables exist

        Args:
            storage_name: Storage implementation name

        Returns:
            True if all required env vars exist, False otherwise
        """
        required_vars = STORAGE_ENV_REQUIREMENTS.get(storage_name, [])
        missing_vars = [var for var in required_vars if var not in os.environ]

        if missing_vars:
            print(f"✗ Missing required environment variables: {', '.join(missing_vars)}")
            return False

        print("✓ All required environment variables are set")
        return True

    def get_storage_class(self, storage_name: str):
        """Dynamically import and return storage class

        Args:
            storage_name: Storage implementation name

        Returns:
            Storage class
        """
        if storage_name == "JsonKVStorage":
            from lightrag.kg.json_kv_impl import JsonKVStorage
            return JsonKVStorage
        elif storage_name == "RedisKVStorage":
            from lightrag.kg.redis_impl import RedisKVStorage
            return RedisKVStorage
        elif storage_name == "PGKVStorage":
            from lightrag.kg.postgres_impl import PGKVStorage
            return PGKVStorage
        elif storage_name == "MongoKVStorage":
            from lightrag.kg.mongo_impl import MongoKVStorage
            return MongoKVStorage
        else:
            raise ValueError(f"Unsupported storage type: {storage_name}")

    async def initialize_storage(self, storage_name: str, workspace: str):
        """Initialize storage instance

        Args:
            storage_name: Storage implementation name
            workspace: Workspace name

        Returns:
            Initialized storage instance
        """
        storage_class = self.get_storage_class(storage_name)

        # Create global config
        global_config = {
            "working_dir": os.getenv("WORKING_DIR", "./rag_storage"),
            "embedding_batch_num": 10,
        }

        # Initialize storage
        storage = storage_class(
            namespace=NameSpace.KV_STORE_LLM_RESPONSE_CACHE,
            workspace=workspace,
            global_config=global_config,
            embedding_func=None,
        )

        # Initialize the storage
        await storage.initialize()

        return storage

    async def get_default_caches_json(self, storage) -> Dict[str, Any]:
        """Get default caches from JsonKVStorage

        Args:
            storage: JsonKVStorage instance

        Returns:
            Dictionary of cache entries with default:extract:* or default:summary:* keys
        """
        # Access _data directly - it's a dict from shared_storage
        async with storage._storage_lock:
            filtered = {}
            for key, value in storage._data.items():
                if key.startswith("default:extract:") or key.startswith("default:summary:"):
                    filtered[key] = value
            return filtered

    async def get_default_caches_redis(self, storage, batch_size: int = 1000) -> Dict[str, Any]:
        """Get default caches from RedisKVStorage with pagination

        Args:
            storage: RedisKVStorage instance
            batch_size: Number of keys to process per batch

        Returns:
            Dictionary of cache entries with default:extract:* or default:summary:* keys
        """
        import json

        cache_data = {}

        # Use _get_redis_connection() context manager
        async with storage._get_redis_connection() as redis:
            for pattern in ["default:extract:*", "default:summary:*"]:
                # Add namespace prefix to pattern
                prefixed_pattern = f"{storage.final_namespace}:{pattern}"
                cursor = 0

                while True:
                    # SCAN already implements cursor-based pagination
                    cursor, keys = await redis.scan(
                        cursor,
                        match=prefixed_pattern,
                        count=batch_size
                    )

                    if keys:
                        # Process this batch using pipeline with error handling
                        try:
                            pipe = redis.pipeline()
                            for key in keys:
                                pipe.get(key)
                            values = await pipe.execute()

                            for key, value in zip(keys, values):
                                if value:
                                    key_str = key.decode() if isinstance(key, bytes) else key
                                    # Remove namespace prefix to get original key
                                    original_key = key_str.replace(f"{storage.final_namespace}:", "", 1)
                                    cache_data[original_key] = json.loads(value)

                        except Exception as e:
                            # Pipeline execution failed, fall back to individual gets
                            print(f"⚠️ Pipeline execution failed for batch, using individual gets: {e}")
                            for key in keys:
                                try:
                                    value = await redis.get(key)
                                    if value:
                                        key_str = key.decode() if isinstance(key, bytes) else key
                                        original_key = key_str.replace(f"{storage.final_namespace}:", "", 1)
                                        cache_data[original_key] = json.loads(value)
                                except Exception as individual_error:
                                    print(f"⚠️ Failed to get individual key {key}: {individual_error}")
                                    continue

                    if cursor == 0:
                        break

                    # Yield control periodically to avoid blocking
                    await asyncio.sleep(0)

        return cache_data

    async def get_default_caches_pg(self, storage, batch_size: int = 1000) -> Dict[str, Any]:
        """Get default caches from PGKVStorage with pagination

        Args:
            storage: PGKVStorage instance
            batch_size: Number of records to fetch per batch

        Returns:
            Dictionary of cache entries with default:extract:* or default:summary:* keys
        """
        from lightrag.kg.postgres_impl import namespace_to_table_name

        cache_data = {}
        table_name = namespace_to_table_name(storage.namespace)
        offset = 0

        while True:
            # Use LIMIT and OFFSET for pagination
            query = f"""
                SELECT id as key, original_prompt, return_value, chunk_id, cache_type, queryparam,
                       EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
                       EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
                FROM {table_name}
                WHERE workspace = $1
                  AND (id LIKE 'default:extract:%' OR id LIKE 'default:summary:%')
                ORDER BY id
                LIMIT $2 OFFSET $3
            """

            results = await storage.db.query(
                query,
                [storage.workspace, batch_size, offset],
                multirows=True
            )

            if not results:
                break

            for row in results:
                # Map PostgreSQL fields to cache format
                cache_entry = {
                    "return": row.get("return_value", ""),
                    "cache_type": row.get("cache_type"),
                    "original_prompt": row.get("original_prompt", ""),
                    "chunk_id": row.get("chunk_id"),
                    "queryparam": row.get("queryparam"),
                    "create_time": row.get("create_time", 0),
                    "update_time": row.get("update_time", 0),
                }
                cache_data[row["key"]] = cache_entry

            # If we got fewer results than batch_size, we're done
            if len(results) < batch_size:
                break

            offset += batch_size

            # Yield control periodically
            await asyncio.sleep(0)

        return cache_data

    async def get_default_caches_mongo(self, storage, batch_size: int = 1000) -> Dict[str, Any]:
        """Get default caches from MongoKVStorage with cursor-based pagination

        Args:
            storage: MongoKVStorage instance
            batch_size: Number of documents to process per batch

        Returns:
            Dictionary of cache entries with default:extract:* or default:summary:* keys
        """
        cache_data = {}

        # MongoDB query with regex - use _data not collection
        query = {"_id": {"$regex": "^default:(extract|summary):"}}

        # Use cursor without to_list() - process in batches
        cursor = storage._data.find(query).batch_size(batch_size)

        async for doc in cursor:
            # Process each document as it comes
            doc_copy = doc.copy()
            key = doc_copy.pop("_id")

            # Filter ALL MongoDB/database-specific fields
            # Following .clinerules: "Always filter deprecated/incompatible fields during deserialization"
            for field_name in ["namespace", "workspace", "_id", "content"]:
                doc_copy.pop(field_name, None)

            cache_data[key] = doc_copy

            # Periodically yield control (every batch_size documents)
            if len(cache_data) % batch_size == 0:
                await asyncio.sleep(0)

        return cache_data

    async def get_default_caches(self, storage, storage_name: str) -> Dict[str, Any]:
        """Get default caches from any storage type

        Args:
            storage: Storage instance
            storage_name: Storage type name

        Returns:
            Dictionary of cache entries
        """
        if storage_name == "JsonKVStorage":
            return await self.get_default_caches_json(storage)
        elif storage_name == "RedisKVStorage":
            return await self.get_default_caches_redis(storage)
        elif storage_name == "PGKVStorage":
            return await self.get_default_caches_pg(storage)
        elif storage_name == "MongoKVStorage":
            return await self.get_default_caches_mongo(storage)
        else:
            raise ValueError(f"Unsupported storage type: {storage_name}")

    async def count_cache_types(self, cache_data: Dict[str, Any]) -> Dict[str, int]:
        """Count cache entries by type

        Args:
            cache_data: Dictionary of cache entries

        Returns:
            Dictionary with counts for each cache type
        """
        counts = {
            "extract": 0,
            "summary": 0,
        }

        for key in cache_data.keys():
            if key.startswith("default:extract:"):
                counts["extract"] += 1
            elif key.startswith("default:summary:"):
                counts["summary"] += 1

        return counts

    def print_header(self):
        """Print tool header"""
        print("\n" + "=" * 50)
        print("LLM Cache Migration Tool - LightRAG")
        print("=" * 50)

    def print_storage_types(self):
        """Print available storage types"""
        print("\nSupported KV Storage Types:")
        for key, value in STORAGE_TYPES.items():
            print(f"[{key}] {value}")

    def get_user_choice(self, prompt: str, valid_choices: list) -> str:
        """Get user choice with validation

        Args:
            prompt: Prompt message
            valid_choices: List of valid choices

        Returns:
            User's choice
        """
        while True:
            choice = input(f"\n{prompt}: ").strip()
            if choice in valid_choices:
                return choice
            print(f"✗ Invalid choice, please enter one of: {', '.join(valid_choices)}")

    async def setup_storage(self, storage_type: str) -> tuple:
        """Setup and initialize storage

        Args:
            storage_type: Type label (source/target)

        Returns:
            Tuple of (storage_instance, storage_name, workspace, cache_data)
        """
        print(f"\n=== {storage_type} Storage Setup ===")

        # Get storage type choice
        choice = self.get_user_choice(
            f"Select {storage_type} storage type (1-4)",
            list(STORAGE_TYPES.keys())
        )
        storage_name = STORAGE_TYPES[choice]

        # Check environment variables
        print("\nChecking environment variables...")
        if not self.check_env_vars(storage_name):
            return None, None, None, None

        # Get workspace
        workspace = self.get_workspace_for_storage(storage_name)

        # Initialize storage
        print(f"\nInitializing {storage_type} storage...")
        try:
            storage = await self.initialize_storage(storage_name, workspace)
            print(f"- Storage Type: {storage_name}")
            print(f"- Workspace: {workspace if workspace else '(default)'}")
            print("- Connection Status: ✓ Success")
        except Exception as e:
            print(f"✗ Initialization failed: {e}")
            return None, None, None, None

        # Get cache data
        print("\nCounting cache records...")
        try:
            cache_data = await self.get_default_caches(storage, storage_name)
            counts = await self.count_cache_types(cache_data)

            print(f"- default:extract: {counts['extract']:,} records")
            print(f"- default:summary: {counts['summary']:,} records")
            print(f"- Total: {len(cache_data):,} records")
        except Exception as e:
            print(f"✗ Counting failed: {e}")
            return None, None, None, None

        return storage, storage_name, workspace, cache_data

    async def migrate_caches(
        self,
        source_data: Dict[str, Any],
        target_storage,
        target_storage_name: str
    ) -> MigrationStats:
        """Migrate caches in batches with error tracking

        Args:
            source_data: Source cache data
            target_storage: Target storage instance
            target_storage_name: Target storage type name

        Returns:
            MigrationStats object with migration results and errors
        """
        stats = MigrationStats()
        stats.total_source_records = len(source_data)

        if stats.total_source_records == 0:
            print("\nNo records to migrate")
            return stats

        # Convert to list for batching
        items = list(source_data.items())
        stats.total_batches = (stats.total_source_records + self.batch_size - 1) // self.batch_size

        print("\n=== Starting Migration ===")

        for batch_idx in range(stats.total_batches):
            start_idx = batch_idx * self.batch_size
            end_idx = min((batch_idx + 1) * self.batch_size, stats.total_source_records)
            batch_items = items[start_idx:end_idx]
            batch_data = dict(batch_items)

            # Determine current cache type for display
            current_key = batch_items[0][0]
            cache_type = "extract" if "extract" in current_key else "summary"

            try:
                # Attempt to write batch
                await target_storage.upsert(batch_data)

                # Success - update stats
                stats.successful_batches += 1
                stats.successful_records += len(batch_data)

                # Calculate progress
                progress = (end_idx / stats.total_source_records) * 100
                bar_length = 20
                filled_length = int(bar_length * end_idx // stats.total_source_records)
                bar = "█" * filled_length + "░" * (bar_length - filled_length)

                print(f"Batch {batch_idx + 1}/{stats.total_batches}: {bar} "
                      f"{end_idx:,}/{stats.total_source_records:,} ({progress:.0f}%) - "
                      f"default:{cache_type} ✓")

            except Exception as e:
                # Error - record and continue
                stats.add_error(batch_idx + 1, e, len(batch_data))

                print(f"Batch {batch_idx + 1}/{stats.total_batches}: ✗ FAILED - "
                      f"{type(e).__name__}: {str(e)}")

        # Final persist
        print("\nPersisting data to disk...")
        try:
            await target_storage.index_done_callback()
            print("✓ Data persisted successfully")
        except Exception as e:
            print(f"✗ Persist failed: {e}")
            stats.add_error(0, e, 0)  # batch 0 = persist error

        return stats

    def print_migration_report(self, stats: MigrationStats):
        """Print comprehensive migration report

        Args:
            stats: MigrationStats object with migration results
        """
        print("\n" + "=" * 60)
        print("Migration Complete - Final Report")
        print("=" * 60)

        # Overall statistics
        print("\n📊 Statistics:")
        print(f"  Total source records: {stats.total_source_records:,}")
        print(f"  Total batches: {stats.total_batches:,}")
        print(f"  Successful batches: {stats.successful_batches:,}")
        print(f"  Failed batches: {stats.failed_batches:,}")
        print(f"  Successfully migrated: {stats.successful_records:,}")
        print(f"  Failed to migrate: {stats.failed_records:,}")

        # Success rate
        success_rate = (
            (stats.successful_records / stats.total_source_records * 100)
            if stats.total_source_records > 0
            else 0
        )
        print(f"  Success rate: {success_rate:.2f}%")

        # Error details
        if stats.errors:
            print(f"\n⚠️ Errors encountered: {len(stats.errors)}")
            print("\nError Details:")
            print("-" * 60)

            # Group errors by type
            error_types = {}
            for error in stats.errors:
                err_type = error["error_type"]
                error_types[err_type] = error_types.get(err_type, 0) + 1

            print("\nError Summary:")
            for err_type, count in sorted(error_types.items(), key=lambda x: -x[1]):
                print(f"  - {err_type}: {count} occurrence(s)")

            print("\nFirst 5 errors:")
            for i, error in enumerate(stats.errors[:5], 1):
                print(f"\n  {i}. Batch {error['batch']}")
                print(f"     Type: {error['error_type']}")
                print(f"     Message: {error['error_msg']}")
                print(f"     Records lost: {error['records_lost']:,}")

            if len(stats.errors) > 5:
                print(f"\n  ... and {len(stats.errors) - 5} more errors")

            print("\n" + "=" * 60)
            print("⚠️ WARNING: Migration completed with errors!")
            print("   Please review the error details above.")
            print("=" * 60)
        else:
            print("\n" + "=" * 60)
            print("✓ SUCCESS: All records migrated successfully!")
            print("=" * 60)

    async def run(self):
        """Run the migration tool"""
        try:
            # Print header
            self.print_header()
            self.print_storage_types()

            # Setup source storage
            (
                self.source_storage,
                source_storage_name,
                self.source_workspace,
                source_data,
            ) = await self.setup_storage("Source")

            if not self.source_storage:
                print("\n✗ Source storage setup failed")
                return

            if not source_data:
                print("\n⚠ Source storage has no cache records to migrate")
                # Cleanup
                await self.source_storage.finalize()
                return

            # Setup target storage
            (
                self.target_storage,
                target_storage_name,
                self.target_workspace,
                target_data,
            ) = await self.setup_storage("Target")

            if not self.target_storage:
                print("\n✗ Target storage setup failed")
                # Cleanup source
                await self.source_storage.finalize()
                return

            # Show migration summary
            print("\n" + "=" * 50)
            print("Migration Confirmation")
            print("=" * 50)
            print(
                f"Source: {source_storage_name} "
                f"(workspace: {self.source_workspace if self.source_workspace else '(default)'}) - "
                f"{len(source_data):,} records"
            )
            print(
                f"Target: {target_storage_name} "
                f"(workspace: {self.target_workspace if self.target_workspace else '(default)'}) - "
                f"{len(target_data):,} records"
            )
            print(f"Batch Size: {self.batch_size:,} records/batch")

            if target_data:
                print(
                    f"\n⚠ Warning: Target storage already has {len(target_data):,} records"
                )
                print("Migration will overwrite records with the same keys")

            # Confirm migration
            confirm = input("\nContinue? (y/n): ").strip().lower()
            if confirm != "y":
                print("\n✗ Migration cancelled")
                # Cleanup
                await self.source_storage.finalize()
                await self.target_storage.finalize()
                return

            # Perform migration with error tracking
            stats = await self.migrate_caches(
                source_data, self.target_storage, target_storage_name
            )

            # Print comprehensive migration report
            self.print_migration_report(stats)

            # Cleanup
            await self.source_storage.finalize()
            await self.target_storage.finalize()

        except KeyboardInterrupt:
            print("\n\n✗ Migration interrupted by user")
        except Exception as e:
            print(f"\n✗ Migration failed: {e}")
            import traceback

            traceback.print_exc()
        finally:
            # Ensure cleanup
            if self.source_storage:
                try:
                    await self.source_storage.finalize()
                except Exception:
                    pass
            if self.target_storage:
                try:
                    await self.target_storage.finalize()
                except Exception:
                    pass

async def main():
    """Main entry point"""
    tool = MigrationTool()
    await tool.run()


if __name__ == "__main__":
    asyncio.run(main())