Fix linting

yangdx 2025-11-08 18:16:03 +08:00
parent 55274dde59
commit 0f2c0de8df
2 changed files with 194 additions and 152 deletions


@@ -261,7 +261,7 @@ The tool implements comprehensive error tracking to ensure transparent and resil
### Error Reporting

After migration completes, a detailed report includes:
- **Statistics**: Total records, success/failure counts, success rate
- **Error Summary**: Grouped by error type with occurrence counts
- **Error Details**: Batch number, error type, message, and records lost
- **Recommendations**: Clear indication of success or need for review
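
For illustration only (not part of the commit), the sketch below shows how the two report sections above — the success rate and the per-type error summary — can be derived from a MigrationStats-style record as used by the script changed in this commit. The sample counts and the ConnectionError entry are made up.

```python
# Illustrative sketch: aggregate a MigrationStats-like record into report figures.
from collections import Counter

stats = {
    "total_source_records": 5_000,       # hypothetical sample values
    "successful_records": 4_000,
    "failed_records": 1_000,
    "errors": [
        {"batch": 3, "error_type": "ConnectionError",
         "error_msg": "timeout", "records_lost": 1_000},
    ],
}

# Success rate, guarded against an empty source
total = stats["total_source_records"]
success_rate = (stats["successful_records"] / total * 100) if total else 0

# Error summary: occurrences grouped by error type
error_summary = Counter(err["error_type"] for err in stats["errors"])

print(f"Success rate: {success_rate:.2f}%")        # Success rate: 80.00%
for err_type, count in error_summary.most_common():
    print(f"- {err_type}: {count} occurrence(s)")  # - ConnectionError: 1 occurrence(s)
```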


@@ -10,7 +10,7 @@ Usage:
Supported KV Storage Types:
- JsonKVStorage
- RedisKVStorage
- PGKVStorage
- MongoKVStorage

"""
@@ -58,6 +58,7 @@ DEFAULT_BATCH_SIZE = 1000
@dataclass
class MigrationStats:
    """Migration statistics and error tracking"""

    total_source_records: int = 0
    total_batches: int = 0
    successful_batches: int = 0
@@ -65,16 +66,18 @@ class MigrationStats:
    successful_records: int = 0
    failed_records: int = 0
    errors: List[Dict[str, Any]] = field(default_factory=list)

    def add_error(self, batch_idx: int, error: Exception, batch_size: int):
        """Record batch error"""
        self.errors.append(
            {
                "batch": batch_idx,
                "error_type": type(error).__name__,
                "error_msg": str(error),
                "records_lost": batch_size,
                "timestamp": time.time(),
            }
        )
        self.failed_batches += 1
        self.failed_records += batch_size
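
As a usage note, the sketch below shows how a batch loop can feed this dataclass — it mirrors the migrate_caches method further down in this diff, but is not the script itself. It assumes the MigrationStats class above is in scope and uses a hypothetical synchronous `upsert` callback in place of the script's awaited `storage.upsert()`.

```python
# Sketch only: slice items into fixed-size batches and record failed batches
# via MigrationStats.add_error, so one bad batch does not abort the whole run.
def migrate_in_batches(items, batch_size, upsert, stats: "MigrationStats"):
    # items: list of (key, value) pairs; upsert: hypothetical write callback
    total_batches = (len(items) + batch_size - 1) // batch_size  # ceiling division
    for batch_idx in range(total_batches):
        batch = dict(items[batch_idx * batch_size:(batch_idx + 1) * batch_size])
        try:
            upsert(batch)
        except Exception as e:
            # Record the failure and continue with the next batch
            stats.add_error(batch_idx + 1, e, len(batch))
```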
@@ -91,12 +94,12 @@ class MigrationTool:
    def get_workspace_for_storage(self, storage_name: str) -> str:
        """Get workspace for a specific storage type

        Priority: Storage-specific env var > WORKSPACE env var > empty string

        Args:
            storage_name: Storage implementation name

        Returns:
            Workspace name
        """
@@ -105,72 +108,78 @@ class MigrationTool:
        specific_workspace = os.getenv(WORKSPACE_ENV_MAP[storage_name])
        if specific_workspace:
            return specific_workspace

        # Check generic WORKSPACE
        workspace = os.getenv("WORKSPACE", "")
        return workspace

    def check_env_vars(self, storage_name: str) -> bool:
        """Check if all required environment variables exist

        Args:
            storage_name: Storage implementation name

        Returns:
            True if all required env vars exist, False otherwise
        """
        required_vars = STORAGE_ENV_REQUIREMENTS.get(storage_name, [])
        missing_vars = [var for var in required_vars if var not in os.environ]

        if missing_vars:
            print(
                f"✗ Missing required environment variables: {', '.join(missing_vars)}"
            )
            return False

        print("✓ All required environment variables are set")
        return True

    def get_storage_class(self, storage_name: str):
        """Dynamically import and return storage class

        Args:
            storage_name: Storage implementation name

        Returns:
            Storage class
        """
        if storage_name == "JsonKVStorage":
            from lightrag.kg.json_kv_impl import JsonKVStorage
            return JsonKVStorage
        elif storage_name == "RedisKVStorage":
            from lightrag.kg.redis_impl import RedisKVStorage
            return RedisKVStorage
        elif storage_name == "PGKVStorage":
            from lightrag.kg.postgres_impl import PGKVStorage
            return PGKVStorage
        elif storage_name == "MongoKVStorage":
            from lightrag.kg.mongo_impl import MongoKVStorage
            return MongoKVStorage
        else:
            raise ValueError(f"Unsupported storage type: {storage_name}")

    async def initialize_storage(self, storage_name: str, workspace: str):
        """Initialize storage instance

        Args:
            storage_name: Storage implementation name
            workspace: Workspace name

        Returns:
            Initialized storage instance
        """
        storage_class = self.get_storage_class(storage_name)

        # Create global config
        global_config = {
            "working_dir": os.getenv("WORKING_DIR", "./rag_storage"),
            "embedding_batch_num": 10,
        }

        # Initialize storage
        storage = storage_class(
            namespace=NameSpace.KV_STORE_LLM_RESPONSE_CACHE,
@@ -178,18 +187,18 @@ class MigrationTool:
            global_config=global_config,
            embedding_func=None,
        )

        # Initialize the storage
        await storage.initialize()

        return storage

    async def get_default_caches_json(self, storage) -> Dict[str, Any]:
        """Get default caches from JsonKVStorage

        Args:
            storage: JsonKVStorage instance

        Returns:
            Dictionary of cache entries with default:extract:* or default:summary:* keys
        """
@@ -197,39 +206,41 @@ class MigrationTool:
        async with storage._storage_lock:
            filtered = {}
            for key, value in storage._data.items():
                if key.startswith("default:extract:") or key.startswith(
                    "default:summary:"
                ):
                    filtered[key] = value
            return filtered

    async def get_default_caches_redis(
        self, storage, batch_size: int = 1000
    ) -> Dict[str, Any]:
        """Get default caches from RedisKVStorage with pagination

        Args:
            storage: RedisKVStorage instance
            batch_size: Number of keys to process per batch

        Returns:
            Dictionary of cache entries with default:extract:* or default:summary:* keys
        """
        import json

        cache_data = {}

        # Use _get_redis_connection() context manager
        async with storage._get_redis_connection() as redis:
            for pattern in ["default:extract:*", "default:summary:*"]:
                # Add namespace prefix to pattern
                prefixed_pattern = f"{storage.final_namespace}:{pattern}"

                cursor = 0
                while True:
                    # SCAN already implements cursor-based pagination
                    cursor, keys = await redis.scan(
                        cursor, match=prefixed_pattern, count=batch_size
                    )

                    if keys:
                        # Process this batch using pipeline with error handling
                        try:
@@ -237,74 +248,88 @@ class MigrationTool:
                            for key in keys:
                                pipe.get(key)
                            values = await pipe.execute()

                            for key, value in zip(keys, values):
                                if value:
                                    key_str = (
                                        key.decode() if isinstance(key, bytes) else key
                                    )
                                    # Remove namespace prefix to get original key
                                    original_key = key_str.replace(
                                        f"{storage.final_namespace}:", "", 1
                                    )
                                    cache_data[original_key] = json.loads(value)
                        except Exception as e:
                            # Pipeline execution failed, fall back to individual gets
                            print(
                                f"⚠️ Pipeline execution failed for batch, using individual gets: {e}"
                            )
                            for key in keys:
                                try:
                                    value = await redis.get(key)
                                    if value:
                                        key_str = (
                                            key.decode()
                                            if isinstance(key, bytes)
                                            else key
                                        )
                                        original_key = key_str.replace(
                                            f"{storage.final_namespace}:", "", 1
                                        )
                                        cache_data[original_key] = json.loads(value)
                                except Exception as individual_error:
                                    print(
                                        f"⚠️ Failed to get individual key {key}: {individual_error}"
                                    )
                                    continue

                    if cursor == 0:
                        break

                    # Yield control periodically to avoid blocking
                    await asyncio.sleep(0)

        return cache_data

    async def get_default_caches_pg(
        self, storage, batch_size: int = 1000
    ) -> Dict[str, Any]:
        """Get default caches from PGKVStorage with pagination

        Args:
            storage: PGKVStorage instance
            batch_size: Number of records to fetch per batch

        Returns:
            Dictionary of cache entries with default:extract:* or default:summary:* keys
        """
        from lightrag.kg.postgres_impl import namespace_to_table_name

        cache_data = {}
        table_name = namespace_to_table_name(storage.namespace)

        offset = 0
        while True:
            # Use LIMIT and OFFSET for pagination
            query = f"""
                SELECT id as key, original_prompt, return_value, chunk_id, cache_type, queryparam,
                       EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
                       EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
                FROM {table_name}
                WHERE workspace = $1
                AND (id LIKE 'default:extract:%' OR id LIKE 'default:summary:%')
                ORDER BY id
                LIMIT $2 OFFSET $3
            """

            results = await storage.db.query(
                query, [storage.workspace, batch_size, offset], multirows=True
            )

            if not results:
                break

            for row in results:
                # Map PostgreSQL fields to cache format
                cache_entry = {
@@ -317,61 +342,63 @@ class MigrationTool:
                    "update_time": row.get("update_time", 0),
                }
                cache_data[row["key"]] = cache_entry

            # If we got fewer results than batch_size, we're done
            if len(results) < batch_size:
                break

            offset += batch_size

            # Yield control periodically
            await asyncio.sleep(0)

        return cache_data

    async def get_default_caches_mongo(
        self, storage, batch_size: int = 1000
    ) -> Dict[str, Any]:
        """Get default caches from MongoKVStorage with cursor-based pagination

        Args:
            storage: MongoKVStorage instance
            batch_size: Number of documents to process per batch

        Returns:
            Dictionary of cache entries with default:extract:* or default:summary:* keys
        """
        cache_data = {}

        # MongoDB query with regex - use _data not collection
        query = {"_id": {"$regex": "^default:(extract|summary):"}}

        # Use cursor without to_list() - process in batches
        cursor = storage._data.find(query).batch_size(batch_size)

        async for doc in cursor:
            # Process each document as it comes
            doc_copy = doc.copy()
            key = doc_copy.pop("_id")

            # Filter ALL MongoDB/database-specific fields
            # Following .clinerules: "Always filter deprecated/incompatible fields during deserialization"
            for field_name in ["namespace", "workspace", "_id", "content"]:
                doc_copy.pop(field_name, None)

            cache_data[key] = doc_copy

            # Periodically yield control (every batch_size documents)
            if len(cache_data) % batch_size == 0:
                await asyncio.sleep(0)

        return cache_data

    async def get_default_caches(self, storage, storage_name: str) -> Dict[str, Any]:
        """Get default caches from any storage type

        Args:
            storage: Storage instance
            storage_name: Storage type name

        Returns:
            Dictionary of cache entries
        """
@@ -388,10 +415,10 @@ class MigrationTool:
    async def count_cache_types(self, cache_data: Dict[str, Any]) -> Dict[str, int]:
        """Count cache entries by type

        Args:
            cache_data: Dictionary of cache entries

        Returns:
            Dictionary with counts for each cache type
        """
@@ -399,13 +426,13 @@ class MigrationTool:
            "extract": 0,
            "summary": 0,
        }

        for key in cache_data.keys():
            if key.startswith("default:extract:"):
                counts["extract"] += 1
            elif key.startswith("default:summary:"):
                counts["summary"] += 1

        return counts

    def print_header(self):
@@ -422,11 +449,11 @@ class MigrationTool:
    def get_user_choice(self, prompt: str, valid_choices: list) -> str:
        """Get user choice with validation

        Args:
            prompt: Prompt message
            valid_choices: List of valid choices

        Returns:
            User's choice
        """
@@ -438,30 +465,29 @@ class MigrationTool:
    async def setup_storage(self, storage_type: str) -> tuple:
        """Setup and initialize storage

        Args:
            storage_type: Type label (source/target)

        Returns:
            Tuple of (storage_instance, storage_name, workspace, cache_data)
        """
        print(f"\n=== {storage_type} Storage Setup ===")

        # Get storage type choice
        choice = self.get_user_choice(
            f"Select {storage_type} storage type (1-4)", list(STORAGE_TYPES.keys())
        )
        storage_name = STORAGE_TYPES[choice]

        # Check environment variables
        print("\nChecking environment variables...")
        if not self.check_env_vars(storage_name):
            return None, None, None, None

        # Get workspace
        workspace = self.get_workspace_for_storage(storage_name)

        # Initialize storage
        print(f"\nInitializing {storage_type} storage...")
        try:
@@ -472,86 +498,89 @@ class MigrationTool:
        except Exception as e:
            print(f"✗ Initialization failed: {e}")
            return None, None, None, None

        # Get cache data
        print("\nCounting cache records...")
        try:
            cache_data = await self.get_default_caches(storage, storage_name)
            counts = await self.count_cache_types(cache_data)
            print(f"- default:extract: {counts['extract']:,} records")
            print(f"- default:summary: {counts['summary']:,} records")
            print(f"- Total: {len(cache_data):,} records")
        except Exception as e:
            print(f"✗ Counting failed: {e}")
            return None, None, None, None

        return storage, storage_name, workspace, cache_data

    async def migrate_caches(
        self, source_data: Dict[str, Any], target_storage, target_storage_name: str
    ) -> MigrationStats:
        """Migrate caches in batches with error tracking

        Args:
            source_data: Source cache data
            target_storage: Target storage instance
            target_storage_name: Target storage type name

        Returns:
            MigrationStats object with migration results and errors
        """
        stats = MigrationStats()
        stats.total_source_records = len(source_data)

        if stats.total_source_records == 0:
            print("\nNo records to migrate")
            return stats

        # Convert to list for batching
        items = list(source_data.items())
        stats.total_batches = (
            stats.total_source_records + self.batch_size - 1
        ) // self.batch_size

        print("\n=== Starting Migration ===")

        for batch_idx in range(stats.total_batches):
            start_idx = batch_idx * self.batch_size
            end_idx = min((batch_idx + 1) * self.batch_size, stats.total_source_records)
            batch_items = items[start_idx:end_idx]
            batch_data = dict(batch_items)

            # Determine current cache type for display
            current_key = batch_items[0][0]
            cache_type = "extract" if "extract" in current_key else "summary"

            try:
                # Attempt to write batch
                await target_storage.upsert(batch_data)

                # Success - update stats
                stats.successful_batches += 1
                stats.successful_records += len(batch_data)

                # Calculate progress
                progress = (end_idx / stats.total_source_records) * 100
                bar_length = 20
                filled_length = int(bar_length * end_idx // stats.total_source_records)
                bar = "█" * filled_length + "░" * (bar_length - filled_length)

                print(
                    f"Batch {batch_idx + 1}/{stats.total_batches}: {bar} "
                    f"{end_idx:,}/{stats.total_source_records:,} ({progress:.0f}%) - "
                    f"default:{cache_type}"
                )

            except Exception as e:
                # Error - record and continue
                stats.add_error(batch_idx + 1, e, len(batch_data))
                print(
                    f"Batch {batch_idx + 1}/{stats.total_batches}: ✗ FAILED - "
                    f"{type(e).__name__}: {str(e)}"
                )

        # Final persist
        print("\nPersisting data to disk...")
        try:
@@ -560,19 +589,19 @@ class MigrationTool:
        except Exception as e:
            print(f"✗ Persist failed: {e}")
            stats.add_error(0, e, 0)  # batch 0 = persist error

        return stats

    def print_migration_report(self, stats: MigrationStats):
        """Print comprehensive migration report

        Args:
            stats: MigrationStats object with migration results
        """
        print("\n" + "=" * 60)
        print("Migration Complete - Final Report")
        print("=" * 60)

        # Overall statistics
        print("\n📊 Statistics:")
        print(f" Total source records: {stats.total_source_records:,}")
@@ -581,37 +610,41 @@ class MigrationTool:
        print(f" Failed batches: {stats.failed_batches:,}")
        print(f" Successfully migrated: {stats.successful_records:,}")
        print(f" Failed to migrate: {stats.failed_records:,}")

        # Success rate
        success_rate = (
            (stats.successful_records / stats.total_source_records * 100)
            if stats.total_source_records > 0
            else 0
        )
        print(f" Success rate: {success_rate:.2f}%")

        # Error details
        if stats.errors:
            print(f"\n⚠️ Errors encountered: {len(stats.errors)}")
            print("\nError Details:")
            print("-" * 60)

            # Group errors by type
            error_types = {}
            for error in stats.errors:
                err_type = error["error_type"]
                error_types[err_type] = error_types.get(err_type, 0) + 1

            print("\nError Summary:")
            for err_type, count in sorted(error_types.items(), key=lambda x: -x[1]):
                print(f" - {err_type}: {count} occurrence(s)")

            print("\nFirst 5 errors:")
            for i, error in enumerate(stats.errors[:5], 1):
                print(f"\n {i}. Batch {error['batch']}")
                print(f" Type: {error['error_type']}")
                print(f" Message: {error['error_msg']}")
                print(f" Records lost: {error['records_lost']:,}")

            if len(stats.errors) > 5:
                print(f"\n ... and {len(stats.errors) - 5} more errors")

            print("\n" + "=" * 60)
            print("⚠️ WARNING: Migration completed with errors!")
            print(" Please review the error details above.")
@@ -627,75 +660,84 @@ class MigrationTool:
            # Print header
            self.print_header()
            self.print_storage_types()

            # Setup source storage
            (
                self.source_storage,
                source_storage_name,
                self.source_workspace,
                source_data,
            ) = await self.setup_storage("Source")

            if not self.source_storage:
                print("\n✗ Source storage setup failed")
                return

            if not source_data:
                print("\n⚠ Source storage has no cache records to migrate")
                # Cleanup
                await self.source_storage.finalize()
                return

            # Setup target storage
            (
                self.target_storage,
                target_storage_name,
                self.target_workspace,
                target_data,
            ) = await self.setup_storage("Target")

            if not self.target_storage:
                print("\n✗ Target storage setup failed")
                # Cleanup source
                await self.source_storage.finalize()
                return

            # Show migration summary
            print("\n" + "=" * 50)
            print("Migration Confirmation")
            print("=" * 50)
            print(
                f"Source: {source_storage_name} (workspace: {self.source_workspace if self.source_workspace else '(default)'}) - {len(source_data):,} records"
            )
            print(
                f"Target: {target_storage_name} (workspace: {self.target_workspace if self.target_workspace else '(default)'}) - {len(target_data):,} records"
            )
            print(f"Batch Size: {self.batch_size:,} records/batch")

            if target_data:
                print(
                    f"\n⚠ Warning: Target storage already has {len(target_data):,} records"
                )
                print("Migration will overwrite records with the same keys")

            # Confirm migration
            confirm = input("\nContinue? (y/n): ").strip().lower()
            if confirm != "y":
                print("\n✗ Migration cancelled")
                # Cleanup
                await self.source_storage.finalize()
                await self.target_storage.finalize()
                return

            # Perform migration with error tracking
            stats = await self.migrate_caches(
                source_data, self.target_storage, target_storage_name
            )

            # Print comprehensive migration report
            self.print_migration_report(stats)

            # Cleanup
            await self.source_storage.finalize()
            await self.target_storage.finalize()

        except KeyboardInterrupt:
            print("\n\n✗ Migration interrupted by user")
        except Exception as e:
            print(f"\n✗ Migration failed: {e}")
            import traceback
            traceback.print_exc()
        finally:
            # Ensure cleanup