diff --git a/tests/test_postgres_migration.py b/tests/test_postgres_migration.py index 46c34a36..df88e700 100644 --- a/tests/test_postgres_migration.py +++ b/tests/test_postgres_migration.py @@ -574,3 +574,232 @@ async def test_case1_nonempty_legacy_warning( print( f"✅ Case 1b: Legacy table '{dropped_table}' with data preserved (warning only)" ) + + +@pytest.mark.asyncio +async def test_case1_sequential_workspace_migration( + mock_client_manager, mock_pg_db, mock_embedding_func +): + """ + Case 1c: Sequential workspace migration (Multi-tenant scenario) + + Critical bug fix verification: + Timeline: + 1. Legacy table has workspace_a (3 records) + workspace_b (3 records) + 2. Workspace A initializes first → Case 4 (only legacy exists) → migrates A's data + 3. Workspace B initializes later → Case 1 (both tables exist) → should migrate B's data + 4. Verify workspace B's data is correctly migrated to new table + 5. Verify legacy table is cleaned up after both workspaces migrate + + This test verifies the fix where Case 1 now checks and migrates current + workspace's data instead of just checking if legacy table is empty globally. + """ + config = { + "embedding_batch_num": 10, + "vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8}, + } + + embedding_func = EmbeddingFunc( + embedding_dim=1536, + func=mock_embedding_func.func, + model_name="test-model", + ) + + # Mock data: Legacy table has 6 records total (3 from workspace_a, 3 from workspace_b) + mock_rows_a = [ + {"id": f"a_{i}", "content": f"A content {i}", "workspace": "workspace_a"} + for i in range(3) + ] + mock_rows_b = [ + {"id": f"b_{i}", "content": f"B content {i}", "workspace": "workspace_b"} + for i in range(3) + ] + + # Track migration state + migration_state = {"new_table_exists": False, "workspace_a_migrated": False} + + # Step 1: Simulate workspace_a initialization (Case 4) + # CRITICAL: Set db.workspace to workspace_a + mock_pg_db.workspace = "workspace_a" + + storage_a = PGVectorStorage( + namespace=NameSpace.VECTOR_STORE_CHUNKS, + global_config=config, + embedding_func=embedding_func, + workspace="workspace_a", + ) + + # Mock table_exists for workspace_a + async def mock_table_exists_a(db, table_name): + if table_name == storage_a.legacy_table_name: + return True + if table_name == storage_a.table_name: + return migration_state["new_table_exists"] + return False + + # Track inserted records count for verification + inserted_count = {"workspace_a": 0} + + # Mock execute to track inserts + async def mock_execute_a(sql, data=None, **kwargs): + if sql and "INSERT INTO" in sql.upper(): + inserted_count["workspace_a"] += 1 + return None + + # Mock query for workspace_a (Case 4) + async def mock_query_a(sql, params=None, multirows=False, **kwargs): + sql_upper = sql.upper() + base_name = storage_a.legacy_table_name.upper() + + if "COUNT(*)" in sql: + has_model_suffix = "TEST_MODEL_1536D" in sql_upper + is_legacy = base_name in sql_upper and not has_model_suffix + has_workspace_filter = "WHERE workspace" in sql + + if is_legacy and has_workspace_filter: + workspace = params[0] if params and len(params) > 0 else None + if workspace == "workspace_a": + # After migration starts, pretend legacy is empty for this workspace + return {"count": 3 - inserted_count["workspace_a"]} + elif workspace == "workspace_b": + return {"count": 3} + elif is_legacy and not has_workspace_filter: + # Global count in legacy table + remaining = 6 - inserted_count["workspace_a"] + return {"count": remaining} + elif has_model_suffix: + # New table count (for verification) + return {"count": inserted_count["workspace_a"]} + elif multirows and "SELECT *" in sql: + if "WHERE workspace" in sql: + workspace = params[0] if params and len(params) > 0 else None + if workspace == "workspace_a": + offset = params[1] if len(params) > 1 else 0 + limit = params[2] if len(params) > 2 else 500 + return mock_rows_a[offset : offset + limit] + return {} + + mock_pg_db.query = AsyncMock(side_effect=mock_query_a) + mock_pg_db.execute = AsyncMock(side_effect=mock_execute_a) + + # Initialize workspace_a (Case 4) + with ( + patch( + "lightrag.kg.postgres_impl._pg_table_exists", + side_effect=mock_table_exists_a, + ), + patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()), + ): + await storage_a.initialize() + migration_state["new_table_exists"] = True + migration_state["workspace_a_migrated"] = True + + print("✅ Step 1: Workspace A initialized (Case 4)") + assert mock_pg_db.execute.call_count >= 3 + print(f"✅ Step 1: {mock_pg_db.execute.call_count} execute calls") + + # Step 2: Simulate workspace_b initialization (Case 1) + # CRITICAL: Set db.workspace to workspace_b + mock_pg_db.workspace = "workspace_b" + + storage_b = PGVectorStorage( + namespace=NameSpace.VECTOR_STORE_CHUNKS, + global_config=config, + embedding_func=embedding_func, + workspace="workspace_b", + ) + + mock_pg_db.reset_mock() + migration_state["workspace_b_migrated"] = False + + # Mock table_exists for workspace_b (both exist) + async def mock_table_exists_b(db, table_name): + return True + + # Track inserted records count for workspace_b + inserted_count["workspace_b"] = 0 + + # Mock execute for workspace_b to track inserts + async def mock_execute_b(sql, data=None, **kwargs): + if sql and "INSERT INTO" in sql.upper(): + inserted_count["workspace_b"] += 1 + return None + + # Mock query for workspace_b (Case 1) + async def mock_query_b(sql, params=None, multirows=False, **kwargs): + sql_upper = sql.upper() + base_name = storage_b.legacy_table_name.upper() + + if "COUNT(*)" in sql: + has_model_suffix = "TEST_MODEL_1536D" in sql_upper + is_legacy = base_name in sql_upper and not has_model_suffix + has_workspace_filter = "WHERE workspace" in sql + + if is_legacy and has_workspace_filter: + workspace = params[0] if params and len(params) > 0 else None + if workspace == "workspace_b": + # After migration starts, pretend legacy is empty for this workspace + return {"count": 3 - inserted_count["workspace_b"]} + elif workspace == "workspace_a": + return {"count": 0} # Already migrated + elif is_legacy and not has_workspace_filter: + # Global count: only workspace_b data remains + return {"count": 3 - inserted_count["workspace_b"]} + elif has_model_suffix: + # New table total count (workspace_a: 3 + workspace_b: inserted) + if has_workspace_filter: + workspace = params[0] if params and len(params) > 0 else None + if workspace == "workspace_b": + return {"count": inserted_count["workspace_b"]} + elif workspace == "workspace_a": + return {"count": 3} + else: + # Total count in new table (for verification) + return {"count": 3 + inserted_count["workspace_b"]} + elif multirows and "SELECT *" in sql: + if "WHERE workspace" in sql: + workspace = params[0] if params and len(params) > 0 else None + if workspace == "workspace_b": + offset = params[1] if len(params) > 1 else 0 + limit = params[2] if len(params) > 2 else 500 + return mock_rows_b[offset : offset + limit] + return {} + + mock_pg_db.query = AsyncMock(side_effect=mock_query_b) + mock_pg_db.execute = AsyncMock(side_effect=mock_execute_b) + + # Initialize workspace_b (Case 1) + with patch( + "lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists_b + ): + await storage_b.initialize() + migration_state["workspace_b_migrated"] = True + + print("✅ Step 2: Workspace B initialized (Case 1)") + + # Verify workspace_b migration happened + execute_calls = mock_pg_db.execute.call_args_list + insert_calls = [ + call for call in execute_calls if call[0][0] and "INSERT INTO" in call[0][0] + ] + assert len(insert_calls) >= 3, f"Expected >= 3 inserts, got {len(insert_calls)}" + print(f"✅ Step 2: {len(insert_calls)} insert calls") + + # Verify DELETE and DROP TABLE + delete_calls = [ + call + for call in execute_calls + if call[0][0] + and "DELETE FROM" in call[0][0] + and "WHERE workspace" in call[0][0] + ] + assert len(delete_calls) >= 1, "Expected DELETE workspace_b data" + print("✅ Step 2: DELETE workspace_b from legacy") + + drop_calls = [ + call for call in execute_calls if call[0][0] and "DROP TABLE" in call[0][0] + ] + assert len(drop_calls) >= 1, "Expected DROP TABLE" + print("✅ Step 2: Legacy table dropped") + + print("\n🎉 Case 1c: Sequential workspace migration verified!") diff --git a/uv.lock b/uv.lock index 97703af0..019f7539 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.10" resolution-markers = [ "python_full_version >= '3.14' and python_full_version < '4' and platform_machine == 'x86_64' and sys_platform == 'darwin'", @@ -2735,7 +2735,6 @@ requires-dist = [ { name = "json-repair", marker = "extra == 'api'" }, { name = "langfuse", marker = "extra == 'observability'", specifier = ">=3.8.1" }, { name = "lightrag-hku", extras = ["api", "offline-llm", "offline-storage"], marker = "extra == 'offline'" }, - { name = "lightrag-hku", extras = ["pytest"], marker = "extra == 'evaluation'" }, { name = "llama-index", marker = "extra == 'offline-llm'", specifier = ">=0.9.0,<1.0.0" }, { name = "nano-vectordb" }, { name = "nano-vectordb", marker = "extra == 'api'" }, @@ -2753,6 +2752,7 @@ requires-dist = [ { name = "passlib", extras = ["bcrypt"], marker = "extra == 'api'" }, { name = "pipmaster" }, { name = "pipmaster", marker = "extra == 'api'" }, + { name = "pre-commit", marker = "extra == 'evaluation'" }, { name = "pre-commit", marker = "extra == 'pytest'" }, { name = "psutil", marker = "extra == 'api'" }, { name = "pycryptodome", marker = "extra == 'api'", specifier = ">=3.0.0,<4.0.0" }, @@ -2764,7 +2764,9 @@ requires-dist = [ { name = "pypdf", marker = "extra == 'api'", specifier = ">=6.1.0" }, { name = "pypinyin" }, { name = "pypinyin", marker = "extra == 'api'" }, + { name = "pytest", marker = "extra == 'evaluation'", specifier = ">=8.4.2" }, { name = "pytest", marker = "extra == 'pytest'", specifier = ">=8.4.2" }, + { name = "pytest-asyncio", marker = "extra == 'evaluation'", specifier = ">=1.2.0" }, { name = "pytest-asyncio", marker = "extra == 'pytest'", specifier = ">=1.2.0" }, { name = "python-docx", marker = "extra == 'api'", specifier = ">=0.8.11,<2.0.0" }, { name = "python-dotenv" },