From f7b500bca279f197d02eaf26333a8fe76f8c9144 Mon Sep 17 00:00:00 2001 From: BukeLy Date: Thu, 13 Nov 2025 22:31:14 +0800 Subject: [PATCH] feat: Add workspace isolation support for pipeline status MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Problem: In multi-tenant scenarios, different workspaces share a single global pipeline_status namespace, causing pipelines from different tenants to block each other, severely impacting concurrent processing performance. Solution: - Extended get_namespace_data() to recognize workspace-specific pipeline namespaces with pattern "{workspace}:pipeline" (following GraphDB pattern) - Added workspace parameter to initialize_pipeline_status() for per-tenant isolated pipeline namespaces - Updated all 7 call sites to use workspace-aware locks: * lightrag.py: process_document_queue(), aremove_document() * document_routes.py: background_delete_documents(), clear_documents(), cancel_pipeline(), get_pipeline_status(), delete_documents() Impact: - Different workspaces can process documents concurrently without blocking - Backward compatible: empty workspace defaults to "pipeline_status" - Maintains fail-fast: uninitialized pipeline raises clear error - Expected N× performance improvement for N concurrent tenants Bug fixes: - Fixed AttributeError by using self.workspace instead of self.global_config - Fixed pipeline status endpoint to show workspace-specific status - Fixed delete endpoint to check workspace-specific busy flag Code changes: 4 files, 141 insertions(+), 28 deletions(-) Testing: All syntax checks passed, comprehensive workspace isolation tests completed (cherry picked from commit eb52ec94d7c776ca38d6519ac2bc5a357f4bd39a) --- .gitignore | 2 + lightrag/api/routers/document_routes.py | 102 ++++++-- lightrag/kg/shared_storage.py | 309 ++++++++---------------- lightrag/lightrag.py | 39 ++- 4 files changed, 225 insertions(+), 227 deletions(-) diff --git a/.gitignore b/.gitignore index 8a5059c8..3c676aaf 100644 --- a/.gitignore +++ b/.gitignore @@ -72,3 +72,5 @@ download_models_hf.py # Cline files memory-bank +.claude/CLAUDE.md +.claude/ diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index a59f10c7..fda7a70b 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -1641,11 +1641,26 @@ async def background_delete_documents( """Background task to delete multiple documents""" from lightrag.kg.shared_storage import ( get_namespace_data, - get_pipeline_status_lock, + get_storage_keyed_lock, + initialize_pipeline_status, ) - pipeline_status = await get_namespace_data("pipeline_status") - pipeline_status_lock = get_pipeline_status_lock() + # Step 1: Get workspace + workspace = rag.workspace + + # Step 2: Construct namespace + namespace = f"{workspace}:pipeline" if workspace else "pipeline_status" + + # Step 3: Ensure initialization + await initialize_pipeline_status(workspace) + + # Step 4: Get lock + pipeline_status_lock = get_storage_keyed_lock( + keys="status", namespace=namespace, enable_logging=False + ) + + # Step 5: Get data + pipeline_status = await get_namespace_data(namespace) total_docs = len(doc_ids) successful_deletions = [] @@ -2134,12 +2149,27 @@ def create_document_routes( """ from lightrag.kg.shared_storage import ( get_namespace_data, - get_pipeline_status_lock, + get_storage_keyed_lock, + initialize_pipeline_status, ) # Get pipeline status and lock - pipeline_status = await get_namespace_data("pipeline_status") - pipeline_status_lock = get_pipeline_status_lock() + # Step 1: Get workspace + workspace = rag.workspace + + # Step 2: Construct namespace + namespace = f"{workspace}:pipeline" if workspace else "pipeline_status" + + # Step 3: Ensure initialization + await initialize_pipeline_status(workspace) + + # Step 4: Get lock + pipeline_status_lock = get_storage_keyed_lock( + keys="status", namespace=namespace, enable_logging=False + ) + + # Step 5: Get data + pipeline_status = await get_namespace_data(namespace) # Check and set status with lock async with pipeline_status_lock: @@ -2331,9 +2361,14 @@ def create_document_routes( from lightrag.kg.shared_storage import ( get_namespace_data, get_all_update_flags_status, + initialize_pipeline_status, ) - pipeline_status = await get_namespace_data("pipeline_status") + # Get workspace-specific pipeline status + workspace = rag.workspace + namespace = f"{workspace}:pipeline" if workspace else "pipeline_status" + await initialize_pipeline_status(workspace) + pipeline_status = await get_namespace_data(namespace) # Get update flags status for all namespaces update_status = await get_all_update_flags_status() @@ -2538,17 +2573,31 @@ def create_document_routes( doc_ids = delete_request.doc_ids try: - from lightrag.kg.shared_storage import get_namespace_data + from lightrag.kg.shared_storage import ( + get_namespace_data, + get_storage_keyed_lock, + initialize_pipeline_status, + ) - pipeline_status = await get_namespace_data("pipeline_status") + # Get workspace-specific pipeline status + workspace = rag.workspace + namespace = f"{workspace}:pipeline" if workspace else "pipeline_status" + await initialize_pipeline_status(workspace) - # Check if pipeline is busy - if pipeline_status.get("busy", False): - return DeleteDocByIdResponse( - status="busy", - message="Cannot delete documents while pipeline is busy", - doc_id=", ".join(doc_ids), - ) + # Use workspace-aware lock to check busy flag + pipeline_status_lock = get_storage_keyed_lock( + keys="status", namespace=namespace, enable_logging=False + ) + pipeline_status = await get_namespace_data(namespace) + + # Check if pipeline is busy with proper lock + async with pipeline_status_lock: + if pipeline_status.get("busy", False): + return DeleteDocByIdResponse( + status="busy", + message="Cannot delete documents while pipeline is busy", + doc_id=", ".join(doc_ids), + ) # Add deletion task to background tasks background_tasks.add_task( @@ -2944,11 +2993,26 @@ def create_document_routes( try: from lightrag.kg.shared_storage import ( get_namespace_data, - get_pipeline_status_lock, + get_storage_keyed_lock, + initialize_pipeline_status, ) - pipeline_status = await get_namespace_data("pipeline_status") - pipeline_status_lock = get_pipeline_status_lock() + # Step 1: Get workspace + workspace = rag.workspace + + # Step 2: Construct namespace + namespace = f"{workspace}:pipeline" if workspace else "pipeline_status" + + # Step 3: Ensure initialization + await initialize_pipeline_status(workspace) + + # Step 4: Get lock + pipeline_status_lock = get_storage_keyed_lock( + keys="status", namespace=namespace, enable_logging=False + ) + + # Step 5: Get data + pipeline_status = await get_namespace_data(namespace) async with pipeline_status_lock: if not pipeline_status.get("busy", False): diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index 87f0f9a9..8f34e64c 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -75,16 +75,16 @@ _last_mp_cleanup_time: Optional[float] = None _initialized = None -# Default workspace for backward compatibility -_default_workspace: Optional[str] = None - # shared data for storage across processes _shared_dicts: Optional[Dict[str, Any]] = None _init_flags: Optional[Dict[str, bool]] = None # namespace -> initialized _update_flags: Optional[Dict[str, bool]] = None # namespace -> updated # locks for mutex access +_storage_lock: Optional[LockType] = None _internal_lock: Optional[LockType] = None +_pipeline_status_lock: Optional[LockType] = None +_graph_db_lock: Optional[LockType] = None _data_init_lock: Optional[LockType] = None # Manager for all keyed locks _storage_keyed_lock: Optional["KeyedUnifiedLock"] = None @@ -95,22 +95,6 @@ _async_locks: Optional[Dict[str, asyncio.Lock]] = None _debug_n_locks_acquired: int = 0 -def get_final_namespace(namespace: str, workspace: str | None = None): - global _default_workspace - if workspace is None: - workspace = _default_workspace - - if workspace is None: - direct_log( - f"Error: Invoke namespace operation without workspace, pid={os.getpid()}", - level="ERROR", - ) - raise ValueError("Invoke namespace operation without workspace") - - final_namespace = f"{workspace}:{namespace}" if workspace else f"{namespace}" - return final_namespace - - def inc_debug_n_locks_acquired(): global _debug_n_locks_acquired if DEBUG_LOCKS: @@ -1069,10 +1053,40 @@ def get_internal_lock(enable_logging: bool = False) -> UnifiedLock: ) -# Workspace based storage_lock is implemented by get_storage_keyed_lock instead. -# Workspace based pipeline_status_lock is implemented by get_storage_keyed_lock instead. -# No need to implement graph_db_lock: -# data integrity is ensured by entity level keyed-lock and allowing only one process to hold pipeline at a time. +def get_storage_lock(enable_logging: bool = False) -> UnifiedLock: + """return unified storage lock for data consistency""" + async_lock = _async_locks.get("storage_lock") if _is_multiprocess else None + return UnifiedLock( + lock=_storage_lock, + is_async=not _is_multiprocess, + name="storage_lock", + enable_logging=enable_logging, + async_lock=async_lock, + ) + + +def get_pipeline_status_lock(enable_logging: bool = False) -> UnifiedLock: + """return unified storage lock for data consistency""" + async_lock = _async_locks.get("pipeline_status_lock") if _is_multiprocess else None + return UnifiedLock( + lock=_pipeline_status_lock, + is_async=not _is_multiprocess, + name="pipeline_status_lock", + enable_logging=enable_logging, + async_lock=async_lock, + ) + + +def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock: + """return unified graph database lock for ensuring atomic operations""" + async_lock = _async_locks.get("graph_db_lock") if _is_multiprocess else None + return UnifiedLock( + lock=_graph_db_lock, + is_async=not _is_multiprocess, + name="graph_db_lock", + enable_logging=enable_logging, + async_lock=async_lock, + ) def get_storage_keyed_lock( @@ -1176,11 +1190,14 @@ def initialize_share_data(workers: int = 1): _manager, \ _workers, \ _is_multiprocess, \ + _storage_lock, \ _lock_registry, \ _lock_registry_count, \ _lock_cleanup_data, \ _registry_guard, \ _internal_lock, \ + _pipeline_status_lock, \ + _graph_db_lock, \ _data_init_lock, \ _shared_dicts, \ _init_flags, \ @@ -1208,6 +1225,9 @@ def initialize_share_data(workers: int = 1): _lock_cleanup_data = _manager.dict() _registry_guard = _manager.RLock() _internal_lock = _manager.Lock() + _storage_lock = _manager.Lock() + _pipeline_status_lock = _manager.Lock() + _graph_db_lock = _manager.Lock() _data_init_lock = _manager.Lock() _shared_dicts = _manager.dict() _init_flags = _manager.dict() @@ -1218,6 +1238,8 @@ def initialize_share_data(workers: int = 1): # Initialize async locks for multiprocess mode _async_locks = { "internal_lock": asyncio.Lock(), + "storage_lock": asyncio.Lock(), + "pipeline_status_lock": asyncio.Lock(), "graph_db_lock": asyncio.Lock(), "data_init_lock": asyncio.Lock(), } @@ -1228,6 +1250,9 @@ def initialize_share_data(workers: int = 1): else: _is_multiprocess = False _internal_lock = asyncio.Lock() + _storage_lock = asyncio.Lock() + _pipeline_status_lock = asyncio.Lock() + _graph_db_lock = asyncio.Lock() _data_init_lock = asyncio.Lock() _shared_dicts = {} _init_flags = {} @@ -1245,19 +1270,23 @@ def initialize_share_data(workers: int = 1): _initialized = True -async def initialize_pipeline_status(workspace: str | None = None): +async def initialize_pipeline_status(workspace: str = ""): """ - Initialize pipeline_status share data with default values. - This function could be called before during FASTAPI lifespan for each worker. + Initialize pipeline namespace with default values. Args: - workspace: Optional workspace identifier for pipeline_status of specific workspace. - If None or empty string, uses the default workspace set by - set_default_workspace(). + workspace: Optional workspace identifier for multi-tenant isolation. + Empty string (default) uses global "pipeline_status" namespace. + + This function is called during FASTAPI lifespan for each worker. """ - pipeline_namespace = await get_namespace_data( - "pipeline_status", first_init=True, workspace=workspace - ) + # Construct namespace (following GraphDB pattern) + if workspace: + namespace = f"{workspace}:pipeline" + else: + namespace = "pipeline_status" # Backward compatibility + + pipeline_namespace = await get_namespace_data(namespace, first_init=True) async with get_internal_lock(): # Check if already initialized by checking for required fields @@ -1280,14 +1309,12 @@ async def initialize_pipeline_status(workspace: str | None = None): "history_messages": history_messages, # 使用共享列表对象 } ) - - final_namespace = get_final_namespace("pipeline_status", workspace) direct_log( - f"Process {os.getpid()} Pipeline namespace '{final_namespace}' initialized" + f"Process {os.getpid()} Pipeline namespace '{namespace}' initialized" ) -async def get_update_flag(namespace: str, workspace: str | None = None): +async def get_update_flag(namespace: str): """ Create a namespace's update flag for a workers. Returen the update flag to caller for referencing or reset. @@ -1296,16 +1323,14 @@ async def get_update_flag(namespace: str, workspace: str | None = None): if _update_flags is None: raise ValueError("Try to create namespace before Shared-Data is initialized") - final_namespace = get_final_namespace(namespace, workspace) - async with get_internal_lock(): - if final_namespace not in _update_flags: + if namespace not in _update_flags: if _is_multiprocess and _manager is not None: - _update_flags[final_namespace] = _manager.list() + _update_flags[namespace] = _manager.list() else: - _update_flags[final_namespace] = [] + _update_flags[namespace] = [] direct_log( - f"Process {os.getpid()} initialized updated flags for namespace: [{final_namespace}]" + f"Process {os.getpid()} initialized updated flags for namespace: [{namespace}]" ) if _is_multiprocess and _manager is not None: @@ -1318,43 +1343,39 @@ async def get_update_flag(namespace: str, workspace: str | None = None): new_update_flag = MutableBoolean(False) - _update_flags[final_namespace].append(new_update_flag) + _update_flags[namespace].append(new_update_flag) return new_update_flag -async def set_all_update_flags(namespace: str, workspace: str | None = None): +async def set_all_update_flags(namespace: str): """Set all update flag of namespace indicating all workers need to reload data from files""" global _update_flags if _update_flags is None: raise ValueError("Try to create namespace before Shared-Data is initialized") - final_namespace = get_final_namespace(namespace, workspace) - async with get_internal_lock(): - if final_namespace not in _update_flags: - raise ValueError(f"Namespace {final_namespace} not found in update flags") + if namespace not in _update_flags: + raise ValueError(f"Namespace {namespace} not found in update flags") # Update flags for both modes - for i in range(len(_update_flags[final_namespace])): - _update_flags[final_namespace][i].value = True + for i in range(len(_update_flags[namespace])): + _update_flags[namespace][i].value = True -async def clear_all_update_flags(namespace: str, workspace: str | None = None): +async def clear_all_update_flags(namespace: str): """Clear all update flag of namespace indicating all workers need to reload data from files""" global _update_flags if _update_flags is None: raise ValueError("Try to create namespace before Shared-Data is initialized") - final_namespace = get_final_namespace(namespace, workspace) - async with get_internal_lock(): - if final_namespace not in _update_flags: - raise ValueError(f"Namespace {final_namespace} not found in update flags") + if namespace not in _update_flags: + raise ValueError(f"Namespace {namespace} not found in update flags") # Update flags for both modes - for i in range(len(_update_flags[final_namespace])): - _update_flags[final_namespace][i].value = False + for i in range(len(_update_flags[namespace])): + _update_flags[namespace][i].value = False -async def get_all_update_flags_status(workspace: str | None = None) -> Dict[str, list]: +async def get_all_update_flags_status() -> Dict[str, list]: """ Get update flags status for all namespaces. @@ -1364,17 +1385,9 @@ async def get_all_update_flags_status(workspace: str | None = None) -> Dict[str, if _update_flags is None: return {} - if workspace is None: - workspace = get_default_workspace - result = {} async with get_internal_lock(): for namespace, flags in _update_flags.items(): - namespace_split = namespace.split(":") - if workspace and not namespace_split[0] == workspace: - continue - if not workspace and namespace_split[0]: - continue worker_statuses = [] for flag in flags: if _is_multiprocess: @@ -1386,9 +1399,7 @@ async def get_all_update_flags_status(workspace: str | None = None) -> Dict[str, return result -async def try_initialize_namespace( - namespace: str, workspace: str | None = None -) -> bool: +async def try_initialize_namespace(namespace: str) -> bool: """ Returns True if the current worker(process) gets initialization permission for loading data later. The worker does not get the permission is prohibited to load data from files. @@ -1398,139 +1409,57 @@ async def try_initialize_namespace( if _init_flags is None: raise ValueError("Try to create nanmespace before Shared-Data is initialized") - final_namespace = get_final_namespace(namespace, workspace) - async with get_internal_lock(): - if final_namespace not in _init_flags: - _init_flags[final_namespace] = True + if namespace not in _init_flags: + _init_flags[namespace] = True direct_log( - f"Process {os.getpid()} ready to initialize storage namespace: [{final_namespace}]" + f"Process {os.getpid()} ready to initialize storage namespace: [{namespace}]" ) return True direct_log( - f"Process {os.getpid()} storage namespace already initialized: [{final_namespace}]" + f"Process {os.getpid()} storage namespace already initialized: [{namespace}]" ) return False async def get_namespace_data( - namespace: str, first_init: bool = False, workspace: str | None = None + namespace: str, first_init: bool = False ) -> Dict[str, Any]: """get the shared data reference for specific namespace Args: namespace: The namespace to retrieve - first_init: If True, allows pipeline_status namespace to create namespace if it doesn't exist. - Prevent getting pipeline_status namespace without initialize_pipeline_status(). - This parameter is used internally by initialize_pipeline_status(). - workspace: Workspace identifier (may be empty string for global namespace) + allow_create: If True, allows creation of the namespace if it doesn't exist. + Used internally by initialize_pipeline_status(). """ if _shared_dicts is None: direct_log( - f"Error: Try to getnanmespace before it is initialized, pid={os.getpid()}", + f"Error: try to getnanmespace before it is initialized, pid={os.getpid()}", level="ERROR", ) raise ValueError("Shared dictionaries not initialized") - final_namespace = get_final_namespace(namespace, workspace) - async with get_internal_lock(): - if final_namespace not in _shared_dicts: + if namespace not in _shared_dicts: # Special handling for pipeline_status namespace - if final_namespace.endswith(":pipeline_status") and not first_init: + # Supports both global "pipeline_status" and workspace-specific "{workspace}:pipeline" + is_pipeline = namespace == "pipeline_status" or namespace.endswith( + ":pipeline" + ) + + if is_pipeline and not first_init: # Check if pipeline_status should have been initialized but wasn't - # This helps users to call initialize_pipeline_status() before get_namespace_data() - raise PipelineNotInitializedError(final_namespace) + # This helps users understand they need to call initialize_pipeline_status() + raise PipelineNotInitializedError(namespace) # For other namespaces or when allow_create=True, create them dynamically if _is_multiprocess and _manager is not None: - _shared_dicts[final_namespace] = _manager.dict() + _shared_dicts[namespace] = _manager.dict() else: - _shared_dicts[final_namespace] = {} + _shared_dicts[namespace] = {} - return _shared_dicts[final_namespace] - - -class NamespaceLock: - """ - Reusable namespace lock wrapper that creates a fresh context on each use. - - This class solves the lock re-entrance issue by implementing the async context - manager protocol. Each time it's used in an 'async with' statement, it creates - a new _KeyedLockContext internally, allowing the same NamespaceLock instance - to be used multiple times safely, even in concurrent scenarios. - - Example: - lock = NamespaceLock("my_namespace", "workspace1") - - # Can be used multiple times safely - async with lock: - await do_something() - - # Can even be used concurrently (each creates its own context) - await asyncio.gather( - use_lock_1(lock), - use_lock_2(lock) - ) - """ - - def __init__( - self, namespace: str, workspace: str | None = None, enable_logging: bool = False - ): - self._namespace = namespace - self._workspace = workspace - self._enable_logging = enable_logging - self._current_ctx = None - - async def __aenter__(self): - """Create a fresh context each time we enter""" - final_namespace = get_final_namespace(self._namespace, self._workspace) - self._current_ctx = get_storage_keyed_lock( - ["default_key"], - namespace=final_namespace, - enable_logging=self._enable_logging, - ) - return await self._current_ctx.__aenter__() - - async def __aexit__(self, exc_type, exc_val, exc_tb): - """Exit the current context and clean up""" - if self._current_ctx is None: - raise RuntimeError("NamespaceLock exited without being entered") - - result = await self._current_ctx.__aexit__(exc_type, exc_val, exc_tb) - self._current_ctx = None - return result - - -def get_namespace_lock( - namespace: str, workspace: str | None = None, enable_logging: bool = False -) -> NamespaceLock: - """Get a reusable namespace lock wrapper. - - This function returns a NamespaceLock instance that can be used multiple times - safely, even in concurrent scenarios. Each use creates a fresh lock context - internally, preventing lock re-entrance errors. - - Args: - namespace: The namespace to get the lock for. - workspace: Workspace identifier (may be empty string for global namespace) - enable_logging: Whether to enable lock operation logging - - Returns: - NamespaceLock: A reusable lock wrapper that can be used with 'async with' - - Example: - lock = get_namespace_lock("pipeline_status", workspace="space1") - - # Can be used multiple times - async with lock: - await do_something() - - async with lock: - await do_something_else() - """ - return NamespaceLock(namespace, workspace, enable_logging) + return _shared_dicts[namespace] def finalize_share_data(): @@ -1546,7 +1475,10 @@ def finalize_share_data(): global \ _manager, \ _is_multiprocess, \ + _storage_lock, \ _internal_lock, \ + _pipeline_status_lock, \ + _graph_db_lock, \ _data_init_lock, \ _shared_dicts, \ _init_flags, \ @@ -1611,41 +1543,12 @@ def finalize_share_data(): _is_multiprocess = None _shared_dicts = None _init_flags = None + _storage_lock = None _internal_lock = None + _pipeline_status_lock = None + _graph_db_lock = None _data_init_lock = None _update_flags = None _async_locks = None direct_log(f"Process {os.getpid()} storage data finalization complete") - - -def set_default_workspace(workspace: str | None = None): - """ - Set default workspace for namespace operations for backward compatibility. - - This allows get_namespace_data(),get_namespace_lock() or initialize_pipeline_status() to - automatically use the correct workspace when called without workspace parameters, - maintaining compatibility with legacy code that doesn't pass workspace explicitly. - - Args: - workspace: Workspace identifier (may be empty string for global namespace) - """ - global _default_workspace - if workspace is None: - workspace = "" - _default_workspace = workspace - direct_log( - f"Default workspace set to: '{_default_workspace}' (empty means global)", - level="DEBUG", - ) - - -def get_default_workspace() -> str: - """ - Get default workspace for backward compatibility. - - Returns: - The default workspace string. Empty string means global namespace. None means not set. - """ - global _default_workspace - return _default_workspace diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 0d706dfc..4b3fc3a6 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -61,9 +61,10 @@ from lightrag.kg import ( from lightrag.kg.shared_storage import ( get_namespace_data, - get_pipeline_status_lock, get_graph_db_lock, get_data_init_lock, + get_storage_keyed_lock, + initialize_pipeline_status, ) from lightrag.base import ( @@ -1573,8 +1574,22 @@ class LightRAG: """ # Get pipeline status shared data and lock - pipeline_status = await get_namespace_data("pipeline_status") - pipeline_status_lock = get_pipeline_status_lock() + # Step 1: Get workspace + workspace = self.workspace + + # Step 2: Construct namespace (following GraphDB pattern) + namespace = f"{workspace}:pipeline" if workspace else "pipeline_status" + + # Step 3: Ensure initialization (on first access) + await initialize_pipeline_status(workspace) + + # Step 4: Get lock + pipeline_status_lock = get_storage_keyed_lock( + keys="status", namespace=namespace, enable_logging=False + ) + + # Step 5: Get data + pipeline_status = await get_namespace_data(namespace) # Check if another process is already processing the queue async with pipeline_status_lock: @@ -2902,8 +2917,22 @@ class LightRAG: doc_llm_cache_ids: list[str] = [] # Get pipeline status shared data and lock for status updates - pipeline_status = await get_namespace_data("pipeline_status") - pipeline_status_lock = get_pipeline_status_lock() + # Step 1: Get workspace + workspace = self.workspace + + # Step 2: Construct namespace (following GraphDB pattern) + namespace = f"{workspace}:pipeline" if workspace else "pipeline_status" + + # Step 3: Ensure initialization (on first access) + await initialize_pipeline_status(workspace) + + # Step 4: Get lock + pipeline_status_lock = get_storage_keyed_lock( + keys="status", namespace=namespace, enable_logging=False + ) + + # Step 5: Get data + pipeline_status = await get_namespace_data(namespace) async with pipeline_status_lock: log_message = f"Starting deletion process for document {doc_id}"