test: add E2E test for workspace migration isolation

Why this change is needed:
Add end-to-end test to verify the P0 bug fix for cross-workspace data
leakage during PostgreSQL migration. Unit tests use mocks and cannot verify
that real SQL queries correctly filter by workspace in actual database.

What this test does:
- Creates legacy table with MIXED data (workspace_a + workspace_b)
- Initializes LightRAG for workspace_a only
- Verifies ONLY workspace_a data migrated to new table
- Verifies workspace_b data NOT leaked to new table (0 records)
- Verifies workspace_b data preserved in legacy table (3 records)
- Verifies workspace_a data cleaned from legacy after migration (0 records)

Impact:
- tests/test_e2e_multi_instance.py: Add test_workspace_migration_isolation_e2e_postgres
- Validates multi-tenant isolation in real PostgreSQL environment
- Prevents regression of critical security fix

Testing:
E2E test passes with real PostgreSQL container, confirming workspace
filtering works correctly with actual SQL execution.
This commit is contained in:
BukeLy 2025-11-23 16:27:05 +08:00
parent cfc6587e04
commit 49bbb3a4d7

View file

@ -71,6 +71,8 @@ async def pg_cleanup(pg_config):
"lightrag_doc_full",
"lightrag_doc_chunks",
"lightrag_vdb_chunks",
"lightrag_vdb_chunks_text_embedding_ada_002_1536d",
"lightrag_vdb_chunks_text_embedding_3_large_3072d",
"lightrag_vdb_chunks_model_a_768d",
"lightrag_vdb_chunks_model_b_1024d",
"lightrag_vdb_entity",
@ -266,6 +268,7 @@ async def test_legacy_migration_postgres(
rag = LightRAG(
working_dir=temp_dir,
workspace=pg_config["workspace"], # Match workspace with test data
llm_model_func=mock_llm_func,
embedding_func=embedding_func,
tokenizer=mock_tokenizer,
@ -321,6 +324,242 @@ async def test_legacy_migration_postgres(
shutil.rmtree(temp_dir, ignore_errors=True)
# Test: Workspace migration isolation (P0 Bug Fix Verification)
@pytest.mark.asyncio
async def test_workspace_migration_isolation_e2e_postgres(
pg_cleanup, mock_llm_func, mock_tokenizer, pg_config
):
"""
E2E Test: Workspace isolation during PostgreSQL migration
Critical P0 Bug Verification:
- Legacy table contains MIXED data from workspace_a and workspace_b
- Initialize LightRAG for workspace_a only
- Verify ONLY workspace_a data migrated to new table
- Verify workspace_b data NOT leaked to workspace_a's table
- Verify workspace_b data preserved in legacy table
This test validates the fix for the cross-workspace data leakage bug
where setup_table() was copying ALL records regardless of workspace.
"""
print("\n[E2E P0 Bug Fix] Workspace migration isolation (PostgreSQL)")
import tempfile
import shutil
temp_dir = tempfile.mkdtemp(prefix="lightrag_workspace_isolation_")
try:
# Step 1: Create legacy table with MIXED workspace data
legacy_table = "lightrag_vdb_chunks"
create_legacy_sql = f"""
CREATE TABLE IF NOT EXISTS {legacy_table} (
workspace VARCHAR(255),
id VARCHAR(255) PRIMARY KEY,
content TEXT,
content_vector vector(1536),
tokens INTEGER,
chunk_order_index INTEGER,
full_doc_id VARCHAR(255),
file_path TEXT,
create_time TIMESTAMP DEFAULT NOW(),
update_time TIMESTAMP DEFAULT NOW()
)
"""
await pg_cleanup.execute(create_legacy_sql, None)
# Insert 3 records for workspace_a
for i in range(3):
vector_str = "[" + ",".join([str(0.1 + i * 0.01)] * 1536) + "]"
insert_sql = f"""
INSERT INTO {legacy_table}
(workspace, id, content, content_vector, tokens, chunk_order_index, full_doc_id, file_path)
VALUES ($1, $2, $3, $4::vector, $5, $6, $7, $8)
"""
await pg_cleanup.execute(
insert_sql,
{
"workspace": "workspace_a",
"id": f"a_{i}",
"content": f"Workspace A content {i}",
"content_vector": vector_str,
"tokens": 100,
"chunk_order_index": i,
"full_doc_id": "doc_a",
"file_path": "/workspace_a/doc.txt",
},
)
# Insert 3 records for workspace_b
for i in range(3):
vector_str = "[" + ",".join([str(0.5 + i * 0.01)] * 1536) + "]"
insert_sql = f"""
INSERT INTO {legacy_table}
(workspace, id, content, content_vector, tokens, chunk_order_index, full_doc_id, file_path)
VALUES ($1, $2, $3, $4::vector, $5, $6, $7, $8)
"""
await pg_cleanup.execute(
insert_sql,
{
"workspace": "workspace_b",
"id": f"b_{i}",
"content": f"Workspace B content {i}",
"content_vector": vector_str,
"tokens": 100,
"chunk_order_index": i,
"full_doc_id": "doc_b",
"file_path": "/workspace_b/doc.txt",
},
)
# Verify legacy table has BOTH workspaces' data
total_count_result = await pg_cleanup.query(
f"SELECT COUNT(*) as count FROM {legacy_table}", []
)
total_count = total_count_result.get("count", 0)
assert total_count == 6, f"Expected 6 total records, got {total_count}"
workspace_a_count_result = await pg_cleanup.query(
f"SELECT COUNT(*) as count FROM {legacy_table} WHERE workspace=$1",
["workspace_a"],
)
workspace_a_count = workspace_a_count_result.get("count", 0)
assert (
workspace_a_count == 3
), f"Expected 3 workspace_a records, got {workspace_a_count}"
workspace_b_count_result = await pg_cleanup.query(
f"SELECT COUNT(*) as count FROM {legacy_table} WHERE workspace=$1",
["workspace_b"],
)
workspace_b_count = workspace_b_count_result.get("count", 0)
assert (
workspace_b_count == 3
), f"Expected 3 workspace_b records, got {workspace_b_count}"
print(
f"✅ Legacy table created: {total_count} records (workspace_a: {workspace_a_count}, workspace_b: {workspace_b_count})"
)
# Step 2: Initialize LightRAG for workspace_a ONLY
async def embed_func(texts):
await asyncio.sleep(0)
return np.random.rand(len(texts), 1536)
embedding_func = EmbeddingFunc(
embedding_dim=1536,
max_token_size=8192,
func=embed_func,
model_name="text-embedding-ada-002",
)
rag = LightRAG(
working_dir=temp_dir,
workspace="workspace_a", # CRITICAL: Only workspace_a
llm_model_func=mock_llm_func,
embedding_func=embedding_func,
tokenizer=mock_tokenizer,
kv_storage="PGKVStorage",
vector_storage="PGVectorStorage",
doc_status_storage="PGDocStatusStorage",
vector_db_storage_cls_kwargs={
**pg_config,
"workspace": "workspace_a", # CRITICAL: Filter by workspace_a
"cosine_better_than_threshold": 0.8,
},
)
print("🔄 Initializing LightRAG for workspace_a (triggers migration)...")
await rag.initialize_storages()
# Step 3: Verify workspace isolation
new_table = rag.chunks_vdb.table_name
assert "text_embedding_ada_002_1536d" in new_table.lower()
print(f"✅ New table created: {new_table}")
# Verify: NEW table contains ONLY workspace_a data (3 records)
new_workspace_a_result = await pg_cleanup.query(
f"SELECT COUNT(*) as count FROM {new_table} WHERE workspace=$1",
["workspace_a"],
)
new_workspace_a_count = new_workspace_a_result.get("count", 0)
assert (
new_workspace_a_count == 3
), f"Expected 3 workspace_a records in new table, got {new_workspace_a_count}"
print(
f"✅ Migration successful: {new_workspace_a_count} workspace_a records migrated"
)
# Verify: NEW table does NOT contain workspace_b data (0 records)
new_workspace_b_result = await pg_cleanup.query(
f"SELECT COUNT(*) as count FROM {new_table} WHERE workspace=$1",
["workspace_b"],
)
new_workspace_b_count = new_workspace_b_result.get("count", 0)
assert (
new_workspace_b_count == 0
), f"workspace_b data leaked! Found {new_workspace_b_count} records in new table"
print("✅ No data leakage: 0 workspace_b records in new table (isolated)")
# Verify: LEGACY table still exists (because workspace_b data remains)
check_legacy_query = """
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_name = $1
)
"""
legacy_result = await pg_cleanup.query(
check_legacy_query, [legacy_table.lower()]
)
legacy_exists = legacy_result.get("exists", False)
assert (
legacy_exists
), f"Legacy table '{legacy_table}' should still exist (has workspace_b data)"
# Verify: LEGACY table still has workspace_b data (3 records)
legacy_workspace_b_result = await pg_cleanup.query(
f"SELECT COUNT(*) as count FROM {legacy_table} WHERE workspace=$1",
["workspace_b"],
)
legacy_workspace_b_count = legacy_workspace_b_result.get("count", 0)
assert (
legacy_workspace_b_count == 3
), f"workspace_b data lost! Only {legacy_workspace_b_count} remain in legacy table"
print(
f"✅ Legacy table preserved: {legacy_workspace_b_count} workspace_b records remain (not migrated)"
)
# Verify: LEGACY table does NOT have workspace_a data (migrated and deleted)
legacy_workspace_a_result = await pg_cleanup.query(
f"SELECT COUNT(*) as count FROM {legacy_table} WHERE workspace=$1",
["workspace_a"],
)
legacy_workspace_a_count = legacy_workspace_a_result.get("count", 0)
assert (
legacy_workspace_a_count == 0
), f"workspace_a data should be removed from legacy after migration, found {legacy_workspace_a_count}"
print(
"✅ Legacy cleanup verified: 0 workspace_a records in legacy (cleaned after migration)"
)
print(
"\n🎉 P0 Bug Fix Verified: Workspace migration isolation working correctly!"
)
print(
" - workspace_a: 3 records migrated to new table, 0 in legacy (migrated)"
)
print(
" - workspace_b: 0 records in new table (isolated), 3 in legacy (preserved)"
)
await rag.finalize_storages()
finally:
# Cleanup temp dir
shutil.rmtree(temp_dir, ignore_errors=True)
# Test: Qdrant legacy data migration
@pytest.mark.asyncio
async def test_legacy_migration_qdrant(
@ -488,6 +727,7 @@ async def test_multi_instance_postgres(
print("📦 Initializing LightRAG instance A (model-a, 768d)...")
rag_a = LightRAG(
working_dir=temp_working_dirs["workspace_a"],
workspace=pg_config["workspace"], # Use same workspace to test model isolation
llm_model_func=mock_llm_func,
embedding_func=embedding_func_a,
tokenizer=mock_tokenizer,
@ -506,6 +746,7 @@ async def test_multi_instance_postgres(
print("📦 Initializing LightRAG instance B (model-b, 1024d)...")
rag_b = LightRAG(
working_dir=temp_working_dirs["workspace_b"],
workspace=pg_config["workspace"], # Use same workspace to test model isolation
llm_model_func=mock_llm_func,
embedding_func=embedding_func_b,
tokenizer=mock_tokenizer,
@ -1179,6 +1420,7 @@ async def test_dimension_mismatch_postgres(
rag = LightRAG(
working_dir=temp_dir,
workspace=pg_config["workspace"], # Match workspace with test data
llm_model_func=mock_llm_func,
embedding_func=embedding_func_new,
tokenizer=mock_tokenizer,