diff --git a/.github/workflows/e2e-tests.yml b/.github/workflows/e2e-tests.yml
index 7fd969e5..4f8b01b5 100644
--- a/.github/workflows/e2e-tests.yml
+++ b/.github/workflows/e2e-tests.yml
@@ -74,10 +74,9 @@ jobs:
           POSTGRES_USER: lightrag
           POSTGRES_PASSWORD: lightrag_test_password
           POSTGRES_DB: lightrag_test
-          POSTGRES_WORKSPACE: e2e_test
         run: |
-          pytest tests/test_e2e_postgres_migration.py -v --tb=short -s
-        timeout-minutes: 10
+          pytest tests/test_e2e_multi_instance.py -k "postgres" -v --tb=short -s
+        timeout-minutes: 20

       - name: Upload PostgreSQL test results
         if: always()
@@ -146,8 +145,8 @@ jobs:
           QDRANT_URL: http://localhost:6333
           QDRANT_API_KEY: ""
         run: |
-          pytest tests/test_e2e_qdrant_migration.py -v --tb=short -s
-        timeout-minutes: 10
+          pytest tests/test_e2e_multi_instance.py -k "qdrant" -v --tb=short -s
+        timeout-minutes: 15

       - name: Upload Qdrant test results
         if: always()
diff --git a/tests/test_e2e_multi_instance.py b/tests/test_e2e_multi_instance.py
new file mode 100644
index 00000000..b6935cd7
--- /dev/null
+++ b/tests/test_e2e_multi_instance.py
@@ -0,0 +1,589 @@
+"""
+E2E Tests for Multi-Instance LightRAG with Multiple Workspaces
+
+These tests verify:
+1. Multiple LightRAG instances with different embedding models
+2. Multiple workspaces isolation
+3. Both PostgreSQL and Qdrant vector storage
+4. Real document insertion and query operations
+
+Prerequisites:
+- PostgreSQL with pgvector extension
+- Qdrant server running
+- Environment variables configured
+"""
+
+import os
+import pytest
+import asyncio
+import numpy as np
+import tempfile
+import shutil
+from lightrag import LightRAG
+from lightrag.utils import EmbeddingFunc
+from lightrag.kg.postgres_impl import PostgreSQLDB
+from qdrant_client import QdrantClient
+
+
+# Configuration fixtures
+@pytest.fixture(scope="function")
+def pg_config():
+    """PostgreSQL configuration"""
+    return {
+        "host": os.getenv("POSTGRES_HOST", "localhost"),
+        "port": int(os.getenv("POSTGRES_PORT", "5432")),
+        "user": os.getenv("POSTGRES_USER", "lightrag"),
+        "password": os.getenv("POSTGRES_PASSWORD", "lightrag_test_password"),
+        "database": os.getenv("POSTGRES_DB", "lightrag_test"),
+        "workspace": "multi_instance_test",
+        "max_connections": 10,
+        "connection_retry_attempts": 3,
+        "connection_retry_backoff": 0.5,
+        "connection_retry_backoff_max": 5.0,
+        "pool_close_timeout": 5.0,
+    }
+
+
+@pytest.fixture(scope="function")
+def qdrant_config():
+    """Qdrant configuration"""
+    return {
+        "url": os.getenv("QDRANT_URL", "http://localhost:6333"),
+        "api_key": os.getenv("QDRANT_API_KEY", None),
+    }
+
+
+# Cleanup fixtures
+@pytest.fixture(scope="function")
+async def pg_cleanup(pg_config):
+    """Cleanup PostgreSQL tables before and after test"""
+    db = PostgreSQLDB(pg_config)
+    await db.initdb()
+
+    tables_to_drop = [
+        "lightrag_doc_full",
+        "lightrag_doc_chunks",
+        "lightrag_vdb_chunks",
+        "lightrag_vdb_chunks_model_a_768d",
+        "lightrag_vdb_chunks_model_b_1024d",
+        "lightrag_vdb_entity",
+        "lightrag_vdb_relation",
+        "lightrag_llm_cache",
+        "lightrag_doc_status",
+        "lightrag_full_entities",
+        "lightrag_full_relations",
+        "lightrag_entity_chunks",
+        "lightrag_relation_chunks",
+    ]
+
+    # Cleanup before
+    for table in tables_to_drop:
+        try:
+            await db.execute(f"DROP TABLE IF EXISTS {table} CASCADE", None)
+        except Exception:
+            pass
+
+    yield db
+
+    # Cleanup after
+    for table in tables_to_drop:
+        try:
+            await db.execute(f"DROP TABLE IF EXISTS {table} CASCADE", None)
+        except Exception:
+            pass
+
+    if db.pool:
+        await db.pool.close()
+
+
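+# NOTE: The drop-list above assumes the model-suffixed naming convention these
+# tests exercise: "<base>_<sanitized_model>_<dim>d", where the model name is
+# lower-cased and runs of non-alphanumeric characters become underscores. The
+# helper below is a test-local sketch documenting that assumption; it is not
+# the storage layer's own implementation.
+def _expected_suffixed_name(base: str, model: str, dim: int) -> str:
+    import re
+
+    sanitized = re.sub(r"[^a-z0-9]+", "_", model.lower()).strip("_")
+    return f"{base}_{sanitized}_{dim}d"
+
+
+# Example: _expected_suffixed_name("lightrag_vdb_chunks", "model-a", 768)
+# -> "lightrag_vdb_chunks_model_a_768d"
+
+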
+@pytest.fixture(scope="function") +def qdrant_cleanup(qdrant_config): + """Cleanup Qdrant collections before and after test""" + client = QdrantClient( + url=qdrant_config["url"], + api_key=qdrant_config["api_key"], + timeout=60, + ) + + collections_to_delete = [ + "lightrag_vdb_chunks_model_a_768d", + "lightrag_vdb_chunks_model_b_1024d", + ] + + # Cleanup before + for collection in collections_to_delete: + try: + if client.collection_exists(collection): + client.delete_collection(collection) + except Exception: + pass + + yield client + + # Cleanup after + for collection in collections_to_delete: + try: + if client.collection_exists(collection): + client.delete_collection(collection) + except Exception: + pass + + +@pytest.fixture +def temp_working_dirs(): + """Create multiple temporary working directories""" + dirs = { + "workspace_a": tempfile.mkdtemp(prefix="lightrag_workspace_a_"), + "workspace_b": tempfile.mkdtemp(prefix="lightrag_workspace_b_"), + } + yield dirs + # Cleanup + for dir_path in dirs.values(): + shutil.rmtree(dir_path, ignore_errors=True) + + +@pytest.fixture +def mock_llm_func(): + """Mock LLM function that returns proper entity/relation format""" + async def llm_func(prompt, system_prompt=None, history_messages=[], **kwargs): + await asyncio.sleep(0) # Simulate async I/O + return """entity<|#|>Artificial Intelligence<|#|>concept<|#|>AI is a field of computer science. +entity<|#|>Machine Learning<|#|>concept<|#|>ML is a subset of AI. +relation<|#|>Machine Learning<|#|>Artificial Intelligence<|#|>subset<|#|>ML is a subset of AI. +<|COMPLETE|>""" + return llm_func + + +@pytest.fixture +def mock_tokenizer(): + """Create a mock tokenizer""" + from lightrag.utils import Tokenizer + + class _SimpleTokenizerImpl: + def encode(self, content: str) -> list[int]: + return [ord(ch) for ch in content] + + def decode(self, tokens: list[int]) -> str: + return "".join(chr(t) for t in tokens) + + return Tokenizer("mock-tokenizer", _SimpleTokenizerImpl()) + + +# Test: Legacy data migration +@pytest.mark.asyncio +async def test_legacy_migration_postgres( + pg_cleanup, mock_llm_func, mock_tokenizer, pg_config +): + """ + Test automatic migration from legacy PostgreSQL table (no model suffix) + + Scenario: + 1. Create legacy table without model suffix + 2. Insert test data with 1536d vectors + 3. Initialize LightRAG with model_name (triggers migration) + 4. 
+    4. Verify data migrated to new table with model suffix
+    """
+    print("\n[E2E Test] Legacy data migration (1536d)")
+
+    # Create temp working dir (tempfile/shutil are imported at module level)
+    temp_dir = tempfile.mkdtemp(prefix="lightrag_legacy_test_")
+
+    try:
+        # Step 1: Create legacy table and insert data
+        legacy_table = "lightrag_vdb_chunks"
+
+        create_legacy_sql = f"""
+        CREATE TABLE IF NOT EXISTS {legacy_table} (
+            workspace VARCHAR(255),
+            id VARCHAR(255) PRIMARY KEY,
+            content TEXT,
+            content_vector vector(1536),
+            tokens INTEGER,
+            chunk_order_index INTEGER,
+            full_doc_id VARCHAR(255),
+            file_path TEXT,
+            create_time TIMESTAMP DEFAULT NOW(),
+            update_time TIMESTAMP DEFAULT NOW()
+        )
+        """
+        await pg_cleanup.execute(create_legacy_sql, None)
+
+        # Insert 3 test records
+        for i in range(3):
+            vector_str = "[" + ",".join(["0.1"] * 1536) + "]"
+            insert_sql = f"""
+            INSERT INTO {legacy_table}
+            (workspace, id, content, content_vector, tokens, chunk_order_index, full_doc_id, file_path)
+            VALUES ($1, $2, $3, $4::vector, $5, $6, $7, $8)
+            """
+            await pg_cleanup.execute(insert_sql, {
+                "workspace": pg_config["workspace"],
+                "id": f"legacy_{i}",
+                "content": f"Legacy content {i}",
+                "content_vector": vector_str,
+                "tokens": 100,
+                "chunk_order_index": i,
+                "full_doc_id": "legacy_doc",
+                "file_path": "/test/path"
+            })
+
+        # Verify legacy data
+        count_result = await pg_cleanup.query(
+            f"SELECT COUNT(*) as count FROM {legacy_table} WHERE workspace=$1",
+            [pg_config["workspace"]]
+        )
+        legacy_count = count_result.get("count", 0)
+        print(f"✅ Legacy table created with {legacy_count} records")
+
+        # Step 2: Initialize LightRAG with model_name (triggers migration)
+        async def embed_func(texts):
+            await asyncio.sleep(0)
+            return np.random.rand(len(texts), 1536)
+
+        embedding_func = EmbeddingFunc(
+            embedding_dim=1536,
+            max_token_size=8192,
+            func=embed_func,
+            model_name="text-embedding-ada-002"
+        )
+
+        rag = LightRAG(
+            working_dir=temp_dir,
+            llm_model_func=mock_llm_func,
+            embedding_func=embedding_func,
+            tokenizer=mock_tokenizer,
+            kv_storage="PGKVStorage",
+            vector_storage="PGVectorStorage",
+            graph_storage="PGGraphStorage",
+            doc_status_storage="PGDocStatusStorage",
+            vector_db_storage_cls_kwargs={
+                **pg_config,
+                "cosine_better_than_threshold": 0.8
+            },
+            kv_storage_cls_kwargs=pg_config,
+            graph_storage_cls_kwargs=pg_config,
+            doc_status_storage_cls_kwargs=pg_config,
+        )
+
+        print("🔄 Initializing LightRAG (triggers migration)...")
+        await rag.initialize_storages()
+
+        # Step 3: Verify migration (chunks_vdb is the chunks vector storage)
+        new_table = rag.chunks_vdb.table_name
+        assert "text_embedding_ada_002_1536d" in new_table.lower()
+
+        new_count_result = await pg_cleanup.query(
+            f"SELECT COUNT(*) as count FROM {new_table} WHERE workspace=$1",
+            [pg_config["workspace"]]
+        )
+        new_count = new_count_result.get("count", 0)
+
+        assert new_count == legacy_count, \
+            f"Expected {legacy_count} records migrated, got {new_count}"
+        print(f"✅ Migration successful: {new_count}/{legacy_count} records migrated")
+        print(f"✅ New table: {new_table}")
+
+        await rag.finalize_storages()
+
+    finally:
+        # Cleanup temp dir
+        shutil.rmtree(temp_dir, ignore_errors=True)
+
+
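+# Conceptually, the migration exercised above amounts to an INSERT ... SELECT
+# from the unsuffixed legacy table into the model-suffixed table. A minimal
+# sketch of that idea, assuming matching column layouts (the storage layer's
+# actual migration logic may differ):
+#
+#   INSERT INTO lightrag_vdb_chunks_text_embedding_ada_002_1536d
+#       (workspace, id, content, content_vector, tokens,
+#        chunk_order_index, full_doc_id, file_path)
+#   SELECT workspace, id, content, content_vector, tokens,
+#          chunk_order_index, full_doc_id, file_path
+#   FROM lightrag_vdb_chunks
+#   WHERE workspace = 'multi_instance_test';
+
+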
+# Test: Multiple LightRAG instances with PostgreSQL
+@pytest.mark.asyncio
+async def test_multi_instance_postgres(
+    pg_cleanup, temp_working_dirs, mock_llm_func, mock_tokenizer, pg_config
+):
+    """
+    Test multiple LightRAG instances with different dimensions and model names
+
+    Scenarios:
+    - Instance A: model-a (768d) - explicit model name
+    - Instance B: model-b (1024d) - explicit model name
+    - Both instances insert documents independently
+    - Verify separate tables created for each model+dimension combination
+    - Verify data isolation between instances
+
+    Note: Additional embedding functions (C: 1536d, D: no model_name) are defined
+    but not used in this test. They can be activated for extended testing.
+    """
+    print("\n[E2E Multi-Instance] PostgreSQL with 2 models (768d vs 1024d)")
+
+    # Instance A: 768d with model-a
+    async def embed_func_a(texts):
+        await asyncio.sleep(0)
+        return np.random.rand(len(texts), 768)
+
+    embedding_func_a = EmbeddingFunc(
+        embedding_dim=768,
+        max_token_size=8192,
+        func=embed_func_a,
+        model_name="model-a"
+    )
+
+    # Instance B: 1024d with model-b
+    async def embed_func_b(texts):
+        await asyncio.sleep(0)
+        return np.random.rand(len(texts), 1024)
+
+    embedding_func_b = EmbeddingFunc(
+        embedding_dim=1024,
+        max_token_size=8192,
+        func=embed_func_b,
+        model_name="model-b"
+    )
+
+    # Instance C: 1536d with text-embedding-ada-002
+    async def embed_func_c(texts):
+        await asyncio.sleep(0)
+        return np.random.rand(len(texts), 1536)
+
+    embedding_func_c = EmbeddingFunc(
+        embedding_dim=1536,
+        max_token_size=8192,
+        func=embed_func_c,
+        model_name="text-embedding-ada-002"
+    )
+
+    # Instance D: 768d WITHOUT model_name (backward compatibility)
+    async def embed_func_d(texts):
+        await asyncio.sleep(0)
+        return np.random.rand(len(texts), 768)
+
+    embedding_func_d = EmbeddingFunc(
+        embedding_dim=768,
+        max_token_size=8192,
+        func=embed_func_d
+        # NO model_name - test backward compatibility
+    )
+
+    # Initialize LightRAG instance A
+    print("📦 Initializing LightRAG instance A (model-a, 768d)...")
+    rag_a = LightRAG(
+        working_dir=temp_working_dirs["workspace_a"],
+        llm_model_func=mock_llm_func,
+        embedding_func=embedding_func_a,
+        tokenizer=mock_tokenizer,
+        kv_storage="PGKVStorage",
+        vector_storage="PGVectorStorage",
+        graph_storage="PGGraphStorage",
+        doc_status_storage="PGDocStatusStorage",
+        vector_db_storage_cls_kwargs={
+            **pg_config,
+            "cosine_better_than_threshold": 0.8
+        },
+        kv_storage_cls_kwargs=pg_config,
+        graph_storage_cls_kwargs=pg_config,
+        doc_status_storage_cls_kwargs=pg_config,
+    )
+
+    await rag_a.initialize_storages()
+    table_a = rag_a.chunks_vdb.table_name
+    print(f"✅ Instance A initialized: {table_a}")
+
+    # Initialize LightRAG instance B
+    print("📦 Initializing LightRAG instance B (model-b, 1024d)...")
+    rag_b = LightRAG(
+        working_dir=temp_working_dirs["workspace_b"],
+        llm_model_func=mock_llm_func,
+        embedding_func=embedding_func_b,
+        tokenizer=mock_tokenizer,
+        kv_storage="PGKVStorage",
+        vector_storage="PGVectorStorage",
+        graph_storage="PGGraphStorage",
+        doc_status_storage="PGDocStatusStorage",
+        vector_db_storage_cls_kwargs={
+            **pg_config,
+            "cosine_better_than_threshold": 0.8
+        },
+        kv_storage_cls_kwargs=pg_config,
+        graph_storage_cls_kwargs=pg_config,
+        doc_status_storage_cls_kwargs=pg_config,
+    )
+
+    await rag_b.initialize_storages()
+    table_b = rag_b.chunks_vdb.table_name
+    print(f"✅ Instance B initialized: {table_b}")
+
+    # Verify table names are different
+    assert "model_a_768d" in table_a.lower()
+    assert "model_b_1024d" in table_b.lower()
+    assert table_a != table_b
+    print(f"✅ Table isolation verified: {table_a} != {table_b}")
+
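+    # Optional extra check (illustrative sketch, not asserted here): for a
+    # pgvector column, the declared dimension of "vector(N)" is stored in
+    # pg_attribute.atttypmod, so a table's dimension could be verified with:
+    #
+    #   SELECT atttypmod AS dim
+    #   FROM pg_attribute
+    #   WHERE attrelid = 'lightrag_vdb_chunks_model_a_768d'::regclass
+    #     AND attname = 'content_vector';
+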
+    # Verify both tables exist in database
+    check_query = """
+    SELECT EXISTS (
+        SELECT FROM information_schema.tables
+        WHERE table_name = $1
+    )
+    """
+    result_a = await pg_cleanup.query(check_query, [table_a.lower()])
+    result_b = await pg_cleanup.query(check_query, [table_b.lower()])
+
+    assert result_a.get("exists") is True, f"Table {table_a} should exist"
+    assert result_b.get("exists") is True, f"Table {table_b} should exist"
+    print("✅ Both tables exist in PostgreSQL")
+
+    # Insert documents in instance A
+    print("📝 Inserting document in instance A...")
+    await rag_a.ainsert("Document A: This is about artificial intelligence and neural networks.")
+
+    # Insert documents in instance B
+    print("📝 Inserting document in instance B...")
+    await rag_b.ainsert("Document B: This is about machine learning and deep learning.")
+
+    # Verify data isolation
+    count_a_result = await pg_cleanup.query(
+        f"SELECT COUNT(*) as count FROM {table_a}",
+        []
+    )
+    count_b_result = await pg_cleanup.query(
+        f"SELECT COUNT(*) as count FROM {table_b}",
+        []
+    )
+
+    count_a = count_a_result.get("count", 0)
+    count_b = count_b_result.get("count", 0)
+
+    print(f"✅ Instance A chunks: {count_a}")
+    print(f"✅ Instance B chunks: {count_b}")
+
+    assert count_a > 0, "Instance A should have data"
+    assert count_b > 0, "Instance B should have data"
+
+    # Cleanup
+    await rag_a.finalize_storages()
+    await rag_b.finalize_storages()
+
+    print("✅ Multi-instance PostgreSQL test passed!")
+
+
+# Test: Multiple LightRAG instances with Qdrant
+@pytest.mark.asyncio
+async def test_multi_instance_qdrant(
+    qdrant_cleanup, temp_working_dirs, mock_llm_func, mock_tokenizer, qdrant_config
+):
+    """
+    Test multiple LightRAG instances with different models using Qdrant
+
+    Scenario:
+    - Instance A: model-a (768d)
+    - Instance B: model-b (1024d)
+    - Both insert documents independently
+    - Verify separate collections created and data isolated
+    """
+    print("\n[E2E Multi-Instance] Qdrant with 2 models (768d vs 1024d)")
+
+    # Create embedding function for model A (768d)
+    async def embed_func_a(texts):
+        await asyncio.sleep(0)
+        return np.random.rand(len(texts), 768)
+
+    embedding_func_a = EmbeddingFunc(
+        embedding_dim=768,
+        max_token_size=8192,
+        func=embed_func_a,
+        model_name="model-a"
+    )
+
+    # Create embedding function for model B (1024d)
+    async def embed_func_b(texts):
+        await asyncio.sleep(0)
+        return np.random.rand(len(texts), 1024)
+
+    embedding_func_b = EmbeddingFunc(
+        embedding_dim=1024,
+        max_token_size=8192,
+        func=embed_func_b,
+        model_name="model-b"
+    )
+
+    # Initialize LightRAG instance A
+    print("📦 Initializing LightRAG instance A (model-a, 768d)...")
+    rag_a = LightRAG(
+        working_dir=temp_working_dirs["workspace_a"],
+        llm_model_func=mock_llm_func,
+        embedding_func=embedding_func_a,
+        tokenizer=mock_tokenizer,
+        vector_storage="QdrantVectorDBStorage",
+        vector_db_storage_cls_kwargs={
+            **qdrant_config,
+            "cosine_better_than_threshold": 0.8
+        },
+    )
+
+    await rag_a.initialize_storages()
+    collection_a = rag_a.chunks_vdb.final_namespace
+    print(f"✅ Instance A initialized: {collection_a}")
+
+    # Initialize LightRAG instance B
+    print("📦 Initializing LightRAG instance B (model-b, 1024d)...")
+    rag_b = LightRAG(
+        working_dir=temp_working_dirs["workspace_b"],
+        llm_model_func=mock_llm_func,
+        embedding_func=embedding_func_b,
+        tokenizer=mock_tokenizer,
+        vector_storage="QdrantVectorDBStorage",
+        vector_db_storage_cls_kwargs={
+            **qdrant_config,
+            "cosine_better_than_threshold": 0.8
+        },
+    )
+
+    await rag_b.initialize_storages()
+    collection_b = rag_b.chunks_vdb.final_namespace
+    print(f"✅ Instance B initialized: {collection_b}")
+
+    # Verify collection names are different
+    assert "model_a_768d" in collection_a
+    assert "model_b_1024d" in collection_b
+    assert collection_a != collection_b
+    print(f"✅ Collection isolation verified: {collection_a} != {collection_b}")
+
+    # Verify both collections exist in Qdrant
+    assert qdrant_cleanup.collection_exists(collection_a), \
+        f"Collection {collection_a} should exist"
+    assert qdrant_cleanup.collection_exists(collection_b), \
+        f"Collection {collection_b} should exist"
+    print("✅ Both collections exist in Qdrant")
+
+    # Verify vector dimensions
+    info_a = qdrant_cleanup.get_collection(collection_a)
+    info_b = qdrant_cleanup.get_collection(collection_b)
+
+    assert info_a.config.params.vectors.size == 768, "Model A should use 768 dimensions"
+    assert info_b.config.params.vectors.size == 1024, "Model B should use 1024 dimensions"
+    print(f"✅ Vector dimensions verified: {info_a.config.params.vectors.size}d vs {info_b.config.params.vectors.size}d")
+
+    # Insert documents in instance A
+    print("📝 Inserting document in instance A...")
+    await rag_a.ainsert("Document A: This is about artificial intelligence and neural networks.")
+
+    # Insert documents in instance B
+    print("📝 Inserting document in instance B...")
+    await rag_b.ainsert("Document B: This is about machine learning and deep learning.")
+
+    # Verify data isolation
+    count_a = qdrant_cleanup.count(collection_a).count
+    count_b = qdrant_cleanup.count(collection_b).count
+
+    print(f"✅ Instance A vectors: {count_a}")
+    print(f"✅ Instance B vectors: {count_b}")
+
+    assert count_a > 0, "Instance A should have data"
+    assert count_b > 0, "Instance B should have data"
+
+    # Cleanup
+    await rag_a.finalize_storages()
+    await rag_b.finalize_storages()
+
+    print("✅ Multi-instance Qdrant test passed!")
+
+
+if __name__ == "__main__":
+    # Run tests with pytest
+    pytest.main([__file__, "-v", "-s"])
diff --git a/tests/test_e2e_postgres_migration.py b/tests/test_e2e_postgres_migration.py
deleted file mode 100644
index c6065d78..00000000
--- a/tests/test_e2e_postgres_migration.py
+++ /dev/null
@@ -1,355 +0,0 @@
-"""
-E2E Tests for PostgreSQL Vector Storage Model Isolation
-
-These tests use a REAL PostgreSQL database with pgvector extension.
-Unlike unit tests, these verify actual database operations, data migration,
-and multi-model isolation scenarios.
-
-Prerequisites:
-- PostgreSQL with pgvector extension
-- Environment variables: POSTGRES_HOST, POSTGRES_PORT, POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_DB
-"""
-
-import os
-import pytest
-import asyncio
-import numpy as np
-from lightrag.utils import EmbeddingFunc
-from lightrag.kg.postgres_impl import PGVectorStorage, PostgreSQLDB, ClientManager
-from lightrag.namespace import NameSpace
-
-
-# E2E test configuration from environment
-@pytest.fixture(scope="function")
-def pg_config():
-    """Real PostgreSQL configuration from environment variables"""
-    return {
-        "host": os.getenv("POSTGRES_HOST", "localhost"),
-        "port": int(os.getenv("POSTGRES_PORT", "5432")),
-        "user": os.getenv("POSTGRES_USER", "lightrag"),
-        "password": os.getenv("POSTGRES_PASSWORD", "lightrag_test_password"),
-        "database": os.getenv("POSTGRES_DB", "lightrag_test"),
-        "workspace": os.getenv("POSTGRES_WORKSPACE", "e2e_test"),
-        "max_connections": 10,
-        # Connection retry configuration
-        "connection_retry_attempts": 3,
-        "connection_retry_backoff": 0.5,
-        "connection_retry_backoff_max": 5.0,
-        "pool_close_timeout": 5.0,
-    }
-
-
-@pytest.fixture(scope="function")
-async def real_db(pg_config):
-    """Create a real PostgreSQL database connection"""
-    db = PostgreSQLDB(pg_config)
-    await db.initdb()
-    yield db
-    # Cleanup: close connection pool
-    if db.pool:
-        await db.pool.close()
-
-
-@pytest.fixture
-async def cleanup_tables(real_db):
-    """Cleanup test tables before and after each test"""
-    # Cleanup before test
-    tables_to_drop = [
-        "LIGHTRAG_VDB_CHUNKS",
-        "LIGHTRAG_VDB_CHUNKS_test_model_768d",
-        "LIGHTRAG_VDB_CHUNKS_text_embedding_ada_002_1536d",
-        "LIGHTRAG_VDB_CHUNKS_bge_small_768d",
-        "LIGHTRAG_VDB_CHUNKS_bge_large_1024d",
-    ]
-
-    for table in tables_to_drop:
-        try:
-            await real_db.execute(f"DROP TABLE IF EXISTS {table} CASCADE", None)
-        except Exception:
-            pass
-
-    yield
-
-    # Cleanup after test
-    for table in tables_to_drop:
-        try:
-            await real_db.execute(f"DROP TABLE IF EXISTS {table} CASCADE", None)
-        except Exception:
-            pass
-
-
-@pytest.fixture
-def mock_embedding_func():
-    """Create a mock embedding function for testing"""
-    async def embed_func(texts, **kwargs):
-        # Generate fake embeddings with consistent dimension
-        return np.array([[0.1] * 768 for _ in texts])
-
-    return EmbeddingFunc(
-        embedding_dim=768,
-        func=embed_func,
-        model_name="test_model"
-    )
-
-
-@pytest.mark.asyncio
-async def test_e2e_fresh_installation(real_db, cleanup_tables, mock_embedding_func, pg_config):
-    """
-    E2E Test: Fresh installation with model_name specified
-
-    Scenario: New workspace, no legacy data
-    Expected: Create new table with model suffix, no migration needed
-    """
-    print("\n[E2E Test] Fresh installation with model_name")
-
-    # Reset ClientManager to use our test config
-    ClientManager._instance = None
-    ClientManager._client_config = pg_config
-
-    # Create storage with model_name
-    storage = PGVectorStorage(
-        namespace=NameSpace.VECTOR_STORE_CHUNKS,
-        global_config={
-            "embedding_batch_num": 10,
-            "vector_db_storage_cls_kwargs": {
-                "cosine_better_than_threshold": 0.8
-            }
-        },
-        embedding_func=mock_embedding_func,
-        workspace="e2e_test"
-    )
-
-    # Initialize storage (should create new table)
-    await storage.initialize()
-
-    # Verify table name
-    assert "test_model_768d" in storage.table_name
-    expected_table = "LIGHTRAG_VDB_CHUNKS_test_model_768d"
-    assert storage.table_name == expected_table
-
-    # Verify table exists
-    check_query = """
-    SELECT EXISTS (
-        SELECT FROM information_schema.tables
-        WHERE table_name = $1
-    )
-    """
-    result = await real_db.query(check_query, [expected_table.lower()])
-    assert result.get("exists") == True, f"Table {expected_table} should exist"
-
-    # Verify legacy table does NOT exist
-    legacy_result = await real_db.query(check_query, ["LIGHTRAG_VDB_CHUNKS".lower()])
-    assert legacy_result.get("exists") == False, "Legacy table should not exist"
-
-    print(f"✅ Fresh installation successful: {expected_table} created")
-
-    await storage.finalize()
-
-
-@pytest.mark.asyncio
-async def test_e2e_legacy_migration(real_db, cleanup_tables, pg_config):
-    """
-    E2E Test: Upgrade from legacy format with automatic migration
-
-    Scenario:
-    1. Create legacy table (without model suffix)
-    2. Insert test data
-    3. Initialize with model_name (triggers migration)
-    4. Verify data migrated to new table
-    """
-    print("\n[E2E Test] Legacy data migration")
-
-    # Step 1: Create legacy table and insert data
-    legacy_table = "LIGHTRAG_VDB_CHUNKS"
-
-    create_legacy_sql = f"""
-    CREATE TABLE IF NOT EXISTS {legacy_table} (
-        workspace VARCHAR(255),
-        id VARCHAR(255) PRIMARY KEY,
-        content TEXT,
-        content_vector vector(1536),
-        tokens INTEGER,
-        chunk_order_index INTEGER,
-        full_doc_id VARCHAR(255),
-        file_path TEXT,
-        create_time TIMESTAMP,
-        update_time TIMESTAMP
-    )
-    """
-    await real_db.execute(create_legacy_sql, None)
-
-    # Insert test data into legacy table
-    test_data = [
-        ("e2e_test", f"legacy_doc_{i}", f"Legacy content {i}",
-         [0.1] * 1536, 100, i, "legacy_doc", "/test/path", "NOW()", "NOW()")
-        for i in range(10)
-    ]
-
-    for data in test_data:
-        insert_sql = f"""
-        INSERT INTO {legacy_table}
-        (workspace, id, content, content_vector, tokens, chunk_order_index, full_doc_id, file_path, create_time, update_time)
-        VALUES ($1, $2, $3, $4::vector, $5, $6, $7, $8, {data[8]}, {data[9]})
-        """
-        await real_db.execute(insert_sql, {
-            "workspace": data[0],
-            "id": data[1],
-            "content": data[2],
-            "content_vector": data[3],
-            "tokens": data[4],
-            "chunk_order_index": data[5],
-            "full_doc_id": data[6],
-            "file_path": data[7]
-        })
-
-    # Verify legacy data exists
-    count_result = await real_db.query(f"SELECT COUNT(*) as count FROM {legacy_table} WHERE workspace=$1", ["e2e_test"])
-    legacy_count = count_result.get("count", 0)
-    assert legacy_count == 10, f"Expected 10 records in legacy table, got {legacy_count}"
-    print(f"✅ Legacy table created with {legacy_count} records")
-
-    # Step 2: Initialize storage with model_name (triggers migration)
-    ClientManager._instance = None
-    ClientManager._client_config = pg_config
-
-    async def embed_func(texts, **kwargs):
-        return np.array([[0.1] * 1536 for _ in texts])
-
-    embedding_func = EmbeddingFunc(
-        embedding_dim=1536,
-        func=embed_func,
-        model_name="text-embedding-ada-002"
-    )
-
-    storage = PGVectorStorage(
-        namespace=NameSpace.VECTOR_STORE_CHUNKS,
-        global_config={
-            "embedding_batch_num": 10,
-            "vector_db_storage_cls_kwargs": {
-                "cosine_better_than_threshold": 0.8
-            }
-        },
-        embedding_func=embedding_func,
-        workspace="e2e_test"
-    )
-
-    # Initialize (should trigger migration)
-    print("🔄 Starting migration...")
-    await storage.initialize()
-    print("✅ Migration completed")
-
-    # Step 3: Verify migration
-    new_table = storage.table_name
-    assert "text_embedding_ada_002_1536d" in new_table
-
-    # Count records in new table
-    new_count_result = await real_db.query(f"SELECT COUNT(*) as count FROM {new_table} WHERE workspace=$1", ["e2e_test"])
-    new_count = new_count_result.get("count", 0)
-
-    assert new_count == legacy_count, f"Expected {legacy_count} records in new table, got {new_count}"
-    print(f"✅ Data migration verified: {new_count}/{legacy_count} records migrated")
-
-    # Verify data content
-    sample_result = await real_db.query(f"SELECT id, content FROM {new_table} WHERE workspace=$1 LIMIT 1", ["e2e_test"])
-    assert sample_result is not None
-    assert "Legacy content" in sample_result.get("content", "")
-    print(f"✅ Data integrity verified: {sample_result.get('id')}")
-
-    await storage.finalize()
-
-
-@pytest.mark.asyncio
-async def test_e2e_multi_model_coexistence(real_db, cleanup_tables, pg_config):
-    """
-    E2E Test: Multiple embedding models coexisting
-
-    Scenario:
-    1. Create storage with model A (768d)
-    2. Create storage with model B (1024d)
-    3. Verify separate tables created
-    4. Verify data isolation
-    """
-    print("\n[E2E Test] Multi-model coexistence")
-
-    ClientManager._instance = None
-    ClientManager._client_config = pg_config
-
-    # Model A: 768 dimensions
-    async def embed_func_a(texts, **kwargs):
-        return np.array([[0.1] * 768 for _ in texts])
-
-    embedding_func_a = EmbeddingFunc(
-        embedding_dim=768,
-        func=embed_func_a,
-        model_name="bge-small"
-    )
-
-    storage_a = PGVectorStorage(
-        namespace=NameSpace.VECTOR_STORE_CHUNKS,
-        global_config={
-            "embedding_batch_num": 10,
-            "vector_db_storage_cls_kwargs": {
-                "cosine_better_than_threshold": 0.8
-            }
-        },
-        embedding_func=embedding_func_a,
-        workspace="e2e_test"
-    )
-
-    await storage_a.initialize()
-    table_a = storage_a.table_name
-    assert "bge_small_768d" in table_a
-    print(f"✅ Model A table created: {table_a}")
-
-    # Model B: 1024 dimensions
-    async def embed_func_b(texts, **kwargs):
-        return np.array([[0.1] * 1024 for _ in texts])
-
-    embedding_func_b = EmbeddingFunc(
-        embedding_dim=1024,
-        func=embed_func_b,
-        model_name="bge-large"
-    )
-
-    storage_b = PGVectorStorage(
-        namespace=NameSpace.VECTOR_STORE_CHUNKS,
-        global_config={
-            "embedding_batch_num": 10,
-            "vector_db_storage_cls_kwargs": {
-                "cosine_better_than_threshold": 0.8
-            }
-        },
-        embedding_func=embedding_func_b,
-        workspace="e2e_test"
-    )
-
-    await storage_b.initialize()
-    table_b = storage_b.table_name
-    assert "bge_large_1024d" in table_b
-    print(f"✅ Model B table created: {table_b}")
-
-    # Verify tables are different
-    assert table_a != table_b, "Tables should have different names"
-    print(f"✅ Table isolation verified: {table_a} != {table_b}")
-
-    # Verify both tables exist
-    check_query = """
-    SELECT EXISTS (
-        SELECT FROM information_schema.tables
-        WHERE table_name = $1
-    )
-    """
-    result_a = await real_db.query(check_query, [table_a.lower()])
-    result_b = await real_db.query(check_query, [table_b.lower()])
-
-    assert result_a.get("exists") == True
-    assert result_b.get("exists") == True
-    print("✅ Both tables exist in database")
-
-    await storage_a.finalize()
-    await storage_b.finalize()
-
-
-if __name__ == "__main__":
-    # Run tests with pytest
-    pytest.main([__file__, "-v", "-s"])
diff --git a/tests/test_e2e_qdrant_migration.py b/tests/test_e2e_qdrant_migration.py
deleted file mode 100644
index 8b18a564..00000000
--- a/tests/test_e2e_qdrant_migration.py
+++ /dev/null
@@ -1,346 +0,0 @@
-"""
-E2E Tests for Qdrant Vector Storage Model Isolation
-
-These tests use a REAL Qdrant server.
-Unlike unit tests, these verify actual collection operations, data migration,
-and multi-model isolation scenarios.
-
-Prerequisites:
-- Qdrant server running
-- Environment variables: QDRANT_URL (optional QDRANT_API_KEY)
-"""
-
-import os
-import pytest
-import asyncio
-import numpy as np
-from lightrag.utils import EmbeddingFunc
-from lightrag.kg.qdrant_impl import QdrantVectorDBStorage
-from lightrag.namespace import NameSpace
-from qdrant_client import QdrantClient
-from qdrant_client.models import Distance, VectorParams
-
-
-# E2E test configuration from environment
-@pytest.fixture(scope="function")
-def qdrant_config():
-    """Real Qdrant configuration from environment variables"""
-    return {
-        "url": os.getenv("QDRANT_URL", "http://localhost:6333"),
-        "api_key": os.getenv("QDRANT_API_KEY", None),
-    }
-
-
-@pytest.fixture(scope="function")
-def qdrant_client(qdrant_config):
-    """Create a real Qdrant client"""
-    client = QdrantClient(
-        url=qdrant_config["url"],
-        api_key=qdrant_config["api_key"],
-        timeout=60,
-    )
-    yield client
-    # Client auto-closes
-
-
-@pytest.fixture
-async def cleanup_collections(qdrant_client):
-    """Cleanup test collections before and after each test"""
-    collections_to_delete = [
-        "lightrag_vdb_chunks",  # legacy
-        "e2e_test_chunks",  # legacy with workspace
-        "lightrag_vdb_chunks_test_model_768d",
-        "lightrag_vdb_chunks_text_embedding_ada_002_1536d",
-        "lightrag_vdb_chunks_bge_small_768d",
-        "lightrag_vdb_chunks_bge_large_1024d",
-    ]
-
-    # Cleanup before test
-    for collection in collections_to_delete:
-        try:
-            if qdrant_client.collection_exists(collection):
-                qdrant_client.delete_collection(collection)
-        except Exception:
-            pass
-
-    yield
-
-    # Cleanup after test
-    for collection in collections_to_delete:
-        try:
-            if qdrant_client.collection_exists(collection):
-                qdrant_client.delete_collection(collection)
-        except Exception:
-            pass
-
-
-@pytest.fixture
-def mock_embedding_func():
-    """Create a mock embedding function for testing"""
-    async def embed_func(texts, **kwargs):
-        return np.array([[0.1] * 768 for _ in texts])
-
-    return EmbeddingFunc(
-        embedding_dim=768,
-        func=embed_func,
-        model_name="test_model"
-    )
-
-
-@pytest.mark.asyncio
-async def test_e2e_qdrant_fresh_installation(qdrant_client, cleanup_collections, mock_embedding_func, qdrant_config):
-    """
-    E2E Test: Fresh Qdrant installation with model_name specified
-
-    Scenario: New workspace, no legacy collection
-    Expected: Create new collection with model suffix, no migration needed
-    """
-    print("\n[E2E Test] Fresh Qdrant installation with model_name")
-
-    # Create storage with model_name
-    storage = QdrantVectorDBStorage(
-        namespace=NameSpace.VECTOR_STORE_CHUNKS,
-        global_config={
-            "embedding_batch_num": 10,
-            "vector_db_storage_cls_kwargs": {
-                "url": qdrant_config["url"],
-                "api_key": qdrant_config["api_key"],
-                "cosine_better_than_threshold": 0.8,
-            }
-        },
-        embedding_func=mock_embedding_func,
-        workspace="e2e_test"
-    )
-
-    # Initialize storage (should create new collection)
-    await storage.initialize()
-
-    # Verify collection name
-    assert "test_model_768d" in storage.final_namespace
-    expected_collection = "lightrag_vdb_chunks_test_model_768d"
-    assert storage.final_namespace == expected_collection
-
-    # Verify collection exists
-    assert qdrant_client.collection_exists(expected_collection), \
-        f"Collection {expected_collection} should exist"
-
-    # Verify collection properties
-    collection_info = qdrant_client.get_collection(expected_collection)
-    assert collection_info.vectors_count == 0, "New collection should be empty"
-    print(f"✅ Fresh installation successful: {expected_collection} created")
-
-    # Verify legacy collection does NOT exist
-    assert not qdrant_client.collection_exists("lightrag_vdb_chunks"), \
-        "Legacy collection should not exist"
-    assert not qdrant_client.collection_exists("e2e_test_chunks"), \
-        "Legacy workspace collection should not exist"
-
-    await storage.finalize()
-
-
-@pytest.mark.asyncio
-async def test_e2e_qdrant_legacy_migration(qdrant_client, cleanup_collections, qdrant_config):
-    """
-    E2E Test: Upgrade from legacy Qdrant collection with automatic migration
-
-    Scenario:
-    1. Create legacy collection (without model suffix)
-    2. Insert test data
-    3. Initialize with model_name (triggers migration)
-    4. Verify data migrated to new collection
-    """
-    print("\n[E2E Test] Legacy Qdrant collection migration")
-
-    # Step 1: Create legacy collection and insert data
-    legacy_collection = "e2e_test_chunks"  # workspace-prefixed legacy name
-
-    qdrant_client.create_collection(
-        collection_name=legacy_collection,
-        vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
-    )
-
-    # Insert test data into legacy collection
-    from qdrant_client.models import PointStruct
-
-    test_points = [
-        PointStruct(
-            id=i,
-            vector=[0.1] * 1536,
-            payload={
-                "workspace_id": "e2e_test",
-                "content": f"Legacy content {i}",
-                "id": f"legacy_doc_{i}",
-            }
-        )
-        for i in range(10)
-    ]
-
-    qdrant_client.upsert(
-        collection_name=legacy_collection,
-        points=test_points,
-        wait=True,
-    )
-
-    # Verify legacy data exists
-    legacy_info = qdrant_client.get_collection(legacy_collection)
-    legacy_count = legacy_info.vectors_count
-    assert legacy_count == 10, f"Expected 10 vectors in legacy collection, got {legacy_count}"
-    print(f"✅ Legacy collection created with {legacy_count} vectors")
-
-    # Step 2: Initialize storage with model_name (triggers migration)
-    async def embed_func(texts, **kwargs):
-        return np.array([[0.1] * 1536 for _ in texts])
-
-    embedding_func = EmbeddingFunc(
-        embedding_dim=1536,
-        func=embed_func,
-        model_name="text-embedding-ada-002"
-    )
-
-    storage = QdrantVectorDBStorage(
-        namespace=NameSpace.VECTOR_STORE_CHUNKS,
-        global_config={
-            "embedding_batch_num": 10,
-            "vector_db_storage_cls_kwargs": {
-                "url": qdrant_config["url"],
-                "api_key": qdrant_config["api_key"],
-                "cosine_better_than_threshold": 0.8,
-            }
-        },
-        embedding_func=embedding_func,
-        workspace="e2e_test"
-    )
-
-    # Initialize (should trigger migration)
-    print("🔄 Starting migration...")
-    await storage.initialize()
-    print("✅ Migration completed")
-
-    # Step 3: Verify migration
-    new_collection = storage.final_namespace
-    assert "text_embedding_ada_002_1536d" in new_collection
-
-    # Verify new collection exists and has data
-    assert qdrant_client.collection_exists(new_collection), \
-        f"New collection {new_collection} should exist"
-
-    new_info = qdrant_client.get_collection(new_collection)
-    new_count = new_info.vectors_count
-
-    assert new_count == legacy_count, \
-        f"Expected {legacy_count} vectors in new collection, got {new_count}"
-    print(f"✅ Data migration verified: {new_count}/{legacy_count} vectors migrated")
-
-    # Verify data content
-    sample_points = qdrant_client.scroll(
-        collection_name=new_collection,
-        limit=1,
-        with_payload=True,
-    )[0]
-
-    assert len(sample_points) > 0, "Should have at least one point"
-    sample = sample_points[0]
-    assert "Legacy content" in sample.payload.get("content", "")
-    print(f"✅ Data integrity verified: {sample.payload.get('id')}")
-
-    await storage.finalize()
-
-
-@pytest.mark.asyncio
-async def test_e2e_qdrant_multi_model_coexistence(qdrant_client, cleanup_collections, qdrant_config):
-    """
-    E2E Test: Multiple embedding models coexisting in Qdrant
-
-    Scenario:
-    1. Create storage with model A (768d)
-    2. Create storage with model B (1024d)
-    3. Verify separate collections created
-    4. Verify data isolation
-    """
-    print("\n[E2E Test] Multi-model coexistence in Qdrant")
-
-    # Model A: 768 dimensions
-    async def embed_func_a(texts, **kwargs):
-        return np.array([[0.1] * 768 for _ in texts])
-
-    embedding_func_a = EmbeddingFunc(
-        embedding_dim=768,
-        func=embed_func_a,
-        model_name="bge-small"
-    )
-
-    storage_a = QdrantVectorDBStorage(
-        namespace=NameSpace.VECTOR_STORE_CHUNKS,
-        global_config={
-            "embedding_batch_num": 10,
-            "vector_db_storage_cls_kwargs": {
-                "url": qdrant_config["url"],
-                "api_key": qdrant_config["api_key"],
-                "cosine_better_than_threshold": 0.8,
-            }
-        },
-        embedding_func=embedding_func_a,
-        workspace="e2e_test"
-    )
-
-    await storage_a.initialize()
-    collection_a = storage_a.final_namespace
-    assert "bge_small_768d" in collection_a
-    print(f"✅ Model A collection created: {collection_a}")
-
-    # Model B: 1024 dimensions
-    async def embed_func_b(texts, **kwargs):
-        return np.array([[0.1] * 1024 for _ in texts])
-
-    embedding_func_b = EmbeddingFunc(
-        embedding_dim=1024,
-        func=embed_func_b,
-        model_name="bge-large"
-    )
-
-    storage_b = QdrantVectorDBStorage(
-        namespace=NameSpace.VECTOR_STORE_CHUNKS,
-        global_config={
-            "embedding_batch_num": 10,
-            "vector_db_storage_cls_kwargs": {
-                "url": qdrant_config["url"],
-                "api_key": qdrant_config["api_key"],
-                "cosine_better_than_threshold": 0.8,
-            }
-        },
-        embedding_func=embedding_func_b,
-        workspace="e2e_test"
-    )
-
-    await storage_b.initialize()
-    collection_b = storage_b.final_namespace
-    assert "bge_large_1024d" in collection_b
-    print(f"✅ Model B collection created: {collection_b}")
-
-    # Verify collections are different
-    assert collection_a != collection_b, "Collections should have different names"
-    print(f"✅ Collection isolation verified: {collection_a} != {collection_b}")
-
-    # Verify both collections exist
-    assert qdrant_client.collection_exists(collection_a), \
-        f"Collection {collection_a} should exist"
-    assert qdrant_client.collection_exists(collection_b), \
-        f"Collection {collection_b} should exist"
-    print("✅ Both collections exist in Qdrant")
-
-    # Verify vector dimensions
-    info_a = qdrant_client.get_collection(collection_a)
-    info_b = qdrant_client.get_collection(collection_b)
-
-    # Qdrant stores vector config in config.params.vectors
-    assert info_a.config.params.vectors.size == 768, "Model A should use 768 dimensions"
-    assert info_b.config.params.vectors.size == 1024, "Model B should use 1024 dimensions"
-    print(f"✅ Vector dimensions verified: {info_a.config.params.vectors.size}d vs {info_b.config.params.vectors.size}d")
-
-    await storage_a.finalize()
-    await storage_b.finalize()
-
-
-if __name__ == "__main__":
-    # Run tests with pytest
-    pytest.main([__file__, "-v", "-s"])
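# To reproduce the CI runs locally (values mirror the workflow defaults above;
# adjust for your environment):
#
#   POSTGRES_HOST=localhost POSTGRES_PORT=5432 POSTGRES_USER=lightrag \
#   POSTGRES_PASSWORD=lightrag_test_password POSTGRES_DB=lightrag_test \
#   pytest tests/test_e2e_multi_instance.py -k "postgres" -v --tb=short -s
#
#   QDRANT_URL=http://localhost:6333 \
#   pytest tests/test_e2e_multi_instance.py -k "qdrant" -v --tb=short -s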