This commit is contained in:
Raphaël MANSUY 2025-12-04 19:14:31 +08:00
parent 107b32aa8d
commit 68cc386456

View file

@ -1,21 +1,12 @@
#!/usr/bin/env python #!/usr/bin/env python
""" """
Test script for Workspace Isolation Feature Test script for PR #2366: Workspace Isolation Feature
Comprehensive test suite covering workspace isolation in LightRAG: Tests the 4 key scenarios mentioned in PR description:
1. Pipeline Status Isolation - Data isolation between workspaces 1. Multi-Workspace Concurrency Test
2. Lock Mechanism - Parallel execution for different workspaces, serial for same workspace 2. Pipeline Status Isolation Test
3. Backward Compatibility - Legacy code without workspace parameters 3. Backward Compatibility Test
4. Multi-Workspace Concurrency - Concurrent operations on different workspaces 4. Lock Mechanism Test
5. NamespaceLock Re-entrance Protection - Prevents deadlocks
6. Different Namespace Lock Isolation - Locks isolated by namespace
7. Error Handling - Invalid workspace configurations
8. Update Flags Workspace Isolation - Update flags properly isolated
9. Empty Workspace Standardization - Empty workspace handling
10. JsonKVStorage Workspace Isolation - Integration test for KV storage
11. LightRAG End-to-End Workspace Isolation - Complete E2E test with two instances
Total: 11 test scenarios
""" """
import asyncio import asyncio
@ -26,14 +17,12 @@ import tempfile
import numpy as np import numpy as np
import pytest import pytest
from pathlib import Path from pathlib import Path
from typing import List, Tuple, Dict
from lightrag.kg.shared_storage import ( from lightrag.kg.shared_storage import (
get_final_namespace, get_final_namespace,
get_namespace_lock, get_namespace_lock,
get_default_workspace, get_default_workspace,
set_default_workspace, set_default_workspace,
initialize_share_data, initialize_share_data,
finalize_share_data,
initialize_pipeline_status, initialize_pipeline_status,
get_namespace_data, get_namespace_data,
set_all_update_flags, set_all_update_flags,
@ -41,16 +30,7 @@ from lightrag.kg.shared_storage import (
get_all_update_flags_status, get_all_update_flags_status,
get_update_flag, get_update_flag,
) )
from lightrag.kg.json_kv_impl import JsonKVStorage
# =============================================================================
# Test Configuration
# =============================================================================
# Stress test configuration (enable via environment variable)
STRESS_TEST_MODE = os.getenv("LIGHTRAG_STRESS_TEST", "false").lower() == "true"
PARALLEL_WORKERS = int(os.getenv("LIGHTRAG_TEST_WORKERS", "3"))
KEEP_TEST_ARTIFACTS = os.getenv("LIGHTRAG_KEEP_ARTIFACTS", "false").lower() == "true"
# ============================================================================= # =============================================================================
@ -63,85 +43,7 @@ def setup_shared_data():
"""Initialize shared data before each test""" """Initialize shared data before each test"""
initialize_share_data() initialize_share_data()
yield yield
finalize_share_data() # Cleanup after test if needed
async def _measure_lock_parallelism(
workload: List[Tuple[str, str, str]], hold_time: float = 0.05
) -> Tuple[int, List[Tuple[str, str]], Dict[str, float]]:
"""Run lock acquisition workload and capture peak concurrency and timeline.
Args:
workload: List of (name, workspace, namespace) tuples
hold_time: How long each worker holds the lock (seconds)
Returns:
Tuple of (max_parallel, timeline, metrics) where:
- max_parallel: Peak number of concurrent lock holders
- timeline: List of (name, event) tuples tracking execution order
- metrics: Dict with performance metrics (total_duration, max_concurrency, etc.)
"""
running = 0
max_parallel = 0
timeline: List[Tuple[str, str]] = []
start_time = time.time()
async def worker(name: str, workspace: str, namespace: str) -> None:
nonlocal running, max_parallel
lock = get_namespace_lock(namespace, workspace)
async with lock:
running += 1
max_parallel = max(max_parallel, running)
timeline.append((name, "start"))
await asyncio.sleep(hold_time)
timeline.append((name, "end"))
running -= 1
await asyncio.gather(*(worker(*args) for args in workload))
metrics = {
"total_duration": time.time() - start_time,
"max_concurrency": max_parallel,
"avg_hold_time": hold_time,
"num_workers": len(workload),
}
return max_parallel, timeline, metrics
def _assert_no_timeline_overlap(timeline: List[Tuple[str, str]]) -> None:
"""Ensure that timeline events never overlap for sequential execution.
This function implements a finite state machine that validates:
- No overlapping lock acquisitions (only one task active at a time)
- Proper lock release order (task releases its own lock)
- All locks are properly released
Args:
timeline: List of (name, event) tuples where event is "start" or "end"
Raises:
AssertionError: If timeline shows overlapping execution or improper locking
"""
active_task = None
for name, event in timeline:
if event == "start":
if active_task is not None:
raise AssertionError(
f"Task '{name}' started before '{active_task}' released the lock"
)
active_task = name
else:
if active_task != name:
raise AssertionError(
f"Task '{name}' finished while '{active_task}' was expected to hold the lock"
)
active_task = None
if active_task is not None:
raise AssertionError(f"Task '{active_task}' did not release the lock properly")
# ============================================================================= # =============================================================================
@ -154,8 +56,6 @@ async def test_pipeline_status_isolation():
""" """
Test that pipeline status is isolated between different workspaces. Test that pipeline status is isolated between different workspaces.
""" """
# Purpose: Ensure pipeline_status shared data remains unique per workspace.
# Scope: initialize_pipeline_status and get_namespace_data interactions.
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("TEST 1: Pipeline Status Isolation") print("TEST 1: Pipeline Status Isolation")
print("=" * 60) print("=" * 60)
@ -175,9 +75,7 @@ async def test_pipeline_status_isolation():
data2 = await get_namespace_data("pipeline_status", workspace=workspace2) data2 = await get_namespace_data("pipeline_status", workspace=workspace2)
# Verify they are independent objects # Verify they are independent objects
assert ( assert data1 is not data2, "Pipeline status data objects are the same (should be different)"
data1 is not data2
), "Pipeline status data objects are the same (should be different)"
# Modify workspace1's data and verify workspace2 is not affected # Modify workspace1's data and verify workspace2 is not affected
data1["test_key"] = "workspace1_value" data1["test_key"] = "workspace1_value"
@ -187,12 +85,8 @@ async def test_pipeline_status_isolation():
data2_check = await get_namespace_data("pipeline_status", workspace=workspace2) data2_check = await get_namespace_data("pipeline_status", workspace=workspace2)
assert "test_key" in data1_check, "test_key not found in workspace1" assert "test_key" in data1_check, "test_key not found in workspace1"
assert ( assert data1_check["test_key"] == "workspace1_value", f"workspace1 test_key value incorrect: {data1_check.get('test_key')}"
data1_check["test_key"] == "workspace1_value" assert "test_key" not in data2_check, f"test_key leaked to workspace2: {data2_check.get('test_key')}"
), f"workspace1 test_key value incorrect: {data1_check.get('test_key')}"
assert (
"test_key" not in data2_check
), f"test_key leaked to workspace2: {data2_check.get('test_key')}"
print("✅ PASSED: Pipeline Status Isolation") print("✅ PASSED: Pipeline Status Isolation")
print(" Different workspaces have isolated pipeline status") print(" Different workspaces have isolated pipeline status")
@ -210,53 +104,56 @@ async def test_lock_mechanism():
Tests both parallel execution for different workspaces and serialization Tests both parallel execution for different workspaces and serialization
for the same workspace. for the same workspace.
""" """
# Purpose: Validate that keyed locks isolate workspaces while serializing
# requests within the same workspace. Scope: get_namespace_lock scheduling
# semantics for both cross-workspace and single-workspace cases.
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("TEST 2: Lock Mechanism (No Deadlocks)") print("TEST 2: Lock Mechanism (No Deadlocks)")
print("=" * 60) print("=" * 60)
# Test 2.1: Different workspaces should run in parallel # Test 2.1: Different workspaces should run in parallel
print("\nTest 2.1: Different workspaces locks should be parallel") print("\nTest 2.1: Different workspaces locks should be parallel")
# Support stress testing with configurable number of workers
num_workers = PARALLEL_WORKERS if STRESS_TEST_MODE else 3
parallel_workload = [
(f"ws_{chr(97+i)}", f"ws_{chr(97+i)}", "test_namespace")
for i in range(num_workers)
]
max_parallel, timeline_parallel, metrics = await _measure_lock_parallelism(
parallel_workload
)
assert max_parallel >= 2, (
"Locks for distinct workspaces should overlap; "
f"observed max concurrency: {max_parallel}, timeline={timeline_parallel}"
)
print("✅ PASSED: Lock Mechanism - Parallel (Different Workspaces)") async def acquire_lock_timed(workspace, namespace, hold_time):
print(f" Locks overlapped for different workspaces (max concurrency={max_parallel})") """Acquire a lock and hold it for specified time"""
print(f" Performance: {metrics['total_duration']:.3f}s for {metrics['num_workers']} workers") lock = get_namespace_lock(namespace, workspace)
start = time.time()
async with lock:
print(
f" [{workspace}] acquired lock at {time.time() - start:.2f}s"
)
await asyncio.sleep(hold_time)
print(
f" [{workspace}] releasing lock at {time.time() - start:.2f}s"
)
start = time.time()
await asyncio.gather(
acquire_lock_timed("ws_a", "test_namespace", 0.5),
acquire_lock_timed("ws_b", "test_namespace", 0.5),
acquire_lock_timed("ws_c", "test_namespace", 0.5),
)
elapsed = time.time() - start
# If locks are properly isolated by workspace, this should take ~0.5s (parallel)
# If they block each other, it would take ~1.5s (serial)
assert elapsed < 1.0, f"Locks blocked each other: {elapsed:.2f}s (expected < 1.0s)"
print(f"✅ PASSED: Lock Mechanism - Parallel (Different Workspaces)")
print(f" Locks ran in parallel: {elapsed:.2f}s")
# Test 2.2: Same workspace should serialize # Test 2.2: Same workspace should serialize
print("\nTest 2.2: Same workspace locks should serialize") print("\nTest 2.2: Same workspace locks should serialize")
serial_workload = [
("serial_run_1", "ws_same", "test_namespace"),
("serial_run_2", "ws_same", "test_namespace"),
]
max_parallel_serial, timeline_serial, metrics_serial = await _measure_lock_parallelism(
serial_workload
)
assert max_parallel_serial == 1, (
"Same workspace locks should not overlap; "
f"observed {max_parallel_serial} with timeline {timeline_serial}"
)
_assert_no_timeline_overlap(timeline_serial)
print("✅ PASSED: Lock Mechanism - Serial (Same Workspace)") start = time.time()
print(" Same workspace operations executed sequentially with no overlap") await asyncio.gather(
print(f" Performance: {metrics_serial['total_duration']:.3f}s for {metrics_serial['num_workers']} tasks") acquire_lock_timed("ws_same", "test_namespace", 0.3),
acquire_lock_timed("ws_same", "test_namespace", 0.3),
)
elapsed = time.time() - start
# Same workspace should serialize, taking ~0.6s
assert elapsed >= 0.5, f"Locks didn't serialize: {elapsed:.2f}s (expected >= 0.5s)"
print(f"✅ PASSED: Lock Mechanism - Serial (Same Workspace)")
print(f" Locks serialized correctly: {elapsed:.2f}s")
# ============================================================================= # =============================================================================
@ -269,9 +166,6 @@ async def test_backward_compatibility():
""" """
Test that legacy code without workspace parameter still works correctly. Test that legacy code without workspace parameter still works correctly.
""" """
# Purpose: Validate backward-compatible defaults when workspace arguments
# are omitted. Scope: get_final_namespace, set/get_default_workspace and
# initialize_pipeline_status fallback behavior.
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("TEST 3: Backward Compatibility") print("TEST 3: Backward Compatibility")
print("=" * 60) print("=" * 60)
@ -285,7 +179,7 @@ async def test_backward_compatibility():
assert final_ns == expected, f"Expected {expected}, got {final_ns}" assert final_ns == expected, f"Expected {expected}, got {final_ns}"
print("✅ PASSED: Backward Compatibility - get_final_namespace") print(f"✅ PASSED: Backward Compatibility - get_final_namespace")
print(f" Correctly uses default workspace: {final_ns}") print(f" Correctly uses default workspace: {final_ns}")
# Test 3.2: get_default_workspace # Test 3.2: get_default_workspace
@ -296,7 +190,7 @@ async def test_backward_compatibility():
assert retrieved == "test_default", f"Expected 'test_default', got {retrieved}" assert retrieved == "test_default", f"Expected 'test_default', got {retrieved}"
print("✅ PASSED: Backward Compatibility - default workspace") print(f"✅ PASSED: Backward Compatibility - default workspace")
print(f" Default workspace set/get correctly: {retrieved}") print(f" Default workspace set/get correctly: {retrieved}")
# Test 3.3: Empty workspace handling # Test 3.3: Empty workspace handling
@ -306,11 +200,9 @@ async def test_backward_compatibility():
final_ns_empty = get_final_namespace("pipeline_status", workspace=None) final_ns_empty = get_final_namespace("pipeline_status", workspace=None)
expected_empty = "pipeline_status" # Should be just the namespace without ':' expected_empty = "pipeline_status" # Should be just the namespace without ':'
assert ( assert final_ns_empty == expected_empty, f"Expected '{expected_empty}', got '{final_ns_empty}'"
final_ns_empty == expected_empty
), f"Expected '{expected_empty}', got '{final_ns_empty}'"
print("✅ PASSED: Backward Compatibility - empty workspace") print(f"✅ PASSED: Backward Compatibility - empty workspace")
print(f" Empty workspace handled correctly: '{final_ns_empty}'") print(f" Empty workspace handled correctly: '{final_ns_empty}'")
# Test 3.4: None workspace with default set # Test 3.4: None workspace with default set
@ -324,12 +216,10 @@ async def test_backward_compatibility():
"pipeline_status", workspace="compat_test_workspace" "pipeline_status", workspace="compat_test_workspace"
) )
assert ( assert data is not None, "Failed to initialize pipeline status with default workspace"
data is not None
), "Failed to initialize pipeline status with default workspace"
print("✅ PASSED: Backward Compatibility - pipeline init with None") print(f"✅ PASSED: Backward Compatibility - pipeline init with None")
print(" Pipeline status initialized with default workspace") print(f" Pipeline status initialized with default workspace")
# ============================================================================= # =============================================================================
@ -343,9 +233,6 @@ async def test_multi_workspace_concurrency():
Test that multiple workspaces can operate concurrently without interference. Test that multiple workspaces can operate concurrently without interference.
Simulates concurrent operations on different workspaces. Simulates concurrent operations on different workspaces.
""" """
# Purpose: Simulate concurrent workloads touching pipeline_status across
# workspaces. Scope: initialize_pipeline_status, get_namespace_lock, and
# shared dictionary mutation while ensuring isolation.
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("TEST 4: Multi-Workspace Concurrency") print("TEST 4: Multi-Workspace Concurrency")
print("=" * 60) print("=" * 60)
@ -390,10 +277,8 @@ async def test_multi_workspace_concurrency():
# Verify all workspaces completed # Verify all workspaces completed
assert set(results_list) == set(workspaces), "Not all workspaces completed" assert set(results_list) == set(workspaces), "Not all workspaces completed"
print("✅ PASSED: Multi-Workspace Concurrency - Execution") print(f"✅ PASSED: Multi-Workspace Concurrency - Execution")
print( print(f" All {len(workspaces)} workspaces completed successfully in {elapsed:.2f}s")
f" All {len(workspaces)} workspaces completed successfully in {elapsed:.2f}s"
)
# Verify data isolation - each workspace should have its own data # Verify data isolation - each workspace should have its own data
print("\n Verifying data isolation...") print("\n Verifying data isolation...")
@ -403,16 +288,12 @@ async def test_multi_workspace_concurrency():
expected_key = f"{ws}_key" expected_key = f"{ws}_key"
expected_value = f"{ws}_value" expected_value = f"{ws}_value"
assert ( assert expected_key in data, f"Data not properly isolated for {ws}: missing {expected_key}"
expected_key in data assert data[expected_key] == expected_value, f"Data not properly isolated for {ws}: {expected_key}={data[expected_key]} (expected {expected_value})"
), f"Data not properly isolated for {ws}: missing {expected_key}"
assert (
data[expected_key] == expected_value
), f"Data not properly isolated for {ws}: {expected_key}={data[expected_key]} (expected {expected_value})"
print(f" [{ws}] Data correctly isolated: {expected_key}={data[expected_key]}") print(f" [{ws}] Data correctly isolated: {expected_key}={data[expected_key]}")
print("✅ PASSED: Multi-Workspace Concurrency - Data Isolation") print(f"✅ PASSED: Multi-Workspace Concurrency - Data Isolation")
print(" All workspaces have properly isolated data") print(f" All workspaces have properly isolated data")
# ============================================================================= # =============================================================================
@ -426,9 +307,6 @@ async def test_namespace_lock_reentrance():
Test that NamespaceLock prevents re-entrance in the same coroutine Test that NamespaceLock prevents re-entrance in the same coroutine
and allows concurrent use in different coroutines. and allows concurrent use in different coroutines.
""" """
# Purpose: Ensure NamespaceLock enforces single entry per coroutine while
# allowing concurrent reuse through ContextVar isolation. Scope: lock
# re-entrance checks and concurrent gather semantics.
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("TEST 5: NamespaceLock Re-entrance Protection") print("TEST 5: NamespaceLock Re-entrance Protection")
print("=" * 60) print("=" * 60)
@ -454,8 +332,8 @@ async def test_namespace_lock_reentrance():
assert reentrance_failed_correctly, "Re-entrance protection not working" assert reentrance_failed_correctly, "Re-entrance protection not working"
print("✅ PASSED: NamespaceLock Re-entrance Protection") print(f"✅ PASSED: NamespaceLock Re-entrance Protection")
print(" Re-entrance correctly raises RuntimeError") print(f" Re-entrance correctly raises RuntimeError")
# Test 5.2: Same NamespaceLock instance in different coroutines should succeed # Test 5.2: Same NamespaceLock instance in different coroutines should succeed
print("\nTest 5.2: Same NamespaceLock instance in different coroutines") print("\nTest 5.2: Same NamespaceLock instance in different coroutines")
@ -478,14 +356,10 @@ async def test_namespace_lock_reentrance():
# Both coroutines should have completed # Both coroutines should have completed
expected_entries = 4 # 2 starts + 2 ends expected_entries = 4 # 2 starts + 2 ends
assert ( assert len(concurrent_results) == expected_entries, f"Expected {expected_entries} entries, got {len(concurrent_results)}"
len(concurrent_results) == expected_entries
), f"Expected {expected_entries} entries, got {len(concurrent_results)}"
print("✅ PASSED: NamespaceLock Concurrent Reuse") print(f"✅ PASSED: NamespaceLock Concurrent Reuse")
print( print(f" Same NamespaceLock instance used successfully in {expected_entries//2} concurrent coroutines")
f" Same NamespaceLock instance used successfully in {expected_entries//2} concurrent coroutines"
)
# ============================================================================= # =============================================================================
@ -498,29 +372,35 @@ async def test_different_namespace_lock_isolation():
""" """
Test that locks for different namespaces (same workspace) are independent. Test that locks for different namespaces (same workspace) are independent.
""" """
# Purpose: Confirm that namespace isolation is enforced even when workspace
# is the same. Scope: get_namespace_lock behavior when namespaces differ.
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("TEST 6: Different Namespace Lock Isolation") print("TEST 6: Different Namespace Lock Isolation")
print("=" * 60) print("=" * 60)
print("\nTesting locks with same workspace but different namespaces") print("\nTesting locks with same workspace but different namespaces")
workload = [ async def acquire_lock_timed(workspace, namespace, hold_time, name):
("ns_a", "same_ws", "namespace_a"), """Acquire a lock and hold it for specified time"""
("ns_b", "same_ws", "namespace_b"), lock = get_namespace_lock(namespace, workspace)
("ns_c", "same_ws", "namespace_c"), start = time.time()
] async with lock:
max_parallel, timeline, metrics = await _measure_lock_parallelism(workload) print(f" [{name}] acquired lock at {time.time() - start:.2f}s")
await asyncio.sleep(hold_time)
print(f" [{name}] releasing lock at {time.time() - start:.2f}s")
assert max_parallel >= 2, ( # These should run in parallel (different namespaces)
"Different namespaces within the same workspace should run concurrently; " start = time.time()
f"observed max concurrency {max_parallel} with timeline {timeline}" await asyncio.gather(
acquire_lock_timed("same_ws", "namespace_a", 0.5, "ns_a"),
acquire_lock_timed("same_ws", "namespace_b", 0.5, "ns_b"),
acquire_lock_timed("same_ws", "namespace_c", 0.5, "ns_c"),
) )
elapsed = time.time() - start
print("✅ PASSED: Different Namespace Lock Isolation") # If locks are properly isolated by namespace, this should take ~0.5s (parallel)
print(f" Different namespace locks ran in parallel (max concurrency={max_parallel})") assert elapsed < 1.0, f"Different namespace locks blocked each other: {elapsed:.2f}s (expected < 1.0s)"
print(f" Performance: {metrics['total_duration']:.3f}s for {metrics['num_workers']} namespaces")
print(f"✅ PASSED: Different Namespace Lock Isolation")
print(f" Different namespace locks ran in parallel: {elapsed:.2f}s")
# ============================================================================= # =============================================================================
@ -533,18 +413,10 @@ async def test_error_handling():
""" """
Test error handling for invalid workspace configurations. Test error handling for invalid workspace configurations.
""" """
# Purpose: Validate guardrails for workspace normalization and namespace
# derivation. Scope: set_default_workspace conversions and get_final_namespace
# failure paths when configuration is invalid.
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("TEST 7: Error Handling") print("TEST 7: Error Handling")
print("=" * 60) print("=" * 60)
# Test 7.0: Missing default workspace should raise ValueError
print("\nTest 7.0: Missing workspace raises ValueError")
with pytest.raises(ValueError):
get_final_namespace("test_namespace", workspace=None)
# Test 7.1: set_default_workspace(None) converts to empty string # Test 7.1: set_default_workspace(None) converts to empty string
print("\nTest 7.1: set_default_workspace(None) converts to empty string") print("\nTest 7.1: set_default_workspace(None) converts to empty string")
@ -554,10 +426,8 @@ async def test_error_handling():
# Should convert None to "" automatically # Should convert None to "" automatically
assert default_ws == "", f"Expected empty string, got: '{default_ws}'" assert default_ws == "", f"Expected empty string, got: '{default_ws}'"
print("✅ PASSED: Error Handling - None to Empty String") print(f"✅ PASSED: Error Handling - None to Empty String")
print( print(f" set_default_workspace(None) correctly converts to empty string: '{default_ws}'")
f" set_default_workspace(None) correctly converts to empty string: '{default_ws}'"
)
# Test 7.2: Empty string workspace behavior # Test 7.2: Empty string workspace behavior
print("\nTest 7.2: Empty string workspace creates valid namespace") print("\nTest 7.2: Empty string workspace creates valid namespace")
@ -566,7 +436,7 @@ async def test_error_handling():
final_ns = get_final_namespace("test_namespace", workspace="") final_ns = get_final_namespace("test_namespace", workspace="")
assert final_ns == "test_namespace", f"Unexpected namespace: '{final_ns}'" assert final_ns == "test_namespace", f"Unexpected namespace: '{final_ns}'"
print("✅ PASSED: Error Handling - Empty Workspace Namespace") print(f"✅ PASSED: Error Handling - Empty Workspace Namespace")
print(f" Empty workspace creates valid namespace: '{final_ns}'") print(f" Empty workspace creates valid namespace: '{final_ns}'")
# Restore default workspace for other tests # Restore default workspace for other tests
@ -583,9 +453,6 @@ async def test_update_flags_workspace_isolation():
""" """
Test that update flags are properly isolated between workspaces. Test that update flags are properly isolated between workspaces.
""" """
# Purpose: Confirm update flag setters/readers respect workspace scoping.
# Scope: set_all_update_flags, clear_all_update_flags, get_all_update_flags_status,
# and get_update_flag interactions across namespaces.
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("TEST 8: Update Flags Workspace Isolation") print("TEST 8: Update Flags Workspace Isolation")
print("=" * 60) print("=" * 60)
@ -615,17 +482,11 @@ async def test_update_flags_workspace_isolation():
await set_all_update_flags(test_namespace, workspace=workspace1) await set_all_update_flags(test_namespace, workspace=workspace1)
# Check that only workspace1's flags are set # Check that only workspace1's flags are set
assert ( assert flag1_obj.value is True, f"Flag1 should be True after set_all_update_flags, got {flag1_obj.value}"
flag1_obj.value is True assert flag2_obj.value is False, f"Flag2 should still be False, got {flag2_obj.value}"
), f"Flag1 should be True after set_all_update_flags, got {flag1_obj.value}"
assert (
flag2_obj.value is False
), f"Flag2 should still be False, got {flag2_obj.value}"
print("✅ PASSED: Update Flags - set_all_update_flags Isolation") print(f"✅ PASSED: Update Flags - set_all_update_flags Isolation")
print( print(f" set_all_update_flags isolated: ws1={flag1_obj.value}, ws2={flag2_obj.value}")
f" set_all_update_flags isolated: ws1={flag1_obj.value}, ws2={flag2_obj.value}"
)
# Test 8.2: clear_all_update_flags isolation # Test 8.2: clear_all_update_flags isolation
print("\nTest 8.2: clear_all_update_flags workspace isolation") print("\nTest 8.2: clear_all_update_flags workspace isolation")
@ -642,15 +503,11 @@ async def test_update_flags_workspace_isolation():
await clear_all_update_flags(test_namespace, workspace=workspace1) await clear_all_update_flags(test_namespace, workspace=workspace1)
# Check that only workspace1's flags are cleared # Check that only workspace1's flags are cleared
assert ( assert flag1_obj.value is False, f"Flag1 should be False after clear, got {flag1_obj.value}"
flag1_obj.value is False
), f"Flag1 should be False after clear, got {flag1_obj.value}"
assert flag2_obj.value is True, f"Flag2 should still be True, got {flag2_obj.value}" assert flag2_obj.value is True, f"Flag2 should still be True, got {flag2_obj.value}"
print("✅ PASSED: Update Flags - clear_all_update_flags Isolation") print(f"✅ PASSED: Update Flags - clear_all_update_flags Isolation")
print( print(f" clear_all_update_flags isolated: ws1={flag1_obj.value}, ws2={flag2_obj.value}")
f" clear_all_update_flags isolated: ws1={flag1_obj.value}, ws2={flag2_obj.value}"
)
# Test 8.3: get_all_update_flags_status workspace filtering # Test 8.3: get_all_update_flags_status workspace filtering
print("\nTest 8.3: get_all_update_flags_status workspace filtering") print("\nTest 8.3: get_all_update_flags_status workspace filtering")
@ -675,31 +532,11 @@ async def test_update_flags_workspace_isolation():
workspace1_keys = [k for k in status1.keys() if workspace1 in k] workspace1_keys = [k for k in status1.keys() if workspace1 in k]
workspace2_keys = [k for k in status1.keys() if workspace2 in k] workspace2_keys = [k for k in status1.keys() if workspace2 in k]
assert ( assert len(workspace1_keys) > 0, f"workspace1 keys should be present, got {len(workspace1_keys)}"
len(workspace1_keys) > 0 assert len(workspace2_keys) == 0, f"workspace2 keys should not be present, got {len(workspace2_keys)}"
), f"workspace1 keys should be present, got {len(workspace1_keys)}"
assert (
len(workspace2_keys) == 0
), f"workspace2 keys should not be present, got {len(workspace2_keys)}"
for key, values in status1.items():
assert all(values), f"All flags in {key} should be True, got {values}"
# Workspace2 query should only surface workspace2 namespaces print(f"✅ PASSED: Update Flags - get_all_update_flags_status Filtering")
status2 = await get_all_update_flags_status(workspace=workspace2) print(f" Status correctly filtered: ws1 keys={len(workspace1_keys)}, ws2 keys={len(workspace2_keys)}")
expected_ws2_keys = {
f"{workspace2}:{test_namespace}",
f"{workspace2}:ns_c",
}
assert (
set(status2.keys()) == expected_ws2_keys
), f"Unexpected namespaces for workspace2: {status2.keys()}"
for key, values in status2.items():
assert all(values), f"All flags in {key} should be True, got {values}"
print("✅ PASSED: Update Flags - get_all_update_flags_status Filtering")
print(
f" Status correctly filtered: ws1 keys={len(workspace1_keys)}, ws2 keys={len(workspace2_keys)}"
)
# ============================================================================= # =============================================================================
@ -712,9 +549,6 @@ async def test_empty_workspace_standardization():
""" """
Test that empty workspace is properly standardized to "" instead of "_". Test that empty workspace is properly standardized to "" instead of "_".
""" """
# Purpose: Verify namespace formatting when workspace is an empty string.
# Scope: get_final_namespace output and initialize_pipeline_status behavior
# between empty and non-empty workspaces.
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("TEST 9: Empty Workspace Standardization") print("TEST 9: Empty Workspace Standardization")
print("=" * 60) print("=" * 60)
@ -726,11 +560,9 @@ async def test_empty_workspace_standardization():
final_ns = get_final_namespace("test_namespace", workspace=None) final_ns = get_final_namespace("test_namespace", workspace=None)
# Should be just "test_namespace" without colon prefix # Should be just "test_namespace" without colon prefix
assert ( assert final_ns == "test_namespace", f"Unexpected namespace format: '{final_ns}' (expected 'test_namespace')"
final_ns == "test_namespace"
), f"Unexpected namespace format: '{final_ns}' (expected 'test_namespace')"
print("✅ PASSED: Empty Workspace Standardization - Format") print(f"✅ PASSED: Empty Workspace Standardization - Format")
print(f" Empty workspace creates correct namespace: '{final_ns}'") print(f" Empty workspace creates correct namespace: '{final_ns}'")
# Test 9.2: Empty workspace vs non-empty workspace behavior # Test 9.2: Empty workspace vs non-empty workspace behavior
@ -747,12 +579,10 @@ async def test_empty_workspace_standardization():
data_nonempty = await get_namespace_data("pipeline_status", workspace="test_ws") data_nonempty = await get_namespace_data("pipeline_status", workspace="test_ws")
# They should be different objects # They should be different objects
assert ( assert data_empty is not data_nonempty, "Empty and non-empty workspaces share data (should be independent)"
data_empty is not data_nonempty
), "Empty and non-empty workspaces share data (should be independent)"
print("✅ PASSED: Empty Workspace Standardization - Behavior") print(f"✅ PASSED: Empty Workspace Standardization - Behavior")
print(" Empty and non-empty workspaces have independent data") print(f" Empty and non-empty workspaces have independent data")
# ============================================================================= # =============================================================================
@ -767,9 +597,6 @@ async def test_json_kv_storage_workspace_isolation():
Creates two JsonKVStorage instances with different workspaces, writes different data, Creates two JsonKVStorage instances with different workspaces, writes different data,
and verifies they don't mix. and verifies they don't mix.
""" """
# Purpose: Ensure JsonKVStorage respects workspace-specific directories and data.
# Scope: storage initialization, upsert/get_by_id operations, and filesystem layout
# inside the temporary working directory.
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("TEST 10: JsonKVStorage Workspace Isolation (Integration)") print("TEST 10: JsonKVStorage Workspace Isolation (Integration)")
print("=" * 60) print("=" * 60)
@ -792,9 +619,7 @@ async def test_json_kv_storage_workspace_isolation():
} }
# Test 10.1: Create two JsonKVStorage instances with different workspaces # Test 10.1: Create two JsonKVStorage instances with different workspaces
print( print("\nTest 10.1: Create two JsonKVStorage instances with different workspaces")
"\nTest 10.1: Create two JsonKVStorage instances with different workspaces"
)
from lightrag.kg.json_kv_impl import JsonKVStorage from lightrag.kg.json_kv_impl import JsonKVStorage
@ -816,41 +641,25 @@ async def test_json_kv_storage_workspace_isolation():
await storage1.initialize() await storage1.initialize()
await storage2.initialize() await storage2.initialize()
print(" Storage1 created: workspace=workspace1, namespace=entities") print(f" Storage1 created: workspace=workspace1, namespace=entities")
print(" Storage2 created: workspace=workspace2, namespace=entities") print(f" Storage2 created: workspace=workspace2, namespace=entities")
# Test 10.2: Write different data to each storage # Test 10.2: Write different data to each storage
print("\nTest 10.2: Write different data to each storage") print("\nTest 10.2: Write different data to each storage")
# Write to storage1 (upsert expects dict[str, dict]) # Write to storage1 (upsert expects dict[str, dict])
await storage1.upsert( await storage1.upsert({
{ "entity1": {"content": "Data from workspace1 - AI Research", "type": "entity"},
"entity1": { "entity2": {"content": "Data from workspace1 - Machine Learning", "type": "entity"}
"content": "Data from workspace1 - AI Research", })
"type": "entity", print(f" Written to storage1: entity1, entity2")
},
"entity2": {
"content": "Data from workspace1 - Machine Learning",
"type": "entity",
},
}
)
print(" Written to storage1: entity1, entity2")
# Write to storage2 # Write to storage2
await storage2.upsert( await storage2.upsert({
{ "entity1": {"content": "Data from workspace2 - Deep Learning", "type": "entity"},
"entity1": { "entity2": {"content": "Data from workspace2 - Neural Networks", "type": "entity"}
"content": "Data from workspace2 - Deep Learning", })
"type": "entity", print(f" Written to storage2: entity1, entity2")
},
"entity2": {
"content": "Data from workspace2 - Neural Networks",
"type": "entity",
},
}
)
print(" Written to storage2: entity1, entity2")
# Test 10.3: Read data from each storage and verify isolation # Test 10.3: Read data from each storage and verify isolation
print("\nTest 10.3: Read data and verify isolation") print("\nTest 10.3: Read data and verify isolation")
@ -873,29 +682,15 @@ async def test_json_kv_storage_workspace_isolation():
assert result1_entity2 is not None, "Storage1 entity2 should not be None" assert result1_entity2 is not None, "Storage1 entity2 should not be None"
assert result2_entity1 is not None, "Storage2 entity1 should not be None" assert result2_entity1 is not None, "Storage2 entity1 should not be None"
assert result2_entity2 is not None, "Storage2 entity2 should not be None" assert result2_entity2 is not None, "Storage2 entity2 should not be None"
assert ( assert result1_entity1.get("content") == "Data from workspace1 - AI Research", f"Storage1 entity1 content mismatch"
result1_entity1.get("content") == "Data from workspace1 - AI Research" assert result1_entity2.get("content") == "Data from workspace1 - Machine Learning", f"Storage1 entity2 content mismatch"
), "Storage1 entity1 content mismatch" assert result2_entity1.get("content") == "Data from workspace2 - Deep Learning", f"Storage2 entity1 content mismatch"
assert ( assert result2_entity2.get("content") == "Data from workspace2 - Neural Networks", f"Storage2 entity2 content mismatch"
result1_entity2.get("content") == "Data from workspace1 - Machine Learning" assert result1_entity1.get("content") != result2_entity1.get("content"), "Storage1 and Storage2 entity1 should have different content"
), "Storage1 entity2 content mismatch" assert result1_entity2.get("content") != result2_entity2.get("content"), "Storage1 and Storage2 entity2 should have different content"
assert (
result2_entity1.get("content") == "Data from workspace2 - Deep Learning"
), "Storage2 entity1 content mismatch"
assert (
result2_entity2.get("content") == "Data from workspace2 - Neural Networks"
), "Storage2 entity2 content mismatch"
assert result1_entity1.get("content") != result2_entity1.get(
"content"
), "Storage1 and Storage2 entity1 should have different content"
assert result1_entity2.get("content") != result2_entity2.get(
"content"
), "Storage1 and Storage2 entity2 should have different content"
print("✅ PASSED: JsonKVStorage - Data Isolation") print(f"✅ PASSED: JsonKVStorage - Data Isolation")
print( print(f" Two storage instances correctly isolated: ws1 and ws2 have different data")
" Two storage instances correctly isolated: ws1 and ws2 have different data"
)
# Test 10.4: Verify file structure # Test 10.4: Verify file structure
print("\nTest 10.4: Verify file structure") print("\nTest 10.4: Verify file structure")
@ -911,16 +706,14 @@ async def test_json_kv_storage_workspace_isolation():
assert ws1_exists, "workspace1 directory should exist" assert ws1_exists, "workspace1 directory should exist"
assert ws2_exists, "workspace2 directory should exist" assert ws2_exists, "workspace2 directory should exist"
print("✅ PASSED: JsonKVStorage - File Structure") print(f"✅ PASSED: JsonKVStorage - File Structure")
print(f" Workspace directories correctly created: {ws1_dir} and {ws2_dir}") print(f" Workspace directories correctly created: {ws1_dir} and {ws2_dir}")
finally: finally:
# Cleanup test directory (unless KEEP_TEST_ARTIFACTS is set) # Cleanup test directory
if os.path.exists(test_dir) and not KEEP_TEST_ARTIFACTS: if os.path.exists(test_dir):
shutil.rmtree(test_dir) shutil.rmtree(test_dir)
print(f"\n Cleaned up test directory: {test_dir}") print(f"\n Cleaned up test directory: {test_dir}")
elif KEEP_TEST_ARTIFACTS:
print(f"\n Kept test directory for inspection: {test_dir}")
# ============================================================================= # =============================================================================
@ -935,42 +728,26 @@ async def test_lightrag_end_to_end_workspace_isolation():
insert different data, and verify file separation. insert different data, and verify file separation.
Uses mock LLM and embedding functions to avoid external API calls. Uses mock LLM and embedding functions to avoid external API calls.
""" """
# Purpose: Validate that full LightRAG flows keep artifacts scoped per workspace.
# Scope: LightRAG.initialize_storages + ainsert side effects plus filesystem
# verification for generated storage files.
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("TEST 11: LightRAG End-to-End Workspace Isolation") print("TEST 11: LightRAG End-to-End Workspace Isolation")
print("=" * 60) print("=" * 60)
# Create temporary test directory # Create temporary test directory
# test_dir = tempfile.mkdtemp(prefix="lightrag_test_e2e_") test_dir = tempfile.mkdtemp(prefix="lightrag_test_e2e_")
test_dir = str(Path(__file__).parent.parent / "temp/e2e_workspace_isolation")
if os.path.exists(test_dir):
shutil.rmtree(test_dir)
os.makedirs(test_dir, exist_ok=True)
print(f"\n Using test directory: {test_dir}") print(f"\n Using test directory: {test_dir}")
try: try:
# Factory function to create different mock LLM functions for each workspace # Mock LLM function
def create_mock_llm_func(workspace_name): async def mock_llm_func(
"""Create a mock LLM function that returns different content based on workspace""" prompt, system_prompt=None, history_messages=[], **kwargs
async def mock_llm_func( ) -> str:
prompt, system_prompt=None, history_messages=[], **kwargs # Return a mock response that simulates entity extraction in the correct format
) -> str: # Format: entity<|#|>entity_name<|#|>entity_type<|#|>entity_description
# Return different responses based on workspace # Format: relation<|#|>source_entity<|#|>target_entity<|#|>keywords<|#|>description
# Format: entity<|#|>entity_name<|#|>entity_type<|#|>entity_description return """entity<|#|>Artificial Intelligence<|#|>concept<|#|>AI is a field of computer science focused on creating intelligent machines.
# Format: relation<|#|>source_entity<|#|>target_entity<|#|>keywords<|#|>description
if workspace_name == "project_a":
return """entity<|#|>Artificial Intelligence<|#|>concept<|#|>AI is a field of computer science focused on creating intelligent machines.
entity<|#|>Machine Learning<|#|>concept<|#|>Machine Learning is a subset of AI that enables systems to learn from data. entity<|#|>Machine Learning<|#|>concept<|#|>Machine Learning is a subset of AI that enables systems to learn from data.
relation<|#|>Machine Learning<|#|>Artificial Intelligence<|#|>subset, related field<|#|>Machine Learning is a key component and subset of Artificial Intelligence. relation<|#|>Machine Learning<|#|>Artificial Intelligence<|#|>subset, related field<|#|>Machine Learning is a key component and subset of Artificial Intelligence.
<|COMPLETE|>""" <|COMPLETE|>"""
else: # project_b
return """entity<|#|>Deep Learning<|#|>concept<|#|>Deep Learning is a subset of machine learning using neural networks with multiple layers.
entity<|#|>Neural Networks<|#|>concept<|#|>Neural Networks are computing systems inspired by biological neural networks.
relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Learning uses multiple layers of Neural Networks to learn representations.
<|COMPLETE|>"""
return mock_llm_func
# Mock embedding function # Mock embedding function
async def mock_embedding_func(texts: list[str]) -> np.ndarray: async def mock_embedding_func(texts: list[str]) -> np.ndarray:
@ -980,51 +757,36 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le
print("\nTest 11.1: Create two LightRAG instances with different workspaces") print("\nTest 11.1: Create two LightRAG instances with different workspaces")
from lightrag import LightRAG from lightrag import LightRAG
from lightrag.utils import EmbeddingFunc, Tokenizer from lightrag.utils import EmbeddingFunc
# Create different mock LLM functions for each workspace
mock_llm_func_a = create_mock_llm_func("project_a")
mock_llm_func_b = create_mock_llm_func("project_b")
class _SimpleTokenizerImpl:
def encode(self, content: str) -> list[int]:
return [ord(ch) for ch in content]
def decode(self, tokens: list[int]) -> str:
return "".join(chr(t) for t in tokens)
tokenizer = Tokenizer("mock-tokenizer", _SimpleTokenizerImpl())
rag1 = LightRAG( rag1 = LightRAG(
working_dir=test_dir, working_dir=test_dir,
workspace="project_a", workspace="project_a",
llm_model_func=mock_llm_func_a, llm_model_func=mock_llm_func,
embedding_func=EmbeddingFunc( embedding_func=EmbeddingFunc(
embedding_dim=384, embedding_dim=384,
max_token_size=8192, max_token_size=8192,
func=mock_embedding_func, func=mock_embedding_func,
), ),
tokenizer=tokenizer,
) )
rag2 = LightRAG( rag2 = LightRAG(
working_dir=test_dir, working_dir=test_dir,
workspace="project_b", workspace="project_b",
llm_model_func=mock_llm_func_b, llm_model_func=mock_llm_func,
embedding_func=EmbeddingFunc( embedding_func=EmbeddingFunc(
embedding_dim=384, embedding_dim=384,
max_token_size=8192, max_token_size=8192,
func=mock_embedding_func, func=mock_embedding_func,
), ),
tokenizer=tokenizer,
) )
# Initialize storages # Initialize storages
await rag1.initialize_storages() await rag1.initialize_storages()
await rag2.initialize_storages() await rag2.initialize_storages()
print(" RAG1 created: workspace=project_a") print(f" RAG1 created: workspace=project_a")
print(" RAG2 created: workspace=project_b") print(f" RAG2 created: workspace=project_b")
# Test 11.2: Insert different data to each RAG instance # Test 11.2: Insert different data to each RAG instance
print("\nTest 11.2: Insert different data to each RAG instance") print("\nTest 11.2: Insert different data to each RAG instance")
@ -1058,20 +820,20 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le
assert project_b_exists, "project_b directory should exist" assert project_b_exists, "project_b directory should exist"
# List files in each directory # List files in each directory
print("\n Files in project_a/:") print(f"\n Files in project_a/:")
for file in sorted(project_a_dir.glob("*")): for file in sorted(project_a_dir.glob("*")):
if file.is_file(): if file.is_file():
size = file.stat().st_size size = file.stat().st_size
print(f" - {file.name} ({size} bytes)") print(f" - {file.name} ({size} bytes)")
print("\n Files in project_b/:") print(f"\n Files in project_b/:")
for file in sorted(project_b_dir.glob("*")): for file in sorted(project_b_dir.glob("*")):
if file.is_file(): if file.is_file():
size = file.stat().st_size size = file.stat().st_size
print(f" - {file.name} ({size} bytes)") print(f" - {file.name} ({size} bytes)")
print("✅ PASSED: LightRAG E2E - File Structure") print(f"✅ PASSED: LightRAG E2E - File Structure")
print(" Workspace directories correctly created and separated") print(f" Workspace directories correctly created and separated")
# Test 11.4: Verify data isolation by checking file contents # Test 11.4: Verify data isolation by checking file contents
print("\nTest 11.4: Verify data isolation (check file contents)") print("\nTest 11.4: Verify data isolation (check file contents)")
@ -1093,55 +855,19 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le
print(f" project_b doc count: {len(docs_b_content)}") print(f" project_b doc count: {len(docs_b_content)}")
# Verify they contain different data # Verify they contain different data
assert ( assert docs_a_content != docs_b_content, "Document storage not properly isolated"
docs_a_content != docs_b_content
), "Document storage not properly isolated"
# Verify each workspace contains its own text content print(f"✅ PASSED: LightRAG E2E - Data Isolation")
docs_a_str = json.dumps(docs_a_content) print(f" Document storage correctly isolated between workspaces")
docs_b_str = json.dumps(docs_b_content)
# Check project_a contains its text and NOT project_b's text
assert (
"Artificial Intelligence" in docs_a_str
), "project_a should contain 'Artificial Intelligence'"
assert (
"Machine Learning" in docs_a_str
), "project_a should contain 'Machine Learning'"
assert (
"Deep Learning" not in docs_a_str
), "project_a should NOT contain 'Deep Learning' from project_b"
assert (
"Neural Networks" not in docs_a_str
), "project_a should NOT contain 'Neural Networks' from project_b"
# Check project_b contains its text and NOT project_a's text
assert (
"Deep Learning" in docs_b_str
), "project_b should contain 'Deep Learning'"
assert (
"Neural Networks" in docs_b_str
), "project_b should contain 'Neural Networks'"
assert (
"Artificial Intelligence" not in docs_b_str
), "project_b should NOT contain 'Artificial Intelligence' from project_a"
# Note: "Machine Learning" might appear in project_b's text, so we skip that check
print("✅ PASSED: LightRAG E2E - Data Isolation")
print(" Document storage correctly isolated between workspaces")
print(" project_a contains only its own data")
print(" project_b contains only its own data")
else: else:
print(" Document storage files not found (may not be created yet)") print(f" Document storage files not found (may not be created yet)")
print("✅ PASSED: LightRAG E2E - Data Isolation") print(f"✅ PASSED: LightRAG E2E - Data Isolation")
print(" Skipped file content check (files not created)") print(f" Skipped file content check (files not created)")
print("\n ✓ Test complete - workspace isolation verified at E2E level") print(f"\n ✓ Test complete - workspace isolation verified at E2E level")
finally: finally:
# Cleanup test directory (unless KEEP_TEST_ARTIFACTS is set) # Cleanup test directory
if os.path.exists(test_dir) and not KEEP_TEST_ARTIFACTS: if os.path.exists(test_dir):
shutil.rmtree(test_dir) shutil.rmtree(test_dir)
print(f"\n Cleaned up test directory: {test_dir}") print(f"\n Cleaned up test directory: {test_dir}")
elif KEEP_TEST_ARTIFACTS:
print(f"\n Kept test directory for inspection: {test_dir}")