This commit is contained in:
Raphaël MANSUY 2025-12-04 19:14:30 +08:00
parent c90c4737a5
commit 840acfbef8

View file

@ -26,9 +26,7 @@ from dataclasses import dataclass, field
from dotenv import load_dotenv
# Add project root to path for imports
sys.path.insert(
0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
)
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from lightrag.kg import STORAGE_ENV_REQUIREMENTS
from lightrag.namespace import NameSpace
@ -62,14 +60,6 @@ WORKSPACE_ENV_MAP = {
DEFAULT_BATCH_SIZE = 1000
# Default count batch size for efficient counting
DEFAULT_COUNT_BATCH_SIZE = 1000
# ANSI color codes for terminal output
BOLD_CYAN = "\033[1;36m"
RESET = "\033[0m"
@dataclass
class MigrationStats:
"""Migration statistics and error tracking"""
@ -128,100 +118,27 @@ class MigrationTool:
workspace = os.getenv("WORKSPACE", "")
return workspace
def check_config_ini_for_storage(self, storage_name: str) -> bool:
"""Check if config.ini has configuration for the storage type
Args:
storage_name: Storage implementation name
Returns:
True if config.ini has the necessary configuration
"""
try:
import configparser
config = configparser.ConfigParser()
config.read("config.ini", "utf-8")
if storage_name == "RedisKVStorage":
return config.has_option("redis", "uri")
elif storage_name == "PGKVStorage":
return (
config.has_option("postgres", "user")
and config.has_option("postgres", "password")
and config.has_option("postgres", "database")
)
elif storage_name == "MongoKVStorage":
return config.has_option("mongodb", "uri") and config.has_option(
"mongodb", "database"
)
return False
except Exception:
return False
def check_env_vars(self, storage_name: str) -> bool:
"""Check environment variables, show warnings if missing but don't fail
"""Check if all required environment variables exist
Args:
storage_name: Storage implementation name
Returns:
Always returns True (warnings only, no hard failure)
True if all required env vars exist, False otherwise
"""
required_vars = STORAGE_ENV_REQUIREMENTS.get(storage_name, [])
if not required_vars:
print("✓ No environment variables required")
return True
missing_vars = [var for var in required_vars if var not in os.environ]
if missing_vars:
print(
f"⚠️ Warning: Missing environment variables: {', '.join(missing_vars)}"
f"✗ Missing required environment variables: {', '.join(missing_vars)}"
)
# Check if config.ini has configuration
has_config = self.check_config_ini_for_storage(storage_name)
if has_config:
print(" ✓ Found configuration in config.ini")
else:
print(f" Will attempt to use defaults for {storage_name}")
return True
return False
print("✓ All required environment variables are set")
return True
def count_available_storage_types(self) -> int:
"""Count available storage types (with env vars, config.ini, or defaults)
Returns:
Number of available storage types
"""
available_count = 0
for storage_name in STORAGE_TYPES.values():
# Check if storage requires configuration
required_vars = STORAGE_ENV_REQUIREMENTS.get(storage_name, [])
if not required_vars:
# JsonKVStorage, MongoKVStorage etc. - no config needed
available_count += 1
else:
# Check if has environment variables
has_env = all(var in os.environ for var in required_vars)
if has_env:
available_count += 1
else:
# Check if has config.ini configuration
has_config = self.check_config_ini_for_storage(storage_name)
if has_config:
available_count += 1
return available_count
def get_storage_class(self, storage_name: str):
"""Dynamically import and return storage class
@ -251,7 +168,7 @@ class MigrationTool:
raise ValueError(f"Unsupported storage type: {storage_name}")
async def initialize_storage(self, storage_name: str, workspace: str):
"""Initialize storage instance with fallback to config.ini and defaults
"""Initialize storage instance
Args:
storage_name: Storage implementation name
@ -259,9 +176,6 @@ class MigrationTool:
Returns:
Initialized storage instance
Raises:
Exception: If initialization fails
"""
storage_class = self.get_storage_class(storage_name)
@ -279,7 +193,7 @@ class MigrationTool:
embedding_func=None,
)
# Initialize the storage (may raise exception if connection fails)
# Initialize the storage
await storage.initialize()
return storage
@ -504,364 +418,6 @@ class MigrationTool:
else:
raise ValueError(f"Unsupported storage type: {storage_name}")
async def count_default_caches_json(self, storage) -> int:
"""Count default caches in JsonKVStorage - O(N) but very fast in-memory
Args:
storage: JsonKVStorage instance
Returns:
Total count of cache records
"""
async with storage._storage_lock:
return sum(
1
for key in storage._data.keys()
if key.startswith("default:extract:")
or key.startswith("default:summary:")
)
async def count_default_caches_redis(self, storage) -> int:
"""Count default caches in RedisKVStorage using SCAN with progress display
Args:
storage: RedisKVStorage instance
Returns:
Total count of cache records
"""
count = 0
print("Scanning Redis keys...", end="", flush=True)
async with storage._get_redis_connection() as redis:
for pattern in ["default:extract:*", "default:summary:*"]:
prefixed_pattern = f"{storage.final_namespace}:{pattern}"
cursor = 0
while True:
cursor, keys = await redis.scan(
cursor, match=prefixed_pattern, count=DEFAULT_COUNT_BATCH_SIZE
)
count += len(keys)
# Show progress
print(
f"\rScanning Redis keys... found {count:,} records",
end="",
flush=True,
)
if cursor == 0:
break
print() # New line after progress
return count
async def count_default_caches_pg(self, storage) -> int:
"""Count default caches in PostgreSQL using COUNT(*) with progress indicator
Args:
storage: PGKVStorage instance
Returns:
Total count of cache records
"""
from lightrag.kg.postgres_impl import namespace_to_table_name
table_name = namespace_to_table_name(storage.namespace)
query = f"""
SELECT COUNT(*) as count
FROM {table_name}
WHERE workspace = $1
AND (id LIKE 'default:extract:%' OR id LIKE 'default:summary:%')
"""
print("Counting PostgreSQL records...", end="", flush=True)
start_time = time.time()
result = await storage.db.query(query, [storage.workspace])
elapsed = time.time() - start_time
if elapsed > 1:
print(f" (took {elapsed:.1f}s)", end="")
print() # New line
return result["count"] if result else 0
async def count_default_caches_mongo(self, storage) -> int:
"""Count default caches in MongoDB using count_documents with progress indicator
Args:
storage: MongoKVStorage instance
Returns:
Total count of cache records
"""
query = {"_id": {"$regex": "^default:(extract|summary):"}}
print("Counting MongoDB documents...", end="", flush=True)
start_time = time.time()
count = await storage._data.count_documents(query)
elapsed = time.time() - start_time
if elapsed > 1:
print(f" (took {elapsed:.1f}s)", end="")
print() # New line
return count
async def count_default_caches(self, storage, storage_name: str) -> int:
"""Count default caches from any storage type efficiently
Args:
storage: Storage instance
storage_name: Storage type name
Returns:
Total count of cache records
"""
if storage_name == "JsonKVStorage":
return await self.count_default_caches_json(storage)
elif storage_name == "RedisKVStorage":
return await self.count_default_caches_redis(storage)
elif storage_name == "PGKVStorage":
return await self.count_default_caches_pg(storage)
elif storage_name == "MongoKVStorage":
return await self.count_default_caches_mongo(storage)
else:
raise ValueError(f"Unsupported storage type: {storage_name}")
async def stream_default_caches_json(self, storage, batch_size: int):
"""Stream default caches from JsonKVStorage - yields batches
Args:
storage: JsonKVStorage instance
batch_size: Size of each batch to yield
Yields:
Dictionary batches of cache entries
Note:
This method creates a snapshot of matching items while holding the lock,
then releases the lock before yielding batches. This prevents deadlock
when the target storage (also JsonKVStorage) tries to acquire the same
lock during upsert operations.
"""
# Create a snapshot of matching items while holding the lock
async with storage._storage_lock:
matching_items = [
(key, value)
for key, value in storage._data.items()
if key.startswith("default:extract:")
or key.startswith("default:summary:")
]
# Now iterate over snapshot without holding lock
batch = {}
for key, value in matching_items:
batch[key] = value
if len(batch) >= batch_size:
yield batch
batch = {}
# Yield remaining items
if batch:
yield batch
async def stream_default_caches_redis(self, storage, batch_size: int):
"""Stream default caches from RedisKVStorage - yields batches
Args:
storage: RedisKVStorage instance
batch_size: Size of each batch to yield
Yields:
Dictionary batches of cache entries
"""
import json
async with storage._get_redis_connection() as redis:
for pattern in ["default:extract:*", "default:summary:*"]:
prefixed_pattern = f"{storage.final_namespace}:{pattern}"
cursor = 0
while True:
cursor, keys = await redis.scan(
cursor, match=prefixed_pattern, count=batch_size
)
if keys:
try:
pipe = redis.pipeline()
for key in keys:
pipe.get(key)
values = await pipe.execute()
batch = {}
for key, value in zip(keys, values):
if value:
key_str = (
key.decode() if isinstance(key, bytes) else key
)
original_key = key_str.replace(
f"{storage.final_namespace}:", "", 1
)
batch[original_key] = json.loads(value)
if batch:
yield batch
except Exception as e:
print(f"⚠️ Pipeline execution failed for batch: {e}")
# Fall back to individual gets
batch = {}
for key in keys:
try:
value = await redis.get(key)
if value:
key_str = (
key.decode()
if isinstance(key, bytes)
else key
)
original_key = key_str.replace(
f"{storage.final_namespace}:", "", 1
)
batch[original_key] = json.loads(value)
except Exception as individual_error:
print(
f"⚠️ Failed to get individual key {key}: {individual_error}"
)
continue
if batch:
yield batch
if cursor == 0:
break
await asyncio.sleep(0)
async def stream_default_caches_pg(self, storage, batch_size: int):
"""Stream default caches from PostgreSQL - yields batches
Args:
storage: PGKVStorage instance
batch_size: Size of each batch to yield
Yields:
Dictionary batches of cache entries
"""
from lightrag.kg.postgres_impl import namespace_to_table_name
table_name = namespace_to_table_name(storage.namespace)
offset = 0
while True:
query = f"""
SELECT id as key, original_prompt, return_value, chunk_id, cache_type, queryparam,
EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM {table_name}
WHERE workspace = $1
AND (id LIKE 'default:extract:%' OR id LIKE 'default:summary:%')
ORDER BY id
LIMIT $2 OFFSET $3
"""
results = await storage.db.query(
query, [storage.workspace, batch_size, offset], multirows=True
)
if not results:
break
batch = {}
for row in results:
cache_entry = {
"return": row.get("return_value", ""),
"cache_type": row.get("cache_type"),
"original_prompt": row.get("original_prompt", ""),
"chunk_id": row.get("chunk_id"),
"queryparam": row.get("queryparam"),
"create_time": row.get("create_time", 0),
"update_time": row.get("update_time", 0),
}
batch[row["key"]] = cache_entry
if batch:
yield batch
if len(results) < batch_size:
break
offset += batch_size
await asyncio.sleep(0)
async def stream_default_caches_mongo(self, storage, batch_size: int):
"""Stream default caches from MongoDB - yields batches
Args:
storage: MongoKVStorage instance
batch_size: Size of each batch to yield
Yields:
Dictionary batches of cache entries
"""
query = {"_id": {"$regex": "^default:(extract|summary):"}}
cursor = storage._data.find(query).batch_size(batch_size)
batch = {}
async for doc in cursor:
doc_copy = doc.copy()
key = doc_copy.pop("_id")
# Filter MongoDB/database-specific fields
for field_name in ["namespace", "workspace", "_id", "content"]:
doc_copy.pop(field_name, None)
batch[key] = doc_copy
if len(batch) >= batch_size:
yield batch
batch = {}
# Yield remaining items
if batch:
yield batch
async def stream_default_caches(
self, storage, storage_name: str, batch_size: int = None
):
"""Stream default caches from any storage type - unified interface
Args:
storage: Storage instance
storage_name: Storage type name
batch_size: Size of each batch to yield (defaults to self.batch_size)
Yields:
Dictionary batches of cache entries
"""
if batch_size is None:
batch_size = self.batch_size
if storage_name == "JsonKVStorage":
async for batch in self.stream_default_caches_json(storage, batch_size):
yield batch
elif storage_name == "RedisKVStorage":
async for batch in self.stream_default_caches_redis(storage, batch_size):
yield batch
elif storage_name == "PGKVStorage":
async for batch in self.stream_default_caches_pg(storage, batch_size):
yield batch
elif storage_name == "MongoKVStorage":
async for batch in self.stream_default_caches_mongo(storage, batch_size):
yield batch
else:
raise ValueError(f"Unsupported storage type: {storage_name}")
async def count_cache_types(self, cache_data: Dict[str, Any]) -> Dict[str, int]:
"""Count cache entries by type
@ -896,201 +452,77 @@ class MigrationTool:
for key, value in STORAGE_TYPES.items():
print(f"[{key}] {value}")
def format_workspace(self, workspace: str) -> str:
"""Format workspace name with highlighting
def get_user_choice(self, prompt: str, valid_choices: list) -> str:
"""Get user choice with validation
Args:
workspace: Workspace name (may be empty)
prompt: Prompt message
valid_choices: List of valid choices
Returns:
Formatted workspace string with ANSI color codes
User's choice
"""
if workspace:
return f"{BOLD_CYAN}{workspace}{RESET}"
else:
return f"{BOLD_CYAN}(default){RESET}"
while True:
choice = input(f"\n{prompt}: ").strip()
if choice in valid_choices:
return choice
print(f"✗ Invalid choice, please enter one of: {', '.join(valid_choices)}")
def format_storage_name(self, storage_name: str) -> str:
"""Format storage type name with highlighting
Args:
storage_name: Storage type name
Returns:
Formatted storage name string with ANSI color codes
"""
return f"{BOLD_CYAN}{storage_name}{RESET}"
async def setup_storage(
self,
storage_type: str,
use_streaming: bool = False,
exclude_storage_name: str = None,
) -> tuple:
"""Setup and initialize storage with config.ini fallback support
async def setup_storage(self, storage_type: str) -> tuple:
"""Setup and initialize storage
Args:
storage_type: Type label (source/target)
use_streaming: If True, only count records without loading. If False, load all data (legacy mode)
exclude_storage_name: Storage type to exclude from selection (e.g., to prevent selecting same as source)
Returns:
Tuple of (storage_instance, storage_name, workspace, total_count)
Returns (None, None, None, 0) if user chooses to exit
Tuple of (storage_instance, storage_name, workspace, cache_data)
"""
print(f"\n=== {storage_type} Storage Setup ===")
# Filter and remap available storage types if exclusion is specified
if exclude_storage_name:
# Get available storage types (excluding source)
available_list = [
(k, v) for k, v in STORAGE_TYPES.items() if v != exclude_storage_name
]
# Get storage type choice
choice = self.get_user_choice(
f"Select {storage_type} storage type (1-4)", list(STORAGE_TYPES.keys())
)
storage_name = STORAGE_TYPES[choice]
# Remap to sequential numbering (1, 2, 3...)
remapped_types = {
str(i + 1): name for i, (_, name) in enumerate(available_list)
}
# Print available types with new sequential numbers
print(
f"\nAvailable Storage Types for Target (source: {exclude_storage_name} excluded):"
)
for key, value in remapped_types.items():
print(f"[{key}] {value}")
available_types = remapped_types
else:
# For source storage, use original numbering
available_types = STORAGE_TYPES.copy()
self.print_storage_types()
# Generate dynamic prompt based on number of options
num_options = len(available_types)
if num_options == 1:
prompt_range = "1"
else:
prompt_range = f"1-{num_options}"
# Custom input handling with exit support
while True:
choice = input(
f"\nSelect {storage_type} storage type ({prompt_range}) (Press Enter to exit): "
).strip()
# Check for exit
if choice == "" or choice == "0":
print("\n✓ Migration cancelled by user")
return None, None, None, 0
# Check if choice is valid
if choice in available_types:
break
print(
f"✗ Invalid choice. Please enter one of: {', '.join(available_types.keys())}"
)
storage_name = available_types[choice]
# Check configuration (warnings only, doesn't block)
print("\nChecking configuration...")
self.check_env_vars(storage_name)
# Check environment variables
print("\nChecking environment variables...")
if not self.check_env_vars(storage_name):
return None, None, None, None
# Get workspace
workspace = self.get_workspace_for_storage(storage_name)
# Initialize storage (real validation point)
# Initialize storage
print(f"\nInitializing {storage_type} storage...")
try:
storage = await self.initialize_storage(storage_name, workspace)
print(f"- Storage Type: {storage_name}")
print(f"- Workspace: {workspace if workspace else '(default)'}")
print("- Connection Status: ✓ Success")
# Show configuration source for transparency
if storage_name == "RedisKVStorage":
config_source = (
"environment variable"
if "REDIS_URI" in os.environ
else "config.ini or default"
)
print(f"- Configuration Source: {config_source}")
elif storage_name == "PGKVStorage":
config_source = (
"environment variables"
if all(
var in os.environ
for var in STORAGE_ENV_REQUIREMENTS[storage_name]
)
else "config.ini or defaults"
)
print(f"- Configuration Source: {config_source}")
elif storage_name == "MongoKVStorage":
config_source = (
"environment variables"
if all(
var in os.environ
for var in STORAGE_ENV_REQUIREMENTS[storage_name]
)
else "config.ini or defaults"
)
print(f"- Configuration Source: {config_source}")
except Exception as e:
print(f"✗ Initialization failed: {e}")
print(f"\nFor {storage_name}, you can configure using:")
print(" 1. Environment variables (highest priority)")
return None, None, None, None
# Show specific environment variable requirements
if storage_name in STORAGE_ENV_REQUIREMENTS:
for var in STORAGE_ENV_REQUIREMENTS[storage_name]:
print(f" - {var}")
print(" 2. config.ini file (medium priority)")
if storage_name == "RedisKVStorage":
print(" [redis]")
print(" uri = redis://localhost:6379")
elif storage_name == "PGKVStorage":
print(" [postgres]")
print(" host = localhost")
print(" port = 5432")
print(" user = postgres")
print(" password = yourpassword")
print(" database = lightrag")
elif storage_name == "MongoKVStorage":
print(" [mongodb]")
print(" uri = mongodb://root:root@localhost:27017/")
print(" database = LightRAG")
return None, None, None, 0
# Count cache records efficiently
print(f"\n{'Counting' if use_streaming else 'Loading'} cache records...")
# Get cache data
print("\nCounting cache records...")
try:
if use_streaming:
# Use efficient counting without loading data
total_count = await self.count_default_caches(storage, storage_name)
print(f"- Total: {total_count:,} records")
else:
# Legacy mode: load all data
cache_data = await self.get_default_caches(storage, storage_name)
counts = await self.count_cache_types(cache_data)
total_count = len(cache_data)
cache_data = await self.get_default_caches(storage, storage_name)
counts = await self.count_cache_types(cache_data)
print(f"- default:extract: {counts['extract']:,} records")
print(f"- default:summary: {counts['summary']:,} records")
print(f"- Total: {total_count:,} records")
print(f"- default:extract: {counts['extract']:,} records")
print(f"- default:summary: {counts['summary']:,} records")
print(f"- Total: {len(cache_data):,} records")
except Exception as e:
print(f"{'Counting' if use_streaming else 'Loading'} failed: {e}")
return None, None, None, 0
print(f"✗ Counting failed: {e}")
return None, None, None, None
return storage, storage_name, workspace, total_count
return storage, storage_name, workspace, cache_data
async def migrate_caches(
self, source_data: Dict[str, Any], target_storage, target_storage_name: str
) -> MigrationStats:
"""Migrate caches in batches with error tracking (Legacy mode - loads all data)
"""Migrate caches in batches with error tracking
Args:
source_data: Source cache data
@ -1165,98 +597,6 @@ class MigrationTool:
return stats
async def migrate_caches_streaming(
self,
source_storage,
source_storage_name: str,
target_storage,
target_storage_name: str,
total_records: int,
) -> MigrationStats:
"""Migrate caches using streaming approach - minimal memory footprint
Args:
source_storage: Source storage instance
source_storage_name: Source storage type name
target_storage: Target storage instance
target_storage_name: Target storage type name
total_records: Total number of records to migrate
Returns:
MigrationStats object with migration results and errors
"""
stats = MigrationStats()
stats.total_source_records = total_records
if stats.total_source_records == 0:
print("\nNo records to migrate")
return stats
# Calculate total batches
stats.total_batches = (total_records + self.batch_size - 1) // self.batch_size
print("\n=== Starting Streaming Migration ===")
print(
f"💡 Memory-optimized mode: Processing {self.batch_size:,} records at a time\n"
)
batch_idx = 0
# Stream batches from source and write to target immediately
async for batch in self.stream_default_caches(
source_storage, source_storage_name
):
batch_idx += 1
# Determine current cache type for display
if batch:
first_key = next(iter(batch.keys()))
cache_type = "extract" if "extract" in first_key else "summary"
else:
cache_type = "unknown"
try:
# Write batch to target storage
await target_storage.upsert(batch)
# Success - update stats
stats.successful_batches += 1
stats.successful_records += len(batch)
# Calculate progress with known total
progress = (stats.successful_records / total_records) * 100
bar_length = 20
filled_length = int(
bar_length * stats.successful_records // total_records
)
bar = "" * filled_length + "" * (bar_length - filled_length)
print(
f"Batch {batch_idx}/{stats.total_batches}: {bar} "
f"{stats.successful_records:,}/{total_records:,} ({progress:.1f}%) - "
f"default:{cache_type}"
)
except Exception as e:
# Error - record and continue
stats.add_error(batch_idx, e, len(batch))
print(
f"Batch {batch_idx}/{stats.total_batches}: ✗ FAILED - "
f"{type(e).__name__}: {str(e)}"
)
# Final persist
print("\nPersisting data to disk...")
try:
await target_storage.index_done_callback()
print("✓ Data persisted successfully")
except Exception as e:
print(f"✗ Persist failed: {e}")
stats.add_error(0, e, 0) # batch 0 = persist error
return stats
def print_migration_report(self, stats: MigrationStats):
"""Print comprehensive migration report
@ -1320,68 +660,41 @@ class MigrationTool:
print("=" * 60)
async def run(self):
"""Run the migration tool with streaming approach and early validation"""
"""Run the migration tool"""
try:
# Initialize shared storage (REQUIRED for storage classes to work)
from lightrag.kg.shared_storage import initialize_share_data
initialize_share_data(workers=1)
# Print header
self.print_header()
self.print_storage_types()
# Setup source storage with streaming (only count, don't load all data)
# Setup source storage
(
self.source_storage,
source_storage_name,
self.source_workspace,
source_count,
) = await self.setup_storage("Source", use_streaming=True)
source_data,
) = await self.setup_storage("Source")
# Check if user cancelled (setup_storage returns None for all fields)
if self.source_storage is None:
if not self.source_storage:
print("\n✗ Source storage setup failed")
return
# Check if there are at least 2 storage types available
available_count = self.count_available_storage_types()
if available_count <= 1:
print("\n" + "=" * 60)
print("⚠️ Warning: Migration Not Possible")
print("=" * 60)
print(f"Only {available_count} storage type(s) available.")
print("Migration requires at least 2 different storage types.")
print("\nTo enable migration, configure additional storage:")
print(" 1. Set environment variables, OR")
print(" 2. Update config.ini file")
print("\nSupported storage types:")
for name in STORAGE_TYPES.values():
if name != source_storage_name:
print(f" - {name}")
if name in STORAGE_ENV_REQUIREMENTS:
for var in STORAGE_ENV_REQUIREMENTS[name]:
print(f" Required: {var}")
print("=" * 60)
if not source_data:
print("\n⚠ Source storage has no cache records to migrate")
# Cleanup
await self.source_storage.finalize()
return
if source_count == 0:
print("\n⚠️ Source storage has no cache records to migrate")
# Cleanup
await self.source_storage.finalize()
return
# Setup target storage with streaming (only count, don't load all data)
# Exclude source storage type from target selection
# Setup target storage
(
self.target_storage,
target_storage_name,
self.target_workspace,
target_count,
) = await self.setup_storage(
"Target", use_streaming=True, exclude_storage_name=source_storage_name
)
target_data,
) = await self.setup_storage("Target")
if not self.target_storage:
print("\n✗ Target storage setup failed")
@ -1394,17 +707,16 @@ class MigrationTool:
print("Migration Confirmation")
print("=" * 50)
print(
f"Source: {self.format_storage_name(source_storage_name)} (workspace: {self.format_workspace(self.source_workspace)}) - {source_count:,} records"
f"Source: {source_storage_name} (workspace: {self.source_workspace if self.source_workspace else '(default)'}) - {len(source_data):,} records"
)
print(
f"Target: {self.format_storage_name(target_storage_name)} (workspace: {self.format_workspace(self.target_workspace)}) - {target_count:,} records"
f"Target: {target_storage_name} (workspace: {self.target_workspace if self.target_workspace else '(default)'}) - {len(target_data):,} records"
)
print(f"Batch Size: {self.batch_size:,} records/batch")
print("Memory Mode: Streaming (memory-optimized)")
if target_count > 0:
if target_data:
print(
f"\n⚠️ Warning: Target storage already has {target_count:,} records"
f"\n⚠️ Warning: Target storage already has {len(target_data):,} records"
)
print("Migration will overwrite records with the same keys")
@ -1417,13 +729,9 @@ class MigrationTool:
await self.target_storage.finalize()
return
# Perform streaming migration with error tracking
stats = await self.migrate_caches_streaming(
self.source_storage,
source_storage_name,
self.target_storage,
target_storage_name,
source_count,
# Perform migration with error tracking
stats = await self.migrate_caches(
source_data, self.target_storage, target_storage_name
)
# Print comprehensive migration report
@ -1452,11 +760,10 @@ class MigrationTool:
await self.target_storage.finalize()
except Exception:
pass
# Finalize shared storage
try:
from lightrag.kg.shared_storage import finalize_share_data
finalize_share_data()
except Exception:
pass