From 60520e0188663c2956a094a045f68646f916b000 Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 18 Nov 2025 10:17:34 +0800 Subject: [PATCH] test: add concurrent execution to workspace isolation test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Add async sleep to mock functions • Test concurrent ainsert operations • Use asyncio.gather for parallel exec • Measure concurrent execution time (cherry picked from commit 6ae0c14438042f0bae7eaefb1ea738148be3a97c) --- tests/test_workspace_isolation.py | 72 +++++++++++-------------------- 1 file changed, 25 insertions(+), 47 deletions(-) diff --git a/tests/test_workspace_isolation.py b/tests/test_workspace_isolation.py index 68f7f8ec..7aa4ae09 100644 --- a/tests/test_workspace_isolation.py +++ b/tests/test_workspace_isolation.py @@ -22,6 +22,7 @@ import asyncio import time import os import shutil +import tempfile import numpy as np import pytest from pathlib import Path @@ -46,10 +47,10 @@ from lightrag.kg.shared_storage import ( # Test Configuration # ============================================================================= -# Test configuration is handled via pytest fixtures in conftest.py -# - Use CLI options: --keep-artifacts, --stress-test, --test-workers=N -# - Or environment variables: LIGHTRAG_KEEP_ARTIFACTS, LIGHTRAG_STRESS_TEST, LIGHTRAG_TEST_WORKERS -# Priority: CLI options > Environment variables > Default values +# Stress test configuration (enable via environment variable) +STRESS_TEST_MODE = os.getenv("LIGHTRAG_STRESS_TEST", "false").lower() == "true" +PARALLEL_WORKERS = int(os.getenv("LIGHTRAG_TEST_WORKERS", "3")) +KEEP_TEST_ARTIFACTS = os.getenv("LIGHTRAG_KEEP_ARTIFACTS", "false").lower() == "true" # ============================================================================= @@ -148,7 +149,6 @@ def _assert_no_timeline_overlap(timeline: List[Tuple[str, str]]) -> None: # ============================================================================= -@pytest.mark.offline @pytest.mark.asyncio async def test_pipeline_status_isolation(): """ @@ -203,9 +203,8 @@ async def test_pipeline_status_isolation(): # ============================================================================= -@pytest.mark.offline @pytest.mark.asyncio -async def test_lock_mechanism(stress_test_mode, parallel_workers): +async def test_lock_mechanism(): """ Test that the new keyed lock mechanism works correctly without deadlocks. Tests both parallel execution for different workspaces and serialization @@ -222,7 +221,7 @@ async def test_lock_mechanism(stress_test_mode, parallel_workers): print("\nTest 2.1: Different workspaces locks should be parallel") # Support stress testing with configurable number of workers - num_workers = parallel_workers if stress_test_mode else 3 + num_workers = PARALLEL_WORKERS if STRESS_TEST_MODE else 3 parallel_workload = [ (f"ws_{chr(97+i)}", f"ws_{chr(97+i)}", "test_namespace") for i in range(num_workers) @@ -273,7 +272,6 @@ async def test_lock_mechanism(stress_test_mode, parallel_workers): # ============================================================================= -@pytest.mark.offline @pytest.mark.asyncio async def test_backward_compatibility(): """ @@ -290,7 +288,7 @@ async def test_backward_compatibility(): print("\nTest 3.1: get_final_namespace with workspace=None") set_default_workspace("my_default_workspace") - final_ns = get_final_namespace("pipeline_status") + final_ns = get_final_namespace("pipeline_status", workspace=None) expected = "my_default_workspace:pipeline_status" assert final_ns == expected, f"Expected {expected}, got {final_ns}" @@ -347,7 +345,6 @@ async def test_backward_compatibility(): # ============================================================================= -@pytest.mark.offline @pytest.mark.asyncio async def test_multi_workspace_concurrency(): """ @@ -431,7 +428,6 @@ async def test_multi_workspace_concurrency(): # ============================================================================= -@pytest.mark.offline @pytest.mark.asyncio async def test_namespace_lock_reentrance(): """ @@ -505,7 +501,6 @@ async def test_namespace_lock_reentrance(): # ============================================================================= -@pytest.mark.offline @pytest.mark.asyncio async def test_different_namespace_lock_isolation(): """ @@ -545,7 +540,6 @@ async def test_different_namespace_lock_isolation(): # ============================================================================= -@pytest.mark.offline @pytest.mark.asyncio async def test_error_handling(): """ @@ -596,7 +590,6 @@ async def test_error_handling(): # ============================================================================= -@pytest.mark.offline @pytest.mark.asyncio async def test_update_flags_workspace_isolation(): """ @@ -726,7 +719,6 @@ async def test_update_flags_workspace_isolation(): # ============================================================================= -@pytest.mark.offline @pytest.mark.asyncio async def test_empty_workspace_standardization(): """ @@ -780,9 +772,8 @@ async def test_empty_workspace_standardization(): # ============================================================================= -@pytest.mark.offline @pytest.mark.asyncio -async def test_json_kv_storage_workspace_isolation(keep_test_artifacts): +async def test_json_kv_storage_workspace_isolation(): """ Integration test: Verify JsonKVStorage properly isolates data between workspaces. Creates two JsonKVStorage instances with different workspaces, writes different data, @@ -795,13 +786,8 @@ async def test_json_kv_storage_workspace_isolation(keep_test_artifacts): print("TEST 10: JsonKVStorage Workspace Isolation (Integration)") print("=" * 60) - # Create temporary test directory under project temp/ - test_dir = str( - Path(__file__).parent.parent / "temp/test_json_kv_storage_workspace_isolation" - ) - if os.path.exists(test_dir): - shutil.rmtree(test_dir) - os.makedirs(test_dir, exist_ok=True) + # Create temporary test directory + test_dir = tempfile.mkdtemp(prefix="lightrag_test_kv_") print(f"\n Using test directory: {test_dir}") try: @@ -862,9 +848,6 @@ async def test_json_kv_storage_workspace_isolation(keep_test_artifacts): } ) print(" Written to storage1: entity1, entity2") - # Persist data to disk - await storage1.index_done_callback() - print(" Persisted storage1 data to disk") # Write to storage2 await storage2.upsert( @@ -880,9 +863,6 @@ async def test_json_kv_storage_workspace_isolation(keep_test_artifacts): } ) print(" Written to storage2: entity1, entity2") - # Persist data to disk - await storage2.index_done_callback() - print(" Persisted storage2 data to disk") # Test 10.3: Read data from each storage and verify isolation print("\nTest 10.3: Read data and verify isolation") @@ -947,11 +927,11 @@ async def test_json_kv_storage_workspace_isolation(keep_test_artifacts): print(f" Workspace directories correctly created: {ws1_dir} and {ws2_dir}") finally: - # Cleanup test directory (unless keep_test_artifacts is set) - if os.path.exists(test_dir) and not keep_test_artifacts: + # Cleanup test directory (unless KEEP_TEST_ARTIFACTS is set) + if os.path.exists(test_dir) and not KEEP_TEST_ARTIFACTS: shutil.rmtree(test_dir) print(f"\n Cleaned up test directory: {test_dir}") - elif keep_test_artifacts: + elif KEEP_TEST_ARTIFACTS: print(f"\n Kept test directory for inspection: {test_dir}") @@ -960,9 +940,8 @@ async def test_json_kv_storage_workspace_isolation(keep_test_artifacts): # ============================================================================= -@pytest.mark.offline @pytest.mark.asyncio -async def test_lightrag_end_to_end_workspace_isolation(keep_test_artifacts): +async def test_lightrag_end_to_end_workspace_isolation(): """ End-to-end test: Create two LightRAG instances with different workspaces, insert different data, and verify file separation. @@ -975,11 +954,9 @@ async def test_lightrag_end_to_end_workspace_isolation(keep_test_artifacts): print("TEST 11: LightRAG End-to-End Workspace Isolation") print("=" * 60) - # Create temporary test directory under project temp/ - test_dir = str( - Path(__file__).parent.parent - / "temp/test_lightrag_end_to_end_workspace_isolation" - ) + # Create temporary test directory + # test_dir = tempfile.mkdtemp(prefix="lightrag_test_e2e_") + test_dir = str(Path(__file__).parent.parent / "temp/e2e_workspace_isolation") if os.path.exists(test_dir): shutil.rmtree(test_dir) os.makedirs(test_dir, exist_ok=True) @@ -995,7 +972,7 @@ async def test_lightrag_end_to_end_workspace_isolation(keep_test_artifacts): ) -> str: # Add coroutine switching to simulate async I/O and allow concurrent execution await asyncio.sleep(0) - + # Return different responses based on workspace # Format: entity<|#|>entity_name<|#|>entity_type<|#|>entity_description # Format: relation<|#|>source_entity<|#|>target_entity<|#|>keywords<|#|>description @@ -1078,10 +1055,11 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le print(" Starting concurrent insert operations...") start_time = time.time() await asyncio.gather( - rag1.ainsert(text_for_project_a), rag2.ainsert(text_for_project_b) + rag1.ainsert(text_for_project_a), + rag2.ainsert(text_for_project_b) ) elapsed_time = time.time() - start_time - + print(f" Inserted to project_a: {len(text_for_project_a)} chars (concurrent)") print(f" Inserted to project_b: {len(text_for_project_b)} chars (concurrent)") print(f" Total concurrent execution time: {elapsed_time:.3f}s") @@ -1185,9 +1163,9 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le print("\n ✓ Test complete - workspace isolation verified at E2E level") finally: - # Cleanup test directory (unless keep_test_artifacts is set) - if os.path.exists(test_dir) and not keep_test_artifacts: + # Cleanup test directory (unless KEEP_TEST_ARTIFACTS is set) + if os.path.exists(test_dir) and not KEEP_TEST_ARTIFACTS: shutil.rmtree(test_dir) print(f"\n Cleaned up test directory: {test_dir}") - elif keep_test_artifacts: + elif KEEP_TEST_ARTIFACTS: print(f"\n Kept test directory for inspection: {test_dir}")