Enhance workspace isolation test with distinct mock data and persistence

• Use different mock LLM per workspace
• Add persistent test directory
• Create workspace-specific responses
• Skip cleanup for inspection

(cherry picked from commit 99262adaaa)
yangdx 2025-11-18 00:38:31 +08:00 committed by Raphaël MANSUY
parent 4da291468d
commit fd76e0f7ce


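Illustrative sketch (not part of the diff below): a minimal, hypothetical example of the pattern the commit message describes, i.e. a factory that returns a workspace-specific mock LLM plus a test directory that is deliberately kept on disk for inspection. The names create_mock_llm_func and the directory path here are assumptions for illustration; the authoritative changes are in the diff that follows.

# Hypothetical sketch only -- not the commit's code.
import os
import tempfile


def create_mock_llm_func(workspace_name: str):
    """Return an async mock LLM whose extraction output depends on the workspace."""

    async def mock_llm_func(prompt, system_prompt=None, history_messages=None, **kwargs) -> str:
        # Workspace-specific response so cross-workspace leakage is detectable.
        if workspace_name == "project_a":
            return "entity<|#|>Artificial Intelligence<|#|>concept<|#|>AI overview.\n<|COMPLETE|>"
        return "entity<|#|>Deep Learning<|#|>concept<|#|>DL overview.\n<|COMPLETE|>"

    return mock_llm_func


# Persistent test directory: created under the system temp dir and deliberately
# not removed, so per-workspace storage files can be inspected after the run.
test_dir = os.path.join(tempfile.gettempdir(), "lightrag_workspace_isolation_demo")
os.makedirs(test_dir, exist_ok=True)
print(f"Keep test directory for manual inspection: {test_dir}")

In the actual test below, the kept directory comes from commenting out the shutil.rmtree cleanup in the finally block.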
@@ -26,14 +26,12 @@ import tempfile
 import numpy as np
 import pytest
 from pathlib import Path
-from typing import List, Tuple, Dict
 from lightrag.kg.shared_storage import (
     get_final_namespace,
     get_namespace_lock,
     get_default_workspace,
     set_default_workspace,
     initialize_share_data,
-    finalize_share_data,
     initialize_pipeline_status,
     get_namespace_data,
     set_all_update_flags,
@@ -43,16 +41,6 @@ from lightrag.kg.shared_storage import (
 )
 
-# =============================================================================
-# Test Configuration
-# =============================================================================
-
-# Stress test configuration (enable via environment variable)
-STRESS_TEST_MODE = os.getenv("LIGHTRAG_STRESS_TEST", "false").lower() == "true"
-PARALLEL_WORKERS = int(os.getenv("LIGHTRAG_TEST_WORKERS", "3"))
-KEEP_TEST_ARTIFACTS = os.getenv("LIGHTRAG_KEEP_ARTIFACTS", "false").lower() == "true"
-
 # =============================================================================
 # Pytest Fixtures
 # =============================================================================
@@ -63,85 +51,7 @@ def setup_shared_data():
     """Initialize shared data before each test"""
     initialize_share_data()
     yield
-    finalize_share_data()
-
-
-async def _measure_lock_parallelism(
-    workload: List[Tuple[str, str, str]], hold_time: float = 0.05
-) -> Tuple[int, List[Tuple[str, str]], Dict[str, float]]:
-    """Run lock acquisition workload and capture peak concurrency and timeline.
-
-    Args:
-        workload: List of (name, workspace, namespace) tuples
-        hold_time: How long each worker holds the lock (seconds)
-
-    Returns:
-        Tuple of (max_parallel, timeline, metrics) where:
-        - max_parallel: Peak number of concurrent lock holders
-        - timeline: List of (name, event) tuples tracking execution order
-        - metrics: Dict with performance metrics (total_duration, max_concurrency, etc.)
-    """
-    running = 0
-    max_parallel = 0
-    timeline: List[Tuple[str, str]] = []
-    start_time = time.time()
-
-    async def worker(name: str, workspace: str, namespace: str) -> None:
-        nonlocal running, max_parallel
-        lock = get_namespace_lock(namespace, workspace)
-        async with lock:
-            running += 1
-            max_parallel = max(max_parallel, running)
-            timeline.append((name, "start"))
-            await asyncio.sleep(hold_time)
-            timeline.append((name, "end"))
-            running -= 1
-
-    await asyncio.gather(*(worker(*args) for args in workload))
-
-    metrics = {
-        "total_duration": time.time() - start_time,
-        "max_concurrency": max_parallel,
-        "avg_hold_time": hold_time,
-        "num_workers": len(workload),
-    }
-    return max_parallel, timeline, metrics
-
-
-def _assert_no_timeline_overlap(timeline: List[Tuple[str, str]]) -> None:
-    """Ensure that timeline events never overlap for sequential execution.
-
-    This function implements a finite state machine that validates:
-    - No overlapping lock acquisitions (only one task active at a time)
-    - Proper lock release order (task releases its own lock)
-    - All locks are properly released
-
-    Args:
-        timeline: List of (name, event) tuples where event is "start" or "end"
-
-    Raises:
-        AssertionError: If timeline shows overlapping execution or improper locking
-    """
-    active_task = None
-    for name, event in timeline:
-        if event == "start":
-            if active_task is not None:
-                raise AssertionError(
-                    f"Task '{name}' started before '{active_task}' released the lock"
-                )
-            active_task = name
-        else:
-            if active_task != name:
-                raise AssertionError(
-                    f"Task '{name}' finished while '{active_task}' was expected to hold the lock"
-                )
-            active_task = None
-
-    if active_task is not None:
-        raise AssertionError(f"Task '{active_task}' did not release the lock properly")
+    # Cleanup after test if needed
 
 
 # =============================================================================
@@ -154,8 +64,6 @@ async def test_pipeline_status_isolation():
     """
     Test that pipeline status is isolated between different workspaces.
     """
-    # Purpose: Ensure pipeline_status shared data remains unique per workspace.
-    # Scope: initialize_pipeline_status and get_namespace_data interactions.
     print("\n" + "=" * 60)
     print("TEST 1: Pipeline Status Isolation")
     print("=" * 60)
@@ -210,9 +118,6 @@ async def test_lock_mechanism():
     Tests both parallel execution for different workspaces and serialization
     for the same workspace.
     """
-    # Purpose: Validate that keyed locks isolate workspaces while serializing
-    # requests within the same workspace. Scope: get_namespace_lock scheduling
-    # semantics for both cross-workspace and single-workspace cases.
     print("\n" + "=" * 60)
     print("TEST 2: Lock Mechanism (No Deadlocks)")
     print("=" * 60)
@@ -220,51 +125,45 @@ async def test_lock_mechanism():
     # Test 2.1: Different workspaces should run in parallel
     print("\nTest 2.1: Different workspaces locks should be parallel")
 
-    # Support stress testing with configurable number of workers
-    num_workers = PARALLEL_WORKERS if STRESS_TEST_MODE else 3
-    parallel_workload = [
-        (f"ws_{chr(97+i)}", f"ws_{chr(97+i)}", "test_namespace")
-        for i in range(num_workers)
-    ]
-
-    max_parallel, timeline_parallel, metrics = await _measure_lock_parallelism(
-        parallel_workload
+    async def acquire_lock_timed(workspace, namespace, hold_time):
+        """Acquire a lock and hold it for specified time"""
+        lock = get_namespace_lock(namespace, workspace)
+        start = time.time()
+        async with lock:
+            print(f" [{workspace}] acquired lock at {time.time() - start:.2f}s")
+            await asyncio.sleep(hold_time)
+            print(f" [{workspace}] releasing lock at {time.time() - start:.2f}s")
+
+    start = time.time()
+    await asyncio.gather(
+        acquire_lock_timed("ws_a", "test_namespace", 0.5),
+        acquire_lock_timed("ws_b", "test_namespace", 0.5),
+        acquire_lock_timed("ws_c", "test_namespace", 0.5),
     )
-    assert max_parallel >= 2, (
-        "Locks for distinct workspaces should overlap; "
-        f"observed max concurrency: {max_parallel}, timeline={timeline_parallel}"
-    )
+    elapsed = time.time() - start
+
+    # If locks are properly isolated by workspace, this should take ~0.5s (parallel)
+    # If they block each other, it would take ~1.5s (serial)
+    assert elapsed < 1.0, f"Locks blocked each other: {elapsed:.2f}s (expected < 1.0s)"
 
     print("✅ PASSED: Lock Mechanism - Parallel (Different Workspaces)")
-    print(
-        f" Locks overlapped for different workspaces (max concurrency={max_parallel})"
-    )
-    print(
-        f" Performance: {metrics['total_duration']:.3f}s for {metrics['num_workers']} workers"
-    )
+    print(f" Locks ran in parallel: {elapsed:.2f}s")
 
     # Test 2.2: Same workspace should serialize
     print("\nTest 2.2: Same workspace locks should serialize")
-    serial_workload = [
-        ("serial_run_1", "ws_same", "test_namespace"),
-        ("serial_run_2", "ws_same", "test_namespace"),
-    ]
-    (
-        max_parallel_serial,
-        timeline_serial,
-        metrics_serial,
-    ) = await _measure_lock_parallelism(serial_workload)
-    assert max_parallel_serial == 1, (
-        "Same workspace locks should not overlap; "
-        f"observed {max_parallel_serial} with timeline {timeline_serial}"
+
+    start = time.time()
+    await asyncio.gather(
+        acquire_lock_timed("ws_same", "test_namespace", 0.3),
+        acquire_lock_timed("ws_same", "test_namespace", 0.3),
     )
-    _assert_no_timeline_overlap(timeline_serial)
+    elapsed = time.time() - start
+
+    # Same workspace should serialize, taking ~0.6s
+    assert elapsed >= 0.5, f"Locks didn't serialize: {elapsed:.2f}s (expected >= 0.5s)"
 
     print("✅ PASSED: Lock Mechanism - Serial (Same Workspace)")
-    print(" Same workspace operations executed sequentially with no overlap")
-    print(
-        f" Performance: {metrics_serial['total_duration']:.3f}s for {metrics_serial['num_workers']} tasks"
-    )
+    print(f" Locks serialized correctly: {elapsed:.2f}s")
 
 
 # =============================================================================
@@ -277,9 +176,6 @@ async def test_backward_compatibility():
     """
    Test that legacy code without workspace parameter still works correctly.
     """
-    # Purpose: Validate backward-compatible defaults when workspace arguments
-    # are omitted. Scope: get_final_namespace, set/get_default_workspace and
-    # initialize_pipeline_status fallback behavior.
     print("\n" + "=" * 60)
     print("TEST 3: Backward Compatibility")
     print("=" * 60)
@@ -351,9 +247,6 @@ async def test_multi_workspace_concurrency():
     """
     Test that multiple workspaces can operate concurrently without interference.
     Simulates concurrent operations on different workspaces.
     """
-    # Purpose: Simulate concurrent workloads touching pipeline_status across
-    # workspaces. Scope: initialize_pipeline_status, get_namespace_lock, and
-    # shared dictionary mutation while ensuring isolation.
     print("\n" + "=" * 60)
     print("TEST 4: Multi-Workspace Concurrency")
     print("=" * 60)
@@ -434,9 +327,6 @@ async def test_namespace_lock_reentrance():
     """
     Test that NamespaceLock prevents re-entrance in the same coroutine
     and allows concurrent use in different coroutines.
     """
-    # Purpose: Ensure NamespaceLock enforces single entry per coroutine while
-    # allowing concurrent reuse through ContextVar isolation. Scope: lock
-    # re-entrance checks and concurrent gather semantics.
     print("\n" + "=" * 60)
     print("TEST 5: NamespaceLock Re-entrance Protection")
     print("=" * 60)
@@ -506,33 +396,37 @@ async def test_different_namespace_lock_isolation():
     """
     Test that locks for different namespaces (same workspace) are independent.
     """
-    # Purpose: Confirm that namespace isolation is enforced even when workspace
-    # is the same. Scope: get_namespace_lock behavior when namespaces differ.
     print("\n" + "=" * 60)
     print("TEST 6: Different Namespace Lock Isolation")
     print("=" * 60)
 
     print("\nTesting locks with same workspace but different namespaces")
 
-    workload = [
-        ("ns_a", "same_ws", "namespace_a"),
-        ("ns_b", "same_ws", "namespace_b"),
-        ("ns_c", "same_ws", "namespace_c"),
-    ]
-    max_parallel, timeline, metrics = await _measure_lock_parallelism(workload)
-
-    assert max_parallel >= 2, (
-        "Different namespaces within the same workspace should run concurrently; "
-        f"observed max concurrency {max_parallel} with timeline {timeline}"
-    )
+    async def acquire_lock_timed(workspace, namespace, hold_time, name):
+        """Acquire a lock and hold it for specified time"""
+        lock = get_namespace_lock(namespace, workspace)
+        start = time.time()
+        async with lock:
+            print(f" [{name}] acquired lock at {time.time() - start:.2f}s")
+            await asyncio.sleep(hold_time)
+            print(f" [{name}] releasing lock at {time.time() - start:.2f}s")
+
+    # These should run in parallel (different namespaces)
+    start = time.time()
+    await asyncio.gather(
+        acquire_lock_timed("same_ws", "namespace_a", 0.5, "ns_a"),
+        acquire_lock_timed("same_ws", "namespace_b", 0.5, "ns_b"),
+        acquire_lock_timed("same_ws", "namespace_c", 0.5, "ns_c"),
+    )
+    elapsed = time.time() - start
+
+    # If locks are properly isolated by namespace, this should take ~0.5s (parallel)
+    assert (
+        elapsed < 1.0
+    ), f"Different namespace locks blocked each other: {elapsed:.2f}s (expected < 1.0s)"
 
     print("✅ PASSED: Different Namespace Lock Isolation")
-    print(
-        f" Different namespace locks ran in parallel (max concurrency={max_parallel})"
-    )
-    print(
-        f" Performance: {metrics['total_duration']:.3f}s for {metrics['num_workers']} namespaces"
-    )
+    print(f" Different namespace locks ran in parallel: {elapsed:.2f}s")
 
 
 # =============================================================================
@@ -545,18 +439,10 @@ async def test_error_handling():
     """
     Test error handling for invalid workspace configurations.
     """
-    # Purpose: Validate guardrails for workspace normalization and namespace
-    # derivation. Scope: set_default_workspace conversions and get_final_namespace
-    # failure paths when configuration is invalid.
     print("\n" + "=" * 60)
     print("TEST 7: Error Handling")
     print("=" * 60)
 
-    # Test 7.0: Missing default workspace should raise ValueError
-    print("\nTest 7.0: Missing workspace raises ValueError")
-    with pytest.raises(ValueError):
-        get_final_namespace("test_namespace", workspace=None)
-
     # Test 7.1: set_default_workspace(None) converts to empty string
     print("\nTest 7.1: set_default_workspace(None) converts to empty string")
@@ -595,9 +481,6 @@ async def test_update_flags_workspace_isolation():
     """
     Test that update flags are properly isolated between workspaces.
     """
-    # Purpose: Confirm update flag setters/readers respect workspace scoping.
-    # Scope: set_all_update_flags, clear_all_update_flags, get_all_update_flags_status,
-    # and get_update_flag interactions across namespaces.
     print("\n" + "=" * 60)
     print("TEST 8: Update Flags Workspace Isolation")
     print("=" * 60)
@@ -693,20 +576,6 @@ async def test_update_flags_workspace_isolation():
     assert (
         len(workspace2_keys) == 0
     ), f"workspace2 keys should not be present, got {len(workspace2_keys)}"
-    for key, values in status1.items():
-        assert all(values), f"All flags in {key} should be True, got {values}"
-
-    # Workspace2 query should only surface workspace2 namespaces
-    status2 = await get_all_update_flags_status(workspace=workspace2)
-    expected_ws2_keys = {
-        f"{workspace2}:{test_namespace}",
-        f"{workspace2}:ns_c",
-    }
-    assert (
-        set(status2.keys()) == expected_ws2_keys
-    ), f"Unexpected namespaces for workspace2: {status2.keys()}"
-    for key, values in status2.items():
-        assert all(values), f"All flags in {key} should be True, got {values}"
 
     print("✅ PASSED: Update Flags - get_all_update_flags_status Filtering")
     print(
@@ -724,9 +593,6 @@ async def test_empty_workspace_standardization():
     """
     Test that empty workspace is properly standardized to "" instead of "_".
     """
-    # Purpose: Verify namespace formatting when workspace is an empty string.
-    # Scope: get_final_namespace output and initialize_pipeline_status behavior
-    # between empty and non-empty workspaces.
     print("\n" + "=" * 60)
     print("TEST 9: Empty Workspace Standardization")
     print("=" * 60)
@@ -779,9 +645,6 @@ async def test_json_kv_storage_workspace_isolation():
     Creates two JsonKVStorage instances with different workspaces, writes different data,
     and verifies they don't mix.
     """
-    # Purpose: Ensure JsonKVStorage respects workspace-specific directories and data.
-    # Scope: storage initialization, upsert/get_by_id operations, and filesystem layout
-    # inside the temporary working directory.
     print("\n" + "=" * 60)
     print("TEST 10: JsonKVStorage Workspace Isolation (Integration)")
     print("=" * 60)
@@ -927,12 +790,10 @@ async def test_json_kv_storage_workspace_isolation():
         print(f" Workspace directories correctly created: {ws1_dir} and {ws2_dir}")
 
     finally:
-        # Cleanup test directory (unless KEEP_TEST_ARTIFACTS is set)
-        if os.path.exists(test_dir) and not KEEP_TEST_ARTIFACTS:
+        # Cleanup test directory
+        if os.path.exists(test_dir):
             shutil.rmtree(test_dir)
             print(f"\n Cleaned up test directory: {test_dir}")
-        elif KEEP_TEST_ARTIFACTS:
-            print(f"\n Kept test directory for inspection: {test_dir}")
 
 
 # =============================================================================
@@ -947,9 +808,6 @@ async def test_lightrag_end_to_end_workspace_isolation():
     insert different data, and verify file separation.
     Uses mock LLM and embedding functions to avoid external API calls.
     """
-    # Purpose: Validate that full LightRAG flows keep artifacts scoped per workspace.
-    # Scope: LightRAG.initialize_storages + ainsert side effects plus filesystem
-    # verification for generated storage files.
     print("\n" + "=" * 60)
     print("TEST 11: LightRAG End-to-End Workspace Isolation")
     print("=" * 60)
@@ -966,13 +824,9 @@ async def test_lightrag_end_to_end_workspace_isolation():
     # Factory function to create different mock LLM functions for each workspace
     def create_mock_llm_func(workspace_name):
         """Create a mock LLM function that returns different content based on workspace"""
-
         async def mock_llm_func(
             prompt, system_prompt=None, history_messages=[], **kwargs
         ) -> str:
-            # Add coroutine switching to simulate async I/O and allow concurrent execution
-            await asyncio.sleep(0)
-
             # Return different responses based on workspace
             # Format: entity<|#|>entity_name<|#|>entity_type<|#|>entity_description
             # Format: relation<|#|>source_entity<|#|>target_entity<|#|>keywords<|#|>description
@@ -986,34 +840,22 @@ relation<|#|>Machine Learning<|#|>Artificial Intelligence<|#|>subset, related fi
 entity<|#|>Neural Networks<|#|>concept<|#|>Neural Networks are computing systems inspired by biological neural networks.
 relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Learning uses multiple layers of Neural Networks to learn representations.
 <|COMPLETE|>"""
 
         return mock_llm_func
 
     # Mock embedding function
     async def mock_embedding_func(texts: list[str]) -> np.ndarray:
-        # Add coroutine switching to simulate async I/O and allow concurrent execution
-        await asyncio.sleep(0)
         return np.random.rand(len(texts), 384)  # 384-dimensional vectors
 
     # Test 11.1: Create two LightRAG instances with different workspaces
     print("\nTest 11.1: Create two LightRAG instances with different workspaces")
 
     from lightrag import LightRAG
-    from lightrag.utils import EmbeddingFunc, Tokenizer
+    from lightrag.utils import EmbeddingFunc
 
     # Create different mock LLM functions for each workspace
     mock_llm_func_a = create_mock_llm_func("project_a")
     mock_llm_func_b = create_mock_llm_func("project_b")
 
-    class _SimpleTokenizerImpl:
-        def encode(self, content: str) -> list[int]:
-            return [ord(ch) for ch in content]
-
-        def decode(self, tokens: list[int]) -> str:
-            return "".join(chr(t) for t in tokens)
-
-    tokenizer = Tokenizer("mock-tokenizer", _SimpleTokenizerImpl())
-
     rag1 = LightRAG(
         working_dir=test_dir,
         workspace="project_a",
@@ -1023,7 +865,6 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le
             max_token_size=8192,
             func=mock_embedding_func,
         ),
-        tokenizer=tokenizer,
     )
 
     rag2 = LightRAG(
@@ -1035,7 +876,6 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le
             max_token_size=8192,
            func=mock_embedding_func,
         ),
-        tokenizer=tokenizer,
     )
 
     # Initialize storages
@@ -1045,24 +885,19 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le
     print(" RAG1 created: workspace=project_a")
     print(" RAG2 created: workspace=project_b")
 
-    # Test 11.2: Insert different data to each RAG instance (CONCURRENTLY)
-    print("\nTest 11.2: Insert different data to each RAG instance (concurrently)")
+    # Test 11.2: Insert different data to each RAG instance
+    print("\nTest 11.2: Insert different data to each RAG instance")
 
     text_for_project_a = "This document is about Artificial Intelligence and Machine Learning. AI is transforming the world."
     text_for_project_b = "This document is about Deep Learning and Neural Networks. Deep learning uses multiple layers."
 
-    # Insert to both projects concurrently to test workspace isolation under concurrent load
-    print(" Starting concurrent insert operations...")
-    start_time = time.time()
-    await asyncio.gather(
-        rag1.ainsert(text_for_project_a),
-        rag2.ainsert(text_for_project_b)
-    )
-    elapsed_time = time.time() - start_time
-    print(f" Inserted to project_a: {len(text_for_project_a)} chars (concurrent)")
-    print(f" Inserted to project_b: {len(text_for_project_b)} chars (concurrent)")
-    print(f" Total concurrent execution time: {elapsed_time:.3f}s")
+    # Insert to project_a
+    await rag1.ainsert(text_for_project_a)
+    print(f" Inserted to project_a: {len(text_for_project_a)} chars")
+
+    # Insert to project_b
+    await rag2.ainsert(text_for_project_b)
+    print(f" Inserted to project_b: {len(text_for_project_b)} chars")
 
     # Test 11.3: Verify file structure
     print("\nTest 11.3: Verify workspace directory structure")
@@ -1163,9 +998,9 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le
         print("\n ✓ Test complete - workspace isolation verified at E2E level")
 
     finally:
-        # Cleanup test directory (unless KEEP_TEST_ARTIFACTS is set)
-        if os.path.exists(test_dir) and not KEEP_TEST_ARTIFACTS:
-            shutil.rmtree(test_dir)
-            print(f"\n Cleaned up test directory: {test_dir}")
-        elif KEEP_TEST_ARTIFACTS:
-            print(f"\n Kept test directory for inspection: {test_dir}")
+        # Cleanup test directory
+        # if os.path.exists(test_dir):
+        #     shutil.rmtree(test_dir)
+        #     print(f"\n Cleaned up test directory: {test_dir}")
+        print("Keep test directory for manual inspection:")
+        print(f" {test_dir}")