From d011a1c0e7fe1ca9db9eec7f25d661a1f5ee017f Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 18 Nov 2025 10:31:53 +0800 Subject: [PATCH] Refactor test configuration to use pytest fixtures and CLI options MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit • Add pytest command-line options • Create session-scoped fixtures • Remove hardcoded environment vars • Update test function signatures • Improve configuration priority (cherry picked from commit 1fe05df211c495764dbce2edc47b820c24bdafe9) --- tests/conftest.py | 392 ++------ tests/test_workspace_isolation.py | 1448 +++++++++++++---------------- 2 files changed, 729 insertions(+), 1111 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 61376575..41db438d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,351 +1,85 @@ """ -Pytest configuration and fixtures for multi-tenant testing. +Pytest configuration for LightRAG tests. -Provides: -- Database fixtures for different testing modes -- Tenant and KB context fixtures -- Mock LLM and embedding services -- Multi-tenant test utilities +This file provides command-line options and fixtures for test configuration. """ -import os import pytest -import asyncio -import psycopg2 -import json -from typing import Dict, List, Optional, Generator -from contextlib import contextmanager -from datetime import datetime -from unittest.mock import MagicMock, patch -import uuid - -# ============================================================================ -# Environment and Mode Detection -# ============================================================================ - -MULTITENANT_MODE = os.getenv("MULTITENANT_MODE", "demo") -POSTGRES_HOST = os.getenv("POSTGRES_HOST", "localhost") -POSTGRES_PORT = int(os.getenv("POSTGRES_PORT", "5432")) -POSTGRES_USER = os.getenv("POSTGRES_USER", "lightrag") -POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "lightrag_secure_password") -POSTGRES_DATABASE = os.getenv("POSTGRES_DATABASE", "lightrag_multitenant") -# ============================================================================ -# Database Connection Management -# ============================================================================ +def pytest_addoption(parser): + """Add custom command-line options for LightRAG tests.""" -@pytest.fixture(scope="session") -def db_connection_string(): - """Generate PostgreSQL connection string.""" - return f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DATABASE}" + parser.addoption( + "--keep-artifacts", + action="store_true", + default=False, + help="Keep test artifacts (temporary directories and files) after test completion for inspection", + ) + + parser.addoption( + "--stress-test", + action="store_true", + default=False, + help="Enable stress test mode with more intensive workloads", + ) + + parser.addoption( + "--test-workers", + action="store", + default=3, + type=int, + help="Number of parallel workers for stress tests (default: 3)", + ) @pytest.fixture(scope="session") -def postgres_connection(): - """Create persistent PostgreSQL connection for session.""" - try: - conn = psycopg2.connect( - host=POSTGRES_HOST, - port=POSTGRES_PORT, - user=POSTGRES_USER, - password=POSTGRES_PASSWORD, - database=POSTGRES_DATABASE - ) - conn.autocommit = False - yield conn - conn.close() - except psycopg2.Error as e: - pytest.skip(f"PostgreSQL not available: {e}") +def keep_test_artifacts(request): + """ + Fixture to determine whether to keep test artifacts. 
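+    Enable it with "pytest --keep-artifacts" or by exporting
+    LIGHTRAG_KEEP_ARTIFACTS=true; temporary directories and files are then
+    left on disk for inspection.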
+ Priority: CLI option > Environment variable > Default (False) + """ + import os -@contextmanager -def database_transaction(postgres_connection): - """Context manager for database transactions with rollback.""" - cursor = postgres_connection.cursor() - try: - yield cursor - postgres_connection.commit() - except Exception as e: - postgres_connection.rollback() - raise e - finally: - cursor.close() + # Check CLI option first + if request.config.getoption("--keep-artifacts"): + return True + # Fall back to environment variable + return os.getenv("LIGHTRAG_KEEP_ARTIFACTS", "false").lower() == "true" -# ============================================================================ -# Mode-Specific Fixtures -# ============================================================================ - -@pytest.fixture -def testing_mode(): - """Return current testing mode.""" - return MULTITENANT_MODE - - -@pytest.fixture -def is_compatibility_mode(): - """Check if running in compatibility mode (MULTITENANT_MODE=off).""" - return MULTITENANT_MODE == "off" - - -@pytest.fixture -def is_single_tenant_mode(): - """Check if running in single-tenant mode (MULTITENANT_MODE=on).""" - return MULTITENANT_MODE == "on" - - -@pytest.fixture -def is_demo_mode(): - """Check if running in demo mode (MULTITENANT_MODE=demo).""" - return MULTITENANT_MODE == "demo" - - -# ============================================================================ -# Tenant and KB Fixtures -# ============================================================================ - -@pytest.fixture -def demo_tenant_acme(): - """Acme Corp tenant for demo mode.""" - return { - "tenant_id": "acme-corp", - "name": "Acme Corporation", - "kbs": ["kb-prod", "kb-dev"] - } - - -@pytest.fixture -def demo_tenant_techstart(): - """TechStart tenant for demo mode.""" - return { - "tenant_id": "techstart", - "name": "TechStart Inc", - "kbs": ["kb-main", "kb-backup"] - } - - -@pytest.fixture -def default_tenant(): - """Default tenant for compatibility and on modes.""" - return { - "tenant_id": "default", - "name": "Default Tenant", - "kbs": ["default"] - } - - -@pytest.fixture -def test_tenant_1(): - """Test tenant 1 for single-tenant mode.""" - return { - "tenant_id": "tenant-1", - "name": "Test Tenant 1", - "kbs": ["kb-default", "kb-secondary", "kb-experimental"] - } - - -# ============================================================================ -# Test Data Fixtures -# ============================================================================ - -@pytest.fixture -def sample_document(): - """Sample document for testing.""" - return { - "title": "Test Document", - "content": "This is a test document for LightRAG multi-tenant testing.", - "file_type": "text", - "metadata": { - "source": "test", - "version": "1.0" - } - } - - -@pytest.fixture -def sample_entity(): - """Sample entity for testing.""" - return { - "name": "TestEntity", - "type": "Person", - "description": "A test entity for multi-tenant isolation testing", - "metadata": { - "test": True, - "created_by": "pytest" - } - } - - -@pytest.fixture -def sample_relation(): - """Sample relation for testing.""" - return { - "source_entity": "Entity1", - "target_entity": "Entity2", - "relation_type": "knows", - "description": "Test relationship between entities", - "weight": 0.8 - } - - -# ============================================================================ -# Database Query Helpers -# ============================================================================ - -class DatabaseHelper: - """Helper class for 
database operations in tests.""" - - def __init__(self, connection): - self.connection = connection - - def execute_query(self, query: str, params: tuple = ()) -> List[Dict]: - """Execute a SELECT query and return results.""" - with database_transaction(self.connection) as cursor: - cursor.execute(query, params) - columns = [desc[0] for desc in cursor.description] - return [dict(zip(columns, row)) for row in cursor.fetchall()] - - def execute_insert(self, table: str, data: Dict) -> None: - """Insert a row into a table.""" - columns = ", ".join(data.keys()) - placeholders = ", ".join(["%s"] * len(data)) - query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})" - with database_transaction(self.connection) as cursor: - cursor.execute(query, tuple(data.values())) - - def execute_delete(self, table: str, where: Dict) -> int: - """Delete rows from a table.""" - where_clause = " AND ".join([f"{k} = %s" for k in where.keys()]) - query = f"DELETE FROM {table} WHERE {where_clause}" - with database_transaction(self.connection) as cursor: - cursor.execute(query, tuple(where.values())) - return cursor.rowcount - - def count_documents(self, tenant_id: str, kb_id: str) -> int: - """Count documents for a tenant/KB.""" - query = "SELECT COUNT(*) as count FROM documents WHERE tenant_id = %s AND kb_id = %s" - result = self.execute_query(query, (tenant_id, kb_id)) - return result[0]["count"] if result else 0 - - def count_entities(self, tenant_id: str, kb_id: str) -> int: - """Count entities for a tenant/KB.""" - query = "SELECT COUNT(*) as count FROM entities WHERE tenant_id = %s AND kb_id = %s" - result = self.execute_query(query, (tenant_id, kb_id)) - return result[0]["count"] if result else 0 - - def get_all_documents(self, tenant_id: str, kb_id: str) -> List[Dict]: - """Get all documents for a tenant/KB.""" - query = "SELECT * FROM documents WHERE tenant_id = %s AND kb_id = %s ORDER BY created_at DESC" - return self.execute_query(query, (tenant_id, kb_id)) - - def get_all_entities(self, tenant_id: str, kb_id: str) -> List[Dict]: - """Get all entities for a tenant/KB.""" - query = "SELECT * FROM entities WHERE tenant_id = %s AND kb_id = %s ORDER BY created_at DESC" - return self.execute_query(query, (tenant_id, kb_id)) - - def verify_tenant_isolation(self, tenant_id: str) -> bool: - """Verify that no cross-tenant data exists when querying this tenant.""" - # Check that all documents belong to this tenant - query = """ - SELECT COUNT(*) as count FROM documents - WHERE tenant_id != %s AND EXISTS ( - SELECT 1 FROM documents d2 - WHERE d2.tenant_id = %s AND d2.id = documents.id - ) - """ - result = self.execute_query(query, (tenant_id, tenant_id)) - return result[0]["count"] == 0 if result else True - - def clear_tenant_data(self, tenant_id: str, kb_id: Optional[str] = None) -> None: - """Clear all data for a tenant/KB.""" - tables = ["document_status", "embeddings", "documents", "entities", "relations"] - - for table in tables: - if kb_id: - where = {"tenant_id": tenant_id, "kb_id": kb_id} - else: - where = {"tenant_id": tenant_id} - self.execute_delete(table, where) - - -@pytest.fixture -def db_helper(postgres_connection): - """Provide database helper for tests.""" - return DatabaseHelper(postgres_connection) - - -# ============================================================================ -# Mock Services -# ============================================================================ - -@pytest.fixture -def mock_llm_service(): - """Mock LLM service for testing.""" - mock = MagicMock() - 
mock.generate = MagicMock(return_value="Mock LLM response") - mock.extract_entities = MagicMock(return_value=["Entity1", "Entity2"]) - return mock - - -@pytest.fixture -def mock_embedding_service(): - """Mock embedding service for testing.""" - mock = MagicMock() - mock.embed_text = MagicMock(return_value=[0.1] * 1024) # 1024-dim vector - mock.embed_batch = MagicMock(return_value=[[0.1] * 1024 for _ in range(10)]) - return mock - - -# ============================================================================ -# Async Event Loop -# ============================================================================ @pytest.fixture(scope="session") -def event_loop(): - """Create event loop for async tests.""" - loop = asyncio.get_event_loop_policy().new_event_loop() - yield loop - loop.close() +def stress_test_mode(request): + """ + Fixture to determine whether stress test mode is enabled. + + Priority: CLI option > Environment variable > Default (False) + """ + import os + + # Check CLI option first + if request.config.getoption("--stress-test"): + return True + + # Fall back to environment variable + return os.getenv("LIGHTRAG_STRESS_TEST", "false").lower() == "true" -# ============================================================================ -# Markers and Parametrization -# ============================================================================ +@pytest.fixture(scope="session") +def parallel_workers(request): + """ + Fixture to determine the number of parallel workers for stress tests. -def pytest_configure(config): - """Register custom pytest markers.""" - config.addinivalue_line( - "markers", "compatibility: mark test to run only in compatibility mode" - ) - config.addinivalue_line( - "markers", "single_tenant: mark test to run only in single-tenant mode" - ) - config.addinivalue_line( - "markers", "multi_tenant: mark test to run only in demo/multi-tenant mode" - ) - config.addinivalue_line( - "markers", "database: mark test that requires database connection" - ) - config.addinivalue_line( - "markers", "isolation: mark test that verifies data isolation" - ) + Priority: CLI option > Environment variable > Default (3) + """ + import os + # Check CLI option first + cli_workers = request.config.getoption("--test-workers") + if cli_workers != 3: # Non-default value provided + return cli_workers -# ============================================================================ -# Test Collection Hooks -# ============================================================================ - -def pytest_collection_modifyitems(config, items): - """Skip tests based on testing mode.""" - skip_compatibility = pytest.mark.skip(reason="Not in compatibility mode") - skip_single_tenant = pytest.mark.skip(reason="Not in single-tenant mode") - skip_multi_tenant = pytest.mark.skip(reason="Not in multi-tenant mode") - - for item in items: - if "compatibility" in item.keywords and MULTITENANT_MODE != "off": - item.add_marker(skip_compatibility) - if "single_tenant" in item.keywords and MULTITENANT_MODE != "on": - item.add_marker(skip_single_tenant) - if "multi_tenant" in item.keywords and MULTITENANT_MODE != "demo": - item.add_marker(skip_multi_tenant) + # Fall back to environment variable + return int(os.getenv("LIGHTRAG_TEST_WORKERS", "3")) diff --git a/tests/test_workspace_isolation.py b/tests/test_workspace_isolation.py index bc765633..262b414b 100644 --- a/tests/test_workspace_isolation.py +++ b/tests/test_workspace_isolation.py @@ -1,12 +1,21 @@ #!/usr/bin/env python """ -Test script for PR #2366: Workspace 
Isolation Feature +Test script for Workspace Isolation Feature -Tests the 4 key scenarios mentioned in PR description: -1. Multi-Workspace Concurrency Test -2. Pipeline Status Isolation Test -3. Backward Compatibility Test -4. Lock Mechanism Test +Comprehensive test suite covering workspace isolation in LightRAG: +1. Pipeline Status Isolation - Data isolation between workspaces +2. Lock Mechanism - Parallel execution for different workspaces, serial for same workspace +3. Backward Compatibility - Legacy code without workspace parameters +4. Multi-Workspace Concurrency - Concurrent operations on different workspaces +5. NamespaceLock Re-entrance Protection - Prevents deadlocks +6. Different Namespace Lock Isolation - Locks isolated by namespace +7. Error Handling - Invalid workspace configurations +8. Update Flags Workspace Isolation - Update flags properly isolated +9. Empty Workspace Standardization - Empty workspace handling +10. JsonKVStorage Workspace Isolation - Integration test for KV storage +11. LightRAG End-to-End Workspace Isolation - Complete E2E test with two instances + +Total: 11 test scenarios """ import asyncio @@ -15,13 +24,16 @@ import os import shutil import tempfile import numpy as np +import pytest from pathlib import Path +from typing import List, Tuple, Dict from lightrag.kg.shared_storage import ( get_final_namespace, get_namespace_lock, get_default_workspace, set_default_workspace, initialize_share_data, + finalize_share_data, initialize_pipeline_status, get_namespace_data, set_all_update_flags, @@ -31,37 +43,105 @@ from lightrag.kg.shared_storage import ( ) -class TestResults: - """Track test results""" +# ============================================================================= +# Test Configuration +# ============================================================================= - def __init__(self): - self.results = [] - - def add(self, test_name, passed, message=""): - self.results.append({"name": test_name, "passed": passed, "message": message}) - status = "✅ PASSED" if passed else "❌ FAILED" - print(f"\n{status}: {test_name}") - if message: - print(f" {message}") - - def summary(self): - print("\n" + "=" * 60) - print("TEST SUMMARY") - print("=" * 60) - passed = sum(1 for r in self.results if r["passed"]) - total = len(self.results) - print(f"Passed: {passed}/{total}") - print() - for r in self.results: - status = "✅" if r["passed"] else "❌" - print(f"{status} {r['name']}") - if r["message"]: - print(f" {r['message']}") - print("=" * 60) - return passed == total +# Test configuration is handled via pytest fixtures in conftest.py +# - Use CLI options: --keep-artifacts, --stress-test, --test-workers=N +# - Or environment variables: LIGHTRAG_KEEP_ARTIFACTS, LIGHTRAG_STRESS_TEST, LIGHTRAG_TEST_WORKERS +# Priority: CLI options > Environment variables > Default values -results = TestResults() +# ============================================================================= +# Pytest Fixtures +# ============================================================================= + + +@pytest.fixture(autouse=True) +def setup_shared_data(): + """Initialize shared data before each test""" + initialize_share_data() + yield + finalize_share_data() + + +async def _measure_lock_parallelism( + workload: List[Tuple[str, str, str]], hold_time: float = 0.05 +) -> Tuple[int, List[Tuple[str, str]], Dict[str, float]]: + """Run lock acquisition workload and capture peak concurrency and timeline. 
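+    Shared by the lock tests below (cross-workspace, same-workspace, and
+    cross-namespace cases); callers assert on max_parallel to distinguish
+    overlapping lock holds from serialized ones.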
+ + Args: + workload: List of (name, workspace, namespace) tuples + hold_time: How long each worker holds the lock (seconds) + + Returns: + Tuple of (max_parallel, timeline, metrics) where: + - max_parallel: Peak number of concurrent lock holders + - timeline: List of (name, event) tuples tracking execution order + - metrics: Dict with performance metrics (total_duration, max_concurrency, etc.) + """ + + running = 0 + max_parallel = 0 + timeline: List[Tuple[str, str]] = [] + start_time = time.time() + + async def worker(name: str, workspace: str, namespace: str) -> None: + nonlocal running, max_parallel + lock = get_namespace_lock(namespace, workspace) + async with lock: + running += 1 + max_parallel = max(max_parallel, running) + timeline.append((name, "start")) + await asyncio.sleep(hold_time) + timeline.append((name, "end")) + running -= 1 + + await asyncio.gather(*(worker(*args) for args in workload)) + + metrics = { + "total_duration": time.time() - start_time, + "max_concurrency": max_parallel, + "avg_hold_time": hold_time, + "num_workers": len(workload), + } + + return max_parallel, timeline, metrics + + +def _assert_no_timeline_overlap(timeline: List[Tuple[str, str]]) -> None: + """Ensure that timeline events never overlap for sequential execution. + + This function implements a finite state machine that validates: + - No overlapping lock acquisitions (only one task active at a time) + - Proper lock release order (task releases its own lock) + - All locks are properly released + + Args: + timeline: List of (name, event) tuples where event is "start" or "end" + + Raises: + AssertionError: If timeline shows overlapping execution or improper locking + """ + + active_task = None + for name, event in timeline: + if event == "start": + if active_task is not None: + raise AssertionError( + f"Task '{name}' started before '{active_task}' released the lock" + ) + active_task = name + else: + if active_task != name: + raise AssertionError( + f"Task '{name}' finished while '{active_task}' was expected to hold the lock" + ) + active_task = None + + if active_task is not None: + raise AssertionError(f"Task '{active_task}' did not release the lock properly") # ============================================================================= @@ -69,70 +149,53 @@ results = TestResults() # ============================================================================= +@pytest.mark.asyncio async def test_pipeline_status_isolation(): """ Test that pipeline status is isolated between different workspaces. """ + # Purpose: Ensure pipeline_status shared data remains unique per workspace. + # Scope: initialize_pipeline_status and get_namespace_data interactions. 
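+    # Key layout sketch (per get_final_namespace's "<workspace>:<namespace>"
+    # composition): "test_workspace_1:pipeline_status" and
+    # "test_workspace_2:pipeline_status" are distinct entries, so a mutation
+    # of one dict must not show up in the other.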
print("\n" + "=" * 60) print("TEST 1: Pipeline Status Isolation") print("=" * 60) - try: - # Initialize shared storage - initialize_share_data() + # Initialize shared storage + initialize_share_data() - # Initialize pipeline status for two different workspaces - workspace1 = "test_workspace_1" - workspace2 = "test_workspace_2" + # Initialize pipeline status for two different workspaces + workspace1 = "test_workspace_1" + workspace2 = "test_workspace_2" - await initialize_pipeline_status(workspace1) - await initialize_pipeline_status(workspace2) + await initialize_pipeline_status(workspace1) + await initialize_pipeline_status(workspace2) - # Get pipeline status data for both workspaces - data1 = await get_namespace_data("pipeline_status", workspace=workspace1) - data2 = await get_namespace_data("pipeline_status", workspace=workspace2) + # Get pipeline status data for both workspaces + data1 = await get_namespace_data("pipeline_status", workspace=workspace1) + data2 = await get_namespace_data("pipeline_status", workspace=workspace2) - # Verify they are independent objects - if data1 is data2: - results.add( - "Pipeline Status Isolation", - False, - "Pipeline status data objects are the same (should be different)", - ) - return False + # Verify they are independent objects + assert ( + data1 is not data2 + ), "Pipeline status data objects are the same (should be different)" - # Modify workspace1's data and verify workspace2 is not affected - data1["test_key"] = "workspace1_value" + # Modify workspace1's data and verify workspace2 is not affected + data1["test_key"] = "workspace1_value" - # Re-fetch to ensure we get the latest data - data1_check = await get_namespace_data("pipeline_status", workspace=workspace1) - data2_check = await get_namespace_data("pipeline_status", workspace=workspace2) + # Re-fetch to ensure we get the latest data + data1_check = await get_namespace_data("pipeline_status", workspace=workspace1) + data2_check = await get_namespace_data("pipeline_status", workspace=workspace2) - if ( - "test_key" in data1_check - and data1_check["test_key"] == "workspace1_value" - and "test_key" not in data2_check - ): - results.add( - "Pipeline Status Isolation", - True, - "Different workspaces have isolated pipeline status", - ) - return True - else: - results.add( - "Pipeline Status Isolation", - False, - f"Pipeline status not properly isolated: ws1={data1_check.get('test_key')}, ws2={data2_check.get('test_key')}", - ) - return False + assert "test_key" in data1_check, "test_key not found in workspace1" + assert ( + data1_check["test_key"] == "workspace1_value" + ), f"workspace1 test_key value incorrect: {data1_check.get('test_key')}" + assert ( + "test_key" not in data2_check + ), f"test_key leaked to workspace2: {data2_check.get('test_key')}" - except Exception as e: - results.add("Pipeline Status Isolation", False, f"Exception: {str(e)}") - import traceback - - traceback.print_exc() - return False + print("✅ PASSED: Pipeline Status Isolation") + print(" Different workspaces have isolated pipeline status") # ============================================================================= @@ -140,88 +203,68 @@ async def test_pipeline_status_isolation(): # ============================================================================= -async def test_lock_mechanism(): +@pytest.mark.asyncio +async def test_lock_mechanism(stress_test_mode, parallel_workers): """ Test that the new keyed lock mechanism works correctly without deadlocks. 
Tests both parallel execution for different workspaces and serialization for the same workspace. """ + # Purpose: Validate that keyed locks isolate workspaces while serializing + # requests within the same workspace. Scope: get_namespace_lock scheduling + # semantics for both cross-workspace and single-workspace cases. print("\n" + "=" * 60) print("TEST 2: Lock Mechanism (No Deadlocks)") print("=" * 60) - try: - # Test 2.1: Different workspaces should run in parallel - print("\nTest 2.1: Different workspaces locks should be parallel") + # Test 2.1: Different workspaces should run in parallel + print("\nTest 2.1: Different workspaces locks should be parallel") - async def acquire_lock_timed(workspace, namespace, hold_time): - """Acquire a lock and hold it for specified time""" - lock = get_namespace_lock(namespace, workspace) - start = time.time() - async with lock: - print(f" [{workspace}] acquired lock at {time.time() - start:.2f}s") - await asyncio.sleep(hold_time) - print(f" [{workspace}] releasing lock at {time.time() - start:.2f}s") + # Support stress testing with configurable number of workers + num_workers = parallel_workers if stress_test_mode else 3 + parallel_workload = [ + (f"ws_{chr(97+i)}", f"ws_{chr(97+i)}", "test_namespace") + for i in range(num_workers) + ] - start = time.time() - await asyncio.gather( - acquire_lock_timed("ws_a", "test_namespace", 0.5), - acquire_lock_timed("ws_b", "test_namespace", 0.5), - acquire_lock_timed("ws_c", "test_namespace", 0.5), - ) - elapsed = time.time() - start + max_parallel, timeline_parallel, metrics = await _measure_lock_parallelism( + parallel_workload + ) + assert max_parallel >= 2, ( + "Locks for distinct workspaces should overlap; " + f"observed max concurrency: {max_parallel}, timeline={timeline_parallel}" + ) - # If locks are properly isolated by workspace, this should take ~0.5s (parallel) - # If they block each other, it would take ~1.5s (serial) - parallel_ok = elapsed < 1.0 + print("✅ PASSED: Lock Mechanism - Parallel (Different Workspaces)") + print( + f" Locks overlapped for different workspaces (max concurrency={max_parallel})" + ) + print( + f" Performance: {metrics['total_duration']:.3f}s for {metrics['num_workers']} workers" + ) - if parallel_ok: - results.add( - "Lock Mechanism - Parallel (Different Workspaces)", - True, - f"Locks ran in parallel: {elapsed:.2f}s", - ) - else: - results.add( - "Lock Mechanism - Parallel (Different Workspaces)", - False, - f"Locks blocked each other: {elapsed:.2f}s (expected < 1.0s)", - ) + # Test 2.2: Same workspace should serialize + print("\nTest 2.2: Same workspace locks should serialize") + serial_workload = [ + ("serial_run_1", "ws_same", "test_namespace"), + ("serial_run_2", "ws_same", "test_namespace"), + ] + ( + max_parallel_serial, + timeline_serial, + metrics_serial, + ) = await _measure_lock_parallelism(serial_workload) + assert max_parallel_serial == 1, ( + "Same workspace locks should not overlap; " + f"observed {max_parallel_serial} with timeline {timeline_serial}" + ) + _assert_no_timeline_overlap(timeline_serial) - # Test 2.2: Same workspace should serialize - print("\nTest 2.2: Same workspace locks should serialize") - - start = time.time() - await asyncio.gather( - acquire_lock_timed("ws_same", "test_namespace", 0.3), - acquire_lock_timed("ws_same", "test_namespace", 0.3), - ) - elapsed = time.time() - start - - # Same workspace should serialize, taking ~0.6s - serial_ok = elapsed >= 0.5 - - if serial_ok: - results.add( - "Lock Mechanism - Serial (Same Workspace)", - 
True, - f"Locks serialized correctly: {elapsed:.2f}s", - ) - else: - results.add( - "Lock Mechanism - Serial (Same Workspace)", - False, - f"Locks didn't serialize: {elapsed:.2f}s (expected >= 0.5s)", - ) - - return parallel_ok and serial_ok - - except Exception as e: - results.add("Lock Mechanism", False, f"Exception: {str(e)}") - import traceback - - traceback.print_exc() - return False + print("✅ PASSED: Lock Mechanism - Serial (Same Workspace)") + print(" Same workspace operations executed sequentially with no overlap") + print( + f" Performance: {metrics_serial['total_duration']:.3f}s for {metrics_serial['num_workers']} tasks" + ) # ============================================================================= @@ -229,114 +272,72 @@ async def test_lock_mechanism(): # ============================================================================= +@pytest.mark.asyncio async def test_backward_compatibility(): """ Test that legacy code without workspace parameter still works correctly. """ + # Purpose: Validate backward-compatible defaults when workspace arguments + # are omitted. Scope: get_final_namespace, set/get_default_workspace and + # initialize_pipeline_status fallback behavior. print("\n" + "=" * 60) print("TEST 3: Backward Compatibility") print("=" * 60) - try: - # Test 3.1: get_final_namespace with None should use default workspace - print("\nTest 3.1: get_final_namespace with workspace=None") + # Test 3.1: get_final_namespace with None should use default workspace + print("\nTest 3.1: get_final_namespace with workspace=None") - set_default_workspace("my_default_workspace") - final_ns = get_final_namespace("pipeline_status", workspace=None) - expected = "my_default_workspace:pipeline_status" + set_default_workspace("my_default_workspace") + final_ns = get_final_namespace("pipeline_status", workspace=None) + expected = "my_default_workspace:pipeline_status" - if final_ns == expected: - results.add( - "Backward Compatibility - get_final_namespace", - True, - f"Correctly uses default workspace: {final_ns}", - ) - compat_1_ok = True - else: - results.add( - "Backward Compatibility - get_final_namespace", - False, - f"Expected {expected}, got {final_ns}", - ) - compat_1_ok = False + assert final_ns == expected, f"Expected {expected}, got {final_ns}" - # Test 3.2: get_default_workspace - print("\nTest 3.2: get/set default workspace") + print("✅ PASSED: Backward Compatibility - get_final_namespace") + print(f" Correctly uses default workspace: {final_ns}") - set_default_workspace("test_default") - retrieved = get_default_workspace() + # Test 3.2: get_default_workspace + print("\nTest 3.2: get/set default workspace") - if retrieved == "test_default": - results.add( - "Backward Compatibility - default workspace", - True, - f"Default workspace set/get correctly: {retrieved}", - ) - compat_2_ok = True - else: - results.add( - "Backward Compatibility - default workspace", - False, - f"Expected 'test_default', got {retrieved}", - ) - compat_2_ok = False + set_default_workspace("test_default") + retrieved = get_default_workspace() - # Test 3.3: Empty workspace handling - print("\nTest 3.3: Empty workspace handling") + assert retrieved == "test_default", f"Expected 'test_default', got {retrieved}" - set_default_workspace("") - final_ns_empty = get_final_namespace("pipeline_status", workspace=None) - expected_empty = "pipeline_status" # Should be just the namespace without ':' + print("✅ PASSED: Backward Compatibility - default workspace") + print(f" Default workspace set/get correctly: 
{retrieved}") - if final_ns_empty == expected_empty: - results.add( - "Backward Compatibility - empty workspace", - True, - f"Empty workspace handled correctly: '{final_ns_empty}'", - ) - compat_3_ok = True - else: - results.add( - "Backward Compatibility - empty workspace", - False, - f"Expected '{expected_empty}', got '{final_ns_empty}'", - ) - compat_3_ok = False + # Test 3.3: Empty workspace handling + print("\nTest 3.3: Empty workspace handling") - # Test 3.4: None workspace with default set - print("\nTest 3.4: initialize_pipeline_status with workspace=None") - set_default_workspace("compat_test_workspace") - initialize_share_data() - await initialize_pipeline_status(workspace=None) # Should use default + set_default_workspace("") + final_ns_empty = get_final_namespace("pipeline_status", workspace=None) + expected_empty = "pipeline_status" # Should be just the namespace without ':' - # Try to get data using the default workspace explicitly - data = await get_namespace_data( - "pipeline_status", workspace="compat_test_workspace" - ) + assert ( + final_ns_empty == expected_empty + ), f"Expected '{expected_empty}', got '{final_ns_empty}'" - if data is not None: - results.add( - "Backward Compatibility - pipeline init with None", - True, - "Pipeline status initialized with default workspace", - ) - compat_4_ok = True - else: - results.add( - "Backward Compatibility - pipeline init with None", - False, - "Failed to initialize pipeline status with default workspace", - ) - compat_4_ok = False + print("✅ PASSED: Backward Compatibility - empty workspace") + print(f" Empty workspace handled correctly: '{final_ns_empty}'") - return compat_1_ok and compat_2_ok and compat_3_ok and compat_4_ok + # Test 3.4: None workspace with default set + print("\nTest 3.4: initialize_pipeline_status with workspace=None") + set_default_workspace("compat_test_workspace") + initialize_share_data() + await initialize_pipeline_status(workspace=None) # Should use default - except Exception as e: - results.add("Backward Compatibility", False, f"Exception: {str(e)}") - import traceback + # Try to get data using the default workspace explicitly + data = await get_namespace_data( + "pipeline_status", workspace="compat_test_workspace" + ) - traceback.print_exc() - return False + assert ( + data is not None + ), "Failed to initialize pipeline status with default workspace" + + print("✅ PASSED: Backward Compatibility - pipeline init with None") + print(" Pipeline status initialized with default workspace") # ============================================================================= @@ -344,107 +345,82 @@ async def test_backward_compatibility(): # ============================================================================= +@pytest.mark.asyncio async def test_multi_workspace_concurrency(): """ Test that multiple workspaces can operate concurrently without interference. Simulates concurrent operations on different workspaces. """ + # Purpose: Simulate concurrent workloads touching pipeline_status across + # workspaces. Scope: initialize_pipeline_status, get_namespace_lock, and + # shared dictionary mutation while ensuring isolation. 
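+    # Timing sketch (not asserted below): each worker holds its keyed lock for
+    # ~0.1s, so with per-(workspace, namespace) locks the three workspaces
+    # should finish in roughly the single-worker time, not the ~0.3s serial sum.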
print("\n" + "=" * 60) print("TEST 4: Multi-Workspace Concurrency") print("=" * 60) - try: - initialize_share_data() + initialize_share_data() - async def workspace_operations(workspace_id): - """Simulate operations on a specific workspace""" - print(f"\n [{workspace_id}] Starting operations") + async def workspace_operations(workspace_id): + """Simulate operations on a specific workspace""" + print(f"\n [{workspace_id}] Starting operations") - # Initialize pipeline status - await initialize_pipeline_status(workspace_id) + # Initialize pipeline status + await initialize_pipeline_status(workspace_id) - # Get lock and perform operations - lock = get_namespace_lock("test_operations", workspace_id) - async with lock: - # Get workspace data - data = await get_namespace_data( - "pipeline_status", workspace=workspace_id - ) + # Get lock and perform operations + lock = get_namespace_lock("test_operations", workspace_id) + async with lock: + # Get workspace data + data = await get_namespace_data("pipeline_status", workspace=workspace_id) - # Modify data - data[f"{workspace_id}_key"] = f"{workspace_id}_value" - data["timestamp"] = time.time() + # Modify data + data[f"{workspace_id}_key"] = f"{workspace_id}_value" + data["timestamp"] = time.time() - # Simulate some work - await asyncio.sleep(0.1) + # Simulate some work + await asyncio.sleep(0.1) - print(f" [{workspace_id}] Completed operations") + print(f" [{workspace_id}] Completed operations") - return workspace_id + return workspace_id - # Run multiple workspaces concurrently - workspaces = ["concurrent_ws_1", "concurrent_ws_2", "concurrent_ws_3"] + # Run multiple workspaces concurrently + workspaces = ["concurrent_ws_1", "concurrent_ws_2", "concurrent_ws_3"] - start = time.time() - results_list = await asyncio.gather( - *[workspace_operations(ws) for ws in workspaces] - ) - elapsed = time.time() - start + start = time.time() + results_list = await asyncio.gather( + *[workspace_operations(ws) for ws in workspaces] + ) + elapsed = time.time() - start - print(f"\n All workspaces completed in {elapsed:.2f}s") + print(f"\n All workspaces completed in {elapsed:.2f}s") - # Verify all workspaces completed - if set(results_list) == set(workspaces): - results.add( - "Multi-Workspace Concurrency - Execution", - True, - f"All {len(workspaces)} workspaces completed successfully in {elapsed:.2f}s", - ) - exec_ok = True - else: - results.add( - "Multi-Workspace Concurrency - Execution", - False, - "Not all workspaces completed", - ) - exec_ok = False + # Verify all workspaces completed + assert set(results_list) == set(workspaces), "Not all workspaces completed" - # Verify data isolation - each workspace should have its own data - print("\n Verifying data isolation...") - isolation_ok = True + print("✅ PASSED: Multi-Workspace Concurrency - Execution") + print( + f" All {len(workspaces)} workspaces completed successfully in {elapsed:.2f}s" + ) - for ws in workspaces: - data = await get_namespace_data("pipeline_status", workspace=ws) - expected_key = f"{ws}_key" - expected_value = f"{ws}_value" + # Verify data isolation - each workspace should have its own data + print("\n Verifying data isolation...") - if expected_key not in data or data[expected_key] != expected_value: - results.add( - f"Multi-Workspace Concurrency - Data Isolation ({ws})", - False, - f"Data not properly isolated for {ws}", - ) - isolation_ok = False - else: - print( - f" [{ws}] Data correctly isolated: {expected_key}={data[expected_key]}" - ) + for ws in workspaces: + data = await 
get_namespace_data("pipeline_status", workspace=ws) + expected_key = f"{ws}_key" + expected_value = f"{ws}_value" - if isolation_ok: - results.add( - "Multi-Workspace Concurrency - Data Isolation", - True, - "All workspaces have properly isolated data", - ) + assert ( + expected_key in data + ), f"Data not properly isolated for {ws}: missing {expected_key}" + assert ( + data[expected_key] == expected_value + ), f"Data not properly isolated for {ws}: {expected_key}={data[expected_key]} (expected {expected_value})" + print(f" [{ws}] Data correctly isolated: {expected_key}={data[expected_key]}") - return exec_ok and isolation_ok - - except Exception as e: - results.add("Multi-Workspace Concurrency", False, f"Exception: {str(e)}") - import traceback - - traceback.print_exc() - return False + print("✅ PASSED: Multi-Workspace Concurrency - Data Isolation") + print(" All workspaces have properly isolated data") # ============================================================================= @@ -452,94 +428,72 @@ async def test_multi_workspace_concurrency(): # ============================================================================= +@pytest.mark.asyncio async def test_namespace_lock_reentrance(): """ Test that NamespaceLock prevents re-entrance in the same coroutine and allows concurrent use in different coroutines. """ + # Purpose: Ensure NamespaceLock enforces single entry per coroutine while + # allowing concurrent reuse through ContextVar isolation. Scope: lock + # re-entrance checks and concurrent gather semantics. print("\n" + "=" * 60) print("TEST 5: NamespaceLock Re-entrance Protection") print("=" * 60) + # Test 5.1: Same coroutine re-entrance should fail + print("\nTest 5.1: Same coroutine re-entrance should raise RuntimeError") + + lock = get_namespace_lock("test_reentrance", "test_ws") + + reentrance_failed_correctly = False try: - # Test 5.1: Same coroutine re-entrance should fail - print("\nTest 5.1: Same coroutine re-entrance should raise RuntimeError") - - lock = get_namespace_lock("test_reentrance", "test_ws") - - reentrance_failed_correctly = False - try: + async with lock: + print(" Acquired lock first time") + # Try to acquire the same lock again in the same coroutine async with lock: - print(" Acquired lock first time") - # Try to acquire the same lock again in the same coroutine - async with lock: - print(" ERROR: Should not reach here - re-entrance succeeded!") - except RuntimeError as e: - if "already acquired" in str(e).lower(): - print(f" ✓ Re-entrance correctly blocked: {e}") - reentrance_failed_correctly = True - else: - print(f" ✗ Unexpected RuntimeError: {e}") - - if reentrance_failed_correctly: - results.add( - "NamespaceLock Re-entrance Protection", - True, - "Re-entrance correctly raises RuntimeError", - ) + print(" ERROR: Should not reach here - re-entrance succeeded!") + except RuntimeError as e: + if "already acquired" in str(e).lower(): + print(f" ✓ Re-entrance correctly blocked: {e}") + reentrance_failed_correctly = True else: - results.add( - "NamespaceLock Re-entrance Protection", - False, - "Re-entrance protection not working", - ) + raise - # Test 5.2: Same NamespaceLock instance in different coroutines should succeed - print("\nTest 5.2: Same NamespaceLock instance in different coroutines") + assert reentrance_failed_correctly, "Re-entrance protection not working" - shared_lock = get_namespace_lock("test_concurrent", "test_ws") - concurrent_results = [] + print("✅ PASSED: NamespaceLock Re-entrance Protection") + print(" Re-entrance correctly raises 
RuntimeError") - async def use_shared_lock(coroutine_id): - """Use the same NamespaceLock instance""" - async with shared_lock: - concurrent_results.append(f"coroutine_{coroutine_id}_start") - await asyncio.sleep(0.1) - concurrent_results.append(f"coroutine_{coroutine_id}_end") + # Test 5.2: Same NamespaceLock instance in different coroutines should succeed + print("\nTest 5.2: Same NamespaceLock instance in different coroutines") - # This should work because each coroutine gets its own ContextVar - await asyncio.gather( - use_shared_lock(1), - use_shared_lock(2), - ) + shared_lock = get_namespace_lock("test_concurrent", "test_ws") + concurrent_results = [] - # Both coroutines should have completed - expected_entries = 4 # 2 starts + 2 ends - if len(concurrent_results) == expected_entries: - results.add( - "NamespaceLock Concurrent Reuse", - True, - f"Same NamespaceLock instance used successfully in {expected_entries//2} concurrent coroutines", - ) - concurrent_ok = True - else: - results.add( - "NamespaceLock Concurrent Reuse", - False, - f"Expected {expected_entries} entries, got {len(concurrent_results)}", - ) - concurrent_ok = False + async def use_shared_lock(coroutine_id): + """Use the same NamespaceLock instance""" + async with shared_lock: + concurrent_results.append(f"coroutine_{coroutine_id}_start") + await asyncio.sleep(0.1) + concurrent_results.append(f"coroutine_{coroutine_id}_end") - return reentrance_failed_correctly and concurrent_ok + # This should work because each coroutine gets its own ContextVar + await asyncio.gather( + use_shared_lock(1), + use_shared_lock(2), + ) - except Exception as e: - results.add( - "NamespaceLock Re-entrance Protection", False, f"Exception: {str(e)}" - ) - import traceback + # Both coroutines should have completed + expected_entries = 4 # 2 starts + 2 ends + assert ( + len(concurrent_results) == expected_entries + ), f"Expected {expected_entries} entries, got {len(concurrent_results)}" - traceback.print_exc() - return False + print("✅ PASSED: NamespaceLock Concurrent Reuse") + print( + f" Same NamespaceLock instance used successfully in {expected_entries//2} concurrent coroutines" + ) # ============================================================================= @@ -547,59 +501,38 @@ async def test_namespace_lock_reentrance(): # ============================================================================= +@pytest.mark.asyncio async def test_different_namespace_lock_isolation(): """ Test that locks for different namespaces (same workspace) are independent. """ + # Purpose: Confirm that namespace isolation is enforced even when workspace + # is the same. Scope: get_namespace_lock behavior when namespaces differ. 
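+    # Hypothetical failure mode: if the lock key ignored the namespace and used
+    # only the workspace, all three "same_ws" workers would serialize and
+    # max_parallel would collapse to 1, which the assertion below rejects.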
print("\n" + "=" * 60) print("TEST 6: Different Namespace Lock Isolation") print("=" * 60) - try: - print("\nTesting locks with same workspace but different namespaces") + print("\nTesting locks with same workspace but different namespaces") - async def acquire_lock_timed(workspace, namespace, hold_time, name): - """Acquire a lock and hold it for specified time""" - lock = get_namespace_lock(namespace, workspace) - start = time.time() - async with lock: - print(f" [{name}] acquired lock at {time.time() - start:.2f}s") - await asyncio.sleep(hold_time) - print(f" [{name}] releasing lock at {time.time() - start:.2f}s") + workload = [ + ("ns_a", "same_ws", "namespace_a"), + ("ns_b", "same_ws", "namespace_b"), + ("ns_c", "same_ws", "namespace_c"), + ] + max_parallel, timeline, metrics = await _measure_lock_parallelism(workload) - # These should run in parallel (different namespaces) - start = time.time() - await asyncio.gather( - acquire_lock_timed("same_ws", "namespace_a", 0.5, "ns_a"), - acquire_lock_timed("same_ws", "namespace_b", 0.5, "ns_b"), - acquire_lock_timed("same_ws", "namespace_c", 0.5, "ns_c"), - ) - elapsed = time.time() - start + assert max_parallel >= 2, ( + "Different namespaces within the same workspace should run concurrently; " + f"observed max concurrency {max_parallel} with timeline {timeline}" + ) - # If locks are properly isolated by namespace, this should take ~0.5s (parallel) - namespace_isolation_ok = elapsed < 1.0 - - if namespace_isolation_ok: - results.add( - "Different Namespace Lock Isolation", - True, - f"Different namespace locks ran in parallel: {elapsed:.2f}s", - ) - else: - results.add( - "Different Namespace Lock Isolation", - False, - f"Different namespace locks blocked each other: {elapsed:.2f}s (expected < 1.0s)", - ) - - return namespace_isolation_ok - - except Exception as e: - results.add("Different Namespace Lock Isolation", False, f"Exception: {str(e)}") - import traceback - - traceback.print_exc() - return False + print("✅ PASSED: Different Namespace Lock Isolation") + print( + f" Different namespace locks ran in parallel (max concurrency={max_parallel})" + ) + print( + f" Performance: {metrics['total_duration']:.3f}s for {metrics['num_workers']} namespaces" + ) # ============================================================================= @@ -607,68 +540,49 @@ async def test_different_namespace_lock_isolation(): # ============================================================================= +@pytest.mark.asyncio async def test_error_handling(): """ Test error handling for invalid workspace configurations. """ + # Purpose: Validate guardrails for workspace normalization and namespace + # derivation. Scope: set_default_workspace conversions and get_final_namespace + # failure paths when configuration is invalid. 
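+    # Assumption: the autouse setup_shared_data fixture re-runs
+    # initialize_share_data() with no default workspace configured, so
+    # workspace=None has nothing to fall back to and should raise ValueError.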
print("\n" + "=" * 60) print("TEST 7: Error Handling") print("=" * 60) - try: - # Test 7.1: set_default_workspace(None) converts to empty string - print("\nTest 7.1: set_default_workspace(None) converts to empty string") + # Test 7.0: Missing default workspace should raise ValueError + print("\nTest 7.0: Missing workspace raises ValueError") + with pytest.raises(ValueError): + get_final_namespace("test_namespace", workspace=None) - set_default_workspace(None) - default_ws = get_default_workspace() + # Test 7.1: set_default_workspace(None) converts to empty string + print("\nTest 7.1: set_default_workspace(None) converts to empty string") - # Should convert None to "" automatically - conversion_ok = default_ws == "" + set_default_workspace(None) + default_ws = get_default_workspace() - if conversion_ok: - results.add( - "Error Handling - None to Empty String", - True, - f"set_default_workspace(None) correctly converts to empty string: '{default_ws}'", - ) - else: - results.add( - "Error Handling - None to Empty String", - False, - f"Expected empty string, got: '{default_ws}'", - ) + # Should convert None to "" automatically + assert default_ws == "", f"Expected empty string, got: '{default_ws}'" - # Test 7.2: Empty string workspace behavior - print("\nTest 7.2: Empty string workspace creates valid namespace") + print("✅ PASSED: Error Handling - None to Empty String") + print( + f" set_default_workspace(None) correctly converts to empty string: '{default_ws}'" + ) - # With empty workspace, should create namespace without colon - final_ns = get_final_namespace("test_namespace", workspace="") - namespace_ok = final_ns == "test_namespace" + # Test 7.2: Empty string workspace behavior + print("\nTest 7.2: Empty string workspace creates valid namespace") - if namespace_ok: - results.add( - "Error Handling - Empty Workspace Namespace", - True, - f"Empty workspace creates valid namespace: '{final_ns}'", - ) - else: - results.add( - "Error Handling - Empty Workspace Namespace", - False, - f"Unexpected namespace: '{final_ns}'", - ) + # With empty workspace, should create namespace without colon + final_ns = get_final_namespace("test_namespace", workspace="") + assert final_ns == "test_namespace", f"Unexpected namespace: '{final_ns}'" - # Restore default workspace for other tests - set_default_workspace("") + print("✅ PASSED: Error Handling - Empty Workspace Namespace") + print(f" Empty workspace creates valid namespace: '{final_ns}'") - return conversion_ok and namespace_ok - - except Exception as e: - results.add("Error Handling", False, f"Exception: {str(e)}") - import traceback - - traceback.print_exc() - return False + # Restore default workspace for other tests + set_default_workspace("") # ============================================================================= @@ -676,123 +590,128 @@ async def test_error_handling(): # ============================================================================= +@pytest.mark.asyncio async def test_update_flags_workspace_isolation(): """ Test that update flags are properly isolated between workspaces. """ + # Purpose: Confirm update flag setters/readers respect workspace scoping. + # Scope: set_all_update_flags, clear_all_update_flags, get_all_update_flags_status, + # and get_update_flag interactions across namespaces. 
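+    # The flag objects returned by get_update_flag expose a mutable .value;
+    # set_all_update_flags/clear_all_update_flags flip every flag registered
+    # under the matching "<workspace>:<namespace>" key and no others.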
print("\n" + "=" * 60) print("TEST 8: Update Flags Workspace Isolation") print("=" * 60) - try: - initialize_share_data() + initialize_share_data() - workspace1 = "update_flags_ws1" - workspace2 = "update_flags_ws2" - test_namespace = "test_update_flags_ns" + workspace1 = "update_flags_ws1" + workspace2 = "update_flags_ws2" + test_namespace = "test_update_flags_ns" - # Initialize namespaces for both workspaces - await initialize_pipeline_status(workspace1) - await initialize_pipeline_status(workspace2) + # Initialize namespaces for both workspaces + await initialize_pipeline_status(workspace1) + await initialize_pipeline_status(workspace2) - # Test 8.1: set_all_update_flags isolation - print("\nTest 8.1: set_all_update_flags workspace isolation") + # Test 8.1: set_all_update_flags isolation + print("\nTest 8.1: set_all_update_flags workspace isolation") - # Create flags for both workspaces (simulating workers) - flag1_obj = await get_update_flag(test_namespace, workspace=workspace1) - flag2_obj = await get_update_flag(test_namespace, workspace=workspace2) + # Create flags for both workspaces (simulating workers) + flag1_obj = await get_update_flag(test_namespace, workspace=workspace1) + flag2_obj = await get_update_flag(test_namespace, workspace=workspace2) - # Set all flags for workspace1 - await set_all_update_flags(test_namespace, workspace=workspace1) + # Initial state should be False + assert flag1_obj.value is False, "Flag1 initial value should be False" + assert flag2_obj.value is False, "Flag2 initial value should be False" - # Check that only workspace1's flags are set - set_flags_isolated = flag1_obj.value is True and flag2_obj.value is False + # Set all flags for workspace1 + await set_all_update_flags(test_namespace, workspace=workspace1) - if set_flags_isolated: - results.add( - "Update Flags - set_all_update_flags Isolation", - True, - f"set_all_update_flags isolated: ws1={flag1_obj.value}, ws2={flag2_obj.value}", - ) - else: - results.add( - "Update Flags - set_all_update_flags Isolation", - False, - f"Flags not isolated: ws1={flag1_obj.value}, ws2={flag2_obj.value}", - ) + # Check that only workspace1's flags are set + assert ( + flag1_obj.value is True + ), f"Flag1 should be True after set_all_update_flags, got {flag1_obj.value}" + assert ( + flag2_obj.value is False + ), f"Flag2 should still be False, got {flag2_obj.value}" - # Test 8.2: clear_all_update_flags isolation - print("\nTest 8.2: clear_all_update_flags workspace isolation") + print("✅ PASSED: Update Flags - set_all_update_flags Isolation") + print( + f" set_all_update_flags isolated: ws1={flag1_obj.value}, ws2={flag2_obj.value}" + ) - # Set flags for both workspaces - await set_all_update_flags(test_namespace, workspace=workspace1) - await set_all_update_flags(test_namespace, workspace=workspace2) + # Test 8.2: clear_all_update_flags isolation + print("\nTest 8.2: clear_all_update_flags workspace isolation") - # Clear only workspace1 - await clear_all_update_flags(test_namespace, workspace=workspace1) + # Set flags for both workspaces + await set_all_update_flags(test_namespace, workspace=workspace1) + await set_all_update_flags(test_namespace, workspace=workspace2) - # Check that only workspace1's flags are cleared - clear_flags_isolated = flag1_obj.value is False and flag2_obj.value is True + # Verify both are set + assert flag1_obj.value is True, "Flag1 should be True" + assert flag2_obj.value is True, "Flag2 should be True" - if clear_flags_isolated: - results.add( - "Update Flags - clear_all_update_flags 
Isolation", - True, - f"clear_all_update_flags isolated: ws1={flag1_obj.value}, ws2={flag2_obj.value}", - ) - else: - results.add( - "Update Flags - clear_all_update_flags Isolation", - False, - f"Flags not isolated: ws1={flag1_obj.value}, ws2={flag2_obj.value}", - ) + # Clear only workspace1 + await clear_all_update_flags(test_namespace, workspace=workspace1) - # Test 8.3: get_all_update_flags_status workspace filtering - print("\nTest 8.3: get_all_update_flags_status workspace filtering") + # Check that only workspace1's flags are cleared + assert ( + flag1_obj.value is False + ), f"Flag1 should be False after clear, got {flag1_obj.value}" + assert flag2_obj.value is True, f"Flag2 should still be True, got {flag2_obj.value}" - # Initialize more namespaces for testing - await get_update_flag("ns_a", workspace=workspace1) - await get_update_flag("ns_b", workspace=workspace1) - await get_update_flag("ns_c", workspace=workspace2) + print("✅ PASSED: Update Flags - clear_all_update_flags Isolation") + print( + f" clear_all_update_flags isolated: ws1={flag1_obj.value}, ws2={flag2_obj.value}" + ) - # Set flags for workspace1 - await set_all_update_flags("ns_a", workspace=workspace1) - await set_all_update_flags("ns_b", workspace=workspace1) + # Test 8.3: get_all_update_flags_status workspace filtering + print("\nTest 8.3: get_all_update_flags_status workspace filtering") - # Set flags for workspace2 - await set_all_update_flags("ns_c", workspace=workspace2) + # Initialize more namespaces for testing + await get_update_flag("ns_a", workspace=workspace1) + await get_update_flag("ns_b", workspace=workspace1) + await get_update_flag("ns_c", workspace=workspace2) - # Get status for workspace1 only - status1 = await get_all_update_flags_status(workspace=workspace1) + # Set flags for workspace1 + await set_all_update_flags("ns_a", workspace=workspace1) + await set_all_update_flags("ns_b", workspace=workspace1) - # Check that workspace1's namespaces are present - # The keys should include workspace1's namespaces but not workspace2's - workspace1_keys = [k for k in status1.keys() if workspace1 in k] - workspace2_keys = [k for k in status1.keys() if workspace2 in k] + # Set flags for workspace2 + await set_all_update_flags("ns_c", workspace=workspace2) - status_filtered = len(workspace1_keys) > 0 and len(workspace2_keys) == 0 + # Get status for workspace1 only + status1 = await get_all_update_flags_status(workspace=workspace1) - if status_filtered: - results.add( - "Update Flags - get_all_update_flags_status Filtering", - True, - f"Status correctly filtered: ws1 keys={len(workspace1_keys)}, ws2 keys={len(workspace2_keys)}", - ) - else: - results.add( - "Update Flags - get_all_update_flags_status Filtering", - False, - f"Status not filtered correctly: ws1 keys={len(workspace1_keys)}, ws2 keys={len(workspace2_keys)}", - ) + # Check that workspace1's namespaces are present + # The keys should include workspace1's namespaces but not workspace2's + workspace1_keys = [k for k in status1.keys() if workspace1 in k] + workspace2_keys = [k for k in status1.keys() if workspace2 in k] - return set_flags_isolated and clear_flags_isolated and status_filtered + assert ( + len(workspace1_keys) > 0 + ), f"workspace1 keys should be present, got {len(workspace1_keys)}" + assert ( + len(workspace2_keys) == 0 + ), f"workspace2 keys should not be present, got {len(workspace2_keys)}" + for key, values in status1.items(): + assert all(values), f"All flags in {key} should be True, got {values}" - except Exception as e: - 
results.add("Update Flags Workspace Isolation", False, f"Exception: {str(e)}") - import traceback + # Workspace2 query should only surface workspace2 namespaces + status2 = await get_all_update_flags_status(workspace=workspace2) + expected_ws2_keys = { + f"{workspace2}:{test_namespace}", + f"{workspace2}:ns_c", + } + assert ( + set(status2.keys()) == expected_ws2_keys + ), f"Unexpected namespaces for workspace2: {status2.keys()}" + for key, values in status2.items(): + assert all(values), f"All flags in {key} should be True, got {values}" - traceback.print_exc() - return False + print("✅ PASSED: Update Flags - get_all_update_flags_status Filtering") + print( + f" Status correctly filtered: ws1 keys={len(workspace1_keys)}, ws2 keys={len(workspace2_keys)}" + ) # ============================================================================= @@ -800,74 +719,52 @@ async def test_update_flags_workspace_isolation(): # ============================================================================= +@pytest.mark.asyncio async def test_empty_workspace_standardization(): """ Test that empty workspace is properly standardized to "" instead of "_". """ + # Purpose: Verify namespace formatting when workspace is an empty string. + # Scope: get_final_namespace output and initialize_pipeline_status behavior + # between empty and non-empty workspaces. print("\n" + "=" * 60) print("TEST 9: Empty Workspace Standardization") print("=" * 60) - try: - # Test 9.1: Empty string workspace creates namespace without colon - print("\nTest 9.1: Empty string workspace namespace format") + # Test 9.1: Empty string workspace creates namespace without colon + print("\nTest 9.1: Empty string workspace namespace format") - set_default_workspace("") - final_ns = get_final_namespace("test_namespace", workspace=None) + set_default_workspace("") + final_ns = get_final_namespace("test_namespace", workspace=None) - # Should be just "test_namespace" without colon prefix - empty_ws_ok = final_ns == "test_namespace" + # Should be just "test_namespace" without colon prefix + assert ( + final_ns == "test_namespace" + ), f"Unexpected namespace format: '{final_ns}' (expected 'test_namespace')" - if empty_ws_ok: - results.add( - "Empty Workspace Standardization - Format", - True, - f"Empty workspace creates correct namespace: '{final_ns}'", - ) - else: - results.add( - "Empty Workspace Standardization - Format", - False, - f"Unexpected namespace format: '{final_ns}' (expected 'test_namespace')", - ) + print("✅ PASSED: Empty Workspace Standardization - Format") + print(f" Empty workspace creates correct namespace: '{final_ns}'") - # Test 9.2: Empty workspace vs non-empty workspace behavior - print("\nTest 9.2: Empty vs non-empty workspace behavior") + # Test 9.2: Empty workspace vs non-empty workspace behavior + print("\nTest 9.2: Empty vs non-empty workspace behavior") - initialize_share_data() + initialize_share_data() - # Initialize with empty workspace - await initialize_pipeline_status(workspace="") - data_empty = await get_namespace_data("pipeline_status", workspace="") + # Initialize with empty workspace + await initialize_pipeline_status(workspace="") + data_empty = await get_namespace_data("pipeline_status", workspace="") - # Initialize with non-empty workspace - await initialize_pipeline_status(workspace="test_ws") - data_nonempty = await get_namespace_data("pipeline_status", workspace="test_ws") + # Initialize with non-empty workspace + await initialize_pipeline_status(workspace="test_ws") + data_nonempty = await 
get_namespace_data("pipeline_status", workspace="test_ws") - # They should be different objects - behavior_ok = data_empty is not data_nonempty + # They should be different objects + assert ( + data_empty is not data_nonempty + ), "Empty and non-empty workspaces share data (should be independent)" - if behavior_ok: - results.add( - "Empty Workspace Standardization - Behavior", - True, - "Empty and non-empty workspaces have independent data", - ) - else: - results.add( - "Empty Workspace Standardization - Behavior", - False, - "Empty and non-empty workspaces share data (should be independent)", - ) - - return empty_ws_ok and behavior_ok - - except Exception as e: - results.add("Empty Workspace Standardization", False, f"Exception: {str(e)}") - import traceback - - traceback.print_exc() - return False + print("✅ PASSED: Empty Workspace Standardization - Behavior") + print(" Empty and non-empty workspaces have independent data") # ============================================================================= @@ -875,12 +772,16 @@ async def test_empty_workspace_standardization(): # ============================================================================= -async def test_json_kv_storage_workspace_isolation(): +@pytest.mark.asyncio +async def test_json_kv_storage_workspace_isolation(keep_test_artifacts): """ Integration test: Verify JsonKVStorage properly isolates data between workspaces. Creates two JsonKVStorage instances with different workspaces, writes different data, and verifies they don't mix. """ + # Purpose: Ensure JsonKVStorage respects workspace-specific directories and data. + # Scope: storage initialization, upsert/get_by_id operations, and filesystem layout + # inside the temporary working directory. print("\n" + "=" * 60) print("TEST 10: JsonKVStorage Workspace Isolation (Integration)") print("=" * 60) @@ -980,33 +881,33 @@ async def test_json_kv_storage_workspace_isolation(): print(f" Storage2 entity2: {result2_entity2}") # Verify isolation (get_by_id returns dict) - isolated = ( - result1_entity1 is not None - and result1_entity2 is not None - and result2_entity1 is not None - and result2_entity2 is not None - and result1_entity1.get("content") == "Data from workspace1 - AI Research" - and result1_entity2.get("content") - == "Data from workspace1 - Machine Learning" - and result2_entity1.get("content") == "Data from workspace2 - Deep Learning" - and result2_entity2.get("content") - == "Data from workspace2 - Neural Networks" - and result1_entity1.get("content") != result2_entity1.get("content") - and result1_entity2.get("content") != result2_entity2.get("content") - ) + assert result1_entity1 is not None, "Storage1 entity1 should not be None" + assert result1_entity2 is not None, "Storage1 entity2 should not be None" + assert result2_entity1 is not None, "Storage2 entity1 should not be None" + assert result2_entity2 is not None, "Storage2 entity2 should not be None" + assert ( + result1_entity1.get("content") == "Data from workspace1 - AI Research" + ), "Storage1 entity1 content mismatch" + assert ( + result1_entity2.get("content") == "Data from workspace1 - Machine Learning" + ), "Storage1 entity2 content mismatch" + assert ( + result2_entity1.get("content") == "Data from workspace2 - Deep Learning" + ), "Storage2 entity1 content mismatch" + assert ( + result2_entity2.get("content") == "Data from workspace2 - Neural Networks" + ), "Storage2 entity2 content mismatch" + assert result1_entity1.get("content") != result2_entity1.get( + "content" + ), "Storage1 and Storage2 
-        if isolated:
-            results.add(
-                "JsonKVStorage - Data Isolation",
-                True,
-                "Two storage instances correctly isolated: ws1 and ws2 have different data",
-            )
-        else:
-            results.add(
-                "JsonKVStorage - Data Isolation",
-                False,
-                "Data not properly isolated between workspaces",
-            )
+        print("✅ PASSED: JsonKVStorage - Data Isolation")
+        print(
+            "   Two storage instances correctly isolated: ws1 and ws2 have different data"
+        )
 
         # Test 10.4: Verify file structure
         print("\nTest 10.4: Verify file structure")
@@ -1019,34 +920,19 @@ async def test_json_kv_storage_workspace_isolation():
         print(f"   workspace1 directory exists: {ws1_exists}")
         print(f"   workspace2 directory exists: {ws2_exists}")
 
-        if ws1_exists and ws2_exists:
-            results.add(
-                "JsonKVStorage - File Structure",
-                True,
-                f"Workspace directories correctly created: {ws1_dir} and {ws2_dir}",
-            )
-            file_structure_ok = True
-        else:
-            results.add(
-                "JsonKVStorage - File Structure",
-                False,
-                "Workspace directories not created properly",
-            )
-            file_structure_ok = False
+        assert ws1_exists, "workspace1 directory should exist"
+        assert ws2_exists, "workspace2 directory should exist"
-        return isolated and file_structure_ok
+        print("✅ PASSED: JsonKVStorage - File Structure")
+        print(f"   Workspace directories correctly created: {ws1_dir} and {ws2_dir}")
-    except Exception as e:
-        results.add("JsonKVStorage Workspace Isolation", False, f"Exception: {str(e)}")
-        import traceback
-
-        traceback.print_exc()
-        return False
     finally:
-        # Cleanup test directory
-        if os.path.exists(test_dir):
+        # Cleanup test directory (unless keep_test_artifacts is set)
+        if os.path.exists(test_dir) and not keep_test_artifacts:
             shutil.rmtree(test_dir)
             print(f"\n   Cleaned up test directory: {test_dir}")
+        elif keep_test_artifacts:
+            print(f"\n   Kept test directory for inspection: {test_dir}")
 
 
 # =============================================================================
@@ -1054,58 +940,102 @@ async def test_json_kv_storage_workspace_isolation():
 # =============================================================================
 
 
-async def test_lightrag_end_to_end_workspace_isolation():
+@pytest.mark.asyncio
+async def test_lightrag_end_to_end_workspace_isolation(keep_test_artifacts):
     """
     End-to-end test: Create two LightRAG instances with different workspaces,
    insert different data, and verify file separation.
 
     Uses mock LLM and embedding functions to avoid external API calls.
     """
+    # Purpose: Validate that full LightRAG flows keep artifacts scoped per workspace.
+    # Scope: LightRAG.initialize_storages + ainsert side effects plus filesystem
+    # verification for generated storage files.
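
Usage note: with the keep_test_artifacts fixture consumed above, artifact retention is toggled at invocation time. A sketch of the programmatic equivalent (the --keep-artifacts option name follows this PR's conftest.py additions):

    import pytest

    # Same effect as `pytest tests/test_workspace_isolation.py --keep-artifacts`
    raise SystemExit(
        pytest.main(["tests/test_workspace_isolation.py", "--keep-artifacts"])
    )
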
print("\n" + "=" * 60) print("TEST 11: LightRAG End-to-End Workspace Isolation") print("=" * 60) # Create temporary test directory - test_dir = tempfile.mkdtemp(prefix="lightrag_test_e2e_") + # test_dir = tempfile.mkdtemp(prefix="lightrag_test_e2e_") + test_dir = str(Path(__file__).parent.parent / "temp/e2e_workspace_isolation") + if os.path.exists(test_dir): + shutil.rmtree(test_dir) + os.makedirs(test_dir, exist_ok=True) print(f"\n Using test directory: {test_dir}") try: - # Mock LLM function - async def mock_llm_func( - prompt, system_prompt=None, history_messages=[], **kwargs - ) -> str: - # Return a mock response that simulates entity extraction - return """{"entities": [{"name": "Test Entity", "type": "Concept"}], "relationships": []}""" + # Factory function to create different mock LLM functions for each workspace + def create_mock_llm_func(workspace_name): + """Create a mock LLM function that returns different content based on workspace""" + + async def mock_llm_func( + prompt, system_prompt=None, history_messages=[], **kwargs + ) -> str: + # Add coroutine switching to simulate async I/O and allow concurrent execution + await asyncio.sleep(0) + + # Return different responses based on workspace + # Format: entity<|#|>entity_name<|#|>entity_type<|#|>entity_description + # Format: relation<|#|>source_entity<|#|>target_entity<|#|>keywords<|#|>description + if workspace_name == "project_a": + return """entity<|#|>Artificial Intelligence<|#|>concept<|#|>AI is a field of computer science focused on creating intelligent machines. +entity<|#|>Machine Learning<|#|>concept<|#|>Machine Learning is a subset of AI that enables systems to learn from data. +relation<|#|>Machine Learning<|#|>Artificial Intelligence<|#|>subset, related field<|#|>Machine Learning is a key component and subset of Artificial Intelligence. +<|COMPLETE|>""" + else: # project_b + return """entity<|#|>Deep Learning<|#|>concept<|#|>Deep Learning is a subset of machine learning using neural networks with multiple layers. +entity<|#|>Neural Networks<|#|>concept<|#|>Neural Networks are computing systems inspired by biological neural networks. +relation<|#|>Deep Learning<|#|>Neural Networks<|#|>uses, composed of<|#|>Deep Learning uses multiple layers of Neural Networks to learn representations. 
+<|COMPLETE|>""" + + return mock_llm_func # Mock embedding function async def mock_embedding_func(texts: list[str]) -> np.ndarray: + # Add coroutine switching to simulate async I/O and allow concurrent execution + await asyncio.sleep(0) return np.random.rand(len(texts), 384) # 384-dimensional vectors # Test 11.1: Create two LightRAG instances with different workspaces print("\nTest 11.1: Create two LightRAG instances with different workspaces") from lightrag import LightRAG - from lightrag.utils import EmbeddingFunc + from lightrag.utils import EmbeddingFunc, Tokenizer + + # Create different mock LLM functions for each workspace + mock_llm_func_a = create_mock_llm_func("project_a") + mock_llm_func_b = create_mock_llm_func("project_b") + + class _SimpleTokenizerImpl: + def encode(self, content: str) -> list[int]: + return [ord(ch) for ch in content] + + def decode(self, tokens: list[int]) -> str: + return "".join(chr(t) for t in tokens) + + tokenizer = Tokenizer("mock-tokenizer", _SimpleTokenizerImpl()) rag1 = LightRAG( working_dir=test_dir, workspace="project_a", - llm_model_func=mock_llm_func, + llm_model_func=mock_llm_func_a, embedding_func=EmbeddingFunc( embedding_dim=384, max_token_size=8192, func=mock_embedding_func, ), + tokenizer=tokenizer, ) rag2 = LightRAG( working_dir=test_dir, workspace="project_b", - llm_model_func=mock_llm_func, + llm_model_func=mock_llm_func_b, embedding_func=EmbeddingFunc( embedding_dim=384, max_token_size=8192, func=mock_embedding_func, ), + tokenizer=tokenizer, ) # Initialize storages @@ -1115,19 +1045,23 @@ async def test_lightrag_end_to_end_workspace_isolation(): print(" RAG1 created: workspace=project_a") print(" RAG2 created: workspace=project_b") - # Test 11.2: Insert different data to each RAG instance - print("\nTest 11.2: Insert different data to each RAG instance") + # Test 11.2: Insert different data to each RAG instance (CONCURRENTLY) + print("\nTest 11.2: Insert different data to each RAG instance (concurrently)") text_for_project_a = "This document is about Artificial Intelligence and Machine Learning. AI is transforming the world." text_for_project_b = "This document is about Deep Learning and Neural Networks. Deep learning uses multiple layers." 
-        # Insert to project_a
-        await rag1.ainsert(text_for_project_a)
-        print(f"   Inserted to project_a: {len(text_for_project_a)} chars")
+        # Insert to both projects concurrently to test workspace isolation under concurrent load
+        print("   Starting concurrent insert operations...")
+        start_time = time.time()
+        await asyncio.gather(
+            rag1.ainsert(text_for_project_a), rag2.ainsert(text_for_project_b)
+        )
+        elapsed_time = time.time() - start_time
-        # Insert to project_b
-        await rag2.ainsert(text_for_project_b)
-        print(f"   Inserted to project_b: {len(text_for_project_b)} chars")
+        print(f"   Inserted to project_a: {len(text_for_project_a)} chars (concurrent)")
+        print(f"   Inserted to project_b: {len(text_for_project_b)} chars (concurrent)")
+        print(f"   Total concurrent execution time: {elapsed_time:.3f}s")
 
         # Test 11.3: Verify file structure
         print("\nTest 11.3: Verify workspace directory structure")
@@ -1143,33 +1077,24 @@ async def test_lightrag_end_to_end_workspace_isolation():
         print(f"   project_b directory: {project_b_dir}")
         print(f"   project_b exists: {project_b_exists}")
 
-        if project_a_exists and project_b_exists:
-            # List files in each directory
-            print("\n   Files in project_a/:")
-            for file in sorted(project_a_dir.glob("*")):
-                if file.is_file():
-                    size = file.stat().st_size
-                    print(f"      - {file.name} ({size} bytes)")
+        assert project_a_exists, "project_a directory should exist"
+        assert project_b_exists, "project_b directory should exist"
-            print("\n   Files in project_b/:")
-            for file in sorted(project_b_dir.glob("*")):
-                if file.is_file():
-                    size = file.stat().st_size
-                    print(f"      - {file.name} ({size} bytes)")
+        # List files in each directory
+        print("\n   Files in project_a/:")
+        for file in sorted(project_a_dir.glob("*")):
+            if file.is_file():
+                size = file.stat().st_size
+                print(f"      - {file.name} ({size} bytes)")
-            results.add(
-                "LightRAG E2E - File Structure",
-                True,
-                "Workspace directories correctly created and separated",
-            )
-            structure_ok = True
-        else:
-            results.add(
-                "LightRAG E2E - File Structure",
-                False,
-                "Workspace directories not created properly",
-            )
-            structure_ok = False
+        print("\n   Files in project_b/:")
+        for file in sorted(project_b_dir.glob("*")):
+            if file.is_file():
+                size = file.stat().st_size
+                print(f"      - {file.name} ({size} bytes)")
+
+        print("✅ PASSED: LightRAG E2E - File Structure")
+        print("   Workspace directories correctly created and separated")
 
         # Test 11.4: Verify data isolation by checking file contents
         print("\nTest 11.4: Verify data isolation (check file contents)")
@@ -1191,96 +1116,55 @@ async def test_lightrag_end_to_end_workspace_isolation():
             print(f"   project_b doc count: {len(docs_b_content)}")
 
             # Verify they contain different data
-            docs_isolated = docs_a_content != docs_b_content
+            assert (
+                docs_a_content != docs_b_content
+            ), "Document storage not properly isolated"
-            if docs_isolated:
-                results.add(
-                    "LightRAG E2E - Data Isolation",
-                    True,
-                    "Document storage correctly isolated between workspaces",
-                )
-            else:
-                results.add(
-                    "LightRAG E2E - Data Isolation",
-                    False,
-                    "Document storage not properly isolated",
-                )
+            # Verify each workspace contains its own text content
+            docs_a_str = json.dumps(docs_a_content)
+            docs_b_str = json.dumps(docs_b_content)
-            data_ok = docs_isolated
+            # Check project_a contains its text and NOT project_b's text
+            assert (
+                "Artificial Intelligence" in docs_a_str
+            ), "project_a should contain 'Artificial Intelligence'"
+            assert (
+                "Machine Learning" in docs_a_str
+            ), "project_a should contain 'Machine Learning'"
+            assert (
+                "Deep Learning" not in docs_a_str
+            ), "project_a should NOT contain 'Deep Learning' from project_b"
+            assert (
+                "Neural Networks" not in docs_a_str
+            ), "project_a should NOT contain 'Neural Networks' from project_b"
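
Note that these containment checks run against json.dumps serializations of the stored documents, so leakage anywhere in the structure (keys, nested fields) is caught, not just in top-level content. A minimal illustration with a hypothetical payload:

    import json

    docs = {"doc-1": {"content": "about Deep Learning"}}  # illustrative payload
    serialized = json.dumps(docs)
    assert "Deep Learning" in serialized         # this workspace's term is present
    assert "Machine Learning" not in serialized  # foreign term must be absent
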
+            # Check project_b contains its text and NOT project_a's text
+            assert (
+                "Deep Learning" in docs_b_str
+            ), "project_b should contain 'Deep Learning'"
+            assert (
+                "Neural Networks" in docs_b_str
+            ), "project_b should contain 'Neural Networks'"
+            assert (
+                "Artificial Intelligence" not in docs_b_str
+            ), "project_b should NOT contain 'Artificial Intelligence' from project_a"
+            # Note: "Machine Learning" might appear in project_b's text, so we skip that check
+
+            print("✅ PASSED: LightRAG E2E - Data Isolation")
+            print("   Document storage correctly isolated between workspaces")
+            print("   project_a contains only its own data")
+            print("   project_b contains only its own data")
         else:
             print("   Document storage files not found (may not be created yet)")
-            results.add(
-                "LightRAG E2E - Data Isolation",
-                True,
-                "Skipped file content check (files not created)",
-            )
-            data_ok = True
+            print("✅ PASSED: LightRAG E2E - Data Isolation")
+            print("   Skipped file content check (files not created)")
 
         print("\n   ✓ Test complete - workspace isolation verified at E2E level")
 
-        return structure_ok and data_ok
-
-    except Exception as e:
-        results.add("LightRAG E2E Workspace Isolation", False, f"Exception: {str(e)}")
-        import traceback
-
-        traceback.print_exc()
-        return False
     finally:
-        # Cleanup test directory
-        if os.path.exists(test_dir):
+        # Cleanup test directory (unless keep_test_artifacts is set)
+        if os.path.exists(test_dir) and not keep_test_artifacts:
             shutil.rmtree(test_dir)
             print(f"\n   Cleaned up test directory: {test_dir}")
-
-
-# =============================================================================
-# Main Test Runner
-# =============================================================================
-
-
-async def main():
-    """Run all tests"""
-    print("\n")
-    print("╔" + "═" * 58 + "╗")
-    print("║" + " " * 10 + "Workspace Isolation Test Suite" + " " * 18 + "║")
-    print("║" + " " * 15 + "PR #2366 - Complete Coverage" + " " * 15 + "║")
-    print("╚" + "═" * 58 + "╝")
-
-    # Run all tests (ordered by priority)
-    # Core PR requirements (Tests 1-4)
-    await test_pipeline_status_isolation()
-    await test_lock_mechanism()
-    await test_backward_compatibility()
-    await test_multi_workspace_concurrency()
-
-    # Additional comprehensive tests (Tests 5-9)
-    await test_namespace_lock_reentrance()
-    await test_different_namespace_lock_isolation()
-    await test_error_handling()
-    await test_update_flags_workspace_isolation()
-    await test_empty_workspace_standardization()
-
-    # Integration and E2E tests (Tests 10-11)
-    print("\n" + "=" * 60)
-    print("INTEGRATION & END-TO-END TESTS")
-    print("=" * 60)
-    await test_json_kv_storage_workspace_isolation()
-    await test_lightrag_end_to_end_workspace_isolation()
-
-    # Print summary
-    all_passed = results.summary()
-
-    if all_passed:
-        print(
-            "\n🎉 All tests passed! The workspace isolation feature is working correctly."
-        )
-        print("   Coverage: 100% - Unit, Integration, and E2E validated")
-        return 0
-    else:
-        print("\n⚠️  Some tests failed. Please review the results above.")
-        return 1
-
-
-if __name__ == "__main__":
-    exit_code = asyncio.run(main())
-    exit(exit_code)
+        elif keep_test_artifacts:
+            print(f"\n   Kept test directory for inspection: {test_dir}")