From 68cc386456695980ef40c9b2077613ac2e26c791 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rapha=C3=ABl=20MANSUY?=
Date: Thu, 4 Dec 2025 19:14:31 +0800
Subject: [PATCH] cherry-pick a990c1d4

---
 tests/test_workspace_isolation.py | 590 ++++++++----------------------
 1 file changed, 158 insertions(+), 432 deletions(-)

diff --git a/tests/test_workspace_isolation.py b/tests/test_workspace_isolation.py
index c68242c8..58942e6c 100644
--- a/tests/test_workspace_isolation.py
+++ b/tests/test_workspace_isolation.py
@@ -1,21 +1,12 @@
 #!/usr/bin/env python
 """
-Test script for Workspace Isolation Feature
+Test script for PR #2366: Workspace Isolation Feature
 
-Comprehensive test suite covering workspace isolation in LightRAG:
-1. Pipeline Status Isolation - Data isolation between workspaces
-2. Lock Mechanism - Parallel execution for different workspaces, serial for same workspace
-3. Backward Compatibility - Legacy code without workspace parameters
-4. Multi-Workspace Concurrency - Concurrent operations on different workspaces
-5. NamespaceLock Re-entrance Protection - Prevents deadlocks
-6. Different Namespace Lock Isolation - Locks isolated by namespace
-7. Error Handling - Invalid workspace configurations
-8. Update Flags Workspace Isolation - Update flags properly isolated
-9. Empty Workspace Standardization - Empty workspace handling
-10. JsonKVStorage Workspace Isolation - Integration test for KV storage
-11. LightRAG End-to-End Workspace Isolation - Complete E2E test with two instances
-
-Total: 11 test scenarios
+Tests the 4 key scenarios mentioned in PR description:
+1. Multi-Workspace Concurrency Test
+2. Pipeline Status Isolation Test
+3. Backward Compatibility Test
+4. Lock Mechanism Test
 """
 
 import asyncio
@@ -26,14 +17,12 @@ import tempfile
 import numpy as np
 import pytest
 from pathlib import Path
