Improve workspace isolation tests with better parallelism checks and cleanup
• Add finalize_share_data cleanup
• Refactor lock timing measurement
• Add timeline overlap validation
• Include purpose/scope documentation
• Fix tokenizer integration
This commit is contained in:
parent
5da82bb096
commit
21ad990e36
2 changed files with 468 additions and 62 deletions
265
tests/README_WORKSPACE_ISOLATION_TESTS.md
Normal file
@ -0,0 +1,265 @@
# Workspace Isolation Test Suite

## Overview

Comprehensive test coverage for LightRAG's workspace isolation feature, ensuring that different workspaces (projects) can coexist independently without data contamination or resource conflicts.

## Test Architecture

### Design Principles

1. **Concurrency-Based Assertions**: Instead of timing-based tests (which are flaky), we measure the actual number of concurrent lock holders
2. **Timeline Validation**: A finite state machine validates proper sequential execution
3. **Performance Metrics**: Each test reports execution metrics for debugging and optimization
4. **Configurable Stress Testing**: Environment variables control test intensity
## Test Categories

### 1. Data Isolation Tests

**Tests:** 1, 4, 8, 9, 10

**Purpose:** Verify that data in one workspace doesn't leak into another (a sketch of the pattern follows the list)

- **Test 1: Pipeline Status Isolation** - Core shared data structures remain separate
- **Test 4: Multi-Workspace Concurrency** - Concurrent operations don't interfere
- **Test 8: Update Flags Isolation** - Flag management respects workspace boundaries
- **Test 9: Empty Workspace Standardization** - Edge case handling for empty workspace strings
- **Test 10: JsonKVStorage Integration** - Storage layer properly isolates data
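
As a hedged illustration of the pattern (not code taken from the suite), a data isolation check has roughly this shape; the `workspace` keyword arguments and the `"pipeline_status"` namespace name are assumptions inferred from the test descriptions above:

```python
import pytest

from lightrag.kg.shared_storage import (
    finalize_share_data,
    get_namespace_data,
    initialize_pipeline_status,
    initialize_share_data,
)


@pytest.mark.asyncio
async def test_pipeline_status_isolation_sketch():
    initialize_share_data()
    try:
        # Assumption: both helpers accept a workspace keyword argument.
        await initialize_pipeline_status(workspace="ws_a")
        await initialize_pipeline_status(workspace="ws_b")

        status_a = await get_namespace_data("pipeline_status", workspace="ws_a")
        status_b = await get_namespace_data("pipeline_status", workspace="ws_b")

        # A mutation in one workspace must not leak into the other.
        status_a["busy"] = True
        assert status_b.get("busy") is not True
    finally:
        finalize_share_data()
```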

### 2. Lock Mechanism Tests

**Tests:** 2, 5, 6

**Purpose:** Validate that locking mechanisms allow parallelism across workspaces while enforcing serialization within workspaces (see the sketch after this list)

- **Test 2: Lock Mechanism** - Different workspaces run in parallel, same workspace serializes
- **Test 5: Re-entrance Protection** - Prevent deadlocks from re-entrant lock acquisition
- **Test 6: Namespace Lock Isolation** - Different namespaces within same workspace are independent
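
The invariant these tests enforce can be sketched with `get_namespace_lock`, called as in the test module; `hold` and `demo` are hypothetical helpers, and the sketch assumes `initialize_share_data()` has already run (the suite's fixture does this before each test):

```python
import asyncio

from lightrag.kg.shared_storage import get_namespace_lock, initialize_share_data


async def hold(workspace: str, namespace: str, duration: float = 0.05) -> None:
    # Locks are keyed by (workspace, namespace): distinct keys may overlap,
    # identical keys must serialize.
    async with get_namespace_lock(namespace, workspace):
        await asyncio.sleep(duration)


async def demo() -> None:
    # Different workspaces, same namespace: expected to run concurrently.
    await asyncio.gather(hold("ws_a", "ns"), hold("ws_b", "ns"))
    # Same workspace and namespace: expected to serialize.
    await asyncio.gather(hold("ws_same", "ns"), hold("ws_same", "ns"))


initialize_share_data()
asyncio.run(demo())
```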

### 3. Backward Compatibility Tests

**Test:** 3

**Purpose:** Ensure legacy code without workspace parameters still functions correctly (illustrated below)

- Default workspace fallback behavior
- Empty workspace handling
- None vs empty string normalization
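
As a brief illustration: `set_default_workspace` and `get_final_namespace` are the module's real names, while the `"chunks"` namespace and the equality shown are assumptions based on Test 3's description of the fallback:

```python
from lightrag.kg.shared_storage import (
    get_final_namespace,
    set_default_workspace,
)

# Hypothetical legacy configuration: a default workspace, no per-call argument.
set_default_workspace("legacy_project")

# Assumption: workspace=None falls back to the default set above, so both
# calls resolve to the same final namespace.
assert get_final_namespace("chunks", workspace=None) == get_final_namespace(
    "chunks", workspace="legacy_project"
)
```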

### 4. Error Handling Tests

**Test:** 7

**Purpose:** Validate guardrails for invalid configurations (Test 7.0's core check is shown below)

- Missing workspace validation
- Workspace normalization
- Edge case handling
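
The first guardrail is exercised directly in Test 7.0: resolving a namespace with no workspace available raises `ValueError`:

```python
import pytest

from lightrag.kg.shared_storage import get_final_namespace

# Mirrors Test 7.0: no workspace given and no default configured.
with pytest.raises(ValueError):
    get_final_namespace("test_namespace", workspace=None)
```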

### 5. End-to-End Integration Tests

**Test:** 11

**Purpose:** Validate complete LightRAG workflows maintain isolation (the mock tokenizer used is shown after this list)

- Full document insertion pipeline
- File system separation
- Data content verification
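
Test 11 runs with mock LLM and embedding functions to avoid external API calls; the trivial tokenizer shim added in this commit looks like this:

```python
from lightrag.utils import Tokenizer


class _SimpleTokenizerImpl:
    def encode(self, content: str) -> list[int]:
        # One "token" per character, using its code point.
        return [ord(ch) for ch in content]

    def decode(self, tokens: list[int]) -> str:
        return "".join(chr(t) for t in tokens)


tokenizer = Tokenizer("mock-tokenizer", _SimpleTokenizerImpl())
```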

## Running Tests

### Basic Usage

```bash
# Run all workspace isolation tests
pytest tests/test_workspace_isolation.py -v

# Run specific test
pytest tests/test_workspace_isolation.py::test_lock_mechanism -v

# Run with detailed output
pytest tests/test_workspace_isolation.py -v -s
```

### Environment Configuration

#### Stress Testing

Enable stress testing with a configurable number of workers:

```bash
# Enable stress mode with default 3 workers
LIGHTRAG_STRESS_TEST=true pytest tests/test_workspace_isolation.py -v

# Custom number of workers (e.g., 10)
LIGHTRAG_STRESS_TEST=true LIGHTRAG_TEST_WORKERS=10 pytest tests/test_workspace_isolation.py -v
```

#### Keep Test Artifacts

Preserve temporary directories for manual inspection:

```bash
# Keep test artifacts (useful for debugging)
LIGHTRAG_KEEP_ARTIFACTS=true pytest tests/test_workspace_isolation.py -v
```

#### Combined Example

```bash
# Stress test with 20 workers and keep artifacts
LIGHTRAG_STRESS_TEST=true \
LIGHTRAG_TEST_WORKERS=20 \
LIGHTRAG_KEEP_ARTIFACTS=true \
pytest tests/test_workspace_isolation.py::test_lock_mechanism -v -s
```

### CI/CD Integration

```bash
# Recommended CI/CD command (no artifacts, default workers)
pytest tests/test_workspace_isolation.py -v --tb=short
```

## Test Implementation Details

### Helper Functions

#### `_measure_lock_parallelism`

Measures actual concurrency rather than wall-clock time.

**Returns:**

- `max_parallel`: Peak number of concurrent lock holders
- `timeline`: Ordered list of (task_name, event) tuples
- `metrics`: Dict with performance data (duration, concurrency, workers)

**Example:**

```python
workload = [
    ("task1", "workspace1", "namespace"),
    ("task2", "workspace2", "namespace"),
]
max_parallel, timeline, metrics = await _measure_lock_parallelism(workload)

# Assert on actual behavior, not timing
assert max_parallel >= 2  # Two different workspaces should run concurrently
```

#### `_assert_no_timeline_overlap`

Validates sequential execution using a finite state machine.

**Validates:**

- No overlapping lock acquisitions
- Proper lock release ordering
- All locks properly released

**Example:**

```python
timeline = [
    ("task1", "start"),
    ("task1", "end"),
    ("task2", "start"),
    ("task2", "end"),
]
_assert_no_timeline_overlap(timeline)  # Passes - no overlap

timeline_bad = [
    ("task1", "start"),
    ("task2", "start"),  # ERROR: task2 started before task1 ended
    ("task1", "end"),
]
_assert_no_timeline_overlap(timeline_bad)  # Raises AssertionError
```

## Configuration Variables

| Variable | Type | Default | Description |
|----------|------|---------|-------------|
| `LIGHTRAG_STRESS_TEST` | bool | `false` | Enable stress testing mode |
| `LIGHTRAG_TEST_WORKERS` | int | `3` | Number of parallel workers in stress mode |
| `LIGHTRAG_KEEP_ARTIFACTS` | bool | `false` | Keep temporary test directories |
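
These variables are parsed once at module import time; the configuration block added in this commit reads:

```python
import os

# Stress test configuration (enable via environment variable)
STRESS_TEST_MODE = os.getenv("LIGHTRAG_STRESS_TEST", "false").lower() == "true"
PARALLEL_WORKERS = int(os.getenv("LIGHTRAG_TEST_WORKERS", "3"))
KEEP_TEST_ARTIFACTS = os.getenv("LIGHTRAG_KEEP_ARTIFACTS", "false").lower() == "true"
```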

## Performance Benchmarks

### Expected Performance (Reference System)

- **Tests 1-9**: < 1s each
- **Test 10**: < 2s (includes file I/O)
- **Test 11**: < 5s (includes full RAG pipeline)
- **Total Suite**: < 15s

### Stress Test Performance

With `LIGHTRAG_TEST_WORKERS=10`:

- **Test 2 (Parallel)**: ~0.05s (10 workers, all concurrent)
- **Test 2 (Serial)**: ~0.10s (2 workers, serialized)

With the default 0.05s hold time, fully concurrent workers finish in roughly one hold interval, while serialized workers take about n × 0.05s.

## Troubleshooting

### Common Issues

#### Flaky Test Failures

**Symptom:** Tests pass locally but fail in CI/CD

**Cause:** System under heavy load interacting with timing-based assertions

**Solution:** Our tests use concurrency-based assertions, not timing. If failures persist, check the `timeline` output in error messages.

#### Resource Cleanup Errors

**Symptom:** "Directory not empty" or "Cannot remove directory"

**Cause:** Concurrent test execution or OS file locking

**Solution:** Run tests serially (`pytest -n 1`) or use `LIGHTRAG_KEEP_ARTIFACTS=true` to inspect state

#### Lock Timeout Errors

**Symptom:** "Lock acquisition timeout"

**Cause:** Deadlock or resource starvation

**Solution:** Check test output for deadlock patterns and review lock acquisition order

### Debug Tips

1. **Enable verbose output:**

   ```bash
   pytest tests/test_workspace_isolation.py -v -s
   ```

2. **Run single test with artifacts:**

   ```bash
   LIGHTRAG_KEEP_ARTIFACTS=true pytest tests/test_workspace_isolation.py::test_json_kv_storage_workspace_isolation -v -s
   ```

3. **Check performance metrics:**

   Look for the "Performance:" lines in test output showing duration and concurrency.

4. **Inspect timeline on failure:**

   Timeline data is included in assertion error messages.

## Contributing

### Adding New Tests

1. **Follow naming convention:** `test_<feature>_<aspect>`
2. **Add purpose/scope comments:** Explain what and why
3. **Use helper functions:** `_measure_lock_parallelism`, `_assert_no_timeline_overlap`
4. **Document assertions:** Explain expected behavior in assertions
5. **Update this README:** Add the test to the appropriate category

### Test Template

```python
@pytest.mark.asyncio
async def test_new_feature():
    """
    Brief description of what this test validates.
    """
    # Purpose: Why this test exists
    # Scope: What functions/classes this tests
    print("\n" + "=" * 60)
    print("TEST N: Feature Name")
    print("=" * 60)

    # Test implementation
    # ...

    print("✅ PASSED: Feature Name")
    print("  Validation details")
```

## Related Documentation

- [Workspace Isolation Design Doc](../docs/LightRAG_concurrent_explain.md)
- [Project Intelligence](.clinerules/01-basic.md)
- [Memory Bank](../.memory-bank/)

## Test Coverage Matrix

| Component | Data Isolation | Lock Mechanism | Backward Compat | Error Handling | E2E |
|-----------|:--------------:|:--------------:|:---------------:|:--------------:|:---:|
| shared_storage | ✅ T1, T4 | ✅ T2, T5, T6 | ✅ T3 | ✅ T7 | ✅ T11 |
| update_flags | ✅ T8 | - | - | - | - |
| JsonKVStorage | ✅ T10 | - | - | - | ✅ T11 |
| LightRAG Core | - | - | - | - | ✅ T11 |
| Namespace | ✅ T9 | - | ✅ T3 | ✅ T7 | - |

**Legend:** T# = Test number

## Version History

- **v2.0** (2025-01-18): Added performance metrics, stress testing, configurable cleanup
- **v1.0** (Initial): Basic workspace isolation tests with timing-based assertions

tests/test_workspace_isolation.py

@ -26,12 +26,14 @@ import tempfile
import numpy as np
import pytest
from pathlib import Path
from typing import List, Tuple, Dict
from lightrag.kg.shared_storage import (
    get_final_namespace,
    get_namespace_lock,
    get_default_workspace,
    set_default_workspace,
    initialize_share_data,
    finalize_share_data,
    initialize_pipeline_status,
    get_namespace_data,
    set_all_update_flags,
@ -41,6 +43,16 @@ from lightrag.kg.shared_storage import (
)


# =============================================================================
# Test Configuration
# =============================================================================

# Stress test configuration (enable via environment variable)
STRESS_TEST_MODE = os.getenv("LIGHTRAG_STRESS_TEST", "false").lower() == "true"
PARALLEL_WORKERS = int(os.getenv("LIGHTRAG_TEST_WORKERS", "3"))
KEEP_TEST_ARTIFACTS = os.getenv("LIGHTRAG_KEEP_ARTIFACTS", "false").lower() == "true"


# =============================================================================
# Pytest Fixtures
# =============================================================================
@ -51,7 +63,85 @@ def setup_shared_data():
    """Initialize shared data before each test"""
    initialize_share_data()
    yield
    # Cleanup after test if needed
    finalize_share_data()


async def _measure_lock_parallelism(
    workload: List[Tuple[str, str, str]], hold_time: float = 0.05
) -> Tuple[int, List[Tuple[str, str]], Dict[str, float]]:
    """Run lock acquisition workload and capture peak concurrency and timeline.

    Args:
        workload: List of (name, workspace, namespace) tuples
        hold_time: How long each worker holds the lock (seconds)

    Returns:
        Tuple of (max_parallel, timeline, metrics) where:
        - max_parallel: Peak number of concurrent lock holders
        - timeline: List of (name, event) tuples tracking execution order
        - metrics: Dict with performance metrics (total_duration, max_concurrency, etc.)
    """

    running = 0
    max_parallel = 0
    timeline: List[Tuple[str, str]] = []
    start_time = time.time()

    async def worker(name: str, workspace: str, namespace: str) -> None:
        nonlocal running, max_parallel
        lock = get_namespace_lock(namespace, workspace)
        async with lock:
            running += 1
            max_parallel = max(max_parallel, running)
            timeline.append((name, "start"))
            await asyncio.sleep(hold_time)
            timeline.append((name, "end"))
            running -= 1

    await asyncio.gather(*(worker(*args) for args in workload))

    metrics = {
        "total_duration": time.time() - start_time,
        "max_concurrency": max_parallel,
        "avg_hold_time": hold_time,
        "num_workers": len(workload),
    }

    return max_parallel, timeline, metrics


def _assert_no_timeline_overlap(timeline: List[Tuple[str, str]]) -> None:
    """Ensure that timeline events never overlap for sequential execution.

    This function implements a finite state machine that validates:
    - No overlapping lock acquisitions (only one task active at a time)
    - Proper lock release order (task releases its own lock)
    - All locks are properly released

    Args:
        timeline: List of (name, event) tuples where event is "start" or "end"

    Raises:
        AssertionError: If timeline shows overlapping execution or improper locking
    """

    active_task = None
    for name, event in timeline:
        if event == "start":
            if active_task is not None:
                raise AssertionError(
                    f"Task '{name}' started before '{active_task}' released the lock"
                )
            active_task = name
        else:
            if active_task != name:
                raise AssertionError(
                    f"Task '{name}' finished while '{active_task}' was expected to hold the lock"
                )
            active_task = None

    if active_task is not None:
        raise AssertionError(f"Task '{active_task}' did not release the lock properly")


# =============================================================================
@ -64,6 +154,8 @@ async def test_pipeline_status_isolation():
    """
    Test that pipeline status is isolated between different workspaces.
    """
    # Purpose: Ensure pipeline_status shared data remains unique per workspace.
    # Scope: initialize_pipeline_status and get_namespace_data interactions.
    print("\n" + "=" * 60)
    print("TEST 1: Pipeline Status Isolation")
    print("=" * 60)
@ -118,52 +210,53 @@ async def test_lock_mechanism():
    Tests both parallel execution for different workspaces and serialization
    for the same workspace.
    """
    # Purpose: Validate that keyed locks isolate workspaces while serializing
    # requests within the same workspace. Scope: get_namespace_lock scheduling
    # semantics for both cross-workspace and single-workspace cases.
    print("\n" + "=" * 60)
    print("TEST 2: Lock Mechanism (No Deadlocks)")
    print("=" * 60)

    # Test 2.1: Different workspaces should run in parallel
    print("\nTest 2.1: Different workspaces locks should be parallel")

    async def acquire_lock_timed(workspace, namespace, hold_time):
        """Acquire a lock and hold it for specified time"""
        lock = get_namespace_lock(namespace, workspace)
        start = time.time()
        async with lock:
            print(f"  [{workspace}] acquired lock at {time.time() - start:.2f}s")
            await asyncio.sleep(hold_time)
            print(f"  [{workspace}] releasing lock at {time.time() - start:.2f}s")

    start = time.time()
    await asyncio.gather(
        acquire_lock_timed("ws_a", "test_namespace", 0.5),
        acquire_lock_timed("ws_b", "test_namespace", 0.5),
        acquire_lock_timed("ws_c", "test_namespace", 0.5),

    # Support stress testing with configurable number of workers
    num_workers = PARALLEL_WORKERS if STRESS_TEST_MODE else 3
    parallel_workload = [
        (f"ws_{chr(97 + i)}", f"ws_{chr(97 + i)}", "test_namespace")
        for i in range(num_workers)
    ]

    max_parallel, timeline_parallel, metrics = await _measure_lock_parallelism(
        parallel_workload
    )
    assert max_parallel >= 2, (
        "Locks for distinct workspaces should overlap; "
        f"observed max concurrency: {max_parallel}, timeline={timeline_parallel}"
    )
    elapsed = time.time() - start

    # If locks are properly isolated by workspace, this should take ~0.5s (parallel)
    # If they block each other, it would take ~1.5s (serial)
    assert elapsed < 1.0, f"Locks blocked each other: {elapsed:.2f}s (expected < 1.0s)"

    print("✅ PASSED: Lock Mechanism - Parallel (Different Workspaces)")
    print(f"  Locks ran in parallel: {elapsed:.2f}s")
    print(f"  Locks overlapped for different workspaces (max concurrency={max_parallel})")
    print(f"  Performance: {metrics['total_duration']:.3f}s for {metrics['num_workers']} workers")

    # Test 2.2: Same workspace should serialize
    print("\nTest 2.2: Same workspace locks should serialize")

    start = time.time()
    await asyncio.gather(
        acquire_lock_timed("ws_same", "test_namespace", 0.3),
        acquire_lock_timed("ws_same", "test_namespace", 0.3),
    serial_workload = [
        ("serial_run_1", "ws_same", "test_namespace"),
        ("serial_run_2", "ws_same", "test_namespace"),
    ]
    max_parallel_serial, timeline_serial, metrics_serial = await _measure_lock_parallelism(
        serial_workload
    )
    elapsed = time.time() - start

    # Same workspace should serialize, taking ~0.6s
    assert elapsed >= 0.5, f"Locks didn't serialize: {elapsed:.2f}s (expected >= 0.5s)"
    assert max_parallel_serial == 1, (
        "Same workspace locks should not overlap; "
        f"observed {max_parallel_serial} with timeline {timeline_serial}"
    )
    _assert_no_timeline_overlap(timeline_serial)

    print("✅ PASSED: Lock Mechanism - Serial (Same Workspace)")
    print(f"  Locks serialized correctly: {elapsed:.2f}s")
    print("  Same workspace operations executed sequentially with no overlap")
    print(f"  Performance: {metrics_serial['total_duration']:.3f}s for {metrics_serial['num_workers']} tasks")


# =============================================================================
@ -176,6 +269,9 @@ async def test_backward_compatibility():
    """
    Test that legacy code without workspace parameter still works correctly.
    """
    # Purpose: Validate backward-compatible defaults when workspace arguments
    # are omitted. Scope: get_final_namespace, set/get_default_workspace and
    # initialize_pipeline_status fallback behavior.
    print("\n" + "=" * 60)
    print("TEST 3: Backward Compatibility")
    print("=" * 60)
@ -247,6 +343,9 @@ async def test_multi_workspace_concurrency():
    Test that multiple workspaces can operate concurrently without interference.
    Simulates concurrent operations on different workspaces.
    """
    # Purpose: Simulate concurrent workloads touching pipeline_status across
    # workspaces. Scope: initialize_pipeline_status, get_namespace_lock, and
    # shared dictionary mutation while ensuring isolation.
    print("\n" + "=" * 60)
    print("TEST 4: Multi-Workspace Concurrency")
    print("=" * 60)
@ -327,6 +426,9 @@ async def test_namespace_lock_reentrance():
    Test that NamespaceLock prevents re-entrance in the same coroutine
    and allows concurrent use in different coroutines.
    """
    # Purpose: Ensure NamespaceLock enforces single entry per coroutine while
    # allowing concurrent reuse through ContextVar isolation. Scope: lock
    # re-entrance checks and concurrent gather semantics.
    print("\n" + "=" * 60)
    print("TEST 5: NamespaceLock Re-entrance Protection")
    print("=" * 60)
@ -396,37 +498,29 @@ async def test_different_namespace_lock_isolation():
    """
    Test that locks for different namespaces (same workspace) are independent.
    """
    # Purpose: Confirm that namespace isolation is enforced even when workspace
    # is the same. Scope: get_namespace_lock behavior when namespaces differ.
    print("\n" + "=" * 60)
    print("TEST 6: Different Namespace Lock Isolation")
    print("=" * 60)

    print("\nTesting locks with same workspace but different namespaces")

    async def acquire_lock_timed(workspace, namespace, hold_time, name):
        """Acquire a lock and hold it for specified time"""
        lock = get_namespace_lock(namespace, workspace)
        start = time.time()
        async with lock:
            print(f"  [{name}] acquired lock at {time.time() - start:.2f}s")
            await asyncio.sleep(hold_time)
            print(f"  [{name}] releasing lock at {time.time() - start:.2f}s")

    workload = [
        ("ns_a", "same_ws", "namespace_a"),
        ("ns_b", "same_ws", "namespace_b"),
        ("ns_c", "same_ws", "namespace_c"),
    ]
    max_parallel, timeline, metrics = await _measure_lock_parallelism(workload)

    # These should run in parallel (different namespaces)
    start = time.time()
    await asyncio.gather(
        acquire_lock_timed("same_ws", "namespace_a", 0.5, "ns_a"),
        acquire_lock_timed("same_ws", "namespace_b", 0.5, "ns_b"),
        acquire_lock_timed("same_ws", "namespace_c", 0.5, "ns_c"),
    assert max_parallel >= 2, (
        "Different namespaces within the same workspace should run concurrently; "
        f"observed max concurrency {max_parallel} with timeline {timeline}"
    )
    elapsed = time.time() - start

    # If locks are properly isolated by namespace, this should take ~0.5s (parallel)
    assert (
        elapsed < 1.0
    ), f"Different namespace locks blocked each other: {elapsed:.2f}s (expected < 1.0s)"

    print("✅ PASSED: Different Namespace Lock Isolation")
    print(f"  Different namespace locks ran in parallel: {elapsed:.2f}s")
    print(f"  Different namespace locks ran in parallel (max concurrency={max_parallel})")
    print(f"  Performance: {metrics['total_duration']:.3f}s for {metrics['num_workers']} namespaces")


# =============================================================================
@ -439,10 +533,18 @@ async def test_error_handling():
    """
    Test error handling for invalid workspace configurations.
    """
    # Purpose: Validate guardrails for workspace normalization and namespace
    # derivation. Scope: set_default_workspace conversions and get_final_namespace
    # failure paths when configuration is invalid.
    print("\n" + "=" * 60)
    print("TEST 7: Error Handling")
    print("=" * 60)

    # Test 7.0: Missing default workspace should raise ValueError
    print("\nTest 7.0: Missing workspace raises ValueError")
    with pytest.raises(ValueError):
        get_final_namespace("test_namespace", workspace=None)

    # Test 7.1: set_default_workspace(None) converts to empty string
    print("\nTest 7.1: set_default_workspace(None) converts to empty string")
@ -481,6 +583,9 @@ async def test_update_flags_workspace_isolation():
    """
    Test that update flags are properly isolated between workspaces.
    """
    # Purpose: Confirm update flag setters/readers respect workspace scoping.
    # Scope: set_all_update_flags, clear_all_update_flags, get_all_update_flags_status,
    # and get_update_flag interactions across namespaces.
    print("\n" + "=" * 60)
    print("TEST 8: Update Flags Workspace Isolation")
    print("=" * 60)
@ -576,6 +681,20 @@ async def test_update_flags_workspace_isolation():
    assert (
        len(workspace2_keys) == 0
    ), f"workspace2 keys should not be present, got {len(workspace2_keys)}"
    for key, values in status1.items():
        assert all(values), f"All flags in {key} should be True, got {values}"

    # Workspace2 query should only surface workspace2 namespaces
    status2 = await get_all_update_flags_status(workspace=workspace2)
    expected_ws2_keys = {
        f"{workspace2}:{test_namespace}",
        f"{workspace2}:ns_c",
    }
    assert (
        set(status2.keys()) == expected_ws2_keys
    ), f"Unexpected namespaces for workspace2: {status2.keys()}"
    for key, values in status2.items():
        assert all(values), f"All flags in {key} should be True, got {values}"

    print("✅ PASSED: Update Flags - get_all_update_flags_status Filtering")
    print(
@ -593,6 +712,9 @@ async def test_empty_workspace_standardization():
    """
    Test that empty workspace is properly standardized to "" instead of "_".
    """
    # Purpose: Verify namespace formatting when workspace is an empty string.
    # Scope: get_final_namespace output and initialize_pipeline_status behavior
    # between empty and non-empty workspaces.
    print("\n" + "=" * 60)
    print("TEST 9: Empty Workspace Standardization")
    print("=" * 60)
@ -645,6 +767,9 @@ async def test_json_kv_storage_workspace_isolation():
    Creates two JsonKVStorage instances with different workspaces, writes different data,
    and verifies they don't mix.
    """
    # Purpose: Ensure JsonKVStorage respects workspace-specific directories and data.
    # Scope: storage initialization, upsert/get_by_id operations, and filesystem layout
    # inside the temporary working directory.
    print("\n" + "=" * 60)
    print("TEST 10: JsonKVStorage Workspace Isolation (Integration)")
    print("=" * 60)
@ -790,10 +915,12 @@ async def test_json_kv_storage_workspace_isolation():
        print(f"  Workspace directories correctly created: {ws1_dir} and {ws2_dir}")

    finally:
        # Cleanup test directory
        if os.path.exists(test_dir):
        # Cleanup test directory (unless KEEP_TEST_ARTIFACTS is set)
        if os.path.exists(test_dir) and not KEEP_TEST_ARTIFACTS:
            shutil.rmtree(test_dir)
            print(f"\n  Cleaned up test directory: {test_dir}")
        elif KEEP_TEST_ARTIFACTS:
            print(f"\n  Kept test directory for inspection: {test_dir}")


# =============================================================================
@ -808,6 +935,9 @@ async def test_lightrag_end_to_end_workspace_isolation():
    insert different data, and verify file separation.
    Uses mock LLM and embedding functions to avoid external API calls.
    """
    # Purpose: Validate that full LightRAG flows keep artifacts scoped per workspace.
    # Scope: LightRAG.initialize_storages + ainsert side effects plus filesystem
    # verification for generated storage files.
    print("\n" + "=" * 60)
    print("TEST 11: LightRAG End-to-End Workspace Isolation")
    print("=" * 60)
@ -852,12 +982,21 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le
    print("\nTest 11.1: Create two LightRAG instances with different workspaces")

    from lightrag import LightRAG
    from lightrag.utils import EmbeddingFunc
    from lightrag.utils import EmbeddingFunc, Tokenizer

    # Create different mock LLM functions for each workspace
    mock_llm_func_a = create_mock_llm_func("project_a")
    mock_llm_func_b = create_mock_llm_func("project_b")

    class _SimpleTokenizerImpl:
        def encode(self, content: str) -> list[int]:
            return [ord(ch) for ch in content]

        def decode(self, tokens: list[int]) -> str:
            return "".join(chr(t) for t in tokens)

    tokenizer = Tokenizer("mock-tokenizer", _SimpleTokenizerImpl())

    rag1 = LightRAG(
        working_dir=test_dir,
        workspace="project_a",
@ -867,6 +1006,7 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le
            max_token_size=8192,
            func=mock_embedding_func,
        ),
        tokenizer=tokenizer,
    )

    rag2 = LightRAG(

@ -878,6 +1018,7 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le
            max_token_size=8192,
            func=mock_embedding_func,
        ),
        tokenizer=tokenizer,
    )

    # Initialize storages
@ -1000,9 +1141,9 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le
        print("\n  ✓ Test complete - workspace isolation verified at E2E level")

    finally:
        # Cleanup test directory
        # if os.path.exists(test_dir):
        #     shutil.rmtree(test_dir)
        #     print(f"\n  Cleaned up test directory: {test_dir}")
        print("Keep test directory for manual inspection:")
        print(f"  {test_dir}")
        # Cleanup test directory (unless KEEP_TEST_ARTIFACTS is set)
        if os.path.exists(test_dir) and not KEEP_TEST_ARTIFACTS:
            shutil.rmtree(test_dir)
            print(f"\n  Cleaned up test directory: {test_dir}")
        elif KEEP_TEST_ARTIFACTS:
            print(f"\n  Kept test directory for inspection: {test_dir}")