diff --git a/tests/README_WORKSPACE_ISOLATION_TESTS.md b/tests/README_WORKSPACE_ISOLATION_TESTS.md new file mode 100644 index 00000000..42d84b5d --- /dev/null +++ b/tests/README_WORKSPACE_ISOLATION_TESTS.md @@ -0,0 +1,265 @@ +# Workspace Isolation Test Suite + +## Overview +Comprehensive test coverage for LightRAG's workspace isolation feature, ensuring that different workspaces (projects) can coexist independently without data contamination or resource conflicts. + +## Test Architecture + +### Design Principles +1. **Concurrency-Based Assertions**: Instead of timing-based tests (which are flaky), we measure actual concurrent lock holders +2. **Timeline Validation**: Finite state machine validates proper sequential execution +3. **Performance Metrics**: Each test reports execution metrics for debugging and optimization +4. **Configurable Stress Testing**: Environment variables control test intensity + +## Test Categories + +### 1. Data Isolation Tests +**Tests:** 1, 4, 8, 9, 10 +**Purpose:** Verify that data in one workspace doesn't leak into another + +- **Test 1: Pipeline Status Isolation** - Core shared data structures remain separate +- **Test 4: Multi-Workspace Concurrency** - Concurrent operations don't interfere +- **Test 8: Update Flags Isolation** - Flag management respects workspace boundaries +- **Test 9: Empty Workspace Standardization** - Edge case handling for empty workspace strings +- **Test 10: JsonKVStorage Integration** - Storage layer properly isolates data + +### 2. Lock Mechanism Tests +**Tests:** 2, 5, 6 +**Purpose:** Validate that locking mechanisms allow parallelism across workspaces while enforcing serialization within workspaces + +- **Test 2: Lock Mechanism** - Different workspaces run in parallel, same workspace serializes +- **Test 5: Re-entrance Protection** - Prevent deadlocks from re-entrant lock acquisition +- **Test 6: Namespace Lock Isolation** - Different namespaces within same workspace are independent + +### 3. Backward Compatibility Tests +**Test:** 3 +**Purpose:** Ensure legacy code without workspace parameters still functions correctly + +- Default workspace fallback behavior +- Empty workspace handling +- None vs empty string normalization + +### 4. Error Handling Tests +**Test:** 7 +**Purpose:** Validate guardrails for invalid configurations + +- Missing workspace validation +- Workspace normalization +- Edge case handling + +### 5. 
End-to-End Integration Tests +**Test:** 11 +**Purpose:** Validate complete LightRAG workflows maintain isolation + +- Full document insertion pipeline +- File system separation +- Data content verification + +## Running Tests + +### Basic Usage +```bash +# Run all workspace isolation tests +pytest tests/test_workspace_isolation.py -v + +# Run specific test +pytest tests/test_workspace_isolation.py::test_lock_mechanism -v + +# Run with detailed output +pytest tests/test_workspace_isolation.py -v -s +``` + +### Environment Configuration + +#### Stress Testing +Enable stress testing with configurable number of workers: +```bash +# Enable stress mode with default 3 workers +LIGHTRAG_STRESS_TEST=true pytest tests/test_workspace_isolation.py -v + +# Custom number of workers (e.g., 10) +LIGHTRAG_STRESS_TEST=true LIGHTRAG_TEST_WORKERS=10 pytest tests/test_workspace_isolation.py -v +``` + +#### Keep Test Artifacts +Preserve temporary directories for manual inspection: +```bash +# Keep test artifacts (useful for debugging) +LIGHTRAG_KEEP_ARTIFACTS=true pytest tests/test_workspace_isolation.py -v +``` + +#### Combined Example +```bash +# Stress test with 20 workers and keep artifacts +LIGHTRAG_STRESS_TEST=true \ +LIGHTRAG_TEST_WORKERS=20 \ +LIGHTRAG_KEEP_ARTIFACTS=true \ +pytest tests/test_workspace_isolation.py::test_lock_mechanism -v -s +``` + +### CI/CD Integration +```bash +# Recommended CI/CD command (no artifacts, default workers) +pytest tests/test_workspace_isolation.py -v --tb=short +``` + +## Test Implementation Details + +### Helper Functions + +#### `_measure_lock_parallelism` +Measures actual concurrency rather than wall-clock time. + +**Returns:** +- `max_parallel`: Peak number of concurrent lock holders +- `timeline`: Ordered list of (task_name, event) tuples +- `metrics`: Dict with performance data (duration, concurrency, workers) + +**Example:** +```python +workload = [ + ("task1", "workspace1", "namespace"), + ("task2", "workspace2", "namespace"), +] +max_parallel, timeline, metrics = await _measure_lock_parallelism(workload) + +# Assert on actual behavior, not timing +assert max_parallel >= 2 # Two different workspaces should run concurrently +``` + +#### `_assert_no_timeline_overlap` +Validates sequential execution using finite state machine. 
+ +**Validates:** +- No overlapping lock acquisitions +- Proper lock release ordering +- All locks properly released + +**Example:** +```python +timeline = [ + ("task1", "start"), + ("task1", "end"), + ("task2", "start"), + ("task2", "end"), +] +_assert_no_timeline_overlap(timeline) # Passes - no overlap + +timeline_bad = [ + ("task1", "start"), + ("task2", "start"), # ERROR: task2 started before task1 ended + ("task1", "end"), +] +_assert_no_timeline_overlap(timeline_bad) # Raises AssertionError +``` + +## Configuration Variables + +| Variable | Type | Default | Description | +|----------|------|---------|-------------| +| `LIGHTRAG_STRESS_TEST` | bool | `false` | Enable stress testing mode | +| `LIGHTRAG_TEST_WORKERS` | int | `3` | Number of parallel workers in stress mode | +| `LIGHTRAG_KEEP_ARTIFACTS` | bool | `false` | Keep temporary test directories | + +## Performance Benchmarks + +### Expected Performance (Reference System) +- **Test 1-9**: < 1s each +- **Test 10**: < 2s (includes file I/O) +- **Test 11**: < 5s (includes full RAG pipeline) +- **Total Suite**: < 15s + +### Stress Test Performance +With `LIGHTRAG_TEST_WORKERS=10`: +- **Test 2 (Parallel)**: ~0.05s (10 workers, all concurrent) +- **Test 2 (Serial)**: ~0.10s (2 workers, serialized) + +## Troubleshooting + +### Common Issues + +#### Flaky Test Failures +**Symptom:** Tests pass locally but fail in CI/CD +**Cause:** System under heavy load, timing-based assertions +**Solution:** Our tests use concurrency-based assertions, not timing. If failures persist, check the `timeline` output in error messages. + +#### Resource Cleanup Errors +**Symptom:** "Directory not empty" or "Cannot remove directory" +**Cause:** Concurrent test execution or OS file locking +**Solution:** Run tests serially (`pytest -n 1`) or use `LIGHTRAG_KEEP_ARTIFACTS=true` to inspect state + +#### Lock Timeout Errors +**Symptom:** "Lock acquisition timeout" +**Cause:** Deadlock or resource starvation +**Solution:** Check test output for deadlock patterns, review lock acquisition order + +### Debug Tips + +1. **Enable verbose output:** + ```bash + pytest tests/test_workspace_isolation.py -v -s + ``` + +2. **Run single test with artifacts:** + ```bash + LIGHTRAG_KEEP_ARTIFACTS=true pytest tests/test_workspace_isolation.py::test_json_kv_storage_workspace_isolation -v -s + ``` + +3. **Check performance metrics:** + Look for the "Performance:" lines in test output showing duration and concurrency. + +4. **Inspect timeline on failure:** + Timeline data is included in assertion error messages. + +## Contributing + +### Adding New Tests + +1. **Follow naming convention:** `test__` +2. **Add purpose/scope comments:** Explain what and why +3. **Use helper functions:** `_measure_lock_parallelism`, `_assert_no_timeline_overlap` +4. **Document assertions:** Explain expected behavior in assertions +5. **Update this README:** Add test to appropriate category + +### Test Template +```python +@pytest.mark.asyncio +async def test_new_feature(): + """ + Brief description of what this test validates. + """ + # Purpose: Why this test exists + # Scope: What functions/classes this tests + print("\n" + "=" * 60) + print("TEST N: Feature Name") + print("=" * 60) + + # Test implementation + # ... 
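+    # Illustrative sketch only (the names and workload values below are placeholders,
+    # not real workspaces); lock-related tests can reuse the shared helpers, e.g.:
+    #   workload = [("task_a", "ws_a", "namespace"), ("task_b", "ws_b", "namespace")]
+    #   max_parallel, timeline, metrics = await _measure_lock_parallelism(workload)
+    #   assert max_parallel >= 2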
+ + print("✅ PASSED: Feature Name") + print(f" Validation details") +``` + +## Related Documentation + +- [Workspace Isolation Design Doc](../docs/LightRAG_concurrent_explain.md) +- [Project Intelligence](.clinerules/01-basic.md) +- [Memory Bank](../.memory-bank/) + +## Test Coverage Matrix + +| Component | Data Isolation | Lock Mechanism | Backward Compat | Error Handling | E2E | +|-----------|:--------------:|:--------------:|:---------------:|:--------------:|:---:| +| shared_storage | ✅ T1, T4 | ✅ T2, T5, T6 | ✅ T3 | ✅ T7 | ✅ T11 | +| update_flags | ✅ T8 | - | - | - | - | +| JsonKVStorage | ✅ T10 | - | - | - | ✅ T11 | +| LightRAG Core | - | - | - | - | ✅ T11 | +| Namespace | ✅ T9 | - | ✅ T3 | ✅ T7 | - | + +**Legend:** T# = Test number + +## Version History + +- **v2.0** (2025-01-18): Added performance metrics, stress testing, configurable cleanup +- **v1.0** (Initial): Basic workspace isolation tests with timing-based assertions diff --git a/tests/test_workspace_isolation.py b/tests/test_workspace_isolation.py index f962a300..7a378e9c 100644 --- a/tests/test_workspace_isolation.py +++ b/tests/test_workspace_isolation.py @@ -26,12 +26,14 @@ import tempfile import numpy as np import pytest from pathlib import Path +from typing import List, Tuple, Dict from lightrag.kg.shared_storage import ( get_final_namespace, get_namespace_lock, get_default_workspace, set_default_workspace, initialize_share_data, + finalize_share_data, initialize_pipeline_status, get_namespace_data, set_all_update_flags, @@ -41,6 +43,16 @@ from lightrag.kg.shared_storage import ( ) +# ============================================================================= +# Test Configuration +# ============================================================================= + +# Stress test configuration (enable via environment variable) +STRESS_TEST_MODE = os.getenv("LIGHTRAG_STRESS_TEST", "false").lower() == "true" +PARALLEL_WORKERS = int(os.getenv("LIGHTRAG_TEST_WORKERS", "3")) +KEEP_TEST_ARTIFACTS = os.getenv("LIGHTRAG_KEEP_ARTIFACTS", "false").lower() == "true" + + # ============================================================================= # Pytest Fixtures # ============================================================================= @@ -51,7 +63,85 @@ def setup_shared_data(): """Initialize shared data before each test""" initialize_share_data() yield - # Cleanup after test if needed + finalize_share_data() + + +async def _measure_lock_parallelism( + workload: List[Tuple[str, str, str]], hold_time: float = 0.05 +) -> Tuple[int, List[Tuple[str, str]], Dict[str, float]]: + """Run lock acquisition workload and capture peak concurrency and timeline. + + Args: + workload: List of (name, workspace, namespace) tuples + hold_time: How long each worker holds the lock (seconds) + + Returns: + Tuple of (max_parallel, timeline, metrics) where: + - max_parallel: Peak number of concurrent lock holders + - timeline: List of (name, event) tuples tracking execution order + - metrics: Dict with performance metrics (total_duration, max_concurrency, etc.) 
+ """ + + running = 0 + max_parallel = 0 + timeline: List[Tuple[str, str]] = [] + start_time = time.time() + + async def worker(name: str, workspace: str, namespace: str) -> None: + nonlocal running, max_parallel + lock = get_namespace_lock(namespace, workspace) + async with lock: + running += 1 + max_parallel = max(max_parallel, running) + timeline.append((name, "start")) + await asyncio.sleep(hold_time) + timeline.append((name, "end")) + running -= 1 + + await asyncio.gather(*(worker(*args) for args in workload)) + + metrics = { + "total_duration": time.time() - start_time, + "max_concurrency": max_parallel, + "avg_hold_time": hold_time, + "num_workers": len(workload), + } + + return max_parallel, timeline, metrics + + +def _assert_no_timeline_overlap(timeline: List[Tuple[str, str]]) -> None: + """Ensure that timeline events never overlap for sequential execution. + + This function implements a finite state machine that validates: + - No overlapping lock acquisitions (only one task active at a time) + - Proper lock release order (task releases its own lock) + - All locks are properly released + + Args: + timeline: List of (name, event) tuples where event is "start" or "end" + + Raises: + AssertionError: If timeline shows overlapping execution or improper locking + """ + + active_task = None + for name, event in timeline: + if event == "start": + if active_task is not None: + raise AssertionError( + f"Task '{name}' started before '{active_task}' released the lock" + ) + active_task = name + else: + if active_task != name: + raise AssertionError( + f"Task '{name}' finished while '{active_task}' was expected to hold the lock" + ) + active_task = None + + if active_task is not None: + raise AssertionError(f"Task '{active_task}' did not release the lock properly") # ============================================================================= @@ -64,6 +154,8 @@ async def test_pipeline_status_isolation(): """ Test that pipeline status is isolated between different workspaces. """ + # Purpose: Ensure pipeline_status shared data remains unique per workspace. + # Scope: initialize_pipeline_status and get_namespace_data interactions. print("\n" + "=" * 60) print("TEST 1: Pipeline Status Isolation") print("=" * 60) @@ -118,52 +210,53 @@ async def test_lock_mechanism(): Tests both parallel execution for different workspaces and serialization for the same workspace. """ + # Purpose: Validate that keyed locks isolate workspaces while serializing + # requests within the same workspace. Scope: get_namespace_lock scheduling + # semantics for both cross-workspace and single-workspace cases. 
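+    # Note: the assertions below key off measured concurrency (max_parallel) and the
+    # recorded timeline rather than wall-clock timing, so results stay stable on
+    # heavily loaded CI machines.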
print("\n" + "=" * 60) print("TEST 2: Lock Mechanism (No Deadlocks)") print("=" * 60) # Test 2.1: Different workspaces should run in parallel print("\nTest 2.1: Different workspaces locks should be parallel") - - async def acquire_lock_timed(workspace, namespace, hold_time): - """Acquire a lock and hold it for specified time""" - lock = get_namespace_lock(namespace, workspace) - start = time.time() - async with lock: - print(f" [{workspace}] acquired lock at {time.time() - start:.2f}s") - await asyncio.sleep(hold_time) - print(f" [{workspace}] releasing lock at {time.time() - start:.2f}s") - - start = time.time() - await asyncio.gather( - acquire_lock_timed("ws_a", "test_namespace", 0.5), - acquire_lock_timed("ws_b", "test_namespace", 0.5), - acquire_lock_timed("ws_c", "test_namespace", 0.5), + + # Support stress testing with configurable number of workers + num_workers = PARALLEL_WORKERS if STRESS_TEST_MODE else 3 + parallel_workload = [ + (f"ws_{chr(97+i)}", f"ws_{chr(97+i)}", "test_namespace") + for i in range(num_workers) + ] + + max_parallel, timeline_parallel, metrics = await _measure_lock_parallelism( + parallel_workload + ) + assert max_parallel >= 2, ( + "Locks for distinct workspaces should overlap; " + f"observed max concurrency: {max_parallel}, timeline={timeline_parallel}" ) - elapsed = time.time() - start - - # If locks are properly isolated by workspace, this should take ~0.5s (parallel) - # If they block each other, it would take ~1.5s (serial) - assert elapsed < 1.0, f"Locks blocked each other: {elapsed:.2f}s (expected < 1.0s)" print("✅ PASSED: Lock Mechanism - Parallel (Different Workspaces)") - print(f" Locks ran in parallel: {elapsed:.2f}s") + print(f" Locks overlapped for different workspaces (max concurrency={max_parallel})") + print(f" Performance: {metrics['total_duration']:.3f}s for {metrics['num_workers']} workers") # Test 2.2: Same workspace should serialize print("\nTest 2.2: Same workspace locks should serialize") - - start = time.time() - await asyncio.gather( - acquire_lock_timed("ws_same", "test_namespace", 0.3), - acquire_lock_timed("ws_same", "test_namespace", 0.3), + serial_workload = [ + ("serial_run_1", "ws_same", "test_namespace"), + ("serial_run_2", "ws_same", "test_namespace"), + ] + max_parallel_serial, timeline_serial, metrics_serial = await _measure_lock_parallelism( + serial_workload ) - elapsed = time.time() - start - - # Same workspace should serialize, taking ~0.6s - assert elapsed >= 0.5, f"Locks didn't serialize: {elapsed:.2f}s (expected >= 0.5s)" + assert max_parallel_serial == 1, ( + "Same workspace locks should not overlap; " + f"observed {max_parallel_serial} with timeline {timeline_serial}" + ) + _assert_no_timeline_overlap(timeline_serial) print("✅ PASSED: Lock Mechanism - Serial (Same Workspace)") - print(f" Locks serialized correctly: {elapsed:.2f}s") + print(" Same workspace operations executed sequentially with no overlap") + print(f" Performance: {metrics_serial['total_duration']:.3f}s for {metrics_serial['num_workers']} tasks") # ============================================================================= @@ -176,6 +269,9 @@ async def test_backward_compatibility(): """ Test that legacy code without workspace parameter still works correctly. """ + # Purpose: Validate backward-compatible defaults when workspace arguments + # are omitted. Scope: get_final_namespace, set/get_default_workspace and + # initialize_pipeline_status fallback behavior. 
print("\n" + "=" * 60) print("TEST 3: Backward Compatibility") print("=" * 60) @@ -247,6 +343,9 @@ async def test_multi_workspace_concurrency(): Test that multiple workspaces can operate concurrently without interference. Simulates concurrent operations on different workspaces. """ + # Purpose: Simulate concurrent workloads touching pipeline_status across + # workspaces. Scope: initialize_pipeline_status, get_namespace_lock, and + # shared dictionary mutation while ensuring isolation. print("\n" + "=" * 60) print("TEST 4: Multi-Workspace Concurrency") print("=" * 60) @@ -327,6 +426,9 @@ async def test_namespace_lock_reentrance(): Test that NamespaceLock prevents re-entrance in the same coroutine and allows concurrent use in different coroutines. """ + # Purpose: Ensure NamespaceLock enforces single entry per coroutine while + # allowing concurrent reuse through ContextVar isolation. Scope: lock + # re-entrance checks and concurrent gather semantics. print("\n" + "=" * 60) print("TEST 5: NamespaceLock Re-entrance Protection") print("=" * 60) @@ -396,37 +498,29 @@ async def test_different_namespace_lock_isolation(): """ Test that locks for different namespaces (same workspace) are independent. """ + # Purpose: Confirm that namespace isolation is enforced even when workspace + # is the same. Scope: get_namespace_lock behavior when namespaces differ. print("\n" + "=" * 60) print("TEST 6: Different Namespace Lock Isolation") print("=" * 60) print("\nTesting locks with same workspace but different namespaces") - async def acquire_lock_timed(workspace, namespace, hold_time, name): - """Acquire a lock and hold it for specified time""" - lock = get_namespace_lock(namespace, workspace) - start = time.time() - async with lock: - print(f" [{name}] acquired lock at {time.time() - start:.2f}s") - await asyncio.sleep(hold_time) - print(f" [{name}] releasing lock at {time.time() - start:.2f}s") + workload = [ + ("ns_a", "same_ws", "namespace_a"), + ("ns_b", "same_ws", "namespace_b"), + ("ns_c", "same_ws", "namespace_c"), + ] + max_parallel, timeline, metrics = await _measure_lock_parallelism(workload) - # These should run in parallel (different namespaces) - start = time.time() - await asyncio.gather( - acquire_lock_timed("same_ws", "namespace_a", 0.5, "ns_a"), - acquire_lock_timed("same_ws", "namespace_b", 0.5, "ns_b"), - acquire_lock_timed("same_ws", "namespace_c", 0.5, "ns_c"), + assert max_parallel >= 2, ( + "Different namespaces within the same workspace should run concurrently; " + f"observed max concurrency {max_parallel} with timeline {timeline}" ) - elapsed = time.time() - start - - # If locks are properly isolated by namespace, this should take ~0.5s (parallel) - assert ( - elapsed < 1.0 - ), f"Different namespace locks blocked each other: {elapsed:.2f}s (expected < 1.0s)" print("✅ PASSED: Different Namespace Lock Isolation") - print(f" Different namespace locks ran in parallel: {elapsed:.2f}s") + print(f" Different namespace locks ran in parallel (max concurrency={max_parallel})") + print(f" Performance: {metrics['total_duration']:.3f}s for {metrics['num_workers']} namespaces") # ============================================================================= @@ -439,10 +533,18 @@ async def test_error_handling(): """ Test error handling for invalid workspace configurations. """ + # Purpose: Validate guardrails for workspace normalization and namespace + # derivation. Scope: set_default_workspace conversions and get_final_namespace + # failure paths when configuration is invalid. 
print("\n" + "=" * 60) print("TEST 7: Error Handling") print("=" * 60) + # Test 7.0: Missing default workspace should raise ValueError + print("\nTest 7.0: Missing workspace raises ValueError") + with pytest.raises(ValueError): + get_final_namespace("test_namespace", workspace=None) + # Test 7.1: set_default_workspace(None) converts to empty string print("\nTest 7.1: set_default_workspace(None) converts to empty string") @@ -481,6 +583,9 @@ async def test_update_flags_workspace_isolation(): """ Test that update flags are properly isolated between workspaces. """ + # Purpose: Confirm update flag setters/readers respect workspace scoping. + # Scope: set_all_update_flags, clear_all_update_flags, get_all_update_flags_status, + # and get_update_flag interactions across namespaces. print("\n" + "=" * 60) print("TEST 8: Update Flags Workspace Isolation") print("=" * 60) @@ -576,6 +681,20 @@ async def test_update_flags_workspace_isolation(): assert ( len(workspace2_keys) == 0 ), f"workspace2 keys should not be present, got {len(workspace2_keys)}" + for key, values in status1.items(): + assert all(values), f"All flags in {key} should be True, got {values}" + + # Workspace2 query should only surface workspace2 namespaces + status2 = await get_all_update_flags_status(workspace=workspace2) + expected_ws2_keys = { + f"{workspace2}:{test_namespace}", + f"{workspace2}:ns_c", + } + assert ( + set(status2.keys()) == expected_ws2_keys + ), f"Unexpected namespaces for workspace2: {status2.keys()}" + for key, values in status2.items(): + assert all(values), f"All flags in {key} should be True, got {values}" print("✅ PASSED: Update Flags - get_all_update_flags_status Filtering") print( @@ -593,6 +712,9 @@ async def test_empty_workspace_standardization(): """ Test that empty workspace is properly standardized to "" instead of "_". """ + # Purpose: Verify namespace formatting when workspace is an empty string. + # Scope: get_final_namespace output and initialize_pipeline_status behavior + # between empty and non-empty workspaces. print("\n" + "=" * 60) print("TEST 9: Empty Workspace Standardization") print("=" * 60) @@ -645,6 +767,9 @@ async def test_json_kv_storage_workspace_isolation(): Creates two JsonKVStorage instances with different workspaces, writes different data, and verifies they don't mix. """ + # Purpose: Ensure JsonKVStorage respects workspace-specific directories and data. + # Scope: storage initialization, upsert/get_by_id operations, and filesystem layout + # inside the temporary working directory. print("\n" + "=" * 60) print("TEST 10: JsonKVStorage Workspace Isolation (Integration)") print("=" * 60) @@ -790,10 +915,12 @@ async def test_json_kv_storage_workspace_isolation(): print(f" Workspace directories correctly created: {ws1_dir} and {ws2_dir}") finally: - # Cleanup test directory - if os.path.exists(test_dir): + # Cleanup test directory (unless KEEP_TEST_ARTIFACTS is set) + if os.path.exists(test_dir) and not KEEP_TEST_ARTIFACTS: shutil.rmtree(test_dir) print(f"\n Cleaned up test directory: {test_dir}") + elif KEEP_TEST_ARTIFACTS: + print(f"\n Kept test directory for inspection: {test_dir}") # ============================================================================= @@ -808,6 +935,9 @@ async def test_lightrag_end_to_end_workspace_isolation(): insert different data, and verify file separation. Uses mock LLM and embedding functions to avoid external API calls. """ + # Purpose: Validate that full LightRAG flows keep artifacts scoped per workspace. 
+ # Scope: LightRAG.initialize_storages + ainsert side effects plus filesystem + # verification for generated storage files. print("\n" + "=" * 60) print("TEST 11: LightRAG End-to-End Workspace Isolation") print("=" * 60) @@ -852,12 +982,21 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le print("\nTest 11.1: Create two LightRAG instances with different workspaces") from lightrag import LightRAG - from lightrag.utils import EmbeddingFunc + from lightrag.utils import EmbeddingFunc, Tokenizer # Create different mock LLM functions for each workspace mock_llm_func_a = create_mock_llm_func("project_a") mock_llm_func_b = create_mock_llm_func("project_b") + class _SimpleTokenizerImpl: + def encode(self, content: str) -> list[int]: + return [ord(ch) for ch in content] + + def decode(self, tokens: list[int]) -> str: + return "".join(chr(t) for t in tokens) + + tokenizer = Tokenizer("mock-tokenizer", _SimpleTokenizerImpl()) + rag1 = LightRAG( working_dir=test_dir, workspace="project_a", @@ -867,6 +1006,7 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le max_token_size=8192, func=mock_embedding_func, ), + tokenizer=tokenizer, ) rag2 = LightRAG( @@ -878,6 +1018,7 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le max_token_size=8192, func=mock_embedding_func, ), + tokenizer=tokenizer, ) # Initialize storages @@ -1000,9 +1141,9 @@ relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Le print("\n ✓ Test complete - workspace isolation verified at E2E level") finally: - # Cleanup test directory - # if os.path.exists(test_dir): - # shutil.rmtree(test_dir) - # print(f"\n Cleaned up test directory: {test_dir}") - print("Keep test directory for manual inspection:") - print(f" {test_dir}") + # Cleanup test directory (unless KEEP_TEST_ARTIFACTS is set) + if os.path.exists(test_dir) and not KEEP_TEST_ARTIFACTS: + shutil.rmtree(test_dir) + print(f"\n Cleaned up test directory: {test_dir}") + elif KEEP_TEST_ARTIFACTS: + print(f"\n Kept test directory for inspection: {test_dir}")