Fix deadlock in JSON cache migration and prevent same storage selection
- Snapshot JSON data before yielding batches
- Release lock during batch processing
- Exclude source type from target selection
- Add detailed docstring for lock behavior
- Filter available storage types properly
This commit is contained in:
parent
6b9f13c792
commit
5be04263b2
1 changed file with 50 additions and 14 deletions
|
|
@ -561,20 +561,33 @@ class MigrationTool:
|
||||||
|
|
||||||
Yields:
|
Yields:
|
||||||
Dictionary batches of cache entries
|
Dictionary batches of cache entries
|
||||||
|
|
||||||
|
Note:
|
||||||
|
This method creates a snapshot of matching items while holding the lock,
|
||||||
|
then releases the lock before yielding batches. This prevents deadlock
|
||||||
|
when the target storage (also JsonKVStorage) tries to acquire the same
|
||||||
|
lock during upsert operations.
|
||||||
"""
|
"""
|
||||||
|
# Create a snapshot of matching items while holding the lock
|
||||||
async with storage._storage_lock:
|
async with storage._storage_lock:
|
||||||
batch = {}
|
matching_items = [
|
||||||
for key, value in storage._data.items():
|
(key, value)
|
||||||
if key.startswith("default:extract:") or key.startswith(
|
for key, value in storage._data.items()
|
||||||
"default:summary:"
|
if key.startswith("default:extract:")
|
||||||
):
|
or key.startswith("default:summary:")
|
||||||
batch[key] = value
|
]
|
||||||
if len(batch) >= batch_size:
|
|
||||||
yield batch
|
# Now iterate over snapshot without holding lock
|
||||||
batch = {}
|
batch = {}
|
||||||
# Yield remaining items
|
for key, value in matching_items:
|
||||||
if batch:
|
batch[key] = value
|
||||||
|
if len(batch) >= batch_size:
|
||||||
yield batch
|
yield batch
|
||||||
|
batch = {}
|
||||||
|
|
||||||
|
# Yield remaining items
|
||||||
|
if batch:
|
||||||
|
yield batch
|
||||||
|
|
||||||
async def stream_default_caches_redis(self, storage, batch_size: int):
|
async def stream_default_caches_redis(self, storage, batch_size: int):
|
||||||
"""Stream default caches from RedisKVStorage - yields batches
|
"""Stream default caches from RedisKVStorage - yields batches
|
||||||
|
|
@ -829,13 +842,17 @@ class MigrationTool:
|
||||||
print(f"✗ Invalid choice, please enter one of: {', '.join(valid_choices)}")
|
print(f"✗ Invalid choice, please enter one of: {', '.join(valid_choices)}")
|
||||||
|
|
||||||
async def setup_storage(
|
async def setup_storage(
|
||||||
self, storage_type: str, use_streaming: bool = False
|
self,
|
||||||
|
storage_type: str,
|
||||||
|
use_streaming: bool = False,
|
||||||
|
exclude_storage_name: str = None,
|
||||||
) -> tuple:
|
) -> tuple:
|
||||||
"""Setup and initialize storage
|
"""Setup and initialize storage
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
storage_type: Type label (source/target)
|
storage_type: Type label (source/target)
|
||||||
use_streaming: If True, only count records without loading. If False, load all data (legacy mode)
|
use_streaming: If True, only count records without loading. If False, load all data (legacy mode)
|
||||||
|
exclude_storage_name: Storage type to exclude from selection (e.g., to prevent selecting same as source)
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Tuple of (storage_instance, storage_name, workspace, total_count)
|
Tuple of (storage_instance, storage_name, workspace, total_count)
|
||||||
|
|
@ -843,11 +860,27 @@ class MigrationTool:
|
||||||
"""
|
"""
|
||||||
print(f"\n=== {storage_type} Storage Setup ===")
|
print(f"\n=== {storage_type} Storage Setup ===")
|
||||||
|
|
||||||
|
# Filter available storage types if exclusion is specified
|
||||||
|
available_types = STORAGE_TYPES.copy()
|
||||||
|
if exclude_storage_name:
|
||||||
|
# Remove the excluded storage type from available options
|
||||||
|
available_types = {
|
||||||
|
k: v for k, v in STORAGE_TYPES.items() if v != exclude_storage_name
|
||||||
|
}
|
||||||
|
|
||||||
|
# Print available types
|
||||||
|
print("\nAvailable Storage Types for Target:")
|
||||||
|
for key, value in available_types.items():
|
||||||
|
print(f"[{key}] {value}")
|
||||||
|
else:
|
||||||
|
# Print all storage types for source
|
||||||
|
self.print_storage_types()
|
||||||
|
|
||||||
# Get storage type choice - allow exit for source storage
|
# Get storage type choice - allow exit for source storage
|
||||||
allow_exit = storage_type == "Source"
|
allow_exit = storage_type == "Source"
|
||||||
choice = self.get_user_choice(
|
choice = self.get_user_choice(
|
||||||
f"Select {storage_type} storage type (1-4)",
|
f"Select {storage_type} storage type (1-4)",
|
||||||
list(STORAGE_TYPES.keys()),
|
list(available_types.keys()),
|
||||||
allow_exit=allow_exit,
|
allow_exit=allow_exit,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -1162,12 +1195,15 @@ class MigrationTool:
|
||||||
return
|
return
|
||||||
|
|
||||||
# Setup target storage with streaming (only count, don't load all data)
|
# Setup target storage with streaming (only count, don't load all data)
|
||||||
|
# Exclude source storage type from target selection
|
||||||
(
|
(
|
||||||
self.target_storage,
|
self.target_storage,
|
||||||
target_storage_name,
|
target_storage_name,
|
||||||
self.target_workspace,
|
self.target_workspace,
|
||||||
target_count,
|
target_count,
|
||||||
) = await self.setup_storage("Target", use_streaming=True)
|
) = await self.setup_storage(
|
||||||
|
"Target", use_streaming=True, exclude_storage_name=source_storage_name
|
||||||
|
)
|
||||||
|
|
||||||
if not self.target_storage:
|
if not self.target_storage:
|
||||||
print("\n✗ Target storage setup failed")
|
print("\n✗ Target storage setup failed")
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue