LightRAG/tests/test_postgres_migration.py
BukeLy 0fb7c5bc3b test: add unit test for Case 1 sequential workspace migration bug
Add test_case1_sequential_workspace_migration to verify the fix for
the multi-tenant data loss bug in PostgreSQL Case 1 migration.

Problem:
- When workspace_a migrates first (Case 4: only legacy table exists)
- Then workspace_b initializes later (Case 1: both tables exist)
- Bug: Case 1 only checked if legacy table was globally empty
- Result: workspace_b's data was not migrated, causing data loss

Test Scenario:
1. Legacy table contains data from both workspace_a (3 records) and
   workspace_b (3 records)
2. workspace_a initializes first → triggers Case 4 migration
3. workspace_b initializes second → triggers Case 1 migration
4. Verify workspace_b's data is correctly migrated to new table
5. Verify workspace_b's data is deleted from legacy table
6. Verify legacy table is dropped when empty

This test uses mock tracking of inserted records to verify migration
behavior without requiring a real PostgreSQL database.

Related: GitHub PR #2391 comment #2553973066
2025-11-26 01:32:25 +08:00

805 lines
28 KiB
Python

import pytest
from unittest.mock import patch, AsyncMock
import numpy as np
from lightrag.utils import EmbeddingFunc
from lightrag.kg.postgres_impl import (
PGVectorStorage,
)
from lightrag.namespace import NameSpace
# Mock PostgreSQLDB
@pytest.fixture
def mock_pg_db():
"""Mock PostgreSQL database connection"""
db = AsyncMock()
db.workspace = "test_workspace"
# Mock query responses with multirows support
async def mock_query(sql, params=None, multirows=False, **kwargs):
# Default return value
if multirows:
return [] # Return empty list for multirows
return {"exists": False, "count": 0}
# Mock for execute that mimics PostgreSQLDB.execute() behavior
async def mock_execute(sql, data=None, **kwargs):
"""
Mock that mimics PostgreSQLDB.execute() behavior:
- Accepts data as dict[str, Any] | None (second parameter)
- Internally converts dict.values() to tuple for AsyncPG
"""
# Mimic real execute() which accepts dict and converts to tuple
if data is not None and not isinstance(data, dict):
raise TypeError(
f"PostgreSQLDB.execute() expects data as dict, got {type(data).__name__}"
)
return None
db.query = AsyncMock(side_effect=mock_query)
db.execute = AsyncMock(side_effect=mock_execute)
return db
# Mock get_data_init_lock to avoid async lock issues in tests
@pytest.fixture(autouse=True)
def mock_data_init_lock():
with patch("lightrag.kg.postgres_impl.get_data_init_lock") as mock_lock:
mock_lock_ctx = AsyncMock()
mock_lock.return_value = mock_lock_ctx
yield mock_lock
# Mock ClientManager
@pytest.fixture
def mock_client_manager(mock_pg_db):
with patch("lightrag.kg.postgres_impl.ClientManager") as mock_manager:
mock_manager.get_client = AsyncMock(return_value=mock_pg_db)
mock_manager.release_client = AsyncMock()
yield mock_manager
# Mock Embedding function
@pytest.fixture
def mock_embedding_func():
async def embed_func(texts, **kwargs):
return np.array([[0.1] * 768 for _ in texts])
func = EmbeddingFunc(embedding_dim=768, func=embed_func, model_name="test_model")
return func
@pytest.mark.asyncio
async def test_postgres_table_naming(
mock_client_manager, mock_pg_db, mock_embedding_func
):
"""Test if table name is correctly generated with model suffix"""
config = {
"embedding_batch_num": 10,
"vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
}
storage = PGVectorStorage(
namespace=NameSpace.VECTOR_STORE_CHUNKS,
global_config=config,
embedding_func=mock_embedding_func,
workspace="test_ws",
)
# Verify table name contains model suffix
expected_suffix = "test_model_768d"
assert expected_suffix in storage.table_name
assert storage.table_name == f"LIGHTRAG_VDB_CHUNKS_{expected_suffix}"
# Verify legacy table name
assert storage.legacy_table_name == "LIGHTRAG_VDB_CHUNKS"
@pytest.mark.asyncio
async def test_postgres_migration_trigger(
mock_client_manager, mock_pg_db, mock_embedding_func
):
"""Test if migration logic is triggered correctly"""
config = {
"embedding_batch_num": 10,
"vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
}
storage = PGVectorStorage(
namespace=NameSpace.VECTOR_STORE_CHUNKS,
global_config=config,
embedding_func=mock_embedding_func,
workspace="test_ws",
)
# Setup mocks for migration scenario
# 1. New table does not exist, legacy table exists
async def mock_table_exists(db, table_name):
return table_name == storage.legacy_table_name
# 2. Legacy table has 100 records
mock_rows = [
{"id": f"test_id_{i}", "content": f"content_{i}", "workspace": "test_ws"}
for i in range(100)
]
async def mock_query(sql, params=None, multirows=False, **kwargs):
if "COUNT(*)" in sql:
return {"count": 100}
elif multirows and "SELECT *" in sql:
# Mock batch fetch for migration
# Handle workspace filtering: params = [workspace, offset, limit] or [offset, limit]
if "WHERE workspace" in sql:
# With workspace filter: params[0]=workspace, params[1]=offset, params[2]=limit
offset = params[1] if len(params) > 1 else 0
limit = params[2] if len(params) > 2 else 500
else:
# No workspace filter: params[0]=offset, params[1]=limit
offset = params[0] if params else 0
limit = params[1] if len(params) > 1 else 500
start = offset
end = min(offset + limit, len(mock_rows))
return mock_rows[start:end]
return {}
mock_pg_db.query = AsyncMock(side_effect=mock_query)
with (
patch(
"lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists
),
patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()),
):
# Initialize storage (should trigger migration)
await storage.initialize()
# Verify migration was executed
# Check that execute was called for inserting rows
assert mock_pg_db.execute.call_count > 0
@pytest.mark.asyncio
async def test_postgres_no_migration_needed(
mock_client_manager, mock_pg_db, mock_embedding_func
):
"""Test scenario where new table already exists (no migration needed)"""
config = {
"embedding_batch_num": 10,
"vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
}
storage = PGVectorStorage(
namespace=NameSpace.VECTOR_STORE_CHUNKS,
global_config=config,
embedding_func=mock_embedding_func,
workspace="test_ws",
)
# Mock: new table already exists
async def mock_table_exists(db, table_name):
return table_name == storage.table_name
with (
patch(
"lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists
),
patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()) as mock_create,
):
await storage.initialize()
# Verify no table creation was attempted
mock_create.assert_not_called()
@pytest.mark.asyncio
async def test_scenario_1_new_workspace_creation(
mock_client_manager, mock_pg_db, mock_embedding_func
):
"""
Scenario 1: New workspace creation
Expected behavior:
- No legacy table exists
- Directly create new table with model suffix
- No migration needed
"""
config = {
"embedding_batch_num": 10,
"vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
}
embedding_func = EmbeddingFunc(
embedding_dim=3072,
func=mock_embedding_func.func,
model_name="text-embedding-3-large",
)
storage = PGVectorStorage(
namespace=NameSpace.VECTOR_STORE_CHUNKS,
global_config=config,
embedding_func=embedding_func,
workspace="new_workspace",
)
# Mock: neither table exists
async def mock_table_exists(db, table_name):
return False
with (
patch(
"lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists
),
patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()) as mock_create,
):
await storage.initialize()
# Verify table name format
assert "text_embedding_3_large_3072d" in storage.table_name
# Verify new table creation was called
mock_create.assert_called_once()
call_args = mock_create.call_args
assert (
call_args[0][1] == storage.table_name
) # table_name is second positional arg
@pytest.mark.asyncio
async def test_scenario_2_legacy_upgrade_migration(
mock_client_manager, mock_pg_db, mock_embedding_func
):
"""
Scenario 2: Upgrade from legacy version
Expected behavior:
- Legacy table exists (without model suffix)
- New table doesn't exist
- Automatically migrate data to new table with suffix
"""
config = {
"embedding_batch_num": 10,
"vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
}
embedding_func = EmbeddingFunc(
embedding_dim=1536,
func=mock_embedding_func.func,
model_name="text-embedding-ada-002",
)
storage = PGVectorStorage(
namespace=NameSpace.VECTOR_STORE_CHUNKS,
global_config=config,
embedding_func=embedding_func,
workspace="legacy_workspace",
)
# Mock: only legacy table exists
async def mock_table_exists(db, table_name):
return table_name == storage.legacy_table_name
# Mock: legacy table has 50 records
mock_rows = [
{
"id": f"legacy_id_{i}",
"content": f"legacy_content_{i}",
"workspace": "legacy_workspace",
}
for i in range(50)
]
# Track which queries have been made for proper response
query_history = []
async def mock_query(sql, params=None, multirows=False, **kwargs):
query_history.append(sql)
if "COUNT(*)" in sql:
# Determine table type:
# - Legacy: contains base name but NOT model suffix
# - New: contains model suffix (e.g., text_embedding_ada_002_1536d)
sql_upper = sql.upper()
base_name = storage.legacy_table_name.upper()
# Check if this is querying the new table (has model suffix)
has_model_suffix = any(
suffix in sql_upper
for suffix in ["TEXT_EMBEDDING", "_1536D", "_768D", "_1024D", "_3072D"]
)
is_legacy_table = base_name in sql_upper and not has_model_suffix
is_new_table = has_model_suffix
has_workspace_filter = "WHERE workspace" in sql
if is_legacy_table and has_workspace_filter:
# Count for legacy table with workspace filter (before migration)
return {"count": 50}
elif is_legacy_table and not has_workspace_filter:
# Total count for legacy table (after deletion, checking remaining)
return {"count": 0}
elif is_new_table:
# Count for new table (verification after migration)
return {"count": 50}
else:
# Fallback
return {"count": 0}
elif multirows and "SELECT *" in sql:
# Mock batch fetch for migration
# Handle workspace filtering: params = [workspace, offset, limit] or [offset, limit]
if "WHERE workspace" in sql:
# With workspace filter: params[0]=workspace, params[1]=offset, params[2]=limit
offset = params[1] if len(params) > 1 else 0
limit = params[2] if len(params) > 2 else 500
else:
# No workspace filter: params[0]=offset, params[1]=limit
offset = params[0] if params else 0
limit = params[1] if len(params) > 1 else 500
start = offset
end = min(offset + limit, len(mock_rows))
return mock_rows[start:end]
return {}
mock_pg_db.query = AsyncMock(side_effect=mock_query)
with (
patch(
"lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists
),
patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()) as mock_create,
):
await storage.initialize()
# Verify table name contains ada-002
assert "text_embedding_ada_002_1536d" in storage.table_name
# Verify migration was executed
assert mock_pg_db.execute.call_count >= 50 # At least one execute per row
mock_create.assert_called_once()
# Verify legacy table was automatically deleted after successful migration
# This prevents Case 1 warnings on next startup
delete_calls = [
call
for call in mock_pg_db.execute.call_args_list
if call[0][0] and "DROP TABLE" in call[0][0]
]
assert (
len(delete_calls) >= 1
), "Legacy table should be deleted after successful migration"
# Check if legacy table was dropped
dropped_table = storage.legacy_table_name
assert any(
dropped_table in str(call) for call in delete_calls
), f"Expected to drop '{dropped_table}'"
@pytest.mark.asyncio
async def test_scenario_3_multi_model_coexistence(
mock_client_manager, mock_pg_db, mock_embedding_func
):
"""
Scenario 3: Multiple embedding models coexist
Expected behavior:
- Different embedding models create separate tables
- Tables are isolated by model suffix
- No interference between different models
"""
config = {
"embedding_batch_num": 10,
"vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
}
# Workspace A: uses bge-small (768d)
embedding_func_a = EmbeddingFunc(
embedding_dim=768, func=mock_embedding_func.func, model_name="bge-small"
)
storage_a = PGVectorStorage(
namespace=NameSpace.VECTOR_STORE_CHUNKS,
global_config=config,
embedding_func=embedding_func_a,
workspace="workspace_a",
)
# Workspace B: uses bge-large (1024d)
async def embed_func_b(texts, **kwargs):
return np.array([[0.1] * 1024 for _ in texts])
embedding_func_b = EmbeddingFunc(
embedding_dim=1024, func=embed_func_b, model_name="bge-large"
)
storage_b = PGVectorStorage(
namespace=NameSpace.VECTOR_STORE_CHUNKS,
global_config=config,
embedding_func=embedding_func_b,
workspace="workspace_b",
)
# Verify different table names
assert storage_a.table_name != storage_b.table_name
assert "bge_small_768d" in storage_a.table_name
assert "bge_large_1024d" in storage_b.table_name
# Mock: both tables don't exist yet
async def mock_table_exists(db, table_name):
return False
with (
patch(
"lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists
),
patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()) as mock_create,
):
# Initialize both storages
await storage_a.initialize()
await storage_b.initialize()
# Verify two separate tables were created
assert mock_create.call_count == 2
# Verify table names are different
call_args_list = mock_create.call_args_list
table_names = [call[0][1] for call in call_args_list] # Second positional arg
assert len(set(table_names)) == 2 # Two unique table names
assert storage_a.table_name in table_names
assert storage_b.table_name in table_names
@pytest.mark.asyncio
async def test_case1_empty_legacy_auto_cleanup(
mock_client_manager, mock_pg_db, mock_embedding_func
):
"""
Case 1a: Both new and legacy tables exist, but legacy is EMPTY
Expected: Automatically delete empty legacy table (safe cleanup)
"""
config = {
"embedding_batch_num": 10,
"vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
}
embedding_func = EmbeddingFunc(
embedding_dim=1536,
func=mock_embedding_func.func,
model_name="test-model",
)
storage = PGVectorStorage(
namespace=NameSpace.VECTOR_STORE_CHUNKS,
global_config=config,
embedding_func=embedding_func,
workspace="test_ws",
)
# Mock: Both tables exist
async def mock_table_exists(db, table_name):
return True # Both new and legacy exist
# Mock: Legacy table is empty (0 records)
async def mock_query(sql, params=None, multirows=False, **kwargs):
if "COUNT(*)" in sql:
if storage.legacy_table_name in sql:
return {"count": 0} # Empty legacy table
else:
return {"count": 100} # New table has data
return {}
mock_pg_db.query = AsyncMock(side_effect=mock_query)
with patch(
"lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists
):
await storage.initialize()
# Verify: Empty legacy table should be automatically cleaned up
# Empty tables are safe to delete without data loss risk
delete_calls = [
call
for call in mock_pg_db.execute.call_args_list
if call[0][0] and "DROP TABLE" in call[0][0]
]
assert len(delete_calls) >= 1, "Empty legacy table should be auto-deleted"
# Check if legacy table was dropped
dropped_table = storage.legacy_table_name
assert any(
dropped_table in str(call) for call in delete_calls
), f"Expected to drop empty legacy table '{dropped_table}'"
print(
f"✅ Case 1a: Empty legacy table '{dropped_table}' auto-deleted successfully"
)
@pytest.mark.asyncio
async def test_case1_nonempty_legacy_warning(
mock_client_manager, mock_pg_db, mock_embedding_func
):
"""
Case 1b: Both new and legacy tables exist, and legacy HAS DATA
Expected: Log warning, do not delete legacy (preserve data)
"""
config = {
"embedding_batch_num": 10,
"vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
}
embedding_func = EmbeddingFunc(
embedding_dim=1536,
func=mock_embedding_func.func,
model_name="test-model",
)
storage = PGVectorStorage(
namespace=NameSpace.VECTOR_STORE_CHUNKS,
global_config=config,
embedding_func=embedding_func,
workspace="test_ws",
)
# Mock: Both tables exist
async def mock_table_exists(db, table_name):
return True # Both new and legacy exist
# Mock: Legacy table has data (50 records)
async def mock_query(sql, params=None, multirows=False, **kwargs):
if "COUNT(*)" in sql:
if storage.legacy_table_name in sql:
return {"count": 50} # Legacy has data
else:
return {"count": 100} # New table has data
return {}
mock_pg_db.query = AsyncMock(side_effect=mock_query)
with patch(
"lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists
):
await storage.initialize()
# Verify: Legacy table with data should be preserved
# We never auto-delete tables that contain data to prevent accidental data loss
delete_calls = [
call
for call in mock_pg_db.execute.call_args_list
if call[0][0] and "DROP TABLE" in call[0][0]
]
# Check if legacy table was deleted (it should not be)
dropped_table = storage.legacy_table_name
legacy_deleted = any(dropped_table in str(call) for call in delete_calls)
assert not legacy_deleted, "Legacy table with data should NOT be auto-deleted"
print(
f"✅ Case 1b: Legacy table '{dropped_table}' with data preserved (warning only)"
)
@pytest.mark.asyncio
async def test_case1_sequential_workspace_migration(
mock_client_manager, mock_pg_db, mock_embedding_func
):
"""
Case 1c: Sequential workspace migration (Multi-tenant scenario)
Critical bug fix verification:
Timeline:
1. Legacy table has workspace_a (3 records) + workspace_b (3 records)
2. Workspace A initializes first → Case 4 (only legacy exists) → migrates A's data
3. Workspace B initializes later → Case 1 (both tables exist) → should migrate B's data
4. Verify workspace B's data is correctly migrated to new table
5. Verify legacy table is cleaned up after both workspaces migrate
This test verifies the fix where Case 1 now checks and migrates current
workspace's data instead of just checking if legacy table is empty globally.
"""
config = {
"embedding_batch_num": 10,
"vector_db_storage_cls_kwargs": {"cosine_better_than_threshold": 0.8},
}
embedding_func = EmbeddingFunc(
embedding_dim=1536,
func=mock_embedding_func.func,
model_name="test-model",
)
# Mock data: Legacy table has 6 records total (3 from workspace_a, 3 from workspace_b)
mock_rows_a = [
{"id": f"a_{i}", "content": f"A content {i}", "workspace": "workspace_a"}
for i in range(3)
]
mock_rows_b = [
{"id": f"b_{i}", "content": f"B content {i}", "workspace": "workspace_b"}
for i in range(3)
]
# Track migration state
migration_state = {"new_table_exists": False, "workspace_a_migrated": False}
# Step 1: Simulate workspace_a initialization (Case 4)
# CRITICAL: Set db.workspace to workspace_a
mock_pg_db.workspace = "workspace_a"
storage_a = PGVectorStorage(
namespace=NameSpace.VECTOR_STORE_CHUNKS,
global_config=config,
embedding_func=embedding_func,
workspace="workspace_a",
)
# Mock table_exists for workspace_a
async def mock_table_exists_a(db, table_name):
if table_name == storage_a.legacy_table_name:
return True
if table_name == storage_a.table_name:
return migration_state["new_table_exists"]
return False
# Track inserted records count for verification
inserted_count = {"workspace_a": 0}
# Mock execute to track inserts
async def mock_execute_a(sql, data=None, **kwargs):
if sql and "INSERT INTO" in sql.upper():
inserted_count["workspace_a"] += 1
return None
# Mock query for workspace_a (Case 4)
async def mock_query_a(sql, params=None, multirows=False, **kwargs):
sql_upper = sql.upper()
base_name = storage_a.legacy_table_name.upper()
if "COUNT(*)" in sql:
has_model_suffix = "TEST_MODEL_1536D" in sql_upper
is_legacy = base_name in sql_upper and not has_model_suffix
has_workspace_filter = "WHERE workspace" in sql
if is_legacy and has_workspace_filter:
workspace = params[0] if params and len(params) > 0 else None
if workspace == "workspace_a":
# After migration starts, pretend legacy is empty for this workspace
return {"count": 3 - inserted_count["workspace_a"]}
elif workspace == "workspace_b":
return {"count": 3}
elif is_legacy and not has_workspace_filter:
# Global count in legacy table
remaining = 6 - inserted_count["workspace_a"]
return {"count": remaining}
elif has_model_suffix:
# New table count (for verification)
return {"count": inserted_count["workspace_a"]}
elif multirows and "SELECT *" in sql:
if "WHERE workspace" in sql:
workspace = params[0] if params and len(params) > 0 else None
if workspace == "workspace_a":
offset = params[1] if len(params) > 1 else 0
limit = params[2] if len(params) > 2 else 500
return mock_rows_a[offset : offset + limit]
return {}
mock_pg_db.query = AsyncMock(side_effect=mock_query_a)
mock_pg_db.execute = AsyncMock(side_effect=mock_execute_a)
# Initialize workspace_a (Case 4)
with (
patch(
"lightrag.kg.postgres_impl._pg_table_exists",
side_effect=mock_table_exists_a,
),
patch("lightrag.kg.postgres_impl._pg_create_table", AsyncMock()),
):
await storage_a.initialize()
migration_state["new_table_exists"] = True
migration_state["workspace_a_migrated"] = True
print("✅ Step 1: Workspace A initialized (Case 4)")
assert mock_pg_db.execute.call_count >= 3
print(f"✅ Step 1: {mock_pg_db.execute.call_count} execute calls")
# Step 2: Simulate workspace_b initialization (Case 1)
# CRITICAL: Set db.workspace to workspace_b
mock_pg_db.workspace = "workspace_b"
storage_b = PGVectorStorage(
namespace=NameSpace.VECTOR_STORE_CHUNKS,
global_config=config,
embedding_func=embedding_func,
workspace="workspace_b",
)
mock_pg_db.reset_mock()
migration_state["workspace_b_migrated"] = False
# Mock table_exists for workspace_b (both exist)
async def mock_table_exists_b(db, table_name):
return True
# Track inserted records count for workspace_b
inserted_count["workspace_b"] = 0
# Mock execute for workspace_b to track inserts
async def mock_execute_b(sql, data=None, **kwargs):
if sql and "INSERT INTO" in sql.upper():
inserted_count["workspace_b"] += 1
return None
# Mock query for workspace_b (Case 1)
async def mock_query_b(sql, params=None, multirows=False, **kwargs):
sql_upper = sql.upper()
base_name = storage_b.legacy_table_name.upper()
if "COUNT(*)" in sql:
has_model_suffix = "TEST_MODEL_1536D" in sql_upper
is_legacy = base_name in sql_upper and not has_model_suffix
has_workspace_filter = "WHERE workspace" in sql
if is_legacy and has_workspace_filter:
workspace = params[0] if params and len(params) > 0 else None
if workspace == "workspace_b":
# After migration starts, pretend legacy is empty for this workspace
return {"count": 3 - inserted_count["workspace_b"]}
elif workspace == "workspace_a":
return {"count": 0} # Already migrated
elif is_legacy and not has_workspace_filter:
# Global count: only workspace_b data remains
return {"count": 3 - inserted_count["workspace_b"]}
elif has_model_suffix:
# New table total count (workspace_a: 3 + workspace_b: inserted)
if has_workspace_filter:
workspace = params[0] if params and len(params) > 0 else None
if workspace == "workspace_b":
return {"count": inserted_count["workspace_b"]}
elif workspace == "workspace_a":
return {"count": 3}
else:
# Total count in new table (for verification)
return {"count": 3 + inserted_count["workspace_b"]}
elif multirows and "SELECT *" in sql:
if "WHERE workspace" in sql:
workspace = params[0] if params and len(params) > 0 else None
if workspace == "workspace_b":
offset = params[1] if len(params) > 1 else 0
limit = params[2] if len(params) > 2 else 500
return mock_rows_b[offset : offset + limit]
return {}
mock_pg_db.query = AsyncMock(side_effect=mock_query_b)
mock_pg_db.execute = AsyncMock(side_effect=mock_execute_b)
# Initialize workspace_b (Case 1)
with patch(
"lightrag.kg.postgres_impl._pg_table_exists", side_effect=mock_table_exists_b
):
await storage_b.initialize()
migration_state["workspace_b_migrated"] = True
print("✅ Step 2: Workspace B initialized (Case 1)")
# Verify workspace_b migration happened
execute_calls = mock_pg_db.execute.call_args_list
insert_calls = [
call for call in execute_calls if call[0][0] and "INSERT INTO" in call[0][0]
]
assert len(insert_calls) >= 3, f"Expected >= 3 inserts, got {len(insert_calls)}"
print(f"✅ Step 2: {len(insert_calls)} insert calls")
# Verify DELETE and DROP TABLE
delete_calls = [
call
for call in execute_calls
if call[0][0]
and "DELETE FROM" in call[0][0]
and "WHERE workspace" in call[0][0]
]
assert len(delete_calls) >= 1, "Expected DELETE workspace_b data"
print("✅ Step 2: DELETE workspace_b from legacy")
drop_calls = [
call for call in execute_calls if call[0][0] and "DROP TABLE" in call[0][0]
]
assert len(drop_calls) >= 1, "Expected DROP TABLE"
print("✅ Step 2: Legacy table dropped")
print("\n🎉 Case 1c: Sequential workspace migration verified!")