Improve workspace isolation tests with better parallelism checks and cleanup

• Add finalize_share_data cleanup
• Refactor lock timing measurement
• Add timeline overlap validation
• Include purpose/scope documentation
• Fix tokenizer integration

(cherry picked from commit 21ad990e36)
yangdx 2025-11-18 01:38:31 +08:00 committed by Raphaël MANSUY
parent 5febb88824
commit 7e7c86601e
2 changed files with 468 additions and 62 deletions


@ -0,0 +1,265 @@
# Workspace Isolation Test Suite
## Overview
Comprehensive test coverage for LightRAG's workspace isolation feature, ensuring that different workspaces (projects) can coexist independently without data contamination or resource conflicts.
## Test Architecture
### Design Principles
1. **Concurrency-Based Assertions**: Instead of timing-based tests (which are flaky), we measure actual concurrent lock holders (sketched after this list)
2. **Timeline Validation**: Finite state machine validates proper sequential execution
3. **Performance Metrics**: Each test reports execution metrics for debugging and optimization
4. **Configurable Stress Testing**: Environment variables control test intensity
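
To make principle 1 concrete, here is a minimal, self-contained sketch (not part of the suite) that counts concurrent holders of plain `asyncio` locks; the suite's `_measure_lock_parallelism` helper documented below generalizes the same idea to workspace/namespace locks.
```python
import asyncio

async def demo(lock_a: asyncio.Lock, lock_b: asyncio.Lock) -> int:
    """Return the peak number of tasks that held a lock at the same time."""
    running = 0
    max_parallel = 0

    async def hold(lock: asyncio.Lock) -> None:
        nonlocal running, max_parallel
        async with lock:
            running += 1
            max_parallel = max(max_parallel, running)
            await asyncio.sleep(0.01)  # simulate work while holding the lock
            running -= 1

    await asyncio.gather(hold(lock_a), hold(lock_b))
    return max_parallel

async def main() -> None:
    print(await demo(asyncio.Lock(), asyncio.Lock()))  # independent locks -> 2
    shared = asyncio.Lock()
    print(await demo(shared, shared))                  # same lock -> 1

asyncio.run(main())
```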
## Test Categories
### 1. Data Isolation Tests
**Tests:** 1, 4, 8, 9, 10
**Purpose:** Verify that data in one workspace doesn't leak into another
- **Test 1: Pipeline Status Isolation** - Core shared data structures remain separate
- **Test 4: Multi-Workspace Concurrency** - Concurrent operations don't interfere
- **Test 8: Update Flags Isolation** - Flag management respects workspace boundaries
- **Test 9: Empty Workspace Standardization** - Edge case handling for empty workspace strings
- **Test 10: JsonKVStorage Integration** - Storage layer properly isolates data
### 2. Lock Mechanism Tests
**Tests:** 2, 5, 6
**Purpose:** Validate that locking mechanisms allow parallelism across workspaces while enforcing serialization within workspaces
- **Test 2: Lock Mechanism** - Different workspaces run in parallel, same workspace serializes
- **Test 5: Re-entrance Protection** - Prevent deadlocks from re-entrant lock acquisition
- **Test 6: Namespace Lock Isolation** - Different namespaces within same workspace are independent
### 3. Backward Compatibility Tests
**Test:** 3
**Purpose:** Ensure legacy code without workspace parameters still functions correctly
- Default workspace fallback behavior
- Empty workspace handling
- None vs empty string normalization (see the sketch below)
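
A condensed sketch of the normalization these tests assert, assuming `get_default_workspace` simply reflects the stored default (the conversion itself is what Test 7.1 below verifies):
```python
from lightrag.kg.shared_storage import (
    get_default_workspace,
    set_default_workspace,
)

# None and "" should collapse to the same default workspace value,
# so legacy callers that never pass a workspace keep working.
set_default_workspace(None)
assert get_default_workspace() == ""

set_default_workspace("")
assert get_default_workspace() == ""
```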
### 4. Error Handling Tests
**Test:** 7
**Purpose:** Validate guardrails for invalid configurations
- Missing workspace validation
- Workspace normalization
- Edge case handling (see the guardrail sketch below)
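
The core guardrail, condensed from Test 7 (Test 7.0 in the suite): resolving a namespace with no usable workspace should fail loudly rather than fall back silently.
```python
import pytest

from lightrag.kg.shared_storage import get_final_namespace

# With no usable default workspace, passing workspace=None is invalid.
with pytest.raises(ValueError):
    get_final_namespace("test_namespace", workspace=None)
```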
### 5. End-to-End Integration Tests
**Test:** 11
**Purpose:** Validate complete LightRAG workflows maintain isolation
- Full document insertion pipeline
- File system separation
- Data content verification (a layout spot-check sketch follows)
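
A hypothetical spot-check of the file-system separation the E2E test performs. The `project_a`/`project_b` names match Test 11, but the directory path used here and the exact layout under `working_dir` are assumptions for illustration only.
```python
from pathlib import Path

working_dir = Path("/tmp/lightrag_e2e_demo")  # hypothetical working directory

# Assumed layout: each workspace gets its own subdirectory under working_dir,
# so files written for one project never land in the other's tree.
for workspace in ("project_a", "project_b"):
    ws_dir = working_dir / workspace
    print(f"{ws_dir}: {'present' if ws_dir.is_dir() else 'missing'}")
```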
## Running Tests
### Basic Usage
```bash
# Run all workspace isolation tests
pytest tests/test_workspace_isolation.py -v
# Run specific test
pytest tests/test_workspace_isolation.py::test_lock_mechanism -v
# Run with detailed output
pytest tests/test_workspace_isolation.py -v -s
```
### Environment Configuration
#### Stress Testing
Enable stress testing with configurable number of workers:
```bash
# Enable stress mode with default 3 workers
LIGHTRAG_STRESS_TEST=true pytest tests/test_workspace_isolation.py -v
# Custom number of workers (e.g., 10)
LIGHTRAG_STRESS_TEST=true LIGHTRAG_TEST_WORKERS=10 pytest tests/test_workspace_isolation.py -v
```
#### Keep Test Artifacts
Preserve temporary directories for manual inspection:
```bash
# Keep test artifacts (useful for debugging)
LIGHTRAG_KEEP_ARTIFACTS=true pytest tests/test_workspace_isolation.py -v
```
#### Combined Example
```bash
# Stress test with 20 workers and keep artifacts
LIGHTRAG_STRESS_TEST=true \
LIGHTRAG_TEST_WORKERS=20 \
LIGHTRAG_KEEP_ARTIFACTS=true \
pytest tests/test_workspace_isolation.py::test_lock_mechanism -v -s
```
### CI/CD Integration
```bash
# Recommended CI/CD command (no artifacts, default workers)
pytest tests/test_workspace_isolation.py -v --tb=short
```
## Test Implementation Details
### Helper Functions
#### `_measure_lock_parallelism`
Measures actual concurrency rather than wall-clock time.
**Returns:**
- `max_parallel`: Peak number of concurrent lock holders
- `timeline`: Ordered list of (task_name, event) tuples
- `metrics`: Dict with performance data (duration, concurrency, workers)
**Example:**
```python
workload = [
    ("task1", "workspace1", "namespace"),
    ("task2", "workspace2", "namespace"),
]
max_parallel, timeline, metrics = await _measure_lock_parallelism(workload)

# Assert on actual behavior, not timing
assert max_parallel >= 2  # Two different workspaces should run concurrently
```
#### `_assert_no_timeline_overlap`
Validates sequential execution using finite state machine.
**Validates:**
- No overlapping lock acquisitions
- Proper lock release ordering
- All locks properly released
**Example:**
```python
timeline = [
    ("task1", "start"),
    ("task1", "end"),
    ("task2", "start"),
    ("task2", "end"),
]
_assert_no_timeline_overlap(timeline)  # Passes - no overlap

timeline_bad = [
    ("task1", "start"),
    ("task2", "start"),  # ERROR: task2 started before task1 ended
    ("task1", "end"),
]
_assert_no_timeline_overlap(timeline_bad)  # Raises AssertionError
```
## Configuration Variables
| Variable | Type | Default | Description |
|----------|------|---------|-------------|
| `LIGHTRAG_STRESS_TEST` | bool | `false` | Enable stress testing mode |
| `LIGHTRAG_TEST_WORKERS` | int | `3` | Number of parallel workers in stress mode |
| `LIGHTRAG_KEEP_ARTIFACTS` | bool | `false` | Keep temporary test directories |
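
For reference, the test module reads these variables at import time roughly as follows (taken from the configuration block at the top of the test file):
```python
import os

STRESS_TEST_MODE = os.getenv("LIGHTRAG_STRESS_TEST", "false").lower() == "true"
PARALLEL_WORKERS = int(os.getenv("LIGHTRAG_TEST_WORKERS", "3"))
KEEP_TEST_ARTIFACTS = os.getenv("LIGHTRAG_KEEP_ARTIFACTS", "false").lower() == "true"
```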
## Performance Benchmarks
### Expected Performance (Reference System)
- **Tests 1-9**: < 1s each
- **Test 10**: < 2s (includes file I/O)
- **Test 11**: < 5s (includes full RAG pipeline)
- **Total Suite**: < 15s
### Stress Test Performance
With `LIGHTRAG_TEST_WORKERS=10`:
- **Test 2 (Parallel)**: ~0.05s (10 workers, all concurrent)
- **Test 2 (Serial)**: ~0.10s (2 workers, serialized)
## Troubleshooting
### Common Issues
#### Flaky Test Failures
**Symptom:** Tests pass locally but fail in CI/CD
**Cause:** System under heavy load, timing-based assertions
**Solution:** Our tests use concurrency-based assertions, not timing. If failures persist, check the `timeline` output in error messages.
#### Resource Cleanup Errors
**Symptom:** "Directory not empty" or "Cannot remove directory"
**Cause:** Concurrent test execution or OS file locking
**Solution:** Run tests serially (`pytest -n 1`) or use `LIGHTRAG_KEEP_ARTIFACTS=true` to inspect state
#### Lock Timeout Errors
**Symptom:** "Lock acquisition timeout"
**Cause:** Deadlock or resource starvation
**Solution:** Check test output for deadlock patterns, review lock acquisition order
### Debug Tips
1. **Enable verbose output:**
```bash
pytest tests/test_workspace_isolation.py -v -s
```
2. **Run single test with artifacts:**
```bash
LIGHTRAG_KEEP_ARTIFACTS=true pytest tests/test_workspace_isolation.py::test_json_kv_storage_workspace_isolation -v -s
```
3. **Check performance metrics:**
Look for the "Performance:" lines in test output showing duration and concurrency.
4. **Inspect timeline on failure:**
Timeline data is included in assertion error messages.
## Contributing
### Adding New Tests
1. **Follow naming convention:** `test_<feature>_<aspect>`
2. **Add purpose/scope comments:** Explain what and why
3. **Use helper functions:** `_measure_lock_parallelism`, `_assert_no_timeline_overlap`
4. **Document assertions:** Explain expected behavior in assertions
5. **Update this README:** Add test to appropriate category
### Test Template
```python
@pytest.mark.asyncio
async def test_new_feature():
"""
Brief description of what this test validates.
"""
# Purpose: Why this test exists
# Scope: What functions/classes this tests
print("\n" + "=" * 60)
print("TEST N: Feature Name")
print("=" * 60)
# Test implementation
# ...
print("✅ PASSED: Feature Name")
print(f" Validation details")
```
## Related Documentation
- [Workspace Isolation Design Doc](../docs/LightRAG_concurrent_explain.md)
- [Project Intelligence](.clinerules/01-basic.md)
- [Memory Bank](../.memory-bank/)
## Test Coverage Matrix
| Component | Data Isolation | Lock Mechanism | Backward Compat | Error Handling | E2E |
|-----------|:--------------:|:--------------:|:---------------:|:--------------:|:---:|
| shared_storage | ✅ T1, T4 | ✅ T2, T5, T6 | ✅ T3 | ✅ T7 | ✅ T11 |
| update_flags | ✅ T8 | - | - | - | - |
| JsonKVStorage | ✅ T10 | - | - | - | ✅ T11 |
| LightRAG Core | - | - | - | - | ✅ T11 |
| Namespace | ✅ T9 | - | ✅ T3 | ✅ T7 | - |
**Legend:** T# = Test number
## Version History
- **v2.0** (2025-01-18): Added performance metrics, stress testing, configurable cleanup
- **v1.0** (Initial): Basic workspace isolation tests with timing-based assertions


@ -26,12 +26,14 @@ import tempfile
import numpy as np
import pytest
from pathlib import Path
+from typing import List, Tuple, Dict
from lightrag.kg.shared_storage import (
    get_final_namespace,
    get_namespace_lock,
    get_default_workspace,
    set_default_workspace,
    initialize_share_data,
+    finalize_share_data,
    initialize_pipeline_status,
    get_namespace_data,
    set_all_update_flags,
@ -41,6 +43,16 @@ from lightrag.kg.shared_storage import (
)

+# =============================================================================
+# Test Configuration
+# =============================================================================
+
+# Stress test configuration (enable via environment variable)
+STRESS_TEST_MODE = os.getenv("LIGHTRAG_STRESS_TEST", "false").lower() == "true"
+PARALLEL_WORKERS = int(os.getenv("LIGHTRAG_TEST_WORKERS", "3"))
+KEEP_TEST_ARTIFACTS = os.getenv("LIGHTRAG_KEEP_ARTIFACTS", "false").lower() == "true"
+
# =============================================================================
# Pytest Fixtures
# =============================================================================
@ -51,7 +63,85 @@ def setup_shared_data():
"""Initialize shared data before each test""" """Initialize shared data before each test"""
initialize_share_data() initialize_share_data()
yield yield
# Cleanup after test if needed finalize_share_data()
async def _measure_lock_parallelism(
workload: List[Tuple[str, str, str]], hold_time: float = 0.05
) -> Tuple[int, List[Tuple[str, str]], Dict[str, float]]:
"""Run lock acquisition workload and capture peak concurrency and timeline.
Args:
workload: List of (name, workspace, namespace) tuples
hold_time: How long each worker holds the lock (seconds)
Returns:
Tuple of (max_parallel, timeline, metrics) where:
- max_parallel: Peak number of concurrent lock holders
- timeline: List of (name, event) tuples tracking execution order
- metrics: Dict with performance metrics (total_duration, max_concurrency, etc.)
"""
running = 0
max_parallel = 0
timeline: List[Tuple[str, str]] = []
start_time = time.time()
async def worker(name: str, workspace: str, namespace: str) -> None:
nonlocal running, max_parallel
lock = get_namespace_lock(namespace, workspace)
async with lock:
running += 1
max_parallel = max(max_parallel, running)
timeline.append((name, "start"))
await asyncio.sleep(hold_time)
timeline.append((name, "end"))
running -= 1
await asyncio.gather(*(worker(*args) for args in workload))
metrics = {
"total_duration": time.time() - start_time,
"max_concurrency": max_parallel,
"avg_hold_time": hold_time,
"num_workers": len(workload),
}
return max_parallel, timeline, metrics
def _assert_no_timeline_overlap(timeline: List[Tuple[str, str]]) -> None:
"""Ensure that timeline events never overlap for sequential execution.
This function implements a finite state machine that validates:
- No overlapping lock acquisitions (only one task active at a time)
- Proper lock release order (task releases its own lock)
- All locks are properly released
Args:
timeline: List of (name, event) tuples where event is "start" or "end"
Raises:
AssertionError: If timeline shows overlapping execution or improper locking
"""
active_task = None
for name, event in timeline:
if event == "start":
if active_task is not None:
raise AssertionError(
f"Task '{name}' started before '{active_task}' released the lock"
)
active_task = name
else:
if active_task != name:
raise AssertionError(
f"Task '{name}' finished while '{active_task}' was expected to hold the lock"
)
active_task = None
if active_task is not None:
raise AssertionError(f"Task '{active_task}' did not release the lock properly")
# ============================================================================= # =============================================================================
@ -64,6 +154,8 @@ async def test_pipeline_status_isolation():
""" """
Test that pipeline status is isolated between different workspaces. Test that pipeline status is isolated between different workspaces.
""" """
# Purpose: Ensure pipeline_status shared data remains unique per workspace.
# Scope: initialize_pipeline_status and get_namespace_data interactions.
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("TEST 1: Pipeline Status Isolation") print("TEST 1: Pipeline Status Isolation")
print("=" * 60) print("=" * 60)
@ -118,52 +210,53 @@ async def test_lock_mechanism():
    Tests both parallel execution for different workspaces and serialization
    for the same workspace.
    """
+    # Purpose: Validate that keyed locks isolate workspaces while serializing
+    # requests within the same workspace. Scope: get_namespace_lock scheduling
+    # semantics for both cross-workspace and single-workspace cases.
    print("\n" + "=" * 60)
    print("TEST 2: Lock Mechanism (No Deadlocks)")
    print("=" * 60)

    # Test 2.1: Different workspaces should run in parallel
    print("\nTest 2.1: Different workspaces locks should be parallel")

-    async def acquire_lock_timed(workspace, namespace, hold_time):
-        """Acquire a lock and hold it for specified time"""
-        lock = get_namespace_lock(namespace, workspace)
-        start = time.time()
-        async with lock:
-            print(f"  [{workspace}] acquired lock at {time.time() - start:.2f}s")
-            await asyncio.sleep(hold_time)
-            print(f"  [{workspace}] releasing lock at {time.time() - start:.2f}s")
-
-    start = time.time()
-    await asyncio.gather(
-        acquire_lock_timed("ws_a", "test_namespace", 0.5),
-        acquire_lock_timed("ws_b", "test_namespace", 0.5),
-        acquire_lock_timed("ws_c", "test_namespace", 0.5),
-    )
-    elapsed = time.time() - start
-
-    # If locks are properly isolated by workspace, this should take ~0.5s (parallel)
-    # If they block each other, it would take ~1.5s (serial)
-    assert elapsed < 1.0, f"Locks blocked each other: {elapsed:.2f}s (expected < 1.0s)"
+    # Support stress testing with configurable number of workers
+    num_workers = PARALLEL_WORKERS if STRESS_TEST_MODE else 3
+    parallel_workload = [
+        (f"ws_{chr(97+i)}", f"ws_{chr(97+i)}", "test_namespace")
+        for i in range(num_workers)
+    ]
+
+    max_parallel, timeline_parallel, metrics = await _measure_lock_parallelism(
+        parallel_workload
+    )
+    assert max_parallel >= 2, (
+        "Locks for distinct workspaces should overlap; "
+        f"observed max concurrency: {max_parallel}, timeline={timeline_parallel}"
+    )

    print("✅ PASSED: Lock Mechanism - Parallel (Different Workspaces)")
-    print(f"  Locks ran in parallel: {elapsed:.2f}s")
+    print(f"  Locks overlapped for different workspaces (max concurrency={max_parallel})")
+    print(f"  Performance: {metrics['total_duration']:.3f}s for {metrics['num_workers']} workers")

    # Test 2.2: Same workspace should serialize
    print("\nTest 2.2: Same workspace locks should serialize")

-    start = time.time()
-    await asyncio.gather(
-        acquire_lock_timed("ws_same", "test_namespace", 0.3),
-        acquire_lock_timed("ws_same", "test_namespace", 0.3),
-    )
-    elapsed = time.time() - start
-
-    # Same workspace should serialize, taking ~0.6s
-    assert elapsed >= 0.5, f"Locks didn't serialize: {elapsed:.2f}s (expected >= 0.5s)"
+    serial_workload = [
+        ("serial_run_1", "ws_same", "test_namespace"),
+        ("serial_run_2", "ws_same", "test_namespace"),
+    ]
+    max_parallel_serial, timeline_serial, metrics_serial = await _measure_lock_parallelism(
+        serial_workload
+    )
+    assert max_parallel_serial == 1, (
+        "Same workspace locks should not overlap; "
+        f"observed {max_parallel_serial} with timeline {timeline_serial}"
+    )
+    _assert_no_timeline_overlap(timeline_serial)

    print("✅ PASSED: Lock Mechanism - Serial (Same Workspace)")
-    print(f"  Locks serialized correctly: {elapsed:.2f}s")
+    print("  Same workspace operations executed sequentially with no overlap")
+    print(f"  Performance: {metrics_serial['total_duration']:.3f}s for {metrics_serial['num_workers']} tasks")


# =============================================================================
@ -176,6 +269,9 @@ async def test_backward_compatibility():
""" """
Test that legacy code without workspace parameter still works correctly. Test that legacy code without workspace parameter still works correctly.
""" """
# Purpose: Validate backward-compatible defaults when workspace arguments
# are omitted. Scope: get_final_namespace, set/get_default_workspace and
# initialize_pipeline_status fallback behavior.
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("TEST 3: Backward Compatibility") print("TEST 3: Backward Compatibility")
print("=" * 60) print("=" * 60)
@ -247,6 +343,9 @@ async def test_multi_workspace_concurrency():
    Test that multiple workspaces can operate concurrently without interference.
    Simulates concurrent operations on different workspaces.
    """
+    # Purpose: Simulate concurrent workloads touching pipeline_status across
+    # workspaces. Scope: initialize_pipeline_status, get_namespace_lock, and
+    # shared dictionary mutation while ensuring isolation.
    print("\n" + "=" * 60)
    print("TEST 4: Multi-Workspace Concurrency")
    print("=" * 60)
@ -327,6 +426,9 @@ async def test_namespace_lock_reentrance():
    Test that NamespaceLock prevents re-entrance in the same coroutine
    and allows concurrent use in different coroutines.
    """
+    # Purpose: Ensure NamespaceLock enforces single entry per coroutine while
+    # allowing concurrent reuse through ContextVar isolation. Scope: lock
+    # re-entrance checks and concurrent gather semantics.
    print("\n" + "=" * 60)
    print("TEST 5: NamespaceLock Re-entrance Protection")
    print("=" * 60)
@ -396,37 +498,29 @@ async def test_different_namespace_lock_isolation():
""" """
Test that locks for different namespaces (same workspace) are independent. Test that locks for different namespaces (same workspace) are independent.
""" """
# Purpose: Confirm that namespace isolation is enforced even when workspace
# is the same. Scope: get_namespace_lock behavior when namespaces differ.
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("TEST 6: Different Namespace Lock Isolation") print("TEST 6: Different Namespace Lock Isolation")
print("=" * 60) print("=" * 60)
print("\nTesting locks with same workspace but different namespaces") print("\nTesting locks with same workspace but different namespaces")
async def acquire_lock_timed(workspace, namespace, hold_time, name): workload = [
"""Acquire a lock and hold it for specified time""" ("ns_a", "same_ws", "namespace_a"),
lock = get_namespace_lock(namespace, workspace) ("ns_b", "same_ws", "namespace_b"),
start = time.time() ("ns_c", "same_ws", "namespace_c"),
async with lock: ]
print(f" [{name}] acquired lock at {time.time() - start:.2f}s") max_parallel, timeline, metrics = await _measure_lock_parallelism(workload)
await asyncio.sleep(hold_time)
print(f" [{name}] releasing lock at {time.time() - start:.2f}s")
# These should run in parallel (different namespaces) assert max_parallel >= 2, (
start = time.time() "Different namespaces within the same workspace should run concurrently; "
await asyncio.gather( f"observed max concurrency {max_parallel} with timeline {timeline}"
acquire_lock_timed("same_ws", "namespace_a", 0.5, "ns_a"),
acquire_lock_timed("same_ws", "namespace_b", 0.5, "ns_b"),
acquire_lock_timed("same_ws", "namespace_c", 0.5, "ns_c"),
) )
elapsed = time.time() - start
# If locks are properly isolated by namespace, this should take ~0.5s (parallel)
assert (
elapsed < 1.0
), f"Different namespace locks blocked each other: {elapsed:.2f}s (expected < 1.0s)"
print("✅ PASSED: Different Namespace Lock Isolation") print("✅ PASSED: Different Namespace Lock Isolation")
print(f" Different namespace locks ran in parallel: {elapsed:.2f}s") print(f" Different namespace locks ran in parallel (max concurrency={max_parallel})")
print(f" Performance: {metrics['total_duration']:.3f}s for {metrics['num_workers']} namespaces")
# ============================================================================= # =============================================================================
@ -439,10 +533,18 @@ async def test_error_handling():
""" """
Test error handling for invalid workspace configurations. Test error handling for invalid workspace configurations.
""" """
# Purpose: Validate guardrails for workspace normalization and namespace
# derivation. Scope: set_default_workspace conversions and get_final_namespace
# failure paths when configuration is invalid.
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("TEST 7: Error Handling") print("TEST 7: Error Handling")
print("=" * 60) print("=" * 60)
# Test 7.0: Missing default workspace should raise ValueError
print("\nTest 7.0: Missing workspace raises ValueError")
with pytest.raises(ValueError):
get_final_namespace("test_namespace", workspace=None)
# Test 7.1: set_default_workspace(None) converts to empty string # Test 7.1: set_default_workspace(None) converts to empty string
print("\nTest 7.1: set_default_workspace(None) converts to empty string") print("\nTest 7.1: set_default_workspace(None) converts to empty string")
@ -481,6 +583,9 @@ async def test_update_flags_workspace_isolation():
""" """
Test that update flags are properly isolated between workspaces. Test that update flags are properly isolated between workspaces.
""" """
# Purpose: Confirm update flag setters/readers respect workspace scoping.
# Scope: set_all_update_flags, clear_all_update_flags, get_all_update_flags_status,
# and get_update_flag interactions across namespaces.
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("TEST 8: Update Flags Workspace Isolation") print("TEST 8: Update Flags Workspace Isolation")
print("=" * 60) print("=" * 60)
@ -576,6 +681,20 @@ async def test_update_flags_workspace_isolation():
    assert (
        len(workspace2_keys) == 0
    ), f"workspace2 keys should not be present, got {len(workspace2_keys)}"
+    for key, values in status1.items():
+        assert all(values), f"All flags in {key} should be True, got {values}"
+
+    # Workspace2 query should only surface workspace2 namespaces
+    status2 = await get_all_update_flags_status(workspace=workspace2)
+    expected_ws2_keys = {
+        f"{workspace2}:{test_namespace}",
+        f"{workspace2}:ns_c",
+    }
+    assert (
+        set(status2.keys()) == expected_ws2_keys
+    ), f"Unexpected namespaces for workspace2: {status2.keys()}"
+    for key, values in status2.items():
+        assert all(values), f"All flags in {key} should be True, got {values}"

    print("✅ PASSED: Update Flags - get_all_update_flags_status Filtering")
    print(
@ -593,6 +712,9 @@ async def test_empty_workspace_standardization():
""" """
Test that empty workspace is properly standardized to "" instead of "_". Test that empty workspace is properly standardized to "" instead of "_".
""" """
# Purpose: Verify namespace formatting when workspace is an empty string.
# Scope: get_final_namespace output and initialize_pipeline_status behavior
# between empty and non-empty workspaces.
print("\n" + "=" * 60) print("\n" + "=" * 60)
print("TEST 9: Empty Workspace Standardization") print("TEST 9: Empty Workspace Standardization")
print("=" * 60) print("=" * 60)
@ -645,6 +767,9 @@ async def test_json_kv_storage_workspace_isolation():
    Creates two JsonKVStorage instances with different workspaces, writes different data,
    and verifies they don't mix.
    """
+    # Purpose: Ensure JsonKVStorage respects workspace-specific directories and data.
+    # Scope: storage initialization, upsert/get_by_id operations, and filesystem layout
+    # inside the temporary working directory.
    print("\n" + "=" * 60)
    print("TEST 10: JsonKVStorage Workspace Isolation (Integration)")
    print("=" * 60)
@ -790,10 +915,12 @@ async def test_json_kv_storage_workspace_isolation():
print(f" Workspace directories correctly created: {ws1_dir} and {ws2_dir}") print(f" Workspace directories correctly created: {ws1_dir} and {ws2_dir}")
finally: finally:
# Cleanup test directory # Cleanup test directory (unless KEEP_TEST_ARTIFACTS is set)
if os.path.exists(test_dir): if os.path.exists(test_dir) and not KEEP_TEST_ARTIFACTS:
shutil.rmtree(test_dir) shutil.rmtree(test_dir)
print(f"\n Cleaned up test directory: {test_dir}") print(f"\n Cleaned up test directory: {test_dir}")
elif KEEP_TEST_ARTIFACTS:
print(f"\n Kept test directory for inspection: {test_dir}")
# ============================================================================= # =============================================================================
@ -808,6 +935,9 @@ async def test_lightrag_end_to_end_workspace_isolation():
    insert different data, and verify file separation.
    Uses mock LLM and embedding functions to avoid external API calls.
    """
+    # Purpose: Validate that full LightRAG flows keep artifacts scoped per workspace.
+    # Scope: LightRAG.initialize_storages + ainsert side effects plus filesystem
+    # verification for generated storage files.
    print("\n" + "=" * 60)
    print("TEST 11: LightRAG End-to-End Workspace Isolation")
    print("=" * 60)
@ -850,12 +980,21 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le
print("\nTest 11.1: Create two LightRAG instances with different workspaces") print("\nTest 11.1: Create two LightRAG instances with different workspaces")
from lightrag import LightRAG from lightrag import LightRAG
from lightrag.utils import EmbeddingFunc from lightrag.utils import EmbeddingFunc, Tokenizer
# Create different mock LLM functions for each workspace # Create different mock LLM functions for each workspace
mock_llm_func_a = create_mock_llm_func("project_a") mock_llm_func_a = create_mock_llm_func("project_a")
mock_llm_func_b = create_mock_llm_func("project_b") mock_llm_func_b = create_mock_llm_func("project_b")
class _SimpleTokenizerImpl:
def encode(self, content: str) -> list[int]:
return [ord(ch) for ch in content]
def decode(self, tokens: list[int]) -> str:
return "".join(chr(t) for t in tokens)
tokenizer = Tokenizer("mock-tokenizer", _SimpleTokenizerImpl())
rag1 = LightRAG( rag1 = LightRAG(
working_dir=test_dir, working_dir=test_dir,
workspace="project_a", workspace="project_a",
@ -865,6 +1004,7 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le
            max_token_size=8192,
            func=mock_embedding_func,
        ),
+        tokenizer=tokenizer,
    )

    rag2 = LightRAG(
@ -876,6 +1016,7 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le
            max_token_size=8192,
            func=mock_embedding_func,
        ),
+        tokenizer=tokenizer,
    )

    # Initialize storages
@ -998,9 +1139,9 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le
print("\n ✓ Test complete - workspace isolation verified at E2E level") print("\n ✓ Test complete - workspace isolation verified at E2E level")
finally: finally:
# Cleanup test directory # Cleanup test directory (unless KEEP_TEST_ARTIFACTS is set)
# if os.path.exists(test_dir): if os.path.exists(test_dir) and not KEEP_TEST_ARTIFACTS:
# shutil.rmtree(test_dir) shutil.rmtree(test_dir)
# print(f"\n Cleaned up test directory: {test_dir}") print(f"\n Cleaned up test directory: {test_dir}")
print("Keep test directory for manual inspection:") elif KEEP_TEST_ARTIFACTS:
print(f" {test_dir}") print(f"\n Kept test directory for inspection: {test_dir}")