-from typing import List, Tuple, Dict
 
 from lightrag.kg.shared_storage import (
     get_final_namespace,
     get_namespace_lock,
     get_default_workspace,
     set_default_workspace,
     initialize_share_data,
-    finalize_share_data,
     initialize_pipeline_status,
     get_namespace_data,
     set_all_update_flags,
@@ -41,16 +30,7 @@ from lightrag.kg.shared_storage import (
     get_all_update_flags_status,
     get_update_flag,
 )
-
-
-# =============================================================================
-# Test Configuration
-# =============================================================================
-
-# Stress test configuration (enable via environment variable)
-STRESS_TEST_MODE = os.getenv("LIGHTRAG_STRESS_TEST", "false").lower() == "true"
-PARALLEL_WORKERS = int(os.getenv("LIGHTRAG_TEST_WORKERS", "3"))
-KEEP_TEST_ARTIFACTS = os.getenv("LIGHTRAG_KEEP_ARTIFACTS", "false").lower() == "true"
+from lightrag.kg.json_kv_impl import JsonKVStorage
 
 
 # =============================================================================
@@ -63,85 +43,7 @@ def setup_shared_data():
     """Initialize shared data before each test"""
     initialize_share_data()
     yield
-    finalize_share_data()
-
-
-async def _measure_lock_parallelism(
-    workload: List[Tuple[str, str, str]], hold_time: float = 0.05
-) -> Tuple[int, List[Tuple[str, str]], Dict[str, float]]:
-    """Run lock acquisition workload and capture peak concurrency and timeline.
-
-    Args:
-        workload: List of (name, workspace, namespace) tuples
-        hold_time: How long each worker holds the lock (seconds)
-
-    Returns:
-        Tuple of (max_parallel, timeline, metrics) where:
-        - max_parallel: Peak number of concurrent lock holders
-        - timeline: List of (name, event) tuples tracking execution order
-        - metrics: Dict with performance metrics (total_duration, max_concurrency, etc.)
-    """
-
-    running = 0
-    max_parallel = 0
-    timeline: List[Tuple[str, str]] = []
-    start_time = time.time()
-
-    async def worker(name: str, workspace: str, namespace: str) -> None:
-        nonlocal running, max_parallel
-        lock = get_namespace_lock(namespace, workspace)
-        async with lock:
-            running += 1
-            max_parallel = max(max_parallel, running)
-            timeline.append((name, "start"))
-            await asyncio.sleep(hold_time)
-            timeline.append((name, "end"))
-            running -= 1
-
-    await asyncio.gather(*(worker(*args) for args in workload))
-
-    metrics = {
-        "total_duration": time.time() - start_time,
-        "max_concurrency": max_parallel,
-        "avg_hold_time": hold_time,
-        "num_workers": len(workload),
-    }
-
-    return max_parallel, timeline, metrics
-
-
-def _assert_no_timeline_overlap(timeline: List[Tuple[str, str]]) -> None:
-    """Ensure that timeline events never overlap for sequential execution.
-
-    This function implements a finite state machine that validates:
-    - No overlapping lock acquisitions (only one task active at a time)
-    - Proper lock release order (task releases its own lock)
-    - All locks are properly released
-
-    Args:
-        timeline: List of (name, event) tuples where event is "start" or "end"
-
-    Raises:
-        AssertionError: If timeline shows overlapping execution or improper locking
-    """
-
-    active_task = None
-    for name, event in timeline:
-        if event == "start":
-            if active_task is not None:
-                raise AssertionError(
-                    f"Task '{name}' started before '{active_task}' released the lock"
-                )
-            active_task = name
-        else:
-            if active_task != name:
-                raise AssertionError(
-                    f"Task '{name}' finished while '{active_task}' was expected to hold the lock"
-                )
-            active_task = None
-
-    if active_task is not None:
-        raise AssertionError(f"Task '{active_task}' did not release the lock properly")
+    # Cleanup after test if needed
 
 
 # =============================================================================
@@ -154,8 +56,6 @@ async def test_pipeline_status_isolation():
     """
     Test that pipeline status is isolated between different workspaces.
     """
-    # Purpose: Ensure pipeline_status shared data remains unique per workspace.
-    # Scope: initialize_pipeline_status and get_namespace_data interactions.
print("\n" + "=" * 60) print("TEST 1: Pipeline Status Isolation") print("=" * 60) @@ -175,9 +75,7 @@ async def test_pipeline_status_isolation(): data2 = await get_namespace_data("pipeline_status", workspace=workspace2) # Verify they are independent objects - assert ( - data1 is not data2 - ), "Pipeline status data objects are the same (should be different)" + assert data1 is not data2, "Pipeline status data objects are the same (should be different)" # Modify workspace1's data and verify workspace2 is not affected data1["test_key"] = "workspace1_value" @@ -187,12 +85,8 @@ async def test_pipeline_status_isolation(): data2_check = await get_namespace_data("pipeline_status", workspace=workspace2) assert "test_key" in data1_check, "test_key not found in workspace1" - assert ( - data1_check["test_key"] == "workspace1_value" - ), f"workspace1 test_key value incorrect: {data1_check.get('test_key')}" - assert ( - "test_key" not in data2_check - ), f"test_key leaked to workspace2: {data2_check.get('test_key')}" + assert data1_check["test_key"] == "workspace1_value", f"workspace1 test_key value incorrect: {data1_check.get('test_key')}" + assert "test_key" not in data2_check, f"test_key leaked to workspace2: {data2_check.get('test_key')}" print("✅ PASSED: Pipeline Status Isolation") print(" Different workspaces have isolated pipeline status") @@ -210,53 +104,56 @@ async def test_lock_mechanism(): Tests both parallel execution for different workspaces and serialization for the same workspace. """ - # Purpose: Validate that keyed locks isolate workspaces while serializing - # requests within the same workspace. Scope: get_namespace_lock scheduling - # semantics for both cross-workspace and single-workspace cases. print("\n" + "=" * 60) print("TEST 2: Lock Mechanism (No Deadlocks)") print("=" * 60) # Test 2.1: Different workspaces should run in parallel print("\nTest 2.1: Different workspaces locks should be parallel") - - # Support stress testing with configurable number of workers - num_workers = PARALLEL_WORKERS if STRESS_TEST_MODE else 3 - parallel_workload = [ - (f"ws_{chr(97+i)}", f"ws_{chr(97+i)}", "test_namespace") - for i in range(num_workers) - ] - - max_parallel, timeline_parallel, metrics = await _measure_lock_parallelism( - parallel_workload - ) - assert max_parallel >= 2, ( - "Locks for distinct workspaces should overlap; " - f"observed max concurrency: {max_parallel}, timeline={timeline_parallel}" - ) - print("✅ PASSED: Lock Mechanism - Parallel (Different Workspaces)") - print(f" Locks overlapped for different workspaces (max concurrency={max_parallel})") - print(f" Performance: {metrics['total_duration']:.3f}s for {metrics['num_workers']} workers") + async def acquire_lock_timed(workspace, namespace, hold_time): + """Acquire a lock and hold it for specified time""" + lock = get_namespace_lock(namespace, workspace) + start = time.time() + async with lock: + print( + f" [{workspace}] acquired lock at {time.time() - start:.2f}s" + ) + await asyncio.sleep(hold_time) + print( + f" [{workspace}] releasing lock at {time.time() - start:.2f}s" + ) + + start = time.time() + await asyncio.gather( + acquire_lock_timed("ws_a", "test_namespace", 0.5), + acquire_lock_timed("ws_b", "test_namespace", 0.5), + acquire_lock_timed("ws_c", "test_namespace", 0.5), + ) + elapsed = time.time() - start + + # If locks are properly isolated by workspace, this should take ~0.5s (parallel) + # If they block each other, it would take ~1.5s (serial) + assert elapsed < 1.0, f"Locks blocked each other: {elapsed:.2f}s 
(expected < 1.0s)" + + print(f"✅ PASSED: Lock Mechanism - Parallel (Different Workspaces)") + print(f" Locks ran in parallel: {elapsed:.2f}s") # Test 2.2: Same workspace should serialize print("\nTest 2.2: Same workspace locks should serialize") - serial_workload = [ - ("serial_run_1", "ws_same", "test_namespace"), - ("serial_run_2", "ws_same", "test_namespace"), - ] - max_parallel_serial, timeline_serial, metrics_serial = await _measure_lock_parallelism( - serial_workload - ) - assert max_parallel_serial == 1, ( - "Same workspace locks should not overlap; " - f"observed {max_parallel_serial} with timeline {timeline_serial}" - ) - _assert_no_timeline_overlap(timeline_serial) - print("✅ PASSED: Lock Mechanism - Serial (Same Workspace)") - print(" Same workspace operations executed sequentially with no overlap") - print(f" Performance: {metrics_serial['total_duration']:.3f}s for {metrics_serial['num_workers']} tasks") + start = time.time() + await asyncio.gather( + acquire_lock_timed("ws_same", "test_namespace", 0.3), + acquire_lock_timed("ws_same", "test_namespace", 0.3), + ) + elapsed = time.time() - start + + # Same workspace should serialize, taking ~0.6s + assert elapsed >= 0.5, f"Locks didn't serialize: {elapsed:.2f}s (expected >= 0.5s)" + + print(f"✅ PASSED: Lock Mechanism - Serial (Same Workspace)") + print(f" Locks serialized correctly: {elapsed:.2f}s") # ============================================================================= @@ -269,9 +166,6 @@ async def test_backward_compatibility(): """ Test that legacy code without workspace parameter still works correctly. """ - # Purpose: Validate backward-compatible defaults when workspace arguments - # are omitted. Scope: get_final_namespace, set/get_default_workspace and - # initialize_pipeline_status fallback behavior. 
print("\n" + "=" * 60) print("TEST 3: Backward Compatibility") print("=" * 60) @@ -285,7 +179,7 @@ async def test_backward_compatibility(): assert final_ns == expected, f"Expected {expected}, got {final_ns}" - print("✅ PASSED: Backward Compatibility - get_final_namespace") + print(f"✅ PASSED: Backward Compatibility - get_final_namespace") print(f" Correctly uses default workspace: {final_ns}") # Test 3.2: get_default_workspace @@ -296,7 +190,7 @@ async def test_backward_compatibility(): assert retrieved == "test_default", f"Expected 'test_default', got {retrieved}" - print("✅ PASSED: Backward Compatibility - default workspace") + print(f"✅ PASSED: Backward Compatibility - default workspace") print(f" Default workspace set/get correctly: {retrieved}") # Test 3.3: Empty workspace handling @@ -306,11 +200,9 @@ async def test_backward_compatibility(): final_ns_empty = get_final_namespace("pipeline_status", workspace=None) expected_empty = "pipeline_status" # Should be just the namespace without ':' - assert ( - final_ns_empty == expected_empty - ), f"Expected '{expected_empty}', got '{final_ns_empty}'" + assert final_ns_empty == expected_empty, f"Expected '{expected_empty}', got '{final_ns_empty}'" - print("✅ PASSED: Backward Compatibility - empty workspace") + print(f"✅ PASSED: Backward Compatibility - empty workspace") print(f" Empty workspace handled correctly: '{final_ns_empty}'") # Test 3.4: None workspace with default set @@ -324,12 +216,10 @@ async def test_backward_compatibility(): "pipeline_status", workspace="compat_test_workspace" ) - assert ( - data is not None - ), "Failed to initialize pipeline status with default workspace" + assert data is not None, "Failed to initialize pipeline status with default workspace" - print("✅ PASSED: Backward Compatibility - pipeline init with None") - print(" Pipeline status initialized with default workspace") + print(f"✅ PASSED: Backward Compatibility - pipeline init with None") + print(f" Pipeline status initialized with default workspace") # ============================================================================= @@ -343,9 +233,6 @@ async def test_multi_workspace_concurrency(): Test that multiple workspaces can operate concurrently without interference. Simulates concurrent operations on different workspaces. """ - # Purpose: Simulate concurrent workloads touching pipeline_status across - # workspaces. Scope: initialize_pipeline_status, get_namespace_lock, and - # shared dictionary mutation while ensuring isolation. 
print("\n" + "=" * 60) print("TEST 4: Multi-Workspace Concurrency") print("=" * 60) @@ -390,10 +277,8 @@ async def test_multi_workspace_concurrency(): # Verify all workspaces completed assert set(results_list) == set(workspaces), "Not all workspaces completed" - print("✅ PASSED: Multi-Workspace Concurrency - Execution") - print( - f" All {len(workspaces)} workspaces completed successfully in {elapsed:.2f}s" - ) + print(f"✅ PASSED: Multi-Workspace Concurrency - Execution") + print(f" All {len(workspaces)} workspaces completed successfully in {elapsed:.2f}s") # Verify data isolation - each workspace should have its own data print("\n Verifying data isolation...") @@ -403,16 +288,12 @@ async def test_multi_workspace_concurrency(): expected_key = f"{ws}_key" expected_value = f"{ws}_value" - assert ( - expected_key in data - ), f"Data not properly isolated for {ws}: missing {expected_key}" - assert ( - data[expected_key] == expected_value - ), f"Data not properly isolated for {ws}: {expected_key}={data[expected_key]} (expected {expected_value})" + assert expected_key in data, f"Data not properly isolated for {ws}: missing {expected_key}" + assert data[expected_key] == expected_value, f"Data not properly isolated for {ws}: {expected_key}={data[expected_key]} (expected {expected_value})" print(f" [{ws}] Data correctly isolated: {expected_key}={data[expected_key]}") - print("✅ PASSED: Multi-Workspace Concurrency - Data Isolation") - print(" All workspaces have properly isolated data") + print(f"✅ PASSED: Multi-Workspace Concurrency - Data Isolation") + print(f" All workspaces have properly isolated data") # ============================================================================= @@ -426,9 +307,6 @@ async def test_namespace_lock_reentrance(): Test that NamespaceLock prevents re-entrance in the same coroutine and allows concurrent use in different coroutines. """ - # Purpose: Ensure NamespaceLock enforces single entry per coroutine while - # allowing concurrent reuse through ContextVar isolation. Scope: lock - # re-entrance checks and concurrent gather semantics. 
print("\n" + "=" * 60) print("TEST 5: NamespaceLock Re-entrance Protection") print("=" * 60) @@ -454,8 +332,8 @@ async def test_namespace_lock_reentrance(): assert reentrance_failed_correctly, "Re-entrance protection not working" - print("✅ PASSED: NamespaceLock Re-entrance Protection") - print(" Re-entrance correctly raises RuntimeError") + print(f"✅ PASSED: NamespaceLock Re-entrance Protection") + print(f" Re-entrance correctly raises RuntimeError") # Test 5.2: Same NamespaceLock instance in different coroutines should succeed print("\nTest 5.2: Same NamespaceLock instance in different coroutines") @@ -478,14 +356,10 @@ async def test_namespace_lock_reentrance(): # Both coroutines should have completed expected_entries = 4 # 2 starts + 2 ends - assert ( - len(concurrent_results) == expected_entries - ), f"Expected {expected_entries} entries, got {len(concurrent_results)}" + assert len(concurrent_results) == expected_entries, f"Expected {expected_entries} entries, got {len(concurrent_results)}" - print("✅ PASSED: NamespaceLock Concurrent Reuse") - print( - f" Same NamespaceLock instance used successfully in {expected_entries//2} concurrent coroutines" - ) + print(f"✅ PASSED: NamespaceLock Concurrent Reuse") + print(f" Same NamespaceLock instance used successfully in {expected_entries//2} concurrent coroutines") # ============================================================================= @@ -498,29 +372,35 @@ async def test_different_namespace_lock_isolation(): """ Test that locks for different namespaces (same workspace) are independent. """ - # Purpose: Confirm that namespace isolation is enforced even when workspace - # is the same. Scope: get_namespace_lock behavior when namespaces differ. print("\n" + "=" * 60) print("TEST 6: Different Namespace Lock Isolation") print("=" * 60) print("\nTesting locks with same workspace but different namespaces") - workload = [ - ("ns_a", "same_ws", "namespace_a"), - ("ns_b", "same_ws", "namespace_b"), - ("ns_c", "same_ws", "namespace_c"), - ] - max_parallel, timeline, metrics = await _measure_lock_parallelism(workload) + async def acquire_lock_timed(workspace, namespace, hold_time, name): + """Acquire a lock and hold it for specified time""" + lock = get_namespace_lock(namespace, workspace) + start = time.time() + async with lock: + print(f" [{name}] acquired lock at {time.time() - start:.2f}s") + await asyncio.sleep(hold_time) + print(f" [{name}] releasing lock at {time.time() - start:.2f}s") - assert max_parallel >= 2, ( - "Different namespaces within the same workspace should run concurrently; " - f"observed max concurrency {max_parallel} with timeline {timeline}" + # These should run in parallel (different namespaces) + start = time.time() + await asyncio.gather( + acquire_lock_timed("same_ws", "namespace_a", 0.5, "ns_a"), + acquire_lock_timed("same_ws", "namespace_b", 0.5, "ns_b"), + acquire_lock_timed("same_ws", "namespace_c", 0.5, "ns_c"), ) + elapsed = time.time() - start - print("✅ PASSED: Different Namespace Lock Isolation") - print(f" Different namespace locks ran in parallel (max concurrency={max_parallel})") - print(f" Performance: {metrics['total_duration']:.3f}s for {metrics['num_workers']} namespaces") + # If locks are properly isolated by namespace, this should take ~0.5s (parallel) + assert elapsed < 1.0, f"Different namespace locks blocked each other: {elapsed:.2f}s (expected < 1.0s)" + + print(f"✅ PASSED: Different Namespace Lock Isolation") + print(f" Different namespace locks ran in parallel: {elapsed:.2f}s") # 
============================================================================= @@ -533,18 +413,10 @@ async def test_error_handling(): """ Test error handling for invalid workspace configurations. """ - # Purpose: Validate guardrails for workspace normalization and namespace - # derivation. Scope: set_default_workspace conversions and get_final_namespace - # failure paths when configuration is invalid. print("\n" + "=" * 60) print("TEST 7: Error Handling") print("=" * 60) - # Test 7.0: Missing default workspace should raise ValueError - print("\nTest 7.0: Missing workspace raises ValueError") - with pytest.raises(ValueError): - get_final_namespace("test_namespace", workspace=None) - # Test 7.1: set_default_workspace(None) converts to empty string print("\nTest 7.1: set_default_workspace(None) converts to empty string") @@ -554,10 +426,8 @@ async def test_error_handling(): # Should convert None to "" automatically assert default_ws == "", f"Expected empty string, got: '{default_ws}'" - print("✅ PASSED: Error Handling - None to Empty String") - print( - f" set_default_workspace(None) correctly converts to empty string: '{default_ws}'" - ) + print(f"✅ PASSED: Error Handling - None to Empty String") + print(f" set_default_workspace(None) correctly converts to empty string: '{default_ws}'") # Test 7.2: Empty string workspace behavior print("\nTest 7.2: Empty string workspace creates valid namespace") @@ -566,7 +436,7 @@ async def test_error_handling(): final_ns = get_final_namespace("test_namespace", workspace="") assert final_ns == "test_namespace", f"Unexpected namespace: '{final_ns}'" - print("✅ PASSED: Error Handling - Empty Workspace Namespace") + print(f"✅ PASSED: Error Handling - Empty Workspace Namespace") print(f" Empty workspace creates valid namespace: '{final_ns}'") # Restore default workspace for other tests @@ -583,9 +453,6 @@ async def test_update_flags_workspace_isolation(): """ Test that update flags are properly isolated between workspaces. """ - # Purpose: Confirm update flag setters/readers respect workspace scoping. - # Scope: set_all_update_flags, clear_all_update_flags, get_all_update_flags_status, - # and get_update_flag interactions across namespaces. 
print("\n" + "=" * 60) print("TEST 8: Update Flags Workspace Isolation") print("=" * 60) @@ -615,17 +482,11 @@ async def test_update_flags_workspace_isolation(): await set_all_update_flags(test_namespace, workspace=workspace1) # Check that only workspace1's flags are set - assert ( - flag1_obj.value is True - ), f"Flag1 should be True after set_all_update_flags, got {flag1_obj.value}" - assert ( - flag2_obj.value is False - ), f"Flag2 should still be False, got {flag2_obj.value}" + assert flag1_obj.value is True, f"Flag1 should be True after set_all_update_flags, got {flag1_obj.value}" + assert flag2_obj.value is False, f"Flag2 should still be False, got {flag2_obj.value}" - print("✅ PASSED: Update Flags - set_all_update_flags Isolation") - print( - f" set_all_update_flags isolated: ws1={flag1_obj.value}, ws2={flag2_obj.value}" - ) + print(f"✅ PASSED: Update Flags - set_all_update_flags Isolation") + print(f" set_all_update_flags isolated: ws1={flag1_obj.value}, ws2={flag2_obj.value}") # Test 8.2: clear_all_update_flags isolation print("\nTest 8.2: clear_all_update_flags workspace isolation") @@ -642,15 +503,11 @@ async def test_update_flags_workspace_isolation(): await clear_all_update_flags(test_namespace, workspace=workspace1) # Check that only workspace1's flags are cleared - assert ( - flag1_obj.value is False - ), f"Flag1 should be False after clear, got {flag1_obj.value}" + assert flag1_obj.value is False, f"Flag1 should be False after clear, got {flag1_obj.value}" assert flag2_obj.value is True, f"Flag2 should still be True, got {flag2_obj.value}" - print("✅ PASSED: Update Flags - clear_all_update_flags Isolation") - print( - f" clear_all_update_flags isolated: ws1={flag1_obj.value}, ws2={flag2_obj.value}" - ) + print(f"✅ PASSED: Update Flags - clear_all_update_flags Isolation") + print(f" clear_all_update_flags isolated: ws1={flag1_obj.value}, ws2={flag2_obj.value}") # Test 8.3: get_all_update_flags_status workspace filtering print("\nTest 8.3: get_all_update_flags_status workspace filtering") @@ -675,31 +532,11 @@ async def test_update_flags_workspace_isolation(): workspace1_keys = [k for k in status1.keys() if workspace1 in k] workspace2_keys = [k for k in status1.keys() if workspace2 in k] - assert ( - len(workspace1_keys) > 0 - ), f"workspace1 keys should be present, got {len(workspace1_keys)}" - assert ( - len(workspace2_keys) == 0 - ), f"workspace2 keys should not be present, got {len(workspace2_keys)}" - for key, values in status1.items(): - assert all(values), f"All flags in {key} should be True, got {values}" + assert len(workspace1_keys) > 0, f"workspace1 keys should be present, got {len(workspace1_keys)}" + assert len(workspace2_keys) == 0, f"workspace2 keys should not be present, got {len(workspace2_keys)}" - # Workspace2 query should only surface workspace2 namespaces - status2 = await get_all_update_flags_status(workspace=workspace2) - expected_ws2_keys = { - f"{workspace2}:{test_namespace}", - f"{workspace2}:ns_c", - } - assert ( - set(status2.keys()) == expected_ws2_keys - ), f"Unexpected namespaces for workspace2: {status2.keys()}" - for key, values in status2.items(): - assert all(values), f"All flags in {key} should be True, got {values}" - - print("✅ PASSED: Update Flags - get_all_update_flags_status Filtering") - print( - f" Status correctly filtered: ws1 keys={len(workspace1_keys)}, ws2 keys={len(workspace2_keys)}" - ) + print(f"✅ PASSED: Update Flags - get_all_update_flags_status Filtering") + print(f" Status correctly filtered: ws1 
keys={len(workspace1_keys)}, ws2 keys={len(workspace2_keys)}") # ============================================================================= @@ -712,9 +549,6 @@ async def test_empty_workspace_standardization(): """ Test that empty workspace is properly standardized to "" instead of "_". """ - # Purpose: Verify namespace formatting when workspace is an empty string. - # Scope: get_final_namespace output and initialize_pipeline_status behavior - # between empty and non-empty workspaces. print("\n" + "=" * 60) print("TEST 9: Empty Workspace Standardization") print("=" * 60) @@ -726,11 +560,9 @@ async def test_empty_workspace_standardization(): final_ns = get_final_namespace("test_namespace", workspace=None) # Should be just "test_namespace" without colon prefix - assert ( - final_ns == "test_namespace" - ), f"Unexpected namespace format: '{final_ns}' (expected 'test_namespace')" + assert final_ns == "test_namespace", f"Unexpected namespace format: '{final_ns}' (expected 'test_namespace')" - print("✅ PASSED: Empty Workspace Standardization - Format") + print(f"✅ PASSED: Empty Workspace Standardization - Format") print(f" Empty workspace creates correct namespace: '{final_ns}'") # Test 9.2: Empty workspace vs non-empty workspace behavior @@ -747,12 +579,10 @@ async def test_empty_workspace_standardization(): data_nonempty = await get_namespace_data("pipeline_status", workspace="test_ws") # They should be different objects - assert ( - data_empty is not data_nonempty - ), "Empty and non-empty workspaces share data (should be independent)" + assert data_empty is not data_nonempty, "Empty and non-empty workspaces share data (should be independent)" - print("✅ PASSED: Empty Workspace Standardization - Behavior") - print(" Empty and non-empty workspaces have independent data") + print(f"✅ PASSED: Empty Workspace Standardization - Behavior") + print(f" Empty and non-empty workspaces have independent data") # ============================================================================= @@ -767,9 +597,6 @@ async def test_json_kv_storage_workspace_isolation(): Creates two JsonKVStorage instances with different workspaces, writes different data, and verifies they don't mix. """ - # Purpose: Ensure JsonKVStorage respects workspace-specific directories and data. - # Scope: storage initialization, upsert/get_by_id operations, and filesystem layout - # inside the temporary working directory. 
print("\n" + "=" * 60) print("TEST 10: JsonKVStorage Workspace Isolation (Integration)") print("=" * 60) @@ -792,9 +619,7 @@ async def test_json_kv_storage_workspace_isolation(): } # Test 10.1: Create two JsonKVStorage instances with different workspaces - print( - "\nTest 10.1: Create two JsonKVStorage instances with different workspaces" - ) + print("\nTest 10.1: Create two JsonKVStorage instances with different workspaces") from lightrag.kg.json_kv_impl import JsonKVStorage @@ -816,41 +641,25 @@ async def test_json_kv_storage_workspace_isolation(): await storage1.initialize() await storage2.initialize() - print(" Storage1 created: workspace=workspace1, namespace=entities") - print(" Storage2 created: workspace=workspace2, namespace=entities") + print(f" Storage1 created: workspace=workspace1, namespace=entities") + print(f" Storage2 created: workspace=workspace2, namespace=entities") # Test 10.2: Write different data to each storage print("\nTest 10.2: Write different data to each storage") # Write to storage1 (upsert expects dict[str, dict]) - await storage1.upsert( - { - "entity1": { - "content": "Data from workspace1 - AI Research", - "type": "entity", - }, - "entity2": { - "content": "Data from workspace1 - Machine Learning", - "type": "entity", - }, - } - ) - print(" Written to storage1: entity1, entity2") + await storage1.upsert({ + "entity1": {"content": "Data from workspace1 - AI Research", "type": "entity"}, + "entity2": {"content": "Data from workspace1 - Machine Learning", "type": "entity"} + }) + print(f" Written to storage1: entity1, entity2") # Write to storage2 - await storage2.upsert( - { - "entity1": { - "content": "Data from workspace2 - Deep Learning", - "type": "entity", - }, - "entity2": { - "content": "Data from workspace2 - Neural Networks", - "type": "entity", - }, - } - ) - print(" Written to storage2: entity1, entity2") + await storage2.upsert({ + "entity1": {"content": "Data from workspace2 - Deep Learning", "type": "entity"}, + "entity2": {"content": "Data from workspace2 - Neural Networks", "type": "entity"} + }) + print(f" Written to storage2: entity1, entity2") # Test 10.3: Read data from each storage and verify isolation print("\nTest 10.3: Read data and verify isolation") @@ -873,29 +682,15 @@ async def test_json_kv_storage_workspace_isolation(): assert result1_entity2 is not None, "Storage1 entity2 should not be None" assert result2_entity1 is not None, "Storage2 entity1 should not be None" assert result2_entity2 is not None, "Storage2 entity2 should not be None" - assert ( - result1_entity1.get("content") == "Data from workspace1 - AI Research" - ), "Storage1 entity1 content mismatch" - assert ( - result1_entity2.get("content") == "Data from workspace1 - Machine Learning" - ), "Storage1 entity2 content mismatch" - assert ( - result2_entity1.get("content") == "Data from workspace2 - Deep Learning" - ), "Storage2 entity1 content mismatch" - assert ( - result2_entity2.get("content") == "Data from workspace2 - Neural Networks" - ), "Storage2 entity2 content mismatch" - assert result1_entity1.get("content") != result2_entity1.get( - "content" - ), "Storage1 and Storage2 entity1 should have different content" - assert result1_entity2.get("content") != result2_entity2.get( - "content" - ), "Storage1 and Storage2 entity2 should have different content" + assert result1_entity1.get("content") == "Data from workspace1 - AI Research", f"Storage1 entity1 content mismatch" + assert result1_entity2.get("content") == "Data from workspace1 - Machine Learning", 
f"Storage1 entity2 content mismatch" + assert result2_entity1.get("content") == "Data from workspace2 - Deep Learning", f"Storage2 entity1 content mismatch" + assert result2_entity2.get("content") == "Data from workspace2 - Neural Networks", f"Storage2 entity2 content mismatch" + assert result1_entity1.get("content") != result2_entity1.get("content"), "Storage1 and Storage2 entity1 should have different content" + assert result1_entity2.get("content") != result2_entity2.get("content"), "Storage1 and Storage2 entity2 should have different content" - print("✅ PASSED: JsonKVStorage - Data Isolation") - print( - " Two storage instances correctly isolated: ws1 and ws2 have different data" - ) + print(f"✅ PASSED: JsonKVStorage - Data Isolation") + print(f" Two storage instances correctly isolated: ws1 and ws2 have different data") # Test 10.4: Verify file structure print("\nTest 10.4: Verify file structure") @@ -911,16 +706,14 @@ async def test_json_kv_storage_workspace_isolation(): assert ws1_exists, "workspace1 directory should exist" assert ws2_exists, "workspace2 directory should exist" - print("✅ PASSED: JsonKVStorage - File Structure") + print(f"✅ PASSED: JsonKVStorage - File Structure") print(f" Workspace directories correctly created: {ws1_dir} and {ws2_dir}") finally: - # Cleanup test directory (unless KEEP_TEST_ARTIFACTS is set) - if os.path.exists(test_dir) and not KEEP_TEST_ARTIFACTS: + # Cleanup test directory + if os.path.exists(test_dir): shutil.rmtree(test_dir) print(f"\n Cleaned up test directory: {test_dir}") - elif KEEP_TEST_ARTIFACTS: - print(f"\n Kept test directory for inspection: {test_dir}") # ============================================================================= @@ -935,42 +728,26 @@ async def test_lightrag_end_to_end_workspace_isolation(): insert different data, and verify file separation. Uses mock LLM and embedding functions to avoid external API calls. """ - # Purpose: Validate that full LightRAG flows keep artifacts scoped per workspace. - # Scope: LightRAG.initialize_storages + ainsert side effects plus filesystem - # verification for generated storage files. print("\n" + "=" * 60) print("TEST 11: LightRAG End-to-End Workspace Isolation") print("=" * 60) # Create temporary test directory - # test_dir = tempfile.mkdtemp(prefix="lightrag_test_e2e_") - test_dir = str(Path(__file__).parent.parent / "temp/e2e_workspace_isolation") - if os.path.exists(test_dir): - shutil.rmtree(test_dir) - os.makedirs(test_dir, exist_ok=True) + test_dir = tempfile.mkdtemp(prefix="lightrag_test_e2e_") print(f"\n Using test directory: {test_dir}") try: - # Factory function to create different mock LLM functions for each workspace - def create_mock_llm_func(workspace_name): - """Create a mock LLM function that returns different content based on workspace""" - async def mock_llm_func( - prompt, system_prompt=None, history_messages=[], **kwargs - ) -> str: - # Return different responses based on workspace - # Format: entity<|#|>entity_name<|#|>entity_type<|#|>entity_description - # Format: relation<|#|>source_entity<|#|>target_entity<|#|>keywords<|#|>description - if workspace_name == "project_a": - return """entity<|#|>Artificial Intelligence<|#|>concept<|#|>AI is a field of computer science focused on creating intelligent machines. 
+ # Mock LLM function + async def mock_llm_func( + prompt, system_prompt=None, history_messages=[], **kwargs + ) -> str: + # Return a mock response that simulates entity extraction in the correct format + # Format: entity<|#|>entity_name<|#|>entity_type<|#|>entity_description + # Format: relation<|#|>source_entity<|#|>target_entity<|#|>keywords<|#|>description + return """entity<|#|>Artificial Intelligence<|#|>concept<|#|>AI is a field of computer science focused on creating intelligent machines. entity<|#|>Machine Learning<|#|>concept<|#|>Machine Learning is a subset of AI that enables systems to learn from data. relation<|#|>Machine Learning<|#|>Artificial Intelligence<|#|>subset, related field<|#|>Machine Learning is a key component and subset of Artificial Intelligence. <|COMPLETE|>""" - else: # project_b - return """entity<|#|>Deep Learning<|#|>concept<|#|>Deep Learning is a subset of machine learning using neural networks with multiple layers. -entity<|#|>Neural Networks<|#|>concept<|#|>Neural Networks are computing systems inspired by biological neural networks. -relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Learning uses multiple layers of Neural Networks to learn representations. -<|COMPLETE|>""" - return mock_llm_func # Mock embedding function async def mock_embedding_func(texts: list[str]) -> np.ndarray: @@ -980,51 +757,36 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le print("\nTest 11.1: Create two LightRAG instances with different workspaces") from lightrag import LightRAG - from lightrag.utils import EmbeddingFunc, Tokenizer - - # Create different mock LLM functions for each workspace - mock_llm_func_a = create_mock_llm_func("project_a") - mock_llm_func_b = create_mock_llm_func("project_b") - - class _SimpleTokenizerImpl: - def encode(self, content: str) -> list[int]: - return [ord(ch) for ch in content] - - def decode(self, tokens: list[int]) -> str: - return "".join(chr(t) for t in tokens) - - tokenizer = Tokenizer("mock-tokenizer", _SimpleTokenizerImpl()) + from lightrag.utils import EmbeddingFunc rag1 = LightRAG( working_dir=test_dir, workspace="project_a", - llm_model_func=mock_llm_func_a, + llm_model_func=mock_llm_func, embedding_func=EmbeddingFunc( embedding_dim=384, max_token_size=8192, func=mock_embedding_func, ), - tokenizer=tokenizer, ) rag2 = LightRAG( working_dir=test_dir, workspace="project_b", - llm_model_func=mock_llm_func_b, + llm_model_func=mock_llm_func, embedding_func=EmbeddingFunc( embedding_dim=384, max_token_size=8192, func=mock_embedding_func, ), - tokenizer=tokenizer, ) # Initialize storages await rag1.initialize_storages() await rag2.initialize_storages() - print(" RAG1 created: workspace=project_a") - print(" RAG2 created: workspace=project_b") + print(f" RAG1 created: workspace=project_a") + print(f" RAG2 created: workspace=project_b") # Test 11.2: Insert different data to each RAG instance print("\nTest 11.2: Insert different data to each RAG instance") @@ -1058,20 +820,20 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le assert project_b_exists, "project_b directory should exist" # List files in each directory - print("\n Files in project_a/:") + print(f"\n Files in project_a/:") for file in sorted(project_a_dir.glob("*")): if file.is_file(): size = file.stat().st_size print(f" - {file.name} ({size} bytes)") - print("\n Files in project_b/:") + print(f"\n Files in project_b/:") for file in sorted(project_b_dir.glob("*")): if file.is_file(): size 
= file.stat().st_size print(f" - {file.name} ({size} bytes)") - print("✅ PASSED: LightRAG E2E - File Structure") - print(" Workspace directories correctly created and separated") + print(f"✅ PASSED: LightRAG E2E - File Structure") + print(f" Workspace directories correctly created and separated") # Test 11.4: Verify data isolation by checking file contents print("\nTest 11.4: Verify data isolation (check file contents)") @@ -1093,55 +855,19 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le print(f" project_b doc count: {len(docs_b_content)}") # Verify they contain different data - assert ( - docs_a_content != docs_b_content - ), "Document storage not properly isolated" + assert docs_a_content != docs_b_content, "Document storage not properly isolated" - # Verify each workspace contains its own text content - docs_a_str = json.dumps(docs_a_content) - docs_b_str = json.dumps(docs_b_content) - - # Check project_a contains its text and NOT project_b's text - assert ( - "Artificial Intelligence" in docs_a_str - ), "project_a should contain 'Artificial Intelligence'" - assert ( - "Machine Learning" in docs_a_str - ), "project_a should contain 'Machine Learning'" - assert ( - "Deep Learning" not in docs_a_str - ), "project_a should NOT contain 'Deep Learning' from project_b" - assert ( - "Neural Networks" not in docs_a_str - ), "project_a should NOT contain 'Neural Networks' from project_b" - - # Check project_b contains its text and NOT project_a's text - assert ( - "Deep Learning" in docs_b_str - ), "project_b should contain 'Deep Learning'" - assert ( - "Neural Networks" in docs_b_str - ), "project_b should contain 'Neural Networks'" - assert ( - "Artificial Intelligence" not in docs_b_str - ), "project_b should NOT contain 'Artificial Intelligence' from project_a" - # Note: "Machine Learning" might appear in project_b's text, so we skip that check - - print("✅ PASSED: LightRAG E2E - Data Isolation") - print(" Document storage correctly isolated between workspaces") - print(" project_a contains only its own data") - print(" project_b contains only its own data") + print(f"✅ PASSED: LightRAG E2E - Data Isolation") + print(f" Document storage correctly isolated between workspaces") else: - print(" Document storage files not found (may not be created yet)") - print("✅ PASSED: LightRAG E2E - Data Isolation") - print(" Skipped file content check (files not created)") + print(f" Document storage files not found (may not be created yet)") + print(f"✅ PASSED: LightRAG E2E - Data Isolation") + print(f" Skipped file content check (files not created)") - print("\n ✓ Test complete - workspace isolation verified at E2E level") + print(f"\n ✓ Test complete - workspace isolation verified at E2E level") finally: - # Cleanup test directory (unless KEEP_TEST_ARTIFACTS is set) - if os.path.exists(test_dir) and not KEEP_TEST_ARTIFACTS: + # Cleanup test directory + if os.path.exists(test_dir): shutil.rmtree(test_dir) print(f"\n Cleaned up test directory: {test_dir}") - elif KEEP_TEST_ARTIFACTS: - print(f"\n Kept test directory for inspection: {test_dir}")