Fix linting

This commit is contained in:
yangdx 2025-11-08 18:16:03 +08:00
parent 55274dde59
commit 0f2c0de8df
2 changed files with 194 additions and 152 deletions

View file

@ -58,6 +58,7 @@ DEFAULT_BATCH_SIZE = 1000
@dataclass
class MigrationStats:
"""Migration statistics and error tracking"""
total_source_records: int = 0
total_batches: int = 0
successful_batches: int = 0
@ -68,13 +69,15 @@ class MigrationStats:
def add_error(self, batch_idx: int, error: Exception, batch_size: int):
"""Record batch error"""
self.errors.append({
'batch': batch_idx,
'error_type': type(error).__name__,
'error_msg': str(error),
'records_lost': batch_size,
'timestamp': time.time()
})
self.errors.append(
{
"batch": batch_idx,
"error_type": type(error).__name__,
"error_msg": str(error),
"records_lost": batch_size,
"timestamp": time.time(),
}
)
self.failed_batches += 1
self.failed_records += batch_size
@ -123,7 +126,9 @@ class MigrationTool:
missing_vars = [var for var in required_vars if var not in os.environ]
if missing_vars:
print(f"✗ Missing required environment variables: {', '.join(missing_vars)}")
print(
f"✗ Missing required environment variables: {', '.join(missing_vars)}"
)
return False
print("✓ All required environment variables are set")
@ -140,15 +145,19 @@ class MigrationTool:
"""
if storage_name == "JsonKVStorage":
from lightrag.kg.json_kv_impl import JsonKVStorage
return JsonKVStorage
elif storage_name == "RedisKVStorage":
from lightrag.kg.redis_impl import RedisKVStorage
return RedisKVStorage
elif storage_name == "PGKVStorage":
from lightrag.kg.postgres_impl import PGKVStorage
return PGKVStorage
elif storage_name == "MongoKVStorage":
from lightrag.kg.mongo_impl import MongoKVStorage
return MongoKVStorage
else:
raise ValueError(f"Unsupported storage type: {storage_name}")
@ -197,11 +206,15 @@ class MigrationTool:
async with storage._storage_lock:
filtered = {}
for key, value in storage._data.items():
if key.startswith("default:extract:") or key.startswith("default:summary:"):
if key.startswith("default:extract:") or key.startswith(
"default:summary:"
):
filtered[key] = value
return filtered
async def get_default_caches_redis(self, storage, batch_size: int = 1000) -> Dict[str, Any]:
async def get_default_caches_redis(
self, storage, batch_size: int = 1000
) -> Dict[str, Any]:
"""Get default caches from RedisKVStorage with pagination
Args:
@ -225,9 +238,7 @@ class MigrationTool:
while True:
# SCAN already implements cursor-based pagination
cursor, keys = await redis.scan(
cursor,
match=prefixed_pattern,
count=batch_size
cursor, match=prefixed_pattern, count=batch_size
)
if keys:
@ -240,23 +251,37 @@ class MigrationTool:
for key, value in zip(keys, values):
if value:
key_str = key.decode() if isinstance(key, bytes) else key
key_str = (
key.decode() if isinstance(key, bytes) else key
)
# Remove namespace prefix to get original key
original_key = key_str.replace(f"{storage.final_namespace}:", "", 1)
original_key = key_str.replace(
f"{storage.final_namespace}:", "", 1
)
cache_data[original_key] = json.loads(value)
except Exception as e:
# Pipeline execution failed, fall back to individual gets
print(f"⚠️ Pipeline execution failed for batch, using individual gets: {e}")
print(
f"⚠️ Pipeline execution failed for batch, using individual gets: {e}"
)
for key in keys:
try:
value = await redis.get(key)
if value:
key_str = key.decode() if isinstance(key, bytes) else key
original_key = key_str.replace(f"{storage.final_namespace}:", "", 1)
key_str = (
key.decode()
if isinstance(key, bytes)
else key
)
original_key = key_str.replace(
f"{storage.final_namespace}:", "", 1
)
cache_data[original_key] = json.loads(value)
except Exception as individual_error:
print(f"⚠️ Failed to get individual key {key}: {individual_error}")
print(
f"⚠️ Failed to get individual key {key}: {individual_error}"
)
continue
if cursor == 0:
@ -267,7 +292,9 @@ class MigrationTool:
return cache_data
async def get_default_caches_pg(self, storage, batch_size: int = 1000) -> Dict[str, Any]:
async def get_default_caches_pg(
self, storage, batch_size: int = 1000
) -> Dict[str, Any]:
"""Get default caches from PGKVStorage with pagination
Args:
@ -297,9 +324,7 @@ class MigrationTool:
"""
results = await storage.db.query(
query,
[storage.workspace, batch_size, offset],
multirows=True
query, [storage.workspace, batch_size, offset], multirows=True
)
if not results:
@ -329,7 +354,9 @@ class MigrationTool:
return cache_data
async def get_default_caches_mongo(self, storage, batch_size: int = 1000) -> Dict[str, Any]:
async def get_default_caches_mongo(
self, storage, batch_size: int = 1000
) -> Dict[str, Any]:
"""Get default caches from MongoKVStorage with cursor-based pagination
Args:
@ -449,8 +476,7 @@ class MigrationTool:
# Get storage type choice
choice = self.get_user_choice(
f"Select {storage_type} storage type (1-4)",
list(STORAGE_TYPES.keys())
f"Select {storage_type} storage type (1-4)", list(STORAGE_TYPES.keys())
)
storage_name = STORAGE_TYPES[choice]
@ -489,10 +515,7 @@ class MigrationTool:
return storage, storage_name, workspace, cache_data
async def migrate_caches(
self,
source_data: Dict[str, Any],
target_storage,
target_storage_name: str
self, source_data: Dict[str, Any], target_storage, target_storage_name: str
) -> MigrationStats:
"""Migrate caches in batches with error tracking
@ -513,7 +536,9 @@ class MigrationTool:
# Convert to list for batching
items = list(source_data.items())
stats.total_batches = (stats.total_source_records + self.batch_size - 1) // self.batch_size
stats.total_batches = (
stats.total_source_records + self.batch_size - 1
) // self.batch_size
print("\n=== Starting Migration ===")
@ -541,16 +566,20 @@ class MigrationTool:
filled_length = int(bar_length * end_idx // stats.total_source_records)
bar = "█" * filled_length + "░" * (bar_length - filled_length)
print(f"Batch {batch_idx + 1}/{stats.total_batches}: {bar} "
f"{end_idx:,}/{stats.total_source_records:,} ({progress:.0f}%) - "
f"default:{cache_type}")
print(
f"Batch {batch_idx + 1}/{stats.total_batches}: {bar} "
f"{end_idx:,}/{stats.total_source_records:,} ({progress:.0f}%) - "
f"default:{cache_type}"
)
except Exception as e:
# Error - record and continue
stats.add_error(batch_idx + 1, e, len(batch_data))
print(f"Batch {batch_idx + 1}/{stats.total_batches}: ✗ FAILED - "
f"{type(e).__name__}: {str(e)}")
print(
f"Batch {batch_idx + 1}/{stats.total_batches}: ✗ FAILED - "
f"{type(e).__name__}: {str(e)}"
)
# Final persist
print("\nPersisting data to disk...")
@ -583,7 +612,11 @@ class MigrationTool:
print(f" Failed to migrate: {stats.failed_records:,}")
# Success rate
success_rate = (stats.successful_records / stats.total_source_records * 100) if stats.total_source_records > 0 else 0
success_rate = (
(stats.successful_records / stats.total_source_records * 100)
if stats.total_source_records > 0
else 0
)
print(f" Success rate: {success_rate:.2f}%")
# Error details
@ -595,7 +628,7 @@ class MigrationTool:
# Group errors by type
error_types = {}
for error in stats.errors:
err_type = error['error_type']
err_type = error["error_type"]
error_types[err_type] = error_types.get(err_type, 0) + 1
print("\nError Summary:")
@ -633,7 +666,7 @@ class MigrationTool:
self.source_storage,
source_storage_name,
self.source_workspace,
source_data
source_data,
) = await self.setup_storage("Source")
if not self.source_storage:
@ -651,7 +684,7 @@ class MigrationTool:
self.target_storage,
target_storage_name,
self.target_workspace,
target_data
target_data,
) = await self.setup_storage("Target")
if not self.target_storage:
@ -664,17 +697,23 @@ class MigrationTool:
print("\n" + "=" * 50)
print("Migration Confirmation")
print("=" * 50)
print(f"Source: {source_storage_name} (workspace: {self.source_workspace if self.source_workspace else '(default)'}) - {len(source_data):,} records")
print(f"Target: {target_storage_name} (workspace: {self.target_workspace if self.target_workspace else '(default)'}) - {len(target_data):,} records")
print(
f"Source: {source_storage_name} (workspace: {self.source_workspace if self.source_workspace else '(default)'}) - {len(source_data):,} records"
)
print(
f"Target: {target_storage_name} (workspace: {self.target_workspace if self.target_workspace else '(default)'}) - {len(target_data):,} records"
)
print(f"Batch Size: {self.batch_size:,} records/batch")
if target_data:
print(f"\n⚠ Warning: Target storage already has {len(target_data):,} records")
print(
f"\n⚠ Warning: Target storage already has {len(target_data):,} records"
)
print("Migration will overwrite records with the same keys")
# Confirm migration
confirm = input("\nContinue? (y/n): ").strip().lower()
if confirm != 'y':
if confirm != "y":
print("\n✗ Migration cancelled")
# Cleanup
await self.source_storage.finalize()
@ -682,7 +721,9 @@ class MigrationTool:
return
# Perform migration with error tracking
stats = await self.migrate_caches(source_data, self.target_storage, target_storage_name)
stats = await self.migrate_caches(
source_data, self.target_storage, target_storage_name
)
# Print comprehensive migration report
self.print_migration_report(stats)
@ -696,6 +737,7 @@ class MigrationTool:
except Exception as e:
print(f"\n✗ Migration failed: {e}")
import traceback
traceback.print_exc()
finally:
# Ensure cleanup