diff --git a/tests/test_workspace_isolation.py b/tests/test_workspace_isolation.py index 7aa4ae09..4a6f284b 100644 --- a/tests/test_workspace_isolation.py +++ b/tests/test_workspace_isolation.py @@ -26,14 +26,12 @@ import tempfile import numpy as np import pytest from pathlib import Path -from typing import List, Tuple, Dict from lightrag.kg.shared_storage import ( get_final_namespace, get_namespace_lock, get_default_workspace, set_default_workspace, initialize_share_data, - finalize_share_data, initialize_pipeline_status, get_namespace_data, set_all_update_flags, @@ -43,16 +41,6 @@ from lightrag.kg.shared_storage import ( ) -# ============================================================================= -# Test Configuration -# ============================================================================= - -# Stress test configuration (enable via environment variable) -STRESS_TEST_MODE = os.getenv("LIGHTRAG_STRESS_TEST", "false").lower() == "true" -PARALLEL_WORKERS = int(os.getenv("LIGHTRAG_TEST_WORKERS", "3")) -KEEP_TEST_ARTIFACTS = os.getenv("LIGHTRAG_KEEP_ARTIFACTS", "false").lower() == "true" - - # ============================================================================= # Pytest Fixtures # ============================================================================= @@ -63,85 +51,7 @@ def setup_shared_data(): """Initialize shared data before each test""" initialize_share_data() yield - finalize_share_data() - - -async def _measure_lock_parallelism( - workload: List[Tuple[str, str, str]], hold_time: float = 0.05 -) -> Tuple[int, List[Tuple[str, str]], Dict[str, float]]: - """Run lock acquisition workload and capture peak concurrency and timeline. - - Args: - workload: List of (name, workspace, namespace) tuples - hold_time: How long each worker holds the lock (seconds) - - Returns: - Tuple of (max_parallel, timeline, metrics) where: - - max_parallel: Peak number of concurrent lock holders - - timeline: List of (name, event) tuples tracking execution order - - metrics: Dict with performance metrics (total_duration, max_concurrency, etc.) - """ - - running = 0 - max_parallel = 0 - timeline: List[Tuple[str, str]] = [] - start_time = time.time() - - async def worker(name: str, workspace: str, namespace: str) -> None: - nonlocal running, max_parallel - lock = get_namespace_lock(namespace, workspace) - async with lock: - running += 1 - max_parallel = max(max_parallel, running) - timeline.append((name, "start")) - await asyncio.sleep(hold_time) - timeline.append((name, "end")) - running -= 1 - - await asyncio.gather(*(worker(*args) for args in workload)) - - metrics = { - "total_duration": time.time() - start_time, - "max_concurrency": max_parallel, - "avg_hold_time": hold_time, - "num_workers": len(workload), - } - - return max_parallel, timeline, metrics - - -def _assert_no_timeline_overlap(timeline: List[Tuple[str, str]]) -> None: - """Ensure that timeline events never overlap for sequential execution. - - This function implements a finite state machine that validates: - - No overlapping lock acquisitions (only one task active at a time) - - Proper lock release order (task releases its own lock) - - All locks are properly released - - Args: - timeline: List of (name, event) tuples where event is "start" or "end" - - Raises: - AssertionError: If timeline shows overlapping execution or improper locking - """ - - active_task = None - for name, event in timeline: - if event == "start": - if active_task is not None: - raise AssertionError( - f"Task '{name}' started before '{active_task}' released the lock" - ) - active_task = name - else: - if active_task != name: - raise AssertionError( - f"Task '{name}' finished while '{active_task}' was expected to hold the lock" - ) - active_task = None - - if active_task is not None: - raise AssertionError(f"Task '{active_task}' did not release the lock properly") + # Cleanup after test if needed # ============================================================================= @@ -154,8 +64,6 @@ async def test_pipeline_status_isolation(): """ Test that pipeline status is isolated between different workspaces. """ - # Purpose: Ensure pipeline_status shared data remains unique per workspace. - # Scope: initialize_pipeline_status and get_namespace_data interactions. print("\n" + "=" * 60) print("TEST 1: Pipeline Status Isolation") print("=" * 60) @@ -210,9 +118,6 @@ async def test_lock_mechanism(): Tests both parallel execution for different workspaces and serialization for the same workspace. """ - # Purpose: Validate that keyed locks isolate workspaces while serializing - # requests within the same workspace. Scope: get_namespace_lock scheduling - # semantics for both cross-workspace and single-workspace cases. print("\n" + "=" * 60) print("TEST 2: Lock Mechanism (No Deadlocks)") print("=" * 60) @@ -220,51 +125,45 @@ async def test_lock_mechanism(): # Test 2.1: Different workspaces should run in parallel print("\nTest 2.1: Different workspaces locks should be parallel") - # Support stress testing with configurable number of workers - num_workers = PARALLEL_WORKERS if STRESS_TEST_MODE else 3 - parallel_workload = [ - (f"ws_{chr(97+i)}", f"ws_{chr(97+i)}", "test_namespace") - for i in range(num_workers) - ] + async def acquire_lock_timed(workspace, namespace, hold_time): + """Acquire a lock and hold it for specified time""" + lock = get_namespace_lock(namespace, workspace) + start = time.time() + async with lock: + print(f" [{workspace}] acquired lock at {time.time() - start:.2f}s") + await asyncio.sleep(hold_time) + print(f" [{workspace}] releasing lock at {time.time() - start:.2f}s") - max_parallel, timeline_parallel, metrics = await _measure_lock_parallelism( - parallel_workload - ) - assert max_parallel >= 2, ( - "Locks for distinct workspaces should overlap; " - f"observed max concurrency: {max_parallel}, timeline={timeline_parallel}" + start = time.time() + await asyncio.gather( + acquire_lock_timed("ws_a", "test_namespace", 0.5), + acquire_lock_timed("ws_b", "test_namespace", 0.5), + acquire_lock_timed("ws_c", "test_namespace", 0.5), ) + elapsed = time.time() - start + + # If locks are properly isolated by workspace, this should take ~0.5s (parallel) + # If they block each other, it would take ~1.5s (serial) + assert elapsed < 1.0, f"Locks blocked each other: {elapsed:.2f}s (expected < 1.0s)" print("✅ PASSED: Lock Mechanism - Parallel (Different Workspaces)") - print( - f" Locks overlapped for different workspaces (max concurrency={max_parallel})" - ) - print( - f" Performance: {metrics['total_duration']:.3f}s for {metrics['num_workers']} workers" - ) + print(f" Locks ran in parallel: {elapsed:.2f}s") # Test 2.2: Same workspace should serialize print("\nTest 2.2: Same workspace locks should serialize") - serial_workload = [ - ("serial_run_1", "ws_same", "test_namespace"), - ("serial_run_2", "ws_same", "test_namespace"), - ] - ( - max_parallel_serial, - timeline_serial, - metrics_serial, - ) = await _measure_lock_parallelism(serial_workload) - assert max_parallel_serial == 1, ( - "Same workspace locks should not overlap; " - f"observed {max_parallel_serial} with timeline {timeline_serial}" + + start = time.time() + await asyncio.gather( + acquire_lock_timed("ws_same", "test_namespace", 0.3), + acquire_lock_timed("ws_same", "test_namespace", 0.3), ) - _assert_no_timeline_overlap(timeline_serial) + elapsed = time.time() - start + + # Same workspace should serialize, taking ~0.6s + assert elapsed >= 0.5, f"Locks didn't serialize: {elapsed:.2f}s (expected >= 0.5s)" print("✅ PASSED: Lock Mechanism - Serial (Same Workspace)") - print(" Same workspace operations executed sequentially with no overlap") - print( - f" Performance: {metrics_serial['total_duration']:.3f}s for {metrics_serial['num_workers']} tasks" - ) + print(f" Locks serialized correctly: {elapsed:.2f}s") # ============================================================================= @@ -277,9 +176,6 @@ async def test_backward_compatibility(): """ Test that legacy code without workspace parameter still works correctly. """ - # Purpose: Validate backward-compatible defaults when workspace arguments - # are omitted. Scope: get_final_namespace, set/get_default_workspace and - # initialize_pipeline_status fallback behavior. print("\n" + "=" * 60) print("TEST 3: Backward Compatibility") print("=" * 60) @@ -351,9 +247,6 @@ async def test_multi_workspace_concurrency(): Test that multiple workspaces can operate concurrently without interference. Simulates concurrent operations on different workspaces. """ - # Purpose: Simulate concurrent workloads touching pipeline_status across - # workspaces. Scope: initialize_pipeline_status, get_namespace_lock, and - # shared dictionary mutation while ensuring isolation. print("\n" + "=" * 60) print("TEST 4: Multi-Workspace Concurrency") print("=" * 60) @@ -434,9 +327,6 @@ async def test_namespace_lock_reentrance(): Test that NamespaceLock prevents re-entrance in the same coroutine and allows concurrent use in different coroutines. """ - # Purpose: Ensure NamespaceLock enforces single entry per coroutine while - # allowing concurrent reuse through ContextVar isolation. Scope: lock - # re-entrance checks and concurrent gather semantics. print("\n" + "=" * 60) print("TEST 5: NamespaceLock Re-entrance Protection") print("=" * 60) @@ -506,33 +396,37 @@ async def test_different_namespace_lock_isolation(): """ Test that locks for different namespaces (same workspace) are independent. """ - # Purpose: Confirm that namespace isolation is enforced even when workspace - # is the same. Scope: get_namespace_lock behavior when namespaces differ. print("\n" + "=" * 60) print("TEST 6: Different Namespace Lock Isolation") print("=" * 60) print("\nTesting locks with same workspace but different namespaces") - workload = [ - ("ns_a", "same_ws", "namespace_a"), - ("ns_b", "same_ws", "namespace_b"), - ("ns_c", "same_ws", "namespace_c"), - ] - max_parallel, timeline, metrics = await _measure_lock_parallelism(workload) + async def acquire_lock_timed(workspace, namespace, hold_time, name): + """Acquire a lock and hold it for specified time""" + lock = get_namespace_lock(namespace, workspace) + start = time.time() + async with lock: + print(f" [{name}] acquired lock at {time.time() - start:.2f}s") + await asyncio.sleep(hold_time) + print(f" [{name}] releasing lock at {time.time() - start:.2f}s") - assert max_parallel >= 2, ( - "Different namespaces within the same workspace should run concurrently; " - f"observed max concurrency {max_parallel} with timeline {timeline}" + # These should run in parallel (different namespaces) + start = time.time() + await asyncio.gather( + acquire_lock_timed("same_ws", "namespace_a", 0.5, "ns_a"), + acquire_lock_timed("same_ws", "namespace_b", 0.5, "ns_b"), + acquire_lock_timed("same_ws", "namespace_c", 0.5, "ns_c"), ) + elapsed = time.time() - start + + # If locks are properly isolated by namespace, this should take ~0.5s (parallel) + assert ( + elapsed < 1.0 + ), f"Different namespace locks blocked each other: {elapsed:.2f}s (expected < 1.0s)" print("✅ PASSED: Different Namespace Lock Isolation") - print( - f" Different namespace locks ran in parallel (max concurrency={max_parallel})" - ) - print( - f" Performance: {metrics['total_duration']:.3f}s for {metrics['num_workers']} namespaces" - ) + print(f" Different namespace locks ran in parallel: {elapsed:.2f}s") # ============================================================================= @@ -545,18 +439,10 @@ async def test_error_handling(): """ Test error handling for invalid workspace configurations. """ - # Purpose: Validate guardrails for workspace normalization and namespace - # derivation. Scope: set_default_workspace conversions and get_final_namespace - # failure paths when configuration is invalid. print("\n" + "=" * 60) print("TEST 7: Error Handling") print("=" * 60) - # Test 7.0: Missing default workspace should raise ValueError - print("\nTest 7.0: Missing workspace raises ValueError") - with pytest.raises(ValueError): - get_final_namespace("test_namespace", workspace=None) - # Test 7.1: set_default_workspace(None) converts to empty string print("\nTest 7.1: set_default_workspace(None) converts to empty string") @@ -595,9 +481,6 @@ async def test_update_flags_workspace_isolation(): """ Test that update flags are properly isolated between workspaces. """ - # Purpose: Confirm update flag setters/readers respect workspace scoping. - # Scope: set_all_update_flags, clear_all_update_flags, get_all_update_flags_status, - # and get_update_flag interactions across namespaces. print("\n" + "=" * 60) print("TEST 8: Update Flags Workspace Isolation") print("=" * 60) @@ -693,20 +576,6 @@ async def test_update_flags_workspace_isolation(): assert ( len(workspace2_keys) == 0 ), f"workspace2 keys should not be present, got {len(workspace2_keys)}" - for key, values in status1.items(): - assert all(values), f"All flags in {key} should be True, got {values}" - - # Workspace2 query should only surface workspace2 namespaces - status2 = await get_all_update_flags_status(workspace=workspace2) - expected_ws2_keys = { - f"{workspace2}:{test_namespace}", - f"{workspace2}:ns_c", - } - assert ( - set(status2.keys()) == expected_ws2_keys - ), f"Unexpected namespaces for workspace2: {status2.keys()}" - for key, values in status2.items(): - assert all(values), f"All flags in {key} should be True, got {values}" print("✅ PASSED: Update Flags - get_all_update_flags_status Filtering") print( @@ -724,9 +593,6 @@ async def test_empty_workspace_standardization(): """ Test that empty workspace is properly standardized to "" instead of "_". """ - # Purpose: Verify namespace formatting when workspace is an empty string. - # Scope: get_final_namespace output and initialize_pipeline_status behavior - # between empty and non-empty workspaces. print("\n" + "=" * 60) print("TEST 9: Empty Workspace Standardization") print("=" * 60) @@ -779,9 +645,6 @@ async def test_json_kv_storage_workspace_isolation(): Creates two JsonKVStorage instances with different workspaces, writes different data, and verifies they don't mix. """ - # Purpose: Ensure JsonKVStorage respects workspace-specific directories and data. - # Scope: storage initialization, upsert/get_by_id operations, and filesystem layout - # inside the temporary working directory. print("\n" + "=" * 60) print("TEST 10: JsonKVStorage Workspace Isolation (Integration)") print("=" * 60) @@ -927,12 +790,10 @@ async def test_json_kv_storage_workspace_isolation(): print(f" Workspace directories correctly created: {ws1_dir} and {ws2_dir}") finally: - # Cleanup test directory (unless KEEP_TEST_ARTIFACTS is set) - if os.path.exists(test_dir) and not KEEP_TEST_ARTIFACTS: + # Cleanup test directory + if os.path.exists(test_dir): shutil.rmtree(test_dir) print(f"\n Cleaned up test directory: {test_dir}") - elif KEEP_TEST_ARTIFACTS: - print(f"\n Kept test directory for inspection: {test_dir}") # ============================================================================= @@ -947,9 +808,6 @@ async def test_lightrag_end_to_end_workspace_isolation(): insert different data, and verify file separation. Uses mock LLM and embedding functions to avoid external API calls. """ - # Purpose: Validate that full LightRAG flows keep artifacts scoped per workspace. - # Scope: LightRAG.initialize_storages + ainsert side effects plus filesystem - # verification for generated storage files. print("\n" + "=" * 60) print("TEST 11: LightRAG End-to-End Workspace Isolation") print("=" * 60) @@ -966,13 +824,9 @@ async def test_lightrag_end_to_end_workspace_isolation(): # Factory function to create different mock LLM functions for each workspace def create_mock_llm_func(workspace_name): """Create a mock LLM function that returns different content based on workspace""" - async def mock_llm_func( prompt, system_prompt=None, history_messages=[], **kwargs ) -> str: - # Add coroutine switching to simulate async I/O and allow concurrent execution - await asyncio.sleep(0) - # Return different responses based on workspace # Format: entity<|#|>entity_name<|#|>entity_type<|#|>entity_description # Format: relation<|#|>source_entity<|#|>target_entity<|#|>keywords<|#|>description @@ -986,34 +840,22 @@ relation<|#|>Machine Learning<|#|>Artificial Intelligence<|#|>subset, related fi entity<|#|>Neural Networks<|#|>concept<|#|>Neural Networks are computing systems inspired by biological neural networks. relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Learning uses multiple layers of Neural Networks to learn representations. <|COMPLETE|>""" - return mock_llm_func # Mock embedding function async def mock_embedding_func(texts: list[str]) -> np.ndarray: - # Add coroutine switching to simulate async I/O and allow concurrent execution - await asyncio.sleep(0) return np.random.rand(len(texts), 384) # 384-dimensional vectors # Test 11.1: Create two LightRAG instances with different workspaces print("\nTest 11.1: Create two LightRAG instances with different workspaces") from lightrag import LightRAG - from lightrag.utils import EmbeddingFunc, Tokenizer + from lightrag.utils import EmbeddingFunc # Create different mock LLM functions for each workspace mock_llm_func_a = create_mock_llm_func("project_a") mock_llm_func_b = create_mock_llm_func("project_b") - class _SimpleTokenizerImpl: - def encode(self, content: str) -> list[int]: - return [ord(ch) for ch in content] - - def decode(self, tokens: list[int]) -> str: - return "".join(chr(t) for t in tokens) - - tokenizer = Tokenizer("mock-tokenizer", _SimpleTokenizerImpl()) - rag1 = LightRAG( working_dir=test_dir, workspace="project_a", @@ -1023,7 +865,6 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le max_token_size=8192, func=mock_embedding_func, ), - tokenizer=tokenizer, ) rag2 = LightRAG( @@ -1035,7 +876,6 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le max_token_size=8192, func=mock_embedding_func, ), - tokenizer=tokenizer, ) # Initialize storages @@ -1045,24 +885,19 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le print(" RAG1 created: workspace=project_a") print(" RAG2 created: workspace=project_b") - # Test 11.2: Insert different data to each RAG instance (CONCURRENTLY) - print("\nTest 11.2: Insert different data to each RAG instance (concurrently)") + # Test 11.2: Insert different data to each RAG instance + print("\nTest 11.2: Insert different data to each RAG instance") text_for_project_a = "This document is about Artificial Intelligence and Machine Learning. AI is transforming the world." text_for_project_b = "This document is about Deep Learning and Neural Networks. Deep learning uses multiple layers." - # Insert to both projects concurrently to test workspace isolation under concurrent load - print(" Starting concurrent insert operations...") - start_time = time.time() - await asyncio.gather( - rag1.ainsert(text_for_project_a), - rag2.ainsert(text_for_project_b) - ) - elapsed_time = time.time() - start_time - - print(f" Inserted to project_a: {len(text_for_project_a)} chars (concurrent)") - print(f" Inserted to project_b: {len(text_for_project_b)} chars (concurrent)") - print(f" Total concurrent execution time: {elapsed_time:.3f}s") + # Insert to project_a + await rag1.ainsert(text_for_project_a) + print(f" Inserted to project_a: {len(text_for_project_a)} chars") + + # Insert to project_b + await rag2.ainsert(text_for_project_b) + print(f" Inserted to project_b: {len(text_for_project_b)} chars") # Test 11.3: Verify file structure print("\nTest 11.3: Verify workspace directory structure") @@ -1163,9 +998,9 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le print("\n ✓ Test complete - workspace isolation verified at E2E level") finally: - # Cleanup test directory (unless KEEP_TEST_ARTIFACTS is set) - if os.path.exists(test_dir) and not KEEP_TEST_ARTIFACTS: - shutil.rmtree(test_dir) - print(f"\n Cleaned up test directory: {test_dir}") - elif KEEP_TEST_ARTIFACTS: - print(f"\n Kept test directory for inspection: {test_dir}") + # Cleanup test directory + # if os.path.exists(test_dir): + # shutil.rmtree(test_dir) + # print(f"\n Cleaned up test directory: {test_dir}") + print("Keep test directory for manual inspection:") + print(f" {test_dir}")