""" Complete Migration Scenario Tests This test module covers all migration cases that were previously missing: 1. Case 1: Both new and legacy exist (warning scenario) 2. Case 2: Only new exists (already migrated) 3. Legacy upgrade from old versions (backward compatibility) 4. Empty legacy data migration 5. Workspace isolation verification 6. Model switching scenario Tests are implemented for both PostgreSQL and Qdrant backends. """ import pytest import numpy as np from unittest.mock import MagicMock, patch, AsyncMock from lightrag.utils import EmbeddingFunc from lightrag.kg.qdrant_impl import QdrantVectorDBStorage, _find_legacy_collection # ============================================================================ # Fixtures # ============================================================================ @pytest.fixture def mock_qdrant_client(): """Mock QdrantClient for Qdrant tests""" with patch("lightrag.kg.qdrant_impl.QdrantClient") as mock_client_cls: client = mock_client_cls.return_value client.collection_exists.return_value = False client.count.return_value.count = 0 collection_info = MagicMock() collection_info.payload_schema = {} client.get_collection.return_value = collection_info yield client @pytest.fixture(autouse=True) def mock_data_init_lock(): """Mock get_data_init_lock to avoid async lock issues""" with patch("lightrag.kg.qdrant_impl.get_data_init_lock") as mock_lock: mock_lock_ctx = AsyncMock() mock_lock.return_value = mock_lock_ctx yield mock_lock @pytest.fixture def mock_embedding_func(): """Create a mock embedding function""" async def embed_func(texts, **kwargs): return np.array([[0.1] * 768 for _ in texts]) return EmbeddingFunc(embedding_dim=768, func=embed_func, model_name="test-model") @pytest.fixture def qdrant_config(): """Basic Qdrant configuration""" return { "embedding_batch_num": 10, "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8}, } # ============================================================================ # Case 1: Both new and legacy exist (Warning scenario) # ============================================================================ @pytest.mark.asyncio async def test_case1_both_collections_exist_qdrant( mock_qdrant_client, mock_embedding_func, qdrant_config ): """ Case 1: Both new and legacy collections exist Expected: Log warning, do not migrate """ storage = QdrantVectorDBStorage( namespace="chunks", global_config=qdrant_config, embedding_func=mock_embedding_func, workspace="test_ws", ) # Mock: Both collections exist def collection_exists_side_effect(name): return name in [storage.final_namespace, storage.legacy_namespace] mock_qdrant_client.collection_exists.side_effect = collection_exists_side_effect # Initialize (should trigger warning, not migration) await storage.initialize() # Verify: No migration attempted mock_qdrant_client.scroll.assert_not_called() mock_qdrant_client.create_collection.assert_not_called() print("✅ Case 1: Warning logged when both collections exist") # ============================================================================ # Case 2: Only new exists (Already migrated scenario) # ============================================================================ @pytest.mark.asyncio async def test_case2_only_new_exists_qdrant( mock_qdrant_client, mock_embedding_func, qdrant_config ): """ Case 2: Only new collection exists, legacy deleted Expected: Verify index, normal operation """ storage = QdrantVectorDBStorage( namespace="chunks", global_config=qdrant_config, embedding_func=mock_embedding_func, workspace="test_ws", ) # Mock: Only new collection exists mock_qdrant_client.collection_exists.side_effect = ( lambda name: name == storage.final_namespace ) # Initialize (should check index but not migrate) await storage.initialize() # Verify: get_collection called to check index mock_qdrant_client.get_collection.assert_called_with(storage.final_namespace) # Verify: No migration attempted mock_qdrant_client.scroll.assert_not_called() print("✅ Case 2: Index check when only new collection exists") # ============================================================================ # Legacy upgrade from old versions (Backward compatibility) # ============================================================================ @pytest.mark.asyncio async def test_backward_compat_workspace_naming_qdrant(mock_qdrant_client): """ Test backward compatibility with old workspace-based naming Old format: {workspace}_{namespace} """ # Mock old-style collection name old_collection_name = "prod_chunks" mock_qdrant_client.collection_exists.side_effect = ( lambda name: name == old_collection_name ) # Test _find_legacy_collection with old naming found = _find_legacy_collection( mock_qdrant_client, namespace="chunks", workspace="prod" ) assert found == old_collection_name print(f"✅ Backward compat: Found old collection '{old_collection_name}'") @pytest.mark.asyncio async def test_backward_compat_no_workspace_naming_qdrant(mock_qdrant_client): """ Test backward compatibility with old no-workspace naming Old format: {namespace} """ # Mock old-style collection name (no workspace) old_collection_name = "chunks" mock_qdrant_client.collection_exists.side_effect = ( lambda name: name == old_collection_name ) # Test _find_legacy_collection with old naming (no workspace) found = _find_legacy_collection( mock_qdrant_client, namespace="chunks", workspace=None ) assert found == old_collection_name print(f"✅ Backward compat: Found old collection '{old_collection_name}'") @pytest.mark.asyncio async def test_backward_compat_migration_qdrant( mock_qdrant_client, mock_embedding_func, qdrant_config ): """ Test full migration from old workspace-based collection """ storage = QdrantVectorDBStorage( namespace="chunks", global_config=qdrant_config, embedding_func=mock_embedding_func, workspace="prod", ) # Mock old-style collection exists old_collection_name = "prod_chunks" def collection_exists_side_effect(name): # Only old collection exists initially if name == old_collection_name: return True return False mock_qdrant_client.collection_exists.side_effect = collection_exists_side_effect mock_qdrant_client.count.return_value.count = 50 # Mock data mock_point = MagicMock() mock_point.id = "old_id" mock_point.vector = [0.1] * 768 mock_point.payload = {"content": "test", "id": "doc1"} mock_qdrant_client.scroll.side_effect = [([mock_point], None)] # Initialize (should trigger migration from old collection) await storage.initialize() # Verify: Migration from old collection scroll_calls = mock_qdrant_client.scroll.call_args_list assert len(scroll_calls) >= 1 assert scroll_calls[0].kwargs["collection_name"] == old_collection_name print(f"✅ Backward compat: Migrated from old collection '{old_collection_name}'") # ============================================================================ # Empty legacy data migration # ============================================================================ @pytest.mark.asyncio async def test_empty_legacy_migration_qdrant( mock_qdrant_client, mock_embedding_func, qdrant_config ): """ Test migration when legacy collection exists but is empty Expected: Skip data migration, create new collection """ storage = QdrantVectorDBStorage( namespace="chunks", global_config=qdrant_config, embedding_func=mock_embedding_func, workspace="test_ws", ) # Mock: Legacy collection exists but is empty mock_qdrant_client.collection_exists.side_effect = ( lambda name: name == storage.legacy_namespace ) mock_qdrant_client.count.return_value.count = 0 # Empty! # Initialize (should skip data migration) await storage.initialize() # Verify: Create collection called mock_qdrant_client.create_collection.assert_called() # Verify: No data scroll attempted mock_qdrant_client.scroll.assert_not_called() print("✅ Empty legacy: Skipped data migration for empty collection") # ============================================================================ # Workspace isolation verification # ============================================================================ @pytest.mark.asyncio async def test_workspace_isolation_qdrant(mock_qdrant_client): """ Test workspace isolation within same collection Expected: Different workspaces use same collection but isolated by workspace_id """ config = { "embedding_batch_num": 10, "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8}, } async def embed_func(texts, **kwargs): return np.array([[0.1] * 768 for _ in texts]) embedding_func = EmbeddingFunc( embedding_dim=768, func=embed_func, model_name="test-model" ) # Create two storages with same model but different workspaces storage_a = QdrantVectorDBStorage( namespace="chunks", global_config=config, embedding_func=embedding_func, workspace="workspace_a", ) storage_b = QdrantVectorDBStorage( namespace="chunks", global_config=config, embedding_func=embedding_func, workspace="workspace_b", ) # Verify: Same collection name (model+dim isolation) assert storage_a.final_namespace == storage_b.final_namespace print( f"✅ Workspace isolation: Same collection '{storage_a.final_namespace}' for both workspaces" ) # Verify: Different effective workspaces assert storage_a.effective_workspace != storage_b.effective_workspace print( f"✅ Workspace isolation: Different workspaces '{storage_a.effective_workspace}' vs '{storage_b.effective_workspace}'" ) # ============================================================================ # Model switching scenario # ============================================================================ @pytest.mark.asyncio async def test_model_switch_scenario_qdrant(mock_qdrant_client): """ Test switching embedding models Expected: New collection created, old data preserved """ config = { "embedding_batch_num": 10, "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8}, } async def embed_func(texts, **kwargs): return np.array([[0.1] * 768 for _ in texts]) # Model A: 768d embedding_func_a = EmbeddingFunc( embedding_dim=768, func=embed_func, model_name="model-a" ) # Model B: 768d with different name embedding_func_b = EmbeddingFunc( embedding_dim=768, func=embed_func, model_name="model-b" ) # Create storage for model A storage_a = QdrantVectorDBStorage( namespace="chunks", global_config=config, embedding_func=embedding_func_a, workspace="test_ws", ) # Create storage for model B storage_b = QdrantVectorDBStorage( namespace="chunks", global_config=config, embedding_func=embedding_func_b, workspace="test_ws", ) # Verify: Different collection names despite same dimension assert storage_a.final_namespace != storage_b.final_namespace assert "model_a_768d" in storage_a.final_namespace assert "model_b_768d" in storage_b.final_namespace print("✅ Model switch: Different collections for different models") print(f" - Model A: {storage_a.final_namespace}") print(f" - Model B: {storage_b.final_namespace}") # ============================================================================ # Integration test with all scenarios # ============================================================================ @pytest.mark.asyncio async def test_migration_flow_all_cases_qdrant( mock_qdrant_client, mock_embedding_func, qdrant_config ): """ Integration test simulating the full migration lifecycle """ storage = QdrantVectorDBStorage( namespace="chunks", global_config=qdrant_config, embedding_func=mock_embedding_func, workspace="test_ws", ) # Scenario 1: First initialization (Case 3: Neither exists) mock_qdrant_client.collection_exists.return_value = False await storage.initialize() mock_qdrant_client.create_collection.assert_called() print("✅ Scenario 1: New collection created") # Reset mocks mock_qdrant_client.reset_mock() # Scenario 2: Second initialization (Case 2: Only new exists) mock_qdrant_client.collection_exists.side_effect = ( lambda name: name == storage.final_namespace ) collection_info = MagicMock() collection_info.payload_schema = {} mock_qdrant_client.get_collection.return_value = collection_info storage2 = QdrantVectorDBStorage( namespace="chunks", global_config=qdrant_config, embedding_func=mock_embedding_func, workspace="test_ws", ) await storage2.initialize() mock_qdrant_client.get_collection.assert_called() mock_qdrant_client.create_collection.assert_not_called() print("✅ Scenario 2: Existing collection reused") if __name__ == "__main__": pytest.main([__file__, "-v", "-s"])