test: add concurrent execution to workspace isolation test

• Add async sleep to mock functions
• Test concurrent ainsert operations
• Use asyncio.gather for parallel execution (sketched below)
• Measure concurrent execution time

(cherry picked from commit 6ae0c14438)
Author: yangdx (2025-11-18 10:17:34 +08:00), committed by Raphaël MANSUY
parent 9cf1629117
commit 60520e0188
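
A minimal, self-contained sketch of the pattern this commit introduces; the fake_insert coroutine and its arguments are illustrative stand-ins, not code from the diff. It shows the same three ingredients as the test: an async mock that yields control via asyncio.sleep(0), two insert operations run concurrently with asyncio.gather, and wall-clock time measured around the gather call.

import asyncio
import time

async def fake_insert(workspace: str, text: str) -> str:
    # asyncio.sleep(0) forces a coroutine switch, simulating async I/O
    # so that the two inserts can actually interleave.
    await asyncio.sleep(0)
    return f"{workspace}: inserted {len(text)} chars"

async def main() -> None:
    start_time = time.time()
    # Mirrors asyncio.gather(rag1.ainsert(...), rag2.ainsert(...)) in the test.
    results = await asyncio.gather(
        fake_insert("project_a", "text for project a"),
        fake_insert("project_b", "text for project b"),
    )
    elapsed = time.time() - start_time
    for line in results:
        print(f"  {line} (concurrent)")
    print(f"  Total concurrent execution time: {elapsed:.3f}s")

asyncio.run(main())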

@@ -22,6 +22,7 @@ import asyncio
import time
import os
import shutil
import tempfile
import numpy as np
import pytest
from pathlib import Path
@@ -46,10 +47,10 @@ from lightrag.kg.shared_storage import (
# Test Configuration
# =============================================================================
# Test configuration is handled via pytest fixtures in conftest.py
# - Use CLI options: --keep-artifacts, --stress-test, --test-workers=N
# - Or environment variables: LIGHTRAG_KEEP_ARTIFACTS, LIGHTRAG_STRESS_TEST, LIGHTRAG_TEST_WORKERS
# Priority: CLI options > Environment variables > Default values
# Stress test configuration (enable via environment variable)
STRESS_TEST_MODE = os.getenv("LIGHTRAG_STRESS_TEST", "false").lower() == "true"
PARALLEL_WORKERS = int(os.getenv("LIGHTRAG_TEST_WORKERS", "3"))
KEEP_TEST_ARTIFACTS = os.getenv("LIGHTRAG_KEEP_ARTIFACTS", "false").lower() == "true"
# =============================================================================
@@ -148,7 +149,6 @@ def _assert_no_timeline_overlap(timeline: List[Tuple[str, str]]) -> None:
# =============================================================================
@pytest.mark.offline
@pytest.mark.asyncio
async def test_pipeline_status_isolation():
"""
@@ -203,9 +203,8 @@ async def test_pipeline_status_isolation():
# =============================================================================
@pytest.mark.offline
@pytest.mark.asyncio
async def test_lock_mechanism(stress_test_mode, parallel_workers):
async def test_lock_mechanism():
"""
Test that the new keyed lock mechanism works correctly without deadlocks.
Tests both parallel execution for different workspaces and serialization
@@ -222,7 +221,7 @@ async def test_lock_mechanism(stress_test_mode, parallel_workers):
print("\nTest 2.1: Different workspaces locks should be parallel")
# Support stress testing with configurable number of workers
num_workers = parallel_workers if stress_test_mode else 3
num_workers = PARALLEL_WORKERS if STRESS_TEST_MODE else 3
parallel_workload = [
(f"ws_{chr(97+i)}", f"ws_{chr(97+i)}", "test_namespace")
for i in range(num_workers)
@@ -273,7 +272,6 @@ async def test_lock_mechanism(stress_test_mode, parallel_workers):
# =============================================================================
@pytest.mark.offline
@pytest.mark.asyncio
async def test_backward_compatibility():
"""
@@ -290,7 +288,7 @@ async def test_backward_compatibility():
print("\nTest 3.1: get_final_namespace with workspace=None")
set_default_workspace("my_default_workspace")
final_ns = get_final_namespace("pipeline_status")
final_ns = get_final_namespace("pipeline_status", workspace=None)
expected = "my_default_workspace:pipeline_status"
assert final_ns == expected, f"Expected {expected}, got {final_ns}"
@@ -347,7 +345,6 @@ async def test_backward_compatibility():
# =============================================================================
@pytest.mark.offline
@pytest.mark.asyncio
async def test_multi_workspace_concurrency():
"""
@@ -431,7 +428,6 @@ async def test_multi_workspace_concurrency():
# =============================================================================
@pytest.mark.offline
@pytest.mark.asyncio
async def test_namespace_lock_reentrance():
"""
@@ -505,7 +501,6 @@ async def test_namespace_lock_reentrance():
# =============================================================================
@pytest.mark.offline
@pytest.mark.asyncio
async def test_different_namespace_lock_isolation():
"""
@@ -545,7 +540,6 @@ async def test_different_namespace_lock_isolation():
# =============================================================================
@pytest.mark.offline
@pytest.mark.asyncio
async def test_error_handling():
"""
@@ -596,7 +590,6 @@ async def test_error_handling():
# =============================================================================
@pytest.mark.offline
@pytest.mark.asyncio
async def test_update_flags_workspace_isolation():
"""
@@ -726,7 +719,6 @@ async def test_update_flags_workspace_isolation():
# =============================================================================
@pytest.mark.offline
@pytest.mark.asyncio
async def test_empty_workspace_standardization():
"""
@@ -780,9 +772,8 @@ async def test_empty_workspace_standardization():
# =============================================================================
@pytest.mark.offline
@pytest.mark.asyncio
async def test_json_kv_storage_workspace_isolation(keep_test_artifacts):
async def test_json_kv_storage_workspace_isolation():
"""
Integration test: Verify JsonKVStorage properly isolates data between workspaces.
Creates two JsonKVStorage instances with different workspaces, writes different data,
@@ -795,13 +786,8 @@ async def test_json_kv_storage_workspace_isolation(keep_test_artifacts):
print("TEST 10: JsonKVStorage Workspace Isolation (Integration)")
print("=" * 60)
# Create temporary test directory under project temp/
test_dir = str(
Path(__file__).parent.parent / "temp/test_json_kv_storage_workspace_isolation"
)
if os.path.exists(test_dir):
shutil.rmtree(test_dir)
os.makedirs(test_dir, exist_ok=True)
# Create temporary test directory
test_dir = tempfile.mkdtemp(prefix="lightrag_test_kv_")
print(f"\n Using test directory: {test_dir}")
try:
@@ -862,9 +848,6 @@ async def test_json_kv_storage_workspace_isolation(keep_test_artifacts):
}
)
print(" Written to storage1: entity1, entity2")
# Persist data to disk
await storage1.index_done_callback()
print(" Persisted storage1 data to disk")
# Write to storage2
await storage2.upsert(
@@ -880,9 +863,6 @@ async def test_json_kv_storage_workspace_isolation(keep_test_artifacts):
}
)
print(" Written to storage2: entity1, entity2")
# Persist data to disk
await storage2.index_done_callback()
print(" Persisted storage2 data to disk")
# Test 10.3: Read data from each storage and verify isolation
print("\nTest 10.3: Read data and verify isolation")
@@ -947,11 +927,11 @@ async def test_json_kv_storage_workspace_isolation(keep_test_artifacts):
print(f" Workspace directories correctly created: {ws1_dir} and {ws2_dir}")
finally:
# Cleanup test directory (unless keep_test_artifacts is set)
if os.path.exists(test_dir) and not keep_test_artifacts:
# Cleanup test directory (unless KEEP_TEST_ARTIFACTS is set)
if os.path.exists(test_dir) and not KEEP_TEST_ARTIFACTS:
shutil.rmtree(test_dir)
print(f"\n Cleaned up test directory: {test_dir}")
elif keep_test_artifacts:
elif KEEP_TEST_ARTIFACTS:
print(f"\n Kept test directory for inspection: {test_dir}")
@@ -960,9 +940,8 @@ async def test_json_kv_storage_workspace_isolation(keep_test_artifacts):
# =============================================================================
@pytest.mark.offline
@pytest.mark.asyncio
async def test_lightrag_end_to_end_workspace_isolation(keep_test_artifacts):
async def test_lightrag_end_to_end_workspace_isolation():
"""
End-to-end test: Create two LightRAG instances with different workspaces,
insert different data, and verify file separation.
@@ -975,11 +954,9 @@ async def test_lightrag_end_to_end_workspace_isolation(keep_test_artifacts):
print("TEST 11: LightRAG End-to-End Workspace Isolation")
print("=" * 60)
# Create temporary test directory under project temp/
test_dir = str(
Path(__file__).parent.parent
/ "temp/test_lightrag_end_to_end_workspace_isolation"
)
# Create temporary test directory
# test_dir = tempfile.mkdtemp(prefix="lightrag_test_e2e_")
test_dir = str(Path(__file__).parent.parent / "temp/e2e_workspace_isolation")
if os.path.exists(test_dir):
shutil.rmtree(test_dir)
os.makedirs(test_dir, exist_ok=True)
@@ -995,7 +972,7 @@ async def test_lightrag_end_to_end_workspace_isolation(keep_test_artifacts):
) -> str:
# Add coroutine switching to simulate async I/O and allow concurrent execution
await asyncio.sleep(0)
# Return different responses based on workspace
# Format: entity<|#|>entity_name<|#|>entity_type<|#|>entity_description
# Format: relation<|#|>source_entity<|#|>target_entity<|#|>keywords<|#|>description
@@ -1078,10 +1055,11 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le
print(" Starting concurrent insert operations...")
start_time = time.time()
await asyncio.gather(
rag1.ainsert(text_for_project_a), rag2.ainsert(text_for_project_b)
rag1.ainsert(text_for_project_a),
rag2.ainsert(text_for_project_b)
)
elapsed_time = time.time() - start_time
print(f" Inserted to project_a: {len(text_for_project_a)} chars (concurrent)")
print(f" Inserted to project_b: {len(text_for_project_b)} chars (concurrent)")
print(f" Total concurrent execution time: {elapsed_time:.3f}s")
@@ -1185,9 +1163,9 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le
print("\n ✓ Test complete - workspace isolation verified at E2E level")
finally:
# Cleanup test directory (unless keep_test_artifacts is set)
if os.path.exists(test_dir) and not keep_test_artifacts:
# Cleanup test directory (unless KEEP_TEST_ARTIFACTS is set)
if os.path.exists(test_dir) and not KEEP_TEST_ARTIFACTS:
shutil.rmtree(test_dir)
print(f"\n Cleaned up test directory: {test_dir}")
elif keep_test_artifacts:
elif KEEP_TEST_ARTIFACTS:
print(f"\n Kept test directory for inspection: {test_dir}")