This commit is contained in:
Raphaël MANSUY 2025-12-04 19:14:30 +08:00
parent ba441a7b73
commit c0dfe65d1a
2 changed files with 70 additions and 89 deletions

View file

@ -561,30 +561,17 @@ class MigrationTool:
Yields: Yields:
Dictionary batches of cache entries Dictionary batches of cache entries
Note:
This method creates a snapshot of matching items while holding the lock,
then releases the lock before yielding batches. This prevents deadlock
when the target storage (also JsonKVStorage) tries to acquire the same
lock during upsert operations.
""" """
# Create a snapshot of matching items while holding the lock
async with storage._storage_lock: async with storage._storage_lock:
matching_items = [
(key, value)
for key, value in storage._data.items()
if key.startswith("default:extract:")
or key.startswith("default:summary:")
]
# Now iterate over snapshot without holding lock
batch = {} batch = {}
for key, value in matching_items: for key, value in storage._data.items():
if key.startswith("default:extract:") or key.startswith(
"default:summary:"
):
batch[key] = value batch[key] = value
if len(batch) >= batch_size: if len(batch) >= batch_size:
yield batch yield batch
batch = {} batch = {}
# Yield remaining items # Yield remaining items
if batch: if batch:
yield batch yield batch
@ -816,18 +803,39 @@ class MigrationTool:
for key, value in STORAGE_TYPES.items(): for key, value in STORAGE_TYPES.items():
print(f"[{key}] {value}") print(f"[{key}] {value}")
def get_user_choice(
self, prompt: str, valid_choices: list, allow_exit: bool = False
) -> str:
"""Get user choice with validation
Args:
prompt: Prompt message
valid_choices: List of valid choices
allow_exit: If True, allow user to press Enter or input '0' to exit
Returns:
User's choice, or None if user chose to exit
"""
exit_hint = " (Press Enter or 0 to exit)" if allow_exit else ""
while True:
choice = input(f"\n{prompt}{exit_hint}: ").strip()
# Check for exit
if allow_exit and (choice == "" or choice == "0"):
return None
if choice in valid_choices:
return choice
print(f"✗ Invalid choice, please enter one of: {', '.join(valid_choices)}")
async def setup_storage( async def setup_storage(
self, self, storage_type: str, use_streaming: bool = False
storage_type: str,
use_streaming: bool = False,
exclude_storage_name: str = None,
) -> tuple: ) -> tuple:
"""Setup and initialize storage """Setup and initialize storage
Args: Args:
storage_type: Type label (source/target) storage_type: Type label (source/target)
use_streaming: If True, only count records without loading. If False, load all data (legacy mode) use_streaming: If True, only count records without loading. If False, load all data (legacy mode)
exclude_storage_name: Storage type to exclude from selection (e.g., to prevent selecting same as source)
Returns: Returns:
Tuple of (storage_instance, storage_name, workspace, total_count) Tuple of (storage_instance, storage_name, workspace, total_count)
@ -835,58 +843,20 @@ class MigrationTool:
""" """
print(f"\n=== {storage_type} Storage Setup ===") print(f"\n=== {storage_type} Storage Setup ===")
# Filter and remap available storage types if exclusion is specified # Get storage type choice - allow exit for source storage
if exclude_storage_name: allow_exit = storage_type == "Source"
# Get available storage types (excluding source) choice = self.get_user_choice(
available_list = [ f"Select {storage_type} storage type (1-4)",
(k, v) for k, v in STORAGE_TYPES.items() if v != exclude_storage_name list(STORAGE_TYPES.keys()),
] allow_exit=allow_exit,
# Remap to sequential numbering (1, 2, 3...)
remapped_types = {
str(i + 1): name for i, (_, name) in enumerate(available_list)
}
# Print available types with new sequential numbers
print(
f"\nAvailable Storage Types for Target (source: {exclude_storage_name} excluded):"
) )
for key, value in remapped_types.items():
print(f"[{key}] {value}")
available_types = remapped_types # Handle exit
else: if choice is None:
# For source storage, use original numbering
available_types = STORAGE_TYPES.copy()
self.print_storage_types()
# Generate dynamic prompt based on number of options
num_options = len(available_types)
if num_options == 1:
prompt_range = "1"
else:
prompt_range = f"1-{num_options}"
# Custom input handling with exit support
while True:
choice = input(
f"\nSelect {storage_type} storage type ({prompt_range}) (Press Enter or 0 to exit): "
).strip()
# Check for exit
if choice == "" or choice == "0":
print("\n✓ Migration cancelled by user") print("\n✓ Migration cancelled by user")
return None, None, None, 0 return None, None, None, 0
# Check if choice is valid storage_name = STORAGE_TYPES[choice]
if choice in available_types:
break
print(
f"✗ Invalid choice. Please enter one of: {', '.join(available_types.keys())}"
)
storage_name = available_types[choice]
# Check environment variables # Check environment variables
print("\nChecking environment variables...") print("\nChecking environment variables...")
@ -1171,6 +1141,7 @@ class MigrationTool:
# Print header # Print header
self.print_header() self.print_header()
self.print_storage_types()
# Setup source storage with streaming (only count, don't load all data) # Setup source storage with streaming (only count, don't load all data)
( (
@ -1191,15 +1162,12 @@ class MigrationTool:
return return
# Setup target storage with streaming (only count, don't load all data) # Setup target storage with streaming (only count, don't load all data)
# Exclude source storage type from target selection
( (
self.target_storage, self.target_storage,
target_storage_name, target_storage_name,
self.target_workspace, self.target_workspace,
target_count, target_count,
) = await self.setup_storage( ) = await self.setup_storage("Target", use_streaming=True)
"Target", use_streaming=True, exclude_storage_name=source_storage_name
)
if not self.target_storage: if not self.target_storage:
print("\n✗ Target storage setup failed") print("\n✗ Target storage setup failed")

View file

@ -93,9 +93,11 @@ Supported KV Storage Types:
[3] PGKVStorage [3] PGKVStorage
[4] MongoKVStorage [4] MongoKVStorage
Select Source storage type (1-4): 1 Select Source storage type (1-4) (Press Enter or 0 to exit): 1
``` ```
**Note**: You can press Enter or type `0` at the source storage selection to exit gracefully.
#### 2. Source Storage Validation #### 2. Source Storage Validation
The tool will: The tool will:
- Check required environment variables - Check required environment variables
@ -113,22 +115,33 @@ Initializing Source storage...
- Connection Status: ✓ Success - Connection Status: ✓ Success
Counting cache records... Counting cache records...
- default:extract: 8,500 records
- default:summary: 234 records
- Total: 8,734 records - Total: 8,734 records
``` ```
**Progress Display by Storage Type:**
- **JsonKVStorage**: Fast in-memory counting, no progress display needed
- **RedisKVStorage**: Real-time scanning progress
```
Scanning Redis keys... found 8,734 records
```
- **PostgreSQL**: Shows timing if operation takes >1 second
```
Counting PostgreSQL records... (took 2.3s)
```
- **MongoDB**: Shows timing if operation takes >1 second
```
Counting MongoDB documents... (took 1.8s)
```
#### 3. Select Target Storage Type #### 3. Select Target Storage Type
Repeat steps 1-2 to select and validate the target storage. Repeat steps 1-2 to select and validate the target storage.
#### 4. Confirm Migration #### 4. Confirm Migration
Review the migration summary and confirm:
``` ```
Migration Confirmation
================================================== ==================================================
Migration Confirmation
Source: JsonKVStorage (workspace: space1) - 8,734 records Source: JsonKVStorage (workspace: space1) - 8,734 records
Target: MongoKVStorage (workspace: space1) - 0 records Target: MongoKVStorage (workspace: space1) - 0 records
Batch Size: 1,000 records/batch Batch Size: 1,000 records/batch